Compare commits

...

2 Commits

Author SHA1 Message Date
woikos
a0af5bb45e Fix Neo4j query returning zero events for REQ filters (v0.49.1)
Some checks are pending
Go / build-and-release (push) Waiting to run
- Fix zero-value timestamp filter bug: since/until with value 0 were
  being added as WHERE clauses, causing queries to match no events
- Fix event parsing: use direct slice assignment instead of copy() on
  nil slices for ID, Pubkey, and Sig fields

Files modified:
- pkg/neo4j/query-events.go: Fix buildCypherQuery and parseEventsFromResult

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 10:56:31 +01:00
woikos
9da1784b1b Add Blossom bandwidth limiting and tune rate limiters (v0.49.0)
Some checks are pending
Go / build-and-release (push) Waiting to run
- Add token-bucket bandwidth rate limiting for Blossom uploads
  - ORLY_BLOSSOM_RATE_LIMIT enables limiting (default: false)
  - ORLY_BLOSSOM_DAILY_LIMIT_MB sets daily limit (default: 10MB)
  - ORLY_BLOSSOM_BURST_LIMIT_MB sets burst cap (default: 50MB)
  - Followed users, admins, owners are exempt (unlimited)
- Change emergency mode throttling from exponential to linear scaling
  - Old: 16x multiplier at emergency threshold entry
  - New: 1x at threshold, +1x per 20% excess pressure
- Reduce follows ACL throttle increment from 200ms to 25ms per event
- Update dependencies

Files modified:
- app/blossom.go: Pass rate limit config to blossom server
- app/config/config.go: Add Blossom rate limit config options
- pkg/blossom/ratelimit.go: New bandwidth limiter implementation
- pkg/blossom/server.go: Add rate limiter integration
- pkg/blossom/handlers.go: Check rate limits on upload/mirror/media
- pkg/ratelimit/limiter.go: Linear emergency throttling
- pkg/acl/follows.go: Reduce default throttle increment

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 08:09:39 +01:00
18 changed files with 329 additions and 52 deletions

View File

