diff --git a/app/config/config.go b/app/config/config.go index 7d4c3b6..6ef9332 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -31,12 +31,15 @@ type C struct { EnableShutdown bool `env:"ORLY_ENABLE_SHUTDOWN" default:"false" usage:"if true, expose /shutdown on the health port to gracefully stop the process (for profiling)"` LogLevel string `env:"ORLY_LOG_LEVEL" default:"info" usage:"relay log level: fatal error warn info debug trace"` DBLogLevel string `env:"ORLY_DB_LOG_LEVEL" default:"info" usage:"database log level: fatal error warn info debug trace"` + DBBlockCacheMB int `env:"ORLY_DB_BLOCK_CACHE_MB" default:"512" usage:"Badger block cache size in MB (higher improves read hit ratio)"` + DBIndexCacheMB int `env:"ORLY_DB_INDEX_CACHE_MB" default:"256" usage:"Badger index cache size in MB (improves index lookup performance)"` LogToStdout bool `env:"ORLY_LOG_TO_STDOUT" default:"false" usage:"log to stdout instead of stderr"` Pprof string `env:"ORLY_PPROF" usage:"enable pprof in modes: cpu,memory,allocation,heap,block,goroutine,threadcreate,mutex"` PprofPath string `env:"ORLY_PPROF_PATH" usage:"optional directory to write pprof profiles into (inside container); default is temporary dir"` PprofHTTP bool `env:"ORLY_PPROF_HTTP" default:"false" usage:"if true, expose net/http/pprof on port 6060"` OpenPprofWeb bool `env:"ORLY_OPEN_PPROF_WEB" default:"false" usage:"if true, automatically open the pprof web viewer when profiling is enabled"` IPWhitelist []string `env:"ORLY_IP_WHITELIST" usage:"comma-separated list of IP addresses to allow access from, matches on prefixes to allow private subnets, eg 10.0.0 = 10.0.0.0/8"` + IPBlacklist []string `env:"ORLY_IP_BLACKLIST" usage:"comma-separated list of IP addresses to block; matches on prefixes to allow subnets, e.g. 192.168 = 192.168.0.0/16"` Admins []string `env:"ORLY_ADMINS" usage:"comma-separated list of admin npubs"` Owners []string `env:"ORLY_OWNERS" usage:"comma-separated list of owner npubs, who have full control of the relay for wipe and restart and other functions"` ACLMode string `env:"ORLY_ACL_MODE" usage:"ACL mode: follows, managed (nip-86), none" default:"none"` diff --git a/app/main.go b/app/main.go index 5e6f5f3..56eb661 100644 --- a/app/main.go +++ b/app/main.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "net/http" + "sync" + "time" "lol.mleku.dev/chk" "lol.mleku.dev/log" @@ -18,11 +20,14 @@ import ( func Run( ctx context.Context, cfg *config.C, db *database.D, ) (quit chan struct{}) { + quit = make(chan struct{}) + var once sync.Once + // shutdown handler go func() { <-ctx.Done() log.I.F("shutting down") - close(quit) + once.Do(func() { close(quit) }) }() // get the admins var err error @@ -112,9 +117,37 @@ func Run( } addr := fmt.Sprintf("%s:%d", cfg.Listen, cfg.Port) log.I.F("starting listener on http://%s", addr) + + // Create HTTP server for graceful shutdown + srv := &http.Server{ + Addr: addr, + Handler: l, + } + go func() { - chk.E(http.ListenAndServe(addr, l)) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.E.F("HTTP server error: %v", err) + } }() - quit = make(chan struct{}) + + // Graceful shutdown handler + go func() { + <-ctx.Done() + log.I.F("shutting down HTTP server gracefully") + + // Create shutdown context with timeout + shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second) + defer cancelShutdown() + + // Shutdown the server gracefully + if err := srv.Shutdown(shutdownCtx); err != nil { + log.E.F("HTTP server shutdown error: %v", err) + } else { + log.I.F("HTTP server shutdown completed") + } + + once.Do(func() { close(quit) }) + }() + return } diff --git a/app/server.go b/app/server.go index 2f23ede..878714a 100644 --- a/app/server.go +++ b/app/server.go @@ -54,6 +54,16 @@ func (s *Server) isIPBlacklisted(remote string) bool { // Extract IP from remote address (e.g., "192.168.1.1:12345" -> "192.168.1.1") remoteIP := strings.Split(remote, ":")[0] + // Check static IP blacklist from config first + if len(s.Config.IPBlacklist) > 0 { + for _, blocked := range s.Config.IPBlacklist { + // Allow simple prefix matching for subnets (e.g., "192.168" matches 192.168.0.0/16) + if blocked != "" && strings.HasPrefix(remoteIP, blocked) { + return true + } + } + } + // Check if managed ACL is available and active if s.Config.ACLMode == "managed" { for _, aclInstance := range acl.Registry.ACL { diff --git a/main.go b/main.go index 9f7c65e..e422295 100644 --- a/main.go +++ b/main.go @@ -282,11 +282,14 @@ func main() { select { case <-sigs: fmt.Printf("\r") - cancel() + log.I.F("received shutdown signal, starting graceful shutdown") + cancel() // This will trigger HTTP server shutdown + <-quit // Wait for HTTP server to shut down chk.E(db.Close()) log.I.F("exiting") return case <-quit: + log.I.F("application quit signal received") cancel() chk.E(db.Close()) log.I.F("exiting") diff --git a/pkg/acl/follows.go b/pkg/acl/follows.go index 3bab5f1..cb7b762 100644 --- a/pkg/acl/follows.go +++ b/pkg/acl/follows.go @@ -178,6 +178,33 @@ func (f *Follows) adminRelays() (urls []string) { copy(admins, f.admins) f.followsMx.RUnlock() seen := make(map[string]struct{}) + // Build a set of normalized self relay addresses to avoid self-connections + selfSet := make(map[string]struct{}) + selfHosts := make(map[string]struct{}) + if f.cfg != nil && len(f.cfg.RelayAddresses) > 0 { + for _, a := range f.cfg.RelayAddresses { + n := string(normalize.URL(a)) + if n == "" { + continue + } + selfSet[n] = struct{}{} + // Also record hostname (without port) for robust matching + // Accept simple splitting; normalize.URL ensures scheme://host[:port] + host := n + if i := strings.Index(host, "://"); i >= 0 { + host = host[i+3:] + } + if j := strings.Index(host, "/"); j >= 0 { + host = host[:j] + } + if k := strings.Index(host, ":"); k >= 0 { + host = host[:k] + } + if host != "" { + selfHosts[host] = struct{}{} + } + } + } // First, try to get relay URLs from admin kind 10002 events for _, adm := range admins { @@ -208,6 +235,26 @@ func (f *Follows) adminRelays() (urls []string) { if n == "" { continue } + // Skip if this URL is one of our configured self relay addresses or hosts + if _, isSelf := selfSet[n]; isSelf { + log.D.F("follows syncer: skipping configured self relay address: %s", n) + continue + } + // Host match + host := n + if i := strings.Index(host, "://"); i >= 0 { + host = host[i+3:] + } + if j := strings.Index(host, "/"); j >= 0 { + host = host[:j] + } + if k := strings.Index(host, ":"); k >= 0 { + host = host[:k] + } + if _, isSelfHost := selfHosts[host]; isSelfHost { + log.D.F("follows syncer: skipping configured self relay address: %s", n) + continue + } if _, ok := seen[n]; ok { continue } @@ -228,6 +275,26 @@ func (f *Follows) adminRelays() (urls []string) { log.W.F("invalid bootstrap relay URL: %s", relay) continue } + // Skip if this URL is one of our configured self relay addresses or hosts + if _, isSelf := selfSet[n]; isSelf { + log.D.F("follows syncer: skipping configured self relay address: %s", n) + continue + } + // Host match + host := n + if i := strings.Index(host, "://"); i >= 0 { + host = host[i+3:] + } + if j := strings.Index(host, "/"); j >= 0 { + host = host[:j] + } + if k := strings.Index(host, ":"); k >= 0 { + host = host[:k] + } + if _, isSelfHost := selfHosts[host]; isSelfHost { + log.D.F("follows syncer: skipping configured self relay address: %s", n) + continue + } if _, ok := seen[n]; ok { continue } @@ -359,14 +426,16 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) { f3 := &filter.F{ Authors: tag.NewFromBytesSlice(authors...), Since: oneMonthAgo, - Limit: values.ToUintPointer(1000), + Limit: values.ToUintPointer(500), } *ff = append(*ff, f1, f2, f3) // Use a subscription ID for event sync (no follow lists) subID := "event-sync" req := reqenvelope.NewFrom([]byte(subID), ff) + reqBytes := req.Marshal(nil) + log.T.F("follows syncer: outbound REQ to %s: %s", u, string(reqBytes)) if err = c.Write( - ctx, websocket.MessageText, req.Marshal(nil), + ctx, websocket.MessageText, reqBytes, ); chk.E(err) { log.W.F( "follows syncer: failed to send event REQ to %s: %v", u, err, @@ -623,7 +692,9 @@ func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) { // Use a specific subscription ID for follow list fetching subID := "follow-lists-fetch" req := reqenvelope.NewFrom([]byte(subID), ff) - if err = c.Write(ctx, websocket.MessageText, req.Marshal(nil)); chk.E(err) { + reqBytes := req.Marshal(nil) + log.T.F("follows syncer: outbound REQ to %s: %s", relayURL, string(reqBytes)) + if err = c.Write(ctx, websocket.MessageText, reqBytes); chk.E(err) { log.W.F("follows syncer: failed to send follow list REQ to %s: %v", relayURL, err) return } diff --git a/pkg/database/database.go b/pkg/database/database.go index 5ff2bb6..f99d550 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -5,6 +5,7 @@ import ( "errors" "os" "path/filepath" + "strconv" "time" "github.com/dgraph-io/badger/v4" @@ -52,18 +53,29 @@ func New( } opts := badger.DefaultOptions(d.dataDir) - // Use sane defaults to avoid excessive memory usage during startup. - // Badger's default BlockSize is small (e.g., 4KB). Overriding it to very large values - // can cause massive allocations and OOM panics during deployments. - // Set BlockCacheSize to a moderate value and keep BlockSize small. - opts.BlockCacheSize = int64(256 * units.Mb) // 256 MB cache - opts.BlockSize = 4 * units.Kb // 4 KB block size + // Configure caches based on environment to better match workload. + // Defaults aim for higher hit ratios under read-heavy workloads while remaining safe. + var blockCacheMB = 512 // default 512 MB + var indexCacheMB = 256 // default 256 MB + if v := os.Getenv("ORLY_DB_BLOCK_CACHE_MB"); v != "" { + if n, perr := strconv.Atoi(v); perr == nil && n > 0 { + blockCacheMB = n + } + } + if v := os.Getenv("ORLY_DB_INDEX_CACHE_MB"); v != "" { + if n, perr := strconv.Atoi(v); perr == nil && n > 0 { + indexCacheMB = n + } + } + opts.BlockCacheSize = int64(blockCacheMB * units.Mb) + opts.IndexCacheSize = int64(indexCacheMB * units.Mb) + opts.BlockSize = 4 * units.Kb // 4 KB block size // Prevent huge allocations during table building and memtable flush. // Badger's TableBuilder buffer is sized by BaseTableSize; ensure it's small. - opts.BaseTableSize = 64 * units.Mb // 64 MB per table (default ~2MB, increased for fewer files but safe) - opts.MemTableSize = 64 * units.Mb // 64 MB memtable to match table size + opts.BaseTableSize = 64 * units.Mb // 64 MB per table (default ~2MB, increased for fewer files but safe) + opts.MemTableSize = 64 * units.Mb // 64 MB memtable to match table size // Keep value log files to a moderate size as well - opts.ValueLogFileSize = 256 * units.Mb // 256 MB value log files + opts.ValueLogFileSize = 256 * units.Mb // 256 MB value log files opts.CompactL0OnClose = true opts.LmaxCompaction = true opts.Compression = options.None diff --git a/pkg/protocol/ws/client.go b/pkg/protocol/ws/client.go index 38da1d5..5fd9706 100644 --- a/pkg/protocol/ws/client.go +++ b/pkg/protocol/ws/client.go @@ -263,6 +263,7 @@ func (r *Client) ConnectWithTLS( case wr := <-r.writeQueue: // all write requests will go through this to prevent races // log.D.F("{%s} sending %v\n", r.URL, string(wr.msg)) + log.T.F("WS.Client: outbound message to %s: %s", r.URL, string(wr.msg)) if err = r.Connection.WriteMessage( r.connectionContext, wr.msg, ); err != nil { diff --git a/pkg/protocol/ws/subscription.go b/pkg/protocol/ws/subscription.go index 3527934..4ac1912 100644 --- a/pkg/protocol/ws/subscription.go +++ b/pkg/protocol/ws/subscription.go @@ -171,6 +171,7 @@ func (sub *Subscription) Close() { if sub.Client.IsConnected() { closeMsg := closeenvelope.NewFrom(sub.id) closeb := closeMsg.Marshal(nil) + log.T.F("WS.Subscription.Close: outbound CLOSE to %s: %s", sub.Client.URL, string(closeb)) <-sub.Client.Write(closeb) } } @@ -191,6 +192,7 @@ func (sub *Subscription) Fire() (err error) { "WS.Subscription.Fire: sending REQ id=%s filters=%d bytes=%d", sub.GetID(), len(*sub.Filters), len(reqb), ) + log.T.F("WS.Subscription.Fire: outbound REQ to %s: %s", sub.Client.URL, string(reqb)) if err = <-sub.Client.Write(reqb); err != nil { err = fmt.Errorf("failed to write: %w", err) log.T.F( diff --git a/pkg/spider/spider.go b/pkg/spider/spider.go index e700a38..8f3eacb 100644 --- a/pkg/spider/spider.go +++ b/pkg/spider/spider.go @@ -357,7 +357,7 @@ func (s *Spider) queryRelayForEvents( Authors: tag.NewFromBytesSlice(chunk...), Since: timestamp.FromUnix(startTime.Unix()), Until: timestamp.FromUnix(endTime.Unix()), - Limit: func() *uint { l := uint(1000); return &l }(), // Limit to avoid overwhelming + Limit: func() *uint { l := uint(500); return &l }(), // Limit to avoid overwhelming } // Subscribe to get events for this chunk