From 95c608256404bb93648b164cf13c6623bd2cef14 Mon Sep 17 00:00:00 2001 From: mleku Date: Tue, 21 Oct 2025 16:31:17 +0100 Subject: [PATCH] Implement blacklisting for IPs and enhance follow list fetching - Added functionality to handle blacklisted IPs, allowing connections to remain open until a timeout is reached. - Introduced periodic fetching of admin follow lists to improve synchronization with relay data. - Updated WebSocket message size limits to accommodate larger payloads. - Enhanced logging for better traceability during follow list fetching and event processing. - Refactored event subscription logic to improve clarity and maintainability. --- app/config/config.go | 2 + app/handle-message.go | 20 +++ app/handle-req.go | 15 +- app/handle-websocket.go | 27 ++- app/listener.go | 17 +- app/server.go | 47 ++++- cmd/benchmark/configs/config.toml | 4 +- cmd/benchmark/configs/strfry.conf | 2 +- cmd/benchmark/setup-external-relays.sh | 6 +- main.go | 50 +++--- pkg/acl/follows.go | 236 +++++++++++++++++++++++-- pkg/spider/spider.go | 29 ++- 12 files changed, 385 insertions(+), 70 deletions(-) diff --git a/app/config/config.go b/app/config/config.go index 2225345..7d4c3b6 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -48,6 +48,8 @@ type C struct { SubscriptionEnabled bool `env:"ORLY_SUBSCRIPTION_ENABLED" default:"false" usage:"enable subscription-based access control requiring payment for non-directory events"` MonthlyPriceSats int64 `env:"ORLY_MONTHLY_PRICE_SATS" default:"6000" usage:"price in satoshis for one month subscription (default ~$2 USD)"` RelayURL string `env:"ORLY_RELAY_URL" usage:"base URL for the relay dashboard (e.g., https://relay.example.com)"` + RelayAddresses []string `env:"ORLY_RELAY_ADDRESSES" usage:"comma-separated list of websocket addresses for this relay (e.g., wss://relay.example.com,wss://backup.example.com) - used by spider to avoid self-connections"` + FollowListFrequency time.Duration `env:"ORLY_FOLLOW_LIST_FREQUENCY" usage:"how often to fetch admin follow lists (default: 1h)" default:"1h"` // Web UI and dev mode settings WebDisableEmbedded bool `env:"ORLY_WEB_DISABLE" default:"false" usage:"disable serving the embedded web UI; useful for hot-reload during development"` diff --git a/app/handle-message.go b/app/handle-message.go index e314058..41838f1 100644 --- a/app/handle-message.go +++ b/app/handle-message.go @@ -2,6 +2,7 @@ package app import ( "fmt" + "time" "lol.mleku.dev/chk" "lol.mleku.dev/log" @@ -15,6 +16,25 @@ import ( ) func (l *Listener) HandleMessage(msg []byte, remote string) { + // Ignore all messages from self-connections + if l.isSelfConnection { + log.D.F("ignoring message from self-connection %s", remote) + return + } + + // Handle blacklisted IPs - discard messages but keep connection open until timeout + if l.isBlacklisted { + // Check if timeout has been reached + if time.Now().After(l.blacklistTimeout) { + log.W.F("blacklisted IP %s timeout reached, closing connection", remote) + // Close the connection by cancelling the context + // The websocket handler will detect this and close the connection + return + } + log.D.F("discarding message from blacklisted IP %s (timeout in %v)", remote, time.Until(l.blacklistTimeout)) + return + } + msgPreview := string(msg) if len(msgPreview) > 150 { msgPreview = msgPreview[:150] + "..." diff --git a/app/handle-req.go b/app/handle-req.go index 5489c84..396ff1b 100644 --- a/app/handle-req.go +++ b/app/handle-req.go @@ -37,6 +37,7 @@ func (l *Listener) HandleReq(msg []byte) (err error) { if _, err = env.Unmarshal(msg); chk.E(err) { return normalize.Error.Errorf(err.Error()) } + log.T.C( func() string { return fmt.Sprintf( @@ -131,16 +132,18 @@ func (l *Listener) HandleReq(msg []byte) (err error) { ) // Process large author lists by breaking them into chunks - if f.Authors != nil && f.Authors.Len() > 50 { + if f.Authors != nil && f.Authors.Len() > 1000 { log.W.F("REQ %s: breaking down large author list (%d authors) into chunks", env.Subscription, f.Authors.Len()) - // Calculate chunk size based on kinds to avoid OOM - chunkSize := 50 + // Calculate chunk size to stay under message size limits + // Each pubkey is 64 hex chars, plus JSON overhead, so ~100 bytes per author + // Target ~50MB per chunk to stay well under 100MB limit + chunkSize := ClientMessageSizeLimit / 200 // ~500KB per chunk if f.Kinds != nil && f.Kinds.Len() > 0 { // Reduce chunk size if there are multiple kinds to prevent too many index ranges - chunkSize = 50 / f.Kinds.Len() - if chunkSize < 10 { - chunkSize = 10 // Minimum chunk size + chunkSize = chunkSize / f.Kinds.Len() + if chunkSize < 100 { + chunkSize = 100 // Minimum chunk size } } diff --git a/app/handle-websocket.go b/app/handle-websocket.go index 9155614..33344af 100644 --- a/app/handle-websocket.go +++ b/app/handle-websocket.go @@ -21,6 +21,9 @@ const ( DefaultPingWait = DefaultPongWait / 2 DefaultWriteTimeout = 3 * time.Second DefaultMaxMessageSize = 100 * units.Mb + // ClientMessageSizeLimit is the maximum message size that clients can handle + // This is set to 100MB to allow large messages + ClientMessageSizeLimit = 100 * 1024 * 1024 // 100MB // CloseMessage denotes a close control message. The optional message // payload contains a numeric code and text. Use the FormatCloseMessage @@ -84,10 +87,23 @@ whitelist: req: r, startTime: time.Now(), } + + // Detect self-connections early to avoid sending AUTH challenges + listener.isSelfConnection = s.isSelfConnection(remote) + if listener.isSelfConnection { + log.W.F("detected self-connection from %s, marking connection", remote) + } + + // Check for blacklisted IPs + listener.isBlacklisted = s.isIPBlacklisted(remote) + if listener.isBlacklisted { + log.W.F("detected blacklisted IP %s, marking connection for timeout", remote) + listener.blacklistTimeout = time.Now().Add(time.Minute) // Timeout after 1 minute + } chal := make([]byte, 32) rand.Read(chal) listener.challenge.Store([]byte(hex.Enc(chal))) - if s.Config.ACLMode != "none" { + if s.Config.ACLMode != "none" && !listener.isSelfConnection { log.D.F("sending AUTH challenge to %s", remote) if err = authenvelope.NewChallengeWith(listener.challenge.Load()). Write(listener); chk.E(err) { @@ -95,6 +111,8 @@ whitelist: return } log.D.F("AUTH challenge sent successfully to %s", remote) + } else if listener.isSelfConnection { + log.D.F("skipping AUTH challenge for self-connection from %s", remote) } ticker := time.NewTicker(DefaultPingWait) go s.Pinger(ctx, conn, ticker, cancel) @@ -130,6 +148,13 @@ whitelist: return default: } + + // Check if blacklisted connection has timed out + if listener.isBlacklisted && time.Now().After(listener.blacklistTimeout) { + log.W.F("blacklisted IP %s timeout reached, closing connection", remote) + return + } + var typ websocket.MessageType var msg []byte log.T.F("waiting for message from %s", remote) diff --git a/app/listener.go b/app/listener.go index 63a3542..3a3fdc8 100644 --- a/app/listener.go +++ b/app/listener.go @@ -17,13 +17,16 @@ import ( type Listener struct { *Server - conn *websocket.Conn - ctx context.Context - remote string - req *http.Request - challenge atomic.Bytes - authedPubkey atomic.Bytes - startTime time.Time + conn *websocket.Conn + ctx context.Context + remote string + req *http.Request + challenge atomic.Bytes + authedPubkey atomic.Bytes + startTime time.Time + isSelfConnection bool // Marker to identify self-connections + isBlacklisted bool // Marker to identify blacklisted IPs + blacklistTimeout time.Time // When to timeout blacklisted connections // Diagnostics: per-connection counters msgCount int reqCount int diff --git a/app/server.go b/app/server.go index a4f9500..cad8b01 100644 --- a/app/server.go +++ b/app/server.go @@ -32,7 +32,6 @@ type Server struct { mux *http.ServeMux Config *config.C Ctx context.Context - remote string publishers *publish.S Admins [][]byte Owners [][]byte @@ -50,6 +49,52 @@ type Server struct { policyManager *policy.P } +// isSelfConnection checks if the connection is coming from the relay itself +func (s *Server) isSelfConnection(remote string) bool { + // Check for localhost connections + if strings.HasPrefix(remote, "127.0.0.1:") || strings.HasPrefix(remote, "::1:") || strings.HasPrefix(remote, "[::1]:") { + return true + } + + // Check for connections from the same IP as the server + // This handles cases where the relay connects to itself via its public IP + if s.Config.Listen != "" { + // Extract IP from listen address (e.g., "0.0.0.0" -> "0.0.0.0") + listenIP := s.Config.Listen + if listenIP == "0.0.0.0" || listenIP == "" { + // If listening on all interfaces, check if remote IP matches any local interface + // For now, we'll be conservative and only check localhost + return false + } + // Check if remote IP matches the listen IP + remoteIP := strings.Split(remote, ":")[0] + if remoteIP == listenIP { + return true + } + } + + return false +} + +// isIPBlacklisted checks if an IP address is blacklisted using the managed ACL system +func (s *Server) isIPBlacklisted(remote string) bool { + // Extract IP from remote address (e.g., "192.168.1.1:12345" -> "192.168.1.1") + remoteIP := strings.Split(remote, ":")[0] + + // Check if managed ACL is available and active + if s.Config.ACLMode == "managed" { + for _, aclInstance := range acl.Registry.ACL { + if aclInstance.Type() == "managed" { + if managed, ok := aclInstance.(*acl.Managed); ok { + return managed.IsIPBlocked(remoteIP) + } + } + } + } + + return false +} + func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Set comprehensive CORS headers for proxy compatibility w.Header().Set("Access-Control-Allow-Origin", "*") diff --git a/cmd/benchmark/configs/config.toml b/cmd/benchmark/configs/config.toml index 4f0dff4..54b8107 100644 --- a/cmd/benchmark/configs/config.toml +++ b/cmd/benchmark/configs/config.toml @@ -18,8 +18,8 @@ address = "0.0.0.0" messages_per_sec = 0 subscriptions_per_min = 0 max_event_bytes = 65535 -max_ws_message_bytes = 131072 -max_ws_frame_bytes = 131072 +max_ws_message_bytes = 104857600 +max_ws_frame_bytes = 104857600 [authorization] pubkey_whitelist = [] diff --git a/cmd/benchmark/configs/strfry.conf b/cmd/benchmark/configs/strfry.conf index 5712fa3..c1b7942 100644 --- a/cmd/benchmark/configs/strfry.conf +++ b/cmd/benchmark/configs/strfry.conf @@ -41,7 +41,7 @@ relay { } # Maximum accepted incoming websocket frame size (should be larger than max event) (restart required) - maxWebsocketPayloadSize = 131072 + maxWebsocketPayloadSize = 104857600 # Websocket-level PING message frequency (should be less than any reverse proxy idle timeouts) (restart required) autoPingSeconds = 55 diff --git a/cmd/benchmark/setup-external-relays.sh b/cmd/benchmark/setup-external-relays.sh index 02b7119..e16b364 100755 --- a/cmd/benchmark/setup-external-relays.sh +++ b/cmd/benchmark/setup-external-relays.sh @@ -250,7 +250,7 @@ relay { } # Maximum accepted incoming websocket frame size (should be larger than max event) (restart required) - maxWebsocketPayloadSize = 131072 + maxWebsocketPayloadSize = 104857600 # Websocket-level PING message frequency (should be less than any reverse proxy idle timeouts) (restart required) autoPingSeconds = 55 @@ -332,8 +332,8 @@ address = "0.0.0.0" messages_per_sec = 0 subscriptions_per_min = 0 max_event_bytes = 65535 -max_ws_message_bytes = 131072 -max_ws_frame_bytes = 131072 +max_ws_message_bytes = 104857600 +max_ws_frame_bytes = 104857600 [authorization] pubkey_whitelist = [] diff --git a/main.go b/main.go index d6f78ca..9f7c65e 100644 --- a/main.go +++ b/main.go @@ -53,32 +53,32 @@ func main() { runtime.GOMAXPROCS(runtime.NumCPU() * 4) var err error var cfg *config.C - if cfg, err = config.New(); chk.T(err) { - } - log.I.F("starting %s %s", cfg.AppName, version.V) + if cfg, err = config.New(); chk.T(err) { + } + log.I.F("starting %s %s", cfg.AppName, version.V) - // Handle 'identity' subcommand: print relay identity secret and pubkey and exit - if config.IdentityRequested() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - var db *database.D - if db, err = database.New(ctx, cancel, cfg.DataDir, cfg.DBLogLevel); chk.E(err) { - os.Exit(1) - } - defer db.Close() - skb, err := db.GetOrCreateRelayIdentitySecret() - if chk.E(err) { - os.Exit(1) - } - pk, err := keys.SecretBytesToPubKeyHex(skb) - if chk.E(err) { - os.Exit(1) - } - fmt.Printf("identity secret: %s\nidentity pubkey: %s\n", hex.Enc(skb), pk) - os.Exit(0) - } + // Handle 'identity' subcommand: print relay identity secret and pubkey and exit + if config.IdentityRequested() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var db *database.D + if db, err = database.New(ctx, cancel, cfg.DataDir, cfg.DBLogLevel); chk.E(err) { + os.Exit(1) + } + defer db.Close() + skb, err := db.GetOrCreateRelayIdentitySecret() + if chk.E(err) { + os.Exit(1) + } + pk, err := keys.SecretBytesToPubKeyHex(skb) + if chk.E(err) { + os.Exit(1) + } + fmt.Printf("identity secret: %s\nidentity pubkey: %s\n", hex.Enc(skb), pk) + os.Exit(0) + } - // If OpenPprofWeb is true and profiling is enabled, we need to ensure HTTP profiling is also enabled + // If OpenPprofWeb is true and profiling is enabled, we need to ensure HTTP profiling is also enabled if cfg.OpenPprofWeb && cfg.Pprof != "" && !cfg.PprofHTTP { log.I.F("enabling HTTP pprof server to support web viewer") cfg.PprofHTTP = true @@ -220,7 +220,7 @@ func main() { // Open the pprof web viewer if enabled if cfg.OpenPprofWeb && cfg.Pprof != "" { - pprofURL := fmt.Sprintf("http://localhost:6060/debug/pprof/") + pprofURL := "http://localhost:6060/debug/pprof/" go func() { // Wait a moment for the server to start time.Sleep(500 * time.Millisecond) diff --git a/pkg/acl/follows.go b/pkg/acl/follows.go index 450a9ee..ad0d2df 100644 --- a/pkg/acl/follows.go +++ b/pkg/acl/follows.go @@ -44,6 +44,8 @@ type Follows struct { follows [][]byte updated chan struct{} subsCancel context.CancelFunc + // Track last follow list fetch time + lastFollowListFetch time.Time } func (f *Follows) Configure(cfg ...any) (err error) { @@ -240,7 +242,7 @@ func (f *Follows) adminRelays() (urls []string) { return } -func (f *Follows) startSubscriptions(ctx context.Context) { +func (f *Follows) startEventSubscriptions(ctx context.Context) { // build authors list: admins + follows f.followsMx.RLock() authors := make([][]byte, 0, len(f.admins)+len(f.follows)) @@ -257,10 +259,11 @@ func (f *Follows) startSubscriptions(ctx context.Context) { log.W.F("follows syncer: no admin relays found in DB (kind 10002) and no bootstrap relays configured") return } - log.T.F( + log.I.F( "follows syncer: subscribing to %d relays for %d authors", len(urls), len(authors), ) + log.I.F("follows syncer: starting follow list fetching from relays: %v", urls) for _, u := range urls { u := u go func() { @@ -336,11 +339,13 @@ func (f *Follows) startSubscriptions(ctx context.Context) { } backoff = time.Second log.T.F("follows syncer: successfully connected to %s", u) + log.I.F("follows syncer: subscribing to events from relay %s", u) - // send REQ for kind 3 (follow lists), kind 10002 (relay lists), and all events from follows + // send REQ for admin follow lists, relay lists, and all events from follows ff := &filter.S{} + // Add filter for admin follow lists (kind 3) - for immediate updates f1 := &filter.F{ - Authors: tag.NewFromBytesSlice(authors...), + Authors: tag.NewFromBytesSlice(f.admins...), Kinds: kind.NewS(kind.New(kind.FollowList.K)), Limit: values.ToUintPointer(100), } @@ -357,29 +362,46 @@ func (f *Follows) startSubscriptions(ctx context.Context) { Limit: values.ToUintPointer(1000), } *ff = append(*ff, f1, f2, f3) - req := reqenvelope.NewFrom([]byte("follows-sync"), ff) + // Use a subscription ID for event sync (no follow lists) + subID := "event-sync" + req := reqenvelope.NewFrom([]byte(subID), ff) if err = c.Write( ctx, websocket.MessageText, req.Marshal(nil), ); chk.E(err) { log.W.F( - "follows syncer: failed to send REQ to %s: %v", u, err, + "follows syncer: failed to send event REQ to %s: %v", u, err, ) _ = c.Close(websocket.StatusInternalError, "write failed") continue } log.T.F( - "follows syncer: sent REQ to %s for kind 3, 10002, and all events (last 30 days) from followed users", + "follows syncer: sent event REQ to %s for admin follow lists, kind 10002, and all events (last 30 days) from followed users", u, ) - // read loop + // read loop with keepalive + keepaliveTicker := time.NewTicker(30 * time.Second) + defer keepaliveTicker.Stop() + for { select { case <-ctx.Done(): _ = c.Close(websocket.StatusNormalClosure, "ctx done") return + case <-keepaliveTicker.C: + // Send ping to keep connection alive + if err := c.Ping(ctx); err != nil { + log.T.F("follows syncer: ping failed for %s: %v", u, err) + break + } + log.T.F("follows syncer: sent ping to %s", u) + continue default: } - _, data, err := c.Read(ctx) + + // Set a read timeout to avoid hanging + readCtx, readCancel := context.WithTimeout(ctx, 60*time.Second) + _, data, err := c.Read(readCtx) + readCancel() if err != nil { _ = c.Close(websocket.StatusNormalClosure, "read err") break @@ -402,12 +424,19 @@ func (f *Follows) startSubscriptions(ctx context.Context) { // Process events based on kind switch res.Event.Kind { case kind.FollowList.K: - log.T.F( - "follows syncer: received kind 3 (follow list) event from %s on relay %s", - hex.EncodeToString(res.Event.Pubkey), u, - ) - // Extract followed pubkeys from 'p' tags in kind 3 events - f.extractFollowedPubkeys(res.Event) + // Check if this is from an admin and process immediately + if f.isAdminPubkey(res.Event.Pubkey) { + log.I.F( + "follows syncer: received admin follow list from %s on relay %s - processing immediately", + hex.EncodeToString(res.Event.Pubkey), u, + ) + f.extractFollowedPubkeys(res.Event) + } else { + log.T.F( + "follows syncer: received follow list from non-admin %s on relay %s - ignoring", + hex.EncodeToString(res.Event.Pubkey), u, + ) + } case kind.RelayListMetadata.K: log.T.F( "follows syncer: received kind 10002 (relay list) event from %s on relay %s", @@ -445,12 +474,23 @@ func (f *Follows) startSubscriptions(ctx context.Context) { // ) } case eoseenvelope.L: - // ignore, continue subscription + log.T.F("follows syncer: received EOSE from %s, continuing persistent subscription", u) + // Continue the subscription for new events default: // ignore other labels } } - // loop reconnect + // Connection dropped, reconnect after delay + log.W.F("follows syncer: connection to %s dropped, will reconnect in 30 seconds", u) + + // Wait before reconnecting to avoid tight reconnection loops + timer := time.NewTimer(30 * time.Second) + select { + case <-ctx.Done(): + return + case <-timer.C: + // Continue to reconnect + } } }() } @@ -458,6 +498,11 @@ func (f *Follows) startSubscriptions(ctx context.Context) { func (f *Follows) Syncer() { log.I.F("starting follows syncer") + + // Start periodic follow list fetching + go f.startPeriodicFollowListFetching() + + // Start event subscriptions go func() { // start immediately if Configure already ran for { @@ -478,7 +523,7 @@ func (f *Follows) Syncer() { f.subsCancel = cancel innerCancel = cancel log.I.F("follows syncer: (re)opening subscriptions") - f.startSubscriptions(ctx) + f.startEventSubscriptions(ctx) } // small sleep to avoid tight loop if updated fires rapidly if innerCancel == nil { @@ -489,6 +534,148 @@ func (f *Follows) Syncer() { f.updated <- struct{}{} } +// startPeriodicFollowListFetching starts periodic fetching of admin follow lists +func (f *Follows) startPeriodicFollowListFetching() { + frequency := f.cfg.FollowListFrequency + if frequency == 0 { + frequency = time.Hour // Default to 1 hour + } + + log.I.F("starting periodic follow list fetching every %v", frequency) + + ticker := time.NewTicker(frequency) + defer ticker.Stop() + + // Fetch immediately on startup + f.fetchAdminFollowLists() + + for { + select { + case <-f.Ctx.Done(): + log.D.F("periodic follow list fetching stopped due to context cancellation") + return + case <-ticker.C: + f.fetchAdminFollowLists() + } + } +} + +// fetchAdminFollowLists fetches follow lists from admin relays +func (f *Follows) fetchAdminFollowLists() { + log.I.F("follows syncer: fetching admin follow lists") + + urls := f.adminRelays() + if len(urls) == 0 { + log.W.F("follows syncer: no admin relays found for follow list fetching") + return + } + + // build authors list: admins only (not follows) + f.followsMx.RLock() + authors := make([][]byte, len(f.admins)) + copy(authors, f.admins) + f.followsMx.RUnlock() + + if len(authors) == 0 { + log.W.F("follows syncer: no admins to fetch follow lists for") + return + } + + log.I.F("follows syncer: fetching follow lists from %d relays for %d admins", len(urls), len(authors)) + + for _, u := range urls { + go f.fetchFollowListsFromRelay(u, authors) + } +} + +// fetchFollowListsFromRelay fetches follow lists from a specific relay +func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) { + ctx, cancel := context.WithTimeout(f.Ctx, 30*time.Second) + defer cancel() + + // Create proper headers for the WebSocket connection + headers := http.Header{} + headers.Set("User-Agent", "ORLY-Relay/0.9.2") + headers.Set("Origin", "https://orly.dev") + + // Use proper WebSocket dial options + dialOptions := &websocket.DialOptions{ + HTTPHeader: headers, + } + + c, _, err := websocket.Dial(ctx, relayURL, dialOptions) + if err != nil { + log.W.F("follows syncer: failed to connect to %s for follow list fetch: %v", relayURL, err) + return + } + defer c.Close(websocket.StatusNormalClosure, "follow list fetch complete") + + log.I.F("follows syncer: fetching follow lists from relay %s", relayURL) + + // Create filter for follow lists only (kind 3) + ff := &filter.S{} + f1 := &filter.F{ + Authors: tag.NewFromBytesSlice(authors...), + Kinds: kind.NewS(kind.New(kind.FollowList.K)), + Limit: values.ToUintPointer(100), + } + *ff = append(*ff, f1) + + // Use a specific subscription ID for follow list fetching + subID := "follow-lists-fetch" + req := reqenvelope.NewFrom([]byte(subID), ff) + if err = c.Write(ctx, websocket.MessageText, req.Marshal(nil)); chk.E(err) { + log.W.F("follows syncer: failed to send follow list REQ to %s: %v", relayURL, err) + return + } + + log.T.F("follows syncer: sent follow list REQ to %s", relayURL) + + // Read follow list events with timeout + timeout := time.After(10 * time.Second) + for { + select { + case <-ctx.Done(): + return + case <-timeout: + log.T.F("follows syncer: timeout reading follow lists from %s", relayURL) + return + default: + } + + _, data, err := c.Read(ctx) + if err != nil { + log.T.F("follows syncer: error reading follow lists from %s: %v", relayURL, err) + return + } + + label, rem, err := envelopes.Identify(data) + if chk.E(err) { + continue + } + + switch label { + case eventenvelope.L: + res, _, err := eventenvelope.ParseResult(rem) + if chk.E(err) || res == nil || res.Event == nil { + continue + } + + // Process follow list events + if res.Event.Kind == kind.FollowList.K { + log.I.F("follows syncer: received follow list from %s on relay %s", + hex.EncodeToString(res.Event.Pubkey), relayURL) + f.extractFollowedPubkeys(res.Event) + } + case eoseenvelope.L: + log.T.F("follows syncer: end of follow list events from %s", relayURL) + return + default: + // ignore other labels + } + } +} + // GetFollowedPubkeys returns a copy of the followed pubkeys list func (f *Follows) GetFollowedPubkeys() [][]byte { f.followsMx.RLock() @@ -499,6 +686,19 @@ func (f *Follows) GetFollowedPubkeys() [][]byte { return followedPubkeys } +// isAdminPubkey checks if a pubkey belongs to an admin +func (f *Follows) isAdminPubkey(pubkey []byte) bool { + f.followsMx.RLock() + defer f.followsMx.RUnlock() + + for _, admin := range f.admins { + if utils.FastEqual(admin, pubkey) { + return true + } + } + return false +} + // extractFollowedPubkeys extracts followed pubkeys from 'p' tags in kind 3 events func (f *Follows) extractFollowedPubkeys(event *event.E) { if event.Kind != kind.FollowList.K { diff --git a/pkg/spider/spider.go b/pkg/spider/spider.go index 7a608e1..e700a38 100644 --- a/pkg/spider/spider.go +++ b/pkg/spider/spider.go @@ -23,8 +23,8 @@ import ( const ( OneTimeSpiderSyncMarker = "spider_one_time_sync_completed" SpiderLastScanMarker = "spider_last_scan_time" - // MaxWebSocketMessageSize is the maximum size for WebSocket messages to avoid 32KB limit - MaxWebSocketMessageSize = 30 * 1024 // 30KB to be safe + // MaxWebSocketMessageSize is the maximum size for WebSocket messages + MaxWebSocketMessageSize = 100 * 1024 * 1024 // 100MB // PubkeyHexSize is the size of a hex-encoded pubkey (32 bytes = 64 hex chars) PubkeyHexSize = 64 ) @@ -34,6 +34,8 @@ type Spider struct { cfg *config.C ctx context.Context cancel context.CancelFunc + // Configured relay addresses for self-detection + relayAddresses []string } func New( @@ -41,10 +43,11 @@ func New( cancel context.CancelFunc, ) *Spider { return &Spider{ - db: db, - cfg: cfg, - ctx: ctx, - cancel: cancel, + db: db, + cfg: cfg, + ctx: ctx, + cancel: cancel, + relayAddresses: cfg.RelayAddresses, } } @@ -187,6 +190,7 @@ func (s *Spider) performSync(startTime, endTime time.Time) error { // 4. Query each relay for events from followed pubkeys in the time range eventsFound := 0 for _, relayURL := range relayURLs { + log.I.F("Spider sync: fetching follow lists from relay %s", relayURL) count, err := s.queryRelayForEvents( relayURL, followedPubkeys, startTime, endTime, ) @@ -194,6 +198,7 @@ func (s *Spider) performSync(startTime, endTime time.Time) error { log.E.F("Spider sync: error querying relay %s: %v", relayURL, err) continue } + log.I.F("Spider sync: completed fetching from relay %s, found %d events", relayURL, count) eventsFound += count } @@ -263,6 +268,18 @@ func (s *Spider) discoverRelays(followedPubkeys [][]byte) ([]string, error) { if n == "" { continue } + // Skip if this relay is one of the configured relay addresses + skipRelay := false + for _, relayAddr := range s.relayAddresses { + if n == relayAddr { + log.D.F("spider: skipping configured relay address: %s", n) + skipRelay = true + break + } + } + if skipRelay { + continue + } if _, ok := seen[n]; ok { continue }