diff --git a/app/handle-message.go b/app/handle-message.go
index 52d8043..0637d60 100644
--- a/app/handle-message.go
+++ b/app/handle-message.go
@@ -14,39 +14,65 @@ import (
 )
 
 func (l *Listener) HandleMessage(msg []byte, remote string) {
-	// log.D.F("%s received message:\n%s", remote, msg)
+	msgPreview := string(msg)
+	if len(msgPreview) > 150 {
+		msgPreview = msgPreview[:150] + "..."
+	}
+	log.D.F("%s processing message (len=%d): %s", remote, len(msg), msgPreview)
+
+	l.msgCount++
 	var err error
 	var t string
 	var rem []byte
-	if t, rem, err = envelopes.Identify(msg); !chk.E(err) {
-		switch t {
-		case eventenvelope.L:
-			// log.D.F("eventenvelope: %s %s", remote, rem)
-			err = l.HandleEvent(rem)
-		case reqenvelope.L:
-			// log.D.F("reqenvelope: %s %s", remote, rem)
-			err = l.HandleReq(rem)
-		case closeenvelope.L:
-			// log.D.F("closeenvelope: %s %s", remote, rem)
-			err = l.HandleClose(rem)
-		case authenvelope.L:
-			// log.D.F("authenvelope: %s %s", remote, rem)
-			err = l.HandleAuth(rem)
-		default:
-			err = fmt.Errorf("unknown envelope type %s\n%s", t, rem)
+
+	// Attempt to identify the envelope type
+	if t, rem, err = envelopes.Identify(msg); err != nil {
+		log.E.F("%s envelope identification FAILED (len=%d): %v", remote, len(msg), err)
+		log.D.F("%s malformed message content: %q", remote, msgPreview)
+		chk.E(err)
+		// Send error notice to client
+		if noticeErr := noticeenvelope.NewFrom("malformed message: " + err.Error()).Write(l); noticeErr != nil {
+			log.E.F("%s failed to send malformed message notice: %v", remote, noticeErr)
 		}
+		return
 	}
+
+	log.D.F("%s identified envelope type: %s (payload_len=%d)", remote, t, len(rem))
+
+	// Process the identified envelope type
+	switch t {
+	case eventenvelope.L:
+		log.D.F("%s processing EVENT envelope", remote)
+		l.eventCount++
+		err = l.HandleEvent(rem)
+	case reqenvelope.L:
+		log.D.F("%s processing REQ envelope", remote)
+		l.reqCount++
+		err = l.HandleReq(rem)
+	case closeenvelope.L:
+		log.D.F("%s processing CLOSE envelope", remote)
+		err = l.HandleClose(rem)
+	case authenvelope.L:
+		log.D.F("%s processing AUTH envelope", remote)
+		err = l.HandleAuth(rem)
+	default:
+		err = fmt.Errorf("unknown envelope type %s", t)
+		log.E.F("%s unknown envelope type: %s (payload: %q)", remote, t, string(rem))
+	}
+
+	// Handle any processing errors
 	if err != nil {
-		log.D.C(
-			func() string {
-				return fmt.Sprintf(
-					"notice->%s %s", remote, err,
-				)
-			},
-		)
-		if err = noticeenvelope.NewFrom(err.Error()).Write(l); err != nil {
+		log.E.F("%s message processing FAILED (type=%s): %v", remote, t, err)
+		log.D.F("%s error context - original message: %q", remote, msgPreview)
+
+		// Send error notice to client
+		noticeMsg := fmt.Sprintf("%s: %s", t, err.Error())
+		if noticeErr := noticeenvelope.NewFrom(noticeMsg).Write(l); noticeErr != nil {
+			log.E.F("%s failed to send error notice after %s processing failure: %v", remote, t, noticeErr)
 			return
 		}
+		log.D.F("%s sent error notice for %s processing failure", remote, t)
+	} else {
+		log.D.F("%s message processing SUCCESS (type=%s)", remote, t)
 	}
-}
diff --git a/app/handle-relayinfo.go b/app/handle-relayinfo.go
index 7db9030..acb975c 100644
--- a/app/handle-relayinfo.go
+++ b/app/handle-relayinfo.go
@@ -41,7 +41,7 @@ func (s *Server) HandleRelayInfo(w http.ResponseWriter, r *http.Request) {
 			relayinfo.GenericTagQueries,
 			// relayinfo.NostrMarketplace,
 			relayinfo.EventTreatment,
-			// relayinfo.CommandResults,
+			relayinfo.CommandResults,
 			relayinfo.ParameterizedReplaceableEvents,
 			relayinfo.ExpirationTimestamp,
 			relayinfo.ProtectedEvents,
@@ -57,7 +57,7 @@ func (s *Server) HandleRelayInfo(w http.ResponseWriter, r *http.Request) {
 			relayinfo.GenericTagQueries,
 			// relayinfo.NostrMarketplace,
 			relayinfo.EventTreatment,
-			// relayinfo.CommandResults,
+			relayinfo.CommandResults,
 			relayinfo.ParameterizedReplaceableEvents,
 			relayinfo.ExpirationTimestamp,
 			relayinfo.ProtectedEvents,
diff --git a/app/handle-req.go b/app/handle-req.go
index 52a6ee9..1251e57 100644
--- a/app/handle-req.go
+++ b/app/handle-req.go
@@ -16,7 +16,6 @@ import (
 	"next.orly.dev/pkg/encoders/envelopes/closedenvelope"
 	"next.orly.dev/pkg/encoders/envelopes/eoseenvelope"
 	"next.orly.dev/pkg/encoders/envelopes/eventenvelope"
-	"next.orly.dev/pkg/encoders/envelopes/okenvelope"
 	"next.orly.dev/pkg/encoders/envelopes/reqenvelope"
 	"next.orly.dev/pkg/encoders/event"
 	"next.orly.dev/pkg/encoders/filter"
@@ -30,12 +29,13 @@ import (
 )
 
 func (l *Listener) HandleReq(msg []byte) (err error) {
-	// log.T.F("HandleReq: START processing from %s\n%s\n", l.remote, msg)
+	log.D.F("HandleReq: START processing from %s", l.remote)
 	// var rem []byte
 	env := reqenvelope.New()
 	if _, err = env.Unmarshal(msg); chk.E(err) {
 		return normalize.Error.Errorf(err.Error())
 	}
+	log.D.C(func() string { return fmt.Sprintf("REQ sub=%s filters=%d", env.Subscription, len(*env.Filters)) })
 	// send a challenge to the client to auth if an ACL is active
 	if acl.Registry.Active.Load() != "none" {
 		if err = authenvelope.NewChallengeWith(l.challenge.Load()).
@@ -47,8 +47,9 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 	accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load(), l.remote)
 	switch accessLevel {
 	case "none":
-		if err = okenvelope.NewFrom(
-			env.Subscription, false,
+		// For REQ denial, send a CLOSED with an auth-required reason (NIP-42 prefix on a NIP-01 CLOSED)
+		if err = closedenvelope.NewFrom(
+			env.Subscription,
 			reason.AuthRequired.F("user not authed or has no read access"),
 		).Write(l); chk.E(err) {
 			return
@@ -58,35 +59,74 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 		// user has read access or better, continue
 	}
 	var events event.S
+	// Create a single context for all filter queries, tied to the connection context, to prevent leaks and support timely cancellation
+	queryCtx, queryCancel := context.WithTimeout(
+		l.ctx, 30*time.Second,
+	)
+	defer queryCancel()
+
+	// Collect all events from all filters
+	var allEvents event.S
 	for _, f := range *env.Filters {
-		if f != nil && f.Authors != nil && f.Authors.Len() > 0 {
-			var authors []string
-			for _, a := range f.Authors.T {
-				authors = append(authors, hex.Enc(a))
+		if f != nil {
+			// Summarize filter details for diagnostics (avoid internal fields)
+			var kindsLen int
+			if f.Kinds != nil {
+				kindsLen = f.Kinds.Len()
 			}
+			var authorsLen int
+			if f.Authors != nil {
+				authorsLen = f.Authors.Len()
+			}
+			var idsLen int
+			if f.Ids != nil {
+				idsLen = f.Ids.Len()
+			}
+			var dtag string
+			if f.Tags != nil {
+				if d := f.Tags.GetFirst([]byte("d")); d != nil {
+					dtag = string(d.Value())
+				}
+			}
+			var lim any
+			if f.Limit != nil {
+				lim = *f.Limit
+			}
+			var since any
+			if f.Since != nil {
+				since = f.Since.Int()
+			}
+			var until any
+			if f.Until != nil {
+				until = f.Until.Int()
+			}
+			log.D.C(func() string {
+				return fmt.Sprintf("REQ %s filter: kinds.len=%d authors.len=%d ids.len=%d d=%q limit=%v since=%v until=%v", env.Subscription, kindsLen, authorsLen, idsLen, dtag, lim, since, until)
+			})
 		}
 		if f != nil && pointers.Present(f.Limit) {
 			if *f.Limit == 0 {
 				continue
 			}
 		}
-		// Use a separate context for QueryEvents to prevent cancellation issues
-		queryCtx, cancel := context.WithTimeout(
-			context.Background(), 30*time.Second,
-		)
-		defer cancel()
-		if events, err = l.QueryEvents(queryCtx, f); chk.E(err) {
+		var filterEvents event.S
+		if filterEvents, err = l.QueryEvents(queryCtx, f); chk.E(err) {
 			if errors.Is(err, badger.ErrDBClosed) {
 				return
 			}
+			log.E.F("QueryEvents failed for filter: %v", err)
 			err = nil
+			continue
 		}
-		defer func() {
-			for _, ev := range events {
-				ev.Free()
-			}
-		}()
+		// Append events from this filter to the overall collection
+		allEvents = append(allEvents, filterEvents...)
 	}
+	events = allEvents
+	defer func() {
+		for _, ev := range events {
+			ev.Free()
+		}
+	}()
 	var tmp event.S
 privCheck:
 	for _, ev := range events {
@@ -216,7 +256,7 @@ privCheck:
 	}
 	// write the EOSE to signal to the client that all events found have been
 	// sent.
-	log.T.F("sending EOSE to %s", l.remote)
+	log.D.F("sending EOSE to %s", l.remote)
 	if err = eoseenvelope.NewFrom(env.Subscription).
 		Write(l); chk.E(err) {
 		return
@@ -224,7 +264,7 @@ privCheck:
 	// if the query was for just Ids, we know there can't be any more results,
 	// so cancel the subscription.
 	cancel := true
-	log.T.F(
+	log.D.F(
 		"REQ %s: computing cancel/subscription; events_sent=%d",
 		env.Subscription, len(events),
 	)
@@ -257,8 +297,8 @@ privCheck:
 		}
 		// also, if we received the limit number of events, subscription ded
 		if pointers.Present(f.Limit) {
-			if len(events) < int(*f.Limit) {
-				cancel = false
+			if len(events) >= int(*f.Limit) {
+				cancel = true
 			}
 		}
 	}
@@ -276,12 +316,8 @@ privCheck:
 			},
 		)
 	} else {
-		if err = closedenvelope.NewFrom(
-			env.Subscription, nil,
-		).Write(l); chk.E(err) {
-			return
-		}
+		// suppress server-sent CLOSED; client will close subscription if desired
 	}
-	log.T.F("HandleReq: COMPLETED processing from %s", l.remote)
+	log.D.F("HandleReq: COMPLETED processing from %s", l.remote)
 	return
 }
diff --git a/app/handle-websocket.go b/app/handle-websocket.go
index e6cc2b0..9c7e299 100644
--- a/app/handle-websocket.go
+++ b/app/handle-websocket.go
@@ -58,8 +58,10 @@ whitelist:
 	if conn, err = websocket.Accept(
 		w, r, &websocket.AcceptOptions{OriginPatterns: []string{"*"}},
 	); chk.E(err) {
+		log.E.F("websocket accept failed from %s: %v", remote, err)
 		return
 	}
+	log.T.F("websocket accepted from %s path=%s", remote, r.URL.String())
 	conn.SetReadLimit(DefaultMaxMessageSize)
 	defer conn.CloseNow()
 	listener := &Listener{
@@ -75,18 +77,38 @@ whitelist:
 	// If admins are configured, immediately prompt client to AUTH (NIP-42)
 	if len(s.Config.Admins) > 0 {
 		// log.D.F("sending initial AUTH challenge to %s", remote)
+		log.D.F("sending AUTH challenge to %s", remote)
 		if err = authenvelope.NewChallengeWith(listener.challenge.Load()).
 			Write(listener); chk.E(err) {
+			log.E.F("failed to send AUTH challenge to %s: %v", remote, err)
 			return
 		}
+		log.D.F("AUTH challenge sent successfully to %s", remote)
 	}
 	ticker := time.NewTicker(DefaultPingWait)
 	go s.Pinger(ctx, conn, ticker, cancel)
+	connStart := time.Now()
 	defer func() {
-		// log.D.F("closing websocket connection from %s", remote)
+		log.D.F("closing websocket connection from %s", remote)
+
+		// Cancel context and stop pinger
 		cancel()
 		ticker.Stop()
+
+		// Cancel all subscriptions for this connection
+		log.D.F("cancelling subscriptions for %s", remote)
 		listener.publishers.Receive(&W{Cancel: true})
+
+		// Log detailed connection statistics
+		log.D.F("ws connection closed %s: msgs=%d, REQs=%d, EVENTs=%d, duration=%v",
+			remote, listener.msgCount, listener.reqCount, listener.eventCount,
+			time.Since(connStart))
+
+		// Log any remaining connection state
+		if listener.authedPubkey.Load() != nil {
+			log.D.F("ws connection %s was authenticated", remote)
+		} else {
+			log.D.F("ws connection %s was not authenticated", remote)
+		}
 	}()
 	for {
 		select {
@@ -130,14 +152,26 @@ whitelist:
 			return
 		}
 		if typ == PingMessage {
+			log.D.F("received PING from %s, sending PONG", remote)
 			// Create a write context with timeout for pong response
 			writeCtx, writeCancel := context.WithTimeout(
 				ctx, DefaultWriteTimeout,
 			)
+			pongStart := time.Now()
 			if err = conn.Write(writeCtx, PongMessage, msg); chk.E(err) {
+				pongDuration := time.Since(pongStart)
+				log.E.F("failed to send PONG to %s after %v: %v", remote, pongDuration, err)
+				if writeCtx.Err() != nil {
+					log.E.F("PONG write timeout to %s after %v (limit=%v)", remote, pongDuration, DefaultWriteTimeout)
+				}
 				writeCancel()
 				return
 			}
+			pongDuration := time.Since(pongStart)
+			log.D.F("sent PONG to %s successfully in %v", remote, pongDuration)
+			if pongDuration > time.Millisecond*50 {
+				log.D.F("SLOW PONG to %s: %v (>50ms)", remote, pongDuration)
+			}
 			writeCancel()
 			continue
 		}
@@ -151,21 +185,45 @@ func (s *Server) Pinger(
 	cancel context.CancelFunc,
 ) {
 	defer func() {
+		log.D.F("pinger shutting down")
 		cancel()
 		ticker.Stop()
 	}()
 	var err error
+	pingCount := 0
 	for {
 		select {
 		case <-ticker.C:
+			pingCount++
+			log.D.F("sending PING #%d", pingCount)
+
+			// Create a write context with timeout for ping operation
 			pingCtx, pingCancel := context.WithTimeout(ctx, DefaultWriteTimeout)
-			if err = conn.Ping(pingCtx); chk.E(err) {
+			pingStart := time.Now()
+
+			if err = conn.Ping(pingCtx); err != nil {
+				pingDuration := time.Since(pingStart)
+				log.E.F("PING #%d FAILED after %v: %v", pingCount, pingDuration, err)
+
+				if pingCtx.Err() != nil {
+					log.E.F("PING #%d timeout after %v (limit=%v)", pingCount, pingDuration, DefaultWriteTimeout)
+				}
+
+				chk.E(err)
 				pingCancel()
 				return
 			}
+
+			pingDuration := time.Since(pingStart)
+			log.D.F("PING #%d sent successfully in %v", pingCount, pingDuration)
+
+			if pingDuration > time.Millisecond*100 {
+				log.D.F("SLOW PING #%d: %v (>100ms)", pingCount, pingDuration)
+			}
+
 			pingCancel()
 		case <-ctx.Done():
+			log.D.F("pinger context cancelled after %d pings", pingCount)
 			return
 		}
 	}
diff --git a/app/listener.go b/app/listener.go
index 6a901f1..de0cdd8 100644
--- a/app/listener.go
+++ b/app/listener.go
@@ -3,9 +3,11 @@ package app
 import (
 	"context"
 	"net/http"
+	"time"
 
 	"github.com/coder/websocket"
 	"lol.mleku.dev/chk"
+	"lol.mleku.dev/log"
 	"next.orly.dev/pkg/utils/atomic"
 )
 
@@ -17,6 +19,10 @@ type Listener struct {
 	req          *http.Request
 	challenge    atomic.Bytes
 	authedPubkey atomic.Bytes
+	// Diagnostics: per-connection counters
+	msgCount   int
+	reqCount   int
+	eventCount int
 }
 
 // Ctx returns the listener's context, but creates a new context for each operation
@@ -26,6 +32,16 @@ func (l *Listener) Ctx() context.Context {
 }
 
 func (l *Listener) Write(p []byte) (n int, err error) {
+	start := time.Now()
+	msgLen := len(p)
+
+	// Log message attempt with content preview (first 200 chars for diagnostics)
+	preview := string(p)
+	if len(preview) > 200 {
+		preview = preview[:200] + "..."
+	}
+	log.D.F("ws->%s attempting write: len=%d preview=%q", l.remote, msgLen, preview)
+
 	// Use a separate context with timeout for writes to prevent race conditions
 	// where the main connection context gets cancelled while writing events
 	writeCtx, cancel := context.WithTimeout(
@@ -33,9 +49,42 @@ func (l *Listener) Write(p []byte) (n int, err error) {
 	)
 	defer cancel()
 
-	if err = l.conn.Write(writeCtx, websocket.MessageText, p); chk.E(err) {
+	// Attempt the write operation
+	writeStart := time.Now()
+	if err = l.conn.Write(writeCtx, websocket.MessageText, p); err != nil {
+		writeDuration := time.Since(writeStart)
+		totalDuration := time.Since(start)
+
+		// Log detailed failure information
+		log.E.F("ws->%s WRITE FAILED: len=%d duration=%v write_duration=%v error=%v preview=%q",
+			l.remote, msgLen, totalDuration, writeDuration, err, preview)
+
+		// Check if this is a context timeout
+		if writeCtx.Err() != nil {
+			log.E.F("ws->%s write timeout after %v (limit=%v)", l.remote, writeDuration, DefaultWriteTimeout)
+		}
+
+		// Check connection state
+		if l.conn != nil {
+			log.D.F("ws->%s connection state during failure: remote_addr=%v", l.remote, l.req.RemoteAddr)
+		}
+
+		chk.E(err) // Still call the original error handler
 		return
 	}
-	n = len(p)
+
+	// Log successful write with timing
+	writeDuration := time.Since(writeStart)
+	totalDuration := time.Since(start)
+	n = msgLen
+
+	log.D.F("ws->%s WRITE SUCCESS: len=%d duration=%v write_duration=%v",
+		l.remote, n, totalDuration, writeDuration)
+
+	// Log slow writes for performance diagnostics
+	if writeDuration > time.Millisecond*100 {
+		log.D.F("ws->%s SLOW WRITE detected: %v (>100ms) len=%d", l.remote, writeDuration, n)
+	}
+
 	return
 }
diff --git a/app/publisher.go b/app/publisher.go
index a3a24ec..f8d8c52 100644
--- a/app/publisher.go
+++ b/app/publisher.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"time"
 
 	"github.com/coder/websocket"
 	"lol.mleku.dev/chk"
@@ -210,39 +211,68 @@ func (p *P) Deliver(ev *event.E) {
 					break
 				}
 			}
-		}
-		if !allowed {
-			// Skip delivery for this subscriber
-			continue
-		}
-	}
-	var res *eventenvelope.Result
-	if res, err = eventenvelope.NewResultWith(d.id, ev); chk.E(err) {
-		continue
-	}
-	// Use a separate context with timeout for writes to prevent race conditions
-	// where the publisher context gets cancelled while writing events
-	writeCtx, cancel := context.WithTimeout(
-		context.Background(), DefaultWriteTimeout,
-	)
-	defer cancel()
+			}
+			if !allowed {
+				log.D.F("subscription delivery DENIED for privileged event %s to %s (auth mismatch)",
+					hex.Enc(ev.ID), d.sub.remote)
+				// Skip delivery for this subscriber
+				continue
+			}
+		}
+
+		var res *eventenvelope.Result
+		if res, err = eventenvelope.NewResultWith(d.id, ev); chk.E(err) {
+			log.E.F("failed to create event envelope for %s to %s: %v",
+				hex.Enc(ev.ID), d.sub.remote, err)
+			continue
+		}
+
+		// Log delivery attempt
+		msgData := res.Marshal(nil)
+		log.D.F("attempting delivery of event %s (kind=%d, len=%d) to subscription %s @ %s",
+			hex.Enc(ev.ID), ev.Kind, len(msgData), d.id, d.sub.remote)
+
+		// Use a separate context with timeout for writes to prevent race conditions
+		// where the publisher context gets cancelled while writing events
+		writeCtx, cancel := context.WithTimeout(
+			context.Background(), DefaultWriteTimeout,
+		)
+		defer cancel()
 
-	if err = d.w.Write(
-		writeCtx, websocket.MessageText, res.Marshal(nil),
-	); err != nil {
-		// On error, remove the subscriber connection safely
-		p.removeSubscriber(d.w)
-		_ = d.w.CloseNow()
-		continue
-	}
-	log.D.C(
-		func() string {
-			return fmt.Sprintf(
-				"dispatched event %0x to subscription %s, %s",
-				ev.ID, d.id, d.sub.remote,
-			)
-		},
-	)
+		deliveryStart := time.Now()
+		if err = d.w.Write(
+			writeCtx, websocket.MessageText, msgData,
+		); err != nil {
+			deliveryDuration := time.Since(deliveryStart)
+
+			// Log detailed failure information
+			log.E.F("subscription delivery FAILED: event=%s to=%s sub=%s duration=%v error=%v",
+				hex.Enc(ev.ID), d.sub.remote, d.id, deliveryDuration, err)
+
+			// Check for timeout specifically
+			if writeCtx.Err() != nil {
+				log.E.F("subscription delivery TIMEOUT: event=%s to=%s after %v (limit=%v)",
+					hex.Enc(ev.ID), d.sub.remote, deliveryDuration, DefaultWriteTimeout)
+			}
+
+			// Log connection cleanup
+			log.D.F("removing failed subscriber connection: %s", d.sub.remote)
+
+			// On error, remove the subscriber connection safely
+			p.removeSubscriber(d.w)
+			_ = d.w.CloseNow()
+			continue
+		}
+
+		deliveryDuration := time.Since(deliveryStart)
+		log.D.F("subscription delivery SUCCESS: event=%s to=%s sub=%s duration=%v len=%d",
+			hex.Enc(ev.ID), d.sub.remote, d.id, deliveryDuration, len(msgData))
+
+		// Log slow deliveries for performance monitoring
+		if deliveryDuration > time.Millisecond*50 {
+			log.D.F("SLOW subscription delivery: event=%s to=%s duration=%v (>50ms)",
+				hex.Enc(ev.ID), d.sub.remote, deliveryDuration)
+		}
 	}
 }
diff --git a/cmd/benchmark/main.go b/cmd/benchmark/main.go
index c8b4497..2f2428e 100644
--- a/cmd/benchmark/main.go
+++ b/cmd/benchmark/main.go
@@ -325,10 +325,10 @@ func (b *Benchmark) RunSuite() {
 		fmt.Printf("RunConcurrentQueryStoreTest..\n")
 		b.RunConcurrentQueryStoreTest()
 		if round < 2 {
-			fmt.Println("\nPausing 10s before next round...")
+			fmt.Printf("\nPausing 10s before next round...\n")
 			time.Sleep(10 * time.Second)
 		}
-		fmt.Println("\n=== Test round completed ===\n")
+		fmt.Printf("\n=== Test round completed ===\n\n")
 	}
 }
diff --git a/docs/websocket-req-comparison.md b/docs/websocket-req-comparison.md
new file mode 100644
index 0000000..dbf6ea7
--- /dev/null
+++ b/docs/websocket-req-comparison.md
@@ -0,0 +1,259 @@
+# WebSocket REQ Handling Comparison: Khatru vs Next.orly.dev
+
+## Overview
+
+This document compares how two Nostr relay implementations handle WebSocket connections and REQ (subscription) messages:
+
+1. **Khatru** - A popular Go-based Nostr relay library by fiatjaf
+2. **Next.orly.dev** - A custom relay implementation with advanced features
+
+## Architecture Comparison
+
+### Khatru Architecture
+- **Monolithic approach**: Single large `HandleWebsocket` method (~380 lines) processes all message types
+- **Inline processing**: REQ handling is embedded within the main websocket handler
+- **Hook-based extensibility**: Uses function slices for customizable behavior
+- **Simple structure**: WebSocket struct with basic fields and mutex for thread safety
+
+### Next.orly.dev Architecture
+- **Modular approach**: Separate methods for each message type (`HandleReq`, `HandleEvent`, etc.)
+- **Layered processing**: Message identification → envelope parsing → type-specific handling
+- **Publisher-subscriber system**: Dedicated infrastructure for subscription management
+- **Rich context**: Listener struct with detailed state tracking and metrics
+
+## Connection Establishment
+
+### Khatru
+```go
+// Simple websocket upgrade
+conn, err := rl.upgrader.Upgrade(w, r, nil)
+ws := &WebSocket{
+	conn:               conn,
+	Request:            r,
+	Challenge:          hex.EncodeToString(challenge),
+	negentropySessions: xsync.NewMapOf[string, *NegentropySession](),
+}
+```
+
+### Next.orly.dev
+```go
+// More sophisticated setup with IP whitelisting
+conn, err = websocket.Accept(w, r, &websocket.AcceptOptions{OriginPatterns: []string{"*"}})
+listener := &Listener{
+	ctx:    ctx,
+	Server: s,
+	conn:   conn,
+	remote: remote,
+	req:    r,
+}
+// Immediate AUTH challenge if ACLs are configured
+```
+
+**Key Differences:**
+- Next.orly.dev includes IP whitelisting and immediate authentication challenges
+- Khatru uses the fasthttp/websocket library, while next.orly.dev uses coder/websocket
+- Next.orly.dev has more detailed connection state tracking
+
+## Message Processing
+
+### Khatru
+- Uses `nostr.MessageParser` for sequential parsing
+- Switch statement on envelope type within goroutine
+- Direct processing without intermediate validation layers
+
+### Next.orly.dev
+- Custom envelope identification system (`envelopes.Identify`)
+- Separate validation and processing phases
+- Extensive logging and error handling at each step
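+
+To make the identify-then-dispatch shape concrete, here is a minimal, self-contained sketch. The `identify` helper below is a toy stand-in for `envelopes.Identify` (whose real signature and label constants live in the relay's encoder packages); only the two-phase flow is the point.
+
+```go
+package main
+
+import (
+	"errors"
+	"fmt"
+	"strings"
+)
+
+// identify is a toy stand-in for envelopes.Identify: it returns the envelope
+// label and the remaining payload without parsing the payload itself.
+func identify(msg string) (label, rem string, err error) {
+	for _, l := range []string{"EVENT", "REQ", "CLOSE", "AUTH"} {
+		if strings.HasPrefix(msg, `["`+l+`"`) {
+			return l, msg[len(l)+3:], nil
+		}
+	}
+	return "", "", errors.New("unknown envelope")
+}
+
+func main() {
+	msg := `["REQ","sub1",{"kinds":[1]}]`
+	label, rem, err := identify(msg)
+	if err != nil {
+		fmt.Println("NOTICE: malformed message:", err) // report, don't silently drop
+		return
+	}
+	// Type-specific handlers run only after identification succeeds.
+	switch label {
+	case "REQ":
+		fmt.Println("HandleReq payload:", rem)
+	case "EVENT":
+		fmt.Println("HandleEvent payload:", rem)
+	default:
+		fmt.Println("handle", label, "payload:", rem)
+	}
+}
+```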
+
+## REQ Message Handling
+
+### Khatru REQ Processing
+```go
+case *nostr.ReqEnvelope:
+	eose := sync.WaitGroup{}
+	eose.Add(len(env.Filters))
+
+	// Handle each filter separately
+	for _, filter := range env.Filters {
+		err := srl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, filter)
+		if err != nil {
+			// Fail everything if any filter is rejected
+			ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: reason})
+			return
+		} else {
+			rl.addListener(ws, env.SubscriptionID, srl, filter, cancelReqCtx)
+		}
+	}
+
+	go func() {
+		eose.Wait()
+		ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID))
+	}()
+```
+
+### Next.orly.dev REQ Processing
+```go
+// Comprehensive ACL and authentication checks first
+accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load(), l.remote)
+switch accessLevel {
+case "none":
+	return // Send auth-required response
+}
+
+// Process all filters and collect events
+for _, f := range *env.Filters {
+	filterEvents, err = l.QueryEvents(queryCtx, f)
+	allEvents = append(allEvents, filterEvents...)
+}
+
+// Apply privacy and privilege checks
+// Send all historical events
+// Set up ongoing subscription only if needed
+```
+
+## Key Architectural Differences
+
+### 1. **Filter Processing Strategy**
+
+**Khatru:**
+- Processes each filter independently and concurrently
+- Uses WaitGroup to coordinate EOSE across all filters
+- Immediately sets up listeners for ongoing subscriptions
+- Fails entire subscription if any filter is rejected
+
+**Next.orly.dev:**
+- Processes all filters sequentially in a single context
+- Collects all events before applying access control
+- Only sets up subscriptions for filters that need ongoing updates
+- Gracefully handles individual filter failures (see the sketch below)
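+
+A compact sketch of the sequential collect-then-filter strategy, assuming toy `Filter`/`Event` types and a hypothetical `query` helper in place of the relay's database layer:
+
+```go
+package main
+
+import (
+	"context"
+	"fmt"
+	"time"
+)
+
+type Filter struct{ Kinds []int }
+type Event struct {
+	ID   string
+	Kind int
+}
+
+// query stands in for a database lookup bounded by ctx.
+func query(ctx context.Context, f Filter) ([]Event, error) {
+	if err := ctx.Err(); err != nil {
+		return nil, err // shared deadline already expired
+	}
+	return []Event{{ID: fmt.Sprintf("ev-%d", f.Kinds[0]), Kind: f.Kinds[0]}}, nil
+}
+
+func main() {
+	// One context bounds every filter query: a single deadline for the whole
+	// REQ, and a failed filter degrades the result instead of aborting it.
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	filters := []Filter{{Kinds: []int{1}}, {Kinds: []int{30023}}}
+	var all []Event
+	for _, f := range filters {
+		evs, err := query(ctx, f)
+		if err != nil {
+			fmt.Println("filter failed, continuing:", err)
+			continue
+		}
+		all = append(all, evs...)
+	}
+	fmt.Println("collected before access-control pass:", all)
+}
+```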
+
+### 2. **Access Control Integration**
+
+**Khatru:**
+- Basic NIP-42 authentication support
+- Hook-based authorization via `RejectFilter` functions
+- Limited built-in access control features
+
+**Next.orly.dev:**
+- Comprehensive ACL system with multiple access levels
+- Built-in support for private events with npub authorization
+- Privileged event filtering based on pubkey and p-tags
+- Granular permission checking at multiple stages
+
+### 3. **Subscription Management**
+
+**Khatru:**
+```go
+// Simple listener registration
+type listenerSpec struct {
+	filter   nostr.Filter
+	cancel   context.CancelCauseFunc
+	subRelay *Relay
+}
+rl.addListener(ws, subscriptionID, relay, filter, cancel)
+```
+
+**Next.orly.dev:**
+```go
+// Publisher-subscriber system with rich metadata
+type W struct {
+	Conn         *websocket.Conn
+	remote       string
+	Id           string
+	Receiver     event.C
+	Filters      *filter.S
+	AuthedPubkey []byte
+}
+l.publishers.Receive(&W{...})
+```
+
+### 4. **Performance Optimizations**
+
+**Khatru:**
+- Concurrent filter processing
+- Immediate streaming of events as they're found
+- Memory-efficient with direct event streaming
+
+**Next.orly.dev:**
+- Batch processing with deduplication
+- Memory management with explicit `ev.Free()` calls
+- Smart subscription cancellation for ID-only queries
+- Event result caching and seen-tracking
+
+### 5. **Error Handling & Observability**
+
+**Khatru:**
+- Basic error logging
+- Simple connection state management
+- Limited metrics and observability
+
+**Next.orly.dev:**
+- Comprehensive error handling with context preservation
+- Detailed logging at each processing stage
+- Built-in metrics (message count, REQ count, event count)
+- Graceful degradation on individual component failures
+
+## Memory Management
+
+### Khatru
+- Relies on Go's garbage collector
+- Simple WebSocket struct with minimal state
+- Uses sync.Map for thread-safe operations
+
+### Next.orly.dev
+- Explicit memory management with `ev.Free()` calls (sketched below)
+- Resource pooling and reuse patterns
+- Detailed tracking of connection resources
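+
+A minimal sketch of the explicit-release pattern, assuming a toy `Event` type with a `Free` hook in place of the relay's `event.E`:
+
+```go
+package main
+
+import "fmt"
+
+// Event is a toy stand-in for event.E; Free would return its buffer to a
+// pool in the real implementation.
+type Event struct{ buf []byte }
+
+func (e *Event) Free() { e.buf = nil }
+
+func collect() []*Event {
+	return []*Event{{buf: make([]byte, 256)}, {buf: make([]byte, 512)}}
+}
+
+func main() {
+	events := collect()
+	// Release every buffered event when the handler returns, mirroring the
+	// deferred ev.Free() loop in HandleReq.
+	defer func() {
+		for _, ev := range events {
+			ev.Free()
+		}
+	}()
+	for _, ev := range events {
+		fmt.Println("sending event, buffered bytes:", len(ev.buf))
+	}
+}
+```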
+
+## Concurrency Models
+
+### Khatru
+- Per-connection goroutine for message reading
+- Additional goroutines for each message processing
+- WaitGroup coordination for multi-filter EOSE
+
+### Next.orly.dev
+- Per-connection goroutine with single-threaded message processing
+- Publisher-subscriber system handles concurrent event distribution
+- Context-based cancellation throughout
+
+## Trade-offs Analysis
+
+### Khatru Advantages
+- **Simplicity**: Easier to understand and modify
+- **Performance**: Lower latency due to concurrent processing
+- **Flexibility**: Hook-based architecture allows extensive customization
+- **Streaming**: Events sent as soon as they're found
+
+### Khatru Disadvantages
+- **Monolithic**: Large methods harder to maintain
+- **Limited ACL**: Basic authentication and authorization
+- **Error handling**: Less graceful failure recovery
+- **Resource usage**: No explicit memory management
+
+### Next.orly.dev Advantages
+- **Security**: Comprehensive ACL and privacy features
+- **Observability**: Extensive logging and metrics
+- **Resource management**: Explicit memory and connection lifecycle management
+- **Modularity**: Easier to test and extend individual components
+- **Robustness**: Graceful handling of edge cases and failures
+
+### Next.orly.dev Disadvantages
+- **Complexity**: Higher cognitive overhead and learning curve
+- **Latency**: Sequential processing may be slower for some use cases
+- **Resource overhead**: More memory usage due to batching and state tracking
+- **Coupling**: Tighter integration between components
+
+## Conclusion
+
+Both implementations represent different philosophies:
+
+- **Khatru** prioritizes simplicity, performance, and extensibility through a hook-based architecture
+- **Next.orly.dev** prioritizes security, observability, and robustness through comprehensive built-in features
+
+The choice between them depends on specific requirements:
+- Choose **Khatru** for high-performance relays with custom business logic
+- Choose **Next.orly.dev** for production relays requiring comprehensive access control and monitoring
+
+Both approaches demonstrate mature understanding of Nostr protocol requirements while making different trade-offs in complexity vs. features.
\ No newline at end of file
diff --git a/pkg/version/version b/pkg/version/version
index 75dd3c5..a13c8b3 100644
--- a/pkg/version/version
+++ b/pkg/version/version
@@ -1 +1 @@
-v0.8.5
\ No newline at end of file
+v0.8.6
\ No newline at end of file
diff --git a/scripts/orly.service b/scripts/orly.service
old mode 100644
new mode 100755
diff --git a/scripts/run-market-and-orly.sh b/scripts/run-market-and-orly.sh
new file mode 100755
index 0000000..09dde4c
--- /dev/null
+++ b/scripts/run-market-and-orly.sh
@@ -0,0 +1,104 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+# run-market-and-orly.sh
+# Starts the ORLY relay with specified settings, then runs `bun dev:seed` in a
+# provided Market repository to observe how the app interacts with the relay.
+#
+# Usage:
+#   scripts/run-market-and-orly.sh /path/to/market
+#   MARKET_DIR=/path/to/market scripts/run-market-and-orly.sh
+#
+# Notes:
+# - This script removes /tmp/plebeian before starting the relay.
+# - The relay listens on 0.0.0.0:10547
+# - ORLY_ADMINS is intentionally empty and ACL is set to 'none'.
+# - Requires: go, bun, curl
+
+# ---------- Config ----------
+RELAY_HOST="127.0.0.1"
+RELAY_PORT="10547"
+RELAY_DATA_DIR="/tmp/plebeian"
+LOG_PREFIX="[relay]"
+WAIT_TIMEOUT="120" # seconds - increased for slow startup
+
+# ---------- Resolve repo root ----------
+SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)"
+REPO_ROOT="$(cd -- "${SCRIPT_DIR}/.." && pwd)"
+cd "${REPO_ROOT}"
+
+# ---------- Resolve Market directory ----------
+MARKET_DIR="${1:-${MARKET_DIR:-}}"
+if [[ -z "${MARKET_DIR}" ]]; then
+  echo "ERROR: Market repository directory not provided. Set MARKET_DIR env or pass as first arg." >&2
+  echo "Example: MARKET_DIR=$HOME/src/market scripts/run-market-and-orly.sh" >&2
+  exit 1
+fi
+if [[ ! -d "${MARKET_DIR}" ]]; then
+  echo "ERROR: MARKET_DIR does not exist: ${MARKET_DIR}" >&2
+  exit 1
+fi
+
+# ---------- Prerequisites ----------
+command -v go >/dev/null 2>&1 || { echo "ERROR: 'go' not found in PATH" >&2; exit 1; }
+command -v bun >/dev/null 2>&1 || { echo "ERROR: 'bun' not found in PATH. Install Bun: https://bun.sh" >&2; exit 1; }
+command -v curl >/dev/null 2>&1 || { echo "ERROR: 'curl' not found in PATH" >&2; exit 1; }
+
+# ---------- Cleanup handler ----------
+RELAY_PID=""
+cleanup() {
+  set +e
+  if [[ -n "${RELAY_PID}" ]]; then
+    echo "${LOG_PREFIX} stopping relay (pid=${RELAY_PID})" >&2
+    kill "${RELAY_PID}" 2>/dev/null || true
+    wait "${RELAY_PID}" 2>/dev/null || true
+  fi
+}
+trap cleanup EXIT INT TERM
+
+# ---------- Start relay ----------
+reset || true
+rm -rf "${RELAY_DATA_DIR}"
+
+# Run go relay in background with required environment variables
+(
+  export ORLY_LOG_LEVEL="trace"
+  export ORLY_LISTEN="0.0.0.0"
+  export ORLY_PORT="${RELAY_PORT}"
+  export ORLY_ADMINS=""
+  export ORLY_ACL_MODE="none"
+  export ORLY_DATA_DIR="${RELAY_DATA_DIR}"
+  # Important: run from repo root
+  cd "${REPO_ROOT}"
+  # Prefix relay logs so they are distinguishable
+  stdbuf -oL -eL go run . 2>&1 | sed -u "s/^/${LOG_PREFIX} /"
+) &
+RELAY_PID=$!
+echo "${LOG_PREFIX} started (pid=${RELAY_PID}), waiting for readiness on ${RELAY_HOST}:${RELAY_PORT} …"
+
+# ---------- Wait for readiness ----------
+start_ts=$(date +%s)
+while true; do
+  if curl -fsS "http://${RELAY_HOST}:${RELAY_PORT}/" >/dev/null 2>&1; then
+    break
+  fi
+  now=$(date +%s)
+  if (( now - start_ts > WAIT_TIMEOUT )); then
+    echo "ERROR: relay did not become ready within ${WAIT_TIMEOUT}s" >&2
+    exit 1
+  fi
+  sleep 1
+done
+echo "${LOG_PREFIX} ready. Running Market seeding…"
+
+# ---------- Run market seeding ----------
+(
+  cd "${MARKET_DIR}"
+  # Stream bun output with clear prefix
+  stdbuf -oL -eL bun dev:seed 2>&1 | sed -u 's/^/[market] /'
+)
+#
+## After seeding completes, keep the relay up briefly for inspection
+#echo "${LOG_PREFIX} seeding finished. Relay is still running for inspection. Press Ctrl+C to stop."
+## Wait indefinitely until interrupted, to allow observing relay logs/behavior
+#while true; do sleep 3600; done
diff --git a/scripts/run-market-probe.sh b/scripts/run-market-probe.sh
new file mode 100755
index 0000000..2881d73
--- /dev/null
+++ b/scripts/run-market-probe.sh
@@ -0,0 +1,242 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+# run-market-probe.sh
+# Starts the ORLY relay with relaxed ACL, then executes the Market repo's
+# scripts/startup.ts to publish seed events and finally runs a small NDK-based
+# fetcher to verify the events can be read back from the relay. The goal is to
+# print detailed logs to diagnose end-to-end publish/subscribe behavior.
+#
+# Usage:
+#   scripts/run-market-probe.sh /path/to/market
+#   MARKET_DIR=/path/to/market APP_PRIVATE_KEY=hex scripts/run-market-probe.sh
+#
+# Requirements:
+# - go, bun, curl
+# - Market repo available locally with scripts/startup.ts (see path above)
+#
+# Behavior:
+# - Clears relay data dir (/tmp/plebeian) each run
+# - Starts relay on 127.0.0.1:10547 with ORLY_ACL_MODE=none (no auth needed)
+# - Exports APP_RELAY_URL to ws://127.0.0.1:10547 for the Market startup.ts
+# - Runs Market's startup.ts to publish events (kinds 31990, 10002, 10000, 30000)
+# - Runs a temporary TypeScript fetcher using NDK to subscribe & log results
+#
+
+# ---------- Config ----------
+RELAY_HOST="127.0.0.1"
+RELAY_PORT="10547"
+RELAY_DATA_DIR="/tmp/plebeian"
+WAIT_TIMEOUT="120" # seconds - increased for slow startup
+RELAY_LOG_PREFIX="[relay]"
+MARKET_LOG_PREFIX="[market-seed]"
+FETCH_LOG_PREFIX="[fetch]"
+
+# ---------- Resolve repo root ----------
+SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)"
+REPO_ROOT="$(cd -- "${SCRIPT_DIR}/.." && pwd)"
+cd "${REPO_ROOT}"
+
+# ---------- Resolve Market directory and private key ----------
+MARKET_DIR=${1:-${MARKET_DIR:-}}
+APP_PRIVATE_KEY_INPUT=${2:-${APP_PRIVATE_KEY:-${NOSTR_SK:-}}}
+if [[ -z "${MARKET_DIR}" ]]; then
+  echo "ERROR: Market repository directory not provided. Set MARKET_DIR env or pass as first arg." >&2
+  echo "Example: MARKET_DIR=$HOME/src/github.com/PlebianApp/market scripts/run-market-probe.sh" >&2
+  exit 1
+fi
+if [[ ! -d "${MARKET_DIR}" ]]; then
+  echo "ERROR: MARKET_DIR does not exist: ${MARKET_DIR}" >&2
+  exit 1
+fi
+if [[ -z "${APP_PRIVATE_KEY_INPUT}" ]]; then
+  echo "ERROR: Private key not provided. Pass as 2nd arg or set APP_PRIVATE_KEY or NOSTR_SK env var." >&2
+  exit 1
+fi
+
+# ---------- Prerequisites ----------
+command -v go >/dev/null 2>&1 || { echo "ERROR: 'go' not found in PATH" >&2; exit 1; }
+command -v bun >/dev/null 2>&1 || { echo "ERROR: 'bun' not found in PATH. Install Bun: https://bun.sh" >&2; exit 1; }
+command -v curl >/dev/null 2>&1 || { echo "ERROR: 'curl' not found in PATH" >&2; exit 1; }
+
+# ---------- Cleanup handler ----------
+RELAY_PID=""
+TMP_FETCH_DIR=""
+TMP_FETCH_TS=""
+cleanup() {
+  set +e
+  if [[ -n "${RELAY_PID}" ]]; then
+    echo "${RELAY_LOG_PREFIX} stopping relay (pid=${RELAY_PID})" >&2
+    kill "${RELAY_PID}" 2>/dev/null || true
+    wait "${RELAY_PID}" 2>/dev/null || true
+  fi
+  if [[ -n "${TMP_FETCH_DIR}" && -d "${TMP_FETCH_DIR}" ]]; then
+    rm -rf "${TMP_FETCH_DIR}" || true
+  fi
+}
+trap cleanup EXIT INT TERM
+
+# ---------- Start relay ----------
+reset || true
+rm -rf "${RELAY_DATA_DIR}"
+(
+  export ORLY_LOG_LEVEL="trace"
+  export ORLY_LISTEN="0.0.0.0"
+  export ORLY_PORT="${RELAY_PORT}"
+  export ORLY_ADMINS=""       # ensure no admin ACL
+  export ORLY_ACL_MODE="none" # fully open for test
+  export ORLY_DATA_DIR="${RELAY_DATA_DIR}"
+  cd "${REPO_ROOT}"
+  stdbuf -oL -eL go run . 2>&1 | sed -u "s/^/${RELAY_LOG_PREFIX} /"
+) &
+RELAY_PID=$!
+echo "${RELAY_LOG_PREFIX} started (pid=${RELAY_PID}), waiting for readiness on ${RELAY_HOST}:${RELAY_PORT} …"
+
+# ---------- Wait for readiness ----------
+start_ts=$(date +%s)
+while true; do
+  if curl -fsS "http://${RELAY_HOST}:${RELAY_PORT}/" >/dev/null 2>&1; then
+    break
+  fi
+  now=$(date +%s)
+  if (( now - start_ts > WAIT_TIMEOUT )); then
+    echo "ERROR: relay did not become ready within ${WAIT_TIMEOUT}s" >&2
+    exit 1
+  fi
+  sleep 1
+done
+echo "${RELAY_LOG_PREFIX} ready. Starting Market publisher…"
+
+# ---------- Publish via Market's startup.ts ----------
+(
+  export APP_RELAY_URL="ws://${RELAY_HOST}:${RELAY_PORT}"
+  export APP_PRIVATE_KEY="${APP_PRIVATE_KEY_INPUT}"
+  cd "${MARKET_DIR}"
+  # Use bun to run the exact startup.ts the app uses. Expect its dependencies in Market repo.
+  echo "${MARKET_LOG_PREFIX} running scripts/startup.ts against ${APP_RELAY_URL} …"
+  stdbuf -oL -eL bun run scripts/startup.ts 2>&1 | sed -u "s/^/${MARKET_LOG_PREFIX} /"
+)
+
+# ---------- Prepare a temporary NDK fetcher workspace ----------
+TMP_FETCH_DIR=$(mktemp -d /tmp/ndk-fetch-XXXXXX)
+TMP_FETCH_TS="${TMP_FETCH_DIR}/probe.ts"
+
+# Write probe script
+cat >"${TMP_FETCH_TS}" <<'TS'
+import { config } from 'dotenv'
+config()
+
+const RELAY_URL = process.env.APP_RELAY_URL
+const APP_PRIVATE_KEY = process.env.APP_PRIVATE_KEY
+
+if (!RELAY_URL || !APP_PRIVATE_KEY) {
+  console.error('[fetch] Missing APP_RELAY_URL or APP_PRIVATE_KEY in env')
+  process.exit(2)
+}
+
+// Use NDK like startup.ts does
+import NDK, { NDKEvent, NDKPrivateKeySigner, NDKFilter } from '@nostr-dev-kit/ndk'
+
+const relay = RELAY_URL as string
+const privateKey = APP_PRIVATE_KEY as string
+
+async function main() {
+  console.log(`[fetch] initializing NDK -> ${relay}`)
+  const ndk = new NDK({ explicitRelayUrls: [relay] })
+  ndk.pool?.on('relay:connect', (r) => console.log('[fetch] relay connected:', r.url))
+  ndk.pool?.on('relay:disconnect', (r) => console.log('[fetch] relay disconnected:', r.url))
+  ndk.pool?.on('relay:notice', (r, msg) => console.log('[fetch] relay notice:', r.url, msg))
+
+  await ndk.connect(8000)
+  console.log('[fetch] connected')
+
+  // Setup signer and derive pubkey
+  const signer = new NDKPrivateKeySigner(privateKey)
+  ndk.signer = signer
+  await signer.blockUntilReady()
+  const pubkey = (await signer.user())?.pubkey
+  console.log('[fetch] signer pubkey:', pubkey)
+
+  // Subscribe to the kinds published by startup.ts authored by pubkey
+  const filters: NDKFilter[] = [
+    { kinds: [31990, 10002, 10000, 30000], authors: pubkey ? [pubkey] : undefined, since: Math.floor(Date.now()/1000) - 3600 },
+  ]
+  console.log('[fetch] subscribing with filters:', JSON.stringify(filters))
+
+  const sub = ndk.subscribe(filters, { closeOnEose: true })
+  let count = 0
+  const received: string[] = []
+
+  sub.on('event', (e: NDKEvent) => {
+    count++
+    received.push(`${e.kind}:${e.tagValue('d') || ''}:${e.id}`)
+    console.log('[fetch] EVENT kind=', e.kind, 'id=', e.id, 'tags=', e.tags)
+  })
+  sub.on('eose', () => {
+    console.log('[fetch] EOSE received; total events:', count)
+  })
+  sub.on('error', (err: any) => {
+    console.error('[fetch] subscription error:', err)
+  })
+
+  // Also try to fetch by kinds one by one to be verbose
+  const kinds = [31990, 10002, 10000, 30000]
+  for (const k of kinds) {
+    try {
+      const e = await ndk.fetchEvent({ kinds: [k], authors: pubkey ? [pubkey] : undefined }, { cacheUsage: 'ONLY_RELAY' })
+      if (e) {
+        console.log(`[fetch] fetchEvent kind=${k} -> id=${e.id}`)
+      } else {
+        console.log(`[fetch] fetchEvent kind=${k} -> not found`)
+      }
+    } catch (err) {
+      console.error(`[fetch] fetchEvent kind=${k} error`, err)
+    }
+  }
+
+  // Wait a bit to allow sub to drain
+  await new Promise((res) => setTimeout(res, 2000))
+  console.log('[fetch] received summary:', received)
+  // Note: NDK v2.14.x does not expose pool.close(); rely on closeOnEose and process exit
+}
+
+main().catch((e) => {
+  console.error('[fetch] fatal error:', e)
+  process.exit(3)
+})
+TS
+
+# Write minimal package.json to pin dependencies and satisfy NDK peer deps
+cat >"${TMP_FETCH_DIR}/package.json" <<'JSON'
+{
+  "name": "ndk-fetch-probe",
+  "version": "0.0.1",
+  "private": true,
+  "type": "module",
+  "dependencies": {
+    "@nostr-dev-kit/ndk": "^2.14.36",
+    "nostr-tools": "^2.7.0",
+    "dotenv": "^16.4.5"
+  }
+}
+JSON
+
+# ---------- Install probe dependencies explicitly (avoid Bun auto-install pitfalls) ----------
+(
+  cd "${TMP_FETCH_DIR}"
+  echo "${FETCH_LOG_PREFIX} installing probe deps (@nostr-dev-kit/ndk, nostr-tools, dotenv) …"
+  stdbuf -oL -eL bun install 2>&1 | sed -u "s/^/${FETCH_LOG_PREFIX} [install] /"
+)
+
+# ---------- Run the fetcher ----------
+(
+  export APP_RELAY_URL="ws://${RELAY_HOST}:${RELAY_PORT}"
+  export APP_PRIVATE_KEY="${APP_PRIVATE_KEY_INPUT}"
+  echo "${FETCH_LOG_PREFIX} running fetch probe against ${APP_RELAY_URL} …"
+  (
+    cd "${TMP_FETCH_DIR}"
+    stdbuf -oL -eL bun "${TMP_FETCH_TS}" 2>&1 | sed -u "s/^/${FETCH_LOG_PREFIX} /"
+  )
+)
+
+echo "[probe] Completed. Review logs above for publish/subscribe flow."
\ No newline at end of file
diff --git a/scripts/runtests.sh b/scripts/runtests.sh
old mode 100644
new mode 100755