wire up trigger to restart sync for ACL spider
This commit is contained in:
@@ -68,7 +68,8 @@ func (l *Listener) HandleReq(msg []byte) (
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
// write out the events to the socket
|
||||
// todo: filter out privileged events from the results if the user is not
|
||||
// authed or authed to a non-privileged pubkey.
|
||||
seen := make(map[string]struct{})
|
||||
for _, ev := range events {
|
||||
// track the IDs we've sent
|
||||
|
||||
@@ -34,7 +34,6 @@ func Run(
|
||||
}
|
||||
adminKeys = append(adminKeys, pk)
|
||||
}
|
||||
|
||||
// start listener
|
||||
l := &Server{
|
||||
Ctx: ctx,
|
||||
|
||||
3
main.go
3
main.go
@@ -30,9 +30,10 @@ func main() {
|
||||
os.Exit(1)
|
||||
}
|
||||
acl.Registry.Active.Store(cfg.ACLMode)
|
||||
if err = acl.Registry.Configure(cfg, db); chk.E(err) {
|
||||
if err = acl.Registry.Configure(cfg, db, ctx); chk.E(err) {
|
||||
os.Exit(1)
|
||||
}
|
||||
acl.Registry.Syncer()
|
||||
quit := app.Run(ctx, cfg, db)
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, os.Interrupt)
|
||||
|
||||
@@ -48,6 +48,15 @@ func (s *S) GetACLInfo() (name, description, documentation string) {
|
||||
return
|
||||
}
|
||||
|
||||
func (s *S) Syncer() {
|
||||
for _, i := range s.ACL {
|
||||
if i.Type() == s.Active.Load() {
|
||||
i.Syncer()
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *S) Type() (typ string) {
|
||||
for _, i := range s.ACL {
|
||||
if i.Type() == s.Active.Load() {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package acl
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
||||
@@ -20,11 +21,13 @@ import (
|
||||
)
|
||||
|
||||
type Follows struct {
|
||||
Ctx context.Context
|
||||
cfg *config.C
|
||||
*database.D
|
||||
followsMx sync.RWMutex
|
||||
admins [][]byte
|
||||
follows [][]byte
|
||||
updated chan struct{}
|
||||
}
|
||||
|
||||
func (f *Follows) Configure(cfg ...any) (err error) {
|
||||
@@ -37,6 +40,9 @@ func (f *Follows) Configure(cfg ...any) (err error) {
|
||||
case *database.D:
|
||||
log.D.F("setting ACL database: %s", c.Path())
|
||||
f.D = c
|
||||
case context.Context:
|
||||
log.D.F("setting ACL context: %s", c.Value("id"))
|
||||
f.Ctx = c
|
||||
default:
|
||||
err = errorf.E("invalid type: %T", reflect.TypeOf(ca))
|
||||
}
|
||||
@@ -92,6 +98,11 @@ func (f *Follows) Configure(cfg ...any) (err error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if f.updated == nil {
|
||||
f.updated = make(chan struct{})
|
||||
} else {
|
||||
f.updated <- struct{}{}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -121,6 +132,22 @@ func (f *Follows) GetACLInfo() (name, description, documentation string) {
|
||||
|
||||
func (f *Follows) Type() string { return "follows" }
|
||||
|
||||
func (f *Follows) Syncer() {
|
||||
log.I.F("starting follows syncer")
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-f.Ctx.Done():
|
||||
return
|
||||
case <-f.updated:
|
||||
// close and reopen subscriptions to users on the follow list and
|
||||
// admins
|
||||
log.I.F("reopening subscriptions")
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func init() {
|
||||
log.T.F("registering follows ACL")
|
||||
Registry.Register(new(Follows))
|
||||
|
||||
@@ -20,6 +20,8 @@ func (n None) Type() string {
|
||||
return "none"
|
||||
}
|
||||
|
||||
func (n None) Syncer() {}
|
||||
|
||||
func init() {
|
||||
log.T.F("registering none ACL")
|
||||
Registry.Register(new(None))
|
||||
|
||||
@@ -27,5 +27,9 @@ type I interface {
|
||||
// explain briefly how it works, and then a long text of documentation of
|
||||
// the ACL's rules and configuration (in asciidoc or markdown).
|
||||
GetACLInfo() (name, description, documentation string)
|
||||
// Syncer is a worker thread that does things in the background like syncing
|
||||
// with other relays on admin relay lists using subscriptions for all events
|
||||
// that arrive elsewhere relevant to the ACL scheme.
|
||||
Syncer()
|
||||
typer.T
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user