From 15e2988222db951035621deefd81a28cb98f388c Mon Sep 17 00:00:00 2001 From: mleku Date: Tue, 21 Oct 2025 16:31:28 +0100 Subject: [PATCH] Refactor self-connection handling and improve message processing - Removed self-connection detection logic from the Listener and Server, simplifying the message handling process. - Updated the HandleMessage and handle-websocket functions to eliminate checks for self-connections, enhancing clarity and maintainability. - Adjusted AUTH challenge logic to focus solely on blacklisted IPs, streamlining connection management. --- app/handle-message.go | 6 -- app/handle-websocket.go | 10 +-- app/listener.go | 1 - app/server.go | 27 -------- pkg/acl/follows.go | 143 ++++++++++++++++++++-------------------- 5 files changed, 72 insertions(+), 115 deletions(-) diff --git a/app/handle-message.go b/app/handle-message.go index 41838f1..90d85c8 100644 --- a/app/handle-message.go +++ b/app/handle-message.go @@ -16,12 +16,6 @@ import ( ) func (l *Listener) HandleMessage(msg []byte, remote string) { - // Ignore all messages from self-connections - if l.isSelfConnection { - log.D.F("ignoring message from self-connection %s", remote) - return - } - // Handle blacklisted IPs - discard messages but keep connection open until timeout if l.isBlacklisted { // Check if timeout has been reached diff --git a/app/handle-websocket.go b/app/handle-websocket.go index 33344af..4cc0fbd 100644 --- a/app/handle-websocket.go +++ b/app/handle-websocket.go @@ -88,12 +88,6 @@ whitelist: startTime: time.Now(), } - // Detect self-connections early to avoid sending AUTH challenges - listener.isSelfConnection = s.isSelfConnection(remote) - if listener.isSelfConnection { - log.W.F("detected self-connection from %s, marking connection", remote) - } - // Check for blacklisted IPs listener.isBlacklisted = s.isIPBlacklisted(remote) if listener.isBlacklisted { @@ -103,7 +97,7 @@ whitelist: chal := make([]byte, 32) rand.Read(chal) listener.challenge.Store([]byte(hex.Enc(chal))) - if s.Config.ACLMode != "none" && !listener.isSelfConnection { + if s.Config.ACLMode != "none" { log.D.F("sending AUTH challenge to %s", remote) if err = authenvelope.NewChallengeWith(listener.challenge.Load()). Write(listener); chk.E(err) { @@ -111,8 +105,6 @@ whitelist: return } log.D.F("AUTH challenge sent successfully to %s", remote) - } else if listener.isSelfConnection { - log.D.F("skipping AUTH challenge for self-connection from %s", remote) } ticker := time.NewTicker(DefaultPingWait) go s.Pinger(ctx, conn, ticker, cancel) diff --git a/app/listener.go b/app/listener.go index 3a3fdc8..26a8b32 100644 --- a/app/listener.go +++ b/app/listener.go @@ -24,7 +24,6 @@ type Listener struct { challenge atomic.Bytes authedPubkey atomic.Bytes startTime time.Time - isSelfConnection bool // Marker to identify self-connections isBlacklisted bool // Marker to identify blacklisted IPs blacklistTimeout time.Time // When to timeout blacklisted connections // Diagnostics: per-connection counters diff --git a/app/server.go b/app/server.go index cad8b01..2f23ede 100644 --- a/app/server.go +++ b/app/server.go @@ -49,33 +49,6 @@ type Server struct { policyManager *policy.P } -// isSelfConnection checks if the connection is coming from the relay itself -func (s *Server) isSelfConnection(remote string) bool { - // Check for localhost connections - if strings.HasPrefix(remote, "127.0.0.1:") || strings.HasPrefix(remote, "::1:") || strings.HasPrefix(remote, "[::1]:") { - return true - } - - // Check for connections from the same IP as the server - // This handles cases where the relay connects to itself via its public IP - if s.Config.Listen != "" { - // Extract IP from listen address (e.g., "0.0.0.0" -> "0.0.0.0") - listenIP := s.Config.Listen - if listenIP == "0.0.0.0" || listenIP == "" { - // If listening on all interfaces, check if remote IP matches any local interface - // For now, we'll be conservative and only check localhost - return false - } - // Check if remote IP matches the listen IP - remoteIP := strings.Split(remote, ":")[0] - if remoteIP == listenIP { - return true - } - } - - return false -} - // isIPBlacklisted checks if an IP address is blacklisted using the managed ACL system func (s *Server) isIPBlacklisted(remote string) bool { // Extract IP from remote address (e.g., "192.168.1.1:12345" -> "192.168.1.1") diff --git a/pkg/acl/follows.go b/pkg/acl/follows.go index ad0d2df..3bab5f1 100644 --- a/pkg/acl/follows.go +++ b/pkg/acl/follows.go @@ -396,88 +396,87 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) { log.T.F("follows syncer: sent ping to %s", u) continue default: - } - - // Set a read timeout to avoid hanging - readCtx, readCancel := context.WithTimeout(ctx, 60*time.Second) - _, data, err := c.Read(readCtx) - readCancel() - if err != nil { - _ = c.Close(websocket.StatusNormalClosure, "read err") - break - } - label, rem, err := envelopes.Identify(data) - if chk.E(err) { - continue - } - switch label { - case eventenvelope.L: - res, _, err := eventenvelope.ParseResult(rem) - if chk.E(err) || res == nil || res.Event == nil { - continue - } - // verify signature before saving - if ok, err := res.Event.Verify(); chk.T(err) || !ok { + // Set a read timeout to avoid hanging + readCtx, readCancel := context.WithTimeout(ctx, 60*time.Second) + _, data, err := c.Read(readCtx) + readCancel() + if err != nil { + _ = c.Close(websocket.StatusNormalClosure, "read err") + break + } + label, rem, err := envelopes.Identify(data) + if chk.E(err) { continue } + switch label { + case eventenvelope.L: + res, _, err := eventenvelope.ParseResult(rem) + if chk.E(err) || res == nil || res.Event == nil { + continue + } + // verify signature before saving + if ok, err := res.Event.Verify(); chk.T(err) || !ok { + continue + } - // Process events based on kind - switch res.Event.Kind { - case kind.FollowList.K: - // Check if this is from an admin and process immediately - if f.isAdminPubkey(res.Event.Pubkey) { - log.I.F( - "follows syncer: received admin follow list from %s on relay %s - processing immediately", - hex.EncodeToString(res.Event.Pubkey), u, - ) - f.extractFollowedPubkeys(res.Event) - } else { + // Process events based on kind + switch res.Event.Kind { + case kind.FollowList.K: + // Check if this is from an admin and process immediately + if f.isAdminPubkey(res.Event.Pubkey) { + log.I.F( + "follows syncer: received admin follow list from %s on relay %s - processing immediately", + hex.EncodeToString(res.Event.Pubkey), u, + ) + f.extractFollowedPubkeys(res.Event) + } else { + log.T.F( + "follows syncer: received follow list from non-admin %s on relay %s - ignoring", + hex.EncodeToString(res.Event.Pubkey), u, + ) + } + case kind.RelayListMetadata.K: log.T.F( - "follows syncer: received follow list from non-admin %s on relay %s - ignoring", + "follows syncer: received kind 10002 (relay list) event from %s on relay %s", + hex.EncodeToString(res.Event.Pubkey), u, + ) + default: + // Log all other events from followed users + log.T.F( + "follows syncer: received kind %d event from %s on relay %s", + res.Event.Kind, hex.EncodeToString(res.Event.Pubkey), u, ) } - case kind.RelayListMetadata.K: - log.T.F( - "follows syncer: received kind 10002 (relay list) event from %s on relay %s", - hex.EncodeToString(res.Event.Pubkey), u, - ) - default: - // Log all other events from followed users - log.T.F( - "follows syncer: received kind %d event from %s on relay %s", - res.Event.Kind, - hex.EncodeToString(res.Event.Pubkey), u, - ) - } - if _, err = f.D.SaveEvent( - ctx, res.Event, - ); err != nil { - if !strings.HasPrefix( - err.Error(), "blocked:", - ) { - log.W.F( - "follows syncer: save event failed: %v", - err, - ) + if _, err = f.D.SaveEvent( + ctx, res.Event, + ); err != nil { + if !strings.HasPrefix( + err.Error(), "blocked:", + ) { + log.W.F( + "follows syncer: save event failed: %v", + err, + ) + } + // ignore duplicates and continue + } else { + // Only dispatch if the event was newly saved (no error) + if f.pubs != nil { + go f.pubs.Deliver(res.Event) + } + // log.I.F( + // "saved new event from follows syncer: %0x", + // res.Event.ID, + // ) } - // ignore duplicates and continue - } else { - // Only dispatch if the event was newly saved (no error) - if f.pubs != nil { - go f.pubs.Deliver(res.Event) - } - // log.I.F( - // "saved new event from follows syncer: %0x", - // res.Event.ID, - // ) + case eoseenvelope.L: + log.T.F("follows syncer: received EOSE from %s, continuing persistent subscription", u) + // Continue the subscription for new events + default: + // ignore other labels } - case eoseenvelope.L: - log.T.F("follows syncer: received EOSE from %s, continuing persistent subscription", u) - // Continue the subscription for new events - default: - // ignore other labels } } // Connection dropped, reconnect after delay