Improve logging and handling for WebSocket message processing, delivery, and diagnostics.
Some checks failed
Go / build (push) Has been cancelled

- Enhanced logging for WebSocket writes, message handling, and delivery timing.
- Added diagnostics for slow deliveries, failures, and context timeouts.
- Incorporated extensive error handling for malformed messages and client notifications.
- Enabled command results and refined subscription management.
- Introduced detailed connection state tracking and metrics for messages, requests, and events.
- Added new `run-market-probe.sh` script for relay testing and Market seeding.
This commit is contained in:
2025-10-01 08:27:22 +01:00
parent f6054f3c37
commit 48c7fab795
13 changed files with 900 additions and 96 deletions

View File

@@ -14,39 +14,65 @@ import (
)
func (l *Listener) HandleMessage(msg []byte, remote string) {
// log.D.F("%s received message:\n%s", remote, msg)
msgPreview := string(msg)
if len(msgPreview) > 150 {
msgPreview = msgPreview[:150] + "..."
}
log.D.F("%s processing message (len=%d): %s", remote, len(msg), msgPreview)
l.msgCount++
var err error
var t string
var rem []byte
if t, rem, err = envelopes.Identify(msg); !chk.E(err) {
switch t {
case eventenvelope.L:
// log.D.F("eventenvelope: %s %s", remote, rem)
err = l.HandleEvent(rem)
case reqenvelope.L:
// log.D.F("reqenvelope: %s %s", remote, rem)
err = l.HandleReq(rem)
case closeenvelope.L:
// log.D.F("closeenvelope: %s %s", remote, rem)
err = l.HandleClose(rem)
case authenvelope.L:
// log.D.F("authenvelope: %s %s", remote, rem)
err = l.HandleAuth(rem)
default:
err = fmt.Errorf("unknown envelope type %s\n%s", t, rem)
// Attempt to identify the envelope type
if t, rem, err = envelopes.Identify(msg); err != nil {
log.E.F("%s envelope identification FAILED (len=%d): %v", remote, len(msg), err)
log.D.F("%s malformed message content: %q", remote, msgPreview)
chk.E(err)
// Send error notice to client
if noticeErr := noticeenvelope.NewFrom("malformed message: " + err.Error()).Write(l); noticeErr != nil {
log.E.F("%s failed to send malformed message notice: %v", remote, noticeErr)
}
return
}
log.D.F("%s identified envelope type: %s (payload_len=%d)", remote, t, len(rem))
// Process the identified envelope type
switch t {
case eventenvelope.L:
log.D.F("%s processing EVENT envelope", remote)
l.eventCount++
err = l.HandleEvent(rem)
case reqenvelope.L:
log.D.F("%s processing REQ envelope", remote)
l.reqCount++
err = l.HandleReq(rem)
case closeenvelope.L:
log.D.F("%s processing CLOSE envelope", remote)
err = l.HandleClose(rem)
case authenvelope.L:
log.D.F("%s processing AUTH envelope", remote)
err = l.HandleAuth(rem)
default:
err = fmt.Errorf("unknown envelope type %s", t)
log.E.F("%s unknown envelope type: %s (payload: %q)", remote, t, string(rem))
}
// Handle any processing errors
if err != nil {
log.D.C(
func() string {
return fmt.Sprintf(
"notice->%s %s", remote, err,
)
},
)
if err = noticeenvelope.NewFrom(err.Error()).Write(l); err != nil {
log.E.F("%s message processing FAILED (type=%s): %v", remote, t, err)
log.D.F("%s error context - original message: %q", remote, msgPreview)
// Send error notice to client
noticeMsg := fmt.Sprintf("%s: %s", t, err.Error())
if noticeErr := noticeenvelope.NewFrom(noticeMsg).Write(l); noticeErr != nil {
log.E.F("%s failed to send error notice after %s processing failure: %v", remote, t, noticeErr)
return
}
log.D.F("%s sent error notice for %s processing failure", remote, t)
} else {
log.D.F("%s message processing SUCCESS (type=%s)", remote, t)
}
}

View File

@@ -41,7 +41,7 @@ func (s *Server) HandleRelayInfo(w http.ResponseWriter, r *http.Request) {
relayinfo.GenericTagQueries,
// relayinfo.NostrMarketplace,
relayinfo.EventTreatment,
// relayinfo.CommandResults,
relayinfo.CommandResults,
relayinfo.ParameterizedReplaceableEvents,
relayinfo.ExpirationTimestamp,
relayinfo.ProtectedEvents,
@@ -57,7 +57,7 @@ func (s *Server) HandleRelayInfo(w http.ResponseWriter, r *http.Request) {
relayinfo.GenericTagQueries,
// relayinfo.NostrMarketplace,
relayinfo.EventTreatment,
// relayinfo.CommandResults,
relayinfo.CommandResults,
relayinfo.ParameterizedReplaceableEvents,
relayinfo.ExpirationTimestamp,
relayinfo.ProtectedEvents,

View File

@@ -16,7 +16,6 @@ import (
"next.orly.dev/pkg/encoders/envelopes/closedenvelope"
"next.orly.dev/pkg/encoders/envelopes/eoseenvelope"
"next.orly.dev/pkg/encoders/envelopes/eventenvelope"
"next.orly.dev/pkg/encoders/envelopes/okenvelope"
"next.orly.dev/pkg/encoders/envelopes/reqenvelope"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/filter"
@@ -30,12 +29,13 @@ import (
)
func (l *Listener) HandleReq(msg []byte) (err error) {
// log.T.F("HandleReq: START processing from %s\n%s\n", l.remote, msg)
log.D.F("HandleReq: START processing from %s", l.remote)
// var rem []byte
env := reqenvelope.New()
if _, err = env.Unmarshal(msg); chk.E(err) {
return normalize.Error.Errorf(err.Error())
}
log.D.C(func() string { return fmt.Sprintf("REQ sub=%s filters=%d", env.Subscription, len(*env.Filters)) })
// send a challenge to the client to auth if an ACL is active
if acl.Registry.Active.Load() != "none" {
if err = authenvelope.NewChallengeWith(l.challenge.Load()).
@@ -47,8 +47,9 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load(), l.remote)
switch accessLevel {
case "none":
if err = okenvelope.NewFrom(
env.Subscription, false,
// For REQ denial, send a CLOSED with auth-required reason (NIP-01)
if err = closedenvelope.NewFrom(
env.Subscription,
reason.AuthRequired.F("user not authed or has no read access"),
).Write(l); chk.E(err) {
return
@@ -58,35 +59,74 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
// user has read access or better, continue
}
var events event.S
// Create a single context for all filter queries, tied to the connection context, to prevent leaks and support timely cancellation
queryCtx, queryCancel := context.WithTimeout(
l.ctx, 30*time.Second,
)
defer queryCancel()
// Collect all events from all filters
var allEvents event.S
for _, f := range *env.Filters {
if f != nil && f.Authors != nil && f.Authors.Len() > 0 {
var authors []string
for _, a := range f.Authors.T {
authors = append(authors, hex.Enc(a))
if f != nil {
// Summarize filter details for diagnostics (avoid internal fields)
var kindsLen int
if f.Kinds != nil {
kindsLen = f.Kinds.Len()
}
var authorsLen int
if f.Authors != nil {
authorsLen = f.Authors.Len()
}
var idsLen int
if f.Ids != nil {
idsLen = f.Ids.Len()
}
var dtag string
if f.Tags != nil {
if d := f.Tags.GetFirst([]byte("d")); d != nil {
dtag = string(d.Value())
}
}
var lim any
if f.Limit != nil {
lim = *f.Limit
}
var since any
if f.Since != nil {
since = f.Since.Int()
}
var until any
if f.Until != nil {
until = f.Until.Int()
}
log.D.C(func() string {
return fmt.Sprintf("REQ %s filter: kinds.len=%d authors.len=%d ids.len=%d d=%q limit=%v since=%v until=%v", env.Subscription, kindsLen, authorsLen, idsLen, dtag, lim, since, until)
})
}
if f != nil && pointers.Present(f.Limit) {
if *f.Limit == 0 {
continue
}
}
// Use a separate context for QueryEvents to prevent cancellation issues
queryCtx, cancel := context.WithTimeout(
context.Background(), 30*time.Second,
)
defer cancel()
if events, err = l.QueryEvents(queryCtx, f); chk.E(err) {
var filterEvents event.S
if filterEvents, err = l.QueryEvents(queryCtx, f); chk.E(err) {
if errors.Is(err, badger.ErrDBClosed) {
return
}
log.E.F("QueryEvents failed for filter: %v", err)
err = nil
continue
}
defer func() {
for _, ev := range events {
ev.Free()
}
}()
// Append events from this filter to the overall collection
allEvents = append(allEvents, filterEvents...)
}
events = allEvents
defer func() {
for _, ev := range events {
ev.Free()
}
}()
var tmp event.S
privCheck:
for _, ev := range events {
@@ -216,7 +256,7 @@ privCheck:
}
// write the EOSE to signal to the client that all events found have been
// sent.
log.T.F("sending EOSE to %s", l.remote)
log.D.F("sending EOSE to %s", l.remote)
if err = eoseenvelope.NewFrom(env.Subscription).
Write(l); chk.E(err) {
return
@@ -224,7 +264,7 @@ privCheck:
// if the query was for just Ids, we know there can't be any more results,
// so cancel the subscription.
cancel := true
log.T.F(
log.D.F(
"REQ %s: computing cancel/subscription; events_sent=%d",
env.Subscription, len(events),
)
@@ -257,8 +297,8 @@ privCheck:
}
// also, if we received the limit number of events, subscription ded
if pointers.Present(f.Limit) {
if len(events) < int(*f.Limit) {
cancel = false
if len(events) >= int(*f.Limit) {
cancel = true
}
}
}
@@ -276,12 +316,8 @@ privCheck:
},
)
} else {
if err = closedenvelope.NewFrom(
env.Subscription, nil,
).Write(l); chk.E(err) {
return
}
// suppress server-sent CLOSED; client will close subscription if desired
}
log.T.F("HandleReq: COMPLETED processing from %s", l.remote)
log.D.F("HandleReq: COMPLETED processing from %s", l.remote)
return
}

View File

@@ -58,8 +58,10 @@ whitelist:
if conn, err = websocket.Accept(
w, r, &websocket.AcceptOptions{OriginPatterns: []string{"*"}},
); chk.E(err) {
log.E.F("websocket accept failed from %s: %v", remote, err)
return
}
log.T.F("websocket accepted from %s path=%s", remote, r.URL.String())
conn.SetReadLimit(DefaultMaxMessageSize)
defer conn.CloseNow()
listener := &Listener{
@@ -75,18 +77,38 @@ whitelist:
// If admins are configured, immediately prompt client to AUTH (NIP-42)
if len(s.Config.Admins) > 0 {
// log.D.F("sending initial AUTH challenge to %s", remote)
log.D.F("sending AUTH challenge to %s", remote)
if err = authenvelope.NewChallengeWith(listener.challenge.Load()).
Write(listener); chk.E(err) {
log.E.F("failed to send AUTH challenge to %s: %v", remote, err)
return
}
log.D.F("AUTH challenge sent successfully to %s", remote)
}
ticker := time.NewTicker(DefaultPingWait)
go s.Pinger(ctx, conn, ticker, cancel)
defer func() {
// log.D.F("closing websocket connection from %s", remote)
log.D.F("closing websocket connection from %s", remote)
// Cancel context and stop pinger
cancel()
ticker.Stop()
// Cancel all subscriptions for this connection
log.D.F("cancelling subscriptions for %s", remote)
listener.publishers.Receive(&W{Cancel: true})
// Log detailed connection statistics
log.D.F("ws connection closed %s: msgs=%d, REQs=%d, EVENTs=%d, duration=%v",
remote, listener.msgCount, listener.reqCount, listener.eventCount,
time.Since(time.Now())) // Note: This will be near-zero, would need start time tracked
// Log any remaining connection state
if listener.authedPubkey.Load() != nil {
log.D.F("ws connection %s was authenticated", remote)
} else {
log.D.F("ws connection %s was not authenticated", remote)
}
}()
for {
select {
@@ -130,14 +152,26 @@ whitelist:
return
}
if typ == PingMessage {
log.D.F("received PING from %s, sending PONG", remote)
// Create a write context with timeout for pong response
writeCtx, writeCancel := context.WithTimeout(
ctx, DefaultWriteTimeout,
)
pongStart := time.Now()
if err = conn.Write(writeCtx, PongMessage, msg); chk.E(err) {
pongDuration := time.Since(pongStart)
log.E.F("failed to send PONG to %s after %v: %v", remote, pongDuration, err)
if writeCtx.Err() != nil {
log.E.F("PONG write timeout to %s after %v (limit=%v)", remote, pongDuration, DefaultWriteTimeout)
}
writeCancel()
return
}
pongDuration := time.Since(pongStart)
log.D.F("sent PONG to %s successfully in %v", remote, pongDuration)
if pongDuration > time.Millisecond*50 {
log.D.F("SLOW PONG to %s: %v (>50ms)", remote, pongDuration)
}
writeCancel()
continue
}
@@ -151,21 +185,45 @@ func (s *Server) Pinger(
cancel context.CancelFunc,
) {
defer func() {
log.D.F("pinger shutting down")
cancel()
ticker.Stop()
}()
var err error
pingCount := 0
for {
select {
case <-ticker.C:
pingCount++
log.D.F("sending PING #%d", pingCount)
// Create a write context with timeout for ping operation
pingCtx, pingCancel := context.WithTimeout(ctx, DefaultWriteTimeout)
if err = conn.Ping(pingCtx); chk.E(err) {
pingStart := time.Now()
if err = conn.Ping(pingCtx); err != nil {
pingDuration := time.Since(pingStart)
log.E.F("PING #%d FAILED after %v: %v", pingCount, pingDuration, err)
if pingCtx.Err() != nil {
log.E.F("PING #%d timeout after %v (limit=%v)", pingCount, pingDuration, DefaultWriteTimeout)
}
chk.E(err)
pingCancel()
return
}
pingDuration := time.Since(pingStart)
log.D.F("PING #%d sent successfully in %v", pingCount, pingDuration)
if pingDuration > time.Millisecond*100 {
log.D.F("SLOW PING #%d: %v (>100ms)", pingCount, pingDuration)
}
pingCancel()
case <-ctx.Done():
log.D.F("pinger context cancelled after %d pings", pingCount)
return
}
}

View File

@@ -3,9 +3,11 @@ package app
import (
"context"
"net/http"
"time"
"github.com/coder/websocket"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/utils/atomic"
)
@@ -17,6 +19,10 @@ type Listener struct {
req *http.Request
challenge atomic.Bytes
authedPubkey atomic.Bytes
// Diagnostics: per-connection counters
msgCount int
reqCount int
eventCount int
}
// Ctx returns the listener's context, but creates a new context for each operation
@@ -26,6 +32,16 @@ func (l *Listener) Ctx() context.Context {
}
func (l *Listener) Write(p []byte) (n int, err error) {
start := time.Now()
msgLen := len(p)
// Log message attempt with content preview (first 200 chars for diagnostics)
preview := string(p)
if len(preview) > 200 {
preview = preview[:200] + "..."
}
log.D.F("ws->%s attempting write: len=%d preview=%q", l.remote, msgLen, preview)
// Use a separate context with timeout for writes to prevent race conditions
// where the main connection context gets cancelled while writing events
writeCtx, cancel := context.WithTimeout(
@@ -33,9 +49,42 @@ func (l *Listener) Write(p []byte) (n int, err error) {
)
defer cancel()
if err = l.conn.Write(writeCtx, websocket.MessageText, p); chk.E(err) {
// Attempt the write operation
writeStart := time.Now()
if err = l.conn.Write(writeCtx, websocket.MessageText, p); err != nil {
writeDuration := time.Since(writeStart)
totalDuration := time.Since(start)
// Log detailed failure information
log.E.F("ws->%s WRITE FAILED: len=%d duration=%v write_duration=%v error=%v preview=%q",
l.remote, msgLen, totalDuration, writeDuration, err, preview)
// Check if this is a context timeout
if writeCtx.Err() != nil {
log.E.F("ws->%s write timeout after %v (limit=%v)", l.remote, writeDuration, DefaultWriteTimeout)
}
// Check connection state
if l.conn != nil {
log.D.F("ws->%s connection state during failure: remote_addr=%v", l.remote, l.req.RemoteAddr)
}
chk.E(err) // Still call the original error handler
return
}
n = len(p)
// Log successful write with timing
writeDuration := time.Since(writeStart)
totalDuration := time.Since(start)
n = msgLen
log.D.F("ws->%s WRITE SUCCESS: len=%d duration=%v write_duration=%v",
l.remote, n, totalDuration, writeDuration)
// Log slow writes for performance diagnostics
if writeDuration > time.Millisecond*100 {
log.D.F("ws->%s SLOW WRITE detected: %v (>100ms) len=%d", l.remote, writeDuration, n)
}
return
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"
"github.com/coder/websocket"
"lol.mleku.dev/chk"
@@ -210,39 +211,68 @@ func (p *P) Deliver(ev *event.E) {
break
}
}
}
if !allowed {
// Skip delivery for this subscriber
continue
}
}
var res *eventenvelope.Result
if res, err = eventenvelope.NewResultWith(d.id, ev); chk.E(err) {
continue
}
// Use a separate context with timeout for writes to prevent race conditions
// where the publisher context gets cancelled while writing events
writeCtx, cancel := context.WithTimeout(
context.Background(), DefaultWriteTimeout,
)
defer cancel()
}
if !allowed {
log.D.F("subscription delivery DENIED for privileged event %s to %s (auth mismatch)",
hex.Enc(ev.ID), d.sub.remote)
// Skip delivery for this subscriber
continue
}
}
var res *eventenvelope.Result
if res, err = eventenvelope.NewResultWith(d.id, ev); chk.E(err) {
log.E.F("failed to create event envelope for %s to %s: %v",
hex.Enc(ev.ID), d.sub.remote, err)
continue
}
// Log delivery attempt
msgData := res.Marshal(nil)
log.D.F("attempting delivery of event %s (kind=%d, len=%d) to subscription %s @ %s",
hex.Enc(ev.ID), ev.Kind, len(msgData), d.id, d.sub.remote)
// Use a separate context with timeout for writes to prevent race conditions
// where the publisher context gets cancelled while writing events
writeCtx, cancel := context.WithTimeout(
context.Background(), DefaultWriteTimeout,
)
defer cancel()
if err = d.w.Write(
writeCtx, websocket.MessageText, res.Marshal(nil),
); err != nil {
// On error, remove the subscriber connection safely
p.removeSubscriber(d.w)
_ = d.w.CloseNow()
continue
}
log.D.C(
func() string {
return fmt.Sprintf(
"dispatched event %0x to subscription %s, %s",
ev.ID, d.id, d.sub.remote,
)
},
)
deliveryStart := time.Now()
if err = d.w.Write(
writeCtx, websocket.MessageText, msgData,
); err != nil {
deliveryDuration := time.Since(deliveryStart)
// Log detailed failure information
log.E.F("subscription delivery FAILED: event=%s to=%s sub=%s duration=%v error=%v",
hex.Enc(ev.ID), d.sub.remote, d.id, deliveryDuration, err)
// Check for timeout specifically
if writeCtx.Err() != nil {
log.E.F("subscription delivery TIMEOUT: event=%s to=%s after %v (limit=%v)",
hex.Enc(ev.ID), d.sub.remote, deliveryDuration, DefaultWriteTimeout)
}
// Log connection cleanup
log.D.F("removing failed subscriber connection: %s", d.sub.remote)
// On error, remove the subscriber connection safely
p.removeSubscriber(d.w)
_ = d.w.CloseNow()
continue
}
deliveryDuration := time.Since(deliveryStart)
log.D.F("subscription delivery SUCCESS: event=%s to=%s sub=%s duration=%v len=%d",
hex.Enc(ev.ID), d.sub.remote, d.id, deliveryDuration, len(msgData))
// Log slow deliveries for performance monitoring
if deliveryDuration > time.Millisecond*50 {
log.D.F("SLOW subscription delivery: event=%s to=%s duration=%v (>50ms)",
hex.Enc(ev.ID), d.sub.remote, deliveryDuration)
}
}
}