Add admin relay handling and real-time subscription syncing in follows implementation
This commit is contained in:
@@ -3,31 +3,41 @@ package acl
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
database "database.orly"
|
||||
"database.orly/indexes/types"
|
||||
"encoders.orly/bech32encoding"
|
||||
"encoders.orly/envelopes"
|
||||
"encoders.orly/envelopes/eoseenvelope"
|
||||
"encoders.orly/envelopes/eventenvelope"
|
||||
"encoders.orly/envelopes/reqenvelope"
|
||||
"encoders.orly/event"
|
||||
"encoders.orly/filter"
|
||||
"encoders.orly/hex"
|
||||
"encoders.orly/kind"
|
||||
"encoders.orly/tag"
|
||||
"github.com/coder/websocket"
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/errorf"
|
||||
"lol.mleku.dev/log"
|
||||
"next.orly.dev/app/config"
|
||||
utils "utils.orly"
|
||||
"utils.orly/normalize"
|
||||
"utils.orly/values"
|
||||
)
|
||||
|
||||
type Follows struct {
|
||||
Ctx context.Context
|
||||
cfg *config.C
|
||||
*database.D
|
||||
followsMx sync.RWMutex
|
||||
admins [][]byte
|
||||
follows [][]byte
|
||||
updated chan struct{}
|
||||
followsMx sync.RWMutex
|
||||
admins [][]byte
|
||||
follows [][]byte
|
||||
updated chan struct{}
|
||||
subsCancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (f *Follows) Configure(cfg ...any) (err error) {
|
||||
@@ -132,17 +142,194 @@ func (f *Follows) GetACLInfo() (name, description, documentation string) {
|
||||
|
||||
func (f *Follows) Type() string { return "follows" }
|
||||
|
||||
func (f *Follows) adminRelays() (urls []string) {
|
||||
f.followsMx.RLock()
|
||||
admins := make([][]byte, len(f.admins))
|
||||
copy(admins, f.admins)
|
||||
f.followsMx.RUnlock()
|
||||
seen := make(map[string]struct{})
|
||||
for _, adm := range admins {
|
||||
fl := &filter.F{
|
||||
Authors: tag.NewFromAny(adm),
|
||||
Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)),
|
||||
}
|
||||
idxs, err := database.GetIndexesFromFilter(fl)
|
||||
if chk.E(err) {
|
||||
continue
|
||||
}
|
||||
var sers types.Uint40s
|
||||
for _, idx := range idxs {
|
||||
s, err := f.D.GetSerialsByRange(idx)
|
||||
if chk.E(err) {
|
||||
continue
|
||||
}
|
||||
sers = append(sers, s...)
|
||||
}
|
||||
for _, s := range sers {
|
||||
ev, err := f.D.FetchEventBySerial(s)
|
||||
if chk.E(err) || ev == nil {
|
||||
continue
|
||||
}
|
||||
for _, v := range ev.Tags.GetAll([]byte("r")) {
|
||||
u := string(v.Value())
|
||||
n := string(normalize.URL(u))
|
||||
if n == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[n]; ok {
|
||||
continue
|
||||
}
|
||||
seen[n] = struct{}{}
|
||||
urls = append(urls, n)
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (f *Follows) startSubscriptions(ctx context.Context) {
|
||||
// build authors list: admins + follows
|
||||
f.followsMx.RLock()
|
||||
authors := make([][]byte, 0, len(f.admins)+len(f.follows))
|
||||
authors = append(authors, f.admins...)
|
||||
authors = append(authors, f.follows...)
|
||||
f.followsMx.RUnlock()
|
||||
if len(authors) == 0 {
|
||||
log.W.F("follows syncer: no authors (admins+follows) to subscribe to")
|
||||
return
|
||||
}
|
||||
urls := f.adminRelays()
|
||||
if len(urls) == 0 {
|
||||
log.W.F("follows syncer: no admin relays found in DB (kind 10002)")
|
||||
return
|
||||
}
|
||||
log.I.F(
|
||||
"follows syncer: subscribing to %d relays for %d authors", len(urls),
|
||||
len(authors),
|
||||
)
|
||||
for _, u := range urls {
|
||||
u := u
|
||||
go func() {
|
||||
backoff := time.Second
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
c, _, err := websocket.Dial(ctx, u, nil)
|
||||
if err != nil {
|
||||
log.W.F("follows syncer: dial %s failed: %v", u, err)
|
||||
timer := time.NewTimer(backoff)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-timer.C:
|
||||
}
|
||||
if backoff < 30*time.Second {
|
||||
backoff *= 2
|
||||
}
|
||||
continue
|
||||
}
|
||||
backoff = time.Second
|
||||
// send REQ
|
||||
ff := &filter.S{}
|
||||
f1 := &filter.F{
|
||||
Authors: tag.NewFromBytesSlice(authors...),
|
||||
Limit: values.ToUintPointer(0),
|
||||
}
|
||||
*ff = append(*ff, f1)
|
||||
req := reqenvelope.NewFrom([]byte("follows-sync"), ff)
|
||||
if err := c.Write(
|
||||
ctx, websocket.MessageText, req.Marshal(nil),
|
||||
); chk.E(err) {
|
||||
_ = c.Close(websocket.StatusInternalError, "write failed")
|
||||
continue
|
||||
}
|
||||
log.I.F("sent REQ to %s for follows subscription", u)
|
||||
// read loop
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
_ = c.Close(websocket.StatusNormalClosure, "ctx done")
|
||||
return
|
||||
default:
|
||||
}
|
||||
_, data, err := c.Read(ctx)
|
||||
if err != nil {
|
||||
_ = c.Close(websocket.StatusNormalClosure, "read err")
|
||||
break
|
||||
}
|
||||
label, rem, err := envelopes.Identify(data)
|
||||
if chk.E(err) {
|
||||
continue
|
||||
}
|
||||
switch label {
|
||||
case eventenvelope.L:
|
||||
res, _, err := eventenvelope.ParseResult(rem)
|
||||
if chk.E(err) || res == nil || res.Event == nil {
|
||||
continue
|
||||
}
|
||||
// verify signature before saving
|
||||
if ok, err := res.Event.Verify(); chk.T(err) || !ok {
|
||||
continue
|
||||
}
|
||||
if _, _, err := f.D.SaveEvent(
|
||||
ctx, res.Event,
|
||||
); err != nil {
|
||||
if !strings.HasPrefix(
|
||||
err.Error(), "event already exists",
|
||||
) {
|
||||
log.W.F(
|
||||
"follows syncer: save event failed: %v",
|
||||
err,
|
||||
)
|
||||
}
|
||||
// ignore duplicates and continue
|
||||
}
|
||||
log.I.F(
|
||||
"saved new event from follows syncer: %0x",
|
||||
res.Event.ID,
|
||||
)
|
||||
case eoseenvelope.L:
|
||||
// ignore, continue subscription
|
||||
default:
|
||||
// ignore other labels
|
||||
}
|
||||
}
|
||||
// loop reconnect
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (f *Follows) Syncer() {
|
||||
log.I.F("starting follows syncer")
|
||||
go func() {
|
||||
// start immediately if Configure already ran
|
||||
for {
|
||||
var innerCancel context.CancelFunc
|
||||
select {
|
||||
case <-f.Ctx.Done():
|
||||
if f.subsCancel != nil {
|
||||
f.subsCancel()
|
||||
}
|
||||
return
|
||||
case <-f.updated:
|
||||
// close and reopen subscriptions to users on the follow list and
|
||||
// admins
|
||||
log.I.F("reopening subscriptions")
|
||||
// close and reopen subscriptions to users on the follow list and admins
|
||||
if f.subsCancel != nil {
|
||||
log.I.F("follows syncer: cancelling existing subscriptions")
|
||||
f.subsCancel()
|
||||
}
|
||||
ctx, cancel := context.WithCancel(f.Ctx)
|
||||
f.subsCancel = cancel
|
||||
innerCancel = cancel
|
||||
log.I.F("follows syncer: (re)opening subscriptions")
|
||||
f.startSubscriptions(ctx)
|
||||
}
|
||||
// small sleep to avoid tight loop if updated fires rapidly
|
||||
if innerCancel == nil {
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
Reference in New Issue
Block a user