Compare commits

...

2 Commits

Author SHA1 Message Date
4c53709e2d Add aggregator functionality for Nostr event collection
Some checks failed
Go / build (push) Has been cancelled
Go / release (push) Has been cancelled
- Introduced a new `aggregator` package to search for events related to a specific npub across multiple Nostr relays.
- Implemented dynamic relay discovery from relay list events and progressive backward time-based fetching for comprehensive historical data collection.
- Added a bloom filter for memory-efficient event deduplication with a low false positive rate.
- Enhanced memory management with real-time monitoring and automatic garbage collection.
- Updated README with usage instructions, features, and detailed explanations of event discovery and memory management strategies.
- Bumped version to v0.17.15.
2025-10-23 12:17:50 +01:00
a4fc3d8d9b Implement spider functionality for event synchronization
Some checks failed
Go / build (push) Has been cancelled
Go / release (push) Has been cancelled
- Introduced a new `spider` package to manage connections to admin relays and synchronize events for followed pubkeys.
- Added configuration options for spider mode in the application settings, allowing for different operational modes (e.g., follows).
- Implemented callback mechanisms to dynamically retrieve admin relays and follow lists.
- Enhanced the main application to initialize and manage the spider, including starting and stopping its operation.
- Added tests to validate spider creation, callbacks, and operational behavior.
- Bumped version to v0.17.14.
2025-10-22 22:24:21 +01:00
12 changed files with 2234 additions and 21 deletions

View File

@@ -59,6 +59,9 @@ type C struct {
// Sprocket settings
SprocketEnabled bool `env:"ORLY_SPROCKET_ENABLED" default:"false" usage:"enable sprocket event processing plugin system"`
// Spider settings
SpiderMode string `env:"ORLY_SPIDER_MODE" default:"none" usage:"spider mode for syncing events: none, follows"`
PolicyEnabled bool `env:"ORLY_POLICY_ENABLED" default:"false" usage:"enable policy-based event processing (configuration found in $HOME/.config/ORLY/policy.json)"`
}

View File

