// next.orly.dev/pkg/spider/directory.go

package spider

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"git.mleku.dev/mleku/nostr/crypto/keys"
	"git.mleku.dev/mleku/nostr/encoders/event"
	"git.mleku.dev/mleku/nostr/encoders/filter"
	"git.mleku.dev/mleku/nostr/encoders/kind"
	"git.mleku.dev/mleku/nostr/encoders/tag"
	"git.mleku.dev/mleku/nostr/utils/normalize"
	"git.mleku.dev/mleku/nostr/ws"
	"lol.mleku.dev/chk"
	"lol.mleku.dev/errorf"
	"lol.mleku.dev/log"
	"next.orly.dev/pkg/database"
	"next.orly.dev/pkg/interfaces/publisher"
	dsync "next.orly.dev/pkg/sync"
)

const (
	// DirectorySpiderDefaultInterval is how often the directory spider runs.
	DirectorySpiderDefaultInterval = 24 * time.Hour
	// DirectorySpiderDefaultMaxHops is the maximum hop distance for relay discovery.
	DirectorySpiderDefaultMaxHops = 3
	// DirectorySpiderRelayTimeout is the timeout for connecting to and querying a relay.
	DirectorySpiderRelayTimeout = 30 * time.Second
	// DirectorySpiderQueryTimeout is the timeout for waiting for EOSE on a query.
	DirectorySpiderQueryTimeout = 60 * time.Second
	// DirectorySpiderRelayDelay is the delay between processing relays (rate limiting).
	DirectorySpiderRelayDelay = 500 * time.Millisecond
	// DirectorySpiderMaxEventsPerQuery is the per-query event limit.
	DirectorySpiderMaxEventsPerQuery = 5000
)

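// Back-of-envelope pacing under these values (worst case, serial processing;
// the figures below are estimates, not measurements): discovery costs up to
// ~30.5s per relay (one 30s connect/query window plus the 500ms delay), and
// the metadata phase up to ~2 minutes per relay (four kinds x 30s each, plus
// the delay). A run that discovers 100 relays is therefore bounded by roughly
// 51 minutes of discovery plus ~3.4 hours of metadata fetching; real runs
// finish far sooner because most relays answer or fail well inside their
// timeouts.
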
// DirectorySpider manages periodic relay discovery and metadata synchronization.
// It discovers relays by crawling kind 10002 (relay list) events, expanding
// outward in hops from seed pubkeys (whitelisted users), then fetches essential
// metadata events (kinds 0, 3, 10000, 10002) from all discovered relays.
type DirectorySpider struct {
	ctx    context.Context
	cancel context.CancelFunc
	db     *database.D
	pub    publisher.I

	// Configuration
	interval time.Duration
	maxHops  int

	// State
	running atomic.Bool
	lastRun time.Time

	// Relay discovery state (reset each run)
	mu               sync.Mutex
	discoveredRelays map[string]int  // URL -> hop distance
	processedRelays  map[string]bool // Already fetched metadata from

	// Self-detection
	relayIdentityPubkey string
	selfURLs            map[string]bool
	nip11Cache          *dsync.NIP11Cache

	// Callback for getting seed pubkeys (whitelisted users)
	getSeedPubkeys func() [][]byte

	// Trigger channel for manual runs
	triggerChan chan struct{}
}

// NewDirectorySpider creates a new DirectorySpider instance.
func NewDirectorySpider(
	ctx context.Context,
	db *database.D,
	pub publisher.I,
	interval time.Duration,
	maxHops int,
) (ds *DirectorySpider, err error) {
	if db == nil {
		err = errorf.E("database cannot be nil")
		return
	}
	if interval <= 0 {
		interval = DirectorySpiderDefaultInterval
	}
	if maxHops <= 0 {
		maxHops = DirectorySpiderDefaultMaxHops
	}
	ctx, cancel := context.WithCancel(ctx)
	// Get the relay identity pubkey for self-detection; if the key is absent
	// or cannot be derived, self-detection is simply disabled (see isSelfRelay).
	var relayPubkey string
	if skb, err := db.GetRelayIdentitySecret(); err == nil && len(skb) == 32 {
		if pk, err := keys.SecretBytesToPubKeyHex(skb); err == nil {
			relayPubkey = pk
		}
	}
	ds = &DirectorySpider{
		ctx:                 ctx,
		cancel:              cancel,
		db:                  db,
		pub:                 pub,
		interval:            interval,
		maxHops:             maxHops,
		discoveredRelays:    make(map[string]int),
		processedRelays:     make(map[string]bool),
		relayIdentityPubkey: relayPubkey,
		selfURLs:            make(map[string]bool),
		nip11Cache:          dsync.NewNIP11Cache(30 * time.Minute),
		triggerChan:         make(chan struct{}, 1),
	}
	return
}

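// Illustrative wiring from a hypothetical caller (ctx, db, pub, and
// whitelistedPubkeys stand in for the caller's own values; they are
// assumptions for this sketch, not part of the package):
//
//	ds, err := NewDirectorySpider(ctx, db, pub, 0, 0) // zero interval/maxHops pick the defaults
//	if err != nil {
//		// handle error
//	}
//	ds.SetSeedCallback(func() [][]byte { return whitelistedPubkeys() })
//	if err = ds.Start(); err != nil {
//		// handle error
//	}
//	defer ds.Stop()
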
// SetSeedCallback sets the callback function for getting seed pubkeys
// (whitelisted users). It must be called before Start.
func (ds *DirectorySpider) SetSeedCallback(getSeedPubkeys func() [][]byte) {
	ds.mu.Lock()
	defer ds.mu.Unlock()
	ds.getSeedPubkeys = getSeedPubkeys
}

// Start begins the directory spider operation.
func (ds *DirectorySpider) Start() (err error) {
	ds.mu.Lock()
	hasSeeds := ds.getSeedPubkeys != nil
	ds.mu.Unlock()
	if !hasSeeds {
		err = errorf.E("seed callback must be set before starting")
		return
	}
	// CompareAndSwap makes concurrent Start calls race-free: exactly one
	// caller flips running from false to true and launches the loop.
	if !ds.running.CompareAndSwap(false, true) {
		err = errorf.E("directory spider already running")
		return
	}
	go ds.mainLoop()
	log.I.F("directory spider: started (interval: %v, max hops: %d)", ds.interval, ds.maxHops)
	return
}

// Stop stops the directory spider operation. The CompareAndSwap mirrors
// Start, so concurrent Stop calls cancel the context only once.
func (ds *DirectorySpider) Stop() {
	if !ds.running.CompareAndSwap(true, false) {
		return
	}
	ds.cancel()
	log.I.F("directory spider: stopped")
}

// TriggerNow forces an immediate run of the directory spider. triggerChan is
// buffered with capacity 1, so the non-blocking send coalesces repeated
// triggers into a single pending run.
func (ds *DirectorySpider) TriggerNow() {
	select {
	case ds.triggerChan <- struct{}{}:
		log.I.F("directory spider: manual trigger sent")
	default:
		log.I.F("directory spider: trigger already pending")
	}
}

// LastRun returns the time of the last completed run.
func (ds *DirectorySpider) LastRun() time.Time {
	ds.mu.Lock()
	defer ds.mu.Unlock()
	return ds.lastRun
}

// mainLoop is the main spider loop that runs periodically.
func (ds *DirectorySpider) mainLoop() {
	// Run immediately on start
	ds.runOnce()
	ticker := time.NewTicker(ds.interval)
	defer ticker.Stop()
	log.I.F("directory spider: main loop started, running every %v", ds.interval)
	for {
		select {
		case <-ds.ctx.Done():
			return
		case <-ds.triggerChan:
			log.I.F("directory spider: manual trigger received")
			ds.runOnce()
		case <-ticker.C:
			log.I.F("directory spider: scheduled run triggered")
			ds.runOnce()
		}
	}
}

// runOnce performs a single directory spider run.
func (ds *DirectorySpider) runOnce() {
	if !ds.running.Load() {
		return
	}
	log.I.F("directory spider: starting run")
	start := time.Now()
	// Reset state for this run
	ds.mu.Lock()
	ds.discoveredRelays = make(map[string]int)
	ds.processedRelays = make(map[string]bool)
	ds.mu.Unlock()
	// Phase 1: Discover relays via hop expansion
	if err := ds.discoverRelays(); err != nil {
		log.E.F("directory spider: relay discovery failed: %v", err)
		return
	}
	ds.mu.Lock()
	relayCount := len(ds.discoveredRelays)
	// Phase 1 used processedRelays to mark relays already crawled for relay
	// lists; reset it so phase 2 fetches metadata from every discovered relay,
	// matching the field's documented meaning ("already fetched metadata from").
	ds.processedRelays = make(map[string]bool)
	ds.mu.Unlock()
	log.I.F("directory spider: discovered %d relays", relayCount)
	// Phase 2: Fetch metadata from all discovered relays
	if err := ds.fetchMetadataFromRelays(); err != nil {
		log.E.F("directory spider: metadata fetch failed: %v", err)
		return
	}
	ds.mu.Lock()
	ds.lastRun = time.Now()
	ds.mu.Unlock()
	log.I.F("directory spider: completed run in %v", time.Since(start))
}

// discoverRelays performs the multi-hop relay discovery.
func (ds *DirectorySpider) discoverRelays() error {
	// Snapshot the seed callback under the lock, then invoke it unlocked.
	ds.mu.Lock()
	getSeeds := ds.getSeedPubkeys
	ds.mu.Unlock()
	seedPubkeys := getSeeds()
	if len(seedPubkeys) == 0 {
		log.W.F("directory spider: no seed pubkeys available")
		return nil
	}
	log.I.F("directory spider: starting relay discovery with %d seed pubkeys", len(seedPubkeys))
	// Round 0: Get relay lists from seed pubkeys in the local database
	seedRelays, err := ds.getRelaysFromLocalDB(seedPubkeys)
	if err != nil {
		return errorf.W("failed to get relays from local DB: %v", err)
	}
	// Add seed relays at hop 0. isSelfRelay locks ds.mu itself (and may do a
	// NIP-11 fetch), so it must be called before taking the lock here; calling
	// it with ds.mu held would deadlock on the non-reentrant mutex.
	for _, url := range seedRelays {
		if ds.isSelfRelay(url) {
			continue
		}
		ds.mu.Lock()
		ds.discoveredRelays[url] = 0
		ds.mu.Unlock()
	}
	log.I.F("directory spider: found %d seed relays from local database", len(seedRelays))
	// Rounds 1 to maxHops: Expand outward
	for hop := 1; hop <= ds.maxHops; hop++ {
		select {
		case <-ds.ctx.Done():
			return ds.ctx.Err()
		default:
		}
		// Get relays at the previous hop level that haven't been processed
		ds.mu.Lock()
		var relaysToProcess []string
		for url, hopLevel := range ds.discoveredRelays {
			if hopLevel == hop-1 && !ds.processedRelays[url] {
				relaysToProcess = append(relaysToProcess, url)
			}
		}
		ds.mu.Unlock()
		if len(relaysToProcess) == 0 {
			log.I.F("directory spider: no relays to process at hop %d", hop)
			break
		}
		log.I.F("directory spider: hop %d - processing %d relays", hop, len(relaysToProcess))
		newRelaysThisHop := 0
		// Process each relay serially
		for _, relayURL := range relaysToProcess {
			select {
			case <-ds.ctx.Done():
				return ds.ctx.Err()
			default:
			}
			// Fetch kind 10002 events from this relay
			events, err := ds.fetchRelayListsFromRelay(relayURL)
			if err != nil {
				log.W.F("directory spider: failed to fetch from %s: %v", relayURL, err)
				// Mark as processed even on failure to avoid retrying
				ds.mu.Lock()
				ds.processedRelays[relayURL] = true
				ds.mu.Unlock()
				continue
			}
			// Collect candidate URLs under the lock, but run the (potentially
			// slow, self-locking) isSelfRelay check outside it.
			newRelays := ds.extractRelaysFromEvents(events)
			ds.mu.Lock()
			ds.processedRelays[relayURL] = true
			var candidates []string
			for _, newURL := range newRelays {
				if _, exists := ds.discoveredRelays[newURL]; !exists {
					candidates = append(candidates, newURL)
				}
			}
			ds.mu.Unlock()
			for _, newURL := range candidates {
				if ds.isSelfRelay(newURL) {
					continue
				}
				ds.mu.Lock()
				if _, exists := ds.discoveredRelays[newURL]; !exists {
					ds.discoveredRelays[newURL] = hop
					newRelaysThisHop++
				}
				ds.mu.Unlock()
			}
			// Rate-limiting delay between relays, cut short on shutdown.
			select {
			case <-ds.ctx.Done():
				return ds.ctx.Err()
			case <-time.After(DirectorySpiderRelayDelay):
			}
		}
		log.I.F("directory spider: hop %d - discovered %d new relays", hop, newRelaysThisHop)
	}
	return nil
}

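// Worked example of the expansion above (relay names are illustrative):
//
//	hop 0: seed pubkeys' 10002 lists in the local DB name {A, B}
//	       -> discoveredRelays = {A:0, B:0}
//	hop 1: 10002 events fetched from A and B mention {C, D, A}
//	       -> add {C:1, D:1}; A is already known and keeps hop 0
//	hop 2: C and D mention only {E}  -> add {E:2}
//	hop 3: E mentions nothing new    -> loop exits early
//
// Hop numbers are thus shortest-path distances from the seed set, and each
// relay is crawled at most once no matter how many lists mention it.
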
// getRelaysFromLocalDB queries the local database for kind 10002 events from seed pubkeys.
func (ds *DirectorySpider) getRelaysFromLocalDB(seedPubkeys [][]byte) ([]string, error) {
	ctx, cancel := context.WithTimeout(ds.ctx, 30*time.Second)
	defer cancel()
	// Query for kind 10002 from seed pubkeys
	f := &filter.F{
		Authors: tag.NewFromBytesSlice(seedPubkeys...),
		Kinds:   kind.NewS(kind.New(kind.RelayListMetadata.K)),
	}
	events, err := ds.db.QueryEvents(ctx, f)
	if err != nil {
		return nil, err
	}
	return ds.extractRelaysFromEvents(events), nil
}

// fetchRelayListsFromRelay connects to a relay and fetches all kind 10002 events.
func (ds *DirectorySpider) fetchRelayListsFromRelay(relayURL string) ([]*event.E, error) {
	ctx, cancel := context.WithTimeout(ds.ctx, DirectorySpiderRelayTimeout)
	defer cancel()
	log.D.F("directory spider: connecting to %s", relayURL)
	client, err := ws.RelayConnect(ctx, relayURL)
	if err != nil {
		return nil, errorf.W("failed to connect: %v", err)
	}
	defer client.Close()
	// Query for all kind 10002 events
	limit := uint(DirectorySpiderMaxEventsPerQuery)
	f := filter.NewS(&filter.F{
		Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)),
		Limit: &limit,
	})
	sub, err := client.Subscribe(ctx, f)
	if err != nil {
		return nil, errorf.W("failed to subscribe: %v", err)
	}
	defer sub.Unsub()
	var events []*event.E
	// Note: queryCtx derives from the relay ctx above, so the effective EOSE
	// wait is capped at DirectorySpiderRelayTimeout (30s), not the nominal
	// DirectorySpiderQueryTimeout (60s).
	queryCtx, queryCancel := context.WithTimeout(ctx, DirectorySpiderQueryTimeout)
	defer queryCancel()
	// Collect events until EOSE or timeout
	for {
		select {
		case <-queryCtx.Done():
			log.D.F("directory spider: query timeout for %s, got %d events", relayURL, len(events))
			return events, nil
		case <-sub.EndOfStoredEvents:
			log.D.F("directory spider: EOSE from %s, got %d events", relayURL, len(events))
			return events, nil
		case ev := <-sub.Events:
			if ev == nil {
				// Events channel closed (e.g. connection dropped); return what we have.
				return events, nil
			}
			events = append(events, ev)
		}
	}
}

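// The collection loop above follows the plain NIP-01 REQ flow: after a REQ,
// the relay streams stored events matching the filter, then sends EOSE ("end
// of stored events") to mark where live streaming would begin. The spider
// only wants stored data, so EOSE, a closed channel, or the timeout each end
// the subscription with whatever was gathered.
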
// extractRelaysFromEvents parses kind 10002 events and extracts relay URLs from "r" tags.
func (ds *DirectorySpider) extractRelaysFromEvents(events []*event.E) []string {
	seen := make(map[string]bool)
	var relays []string
	for _, ev := range events {
		// Get all "r" tags
		rTags := ev.Tags.GetAll([]byte("r"))
		for _, rTag := range rTags {
			if len(rTag.T) < 2 {
				continue
			}
			urlBytes := rTag.Value()
			if len(urlBytes) == 0 {
				continue
			}
			// Normalize the URL
			normalized := string(normalize.URL(string(urlBytes)))
			if normalized == "" {
				continue
			}
			if !seen[normalized] {
				seen[normalized] = true
				relays = append(relays, normalized)
			}
		}
	}
	return relays
}

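// For reference, a NIP-65 (kind 10002) relay list carries one "r" tag per
// relay, with an optional read/write marker that this parser deliberately
// ignores: the URL in element 1 is all discovery needs. Illustrative tags
// (the example.com URLs are placeholders):
//
//	["r", "wss://relay.example.com"]
//	["r", "wss://inbox.example.com", "read"]
//	["r", "wss://outbox.example.com", "write"]
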
// fetchMetadataFromRelays iterates through all discovered relays and fetches metadata.
func (ds *DirectorySpider) fetchMetadataFromRelays() error {
	ds.mu.Lock()
	// Copy the relay list to avoid holding the lock during network operations
	var relays []string
	for url := range ds.discoveredRelays {
		relays = append(relays, url)
	}
	ds.mu.Unlock()
	log.I.F("directory spider: fetching metadata from %d relays", len(relays))
	// Kinds to fetch: 0 (profile), 3 (follow list), 10000 (mute list), 10002 (relay list)
	kindsToFetch := []uint16{
		kind.ProfileMetadata.K,   // 0
		kind.FollowList.K,        // 3
		kind.MuteList.K,          // 10000
		kind.RelayListMetadata.K, // 10002
	}
	totalSaved := 0
	totalDuplicates := 0
	for _, relayURL := range relays {
		select {
		case <-ds.ctx.Done():
			return ds.ctx.Err()
		default:
		}
		ds.mu.Lock()
		alreadyProcessed := ds.processedRelays[relayURL]
		ds.mu.Unlock()
		if alreadyProcessed {
			continue
		}
		log.D.F("directory spider: fetching metadata from %s", relayURL)
		for _, k := range kindsToFetch {
			select {
			case <-ds.ctx.Done():
				return ds.ctx.Err()
			default:
			}
			events, err := ds.fetchKindFromRelay(relayURL, k)
			if err != nil {
				log.W.F("directory spider: failed to fetch kind %d from %s: %v", k, relayURL, err)
				continue
			}
			saved, duplicates := ds.storeEvents(events)
			totalSaved += saved
			totalDuplicates += duplicates
			log.D.F("directory spider: kind %d from %s: %d saved, %d duplicates",
				k, relayURL, saved, duplicates)
		}
		ds.mu.Lock()
		ds.processedRelays[relayURL] = true
		ds.mu.Unlock()
		// Rate-limiting delay between relays, cut short on shutdown.
		select {
		case <-ds.ctx.Done():
			return ds.ctx.Err()
		case <-time.After(DirectorySpiderRelayDelay):
		}
	}
	log.I.F("directory spider: metadata fetch complete - %d events saved, %d duplicates",
		totalSaved, totalDuplicates)
	return nil
}

// fetchKindFromRelay connects to a relay and fetches events of a specific kind.
func (ds *DirectorySpider) fetchKindFromRelay(relayURL string, k uint16) ([]*event.E, error) {
	ctx, cancel := context.WithTimeout(ds.ctx, DirectorySpiderRelayTimeout)
	defer cancel()
	client, err := ws.RelayConnect(ctx, relayURL)
	if err != nil {
		return nil, errorf.W("failed to connect: %v", err)
	}
	defer client.Close()
	// Query for events of this kind
	limit := uint(DirectorySpiderMaxEventsPerQuery)
	f := filter.NewS(&filter.F{
		Kinds: kind.NewS(kind.New(k)),
		Limit: &limit,
	})
	sub, err := client.Subscribe(ctx, f)
	if err != nil {
		return nil, errorf.W("failed to subscribe: %v", err)
	}
	defer sub.Unsub()
	var events []*event.E
	// As in fetchRelayListsFromRelay, queryCtx is capped by the 30s relay ctx
	// it derives from, so the 60s query timeout is an upper bound only.
	queryCtx, queryCancel := context.WithTimeout(ctx, DirectorySpiderQueryTimeout)
	defer queryCancel()
	for {
		select {
		case <-queryCtx.Done():
			return events, nil
		case <-sub.EndOfStoredEvents:
			return events, nil
		case ev := <-sub.Events:
			if ev == nil {
				// Events channel closed; return what we have.
				return events, nil
			}
			events = append(events, ev)
		}
	}
}

// storeEvents saves events to the database and publishes new ones.
func (ds *DirectorySpider) storeEvents(events []*event.E) (saved, duplicates int) {
	for _, ev := range events {
		if _, err := ds.db.SaveEvent(ds.ctx, ev); chk.T(err) {
			// SaveEvent failures are counted as duplicates: re-fetching the
			// same metadata from many relays makes duplicate-event errors the
			// expected case (chk.T logs the error either way).
			duplicates++
			continue
		}
		saved++
		// Publish event to active subscribers
		if ds.pub != nil {
			go ds.pub.Deliver(ev)
		}
	}
	return
}

// isSelfRelay checks if a relay URL is ourselves by comparing NIP-11 pubkeys.
func (ds *DirectorySpider) isSelfRelay(relayURL string) bool {
	// Without a relay identity pubkey there is nothing to compare against
	if ds.relayIdentityPubkey == "" {
		return false
	}
	ds.mu.Lock()
	// Fast path: check if we already know this URL is ours
	if ds.selfURLs[relayURL] {
		ds.mu.Unlock()
		return true
	}
	ds.mu.Unlock()
	// Slow path: check via the NIP-11 pubkey
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	peerPubkey, err := ds.nip11Cache.GetPubkey(ctx, relayURL)
	if err != nil {
		// Can't determine; assume not self
		return false
	}
	if peerPubkey == ds.relayIdentityPubkey {
		log.D.F("directory spider: discovered self-relay: %s", relayURL)
		ds.mu.Lock()
		ds.selfURLs[relayURL] = true
		ds.mu.Unlock()
		return true
	}
	return false
}
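
// Self-detection rests on NIP-11: a relay's information document is served
// over HTTP at the relay's URL when requested with the header
// "Accept: application/nostr+json", and may carry the operator's identity key
// in its "pubkey" field. Illustrative document (values are placeholders):
//
//	{
//		"name": "example relay",
//		"pubkey": "3bf0c63f...",
//		"supported_nips": [1, 11, 65]
//	}
//
// NIP11Cache is assumed to perform that fetch and extract "pubkey"; a URL
// whose advertised pubkey equals relayIdentityPubkey is remembered as one of
// our own aliases from then on (the selfURLs fast path above).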