Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
4c53709e2d
|
|||
|
a4fc3d8d9b
|
@@ -59,6 +59,9 @@ type C struct {
|
||||
// Sprocket settings
|
||||
SprocketEnabled bool `env:"ORLY_SPROCKET_ENABLED" default:"false" usage:"enable sprocket event processing plugin system"`
|
||||
|
||||
// Spider settings
|
||||
SpiderMode string `env:"ORLY_SPIDER_MODE" default:"none" usage:"spider mode for syncing events: none, follows"`
|
||||
|
||||
PolicyEnabled bool `env:"ORLY_POLICY_ENABLED" default:"false" usage:"enable policy-based event processing (configuration found in $HOME/.config/ORLY/policy.json)"`
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package app
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
"unicode"
|
||||
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/log"
|
||||
@@ -15,6 +16,42 @@ import (
|
||||
"next.orly.dev/pkg/encoders/envelopes/reqenvelope"
|
||||
)
|
||||
|
||||
// validateJSONMessage checks if a message contains invalid control characters
|
||||
// that would cause JSON parsing to fail
|
||||
func validateJSONMessage(msg []byte) (err error) {
|
||||
for i, b := range msg {
|
||||
// Check for invalid control characters in JSON strings
|
||||
if b < 32 && b != '\t' && b != '\n' && b != '\r' {
|
||||
// Allow some control characters that might be valid in certain contexts
|
||||
// but reject form feed (\f), backspace (\b), and other problematic ones
|
||||
switch b {
|
||||
case '\b', '\f', 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07,
|
||||
0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17,
|
||||
0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F:
|
||||
return fmt.Errorf("invalid control character 0x%02X at position %d", b, i)
|
||||
}
|
||||
}
|
||||
// Check for non-printable characters that might indicate binary data
|
||||
if b > 127 && !unicode.IsPrint(rune(b)) {
|
||||
// Allow valid UTF-8 sequences, but be suspicious of random binary data
|
||||
if i < len(msg)-1 {
|
||||
// Quick check: if we see a lot of high-bit characters in sequence,
|
||||
// it might be binary data masquerading as text
|
||||
highBitCount := 0
|
||||
for j := i; j < len(msg) && j < i+10; j++ {
|
||||
if msg[j] > 127 {
|
||||
highBitCount++
|
||||
}
|
||||
}
|
||||
if highBitCount > 7 { // More than 70% high-bit chars in a 10-byte window
|
||||
return fmt.Errorf("suspicious binary data detected at position %d", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (l *Listener) HandleMessage(msg []byte, remote string) {
|
||||
// Handle blacklisted IPs - discard messages but keep connection open until timeout
|
||||
if l.isBlacklisted {
|
||||
@@ -35,6 +72,17 @@ func (l *Listener) HandleMessage(msg []byte, remote string) {
|
||||
}
|
||||
// log.D.F("%s processing message (len=%d): %s", remote, len(msg), msgPreview)
|
||||
|
||||
// Validate message for invalid characters before processing
|
||||
if err := validateJSONMessage(msg); err != nil {
|
||||
log.E.F("%s message validation FAILED (len=%d): %v", remote, len(msg), err)
|
||||
log.T.F("%s invalid message content: %q", remote, msgPreview)
|
||||
// Send error notice to client
|
||||
if noticeErr := noticeenvelope.NewFrom("invalid message format: " + err.Error()).Write(l); noticeErr != nil {
|
||||
log.E.F("%s failed to send validation error notice: %v", remote, noticeErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
l.msgCount++
|
||||
var err error
|
||||
var t string
|
||||
|
||||
@@ -35,6 +35,12 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
|
||||
// var rem []byte
|
||||
env := reqenvelope.New()
|
||||
if _, err = env.Unmarshal(msg); chk.E(err) {
|
||||
// Provide more specific error context for JSON parsing failures
|
||||
if strings.Contains(err.Error(), "invalid character") {
|
||||
log.E.F("REQ JSON parsing failed from %s: %v", l.remote, err)
|
||||
log.T.F("REQ malformed message from %s: %q", l.remote, string(msg))
|
||||
return normalize.Error.Errorf("malformed REQ message: %s", err.Error())
|
||||
}
|
||||
return normalize.Error.Errorf(err.Error())
|
||||
}
|
||||
|
||||
|
||||
50
app/main.go
50
app/main.go
@@ -10,11 +10,13 @@ import (
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/log"
|
||||
"next.orly.dev/app/config"
|
||||
"next.orly.dev/pkg/acl"
|
||||
"next.orly.dev/pkg/crypto/keys"
|
||||
"next.orly.dev/pkg/database"
|
||||
"next.orly.dev/pkg/encoders/bech32encoding"
|
||||
"next.orly.dev/pkg/policy"
|
||||
"next.orly.dev/pkg/protocol/publish"
|
||||
"next.orly.dev/pkg/spider"
|
||||
)
|
||||
|
||||
func Run(
|
||||
@@ -69,6 +71,48 @@ func Run(
|
||||
|
||||
// Initialize policy manager
|
||||
l.policyManager = policy.NewWithManager(ctx, cfg.AppName, cfg.PolicyEnabled)
|
||||
|
||||
// Initialize spider manager based on mode
|
||||
if cfg.SpiderMode != "none" {
|
||||
if l.spiderManager, err = spider.New(ctx, db, l.publishers, cfg.SpiderMode); chk.E(err) {
|
||||
log.E.F("failed to create spider manager: %v", err)
|
||||
} else {
|
||||
// Set up callbacks for follows mode
|
||||
if cfg.SpiderMode == "follows" {
|
||||
l.spiderManager.SetCallbacks(
|
||||
func() []string {
|
||||
// Get admin relays from follows ACL if available
|
||||
for _, aclInstance := range acl.Registry.ACL {
|
||||
if aclInstance.Type() == "follows" {
|
||||
if follows, ok := aclInstance.(*acl.Follows); ok {
|
||||
return follows.AdminRelays()
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
func() [][]byte {
|
||||
// Get followed pubkeys from follows ACL if available
|
||||
for _, aclInstance := range acl.Registry.ACL {
|
||||
if aclInstance.Type() == "follows" {
|
||||
if follows, ok := aclInstance.(*acl.Follows); ok {
|
||||
return follows.GetFollowedPubkeys()
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
if err = l.spiderManager.Start(); chk.E(err) {
|
||||
log.E.F("failed to start spider manager: %v", err)
|
||||
} else {
|
||||
log.I.F("spider manager started successfully in '%s' mode", cfg.SpiderMode)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize the user interface
|
||||
l.UserInterface()
|
||||
|
||||
@@ -135,6 +179,12 @@ func Run(
|
||||
<-ctx.Done()
|
||||
log.I.F("shutting down HTTP server gracefully")
|
||||
|
||||
// Stop spider manager if running
|
||||
if l.spiderManager != nil {
|
||||
l.spiderManager.Stop()
|
||||
log.I.F("spider manager stopped")
|
||||
}
|
||||
|
||||
// Create shutdown context with timeout
|
||||
shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancelShutdown()
|
||||
|
||||
@@ -26,6 +26,7 @@ import (
|
||||
"next.orly.dev/pkg/protocol/auth"
|
||||
"next.orly.dev/pkg/protocol/httpauth"
|
||||
"next.orly.dev/pkg/protocol/publish"
|
||||
"next.orly.dev/pkg/spider"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
@@ -47,6 +48,7 @@ type Server struct {
|
||||
paymentProcessor *PaymentProcessor
|
||||
sprocketManager *SprocketManager
|
||||
policyManager *policy.P
|
||||
spiderManager *spider.Spider
|
||||
}
|
||||
|
||||
// isIPBlacklisted checks if an IP address is blacklisted using the managed ACL system
|
||||
|
||||
119
cmd/aggregator/README.md
Normal file
119
cmd/aggregator/README.md
Normal file
@@ -0,0 +1,119 @@
|
||||
# Nostr Event Aggregator
|
||||
|
||||
A comprehensive program that searches for all events related to a specific npub across multiple Nostr relays and outputs them in JSONL format to stdout. The program finds both events authored by the user and events that mention the user in "p" tags. It features dynamic relay discovery from relay list events and progressive backward time-based fetching for complete historical data collection.
|
||||
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
go run main.go -npub <npub> [-since <timestamp>] [-until <timestamp>]
|
||||
```
|
||||
|
||||
Where:
|
||||
- `<npub>` is a bech32-encoded Nostr public key (starting with "npub1")
|
||||
- `<timestamp>` is a Unix timestamp (seconds since epoch) - optional
|
||||
|
||||
## Examples
|
||||
|
||||
```bash
|
||||
# Get all events related to a user (authored by and mentioning)
|
||||
go run main.go -npub npub1234567890abcdef...
|
||||
|
||||
# Get events related to a user since January 1, 2022
|
||||
go run main.go -npub npub1234567890abcdef... -since 1640995200
|
||||
|
||||
# Get events related to a user between two dates
|
||||
go run main.go -npub npub1234567890abcdef... -since 1640995200 -until 1672531200
|
||||
|
||||
# Get events related to a user until December 31, 2022
|
||||
go run main.go -npub npub1234567890abcdef... -until 1672531200
|
||||
```
|
||||
|
||||
## Features
|
||||
|
||||
- **Comprehensive event discovery**: Finds both events authored by the user and events that mention the user
|
||||
- **Dynamic relay discovery**: Automatically discovers and connects to new relays from relay list events (kind 10002)
|
||||
- **Progressive backward fetching**: Systematically collects historical data in time-based batches
|
||||
- **Triple filter approach**: Uses separate filters for authored events, p-tag mentions, and relay list events
|
||||
- **Intelligent time management**: Works backwards from current time (or until timestamp) to since timestamp
|
||||
- **Memory-efficient deduplication**: Uses bloom filter with ~0.1% false positive rate instead of unbounded maps
|
||||
- **Fixed memory footprint**: Bloom filter uses only ~1.75MB for 1M events with controlled memory growth
|
||||
- **Memory monitoring**: Real-time memory usage tracking and automatic garbage collection
|
||||
- Connects to multiple relays simultaneously with dynamic expansion
|
||||
- Outputs events in JSONL format (one JSON object per line)
|
||||
- Handles connection failures gracefully
|
||||
- Continues running until all relay connections are closed
|
||||
- Time-based filtering with Unix timestamps (since/until parameters)
|
||||
- Input validation for timestamp ranges
|
||||
|
||||
## Event Discovery
|
||||
|
||||
The aggregator searches for three types of events:
|
||||
|
||||
1. **Authored Events**: Events where the specified npub is the author (pubkey field matches)
|
||||
2. **Mentioned Events**: Events that contain "p" tags referencing the specified npub (replies, mentions, etc.)
|
||||
3. **Relay List Events**: Kind 10002 events that contain relay URLs for dynamic relay discovery
|
||||
|
||||
This comprehensive approach ensures you capture all events related to a user, including:
|
||||
- Posts authored by the user
|
||||
- Replies to the user's posts
|
||||
- Posts that mention or tag the user
|
||||
- Any other events that reference the user in p-tags
|
||||
- Relay list metadata for discovering additional relays
|
||||
|
||||
## Progressive Fetching
|
||||
|
||||
The aggregator uses an intelligent progressive backward fetching strategy:
|
||||
|
||||
1. **Time-based batches**: Fetches data in weekly batches working backwards from the end time
|
||||
2. **Dynamic relay expansion**: As relay list events are discovered, new relays are automatically added to the search
|
||||
3. **Complete coverage**: Ensures all events between since and until timestamps are collected
|
||||
4. **Efficient processing**: Processes each time batch completely before moving to the next
|
||||
5. **Boundary respect**: Stops when reaching the since timestamp or beginning of available data
|
||||
|
||||
## Memory Management
|
||||
|
||||
The aggregator uses advanced memory management techniques to handle large-scale data collection:
|
||||
|
||||
### Bloom Filter Deduplication
|
||||
- **Fixed Size**: Uses exactly 1.75MB for the bloom filter regardless of event count
|
||||
- **Low False Positive Rate**: Configured for ~0.1% false positive rate with 1M events
|
||||
- **Hash Functions**: Uses 10 independent hash functions based on SHA256 for optimal distribution
|
||||
- **Thread-Safe**: Concurrent access protected with read-write mutexes
|
||||
|
||||
### Memory Monitoring
|
||||
- **Real-time Tracking**: Monitors total memory usage every 30 seconds
|
||||
- **Automatic GC**: Triggers garbage collection when approaching memory limits
|
||||
- **Statistics Logging**: Reports bloom filter usage, estimated event count, and memory consumption
|
||||
- **Controlled Growth**: Prevents unbounded memory growth through fixed-size data structures
|
||||
|
||||
### Performance Characteristics
|
||||
- **Memory Usage**: ~1.75MB bloom filter + ~256MB total memory limit
|
||||
- **False Positives**: ~0.1% chance of incorrectly identifying a duplicate (very low impact)
|
||||
- **Scalability**: Can handle millions of events without memory issues
|
||||
- **Efficiency**: O(k) time complexity for both add and lookup operations (k = hash functions)
|
||||
|
||||
## Relays
|
||||
|
||||
The program starts with the following initial relays:
|
||||
|
||||
- wss://nostr.wine/
|
||||
- wss://nostr.land/
|
||||
- wss://orly-relay.imwald.eu
|
||||
- wss://relay.orly.dev/
|
||||
- wss://relay.damus.io/
|
||||
- wss://nos.lol/
|
||||
- wss://theforest.nostr1.com/
|
||||
|
||||
**Dynamic Relay Discovery**: Additional relays are automatically discovered and added during execution when the program finds relay list events (kind 10002) authored by the target user. This ensures comprehensive coverage across the user's preferred relay network.
|
||||
|
||||
## Output Format
|
||||
|
||||
Each line of output is a JSON object representing a Nostr event with the following fields:
|
||||
|
||||
- `id`: Event ID (hex)
|
||||
- `pubkey`: Author's public key (hex)
|
||||
- `created_at`: Unix timestamp
|
||||
- `kind`: Event kind number
|
||||
- `tags`: Array of tag arrays
|
||||
- `content`: Event content string
|
||||
- `sig`: Event signature (hex)
|
||||
1006
cmd/aggregator/main.go
Normal file
1006
cmd/aggregator/main.go
Normal file
File diff suppressed because it is too large
Load Diff
@@ -266,7 +266,7 @@ func (f *Follows) adminRelays() (urls []string) {
|
||||
|
||||
// If no admin relays found, use bootstrap relays as fallback
|
||||
if len(urls) == 0 {
|
||||
log.I.F("no admin relays found in DB, checking bootstrap relays")
|
||||
log.I.F("no admin relays found in DB, checking bootstrap relays and failover relays")
|
||||
if len(f.cfg.BootstrapRelays) > 0 {
|
||||
log.I.F("using bootstrap relays: %v", f.cfg.BootstrapRelays)
|
||||
for _, relay := range f.cfg.BootstrapRelays {
|
||||
@@ -302,7 +302,53 @@ func (f *Follows) adminRelays() (urls []string) {
|
||||
urls = append(urls, n)
|
||||
}
|
||||
} else {
|
||||
log.W.F("no bootstrap relays configured")
|
||||
log.I.F("no bootstrap relays configured, using failover relays")
|
||||
}
|
||||
|
||||
// If still no relays found, use hardcoded failover relays
|
||||
// These relays will be used to fetch admin relay lists (kind 10002) and store them
|
||||
// in the database so they're found next time
|
||||
if len(urls) == 0 {
|
||||
failoverRelays := []string{
|
||||
"wss://nostr.land",
|
||||
"wss://nostr.wine",
|
||||
"wss://nos.lol",
|
||||
"wss://relay.damus.io",
|
||||
"wss://nostr.band",
|
||||
}
|
||||
log.I.F("using failover relays: %v", failoverRelays)
|
||||
for _, relay := range failoverRelays {
|
||||
n := string(normalize.URL(relay))
|
||||
if n == "" {
|
||||
log.W.F("invalid failover relay URL: %s", relay)
|
||||
continue
|
||||
}
|
||||
// Skip if this URL is one of our configured self relay addresses or hosts
|
||||
if _, isSelf := selfSet[n]; isSelf {
|
||||
log.D.F("follows syncer: skipping configured self relay address: %s", n)
|
||||
continue
|
||||
}
|
||||
// Host match
|
||||
host := n
|
||||
if i := strings.Index(host, "://"); i >= 0 {
|
||||
host = host[i+3:]
|
||||
}
|
||||
if j := strings.Index(host, "/"); j >= 0 {
|
||||
host = host[:j]
|
||||
}
|
||||
if k := strings.Index(host, ":"); k >= 0 {
|
||||
host = host[:k]
|
||||
}
|
||||
if _, isSelfHost := selfHosts[host]; isSelfHost {
|
||||
log.D.F("follows syncer: skipping configured self relay address: %s", n)
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[n]; ok {
|
||||
continue
|
||||
}
|
||||
seen[n] = struct{}{}
|
||||
urls = append(urls, n)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -451,6 +497,7 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) {
|
||||
keepaliveTicker := time.NewTicker(30 * time.Second)
|
||||
defer keepaliveTicker.Stop()
|
||||
|
||||
readLoop:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -460,7 +507,7 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) {
|
||||
// Send ping to keep connection alive
|
||||
if err := c.Ping(ctx); err != nil {
|
||||
log.T.F("follows syncer: ping failed for %s: %v", u, err)
|
||||
break
|
||||
break readLoop
|
||||
}
|
||||
log.T.F("follows syncer: sent ping to %s", u)
|
||||
continue
|
||||
@@ -471,7 +518,7 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) {
|
||||
readCancel()
|
||||
if err != nil {
|
||||
_ = c.Close(websocket.StatusNormalClosure, "read err")
|
||||
break
|
||||
break readLoop
|
||||
}
|
||||
label, rem, err := envelopes.Identify(data)
|
||||
if chk.E(err) {
|
||||
@@ -634,7 +681,7 @@ func (f *Follows) fetchAdminFollowLists() {
|
||||
|
||||
urls := f.adminRelays()
|
||||
if len(urls) == 0 {
|
||||
log.W.F("follows syncer: no admin relays found for follow list fetching")
|
||||
log.W.F("follows syncer: no relays available for follow list fetching (no admin relays, bootstrap relays, or failover relays)")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -680,14 +727,19 @@ func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) {
|
||||
|
||||
log.I.F("follows syncer: fetching follow lists from relay %s", relayURL)
|
||||
|
||||
// Create filter for follow lists only (kind 3)
|
||||
// Create filter for follow lists and relay lists (kind 3 and kind 10002)
|
||||
ff := &filter.S{}
|
||||
f1 := &filter.F{
|
||||
Authors: tag.NewFromBytesSlice(authors...),
|
||||
Kinds: kind.NewS(kind.New(kind.FollowList.K)),
|
||||
Limit: values.ToUintPointer(100),
|
||||
}
|
||||
*ff = append(*ff, f1)
|
||||
f2 := &filter.F{
|
||||
Authors: tag.NewFromBytesSlice(authors...),
|
||||
Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)),
|
||||
Limit: values.ToUintPointer(100),
|
||||
}
|
||||
*ff = append(*ff, f1, f2)
|
||||
|
||||
// Use a specific subscription ID for follow list fetching
|
||||
subID := "follow-lists-fetch"
|
||||
@@ -699,24 +751,28 @@ func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) {
|
||||
return
|
||||
}
|
||||
|
||||
log.T.F("follows syncer: sent follow list REQ to %s", relayURL)
|
||||
log.T.F("follows syncer: sent follow list and relay list REQ to %s", relayURL)
|
||||
|
||||
// Read follow list events with timeout
|
||||
// Collect all events before processing
|
||||
var followListEvents []*event.E
|
||||
var relayListEvents []*event.E
|
||||
|
||||
// Read events with timeout
|
||||
timeout := time.After(10 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
goto processEvents
|
||||
case <-timeout:
|
||||
log.T.F("follows syncer: timeout reading follow lists from %s", relayURL)
|
||||
return
|
||||
log.T.F("follows syncer: timeout reading events from %s", relayURL)
|
||||
goto processEvents
|
||||
default:
|
||||
}
|
||||
|
||||
_, data, err := c.Read(ctx)
|
||||
if err != nil {
|
||||
log.T.F("follows syncer: error reading follow lists from %s: %v", relayURL, err)
|
||||
return
|
||||
log.T.F("follows syncer: error reading events from %s: %v", relayURL, err)
|
||||
goto processEvents
|
||||
}
|
||||
|
||||
label, rem, err := envelopes.Identify(data)
|
||||
@@ -731,19 +787,101 @@ func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Process follow list events
|
||||
if res.Event.Kind == kind.FollowList.K {
|
||||
// Collect events by kind
|
||||
switch res.Event.Kind {
|
||||
case kind.FollowList.K:
|
||||
log.I.F("follows syncer: received follow list from %s on relay %s",
|
||||
hex.EncodeToString(res.Event.Pubkey), relayURL)
|
||||
f.extractFollowedPubkeys(res.Event)
|
||||
followListEvents = append(followListEvents, res.Event)
|
||||
case kind.RelayListMetadata.K:
|
||||
log.I.F("follows syncer: received relay list from %s on relay %s",
|
||||
hex.EncodeToString(res.Event.Pubkey), relayURL)
|
||||
relayListEvents = append(relayListEvents, res.Event)
|
||||
}
|
||||
case eoseenvelope.L:
|
||||
log.T.F("follows syncer: end of follow list events from %s", relayURL)
|
||||
return
|
||||
log.T.F("follows syncer: end of events from %s", relayURL)
|
||||
goto processEvents
|
||||
default:
|
||||
// ignore other labels
|
||||
}
|
||||
}
|
||||
|
||||
processEvents:
|
||||
// Process collected events - keep only the newest per pubkey and save to database
|
||||
f.processCollectedEvents(relayURL, followListEvents, relayListEvents)
|
||||
}
|
||||
|
||||
// processCollectedEvents processes the collected events, keeping only the newest per pubkey
|
||||
func (f *Follows) processCollectedEvents(relayURL string, followListEvents, relayListEvents []*event.E) {
|
||||
// Process follow list events (kind 3) - keep newest per pubkey
|
||||
latestFollowLists := make(map[string]*event.E)
|
||||
for _, ev := range followListEvents {
|
||||
pubkeyHex := hex.EncodeToString(ev.Pubkey)
|
||||
existing, exists := latestFollowLists[pubkeyHex]
|
||||
if !exists || ev.CreatedAt > existing.CreatedAt {
|
||||
latestFollowLists[pubkeyHex] = ev
|
||||
}
|
||||
}
|
||||
|
||||
// Process relay list events (kind 10002) - keep newest per pubkey
|
||||
latestRelayLists := make(map[string]*event.E)
|
||||
for _, ev := range relayListEvents {
|
||||
pubkeyHex := hex.EncodeToString(ev.Pubkey)
|
||||
existing, exists := latestRelayLists[pubkeyHex]
|
||||
if !exists || ev.CreatedAt > existing.CreatedAt {
|
||||
latestRelayLists[pubkeyHex] = ev
|
||||
}
|
||||
}
|
||||
|
||||
// Save and process the newest events
|
||||
savedFollowLists := 0
|
||||
savedRelayLists := 0
|
||||
|
||||
// Save follow list events to database and extract follows
|
||||
for pubkeyHex, ev := range latestFollowLists {
|
||||
if _, err := f.D.SaveEvent(f.Ctx, ev); err != nil {
|
||||
if !strings.HasPrefix(err.Error(), "blocked:") {
|
||||
log.W.F("follows syncer: failed to save follow list from %s: %v", pubkeyHex, err)
|
||||
}
|
||||
} else {
|
||||
savedFollowLists++
|
||||
log.I.F("follows syncer: saved newest follow list from %s (created_at: %d) from relay %s",
|
||||
pubkeyHex, ev.CreatedAt, relayURL)
|
||||
}
|
||||
|
||||
// Extract followed pubkeys from admin follow lists
|
||||
if f.isAdminPubkey(ev.Pubkey) {
|
||||
log.I.F("follows syncer: processing admin follow list from %s", pubkeyHex)
|
||||
f.extractFollowedPubkeys(ev)
|
||||
}
|
||||
}
|
||||
|
||||
// Save relay list events to database
|
||||
for pubkeyHex, ev := range latestRelayLists {
|
||||
if _, err := f.D.SaveEvent(f.Ctx, ev); err != nil {
|
||||
if !strings.HasPrefix(err.Error(), "blocked:") {
|
||||
log.W.F("follows syncer: failed to save relay list from %s: %v", pubkeyHex, err)
|
||||
}
|
||||
} else {
|
||||
savedRelayLists++
|
||||
log.I.F("follows syncer: saved newest relay list from %s (created_at: %d) from relay %s",
|
||||
pubkeyHex, ev.CreatedAt, relayURL)
|
||||
}
|
||||
}
|
||||
|
||||
log.I.F("follows syncer: processed %d follow lists and %d relay lists from %s, saved %d follow lists and %d relay lists",
|
||||
len(followListEvents), len(relayListEvents), relayURL, savedFollowLists, savedRelayLists)
|
||||
|
||||
// If we saved any relay lists, trigger a refresh of subscriptions to use the new relay lists
|
||||
if savedRelayLists > 0 {
|
||||
log.I.F("follows syncer: saved new relay lists, triggering subscription refresh")
|
||||
// Signal that follows have been updated to refresh subscriptions
|
||||
select {
|
||||
case f.updated <- struct{}{}:
|
||||
default:
|
||||
// Channel might be full, that's okay
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetFollowedPubkeys returns a copy of the followed pubkeys list
|
||||
@@ -783,6 +921,11 @@ func (f *Follows) extractFollowedPubkeys(event *event.E) {
|
||||
}
|
||||
}
|
||||
|
||||
// AdminRelays returns the admin relay URLs
|
||||
func (f *Follows) AdminRelays() []string {
|
||||
return f.adminRelays()
|
||||
}
|
||||
|
||||
// AddFollow appends a pubkey to the in-memory follows list if not already present
|
||||
// and signals the syncer to refresh subscriptions.
|
||||
func (f *Follows) AddFollow(pub []byte) {
|
||||
|
||||
@@ -114,9 +114,20 @@ func UnmarshalQuoted(b []byte) (content, rem []byte, err error) {
|
||||
//
|
||||
// backspace, tab, newline, form feed or carriage return.
|
||||
case '\b', '\t', '\n', '\f', '\r':
|
||||
pos := len(content) - len(rem)
|
||||
contextStart := pos - 10
|
||||
if contextStart < 0 {
|
||||
contextStart = 0
|
||||
}
|
||||
contextEnd := pos + 10
|
||||
if contextEnd > len(content) {
|
||||
contextEnd = len(content)
|
||||
}
|
||||
err = errorf.E(
|
||||
"invalid character '%s' in quoted string",
|
||||
"invalid character '%s' in quoted string (position %d, context: %q)",
|
||||
NostrEscape(nil, rem[:1]),
|
||||
pos,
|
||||
string(content[contextStart:contextEnd]),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
581
pkg/spider/spider.go
Normal file
581
pkg/spider/spider.go
Normal file
@@ -0,0 +1,581 @@
|
||||
package spider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/errorf"
|
||||
"lol.mleku.dev/log"
|
||||
"next.orly.dev/pkg/database"
|
||||
"next.orly.dev/pkg/encoders/filter"
|
||||
"next.orly.dev/pkg/encoders/tag"
|
||||
"next.orly.dev/pkg/encoders/timestamp"
|
||||
"next.orly.dev/pkg/interfaces/publisher"
|
||||
"next.orly.dev/pkg/protocol/ws"
|
||||
)
|
||||
|
||||
const (
|
||||
// BatchSize is the number of pubkeys per subscription batch
|
||||
BatchSize = 20
|
||||
// CatchupWindow is the extra time added to disconnection periods for catch-up
|
||||
CatchupWindow = 30 * time.Minute
|
||||
// ReconnectDelay is the delay between reconnection attempts
|
||||
ReconnectDelay = 5 * time.Second
|
||||
// MaxReconnectDelay is the maximum delay between reconnection attempts
|
||||
MaxReconnectDelay = 5 * time.Minute
|
||||
)
|
||||
|
||||
// Spider manages connections to admin relays and syncs events for followed pubkeys
|
||||
type Spider struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
db *database.D
|
||||
pub publisher.I
|
||||
mode string
|
||||
|
||||
// Configuration
|
||||
adminRelays []string
|
||||
followList [][]byte
|
||||
|
||||
// State management
|
||||
mu sync.RWMutex
|
||||
connections map[string]*RelayConnection
|
||||
running bool
|
||||
|
||||
// Callbacks for getting updated data
|
||||
getAdminRelays func() []string
|
||||
getFollowList func() [][]byte
|
||||
}
|
||||
|
||||
// RelayConnection manages a single relay connection and its subscriptions
|
||||
type RelayConnection struct {
|
||||
url string
|
||||
client *ws.Client
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
spider *Spider
|
||||
|
||||
// Subscription management
|
||||
mu sync.RWMutex
|
||||
subscriptions map[string]*BatchSubscription
|
||||
|
||||
// Disconnection tracking
|
||||
lastDisconnect time.Time
|
||||
reconnectDelay time.Duration
|
||||
}
|
||||
|
||||
// BatchSubscription represents a subscription for a batch of pubkeys
|
||||
type BatchSubscription struct {
|
||||
id string
|
||||
pubkeys [][]byte
|
||||
startTime time.Time
|
||||
sub *ws.Subscription
|
||||
relay *RelayConnection
|
||||
|
||||
// Track disconnection periods for catch-up
|
||||
disconnectedAt *time.Time
|
||||
}
|
||||
|
||||
// DisconnectionPeriod tracks when a subscription was disconnected
|
||||
type DisconnectionPeriod struct {
|
||||
Start time.Time
|
||||
End time.Time
|
||||
}
|
||||
|
||||
// New creates a new Spider instance
|
||||
func New(ctx context.Context, db *database.D, pub publisher.I, mode string) (s *Spider, err error) {
|
||||
if db == nil {
|
||||
err = errorf.E("database cannot be nil")
|
||||
return
|
||||
}
|
||||
|
||||
// Validate mode
|
||||
switch mode {
|
||||
case "follows", "none":
|
||||
// Valid modes
|
||||
default:
|
||||
err = errorf.E("invalid spider mode: %s (valid modes: none, follows)", mode)
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
s = &Spider{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
db: db,
|
||||
pub: pub,
|
||||
mode: mode,
|
||||
connections: make(map[string]*RelayConnection),
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// SetCallbacks sets the callback functions for getting updated admin relays and follow lists
|
||||
func (s *Spider) SetCallbacks(getAdminRelays func() []string, getFollowList func() [][]byte) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.getAdminRelays = getAdminRelays
|
||||
s.getFollowList = getFollowList
|
||||
}
|
||||
|
||||
// Start begins the spider operation
|
||||
func (s *Spider) Start() (err error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.running {
|
||||
err = errorf.E("spider already running")
|
||||
return
|
||||
}
|
||||
|
||||
// Handle 'none' mode - no-op
|
||||
if s.mode == "none" {
|
||||
log.I.F("spider: mode is 'none', not starting")
|
||||
return
|
||||
}
|
||||
|
||||
if s.getAdminRelays == nil || s.getFollowList == nil {
|
||||
err = errorf.E("callbacks must be set before starting")
|
||||
return
|
||||
}
|
||||
|
||||
s.running = true
|
||||
|
||||
// Start the main loop
|
||||
go s.mainLoop()
|
||||
|
||||
log.I.F("spider: started in '%s' mode", s.mode)
|
||||
return
|
||||
}
|
||||
|
||||
// Stop stops the spider operation
|
||||
func (s *Spider) Stop() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if !s.running {
|
||||
return
|
||||
}
|
||||
|
||||
s.running = false
|
||||
s.cancel()
|
||||
|
||||
// Close all connections
|
||||
for _, conn := range s.connections {
|
||||
conn.close()
|
||||
}
|
||||
s.connections = make(map[string]*RelayConnection)
|
||||
|
||||
log.I.F("spider: stopped")
|
||||
}
|
||||
|
||||
// mainLoop is the main spider loop that manages connections and subscriptions
|
||||
func (s *Spider) mainLoop() {
|
||||
ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.updateConnections()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// updateConnections updates relay connections based on current admin relays and follow lists
|
||||
func (s *Spider) updateConnections() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if !s.running {
|
||||
return
|
||||
}
|
||||
|
||||
// Get current admin relays and follow list
|
||||
adminRelays := s.getAdminRelays()
|
||||
followList := s.getFollowList()
|
||||
|
||||
if len(adminRelays) == 0 || len(followList) == 0 {
|
||||
log.D.F("spider: no admin relays (%d) or follow list (%d) available",
|
||||
len(adminRelays), len(followList))
|
||||
return
|
||||
}
|
||||
|
||||
// Update connections for current admin relays
|
||||
currentRelays := make(map[string]bool)
|
||||
for _, url := range adminRelays {
|
||||
currentRelays[url] = true
|
||||
|
||||
if conn, exists := s.connections[url]; exists {
|
||||
// Update existing connection
|
||||
conn.updateSubscriptions(followList)
|
||||
} else {
|
||||
// Create new connection
|
||||
s.createConnection(url, followList)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove connections for relays no longer in admin list
|
||||
for url, conn := range s.connections {
|
||||
if !currentRelays[url] {
|
||||
log.I.F("spider: removing connection to %s (no longer in admin relays)", url)
|
||||
conn.close()
|
||||
delete(s.connections, url)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// createConnection creates a new relay connection
|
||||
func (s *Spider) createConnection(url string, followList [][]byte) {
|
||||
log.I.F("spider: creating connection to %s", url)
|
||||
|
||||
ctx, cancel := context.WithCancel(s.ctx)
|
||||
conn := &RelayConnection{
|
||||
url: url,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
spider: s,
|
||||
subscriptions: make(map[string]*BatchSubscription),
|
||||
reconnectDelay: ReconnectDelay,
|
||||
}
|
||||
|
||||
s.connections[url] = conn
|
||||
|
||||
// Start connection in goroutine
|
||||
go conn.manage(followList)
|
||||
}
|
||||
|
||||
// manage handles the lifecycle of a relay connection
|
||||
func (rc *RelayConnection) manage(followList [][]byte) {
|
||||
for {
|
||||
select {
|
||||
case <-rc.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Attempt to connect
|
||||
if err := rc.connect(); chk.E(err) {
|
||||
log.W.F("spider: failed to connect to %s: %v", rc.url, err)
|
||||
rc.waitBeforeReconnect()
|
||||
continue
|
||||
}
|
||||
|
||||
log.I.F("spider: connected to %s", rc.url)
|
||||
rc.reconnectDelay = ReconnectDelay // Reset delay on successful connection
|
||||
|
||||
// Create subscriptions for follow list
|
||||
rc.createSubscriptions(followList)
|
||||
|
||||
// Wait for disconnection
|
||||
<-rc.client.Context().Done()
|
||||
|
||||
log.W.F("spider: disconnected from %s: %v", rc.url, rc.client.ConnectionCause())
|
||||
rc.handleDisconnection()
|
||||
|
||||
// Clean up
|
||||
rc.client = nil
|
||||
rc.clearSubscriptions()
|
||||
}
|
||||
}
|
||||
|
||||
// connect establishes a websocket connection to the relay
|
||||
func (rc *RelayConnection) connect() (err error) {
|
||||
connectCtx, cancel := context.WithTimeout(rc.ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if rc.client, err = ws.RelayConnect(connectCtx, rc.url); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// waitBeforeReconnect waits before attempting to reconnect with exponential backoff
|
||||
func (rc *RelayConnection) waitBeforeReconnect() {
|
||||
select {
|
||||
case <-rc.ctx.Done():
|
||||
return
|
||||
case <-time.After(rc.reconnectDelay):
|
||||
}
|
||||
|
||||
// Exponential backoff
|
||||
rc.reconnectDelay *= 2
|
||||
if rc.reconnectDelay > MaxReconnectDelay {
|
||||
rc.reconnectDelay = MaxReconnectDelay
|
||||
}
|
||||
}
|
||||
|
||||
// handleDisconnection records disconnection time for catch-up logic
|
||||
func (rc *RelayConnection) handleDisconnection() {
|
||||
now := time.Now()
|
||||
rc.lastDisconnect = now
|
||||
|
||||
// Mark all subscriptions as disconnected
|
||||
rc.mu.Lock()
|
||||
defer rc.mu.Unlock()
|
||||
|
||||
for _, sub := range rc.subscriptions {
|
||||
if sub.disconnectedAt == nil {
|
||||
sub.disconnectedAt = &now
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// createSubscriptions creates batch subscriptions for the follow list
|
||||
func (rc *RelayConnection) createSubscriptions(followList [][]byte) {
|
||||
rc.mu.Lock()
|
||||
defer rc.mu.Unlock()
|
||||
|
||||
// Clear existing subscriptions
|
||||
rc.clearSubscriptionsLocked()
|
||||
|
||||
// Create batches of pubkeys
|
||||
batches := rc.createBatches(followList)
|
||||
|
||||
log.I.F("spider: creating %d subscription batches for %d pubkeys on %s",
|
||||
len(batches), len(followList), rc.url)
|
||||
|
||||
for i, batch := range batches {
|
||||
batchID := fmt.Sprintf("batch-%d", i) // Simple batch ID
|
||||
rc.createBatchSubscription(batchID, batch)
|
||||
}
|
||||
}
|
||||
|
||||
// createBatches splits the follow list into batches of BatchSize
|
||||
func (rc *RelayConnection) createBatches(followList [][]byte) (batches [][][]byte) {
|
||||
for i := 0; i < len(followList); i += BatchSize {
|
||||
end := i + BatchSize
|
||||
if end > len(followList) {
|
||||
end = len(followList)
|
||||
}
|
||||
|
||||
batch := make([][]byte, end-i)
|
||||
copy(batch, followList[i:end])
|
||||
batches = append(batches, batch)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// createBatchSubscription creates a subscription for a batch of pubkeys
|
||||
func (rc *RelayConnection) createBatchSubscription(batchID string, pubkeys [][]byte) {
|
||||
if rc.client == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Create filters: one for authors, one for p tags
|
||||
var pTags tag.S
|
||||
for _, pk := range pubkeys {
|
||||
pTags = append(pTags, tag.NewFromAny("p", pk))
|
||||
}
|
||||
|
||||
filters := filter.NewS(
|
||||
&filter.F{
|
||||
Authors: tag.NewFromBytesSlice(pubkeys...),
|
||||
},
|
||||
&filter.F{
|
||||
Tags: tag.NewS(pTags...),
|
||||
},
|
||||
)
|
||||
|
||||
// Subscribe
|
||||
sub, err := rc.client.Subscribe(rc.ctx, filters)
|
||||
if chk.E(err) {
|
||||
log.E.F("spider: failed to create subscription %s on %s: %v", batchID, rc.url, err)
|
||||
return
|
||||
}
|
||||
|
||||
batchSub := &BatchSubscription{
|
||||
id: batchID,
|
||||
pubkeys: pubkeys,
|
||||
startTime: time.Now(),
|
||||
sub: sub,
|
||||
relay: rc,
|
||||
}
|
||||
|
||||
rc.subscriptions[batchID] = batchSub
|
||||
|
||||
// Start event handler
|
||||
go batchSub.handleEvents()
|
||||
|
||||
log.D.F("spider: created subscription %s for %d pubkeys on %s",
|
||||
batchID, len(pubkeys), rc.url)
|
||||
}
|
||||
|
||||
// handleEvents processes events from the subscription
|
||||
func (bs *BatchSubscription) handleEvents() {
|
||||
for {
|
||||
select {
|
||||
case <-bs.relay.ctx.Done():
|
||||
return
|
||||
case ev := <-bs.sub.Events:
|
||||
if ev == nil {
|
||||
return // Subscription closed
|
||||
}
|
||||
|
||||
// Save event to database
|
||||
if _, err := bs.relay.spider.db.SaveEvent(bs.relay.ctx, ev); err != nil {
|
||||
if !chk.E(err) {
|
||||
log.T.F("spider: saved event %s from %s",
|
||||
hex.EncodeToString(ev.ID[:]), bs.relay.url)
|
||||
}
|
||||
} else {
|
||||
// Publish event if it was newly saved
|
||||
if bs.relay.spider.pub != nil {
|
||||
go bs.relay.spider.pub.Deliver(ev)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// updateSubscriptions updates subscriptions for a connection with new follow list
|
||||
func (rc *RelayConnection) updateSubscriptions(followList [][]byte) {
|
||||
if rc.client == nil || !rc.client.IsConnected() {
|
||||
return // Will be handled on reconnection
|
||||
}
|
||||
|
||||
rc.mu.Lock()
|
||||
defer rc.mu.Unlock()
|
||||
|
||||
// Check if we need to perform catch-up for disconnected subscriptions
|
||||
now := time.Now()
|
||||
needsCatchup := false
|
||||
|
||||
for _, sub := range rc.subscriptions {
|
||||
if sub.disconnectedAt != nil {
|
||||
needsCatchup = true
|
||||
rc.performCatchup(sub, *sub.disconnectedAt, now, followList)
|
||||
sub.disconnectedAt = nil // Clear disconnection marker
|
||||
}
|
||||
}
|
||||
|
||||
if needsCatchup {
|
||||
log.I.F("spider: performed catch-up for disconnected subscriptions on %s", rc.url)
|
||||
}
|
||||
|
||||
// Recreate subscriptions with updated follow list
|
||||
rc.clearSubscriptionsLocked()
|
||||
|
||||
batches := rc.createBatches(followList)
|
||||
for i, batch := range batches {
|
||||
batchID := fmt.Sprintf("batch-%d", i)
|
||||
rc.createBatchSubscription(batchID, batch)
|
||||
}
|
||||
}
|
||||
|
||||
// performCatchup queries for events missed during disconnection
|
||||
func (rc *RelayConnection) performCatchup(sub *BatchSubscription, disconnectTime, reconnectTime time.Time, followList [][]byte) {
|
||||
// Expand time window by CatchupWindow on both sides
|
||||
since := disconnectTime.Add(-CatchupWindow)
|
||||
until := reconnectTime.Add(CatchupWindow)
|
||||
|
||||
log.I.F("spider: performing catch-up for %s from %v to %v (expanded window)",
|
||||
rc.url, since, until)
|
||||
|
||||
// Create catch-up filters with time constraints
|
||||
sinceTs := timestamp.T{V: since.Unix()}
|
||||
untilTs := timestamp.T{V: until.Unix()}
|
||||
|
||||
var pTags tag.S
|
||||
for _, pk := range sub.pubkeys {
|
||||
pTags = append(pTags, tag.NewFromAny("p", pk))
|
||||
}
|
||||
|
||||
filters := filter.NewS(
|
||||
&filter.F{
|
||||
Authors: tag.NewFromBytesSlice(sub.pubkeys...),
|
||||
Since: &sinceTs,
|
||||
Until: &untilTs,
|
||||
},
|
||||
&filter.F{
|
||||
Tags: tag.NewS(pTags...),
|
||||
Since: &sinceTs,
|
||||
Until: &untilTs,
|
||||
},
|
||||
)
|
||||
|
||||
// Create temporary subscription for catch-up
|
||||
catchupCtx, cancel := context.WithTimeout(rc.ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
catchupSub, err := rc.client.Subscribe(catchupCtx, filters)
|
||||
if chk.E(err) {
|
||||
log.E.F("spider: failed to create catch-up subscription on %s: %v", rc.url, err)
|
||||
return
|
||||
}
|
||||
defer catchupSub.Unsub()
|
||||
|
||||
// Process catch-up events
|
||||
eventCount := 0
|
||||
timeout := time.After(30 * time.Second)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-catchupCtx.Done():
|
||||
log.D.F("spider: catch-up completed on %s, processed %d events", rc.url, eventCount)
|
||||
return
|
||||
case <-timeout:
|
||||
log.D.F("spider: catch-up timeout on %s, processed %d events", rc.url, eventCount)
|
||||
return
|
||||
case <-catchupSub.EndOfStoredEvents:
|
||||
log.D.F("spider: catch-up EOSE on %s, processed %d events", rc.url, eventCount)
|
||||
return
|
||||
case ev := <-catchupSub.Events:
|
||||
if ev == nil {
|
||||
return
|
||||
}
|
||||
|
||||
eventCount++
|
||||
|
||||
// Save event to database
|
||||
if _, err := rc.spider.db.SaveEvent(rc.ctx, ev); err != nil {
|
||||
if !chk.E(err) {
|
||||
log.T.F("spider: catch-up saved event %s from %s",
|
||||
hex.EncodeToString(ev.ID[:]), rc.url)
|
||||
}
|
||||
} else {
|
||||
// Publish event if it was newly saved
|
||||
if rc.spider.pub != nil {
|
||||
go rc.spider.pub.Deliver(ev)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// clearSubscriptions clears all subscriptions (with lock)
|
||||
func (rc *RelayConnection) clearSubscriptions() {
|
||||
rc.mu.Lock()
|
||||
defer rc.mu.Unlock()
|
||||
rc.clearSubscriptionsLocked()
|
||||
}
|
||||
|
||||
// clearSubscriptionsLocked clears all subscriptions (without lock)
|
||||
func (rc *RelayConnection) clearSubscriptionsLocked() {
|
||||
for _, sub := range rc.subscriptions {
|
||||
if sub.sub != nil {
|
||||
sub.sub.Unsub()
|
||||
}
|
||||
}
|
||||
rc.subscriptions = make(map[string]*BatchSubscription)
|
||||
}
|
||||
|
||||
// close closes the relay connection
|
||||
func (rc *RelayConnection) close() {
|
||||
rc.clearSubscriptions()
|
||||
|
||||
if rc.client != nil {
|
||||
rc.client.Close()
|
||||
rc.client = nil
|
||||
}
|
||||
|
||||
rc.cancel()
|
||||
}
|
||||
244
pkg/spider/spider_test.go
Normal file
244
pkg/spider/spider_test.go
Normal file
@@ -0,0 +1,244 @@
|
||||
package spider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"next.orly.dev/pkg/database"
|
||||
)
|
||||
|
||||
func TestSpiderCreation(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Create a temporary database for testing
|
||||
tempDir, err := os.MkdirTemp("", "spider-test-*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
db, err := database.New(ctx, cancel, tempDir, "error")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Test spider creation
|
||||
spider, err := New(ctx, db, nil, "follows")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create spider: %v", err)
|
||||
}
|
||||
|
||||
if spider == nil {
|
||||
t.Fatal("Spider is nil")
|
||||
}
|
||||
|
||||
// Test that spider is not running initially
|
||||
spider.mu.RLock()
|
||||
running := spider.running
|
||||
spider.mu.RUnlock()
|
||||
|
||||
if running {
|
||||
t.Error("Spider should not be running initially")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSpiderCallbacks(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Create a temporary database for testing
|
||||
tempDir, err := os.MkdirTemp("", "spider-test-*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
db, err := database.New(ctx, cancel, tempDir, "error")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
spider, err := New(ctx, db, nil, "follows")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create spider: %v", err)
|
||||
}
|
||||
|
||||
// Test callback setup
|
||||
testRelays := []string{"wss://relay1.example.com", "wss://relay2.example.com"}
|
||||
testPubkeys := [][]byte{{1, 2, 3}, {4, 5, 6}}
|
||||
|
||||
spider.SetCallbacks(
|
||||
func() []string { return testRelays },
|
||||
func() [][]byte { return testPubkeys },
|
||||
)
|
||||
|
||||
// Verify callbacks are set
|
||||
spider.mu.RLock()
|
||||
hasCallbacks := spider.getAdminRelays != nil && spider.getFollowList != nil
|
||||
spider.mu.RUnlock()
|
||||
|
||||
if !hasCallbacks {
|
||||
t.Error("Callbacks should be set")
|
||||
}
|
||||
|
||||
// Test that start fails without callbacks being set first
|
||||
spider2, err := New(ctx, db, nil, "follows")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create second spider: %v", err)
|
||||
}
|
||||
|
||||
err = spider2.Start()
|
||||
if err == nil {
|
||||
t.Error("Start should fail when callbacks are not set")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSpiderModeValidation(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Create a temporary database for testing
|
||||
tempDir, err := os.MkdirTemp("", "spider-test-*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
db, err := database.New(ctx, cancel, tempDir, "error")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Test valid mode
|
||||
spider, err := New(ctx, db, nil, "follows")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create spider with valid mode: %v", err)
|
||||
}
|
||||
if spider == nil {
|
||||
t.Fatal("Spider should not be nil for valid mode")
|
||||
}
|
||||
|
||||
// Test invalid mode
|
||||
_, err = New(ctx, db, nil, "invalid")
|
||||
if err == nil {
|
||||
t.Error("Should fail with invalid mode")
|
||||
}
|
||||
|
||||
// Test none mode (should succeed but be a no-op)
|
||||
spider2, err := New(ctx, db, nil, "none")
|
||||
if err != nil {
|
||||
t.Errorf("Should succeed with 'none' mode: %v", err)
|
||||
}
|
||||
if spider2 == nil {
|
||||
t.Error("Spider should not be nil for 'none' mode")
|
||||
}
|
||||
|
||||
// Test that 'none' mode doesn't require callbacks
|
||||
err = spider2.Start()
|
||||
if err != nil {
|
||||
t.Errorf("'none' mode should start without callbacks: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSpiderBatching(t *testing.T) {
|
||||
// Test batch creation logic
|
||||
followList := make([][]byte, 50) // 50 pubkeys
|
||||
for i := range followList {
|
||||
followList[i] = make([]byte, 32)
|
||||
for j := range followList[i] {
|
||||
followList[i][j] = byte(i)
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
rc := &RelayConnection{
|
||||
url: "wss://test.relay.com",
|
||||
ctx: ctx,
|
||||
}
|
||||
|
||||
batches := rc.createBatches(followList)
|
||||
|
||||
// Should create 3 batches: 20, 20, 10
|
||||
expectedBatches := 3
|
||||
if len(batches) != expectedBatches {
|
||||
t.Errorf("Expected %d batches, got %d", expectedBatches, len(batches))
|
||||
}
|
||||
|
||||
// Check batch sizes
|
||||
if len(batches[0]) != BatchSize {
|
||||
t.Errorf("First batch should have %d pubkeys, got %d", BatchSize, len(batches[0]))
|
||||
}
|
||||
if len(batches[1]) != BatchSize {
|
||||
t.Errorf("Second batch should have %d pubkeys, got %d", BatchSize, len(batches[1]))
|
||||
}
|
||||
if len(batches[2]) != 10 {
|
||||
t.Errorf("Third batch should have 10 pubkeys, got %d", len(batches[2]))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSpiderStartStop(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Create a temporary database for testing
|
||||
tempDir, err := os.MkdirTemp("", "spider-test-*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
db, err := database.New(ctx, cancel, tempDir, "error")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
spider, err := New(ctx, db, nil, "follows")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create spider: %v", err)
|
||||
}
|
||||
|
||||
// Set up callbacks
|
||||
spider.SetCallbacks(
|
||||
func() []string { return []string{"wss://test.relay.com"} },
|
||||
func() [][]byte { return [][]byte{{1, 2, 3}} },
|
||||
)
|
||||
|
||||
// Test start
|
||||
err = spider.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start spider: %v", err)
|
||||
}
|
||||
|
||||
// Verify spider is running
|
||||
spider.mu.RLock()
|
||||
running := spider.running
|
||||
spider.mu.RUnlock()
|
||||
|
||||
if !running {
|
||||
t.Error("Spider should be running after start")
|
||||
}
|
||||
|
||||
// Test stop
|
||||
spider.Stop()
|
||||
|
||||
// Give it a moment to stop
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Verify spider is stopped
|
||||
spider.mu.RLock()
|
||||
running = spider.running
|
||||
spider.mu.RUnlock()
|
||||
|
||||
if running {
|
||||
t.Error("Spider should not be running after stop")
|
||||
}
|
||||
}
|
||||
@@ -1 +1 @@
|
||||
v0.17.12
|
||||
v0.17.15
|
||||
Reference in New Issue
Block a user