diff --git a/app/handle-websocket.go b/app/handle-websocket.go
index ff4444e..9155614 100644
--- a/app/handle-websocket.go
+++ b/app/handle-websocket.go
@@ -20,7 +20,7 @@ const (
 	DefaultPongWait       = 60 * time.Second
 	DefaultPingWait       = DefaultPongWait / 2
 	DefaultWriteTimeout   = 3 * time.Second
-	DefaultMaxMessageSize = 1 * units.Mb
+	DefaultMaxMessageSize = 100 * units.Mb

 	// CloseMessage denotes a close control message. The optional message
 	// payload contains a numeric code and text. Use the FormatCloseMessage
@@ -62,6 +62,8 @@ whitelist:
 		OriginPatterns: []string{"*"}, // Allow all origins for proxy compatibility
 		// Don't check origin when behind a proxy - let the proxy handle it
 		InsecureSkipVerify: true,
+		// Disable compression so it cannot interfere with large messages
+		CompressionMode: websocket.CompressionDisabled,
 	}
 	if conn, err = websocket.Accept(w, r, acceptOptions); chk.E(err) {
@@ -69,7 +71,10 @@ whitelist:
 		return
 	}
 	log.T.F("websocket accepted from %s path=%s", remote, r.URL.String())
+
+	// Set the read limit immediately after the connection is established
 	conn.SetReadLimit(DefaultMaxMessageSize)
+	log.D.F("set read limit to %d bytes (%d MB) for %s", DefaultMaxMessageSize, DefaultMaxMessageSize/units.Mb, remote)
 	defer conn.CloseNow()
 	listener := &Listener{
 		ctx: ctx,
@@ -145,6 +150,14 @@ whitelist:
 			log.T.F("connection from %s closed: %v", remote, err)
 			return
 		}
+		// Handle message-too-big errors specifically
+		if strings.Contains(err.Error(), "MessageTooBig") ||
+			strings.Contains(err.Error(), "read limited at") {
+			log.D.F("client %s hit message size limit: %v", remote, err)
+			// Don't log this as an error since it's a client-side limit;
+			// just close the connection gracefully
+			return
+		}
 		status := websocket.CloseStatus(err)
 		switch status {
 		case websocket.StatusNormalClosure,
@@ -155,6 +168,8 @@ whitelist:
 			log.T.F(
 				"connection from %s closed with status: %v", remote, status,
 			)
+		case websocket.StatusMessageTooBig:
+			log.D.F("client %s sent message too big: %v", remote, err)
 		default:
 			log.E.F("unexpected close error from %s: %v", remote, err)
 		}
@@ -190,6 +205,10 @@ whitelist:
 			writeCancel()
 			continue
 		}
+		// Log message size for debugging
+		if len(msg) > 1000 { // only log larger messages
+			log.D.F("received large message from %s: %d bytes", remote, len(msg))
+		}
 		// log.T.F("received message from %s: %s", remote, string(msg))
 		listener.HandleMessage(msg, remote)
 	}
diff --git a/pkg/spider/spider.go b/pkg/spider/spider.go
index eec105e..8e75591 100644
--- a/pkg/spider/spider.go
+++ b/pkg/spider/spider.go
@@ -23,6 +23,10 @@ import (
 const (
 	OneTimeSpiderSyncMarker = "spider_one_time_sync_completed"
 	SpiderLastScanMarker    = "spider_last_scan_time"
+	// MaxWebSocketMessageSize caps outgoing WebSocket messages to stay under the 32KB limit
+	MaxWebSocketMessageSize = 30 * 1024 // 30KB to be safe
+	// PubkeyHexSize is the size of a hex-encoded pubkey (32 bytes = 64 hex chars)
+	PubkeyHexSize = 64
 )

 type Spider struct {
@@ -271,11 +275,33 @@ func (s *Spider) discoverRelays(followedPubkeys [][]byte) ([]string, error) {
 	return urls, nil
 }

+// calculateOptimalChunkSize calculates how many pubkeys fit per filter while staying under the message size limit
+func (s *Spider) calculateOptimalChunkSize() int {
+	// Estimate the size of a filter with timestamps and other fields.
+	// Base filter overhead: ~200 bytes for timestamps, limits, etc.
+	baseFilterSize := 200
+
+	// Calculate how many pubkeys we can fit in the remaining space
+	availableSpace := MaxWebSocketMessageSize - baseFilterSize
+	maxPubkeys := availableSpace / PubkeyHexSize
+
+	// Use a conservative chunk size (80% of max to be safe)
+	chunkSize := int(float64(maxPubkeys) * 0.8)
+
+	// Ensure a minimum chunk size of 10
+	if chunkSize < 10 {
+		chunkSize = 10
+	}
+
+	log.D.F("Spider: calculated optimal chunk size: %d pubkeys (max would be %d)", chunkSize, maxPubkeys)
+	return chunkSize
+}
+
 // queryRelayForEvents connects to a relay and queries for events from followed pubkeys
 func (s *Spider) queryRelayForEvents(
 	relayURL string, followedPubkeys [][]byte, startTime, endTime time.Time,
 ) (int, error) {
-	log.T.F("Spider sync: querying relay %s", relayURL)
+	log.T.F("Spider sync: querying relay %s with %d pubkeys", relayURL, len(followedPubkeys))

 	// Connect to the relay with a timeout context
 	ctx, cancel := context.WithTimeout(s.ctx, 30*time.Second)
@@ -287,82 +313,110 @@ func (s *Spider) queryRelayForEvents(
 	}
 	defer client.Close()

-	// Create filter for the time range and followed pubkeys
-	f := &filter.F{
-		Authors: tag.NewFromBytesSlice(followedPubkeys...),
-		Since:   timestamp.FromUnix(startTime.Unix()),
-		Until:   timestamp.FromUnix(endTime.Unix()),
-		Limit:   func() *uint { l := uint(1000); return &l }(), // Limit to avoid overwhelming
-	}
+	// Break pubkeys into chunks to avoid the 32KB message limit
+	chunkSize := s.calculateOptimalChunkSize()
+	totalEventsSaved := 0

-	// Subscribe to get events
-	sub, err := client.Subscribe(ctx, filter.NewS(f))
-	if err != nil {
-		return 0, err
-	}
-	defer sub.Unsub()
-
-	eventsCount := 0
-	eventsSaved := 0
-	timeout := time.After(10 * time.Second) // Timeout for receiving events
-
-	for {
-		select {
-		case <-ctx.Done():
-			log.T.F(
-				"Spider sync: context done for relay %s, saved %d/%d events",
-				relayURL, eventsSaved, eventsCount,
-			)
-			return eventsSaved, nil
-		case <-timeout:
-			log.T.F(
-				"Spider sync: timeout for relay %s, saved %d/%d events",
-				relayURL, eventsSaved, eventsCount,
-			)
-			return eventsSaved, nil
-		case <-sub.EndOfStoredEvents:
-			log.T.F(
-				"Spider sync: end of stored events for relay %s, saved %d/%d events",
-				relayURL, eventsSaved, eventsCount,
-			)
-			return eventsSaved, nil
-		case ev := <-sub.Events:
-			if ev == nil {
-				continue
-			}
-			eventsCount++
-
-			// Verify the event signature
-			if ok, err := ev.Verify(); !ok || err != nil {
-				log.T.F(
-					"Spider sync: invalid event signature from relay %s",
-					relayURL,
-				)
-				ev.Free()
-				continue
-			}
-
-			// Save the event to the database
-			if _, _, err := s.db.SaveEvent(s.ctx, ev); err != nil {
-				if !strings.HasPrefix(err.Error(), "blocked:") {
-					log.T.F(
-						"Spider sync: error saving event from relay %s: %v",
-						relayURL, err,
-					)
-				}
-				// Event might already exist, which is fine for deduplication
-			} else {
-				eventsSaved++
-				if eventsSaved%10 == 0 {
-					log.T.F(
-						"Spider sync: saved %d events from relay %s",
-						eventsSaved, relayURL,
-					)
-				}
-			}
-			ev.Free()
+	for i := 0; i < len(followedPubkeys); i += chunkSize {
+		end := i + chunkSize
+		if end > len(followedPubkeys) {
+			end = len(followedPubkeys)
 		}
+
+		chunk := followedPubkeys[i:end]
+		log.T.F("Spider sync: processing chunk %d-%d (%d pubkeys) for relay %s",
+			i, end-1, len(chunk), relayURL)
+
+		// Create filter for this chunk of pubkeys
+		f := &filter.F{
+			Authors: tag.NewFromBytesSlice(chunk...),
+			Since:   timestamp.FromUnix(startTime.Unix()),
+			Until:   timestamp.FromUnix(endTime.Unix()),
+			Limit:   func() *uint { l := uint(1000); return &l }(), // Limit to avoid overwhelming the relay
+		}
+
+		// Subscribe to get events for this chunk
+		sub, err := client.Subscribe(ctx, filter.NewS(f))
+		if err != nil {
+			log.E.F("Spider sync: failed to subscribe to chunk %d-%d for relay %s: %v",
+				i, end-1, relayURL, err)
+			continue
+		}
+
+		chunkEventsSaved := 0
+		chunkEventsCount := 0
+		timeout := time.After(10 * time.Second) // Timeout for receiving events
+
+		chunkDone := false
+		for !chunkDone {
+			select {
+			case <-ctx.Done():
+				log.T.F(
+					"Spider sync: context done for relay %s chunk %d-%d, saved %d/%d events",
+					relayURL, i, end-1, chunkEventsSaved, chunkEventsCount,
+				)
+				chunkDone = true
+			case <-timeout:
+				log.T.F(
+					"Spider sync: timeout for relay %s chunk %d-%d, saved %d/%d events",
+					relayURL, i, end-1, chunkEventsSaved, chunkEventsCount,
+				)
+				chunkDone = true
+			case <-sub.EndOfStoredEvents:
+				log.T.F(
+					"Spider sync: end of stored events for relay %s chunk %d-%d, saved %d/%d events",
+					relayURL, i, end-1, chunkEventsSaved, chunkEventsCount,
+				)
+				chunkDone = true
+			case ev := <-sub.Events:
+				if ev == nil {
+					continue
+				}
+				chunkEventsCount++
+
+				// Verify the event signature
+				if ok, err := ev.Verify(); !ok || err != nil {
+					log.T.F(
+						"Spider sync: invalid event signature from relay %s",
+						relayURL,
+					)
+					ev.Free()
+					continue
+				}
+
+				// Save the event to the database
+				if _, _, err := s.db.SaveEvent(s.ctx, ev); err != nil {
+					if !strings.HasPrefix(err.Error(), "blocked:") {
+						log.T.F(
+							"Spider sync: error saving event from relay %s: %v",
+							relayURL, err,
+						)
+					}
+					// Event might already exist, which is fine for deduplication
+				} else {
+					chunkEventsSaved++
+					if chunkEventsSaved%10 == 0 {
+						log.T.F(
+							"Spider sync: saved %d events from relay %s chunk %d-%d",
+							chunkEventsSaved, relayURL, i, end-1,
+						)
+					}
+				}
+				ev.Free()
+			}
+		}
+
+		// Clean up the subscription for this chunk
+		sub.Unsub()
+		totalEventsSaved += chunkEventsSaved
+
+		log.T.F("Spider sync: completed chunk %d-%d for relay %s, saved %d events",
+			i, end-1, relayURL, chunkEventsSaved)
 	}
+
+	log.T.F("Spider sync: completed all chunks for relay %s, total saved %d events",
+		relayURL, totalEventsSaved)
+	return totalEventsSaved, nil
 }

 // Stop stops the spider functionality
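
For reference, the chunk-size arithmetic above works out to 380 pubkeys per filter. The sketch below is a minimal standalone reproduction of `calculateOptimalChunkSize` with the numbers worked through; the constants mirror the diff. Note the estimate charges 64 bytes per pubkey, while a JSON `"authors"` entry actually costs about 67 bytes (64 hex chars plus quotes and a comma), so 380 keys serialize to roughly 25KB; the 80% headroom absorbs that difference.

```go
package main

import "fmt"

// Constants mirroring the values added in pkg/spider/spider.go.
const (
	maxWebSocketMessageSize = 30 * 1024 // 30KB budget, below the 32KB limit
	pubkeyHexSize           = 64        // 32-byte pubkey, hex-encoded
	baseFilterSize          = 200       // estimated overhead for since/until/limit fields
)

// optimalChunkSize reproduces calculateOptimalChunkSize's arithmetic.
func optimalChunkSize() int {
	maxPubkeys := (maxWebSocketMessageSize - baseFilterSize) / pubkeyHexSize // (30720-200)/64 = 476
	chunk := int(float64(maxPubkeys) * 0.8)                                  // 80% of 476 -> 380
	if chunk < 10 {
		chunk = 10 // floor so tiny budgets still make progress
	}
	return chunk
}

func main() {
	fmt.Println(optimalChunkSize()) // prints 380
}
```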
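On the handler side, the string match on `err.Error()` could lean on `websocket.CloseStatus`, which the switch below it already uses. A minimal sketch of that consolidation, assuming the `nhooyr.io/websocket` import path (the diff does not show the import, so the path is a guess):

```go
package app

import (
	"strings"

	"nhooyr.io/websocket" // assumed import path; not shown in the diff
)

// isMessageTooBig reports whether err came from a peer exceeding the read
// limit. CloseStatus extracts the status code from a received close frame
// (it returns -1 when err carries none), so the string check from the diff
// is kept as a fallback for the library's internal "read limited at ..."
// error, which is surfaced without a close frame.
func isMessageTooBig(err error) bool {
	if websocket.CloseStatus(err) == websocket.StatusMessageTooBig {
		return true
	}
	return err != nil && strings.Contains(err.Error(), "read limited at")
}
```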