Improve logging consistency across the application, handle context cancellation during WebSocket writes, and introduce async ACL reconfiguration for admin events.
This commit is contained in:
@@ -99,7 +99,7 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
|
||||
return
|
||||
default:
|
||||
// user has write access or better, continue
|
||||
log.D.F("user has %s access", accessLevel)
|
||||
// log.D.F("user has %s access", accessLevel)
|
||||
}
|
||||
// if the event is a delete, process the delete
|
||||
if env.E.Kind == kind.EventDeletion.K {
|
||||
@@ -146,7 +146,8 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
|
||||
if err = Ok.Ok(l, env, ""); chk.E(err) {
|
||||
return
|
||||
}
|
||||
defer l.publishers.Deliver(env.E)
|
||||
// Deliver the event to subscribers immediately after sending OK response
|
||||
l.publishers.Deliver(env.E)
|
||||
log.D.F("saved event %0x", env.E.ID)
|
||||
var isNewFromAdmin bool
|
||||
for _, admin := range l.Admins {
|
||||
@@ -156,11 +157,16 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
|
||||
}
|
||||
}
|
||||
if isNewFromAdmin {
|
||||
log.I.F("new event from admin %0x", env.E.Pubkey)
|
||||
// if a follow list was saved, reconfigure ACLs now that it is persisted
|
||||
if env.E.Kind == kind.FollowList.K ||
|
||||
env.E.Kind == kind.RelayListMetadata.K {
|
||||
if err = acl.Registry.Configure(); chk.E(err) {
|
||||
}
|
||||
// Run ACL reconfiguration asynchronously to prevent blocking websocket operations
|
||||
go func() {
|
||||
if err := acl.Registry.Configure(); chk.E(err) {
|
||||
log.E.F("failed to reconfigure ACL: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
return
|
||||
|
||||
@@ -9,10 +9,11 @@ import (
|
||||
"encoders.orly/envelopes/reqenvelope"
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/errorf"
|
||||
"lol.mleku.dev/log"
|
||||
)
|
||||
|
||||
func (l *Listener) HandleMessage(msg []byte, remote string) {
|
||||
// log.D.F("%s received message:\n%s", remote, msg)
|
||||
log.D.F("%s received message:\n%s", remote, msg)
|
||||
var err error
|
||||
var t string
|
||||
var rem []byte
|
||||
|
||||
@@ -2,6 +2,7 @@ package app
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
acl "acl.orly"
|
||||
"encoders.orly/envelopes/authenvelope"
|
||||
@@ -25,7 +26,7 @@ import (
|
||||
)
|
||||
|
||||
func (l *Listener) HandleReq(msg []byte) (err error) {
|
||||
// log.T.F("HandleReq: from %s", l.remote)
|
||||
log.T.F("HandleReq: from %s\n%s\n", l.remote, msg)
|
||||
var rem []byte
|
||||
env := reqenvelope.New()
|
||||
if rem, err = env.Unmarshal(msg); chk.E(err) {
|
||||
@@ -54,42 +55,58 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
|
||||
return
|
||||
default:
|
||||
// user has read access or better, continue
|
||||
// log.D.F("user has %s access", accessLevel)
|
||||
log.D.F("user has %s access", accessLevel)
|
||||
}
|
||||
var events event.S
|
||||
for _, f := range *env.Filters {
|
||||
// idsLen := 0; kindsLen := 0; authorsLen := 0; tagsLen := 0
|
||||
// if f != nil {
|
||||
// if f.Ids != nil { idsLen = f.Ids.Len() }
|
||||
// if f.Kinds != nil { kindsLen = f.Kinds.Len() }
|
||||
// if f.Authors != nil { authorsLen = f.Authors.Len() }
|
||||
// if f.Tags != nil { tagsLen = f.Tags.Len() }
|
||||
// }
|
||||
// log.T.F("REQ %s: filter summary ids=%d kinds=%d authors=%d tags=%d", env.Subscription, idsLen, kindsLen, authorsLen, tagsLen)
|
||||
// if f != nil && f.Authors != nil && f.Authors.Len() > 0 {
|
||||
// var authors []string
|
||||
// for _, a := range f.Authors.T { authors = append(authors, hex.Enc(a)) }
|
||||
// log.T.F("REQ %s: authors=%v", env.Subscription, authors)
|
||||
// }
|
||||
// if f != nil && f.Kinds != nil && f.Kinds.Len() > 0 {
|
||||
// log.T.F("REQ %s: kinds=%v", env.Subscription, f.Kinds.ToUint16())
|
||||
// }
|
||||
// if f != nil && f.Ids != nil && f.Ids.Len() > 0 {
|
||||
// var ids []string
|
||||
// for _, id := range f.Ids.T {
|
||||
// ids = append(ids, hex.Enc(id))
|
||||
// }
|
||||
// var lim any
|
||||
// if pointers.Present(f.Limit) {
|
||||
// lim = *f.Limit
|
||||
// } else {
|
||||
// lim = nil
|
||||
// }
|
||||
// log.T.F(
|
||||
// "REQ %s: ids filter count=%d ids=%v limit=%v", env.Subscription,
|
||||
// f.Ids.Len(), ids, lim,
|
||||
// )
|
||||
// }
|
||||
idsLen := 0
|
||||
kindsLen := 0
|
||||
authorsLen := 0
|
||||
tagsLen := 0
|
||||
if f != nil {
|
||||
if f.Ids != nil {
|
||||
idsLen = f.Ids.Len()
|
||||
}
|
||||
if f.Kinds != nil {
|
||||
kindsLen = f.Kinds.Len()
|
||||
}
|
||||
if f.Authors != nil {
|
||||
authorsLen = f.Authors.Len()
|
||||
}
|
||||
if f.Tags != nil {
|
||||
tagsLen = f.Tags.Len()
|
||||
}
|
||||
}
|
||||
log.T.F(
|
||||
"REQ %s: filter summary ids=%d kinds=%d authors=%d tags=%d",
|
||||
env.Subscription, idsLen, kindsLen, authorsLen, tagsLen,
|
||||
)
|
||||
if f != nil && f.Authors != nil && f.Authors.Len() > 0 {
|
||||
var authors []string
|
||||
for _, a := range f.Authors.T {
|
||||
authors = append(authors, hex.Enc(a))
|
||||
}
|
||||
log.T.F("REQ %s: authors=%v", env.Subscription, authors)
|
||||
}
|
||||
if f != nil && f.Kinds != nil && f.Kinds.Len() > 0 {
|
||||
log.T.F("REQ %s: kinds=%v", env.Subscription, f.Kinds.ToUint16())
|
||||
}
|
||||
if f != nil && f.Ids != nil && f.Ids.Len() > 0 {
|
||||
var ids []string
|
||||
for _, id := range f.Ids.T {
|
||||
ids = append(ids, hex.Enc(id))
|
||||
}
|
||||
var lim any
|
||||
if pointers.Present(f.Limit) {
|
||||
lim = *f.Limit
|
||||
} else {
|
||||
lim = nil
|
||||
}
|
||||
log.T.F(
|
||||
"REQ %s: ids filter count=%d ids=%v limit=%v", env.Subscription,
|
||||
f.Ids.Len(), ids, lim,
|
||||
)
|
||||
}
|
||||
if pointers.Present(f.Limit) {
|
||||
if *f.Limit == 0 {
|
||||
continue
|
||||
@@ -107,16 +124,16 @@ privCheck:
|
||||
for _, ev := range events {
|
||||
if kind.IsPrivileged(ev.Kind) &&
|
||||
accessLevel != "admin" { // admins can see all events
|
||||
// log.I.F("checking privileged event %s", ev.ID)
|
||||
log.I.F("checking privileged event %s", ev.ID)
|
||||
pk := l.authedPubkey.Load()
|
||||
if pk == nil {
|
||||
continue
|
||||
}
|
||||
if utils.FastEqual(ev.Pubkey, pk) {
|
||||
// log.I.F(
|
||||
// "privileged event %s is for logged in pubkey %0x", ev.ID,
|
||||
// pk,
|
||||
// )
|
||||
log.I.F(
|
||||
"privileged event %s is for logged in pubkey %0x", ev.ID,
|
||||
pk,
|
||||
)
|
||||
tmp = append(tmp, ev)
|
||||
continue
|
||||
}
|
||||
@@ -127,10 +144,10 @@ privCheck:
|
||||
continue
|
||||
}
|
||||
if utils.FastEqual(pt, pk) {
|
||||
// log.I.F(
|
||||
// "privileged event %s is for logged in pubkey %0x",
|
||||
// ev.ID, pk,
|
||||
// )
|
||||
log.I.F(
|
||||
"privileged event %s is for logged in pubkey %0x",
|
||||
ev.ID, pk,
|
||||
)
|
||||
tmp = append(tmp, ev)
|
||||
continue privCheck
|
||||
}
|
||||
@@ -146,10 +163,15 @@ privCheck:
|
||||
events = tmp
|
||||
seen := make(map[string]struct{})
|
||||
for _, ev := range events {
|
||||
// log.T.F(
|
||||
// "REQ %s: sending EVENT id=%s kind=%d", env.Subscription,
|
||||
// hex.Enc(ev.ID), ev.Kind,
|
||||
// )
|
||||
log.T.F(
|
||||
"REQ %s: sending EVENT id=%s kind=%d", env.Subscription,
|
||||
hex.Enc(ev.ID), ev.Kind,
|
||||
)
|
||||
log.T.C(
|
||||
func() string {
|
||||
return fmt.Sprintf("event:\n%s\n", ev.Serialize())
|
||||
},
|
||||
)
|
||||
var res *eventenvelope.Result
|
||||
if res, err = eventenvelope.NewResultWith(
|
||||
env.Subscription, ev,
|
||||
@@ -164,7 +186,7 @@ privCheck:
|
||||
}
|
||||
// write the EOSE to signal to the client that all events found have been
|
||||
// sent.
|
||||
// log.T.F("sending EOSE to %s", l.remote)
|
||||
log.T.F("sending EOSE to %s", l.remote)
|
||||
if err = eoseenvelope.NewFrom(env.Subscription).
|
||||
Write(l); chk.E(err) {
|
||||
return
|
||||
@@ -172,10 +194,10 @@ privCheck:
|
||||
// if the query was for just Ids, we know there can't be any more results,
|
||||
// so cancel the subscription.
|
||||
cancel := true
|
||||
// log.T.F(
|
||||
// "REQ %s: computing cancel/subscription; events_sent=%d",
|
||||
// env.Subscription, len(events),
|
||||
// )
|
||||
log.T.F(
|
||||
"REQ %s: computing cancel/subscription; events_sent=%d",
|
||||
env.Subscription, len(events),
|
||||
)
|
||||
var subbedFilters filter.S
|
||||
for _, f := range *env.Filters {
|
||||
if f.Ids.Len() < 1 {
|
||||
@@ -190,10 +212,10 @@ privCheck:
|
||||
}
|
||||
notFounds = append(notFounds, id)
|
||||
}
|
||||
// log.T.F(
|
||||
// "REQ %s: ids outstanding=%d of %d", env.Subscription,
|
||||
// len(notFounds), f.Ids.Len(),
|
||||
// )
|
||||
log.T.F(
|
||||
"REQ %s: ids outstanding=%d of %d", env.Subscription,
|
||||
len(notFounds), f.Ids.Len(),
|
||||
)
|
||||
// if all were found, don't add to subbedFilters
|
||||
if len(notFounds) == 0 {
|
||||
continue
|
||||
|
||||
@@ -95,13 +95,20 @@ whitelist:
|
||||
}
|
||||
var typ websocket.MessageType
|
||||
var msg []byte
|
||||
// log.T.F("waiting for message from %s", remote)
|
||||
if typ, msg, err = conn.Read(ctx); chk.E(err) {
|
||||
log.T.F("waiting for message from %s", remote)
|
||||
if typ, msg, err = conn.Read(ctx); err != nil {
|
||||
if strings.Contains(
|
||||
err.Error(), "use of closed network connection",
|
||||
) {
|
||||
return
|
||||
}
|
||||
// Handle EOF errors gracefully - these occur when client closes connection
|
||||
// or sends incomplete/malformed WebSocket frames
|
||||
if strings.Contains(err.Error(), "EOF") ||
|
||||
strings.Contains(err.Error(), "failed to read frame header") {
|
||||
log.T.F("connection from %s closed: %v", remote, err)
|
||||
return
|
||||
}
|
||||
status := websocket.CloseStatus(err)
|
||||
switch status {
|
||||
case websocket.StatusNormalClosure,
|
||||
@@ -109,6 +116,7 @@ whitelist:
|
||||
websocket.StatusNoStatusRcvd,
|
||||
websocket.StatusAbnormalClosure,
|
||||
websocket.StatusProtocolError:
|
||||
log.T.F("connection from %s closed with status: %v", remote, status)
|
||||
default:
|
||||
log.E.F("unexpected close error from %s: %v", remote, err)
|
||||
}
|
||||
|
||||
@@ -3,12 +3,15 @@ package app
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/coder/websocket"
|
||||
"lol.mleku.dev/chk"
|
||||
"utils.orly/atomic"
|
||||
)
|
||||
|
||||
const WriteTimeout = 10 * time.Second
|
||||
|
||||
type Listener struct {
|
||||
*Server
|
||||
conn *websocket.Conn
|
||||
@@ -20,7 +23,12 @@ type Listener struct {
|
||||
}
|
||||
|
||||
func (l *Listener) Write(p []byte) (n int, err error) {
|
||||
if err = l.conn.Write(l.ctx, websocket.MessageText, p); chk.E(err) {
|
||||
// Use a separate context with timeout for writes to prevent race conditions
|
||||
// where the main connection context gets cancelled while writing events
|
||||
writeCtx, cancel := context.WithTimeout(context.Background(), WriteTimeout)
|
||||
defer cancel()
|
||||
|
||||
if err = l.conn.Write(writeCtx, websocket.MessageText, p); chk.E(err) {
|
||||
return
|
||||
}
|
||||
n = len(p)
|
||||
|
||||
@@ -28,6 +28,9 @@ func Run(
|
||||
var err error
|
||||
var adminKeys [][]byte
|
||||
for _, admin := range cfg.Admins {
|
||||
if len(admin) == 0 {
|
||||
continue
|
||||
}
|
||||
var pk []byte
|
||||
if pk, err = bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(err) {
|
||||
continue
|
||||
|
||||
@@ -101,17 +101,17 @@ func (p *P) Receive(msg typer.T) {
|
||||
if m.Cancel {
|
||||
if m.Id == "" {
|
||||
p.removeSubscriber(m.Conn)
|
||||
// log.D.F("removed listener %s", m.remote)
|
||||
log.D.F("removed listener %s", m.remote)
|
||||
} else {
|
||||
p.removeSubscriberId(m.Conn, m.Id)
|
||||
// log.D.C(
|
||||
// func() string {
|
||||
// return fmt.Sprintf(
|
||||
// "removed subscription %s for %s", m.Id,
|
||||
// m.remote,
|
||||
// )
|
||||
// },
|
||||
// )
|
||||
log.D.C(
|
||||
func() string {
|
||||
return fmt.Sprintf(
|
||||
"removed subscription %s for %s", m.Id,
|
||||
m.remote,
|
||||
)
|
||||
},
|
||||
)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -123,27 +123,27 @@ func (p *P) Receive(msg typer.T) {
|
||||
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
|
||||
}
|
||||
p.Map[m.Conn] = subs
|
||||
// log.D.C(
|
||||
// func() string {
|
||||
// return fmt.Sprintf(
|
||||
// "created new subscription for %s, %s",
|
||||
// m.remote,
|
||||
// m.Filters.Marshal(nil),
|
||||
// )
|
||||
// },
|
||||
// )
|
||||
log.D.C(
|
||||
func() string {
|
||||
return fmt.Sprintf(
|
||||
"created new subscription for %s, %s",
|
||||
m.remote,
|
||||
m.Filters.Marshal(nil),
|
||||
)
|
||||
},
|
||||
)
|
||||
} else {
|
||||
subs[m.Id] = Subscription{
|
||||
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
|
||||
}
|
||||
// log.D.C(
|
||||
// func() string {
|
||||
// return fmt.Sprintf(
|
||||
// "added subscription %s for %s", m.Id,
|
||||
// m.remote,
|
||||
// )
|
||||
// },
|
||||
// )
|
||||
log.D.C(
|
||||
func() string {
|
||||
return fmt.Sprintf(
|
||||
"added subscription %s for %s", m.Id,
|
||||
m.remote,
|
||||
)
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -179,14 +179,16 @@ func (p *P) Deliver(ev *event.E) {
|
||||
}
|
||||
}
|
||||
p.Mx.RUnlock()
|
||||
log.D.C(
|
||||
func() string {
|
||||
return fmt.Sprintf(
|
||||
"delivering event %0x to websocket subscribers %d", ev.ID,
|
||||
len(deliveries),
|
||||
)
|
||||
},
|
||||
)
|
||||
if len(deliveries) > 0 {
|
||||
log.D.C(
|
||||
func() string {
|
||||
return fmt.Sprintf(
|
||||
"delivering event %0x to websocket subscribers %d", ev.ID,
|
||||
len(deliveries),
|
||||
)
|
||||
},
|
||||
)
|
||||
}
|
||||
for _, d := range deliveries {
|
||||
// If the event is privileged, enforce that the subscriber's authed pubkey matches
|
||||
// either the event pubkey or appears in any 'p' tag of the event.
|
||||
@@ -218,8 +220,15 @@ func (p *P) Deliver(ev *event.E) {
|
||||
if res, err = eventenvelope.NewResultWith(d.id, ev); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
// Use a separate context with timeout for writes to prevent race conditions
|
||||
// where the publisher context gets cancelled while writing events
|
||||
writeCtx, cancel := context.WithTimeout(
|
||||
context.Background(), WriteTimeout,
|
||||
)
|
||||
defer cancel()
|
||||
|
||||
if err = d.w.Write(
|
||||
p.c, websocket.MessageText, res.Marshal(nil),
|
||||
writeCtx, websocket.MessageText, res.Marshal(nil),
|
||||
); chk.E(err) {
|
||||
// On error, remove the subscriber connection safely
|
||||
p.removeSubscriber(d.w)
|
||||
@@ -245,9 +254,9 @@ func (p *P) removeSubscriberId(ws *websocket.Conn, id string) {
|
||||
var subs map[string]Subscription
|
||||
var ok bool
|
||||
if subs, ok = p.Map[ws]; ok {
|
||||
delete(p.Map[ws], id)
|
||||
_ = subs
|
||||
if len(subs) == 0 {
|
||||
delete(subs, id)
|
||||
// Check the actual map after deletion, not the original reference
|
||||
if len(p.Map[ws]) == 0 {
|
||||
delete(p.Map, ws)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user