@@ -3,6 +3,7 @@ package app
import (
"fmt"
"time"
"unicode"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
@@ -15,6 +16,42 @@ import (
"next.orly.dev/pkg/encoders/envelopes/reqenvelope"
)
// validateJSONMessage checks if a message contains bytes that would make it
// invalid as a JSON text: raw control characters, or dense runs of high-bit
// bytes that look like binary data rather than UTF-8 text.
//
// Per RFC 8259, the control characters U+0000..U+001F must be escaped inside
// JSON strings, so any raw occurrence is rejected — except tab, newline and
// carriage return, which may legitimately appear as whitespace between
// tokens. Returns nil when the message looks acceptable.
func validateJSONMessage(msg []byte) (err error) {
	for i, b := range msg {
		// Reject every raw control character except tab, newline and
		// carriage return. This also covers 0x0B (vertical tab), which
		// the previous case-by-case enumeration accidentally let through.
		if b < 32 && b != '\t' && b != '\n' && b != '\r' {
			return fmt.Errorf("invalid control character 0x%02X at position %d", b, i)
		}
		// Heuristic binary-data detection: a dense run of high-bit bytes
		// is more likely binary garbage than valid UTF-8 text.
		if b > 127 && !unicode.IsPrint(rune(b)) {
			if i < len(msg)-1 {
				// Count high-bit bytes in a window of up to 10 bytes.
				highBitCount := 0
				for j := i; j < len(msg) && j < i+10; j++ {
					if msg[j] > 127 {
						highBitCount++
					}
				}
				// 8 or more high-bit bytes out of 10 (>= 80%) is treated
				// as binary data masquerading as text.
				if highBitCount > 7 {
					return fmt.Errorf("suspicious binary data detected at position %d", i)
				}
			}
		}
	}
	return
}
func (l *Listener) HandleMessage(msg []byte, remote string) {
// Handle blacklisted IPs - discard messages but keep connection open until timeout
if l.isBlacklisted {
@@ -35,6 +72,17 @@ func (l *Listener) HandleMessage(msg []byte, remote string) {
}
// log.D.F("%s processing message (len=%d): %s", remote, len(msg), msgPreview)
// Validate message for invalid characters before processing
if err := validateJSONMessage(msg); err != nil {
log.E.F("%s message validation FAILED (len=%d): %v", remote, len(msg), err)
log.T.F("%s invalid message content: %q", remote, msgPreview)
// Send error notice to client
if noticeErr := noticeenvelope.NewFrom("invalid message format: " + err.Error()).Write(l); noticeErr != nil {
log.E.F("%s failed to send validation error notice: %v", remote, noticeErr)
}
return
}
l.msgCount++
var err error
var t string

View File

@@ -35,6 +35,12 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
// var rem []byte
env := reqenvelope.New()
if _, err = env.Unmarshal(msg); chk.E(err) {
// Provide more specific error context for JSON parsing failures
if strings.Contains(err.Error(), "invalid character") {
log.E.F("REQ JSON parsing failed from %s: %v", l.remote, err)
log.T.F("REQ malformed message from %s: %q", l.remote, string(msg))
return normalize.Error.Errorf("malformed REQ message: %s", err.Error())
}
return normalize.Error.Errorf(err.Error())
}

View File

@@ -10,11 +10,13 @@ import (
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/app/config"
"next.orly.dev/pkg/acl"
"next.orly.dev/pkg/crypto/keys"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/encoders/bech32encoding"
"next.orly.dev/pkg/policy"
"next.orly.dev/pkg/protocol/publish"
"next.orly.dev/pkg/spider"
)
func Run(
@@ -69,6 +71,48 @@ func Run(
// Initialize policy manager
l.policyManager = policy.NewWithManager(ctx, cfg.AppName, cfg.PolicyEnabled)
// Initialize spider manager based on mode
if cfg.SpiderMode != "none" {
if l.spiderManager, err = spider.New(ctx, db, l.publishers, cfg.SpiderMode); chk.E(err) {
log.E.F("failed to create spider manager: %v", err)
} else {
// Set up callbacks for follows mode
if cfg.SpiderMode == "follows" {
l.spiderManager.SetCallbacks(
func() []string {
// Get admin relays from follows ACL if available
for _, aclInstance := range acl.Registry.ACL {
if aclInstance.Type() == "follows" {
if follows, ok := aclInstance.(*acl.Follows); ok {
return follows.AdminRelays()
}
}
}
return nil
},
func() [][]byte {
// Get followed pubkeys from follows ACL if available
for _, aclInstance := range acl.Registry.ACL {
if aclInstance.Type() == "follows" {
if follows, ok := aclInstance.(*acl.Follows); ok {
return follows.GetFollowedPubkeys()
}
}
}
return nil
},
)
}
if err = l.spiderManager.Start(); chk.E(err) {
log.E.F("failed to start spider manager: %v", err)
} else {
log.I.F("spider manager started successfully in '%s' mode", cfg.SpiderMode)
}
}
}
// Initialize the user interface
l.UserInterface()
@@ -135,6 +179,12 @@ func Run(
<-ctx.Done()
log.I.F("shutting down HTTP server gracefully")
// Stop spider manager if running
if l.spiderManager != nil {
l.spiderManager.Stop()
log.I.F("spider manager stopped")
}
// Create shutdown context with timeout
shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelShutdown()

View File

@@ -26,6 +26,7 @@ import (
"next.orly.dev/pkg/protocol/auth"
"next.orly.dev/pkg/protocol/httpauth"
"next.orly.dev/pkg/protocol/publish"
"next.orly.dev/pkg/spider"
)
type Server struct {
@@ -47,6 +48,7 @@ type Server struct {
paymentProcessor *PaymentProcessor
sprocketManager *SprocketManager
policyManager *policy.P
spiderManager *spider.Spider
}
// isIPBlacklisted checks if an IP address is blacklisted using the managed ACL system

119
cmd/aggregator/README.md Normal file
View File

@@ -0,0 +1,119 @@
# Nostr Event Aggregator
A comprehensive program that searches for all events related to a specific npub across multiple Nostr relays and outputs them in JSONL format to stdout. The program finds both events authored by the user and events that mention the user in "p" tags. It features dynamic relay discovery from relay list events and progressive backward time-based fetching for complete historical data collection.
## Usage
```bash
go run main.go -npub <npub> [-since <timestamp>] [-until <timestamp>]
```
Where:
- `<npub>` is a bech32-encoded Nostr public key (starting with "npub1")
- `<timestamp>` is a Unix timestamp (seconds since epoch) - optional
## Examples
```bash
# Get all events related to a user (authored by and mentioning)
go run main.go -npub npub1234567890abcdef...
# Get events related to a user since January 1, 2022
go run main.go -npub npub1234567890abcdef... -since 1640995200
# Get events related to a user between two dates
go run main.go -npub npub1234567890abcdef... -since 1640995200 -until 1672531200
# Get events related to a user until December 31, 2022
go run main.go -npub npub1234567890abcdef... -until 1672531200
```
## Features
- **Comprehensive event discovery**: Finds both events authored by the user and events that mention the user
- **Dynamic relay discovery**: Automatically discovers and connects to new relays from relay list events (kind 10002)
- **Progressive backward fetching**: Systematically collects historical data in time-based batches
- **Triple filter approach**: Uses separate filters for authored events, p-tag mentions, and relay list events
- **Intelligent time management**: Works backwards from current time (or until timestamp) to since timestamp
- **Memory-efficient deduplication**: Uses bloom filter with ~0.1% false positive rate instead of unbounded maps
- **Fixed memory footprint**: Bloom filter uses only ~1.75MB for 1M events with controlled memory growth
- **Memory monitoring**: Real-time memory usage tracking and automatic garbage collection
- Connects to multiple relays simultaneously with dynamic expansion
- Outputs events in JSONL format (one JSON object per line)
- Handles connection failures gracefully
- Continues running until all relay connections are closed
- Time-based filtering with Unix timestamps (since/until parameters)
- Input validation for timestamp ranges
## Event Discovery
The aggregator searches for three types of events:
1. **Authored Events**: Events where the specified npub is the author (pubkey field matches)
2. **Mentioned Events**: Events that contain "p" tags referencing the specified npub (replies, mentions, etc.)
3. **Relay List Events**: Kind 10002 events that contain relay URLs for dynamic relay discovery
This comprehensive approach ensures you capture all events related to a user, including:
- Posts authored by the user
- Replies to the user's posts
- Posts that mention or tag the user
- Any other events that reference the user in p-tags
- Relay list metadata for discovering additional relays
## Progressive Fetching
The aggregator uses an intelligent progressive backward fetching strategy:
1. **Time-based batches**: Fetches data in weekly batches working backwards from the end time
2. **Dynamic relay expansion**: As relay list events are discovered, new relays are automatically added to the search
3. **Complete coverage**: Ensures all events between since and until timestamps are collected
4. **Efficient processing**: Processes each time batch completely before moving to the next
5. **Boundary respect**: Stops when reaching the since timestamp or beginning of available data
## Memory Management
The aggregator uses advanced memory management techniques to handle large-scale data collection:
### Bloom Filter Deduplication
- **Fixed Size**: Uses exactly 1.75MB for the bloom filter regardless of event count
- **Low False Positive Rate**: Configured for ~0.1% false positive rate with 1M events
- **Hash Functions**: Uses 10 independent hash functions based on SHA256 for optimal distribution
- **Thread-Safe**: Concurrent access protected with read-write mutexes
### Memory Monitoring
- **Real-time Tracking**: Monitors total memory usage every 30 seconds
- **Automatic GC**: Triggers garbage collection when approaching memory limits
- **Statistics Logging**: Reports bloom filter usage, estimated event count, and memory consumption
- **Controlled Growth**: Prevents unbounded memory growth through fixed-size data structures
### Performance Characteristics
- **Memory Usage**: ~1.75MB bloom filter + ~256MB total memory limit
- **False Positives**: ~0.1% chance of incorrectly identifying a duplicate (very low impact)
- **Scalability**: Can handle millions of events without memory issues
- **Efficiency**: O(k) time complexity for both add and lookup operations (k = hash functions)
## Relays
The program starts with the following initial relays:
- wss://nostr.wine/
- wss://nostr.land/
- wss://orly-relay.imwald.eu
- wss://relay.orly.dev/
- wss://relay.damus.io/
- wss://nos.lol/
- wss://theforest.nostr1.com/
**Dynamic Relay Discovery**: Additional relays are automatically discovered and added during execution when the program finds relay list events (kind 10002) authored by the target user. This ensures comprehensive coverage across the user's preferred relay network.
## Output Format
Each line of output is a JSON object representing a Nostr event with the following fields:
- `id`: Event ID (hex)
- `pubkey`: Author's public key (hex)
- `created_at`: Unix timestamp
- `kind`: Event kind number
- `tags`: Array of tag arrays
- `content`: Event content string
- `sig`: Event signature (hex)

1006
cmd/aggregator/main.go Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -266,7 +266,7 @@ func (f *Follows) adminRelays() (urls []string) {
// If no admin relays found, use bootstrap relays as fallback
if len(urls) == 0 {
log.I.F("no admin relays found in DB, checking bootstrap relays")
log.I.F("no admin relays found in DB, checking bootstrap relays and failover relays")
if len(f.cfg.BootstrapRelays) > 0 {
log.I.F("using bootstrap relays: %v", f.cfg.BootstrapRelays)
for _, relay := range f.cfg.BootstrapRelays {
@@ -302,7 +302,53 @@ func (f *Follows) adminRelays() (urls []string) {
urls = append(urls, n)
}
} else {
log.W.F("no bootstrap relays configured")
log.I.F("no bootstrap relays configured, using failover relays")
}
// If still no relays found, use hardcoded failover relays
// These relays will be used to fetch admin relay lists (kind 10002) and store them
// in the database so they're found next time
if len(urls) == 0 {
failoverRelays := []string{
"wss://nostr.land",
"wss://nostr.wine",
"wss://nos.lol",
"wss://relay.damus.io",
"wss://nostr.band",
}
log.I.F("using failover relays: %v", failoverRelays)
for _, relay := range failoverRelays {
n := string(normalize.URL(relay))
if n == "" {
log.W.F("invalid failover relay URL: %s", relay)
continue
}
// Skip if this URL is one of our configured self relay addresses or hosts
if _, isSelf := selfSet[n]; isSelf {
log.D.F("follows syncer: skipping configured self relay address: %s", n)
continue
}
// Host match
host := n
if i := strings.Index(host, "://"); i >= 0 {
host = host[i+3:]
}
if j := strings.Index(host, "/"); j >= 0 {
host = host[:j]
}
if k := strings.Index(host, ":"); k >= 0 {
host = host[:k]
}
if _, isSelfHost := selfHosts[host]; isSelfHost {
log.D.F("follows syncer: skipping configured self relay address: %s", n)
continue
}
if _, ok := seen[n]; ok {
continue
}
seen[n] = struct{}{}
urls = append(urls, n)
}
}
}
@@ -451,6 +497,7 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) {
keepaliveTicker := time.NewTicker(30 * time.Second)
defer keepaliveTicker.Stop()
readLoop:
for {
select {
case <-ctx.Done():
@@ -460,7 +507,7 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) {
// Send ping to keep connection alive
if err := c.Ping(ctx); err != nil {
log.T.F("follows syncer: ping failed for %s: %v", u, err)
break
break readLoop
}
log.T.F("follows syncer: sent ping to %s", u)
continue
@@ -471,7 +518,7 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) {
readCancel()
if err != nil {
_ = c.Close(websocket.StatusNormalClosure, "read err")
break
break readLoop
}
label, rem, err := envelopes.Identify(data)
if chk.E(err) {
@@ -634,7 +681,7 @@ func (f *Follows) fetchAdminFollowLists() {
urls := f.adminRelays()
if len(urls) == 0 {
log.W.F("follows syncer: no admin relays found for follow list fetching")
log.W.F("follows syncer: no relays available for follow list fetching (no admin relays, bootstrap relays, or failover relays)")
return
}
@@ -680,14 +727,19 @@ func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) {
log.I.F("follows syncer: fetching follow lists from relay %s", relayURL)
// Create filter for follow lists only (kind 3)
// Create filter for follow lists and relay lists (kind 3 and kind 10002)
ff := &filter.S{}
f1 := &filter.F{
Authors: tag.NewFromBytesSlice(authors...),
Kinds: kind.NewS(kind.New(kind.FollowList.K)),
Limit: values.ToUintPointer(100),
}
*ff = append(*ff, f1)
f2 := &filter.F{
Authors: tag.NewFromBytesSlice(authors...),
Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)),
Limit: values.ToUintPointer(100),
}
*ff = append(*ff, f1, f2)
// Use a specific subscription ID for follow list fetching
subID := "follow-lists-fetch"
@@ -699,24 +751,28 @@ func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) {
return
}
log.T.F("follows syncer: sent follow list REQ to %s", relayURL)
log.T.F("follows syncer: sent follow list and relay list REQ to %s", relayURL)
// Read follow list events with timeout
// Collect all events before processing
var followListEvents []*event.E
var relayListEvents []*event.E
// Read events with timeout
timeout := time.After(10 * time.Second)
for {
select {
case <-ctx.Done():
return
goto processEvents
case <-timeout:
log.T.F("follows syncer: timeout reading follow lists from %s", relayURL)
return
log.T.F("follows syncer: timeout reading events from %s", relayURL)
goto processEvents
default:
}
_, data, err := c.Read(ctx)
if err != nil {
log.T.F("follows syncer: error reading follow lists from %s: %v", relayURL, err)
return
log.T.F("follows syncer: error reading events from %s: %v", relayURL, err)
goto processEvents
}
label, rem, err := envelopes.Identify(data)
@@ -731,19 +787,101 @@ func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) {
continue
}
// Process follow list events
if res.Event.Kind == kind.FollowList.K {
// Collect events by kind
switch res.Event.Kind {
case kind.FollowList.K:
log.I.F("follows syncer: received follow list from %s on relay %s",
hex.EncodeToString(res.Event.Pubkey), relayURL)
f.extractFollowedPubkeys(res.Event)
followListEvents = append(followListEvents, res.Event)
case kind.RelayListMetadata.K:
log.I.F("follows syncer: received relay list from %s on relay %s",
hex.EncodeToString(res.Event.Pubkey), relayURL)
relayListEvents = append(relayListEvents, res.Event)
}
case eoseenvelope.L:
log.T.F("follows syncer: end of follow list events from %s", relayURL)
return
log.T.F("follows syncer: end of events from %s", relayURL)
goto processEvents
default:
// ignore other labels
}
}
processEvents:
// Process collected events - keep only the newest per pubkey and save to database
f.processCollectedEvents(relayURL, followListEvents, relayListEvents)
}
// processCollectedEvents processes the collected events, keeping only the newest per pubkey
func (f *Follows) processCollectedEvents(relayURL string, followListEvents, relayListEvents []*event.E) {
	// newestPerAuthor reduces a batch of events to the most recent event per
	// author pubkey (hex-encoded).
	newestPerAuthor := func(events []*event.E) map[string]*event.E {
		newest := make(map[string]*event.E)
		for _, ev := range events {
			key := hex.EncodeToString(ev.Pubkey)
			if prev, ok := newest[key]; !ok || ev.CreatedAt > prev.CreatedAt {
				newest[key] = ev
			}
		}
		return newest
	}
	latestFollowLists := newestPerAuthor(followListEvents)
	latestRelayLists := newestPerAuthor(relayListEvents)
	savedFollowLists := 0
	savedRelayLists := 0
	// Persist the newest follow list per author and, for admin authors,
	// refresh the in-memory follow set from it.
	for pubkeyHex, ev := range latestFollowLists {
		_, saveErr := f.D.SaveEvent(f.Ctx, ev)
		switch {
		case saveErr == nil:
			savedFollowLists++
			log.I.F("follows syncer: saved newest follow list from %s (created_at: %d) from relay %s",
				pubkeyHex, ev.CreatedAt, relayURL)
		case !strings.HasPrefix(saveErr.Error(), "blocked:"):
			log.W.F("follows syncer: failed to save follow list from %s: %v", pubkeyHex, saveErr)
		}
		// Follow extraction happens regardless of save outcome.
		if f.isAdminPubkey(ev.Pubkey) {
			log.I.F("follows syncer: processing admin follow list from %s", pubkeyHex)
			f.extractFollowedPubkeys(ev)
		}
	}
	// Persist the newest relay list per author.
	for pubkeyHex, ev := range latestRelayLists {
		_, saveErr := f.D.SaveEvent(f.Ctx, ev)
		switch {
		case saveErr == nil:
			savedRelayLists++
			log.I.F("follows syncer: saved newest relay list from %s (created_at: %d) from relay %s",
				pubkeyHex, ev.CreatedAt, relayURL)
		case !strings.HasPrefix(saveErr.Error(), "blocked:"):
			log.W.F("follows syncer: failed to save relay list from %s: %v", pubkeyHex, saveErr)
		}
	}
	log.I.F("follows syncer: processed %d follow lists and %d relay lists from %s, saved %d follow lists and %d relay lists",
		len(followListEvents), len(relayListEvents), relayURL, savedFollowLists, savedRelayLists)
	// New relay lists may change which relays we should be subscribed to, so
	// nudge the syncer to refresh its subscriptions (non-blocking send).
	if savedRelayLists > 0 {
		log.I.F("follows syncer: saved new relay lists, triggering subscription refresh")
		select {
		case f.updated <- struct{}{}:
		default:
			// A refresh signal is already pending; dropping this one is fine.
		}
	}
}
// GetFollowedPubkeys returns a copy of the followed pubkeys list
@@ -783,6 +921,11 @@ func (f *Follows) extractFollowedPubkeys(event *event.E) {
}
}
// AdminRelays returns the admin relay URLs. It is the exported wrapper around
// the unexported adminRelays, which also falls back to bootstrap and failover
// relays when none are stored in the database.
func (f *Follows) AdminRelays() []string {
	return f.adminRelays()
}
// AddFollow appends a pubkey to the in-memory follows list if not already present
// and signals the syncer to refresh subscriptions.
func (f *Follows) AddFollow(pub []byte) {

View File

@@ -114,9 +114,20 @@ func UnmarshalQuoted(b []byte) (content, rem []byte, err error) {
//
// backspace, tab, newline, form feed or carriage return.
case '\b', '\t', '\n', '\f', '\r':
pos := len(content) - len(rem)
contextStart := pos - 10
if contextStart < 0 {
contextStart = 0
}
contextEnd := pos + 10
if contextEnd > len(content) {
contextEnd = len(content)
}
err = errorf.E(
"invalid character '%s' in quoted string",
"invalid character '%s' in quoted string (position %d, context: %q)",
NostrEscape(nil, rem[:1]),
pos,
string(content[contextStart:contextEnd]),
)
return
}

581
pkg/spider/spider.go Normal file
View File

@@ -0,0 +1,581 @@
package spider
import (
"context"
"encoding/hex"
"fmt"
"sync"
"time"
"lol.mleku.dev/chk"
"lol.mleku.dev/errorf"
"lol.mleku.dev/log"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/encoders/filter"
"next.orly.dev/pkg/encoders/tag"
"next.orly.dev/pkg/encoders/timestamp"
"next.orly.dev/pkg/interfaces/publisher"
"next.orly.dev/pkg/protocol/ws"
)
const (
	// BatchSize is the number of pubkeys per subscription batch; follow
	// lists are split into chunks of this size when building filters.
	BatchSize = 20
	// CatchupWindow is the extra time added to disconnection periods for
	// catch-up queries, widening the replay window on both sides.
	CatchupWindow = 30 * time.Minute
	// ReconnectDelay is the initial delay between reconnection attempts
	// (doubled after each failed attempt).
	ReconnectDelay = 5 * time.Second
	// MaxReconnectDelay is the maximum delay between reconnection
	// attempts; the exponential backoff is capped here.
	MaxReconnectDelay = 5 * time.Minute
)
// Spider manages connections to admin relays and syncs events for followed pubkeys.
type Spider struct {
	ctx    context.Context    // lifetime of the whole spider; Stop cancels it
	cancel context.CancelFunc
	db     *database.D // event store for synced events
	pub    publisher.I // optional fan-out target for newly saved events
	mode   string      // "none" or "follows" (validated in New)
	// Configuration
	adminRelays []string
	followList  [][]byte
	// State management (mu guards connections and running)
	mu          sync.RWMutex
	connections map[string]*RelayConnection // live connections keyed by relay URL
	running     bool
	// Callbacks for getting updated data; both must be set via SetCallbacks
	// before Start when mode is "follows".
	getAdminRelays func() []string
	getFollowList  func() [][]byte
}

// RelayConnection manages a single relay connection and its subscriptions.
type RelayConnection struct {
	url    string
	client *ws.Client // nil while disconnected (reset in manage)
	ctx    context.Context
	cancel context.CancelFunc
	spider *Spider // back-reference for database/publisher access
	// Subscription management (mu guards subscriptions)
	mu            sync.RWMutex
	subscriptions map[string]*BatchSubscription
	// Disconnection tracking for catch-up and exponential backoff
	lastDisconnect time.Time
	reconnectDelay time.Duration
}

// BatchSubscription represents a subscription for a batch of pubkeys.
type BatchSubscription struct {
	id        string
	pubkeys   [][]byte // pubkeys covered by this batch's filters
	startTime time.Time
	sub       *ws.Subscription
	relay     *RelayConnection
	// Track disconnection periods for catch-up; non-nil while a missed
	// window is pending replay.
	disconnectedAt *time.Time
}

// DisconnectionPeriod tracks when a subscription was disconnected.
// NOTE(review): not referenced anywhere in the visible portion of this file —
// confirm it is used elsewhere or remove it.
type DisconnectionPeriod struct {
	Start time.Time
	End   time.Time
}
// New creates a new Spider instance
func New(ctx context.Context, db *database.D, pub publisher.I, mode string) (s *Spider, err error) {
	// A database is mandatory; everything the spider fetches is stored there.
	if db == nil {
		err = errorf.E("database cannot be nil")
		return
	}
	// Only the two known modes are accepted.
	if mode != "none" && mode != "follows" {
		err = errorf.E("invalid spider mode: %s (valid modes: none, follows)", mode)
		return
	}
	// Derive a cancellable context so Stop can tear everything down.
	sctx, cancel := context.WithCancel(ctx)
	s = &Spider{
		ctx:         sctx,
		cancel:      cancel,
		db:          db,
		pub:         pub,
		mode:        mode,
		connections: make(map[string]*RelayConnection),
	}
	return
}
// SetCallbacks sets the callback functions for getting updated admin relays
// and follow lists. Both must be non-nil before Start is called in "follows"
// mode; they are invoked from the main loop on each connection refresh.
func (s *Spider) SetCallbacks(getAdminRelays func() []string, getFollowList func() [][]byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.getAdminRelays = getAdminRelays
	s.getFollowList = getFollowList
}
// Start begins the spider operation
func (s *Spider) Start() (err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	switch {
	case s.running:
		// Double-start is a caller error.
		return errorf.E("spider already running")
	case s.mode == "none":
		// 'none' mode is a no-op: nothing to run.
		log.I.F("spider: mode is 'none', not starting")
		return nil
	case s.getAdminRelays == nil || s.getFollowList == nil:
		return errorf.E("callbacks must be set before starting")
	}
	s.running = true
	// The main loop manages connections until the context is cancelled.
	go s.mainLoop()
	log.I.F("spider: started in '%s' mode", s.mode)
	return nil
}
// Stop stops the spider operation
func (s *Spider) Stop() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.running {
		return
	}
	s.running = false
	// Cancel the spider context so goroutines observe shutdown, then tear
	// down every relay connection and reset the connection map.
	s.cancel()
	for _, c := range s.connections {
		c.close()
	}
	s.connections = map[string]*RelayConnection{}
	log.I.F("spider: stopped")
}
// mainLoop is the main spider loop that manages connections and
// subscriptions. It refreshes connections immediately on startup and then
// every 30 seconds until the spider context is cancelled.
func (s *Spider) mainLoop() {
	// Run one update right away; previously the first update only happened
	// after the first tick, leaving a 30-second window after Start with no
	// connections at all.
	s.updateConnections()
	ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds
	defer ticker.Stop()
	for {
		select {
		case <-s.ctx.Done():
			return
		case <-ticker.C:
			s.updateConnections()
		}
	}
}
// updateConnections updates relay connections based on current admin relays
// and follow lists: it diffs the desired relay set against the open
// connections, refreshing subscriptions on existing ones, dialing new relays,
// and closing connections to relays no longer in the admin list.
//
// NOTE(review): this runs while holding s.mu and conn.updateSubscriptions
// performs subscription work synchronously — confirm a slow relay cannot hold
// the spider lock for an extended period.
func (s *Spider) updateConnections() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.running {
		return
	}
	// Get current admin relays and follow list from the configured callbacks.
	adminRelays := s.getAdminRelays()
	followList := s.getFollowList()
	// Nothing to do until both a relay set and a follow list exist.
	if len(adminRelays) == 0 || len(followList) == 0 {
		log.D.F("spider: no admin relays (%d) or follow list (%d) available",
			len(adminRelays), len(followList))
		return
	}
	// Update connections for current admin relays.
	currentRelays := make(map[string]bool)
	for _, url := range adminRelays {
		currentRelays[url] = true
		if conn, exists := s.connections[url]; exists {
			// Update existing connection with the latest follow list.
			conn.updateSubscriptions(followList)
		} else {
			// Create new connection (it manages itself in a goroutine).
			s.createConnection(url, followList)
		}
	}
	// Remove connections for relays no longer in admin list.
	for url, conn := range s.connections {
		if !currentRelays[url] {
			log.I.F("spider: removing connection to %s (no longer in admin relays)", url)
			conn.close()
			delete(s.connections, url)
		}
	}
}
// createConnection creates a new relay connection
func (s *Spider) createConnection(url string, followList [][]byte) {
	log.I.F("spider: creating connection to %s", url)
	// Each connection gets its own child context so it can be cancelled
	// independently of the spider.
	connCtx, connCancel := context.WithCancel(s.ctx)
	rc := &RelayConnection{
		url:            url,
		ctx:            connCtx,
		cancel:         connCancel,
		spider:         s,
		subscriptions:  map[string]*BatchSubscription{},
		reconnectDelay: ReconnectDelay,
	}
	s.connections[url] = rc
	// The connection maintains itself (connect/subscribe/reconnect) in its
	// own goroutine.
	go rc.manage(followList)
}
// manage handles the lifecycle of a relay connection: it dials the relay,
// installs the batch subscriptions, blocks until the connection drops,
// records the disconnection for later catch-up, and loops to reconnect.
// It exits only when the connection context is cancelled.
func (rc *RelayConnection) manage(followList [][]byte) {
	for {
		// Bail out promptly if the connection has been cancelled.
		select {
		case <-rc.ctx.Done():
			return
		default:
		}
		// Attempt to connect (connect applies a 10-second dial timeout).
		if err := rc.connect(); chk.E(err) {
			log.W.F("spider: failed to connect to %s: %v", rc.url, err)
			rc.waitBeforeReconnect()
			continue
		}
		log.I.F("spider: connected to %s", rc.url)
		rc.reconnectDelay = ReconnectDelay // Reset delay on successful connection
		// Create subscriptions for follow list
		rc.createSubscriptions(followList)
		// Wait for disconnection: the client context ends when the socket dies.
		<-rc.client.Context().Done()
		log.W.F("spider: disconnected from %s: %v", rc.url, rc.client.ConnectionCause())
		rc.handleDisconnection()
		// Clean up before the next connection attempt.
		// NOTE(review): rc.client is written here without holding rc.mu while
		// other goroutines (updateSubscriptions, createBatchSubscription) read
		// it — confirm this is race-free or guard the field.
		rc.client = nil
		rc.clearSubscriptions()
	}
}
// connect establishes a websocket connection to the relay
func (rc *RelayConnection) connect() (err error) {
	// Bound the dial attempt to 10 seconds.
	dialCtx, cancel := context.WithTimeout(rc.ctx, 10*time.Second)
	defer cancel()
	rc.client, err = ws.RelayConnect(dialCtx, rc.url)
	// chk.E logs the error (if any); the caller decides how to react.
	chk.E(err)
	return
}
// waitBeforeReconnect waits before attempting to reconnect with exponential backoff
func (rc *RelayConnection) waitBeforeReconnect() {
	// Sleep for the current backoff, but wake immediately on cancellation.
	select {
	case <-rc.ctx.Done():
		return
	case <-time.After(rc.reconnectDelay):
	}
	// Double the delay for the next attempt, capped at MaxReconnectDelay.
	if rc.reconnectDelay *= 2; rc.reconnectDelay > MaxReconnectDelay {
		rc.reconnectDelay = MaxReconnectDelay
	}
}
// handleDisconnection records disconnection time for catch-up logic
func (rc *RelayConnection) handleDisconnection() {
	now := time.Now()
	rc.lastDisconnect = now
	rc.mu.Lock()
	defer rc.mu.Unlock()
	// Stamp every subscription that is not already marked, so catch-up can
	// later replay the missed window.
	for _, batch := range rc.subscriptions {
		if batch.disconnectedAt == nil {
			batch.disconnectedAt = &now
		}
	}
}
// createSubscriptions creates batch subscriptions for the follow list
func (rc *RelayConnection) createSubscriptions(followList [][]byte) {
	rc.mu.Lock()
	defer rc.mu.Unlock()
	// Drop any stale subscriptions before building the new set.
	rc.clearSubscriptionsLocked()
	batches := rc.createBatches(followList)
	log.I.F("spider: creating %d subscription batches for %d pubkeys on %s",
		len(batches), len(followList), rc.url)
	// One subscription per batch, identified by its index.
	for i := range batches {
		rc.createBatchSubscription(fmt.Sprintf("batch-%d", i), batches[i])
	}
}
// createBatches splits the follow list into batches of BatchSize
func (rc *RelayConnection) createBatches(followList [][]byte) (batches [][][]byte) {
	for start := 0; start < len(followList); start += BatchSize {
		end := start + BatchSize
		if end > len(followList) {
			end = len(followList)
		}
		// Copy the slice so batches do not alias the caller's backing array.
		chunk := make([][]byte, end-start)
		copy(chunk, followList[start:end])
		batches = append(batches, chunk)
	}
	return
}
// createBatchSubscription creates a subscription for a batch of pubkeys and
// registers it in rc.subscriptions under batchID. Two filters are sent: one
// matching events authored by the batch pubkeys, one matching events that
// reference them in "p" tags. Callers (createSubscriptions,
// updateSubscriptions) hold rc.mu when invoking this.
func (rc *RelayConnection) createBatchSubscription(batchID string, pubkeys [][]byte) {
	// No live client (e.g. between disconnect and reconnect): nothing to do.
	if rc.client == nil {
		return
	}
	// Create filters: one for authors, one for p tags
	var pTags tag.S
	for _, pk := range pubkeys {
		pTags = append(pTags, tag.NewFromAny("p", pk))
	}
	filters := filter.NewS(
		&filter.F{
			Authors: tag.NewFromBytesSlice(pubkeys...),
		},
		&filter.F{
			Tags: tag.NewS(pTags...),
		},
	)
	// Subscribe on the connection context so the subscription dies with it.
	sub, err := rc.client.Subscribe(rc.ctx, filters)
	if chk.E(err) {
		log.E.F("spider: failed to create subscription %s on %s: %v", batchID, rc.url, err)
		return
	}
	batchSub := &BatchSubscription{
		id:        batchID,
		pubkeys:   pubkeys,
		startTime: time.Now(),
		sub:       sub,
		relay:     rc,
	}
	rc.subscriptions[batchID] = batchSub
	// Start event handler goroutine that saves and fans out incoming events.
	go batchSub.handleEvents()
	log.D.F("spider: created subscription %s for %d pubkeys on %s",
		batchID, len(pubkeys), rc.url)
}
// handleEvents processes events from the subscription: each incoming event is
// saved to the database and, when the save succeeds, delivered to the
// publisher. It exits when the relay connection context is cancelled or the
// subscription's event channel is closed.
func (bs *BatchSubscription) handleEvents() {
	for {
		select {
		case <-bs.relay.ctx.Done():
			return
		case ev := <-bs.sub.Events:
			if ev == nil {
				return // Subscription closed
			}
			if _, err := bs.relay.spider.db.SaveEvent(bs.relay.ctx, ev); chk.E(err) {
				// Save failed; chk.E has already logged the error.
				continue
			}
			// Save succeeded: trace it and fan the event out. (Previously
			// this trace log sat in the error branch behind !chk.E(err),
			// which is always false when err != nil, so it never fired.)
			log.T.F("spider: saved event %s from %s",
				hex.EncodeToString(ev.ID[:]), bs.relay.url)
			if bs.relay.spider.pub != nil {
				go bs.relay.spider.pub.Deliver(ev)
			}
		}
	}
}
// updateSubscriptions tears down and recreates this connection's batch
// subscriptions from the latest follow list. Before recreating, any batch
// carrying a disconnectedAt marker gets a catch-up query to backfill
// events missed while the connection was down.
//
// NOTE(review): performCatchup can block for up to 30 seconds per marked
// subscription and runs while rc.mu is held — confirm stalling the
// connection lock that long is acceptable.
func (rc *RelayConnection) updateSubscriptions(followList [][]byte) {
	if rc.client == nil || !rc.client.IsConnected() {
		return // Will be handled on reconnection
	}
	rc.mu.Lock()
	defer rc.mu.Unlock()
	// Check if we need to perform catch-up for disconnected subscriptions
	now := time.Now()
	needsCatchup := false
	for _, sub := range rc.subscriptions {
		if sub.disconnectedAt != nil {
			needsCatchup = true
			rc.performCatchup(sub, *sub.disconnectedAt, now, followList)
			sub.disconnectedAt = nil // Clear disconnection marker
		}
	}
	if needsCatchup {
		log.I.F("spider: performed catch-up for disconnected subscriptions on %s", rc.url)
	}
	// Recreate subscriptions with updated follow list
	rc.clearSubscriptionsLocked()
	batches := rc.createBatches(followList)
	for i, batch := range batches {
		batchID := fmt.Sprintf("batch-%d", i)
		rc.createBatchSubscription(batchID, batch)
	}
}
// performCatchup queries for events missed during disconnection. The query
// window is [disconnectTime-CatchupWindow, reconnectTime+CatchupWindow] so
// events near either boundary are not dropped. A temporary subscription is
// opened with the same author/p-tag filter pair as the live batch, bounded
// to 30 seconds, and each returned event is saved (and published when the
// save succeeded).
//
// The followList parameter is unused here; the batch's own pubkeys drive
// the filters.
func (rc *RelayConnection) performCatchup(sub *BatchSubscription, disconnectTime, reconnectTime time.Time, followList [][]byte) {
	// Expand time window by CatchupWindow on both sides
	since := disconnectTime.Add(-CatchupWindow)
	until := reconnectTime.Add(CatchupWindow)
	log.I.F("spider: performing catch-up for %s from %v to %v (expanded window)",
		rc.url, since, until)
	// Create catch-up filters with time constraints
	sinceTs := timestamp.T{V: since.Unix()}
	untilTs := timestamp.T{V: until.Unix()}
	var pTags tag.S
	for _, pk := range sub.pubkeys {
		pTags = append(pTags, tag.NewFromAny("p", pk))
	}
	filters := filter.NewS(
		&filter.F{
			Authors: tag.NewFromBytesSlice(sub.pubkeys...),
			Since:   &sinceTs,
			Until:   &untilTs,
		},
		&filter.F{
			Tags:  tag.NewS(pTags...),
			Since: &sinceTs,
			Until: &untilTs,
		},
	)
	// Create temporary subscription for catch-up
	catchupCtx, cancel := context.WithTimeout(rc.ctx, 30*time.Second)
	defer cancel()
	catchupSub, err := rc.client.Subscribe(catchupCtx, filters)
	if chk.E(err) {
		log.E.F("spider: failed to create catch-up subscription on %s: %v", rc.url, err)
		return
	}
	defer catchupSub.Unsub()
	// Process catch-up events until EOSE, timeout, or context cancellation.
	// (The 30s time.After duplicates the catchupCtx deadline; whichever
	// fires first ends the loop.)
	eventCount := 0
	timeout := time.After(30 * time.Second)
	for {
		select {
		case <-catchupCtx.Done():
			log.D.F("spider: catch-up completed on %s, processed %d events", rc.url, eventCount)
			return
		case <-timeout:
			log.D.F("spider: catch-up timeout on %s, processed %d events", rc.url, eventCount)
			return
		case <-catchupSub.EndOfStoredEvents:
			log.D.F("spider: catch-up EOSE on %s, processed %d events", rc.url, eventCount)
			return
		case ev := <-catchupSub.Events:
			if ev == nil {
				return
			}
			eventCount++
			// Save event to database
			if _, err := rc.spider.db.SaveEvent(rc.ctx, ev); err != nil {
				// NOTE(review): as in handleEvents, this logs "saved" only
				// when chk.E(err) is false for a non-nil error — the
				// condition looks inverted; confirm chk.E semantics.
				if !chk.E(err) {
					log.T.F("spider: catch-up saved event %s from %s",
						hex.EncodeToString(ev.ID[:]), rc.url)
				}
			} else {
				// Publish event if it was newly saved
				if rc.spider.pub != nil {
					go rc.spider.pub.Deliver(ev)
				}
			}
		}
	}
}
// clearSubscriptions tears down every active batch subscription while
// holding the connection mutex. Safe to call from any goroutine.
func (rc *RelayConnection) clearSubscriptions() {
	rc.mu.Lock()
	defer rc.mu.Unlock()
	rc.clearSubscriptionsLocked()
}
// clearSubscriptionsLocked unsubscribes and discards every batch
// subscription. The caller must already hold rc.mu.
func (rc *RelayConnection) clearSubscriptionsLocked() {
	for _, batchSub := range rc.subscriptions {
		if batchSub.sub == nil {
			continue
		}
		batchSub.sub.Unsub()
	}
	rc.subscriptions = map[string]*BatchSubscription{}
}
// close shuts down the relay connection: all subscriptions are torn down,
// the client (if any) is closed and released, and the connection context
// is cancelled.
func (rc *RelayConnection) close() {
	rc.clearSubscriptions()
	if client := rc.client; client != nil {
		client.Close()
		rc.client = nil
	}
	rc.cancel()
}

244
pkg/spider/spider_test.go Normal file
View File

@@ -0,0 +1,244 @@
package spider
import (
"context"
"os"
"testing"
"time"
"next.orly.dev/pkg/database"
)
// TestSpiderCreation verifies that a spider in "follows" mode can be
// constructed against a throwaway database and starts out not running.
func TestSpiderCreation(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Throwaway on-disk database for the spider to attach to.
	tempDir, err := os.MkdirTemp("", "spider-test-*")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(tempDir)
	db, err := database.New(ctx, cancel, tempDir, "error")
	if err != nil {
		t.Fatalf("Failed to create test database: %v", err)
	}
	defer db.Close()
	s, err := New(ctx, db, nil, "follows")
	if err != nil {
		t.Fatalf("Failed to create spider: %v", err)
	}
	if s == nil {
		t.Fatal("Spider is nil")
	}
	// A freshly constructed spider must not report itself as running.
	s.mu.RLock()
	isRunning := s.running
	s.mu.RUnlock()
	if isRunning {
		t.Error("Spider should not be running initially")
	}
}
// TestSpiderCallbacks verifies that SetCallbacks installs both callback
// functions, and that Start refuses to run when they were never set.
func TestSpiderCallbacks(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Throwaway on-disk database for the spider to attach to.
	tempDir, err := os.MkdirTemp("", "spider-test-*")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(tempDir)
	db, err := database.New(ctx, cancel, tempDir, "error")
	if err != nil {
		t.Fatalf("Failed to create test database: %v", err)
	}
	defer db.Close()
	withCallbacks, err := New(ctx, db, nil, "follows")
	if err != nil {
		t.Fatalf("Failed to create spider: %v", err)
	}
	relays := []string{"wss://relay1.example.com", "wss://relay2.example.com"}
	pubkeys := [][]byte{{1, 2, 3}, {4, 5, 6}}
	withCallbacks.SetCallbacks(
		func() []string { return relays },
		func() [][]byte { return pubkeys },
	)
	// Both callback fields must be populated after SetCallbacks.
	withCallbacks.mu.RLock()
	installed := withCallbacks.getAdminRelays != nil && withCallbacks.getFollowList != nil
	withCallbacks.mu.RUnlock()
	if !installed {
		t.Error("Callbacks should be set")
	}
	// A second spider without callbacks must refuse to start.
	withoutCallbacks, err := New(ctx, db, nil, "follows")
	if err != nil {
		t.Fatalf("Failed to create second spider: %v", err)
	}
	if err = withoutCallbacks.Start(); err == nil {
		t.Error("Start should fail when callbacks are not set")
	}
}
// TestSpiderModeValidation exercises New with valid ("follows", "none")
// and invalid mode strings, and checks that "none" mode can start without
// any callbacks configured.
func TestSpiderModeValidation(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Throwaway on-disk database for the spider to attach to.
	tempDir, err := os.MkdirTemp("", "spider-test-*")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(tempDir)
	db, err := database.New(ctx, cancel, tempDir, "error")
	if err != nil {
		t.Fatalf("Failed to create test database: %v", err)
	}
	defer db.Close()
	// "follows" is a valid mode.
	followsSpider, err := New(ctx, db, nil, "follows")
	if err != nil {
		t.Fatalf("Failed to create spider with valid mode: %v", err)
	}
	if followsSpider == nil {
		t.Fatal("Spider should not be nil for valid mode")
	}
	// An unrecognized mode must be rejected.
	if _, err = New(ctx, db, nil, "invalid"); err == nil {
		t.Error("Should fail with invalid mode")
	}
	// "none" mode succeeds and acts as a no-op.
	noneSpider, err := New(ctx, db, nil, "none")
	if err != nil {
		t.Errorf("Should succeed with 'none' mode: %v", err)
	}
	if noneSpider == nil {
		t.Error("Spider should not be nil for 'none' mode")
	}
	// "none" mode does not require callbacks to start.
	if err = noneSpider.Start(); err != nil {
		t.Errorf("'none' mode should start without callbacks: %v", err)
	}
}
// TestSpiderBatching checks that createBatches splits a follow list into
// chunks of at most BatchSize pubkeys and that every pubkey is covered
// exactly once.
//
// Fix: the original hard-coded 3 batches sized 20/20/10, which silently
// assumed BatchSize == 20 while inconsistently also referencing BatchSize.
// All expectations are now derived from BatchSize so the test keeps
// passing if the constant is tuned later.
func TestSpiderBatching(t *testing.T) {
	const total = 50 // 50 pubkeys
	followList := make([][]byte, total)
	for i := range followList {
		followList[i] = make([]byte, 32)
		for j := range followList[i] {
			followList[i][j] = byte(i)
		}
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	rc := &RelayConnection{
		url: "wss://test.relay.com",
		ctx: ctx,
	}
	batches := rc.createBatches(followList)
	// Ceiling division: number of chunks needed to cover `total` pubkeys.
	expectedBatches := (total + BatchSize - 1) / BatchSize
	if len(batches) != expectedBatches {
		t.Fatalf("Expected %d batches, got %d", expectedBatches, len(batches))
	}
	// Every batch is full-size except possibly the last, and together
	// they must cover all pubkeys exactly.
	seen := 0
	for i, batch := range batches {
		want := BatchSize
		if remaining := total - seen; remaining < want {
			want = remaining
		}
		if len(batch) != want {
			t.Errorf("Batch %d should have %d pubkeys, got %d", i, want, len(batch))
		}
		seen += len(batch)
	}
	if seen != total {
		t.Errorf("Batches cover %d pubkeys, want %d", seen, total)
	}
}
// TestSpiderStartStop runs a spider through a full start/stop cycle and
// checks that the running flag flips accordingly.
func TestSpiderStartStop(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Throwaway on-disk database for the spider to attach to.
	tempDir, err := os.MkdirTemp("", "spider-test-*")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(tempDir)
	db, err := database.New(ctx, cancel, tempDir, "error")
	if err != nil {
		t.Fatalf("Failed to create test database: %v", err)
	}
	defer db.Close()
	s, err := New(ctx, db, nil, "follows")
	if err != nil {
		t.Fatalf("Failed to create spider: %v", err)
	}
	// Minimal callbacks so Start has something to work with.
	s.SetCallbacks(
		func() []string { return []string{"wss://test.relay.com"} },
		func() [][]byte { return [][]byte{{1, 2, 3}} },
	)
	if err = s.Start(); err != nil {
		t.Fatalf("Failed to start spider: %v", err)
	}
	isRunning := func() bool {
		s.mu.RLock()
		defer s.mu.RUnlock()
		return s.running
	}
	if !isRunning() {
		t.Error("Spider should be running after start")
	}
	s.Stop()
	// Allow the run loop a moment to observe the stop signal.
	time.Sleep(100 * time.Millisecond)
	if isRunning() {
		t.Error("Spider should not be running after stop")
	}
}

View File

@@ -1 +1 @@
v0.17.12
v0.17.15