Compare commits

6 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 9da1784b1b | |
| | 205f23fc0c | |
| | 489b9f4593 | |
| | 604d759a6a | |
| | be72b694eb | |
| | 61f6027a64 | |
@@ -49,10 +49,12 @@ If no argument provided, default to `patch`.
    GIT_SSH_COMMAND="ssh -i ~/.ssh/gitmlekudev" git push ssh://mleku@git.mleku.dev:2222/mleku/next.orly.dev.git main --tags
    ```

-11. **Deploy to VPS** by running:
-    ```
-    ssh relay.orly.dev 'cd ~/src/next.orly.dev && git stash && git pull origin main && export PATH=$PATH:~/go/bin && CGO_ENABLED=0 go build -o ~/.local/bin/next.orly.dev && sudo /usr/sbin/setcap cap_net_bind_service=+ep ~/.local/bin/next.orly.dev && sudo systemctl restart orly && ~/.local/bin/next.orly.dev version'
-    ```
+11. **Deploy to relay.orly.dev** (ARM64):
+
+    Build on remote (faster than uploading cross-compiled binary due to slow local bandwidth):
+    ```bash
+    ssh relay.orly.dev 'cd ~/src/next.orly.dev && git pull origin main && GOPATH=$HOME CGO_ENABLED=0 ~/go/bin/go build -o ~/.local/bin/next.orly.dev && sudo /usr/sbin/setcap cap_net_bind_service=+ep ~/.local/bin/next.orly.dev && sudo systemctl restart orly && ~/.local/bin/next.orly.dev version'
+    ```
+
+    Note: setcap must be re-applied after each binary rebuild to allow binding to ports 80/443.

12. **Report completion** with the new version and commit hash
CLAUDE.md (70)
@@ -40,7 +40,7 @@ NOSTR_SECRET_KEY=nsec1... ./nurl https://relay.example.com/api/logs/clear
|----------|---------|-------------|
| `ORLY_PORT` | 3334 | Server port |
| `ORLY_LOG_LEVEL` | info | trace/debug/info/warn/error |
-| `ORLY_DB_TYPE` | badger | badger/neo4j/wasmdb |
+| `ORLY_DB_TYPE` | badger | badger/bbolt/neo4j/wasmdb |
| `ORLY_POLICY_ENABLED` | false | Enable policy system |
| `ORLY_ACL_MODE` | none | none/follows/managed |
| `ORLY_TLS_DOMAINS` | | Let's Encrypt domains |
@@ -67,6 +67,7 @@ app/
  web/ → Svelte frontend (embedded via go:embed)
pkg/
  database/ → Database interface + Badger implementation
+  bbolt/ → BBolt backend (HDD-optimized, B+tree)
  neo4j/ → Neo4j backend with WoT extensions
  wasmdb/ → WebAssembly IndexedDB backend
  protocol/ → Nostr protocol (ws/, auth/, publish/)
@@ -130,16 +131,78 @@ if timeout > DefaultTimeoutSeconds {
- Provide public API methods (`IsEnabled()`, `CheckPolicy()`)
- Never change unexported→exported to fix bugs

### 6. Auth-Required Configuration (CAUTION)

**Be extremely careful when modifying auth-related settings in deployment configs.**

The `ORLY_AUTH_REQUIRED` and `ORLY_AUTH_TO_WRITE` settings control whether clients must authenticate via NIP-42 before interacting with the relay. Changing these on a production relay can:

- **Lock out all existing clients** if they don't support NIP-42 auth
- **Break automated systems** (bots, bridges, scrapers) that depend on anonymous access
- **Cause data sync issues** if upstream relays can't push events

Before enabling auth-required on any deployment:
1. Verify all expected clients support NIP-42
2. Ensure the relay identity key is properly configured
3. Test with a non-production instance first

## Database Backends

| Backend | Use Case | Build |
|---------|----------|-------|
-| **Badger** (default) | Single-instance, embedded | Standard |
+| **Badger** (default) | Single-instance, SSD, high performance | Standard |
+| **BBolt** | HDD-optimized, large archives, lower memory | `ORLY_DB_TYPE=bbolt` |
| **Neo4j** | Social graph, WoT queries | `ORLY_DB_TYPE=neo4j` |
| **WasmDB** | Browser/WebAssembly | `GOOS=js GOARCH=wasm` |

All implement `pkg/database.Database` interface.

### Scaling for Large Archives

For archives with millions of events, consider:

**Option 1: Tune Badger (SSD recommended)**
```bash
# Increase caches for larger working set (requires more RAM)
ORLY_DB_BLOCK_CACHE_MB=2048         # 2GB block cache
ORLY_DB_INDEX_CACHE_MB=1024         # 1GB index cache
ORLY_SERIAL_CACHE_PUBKEYS=500000    # 500k pubkeys
ORLY_SERIAL_CACHE_EVENT_IDS=2000000 # 2M event IDs

# Higher compression to reduce disk IO
ORLY_DB_ZSTD_LEVEL=9 # Best compression ratio

# Enable storage GC with aggressive eviction
ORLY_GC_ENABLED=true
ORLY_GC_BATCH_SIZE=5000
ORLY_MAX_STORAGE_BYTES=107374182400 # 100GB cap
```

**Option 2: Use BBolt for HDD/Low-Memory Deployments**
```bash
ORLY_DB_TYPE=bbolt

# Tune for your HDD
ORLY_BBOLT_BATCH_MAX_EVENTS=10000 # Larger batches for HDD
ORLY_BBOLT_BATCH_MAX_MB=256       # 256MB batch buffer
ORLY_BBOLT_FLUSH_TIMEOUT_SEC=60   # Longer flush interval
ORLY_BBOLT_BLOOM_SIZE_MB=32       # Larger bloom filter
ORLY_BBOLT_MMAP_SIZE_MB=16384     # 16GB mmap (scales with DB size)
```

**Migration Between Backends**
```bash
# Migrate from Badger to BBolt
./orly migrate --from badger --to bbolt

# Migrate with custom target path
./orly migrate --from badger --to bbolt --target-path /mnt/hdd/orly-archive
```

**BBolt vs Badger Trade-offs:**
- BBolt: Lower memory, HDD-friendly, simpler (B+tree), slower random reads
- Badger: Higher memory, SSD-optimized (LSM), faster concurrent access

## Logging (lol.mleku.dev)

```go
@@ -206,7 +269,8 @@ if (isValidNsec(nsec)) { ... }

## Dependencies

-- `github.com/dgraph-io/badger/v4` - Badger DB
+- `github.com/dgraph-io/badger/v4` - Badger DB (LSM, SSD-optimized)
+- `go.etcd.io/bbolt` - BBolt DB (B+tree, HDD-optimized)
- `github.com/neo4j/neo4j-go-driver/v5` - Neo4j
- `github.com/gorilla/websocket` - WebSocket
- `github.com/ebitengine/purego` - CGO-free C loading
@@ -20,8 +20,12 @@ func initializeBlossomServer(
    blossomCfg := &blossom.Config{
        BaseURL: "", // Will be set dynamically per request
        MaxBlobSize: 100 * 1024 * 1024, // 100MB default
        AllowedMimeTypes: nil, // Allow all MIME types by default
        RequireAuth: cfg.AuthRequired || cfg.AuthToWrite,
+        // Rate limiting for non-followed users
+        RateLimitEnabled: cfg.BlossomRateLimitEnabled,
+        DailyLimitMB: cfg.BlossomDailyLimitMB,
+        BurstLimitMB: cfg.BlossomBurstLimitMB,
    }

    // Create blossom server with relay's ACL registry
@@ -31,7 +35,12 @@ func initializeBlossomServer(
    // We'll need to modify the handler to inject the baseURL per request
    // For now, we'll use a middleware approach

-    log.I.F("blossom server initialized with ACL mode: %s", cfg.ACLMode)
+    if cfg.BlossomRateLimitEnabled {
+        log.I.F("blossom server initialized with ACL mode: %s, rate limit: %dMB/day (burst: %dMB)",
+            cfg.ACLMode, cfg.BlossomDailyLimitMB, cfg.BlossomBurstLimitMB)
+    } else {
+        log.I.F("blossom server initialized with ACL mode: %s", cfg.ACLMode)
+    }
    return bs, nil
}
@@ -41,9 +41,9 @@ type C struct {
    EnableShutdown bool `env:"ORLY_ENABLE_SHUTDOWN" default:"false" usage:"if true, expose /shutdown on the health port to gracefully stop the process (for profiling)"`
    LogLevel string `env:"ORLY_LOG_LEVEL" default:"info" usage:"relay log level: fatal error warn info debug trace"`
    DBLogLevel string `env:"ORLY_DB_LOG_LEVEL" default:"info" usage:"database log level: fatal error warn info debug trace"`
-    DBBlockCacheMB int `env:"ORLY_DB_BLOCK_CACHE_MB" default:"512" usage:"Badger block cache size in MB (higher improves read hit ratio)"`
-    DBIndexCacheMB int `env:"ORLY_DB_INDEX_CACHE_MB" default:"256" usage:"Badger index cache size in MB (improves index lookup performance)"`
-    DBZSTDLevel int `env:"ORLY_DB_ZSTD_LEVEL" default:"1" usage:"Badger ZSTD compression level (1=fast/500MB/s, 3=default, 9=best ratio, 0=disable)"`
+    DBBlockCacheMB int `env:"ORLY_DB_BLOCK_CACHE_MB" default:"1024" usage:"Badger block cache size in MB (higher improves read hit ratio, increase for large archives)"`
+    DBIndexCacheMB int `env:"ORLY_DB_INDEX_CACHE_MB" default:"512" usage:"Badger index cache size in MB (improves index lookup performance, increase for large archives)"`
+    DBZSTDLevel int `env:"ORLY_DB_ZSTD_LEVEL" default:"3" usage:"Badger ZSTD compression level (1=fast/500MB/s, 3=balanced, 9=best ratio/slower, 0=disable)"`
    LogToStdout bool `env:"ORLY_LOG_TO_STDOUT" default:"false" usage:"log to stdout instead of stderr"`
    LogBufferSize int `env:"ORLY_LOG_BUFFER_SIZE" default:"10000" usage:"number of log entries to keep in memory for web UI viewing (0 disables)"`
    Pprof string `env:"ORLY_PPROF" usage:"enable pprof in modes: cpu,memory,allocation,heap,block,goroutine,threadcreate,mutex"`
@@ -69,12 +69,18 @@ type C struct {

    // Progressive throttle for follows ACL mode - allows non-followed users to write with increasing delay
    FollowsThrottleEnabled bool `env:"ORLY_FOLLOWS_THROTTLE" default:"false" usage:"enable progressive delay for non-followed users in follows ACL mode"`
-    FollowsThrottlePerEvent time.Duration `env:"ORLY_FOLLOWS_THROTTLE_INCREMENT" default:"200ms" usage:"delay added per event for non-followed users"`
+    FollowsThrottlePerEvent time.Duration `env:"ORLY_FOLLOWS_THROTTLE_INCREMENT" default:"25ms" usage:"delay added per event for non-followed users"`
    FollowsThrottleMaxDelay time.Duration `env:"ORLY_FOLLOWS_THROTTLE_MAX" default:"60s" usage:"maximum throttle delay cap"`

-    // Blossom blob storage service level settings
+    // Blossom blob storage service settings
+    BlossomEnabled bool `env:"ORLY_BLOSSOM_ENABLED" default:"true" usage:"enable Blossom blob storage server (only works with Badger backend)"`
    BlossomServiceLevels string `env:"ORLY_BLOSSOM_SERVICE_LEVELS" usage:"comma-separated list of service levels in format: name:storage_mb_per_sat_per_month (e.g., basic:1,premium:10)"`

+    // Blossom upload rate limiting (for non-followed users)
+    BlossomRateLimitEnabled bool `env:"ORLY_BLOSSOM_RATE_LIMIT" default:"false" usage:"enable upload rate limiting for non-followed users"`
+    BlossomDailyLimitMB int64 `env:"ORLY_BLOSSOM_DAILY_LIMIT_MB" default:"10" usage:"daily upload limit in MB for non-followed users (EMA averaged)"`
+    BlossomBurstLimitMB int64 `env:"ORLY_BLOSSOM_BURST_LIMIT_MB" default:"50" usage:"max burst upload in MB (bucket cap)"`
+
    // Web UI and dev mode settings
    WebDisableEmbedded bool `env:"ORLY_WEB_DISABLE" default:"false" usage:"disable serving the embedded web UI; useful for hot-reload during development"`
    WebDevProxyURL string `env:"ORLY_WEB_DEV_PROXY_URL" usage:"when ORLY_WEB_DISABLE is true, reverse-proxy non-API paths to this dev server URL (e.g. http://localhost:5173)"`
@@ -124,9 +130,9 @@ type C struct {
    Neo4jMaxTxRetrySeconds int `env:"ORLY_NEO4J_MAX_TX_RETRY_SEC" default:"30" usage:"max seconds for retryable transaction attempts"`
    Neo4jQueryResultLimit int `env:"ORLY_NEO4J_QUERY_RESULT_LIMIT" default:"10000" usage:"max results returned per query (prevents unbounded memory usage, 0=unlimited)"`

-    // Advanced database tuning
-    SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"100000" usage:"max pubkeys to cache for compact event storage (default: 100000, ~3.2MB memory)"`
-    SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"500000" usage:"max event IDs to cache for compact event storage (default: 500000, ~16MB memory)"`
+    // Advanced database tuning (increase for large archives to reduce cache misses)
+    SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"250000" usage:"max pubkeys to cache for compact event storage (~8MB memory, increase for large archives)"`
+    SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"1000000" usage:"max event IDs to cache for compact event storage (~32MB memory, increase for large archives)"`

    // Connection concurrency control
    MaxHandlersPerConnection int `env:"ORLY_MAX_HANDLERS_PER_CONN" default:"100" usage:"max concurrent message handlers per WebSocket connection (limits goroutine growth under load)"`
@@ -124,6 +124,17 @@ func (s *Server) handleCashuKeysets(w http.ResponseWriter, r *http.Request) {

// handleCashuInfo handles GET /cashu/info - returns mint information.
func (s *Server) handleCashuInfo(w http.ResponseWriter, r *http.Request) {
+    // CORS headers for browser-based CAT support detection
+    w.Header().Set("Access-Control-Allow-Origin", "*")
+    w.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONS")
+    w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Accept")
+
+    // Handle preflight
+    if r.Method == http.MethodOptions {
+        w.WriteHeader(http.StatusNoContent)
+        return
+    }
+
    if s.CashuIssuer == nil {
        http.Error(w, "Cashu tokens not enabled", http.StatusNotImplemented)
        return
@@ -21,7 +21,7 @@ import (
)

func (l *Listener) HandleEvent(msg []byte) (err error) {
-    log.D.F("HandleEvent: START handling event: %s", msg)
+    log.I.F("HandleEvent: START handling event: %s", string(msg[:min(200, len(msg))]))

    // 1. Raw JSON validation (before unmarshal) - use validation service
    if result := l.eventValidator.ValidateRawJSON(msg); !result.Valid {
@@ -231,6 +231,11 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {

    // Authorization check (policy + ACL) - use authorization service
    decision := l.eventAuthorizer.Authorize(env.E, l.authedPubkey.Load(), l.remote, env.E.Kind)
+    // Debug: log ephemeral event authorization
+    if env.E.Kind >= 20000 && env.E.Kind < 30000 {
+        log.I.F("ephemeral auth check: kind %d, allowed=%v, reason=%s",
+            env.E.Kind, decision.Allowed, decision.DenyReason)
+    }
    if !decision.Allowed {
        log.D.F("HandleEvent: authorization denied: %s (requireAuth=%v)", decision.DenyReason, decision.RequireAuth)
        if decision.RequireAuth {
@@ -256,14 +261,17 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
    log.I.F("HandleEvent: authorized with access level %s", decision.AccessLevel)

    // Progressive throttle for follows ACL mode (delays non-followed users)
-    if delay := l.getFollowsThrottleDelay(env.E); delay > 0 {
-        log.D.F("HandleEvent: applying progressive throttle delay of %v for %0x from %s",
-            delay, env.E.Pubkey, l.remote)
-        select {
-        case <-l.ctx.Done():
-            return l.ctx.Err()
-        case <-time.After(delay):
-            // Delay completed, continue processing
+    // Skip throttle if a Cashu Access Token is present (authenticated via CAT)
+    if l.cashuToken == nil {
+        if delay := l.getFollowsThrottleDelay(env.E); delay > 0 {
+            log.D.F("HandleEvent: applying progressive throttle delay of %v for %0x from %s",
+                delay, env.E.Pubkey, l.remote)
+            select {
+            case <-l.ctx.Done():
+                return l.ctx.Err()
+            case <-time.After(delay):
+                // Delay completed, continue processing
+            }
        }
    }

@@ -435,7 +435,7 @@ func Run(

    // Initialize Blossom blob storage server (only for Badger backend)
    // MUST be done before UserInterface() which registers routes
-    if badgerDB, ok := db.(*database.D); ok {
+    if badgerDB, ok := db.(*database.D); ok && cfg.BlossomEnabled {
        log.I.F("Badger backend detected, initializing Blossom server...")
        if l.blossomServer, err = initializeBlossomServer(ctx, cfg, badgerDB); err != nil {
            log.E.F("failed to initialize blossom server: %v", err)
@@ -445,6 +445,8 @@ func Run(
        } else {
            log.W.F("blossom server initialization returned nil without error")
        }
+    } else if !cfg.BlossomEnabled {
+        log.I.F("Blossom server disabled via ORLY_BLOSSOM_ENABLED=false")
    } else {
        log.I.F("Non-Badger backend detected (type: %T), Blossom server not available", db)
    }

@@ -159,12 +159,26 @@ func (p *P) Deliver(ev *event.E) {
        sub Subscription
    }
    var deliveries []delivery
+    // Debug: log ephemeral event delivery attempts
+    isEphemeral := ev.Kind >= 20000 && ev.Kind < 30000
+    if isEphemeral {
+        var tagInfo string
+        if ev.Tags != nil {
+            tagInfo = string(ev.Tags.Marshal(nil))
+        }
+        log.I.F("ephemeral event kind %d, id %0x, checking %d connections for matches, tags: %s",
+            ev.Kind, ev.ID[:8], len(p.Map), tagInfo)
+    }
    for w, subs := range p.Map {
        for id, subscriber := range subs {
            if subscriber.Match(ev) {
                deliveries = append(
                    deliveries, delivery{w: w, id: id, sub: subscriber},
                )
+            } else if isEphemeral {
+                // Debug: log why ephemeral events don't match
+                log.I.F("ephemeral event kind %d did NOT match subscription %s (filters: %s)",
+                    ev.Kind, id, string(subscriber.S.Marshal(nil)))
            }
        }
    }

app/web/dist/bundle.js (4, vendored)
File diff suppressed because one or more lines are too long

app/web/dist/bundle.js.map (2, vendored)
File diff suppressed because one or more lines are too long
@@ -6,25 +6,35 @@

import { fileURLToPath } from 'url';
import { dirname, join } from 'path';
-import { writeFileSync } from 'fs';
+import { writeFileSync, existsSync } from 'fs';

const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);

const KINDS_URL = 'https://git.mleku.dev/mleku/nostr/raw/branch/main/encoders/kind/kinds.json';
+const OUTPUT_PATH = join(__dirname, '..', 'src', 'eventKinds.js');

async function fetchKinds() {
  console.log(`Fetching kinds from ${KINDS_URL}...`);

-  const response = await fetch(KINDS_URL);
-  if (!response.ok) {
-    throw new Error(`Failed to fetch kinds.json: ${response.status} ${response.statusText}`);
-  }
-
-  const data = await response.json();
-  console.log(`Fetched ${Object.keys(data.kinds).length} kinds (version: ${data.version})`);
-
-  return data;
+  try {
+    const response = await fetch(KINDS_URL, { timeout: 10000 });
+    if (!response.ok) {
+      throw new Error(`HTTP ${response.status} ${response.statusText}`);
+    }
+
+    const data = await response.json();
+    console.log(`Fetched ${Object.keys(data.kinds).length} kinds (version: ${data.version})`);
+    return data;
+  } catch (error) {
+    // Check if we have an existing eventKinds.js we can use
+    if (existsSync(OUTPUT_PATH)) {
+      console.warn(`Warning: Could not fetch kinds.json (${error.message})`);
+      console.log(`Using existing ${OUTPUT_PATH}`);
+      return null; // Signal to skip generation
+    }
+    throw new Error(`Failed to fetch kinds.json and no existing file: ${error.message}`);
+  }
}

function generateEventKinds(data) {
@@ -202,14 +212,18 @@ export const kindCategories = [
async function main() {
  try {
    const data = await fetchKinds();

+    // If fetchKinds returned null, we're using the existing file
+    if (data === null) {
+      console.log('Skipping generation, using existing eventKinds.js');
+      return;
+    }
+
    const kinds = generateEventKinds(data);
    const js = generateJS(kinds, data);

-    // Write to src/eventKinds.js
-    const outPath = join(__dirname, '..', 'src', 'eventKinds.js');
-
-    writeFileSync(outPath, js);
-    console.log(`Generated ${outPath} with ${kinds.length} kinds`);
+    writeFileSync(OUTPUT_PATH, js);
+    console.log(`Generated ${OUTPUT_PATH} with ${kinds.length} kinds`);
  } catch (error) {
    console.error('Error:', error.message);
    process.exit(1);
@@ -179,6 +179,28 @@ export class Nip07Signer {
  }
}

+// Merge two event arrays, deduplicating by event id
+// Newer events (by created_at) take precedence for same id
+function mergeAndDeduplicateEvents(cached, relay) {
+  const eventMap = new Map();
+
+  // Add cached events first
+  for (const event of cached) {
+    eventMap.set(event.id, event);
+  }
+
+  // Add/update with relay events (they may be newer)
+  for (const event of relay) {
+    const existing = eventMap.get(event.id);
+    if (!existing || event.created_at >= existing.created_at) {
+      eventMap.set(event.id, event);
+    }
+  }
+
+  // Return sorted by created_at descending (newest first)
+  return Array.from(eventMap.values()).sort((a, b) => b.created_at - a.created_at);
+}
+
// IndexedDB helpers for unified event storage
// This provides a local cache that all components can access
const DB_NAME = "nostrCache";
@@ -480,14 +502,9 @@ export async function fetchUserProfile(pubkey) {
    console.warn("Failed to fetch profile from fallback relays:", error);
  }

-  // 4) No profile found anywhere - create a default profile for new users
-  console.log("No profile found for pubkey, creating default:", pubkey);
-  try {
-    return await createDefaultProfile(pubkey);
-  } catch (e) {
-    console.error("Failed to create default profile:", e);
-    return null;
-  }
+  // 4) No profile found anywhere
+  console.log("No profile found for pubkey:", pubkey);
+  return null;
}

// Helper to fetch profile from fallback relays
@@ -561,57 +578,6 @@ async function processProfileEvent(profileEvent, pubkey) {
  return profile;
}

-/**
- * Create a default profile for new users
- * @param {string} pubkey - The user's public key (hex)
- * @returns {Promise<Object>} - The created profile
- */
-async function createDefaultProfile(pubkey) {
-  // Generate name from first 6 chars of pubkey
-  const shortId = pubkey.slice(0, 6);
-  const defaultName = `testuser${shortId}`;
-
-  // Get the current origin for the logo URL
-  const logoUrl = `${window.location.origin}/orly.png`;
-
-  const profileContent = {
-    name: defaultName,
-    display_name: defaultName,
-    picture: logoUrl,
-    about: "New ORLY user"
-  };
-
-  const profile = {
-    name: defaultName,
-    displayName: defaultName,
-    picture: logoUrl,
-    about: "New ORLY user",
-    pubkey: pubkey
-  };
-
-  // Try to publish the profile if we have a signer
-  if (nostrClient.signer) {
-    try {
-      const event = {
-        kind: 0,
-        content: JSON.stringify(profileContent),
-        tags: [],
-        created_at: Math.floor(Date.now() / 1000)
-      };
-
-      // Sign and publish using the websocket-auth client
-      const signedEvent = await nostrClient.signer.signEvent(event);
-      await nostrClient.publish(signedEvent);
-      console.log("Default profile published:", signedEvent.id);
-    } catch (e) {
-      console.warn("Failed to publish default profile:", e);
-      // Still return the profile even if publishing fails
-    }
-  }
-
-  return profile;
-}
-
// Fetch events
export async function fetchEvents(filters, options = {}) {
  console.log(`Starting event fetch with filters:`, JSON.stringify(filters, null, 2));
@@ -629,9 +595,10 @@ export async function fetchEvents(filters, options = {}) {
  } = options;

  // Try to get cached events first if requested
+  let cachedEvents = [];
  if (useCache) {
    try {
-      const cachedEvents = await queryEventsFromDB(filters);
+      cachedEvents = await queryEventsFromDB(filters);
      if (cachedEvents.length > 0) {
        console.log(`Found ${cachedEvents.length} cached events in IndexedDB`);
      }
@@ -641,17 +608,19 @@ export async function fetchEvents(filters, options = {}) {
  }

  return new Promise((resolve, reject) => {
-    const events = [];
+    const relayEvents = [];
    const timeoutId = setTimeout(() => {
-      console.log(`Timeout reached after ${timeout}ms, returning ${events.length} events`);
+      console.log(`Timeout reached after ${timeout}ms, returning ${relayEvents.length} relay events`);
      sub.close();

      // Store all received events in IndexedDB before resolving
-      if (events.length > 0) {
-        putEvents(events).catch(e => console.warn("Failed to cache events", e));
+      if (relayEvents.length > 0) {
+        putEvents(relayEvents).catch(e => console.warn("Failed to cache events", e));
      }

-      resolve(events);
+      // Merge cached events with relay events, deduplicate by id
+      const mergedEvents = mergeAndDeduplicateEvents(cachedEvents, relayEvents);
+      resolve(mergedEvents);
    }, timeout);

    try {
@@ -671,22 +640,25 @@ export async function fetchEvents(filters, options = {}) {
          created_at: event.created_at,
          content_preview: event.content?.substring(0, 50)
        });
-        events.push(event);
+        relayEvents.push(event);

        // Store event immediately in IndexedDB
        putEvent(event).catch(e => console.warn("Failed to cache event", e));
      },
      oneose() {
-        console.log(`✅ EOSE received for REQ [${subId}], got ${events.length} events`);
+        console.log(`✅ EOSE received for REQ [${subId}], got ${relayEvents.length} relay events`);
        clearTimeout(timeoutId);
        sub.close();

        // Store all events in IndexedDB before resolving
-        if (events.length > 0) {
-          putEvents(events).catch(e => console.warn("Failed to cache events", e));
+        if (relayEvents.length > 0) {
+          putEvents(relayEvents).catch(e => console.warn("Failed to cache events", e));
        }

-        resolve(events);
+        // Merge cached events with relay events, deduplicate by id
+        const mergedEvents = mergeAndDeduplicateEvents(cachedEvents, relayEvents);
+        console.log(`Merged ${cachedEvents.length} cached + ${relayEvents.length} relay = ${mergedEvents.length} total events`);
+        resolve(mergedEvents);
      }
    }
  );
@@ -137,7 +137,7 @@ Where `payload` is the standard Nostr message array, e.g.:
The encrypted content structure:
```json
{
-  "type": "EVENT" | "OK" | "EOSE" | "NOTICE" | "CLOSED" | "COUNT" | "AUTH",
+  "type": "EVENT" | "OK" | "EOSE" | "NOTICE" | "CLOSED" | "COUNT" | "AUTH" | "CHUNK",
  "payload": <standard_nostr_response_array>
}
```
@@ -150,6 +150,7 @@ Where `payload` is the standard Nostr response array, e.g.:
- `["CLOSED", "<sub_id>", "<message>"]`
- `["COUNT", "<sub_id>", {"count": <n>}]`
- `["AUTH", "<challenge>"]`
+- `[<chunk_object>]` (for CHUNK type, see Message Segmentation)

### Session Management
@@ -168,6 +169,85 @@ The conversation key is derived from:
- **Secret-based auth**: ECDH between client's secret key (derived from URI secret) and relay's public key
- **CAT auth**: ECDH between client's Nostr key and relay's public key

### Message Segmentation

Some Nostr events exceed the typical relay message size limits (commonly 64KB). NRC supports message segmentation to handle large payloads by splitting them into multiple chunks.

#### When to Chunk

Senders SHOULD chunk messages when the JSON-serialized response exceeds 40KB. This threshold accounts for:
- NIP-44 encryption overhead (~100 bytes)
- Base64 encoding expansion (~33%)
- Event wrapper overhead (tags, signature, etc.)
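
As an illustrative sketch of this size check (the helper name and constant are hypothetical; only the 40KB figure comes from the spec), a sender might decide whether to chunk like this:

```javascript
// Hypothetical helper: decide whether a serialized response needs chunking.
// The 40KB threshold leaves headroom for NIP-44 overhead (~100 bytes) and
// the ~33% expansion introduced by Base64 encoding.
const CHUNK_THRESHOLD_BYTES = 40 * 1024;

function needsChunking(responseObject) {
  const json = JSON.stringify(responseObject);
  // Measure UTF-8 byte length, not UTF-16 string length
  const byteLength = new TextEncoder().encode(json).length;
  return byteLength > CHUNK_THRESHOLD_BYTES;
}
```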

#### Chunk Message Format

When a response is too large, it is split into multiple CHUNK responses:

```json
{
  "type": "CHUNK",
  "payload": [{
    "type": "CHUNK",
    "messageId": "<uuid>",
    "index": 0,
    "total": 3,
    "data": "<base64_encoded_chunk>"
  }]
}
```

Fields:
- `messageId`: A unique identifier (UUID) for the chunked message, used to correlate chunks
- `index`: Zero-based chunk index (0, 1, 2, ...)
- `total`: Total number of chunks in this message
- `data`: Base64-encoded segment of the original message

#### Chunking Process (Sender)

1. Serialize the original response message to JSON
2. If the serialized length exceeds the threshold (40KB), proceed with chunking
3. Encode the JSON string as UTF-8, then Base64 encode it
4. Split the Base64 string into chunks of the maximum chunk size
5. Generate a unique `messageId` (UUID recommended)
6. Send each chunk as a separate CHUNK response event

Example encoding (JavaScript):
```javascript
const encoded = btoa(unescape(encodeURIComponent(jsonString)))
```
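
The six sender steps above can be sketched end-to-end as follows. This is a minimal illustration, not a reference implementation: `chunkMessage` and `MAX_CHUNK_SIZE` are hypothetical names, and the caller is assumed to supply the UUID.

```javascript
// Illustrative sender-side chunking, following the six steps above.
// Function and constant names are hypothetical, not part of the spec.
const MAX_CHUNK_SIZE = 40 * 1024; // max Base64 characters per chunk

function chunkMessage(responseObject, messageId) {
  // Steps 1+3: serialize to JSON, then UTF-8 -> Base64 (handles non-ASCII content)
  const jsonString = JSON.stringify(responseObject);
  const encoded = btoa(unescape(encodeURIComponent(jsonString)));

  // Step 4: split the Base64 string into fixed-size segments
  const total = Math.ceil(encoded.length / MAX_CHUNK_SIZE);
  const chunks = [];
  for (let index = 0; index < total; index++) {
    chunks.push({
      type: "CHUNK",
      messageId, // Step 5: same UUID across all chunks of this message
      index,
      total,
      data: encoded.slice(index * MAX_CHUNK_SIZE, (index + 1) * MAX_CHUNK_SIZE),
    });
  }
  // Step 6: each element is then sent as a separate CHUNK response event
  return chunks;
}
```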

#### Reassembly Process (Receiver)

1. When receiving a CHUNK response, buffer it by `messageId`
2. Track received chunks by `index`
3. When all chunks are received (`chunks.size === total`):
   a. Concatenate chunk data in index order (0, 1, 2, ...)
   b. Base64 decode the concatenated string
   c. Parse as UTF-8 JSON to recover the original response
4. Process the reassembled response as normal
5. Clean up the chunk buffer

Example decoding (JavaScript):
```javascript
const jsonString = decodeURIComponent(escape(atob(concatenatedBase64)))
const response = JSON.parse(jsonString)
```

#### Chunk Buffer Management

Receivers MUST implement chunk buffer cleanup:
- Discard incomplete chunk buffers after 60 seconds of inactivity
- Limit the number of concurrent incomplete messages to prevent memory exhaustion
- Log warnings when discarding stale buffers for debugging

#### Ordering and Reliability

- Chunks MAY arrive out of order; receivers MUST reassemble by index
- Missing chunks result in message loss; the incomplete buffer is eventually discarded
- Duplicate chunks (same messageId + index) SHOULD be ignored
- Each chunk is sent as a separate encrypted NRC response event
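
Putting the reassembly, buffer-cleanup, and ordering rules above together, a receiver could be sketched like this. The class and field names are hypothetical; only the 60-second timeout, duplicate handling, and index-order concatenation come from the spec.

```javascript
// Illustrative receiver-side reassembly with the buffer rules above.
const CHUNK_BUFFER_TTL_MS = 60 * 1000;

class ChunkReassembler {
  constructor() {
    this.buffers = new Map(); // messageId -> { total, chunks: Map, lastSeen }
  }

  // Returns the reassembled response object when complete, else null.
  accept(chunk) {
    let buf = this.buffers.get(chunk.messageId);
    if (!buf) {
      buf = { total: chunk.total, chunks: new Map(), lastSeen: Date.now() };
      this.buffers.set(chunk.messageId, buf);
    }
    buf.lastSeen = Date.now();
    // Duplicate chunks (same messageId + index) are ignored
    if (!buf.chunks.has(chunk.index)) {
      buf.chunks.set(chunk.index, chunk.data);
    }
    if (buf.chunks.size !== buf.total) return null;

    // Concatenate in index order, then Base64 -> UTF-8 -> JSON
    let base64 = "";
    for (let i = 0; i < buf.total; i++) base64 += buf.chunks.get(i);
    this.buffers.delete(chunk.messageId);
    return JSON.parse(decodeURIComponent(escape(atob(base64))));
  }

  // Call periodically: discard incomplete buffers idle for over 60 seconds.
  sweep(now = Date.now()) {
    for (const [id, buf] of this.buffers) {
      if (now - buf.lastSeen > CHUNK_BUFFER_TTL_MS) {
        console.warn(`discarding stale chunk buffer ${id}`);
        this.buffers.delete(id);
      }
    }
  }
}
```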

### Authentication

#### Secret-Based Authentication
@@ -208,6 +288,9 @@ The conversation key is derived from:
4. Match responses using the `e` tag (references request event ID)
5. Handle EOSE by waiting for kind 24892 with type "EOSE" in content
6. For subscriptions, maintain mapping of internal sub IDs to tunnel session
+7. **Chunking**: Maintain a chunk buffer map keyed by `messageId`
+8. **Chunking**: When receiving CHUNK responses, buffer chunks and reassemble when complete
+9. **Chunking**: Implement 60-second timeout for incomplete chunk buffers

## Bridge Implementation Notes
@@ -217,10 +300,14 @@ The conversation key is derived from:
|
||||
4. Capture all relay responses and wrap in kind 24892
|
||||
5. Sign with relay's key and publish to rendezvous relay
|
||||
6. Maintain session state for subscription mapping
|
||||
7. **Chunking**: Check response size before sending; chunk if > 40KB
|
||||
8. **Chunking**: Use consistent messageId (UUID) across all chunks of a message
|
||||
9. **Chunking**: Send chunks in order (index 0, 1, 2, ...) for optimal reassembly
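Steps 7–9 can be sketched as a splitter that base64-encodes the response and slices it into chunks of at most 40KB sharing one messageId. The `chunk` type and `splitResponse` helper are illustrative, not the bridge's actual API; a real bridge would generate the messageId as a UUID rather than take it as a parameter:

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// chunk mirrors the CHUNK response fields described above.
type chunk struct {
	MessageID string
	Index     int
	Total     int
	Data      string
}

// splitResponse base64-encodes a response and slices the encoded string
// into pieces of at most maxLen characters, all tagged with one messageId.
func splitResponse(messageID string, response []byte, maxLen int) []chunk {
	encoded := base64.StdEncoding.EncodeToString(response)
	total := (len(encoded) + maxLen - 1) / maxLen // ceiling division
	out := make([]chunk, 0, total)
	for i := 0; i < total; i++ {
		end := (i + 1) * maxLen
		if end > len(encoded) {
			end = len(encoded)
		}
		out = append(out, chunk{messageID, i, total, encoded[i*maxLen : end]})
	}
	return out
}

func main() {
	resp := make([]byte, 100*1024) // a 100KB response
	for _, c := range splitResponse("example-message-id", resp, 40*1024) {
		fmt.Printf("chunk %d/%d: %d chars\n", c.Index+1, c.Total, len(c.Data))
	}
}
```

Note the chunk limit applies to the base64-encoded length, which is about 4/3 of the raw response size.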
## Reference Implementations

-- ORLY Relay: [https://git.mleku.dev/mleku/next.orly.dev](https://git.mleku.dev/mleku/next.orly.dev)
+- ORLY Relay (Bridge): [https://git.mleku.dev/mleku/next.orly.dev](https://git.mleku.dev/mleku/next.orly.dev)
- Smesh Client: [https://git.mleku.dev/mleku/smesh](https://git.mleku.dev/mleku/smesh)

## See Also
8 go.mod
@@ -3,12 +3,14 @@ module next.orly.dev
go 1.25.3

require (
-	git.mleku.dev/mleku/nostr v1.0.12
+	git.mleku.dev/mleku/nostr v1.0.13
	github.com/adrg/xdg v0.5.3
	github.com/alexflint/go-arg v1.6.1
	github.com/aperturerobotics/go-indexeddb v0.2.3
+	github.com/bits-and-blooms/bloom/v3 v3.7.1
	github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0
	github.com/dgraph-io/badger/v4 v4.8.0
+	github.com/google/uuid v1.6.0
	github.com/gorilla/websocket v1.5.3
	github.com/hack-pad/safejs v0.1.1
	github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0
@@ -22,6 +24,7 @@ require (
	github.com/stretchr/testify v1.11.1
	github.com/vertex-lab/nostr-sqlite v0.3.2
	go-simpler.org/env v0.12.0
+	go.etcd.io/bbolt v1.4.3
	go.uber.org/atomic v1.11.0
	golang.org/x/crypto v0.46.0
	golang.org/x/lint v0.0.0-20241112194109-818c5a804067
@@ -37,7 +40,6 @@ require (
	github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 // indirect
	github.com/alexflint/go-scalar v1.2.0 // indirect
	github.com/bits-and-blooms/bitset v1.24.2 // indirect
-	github.com/bits-and-blooms/bloom/v3 v3.7.1 // indirect
	github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
	github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect
	github.com/bytedance/sonic v1.13.1 // indirect
@@ -56,7 +58,6 @@ require (
	github.com/google/btree v1.1.2 // indirect
	github.com/google/flatbuffers v25.9.23+incompatible // indirect
	github.com/google/pprof v0.0.0-20251007162407-5df77e3f7d1d // indirect
-	github.com/google/uuid v1.6.0 // indirect
	github.com/josharian/intern v1.0.0 // indirect
	github.com/json-iterator/go v1.1.12 // indirect
	github.com/klauspost/cpuid/v2 v2.3.0 // indirect
@@ -72,7 +73,6 @@ require (
	github.com/tidwall/match v1.1.1 // indirect
	github.com/tidwall/pretty v1.2.1 // indirect
	github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
-	go.etcd.io/bbolt v1.4.3 // indirect
	go.opentelemetry.io/auto/sdk v1.2.1 // indirect
	go.opentelemetry.io/otel v1.38.0 // indirect
	go.opentelemetry.io/otel/metric v1.38.0 // indirect
5 go.sum
@@ -1,5 +1,5 @@
-git.mleku.dev/mleku/nostr v1.0.12 h1:bjsFUh1Q3fGpU7qsqxggGgrGGUt2OBdu1w8hjDM4gJE=
-git.mleku.dev/mleku/nostr v1.0.12/go.mod h1:kJwSMmLRnAJ7QJtgXDv2wGgceFU0luwVqrgAL3MI93M=
+git.mleku.dev/mleku/nostr v1.0.13 h1:FqeOQ9ZX8AFVsAI6XisQkB6cgmhn9DNQ2a8li9gx7aY=
+git.mleku.dev/mleku/nostr v1.0.13/go.mod h1:kJwSMmLRnAJ7QJtgXDv2wGgceFU0luwVqrgAL3MI93M=
github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg=
github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 h1:ClzzXMDDuUbWfNNZqGeYq4PnYOlwlOVIvSyNaIy0ykg=
@@ -161,6 +161,7 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg=
github.com/twmb/murmur3 v1.1.8/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/vertex-lab/nostr-sqlite v0.3.2 h1:8nZYYIwiKnWLA446qA/wL/Gy+bU0kuaxdLfUyfeTt/E=
github.com/vertex-lab/nostr-sqlite v0.3.2/go.mod h1:5bw1wMgJhSdrumsZAWxqy+P0u1g+q02PnlGQn15dnSM=
6 main.go
@@ -24,7 +24,7 @@ import (
	"next.orly.dev/pkg/acl"
	"git.mleku.dev/mleku/nostr/crypto/keys"
	"git.mleku.dev/mleku/nostr/encoders/bech32encoding"
-	_ "next.orly.dev/pkg/bbolt" // Import for bbolt factory registration
+	bboltdb "next.orly.dev/pkg/bbolt" // Import for bbolt factory and type
	"next.orly.dev/pkg/database"
	neo4jdb "next.orly.dev/pkg/neo4j" // Import for neo4j factory and type
	"git.mleku.dev/mleku/nostr/encoders/hex"
@@ -617,6 +617,10 @@ func main() {
			n4jDB.MaxConcurrentQueries(),
		)
		log.I.F("rate limiter configured for Neo4j backend (target: %dMB)", targetMB)
+	} else if _, ok := db.(*bboltdb.B); ok {
+		// BBolt uses memory-mapped IO, so memory-only limiter is appropriate
+		limiter = ratelimit.NewMemoryOnlyLimiter(rlConfig)
+		log.I.F("rate limiter configured for BBolt backend (target: %dMB)", targetMB)
	} else {
		// For other backends, create a disabled limiter
		limiter = ratelimit.NewDisabledLimiter()
@@ -138,7 +138,7 @@ func (f *Follows) Configure(cfg ...any) (err error) {
	if f.cfg.FollowsThrottleEnabled {
		perEvent := f.cfg.FollowsThrottlePerEvent
		if perEvent == 0 {
-			perEvent = 200 * time.Millisecond
+			perEvent = 25 * time.Millisecond
		}
		maxDelay := f.cfg.FollowsThrottleMaxDelay
		if maxDelay == 0 {
@@ -200,6 +200,12 @@ func (s *Server) handleUpload(w http.ResponseWriter, r *http.Request) {
		return
	}

+	// Check bandwidth rate limit (non-followed users)
+	if !s.checkBandwidthLimit(pubkey, remoteAddr, int64(len(body))) {
+		s.setErrorResponse(w, http.StatusTooManyRequests, "upload rate limit exceeded, try again later")
+		return
+	}
+
	// Calculate SHA256 after auth check
	sha256Hash := CalculateSHA256(body)
	sha256Hex := hex.Enc(sha256Hash)
@@ -647,6 +653,12 @@ func (s *Server) handleMirror(w http.ResponseWriter, r *http.Request) {
		return
	}

+	// Check bandwidth rate limit (non-followed users)
+	if !s.checkBandwidthLimit(pubkey, remoteAddr, int64(len(body))) {
+		s.setErrorResponse(w, http.StatusTooManyRequests, "upload rate limit exceeded, try again later")
+		return
+	}
+
	// Note: pubkey may be nil for anonymous uploads if ACL allows it

	// Detect MIME type from remote response
@@ -726,6 +738,12 @@ func (s *Server) handleMediaUpload(w http.ResponseWriter, r *http.Request) {
		return
	}

+	// Check bandwidth rate limit (non-followed users)
+	if !s.checkBandwidthLimit(pubkey, remoteAddr, int64(len(body))) {
+		s.setErrorResponse(w, http.StatusTooManyRequests, "upload rate limit exceeded, try again later")
+		return
+	}
+
	// Note: pubkey may be nil for anonymous uploads if ACL allows it

	// Optimize media (placeholder - actual optimization would be implemented here)
131 pkg/blossom/ratelimit.go Normal file
@@ -0,0 +1,131 @@
package blossom

import (
	"sync"
	"time"
)

// BandwidthState tracks upload bandwidth for an identity
type BandwidthState struct {
	BucketBytes int64     // Current token bucket level (bytes available)
	LastUpdate  time.Time // Last time bucket was updated
}

// BandwidthLimiter implements token bucket rate limiting for uploads.
// Each identity gets a bucket that replenishes at dailyLimit/day rate.
// Uploads consume tokens from the bucket.
type BandwidthLimiter struct {
	mu         sync.Mutex
	states     map[string]*BandwidthState // keyed by pubkey hex or IP
	dailyLimit int64                      // bytes per day
	burstLimit int64                      // max bucket size (burst capacity)
	refillRate float64                    // bytes per second refill rate
}

// NewBandwidthLimiter creates a new bandwidth limiter.
// dailyLimitMB is the average daily limit in megabytes.
// burstLimitMB is the maximum burst capacity in megabytes.
func NewBandwidthLimiter(dailyLimitMB, burstLimitMB int64) *BandwidthLimiter {
	dailyBytes := dailyLimitMB * 1024 * 1024
	burstBytes := burstLimitMB * 1024 * 1024

	return &BandwidthLimiter{
		states:     make(map[string]*BandwidthState),
		dailyLimit: dailyBytes,
		burstLimit: burstBytes,
		refillRate: float64(dailyBytes) / 86400.0, // bytes per second
	}
}

// CheckAndConsume checks if an upload of the given size is allowed for the identity,
// and if so, consumes the tokens. Returns true if allowed, false if rate limited.
// The identity should be pubkey hex for authenticated users, or IP for anonymous.
func (bl *BandwidthLimiter) CheckAndConsume(identity string, sizeBytes int64) bool {
	bl.mu.Lock()
	defer bl.mu.Unlock()

	now := time.Now()
	state, exists := bl.states[identity]

	if !exists {
		// New identity starts with full burst capacity
		state = &BandwidthState{
			BucketBytes: bl.burstLimit,
			LastUpdate:  now,
		}
		bl.states[identity] = state
	} else {
		// Refill bucket based on elapsed time
		elapsed := now.Sub(state.LastUpdate).Seconds()
		refill := int64(elapsed * bl.refillRate)
		state.BucketBytes += refill
		if state.BucketBytes > bl.burstLimit {
			state.BucketBytes = bl.burstLimit
		}
		state.LastUpdate = now
	}

	// Check if upload fits in bucket
	if state.BucketBytes >= sizeBytes {
		state.BucketBytes -= sizeBytes
		return true
	}

	return false
}

// GetAvailable returns the currently available bytes for an identity.
func (bl *BandwidthLimiter) GetAvailable(identity string) int64 {
	bl.mu.Lock()
	defer bl.mu.Unlock()

	state, exists := bl.states[identity]
	if !exists {
		return bl.burstLimit // New users have full capacity
	}

	// Calculate current level with refill
	now := time.Now()
	elapsed := now.Sub(state.LastUpdate).Seconds()
	refill := int64(elapsed * bl.refillRate)
	available := state.BucketBytes + refill
	if available > bl.burstLimit {
		available = bl.burstLimit
	}

	return available
}

// GetTimeUntilAvailable returns how long until the given bytes will be available.
func (bl *BandwidthLimiter) GetTimeUntilAvailable(identity string, sizeBytes int64) time.Duration {
	available := bl.GetAvailable(identity)
	if available >= sizeBytes {
		return 0
	}

	needed := sizeBytes - available
	seconds := float64(needed) / bl.refillRate
	return time.Duration(seconds * float64(time.Second))
}

// Cleanup removes entries that have fully replenished (at burst limit).
func (bl *BandwidthLimiter) Cleanup() {
	bl.mu.Lock()
	defer bl.mu.Unlock()

	now := time.Now()
	for key, state := range bl.states {
		elapsed := now.Sub(state.LastUpdate).Seconds()
		refill := int64(elapsed * bl.refillRate)
		if state.BucketBytes+refill >= bl.burstLimit {
			delete(bl.states, key)
		}
	}
}

// Stats returns the number of tracked identities.
func (bl *BandwidthLimiter) Stats() int {
	bl.mu.Lock()
	defer bl.mu.Unlock()
	return len(bl.states)
}
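The token-bucket arithmetic above can be sanity-checked in isolation: with the 10MB/day default, the refill rate is roughly 121 bytes per second, so an empty bucket needs about half a day to accumulate a 5MB upload's worth of tokens. A quick self-contained check of that math:

```go
package main

import "fmt"

func main() {
	dailyLimitMB := int64(10) // 10MB/day default from NewServer
	dailyBytes := dailyLimitMB * 1024 * 1024
	refillRate := float64(dailyBytes) / 86400.0 // bytes per second, as in NewBandwidthLimiter

	// How long until an empty bucket can cover a 5MB upload?
	needed := float64(int64(5 * 1024 * 1024))
	seconds := needed / refillRate

	fmt.Printf("refill rate: %.1f bytes/s\n", refillRate)
	fmt.Printf("hours until 5MB available: %.1f\n", seconds/3600.0)
}
```

Because 5MB is exactly half the 10MB daily allowance, the wait comes out to exactly 12 hours; the 50MB burst capacity exists so well-behaved users never hit that wait in practice.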
@@ -19,6 +19,9 @@ type Server struct {
	maxBlobSize      int64
	allowedMimeTypes map[string]bool
	requireAuth      bool
+
+	// Rate limiting for uploads
+	bandwidthLimiter *BandwidthLimiter
}

// Config holds configuration for the Blossom server
@@ -27,6 +30,11 @@ type Config struct {
	MaxBlobSize      int64
	AllowedMimeTypes []string
	RequireAuth      bool
+
+	// Rate limiting (for non-followed users)
+	RateLimitEnabled bool
+	DailyLimitMB     int64
+	BurstLimitMB     int64
}

// NewServer creates a new Blossom server instance
@@ -48,6 +56,20 @@ func NewServer(db *database.D, aclRegistry *acl.S, cfg *Config) *Server {
		}
	}

+	// Initialize bandwidth limiter if enabled
+	var bwLimiter *BandwidthLimiter
+	if cfg.RateLimitEnabled {
+		dailyMB := cfg.DailyLimitMB
+		if dailyMB <= 0 {
+			dailyMB = 10 // 10MB default
+		}
+		burstMB := cfg.BurstLimitMB
+		if burstMB <= 0 {
+			burstMB = 50 // 50MB default burst
+		}
+		bwLimiter = NewBandwidthLimiter(dailyMB, burstMB)
+	}
+
	return &Server{
		db:      db,
		storage: storage,
@@ -56,6 +78,7 @@ func NewServer(db *database.D, aclRegistry *acl.S, cfg *Config) *Server {
		maxBlobSize:      cfg.MaxBlobSize,
		allowedMimeTypes: allowedMap,
		requireAuth:      cfg.RequireAuth,
+		bandwidthLimiter: bwLimiter,
	}
}

@@ -208,6 +231,44 @@ func (s *Server) checkACL(
	return actual >= required
}

+// isRateLimitExempt returns true if the user is exempt from rate limiting.
+// Users with write access or higher (followed users, admins, owners) are exempt.
+func (s *Server) isRateLimitExempt(pubkey []byte, remoteAddr string) bool {
+	if s.acl == nil {
+		return true // No ACL configured, no rate limiting
+	}
+
+	level := s.acl.GetAccessLevel(pubkey, remoteAddr)
+
+	// Followed users get "write" level, admins/owners get higher.
+	// Only "read" and "none" are rate limited.
+	return level == "write" || level == "admin" || level == "owner"
+}
+
+// checkBandwidthLimit checks if the upload is allowed under rate limits.
+// Returns true if allowed, false if rate limited.
+// Exempt users (followed, admin, owner) always return true.
+func (s *Server) checkBandwidthLimit(pubkey []byte, remoteAddr string, sizeBytes int64) bool {
+	if s.bandwidthLimiter == nil {
+		return true // No rate limiting configured
+	}
+
+	// Check if user is exempt
+	if s.isRateLimitExempt(pubkey, remoteAddr) {
+		return true
+	}
+
+	// Use pubkey hex if available, otherwise IP
+	var identity string
+	if len(pubkey) > 0 {
+		identity = string(pubkey) // Will be converted to hex in handler
+	} else {
+		identity = remoteAddr
+	}
+
+	return s.bandwidthLimiter.CheckAndConsume(identity, sizeBytes)
+}
+
// BaseURLKey is the context key for the base URL (exported for use by app handler)
type BaseURLKey struct{}
@@ -6,6 +6,7 @@ import (
	"sort"

	"lol.mleku.dev/chk"
	"lol.mleku.dev/errorf"
+	"lol.mleku.dev/log"
	"next.orly.dev/pkg/database/indexes"
	types2 "next.orly.dev/pkg/database/indexes/types"
@@ -44,6 +45,12 @@ func NormalizeTagValueForHash(key byte, valueBytes []byte) []byte {
func CreateIdHashFromData(data []byte) (i *types2.IdHash, err error) {
	i = new(types2.IdHash)

+	// Skip empty data to avoid noisy errors
+	if len(data) == 0 {
+		err = errorf.E("CreateIdHashFromData: empty ID provided")
+		return
+	}
+
	// If data looks like hex string and has the right length for hex-encoded
	// sha256
	if len(data) == 64 {
@@ -95,6 +102,11 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
	// should be an error, but convention just ignores it.
	if f.Ids.Len() > 0 {
		for _, id := range f.Ids.T {
+			// Skip empty IDs - some filters have empty ID values
+			if len(id) == 0 {
+				log.D.F("GetIndexesFromFilter: skipping empty ID in filter (ids=%d)", f.Ids.Len())
+				continue
+			}
			if err = func() (err error) {
				var i *types2.IdHash
				if i, err = CreateIdHashFromData(id); chk.E(err) {
@@ -20,6 +20,10 @@ import (

func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
	// log.T.F("GetSerialById: input id=%s", hex.Enc(id))
+	if len(id) == 0 {
+		err = errorf.E("GetSerialById: called with empty ID")
+		return
+	}
	var idxs []Range
	if idxs, err = GetIndexesFromFilter(&filter.F{Ids: tag.NewFromBytesSlice(id)}); chk.E(err) {
		return
@@ -102,6 +106,10 @@ func (d *D) GetSerialsByIdsWithFilter(

	// Process each ID sequentially
	for _, id := range ids.T {
+		// Skip empty IDs
+		if len(id) == 0 {
+			continue
+		}
		// idHex := hex.Enc(id)

		// Get the index prefix for this ID
@@ -24,8 +24,8 @@ func (i *IdHash) Set(idh []byte) {
func (i *IdHash) FromId(id []byte) (err error) {
	if len(id) != sha256.Size {
		err = errorf.E(
-			"FromId: invalid ID length, got %d require %d", len(id),
-			sha256.Size,
+			"FromId: invalid ID length, got %d require %d (data=%x)", len(id),
+			sha256.Size, id,
		)
		return
	}
@@ -3,6 +3,7 @@ package routing
import (
	"git.mleku.dev/mleku/nostr/encoders/event"
	"git.mleku.dev/mleku/nostr/encoders/kind"
+	"lol.mleku.dev/log"
)

// Publisher abstracts event delivery to subscribers.
@@ -22,6 +23,7 @@ func IsEphemeral(k uint16) bool {
// - Are immediately delivered to subscribers
func MakeEphemeralHandler(publisher Publisher) Handler {
	return func(ev *event.E, authedPubkey []byte) Result {
+		log.I.F("ephemeral handler received event kind %d, id %0x", ev.Kind, ev.ID[:8])
		// Clone and deliver immediately without persistence
		cloned := ev.Clone()
		go publisher.Deliver(cloned)
@@ -54,3 +54,11 @@ func MonitorFromNeo4jDriver(
) loadmonitor.Monitor {
	return NewNeo4jMonitor(driver, querySem, maxConcurrency, 100*time.Millisecond)
}
+
+// NewMemoryOnlyLimiter creates a rate limiter that only monitors process memory.
+// Useful for database backends that don't have their own load metrics (e.g., BBolt).
+// Since BBolt uses memory-mapped IO, memory pressure is still relevant.
+func NewMemoryOnlyLimiter(config Config) *Limiter {
+	monitor := NewMemoryMonitor(100 * time.Millisecond)
+	return NewLimiter(config, monitor)
+}
@@ -377,29 +377,26 @@ func (l *Limiter) ComputeDelay(opType OperationType) time.Duration {

	// In emergency mode, apply progressive throttling for writes
	if inEmergency {
-		// Calculate how far above recovery threshold we are
-		// At emergency threshold, add 1x normal delay
-		// For every additional 10% above emergency, double the delay
-		excessPressure := metrics.MemoryPressure - l.config.RecoveryThreshold
-		if excessPressure > 0 {
-			// Progressive multiplier: starts at 2x, doubles every 10% excess
-			multiplier := 2.0
-			for excess := excessPressure; excess > 0.1; excess -= 0.1 {
-				multiplier *= 2
-			}
-
-			emergencyDelaySec := delaySec * multiplier
-			maxEmergencySec := float64(l.config.EmergencyMaxDelayMs) / 1000.0
-
-			if emergencyDelaySec > maxEmergencySec {
-				emergencyDelaySec = maxEmergencySec
-			}
-			// Minimum emergency delay of 100ms to allow other operations
-			if emergencyDelaySec < 0.1 {
-				emergencyDelaySec = 0.1
-			}
-			delaySec = emergencyDelaySec
-		}
+		// Calculate how far above emergency threshold we are
+		// Linear scaling: multiplier = 1 + (excess * 5)
+		// At emergency threshold: 1x, at +20% above: 2x, at +40% above: 3x
+		excessPressure := metrics.MemoryPressure - l.config.EmergencyThreshold
+		if excessPressure < 0 {
+			excessPressure = 0
+		}
+		multiplier := 1.0 + excessPressure*5.0
+
+		emergencyDelaySec := delaySec * multiplier
+		maxEmergencySec := float64(l.config.EmergencyMaxDelayMs) / 1000.0
+
+		if emergencyDelaySec > maxEmergencySec {
+			emergencyDelaySec = maxEmergencySec
+		}
+		// Minimum emergency delay of 100ms to allow other operations
+		if emergencyDelaySec < 0.1 {
+			emergencyDelaySec = 0.1
+		}
+		delaySec = emergencyDelaySec
	}

	if delaySec > 0 {
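The new linear-scaling rule can be checked with a few sample pressures; the 50ms base delay here is hypothetical, and the clamps mirror the 100ms minimum from the code above:

```go
package main

import "fmt"

func main() {
	emergencyThreshold := 1.167 // default: target + 1/6
	baseDelaySec := 0.05        // hypothetical normal-mode delay

	for _, pressure := range []float64{1.167, 1.367, 1.567} {
		excess := pressure - emergencyThreshold
		if excess < 0 {
			excess = 0
		}
		multiplier := 1.0 + excess*5.0 // linear: 1x at threshold, 2x at +20%, 3x at +40%
		delay := baseDelaySec * multiplier
		if delay < 0.1 { // 100ms minimum emergency delay
			delay = 0.1
		}
		fmt.Printf("pressure %.3f -> multiplier %.1f, delay %.3fs\n", pressure, multiplier, delay)
	}
}
```

Unlike the replaced doubling loop, the multiplier now grows smoothly with pressure instead of jumping by powers of two.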
214 pkg/ratelimit/memory_monitor.go Normal file
@@ -0,0 +1,214 @@
//go:build !(js && wasm)

package ratelimit

import (
	"sync"
	"sync/atomic"
	"time"

	"next.orly.dev/pkg/interfaces/loadmonitor"
)

// MemoryMonitor is a simple load monitor that only tracks process memory.
// Used for database backends that don't have their own load metrics (e.g., BBolt).
type MemoryMonitor struct {
	// Configuration
	pollInterval time.Duration
	targetBytes  atomic.Uint64

	// State
	running  atomic.Bool
	stopChan chan struct{}
	doneChan chan struct{}

	// Metrics (protected by mutex)
	mu             sync.RWMutex
	currentMetrics loadmonitor.Metrics

	// Latency tracking
	queryLatencies []time.Duration
	writeLatencies []time.Duration
	latencyMu      sync.Mutex

	// Emergency mode
	emergencyThreshold float64 // e.g., 1.167 (target + 1/6)
	recoveryThreshold  float64 // e.g., 0.833 (target - 1/6)
	inEmergency        atomic.Bool
}

// NewMemoryMonitor creates a memory-only load monitor.
// pollInterval controls how often memory is sampled (recommended: 100ms).
func NewMemoryMonitor(pollInterval time.Duration) *MemoryMonitor {
	m := &MemoryMonitor{
		pollInterval:       pollInterval,
		stopChan:           make(chan struct{}),
		doneChan:           make(chan struct{}),
		queryLatencies:     make([]time.Duration, 0, 100),
		writeLatencies:     make([]time.Duration, 0, 100),
		emergencyThreshold: 1.167, // Default: target + 1/6
		recoveryThreshold:  0.833, // Default: target - 1/6
	}
	return m
}

// GetMetrics returns the current load metrics.
func (m *MemoryMonitor) GetMetrics() loadmonitor.Metrics {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.currentMetrics
}

// RecordQueryLatency records a query latency sample.
func (m *MemoryMonitor) RecordQueryLatency(latency time.Duration) {
	m.latencyMu.Lock()
	defer m.latencyMu.Unlock()

	m.queryLatencies = append(m.queryLatencies, latency)
	if len(m.queryLatencies) > 100 {
		m.queryLatencies = m.queryLatencies[1:]
	}
}

// RecordWriteLatency records a write latency sample.
func (m *MemoryMonitor) RecordWriteLatency(latency time.Duration) {
	m.latencyMu.Lock()
	defer m.latencyMu.Unlock()

	m.writeLatencies = append(m.writeLatencies, latency)
	if len(m.writeLatencies) > 100 {
		m.writeLatencies = m.writeLatencies[1:]
	}
}

// SetMemoryTarget sets the target memory limit in bytes.
func (m *MemoryMonitor) SetMemoryTarget(bytes uint64) {
	m.targetBytes.Store(bytes)
}

// SetEmergencyThreshold sets the memory threshold for emergency mode.
func (m *MemoryMonitor) SetEmergencyThreshold(threshold float64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.emergencyThreshold = threshold
}

// GetEmergencyThreshold returns the current emergency threshold.
func (m *MemoryMonitor) GetEmergencyThreshold() float64 {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.emergencyThreshold
}

// ForceEmergencyMode manually triggers emergency mode for a duration.
func (m *MemoryMonitor) ForceEmergencyMode(duration time.Duration) {
	m.inEmergency.Store(true)
	go func() {
		time.Sleep(duration)
		m.inEmergency.Store(false)
	}()
}

// Start begins background metric collection.
func (m *MemoryMonitor) Start() <-chan struct{} {
	if m.running.Swap(true) {
		// Already running
		return m.doneChan
	}

	go m.pollLoop()
	return m.doneChan
}

// Stop halts background metric collection.
func (m *MemoryMonitor) Stop() {
	if !m.running.Swap(false) {
		return
	}
	close(m.stopChan)
	<-m.doneChan
}

// pollLoop continuously samples memory and updates metrics.
func (m *MemoryMonitor) pollLoop() {
	defer close(m.doneChan)

	ticker := time.NewTicker(m.pollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-m.stopChan:
			return
		case <-ticker.C:
			m.updateMetrics()
		}
	}
}

// updateMetrics samples current memory and updates the metrics.
func (m *MemoryMonitor) updateMetrics() {
	target := m.targetBytes.Load()
	if target == 0 {
		target = 1 // Avoid division by zero
	}

	// Get physical memory using the same method as other monitors
	procMem := ReadProcessMemoryStats()
	physicalMemBytes := procMem.PhysicalMemoryBytes()
	physicalMemMB := physicalMemBytes / (1024 * 1024)

	// Calculate memory pressure
	memPressure := float64(physicalMemBytes) / float64(target)

	// Check emergency mode thresholds
	m.mu.RLock()
	emergencyThreshold := m.emergencyThreshold
	recoveryThreshold := m.recoveryThreshold
	m.mu.RUnlock()

	wasEmergency := m.inEmergency.Load()
	if memPressure > emergencyThreshold {
		m.inEmergency.Store(true)
	} else if memPressure < recoveryThreshold && wasEmergency {
		m.inEmergency.Store(false)
	}

	// Calculate average latencies
	m.latencyMu.Lock()
	var avgQuery, avgWrite time.Duration
	if len(m.queryLatencies) > 0 {
		var total time.Duration
		for _, l := range m.queryLatencies {
			total += l
		}
		avgQuery = total / time.Duration(len(m.queryLatencies))
	}
	if len(m.writeLatencies) > 0 {
		var total time.Duration
		for _, l := range m.writeLatencies {
			total += l
		}
		avgWrite = total / time.Duration(len(m.writeLatencies))
	}
	m.latencyMu.Unlock()

	// Update metrics
	m.mu.Lock()
	m.currentMetrics = loadmonitor.Metrics{
		MemoryPressure:    memPressure,
		WriteLoad:         0, // No database-specific load metric
		ReadLoad:          0, // No database-specific load metric
		QueryLatency:      avgQuery,
		WriteLatency:      avgWrite,
		Timestamp:         time.Now(),
		InEmergencyMode:   m.inEmergency.Load(),
		CompactionPending: false, // BBolt doesn't have compaction
		PhysicalMemoryMB:  physicalMemMB,
	}
	m.mu.Unlock()
}

// Ensure MemoryMonitor implements the required interfaces
var _ loadmonitor.Monitor = (*MemoryMonitor)(nil)
var _ loadmonitor.EmergencyModeMonitor = (*MemoryMonitor)(nil)
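The enter/exit thresholds in `updateMetrics` form a hysteresis band: the monitor enters emergency mode above ~1.167x the target but leaves only after pressure drops below ~0.833x, which prevents flapping near the boundary. A self-contained sketch of just that switching logic (the `hysteresis` type is invented for illustration):

```go
package main

import "fmt"

// hysteresis models the monitor's emergency-mode switching: enter above the
// emergency threshold, leave only after dropping below the recovery threshold.
type hysteresis struct {
	emergency, recovery float64
	inEmergency         bool
}

func (h *hysteresis) update(pressure float64) bool {
	if pressure > h.emergency {
		h.inEmergency = true
	} else if pressure < h.recovery && h.inEmergency {
		h.inEmergency = false
	}
	return h.inEmergency
}

func main() {
	h := &hysteresis{emergency: 1.167, recovery: 0.833}
	// Pressure dips back to 0.9 after a spike, but stays inside the band,
	// so emergency mode persists until it falls below 0.833.
	for _, p := range []float64{0.9, 1.2, 1.0, 0.9, 0.8} {
		fmt.Printf("pressure %.3f -> emergency=%v\n", p, h.update(p))
	}
}
```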
@@ -1 +1 @@
-v0.48.10
+v0.49.0