From 2491fd273853e9410d92c1f22ae7eb7ca3ca275d Mon Sep 17 00:00:00 2001 From: mleku Date: Sun, 7 Sep 2025 20:24:04 +0100 Subject: [PATCH] wire up trigger to restart sync for ACL spider --- app/handle-req.go | 3 ++- app/main.go | 1 - main.go | 3 ++- pkg/acl/acl.go | 9 +++++++++ pkg/acl/follows.go | 27 +++++++++++++++++++++++++++ pkg/acl/none.go | 2 ++ pkg/interfaces/acl/acl.go | 4 ++++ 7 files changed, 46 insertions(+), 3 deletions(-) diff --git a/app/handle-req.go b/app/handle-req.go index fdd1360..d9ae83f 100644 --- a/app/handle-req.go +++ b/app/handle-req.go @@ -68,7 +68,8 @@ func (l *Listener) HandleReq(msg []byte) ( err = nil } } - // write out the events to the socket + // todo: filter out privileged events from the results if the user is not + // authed or authed to a non-privileged pubkey. seen := make(map[string]struct{}) for _, ev := range events { // track the IDs we've sent diff --git a/app/main.go b/app/main.go index b8f43da..3343594 100644 --- a/app/main.go +++ b/app/main.go @@ -34,7 +34,6 @@ func Run( } adminKeys = append(adminKeys, pk) } - // start listener l := &Server{ Ctx: ctx, diff --git a/main.go b/main.go index 4d41edb..f7f6183 100644 --- a/main.go +++ b/main.go @@ -30,9 +30,10 @@ func main() { os.Exit(1) } acl.Registry.Active.Store(cfg.ACLMode) - if err = acl.Registry.Configure(cfg, db); chk.E(err) { + if err = acl.Registry.Configure(cfg, db, ctx); chk.E(err) { os.Exit(1) } + acl.Registry.Syncer() quit := app.Run(ctx, cfg, db) sigs := make(chan os.Signal, 1) signal.Notify(sigs, os.Interrupt) diff --git a/pkg/acl/acl.go b/pkg/acl/acl.go index 3f6428f..623ed38 100644 --- a/pkg/acl/acl.go +++ b/pkg/acl/acl.go @@ -48,6 +48,15 @@ func (s *S) GetACLInfo() (name, description, documentation string) { return } +func (s *S) Syncer() { + for _, i := range s.ACL { + if i.Type() == s.Active.Load() { + i.Syncer() + break + } + } +} + func (s *S) Type() (typ string) { for _, i := range s.ACL { if i.Type() == s.Active.Load() { diff --git a/pkg/acl/follows.go b/pkg/acl/follows.go index 684a692..513ad9f 100644 --- a/pkg/acl/follows.go +++ b/pkg/acl/follows.go @@ -1,6 +1,7 @@ package acl import ( + "context" "reflect" "sync" @@ -20,11 +21,13 @@ import ( ) type Follows struct { + Ctx context.Context cfg *config.C *database.D followsMx sync.RWMutex admins [][]byte follows [][]byte + updated chan struct{} } func (f *Follows) Configure(cfg ...any) (err error) { @@ -37,6 +40,9 @@ func (f *Follows) Configure(cfg ...any) (err error) { case *database.D: log.D.F("setting ACL database: %s", c.Path()) f.D = c + case context.Context: + log.D.F("setting ACL context: %s", c.Value("id")) + f.Ctx = c default: err = errorf.E("invalid type: %T", reflect.TypeOf(ca)) } @@ -92,6 +98,11 @@ func (f *Follows) Configure(cfg ...any) (err error) { } } } + if f.updated == nil { + f.updated = make(chan struct{}) + } else { + f.updated <- struct{}{} + } return } @@ -121,6 +132,22 @@ func (f *Follows) GetACLInfo() (name, description, documentation string) { func (f *Follows) Type() string { return "follows" } +func (f *Follows) Syncer() { + log.I.F("starting follows syncer") + go func() { + for { + select { + case <-f.Ctx.Done(): + return + case <-f.updated: + // close and reopen subscriptions to users on the follow list and + // admins + log.I.F("reopening subscriptions") + } + } + }() +} + func init() { log.T.F("registering follows ACL") Registry.Register(new(Follows)) diff --git a/pkg/acl/none.go b/pkg/acl/none.go index 29335cf..37b915f 100644 --- a/pkg/acl/none.go +++ b/pkg/acl/none.go @@ -20,6 +20,8 @@ func (n None) Type() string { return "none" } +func (n None) Syncer() {} + func init() { log.T.F("registering none ACL") Registry.Register(new(None)) diff --git a/pkg/interfaces/acl/acl.go b/pkg/interfaces/acl/acl.go index cbce759..dbf6883 100644 --- a/pkg/interfaces/acl/acl.go +++ b/pkg/interfaces/acl/acl.go @@ -27,5 +27,9 @@ type I interface { // explain briefly how it works, and then a long text of documentation of // the ACL's rules and configuration (in asciidoc or markdown). GetACLInfo() (name, description, documentation string) + // Syncer is a worker thread that does things in the background like syncing + // with other relays on admin relay lists using subscriptions for all events + // that arrive elsewhere relevant to the ACL scheme. + Syncer() typer.T }