diff --git a/app/handle-websocket.go b/app/handle-websocket.go index f9f7fa1..96f677a 100644 --- a/app/handle-websocket.go +++ b/app/handle-websocket.go @@ -71,6 +71,10 @@ whitelist: // Set read limit immediately after connection is established conn.SetReadLimit(DefaultMaxMessageSize) log.D.F("set read limit to %d bytes (%d MB) for %s", DefaultMaxMessageSize, DefaultMaxMessageSize/units.Mb, remote) + + // Set initial read deadline - pong handler will extend it when pongs are received + conn.SetReadDeadline(time.Now().Add(DefaultPongWait)) + defer conn.Close() listener := &Listener{ ctx: ctx, @@ -100,12 +104,12 @@ whitelist: log.D.F("AUTH challenge sent successfully to %s", remote) } ticker := time.NewTicker(DefaultPingWait) - // Set pong handler + // Set pong handler - extends read deadline when pongs are received conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(DefaultPongWait)) return nil }) - // Set ping handler + // Set ping handler - extends read deadline when pings are received conn.SetPingHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(DefaultPongWait)) return conn.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(DefaultWriteTimeout)) @@ -159,14 +163,14 @@ whitelist: var msg []byte log.T.F("waiting for message from %s", remote) - // Set read deadline for context cancellation - deadline := time.Now().Add(DefaultPongWait) + // Don't set read deadline here - it's set initially and extended by pong handler + // This prevents premature timeouts on idle connections with active subscriptions if ctx.Err() != nil { return } - conn.SetReadDeadline(deadline) // Block waiting for message; rely on pings and context cancellation to detect dead peers + // The read deadline is managed by the pong handler which extends it when pongs are received typ, msg, err = conn.ReadMessage() if err != nil { @@ -187,6 +191,12 @@ whitelist: log.T.F("connection from %s closed: %v", remote, err) return } + // Handle timeout errors specifically - these can occur on idle connections + // but pongs should extend the deadline, so a timeout usually means dead connection + if strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded") { + log.T.F("connection from %s read timeout (likely dead connection): %v", remote, err) + return + } // Handle message too big errors specifically if strings.Contains(err.Error(), "message too large") || strings.Contains(err.Error(), "read limited at") { @@ -216,13 +226,41 @@ whitelist: deadline := time.Now().Add(DefaultWriteTimeout) conn.SetWriteDeadline(deadline) pongStart := time.Now() - if err = conn.WriteControl(websocket.PongMessage, msg, deadline); chk.E(err) { + if err = conn.WriteControl(websocket.PongMessage, msg, deadline); err != nil { pongDuration := time.Since(pongStart) - log.E.F( - "failed to send PONG to %s after %v: %v", remote, - pongDuration, err, - ) - return + + // Check if this is a timeout vs a connection error + isTimeout := strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded") + isConnectionError := strings.Contains(err.Error(), "use of closed network connection") || + strings.Contains(err.Error(), "broken pipe") || + strings.Contains(err.Error(), "connection reset") || + websocket.IsCloseError(err, websocket.CloseAbnormalClosure, + websocket.CloseGoingAway, + websocket.CloseNoStatusReceived) + + if isConnectionError { + log.E.F( + "failed to send PONG to %s after %v (connection error): %v", remote, + pongDuration, err, + ) + return + } else if isTimeout { + // Timeout on pong - log but don't close immediately + // The read deadline will catch dead connections + log.W.F( + "failed to send PONG to %s after %v (timeout, but connection may still be alive): %v", remote, + pongDuration, err, + ) + // Continue - don't close connection on pong timeout + } else { + // Unknown error - log and continue + log.E.F( + "failed to send PONG to %s after %v (unknown error): %v", remote, + pongDuration, err, + ) + // Continue - don't close on unknown errors + } + continue } pongDuration := time.Since(pongStart) log.D.F("sent PONG to %s successfully in %v", remote, pongDuration) @@ -264,12 +302,40 @@ func (s *Server) Pinger( if err = conn.WriteControl(websocket.PingMessage, []byte{}, deadline); err != nil { pingDuration := time.Since(pingStart) - log.E.F( - "PING #%d FAILED after %v: %v", pingCount, pingDuration, - err, - ) - chk.E(err) - return + + // Check if this is a timeout vs a connection error + isTimeout := strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded") + isConnectionError := strings.Contains(err.Error(), "use of closed network connection") || + strings.Contains(err.Error(), "broken pipe") || + strings.Contains(err.Error(), "connection reset") || + websocket.IsCloseError(err, websocket.CloseAbnormalClosure, + websocket.CloseGoingAway, + websocket.CloseNoStatusReceived) + + if isConnectionError { + log.E.F( + "PING #%d FAILED after %v (connection error): %v", pingCount, pingDuration, + err, + ) + chk.E(err) + return + } else if isTimeout { + // Timeout on ping - log but don't stop pinger immediately + // The read deadline will catch dead connections + log.W.F( + "PING #%d timeout after %v (connection may still be alive): %v", pingCount, pingDuration, + err, + ) + // Continue - don't stop pinger on timeout + } else { + // Unknown error - log and continue + log.E.F( + "PING #%d FAILED after %v (unknown error): %v", pingCount, pingDuration, + err, + ) + // Continue - don't stop pinger on unknown errors + } + continue } pingDuration := time.Since(pingStart) diff --git a/app/publisher.go b/app/publisher.go index c9cd2ca..c8ea945 100644 --- a/app/publisher.go +++ b/app/publisher.go @@ -283,17 +283,36 @@ func (p *P) Deliver(ev *event.E) { hex.Enc(ev.ID), d.sub.remote, d.id, deliveryDuration, err) // Check for timeout specifically - if strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline") { + isTimeout := strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded") + if isTimeout { log.E.F("subscription delivery TIMEOUT: event=%s to=%s after %v (limit=%v)", hex.Enc(ev.ID), d.sub.remote, deliveryDuration, DefaultWriteTimeout) } - // Log connection cleanup - log.D.F("removing failed subscriber connection: %s", d.sub.remote) + // Only close connection on permanent errors, not transient timeouts + // WebSocket write errors typically indicate connection issues, but we should + // distinguish between timeouts (client might be slow) and connection errors + isConnectionError := strings.Contains(err.Error(), "use of closed network connection") || + strings.Contains(err.Error(), "broken pipe") || + strings.Contains(err.Error(), "connection reset") || + websocket.IsCloseError(err, websocket.CloseAbnormalClosure, + websocket.CloseGoingAway, + websocket.CloseNoStatusReceived) - // On error, remove the subscriber connection safely - p.removeSubscriber(d.w) - _ = d.w.Close() + if isConnectionError { + log.D.F("removing failed subscriber connection due to connection error: %s", d.sub.remote) + p.removeSubscriber(d.w) + _ = d.w.Close() + } else if isTimeout { + // For timeouts, log but don't immediately close - give it another chance + // The read deadline will catch dead connections eventually + log.W.F("subscription delivery timeout for %s (client may be slow), skipping event but keeping connection", d.sub.remote) + } else { + // Unknown error - be conservative and close + log.D.F("removing failed subscriber connection due to unknown error: %s", d.sub.remote) + p.removeSubscriber(d.w) + _ = d.w.Close() + } continue }