|
|
|
|
@@ -65,18 +65,17 @@ whitelist:
|
|
|
|
|
conn.SetReadLimit(DefaultMaxMessageSize)
|
|
|
|
|
defer conn.CloseNow()
|
|
|
|
|
listener := &Listener{
|
|
|
|
|
ctx: ctx,
|
|
|
|
|
Server: s,
|
|
|
|
|
conn: conn,
|
|
|
|
|
remote: remote,
|
|
|
|
|
req: r,
|
|
|
|
|
ctx: ctx,
|
|
|
|
|
Server: s,
|
|
|
|
|
conn: conn,
|
|
|
|
|
remote: remote,
|
|
|
|
|
req: r,
|
|
|
|
|
startTime: time.Now(),
|
|
|
|
|
}
|
|
|
|
|
chal := make([]byte, 32)
|
|
|
|
|
rand.Read(chal)
|
|
|
|
|
listener.challenge.Store([]byte(hex.Enc(chal)))
|
|
|
|
|
// If admins are configured, immediately prompt client to AUTH (NIP-42)
|
|
|
|
|
if len(s.Config.Admins) > 0 {
|
|
|
|
|
// log.D.F("sending initial AUTH challenge to %s", remote)
|
|
|
|
|
if s.Config.ACLMode != "none" {
|
|
|
|
|
log.D.F("sending AUTH challenge to %s", remote)
|
|
|
|
|
if err = authenvelope.NewChallengeWith(listener.challenge.Load()).
|
|
|
|
|
Write(listener); chk.E(err) {
|
|
|
|
|
@@ -89,20 +88,23 @@ whitelist:
|
|
|
|
|
go s.Pinger(ctx, conn, ticker, cancel)
|
|
|
|
|
defer func() {
|
|
|
|
|
log.D.F("closing websocket connection from %s", remote)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Cancel context and stop pinger
|
|
|
|
|
cancel()
|
|
|
|
|
ticker.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Cancel all subscriptions for this connection
|
|
|
|
|
log.D.F("cancelling subscriptions for %s", remote)
|
|
|
|
|
listener.publishers.Receive(&W{Cancel: true})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Log detailed connection statistics
|
|
|
|
|
log.D.F("ws connection closed %s: msgs=%d, REQs=%d, EVENTs=%d, duration=%v",
|
|
|
|
|
remote, listener.msgCount, listener.reqCount, listener.eventCount,
|
|
|
|
|
time.Since(time.Now())) // Note: This will be near-zero, would need start time tracked
|
|
|
|
|
|
|
|
|
|
dur := time.Since(listener.startTime)
|
|
|
|
|
log.D.F(
|
|
|
|
|
"ws connection closed %s: msgs=%d, REQs=%d, EVENTs=%d, duration=%v",
|
|
|
|
|
remote, listener.msgCount, listener.reqCount, listener.eventCount,
|
|
|
|
|
dur,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Log any remaining connection state
|
|
|
|
|
if listener.authedPubkey.Load() != nil {
|
|
|
|
|
log.D.F("ws connection %s was authenticated", remote)
|
|
|
|
|
@@ -118,7 +120,7 @@ whitelist:
|
|
|
|
|
}
|
|
|
|
|
var typ websocket.MessageType
|
|
|
|
|
var msg []byte
|
|
|
|
|
// log.T.F("waiting for message from %s", remote)
|
|
|
|
|
log.T.F("waiting for message from %s", remote)
|
|
|
|
|
|
|
|
|
|
// Block waiting for message; rely on pings and context cancellation to detect dead peers
|
|
|
|
|
typ, msg, err = conn.Read(ctx)
|
|
|
|
|
@@ -160,9 +162,15 @@ whitelist:
|
|
|
|
|
pongStart := time.Now()
|
|
|
|
|
if err = conn.Write(writeCtx, PongMessage, msg); chk.E(err) {
|
|
|
|
|
pongDuration := time.Since(pongStart)
|
|
|
|
|
log.E.F("failed to send PONG to %s after %v: %v", remote, pongDuration, err)
|
|
|
|
|
log.E.F(
|
|
|
|
|
"failed to send PONG to %s after %v: %v", remote,
|
|
|
|
|
pongDuration, err,
|
|
|
|
|
)
|
|
|
|
|
if writeCtx.Err() != nil {
|
|
|
|
|
log.E.F("PONG write timeout to %s after %v (limit=%v)", remote, pongDuration, DefaultWriteTimeout)
|
|
|
|
|
log.E.F(
|
|
|
|
|
"PONG write timeout to %s after %v (limit=%v)", remote,
|
|
|
|
|
pongDuration, DefaultWriteTimeout,
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
writeCancel()
|
|
|
|
|
return
|
|
|
|
|
@@ -196,31 +204,37 @@ func (s *Server) Pinger(
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
pingCount++
|
|
|
|
|
log.D.F("sending PING #%d", pingCount)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Create a write context with timeout for ping operation
|
|
|
|
|
pingCtx, pingCancel := context.WithTimeout(ctx, DefaultWriteTimeout)
|
|
|
|
|
pingStart := time.Now()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if err = conn.Ping(pingCtx); err != nil {
|
|
|
|
|
pingDuration := time.Since(pingStart)
|
|
|
|
|
log.E.F("PING #%d FAILED after %v: %v", pingCount, pingDuration, err)
|
|
|
|
|
|
|
|
|
|
log.E.F(
|
|
|
|
|
"PING #%d FAILED after %v: %v", pingCount, pingDuration,
|
|
|
|
|
err,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if pingCtx.Err() != nil {
|
|
|
|
|
log.E.F("PING #%d timeout after %v (limit=%v)", pingCount, pingDuration, DefaultWriteTimeout)
|
|
|
|
|
log.E.F(
|
|
|
|
|
"PING #%d timeout after %v (limit=%v)", pingCount,
|
|
|
|
|
pingDuration, DefaultWriteTimeout,
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
chk.E(err)
|
|
|
|
|
pingCancel()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pingDuration := time.Since(pingStart)
|
|
|
|
|
log.D.F("PING #%d sent successfully in %v", pingCount, pingDuration)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if pingDuration > time.Millisecond*100 {
|
|
|
|
|
log.D.F("SLOW PING #%d: %v (>100ms)", pingCount, pingDuration)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pingCancel()
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
log.D.F("pinger context cancelled after %d pings", pingCount)
|
|
|
|
|
|