Enhance WebSocket connection management and error handling
- Set an initial read deadline on each connection to prevent premature timeouts on idle connections.
- Improve the pong and ping handlers so that they extend the read deadline and handle timeout errors more effectively.
- Refine error logging for connection issues, distinguishing timeouts from connection errors to make debugging easier.
- Update the subscriber delivery logic to handle timeouts gracefully, allowing potential recovery without immediate disconnection.
This commit is contained in:
@@ -71,6 +71,10 @@ whitelist:
|
||||
// Set read limit immediately after connection is established
|
||||
conn.SetReadLimit(DefaultMaxMessageSize)
|
||||
log.D.F("set read limit to %d bytes (%d MB) for %s", DefaultMaxMessageSize, DefaultMaxMessageSize/units.Mb, remote)
|
||||
|
||||
// Set initial read deadline - pong handler will extend it when pongs are received
|
||||
conn.SetReadDeadline(time.Now().Add(DefaultPongWait))
|
||||
|
||||
defer conn.Close()
|
||||
listener := &Listener{
|
||||
ctx: ctx,
|
||||
@@ -100,12 +104,12 @@ whitelist:
|
||||
log.D.F("AUTH challenge sent successfully to %s", remote)
|
||||
}
|
||||
ticker := time.NewTicker(DefaultPingWait)
|
||||
// Set pong handler
|
||||
// Set pong handler - extends read deadline when pongs are received
|
||||
conn.SetPongHandler(func(string) error {
|
||||
conn.SetReadDeadline(time.Now().Add(DefaultPongWait))
|
||||
return nil
|
||||
})
|
||||
// Set ping handler
|
||||
// Set ping handler - extends read deadline when pings are received
|
||||
conn.SetPingHandler(func(string) error {
|
||||
conn.SetReadDeadline(time.Now().Add(DefaultPongWait))
|
||||
return conn.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(DefaultWriteTimeout))
|
||||
@@ -159,14 +163,14 @@ whitelist:
|
||||
var msg []byte
|
||||
log.T.F("waiting for message from %s", remote)
|
||||
|
||||
// Set read deadline for context cancellation
|
||||
deadline := time.Now().Add(DefaultPongWait)
|
||||
// Don't set read deadline here - it's set initially and extended by pong handler
|
||||
// This prevents premature timeouts on idle connections with active subscriptions
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
conn.SetReadDeadline(deadline)
|
||||
|
||||
// Block waiting for message; rely on pings and context cancellation to detect dead peers
|
||||
// The read deadline is managed by the pong handler which extends it when pongs are received
|
||||
typ, msg, err = conn.ReadMessage()
|
||||
|
||||
if err != nil {
|
||||
@@ -187,6 +191,12 @@ whitelist:
|
||||
log.T.F("connection from %s closed: %v", remote, err)
|
||||
return
|
||||
}
|
||||
// Handle timeout errors specifically - these can occur on idle connections
|
||||
// but pongs should extend the deadline, so a timeout usually means dead connection
|
||||
if strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded") {
|
||||
log.T.F("connection from %s read timeout (likely dead connection): %v", remote, err)
|
||||
return
|
||||
}
|
||||
// Handle message too big errors specifically
|
||||
if strings.Contains(err.Error(), "message too large") ||
|
||||
strings.Contains(err.Error(), "read limited at") {
|
||||
@@ -216,13 +226,41 @@ whitelist:
|
||||
deadline := time.Now().Add(DefaultWriteTimeout)
|
||||
conn.SetWriteDeadline(deadline)
|
||||
pongStart := time.Now()
|
||||
if err = conn.WriteControl(websocket.PongMessage, msg, deadline); chk.E(err) {
|
||||
if err = conn.WriteControl(websocket.PongMessage, msg, deadline); err != nil {
|
||||
pongDuration := time.Since(pongStart)
|
||||
log.E.F(
|
||||
"failed to send PONG to %s after %v: %v", remote,
|
||||
pongDuration, err,
|
||||
)
|
||||
return
|
||||
|
||||
// Check if this is a timeout vs a connection error
|
||||
isTimeout := strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded")
|
||||
isConnectionError := strings.Contains(err.Error(), "use of closed network connection") ||
|
||||
strings.Contains(err.Error(), "broken pipe") ||
|
||||
strings.Contains(err.Error(), "connection reset") ||
|
||||
websocket.IsCloseError(err, websocket.CloseAbnormalClosure,
|
||||
websocket.CloseGoingAway,
|
||||
websocket.CloseNoStatusReceived)
|
||||
|
||||
if isConnectionError {
|
||||
log.E.F(
|
||||
"failed to send PONG to %s after %v (connection error): %v", remote,
|
||||
pongDuration, err,
|
||||
)
|
||||
return
|
||||
} else if isTimeout {
|
||||
// Timeout on pong - log but don't close immediately
|
||||
// The read deadline will catch dead connections
|
||||
log.W.F(
|
||||
"failed to send PONG to %s after %v (timeout, but connection may still be alive): %v", remote,
|
||||
pongDuration, err,
|
||||
)
|
||||
// Continue - don't close connection on pong timeout
|
||||
} else {
|
||||
// Unknown error - log and continue
|
||||
log.E.F(
|
||||
"failed to send PONG to %s after %v (unknown error): %v", remote,
|
||||
pongDuration, err,
|
||||
)
|
||||
// Continue - don't close on unknown errors
|
||||
}
|
||||
continue
|
||||
}
|
||||
pongDuration := time.Since(pongStart)
|
||||
log.D.F("sent PONG to %s successfully in %v", remote, pongDuration)
|
||||
@@ -264,12 +302,40 @@ func (s *Server) Pinger(
|
||||
|
||||
if err = conn.WriteControl(websocket.PingMessage, []byte{}, deadline); err != nil {
|
||||
pingDuration := time.Since(pingStart)
|
||||
log.E.F(
|
||||
"PING #%d FAILED after %v: %v", pingCount, pingDuration,
|
||||
err,
|
||||
)
|
||||
chk.E(err)
|
||||
return
|
||||
|
||||
// Check if this is a timeout vs a connection error
|
||||
isTimeout := strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded")
|
||||
isConnectionError := strings.Contains(err.Error(), "use of closed network connection") ||
|
||||
strings.Contains(err.Error(), "broken pipe") ||
|
||||
strings.Contains(err.Error(), "connection reset") ||
|
||||
websocket.IsCloseError(err, websocket.CloseAbnormalClosure,
|
||||
websocket.CloseGoingAway,
|
||||
websocket.CloseNoStatusReceived)
|
||||
|
||||
if isConnectionError {
|
||||
log.E.F(
|
||||
"PING #%d FAILED after %v (connection error): %v", pingCount, pingDuration,
|
||||
err,
|
||||
)
|
||||
chk.E(err)
|
||||
return
|
||||
} else if isTimeout {
|
||||
// Timeout on ping - log but don't stop pinger immediately
|
||||
// The read deadline will catch dead connections
|
||||
log.W.F(
|
||||
"PING #%d timeout after %v (connection may still be alive): %v", pingCount, pingDuration,
|
||||
err,
|
||||
)
|
||||
// Continue - don't stop pinger on timeout
|
||||
} else {
|
||||
// Unknown error - log and continue
|
||||
log.E.F(
|
||||
"PING #%d FAILED after %v (unknown error): %v", pingCount, pingDuration,
|
||||
err,
|
||||
)
|
||||
// Continue - don't stop pinger on unknown errors
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
pingDuration := time.Since(pingStart)
|
||||
|
||||
@@ -283,17 +283,36 @@ func (p *P) Deliver(ev *event.E) {
|
||||
hex.Enc(ev.ID), d.sub.remote, d.id, deliveryDuration, err)
|
||||
|
||||
// Check for timeout specifically
|
||||
if strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline") {
|
||||
isTimeout := strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded")
|
||||
if isTimeout {
|
||||
log.E.F("subscription delivery TIMEOUT: event=%s to=%s after %v (limit=%v)",
|
||||
hex.Enc(ev.ID), d.sub.remote, deliveryDuration, DefaultWriteTimeout)
|
||||
}
|
||||
|
||||
// Log connection cleanup
|
||||
log.D.F("removing failed subscriber connection: %s", d.sub.remote)
|
||||
// Only close connection on permanent errors, not transient timeouts
|
||||
// WebSocket write errors typically indicate connection issues, but we should
|
||||
// distinguish between timeouts (client might be slow) and connection errors
|
||||
isConnectionError := strings.Contains(err.Error(), "use of closed network connection") ||
|
||||
strings.Contains(err.Error(), "broken pipe") ||
|
||||
strings.Contains(err.Error(), "connection reset") ||
|
||||
websocket.IsCloseError(err, websocket.CloseAbnormalClosure,
|
||||
websocket.CloseGoingAway,
|
||||
websocket.CloseNoStatusReceived)
|
||||
|
||||
// On error, remove the subscriber connection safely
|
||||
p.removeSubscriber(d.w)
|
||||
_ = d.w.Close()
|
||||
if isConnectionError {
|
||||
log.D.F("removing failed subscriber connection due to connection error: %s", d.sub.remote)
|
||||
p.removeSubscriber(d.w)
|
||||
_ = d.w.Close()
|
||||
} else if isTimeout {
|
||||
// For timeouts, log but don't immediately close - give it another chance
|
||||
// The read deadline will catch dead connections eventually
|
||||
log.W.F("subscription delivery timeout for %s (client may be slow), skipping event but keeping connection", d.sub.remote)
|
||||
} else {
|
||||
// Unknown error - be conservative and close
|
||||
log.D.F("removing failed subscriber connection due to unknown error: %s", d.sub.remote)
|
||||
p.removeSubscriber(d.w)
|
||||
_ = d.w.Close()
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user