Compare commits

8 Commits

| Author | SHA1 | Date |
|---|---|---|
| | e68916ca5d | |
| | 0e30f7a697 | |
| | a0af5bb45e | |
| | 9da1784b1b | |
| | 205f23fc0c | |
| | 489b9f4593 | |
| | 604d759a6a | |
| | be72b694eb | |
@@ -49,10 +49,12 @@ If no argument is provided, default to `patch`.

GIT_SSH_COMMAND="ssh -i ~/.ssh/gitmlekudev" git push ssh://mleku@git.mleku.dev:2222/mleku/next.orly.dev.git main --tags
```

11. **Deploy to VPS** by running:

```
ssh relay.orly.dev 'cd ~/src/next.orly.dev && git stash && git pull origin main && export PATH=$PATH:~/go/bin && CGO_ENABLED=0 go build -o ~/.local/bin/next.orly.dev && sudo /usr/sbin/setcap cap_net_bind_service=+ep ~/.local/bin/next.orly.dev && sudo systemctl restart orly && ~/.local/bin/next.orly.dev version'
```

11. **Deploy to relay.orly.dev** (ARM64):

Build on the remote host (faster than uploading a cross-compiled binary due to slow local bandwidth):

```bash
ssh relay.orly.dev 'cd ~/src/next.orly.dev && git pull origin main && GOPATH=$HOME CGO_ENABLED=0 ~/go/bin/go build -o ~/.local/bin/next.orly.dev && sudo /usr/sbin/setcap cap_net_bind_service=+ep ~/.local/bin/next.orly.dev && sudo systemctl restart orly && ~/.local/bin/next.orly.dev version'
```

Note: setcap must be re-applied after each binary rebuild to allow binding to ports 80/443.

12. **Report completion** with the new version and commit hash
CLAUDE.md (55 changed lines)
@@ -40,7 +40,7 @@ NOSTR_SECRET_KEY=nsec1... ./nurl https://relay.example.com/api/logs/clear

|----------|---------|-------------|
| `ORLY_PORT` | 3334 | Server port |
| `ORLY_LOG_LEVEL` | info | trace/debug/info/warn/error |
| `ORLY_DB_TYPE` | badger | badger/neo4j/wasmdb |
| `ORLY_DB_TYPE` | badger | badger/bbolt/neo4j/wasmdb |
| `ORLY_POLICY_ENABLED` | false | Enable policy system |
| `ORLY_ACL_MODE` | none | none/follows/managed |
| `ORLY_TLS_DOMAINS` | | Let's Encrypt domains |
@@ -67,6 +67,7 @@ app/
  web/      → Svelte frontend (embedded via go:embed)
pkg/
  database/ → Database interface + Badger implementation
  bbolt/    → BBolt backend (HDD-optimized, B+tree)
  neo4j/    → Neo4j backend with WoT extensions
  wasmdb/   → WebAssembly IndexedDB backend
  protocol/ → Nostr protocol (ws/, auth/, publish/)
@@ -149,12 +150,59 @@ Before enabling auth-required on any deployment:

| Backend | Use Case | Build |
|---------|----------|-------|
| **Badger** (default) | Single-instance, embedded | Standard |
| **Badger** (default) | Single-instance, SSD, high performance | Standard |
| **BBolt** | HDD-optimized, large archives, lower memory | `ORLY_DB_TYPE=bbolt` |
| **Neo4j** | Social graph, WoT queries | `ORLY_DB_TYPE=neo4j` |
| **WasmDB** | Browser/WebAssembly | `GOOS=js GOARCH=wasm` |

All implement the `pkg/database.Database` interface.

### Scaling for Large Archives

For archives with millions of events, consider:

**Option 1: Tune Badger (SSD recommended)**
```bash
# Increase caches for a larger working set (requires more RAM)
ORLY_DB_BLOCK_CACHE_MB=2048          # 2GB block cache
ORLY_DB_INDEX_CACHE_MB=1024          # 1GB index cache
ORLY_SERIAL_CACHE_PUBKEYS=500000     # 500k pubkeys
ORLY_SERIAL_CACHE_EVENT_IDS=2000000  # 2M event IDs

# Higher compression to reduce disk IO
ORLY_DB_ZSTD_LEVEL=9                 # Best compression ratio

# Enable storage GC with aggressive eviction
ORLY_GC_ENABLED=true
ORLY_GC_BATCH_SIZE=5000
ORLY_MAX_STORAGE_BYTES=107374182400  # 100GB cap
```

**Option 2: Use BBolt for HDD/Low-Memory Deployments**
```bash
ORLY_DB_TYPE=bbolt

# Tune for your HDD
ORLY_BBOLT_BATCH_MAX_EVENTS=10000   # Larger batches for HDD
ORLY_BBOLT_BATCH_MAX_MB=256         # 256MB batch buffer
ORLY_BBOLT_FLUSH_TIMEOUT_SEC=60     # Longer flush interval
ORLY_BBOLT_BLOOM_SIZE_MB=32         # Larger bloom filter
ORLY_BBOLT_MMAP_SIZE_MB=16384       # 16GB mmap (scales with DB size)
```

**Migration Between Backends**
```bash
# Migrate from Badger to BBolt
./orly migrate --from badger --to bbolt

# Migrate with a custom target path
./orly migrate --from badger --to bbolt --target-path /mnt/hdd/orly-archive
```

**BBolt vs Badger Trade-offs:**
- BBolt: Lower memory, HDD-friendly, simpler (B+tree), slower random reads
- Badger: Higher memory, SSD-optimized (LSM), faster concurrent access

## Logging (lol.mleku.dev)

```go
@@ -221,7 +269,8 @@ if (isValidNsec(nsec)) { ... }

## Dependencies

- `github.com/dgraph-io/badger/v4` - Badger DB
- `github.com/dgraph-io/badger/v4` - Badger DB (LSM, SSD-optimized)
- `go.etcd.io/bbolt` - BBolt DB (B+tree, HDD-optimized)
- `github.com/neo4j/neo4j-go-driver/v5` - Neo4j
- `github.com/gorilla/websocket` - WebSocket
- `github.com/ebitengine/purego` - CGO-free C loading
@@ -20,8 +20,12 @@ func initializeBlossomServer(
    blossomCfg := &blossom.Config{
        BaseURL:          "", // Will be set dynamically per request
        MaxBlobSize:      100 * 1024 * 1024, // 100MB default
        AllowedMimeTypes: nil, // Allow all MIME types by default
        RequireAuth:      cfg.AuthRequired || cfg.AuthToWrite,
        // Rate limiting for non-followed users
        RateLimitEnabled: cfg.BlossomRateLimitEnabled,
        DailyLimitMB:     cfg.BlossomDailyLimitMB,
        BurstLimitMB:     cfg.BlossomBurstLimitMB,
    }

    // Create blossom server with relay's ACL registry
@@ -31,7 +35,12 @@ func initializeBlossomServer(
    // We'll need to modify the handler to inject the baseURL per request
    // For now, we'll use a middleware approach

    log.I.F("blossom server initialized with ACL mode: %s", cfg.ACLMode)
    if cfg.BlossomRateLimitEnabled {
        log.I.F("blossom server initialized with ACL mode: %s, rate limit: %dMB/day (burst: %dMB)",
            cfg.ACLMode, cfg.BlossomDailyLimitMB, cfg.BlossomBurstLimitMB)
    } else {
        log.I.F("blossom server initialized with ACL mode: %s", cfg.ACLMode)
    }
    return bs, nil
}
@@ -41,9 +41,9 @@ type C struct {
    EnableShutdown bool `env:"ORLY_ENABLE_SHUTDOWN" default:"false" usage:"if true, expose /shutdown on the health port to gracefully stop the process (for profiling)"`
    LogLevel string `env:"ORLY_LOG_LEVEL" default:"info" usage:"relay log level: fatal error warn info debug trace"`
    DBLogLevel string `env:"ORLY_DB_LOG_LEVEL" default:"info" usage:"database log level: fatal error warn info debug trace"`
    DBBlockCacheMB int `env:"ORLY_DB_BLOCK_CACHE_MB" default:"512" usage:"Badger block cache size in MB (higher improves read hit ratio)"`
    DBIndexCacheMB int `env:"ORLY_DB_INDEX_CACHE_MB" default:"256" usage:"Badger index cache size in MB (improves index lookup performance)"`
    DBZSTDLevel int `env:"ORLY_DB_ZSTD_LEVEL" default:"1" usage:"Badger ZSTD compression level (1=fast/500MB/s, 3=default, 9=best ratio, 0=disable)"`
    DBBlockCacheMB int `env:"ORLY_DB_BLOCK_CACHE_MB" default:"1024" usage:"Badger block cache size in MB (higher improves read hit ratio, increase for large archives)"`
    DBIndexCacheMB int `env:"ORLY_DB_INDEX_CACHE_MB" default:"512" usage:"Badger index cache size in MB (improves index lookup performance, increase for large archives)"`
    DBZSTDLevel int `env:"ORLY_DB_ZSTD_LEVEL" default:"3" usage:"Badger ZSTD compression level (1=fast/500MB/s, 3=balanced, 9=best ratio/slower, 0=disable)"`
    LogToStdout bool `env:"ORLY_LOG_TO_STDOUT" default:"false" usage:"log to stdout instead of stderr"`
    LogBufferSize int `env:"ORLY_LOG_BUFFER_SIZE" default:"10000" usage:"number of log entries to keep in memory for web UI viewing (0 disables)"`
    Pprof string `env:"ORLY_PPROF" usage:"enable pprof in modes: cpu,memory,allocation,heap,block,goroutine,threadcreate,mutex"`
@@ -69,12 +69,18 @@ type C struct {

    // Progressive throttle for follows ACL mode - allows non-followed users to write with increasing delay
    FollowsThrottleEnabled bool `env:"ORLY_FOLLOWS_THROTTLE" default:"false" usage:"enable progressive delay for non-followed users in follows ACL mode"`
    FollowsThrottlePerEvent time.Duration `env:"ORLY_FOLLOWS_THROTTLE_INCREMENT" default:"200ms" usage:"delay added per event for non-followed users"`
    FollowsThrottlePerEvent time.Duration `env:"ORLY_FOLLOWS_THROTTLE_INCREMENT" default:"25ms" usage:"delay added per event for non-followed users"`
    FollowsThrottleMaxDelay time.Duration `env:"ORLY_FOLLOWS_THROTTLE_MAX" default:"60s" usage:"maximum throttle delay cap"`

    // Blossom blob storage service level settings
    // Blossom blob storage service settings
    BlossomEnabled bool `env:"ORLY_BLOSSOM_ENABLED" default:"true" usage:"enable Blossom blob storage server (only works with Badger backend)"`
    BlossomServiceLevels string `env:"ORLY_BLOSSOM_SERVICE_LEVELS" usage:"comma-separated list of service levels in format: name:storage_mb_per_sat_per_month (e.g., basic:1,premium:10)"`

    // Blossom upload rate limiting (for non-followed users)
    BlossomRateLimitEnabled bool `env:"ORLY_BLOSSOM_RATE_LIMIT" default:"false" usage:"enable upload rate limiting for non-followed users"`
    BlossomDailyLimitMB int64 `env:"ORLY_BLOSSOM_DAILY_LIMIT_MB" default:"10" usage:"daily upload limit in MB for non-followed users (EMA averaged)"`
    BlossomBurstLimitMB int64 `env:"ORLY_BLOSSOM_BURST_LIMIT_MB" default:"50" usage:"max burst upload in MB (bucket cap)"`

    // Web UI and dev mode settings
    WebDisableEmbedded bool `env:"ORLY_WEB_DISABLE" default:"false" usage:"disable serving the embedded web UI; useful for hot-reload during development"`
    WebDevProxyURL string `env:"ORLY_WEB_DEV_PROXY_URL" usage:"when ORLY_WEB_DISABLE is true, reverse-proxy non-API paths to this dev server URL (e.g. http://localhost:5173)"`
@@ -124,9 +130,9 @@ type C struct {
    Neo4jMaxTxRetrySeconds int `env:"ORLY_NEO4J_MAX_TX_RETRY_SEC" default:"30" usage:"max seconds for retryable transaction attempts"`
    Neo4jQueryResultLimit int `env:"ORLY_NEO4J_QUERY_RESULT_LIMIT" default:"10000" usage:"max results returned per query (prevents unbounded memory usage, 0=unlimited)"`

    // Advanced database tuning
    SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"100000" usage:"max pubkeys to cache for compact event storage (default: 100000, ~3.2MB memory)"`
    SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"500000" usage:"max event IDs to cache for compact event storage (default: 500000, ~16MB memory)"`
    // Advanced database tuning (increase for large archives to reduce cache misses)
    SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"250000" usage:"max pubkeys to cache for compact event storage (~8MB memory, increase for large archives)"`
    SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"1000000" usage:"max event IDs to cache for compact event storage (~32MB memory, increase for large archives)"`

    // Connection concurrency control
    MaxHandlersPerConnection int `env:"ORLY_MAX_HANDLERS_PER_CONN" default:"100" usage:"max concurrent message handlers per WebSocket connection (limits goroutine growth under load)"`
@@ -124,6 +124,17 @@ func (s *Server) handleCashuKeysets(w http.ResponseWriter, r *http.Request) {

// handleCashuInfo handles GET /cashu/info - returns mint information.
func (s *Server) handleCashuInfo(w http.ResponseWriter, r *http.Request) {
    // CORS headers for browser-based CAT support detection
    w.Header().Set("Access-Control-Allow-Origin", "*")
    w.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONS")
    w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Accept")

    // Handle preflight
    if r.Method == http.MethodOptions {
        w.WriteHeader(http.StatusNoContent)
        return
    }

    if s.CashuIssuer == nil {
        http.Error(w, "Cashu tokens not enabled", http.StatusNotImplemented)
        return
@@ -21,7 +21,7 @@ import (
)

func (l *Listener) HandleEvent(msg []byte) (err error) {
    log.D.F("HandleEvent: START handling event: %s", msg)
    log.I.F("HandleEvent: START handling event: %s", string(msg[:min(200, len(msg))]))

    // 1. Raw JSON validation (before unmarshal) - use validation service
    if result := l.eventValidator.ValidateRawJSON(msg); !result.Valid {
@@ -231,6 +231,11 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {

    // Authorization check (policy + ACL) - use authorization service
    decision := l.eventAuthorizer.Authorize(env.E, l.authedPubkey.Load(), l.remote, env.E.Kind)
    // Debug: log ephemeral event authorization
    if env.E.Kind >= 20000 && env.E.Kind < 30000 {
        log.I.F("ephemeral auth check: kind %d, allowed=%v, reason=%s",
            env.E.Kind, decision.Allowed, decision.DenyReason)
    }
    if !decision.Allowed {
        log.D.F("HandleEvent: authorization denied: %s (requireAuth=%v)", decision.DenyReason, decision.RequireAuth)
        if decision.RequireAuth {
@@ -256,14 +261,17 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
    log.I.F("HandleEvent: authorized with access level %s", decision.AccessLevel)

    // Progressive throttle for follows ACL mode (delays non-followed users)
    if delay := l.getFollowsThrottleDelay(env.E); delay > 0 {
        log.D.F("HandleEvent: applying progressive throttle delay of %v for %0x from %s",
            delay, env.E.Pubkey, l.remote)
        select {
        case <-l.ctx.Done():
            return l.ctx.Err()
        case <-time.After(delay):
            // Delay completed, continue processing
    // Skip throttle if a Cashu Access Token is present (authenticated via CAT)
    if l.cashuToken == nil {
        if delay := l.getFollowsThrottleDelay(env.E); delay > 0 {
            log.D.F("HandleEvent: applying progressive throttle delay of %v for %0x from %s",
                delay, env.E.Pubkey, l.remote)
            select {
            case <-l.ctx.Done():
                return l.ctx.Err()
            case <-time.After(delay):
                // Delay completed, continue processing
            }
        }
    }
@@ -531,11 +531,23 @@ func GetKindCategoriesInfo() []map[string]interface{} {
        "kinds": []int{1063, 20, 21, 22},
    },
    {
        "id": "marketplace",
        "name": "Marketplace",
        "description": "Product listings, stalls, auctions",
        "id": "marketplace_nip15",
        "name": "Marketplace (NIP-15)",
        "description": "Legacy NIP-15 stalls and products",
        "kinds": []int{30017, 30018, 30019, 30020, 1021, 1022},
    },
    {
        "id": "marketplace_nip99",
        "name": "Marketplace (NIP-99/Gamma)",
        "description": "NIP-99 classified listings, collections, shipping, reviews (Plebeian Market)",
        "kinds": []int{30402, 30403, 30405, 30406, 31555},
    },
    {
        "id": "order_communication",
        "name": "Order Communication",
        "description": "Gamma Markets order messages and payment receipts",
        "kinds": []int{16, 17},
    },
    {
        "id": "groups_nip29",
        "name": "Group Messaging (NIP-29)",

@@ -34,7 +34,6 @@ import (

func (l *Listener) HandleReq(msg []byte) (err error) {
    log.D.F("handling REQ: %s", msg)
    log.T.F("HandleReq: START processing from %s", l.remote)
    // var rem []byte
    env := reqenvelope.New()
    if _, err = env.Unmarshal(msg); chk.E(err) {
@@ -435,7 +435,7 @@ func Run(

    // Initialize Blossom blob storage server (only for Badger backend)
    // MUST be done before UserInterface() which registers routes
    if badgerDB, ok := db.(*database.D); ok {
    if badgerDB, ok := db.(*database.D); ok && cfg.BlossomEnabled {
        log.I.F("Badger backend detected, initializing Blossom server...")
        if l.blossomServer, err = initializeBlossomServer(ctx, cfg, badgerDB); err != nil {
            log.E.F("failed to initialize blossom server: %v", err)
@@ -445,6 +445,8 @@ func Run(
        } else {
            log.W.F("blossom server initialization returned nil without error")
        }
    } else if !cfg.BlossomEnabled {
        log.I.F("Blossom server disabled via ORLY_BLOSSOM_ENABLED=false")
    } else {
        log.I.F("Non-Badger backend detected (type: %T), Blossom server not available", db)
    }
@@ -159,12 +159,26 @@ func (p *P) Deliver(ev *event.E) {
        sub Subscription
    }
    var deliveries []delivery
    // Debug: log ephemeral event delivery attempts
    isEphemeral := ev.Kind >= 20000 && ev.Kind < 30000
    if isEphemeral {
        var tagInfo string
        if ev.Tags != nil {
            tagInfo = string(ev.Tags.Marshal(nil))
        }
        log.I.F("ephemeral event kind %d, id %0x, checking %d connections for matches, tags: %s",
            ev.Kind, ev.ID[:8], len(p.Map), tagInfo)
    }
    for w, subs := range p.Map {
        for id, subscriber := range subs {
            if subscriber.Match(ev) {
                deliveries = append(
                    deliveries, delivery{w: w, id: id, sub: subscriber},
                )
            } else if isEphemeral {
                // Debug: log why ephemeral events don't match
                log.I.F("ephemeral event kind %d did NOT match subscription %s (filters: %s)",
                    ev.Kind, id, string(subscriber.S.Marshal(nil)))
            }
        }
    }
app/web/dist/bundle.js (vendored, 26 changed lines)
File diff suppressed because one or more lines are too long

app/web/dist/bundle.js.map (vendored, 2 changed lines)
File diff suppressed because one or more lines are too long
@@ -6,25 +6,35 @@

import { fileURLToPath } from 'url';
import { dirname, join } from 'path';
import { writeFileSync } from 'fs';
import { writeFileSync, existsSync } from 'fs';

const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);

const KINDS_URL = 'https://git.mleku.dev/mleku/nostr/raw/branch/main/encoders/kind/kinds.json';
const OUTPUT_PATH = join(__dirname, '..', 'src', 'eventKinds.js');

async function fetchKinds() {
  console.log(`Fetching kinds from ${KINDS_URL}...`);

  const response = await fetch(KINDS_URL);
  if (!response.ok) {
    throw new Error(`Failed to fetch kinds.json: ${response.status} ${response.statusText}`);
  try {
    const response = await fetch(KINDS_URL, { timeout: 10000 });
    if (!response.ok) {
      throw new Error(`HTTP ${response.status} ${response.statusText}`);
    }

    const data = await response.json();
    console.log(`Fetched ${Object.keys(data.kinds).length} kinds (version: ${data.version})`);
    return data;
  } catch (error) {
    // Check if we have an existing eventKinds.js we can use
    if (existsSync(OUTPUT_PATH)) {
      console.warn(`Warning: Could not fetch kinds.json (${error.message})`);
      console.log(`Using existing ${OUTPUT_PATH}`);
      return null; // Signal to skip generation
    }
    throw new Error(`Failed to fetch kinds.json and no existing file: ${error.message}`);
  }

  const data = await response.json();
  console.log(`Fetched ${Object.keys(data.kinds).length} kinds (version: ${data.version})`);

  return data;
}

function generateEventKinds(data) {
@@ -202,14 +212,18 @@ export const kindCategories = [
async function main() {
  try {
    const data = await fetchKinds();

    // If fetchKinds returned null, we're using the existing file
    if (data === null) {
      console.log('Skipping generation, using existing eventKinds.js');
      return;
    }

    const kinds = generateEventKinds(data);
    const js = generateJS(kinds, data);

    // Write to src/eventKinds.js
    const outPath = join(__dirname, '..', 'src', 'eventKinds.js');

    writeFileSync(outPath, js);
    console.log(`Generated ${outPath} with ${kinds.length} kinds`);
    writeFileSync(OUTPUT_PATH, js);
    console.log(`Generated ${OUTPUT_PATH} with ${kinds.length} kinds`);
  } catch (error) {
    console.error('Error:', error.message);
    process.exit(1);
@@ -30,11 +30,23 @@ export const curationKindCategories = [
    kinds: [1063, 20, 21, 22],
  },
  {
    id: "marketplace",
    name: "Marketplace",
    description: "Product listings, stalls, and marketplace events",
    id: "marketplace_nip15",
    name: "Marketplace (NIP-15)",
    description: "Legacy NIP-15 stalls and products",
    kinds: [30017, 30018, 30019, 30020],
  },
  {
    id: "marketplace_nip99",
    name: "Marketplace (NIP-99/Gamma)",
    description: "NIP-99 classified listings, collections, shipping, reviews (Plebeian Market)",
    kinds: [30402, 30403, 30405, 30406, 31555],
  },
  {
    id: "order_communication",
    name: "Order Communication",
    description: "Gamma Markets order messages and payment receipts (kinds 16, 17)",
    kinds: [16, 17],
  },
  {
    id: "groups_nip29",
    name: "Group Messaging (NIP-29)",
@@ -179,6 +179,28 @@ export class Nip07Signer {
  }
}

// Merge two event arrays, deduplicating by event id
// Newer events (by created_at) take precedence for same id
function mergeAndDeduplicateEvents(cached, relay) {
  const eventMap = new Map();

  // Add cached events first
  for (const event of cached) {
    eventMap.set(event.id, event);
  }

  // Add/update with relay events (they may be newer)
  for (const event of relay) {
    const existing = eventMap.get(event.id);
    if (!existing || event.created_at >= existing.created_at) {
      eventMap.set(event.id, event);
    }
  }

  // Return sorted by created_at descending (newest first)
  return Array.from(eventMap.values()).sort((a, b) => b.created_at - a.created_at);
}

// IndexedDB helpers for unified event storage
// This provides a local cache that all components can access
const DB_NAME = "nostrCache";
@@ -573,9 +595,10 @@ export async function fetchEvents(filters, options = {}) {
  } = options;

  // Try to get cached events first if requested
  let cachedEvents = [];
  if (useCache) {
    try {
      const cachedEvents = await queryEventsFromDB(filters);
      cachedEvents = await queryEventsFromDB(filters);
      if (cachedEvents.length > 0) {
        console.log(`Found ${cachedEvents.length} cached events in IndexedDB`);
      }
@@ -585,17 +608,19 @@ export async function fetchEvents(filters, options = {}) {
  }

  return new Promise((resolve, reject) => {
    const events = [];
    const relayEvents = [];
    const timeoutId = setTimeout(() => {
      console.log(`Timeout reached after ${timeout}ms, returning ${events.length} events`);
      console.log(`Timeout reached after ${timeout}ms, returning ${relayEvents.length} relay events`);
      sub.close();

      // Store all received events in IndexedDB before resolving
      if (events.length > 0) {
        putEvents(events).catch(e => console.warn("Failed to cache events", e));
      if (relayEvents.length > 0) {
        putEvents(relayEvents).catch(e => console.warn("Failed to cache events", e));
      }

      resolve(events);

      // Merge cached events with relay events, deduplicate by id
      const mergedEvents = mergeAndDeduplicateEvents(cachedEvents, relayEvents);
      resolve(mergedEvents);
    }, timeout);

    try {
@@ -615,22 +640,25 @@ export async function fetchEvents(filters, options = {}) {
          created_at: event.created_at,
          content_preview: event.content?.substring(0, 50)
        });
        events.push(event);
        relayEvents.push(event);

        // Store event immediately in IndexedDB
        putEvent(event).catch(e => console.warn("Failed to cache event", e));
      },
      oneose() {
        console.log(`✅ EOSE received for REQ [${subId}], got ${events.length} events`);
        console.log(`✅ EOSE received for REQ [${subId}], got ${relayEvents.length} relay events`);
        clearTimeout(timeoutId);
        sub.close();

        // Store all events in IndexedDB before resolving
        if (events.length > 0) {
          putEvents(events).catch(e => console.warn("Failed to cache events", e));
        if (relayEvents.length > 0) {
          putEvents(relayEvents).catch(e => console.warn("Failed to cache events", e));
        }

        resolve(events);

        // Merge cached events with relay events, deduplicate by id
        const mergedEvents = mergeAndDeduplicateEvents(cachedEvents, relayEvents);
        console.log(`Merged ${cachedEvents.length} cached + ${relayEvents.length} relay = ${mergedEvents.length} total events`);
        resolve(mergedEvents);
      }
    }
  );
@@ -137,7 +137,7 @@ Where `payload` is the standard Nostr message array, e.g.:
The encrypted content structure:
```json
{
  "type": "EVENT" | "OK" | "EOSE" | "NOTICE" | "CLOSED" | "COUNT" | "AUTH",
  "type": "EVENT" | "OK" | "EOSE" | "NOTICE" | "CLOSED" | "COUNT" | "AUTH" | "CHUNK",
  "payload": <standard_nostr_response_array>
}
```
@@ -150,6 +150,7 @@ Where `payload` is the standard Nostr response array, e.g.:
- `["CLOSED", "<sub_id>", "<message>"]`
- `["COUNT", "<sub_id>", {"count": <n>}]`
- `["AUTH", "<challenge>"]`
- `[<chunk_object>]` (for CHUNK type, see Message Segmentation)

### Session Management

@@ -168,6 +169,85 @@ The conversation key is derived from:
- **Secret-based auth**: ECDH between client's secret key (derived from URI secret) and relay's public key
- **CAT auth**: ECDH between client's Nostr key and relay's public key
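As a sketch of what this derivation could look like in Go, assuming NIP-44 v2 key derivation (ECDH keeping only the x-coordinate, then HKDF-extract with the fixed salt "nip44-v2") — this is illustrative only, not this repository's API:

```go
package main

import (
	"crypto/sha256"
	"fmt"

	"github.com/decred/dcrd/dcrec/secp256k1/v4"
	"golang.org/x/crypto/hkdf"
)

// conversationKey derives a NIP-44 v2 conversation key: ECDH producing the
// shared x-coordinate, then HKDF-extract with SHA-256 and the fixed salt
// "nip44-v2". Both parties derive the same key from their own private key
// and the other party's public key.
func conversationKey(priv *secp256k1.PrivateKey, pub *secp256k1.PublicKey) []byte {
	sharedX := secp256k1.GenerateSharedSecret(priv, pub) // 32-byte x-coordinate
	return hkdf.Extract(sha256.New, sharedX, []byte("nip44-v2"))
}

func main() {
	client, _ := secp256k1.GeneratePrivateKey()
	relay, _ := secp256k1.GeneratePrivateKey()
	a := conversationKey(client, relay.PubKey())
	b := conversationKey(relay, client.PubKey())
	fmt.Println("keys match:", string(a) == string(b)) // true
}
```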

### Message Segmentation

Some Nostr events exceed typical relay message size limits (commonly 64KB). NRC supports message segmentation to handle large payloads by splitting them into multiple chunks.

#### When to Chunk

Senders SHOULD chunk messages when the JSON-serialized response exceeds 40KB. This threshold accounts for:
- NIP-44 encryption overhead (~100 bytes)
- Base64 encoding expansion (~33%)
- Event wrapper overhead (tags, signature, etc.)

As a quick check: a 40KB payload grows to roughly 53KB after Base64 (40KB × 4/3); with the wrapper and encryption overhead added, it still fits under a 64KB relay limit.
#### Chunk Message Format

When a response is too large, it is split into multiple CHUNK responses:

```json
{
  "type": "CHUNK",
  "payload": [{
    "type": "CHUNK",
    "messageId": "<uuid>",
    "index": 0,
    "total": 3,
    "data": "<base64_encoded_chunk>"
  }]
}
```

Fields:
- `messageId`: A unique identifier (UUID) for the chunked message, used to correlate chunks
- `index`: Zero-based chunk index (0, 1, 2, ...)
- `total`: Total number of chunks in this message
- `data`: Base64-encoded segment of the original message
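Read as a Go type, the chunk object might look like this sketch (field names mirror the JSON above; the type itself is hypothetical, not taken from the repository):

```go
package nrc

// Chunk mirrors one element of a CHUNK response payload.
type Chunk struct {
	Type      string `json:"type"`      // always "CHUNK"
	MessageID string `json:"messageId"` // UUID correlating all chunks of one message
	Index     int    `json:"index"`     // zero-based chunk index
	Total     int    `json:"total"`     // total number of chunks in this message
	Data      string `json:"data"`      // base64-encoded segment of the original message
}
```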
#### Chunking Process (Sender)

1. Serialize the original response message to JSON
2. If the serialized length exceeds the threshold (40KB), proceed with chunking
3. Encode the JSON string as UTF-8, then Base64 encode it
4. Split the Base64 string into chunks of the maximum chunk size
5. Generate a unique `messageId` (UUID recommended)
6. Send each chunk as a separate CHUNK response event

Example encoding (JavaScript):
```javascript
const encoded = btoa(unescape(encodeURIComponent(jsonString)))
```
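For the Go bridge side, the same steps might look like the following sketch; the 40KB threshold comes from this spec, while `maxChunkSize` and the helper itself are illustrative assumptions:

```go
package nrc

import (
	"encoding/base64"

	"github.com/google/uuid"
)

const (
	chunkThreshold = 40 * 1024 // chunk responses larger than 40KB (per this spec)
	maxChunkSize   = 40 * 1024 // base64 characters per chunk (assumed value)
)

// SplitIntoChunks base64-encodes the serialized JSON, splits it into
// fixed-size segments, and stamps every segment with one messageId.
// It returns nil when the message is small enough to send unchunked.
func SplitIntoChunks(jsonMsg []byte) []Chunk {
	if len(jsonMsg) <= chunkThreshold {
		return nil
	}
	encoded := base64.StdEncoding.EncodeToString(jsonMsg)
	total := (len(encoded) + maxChunkSize - 1) / maxChunkSize
	id := uuid.NewString()
	chunks := make([]Chunk, 0, total)
	for i := 0; i < total; i++ {
		end := (i + 1) * maxChunkSize
		if end > len(encoded) {
			end = len(encoded)
		}
		chunks = append(chunks, Chunk{
			Type:      "CHUNK",
			MessageID: id,
			Index:     i,
			Total:     total,
			Data:      encoded[i*maxChunkSize : end],
		})
	}
	return chunks
}
```

Each returned chunk would then be wrapped, encrypted, and sent as its own CHUNK response event.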

#### Reassembly Process (Receiver)

1. When receiving a CHUNK response, buffer it by `messageId`
2. Track received chunks by `index`
3. When all chunks are received (`chunks.size === total`):
   a. Concatenate chunk data in index order (0, 1, 2, ...)
   b. Base64 decode the concatenated string
   c. Parse as UTF-8 JSON to recover the original response
4. Process the reassembled response as normal
5. Clean up the chunk buffer

Example decoding (JavaScript):
```javascript
const jsonString = decodeURIComponent(escape(atob(concatenatedBase64)))
const response = JSON.parse(jsonString)
```
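A matching Go sketch for the receiver side (illustrative only; the buffer limits and eviction it needs are covered in the next section):

```go
package nrc

import (
	"encoding/base64"
	"fmt"
	"time"
)

// pending accumulates the chunks of one message until all arrive.
type pending struct {
	parts    map[int]string // index -> base64 segment
	total    int
	lastSeen time.Time
}

// Reassembler buffers CHUNK responses by messageId. Not safe for concurrent
// use; wrap it in a mutex if chunks arrive from several goroutines.
type Reassembler struct {
	buffers map[string]*pending
}

func NewReassembler() *Reassembler {
	return &Reassembler{buffers: make(map[string]*pending)}
}

// Add buffers one chunk. When the message completes it returns the decoded
// original JSON and true. Duplicate (messageId, index) pairs are ignored.
func (r *Reassembler) Add(c Chunk) (msg []byte, done bool, err error) {
	p, ok := r.buffers[c.MessageID]
	if !ok {
		p = &pending{parts: make(map[int]string), total: c.Total}
		r.buffers[c.MessageID] = p
	}
	if _, dup := p.parts[c.Index]; !dup {
		p.parts[c.Index] = c.Data
	}
	p.lastSeen = time.Now()
	if len(p.parts) < p.total {
		return nil, false, nil
	}
	// All chunks present: concatenate in index order, then base64-decode.
	var joined string
	for i := 0; i < p.total; i++ {
		joined += p.parts[i]
	}
	delete(r.buffers, c.MessageID) // clean up the completed buffer
	decoded, err := base64.StdEncoding.DecodeString(joined)
	if err != nil {
		return nil, false, fmt.Errorf("chunk decode: %w", err)
	}
	return decoded, true, nil
}
```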

#### Chunk Buffer Management

Receivers MUST implement chunk buffer cleanup:
- Discard incomplete chunk buffers after 60 seconds of inactivity
- Limit the number of concurrent incomplete messages to prevent memory exhaustion
- Log warnings when discarding stale buffers for debugging
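Continuing the receiver sketch above, cleanup could be a periodic eviction pass over the reassembly buffers (again illustrative, not the repository's code):

```go
// Evict drops incomplete buffers that have been idle longer than maxAge
// (this spec uses 60 seconds), logging each discard so that missing
// chunks remain debuggable.
func (r *Reassembler) Evict(maxAge time.Duration) {
	now := time.Now()
	for id, p := range r.buffers {
		if now.Sub(p.lastSeen) > maxAge {
			fmt.Printf("discarding stale chunk buffer %s (%d/%d chunks)\n",
				id, len(p.parts), p.total)
			delete(r.buffers, id)
		}
	}
}
```

A caller would run `Evict(60*time.Second)` on a ticker, and could additionally cap the number of entries in the buffer map to bound memory, per the limits above.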

#### Ordering and Reliability

- Chunks MAY arrive out of order; receivers MUST reassemble by index
- Missing chunks result in message loss; the incomplete buffer is eventually discarded
- Duplicate chunks (same messageId + index) SHOULD be ignored
- Each chunk is sent as a separate encrypted NRC response event

### Authentication

#### Secret-Based Authentication
@@ -208,6 +288,9 @@ The conversation key is derived from:
4. Match responses using the `e` tag (references request event ID)
5. Handle EOSE by waiting for kind 24892 with type "EOSE" in content
6. For subscriptions, maintain mapping of internal sub IDs to tunnel session
7. **Chunking**: Maintain a chunk buffer map keyed by `messageId`
8. **Chunking**: When receiving CHUNK responses, buffer chunks and reassemble when complete
9. **Chunking**: Implement 60-second timeout for incomplete chunk buffers

## Bridge Implementation Notes

@@ -217,10 +300,14 @@ The conversation key is derived from:
4. Capture all relay responses and wrap in kind 24892
5. Sign with relay's key and publish to rendezvous relay
6. Maintain session state for subscription mapping
7. **Chunking**: Check response size before sending; chunk if > 40KB
8. **Chunking**: Use consistent messageId (UUID) across all chunks of a message
9. **Chunking**: Send chunks in order (index 0, 1, 2, ...) for optimal reassembly

## Reference Implementations

- ORLY Relay: [https://git.mleku.dev/mleku/next.orly.dev](https://git.mleku.dev/mleku/next.orly.dev)
- ORLY Relay (Bridge): [https://git.mleku.dev/mleku/next.orly.dev](https://git.mleku.dev/mleku/next.orly.dev)
- Smesh Client: [https://git.mleku.dev/mleku/smesh](https://git.mleku.dev/mleku/smesh)

## See Also
go.mod (8 changed lines)

@@ -3,12 +3,14 @@ module next.orly.dev
go 1.25.3

require (
    git.mleku.dev/mleku/nostr v1.0.12
    git.mleku.dev/mleku/nostr v1.0.13
    github.com/adrg/xdg v0.5.3
    github.com/alexflint/go-arg v1.6.1
    github.com/aperturerobotics/go-indexeddb v0.2.3
    github.com/bits-and-blooms/bloom/v3 v3.7.1
    github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0
    github.com/dgraph-io/badger/v4 v4.8.0
    github.com/google/uuid v1.6.0
    github.com/gorilla/websocket v1.5.3
    github.com/hack-pad/safejs v0.1.1
    github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0
@@ -22,6 +24,7 @@ require (
    github.com/stretchr/testify v1.11.1
    github.com/vertex-lab/nostr-sqlite v0.3.2
    go-simpler.org/env v0.12.0
    go.etcd.io/bbolt v1.4.3
    go.uber.org/atomic v1.11.0
    golang.org/x/crypto v0.46.0
    golang.org/x/lint v0.0.0-20241112194109-818c5a804067
@@ -37,7 +40,6 @@ require (
    github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 // indirect
    github.com/alexflint/go-scalar v1.2.0 // indirect
    github.com/bits-and-blooms/bitset v1.24.2 // indirect
    github.com/bits-and-blooms/bloom/v3 v3.7.1 // indirect
    github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
    github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect
    github.com/bytedance/sonic v1.13.1 // indirect
@@ -56,7 +58,6 @@ require (
    github.com/google/btree v1.1.2 // indirect
    github.com/google/flatbuffers v25.9.23+incompatible // indirect
    github.com/google/pprof v0.0.0-20251007162407-5df77e3f7d1d // indirect
    github.com/google/uuid v1.6.0 // indirect
    github.com/josharian/intern v1.0.0 // indirect
    github.com/json-iterator/go v1.1.12 // indirect
    github.com/klauspost/cpuid/v2 v2.3.0 // indirect
@@ -72,7 +73,6 @@ require (
    github.com/tidwall/match v1.1.1 // indirect
    github.com/tidwall/pretty v1.2.1 // indirect
    github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
    go.etcd.io/bbolt v1.4.3 // indirect
    go.opentelemetry.io/auto/sdk v1.2.1 // indirect
    go.opentelemetry.io/otel v1.38.0 // indirect
    go.opentelemetry.io/otel/metric v1.38.0 // indirect
go.sum (5 changed lines)

@@ -1,5 +1,5 @@
git.mleku.dev/mleku/nostr v1.0.12 h1:bjsFUh1Q3fGpU7qsqxggGgrGGUt2OBdu1w8hjDM4gJE=
git.mleku.dev/mleku/nostr v1.0.12/go.mod h1:kJwSMmLRnAJ7QJtgXDv2wGgceFU0luwVqrgAL3MI93M=
git.mleku.dev/mleku/nostr v1.0.13 h1:FqeOQ9ZX8AFVsAI6XisQkB6cgmhn9DNQ2a8li9gx7aY=
git.mleku.dev/mleku/nostr v1.0.13/go.mod h1:kJwSMmLRnAJ7QJtgXDv2wGgceFU0luwVqrgAL3MI93M=
github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg=
github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 h1:ClzzXMDDuUbWfNNZqGeYq4PnYOlwlOVIvSyNaIy0ykg=
@@ -161,6 +161,7 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg=
github.com/twmb/murmur3 v1.1.8/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/vertex-lab/nostr-sqlite v0.3.2 h1:8nZYYIwiKnWLA446qA/wL/Gy+bU0kuaxdLfUyfeTt/E=
github.com/vertex-lab/nostr-sqlite v0.3.2/go.mod h1:5bw1wMgJhSdrumsZAWxqy+P0u1g+q02PnlGQn15dnSM=
main.go (6 changed lines)

@@ -24,7 +24,7 @@ import (
    "next.orly.dev/pkg/acl"
    "git.mleku.dev/mleku/nostr/crypto/keys"
    "git.mleku.dev/mleku/nostr/encoders/bech32encoding"
    _ "next.orly.dev/pkg/bbolt" // Import for bbolt factory registration
    bboltdb "next.orly.dev/pkg/bbolt" // Import for bbolt factory and type
    "next.orly.dev/pkg/database"
    neo4jdb "next.orly.dev/pkg/neo4j" // Import for neo4j factory and type
    "git.mleku.dev/mleku/nostr/encoders/hex"
@@ -617,6 +617,10 @@ func main() {
            n4jDB.MaxConcurrentQueries(),
        )
        log.I.F("rate limiter configured for Neo4j backend (target: %dMB)", targetMB)
    } else if _, ok := db.(*bboltdb.B); ok {
        // BBolt uses memory-mapped IO, so memory-only limiter is appropriate
        limiter = ratelimit.NewMemoryOnlyLimiter(rlConfig)
        log.I.F("rate limiter configured for BBolt backend (target: %dMB)", targetMB)
    } else {
        // For other backends, create a disabled limiter
        limiter = ratelimit.NewDisabledLimiter()
@@ -138,7 +138,7 @@ func (f *Follows) Configure(cfg ...any) (err error) {
    if f.cfg.FollowsThrottleEnabled {
        perEvent := f.cfg.FollowsThrottlePerEvent
        if perEvent == 0 {
            perEvent = 200 * time.Millisecond
            perEvent = 25 * time.Millisecond
        }
        maxDelay := f.cfg.FollowsThrottleMaxDelay
        if maxDelay == 0 {
@@ -200,6 +200,12 @@ func (s *Server) handleUpload(w http.ResponseWriter, r *http.Request) {
        return
    }

    // Check bandwidth rate limit (non-followed users)
    if !s.checkBandwidthLimit(pubkey, remoteAddr, int64(len(body))) {
        s.setErrorResponse(w, http.StatusTooManyRequests, "upload rate limit exceeded, try again later")
        return
    }

    // Calculate SHA256 after auth check
    sha256Hash := CalculateSHA256(body)
    sha256Hex := hex.Enc(sha256Hash)
@@ -647,6 +653,12 @@ func (s *Server) handleMirror(w http.ResponseWriter, r *http.Request) {
        return
    }

    // Check bandwidth rate limit (non-followed users)
    if !s.checkBandwidthLimit(pubkey, remoteAddr, int64(len(body))) {
        s.setErrorResponse(w, http.StatusTooManyRequests, "upload rate limit exceeded, try again later")
        return
    }

    // Note: pubkey may be nil for anonymous uploads if ACL allows it

    // Detect MIME type from remote response
@@ -726,6 +738,12 @@ func (s *Server) handleMediaUpload(w http.ResponseWriter, r *http.Request) {
        return
    }

    // Check bandwidth rate limit (non-followed users)
    if !s.checkBandwidthLimit(pubkey, remoteAddr, int64(len(body))) {
        s.setErrorResponse(w, http.StatusTooManyRequests, "upload rate limit exceeded, try again later")
        return
    }

    // Note: pubkey may be nil for anonymous uploads if ACL allows it

    // Optimize media (placeholder - actual optimization would be implemented here)
pkg/blossom/ratelimit.go (new file, 131 lines)

@@ -0,0 +1,131 @@
package blossom

import (
    "sync"
    "time"
)

// BandwidthState tracks upload bandwidth for an identity
type BandwidthState struct {
    BucketBytes int64     // Current token bucket level (bytes available)
    LastUpdate  time.Time // Last time bucket was updated
}

// BandwidthLimiter implements token bucket rate limiting for uploads.
// Each identity gets a bucket that replenishes at dailyLimit/day rate.
// Uploads consume tokens from the bucket.
type BandwidthLimiter struct {
    mu         sync.Mutex
    states     map[string]*BandwidthState // keyed by pubkey hex or IP
    dailyLimit int64                      // bytes per day
    burstLimit int64                      // max bucket size (burst capacity)
    refillRate float64                    // bytes per second refill rate
}

// NewBandwidthLimiter creates a new bandwidth limiter.
// dailyLimitMB is the average daily limit in megabytes.
// burstLimitMB is the maximum burst capacity in megabytes.
func NewBandwidthLimiter(dailyLimitMB, burstLimitMB int64) *BandwidthLimiter {
    dailyBytes := dailyLimitMB * 1024 * 1024
    burstBytes := burstLimitMB * 1024 * 1024

    return &BandwidthLimiter{
        states:     make(map[string]*BandwidthState),
        dailyLimit: dailyBytes,
        burstLimit: burstBytes,
        refillRate: float64(dailyBytes) / 86400.0, // bytes per second
    }
}

// CheckAndConsume checks if an upload of the given size is allowed for the identity,
// and if so, consumes the tokens. Returns true if allowed, false if rate limited.
// The identity should be pubkey hex for authenticated users, or IP for anonymous.
func (bl *BandwidthLimiter) CheckAndConsume(identity string, sizeBytes int64) bool {
    bl.mu.Lock()
    defer bl.mu.Unlock()

    now := time.Now()
    state, exists := bl.states[identity]

    if !exists {
        // New identity starts with full burst capacity
        state = &BandwidthState{
            BucketBytes: bl.burstLimit,
            LastUpdate:  now,
        }
        bl.states[identity] = state
    } else {
        // Refill bucket based on elapsed time
        elapsed := now.Sub(state.LastUpdate).Seconds()
        refill := int64(elapsed * bl.refillRate)
        state.BucketBytes += refill
        if state.BucketBytes > bl.burstLimit {
            state.BucketBytes = bl.burstLimit
        }
        state.LastUpdate = now
    }

    // Check if upload fits in bucket
    if state.BucketBytes >= sizeBytes {
        state.BucketBytes -= sizeBytes
        return true
    }

    return false
}

// GetAvailable returns the currently available bytes for an identity.
func (bl *BandwidthLimiter) GetAvailable(identity string) int64 {
    bl.mu.Lock()
    defer bl.mu.Unlock()

    state, exists := bl.states[identity]
    if !exists {
        return bl.burstLimit // New users have full capacity
    }

    // Calculate current level with refill
    now := time.Now()
    elapsed := now.Sub(state.LastUpdate).Seconds()
    refill := int64(elapsed * bl.refillRate)
    available := state.BucketBytes + refill
    if available > bl.burstLimit {
        available = bl.burstLimit
    }

    return available
}

// GetTimeUntilAvailable returns how long until the given bytes will be available.
func (bl *BandwidthLimiter) GetTimeUntilAvailable(identity string, sizeBytes int64) time.Duration {
    available := bl.GetAvailable(identity)
    if available >= sizeBytes {
        return 0
    }

    needed := sizeBytes - available
    seconds := float64(needed) / bl.refillRate
    return time.Duration(seconds * float64(time.Second))
}

// Cleanup removes entries that have fully replenished (at burst limit).
func (bl *BandwidthLimiter) Cleanup() {
    bl.mu.Lock()
    defer bl.mu.Unlock()

    now := time.Now()
    for key, state := range bl.states {
        elapsed := now.Sub(state.LastUpdate).Seconds()
        refill := int64(elapsed * bl.refillRate)
        if state.BucketBytes+refill >= bl.burstLimit {
            delete(bl.states, key)
        }
    }
}

// Stats returns the number of tracked identities.
func (bl *BandwidthLimiter) Stats() int {
    bl.mu.Lock()
    defer bl.mu.Unlock()
    return len(bl.states)
}
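A short usage sketch of the limiter above; the constructor and methods are the ones added in this file, while the surrounding program is hypothetical. With the config defaults (10MB/day, 50MB burst), a new identity can burst up to 50MB immediately and is then refilled at roughly 121 bytes per second (10MB / 86400s):

```go
package main

import (
	"fmt"
	"time"

	"next.orly.dev/pkg/blossom"
)

func main() {
	// 10MB/day average, 50MB burst cap: the ORLY_BLOSSOM_* defaults.
	bl := blossom.NewBandwidthLimiter(10, 50)

	id := "pubkey-hex-or-ip"
	// A new identity starts with a full burst bucket, so the first 50MB passes.
	fmt.Println(bl.CheckAndConsume(id, 50*1024*1024)) // true
	// The bucket is now empty; the next upload is rejected...
	fmt.Println(bl.CheckAndConsume(id, 1024*1024)) // false
	// ...and the caller can report when 1MB of capacity returns (~2.4h).
	fmt.Println(bl.GetTimeUntilAvailable(id, 1024*1024).Round(time.Minute))
}
```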
@@ -19,6 +19,9 @@ type Server struct {
    maxBlobSize      int64
    allowedMimeTypes map[string]bool
    requireAuth      bool

    // Rate limiting for uploads
    bandwidthLimiter *BandwidthLimiter
}

// Config holds configuration for the Blossom server
@@ -27,6 +30,11 @@ type Config struct {
    MaxBlobSize      int64
    AllowedMimeTypes []string
    RequireAuth      bool

    // Rate limiting (for non-followed users)
    RateLimitEnabled bool
    DailyLimitMB     int64
    BurstLimitMB     int64
}

// NewServer creates a new Blossom server instance
@@ -48,6 +56,20 @@ func NewServer(db *database.D, aclRegistry *acl.S, cfg *Config) *Server {
        }
    }

    // Initialize bandwidth limiter if enabled
    var bwLimiter *BandwidthLimiter
    if cfg.RateLimitEnabled {
        dailyMB := cfg.DailyLimitMB
        if dailyMB <= 0 {
            dailyMB = 10 // 10MB default
        }
        burstMB := cfg.BurstLimitMB
        if burstMB <= 0 {
            burstMB = 50 // 50MB default burst
        }
        bwLimiter = NewBandwidthLimiter(dailyMB, burstMB)
    }

    return &Server{
        db:      db,
        storage: storage,
@@ -56,6 +78,7 @@ func NewServer(db *database.D, aclRegistry *acl.S, cfg *Config) *Server {
        maxBlobSize:      cfg.MaxBlobSize,
        allowedMimeTypes: allowedMap,
        requireAuth:      cfg.RequireAuth,
        bandwidthLimiter: bwLimiter,
    }
}

@@ -208,6 +231,44 @@ func (s *Server) checkACL(
    return actual >= required
}

// isRateLimitExempt returns true if the user is exempt from rate limiting.
// Users with write access or higher (followed users, admins, owners) are exempt.
func (s *Server) isRateLimitExempt(pubkey []byte, remoteAddr string) bool {
    if s.acl == nil {
        return true // No ACL configured, no rate limiting
    }

    level := s.acl.GetAccessLevel(pubkey, remoteAddr)

    // Followed users get "write" level, admins/owners get higher
    // Only "read" and "none" are rate limited
    return level == "write" || level == "admin" || level == "owner"
}

// checkBandwidthLimit checks if the upload is allowed under rate limits.
// Returns true if allowed, false if rate limited.
// Exempt users (followed, admin, owner) always return true.
func (s *Server) checkBandwidthLimit(pubkey []byte, remoteAddr string, sizeBytes int64) bool {
    if s.bandwidthLimiter == nil {
        return true // No rate limiting configured
    }

    // Check if user is exempt
    if s.isRateLimitExempt(pubkey, remoteAddr) {
        return true
    }

    // Use pubkey hex if available, otherwise IP
    var identity string
    if len(pubkey) > 0 {
        identity = string(pubkey) // Will be converted to hex in handler
    } else {
        identity = remoteAddr
    }

    return s.bandwidthLimiter.CheckAndConsume(identity, sizeBytes)
}

// BaseURLKey is the context key for the base URL (exported for use by app handler)
type BaseURLKey struct{}
@@ -965,14 +965,17 @@ func kindInRange(kind int, rangeStr string) bool {
// kindInCategory checks if a kind belongs to a predefined category
func kindInCategory(kind int, category string) bool {
    categories := map[string][]int{
        "social":       {0, 1, 3, 6, 7, 10002},
        "dm":           {4, 14, 1059},
        "longform":     {30023, 30024},
        "media":        {1063, 20, 21, 22},
        "marketplace":  {30017, 30018, 30019, 30020, 1021, 1022},
        "groups_nip29": {9, 10, 11, 12, 9000, 9001, 9002, 39000, 39001, 39002},
        "groups_nip72": {34550, 1111, 4550},
        "lists":        {10000, 10001, 10003, 30000, 30001, 30003},
        "social":              {0, 1, 3, 6, 7, 10002},
        "dm":                  {4, 14, 1059},
        "longform":            {30023, 30024},
        "media":               {1063, 20, 21, 22},
        "marketplace":         {30017, 30018, 30019, 30020, 1021, 1022}, // Legacy alias
        "marketplace_nip15":   {30017, 30018, 30019, 30020, 1021, 1022},
        "marketplace_nip99":   {30402, 30403, 30405, 30406, 31555}, // NIP-99/Gamma Markets (Plebeian Market)
        "order_communication": {16, 17}, // Gamma Markets order messages
        "groups_nip29":        {9, 10, 11, 12, 9000, 9001, 9002, 39000, 39001, 39002},
        "groups_nip72":        {34550, 1111, 4550},
        "lists":               {10000, 10001, 10003, 30000, 30001, 30003},
    }

    kinds, ok := categories[category]
@@ -6,6 +6,7 @@ import (
    "sort"

    "lol.mleku.dev/chk"
    "lol.mleku.dev/errorf"
    "lol.mleku.dev/log"
    "next.orly.dev/pkg/database/indexes"
    types2 "next.orly.dev/pkg/database/indexes/types"
@@ -44,6 +45,12 @@ func NormalizeTagValueForHash(key byte, valueBytes []byte) []byte {
func CreateIdHashFromData(data []byte) (i *types2.IdHash, err error) {
    i = new(types2.IdHash)

    // Skip empty data to avoid noisy errors
    if len(data) == 0 {
        err = errorf.E("CreateIdHashFromData: empty ID provided")
        return
    }

    // If data looks like hex string and has the right length for hex-encoded
    // sha256
    if len(data) == 64 {
@@ -95,6 +102,11 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
    // should be an error, but convention just ignores it.
    if f.Ids.Len() > 0 {
        for _, id := range f.Ids.T {
            // Skip empty IDs - some filters have empty ID values
            if len(id) == 0 {
                log.D.F("GetIndexesFromFilter: skipping empty ID in filter (ids=%d)", f.Ids.Len())
                continue
            }
            if err = func() (err error) {
                var i *types2.IdHash
                if i, err = CreateIdHashFromData(id); chk.E(err) {
@@ -20,6 +20,10 @@ import (

func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
    // log.T.F("GetSerialById: input id=%s", hex.Enc(id))
    if len(id) == 0 {
        err = errorf.E("GetSerialById: called with empty ID")
        return
    }
    var idxs []Range
    if idxs, err = GetIndexesFromFilter(&filter.F{Ids: tag.NewFromBytesSlice(id)}); chk.E(err) {
        return
@@ -102,6 +106,10 @@ func (d *D) GetSerialsByIdsWithFilter(

    // Process each ID sequentially
    for _, id := range ids.T {
        // Skip empty IDs
        if len(id) == 0 {
            continue
        }
        // idHex := hex.Enc(id)

        // Get the index prefix for this ID
@@ -24,8 +24,8 @@ func (i *IdHash) Set(idh []byte) {
func (i *IdHash) FromId(id []byte) (err error) {
    if len(id) != sha256.Size {
        err = errorf.E(
            "FromId: invalid ID length, got %d require %d", len(id),
            sha256.Size,
            "FromId: invalid ID length, got %d require %d (data=%x)", len(id),
            sha256.Size, id,
        )
        return
    }
@@ -3,6 +3,7 @@ package routing
import (
    "git.mleku.dev/mleku/nostr/encoders/event"
    "git.mleku.dev/mleku/nostr/encoders/kind"
    "lol.mleku.dev/log"
)

// Publisher abstracts event delivery to subscribers.
@@ -22,6 +23,7 @@ func IsEphemeral(k uint16) bool {
// - Are immediately delivered to subscribers
func MakeEphemeralHandler(publisher Publisher) Handler {
    return func(ev *event.E, authedPubkey []byte) Result {
        log.I.F("ephemeral handler received event kind %d, id %0x", ev.Kind, ev.ID[:8])
        // Clone and deliver immediately without persistence
        cloned := ev.Clone()
        go publisher.Deliver(cloned)
@@ -10,12 +10,15 @@ import (
    "git.mleku.dev/mleku/nostr/encoders/filter"
    "git.mleku.dev/mleku/nostr/encoders/hex"
    "git.mleku.dev/mleku/nostr/encoders/tag"
    "lol.mleku.dev/log"
    "next.orly.dev/pkg/database/indexes/types"
    "next.orly.dev/pkg/interfaces/store"
)

// QueryEvents retrieves events matching the given filter
func (n *N) QueryEvents(c context.Context, f *filter.F) (evs event.S, err error) {
    log.T.F("Neo4j QueryEvents called with filter: kinds=%v, authors=%d, tags=%v",
        f.Kinds != nil, f.Authors != nil && len(f.Authors.T) > 0, f.Tags != nil)
    return n.QueryEventsWithOptions(c, f, false, false)
}
@@ -101,6 +104,7 @@ func (n *N) buildCypherQuery(f *filter.F, includeDeleteEvents bool) (string, map
|
||||
// Normalize to lowercase hex using our utility function
|
||||
// This handles both binary-encoded pubkeys and hex string pubkeys (including uppercase)
|
||||
hexAuthor := NormalizePubkeyHex(author)
|
||||
log.T.F("Neo4j author filter: raw_len=%d, normalized=%q", len(author), hexAuthor)
|
||||
if hexAuthor == "" {
|
||||
continue
|
||||
}
|
||||
@@ -130,30 +134,39 @@ func (n *N) buildCypherQuery(f *filter.F, includeDeleteEvents bool) (string, map
|
||||
}
|
||||
|
||||
// Time range filters - for temporal queries
|
||||
if f.Since != nil {
|
||||
// Note: Check both pointer and value - a zero timestamp (Unix epoch 1970) is almost
|
||||
// certainly not a valid constraint as Nostr events didn't exist then
|
||||
if f.Since != nil && f.Since.V > 0 {
|
||||
params["since"] = f.Since.V
|
||||
whereClauses = append(whereClauses, "e.created_at >= $since")
|
||||
}
|
||||
if f.Until != nil {
|
||||
if f.Until != nil && f.Until.V > 0 {
|
||||
params["until"] = f.Until.V
|
||||
whereClauses = append(whereClauses, "e.created_at <= $until")
|
||||
}
|
||||
|
||||
// Tag filters - this is where Neo4j's graph capabilities shine
|
||||
// We can efficiently traverse tag relationships
|
||||
// We use EXISTS subqueries to efficiently filter events by tags
|
||||
// This ensures events are only returned if they have matching tags
|
||||
tagIndex := 0
|
||||
if f.Tags != nil {
|
||||
for _, tagValues := range *f.Tags {
|
||||
if len(tagValues.T) > 0 {
|
||||
tagVarName := fmt.Sprintf("t%d", tagIndex)
|
||||
tagTypeParam := fmt.Sprintf("tagType_%d", tagIndex)
|
||||
tagValuesParam := fmt.Sprintf("tagValues_%d", tagIndex)
|
||||
|
||||
// Add tag relationship to MATCH clause
|
||||
matchClause += fmt.Sprintf(" OPTIONAL MATCH (e)-[:TAGGED_WITH]->(%s:Tag)", tagVarName)
|
||||
// The first element is the tag type (e.g., "e", "p", "#e", "#p", etc.)
|
||||
// Filter tags may have "#" prefix (e.g., "#d" for d-tag filters)
|
||||
// Event tags are stored without prefix, so we must strip it
|
||||
tagTypeBytes := tagValues.T[0]
|
||||
var tagType string
|
||||
if len(tagTypeBytes) > 0 && tagTypeBytes[0] == '#' {
|
||||
tagType = string(tagTypeBytes[1:]) // Strip "#" prefix
|
||||
} else {
|
||||
tagType = string(tagTypeBytes)
|
||||
}
|
||||
|
||||
// The first element is the tag type (e.g., "e", "p", etc.)
|
||||
tagType := string(tagValues.T[0])
|
||||
log.T.F("Neo4j tag filter: type=%q (raw=%q, len=%d)", tagType, string(tagTypeBytes), len(tagTypeBytes))
|
||||
|
||||
// Convert remaining tag values to strings (skip first element which is the type)
|
||||
// For e/p tags, use NormalizePubkeyHex to handle binary encoding and uppercase hex
|
||||
@@ -162,26 +175,34 @@ func (n *N) buildCypherQuery(f *filter.F, includeDeleteEvents bool) (string, map
|
||||
if tagType == "e" || tagType == "p" {
|
||||
// Normalize e/p tag values to lowercase hex (handles binary encoding)
|
||||
normalized := NormalizePubkeyHex(tv)
|
||||
log.T.F("Neo4j tag filter: %s-tag value normalized: %q (raw len=%d, binary=%v)",
|
||||
tagType, normalized, len(tv), IsBinaryEncoded(tv))
|
||||
if normalized != "" {
|
||||
tagValueStrings = append(tagValueStrings, normalized)
|
||||
}
|
||||
} else {
|
||||
// For other tags, use direct string conversion
|
||||
tagValueStrings = append(tagValueStrings, string(tv))
|
||||
val := string(tv)
|
||||
log.T.F("Neo4j tag filter: %s-tag value: %q (len=%d)", tagType, val, len(val))
|
||||
tagValueStrings = append(tagValueStrings, val)
|
||||
}
|
||||
}
|
||||
|
||||
// Skip if no valid values after normalization
|
||||
if len(tagValueStrings) == 0 {
|
||||
log.W.F("Neo4j tag filter: no valid values for tag type %q, skipping", tagType)
|
||||
continue
|
||||
}
|
||||
|
||||
// Add WHERE conditions for this tag
|
||||
log.T.F("Neo4j tag filter: type=%s, values=%v", tagType, tagValueStrings)
|
||||
|
||||
// Use EXISTS subquery to filter events that have matching tags
|
||||
// This is more correct than OPTIONAL MATCH because it requires the tag to exist
|
||||
params[tagTypeParam] = tagType
|
||||
params[tagValuesParam] = tagValueStrings
|
||||
whereClauses = append(whereClauses,
|
||||
fmt.Sprintf("(%s.type = $%s AND %s.value IN $%s)",
|
||||
tagVarName, tagTypeParam, tagVarName, tagValuesParam))
|
||||
fmt.Sprintf("EXISTS { MATCH (e)-[:TAGGED_WITH]->(t:Tag) WHERE t.type = $%s AND t.value IN $%s }",
|
||||
tagTypeParam, tagValuesParam))
|
||||
|
||||
tagIndex++
|
||||
}
@@ -248,6 +269,26 @@ RETURN e.id AS id,
// Combine all parts
cypher := matchClause + whereClause + returnClause + orderClause + limitClause

// Log the generated query for debugging
log.T.F("Neo4j query: %s", cypher)
// Log params at trace level for debugging
var paramSummary strings.Builder
for k, v := range params {
    switch val := v.(type) {
    case []string:
        if len(val) <= 3 {
            paramSummary.WriteString(fmt.Sprintf("%s: %v ", k, val))
        } else {
            paramSummary.WriteString(fmt.Sprintf("%s: [%d values] ", k, len(val)))
        }
    case []int64:
        paramSummary.WriteString(fmt.Sprintf("%s: %v ", k, val))
    default:
        paramSummary.WriteString(fmt.Sprintf("%s: %v ", k, v))
    }
}
log.T.F("Neo4j params: %s", paramSummary.String())

return cypher, params
}
@@ -300,19 +341,17 @@ func (n *N) parseEventsFromResult(result *CollectedResult) ([]*event.E, error) {
    _ = tags.UnmarshalJSON([]byte(tagsStr))
}

// Create event
// Create event with decoded binary fields
e := &event.E{
    ID:        id,
    Pubkey:    pubkey,
    Kind:      uint16(kind),
    CreatedAt: createdAt,
    Content:   []byte(content),
    Tags:      tags,
    Sig:       sig,
}

// Copy fixed-size arrays
copy(e.ID[:], id)
copy(e.Sig[:], sig)
copy(e.Pubkey[:], pubkey)

events = append(events, e)
}
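The move from slice assignment to `copy` into fixed-size arrays matters because the driver returns variable-length `[]byte` values, while the event's ID/Pubkey/Sig fields are fixed-size arrays. A minimal sketch of the idiom (the 32-byte field size is assumed here):

```go
package main

import "fmt"

func main() {
	// raw stands in for a []byte column value returned by the Neo4j driver.
	raw := []byte{0xde, 0xad, 0xbe, 0xef}
	var id [32]byte
	// copy() writes min(len(raw), 32) bytes into the array, so it tolerates
	// short input and never aliases driver-owned memory.
	copy(id[:], raw)
	fmt.Printf("%x\n", id)
}
```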
@@ -462,3 +462,584 @@ func TestCountEvents(t *testing.T) {

    t.Logf("✓ Count events returned correct count: %d", count)
}

// TestQueryEventsByTagWithHashPrefix tests that tag filters with a "#" prefix work correctly.
// This is a regression test for a bug where filter tags like "#d" were not being matched
// because the "#" prefix wasn't being stripped before comparison with stored tags.
func TestQueryEventsByTagWithHashPrefix(t *testing.T) {
    if testDB == nil {
        t.Skip("Neo4j not available")
    }

    cleanTestDatabase()

    ctx := context.Background()
    signer := createTestSignerLocal(t)
    baseTs := timestamp.Now().V

    // Create events with d-tags (parameterized replaceable kind)
    createAndSaveEventLocal(t, ctx, signer, 30382, "Event with d=id1",
        tag.NewS(tag.NewFromAny("d", "id1")), baseTs)
    createAndSaveEventLocal(t, ctx, signer, 30382, "Event with d=id2",
        tag.NewS(tag.NewFromAny("d", "id2")), baseTs+1)
    createAndSaveEventLocal(t, ctx, signer, 30382, "Event with d=id3",
        tag.NewS(tag.NewFromAny("d", "id3")), baseTs+2)
    createAndSaveEventLocal(t, ctx, signer, 30382, "Event with d=other",
        tag.NewS(tag.NewFromAny("d", "other")), baseTs+3)

    // Query with the "#d" prefix (as clients send it) - should match the event with d=id1
    evs, err := testDB.QueryEvents(ctx, &filter.F{
        Kinds: kind.NewS(kind.New(30382)),
        Tags:  tag.NewS(tag.NewFromAny("#d", "id1")),
    })
    if err != nil {
        t.Fatalf("Failed to query events with #d tag: %v", err)
    }

    if len(evs) != 1 {
        t.Fatalf("Expected 1 event with d=id1, got %d", len(evs))
    }

    // Verify the returned event has the correct d-tag
    dTag := evs[0].Tags.GetFirst([]byte("d"))
    if dTag == nil {
        t.Fatal("Expected event to have a d-tag")
    }
    if string(dTag.Value()) != "id1" {
        t.Fatalf("Expected d=id1, got d=%s", dTag.Value())
    }

    t.Logf("✓ Query with #d prefix returned correct event")
}
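For reference, the wire-level filter this test exercises is the standard NIP-01 shape, in which tag filters carry the `#` prefix (subscription id illustrative):

```
["REQ", "sub1", {"kinds": [30382], "#d": ["id1"]}]
```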

// TestQueryEventsByTagMultipleValues tests that tag filters with multiple values
// use OR logic (match events with ANY of the values).
func TestQueryEventsByTagMultipleValues(t *testing.T) {
    if testDB == nil {
        t.Skip("Neo4j not available")
    }

    cleanTestDatabase()

    ctx := context.Background()
    signer := createTestSignerLocal(t)
    baseTs := timestamp.Now().V

    // Create events with different d-tags
    createAndSaveEventLocal(t, ctx, signer, 30382, "Event A",
        tag.NewS(tag.NewFromAny("d", "target-1")), baseTs)
    createAndSaveEventLocal(t, ctx, signer, 30382, "Event B",
        tag.NewS(tag.NewFromAny("d", "target-2")), baseTs+1)
    createAndSaveEventLocal(t, ctx, signer, 30382, "Event C",
        tag.NewS(tag.NewFromAny("d", "target-3")), baseTs+2)
    createAndSaveEventLocal(t, ctx, signer, 30382, "Event D (not target)",
        tag.NewS(tag.NewFromAny("d", "other-value")), baseTs+3)
    createAndSaveEventLocal(t, ctx, signer, 30382, "Event E (no match)",
        tag.NewS(tag.NewFromAny("d", "different")), baseTs+4)

    // Query with multiple d-tag values using the "#d" prefix
    // Should match events with d=target-1 OR d=target-2 OR d=target-3
    evs, err := testDB.QueryEvents(ctx, &filter.F{
        Kinds: kind.NewS(kind.New(30382)),
        Tags:  tag.NewS(tag.NewFromAny("#d", "target-1", "target-2", "target-3")),
    })
    if err != nil {
        t.Fatalf("Failed to query events with multiple #d values: %v", err)
    }

    if len(evs) != 3 {
        t.Fatalf("Expected 3 events matching the d-tag values, got %d", len(evs))
    }

    // Verify returned events have correct d-tags
    validDTags := map[string]bool{"target-1": false, "target-2": false, "target-3": false}
    for _, ev := range evs {
        dTag := ev.Tags.GetFirst([]byte("d"))
        if dTag == nil {
            t.Fatalf("Event missing d-tag")
        }
        dValue := string(dTag.Value())
        if _, ok := validDTags[dValue]; !ok {
            t.Fatalf("Unexpected d-tag value: %s", dValue)
        }
        validDTags[dValue] = true
    }

    // Verify all expected d-tags were found
    for dValue, found := range validDTags {
        if !found {
            t.Fatalf("Expected to find event with d=%s", dValue)
        }
    }

    t.Logf("✓ Query with multiple #d values returned correct events")
}

// TestQueryEventsByTagNoMatch tests that tag filters correctly return no results
// when no events match the filter.
func TestQueryEventsByTagNoMatch(t *testing.T) {
    if testDB == nil {
        t.Skip("Neo4j not available")
    }

    cleanTestDatabase()

    ctx := context.Background()
    signer := createTestSignerLocal(t)
    baseTs := timestamp.Now().V

    // Create an event with a d-tag
    createAndSaveEventLocal(t, ctx, signer, 30382, "Event",
        tag.NewS(tag.NewFromAny("d", "existing-value")), baseTs)

    // Query for a d-tag value that doesn't exist
    evs, err := testDB.QueryEvents(ctx, &filter.F{
        Kinds: kind.NewS(kind.New(30382)),
        Tags:  tag.NewS(tag.NewFromAny("#d", "non-existent-value")),
    })
    if err != nil {
        t.Fatalf("Failed to query events: %v", err)
    }

    if len(evs) != 0 {
        t.Fatalf("Expected 0 events for non-matching d-tag, got %d", len(evs))
    }

    t.Logf("✓ Query with non-matching #d value returned no events")
}

// TestQueryEventsByTagWithKindAndAuthor tests the combination of kind, author, and tag filters.
// This is the specific case reported by the user with kind 30382.
func TestQueryEventsByTagWithKindAndAuthor(t *testing.T) {
    if testDB == nil {
        t.Skip("Neo4j not available")
    }

    cleanTestDatabase()

    ctx := context.Background()
    alice := createTestSignerLocal(t)
    bob := createTestSignerLocal(t)
    baseTs := timestamp.Now().V

    // Create events from different authors with d-tags
    createAndSaveEventLocal(t, ctx, alice, 30382, "Alice target 1",
        tag.NewS(tag.NewFromAny("d", "card-1")), baseTs)
    createAndSaveEventLocal(t, ctx, alice, 30382, "Alice target 2",
        tag.NewS(tag.NewFromAny("d", "card-2")), baseTs+1)
    createAndSaveEventLocal(t, ctx, alice, 30382, "Alice other",
        tag.NewS(tag.NewFromAny("d", "other-card")), baseTs+2)
    createAndSaveEventLocal(t, ctx, bob, 30382, "Bob target 1",
        tag.NewS(tag.NewFromAny("d", "card-1")), baseTs+3) // Same d-tag as Alice but different author

    // Query for Alice's events with specific d-tags
    evs, err := testDB.QueryEvents(ctx, &filter.F{
        Kinds:   kind.NewS(kind.New(30382)),
        Authors: tag.NewFromBytesSlice(alice.Pub()),
        Tags:    tag.NewS(tag.NewFromAny("#d", "card-1", "card-2")),
    })
    if err != nil {
        t.Fatalf("Failed to query events: %v", err)
    }

    // Should only return Alice's 2 events, not Bob's, even though he has card-1
    if len(evs) != 2 {
        t.Fatalf("Expected 2 events from Alice with matching d-tags, got %d", len(evs))
    }

    alicePubkey := hex.Enc(alice.Pub())
    for _, ev := range evs {
        if hex.Enc(ev.Pubkey[:]) != alicePubkey {
            t.Fatalf("Expected author %s, got %s", alicePubkey, hex.Enc(ev.Pubkey[:]))
        }
        dTag := ev.Tags.GetFirst([]byte("d"))
        if dTag == nil {
            t.Fatal("Event missing d-tag")
        }
        dValue := string(dTag.Value())
        if dValue != "card-1" && dValue != "card-2" {
            t.Fatalf("Expected d=card-1 or card-2, got d=%s", dValue)
        }
    }

    t.Logf("✓ Query with kind, author, and #d filter returned correct events")
}

// TestBinaryTagFilterRegression tests that queries with #e and #p tags work correctly
// even when tags are stored with binary-encoded values but filters come in as hex strings.
// This mirrors the Badger database test for binary tag handling.
func TestBinaryTagFilterRegression(t *testing.T) {
    if testDB == nil {
        t.Skip("Neo4j not available")
    }

    cleanTestDatabase()

    ctx := context.Background()
    author := createTestSignerLocal(t)
    referenced := createTestSignerLocal(t)
    baseTs := timestamp.Now().V

    // Create a referenced event to get a valid event ID for the e-tag
    refEvent := createAndSaveEventLocal(t, ctx, referenced, 1, "Referenced event", nil, baseTs)

    // Get hex representations
    refEventIdHex := hex.Enc(refEvent.ID)
    refPubkeyHex := hex.Enc(referenced.Pub())

    // Create a test event with e, p, d, and other tags
    testEvent := createAndSaveEventLocal(t, ctx, author, 30520, "Event with binary tags",
        tag.NewS(
            tag.NewFromAny("d", "test-d-value"),
            tag.NewFromAny("p", string(refPubkeyHex)),
            tag.NewFromAny("e", string(refEventIdHex)),
            tag.NewFromAny("t", "test-topic"),
        ), baseTs+1)

    testEventIdHex := hex.Enc(testEvent.ID)

    // Test case 1: Query WITHOUT #e/#p tags (baseline - should work)
    t.Run("QueryWithoutEPTags", func(t *testing.T) {
        evs, err := testDB.QueryEvents(ctx, &filter.F{
            Kinds:   kind.NewS(kind.New(30520)),
            Authors: tag.NewFromBytesSlice(author.Pub()),
            Tags:    tag.NewS(tag.NewFromAny("#d", "test-d-value")),
        })
        if err != nil {
            t.Fatalf("Query without e/p tags failed: %v", err)
        }

        if len(evs) == 0 {
            t.Fatal("Expected to find event with d tag filter, got 0 results")
        }

        found := false
        for _, ev := range evs {
            if hex.Enc(ev.ID) == testEventIdHex {
                found = true
                break
            }
        }
        if !found {
            t.Errorf("Expected event ID %s not found", testEventIdHex)
        }
    })

    // Test case 2: Query WITH #p tag
    t.Run("QueryWithPTag", func(t *testing.T) {
        evs, err := testDB.QueryEvents(ctx, &filter.F{
            Kinds:   kind.NewS(kind.New(30520)),
            Authors: tag.NewFromBytesSlice(author.Pub()),
            Tags: tag.NewS(
                tag.NewFromAny("#d", "test-d-value"),
                tag.NewFromAny("#p", string(refPubkeyHex)),
            ),
        })
        if err != nil {
            t.Fatalf("Query with #p tag failed: %v", err)
        }

        if len(evs) == 0 {
            t.Fatalf("REGRESSION: Expected to find event with #p tag filter, got 0 results")
        }
    })

    // Test case 3: Query WITH #e tag
    t.Run("QueryWithETag", func(t *testing.T) {
        evs, err := testDB.QueryEvents(ctx, &filter.F{
            Kinds:   kind.NewS(kind.New(30520)),
            Authors: tag.NewFromBytesSlice(author.Pub()),
            Tags: tag.NewS(
                tag.NewFromAny("#d", "test-d-value"),
                tag.NewFromAny("#e", string(refEventIdHex)),
            ),
        })
        if err != nil {
            t.Fatalf("Query with #e tag failed: %v", err)
        }

        if len(evs) == 0 {
            t.Fatalf("REGRESSION: Expected to find event with #e tag filter, got 0 results")
        }
    })

    // Test case 4: Query WITH BOTH #e AND #p tags
    t.Run("QueryWithBothEAndPTags", func(t *testing.T) {
        evs, err := testDB.QueryEvents(ctx, &filter.F{
            Kinds:   kind.NewS(kind.New(30520)),
            Authors: tag.NewFromBytesSlice(author.Pub()),
            Tags: tag.NewS(
                tag.NewFromAny("#d", "test-d-value"),
                tag.NewFromAny("#e", string(refEventIdHex)),
                tag.NewFromAny("#p", string(refPubkeyHex)),
            ),
        })
        if err != nil {
            t.Fatalf("Query with both #e and #p tags failed: %v", err)
        }

        if len(evs) == 0 {
            t.Fatalf("REGRESSION: Expected to find event with #e and #p tag filters, got 0 results")
        }
    })

    t.Logf("✓ Binary tag filter regression tests passed")
}

// TestParameterizedReplaceableEvents tests that parameterized replaceable events (kind 30000+)
// are handled correctly - only the newest version should be returned in queries by kind/author/d-tag.
func TestParameterizedReplaceableEvents(t *testing.T) {
    if testDB == nil {
        t.Skip("Neo4j not available")
    }

    cleanTestDatabase()

    ctx := context.Background()
    signer := createTestSignerLocal(t)
    baseTs := timestamp.Now().V

    // Create an older parameterized replaceable event
    createAndSaveEventLocal(t, ctx, signer, 30000, "Original event",
        tag.NewS(tag.NewFromAny("d", "test-param")), baseTs-7200) // 2 hours ago

    // Create a newer event with the same kind/author/d-tag
    createAndSaveEventLocal(t, ctx, signer, 30000, "Newer event",
        tag.NewS(tag.NewFromAny("d", "test-param")), baseTs-3600) // 1 hour ago

    // Create the newest event with the same kind/author/d-tag
    newestEvent := createAndSaveEventLocal(t, ctx, signer, 30000, "Newest event",
        tag.NewS(tag.NewFromAny("d", "test-param")), baseTs) // Now

    // Query for events - should only return the newest one
    evs, err := testDB.QueryEvents(ctx, &filter.F{
        Kinds:   kind.NewS(kind.New(30000)),
        Authors: tag.NewFromBytesSlice(signer.Pub()),
        Tags:    tag.NewS(tag.NewFromAny("#d", "test-param")),
    })
    if err != nil {
        t.Fatalf("Failed to query parameterized replaceable events: %v", err)
    }

    // Note: the Neo4j backend may or may not automatically deduplicate replaceable events,
    // depending on implementation. The important thing is that the newest is returned first.
    if len(evs) == 0 {
        t.Fatal("Expected at least 1 event")
    }

    // Verify the first (most recent) event is the newest one
    if hex.Enc(evs[0].ID) != hex.Enc(newestEvent.ID) {
        t.Logf("Note: Expected newest event first, got different order")
    }

    t.Logf("✓ Parameterized replaceable events test returned %d events", len(evs))
}

// TestQueryForIds tests the QueryForIds method
func TestQueryForIds(t *testing.T) {
    if testDB == nil {
        t.Skip("Neo4j not available")
    }

    cleanTestDatabase()

    ctx := context.Background()
    signer := createTestSignerLocal(t)
    baseTs := timestamp.Now().V

    // Create test events
    ev1 := createAndSaveEventLocal(t, ctx, signer, 1, "Event 1", nil, baseTs)
    ev2 := createAndSaveEventLocal(t, ctx, signer, 1, "Event 2", nil, baseTs+1)
    createAndSaveEventLocal(t, ctx, signer, 7, "Reaction", nil, baseTs+2)

    // Query for IDs of kind 1 events
    idPkTs, err := testDB.QueryForIds(ctx, &filter.F{
        Kinds: kind.NewS(kind.New(1)),
    })
    if err != nil {
        t.Fatalf("Failed to query for IDs: %v", err)
    }

    if len(idPkTs) != 2 {
        t.Fatalf("Expected 2 IDs for kind 1 events, got %d", len(idPkTs))
    }

    // Verify the IDs match our events
    foundIds := make(map[string]bool)
    for _, r := range idPkTs {
        foundIds[hex.Enc(r.Id)] = true
    }

    if !foundIds[hex.Enc(ev1.ID)] {
        t.Error("Event 1 ID not found in results")
    }
    if !foundIds[hex.Enc(ev2.ID)] {
        t.Error("Event 2 ID not found in results")
    }

    t.Logf("✓ QueryForIds returned correct IDs")
}

// TestQueryForSerials tests the QueryForSerials method
func TestQueryForSerials(t *testing.T) {
    if testDB == nil {
        t.Skip("Neo4j not available")
    }

    cleanTestDatabase()

    ctx := context.Background()
    signer := createTestSignerLocal(t)
    baseTs := timestamp.Now().V

    // Create test events
    createAndSaveEventLocal(t, ctx, signer, 1, "Event 1", nil, baseTs)
    createAndSaveEventLocal(t, ctx, signer, 1, "Event 2", nil, baseTs+1)
    createAndSaveEventLocal(t, ctx, signer, 1, "Event 3", nil, baseTs+2)

    // Query for serials
    serials, err := testDB.QueryForSerials(ctx, &filter.F{
        Kinds: kind.NewS(kind.New(1)),
    })
    if err != nil {
        t.Fatalf("Failed to query for serials: %v", err)
    }

    if len(serials) != 3 {
        t.Fatalf("Expected 3 serials, got %d", len(serials))
    }

    t.Logf("✓ QueryForSerials returned %d serials", len(serials))
}

// TestQueryEventsComplex tests complex filter combinations
func TestQueryEventsComplex(t *testing.T) {
    if testDB == nil {
        t.Skip("Neo4j not available")
    }

    cleanTestDatabase()

    ctx := context.Background()
    alice := createTestSignerLocal(t)
    bob := createTestSignerLocal(t)
    baseTs := timestamp.Now().V

    // Create a diverse set of events
    createAndSaveEventLocal(t, ctx, alice, 1, "Alice note with bitcoin tag",
        tag.NewS(tag.NewFromAny("t", "bitcoin")), baseTs)
    createAndSaveEventLocal(t, ctx, alice, 1, "Alice note with nostr tag",
        tag.NewS(tag.NewFromAny("t", "nostr")), baseTs+1)
    createAndSaveEventLocal(t, ctx, alice, 7, "Alice reaction",
        nil, baseTs+2)
    createAndSaveEventLocal(t, ctx, bob, 1, "Bob note with bitcoin tag",
        tag.NewS(tag.NewFromAny("t", "bitcoin")), baseTs+3)

    // Test: kinds + tags (no authors)
    t.Run("KindsAndTags", func(t *testing.T) {
        evs, err := testDB.QueryEvents(ctx, &filter.F{
            Kinds: kind.NewS(kind.New(1)),
            Tags:  tag.NewS(tag.NewFromAny("#t", "bitcoin")),
        })
        if err != nil {
            t.Fatalf("Query failed: %v", err)
        }
        if len(evs) != 2 {
            t.Fatalf("Expected 2 events with kind=1 and #t=bitcoin, got %d", len(evs))
        }
    })

    // Test: authors + tags (no kinds)
    t.Run("AuthorsAndTags", func(t *testing.T) {
        evs, err := testDB.QueryEvents(ctx, &filter.F{
            Authors: tag.NewFromBytesSlice(alice.Pub()),
            Tags:    tag.NewS(tag.NewFromAny("#t", "bitcoin")),
        })
        if err != nil {
            t.Fatalf("Query failed: %v", err)
        }
        if len(evs) != 1 {
            t.Fatalf("Expected 1 event from Alice with #t=bitcoin, got %d", len(evs))
        }
    })

    // Test: kinds + authors (no tags)
    t.Run("KindsAndAuthors", func(t *testing.T) {
        evs, err := testDB.QueryEvents(ctx, &filter.F{
            Kinds:   kind.NewS(kind.New(1)),
            Authors: tag.NewFromBytesSlice(alice.Pub()),
        })
        if err != nil {
            t.Fatalf("Query failed: %v", err)
        }
        if len(evs) != 2 {
            t.Fatalf("Expected 2 kind=1 events from Alice, got %d", len(evs))
        }
    })

    // Test: all three filters
    t.Run("AllFilters", func(t *testing.T) {
        evs, err := testDB.QueryEvents(ctx, &filter.F{
            Kinds:   kind.NewS(kind.New(1)),
            Authors: tag.NewFromBytesSlice(alice.Pub()),
            Tags:    tag.NewS(tag.NewFromAny("#t", "nostr")),
        })
        if err != nil {
            t.Fatalf("Query failed: %v", err)
        }
        if len(evs) != 1 {
            t.Fatalf("Expected 1 event (Alice kind=1 #t=nostr), got %d", len(evs))
        }
    })

    t.Logf("✓ Complex filter combination tests passed")
}

// TestQueryEventsMultipleTagTypes tests filtering with multiple different tag types
func TestQueryEventsMultipleTagTypes(t *testing.T) {
    if testDB == nil {
        t.Skip("Neo4j not available")
    }

    cleanTestDatabase()

    ctx := context.Background()
    signer := createTestSignerLocal(t)
    baseTs := timestamp.Now().V

    // Create events with multiple tag types
    createAndSaveEventLocal(t, ctx, signer, 30382, "Event with d and client tags",
        tag.NewS(
            tag.NewFromAny("d", "user-1"),
            tag.NewFromAny("client", "app-a"),
        ), baseTs)

    createAndSaveEventLocal(t, ctx, signer, 30382, "Event with d and different client",
        tag.NewS(
            tag.NewFromAny("d", "user-2"),
            tag.NewFromAny("client", "app-b"),
        ), baseTs+1)

    createAndSaveEventLocal(t, ctx, signer, 30382, "Event with only d tag",
        tag.NewS(
            tag.NewFromAny("d", "user-3"),
        ), baseTs+2)

    // Query with multiple tag types (should AND them together)
    evs, err := testDB.QueryEvents(ctx, &filter.F{
        Kinds: kind.NewS(kind.New(30382)),
        Tags: tag.NewS(
            tag.NewFromAny("#d", "user-1", "user-2"),
            tag.NewFromAny("#client", "app-a"),
        ),
    })
    if err != nil {
        t.Fatalf("Query with multiple tag types failed: %v", err)
    }

    // Should match only the first event (user-1 with app-a)
    if len(evs) != 1 {
        t.Fatalf("Expected 1 event matching both #d and #client, got %d", len(evs))
    }

    dTag := evs[0].Tags.GetFirst([]byte("d"))
    if dTag == nil {
        t.Fatal("Event missing d-tag")
    }
    if string(dTag.Value()) != "user-1" {
        t.Fatalf("Expected d=user-1, got d=%s", dTag.Value())
    }

    t.Logf("✓ Multiple tag types filter test passed")
}
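Taken together, these tests pin down the NIP-01 filter semantics the Cypher builder must preserve: values within one tag filter OR together, separate tag filters AND together. The query above corresponds to a client filter like:

```
{"kinds": [30382], "#d": ["user-1", "user-2"], "#client": ["app-a"]}
```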

@@ -54,3 +54,11 @@ func MonitorFromNeo4jDriver(
) loadmonitor.Monitor {
    return NewNeo4jMonitor(driver, querySem, maxConcurrency, 100*time.Millisecond)
}

// NewMemoryOnlyLimiter creates a rate limiter that only monitors process memory.
// Useful for database backends that don't have their own load metrics (e.g., BBolt).
// Since BBolt uses memory-mapped IO, memory pressure is still relevant.
func NewMemoryOnlyLimiter(config Config) *Limiter {
    monitor := NewMemoryMonitor(100 * time.Millisecond)
    return NewLimiter(config, monitor)
}
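A minimal wiring sketch for a backend without native load metrics; the Config fields are the ones `ComputeDelay` references in the hunk below, while `OpWrite` is an assumed constant name:

```go
// Sketch: memory-pressure-only rate limiting, e.g. for the BBolt backend.
limiter := ratelimit.NewMemoryOnlyLimiter(ratelimit.Config{
	EmergencyThreshold:  1.167, // mirrors the MemoryMonitor defaults
	RecoveryThreshold:   0.833,
	EmergencyMaxDelayMs: 5000,
})
delay := limiter.ComputeDelay(ratelimit.OpWrite) // OpWrite: assumed OperationType value
time.Sleep(delay)
```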

@@ -377,29 +377,26 @@ func (l *Limiter) ComputeDelay(opType OperationType) time.Duration {

// In emergency mode, apply progressive throttling for writes
if inEmergency {
    // Calculate how far above the recovery threshold we are
    // At the emergency threshold, add 1x normal delay
    // For every additional 10% above emergency, double the delay
    excessPressure := metrics.MemoryPressure - l.config.RecoveryThreshold
    if excessPressure > 0 {
        // Progressive multiplier: starts at 2x, doubles every 10% of excess
        multiplier := 2.0
        for excess := excessPressure; excess > 0.1; excess -= 0.1 {
            multiplier *= 2
        }

        emergencyDelaySec := delaySec * multiplier
        maxEmergencySec := float64(l.config.EmergencyMaxDelayMs) / 1000.0

        if emergencyDelaySec > maxEmergencySec {
            emergencyDelaySec = maxEmergencySec
        }
        // Minimum emergency delay of 100ms to allow other operations
        if emergencyDelaySec < 0.1 {
            emergencyDelaySec = 0.1
        }
        delaySec = emergencyDelaySec
    // Calculate how far above the emergency threshold we are
    // Linear scaling: multiplier = 1 + (excess * 5)
    // At the emergency threshold: 1x; at +20% above: 2x; at +40% above: 3x
    excessPressure := metrics.MemoryPressure - l.config.EmergencyThreshold
    if excessPressure < 0 {
        excessPressure = 0
    }
    multiplier := 1.0 + excessPressure*5.0

    emergencyDelaySec := delaySec * multiplier
    maxEmergencySec := float64(l.config.EmergencyMaxDelayMs) / 1000.0

    if emergencyDelaySec > maxEmergencySec {
        emergencyDelaySec = maxEmergencySec
    }
    // Minimum emergency delay of 100ms to allow other operations
    if emergencyDelaySec < 0.1 {
        emergencyDelaySec = 0.1
    }
    delaySec = emergencyDelaySec
}
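A worked comparison using the monitor's default thresholds (emergency 1.167, recovery 0.833): at MemoryPressure = 1.3, the new linear rule gives excess = 1.3 - 1.167 ≈ 0.13 and a multiplier of 1 + 0.13 × 5 ≈ 1.67x, so a 200ms base delay becomes roughly 330ms before clamping. The old rule measured excess from the recovery threshold (1.3 - 0.833 ≈ 0.47) and doubled per 0.1 of excess, producing 2 × 2^4 = 32x, which is the runaway growth this change removes.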

if delaySec > 0 {
pkg/ratelimit/memory_monitor.go (new file, 214 lines)
@@ -0,0 +1,214 @@
//go:build !(js && wasm)

package ratelimit

import (
    "sync"
    "sync/atomic"
    "time"

    "next.orly.dev/pkg/interfaces/loadmonitor"
)

// MemoryMonitor is a simple load monitor that only tracks process memory.
// Used for database backends that don't have their own load metrics (e.g., BBolt).
type MemoryMonitor struct {
    // Configuration
    pollInterval time.Duration
    targetBytes  atomic.Uint64

    // State
    running  atomic.Bool
    stopChan chan struct{}
    doneChan chan struct{}

    // Metrics (protected by mutex)
    mu             sync.RWMutex
    currentMetrics loadmonitor.Metrics

    // Latency tracking
    queryLatencies []time.Duration
    writeLatencies []time.Duration
    latencyMu      sync.Mutex

    // Emergency mode
    emergencyThreshold float64 // e.g., 1.167 (target + 1/6)
    recoveryThreshold  float64 // e.g., 0.833 (target - 1/6)
    inEmergency        atomic.Bool
}

// NewMemoryMonitor creates a memory-only load monitor.
// pollInterval controls how often memory is sampled (recommended: 100ms).
func NewMemoryMonitor(pollInterval time.Duration) *MemoryMonitor {
    m := &MemoryMonitor{
        pollInterval:       pollInterval,
        stopChan:           make(chan struct{}),
        doneChan:           make(chan struct{}),
        queryLatencies:     make([]time.Duration, 0, 100),
        writeLatencies:     make([]time.Duration, 0, 100),
        emergencyThreshold: 1.167, // Default: target + 1/6
        recoveryThreshold:  0.833, // Default: target - 1/6
    }
    return m
}

// GetMetrics returns the current load metrics.
func (m *MemoryMonitor) GetMetrics() loadmonitor.Metrics {
    m.mu.RLock()
    defer m.mu.RUnlock()
    return m.currentMetrics
}

// RecordQueryLatency records a query latency sample.
func (m *MemoryMonitor) RecordQueryLatency(latency time.Duration) {
    m.latencyMu.Lock()
    defer m.latencyMu.Unlock()

    m.queryLatencies = append(m.queryLatencies, latency)
    if len(m.queryLatencies) > 100 {
        m.queryLatencies = m.queryLatencies[1:]
    }
}

// RecordWriteLatency records a write latency sample.
func (m *MemoryMonitor) RecordWriteLatency(latency time.Duration) {
    m.latencyMu.Lock()
    defer m.latencyMu.Unlock()

    m.writeLatencies = append(m.writeLatencies, latency)
    if len(m.writeLatencies) > 100 {
        m.writeLatencies = m.writeLatencies[1:]
    }
}

// SetMemoryTarget sets the target memory limit in bytes.
func (m *MemoryMonitor) SetMemoryTarget(bytes uint64) {
    m.targetBytes.Store(bytes)
}

// SetEmergencyThreshold sets the memory threshold for emergency mode.
func (m *MemoryMonitor) SetEmergencyThreshold(threshold float64) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.emergencyThreshold = threshold
}

// GetEmergencyThreshold returns the current emergency threshold.
func (m *MemoryMonitor) GetEmergencyThreshold() float64 {
    m.mu.RLock()
    defer m.mu.RUnlock()
    return m.emergencyThreshold
}

// ForceEmergencyMode manually triggers emergency mode for a duration.
func (m *MemoryMonitor) ForceEmergencyMode(duration time.Duration) {
    m.inEmergency.Store(true)
    go func() {
        time.Sleep(duration)
        m.inEmergency.Store(false)
    }()
}

// Start begins background metric collection.
func (m *MemoryMonitor) Start() <-chan struct{} {
    if m.running.Swap(true) {
        // Already running
        return m.doneChan
    }

    go m.pollLoop()
    return m.doneChan
}

// Stop halts background metric collection.
func (m *MemoryMonitor) Stop() {
    if !m.running.Swap(false) {
        return
    }
    close(m.stopChan)
    <-m.doneChan
}

// pollLoop continuously samples memory and updates metrics.
func (m *MemoryMonitor) pollLoop() {
    defer close(m.doneChan)

    ticker := time.NewTicker(m.pollInterval)
    defer ticker.Stop()

    for {
        select {
        case <-m.stopChan:
            return
        case <-ticker.C:
            m.updateMetrics()
        }
    }
}

// updateMetrics samples current memory and updates the metrics.
func (m *MemoryMonitor) updateMetrics() {
    target := m.targetBytes.Load()
    if target == 0 {
        target = 1 // Avoid division by zero
    }

    // Get physical memory using the same method as the other monitors
    procMem := ReadProcessMemoryStats()
    physicalMemBytes := procMem.PhysicalMemoryBytes()
    physicalMemMB := physicalMemBytes / (1024 * 1024)

    // Calculate memory pressure
    memPressure := float64(physicalMemBytes) / float64(target)

    // Check emergency mode thresholds
    m.mu.RLock()
    emergencyThreshold := m.emergencyThreshold
    recoveryThreshold := m.recoveryThreshold
    m.mu.RUnlock()

    wasEmergency := m.inEmergency.Load()
    if memPressure > emergencyThreshold {
        m.inEmergency.Store(true)
    } else if memPressure < recoveryThreshold && wasEmergency {
        m.inEmergency.Store(false)
    }

    // Calculate average latencies
    m.latencyMu.Lock()
    var avgQuery, avgWrite time.Duration
    if len(m.queryLatencies) > 0 {
        var total time.Duration
        for _, l := range m.queryLatencies {
            total += l
        }
        avgQuery = total / time.Duration(len(m.queryLatencies))
    }
    if len(m.writeLatencies) > 0 {
        var total time.Duration
        for _, l := range m.writeLatencies {
            total += l
        }
        avgWrite = total / time.Duration(len(m.writeLatencies))
    }
    m.latencyMu.Unlock()

    // Update metrics
    m.mu.Lock()
    m.currentMetrics = loadmonitor.Metrics{
        MemoryPressure:    memPressure,
        WriteLoad:         0, // No database-specific load metric
        ReadLoad:          0, // No database-specific load metric
        QueryLatency:      avgQuery,
        WriteLatency:      avgWrite,
        Timestamp:         time.Now(),
        InEmergencyMode:   m.inEmergency.Load(),
        CompactionPending: false, // BBolt doesn't have compaction
        PhysicalMemoryMB:  physicalMemMB,
    }
    m.mu.Unlock()
}

// Ensure MemoryMonitor implements the required interfaces
var _ loadmonitor.Monitor = (*MemoryMonitor)(nil)
var _ loadmonitor.EmergencyModeMonitor = (*MemoryMonitor)(nil)
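A short standalone usage sketch of the monitor (the 2GB target is illustrative):

```go
// Sketch: sample process memory every 100ms against a 2GB target.
m := ratelimit.NewMemoryMonitor(100 * time.Millisecond)
m.SetMemoryTarget(2 << 30)
m.Start()
defer m.Stop()

time.Sleep(time.Second) // allow a few polls to land
metrics := m.GetMetrics()
fmt.Printf("pressure=%.2f emergency=%v physMB=%d\n",
	metrics.MemoryPressure, metrics.InEmergencyMode, metrics.PhysicalMemoryMB)
```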
@@ -1 +1 @@
v0.48.11
v0.49.2