Compare commits

...

4 Commits

Author SHA1 Message Date
woikos
be72b694eb Add BBolt rate limiting and tune Badger defaults for large archives (v0.48.12)
Some checks failed
Go / build-and-release (push) Has been cancelled
- Increase Badger cache defaults: block 512→1024MB, index 256→512MB
- Increase serial cache defaults: pubkeys 100k→250k, event IDs 500k→1M
- Change ZSTD default from level 1 (fast) to level 3 (balanced)
- Add memory-only rate limiter for BBolt backend
- Add BBolt to database backend docs with scaling recommendations
- Document migration between Badger and BBolt backends

Files modified:
- app/config/config.go: Tuned defaults for large-scale deployments
- main.go: Add BBolt rate limiter support
- pkg/ratelimit/factory.go: Add NewMemoryOnlyLimiter factory
- pkg/ratelimit/memory_monitor.go: New memory-only load monitor
- CLAUDE.md: Add BBolt docs and scaling guide

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 11:55:07 +01:00
woikos
61f6027a64 Remove auto-profile creation and add auth config docs (v0.48.11)
Some checks failed
Go / build-and-release (push) Has been cancelled
- Remove createDefaultProfile() function from nostr.js that auto-created
  placeholder profiles for new users - profiles should not be auto-generated
- Add auth-required configuration caution section to CLAUDE.md documenting
  risks of enabling NIP-42 auth on production relays

Files modified:
- CLAUDE.md: Added auth-required configuration section
- app/web/src/nostr.js: Removed createDefaultProfile and auto-profile logic
- app/web/dist/bundle.js: Rebuilt with changes
- pkg/version/version: v0.48.11

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 10:22:56 +01:00
woikos
e7bc9a4a97 Add progressive throttle for follows ACL mode (v0.48.10)
Some checks failed
Go / build-and-release (push) Has been cancelled
- Add progressive throttle feature for follows ACL mode, allowing
  non-followed users to write with increasing delay instead of blocking
- Delay increases linearly per event (default 200ms) and decays at 1:1
  ratio with elapsed time, capping at configurable max (default 60s)
- Track both IP and pubkey independently to prevent evasion
- Add periodic cleanup to remove fully-decayed throttle entries
- Fix BBolt serial resolver to return proper errors when buckets or
  entries are not found

Files modified:
- app/config/config.go: Add ORLY_FOLLOWS_THROTTLE_* env vars
- app/handle-event.go: Apply throttle delay before event processing
- app/listener.go: Add getFollowsThrottleDelay helper method
- pkg/acl/follows.go: Integrate throttle with follows ACL
- pkg/acl/follows_throttle.go: New progressive throttle implementation
- pkg/bbolt/save-event.go: Return errors from serial lookups
- pkg/version/version: Bump to v0.48.10

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 17:39:04 +01:00
woikos
41a3b5c0a5 Fix OOM crash from corrupt compact event data
Add sanity bounds to prevent memory exhaustion when decoding corrupt
events with garbage varint values. Previously, corrupt data could cause
massive allocations (e.g., make([]byte, 2^60)) leading to OOM crashes.

- Add MaxTagsPerEvent (10000), MaxTagElements (100), MaxContentLength (10MB),
  MaxTagElementLength (1MB) limits
- Return sentinel errors for corrupt data instead of logging
- Silently skip corrupt events (caller handles gracefully)

This fixes crash loops on archive.orly.dev where OOM during writes
left corrupt events in bbolt database.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 11:46:36 +01:00
16 changed files with 599 additions and 85 deletions

View File

@@ -3,12 +3,8 @@
"allow": [],
"deny": [],
"ask": [],
"additionalDirectories": [
"/home/mleku/smesh",
"/home/mleku/Tourmaline",
"/home/mleku/Amber"
]
"additionalDirectories": []
},
"outputStyle": "Default",
"MAX_THINKING_TOKENS": "8000"
"MAX_THINKING_TOKENS": "16000"
}

View File