@@ -20,8 +20,12 @@ func initializeBlossomServer(
blossomCfg := &blossom.Config{
BaseURL: "", // Will be set dynamically per request
MaxBlobSize: 100 * 1024 * 1024, // 100MB default
AllowedMimeTypes: nil, // Allow all MIME types by default
AllowedMimeTypes: nil, // Allow all MIME types by default
RequireAuth: cfg.AuthRequired || cfg.AuthToWrite,
// Rate limiting for non-followed users
RateLimitEnabled: cfg.BlossomRateLimitEnabled,
DailyLimitMB: cfg.BlossomDailyLimitMB,
BurstLimitMB: cfg.BlossomBurstLimitMB,
}
// Create blossom server with relay's ACL registry
@@ -31,7 +35,12 @@ func initializeBlossomServer(
// We'll need to modify the handler to inject the baseURL per request
// For now, we'll use a middleware approach
log.I.F("blossom server initialized with ACL mode: %s", cfg.ACLMode)
if cfg.BlossomRateLimitEnabled {
log.I.F("blossom server initialized with ACL mode: %s, rate limit: %dMB/day (burst: %dMB)",
cfg.ACLMode, cfg.BlossomDailyLimitMB, cfg.BlossomBurstLimitMB)
} else {
log.I.F("blossom server initialized with ACL mode: %s", cfg.ACLMode)
}
return bs, nil
}

View File

@@ -69,13 +69,18 @@ type C struct {
// Progressive throttle for follows ACL mode - allows non-followed users to write with increasing delay
FollowsThrottleEnabled bool `env:"ORLY_FOLLOWS_THROTTLE" default:"false" usage:"enable progressive delay for non-followed users in follows ACL mode"`
FollowsThrottlePerEvent time.Duration `env:"ORLY_FOLLOWS_THROTTLE_INCREMENT" default:"200ms" usage:"delay added per event for non-followed users"`
FollowsThrottlePerEvent time.Duration `env:"ORLY_FOLLOWS_THROTTLE_INCREMENT" default:"25ms" usage:"delay added per event for non-followed users"`
FollowsThrottleMaxDelay time.Duration `env:"ORLY_FOLLOWS_THROTTLE_MAX" default:"60s" usage:"maximum throttle delay cap"`
// Blossom blob storage service settings
BlossomEnabled bool `env:"ORLY_BLOSSOM_ENABLED" default:"true" usage:"enable Blossom blob storage server (only works with Badger backend)"`
BlossomServiceLevels string `env:"ORLY_BLOSSOM_SERVICE_LEVELS" usage:"comma-separated list of service levels in format: name:storage_mb_per_sat_per_month (e.g., basic:1,premium:10)"`
// Blossom upload rate limiting (for non-followed users)
BlossomRateLimitEnabled bool `env:"ORLY_BLOSSOM_RATE_LIMIT" default:"false" usage:"enable upload rate limiting for non-followed users"`
BlossomDailyLimitMB int64 `env:"ORLY_BLOSSOM_DAILY_LIMIT_MB" default:"10" usage:"daily upload limit in MB for non-followed users (EMA averaged)"`
BlossomBurstLimitMB int64 `env:"ORLY_BLOSSOM_BURST_LIMIT_MB" default:"50" usage:"max burst upload in MB (bucket cap)"`
// Web UI and dev mode settings
WebDisableEmbedded bool `env:"ORLY_WEB_DISABLE" default:"false" usage:"disable serving the embedded web UI; useful for hot-reload during development"`
WebDevProxyURL string `env:"ORLY_WEB_DEV_PROXY_URL" usage:"when ORLY_WEB_DISABLE is true, reverse-proxy non-API paths to this dev server URL (e.g. http://localhost:5173)"`

View File

@@ -124,6 +124,17 @@ func (s *Server) handleCashuKeysets(w http.ResponseWriter, r *http.Request) {
// handleCashuInfo handles GET /cashu/info - returns mint information.
func (s *Server) handleCashuInfo(w http.ResponseWriter, r *http.Request) {
// CORS headers for browser-based CAT support detection
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Accept")
// Handle preflight
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)
return
}
if s.CashuIssuer == nil {
http.Error(w, "Cashu tokens not enabled", http.StatusNotImplemented)
return

View File

@@ -21,7 +21,7 @@ import (
)
func (l *Listener) HandleEvent(msg []byte) (err error) {
log.D.F("HandleEvent: START handling event: %s", msg)
log.I.F("HandleEvent: START handling event: %s", string(msg[:min(200, len(msg))]))
// 1. Raw JSON validation (before unmarshal) - use validation service
if result := l.eventValidator.ValidateRawJSON(msg); !result.Valid {
@@ -231,6 +231,11 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
// Authorization check (policy + ACL) - use authorization service
decision := l.eventAuthorizer.Authorize(env.E, l.authedPubkey.Load(), l.remote, env.E.Kind)
// Debug: log ephemeral event authorization
if env.E.Kind >= 20000 && env.E.Kind < 30000 {
log.I.F("ephemeral auth check: kind %d, allowed=%v, reason=%s",
env.E.Kind, decision.Allowed, decision.DenyReason)
}
if !decision.Allowed {
log.D.F("HandleEvent: authorization denied: %s (requireAuth=%v)", decision.DenyReason, decision.RequireAuth)
if decision.RequireAuth {
@@ -256,14 +261,17 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
log.I.F("HandleEvent: authorized with access level %s", decision.AccessLevel)
// Progressive throttle for follows ACL mode (delays non-followed users)
if delay := l.getFollowsThrottleDelay(env.E); delay > 0 {
log.D.F("HandleEvent: applying progressive throttle delay of %v for %0x from %s",
delay, env.E.Pubkey, l.remote)
select {
case <-l.ctx.Done():
return l.ctx.Err()
case <-time.After(delay):
// Delay completed, continue processing
// Skip throttle if a Cashu Access Token is present (authenticated via CAT)
if l.cashuToken == nil {
if delay := l.getFollowsThrottleDelay(env.E); delay > 0 {
log.D.F("HandleEvent: applying progressive throttle delay of %v for %0x from %s",
delay, env.E.Pubkey, l.remote)
select {
case <-l.ctx.Done():
return l.ctx.Err()
case <-time.After(delay):
// Delay completed, continue processing
}
}
}

View File

@@ -159,12 +159,26 @@ func (p *P) Deliver(ev *event.E) {
sub Subscription
}
var deliveries []delivery
// Debug: log ephemeral event delivery attempts
isEphemeral := ev.Kind >= 20000 && ev.Kind < 30000
if isEphemeral {
var tagInfo string
if ev.Tags != nil {
tagInfo = string(ev.Tags.Marshal(nil))
}
log.I.F("ephemeral event kind %d, id %0x, checking %d connections for matches, tags: %s",
ev.Kind, ev.ID[:8], len(p.Map), tagInfo)
}
for w, subs := range p.Map {
for id, subscriber := range subs {
if subscriber.Match(ev) {
deliveries = append(
deliveries, delivery{w: w, id: id, sub: subscriber},
)
} else if isEphemeral {
// Debug: log why ephemeral events don't match
log.I.F("ephemeral event kind %d did NOT match subscription %s (filters: %s)",
ev.Kind, id, string(subscriber.S.Marshal(nil)))
}
}
}

8
go.mod
View File

@@ -3,12 +3,14 @@ module next.orly.dev
go 1.25.3
require (
git.mleku.dev/mleku/nostr v1.0.12
git.mleku.dev/mleku/nostr v1.0.13
github.com/adrg/xdg v0.5.3
github.com/alexflint/go-arg v1.6.1
github.com/aperturerobotics/go-indexeddb v0.2.3
github.com/bits-and-blooms/bloom/v3 v3.7.1
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0
github.com/dgraph-io/badger/v4 v4.8.0
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/hack-pad/safejs v0.1.1
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0
@@ -22,6 +24,7 @@ require (
github.com/stretchr/testify v1.11.1
github.com/vertex-lab/nostr-sqlite v0.3.2
go-simpler.org/env v0.12.0
go.etcd.io/bbolt v1.4.3
go.uber.org/atomic v1.11.0
golang.org/x/crypto v0.46.0
golang.org/x/lint v0.0.0-20241112194109-818c5a804067
@@ -37,7 +40,6 @@ require (
github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 // indirect
github.com/alexflint/go-scalar v1.2.0 // indirect
github.com/bits-and-blooms/bitset v1.24.2 // indirect
github.com/bits-and-blooms/bloom/v3 v3.7.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect
github.com/bytedance/sonic v1.13.1 // indirect
@@ -56,7 +58,6 @@ require (
github.com/google/btree v1.1.2 // indirect
github.com/google/flatbuffers v25.9.23+incompatible // indirect
github.com/google/pprof v0.0.0-20251007162407-5df77e3f7d1d // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
@@ -72,7 +73,6 @@ require (
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
go.etcd.io/bbolt v1.4.3 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel v1.38.0 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect

5
go.sum
View File

@@ -1,5 +1,5 @@
git.mleku.dev/mleku/nostr v1.0.12 h1:bjsFUh1Q3fGpU7qsqxggGgrGGUt2OBdu1w8hjDM4gJE=
git.mleku.dev/mleku/nostr v1.0.12/go.mod h1:kJwSMmLRnAJ7QJtgXDv2wGgceFU0luwVqrgAL3MI93M=
git.mleku.dev/mleku/nostr v1.0.13 h1:FqeOQ9ZX8AFVsAI6XisQkB6cgmhn9DNQ2a8li9gx7aY=
git.mleku.dev/mleku/nostr v1.0.13/go.mod h1:kJwSMmLRnAJ7QJtgXDv2wGgceFU0luwVqrgAL3MI93M=
github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg=
github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 h1:ClzzXMDDuUbWfNNZqGeYq4PnYOlwlOVIvSyNaIy0ykg=
@@ -161,6 +161,7 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg=
github.com/twmb/murmur3 v1.1.8/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/vertex-lab/nostr-sqlite v0.3.2 h1:8nZYYIwiKnWLA446qA/wL/Gy+bU0kuaxdLfUyfeTt/E=
github.com/vertex-lab/nostr-sqlite v0.3.2/go.mod h1:5bw1wMgJhSdrumsZAWxqy+P0u1g+q02PnlGQn15dnSM=

View File

@@ -138,7 +138,7 @@ func (f *Follows) Configure(cfg ...any) (err error) {
if f.cfg.FollowsThrottleEnabled {
perEvent := f.cfg.FollowsThrottlePerEvent
if perEvent == 0 {
perEvent = 200 * time.Millisecond
perEvent = 25 * time.Millisecond
}
maxDelay := f.cfg.FollowsThrottleMaxDelay
if maxDelay == 0 {

View File

@@ -200,6 +200,12 @@ func (s *Server) handleUpload(w http.ResponseWriter, r *http.Request) {
return
}
// Check bandwidth rate limit (non-followed users)
if !s.checkBandwidthLimit(pubkey, remoteAddr, int64(len(body))) {
s.setErrorResponse(w, http.StatusTooManyRequests, "upload rate limit exceeded, try again later")
return
}
// Calculate SHA256 after auth check
sha256Hash := CalculateSHA256(body)
sha256Hex := hex.Enc(sha256Hash)
@@ -647,6 +653,12 @@ func (s *Server) handleMirror(w http.ResponseWriter, r *http.Request) {
return
}
// Check bandwidth rate limit (non-followed users)
if !s.checkBandwidthLimit(pubkey, remoteAddr, int64(len(body))) {
s.setErrorResponse(w, http.StatusTooManyRequests, "upload rate limit exceeded, try again later")
return
}
// Note: pubkey may be nil for anonymous uploads if ACL allows it
// Detect MIME type from remote response
@@ -726,6 +738,12 @@ func (s *Server) handleMediaUpload(w http.ResponseWriter, r *http.Request) {
return
}
// Check bandwidth rate limit (non-followed users)
if !s.checkBandwidthLimit(pubkey, remoteAddr, int64(len(body))) {
s.setErrorResponse(w, http.StatusTooManyRequests, "upload rate limit exceeded, try again later")
return
}
// Note: pubkey may be nil for anonymous uploads if ACL allows it
// Optimize media (placeholder - actual optimization would be implemented here)

131
pkg/blossom/ratelimit.go Normal file
View File

@@ -0,0 +1,131 @@
package blossom
import (
"sync"
"time"
)
// BandwidthState tracks upload bandwidth for an identity
type BandwidthState struct {
BucketBytes int64 // Current token bucket level (bytes available)
LastUpdate time.Time // Last time bucket was updated
}
// BandwidthLimiter implements token bucket rate limiting for uploads.
// Each identity gets a bucket that replenishes at dailyLimit/day rate.
// Uploads consume tokens from the bucket.
type BandwidthLimiter struct {
mu sync.Mutex
states map[string]*BandwidthState // keyed by pubkey hex or IP
dailyLimit int64 // bytes per day
burstLimit int64 // max bucket size (burst capacity)
refillRate float64 // bytes per second refill rate
}
// NewBandwidthLimiter creates a new bandwidth limiter.
// dailyLimitMB is the average daily limit in megabytes.
// burstLimitMB is the maximum burst capacity in megabytes.
func NewBandwidthLimiter(dailyLimitMB, burstLimitMB int64) *BandwidthLimiter {
dailyBytes := dailyLimitMB * 1024 * 1024
burstBytes := burstLimitMB * 1024 * 1024
return &BandwidthLimiter{
states: make(map[string]*BandwidthState),
dailyLimit: dailyBytes,
burstLimit: burstBytes,
refillRate: float64(dailyBytes) / 86400.0, // bytes per second
}
}
// CheckAndConsume checks if an upload of the given size is allowed for the identity,
// and if so, consumes the tokens. Returns true if allowed, false if rate limited.
// The identity should be pubkey hex for authenticated users, or IP for anonymous.
func (bl *BandwidthLimiter) CheckAndConsume(identity string, sizeBytes int64) bool {
bl.mu.Lock()
defer bl.mu.Unlock()
now := time.Now()
state, exists := bl.states[identity]
if !exists {
// New identity starts with full burst capacity
state = &BandwidthState{
BucketBytes: bl.burstLimit,
LastUpdate: now,
}
bl.states[identity] = state
} else {
// Refill bucket based on elapsed time
elapsed := now.Sub(state.LastUpdate).Seconds()
refill := int64(elapsed * bl.refillRate)
state.BucketBytes += refill
if state.BucketBytes > bl.burstLimit {
state.BucketBytes = bl.burstLimit
}
state.LastUpdate = now
}
// Check if upload fits in bucket
if state.BucketBytes >= sizeBytes {
state.BucketBytes -= sizeBytes
return true
}
return false
}
// GetAvailable returns the currently available bytes for an identity.
func (bl *BandwidthLimiter) GetAvailable(identity string) int64 {
bl.mu.Lock()
defer bl.mu.Unlock()
state, exists := bl.states[identity]
if !exists {
return bl.burstLimit // New users have full capacity
}
// Calculate current level with refill
now := time.Now()
elapsed := now.Sub(state.LastUpdate).Seconds()
refill := int64(elapsed * bl.refillRate)
available := state.BucketBytes + refill
if available > bl.burstLimit {
available = bl.burstLimit
}
return available
}
// GetTimeUntilAvailable returns how long until the given bytes will be available.
func (bl *BandwidthLimiter) GetTimeUntilAvailable(identity string, sizeBytes int64) time.Duration {
available := bl.GetAvailable(identity)
if available >= sizeBytes {
return 0
}
needed := sizeBytes - available
seconds := float64(needed) / bl.refillRate
return time.Duration(seconds * float64(time.Second))
}
// Cleanup removes entries that have fully replenished (at burst limit).
func (bl *BandwidthLimiter) Cleanup() {
bl.mu.Lock()
defer bl.mu.Unlock()
now := time.Now()
for key, state := range bl.states {
elapsed := now.Sub(state.LastUpdate).Seconds()
refill := int64(elapsed * bl.refillRate)
if state.BucketBytes+refill >= bl.burstLimit {
delete(bl.states, key)
}
}
}
// Stats returns the number of tracked identities.
func (bl *BandwidthLimiter) Stats() int {
bl.mu.Lock()
defer bl.mu.Unlock()
return len(bl.states)
}

View File

@@ -19,6 +19,9 @@ type Server struct {
maxBlobSize int64
allowedMimeTypes map[string]bool
requireAuth bool
// Rate limiting for uploads
bandwidthLimiter *BandwidthLimiter
}
// Config holds configuration for the Blossom server
@@ -27,6 +30,11 @@ type Config struct {
MaxBlobSize int64
AllowedMimeTypes []string
RequireAuth bool
// Rate limiting (for non-followed users)
RateLimitEnabled bool
DailyLimitMB int64
BurstLimitMB int64
}
// NewServer creates a new Blossom server instance
@@ -48,6 +56,20 @@ func NewServer(db *database.D, aclRegistry *acl.S, cfg *Config) *Server {
}
}
// Initialize bandwidth limiter if enabled
var bwLimiter *BandwidthLimiter
if cfg.RateLimitEnabled {
dailyMB := cfg.DailyLimitMB
if dailyMB <= 0 {
dailyMB = 10 // 10MB default
}
burstMB := cfg.BurstLimitMB
if burstMB <= 0 {
burstMB = 50 // 50MB default burst
}
bwLimiter = NewBandwidthLimiter(dailyMB, burstMB)
}
return &Server{
db: db,
storage: storage,
@@ -56,6 +78,7 @@ func NewServer(db *database.D, aclRegistry *acl.S, cfg *Config) *Server {
maxBlobSize: cfg.MaxBlobSize,
allowedMimeTypes: allowedMap,
requireAuth: cfg.RequireAuth,
bandwidthLimiter: bwLimiter,
}
}
@@ -208,6 +231,44 @@ func (s *Server) checkACL(
return actual >= required
}
// isRateLimitExempt returns true if the user is exempt from rate limiting.
// Users with write access or higher (followed users, admins, owners) are exempt.
func (s *Server) isRateLimitExempt(pubkey []byte, remoteAddr string) bool {
if s.acl == nil {
return true // No ACL configured, no rate limiting
}
level := s.acl.GetAccessLevel(pubkey, remoteAddr)
// Followed users get "write" level, admins/owners get higher
// Only "read" and "none" are rate limited
return level == "write" || level == "admin" || level == "owner"
}
// checkBandwidthLimit checks if the upload is allowed under rate limits.
// Returns true if allowed, false if rate limited.
// Exempt users (followed, admin, owner) always return true.
func (s *Server) checkBandwidthLimit(pubkey []byte, remoteAddr string, sizeBytes int64) bool {
if s.bandwidthLimiter == nil {
return true // No rate limiting configured
}
// Check if user is exempt
if s.isRateLimitExempt(pubkey, remoteAddr) {
return true
}
// Use pubkey hex if available, otherwise IP
var identity string
if len(pubkey) > 0 {
identity = string(pubkey) // Will be converted to hex in handler
} else {
identity = remoteAddr
}
return s.bandwidthLimiter.CheckAndConsume(identity, sizeBytes)
}
// BaseURLKey is the context key for the base URL (exported for use by app handler)
type BaseURLKey struct{}

View File

@@ -6,6 +6,7 @@ import (
"sort"
"lol.mleku.dev/chk"
"lol.mleku.dev/errorf"
"lol.mleku.dev/log"
"next.orly.dev/pkg/database/indexes"
types2 "next.orly.dev/pkg/database/indexes/types"
@@ -44,6 +45,12 @@ func NormalizeTagValueForHash(key byte, valueBytes []byte) []byte {
func CreateIdHashFromData(data []byte) (i *types2.IdHash, err error) {
i = new(types2.IdHash)
// Skip empty data to avoid noisy errors
if len(data) == 0 {
err = errorf.E("CreateIdHashFromData: empty ID provided")
return
}
// If data looks like hex string and has the right length for hex-encoded
// sha256
if len(data) == 64 {
@@ -95,6 +102,11 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
// should be an error, but convention just ignores it.
if f.Ids.Len() > 0 {
for _, id := range f.Ids.T {
// Skip empty IDs - some filters have empty ID values
if len(id) == 0 {
log.D.F("GetIndexesFromFilter: skipping empty ID in filter (ids=%d)", f.Ids.Len())
continue
}
if err = func() (err error) {
var i *types2.IdHash
if i, err = CreateIdHashFromData(id); chk.E(err) {

View File

@@ -20,6 +20,10 @@ import (
func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
// log.T.F("GetSerialById: input id=%s", hex.Enc(id))
if len(id) == 0 {
err = errorf.E("GetSerialById: called with empty ID")
return
}
var idxs []Range
if idxs, err = GetIndexesFromFilter(&filter.F{Ids: tag.NewFromBytesSlice(id)}); chk.E(err) {
return
@@ -102,6 +106,10 @@ func (d *D) GetSerialsByIdsWithFilter(
// Process each ID sequentially
for _, id := range ids.T {
// Skip empty IDs
if len(id) == 0 {
continue
}
// idHex := hex.Enc(id)
// Get the index prefix for this ID

View File

@@ -24,8 +24,8 @@ func (i *IdHash) Set(idh []byte) {
func (i *IdHash) FromId(id []byte) (err error) {
if len(id) != sha256.Size {
err = errorf.E(
"FromId: invalid ID length, got %d require %d", len(id),
sha256.Size,
"FromId: invalid ID length, got %d require %d (data=%x)", len(id),
sha256.Size, id,
)
return
}

View File

@@ -3,6 +3,7 @@ package routing
import (
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/kind"
"lol.mleku.dev/log"
)
// Publisher abstracts event delivery to subscribers.
@@ -22,6 +23,7 @@ func IsEphemeral(k uint16) bool {
// - Are immediately delivered to subscribers
func MakeEphemeralHandler(publisher Publisher) Handler {
return func(ev *event.E, authedPubkey []byte) Result {
log.I.F("ephemeral handler received event kind %d, id %0x", ev.Kind, ev.ID[:8])
// Clone and deliver immediately without persistence
cloned := ev.Clone()
go publisher.Deliver(cloned)

View File

@@ -130,11 +130,13 @@ func (n *N) buildCypherQuery(f *filter.F, includeDeleteEvents bool) (string, map
}
// Time range filters - for temporal queries
if f.Since != nil {
// Note: Check both pointer and value - a zero timestamp (Unix epoch 1970) is almost
// certainly not a valid constraint as Nostr events didn't exist then
if f.Since != nil && f.Since.V > 0 {
params["since"] = f.Since.V
whereClauses = append(whereClauses, "e.created_at >= $since")
}
if f.Until != nil {
if f.Until != nil && f.Until.V > 0 {
params["until"] = f.Until.V
whereClauses = append(whereClauses, "e.created_at <= $until")
}
@@ -300,19 +302,17 @@ func (n *N) parseEventsFromResult(result *CollectedResult) ([]*event.E, error) {
_ = tags.UnmarshalJSON([]byte(tagsStr))
}
// Create event
// Create event with decoded binary fields
e := &event.E{
ID: id,
Pubkey: pubkey,
Kind: uint16(kind),
CreatedAt: createdAt,
Content: []byte(content),
Tags: tags,
Sig: sig,
}
// Copy fixed-size arrays
copy(e.ID[:], id)
copy(e.Sig[:], sig)
copy(e.Pubkey[:], pubkey)
events = append(events, e)
}

View File

@@ -377,29 +377,26 @@ func (l *Limiter) ComputeDelay(opType OperationType) time.Duration {
// In emergency mode, apply progressive throttling for writes
if inEmergency {
// Calculate how far above recovery threshold we are
// At emergency threshold, add 1x normal delay
// For every additional 10% above emergency, double the delay
excessPressure := metrics.MemoryPressure - l.config.RecoveryThreshold
if excessPressure > 0 {
// Progressive multiplier: starts at 2x, doubles every 10% excess
multiplier := 2.0
for excess := excessPressure; excess > 0.1; excess -= 0.1 {
multiplier *= 2
}
emergencyDelaySec := delaySec * multiplier
maxEmergencySec := float64(l.config.EmergencyMaxDelayMs) / 1000.0
if emergencyDelaySec > maxEmergencySec {
emergencyDelaySec = maxEmergencySec
}
// Minimum emergency delay of 100ms to allow other operations
if emergencyDelaySec < 0.1 {
emergencyDelaySec = 0.1
}
delaySec = emergencyDelaySec
// Calculate how far above emergency threshold we are
// Linear scaling: multiplier = 1 + (excess * 5)
// At emergency threshold: 1x, at +20% above: 2x, at +40% above: 3x
excessPressure := metrics.MemoryPressure - l.config.EmergencyThreshold
if excessPressure < 0 {
excessPressure = 0
}
multiplier := 1.0 + excessPressure*5.0
emergencyDelaySec := delaySec * multiplier
maxEmergencySec := float64(l.config.EmergencyMaxDelayMs) / 1000.0
if emergencyDelaySec > maxEmergencySec {
emergencyDelaySec = maxEmergencySec
}
// Minimum emergency delay of 100ms to allow other operations
if emergencyDelaySec < 0.1 {
emergencyDelaySec = 0.1
}
delaySec = emergencyDelaySec
}
if delaySec > 0 {

View File

@@ -1 +1 @@
v0.48.15
v0.49.1