Increase WebSocket message size limit to 100MB and implement handling for oversized messages. Introduce optimal chunk size calculation in Spider for efficient pubkey processing, ensuring compliance with WebSocket constraints. Enhance logging for message sizes and connection events for better debugging.
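Background for the chunking change: a hex-encoded pubkey is 64 characters, and a Nostr REQ filter carries one per author, so a single filter listing a few thousand followed pubkeys blows far past a 32KB WebSocket frame. A rough estimate of the problem, as a standalone Go sketch (the ~200-byte overhead mirrors the estimate in the diff below; the 3 bytes per entry for JSON quotes and commas is an added assumption):

package main

import "fmt"

// estimateFilterSize roughly sizes a REQ filter carrying n author pubkeys:
// 64 hex chars per pubkey, ~3 bytes of JSON quoting/comma per entry, and
// ~200 bytes of fixed overhead for since/until, limit, and braces.
func estimateFilterSize(n int) int {
	const pubkeyHexSize = 64
	const jsonPerEntry = 3
	const baseOverhead = 200
	return baseOverhead + n*(pubkeyHexSize+jsonPerEntry)
}

func main() {
	fmt.Println(estimateFilterSize(3000)) // 201200 bytes, roughly 6x over a 32KB frame
}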
@@ -23,6 +23,10 @@ import (
 const (
 	OneTimeSpiderSyncMarker = "spider_one_time_sync_completed"
 	SpiderLastScanMarker    = "spider_last_scan_time"
+	// MaxWebSocketMessageSize is the maximum size for WebSocket messages to avoid 32KB limit
+	MaxWebSocketMessageSize = 30 * 1024 // 30KB to be safe
+	// PubkeyHexSize is the size of a hex-encoded pubkey (32 bytes = 64 hex chars)
+	PubkeyHexSize = 64
 )

 type Spider struct {
@@ -271,11 +275,33 @@ func (s *Spider) discoverRelays(followedPubkeys [][]byte) ([]string, error) {
 	return urls, nil
 }

+// calculateOptimalChunkSize calculates the optimal chunk size for pubkeys to stay under message size limit
+func (s *Spider) calculateOptimalChunkSize() int {
+	// Estimate the size of a filter with timestamps and other fields
+	// Base filter overhead: ~200 bytes for timestamps, limits, etc.
+	baseFilterSize := 200
+
+	// Calculate how many pubkeys we can fit in the remaining space
+	availableSpace := MaxWebSocketMessageSize - baseFilterSize
+	maxPubkeys := availableSpace / PubkeyHexSize
+
+	// Use a conservative chunk size (80% of max to be safe)
+	chunkSize := int(float64(maxPubkeys) * 0.8)
+
+	// Ensure minimum chunk size of 10
+	if chunkSize < 10 {
+		chunkSize = 10
+	}
+
+	log.D.F("Spider: calculated optimal chunk size: %d pubkeys (max would be %d)", chunkSize, maxPubkeys)
+	return chunkSize
+}
+
 // queryRelayForEvents connects to a relay and queries for events from followed pubkeys
 func (s *Spider) queryRelayForEvents(
 	relayURL string, followedPubkeys [][]byte, startTime, endTime time.Time,
 ) (int, error) {
-	log.T.F("Spider sync: querying relay %s", relayURL)
+	log.T.F("Spider sync: querying relay %s with %d pubkeys", relayURL, len(followedPubkeys))

 	// Connect to the relay with a timeout context
 	ctx, cancel := context.WithTimeout(s.ctx, 30*time.Second)
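For concreteness, with the constants introduced above the function works out to (30*1024 - 200) / 64 = 476 pubkeys at most, and 80% of that, truncated, is 380 per chunk. A minimal standalone check of the arithmetic (not part of the commit):

package main

import "fmt"

func main() {
	const maxWebSocketMessageSize = 30 * 1024 // mirrors MaxWebSocketMessageSize
	const baseFilterSize = 200                // the estimated filter overhead
	const pubkeyHexSize = 64                  // mirrors PubkeyHexSize

	maxPubkeys := (maxWebSocketMessageSize - baseFilterSize) / pubkeyHexSize
	chunkSize := int(float64(maxPubkeys) * 0.8) // conservative 80% margin
	fmt.Println(maxPubkeys, chunkSize)          // prints: 476 380
}

The 20% margin also covers what the estimate ignores: JSON quoting and commas add roughly 3 bytes per author, and 380 authors at ~67 bytes each plus overhead is about 25,660 bytes, still comfortably under the 30KB cap.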
@@ -287,82 +313,110 @@ func (s *Spider) queryRelayForEvents(
 	}
 	defer client.Close()

-	// Create filter for the time range and followed pubkeys
-	f := &filter.F{
-		Authors: tag.NewFromBytesSlice(followedPubkeys...),
-		Since:   timestamp.FromUnix(startTime.Unix()),
-		Until:   timestamp.FromUnix(endTime.Unix()),
-		Limit:   func() *uint { l := uint(1000); return &l }(), // Limit to avoid overwhelming
-	}
+	// Break pubkeys into chunks to avoid 32KB message limit
+	chunkSize := s.calculateOptimalChunkSize()
+	totalEventsSaved := 0

-	// Subscribe to get events
-	sub, err := client.Subscribe(ctx, filter.NewS(f))
-	if err != nil {
-		return 0, err
-	}
-	defer sub.Unsub()
-
-	eventsCount := 0
-	eventsSaved := 0
-	timeout := time.After(10 * time.Second) // Timeout for receiving events
-
-	for {
-		select {
-		case <-ctx.Done():
-			log.T.F(
-				"Spider sync: context done for relay %s, saved %d/%d events",
-				relayURL, eventsSaved, eventsCount,
-			)
-			return eventsSaved, nil
-		case <-timeout:
-			log.T.F(
-				"Spider sync: timeout for relay %s, saved %d/%d events",
-				relayURL, eventsSaved, eventsCount,
-			)
-			return eventsSaved, nil
-		case <-sub.EndOfStoredEvents:
-			log.T.F(
-				"Spider sync: end of stored events for relay %s, saved %d/%d events",
-				relayURL, eventsSaved, eventsCount,
-			)
-			return eventsSaved, nil
-		case ev := <-sub.Events:
-			if ev == nil {
-				continue
-			}
-			eventsCount++
-
-			// Verify the event signature
-			if ok, err := ev.Verify(); !ok || err != nil {
-				log.T.F(
-					"Spider sync: invalid event signature from relay %s",
-					relayURL,
-				)
-				ev.Free()
-				continue
-			}
-
-			// Save the event to the database
-			if _, _, err := s.db.SaveEvent(s.ctx, ev); err != nil {
-				if !strings.HasPrefix(err.Error(), "blocked:") {
-					log.T.F(
-						"Spider sync: error saving event from relay %s: %v",
-						relayURL, err,
-					)
-				}
-				// Event might already exist, which is fine for deduplication
-			} else {
-				eventsSaved++
-				if eventsSaved%10 == 0 {
-					log.T.F(
-						"Spider sync: saved %d events from relay %s",
-						eventsSaved, relayURL,
-					)
-				}
-			}
-			ev.Free()
+	for i := 0; i < len(followedPubkeys); i += chunkSize {
+		end := i + chunkSize
+		if end > len(followedPubkeys) {
+			end = len(followedPubkeys)
+		}
+
+		chunk := followedPubkeys[i:end]
+		log.T.F("Spider sync: processing chunk %d-%d (%d pubkeys) for relay %s",
+			i, end-1, len(chunk), relayURL)
+
+		// Create filter for this chunk of pubkeys
+		f := &filter.F{
+			Authors: tag.NewFromBytesSlice(chunk...),
+			Since:   timestamp.FromUnix(startTime.Unix()),
+			Until:   timestamp.FromUnix(endTime.Unix()),
+			Limit:   func() *uint { l := uint(1000); return &l }(), // Limit to avoid overwhelming
+		}
+
+		// Subscribe to get events for this chunk
+		sub, err := client.Subscribe(ctx, filter.NewS(f))
+		if err != nil {
+			log.E.F("Spider sync: failed to subscribe to chunk %d-%d for relay %s: %v",
+				i, end-1, relayURL, err)
+			continue
+		}
+
+		chunkEventsSaved := 0
+		chunkEventsCount := 0
+		timeout := time.After(10 * time.Second) // Timeout for receiving events
+
+		chunkDone := false
+		for !chunkDone {
+			select {
+			case <-ctx.Done():
+				log.T.F(
+					"Spider sync: context done for relay %s chunk %d-%d, saved %d/%d events",
+					relayURL, i, end-1, chunkEventsSaved, chunkEventsCount,
+				)
+				chunkDone = true
+			case <-timeout:
+				log.T.F(
+					"Spider sync: timeout for relay %s chunk %d-%d, saved %d/%d events",
+					relayURL, i, end-1, chunkEventsSaved, chunkEventsCount,
+				)
+				chunkDone = true
+			case <-sub.EndOfStoredEvents:
+				log.T.F(
+					"Spider sync: end of stored events for relay %s chunk %d-%d, saved %d/%d events",
+					relayURL, i, end-1, chunkEventsSaved, chunkEventsCount,
+				)
+				chunkDone = true
+			case ev := <-sub.Events:
+				if ev == nil {
+					continue
+				}
+				chunkEventsCount++
+
+				// Verify the event signature
+				if ok, err := ev.Verify(); !ok || err != nil {
+					log.T.F(
+						"Spider sync: invalid event signature from relay %s",
+						relayURL,
+					)
+					ev.Free()
+					continue
+				}
+
+				// Save the event to the database
+				if _, _, err := s.db.SaveEvent(s.ctx, ev); err != nil {
+					if !strings.HasPrefix(err.Error(), "blocked:") {
+						log.T.F(
+							"Spider sync: error saving event from relay %s: %v",
+							relayURL, err,
+						)
+					}
+					// Event might already exist, which is fine for deduplication
+				} else {
+					chunkEventsSaved++
+					if chunkEventsSaved%10 == 0 {
+						log.T.F(
+							"Spider sync: saved %d events from relay %s chunk %d-%d",
+							chunkEventsSaved, relayURL, i, end-1,
+						)
+					}
+				}
+				ev.Free()
 		}
 	}
+
+		// Clean up subscription
+		sub.Unsub()
+		totalEventsSaved += chunkEventsSaved
+
+		log.T.F("Spider sync: completed chunk %d-%d for relay %s, saved %d events",
+			i, end-1, relayURL, chunkEventsSaved)
+	}
+
+	log.T.F("Spider sync: completed all chunks for relay %s, total saved %d events",
+		relayURL, totalEventsSaved)
+	return totalEventsSaved, nil
 }

 // Stop stops the spider functionality
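The slicing pattern used by the chunk loop, isolated as a standalone sketch (the chunks helper is hypothetical, not part of the commit):

package main

import "fmt"

// chunks splits items into consecutive sub-slices of at most size elements.
// The sub-slices alias the original backing array, just like the
// followedPubkeys[i:end] expression in the loop above.
func chunks(items [][]byte, size int) [][][]byte {
	var out [][][]byte
	for i := 0; i < len(items); i += size {
		end := i + size
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[i:end])
	}
	return out
}

func main() {
	pubkeys := make([][]byte, 1000)
	for _, c := range chunks(pubkeys, 380) {
		fmt.Println(len(c)) // prints 380, then 380, then 240
	}
}

One design note on the loop above: timeout := time.After(10 * time.Second) is created once per chunk, before the receive loop, so it bounds the total drain time for the whole chunk rather than the gap between individual events; a silent relay costs at most ten seconds per chunk.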