@@ -40,7 +40,7 @@ NOSTR_SECRET_KEY=nsec1... ./nurl https://relay.example.com/api/logs/clear
|----------|---------|-------------|
| `ORLY_PORT` | 3334 | Server port |
| `ORLY_LOG_LEVEL` | info | trace/debug/info/warn/error |
| `ORLY_DB_TYPE` | badger | badger/neo4j/wasmdb |
| `ORLY_DB_TYPE` | badger | badger/bbolt/neo4j/wasmdb |
| `ORLY_POLICY_ENABLED` | false | Enable policy system |
| `ORLY_ACL_MODE` | none | none/follows/managed |
| `ORLY_TLS_DOMAINS` | | Let's Encrypt domains |
@@ -67,6 +67,7 @@ app/
web/ → Svelte frontend (embedded via go:embed)
pkg/
database/ → Database interface + Badger implementation
bbolt/ → BBolt backend (HDD-optimized, B+tree)
neo4j/ → Neo4j backend with WoT extensions
wasmdb/ → WebAssembly IndexedDB backend
protocol/ → Nostr protocol (ws/, auth/, publish/)
@@ -130,16 +131,78 @@ if timeout > DefaultTimeoutSeconds {
- Provide public API methods (`IsEnabled()`, `CheckPolicy()`)
- Never change unexported→exported to fix bugs
### 6. Auth-Required Configuration (CAUTION)
**Be extremely careful when modifying auth-related settings in deployment configs.**
The `ORLY_AUTH_REQUIRED` and `ORLY_AUTH_TO_WRITE` settings control whether clients must authenticate via NIP-42 before interacting with the relay. Changing these on a production relay can:
- **Lock out all existing clients** if they don't support NIP-42 auth
- **Break automated systems** (bots, bridges, scrapers) that depend on anonymous access
- **Cause data sync issues** if upstream relays can't push events
Before enabling auth-required on any deployment:
1. Verify all expected clients support NIP-42
2. Ensure the relay identity key is properly configured
3. Test with a non-production instance first
## Database Backends
| Backend | Use Case | Build |
|---------|----------|-------|
| **Badger** (default) | Single-instance, embedded | Standard |
| **Badger** (default) | Single-instance, SSD, high performance | Standard |
| **BBolt** | HDD-optimized, large archives, lower memory | `ORLY_DB_TYPE=bbolt` |
| **Neo4j** | Social graph, WoT queries | `ORLY_DB_TYPE=neo4j` |
| **WasmDB** | Browser/WebAssembly | `GOOS=js GOARCH=wasm` |
All implement `pkg/database.Database` interface.
### Scaling for Large Archives
For archives with millions of events, consider:
**Option 1: Tune Badger (SSD recommended)**
```bash
# Increase caches for larger working set (requires more RAM)
ORLY_DB_BLOCK_CACHE_MB=2048 # 2GB block cache
ORLY_DB_INDEX_CACHE_MB=1024 # 1GB index cache
ORLY_SERIAL_CACHE_PUBKEYS=500000 # 500k pubkeys
ORLY_SERIAL_CACHE_EVENT_IDS=2000000 # 2M event IDs
# Higher compression to reduce disk IO
ORLY_DB_ZSTD_LEVEL=9 # Best compression ratio
# Enable storage GC with aggressive eviction
ORLY_GC_ENABLED=true
ORLY_GC_BATCH_SIZE=5000
ORLY_MAX_STORAGE_BYTES=107374182400 # 100GB cap
```
**Option 2: Use BBolt for HDD/Low-Memory Deployments**
```bash
ORLY_DB_TYPE=bbolt
# Tune for your HDD
ORLY_BBOLT_BATCH_MAX_EVENTS=10000 # Larger batches for HDD
ORLY_BBOLT_BATCH_MAX_MB=256 # 256MB batch buffer
ORLY_BBOLT_FLUSH_TIMEOUT_SEC=60 # Longer flush interval
ORLY_BBOLT_BLOOM_SIZE_MB=32 # Larger bloom filter
ORLY_BBOLT_MMAP_SIZE_MB=16384 # 16GB mmap (scales with DB size)
```
**Migration Between Backends**
```bash
# Migrate from Badger to BBolt
./orly migrate --from badger --to bbolt
# Migrate with custom target path
./orly migrate --from badger --to bbolt --target-path /mnt/hdd/orly-archive
```
**BBolt vs Badger Trade-offs:**
- BBolt: Lower memory, HDD-friendly, simpler (B+tree), slower random reads
- Badger: Higher memory, SSD-optimized (LSM), faster concurrent access
## Logging (lol.mleku.dev)
```go
@@ -206,7 +269,8 @@ if (isValidNsec(nsec)) { ... }
## Dependencies
- `github.com/dgraph-io/badger/v4` - Badger DB
- `github.com/dgraph-io/badger/v4` - Badger DB (LSM, SSD-optimized)
- `go.etcd.io/bbolt` - BBolt DB (B+tree, HDD-optimized)
- `github.com/neo4j/neo4j-go-driver/v5` - Neo4j
- `github.com/gorilla/websocket` - WebSocket
- `github.com/ebitengine/purego` - CGO-free C loading

View File

@@ -41,9 +41,9 @@ type C struct {
EnableShutdown bool `env:"ORLY_ENABLE_SHUTDOWN" default:"false" usage:"if true, expose /shutdown on the health port to gracefully stop the process (for profiling)"`
LogLevel string `env:"ORLY_LOG_LEVEL" default:"info" usage:"relay log level: fatal error warn info debug trace"`
DBLogLevel string `env:"ORLY_DB_LOG_LEVEL" default:"info" usage:"database log level: fatal error warn info debug trace"`
DBBlockCacheMB int `env:"ORLY_DB_BLOCK_CACHE_MB" default:"512" usage:"Badger block cache size in MB (higher improves read hit ratio)"`
DBIndexCacheMB int `env:"ORLY_DB_INDEX_CACHE_MB" default:"256" usage:"Badger index cache size in MB (improves index lookup performance)"`
DBZSTDLevel int `env:"ORLY_DB_ZSTD_LEVEL" default:"1" usage:"Badger ZSTD compression level (1=fast/500MB/s, 3=default, 9=best ratio, 0=disable)"`
DBBlockCacheMB int `env:"ORLY_DB_BLOCK_CACHE_MB" default:"1024" usage:"Badger block cache size in MB (higher improves read hit ratio, increase for large archives)"`
DBIndexCacheMB int `env:"ORLY_DB_INDEX_CACHE_MB" default:"512" usage:"Badger index cache size in MB (improves index lookup performance, increase for large archives)"`
DBZSTDLevel int `env:"ORLY_DB_ZSTD_LEVEL" default:"3" usage:"Badger ZSTD compression level (1=fast/500MB/s, 3=balanced, 9=best ratio/slower, 0=disable)"`
LogToStdout bool `env:"ORLY_LOG_TO_STDOUT" default:"false" usage:"log to stdout instead of stderr"`
LogBufferSize int `env:"ORLY_LOG_BUFFER_SIZE" default:"10000" usage:"number of log entries to keep in memory for web UI viewing (0 disables)"`
Pprof string `env:"ORLY_PPROF" usage:"enable pprof in modes: cpu,memory,allocation,heap,block,goroutine,threadcreate,mutex"`
@@ -67,6 +67,11 @@ type C struct {
ClusterAdmins []string `env:"ORLY_CLUSTER_ADMINS" usage:"comma-separated list of npubs authorized to manage cluster membership"`
FollowListFrequency time.Duration `env:"ORLY_FOLLOW_LIST_FREQUENCY" usage:"how often to fetch admin follow lists (default: 1h)" default:"1h"`
// Progressive throttle for follows ACL mode - allows non-followed users to write with increasing delay
FollowsThrottleEnabled bool `env:"ORLY_FOLLOWS_THROTTLE" default:"false" usage:"enable progressive delay for non-followed users in follows ACL mode"`
FollowsThrottlePerEvent time.Duration `env:"ORLY_FOLLOWS_THROTTLE_INCREMENT" default:"200ms" usage:"delay added per event for non-followed users"`
FollowsThrottleMaxDelay time.Duration `env:"ORLY_FOLLOWS_THROTTLE_MAX" default:"60s" usage:"maximum throttle delay cap"`
// Blossom blob storage service level settings
BlossomServiceLevels string `env:"ORLY_BLOSSOM_SERVICE_LEVELS" usage:"comma-separated list of service levels in format: name:storage_mb_per_sat_per_month (e.g., basic:1,premium:10)"`
@@ -119,9 +124,9 @@ type C struct {
Neo4jMaxTxRetrySeconds int `env:"ORLY_NEO4J_MAX_TX_RETRY_SEC" default:"30" usage:"max seconds for retryable transaction attempts"`
Neo4jQueryResultLimit int `env:"ORLY_NEO4J_QUERY_RESULT_LIMIT" default:"10000" usage:"max results returned per query (prevents unbounded memory usage, 0=unlimited)"`
// Advanced database tuning
SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"100000" usage:"max pubkeys to cache for compact event storage (default: 100000, ~3.2MB memory)"`
SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"500000" usage:"max event IDs to cache for compact event storage (default: 500000, ~16MB memory)"`
// Advanced database tuning (increase for large archives to reduce cache misses)
SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"250000" usage:"max pubkeys to cache for compact event storage (~8MB memory, increase for large archives)"`
SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"1000000" usage:"max event IDs to cache for compact event storage (~32MB memory, increase for large archives)"`
// Connection concurrency control
MaxHandlersPerConnection int `env:"ORLY_MAX_HANDLERS_PER_CONN" default:"100" usage:"max concurrent message handlers per WebSocket connection (limits goroutine growth under load)"`
@@ -841,3 +846,15 @@ func (cfg *C) GetNRCConfigValues() (
cfg.NRCUseCashu,
sessionTimeout
}
// GetFollowsThrottleConfigValues returns the progressive throttle configuration values
// for the follows ACL mode. This allows non-followed users to write with increasing delay.
func (cfg *C) GetFollowsThrottleConfigValues() (
enabled bool,
perEvent time.Duration,
maxDelay time.Duration,
) {
return cfg.FollowsThrottleEnabled,
cfg.FollowsThrottlePerEvent,
cfg.FollowsThrottleMaxDelay
}

View File

@@ -2,6 +2,7 @@ package app
import (
"context"
"time"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
@@ -254,6 +255,18 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
}
log.I.F("HandleEvent: authorized with access level %s", decision.AccessLevel)
// Progressive throttle for follows ACL mode (delays non-followed users)
if delay := l.getFollowsThrottleDelay(env.E); delay > 0 {
log.D.F("HandleEvent: applying progressive throttle delay of %v for %0x from %s",
delay, env.E.Pubkey, l.remote)
select {
case <-l.ctx.Done():
return l.ctx.Err()
case <-time.After(delay):
// Delay completed, continue processing
}
}
// Route special event kinds (ephemeral, etc.) - use routing service
if routeResult := l.eventRouter.Route(env.E, l.authedPubkey.Load()); routeResult.Action != routing.Continue {
if routeResult.Action == routing.Handled {

View File

@@ -301,6 +301,22 @@ func (l *Listener) getManagedACL() *database.ManagedACL {
return nil
}
// getFollowsThrottleDelay returns the progressive throttle delay for follows ACL mode.
// Returns 0 if not in follows mode, throttle is disabled, or user is exempt.
func (l *Listener) getFollowsThrottleDelay(ev *event.E) time.Duration {
// Only applies to follows ACL mode
if acl.Registry.Active.Load() != "follows" {
return 0
}
// Find the Follows ACL instance and get the throttle delay
for _, aclInstance := range acl.Registry.ACL {
if follows, ok := aclInstance.(*acl.Follows); ok {
return follows.GetThrottleDelay(ev.Pubkey, l.remote)
}
}
return 0
}
// QueryEvents queries events using the database QueryEvents method
func (l *Listener) QueryEvents(ctx context.Context, f *filter.F) (event.S, error) {
return l.DB.QueryEvents(ctx, f)

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -480,14 +480,9 @@ export async function fetchUserProfile(pubkey) {
console.warn("Failed to fetch profile from fallback relays:", error);
}
// 4) No profile found anywhere - create a default profile for new users
console.log("No profile found for pubkey, creating default:", pubkey);
try {
return await createDefaultProfile(pubkey);
} catch (e) {
console.error("Failed to create default profile:", e);
return null;
}
// 4) No profile found anywhere
console.log("No profile found for pubkey:", pubkey);
return null;
}
// Helper to fetch profile from fallback relays
@@ -561,57 +556,6 @@ async function processProfileEvent(profileEvent, pubkey) {
return profile;
}
/**
* Create a default profile for new users
* @param {string} pubkey - The user's public key (hex)
* @returns {Promise<Object>} - The created profile
*/
async function createDefaultProfile(pubkey) {
// Generate name from first 6 chars of pubkey
const shortId = pubkey.slice(0, 6);
const defaultName = `testuser${shortId}`;
// Get the current origin for the logo URL
const logoUrl = `${window.location.origin}/orly.png`;
const profileContent = {
name: defaultName,
display_name: defaultName,
picture: logoUrl,
about: "New ORLY user"
};
const profile = {
name: defaultName,
displayName: defaultName,
picture: logoUrl,
about: "New ORLY user",
pubkey: pubkey
};
// Try to publish the profile if we have a signer
if (nostrClient.signer) {
try {
const event = {
kind: 0,
content: JSON.stringify(profileContent),
tags: [],
created_at: Math.floor(Date.now() / 1000)
};
// Sign and publish using the websocket-auth client
const signedEvent = await nostrClient.signer.signEvent(event);
await nostrClient.publish(signedEvent);
console.log("Default profile published:", signedEvent.id);
} catch (e) {
console.warn("Failed to publish default profile:", e);
// Still return the profile even if publishing fails
}
}
return profile;
}
// Fetch events
export async function fetchEvents(filters, options = {}) {
console.log(`Starting event fetch with filters:`, JSON.stringify(filters, null, 2));

View File

@@ -24,7 +24,7 @@ import (
"next.orly.dev/pkg/acl"
"git.mleku.dev/mleku/nostr/crypto/keys"
"git.mleku.dev/mleku/nostr/encoders/bech32encoding"
_ "next.orly.dev/pkg/bbolt" // Import for bbolt factory registration
bboltdb "next.orly.dev/pkg/bbolt" // Import for bbolt factory and type
"next.orly.dev/pkg/database"
neo4jdb "next.orly.dev/pkg/neo4j" // Import for neo4j factory and type
"git.mleku.dev/mleku/nostr/encoders/hex"
@@ -617,6 +617,10 @@ func main() {
n4jDB.MaxConcurrentQueries(),
)
log.I.F("rate limiter configured for Neo4j backend (target: %dMB)", targetMB)
} else if _, ok := db.(*bboltdb.B); ok {
// BBolt uses memory-mapped IO, so memory-only limiter is appropriate
limiter = ratelimit.NewMemoryOnlyLimiter(rlConfig)
log.I.F("rate limiter configured for BBolt backend (target: %dMB)", targetMB)
} else {
// For other backends, create a disabled limiter
limiter = ratelimit.NewDisabledLimiter()

View File

@@ -45,6 +45,8 @@ type Follows struct {
lastFollowListFetch time.Time
// Callback for external notification of follow list changes
onFollowListUpdate func()
// Progressive throttle for non-followed users (nil if disabled)
throttle *ProgressiveThrottle
}
func (f *Follows) Configure(cfg ...any) (err error) {
@@ -131,6 +133,22 @@ func (f *Follows) Configure(cfg ...any) (err error) {
}
}
}
// Initialize progressive throttle if enabled
if f.cfg.FollowsThrottleEnabled {
perEvent := f.cfg.FollowsThrottlePerEvent
if perEvent == 0 {
perEvent = 200 * time.Millisecond
}
maxDelay := f.cfg.FollowsThrottleMaxDelay
if maxDelay == 0 {
maxDelay = 60 * time.Second
}
f.throttle = NewProgressiveThrottle(perEvent, maxDelay)
log.I.F("follows ACL: progressive throttle enabled (increment: %v, max: %v)",
perEvent, maxDelay)
}
return
}
@@ -155,6 +173,10 @@ func (f *Follows) GetAccessLevel(pub []byte, address string) (level string) {
if f.cfg == nil {
return "write"
}
// If throttle enabled, non-followed users get write access (with delay applied in handle-event)
if f.throttle != nil {
return "write"
}
return "read"
}
@@ -165,6 +187,41 @@ func (f *Follows) GetACLInfo() (name, description, documentation string) {
func (f *Follows) Type() string { return "follows" }
// GetThrottleDelay returns the progressive throttle delay for this event.
// Returns 0 if throttle is disabled or if the user is exempt (owner/admin/followed).
func (f *Follows) GetThrottleDelay(pubkey []byte, ip string) time.Duration {
if f.throttle == nil {
return 0
}
// Check if user is exempt from throttling
f.followsMx.RLock()
defer f.followsMx.RUnlock()
// Owners bypass throttle
for _, v := range f.owners {
if utils.FastEqual(v, pubkey) {
return 0
}
}
// Admins bypass throttle
for _, v := range f.admins {
if utils.FastEqual(v, pubkey) {
return 0
}
}
// Followed users bypass throttle
for _, v := range f.follows {
if utils.FastEqual(v, pubkey) {
return 0
}
}
// Non-followed users get throttled
pubkeyHex := hex.EncodeToString(pubkey)
return f.throttle.GetDelay(ip, pubkeyHex)
}
func (f *Follows) adminRelays() (urls []string) {
f.followsMx.RLock()
admins := make([][]byte, len(f.admins))
@@ -353,6 +410,29 @@ func (f *Follows) Syncer() {
// Start periodic follow list and metadata fetching
go f.startPeriodicFollowListFetching()
// Start throttle cleanup goroutine if throttle is enabled
if f.throttle != nil {
go f.throttleCleanup()
}
}
// throttleCleanup periodically removes fully-decayed throttle entries
func (f *Follows) throttleCleanup() {
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
for {
select {
case <-f.Ctx.Done():
return
case <-ticker.C:
f.throttle.Cleanup()
ipCount, pubkeyCount := f.throttle.Stats()
log.T.F("follows throttle: cleanup complete, tracking %d IPs and %d pubkeys",
ipCount, pubkeyCount)
}
}
}
// startPeriodicFollowListFetching starts periodic fetching of admin follow lists

126
pkg/acl/follows_throttle.go Normal file
View File

@@ -0,0 +1,126 @@
package acl
import (
"sync"
"time"
)
// ThrottleState tracks accumulated delay for an identity (IP or pubkey)
type ThrottleState struct {
AccumulatedDelay time.Duration
LastEventTime time.Time
}
// ProgressiveThrottle implements linear delay with time decay.
// Each event adds perEvent delay, and delay decays at 1:1 ratio with elapsed time.
// This creates a natural rate limit that averages to 1 event per perEvent interval.
type ProgressiveThrottle struct {
mu sync.Mutex
ipStates map[string]*ThrottleState
pubkeyStates map[string]*ThrottleState
perEvent time.Duration // delay increment per event (default 200ms)
maxDelay time.Duration // cap (default 60s)
}
// NewProgressiveThrottle creates a new throttle with the given parameters.
// perEvent is the delay added per event (e.g., 200ms).
// maxDelay is the maximum accumulated delay cap (e.g., 60s).
func NewProgressiveThrottle(perEvent, maxDelay time.Duration) *ProgressiveThrottle {
return &ProgressiveThrottle{
ipStates: make(map[string]*ThrottleState),
pubkeyStates: make(map[string]*ThrottleState),
perEvent: perEvent,
maxDelay: maxDelay,
}
}
// GetDelay returns accumulated delay for this identity and updates state.
// It tracks both IP and pubkey independently and returns the maximum of both.
// This prevents evasion via different pubkeys from same IP or vice versa.
func (pt *ProgressiveThrottle) GetDelay(ip, pubkeyHex string) time.Duration {
pt.mu.Lock()
defer pt.mu.Unlock()
now := time.Now()
var ipDelay, pubkeyDelay time.Duration
if ip != "" {
ipDelay = pt.updateState(pt.ipStates, ip, now)
}
if pubkeyHex != "" {
pubkeyDelay = pt.updateState(pt.pubkeyStates, pubkeyHex, now)
}
// Return max of both to prevent evasion
if ipDelay > pubkeyDelay {
return ipDelay
}
return pubkeyDelay
}
// updateState calculates and updates the delay for a single identity.
// The algorithm:
// 1. Decay: subtract elapsed time from accumulated delay (1:1 ratio)
// 2. Add: add perEvent for this new event
// 3. Cap: limit to maxDelay
func (pt *ProgressiveThrottle) updateState(states map[string]*ThrottleState, key string, now time.Time) time.Duration {
state, exists := states[key]
if !exists {
// First event from this identity
states[key] = &ThrottleState{
AccumulatedDelay: pt.perEvent,
LastEventTime: now,
}
return pt.perEvent
}
// Decay: subtract elapsed time (1:1 ratio)
elapsed := now.Sub(state.LastEventTime)
state.AccumulatedDelay -= elapsed
if state.AccumulatedDelay < 0 {
state.AccumulatedDelay = 0
}
// Add new event's delay
state.AccumulatedDelay += pt.perEvent
state.LastEventTime = now
// Cap at max
if state.AccumulatedDelay > pt.maxDelay {
state.AccumulatedDelay = pt.maxDelay
}
return state.AccumulatedDelay
}
// Cleanup removes entries that have fully decayed (no remaining delay).
// This should be called periodically to prevent unbounded memory growth.
func (pt *ProgressiveThrottle) Cleanup() {
pt.mu.Lock()
defer pt.mu.Unlock()
now := time.Now()
// Remove IP entries that have fully decayed
for k, v := range pt.ipStates {
elapsed := now.Sub(v.LastEventTime)
if elapsed >= v.AccumulatedDelay {
delete(pt.ipStates, k)
}
}
// Remove pubkey entries that have fully decayed
for k, v := range pt.pubkeyStates {
elapsed := now.Sub(v.LastEventTime)
if elapsed >= v.AccumulatedDelay {
delete(pt.pubkeyStates, k)
}
}
}
// Stats returns the current number of tracked IPs and pubkeys (for monitoring)
func (pt *ProgressiveThrottle) Stats() (ipCount, pubkeyCount int) {
pt.mu.Lock()
defer pt.mu.Unlock()
return len(pt.ipStates), len(pt.pubkeyStates)
}

View File

@@ -350,12 +350,15 @@ func (r *bboltSerialResolver) GetPubkeyBySerial(serial uint64) (pubkey []byte, e
r.b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketSpk)
if bucket == nil {
err = errors.New("bbolt: spk bucket not found")
return nil
}
val := bucket.Get(makeSerialKey(serial))
if val != nil {
pubkey = make([]byte, 32)
copy(pubkey, val)
} else {
err = errors.New("bbolt: pubkey serial not found")
}
return nil
})
@@ -374,12 +377,15 @@ func (r *bboltSerialResolver) GetEventIdBySerial(serial uint64) (eventID []byte,
r.b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketSei)
if bucket == nil {
err = errors.New("bbolt: sei bucket not found")
return nil
}
val := bucket.Get(makeSerialKey(serial))
if val != nil {
eventID = make([]byte, 32)
copy(eventID, val)
} else {
err = errors.New("bbolt: event serial not found")
}
return nil
})

View File

@@ -52,6 +52,20 @@ const (
TagElementPubkeySerial = 0x01 // Pubkey serial reference (5 bytes)
TagElementEventSerial = 0x02 // Event ID serial reference (5 bytes)
TagElementEventIdFull = 0x03 // Full event ID (32 bytes) - for unknown refs
// Sanity limits to prevent OOM from corrupt data
MaxTagsPerEvent = 10000 // Maximum number of tags in an event
MaxTagElements = 100 // Maximum elements in a single tag
MaxContentLength = 10 << 20 // 10MB max content
MaxTagElementLength = 1 << 20 // 1MB max for a single tag element
)
var (
ErrTooManyTags = errors.New("corrupt data: too many tags")
ErrTooManyTagElems = errors.New("corrupt data: too many tag elements")
ErrContentTooLarge = errors.New("corrupt data: content too large")
ErrTagElementTooLong = errors.New("corrupt data: tag element too long")
ErrUnknownTagElemType = errors.New("corrupt data: unknown tag element type")
)
// SerialResolver is an interface for resolving serials during compact encoding/decoding.
@@ -287,12 +301,15 @@ func UnmarshalCompactEvent(data []byte, eventId []byte, resolver SerialResolver)
if nTags, err = varint.Decode(r); chk.E(err) {
return nil, err
}
if nTags > MaxTagsPerEvent {
return nil, ErrTooManyTags // Don't log - caller handles gracefully
}
if nTags > 0 {
ev.Tags = tag.NewSWithCap(int(nTags))
for i := uint64(0); i < nTags; i++ {
var t *tag.T
if t, err = decodeCompactTag(r, resolver); chk.E(err) {
return nil, err
if t, err = decodeCompactTag(r, resolver); err != nil {
return nil, err // Don't log corruption errors
}
*ev.Tags = append(*ev.Tags, t)
}
@@ -303,6 +320,9 @@ func UnmarshalCompactEvent(data []byte, eventId []byte, resolver SerialResolver)
if contentLen, err = varint.Decode(r); chk.E(err) {
return nil, err
}
if contentLen > MaxContentLength {
return nil, ErrContentTooLarge
}
ev.Content = make([]byte, contentLen)
if _, err = io.ReadFull(r, ev.Content); chk.E(err) {
return nil, err
@@ -320,16 +340,19 @@ func UnmarshalCompactEvent(data []byte, eventId []byte, resolver SerialResolver)
// decodeCompactTag decodes a single tag from compact format.
func decodeCompactTag(r io.Reader, resolver SerialResolver) (t *tag.T, err error) {
var nElems uint64
if nElems, err = varint.Decode(r); chk.E(err) {
if nElems, err = varint.Decode(r); err != nil {
return nil, err
}
if nElems > MaxTagElements {
return nil, ErrTooManyTagElems
}
t = tag.NewWithCap(int(nElems))
for i := uint64(0); i < nElems; i++ {
var elem []byte
if elem, err = decodeTagElement(r, resolver); chk.E(err) {
return nil, err
if elem, err = decodeTagElement(r, resolver); err != nil {
return nil, err // Don't log corruption errors
}
t.T = append(t.T, elem)
}
@@ -350,9 +373,12 @@ func decodeTagElement(r io.Reader, resolver SerialResolver) (elem []byte, err er
case TagElementRaw:
// Raw bytes: varint length + data
var length uint64
if length, err = varint.Decode(r); chk.E(err) {
if length, err = varint.Decode(r); err != nil {
return nil, err
}
if length > MaxTagElementLength {
return nil, ErrTagElementTooLong
}
elem = make([]byte, length)
if _, err = io.ReadFull(r, elem); err != nil {
return nil, err
@@ -402,7 +428,7 @@ func decodeTagElement(r io.Reader, resolver SerialResolver) (elem []byte, err er
return elem, nil
default:
return nil, errors.New("unknown tag element type flag")
return nil, ErrUnknownTagElemType
}
}

View File

@@ -54,3 +54,11 @@ func MonitorFromNeo4jDriver(
) loadmonitor.Monitor {
return NewNeo4jMonitor(driver, querySem, maxConcurrency, 100*time.Millisecond)
}
// NewMemoryOnlyLimiter creates a rate limiter that only monitors process memory.
// Useful for database backends that don't have their own load metrics (e.g., BBolt).
// Since BBolt uses memory-mapped IO, memory pressure is still relevant.
func NewMemoryOnlyLimiter(config Config) *Limiter {
monitor := NewMemoryMonitor(100 * time.Millisecond)
return NewLimiter(config, monitor)
}

View File

@@ -0,0 +1,214 @@
//go:build !(js && wasm)
package ratelimit
import (
"sync"
"sync/atomic"
"time"
"next.orly.dev/pkg/interfaces/loadmonitor"
)
// MemoryMonitor is a simple load monitor that only tracks process memory.
// Used for database backends that don't have their own load metrics (e.g., BBolt).
type MemoryMonitor struct {
// Configuration
pollInterval time.Duration
targetBytes atomic.Uint64
// State
running atomic.Bool
stopChan chan struct{}
doneChan chan struct{}
// Metrics (protected by mutex)
mu sync.RWMutex
currentMetrics loadmonitor.Metrics
// Latency tracking
queryLatencies []time.Duration
writeLatencies []time.Duration
latencyMu sync.Mutex
// Emergency mode
emergencyThreshold float64 // e.g., 1.167 (target + 1/6)
recoveryThreshold float64 // e.g., 0.833 (target - 1/6)
inEmergency atomic.Bool
}
// NewMemoryMonitor creates a memory-only load monitor.
// pollInterval controls how often memory is sampled (recommended: 100ms).
func NewMemoryMonitor(pollInterval time.Duration) *MemoryMonitor {
m := &MemoryMonitor{
pollInterval: pollInterval,
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
queryLatencies: make([]time.Duration, 0, 100),
writeLatencies: make([]time.Duration, 0, 100),
emergencyThreshold: 1.167, // Default: target + 1/6
recoveryThreshold: 0.833, // Default: target - 1/6
}
return m
}
// GetMetrics returns the current load metrics.
func (m *MemoryMonitor) GetMetrics() loadmonitor.Metrics {
m.mu.RLock()
defer m.mu.RUnlock()
return m.currentMetrics
}
// RecordQueryLatency records a query latency sample.
func (m *MemoryMonitor) RecordQueryLatency(latency time.Duration) {
m.latencyMu.Lock()
defer m.latencyMu.Unlock()
m.queryLatencies = append(m.queryLatencies, latency)
if len(m.queryLatencies) > 100 {
m.queryLatencies = m.queryLatencies[1:]
}
}
// RecordWriteLatency records a write latency sample.
func (m *MemoryMonitor) RecordWriteLatency(latency time.Duration) {
m.latencyMu.Lock()
defer m.latencyMu.Unlock()
m.writeLatencies = append(m.writeLatencies, latency)
if len(m.writeLatencies) > 100 {
m.writeLatencies = m.writeLatencies[1:]
}
}
// SetMemoryTarget sets the target memory limit in bytes.
func (m *MemoryMonitor) SetMemoryTarget(bytes uint64) {
m.targetBytes.Store(bytes)
}
// SetEmergencyThreshold sets the memory threshold for emergency mode.
func (m *MemoryMonitor) SetEmergencyThreshold(threshold float64) {
m.mu.Lock()
defer m.mu.Unlock()
m.emergencyThreshold = threshold
}
// GetEmergencyThreshold returns the current emergency threshold.
func (m *MemoryMonitor) GetEmergencyThreshold() float64 {
m.mu.RLock()
defer m.mu.RUnlock()
return m.emergencyThreshold
}
// ForceEmergencyMode manually triggers emergency mode for a duration.
func (m *MemoryMonitor) ForceEmergencyMode(duration time.Duration) {
m.inEmergency.Store(true)
go func() {
time.Sleep(duration)
m.inEmergency.Store(false)
}()
}
// Start begins background metric collection.
func (m *MemoryMonitor) Start() <-chan struct{} {
if m.running.Swap(true) {
// Already running
return m.doneChan
}
go m.pollLoop()
return m.doneChan
}
// Stop halts background metric collection.
func (m *MemoryMonitor) Stop() {
if !m.running.Swap(false) {
return
}
close(m.stopChan)
<-m.doneChan
}
// pollLoop continuously samples memory and updates metrics.
func (m *MemoryMonitor) pollLoop() {
defer close(m.doneChan)
ticker := time.NewTicker(m.pollInterval)
defer ticker.Stop()
for {
select {
case <-m.stopChan:
return
case <-ticker.C:
m.updateMetrics()
}
}
}
// updateMetrics samples current memory and updates the metrics.
func (m *MemoryMonitor) updateMetrics() {
target := m.targetBytes.Load()
if target == 0 {
target = 1 // Avoid division by zero
}
// Get physical memory using the same method as other monitors
procMem := ReadProcessMemoryStats()
physicalMemBytes := procMem.PhysicalMemoryBytes()
physicalMemMB := physicalMemBytes / (1024 * 1024)
// Calculate memory pressure
memPressure := float64(physicalMemBytes) / float64(target)
// Check emergency mode thresholds
m.mu.RLock()
emergencyThreshold := m.emergencyThreshold
recoveryThreshold := m.recoveryThreshold
m.mu.RUnlock()
wasEmergency := m.inEmergency.Load()
if memPressure > emergencyThreshold {
m.inEmergency.Store(true)
} else if memPressure < recoveryThreshold && wasEmergency {
m.inEmergency.Store(false)
}
// Calculate average latencies
m.latencyMu.Lock()
var avgQuery, avgWrite time.Duration
if len(m.queryLatencies) > 0 {
var total time.Duration
for _, l := range m.queryLatencies {
total += l
}
avgQuery = total / time.Duration(len(m.queryLatencies))
}
if len(m.writeLatencies) > 0 {
var total time.Duration
for _, l := range m.writeLatencies {
total += l
}
avgWrite = total / time.Duration(len(m.writeLatencies))
}
m.latencyMu.Unlock()
// Update metrics
m.mu.Lock()
m.currentMetrics = loadmonitor.Metrics{
MemoryPressure: memPressure,
WriteLoad: 0, // No database-specific load metric
ReadLoad: 0, // No database-specific load metric
QueryLatency: avgQuery,
WriteLatency: avgWrite,
Timestamp: time.Now(),
InEmergencyMode: m.inEmergency.Load(),
CompactionPending: false, // BBolt doesn't have compaction
PhysicalMemoryMB: physicalMemMB,
}
m.mu.Unlock()
}
// Ensure MemoryMonitor implements the required interfaces
var _ loadmonitor.Monitor = (*MemoryMonitor)(nil)
var _ loadmonitor.EmergencyModeMonitor = (*MemoryMonitor)(nil)

View File

@@ -1 +1 @@
v0.48.9
v0.48.12