Add connection start time tracking and improve logging for WebSocket lifecycle
Some checks failed
Go / build (push) Has been cancelled
Some checks failed
Go / build (push) Has been cancelled
- Introduced `startTime` field in `Listener` to log connection duration. - Enhanced diagnostics with detailed connection statistics on close. - Improved logging and error handling for PING/PONG and message lifecycle. - Updated version to v0.8.7.
This commit is contained in:
@@ -65,18 +65,17 @@ whitelist:
|
|||||||
conn.SetReadLimit(DefaultMaxMessageSize)
|
conn.SetReadLimit(DefaultMaxMessageSize)
|
||||||
defer conn.CloseNow()
|
defer conn.CloseNow()
|
||||||
listener := &Listener{
|
listener := &Listener{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
Server: s,
|
Server: s,
|
||||||
conn: conn,
|
conn: conn,
|
||||||
remote: remote,
|
remote: remote,
|
||||||
req: r,
|
req: r,
|
||||||
|
startTime: time.Now(),
|
||||||
}
|
}
|
||||||
chal := make([]byte, 32)
|
chal := make([]byte, 32)
|
||||||
rand.Read(chal)
|
rand.Read(chal)
|
||||||
listener.challenge.Store([]byte(hex.Enc(chal)))
|
listener.challenge.Store([]byte(hex.Enc(chal)))
|
||||||
// If admins are configured, immediately prompt client to AUTH (NIP-42)
|
if s.Config.ACLMode != "none" {
|
||||||
if len(s.Config.Admins) > 0 {
|
|
||||||
// log.D.F("sending initial AUTH challenge to %s", remote)
|
|
||||||
log.D.F("sending AUTH challenge to %s", remote)
|
log.D.F("sending AUTH challenge to %s", remote)
|
||||||
if err = authenvelope.NewChallengeWith(listener.challenge.Load()).
|
if err = authenvelope.NewChallengeWith(listener.challenge.Load()).
|
||||||
Write(listener); chk.E(err) {
|
Write(listener); chk.E(err) {
|
||||||
@@ -89,20 +88,23 @@ whitelist:
|
|||||||
go s.Pinger(ctx, conn, ticker, cancel)
|
go s.Pinger(ctx, conn, ticker, cancel)
|
||||||
defer func() {
|
defer func() {
|
||||||
log.D.F("closing websocket connection from %s", remote)
|
log.D.F("closing websocket connection from %s", remote)
|
||||||
|
|
||||||
// Cancel context and stop pinger
|
// Cancel context and stop pinger
|
||||||
cancel()
|
cancel()
|
||||||
ticker.Stop()
|
ticker.Stop()
|
||||||
|
|
||||||
// Cancel all subscriptions for this connection
|
// Cancel all subscriptions for this connection
|
||||||
log.D.F("cancelling subscriptions for %s", remote)
|
log.D.F("cancelling subscriptions for %s", remote)
|
||||||
listener.publishers.Receive(&W{Cancel: true})
|
listener.publishers.Receive(&W{Cancel: true})
|
||||||
|
|
||||||
// Log detailed connection statistics
|
// Log detailed connection statistics
|
||||||
log.D.F("ws connection closed %s: msgs=%d, REQs=%d, EVENTs=%d, duration=%v",
|
dur := time.Since(listener.startTime)
|
||||||
remote, listener.msgCount, listener.reqCount, listener.eventCount,
|
log.D.F(
|
||||||
time.Since(time.Now())) // Note: This will be near-zero, would need start time tracked
|
"ws connection closed %s: msgs=%d, REQs=%d, EVENTs=%d, duration=%v",
|
||||||
|
remote, listener.msgCount, listener.reqCount, listener.eventCount,
|
||||||
|
dur,
|
||||||
|
)
|
||||||
|
|
||||||
// Log any remaining connection state
|
// Log any remaining connection state
|
||||||
if listener.authedPubkey.Load() != nil {
|
if listener.authedPubkey.Load() != nil {
|
||||||
log.D.F("ws connection %s was authenticated", remote)
|
log.D.F("ws connection %s was authenticated", remote)
|
||||||
@@ -118,7 +120,7 @@ whitelist:
|
|||||||
}
|
}
|
||||||
var typ websocket.MessageType
|
var typ websocket.MessageType
|
||||||
var msg []byte
|
var msg []byte
|
||||||
// log.T.F("waiting for message from %s", remote)
|
log.T.F("waiting for message from %s", remote)
|
||||||
|
|
||||||
// Block waiting for message; rely on pings and context cancellation to detect dead peers
|
// Block waiting for message; rely on pings and context cancellation to detect dead peers
|
||||||
typ, msg, err = conn.Read(ctx)
|
typ, msg, err = conn.Read(ctx)
|
||||||
@@ -160,9 +162,15 @@ whitelist:
|
|||||||
pongStart := time.Now()
|
pongStart := time.Now()
|
||||||
if err = conn.Write(writeCtx, PongMessage, msg); chk.E(err) {
|
if err = conn.Write(writeCtx, PongMessage, msg); chk.E(err) {
|
||||||
pongDuration := time.Since(pongStart)
|
pongDuration := time.Since(pongStart)
|
||||||
log.E.F("failed to send PONG to %s after %v: %v", remote, pongDuration, err)
|
log.E.F(
|
||||||
|
"failed to send PONG to %s after %v: %v", remote,
|
||||||
|
pongDuration, err,
|
||||||
|
)
|
||||||
if writeCtx.Err() != nil {
|
if writeCtx.Err() != nil {
|
||||||
log.E.F("PONG write timeout to %s after %v (limit=%v)", remote, pongDuration, DefaultWriteTimeout)
|
log.E.F(
|
||||||
|
"PONG write timeout to %s after %v (limit=%v)", remote,
|
||||||
|
pongDuration, DefaultWriteTimeout,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
writeCancel()
|
writeCancel()
|
||||||
return
|
return
|
||||||
@@ -196,31 +204,37 @@ func (s *Server) Pinger(
|
|||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
pingCount++
|
pingCount++
|
||||||
log.D.F("sending PING #%d", pingCount)
|
log.D.F("sending PING #%d", pingCount)
|
||||||
|
|
||||||
// Create a write context with timeout for ping operation
|
// Create a write context with timeout for ping operation
|
||||||
pingCtx, pingCancel := context.WithTimeout(ctx, DefaultWriteTimeout)
|
pingCtx, pingCancel := context.WithTimeout(ctx, DefaultWriteTimeout)
|
||||||
pingStart := time.Now()
|
pingStart := time.Now()
|
||||||
|
|
||||||
if err = conn.Ping(pingCtx); err != nil {
|
if err = conn.Ping(pingCtx); err != nil {
|
||||||
pingDuration := time.Since(pingStart)
|
pingDuration := time.Since(pingStart)
|
||||||
log.E.F("PING #%d FAILED after %v: %v", pingCount, pingDuration, err)
|
log.E.F(
|
||||||
|
"PING #%d FAILED after %v: %v", pingCount, pingDuration,
|
||||||
|
err,
|
||||||
|
)
|
||||||
|
|
||||||
if pingCtx.Err() != nil {
|
if pingCtx.Err() != nil {
|
||||||
log.E.F("PING #%d timeout after %v (limit=%v)", pingCount, pingDuration, DefaultWriteTimeout)
|
log.E.F(
|
||||||
|
"PING #%d timeout after %v (limit=%v)", pingCount,
|
||||||
|
pingDuration, DefaultWriteTimeout,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
chk.E(err)
|
chk.E(err)
|
||||||
pingCancel()
|
pingCancel()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
pingDuration := time.Since(pingStart)
|
pingDuration := time.Since(pingStart)
|
||||||
log.D.F("PING #%d sent successfully in %v", pingCount, pingDuration)
|
log.D.F("PING #%d sent successfully in %v", pingCount, pingDuration)
|
||||||
|
|
||||||
if pingDuration > time.Millisecond*100 {
|
if pingDuration > time.Millisecond*100 {
|
||||||
log.D.F("SLOW PING #%d: %v (>100ms)", pingCount, pingDuration)
|
log.D.F("SLOW PING #%d: %v (>100ms)", pingCount, pingDuration)
|
||||||
}
|
}
|
||||||
|
|
||||||
pingCancel()
|
pingCancel()
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
log.D.F("pinger context cancelled after %d pings", pingCount)
|
log.D.F("pinger context cancelled after %d pings", pingCount)
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ type Listener struct {
|
|||||||
req *http.Request
|
req *http.Request
|
||||||
challenge atomic.Bytes
|
challenge atomic.Bytes
|
||||||
authedPubkey atomic.Bytes
|
authedPubkey atomic.Bytes
|
||||||
|
startTime time.Time
|
||||||
// Diagnostics: per-connection counters
|
// Diagnostics: per-connection counters
|
||||||
msgCount int
|
msgCount int
|
||||||
reqCount int
|
reqCount int
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
|
|
||||||
"lol.mleku.dev/chk"
|
"lol.mleku.dev/chk"
|
||||||
|
"lol.mleku.dev/log"
|
||||||
"next.orly.dev/pkg/utils"
|
"next.orly.dev/pkg/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -83,6 +84,10 @@ func (s *S) MarshalJSON() (b []byte, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *S) Marshal(dst []byte) (b []byte) {
|
func (s *S) Marshal(dst []byte) (b []byte) {
|
||||||
|
if s == nil {
|
||||||
|
log.I.F("tags cannot be used without initialization")
|
||||||
|
return
|
||||||
|
}
|
||||||
b = dst
|
b = dst
|
||||||
b = append(b, '[')
|
b = append(b, '[')
|
||||||
for i, ss := range *s {
|
for i, ss := range *s {
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
v0.8.6
|
v0.8.7
|
||||||
Reference in New Issue
Block a user