Add BBolt rate limiting and tune Badger defaults for large archives (v0.48.12)
Some checks failed
Go / build-and-release (push) Has been cancelled
Some checks failed
Go / build-and-release (push) Has been cancelled
- Increase Badger cache defaults: block 512→1024MB, index 256→512MB - Increase serial cache defaults: pubkeys 100k→250k, event IDs 500k→1M - Change ZSTD default from level 1 (fast) to level 3 (balanced) - Add memory-only rate limiter for BBolt backend - Add BBolt to database backend docs with scaling recommendations - Document migration between Badger and BBolt backends Files modified: - app/config/config.go: Tuned defaults for large-scale deployments - main.go: Add BBolt rate limiter support - pkg/ratelimit/factory.go: Add NewMemoryOnlyLimiter factory - pkg/ratelimit/memory_monitor.go: New memory-only load monitor - CLAUDE.md: Add BBolt docs and scaling guide 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -54,3 +54,11 @@ func MonitorFromNeo4jDriver(
|
||||
) loadmonitor.Monitor {
|
||||
return NewNeo4jMonitor(driver, querySem, maxConcurrency, 100*time.Millisecond)
|
||||
}
|
||||
|
||||
// NewMemoryOnlyLimiter creates a rate limiter that only monitors process memory.
|
||||
// Useful for database backends that don't have their own load metrics (e.g., BBolt).
|
||||
// Since BBolt uses memory-mapped IO, memory pressure is still relevant.
|
||||
func NewMemoryOnlyLimiter(config Config) *Limiter {
|
||||
monitor := NewMemoryMonitor(100 * time.Millisecond)
|
||||
return NewLimiter(config, monitor)
|
||||
}
|
||||
|
||||
214
pkg/ratelimit/memory_monitor.go
Normal file
214
pkg/ratelimit/memory_monitor.go
Normal file
@@ -0,0 +1,214 @@
|
||||
//go:build !(js && wasm)
|
||||
|
||||
package ratelimit
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"next.orly.dev/pkg/interfaces/loadmonitor"
|
||||
)
|
||||
|
||||
// MemoryMonitor is a simple load monitor that only tracks process memory.
|
||||
// Used for database backends that don't have their own load metrics (e.g., BBolt).
|
||||
type MemoryMonitor struct {
|
||||
// Configuration
|
||||
pollInterval time.Duration
|
||||
targetBytes atomic.Uint64
|
||||
|
||||
// State
|
||||
running atomic.Bool
|
||||
stopChan chan struct{}
|
||||
doneChan chan struct{}
|
||||
|
||||
// Metrics (protected by mutex)
|
||||
mu sync.RWMutex
|
||||
currentMetrics loadmonitor.Metrics
|
||||
|
||||
// Latency tracking
|
||||
queryLatencies []time.Duration
|
||||
writeLatencies []time.Duration
|
||||
latencyMu sync.Mutex
|
||||
|
||||
// Emergency mode
|
||||
emergencyThreshold float64 // e.g., 1.167 (target + 1/6)
|
||||
recoveryThreshold float64 // e.g., 0.833 (target - 1/6)
|
||||
inEmergency atomic.Bool
|
||||
}
|
||||
|
||||
// NewMemoryMonitor creates a memory-only load monitor.
|
||||
// pollInterval controls how often memory is sampled (recommended: 100ms).
|
||||
func NewMemoryMonitor(pollInterval time.Duration) *MemoryMonitor {
|
||||
m := &MemoryMonitor{
|
||||
pollInterval: pollInterval,
|
||||
stopChan: make(chan struct{}),
|
||||
doneChan: make(chan struct{}),
|
||||
queryLatencies: make([]time.Duration, 0, 100),
|
||||
writeLatencies: make([]time.Duration, 0, 100),
|
||||
emergencyThreshold: 1.167, // Default: target + 1/6
|
||||
recoveryThreshold: 0.833, // Default: target - 1/6
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// GetMetrics returns the current load metrics.
|
||||
func (m *MemoryMonitor) GetMetrics() loadmonitor.Metrics {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return m.currentMetrics
|
||||
}
|
||||
|
||||
// RecordQueryLatency records a query latency sample.
|
||||
func (m *MemoryMonitor) RecordQueryLatency(latency time.Duration) {
|
||||
m.latencyMu.Lock()
|
||||
defer m.latencyMu.Unlock()
|
||||
|
||||
m.queryLatencies = append(m.queryLatencies, latency)
|
||||
if len(m.queryLatencies) > 100 {
|
||||
m.queryLatencies = m.queryLatencies[1:]
|
||||
}
|
||||
}
|
||||
|
||||
// RecordWriteLatency records a write latency sample.
|
||||
func (m *MemoryMonitor) RecordWriteLatency(latency time.Duration) {
|
||||
m.latencyMu.Lock()
|
||||
defer m.latencyMu.Unlock()
|
||||
|
||||
m.writeLatencies = append(m.writeLatencies, latency)
|
||||
if len(m.writeLatencies) > 100 {
|
||||
m.writeLatencies = m.writeLatencies[1:]
|
||||
}
|
||||
}
|
||||
|
||||
// SetMemoryTarget sets the target memory limit in bytes.
|
||||
func (m *MemoryMonitor) SetMemoryTarget(bytes uint64) {
|
||||
m.targetBytes.Store(bytes)
|
||||
}
|
||||
|
||||
// SetEmergencyThreshold sets the memory threshold for emergency mode.
|
||||
func (m *MemoryMonitor) SetEmergencyThreshold(threshold float64) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.emergencyThreshold = threshold
|
||||
}
|
||||
|
||||
// GetEmergencyThreshold returns the current emergency threshold.
|
||||
func (m *MemoryMonitor) GetEmergencyThreshold() float64 {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return m.emergencyThreshold
|
||||
}
|
||||
|
||||
// ForceEmergencyMode manually triggers emergency mode for a duration.
|
||||
func (m *MemoryMonitor) ForceEmergencyMode(duration time.Duration) {
|
||||
m.inEmergency.Store(true)
|
||||
go func() {
|
||||
time.Sleep(duration)
|
||||
m.inEmergency.Store(false)
|
||||
}()
|
||||
}
|
||||
|
||||
// Start begins background metric collection.
|
||||
func (m *MemoryMonitor) Start() <-chan struct{} {
|
||||
if m.running.Swap(true) {
|
||||
// Already running
|
||||
return m.doneChan
|
||||
}
|
||||
|
||||
go m.pollLoop()
|
||||
return m.doneChan
|
||||
}
|
||||
|
||||
// Stop halts background metric collection.
|
||||
func (m *MemoryMonitor) Stop() {
|
||||
if !m.running.Swap(false) {
|
||||
return
|
||||
}
|
||||
close(m.stopChan)
|
||||
<-m.doneChan
|
||||
}
|
||||
|
||||
// pollLoop continuously samples memory and updates metrics.
|
||||
func (m *MemoryMonitor) pollLoop() {
|
||||
defer close(m.doneChan)
|
||||
|
||||
ticker := time.NewTicker(m.pollInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-m.stopChan:
|
||||
return
|
||||
case <-ticker.C:
|
||||
m.updateMetrics()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// updateMetrics samples current memory and updates the metrics.
|
||||
func (m *MemoryMonitor) updateMetrics() {
|
||||
target := m.targetBytes.Load()
|
||||
if target == 0 {
|
||||
target = 1 // Avoid division by zero
|
||||
}
|
||||
|
||||
// Get physical memory using the same method as other monitors
|
||||
procMem := ReadProcessMemoryStats()
|
||||
physicalMemBytes := procMem.PhysicalMemoryBytes()
|
||||
physicalMemMB := physicalMemBytes / (1024 * 1024)
|
||||
|
||||
// Calculate memory pressure
|
||||
memPressure := float64(physicalMemBytes) / float64(target)
|
||||
|
||||
// Check emergency mode thresholds
|
||||
m.mu.RLock()
|
||||
emergencyThreshold := m.emergencyThreshold
|
||||
recoveryThreshold := m.recoveryThreshold
|
||||
m.mu.RUnlock()
|
||||
|
||||
wasEmergency := m.inEmergency.Load()
|
||||
if memPressure > emergencyThreshold {
|
||||
m.inEmergency.Store(true)
|
||||
} else if memPressure < recoveryThreshold && wasEmergency {
|
||||
m.inEmergency.Store(false)
|
||||
}
|
||||
|
||||
// Calculate average latencies
|
||||
m.latencyMu.Lock()
|
||||
var avgQuery, avgWrite time.Duration
|
||||
if len(m.queryLatencies) > 0 {
|
||||
var total time.Duration
|
||||
for _, l := range m.queryLatencies {
|
||||
total += l
|
||||
}
|
||||
avgQuery = total / time.Duration(len(m.queryLatencies))
|
||||
}
|
||||
if len(m.writeLatencies) > 0 {
|
||||
var total time.Duration
|
||||
for _, l := range m.writeLatencies {
|
||||
total += l
|
||||
}
|
||||
avgWrite = total / time.Duration(len(m.writeLatencies))
|
||||
}
|
||||
m.latencyMu.Unlock()
|
||||
|
||||
// Update metrics
|
||||
m.mu.Lock()
|
||||
m.currentMetrics = loadmonitor.Metrics{
|
||||
MemoryPressure: memPressure,
|
||||
WriteLoad: 0, // No database-specific load metric
|
||||
ReadLoad: 0, // No database-specific load metric
|
||||
QueryLatency: avgQuery,
|
||||
WriteLatency: avgWrite,
|
||||
Timestamp: time.Now(),
|
||||
InEmergencyMode: m.inEmergency.Load(),
|
||||
CompactionPending: false, // BBolt doesn't have compaction
|
||||
PhysicalMemoryMB: physicalMemMB,
|
||||
}
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
// Ensure MemoryMonitor implements the required interfaces
|
||||
var _ loadmonitor.Monitor = (*MemoryMonitor)(nil)
|
||||
var _ loadmonitor.EmergencyModeMonitor = (*MemoryMonitor)(nil)
|
||||
@@ -1 +1 @@
|
||||
v0.48.11
|
||||
v0.48.12
|
||||
|
||||
Reference in New Issue
Block a user