Compare commits

4 Commits

| Author | SHA1 | Date |
|--------|------|------|
| | be72b694eb | |
| | 61f6027a64 | |
| | e7bc9a4a97 | |
| | 41a3b5c0a5 | |
@@ -3,12 +3,8 @@
     "allow": [],
     "deny": [],
     "ask": [],
-    "additionalDirectories": [
-      "/home/mleku/smesh",
-      "/home/mleku/Tourmaline",
-      "/home/mleku/Amber"
-    ]
+    "additionalDirectories": []
   },
   "outputStyle": "Default",
-  "MAX_THINKING_TOKENS": "8000"
+  "MAX_THINKING_TOKENS": "16000"
 }
CLAUDE.md (70 lines changed)
@@ -40,7 +40,7 @@ NOSTR_SECRET_KEY=nsec1... ./nurl https://relay.example.com/api/logs/clear
 |----------|---------|-------------|
 | `ORLY_PORT` | 3334 | Server port |
 | `ORLY_LOG_LEVEL` | info | trace/debug/info/warn/error |
-| `ORLY_DB_TYPE` | badger | badger/neo4j/wasmdb |
+| `ORLY_DB_TYPE` | badger | badger/bbolt/neo4j/wasmdb |
 | `ORLY_POLICY_ENABLED` | false | Enable policy system |
 | `ORLY_ACL_MODE` | none | none/follows/managed |
 | `ORLY_TLS_DOMAINS` | | Let's Encrypt domains |
@@ -67,6 +67,7 @@ app/
   web/       → Svelte frontend (embedded via go:embed)
 pkg/
   database/  → Database interface + Badger implementation
+  bbolt/     → BBolt backend (HDD-optimized, B+tree)
   neo4j/     → Neo4j backend with WoT extensions
   wasmdb/    → WebAssembly IndexedDB backend
   protocol/  → Nostr protocol (ws/, auth/, publish/)
@@ -130,16 +131,78 @@ if timeout > DefaultTimeoutSeconds {
- Provide public API methods (`IsEnabled()`, `CheckPolicy()`)
- Never change unexported→exported to fix bugs

### 6. Auth-Required Configuration (CAUTION)

**Be extremely careful when modifying auth-related settings in deployment configs.**

The `ORLY_AUTH_REQUIRED` and `ORLY_AUTH_TO_WRITE` settings control whether clients must authenticate via NIP-42 before interacting with the relay. Changing these on a production relay can:

- **Lock out all existing clients** if they don't support NIP-42 auth
- **Break automated systems** (bots, bridges, scrapers) that depend on anonymous access
- **Cause data sync issues** if upstream relays can't push events

Before enabling auth-required on any deployment:
1. Verify all expected clients support NIP-42
2. Ensure the relay identity key is properly configured
3. Test with a non-production instance first (see the sketch below)
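A minimal sketch of that staging test, assuming the `./orly` binary and the `ORLY_PORT`/`ORLY_AUTH_REQUIRED` env vars documented in this file (the port number and boolean value are illustrative):

```bash
# Throwaway instance with auth enforced, on a non-production port
ORLY_PORT=13334 ORLY_AUTH_REQUIRED=true ./orly &

# Point a NIP-42-capable client at ws://localhost:13334 and verify reads and
# writes succeed, then confirm an unauthenticated client is rejected as expected.
```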

## Database Backends

| Backend | Use Case | Build |
|---------|----------|-------|
-| **Badger** (default) | Single-instance, embedded | Standard |
+| **Badger** (default) | Single-instance, SSD, high performance | Standard |
+| **BBolt** | HDD-optimized, large archives, lower memory | `ORLY_DB_TYPE=bbolt` |
| **Neo4j** | Social graph, WoT queries | `ORLY_DB_TYPE=neo4j` |
| **WasmDB** | Browser/WebAssembly | `GOOS=js GOARCH=wasm` |

All implement `pkg/database.Database` interface.

### Scaling for Large Archives

For archives with millions of events, consider:

**Option 1: Tune Badger (SSD recommended)**
```bash
# Increase caches for a larger working set (requires more RAM)
ORLY_DB_BLOCK_CACHE_MB=2048          # 2GB block cache
ORLY_DB_INDEX_CACHE_MB=1024          # 1GB index cache
ORLY_SERIAL_CACHE_PUBKEYS=500000     # 500k pubkeys
ORLY_SERIAL_CACHE_EVENT_IDS=2000000  # 2M event IDs

# Higher compression to reduce disk IO
ORLY_DB_ZSTD_LEVEL=9                 # Best compression ratio

# Enable storage GC with aggressive eviction
ORLY_GC_ENABLED=true
ORLY_GC_BATCH_SIZE=5000
ORLY_MAX_STORAGE_BYTES=107374182400  # 100GB cap
```

**Option 2: Use BBolt for HDD/Low-Memory Deployments**
```bash
ORLY_DB_TYPE=bbolt

# Tune for your HDD
ORLY_BBOLT_BATCH_MAX_EVENTS=10000  # Larger batches for HDD
ORLY_BBOLT_BATCH_MAX_MB=256        # 256MB batch buffer
ORLY_BBOLT_FLUSH_TIMEOUT_SEC=60    # Longer flush interval
ORLY_BBOLT_BLOOM_SIZE_MB=32        # Larger bloom filter
ORLY_BBOLT_MMAP_SIZE_MB=16384      # 16GB mmap (scales with DB size)
```

**Migration Between Backends**
```bash
# Migrate from Badger to BBolt
./orly migrate --from badger --to bbolt

# Migrate with a custom target path
./orly migrate --from badger --to bbolt --target-path /mnt/hdd/orly-archive
```

**BBolt vs Badger Trade-offs:**
- BBolt: lower memory, HDD-friendly, simpler (B+tree), slower random reads
- Badger: higher memory, SSD-optimized (LSM), faster concurrent access
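Since all four backends satisfy the same `pkg/database.Database` interface, request-path code stays backend-agnostic. A hypothetical sketch (the `database` import path and the `QueryEvents` signature appear elsewhere in this diff; the helper itself and the `filter`/`event` import paths are illustrative):

```go
import (
	"context"

	"next.orly.dev/pkg/database"
	// filter.F and event.S are used below as in this diff's Listener code;
	// their import paths are not shown in the excerpts, so they are assumed.
)

// countMatching runs the same query regardless of whether the relay was
// started with Badger, BBolt, Neo4j, or WasmDB.
func countMatching(ctx context.Context, db database.Database, f *filter.F) (int, error) {
	evs, err := db.QueryEvents(ctx, f) // same call the Listener wrapper makes
	if err != nil {
		return 0, err
	}
	return len(evs), nil // assumes event.S is a slice type, as its usage suggests
}
```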

## Logging (lol.mleku.dev)

```go
@@ -206,7 +269,8 @@ if (isValidNsec(nsec)) { ... }

 ## Dependencies

-- `github.com/dgraph-io/badger/v4` - Badger DB
+- `github.com/dgraph-io/badger/v4` - Badger DB (LSM, SSD-optimized)
+- `go.etcd.io/bbolt` - BBolt DB (B+tree, HDD-optimized)
 - `github.com/neo4j/neo4j-go-driver/v5` - Neo4j
 - `github.com/gorilla/websocket` - WebSocket
 - `github.com/ebitengine/purego` - CGO-free C loading
@@ -41,9 +41,9 @@ type C struct {
 	EnableShutdown bool `env:"ORLY_ENABLE_SHUTDOWN" default:"false" usage:"if true, expose /shutdown on the health port to gracefully stop the process (for profiling)"`
 	LogLevel string `env:"ORLY_LOG_LEVEL" default:"info" usage:"relay log level: fatal error warn info debug trace"`
 	DBLogLevel string `env:"ORLY_DB_LOG_LEVEL" default:"info" usage:"database log level: fatal error warn info debug trace"`
-	DBBlockCacheMB int `env:"ORLY_DB_BLOCK_CACHE_MB" default:"512" usage:"Badger block cache size in MB (higher improves read hit ratio)"`
-	DBIndexCacheMB int `env:"ORLY_DB_INDEX_CACHE_MB" default:"256" usage:"Badger index cache size in MB (improves index lookup performance)"`
-	DBZSTDLevel int `env:"ORLY_DB_ZSTD_LEVEL" default:"1" usage:"Badger ZSTD compression level (1=fast/500MB/s, 3=default, 9=best ratio, 0=disable)"`
+	DBBlockCacheMB int `env:"ORLY_DB_BLOCK_CACHE_MB" default:"1024" usage:"Badger block cache size in MB (higher improves read hit ratio, increase for large archives)"`
+	DBIndexCacheMB int `env:"ORLY_DB_INDEX_CACHE_MB" default:"512" usage:"Badger index cache size in MB (improves index lookup performance, increase for large archives)"`
+	DBZSTDLevel int `env:"ORLY_DB_ZSTD_LEVEL" default:"3" usage:"Badger ZSTD compression level (1=fast/500MB/s, 3=balanced, 9=best ratio/slower, 0=disable)"`
 	LogToStdout bool `env:"ORLY_LOG_TO_STDOUT" default:"false" usage:"log to stdout instead of stderr"`
 	LogBufferSize int `env:"ORLY_LOG_BUFFER_SIZE" default:"10000" usage:"number of log entries to keep in memory for web UI viewing (0 disables)"`
 	Pprof string `env:"ORLY_PPROF" usage:"enable pprof in modes: cpu,memory,allocation,heap,block,goroutine,threadcreate,mutex"`
@@ -67,6 +67,11 @@ type C struct {
 	ClusterAdmins []string `env:"ORLY_CLUSTER_ADMINS" usage:"comma-separated list of npubs authorized to manage cluster membership"`
 	FollowListFrequency time.Duration `env:"ORLY_FOLLOW_LIST_FREQUENCY" usage:"how often to fetch admin follow lists (default: 1h)" default:"1h"`
 
+	// Progressive throttle for follows ACL mode - allows non-followed users to write with increasing delay
+	FollowsThrottleEnabled bool `env:"ORLY_FOLLOWS_THROTTLE" default:"false" usage:"enable progressive delay for non-followed users in follows ACL mode"`
+	FollowsThrottlePerEvent time.Duration `env:"ORLY_FOLLOWS_THROTTLE_INCREMENT" default:"200ms" usage:"delay added per event for non-followed users"`
+	FollowsThrottleMaxDelay time.Duration `env:"ORLY_FOLLOWS_THROTTLE_MAX" default:"60s" usage:"maximum throttle delay cap"`
+
 	// Blossom blob storage service level settings
 	BlossomServiceLevels string `env:"ORLY_BLOSSOM_SERVICE_LEVELS" usage:"comma-separated list of service levels in format: name:storage_mb_per_sat_per_month (e.g., basic:1,premium:10)"`
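Taken together, enabling the feature is just these variables plus follows mode; a minimal sketch using the defaults defined above:

```bash
ORLY_ACL_MODE=follows
ORLY_FOLLOWS_THROTTLE=true
ORLY_FOLLOWS_THROTTLE_INCREMENT=200ms  # delay added per event from a non-followed user
ORLY_FOLLOWS_THROTTLE_MAX=60s          # cap on the accumulated delay
```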
@@ -119,9 +124,9 @@ type C struct {
 	Neo4jMaxTxRetrySeconds int `env:"ORLY_NEO4J_MAX_TX_RETRY_SEC" default:"30" usage:"max seconds for retryable transaction attempts"`
 	Neo4jQueryResultLimit int `env:"ORLY_NEO4J_QUERY_RESULT_LIMIT" default:"10000" usage:"max results returned per query (prevents unbounded memory usage, 0=unlimited)"`
 
-	// Advanced database tuning
-	SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"100000" usage:"max pubkeys to cache for compact event storage (default: 100000, ~3.2MB memory)"`
-	SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"500000" usage:"max event IDs to cache for compact event storage (default: 500000, ~16MB memory)"`
+	// Advanced database tuning (increase for large archives to reduce cache misses)
+	SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"250000" usage:"max pubkeys to cache for compact event storage (~8MB memory, increase for large archives)"`
+	SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"1000000" usage:"max event IDs to cache for compact event storage (~32MB memory, increase for large archives)"`
 
 	// Connection concurrency control
 	MaxHandlersPerConnection int `env:"ORLY_MAX_HANDLERS_PER_CONN" default:"100" usage:"max concurrent message handlers per WebSocket connection (limits goroutine growth under load)"`
@@ -841,3 +846,15 @@ func (cfg *C) GetNRCConfigValues() (
 		cfg.NRCUseCashu,
 		sessionTimeout
 }
+
+// GetFollowsThrottleConfigValues returns the progressive throttle configuration values
+// for the follows ACL mode. This allows non-followed users to write with increasing delay.
+func (cfg *C) GetFollowsThrottleConfigValues() (
+	enabled bool,
+	perEvent time.Duration,
+	maxDelay time.Duration,
+) {
+	return cfg.FollowsThrottleEnabled,
+		cfg.FollowsThrottlePerEvent,
+		cfg.FollowsThrottleMaxDelay
+}
@@ -2,6 +2,7 @@ package app
 
 import (
 	"context"
+	"time"
 
 	"lol.mleku.dev/chk"
 	"lol.mleku.dev/log"
@@ -254,6 +255,18 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
 	}
 	log.I.F("HandleEvent: authorized with access level %s", decision.AccessLevel)
 
+	// Progressive throttle for follows ACL mode (delays non-followed users)
+	if delay := l.getFollowsThrottleDelay(env.E); delay > 0 {
+		log.D.F("HandleEvent: applying progressive throttle delay of %v for %0x from %s",
+			delay, env.E.Pubkey, l.remote)
+		select {
+		case <-l.ctx.Done():
+			return l.ctx.Err()
+		case <-time.After(delay):
+			// Delay completed, continue processing
+		}
+	}
+
 	// Route special event kinds (ephemeral, etc.) - use routing service
 	if routeResult := l.eventRouter.Route(env.E, l.authedPubkey.Load()); routeResult.Action != routing.Continue {
 		if routeResult.Action == routing.Handled {
@@ -301,6 +301,22 @@ func (l *Listener) getManagedACL() *database.ManagedACL {
 	return nil
 }
 
+// getFollowsThrottleDelay returns the progressive throttle delay for follows ACL mode.
+// Returns 0 if not in follows mode, throttle is disabled, or user is exempt.
+func (l *Listener) getFollowsThrottleDelay(ev *event.E) time.Duration {
+	// Only applies to follows ACL mode
+	if acl.Registry.Active.Load() != "follows" {
+		return 0
+	}
+	// Find the Follows ACL instance and get the throttle delay
+	for _, aclInstance := range acl.Registry.ACL {
+		if follows, ok := aclInstance.(*acl.Follows); ok {
+			return follows.GetThrottleDelay(ev.Pubkey, l.remote)
+		}
+	}
+	return 0
+}
+
 // QueryEvents queries events using the database QueryEvents method
 func (l *Listener) QueryEvents(ctx context.Context, f *filter.F) (event.S, error) {
 	return l.DB.QueryEvents(ctx, f)
app/web/dist/bundle.js (vendored, 2 lines changed)
File diff suppressed because one or more lines are too long

app/web/dist/bundle.js.map (vendored, 2 lines changed)
File diff suppressed because one or more lines are too long
@@ -480,14 +480,9 @@ export async function fetchUserProfile(pubkey) {
     console.warn("Failed to fetch profile from fallback relays:", error);
   }
 
-  // 4) No profile found anywhere - create a default profile for new users
-  console.log("No profile found for pubkey, creating default:", pubkey);
-  try {
-    return await createDefaultProfile(pubkey);
-  } catch (e) {
-    console.error("Failed to create default profile:", e);
-    return null;
-  }
+  // 4) No profile found anywhere
+  console.log("No profile found for pubkey:", pubkey);
+  return null;
 }
 
 // Helper to fetch profile from fallback relays
@@ -561,57 +556,6 @@ async function processProfileEvent(profileEvent, pubkey) {
   return profile;
 }
 
-/**
- * Create a default profile for new users
- * @param {string} pubkey - The user's public key (hex)
- * @returns {Promise<Object>} - The created profile
- */
-async function createDefaultProfile(pubkey) {
-  // Generate name from first 6 chars of pubkey
-  const shortId = pubkey.slice(0, 6);
-  const defaultName = `testuser${shortId}`;
-
-  // Get the current origin for the logo URL
-  const logoUrl = `${window.location.origin}/orly.png`;
-
-  const profileContent = {
-    name: defaultName,
-    display_name: defaultName,
-    picture: logoUrl,
-    about: "New ORLY user"
-  };
-
-  const profile = {
-    name: defaultName,
-    displayName: defaultName,
-    picture: logoUrl,
-    about: "New ORLY user",
-    pubkey: pubkey
-  };
-
-  // Try to publish the profile if we have a signer
-  if (nostrClient.signer) {
-    try {
-      const event = {
-        kind: 0,
-        content: JSON.stringify(profileContent),
-        tags: [],
-        created_at: Math.floor(Date.now() / 1000)
-      };
-
-      // Sign and publish using the websocket-auth client
-      const signedEvent = await nostrClient.signer.signEvent(event);
-      await nostrClient.publish(signedEvent);
-      console.log("Default profile published:", signedEvent.id);
-    } catch (e) {
-      console.warn("Failed to publish default profile:", e);
-      // Still return the profile even if publishing fails
-    }
-  }
-
-  return profile;
-}
-
 // Fetch events
 export async function fetchEvents(filters, options = {}) {
   console.log(`Starting event fetch with filters:`, JSON.stringify(filters, null, 2));
main.go (6 lines changed)
@@ -24,7 +24,7 @@ import (
 	"next.orly.dev/pkg/acl"
 	"git.mleku.dev/mleku/nostr/crypto/keys"
 	"git.mleku.dev/mleku/nostr/encoders/bech32encoding"
-	_ "next.orly.dev/pkg/bbolt" // Import for bbolt factory registration
+	bboltdb "next.orly.dev/pkg/bbolt" // Import for bbolt factory and type
 	"next.orly.dev/pkg/database"
 	neo4jdb "next.orly.dev/pkg/neo4j" // Import for neo4j factory and type
 	"git.mleku.dev/mleku/nostr/encoders/hex"
@@ -617,6 +617,10 @@ func main() {
 			n4jDB.MaxConcurrentQueries(),
 		)
 		log.I.F("rate limiter configured for Neo4j backend (target: %dMB)", targetMB)
+	} else if _, ok := db.(*bboltdb.B); ok {
+		// BBolt uses memory-mapped IO, so memory-only limiter is appropriate
+		limiter = ratelimit.NewMemoryOnlyLimiter(rlConfig)
+		log.I.F("rate limiter configured for BBolt backend (target: %dMB)", targetMB)
 	} else {
 		// For other backends, create a disabled limiter
 		limiter = ratelimit.NewDisabledLimiter()
@@ -45,6 +45,8 @@ type Follows struct {
 	lastFollowListFetch time.Time
 	// Callback for external notification of follow list changes
 	onFollowListUpdate func()
+	// Progressive throttle for non-followed users (nil if disabled)
+	throttle *ProgressiveThrottle
 }
 
 func (f *Follows) Configure(cfg ...any) (err error) {
@@ -131,6 +133,22 @@ func (f *Follows) Configure(cfg ...any) (err error) {
 			}
 		}
 	}
 
+	// Initialize progressive throttle if enabled
+	if f.cfg.FollowsThrottleEnabled {
+		perEvent := f.cfg.FollowsThrottlePerEvent
+		if perEvent == 0 {
+			perEvent = 200 * time.Millisecond
+		}
+		maxDelay := f.cfg.FollowsThrottleMaxDelay
+		if maxDelay == 0 {
+			maxDelay = 60 * time.Second
+		}
+		f.throttle = NewProgressiveThrottle(perEvent, maxDelay)
+		log.I.F("follows ACL: progressive throttle enabled (increment: %v, max: %v)",
+			perEvent, maxDelay)
+	}
+
 	return
 }
@@ -155,6 +173,10 @@ func (f *Follows) GetAccessLevel(pub []byte, address string) (level string) {
 	if f.cfg == nil {
 		return "write"
 	}
+	// If throttle enabled, non-followed users get write access (with delay applied in handle-event)
+	if f.throttle != nil {
+		return "write"
+	}
 	return "read"
 }
@@ -165,6 +187,41 @@ func (f *Follows) GetACLInfo() (name, description, documentation string) {
 
 func (f *Follows) Type() string { return "follows" }
 
+// GetThrottleDelay returns the progressive throttle delay for this event.
+// Returns 0 if throttle is disabled or if the user is exempt (owner/admin/followed).
+func (f *Follows) GetThrottleDelay(pubkey []byte, ip string) time.Duration {
+	if f.throttle == nil {
+		return 0
+	}
+
+	// Check if user is exempt from throttling
+	f.followsMx.RLock()
+	defer f.followsMx.RUnlock()
+
+	// Owners bypass throttle
+	for _, v := range f.owners {
+		if utils.FastEqual(v, pubkey) {
+			return 0
+		}
+	}
+	// Admins bypass throttle
+	for _, v := range f.admins {
+		if utils.FastEqual(v, pubkey) {
+			return 0
+		}
+	}
+	// Followed users bypass throttle
+	for _, v := range f.follows {
+		if utils.FastEqual(v, pubkey) {
+			return 0
+		}
+	}
+
+	// Non-followed users get throttled
+	pubkeyHex := hex.EncodeToString(pubkey)
+	return f.throttle.GetDelay(ip, pubkeyHex)
+}
+
 func (f *Follows) adminRelays() (urls []string) {
 	f.followsMx.RLock()
 	admins := make([][]byte, len(f.admins))
@@ -353,6 +410,29 @@ func (f *Follows) Syncer() {
 
 	// Start periodic follow list and metadata fetching
 	go f.startPeriodicFollowListFetching()
+
+	// Start throttle cleanup goroutine if throttle is enabled
+	if f.throttle != nil {
+		go f.throttleCleanup()
+	}
 }
 
+// throttleCleanup periodically removes fully-decayed throttle entries
+func (f *Follows) throttleCleanup() {
+	ticker := time.NewTicker(10 * time.Minute)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-f.Ctx.Done():
+			return
+		case <-ticker.C:
+			f.throttle.Cleanup()
+			ipCount, pubkeyCount := f.throttle.Stats()
+			log.T.F("follows throttle: cleanup complete, tracking %d IPs and %d pubkeys",
+				ipCount, pubkeyCount)
+		}
+	}
+}
+
 // startPeriodicFollowListFetching starts periodic fetching of admin follow lists
pkg/acl/follows_throttle.go (new file, 126 lines)
@@ -0,0 +1,126 @@
package acl

import (
	"sync"
	"time"
)

// ThrottleState tracks accumulated delay for an identity (IP or pubkey)
type ThrottleState struct {
	AccumulatedDelay time.Duration
	LastEventTime    time.Time
}

// ProgressiveThrottle implements linear delay with time decay.
// Each event adds perEvent delay, and delay decays at 1:1 ratio with elapsed time.
// This creates a natural rate limit that averages to 1 event per perEvent interval.
type ProgressiveThrottle struct {
	mu           sync.Mutex
	ipStates     map[string]*ThrottleState
	pubkeyStates map[string]*ThrottleState
	perEvent     time.Duration // delay increment per event (default 200ms)
	maxDelay     time.Duration // cap (default 60s)
}

// NewProgressiveThrottle creates a new throttle with the given parameters.
// perEvent is the delay added per event (e.g., 200ms).
// maxDelay is the maximum accumulated delay cap (e.g., 60s).
func NewProgressiveThrottle(perEvent, maxDelay time.Duration) *ProgressiveThrottle {
	return &ProgressiveThrottle{
		ipStates:     make(map[string]*ThrottleState),
		pubkeyStates: make(map[string]*ThrottleState),
		perEvent:     perEvent,
		maxDelay:     maxDelay,
	}
}

// GetDelay returns accumulated delay for this identity and updates state.
// It tracks both IP and pubkey independently and returns the maximum of both.
// This prevents evasion via different pubkeys from same IP or vice versa.
func (pt *ProgressiveThrottle) GetDelay(ip, pubkeyHex string) time.Duration {
	pt.mu.Lock()
	defer pt.mu.Unlock()

	now := time.Now()
	var ipDelay, pubkeyDelay time.Duration

	if ip != "" {
		ipDelay = pt.updateState(pt.ipStates, ip, now)
	}
	if pubkeyHex != "" {
		pubkeyDelay = pt.updateState(pt.pubkeyStates, pubkeyHex, now)
	}

	// Return max of both to prevent evasion
	if ipDelay > pubkeyDelay {
		return ipDelay
	}
	return pubkeyDelay
}

// updateState calculates and updates the delay for a single identity.
// The algorithm:
//  1. Decay: subtract elapsed time from accumulated delay (1:1 ratio)
//  2. Add: add perEvent for this new event
//  3. Cap: limit to maxDelay
func (pt *ProgressiveThrottle) updateState(states map[string]*ThrottleState, key string, now time.Time) time.Duration {
	state, exists := states[key]
	if !exists {
		// First event from this identity
		states[key] = &ThrottleState{
			AccumulatedDelay: pt.perEvent,
			LastEventTime:    now,
		}
		return pt.perEvent
	}

	// Decay: subtract elapsed time (1:1 ratio)
	elapsed := now.Sub(state.LastEventTime)
	state.AccumulatedDelay -= elapsed
	if state.AccumulatedDelay < 0 {
		state.AccumulatedDelay = 0
	}

	// Add new event's delay
	state.AccumulatedDelay += pt.perEvent
	state.LastEventTime = now

	// Cap at max
	if state.AccumulatedDelay > pt.maxDelay {
		state.AccumulatedDelay = pt.maxDelay
	}

	return state.AccumulatedDelay
}

// Cleanup removes entries that have fully decayed (no remaining delay).
// This should be called periodically to prevent unbounded memory growth.
func (pt *ProgressiveThrottle) Cleanup() {
	pt.mu.Lock()
	defer pt.mu.Unlock()

	now := time.Now()

	// Remove IP entries that have fully decayed
	for k, v := range pt.ipStates {
		elapsed := now.Sub(v.LastEventTime)
		if elapsed >= v.AccumulatedDelay {
			delete(pt.ipStates, k)
		}
	}

	// Remove pubkey entries that have fully decayed
	for k, v := range pt.pubkeyStates {
		elapsed := now.Sub(v.LastEventTime)
		if elapsed >= v.AccumulatedDelay {
			delete(pt.pubkeyStates, k)
		}
	}
}

// Stats returns the current number of tracked IPs and pubkeys (for monitoring)
func (pt *ProgressiveThrottle) Stats() (ipCount, pubkeyCount int) {
	pt.mu.Lock()
	defer pt.mu.Unlock()
	return len(pt.ipStates), len(pt.pubkeyStates)
}
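To make the accumulation/decay arithmetic concrete, a hypothetical test sketch against the type above (identities and timings are illustrative, and real sleeps make the numbers approximate):

```go
package acl

import (
	"testing"
	"time"
)

// TestProgressiveThrottleSketch illustrates linear accumulation and 1:1 decay.
func TestProgressiveThrottleSketch(t *testing.T) {
	pt := NewProgressiveThrottle(200*time.Millisecond, 60*time.Second)

	// Three back-to-back events from one IP/pubkey pair: almost no time elapses
	// between calls, so delay grows roughly linearly: ~200ms, ~400ms, ~600ms.
	for i := 0; i < 3; i++ {
		t.Logf("event %d: delay %v", i+1, pt.GetDelay("203.0.113.7", "deadbeef"))
	}

	// Pause 500ms: the ~600ms accumulated delay decays to ~100ms,
	// then this event adds another 200ms, giving roughly 300ms.
	time.Sleep(500 * time.Millisecond)
	t.Logf("after pause: delay %v", pt.GetDelay("203.0.113.7", "deadbeef"))
}
```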
@@ -350,12 +350,15 @@ func (r *bboltSerialResolver) GetPubkeyBySerial(serial uint64) (pubkey []byte, e
	r.b.db.View(func(tx *bolt.Tx) error {
		bucket := tx.Bucket(bucketSpk)
		if bucket == nil {
			err = errors.New("bbolt: spk bucket not found")
			return nil
		}
		val := bucket.Get(makeSerialKey(serial))
		if val != nil {
			pubkey = make([]byte, 32)
			copy(pubkey, val)
		} else {
			err = errors.New("bbolt: pubkey serial not found")
		}
		return nil
	})
@@ -374,12 +377,15 @@ func (r *bboltSerialResolver) GetEventIdBySerial(serial uint64) (eventID []byte,
	r.b.db.View(func(tx *bolt.Tx) error {
		bucket := tx.Bucket(bucketSei)
		if bucket == nil {
			err = errors.New("bbolt: sei bucket not found")
			return nil
		}
		val := bucket.Get(makeSerialKey(serial))
		if val != nil {
			eventID = make([]byte, 32)
			copy(eventID, val)
		} else {
			err = errors.New("bbolt: event serial not found")
		}
		return nil
	})
@@ -52,6 +52,20 @@ const (
 	TagElementPubkeySerial = 0x01 // Pubkey serial reference (5 bytes)
 	TagElementEventSerial  = 0x02 // Event ID serial reference (5 bytes)
 	TagElementEventIdFull  = 0x03 // Full event ID (32 bytes) - for unknown refs
+
+	// Sanity limits to prevent OOM from corrupt data
+	MaxTagsPerEvent     = 10000    // Maximum number of tags in an event
+	MaxTagElements      = 100      // Maximum elements in a single tag
+	MaxContentLength    = 10 << 20 // 10MB max content
+	MaxTagElementLength = 1 << 20  // 1MB max for a single tag element
 )
 
+var (
+	ErrTooManyTags        = errors.New("corrupt data: too many tags")
+	ErrTooManyTagElems    = errors.New("corrupt data: too many tag elements")
+	ErrContentTooLarge    = errors.New("corrupt data: content too large")
+	ErrTagElementTooLong  = errors.New("corrupt data: tag element too long")
+	ErrUnknownTagElemType = errors.New("corrupt data: unknown tag element type")
+)
+
 // SerialResolver is an interface for resolving serials during compact encoding/decoding.
@@ -287,12 +301,15 @@ func UnmarshalCompactEvent(data []byte, eventId []byte, resolver SerialResolver)
 	if nTags, err = varint.Decode(r); chk.E(err) {
 		return nil, err
 	}
+	if nTags > MaxTagsPerEvent {
+		return nil, ErrTooManyTags // Don't log - caller handles gracefully
+	}
 	if nTags > 0 {
 		ev.Tags = tag.NewSWithCap(int(nTags))
 		for i := uint64(0); i < nTags; i++ {
 			var t *tag.T
-			if t, err = decodeCompactTag(r, resolver); chk.E(err) {
-				return nil, err
+			if t, err = decodeCompactTag(r, resolver); err != nil {
+				return nil, err // Don't log corruption errors
 			}
 			*ev.Tags = append(*ev.Tags, t)
 		}
@@ -303,6 +320,9 @@ func UnmarshalCompactEvent(data []byte, eventId []byte, resolver SerialResolver)
 	if contentLen, err = varint.Decode(r); chk.E(err) {
 		return nil, err
 	}
+	if contentLen > MaxContentLength {
+		return nil, ErrContentTooLarge
+	}
 	ev.Content = make([]byte, contentLen)
 	if _, err = io.ReadFull(r, ev.Content); chk.E(err) {
 		return nil, err
@@ -320,16 +340,19 @@ func UnmarshalCompactEvent(data []byte, eventId []byte, resolver SerialResolver)
 // decodeCompactTag decodes a single tag from compact format.
 func decodeCompactTag(r io.Reader, resolver SerialResolver) (t *tag.T, err error) {
 	var nElems uint64
-	if nElems, err = varint.Decode(r); chk.E(err) {
+	if nElems, err = varint.Decode(r); err != nil {
 		return nil, err
 	}
+	if nElems > MaxTagElements {
+		return nil, ErrTooManyTagElems
+	}
 
 	t = tag.NewWithCap(int(nElems))
 
 	for i := uint64(0); i < nElems; i++ {
 		var elem []byte
-		if elem, err = decodeTagElement(r, resolver); chk.E(err) {
-			return nil, err
+		if elem, err = decodeTagElement(r, resolver); err != nil {
+			return nil, err // Don't log corruption errors
 		}
 		t.T = append(t.T, elem)
 	}
@@ -350,9 +373,12 @@ func decodeTagElement(r io.Reader, resolver SerialResolver) (elem []byte, err er
 	case TagElementRaw:
 		// Raw bytes: varint length + data
 		var length uint64
-		if length, err = varint.Decode(r); chk.E(err) {
+		if length, err = varint.Decode(r); err != nil {
 			return nil, err
 		}
+		if length > MaxTagElementLength {
+			return nil, ErrTagElementTooLong
+		}
 		elem = make([]byte, length)
 		if _, err = io.ReadFull(r, elem); err != nil {
 			return nil, err
@@ -402,7 +428,7 @@ func decodeTagElement(r io.Reader, resolver SerialResolver) (elem []byte, err er
 		return elem, nil
 
 	default:
-		return nil, errors.New("unknown tag element type flag")
+		return nil, ErrUnknownTagElemType
 	}
 }
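A hypothetical caller illustrating the "handles gracefully" contract from the comments above (only the sentinel errors and `UnmarshalCompactEvent` come from this diff; the helper, and the assumption that the decoder's first return is `*event.E`, are illustrative):

```go
// scanOne decodes one stored record, skipping corrupt entries rather than
// aborting the whole scan. (Hypothetical helper for illustration.)
func scanOne(data, eventId []byte, resolver SerialResolver) (*event.E, error) {
	ev, err := UnmarshalCompactEvent(data, eventId, resolver)
	switch err {
	case nil:
		return ev, nil
	case ErrTooManyTags, ErrTooManyTagElems, ErrContentTooLarge,
		ErrTagElementTooLong, ErrUnknownTagElemType:
		// Corrupt record: drop it and keep scanning, without the unbounded
		// allocation the sanity limits are guarding against.
		return nil, nil
	default:
		return nil, err // genuine IO/decode failure
	}
}
```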
@@ -54,3 +54,11 @@ func MonitorFromNeo4jDriver(
 ) loadmonitor.Monitor {
 	return NewNeo4jMonitor(driver, querySem, maxConcurrency, 100*time.Millisecond)
 }
+
+// NewMemoryOnlyLimiter creates a rate limiter that only monitors process memory.
+// Useful for database backends that don't have their own load metrics (e.g., BBolt).
+// Since BBolt uses memory-mapped IO, memory pressure is still relevant.
+func NewMemoryOnlyLimiter(config Config) *Limiter {
+	monitor := NewMemoryMonitor(100 * time.Millisecond)
+	return NewLimiter(config, monitor)
+}
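If a caller needs the monitor handle (for example, to set a memory target), the equivalent manual wiring is short; a sketch using only constructors that appear in this diff (the helper name and import path are assumptions):

```go
import (
	"time"

	"next.orly.dev/pkg/ratelimit" // package path assumed from pkg/ratelimit/ in this diff
)

// newBBoltLimiter is a hypothetical variant of NewMemoryOnlyLimiter that
// keeps the monitor handle so the caller can set a memory target.
func newBBoltLimiter(cfg ratelimit.Config, targetBytes uint64) *ratelimit.Limiter {
	monitor := ratelimit.NewMemoryMonitor(100 * time.Millisecond)
	monitor.SetMemoryTarget(targetBytes) // e.g., uint64(targetMB) << 20
	monitor.Start()                      // begin background sampling
	return ratelimit.NewLimiter(cfg, monitor)
}
```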
pkg/ratelimit/memory_monitor.go (new file, 214 lines)
@@ -0,0 +1,214 @@
//go:build !(js && wasm)

package ratelimit

import (
	"sync"
	"sync/atomic"
	"time"

	"next.orly.dev/pkg/interfaces/loadmonitor"
)

// MemoryMonitor is a simple load monitor that only tracks process memory.
// Used for database backends that don't have their own load metrics (e.g., BBolt).
type MemoryMonitor struct {
	// Configuration
	pollInterval time.Duration
	targetBytes  atomic.Uint64

	// State
	running  atomic.Bool
	stopChan chan struct{}
	doneChan chan struct{}

	// Metrics (protected by mutex)
	mu             sync.RWMutex
	currentMetrics loadmonitor.Metrics

	// Latency tracking
	queryLatencies []time.Duration
	writeLatencies []time.Duration
	latencyMu      sync.Mutex

	// Emergency mode
	emergencyThreshold float64 // e.g., 1.167 (target + 1/6)
	recoveryThreshold  float64 // e.g., 0.833 (target - 1/6)
	inEmergency        atomic.Bool
}

// NewMemoryMonitor creates a memory-only load monitor.
// pollInterval controls how often memory is sampled (recommended: 100ms).
func NewMemoryMonitor(pollInterval time.Duration) *MemoryMonitor {
	m := &MemoryMonitor{
		pollInterval:       pollInterval,
		stopChan:           make(chan struct{}),
		doneChan:           make(chan struct{}),
		queryLatencies:     make([]time.Duration, 0, 100),
		writeLatencies:     make([]time.Duration, 0, 100),
		emergencyThreshold: 1.167, // Default: target + 1/6
		recoveryThreshold:  0.833, // Default: target - 1/6
	}
	return m
}

// GetMetrics returns the current load metrics.
func (m *MemoryMonitor) GetMetrics() loadmonitor.Metrics {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.currentMetrics
}

// RecordQueryLatency records a query latency sample.
func (m *MemoryMonitor) RecordQueryLatency(latency time.Duration) {
	m.latencyMu.Lock()
	defer m.latencyMu.Unlock()

	m.queryLatencies = append(m.queryLatencies, latency)
	if len(m.queryLatencies) > 100 {
		m.queryLatencies = m.queryLatencies[1:]
	}
}

// RecordWriteLatency records a write latency sample.
func (m *MemoryMonitor) RecordWriteLatency(latency time.Duration) {
	m.latencyMu.Lock()
	defer m.latencyMu.Unlock()

	m.writeLatencies = append(m.writeLatencies, latency)
	if len(m.writeLatencies) > 100 {
		m.writeLatencies = m.writeLatencies[1:]
	}
}

// SetMemoryTarget sets the target memory limit in bytes.
func (m *MemoryMonitor) SetMemoryTarget(bytes uint64) {
	m.targetBytes.Store(bytes)
}

// SetEmergencyThreshold sets the memory threshold for emergency mode.
func (m *MemoryMonitor) SetEmergencyThreshold(threshold float64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.emergencyThreshold = threshold
}

// GetEmergencyThreshold returns the current emergency threshold.
func (m *MemoryMonitor) GetEmergencyThreshold() float64 {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.emergencyThreshold
}

// ForceEmergencyMode manually triggers emergency mode for a duration.
func (m *MemoryMonitor) ForceEmergencyMode(duration time.Duration) {
	m.inEmergency.Store(true)
	go func() {
		time.Sleep(duration)
		m.inEmergency.Store(false)
	}()
}

// Start begins background metric collection.
func (m *MemoryMonitor) Start() <-chan struct{} {
	if m.running.Swap(true) {
		// Already running
		return m.doneChan
	}

	go m.pollLoop()
	return m.doneChan
}

// Stop halts background metric collection.
func (m *MemoryMonitor) Stop() {
	if !m.running.Swap(false) {
		return
	}
	close(m.stopChan)
	<-m.doneChan
}

// pollLoop continuously samples memory and updates metrics.
func (m *MemoryMonitor) pollLoop() {
	defer close(m.doneChan)

	ticker := time.NewTicker(m.pollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-m.stopChan:
			return
		case <-ticker.C:
			m.updateMetrics()
		}
	}
}

// updateMetrics samples current memory and updates the metrics.
func (m *MemoryMonitor) updateMetrics() {
	target := m.targetBytes.Load()
	if target == 0 {
		target = 1 // Avoid division by zero
	}

	// Get physical memory using the same method as other monitors
	procMem := ReadProcessMemoryStats()
	physicalMemBytes := procMem.PhysicalMemoryBytes()
	physicalMemMB := physicalMemBytes / (1024 * 1024)

	// Calculate memory pressure
	memPressure := float64(physicalMemBytes) / float64(target)

	// Check emergency mode thresholds
	m.mu.RLock()
	emergencyThreshold := m.emergencyThreshold
	recoveryThreshold := m.recoveryThreshold
	m.mu.RUnlock()

	wasEmergency := m.inEmergency.Load()
	if memPressure > emergencyThreshold {
		m.inEmergency.Store(true)
	} else if memPressure < recoveryThreshold && wasEmergency {
		m.inEmergency.Store(false)
	}

	// Calculate average latencies
	m.latencyMu.Lock()
	var avgQuery, avgWrite time.Duration
	if len(m.queryLatencies) > 0 {
		var total time.Duration
		for _, l := range m.queryLatencies {
			total += l
		}
		avgQuery = total / time.Duration(len(m.queryLatencies))
	}
	if len(m.writeLatencies) > 0 {
		var total time.Duration
		for _, l := range m.writeLatencies {
			total += l
		}
		avgWrite = total / time.Duration(len(m.writeLatencies))
	}
	m.latencyMu.Unlock()

	// Update metrics
	m.mu.Lock()
	m.currentMetrics = loadmonitor.Metrics{
		MemoryPressure:    memPressure,
		WriteLoad:         0, // No database-specific load metric
		ReadLoad:          0, // No database-specific load metric
		QueryLatency:      avgQuery,
		WriteLatency:      avgWrite,
		Timestamp:         time.Now(),
		InEmergencyMode:   m.inEmergency.Load(),
		CompactionPending: false, // BBolt doesn't have compaction
		PhysicalMemoryMB:  physicalMemMB,
	}
	m.mu.Unlock()
}

// Ensure MemoryMonitor implements the required interfaces
var _ loadmonitor.Monitor = (*MemoryMonitor)(nil)
var _ loadmonitor.EmergencyModeMonitor = (*MemoryMonitor)(nil)
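Worked numbers for the default hysteresis band, assuming a hypothetical 3GiB memory target: emergency mode engages when pressure exceeds 1.167, i.e. physical memory above roughly 3.5GiB, and releases when pressure drops below 0.833, i.e. below roughly 2.5GiB. The ±1/6 band keeps the monitor from flapping in and out of emergency mode when usage hovers near the target.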
@@ -1 +1 @@
-v0.48.9
+v0.48.12