From bf7ca1da43154c975e7bf67691d8aeaa2be421ba Mon Sep 17 00:00:00 2001
From: mleku
Date: Thu, 11 Sep 2025 11:37:25 +0100
Subject: [PATCH] Improve logging consistency across the application, handle
 context cancellation during WebSocket writes, and introduce async ACL
 reconfiguration for admin events.

---
 app/handle-event.go                     |  14 ++-
 app/handle-message.go                   |   3 +-
 app/handle-req.go                       | 134 ++++++++++++++----------
 app/handle-websocket.go                 |  12 ++-
 app/listener.go                         |  10 +-
 app/main.go                             |   3 +
 app/publisher.go                        |  85 ++++++++-------
 pkg/acl/follows.go                      |   2 +-
 pkg/database/get-indexes-from-filter.go |   6 +-
 pkg/database/indexes/types/pubhash.go   |   7 +-
 pkg/database/save-event.go              |   6 ++
 pkg/encoders/event/canonical.go         |   3 +-
 pkg/encoders/kind/kind.go               |   2 +-
 13 files changed, 176 insertions(+), 111 deletions(-)

diff --git a/app/handle-event.go b/app/handle-event.go
index 653254a..0473f51 100644
--- a/app/handle-event.go
+++ b/app/handle-event.go
@@ -99,7 +99,7 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
 		return
 	default:
 		// user has write access or better, continue
-		log.D.F("user has %s access", accessLevel)
+		// log.D.F("user has %s access", accessLevel)
 	}
 	// if the event is a delete, process the delete
 	if env.E.Kind == kind.EventDeletion.K {
@@ -146,7 +146,8 @@
 	if err = Ok.Ok(l, env, ""); chk.E(err) {
 		return
 	}
-	defer l.publishers.Deliver(env.E)
+	// Deliver the event to subscribers immediately after sending OK response
+	l.publishers.Deliver(env.E)
 	log.D.F("saved event %0x", env.E.ID)
 	var isNewFromAdmin bool
 	for _, admin := range l.Admins {
@@ -156,11 +157,16 @@
 		}
 	}
 	if isNewFromAdmin {
+		log.I.F("new event from admin %0x", env.E.Pubkey)
 		// if a follow list was saved, reconfigure ACLs now that it is persisted
 		if env.E.Kind == kind.FollowList.K ||
 			env.E.Kind == kind.RelayListMetadata.K {
-			if err = acl.Registry.Configure(); chk.E(err) {
-			}
+			// Run ACL reconfiguration asynchronously to prevent blocking websocket operations
+			go func() {
+				if err := acl.Registry.Configure(); chk.E(err) {
+					log.E.F("failed to reconfigure ACL: %v", err)
+				}
+			}()
 		}
 	}
 	return
