diff --git a/pkg/app/relay/spider-fetch.go b/pkg/app/relay/spider-fetch.go index da2d4a0..31da477 100644 --- a/pkg/app/relay/spider-fetch.go +++ b/pkg/app/relay/spider-fetch.go @@ -140,7 +140,7 @@ func (s *Server) SpiderFetch( default: } var evss event.S - var cli *ws.Relay + var cli *ws.Client if cli, err = ws.RelayConnect( context.Bg(), seed, ); chk.E(err) { diff --git a/pkg/encoders/filter/filter.go b/pkg/encoders/filter/filter.go index 8a85ef2..f7c75c3 100644 --- a/pkg/encoders/filter/filter.go +++ b/pkg/encoders/filter/filter.go @@ -23,6 +23,7 @@ import ( "orly.dev/pkg/encoders/timestamp" "orly.dev/pkg/utils/chk" "orly.dev/pkg/utils/errorf" + "orly.dev/pkg/utils/log" "orly.dev/pkg/utils/pointers" "lukechampine.com/frand" @@ -182,12 +183,12 @@ func (f *F) Marshal(dst []byte) (b []byte) { dst = append(dst, '[') for i, value := range values { dst = append(dst, '"') - if tKey[1] == 'e' || tKey[1] == 'p' { - // event and pubkey tags are binary 32 bytes - dst = hex.EncAppend(dst, value) - } else { - dst = append(dst, value...) - } + // if tKey[1] == 'e' || tKey[1] == 'p' { + // // event and pubkey tags are binary 32 bytes + // dst = hex.EncAppend(dst, value) + // } else { + dst = append(dst, value...) + // } dst = append(dst, '"') if i < len(values)-1 { dst = append(dst, ',') @@ -301,29 +302,29 @@ func (f *F) Unmarshal(b []byte) (r []byte, err error) { } k := make([]byte, len(key)) copy(k, key) - switch key[1] { - case 'e', 'p': - // the tags must all be 64 character hexadecimal - var ff [][]byte - if ff, r, err = text2.UnmarshalHexArray( - r, - sha256.Size, - ); chk.E(err) { - return - } - ff = append([][]byte{k}, ff...) - f.Tags = f.Tags.AppendTags(tag.FromBytesSlice(ff...)) - // f.Tags.F = append(f.Tags.F, tag.New(ff...)) - default: - // other types of tags can be anything - var ff [][]byte - if ff, r, err = text2.UnmarshalStringArray(r); chk.E(err) { - return - } - ff = append([][]byte{k}, ff...) - f.Tags = f.Tags.AppendTags(tag.FromBytesSlice(ff...)) - // f.Tags.F = append(f.Tags.F, tag.New(ff...)) + // switch key[1] { + // case 'e', 'p': + // // the tags must all be 64 character hexadecimal + // var ff [][]byte + // if ff, r, err = text2.UnmarshalHexArray( + // r, + // sha256.Size, + // ); chk.E(err) { + // return + // } + // ff = append([][]byte{k}, ff...) + // f.Tags = f.Tags.AppendTags(tag.FromBytesSlice(ff...)) + // // f.Tags.F = append(f.Tags.F, tag.New(ff...)) + // default: + // other types of tags can be anything + var ff [][]byte + if ff, r, err = text2.UnmarshalStringArray(r); chk.E(err) { + return } + ff = append([][]byte{k}, ff...) + f.Tags = f.Tags.AppendTags(tag.FromBytesSlice(ff...)) + // f.Tags.F = append(f.Tags.F, tag.New(ff...)) + // } state = betweenKV case IDs[0]: if len(key) < len(IDs) { @@ -445,32 +446,35 @@ invalid: // determines if the event matches the filter, ignoring timestamp constraints.. func (f *F) MatchesIgnoringTimestampConstraints(ev *event.E) bool { if ev == nil { - // log.F.ToSliceOfBytes("nil event") + log.I.F("nil event") return false } if f.Ids.Len() > 0 && !f.Ids.Contains(ev.ID) { - // log.F.ToSliceOfBytes("no ids in filter match event\nEVENT %s\nFILTER %s", ev.ToObject().String(), f.ToObject().String()) + log.I.F("no ids in filter match event") return false } if f.Kinds.Len() > 0 && !f.Kinds.Contains(ev.Kind) { - // log.F.ToSliceOfBytes("no matching kinds in filter\nEVENT %s\nFILTER %s", ev.ToObject().String(), f.ToObject().String()) + log.I.F( + "no matching kinds in filter", + ) return false } if f.Authors.Len() > 0 && !f.Authors.Contains(ev.Pubkey) { - // log.F.ToSliceOfBytes("no matching authors in filter\nEVENT %s\nFILTER %s", ev.ToObject().String(), f.ToObject().String()) - return false - } - if f.Tags.Len() > 0 && !ev.Tags.Intersects(f.Tags) { + log.I.F("no matching authors in filter") return false } + // if f.Tags.Len() > 0 && !ev.Tags.Intersects(f.Tags) { + // return false + // } if f.Tags.Len() > 0 { for _, v := range f.Tags.ToSliceOfTags() { tvs := v.ToSliceOfBytes() if !ev.Tags.ContainsAny(v.FilterKey(), tag.New(tvs...)) { + log.I.F("no matching tags in filter") return false } } - return false + // return false } return true } @@ -481,11 +485,11 @@ func (f *F) Matches(ev *event.E) (match bool) { return } if f.Since.Int() != 0 && ev.CreatedAt.I64() < f.Since.I64() { - // log.F.ToSliceOfBytes("event is older than since\nEVENT %s\nFILTER %s", ev.ToObject().String(), f.ToObject().String()) + log.I.F("event is older than since") return } if f.Until.Int() != 0 && ev.CreatedAt.I64() > f.Until.I64() { - // log.F.ToSliceOfBytes("event is newer than until\nEVENT %s\nFILTER %s", ev.ToObject().String(), f.ToObject().String()) + log.I.F("event is newer than until") return } return true diff --git a/pkg/encoders/tags/tags.go b/pkg/encoders/tags/tags.go index a85644d..dcdf5d6 100644 --- a/pkg/encoders/tags/tags.go +++ b/pkg/encoders/tags/tags.go @@ -309,7 +309,7 @@ func (t *T) ContainsAny(tagName []byte, values *tag.T) bool { continue } for _, candidate := range values.ToSliceOfBytes() { - if bytes.Equal(v.Value(), candidate) { + if bytes.HasPrefix(v.Value(), candidate) { return true } } diff --git a/pkg/protocol/nwc/client.go b/pkg/protocol/nwc/client.go index c1681b3..9c723c0 100644 --- a/pkg/protocol/nwc/client.go +++ b/pkg/protocol/nwc/client.go @@ -20,7 +20,6 @@ import ( "orly.dev/pkg/protocol/ws" "orly.dev/pkg/utils/chk" "orly.dev/pkg/utils/context" - "orly.dev/pkg/utils/log" "orly.dev/pkg/utils/values" ) @@ -115,10 +114,6 @@ func (cl *Client) RPC( if err = ev.Sign(cl.clientSecretKey); chk.E(err) { return } - var ok bool - if ok, err = ev.Verify(); chk.E(err) { - } - log.I.F("verify: %v", ok) var rc *ws.Client if rc, err = ws.RelayConnect(c, cl.relay); chk.E(err) { return @@ -129,9 +124,9 @@ func (cl *Client) RPC( c, filters.New( &filter.F{ Limit: values.ToUintPointer(1), - Kinds: kinds.New(kind.WalletRequest), - Authors: tag.New(cl.clientSecretKey.Pub()), - Tags: tags.New(tag.New([]byte("#e"), ev.ID)), + Kinds: kinds.New(kind.WalletResponse), + Authors: tag.New(cl.walletPublicKey), + Tags: tags.New(tag.New("#e", hex.Enc(ev.ID))), }, ), ); chk.E(err) { @@ -141,7 +136,6 @@ func (cl *Client) RPC( if err = rc.Publish(context.Bg(), ev); chk.E(err) { return } - log.I.F("published event %s", ev.Marshal(nil)) select { case <-c.Done(): err = fmt.Errorf("context canceled waiting for response") diff --git a/pkg/protocol/ws/client.go b/pkg/protocol/ws/client.go index a100686..b4efc81 100644 --- a/pkg/protocol/ws/client.go +++ b/pkg/protocol/ws/client.go @@ -23,6 +23,7 @@ import ( "orly.dev/pkg/encoders/event" "orly.dev/pkg/encoders/filter" "orly.dev/pkg/encoders/filters" + "orly.dev/pkg/encoders/hex" "orly.dev/pkg/encoders/kind" "orly.dev/pkg/encoders/tag" "orly.dev/pkg/encoders/tags" @@ -37,7 +38,7 @@ import ( var subscriptionIDCounter atomic.Int64 // Relay represents a connection to a Nostr relay. -type Relay struct { +type Client struct { closeMutex sync.Mutex URL string @@ -68,9 +69,9 @@ type writeRequest struct { } // NewRelay returns a new relay. It takes a context that, when canceled, will close the relay connection. -func NewRelay(ctx context.T, url string, opts ...RelayOption) *Relay { +func NewRelay(ctx context.T, url string, opts ...RelayOption) *Client { ctx, cancel := context.Cause(ctx) - r := &Relay{ + r := &Client{ URL: string(normalize.URL(url)), connectionContext: ctx, connectionContextCancel: cancel, @@ -97,7 +98,7 @@ func NewRelay(ctx context.T, url string, opts ...RelayOption) *Relay { // The ongoing relay connection uses a background context. To close the connection, call r.Close(). // If you need fine grained long-term connection contexts, use NewRelay() instead. func RelayConnect(ctx context.T, url string, opts ...RelayOption) ( - *Relay, error, + *Client, error, ) { r := NewRelay(context.Bg(), url, opts...) err := r.Connect(ctx) @@ -106,7 +107,7 @@ func RelayConnect(ctx context.T, url string, opts ...RelayOption) ( // RelayOption is the type of the argument passed when instantiating relay connections. type RelayOption interface { - ApplyRelayOption(*Relay) + ApplyRelayOption(*Client) } var ( @@ -119,7 +120,7 @@ var ( // when not given, defaults to logging the notices. type WithNoticeHandler func(notice string) -func (nh WithNoticeHandler) ApplyRelayOption(r *Relay) { +func (nh WithNoticeHandler) ApplyRelayOption(r *Client) { r.noticeHandler = nh } @@ -127,28 +128,28 @@ func (nh WithNoticeHandler) ApplyRelayOption(r *Relay) { // parsed as a standard envelope. type WithCustomHandler func(data string) -func (ch WithCustomHandler) ApplyRelayOption(r *Relay) { +func (ch WithCustomHandler) ApplyRelayOption(r *Client) { r.customHandler = ch } // WithRequestHeader sets the HTTP request header of the websocket preflight request. type WithRequestHeader http.Header -func (ch WithRequestHeader) ApplyRelayOption(r *Relay) { +func (ch WithRequestHeader) ApplyRelayOption(r *Client) { r.requestHeader = http.Header(ch) } // String just returns the relay URL. -func (r *Relay) String() string { +func (r *Client) String() string { return r.URL } // Context retrieves the context that is associated with this relay connection. // It will be closed when the relay is disconnected. -func (r *Relay) Context() context.T { return r.connectionContext } +func (r *Client) Context() context.T { return r.connectionContext } // IsConnected returns true if the connection to this relay seems to be active. -func (r *Relay) IsConnected() bool { return r.connectionContext.Err() == nil } +func (r *Client) IsConnected() bool { return r.connectionContext.Err() == nil } // Connect tries to establish a websocket connection to r.URL. // If the context expires before the connection is complete, an error is returned. @@ -157,7 +158,7 @@ func (r *Relay) IsConnected() bool { return r.connectionContext.Err() == nil } // // The given context here is only used during the connection phase. The long-living // relay connection will be based on the context given to NewRelay(). -func (r *Relay) Connect(ctx context.T) error { +func (r *Client) Connect(ctx context.T) error { return r.ConnectWithTLS(ctx, nil) } @@ -171,7 +172,7 @@ func subIdToSerial(subId string) int64 { } // ConnectWithTLS is like Connect(), but takes a special tls.Config if you need that. -func (r *Relay) ConnectWithTLS( +func (r *Client) ConnectWithTLS( ctx context.T, tlsConfig *tls.Config, ) (err error) { if r.connectionContext == nil || r.Subscriptions == nil { @@ -276,8 +277,8 @@ func (r *Relay) ConnectWithTLS( r.challenge = env.Challenge case eventenvelope.L: var env *eventenvelope.Result - var ev *event.E - if env, err = eventenvelope.NewResultWith(rem, ev); chk.E(err) { + env = eventenvelope.NewResult() + if _, err = env.Unmarshal(rem); chk.E(err) { continue } sub, ok := r.Subscriptions.Load(subIdToSerial(env.Subscription.String())) @@ -290,8 +291,8 @@ func (r *Relay) ConnectWithTLS( } if !sub.Filters.Match(env.Event) { log.I.F( - "{%s} filter does not match: %v ~ %v\n", r.URL, - sub.Filters, env.Event, + "{%s} filter does not match: %v ~ %s\n", r.URL, + sub.Filters, env.Event.Marshal(nil), ) continue } @@ -362,7 +363,7 @@ func (r *Relay) ConnectWithTLS( } // Write queues an arbitrary message to be sent to the relay. -func (r *Relay) Write(msg []byte) <-chan error { +func (r *Client) Write(msg []byte) <-chan error { ch := make(chan error) select { case r.writeQueue <- writeRequest{msg: msg, answer: ch}: @@ -374,13 +375,13 @@ func (r *Relay) Write(msg []byte) <-chan error { // Publish sends an "EVENT" command to the relay r as in NIP-01 and waits for an // OK response. -func (r *Relay) Publish(ctx context.T, ev *event.E) error { +func (r *Client) Publish(ctx context.T, ev *event.E) error { return r.publish(ctx, ev.ID, ev) } // Auth sends an "AUTH" command client->relay as in NIP-42 and waits for an OK // response. -func (r *Relay) Auth( +func (r *Client) Auth( ctx context.T, sign signer.I, ) (err error) { authEvent := &event.E{ @@ -398,8 +399,8 @@ func (r *Relay) Auth( return r.publish(ctx, authEvent.ID, authEvent) } -func (r *Relay) publish( - ctx context.T, id []byte, env *event.E, +func (r *Client) publish( + ctx context.T, id []byte, ev *event.E, ) error { var err error var cancel context.F @@ -418,7 +419,7 @@ func (r *Relay) publish( // listen for an OK callback gotOk := false - ids := string(id) + ids := hex.Enc(id) r.okCallbacks.Store( ids, func(ok bool, reason string) { gotOk = true @@ -430,7 +431,8 @@ func (r *Relay) publish( ) defer r.okCallbacks.Delete(ids) // publish event - envb := env.Marshal(nil) + envb := eventenvelope.NewSubmissionWith(ev).Marshal(nil) + // envb := ev.Marshal(nil) if err = <-r.Write(envb); err != nil { return err } @@ -457,7 +459,7 @@ func (r *Relay) publish( // Remember to cancel subscriptions, either by calling `.Unsub()` on them or // ensuring their `context.T` will be canceled at some point. Failure to // do that will result in a huge number of halted goroutines being created. -func (r *Relay) Subscribe( +func (r *Client) Subscribe( ctx context.T, ff *filters.T, opts ...SubscriptionOption, ) (sub *Subscription, err error) { sub = r.PrepareSubscription(ctx, ff, opts...) @@ -477,7 +479,7 @@ func (r *Relay) Subscribe( // // Remember to cancel subscriptions, either by calling `.Unsub()` on them or ensuring their `context.T` will be canceled at some point. // Failure to do that will result in a huge number of halted goroutines being created. -func (r *Relay) PrepareSubscription( +func (r *Client) PrepareSubscription( ctx context.T, ff *filters.T, opts ...SubscriptionOption, ) *Subscription { current := subscriptionIDCounter.Add(1) @@ -521,7 +523,7 @@ func (r *Relay) PrepareSubscription( // QueryEvents subscribes to events matching the given filter and returns a channel of events. // // In most cases it's better to use Pool instead of this method. -func (r *Relay) QueryEvents(ctx context.T, f *filter.F) ( +func (r *Client) QueryEvents(ctx context.T, f *filter.F) ( evc event.C, err error, ) { var sub *Subscription @@ -551,7 +553,7 @@ func (r *Relay) QueryEvents(ctx context.T, f *filter.F) ( // limit is exceeded. So this method will return an error if the limit is nil. // If the query blocks, the caller needs to cancel the context to prevent the // thread stalling. -func (r *Relay) QuerySync(ctx context.T, f *filter.F) ( +func (r *Client) QuerySync(ctx context.T, f *filter.F) ( evs event.S, err error, ) { if f.Limit == nil { @@ -631,11 +633,11 @@ func (r *Relay) QuerySync(ctx context.T, f *filter.F) ( // } // Close closes the relay connection. -func (r *Relay) Close() error { +func (r *Client) Close() error { return r.close(errors.New("relay connection closed")) } -func (r *Relay) close(reason error) error { +func (r *Client) close(reason error) error { r.closeMutex.Lock() defer r.closeMutex.Unlock() diff --git a/pkg/protocol/ws/client_test.go b/pkg/protocol/ws/client_test.go index a25d563..d6a3c09 100644 --- a/pkg/protocol/ws/client_test.go +++ b/pkg/protocol/ws/client_test.go @@ -275,7 +275,7 @@ var anyOriginHandshake = func( return nil } -func mustRelayConnect(url string) (client *Relay) { +func mustRelayConnect(url string) (client *Client) { rl, err := RelayConnect(context.Background(), url) if err != nil { panic(err.Error()) diff --git a/pkg/protocol/ws/pool.go b/pkg/protocol/ws/pool.go index 247cb94..c06328f 100644 --- a/pkg/protocol/ws/pool.go +++ b/pkg/protocol/ws/pool.go @@ -30,7 +30,7 @@ const ( // Pool manages connections to multiple relays, ensures they are reopened when necessary and not duplicated. type Pool struct { - Relays *xsync.MapOf[string, *Relay] + Relays *xsync.MapOf[string, *Client] Context context.T authHandler func() signer.I @@ -55,7 +55,7 @@ type DirectedFilter struct { // RelayEvent represents an event received from a specific relay. type RelayEvent struct { *event.E - Relay *Relay + Relay *Client } func (ie RelayEvent) String() string { @@ -73,7 +73,7 @@ type PoolOption interface { func NewPool(c context.T, opts ...PoolOption) (pool *Pool) { ctx, cancel := context.Cause(c) pool = &Pool{ - Relays: xsync.NewMapOf[string, *Relay](), + Relays: xsync.NewMapOf[string, *Client](), Context: ctx, cancel: cancel, } @@ -189,7 +189,7 @@ func namedLock[V ~[]byte | ~string](name V) (unlock func()) { // EnsureRelay ensures that a relay connection exists and is active. // If the relay is not connected, it attempts to connect. -func (p *Pool) EnsureRelay(url string) (*Relay, error) { +func (p *Pool) EnsureRelay(url string) (*Client, error) { nm := string(normalize.URL(url)) defer namedLock(nm)() relay, ok := p.Relays.Load(nm) @@ -236,7 +236,7 @@ func (p *Pool) EnsureRelay(url string) (*Relay, error) { type PublishResult struct { Error error RelayURL string - Relay *Relay + Relay *Client } // todo: this didn't used to be in this package... probably don't want to add it diff --git a/pkg/protocol/ws/pool_test.go b/pkg/protocol/ws/pool_test.go index 3bcb004..84e7afa 100644 --- a/pkg/protocol/ws/pool_test.go +++ b/pkg/protocol/ws/pool_test.go @@ -111,7 +111,7 @@ func TestRelayEventString(t *testing.T) { CreatedAt: timestamp.Now(), } - client := &Relay{URL: "wss://test.relay"} + client := &Client{URL: "wss://test.relay"} ie := RelayEvent{E: testEvent, Relay: client} str := ie.String() diff --git a/pkg/protocol/ws/subscription.go b/pkg/protocol/ws/subscription.go index 971e5a0..eab786f 100644 --- a/pkg/protocol/ws/subscription.go +++ b/pkg/protocol/ws/subscription.go @@ -20,7 +20,7 @@ type Subscription struct { counter int64 id string - Relay *Relay + Relay *Client Filters *filters.T // // for this to be treated as a COUNT and not a REQ this must be set diff --git a/pkg/version/version b/pkg/version/version index 215872e..48080b4 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.4.15 \ No newline at end of file +v0.5.0 \ No newline at end of file