package acl import ( "context" "reflect" "strings" "sync" "time" "github.com/coder/websocket" "lol.mleku.dev/chk" "lol.mleku.dev/errorf" "lol.mleku.dev/log" "next.orly.dev/app/config" "next.orly.dev/pkg/database" "next.orly.dev/pkg/database/indexes/types" "next.orly.dev/pkg/encoders/bech32encoding" "next.orly.dev/pkg/encoders/envelopes" "next.orly.dev/pkg/encoders/envelopes/eoseenvelope" "next.orly.dev/pkg/encoders/envelopes/eventenvelope" "next.orly.dev/pkg/encoders/envelopes/reqenvelope" "next.orly.dev/pkg/encoders/event" "next.orly.dev/pkg/encoders/filter" "next.orly.dev/pkg/encoders/hex" "next.orly.dev/pkg/encoders/kind" "next.orly.dev/pkg/encoders/tag" "next.orly.dev/pkg/protocol/publish" "next.orly.dev/pkg/utils" "next.orly.dev/pkg/utils/normalize" "next.orly.dev/pkg/utils/values" ) type Follows struct { Ctx context.Context cfg *config.C *database.D pubs *publish.S followsMx sync.RWMutex admins [][]byte follows [][]byte updated chan struct{} subsCancel context.CancelFunc } func (f *Follows) Configure(cfg ...any) (err error) { log.I.F("configuring follows ACL") for _, ca := range cfg { switch c := ca.(type) { case *config.C: // log.D.F("setting ACL config: %v", c) f.cfg = c case *database.D: // log.D.F("setting ACL database: %s", c.Path()) f.D = c case context.Context: // log.D.F("setting ACL context: %s", c.Value("id")) f.Ctx = c case *publish.S: // set publisher for dispatching new events f.pubs = c default: err = errorf.E("invalid type: %T", reflect.TypeOf(ca)) } } if f.cfg == nil || f.D == nil { err = errorf.E("both config and database must be set") return } // find admin follow lists f.followsMx.Lock() defer f.followsMx.Unlock() // log.I.F("finding admins") f.follows, f.admins = nil, nil for _, admin := range f.cfg.Admins { // log.I.F("%s", admin) var adm []byte if a, e := bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(e) { continue } else { adm = a } // log.I.F("admin: %0x", adm) f.admins = append(f.admins, adm) fl := &filter.F{ Authors: tag.NewFromAny(adm), Kinds: kind.NewS(kind.New(kind.FollowList.K)), } var idxs []database.Range if idxs, err = database.GetIndexesFromFilter(fl); chk.E(err) { return } var sers types.Uint40s for _, idx := range idxs { var s types.Uint40s if s, err = f.D.GetSerialsByRange(idx); chk.E(err) { continue } sers = append(sers, s...) } if len(sers) > 0 { for _, s := range sers { var ev *event.E if ev, err = f.D.FetchEventBySerial(s); chk.E(err) { continue } // log.I.F("admin follow list:\n%s", ev.Serialize()) for _, v := range ev.Tags.GetAll([]byte("p")) { // log.I.F("adding follow: %s", v.Value()) var a []byte if b, e := hex.Dec(string(v.Value())); chk.E(e) { continue } else { a = b } f.follows = append(f.follows, a) } } } } if f.updated == nil { f.updated = make(chan struct{}) } else { f.updated <- struct{}{} } return } func (f *Follows) GetAccessLevel(pub []byte, address string) (level string) { if f.cfg == nil { return "write" } f.followsMx.RLock() defer f.followsMx.RUnlock() for _, v := range f.admins { if utils.FastEqual(v, pub) { return "admin" } } for _, v := range f.follows { if utils.FastEqual(v, pub) { return "write" } } return "read" } func (f *Follows) GetACLInfo() (name, description, documentation string) { return "follows", "whitelist follows of admins", `This ACL mode searches for follow lists of admins and grants all followers write access` } func (f *Follows) Type() string { return "follows" } func (f *Follows) adminRelays() (urls []string) { f.followsMx.RLock() admins := make([][]byte, len(f.admins)) copy(admins, f.admins) f.followsMx.RUnlock() seen := make(map[string]struct{}) for _, adm := range admins { fl := &filter.F{ Authors: tag.NewFromAny(adm), Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)), } idxs, err := database.GetIndexesFromFilter(fl) if chk.E(err) { continue } var sers types.Uint40s for _, idx := range idxs { s, err := f.D.GetSerialsByRange(idx) if chk.E(err) { continue } sers = append(sers, s...) } for _, s := range sers { ev, err := f.D.FetchEventBySerial(s) if chk.E(err) || ev == nil { continue } for _, v := range ev.Tags.GetAll([]byte("r")) { u := string(v.Value()) n := string(normalize.URL(u)) if n == "" { continue } if _, ok := seen[n]; ok { continue } seen[n] = struct{}{} urls = append(urls, n) } } } return } func (f *Follows) startSubscriptions(ctx context.Context) { // build authors list: admins + follows f.followsMx.RLock() authors := make([][]byte, 0, len(f.admins)+len(f.follows)) authors = append(authors, f.admins...) authors = append(authors, f.follows...) f.followsMx.RUnlock() if len(authors) == 0 { log.W.F("follows syncer: no authors (admins+follows) to subscribe to") return } urls := f.adminRelays() log.I.S(urls) if len(urls) == 0 { log.W.F("follows syncer: no admin relays found in DB (kind 10002)") return } log.T.F( "follows syncer: subscribing to %d relays for %d authors", len(urls), len(authors), ) for _, u := range urls { u := u go func() { backoff := time.Second for { select { case <-ctx.Done(): return default: } c, _, err := websocket.Dial(ctx, u, nil) if err != nil { log.W.F("follows syncer: dial %s failed: %v", u, err) if strings.Contains( err.Error(), "response status code 101 but got 403", ) { // 403 means the relay is not accepting connections from // us. Forbidden is the meaning, usually used to // indicate either the IP or user is blocked. so stop // trying this one. return } timer := time.NewTimer(backoff) select { case <-ctx.Done(): return case <-timer.C: } if backoff < 30*time.Second { backoff *= 2 } continue } backoff = time.Second // send REQ ff := &filter.S{} f1 := &filter.F{ Authors: tag.NewFromBytesSlice(authors...), Limit: values.ToUintPointer(0), } *ff = append(*ff, f1) req := reqenvelope.NewFrom([]byte("follows-sync"), ff) if err = c.Write( ctx, websocket.MessageText, req.Marshal(nil), ); chk.E(err) { _ = c.Close(websocket.StatusInternalError, "write failed") continue } log.T.F("sent REQ to %s for follows subscription", u) // read loop for { select { case <-ctx.Done(): _ = c.Close(websocket.StatusNormalClosure, "ctx done") return default: } _, data, err := c.Read(ctx) if err != nil { _ = c.Close(websocket.StatusNormalClosure, "read err") break } label, rem, err := envelopes.Identify(data) if chk.E(err) { continue } switch label { case eventenvelope.L: res, _, err := eventenvelope.ParseResult(rem) if chk.E(err) || res == nil || res.Event == nil { continue } // verify signature before saving if ok, err := res.Event.Verify(); chk.T(err) || !ok { continue } if _, _, err = f.D.SaveEvent( ctx, res.Event, ); err != nil { if !strings.HasPrefix( err.Error(), "blocked:", ) { log.W.F( "follows syncer: save event failed: %v", err, ) } // ignore duplicates and continue } else { // Only dispatch if the event was newly saved (no error) if f.pubs != nil { go f.pubs.Deliver(res.Event) } // log.I.F( // "saved new event from follows syncer: %0x", // res.Event.ID, // ) } case eoseenvelope.L: // ignore, continue subscription default: // ignore other labels } } // loop reconnect } }() } } func (f *Follows) Syncer() { log.I.F("starting follows syncer") go func() { // start immediately if Configure already ran for { var innerCancel context.CancelFunc select { case <-f.Ctx.Done(): if f.subsCancel != nil { f.subsCancel() } return case <-f.updated: // close and reopen subscriptions to users on the follow list and admins if f.subsCancel != nil { log.I.F("follows syncer: cancelling existing subscriptions") f.subsCancel() } ctx, cancel := context.WithCancel(f.Ctx) f.subsCancel = cancel innerCancel = cancel log.I.F("follows syncer: (re)opening subscriptions") f.startSubscriptions(ctx) } // small sleep to avoid tight loop if updated fires rapidly if innerCancel == nil { time.Sleep(50 * time.Millisecond) } } }() f.updated <- struct{}{} } // GetFollowedPubkeys returns a copy of the followed pubkeys list func (f *Follows) GetFollowedPubkeys() [][]byte { f.followsMx.RLock() defer f.followsMx.RUnlock() followedPubkeys := make([][]byte, len(f.follows)) copy(followedPubkeys, f.follows) return followedPubkeys } func init() { log.T.F("registering follows ACL") Registry.Register(new(Follows)) }