add directory spider
@@ -68,6 +68,11 @@ type C struct {
 	// Spider settings
 	SpiderMode string `env:"ORLY_SPIDER_MODE" default:"none" usage:"spider mode for syncing events: none, follows"`
 
+	// Directory Spider settings
+	DirectorySpiderEnabled  bool          `env:"ORLY_DIRECTORY_SPIDER" default:"false" usage:"enable directory spider for metadata sync (kinds 0, 3, 10000, 10002)"`
+	DirectorySpiderInterval time.Duration `env:"ORLY_DIRECTORY_SPIDER_INTERVAL" default:"24h" usage:"how often to run directory spider"`
+	DirectorySpiderMaxHops  int           `env:"ORLY_DIRECTORY_SPIDER_HOPS" default:"3" usage:"maximum hops for relay discovery from seed users"`
+
 	PolicyEnabled bool `env:"ORLY_POLICY_ENABLED" default:"false" usage:"enable policy-based event processing (configuration found in $HOME/.config/ORLY/policy.json)"`
 
 	// NIP-43 Relay Access Metadata and Requests
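
The three settings above are read from the environment like every other field in this struct. A minimal sketch of enabling the feature, using only the variable names from the `env:` tags (the in-process Setenv calls are purely illustrative; normally these would be set in the relay's environment):

package main

import "os"

func main() {
	// Variable names come from the `env:` struct tags above; values are examples.
	os.Setenv("ORLY_DIRECTORY_SPIDER", "true")        // default "false"
	os.Setenv("ORLY_DIRECTORY_SPIDER_INTERVAL", "6h") // default "24h"
	os.Setenv("ORLY_DIRECTORY_SPIDER_HOPS", "2")      // default "3"
}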

app/main.go (+44)
@@ -141,6 +141,44 @@ func Run(
 		}
 	}
+
+	// Initialize directory spider if enabled (only for Badger backend)
+	if badgerDB, ok := db.(*database.D); ok && cfg.DirectorySpiderEnabled {
+		if l.directorySpider, err = spider.NewDirectorySpider(
+			ctx,
+			badgerDB,
+			l.publishers,
+			cfg.DirectorySpiderInterval,
+			cfg.DirectorySpiderMaxHops,
+		); chk.E(err) {
+			log.E.F("failed to create directory spider: %v", err)
+		} else {
+			// Set up callback to get seed pubkeys (whitelisted users)
+			l.directorySpider.SetSeedCallback(func() [][]byte {
+				var pubkeys [][]byte
+				// Get followed pubkeys from follows ACL if available
+				for _, aclInstance := range acl.Registry.ACL {
+					if aclInstance.Type() == "follows" {
+						if follows, ok := aclInstance.(*acl.Follows); ok {
+							pubkeys = append(pubkeys, follows.GetFollowedPubkeys()...)
+						}
+					}
+				}
+				// Fall back to admin keys if no follows ACL
+				if len(pubkeys) == 0 {
+					pubkeys = adminKeys
+				}
+				return pubkeys
+			})
+
+			if err = l.directorySpider.Start(); chk.E(err) {
+				log.E.F("failed to start directory spider: %v", err)
+			} else {
+				log.I.F("directory spider started (interval: %v, max hops: %d)",
+					cfg.DirectorySpiderInterval, cfg.DirectorySpiderMaxHops)
+			}
+		}
+	}
 
 	// Initialize relay group manager (only for Badger backend)
 	if badgerDB, ok := db.(*database.D); ok {
 		l.relayGroupMgr = dsync.NewRelayGroupManager(badgerDB, cfg.RelayGroupAdmins)
@@ -360,6 +398,12 @@ func Run(
 		log.I.F("spider manager stopped")
 	}
+
+	// Stop directory spider if running
+	if l.directorySpider != nil {
+		l.directorySpider.Stop()
+		log.I.F("directory spider stopped")
+	}
 
 	// Create shutdown context with timeout
 	shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancelShutdown()
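
For reference, the seed callback wired up above reduces to a simple contract: return the hop-0 pubkeys whose kind 10002 relay lists seed discovery. A minimal sketch with a static key list (seedFromStatic is a hypothetical helper, not part of this commit):

// seedFromStatic returns a SetSeedCallback-compatible closure over a fixed key list.
func seedFromStatic(keys [][]byte) func() [][]byte {
	return func() [][]byte { return keys }
}

// e.g. l.directorySpider.SetSeedCallback(seedFromStatic(adminKeys))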

@@ -48,17 +48,22 @@ type Server struct {
 	challengeMutex sync.RWMutex
 	challenges     map[string][]byte
 
+	// Message processing pause mutex for policy/follow list updates
+	// Use RLock() for normal message processing, Lock() for updates
+	messagePauseMutex sync.RWMutex
+
 	paymentProcessor *PaymentProcessor
 	sprocketManager  *SprocketManager
 	policyManager    *policy.P
 	spiderManager    *spider.Spider
+	directorySpider  *spider.DirectorySpider
 	syncManager      *dsync.Manager
 	relayGroupMgr    *dsync.RelayGroupManager
 	clusterManager   *dsync.ClusterManager
 	blossomServer    *blossom.Server
 	InviteManager    *nip43.InviteManager
 	cfg              *config.C
 	db               database.Database // Changed from *database.D to interface
 }
 
 // isIPBlacklisted checks if an IP address is blacklisted using the managed ACL system
@@ -1135,3 +1140,32 @@ func (s *Server) updatePeerAdminACL(peerPubkey []byte) {
 		}
 	}
 }
+
+// =============================================================================
+// Message Processing Pause/Resume for Policy and Follow List Updates
+// =============================================================================
+
+// PauseMessageProcessing acquires an exclusive lock to pause all message processing.
+// This should be called before updating policy configuration or follow lists.
+// Call ResumeMessageProcessing to release the lock after updates are complete.
+func (s *Server) PauseMessageProcessing() {
+	s.messagePauseMutex.Lock()
+}
+
+// ResumeMessageProcessing releases the exclusive lock to resume message processing.
+// This should be called after policy configuration or follow list updates are complete.
+func (s *Server) ResumeMessageProcessing() {
+	s.messagePauseMutex.Unlock()
+}
+
+// AcquireMessageProcessingLock acquires a read lock for normal message processing.
+// This allows concurrent message processing while blocking during policy updates.
+// Call ReleaseMessageProcessingLock when message processing is complete.
+func (s *Server) AcquireMessageProcessingLock() {
+	s.messagePauseMutex.RLock()
+}
+
+// ReleaseMessageProcessingLock releases the read lock after message processing.
+func (s *Server) ReleaseMessageProcessingLock() {
+	s.messagePauseMutex.RUnlock()
+}
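
A sketch of how the four methods above pair up in practice (handleMessage and applyPolicyUpdate are hypothetical callers; only the lock methods come from this commit):

// Hot path: many goroutines may hold the read lock concurrently.
func (s *Server) handleMessage(msg []byte) {
	s.AcquireMessageProcessingLock()
	defer s.ReleaseMessageProcessingLock()
	// ... decode and process msg ...
}

// Update path: waits for in-flight messages to drain, then runs exclusively.
func (s *Server) applyPolicyUpdate(apply func()) {
	s.PauseMessageProcessing()
	defer s.ResumeMessageProcessing()
	apply()
}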

pkg/spider/directory.go (new file, 612 lines)
@@ -0,0 +1,612 @@
package spider

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"git.mleku.dev/mleku/nostr/crypto/keys"
	"git.mleku.dev/mleku/nostr/encoders/event"
	"git.mleku.dev/mleku/nostr/encoders/filter"
	"git.mleku.dev/mleku/nostr/encoders/kind"
	"git.mleku.dev/mleku/nostr/encoders/tag"
	"git.mleku.dev/mleku/nostr/utils/normalize"
	"git.mleku.dev/mleku/nostr/ws"
	"lol.mleku.dev/chk"
	"lol.mleku.dev/errorf"
	"lol.mleku.dev/log"
	"next.orly.dev/pkg/database"
	"next.orly.dev/pkg/interfaces/publisher"
	dsync "next.orly.dev/pkg/sync"
)

const (
	// DirectorySpiderDefaultInterval is how often the directory spider runs
	DirectorySpiderDefaultInterval = 24 * time.Hour
	// DirectorySpiderDefaultMaxHops is the maximum hop distance for relay discovery
	DirectorySpiderDefaultMaxHops = 3
	// DirectorySpiderRelayTimeout is the timeout for connecting to and querying a relay
	DirectorySpiderRelayTimeout = 30 * time.Second
	// DirectorySpiderQueryTimeout is the timeout for waiting for EOSE on a query
	DirectorySpiderQueryTimeout = 60 * time.Second
	// DirectorySpiderRelayDelay is the delay between processing relays (rate limiting)
	DirectorySpiderRelayDelay = 500 * time.Millisecond
	// DirectorySpiderMaxEventsPerQuery is the limit for each query
	DirectorySpiderMaxEventsPerQuery = 5000
)

// DirectorySpider manages periodic relay discovery and metadata synchronization.
// It discovers relays by crawling kind 10002 (relay list) events, expanding outward
// in hops from seed pubkeys (whitelisted users), then fetches essential metadata
// events (kinds 0, 3, 10000, 10002) from all discovered relays.
type DirectorySpider struct {
	ctx    context.Context
	cancel context.CancelFunc
	db     *database.D
	pub    publisher.I

	// Configuration
	interval time.Duration
	maxHops  int

	// State
	running atomic.Bool
	lastRun time.Time

	// Relay discovery state (reset each run)
	mu               sync.Mutex
	discoveredRelays map[string]int  // URL -> hop distance
	processedRelays  map[string]bool // Already fetched metadata from

	// Self-detection
	relayIdentityPubkey string
	selfURLs            map[string]bool
	nip11Cache          *dsync.NIP11Cache

	// Callback for getting seed pubkeys (whitelisted users)
	getSeedPubkeys func() [][]byte

	// Trigger channel for manual runs
	triggerChan chan struct{}
}

// NewDirectorySpider creates a new DirectorySpider instance.
func NewDirectorySpider(
	ctx context.Context,
	db *database.D,
	pub publisher.I,
	interval time.Duration,
	maxHops int,
) (ds *DirectorySpider, err error) {
	if db == nil {
		err = errorf.E("database cannot be nil")
		return
	}

	if interval <= 0 {
		interval = DirectorySpiderDefaultInterval
	}
	if maxHops <= 0 {
		maxHops = DirectorySpiderDefaultMaxHops
	}

	ctx, cancel := context.WithCancel(ctx)

	// Get relay identity pubkey for self-detection
	var relayPubkey string
	if skb, err := db.GetRelayIdentitySecret(); err == nil && len(skb) == 32 {
		pk, _ := keys.SecretBytesToPubKeyHex(skb)
		relayPubkey = pk
	}

	ds = &DirectorySpider{
		ctx:                 ctx,
		cancel:              cancel,
		db:                  db,
		pub:                 pub,
		interval:            interval,
		maxHops:             maxHops,
		discoveredRelays:    make(map[string]int),
		processedRelays:     make(map[string]bool),
		relayIdentityPubkey: relayPubkey,
		selfURLs:            make(map[string]bool),
		nip11Cache:          dsync.NewNIP11Cache(30 * time.Minute),
		triggerChan:         make(chan struct{}, 1),
	}

	return
}

// SetSeedCallback sets the callback function for getting seed pubkeys (whitelisted users).
func (ds *DirectorySpider) SetSeedCallback(getSeedPubkeys func() [][]byte) {
	ds.mu.Lock()
	defer ds.mu.Unlock()
	ds.getSeedPubkeys = getSeedPubkeys
}

// Start begins the directory spider operation.
func (ds *DirectorySpider) Start() (err error) {
	if ds.running.Load() {
		err = errorf.E("directory spider already running")
		return
	}

	if ds.getSeedPubkeys == nil {
		err = errorf.E("seed callback must be set before starting")
		return
	}

	ds.running.Store(true)
	go ds.mainLoop()

	log.I.F("directory spider: started (interval: %v, max hops: %d)", ds.interval, ds.maxHops)
	return
}

// Stop stops the directory spider operation.
func (ds *DirectorySpider) Stop() {
	if !ds.running.Load() {
		return
	}

	ds.running.Store(false)
	ds.cancel()

	log.I.F("directory spider: stopped")
}

// TriggerNow forces an immediate run of the directory spider.
func (ds *DirectorySpider) TriggerNow() {
	select {
	case ds.triggerChan <- struct{}{}:
		log.I.F("directory spider: manual trigger sent")
	default:
		log.I.F("directory spider: trigger already pending")
	}
}

// LastRun returns the time of the last completed run.
func (ds *DirectorySpider) LastRun() time.Time {
	ds.mu.Lock()
	defer ds.mu.Unlock()
	return ds.lastRun
}

// mainLoop is the main spider loop that runs periodically.
func (ds *DirectorySpider) mainLoop() {
	// Run immediately on start
	ds.runOnce()

	ticker := time.NewTicker(ds.interval)
	defer ticker.Stop()

	log.I.F("directory spider: main loop started, running every %v", ds.interval)

	for {
		select {
		case <-ds.ctx.Done():
			return
		case <-ds.triggerChan:
			log.I.F("directory spider: manual trigger received")
			ds.runOnce()
		case <-ticker.C:
			log.I.F("directory spider: scheduled run triggered")
			ds.runOnce()
		}
	}
}

// runOnce performs a single directory spider run.
func (ds *DirectorySpider) runOnce() {
	if !ds.running.Load() {
		return
	}

	log.I.F("directory spider: starting run")
	start := time.Now()

	// Reset state for this run
	ds.mu.Lock()
	ds.discoveredRelays = make(map[string]int)
	ds.processedRelays = make(map[string]bool)
	ds.mu.Unlock()

	// Phase 1: Discover relays via hop expansion
	if err := ds.discoverRelays(); err != nil {
		log.E.F("directory spider: relay discovery failed: %v", err)
		return
	}

	ds.mu.Lock()
	relayCount := len(ds.discoveredRelays)
	ds.mu.Unlock()

	log.I.F("directory spider: discovered %d relays", relayCount)

	// Phase 2: Fetch metadata from all discovered relays
	if err := ds.fetchMetadataFromRelays(); err != nil {
		log.E.F("directory spider: metadata fetch failed: %v", err)
		return
	}

	ds.mu.Lock()
	ds.lastRun = time.Now()
	ds.mu.Unlock()

	log.I.F("directory spider: completed run in %v", time.Since(start))
}

// discoverRelays performs the multi-hop relay discovery.
func (ds *DirectorySpider) discoverRelays() error {
	// Get seed pubkeys from callback
	seedPubkeys := ds.getSeedPubkeys()
	if len(seedPubkeys) == 0 {
		log.W.F("directory spider: no seed pubkeys available")
		return nil
	}

	log.I.F("directory spider: starting relay discovery with %d seed pubkeys", len(seedPubkeys))

	// Round 0: Get relay lists from seed pubkeys in local database
	seedRelays, err := ds.getRelaysFromLocalDB(seedPubkeys)
	if err != nil {
		return errorf.W("failed to get relays from local DB: %v", err)
	}

	// Add seed relays at hop 0. isSelfRelay acquires ds.mu itself, so it must
	// run before the lock is taken (calling it under ds.mu would deadlock).
	for _, url := range seedRelays {
		if ds.isSelfRelay(url) {
			continue
		}
		ds.mu.Lock()
		ds.discoveredRelays[url] = 0
		ds.mu.Unlock()
	}

	log.I.F("directory spider: found %d seed relays from local database", len(seedRelays))

	// Rounds 1 to maxHops: Expand outward
	for hop := 1; hop <= ds.maxHops; hop++ {
		select {
		case <-ds.ctx.Done():
			return ds.ctx.Err()
		default:
		}

		// Get relays at previous hop level that haven't been processed
		ds.mu.Lock()
		var relaysToProcess []string
		for url, hopLevel := range ds.discoveredRelays {
			if hopLevel == hop-1 && !ds.processedRelays[url] {
				relaysToProcess = append(relaysToProcess, url)
			}
		}
		ds.mu.Unlock()

		if len(relaysToProcess) == 0 {
			log.I.F("directory spider: no relays to process at hop %d", hop)
			break
		}

		log.I.F("directory spider: hop %d - processing %d relays", hop, len(relaysToProcess))

		newRelaysThisHop := 0

		// Process each relay serially
		for _, relayURL := range relaysToProcess {
			select {
			case <-ds.ctx.Done():
				return ds.ctx.Err()
			default:
			}

			// Fetch kind 10002 events from this relay
			events, err := ds.fetchRelayListsFromRelay(relayURL)
			if err != nil {
				log.W.F("directory spider: failed to fetch from %s: %v", relayURL, err)
				// Mark as processed even on failure to avoid retrying
				ds.mu.Lock()
				ds.processedRelays[relayURL] = true
				ds.mu.Unlock()
				continue
			}

			// Extract new relay URLs, dropping our own URLs up front;
			// as above, isSelfRelay takes ds.mu, so it cannot run under
			// the lock held below.
			var newRelays []string
			for _, newURL := range ds.extractRelaysFromEvents(events) {
				if !ds.isSelfRelay(newURL) {
					newRelays = append(newRelays, newURL)
				}
			}

			ds.mu.Lock()
			ds.processedRelays[relayURL] = true
			for _, newURL := range newRelays {
				if _, exists := ds.discoveredRelays[newURL]; !exists {
					ds.discoveredRelays[newURL] = hop
					newRelaysThisHop++
				}
			}
			ds.mu.Unlock()

			// Rate limiting delay between relays
			time.Sleep(DirectorySpiderRelayDelay)
		}

		log.I.F("directory spider: hop %d - discovered %d new relays", hop, newRelaysThisHop)
	}

	return nil
}

// getRelaysFromLocalDB queries the local database for kind 10002 events from seed pubkeys.
func (ds *DirectorySpider) getRelaysFromLocalDB(seedPubkeys [][]byte) ([]string, error) {
	ctx, cancel := context.WithTimeout(ds.ctx, 30*time.Second)
	defer cancel()

	// Query for kind 10002 from seed pubkeys
	f := &filter.F{
		Authors: tag.NewFromBytesSlice(seedPubkeys...),
		Kinds:   kind.NewS(kind.New(kind.RelayListMetadata.K)),
	}

	events, err := ds.db.QueryEvents(ctx, f)
	if err != nil {
		return nil, err
	}

	return ds.extractRelaysFromEvents(events), nil
}

// fetchRelayListsFromRelay connects to a relay and fetches all kind 10002 events.
func (ds *DirectorySpider) fetchRelayListsFromRelay(relayURL string) ([]*event.E, error) {
	ctx, cancel := context.WithTimeout(ds.ctx, DirectorySpiderRelayTimeout)
	defer cancel()

	log.D.F("directory spider: connecting to %s", relayURL)

	client, err := ws.RelayConnect(ctx, relayURL)
	if err != nil {
		return nil, errorf.W("failed to connect: %v", err)
	}
	defer client.Close()

	// Query for all kind 10002 events
	limit := uint(DirectorySpiderMaxEventsPerQuery)
	f := filter.NewS(&filter.F{
		Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)),
		Limit: &limit,
	})

	sub, err := client.Subscribe(ctx, f)
	if err != nil {
		return nil, errorf.W("failed to subscribe: %v", err)
	}
	defer sub.Unsub()

	var events []*event.E
	queryCtx, queryCancel := context.WithTimeout(ctx, DirectorySpiderQueryTimeout)
	defer queryCancel()

	// Collect events until EOSE or timeout
	for {
		select {
		case <-queryCtx.Done():
			log.D.F("directory spider: query timeout for %s, got %d events", relayURL, len(events))
			return events, nil
		case <-sub.EndOfStoredEvents:
			log.D.F("directory spider: EOSE from %s, got %d events", relayURL, len(events))
			return events, nil
		case ev := <-sub.Events:
			if ev == nil {
				return events, nil
			}
			events = append(events, ev)
		}
	}
}

// extractRelaysFromEvents parses kind 10002 events and extracts relay URLs from "r" tags.
func (ds *DirectorySpider) extractRelaysFromEvents(events []*event.E) []string {
	seen := make(map[string]bool)
	var relays []string

	for _, ev := range events {
		// Get all "r" tags
		rTags := ev.Tags.GetAll([]byte("r"))
		for _, rTag := range rTags {
			if len(rTag.T) < 2 {
				continue
			}
			urlBytes := rTag.Value()
			if len(urlBytes) == 0 {
				continue
			}

			// Normalize the URL
			normalized := string(normalize.URL(string(urlBytes)))
			if normalized == "" {
				continue
			}

			if !seen[normalized] {
				seen[normalized] = true
				relays = append(relays, normalized)
			}
		}
	}

	return relays
}

// fetchMetadataFromRelays iterates through all discovered relays and fetches metadata.
func (ds *DirectorySpider) fetchMetadataFromRelays() error {
	ds.mu.Lock()
	// Copy relay list to avoid holding lock during network operations
	var relays []string
	for url := range ds.discoveredRelays {
		relays = append(relays, url)
	}
	ds.mu.Unlock()

	log.I.F("directory spider: fetching metadata from %d relays", len(relays))

	// Kinds to fetch: 0 (profile), 3 (follow list), 10000 (mute list), 10002 (relay list)
	kindsToFetch := []uint16{
		kind.ProfileMetadata.K,   // 0
		kind.FollowList.K,        // 3
		kind.MuteList.K,          // 10000
		kind.RelayListMetadata.K, // 10002
	}

	totalSaved := 0
	totalDuplicates := 0

	for _, relayURL := range relays {
		select {
		case <-ds.ctx.Done():
			return ds.ctx.Err()
		default:
		}

		ds.mu.Lock()
		alreadyProcessed := ds.processedRelays[relayURL]
		ds.mu.Unlock()

		if alreadyProcessed {
			continue
		}

		log.D.F("directory spider: fetching metadata from %s", relayURL)

		for _, k := range kindsToFetch {
			select {
			case <-ds.ctx.Done():
				return ds.ctx.Err()
			default:
			}

			events, err := ds.fetchKindFromRelay(relayURL, k)
			if err != nil {
				log.W.F("directory spider: failed to fetch kind %d from %s: %v", k, relayURL, err)
				continue
			}

			saved, duplicates := ds.storeEvents(events)
			totalSaved += saved
			totalDuplicates += duplicates

			log.D.F("directory spider: kind %d from %s: %d saved, %d duplicates",
				k, relayURL, saved, duplicates)
		}

		ds.mu.Lock()
		ds.processedRelays[relayURL] = true
		ds.mu.Unlock()

		// Rate limiting delay between relays
		time.Sleep(DirectorySpiderRelayDelay)
	}

	log.I.F("directory spider: metadata fetch complete - %d events saved, %d duplicates",
		totalSaved, totalDuplicates)

	return nil
}

// fetchKindFromRelay connects to a relay and fetches events of a specific kind.
func (ds *DirectorySpider) fetchKindFromRelay(relayURL string, k uint16) ([]*event.E, error) {
	ctx, cancel := context.WithTimeout(ds.ctx, DirectorySpiderRelayTimeout)
	defer cancel()

	client, err := ws.RelayConnect(ctx, relayURL)
	if err != nil {
		return nil, errorf.W("failed to connect: %v", err)
	}
	defer client.Close()

	// Query for events of this kind
	limit := uint(DirectorySpiderMaxEventsPerQuery)
	f := filter.NewS(&filter.F{
		Kinds: kind.NewS(kind.New(k)),
		Limit: &limit,
	})

	sub, err := client.Subscribe(ctx, f)
	if err != nil {
		return nil, errorf.W("failed to subscribe: %v", err)
	}
	defer sub.Unsub()

	var events []*event.E
	queryCtx, queryCancel := context.WithTimeout(ctx, DirectorySpiderQueryTimeout)
	defer queryCancel()

	for {
		select {
		case <-queryCtx.Done():
			return events, nil
		case <-sub.EndOfStoredEvents:
			return events, nil
		case ev := <-sub.Events:
			if ev == nil {
				return events, nil
			}
			events = append(events, ev)
		}
	}
}

// storeEvents saves events to the database and publishes new ones.
func (ds *DirectorySpider) storeEvents(events []*event.E) (saved, duplicates int) {
	for _, ev := range events {
		_, err := ds.db.SaveEvent(ds.ctx, ev)
		if err != nil {
			if chk.T(err) {
				// Most errors are duplicates, which is expected
				duplicates++
			}
			continue
		}
		saved++

		// Publish event to active subscribers
		if ds.pub != nil {
			go ds.pub.Deliver(ev)
		}
	}
	return
}

// isSelfRelay checks if a relay URL is ourselves by comparing NIP-11 pubkeys.
func (ds *DirectorySpider) isSelfRelay(relayURL string) bool {
	// If we don't have a relay identity pubkey, can't compare
	if ds.relayIdentityPubkey == "" {
		return false
	}

	ds.mu.Lock()
	// Fast path: check if we already know this URL is ours
	if ds.selfURLs[relayURL] {
		ds.mu.Unlock()
		return true
	}
	ds.mu.Unlock()

	// Slow path: check via NIP-11 pubkey
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	peerPubkey, err := ds.nip11Cache.GetPubkey(ctx, relayURL)
	if err != nil {
		// Can't determine, assume not self
		return false
	}

	if peerPubkey == ds.relayIdentityPubkey {
		log.D.F("directory spider: discovered self-relay: %s", relayURL)
		ds.mu.Lock()
		ds.selfURLs[relayURL] = true
		ds.mu.Unlock()
		return true
	}

	return false
}
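
Putting the exported API together, a minimal lifecycle sketch, assuming db (*database.D), pub (publisher.I), and seeds ([][]byte) are supplied by the host relay:

ds, err := spider.NewDirectorySpider(ctx, db, pub, 12*time.Hour, 2)
if chk.E(err) {
	return
}
ds.SetSeedCallback(func() [][]byte { return seeds }) // required before Start
if err = ds.Start(); chk.E(err) {
	return
}
defer ds.Stop()
ds.TriggerNow() // optional: force an extra run between ticks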

pkg/spider/directory_test.go (new file, 166 lines)
@@ -0,0 +1,166 @@
package spider

import (
	"context"
	"testing"
	"time"

	"git.mleku.dev/mleku/nostr/encoders/event"
	"git.mleku.dev/mleku/nostr/encoders/kind"
	"git.mleku.dev/mleku/nostr/encoders/tag"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestExtractRelaysFromEvents(t *testing.T) {
	ds := &DirectorySpider{}

	tests := []struct {
		name     string
		events   []*event.E
		expected []string
	}{
		{
			name:     "empty events",
			events:   []*event.E{},
			expected: []string{},
		},
		{
			name: "single event with relays",
			events: []*event.E{
				{
					Kind: kind.RelayListMetadata.K,
					Tags: &tag.S{
						tag.NewFromBytesSlice([]byte("r"), []byte("wss://relay1.example.com")),
						tag.NewFromBytesSlice([]byte("r"), []byte("wss://relay2.example.com")),
					},
				},
			},
			expected: []string{"wss://relay1.example.com", "wss://relay2.example.com"},
		},
		{
			name: "multiple events with duplicate relays",
			events: []*event.E{
				{
					Kind: kind.RelayListMetadata.K,
					Tags: &tag.S{
						tag.NewFromBytesSlice([]byte("r"), []byte("wss://relay1.example.com")),
					},
				},
				{
					Kind: kind.RelayListMetadata.K,
					Tags: &tag.S{
						tag.NewFromBytesSlice([]byte("r"), []byte("wss://relay1.example.com")),
						tag.NewFromBytesSlice([]byte("r"), []byte("wss://relay3.example.com")),
					},
				},
			},
			expected: []string{"wss://relay1.example.com", "wss://relay3.example.com"},
		},
		{
			name: "event with empty r tags",
			events: []*event.E{
				{
					Kind: kind.RelayListMetadata.K,
					Tags: &tag.S{
						tag.NewFromBytesSlice([]byte("r")), // empty value
						tag.NewFromBytesSlice([]byte("r"), []byte("wss://valid.relay.com")),
					},
				},
			},
			expected: []string{"wss://valid.relay.com"},
		},
		{
			name: "normalizes relay URLs",
			events: []*event.E{
				{
					Kind: kind.RelayListMetadata.K,
					Tags: &tag.S{
						tag.NewFromBytesSlice([]byte("r"), []byte("wss://relay.example.com")),
						tag.NewFromBytesSlice([]byte("r"), []byte("wss://relay.example.com/")), // duplicate with trailing slash
					},
				},
			},
			expected: []string{"wss://relay.example.com"},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			result := ds.extractRelaysFromEvents(tt.events)

			// For empty case, check length
			if len(tt.expected) == 0 {
				assert.Empty(t, result)
				return
			}

			// Check that all expected relays are present (order may vary)
			assert.Len(t, result, len(tt.expected))
			for _, expected := range tt.expected {
				assert.Contains(t, result, expected)
			}
		})
	}
}

func TestDirectorySpiderLifecycle(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create spider without database (will return error)
	_, err := NewDirectorySpider(ctx, nil, nil, 0, 0)
	require.Error(t, err)
	assert.Contains(t, err.Error(), "database cannot be nil")
}

func TestDirectorySpiderDefaults(t *testing.T) {
	// Test that defaults are applied correctly
	assert.Equal(t, 24*time.Hour, DirectorySpiderDefaultInterval)
	assert.Equal(t, 3, DirectorySpiderDefaultMaxHops)
	assert.Equal(t, 30*time.Second, DirectorySpiderRelayTimeout)
	assert.Equal(t, 60*time.Second, DirectorySpiderQueryTimeout)
	assert.Equal(t, 500*time.Millisecond, DirectorySpiderRelayDelay)
	assert.Equal(t, 5000, DirectorySpiderMaxEventsPerQuery)
}

func TestTriggerNow(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ds := &DirectorySpider{
		ctx:         ctx,
		triggerChan: make(chan struct{}, 1),
	}

	// First trigger should succeed
	ds.TriggerNow()

	// Verify trigger was sent
	select {
	case <-ds.triggerChan:
		// Expected
	default:
		t.Error("trigger was not sent")
	}

	// Second trigger after draining should also succeed
	ds.TriggerNow()

	// Triggering again without draining must not block (select has a default case)
	ds.TriggerNow()
}

func TestLastRun(t *testing.T) {
	ds := &DirectorySpider{}

	// Initially should be zero
	assert.True(t, ds.LastRun().IsZero())

	// Set a time
	now := time.Now()
	ds.lastRun = now

	// Should return the set time
	assert.Equal(t, now, ds.LastRun())
}
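
These tests exercise only the pure helpers (extractRelaysFromEvents, TriggerNow, LastRun), the package constants, and the constructor's nil-database guard, so they need no network or database and should run with a plain `go test ./pkg/spider/`.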