diff --git a/app/config/config.go b/app/config/config.go index 7e46e91..23a2194 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -23,23 +23,25 @@ import ( // and default values. It defines parameters for app behaviour, storage // locations, logging, and network settings used across the relay service. type C struct { - AppName string `env:"ORLY_APP_NAME" usage:"set a name to display on information about the relay" default:"ORLY"` - DataDir string `env:"ORLY_DATA_DIR" usage:"storage location for the event store" default:"~/.local/share/ORLY"` - Listen string `env:"ORLY_LISTEN" default:"0.0.0.0" usage:"network listen address"` - Port int `env:"ORLY_PORT" default:"3334" usage:"port to listen on"` - HealthPort int `env:"ORLY_HEALTH_PORT" default:"0" usage:"optional health check HTTP port; 0 disables"` - EnableShutdown bool `env:"ORLY_ENABLE_SHUTDOWN" default:"false" usage:"if true, expose /shutdown on the health port to gracefully stop the process (for profiling)"` - LogLevel string `env:"ORLY_LOG_LEVEL" default:"info" usage:"relay log level: fatal error warn info debug trace"` - DBLogLevel string `env:"ORLY_DB_LOG_LEVEL" default:"info" usage:"database log level: fatal error warn info debug trace"` - LogToStdout bool `env:"ORLY_LOG_TO_STDOUT" default:"false" usage:"log to stdout instead of stderr"` - Pprof string `env:"ORLY_PPROF" usage:"enable pprof in modes: cpu,memory,allocation,heap,block,goroutine,threadcreate,mutex"` - PprofPath string `env:"ORLY_PPROF_PATH" usage:"optional directory to write pprof profiles into (inside container); default is temporary dir"` - PprofHTTP bool `env:"ORLY_PPROF_HTTP" default:"false" usage:"if true, expose net/http/pprof on port 6060"` - OpenPprofWeb bool `env:"ORLY_OPEN_PPROF_WEB" default:"false" usage:"if true, automatically open the pprof web viewer when profiling is enabled"` - IPWhitelist []string `env:"ORLY_IP_WHITELIST" usage:"comma-separated list of IP addresses to allow access from, matches on prefixes to allow 
private subnets, eg 10.0.0 = 10.0.0.0/8"` - Admins []string `env:"ORLY_ADMINS" usage:"comma-separated list of admin npubs"` - Owners []string `env:"ORLY_OWNERS" usage:"comma-separated list of owner npubs, who have full control of the relay for wipe and restart and other functions"` - ACLMode string `env:"ORLY_ACL_MODE" usage:"ACL mode: follows,none" default:"none"` + AppName string `env:"ORLY_APP_NAME" usage:"set a name to display on information about the relay" default:"ORLY"` + DataDir string `env:"ORLY_DATA_DIR" usage:"storage location for the event store" default:"~/.local/share/ORLY"` + Listen string `env:"ORLY_LISTEN" default:"0.0.0.0" usage:"network listen address"` + Port int `env:"ORLY_PORT" default:"3334" usage:"port to listen on"` + HealthPort int `env:"ORLY_HEALTH_PORT" default:"0" usage:"optional health check HTTP port; 0 disables"` + EnableShutdown bool `env:"ORLY_ENABLE_SHUTDOWN" default:"false" usage:"if true, expose /shutdown on the health port to gracefully stop the process (for profiling)"` + LogLevel string `env:"ORLY_LOG_LEVEL" default:"info" usage:"relay log level: fatal error warn info debug trace"` + DBLogLevel string `env:"ORLY_DB_LOG_LEVEL" default:"info" usage:"database log level: fatal error warn info debug trace"` + LogToStdout bool `env:"ORLY_LOG_TO_STDOUT" default:"false" usage:"log to stdout instead of stderr"` + Pprof string `env:"ORLY_PPROF" usage:"enable pprof in modes: cpu,memory,allocation,heap,block,goroutine,threadcreate,mutex"` + PprofPath string `env:"ORLY_PPROF_PATH" usage:"optional directory to write pprof profiles into (inside container); default is temporary dir"` + PprofHTTP bool `env:"ORLY_PPROF_HTTP" default:"false" usage:"if true, expose net/http/pprof on port 6060"` + OpenPprofWeb bool `env:"ORLY_OPEN_PPROF_WEB" default:"false" usage:"if true, automatically open the pprof web viewer when profiling is enabled"` + IPWhitelist []string `env:"ORLY_IP_WHITELIST" usage:"comma-separated list of IP addresses to allow access 
from, matches on prefixes to allow private subnets, eg 10.0.0 = 10.0.0.0/8"` + Admins []string `env:"ORLY_ADMINS" usage:"comma-separated list of admin npubs"` + Owners []string `env:"ORLY_OWNERS" usage:"comma-separated list of owner npubs, who have full control of the relay for wipe and restart and other functions"` + ACLMode string `env:"ORLY_ACL_MODE" usage:"ACL mode: follows,none" default:"none"` + SpiderMode string `env:"ORLY_SPIDER_MODE" usage:"spider mode: none,follows" default:"none"` + SpiderFrequency time.Duration `env:"ORLY_SPIDER_FREQUENCY" usage:"spider scan frequency as a duration, e.g. 1h or 30m" default:"1h"` } // New creates and initializes a new configuration object for the relay diff --git a/app/handle-event.go b/app/handle-event.go index 982600b..f6b5275 100644 --- a/app/handle-event.go +++ b/app/handle-event.go @@ -103,6 +103,20 @@ func (l *Listener) HandleEvent(msg []byte) (err error) { // user has write access or better, continue // log.D.F("user has %s access", accessLevel) } + // check for protected tag (NIP-70) + protectedTag := env.E.Tags.GetFirst([]byte("-")) + if protectedTag != nil && acl.Registry.Active.Load() != "none" { + // check that the pubkey of the event matches the authed pubkey + if !utils.FastEqual(l.authedPubkey.Load(), env.E.Pubkey) { + if err = Ok.Blocked( + l, env, + "protected tag may only be published by user authed to the same pubkey", + ); chk.E(err) { + return + } + return + } + } // if the event is a delete, process the delete if env.E.Kind == kind.EventDeletion.K { if err = l.HandleDelete(env); err != nil { diff --git a/app/handle-message.go b/app/handle-message.go index 99b0c64..aecfec9 100644 --- a/app/handle-message.go +++ b/app/handle-message.go @@ -1,9 +1,9 @@ package app import ( + "fmt" + "lol.mleku.dev/chk" - "lol.mleku.dev/errorf" - "lol.mleku.dev/log" "next.orly.dev/pkg/encoders/envelopes" "next.orly.dev/pkg/encoders/envelopes/authenvelope" "next.orly.dev/pkg/encoders/envelopes/closeenvelope" @@ -13,7 +13,7 @@ import ( ) func (l 
*Listener) HandleMessage(msg []byte, remote string) { - log.D.F("%s received message:\n%s", remote, msg) + // log.D.F("%s received message:\n%s", remote, msg) var err error var t string var rem []byte @@ -32,7 +32,7 @@ func (l *Listener) HandleMessage(msg []byte, remote string) { // log.D.F("authenvelope: %s %s", remote, rem) err = l.HandleAuth(rem) default: - err = errorf.E("unknown envelope type %s\n%s", t, rem) + err = fmt.Errorf("unknown envelope type %s\n%s", t, rem) } } if err != nil { @@ -43,7 +43,7 @@ func (l *Listener) HandleMessage(msg []byte, remote string) { // ) // }, // ) - if err = noticeenvelope.NewFrom(err.Error()).Write(l); chk.E(err) { + if err = noticeenvelope.NewFrom(err.Error()).Write(l); err != nil { return } } diff --git a/app/handle-req.go b/app/handle-req.go index d097c45..3c86065 100644 --- a/app/handle-req.go +++ b/app/handle-req.go @@ -9,7 +9,7 @@ import ( "github.com/dgraph-io/badger/v4" "lol.mleku.dev/chk" "lol.mleku.dev/log" - acl "next.orly.dev/pkg/acl" + "next.orly.dev/pkg/acl" "next.orly.dev/pkg/encoders/envelopes/authenvelope" "next.orly.dev/pkg/encoders/envelopes/closedenvelope" "next.orly.dev/pkg/encoders/envelopes/eoseenvelope" @@ -22,21 +22,21 @@ import ( "next.orly.dev/pkg/encoders/kind" "next.orly.dev/pkg/encoders/reason" "next.orly.dev/pkg/encoders/tag" - utils "next.orly.dev/pkg/utils" + "next.orly.dev/pkg/utils" "next.orly.dev/pkg/utils/normalize" "next.orly.dev/pkg/utils/pointers" ) func (l *Listener) HandleReq(msg []byte) (err error) { - log.T.F("HandleReq: START processing from %s\n%s\n", l.remote, msg) - var rem []byte + // log.T.F("HandleReq: START processing from %s\n%s\n", l.remote, msg) + // var rem []byte env := reqenvelope.New() - if rem, err = env.Unmarshal(msg); chk.E(err) { + if _, err = env.Unmarshal(msg); chk.E(err) { return normalize.Error.Errorf(err.Error()) } - if len(rem) > 0 { - log.I.F("REQ extra bytes: '%s'", rem) - } + // if len(rem) > 0 { + // log.I.F("REQ extra bytes: '%s'", rem) + // } // send a 
challenge to the client to auth if an ACL is active if acl.Registry.Active.Load() != "none" { if err = authenvelope.NewChallengeWith(l.challenge.Load()). @@ -57,59 +57,59 @@ func (l *Listener) HandleReq(msg []byte) (err error) { return default: // user has read access or better, continue - log.D.F("user has %s access", accessLevel) + // log.D.F("user has %s access", accessLevel) } var events event.S for _, f := range *env.Filters { - idsLen := 0 - kindsLen := 0 - authorsLen := 0 - tagsLen := 0 - if f != nil { - if f.Ids != nil { - idsLen = f.Ids.Len() - } - if f.Kinds != nil { - kindsLen = f.Kinds.Len() - } - if f.Authors != nil { - authorsLen = f.Authors.Len() - } - if f.Tags != nil { - tagsLen = f.Tags.Len() - } - } - log.T.F( - "REQ %s: filter summary ids=%d kinds=%d authors=%d tags=%d", - env.Subscription, idsLen, kindsLen, authorsLen, tagsLen, - ) + // idsLen := 0 + // kindsLen := 0 + // authorsLen := 0 + // tagsLen := 0 + // if f != nil { + // if f.Ids != nil { + // idsLen = f.Ids.Len() + // } + // if f.Kinds != nil { + // kindsLen = f.Kinds.Len() + // } + // if f.Authors != nil { + // authorsLen = f.Authors.Len() + // } + // if f.Tags != nil { + // tagsLen = f.Tags.Len() + // } + // } + // log.T.F( + // "REQ %s: filter summary ids=%d kinds=%d authors=%d tags=%d", + // env.Subscription, idsLen, kindsLen, authorsLen, tagsLen, + // ) if f != nil && f.Authors != nil && f.Authors.Len() > 0 { var authors []string for _, a := range f.Authors.T { authors = append(authors, hex.Enc(a)) } - log.T.F("REQ %s: authors=%v", env.Subscription, authors) + // log.T.F("REQ %s: authors=%v", env.Subscription, authors) } - if f != nil && f.Kinds != nil && f.Kinds.Len() > 0 { - log.T.F("REQ %s: kinds=%v", env.Subscription, f.Kinds.ToUint16()) - } - if f != nil && f.Ids != nil && f.Ids.Len() > 0 { - var ids []string - for _, id := range f.Ids.T { - ids = append(ids, hex.Enc(id)) - } - var lim any - if pointers.Present(f.Limit) { - lim = *f.Limit - } else { - lim = nil - } - log.T.F( 
- "REQ %s: ids filter count=%d ids=%v limit=%v", env.Subscription, - f.Ids.Len(), ids, lim, - ) - } - if pointers.Present(f.Limit) { + // if f != nil && f.Kinds != nil && f.Kinds.Len() > 0 { + // log.T.F("REQ %s: kinds=%v", env.Subscription, f.Kinds.ToUint16()) + // } + // if f != nil && f.Ids != nil && f.Ids.Len() > 0 { + // var ids []string + // for _, id := range f.Ids.T { + // ids = append(ids, hex.Enc(id)) + // } + // // var lim any + // // if pointers.Present(f.Limit) { + // // lim = *f.Limit + // // } else { + // // lim = nil + // // } + // // log.T.F( + // // "REQ %s: ids filter count=%d ids=%v limit=%v", env.Subscription, + // // f.Ids.Len(), ids, lim, + // // ) + // } + if f != nil && pointers.Present(f.Limit) { if *f.Limit == 0 { continue } @@ -119,15 +119,15 @@ func (l *Listener) HandleReq(msg []byte) (err error) { context.Background(), 30*time.Second, ) defer cancel() - log.T.F( - "HandleReq: About to QueryEvents for %s, main context done: %v", - l.remote, l.ctx.Err() != nil, - ) + // log.T.F( + // "HandleReq: About to QueryEvents for %s, main context done: %v", + // l.remote, l.ctx.Err() != nil, + // ) if events, err = l.QueryEvents(queryCtx, f); chk.E(err) { if errors.Is(err, badger.ErrDBClosed) { return } - log.T.F("HandleReq: QueryEvents error for %s: %v", l.remote, err) + // log.T.F("HandleReq: QueryEvents error for %s: %v", l.remote, err) err = nil } defer func() { @@ -135,23 +135,23 @@ func (l *Listener) HandleReq(msg []byte) (err error) { ev.Free() } }() - log.T.F( - "HandleReq: QueryEvents completed for %s, found %d events", - l.remote, len(events), - ) + // log.T.F( + // "HandleReq: QueryEvents completed for %s, found %d events", + // l.remote, len(events), + // ) } var tmp event.S privCheck: for _, ev := range events { if kind.IsPrivileged(ev.Kind) && accessLevel != "admin" { // admins can see all events - log.T.C( - func() string { - return fmt.Sprintf( - "checking privileged event %0x", ev.ID, - ) - }, - ) + // log.T.C( + // func() string 
{ + // return fmt.Sprintf( + // "checking privileged event %0x", ev.ID, + // ) + // }, + // ) pk := l.authedPubkey.Load() if pk == nil { continue @@ -175,26 +175,26 @@ privCheck: continue } if utils.FastEqual(pt, pk) { - log.T.C( - func() string { - return fmt.Sprintf( - "privileged event %s is for logged in pubkey %0x", - ev.ID, pk, - ) - }, - ) + // log.T.C( + // func() string { + // return fmt.Sprintf( + // "privileged event %s is for logged in pubkey %0x", + // ev.ID, pk, + // ) + // }, + // ) tmp = append(tmp, ev) continue privCheck } } - log.T.C( - func() string { - return fmt.Sprintf( - "privileged event %s does not contain the logged in pubkey %0x", - ev.ID, pk, - ) - }, - ) + // log.T.C( + // func() string { + // return fmt.Sprintf( + // "privileged event %s does not contain the logged in pubkey %0x", + // ev.ID, pk, + // ) + // }, + // ) } else { tmp = append(tmp, ev) } @@ -202,19 +202,19 @@ privCheck: events = tmp seen := make(map[string]struct{}) for _, ev := range events { - log.D.C( - func() string { - return fmt.Sprintf( - "REQ %s: sending EVENT id=%s kind=%d", env.Subscription, - hex.Enc(ev.ID), ev.Kind, - ) - }, - ) - log.T.C( - func() string { - return fmt.Sprintf("event:\n%s\n", ev.Serialize()) - }, - ) + // log.D.C( + // func() string { + // return fmt.Sprintf( + // "REQ %s: sending EVENT id=%s kind=%d", env.Subscription, + // hex.Enc(ev.ID), ev.Kind, + // ) + // }, + // ) + // log.T.C( + // func() string { + // return fmt.Sprintf("event:\n%s\n", ev.Serialize()) + // }, + // ) var res *eventenvelope.Result if res, err = eventenvelope.NewResultWith( env.Subscription, ev, @@ -229,7 +229,7 @@ privCheck: } // write the EOSE to signal to the client that all events found have been // sent. - log.T.F("sending EOSE to %s", l.remote) + // log.T.F("sending EOSE to %s", l.remote) if err = eoseenvelope.NewFrom(env.Subscription). 
Write(l); chk.E(err) { return @@ -237,10 +237,10 @@ privCheck: // if the query was for just Ids, we know there can't be any more results, // so cancel the subscription. cancel := true - log.T.F( - "REQ %s: computing cancel/subscription; events_sent=%d", - env.Subscription, len(events), - ) + // log.T.F( + // "REQ %s: computing cancel/subscription; events_sent=%d", + // env.Subscription, len(events), + // ) var subbedFilters filter.S for _, f := range *env.Filters { if f.Ids.Len() < 1 { @@ -255,10 +255,10 @@ privCheck: } notFounds = append(notFounds, id) } - log.T.F( - "REQ %s: ids outstanding=%d of %d", env.Subscription, - len(notFounds), f.Ids.Len(), - ) + // log.T.F( + // "REQ %s: ids outstanding=%d of %d", env.Subscription, + // len(notFounds), f.Ids.Len(), + // ) // if all were found, don't add to subbedFilters if len(notFounds) == 0 { continue @@ -295,6 +295,6 @@ privCheck: return } } - log.T.F("HandleReq: COMPLETED processing from %s", l.remote) + // log.T.F("HandleReq: COMPLETED processing from %s", l.remote) return } diff --git a/app/handle-websocket.go b/app/handle-websocket.go index 8add693..282204e 100644 --- a/app/handle-websocket.go +++ b/app/handle-websocket.go @@ -97,7 +97,7 @@ whitelist: } var typ websocket.MessageType var msg []byte - log.T.F("waiting for message from %s", remote) + // log.T.F("waiting for message from %s", remote) // Create a read context with timeout to prevent indefinite blocking readCtx, readCancel := context.WithTimeout(ctx, DefaultReadTimeout) @@ -152,7 +152,7 @@ whitelist: writeCancel() continue } - log.T.F("received message from %s: %s", remote, string(msg)) + // log.T.F("received message from %s: %s", remote, string(msg)) go listener.HandleMessage(msg, remote) } } diff --git a/main.go b/main.go index 17a2f78..9308748 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,7 @@ import ( "next.orly.dev/app/config" "next.orly.dev/pkg/acl" "next.orly.dev/pkg/database" + "next.orly.dev/pkg/spider" "next.orly.dev/pkg/version" ) @@ 
-158,6 +159,12 @@ func main() { } acl.Registry.Syncer() + // Initialize and start spider functionality if enabled + spiderCtx, spiderCancel := context.WithCancel(ctx) + spiderInstance := spider.New(db, cfg, spiderCtx, spiderCancel) + spiderInstance.Start() + defer spiderInstance.Stop() + // Start HTTP pprof server if enabled if cfg.PprofHTTP { pprofAddr := fmt.Sprintf("%s:%d", cfg.Listen, 6060) diff --git a/pkg/acl/follows.go b/pkg/acl/follows.go index dc01e12..2774c4e 100644 --- a/pkg/acl/follows.go +++ b/pkg/acl/follows.go @@ -360,6 +360,16 @@ func (f *Follows) Syncer() { f.updated <- struct{}{} } +// GetFollowedPubkeys returns a copy of the followed pubkeys list +func (f *Follows) GetFollowedPubkeys() [][]byte { + f.followsMx.RLock() + defer f.followsMx.RUnlock() + + followedPubkeys := make([][]byte, len(f.follows)) + copy(followedPubkeys, f.follows) + return followedPubkeys +} + func init() { log.T.F("registering follows ACL") Registry.Register(new(Follows)) diff --git a/pkg/database/markers.go b/pkg/database/markers.go new file mode 100644 index 0000000..2251853 --- /dev/null +++ b/pkg/database/markers.go @@ -0,0 +1,62 @@ +package database + +import ( + "github.com/dgraph-io/badger/v4" + "lol.mleku.dev/chk" +) + +const ( + markerPrefix = "MARKER:" +) + +// SetMarker stores an arbitrary marker in the database +func (d *D) SetMarker(key string, value []byte) (err error) { + markerKey := []byte(markerPrefix + key) + + err = d.Update(func(txn *badger.Txn) error { + return txn.Set(markerKey, value) + }) + + return +} + +// GetMarker retrieves an arbitrary marker from the database +func (d *D) GetMarker(key string) (value []byte, err error) { + markerKey := []byte(markerPrefix + key) + + err = d.View(func(txn *badger.Txn) error { + item, err := txn.Get(markerKey) + if err != nil { + return err + } + + value, err = item.ValueCopy(nil) + return err + }) + + return +} + +// HasMarker checks if a marker exists in the database +func (d *D) HasMarker(key string) (exists 
bool) { + markerKey := []byte(markerPrefix + key) + + err := d.View(func(txn *badger.Txn) error { + _, err := txn.Get(markerKey) + return err + }) + + exists = !chk.E(err) + return +} + +// DeleteMarker removes a marker from the database +func (d *D) DeleteMarker(key string) (err error) { + markerKey := []byte(markerPrefix + key) + + err = d.Update(func(txn *badger.Txn) error { + return txn.Delete(markerKey) + }) + + return +} \ No newline at end of file diff --git a/pkg/encoders/tag/tags.go b/pkg/encoders/tag/tags.go index 8c426cb..19003ed 100644 --- a/pkg/encoders/tag/tags.go +++ b/pkg/encoders/tag/tags.go @@ -24,7 +24,8 @@ func NewSWithCap(c int) (s *S) { func (s *S) Len() int { if s == nil { - panic("tags cannot be used without initialization") + return 0 + // panic("tags cannot be used without initialization") } return len(*s) } diff --git a/pkg/spider/spider.go b/pkg/spider/spider.go new file mode 100644 index 0000000..eec105e --- /dev/null +++ b/pkg/spider/spider.go @@ -0,0 +1,372 @@ +package spider + +import ( + "context" + "strconv" + "strings" + "time" + + "lol.mleku.dev/chk" + "lol.mleku.dev/log" + "next.orly.dev/app/config" + "next.orly.dev/pkg/acl" + "next.orly.dev/pkg/database" + "next.orly.dev/pkg/database/indexes/types" + "next.orly.dev/pkg/encoders/filter" + "next.orly.dev/pkg/encoders/kind" + "next.orly.dev/pkg/encoders/tag" + "next.orly.dev/pkg/encoders/timestamp" + "next.orly.dev/pkg/protocol/ws" + "next.orly.dev/pkg/utils/normalize" +) + +const ( + OneTimeSpiderSyncMarker = "spider_one_time_sync_completed" + SpiderLastScanMarker = "spider_last_scan_time" +) + +type Spider struct { + db *database.D + cfg *config.C + ctx context.Context + cancel context.CancelFunc +} + +func New( + db *database.D, cfg *config.C, ctx context.Context, + cancel context.CancelFunc, +) *Spider { + return &Spider{ + db: db, + cfg: cfg, + ctx: ctx, + cancel: cancel, + } +} + +// Start initializes the spider functionality based on configuration +func (s *Spider) Start() 
{ + if s.cfg.SpiderMode != "follows" { + log.D.Ln("Spider mode is not set to 'follows', skipping spider functionality") + return + } + + log.I.Ln("Starting spider in follow mode") + + // Check if one-time sync has been completed + if !s.db.HasMarker(OneTimeSpiderSyncMarker) { + log.I.Ln("Performing one-time spider sync back one month") + go s.performOneTimeSync() + } else { + log.D.Ln("One-time spider sync already completed, skipping") + } + + // Start periodic scanning + go s.startPeriodicScanning() +} + +// performOneTimeSync performs the initial sync going back one month +func (s *Spider) performOneTimeSync() { + defer func() { + // Mark the one-time sync as completed + timestamp := strconv.FormatInt(time.Now().Unix(), 10) + if err := s.db.SetMarker( + OneTimeSpiderSyncMarker, []byte(timestamp), + ); err != nil { + log.E.F("Failed to set one-time sync marker: %v", err) + } else { + log.I.Ln("One-time spider sync completed and marked") + } + }() + + // Calculate the time one month ago + oneMonthAgo := time.Now().AddDate(0, -1, 0) + log.I.F("Starting one-time spider sync from %v", oneMonthAgo) + + // Perform the sync (placeholder - would need actual implementation based on follows) + if err := s.performSync(oneMonthAgo, time.Now()); err != nil { + log.E.F("One-time spider sync failed: %v", err) + return + } + + log.I.Ln("One-time spider sync completed successfully") +} + +// startPeriodicScanning starts the regular scanning process +func (s *Spider) startPeriodicScanning() { + ticker := time.NewTicker(s.cfg.SpiderFrequency) + defer ticker.Stop() + + log.I.F("Starting periodic spider scanning every %v", s.cfg.SpiderFrequency) + + for { + select { + case <-s.ctx.Done(): + log.D.Ln("Spider periodic scanning stopped due to context cancellation") + return + case <-ticker.C: + s.performPeriodicScan() + } + } +} + +// performPeriodicScan performs the regular scan of the last two hours (double the frequency window) +func (s *Spider) performPeriodicScan() { + // Calculate 
the scanning window (double the frequency period) + scanWindow := s.cfg.SpiderFrequency * 2 + scanStart := time.Now().Add(-scanWindow) + scanEnd := time.Now() + + log.D.F( + "Performing periodic spider scan from %v to %v (window: %v)", scanStart, + scanEnd, scanWindow, + ) + + if err := s.performSync(scanStart, scanEnd); err != nil { + log.E.F("Periodic spider scan failed: %v", err) + return + } + + // Update the last scan marker + timestamp := strconv.FormatInt(time.Now().Unix(), 10) + if err := s.db.SetMarker( + SpiderLastScanMarker, []byte(timestamp), + ); err != nil { + log.E.F("Failed to update last scan marker: %v", err) + } + + log.D.F("Periodic spider scan completed successfully") +} + +// performSync performs the actual sync operation for the given time range +func (s *Spider) performSync(startTime, endTime time.Time) error { + log.D.F( + "Spider sync from %v to %v - starting implementation", startTime, + endTime, + ) + + // 1. Check ACL mode is set to "follows" + if s.cfg.ACLMode != "follows" { + log.D.F( + "Spider sync skipped - ACL mode is not 'follows' (current: %s)", + s.cfg.ACLMode, + ) + return nil + } + + // 2. Get the list of followed users from the ACL system + followedPubkeys, err := s.getFollowedPubkeys() + if err != nil { + return err + } + + if len(followedPubkeys) == 0 { + log.D.Ln("Spider sync: no followed pubkeys found") + return nil + } + + log.D.F("Spider sync: found %d followed pubkeys", len(followedPubkeys)) + + // 3. Discover relay lists from followed users + relayURLs, err := s.discoverRelays(followedPubkeys) + if err != nil { + return err + } + + if len(relayURLs) == 0 { + log.W.Ln("Spider sync: no relays discovered from followed users") + return nil + } + + log.I.F("Spider sync: discovered %d relay URLs", len(relayURLs)) + + // 4. 
Query each relay for events from followed pubkeys in the time range + eventsFound := 0 + for _, relayURL := range relayURLs { + count, err := s.queryRelayForEvents( + relayURL, followedPubkeys, startTime, endTime, + ) + if err != nil { + log.E.F("Spider sync: error querying relay %s: %v", relayURL, err) + continue + } + eventsFound += count + } + + log.I.F( + "Spider sync completed: found %d new events from %d relays", + eventsFound, len(relayURLs), + ) + + return nil +} + +// getFollowedPubkeys retrieves the list of followed pubkeys from the ACL system +func (s *Spider) getFollowedPubkeys() ([][]byte, error) { + // Access the ACL registry to get the current ACL instance + var followedPubkeys [][]byte + + // Get all ACL instances and find the active one + for _, aclInstance := range acl.Registry.ACL { + if aclInstance.Type() == acl.Registry.Active.Load() { + // Cast to *Follows to access the follows field + if followsACL, ok := aclInstance.(*acl.Follows); ok { + followedPubkeys = followsACL.GetFollowedPubkeys() + break + } + } + } + + return followedPubkeys, nil +} + +// discoverRelays discovers relay URLs from kind 10002 events of followed users +func (s *Spider) discoverRelays(followedPubkeys [][]byte) ([]string, error) { + seen := make(map[string]struct{}) + var urls []string + + for _, pubkey := range followedPubkeys { + // Query for kind 10002 (RelayListMetadata) events from this pubkey + fl := &filter.F{ + Authors: tag.NewFromAny(pubkey), + Kinds: kind.NewS(kind.New(kind.RelayListMetadata.K)), + } + + idxs, err := database.GetIndexesFromFilter(fl) + if chk.E(err) { + continue + } + + var sers types.Uint40s + for _, idx := range idxs { + s, err := s.db.GetSerialsByRange(idx) + if chk.E(err) { + continue + } + sers = append(sers, s...) 
+ } + + for _, ser := range sers { + ev, err := s.db.FetchEventBySerial(ser) + if chk.E(err) || ev == nil { + continue + } + + // Extract relay URLs from 'r' tags + for _, v := range ev.Tags.GetAll([]byte("r")) { + u := string(v.Value()) + n := string(normalize.URL(u)) + if n == "" { + continue + } + if _, ok := seen[n]; ok { + continue + } + seen[n] = struct{}{} + urls = append(urls, n) + } + } + } + + return urls, nil +} + +// queryRelayForEvents connects to a relay and queries for events from followed pubkeys +func (s *Spider) queryRelayForEvents( + relayURL string, followedPubkeys [][]byte, startTime, endTime time.Time, +) (int, error) { + log.T.F("Spider sync: querying relay %s", relayURL) + + // Connect to the relay with a timeout context + ctx, cancel := context.WithTimeout(s.ctx, 30*time.Second) + defer cancel() + + client, err := ws.RelayConnect(ctx, relayURL) + if err != nil { + return 0, err + } + defer client.Close() + + // Create filter for the time range and followed pubkeys + f := &filter.F{ + Authors: tag.NewFromBytesSlice(followedPubkeys...), + Since: timestamp.FromUnix(startTime.Unix()), + Until: timestamp.FromUnix(endTime.Unix()), + Limit: func() *uint { l := uint(1000); return &l }(), // Limit to avoid overwhelming + } + + // Subscribe to get events + sub, err := client.Subscribe(ctx, filter.NewS(f)) + if err != nil { + return 0, err + } + defer sub.Unsub() + + eventsCount := 0 + eventsSaved := 0 + timeout := time.After(10 * time.Second) // Timeout for receiving events + + for { + select { + case <-ctx.Done(): + log.T.F( + "Spider sync: context done for relay %s, saved %d/%d events", + relayURL, eventsSaved, eventsCount, + ) + return eventsSaved, nil + case <-timeout: + log.T.F( + "Spider sync: timeout for relay %s, saved %d/%d events", + relayURL, eventsSaved, eventsCount, + ) + return eventsSaved, nil + case <-sub.EndOfStoredEvents: + log.T.F( + "Spider sync: end of stored events for relay %s, saved %d/%d events", + relayURL, eventsSaved, 
eventsCount, + ) + return eventsSaved, nil + case ev := <-sub.Events: + if ev == nil { + continue + } + eventsCount++ + + // Verify the event signature + if ok, err := ev.Verify(); !ok || err != nil { + log.T.F( + "Spider sync: invalid event signature from relay %s", + relayURL, + ) + ev.Free() + continue + } + + // Save the event to the database + if _, _, err := s.db.SaveEvent(s.ctx, ev); err != nil { + if !strings.HasPrefix(err.Error(), "blocked:") { + log.T.F( + "Spider sync: error saving event from relay %s: %v", + relayURL, err, + ) + } + // Event might already exist, which is fine for deduplication + } else { + eventsSaved++ + if eventsSaved%10 == 0 { + log.T.F( + "Spider sync: saved %d events from relay %s", + eventsSaved, relayURL, + ) + } + } + ev.Free() + } + } +} + +// Stop stops the spider functionality +func (s *Spider) Stop() { + log.D.Ln("Stopping spider") + s.cancel() +} diff --git a/pkg/version/version b/pkg/version/version index a009943..48080b4 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.4.9 \ No newline at end of file +v0.5.0 \ No newline at end of file