diff --git a/CLAUDE.md b/CLAUDE.md
index 3edbf8a..dc7c2b4 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -40,7 +40,7 @@ NOSTR_SECRET_KEY=nsec1... ./nurl https://relay.example.com/api/logs/clear
 |----------|---------|-------------|
 | `ORLY_PORT` | 3334 | Server port |
 | `ORLY_LOG_LEVEL` | info | trace/debug/info/warn/error |
-| `ORLY_DB_TYPE` | badger | badger/neo4j/wasmdb |
+| `ORLY_DB_TYPE` | badger | badger/bbolt/neo4j/wasmdb |
 | `ORLY_POLICY_ENABLED` | false | Enable policy system |
 | `ORLY_ACL_MODE` | none | none/follows/managed |
 | `ORLY_TLS_DOMAINS` | | Let's Encrypt domains |
@@ -67,6 +67,7 @@ app/
 web/        → Svelte frontend (embedded via go:embed)
 pkg/
   database/ → Database interface + Badger implementation
+  bbolt/    → BBolt backend (HDD-optimized, B+tree)
   neo4j/    → Neo4j backend with WoT extensions
   wasmdb/   → WebAssembly IndexedDB backend
 protocol/   → Nostr protocol (ws/, auth/, publish/)
@@ -149,12 +150,59 @@ Before enabling auth-required on any deployment:
 
 | Backend | Use Case | Build |
 |---------|----------|-------|
-| **Badger** (default) | Single-instance, embedded | Standard |
+| **Badger** (default) | Single-instance, SSD, high performance | Standard |
+| **BBolt** | HDD-optimized, large archives, lower memory | `ORLY_DB_TYPE=bbolt` |
 | **Neo4j** | Social graph, WoT queries | `ORLY_DB_TYPE=neo4j` |
 | **WasmDB** | Browser/WebAssembly | `GOOS=js GOARCH=wasm` |
 
 All implement `pkg/database.Database` interface.
 
+### Scaling for Large Archives
+
+For archives with millions of events, consider:
+
+**Option 1: Tune Badger (SSD recommended)**
+```bash
+# Increase caches for larger working set (requires more RAM)
+ORLY_DB_BLOCK_CACHE_MB=2048          # 2GB block cache
+ORLY_DB_INDEX_CACHE_MB=1024          # 1GB index cache
+ORLY_SERIAL_CACHE_PUBKEYS=500000     # 500k pubkeys
+ORLY_SERIAL_CACHE_EVENT_IDS=2000000  # 2M event IDs
+
+# Higher compression to reduce disk IO
+ORLY_DB_ZSTD_LEVEL=9                 # Best compression ratio
+
+# Enable storage GC with aggressive eviction
+ORLY_GC_ENABLED=true
+ORLY_GC_BATCH_SIZE=5000
+ORLY_MAX_STORAGE_BYTES=107374182400  # 100GB cap
+```
+
+**Option 2: Use BBolt for HDD/Low-Memory Deployments**
+```bash
+ORLY_DB_TYPE=bbolt
+
+# Tune for your HDD
+ORLY_BBOLT_BATCH_MAX_EVENTS=10000    # Larger batches for HDD
+ORLY_BBOLT_BATCH_MAX_MB=256          # 256MB batch buffer
+ORLY_BBOLT_FLUSH_TIMEOUT_SEC=60      # Longer flush interval
+ORLY_BBOLT_BLOOM_SIZE_MB=32          # Larger bloom filter
+ORLY_BBOLT_MMAP_SIZE_MB=16384        # 16GB mmap (scales with DB size)
+```
+
+**Migration Between Backends**
+```bash
+# Migrate from Badger to BBolt
+./orly migrate --from badger --to bbolt
+
+# Migrate with custom target path
+./orly migrate --from badger --to bbolt --target-path /mnt/hdd/orly-archive
+```
+
+**BBolt vs Badger Trade-offs:**
+- BBolt: Lower memory, HDD-friendly, simpler (B+tree), slower random reads
+- Badger: Higher memory, SSD-optimized (LSM), faster concurrent access
+
 ## Logging (lol.mleku.dev)
 
 ```go
@@ -221,7 +269,8 @@ if (isValidNsec(nsec)) { ...
 }
 
 ## Dependencies
-- `github.com/dgraph-io/badger/v4` - Badger DB
+- `github.com/dgraph-io/badger/v4` - Badger DB (LSM, SSD-optimized)
+- `go.etcd.io/bbolt` - BBolt DB (B+tree, HDD-optimized)
 - `github.com/neo4j/neo4j-go-driver/v5` - Neo4j
 - `github.com/gorilla/websocket` - WebSocket
 - `github.com/ebitengine/purego` - CGO-free C loading
diff --git a/app/config/config.go b/app/config/config.go
index 700bb86..4199545 100644
--- a/app/config/config.go
+++ b/app/config/config.go
@@ -41,9 +41,9 @@ type C struct {
 	EnableShutdown bool `env:"ORLY_ENABLE_SHUTDOWN" default:"false" usage:"if true, expose /shutdown on the health port to gracefully stop the process (for profiling)"`
 	LogLevel string `env:"ORLY_LOG_LEVEL" default:"info" usage:"relay log level: fatal error warn info debug trace"`
 	DBLogLevel string `env:"ORLY_DB_LOG_LEVEL" default:"info" usage:"database log level: fatal error warn info debug trace"`
-	DBBlockCacheMB int `env:"ORLY_DB_BLOCK_CACHE_MB" default:"512" usage:"Badger block cache size in MB (higher improves read hit ratio)"`
-	DBIndexCacheMB int `env:"ORLY_DB_INDEX_CACHE_MB" default:"256" usage:"Badger index cache size in MB (improves index lookup performance)"`
-	DBZSTDLevel int `env:"ORLY_DB_ZSTD_LEVEL" default:"1" usage:"Badger ZSTD compression level (1=fast/500MB/s, 3=default, 9=best ratio, 0=disable)"`
+	DBBlockCacheMB int `env:"ORLY_DB_BLOCK_CACHE_MB" default:"1024" usage:"Badger block cache size in MB (higher improves read hit ratio, increase for large archives)"`
+	DBIndexCacheMB int `env:"ORLY_DB_INDEX_CACHE_MB" default:"512" usage:"Badger index cache size in MB (improves index lookup performance, increase for large archives)"`
+	DBZSTDLevel int `env:"ORLY_DB_ZSTD_LEVEL" default:"3" usage:"Badger ZSTD compression level (1=fast/500MB/s, 3=balanced, 9=best ratio/slower, 0=disable)"`
 	LogToStdout bool `env:"ORLY_LOG_TO_STDOUT" default:"false" usage:"log to stdout instead of stderr"`
 	LogBufferSize int `env:"ORLY_LOG_BUFFER_SIZE" default:"10000" usage:"number of log entries to keep in memory for web UI viewing (0 disables)"`
 	Pprof string `env:"ORLY_PPROF" usage:"enable pprof in modes: cpu,memory,allocation,heap,block,goroutine,threadcreate,mutex"`
@@ -124,9 +124,9 @@ type C struct {
 	Neo4jMaxTxRetrySeconds int `env:"ORLY_NEO4J_MAX_TX_RETRY_SEC" default:"30" usage:"max seconds for retryable transaction attempts"`
 	Neo4jQueryResultLimit int `env:"ORLY_NEO4J_QUERY_RESULT_LIMIT" default:"10000" usage:"max results returned per query (prevents unbounded memory usage, 0=unlimited)"`
 
-	// Advanced database tuning
-	SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"100000" usage:"max pubkeys to cache for compact event storage (default: 100000, ~3.2MB memory)"`
-	SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"500000" usage:"max event IDs to cache for compact event storage (default: 500000, ~16MB memory)"`
+	// Advanced database tuning (increase for large archives to reduce cache misses)
+	SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"250000" usage:"max pubkeys to cache for compact event storage (~8MB memory, increase for large archives)"`
+	SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"1000000" usage:"max event IDs to cache for compact event storage (~32MB memory, increase for large archives)"`
 
 	// Connection concurrency control
 	MaxHandlersPerConnection int `env:"ORLY_MAX_HANDLERS_PER_CONN" default:"100" usage:"max concurrent message handlers per WebSocket connection (limits goroutine growth under load)"`
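Editor's note: the ~8MB and ~32MB figures in the new usage strings line up with 32-byte pubkeys and event IDs (the old defaults' ~3.2MB/~16MB follow the same rule). A quick back-of-envelope sketch, assuming 32 bytes per cached entry and ignoring map and serial-value overhead, so real usage will be somewhat higher:

```go
package main

import "fmt"

// Rough lower-bound memory estimate for the serial caches: each cached
// pubkey / event ID is assumed to be a 32-byte value; map buckets and the
// cached serial numbers add overhead on top of this.
func main() {
	const entryBytes = 32.0
	fmt.Printf("pubkeys:   250000 * 32 B  ~= %.1f MB\n", 250000*entryBytes/1e6)  // ~8.0 MB
	fmt.Printf("event IDs: 1000000 * 32 B ~= %.1f MB\n", 1000000*entryBytes/1e6) // ~32.0 MB
}
```
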
diff --git a/main.go b/main.go
index 7030128..3322e90 100644
--- a/main.go
+++ b/main.go
@@ -24,7 +24,7 @@ import (
 	"next.orly.dev/pkg/acl"
 	"git.mleku.dev/mleku/nostr/crypto/keys"
 	"git.mleku.dev/mleku/nostr/encoders/bech32encoding"
-	_ "next.orly.dev/pkg/bbolt" // Import for bbolt factory registration
+	bboltdb "next.orly.dev/pkg/bbolt" // Import for bbolt factory and type
 	"next.orly.dev/pkg/database"
 	neo4jdb "next.orly.dev/pkg/neo4j" // Import for neo4j factory and type
 	"git.mleku.dev/mleku/nostr/encoders/hex"
@@ -617,6 +617,10 @@ func main() {
 			n4jDB.MaxConcurrentQueries(),
 		)
 		log.I.F("rate limiter configured for Neo4j backend (target: %dMB)", targetMB)
+	} else if _, ok := db.(*bboltdb.B); ok {
+		// BBolt uses memory-mapped IO, so memory-only limiter is appropriate
+		limiter = ratelimit.NewMemoryOnlyLimiter(rlConfig)
+		log.I.F("rate limiter configured for BBolt backend (target: %dMB)", targetMB)
 	} else {
 		// For other backends, create a disabled limiter
 		limiter = ratelimit.NewDisabledLimiter()
diff --git a/pkg/ratelimit/factory.go b/pkg/ratelimit/factory.go
index c92255a..550e881 100644
--- a/pkg/ratelimit/factory.go
+++ b/pkg/ratelimit/factory.go
@@ -54,3 +54,11 @@ func MonitorFromNeo4jDriver(
 ) loadmonitor.Monitor {
 	return NewNeo4jMonitor(driver, querySem, maxConcurrency, 100*time.Millisecond)
 }
+
+// NewMemoryOnlyLimiter creates a rate limiter that only monitors process memory.
+// Useful for database backends that don't have their own load metrics (e.g., BBolt).
+// Since BBolt uses memory-mapped IO, memory pressure is still relevant.
+func NewMemoryOnlyLimiter(config Config) *Limiter {
+	monitor := NewMemoryMonitor(100 * time.Millisecond)
+	return NewLimiter(config, monitor)
+}
diff --git a/pkg/ratelimit/memory_monitor.go b/pkg/ratelimit/memory_monitor.go
new file mode 100644
index 0000000..6fbe1f9
--- /dev/null
+++ b/pkg/ratelimit/memory_monitor.go
@@ -0,0 +1,214 @@
+//go:build !(js && wasm)
+
+package ratelimit
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"next.orly.dev/pkg/interfaces/loadmonitor"
+)
+
+// MemoryMonitor is a simple load monitor that only tracks process memory.
+// Used for database backends that don't have their own load metrics (e.g., BBolt).
+type MemoryMonitor struct {
+	// Configuration
+	pollInterval time.Duration
+	targetBytes  atomic.Uint64
+
+	// State
+	running  atomic.Bool
+	stopChan chan struct{}
+	doneChan chan struct{}
+
+	// Metrics (protected by mutex)
+	mu             sync.RWMutex
+	currentMetrics loadmonitor.Metrics
+
+	// Latency tracking
+	queryLatencies []time.Duration
+	writeLatencies []time.Duration
+	latencyMu      sync.Mutex
+
+	// Emergency mode
+	emergencyThreshold float64 // e.g., 1.167 (target + 1/6)
+	recoveryThreshold  float64 // e.g., 0.833 (target - 1/6)
+	inEmergency        atomic.Bool
+}
+
+// NewMemoryMonitor creates a memory-only load monitor.
+// pollInterval controls how often memory is sampled (recommended: 100ms).
+func NewMemoryMonitor(pollInterval time.Duration) *MemoryMonitor {
+	m := &MemoryMonitor{
+		pollInterval:       pollInterval,
+		stopChan:           make(chan struct{}),
+		doneChan:           make(chan struct{}),
+		queryLatencies:     make([]time.Duration, 0, 100),
+		writeLatencies:     make([]time.Duration, 0, 100),
+		emergencyThreshold: 1.167, // Default: target + 1/6
+		recoveryThreshold:  0.833, // Default: target - 1/6
+	}
+	return m
+}
+
+// GetMetrics returns the current load metrics.
+func (m *MemoryMonitor) GetMetrics() loadmonitor.Metrics {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	return m.currentMetrics
+}
+
+// RecordQueryLatency records a query latency sample.
+func (m *MemoryMonitor) RecordQueryLatency(latency time.Duration) {
+	m.latencyMu.Lock()
+	defer m.latencyMu.Unlock()
+
+	m.queryLatencies = append(m.queryLatencies, latency)
+	if len(m.queryLatencies) > 100 {
+		m.queryLatencies = m.queryLatencies[1:]
+	}
+}
+
+// RecordWriteLatency records a write latency sample.
+func (m *MemoryMonitor) RecordWriteLatency(latency time.Duration) {
+	m.latencyMu.Lock()
+	defer m.latencyMu.Unlock()
+
+	m.writeLatencies = append(m.writeLatencies, latency)
+	if len(m.writeLatencies) > 100 {
+		m.writeLatencies = m.writeLatencies[1:]
+	}
+}
+
+// SetMemoryTarget sets the target memory limit in bytes.
+func (m *MemoryMonitor) SetMemoryTarget(bytes uint64) {
+	m.targetBytes.Store(bytes)
+}
+
+// SetEmergencyThreshold sets the memory threshold for emergency mode.
+func (m *MemoryMonitor) SetEmergencyThreshold(threshold float64) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.emergencyThreshold = threshold
+}
+
+// GetEmergencyThreshold returns the current emergency threshold.
+func (m *MemoryMonitor) GetEmergencyThreshold() float64 {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	return m.emergencyThreshold
+}
+
+// ForceEmergencyMode manually triggers emergency mode for a duration.
+func (m *MemoryMonitor) ForceEmergencyMode(duration time.Duration) {
+	m.inEmergency.Store(true)
+	go func() {
+		time.Sleep(duration)
+		m.inEmergency.Store(false)
+	}()
+}
+
+// Start begins background metric collection.
+func (m *MemoryMonitor) Start() <-chan struct{} {
+	if m.running.Swap(true) {
+		// Already running
+		return m.doneChan
+	}
+
+	go m.pollLoop()
+	return m.doneChan
+}
+
+// Stop halts background metric collection.
+func (m *MemoryMonitor) Stop() {
+	if !m.running.Swap(false) {
+		return
+	}
+	close(m.stopChan)
+	<-m.doneChan
+}
+
+// pollLoop continuously samples memory and updates metrics.
+func (m *MemoryMonitor) pollLoop() {
+	defer close(m.doneChan)
+
+	ticker := time.NewTicker(m.pollInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-m.stopChan:
+			return
+		case <-ticker.C:
+			m.updateMetrics()
+		}
+	}
+}
+
+// updateMetrics samples current memory and updates the metrics.
+func (m *MemoryMonitor) updateMetrics() {
+	target := m.targetBytes.Load()
+	if target == 0 {
+		target = 1 // Avoid division by zero
+	}
+
+	// Get physical memory using the same method as other monitors
+	procMem := ReadProcessMemoryStats()
+	physicalMemBytes := procMem.PhysicalMemoryBytes()
+	physicalMemMB := physicalMemBytes / (1024 * 1024)
+
+	// Calculate memory pressure
+	memPressure := float64(physicalMemBytes) / float64(target)
+
+	// Check emergency mode thresholds
+	m.mu.RLock()
+	emergencyThreshold := m.emergencyThreshold
+	recoveryThreshold := m.recoveryThreshold
+	m.mu.RUnlock()
+
+	wasEmergency := m.inEmergency.Load()
+	if memPressure > emergencyThreshold {
+		m.inEmergency.Store(true)
+	} else if memPressure < recoveryThreshold && wasEmergency {
+		m.inEmergency.Store(false)
+	}
+
+	// Calculate average latencies
+	m.latencyMu.Lock()
+	var avgQuery, avgWrite time.Duration
+	if len(m.queryLatencies) > 0 {
+		var total time.Duration
+		for _, l := range m.queryLatencies {
+			total += l
+		}
+		avgQuery = total / time.Duration(len(m.queryLatencies))
+	}
+	if len(m.writeLatencies) > 0 {
+		var total time.Duration
+		for _, l := range m.writeLatencies {
+			total += l
+		}
+		avgWrite = total / time.Duration(len(m.writeLatencies))
+	}
+	m.latencyMu.Unlock()
+
+	// Update metrics
+	m.mu.Lock()
+	m.currentMetrics = loadmonitor.Metrics{
+		MemoryPressure:    memPressure,
+		WriteLoad:         0, // No database-specific load metric
+		ReadLoad:          0, // No database-specific load metric
+		QueryLatency:      avgQuery,
+		WriteLatency:      avgWrite,
+		Timestamp:         time.Now(),
+		InEmergencyMode:   m.inEmergency.Load(),
+		CompactionPending: false, // BBolt doesn't have compaction
+		PhysicalMemoryMB:  physicalMemMB,
+	}
+	m.mu.Unlock()
+}
+
+// Ensure MemoryMonitor implements the required interfaces
+var _ loadmonitor.Monitor = (*MemoryMonitor)(nil)
+var _ loadmonitor.EmergencyModeMonitor = (*MemoryMonitor)(nil)
diff --git a/pkg/version/version b/pkg/version/version
index d2dba66..5574a36 100644
--- a/pkg/version/version
+++ b/pkg/version/version
@@ -1 +1 @@
-v0.48.11
+v0.48.12
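Editor's note: for reviewers who want to exercise the new monitor in isolation, a minimal sketch of driving it directly, using only the API added in this diff. The import path mirrors the other `next.orly.dev/pkg/...` packages, and the 2GB target plus the latency samples are illustrative placeholders, not values from the relay; the relay itself wires this up through `ratelimit.NewMemoryOnlyLimiter` in main.go rather than using the monitor directly.

```go
package main

import (
	"fmt"
	"time"

	"next.orly.dev/pkg/ratelimit"
)

func main() {
	// Sample process memory every 100ms, the same interval the factory uses.
	mon := ratelimit.NewMemoryMonitor(100 * time.Millisecond)
	mon.SetMemoryTarget(2 << 30) // illustrative 2GB target; pressure ~= physical memory / target

	done := mon.Start()
	defer func() { mon.Stop(); <-done }() // Stop waits for the poll loop to exit

	// Latency samples would normally come from the query/write paths.
	mon.RecordQueryLatency(4 * time.Millisecond)
	mon.RecordWriteLatency(9 * time.Millisecond)

	time.Sleep(250 * time.Millisecond) // let updateMetrics run a few times
	m := mon.GetMetrics()
	fmt.Printf("pressure=%.2f emergency=%v query=%v write=%v\n",
		m.MemoryPressure, m.InEmergencyMode, m.QueryLatency, m.WriteLatency)
}
```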