forked from mleku/next.orly.dev

Compare commits (3): da1119db7c, 4c53709e2d, a4fc3d8d9b

@@ -59,6 +59,9 @@ type C struct {
	// Sprocket settings
	SprocketEnabled bool `env:"ORLY_SPROCKET_ENABLED" default:"false" usage:"enable sprocket event processing plugin system"`

	// Spider settings
	SpiderMode string `env:"ORLY_SPIDER_MODE" default:"none" usage:"spider mode for syncing events: none, follows"`

	PolicyEnabled bool `env:"ORLY_POLICY_ENABLED" default:"false" usage:"enable policy-based event processing (configuration found in $HOME/.config/ORLY/policy.json)"`
}

@@ -3,6 +3,7 @@ package app
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
"unicode"
|
||||
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/log"
|
||||
@@ -15,6 +16,42 @@ import (
|
||||
"next.orly.dev/pkg/encoders/envelopes/reqenvelope"
|
||||
)
|
||||
|
||||
// validateJSONMessage checks if a message contains invalid control characters
|
||||
// that would cause JSON parsing to fail
|
||||
func validateJSONMessage(msg []byte) (err error) {
|
||||
for i, b := range msg {
|
||||
// Check for invalid control characters in JSON strings
|
||||
if b < 32 && b != '\t' && b != '\n' && b != '\r' {
|
||||
// Allow some control characters that might be valid in certain contexts
|
||||
// but reject form feed (\f), backspace (\b), and other problematic ones
|
||||
switch b {
|
||||
case '\b', '\f', 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07,
|
||||
0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17,
|
||||
0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F:
|
||||
return fmt.Errorf("invalid control character 0x%02X at position %d", b, i)
|
||||
}
|
||||
}
|
||||
// Check for non-printable characters that might indicate binary data
|
||||
if b > 127 && !unicode.IsPrint(rune(b)) {
|
||||
// Allow valid UTF-8 sequences, but be suspicious of random binary data
|
||||
if i < len(msg)-1 {
|
||||
// Quick check: if we see a lot of high-bit characters in sequence,
|
||||
// it might be binary data masquerading as text
|
||||
highBitCount := 0
|
||||
for j := i; j < len(msg) && j < i+10; j++ {
|
||||
if msg[j] > 127 {
|
||||
highBitCount++
|
||||
}
|
||||
}
|
||||
if highBitCount > 7 { // More than 70% high-bit chars in a 10-byte window
|
||||
return fmt.Errorf("suspicious binary data detected at position %d", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (l *Listener) HandleMessage(msg []byte, remote string) {
|
||||
// Handle blacklisted IPs - discard messages but keep connection open until timeout
|
||||
if l.isBlacklisted {
|
||||
@@ -35,6 +72,17 @@ func (l *Listener) HandleMessage(msg []byte, remote string) {
|
||||
}
|
||||
// log.D.F("%s processing message (len=%d): %s", remote, len(msg), msgPreview)
|
||||
|
||||
// Validate message for invalid characters before processing
|
||||
if err := validateJSONMessage(msg); err != nil {
|
||||
log.E.F("%s message validation FAILED (len=%d): %v", remote, len(msg), err)
|
||||
log.T.F("%s invalid message content: %q", remote, msgPreview)
|
||||
// Send error notice to client
|
||||
if noticeErr := noticeenvelope.NewFrom("invalid message format: " + err.Error()).Write(l); noticeErr != nil {
|
||||
log.E.F("%s failed to send validation error notice: %v", remote, noticeErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
l.msgCount++
|
||||
var err error
|
||||
var t string
|
||||
|
||||
@@ -35,6 +35,12 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
|
||||
// var rem []byte
|
||||
env := reqenvelope.New()
|
||||
if _, err = env.Unmarshal(msg); chk.E(err) {
|
||||
// Provide more specific error context for JSON parsing failures
|
||||
if strings.Contains(err.Error(), "invalid character") {
|
||||
log.E.F("REQ JSON parsing failed from %s: %v", l.remote, err)
|
||||
log.T.F("REQ malformed message from %s: %q", l.remote, string(msg))
|
||||
return normalize.Error.Errorf("malformed REQ message: %s", err.Error())
|
||||
}
|
||||
return normalize.Error.Errorf(err.Error())
|
||||
}
|
||||
|
||||
|
||||
app/main.go (50 changed lines)
@@ -10,11 +10,13 @@ import (
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/log"
|
||||
"next.orly.dev/app/config"
|
||||
"next.orly.dev/pkg/acl"
|
||||
"next.orly.dev/pkg/crypto/keys"
|
||||
"next.orly.dev/pkg/database"
|
||||
"next.orly.dev/pkg/encoders/bech32encoding"
|
||||
"next.orly.dev/pkg/policy"
|
||||
"next.orly.dev/pkg/protocol/publish"
|
||||
"next.orly.dev/pkg/spider"
|
||||
)
|
||||
|
||||
func Run(
|
||||
@@ -69,6 +71,48 @@ func Run(
|
||||
|
||||
// Initialize policy manager
|
||||
l.policyManager = policy.NewWithManager(ctx, cfg.AppName, cfg.PolicyEnabled)
|
||||
|
||||
// Initialize spider manager based on mode
|
||||
if cfg.SpiderMode != "none" {
|
||||
if l.spiderManager, err = spider.New(ctx, db, l.publishers, cfg.SpiderMode); chk.E(err) {
|
||||
log.E.F("failed to create spider manager: %v", err)
|
||||
} else {
|
||||
// Set up callbacks for follows mode
|
||||
if cfg.SpiderMode == "follows" {
|
||||
l.spiderManager.SetCallbacks(
|
||||
func() []string {
|
||||
// Get admin relays from follows ACL if available
|
||||
for _, aclInstance := range acl.Registry.ACL {
|
||||
if aclInstance.Type() == "follows" {
|
||||
if follows, ok := aclInstance.(*acl.Follows); ok {
|
||||
return follows.AdminRelays()
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
func() [][]byte {
|
||||
// Get followed pubkeys from follows ACL if available
|
||||
for _, aclInstance := range acl.Registry.ACL {
|
||||
if aclInstance.Type() == "follows" {
|
||||
if follows, ok := aclInstance.(*acl.Follows); ok {
|
||||
return follows.GetFollowedPubkeys()
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
if err = l.spiderManager.Start(); chk.E(err) {
|
||||
log.E.F("failed to start spider manager: %v", err)
|
||||
} else {
|
||||
log.I.F("spider manager started successfully in '%s' mode", cfg.SpiderMode)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize the user interface
|
||||
l.UserInterface()
|
||||
|
||||
@@ -135,6 +179,12 @@ func Run(
|
||||
<-ctx.Done()
|
||||
log.I.F("shutting down HTTP server gracefully")
|
||||
|
||||
// Stop spider manager if running
|
||||
if l.spiderManager != nil {
|
||||
l.spiderManager.Stop()
|
||||
log.I.F("spider manager stopped")
|
||||
}
|
||||
|
||||
// Create shutdown context with timeout
|
||||
shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancelShutdown()
|
||||
|
||||
@@ -26,6 +26,7 @@ import (
|
||||
"next.orly.dev/pkg/protocol/auth"
|
||||
"next.orly.dev/pkg/protocol/httpauth"
|
||||
"next.orly.dev/pkg/protocol/publish"
|
||||
"next.orly.dev/pkg/spider"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
@@ -47,6 +48,7 @@ type Server struct {
|
||||
paymentProcessor *PaymentProcessor
|
||||
sprocketManager *SprocketManager
|
||||
policyManager *policy.P
|
||||
spiderManager *spider.Spider
|
||||
}
|
||||
|
||||
// isIPBlacklisted checks if an IP address is blacklisted using the managed ACL system
|
||||
|
||||
cmd/aggregator/README.md (new file, 289 lines)
@@ -0,0 +1,289 @@
|
||||
# Nostr Event Aggregator
|
||||
|
||||
A comprehensive program that searches for all events related to a specific npub across multiple Nostr relays and outputs them in JSONL format to stdout. The program finds both events authored by the user and events that mention the user in "p" tags. It features dynamic relay discovery from relay list events and progressive backward time-based fetching for complete historical data collection.
|
||||
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
go run main.go -key <nsec|npub> [-since <timestamp>] [-until <timestamp>] [-filter <file>] [-output <file>]
|
||||
```
|
||||
|
||||
Where:
|
||||
- `<nsec|npub>` is either a bech32-encoded Nostr private key (nsec1...) or public key (npub1...)
|
||||
- `<timestamp>` is a Unix timestamp (seconds since epoch) - optional
|
||||
- `<file>` is a file path for bloom filter input/output - optional
|
||||
|
||||
### Parameters
|
||||
|
||||
- **`-key`**: Required. The bech32-encoded Nostr key to search for events
|
||||
- **nsec**: Private key (enables authentication to relays that require it)
|
||||
- **npub**: Public key (authentication disabled)
|
||||
- **`-since`**: Optional. Start timestamp (Unix seconds). Only events after this time
|
||||
- **`-until`**: Optional. End timestamp (Unix seconds). Only events before this time
|
||||
- **`-filter`**: Optional. File containing base64-encoded bloom filter from previous runs
|
||||
- **`-output`**: Optional. Output file for events (default: stdout)
|
||||
|
||||
### Authentication
|
||||
|
||||
When using an **nsec** (private key), the aggregator will:
|
||||
- Derive the public key from the private key for event searching
|
||||
- Attempt to authenticate to relays that require it (NIP-42)
|
||||
- Continue working even if authentication fails on some relays
|
||||
- Log authentication success/failure for each relay
|
||||
|
||||
When using an **npub** (public key), the aggregator will:
|
||||
- Search for events using the provided public key
|
||||
- Skip authentication (no private key available)
|
||||
- Work with public relays that don't require authentication
|
||||
|
||||
### Behavior
|
||||
|
||||
- **Without `-filter`**: Creates new bloom filter, outputs to stdout or truncates output file
|
||||
- **With `-filter`**: Loads existing bloom filter, automatically appends to output file
|
||||
- **Bloom filter output**: Always written to stderr with timestamp information and base64 data
|
||||
|
||||
## Examples
|
||||
|
||||
### Basic Usage
|
||||
|
||||
```bash
|
||||
# Get all events related to a user using public key (no authentication)
|
||||
go run main.go -key npub1234567890abcdef...
|
||||
|
||||
# Get all events related to a user using private key (with authentication)
|
||||
go run main.go -key nsec1234567890abcdef...
|
||||
|
||||
# Get events related to a user since January 1, 2022
|
||||
go run main.go -key npub1234567890abcdef... -since 1640995200
|
||||
|
||||
# Get events related to a user between two dates
|
||||
go run main.go -key npub1234567890abcdef... -since 1640995200 -until 1672531200
|
||||
|
||||
# Get events related to a user until December 31, 2022
|
||||
go run main.go -key npub1234567890abcdef... -until 1672531200
|
||||
```
|
||||
|
||||
### Incremental Collection with Bloom Filter
|
||||
|
||||
```bash
|
||||
# First run: Collect initial events and save bloom filter (using npub)
|
||||
go run main.go -key npub1234567890abcdef... -since 1640995200 -until 1672531200 -output events.jsonl 2>bloom_filter.txt
|
||||
|
||||
# Second run: Continue from where we left off, append new events (using nsec for auth)
|
||||
go run main.go -key nsec1234567890abcdef... -since 1672531200 -until 1704067200 -filter bloom_filter.txt -output events.jsonl 2>bloom_filter_updated.txt
|
||||
|
||||
# Third run: Collect even more recent events
|
||||
go run main.go -key nsec1234567890abcdef... -since 1704067200 -filter bloom_filter_updated.txt -output events.jsonl 2>bloom_filter_final.txt
|
||||
```
|
||||
|
||||
### Output Redirection
|
||||
|
||||
```bash
|
||||
# Events to file, bloom filter to stderr (visible in terminal)
|
||||
go run main.go -key npub1... -output events.jsonl
|
||||
|
||||
# Events to file, bloom filter to separate file
|
||||
go run main.go -key npub1... -output events.jsonl 2>bloom_filter.txt
|
||||
|
||||
# Events to stdout, bloom filter to file (useful for piping events)
|
||||
go run main.go -key npub1... 2>bloom_filter.txt | jq .
|
||||
|
||||
# Using nsec for authentication to access private relays
|
||||
go run main.go -key nsec1... -output events.jsonl 2>bloom_filter.txt
|
||||
```
|
||||
|
||||
## Features
|
||||
|
||||
### Core Functionality
|
||||
- **Comprehensive event discovery**: Finds both events authored by the user and events that mention the user
|
||||
- **Dynamic relay discovery**: Automatically discovers and connects to new relays from relay list events (kind 10002)
|
||||
- **Progressive backward fetching**: Systematically collects historical data in time-based batches
|
||||
- **Triple filter approach**: Uses separate filters for authored events, p-tag mentions, and relay list events
|
||||
- **Intelligent time management**: Works backwards from current time (or until timestamp) to since timestamp
|
||||
|
||||
### Authentication & Access
|
||||
- **Private key support**: Use nsec keys to authenticate to relays that require it (NIP-42)
|
||||
- **Public key compatibility**: Continue to work with npub keys for public relay access
|
||||
- **Graceful fallback**: Continue operation even if authentication fails on some relays
|
||||
- **Auth-required relay access**: Access private notes and restricted content on authenticated relays
|
||||
- **Flexible key input**: Automatically detects and handles both nsec and npub key formats
|
||||
|
||||
### Memory Management
|
||||
- **Memory-efficient deduplication**: Uses bloom filter with ~0.1% false positive rate instead of unbounded maps
|
||||
- **Fixed memory footprint**: Bloom filter uses only ~1.75MB for 1M events with controlled memory growth
|
||||
- **Memory monitoring**: Real-time memory usage tracking and automatic garbage collection
|
||||
- **Persistent deduplication**: Bloom filter can be saved and reused across multiple runs
|
||||
|
||||
### Incremental Collection
|
||||
- **Bloom filter persistence**: Save deduplication state between runs for efficient incremental collection
|
||||
- **Automatic append mode**: When loading existing bloom filter, automatically appends to output file
|
||||
- **Timestamp tracking**: Records actual time range of processed events in bloom filter output
|
||||
- **Seamless continuation**: Resume collection from where previous run left off without duplicates
|
||||
|
||||
### Reliability & Performance
|
||||
- Connects to multiple relays simultaneously with dynamic expansion
|
||||
- Outputs events in JSONL format (one JSON object per line)
|
||||
- Handles connection failures gracefully
|
||||
- Continues running until all relay connections are closed
|
||||
- Time-based filtering with Unix timestamps (since/until parameters)
|
||||
- Input validation for timestamp ranges
|
||||
- Rate limiting and backoff for relay connection management
|
||||
|
||||
## Event Discovery

The aggregator searches for three types of events (a filter sketch follows the lists below):

1. **Authored Events**: Events where the specified npub is the author (pubkey field matches)
2. **Mentioned Events**: Events that contain "p" tags referencing the specified npub (replies, mentions, etc.)
3. **Relay List Events**: Kind 10002 events that contain relay URLs for dynamic relay discovery

This comprehensive approach ensures you capture all events related to a user, including:

- Posts authored by the user
- Replies to the user's posts
- Posts that mention or tag the user
- Any other events that reference the user in p-tags
- Relay list metadata for discovering additional relays
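The three searches correspond to three subscription filters. The aggregator's own source is not shown here (its diff is suppressed further down this page), so the following is only a sketch assembled from the `filter`, `tag`, and `kind` constructors that appear in the spider and follows diffs; the import paths and the `buildFilters` helper are assumptions, not the aggregator's actual code.

```go
import (
	"next.orly.dev/pkg/encoders/filter"
	"next.orly.dev/pkg/encoders/kind"
	"next.orly.dev/pkg/encoders/tag"
)

// buildFilters sketches the three filters described above for one pubkey:
// authored events, "p"-tag mentions, and kind 10002 relay lists.
func buildFilters(pubkey []byte) *filter.S {
	ff := &filter.S{}
	// 1. Events authored by the target pubkey.
	*ff = append(*ff, &filter.F{
		Authors: tag.NewFromBytesSlice(pubkey),
	})
	// 2. Events that reference the target pubkey in a "p" tag.
	*ff = append(*ff, &filter.F{
		Tags: tag.NewS(tag.NewFromAny("p", pubkey)),
	})
	// 3. Relay list metadata (kind 10002) for dynamic relay discovery.
	*ff = append(*ff, &filter.F{
		Authors: tag.NewFromBytesSlice(pubkey),
		Kinds:   kind.NewS(kind.New(kind.RelayListMetadata.K)),
	})
	return ff
}
```
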
## Progressive Fetching

The aggregator uses a progressive backward fetching strategy (sketched in code after this list):

1. **Time-based batches**: Fetches data in weekly batches, working backwards from the end time
2. **Dynamic relay expansion**: As relay list events are discovered, new relays are automatically added to the search
3. **Complete coverage**: Ensures all events between the since and until timestamps are collected
4. **Efficient processing**: Processes each time batch completely before moving to the next
5. **Boundary respect**: Stops when reaching the since timestamp or the beginning of available data
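A minimal sketch of that loop using only the standard library; the function and parameter names are illustrative, not the aggregator's actual code:

```go
import "time"

// fetchBackwards walks from `until` back to `since` in week-sized windows,
// querying each window completely before stepping to the previous one.
func fetchBackwards(since, until time.Time, fetchBatch func(from, to time.Time)) {
	const week = 7 * 24 * time.Hour
	for end := until; end.After(since); {
		start := end.Add(-week)
		if start.Before(since) {
			start = since // respect the lower boundary
		}
		fetchBatch(start, end) // query every currently known relay for this window
		end = start
	}
}
```

Relays discovered from kind 10002 events while one window is being processed are simply included in the queries for the following windows.
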
## Incremental Collection Workflow
|
||||
|
||||
The aggregator supports efficient incremental data collection using persistent bloom filters. This allows you to build comprehensive event archives over time without re-processing duplicate events.
|
||||
|
||||
### How It Works
|
||||
|
||||
1. **First Run**: Creates a new bloom filter and collects events for the specified time range
|
||||
2. **Bloom Filter Output**: At completion, outputs bloom filter summary to stderr with:
|
||||
- Event statistics (processed count, estimated unique events)
|
||||
- Time range covered (actual timestamps of collected events)
|
||||
- Base64-encoded bloom filter data for reuse
|
||||
3. **Subsequent Runs**: Load the saved bloom filter to skip already-seen events
|
||||
4. **Automatic Append**: When using an existing filter, new events are appended to the output file
|
||||
|
||||
### Bloom Filter Output Format
|
||||
|
||||
The bloom filter output includes comprehensive metadata:
|
||||
|
||||
```
|
||||
=== BLOOM FILTER SUMMARY ===
|
||||
Events processed: 1247
|
||||
Estimated unique events: 1247
|
||||
Bloom filter size: 1.75 MB
|
||||
False positive rate: ~0.1%
|
||||
Hash functions: 10
|
||||
Time range covered: 1640995200 to 1672531200
|
||||
Time range (human): 2022-01-01T00:00:00Z to 2023-01-01T00:00:00Z
|
||||
|
||||
Bloom filter (base64):
|
||||
[base64-encoded binary data]
|
||||
=== END BLOOM FILTER ===
|
||||
```
|
||||
|
||||
### Best Practices
|
||||
|
||||
- **Save bloom filters**: Always redirect stderr to a file to preserve the bloom filter
|
||||
- **Sequential time ranges**: Use non-overlapping time ranges for optimal efficiency
|
||||
- **Regular updates**: Update your bloom filter file after each run for the latest state
|
||||
- **Backup filters**: Keep copies of bloom filter files for different time periods
|
||||
|
||||
### Example Workflow
|
||||
|
||||
```bash
|
||||
# Month 1: January 2022 (using npub for public relays)
|
||||
go run main.go -key npub1... -since 1640995200 -until 1643673600 -output jan2022.jsonl 2>filter_jan.txt
|
||||
|
||||
# Month 2: February 2022 (using nsec for auth-required relays, append to same file)
|
||||
go run main.go -key nsec1... -since 1643673600 -until 1646092800 -filter filter_jan.txt -output all_events.jsonl 2>filter_feb.txt
|
||||
|
||||
# Month 3: March 2022 (continue with authentication for complete coverage)
|
||||
go run main.go -key nsec1... -since 1646092800 -until 1648771200 -filter filter_feb.txt -output all_events.jsonl 2>filter_mar.txt
|
||||
|
||||
# Result: all_events.jsonl contains deduplicated events from all three months, including private relay content
|
||||
```
|
||||
|
||||
## Memory Management

The aggregator uses several memory management techniques to handle large-scale data collection:

### Bloom Filter Deduplication
- **Fixed Size**: Uses a fixed 1.75MB allocation for the bloom filter regardless of event count (a minimal sketch follows this list)
- **Low False Positive Rate**: Configured for ~0.1% false positive rate with 1M events
- **Hash Functions**: Uses 10 independent hash functions based on SHA256 for even distribution
- **Thread-Safe**: Concurrent access protected with read-write mutexes
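A minimal sketch of such a filter, assuming a fixed byte array with k bit positions derived from a single SHA-256 digest via double hashing; locking is omitted, and the real aggregator's exact sizes and hashing scheme may differ:

```go
import (
	"crypto/sha256"
	"encoding/binary"
)

// bloom is a fixed-size bit array; k positions per item are derived from one
// SHA-256 digest using the double-hashing construction h1 + i*h2.
type bloom struct {
	bits []byte // e.g. 1.75 MB = 14,680,064 bits
	k    int    // e.g. 10
}

func newBloom(sizeBytes, k int) *bloom {
	return &bloom{bits: make([]byte, sizeBytes), k: k}
}

func (b *bloom) positions(id []byte) []uint64 {
	sum := sha256.Sum256(id)
	h1 := binary.LittleEndian.Uint64(sum[0:8])
	h2 := binary.LittleEndian.Uint64(sum[8:16])
	n := uint64(len(b.bits)) * 8
	out := make([]uint64, b.k)
	for i := 0; i < b.k; i++ {
		out[i] = (h1 + uint64(i)*h2) % n
	}
	return out
}

// Add marks an event ID (or any byte string) as seen.
func (b *bloom) Add(id []byte) {
	for _, p := range b.positions(id) {
		b.bits[p/8] |= 1 << (p % 8)
	}
}

// Has reports whether the ID was probably seen before (false positives only).
func (b *bloom) Has(id []byte) bool {
	for _, p := range b.positions(id) {
		if b.bits[p/8]&(1<<(p%8)) == 0 {
			return false
		}
	}
	return true
}
```

With roughly 14.7 million bits for one million events and k = 10, the theoretical false positive rate (1 - e^(-kn/m))^k works out to about 0.1%, matching the figure quoted above.
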
### Memory Monitoring
|
||||
- **Real-time Tracking**: Monitors total memory usage every 30 seconds
|
||||
- **Automatic GC**: Triggers garbage collection when approaching memory limits
|
||||
- **Statistics Logging**: Reports bloom filter usage, estimated event count, and memory consumption
|
||||
- **Controlled Growth**: Prevents unbounded memory growth through fixed-size data structures
|
||||
|
||||
### Performance Characteristics
|
||||
- **Memory Usage**: ~1.75MB bloom filter + ~256MB total memory limit
|
||||
- **False Positives**: ~0.1% chance of incorrectly identifying a duplicate (very low impact)
|
||||
- **Scalability**: Can handle millions of events without memory issues
|
||||
- **Efficiency**: O(k) time complexity for both add and lookup operations (k = hash functions)
|
||||
|
||||
## Relays
|
||||
|
||||
The program starts with the following initial relays:
|
||||
|
||||
- wss://nostr.wine/
|
||||
- wss://nostr.land/
|
||||
- wss://orly-relay.imwald.eu
|
||||
- wss://relay.orly.dev/
|
||||
- wss://relay.damus.io/
|
||||
- wss://nos.lol/
|
||||
- wss://theforest.nostr1.com/
|
||||
|
||||
**Dynamic Relay Discovery**: Additional relays are automatically discovered and added during execution when the program finds relay list events (kind 10002) authored by the target user. This ensures comprehensive coverage across the user's preferred relay network.
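Relay list events carry their URLs in "r" tags (per NIP-65), with the URL as the second element of each tag. A minimal sketch of extracting them, using a plain [][]string tag representation rather than this repository's tag types:

```go
// relayURLsFromRelayList returns the relay URLs found in a kind 10002 event's
// tags. Entries look like ["r", "wss://relay.example.com"] or
// ["r", "wss://relay.example.com", "read"/"write"].
func relayURLsFromRelayList(tags [][]string) (urls []string) {
	for _, t := range tags {
		if len(t) >= 2 && t[0] == "r" {
			urls = append(urls, t[1])
		}
	}
	return
}
```
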
## Output Format
|
||||
|
||||
### Event Output (stdout or -output file)
|
||||
|
||||
Each line of output is a JSON object representing a Nostr event with the following fields:
|
||||
|
||||
- `id`: Event ID (hex)
|
||||
- `pubkey`: Author's public key (hex)
|
||||
- `created_at`: Unix timestamp
|
||||
- `kind`: Event kind number
|
||||
- `tags`: Array of tag arrays
|
||||
- `content`: Event content string
|
||||
- `sig`: Event signature (hex)
|
||||
|
||||
### Bloom Filter Output (stderr)

At program completion, a comprehensive bloom filter summary is written to stderr containing:

- **Statistics**: Event counts, memory usage, performance metrics
- **Time Range**: Actual timestamp range of collected events (both Unix and human-readable)
- **Configuration**: Bloom filter parameters (size, hash functions, false positive rate)
- **Binary Data**: Base64-encoded bloom filter for reuse in subsequent runs

The bloom filter output is structured with clear markers (`=== BLOOM FILTER SUMMARY ===` and `=== END BLOOM FILTER ===`), making it easy to parse and extract the base64 data programmatically.
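For example, a small helper along these lines could recover the base64 payload from a saved summary; the helper itself is hypothetical, and only the markers and the `Bloom filter (base64):` label come from the format shown above:

```go
import (
	"errors"
	"os"
	"strings"
)

// extractBloomFilter reads a saved summary (e.g. bloom_filter.txt) and returns
// the base64 payload between the "Bloom filter (base64):" label and the end
// marker, with all whitespace removed.
func extractBloomFilter(path string) (string, error) {
	raw, err := os.ReadFile(path)
	if err != nil {
		return "", err
	}
	text := string(raw)
	start := strings.Index(text, "Bloom filter (base64):")
	end := strings.Index(text, "=== END BLOOM FILTER ===")
	if start < 0 || end < 0 || end < start {
		return "", errors.New("bloom filter markers not found")
	}
	payload := text[start+len("Bloom filter (base64):") : end]
	return strings.Join(strings.Fields(payload), ""), nil
}
```

The returned string can then be decoded with `base64.StdEncoding.DecodeString` to recover the filter's raw bytes for a subsequent run.
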
### Output Separation

- **Events**: Always go to stdout (default) or the file specified by `-output`
- **Bloom Filter**: Always goes to stderr, allowing separate redirection
- **Logs**: Runtime information and progress updates go to stderr

This separation allows flexible output handling:

```bash
# Events to file, bloom filter visible in terminal
./aggregator -key npub1... -output events.jsonl

# Both events and bloom filter to separate files
./aggregator -key npub1... -output events.jsonl 2>bloom_filter.txt

# Events piped to another program, bloom filter saved
./aggregator -key npub1... 2>bloom_filter.txt | jq '.content'
```

cmd/aggregator/main.go (new file, 1266 lines)
File diff suppressed because it is too large
@@ -266,7 +266,7 @@ func (f *Follows) adminRelays() (urls []string) {
|
||||
|
||||
// If no admin relays found, use bootstrap relays as fallback
|
||||
if len(urls) == 0 {
|
||||
log.I.F("no admin relays found in DB, checking bootstrap relays")
|
||||
log.I.F("no admin relays found in DB, checking bootstrap relays and failover relays")
|
||||
if len(f.cfg.BootstrapRelays) > 0 {
|
||||
log.I.F("using bootstrap relays: %v", f.cfg.BootstrapRelays)
|
||||
for _, relay := range f.cfg.BootstrapRelays {
|
||||
@@ -302,7 +302,53 @@ func (f *Follows) adminRelays() (urls []string) {
|
||||
urls = append(urls, n)
|
||||
}
|
||||
} else {
|
||||
log.W.F("no bootstrap relays configured")
|
||||
log.I.F("no bootstrap relays configured, using failover relays")
|
||||
}
|
||||
|
||||
// If still no relays found, use hardcoded failover relays
|
||||
// These relays will be used to fetch admin relay lists (kind 10002) and store them
|
||||
// in the database so they're found next time
|
||||
if len(urls) == 0 {
|
||||
failoverRelays := []string{
|
||||
"wss://nostr.land",
|
||||
"wss://nostr.wine",
|
||||
"wss://nos.lol",
|
||||
"wss://relay.damus.io",
|
||||
"wss://nostr.band",
|
||||
}
|
||||
log.I.F("using failover relays: %v", failoverRelays)
|
||||
for _, relay := range failoverRelays {
|
||||
n := string(normalize.URL(relay))
|
||||
if n == "" {
|
||||
log.W.F("invalid failover relay URL: %s", relay)
|
||||
continue
|
||||
}
|
||||
// Skip if this URL is one of our configured self relay addresses or hosts
|
||||
if _, isSelf := selfSet[n]; isSelf {
|
||||
log.D.F("follows syncer: skipping configured self relay address: %s", n)
|
||||
continue
|
||||
}
|
||||
// Host match
|
||||
host := n
|
||||
if i := strings.Index(host, "://"); i >= 0 {
|
||||
host = host[i+3:]
|
||||
}
|
||||
if j := strings.Index(host, "/"); j >= 0 {
|
||||
host = host[:j]
|
||||
}
|
||||
if k := strings.Index(host, ":"); k >= 0 {
|
||||
host = host[:k]
|
||||
}
|
||||
if _, isSelfHost := selfHosts[host]; isSelfHost {
|
||||
log.D.F("follows syncer: skipping configured self relay address: %s", n)
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[n]; ok {
|
||||
continue
|
||||
}
|
||||
seen[n] = struct{}{}
|
||||
urls = append(urls, n)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -451,6 +497,7 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) {
|
||||
keepaliveTicker := time.NewTicker(30 * time.Second)
|
||||
defer keepaliveTicker.Stop()
|
||||
|
||||
readLoop:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -460,7 +507,7 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) {
|
||||
// Send ping to keep connection alive
|
||||
if err := c.Ping(ctx); err != nil {
|
||||
log.T.F("follows syncer: ping failed for %s: %v", u, err)
|
||||
break
|
||||
break readLoop
|
||||
}
|
||||
log.T.F("follows syncer: sent ping to %s", u)
|
||||
continue
|
||||
@@ -471,7 +518,7 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) {
|
||||
readCancel()
|
||||
if err != nil {
|
||||
_ = c.Close(websocket.StatusNormalClosure, "read err")
|
||||
break
|
||||
break readLoop
|
||||
}
|
||||
label, rem, err := envelopes.Identify(data)
|
||||
if chk.E(err) {
|
||||
@@ -634,7 +681,7 @@ func (f *Follows) fetchAdminFollowLists() {
|
||||
|
||||
urls := f.adminRelays()
|
||||
if len(urls) == 0 {
|
||||
log.W.F("follows syncer: no admin relays found for follow list fetching")
|
||||
log.W.F("follows syncer: no relays available for follow list fetching (no admin relays, bootstrap relays, or failover relays)")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -680,14 +727,19 @@ func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) {
|
||||
|
||||
log.I.F("follows syncer: fetching follow lists from relay %s", relayURL)
|
||||
|
||||
// Create filter for follow lists only (kind 3)
|
||||
// Create filter for follow lists and relay lists (kind 3 and kind 10002)
|
||||
ff := &filter.S{}
|
||||
f1 := &filter.F{
|
||||
Authors: tag.NewFromBytesSlice(authors...),
|
||||
Kinds: kind.NewS(kind.New(kind.FollowList.K)),
|
||||
Limit: values.ToUintPointer(100),
|
||||
}
|
||||
*ff = append(*ff, f1)
|
||||
f2 := &filter.F{
|
||||
Authors: tag.NewFromBytesSlice(authors...),
|
||||
Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)),
|
||||
Limit: values.ToUintPointer(100),
|
||||
}
|
||||
*ff = append(*ff, f1, f2)
|
||||
|
||||
// Use a specific subscription ID for follow list fetching
|
||||
subID := "follow-lists-fetch"
|
||||
@@ -699,24 +751,28 @@ func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) {
|
||||
return
|
||||
}
|
||||
|
||||
log.T.F("follows syncer: sent follow list REQ to %s", relayURL)
|
||||
log.T.F("follows syncer: sent follow list and relay list REQ to %s", relayURL)
|
||||
|
||||
// Read follow list events with timeout
|
||||
// Collect all events before processing
|
||||
var followListEvents []*event.E
|
||||
var relayListEvents []*event.E
|
||||
|
||||
// Read events with timeout
|
||||
timeout := time.After(10 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
goto processEvents
|
||||
case <-timeout:
|
||||
log.T.F("follows syncer: timeout reading follow lists from %s", relayURL)
|
||||
return
|
||||
log.T.F("follows syncer: timeout reading events from %s", relayURL)
|
||||
goto processEvents
|
||||
default:
|
||||
}
|
||||
|
||||
_, data, err := c.Read(ctx)
|
||||
if err != nil {
|
||||
log.T.F("follows syncer: error reading follow lists from %s: %v", relayURL, err)
|
||||
return
|
||||
log.T.F("follows syncer: error reading events from %s: %v", relayURL, err)
|
||||
goto processEvents
|
||||
}
|
||||
|
||||
label, rem, err := envelopes.Identify(data)
|
||||
@@ -731,19 +787,101 @@ func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Process follow list events
|
||||
if res.Event.Kind == kind.FollowList.K {
|
||||
// Collect events by kind
|
||||
switch res.Event.Kind {
|
||||
case kind.FollowList.K:
|
||||
log.I.F("follows syncer: received follow list from %s on relay %s",
|
||||
hex.EncodeToString(res.Event.Pubkey), relayURL)
|
||||
f.extractFollowedPubkeys(res.Event)
|
||||
followListEvents = append(followListEvents, res.Event)
|
||||
case kind.RelayListMetadata.K:
|
||||
log.I.F("follows syncer: received relay list from %s on relay %s",
|
||||
hex.EncodeToString(res.Event.Pubkey), relayURL)
|
||||
relayListEvents = append(relayListEvents, res.Event)
|
||||
}
|
||||
case eoseenvelope.L:
|
||||
log.T.F("follows syncer: end of follow list events from %s", relayURL)
|
||||
return
|
||||
log.T.F("follows syncer: end of events from %s", relayURL)
|
||||
goto processEvents
|
||||
default:
|
||||
// ignore other labels
|
||||
}
|
||||
}
|
||||
|
||||
processEvents:
|
||||
// Process collected events - keep only the newest per pubkey and save to database
|
||||
f.processCollectedEvents(relayURL, followListEvents, relayListEvents)
|
||||
}
|
||||
|
||||
// processCollectedEvents processes the collected events, keeping only the newest per pubkey
|
||||
func (f *Follows) processCollectedEvents(relayURL string, followListEvents, relayListEvents []*event.E) {
|
||||
// Process follow list events (kind 3) - keep newest per pubkey
|
||||
latestFollowLists := make(map[string]*event.E)
|
||||
for _, ev := range followListEvents {
|
||||
pubkeyHex := hex.EncodeToString(ev.Pubkey)
|
||||
existing, exists := latestFollowLists[pubkeyHex]
|
||||
if !exists || ev.CreatedAt > existing.CreatedAt {
|
||||
latestFollowLists[pubkeyHex] = ev
|
||||
}
|
||||
}
|
||||
|
||||
// Process relay list events (kind 10002) - keep newest per pubkey
|
||||
latestRelayLists := make(map[string]*event.E)
|
||||
for _, ev := range relayListEvents {
|
||||
pubkeyHex := hex.EncodeToString(ev.Pubkey)
|
||||
existing, exists := latestRelayLists[pubkeyHex]
|
||||
if !exists || ev.CreatedAt > existing.CreatedAt {
|
||||
latestRelayLists[pubkeyHex] = ev
|
||||
}
|
||||
}
|
||||
|
||||
// Save and process the newest events
|
||||
savedFollowLists := 0
|
||||
savedRelayLists := 0
|
||||
|
||||
// Save follow list events to database and extract follows
|
||||
for pubkeyHex, ev := range latestFollowLists {
|
||||
if _, err := f.D.SaveEvent(f.Ctx, ev); err != nil {
|
||||
if !strings.HasPrefix(err.Error(), "blocked:") {
|
||||
log.W.F("follows syncer: failed to save follow list from %s: %v", pubkeyHex, err)
|
||||
}
|
||||
} else {
|
||||
savedFollowLists++
|
||||
log.I.F("follows syncer: saved newest follow list from %s (created_at: %d) from relay %s",
|
||||
pubkeyHex, ev.CreatedAt, relayURL)
|
||||
}
|
||||
|
||||
// Extract followed pubkeys from admin follow lists
|
||||
if f.isAdminPubkey(ev.Pubkey) {
|
||||
log.I.F("follows syncer: processing admin follow list from %s", pubkeyHex)
|
||||
f.extractFollowedPubkeys(ev)
|
||||
}
|
||||
}
|
||||
|
||||
// Save relay list events to database
|
||||
for pubkeyHex, ev := range latestRelayLists {
|
||||
if _, err := f.D.SaveEvent(f.Ctx, ev); err != nil {
|
||||
if !strings.HasPrefix(err.Error(), "blocked:") {
|
||||
log.W.F("follows syncer: failed to save relay list from %s: %v", pubkeyHex, err)
|
||||
}
|
||||
} else {
|
||||
savedRelayLists++
|
||||
log.I.F("follows syncer: saved newest relay list from %s (created_at: %d) from relay %s",
|
||||
pubkeyHex, ev.CreatedAt, relayURL)
|
||||
}
|
||||
}
|
||||
|
||||
log.I.F("follows syncer: processed %d follow lists and %d relay lists from %s, saved %d follow lists and %d relay lists",
|
||||
len(followListEvents), len(relayListEvents), relayURL, savedFollowLists, savedRelayLists)
|
||||
|
||||
// If we saved any relay lists, trigger a refresh of subscriptions to use the new relay lists
|
||||
if savedRelayLists > 0 {
|
||||
log.I.F("follows syncer: saved new relay lists, triggering subscription refresh")
|
||||
// Signal that follows have been updated to refresh subscriptions
|
||||
select {
|
||||
case f.updated <- struct{}{}:
|
||||
default:
|
||||
// Channel might be full, that's okay
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetFollowedPubkeys returns a copy of the followed pubkeys list
|
||||
@@ -783,6 +921,11 @@ func (f *Follows) extractFollowedPubkeys(event *event.E) {
|
||||
}
|
||||
}
|
||||
|
||||
// AdminRelays returns the admin relay URLs
|
||||
func (f *Follows) AdminRelays() []string {
|
||||
return f.adminRelays()
|
||||
}
|
||||
|
||||
// AddFollow appends a pubkey to the in-memory follows list if not already present
|
||||
// and signals the syncer to refresh subscriptions.
|
||||
func (f *Follows) AddFollow(pub []byte) {
|
||||
|
||||
@@ -114,9 +114,20 @@ func UnmarshalQuoted(b []byte) (content, rem []byte, err error) {
|
||||
//
|
||||
// backspace, tab, newline, form feed or carriage return.
|
||||
case '\b', '\t', '\n', '\f', '\r':
|
||||
pos := len(content) - len(rem)
|
||||
contextStart := pos - 10
|
||||
if contextStart < 0 {
|
||||
contextStart = 0
|
||||
}
|
||||
contextEnd := pos + 10
|
||||
if contextEnd > len(content) {
|
||||
contextEnd = len(content)
|
||||
}
|
||||
err = errorf.E(
|
||||
"invalid character '%s' in quoted string",
|
||||
"invalid character '%s' in quoted string (position %d, context: %q)",
|
||||
NostrEscape(nil, rem[:1]),
|
||||
pos,
|
||||
string(content[contextStart:contextEnd]),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
pkg/spider/spider.go (new file, 581 lines)
@@ -0,0 +1,581 @@
|
||||
package spider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/errorf"
|
||||
"lol.mleku.dev/log"
|
||||
"next.orly.dev/pkg/database"
|
||||
"next.orly.dev/pkg/encoders/filter"
|
||||
"next.orly.dev/pkg/encoders/tag"
|
||||
"next.orly.dev/pkg/encoders/timestamp"
|
||||
"next.orly.dev/pkg/interfaces/publisher"
|
||||
"next.orly.dev/pkg/protocol/ws"
|
||||
)
|
||||
|
||||
const (
|
||||
// BatchSize is the number of pubkeys per subscription batch
|
||||
BatchSize = 20
|
||||
// CatchupWindow is the extra time added to disconnection periods for catch-up
|
||||
CatchupWindow = 30 * time.Minute
|
||||
// ReconnectDelay is the delay between reconnection attempts
|
||||
ReconnectDelay = 5 * time.Second
|
||||
// MaxReconnectDelay is the maximum delay between reconnection attempts
|
||||
MaxReconnectDelay = 5 * time.Minute
|
||||
)
|
||||
|
||||
// Spider manages connections to admin relays and syncs events for followed pubkeys
|
||||
type Spider struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
db *database.D
|
||||
pub publisher.I
|
||||
mode string
|
||||
|
||||
// Configuration
|
||||
adminRelays []string
|
||||
followList [][]byte
|
||||
|
||||
// State management
|
||||
mu sync.RWMutex
|
||||
connections map[string]*RelayConnection
|
||||
running bool
|
||||
|
||||
// Callbacks for getting updated data
|
||||
getAdminRelays func() []string
|
||||
getFollowList func() [][]byte
|
||||
}
|
||||
|
||||
// RelayConnection manages a single relay connection and its subscriptions
|
||||
type RelayConnection struct {
|
||||
url string
|
||||
client *ws.Client
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
spider *Spider
|
||||
|
||||
// Subscription management
|
||||
mu sync.RWMutex
|
||||
subscriptions map[string]*BatchSubscription
|
||||
|
||||
// Disconnection tracking
|
||||
lastDisconnect time.Time
|
||||
reconnectDelay time.Duration
|
||||
}
|
||||
|
||||
// BatchSubscription represents a subscription for a batch of pubkeys
|
||||
type BatchSubscription struct {
|
||||
id string
|
||||
pubkeys [][]byte
|
||||
startTime time.Time
|
||||
sub *ws.Subscription
|
||||
relay *RelayConnection
|
||||
|
||||
// Track disconnection periods for catch-up
|
||||
disconnectedAt *time.Time
|
||||
}
|
||||
|
||||
// DisconnectionPeriod tracks when a subscription was disconnected
|
||||
type DisconnectionPeriod struct {
|
||||
Start time.Time
|
||||
End time.Time
|
||||
}
|
||||
|
||||
// New creates a new Spider instance
|
||||
func New(ctx context.Context, db *database.D, pub publisher.I, mode string) (s *Spider, err error) {
|
||||
if db == nil {
|
||||
err = errorf.E("database cannot be nil")
|
||||
return
|
||||
}
|
||||
|
||||
// Validate mode
|
||||
switch mode {
|
||||
case "follows", "none":
|
||||
// Valid modes
|
||||
default:
|
||||
err = errorf.E("invalid spider mode: %s (valid modes: none, follows)", mode)
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
s = &Spider{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
db: db,
|
||||
pub: pub,
|
||||
mode: mode,
|
||||
connections: make(map[string]*RelayConnection),
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// SetCallbacks sets the callback functions for getting updated admin relays and follow lists
|
||||
func (s *Spider) SetCallbacks(getAdminRelays func() []string, getFollowList func() [][]byte) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.getAdminRelays = getAdminRelays
|
||||
s.getFollowList = getFollowList
|
||||
}
|
||||
|
||||
// Start begins the spider operation
|
||||
func (s *Spider) Start() (err error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.running {
|
||||
err = errorf.E("spider already running")
|
||||
return
|
||||
}
|
||||
|
||||
// Handle 'none' mode - no-op
|
||||
if s.mode == "none" {
|
||||
log.I.F("spider: mode is 'none', not starting")
|
||||
return
|
||||
}
|
||||
|
||||
if s.getAdminRelays == nil || s.getFollowList == nil {
|
||||
err = errorf.E("callbacks must be set before starting")
|
||||
return
|
||||
}
|
||||
|
||||
s.running = true
|
||||
|
||||
// Start the main loop
|
||||
go s.mainLoop()
|
||||
|
||||
log.I.F("spider: started in '%s' mode", s.mode)
|
||||
return
|
||||
}
|
||||
|
||||
// Stop stops the spider operation
|
||||
func (s *Spider) Stop() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if !s.running {
|
||||
return
|
||||
}
|
||||
|
||||
s.running = false
|
||||
s.cancel()
|
||||
|
||||
// Close all connections
|
||||
for _, conn := range s.connections {
|
||||
conn.close()
|
||||
}
|
||||
s.connections = make(map[string]*RelayConnection)
|
||||
|
||||
log.I.F("spider: stopped")
|
||||
}
|
||||
|
||||
// mainLoop is the main spider loop that manages connections and subscriptions
|
||||
func (s *Spider) mainLoop() {
|
||||
ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.updateConnections()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// updateConnections updates relay connections based on current admin relays and follow lists
|
||||
func (s *Spider) updateConnections() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if !s.running {
|
||||
return
|
||||
}
|
||||
|
||||
// Get current admin relays and follow list
|
||||
adminRelays := s.getAdminRelays()
|
||||
followList := s.getFollowList()
|
||||
|
||||
if len(adminRelays) == 0 || len(followList) == 0 {
|
||||
log.D.F("spider: no admin relays (%d) or follow list (%d) available",
|
||||
len(adminRelays), len(followList))
|
||||
return
|
||||
}
|
||||
|
||||
// Update connections for current admin relays
|
||||
currentRelays := make(map[string]bool)
|
||||
for _, url := range adminRelays {
|
||||
currentRelays[url] = true
|
||||
|
||||
if conn, exists := s.connections[url]; exists {
|
||||
// Update existing connection
|
||||
conn.updateSubscriptions(followList)
|
||||
} else {
|
||||
// Create new connection
|
||||
s.createConnection(url, followList)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove connections for relays no longer in admin list
|
||||
for url, conn := range s.connections {
|
||||
if !currentRelays[url] {
|
||||
log.I.F("spider: removing connection to %s (no longer in admin relays)", url)
|
||||
conn.close()
|
||||
delete(s.connections, url)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// createConnection creates a new relay connection
|
||||
func (s *Spider) createConnection(url string, followList [][]byte) {
|
||||
log.I.F("spider: creating connection to %s", url)
|
||||
|
||||
ctx, cancel := context.WithCancel(s.ctx)
|
||||
conn := &RelayConnection{
|
||||
url: url,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
spider: s,
|
||||
subscriptions: make(map[string]*BatchSubscription),
|
||||
reconnectDelay: ReconnectDelay,
|
||||
}
|
||||
|
||||
s.connections[url] = conn
|
||||
|
||||
// Start connection in goroutine
|
||||
go conn.manage(followList)
|
||||
}
|
||||
|
||||
// manage handles the lifecycle of a relay connection
|
||||
func (rc *RelayConnection) manage(followList [][]byte) {
|
||||
for {
|
||||
select {
|
||||
case <-rc.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Attempt to connect
|
||||
if err := rc.connect(); chk.E(err) {
|
||||
log.W.F("spider: failed to connect to %s: %v", rc.url, err)
|
||||
rc.waitBeforeReconnect()
|
||||
continue
|
||||
}
|
||||
|
||||
log.I.F("spider: connected to %s", rc.url)
|
||||
rc.reconnectDelay = ReconnectDelay // Reset delay on successful connection
|
||||
|
||||
// Create subscriptions for follow list
|
||||
rc.createSubscriptions(followList)
|
||||
|
||||
// Wait for disconnection
|
||||
<-rc.client.Context().Done()
|
||||
|
||||
log.W.F("spider: disconnected from %s: %v", rc.url, rc.client.ConnectionCause())
|
||||
rc.handleDisconnection()
|
||||
|
||||
// Clean up
|
||||
rc.client = nil
|
||||
rc.clearSubscriptions()
|
||||
}
|
||||
}
|
||||
|
||||
// connect establishes a websocket connection to the relay
|
||||
func (rc *RelayConnection) connect() (err error) {
|
||||
connectCtx, cancel := context.WithTimeout(rc.ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if rc.client, err = ws.RelayConnect(connectCtx, rc.url); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// waitBeforeReconnect waits before attempting to reconnect with exponential backoff
|
||||
func (rc *RelayConnection) waitBeforeReconnect() {
|
||||
select {
|
||||
case <-rc.ctx.Done():
|
||||
return
|
||||
case <-time.After(rc.reconnectDelay):
|
||||
}
|
||||
|
||||
// Exponential backoff
|
||||
rc.reconnectDelay *= 2
|
||||
if rc.reconnectDelay > MaxReconnectDelay {
|
||||
rc.reconnectDelay = MaxReconnectDelay
|
||||
}
|
||||
}
|
||||
|
||||
// handleDisconnection records disconnection time for catch-up logic
|
||||
func (rc *RelayConnection) handleDisconnection() {
|
||||
now := time.Now()
|
||||
rc.lastDisconnect = now
|
||||
|
||||
// Mark all subscriptions as disconnected
|
||||
rc.mu.Lock()
|
||||
defer rc.mu.Unlock()
|
||||
|
||||
for _, sub := range rc.subscriptions {
|
||||
if sub.disconnectedAt == nil {
|
||||
sub.disconnectedAt = &now
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// createSubscriptions creates batch subscriptions for the follow list
|
||||
func (rc *RelayConnection) createSubscriptions(followList [][]byte) {
|
||||
rc.mu.Lock()
|
||||
defer rc.mu.Unlock()
|
||||
|
||||
// Clear existing subscriptions
|
||||
rc.clearSubscriptionsLocked()
|
||||
|
||||
// Create batches of pubkeys
|
||||
batches := rc.createBatches(followList)
|
||||
|
||||
log.I.F("spider: creating %d subscription batches for %d pubkeys on %s",
|
||||
len(batches), len(followList), rc.url)
|
||||
|
||||
for i, batch := range batches {
|
||||
batchID := fmt.Sprintf("batch-%d", i) // Simple batch ID
|
||||
rc.createBatchSubscription(batchID, batch)
|
||||
}
|
||||
}
|
||||
|
||||
// createBatches splits the follow list into batches of BatchSize
|
||||
func (rc *RelayConnection) createBatches(followList [][]byte) (batches [][][]byte) {
|
||||
for i := 0; i < len(followList); i += BatchSize {
|
||||
end := i + BatchSize
|
||||
if end > len(followList) {
|
||||
end = len(followList)
|
||||
}
|
||||
|
||||
batch := make([][]byte, end-i)
|
||||
copy(batch, followList[i:end])
|
||||
batches = append(batches, batch)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// createBatchSubscription creates a subscription for a batch of pubkeys
|
||||
func (rc *RelayConnection) createBatchSubscription(batchID string, pubkeys [][]byte) {
|
||||
if rc.client == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Create filters: one for authors, one for p tags
|
||||
var pTags tag.S
|
||||
for _, pk := range pubkeys {
|
||||
pTags = append(pTags, tag.NewFromAny("p", pk))
|
||||
}
|
||||
|
||||
filters := filter.NewS(
|
||||
&filter.F{
|
||||
Authors: tag.NewFromBytesSlice(pubkeys...),
|
||||
},
|
||||
&filter.F{
|
||||
Tags: tag.NewS(pTags...),
|
||||
},
|
||||
)
|
||||
|
||||
// Subscribe
|
||||
sub, err := rc.client.Subscribe(rc.ctx, filters)
|
||||
if chk.E(err) {
|
||||
log.E.F("spider: failed to create subscription %s on %s: %v", batchID, rc.url, err)
|
||||
return
|
||||
}
|
||||
|
||||
batchSub := &BatchSubscription{
|
||||
id: batchID,
|
||||
pubkeys: pubkeys,
|
||||
startTime: time.Now(),
|
||||
sub: sub,
|
||||
relay: rc,
|
||||
}
|
||||
|
||||
rc.subscriptions[batchID] = batchSub
|
||||
|
||||
// Start event handler
|
||||
go batchSub.handleEvents()
|
||||
|
||||
log.D.F("spider: created subscription %s for %d pubkeys on %s",
|
||||
batchID, len(pubkeys), rc.url)
|
||||
}
|
||||
|
||||
// handleEvents processes events from the subscription
|
||||
func (bs *BatchSubscription) handleEvents() {
|
||||
for {
|
||||
select {
|
||||
case <-bs.relay.ctx.Done():
|
||||
return
|
||||
case ev := <-bs.sub.Events:
|
||||
if ev == nil {
|
||||
return // Subscription closed
|
||||
}
|
||||
|
||||
// Save event to database
|
||||
if _, err := bs.relay.spider.db.SaveEvent(bs.relay.ctx, ev); err != nil {
|
||||
if !chk.E(err) {
|
||||
log.T.F("spider: saved event %s from %s",
|
||||
hex.EncodeToString(ev.ID[:]), bs.relay.url)
|
||||
}
|
||||
} else {
|
||||
// Publish event if it was newly saved
|
||||
if bs.relay.spider.pub != nil {
|
||||
go bs.relay.spider.pub.Deliver(ev)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// updateSubscriptions updates subscriptions for a connection with new follow list
|
||||
func (rc *RelayConnection) updateSubscriptions(followList [][]byte) {
|
||||
if rc.client == nil || !rc.client.IsConnected() {
|
||||
return // Will be handled on reconnection
|
||||
}
|
||||
|
||||
rc.mu.Lock()
|
||||
defer rc.mu.Unlock()
|
||||
|
||||
// Check if we need to perform catch-up for disconnected subscriptions
|
||||
now := time.Now()
|
||||
needsCatchup := false
|
||||
|
||||
for _, sub := range rc.subscriptions {
|
||||
if sub.disconnectedAt != nil {
|
||||
needsCatchup = true
|
||||
rc.performCatchup(sub, *sub.disconnectedAt, now, followList)
|
||||
sub.disconnectedAt = nil // Clear disconnection marker
|
||||
}
|
||||
}
|
||||
|
||||
if needsCatchup {
|
||||
log.I.F("spider: performed catch-up for disconnected subscriptions on %s", rc.url)
|
||||
}
|
||||
|
||||
// Recreate subscriptions with updated follow list
|
||||
rc.clearSubscriptionsLocked()
|
||||
|
||||
batches := rc.createBatches(followList)
|
||||
for i, batch := range batches {
|
||||
batchID := fmt.Sprintf("batch-%d", i)
|
||||
rc.createBatchSubscription(batchID, batch)
|
||||
}
|
||||
}
|
||||
|
||||
// performCatchup queries for events missed during disconnection
|
||||
func (rc *RelayConnection) performCatchup(sub *BatchSubscription, disconnectTime, reconnectTime time.Time, followList [][]byte) {
|
||||
// Expand time window by CatchupWindow on both sides
|
||||
since := disconnectTime.Add(-CatchupWindow)
|
||||
until := reconnectTime.Add(CatchupWindow)
|
||||
|
||||
log.I.F("spider: performing catch-up for %s from %v to %v (expanded window)",
|
||||
rc.url, since, until)
|
||||
|
||||
// Create catch-up filters with time constraints
|
||||
sinceTs := timestamp.T{V: since.Unix()}
|
||||
untilTs := timestamp.T{V: until.Unix()}
|
||||
|
||||
var pTags tag.S
|
||||
for _, pk := range sub.pubkeys {
|
||||
pTags = append(pTags, tag.NewFromAny("p", pk))
|
||||
}
|
||||
|
||||
filters := filter.NewS(
|
||||
&filter.F{
|
||||
Authors: tag.NewFromBytesSlice(sub.pubkeys...),
|
||||
Since: &sinceTs,
|
||||
Until: &untilTs,
|
||||
},
|
||||
&filter.F{
|
||||
Tags: tag.NewS(pTags...),
|
||||
Since: &sinceTs,
|
||||
Until: &untilTs,
|
||||
},
|
||||
)
|
||||
|
||||
// Create temporary subscription for catch-up
|
||||
catchupCtx, cancel := context.WithTimeout(rc.ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
catchupSub, err := rc.client.Subscribe(catchupCtx, filters)
|
||||
if chk.E(err) {
|
||||
log.E.F("spider: failed to create catch-up subscription on %s: %v", rc.url, err)
|
||||
return
|
||||
}
|
||||
defer catchupSub.Unsub()
|
||||
|
||||
// Process catch-up events
|
||||
eventCount := 0
|
||||
timeout := time.After(30 * time.Second)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-catchupCtx.Done():
|
||||
log.D.F("spider: catch-up completed on %s, processed %d events", rc.url, eventCount)
|
||||
return
|
||||
case <-timeout:
|
||||
log.D.F("spider: catch-up timeout on %s, processed %d events", rc.url, eventCount)
|
||||
return
|
||||
case <-catchupSub.EndOfStoredEvents:
|
||||
log.D.F("spider: catch-up EOSE on %s, processed %d events", rc.url, eventCount)
|
||||
return
|
||||
case ev := <-catchupSub.Events:
|
||||
if ev == nil {
|
||||
return
|
||||
}
|
||||
|
||||
eventCount++
|
||||
|
||||
// Save event to database
|
||||
if _, err := rc.spider.db.SaveEvent(rc.ctx, ev); err != nil {
|
||||
if !chk.E(err) {
|
||||
log.T.F("spider: catch-up saved event %s from %s",
|
||||
hex.EncodeToString(ev.ID[:]), rc.url)
|
||||
}
|
||||
} else {
|
||||
// Publish event if it was newly saved
|
||||
if rc.spider.pub != nil {
|
||||
go rc.spider.pub.Deliver(ev)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// clearSubscriptions clears all subscriptions (with lock)
|
||||
func (rc *RelayConnection) clearSubscriptions() {
|
||||
rc.mu.Lock()
|
||||
defer rc.mu.Unlock()
|
||||
rc.clearSubscriptionsLocked()
|
||||
}
|
||||
|
||||
// clearSubscriptionsLocked clears all subscriptions (without lock)
|
||||
func (rc *RelayConnection) clearSubscriptionsLocked() {
|
||||
for _, sub := range rc.subscriptions {
|
||||
if sub.sub != nil {
|
||||
sub.sub.Unsub()
|
||||
}
|
||||
}
|
||||
rc.subscriptions = make(map[string]*BatchSubscription)
|
||||
}
|
||||
|
||||
// close closes the relay connection
|
||||
func (rc *RelayConnection) close() {
|
||||
rc.clearSubscriptions()
|
||||
|
||||
if rc.client != nil {
|
||||
rc.client.Close()
|
||||
rc.client = nil
|
||||
}
|
||||
|
||||
rc.cancel()
|
||||
}
|
||||
pkg/spider/spider_test.go (new file, 244 lines)
@@ -0,0 +1,244 @@
|
||||
package spider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"next.orly.dev/pkg/database"
|
||||
)
|
||||
|
||||
func TestSpiderCreation(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Create a temporary database for testing
|
||||
tempDir, err := os.MkdirTemp("", "spider-test-*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
db, err := database.New(ctx, cancel, tempDir, "error")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Test spider creation
|
||||
spider, err := New(ctx, db, nil, "follows")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create spider: %v", err)
|
||||
}
|
||||
|
||||
if spider == nil {
|
||||
t.Fatal("Spider is nil")
|
||||
}
|
||||
|
||||
// Test that spider is not running initially
|
||||
spider.mu.RLock()
|
||||
running := spider.running
|
||||
spider.mu.RUnlock()
|
||||
|
||||
if running {
|
||||
t.Error("Spider should not be running initially")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSpiderCallbacks(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Create a temporary database for testing
|
||||
tempDir, err := os.MkdirTemp("", "spider-test-*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
db, err := database.New(ctx, cancel, tempDir, "error")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
spider, err := New(ctx, db, nil, "follows")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create spider: %v", err)
|
||||
}
|
||||
|
||||
// Test callback setup
|
||||
testRelays := []string{"wss://relay1.example.com", "wss://relay2.example.com"}
|
||||
testPubkeys := [][]byte{{1, 2, 3}, {4, 5, 6}}
|
||||
|
||||
spider.SetCallbacks(
|
||||
func() []string { return testRelays },
|
||||
func() [][]byte { return testPubkeys },
|
||||
)
|
||||
|
||||
// Verify callbacks are set
|
||||
spider.mu.RLock()
|
||||
hasCallbacks := spider.getAdminRelays != nil && spider.getFollowList != nil
|
||||
spider.mu.RUnlock()
|
||||
|
||||
if !hasCallbacks {
|
||||
t.Error("Callbacks should be set")
|
||||
}
|
||||
|
||||
// Test that start fails without callbacks being set first
|
||||
spider2, err := New(ctx, db, nil, "follows")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create second spider: %v", err)
|
||||
}
|
||||
|
||||
err = spider2.Start()
|
||||
if err == nil {
|
||||
t.Error("Start should fail when callbacks are not set")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSpiderModeValidation(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Create a temporary database for testing
|
||||
tempDir, err := os.MkdirTemp("", "spider-test-*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
db, err := database.New(ctx, cancel, tempDir, "error")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Test valid mode
|
||||
spider, err := New(ctx, db, nil, "follows")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create spider with valid mode: %v", err)
|
||||
}
|
||||
if spider == nil {
|
||||
t.Fatal("Spider should not be nil for valid mode")
|
||||
}
|
||||
|
||||
// Test invalid mode
|
||||
_, err = New(ctx, db, nil, "invalid")
|
||||
if err == nil {
|
||||
t.Error("Should fail with invalid mode")
|
||||
}
|
||||
|
||||
// Test none mode (should succeed but be a no-op)
|
||||
spider2, err := New(ctx, db, nil, "none")
|
||||
if err != nil {
|
||||
t.Errorf("Should succeed with 'none' mode: %v", err)
|
||||
}
|
||||
if spider2 == nil {
|
||||
t.Error("Spider should not be nil for 'none' mode")
|
||||
}
|
||||
|
||||
// Test that 'none' mode doesn't require callbacks
|
||||
err = spider2.Start()
|
||||
if err != nil {
|
||||
t.Errorf("'none' mode should start without callbacks: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSpiderBatching(t *testing.T) {
|
||||
// Test batch creation logic
|
||||
followList := make([][]byte, 50) // 50 pubkeys
|
||||
for i := range followList {
|
||||
followList[i] = make([]byte, 32)
|
||||
for j := range followList[i] {
|
||||
followList[i][j] = byte(i)
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
rc := &RelayConnection{
|
||||
url: "wss://test.relay.com",
|
||||
ctx: ctx,
|
||||
}
|
||||
|
||||
batches := rc.createBatches(followList)
|
||||
|
||||
// Should create 3 batches: 20, 20, 10
|
||||
expectedBatches := 3
|
||||
if len(batches) != expectedBatches {
|
||||
t.Errorf("Expected %d batches, got %d", expectedBatches, len(batches))
|
||||
}
|
||||
|
||||
// Check batch sizes
|
||||
if len(batches[0]) != BatchSize {
|
||||
t.Errorf("First batch should have %d pubkeys, got %d", BatchSize, len(batches[0]))
|
||||
}
|
||||
if len(batches[1]) != BatchSize {
|
||||
t.Errorf("Second batch should have %d pubkeys, got %d", BatchSize, len(batches[1]))
|
||||
}
|
||||
if len(batches[2]) != 10 {
|
||||
t.Errorf("Third batch should have 10 pubkeys, got %d", len(batches[2]))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSpiderStartStop(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Create a temporary database for testing
|
||||
tempDir, err := os.MkdirTemp("", "spider-test-*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
db, err := database.New(ctx, cancel, tempDir, "error")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
spider, err := New(ctx, db, nil, "follows")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create spider: %v", err)
|
||||
}
|
||||
|
||||
// Set up callbacks
|
||||
spider.SetCallbacks(
|
||||
func() []string { return []string{"wss://test.relay.com"} },
|
||||
func() [][]byte { return [][]byte{{1, 2, 3}} },
|
||||
)
|
||||
|
||||
// Test start
|
||||
err = spider.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start spider: %v", err)
|
||||
}
|
||||
|
||||
// Verify spider is running
|
||||
spider.mu.RLock()
|
||||
running := spider.running
|
||||
spider.mu.RUnlock()
|
||||
|
||||
if !running {
|
||||
t.Error("Spider should be running after start")
|
||||
}
|
||||
|
||||
// Test stop
|
||||
spider.Stop()
|
||||
|
||||
// Give it a moment to stop
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Verify spider is stopped
|
||||
spider.mu.RLock()
|
||||
running = spider.running
|
||||
spider.mu.RUnlock()
|
||||
|
||||
if running {
|
||||
t.Error("Spider should not be running after stop")
|
||||
}
|
||||
}
|
||||
@@ -1 +1 @@
v0.17.12
v0.17.16