From 5d04afd748d944a3fa9810973b6ffbc4f81323d1 Mon Sep 17 00:00:00 2001 From: mleku Date: Tue, 2 Sep 2025 23:01:13 +0100 Subject: [PATCH] Remove unused `eventpool` package, improve logging levels, standardize websocket handling, and add `HandleClose` functionality. --- app/handle-close.go | 36 +++++++++++++++++++++ app/handle-event.go | 2 +- app/handle-message.go | 1 + app/handle-websocket.go | 33 ++++++++++++------- app/main.go | 8 +++-- app/publisher.go | 4 +-- app/server.go | 7 +++- cmd/eventpool/eventpool.go | 65 -------------------------------------- 8 files changed, 73 insertions(+), 83 deletions(-) delete mode 100644 cmd/eventpool/eventpool.go diff --git a/app/handle-close.go b/app/handle-close.go index 4879f7a..a55cd19 100644 --- a/app/handle-close.go +++ b/app/handle-close.go @@ -1 +1,37 @@ package app + +import ( + "errors" + + "encoders.orly/envelopes/closeenvelope" + "lol.mleku.dev/chk" + "lol.mleku.dev/log" +) + +// HandleClose processes a CLOSE envelope by unmarshalling the request, +// validates the presence of an field, and signals cancellation for +// the associated listener through the server's publisher mechanism. +func (l *Listener) HandleClose( + req []byte, +) (err error) { + var rem []byte + env := closeenvelope.New() + if rem, err = env.Unmarshal(req); chk.E(err) { + return + } + if len(rem) > 0 { + log.I.F("extra '%s'", rem) + } + if len(env.ID) == 0 { + return errors.New("CLOSE has no ") + } + l.publishers.Receive( + &W{ + Cancel: true, + remote: l.remote, + Conn: l.conn, + Id: string(env.ID), + }, + ) + return +} diff --git a/app/handle-event.go b/app/handle-event.go index d57d294..70c9c6a 100644 --- a/app/handle-event.go +++ b/app/handle-event.go @@ -57,7 +57,7 @@ func (l *Listener) HandleEvent(c context.Context, msg []byte) ( if _, _, err = l.SaveEvent(c, env.E, false, nil); chk.E(err) { return } - // Send a success response after storing + // Send a success response storing if err = Ok.Ok(l, env, ""); chk.E(err) { return } diff --git a/app/handle-message.go b/app/handle-message.go index fca7099..7fc7afe 100644 --- a/app/handle-message.go +++ b/app/handle-message.go @@ -35,6 +35,7 @@ func (l *Listener) HandleMessage(msg []byte, remote string) { err = l.HandleReq(l.ctx, rem) case closeenvelope.L: log.D.F("closeenvelope: %s", rem) + err = l.HandleClose(rem) case authenvelope.L: log.D.F("authenvelope: %s", rem) default: diff --git a/app/handle-websocket.go b/app/handle-websocket.go index 0cbe9d8..327cc0e 100644 --- a/app/handle-websocket.go +++ b/app/handle-websocket.go @@ -9,10 +9,15 @@ import ( "github.com/coder/websocket" "lol.mleku.dev/chk" "lol.mleku.dev/log" - "protocol.orly/publish" + "utils.orly/units" ) const ( + DefaultWriteWait = 10 * time.Second + DefaultPongWait = 60 * time.Second + DefaultPingWait = DefaultPongWait / 2 + DefaultMaxMessageSize = 1 * units.Mb + // CloseMessage denotes a close control message. The optional message // payload contains a numeric code and text. Use the FormatCloseMessage // function to format a close message payload. @@ -42,8 +47,7 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { return } whitelist: - var cancel context.CancelFunc - s.Ctx, cancel = context.WithCancel(s.Ctx) + ctx, cancel := context.WithCancel(s.Ctx) defer cancel() var err error var conn *websocket.Conn @@ -52,24 +56,32 @@ whitelist: ); chk.E(err) { return } + conn.SetReadLimit(DefaultMaxMessageSize) defer conn.CloseNow() listener := &Listener{ - ctx: s.Ctx, + ctx: ctx, Server: s, conn: conn, remote: remote, } - listener.publishers = publish.New(NewPublisher()) - go s.Pinger(s.Ctx, conn, time.NewTicker(time.Second*10), cancel) + ticker := time.NewTicker(DefaultPingWait) + go s.Pinger(ctx, conn, ticker, cancel) + defer func() { + log.D.F("closing websocket connection from %s", remote) + cancel() + ticker.Stop() + listener.publishers.Receive(&W{Cancel: true}) + }() for { select { - case <-s.Ctx.Done(): + case <-ctx.Done(): return default: } var typ websocket.MessageType var msg []byte - if typ, msg, err = conn.Read(s.Ctx); err != nil { + log.I.F("waiting for message from %s", remote) + if typ, msg, err = conn.Read(ctx); chk.E(err) { if strings.Contains( err.Error(), "use of closed network connection", ) { @@ -88,7 +100,7 @@ whitelist: return } if typ == PingMessage { - if err = conn.Write(s.Ctx, PongMessage, msg); chk.E(err) { + if err = conn.Write(ctx, PongMessage, msg); chk.E(err) { return } continue @@ -109,8 +121,7 @@ func (s *Server) Pinger( for { select { case <-ticker.C: - if err = conn.Write(ctx, PingMessage, nil); err != nil { - log.E.F("error writing ping: %v; closing websocket", err) + if err = conn.Ping(ctx); chk.E(err) { return } case <-ctx.Done(): diff --git a/app/main.go b/app/main.go index 05ff2ec..76e8645 100644 --- a/app/main.go +++ b/app/main.go @@ -9,6 +9,7 @@ import ( "lol.mleku.dev/chk" "lol.mleku.dev/log" "next.orly.dev/app/config" + "protocol.orly/publish" ) func Run( @@ -24,9 +25,10 @@ func Run( }() // start listener l := &Server{ - Ctx: ctx, - Config: cfg, - D: db, + Ctx: ctx, + Config: cfg, + D: db, + publishers: publish.New(NewPublisher()), } addr := fmt.Sprintf("%s:%d", cfg.Listen, cfg.Port) log.I.F("starting listener on http://%s", addr) diff --git a/app/publisher.go b/app/publisher.go index d1631eb..40511de 100644 --- a/app/publisher.go +++ b/app/publisher.go @@ -93,10 +93,10 @@ func (p *P) Receive(msg typer.T) { if m.Cancel { if m.Id == "" { p.removeSubscriber(m.Conn) - log.T.F("removed listener %s", m.remote) + log.D.F("removed listener %s", m.remote) } else { p.removeSubscriberId(m.Conn, m.Id) - log.T.C( + log.D.C( func() string { return fmt.Sprintf( "removed subscription %s for %s", m.Id, diff --git a/app/server.go b/app/server.go index 2db5c7c..ef548ad 100644 --- a/app/server.go +++ b/app/server.go @@ -2,6 +2,7 @@ package app import ( "context" + "fmt" "net/http" "database.orly" @@ -20,7 +21,11 @@ type Server struct { } func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - log.T.F("path %v header %v", r.URL, r.Header) + log.T.C( + func() string { + return fmt.Sprintf("path %v header %v", r.URL, r.Header) + }, + ) if r.Header.Get("Upgrade") == "websocket" { s.HandleWebsocket(w, r) } else if r.Header.Get("Accept") == "application/nostr+json" { diff --git a/cmd/eventpool/eventpool.go b/cmd/eventpool/eventpool.go deleted file mode 100644 index 7163542..0000000 --- a/cmd/eventpool/eventpool.go +++ /dev/null @@ -1,65 +0,0 @@ -package main - -import ( - "time" - - "encoders.orly/event" - "encoders.orly/hex" - "encoders.orly/json" - "encoders.orly/tag" - "github.com/pkg/profile" - lol "lol.mleku.dev" - "lol.mleku.dev/chk" - "lukechampine.com/frand" - "utils.orly" - "utils.orly/bufpool" -) - -func main() { - lol.SetLogLevel("info") - prof := profile.Start(profile.CPUProfile) - defer prof.Stop() - for range 1000000 { - ev := event.New() - ev.ID = frand.Bytes(32) - ev.Pubkey = frand.Bytes(32) - ev.CreatedAt = time.Now().Unix() - ev.Kind = 1 - ev.Tags = &tag.S{ - {T: [][]byte{[]byte("t"), []byte("hashtag")}}, - { - T: [][]byte{ - []byte("e"), - hex.EncAppend(nil, frand.Bytes(32)), - }, - }, - } - ev.Content = frand.Bytes(frand.Intn(1024) + 1) - ev.Sig = frand.Bytes(64) - // log.I.S(ev) - b, err := json.Marshal(ev) - if chk.E(err) { - return - } - var bc []byte - bc = append(bc, b...) - // log.I.F("%s", bc) - ev2 := event.New() - if err = json.Unmarshal(b, ev2); chk.E(err) { - return - } - var b2 []byte - if b2, err = json.Marshal(ev); err != nil { - return - } - if !utils.FastEqual(bc, b2) { - return - } - // free up the resources for the next iteration - ev.Free() - ev2.Free() - bufpool.PutBytes(b) - bufpool.PutBytes(b2) - bufpool.PutBytes(bc) - } -}