Migrate internal module imports to unified package path.
Replaced legacy `*.orly` module imports with `next.orly.dev/pkg` paths across the codebase for consistency. Removed legacy `go.mod` files from sub-packages, consolidating dependency management. Added Dockerfiles and configurations for benchmarking environments.
This commit is contained in:
@@ -7,12 +7,12 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"encoders.orly/event"
|
||||
"encoders.orly/kind"
|
||||
"encoders.orly/tag"
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/errorf"
|
||||
"utils.orly"
|
||||
"next.orly.dev/pkg/encoders/event"
|
||||
"next.orly.dev/pkg/encoders/kind"
|
||||
"next.orly.dev/pkg/encoders/tag"
|
||||
"next.orly.dev/pkg/utils"
|
||||
)
|
||||
|
||||
// GenerateChallenge creates a reasonable, 16-byte base64 challenge string
|
||||
|
||||
@@ -3,9 +3,9 @@ package auth
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"crypto.orly/p256k"
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/log"
|
||||
"next.orly.dev/pkg/crypto/p256k"
|
||||
)
|
||||
|
||||
func TestCreateUnsigned(t *testing.T) {
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
module protocol.orly
|
||||
|
||||
go 1.25.0
|
||||
|
||||
require (
|
||||
crypto.orly v0.0.0-00010101000000-000000000000
|
||||
encoders.orly v0.0.0-00010101000000-000000000000
|
||||
interfaces.orly v0.0.0-00010101000000-000000000000
|
||||
lol.mleku.dev v1.0.2
|
||||
utils.orly v0.0.0-00010101000000-000000000000
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/fatih/color v1.18.0 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
|
||||
github.com/mattn/go-colorable v0.1.14 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/templexxx/cpu v0.0.1 // indirect
|
||||
github.com/templexxx/xhex v0.0.0-20200614015412-aed53437177b // indirect
|
||||
golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b // indirect
|
||||
golang.org/x/sys v0.35.0 // indirect
|
||||
)
|
||||
|
||||
replace (
|
||||
acl.orly => ../acl
|
||||
crypto.orly => ../crypto
|
||||
database.orly => ../database
|
||||
encoders.orly => ../encoders
|
||||
interfaces.orly => ../interfaces
|
||||
next.orly.dev => ../../
|
||||
protocol.orly => ../protocol
|
||||
utils.orly => ../utils
|
||||
)
|
||||
@@ -1,23 +0,0 @@
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
|
||||
github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
|
||||
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
|
||||
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
|
||||
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
|
||||
github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/templexxx/cpu v0.0.1 h1:hY4WdLOgKdc8y13EYklu9OUTXik80BkxHoWvTO6MQQY=
|
||||
github.com/templexxx/cpu v0.0.1/go.mod h1:w7Tb+7qgcAlIyX4NhLuDKt78AHA5SzPmq0Wj6HiEnnk=
|
||||
github.com/templexxx/xhex v0.0.0-20200614015412-aed53437177b h1:XeDLE6c9mzHpdv3Wb1+pWBaWv/BlHK0ZYIu/KaL6eHg=
|
||||
github.com/templexxx/xhex v0.0.0-20200614015412-aed53437177b/go.mod h1:7rwmCH0wC2fQvNEvPZ3sKXukhyCTyiaZ5VTZMQYpZKQ=
|
||||
golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b h1:DXr+pvt3nC887026GRP39Ej11UATqWDmWuS99x26cD0=
|
||||
golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b/go.mod h1:4QTo5u+SEIbbKW1RacMZq1YEfOBqeXa19JeshGi+zc4=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
|
||||
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
lol.mleku.dev v1.0.2 h1:bSV1hHnkmt1hq+9nSvRwN6wgcI7itbM3XRZ4dMB438c=
|
||||
lol.mleku.dev v1.0.2/go.mod h1:DQ0WnmkntA9dPLCXgvtIgYt5G0HSqx3wSTLolHgWeLA=
|
||||
lukechampine.com/frand v1.5.1 h1:fg0eRtdmGFIxhP5zQJzM1lFDbD6CUfu/f+7WgAZd5/w=
|
||||
lukechampine.com/frand v1.5.1/go.mod h1:4VstaWc2plN4Mjr10chUD46RAVGWhpkZ5Nja8+Azp0Q=
|
||||
@@ -1,9 +1,9 @@
|
||||
package publish
|
||||
|
||||
import (
|
||||
"encoders.orly/event"
|
||||
"interfaces.orly/publisher"
|
||||
"interfaces.orly/typer"
|
||||
"next.orly.dev/pkg/encoders/event"
|
||||
"next.orly.dev/pkg/interfaces/publisher"
|
||||
"next.orly.dev/pkg/interfaces/typer"
|
||||
)
|
||||
|
||||
// S is the control structure for the subscription management scheme.
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/errorf"
|
||||
"utils.orly/normalize"
|
||||
"next.orly.dev/pkg/utils/normalize"
|
||||
)
|
||||
|
||||
// Fetch fetches the NIP-11 Info.
|
||||
|
||||
@@ -7,11 +7,11 @@ import (
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"encoders.orly/kind"
|
||||
"encoders.orly/timestamp"
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/log"
|
||||
"utils.orly/number"
|
||||
"next.orly.dev/pkg/encoders/kind"
|
||||
"next.orly.dev/pkg/encoders/timestamp"
|
||||
"next.orly.dev/pkg/utils/number"
|
||||
)
|
||||
|
||||
// NIP is a number and description of a nostr "improvement" possibility.
|
||||
|
||||
629
pkg/protocol/ws/client.go
Normal file
629
pkg/protocol/ws/client.go
Normal file
@@ -0,0 +1,629 @@
|
||||
package ws
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/puzpuzpuz/xsync/v3"
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/log"
|
||||
"next.orly.dev/pkg/encoders/envelopes"
|
||||
"next.orly.dev/pkg/encoders/envelopes/authenvelope"
|
||||
"next.orly.dev/pkg/encoders/envelopes/closedenvelope"
|
||||
"next.orly.dev/pkg/encoders/envelopes/eoseenvelope"
|
||||
"next.orly.dev/pkg/encoders/envelopes/eventenvelope"
|
||||
"next.orly.dev/pkg/encoders/envelopes/noticeenvelope"
|
||||
"next.orly.dev/pkg/encoders/envelopes/okenvelope"
|
||||
"next.orly.dev/pkg/encoders/event"
|
||||
"next.orly.dev/pkg/encoders/filter"
|
||||
"next.orly.dev/pkg/encoders/hex"
|
||||
"next.orly.dev/pkg/encoders/kind"
|
||||
"next.orly.dev/pkg/encoders/tag"
|
||||
"next.orly.dev/pkg/interfaces/codec"
|
||||
"next.orly.dev/pkg/interfaces/signer"
|
||||
"next.orly.dev/pkg/utils/normalize"
|
||||
)
|
||||
|
||||
var subscriptionIDCounter atomic.Int64
|
||||
|
||||
// Client represents a connection to a Nostr relay.
|
||||
type Client struct {
|
||||
closeMutex sync.Mutex
|
||||
|
||||
URL string
|
||||
requestHeader http.Header // e.g. for origin header
|
||||
|
||||
Connection *Connection
|
||||
Subscriptions *xsync.MapOf[string, *Subscription]
|
||||
|
||||
ConnectionError error
|
||||
connectionContext context.Context // will be canceled when the connection closes
|
||||
connectionContextCancel context.CancelCauseFunc
|
||||
|
||||
challenge []byte // NIP-42 challenge, we only keep the last
|
||||
notices chan []byte // NIP-01 NOTICEs
|
||||
customHandler func(string) // nonstandard unparseable messages
|
||||
okCallbacks *xsync.MapOf[string, func(bool, string)]
|
||||
writeQueue chan writeRequest
|
||||
subscriptionChannelCloseQueue chan []byte
|
||||
|
||||
// custom things that aren't often used
|
||||
//
|
||||
AssumeValid bool // this will skip verifying signatures for events received from this relay
|
||||
}
|
||||
|
||||
type writeRequest struct {
|
||||
msg []byte
|
||||
answer chan error
|
||||
}
|
||||
|
||||
// NewRelay returns a new relay. It takes a context that, when canceled, will close the relay connection.
|
||||
func NewRelay(ctx context.Context, url string, opts ...RelayOption) *Client {
|
||||
ctx, cancel := context.WithCancelCause(ctx)
|
||||
r := &Client{
|
||||
URL: string(normalize.URL(url)),
|
||||
connectionContext: ctx,
|
||||
connectionContextCancel: cancel,
|
||||
Subscriptions: xsync.NewMapOf[string, *Subscription](),
|
||||
okCallbacks: xsync.NewMapOf[string, func(
|
||||
bool, string,
|
||||
)](),
|
||||
writeQueue: make(chan writeRequest),
|
||||
subscriptionChannelCloseQueue: make(chan []byte),
|
||||
requestHeader: nil,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt.ApplyRelayOption(r)
|
||||
}
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
// RelayConnect returns a relay object connected to url.
|
||||
//
|
||||
// The given subscription is only used during the connection phase. Once successfully connected, cancelling ctx has no effect.
|
||||
//
|
||||
// The ongoing relay connection uses a background context. To close the connection, call r.Close().
|
||||
// If you need fine grained long-term connection contexts, use NewRelay() instead.
|
||||
func RelayConnect(ctx context.Context, url string, opts ...RelayOption) (
|
||||
*Client, error,
|
||||
) {
|
||||
r := NewRelay(context.Background(), url, opts...)
|
||||
err := r.Connect(ctx)
|
||||
return r, err
|
||||
}
|
||||
|
||||
// RelayOption is the type of the argument passed when instantiating relay connections.
|
||||
type RelayOption interface {
|
||||
ApplyRelayOption(*Client)
|
||||
}
|
||||
|
||||
var (
|
||||
_ RelayOption = (WithCustomHandler)(nil)
|
||||
_ RelayOption = (WithRequestHeader)(nil)
|
||||
)
|
||||
|
||||
// WithCustomHandler must be a function that handles any relay message that couldn't be
|
||||
// parsed as a standard envelope.
|
||||
type WithCustomHandler func(data string)
|
||||
|
||||
func (ch WithCustomHandler) ApplyRelayOption(r *Client) {
|
||||
r.customHandler = ch
|
||||
}
|
||||
|
||||
// WithRequestHeader sets the HTTP request header of the websocket preflight request.
|
||||
type WithRequestHeader http.Header
|
||||
|
||||
func (ch WithRequestHeader) ApplyRelayOption(r *Client) {
|
||||
r.requestHeader = http.Header(ch)
|
||||
}
|
||||
|
||||
// String just returns the relay URL.
|
||||
func (r *Client) String() string {
|
||||
return r.URL
|
||||
}
|
||||
|
||||
// Context retrieves the context that is associated with this relay connection.
|
||||
// It will be closed when the relay is disconnected.
|
||||
func (r *Client) Context() context.Context { return r.connectionContext }
|
||||
|
||||
// IsConnected returns true if the connection to this relay seems to be active.
|
||||
func (r *Client) IsConnected() bool { return r.connectionContext.Err() == nil }
|
||||
|
||||
// Connect tries to establish a websocket connection to r.URL.
|
||||
// If the context expires before the connection is complete, an error is returned.
|
||||
// Once successfully connected, context expiration has no effect: call r.Close
|
||||
// to close the connection.
|
||||
//
|
||||
// The given context here is only used during the connection phase. The long-living
|
||||
// relay connection will be based on the context given to NewRelay().
|
||||
func (r *Client) Connect(ctx context.Context) error {
|
||||
return r.ConnectWithTLS(ctx, nil)
|
||||
}
|
||||
|
||||
func extractSubID(jsonStr string) string {
|
||||
// look for "EVENT" pattern
|
||||
start := strings.Index(jsonStr, `"EVENT"`)
|
||||
if start == -1 {
|
||||
return ""
|
||||
}
|
||||
|
||||
// move to the next quote
|
||||
offset := strings.Index(jsonStr[start+7:], `"`)
|
||||
if offset == -1 {
|
||||
return ""
|
||||
}
|
||||
|
||||
start += 7 + offset + 1
|
||||
|
||||
// find the ending quote
|
||||
end := strings.Index(jsonStr[start:], `"`)
|
||||
|
||||
// get the contents
|
||||
return jsonStr[start : start+end]
|
||||
}
|
||||
|
||||
func subIdToSerial(subId string) int64 {
|
||||
n := strings.Index(subId, ":")
|
||||
if n < 0 || n > len(subId) {
|
||||
return -1
|
||||
}
|
||||
serialId, _ := strconv.ParseInt(subId[0:n], 10, 64)
|
||||
return serialId
|
||||
}
|
||||
|
||||
// ConnectWithTLS is like Connect(), but takes a special tls.Config if you need that.
|
||||
func (r *Client) ConnectWithTLS(
|
||||
ctx context.Context, tlsConfig *tls.Config,
|
||||
) error {
|
||||
if r.connectionContext == nil || r.Subscriptions == nil {
|
||||
return fmt.Errorf("relay must be initialized with a call to NewRelay()")
|
||||
}
|
||||
|
||||
if r.URL == "" {
|
||||
return fmt.Errorf("invalid relay URL '%s'", r.URL)
|
||||
}
|
||||
|
||||
if _, ok := ctx.Deadline(); !ok {
|
||||
// if no timeout is set, force it to 7 seconds
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeoutCause(
|
||||
ctx, 7*time.Second, errors.New("connection took too long"),
|
||||
)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
conn, err := NewConnection(ctx, r.URL, r.requestHeader, tlsConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error opening websocket to '%s': %w", r.URL, err)
|
||||
}
|
||||
r.Connection = conn
|
||||
|
||||
// ping every 29 seconds
|
||||
ticker := time.NewTicker(29 * time.Second)
|
||||
|
||||
// queue all write operations here so we don't do mutex spaghetti
|
||||
go func() {
|
||||
var err error
|
||||
for {
|
||||
select {
|
||||
case <-r.connectionContext.Done():
|
||||
ticker.Stop()
|
||||
r.Connection = nil
|
||||
|
||||
for _, sub := range r.Subscriptions.Range {
|
||||
sub.unsub(
|
||||
fmt.Errorf(
|
||||
"relay connection closed: %w / %w",
|
||||
context.Cause(r.connectionContext),
|
||||
r.ConnectionError,
|
||||
),
|
||||
)
|
||||
}
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
err = r.Connection.Ping(r.connectionContext)
|
||||
if err != nil && !strings.Contains(
|
||||
err.Error(), "failed to wait for pong",
|
||||
) {
|
||||
log.I.F(
|
||||
"{%s} error writing ping: %v; closing websocket", r.URL,
|
||||
err,
|
||||
)
|
||||
r.Close() // this should trigger a context cancelation
|
||||
return
|
||||
}
|
||||
|
||||
case wr := <-r.writeQueue:
|
||||
// all write requests will go through this to prevent races
|
||||
log.D.F("{%s} sending %v\n", r.URL, string(wr.msg))
|
||||
if err = r.Connection.WriteMessage(
|
||||
r.connectionContext, wr.msg,
|
||||
); err != nil {
|
||||
wr.answer <- err
|
||||
}
|
||||
close(wr.answer)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// general message reader loop
|
||||
go func() {
|
||||
|
||||
for {
|
||||
buf := new(bytes.Buffer)
|
||||
for {
|
||||
buf.Reset()
|
||||
if err := conn.ReadMessage(
|
||||
r.connectionContext, buf,
|
||||
); err != nil {
|
||||
r.ConnectionError = err
|
||||
r.Close()
|
||||
return
|
||||
}
|
||||
message := buf.Bytes()
|
||||
var t string
|
||||
if t, message, err = envelopes.Identify(message); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
switch t {
|
||||
case noticeenvelope.L:
|
||||
env := noticeenvelope.New()
|
||||
if env, message, err = noticeenvelope.Parse(message); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
// see WithNoticeHandler
|
||||
if r.notices != nil {
|
||||
r.notices <- env.Message
|
||||
} else {
|
||||
log.E.F("NOTICE from %s: '%s'\n", r.URL, env.Message)
|
||||
}
|
||||
case authenvelope.L:
|
||||
env := authenvelope.NewChallenge()
|
||||
if env, message, err = authenvelope.ParseChallenge(message); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
if len(env.Challenge) == 0 {
|
||||
continue
|
||||
}
|
||||
r.challenge = env.Challenge
|
||||
case eventenvelope.L:
|
||||
env := eventenvelope.NewResult()
|
||||
if env, message, err = eventenvelope.ParseResult(message); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
if len(env.Subscription) == 0 {
|
||||
continue
|
||||
}
|
||||
if sub, ok := r.Subscriptions.Load(string(env.Subscription)); !ok {
|
||||
log.D.F(
|
||||
"{%s} no subscription with id '%s'\n", r.URL,
|
||||
env.Subscription,
|
||||
)
|
||||
continue
|
||||
} else {
|
||||
// check if the event matches the desired filter, ignore otherwise
|
||||
if !sub.Filters.Match(env.Event) {
|
||||
// log.D.F(
|
||||
// "{%s} filter does not match: %v ~ %v\n", r.URL,
|
||||
// sub.Filters, env.Event,
|
||||
// )
|
||||
continue
|
||||
}
|
||||
// check signature, ignore invalid, except from trusted (AssumeValid) relays
|
||||
if !r.AssumeValid {
|
||||
if ok, err = env.Event.Verify(); !ok {
|
||||
log.E.F(
|
||||
"{%s} bad signature on %s\n", r.URL,
|
||||
env.Event,
|
||||
)
|
||||
continue
|
||||
}
|
||||
}
|
||||
// dispatch this to the internal .events channel of the subscription
|
||||
sub.dispatchEvent(env.Event)
|
||||
}
|
||||
case eoseenvelope.L:
|
||||
env := eoseenvelope.New()
|
||||
if env, message, err = eoseenvelope.Parse(message); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
if subscription, ok := r.Subscriptions.Load(string(env.Subscription)); ok {
|
||||
subscription.dispatchEose()
|
||||
}
|
||||
case closedenvelope.L:
|
||||
env := closedenvelope.New()
|
||||
if env, message, err = closedenvelope.Parse(message); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
if subscription, ok := r.Subscriptions.Load(string(env.Subscription)); ok {
|
||||
subscription.handleClosed(env.ReasonString())
|
||||
}
|
||||
case okenvelope.L:
|
||||
env := okenvelope.New()
|
||||
if env, message, err = okenvelope.Parse(message); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
if okCallback, exist := r.okCallbacks.Load(string(env.EventID)); exist {
|
||||
okCallback(env.OK, env.ReasonString())
|
||||
} else {
|
||||
log.I.F(
|
||||
"{%s} got an unexpected OK message for event %s",
|
||||
r.URL,
|
||||
env.EventID,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write queues an arbitrary message to be sent to the relay.
|
||||
func (r *Client) Write(msg []byte) <-chan error {
|
||||
ch := make(chan error)
|
||||
select {
|
||||
case r.writeQueue <- writeRequest{msg: msg, answer: ch}:
|
||||
case <-r.connectionContext.Done():
|
||||
go func() { ch <- fmt.Errorf("connection closed") }()
|
||||
}
|
||||
return ch
|
||||
}
|
||||
|
||||
// Publish sends an "EVENT" command to the relay r as in NIP-01 and waits for an OK response.
|
||||
func (r *Client) Publish(ctx context.Context, ev *event.E) error {
|
||||
return r.publish(
|
||||
ctx, hex.Enc(ev.ID), eventenvelope.NewSubmissionWith(ev),
|
||||
)
|
||||
}
|
||||
|
||||
// Auth sends an "AUTH" command client->relay as in NIP-42 and waits for an OK response.
|
||||
//
|
||||
// You don't have to build the AUTH event yourself, this function takes a function to which the
|
||||
// event that must be signed will be passed, so it's only necessary to sign that.
|
||||
func (r *Client) Auth(
|
||||
ctx context.Context, sign signer.I,
|
||||
) (err error) {
|
||||
authEvent := &event.E{
|
||||
CreatedAt: time.Now().Unix(),
|
||||
Kind: kind.ClientAuthentication.K,
|
||||
Tags: tag.NewS(
|
||||
tag.NewFromAny("relay", r.URL),
|
||||
tag.NewFromAny("challenge", r.challenge),
|
||||
),
|
||||
Content: nil,
|
||||
}
|
||||
if err = authEvent.Sign(sign); err != nil {
|
||||
return fmt.Errorf("error signing auth event: %w", err)
|
||||
}
|
||||
|
||||
return r.publish(
|
||||
ctx, hex.Enc(authEvent.ID), authenvelope.NewResponseWith(authEvent),
|
||||
)
|
||||
}
|
||||
|
||||
func (r *Client) publish(
|
||||
ctx context.Context, id string, env codec.Envelope,
|
||||
) error {
|
||||
var err error
|
||||
var cancel context.CancelFunc
|
||||
|
||||
if _, ok := ctx.Deadline(); !ok {
|
||||
// if no timeout is set, force it to 7 seconds
|
||||
ctx, cancel = context.WithTimeoutCause(
|
||||
ctx, 7*time.Second, fmt.Errorf("given up waiting for an OK"),
|
||||
)
|
||||
defer cancel()
|
||||
} else {
|
||||
// otherwise make the context cancellable so we can stop everything upon receiving an "OK"
|
||||
ctx, cancel = context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
// listen for an OK callback
|
||||
gotOk := false
|
||||
r.okCallbacks.Store(
|
||||
id, func(ok bool, reason string) {
|
||||
gotOk = true
|
||||
if !ok {
|
||||
err = fmt.Errorf("msg: %s", reason)
|
||||
}
|
||||
cancel()
|
||||
},
|
||||
)
|
||||
defer r.okCallbacks.Delete(id)
|
||||
|
||||
// publish event
|
||||
envb := env.Marshal(nil)
|
||||
if err = <-r.Write(envb); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// this will be called when we get an OK or when the context has been canceled
|
||||
if gotOk {
|
||||
return err
|
||||
}
|
||||
return ctx.Err()
|
||||
case <-r.connectionContext.Done():
|
||||
// this is caused when we lose connectivity
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe sends a "REQ" command to the relay r as in NIP-01.
|
||||
// Events are returned through the channel sub.Events.
|
||||
// The subscription is closed when context ctx is cancelled ("CLOSE" in NIP-01).
|
||||
//
|
||||
// Remember to cancel subscriptions, either by calling `.Unsub()` on them or ensuring their `context.Context` will be canceled at some point.
|
||||
// Failure to do that will result in a huge number of halted goroutines being created.
|
||||
func (r *Client) Subscribe(
|
||||
ctx context.Context, ff *filter.S, opts ...SubscriptionOption,
|
||||
) (*Subscription, error) {
|
||||
sub := r.PrepareSubscription(ctx, ff, opts...)
|
||||
|
||||
if r.Connection == nil {
|
||||
return nil, fmt.Errorf("not connected to %s", r.URL)
|
||||
}
|
||||
|
||||
if err := sub.Fire(); err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"couldn't subscribe to %v at %s: %w", ff, r.URL, err,
|
||||
)
|
||||
}
|
||||
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
// PrepareSubscription creates a subscription, but doesn't fire it.
|
||||
//
|
||||
// Remember to cancel subscriptions, either by calling `.Unsub()` on them or ensuring their `context.Context` will be canceled at some point.
|
||||
// Failure to do that will result in a huge number of halted goroutines being created.
|
||||
func (r *Client) PrepareSubscription(
|
||||
ctx context.Context, ff *filter.S, opts ...SubscriptionOption,
|
||||
) (sub *Subscription) {
|
||||
current := subscriptionIDCounter.Add(1)
|
||||
ctx, cancel := context.WithCancelCause(ctx)
|
||||
sub = &Subscription{
|
||||
Client: r,
|
||||
Context: ctx,
|
||||
cancel: cancel,
|
||||
counter: current,
|
||||
Events: make(event.C),
|
||||
EndOfStoredEvents: make(chan struct{}, 1),
|
||||
ClosedReason: make(chan string, 1),
|
||||
Filters: ff,
|
||||
match: ff.Match,
|
||||
}
|
||||
label := ""
|
||||
for _, opt := range opts {
|
||||
switch o := opt.(type) {
|
||||
case WithLabel:
|
||||
label = string(o)
|
||||
case WithCheckDuplicate:
|
||||
sub.checkDuplicate = o
|
||||
case WithCheckDuplicateReplaceable:
|
||||
sub.checkDuplicateReplaceable = o
|
||||
}
|
||||
}
|
||||
// subscription id computation
|
||||
buf := subIdPool.Get().([]byte)[:0]
|
||||
buf = strconv.AppendInt(buf, sub.counter, 10)
|
||||
buf = append(buf, ':')
|
||||
buf = append(buf, label...)
|
||||
defer subIdPool.Put(buf)
|
||||
sub.id = buf
|
||||
r.Subscriptions.Store(string(buf), sub)
|
||||
// start handling events, eose, unsub etc:
|
||||
go sub.start()
|
||||
return sub
|
||||
}
|
||||
|
||||
// QueryEvents subscribes to events matching the given filter and returns a channel of events.
|
||||
//
|
||||
// In most cases it's better to use SimplePool instead of this method.
|
||||
func (r *Client) QueryEvents(
|
||||
ctx context.Context, f *filter.F, opts ...SubscriptionOption,
|
||||
) (
|
||||
evc event.C, err error,
|
||||
) {
|
||||
var sub *Subscription
|
||||
if sub, err = r.Subscribe(ctx, filter.NewS(f), opts...); chk.E(err) {
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-sub.ClosedReason:
|
||||
case <-sub.EndOfStoredEvents:
|
||||
case <-ctx.Done():
|
||||
case <-r.Context().Done():
|
||||
}
|
||||
sub.unsub(errors.New("QueryEvents() ended"))
|
||||
return
|
||||
}
|
||||
}()
|
||||
evc = sub.Events
|
||||
return
|
||||
}
|
||||
|
||||
// QuerySync subscribes to events matching the given filter and returns a slice of events.
|
||||
// This method blocks until all events are received or the context is canceled.
|
||||
//
|
||||
// In most cases it's better to use SimplePool instead of this method.
|
||||
func (r *Client) QuerySync(
|
||||
ctx context.Context, ff *filter.F, opts ...SubscriptionOption,
|
||||
) (
|
||||
[]*event.E, error,
|
||||
) {
|
||||
if _, ok := ctx.Deadline(); !ok {
|
||||
// if no timeout is set, force it to 7 seconds
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeoutCause(
|
||||
ctx, 7*time.Second, errors.New("QuerySync() took too long"),
|
||||
)
|
||||
defer cancel()
|
||||
}
|
||||
var lim int
|
||||
if ff.Limit != nil {
|
||||
lim = int(*ff.Limit)
|
||||
}
|
||||
events := make([]*event.E, 0, max(lim, 250))
|
||||
ch, err := r.QueryEvents(ctx, ff, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for evt := range ch {
|
||||
events = append(events, evt)
|
||||
}
|
||||
|
||||
return events, nil
|
||||
}
|
||||
|
||||
// Close closes the relay connection.
|
||||
func (r *Client) Close() error {
|
||||
return r.close(errors.New("Close() called"))
|
||||
}
|
||||
|
||||
func (r *Client) close(reason error) error {
|
||||
r.closeMutex.Lock()
|
||||
defer r.closeMutex.Unlock()
|
||||
|
||||
if r.connectionContextCancel == nil {
|
||||
return fmt.Errorf("relay already closed")
|
||||
}
|
||||
r.connectionContextCancel(reason)
|
||||
r.connectionContextCancel = nil
|
||||
|
||||
if r.Connection == nil {
|
||||
return fmt.Errorf("relay not connected")
|
||||
}
|
||||
|
||||
err := r.Connection.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var subIdPool = sync.Pool{
|
||||
New: func() any { return make([]byte, 0, 15) },
|
||||
}
|
||||
277
pkg/protocol/ws/client_test.go
Normal file
277
pkg/protocol/ws/client_test.go
Normal file
@@ -0,0 +1,277 @@
|
||||
//go:build !js
|
||||
|
||||
package ws
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/net/websocket"
|
||||
"lol.mleku.dev/chk"
|
||||
"next.orly.dev/pkg/crypto/p256k"
|
||||
"next.orly.dev/pkg/encoders/event"
|
||||
"next.orly.dev/pkg/encoders/filter"
|
||||
"next.orly.dev/pkg/encoders/hex"
|
||||
"next.orly.dev/pkg/encoders/kind"
|
||||
"next.orly.dev/pkg/encoders/tag"
|
||||
"next.orly.dev/pkg/utils"
|
||||
"next.orly.dev/pkg/utils/normalize"
|
||||
)
|
||||
|
||||
func TestPublish(t *testing.T) {
|
||||
// test note to be sent over websocket
|
||||
priv, pub := makeKeyPair(t)
|
||||
textNote := &event.E{
|
||||
Kind: kind.TextNote.K,
|
||||
Content: []byte("hello"),
|
||||
CreatedAt: 1672068534, // random fixed timestamp
|
||||
Tags: tag.NewS(tag.NewFromAny("foo", "bar")),
|
||||
Pubkey: pub,
|
||||
}
|
||||
sign := &p256k.Signer{}
|
||||
var err error
|
||||
if err = sign.InitSec(priv); chk.E(err) {
|
||||
}
|
||||
err = textNote.Sign(sign)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// fake relay server
|
||||
var mu sync.Mutex // guards published to satisfy go test -race
|
||||
var published bool
|
||||
ws := newWebsocketServer(
|
||||
func(conn *websocket.Conn) {
|
||||
mu.Lock()
|
||||
published = true
|
||||
mu.Unlock()
|
||||
// verify the client sent exactly the textNote
|
||||
var raw []json.RawMessage
|
||||
err := websocket.JSON.Receive(conn, &raw)
|
||||
assert.NoError(t, err)
|
||||
|
||||
event := parseEventMessage(t, raw)
|
||||
assert.True(
|
||||
t, utils.FastEqual(event.Serialize(), textNote.Serialize()),
|
||||
)
|
||||
|
||||
// send back an ok nip-20 command result
|
||||
res := []any{"OK", hex.Enc(textNote.ID), true, ""}
|
||||
err = websocket.JSON.Send(conn, res)
|
||||
assert.NoError(t, err)
|
||||
},
|
||||
)
|
||||
defer ws.Close()
|
||||
|
||||
// connect a client and send the text note
|
||||
rl := mustRelayConnect(t, ws.URL)
|
||||
err = rl.Publish(context.Background(), textNote)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.True(t, published, "fake relay server saw no event")
|
||||
}
|
||||
|
||||
// func TestPublishBlocked(t *testing.T) {
|
||||
// // test note to be sent over websocket
|
||||
// textNote := &event.E{
|
||||
// Kind: kind.TextNote, Content: []byte("hello"),
|
||||
// CreatedAt: timestamp.Now(),
|
||||
// }
|
||||
// textNote.ID = textNote.GetIDBytes()
|
||||
//
|
||||
// // fake relay server
|
||||
// ws := newWebsocketServer(
|
||||
// func(conn *websocket.Conn) {
|
||||
// // discard received message; not interested
|
||||
// var raw []json.RawMessage
|
||||
// err := websocket.JSON.Receive(conn, &raw)
|
||||
// assert.NoError(t, err)
|
||||
//
|
||||
// // send back a not ok nip-20 command result
|
||||
// res := []any{"OK", textNote.IdString(), false, "blocked"}
|
||||
// websocket.JSON.Send(conn, res)
|
||||
// },
|
||||
// )
|
||||
// defer ws.Close()
|
||||
//
|
||||
// // connect a client and send a text note
|
||||
// rl := mustRelayConnect(t, ws.URL)
|
||||
// err := rl.Publish(context.Background(), textNote)
|
||||
// assert.Error(t, err)
|
||||
// }
|
||||
|
||||
func TestPublishWriteFailed(t *testing.T) {
|
||||
// test note to be sent over websocket
|
||||
textNote := &event.E{
|
||||
Kind: kind.TextNote.K,
|
||||
Content: []byte("hello"),
|
||||
CreatedAt: time.Now().Unix(),
|
||||
}
|
||||
textNote.ID = textNote.GetIDBytes()
|
||||
// fake relay server
|
||||
ws := newWebsocketServer(
|
||||
func(conn *websocket.Conn) {
|
||||
// reject receive - force send error
|
||||
conn.Close()
|
||||
},
|
||||
)
|
||||
defer ws.Close()
|
||||
// connect a client and send a text note
|
||||
rl := mustRelayConnect(t, ws.URL)
|
||||
// Force brief period of time so that publish always fails on closed socket.
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
err := rl.Publish(context.Background(), textNote)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestConnectContext(t *testing.T) {
|
||||
// fake relay server
|
||||
var mu sync.Mutex // guards connected to satisfy go test -race
|
||||
var connected bool
|
||||
ws := newWebsocketServer(
|
||||
func(conn *websocket.Conn) {
|
||||
mu.Lock()
|
||||
connected = true
|
||||
mu.Unlock()
|
||||
io.ReadAll(conn) // discard all input
|
||||
},
|
||||
)
|
||||
defer ws.Close()
|
||||
|
||||
// relay client
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
r, err := RelayConnect(ctx, ws.URL)
|
||||
assert.NoError(t, err)
|
||||
|
||||
defer r.Close()
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
assert.True(t, connected, "fake relay server saw no client connect")
|
||||
}
|
||||
|
||||
func TestConnectContextCanceled(t *testing.T) {
|
||||
// fake relay server
|
||||
ws := newWebsocketServer(discardingHandler)
|
||||
defer ws.Close()
|
||||
|
||||
// relay client
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel() // make ctx expired
|
||||
_, err := RelayConnect(ctx, ws.URL)
|
||||
assert.ErrorIs(t, err, context.Canceled)
|
||||
}
|
||||
|
||||
func TestConnectWithOrigin(t *testing.T) {
|
||||
// fake relay server
|
||||
// default handler requires origin golang.org/x/net/websocket
|
||||
ws := httptest.NewServer(websocket.Handler(discardingHandler))
|
||||
defer ws.Close()
|
||||
|
||||
// relay client
|
||||
r := NewRelay(
|
||||
context.Background(), string(normalize.URL(ws.URL)),
|
||||
WithRequestHeader(http.Header{"origin": {"https://example.com"}}),
|
||||
)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
err := r.Connect(ctx)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func discardingHandler(conn *websocket.Conn) {
|
||||
io.ReadAll(conn) // discard all input
|
||||
}
|
||||
|
||||
func newWebsocketServer(handler func(*websocket.Conn)) *httptest.Server {
|
||||
return httptest.NewServer(
|
||||
&websocket.Server{
|
||||
Handshake: anyOriginHandshake,
|
||||
Handler: handler,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// anyOriginHandshake is an alternative to default in golang.org/x/net/websocket
|
||||
// which checks for origin. nostr client sends no origin and it makes no difference
|
||||
// for the tests here anyway.
|
||||
var anyOriginHandshake = func(conf *websocket.Config, r *http.Request) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func makeKeyPair(t *testing.T) (sec, pub []byte) {
|
||||
t.Helper()
|
||||
sign := &p256k.Signer{}
|
||||
var err error
|
||||
if err = sign.Generate(); chk.E(err) {
|
||||
return
|
||||
}
|
||||
sec = sign.Sec()
|
||||
pub = sign.Pub()
|
||||
assert.NoError(t, err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func mustRelayConnect(t *testing.T, url string) *Client {
|
||||
t.Helper()
|
||||
|
||||
rl, err := RelayConnect(context.Background(), url)
|
||||
require.NoError(t, err)
|
||||
|
||||
return rl
|
||||
}
|
||||
|
||||
func parseEventMessage(t *testing.T, raw []json.RawMessage) *event.E {
|
||||
t.Helper()
|
||||
|
||||
assert.Condition(
|
||||
t, func() (success bool) {
|
||||
return len(raw) >= 2
|
||||
},
|
||||
)
|
||||
|
||||
var typ string
|
||||
err := json.Unmarshal(raw[0], &typ)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "EVENT", typ)
|
||||
|
||||
event := &event.E{}
|
||||
_, err = event.Unmarshal(raw[1])
|
||||
require.NoError(t, err)
|
||||
|
||||
return event
|
||||
}
|
||||
|
||||
func parseSubscriptionMessage(
|
||||
t *testing.T, raw []json.RawMessage,
|
||||
) (subid string, ff *filter.S) {
|
||||
t.Helper()
|
||||
|
||||
assert.Greater(t, len(raw), 3)
|
||||
|
||||
var typ string
|
||||
err := json.Unmarshal(raw[0], &typ)
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "REQ", typ)
|
||||
|
||||
var id string
|
||||
err = json.Unmarshal(raw[1], &id)
|
||||
assert.NoError(t, err)
|
||||
ff = &filter.S{}
|
||||
for _, b := range raw[2:] {
|
||||
var f *filter.F
|
||||
err = json.Unmarshal(b, &f)
|
||||
assert.NoError(t, err)
|
||||
*ff = append(*ff, f)
|
||||
}
|
||||
return id, ff
|
||||
}
|
||||
79
pkg/protocol/ws/connection.go
Normal file
79
pkg/protocol/ws/connection.go
Normal file
@@ -0,0 +1,79 @@
|
||||
package ws
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"lol.mleku.dev/errorf"
|
||||
"next.orly.dev/pkg/utils/units"
|
||||
|
||||
ws "github.com/coder/websocket"
|
||||
)
|
||||
|
||||
// Connection represents a websocket connection to a Nostr relay.
|
||||
type Connection struct {
|
||||
conn *ws.Conn
|
||||
}
|
||||
|
||||
// NewConnection creates a new websocket connection to a Nostr relay.
|
||||
func NewConnection(
|
||||
ctx context.Context, url string, reqHeader http.Header,
|
||||
tlsConfig *tls.Config,
|
||||
) (c *Connection, err error) {
|
||||
var conn *ws.Conn
|
||||
if conn, _, err = ws.Dial(
|
||||
ctx, url, getConnectionOptions(reqHeader, tlsConfig),
|
||||
); err != nil {
|
||||
return
|
||||
}
|
||||
conn.SetReadLimit(33 * units.Mb)
|
||||
return &Connection{
|
||||
conn: conn,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// WriteMessage writes arbitrary bytes to the websocket connection.
|
||||
func (c *Connection) WriteMessage(
|
||||
ctx context.Context, data []byte,
|
||||
) (err error) {
|
||||
if err = c.conn.Write(ctx, ws.MessageText, data); err != nil {
|
||||
err = errorf.E("failed to write message: %w", err)
|
||||
return
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReadMessage reads arbitrary bytes from the websocket connection into the provided buffer.
|
||||
func (c *Connection) ReadMessage(
|
||||
ctx context.Context, buf io.Writer,
|
||||
) (err error) {
|
||||
var reader io.Reader
|
||||
if _, reader, err = c.conn.Reader(ctx); err != nil {
|
||||
err = fmt.Errorf("failed to get reader: %w", err)
|
||||
return
|
||||
}
|
||||
if _, err = io.Copy(buf, reader); err != nil {
|
||||
err = fmt.Errorf("failed to read message: %w", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Close closes the websocket connection.
|
||||
func (c *Connection) Close() error {
|
||||
return c.conn.Close(ws.StatusNormalClosure, "")
|
||||
}
|
||||
|
||||
// Ping sends a ping message to the websocket connection.
|
||||
func (c *Connection) Ping(ctx context.Context) error {
|
||||
ctx, cancel := context.WithTimeoutCause(
|
||||
ctx, time.Millisecond*800, errors.New("ping took too long"),
|
||||
)
|
||||
defer cancel()
|
||||
return c.conn.Ping(ctx)
|
||||
}
|
||||
36
pkg/protocol/ws/connection_options.go
Normal file
36
pkg/protocol/ws/connection_options.go
Normal file
@@ -0,0 +1,36 @@
|
||||
//go:build !js
|
||||
|
||||
package ws
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"net/http"
|
||||
"net/textproto"
|
||||
|
||||
ws "github.com/coder/websocket"
|
||||
)
|
||||
|
||||
var defaultConnectionOptions = &ws.DialOptions{
|
||||
CompressionMode: ws.CompressionContextTakeover,
|
||||
HTTPHeader: http.Header{
|
||||
textproto.CanonicalMIMEHeaderKey("User-Agent"): {"github.com/nbd-wtf/go-nostr"},
|
||||
},
|
||||
}
|
||||
|
||||
func getConnectionOptions(
|
||||
requestHeader http.Header, tlsConfig *tls.Config,
|
||||
) *ws.DialOptions {
|
||||
if requestHeader == nil && tlsConfig == nil {
|
||||
return defaultConnectionOptions
|
||||
}
|
||||
|
||||
return &ws.DialOptions{
|
||||
HTTPHeader: requestHeader,
|
||||
CompressionMode: ws.CompressionContextTakeover,
|
||||
HTTPClient: &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: tlsConfig,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
189
pkg/protocol/ws/subscription.go
Normal file
189
pkg/protocol/ws/subscription.go
Normal file
@@ -0,0 +1,189 @@
|
||||
package ws
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"lol.mleku.dev/chk"
|
||||
"next.orly.dev/pkg/encoders/envelopes/closeenvelope"
|
||||
"next.orly.dev/pkg/encoders/envelopes/reqenvelope"
|
||||
"next.orly.dev/pkg/encoders/event"
|
||||
"next.orly.dev/pkg/encoders/filter"
|
||||
"next.orly.dev/pkg/encoders/timestamp"
|
||||
)
|
||||
|
||||
type ReplaceableKey struct {
|
||||
PubKey string
|
||||
D string
|
||||
}
|
||||
|
||||
// Subscription represents a subscription to a relay.
|
||||
type Subscription struct {
|
||||
counter int64
|
||||
id []byte
|
||||
|
||||
Client *Client
|
||||
Filters *filter.S
|
||||
|
||||
// the Events channel emits all EVENTs that come in a Subscription
|
||||
// will be closed when the subscription ends
|
||||
Events chan *event.E
|
||||
mu sync.Mutex
|
||||
|
||||
// the EndOfStoredEvents channel gets closed when an EOSE comes for that subscription
|
||||
EndOfStoredEvents chan struct{}
|
||||
|
||||
// the ClosedReason channel emits the reason when a CLOSED message is received
|
||||
ClosedReason chan string
|
||||
|
||||
// Context will be .Done() when the subscription ends
|
||||
Context context.Context
|
||||
|
||||
// if it is not nil, checkDuplicate will be called for every event received
|
||||
// if it returns true that event will not be processed further.
|
||||
checkDuplicate func(id string, relay string) bool
|
||||
|
||||
// if it is not nil, checkDuplicateReplaceable will be called for every event received
|
||||
// if it returns true that event will not be processed further.
|
||||
checkDuplicateReplaceable func(rk ReplaceableKey, ts *timestamp.T) bool
|
||||
|
||||
match func(*event.E) bool // this will be either Filters.Match or Filters.MatchIgnoringTimestampConstraints
|
||||
live atomic.Bool
|
||||
eosed atomic.Bool
|
||||
cancel context.CancelCauseFunc
|
||||
|
||||
// this keeps track of the events we've received before the EOSE that we must dispatch before
|
||||
// closing the EndOfStoredEvents channel
|
||||
storedwg sync.WaitGroup
|
||||
}
|
||||
|
||||
// SubscriptionOption is the type of the argument passed when instantiating relay connections.
|
||||
// Some examples are WithLabel.
|
||||
type SubscriptionOption interface {
|
||||
IsSubscriptionOption()
|
||||
}
|
||||
|
||||
// WithLabel puts a label on the subscription (it is prepended to the automatic id) that is sent to relays.
|
||||
type WithLabel string
|
||||
|
||||
func (_ WithLabel) IsSubscriptionOption() {}
|
||||
|
||||
// WithCheckDuplicate sets checkDuplicate on the subscription
|
||||
type WithCheckDuplicate func(id, relay string) bool
|
||||
|
||||
func (_ WithCheckDuplicate) IsSubscriptionOption() {}
|
||||
|
||||
// WithCheckDuplicateReplaceable sets checkDuplicateReplaceable on the subscription
|
||||
type WithCheckDuplicateReplaceable func(rk ReplaceableKey, ts *timestamp.T) bool
|
||||
|
||||
func (_ WithCheckDuplicateReplaceable) IsSubscriptionOption() {}
|
||||
|
||||
var (
|
||||
_ SubscriptionOption = (WithLabel)("")
|
||||
_ SubscriptionOption = (WithCheckDuplicate)(nil)
|
||||
_ SubscriptionOption = (WithCheckDuplicateReplaceable)(nil)
|
||||
)
|
||||
|
||||
func (sub *Subscription) start() {
|
||||
<-sub.Context.Done()
|
||||
// the subscription ends once the context is canceled (if not already)
|
||||
sub.unsub(errors.New("context done on start()")) // this will set sub.live to false
|
||||
// do this so we don't have the possibility of closing the Events channel and then trying to send to it
|
||||
sub.mu.Lock()
|
||||
close(sub.Events)
|
||||
sub.mu.Unlock()
|
||||
}
|
||||
|
||||
// GetID returns the subscription ID.
|
||||
func (sub *Subscription) GetID() string { return string(sub.id) }
|
||||
|
||||
func (sub *Subscription) dispatchEvent(evt *event.E) {
|
||||
added := false
|
||||
if !sub.eosed.Load() {
|
||||
sub.storedwg.Add(1)
|
||||
added = true
|
||||
}
|
||||
go func() {
|
||||
sub.mu.Lock()
|
||||
defer sub.mu.Unlock()
|
||||
|
||||
if sub.live.Load() {
|
||||
select {
|
||||
case sub.Events <- evt:
|
||||
case <-sub.Context.Done():
|
||||
}
|
||||
}
|
||||
if added {
|
||||
sub.storedwg.Done()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (sub *Subscription) dispatchEose() {
|
||||
if sub.eosed.CompareAndSwap(false, true) {
|
||||
sub.match = sub.Filters.MatchIgnoringTimestampConstraints
|
||||
go func() {
|
||||
sub.storedwg.Wait()
|
||||
sub.EndOfStoredEvents <- struct{}{}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// handleClosed handles the CLOSED message from a relay.
|
||||
func (sub *Subscription) handleClosed(reason string) {
|
||||
go func() {
|
||||
sub.ClosedReason <- reason
|
||||
sub.live.Store(false) // set this so we don't send an unnecessary CLOSE to the relay
|
||||
sub.unsub(fmt.Errorf("CLOSED received: %s", reason))
|
||||
}()
|
||||
}
|
||||
|
||||
// Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01.
|
||||
// Unsub() also closes the channel sub.Events and makes a new one.
|
||||
func (sub *Subscription) Unsub() {
|
||||
sub.unsub(errors.New("Unsub() called"))
|
||||
}
|
||||
|
||||
// unsub is the internal implementation of Unsub.
|
||||
func (sub *Subscription) unsub(err error) {
|
||||
// cancel the context (if it's not canceled already)
|
||||
sub.cancel(err)
|
||||
// mark subscription as closed and send a CLOSE to the relay (naïve sync.Once implementation)
|
||||
if sub.live.CompareAndSwap(true, false) {
|
||||
sub.Close()
|
||||
}
|
||||
// remove subscription from our map
|
||||
sub.Client.Subscriptions.Delete(string(sub.id))
|
||||
}
|
||||
|
||||
// Close just sends a CLOSE message. You probably want Unsub() instead.
|
||||
func (sub *Subscription) Close() {
|
||||
if sub.Client.IsConnected() {
|
||||
closeMsg := closeenvelope.NewFrom(sub.id)
|
||||
closeb := closeMsg.Marshal(nil)
|
||||
<-sub.Client.Write(closeb)
|
||||
}
|
||||
}
|
||||
|
||||
// Sub sets sub.Filters and then calls sub.Fire(ctx).
|
||||
// The subscription will be closed if the context expires.
|
||||
func (sub *Subscription) Sub(_ context.Context, ff *filter.S) {
|
||||
sub.Filters = ff
|
||||
chk.E(sub.Fire())
|
||||
}
|
||||
|
||||
// Fire sends the "REQ" command to the relay.
|
||||
func (sub *Subscription) Fire() (err error) {
|
||||
var reqb []byte
|
||||
reqb = reqenvelope.NewFrom(sub.id, sub.Filters).Marshal(nil)
|
||||
sub.live.Store(true)
|
||||
if err = <-sub.Client.Write(reqb); err != nil {
|
||||
err = fmt.Errorf("failed to write: %w", err)
|
||||
sub.cancel(err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
104
pkg/protocol/ws/subscription_test.go
Normal file
104
pkg/protocol/ws/subscription_test.go
Normal file
@@ -0,0 +1,104 @@
|
||||
package ws
|
||||
|
||||
// const RELAY = "wss://nos.lol"
|
||||
|
||||
// // test if we can fetch a couple of random events
|
||||
// func TestSubscribeBasic(t *testing.T) {
|
||||
// rl := mustRelayConnect(t, RELAY)
|
||||
// defer rl.Close()
|
||||
//
|
||||
// sub, err := rl.Subscribe(
|
||||
// context.Background(), filters.New(
|
||||
// &filter.F{
|
||||
// Kinds: &kinds.T{K: []*kind.T{kind.TextNote}},
|
||||
// Limit: values.ToUintPointer(2),
|
||||
// },
|
||||
// ),
|
||||
// )
|
||||
// assert.NoError(t, err)
|
||||
// timeout := time.After(5 * time.Second)
|
||||
// n := 0
|
||||
// for {
|
||||
// select {
|
||||
// case event := <-sub.Events:
|
||||
// assert.NotNil(t, event)
|
||||
// n++
|
||||
// case <-sub.EndOfStoredEvents:
|
||||
// assert.Equal(t, 2, n)
|
||||
// sub.Unsub()
|
||||
// return
|
||||
// case <-rl.Context().Done():
|
||||
// t.Fatalf("connection closed: %v", rl.Context().Err())
|
||||
// case <-timeout:
|
||||
// t.Fatalf("timeout")
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // test if we can do multiple nested subscriptions
|
||||
// func TestNestedSubscriptions(t *testing.T) {
|
||||
// rl := mustRelayConnect(t, RELAY)
|
||||
// defer rl.Close()
|
||||
//
|
||||
// n := atomic.Uint32{}
|
||||
//
|
||||
// // fetch 2 replies to a note
|
||||
// sub, err := rl.Subscribe(
|
||||
// context.Background(), filters.New(
|
||||
// &filter.F{
|
||||
// Kinds: kinds.New(kind.TextNote),
|
||||
// Tags: tags.New(
|
||||
// tag.New(
|
||||
// "e",
|
||||
// "0e34a74f8547e3b95d52a2543719b109fd0312aba144e2ef95cba043f42fe8c5",
|
||||
// ),
|
||||
// ),
|
||||
// Limit: values.ToUintPointer(3),
|
||||
// },
|
||||
// ),
|
||||
// )
|
||||
// assert.NoError(t, err)
|
||||
//
|
||||
// for {
|
||||
// select {
|
||||
// case ev := <-sub.Events:
|
||||
// // now fetch author of this
|
||||
// sub, err := rl.Subscribe(
|
||||
// context.Background(), filters.New(
|
||||
// &filter.F{
|
||||
// Kinds: kinds.New(kind.ProfileMetadata),
|
||||
// Authors: tag.New(ev.PubKeyString()),
|
||||
// Limit: values.ToUintPointer(1),
|
||||
// },
|
||||
// ),
|
||||
// )
|
||||
// assert.NoError(t, err)
|
||||
//
|
||||
// for {
|
||||
// select {
|
||||
// case <-sub.Events:
|
||||
// // do another subscription here in "sync" mode, just so
|
||||
// // we're sure things aren't blocking
|
||||
// rl.QuerySync(
|
||||
// context.Background(),
|
||||
// &filter.F{Limit: values.ToUintPointer(1)},
|
||||
// )
|
||||
// n.Add(1)
|
||||
// if n.Load() == 3 {
|
||||
// // if we get here, it means the test passed
|
||||
// return
|
||||
// }
|
||||
// case <-sub.Context.Done():
|
||||
// case <-sub.EndOfStoredEvents:
|
||||
// sub.Unsub()
|
||||
// }
|
||||
// }
|
||||
// case <-sub.EndOfStoredEvents:
|
||||
// sub.Unsub()
|
||||
// return
|
||||
// case <-sub.Context.Done():
|
||||
// t.Fatalf("connection closed: %v", rl.Context().Err())
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
Reference in New Issue
Block a user