diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 6385d7d..d809688 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -57,7 +57,9 @@ "Bash(./build.sh)", "Bash(./pkg/wasm/shell/run.sh:*)", "Bash(./run.sh echo.wasm)", - "Bash(./test.sh)" + "Bash(./test.sh)", + "Bash(ORLY_PPROF=cpu ORLY_LOG_LEVEL=info ORLY_LISTEN=0.0.0.0 ORLY_PORT=3334 ORLY_ADMINS=npub1fjqqy4a93z5zsjwsfxqhc2764kvykfdyttvldkkkdera8dr78vhsmmleku ORLY_OWNERS=npub1fjqqy4a93z5zsjwsfxqhc2764kvykfdyttvldkkkdera8dr78vhsmmleku ORLY_ACL_MODE=follows ORLY_SPIDER_MODE=follows timeout 120 go run:*)", + "Bash(go tool pprof:*)" ], "deny": [], "ask": [] diff --git a/app/main.go b/app/main.go index b4b5f82..f94b624 100644 --- a/app/main.go +++ b/app/main.go @@ -122,6 +122,21 @@ func Run( log.E.F("failed to start spider manager: %v", err) } else { log.I.F("spider manager started successfully in '%s' mode", cfg.SpiderMode) + + // Hook up follow list update notifications from ACL to spider + if cfg.SpiderMode == "follows" { + for _, aclInstance := range acl.Registry.ACL { + if aclInstance.Type() == "follows" { + if follows, ok := aclInstance.(*acl.Follows); ok { + follows.SetFollowListUpdateCallback(func() { + log.I.F("follow list updated, notifying spider") + l.spiderManager.NotifyFollowListUpdate() + }) + log.I.F("spider: follow list update notifications configured") + } + } + } + } } } } diff --git a/cmd/find/main.go b/cmd/FIND/main.go similarity index 100% rename from cmd/find/main.go rename to cmd/FIND/main.go diff --git a/contrib/stella/APACHE-PROXY-GUIDE.md b/contrib/stella/APACHE-PROXY-GUIDE.md index 5e91457..fedb1d6 100644 --- a/contrib/stella/APACHE-PROXY-GUIDE.md +++ b/contrib/stella/APACHE-PROXY-GUIDE.md @@ -27,7 +27,7 @@ docker run -d \ -v /data/orly-relay:/data \ -e ORLY_OWNERS=npub1v30tsz9vw6ylpz63g0a702nj3xa26t3m7p5us8f2y2sd8v6cnsvq465zjx \ -e ORLY_ADMINS=npub1v30tsz9vw6ylpz63g0a702nj3xa26t3m7p5us8f2y2sd8v6cnsvq465zjx,npub1l5sga6xg72phsz5422ykujprejwud075ggrr3z2hwyrfgr7eylqstegx9z,npub1m4ny6hjqzepn4rxknuq94c2gpqzr29ufkkw7ttcxyak7v43n6vvsajc2jl \ - -e ORLY_BOOTSTRAP_RELAYS=wss://profiles.nostr1.com,wss://purplepag.es,wss://relay.nostr.band,wss://relay.damus.io \ + -e ORLY_BOOTSTRAP_RELAYS=wss://profiles.nostr1.com,wss://purplepag.es,wss://relay.damus.io \ -e ORLY_RELAY_URL=wss://orly-relay.imwald.eu \ -e ORLY_ACL_MODE=follows \ -e ORLY_SUBSCRIPTION_ENABLED=false \ diff --git a/contrib/stella/docker-compose.yml b/contrib/stella/docker-compose.yml index 29815eb..25445af 100644 --- a/contrib/stella/docker-compose.yml +++ b/contrib/stella/docker-compose.yml @@ -28,7 +28,7 @@ services: - ORLY_ACL_MODE=follows # Bootstrap relay URLs for initial sync - - ORLY_BOOTSTRAP_RELAYS=wss://profiles.nostr1.com,wss://purplepag.es,wss://relay.nostr.band,wss://relay.damus.io + - ORLY_BOOTSTRAP_RELAYS=wss://profiles.nostr1.com,wss://purplepag.es,wss://relay.damus.io # Subscription Settings (optional) - ORLY_SUBSCRIPTION_ENABLED=false diff --git a/pkg/acl/follows.go b/pkg/acl/follows.go index 27b12ba..cbd2887 100644 --- a/pkg/acl/follows.go +++ b/pkg/acl/follows.go @@ -46,6 +46,8 @@ type Follows struct { subsCancel context.CancelFunc // Track last follow list fetch time lastFollowListFetch time.Time + // Callback for external notification of follow list changes + onFollowListUpdate func() } func (f *Follows) Configure(cfg ...any) (err error) { @@ -314,7 +316,6 @@ func (f *Follows) adminRelays() (urls []string) { "wss://nostr.wine", "wss://nos.lol", "wss://relay.damus.io", - "wss://nostr.band", } log.I.F("using failover relays: %v", failoverRelays) for _, relay := range failoverRelays { @@ -933,6 +934,13 @@ func (f *Follows) AdminRelays() []string { return f.adminRelays() } +// SetFollowListUpdateCallback sets a callback to be called when the follow list is updated +func (f *Follows) SetFollowListUpdateCallback(callback func()) { + f.followsMx.Lock() + defer f.followsMx.Unlock() + f.onFollowListUpdate = callback +} + // AddFollow appends a pubkey to the in-memory follows list if not already present // and signals the syncer to refresh subscriptions. func (f *Follows) AddFollow(pub []byte) { @@ -961,6 +969,10 @@ func (f *Follows) AddFollow(pub []byte) { // if channel is full or not yet listened to, ignore } } + // notify external listeners (e.g., spider) + if f.onFollowListUpdate != nil { + go f.onFollowListUpdate() + } } func init() { diff --git a/pkg/encoders/envelopes/reqenvelope/reqenvelope.go b/pkg/encoders/envelopes/reqenvelope/reqenvelope.go index b6deb6c..1712f97 100644 --- a/pkg/encoders/envelopes/reqenvelope/reqenvelope.go +++ b/pkg/encoders/envelopes/reqenvelope/reqenvelope.go @@ -6,7 +6,6 @@ import ( "io" "lol.mleku.dev/chk" - "lol.mleku.dev/log" "next.orly.dev/pkg/encoders/envelopes" "next.orly.dev/pkg/encoders/filter" "next.orly.dev/pkg/encoders/text" @@ -86,24 +85,19 @@ func (en *T) Marshal(dst []byte) (b []byte) { // string is correctly unescaped by NIP-01 escaping rules. func (en *T) Unmarshal(b []byte) (r []byte, err error) { r = b - log.I.F("%s", r) if en.Subscription, r, err = text.UnmarshalQuoted(r); chk.E(err) { return } - log.I.F("%s", r) if r, err = text.Comma(r); chk.E(err) { return } - log.I.F("%s", r) en.Filters = new(filter.S) if r, err = en.Filters.Unmarshal(r); chk.E(err) { return } - log.I.F("%s", r) if r, err = envelopes.SkipToTheEnd(r); chk.E(err) { return } - log.I.F("%s", r) return } diff --git a/pkg/protocol/ws/client.go b/pkg/protocol/ws/client.go index 91e8fb3..718a5f0 100644 --- a/pkg/protocol/ws/client.go +++ b/pkg/protocol/ws/client.go @@ -111,6 +111,7 @@ type RelayOption interface { var ( _ RelayOption = (WithCustomHandler)(nil) _ RelayOption = (WithRequestHeader)(nil) + _ RelayOption = (WithNoticeHandler)(nil) ) // WithCustomHandler must be a function that handles any relay message that couldn't be @@ -128,6 +129,18 @@ func (ch WithRequestHeader) ApplyRelayOption(r *Client) { r.requestHeader = http.Header(ch) } +// WithNoticeHandler must be a function that handles NOTICE messages from the relay. +type WithNoticeHandler func(notice []byte) + +func (nh WithNoticeHandler) ApplyRelayOption(r *Client) { + r.notices = make(chan []byte, 8) + go func() { + for notice := range r.notices { + nh(notice) + } + }() +} + // String just returns the relay URL. func (r *Client) String() string { return r.URL diff --git a/pkg/spider/spider.go b/pkg/spider/spider.go index 7c0e3a6..ef4d3ab 100644 --- a/pkg/spider/spider.go +++ b/pkg/spider/spider.go @@ -3,6 +3,7 @@ package spider import ( "context" "fmt" + "strings" "sync" "time" @@ -23,12 +24,24 @@ const ( BatchSize = 20 // CatchupWindow is the extra time added to disconnection periods for catch-up CatchupWindow = 30 * time.Minute - // ReconnectDelay is the delay between reconnection attempts - ReconnectDelay = 5 * time.Second - // MaxReconnectDelay is the maximum delay between reconnection attempts - MaxReconnectDelay = 5 * time.Minute - // BlackoutPeriod is the duration to blacklist a relay after MaxReconnectDelay is reached + // ReconnectDelay is the initial delay between reconnection attempts + ReconnectDelay = 10 * time.Second + // MaxReconnectDelay is the maximum delay before switching to blackout + MaxReconnectDelay = 1 * time.Hour + // BlackoutPeriod is the duration to blacklist a relay after max backoff is reached BlackoutPeriod = 24 * time.Hour + // BatchCreationDelay is the delay between creating each batch subscription + BatchCreationDelay = 500 * time.Millisecond + // RateLimitBackoffDuration is how long to wait when we get a rate limit error + RateLimitBackoffDuration = 1 * time.Minute + // RateLimitBackoffMultiplier is the factor by which we increase backoff on repeated rate limits + RateLimitBackoffMultiplier = 2 + // MaxRateLimitBackoff is the maximum backoff duration for rate limiting + MaxRateLimitBackoff = 30 * time.Minute + // MainLoopInterval is how often the spider checks for updates + MainLoopInterval = 5 * time.Minute + // EventHandlerBufferSize is the buffer size for event channels + EventHandlerBufferSize = 100 ) // Spider manages connections to admin relays and syncs events for followed pubkeys @@ -51,6 +64,9 @@ type Spider struct { // Callbacks for getting updated data getAdminRelays func() []string getFollowList func() [][]byte + + // Notification channel for follow list updates + followListUpdated chan struct{} } // RelayConnection manages a single relay connection and its subscriptions @@ -72,6 +88,10 @@ type RelayConnection struct { // Blackout tracking for IP filters blackoutUntil time.Time + + // Rate limiting tracking + rateLimitBackoff time.Duration + rateLimitUntil time.Time } // BatchSubscription represents a subscription for a batch of pubkeys @@ -110,12 +130,13 @@ func New(ctx context.Context, db *database.D, pub publisher.I, mode string) (s * ctx, cancel := context.WithCancel(ctx) s = &Spider{ - ctx: ctx, - cancel: cancel, - db: db, - pub: pub, - mode: mode, - connections: make(map[string]*RelayConnection), + ctx: ctx, + cancel: cancel, + db: db, + pub: pub, + mode: mode, + connections: make(map[string]*RelayConnection), + followListUpdated: make(chan struct{}, 1), } return @@ -129,6 +150,19 @@ func (s *Spider) SetCallbacks(getAdminRelays func() []string, getFollowList func s.getFollowList = getFollowList } +// NotifyFollowListUpdate signals the spider that the follow list has been updated +func (s *Spider) NotifyFollowListUpdate() { + if s.followListUpdated != nil { + select { + case s.followListUpdated <- struct{}{}: + log.D.F("spider: follow list update notification sent") + default: + // Channel full, update already pending + log.D.F("spider: follow list update notification already pending") + } + } +} + // Start begins the spider operation func (s *Spider) Start() (err error) { s.mu.Lock() @@ -182,14 +216,20 @@ func (s *Spider) Stop() { // mainLoop is the main spider loop that manages connections and subscriptions func (s *Spider) mainLoop() { - ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds + ticker := time.NewTicker(MainLoopInterval) defer ticker.Stop() + log.I.F("spider: main loop started, checking every %v", MainLoopInterval) + for { select { case <-s.ctx.Done(): return + case <-s.followListUpdated: + log.I.F("spider: follow list updated, refreshing connections") + s.updateConnections() case <-ticker.C: + log.D.F("spider: periodic check triggered") s.updateConnections() } } @@ -261,19 +301,24 @@ func (s *Spider) createConnection(url string, followList [][]byte) { // manage handles the lifecycle of a relay connection func (rc *RelayConnection) manage(followList [][]byte) { for { + // Check context first select { case <-rc.ctx.Done(): + log.D.F("spider: connection manager for %s stopping (context done)", rc.url) return default: } // Check if relay is blacked out if rc.isBlackedOut() { - log.D.F("spider: %s is blacked out until %v", rc.url, rc.blackoutUntil) + waitDuration := time.Until(rc.blackoutUntil) + log.I.F("spider: %s is blacked out for %v more", rc.url, waitDuration) + + // Wait for blackout to expire or context cancellation select { case <-rc.ctx.Done(): return - case <-time.After(time.Until(rc.blackoutUntil)): + case <-time.After(waitDuration): // Blackout expired, reset delay and try again rc.reconnectDelay = ReconnectDelay log.I.F("spider: blackout period ended for %s, retrying", rc.url) @@ -282,6 +327,7 @@ func (rc *RelayConnection) manage(followList [][]byte) { } // Attempt to connect + log.D.F("spider: attempting to connect to %s (backoff: %v)", rc.url, rc.reconnectDelay) if err := rc.connect(); chk.E(err) { log.W.F("spider: failed to connect to %s: %v", rc.url, err) rc.waitBeforeReconnect() @@ -290,8 +336,17 @@ func (rc *RelayConnection) manage(followList [][]byte) { log.I.F("spider: connected to %s", rc.url) rc.connectionStartTime = time.Now() - rc.reconnectDelay = ReconnectDelay // Reset delay on successful connection - rc.blackoutUntil = time.Time{} // Clear blackout on successful connection + + // Only reset reconnect delay on successful connection + // (don't reset if we had a quick disconnect before) + if rc.reconnectDelay > ReconnectDelay*8 { + // Gradual recovery: reduce by half instead of full reset + rc.reconnectDelay = rc.reconnectDelay / 2 + log.D.F("spider: reducing backoff for %s to %v", rc.url, rc.reconnectDelay) + } else { + rc.reconnectDelay = ReconnectDelay + } + rc.blackoutUntil = time.Time{} // Clear blackout on successful connection // Create subscriptions for follow list rc.createSubscriptions(followList) @@ -300,19 +355,25 @@ func (rc *RelayConnection) manage(followList [][]byte) { <-rc.client.Context().Done() log.W.F("spider: disconnected from %s: %v", rc.url, rc.client.ConnectionCause()) - - // Check if disconnection happened very quickly (likely IP filter) + + // Check if disconnection happened very quickly (likely IP filter or ban) connectionDuration := time.Since(rc.connectionStartTime) - const quickDisconnectThreshold = 30 * time.Second + const quickDisconnectThreshold = 2 * time.Minute if connectionDuration < quickDisconnectThreshold { - log.W.F("spider: quick disconnection from %s after %v (likely IP filter)", rc.url, connectionDuration) - // Don't reset the delay, keep the backoff + log.W.F("spider: quick disconnection from %s after %v (likely connection issue/ban)", rc.url, connectionDuration) + // Don't reset the delay, keep the backoff and increase it rc.waitBeforeReconnect() } else { - // Normal disconnection, reset backoff for future connections - rc.reconnectDelay = ReconnectDelay + // Normal disconnection after decent uptime - gentle backoff + log.I.F("spider: normal disconnection from %s after %v uptime", rc.url, connectionDuration) + // Small delay before reconnecting + select { + case <-rc.ctx.Done(): + return + case <-time.After(5 * time.Second): + } } - + rc.handleDisconnection() // Clean up @@ -326,15 +387,56 @@ func (rc *RelayConnection) connect() (err error) { connectCtx, cancel := context.WithTimeout(rc.ctx, 10*time.Second) defer cancel() - if rc.client, err = ws.RelayConnect(connectCtx, rc.url); chk.E(err) { + // Create client with notice handler to detect rate limiting + rc.client, err = ws.RelayConnect(connectCtx, rc.url, ws.WithNoticeHandler(rc.handleNotice)) + if chk.E(err) { return } return } +// handleNotice processes NOTICE messages from the relay +func (rc *RelayConnection) handleNotice(notice []byte) { + noticeStr := string(notice) + log.D.F("spider: NOTICE from %s: '%s'", rc.url, noticeStr) + + // Check for rate limiting errors + if strings.Contains(noticeStr, "too many concurrent REQs") || + strings.Contains(noticeStr, "rate limit") || + strings.Contains(noticeStr, "slow down") { + rc.handleRateLimit() + } +} + +// handleRateLimit applies backoff when rate limiting is detected +func (rc *RelayConnection) handleRateLimit() { + rc.mu.Lock() + defer rc.mu.Unlock() + + // Initialize backoff if not set + if rc.rateLimitBackoff == 0 { + rc.rateLimitBackoff = RateLimitBackoffDuration + } else { + // Exponential backoff + rc.rateLimitBackoff *= RateLimitBackoffMultiplier + if rc.rateLimitBackoff > MaxRateLimitBackoff { + rc.rateLimitBackoff = MaxRateLimitBackoff + } + } + + rc.rateLimitUntil = time.Now().Add(rc.rateLimitBackoff) + log.W.F("spider: rate limit detected on %s, backing off for %v until %v", + rc.url, rc.rateLimitBackoff, rc.rateLimitUntil) + + // Close all current subscriptions to reduce load + rc.clearSubscriptionsLocked() +} + // waitBeforeReconnect waits before attempting to reconnect with exponential backoff func (rc *RelayConnection) waitBeforeReconnect() { + log.I.F("spider: waiting %v before reconnecting to %s", rc.reconnectDelay, rc.url) + select { case <-rc.ctx.Done(): return @@ -342,12 +444,14 @@ func (rc *RelayConnection) waitBeforeReconnect() { } // Exponential backoff - double every time + // 10s -> 20s -> 40s -> 80s (1.3m) -> 160s (2.7m) -> 320s (5.3m) -> 640s (10.7m) -> 1280s (21m) -> 2560s (42m) -> 3600s (1h) rc.reconnectDelay *= 2 - - // If backoff exceeds 5 minutes, blackout for 24 hours + + // Cap at MaxReconnectDelay (1 hour), then switch to 24-hour blackout if rc.reconnectDelay >= MaxReconnectDelay { rc.blackoutUntil = time.Now().Add(BlackoutPeriod) - log.W.F("spider: max backoff exceeded for %s (reached %v), blacking out for 24 hours", rc.url, rc.reconnectDelay) + rc.reconnectDelay = ReconnectDelay // Reset for after blackout + log.W.F("spider: max reconnect backoff reached for %s, entering 24-hour blackout period", rc.url) } } @@ -375,7 +479,24 @@ func (rc *RelayConnection) handleDisconnection() { // createSubscriptions creates batch subscriptions for the follow list func (rc *RelayConnection) createSubscriptions(followList [][]byte) { rc.mu.Lock() - defer rc.mu.Unlock() + + // Check if we're in a rate limit backoff period + if time.Now().Before(rc.rateLimitUntil) { + remaining := time.Until(rc.rateLimitUntil) + rc.mu.Unlock() + log.W.F("spider: skipping subscription creation for %s, rate limited for %v more", rc.url, remaining) + + // Schedule retry after backoff period + go func() { + time.Sleep(remaining) + rc.createSubscriptions(followList) + }() + return + } + + // Clear rate limit backoff on successful subscription attempt + rc.rateLimitBackoff = 0 + rc.rateLimitUntil = time.Time{} // Clear existing subscriptions rc.clearSubscriptionsLocked() @@ -386,9 +507,27 @@ func (rc *RelayConnection) createSubscriptions(followList [][]byte) { log.I.F("spider: creating %d subscription batches for %d pubkeys on %s", len(batches), len(followList), rc.url) + // Release lock before creating subscriptions to avoid holding it during delays + rc.mu.Unlock() + for i, batch := range batches { - batchID := fmt.Sprintf("batch-%d", i) // Simple batch ID + // Check context before creating each batch + select { + case <-rc.ctx.Done(): + return + default: + } + + batchID := fmt.Sprintf("batch-%d", i) + + rc.mu.Lock() rc.createBatchSubscription(batchID, batch) + rc.mu.Unlock() + + // Add delay between batches to avoid overwhelming the relay + if i < len(batches)-1 { // Don't delay after the last batch + time.Sleep(BatchCreationDelay) + } } } @@ -457,6 +596,10 @@ func (rc *RelayConnection) createBatchSubscription(batchID string, pubkeys [][]b // handleEvents processes events from the subscription func (bs *BatchSubscription) handleEvents() { + // Throttle event processing to avoid CPU spikes + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + for { select { case <-bs.relay.ctx.Done(): @@ -466,13 +609,19 @@ func (bs *BatchSubscription) handleEvents() { return // Subscription closed } + // Wait for throttle tick to avoid processing events too rapidly + <-ticker.C + // Save event to database if _, err := bs.relay.spider.db.SaveEvent(bs.relay.ctx, ev); err != nil { + // Ignore duplicate events and other errors + log.T.F("spider: failed to save event from %s: %v", bs.relay.url, err) } else { // Publish event if it was newly saved if bs.relay.spider.pub != nil { go bs.relay.spider.pub.Deliver(ev) } + log.T.F("spider: saved event from %s", bs.relay.url) } } } @@ -485,7 +634,14 @@ func (rc *RelayConnection) updateSubscriptions(followList [][]byte) { } rc.mu.Lock() - defer rc.mu.Unlock() + + // Check if we're in a rate limit backoff period + if time.Now().Before(rc.rateLimitUntil) { + remaining := time.Until(rc.rateLimitUntil) + rc.mu.Unlock() + log.D.F("spider: deferring subscription update for %s, rate limited for %v more", rc.url, remaining) + return + } // Check if we need to perform catch-up for disconnected subscriptions now := time.Now() @@ -507,9 +663,28 @@ func (rc *RelayConnection) updateSubscriptions(followList [][]byte) { rc.clearSubscriptionsLocked() batches := rc.createBatches(followList) + + // Release lock before creating subscriptions + rc.mu.Unlock() + for i, batch := range batches { + // Check context before creating each batch + select { + case <-rc.ctx.Done(): + return + default: + } + batchID := fmt.Sprintf("batch-%d", i) + + rc.mu.Lock() rc.createBatchSubscription(batchID, batch) + rc.mu.Unlock() + + // Add delay between batches + if i < len(batches)-1 { + time.Sleep(BatchCreationDelay) + } } } @@ -559,39 +734,43 @@ func (rc *RelayConnection) performCatchup(sub *BatchSubscription, disconnectTime } defer catchupSub.Unsub() - // Process catch-up events + // Process catch-up events with throttling eventCount := 0 - timeout := time.After(30 * time.Second) + timeout := time.After(60 * time.Second) // Increased timeout for catch-up + throttle := time.NewTicker(20 * time.Millisecond) + defer throttle.Stop() for { select { case <-catchupCtx.Done(): - log.D.F("spider: catch-up completed on %s, processed %d events", rc.url, eventCount) + log.I.F("spider: catch-up completed on %s, processed %d events", rc.url, eventCount) return case <-timeout: - log.D.F("spider: catch-up timeout on %s, processed %d events", rc.url, eventCount) + log.I.F("spider: catch-up timeout on %s, processed %d events", rc.url, eventCount) return case <-catchupSub.EndOfStoredEvents: - log.D.F("spider: catch-up EOSE on %s, processed %d events", rc.url, eventCount) + log.I.F("spider: catch-up EOSE on %s, processed %d events", rc.url, eventCount) return case ev := <-catchupSub.Events: if ev == nil { return } + // Throttle event processing + <-throttle.C + eventCount++ // Save event to database if _, err := rc.spider.db.SaveEvent(rc.ctx, ev); err != nil { - if !chk.E(err) { - log.T.F("spider: catch-up saved event %s from %s", - hex.Enc(ev.ID[:]), rc.url) - } + // Silently ignore errors (mostly duplicates) } else { // Publish event if it was newly saved if rc.spider.pub != nil { go rc.spider.pub.Deliver(ev) } + log.T.F("spider: catch-up saved event %s from %s", + hex.Enc(ev.ID[:]), rc.url) } } } diff --git a/pkg/version/version b/pkg/version/version index f53b7bd..5bc2965 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.28.2 \ No newline at end of file +v0.29.0 \ No newline at end of file