From 9da1784b1b333b5de6098e680f7e18b341b1d5f0 Mon Sep 17 00:00:00 2001 From: woikos Date: Wed, 14 Jan 2026 08:09:39 +0100 Subject: [PATCH] Add Blossom bandwidth limiting and tune rate limiters (v0.49.0) - Add token-bucket bandwidth rate limiting for Blossom uploads - ORLY_BLOSSOM_RATE_LIMIT enables limiting (default: false) - ORLY_BLOSSOM_DAILY_LIMIT_MB sets daily limit (default: 10MB) - ORLY_BLOSSOM_BURST_LIMIT_MB sets burst cap (default: 50MB) - Followed users, admins, owners are exempt (unlimited) - Change emergency mode throttling from exponential to linear scaling - Old: 16x multiplier at emergency threshold entry - New: 1x at threshold, +1x per 20% excess pressure - Reduce follows ACL throttle increment from 200ms to 25ms per event - Update dependencies Files modified: - app/blossom.go: Pass rate limit config to blossom server - app/config/config.go: Add Blossom rate limit config options - pkg/blossom/ratelimit.go: New bandwidth limiter implementation - pkg/blossom/server.go: Add rate limiter integration - pkg/blossom/handlers.go: Check rate limits on upload/mirror/media - pkg/ratelimit/limiter.go: Linear emergency throttling - pkg/acl/follows.go: Reduce default throttle increment Co-Authored-By: Claude Opus 4.5 --- app/blossom.go | 13 ++- app/config/config.go | 7 +- app/handle-cashu.go | 11 ++ app/handle-event.go | 26 +++-- app/publisher.go | 14 +++ go.mod | 8 +- go.sum | 5 +- pkg/acl/follows.go | 2 +- pkg/blossom/handlers.go | 18 ++++ pkg/blossom/ratelimit.go | 131 ++++++++++++++++++++++++ pkg/blossom/server.go | 61 +++++++++++ pkg/database/get-indexes-from-filter.go | 12 +++ pkg/database/get-serial-by-id.go | 8 ++ pkg/database/indexes/types/idhash.go | 4 +- pkg/event/routing/ephemeral.go | 2 + pkg/ratelimit/limiter.go | 41 ++++---- pkg/version/version | 2 +- 17 files changed, 321 insertions(+), 44 deletions(-) create mode 100644 pkg/blossom/ratelimit.go diff --git a/app/blossom.go b/app/blossom.go index 59cd50e..0d13552 100644 --- a/app/blossom.go +++ 
b/app/blossom.go @@ -20,8 +20,12 @@ func initializeBlossomServer( blossomCfg := &blossom.Config{ BaseURL: "", // Will be set dynamically per request MaxBlobSize: 100 * 1024 * 1024, // 100MB default - AllowedMimeTypes: nil, // Allow all MIME types by default + AllowedMimeTypes: nil, // Allow all MIME types by default RequireAuth: cfg.AuthRequired || cfg.AuthToWrite, + // Rate limiting for non-followed users + RateLimitEnabled: cfg.BlossomRateLimitEnabled, + DailyLimitMB: cfg.BlossomDailyLimitMB, + BurstLimitMB: cfg.BlossomBurstLimitMB, } // Create blossom server with relay's ACL registry @@ -31,7 +35,12 @@ func initializeBlossomServer( // We'll need to modify the handler to inject the baseURL per request // For now, we'll use a middleware approach - log.I.F("blossom server initialized with ACL mode: %s", cfg.ACLMode) + if cfg.BlossomRateLimitEnabled { + log.I.F("blossom server initialized with ACL mode: %s, rate limit: %dMB/day (burst: %dMB)", + cfg.ACLMode, cfg.BlossomDailyLimitMB, cfg.BlossomBurstLimitMB) + } else { + log.I.F("blossom server initialized with ACL mode: %s", cfg.ACLMode) + } return bs, nil } diff --git a/app/config/config.go b/app/config/config.go index e00ce2f..657dbd1 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -69,13 +69,18 @@ type C struct { // Progressive throttle for follows ACL mode - allows non-followed users to write with increasing delay FollowsThrottleEnabled bool `env:"ORLY_FOLLOWS_THROTTLE" default:"false" usage:"enable progressive delay for non-followed users in follows ACL mode"` - FollowsThrottlePerEvent time.Duration `env:"ORLY_FOLLOWS_THROTTLE_INCREMENT" default:"200ms" usage:"delay added per event for non-followed users"` + FollowsThrottlePerEvent time.Duration `env:"ORLY_FOLLOWS_THROTTLE_INCREMENT" default:"25ms" usage:"delay added per event for non-followed users"` FollowsThrottleMaxDelay time.Duration `env:"ORLY_FOLLOWS_THROTTLE_MAX" default:"60s" usage:"maximum throttle delay cap"` // Blossom blob storage 
service settings BlossomEnabled bool `env:"ORLY_BLOSSOM_ENABLED" default:"true" usage:"enable Blossom blob storage server (only works with Badger backend)"` BlossomServiceLevels string `env:"ORLY_BLOSSOM_SERVICE_LEVELS" usage:"comma-separated list of service levels in format: name:storage_mb_per_sat_per_month (e.g., basic:1,premium:10)"` + // Blossom upload rate limiting (for non-followed users) + BlossomRateLimitEnabled bool `env:"ORLY_BLOSSOM_RATE_LIMIT" default:"false" usage:"enable upload rate limiting for non-followed users"` + BlossomDailyLimitMB int64 `env:"ORLY_BLOSSOM_DAILY_LIMIT_MB" default:"10" usage:"daily upload limit in MB for non-followed users (token bucket, refilled at daily rate)"` + BlossomBurstLimitMB int64 `env:"ORLY_BLOSSOM_BURST_LIMIT_MB" default:"50" usage:"max burst upload in MB (bucket cap)"` + // Web UI and dev mode settings WebDisableEmbedded bool `env:"ORLY_WEB_DISABLE" default:"false" usage:"disable serving the embedded web UI; useful for hot-reload during development"` WebDevProxyURL string `env:"ORLY_WEB_DEV_PROXY_URL" usage:"when ORLY_WEB_DISABLE is true, reverse-proxy non-API paths to this dev server URL (e.g. http://localhost:5173)"` diff --git a/app/handle-cashu.go b/app/handle-cashu.go index da94aea..b911366 100644 --- a/app/handle-cashu.go +++ b/app/handle-cashu.go @@ -124,6 +124,17 @@ func (s *Server) handleCashuKeysets(w http.ResponseWriter, r *http.Request) { // handleCashuInfo handles GET /cashu/info - returns mint information.
func (s *Server) handleCashuInfo(w http.ResponseWriter, r *http.Request) { + // CORS headers for browser-based CAT support detection + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Accept") + + // Handle preflight + if r.Method == http.MethodOptions { + w.WriteHeader(http.StatusNoContent) + return + } + if s.CashuIssuer == nil { http.Error(w, "Cashu tokens not enabled", http.StatusNotImplemented) return diff --git a/app/handle-event.go b/app/handle-event.go index d810aed..f728c85 100644 --- a/app/handle-event.go +++ b/app/handle-event.go @@ -21,7 +21,7 @@ import ( ) func (l *Listener) HandleEvent(msg []byte) (err error) { - log.D.F("HandleEvent: START handling event: %s", msg) + log.I.F("HandleEvent: START handling event: %s", string(msg[:min(200, len(msg))])) // 1. Raw JSON validation (before unmarshal) - use validation service if result := l.eventValidator.ValidateRawJSON(msg); !result.Valid { @@ -231,6 +231,11 @@ func (l *Listener) HandleEvent(msg []byte) (err error) { // Authorization check (policy + ACL) - use authorization service decision := l.eventAuthorizer.Authorize(env.E, l.authedPubkey.Load(), l.remote, env.E.Kind) + // Debug: log ephemeral event authorization + if env.E.Kind >= 20000 && env.E.Kind < 30000 { + log.I.F("ephemeral auth check: kind %d, allowed=%v, reason=%s", + env.E.Kind, decision.Allowed, decision.DenyReason) + } if !decision.Allowed { log.D.F("HandleEvent: authorization denied: %s (requireAuth=%v)", decision.DenyReason, decision.RequireAuth) if decision.RequireAuth { @@ -256,14 +261,17 @@ func (l *Listener) HandleEvent(msg []byte) (err error) { log.I.F("HandleEvent: authorized with access level %s", decision.AccessLevel) // Progressive throttle for follows ACL mode (delays non-followed users) - if delay := l.getFollowsThrottleDelay(env.E); delay > 0 { - log.D.F("HandleEvent: applying progressive throttle 
delay of %v for %0x from %s", - delay, env.E.Pubkey, l.remote) - select { - case <-l.ctx.Done(): - return l.ctx.Err() - case <-time.After(delay): - // Delay completed, continue processing + // Skip throttle if a Cashu Access Token is present (authenticated via CAT) + if l.cashuToken == nil { + if delay := l.getFollowsThrottleDelay(env.E); delay > 0 { + log.D.F("HandleEvent: applying progressive throttle delay of %v for %0x from %s", + delay, env.E.Pubkey, l.remote) + select { + case <-l.ctx.Done(): + return l.ctx.Err() + case <-time.After(delay): + // Delay completed, continue processing + } } } diff --git a/app/publisher.go b/app/publisher.go index 2e6cbf8..947bf39 100644 --- a/app/publisher.go +++ b/app/publisher.go @@ -159,12 +159,26 @@ func (p *P) Deliver(ev *event.E) { sub Subscription } var deliveries []delivery + // Debug: log ephemeral event delivery attempts + isEphemeral := ev.Kind >= 20000 && ev.Kind < 30000 + if isEphemeral { + var tagInfo string + if ev.Tags != nil { + tagInfo = string(ev.Tags.Marshal(nil)) + } + log.I.F("ephemeral event kind %d, id %0x, checking %d connections for matches, tags: %s", + ev.Kind, ev.ID[:8], len(p.Map), tagInfo) + } for w, subs := range p.Map { for id, subscriber := range subs { if subscriber.Match(ev) { deliveries = append( deliveries, delivery{w: w, id: id, sub: subscriber}, ) + } else if isEphemeral { + // Debug: log why ephemeral events don't match + log.I.F("ephemeral event kind %d did NOT match subscription %s (filters: %s)", + ev.Kind, id, string(subscriber.S.Marshal(nil))) } } } diff --git a/go.mod b/go.mod index 4c77b9c..a32304d 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,14 @@ module next.orly.dev go 1.25.3 require ( - git.mleku.dev/mleku/nostr v1.0.12 + git.mleku.dev/mleku/nostr v1.0.13 github.com/adrg/xdg v0.5.3 github.com/alexflint/go-arg v1.6.1 github.com/aperturerobotics/go-indexeddb v0.2.3 + github.com/bits-and-blooms/bloom/v3 v3.7.1 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 
github.com/dgraph-io/badger/v4 v4.8.0 + github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/hack-pad/safejs v0.1.1 github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 @@ -22,6 +24,7 @@ require ( github.com/stretchr/testify v1.11.1 github.com/vertex-lab/nostr-sqlite v0.3.2 go-simpler.org/env v0.12.0 + go.etcd.io/bbolt v1.4.3 go.uber.org/atomic v1.11.0 golang.org/x/crypto v0.46.0 golang.org/x/lint v0.0.0-20241112194109-818c5a804067 @@ -37,7 +40,6 @@ require ( github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 // indirect github.com/alexflint/go-scalar v1.2.0 // indirect github.com/bits-and-blooms/bitset v1.24.2 // indirect - github.com/bits-and-blooms/bloom/v3 v3.7.1 // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect github.com/bytedance/sonic v1.13.1 // indirect @@ -56,7 +58,6 @@ require ( github.com/google/btree v1.1.2 // indirect github.com/google/flatbuffers v25.9.23+incompatible // indirect github.com/google/pprof v0.0.0-20251007162407-5df77e3f7d1d // indirect - github.com/google/uuid v1.6.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect @@ -72,7 +73,6 @@ require ( github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect - go.etcd.io/bbolt v1.4.3 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/otel v1.38.0 // indirect go.opentelemetry.io/otel/metric v1.38.0 // indirect diff --git a/go.sum b/go.sum index e4ec704..71f7323 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -git.mleku.dev/mleku/nostr v1.0.12 h1:bjsFUh1Q3fGpU7qsqxggGgrGGUt2OBdu1w8hjDM4gJE= -git.mleku.dev/mleku/nostr v1.0.12/go.mod h1:kJwSMmLRnAJ7QJtgXDv2wGgceFU0luwVqrgAL3MI93M= +git.mleku.dev/mleku/nostr v1.0.13 
h1:FqeOQ9ZX8AFVsAI6XisQkB6cgmhn9DNQ2a8li9gx7aY= +git.mleku.dev/mleku/nostr v1.0.13/go.mod h1:kJwSMmLRnAJ7QJtgXDv2wGgceFU0luwVqrgAL3MI93M= github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg= github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 h1:ClzzXMDDuUbWfNNZqGeYq4PnYOlwlOVIvSyNaIy0ykg= @@ -161,6 +161,7 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg= github.com/twmb/murmur3 v1.1.8/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/vertex-lab/nostr-sqlite v0.3.2 h1:8nZYYIwiKnWLA446qA/wL/Gy+bU0kuaxdLfUyfeTt/E= github.com/vertex-lab/nostr-sqlite v0.3.2/go.mod h1:5bw1wMgJhSdrumsZAWxqy+P0u1g+q02PnlGQn15dnSM= diff --git a/pkg/acl/follows.go b/pkg/acl/follows.go index bfd7851..172769f 100644 --- a/pkg/acl/follows.go +++ b/pkg/acl/follows.go @@ -138,7 +138,7 @@ func (f *Follows) Configure(cfg ...any) (err error) { if f.cfg.FollowsThrottleEnabled { perEvent := f.cfg.FollowsThrottlePerEvent if perEvent == 0 { - perEvent = 200 * time.Millisecond + perEvent = 25 * time.Millisecond } maxDelay := f.cfg.FollowsThrottleMaxDelay if maxDelay == 0 { diff --git a/pkg/blossom/handlers.go b/pkg/blossom/handlers.go index d4cc626..adabab2 100644 --- a/pkg/blossom/handlers.go +++ b/pkg/blossom/handlers.go @@ -200,6 +200,12 @@ func (s *Server) handleUpload(w http.ResponseWriter, r *http.Request) { return } + // Check bandwidth rate limit (non-followed users) + if !s.checkBandwidthLimit(pubkey, remoteAddr, int64(len(body))) { + s.setErrorResponse(w, 
http.StatusTooManyRequests, "upload rate limit exceeded, try again later") + return + } + // Calculate SHA256 after auth check sha256Hash := CalculateSHA256(body) sha256Hex := hex.Enc(sha256Hash) @@ -647,6 +653,12 @@ func (s *Server) handleMirror(w http.ResponseWriter, r *http.Request) { return } + // Check bandwidth rate limit (non-followed users) + if !s.checkBandwidthLimit(pubkey, remoteAddr, int64(len(body))) { + s.setErrorResponse(w, http.StatusTooManyRequests, "upload rate limit exceeded, try again later") + return + } + // Note: pubkey may be nil for anonymous uploads if ACL allows it // Detect MIME type from remote response @@ -726,6 +738,12 @@ func (s *Server) handleMediaUpload(w http.ResponseWriter, r *http.Request) { return } + // Check bandwidth rate limit (non-followed users) + if !s.checkBandwidthLimit(pubkey, remoteAddr, int64(len(body))) { + s.setErrorResponse(w, http.StatusTooManyRequests, "upload rate limit exceeded, try again later") + return + } + // Note: pubkey may be nil for anonymous uploads if ACL allows it // Optimize media (placeholder - actual optimization would be implemented here) diff --git a/pkg/blossom/ratelimit.go b/pkg/blossom/ratelimit.go new file mode 100644 index 0000000..b9a8feb --- /dev/null +++ b/pkg/blossom/ratelimit.go @@ -0,0 +1,131 @@ +package blossom + +import ( + "sync" + "time" +) + +// BandwidthState tracks upload bandwidth for an identity +type BandwidthState struct { + BucketBytes int64 // Current token bucket level (bytes available) + LastUpdate time.Time // Last time bucket was updated +} + +// BandwidthLimiter implements token bucket rate limiting for uploads. +// Each identity gets a bucket that replenishes at dailyLimit/day rate. +// Uploads consume tokens from the bucket. 
+type BandwidthLimiter struct { + mu sync.Mutex + states map[string]*BandwidthState // keyed by pubkey hex or IP + dailyLimit int64 // bytes per day + burstLimit int64 // max bucket size (burst capacity) + refillRate float64 // bytes per second refill rate +} + +// NewBandwidthLimiter creates a new bandwidth limiter. +// dailyLimitMB is the average daily limit in megabytes. +// burstLimitMB is the maximum burst capacity in megabytes. +func NewBandwidthLimiter(dailyLimitMB, burstLimitMB int64) *BandwidthLimiter { + dailyBytes := dailyLimitMB * 1024 * 1024 + burstBytes := burstLimitMB * 1024 * 1024 + + return &BandwidthLimiter{ + states: make(map[string]*BandwidthState), + dailyLimit: dailyBytes, + burstLimit: burstBytes, + refillRate: float64(dailyBytes) / 86400.0, // bytes per second + } +} + +// CheckAndConsume checks if an upload of the given size is allowed for the identity, +// and if so, consumes the tokens. Returns true if allowed, false if rate limited. +// The identity should be pubkey hex for authenticated users, or IP for anonymous. +func (bl *BandwidthLimiter) CheckAndConsume(identity string, sizeBytes int64) bool { + bl.mu.Lock() + defer bl.mu.Unlock() + + now := time.Now() + state, exists := bl.states[identity] + + if !exists { + // New identity starts with full burst capacity + state = &BandwidthState{ + BucketBytes: bl.burstLimit, + LastUpdate: now, + } + bl.states[identity] = state + } else { + // Refill bucket based on elapsed time + elapsed := now.Sub(state.LastUpdate).Seconds() + refill := int64(elapsed * bl.refillRate) + state.BucketBytes += refill + if state.BucketBytes > bl.burstLimit { + state.BucketBytes = bl.burstLimit + } + state.LastUpdate = now + } + + // Check if upload fits in bucket + if state.BucketBytes >= sizeBytes { + state.BucketBytes -= sizeBytes + return true + } + + return false +} + +// GetAvailable returns the currently available bytes for an identity. 
+func (bl *BandwidthLimiter) GetAvailable(identity string) int64 { + bl.mu.Lock() + defer bl.mu.Unlock() + + state, exists := bl.states[identity] + if !exists { + return bl.burstLimit // New users have full capacity + } + + // Calculate current level with refill + now := time.Now() + elapsed := now.Sub(state.LastUpdate).Seconds() + refill := int64(elapsed * bl.refillRate) + available := state.BucketBytes + refill + if available > bl.burstLimit { + available = bl.burstLimit + } + + return available +} + +// GetTimeUntilAvailable returns how long until the given bytes will be available. +func (bl *BandwidthLimiter) GetTimeUntilAvailable(identity string, sizeBytes int64) time.Duration { + available := bl.GetAvailable(identity) + if available >= sizeBytes { + return 0 + } + + needed := sizeBytes - available + seconds := float64(needed) / bl.refillRate + return time.Duration(seconds * float64(time.Second)) +} + +// Cleanup removes entries that have fully replenished (at burst limit). +func (bl *BandwidthLimiter) Cleanup() { + bl.mu.Lock() + defer bl.mu.Unlock() + + now := time.Now() + for key, state := range bl.states { + elapsed := now.Sub(state.LastUpdate).Seconds() + refill := int64(elapsed * bl.refillRate) + if state.BucketBytes+refill >= bl.burstLimit { + delete(bl.states, key) + } + } +} + +// Stats returns the number of tracked identities. 
+func (bl *BandwidthLimiter) Stats() int { + bl.mu.Lock() + defer bl.mu.Unlock() + return len(bl.states) +} diff --git a/pkg/blossom/server.go b/pkg/blossom/server.go index b722f7d..6172367 100644 --- a/pkg/blossom/server.go +++ b/pkg/blossom/server.go @@ -19,6 +19,9 @@ type Server struct { maxBlobSize int64 allowedMimeTypes map[string]bool requireAuth bool + + // Rate limiting for uploads + bandwidthLimiter *BandwidthLimiter } // Config holds configuration for the Blossom server @@ -27,6 +30,11 @@ type Config struct { MaxBlobSize int64 AllowedMimeTypes []string RequireAuth bool + + // Rate limiting (for non-followed users) + RateLimitEnabled bool + DailyLimitMB int64 + BurstLimitMB int64 } // NewServer creates a new Blossom server instance @@ -48,6 +56,20 @@ func NewServer(db *database.D, aclRegistry *acl.S, cfg *Config) *Server { } } + // Initialize bandwidth limiter if enabled + var bwLimiter *BandwidthLimiter + if cfg.RateLimitEnabled { + dailyMB := cfg.DailyLimitMB + if dailyMB <= 0 { + dailyMB = 10 // 10MB default + } + burstMB := cfg.BurstLimitMB + if burstMB <= 0 { + burstMB = 50 // 50MB default burst + } + bwLimiter = NewBandwidthLimiter(dailyMB, burstMB) + } + return &Server{ db: db, storage: storage, @@ -56,6 +78,7 @@ func NewServer(db *database.D, aclRegistry *acl.S, cfg *Config) *Server { maxBlobSize: cfg.MaxBlobSize, allowedMimeTypes: allowedMap, requireAuth: cfg.RequireAuth, + bandwidthLimiter: bwLimiter, } } @@ -208,6 +231,44 @@ func (s *Server) checkACL( return actual >= required } +// isRateLimitExempt returns true if the user is exempt from rate limiting. +// Users with write access or higher (followed users, admins, owners) are exempt. 
+func (s *Server) isRateLimitExempt(pubkey []byte, remoteAddr string) bool { + if s.acl == nil { + return true // No ACL configured, no rate limiting + } + + level := s.acl.GetAccessLevel(pubkey, remoteAddr) + + // Followed users get "write" level, admins/owners get higher + // Only "read" and "none" are rate limited + return level == "write" || level == "admin" || level == "owner" +} + +// checkBandwidthLimit checks if the upload is allowed under rate limits. +// Returns true if allowed, false if rate limited. +// Exempt users (followed, admin, owner) always return true. +func (s *Server) checkBandwidthLimit(pubkey []byte, remoteAddr string, sizeBytes int64) bool { + if s.bandwidthLimiter == nil { + return true // No rate limiting configured + } + + // Check if user is exempt + if s.isRateLimitExempt(pubkey, remoteAddr) { + return true + } + + // Key by pubkey if available, otherwise by IP + var identity string + if len(pubkey) > 0 { + identity = string(pubkey) // raw pubkey bytes used directly as the limiter map key + } else { + identity = remoteAddr + } + + return s.bandwidthLimiter.CheckAndConsume(identity, sizeBytes) +} + // BaseURLKey is the context key for the base URL (exported for use by app handler) type BaseURLKey struct{} diff --git a/pkg/database/get-indexes-from-filter.go b/pkg/database/get-indexes-from-filter.go index 04d59c2..8b2a77d 100644 --- a/pkg/database/get-indexes-from-filter.go +++ b/pkg/database/get-indexes-from-filter.go @@ -6,6 +6,7 @@ import ( "sort" "lol.mleku.dev/chk" + "lol.mleku.dev/errorf" "lol.mleku.dev/log" "next.orly.dev/pkg/database/indexes" types2 "next.orly.dev/pkg/database/indexes/types" @@ -44,6 +45,12 @@ func NormalizeTagValueForHash(key byte, valueBytes []byte) []byte { func CreateIdHashFromData(data []byte) (i *types2.IdHash, err error) { i = new(types2.IdHash) + // Reject empty data with an explicit error instead of failing later + if len(data) == 0 { + err = errorf.E("CreateIdHashFromData: empty ID provided") + return + } + // If data looks like hex string and has
the right length for hex-encoded // sha256 if len(data) == 64 { @@ -95,6 +102,11 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) { // should be an error, but convention just ignores it. if f.Ids.Len() > 0 { for _, id := range f.Ids.T { + // Skip empty IDs - some filters have empty ID values + if len(id) == 0 { + log.D.F("GetIndexesFromFilter: skipping empty ID in filter (ids=%d)", f.Ids.Len()) + continue + } if err = func() (err error) { var i *types2.IdHash if i, err = CreateIdHashFromData(id); chk.E(err) { diff --git a/pkg/database/get-serial-by-id.go b/pkg/database/get-serial-by-id.go index c83888a..cc69079 100644 --- a/pkg/database/get-serial-by-id.go +++ b/pkg/database/get-serial-by-id.go @@ -20,6 +20,10 @@ import ( func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) { // log.T.F("GetSerialById: input id=%s", hex.Enc(id)) + if len(id) == 0 { + err = errorf.E("GetSerialById: called with empty ID") + return + } var idxs []Range if idxs, err = GetIndexesFromFilter(&filter.F{Ids: tag.NewFromBytesSlice(id)}); chk.E(err) { return @@ -102,6 +106,10 @@ func (d *D) GetSerialsByIdsWithFilter( // Process each ID sequentially for _, id := range ids.T { + // Skip empty IDs + if len(id) == 0 { + continue + } // idHex := hex.Enc(id) // Get the index prefix for this ID diff --git a/pkg/database/indexes/types/idhash.go b/pkg/database/indexes/types/idhash.go index bc8f868..a20ca72 100644 --- a/pkg/database/indexes/types/idhash.go +++ b/pkg/database/indexes/types/idhash.go @@ -24,8 +24,8 @@ func (i *IdHash) Set(idh []byte) { func (i *IdHash) FromId(id []byte) (err error) { if len(id) != sha256.Size { err = errorf.E( - "FromId: invalid ID length, got %d require %d", len(id), - sha256.Size, + "FromId: invalid ID length, got %d require %d (data=%x)", len(id), + sha256.Size, id, ) return } diff --git a/pkg/event/routing/ephemeral.go b/pkg/event/routing/ephemeral.go index d8dca7c..9b9a855 100644 --- a/pkg/event/routing/ephemeral.go +++ 
b/pkg/event/routing/ephemeral.go @@ -3,6 +3,7 @@ package routing import ( "git.mleku.dev/mleku/nostr/encoders/event" "git.mleku.dev/mleku/nostr/encoders/kind" + "lol.mleku.dev/log" ) // Publisher abstracts event delivery to subscribers. @@ -22,6 +23,7 @@ func IsEphemeral(k uint16) bool { // - Are immediately delivered to subscribers func MakeEphemeralHandler(publisher Publisher) Handler { return func(ev *event.E, authedPubkey []byte) Result { + log.I.F("ephemeral handler received event kind %d, id %0x", ev.Kind, ev.ID[:8]) // Clone and deliver immediately without persistence cloned := ev.Clone() go publisher.Deliver(cloned) diff --git a/pkg/ratelimit/limiter.go b/pkg/ratelimit/limiter.go index 6ef9511..929592e 100644 --- a/pkg/ratelimit/limiter.go +++ b/pkg/ratelimit/limiter.go @@ -377,29 +377,26 @@ func (l *Limiter) ComputeDelay(opType OperationType) time.Duration { // In emergency mode, apply progressive throttling for writes if inEmergency { - // Calculate how far above recovery threshold we are - // At emergency threshold, add 1x normal delay - // For every additional 10% above emergency, double the delay - excessPressure := metrics.MemoryPressure - l.config.RecoveryThreshold - if excessPressure > 0 { - // Progressive multiplier: starts at 2x, doubles every 10% excess - multiplier := 2.0 - for excess := excessPressure; excess > 0.1; excess -= 0.1 { - multiplier *= 2 - } - - emergencyDelaySec := delaySec * multiplier - maxEmergencySec := float64(l.config.EmergencyMaxDelayMs) / 1000.0 - - if emergencyDelaySec > maxEmergencySec { - emergencyDelaySec = maxEmergencySec - } - // Minimum emergency delay of 100ms to allow other operations - if emergencyDelaySec < 0.1 { - emergencyDelaySec = 0.1 - } - delaySec = emergencyDelaySec + // Calculate how far above emergency threshold we are + // Linear scaling: multiplier = 1 + (excess * 5) + // At emergency threshold: 1x, at +20% above: 2x, at +40% above: 3x + excessPressure := metrics.MemoryPressure - 
l.config.EmergencyThreshold + if excessPressure < 0 { + excessPressure = 0 } + multiplier := 1.0 + excessPressure*5.0 + + emergencyDelaySec := delaySec * multiplier + maxEmergencySec := float64(l.config.EmergencyMaxDelayMs) / 1000.0 + + if emergencyDelaySec > maxEmergencySec { + emergencyDelaySec = maxEmergencySec + } + // Minimum emergency delay of 100ms to allow other operations + if emergencyDelaySec < 0.1 { + emergencyDelaySec = 0.1 + } + delaySec = emergencyDelaySec } if delaySec > 0 { diff --git a/pkg/version/version b/pkg/version/version index 504769c..9dc0e18 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.48.15 +v0.49.0