diff --git a/pkg/acl/follows.go b/pkg/acl/follows.go index 513ad9f..d1c6c49 100644 --- a/pkg/acl/follows.go +++ b/pkg/acl/follows.go @@ -3,31 +3,41 @@ package acl import ( "context" "reflect" + "strings" "sync" + "time" database "database.orly" "database.orly/indexes/types" "encoders.orly/bech32encoding" + "encoders.orly/envelopes" + "encoders.orly/envelopes/eoseenvelope" + "encoders.orly/envelopes/eventenvelope" + "encoders.orly/envelopes/reqenvelope" "encoders.orly/event" "encoders.orly/filter" "encoders.orly/hex" "encoders.orly/kind" "encoders.orly/tag" + "github.com/coder/websocket" "lol.mleku.dev/chk" "lol.mleku.dev/errorf" "lol.mleku.dev/log" "next.orly.dev/app/config" utils "utils.orly" + "utils.orly/normalize" + "utils.orly/values" ) type Follows struct { Ctx context.Context cfg *config.C *database.D - followsMx sync.RWMutex - admins [][]byte - follows [][]byte - updated chan struct{} + followsMx sync.RWMutex + admins [][]byte + follows [][]byte + updated chan struct{} + subsCancel context.CancelFunc } func (f *Follows) Configure(cfg ...any) (err error) { @@ -132,17 +142,194 @@ func (f *Follows) GetACLInfo() (name, description, documentation string) { func (f *Follows) Type() string { return "follows" } +func (f *Follows) adminRelays() (urls []string) { + f.followsMx.RLock() + admins := make([][]byte, len(f.admins)) + copy(admins, f.admins) + f.followsMx.RUnlock() + seen := make(map[string]struct{}) + for _, adm := range admins { + fl := &filter.F{ + Authors: tag.NewFromAny(adm), + Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)), + } + idxs, err := database.GetIndexesFromFilter(fl) + if chk.E(err) { + continue + } + var sers types.Uint40s + for _, idx := range idxs { + s, err := f.D.GetSerialsByRange(idx) + if chk.E(err) { + continue + } + sers = append(sers, s...) + } + for _, s := range sers { + ev, err := f.D.FetchEventBySerial(s) + if chk.E(err) || ev == nil { + continue + } + for _, v := range ev.Tags.GetAll([]byte("r")) { + u := string(v.Value()) + n := string(normalize.URL(u)) + if n == "" { + continue + } + if _, ok := seen[n]; ok { + continue + } + seen[n] = struct{}{} + urls = append(urls, n) + } + } + } + return +} + +func (f *Follows) startSubscriptions(ctx context.Context) { + // build authors list: admins + follows + f.followsMx.RLock() + authors := make([][]byte, 0, len(f.admins)+len(f.follows)) + authors = append(authors, f.admins...) + authors = append(authors, f.follows...) + f.followsMx.RUnlock() + if len(authors) == 0 { + log.W.F("follows syncer: no authors (admins+follows) to subscribe to") + return + } + urls := f.adminRelays() + if len(urls) == 0 { + log.W.F("follows syncer: no admin relays found in DB (kind 10002)") + return + } + log.I.F( + "follows syncer: subscribing to %d relays for %d authors", len(urls), + len(authors), + ) + for _, u := range urls { + u := u + go func() { + backoff := time.Second + for { + select { + case <-ctx.Done(): + return + default: + } + c, _, err := websocket.Dial(ctx, u, nil) + if err != nil { + log.W.F("follows syncer: dial %s failed: %v", u, err) + timer := time.NewTimer(backoff) + select { + case <-ctx.Done(): + return + case <-timer.C: + } + if backoff < 30*time.Second { + backoff *= 2 + } + continue + } + backoff = time.Second + // send REQ + ff := &filter.S{} + f1 := &filter.F{ + Authors: tag.NewFromBytesSlice(authors...), + Limit: values.ToUintPointer(0), + } + *ff = append(*ff, f1) + req := reqenvelope.NewFrom([]byte("follows-sync"), ff) + if err := c.Write( + ctx, websocket.MessageText, req.Marshal(nil), + ); chk.E(err) { + _ = c.Close(websocket.StatusInternalError, "write failed") + continue + } + log.I.F("sent REQ to %s for follows subscription", u) + // read loop + for { + select { + case <-ctx.Done(): + _ = c.Close(websocket.StatusNormalClosure, "ctx done") + return + default: + } + _, data, err := c.Read(ctx) + if err != nil { + _ = c.Close(websocket.StatusNormalClosure, "read err") + break + } + label, rem, err := envelopes.Identify(data) + if chk.E(err) { + continue + } + switch label { + case eventenvelope.L: + res, _, err := eventenvelope.ParseResult(rem) + if chk.E(err) || res == nil || res.Event == nil { + continue + } + // verify signature before saving + if ok, err := res.Event.Verify(); chk.T(err) || !ok { + continue + } + if _, _, err := f.D.SaveEvent( + ctx, res.Event, + ); err != nil { + if !strings.HasPrefix( + err.Error(), "event already exists", + ) { + log.W.F( + "follows syncer: save event failed: %v", + err, + ) + } + // ignore duplicates and continue + } + log.I.F( + "saved new event from follows syncer: %0x", + res.Event.ID, + ) + case eoseenvelope.L: + // ignore, continue subscription + default: + // ignore other labels + } + } + // loop reconnect + } + }() + } +} + func (f *Follows) Syncer() { log.I.F("starting follows syncer") go func() { + // start immediately if Configure already ran for { + var innerCancel context.CancelFunc select { case <-f.Ctx.Done(): + if f.subsCancel != nil { + f.subsCancel() + } return case <-f.updated: - // close and reopen subscriptions to users on the follow list and - // admins - log.I.F("reopening subscriptions") + // close and reopen subscriptions to users on the follow list and admins + if f.subsCancel != nil { + log.I.F("follows syncer: cancelling existing subscriptions") + f.subsCancel() + } + ctx, cancel := context.WithCancel(f.Ctx) + f.subsCancel = cancel + innerCancel = cancel + log.I.F("follows syncer: (re)opening subscriptions") + f.startSubscriptions(ctx) + } + // small sleep to avoid tight loop if updated fires rapidly + if innerCancel == nil { + time.Sleep(50 * time.Millisecond) } } }()