Files
next.orly.dev/pkg/acl/follows.go
2025-10-03 17:01:34 +02:00

430 lines
10 KiB
Go

package acl
import (
"bytes"
"context"
"reflect"
"strings"
"sync"
"time"
"github.com/coder/websocket"
"lol.mleku.dev/chk"
"lol.mleku.dev/errorf"
"lol.mleku.dev/log"
"next.orly.dev/app/config"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/database/indexes/types"
"next.orly.dev/pkg/encoders/bech32encoding"
"next.orly.dev/pkg/encoders/envelopes"
"next.orly.dev/pkg/encoders/envelopes/eoseenvelope"
"next.orly.dev/pkg/encoders/envelopes/eventenvelope"
"next.orly.dev/pkg/encoders/envelopes/reqenvelope"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/filter"
"next.orly.dev/pkg/encoders/hex"
"next.orly.dev/pkg/encoders/kind"
"next.orly.dev/pkg/encoders/tag"
"next.orly.dev/pkg/protocol/publish"
"next.orly.dev/pkg/utils"
"next.orly.dev/pkg/utils/normalize"
"next.orly.dev/pkg/utils/values"
)
// Follows is the "follows" ACL mode: admins get admin access, pubkeys found in
// the admins' follow lists get write access, and everyone else gets read
// access (see GetAccessLevel).
type Follows struct {
// Ctx is the parent context; Syncer stops when it is done and every
// subscription context is derived from it.
Ctx context.Context
// cfg supplies the admin list and the bootstrap relays.
cfg *config.C
// D is the embedded event database used to look up stored events and to
// save events received from remote relays.
*database.D
// pubs, when set, receives events newly saved by the syncer.
pubs *publish.S
// followsMx guards admins and follows.
followsMx sync.RWMutex
// admins holds the binary pubkeys decoded from cfg.Admins.
admins [][]byte
// follows holds the pubkeys taken from "p" tags of the admins' follow
// list events, plus any added through AddFollow.
follows [][]byte
// updated signals the syncer to close and reopen its subscriptions.
updated chan struct{}
// subsCancel cancels the subscriptions currently opened by the syncer.
subsCancel context.CancelFunc
}
// Configure applies the supplied settings and rebuilds the admin and follows
// lists from the database.
//
// Each element of cfg is matched by type: *config.C, *database.D,
// context.Context and *publish.S are stored on f; any other type records an
// "invalid type" error. A config and a database must both be set (by this or
// an earlier call), otherwise an error is returned.
//
// For every admin in the config, the stored follow list events authored by
// that admin are loaded and the pubkeys in their "p" tags are appended to the
// follows list. Admin keys, serial ranges, events and tags that cannot be
// decoded or loaded are logged and skipped; such skipped failures do not
// become the return value.
//
// The first call creates the update channel. Later calls signal the syncer to
// reopen its subscriptions.
func (f *Follows) Configure(cfg ...any) (err error) {
	log.I.F("configuring follows ACL")
	for _, ca := range cfg {
		switch c := ca.(type) {
		case *config.C:
			f.cfg = c
		case *database.D:
			f.D = c
		case context.Context:
			f.Ctx = c
		case *publish.S:
			// publisher for dispatching newly saved events
			f.pubs = c
		default:
			// %v on the reflect.Type prints the offending type itself; %T
			// would print the type of the reflect.Type value instead.
			err = errorf.E("invalid type: %v", reflect.TypeOf(ca))
		}
	}
	if f.cfg == nil || f.D == nil {
		err = errorf.E("both config and database must be set")
		return
	}
	f.followsMx.Lock()
	defer f.followsMx.Unlock()
	f.follows, f.admins = nil, nil
	for _, admin := range f.cfg.Admins {
		adm, e := bech32encoding.NpubOrHexToPublicKeyBinary(admin)
		if chk.E(e) {
			continue
		}
		f.admins = append(f.admins, adm)
		fl := &filter.F{
			Authors: tag.NewFromAny(adm),
			Kinds:   kind.NewS(kind.New(kind.FollowList.K)),
		}
		var idxs []database.Range
		if idxs, err = database.GetIndexesFromFilter(fl); chk.E(err) {
			return
		}
		var sers types.Uint40s
		for _, idx := range idxs {
			// Skipped failures use a local error so they cannot leak out
			// through the named return value.
			var s types.Uint40s
			var e error
			if s, e = f.D.GetSerialsByRange(idx); chk.E(e) {
				continue
			}
			sers = append(sers, s...)
		}
		for _, s := range sers {
			var ev *event.E
			var e error
			if ev, e = f.D.FetchEventBySerial(s); chk.E(e) || ev == nil {
				continue
			}
			for _, v := range ev.Tags.GetAll([]byte("p")) {
				pk, e := hex.Dec(string(v.Value()))
				if chk.E(e) {
					continue
				}
				f.follows = append(f.follows, pk)
			}
		}
	}
	if f.updated == nil {
		// Buffered with capacity one so a pending refresh can be recorded
		// even when the syncer is not currently receiving.
		f.updated = make(chan struct{}, 1)
		return
	}
	// followsMx is still held here and startSubscriptions needs its read
	// lock, so a blocking send could deadlock against the syncer. If the
	// send cannot proceed right now, it is dropped.
	select {
	case f.updated <- struct{}{}:
	default:
	}
	return
}
// GetAccessLevel reports the access level granted to pub: "admin" when pub is
// one of the configured admins, "write" when it is in the follows list, and
// "read" otherwise. When no config has been set it returns "write" for every
// key. The address argument is not consulted.
func (f *Follows) GetAccessLevel(pub []byte, address string) (level string) {
	if f.cfg == nil {
		return "write"
	}
	f.followsMx.RLock()
	defer f.followsMx.RUnlock()
	// listed reports whether pub occurs in keys.
	listed := func(keys [][]byte) bool {
		for i := range keys {
			if utils.FastEqual(keys[i], pub) {
				return true
			}
		}
		return false
	}
	switch {
	case listed(f.admins):
		level = "admin"
	case listed(f.follows):
		level = "write"
	default:
		level = "read"
	}
	return level
}
// GetACLInfo returns the name of this ACL mode, a one-line description, and a
// longer documentation string.
func (f *Follows) GetACLInfo() (name, description, documentation string) {
	name = "follows"
	description = "whitelist follows of admins"
	// Write access goes to the pubkeys listed in the admins' follow lists,
	// i.e. the people the admins follow, not the people who follow them.
	documentation = `This ACL mode searches for follow lists of admins and grants everyone they follow write access`
	return name, description, documentation
}
// Type returns the identifier of this ACL mode, "follows".
func (f *Follows) Type() string {
	const mode = "follows"
	return mode
}
// adminRelays returns the de-duplicated, normalized relay URLs to sync from.
//
// It first collects the "r" tag values of the relay list metadata events
// stored in the database for each admin. If that yields nothing, it falls
// back to the bootstrap relays from the config. URLs that normalize to an
// empty string are dropped.
func (f *Follows) adminRelays() (urls []string) {
	// Work on a snapshot so the lock is not held during database access.
	f.followsMx.RLock()
	admins := make([][]byte, len(f.admins))
	copy(admins, f.admins)
	f.followsMx.RUnlock()

	seen := make(map[string]struct{})
	// remember appends an already-normalized URL unless it was added before.
	remember := func(n string) {
		if _, dup := seen[n]; dup {
			return
		}
		seen[n] = struct{}{}
		urls = append(urls, n)
	}

	for _, adm := range admins {
		fl := &filter.F{
			Authors: tag.NewFromAny(adm),
			Kinds:   kind.NewS(kind.New(kind.RelayListMetadata.K)),
		}
		idxs, err := database.GetIndexesFromFilter(fl)
		if chk.E(err) {
			continue
		}
		var sers types.Uint40s
		for _, idx := range idxs {
			if s, err := f.D.GetSerialsByRange(idx); !chk.E(err) {
				sers = append(sers, s...)
			}
		}
		for _, ser := range sers {
			ev, err := f.D.FetchEventBySerial(ser)
			if chk.E(err) || ev == nil {
				continue
			}
			for _, rt := range ev.Tags.GetAll([]byte("r")) {
				raw := string(rt.Value())
				if n := string(normalize.URL(raw)); n != "" {
					remember(n)
				}
			}
		}
	}
	if len(urls) > 0 {
		return
	}

	// Nothing usable in the database: fall back to the bootstrap relays.
	log.I.F("no admin relays found in DB, checking bootstrap relays")
	if len(f.cfg.BootstrapRelays) == 0 {
		log.W.F("no bootstrap relays configured")
		return
	}
	log.I.F("using bootstrap relays: %v", f.cfg.BootstrapRelays)
	for _, relay := range f.cfg.BootstrapRelays {
		n := string(normalize.URL(relay))
		if n == "" {
			log.W.F("invalid bootstrap relay URL: %s", relay)
			continue
		}
		remember(n)
	}
	return
}
// startSubscriptions launches one goroutine per relay returned by adminRelays.
// Each goroutine keeps a websocket subscription open for relay list metadata
// events authored by the admins and follows, until ctx is cancelled.
//
// Each received event is signature-checked and saved to the database. Events
// that were newly saved are handed to the publisher when one is configured.
//
// Every reconnect, whether after a failed dial, a failed write or a dropped
// connection, waits for an exponentially growing delay (1s up to 30s). The
// delay resets once a message has been read successfully. A dial rejected
// with HTTP 403 ends the goroutine for that relay.
//
// The function returns immediately after starting the goroutines. It starts
// none when there are no authors or no relay URLs.
func (f *Follows) startSubscriptions(ctx context.Context) {
	// build authors list: admins + follows
	f.followsMx.RLock()
	authors := make([][]byte, 0, len(f.admins)+len(f.follows))
	authors = append(authors, f.admins...)
	authors = append(authors, f.follows...)
	f.followsMx.RUnlock()
	if len(authors) == 0 {
		log.W.F("follows syncer: no authors (admins+follows) to subscribe to")
		return
	}
	urls := f.adminRelays()
	log.I.S(urls)
	if len(urls) == 0 {
		log.W.F("follows syncer: no admin relays found in DB (kind 10002) and no bootstrap relays configured")
		return
	}
	log.T.F(
		"follows syncer: subscribing to %d relays for %d authors", len(urls),
		len(authors),
	)
	const (
		minBackoff = time.Second
		maxBackoff = 30 * time.Second
	)
	for _, u := range urls {
		u := u
		go func() {
			backoff := minBackoff
			// pause waits for the current backoff and then doubles it, up to
			// maxBackoff. It reports false when ctx was cancelled first.
			pause := func() bool {
				timer := time.NewTimer(backoff)
				defer timer.Stop()
				select {
				case <-ctx.Done():
					return false
				case <-timer.C:
				}
				backoff *= 2
				if backoff > maxBackoff {
					backoff = maxBackoff
				}
				return true
			}
			for {
				if ctx.Err() != nil {
					return
				}
				c, _, err := websocket.Dial(ctx, u, nil)
				if err != nil {
					log.W.F("follows syncer: dial %s failed: %v", u, err)
					if strings.Contains(
						err.Error(), "response status code 101 but got 403",
					) {
						// 403 Forbidden usually means our IP or user is
						// blocked by the relay, so stop trying this one.
						return
					}
					if !pause() {
						return
					}
					continue
				}
				// send REQ
				ff := &filter.S{}
				f1 := &filter.F{
					Authors: tag.NewFromBytesSlice(authors...),
					Kinds:   kind.NewS(kind.New(kind.RelayListMetadata.K)),
					Limit:   values.ToUintPointer(0),
				}
				*ff = append(*ff, f1)
				req := reqenvelope.NewFrom([]byte("follows-sync"), ff)
				if err = c.Write(
					ctx, websocket.MessageText, req.Marshal(nil),
				); chk.E(err) {
					_ = c.Close(websocket.StatusInternalError, "write failed")
					// back off so a relay that keeps failing is not hammered
					if !pause() {
						return
					}
					continue
				}
				log.T.F("sent REQ to %s for follows subscription", u)
				// read loop
				for {
					if ctx.Err() != nil {
						_ = c.Close(websocket.StatusNormalClosure, "ctx done")
						return
					}
					_, data, err := c.Read(ctx)
					if err != nil {
						_ = c.Close(websocket.StatusNormalClosure, "read err")
						break
					}
					// The connection delivered data, so the next reconnect
					// starts from the shortest delay again.
					backoff = minBackoff
					label, rem, err := envelopes.Identify(data)
					if chk.E(err) {
						continue
					}
					switch label {
					case eventenvelope.L:
						res, _, err := eventenvelope.ParseResult(rem)
						if chk.E(err) || res == nil || res.Event == nil {
							continue
						}
						// verify signature before saving
						if ok, err := res.Event.Verify(); chk.T(err) || !ok {
							continue
						}
						if _, _, err := f.D.SaveEvent(
							ctx, res.Event,
						); err != nil {
							// Errors prefixed "blocked:" are not logged;
							// any other save failure is logged, and the
							// event is not dispatched either way.
							if !strings.HasPrefix(
								err.Error(), "blocked:",
							) {
								log.W.F(
									"follows syncer: save event failed: %v",
									err,
								)
							}
						} else if f.pubs != nil {
							// only dispatch events that were newly saved
							go f.pubs.Deliver(res.Event)
						}
					case eoseenvelope.L:
						// ignore, continue subscription
					default:
						// ignore other labels
					}
				}
				// The connection dropped: wait before reconnecting so a
				// relay that accepts and immediately closes is not
				// redialled in a tight loop.
				if !pause() {
					return
				}
			}
		}()
	}
}
// Syncer starts the background goroutine that maintains the remote relay
// subscriptions and triggers the first round of subscriptions.
//
// Each signal on the update channel cancels the current subscriptions, if
// any, and opens new ones through startSubscriptions. The goroutine exits,
// cancelling any open subscriptions, when f.Ctx is done.
//
// If Configure has not yet created the update channel, Syncer creates it, so
// the initial signal cannot block forever on a nil channel.
func (f *Follows) Syncer() {
	log.I.F("starting follows syncer")
	// followsMx is deliberately not taken here: Configure signals on this
	// channel while holding it.
	if f.updated == nil {
		f.updated = make(chan struct{}, 1)
	}
	updated := f.updated
	go func() {
		for {
			select {
			case <-f.Ctx.Done():
				if f.subsCancel != nil {
					f.subsCancel()
				}
				return
			case <-updated:
				// close and reopen subscriptions to users on the follow
				// list and admins
				if f.subsCancel != nil {
					log.I.F("follows syncer: cancelling existing subscriptions")
					f.subsCancel()
				}
				ctx, cancel := context.WithCancel(f.Ctx)
				f.subsCancel = cancel
				log.I.F("follows syncer: (re)opening subscriptions")
				f.startSubscriptions(ctx)
			}
		}
	}()
	// Kick off the first round of subscriptions, but do not wait on the
	// send if the context is cancelled first.
	select {
	case updated <- struct{}{}:
	case <-f.Ctx.Done():
	}
}
// GetFollowedPubkeys returns a snapshot of the followed pubkeys. The returned
// outer slice is freshly allocated; the individual key slices are shared with
// the internal list.
func (f *Follows) GetFollowedPubkeys() [][]byte {
	f.followsMx.RLock()
	snapshot := make([][]byte, len(f.follows))
	copy(snapshot, f.follows)
	f.followsMx.RUnlock()
	return snapshot
}
// AddFollow records pub in the in-memory follows list unless it is empty or
// already present, then asks the syncer to refresh its subscriptions. The
// refresh request is best-effort: it is dropped when the update channel does
// not exist yet or cannot accept the signal immediately.
func (f *Follows) AddFollow(pub []byte) {
	if len(pub) == 0 {
		return
	}
	f.followsMx.Lock()
	defer f.followsMx.Unlock()
	for i := range f.follows {
		if bytes.Equal(f.follows[i], pub) {
			return
		}
	}
	// Store a private copy so later changes to the caller's slice do not
	// affect the list.
	own := append(make([]byte, 0, len(pub)), pub...)
	f.follows = append(f.follows, own)
	if f.updated == nil {
		// syncer not initialized yet; nothing to notify
		return
	}
	select {
	case f.updated <- struct{}{}:
	default:
		// nobody is ready to receive the signal; skip it
	}
}
// init registers the follows ACL with the package registry.
func init() {
	log.T.F("registering follows ACL")
	Registry.Register(&Follows{})
}