From f22bf3f388428ab7a0d095f2c3e5a312a1be7bef Mon Sep 17 00:00:00 2001 From: mleku Date: Mon, 29 Dec 2025 02:18:05 +0200 Subject: [PATCH] Add Neo4j memory tuning config and query result limits (v0.43.0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add Neo4j driver config options for memory management: - ORLY_NEO4J_MAX_CONN_POOL (default: 25) - connection pool size - ORLY_NEO4J_FETCH_SIZE (default: 1000) - records per batch - ORLY_NEO4J_MAX_TX_RETRY_SEC (default: 30) - transaction retry timeout - ORLY_NEO4J_QUERY_RESULT_LIMIT (default: 10000) - max results per query - Apply driver settings when creating Neo4j connection (pool size, fetch size, retry time) - Enforce query result limit as safety cap on all Cypher queries - Fix QueryForSerials and QueryForIds to preserve LIMIT clauses - Add comprehensive memory tuning documentation with sizing guidelines - Add NIP-46 signer-based authentication for bunker connections - Update go.mod with new dependencies Files modified: - app/config/config.go: Add Neo4j driver tuning config vars - main.go: Pass new config values to database factory - pkg/database/factory.go: Add Neo4j tuning fields to DatabaseConfig - pkg/database/factory_wasm.go: Mirror factory.go changes for WASM - pkg/neo4j/neo4j.go: Apply driver config, add getter methods - pkg/neo4j/query-events.go: Enforce query result limit, fix LIMIT preservation - docs/NEO4J_BACKEND.md: Add Memory Tuning section, update Docker example - CLAUDE.md: Add Neo4j memory tuning quick reference - app/handle-req.go: NIP-46 signer authentication - app/publisher.go: HasActiveNIP46Signer check - pkg/protocol/publish/publisher.go: NIP46SignerChecker interface - go.mod: Add dependencies 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- CLAUDE.md | 8 ++ app/config/config.go | 10 ++- app/handle-req.go | 46 ++++++++++ app/publisher.go | 61 +++++++++++++ docs/NEO4J_BACKEND.md | 145 ++++++++++++++++++++++++++++-- go.mod | 6 +- main.go | 33 ++++--- pkg/database/factory.go | 6 ++ pkg/database/factory_wasm.go | 6 ++ pkg/neo4j/neo4j.go | 89 +++++++++++++++--- pkg/neo4j/query-events.go | 44 +++++++-- pkg/protocol/publish/publisher.go | 5 ++ pkg/version/version | 2 +- 13 files changed, 412 insertions(+), 49 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 5669698..c65ea81 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -46,6 +46,14 @@ NOSTR_SECRET_KEY=nsec1... ./nurl https://relay.example.com/api/logs/clear | `ORLY_TLS_DOMAINS` | | Let's Encrypt domains | | `ORLY_AUTH_TO_WRITE` | false | Require auth for writes | +**Neo4j Memory Tuning** (only when `ORLY_DB_TYPE=neo4j`): + +| Variable | Default | Description | +|----------|---------|-------------| +| `ORLY_NEO4J_MAX_CONN_POOL` | 25 | Max connections (lower = less memory) | +| `ORLY_NEO4J_FETCH_SIZE` | 1000 | Records per batch (-1=all) | +| `ORLY_NEO4J_QUERY_RESULT_LIMIT` | 10000 | Max results per query (0=unlimited) | + See `./orly help` for all options. **All env vars MUST be defined in `app/config/config.go`**. ## Architecture diff --git a/app/config/config.go b/app/config/config.go index fdf7050..60578c0 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -105,6 +105,12 @@ type C struct { Neo4jUser string `env:"ORLY_NEO4J_USER" default:"neo4j" usage:"Neo4j authentication username (only used when ORLY_DB_TYPE=neo4j)"` Neo4jPassword string `env:"ORLY_NEO4J_PASSWORD" default:"password" usage:"Neo4j authentication password (only used when ORLY_DB_TYPE=neo4j)"` + // Neo4j driver tuning (memory and connection management) + Neo4jMaxConnPoolSize int `env:"ORLY_NEO4J_MAX_CONN_POOL" default:"25" usage:"max Neo4j connection pool size (driver default: 100, lower reduces memory)"` + Neo4jFetchSize int `env:"ORLY_NEO4J_FETCH_SIZE" default:"1000" usage:"max records per fetch batch (prevents memory overflow, -1=fetch all)"` + Neo4jMaxTxRetrySeconds int `env:"ORLY_NEO4J_MAX_TX_RETRY_SEC" default:"30" usage:"max seconds for retryable transaction attempts"` + Neo4jQueryResultLimit int `env:"ORLY_NEO4J_QUERY_RESULT_LIMIT" default:"10000" usage:"max results returned per query (prevents unbounded memory usage, 0=unlimited)"` + // Advanced database tuning SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"100000" usage:"max pubkeys to cache for compact event storage (default: 100000, ~3.2MB memory)"` SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"500000" usage:"max event IDs to cache for compact event storage (default: 500000, ~16MB memory)"` @@ -472,6 +478,7 @@ func (cfg *C) GetDatabaseConfigValues() ( serialCachePubkeys, serialCacheEventIds int, zstdLevel int, neo4jURI, neo4jUser, neo4jPassword string, + neo4jMaxConnPoolSize, neo4jFetchSize, neo4jMaxTxRetrySeconds, neo4jQueryResultLimit int, ) { // Parse query cache max age from string to duration queryCacheMaxAge = 5 * time.Minute // Default @@ -487,7 +494,8 @@ func (cfg *C) GetDatabaseConfigValues() ( cfg.QueryCacheDisabled, cfg.SerialCachePubkeys, cfg.SerialCacheEventIds, cfg.DBZSTDLevel, - cfg.Neo4jURI, cfg.Neo4jUser, cfg.Neo4jPassword + cfg.Neo4jURI, cfg.Neo4jUser, cfg.Neo4jPassword, + cfg.Neo4jMaxConnPoolSize, cfg.Neo4jFetchSize, cfg.Neo4jMaxTxRetrySeconds, cfg.Neo4jQueryResultLimit } // GetRateLimitConfigValues returns the rate limiting configuration values. diff --git a/app/handle-req.go b/app/handle-req.go index 7ce146d..c8d8ffb 100644 --- a/app/handle-req.go +++ b/app/handle-req.go @@ -27,6 +27,7 @@ import ( "next.orly.dev/pkg/policy" "next.orly.dev/pkg/protocol/graph" "next.orly.dev/pkg/protocol/nip43" + "next.orly.dev/pkg/protocol/publish" "git.mleku.dev/mleku/nostr/utils/normalize" "git.mleku.dev/mleku/nostr/utils/pointers" ) @@ -52,6 +53,51 @@ func (l *Listener) HandleReq(msg []byte) (err error) { ) }, ) + + // NIP-46 signer-based authentication: + // If client is not authenticated and requests kind 24133 with exactly one #p tag, + // check if there's an active signer subscription for that pubkey. + // If so, authenticate the client as that pubkey. + const kindNIP46 = 24133 + if len(l.authedPubkey.Load()) == 0 && len(*env.Filters) == 1 { + f := (*env.Filters)[0] + if f != nil && f.Kinds != nil && f.Kinds.Len() == 1 { + isNIP46Kind := false + for _, k := range f.Kinds.K { + if k.K == kindNIP46 { + isNIP46Kind = true + break + } + } + if isNIP46Kind && f.Tags != nil { + pTag := f.Tags.GetFirst([]byte("p")) + // Must have exactly one pubkey in the #p tag + if pTag != nil && pTag.Len() == 2 { + signerPubkey := pTag.Value() + // Convert to binary if hex + var signerPubkeyBin []byte + if len(signerPubkey) == 64 { + signerPubkeyBin, _ = hexenc.Dec(string(signerPubkey)) + } else if len(signerPubkey) == 32 { + signerPubkeyBin = signerPubkey + } + if len(signerPubkeyBin) == 32 { + // Check if there's an active signer for this pubkey + if socketPub := l.publishers.GetSocketPublisher(); socketPub != nil { + if checker, ok := socketPub.(publish.NIP46SignerChecker); ok { + if checker.HasActiveNIP46Signer(signerPubkeyBin) { + log.I.F("NIP-46 auth: client %s authenticated via active signer %s", + l.remote, hexenc.Enc(signerPubkeyBin)) + l.authedPubkey.Store(signerPubkeyBin) + } + } + } + } + } + } + } + } + // send a challenge to the client to auth if an ACL is active, auth is required, or AuthToWrite is enabled if len(l.authedPubkey.Load()) == 0 && (acl.Registry.Active.Load() != "none" || l.Config.AuthRequired || l.Config.AuthToWrite) { if err = authenvelope.NewChallengeWith(l.challenge.Load()). diff --git a/app/publisher.go b/app/publisher.go index 6eb70b7..2e6cbf8 100644 --- a/app/publisher.go +++ b/app/publisher.go @@ -320,6 +320,67 @@ func (p *P) removeSubscriber(ws *websocket.Conn) { delete(p.WriteChans, ws) } +// HasActiveNIP46Signer checks if there's an active subscription for kind 24133 +// where the given pubkey is involved (either as author filter or in #p tag filter). +// This is used to authenticate clients by proving a signer is connected for that pubkey. +func (p *P) HasActiveNIP46Signer(signerPubkey []byte) bool { + const kindNIP46 = 24133 + p.Mx.RLock() + defer p.Mx.RUnlock() + + for _, subs := range p.Map { + for _, sub := range subs { + if sub.S == nil { + continue + } + for _, f := range *sub.S { + if f == nil || f.Kinds == nil { + continue + } + // Check if filter is for kind 24133 + hasNIP46Kind := false + for _, k := range f.Kinds.K { + if k.K == kindNIP46 { + hasNIP46Kind = true + break + } + } + if !hasNIP46Kind { + continue + } + // Check if the signer pubkey matches the #p tag filter + if f.Tags != nil { + pTag := f.Tags.GetFirst([]byte("p")) + if pTag != nil && pTag.Len() >= 2 { + for i := 1; i < pTag.Len(); i++ { + tagValue := pTag.T[i] + // Compare - handle both binary and hex formats + if len(tagValue) == 32 && len(signerPubkey) == 32 { + if utils.FastEqual(tagValue, signerPubkey) { + return true + } + } else if len(tagValue) == 64 && len(signerPubkey) == 32 { + // tagValue is hex, signerPubkey is binary + if string(tagValue) == hex.Enc(signerPubkey) { + return true + } + } else if len(tagValue) == 32 && len(signerPubkey) == 64 { + // tagValue is binary, signerPubkey is hex + if hex.Enc(tagValue) == string(signerPubkey) { + return true + } + } else if utils.FastEqual(tagValue, signerPubkey) { + return true + } + } + } + } + } + } + } + return false +} + // canSeePrivateEvent checks if the authenticated user can see an event with a private tag func (p *P) canSeePrivateEvent( authedPubkey, privatePubkey []byte, remote string, diff --git a/docs/NEO4J_BACKEND.md b/docs/NEO4J_BACKEND.md index 3310d4f..3814a5b 100644 --- a/docs/NEO4J_BACKEND.md +++ b/docs/NEO4J_BACKEND.md @@ -194,6 +194,12 @@ ORLY_DB_TYPE="neo4j" # Data Directory (for Badger metadata storage) ORLY_DATA_DIR="~/.local/share/ORLY" + +# Neo4j Driver Tuning (Memory Management) +ORLY_NEO4J_MAX_CONN_POOL=25 # Max connections (default: 25, driver default: 100) +ORLY_NEO4J_FETCH_SIZE=1000 # Records per fetch batch (default: 1000, -1=all) +ORLY_NEO4J_MAX_TX_RETRY_SEC=30 # Max transaction retry time in seconds +ORLY_NEO4J_QUERY_RESULT_LIMIT=10000 # Max results per query (0=unlimited) ``` ### Example Docker Compose Setup @@ -209,6 +215,15 @@ services: environment: - NEO4J_AUTH=neo4j/password - NEO4J_PLUGINS=["apoc"] + # Memory tuning for production + - NEO4J_server_memory_heap_initial__size=512m + - NEO4J_server_memory_heap_max__size=1g + - NEO4J_server_memory_pagecache_size=512m + # Transaction memory limits (prevent runaway queries) + - NEO4J_dbms_memory_transaction_total__max=256m + - NEO4J_dbms_memory_transaction_max=64m + # Query timeout + - NEO4J_dbms_transaction_timeout=30s volumes: - neo4j_data:/data - neo4j_logs:/logs @@ -222,6 +237,10 @@ services: - ORLY_NEO4J_URI=bolt://neo4j:7687 - ORLY_NEO4J_USER=neo4j - ORLY_NEO4J_PASSWORD=password + # Driver tuning for memory management + - ORLY_NEO4J_MAX_CONN_POOL=25 + - ORLY_NEO4J_FETCH_SIZE=1000 + - ORLY_NEO4J_QUERY_RESULT_LIMIT=10000 depends_on: - neo4j @@ -248,15 +267,127 @@ volumes: - Composite: kind + created_at - Tag type + value -2. **Cache Configuration**: Configure Neo4j's page cache and heap size: -```conf -# neo4j.conf -dbms.memory.heap.initial_size=2G -dbms.memory.heap.max_size=4G -dbms.memory.pagecache.size=4G +2. **Cache Configuration**: Configure Neo4j's page cache and heap size (see Memory Tuning below) + +3. **Query Limits**: The relay automatically enforces `ORLY_NEO4J_QUERY_RESULT_LIMIT` (default: 10000) to prevent unbounded queries from exhausting memory + +## Memory Tuning + +Neo4j runs as a separate process (typically in Docker), so memory management involves both the relay driver settings and Neo4j server configuration. + +### Understanding Memory Layers + +1. **ORLY Relay Process** (~35MB RSS typical) + - Go driver connection pool + - Query result buffering + - Controlled by `ORLY_NEO4J_*` environment variables + +2. **Neo4j Server Process** (512MB-4GB+ depending on data) + - JVM heap for Java objects + - Page cache for graph data + - Transaction memory for query execution + - Controlled by `NEO4J_*` environment variables + +### Relay Driver Tuning (ORLY side) + +| Variable | Default | Description | +|----------|---------|-------------| +| `ORLY_NEO4J_MAX_CONN_POOL` | 25 | Max connections in pool. Lower = less memory, but may bottleneck under high load. Driver default is 100. | +| `ORLY_NEO4J_FETCH_SIZE` | 1000 | Records fetched per batch. Lower = less memory per query, more round trips. Set to -1 for all (risky). | +| `ORLY_NEO4J_MAX_TX_RETRY_SEC` | 30 | Max seconds to retry failed transactions. | +| `ORLY_NEO4J_QUERY_RESULT_LIMIT` | 10000 | Hard cap on results per query. Prevents unbounded queries. Set to 0 for unlimited (not recommended). | + +**Recommended settings for memory-constrained environments:** +```bash +ORLY_NEO4J_MAX_CONN_POOL=10 +ORLY_NEO4J_FETCH_SIZE=500 +ORLY_NEO4J_QUERY_RESULT_LIMIT=5000 ``` -3. **Query Limits**: Always use LIMIT in queries to prevent memory exhaustion +### Neo4j Server Tuning (Docker/neo4j.conf) + +**JVM Heap Memory** - For Java objects and query processing: +```bash +# Docker environment variables +NEO4J_server_memory_heap_initial__size=512m +NEO4J_server_memory_heap_max__size=1g + +# neo4j.conf equivalent +server.memory.heap.initial_size=512m +server.memory.heap.max_size=1g +``` + +**Page Cache** - For caching graph data from disk: +```bash +# Docker +NEO4J_server_memory_pagecache_size=512m + +# neo4j.conf +server.memory.pagecache.size=512m +``` + +**Transaction Memory Limits** - Prevent runaway queries: +```bash +# Docker +NEO4J_dbms_memory_transaction_total__max=256m # Global limit across all transactions +NEO4J_dbms_memory_transaction_max=64m # Per-transaction limit + +# neo4j.conf +dbms.memory.transaction.total.max=256m +db.memory.transaction.max=64m +``` + +**Query Timeout** - Kill long-running queries: +```bash +# Docker +NEO4J_dbms_transaction_timeout=30s + +# neo4j.conf +dbms.transaction.timeout=30s +``` + +### Memory Sizing Guidelines + +| Deployment Size | Heap | Page Cache | Total Neo4j | ORLY Pool | +|-----------------|------|------------|-------------|-----------| +| Development | 512m | 256m | ~1GB | 10 | +| Small relay (<100k events) | 1g | 512m | ~2GB | 25 | +| Medium relay (<1M events) | 2g | 1g | ~4GB | 50 | +| Large relay (>1M events) | 4g | 2g | ~8GB | 100 | + +**Formula for Page Cache:** +``` +Page Cache = Data Size on Disk × 1.2 +``` + +Use `neo4j-admin server memory-recommendation` inside the container to get tailored recommendations. + +### Monitoring Memory Usage + +**Check Neo4j memory from relay logs:** +```bash +# Driver config is logged at startup +grep "connecting to neo4j" /path/to/orly.log +# Output: connecting to neo4j at bolt://... (pool=25, fetch=1000, txRetry=30s) +``` + +**Check Neo4j server memory:** +```bash +# Inside Neo4j container +docker exec neo4j neo4j-admin server memory-recommendation + +# Or query via Cypher +CALL dbms.listPools() YIELD pool, heapMemoryUsed, heapMemoryUsedBytes +RETURN pool, heapMemoryUsed +``` + +**Monitor transaction memory:** +```cypher +CALL dbms.listTransactions() +YIELD transactionId, currentQuery, allocatedBytes +RETURN transactionId, currentQuery, allocatedBytes +ORDER BY allocatedBytes DESC +``` ## Implementation Details diff --git a/go.mod b/go.mod index 9420ab2..d6a6e04 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,9 @@ go 1.25.3 require ( git.mleku.dev/mleku/nostr v1.0.11 github.com/adrg/xdg v0.5.3 + github.com/alexflint/go-arg v1.6.1 github.com/aperturerobotics/go-indexeddb v0.2.3 + github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 github.com/dgraph-io/badger/v4 v4.8.0 github.com/gorilla/websocket v1.5.3 github.com/hack-pad/safejs v0.1.1 @@ -23,6 +25,7 @@ require ( go.uber.org/atomic v1.11.0 golang.org/x/crypto v0.46.0 golang.org/x/lint v0.0.0-20241112194109-818c5a804067 + golang.zx2c4.com/wireguard v0.0.0-20250521234502-f333402bd9cb honnef.co/go/tools v0.6.1 lol.mleku.dev v1.0.5 lukechampine.com/frand v1.5.1 @@ -31,7 +34,6 @@ require ( require ( github.com/BurntSushi/toml v1.5.0 // indirect github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 // indirect - github.com/alexflint/go-arg v1.6.1 // indirect github.com/alexflint/go-scalar v1.2.0 // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect @@ -42,7 +44,6 @@ require ( github.com/coder/websocket v1.8.12 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/crypto/blake256 v1.1.0 // indirect - github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect github.com/dgraph-io/ristretto/v2 v2.3.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/ebitengine/purego v0.9.1 // indirect @@ -82,7 +83,6 @@ require ( golang.org/x/time v0.7.0 // indirect golang.org/x/tools v0.40.0 // indirect golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect - golang.zx2c4.com/wireguard v0.0.0-20250521234502-f333402bd9cb // indirect google.golang.org/protobuf v1.36.10 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect gvisor.dev/gvisor v0.0.0-20250503011706-39ed1f5ac29c // indirect diff --git a/main.go b/main.go index f899362..4f969d7 100644 --- a/main.go +++ b/main.go @@ -525,21 +525,26 @@ func makeDatabaseConfig(cfg *config.C) *database.DatabaseConfig { queryCacheDisabled, serialCachePubkeys, serialCacheEventIds, zstdLevel, - neo4jURI, neo4jUser, neo4jPassword := cfg.GetDatabaseConfigValues() + neo4jURI, neo4jUser, neo4jPassword, + neo4jMaxConnPoolSize, neo4jFetchSize, neo4jMaxTxRetrySeconds, neo4jQueryResultLimit := cfg.GetDatabaseConfigValues() return &database.DatabaseConfig{ - DataDir: dataDir, - LogLevel: logLevel, - BlockCacheMB: blockCacheMB, - IndexCacheMB: indexCacheMB, - QueryCacheSizeMB: queryCacheSizeMB, - QueryCacheMaxAge: queryCacheMaxAge, - QueryCacheDisabled: queryCacheDisabled, - SerialCachePubkeys: serialCachePubkeys, - SerialCacheEventIds: serialCacheEventIds, - ZSTDLevel: zstdLevel, - Neo4jURI: neo4jURI, - Neo4jUser: neo4jUser, - Neo4jPassword: neo4jPassword, + DataDir: dataDir, + LogLevel: logLevel, + BlockCacheMB: blockCacheMB, + IndexCacheMB: indexCacheMB, + QueryCacheSizeMB: queryCacheSizeMB, + QueryCacheMaxAge: queryCacheMaxAge, + QueryCacheDisabled: queryCacheDisabled, + SerialCachePubkeys: serialCachePubkeys, + SerialCacheEventIds: serialCacheEventIds, + ZSTDLevel: zstdLevel, + Neo4jURI: neo4jURI, + Neo4jUser: neo4jUser, + Neo4jPassword: neo4jPassword, + Neo4jMaxConnPoolSize: neo4jMaxConnPoolSize, + Neo4jFetchSize: neo4jFetchSize, + Neo4jMaxTxRetrySeconds: neo4jMaxTxRetrySeconds, + Neo4jQueryResultLimit: neo4jQueryResultLimit, } } diff --git a/pkg/database/factory.go b/pkg/database/factory.go index 936127c..72577ad 100644 --- a/pkg/database/factory.go +++ b/pkg/database/factory.go @@ -35,6 +35,12 @@ type DatabaseConfig struct { Neo4jURI string // ORLY_NEO4J_URI Neo4jUser string // ORLY_NEO4J_USER Neo4jPassword string // ORLY_NEO4J_PASSWORD + + // Neo4j driver tuning (memory and connection management) + Neo4jMaxConnPoolSize int // ORLY_NEO4J_MAX_CONN_POOL - max connection pool size (default: 25) + Neo4jFetchSize int // ORLY_NEO4J_FETCH_SIZE - max records per fetch batch (default: 1000) + Neo4jMaxTxRetrySeconds int // ORLY_NEO4J_MAX_TX_RETRY_SEC - max transaction retry time (default: 30) + Neo4jQueryResultLimit int // ORLY_NEO4J_QUERY_RESULT_LIMIT - max results per query (default: 10000, 0=unlimited) } // NewDatabase creates a database instance based on the specified type. diff --git a/pkg/database/factory_wasm.go b/pkg/database/factory_wasm.go index 003bdec..e23f4eb 100644 --- a/pkg/database/factory_wasm.go +++ b/pkg/database/factory_wasm.go @@ -31,6 +31,12 @@ type DatabaseConfig struct { Neo4jURI string // ORLY_NEO4J_URI Neo4jUser string // ORLY_NEO4J_USER Neo4jPassword string // ORLY_NEO4J_PASSWORD + + // Neo4j driver tuning (memory and connection management) + Neo4jMaxConnPoolSize int // ORLY_NEO4J_MAX_CONN_POOL - max connection pool size (default: 25) + Neo4jFetchSize int // ORLY_NEO4J_FETCH_SIZE - max records per fetch batch (default: 1000) + Neo4jMaxTxRetrySeconds int // ORLY_NEO4J_MAX_TX_RETRY_SEC - max transaction retry time (default: 30) + Neo4jQueryResultLimit int // ORLY_NEO4J_QUERY_RESULT_LIMIT - max results per query (default: 10000, 0=unlimited) } // NewDatabase creates a database instance based on the specified type. diff --git a/pkg/neo4j/neo4j.go b/pkg/neo4j/neo4j.go index 75a64d5..864bd11 100644 --- a/pkg/neo4j/neo4j.go +++ b/pkg/neo4j/neo4j.go @@ -20,9 +20,14 @@ import ( "next.orly.dev/pkg/utils/apputil" ) -// maxConcurrentQueries limits the number of concurrent Neo4j queries to prevent -// authentication rate limiting and connection exhaustion -const maxConcurrentQueries = 10 +// Default configuration values (used when config values are 0 or not set) +const ( + defaultMaxConcurrentQueries = 10 + defaultMaxConnPoolSize = 25 + defaultFetchSize = 1000 + defaultMaxTxRetrySeconds = 30 + defaultQueryResultLimit = 10000 +) // maxRetryAttempts is the maximum number of times to retry a query on rate limit const maxRetryAttempts = 3 @@ -45,6 +50,12 @@ type N struct { neo4jUser string neo4jPassword string + // Driver tuning options + maxConnPoolSize int // max connections in pool + fetchSize int // records per fetch batch + maxTxRetryTime time.Duration + queryResultLimit int // max results per query (0=unlimited) + ready chan struct{} // Closed when database is ready to serve requests // querySem limits concurrent queries to prevent rate limiting @@ -118,16 +129,38 @@ func NewWithConfig( neo4jPassword = "password" } + // Apply defaults for driver tuning options + maxConnPoolSize := cfg.Neo4jMaxConnPoolSize + if maxConnPoolSize <= 0 { + maxConnPoolSize = defaultMaxConnPoolSize + } + fetchSize := cfg.Neo4jFetchSize + if fetchSize == 0 { + fetchSize = defaultFetchSize + } + maxTxRetrySeconds := cfg.Neo4jMaxTxRetrySeconds + if maxTxRetrySeconds <= 0 { + maxTxRetrySeconds = defaultMaxTxRetrySeconds + } + queryResultLimit := cfg.Neo4jQueryResultLimit + if queryResultLimit == 0 { + queryResultLimit = defaultQueryResultLimit + } + n = &N{ - ctx: ctx, - cancel: cancel, - dataDir: cfg.DataDir, - Logger: NewLogger(lol.GetLogLevel(cfg.LogLevel), cfg.DataDir), - neo4jURI: neo4jURI, - neo4jUser: neo4jUser, - neo4jPassword: neo4jPassword, - ready: make(chan struct{}), - querySem: make(chan struct{}, maxConcurrentQueries), + ctx: ctx, + cancel: cancel, + dataDir: cfg.DataDir, + Logger: NewLogger(lol.GetLogLevel(cfg.LogLevel), cfg.DataDir), + neo4jURI: neo4jURI, + neo4jUser: neo4jUser, + neo4jPassword: neo4jPassword, + maxConnPoolSize: maxConnPoolSize, + fetchSize: fetchSize, + maxTxRetryTime: time.Duration(maxTxRetrySeconds) * time.Second, + queryResultLimit: queryResultLimit, + ready: make(chan struct{}), + querySem: make(chan struct{}, defaultMaxConcurrentQueries), } // Ensure the data directory exists @@ -191,12 +224,24 @@ func New( // initNeo4jClient establishes connection to Neo4j server func (n *N) initNeo4jClient() error { - n.Logger.Infof("connecting to neo4j at %s", n.neo4jURI) + n.Logger.Infof("connecting to neo4j at %s (pool=%d, fetch=%d, txRetry=%v)", + n.neo4jURI, n.maxConnPoolSize, n.fetchSize, n.maxTxRetryTime) - // Create Neo4j driver + // Create Neo4j driver with tuned configuration driver, err := neo4j.NewDriverWithContext( n.neo4jURI, neo4j.BasicAuth(n.neo4jUser, n.neo4jPassword, ""), + func(config *neo4j.Config) { + // Limit connection pool size to reduce memory usage + config.MaxConnectionPoolSize = n.maxConnPoolSize + + // Set fetch size to batch records and prevent memory overflow + // -1 means fetch all (driver default), positive value limits batch size + config.FetchSize = n.fetchSize + + // Set max transaction retry time + config.MaxTransactionRetryTime = n.maxTxRetryTime + }, ) if err != nil { return fmt.Errorf("failed to create neo4j driver: %w", err) @@ -462,3 +507,19 @@ func (n *N) QuerySem() chan struct{} { func (n *N) MaxConcurrentQueries() int { return cap(n.querySem) } + +// QueryResultLimit returns the configured maximum results per query. +// Returns 0 if unlimited (no limit applied). +func (n *N) QueryResultLimit() int { + return n.queryResultLimit +} + +// FetchSize returns the configured fetch batch size. +func (n *N) FetchSize() int { + return n.fetchSize +} + +// MaxConnPoolSize returns the configured connection pool size. +func (n *N) MaxConnPoolSize() int { + return n.maxConnPoolSize +} diff --git a/pkg/neo4j/query-events.go b/pkg/neo4j/query-events.go index 79e716c..9b04b2c 100644 --- a/pkg/neo4j/query-events.go +++ b/pkg/neo4j/query-events.go @@ -223,10 +223,25 @@ RETURN e.id AS id, // Add ordering (most recent first) orderClause := " ORDER BY e.created_at DESC" - // Add limit if specified + // Add limit - use the smaller of requested limit and configured max limit + // This prevents unbounded queries that could exhaust memory limitClause := "" + requestedLimit := 0 if f.Limit != nil && *f.Limit > 0 { - params["limit"] = *f.Limit + requestedLimit = int(*f.Limit) + } + + // Apply the configured query result limit as a safety cap + // If queryResultLimit is 0 (unlimited), only use the requested limit + effectiveLimit := requestedLimit + if n.queryResultLimit > 0 { + if effectiveLimit == 0 || effectiveLimit > n.queryResultLimit { + effectiveLimit = n.queryResultLimit + } + } + + if effectiveLimit > 0 { + params["limit"] = effectiveLimit limitClause = " LIMIT $limit" } @@ -358,11 +373,16 @@ func (n *N) QueryForSerials(c context.Context, f *filter.F) ( return nil, fmt.Errorf("invalid query structure") } - // Rebuild query with serial-only return + // Rebuild query with serial-only return, preserving ORDER BY and LIMIT cypher = cypherParts[0] + returnClause - if strings.Contains(cypherParts[1], "ORDER BY") { - orderPart := " ORDER BY" + strings.Split(cypherParts[1], "ORDER BY")[1] - cypher += orderPart + remainder := cypherParts[1] + if strings.Contains(remainder, "ORDER BY") { + orderAndLimit := " ORDER BY" + strings.Split(remainder, "ORDER BY")[1] + cypher += orderAndLimit + } else if strings.Contains(remainder, "LIMIT") { + // No ORDER BY but has LIMIT + limitPart := " LIMIT" + strings.Split(remainder, "LIMIT")[1] + cypher += limitPart } result, err := n.ExecuteRead(c, cypher, params) @@ -417,10 +437,16 @@ func (n *N) QueryForIds(c context.Context, f *filter.F) ( return nil, fmt.Errorf("invalid query structure") } + // Rebuild query preserving ORDER BY and LIMIT cypher = cypherParts[0] + returnClause - if strings.Contains(cypherParts[1], "ORDER BY") { - orderPart := " ORDER BY" + strings.Split(cypherParts[1], "ORDER BY")[1] - cypher += orderPart + remainder := cypherParts[1] + if strings.Contains(remainder, "ORDER BY") { + orderAndLimit := " ORDER BY" + strings.Split(remainder, "ORDER BY")[1] + cypher += orderAndLimit + } else if strings.Contains(remainder, "LIMIT") { + // No ORDER BY but has LIMIT + limitPart := " LIMIT" + strings.Split(remainder, "LIMIT")[1] + cypher += limitPart } result, err := n.ExecuteRead(c, cypher, params) diff --git a/pkg/protocol/publish/publisher.go b/pkg/protocol/publish/publisher.go index 7846e21..a3a4fbb 100644 --- a/pkg/protocol/publish/publisher.go +++ b/pkg/protocol/publish/publisher.go @@ -24,6 +24,11 @@ type WriteChanSetter interface { GetWriteChan(*websocket.Conn) (chan WriteRequest, bool) } +// NIP46SignerChecker defines the interface for checking active NIP-46 signers +type NIP46SignerChecker interface { + HasActiveNIP46Signer(signerPubkey []byte) bool +} + // S is the control structure for the subscription management scheme. type S struct { publisher.Publishers diff --git a/pkg/version/version b/pkg/version/version index 01efe7f..3170957 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.42.0 +v0.43.0