From 098595717f7e4fab4f10b2efbbece5f2e9ecfe51 Mon Sep 17 00:00:00 2001 From: mleku Date: Fri, 12 Sep 2025 16:36:22 +0100 Subject: [PATCH] Integrate ACL with publishers for background event dispatch, ensure proper buffer adjustments in event encoding, and enhance follows sync with event delivery logic. --- app/main.go | 5 +++++ pkg/acl/follows.go | 18 ++++++++++++++---- pkg/encoders/event/event.go | 2 +- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/app/main.go b/app/main.go index ee6caca..711236e 100644 --- a/app/main.go +++ b/app/main.go @@ -8,6 +8,7 @@ import ( "lol.mleku.dev/chk" "lol.mleku.dev/log" "next.orly.dev/app/config" + acl "next.orly.dev/pkg/acl" database "next.orly.dev/pkg/database" "next.orly.dev/pkg/encoders/bech32encoding" "next.orly.dev/pkg/protocol/publish" @@ -45,6 +46,10 @@ func Run( publishers: publish.New(NewPublisher(ctx)), Admins: adminKeys, } + // provide publisher to ACL so background sync can dispatch events + if err := acl.Registry.Configure(cfg, db, ctx, l.publishers); chk.E(err) { + // if configuration fails, proceed but log; ACL might be 'none' + } addr := fmt.Sprintf("%s:%d", cfg.Listen, cfg.Port) log.I.F("starting listener on http://%s", addr) go func() { diff --git a/pkg/acl/follows.go b/pkg/acl/follows.go index 1395a96..d0b6aaa 100644 --- a/pkg/acl/follows.go +++ b/pkg/acl/follows.go @@ -24,6 +24,7 @@ import ( "next.orly.dev/pkg/encoders/hex" "next.orly.dev/pkg/encoders/kind" "next.orly.dev/pkg/encoders/tag" + "next.orly.dev/pkg/protocol/publish" utils "next.orly.dev/pkg/utils" "next.orly.dev/pkg/utils/normalize" "next.orly.dev/pkg/utils/values" @@ -33,6 +34,7 @@ type Follows struct { Ctx context.Context cfg *config.C *database.D + pubs *publish.S followsMx sync.RWMutex admins [][]byte follows [][]byte @@ -53,6 +55,9 @@ func (f *Follows) Configure(cfg ...any) (err error) { case context.Context: // log.D.F("setting ACL context: %s", c.Value("id")) f.Ctx = c + case *publish.S: + // set publisher for dispatching new events + f.pubs = c default: err = errorf.E("invalid type: %T", reflect.TypeOf(ca)) } @@ -290,11 +295,16 @@ func (f *Follows) startSubscriptions(ctx context.Context) { ) } // ignore duplicates and continue + } else { + // Only dispatch if the event was newly saved (no error) + if f.pubs != nil { + f.pubs.Deliver(res.Event) + } + log.I.F( + "saved new event from follows syncer: %0x", + res.Event.ID, + ) } - log.I.F( - "saved new event from follows syncer: %0x", - res.Event.ID, - ) case eoseenvelope.L: // ignore, continue subscription default: diff --git a/pkg/encoders/event/event.go b/pkg/encoders/event/event.go index 49a7dbc..105e2f1 100644 --- a/pkg/encoders/event/event.go +++ b/pkg/encoders/event/event.go @@ -121,7 +121,7 @@ func (ev *E) Marshal(dst []byte) (b []byte) { // integrates properly with the buffer pool, reducing GC pressure and // avoiding new heap allocations. if cap(b) < len(b)+len(ev.Content)+7+256+2 { - b2 := make([]byte, len(b)+len(ev.Content)*2+512) + b2 := make([]byte, len(b)+len(ev.Content)*2+1024) copy(b2, b) b2 = b2[:len(b)] // return the old buffer to the pool for reuse.