From da1119db7c806a5576bf653f154e8841110bccf0 Mon Sep 17 00:00:00 2001
From: mleku
Date: Thu, 23 Oct 2025 13:00:01 +0100
Subject: [PATCH] Enhance aggregator functionality for Nostr event collection

- Updated the aggregator to support both public (npub) and private (nsec)
  key inputs for event searching, enabling authentication for relays that
  require it.
- Implemented bloom filter loading and appending capabilities for efficient
  incremental data collection.
- Added timeout parameters for maximum runtime and stuck progress detection
  to improve reliability.
- Enhanced README with detailed usage instructions, authentication behavior,
  and examples for incremental collection.
- Bumped version to v0.17.16.
---
 cmd/aggregator/README.md | 184 +++++++++++++++++++++++-
 cmd/aggregator/main.go   | 304 ++++++++++++++++++++++++++++++++++++---
 pkg/version/version     |   2 +-
 3 files changed, 460 insertions(+), 30 deletions(-)

diff --git a/cmd/aggregator/README.md b/cmd/aggregator/README.md
index c44f0ca..c332585 100644
--- a/cmd/aggregator/README.md
+++ b/cmd/aggregator/README.md
@@ -5,45 +5,129 @@ A comprehensive program that searches for all events related to a specific npub
 ## Usage
 
 ```bash
-go run main.go -npub <npub> [-since <timestamp>] [-until <timestamp>]
+go run main.go -key <nsec|npub> [-since <timestamp>] [-until <timestamp>] [-filter <filter-file>] [-output <output-file>]
 ```
 
 Where:
-- `<npub>` is a bech32-encoded Nostr public key (starting with "npub1")
+- `<nsec|npub>` is either a bech32-encoded Nostr private key (nsec1...) or public key (npub1...)
 - `<timestamp>` is a Unix timestamp (seconds since epoch) - optional
+- `<filter-file>` is a file path for bloom filter input/output - optional
+
+### Parameters
+
+- **`-key`**: Required. The bech32-encoded Nostr key to search for events
+  - **nsec**: Private key (enables authentication to relays that require it)
+  - **npub**: Public key (authentication disabled)
+- **`-since`**: Optional. Start timestamp (Unix seconds). Only events after this time
+- **`-until`**: Optional. End timestamp (Unix seconds). Only events before this time
+- **`-filter`**: Optional. File containing a base64-encoded bloom filter from previous runs
+- **`-output`**: Optional. Output file for events (default: stdout)
+
+### Authentication
+
+When using an **nsec** (private key), the aggregator will:
+- Derive the public key from the private key for event searching
+- Attempt to authenticate to relays that require it (NIP-42)
+- Continue working even if authentication fails on some relays
+- Log authentication success/failure for each relay
+
+When using an **npub** (public key), the aggregator will:
+- Search for events using the provided public key
+- Skip authentication (no private key available)
+- Work with public relays that don't require authentication
+
+### Behavior
+
+- **Without `-filter`**: Creates a new bloom filter; outputs to stdout or truncates the output file
+- **With `-filter`**: Loads the existing bloom filter and automatically appends to the output file
+- **Bloom filter output**: Always written to stderr with timestamp information and base64 data
 
 ## Examples
 
+### Basic Usage
+
 ```bash
-# Get all events related to a user (authored by and mentioning)
-go run main.go -npub npub1234567890abcdef...
+# Get all events related to a user using public key (no authentication)
+go run main.go -key npub1234567890abcdef...
+
+# Get all events related to a user using private key (with authentication)
+go run main.go -key nsec1234567890abcdef...
 
 # Get events related to a user since January 1, 2022
-go run main.go -npub npub1234567890abcdef... -since 1640995200
+go run main.go -key npub1234567890abcdef... -since 1640995200
 
 # Get events related to a user between two dates
-go run main.go -npub npub1234567890abcdef... -since 1640995200 -until 1672531200
+go run main.go -key npub1234567890abcdef... -since 1640995200 -until 1672531200
 
 # Get events related to a user until December 31, 2022
-go run main.go -npub npub1234567890abcdef... -until 1672531200
+go run main.go -key npub1234567890abcdef... -until 1672531200
+```
+
+### Incremental Collection with Bloom Filter
+
+```bash
+# First run: Collect initial events and save bloom filter (using npub)
+go run main.go -key npub1234567890abcdef... -since 1640995200 -until 1672531200 -output events.jsonl 2>bloom_filter.txt
+
+# Second run: Continue from where we left off, append new events (using nsec for auth)
+go run main.go -key nsec1234567890abcdef... -since 1672531200 -until 1704067200 -filter bloom_filter.txt -output events.jsonl 2>bloom_filter_updated.txt
+
+# Third run: Collect even more recent events
+go run main.go -key nsec1234567890abcdef... -since 1704067200 -filter bloom_filter_updated.txt -output events.jsonl 2>bloom_filter_final.txt
+```
+
+### Output Redirection
+
+```bash
+# Events to file, bloom filter to stderr (visible in terminal)
+go run main.go -key npub1... -output events.jsonl
+
+# Events to file, bloom filter to separate file
+go run main.go -key npub1... -output events.jsonl 2>bloom_filter.txt
+
+# Events to stdout, bloom filter to file (useful for piping events)
+go run main.go -key npub1... 2>bloom_filter.txt | jq .
+
+# Using nsec for authentication to access private relays
+go run main.go -key nsec1... -output events.jsonl 2>bloom_filter.txt
 ```
 
 ## Features
 
+### Core Functionality
 - **Comprehensive event discovery**: Finds both events authored by the user and events that mention the user
 - **Dynamic relay discovery**: Automatically discovers and connects to new relays from relay list events (kind 10002)
 - **Progressive backward fetching**: Systematically collects historical data in time-based batches
 - **Triple filter approach**: Uses separate filters for authored events, p-tag mentions, and relay list events
 - **Intelligent time management**: Works backwards from current time (or until timestamp) to since timestamp
+
+### Authentication & Access
+- **Private key support**: Use nsec keys to authenticate to relays that require it (NIP-42)
+- **Public key compatibility**: Continue to work with npub keys for public relay access
+- **Graceful fallback**: Continue operation even if authentication fails on some relays
+- **Auth-required relay access**: Access private notes and restricted content on authenticated relays
+- **Flexible key input**: Automatically detects and handles both nsec and npub key formats
+
+### Memory Management
 - **Memory-efficient deduplication**: Uses bloom filter with ~0.1% false positive rate instead of unbounded maps
 - **Fixed memory footprint**: Bloom filter uses only ~1.75MB for 1M events with controlled memory growth
 - **Memory monitoring**: Real-time memory usage tracking and automatic garbage collection
+- **Persistent deduplication**: Bloom filter can be saved and reused across multiple runs (a sketch of this filter follows below)
+
+### Incremental Collection
+- **Bloom filter persistence**: Save deduplication state between runs for efficient incremental collection
+- **Automatic append mode**: When loading existing bloom filter, automatically appends to output file
+- **Timestamp tracking**: Records actual time range of processed events in bloom filter output
+- **Seamless continuation**: Resume collection from where previous run left off without duplicates
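+
+The deduplication described in these lists can be pictured with a small
+sketch. This is an illustrative model only, assuming Kirsch-Mitzenmacher
+double hashing over FNV-64a; the actual `BloomFilter` type in `main.go` may
+differ in sizing, layout, and hash choice:
+
+```go
+package main
+
+import "hash/fnv"
+
+// bloom is a toy fixed-size bloom filter: k probe positions per event ID,
+// derived from two base hashes as h1 + i*h2.
+type bloom struct {
+	bits  []byte
+	nbits uint64
+	k     uint64
+}
+
+func newBloom(nbits, k uint64) *bloom {
+	return &bloom{bits: make([]byte, (nbits+7)/8), nbits: nbits, k: k}
+}
+
+func (b *bloom) hashes(id []byte) (h1, h2 uint64) {
+	h := fnv.New64a()
+	h.Write(id)
+	h1 = h.Sum64()
+	h.Write(id)        // hash id twice for a second, loosely independent value
+	h2 = h.Sum64() | 1 // force odd so the k probes don't collapse to one slot
+	return
+}
+
+// Add marks an event ID as seen.
+func (b *bloom) Add(id []byte) {
+	h1, h2 := b.hashes(id)
+	for i := uint64(0); i < b.k; i++ {
+		pos := (h1 + i*h2) % b.nbits
+		b.bits[pos/8] |= 1 << (pos % 8)
+	}
+}
+
+// Has reports whether an event ID was probably seen before: false positives
+// are possible (~0.1% at the sizing quoted above), false negatives are not.
+func (b *bloom) Has(id []byte) bool {
+	h1, h2 := b.hashes(id)
+	for i := uint64(0); i < b.k; i++ {
+		pos := (h1 + i*h2) % b.nbits
+		if b.bits[pos/8]&(1<<(pos%8)) == 0 {
+			return false
+		}
+	}
+	return true
+}
+```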
+
+### Reliability & Performance
 - Connects to multiple relays simultaneously with dynamic expansion
 - Outputs events in JSONL format (one JSON object per line)
 - Handles connection failures gracefully
 - Continues running until all relay connections are closed
 - Time-based filtering with Unix timestamps (since/until parameters)
 - Input validation for timestamp ranges
+- Rate limiting and backoff for relay connection management
 
 ## Event Discovery
 
@@ -70,6 +154,61 @@ The aggregator uses an intelligent progressive backward fetching strategy:
 4. **Efficient processing**: Processes each time batch completely before moving to the next
 5. **Boundary respect**: Stops when reaching the since timestamp or beginning of available data
 
+## Incremental Collection Workflow
+
+The aggregator supports efficient incremental data collection using persistent bloom filters. This allows you to build comprehensive event archives over time without re-processing duplicate events.
+
+### How It Works
+
+1. **First Run**: Creates a new bloom filter and collects events for the specified time range
+2. **Bloom Filter Output**: At completion, outputs a bloom filter summary to stderr with:
+   - Event statistics (processed count, estimated unique events)
+   - Time range covered (actual timestamps of collected events)
+   - Base64-encoded bloom filter data for reuse
+3. **Subsequent Runs**: Load the saved bloom filter to skip already-seen events
+4. **Automatic Append**: When using an existing filter, new events are appended to the output file
+
+### Bloom Filter Output Format
+
+The bloom filter output includes comprehensive metadata:
+
+```
+=== BLOOM FILTER SUMMARY ===
+Events processed: 1247
+Estimated unique events: 1247
+Bloom filter size: 1.75 MB
+False positive rate: ~0.1%
+Hash functions: 10
+Time range covered: 1640995200 to 1672531200
+Time range (human): 2022-01-01T00:00:00Z to 2023-01-01T00:00:00Z
+
+Bloom filter (base64):
+[base64-encoded binary data]
+=== END BLOOM FILTER ===
+```
+
+### Best Practices
+
+- **Save bloom filters**: Always redirect stderr to a file to preserve the bloom filter
+- **Sequential time ranges**: Use non-overlapping time ranges for optimal efficiency
+- **Regular updates**: Update your bloom filter file after each run for the latest state
+- **Backup filters**: Keep copies of bloom filter files for different time periods
+
+### Example Workflow
+
+```bash
+# Month 1: January 2022 (using npub for public relays)
+go run main.go -key npub1... -since 1640995200 -until 1643673600 -output all_events.jsonl 2>filter_jan.txt
+
+# Month 2: February 2022 (using nsec for auth-required relays, append to same file)
+go run main.go -key nsec1... -since 1643673600 -until 1646092800 -filter filter_jan.txt -output all_events.jsonl 2>filter_feb.txt
+
+# Month 3: March 2022 (continue with authentication for complete coverage)
+go run main.go -key nsec1... -since 1646092800 -until 1648771200 -filter filter_feb.txt -output all_events.jsonl 2>filter_mar.txt
+
+# Result: all_events.jsonl contains deduplicated events from all three months, including private relay content
+```
+
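+The same workflow can be driven programmatically. Below is a hypothetical
+wrapper (the `./aggregator` binary name, the file names, and the `NOSTR_KEY`
+variable are illustrative, not part of this program) that runs one month at a
+time and threads each run's bloom filter into the next:
+
+```go
+package main
+
+import (
+	"fmt"
+	"os"
+	"os/exec"
+)
+
+func main() {
+	key := os.Getenv("NOSTR_KEY") // nsec1... or npub1...
+	months := []struct{ since, until int64 }{
+		{1640995200, 1643673600}, // January 2022
+		{1643673600, 1646092800}, // February 2022
+		{1646092800, 1648771200}, // March 2022
+	}
+	filter := "" // no -filter on the first run
+	for i, m := range months {
+		args := []string{
+			"-key", key,
+			"-since", fmt.Sprint(m.since),
+			"-until", fmt.Sprint(m.until),
+			"-output", "all_events.jsonl",
+		}
+		if filter != "" {
+			args = append(args, "-filter", filter)
+		}
+		next := fmt.Sprintf("filter_%02d.txt", i+1)
+		stderrFile, err := os.Create(next)
+		if err != nil {
+			fmt.Fprintln(os.Stderr, err)
+			os.Exit(1)
+		}
+		cmd := exec.Command("./aggregator", args...)
+		cmd.Stderr = stderrFile // the bloom filter summary lands here
+		runErr := cmd.Run()
+		stderrFile.Close()
+		if runErr != nil {
+			fmt.Fprintf(os.Stderr, "run %d failed: %v\n", i+1, runErr)
+			os.Exit(1)
+		}
+		filter = next // feed this run's filter into the next run
+	}
+}
+```
+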
 ## Memory Management
 
 The aggregator uses advanced memory management techniques to handle large-scale data collection:
@@ -108,6 +247,8 @@ The program starts with the following initial relays:
 
 ## Output Format
 
+### Event Output (stdout or -output file)
+
 Each line of output is a JSON object representing a Nostr event with the following fields:
 
 - `id`: Event ID (hex)
@@ -117,3 +258,32 @@ Each line of output is a JSON object representing a Nostr event with the followi
 - `tags`: Array of tag arrays
 - `content`: Event content string
 - `sig`: Event signature (hex)
+
+### Bloom Filter Output (stderr)
+
+At program completion, a comprehensive bloom filter summary is written to stderr containing:
+
+- **Statistics**: Event counts, memory usage, performance metrics
+- **Time Range**: Actual timestamp range of collected events (both Unix and human-readable)
+- **Configuration**: Bloom filter parameters (size, hash functions, false positive rate)
+- **Binary Data**: Base64-encoded bloom filter for reuse in subsequent runs
+
+The bloom filter output is structured with clear markers (`=== BLOOM FILTER SUMMARY ===` and `=== END BLOOM FILTER ===`), making it easy to parse and extract the base64 data programmatically, as sketched below.
+
+### Output Separation
+
+- **Events**: Always go to stdout (default) or the file specified by `-output`
+- **Bloom Filter**: Always goes to stderr, allowing separate redirection
+- **Logs**: Runtime information and progress updates go to stderr
+
+This separation allows flexible output handling:
+```bash
+# Events to file, bloom filter visible in terminal
+./aggregator -key npub1... -output events.jsonl
+
+# Both events and bloom filter to separate files
+./aggregator -key npub1... -output events.jsonl 2>bloom_filter.txt
+
+# Events piped to another program, bloom filter saved
+./aggregator -key npub1... 2>bloom_filter.txt | jq '.content'
+```
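+
+Since the markers are stable, the base64 payload can be recovered
+programmatically. The sketch below mirrors the loading logic the aggregator
+itself applies when `-filter` is given; the input file name is an assumption:
+
+```go
+package main
+
+import (
+	"fmt"
+	"os"
+	"strings"
+)
+
+// extractFilter pulls the base64 payload out of a saved bloom filter
+// summary, using the exact marker strings documented above.
+func extractFilter(path string) (string, error) {
+	data, err := os.ReadFile(path)
+	if err != nil {
+		return "", err
+	}
+	content := string(data)
+	start := "Bloom filter (base64):\n"
+	end := "\n=== END BLOOM FILTER ==="
+	i := strings.Index(content, start)
+	if i < 0 {
+		return "", fmt.Errorf("start marker not found")
+	}
+	i += len(start)
+	j := strings.Index(content[i:], end)
+	if j < 0 {
+		return "", fmt.Errorf("end marker not found")
+	}
+	return strings.TrimSpace(content[i : i+j]), nil
+}
+
+func main() {
+	// bloom_filter.txt is the file stderr was redirected to in the examples
+	b64, err := extractFilter("bloom_filter.txt")
+	if err != nil {
+		fmt.Fprintln(os.Stderr, err)
+		os.Exit(1)
+	}
+	fmt.Println(b64)
+}
+```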
diff --git a/cmd/aggregator/main.go b/cmd/aggregator/main.go
index e101a95..bc160db 100644
--- a/cmd/aggregator/main.go
+++ b/cmd/aggregator/main.go
@@ -17,6 +17,7 @@ import (
 
 	"lol.mleku.dev/chk"
 	"lol.mleku.dev/log"
+	"next.orly.dev/pkg/crypto/p256k"
 	"next.orly.dev/pkg/crypto/sha256"
 	"next.orly.dev/pkg/encoders/bech32encoding"
 	"next.orly.dev/pkg/encoders/event"
@@ -25,6 +26,7 @@ import (
 	"next.orly.dev/pkg/encoders/kind"
 	"next.orly.dev/pkg/encoders/tag"
 	"next.orly.dev/pkg/encoders/timestamp"
+	"next.orly.dev/pkg/interfaces/signer"
 	"next.orly.dev/pkg/protocol/ws"
 )
 
@@ -40,6 +42,11 @@ const (
 	maxRetryDelay = 60 * time.Second
 	maxRetries    = 5
 	batchSize     = time.Hour * 24 * 7 // 1 week batches
+
+	// Timeout parameters
+	maxRunTime           = 30 * time.Minute // Maximum total runtime
+	relayTimeout         = 5 * time.Minute  // Timeout per relay
+	stuckProgressTimeout = 2 * time.Minute  // Timeout if no progress is made
 )
 
 var relays = []string{
@@ -297,13 +304,57 @@ type Aggregator struct {
 	relayStatesMutex  sync.RWMutex
 	completionTracker *CompletionTracker
 	timeWindows       []TimeWindow
+	// Track actual time range of processed events
+	actualSince *timestamp.T
+	actualUntil *timestamp.T
+	timeMutex   sync.RWMutex
+	// Bloom filter file for loading existing state
+	bloomFilterFile string
+	appendMode      bool
+	// Progress tracking for timeout detection
+	startTime        time.Time
+	lastProgress     int
+	lastProgressTime time.Time
+	progressMutex    sync.RWMutex
+	// Authentication support
+	signer        signer.I // Optional signer for relay authentication
+	hasPrivateKey bool     // Whether we have a private key for auth
 }
 
-func NewAggregator(npub string, since, until *timestamp.T) (agg *Aggregator, err error) {
-	// Decode npub to get pubkey bytes
+func NewAggregator(keyInput string, since, until *timestamp.T, bloomFilterFile string) (agg *Aggregator, err error) {
 	var pubkeyBytes []byte
-	if pubkeyBytes, err = bech32encoding.NpubToBytes(npub); chk.E(err) {
-		return nil, fmt.Errorf("failed to decode npub: %w", err)
+	var signer signer.I
+	var hasPrivateKey bool
+
+	// Determine if input is nsec (private key) or npub (public key)
+	if strings.HasPrefix(keyInput, "nsec") {
+		// Handle nsec (private key) - derive pubkey and enable authentication
+		var secretBytes []byte
+		if secretBytes, err = bech32encoding.NsecToBytes(keyInput); chk.E(err) {
+			return nil, fmt.Errorf("failed to decode nsec: %w", err)
+		}
+
+		// Create signer from private key
+		signer = &p256k.Signer{}
+		if err = signer.InitSec(secretBytes); chk.E(err) {
+			return nil, fmt.Errorf("failed to initialize signer: %w", err)
+		}
+
+		// Get public key from signer
+		pubkeyBytes = signer.Pub()
+		hasPrivateKey = true
+
+		log.I.F("using private key (nsec) - authentication enabled")
+	} else if strings.HasPrefix(keyInput, "npub") {
+		// Handle npub (public key only) - no authentication
+		if pubkeyBytes, err = bech32encoding.NpubToBytes(keyInput); chk.E(err) {
+			return nil, fmt.Errorf("failed to decode npub: %w", err)
+		}
+		hasPrivateKey = false
+
+		log.I.F("using public key (npub) - authentication disabled")
+	} else {
+		// Avoid slicing keyInput here: inputs shorter than four bytes would panic
+		return nil, fmt.Errorf("key input must start with 'nsec' or 'npub'")
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -314,10 +365,27 @@ func NewAggregator(npub string, since, until *timestamp.T) (agg *Aggregator, err
 		progressiveEnd = timestamp.Now()
 	}
 
+	// Initialize bloom filter - either new or loaded from file
+	var bloomFilter *BloomFilter
+	var appendMode bool
+
+	if bloomFilterFile != "" {
+		// Try to 
load existing bloom filter + if bloomFilter, err = loadBloomFilterFromFile(bloomFilterFile); err != nil { + log.W.F("failed to load bloom filter from %s: %v, creating new filter", bloomFilterFile, err) + bloomFilter = NewBloomFilter(bloomFilterBits, bloomFilterHashFuncs) + } else { + log.I.F("loaded existing bloom filter from %s", bloomFilterFile) + appendMode = true + } + } else { + bloomFilter = NewBloomFilter(bloomFilterBits, bloomFilterHashFuncs) + } + agg = &Aggregator{ - npub: npub, + npub: keyInput, pubkeyBytes: pubkeyBytes, - seenEvents: NewBloomFilter(bloomFilterBits, bloomFilterHashFuncs), + seenEvents: bloomFilter, seenRelays: make(map[string]bool), relayQueue: make(chan string, 100), ctx: ctx, @@ -329,6 +397,13 @@ func NewAggregator(npub string, since, until *timestamp.T) (agg *Aggregator, err eventCount: 0, relayStates: make(map[string]*RelayState), completionTracker: NewCompletionTracker(), + bloomFilterFile: bloomFilterFile, + appendMode: appendMode, + startTime: time.Now(), + lastProgress: 0, + lastProgressTime: time.Now(), + signer: signer, + hasPrivateKey: hasPrivateKey, } // Calculate time windows for progressive fetching @@ -342,6 +417,54 @@ func NewAggregator(npub string, since, until *timestamp.T) (agg *Aggregator, err return } +// loadBloomFilterFromFile loads a bloom filter from a file containing base64 encoded data +func loadBloomFilterFromFile(filename string) (*BloomFilter, error) { + data, err := os.ReadFile(filename) + if err != nil { + return nil, fmt.Errorf("failed to read file: %w", err) + } + + // Find the base64 data between the markers + content := string(data) + startMarker := "Bloom filter (base64):\n" + endMarker := "\n=== END BLOOM FILTER ===" + + startIdx := strings.Index(content, startMarker) + if startIdx == -1 { + return nil, fmt.Errorf("bloom filter start marker not found") + } + startIdx += len(startMarker) + + endIdx := strings.Index(content[startIdx:], endMarker) + if endIdx == -1 { + return nil, fmt.Errorf("bloom filter end marker not found") + } + + base64Data := strings.TrimSpace(content[startIdx : startIdx+endIdx]) + return FromBase64(base64Data) +} + +// updateActualTimeRange updates the actual time range of processed events +func (a *Aggregator) updateActualTimeRange(eventTime *timestamp.T) { + a.timeMutex.Lock() + defer a.timeMutex.Unlock() + + if a.actualSince == nil || eventTime.I64() < a.actualSince.I64() { + a.actualSince = eventTime + } + + if a.actualUntil == nil || eventTime.I64() > a.actualUntil.I64() { + a.actualUntil = eventTime + } +} + +// getActualTimeRange returns the actual time range of processed events +func (a *Aggregator) getActualTimeRange() (since, until *timestamp.T) { + a.timeMutex.RLock() + defer a.timeMutex.RUnlock() + return a.actualSince, a.actualUntil +} + // calculateTimeWindows pre-calculates all time windows for progressive fetching func (a *Aggregator) calculateTimeWindows() { if a.since == nil { @@ -420,6 +543,12 @@ func (a *Aggregator) markRelayRateLimited(relayURL string) { state.rateLimited = true state.retryCount++ + if state.retryCount >= maxRetries { + log.W.F("relay %s permanently failed after %d retries", relayURL, maxRetries) + state.completed = true // Mark as completed to exclude from future attempts + return + } + // Exponential backoff with jitter delay := time.Duration(float64(baseRetryDelay) * math.Pow(2, float64(state.retryCount-1))) if delay > maxRetryDelay { @@ -457,19 +586,43 @@ func (a *Aggregator) checkAllCompleted() bool { // Check if all relay-time window combinations are completed 
totalCombinations := len(allRelays) * len(a.timeWindows) completedCombinations := 0 + availableCombinations := 0 // Combinations from relays that haven't permanently failed for _, relayURL := range allRelays { + state := a.getOrCreateRelayState(relayURL) + state.mutex.RLock() + isRelayFailed := state.retryCount >= maxRetries + state.mutex.RUnlock() + for _, window := range a.timeWindows { windowKey := fmt.Sprintf("%d-%d", window.since.I64(), window.until.I64()) if a.completionTracker.IsCompleted(relayURL, windowKey) { completedCombinations++ } + + // Only count combinations from relays that haven't permanently failed + if !isRelayFailed { + availableCombinations++ + } } } + // Update progress tracking + a.progressMutex.Lock() + if completedCombinations > a.lastProgress { + a.lastProgress = completedCombinations + a.lastProgressTime = time.Now() + } + a.progressMutex.Unlock() + if totalCombinations > 0 { progress := float64(completedCombinations) / float64(totalCombinations) * 100 - log.I.F("completion progress: %d/%d (%.1f%%)", completedCombinations, totalCombinations, progress) + log.I.F("completion progress: %d/%d (%.1f%%) - available: %d", completedCombinations, totalCombinations, progress, availableCombinations) + + // Consider complete if we've finished all available combinations (excluding permanently failed relays) + if availableCombinations > 0 { + return completedCombinations >= availableCombinations + } return completedCombinations == totalCombinations } @@ -561,6 +714,19 @@ func (a *Aggregator) connectToRelay(relayURL string) { log.I.F("connected to relay: %s", relayURL) + // Attempt authentication if we have a private key + if a.hasPrivateKey && a.signer != nil { + authCtx, authCancel := context.WithTimeout(a.ctx, 5*time.Second) + defer authCancel() + + if err = client.Auth(authCtx, a.signer); err != nil { + log.W.F("authentication failed for relay %s: %v", relayURL, err) + // Continue without authentication - some relays may not require it + } else { + log.I.F("successfully authenticated to relay: %s", relayURL) + } + } + // Perform progressive backward fetching a.progressiveFetch(client, relayURL) } @@ -666,14 +832,24 @@ func (a *Aggregator) fetchTimeWindow(client *ws.Client, relayURL string, window Until: window.until, } - // Subscribe to events using all filters + // Subscribe to events using all filters with a dedicated context and timeout + // Use a longer timeout to avoid premature cancellation by completion monitor + subCtx, subCancel := context.WithTimeout(context.Background(), 10*time.Minute) + var sub *ws.Subscription var err error - if sub, err = client.Subscribe(a.ctx, filter.NewS(f1, f2, f3)); chk.E(err) { + if sub, err = client.Subscribe(subCtx, filter.NewS(f1, f2, f3)); chk.E(err) { + subCancel() // Cancel context on error log.E.F("failed to subscribe to relay %s: %v", relayURL, err) return false } + // Ensure subscription is cleaned up when we're done + defer func() { + sub.Unsub() + subCancel() + }() + log.I.F("subscribed to batch from %s for pubkey %s (authored by, mentioning, and relay lists)", relayURL, a.npub) // Process events for this batch @@ -683,13 +859,14 @@ func (a *Aggregator) fetchTimeWindow(client *ws.Client, relayURL string, window for !batchComplete && !rateLimited { select { case <-a.ctx.Done(): - sub.Unsub() - log.I.F("context cancelled, stopping batch for relay %s", relayURL) + log.I.F("aggregator context cancelled, stopping batch for relay %s", relayURL) + return false + case <-subCtx.Done(): + log.W.F("subscription timeout for relay %s", 
relayURL) return false case ev := <-sub.Events: if ev == nil { log.I.F("event channel closed for relay %s", relayURL) - sub.Unsub() return false } @@ -703,6 +880,9 @@ func (a *Aggregator) fetchTimeWindow(client *ws.Client, relayURL string, window // Mark event as seen a.markEventSeen(eventID) + // Update actual time range + a.updateActualTimeRange(timestamp.FromUnix(ev.CreatedAt)) + // Process relay list events to discover new relays if ev.Kind == 10002 { a.processRelayListEvent(ev) @@ -757,7 +937,7 @@ func (a *Aggregator) isRateLimitMessage(message string) bool { } func (a *Aggregator) Start() (err error) { - log.I.F("starting aggregator for npub: %s", a.npub) + log.I.F("starting aggregator for key: %s", a.npub) log.I.F("pubkey bytes: %s", hex.Enc(a.pubkeyBytes)) log.I.F("bloom filter: %d bits (%.2fMB), %d hash functions, ~0.1%% false positive rate", bloomFilterBits, float64(a.seenEvents.MemoryUsage())/1024/1024, bloomFilterHashFuncs) @@ -809,9 +989,8 @@ func (a *Aggregator) completionMonitor() { case <-a.ctx.Done(): return case <-ticker.C: - if a.checkAllCompleted() { - log.I.F("all relay-time window combinations completed, terminating aggregator") - a.cancel() // This will trigger context cancellation + // Check for various termination conditions + if a.shouldTerminate() { return } @@ -821,6 +1000,38 @@ func (a *Aggregator) completionMonitor() { } } +// shouldTerminate checks various conditions that should cause the aggregator to terminate +func (a *Aggregator) shouldTerminate() bool { + now := time.Now() + + // Check if all work is completed + if a.checkAllCompleted() { + log.I.F("all relay-time window combinations completed, terminating aggregator") + a.cancel() + return true + } + + // Check for maximum runtime timeout + if now.Sub(a.startTime) > maxRunTime { + log.W.F("maximum runtime (%v) exceeded, terminating aggregator", maxRunTime) + a.cancel() + return true + } + + // Check for stuck progress timeout + a.progressMutex.RLock() + timeSinceProgress := now.Sub(a.lastProgressTime) + a.progressMutex.RUnlock() + + if timeSinceProgress > stuckProgressTimeout { + log.W.F("no progress made for %v, terminating aggregator", timeSinceProgress) + a.cancel() + return true + } + + return false +} + // retryRateLimitedRelays checks for rate-limited relays that can be retried func (a *Aggregator) retryRateLimitedRelays() { a.relayStatesMutex.RLock() @@ -890,6 +1101,9 @@ func (a *Aggregator) outputBloomFilter() { estimatedEvents := a.seenEvents.EstimatedItems() memoryUsage := float64(a.seenEvents.MemoryUsage()) / 1024 / 1024 + // Get actual time range of processed events + actualSince, actualUntil := a.getActualTimeRange() + // Output to stderr so it doesn't interfere with JSONL event output to stdout fmt.Fprintf(os.Stderr, "\n=== BLOOM FILTER SUMMARY ===\n") fmt.Fprintf(os.Stderr, "Events processed: %d\n", a.eventCount) @@ -897,6 +1111,23 @@ func (a *Aggregator) outputBloomFilter() { fmt.Fprintf(os.Stderr, "Bloom filter size: %.2f MB\n", memoryUsage) fmt.Fprintf(os.Stderr, "False positive rate: ~0.1%%\n") fmt.Fprintf(os.Stderr, "Hash functions: %d\n", bloomFilterHashFuncs) + + // Output time range information + if actualSince != nil && actualUntil != nil { + fmt.Fprintf(os.Stderr, "Time range covered: %d to %d\n", actualSince.I64(), actualUntil.I64()) + fmt.Fprintf(os.Stderr, "Time range (human): %s to %s\n", + time.Unix(actualSince.I64(), 0).UTC().Format(time.RFC3339), + time.Unix(actualUntil.I64(), 0).UTC().Format(time.RFC3339)) + } else if a.since != nil && a.until != nil { + // Fallback to 
requested range if no events were processed
+		fmt.Fprintf(os.Stderr, "Requested time range: %d to %d\n", a.since.I64(), a.until.I64())
+		fmt.Fprintf(os.Stderr, "Requested range (human): %s to %s\n",
+			time.Unix(a.since.I64(), 0).UTC().Format(time.RFC3339),
+			time.Unix(a.until.I64(), 0).UTC().Format(time.RFC3339))
+	} else {
+		fmt.Fprintf(os.Stderr, "Time range: unbounded\n")
+	}
+
 	fmt.Fprintf(os.Stderr, "\nBloom filter (base64):\n%s\n", base64Filter)
 	fmt.Fprintf(os.Stderr, "=== END BLOOM FILTER ===\n")
 }
@@ -958,19 +1189,28 @@ func parseTimestamp(s string) (ts *timestamp.T, err error) {
 }
 
 func main() {
-	var npub string
+	var keyInput string
 	var sinceStr string
 	var untilStr string
+	var bloomFilterFile string
+	var outputFile string
 
-	flag.StringVar(&npub, "npub", "", "npub (bech32-encoded public key) to search for events")
+	flag.StringVar(&keyInput, "key", "", "nsec (private key) or npub (public key) to search for events")
 	flag.StringVar(&sinceStr, "since", "", "start timestamp (Unix timestamp) - only events after this time")
 	flag.StringVar(&untilStr, "until", "", "end timestamp (Unix timestamp) - only events before this time")
+	flag.StringVar(&bloomFilterFile, "filter", "", "file containing base64 encoded bloom filter to exclude already seen events")
+	flag.StringVar(&outputFile, "output", "", "output file for events (default: stdout)")
 	flag.Parse()
 
-	if npub == "" {
-		fmt.Fprintf(os.Stderr, "Usage: %s -npub <npub> [-since <timestamp>] [-until <timestamp>]\n", os.Args[0])
-		fmt.Fprintf(os.Stderr, "Example: %s -npub npub1... -since 1640995200 -until 1672531200\n", os.Args[0])
+	if keyInput == "" {
+		fmt.Fprintf(os.Stderr, "Usage: %s -key <nsec|npub> [-since <timestamp>] [-until <timestamp>] [-filter <filter-file>] [-output <output-file>]\n", os.Args[0])
+		fmt.Fprintf(os.Stderr, "Example: %s -key npub1... -since 1640995200 -until 1672531200 -filter bloom.txt -output events.jsonl\n", os.Args[0])
+		fmt.Fprintf(os.Stderr, "Example: %s -key nsec1... -since 1640995200 -until 1672531200 -output events.jsonl\n", os.Args[0])
+		fmt.Fprintf(os.Stderr, "\nKey types:\n")
+		fmt.Fprintf(os.Stderr, "  nsec: Private key (enables authentication to relays that require it)\n")
+		fmt.Fprintf(os.Stderr, "  npub: Public key (authentication disabled)\n")
 		fmt.Fprintf(os.Stderr, "\nTimestamps should be Unix timestamps (seconds since epoch)\n")
+		fmt.Fprintf(os.Stderr, "If -filter is provided, output will be appended to the output file\n")
 		os.Exit(1)
 	}
 
@@ -993,8 +1233,28 @@ func main() {
 		os.Exit(1)
 	}
 
+	// Set up output redirection if needed
+	if outputFile != "" {
+		var file *os.File
+		if bloomFilterFile != "" {
+			// Append mode if bloom filter is provided
+			file, err = os.OpenFile(outputFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
+		} else {
+			// Truncate mode if no bloom filter
+			file, err = os.OpenFile(outputFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
+		}
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "Error opening output file: %v\n", err)
+			os.Exit(1)
+		}
+		defer file.Close()
+
+		// Redirect stdout to file
+		os.Stdout = file
+	}
+
 	var agg *Aggregator
-	if agg, err = NewAggregator(npub, since, until); chk.E(err) {
+	if agg, err = NewAggregator(keyInput, since, until, bloomFilterFile); chk.E(err) {
 		fmt.Fprintf(os.Stderr, "Error creating aggregator: %v\n", err)
 		os.Exit(1)
 	}
diff --git a/pkg/version/version b/pkg/version/version
index 26b3c0e..67bca01 100644
--- a/pkg/version/version
+++ b/pkg/version/version
@@ -1 +1 @@
-v0.17.15
\ No newline at end of file
+v0.17.16
\ No newline at end of file