Introduce Ctx() for proper context management in Listener and replace direct context usage in HandleDelete with Ctx().

also introduce a 3 second timeout for websocket read failure
This commit is contained in:
2025-09-11 12:34:01 +01:00
parent bf7ca1da43
commit e2b7152221
6 changed files with 15022 additions and 13 deletions

View File

@@ -107,7 +107,7 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) (err error) {
string(at.DTag), ev.CreatedAt, env.E.CreatedAt, string(at.DTag), ev.CreatedAt, env.E.CreatedAt,
) )
if err = l.DeleteEventBySerial( if err = l.DeleteEventBySerial(
l.Ctx, s, ev, l.Ctx(), s, ev,
); chk.E(err) { ); chk.E(err) {
continue continue
} }
@@ -165,7 +165,7 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) (err error) {
"HandleDelete: deleting event %s by authorized user %s", "HandleDelete: deleting event %s by authorized user %s",
hex.Enc(ev.ID), hex.Enc(env.E.Pubkey), hex.Enc(ev.ID), hex.Enc(env.E.Pubkey),
) )
if err = l.DeleteEventBySerial(l.Ctx, s, ev); chk.E(err) { if err = l.DeleteEventBySerial(l.Ctx(), s, ev); chk.E(err) {
continue continue
} }
} }

View File

@@ -1,8 +1,10 @@
package app package app
import ( import (
"context"
"fmt" "fmt"
"strings" "strings"
"time"
acl "acl.orly" acl "acl.orly"
"encoders.orly/envelopes/authenvelope" "encoders.orly/envelopes/authenvelope"
@@ -127,9 +129,11 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
} }
} }
} }
// store the event // store the event - use a separate context to prevent cancellation issues
saveCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// log.I.F("saving event %0x, %s", env.E.ID, env.E.Serialize()) // log.I.F("saving event %0x, %s", env.E.ID, env.E.Serialize())
if _, _, err = l.SaveEvent(l.Ctx, env.E); err != nil { if _, _, err = l.SaveEvent(saveCtx, env.E); err != nil {
if strings.HasPrefix(err.Error(), "blocked:") { if strings.HasPrefix(err.Error(), "blocked:") {
errStr := err.Error()[len("blocked: "):len(err.Error())] errStr := err.Error()[len("blocked: "):len(err.Error())]
if err = Ok.Error( if err = Ok.Error(

View File

@@ -1,8 +1,10 @@
package app package app
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"time"
acl "acl.orly" acl "acl.orly"
"encoders.orly/envelopes/authenvelope" "encoders.orly/envelopes/authenvelope"
@@ -26,7 +28,7 @@ import (
) )
func (l *Listener) HandleReq(msg []byte) (err error) { func (l *Listener) HandleReq(msg []byte) (err error) {
log.T.F("HandleReq: from %s\n%s\n", l.remote, msg) log.T.F("HandleReq: START processing from %s\n%s\n", l.remote, msg)
var rem []byte var rem []byte
env := reqenvelope.New() env := reqenvelope.New()
if rem, err = env.Unmarshal(msg); chk.E(err) { if rem, err = env.Unmarshal(msg); chk.E(err) {
@@ -112,12 +114,18 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
continue continue
} }
} }
if events, err = l.QueryEvents(l.Ctx, f); chk.E(err) { // Use a separate context for QueryEvents to prevent cancellation issues
if errors.Is(err, badger.ErrDBClosed) { queryCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
return defer cancel()
} log.T.F("HandleReq: About to QueryEvents for %s, main context done: %v", l.remote, l.ctx.Err() != nil)
err = nil if events, err = l.QueryEvents(queryCtx, f); chk.E(err) {
} if errors.Is(err, badger.ErrDBClosed) {
return
}
log.T.F("HandleReq: QueryEvents error for %s: %v", l.remote, err)
err = nil
}
log.T.F("HandleReq: QueryEvents completed for %s, found %d events", l.remote, len(events))
} }
var tmp event.S var tmp event.S
privCheck: privCheck:
@@ -252,5 +260,6 @@ privCheck:
return return
} }
} }
log.T.F("HandleReq: COMPLETED processing from %s", l.remote)
return return
} }

View File

@@ -19,6 +19,7 @@ const (
DefaultWriteWait = 10 * time.Second DefaultWriteWait = 10 * time.Second
DefaultPongWait = 60 * time.Second DefaultPongWait = 60 * time.Second
DefaultPingWait = DefaultPongWait / 2 DefaultPingWait = DefaultPongWait / 2
DefaultReadTimeout = 3 * time.Second // Read timeout to detect stalled connections
DefaultMaxMessageSize = 1 * units.Mb DefaultMaxMessageSize = 1 * units.Mb
// CloseMessage denotes a close control message. The optional message // CloseMessage denotes a close control message. The optional message
@@ -96,12 +97,26 @@ whitelist:
var typ websocket.MessageType var typ websocket.MessageType
var msg []byte var msg []byte
log.T.F("waiting for message from %s", remote) log.T.F("waiting for message from %s", remote)
if typ, msg, err = conn.Read(ctx); err != nil {
// Create a read context with timeout to prevent indefinite blocking
readCtx, readCancel := context.WithTimeout(ctx, DefaultReadTimeout)
typ, msg, err = conn.Read(readCtx)
readCancel()
if err != nil {
if strings.Contains( if strings.Contains(
err.Error(), "use of closed network connection", err.Error(), "use of closed network connection",
) { ) {
return return
} }
// Handle timeout errors - occurs when client becomes unresponsive
if strings.Contains(err.Error(), "context deadline exceeded") {
log.T.F(
"connection from %s timed out after %v", remote,
DefaultReadTimeout,
)
return
}
// Handle EOF errors gracefully - these occur when client closes connection // Handle EOF errors gracefully - these occur when client closes connection
// or sends incomplete/malformed WebSocket frames // or sends incomplete/malformed WebSocket frames
if strings.Contains(err.Error(), "EOF") || if strings.Contains(err.Error(), "EOF") ||
@@ -116,7 +131,9 @@ whitelist:
websocket.StatusNoStatusRcvd, websocket.StatusNoStatusRcvd,
websocket.StatusAbnormalClosure, websocket.StatusAbnormalClosure,
websocket.StatusProtocolError: websocket.StatusProtocolError:
log.T.F("connection from %s closed with status: %v", remote, status) log.T.F(
"connection from %s closed with status: %v", remote, status,
)
default: default:
log.E.F("unexpected close error from %s: %v", remote, err) log.E.F("unexpected close error from %s: %v", remote, err)
} }
@@ -128,6 +145,7 @@ whitelist:
} }
continue continue
} }
log.T.F("received message from %s: %s", remote, string(msg))
go listener.HandleMessage(msg, remote) go listener.HandleMessage(msg, remote)
} }
} }

View File

@@ -22,6 +22,12 @@ type Listener struct {
authedPubkey atomic.Bytes authedPubkey atomic.Bytes
} }
// Ctx returns the listener's context, but creates a new context for each operation
// to prevent cancellation from affecting subsequent operations
func (l *Listener) Ctx() context.Context {
return l.ctx
}
func (l *Listener) Write(p []byte) (n int, err error) { func (l *Listener) Write(p []byte) (n int, err error) {
// Use a separate context with timeout for writes to prevent race conditions // Use a separate context with timeout for writes to prevent race conditions
// where the main connection context gets cancelled while writing events // where the main connection context gets cancelled while writing events

14972
stacktrace.txt Normal file

File diff suppressed because it is too large Load Diff