diff --git a/app/config/config.go b/app/config/config.go index 5c52374..8d6ba22 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -68,6 +68,11 @@ type C struct { // Spider settings SpiderMode string `env:"ORLY_SPIDER_MODE" default:"none" usage:"spider mode for syncing events: none, follows"` + // Directory Spider settings + DirectorySpiderEnabled bool `env:"ORLY_DIRECTORY_SPIDER" default:"false" usage:"enable directory spider for metadata sync (kinds 0, 3, 10000, 10002)"` + DirectorySpiderInterval time.Duration `env:"ORLY_DIRECTORY_SPIDER_INTERVAL" default:"24h" usage:"how often to run directory spider"` + DirectorySpiderMaxHops int `env:"ORLY_DIRECTORY_SPIDER_HOPS" default:"3" usage:"maximum hops for relay discovery from seed users"` + PolicyEnabled bool `env:"ORLY_POLICY_ENABLED" default:"false" usage:"enable policy-based event processing (configuration found in $HOME/.config/ORLY/policy.json)"` // NIP-43 Relay Access Metadata and Requests diff --git a/app/main.go b/app/main.go index d364df9..18c6b47 100644 --- a/app/main.go +++ b/app/main.go @@ -141,6 +141,44 @@ func Run( } } + // Initialize directory spider if enabled (only for Badger backend) + if badgerDB, ok := db.(*database.D); ok && cfg.DirectorySpiderEnabled { + if l.directorySpider, err = spider.NewDirectorySpider( + ctx, + badgerDB, + l.publishers, + cfg.DirectorySpiderInterval, + cfg.DirectorySpiderMaxHops, + ); chk.E(err) { + log.E.F("failed to create directory spider: %v", err) + } else { + // Set up callback to get seed pubkeys (whitelisted users) + l.directorySpider.SetSeedCallback(func() [][]byte { + var pubkeys [][]byte + // Get followed pubkeys from follows ACL if available + for _, aclInstance := range acl.Registry.ACL { + if aclInstance.Type() == "follows" { + if follows, ok := aclInstance.(*acl.Follows); ok { + pubkeys = append(pubkeys, follows.GetFollowedPubkeys()...) 
+ } + } + } + // Fall back to admin keys if no follows ACL + if len(pubkeys) == 0 { + pubkeys = adminKeys + } + return pubkeys + }) + + if err = l.directorySpider.Start(); chk.E(err) { + log.E.F("failed to start directory spider: %v", err) + } else { + log.I.F("directory spider started (interval: %v, max hops: %d)", + cfg.DirectorySpiderInterval, cfg.DirectorySpiderMaxHops) + } + } + } + // Initialize relay group manager (only for Badger backend) if badgerDB, ok := db.(*database.D); ok { l.relayGroupMgr = dsync.NewRelayGroupManager(badgerDB, cfg.RelayGroupAdmins) @@ -360,6 +398,12 @@ func Run( log.I.F("spider manager stopped") } + // Stop directory spider if running + if l.directorySpider != nil { + l.directorySpider.Stop() + log.I.F("directory spider stopped") + } + // Create shutdown context with timeout shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second) defer cancelShutdown() diff --git a/app/server.go b/app/server.go index a24864c..e73b328 100644 --- a/app/server.go +++ b/app/server.go @@ -48,17 +48,22 @@ type Server struct { challengeMutex sync.RWMutex challenges map[string][]byte - paymentProcessor *PaymentProcessor - sprocketManager *SprocketManager - policyManager *policy.P - spiderManager *spider.Spider - syncManager *dsync.Manager - relayGroupMgr *dsync.RelayGroupManager - clusterManager *dsync.ClusterManager - blossomServer *blossom.Server - InviteManager *nip43.InviteManager - cfg *config.C - db database.Database // Changed from *database.D to interface + // Message processing pause mutex for policy/follow list updates + // Use RLock() for normal message processing, Lock() for updates + messagePauseMutex sync.RWMutex + + paymentProcessor *PaymentProcessor + sprocketManager *SprocketManager + policyManager *policy.P + spiderManager *spider.Spider + directorySpider *spider.DirectorySpider + syncManager *dsync.Manager + relayGroupMgr *dsync.RelayGroupManager + clusterManager *dsync.ClusterManager + blossomServer *blossom.Server + InviteManager *nip43.InviteManager + cfg *config.C + db database.Database // Changed from *database.D to interface } // isIPBlacklisted checks if an IP address is blacklisted using the managed ACL system @@ -1135,3 +1140,32 @@ func (s *Server) updatePeerAdminACL(peerPubkey []byte) { } } } + +// ============================================================================= +// Message Processing Pause/Resume for Policy and Follow List Updates +// ============================================================================= + +// PauseMessageProcessing acquires an exclusive lock to pause all message processing. +// This should be called before updating policy configuration or follow lists. +// Call ResumeMessageProcessing to release the lock after updates are complete. +func (s *Server) PauseMessageProcessing() { + s.messagePauseMutex.Lock() +} + +// ResumeMessageProcessing releases the exclusive lock to resume message processing. +// This should be called after policy configuration or follow list updates are complete. +func (s *Server) ResumeMessageProcessing() { + s.messagePauseMutex.Unlock() +} + +// AcquireMessageProcessingLock acquires a read lock for normal message processing. +// This allows concurrent message processing while blocking during policy updates. +// Call ReleaseMessageProcessingLock when message processing is complete. +func (s *Server) AcquireMessageProcessingLock() { + s.messagePauseMutex.RLock() +} + +// ReleaseMessageProcessingLock releases the read lock after message processing. 
+func (s *Server) ReleaseMessageProcessingLock() { + s.messagePauseMutex.RUnlock() +} diff --git a/pkg/spider/directory.go b/pkg/spider/directory.go new file mode 100644 index 0000000..8420796 --- /dev/null +++ b/pkg/spider/directory.go @@ -0,0 +1,612 @@ +package spider + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "git.mleku.dev/mleku/nostr/crypto/keys" + "git.mleku.dev/mleku/nostr/encoders/event" + "git.mleku.dev/mleku/nostr/encoders/filter" + "git.mleku.dev/mleku/nostr/encoders/kind" + "git.mleku.dev/mleku/nostr/encoders/tag" + "git.mleku.dev/mleku/nostr/utils/normalize" + "git.mleku.dev/mleku/nostr/ws" + "lol.mleku.dev/chk" + "lol.mleku.dev/errorf" + "lol.mleku.dev/log" + "next.orly.dev/pkg/database" + "next.orly.dev/pkg/interfaces/publisher" + dsync "next.orly.dev/pkg/sync" +) + +const ( + // DirectorySpiderDefaultInterval is how often the directory spider runs + DirectorySpiderDefaultInterval = 24 * time.Hour + // DirectorySpiderDefaultMaxHops is the maximum hop distance for relay discovery + DirectorySpiderDefaultMaxHops = 3 + // DirectorySpiderRelayTimeout is the timeout for connecting to and querying a relay + DirectorySpiderRelayTimeout = 30 * time.Second + // DirectorySpiderQueryTimeout is the timeout for waiting for EOSE on a query + DirectorySpiderQueryTimeout = 60 * time.Second + // DirectorySpiderRelayDelay is the delay between processing relays (rate limiting) + DirectorySpiderRelayDelay = 500 * time.Millisecond + // DirectorySpiderMaxEventsPerQuery is the limit for each query + DirectorySpiderMaxEventsPerQuery = 5000 +) + +// DirectorySpider manages periodic relay discovery and metadata synchronization. +// It discovers relays by crawling kind 10002 (relay list) events, expanding outward +// in hops from seed pubkeys (whitelisted users), then fetches essential metadata +// events (kinds 0, 3, 10000, 10002) from all discovered relays. +type DirectorySpider struct { + ctx context.Context + cancel context.CancelFunc + db *database.D + pub publisher.I + + // Configuration + interval time.Duration + maxHops int + + // State + running atomic.Bool + lastRun time.Time + + // Relay discovery state (reset each run) + mu sync.Mutex + discoveredRelays map[string]int // URL -> hop distance + processedRelays map[string]bool // Already fetched metadata from + + // Self-detection + relayIdentityPubkey string + selfURLs map[string]bool + nip11Cache *dsync.NIP11Cache + + // Callback for getting seed pubkeys (whitelisted users) + getSeedPubkeys func() [][]byte + + // Trigger channel for manual runs + triggerChan chan struct{} +} + +// NewDirectorySpider creates a new DirectorySpider instance. 
+func NewDirectorySpider( + ctx context.Context, + db *database.D, + pub publisher.I, + interval time.Duration, + maxHops int, +) (ds *DirectorySpider, err error) { + if db == nil { + err = errorf.E("database cannot be nil") + return + } + + if interval <= 0 { + interval = DirectorySpiderDefaultInterval + } + if maxHops <= 0 { + maxHops = DirectorySpiderDefaultMaxHops + } + + ctx, cancel := context.WithCancel(ctx) + + // Get relay identity pubkey for self-detection + var relayPubkey string + if skb, err := db.GetRelayIdentitySecret(); err == nil && len(skb) == 32 { + pk, _ := keys.SecretBytesToPubKeyHex(skb) + relayPubkey = pk + } + + ds = &DirectorySpider{ + ctx: ctx, + cancel: cancel, + db: db, + pub: pub, + interval: interval, + maxHops: maxHops, + discoveredRelays: make(map[string]int), + processedRelays: make(map[string]bool), + relayIdentityPubkey: relayPubkey, + selfURLs: make(map[string]bool), + nip11Cache: dsync.NewNIP11Cache(30 * time.Minute), + triggerChan: make(chan struct{}, 1), + } + + return +} + +// SetSeedCallback sets the callback function for getting seed pubkeys (whitelisted users). +func (ds *DirectorySpider) SetSeedCallback(getSeedPubkeys func() [][]byte) { + ds.mu.Lock() + defer ds.mu.Unlock() + ds.getSeedPubkeys = getSeedPubkeys +} + +// Start begins the directory spider operation. +func (ds *DirectorySpider) Start() (err error) { + if ds.running.Load() { + err = errorf.E("directory spider already running") + return + } + + if ds.getSeedPubkeys == nil { + err = errorf.E("seed callback must be set before starting") + return + } + + ds.running.Store(true) + go ds.mainLoop() + + log.I.F("directory spider: started (interval: %v, max hops: %d)", ds.interval, ds.maxHops) + return +} + +// Stop stops the directory spider operation. +func (ds *DirectorySpider) Stop() { + if !ds.running.Load() { + return + } + + ds.running.Store(false) + ds.cancel() + + log.I.F("directory spider: stopped") +} + +// TriggerNow forces an immediate run of the directory spider. +func (ds *DirectorySpider) TriggerNow() { + select { + case ds.triggerChan <- struct{}{}: + log.I.F("directory spider: manual trigger sent") + default: + log.I.F("directory spider: trigger already pending") + } +} + +// LastRun returns the time of the last completed run. +func (ds *DirectorySpider) LastRun() time.Time { + ds.mu.Lock() + defer ds.mu.Unlock() + return ds.lastRun +} + +// mainLoop is the main spider loop that runs periodically. +func (ds *DirectorySpider) mainLoop() { + // Run immediately on start + ds.runOnce() + + ticker := time.NewTicker(ds.interval) + defer ticker.Stop() + + log.I.F("directory spider: main loop started, running every %v", ds.interval) + + for { + select { + case <-ds.ctx.Done(): + return + case <-ds.triggerChan: + log.I.F("directory spider: manual trigger received") + ds.runOnce() + case <-ticker.C: + log.I.F("directory spider: scheduled run triggered") + ds.runOnce() + } + } +} + +// runOnce performs a single directory spider run. 
+func (ds *DirectorySpider) runOnce() {
+	if !ds.running.Load() {
+		return
+	}
+
+	log.I.F("directory spider: starting run")
+	start := time.Now()
+
+	// Reset state for this run
+	ds.mu.Lock()
+	ds.discoveredRelays = make(map[string]int)
+	ds.processedRelays = make(map[string]bool)
+	ds.mu.Unlock()
+
+	// Phase 1: Discover relays via hop expansion
+	if err := ds.discoverRelays(); err != nil {
+		log.E.F("directory spider: relay discovery failed: %v", err)
+		return
+	}
+
+	ds.mu.Lock()
+	relayCount := len(ds.discoveredRelays)
+	ds.mu.Unlock()
+
+	log.I.F("directory spider: discovered %d relays", relayCount)
+
+	// Phase 2: Fetch metadata from all discovered relays
+	if err := ds.fetchMetadataFromRelays(); err != nil {
+		log.E.F("directory spider: metadata fetch failed: %v", err)
+		return
+	}
+
+	ds.mu.Lock()
+	ds.lastRun = time.Now()
+	ds.mu.Unlock()
+
+	log.I.F("directory spider: completed run in %v", time.Since(start))
+}
+
+// discoverRelays performs the multi-hop relay discovery.
+func (ds *DirectorySpider) discoverRelays() error {
+	// Get seed pubkeys from callback
+	seedPubkeys := ds.getSeedPubkeys()
+	if len(seedPubkeys) == 0 {
+		log.W.F("directory spider: no seed pubkeys available")
+		return nil
+	}
+
+	log.I.F("directory spider: starting relay discovery with %d seed pubkeys", len(seedPubkeys))
+
+	// Round 0: Get relay lists from seed pubkeys in local database
+	seedRelays, err := ds.getRelaysFromLocalDB(seedPubkeys)
+	if err != nil {
+		return errorf.W("failed to get relays from local DB: %v", err)
+	}
+
+	// Add seed relays at hop 0. isSelfRelay acquires ds.mu itself (and may do
+	// a NIP-11 lookup), so it is called before the lock is taken to avoid a
+	// self-deadlock on ds.mu.
+	for _, url := range seedRelays {
+		if ds.isSelfRelay(url) {
+			continue
+		}
+		ds.mu.Lock()
+		ds.discoveredRelays[url] = 0
+		ds.mu.Unlock()
+	}
+
+	log.I.F("directory spider: found %d seed relays from local database", len(seedRelays))
+
+	// Rounds 1 to maxHops: Expand outward
+	for hop := 1; hop <= ds.maxHops; hop++ {
+		select {
+		case <-ds.ctx.Done():
+			return ds.ctx.Err()
+		default:
+		}
+
+		// Get relays at previous hop level that haven't been processed
+		ds.mu.Lock()
+		var relaysToProcess []string
+		for url, hopLevel := range ds.discoveredRelays {
+			if hopLevel == hop-1 && !ds.processedRelays[url] {
+				relaysToProcess = append(relaysToProcess, url)
+			}
+		}
+		ds.mu.Unlock()
+
+		if len(relaysToProcess) == 0 {
+			log.I.F("directory spider: no relays to process at hop %d", hop)
+			break
+		}
+
+		log.I.F("directory spider: hop %d - processing %d relays", hop, len(relaysToProcess))
+
+		newRelaysThisHop := 0
+
+		// Process each relay serially
+		for _, relayURL := range relaysToProcess {
+			select {
+			case <-ds.ctx.Done():
+				return ds.ctx.Err()
+			default:
+			}
+
+			// Fetch kind 10002 events from this relay
+			events, err := ds.fetchRelayListsFromRelay(relayURL)
+			if err != nil {
+				log.W.F("directory spider: failed to fetch from %s: %v", relayURL, err)
+				// Mark as processed even on failure to avoid retrying
+				ds.mu.Lock()
+				ds.processedRelays[relayURL] = true
+				ds.mu.Unlock()
+				continue
+			}
+
+			// Extract new relay URLs and drop our own relay before taking the
+			// lock: isSelfRelay acquires ds.mu internally, so calling it while
+			// holding the lock would deadlock
+			newRelays := ds.extractRelaysFromEvents(events)
+			var candidates []string
+			for _, newURL := range newRelays {
+				if !ds.isSelfRelay(newURL) {
+					candidates = append(candidates, newURL)
+				}
+			}
+
+			ds.mu.Lock()
+			ds.processedRelays[relayURL] = true
+			for _, newURL := range candidates {
+				if _, exists := ds.discoveredRelays[newURL]; !exists {
+					ds.discoveredRelays[newURL] = hop
+					newRelaysThisHop++
+				}
+			}
+			ds.mu.Unlock()
+
+			// Rate limiting delay between relays
+			time.Sleep(DirectorySpiderRelayDelay)
+		}
+
+		log.I.F("directory spider: hop %d - discovered %d new relays", hop, newRelaysThisHop)
+	}
+
+	return nil
+}
+
+// getRelaysFromLocalDB queries the local
database for kind 10002 events from seed pubkeys. +func (ds *DirectorySpider) getRelaysFromLocalDB(seedPubkeys [][]byte) ([]string, error) { + ctx, cancel := context.WithTimeout(ds.ctx, 30*time.Second) + defer cancel() + + // Query for kind 10002 from seed pubkeys + f := &filter.F{ + Authors: tag.NewFromBytesSlice(seedPubkeys...), + Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)), + } + + events, err := ds.db.QueryEvents(ctx, f) + if err != nil { + return nil, err + } + + return ds.extractRelaysFromEvents(events), nil +} + +// fetchRelayListsFromRelay connects to a relay and fetches all kind 10002 events. +func (ds *DirectorySpider) fetchRelayListsFromRelay(relayURL string) ([]*event.E, error) { + ctx, cancel := context.WithTimeout(ds.ctx, DirectorySpiderRelayTimeout) + defer cancel() + + log.D.F("directory spider: connecting to %s", relayURL) + + client, err := ws.RelayConnect(ctx, relayURL) + if err != nil { + return nil, errorf.W("failed to connect: %v", err) + } + defer client.Close() + + // Query for all kind 10002 events + limit := uint(DirectorySpiderMaxEventsPerQuery) + f := filter.NewS(&filter.F{ + Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)), + Limit: &limit, + }) + + sub, err := client.Subscribe(ctx, f) + if err != nil { + return nil, errorf.W("failed to subscribe: %v", err) + } + defer sub.Unsub() + + var events []*event.E + queryCtx, queryCancel := context.WithTimeout(ctx, DirectorySpiderQueryTimeout) + defer queryCancel() + + // Collect events until EOSE or timeout + for { + select { + case <-queryCtx.Done(): + log.D.F("directory spider: query timeout for %s, got %d events", relayURL, len(events)) + return events, nil + case <-sub.EndOfStoredEvents: + log.D.F("directory spider: EOSE from %s, got %d events", relayURL, len(events)) + return events, nil + case ev := <-sub.Events: + if ev == nil { + return events, nil + } + events = append(events, ev) + } + } +} + +// extractRelaysFromEvents parses kind 10002 events and extracts relay URLs from "r" tags. +func (ds *DirectorySpider) extractRelaysFromEvents(events []*event.E) []string { + seen := make(map[string]bool) + var relays []string + + for _, ev := range events { + // Get all "r" tags + rTags := ev.Tags.GetAll([]byte("r")) + for _, rTag := range rTags { + if len(rTag.T) < 2 { + continue + } + urlBytes := rTag.Value() + if len(urlBytes) == 0 { + continue + } + + // Normalize the URL + normalized := string(normalize.URL(string(urlBytes))) + if normalized == "" { + continue + } + + if !seen[normalized] { + seen[normalized] = true + relays = append(relays, normalized) + } + } + } + + return relays +} + +// fetchMetadataFromRelays iterates through all discovered relays and fetches metadata. 
+func (ds *DirectorySpider) fetchMetadataFromRelays() error { + ds.mu.Lock() + // Copy relay list to avoid holding lock during network operations + var relays []string + for url := range ds.discoveredRelays { + relays = append(relays, url) + } + ds.mu.Unlock() + + log.I.F("directory spider: fetching metadata from %d relays", len(relays)) + + // Kinds to fetch: 0 (profile), 3 (follow list), 10000 (mute list), 10002 (relay list) + kindsToFetch := []uint16{ + kind.ProfileMetadata.K, // 0 + kind.FollowList.K, // 3 + kind.MuteList.K, // 10000 + kind.RelayListMetadata.K, // 10002 + } + + totalSaved := 0 + totalDuplicates := 0 + + for _, relayURL := range relays { + select { + case <-ds.ctx.Done(): + return ds.ctx.Err() + default: + } + + ds.mu.Lock() + alreadyProcessed := ds.processedRelays[relayURL] + ds.mu.Unlock() + + if alreadyProcessed { + continue + } + + log.D.F("directory spider: fetching metadata from %s", relayURL) + + for _, k := range kindsToFetch { + select { + case <-ds.ctx.Done(): + return ds.ctx.Err() + default: + } + + events, err := ds.fetchKindFromRelay(relayURL, k) + if err != nil { + log.W.F("directory spider: failed to fetch kind %d from %s: %v", k, relayURL, err) + continue + } + + saved, duplicates := ds.storeEvents(events) + totalSaved += saved + totalDuplicates += duplicates + + log.D.F("directory spider: kind %d from %s: %d saved, %d duplicates", + k, relayURL, saved, duplicates) + } + + ds.mu.Lock() + ds.processedRelays[relayURL] = true + ds.mu.Unlock() + + // Rate limiting delay between relays + time.Sleep(DirectorySpiderRelayDelay) + } + + log.I.F("directory spider: metadata fetch complete - %d events saved, %d duplicates", + totalSaved, totalDuplicates) + + return nil +} + +// fetchKindFromRelay connects to a relay and fetches events of a specific kind. +func (ds *DirectorySpider) fetchKindFromRelay(relayURL string, k uint16) ([]*event.E, error) { + ctx, cancel := context.WithTimeout(ds.ctx, DirectorySpiderRelayTimeout) + defer cancel() + + client, err := ws.RelayConnect(ctx, relayURL) + if err != nil { + return nil, errorf.W("failed to connect: %v", err) + } + defer client.Close() + + // Query for events of this kind + limit := uint(DirectorySpiderMaxEventsPerQuery) + f := filter.NewS(&filter.F{ + Kinds: kind.NewS(kind.New(k)), + Limit: &limit, + }) + + sub, err := client.Subscribe(ctx, f) + if err != nil { + return nil, errorf.W("failed to subscribe: %v", err) + } + defer sub.Unsub() + + var events []*event.E + queryCtx, queryCancel := context.WithTimeout(ctx, DirectorySpiderQueryTimeout) + defer queryCancel() + + for { + select { + case <-queryCtx.Done(): + return events, nil + case <-sub.EndOfStoredEvents: + return events, nil + case ev := <-sub.Events: + if ev == nil { + return events, nil + } + events = append(events, ev) + } + } +} + +// storeEvents saves events to the database and publishes new ones. +func (ds *DirectorySpider) storeEvents(events []*event.E) (saved, duplicates int) { + for _, ev := range events { + _, err := ds.db.SaveEvent(ds.ctx, ev) + if err != nil { + if chk.T(err) { + // Most errors are duplicates, which is expected + duplicates++ + } + continue + } + saved++ + + // Publish event to active subscribers + if ds.pub != nil { + go ds.pub.Deliver(ev) + } + } + return +} + +// isSelfRelay checks if a relay URL is ourselves by comparing NIP-11 pubkeys. 
+func (ds *DirectorySpider) isSelfRelay(relayURL string) bool { + // If we don't have a relay identity pubkey, can't compare + if ds.relayIdentityPubkey == "" { + return false + } + + ds.mu.Lock() + // Fast path: check if we already know this URL is ours + if ds.selfURLs[relayURL] { + ds.mu.Unlock() + return true + } + ds.mu.Unlock() + + // Slow path: check via NIP-11 pubkey + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + peerPubkey, err := ds.nip11Cache.GetPubkey(ctx, relayURL) + if err != nil { + // Can't determine, assume not self + return false + } + + if peerPubkey == ds.relayIdentityPubkey { + log.D.F("directory spider: discovered self-relay: %s", relayURL) + ds.mu.Lock() + ds.selfURLs[relayURL] = true + ds.mu.Unlock() + return true + } + + return false +} diff --git a/pkg/spider/directory_test.go b/pkg/spider/directory_test.go new file mode 100644 index 0000000..7ca088c --- /dev/null +++ b/pkg/spider/directory_test.go @@ -0,0 +1,166 @@ +package spider + +import ( + "context" + "testing" + "time" + + "git.mleku.dev/mleku/nostr/encoders/event" + "git.mleku.dev/mleku/nostr/encoders/kind" + "git.mleku.dev/mleku/nostr/encoders/tag" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestExtractRelaysFromEvents(t *testing.T) { + ds := &DirectorySpider{} + + tests := []struct { + name string + events []*event.E + expected []string + }{ + { + name: "empty events", + events: []*event.E{}, + expected: []string{}, + }, + { + name: "single event with relays", + events: []*event.E{ + { + Kind: kind.RelayListMetadata.K, + Tags: &tag.S{ + tag.NewFromBytesSlice([]byte("r"), []byte("wss://relay1.example.com")), + tag.NewFromBytesSlice([]byte("r"), []byte("wss://relay2.example.com")), + }, + }, + }, + expected: []string{"wss://relay1.example.com", "wss://relay2.example.com"}, + }, + { + name: "multiple events with duplicate relays", + events: []*event.E{ + { + Kind: kind.RelayListMetadata.K, + Tags: &tag.S{ + tag.NewFromBytesSlice([]byte("r"), []byte("wss://relay1.example.com")), + }, + }, + { + Kind: kind.RelayListMetadata.K, + Tags: &tag.S{ + tag.NewFromBytesSlice([]byte("r"), []byte("wss://relay1.example.com")), + tag.NewFromBytesSlice([]byte("r"), []byte("wss://relay3.example.com")), + }, + }, + }, + expected: []string{"wss://relay1.example.com", "wss://relay3.example.com"}, + }, + { + name: "event with empty r tags", + events: []*event.E{ + { + Kind: kind.RelayListMetadata.K, + Tags: &tag.S{ + tag.NewFromBytesSlice([]byte("r")), // empty value + tag.NewFromBytesSlice([]byte("r"), []byte("wss://valid.relay.com")), + }, + }, + }, + expected: []string{"wss://valid.relay.com"}, + }, + { + name: "normalizes relay URLs", + events: []*event.E{ + { + Kind: kind.RelayListMetadata.K, + Tags: &tag.S{ + tag.NewFromBytesSlice([]byte("r"), []byte("wss://relay.example.com")), + tag.NewFromBytesSlice([]byte("r"), []byte("wss://relay.example.com/")), // duplicate with trailing slash + }, + }, + }, + expected: []string{"wss://relay.example.com"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := ds.extractRelaysFromEvents(tt.events) + + // For empty case, check length + if len(tt.expected) == 0 { + assert.Empty(t, result) + return + } + + // Check that all expected relays are present (order may vary) + assert.Len(t, result, len(tt.expected)) + for _, expected := range tt.expected { + assert.Contains(t, result, expected) + } + }) + } +} + +func TestDirectorySpiderLifecycle(t *testing.T) { + 
ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create spider without database (will return error) + _, err := NewDirectorySpider(ctx, nil, nil, 0, 0) + require.Error(t, err) + assert.Contains(t, err.Error(), "database cannot be nil") +} + +func TestDirectorySpiderDefaults(t *testing.T) { + // Test that defaults are applied correctly + assert.Equal(t, 24*time.Hour, DirectorySpiderDefaultInterval) + assert.Equal(t, 3, DirectorySpiderDefaultMaxHops) + assert.Equal(t, 30*time.Second, DirectorySpiderRelayTimeout) + assert.Equal(t, 60*time.Second, DirectorySpiderQueryTimeout) + assert.Equal(t, 500*time.Millisecond, DirectorySpiderRelayDelay) + assert.Equal(t, 5000, DirectorySpiderMaxEventsPerQuery) +} + +func TestTriggerNow(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ds := &DirectorySpider{ + ctx: ctx, + triggerChan: make(chan struct{}, 1), + } + + // First trigger should succeed + ds.TriggerNow() + + // Verify trigger was sent + select { + case <-ds.triggerChan: + // Expected + default: + t.Error("trigger was not sent") + } + + // Second trigger while channel is empty should also succeed + ds.TriggerNow() + + // But if we trigger again without draining, it should not block + ds.TriggerNow() // Should not block due to select default case +} + +func TestLastRun(t *testing.T) { + ds := &DirectorySpider{} + + // Initially should be zero + assert.True(t, ds.LastRun().IsZero()) + + // Set a time + now := time.Now() + ds.lastRun = now + + // Should return the set time + assert.Equal(t, now, ds.LastRun()) +}
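
The sketches below are illustrative only and are not part of the change set above. The first shows, assuming the host application already holds the *database.D, the publisher.I, and a set of seed pubkeys (app/main.go derives them from the follows ACL or the admin keys), how the new DirectorySpider API is wired together with the same 24h/3-hop values used as defaults for the ORLY_DIRECTORY_SPIDER_* settings:

	package example

	import (
		"context"
		"time"

		"next.orly.dev/pkg/database"
		"next.orly.dev/pkg/interfaces/publisher"
		"next.orly.dev/pkg/spider"
	)

	// runDirectorySpider constructs the spider, installs a seed callback,
	// starts the periodic loop, and optionally forces an immediate crawl.
	func runDirectorySpider(ctx context.Context, db *database.D, pub publisher.I, seeds [][]byte) error {
		ds, err := spider.NewDirectorySpider(ctx, db, pub, 24*time.Hour, 3)
		if err != nil {
			return err
		}
		// The seed callback supplies the pubkeys whose kind 10002 relay lists
		// bootstrap hop-0 discovery; a static slice stands in for the follows
		// ACL used in app/main.go.
		ds.SetSeedCallback(func() [][]byte { return seeds })
		if err = ds.Start(); err != nil {
			return err
		}
		defer ds.Stop()

		ds.TriggerNow() // optional: run immediately instead of waiting for the ticker

		<-ctx.Done()
		return nil
	}

The second sketch shows the intended discipline for the new messagePauseMutex added to Server; the helper names are hypothetical and only illustrate which side of the RWMutex each caller should use:

	// applyPolicyUpdate takes the exclusive lock, so it waits for in-flight
	// messages to release their shared locks before running the update.
	func (s *Server) applyPolicyUpdate(update func()) {
		s.PauseMessageProcessing()
		defer s.ResumeMessageProcessing()
		update()
	}

	// handleIncoming takes the shared lock, so concurrent messages proceed
	// normally but block while a policy or follow list update is in progress.
	func (s *Server) handleIncoming(process func()) {
		s.AcquireMessageProcessingLock()
		defer s.ReleaseMessageProcessingLock()
		process()
	}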