Remove spider functionality and related configurations
- Deleted the spider package and its associated functionality from the main application.
- Removed references to spider mode and frequency from configuration files and documentation.
- Updated the main application to eliminate spider initialization and related logic.
- Cleaned up import statements and configuration options to reflect the removal of spider features.
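For context, a minimal sketch of the removed configuration surface, inferred from how the deleted code below uses it (the actual definition lived in next.orly.dev/app/config and may have differed):

package config

import "time"

// C sketches only the fields the spider package touched.
type C struct {
	SpiderMode      string        // removed: spider ran only when set to "follows"
	SpiderFrequency time.Duration // removed: tick interval for periodic scans
	ACLMode         string        // retained: the spider also required this to be "follows"
	RelayAddresses  []string      // retained: used for relay self-detection
}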
@@ -1,457 +0,0 @@
package spider

import (
	"context"
	"strconv"
	"strings"
	"time"

	"lol.mleku.dev/chk"
	"lol.mleku.dev/log"
	"next.orly.dev/app/config"
	"next.orly.dev/pkg/acl"
	"next.orly.dev/pkg/database"
	"next.orly.dev/pkg/database/indexes/types"
	"next.orly.dev/pkg/encoders/filter"
	"next.orly.dev/pkg/encoders/kind"
	"next.orly.dev/pkg/encoders/tag"
	"next.orly.dev/pkg/encoders/timestamp"
	"next.orly.dev/pkg/protocol/ws"
	"next.orly.dev/pkg/utils/normalize"
)

const (
	OneTimeSpiderSyncMarker = "spider_one_time_sync_completed"
	SpiderLastScanMarker    = "spider_last_scan_time"
	// MaxWebSocketMessageSize is the maximum size for WebSocket messages
	MaxWebSocketMessageSize = 100 * 1024 * 1024 // 100MB
	// PubkeyHexSize is the size of a hex-encoded pubkey (32 bytes = 64 hex chars)
	PubkeyHexSize = 64
)

type Spider struct {
	db     *database.D
	cfg    *config.C
	ctx    context.Context
	cancel context.CancelFunc
	// Configured relay addresses for self-detection
	relayAddresses []string
}

func New(
	db *database.D, cfg *config.C, ctx context.Context,
	cancel context.CancelFunc,
) *Spider {
	return &Spider{
		db:             db,
		cfg:            cfg,
		ctx:            ctx,
		cancel:         cancel,
		relayAddresses: cfg.RelayAddresses,
	}
}
// Start initializes the spider functionality based on configuration
func (s *Spider) Start() {
	if s.cfg.SpiderMode != "follows" {
		log.D.Ln("Spider mode is not set to 'follows', skipping spider functionality")
		return
	}

	log.I.Ln("Starting spider in follow mode")

	// Check if the one-time sync has been completed
	if !s.db.HasMarker(OneTimeSpiderSyncMarker) {
		log.I.Ln("Performing one-time spider sync back one month")
		go s.performOneTimeSync()
	} else {
		log.D.Ln("One-time spider sync already completed, skipping")
	}

	// Start periodic scanning
	go s.startPeriodicScanning()
}
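// Note: Start returns immediately; both sync paths run on their own
// goroutines. The periodic scanner exits on context cancellation, and relay
// connections derive their timeouts from the same context, so Stop (below)
// winds everything down.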
// performOneTimeSync performs the initial sync going back one month
func (s *Spider) performOneTimeSync() {
	defer func() {
		// Mark the one-time sync as completed
		timestamp := strconv.FormatInt(time.Now().Unix(), 10)
		if err := s.db.SetMarker(
			OneTimeSpiderSyncMarker, []byte(timestamp),
		); err != nil {
			log.E.F("Failed to set one-time sync marker: %v", err)
		} else {
			log.I.Ln("One-time spider sync completed and marked")
		}
	}()

	// Calculate the time one month ago
	oneMonthAgo := time.Now().AddDate(0, -1, 0)
	log.I.F("Starting one-time spider sync from %v", oneMonthAgo)

	// Perform the sync across the followed users' relays
	if err := s.performSync(oneMonthAgo, time.Now()); err != nil {
		log.E.F("One-time spider sync failed: %v", err)
		return
	}

	log.I.Ln("One-time spider sync completed successfully")
}
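// Note: the deferred SetMarker above runs even when performSync returns an
// error, so a failed one-time sync is still recorded as completed and will
// not be retried on the next start.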
// startPeriodicScanning starts the regular scanning process
func (s *Spider) startPeriodicScanning() {
	ticker := time.NewTicker(s.cfg.SpiderFrequency)
	defer ticker.Stop()

	log.I.F("Starting periodic spider scanning every %v", s.cfg.SpiderFrequency)

	for {
		select {
		case <-s.ctx.Done():
			log.D.Ln("Spider periodic scanning stopped due to context cancellation")
			return
		case <-ticker.C:
			s.performPeriodicScan()
		}
	}
}
// performPeriodicScan performs the regular scan over a window of double the
// configured frequency, so consecutive scans overlap
func (s *Spider) performPeriodicScan() {
	// Calculate the scanning window (double the frequency period)
	scanWindow := s.cfg.SpiderFrequency * 2
	scanStart := time.Now().Add(-scanWindow)
	scanEnd := time.Now()

	log.D.F(
		"Performing periodic spider scan from %v to %v (window: %v)", scanStart,
		scanEnd, scanWindow,
	)

	if err := s.performSync(scanStart, scanEnd); err != nil {
		log.E.F("Periodic spider scan failed: %v", err)
		return
	}

	// Update the last scan marker
	timestamp := strconv.FormatInt(time.Now().Unix(), 10)
	if err := s.db.SetMarker(
		SpiderLastScanMarker, []byte(timestamp),
	); err != nil {
		log.E.F("Failed to update last scan marker: %v", err)
	}

	log.D.Ln("Periodic spider scan completed successfully")
}
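// Note: because the scan window is twice the tick interval, consecutive scans
// overlap; events fetched twice are deduplicated when SaveEvent reports they
// already exist (see queryRelayForEvents below).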
// performSync performs the actual sync operation for the given time range
func (s *Spider) performSync(startTime, endTime time.Time) error {
	log.D.F(
		"Spider sync from %v to %v - starting", startTime,
		endTime,
	)

	// 1. Check ACL mode is set to "follows"
	if s.cfg.ACLMode != "follows" {
		log.D.F(
			"Spider sync skipped - ACL mode is not 'follows' (current: %s)",
			s.cfg.ACLMode,
		)
		return nil
	}

	// 2. Get the list of followed users from the ACL system
	followedPubkeys, err := s.getFollowedPubkeys()
	if err != nil {
		return err
	}

	if len(followedPubkeys) == 0 {
		log.D.Ln("Spider sync: no followed pubkeys found")
		return nil
	}

	log.D.F("Spider sync: found %d followed pubkeys", len(followedPubkeys))

	// 3. Discover relay lists from followed users
	relayURLs, err := s.discoverRelays(followedPubkeys)
	if err != nil {
		return err
	}

	if len(relayURLs) == 0 {
		log.W.Ln("Spider sync: no relays discovered from followed users")
		return nil
	}

	log.I.F("Spider sync: discovered %d relay URLs", len(relayURLs))

	// 4. Query each relay for events from followed pubkeys in the time range
	eventsFound := 0
	for _, relayURL := range relayURLs {
		log.I.F("Spider sync: fetching events from relay %s", relayURL)
		count, err := s.queryRelayForEvents(
			relayURL, followedPubkeys, startTime, endTime,
		)
		if err != nil {
			log.E.F("Spider sync: error querying relay %s: %v", relayURL, err)
			continue
		}
		log.I.F("Spider sync: completed fetching from relay %s, found %d events", relayURL, count)
		eventsFound += count
	}

	log.I.F(
		"Spider sync completed: found %d new events from %d relays",
		eventsFound, len(relayURLs),
	)

	return nil
}
// getFollowedPubkeys retrieves the list of followed pubkeys from the ACL system
func (s *Spider) getFollowedPubkeys() ([][]byte, error) {
	// Access the ACL registry to get the current ACL instance
	var followedPubkeys [][]byte

	// Get all ACL instances and find the active one
	for _, aclInstance := range acl.Registry.ACL {
		if aclInstance.Type() == acl.Registry.Active.Load() {
			// Cast to *Follows to access the follows field
			if followsACL, ok := aclInstance.(*acl.Follows); ok {
				followedPubkeys = followsACL.GetFollowedPubkeys()
				break
			}
		}
	}

	return followedPubkeys, nil
}
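// Note: this never returns a non-nil error; if the active ACL is not a
// *acl.Follows instance the result is simply an empty slice, which
// performSync treats as "no followed pubkeys found".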
// discoverRelays discovers relay URLs from kind 10002 events of followed users
func (s *Spider) discoverRelays(followedPubkeys [][]byte) ([]string, error) {
	seen := make(map[string]struct{})
	var urls []string

	for _, pubkey := range followedPubkeys {
		// Query for kind 10002 (RelayListMetadata) events from this pubkey
		fl := &filter.F{
			Authors: tag.NewFromAny(pubkey),
			Kinds:   kind.NewS(kind.New(kind.RelayListMetadata.K)),
		}

		idxs, err := database.GetIndexesFromFilter(fl)
		if chk.E(err) {
			continue
		}

		var sers types.Uint40s
		for _, idx := range idxs {
			serials, err := s.db.GetSerialsByRange(idx)
			if chk.E(err) {
				continue
			}
			sers = append(sers, serials...)
		}

		for _, ser := range sers {
			ev, err := s.db.FetchEventBySerial(ser)
			if chk.E(err) || ev == nil {
				continue
			}

			// Extract relay URLs from 'r' tags
			for _, v := range ev.Tags.GetAll([]byte("r")) {
				u := string(v.Value())
				n := string(normalize.URL(u))
				if n == "" {
					continue
				}
				// Skip if this relay is one of the configured relay addresses
				skipRelay := false
				for _, relayAddr := range s.relayAddresses {
					if n == relayAddr {
						log.D.F("spider: skipping configured relay address: %s", n)
						skipRelay = true
						break
					}
				}
				if skipRelay {
					continue
				}
				if _, ok := seen[n]; ok {
					continue
				}
				seen[n] = struct{}{}
				urls = append(urls, n)
			}
		}
	}

	return urls, nil
}
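// Note: the relayAddresses check above is the self-detection mentioned on the
// struct field - it stops the spider from querying the relay it runs on. The
// comparison happens on the normalize.URL form, so it assumes the configured
// addresses are stored in that same normalized form.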
// calculateOptimalChunkSize calculates the optimal chunk size for pubkeys to
// stay under the message size limit
func (s *Spider) calculateOptimalChunkSize() int {
	// Estimate the size of a filter with timestamps and other fields
	// Base filter overhead: ~200 bytes for timestamps, limits, etc.
	baseFilterSize := 200

	// Calculate how many pubkeys we can fit in the remaining space
	availableSpace := MaxWebSocketMessageSize - baseFilterSize
	maxPubkeys := availableSpace / PubkeyHexSize

	// Use a conservative chunk size (80% of max to be safe)
	chunkSize := int(float64(maxPubkeys) * 0.8)

	// Ensure a minimum chunk size of 10
	if chunkSize < 10 {
		chunkSize = 10
	}

	log.D.F(
		"Spider: calculated optimal chunk size: %d pubkeys (max would be %d)",
		chunkSize, maxPubkeys,
	)
	return chunkSize
}
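// Worked through with the constants above: (100*1024*1024 - 200) / 64 =
// 1,638,396 pubkeys maximum, 80% of which is about 1.31 million per chunk -
// far larger than any realistic follow list, so in practice a single chunk
// suffices and the 500-event Limit per subscription is the tighter bound.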
// queryRelayForEvents connects to a relay and queries for events from followed pubkeys
func (s *Spider) queryRelayForEvents(
	relayURL string, followedPubkeys [][]byte, startTime, endTime time.Time,
) (int, error) {
	log.T.F(
		"Spider sync: querying relay %s with %d pubkeys", relayURL,
		len(followedPubkeys),
	)

	// Connect to the relay with a timeout context
	ctx, cancel := context.WithTimeout(s.ctx, 30*time.Second)
	defer cancel()

	client, err := ws.RelayConnect(ctx, relayURL)
	if err != nil {
		return 0, err
	}
	defer client.Close()

	// Break pubkeys into chunks to stay under MaxWebSocketMessageSize
	chunkSize := s.calculateOptimalChunkSize()
	totalEventsSaved := 0

	for i := 0; i < len(followedPubkeys); i += chunkSize {
		end := i + chunkSize
		if end > len(followedPubkeys) {
			end = len(followedPubkeys)
		}

		chunk := followedPubkeys[i:end]
		log.T.F(
			"Spider sync: processing chunk %d-%d (%d pubkeys) for relay %s",
			i, end-1, len(chunk), relayURL,
		)

		// Create a filter for this chunk of pubkeys
		f := &filter.F{
			Authors: tag.NewFromBytesSlice(chunk...),
			Since:   timestamp.FromUnix(startTime.Unix()),
			Until:   timestamp.FromUnix(endTime.Unix()),
			Limit:   func() *uint { l := uint(500); return &l }(), // Limit to avoid overwhelming the relay
		}

		// Subscribe to get events for this chunk
		sub, err := client.Subscribe(ctx, filter.NewS(f))
		if err != nil {
			log.E.F(
				"Spider sync: failed to subscribe to chunk %d-%d for relay %s: %v",
				i, end-1, relayURL, err,
			)
			continue
		}

		chunkEventsSaved := 0
		chunkEventsCount := 0
		timeout := time.After(10 * time.Second) // Timeout for receiving events

		chunkDone := false
		for !chunkDone {
			select {
			case <-ctx.Done():
				log.T.F(
					"Spider sync: context done for relay %s chunk %d-%d, saved %d/%d events",
					relayURL, i, end-1, chunkEventsSaved, chunkEventsCount,
				)
				chunkDone = true
			case <-timeout:
				log.T.F(
					"Spider sync: timeout for relay %s chunk %d-%d, saved %d/%d events",
					relayURL, i, end-1, chunkEventsSaved, chunkEventsCount,
				)
				chunkDone = true
			case <-sub.EndOfStoredEvents:
				log.T.F(
					"Spider sync: end of stored events for relay %s chunk %d-%d, saved %d/%d events",
					relayURL, i, end-1, chunkEventsSaved, chunkEventsCount,
				)
				chunkDone = true
			case ev := <-sub.Events:
				if ev == nil {
					continue
				}
				chunkEventsCount++

				// Verify the event signature
				if ok, err := ev.Verify(); !ok || err != nil {
					log.T.F(
						"Spider sync: invalid event signature from relay %s",
						relayURL,
					)
					ev.Free()
					continue
				}

				// Save the event to the database
				if _, err := s.db.SaveEvent(s.ctx, ev); err != nil {
					if !strings.HasPrefix(err.Error(), "blocked:") {
						log.T.F(
							"Spider sync: error saving event from relay %s: %v",
							relayURL, err,
						)
					}
					// The event might already exist, which is fine for deduplication
				} else {
					chunkEventsSaved++
					if chunkEventsSaved%10 == 0 {
						log.T.F(
							"Spider sync: saved %d events from relay %s chunk %d-%d",
							chunkEventsSaved, relayURL, i, end-1,
						)
					}
				}
				ev.Free()
			}
		}

		// Clean up the subscription
		sub.Unsub()
		totalEventsSaved += chunkEventsSaved

		log.T.F(
			"Spider sync: completed chunk %d-%d for relay %s, saved %d events",
			i, end-1, relayURL, chunkEventsSaved,
		)
	}

	log.T.F(
		"Spider sync: completed all chunks for relay %s, total saved %d events",
		relayURL, totalEventsSaved,
	)
	return totalEventsSaved, nil
}
// Stop stops the spider functionality
func (s *Spider) Stop() {
	log.D.Ln("Stopping spider")
	s.cancel()
}