Integrate ACL with publishers for background event dispatch, ensure proper buffer adjustments in event encoding, and enhance follows sync with event delivery logic.
This commit is contained in:
@@ -24,6 +24,7 @@ import (
|
||||
"next.orly.dev/pkg/encoders/hex"
|
||||
"next.orly.dev/pkg/encoders/kind"
|
||||
"next.orly.dev/pkg/encoders/tag"
|
||||
"next.orly.dev/pkg/protocol/publish"
|
||||
utils "next.orly.dev/pkg/utils"
|
||||
"next.orly.dev/pkg/utils/normalize"
|
||||
"next.orly.dev/pkg/utils/values"
|
||||
@@ -33,6 +34,7 @@ type Follows struct {
|
||||
Ctx context.Context
|
||||
cfg *config.C
|
||||
*database.D
|
||||
pubs *publish.S
|
||||
followsMx sync.RWMutex
|
||||
admins [][]byte
|
||||
follows [][]byte
|
||||
@@ -53,6 +55,9 @@ func (f *Follows) Configure(cfg ...any) (err error) {
|
||||
case context.Context:
|
||||
// log.D.F("setting ACL context: %s", c.Value("id"))
|
||||
f.Ctx = c
|
||||
case *publish.S:
|
||||
// set publisher for dispatching new events
|
||||
f.pubs = c
|
||||
default:
|
||||
err = errorf.E("invalid type: %T", reflect.TypeOf(ca))
|
||||
}
|
||||
@@ -290,11 +295,16 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
|
||||
)
|
||||
}
|
||||
// ignore duplicates and continue
|
||||
} else {
|
||||
// Only dispatch if the event was newly saved (no error)
|
||||
if f.pubs != nil {
|
||||
f.pubs.Deliver(res.Event)
|
||||
}
|
||||
log.I.F(
|
||||
"saved new event from follows syncer: %0x",
|
||||
res.Event.ID,
|
||||
)
|
||||
}
|
||||
log.I.F(
|
||||
"saved new event from follows syncer: %0x",
|
||||
res.Event.ID,
|
||||
)
|
||||
case eoseenvelope.L:
|
||||
// ignore, continue subscription
|
||||
default:
|
||||
|
||||
@@ -121,7 +121,7 @@ func (ev *E) Marshal(dst []byte) (b []byte) {
|
||||
// integrates properly with the buffer pool, reducing GC pressure and
|
||||
// avoiding new heap allocations.
|
||||
if cap(b) < len(b)+len(ev.Content)+7+256+2 {
|
||||
b2 := make([]byte, len(b)+len(ev.Content)*2+512)
|
||||
b2 := make([]byte, len(b)+len(ev.Content)*2+1024)
|
||||
copy(b2, b)
|
||||
b2 = b2[:len(b)]
|
||||
// return the old buffer to the pool for reuse.
|
||||
|
||||
Reference in New Issue
Block a user