From 898aa0cb6329cdd77cab66f65903f50a2062dec1 Mon Sep 17 00:00:00 2001 From: mleku Date: Wed, 3 Sep 2025 00:10:36 +0100 Subject: [PATCH] Add context to `NewPublisher`, improve logging levels, dispatch events on publish, and refine envelope handling fixes a panic from the nil context --- app/handle-event.go | 1 + app/handle-req.go | 1 + app/handle-websocket.go | 2 +- app/main.go | 2 +- app/ok.go | 2 +- app/publisher.go | 18 ++++++------------ 6 files changed, 11 insertions(+), 15 deletions(-) diff --git a/app/handle-event.go b/app/handle-event.go index 70c9c6a..4b09079 100644 --- a/app/handle-event.go +++ b/app/handle-event.go @@ -57,6 +57,7 @@ func (l *Listener) HandleEvent(c context.Context, msg []byte) ( if _, _, err = l.SaveEvent(c, env.E, false, nil); chk.E(err) { return } + l.publishers.Deliver(env.E) // Send a success response storing if err = Ok.Ok(l, env, ""); chk.E(err) { return diff --git a/app/handle-req.go b/app/handle-req.go index 4d6a413..106cf85 100644 --- a/app/handle-req.go +++ b/app/handle-req.go @@ -60,6 +60,7 @@ func (l *Listener) HandleReq(c context.Context, msg []byte) ( } // write the EOSE to signal to the client that all events found have been // sent. + log.T.F("sending EOSE to %s", l.remote) if err = eoseenvelope.NewFrom(env.Subscription). Write(l); chk.E(err) { return diff --git a/app/handle-websocket.go b/app/handle-websocket.go index 327cc0e..0a66b1a 100644 --- a/app/handle-websocket.go +++ b/app/handle-websocket.go @@ -80,7 +80,7 @@ whitelist: } var typ websocket.MessageType var msg []byte - log.I.F("waiting for message from %s", remote) + log.T.F("waiting for message from %s", remote) if typ, msg, err = conn.Read(ctx); chk.E(err) { if strings.Contains( err.Error(), "use of closed network connection", diff --git a/app/main.go b/app/main.go index 76e8645..57bfcd9 100644 --- a/app/main.go +++ b/app/main.go @@ -28,7 +28,7 @@ func Run( Ctx: ctx, Config: cfg, D: db, - publishers: publish.New(NewPublisher()), + publishers: publish.New(NewPublisher(ctx)), } addr := fmt.Sprintf("%s:%d", cfg.Listen, cfg.Port) log.I.F("starting listener on http://%s", addr) diff --git a/app/ok.go b/app/ok.go index f7ab81f..5ef6ad4 100644 --- a/app/ok.go +++ b/app/ok.go @@ -40,7 +40,7 @@ var Ok = OKs{ params ...any, ) (err error) { return okenvelope.NewFrom( - env.Id(), true, nil, + env.Id(), true, []byte{}, ).Write(l) }, AuthRequired: func( diff --git a/app/publisher.go b/app/publisher.go index 40511de..5214e9f 100644 --- a/app/publisher.go +++ b/app/publisher.go @@ -63,8 +63,9 @@ type P struct { var _ publisher.I = &P{} -func NewPublisher() (publisher *P) { +func NewPublisher(c context.Context) (publisher *P) { return &P{ + c: c, Map: make(Map), } } @@ -113,7 +114,7 @@ func (p *P) Receive(msg typer.T) { subs = make(map[string]Subscription) subs[m.Id] = Subscription{S: m.Filters, remote: m.remote} p.Map[m.Conn] = subs - log.T.C( + log.D.C( func() string { return fmt.Sprintf( "created new subscription for %s, %s", @@ -124,7 +125,7 @@ func (p *P) Receive(msg typer.T) { ) } else { subs[m.Id] = Subscription{S: m.Filters, remote: m.remote} - log.T.C( + log.D.C( func() string { return fmt.Sprintf( "added subscription %s for %s", m.Id, @@ -151,7 +152,7 @@ func (p *P) Deliver(ev *event.E) { var err error p.Mx.Lock() defer p.Mx.Unlock() - log.T.C( + log.D.C( func() string { return fmt.Sprintf( "delivering event %0x to websocket subscribers %d", ev.ID, @@ -160,13 +161,6 @@ func (p *P) Deliver(ev *event.E) { }, ) for w, subs := range p.Map { - log.T.C( - func() string { - return fmt.Sprintf( - "%v %s", subs, - ) - }, - ) for id, subscriber := range subs { if !subscriber.Match(ev) { continue @@ -185,7 +179,7 @@ func (p *P) Deliver(ev *event.E) { ); chk.E(err) { continue } - log.T.C( + log.D.C( func() string { return fmt.Sprintf( "dispatched event %0x to subscription %s, %s",