From 5fd58681c936ea4608ae79c382a764e757025520 Mon Sep 17 00:00:00 2001
From: mleku
Date: Wed, 8 Oct 2025 20:40:46 +0100
Subject: [PATCH] Increase WebSocket message size limit to 100MB and implement
 handling for oversized messages. Introduce optimal chunk size calculation in
 Spider for efficient pubkey processing, ensuring compliance with WebSocket
 constraints. Enhance logging for message sizes and connection events for
 better debugging.

---
 app/handle-websocket.go |  21 ++++-
 pkg/spider/spider.go    | 202 +++++++++++++++++++++++++---------------
 2 files changed, 148 insertions(+), 75 deletions(-)

diff --git a/app/handle-websocket.go b/app/handle-websocket.go
index ff4444e..9155614 100644
--- a/app/handle-websocket.go
+++ b/app/handle-websocket.go
@@ -20,7 +20,7 @@ const (
 	DefaultPongWait       = 60 * time.Second
 	DefaultPingWait       = DefaultPongWait / 2
 	DefaultWriteTimeout   = 3 * time.Second
-	DefaultMaxMessageSize = 1 * units.Mb
+	DefaultMaxMessageSize = 100 * units.Mb
 
 	// CloseMessage denotes a close control message. The optional message
 	// payload contains a numeric code and text. Use the FormatCloseMessage
@@ -62,6 +62,8 @@ whitelist:
 		OriginPatterns: []string{"*"}, // Allow all origins for proxy compatibility
 		// Don't check origin when behind a proxy - let the proxy handle it
 		InsecureSkipVerify: true,
+		// Disable compression so it does not interfere with larger messages
+		CompressionMode: websocket.CompressionDisabled,
 	}
 
 	if conn, err = websocket.Accept(w, r, acceptOptions); chk.E(err) {
@@ -69,7 +71,10 @@ whitelist:
 		return
 	}
 	log.T.F("websocket accepted from %s path=%s", remote, r.URL.String())
+
+	// Set read limit immediately after connection is established
 	conn.SetReadLimit(DefaultMaxMessageSize)
+	log.D.F("set read limit to %d bytes (%d MB) for %s", DefaultMaxMessageSize, DefaultMaxMessageSize/units.Mb, remote)
 	defer conn.CloseNow()
 	listener := &Listener{
 		ctx: ctx,
@@ -145,6 +150,14 @@ whitelist:
 			log.T.F("connection from %s closed: %v", remote, err)
 			return
 		}
+		// Handle message too big errors specifically
+		if strings.Contains(err.Error(), "MessageTooBig") ||
+			strings.Contains(err.Error(), "read limited at") {
+			log.D.F("client %s hit message size limit: %v", remote, err)
+			// Don't log this as an error since it's a client-side limit
+			// Just close the connection gracefully
+			return
+		}
 		status := websocket.CloseStatus(err)
 		switch status {
 		case websocket.StatusNormalClosure,
@@ -155,6 +168,8 @@
 			log.T.F(
 				"connection from %s closed with status: %v", remote, status,
 			)
+		case websocket.StatusMessageTooBig:
+			log.D.F("client %s sent message too big: %v", remote, err)
 		default:
 			log.E.F("unexpected close error from %s: %v", remote, err)
 		}
@@ -190,6 +205,10 @@
 			writeCancel()
 			continue
 		}
+		// Log message size for debugging
+		if len(msg) > 1000 { // Only log for larger messages
+			log.D.F("received large message from %s: %d bytes", remote, len(msg))
+		}
 		// log.T.F("received message from %s: %s", remote, string(msg))
 		listener.HandleMessage(msg, remote)
 	}
diff --git a/pkg/spider/spider.go b/pkg/spider/spider.go
index eec105e..8e75591 100644
--- a/pkg/spider/spider.go
+++ b/pkg/spider/spider.go
@@ -23,6 +23,10 @@ import (
 const (
 	OneTimeSpiderSyncMarker = "spider_one_time_sync_completed"
 	SpiderLastScanMarker    = "spider_last_scan_time"
+	// MaxWebSocketMessageSize is the maximum size for WebSocket messages to avoid 32KB limit
+	MaxWebSocketMessageSize = 30 * 1024 // 30KB to be safe
+	// PubkeyHexSize is the size of a hex-encoded pubkey (32 bytes = 64 hex chars)
+	PubkeyHexSize = 64
 )
 
 type Spider struct {
@@ -271,11 +275,33 @@ func (s *Spider) discoverRelays(followedPubkeys [][]byte) ([]string, error) {
 	return urls, nil
 }
 
+// calculateOptimalChunkSize calculates the optimal chunk size for pubkeys to stay under message size limit
+func (s *Spider) calculateOptimalChunkSize() int {
+	// Estimate the size of a filter with timestamps and other fields
+	// Base filter overhead: ~200 bytes for timestamps, limits, etc.
+	baseFilterSize := 200
+
+	// Calculate how many pubkeys we can fit in the remaining space
+	availableSpace := MaxWebSocketMessageSize - baseFilterSize
+	maxPubkeys := availableSpace / PubkeyHexSize
+
+	// Use a conservative chunk size (80% of max to be safe)
+	chunkSize := int(float64(maxPubkeys) * 0.8)
+
+	// Ensure minimum chunk size of 10
+	if chunkSize < 10 {
+		chunkSize = 10
+	}
+
+	log.D.F("Spider: calculated optimal chunk size: %d pubkeys (max would be %d)", chunkSize, maxPubkeys)
+	return chunkSize
+}
+
 // queryRelayForEvents connects to a relay and queries for events from followed pubkeys
 func (s *Spider) queryRelayForEvents(
 	relayURL string, followedPubkeys [][]byte, startTime, endTime time.Time,
 ) (int, error) {
-	log.T.F("Spider sync: querying relay %s", relayURL)
+	log.T.F("Spider sync: querying relay %s with %d pubkeys", relayURL, len(followedPubkeys))
 
 	// Connect to the relay with a timeout context
 	ctx, cancel := context.WithTimeout(s.ctx, 30*time.Second)
@@ -287,82 +313,110 @@ func (s *Spider) queryRelayForEvents(
 	}
 	defer client.Close()
 
-	// Create filter for the time range and followed pubkeys
-	f := &filter.F{
-		Authors: tag.NewFromBytesSlice(followedPubkeys...),
-		Since:   timestamp.FromUnix(startTime.Unix()),
-		Until:   timestamp.FromUnix(endTime.Unix()),
-		Limit:   func() *uint { l := uint(1000); return &l }(), // Limit to avoid overwhelming
-	}
+	// Break pubkeys into chunks to avoid 32KB message limit
+	chunkSize := s.calculateOptimalChunkSize()
+	totalEventsSaved := 0
 
-	// Subscribe to get events
-	sub, err := client.Subscribe(ctx, filter.NewS(f))
-	if err != nil {
-		return 0, err
-	}
-	defer sub.Unsub()
-
-	eventsCount := 0
-	eventsSaved := 0
-	timeout := time.After(10 * time.Second) // Timeout for receiving events
-
-	for {
-		select {
-		case <-ctx.Done():
-			log.T.F(
-				"Spider sync: context done for relay %s, saved %d/%d events",
-				relayURL, eventsSaved, eventsCount,
-			)
-			return eventsSaved, nil
-		case <-timeout:
-			log.T.F(
-				"Spider sync: timeout for relay %s, saved %d/%d events",
-				relayURL, eventsSaved, eventsCount,
-			)
-			return eventsSaved, nil
-		case <-sub.EndOfStoredEvents:
-			log.T.F(
-				"Spider sync: end of stored events for relay %s, saved %d/%d events",
-				relayURL, eventsSaved, eventsCount,
-			)
-			return eventsSaved, nil
-		case ev := <-sub.Events:
-			if ev == nil {
-				continue
-			}
-			eventsCount++
-
-			// Verify the event signature
-			if ok, err := ev.Verify(); !ok || err != nil {
-				log.T.F(
-					"Spider sync: invalid event signature from relay %s",
-					relayURL,
-				)
-				ev.Free()
-				continue
-			}
-
-			// Save the event to the database
-			if _, _, err := s.db.SaveEvent(s.ctx, ev); err != nil {
-				if !strings.HasPrefix(err.Error(), "blocked:") {
-					log.T.F(
-						"Spider sync: error saving event from relay %s: %v",
-						relayURL, err,
-					)
-				}
-				// Event might already exist, which is fine for deduplication
-			} else {
-				eventsSaved++
-				if eventsSaved%10 == 0 {
-					log.T.F(
-						"Spider sync: saved %d events from relay %s",
-						eventsSaved, relayURL,
-					)
-				}
-			}
-			ev.Free()
+	for i := 0; i < len(followedPubkeys); i += chunkSize {
+		end := i + chunkSize
+		if end > len(followedPubkeys) {
+			end = len(followedPubkeys)
+		}
+
+		chunk := followedPubkeys[i:end]
+		log.T.F("Spider sync: processing chunk %d-%d (%d pubkeys) for relay %s",
+			i, end-1, len(chunk), relayURL)
+
+		// Create filter for this chunk of pubkeys
+		f := &filter.F{
+			Authors: tag.NewFromBytesSlice(chunk...),
+			Since:   timestamp.FromUnix(startTime.Unix()),
+			Until:   timestamp.FromUnix(endTime.Unix()),
+			Limit:   func() *uint { l := uint(1000); return &l }(), // Limit to avoid overwhelming
+		}
+
+		// Subscribe to get events for this chunk
+		sub, err := client.Subscribe(ctx, filter.NewS(f))
+		if err != nil {
+			log.E.F("Spider sync: failed to subscribe to chunk %d-%d for relay %s: %v",
+				i, end-1, relayURL, err)
+			continue
+		}
+
+		chunkEventsSaved := 0
+		chunkEventsCount := 0
+		timeout := time.After(10 * time.Second) // Timeout for receiving events
+
+		chunkDone := false
+		for !chunkDone {
+			select {
+			case <-ctx.Done():
+				log.T.F(
+					"Spider sync: context done for relay %s chunk %d-%d, saved %d/%d events",
+					relayURL, i, end-1, chunkEventsSaved, chunkEventsCount,
+				)
+				chunkDone = true
+			case <-timeout:
+				log.T.F(
+					"Spider sync: timeout for relay %s chunk %d-%d, saved %d/%d events",
+					relayURL, i, end-1, chunkEventsSaved, chunkEventsCount,
+				)
+				chunkDone = true
+			case <-sub.EndOfStoredEvents:
+				log.T.F(
+					"Spider sync: end of stored events for relay %s chunk %d-%d, saved %d/%d events",
+					relayURL, i, end-1, chunkEventsSaved, chunkEventsCount,
+				)
+				chunkDone = true
+			case ev := <-sub.Events:
+				if ev == nil {
+					continue
+				}
+				chunkEventsCount++
+
+				// Verify the event signature
+				if ok, err := ev.Verify(); !ok || err != nil {
+					log.T.F(
+						"Spider sync: invalid event signature from relay %s",
+						relayURL,
+					)
+					ev.Free()
+					continue
+				}
+
+				// Save the event to the database
+				if _, _, err := s.db.SaveEvent(s.ctx, ev); err != nil {
+					if !strings.HasPrefix(err.Error(), "blocked:") {
+						log.T.F(
+							"Spider sync: error saving event from relay %s: %v",
+							relayURL, err,
+						)
+					}
+					// Event might already exist, which is fine for deduplication
+				} else {
+					chunkEventsSaved++
+					if chunkEventsSaved%10 == 0 {
+						log.T.F(
+							"Spider sync: saved %d events from relay %s chunk %d-%d",
+							chunkEventsSaved, relayURL, i, end-1,
+						)
+					}
+				}
+				ev.Free()
+			}
 		}
+
+		// Clean up subscription
+		sub.Unsub()
+		totalEventsSaved += chunkEventsSaved
+
+		log.T.F("Spider sync: completed chunk %d-%d for relay %s, saved %d events",
+			i, end-1, relayURL, chunkEventsSaved)
 	}
+
+	log.T.F("Spider sync: completed all chunks for relay %s, total saved %d events",
+		relayURL, totalEventsSaved)
+	return totalEventsSaved, nil
 }
 
 // Stop stops the spider functionality
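
Reviewer note (not part of the patch): the sketch below restates the chunking arithmetic that calculateOptimalChunkSize introduces and the bounds-checked slicing loop now used in queryRelayForEvents, stripped of the relay-specific types so the numbers are easy to check. The helper name optimalChunkSize and the placeholder pubkeys slice are illustrative only.

package main

import "fmt"

const (
	// Mirrors the constants added in pkg/spider/spider.go.
	MaxWebSocketMessageSize = 30 * 1024 // 30KB, kept under the common 32KB relay read limit
	PubkeyHexSize           = 64        // a 32-byte pubkey, hex-encoded
)

// optimalChunkSize reproduces the patch's arithmetic: reserve ~200 bytes of
// filter overhead, divide the remainder by the per-pubkey cost, keep 20%
// headroom, and never drop below 10 pubkeys per chunk.
func optimalChunkSize() int {
	baseFilterSize := 200
	maxPubkeys := (MaxWebSocketMessageSize - baseFilterSize) / PubkeyHexSize // 476
	chunkSize := int(float64(maxPubkeys) * 0.8)                              // 380
	if chunkSize < 10 {
		chunkSize = 10
	}
	return chunkSize
}

func main() {
	chunkSize := optimalChunkSize()
	fmt.Printf("chunk size: %d pubkeys\n", chunkSize)

	// The same slicing pattern queryRelayForEvents now runs once per relay.
	pubkeys := make([][]byte, 1000) // stand-in for the real followed pubkeys
	for i := 0; i < len(pubkeys); i += chunkSize {
		end := i + chunkSize
		if end > len(pubkeys) {
			end = len(pubkeys)
		}
		fmt.Printf("chunk %d-%d: %d pubkeys\n", i, end-1, end-i)
	}
}

With these constants a chunk carries 380 authors; serialized as quoted, comma-separated 64-character hex strings that is roughly 380 x 67 ≈ 25KB of JSON per REQ, comfortably under the 30KB cap even with the ~200-byte filter overhead. The cap exists because remote relays commonly enforce read limits near 32KB; the 100MB limit raised in app/handle-websocket.go only governs what this relay itself accepts.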