Refactor self-connection handling and improve message processing

- Removed self-connection detection logic from the Listener and Server, simplifying the message handling process.
- Updated the HandleMessage and handle-websocket functions to eliminate checks for self-connections, enhancing clarity and maintainability.
- Adjusted AUTH challenge logic to focus solely on blacklisted IPs, streamlining connection management.
This commit is contained in:
2025-10-21 16:31:28 +01:00
parent 95c6082564
commit 15e2988222
5 changed files with 72 additions and 115 deletions

View File

@@ -16,12 +16,6 @@ import (
)
func (l *Listener) HandleMessage(msg []byte, remote string) {
// Ignore all messages from self-connections
if l.isSelfConnection {
log.D.F("ignoring message from self-connection %s", remote)
return
}
// Handle blacklisted IPs - discard messages but keep connection open until timeout
if l.isBlacklisted {
// Check if timeout has been reached

View File

@@ -88,12 +88,6 @@ whitelist:
startTime: time.Now(),
}
// Detect self-connections early to avoid sending AUTH challenges
listener.isSelfConnection = s.isSelfConnection(remote)
if listener.isSelfConnection {
log.W.F("detected self-connection from %s, marking connection", remote)
}
// Check for blacklisted IPs
listener.isBlacklisted = s.isIPBlacklisted(remote)
if listener.isBlacklisted {
@@ -103,7 +97,7 @@ whitelist:
chal := make([]byte, 32)
rand.Read(chal)
listener.challenge.Store([]byte(hex.Enc(chal)))
if s.Config.ACLMode != "none" && !listener.isSelfConnection {
if s.Config.ACLMode != "none" {
log.D.F("sending AUTH challenge to %s", remote)
if err = authenvelope.NewChallengeWith(listener.challenge.Load()).
Write(listener); chk.E(err) {
@@ -111,8 +105,6 @@ whitelist:
return
}
log.D.F("AUTH challenge sent successfully to %s", remote)
} else if listener.isSelfConnection {
log.D.F("skipping AUTH challenge for self-connection from %s", remote)
}
ticker := time.NewTicker(DefaultPingWait)
go s.Pinger(ctx, conn, ticker, cancel)

View File

@@ -24,7 +24,6 @@ type Listener struct {
challenge atomic.Bytes
authedPubkey atomic.Bytes
startTime time.Time
isSelfConnection bool // Marker to identify self-connections
isBlacklisted bool // Marker to identify blacklisted IPs
blacklistTimeout time.Time // When to timeout blacklisted connections
// Diagnostics: per-connection counters

View File

@@ -49,33 +49,6 @@ type Server struct {
policyManager *policy.P
}
// isSelfConnection checks if the connection is coming from the relay itself
func (s *Server) isSelfConnection(remote string) bool {
// Check for localhost connections
if strings.HasPrefix(remote, "127.0.0.1:") || strings.HasPrefix(remote, "::1:") || strings.HasPrefix(remote, "[::1]:") {
return true
}
// Check for connections from the same IP as the server
// This handles cases where the relay connects to itself via its public IP
if s.Config.Listen != "" {
// Extract IP from listen address (e.g., "0.0.0.0" -> "0.0.0.0")
listenIP := s.Config.Listen
if listenIP == "0.0.0.0" || listenIP == "" {
// If listening on all interfaces, check if remote IP matches any local interface
// For now, we'll be conservative and only check localhost
return false
}
// Check if remote IP matches the listen IP
remoteIP := strings.Split(remote, ":")[0]
if remoteIP == listenIP {
return true
}
}
return false
}
// isIPBlacklisted checks if an IP address is blacklisted using the managed ACL system
func (s *Server) isIPBlacklisted(remote string) bool {
// Extract IP from remote address (e.g., "192.168.1.1:12345" -> "192.168.1.1")

View File

@@ -396,88 +396,87 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) {
log.T.F("follows syncer: sent ping to %s", u)
continue
default:
}
// Set a read timeout to avoid hanging
readCtx, readCancel := context.WithTimeout(ctx, 60*time.Second)
_, data, err := c.Read(readCtx)
readCancel()
if err != nil {
_ = c.Close(websocket.StatusNormalClosure, "read err")
break
}
label, rem, err := envelopes.Identify(data)
if chk.E(err) {
continue
}
switch label {
case eventenvelope.L:
res, _, err := eventenvelope.ParseResult(rem)
if chk.E(err) || res == nil || res.Event == nil {
continue
}
// verify signature before saving
if ok, err := res.Event.Verify(); chk.T(err) || !ok {
// Set a read timeout to avoid hanging
readCtx, readCancel := context.WithTimeout(ctx, 60*time.Second)
_, data, err := c.Read(readCtx)
readCancel()
if err != nil {
_ = c.Close(websocket.StatusNormalClosure, "read err")
break
}
label, rem, err := envelopes.Identify(data)
if chk.E(err) {
continue
}
switch label {
case eventenvelope.L:
res, _, err := eventenvelope.ParseResult(rem)
if chk.E(err) || res == nil || res.Event == nil {
continue
}
// verify signature before saving
if ok, err := res.Event.Verify(); chk.T(err) || !ok {
continue
}
// Process events based on kind
switch res.Event.Kind {
case kind.FollowList.K:
// Check if this is from an admin and process immediately
if f.isAdminPubkey(res.Event.Pubkey) {
log.I.F(
"follows syncer: received admin follow list from %s on relay %s - processing immediately",
hex.EncodeToString(res.Event.Pubkey), u,
)
f.extractFollowedPubkeys(res.Event)
} else {
// Process events based on kind
switch res.Event.Kind {
case kind.FollowList.K:
// Check if this is from an admin and process immediately
if f.isAdminPubkey(res.Event.Pubkey) {
log.I.F(
"follows syncer: received admin follow list from %s on relay %s - processing immediately",
hex.EncodeToString(res.Event.Pubkey), u,
)
f.extractFollowedPubkeys(res.Event)
} else {
log.T.F(
"follows syncer: received follow list from non-admin %s on relay %s - ignoring",
hex.EncodeToString(res.Event.Pubkey), u,
)
}
case kind.RelayListMetadata.K:
log.T.F(
"follows syncer: received follow list from non-admin %s on relay %s - ignoring",
"follows syncer: received kind 10002 (relay list) event from %s on relay %s",
hex.EncodeToString(res.Event.Pubkey), u,
)
default:
// Log all other events from followed users
log.T.F(
"follows syncer: received kind %d event from %s on relay %s",
res.Event.Kind,
hex.EncodeToString(res.Event.Pubkey), u,
)
}
case kind.RelayListMetadata.K:
log.T.F(
"follows syncer: received kind 10002 (relay list) event from %s on relay %s",
hex.EncodeToString(res.Event.Pubkey), u,
)
default:
// Log all other events from followed users
log.T.F(
"follows syncer: received kind %d event from %s on relay %s",
res.Event.Kind,
hex.EncodeToString(res.Event.Pubkey), u,
)
}
if _, err = f.D.SaveEvent(
ctx, res.Event,
); err != nil {
if !strings.HasPrefix(
err.Error(), "blocked:",
) {
log.W.F(
"follows syncer: save event failed: %v",
err,
)
if _, err = f.D.SaveEvent(
ctx, res.Event,
); err != nil {
if !strings.HasPrefix(
err.Error(), "blocked:",
) {
log.W.F(
"follows syncer: save event failed: %v",
err,
)
}
// ignore duplicates and continue
} else {
// Only dispatch if the event was newly saved (no error)
if f.pubs != nil {
go f.pubs.Deliver(res.Event)
}
// log.I.F(
// "saved new event from follows syncer: %0x",
// res.Event.ID,
// )
}
// ignore duplicates and continue
} else {
// Only dispatch if the event was newly saved (no error)
if f.pubs != nil {
go f.pubs.Deliver(res.Event)
}
// log.I.F(
// "saved new event from follows syncer: %0x",
// res.Event.ID,
// )
case eoseenvelope.L:
log.T.F("follows syncer: received EOSE from %s, continuing persistent subscription", u)
// Continue the subscription for new events
default:
// ignore other labels
}
case eoseenvelope.L:
log.T.F("follows syncer: received EOSE from %s, continuing persistent subscription", u)
// Continue the subscription for new events
default:
// ignore other labels
}
}
// Connection dropped, reconnect after delay