Enhance aggregator functionality for Nostr event collection

- Updated the aggregator to support both public (npub) and private (nsec) key inputs for event searching, enabling authentication for relays that require it.
- Implemented bloom filter loading and appending capabilities for efficient incremental data collection.
- Added timeout parameters for maximum runtime and stuck progress detection to improve reliability.
- Enhanced README with detailed usage instructions, authentication behavior, and examples for incremental collection.
- Bumped version to v0.17.16.
2025-10-23 13:00:01 +01:00
parent 4c53709e2d
commit da1119db7c
3 changed files with 460 additions and 30 deletions

View File

@@ -5,45 +5,129 @@ A comprehensive program that searches for all events related to a specific npub
## Usage
```bash
go run main.go -key <nsec|npub> [-since <timestamp>] [-until <timestamp>] [-filter <file>] [-output <file>]
```
Where:
- `<nsec|npub>` is either a bech32-encoded Nostr private key (nsec1...) or public key (npub1...)
- `<timestamp>` is a Unix timestamp (seconds since epoch) - optional
- `<file>` is a file path for bloom filter input/output - optional
### Parameters
- **`-key`**: Required. The bech32-encoded Nostr key to search for events
- **nsec**: Private key (enables authentication to relays that require it)
- **npub**: Public key (authentication disabled)
- **`-since`**: Optional. Start timestamp (Unix seconds). Only events after this time
- **`-until`**: Optional. End timestamp (Unix seconds). Only events before this time
- **`-filter`**: Optional. File containing base64-encoded bloom filter from previous runs
- **`-output`**: Optional. Output file for events (default: stdout)
### Authentication
When using an **nsec** (private key), the aggregator will:
- Derive the public key from the private key for event searching
- Attempt to authenticate to relays that require it (NIP-42)
- Continue working even if authentication fails on some relays
- Log authentication success/failure for each relay
When using an **npub** (public key), the aggregator will:
- Search for events using the provided public key
- Skip authentication (no private key available)
- Work with public relays that don't require authentication
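Condensed from the `NewAggregator` changes in this commit, the detection boils down to a prefix check; `resolveKey` below is an illustrative wrapper, not a function in the repo:
```go
package main

import (
	"fmt"
	"strings"

	"next.orly.dev/pkg/crypto/p256k"
	"next.orly.dev/pkg/encoders/bech32encoding"
	"next.orly.dev/pkg/interfaces/signer"
)

// resolveKey mirrors the key handling in NewAggregator: an nsec yields a
// signer (NIP-42 auth enabled), an npub yields only the pubkey.
func resolveKey(keyInput string) (pubkey []byte, sign signer.I, canAuth bool, err error) {
	switch {
	case strings.HasPrefix(keyInput, "nsec"):
		var sec []byte
		if sec, err = bech32encoding.NsecToBytes(keyInput); err != nil {
			return
		}
		s := &p256k.Signer{}
		if err = s.InitSec(sec); err != nil {
			return
		}
		return s.Pub(), s, true, nil // pubkey derived from the private key
	case strings.HasPrefix(keyInput, "npub"):
		pubkey, err = bech32encoding.NpubToBytes(keyInput)
		return pubkey, nil, false, err // no signer, auth disabled
	default:
		return nil, nil, false, fmt.Errorf("key input must start with 'nsec' or 'npub'")
	}
}
```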
### Behavior
- **Without `-filter`**: Creates new bloom filter, outputs to stdout or truncates output file
- **With `-filter`**: Loads existing bloom filter, automatically appends to output file
- **Bloom filter output**: Always written to stderr with timestamp information and base64 data
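In `main.go` this behavior reduces to a choice of open flags on the output file; a condensed sketch (`openOutput` is an illustrative name):
```go
import "os"

// openOutput condenses the output handling in main(): resume runs append,
// fresh runs truncate.
func openOutput(outputFile, bloomFilterFile string) (*os.File, error) {
	flags := os.O_CREATE | os.O_WRONLY | os.O_TRUNC // no filter: start fresh
	if bloomFilterFile != "" {
		flags = os.O_CREATE | os.O_WRONLY | os.O_APPEND // filter given: append
	}
	return os.OpenFile(outputFile, flags, 0644)
}
```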
## Examples
### Basic Usage
```bash
# Get all events related to a user using public key (no authentication)
go run main.go -key npub1234567890abcdef...
# Get all events related to a user using private key (with authentication)
go run main.go -key nsec1234567890abcdef...
# Get events related to a user since January 1, 2022
go run main.go -key npub1234567890abcdef... -since 1640995200
# Get events related to a user between two dates
go run main.go -key npub1234567890abcdef... -since 1640995200 -until 1672531200
# Get events related to a user until December 31, 2022
go run main.go -key npub1234567890abcdef... -until 1672531200
```
### Incremental Collection with Bloom Filter
```bash
# First run: Collect initial events and save bloom filter (using npub)
go run main.go -key npub1234567890abcdef... -since 1640995200 -until 1672531200 -output events.jsonl 2>bloom_filter.txt
# Second run: Continue from where we left off, append new events (using nsec for auth)
go run main.go -key nsec1234567890abcdef... -since 1672531200 -until 1704067200 -filter bloom_filter.txt -output events.jsonl 2>bloom_filter_updated.txt
# Third run: Collect even more recent events
go run main.go -key nsec1234567890abcdef... -since 1704067200 -filter bloom_filter_updated.txt -output events.jsonl 2>bloom_filter_final.txt
```
### Output Redirection
```bash
# Events to file, bloom filter to stderr (visible in terminal)
go run main.go -key npub1... -output events.jsonl
# Events to file, bloom filter to separate file
go run main.go -key npub1... -output events.jsonl 2>bloom_filter.txt
# Events to stdout, bloom filter to file (useful for piping events)
go run main.go -key npub1... 2>bloom_filter.txt | jq .
# Using nsec for authentication to access private relays
go run main.go -key nsec1... -output events.jsonl 2>bloom_filter.txt
```
## Features
### Core Functionality
- **Comprehensive event discovery**: Finds both events authored by the user and events that mention the user
- **Dynamic relay discovery**: Automatically discovers and connects to new relays from relay list events (kind 10002)
- **Progressive backward fetching**: Systematically collects historical data in time-based batches
- **Triple filter approach**: Uses separate filters for authored events, p-tag mentions, and relay list events
- **Intelligent time management**: Works backwards from current time (or until timestamp) to since timestamp
### Authentication & Access
- **Private key support**: Use nsec keys to authenticate to relays that require it (NIP-42)
- **Public key compatibility**: Continue to work with npub keys for public relay access
- **Graceful fallback**: Continue operation even if authentication fails on some relays
- **Auth-required relay access**: Access private notes and restricted content on authenticated relays
- **Flexible key input**: Automatically detects and handles both nsec and npub key formats
### Memory Management
- **Memory-efficient deduplication**: Uses bloom filter with ~0.1% false positive rate instead of unbounded maps
- **Fixed memory footprint**: Bloom filter uses only ~1.75MB for 1M events with controlled memory growth
- **Memory monitoring**: Real-time memory usage tracking and automatic garbage collection
- **Persistent deduplication**: Bloom filter can be saved and reused across multiple runs
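These figures follow from the standard bloom filter sizing formulas: for n items at false positive rate p, the optimal size is m = -n·ln(p)/(ln 2)² bits with k = (m/n)·ln 2 hash functions. For n = 1,000,000 and p = 0.001 that gives roughly 14.4 bits per event (about 1.75 MB in total) and k ≈ 10, matching the parameters above.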
### Incremental Collection
- **Bloom filter persistence**: Save deduplication state between runs for efficient incremental collection
- **Automatic append mode**: When loading existing bloom filter, automatically appends to output file
- **Timestamp tracking**: Records actual time range of processed events in bloom filter output
- **Seamless continuation**: Resume collection from where previous run left off without duplicates
### Reliability & Performance
- Connects to multiple relays simultaneously with dynamic expansion
- Outputs events in JSONL format (one JSON object per line)
- Handles connection failures gracefully
- Continues running until all relay connections are closed
- Time-based filtering with Unix timestamps (since/until parameters)
- Input validation for timestamp ranges
- Rate limiting and backoff for relay connection management
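The backoff policy is condensed below from `markRelayRateLimited` (jitter elided; `retryDelay` is an illustrative wrapper, and `baseRetryDelay`'s value is assumed for this sketch):
```go
import (
	"math"
	"time"
)

const (
	baseRetryDelay = time.Second      // assumed for this sketch; defined in main.go
	maxRetryDelay  = 60 * time.Second // from main.go
)

// retryDelay doubles the wait on each retry and caps it at maxRetryDelay;
// after maxRetries failures the relay is excluded from further attempts.
func retryDelay(retryCount int) time.Duration {
	d := time.Duration(float64(baseRetryDelay) * math.Pow(2, float64(retryCount-1)))
	if d > maxRetryDelay {
		d = maxRetryDelay
	}
	return d
}
```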
## Event Discovery
@@ -70,6 +154,61 @@ The aggregator uses an intelligent progressive backward fetching strategy:
4. **Efficient processing**: Processes each time batch completely before moving to the next
5. **Boundary respect**: Stops when reaching the since timestamp or beginning of available data
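A sketch of the batching this implies, using the `batchSize` constant (one week) from `main.go`; the real `calculateTimeWindows` works with `timestamp.T` values rather than raw Unix seconds:
```go
import "time"

const batchSize = time.Hour * 24 * 7 // 1 week batches, as in main.go

type window struct{ since, until int64 } // Unix seconds (sketch type)

// backwardWindows walks from until back to since in batchSize steps.
func backwardWindows(since, until int64) (ws []window) {
	step := int64(batchSize / time.Second)
	for end := until; end > since; end -= step {
		start := end - step
		if start < since {
			start = since // clamp the oldest window to the since boundary
		}
		ws = append(ws, window{since: start, until: end})
	}
	return
}
```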
## Incremental Collection Workflow
The aggregator supports efficient incremental data collection using persistent bloom filters. This allows you to build comprehensive event archives over time without re-processing duplicate events.
### How It Works
1. **First Run**: Creates a new bloom filter and collects events for the specified time range
2. **Bloom Filter Output**: At completion, outputs bloom filter summary to stderr with:
- Event statistics (processed count, estimated unique events)
- Time range covered (actual timestamps of collected events)
- Base64-encoded bloom filter data for reuse
3. **Subsequent Runs**: Load the saved bloom filter to skip already-seen events
4. **Automatic Append**: When using an existing filter, new events are appended to the output file
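The load-or-create decision, condensed from `NewAggregator` in this commit; `loadOrCreateFilter` is an illustrative wrapper around the repo's `loadBloomFilterFromFile`, `NewBloomFilter`, and the `bloomFilterBits`/`bloomFilterHashFuncs` constants:
```go
// loadOrCreateFilter reuses the saved filter when possible; a failed load
// falls back to a fresh filter instead of aborting.
func loadOrCreateFilter(bloomFilterFile string) (seen *BloomFilter, appendMode bool) {
	if bloomFilterFile == "" {
		return NewBloomFilter(bloomFilterBits, bloomFilterHashFuncs), false // first run
	}
	loaded, err := loadBloomFilterFromFile(bloomFilterFile)
	if err != nil {
		// Load failed: continue with a fresh filter, no append.
		return NewBloomFilter(bloomFilterBits, bloomFilterHashFuncs), false
	}
	return loaded, true // dedup state restored, so append to the output file
}
```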
### Bloom Filter Output Format
The bloom filter output includes comprehensive metadata:
```
=== BLOOM FILTER SUMMARY ===
Events processed: 1247
Estimated unique events: 1247
Bloom filter size: 1.75 MB
False positive rate: ~0.1%
Hash functions: 10
Time range covered: 1640995200 to 1672531200
Time range (human): 2022-01-01T00:00:00Z to 2023-01-01T00:00:00Z
Bloom filter (base64):
[base64-encoded binary data]
=== END BLOOM FILTER ===
```
### Best Practices
- **Save bloom filters**: Always redirect stderr to a file to preserve the bloom filter
- **Sequential time ranges**: Use non-overlapping time ranges for optimal efficiency
- **Regular updates**: Update your bloom filter file after each run for the latest state
- **Backup filters**: Keep copies of bloom filter files for different time periods
### Example Workflow
```bash
# Month 1: January 2022 (using npub for public relays)
go run main.go -key npub1... -since 1640995200 -until 1643673600 -output jan2022.jsonl 2>filter_jan.txt
# Month 2: February 2022 (using nsec for auth-required relays, append to same file)
go run main.go -key nsec1... -since 1643673600 -until 1646092800 -filter filter_jan.txt -output all_events.jsonl 2>filter_feb.txt
# Month 3: March 2022 (continue with authentication for complete coverage)
go run main.go -key nsec1... -since 1646092800 -until 1648771200 -filter filter_feb.txt -output all_events.jsonl 2>filter_mar.txt
# Result: all_events.jsonl contains deduplicated events from all three months, including private relay content
```
## Memory Management
The aggregator uses advanced memory management techniques to handle large-scale data collection:
@@ -108,6 +247,8 @@ The program starts with the following initial relays:
## Output Format
### Event Output (stdout or -output file)
Each line of output is a JSON object representing a Nostr event with the following fields:
- `id`: Event ID (hex)
@@ -117,3 +258,32 @@ Each line of output is a JSON object representing a Nostr event with the followi
- `tags`: Array of tag arrays
- `content`: Event content string
- `sig`: Event signature (hex)
### Bloom Filter Output (stderr)
At program completion, a comprehensive bloom filter summary is written to stderr containing:
- **Statistics**: Event counts, memory usage, performance metrics
- **Time Range**: Actual timestamp range of collected events (both Unix and human-readable)
- **Configuration**: Bloom filter parameters (size, hash functions, false positive rate)
- **Binary Data**: Base64-encoded bloom filter for reuse in subsequent runs
The bloom filter output is structured with clear markers (`=== BLOOM FILTER SUMMARY ===` and `=== END BLOOM FILTER ===`) making it easy to parse and extract the base64 data programmatically.
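The aggregator's own `loadBloomFilterFromFile` does exactly this; a condensed, self-contained version (`extractBase64` is an illustrative name):
```go
import (
	"fmt"
	"strings"
)

// extractBase64 pulls the base64 payload out of a saved summary file.
func extractBase64(content string) (string, error) {
	const startMarker = "Bloom filter (base64):\n"
	const endMarker = "\n=== END BLOOM FILTER ==="
	i := strings.Index(content, startMarker)
	if i == -1 {
		return "", fmt.Errorf("bloom filter start marker not found")
	}
	i += len(startMarker)
	j := strings.Index(content[i:], endMarker)
	if j == -1 {
		return "", fmt.Errorf("bloom filter end marker not found")
	}
	return strings.TrimSpace(content[i : i+j]), nil
}
```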
### Output Separation
- **Events**: Always go to stdout (default) or the file specified by `-output`
- **Bloom Filter**: Always goes to stderr, allowing separate redirection
- **Logs**: Runtime information and progress updates go to stderr
This separation allows flexible output handling:
```bash
# Events to file, bloom filter visible in terminal
./aggregator -key npub1... -output events.jsonl
# Both events and bloom filter to separate files
./aggregator -key npub1... -output events.jsonl 2>bloom_filter.txt
# Events piped to another program, bloom filter saved
./aggregator -key npub1... 2>bloom_filter.txt | jq '.content'
```

View File

@@ -17,6 +17,7 @@ import (
"lol.mleku.dev/chk" "lol.mleku.dev/chk"
"lol.mleku.dev/log" "lol.mleku.dev/log"
"next.orly.dev/pkg/crypto/p256k"
"next.orly.dev/pkg/crypto/sha256" "next.orly.dev/pkg/crypto/sha256"
"next.orly.dev/pkg/encoders/bech32encoding" "next.orly.dev/pkg/encoders/bech32encoding"
"next.orly.dev/pkg/encoders/event" "next.orly.dev/pkg/encoders/event"
@@ -25,6 +26,7 @@ import (
"next.orly.dev/pkg/encoders/kind" "next.orly.dev/pkg/encoders/kind"
"next.orly.dev/pkg/encoders/tag" "next.orly.dev/pkg/encoders/tag"
"next.orly.dev/pkg/encoders/timestamp" "next.orly.dev/pkg/encoders/timestamp"
"next.orly.dev/pkg/interfaces/signer"
"next.orly.dev/pkg/protocol/ws" "next.orly.dev/pkg/protocol/ws"
) )
@@ -40,6 +42,11 @@ const (
maxRetryDelay = 60 * time.Second
maxRetries = 5
batchSize = time.Hour * 24 * 7 // 1 week batches
// Timeout parameters
maxRunTime = 30 * time.Minute // Maximum total runtime
relayTimeout = 5 * time.Minute // Timeout per relay
stuckProgressTimeout = 2 * time.Minute // Timeout if no progress is made
)
var relays = []string{
@@ -297,13 +304,57 @@ type Aggregator struct {
relayStatesMutex sync.RWMutex
completionTracker *CompletionTracker
timeWindows []TimeWindow
// Track actual time range of processed events
actualSince *timestamp.T
actualUntil *timestamp.T
timeMutex sync.RWMutex
// Bloom filter file for loading existing state
bloomFilterFile string
appendMode bool
// Progress tracking for timeout detection
startTime time.Time
lastProgress int
lastProgressTime time.Time
progressMutex sync.RWMutex
// Authentication support
signer signer.I // Optional signer for relay authentication
hasPrivateKey bool // Whether we have a private key for auth
}
func NewAggregator(keyInput string, since, until *timestamp.T, bloomFilterFile string) (agg *Aggregator, err error) {
var pubkeyBytes []byte
var signer signer.I
var hasPrivateKey bool
// Determine if input is nsec (private key) or npub (public key)
if strings.HasPrefix(keyInput, "nsec") {
// Handle nsec (private key) - derive pubkey and enable authentication
var secretBytes []byte
if secretBytes, err = bech32encoding.NsecToBytes(keyInput); chk.E(err) {
return nil, fmt.Errorf("failed to decode nsec: %w", err)
}
// Create signer from private key
signer = &p256k.Signer{}
if err = signer.InitSec(secretBytes); chk.E(err) {
return nil, fmt.Errorf("failed to initialize signer: %w", err)
}
// Get public key from signer
pubkeyBytes = signer.Pub()
hasPrivateKey = true
log.I.F("using private key (nsec) - authentication enabled")
} else if strings.HasPrefix(keyInput, "npub") {
// Handle npub (public key only) - no authentication
if pubkeyBytes, err = bech32encoding.NpubToBytes(keyInput); chk.E(err) {
return nil, fmt.Errorf("failed to decode npub: %w", err)
}
hasPrivateKey = false
log.I.F("using public key (npub) - authentication disabled")
} else {
return nil, fmt.Errorf("key input must start with 'nsec' or 'npub', got: %q", keyInput)
}
ctx, cancel := context.WithCancel(context.Background())
@@ -314,10 +365,27 @@ func NewAggregator(npub string, since, until *timestamp.T) (agg *Aggregator, err
progressiveEnd = timestamp.Now()
}
// Initialize bloom filter - either new or loaded from file
var bloomFilter *BloomFilter
var appendMode bool
if bloomFilterFile != "" {
// Try to load existing bloom filter
if bloomFilter, err = loadBloomFilterFromFile(bloomFilterFile); err != nil {
log.W.F("failed to load bloom filter from %s: %v, creating new filter", bloomFilterFile, err)
bloomFilter = NewBloomFilter(bloomFilterBits, bloomFilterHashFuncs)
} else {
log.I.F("loaded existing bloom filter from %s", bloomFilterFile)
appendMode = true
}
} else {
bloomFilter = NewBloomFilter(bloomFilterBits, bloomFilterHashFuncs)
}
agg = &Aggregator{
npub: keyInput,
pubkeyBytes: pubkeyBytes,
seenEvents: bloomFilter,
seenRelays: make(map[string]bool),
relayQueue: make(chan string, 100),
ctx: ctx,
@@ -329,6 +397,13 @@ func NewAggregator(npub string, since, until *timestamp.T) (agg *Aggregator, err
eventCount: 0,
relayStates: make(map[string]*RelayState),
completionTracker: NewCompletionTracker(),
bloomFilterFile: bloomFilterFile,
appendMode: appendMode,
startTime: time.Now(),
lastProgress: 0,
lastProgressTime: time.Now(),
signer: signer,
hasPrivateKey: hasPrivateKey,
}
// Calculate time windows for progressive fetching
@@ -342,6 +417,54 @@ func NewAggregator(npub string, since, until *timestamp.T) (agg *Aggregator, err
return
}
// loadBloomFilterFromFile loads a bloom filter from a file containing base64 encoded data
func loadBloomFilterFromFile(filename string) (*BloomFilter, error) {
data, err := os.ReadFile(filename)
if err != nil {
return nil, fmt.Errorf("failed to read file: %w", err)
}
// Find the base64 data between the markers
content := string(data)
startMarker := "Bloom filter (base64):\n"
endMarker := "\n=== END BLOOM FILTER ==="
startIdx := strings.Index(content, startMarker)
if startIdx == -1 {
return nil, fmt.Errorf("bloom filter start marker not found")
}
startIdx += len(startMarker)
endIdx := strings.Index(content[startIdx:], endMarker)
if endIdx == -1 {
return nil, fmt.Errorf("bloom filter end marker not found")
}
base64Data := strings.TrimSpace(content[startIdx : startIdx+endIdx])
return FromBase64(base64Data)
}
// updateActualTimeRange updates the actual time range of processed events
func (a *Aggregator) updateActualTimeRange(eventTime *timestamp.T) {
a.timeMutex.Lock()
defer a.timeMutex.Unlock()
if a.actualSince == nil || eventTime.I64() < a.actualSince.I64() {
a.actualSince = eventTime
}
if a.actualUntil == nil || eventTime.I64() > a.actualUntil.I64() {
a.actualUntil = eventTime
}
}
// getActualTimeRange returns the actual time range of processed events
func (a *Aggregator) getActualTimeRange() (since, until *timestamp.T) {
a.timeMutex.RLock()
defer a.timeMutex.RUnlock()
return a.actualSince, a.actualUntil
}
// calculateTimeWindows pre-calculates all time windows for progressive fetching
func (a *Aggregator) calculateTimeWindows() {
if a.since == nil {
@@ -420,6 +543,12 @@ func (a *Aggregator) markRelayRateLimited(relayURL string) {
state.rateLimited = true
state.retryCount++
if state.retryCount >= maxRetries {
log.W.F("relay %s permanently failed after %d retries", relayURL, maxRetries)
state.completed = true // Mark as completed to exclude from future attempts
return
}
// Exponential backoff with jitter
delay := time.Duration(float64(baseRetryDelay) * math.Pow(2, float64(state.retryCount-1)))
if delay > maxRetryDelay {
@@ -457,19 +586,43 @@ func (a *Aggregator) checkAllCompleted() bool {
// Check if all relay-time window combinations are completed
totalCombinations := len(allRelays) * len(a.timeWindows)
completedCombinations := 0
availableCombinations := 0 // Combinations from relays that haven't permanently failed
for _, relayURL := range allRelays {
state := a.getOrCreateRelayState(relayURL)
state.mutex.RLock()
isRelayFailed := state.retryCount >= maxRetries
state.mutex.RUnlock()
for _, window := range a.timeWindows {
windowKey := fmt.Sprintf("%d-%d", window.since.I64(), window.until.I64())
if a.completionTracker.IsCompleted(relayURL, windowKey) {
completedCombinations++
}
// Only count combinations from relays that haven't permanently failed
if !isRelayFailed {
availableCombinations++
}
}
}
// Update progress tracking
a.progressMutex.Lock()
if completedCombinations > a.lastProgress {
a.lastProgress = completedCombinations
a.lastProgressTime = time.Now()
}
a.progressMutex.Unlock()
if totalCombinations > 0 {
progress := float64(completedCombinations) / float64(totalCombinations) * 100
log.I.F("completion progress: %d/%d (%.1f%%) - available: %d", completedCombinations, totalCombinations, progress, availableCombinations)
// Consider complete if we've finished all available combinations (excluding permanently failed relays)
if availableCombinations > 0 {
return completedCombinations >= availableCombinations
}
return completedCombinations == totalCombinations
}
@@ -561,6 +714,19 @@ func (a *Aggregator) connectToRelay(relayURL string) {
log.I.F("connected to relay: %s", relayURL)
// Attempt authentication if we have a private key
if a.hasPrivateKey && a.signer != nil {
authCtx, authCancel := context.WithTimeout(a.ctx, 5*time.Second)
defer authCancel()
if err = client.Auth(authCtx, a.signer); err != nil {
log.W.F("authentication failed for relay %s: %v", relayURL, err)
// Continue without authentication - some relays may not require it
} else {
log.I.F("successfully authenticated to relay: %s", relayURL)
}
}
// Perform progressive backward fetching
a.progressiveFetch(client, relayURL)
}
@@ -666,14 +832,24 @@ func (a *Aggregator) fetchTimeWindow(client *ws.Client, relayURL string, window
Until: window.until,
}
// Subscribe to events using all filters with a dedicated context and timeout
// Use a longer timeout to avoid premature cancellation by completion monitor
subCtx, subCancel := context.WithTimeout(context.Background(), 10*time.Minute)
var sub *ws.Subscription
var err error
if sub, err = client.Subscribe(subCtx, filter.NewS(f1, f2, f3)); chk.E(err) {
subCancel() // Cancel context on error
log.E.F("failed to subscribe to relay %s: %v", relayURL, err)
return false
}
// Ensure subscription is cleaned up when we're done
defer func() {
sub.Unsub()
subCancel()
}()
log.I.F("subscribed to batch from %s for pubkey %s (authored by, mentioning, and relay lists)", relayURL, a.npub)
// Process events for this batch
@@ -683,13 +859,14 @@ func (a *Aggregator) fetchTimeWindow(client *ws.Client, relayURL string, window
for !batchComplete && !rateLimited {
select {
case <-a.ctx.Done():
log.I.F("aggregator context cancelled, stopping batch for relay %s", relayURL)
return false
case <-subCtx.Done():
log.W.F("subscription timeout for relay %s", relayURL)
return false return false
case ev := <-sub.Events:
if ev == nil {
log.I.F("event channel closed for relay %s", relayURL)
return false
}
@@ -703,6 +880,9 @@ func (a *Aggregator) fetchTimeWindow(client *ws.Client, relayURL string, window
// Mark event as seen
a.markEventSeen(eventID)
// Update actual time range
a.updateActualTimeRange(timestamp.FromUnix(ev.CreatedAt))
// Process relay list events to discover new relays
if ev.Kind == 10002 {
a.processRelayListEvent(ev)
@@ -757,7 +937,7 @@ func (a *Aggregator) isRateLimitMessage(message string) bool {
}
func (a *Aggregator) Start() (err error) {
log.I.F("starting aggregator for key: %s", a.npub)
log.I.F("pubkey bytes: %s", hex.Enc(a.pubkeyBytes))
log.I.F("bloom filter: %d bits (%.2fMB), %d hash functions, ~0.1%% false positive rate",
bloomFilterBits, float64(a.seenEvents.MemoryUsage())/1024/1024, bloomFilterHashFuncs)
@@ -809,9 +989,8 @@ func (a *Aggregator) completionMonitor() {
case <-a.ctx.Done():
return
case <-ticker.C:
// Check for various termination conditions
if a.shouldTerminate() {
// This will trigger context cancellation
return
}
@@ -821,6 +1000,38 @@ func (a *Aggregator) completionMonitor() {
}
}
// shouldTerminate checks various conditions that should cause the aggregator to terminate
func (a *Aggregator) shouldTerminate() bool {
now := time.Now()
// Check if all work is completed
if a.checkAllCompleted() {
log.I.F("all relay-time window combinations completed, terminating aggregator")
a.cancel()
return true
}
// Check for maximum runtime timeout
if now.Sub(a.startTime) > maxRunTime {
log.W.F("maximum runtime (%v) exceeded, terminating aggregator", maxRunTime)
a.cancel()
return true
}
// Check for stuck progress timeout
a.progressMutex.RLock()
timeSinceProgress := now.Sub(a.lastProgressTime)
a.progressMutex.RUnlock()
if timeSinceProgress > stuckProgressTimeout {
log.W.F("no progress made for %v, terminating aggregator", timeSinceProgress)
a.cancel()
return true
}
return false
}
// retryRateLimitedRelays checks for rate-limited relays that can be retried
func (a *Aggregator) retryRateLimitedRelays() {
a.relayStatesMutex.RLock()
@@ -890,6 +1101,9 @@ func (a *Aggregator) outputBloomFilter() {
estimatedEvents := a.seenEvents.EstimatedItems()
memoryUsage := float64(a.seenEvents.MemoryUsage()) / 1024 / 1024
// Get actual time range of processed events
actualSince, actualUntil := a.getActualTimeRange()
// Output to stderr so it doesn't interfere with JSONL event output to stdout
fmt.Fprintf(os.Stderr, "\n=== BLOOM FILTER SUMMARY ===\n")
fmt.Fprintf(os.Stderr, "Events processed: %d\n", a.eventCount)
@@ -897,6 +1111,23 @@ func (a *Aggregator) outputBloomFilter() {
fmt.Fprintf(os.Stderr, "Bloom filter size: %.2f MB\n", memoryUsage)
fmt.Fprintf(os.Stderr, "False positive rate: ~0.1%%\n")
fmt.Fprintf(os.Stderr, "Hash functions: %d\n", bloomFilterHashFuncs)
// Output time range information
if actualSince != nil && actualUntil != nil {
fmt.Fprintf(os.Stderr, "Time range covered: %d to %d\n", actualSince.I64(), actualUntil.I64())
fmt.Fprintf(os.Stderr, "Time range (human): %s to %s\n",
time.Unix(actualSince.I64(), 0).UTC().Format(time.RFC3339),
time.Unix(actualUntil.I64(), 0).UTC().Format(time.RFC3339))
} else if a.since != nil && a.until != nil {
// Fallback to requested range if no events were processed
fmt.Fprintf(os.Stderr, "Requested time range: %d to %d\n", a.since.I64(), a.until.I64())
fmt.Fprintf(os.Stderr, "Requested range (human): %s to %s\n",
time.Unix(a.since.I64(), 0).UTC().Format(time.RFC3339),
time.Unix(a.until.I64(), 0).UTC().Format(time.RFC3339))
} else {
fmt.Fprintf(os.Stderr, "Time range: unbounded\n")
}
fmt.Fprintf(os.Stderr, "\nBloom filter (base64):\n%s\n", base64Filter)
fmt.Fprintf(os.Stderr, "=== END BLOOM FILTER ===\n")
}
@@ -958,19 +1189,28 @@ func parseTimestamp(s string) (ts *timestamp.T, err error) {
}
func main() {
var keyInput string
var sinceStr string
var untilStr string
var bloomFilterFile string
var outputFile string
flag.StringVar(&keyInput, "key", "", "nsec (private key) or npub (public key) to search for events")
flag.StringVar(&sinceStr, "since", "", "start timestamp (Unix timestamp) - only events after this time")
flag.StringVar(&untilStr, "until", "", "end timestamp (Unix timestamp) - only events before this time")
flag.StringVar(&bloomFilterFile, "filter", "", "file containing base64 encoded bloom filter to exclude already seen events")
flag.StringVar(&outputFile, "output", "", "output file for events (default: stdout)")
flag.Parse()
if keyInput == "" {
fmt.Fprintf(os.Stderr, "Usage: %s -key <nsec|npub> [-since <timestamp>] [-until <timestamp>] [-filter <file>] [-output <file>]\n", os.Args[0])
fmt.Fprintf(os.Stderr, "Example: %s -key npub1... -since 1640995200 -until 1672531200 -filter bloom.txt -output events.jsonl\n", os.Args[0])
fmt.Fprintf(os.Stderr, "Example: %s -key nsec1... -since 1640995200 -until 1672531200 -output events.jsonl\n", os.Args[0])
fmt.Fprintf(os.Stderr, "\nKey types:\n")
fmt.Fprintf(os.Stderr, " nsec: Private key (enables authentication to relays that require it)\n")
fmt.Fprintf(os.Stderr, " npub: Public key (authentication disabled)\n")
fmt.Fprintf(os.Stderr, "\nTimestamps should be Unix timestamps (seconds since epoch)\n")
fmt.Fprintf(os.Stderr, "If -filter is provided, output will be appended to the output file\n")
os.Exit(1)
}
@@ -993,8 +1233,28 @@ func main() {
os.Exit(1)
}
// Set up output redirection if needed
if outputFile != "" {
var file *os.File
if bloomFilterFile != "" {
// Append mode if bloom filter is provided
file, err = os.OpenFile(outputFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
} else {
// Truncate mode if no bloom filter
file, err = os.OpenFile(outputFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
}
if err != nil {
fmt.Fprintf(os.Stderr, "Error opening output file: %v\n", err)
os.Exit(1)
}
defer file.Close()
// Redirect stdout to file
os.Stdout = file
}
var agg *Aggregator
if agg, err = NewAggregator(keyInput, since, until, bloomFilterFile); chk.E(err) {
fmt.Fprintf(os.Stderr, "Error creating aggregator: %v\n", err)
os.Exit(1)
}

View File

@@ -1 +1 @@
v0.17.16