From a4fc3d8d9b65a123cdbe34d0d81c3ebec7d888a3 Mon Sep 17 00:00:00 2001 From: mleku Date: Wed, 22 Oct 2025 22:24:21 +0100 Subject: [PATCH] Implement spider functionality for event synchronization - Introduced a new `spider` package to manage connections to admin relays and synchronize events for followed pubkeys. - Added configuration options for spider mode in the application settings, allowing for different operational modes (e.g., follows). - Implemented callback mechanisms to dynamically retrieve admin relays and follow lists. - Enhanced the main application to initialize and manage the spider, including starting and stopping its operation. - Added tests to validate spider creation, callbacks, and operational behavior. - Bumped version to v0.17.14. --- app/config/config.go | 3 + app/handle-message.go | 48 +++ app/handle-req.go | 6 + app/main.go | 50 +++ app/server.go | 2 + pkg/acl/follows.go | 181 +++++++++-- pkg/encoders/text/helpers.go | 13 +- pkg/spider/spider.go | 581 +++++++++++++++++++++++++++++++++++ pkg/spider/spider_test.go | 244 +++++++++++++++ pkg/version/version | 2 +- 10 files changed, 1109 insertions(+), 21 deletions(-) create mode 100644 pkg/spider/spider.go create mode 100644 pkg/spider/spider_test.go diff --git a/app/config/config.go b/app/config/config.go index 4f886db..d4780d1 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -59,6 +59,9 @@ type C struct { // Sprocket settings SprocketEnabled bool `env:"ORLY_SPROCKET_ENABLED" default:"false" usage:"enable sprocket event processing plugin system"` + // Spider settings + SpiderMode string `env:"ORLY_SPIDER_MODE" default:"none" usage:"spider mode for syncing events: none, follows"` + PolicyEnabled bool `env:"ORLY_POLICY_ENABLED" default:"false" usage:"enable policy-based event processing (configuration found in $HOME/.config/ORLY/policy.json)"` } diff --git a/app/handle-message.go b/app/handle-message.go index 90d85c8..1cc937c 100644 --- a/app/handle-message.go +++ b/app/handle-message.go @@ -3,6 +3,7 @@ package app import ( "fmt" "time" + "unicode" "lol.mleku.dev/chk" "lol.mleku.dev/log" @@ -15,6 +16,42 @@ import ( "next.orly.dev/pkg/encoders/envelopes/reqenvelope" ) +// validateJSONMessage checks if a message contains invalid control characters +// that would cause JSON parsing to fail +func validateJSONMessage(msg []byte) (err error) { + for i, b := range msg { + // Check for invalid control characters in JSON strings + if b < 32 && b != '\t' && b != '\n' && b != '\r' { + // Allow some control characters that might be valid in certain contexts + // but reject form feed (\f), backspace (\b), and other problematic ones + switch b { + case '\b', '\f', 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, + 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, + 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F: + return fmt.Errorf("invalid control character 0x%02X at position %d", b, i) + } + } + // Check for non-printable characters that might indicate binary data + if b > 127 && !unicode.IsPrint(rune(b)) { + // Allow valid UTF-8 sequences, but be suspicious of random binary data + if i < len(msg)-1 { + // Quick check: if we see a lot of high-bit characters in sequence, + // it might be binary data masquerading as text + highBitCount := 0 + for j := i; j < len(msg) && j < i+10; j++ { + if msg[j] > 127 { + highBitCount++ + } + } + if highBitCount > 7 { // More than 70% high-bit chars in a 10-byte window + return fmt.Errorf("suspicious binary data detected at position %d", i) + } + } + } + } + return +} + func (l *Listener) HandleMessage(msg []byte, remote string) { // Handle blacklisted IPs - discard messages but keep connection open until timeout if l.isBlacklisted { @@ -35,6 +72,17 @@ func (l *Listener) HandleMessage(msg []byte, remote string) { } // log.D.F("%s processing message (len=%d): %s", remote, len(msg), msgPreview) + // Validate message for invalid characters before processing + if err := validateJSONMessage(msg); err != nil { + log.E.F("%s message validation FAILED (len=%d): %v", remote, len(msg), err) + log.T.F("%s invalid message content: %q", remote, msgPreview) + // Send error notice to client + if noticeErr := noticeenvelope.NewFrom("invalid message format: " + err.Error()).Write(l); noticeErr != nil { + log.E.F("%s failed to send validation error notice: %v", remote, noticeErr) + } + return + } + l.msgCount++ var err error var t string diff --git a/app/handle-req.go b/app/handle-req.go index 0c9c3c7..95314cb 100644 --- a/app/handle-req.go +++ b/app/handle-req.go @@ -35,6 +35,12 @@ func (l *Listener) HandleReq(msg []byte) (err error) { // var rem []byte env := reqenvelope.New() if _, err = env.Unmarshal(msg); chk.E(err) { + // Provide more specific error context for JSON parsing failures + if strings.Contains(err.Error(), "invalid character") { + log.E.F("REQ JSON parsing failed from %s: %v", l.remote, err) + log.T.F("REQ malformed message from %s: %q", l.remote, string(msg)) + return normalize.Error.Errorf("malformed REQ message: %s", err.Error()) + } return normalize.Error.Errorf(err.Error()) } diff --git a/app/main.go b/app/main.go index 56eb661..5516f8f 100644 --- a/app/main.go +++ b/app/main.go @@ -10,11 +10,13 @@ import ( "lol.mleku.dev/chk" "lol.mleku.dev/log" "next.orly.dev/app/config" + "next.orly.dev/pkg/acl" "next.orly.dev/pkg/crypto/keys" "next.orly.dev/pkg/database" "next.orly.dev/pkg/encoders/bech32encoding" "next.orly.dev/pkg/policy" "next.orly.dev/pkg/protocol/publish" + "next.orly.dev/pkg/spider" ) func Run( @@ -69,6 +71,48 @@ func Run( // Initialize policy manager l.policyManager = policy.NewWithManager(ctx, cfg.AppName, cfg.PolicyEnabled) + + // Initialize spider manager based on mode + if cfg.SpiderMode != "none" { + if l.spiderManager, err = spider.New(ctx, db, l.publishers, cfg.SpiderMode); chk.E(err) { + log.E.F("failed to create spider manager: %v", err) + } else { + // Set up callbacks for follows mode + if cfg.SpiderMode == "follows" { + l.spiderManager.SetCallbacks( + func() []string { + // Get admin relays from follows ACL if available + for _, aclInstance := range acl.Registry.ACL { + if aclInstance.Type() == "follows" { + if follows, ok := aclInstance.(*acl.Follows); ok { + return follows.AdminRelays() + } + } + } + return nil + }, + func() [][]byte { + // Get followed pubkeys from follows ACL if available + for _, aclInstance := range acl.Registry.ACL { + if aclInstance.Type() == "follows" { + if follows, ok := aclInstance.(*acl.Follows); ok { + return follows.GetFollowedPubkeys() + } + } + } + return nil + }, + ) + } + + if err = l.spiderManager.Start(); chk.E(err) { + log.E.F("failed to start spider manager: %v", err) + } else { + log.I.F("spider manager started successfully in '%s' mode", cfg.SpiderMode) + } + } + } + // Initialize the user interface l.UserInterface() @@ -135,6 +179,12 @@ func Run( <-ctx.Done() log.I.F("shutting down HTTP server gracefully") + // Stop spider manager if running + if l.spiderManager != nil { + l.spiderManager.Stop() + log.I.F("spider manager stopped") + } + // Create shutdown context with timeout shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second) defer cancelShutdown() diff --git a/app/server.go b/app/server.go index 878714a..1e5d7df 100644 --- a/app/server.go +++ b/app/server.go @@ -26,6 +26,7 @@ import ( "next.orly.dev/pkg/protocol/auth" "next.orly.dev/pkg/protocol/httpauth" "next.orly.dev/pkg/protocol/publish" + "next.orly.dev/pkg/spider" ) type Server struct { @@ -47,6 +48,7 @@ type Server struct { paymentProcessor *PaymentProcessor sprocketManager *SprocketManager policyManager *policy.P + spiderManager *spider.Spider } // isIPBlacklisted checks if an IP address is blacklisted using the managed ACL system diff --git a/pkg/acl/follows.go b/pkg/acl/follows.go index cb7b762..2a01ba5 100644 --- a/pkg/acl/follows.go +++ b/pkg/acl/follows.go @@ -266,7 +266,7 @@ func (f *Follows) adminRelays() (urls []string) { // If no admin relays found, use bootstrap relays as fallback if len(urls) == 0 { - log.I.F("no admin relays found in DB, checking bootstrap relays") + log.I.F("no admin relays found in DB, checking bootstrap relays and failover relays") if len(f.cfg.BootstrapRelays) > 0 { log.I.F("using bootstrap relays: %v", f.cfg.BootstrapRelays) for _, relay := range f.cfg.BootstrapRelays { @@ -302,7 +302,53 @@ func (f *Follows) adminRelays() (urls []string) { urls = append(urls, n) } } else { - log.W.F("no bootstrap relays configured") + log.I.F("no bootstrap relays configured, using failover relays") + } + + // If still no relays found, use hardcoded failover relays + // These relays will be used to fetch admin relay lists (kind 10002) and store them + // in the database so they're found next time + if len(urls) == 0 { + failoverRelays := []string{ + "wss://nostr.land", + "wss://nostr.wine", + "wss://nos.lol", + "wss://relay.damus.io", + "wss://nostr.band", + } + log.I.F("using failover relays: %v", failoverRelays) + for _, relay := range failoverRelays { + n := string(normalize.URL(relay)) + if n == "" { + log.W.F("invalid failover relay URL: %s", relay) + continue + } + // Skip if this URL is one of our configured self relay addresses or hosts + if _, isSelf := selfSet[n]; isSelf { + log.D.F("follows syncer: skipping configured self relay address: %s", n) + continue + } + // Host match + host := n + if i := strings.Index(host, "://"); i >= 0 { + host = host[i+3:] + } + if j := strings.Index(host, "/"); j >= 0 { + host = host[:j] + } + if k := strings.Index(host, ":"); k >= 0 { + host = host[:k] + } + if _, isSelfHost := selfHosts[host]; isSelfHost { + log.D.F("follows syncer: skipping configured self relay address: %s", n) + continue + } + if _, ok := seen[n]; ok { + continue + } + seen[n] = struct{}{} + urls = append(urls, n) + } } } @@ -451,6 +497,7 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) { keepaliveTicker := time.NewTicker(30 * time.Second) defer keepaliveTicker.Stop() + readLoop: for { select { case <-ctx.Done(): @@ -460,7 +507,7 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) { // Send ping to keep connection alive if err := c.Ping(ctx); err != nil { log.T.F("follows syncer: ping failed for %s: %v", u, err) - break + break readLoop } log.T.F("follows syncer: sent ping to %s", u) continue @@ -471,7 +518,7 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) { readCancel() if err != nil { _ = c.Close(websocket.StatusNormalClosure, "read err") - break + break readLoop } label, rem, err := envelopes.Identify(data) if chk.E(err) { @@ -634,7 +681,7 @@ func (f *Follows) fetchAdminFollowLists() { urls := f.adminRelays() if len(urls) == 0 { - log.W.F("follows syncer: no admin relays found for follow list fetching") + log.W.F("follows syncer: no relays available for follow list fetching (no admin relays, bootstrap relays, or failover relays)") return } @@ -680,14 +727,19 @@ func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) { log.I.F("follows syncer: fetching follow lists from relay %s", relayURL) - // Create filter for follow lists only (kind 3) + // Create filter for follow lists and relay lists (kind 3 and kind 10002) ff := &filter.S{} f1 := &filter.F{ Authors: tag.NewFromBytesSlice(authors...), Kinds: kind.NewS(kind.New(kind.FollowList.K)), Limit: values.ToUintPointer(100), } - *ff = append(*ff, f1) + f2 := &filter.F{ + Authors: tag.NewFromBytesSlice(authors...), + Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)), + Limit: values.ToUintPointer(100), + } + *ff = append(*ff, f1, f2) // Use a specific subscription ID for follow list fetching subID := "follow-lists-fetch" @@ -699,24 +751,28 @@ func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) { return } - log.T.F("follows syncer: sent follow list REQ to %s", relayURL) + log.T.F("follows syncer: sent follow list and relay list REQ to %s", relayURL) - // Read follow list events with timeout + // Collect all events before processing + var followListEvents []*event.E + var relayListEvents []*event.E + + // Read events with timeout timeout := time.After(10 * time.Second) for { select { case <-ctx.Done(): - return + goto processEvents case <-timeout: - log.T.F("follows syncer: timeout reading follow lists from %s", relayURL) - return + log.T.F("follows syncer: timeout reading events from %s", relayURL) + goto processEvents default: } _, data, err := c.Read(ctx) if err != nil { - log.T.F("follows syncer: error reading follow lists from %s: %v", relayURL, err) - return + log.T.F("follows syncer: error reading events from %s: %v", relayURL, err) + goto processEvents } label, rem, err := envelopes.Identify(data) @@ -731,19 +787,101 @@ func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) { continue } - // Process follow list events - if res.Event.Kind == kind.FollowList.K { + // Collect events by kind + switch res.Event.Kind { + case kind.FollowList.K: log.I.F("follows syncer: received follow list from %s on relay %s", hex.EncodeToString(res.Event.Pubkey), relayURL) - f.extractFollowedPubkeys(res.Event) + followListEvents = append(followListEvents, res.Event) + case kind.RelayListMetadata.K: + log.I.F("follows syncer: received relay list from %s on relay %s", + hex.EncodeToString(res.Event.Pubkey), relayURL) + relayListEvents = append(relayListEvents, res.Event) } case eoseenvelope.L: - log.T.F("follows syncer: end of follow list events from %s", relayURL) - return + log.T.F("follows syncer: end of events from %s", relayURL) + goto processEvents default: // ignore other labels } } + +processEvents: + // Process collected events - keep only the newest per pubkey and save to database + f.processCollectedEvents(relayURL, followListEvents, relayListEvents) +} + +// processCollectedEvents processes the collected events, keeping only the newest per pubkey +func (f *Follows) processCollectedEvents(relayURL string, followListEvents, relayListEvents []*event.E) { + // Process follow list events (kind 3) - keep newest per pubkey + latestFollowLists := make(map[string]*event.E) + for _, ev := range followListEvents { + pubkeyHex := hex.EncodeToString(ev.Pubkey) + existing, exists := latestFollowLists[pubkeyHex] + if !exists || ev.CreatedAt > existing.CreatedAt { + latestFollowLists[pubkeyHex] = ev + } + } + + // Process relay list events (kind 10002) - keep newest per pubkey + latestRelayLists := make(map[string]*event.E) + for _, ev := range relayListEvents { + pubkeyHex := hex.EncodeToString(ev.Pubkey) + existing, exists := latestRelayLists[pubkeyHex] + if !exists || ev.CreatedAt > existing.CreatedAt { + latestRelayLists[pubkeyHex] = ev + } + } + + // Save and process the newest events + savedFollowLists := 0 + savedRelayLists := 0 + + // Save follow list events to database and extract follows + for pubkeyHex, ev := range latestFollowLists { + if _, err := f.D.SaveEvent(f.Ctx, ev); err != nil { + if !strings.HasPrefix(err.Error(), "blocked:") { + log.W.F("follows syncer: failed to save follow list from %s: %v", pubkeyHex, err) + } + } else { + savedFollowLists++ + log.I.F("follows syncer: saved newest follow list from %s (created_at: %d) from relay %s", + pubkeyHex, ev.CreatedAt, relayURL) + } + + // Extract followed pubkeys from admin follow lists + if f.isAdminPubkey(ev.Pubkey) { + log.I.F("follows syncer: processing admin follow list from %s", pubkeyHex) + f.extractFollowedPubkeys(ev) + } + } + + // Save relay list events to database + for pubkeyHex, ev := range latestRelayLists { + if _, err := f.D.SaveEvent(f.Ctx, ev); err != nil { + if !strings.HasPrefix(err.Error(), "blocked:") { + log.W.F("follows syncer: failed to save relay list from %s: %v", pubkeyHex, err) + } + } else { + savedRelayLists++ + log.I.F("follows syncer: saved newest relay list from %s (created_at: %d) from relay %s", + pubkeyHex, ev.CreatedAt, relayURL) + } + } + + log.I.F("follows syncer: processed %d follow lists and %d relay lists from %s, saved %d follow lists and %d relay lists", + len(followListEvents), len(relayListEvents), relayURL, savedFollowLists, savedRelayLists) + + // If we saved any relay lists, trigger a refresh of subscriptions to use the new relay lists + if savedRelayLists > 0 { + log.I.F("follows syncer: saved new relay lists, triggering subscription refresh") + // Signal that follows have been updated to refresh subscriptions + select { + case f.updated <- struct{}{}: + default: + // Channel might be full, that's okay + } + } } // GetFollowedPubkeys returns a copy of the followed pubkeys list @@ -783,6 +921,11 @@ func (f *Follows) extractFollowedPubkeys(event *event.E) { } } +// AdminRelays returns the admin relay URLs +func (f *Follows) AdminRelays() []string { + return f.adminRelays() +} + // AddFollow appends a pubkey to the in-memory follows list if not already present // and signals the syncer to refresh subscriptions. func (f *Follows) AddFollow(pub []byte) { diff --git a/pkg/encoders/text/helpers.go b/pkg/encoders/text/helpers.go index 6199b9c..ffc97d0 100644 --- a/pkg/encoders/text/helpers.go +++ b/pkg/encoders/text/helpers.go @@ -114,9 +114,20 @@ func UnmarshalQuoted(b []byte) (content, rem []byte, err error) { // // backspace, tab, newline, form feed or carriage return. case '\b', '\t', '\n', '\f', '\r': + pos := len(content) - len(rem) + contextStart := pos - 10 + if contextStart < 0 { + contextStart = 0 + } + contextEnd := pos + 10 + if contextEnd > len(content) { + contextEnd = len(content) + } err = errorf.E( - "invalid character '%s' in quoted string", + "invalid character '%s' in quoted string (position %d, context: %q)", NostrEscape(nil, rem[:1]), + pos, + string(content[contextStart:contextEnd]), ) return } diff --git a/pkg/spider/spider.go b/pkg/spider/spider.go new file mode 100644 index 0000000..ac33c13 --- /dev/null +++ b/pkg/spider/spider.go @@ -0,0 +1,581 @@ +package spider + +import ( + "context" + "encoding/hex" + "fmt" + "sync" + "time" + + "lol.mleku.dev/chk" + "lol.mleku.dev/errorf" + "lol.mleku.dev/log" + "next.orly.dev/pkg/database" + "next.orly.dev/pkg/encoders/filter" + "next.orly.dev/pkg/encoders/tag" + "next.orly.dev/pkg/encoders/timestamp" + "next.orly.dev/pkg/interfaces/publisher" + "next.orly.dev/pkg/protocol/ws" +) + +const ( + // BatchSize is the number of pubkeys per subscription batch + BatchSize = 20 + // CatchupWindow is the extra time added to disconnection periods for catch-up + CatchupWindow = 30 * time.Minute + // ReconnectDelay is the delay between reconnection attempts + ReconnectDelay = 5 * time.Second + // MaxReconnectDelay is the maximum delay between reconnection attempts + MaxReconnectDelay = 5 * time.Minute +) + +// Spider manages connections to admin relays and syncs events for followed pubkeys +type Spider struct { + ctx context.Context + cancel context.CancelFunc + db *database.D + pub publisher.I + mode string + + // Configuration + adminRelays []string + followList [][]byte + + // State management + mu sync.RWMutex + connections map[string]*RelayConnection + running bool + + // Callbacks for getting updated data + getAdminRelays func() []string + getFollowList func() [][]byte +} + +// RelayConnection manages a single relay connection and its subscriptions +type RelayConnection struct { + url string + client *ws.Client + ctx context.Context + cancel context.CancelFunc + spider *Spider + + // Subscription management + mu sync.RWMutex + subscriptions map[string]*BatchSubscription + + // Disconnection tracking + lastDisconnect time.Time + reconnectDelay time.Duration +} + +// BatchSubscription represents a subscription for a batch of pubkeys +type BatchSubscription struct { + id string + pubkeys [][]byte + startTime time.Time + sub *ws.Subscription + relay *RelayConnection + + // Track disconnection periods for catch-up + disconnectedAt *time.Time +} + +// DisconnectionPeriod tracks when a subscription was disconnected +type DisconnectionPeriod struct { + Start time.Time + End time.Time +} + +// New creates a new Spider instance +func New(ctx context.Context, db *database.D, pub publisher.I, mode string) (s *Spider, err error) { + if db == nil { + err = errorf.E("database cannot be nil") + return + } + + // Validate mode + switch mode { + case "follows", "none": + // Valid modes + default: + err = errorf.E("invalid spider mode: %s (valid modes: none, follows)", mode) + return + } + + ctx, cancel := context.WithCancel(ctx) + s = &Spider{ + ctx: ctx, + cancel: cancel, + db: db, + pub: pub, + mode: mode, + connections: make(map[string]*RelayConnection), + } + + return +} + +// SetCallbacks sets the callback functions for getting updated admin relays and follow lists +func (s *Spider) SetCallbacks(getAdminRelays func() []string, getFollowList func() [][]byte) { + s.mu.Lock() + defer s.mu.Unlock() + s.getAdminRelays = getAdminRelays + s.getFollowList = getFollowList +} + +// Start begins the spider operation +func (s *Spider) Start() (err error) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.running { + err = errorf.E("spider already running") + return + } + + // Handle 'none' mode - no-op + if s.mode == "none" { + log.I.F("spider: mode is 'none', not starting") + return + } + + if s.getAdminRelays == nil || s.getFollowList == nil { + err = errorf.E("callbacks must be set before starting") + return + } + + s.running = true + + // Start the main loop + go s.mainLoop() + + log.I.F("spider: started in '%s' mode", s.mode) + return +} + +// Stop stops the spider operation +func (s *Spider) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.running { + return + } + + s.running = false + s.cancel() + + // Close all connections + for _, conn := range s.connections { + conn.close() + } + s.connections = make(map[string]*RelayConnection) + + log.I.F("spider: stopped") +} + +// mainLoop is the main spider loop that manages connections and subscriptions +func (s *Spider) mainLoop() { + ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds + defer ticker.Stop() + + for { + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + s.updateConnections() + } + } +} + +// updateConnections updates relay connections based on current admin relays and follow lists +func (s *Spider) updateConnections() { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.running { + return + } + + // Get current admin relays and follow list + adminRelays := s.getAdminRelays() + followList := s.getFollowList() + + if len(adminRelays) == 0 || len(followList) == 0 { + log.D.F("spider: no admin relays (%d) or follow list (%d) available", + len(adminRelays), len(followList)) + return + } + + // Update connections for current admin relays + currentRelays := make(map[string]bool) + for _, url := range adminRelays { + currentRelays[url] = true + + if conn, exists := s.connections[url]; exists { + // Update existing connection + conn.updateSubscriptions(followList) + } else { + // Create new connection + s.createConnection(url, followList) + } + } + + // Remove connections for relays no longer in admin list + for url, conn := range s.connections { + if !currentRelays[url] { + log.I.F("spider: removing connection to %s (no longer in admin relays)", url) + conn.close() + delete(s.connections, url) + } + } +} + +// createConnection creates a new relay connection +func (s *Spider) createConnection(url string, followList [][]byte) { + log.I.F("spider: creating connection to %s", url) + + ctx, cancel := context.WithCancel(s.ctx) + conn := &RelayConnection{ + url: url, + ctx: ctx, + cancel: cancel, + spider: s, + subscriptions: make(map[string]*BatchSubscription), + reconnectDelay: ReconnectDelay, + } + + s.connections[url] = conn + + // Start connection in goroutine + go conn.manage(followList) +} + +// manage handles the lifecycle of a relay connection +func (rc *RelayConnection) manage(followList [][]byte) { + for { + select { + case <-rc.ctx.Done(): + return + default: + } + + // Attempt to connect + if err := rc.connect(); chk.E(err) { + log.W.F("spider: failed to connect to %s: %v", rc.url, err) + rc.waitBeforeReconnect() + continue + } + + log.I.F("spider: connected to %s", rc.url) + rc.reconnectDelay = ReconnectDelay // Reset delay on successful connection + + // Create subscriptions for follow list + rc.createSubscriptions(followList) + + // Wait for disconnection + <-rc.client.Context().Done() + + log.W.F("spider: disconnected from %s: %v", rc.url, rc.client.ConnectionCause()) + rc.handleDisconnection() + + // Clean up + rc.client = nil + rc.clearSubscriptions() + } +} + +// connect establishes a websocket connection to the relay +func (rc *RelayConnection) connect() (err error) { + connectCtx, cancel := context.WithTimeout(rc.ctx, 10*time.Second) + defer cancel() + + if rc.client, err = ws.RelayConnect(connectCtx, rc.url); chk.E(err) { + return + } + + return +} + +// waitBeforeReconnect waits before attempting to reconnect with exponential backoff +func (rc *RelayConnection) waitBeforeReconnect() { + select { + case <-rc.ctx.Done(): + return + case <-time.After(rc.reconnectDelay): + } + + // Exponential backoff + rc.reconnectDelay *= 2 + if rc.reconnectDelay > MaxReconnectDelay { + rc.reconnectDelay = MaxReconnectDelay + } +} + +// handleDisconnection records disconnection time for catch-up logic +func (rc *RelayConnection) handleDisconnection() { + now := time.Now() + rc.lastDisconnect = now + + // Mark all subscriptions as disconnected + rc.mu.Lock() + defer rc.mu.Unlock() + + for _, sub := range rc.subscriptions { + if sub.disconnectedAt == nil { + sub.disconnectedAt = &now + } + } +} + +// createSubscriptions creates batch subscriptions for the follow list +func (rc *RelayConnection) createSubscriptions(followList [][]byte) { + rc.mu.Lock() + defer rc.mu.Unlock() + + // Clear existing subscriptions + rc.clearSubscriptionsLocked() + + // Create batches of pubkeys + batches := rc.createBatches(followList) + + log.I.F("spider: creating %d subscription batches for %d pubkeys on %s", + len(batches), len(followList), rc.url) + + for i, batch := range batches { + batchID := fmt.Sprintf("batch-%d", i) // Simple batch ID + rc.createBatchSubscription(batchID, batch) + } +} + +// createBatches splits the follow list into batches of BatchSize +func (rc *RelayConnection) createBatches(followList [][]byte) (batches [][][]byte) { + for i := 0; i < len(followList); i += BatchSize { + end := i + BatchSize + if end > len(followList) { + end = len(followList) + } + + batch := make([][]byte, end-i) + copy(batch, followList[i:end]) + batches = append(batches, batch) + } + return +} + +// createBatchSubscription creates a subscription for a batch of pubkeys +func (rc *RelayConnection) createBatchSubscription(batchID string, pubkeys [][]byte) { + if rc.client == nil { + return + } + + // Create filters: one for authors, one for p tags + var pTags tag.S + for _, pk := range pubkeys { + pTags = append(pTags, tag.NewFromAny("p", pk)) + } + + filters := filter.NewS( + &filter.F{ + Authors: tag.NewFromBytesSlice(pubkeys...), + }, + &filter.F{ + Tags: tag.NewS(pTags...), + }, + ) + + // Subscribe + sub, err := rc.client.Subscribe(rc.ctx, filters) + if chk.E(err) { + log.E.F("spider: failed to create subscription %s on %s: %v", batchID, rc.url, err) + return + } + + batchSub := &BatchSubscription{ + id: batchID, + pubkeys: pubkeys, + startTime: time.Now(), + sub: sub, + relay: rc, + } + + rc.subscriptions[batchID] = batchSub + + // Start event handler + go batchSub.handleEvents() + + log.D.F("spider: created subscription %s for %d pubkeys on %s", + batchID, len(pubkeys), rc.url) +} + +// handleEvents processes events from the subscription +func (bs *BatchSubscription) handleEvents() { + for { + select { + case <-bs.relay.ctx.Done(): + return + case ev := <-bs.sub.Events: + if ev == nil { + return // Subscription closed + } + + // Save event to database + if _, err := bs.relay.spider.db.SaveEvent(bs.relay.ctx, ev); err != nil { + if !chk.E(err) { + log.T.F("spider: saved event %s from %s", + hex.EncodeToString(ev.ID[:]), bs.relay.url) + } + } else { + // Publish event if it was newly saved + if bs.relay.spider.pub != nil { + go bs.relay.spider.pub.Deliver(ev) + } + } + } + } +} + +// updateSubscriptions updates subscriptions for a connection with new follow list +func (rc *RelayConnection) updateSubscriptions(followList [][]byte) { + if rc.client == nil || !rc.client.IsConnected() { + return // Will be handled on reconnection + } + + rc.mu.Lock() + defer rc.mu.Unlock() + + // Check if we need to perform catch-up for disconnected subscriptions + now := time.Now() + needsCatchup := false + + for _, sub := range rc.subscriptions { + if sub.disconnectedAt != nil { + needsCatchup = true + rc.performCatchup(sub, *sub.disconnectedAt, now, followList) + sub.disconnectedAt = nil // Clear disconnection marker + } + } + + if needsCatchup { + log.I.F("spider: performed catch-up for disconnected subscriptions on %s", rc.url) + } + + // Recreate subscriptions with updated follow list + rc.clearSubscriptionsLocked() + + batches := rc.createBatches(followList) + for i, batch := range batches { + batchID := fmt.Sprintf("batch-%d", i) + rc.createBatchSubscription(batchID, batch) + } +} + +// performCatchup queries for events missed during disconnection +func (rc *RelayConnection) performCatchup(sub *BatchSubscription, disconnectTime, reconnectTime time.Time, followList [][]byte) { + // Expand time window by CatchupWindow on both sides + since := disconnectTime.Add(-CatchupWindow) + until := reconnectTime.Add(CatchupWindow) + + log.I.F("spider: performing catch-up for %s from %v to %v (expanded window)", + rc.url, since, until) + + // Create catch-up filters with time constraints + sinceTs := timestamp.T{V: since.Unix()} + untilTs := timestamp.T{V: until.Unix()} + + var pTags tag.S + for _, pk := range sub.pubkeys { + pTags = append(pTags, tag.NewFromAny("p", pk)) + } + + filters := filter.NewS( + &filter.F{ + Authors: tag.NewFromBytesSlice(sub.pubkeys...), + Since: &sinceTs, + Until: &untilTs, + }, + &filter.F{ + Tags: tag.NewS(pTags...), + Since: &sinceTs, + Until: &untilTs, + }, + ) + + // Create temporary subscription for catch-up + catchupCtx, cancel := context.WithTimeout(rc.ctx, 30*time.Second) + defer cancel() + + catchupSub, err := rc.client.Subscribe(catchupCtx, filters) + if chk.E(err) { + log.E.F("spider: failed to create catch-up subscription on %s: %v", rc.url, err) + return + } + defer catchupSub.Unsub() + + // Process catch-up events + eventCount := 0 + timeout := time.After(30 * time.Second) + + for { + select { + case <-catchupCtx.Done(): + log.D.F("spider: catch-up completed on %s, processed %d events", rc.url, eventCount) + return + case <-timeout: + log.D.F("spider: catch-up timeout on %s, processed %d events", rc.url, eventCount) + return + case <-catchupSub.EndOfStoredEvents: + log.D.F("spider: catch-up EOSE on %s, processed %d events", rc.url, eventCount) + return + case ev := <-catchupSub.Events: + if ev == nil { + return + } + + eventCount++ + + // Save event to database + if _, err := rc.spider.db.SaveEvent(rc.ctx, ev); err != nil { + if !chk.E(err) { + log.T.F("spider: catch-up saved event %s from %s", + hex.EncodeToString(ev.ID[:]), rc.url) + } + } else { + // Publish event if it was newly saved + if rc.spider.pub != nil { + go rc.spider.pub.Deliver(ev) + } + } + } + } +} + +// clearSubscriptions clears all subscriptions (with lock) +func (rc *RelayConnection) clearSubscriptions() { + rc.mu.Lock() + defer rc.mu.Unlock() + rc.clearSubscriptionsLocked() +} + +// clearSubscriptionsLocked clears all subscriptions (without lock) +func (rc *RelayConnection) clearSubscriptionsLocked() { + for _, sub := range rc.subscriptions { + if sub.sub != nil { + sub.sub.Unsub() + } + } + rc.subscriptions = make(map[string]*BatchSubscription) +} + +// close closes the relay connection +func (rc *RelayConnection) close() { + rc.clearSubscriptions() + + if rc.client != nil { + rc.client.Close() + rc.client = nil + } + + rc.cancel() +} diff --git a/pkg/spider/spider_test.go b/pkg/spider/spider_test.go new file mode 100644 index 0000000..58b04b1 --- /dev/null +++ b/pkg/spider/spider_test.go @@ -0,0 +1,244 @@ +package spider + +import ( + "context" + "os" + "testing" + "time" + + "next.orly.dev/pkg/database" +) + +func TestSpiderCreation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a temporary database for testing + tempDir, err := os.MkdirTemp("", "spider-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + db, err := database.New(ctx, cancel, tempDir, "error") + if err != nil { + t.Fatalf("Failed to create test database: %v", err) + } + defer db.Close() + + // Test spider creation + spider, err := New(ctx, db, nil, "follows") + if err != nil { + t.Fatalf("Failed to create spider: %v", err) + } + + if spider == nil { + t.Fatal("Spider is nil") + } + + // Test that spider is not running initially + spider.mu.RLock() + running := spider.running + spider.mu.RUnlock() + + if running { + t.Error("Spider should not be running initially") + } +} + +func TestSpiderCallbacks(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a temporary database for testing + tempDir, err := os.MkdirTemp("", "spider-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + db, err := database.New(ctx, cancel, tempDir, "error") + if err != nil { + t.Fatalf("Failed to create test database: %v", err) + } + defer db.Close() + + spider, err := New(ctx, db, nil, "follows") + if err != nil { + t.Fatalf("Failed to create spider: %v", err) + } + + // Test callback setup + testRelays := []string{"wss://relay1.example.com", "wss://relay2.example.com"} + testPubkeys := [][]byte{{1, 2, 3}, {4, 5, 6}} + + spider.SetCallbacks( + func() []string { return testRelays }, + func() [][]byte { return testPubkeys }, + ) + + // Verify callbacks are set + spider.mu.RLock() + hasCallbacks := spider.getAdminRelays != nil && spider.getFollowList != nil + spider.mu.RUnlock() + + if !hasCallbacks { + t.Error("Callbacks should be set") + } + + // Test that start fails without callbacks being set first + spider2, err := New(ctx, db, nil, "follows") + if err != nil { + t.Fatalf("Failed to create second spider: %v", err) + } + + err = spider2.Start() + if err == nil { + t.Error("Start should fail when callbacks are not set") + } +} + +func TestSpiderModeValidation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a temporary database for testing + tempDir, err := os.MkdirTemp("", "spider-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + db, err := database.New(ctx, cancel, tempDir, "error") + if err != nil { + t.Fatalf("Failed to create test database: %v", err) + } + defer db.Close() + + // Test valid mode + spider, err := New(ctx, db, nil, "follows") + if err != nil { + t.Fatalf("Failed to create spider with valid mode: %v", err) + } + if spider == nil { + t.Fatal("Spider should not be nil for valid mode") + } + + // Test invalid mode + _, err = New(ctx, db, nil, "invalid") + if err == nil { + t.Error("Should fail with invalid mode") + } + + // Test none mode (should succeed but be a no-op) + spider2, err := New(ctx, db, nil, "none") + if err != nil { + t.Errorf("Should succeed with 'none' mode: %v", err) + } + if spider2 == nil { + t.Error("Spider should not be nil for 'none' mode") + } + + // Test that 'none' mode doesn't require callbacks + err = spider2.Start() + if err != nil { + t.Errorf("'none' mode should start without callbacks: %v", err) + } +} + +func TestSpiderBatching(t *testing.T) { + // Test batch creation logic + followList := make([][]byte, 50) // 50 pubkeys + for i := range followList { + followList[i] = make([]byte, 32) + for j := range followList[i] { + followList[i][j] = byte(i) + } + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + rc := &RelayConnection{ + url: "wss://test.relay.com", + ctx: ctx, + } + + batches := rc.createBatches(followList) + + // Should create 3 batches: 20, 20, 10 + expectedBatches := 3 + if len(batches) != expectedBatches { + t.Errorf("Expected %d batches, got %d", expectedBatches, len(batches)) + } + + // Check batch sizes + if len(batches[0]) != BatchSize { + t.Errorf("First batch should have %d pubkeys, got %d", BatchSize, len(batches[0])) + } + if len(batches[1]) != BatchSize { + t.Errorf("Second batch should have %d pubkeys, got %d", BatchSize, len(batches[1])) + } + if len(batches[2]) != 10 { + t.Errorf("Third batch should have 10 pubkeys, got %d", len(batches[2])) + } +} + +func TestSpiderStartStop(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a temporary database for testing + tempDir, err := os.MkdirTemp("", "spider-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + db, err := database.New(ctx, cancel, tempDir, "error") + if err != nil { + t.Fatalf("Failed to create test database: %v", err) + } + defer db.Close() + + spider, err := New(ctx, db, nil, "follows") + if err != nil { + t.Fatalf("Failed to create spider: %v", err) + } + + // Set up callbacks + spider.SetCallbacks( + func() []string { return []string{"wss://test.relay.com"} }, + func() [][]byte { return [][]byte{{1, 2, 3}} }, + ) + + // Test start + err = spider.Start() + if err != nil { + t.Fatalf("Failed to start spider: %v", err) + } + + // Verify spider is running + spider.mu.RLock() + running := spider.running + spider.mu.RUnlock() + + if !running { + t.Error("Spider should be running after start") + } + + // Test stop + spider.Stop() + + // Give it a moment to stop + time.Sleep(100 * time.Millisecond) + + // Verify spider is stopped + spider.mu.RLock() + running = spider.running + spider.mu.RUnlock() + + if running { + t.Error("Spider should not be running after stop") + } +} diff --git a/pkg/version/version b/pkg/version/version index adcdd54..6fa3e08 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.17.12 \ No newline at end of file +v0.17.14 \ No newline at end of file