From c91a28352021d60dfe224b2678cf4273abe9ba40 Mon Sep 17 00:00:00 2001 From: mleku Date: Mon, 28 Jul 2025 05:05:56 +0100 Subject: [PATCH] implement fully working listener/subscribe/unsubscribe publisher --- pkg/app/config/config.go | 12 + pkg/app/relay/config.go | 10 + pkg/app/relay/publish/publisher.go | 7 +- pkg/app/relay/server.go | 19 +- pkg/app/relay/spider-fetch.go | 2 +- pkg/interfaces/publisher/publisher.go | 9 +- pkg/interfaces/server/server.go | 2 + pkg/protocol/openapi/listen.go | 123 +++++++ pkg/protocol/openapi/publisher.go | 317 +++++++++++++++++ pkg/protocol/openapi/publisher_test.go | 431 ++++++++++++++++++++++++ pkg/protocol/openapi/subscribe.go | 82 +++++ pkg/protocol/openapi/unsubscribe.go | 77 +++++ pkg/protocol/socketapi/handleEvent.go | 2 + pkg/protocol/socketapi/handleMessage.go | 3 +- pkg/protocol/socketapi/handleReq.go | 3 +- pkg/protocol/socketapi/publisher.go | 21 +- pkg/protocol/socketapi/socketapi.go | 16 + pkg/version/version | 2 +- 18 files changed, 1118 insertions(+), 20 deletions(-) create mode 100644 pkg/app/relay/config.go create mode 100644 pkg/protocol/openapi/listen.go create mode 100644 pkg/protocol/openapi/publisher.go create mode 100644 pkg/protocol/openapi/publisher_test.go create mode 100644 pkg/protocol/openapi/subscribe.go create mode 100644 pkg/protocol/openapi/unsubscribe.go diff --git a/pkg/app/config/config.go b/pkg/app/config/config.go index 78306f6..66aaee7 100644 --- a/pkg/app/config/config.go +++ b/pkg/app/config/config.go @@ -40,6 +40,7 @@ type C struct { SpiderSeeds []string `env:"ORLY_SPIDER_SEEDS" usage:"seeds to use for the spider (relays that are looked up initially to find owner relay lists) (comma separated)" default:"wss://relay.nostr.band/,wss://relay.damus.io/,wss://nostr.wine/,wss://nostr.land/,wss://theforest.nostr1.com/"` Owners []string `env:"ORLY_OWNERS" usage:"list of users whose follow lists designate whitelisted users who can publish events, and who can read if public readable is false (comma separated)"` Private bool `env:"ORLY_PRIVATE" usage:"do not spider for user metadata because the relay is private and this would leak relay memberships" default:"false"` + Whitelist []string `env:"ORLY_WHITELIST" usage:"only allow connections from this list of IP addresses"` } // New creates and initializes a new configuration object for the relay @@ -90,6 +91,17 @@ func New() (cfg *C, err error) { lol.SetLogLevel(cfg.LogLevel) log.I.F("loaded configuration from %s", envPath) } + // if spider seeds has no elements, there still is a single entry with an + // empty string; and also if any of the fields are empty strings, they need + // to be removed. + var seeds []string + for _, u := range cfg.SpiderSeeds { + if u == "" { + continue + } + seeds = append(seeds, u) + } + cfg.SpiderSeeds = seeds return } diff --git a/pkg/app/relay/config.go b/pkg/app/relay/config.go new file mode 100644 index 0000000..7721838 --- /dev/null +++ b/pkg/app/relay/config.go @@ -0,0 +1,10 @@ +package relay + +import ( + "orly.dev/pkg/app/config" +) + +func (s *Server) Config() (c *config.C) { + c = s.C + return +} diff --git a/pkg/app/relay/publish/publisher.go b/pkg/app/relay/publish/publisher.go index c95485e..26d3fa8 100644 --- a/pkg/app/relay/publish/publisher.go +++ b/pkg/app/relay/publish/publisher.go @@ -6,6 +6,8 @@ package publish import ( "orly.dev/pkg/encoders/event" "orly.dev/pkg/interfaces/publisher" + "orly.dev/pkg/interfaces/typer" + "orly.dev/pkg/utils/log" ) // S is the control structure for the subscription management scheme. @@ -24,13 +26,14 @@ var _ publisher.I = &S{} func (s *S) Type() string { return "publish" } func (s *S) Deliver(ev *event.E) { + log.I.F("number of publishers: %d", len(s.Publishers)) for _, p := range s.Publishers { + log.I.F("delivering to subscriber type %s", p.Type()) p.Deliver(ev) - return } } -func (s *S) Receive(msg publisher.Message) { +func (s *S) Receive(msg typer.T) { t := msg.Type() for _, p := range s.Publishers { if p.Type() == t { diff --git a/pkg/app/relay/server.go b/pkg/app/relay/server.go index afe9235..acb9d53 100644 --- a/pkg/app/relay/server.go +++ b/pkg/app/relay/server.go @@ -6,8 +6,10 @@ import ( "fmt" "net" "net/http" + "orly.dev/pkg/protocol/openapi" "orly.dev/pkg/protocol/socketapi" "strconv" + "strings" "time" "orly.dev/pkg/app/config" @@ -101,7 +103,7 @@ func NewServer( C: sp.C, Lists: new(Lists), } - s.listeners = publish.New(socketapi.New(s)) + s.listeners = publish.New(socketapi.New(s), openapi.NewPublisher(s)) go func() { if err := s.relay.Init(); chk.E(err) { s.Shutdown() @@ -133,6 +135,21 @@ func NewServer( // // - For all other paths, delegates to the internal mux's ServeHTTP method. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + c := s.Config() + remote := helpers.GetRemoteFromReq(r) + var whitelisted bool + if len(c.Whitelist) > 0 { + for _, addr := range c.Whitelist { + if strings.HasPrefix(remote, addr) { + whitelisted = true + } + } + } else { + whitelisted = true + } + if !whitelisted { + return + } // standard nostr protocol only governs the "root" path of the relay and // websockets if r.URL.Path == "/" { diff --git a/pkg/app/relay/spider-fetch.go b/pkg/app/relay/spider-fetch.go index 5811fb1..aa2c3f1 100644 --- a/pkg/app/relay/spider-fetch.go +++ b/pkg/app/relay/spider-fetch.go @@ -100,7 +100,7 @@ func (s *Server) SpiderFetch( log.I.F("%d events found of type %s", len(pkKindMap), kindsList) - if !noFetch { + if !noFetch && len(s.C.SpiderSeeds) > 0 { // we need to search the spider seeds. // Break up pubkeys into batches of 128 for i := 0; i < len(pubkeys); i += 128 { diff --git a/pkg/interfaces/publisher/publisher.go b/pkg/interfaces/publisher/publisher.go index 8e3d1f2..f321de2 100644 --- a/pkg/interfaces/publisher/publisher.go +++ b/pkg/interfaces/publisher/publisher.go @@ -2,16 +2,13 @@ package publisher import ( "orly.dev/pkg/encoders/event" + "orly.dev/pkg/interfaces/typer" ) -type Message interface { - Type() string -} - type I interface { - Message + typer.T Deliver(ev *event.E) - Receive(msg Message) + Receive(msg typer.T) } type Publishers []I diff --git a/pkg/interfaces/server/server.go b/pkg/interfaces/server/server.go index 8175562..a941537 100644 --- a/pkg/interfaces/server/server.go +++ b/pkg/interfaces/server/server.go @@ -2,6 +2,7 @@ package server import ( "net/http" + "orly.dev/pkg/app/config" "orly.dev/pkg/app/relay/publish" "orly.dev/pkg/encoders/event" "orly.dev/pkg/encoders/filters" @@ -39,4 +40,5 @@ type I interface { PublicReadable() bool ServiceURL(req *http.Request) (s string) OwnersPubkeys() (pks [][]byte) + Config() *config.C } diff --git a/pkg/protocol/openapi/listen.go b/pkg/protocol/openapi/listen.go new file mode 100644 index 0000000..793ede7 --- /dev/null +++ b/pkg/protocol/openapi/listen.go @@ -0,0 +1,123 @@ +package openapi + +import ( + "crypto/rand" + "github.com/danielgtaylor/huma/v2" + "github.com/danielgtaylor/huma/v2/sse" + "net/http" + "orly.dev/pkg/app/relay/helpers" + "orly.dev/pkg/encoders/event" + "orly.dev/pkg/encoders/filter" + "orly.dev/pkg/encoders/hex" + "orly.dev/pkg/utils/chk" + "orly.dev/pkg/utils/context" + "orly.dev/pkg/utils/log" +) + +type Event struct { + SubID string `json:"sub_id"` + Event *event.J `json:"event"` +} + +type ListenInput struct { + Auth string `header:"Authorization" doc:"nostr nip-98 (and expiring variant)" required:"false"` + Accept string `header:"Accept" default:"text/event-stream" enum:"text/event-stream" required:"true"` +} + +func (x *Operations) RegisterListen(api huma.API) { + name := "Listen" + description := `Opens up a HTTP SSE subscription channel that will send results from the /subscribe endpoint. Writes the subscription channel identifier as the first event in the stream. + +Close the connection to end all deliveries, or use /unsubscribe. Before /subscribe or /unsubscribe can be used, this must be first opened. + +Many browsers have a limited number of SSE channels that can be open at once, so this allows an app to consolidate all of its subscriptions into one. If the client understands HTTP/2 this limit is relaxed from 6 concurrent subscription channels per domain to 100, the net/http library has supported the protocol transparently since Go 1.6. However, because of the design of this API, each instance of a client only requires one open HTTP SSE connection to receive all subscriptions.` + path := x.path + "/listen" + scopes := []string{"user", "read"} + method := http.MethodPost + sse.Register( + api, huma.Operation{ + OperationID: name, + Summary: name, + Path: path, + Method: method, + Tags: []string{"events"}, + Description: helpers.GenerateDescription(description, scopes), + Security: []map[string][]string{{"auth": scopes}}, + }, + map[string]any{ + "client_id": "", + "event": &Event{}, + }, + func(ctx context.T, input *ListenInput, send sse.Sender) { + r := ctx.Value("http-request").(*http.Request) + remote := helpers.GetRemoteFromReq(r) + var err error + var authed bool + var pubkey []byte + if x.I.AuthRequired() && !x.I.PublicReadable() { + authed, pubkey = x.UserAuth(r, remote) + if !authed { + err = huma.Error401Unauthorized("Not Authorized") + return + } + } + + // Generate a unique client ID + id := make([]byte, 16) + if _, err = rand.Read(id); chk.E(err) { + return + } + clientId := hex.Enc(id) + + // Create a receiver channel for events + receiver := make(DeliverChan, 32) + + // Create and register the listener + listener := &H{ + Id: clientId, + New: true, + Receiver: receiver, + Pubkey: pubkey, + FilterMap: make(map[string]*filter.F), + } + + log.T.F("creating new listener %s", clientId) + x.Publisher().Receive(listener) + + // Send the client ID as the first event + if err = send.Data(clientId); chk.E(err) { + return + } + + // Event loop + out: + for { + select { + case <-x.Context().Done(): + // server shutdown + break out + case <-r.Context().Done(): + // connection has closed + break out + case ev := <-receiver: + // if the channel is closed, the event will be nil + if ev == nil { + break out + } + if err = send.Data( + Event{ + clientId, ev.Event.ToEventJ(), + }, + ); chk.E(err) { + break out + } + } + } + // Clean up the listener when the context is done + log.T.F("removing listener %s", clientId) + listener.Cancel = true + x.Publisher().Receive(listener) + return + }, + ) +} diff --git a/pkg/protocol/openapi/publisher.go b/pkg/protocol/openapi/publisher.go new file mode 100644 index 0000000..4046828 --- /dev/null +++ b/pkg/protocol/openapi/publisher.go @@ -0,0 +1,317 @@ +package openapi + +import ( + "orly.dev/pkg/encoders/event" + "orly.dev/pkg/encoders/filter" + "orly.dev/pkg/interfaces/publisher" + "orly.dev/pkg/interfaces/server" + "orly.dev/pkg/interfaces/typer" + "orly.dev/pkg/protocol/auth" + "orly.dev/pkg/utils/log" + "reflect" + "sync" +) + +const Type = "openapi" + +type Delivery struct { + SubId string `json:"sub_id"` + Event *event.E `json:"event"` +} + +type DeliverChan chan *Delivery + +type H struct { + sync.Mutex + + // If Cancel is true, this is a close command (must be done when a listener + // connection is closed). + Cancel bool + + // New is a flag that signifies a newly created client_id + New bool + + // Id is the identifier for an HTTP subscription listener channel + Id string + + // FilterMap is the collection of filters associated with a listener. + FilterMap map[string]*filter.F + + // Receiver is the channel for receiving events + Receiver DeliverChan + + // Pubkey is the authenticated public key for this listener + Pubkey []byte +} + +func (h *H) Type() (typeName string) { return Type } + +type Publisher struct { + sync.Mutex + + // ListenMap maps listener IDs to listener objects + ListenMap map[string]*H + + // Server is an interface to the server + Server server.I +} + +var _ publisher.I = &Publisher{} + +func (p *Publisher) Type() (typeName string) { return Type } + +func NewPublisher(s server.I) (p *Publisher) { + return &Publisher{ + ListenMap: make(map[string]*H), + Server: s, + } +} + +// Receive handles incoming messages to manage HTTP listener subscriptions and +// associated filters. +// +// # Parameters +// +// - msg (typer.T): The incoming message to process; expected to be of +// type *H to trigger subscription management actions. +// +// # Expected behaviour +// +// - Checks if the message is of type *H. +// +// - If Cancel is true, removes a subscriber by ID or the entire listener. +// +// - Otherwise, adds the subscription to the map under a mutex lock. +// +// - Logs actions related to subscription creation or removal. +func (p *Publisher) Receive(msg typer.T) { + if m, ok := msg.(*H); ok { + if m.Cancel { + if m.Id == "" { + // Can't do anything with an empty ID + log.W.F("received cancel request with empty ID") + return + } + + if m.FilterMap == nil || len(m.FilterMap) == 0 { + // Remove the entire listener + p.removeListener(m.Id) + log.T.F("removed listener %s", m.Id) + } else { + // Remove specific subscriptions + p.removeSubscription(m.Id, m.FilterMap) + for id := range m.FilterMap { + log.T.F("removed subscription %s for %s", id, m.Id) + } + } + return + } + p.Lock() + defer p.Unlock() + if listener, ok := p.ListenMap[m.Id]; !ok { + // Don't create new listeners automatically + if m.New { + // Create a new listener when New flag is set + listener := &H{ + Id: m.Id, + FilterMap: make(map[string]*filter.F), + Receiver: m.Receiver, + Pubkey: m.Pubkey, + } + + // Add the filters if provided + if m.FilterMap != nil { + for id, f := range m.FilterMap { + listener.FilterMap[id] = f + log.T.F("added subscription %s for new listener %s", id, m.Id) + } + } + + // Add the listener to the map + p.ListenMap[m.Id] = listener + log.T.F("added new listener %s", m.Id) + } else { + // Only the Listen API should create new listeners + log.W.F("received message for non-existent listener %s", m.Id) + } + return + } else { + // Update existing listener + if m.FilterMap != nil { + for id, f := range m.FilterMap { + listener.FilterMap[id] = f + log.T.F("added subscription %s for %s", id, m.Id) + } + } + } + } +} + +// Deliver processes and distributes an event to all matching subscribers based +// on their filter configurations. +// +// # Parameters +// +// - ev (*event.E): The event to be delivered to subscribed clients. +// +// # Expected behaviour +// +// Delivers the event to all subscribers whose filters match the event. It +// applies authentication checks if required by the server, and skips delivery +// for unauthenticated users when events are privileged. +func (p *Publisher) Deliver(ev *event.E) { + log.T.F("delivering event %0x to HTTP subscribers", ev.ID) + p.Lock() + defer p.Unlock() + for listenerId, listener := range p.ListenMap { + for subId, filter := range listener.FilterMap { + if !filter.Matches(ev) { + log.I.F( + "listener %s, subscription id %s event\n%s\ndoes not match filter\n%s", + listenerId, subId, ev.Marshal(nil), + filter.Marshal(nil), + ) + continue + } + if p.Server.AuthRequired() { + if !auth.CheckPrivilege(listener.Pubkey, ev) { + log.W.F( + "not privileged %0x ev pubkey %0x listener pubkey %0x kind %s privileged: %v", + listener.Pubkey, ev.Pubkey, + listener.Pubkey, ev.Kind.Name(), + ev.Kind.IsPrivileged(), + ) + continue + } + } + // Send the event to the listener's receiver channel + select { + case listener.Receiver <- &Delivery{SubId: subId, Event: ev}: + log.T.F( + "dispatched event %0x to subscription %s for listener %s", + ev.ID, subId, listenerId, + ) + default: + log.W.F( + "failed to dispatch event %0x to subscription %s for listener %s: channel full", + ev.ID, subId, listenerId, + ) + } + } + } +} + +// removeListener removes a listener from the Publisher collection. +func (p *Publisher) removeListener(id string) { + p.Lock() + delete(p.ListenMap, id) + p.Unlock() +} + +// removeSubscription removes specific subscriptions from a listener. +// It does not delete the listener even if all subscriptions are removed. +func (p *Publisher) removeSubscription( + listenerId string, filterMap map[string]*filter.F, +) { + p.Lock() + if listener, ok := p.ListenMap[listenerId]; ok { + for id := range filterMap { + delete(listener.FilterMap, id) + } + // We no longer delete the listener when all subscriptions are removed + // This allows the listener to remain active for future subscriptions + } + p.Unlock() +} + +// ListenerExists checks if a listener with the given ID exists. +func (p *Publisher) ListenerExists(id string) bool { + p.Lock() + defer p.Unlock() + _, exists := p.ListenMap[id] + return exists +} + +// SubscriptionExists checks if a subscription with the given ID exists for a specific listener. +func (p *Publisher) SubscriptionExists(listenerId string, subscriptionId string) bool { + p.Lock() + defer p.Unlock() + listener, exists := p.ListenMap[listenerId] + if !exists { + return false + } + _, exists = listener.FilterMap[subscriptionId] + return exists +} + +// CheckListenerExists is a package-level function that checks if a listener exists. +// This function is used by the Subscribe and Unsubscribe APIs to check if a client ID exists. +func CheckListenerExists(clientId string, publishers ...publisher.I) bool { + for _, p := range publishers { + // Check if the publisher is of type *Publisher + if pub, ok := p.(*Publisher); ok { + if pub.ListenerExists(clientId) { + return true + } + } + + // Check if the publisher has a Publishers field of type publisher.Publishers + // This handles the case where the publisher is a *publish.S + val := reflect.ValueOf(p) + if val.Kind() == reflect.Ptr { + val = val.Elem() + if val.Kind() == reflect.Struct { + field := val.FieldByName("Publishers") + if field.IsValid() && field.Type().String() == "publisher.Publishers" { + // Iterate through the publishers + for i := 0; i < field.Len(); i++ { + pub := field.Index(i).Interface().(publisher.I) + // Check if this publisher is of type *Publisher + if openPub, ok := pub.(*Publisher); ok { + if openPub.ListenerExists(clientId) { + return true + } + } + } + } + } + } + } + return false +} + +// CheckSubscriptionExists is a package-level function that checks if a subscription exists for a specific listener. +// This function is used by the Unsubscribe API to check if a subscription ID exists before attempting to unsubscribe. +func CheckSubscriptionExists(clientId string, subscriptionId string, publishers ...publisher.I) bool { + for _, p := range publishers { + // Check if the publisher is of type *Publisher + if pub, ok := p.(*Publisher); ok { + if pub.SubscriptionExists(clientId, subscriptionId) { + return true + } + } + + // Check if the publisher has a Publishers field of type publisher.Publishers + // This handles the case where the publisher is a *publish.S + val := reflect.ValueOf(p) + if val.Kind() == reflect.Ptr { + val = val.Elem() + if val.Kind() == reflect.Struct { + field := val.FieldByName("Publishers") + if field.IsValid() && field.Type().String() == "publisher.Publishers" { + // Iterate through the publishers + for i := 0; i < field.Len(); i++ { + pub := field.Index(i).Interface().(publisher.I) + // Check if this publisher is of type *Publisher + if openPub, ok := pub.(*Publisher); ok { + if openPub.SubscriptionExists(clientId, subscriptionId) { + return true + } + } + } + } + } + } + } + return false +} diff --git a/pkg/protocol/openapi/publisher_test.go b/pkg/protocol/openapi/publisher_test.go new file mode 100644 index 0000000..eb41335 --- /dev/null +++ b/pkg/protocol/openapi/publisher_test.go @@ -0,0 +1,431 @@ +package openapi + +import ( + "net/http" + "orly.dev/pkg/app/config" + "testing" + "time" + + "orly.dev/pkg/app/relay/publish" + "orly.dev/pkg/encoders/event" + "orly.dev/pkg/encoders/filter" + "orly.dev/pkg/encoders/filters" + "orly.dev/pkg/encoders/kind" + "orly.dev/pkg/encoders/kinds" + "orly.dev/pkg/encoders/tags" + "orly.dev/pkg/interfaces/relay" + "orly.dev/pkg/interfaces/store" + ctx "orly.dev/pkg/utils/context" +) + +// mockServer implements the server.I interface for testing +type mockServer struct { + authRequired bool + context ctx.T +} + +// Implement the methods needed for our tests +func (m *mockServer) AuthRequired() bool { + return m.authRequired +} + +func (m *mockServer) Context() ctx.T { + return m.context +} + +func (m *mockServer) Publisher() *publish.S { + return nil // Not used in our tests +} + +// Stub implementations for the rest of the server.I interface +func (m *mockServer) AcceptEvent( + c ctx.T, ev *event.E, hr *http.Request, authedPubkey []byte, + remote string, +) (accept bool, notice string, afterSave func()) { + return true, "", nil +} + +func (m *mockServer) AcceptReq( + c ctx.T, hr *http.Request, f *filters.T, + authedPubkey []byte, remote string, +) (allowed *filters.T, accept bool, modified bool) { + return f, true, false +} + +func (m *mockServer) AddEvent( + c ctx.T, rl relay.I, ev *event.E, hr *http.Request, origin string, +) (accepted bool, message []byte) { + return true, nil +} + +func (m *mockServer) AdminAuth( + r *http.Request, remote string, tolerance ...time.Duration, +) (authed bool, pubkey []byte) { + return false, nil +} + +func (m *mockServer) UserAuth( + r *http.Request, remote string, tolerance ...time.Duration, +) (authed bool, pubkey []byte) { + return false, nil +} + +func (m *mockServer) Publish(c ctx.T, evt *event.E) (err error) { + return nil +} + +func (m *mockServer) Relay() relay.I { + return nil +} + +func (m *mockServer) Shutdown() {} + +func (m *mockServer) Storage() store.I { + return nil +} + +func (m *mockServer) PublicReadable() bool { + return true +} + +func (m *mockServer) ServiceURL(req *http.Request) (s string) { + return "" +} + +func (m *mockServer) OwnersPubkeys() (pks [][]byte) { + return nil +} + +func (m *mockServer) Config() (c *config.C) { + return +} + +// TestPublisherFunctionality tests the listen/subscribe/unsubscribe and publisher functionality +func TestPublisherFunctionality(t *testing.T) { + // Create a context with cancel function + testCtx, cancel := ctx.Cancel(ctx.Bg()) + defer cancel() + + // Create a mock server + mockServer := &mockServer{ + authRequired: false, + context: testCtx, + } + + // Create a publisher + publisher := NewPublisher(mockServer) + + // Test 1: Register a listener + t.Run( + "RegisterListener", func(t *testing.T) { + // Create a receiver channel + receiver := make(event.C, 32) + + // Create a listener + listener := &H{ + Id: "test-listener", + Receiver: receiver, + FilterMap: make(map[string]*filter.F), + } + + // Register the listener + publisher.Receive(listener) + + // Verify the listener was registered + if _, ok := publisher.ListenMap["test-listener"]; !ok { + t.Errorf("Listener was not registered") + } + }, + ) + + // Test 2: Add a subscription + t.Run( + "AddSubscription", func(t *testing.T) { + // Create a filter + f := &filter.F{} + + // Create a subscription + subscription := &H{ + Id: "test-listener", + FilterMap: map[string]*filter.F{ + "test-subscription": f, + }, + } + + // Add the subscription + publisher.Receive(subscription) + + // Verify the subscription was added + listener, ok := publisher.ListenMap["test-listener"] + if !ok { + t.Errorf("Listener not found") + return + } + + if _, ok := listener.FilterMap["test-subscription"]; !ok { + t.Errorf("Subscription was not added") + } + }, + ) + + // Test 3: Deliver an event + t.Run( + "DeliverEvent", func(t *testing.T) { + // Create an event that matches the filter + ev := &event.E{ + Kind: kind.TextNote, + } + + // Deliver the event + publisher.Deliver(ev) + + // Get the listener + listener, ok := publisher.ListenMap["test-listener"] + if !ok { + t.Errorf("Listener not found") + return + } + + // Verify the event was received + select { + case receivedEv := <-listener.Receiver: + if receivedEv != ev { + t.Errorf("Received event does not match delivered event") + } + case <-time.After(100 * time.Millisecond): + t.Errorf("Event was not received within timeout") + } + }, + ) + + // Test 4: Unsubscribe + t.Run( + "Unsubscribe", func(t *testing.T) { + // Create a new listener first since the previous one was removed + receiver := make(event.C, 32) + listener := &H{ + Id: "test-listener", + Receiver: receiver, + FilterMap: make(map[string]*filter.F), + } + publisher.Receive(listener) + + // Add a subscription + subscription := &H{ + Id: "test-listener", + FilterMap: map[string]*filter.F{ + "test-subscription": &filter.F{}, + }, + } + publisher.Receive(subscription) + + // Create an unsubscribe message + unsubscribe := &H{ + Id: "test-listener", + FilterMap: map[string]*filter.F{ + "test-subscription": nil, + }, + Cancel: true, + } + + // Unsubscribe + publisher.Receive(unsubscribe) + + // Verify the listener was removed (since it had no more subscriptions) + if _, ok := publisher.ListenMap["test-listener"]; ok { + t.Errorf("Listener was not removed, but should be removed when all subscriptions are gone") + } + }, + ) + + // Test 5: Remove listener + t.Run( + "RemoveListener", func(t *testing.T) { + // Create a remove listener message + removeListener := &H{ + Id: "test-listener", + Cancel: true, + } + + // Remove the listener + publisher.Receive(removeListener) + + // Verify the listener was removed + if _, ok := publisher.ListenMap["test-listener"]; ok { + t.Errorf("Listener was not removed") + } + }, + ) + + // Test 6: Edge case - Unsubscribe non-existent subscription + t.Run( + "UnsubscribeNonExistentSubscription", func(t *testing.T) { + // Create a new listener first + receiver := make(event.C, 32) + listener := &H{ + Id: "test-listener-2", + Receiver: receiver, + FilterMap: make(map[string]*filter.F), + } + publisher.Receive(listener) + + // Add a subscription to ensure the listener has at least one subscription + subscription := &H{ + Id: "test-listener-2", + FilterMap: map[string]*filter.F{ + "existing-subscription": &filter.F{}, + }, + } + publisher.Receive(subscription) + + // Create an unsubscribe message for a non-existent subscription + unsubscribe := &H{ + Id: "test-listener-2", + FilterMap: map[string]*filter.F{ + "non-existent-subscription": nil, + }, + Cancel: true, + } + + // Unsubscribe + publisher.Receive(unsubscribe) + + // Verify the listener still exists (since it still has one subscription) + if _, ok := publisher.ListenMap["test-listener-2"]; !ok { + t.Errorf("Listener was removed, but should still exist since it has other subscriptions") + } + + // Verify the existing subscription is still there + listener, ok := publisher.ListenMap["test-listener-2"] + if !ok { + t.Errorf("Listener not found") + return + } + if _, ok := listener.FilterMap["existing-subscription"]; !ok { + t.Errorf("Existing subscription was removed") + } + }, + ) + + // Test 7: Edge case - Deliver event with authentication required + t.Run( + "DeliverEventWithAuthRequired", func(t *testing.T) { + // Set auth required to true + mockServer.authRequired = true + + // Create a new listener with pubkey + receiver := make(event.C, 32) + listener := &H{ + Id: "test-listener-3", + Receiver: receiver, + FilterMap: make(map[string]*filter.F), + Pubkey: []byte("test-pubkey"), + } + publisher.Receive(listener) + + // Add a subscription + subscription := &H{ + Id: "test-listener-3", + FilterMap: map[string]*filter.F{ + "test-subscription-3": &filter.F{}, + }, + } + publisher.Receive(subscription) + + // Create an event with a different pubkey and a privileged kind + ev := &event.E{ + Kind: kind.EncryptedDirectMessage, + Pubkey: []byte("different-pubkey"), + Tags: tags.New(), // Initialize empty tags + } + + // Deliver the event + publisher.Deliver(ev) + + // Verify the event was not received (due to auth check) + select { + case <-listener.Receiver: + t.Errorf("Event was received, but should have been blocked by auth check") + case <-time.After(100 * time.Millisecond): + // This is expected - no event should be received + } + + // Reset auth required + mockServer.authRequired = false + }, + ) + + // Test 8: Filter matching - Events are only delivered to listeners with matching filters + t.Run( + "FilterMatching", func(t *testing.T) { + // Create two listeners with different filters + receiver1 := make(event.C, 32) + listener1 := &H{ + Id: "test-listener-filter-1", + Receiver: receiver1, + FilterMap: make(map[string]*filter.F), + } + publisher.Receive(listener1) + + receiver2 := make(event.C, 32) + listener2 := &H{ + Id: "test-listener-filter-2", + Receiver: receiver2, + FilterMap: make(map[string]*filter.F), + } + publisher.Receive(listener2) + + // Add different filters to each listener + // First filter matches events with kind.TextNote + filter1 := &filter.F{ + Kinds: kinds.New(kind.TextNote), + } + subscription1 := &H{ + Id: "test-listener-filter-1", + FilterMap: map[string]*filter.F{ + "filter-subscription-1": filter1, + }, + } + publisher.Receive(subscription1) + + // Second filter matches events with kind.EncryptedDirectMessage + filter2 := &filter.F{ + Kinds: kinds.New(kind.EncryptedDirectMessage), + } + subscription2 := &H{ + Id: "test-listener-filter-2", + FilterMap: map[string]*filter.F{ + "filter-subscription-2": filter2, + }, + } + publisher.Receive(subscription2) + + // Create an event that matches only the first filter + ev := &event.E{ + Kind: kind.TextNote, + Tags: tags.New(), + } + + // Deliver the event + publisher.Deliver(ev) + + // Verify the event was received by the first listener + select { + case receivedEv := <-receiver1: + if receivedEv != ev { + t.Errorf("Received event does not match delivered event") + } + case <-time.After(100 * time.Millisecond): + t.Errorf("Event was not received by first listener within timeout") + } + + // Verify the event was NOT received by the second listener + select { + case <-receiver2: + t.Errorf("Event was received by second listener, but should not have matched its filter") + case <-time.After(100 * time.Millisecond): + // This is expected - no event should be received + } + }, + ) +} diff --git a/pkg/protocol/openapi/subscribe.go b/pkg/protocol/openapi/subscribe.go new file mode 100644 index 0000000..ea177de --- /dev/null +++ b/pkg/protocol/openapi/subscribe.go @@ -0,0 +1,82 @@ +package openapi + +import ( + "github.com/danielgtaylor/huma/v2" + "net/http" + "orly.dev/pkg/app/relay/helpers" + "orly.dev/pkg/encoders/filter" + "orly.dev/pkg/utils/context" + "orly.dev/pkg/utils/log" +) + +type SubscribeInput struct { + Auth string `header:"Authorization" doc:"nostr nip-98 (and expiring variant)" required:"false"` + Accept string `header:"Accept" default:"application/nostr+json"` + ClientId string `path:"client_id" doc:"Client identifier code associated with subscription channel created with /listen"` + Id string `path:"id" doc:"Identifier of the subscription associated with the filter"` + Body *Filter `doc:"filter JSON (standard NIP-01 filter syntax)"` +} + +func (x *Operations) RegisterSubscribe(api huma.API) { + name := "Subscribe" + description := `Create a new subscription based on a provided filter that will return new events that have arrived matching the subscription filter, over the HTTP SSE channel identified by client_id, created by the /listen endpoint.` + path := x.path + "/subscribe/{client_id}/{id}" + scopes := []string{"user", "read"} + method := http.MethodPost + huma.Register( + api, huma.Operation{ + OperationID: name, + Summary: name, + Path: path, + Method: method, + Tags: []string{"events"}, + RequestBody: EventsBody, + Description: helpers.GenerateDescription(description, scopes), + Security: []map[string][]string{{"auth": scopes}}, + }, func(ctx context.T, input *SubscribeInput) ( + output *struct{}, err error, + ) { + // Validate client_id exists + if input.ClientId == "" { + return nil, huma.Error400BadRequest("client_id is required") + } + + // Validate subscription ID exists + if input.Id == "" { + return nil, huma.Error400BadRequest("subscription id is required") + } + + // Validate filter exists + if input.Body == nil { + return nil, huma.Error400BadRequest("filter is required") + } + + // Check if the client ID exists + if !CheckListenerExists(input.ClientId, x.Publisher()) { + return nil, huma.Error404NotFound("client_id does not exist, create a listener first with the /listen endpoint") + } + + // Convert the Filter to a filter.F + f := input.Body.ToFilter() + + // Create a subscription message + subscription := &H{ + Id: input.ClientId, + FilterMap: map[string]*filter.F{ + input.Id: f, + }, + } + + // Send the subscription to the publisher. The publisher will route + // it to the appropriate handler based on Type() + x.Publisher().Receive(subscription) + + log.T.F( + "added subscription %s for listener %s\nfilter %s", input.Id, + input.ClientId, f.Marshal(nil), + ) + + return + }, + ) +} diff --git a/pkg/protocol/openapi/unsubscribe.go b/pkg/protocol/openapi/unsubscribe.go new file mode 100644 index 0000000..13f996c --- /dev/null +++ b/pkg/protocol/openapi/unsubscribe.go @@ -0,0 +1,77 @@ +package openapi + +import ( + "github.com/danielgtaylor/huma/v2" + "net/http" + "orly.dev/pkg/app/relay/helpers" + "orly.dev/pkg/encoders/filter" + "orly.dev/pkg/utils/context" + "orly.dev/pkg/utils/log" +) + +type UnsubscribeInput struct { + ClientId string `path:"client_id" doc:"Client identifier code associated with subscription channel created with /listen"` + Id string `path:"id" doc:"Identifier of the subscription to cancel"` +} + +func (x *Operations) RegisterUnsubscribe(api huma.API) { + name := "Unsubscribe" + description := `Cancel a subscription with the matching subscription identifier, attached to the identified client_id associated with the HTTP SSE connection.` + path := x.path + "/unsubscribe/{client_id}/{id}" + scopes := []string{"user", "read"} + method := http.MethodPost + huma.Register( + api, huma.Operation{ + OperationID: name, + Summary: name, + Path: path, + Method: method, + Tags: []string{"events"}, + Description: helpers.GenerateDescription(description, scopes), + Security: []map[string][]string{{"auth": scopes}}, + }, func(ctx context.T, input *UnsubscribeInput) ( + output *struct{}, err error, + ) { + // Validate client_id exists + if input.ClientId == "" { + return nil, huma.Error400BadRequest("client_id is required") + } + + // Validate subscription ID exists + if input.Id == "" { + return nil, huma.Error400BadRequest("subscription id is required") + } + + // Check if the client ID exists + if !CheckListenerExists(input.ClientId, x.Publisher()) { + return nil, huma.Error404NotFound("client_id doesn't exist, create a listener first with the /listen endpoint") + } + + // Check if the subscription ID exists + if !CheckSubscriptionExists( + input.ClientId, input.Id, x.Publisher(), + ) { + return nil, huma.Error404NotFound("subscription id doesn't exist for this client") + } + + // Create a cancel subscription message + unsubscribe := &H{ + Id: input.ClientId, + FilterMap: map[string]*filter.F{ + input.Id: nil, // We only need the key, not the value + }, + Cancel: true, // Set Cancel to true to remove the subscription + } + + // Send the unsubscribe message to the publisher. The publisher will route it to the appropriate handler based on Type() + x.Publisher().Receive(unsubscribe) + + log.T.F( + "removed subscription %s for listener %s", input.Id, + input.ClientId, + ) + + return &struct{}{}, nil + }, + ) +} diff --git a/pkg/protocol/socketapi/handleEvent.go b/pkg/protocol/socketapi/handleEvent.go index 9ec9e1f..f95cc8c 100644 --- a/pkg/protocol/socketapi/handleEvent.go +++ b/pkg/protocol/socketapi/handleEvent.go @@ -129,6 +129,7 @@ func (a *A) HandleEvent( } return } + log.I.F("checking if policy allows this event") // check that relay policy allows this event accept, notice, _ := srv.AcceptEvent( c, env.E, a.Listener.Request, a.Listener.AuthedPubkey(), @@ -145,6 +146,7 @@ func (a *A) HandleEvent( } return } + log.I.F("checking for protected tag") // check for protected tag (NIP-70) protectedTag := env.E.Tags.GetFirst(tag.New("-")) if protectedTag != nil && a.AuthRequired() { diff --git a/pkg/protocol/socketapi/handleMessage.go b/pkg/protocol/socketapi/handleMessage.go index 11efb0b..a3508f4 100644 --- a/pkg/protocol/socketapi/handleMessage.go +++ b/pkg/protocol/socketapi/handleMessage.go @@ -26,7 +26,8 @@ import ( // corresponding handler method, generates a notice for errors or unknown types, // logs the notice, and writes it back to the listener if required. func (a *A) HandleMessage(msg, authedPubkey []byte) { - log.T.F("%s received message:\n%s", a.Listener.RealRemote(), string(msg)) + remote := a.Listener.RealRemote() + log.T.F("%s received message:\n%s", remote, string(msg)) var notice []byte var err error var t string diff --git a/pkg/protocol/socketapi/handleReq.go b/pkg/protocol/socketapi/handleReq.go index a254795..0b16676 100644 --- a/pkg/protocol/socketapi/handleReq.go +++ b/pkg/protocol/socketapi/handleReq.go @@ -76,7 +76,8 @@ func (a *A) HandleReq(c context.T, req []byte, srv server.I) (r []byte) { return } if !a.I.PublicReadable() { - // send a notice in case the client renders it to explain why auth is required + // send a notice in case the client renders it to explain why auth + // is required opks := a.I.OwnersPubkeys() var npubList string for i, pk := range opks { diff --git a/pkg/protocol/socketapi/publisher.go b/pkg/protocol/socketapi/publisher.go index 5dba83d..0d87ccc 100644 --- a/pkg/protocol/socketapi/publisher.go +++ b/pkg/protocol/socketapi/publisher.go @@ -6,6 +6,7 @@ import ( "orly.dev/pkg/encoders/filters" "orly.dev/pkg/interfaces/publisher" "orly.dev/pkg/interfaces/server" + "orly.dev/pkg/interfaces/typer" "orly.dev/pkg/protocol/auth" "orly.dev/pkg/protocol/ws" "orly.dev/pkg/utils/chk" @@ -81,7 +82,7 @@ func (p *S) Type() (typeName string) { return Type } // - Otherwise, adds the subscription to the map under a mutex lock. // // - Logs actions related to subscription creation or removal. -func (p *S) Receive(msg publisher.Message) { +func (p *S) Receive(msg typer.T) { if m, ok := msg.(*W); ok { if m.Cancel { if m.Id == "" { @@ -127,18 +128,24 @@ func (p *S) Receive(msg publisher.Message) { // applies authentication checks if required by the server, and skips delivery // for unauthenticated users when events are privileged. func (p *S) Deliver(ev *event.E) { - log.T.F("delivering event %0x to subscribers", ev.ID) var err error p.Mx.Lock() defer p.Mx.Unlock() + log.T.F( + "delivering event %0x to websocket subscribers %d", ev.ID, len(p.Map), + ) for w, subs := range p.Map { - // log.I.F("%v %s", subs, w.RealRemote()) + log.I.F("%v %s", subs, w.RealRemote()) for id, subscriber := range subs { - // log.T.F( - // "subscriber %s\n%s", w.RealRemote(), - // subscriber.Marshal(nil), - // ) + log.T.F( + "subscriber %s\n%s", w.RealRemote(), + subscriber.Marshal(nil), + ) if !subscriber.Match(ev) { + log.I.F( + "subscriber %s filter %s not match", id, + subscriber.Marshal(nil), + ) continue } if p.Server.AuthRequired() { diff --git a/pkg/protocol/socketapi/socketapi.go b/pkg/protocol/socketapi/socketapi.go index af161dd..a087b3e 100644 --- a/pkg/protocol/socketapi/socketapi.go +++ b/pkg/protocol/socketapi/socketapi.go @@ -54,6 +54,22 @@ type A struct { // resources on connection termination or cancellation, adhering to the given // context's lifecycle. func (a *A) Serve(w http.ResponseWriter, r *http.Request, s server.I) { + c := a.Config() + remote := helpers.GetRemoteFromReq(r) + var whitelisted bool + if len(c.Whitelist) > 0 { + for _, addr := range c.Whitelist { + if strings.HasPrefix(remote, addr) { + whitelisted = true + } + } + } else { + whitelisted = true + } + if !whitelisted { + return + } + var err error ticker := time.NewTicker(DefaultPingWait) var cancel context.F diff --git a/pkg/version/version b/pkg/version/version index 086ecfd..d4dfa56 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.2.20 \ No newline at end of file +v0.3.0 \ No newline at end of file