package spider

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"git.mleku.dev/mleku/nostr/crypto/keys"
	"git.mleku.dev/mleku/nostr/encoders/event"
	"git.mleku.dev/mleku/nostr/encoders/filter"
	"git.mleku.dev/mleku/nostr/encoders/kind"
	"git.mleku.dev/mleku/nostr/encoders/tag"
	"git.mleku.dev/mleku/nostr/utils/normalize"
	"git.mleku.dev/mleku/nostr/ws"
	"lol.mleku.dev/chk"
	"lol.mleku.dev/errorf"
	"lol.mleku.dev/log"
	"next.orly.dev/pkg/database"
	"next.orly.dev/pkg/interfaces/publisher"
	dsync "next.orly.dev/pkg/sync"
)

const (
	// DirectorySpiderDefaultInterval is how often the directory spider runs.
	DirectorySpiderDefaultInterval = 24 * time.Hour
	// DirectorySpiderDefaultMaxHops is the maximum hop distance for relay discovery.
	DirectorySpiderDefaultMaxHops = 3
	// DirectorySpiderRelayTimeout is the timeout for connecting to and querying a relay.
	DirectorySpiderRelayTimeout = 30 * time.Second
	// DirectorySpiderQueryTimeout is the timeout for waiting for EOSE on a query.
	DirectorySpiderQueryTimeout = 60 * time.Second
	// DirectorySpiderRelayDelay is the delay between processing relays (rate limiting).
	DirectorySpiderRelayDelay = 500 * time.Millisecond
	// DirectorySpiderMaxEventsPerQuery is the limit for each query.
	DirectorySpiderMaxEventsPerQuery = 5000
)

// DirectorySpider manages periodic relay discovery and metadata synchronization.
// It discovers relays by crawling kind 10002 (relay list) events, expanding
// outward in hops from seed pubkeys (whitelisted users), then fetches essential
// metadata events (kinds 0, 3, 10000, 10002) from all discovered relays.
type DirectorySpider struct {
	ctx    context.Context
	cancel context.CancelFunc
	db     *database.D
	pub    publisher.I

	// Configuration
	interval time.Duration
	maxHops  int

	// State
	running atomic.Bool
	lastRun time.Time

	// Relay discovery state (reset each run)
	mu               sync.Mutex
	discoveredRelays map[string]int  // URL -> hop distance
	processedRelays  map[string]bool // Already fetched metadata from

	// Self-detection
	relayIdentityPubkey string
	selfURLs            map[string]bool
	nip11Cache          *dsync.NIP11Cache

	// Callback for getting seed pubkeys (whitelisted users)
	getSeedPubkeys func() [][]byte

	// Trigger channel for manual runs
	triggerChan chan struct{}
}

// NewDirectorySpider creates a new DirectorySpider instance.
func NewDirectorySpider(
	ctx context.Context, db *database.D, pub publisher.I,
	interval time.Duration, maxHops int,
) (ds *DirectorySpider, err error) {
	if db == nil {
		err = errorf.E("database cannot be nil")
		return
	}
	if interval <= 0 {
		interval = DirectorySpiderDefaultInterval
	}
	if maxHops <= 0 {
		maxHops = DirectorySpiderDefaultMaxHops
	}
	ctx, cancel := context.WithCancel(ctx)
	// Get relay identity pubkey for self-detection
	var relayPubkey string
	if skb, err := db.GetRelayIdentitySecret(); err == nil && len(skb) == 32 {
		pk, _ := keys.SecretBytesToPubKeyHex(skb)
		relayPubkey = pk
	}
	ds = &DirectorySpider{
		ctx:                 ctx,
		cancel:              cancel,
		db:                  db,
		pub:                 pub,
		interval:            interval,
		maxHops:             maxHops,
		discoveredRelays:    make(map[string]int),
		processedRelays:     make(map[string]bool),
		relayIdentityPubkey: relayPubkey,
		selfURLs:            make(map[string]bool),
		nip11Cache:          dsync.NewNIP11Cache(30 * time.Minute),
		triggerChan:         make(chan struct{}, 1),
	}
	return
}

// SetSeedCallback sets the callback function for getting seed pubkeys
// (whitelisted users).
func (ds *DirectorySpider) SetSeedCallback(getSeedPubkeys func() [][]byte) {
	ds.mu.Lock()
	defer ds.mu.Unlock()
	ds.getSeedPubkeys = getSeedPubkeys
}
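// Typical wiring, as a minimal sketch; the database, publisher, and the
// whitelist accessor (loadWhitelistedPubkeys) are assumed to exist elsewhere
// and are hypothetical here:
//
//	ds, err := NewDirectorySpider(ctx, db, pub, 0, 0) // zero values select the defaults
//	if err != nil {
//		return err
//	}
//	ds.SetSeedCallback(loadWhitelistedPubkeys)
//	if err = ds.Start(); err != nil {
//		return err
//	}
//	defer ds.Stop()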
// Start begins the directory spider operation.
func (ds *DirectorySpider) Start() (err error) {
	if ds.running.Load() {
		err = errorf.E("directory spider already running")
		return
	}
	if ds.getSeedPubkeys == nil {
		err = errorf.E("seed callback must be set before starting")
		return
	}
	ds.running.Store(true)
	go ds.mainLoop()
	log.I.F("directory spider: started (interval: %v, max hops: %d)",
		ds.interval, ds.maxHops)
	return
}

// Stop stops the directory spider operation.
func (ds *DirectorySpider) Stop() {
	if !ds.running.Load() {
		return
	}
	ds.running.Store(false)
	ds.cancel()
	log.I.F("directory spider: stopped")
}

// TriggerNow forces an immediate run of the directory spider.
func (ds *DirectorySpider) TriggerNow() {
	select {
	case ds.triggerChan <- struct{}{}:
		log.I.F("directory spider: manual trigger sent")
	default:
		log.I.F("directory spider: trigger already pending")
	}
}

// LastRun returns the time of the last completed run.
func (ds *DirectorySpider) LastRun() time.Time {
	ds.mu.Lock()
	defer ds.mu.Unlock()
	return ds.lastRun
}

// mainLoop is the main spider loop that runs periodically.
func (ds *DirectorySpider) mainLoop() {
	// Run immediately on start
	ds.runOnce()
	ticker := time.NewTicker(ds.interval)
	defer ticker.Stop()
	log.I.F("directory spider: main loop started, running every %v", ds.interval)
	for {
		select {
		case <-ds.ctx.Done():
			return
		case <-ds.triggerChan:
			log.I.F("directory spider: manual trigger received")
			ds.runOnce()
		case <-ticker.C:
			log.I.F("directory spider: scheduled run triggered")
			ds.runOnce()
		}
	}
}

// runOnce performs a single directory spider run.
func (ds *DirectorySpider) runOnce() {
	if !ds.running.Load() {
		return
	}
	log.I.F("directory spider: starting run")
	start := time.Now()

	// Reset state for this run
	ds.mu.Lock()
	ds.discoveredRelays = make(map[string]int)
	ds.processedRelays = make(map[string]bool)
	ds.mu.Unlock()

	// Phase 1: Discover relays via hop expansion
	if err := ds.discoverRelays(); err != nil {
		log.E.F("directory spider: relay discovery failed: %v", err)
		return
	}
	ds.mu.Lock()
	relayCount := len(ds.discoveredRelays)
	// Discovery reuses processedRelays to track which relays have already been
	// queried for relay lists; clear it so the metadata phase visits every
	// discovered relay rather than only those at the outermost hop.
	ds.processedRelays = make(map[string]bool)
	ds.mu.Unlock()
	log.I.F("directory spider: discovered %d relays", relayCount)

	// Phase 2: Fetch metadata from all discovered relays
	if err := ds.fetchMetadataFromRelays(); err != nil {
		log.E.F("directory spider: metadata fetch failed: %v", err)
		return
	}

	ds.mu.Lock()
	ds.lastRun = time.Now()
	ds.mu.Unlock()
	log.I.F("directory spider: completed run in %v", time.Since(start))
}
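// Manual runs can be wired to an operator surface via TriggerNow; a minimal
// sketch using net/http (the mux variable and route are hypothetical, not
// part of this package):
//
//	mux.HandleFunc("/admin/spider/run", func(w http.ResponseWriter, r *http.Request) {
//		ds.TriggerNow()
//		w.WriteHeader(http.StatusAccepted)
//	})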
// discoverRelays performs the multi-hop relay discovery.
func (ds *DirectorySpider) discoverRelays() error {
	// Get seed pubkeys from callback
	seedPubkeys := ds.getSeedPubkeys()
	if len(seedPubkeys) == 0 {
		log.W.F("directory spider: no seed pubkeys available")
		return nil
	}
	log.I.F("directory spider: starting relay discovery with %d seed pubkeys",
		len(seedPubkeys))

	// Round 0: Get relay lists from seed pubkeys in local database
	seedRelays, err := ds.getRelaysFromLocalDB(seedPubkeys)
	if err != nil {
		return errorf.W("failed to get relays from local DB: %v", err)
	}

	// Add seed relays at hop 0. isSelfRelay acquires ds.mu itself (and may
	// perform a NIP-11 lookup), so it must not be called while holding the
	// lock; calling it under ds.mu would deadlock.
	for _, url := range seedRelays {
		if ds.isSelfRelay(url) {
			continue
		}
		ds.mu.Lock()
		ds.discoveredRelays[url] = 0
		ds.mu.Unlock()
	}
	log.I.F("directory spider: found %d seed relays from local database",
		len(seedRelays))

	// Rounds 1 to maxHops: Expand outward
	for hop := 1; hop <= ds.maxHops; hop++ {
		select {
		case <-ds.ctx.Done():
			return ds.ctx.Err()
		default:
		}

		// Get relays at the previous hop level that haven't been processed
		ds.mu.Lock()
		var relaysToProcess []string
		for url, hopLevel := range ds.discoveredRelays {
			if hopLevel == hop-1 && !ds.processedRelays[url] {
				relaysToProcess = append(relaysToProcess, url)
			}
		}
		ds.mu.Unlock()

		if len(relaysToProcess) == 0 {
			log.I.F("directory spider: no relays to process at hop %d", hop)
			break
		}
		log.I.F("directory spider: hop %d - processing %d relays",
			hop, len(relaysToProcess))
		newRelaysThisHop := 0

		// Process each relay serially
		for _, relayURL := range relaysToProcess {
			select {
			case <-ds.ctx.Done():
				return ds.ctx.Err()
			default:
			}

			// Fetch kind 10002 events from this relay
			events, err := ds.fetchRelayListsFromRelay(relayURL)
			if err != nil {
				log.W.F("directory spider: failed to fetch from %s: %v",
					relayURL, err)
				// Mark as processed even on failure to avoid retrying
				ds.mu.Lock()
				ds.processedRelays[relayURL] = true
				ds.mu.Unlock()
				continue
			}

			// Extract relay URLs and note which are not yet known
			newRelays := ds.extractRelaysFromEvents(events)
			ds.mu.Lock()
			ds.processedRelays[relayURL] = true
			var candidates []string
			for _, newURL := range newRelays {
				if _, exists := ds.discoveredRelays[newURL]; !exists {
					candidates = append(candidates, newURL)
				}
			}
			ds.mu.Unlock()

			// isSelfRelay takes ds.mu and may hit the network, so check the
			// candidates outside the critical section, then re-verify under
			// the lock before inserting.
			for _, newURL := range candidates {
				if ds.isSelfRelay(newURL) {
					continue
				}
				ds.mu.Lock()
				if _, exists := ds.discoveredRelays[newURL]; !exists {
					ds.discoveredRelays[newURL] = hop
					newRelaysThisHop++
				}
				ds.mu.Unlock()
			}

			// Rate limiting delay between relays
			time.Sleep(DirectorySpiderRelayDelay)
		}
		log.I.F("directory spider: hop %d - discovered %d new relays",
			hop, newRelaysThisHop)
	}
	return nil
}

// getRelaysFromLocalDB queries the local database for kind 10002 events from
// seed pubkeys.
func (ds *DirectorySpider) getRelaysFromLocalDB(seedPubkeys [][]byte) ([]string, error) {
	ctx, cancel := context.WithTimeout(ds.ctx, 30*time.Second)
	defer cancel()
	// Query for kind 10002 from seed pubkeys
	f := &filter.F{
		Authors: tag.NewFromBytesSlice(seedPubkeys...),
		Kinds:   kind.NewS(kind.New(kind.RelayListMetadata.K)),
	}
	events, err := ds.db.QueryEvents(ctx, f)
	if err != nil {
		return nil, err
	}
	return ds.extractRelaysFromEvents(events), nil
}
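// Worked example of the hop expansion, with illustrative relay names:
//
//	hop 0: seeds' local 10002 lists name {r1, r2}        -> frontier {r1, r2}
//	hop 1: query r1, r2 for 10002; new URLs {r3, r4, r5} -> frontier {r3, r4, r5}
//	hop 2: query r3, r4, r5; no new URLs                 -> loop exits early
//
// Each relay is queried at most once per run, and relays that fail are marked
// processed so they are not retried within the same run.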
// fetchRelayListsFromRelay connects to a relay and fetches all kind 10002
// events.
func (ds *DirectorySpider) fetchRelayListsFromRelay(relayURL string) ([]*event.E, error) {
	ctx, cancel := context.WithTimeout(ds.ctx, DirectorySpiderRelayTimeout)
	defer cancel()
	log.D.F("directory spider: connecting to %s", relayURL)
	client, err := ws.RelayConnect(ctx, relayURL)
	if err != nil {
		return nil, errorf.W("failed to connect: %v", err)
	}
	defer client.Close()

	// Query for all kind 10002 events
	limit := uint(DirectorySpiderMaxEventsPerQuery)
	f := filter.NewS(&filter.F{
		Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)),
		Limit: &limit,
	})
	sub, err := client.Subscribe(ctx, f)
	if err != nil {
		return nil, errorf.W("failed to subscribe: %v", err)
	}
	defer sub.Unsub()

	var events []*event.E
	// Note: queryCtx derives from ctx, so the effective wait is capped by
	// DirectorySpiderRelayTimeout even though DirectorySpiderQueryTimeout is
	// longer.
	queryCtx, queryCancel := context.WithTimeout(ctx, DirectorySpiderQueryTimeout)
	defer queryCancel()

	// Collect events until EOSE or timeout
	for {
		select {
		case <-queryCtx.Done():
			log.D.F("directory spider: query timeout for %s, got %d events",
				relayURL, len(events))
			return events, nil
		case <-sub.EndOfStoredEvents:
			log.D.F("directory spider: EOSE from %s, got %d events",
				relayURL, len(events))
			return events, nil
		case ev := <-sub.Events:
			if ev == nil {
				return events, nil
			}
			events = append(events, ev)
		}
	}
}

// extractRelaysFromEvents parses kind 10002 events and extracts relay URLs
// from "r" tags.
func (ds *DirectorySpider) extractRelaysFromEvents(events []*event.E) []string {
	seen := make(map[string]bool)
	var relays []string
	for _, ev := range events {
		// Get all "r" tags
		rTags := ev.Tags.GetAll([]byte("r"))
		for _, rTag := range rTags {
			if len(rTag.T) < 2 {
				continue
			}
			urlBytes := rTag.Value()
			if len(urlBytes) == 0 {
				continue
			}
			// Normalize the URL
			normalized := string(normalize.URL(string(urlBytes)))
			if normalized == "" {
				continue
			}
			if !seen[normalized] {
				seen[normalized] = true
				relays = append(relays, normalized)
			}
		}
	}
	return relays
}
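// For reference, the "r" tags parsed above come from NIP-65 relay list events,
// which look like (abridged):
//
//	{
//	  "kind": 10002,
//	  "tags": [
//	    ["r", "wss://relay.example.com"],
//	    ["r", "wss://other.example.net", "read"]
//	  ]
//	}
//
// The optional third element ("read"/"write") is ignored here; every listed
// URL is treated as a discovery candidate.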
// fetchMetadataFromRelays iterates through all discovered relays and fetches
// metadata.
func (ds *DirectorySpider) fetchMetadataFromRelays() error {
	ds.mu.Lock()
	// Copy relay list to avoid holding lock during network operations
	var relays []string
	for url := range ds.discoveredRelays {
		relays = append(relays, url)
	}
	ds.mu.Unlock()
	log.I.F("directory spider: fetching metadata from %d relays", len(relays))

	// Kinds to fetch: 0 (profile), 3 (follow list), 10000 (mute list),
	// 10002 (relay list)
	kindsToFetch := []uint16{
		kind.ProfileMetadata.K,   // 0
		kind.FollowList.K,        // 3
		kind.MuteList.K,          // 10000
		kind.RelayListMetadata.K, // 10002
	}

	totalSaved := 0
	totalDuplicates := 0
	for _, relayURL := range relays {
		select {
		case <-ds.ctx.Done():
			return ds.ctx.Err()
		default:
		}

		ds.mu.Lock()
		alreadyProcessed := ds.processedRelays[relayURL]
		ds.mu.Unlock()
		if alreadyProcessed {
			continue
		}
		log.D.F("directory spider: fetching metadata from %s", relayURL)

		for _, k := range kindsToFetch {
			select {
			case <-ds.ctx.Done():
				return ds.ctx.Err()
			default:
			}
			events, err := ds.fetchKindFromRelay(relayURL, k)
			if err != nil {
				log.W.F("directory spider: failed to fetch kind %d from %s: %v",
					k, relayURL, err)
				continue
			}
			saved, duplicates := ds.storeEvents(events)
			totalSaved += saved
			totalDuplicates += duplicates
			log.D.F("directory spider: kind %d from %s: %d saved, %d duplicates",
				k, relayURL, saved, duplicates)
		}

		ds.mu.Lock()
		ds.processedRelays[relayURL] = true
		ds.mu.Unlock()

		// Rate limiting delay between relays
		time.Sleep(DirectorySpiderRelayDelay)
	}
	log.I.F("directory spider: metadata fetch complete - %d events saved, %d duplicates",
		totalSaved, totalDuplicates)
	return nil
}

// fetchKindFromRelay connects to a relay and fetches events of a specific kind.
func (ds *DirectorySpider) fetchKindFromRelay(relayURL string, k uint16) ([]*event.E, error) {
	ctx, cancel := context.WithTimeout(ds.ctx, DirectorySpiderRelayTimeout)
	defer cancel()
	client, err := ws.RelayConnect(ctx, relayURL)
	if err != nil {
		return nil, errorf.W("failed to connect: %v", err)
	}
	defer client.Close()

	// Query for events of this kind
	limit := uint(DirectorySpiderMaxEventsPerQuery)
	f := filter.NewS(&filter.F{
		Kinds: kind.NewS(kind.New(k)),
		Limit: &limit,
	})
	sub, err := client.Subscribe(ctx, f)
	if err != nil {
		return nil, errorf.W("failed to subscribe: %v", err)
	}
	defer sub.Unsub()

	var events []*event.E
	queryCtx, queryCancel := context.WithTimeout(ctx, DirectorySpiderQueryTimeout)
	defer queryCancel()
	for {
		select {
		case <-queryCtx.Done():
			return events, nil
		case <-sub.EndOfStoredEvents:
			return events, nil
		case ev := <-sub.Events:
			if ev == nil {
				return events, nil
			}
			events = append(events, ev)
		}
	}
}

// storeEvents saves events to the database and publishes new ones.
func (ds *DirectorySpider) storeEvents(events []*event.E) (saved, duplicates int) {
	for _, ev := range events {
		_, err := ds.db.SaveEvent(ds.ctx, ev)
		if err != nil {
			if chk.T(err) {
				// Most errors are duplicates, which is expected
				duplicates++
			}
			continue
		}
		saved++
		// Publish event to active subscribers
		if ds.pub != nil {
			go ds.pub.Deliver(ev)
		}
	}
	return
}
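// The subscription filter issued by fetchKindFromRelay is, on the wire, a
// NIP-01 REQ filter (shown here for kind 0 with the default limit; the
// subscription id is a placeholder):
//
//	["REQ", "<subscription id>", {"kinds": [0], "limit": 5000}]
//
// Running one query per kind gives each kind its own
// DirectorySpiderMaxEventsPerQuery budget; a single filter with
// "kinds": [0, 3, 10000, 10002] would share one limit across all four kinds.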
// isSelfRelay reports whether a relay URL points at this relay itself, by
// comparing NIP-11 pubkeys.
func (ds *DirectorySpider) isSelfRelay(relayURL string) bool {
	// If we don't have a relay identity pubkey, we can't compare
	if ds.relayIdentityPubkey == "" {
		return false
	}

	// Fast path: check if we already know this URL is ours
	ds.mu.Lock()
	if ds.selfURLs[relayURL] {
		ds.mu.Unlock()
		return true
	}
	ds.mu.Unlock()

	// Slow path: check via NIP-11 pubkey. Derive from ds.ctx so the lookup is
	// also canceled on shutdown, not just by the short timeout.
	ctx, cancel := context.WithTimeout(ds.ctx, 5*time.Second)
	defer cancel()
	peerPubkey, err := ds.nip11Cache.GetPubkey(ctx, relayURL)
	if err != nil {
		// Can't determine, assume not self
		return false
	}
	if peerPubkey == ds.relayIdentityPubkey {
		log.D.F("directory spider: discovered self-relay: %s", relayURL)
		ds.mu.Lock()
		ds.selfURLs[relayURL] = true
		ds.mu.Unlock()
		return true
	}
	return false
}
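// Self-detection rests on the NIP-11 relay information document, served over
// HTTP with "Accept: application/nostr+json". Its "pubkey" field names the
// relay's administrative key, which is compared against relayIdentityPubkey.
// Abridged example document:
//
//	{"name": "orly", "pubkey": "ae1234...", "supported_nips": [1, 11]}
//
// The NIP11Cache is assumed to memoize one lookup per URL for its TTL
// (30 minutes here), so repeated isSelfRelay calls stay cheap.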