diff --git a/pkg/spider/spider.go b/pkg/spider/spider.go
index e241aec..e4e6696 100644
--- a/pkg/spider/spider.go
+++ b/pkg/spider/spider.go
@@ -7,16 +7,18 @@ import (
 	"sync"
 	"time"
 
-	"lol.mleku.dev/chk"
-	"lol.mleku.dev/errorf"
-	"lol.mleku.dev/log"
-	"next.orly.dev/pkg/database"
+	"git.mleku.dev/mleku/nostr/crypto/keys"
 	"git.mleku.dev/mleku/nostr/encoders/filter"
 	"git.mleku.dev/mleku/nostr/encoders/hex"
 	"git.mleku.dev/mleku/nostr/encoders/tag"
 	"git.mleku.dev/mleku/nostr/encoders/timestamp"
-	"next.orly.dev/pkg/interfaces/publisher"
 	"git.mleku.dev/mleku/nostr/ws"
+	"lol.mleku.dev/chk"
+	"lol.mleku.dev/errorf"
+	"lol.mleku.dev/log"
+	"next.orly.dev/pkg/database"
+	"next.orly.dev/pkg/interfaces/publisher"
+	dsync "next.orly.dev/pkg/sync"
 )
 
 const (
@@ -53,8 +55,10 @@ type Spider struct {
 	mode string
 
 	// Configuration
-	adminRelays []string
-	followList  [][]byte
+	adminRelays         []string
+	followList          [][]byte
+	relayIdentityPubkey string          // Our relay's identity pubkey (hex)
+	selfURLs            map[string]bool // URLs discovered to be ourselves (for fast lookups)
 
 	// State management
 	mu sync.RWMutex
@@ -129,14 +133,24 @@ func New(ctx context.Context, db *database.D, pub publisher.I, mode string) (s *
 	}
 
 	ctx, cancel := context.WithCancel(ctx)
+
+	// Get relay identity pubkey for self-detection
+	var relayPubkey string
+	if skb, err := db.GetRelayIdentitySecret(); err == nil && len(skb) == 32 {
+		pk, _ := keys.SecretBytesToPubKeyHex(skb)
+		relayPubkey = pk
+	}
+
 	s = &Spider{
-		ctx:               ctx,
-		cancel:            cancel,
-		db:                db,
-		pub:               pub,
-		mode:              mode,
-		connections:       make(map[string]*RelayConnection),
-		followListUpdated: make(chan struct{}, 1),
+		ctx:                 ctx,
+		cancel:              cancel,
+		db:                  db,
+		pub:                 pub,
+		mode:                mode,
+		relayIdentityPubkey: relayPubkey,
+		selfURLs:            make(map[string]bool),
+		connections:         make(map[string]*RelayConnection),
+		followListUpdated:   make(chan struct{}, 1),
 	}
 
 	return
@@ -254,9 +268,15 @@ func (s *Spider) updateConnections() {
 		return
 	}
 
-	// Update connections for current admin relays
+	// Update connections for current admin relays (filtering out self)
 	currentRelays := make(map[string]bool)
 	for _, url := range adminRelays {
+		// Check if this relay URL is ourselves
+		if s.isSelfRelay(url) {
+			log.D.F("spider: skipping self-relay: %s", url)
+			continue
+		}
+
 		currentRelays[url] = true
 
 		if conn, exists := s.connections[url]; exists {
@@ -804,3 +824,42 @@ func (rc *RelayConnection) close() {
 
 	rc.cancel()
 }
+
+// isSelfRelay checks if a relay URL is actually ourselves by comparing NIP-11 pubkeys
+func (s *Spider) isSelfRelay(relayURL string) bool {
+	// If we don't have a relay identity pubkey, can't compare
+	if s.relayIdentityPubkey == "" {
+		return false
+	}
+
+	s.mu.RLock()
+	// Fast path: check if we already know this URL is ours
+	if s.selfURLs[relayURL] {
+		s.mu.RUnlock()
+		log.D.F("spider: skipping self-relay (known URL): %s", relayURL)
+		return true
+	}
+	s.mu.RUnlock()
+
+	// Slow path: check via NIP-11 pubkey
+	nip11Cache := dsync.NewNIP11Cache(30 * time.Minute)
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	peerPubkey, err := nip11Cache.GetPubkey(ctx, relayURL)
+	if err != nil {
+		log.D.F("spider: couldn't fetch NIP-11 for %s: %v", relayURL, err)
+		return false
+	}
+
+	if peerPubkey == s.relayIdentityPubkey {
+		log.I.F("spider: discovered self-relay: %s (pubkey: %s)", relayURL, s.relayIdentityPubkey)
+		// Cache this URL as ours for future fast lookups
+		s.mu.Lock()
+		s.selfURLs[relayURL] = true
+		s.mu.Unlock()
+		return true
+	}
+
+	return false
+}
diff --git a/pkg/sync/cluster.go b/pkg/sync/cluster.go
index 068ab54..15ddc00 100644
--- a/pkg/sync/cluster.go
+++ b/pkg/sync/cluster.go
@@ -25,6 +25,7 @@ type ClusterManager struct {
 	db                  *database.D
 	adminNpubs          []string
 	relayIdentityPubkey string                    // Our relay's identity pubkey (hex)
+	selfURLs            map[string]bool           // URLs discovered to be ourselves (for fast lookups)
 	members             map[string]*ClusterMember // keyed by relay URL
 	membersMux          sync.RWMutex
 	pollTicker          *time.Ticker
@@ -78,6 +79,7 @@ func NewClusterManager(ctx context.Context, db *database.D, adminNpubs []string,
 		db:                        db,
 		adminNpubs:                adminNpubs,
 		relayIdentityPubkey:       relayPubkey,
+		selfURLs:                  make(map[string]bool),
 		members:                   make(map[string]*ClusterMember),
 		pollDone:                  make(chan struct{}),
 		propagatePrivilegedEvents: propagatePrivilegedEvents,
@@ -265,48 +267,47 @@ func (cm *ClusterManager) UpdateMembership(relayURLs []string) {
 		}
 	}
 
-	// Add new members
+	// Add new members (filter out self once at this point)
 	for _, url := range relayURLs {
-		// Skip if this is our own relay (check via NIP-11 pubkey)
-		if cm.isSelfRelay(url) {
-			log.D.F("skipping cluster member (self): %s (pubkey matches our relay identity)", url)
+		// Skip if already exists
+		if _, exists := cm.members[url]; exists {
 			continue
 		}
 
-		if _, exists := cm.members[url]; !exists {
-			// For simplicity, assume HTTP and WebSocket URLs are the same
-			// In practice, you'd need to parse these properly
-			member := &ClusterMember{
-				HTTPURL:      url,
-				WebSocketURL: url, // TODO: Convert to WebSocket URL
-				LastSerial:   0,
-				Status:       "unknown",
-			}
-			cm.members[url] = member
-			log.I.F("added cluster member: %s", url)
+		// Fast path: check if we already know this URL is ours
+		if cm.selfURLs[url] {
+			log.I.F("removed self from cluster members (known URL): %s", url)
+			continue
 		}
+
+		// Slow path: check via NIP-11 pubkey
+		if cm.relayIdentityPubkey != "" {
+			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			peerPubkey, err := cm.nip11Cache.GetPubkey(ctx, url)
+			cancel()
+
+			if err != nil {
+				log.D.F("couldn't fetch NIP-11 for %s, adding to cluster anyway: %v", url, err)
+			} else if peerPubkey == cm.relayIdentityPubkey {
+				log.I.F("removed self from cluster members (discovered): %s (pubkey: %s)", url, cm.relayIdentityPubkey)
+				// Cache this URL as ours for future fast lookups
+				cm.selfURLs[url] = true
+				continue
+			}
+		}
+
+		// Add member
+		member := &ClusterMember{
+			HTTPURL:      url,
+			WebSocketURL: url, // TODO: Convert to WebSocket URL
+			LastSerial:   0,
+			Status:       "unknown",
+		}
+		cm.members[url] = member
+		log.I.F("added cluster member: %s", url)
 	}
 }
 
-// isSelfRelay checks if a relay URL is actually ourselves by comparing NIP-11 pubkeys
-func (cm *ClusterManager) isSelfRelay(relayURL string) bool {
-	// If we don't have a relay identity pubkey, can't compare
-	if cm.relayIdentityPubkey == "" {
-		return false
-	}
-
-	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-	defer cancel()
-
-	peerPubkey, err := cm.nip11Cache.GetPubkey(ctx, relayURL)
-	if err != nil {
-		log.D.F("couldn't fetch NIP-11 for %s to check if self: %v", relayURL, err)
-		return false
-	}
-
-	return peerPubkey == cm.relayIdentityPubkey
-}
-
 // HandleMembershipEvent processes a cluster membership event (Kind 39108)
 func (cm *ClusterManager) HandleMembershipEvent(event *event.E) error {
 	// Verify the event is signed by a cluster admin
@@ -352,18 +353,37 @@ func (cm *ClusterManager) HandleLatestSerial(w http.ResponseWriter, r *http.Requ
 	}
 
 	// Check if request is from ourselves by examining the Referer or Origin header
+	// Note: Self-members are already filtered out, but this catches edge cases
 	origin := r.Header.Get("Origin")
 	referer := r.Header.Get("Referer")
-	if origin != "" && cm.isSelfRelay(origin) {
-		log.D.F("rejecting cluster latest request from self (origin: %s)", origin)
-		http.Error(w, "Cannot sync with self", http.StatusBadRequest)
-		return
-	}
-	if referer != "" && cm.isSelfRelay(referer) {
-		log.D.F("rejecting cluster latest request from self (referer: %s)", referer)
-		http.Error(w, "Cannot sync with self", http.StatusBadRequest)
-		return
+	if cm.relayIdentityPubkey != "" && (origin != "" || referer != "") {
+		checkURL := origin
+		if checkURL == "" {
+			checkURL = referer
+		}
+
+		// Fast path: check known self-URLs
+		if cm.selfURLs[checkURL] {
+			log.D.F("rejecting cluster latest request from self (known URL): %s", checkURL)
+			http.Error(w, "Cannot sync with self", http.StatusBadRequest)
+			return
+		}
+
+		// Slow path: verify via NIP-11
+		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+		peerPubkey, err := cm.nip11Cache.GetPubkey(ctx, checkURL)
+		cancel()
+
+		if err == nil && peerPubkey == cm.relayIdentityPubkey {
+			log.D.F("rejecting cluster latest request from self (discovered): %s", checkURL)
+			// Cache for future fast lookups
+			cm.membersMux.Lock()
+			cm.selfURLs[checkURL] = true
+			cm.membersMux.Unlock()
+			http.Error(w, "Cannot sync with self", http.StatusBadRequest)
+			return
+		}
 	}
 
 	// Get the latest serial from database by querying for the highest serial
@@ -390,18 +410,37 @@ func (cm *ClusterManager) HandleEventsRange(w http.ResponseWriter, r *http.Reque
 	}
 
 	// Check if request is from ourselves by examining the Referer or Origin header
+	// Note: Self-members are already filtered out, but this catches edge cases
 	origin := r.Header.Get("Origin")
 	referer := r.Header.Get("Referer")
-	if origin != "" && cm.isSelfRelay(origin) {
-		log.D.F("rejecting cluster events request from self (origin: %s)", origin)
-		http.Error(w, "Cannot sync with self", http.StatusBadRequest)
-		return
-	}
-	if referer != "" && cm.isSelfRelay(referer) {
-		log.D.F("rejecting cluster events request from self (referer: %s)", referer)
-		http.Error(w, "Cannot sync with self", http.StatusBadRequest)
-		return
+	if cm.relayIdentityPubkey != "" && (origin != "" || referer != "") {
+		checkURL := origin
+		if checkURL == "" {
+			checkURL = referer
+		}
+
+		// Fast path: check known self-URLs
+		if cm.selfURLs[checkURL] {
+			log.D.F("rejecting cluster events request from self (known URL): %s", checkURL)
+			http.Error(w, "Cannot sync with self", http.StatusBadRequest)
+			return
+		}
+
+		// Slow path: verify via NIP-11
+		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+		peerPubkey, err := cm.nip11Cache.GetPubkey(ctx, checkURL)
+		cancel()
+
+		if err == nil && peerPubkey == cm.relayIdentityPubkey {
+			log.D.F("rejecting cluster events request from self (discovered): %s", checkURL)
+			// Cache for future fast lookups
+			cm.membersMux.Lock()
+			cm.selfURLs[checkURL] = true
+			cm.membersMux.Unlock()
+			http.Error(w, "Cannot sync with self", http.StatusBadRequest)
+			return
+		}
 	}
 
 	// Parse query parameters
diff --git a/pkg/sync/manager.go b/pkg/sync/manager.go
index 9bf8370..6afa17f 100644
--- a/pkg/sync/manager.go
+++ b/pkg/sync/manager.go
@@ -26,6 +26,7 @@ type Manager struct {
 	nodeID        string
 	relayURL      string
 	peers         []string
+	selfURLs      map[string]bool   // URLs discovered to be ourselves (for fast lookups)
 	currentSerial uint64
 	peerSerials   map[string]uint64 // peer URL -> latest serial seen
 	relayGroupMgr *RelayGroupManager
@@ -72,6 +73,7 @@ func NewManager(ctx context.Context, db *database.D, nodeID, relayURL string, pe
 		nodeID:        nodeID,
 		relayURL:      relayURL,
 		peers:         peers,
+		selfURLs:      make(map[string]bool),
 		currentSerial: 0,
 		peerSerials:   make(map[string]uint64),
 		relayGroupMgr: relayGroupMgr,
@@ -79,6 +81,44 @@ func NewManager(ctx context.Context, db *database.D, nodeID, relayURL string, pe
 		policyManager: policyManager,
 	}
 
+	// Add our configured relay URL to self-URLs cache if provided
+	if m.relayURL != "" {
+		m.selfURLs[m.relayURL] = true
+	}
+
+	// Remove self from peer list once at startup if we have a nodeID
+	if m.nodeID != "" {
+		filteredPeers := make([]string, 0, len(m.peers))
+		for _, peerURL := range m.peers {
+			// Fast path: check if we already know this URL is ours
+			if m.selfURLs[peerURL] {
+				log.I.F("removed self from sync peer list (known URL): %s", peerURL)
+				continue
+			}
+
+			// Slow path: check via NIP-11 pubkey
+			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			peerPubkey, err := m.nip11Cache.GetPubkey(ctx, peerURL)
+			cancel()
+
+			if err != nil {
+				log.D.F("couldn't fetch NIP-11 for %s, keeping in peer list: %v", peerURL, err)
+				filteredPeers = append(filteredPeers, peerURL)
+				continue
+			}
+
+			if peerPubkey == m.nodeID {
+				log.I.F("removed self from sync peer list (discovered): %s (pubkey: %s)", peerURL, m.nodeID)
+				// Cache this URL as ours for future fast lookups
+				m.selfURLs[peerURL] = true
+				continue
+			}
+
+			filteredPeers = append(filteredPeers, peerURL)
+		}
+		m.peers = filteredPeers
+	}
+
 	// Start sync routine
 	go m.syncRoutine()
 
@@ -173,36 +213,13 @@ func (m *Manager) syncRoutine() {
 // syncWithPeersSequentially syncs with all configured peers one at a time
 func (m *Manager) syncWithPeersSequentially() {
 	for _, peerURL := range m.peers {
-		// Check if this peer is ourselves via NIP-11 pubkey
-		if m.isSelfPeer(peerURL) {
-			log.D.F("skipping sync with self: %s (pubkey matches our relay identity)", peerURL)
-			continue
-		}
+		// Self-peers are already filtered out during initialization/update
 		m.syncWithPeer(peerURL)
 		// Small delay between peers to avoid overwhelming
 		time.Sleep(100 * time.Millisecond)
 	}
 }
 
-// isSelfPeer checks if a peer URL is actually ourselves by comparing NIP-11 pubkeys
-func (m *Manager) isSelfPeer(peerURL string) bool {
-	// If we don't have a nodeID, can't compare
-	if m.nodeID == "" {
-		return false
-	}
-
-	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-	defer cancel()
-
-	peerPubkey, err := m.nip11Cache.GetPubkey(ctx, peerURL)
-	if err != nil {
-		log.D.F("couldn't fetch NIP-11 for %s to check if self: %v", peerURL, err)
-		return false
-	}
-
-	return peerPubkey == m.nodeID
-}
-
 // syncWithPeer syncs with a specific peer
 func (m *Manager) syncWithPeer(peerURL string) {
 	// Get the peer's current serial
@@ -417,6 +434,13 @@ func (m *Manager) HandleCurrentRequest(w http.ResponseWriter, r *http.Request) {
 	// Reject requests from ourselves (same nodeID)
 	if req.NodeID != "" && req.NodeID == m.nodeID {
 		log.D.F("rejecting sync current request from self (nodeID: %s)", req.NodeID)
+		// Cache the requesting relay URL as ours for future fast lookups
+		if req.RelayURL != "" {
+			m.mutex.Lock()
+			m.selfURLs[req.RelayURL] = true
+			m.mutex.Unlock()
+			log.D.F("cached self-URL from inbound request: %s", req.RelayURL)
+		}
 		http.Error(w, "Cannot sync with self", http.StatusBadRequest)
 		return
 	}
@@ -447,6 +471,13 @@ func (m *Manager) HandleEventIDsRequest(w http.ResponseWriter, r *http.Request)
 	// Reject requests from ourselves (same nodeID)
 	if req.NodeID != "" && req.NodeID == m.nodeID {
 		log.D.F("rejecting sync event-ids request from self (nodeID: %s)", req.NodeID)
+		// Cache the requesting relay URL as ours for future fast lookups
+		if req.RelayURL != "" {
+			m.mutex.Lock()
+			m.selfURLs[req.RelayURL] = true
+			m.mutex.Unlock()
+			log.D.F("cached self-URL from inbound request: %s", req.RelayURL)
+		}
 		http.Error(w, "Cannot sync with self", http.StatusBadRequest)
 		return
 	}
diff --git a/pkg/version/version b/pkg/version/version
index f96e4b5..0143f63 100644
--- a/pkg/version/version
+++ b/pkg/version/version
@@ -1 +1 @@
-v0.29.18
\ No newline at end of file
+v0.29.19
\ No newline at end of file