diff --git a/cmd/aggregator/README.md b/cmd/aggregator/README.md new file mode 100644 index 0000000..c44f0ca --- /dev/null +++ b/cmd/aggregator/README.md @@ -0,0 +1,119 @@ +# Nostr Event Aggregator + +A comprehensive program that searches for all events related to a specific npub across multiple Nostr relays and outputs them in JSONL format to stdout. The program finds both events authored by the user and events that mention the user in "p" tags. It features dynamic relay discovery from relay list events and progressive backward time-based fetching for complete historical data collection. + +## Usage + +```bash +go run main.go -npub [-since ] [-until ] +``` + +Where: +- `` is a bech32-encoded Nostr public key (starting with "npub1") +- `` is a Unix timestamp (seconds since epoch) - optional + +## Examples + +```bash +# Get all events related to a user (authored by and mentioning) +go run main.go -npub npub1234567890abcdef... + +# Get events related to a user since January 1, 2022 +go run main.go -npub npub1234567890abcdef... -since 1640995200 + +# Get events related to a user between two dates +go run main.go -npub npub1234567890abcdef... -since 1640995200 -until 1672531200 + +# Get events related to a user until December 31, 2022 +go run main.go -npub npub1234567890abcdef... -until 1672531200 +``` + +## Features + +- **Comprehensive event discovery**: Finds both events authored by the user and events that mention the user +- **Dynamic relay discovery**: Automatically discovers and connects to new relays from relay list events (kind 10002) +- **Progressive backward fetching**: Systematically collects historical data in time-based batches +- **Triple filter approach**: Uses separate filters for authored events, p-tag mentions, and relay list events +- **Intelligent time management**: Works backwards from current time (or until timestamp) to since timestamp +- **Memory-efficient deduplication**: Uses bloom filter with ~0.1% false positive rate instead of unbounded maps +- **Fixed memory footprint**: Bloom filter uses only ~1.75MB for 1M events with controlled memory growth +- **Memory monitoring**: Real-time memory usage tracking and automatic garbage collection +- Connects to multiple relays simultaneously with dynamic expansion +- Outputs events in JSONL format (one JSON object per line) +- Handles connection failures gracefully +- Continues running until all relay connections are closed +- Time-based filtering with Unix timestamps (since/until parameters) +- Input validation for timestamp ranges + +## Event Discovery + +The aggregator searches for three types of events: + +1. **Authored Events**: Events where the specified npub is the author (pubkey field matches) +2. **Mentioned Events**: Events that contain "p" tags referencing the specified npub (replies, mentions, etc.) +3. **Relay List Events**: Kind 10002 events that contain relay URLs for dynamic relay discovery + +This comprehensive approach ensures you capture all events related to a user, including: +- Posts authored by the user +- Replies to the user's posts +- Posts that mention or tag the user +- Any other events that reference the user in p-tags +- Relay list metadata for discovering additional relays + +## Progressive Fetching + +The aggregator uses an intelligent progressive backward fetching strategy: + +1. **Time-based batches**: Fetches data in weekly batches working backwards from the end time +2. **Dynamic relay expansion**: As relay list events are discovered, new relays are automatically added to the search +3. **Complete coverage**: Ensures all events between since and until timestamps are collected +4. **Efficient processing**: Processes each time batch completely before moving to the next +5. **Boundary respect**: Stops when reaching the since timestamp or beginning of available data + +## Memory Management + +The aggregator uses advanced memory management techniques to handle large-scale data collection: + +### Bloom Filter Deduplication +- **Fixed Size**: Uses exactly 1.75MB for the bloom filter regardless of event count +- **Low False Positive Rate**: Configured for ~0.1% false positive rate with 1M events +- **Hash Functions**: Uses 10 independent hash functions based on SHA256 for optimal distribution +- **Thread-Safe**: Concurrent access protected with read-write mutexes + +### Memory Monitoring +- **Real-time Tracking**: Monitors total memory usage every 30 seconds +- **Automatic GC**: Triggers garbage collection when approaching memory limits +- **Statistics Logging**: Reports bloom filter usage, estimated event count, and memory consumption +- **Controlled Growth**: Prevents unbounded memory growth through fixed-size data structures + +### Performance Characteristics +- **Memory Usage**: ~1.75MB bloom filter + ~256MB total memory limit +- **False Positives**: ~0.1% chance of incorrectly identifying a duplicate (very low impact) +- **Scalability**: Can handle millions of events without memory issues +- **Efficiency**: O(k) time complexity for both add and lookup operations (k = hash functions) + +## Relays + +The program starts with the following initial relays: + +- wss://nostr.wine/ +- wss://nostr.land/ +- wss://orly-relay.imwald.eu +- wss://relay.orly.dev/ +- wss://relay.damus.io/ +- wss://nos.lol/ +- wss://theforest.nostr1.com/ + +**Dynamic Relay Discovery**: Additional relays are automatically discovered and added during execution when the program finds relay list events (kind 10002) authored by the target user. This ensures comprehensive coverage across the user's preferred relay network. + +## Output Format + +Each line of output is a JSON object representing a Nostr event with the following fields: + +- `id`: Event ID (hex) +- `pubkey`: Author's public key (hex) +- `created_at`: Unix timestamp +- `kind`: Event kind number +- `tags`: Array of tag arrays +- `content`: Event content string +- `sig`: Event signature (hex) diff --git a/cmd/aggregator/main.go b/cmd/aggregator/main.go new file mode 100644 index 0000000..e101a95 --- /dev/null +++ b/cmd/aggregator/main.go @@ -0,0 +1,1006 @@ +package main + +import ( + "context" + "encoding/base64" + "encoding/binary" + "encoding/json" + "flag" + "fmt" + "math" + "os" + "runtime" + "strconv" + "strings" + "sync" + "time" + + "lol.mleku.dev/chk" + "lol.mleku.dev/log" + "next.orly.dev/pkg/crypto/sha256" + "next.orly.dev/pkg/encoders/bech32encoding" + "next.orly.dev/pkg/encoders/event" + "next.orly.dev/pkg/encoders/filter" + "next.orly.dev/pkg/encoders/hex" + "next.orly.dev/pkg/encoders/kind" + "next.orly.dev/pkg/encoders/tag" + "next.orly.dev/pkg/encoders/timestamp" + "next.orly.dev/pkg/protocol/ws" +) + +const ( + // Bloom filter parameters for ~0.1% false positive rate with 1M events + bloomFilterBits = 14377588 // ~1.75MB for 1M events at 0.1% FPR + bloomFilterHashFuncs = 10 // Optimal number of hash functions + maxMemoryMB = 256 // Maximum memory usage in MB + memoryCheckInterval = 30 * time.Second + + // Rate limiting parameters + baseRetryDelay = 1 * time.Second + maxRetryDelay = 60 * time.Second + maxRetries = 5 + batchSize = time.Hour * 24 * 7 // 1 week batches +) + +var relays = []string{ + "wss://nostr.wine/", + "wss://nostr.land/", + "wss://orly-relay.imwald.eu", + "wss://relay.orly.dev/", + "wss://relay.damus.io/", + "wss://nos.lol/", + "wss://theforest.nostr1.com/", +} + +// BloomFilter implements a memory-efficient bloom filter for event deduplication +type BloomFilter struct { + bits []byte + size uint32 + hashFunc int + mutex sync.RWMutex +} + +// NewBloomFilter creates a new bloom filter with specified parameters +func NewBloomFilter(bits uint32, hashFuncs int) *BloomFilter { + return &BloomFilter{ + bits: make([]byte, (bits+7)/8), // Round up to nearest byte + size: bits, + hashFunc: hashFuncs, + } +} + +// hash generates multiple hash values for a given input using SHA256 +func (bf *BloomFilter) hash(data []byte) []uint32 { + hashes := make([]uint32, bf.hashFunc) + + // Use SHA256 as base hash + baseHash := sha256.Sum256(data) + + // Generate multiple hash values by combining with different salts + for i := 0; i < bf.hashFunc; i++ { + // Create salt by appending index + saltedData := make([]byte, len(baseHash)+4) + copy(saltedData, baseHash[:]) + binary.LittleEndian.PutUint32(saltedData[len(baseHash):], uint32(i)) + + // Hash the salted data + h := sha256.Sum256(saltedData) + + // Convert first 4 bytes to uint32 and mod by filter size + hashVal := binary.LittleEndian.Uint32(h[:4]) + hashes[i] = hashVal % bf.size + } + + return hashes +} + +// Add adds an item to the bloom filter +func (bf *BloomFilter) Add(data []byte) { + bf.mutex.Lock() + defer bf.mutex.Unlock() + + hashes := bf.hash(data) + for _, h := range hashes { + byteIndex := h / 8 + bitIndex := h % 8 + bf.bits[byteIndex] |= 1 << bitIndex + } +} + +// Contains checks if an item might be in the bloom filter +func (bf *BloomFilter) Contains(data []byte) bool { + bf.mutex.RLock() + defer bf.mutex.RUnlock() + + hashes := bf.hash(data) + for _, h := range hashes { + byteIndex := h / 8 + bitIndex := h % 8 + if bf.bits[byteIndex]&(1<= m { + return uint32(m / k) // Saturated filter + } + + estimated := -(m / k) * math.Log(1-(x/m)) + return uint32(estimated) +} + +// MemoryUsage returns the memory usage in bytes +func (bf *BloomFilter) MemoryUsage() int { + return len(bf.bits) +} + +// ToBase64 serializes the bloom filter to a base64 encoded string +func (bf *BloomFilter) ToBase64() string { + bf.mutex.RLock() + defer bf.mutex.RUnlock() + + // Create a serialization format: [size:4][hashFunc:4][bits:variable] + serialized := make([]byte, 8+len(bf.bits)) + + // Write size (4 bytes) + binary.LittleEndian.PutUint32(serialized[0:4], bf.size) + + // Write hash function count (4 bytes) + binary.LittleEndian.PutUint32(serialized[4:8], uint32(bf.hashFunc)) + + // Write bits data + copy(serialized[8:], bf.bits) + + return base64.StdEncoding.EncodeToString(serialized) +} + +// FromBase64 deserializes a bloom filter from a base64 encoded string +func FromBase64(encoded string) (*BloomFilter, error) { + data, err := base64.StdEncoding.DecodeString(encoded) + if err != nil { + return nil, fmt.Errorf("failed to decode base64: %w", err) + } + + if len(data) < 8 { + return nil, fmt.Errorf("invalid bloom filter data: too short") + } + + // Read size (4 bytes) + size := binary.LittleEndian.Uint32(data[0:4]) + + // Read hash function count (4 bytes) + hashFunc := int(binary.LittleEndian.Uint32(data[4:8])) + + // Read bits data + bits := make([]byte, len(data)-8) + copy(bits, data[8:]) + + // Validate that the bits length matches the expected size + expectedBytesLen := (size + 7) / 8 + if uint32(len(bits)) != expectedBytesLen { + return nil, fmt.Errorf("invalid bloom filter data: bits length mismatch") + } + + return &BloomFilter{ + bits: bits, + size: size, + hashFunc: hashFunc, + }, nil +} + +// RelayState tracks the state and rate limiting for each relay +type RelayState struct { + url string + retryCount int + nextRetryTime time.Time + rateLimited bool + completed bool + mutex sync.RWMutex +} + +// TimeWindow represents a time range for progressive fetching +type TimeWindow struct { + since *timestamp.T + until *timestamp.T +} + +// CompletionTracker tracks which relay-time window combinations have been completed +type CompletionTracker struct { + completed map[string]map[string]bool // relay -> timewindow -> completed + mutex sync.RWMutex +} + +func NewCompletionTracker() *CompletionTracker { + return &CompletionTracker{ + completed: make(map[string]map[string]bool), + } +} + +func (ct *CompletionTracker) MarkCompleted(relayURL string, timeWindow string) { + ct.mutex.Lock() + defer ct.mutex.Unlock() + + if ct.completed[relayURL] == nil { + ct.completed[relayURL] = make(map[string]bool) + } + ct.completed[relayURL][timeWindow] = true +} + +func (ct *CompletionTracker) IsCompleted(relayURL string, timeWindow string) bool { + ct.mutex.RLock() + defer ct.mutex.RUnlock() + + if ct.completed[relayURL] == nil { + return false + } + return ct.completed[relayURL][timeWindow] +} + +func (ct *CompletionTracker) GetCompletionStatus() (completed, total int) { + ct.mutex.RLock() + defer ct.mutex.RUnlock() + + for _, windows := range ct.completed { + for _, isCompleted := range windows { + total++ + if isCompleted { + completed++ + } + } + } + return +} + +type Aggregator struct { + npub string + pubkeyBytes []byte + seenEvents *BloomFilter + seenRelays map[string]bool + relayQueue chan string + relayMutex sync.RWMutex + ctx context.Context + cancel context.CancelFunc + since *timestamp.T + until *timestamp.T + wg sync.WaitGroup + progressiveEnd *timestamp.T + memoryTicker *time.Ticker + eventCount uint64 + relayStates map[string]*RelayState + relayStatesMutex sync.RWMutex + completionTracker *CompletionTracker + timeWindows []TimeWindow +} + +func NewAggregator(npub string, since, until *timestamp.T) (agg *Aggregator, err error) { + // Decode npub to get pubkey bytes + var pubkeyBytes []byte + if pubkeyBytes, err = bech32encoding.NpubToBytes(npub); chk.E(err) { + return nil, fmt.Errorf("failed to decode npub: %w", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + + // Set progressive end to current time if until is not specified + progressiveEnd := until + if progressiveEnd == nil { + progressiveEnd = timestamp.Now() + } + + agg = &Aggregator{ + npub: npub, + pubkeyBytes: pubkeyBytes, + seenEvents: NewBloomFilter(bloomFilterBits, bloomFilterHashFuncs), + seenRelays: make(map[string]bool), + relayQueue: make(chan string, 100), + ctx: ctx, + cancel: cancel, + since: since, + until: until, + progressiveEnd: progressiveEnd, + memoryTicker: time.NewTicker(memoryCheckInterval), + eventCount: 0, + relayStates: make(map[string]*RelayState), + completionTracker: NewCompletionTracker(), + } + + // Calculate time windows for progressive fetching + agg.calculateTimeWindows() + + // Add initial relays to queue + for _, relayURL := range relays { + agg.addRelay(relayURL) + } + + return +} + +// calculateTimeWindows pre-calculates all time windows for progressive fetching +func (a *Aggregator) calculateTimeWindows() { + if a.since == nil { + // If no since time, we'll just work backwards from progressiveEnd + // We can't pre-calculate windows without a start time + return + } + + var windows []TimeWindow + currentUntil := a.progressiveEnd + + for currentUntil.I64() > a.since.I64() { + currentSince := timestamp.FromUnix(currentUntil.I64() - int64(batchSize.Seconds())) + if currentSince.I64() < a.since.I64() { + currentSince = a.since + } + + windows = append(windows, TimeWindow{ + since: currentSince, + until: currentUntil, + }) + + currentUntil = currentSince + if currentUntil.I64() <= a.since.I64() { + break + } + } + + a.timeWindows = windows + log.I.F("calculated %d time windows for progressive fetching", len(windows)) +} + +// getOrCreateRelayState gets or creates a relay state for rate limiting +func (a *Aggregator) getOrCreateRelayState(relayURL string) *RelayState { + a.relayStatesMutex.Lock() + defer a.relayStatesMutex.Unlock() + + if state, exists := a.relayStates[relayURL]; exists { + return state + } + + state := &RelayState{ + url: relayURL, + retryCount: 0, + nextRetryTime: time.Now(), + rateLimited: false, + completed: false, + } + a.relayStates[relayURL] = state + return state +} + +// shouldRetryRelay checks if a relay should be retried based on rate limiting +func (a *Aggregator) shouldRetryRelay(relayURL string) bool { + state := a.getOrCreateRelayState(relayURL) + state.mutex.RLock() + defer state.mutex.RUnlock() + + if state.completed { + return false + } + + if state.rateLimited && time.Now().Before(state.nextRetryTime) { + return false + } + + return state.retryCount < maxRetries +} + +// markRelayRateLimited marks a relay as rate limited and sets retry time +func (a *Aggregator) markRelayRateLimited(relayURL string) { + state := a.getOrCreateRelayState(relayURL) + state.mutex.Lock() + defer state.mutex.Unlock() + + state.rateLimited = true + state.retryCount++ + + // Exponential backoff with jitter + delay := time.Duration(float64(baseRetryDelay) * math.Pow(2, float64(state.retryCount-1))) + if delay > maxRetryDelay { + delay = maxRetryDelay + } + + state.nextRetryTime = time.Now().Add(delay) + log.W.F("relay %s rate limited, retry %d/%d in %v", relayURL, state.retryCount, maxRetries, delay) +} + +// markRelayCompleted marks a relay as completed for all time windows +func (a *Aggregator) markRelayCompleted(relayURL string) { + state := a.getOrCreateRelayState(relayURL) + state.mutex.Lock() + defer state.mutex.Unlock() + + state.completed = true + log.I.F("relay %s marked as completed", relayURL) +} + +// checkAllCompleted checks if all relay-time window combinations are completed +func (a *Aggregator) checkAllCompleted() bool { + if len(a.timeWindows) == 0 { + // If no time windows calculated, we can't determine completion + return false + } + + a.relayStatesMutex.RLock() + allRelays := make([]string, 0, len(a.relayStates)) + for relayURL := range a.relayStates { + allRelays = append(allRelays, relayURL) + } + a.relayStatesMutex.RUnlock() + + // Check if all relay-time window combinations are completed + totalCombinations := len(allRelays) * len(a.timeWindows) + completedCombinations := 0 + + for _, relayURL := range allRelays { + for _, window := range a.timeWindows { + windowKey := fmt.Sprintf("%d-%d", window.since.I64(), window.until.I64()) + if a.completionTracker.IsCompleted(relayURL, windowKey) { + completedCombinations++ + } + } + } + + if totalCombinations > 0 { + progress := float64(completedCombinations) / float64(totalCombinations) * 100 + log.I.F("completion progress: %d/%d (%.1f%%)", completedCombinations, totalCombinations, progress) + return completedCombinations == totalCombinations + } + + return false +} + +func (a *Aggregator) isEventSeen(eventID string) (seen bool) { + return a.seenEvents.Contains([]byte(eventID)) +} + +func (a *Aggregator) markEventSeen(eventID string) { + a.seenEvents.Add([]byte(eventID)) + a.eventCount++ +} + +func (a *Aggregator) addRelay(relayURL string) { + a.relayMutex.Lock() + defer a.relayMutex.Unlock() + + if !a.seenRelays[relayURL] { + a.seenRelays[relayURL] = true + select { + case a.relayQueue <- relayURL: + log.I.F("added new relay to queue: %s", relayURL) + default: + log.W.F("relay queue full, skipping: %s", relayURL) + } + } +} + +func (a *Aggregator) processRelayListEvent(ev *event.E) { + // Extract relay URLs from "r" tags in kind 10002 events + if ev.Kind != 10002 { // RelayListMetadata + return + } + + log.I.F("processing relay list event from %s", hex.Enc(ev.Pubkey)) + + for _, tag := range ev.Tags.GetAll([]byte("r")) { + if len(tag.T) >= 2 { + relayURL := string(tag.T[1]) + if relayURL != "" { + log.I.F("discovered relay from relay list: %s", relayURL) + a.addRelay(relayURL) + } + } + } +} + +func (a *Aggregator) outputEvent(ev *event.E) (err error) { + // Convert event to JSON and output to stdout + var jsonBytes []byte + if jsonBytes, err = json.Marshal(map[string]interface{}{ + "id": hex.Enc(ev.ID), + "pubkey": hex.Enc(ev.Pubkey), + "created_at": ev.CreatedAt, + "kind": ev.Kind, + "tags": ev.Tags, + "content": string(ev.Content), + "sig": hex.Enc(ev.Sig), + }); chk.E(err) { + return fmt.Errorf("failed to marshal event to JSON: %w", err) + } + + fmt.Println(string(jsonBytes)) + return +} + +func (a *Aggregator) connectToRelay(relayURL string) { + defer func() { + log.I.F("relay connection finished: %s", relayURL) + a.wg.Done() + }() + + log.I.F("connecting to relay: %s", relayURL) + + // Create context with timeout for connection + connCtx, connCancel := context.WithTimeout(a.ctx, 10*time.Second) + defer connCancel() + + // Connect to relay + var client *ws.Client + var err error + if client, err = ws.RelayConnect(connCtx, relayURL); chk.E(err) { + log.E.F("failed to connect to relay %s: %v", relayURL, err) + return + } + defer client.Close() + + log.I.F("connected to relay: %s", relayURL) + + // Perform progressive backward fetching + a.progressiveFetch(client, relayURL) +} + +func (a *Aggregator) progressiveFetch(client *ws.Client, relayURL string) { + // Check if relay should be retried + if !a.shouldRetryRelay(relayURL) { + log.W.F("skipping relay %s due to rate limiting or max retries", relayURL) + return + } + + // Create hex-encoded pubkey for p-tags + pubkeyHex := hex.Enc(a.pubkeyBytes) + + // Use pre-calculated time windows if available, otherwise calculate on the fly + var windows []TimeWindow + if len(a.timeWindows) > 0 { + windows = a.timeWindows + } else { + // Fallback to dynamic calculation for unlimited time ranges + currentUntil := a.progressiveEnd + for { + currentSince := timestamp.FromUnix(currentUntil.I64() - int64(batchSize.Seconds())) + if a.since != nil && currentSince.I64() < a.since.I64() { + currentSince = a.since + } + + windows = append(windows, TimeWindow{ + since: currentSince, + until: currentUntil, + }) + + currentUntil = currentSince + if a.since != nil && currentUntil.I64() <= a.since.I64() { + break + } + + // Prevent infinite loops for unlimited ranges + if len(windows) > 1000 { + log.W.F("limiting to 1000 time windows for relay %s", relayURL) + break + } + } + } + + // Process each time window + for _, window := range windows { + windowKey := fmt.Sprintf("%d-%d", window.since.I64(), window.until.I64()) + + // Skip if already completed + if a.completionTracker.IsCompleted(relayURL, windowKey) { + continue + } + + select { + case <-a.ctx.Done(): + log.I.F("context cancelled, stopping progressive fetch for relay %s", relayURL) + return + default: + } + + log.I.F("fetching batch from %s: %d to %d", relayURL, window.since.I64(), window.until.I64()) + + // Try to fetch this time window with retry logic + success := a.fetchTimeWindow(client, relayURL, window, pubkeyHex) + + if success { + // Mark this time window as completed for this relay + a.completionTracker.MarkCompleted(relayURL, windowKey) + } else { + // If fetch failed, mark relay as rate limited and return + a.markRelayRateLimited(relayURL) + return + } + } + + // Mark relay as completed for all time windows + a.markRelayCompleted(relayURL) + log.I.F("completed all time windows for relay %s", relayURL) +} + +func (a *Aggregator) fetchTimeWindow(client *ws.Client, relayURL string, window TimeWindow, pubkeyHex string) bool { + // Create filters for this time batch + f1 := &filter.F{ + Authors: tag.NewFromBytesSlice(a.pubkeyBytes), + Since: window.since, + Until: window.until, + } + + f2 := &filter.F{ + Tags: tag.NewSWithCap(1), + Since: window.since, + Until: window.until, + } + pTag := tag.NewFromAny("p", pubkeyHex) + f2.Tags.Append(pTag) + + // Add relay list filter to discover new relays + f3 := &filter.F{ + Authors: tag.NewFromBytesSlice(a.pubkeyBytes), + Kinds: kind.NewS(kind.New(10002)), // RelayListMetadata + Since: window.since, + Until: window.until, + } + + // Subscribe to events using all filters + var sub *ws.Subscription + var err error + if sub, err = client.Subscribe(a.ctx, filter.NewS(f1, f2, f3)); chk.E(err) { + log.E.F("failed to subscribe to relay %s: %v", relayURL, err) + return false + } + + log.I.F("subscribed to batch from %s for pubkey %s (authored by, mentioning, and relay lists)", relayURL, a.npub) + + // Process events for this batch + batchComplete := false + rateLimited := false + + for !batchComplete && !rateLimited { + select { + case <-a.ctx.Done(): + sub.Unsub() + log.I.F("context cancelled, stopping batch for relay %s", relayURL) + return false + case ev := <-sub.Events: + if ev == nil { + log.I.F("event channel closed for relay %s", relayURL) + sub.Unsub() + return false + } + + eventID := hex.Enc(ev.ID) + + // Check if we've already seen this event + if a.isEventSeen(eventID) { + continue + } + + // Mark event as seen + a.markEventSeen(eventID) + + // Process relay list events to discover new relays + if ev.Kind == 10002 { + a.processRelayListEvent(ev) + } + + // Output event to stdout + if err = a.outputEvent(ev); chk.E(err) { + log.E.F("failed to output event: %v", err) + } + case <-sub.EndOfStoredEvents: + log.I.F("end of stored events for batch on relay %s", relayURL) + batchComplete = true + case reason := <-sub.ClosedReason: + reasonStr := string(reason) + log.W.F("subscription closed for relay %s: %s", relayURL, reasonStr) + + // Check for rate limiting messages + if a.isRateLimitMessage(reasonStr) { + log.W.F("detected rate limiting from relay %s", relayURL) + rateLimited = true + } + + sub.Unsub() + return !rateLimited + // Note: NOTICE messages are handled at the client level, not subscription level + // Rate limiting detection will primarily rely on CLOSED messages + } + } + + sub.Unsub() + return !rateLimited +} + +// isRateLimitMessage checks if a message indicates rate limiting +func (a *Aggregator) isRateLimitMessage(message string) bool { + message = strings.ToLower(message) + rateLimitIndicators := []string{ + "too many", + "rate limit", + "slow down", + "concurrent req", + "throttle", + "backoff", + } + + for _, indicator := range rateLimitIndicators { + if strings.Contains(message, indicator) { + return true + } + } + return false +} + +func (a *Aggregator) Start() (err error) { + log.I.F("starting aggregator for npub: %s", a.npub) + log.I.F("pubkey bytes: %s", hex.Enc(a.pubkeyBytes)) + log.I.F("bloom filter: %d bits (%.2fMB), %d hash functions, ~0.1%% false positive rate", + bloomFilterBits, float64(a.seenEvents.MemoryUsage())/1024/1024, bloomFilterHashFuncs) + + // Start memory monitoring goroutine + go a.memoryMonitor() + + // Start relay processor goroutine + go a.processRelayQueue() + + // Start completion monitoring goroutine + go a.completionMonitor() + + // Add initial relay count to wait group + a.wg.Add(len(relays)) + log.I.F("waiting for %d initial relay connections to complete", len(relays)) + + // Wait for all relay connections to finish OR completion + done := make(chan struct{}) + go func() { + a.wg.Wait() + close(done) + }() + + select { + case <-done: + log.I.F("all relay connections completed") + case <-a.ctx.Done(): + log.I.F("aggregator terminated due to completion") + } + + // Stop memory monitoring + a.memoryTicker.Stop() + + // Output bloom filter summary + a.outputBloomFilter() + + log.I.F("aggregator finished") + return +} + +// completionMonitor periodically checks if all work is completed and terminates if so +func (a *Aggregator) completionMonitor() { + ticker := time.NewTicker(10 * time.Second) // Check every 10 seconds + defer ticker.Stop() + + for { + select { + case <-a.ctx.Done(): + return + case <-ticker.C: + if a.checkAllCompleted() { + log.I.F("all relay-time window combinations completed, terminating aggregator") + a.cancel() // This will trigger context cancellation + return + } + + // Also check for rate-limited relays that can be retried + a.retryRateLimitedRelays() + } + } +} + +// retryRateLimitedRelays checks for rate-limited relays that can be retried +func (a *Aggregator) retryRateLimitedRelays() { + a.relayStatesMutex.RLock() + defer a.relayStatesMutex.RUnlock() + + for relayURL, state := range a.relayStates { + state.mutex.RLock() + canRetry := state.rateLimited && + time.Now().After(state.nextRetryTime) && + state.retryCount < maxRetries && + !state.completed + state.mutex.RUnlock() + + if canRetry { + log.I.F("retrying rate-limited relay: %s", relayURL) + + // Reset rate limiting status + state.mutex.Lock() + state.rateLimited = false + state.mutex.Unlock() + + // Add back to queue for retry + select { + case a.relayQueue <- relayURL: + a.wg.Add(1) + default: + log.W.F("relay queue full, skipping retry for %s", relayURL) + } + } + } +} + +func (a *Aggregator) processRelayQueue() { + initialRelayCount := len(relays) + processedInitial := 0 + + for { + select { + case <-a.ctx.Done(): + log.I.F("relay queue processor stopping") + return + case relayURL := <-a.relayQueue: + log.I.F("processing relay from queue: %s", relayURL) + + // For dynamically discovered relays (after initial ones), add to wait group + if processedInitial >= initialRelayCount { + a.wg.Add(1) + } else { + processedInitial++ + } + + go a.connectToRelay(relayURL) + } + } +} + +func (a *Aggregator) Stop() { + a.cancel() + if a.memoryTicker != nil { + a.memoryTicker.Stop() + } +} + +// outputBloomFilter outputs the bloom filter as base64 to stderr with statistics +func (a *Aggregator) outputBloomFilter() { + base64Filter := a.seenEvents.ToBase64() + estimatedEvents := a.seenEvents.EstimatedItems() + memoryUsage := float64(a.seenEvents.MemoryUsage()) / 1024 / 1024 + + // Output to stderr so it doesn't interfere with JSONL event output to stdout + fmt.Fprintf(os.Stderr, "\n=== BLOOM FILTER SUMMARY ===\n") + fmt.Fprintf(os.Stderr, "Events processed: %d\n", a.eventCount) + fmt.Fprintf(os.Stderr, "Estimated unique events: %d\n", estimatedEvents) + fmt.Fprintf(os.Stderr, "Bloom filter size: %.2f MB\n", memoryUsage) + fmt.Fprintf(os.Stderr, "False positive rate: ~0.1%%\n") + fmt.Fprintf(os.Stderr, "Hash functions: %d\n", bloomFilterHashFuncs) + fmt.Fprintf(os.Stderr, "\nBloom filter (base64):\n%s\n", base64Filter) + fmt.Fprintf(os.Stderr, "=== END BLOOM FILTER ===\n") +} + +// getMemoryUsageMB returns current memory usage in MB +func (a *Aggregator) getMemoryUsageMB() float64 { + var m runtime.MemStats + runtime.ReadMemStats(&m) + return float64(m.Alloc) / 1024 / 1024 +} + +// memoryMonitor monitors memory usage and logs statistics +func (a *Aggregator) memoryMonitor() { + for { + select { + case <-a.ctx.Done(): + log.I.F("memory monitor stopping") + return + case <-a.memoryTicker.C: + memUsage := a.getMemoryUsageMB() + bloomMemMB := float64(a.seenEvents.MemoryUsage()) / 1024 / 1024 + estimatedEvents := a.seenEvents.EstimatedItems() + + log.I.F("memory stats: total=%.2fMB, bloom=%.2fMB, events=%d, estimated_events=%d", + memUsage, bloomMemMB, a.eventCount, estimatedEvents) + + // Check if we're approaching memory limits + if memUsage > maxMemoryMB { + log.W.F("high memory usage detected: %.2fMB (limit: %dMB)", memUsage, maxMemoryMB) + + // Force garbage collection + runtime.GC() + + // Check again after GC + newMemUsage := a.getMemoryUsageMB() + log.I.F("memory usage after GC: %.2fMB", newMemUsage) + + // If still too high, warn but continue (bloom filter has fixed size) + if newMemUsage > maxMemoryMB*1.2 { + log.E.F("critical memory usage: %.2fMB, but continuing with bloom filter", newMemUsage) + } + } + } + } +} + +func parseTimestamp(s string) (ts *timestamp.T, err error) { + if s == "" { + return nil, nil + } + + var t int64 + if t, err = strconv.ParseInt(s, 10, 64); chk.E(err) { + return nil, fmt.Errorf("invalid timestamp format: %w", err) + } + + ts = timestamp.FromUnix(t) + return +} + +func main() { + var npub string + var sinceStr string + var untilStr string + + flag.StringVar(&npub, "npub", "", "npub (bech32-encoded public key) to search for events") + flag.StringVar(&sinceStr, "since", "", "start timestamp (Unix timestamp) - only events after this time") + flag.StringVar(&untilStr, "until", "", "end timestamp (Unix timestamp) - only events before this time") + flag.Parse() + + if npub == "" { + fmt.Fprintf(os.Stderr, "Usage: %s -npub [-since ] [-until ]\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Example: %s -npub npub1... -since 1640995200 -until 1672531200\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "\nTimestamps should be Unix timestamps (seconds since epoch)\n") + os.Exit(1) + } + + var since, until *timestamp.T + var err error + + if since, err = parseTimestamp(sinceStr); chk.E(err) { + fmt.Fprintf(os.Stderr, "Error parsing since timestamp: %v\n", err) + os.Exit(1) + } + + if until, err = parseTimestamp(untilStr); chk.E(err) { + fmt.Fprintf(os.Stderr, "Error parsing until timestamp: %v\n", err) + os.Exit(1) + } + + // Validate that since is before until if both are provided + if since != nil && until != nil && since.I64() >= until.I64() { + fmt.Fprintf(os.Stderr, "Error: since timestamp must be before until timestamp\n") + os.Exit(1) + } + + var agg *Aggregator + if agg, err = NewAggregator(npub, since, until); chk.E(err) { + fmt.Fprintf(os.Stderr, "Error creating aggregator: %v\n", err) + os.Exit(1) + } + + if err = agg.Start(); chk.E(err) { + fmt.Fprintf(os.Stderr, "Error running aggregator: %v\n", err) + os.Exit(1) + } +} diff --git a/pkg/version/version b/pkg/version/version index 6fa3e08..26b3c0e 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.17.14 \ No newline at end of file +v0.17.15 \ No newline at end of file