diff --git a/app/handle-message.go b/app/handle-message.go
index 628d1a1..9ae36da 100644
--- a/app/handle-message.go
+++ b/app/handle-message.go
@@ -9,10 +9,11 @@ import (
 	"encoders.orly/envelopes/reqenvelope"
 	"lol.mleku.dev/chk"
 	"lol.mleku.dev/errorf"
+	"lol.mleku.dev/log"
 )
 
 func (l *Listener) HandleMessage(msg []byte, remote string) {
-	// log.D.F("%s received message:\n%s", remote, msg)
+	log.D.F("%s received message:\n%s", remote, msg)
 	var err error
 	var t string
 	var rem []byte
diff --git a/app/handle-req.go b/app/handle-req.go
index 770ce76..0d7d364 100644
--- a/app/handle-req.go
+++ b/app/handle-req.go
@@ -2,6 +2,7 @@ package app
 
 import (
 	"errors"
+	"fmt"
 
 	acl "acl.orly"
 	"encoders.orly/envelopes/authenvelope"
@@ -25,7 +26,7 @@ import (
 )
 
 func (l *Listener) HandleReq(msg []byte) (err error) {
-	// log.T.F("HandleReq: from %s", l.remote)
+	log.T.F("HandleReq: from %s\n%s\n", l.remote, msg)
 	var rem []byte
 	env := reqenvelope.New()
 	if rem, err = env.Unmarshal(msg); chk.E(err) {
@@ -54,42 +55,58 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 		return
 	default:
 		// user has read access or better, continue
-		// log.D.F("user has %s access", accessLevel)
+		log.D.F("user has %s access", accessLevel)
 	}
 	var events event.S
 	for _, f := range *env.Filters {
-		// idsLen := 0; kindsLen := 0; authorsLen := 0; tagsLen := 0
-		// if f != nil {
-		// 	if f.Ids != nil { idsLen = f.Ids.Len() }
-		// 	if f.Kinds != nil { kindsLen = f.Kinds.Len() }
-		// 	if f.Authors != nil { authorsLen = f.Authors.Len() }
-		// 	if f.Tags != nil { tagsLen = f.Tags.Len() }
-		// }
-		// log.T.F("REQ %s: filter summary ids=%d kinds=%d authors=%d tags=%d", env.Subscription, idsLen, kindsLen, authorsLen, tagsLen)
-		// if f != nil && f.Authors != nil && f.Authors.Len() > 0 {
-		// 	var authors []string
-		// 	for _, a := range f.Authors.T { authors = append(authors, hex.Enc(a)) }
-		// 	log.T.F("REQ %s: authors=%v", env.Subscription, authors)
-		// }
-		// if f != nil && f.Kinds != nil && f.Kinds.Len() > 0 {
-		// 	log.T.F("REQ %s: kinds=%v", env.Subscription, f.Kinds.ToUint16())
-		// }
-		// if f != nil && f.Ids != nil && f.Ids.Len() > 0 {
-		// 	var ids []string
-		// 	for _, id := range f.Ids.T {
-		// 		ids = append(ids, hex.Enc(id))
-		// 	}
-		// 	var lim any
-		// 	if pointers.Present(f.Limit) {
-		// 		lim = *f.Limit
-		// 	} else {
-		// 		lim = nil
-		// 	}
-		// 	log.T.F(
-		// 		"REQ %s: ids filter count=%d ids=%v limit=%v", env.Subscription,
-		// 		f.Ids.Len(), ids, lim,
-		// 	)
-		// }
+		idsLen := 0
+		kindsLen := 0
+		authorsLen := 0
+		tagsLen := 0
+		if f != nil {
+			if f.Ids != nil {
+				idsLen = f.Ids.Len()
+			}
+			if f.Kinds != nil {
+				kindsLen = f.Kinds.Len()
+			}
+			if f.Authors != nil {
+				authorsLen = f.Authors.Len()
+			}
+			if f.Tags != nil {
+				tagsLen = f.Tags.Len()
+			}
+		}
+		log.T.F(
+			"REQ %s: filter summary ids=%d kinds=%d authors=%d tags=%d",
+			env.Subscription, idsLen, kindsLen, authorsLen, tagsLen,
+		)
+		if f != nil && f.Authors != nil && f.Authors.Len() > 0 {
+			var authors []string
+			for _, a := range f.Authors.T {
+				authors = append(authors, hex.Enc(a))
+			}
+			log.T.F("REQ %s: authors=%v", env.Subscription, authors)
+		}
+		if f != nil && f.Kinds != nil && f.Kinds.Len() > 0 {
+			log.T.F("REQ %s: kinds=%v", env.Subscription, f.Kinds.ToUint16())
+		}
+		if f != nil && f.Ids != nil && f.Ids.Len() > 0 {
+			var ids []string
+			for _, id := range f.Ids.T {
+				ids = append(ids, hex.Enc(id))
+			}
+			var lim any
+			if pointers.Present(f.Limit) {
+				lim = *f.Limit
+			} else {
+				lim = nil
+			}
+			log.T.F(
+				"REQ %s: ids filter count=%d ids=%v limit=%v", env.Subscription,
+				f.Ids.Len(), ids, lim,
+			)
+		}
 		if pointers.Present(f.Limit) {
 			if *f.Limit == 0 {
 				continue
@@ -107,16 +124,16 @@ privCheck:
 	for _, ev := range events {
 		if kind.IsPrivileged(ev.Kind) && accessLevel != "admin" {
 			// admins can see all events
-			// log.I.F("checking privileged event %s", ev.ID)
+			log.I.F("checking privileged event %s", ev.ID)
 			pk := l.authedPubkey.Load()
 			if pk == nil {
 				continue
 			}
 			if utils.FastEqual(ev.Pubkey, pk) {
-				// log.I.F(
-				// 	"privileged event %s is for logged in pubkey %0x", ev.ID,
-				// 	pk,
-				// )
+				log.I.F(
+					"privileged event %s is for logged in pubkey %0x", ev.ID,
+					pk,
+				)
 				tmp = append(tmp, ev)
 				continue
 			}
@@ -127,10 +144,10 @@
 					continue
 				}
 				if utils.FastEqual(pt, pk) {
-					// log.I.F(
-					// 	"privileged event %s is for logged in pubkey %0x",
-					// 	ev.ID, pk,
-					// )
+					log.I.F(
+						"privileged event %s is for logged in pubkey %0x",
+						ev.ID, pk,
+					)
 					tmp = append(tmp, ev)
 					continue privCheck
 				}
@@ -146,10 +163,15 @@
 	events = tmp
 	seen := make(map[string]struct{})
 	for _, ev := range events {
-		// log.T.F(
-		// 	"REQ %s: sending EVENT id=%s kind=%d", env.Subscription,
-		// 	hex.Enc(ev.ID), ev.Kind,
-		// )
+		log.T.F(
+			"REQ %s: sending EVENT id=%s kind=%d", env.Subscription,
+			hex.Enc(ev.ID), ev.Kind,
+		)
+		log.T.C(
+			func() string {
+				return fmt.Sprintf("event:\n%s\n", ev.Serialize())
+			},
+		)
 		var res *eventenvelope.Result
 		if res, err = eventenvelope.NewResultWith(
 			env.Subscription, ev,
@@ -164,7 +186,7 @@
 	}
 	// write the EOSE to signal to the client that all events found have been
 	// sent.
-	// log.T.F("sending EOSE to %s", l.remote)
+	log.T.F("sending EOSE to %s", l.remote)
 	if err = eoseenvelope.NewFrom(env.Subscription).
 		Write(l); chk.E(err) {
 		return
@@ -172,10 +194,10 @@
 	// if the query was for just Ids, we know there can't be any more results,
 	// so cancel the subscription.
 	cancel := true
-	// log.T.F(
-	// 	"REQ %s: computing cancel/subscription; events_sent=%d",
-	// 	env.Subscription, len(events),
-	// )
+	log.T.F(
+		"REQ %s: computing cancel/subscription; events_sent=%d",
+		env.Subscription, len(events),
+	)
 	var subbedFilters filter.S
 	for _, f := range *env.Filters {
 		if f.Ids.Len() < 1 {
@@ -190,10 +212,10 @@
 			}
 			notFounds = append(notFounds, id)
 		}
-		// log.T.F(
-		// 	"REQ %s: ids outstanding=%d of %d", env.Subscription,
-		// 	len(notFounds), f.Ids.Len(),
-		// )
+		log.T.F(
+			"REQ %s: ids outstanding=%d of %d", env.Subscription,
+			len(notFounds), f.Ids.Len(),
+		)
 		// if all were found, don't add to subbedFilters
 		if len(notFounds) == 0 {
 			continue
diff --git a/app/handle-websocket.go b/app/handle-websocket.go
index 097e557..dba2cc8 100644
--- a/app/handle-websocket.go
+++ b/app/handle-websocket.go
@@ -95,13 +95,20 @@ whitelist:
 		}
 		var typ websocket.MessageType
 		var msg []byte
-		// log.T.F("waiting for message from %s", remote)
-		if typ, msg, err = conn.Read(ctx); chk.E(err) {
+		log.T.F("waiting for message from %s", remote)
+		if typ, msg, err = conn.Read(ctx); err != nil {
 			if strings.Contains(
 				err.Error(), "use of closed network connection",
 			) {
 				return
 			}
+			// Handle EOF errors gracefully - these occur when client closes connection
+			// or sends incomplete/malformed WebSocket frames
+			if strings.Contains(err.Error(), "EOF") ||
+				strings.Contains(err.Error(), "failed to read frame header") {
+				log.T.F("connection from %s closed: %v", remote, err)
+				return
+			}
 			status := websocket.CloseStatus(err)
 			switch status {
 			case websocket.StatusNormalClosure,
				websocket.StatusGoingAway,
				websocket.StatusNoStatusRcvd,
 				websocket.StatusAbnormalClosure,
 				websocket.StatusProtocolError:
+				log.T.F("connection from %s closed with status: %v", remote, status)
 			default:
 				log.E.F("unexpected close error from %s: %v", remote, err)
 			}
diff --git a/app/listener.go b/app/listener.go
index 67024f9..264b091 100644
--- a/app/listener.go
+++ b/app/listener.go
@@ -3,12 +3,15 @@ package app
 import (
 	"context"
 	"net/http"
+	"time"
 
 	"github.com/coder/websocket"
 	"lol.mleku.dev/chk"
 	"utils.orly/atomic"
 )
 
+const WriteTimeout = 10 * time.Second
+
 type Listener struct {
 	*Server
 	conn *websocket.Conn
@@ -20,7 +23,12 @@ type Listener struct {
 }
 
 func (l *Listener) Write(p []byte) (n int, err error) {
-	if err = l.conn.Write(l.ctx, websocket.MessageText, p); chk.E(err) {
+	// Use a separate context with timeout for writes to prevent race conditions
+	// where the main connection context gets cancelled while writing events
+	writeCtx, cancel := context.WithTimeout(context.Background(), WriteTimeout)
+	defer cancel()
+
+	if err = l.conn.Write(writeCtx, websocket.MessageText, p); chk.E(err) {
 		return
 	}
 	n = len(p)
diff --git a/app/main.go b/app/main.go
index 3343594..884fba8 100644
--- a/app/main.go
+++ b/app/main.go
@@ -28,6 +28,9 @@ func Run(
 	var err error
 	var adminKeys [][]byte
 	for _, admin := range cfg.Admins {
+		if len(admin) == 0 {
+			continue
+		}
 		var pk []byte
 		if pk, err = bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(err) {
 			continue
diff --git a/app/publisher.go b/app/publisher.go
index 8cdf669..d73b1b2 100644
--- a/app/publisher.go
+++ b/app/publisher.go
@@ -101,17 +101,17 @@ func (p *P) Receive(msg typer.T) {
 	if m.Cancel {
 		if m.Id == "" {
 			p.removeSubscriber(m.Conn)
-			// log.D.F("removed listener %s", m.remote)
+			log.D.F("removed listener %s", m.remote)
 		} else {
 			p.removeSubscriberId(m.Conn, m.Id)
-			// log.D.C(
-			// 	func() string {
-			// 		return fmt.Sprintf(
-			// 			"removed subscription %s for %s", m.Id,
-			// 			m.remote,
-			// 		)
-			// 	},
-			// )
+			log.D.C(
+				func() string {
+					return fmt.Sprintf(
+						"removed subscription %s for %s", m.Id,
+						m.remote,
+					)
+				},
+			)
 		}
 		return
 	}
@@ -123,27 +123,27 @@
 				S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
 			}
 			p.Map[m.Conn] = subs
-			// log.D.C(
-			// 	func() string {
-			// 		return fmt.Sprintf(
-			// 			"created new subscription for %s, %s",
-			// 			m.remote,
-			// 			m.Filters.Marshal(nil),
-			// 		)
-			// 	},
-			// )
+			log.D.C(
+				func() string {
+					return fmt.Sprintf(
+						"created new subscription for %s, %s",
+						m.remote,
+						m.Filters.Marshal(nil),
+					)
+				},
+			)
 		} else {
 			subs[m.Id] = Subscription{
 				S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
 			}
-			// log.D.C(
-			// 	func() string {
-			// 		return fmt.Sprintf(
-			// 			"added subscription %s for %s", m.Id,
-			// 			m.remote,
-			// 		)
-			// 	},
-			// )
+			log.D.C(
+				func() string {
+					return fmt.Sprintf(
+						"added subscription %s for %s", m.Id,
+						m.remote,
+					)
+				},
+			)
 		}
 	}
 }
@@ -179,14 +179,16 @@ func (p *P) Deliver(ev *event.E) {
 		}
 	}
 	p.Mx.RUnlock()
-	log.D.C(
-		func() string {
-			return fmt.Sprintf(
-				"delivering event %0x to websocket subscribers %d", ev.ID,
-				len(deliveries),
-			)
-		},
-	)
+	if len(deliveries) > 0 {
+		log.D.C(
+			func() string {
+				return fmt.Sprintf(
+					"delivering event %0x to websocket subscribers %d", ev.ID,
+					len(deliveries),
+				)
+			},
+		)
+	}
 	for _, d := range deliveries {
 		// If the event is privileged, enforce that the subscriber's authed pubkey matches
 		// either the event pubkey or appears in any 'p' tag of the event.
@@ -218,8 +220,15 @@
 		if res, err = eventenvelope.NewResultWith(d.id, ev); chk.E(err) {
 			continue
 		}
+		// Use a separate context with timeout for writes to prevent race conditions
+		// where the publisher context gets cancelled while writing events
+		writeCtx, cancel := context.WithTimeout(
+			context.Background(), WriteTimeout,
+		)
+		defer cancel()
+
 		if err = d.w.Write(
-			p.c, websocket.MessageText, res.Marshal(nil),
+			writeCtx, websocket.MessageText, res.Marshal(nil),
 		); chk.E(err) {
 			// On error, remove the subscriber connection safely
 			p.removeSubscriber(d.w)
@@ -245,9 +254,9 @@ func (p *P) removeSubscriberId(ws *websocket.Conn, id string) {
 	var subs map[string]Subscription
 	var ok bool
 	if subs, ok = p.Map[ws]; ok {
-		delete(p.Map[ws], id)
-		_ = subs
-		if len(subs) == 0 {
+		delete(subs, id)
+		// Check the actual map after deletion, not the original reference
+		if len(p.Map[ws]) == 0 {
 			delete(p.Map, ws)
 		}
 	}
diff --git a/pkg/acl/follows.go b/pkg/acl/follows.go
index ba5169a..b8e9f70 100644
--- a/pkg/acl/follows.go
+++ b/pkg/acl/follows.go
@@ -282,7 +282,7 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
 				ctx, res.Event,
 			); err != nil {
 				if !strings.HasPrefix(
-					err.Error(), "event already exists",
+					err.Error(), "blocked:",
 				) {
 					log.W.F(
 						"follows syncer: save event failed: %v",
diff --git a/pkg/database/get-indexes-from-filter.go b/pkg/database/get-indexes-from-filter.go
index 4811a97..2e6e2a4 100644
--- a/pkg/database/get-indexes-from-filter.go
+++ b/pkg/database/get-indexes-from-filter.go
@@ -9,6 +9,7 @@ import (
 	types2 "database.orly/indexes/types"
 	"encoders.orly/filter"
 	"lol.mleku.dev/chk"
+	"lol.mleku.dev/log"
 )
 
 type Range struct {
@@ -95,16 +96,13 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
 			return
 		}
 		b := buf.Bytes()
-		// Create range that will match any serial value with this ID prefix
 		end := make([]byte, len(b))
 		copy(end, b)
-		// Fill the end range with 0xff bytes to match all possible serial values
 		for i := 0; i < 5; i++ {
 			end = append(end, 0xff)
 		}
-
 		r := Range{b, end}
 		idxs = append(idxs, r)
 		return
@@ -241,6 +239,7 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
 		for _, t := range *f.Tags {
 			if t.Len() >= 2 && (len(t.Key()) == 1 || (len(t.Key()) == 2 && t.Key()[0] == '#')) {
 				var p *types2.PubHash
+				log.I.S(author)
 				if p, err = CreatePubHashFromData(author); chk.E(err) {
 					return
 				}
@@ -363,6 +362,7 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
 	if f.Authors != nil && f.Authors.Len() > 0 {
 		for _, author := range f.Authors.T {
 			var p *types2.PubHash
+			log.I.S(author)
 			if p, err = CreatePubHashFromData(author); chk.E(err) {
 				return
 			}
diff --git a/pkg/database/indexes/types/pubhash.go b/pkg/database/indexes/types/pubhash.go
index 6d47871..1aacda9 100644
--- a/pkg/database/indexes/types/pubhash.go
+++ b/pkg/database/indexes/types/pubhash.go
@@ -15,10 +15,13 @@ const PubHashLen = 8
 type PubHash struct{ val [PubHashLen]byte }
 
 func (ph *PubHash) FromPubkey(pk []byte) (err error) {
+	if len(pk) == 0 {
+		panic("nil pubkey")
+	}
 	if len(pk) != schnorr.PubKeyBytesLen {
 		err = errorf.E(
-			"invalid Pubkey length, got %d require %d",
-			len(pk), schnorr.PubKeyBytesLen,
+			"invalid Pubkey length, got %d require %d %0x",
+			len(pk), schnorr.PubKeyBytesLen, pk,
 		)
 		return
 	}
diff --git a/pkg/database/save-event.go b/pkg/database/save-event.go
index 6744d2c..548075c 100644
--- a/pkg/database/save-event.go
+++ b/pkg/database/save-event.go
@@ -3,6 +3,7 @@ package database
 import (
 	"bytes"
 	"context"
+	"fmt"
 	"strings"
 
 	"database.orly/indexes"
@@ -235,5 +236,10 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
 		"total data written: %d bytes keys %d bytes values for event ID %s",
 		kc, vc, hex.Enc(ev.ID),
 	)
+	log.T.C(
+		func() string {
+			return fmt.Sprintf("event:\n%s\n", ev.Serialize())
+		},
+	)
 	return
 }
diff --git a/pkg/encoders/event/canonical.go b/pkg/encoders/event/canonical.go
index 598d898..238d3d3 100644
--- a/pkg/encoders/event/canonical.go
+++ b/pkg/encoders/event/canonical.go
@@ -5,7 +5,6 @@ import (
 	"encoders.orly/hex"
 	"encoders.orly/ints"
 	"encoders.orly/text"
-	"lol.mleku.dev/log"
 )
 
 // ToCanonical converts the event to the canonical encoding used to derive the
@@ -23,7 +22,7 @@ func (ev *E) ToCanonical(dst []byte) (b []byte) {
 	b = append(b, ',')
 	b = text.AppendQuote(b, ev.Content, text.NostrEscape)
 	b = append(b, ']')
-	log.D.F("canonical: %s", b)
+	// log.D.F("canonical: %s", b)
 	return
 }
diff --git a/pkg/encoders/kind/kind.go b/pkg/encoders/kind/kind.go
index d456003..efe3f15 100644
--- a/pkg/encoders/kind/kind.go
+++ b/pkg/encoders/kind/kind.go
@@ -333,7 +333,7 @@ var (
 	CommunityDefinition = &K{34550}
 	ACLEvent = &K{39998}
 	// ParameterizedReplaceableEnd is an event type that...
-	ParameterizedReplaceableEnd = &K{39999}
+	ParameterizedReplaceableEnd = &K{40000}
 )
 
 var MapMx sync.RWMutex