Add Neo4j memory tuning config and query result limits (v0.43.0)
Some checks failed
Go / build-and-release (push) Has been cancelled
Some checks failed
Go / build-and-release (push) Has been cancelled
- Add Neo4j driver config options for memory management: - ORLY_NEO4J_MAX_CONN_POOL (default: 25) - connection pool size - ORLY_NEO4J_FETCH_SIZE (default: 1000) - records per batch - ORLY_NEO4J_MAX_TX_RETRY_SEC (default: 30) - transaction retry timeout - ORLY_NEO4J_QUERY_RESULT_LIMIT (default: 10000) - max results per query - Apply driver settings when creating Neo4j connection (pool size, fetch size, retry time) - Enforce query result limit as safety cap on all Cypher queries - Fix QueryForSerials and QueryForIds to preserve LIMIT clauses - Add comprehensive memory tuning documentation with sizing guidelines - Add NIP-46 signer-based authentication for bunker connections - Update go.mod with new dependencies Files modified: - app/config/config.go: Add Neo4j driver tuning config vars - main.go: Pass new config values to database factory - pkg/database/factory.go: Add Neo4j tuning fields to DatabaseConfig - pkg/database/factory_wasm.go: Mirror factory.go changes for WASM - pkg/neo4j/neo4j.go: Apply driver config, add getter methods - pkg/neo4j/query-events.go: Enforce query result limit, fix LIMIT preservation - docs/NEO4J_BACKEND.md: Add Memory Tuning section, update Docker example - CLAUDE.md: Add Neo4j memory tuning quick reference - app/handle-req.go: NIP-46 signer authentication - app/publisher.go: HasActiveNIP46Signer check - pkg/protocol/publish/publisher.go: NIP46SignerChecker interface - go.mod: Add dependencies 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -46,6 +46,14 @@ NOSTR_SECRET_KEY=nsec1... ./nurl https://relay.example.com/api/logs/clear
|
|||||||
| `ORLY_TLS_DOMAINS` | | Let's Encrypt domains |
|
| `ORLY_TLS_DOMAINS` | | Let's Encrypt domains |
|
||||||
| `ORLY_AUTH_TO_WRITE` | false | Require auth for writes |
|
| `ORLY_AUTH_TO_WRITE` | false | Require auth for writes |
|
||||||
|
|
||||||
|
**Neo4j Memory Tuning** (only when `ORLY_DB_TYPE=neo4j`):
|
||||||
|
|
||||||
|
| Variable | Default | Description |
|
||||||
|
|----------|---------|-------------|
|
||||||
|
| `ORLY_NEO4J_MAX_CONN_POOL` | 25 | Max connections (lower = less memory) |
|
||||||
|
| `ORLY_NEO4J_FETCH_SIZE` | 1000 | Records per batch (-1=all) |
|
||||||
|
| `ORLY_NEO4J_QUERY_RESULT_LIMIT` | 10000 | Max results per query (0=unlimited) |
|
||||||
|
|
||||||
See `./orly help` for all options. **All env vars MUST be defined in `app/config/config.go`**.
|
See `./orly help` for all options. **All env vars MUST be defined in `app/config/config.go`**.
|
||||||
|
|
||||||
## Architecture
|
## Architecture
|
||||||
|
|||||||
@@ -105,6 +105,12 @@ type C struct {
|
|||||||
Neo4jUser string `env:"ORLY_NEO4J_USER" default:"neo4j" usage:"Neo4j authentication username (only used when ORLY_DB_TYPE=neo4j)"`
|
Neo4jUser string `env:"ORLY_NEO4J_USER" default:"neo4j" usage:"Neo4j authentication username (only used when ORLY_DB_TYPE=neo4j)"`
|
||||||
Neo4jPassword string `env:"ORLY_NEO4J_PASSWORD" default:"password" usage:"Neo4j authentication password (only used when ORLY_DB_TYPE=neo4j)"`
|
Neo4jPassword string `env:"ORLY_NEO4J_PASSWORD" default:"password" usage:"Neo4j authentication password (only used when ORLY_DB_TYPE=neo4j)"`
|
||||||
|
|
||||||
|
// Neo4j driver tuning (memory and connection management)
|
||||||
|
Neo4jMaxConnPoolSize int `env:"ORLY_NEO4J_MAX_CONN_POOL" default:"25" usage:"max Neo4j connection pool size (driver default: 100, lower reduces memory)"`
|
||||||
|
Neo4jFetchSize int `env:"ORLY_NEO4J_FETCH_SIZE" default:"1000" usage:"max records per fetch batch (prevents memory overflow, -1=fetch all)"`
|
||||||
|
Neo4jMaxTxRetrySeconds int `env:"ORLY_NEO4J_MAX_TX_RETRY_SEC" default:"30" usage:"max seconds for retryable transaction attempts"`
|
||||||
|
Neo4jQueryResultLimit int `env:"ORLY_NEO4J_QUERY_RESULT_LIMIT" default:"10000" usage:"max results returned per query (prevents unbounded memory usage, 0=unlimited)"`
|
||||||
|
|
||||||
// Advanced database tuning
|
// Advanced database tuning
|
||||||
SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"100000" usage:"max pubkeys to cache for compact event storage (default: 100000, ~3.2MB memory)"`
|
SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"100000" usage:"max pubkeys to cache for compact event storage (default: 100000, ~3.2MB memory)"`
|
||||||
SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"500000" usage:"max event IDs to cache for compact event storage (default: 500000, ~16MB memory)"`
|
SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"500000" usage:"max event IDs to cache for compact event storage (default: 500000, ~16MB memory)"`
|
||||||
@@ -472,6 +478,7 @@ func (cfg *C) GetDatabaseConfigValues() (
|
|||||||
serialCachePubkeys, serialCacheEventIds int,
|
serialCachePubkeys, serialCacheEventIds int,
|
||||||
zstdLevel int,
|
zstdLevel int,
|
||||||
neo4jURI, neo4jUser, neo4jPassword string,
|
neo4jURI, neo4jUser, neo4jPassword string,
|
||||||
|
neo4jMaxConnPoolSize, neo4jFetchSize, neo4jMaxTxRetrySeconds, neo4jQueryResultLimit int,
|
||||||
) {
|
) {
|
||||||
// Parse query cache max age from string to duration
|
// Parse query cache max age from string to duration
|
||||||
queryCacheMaxAge = 5 * time.Minute // Default
|
queryCacheMaxAge = 5 * time.Minute // Default
|
||||||
@@ -487,7 +494,8 @@ func (cfg *C) GetDatabaseConfigValues() (
|
|||||||
cfg.QueryCacheDisabled,
|
cfg.QueryCacheDisabled,
|
||||||
cfg.SerialCachePubkeys, cfg.SerialCacheEventIds,
|
cfg.SerialCachePubkeys, cfg.SerialCacheEventIds,
|
||||||
cfg.DBZSTDLevel,
|
cfg.DBZSTDLevel,
|
||||||
cfg.Neo4jURI, cfg.Neo4jUser, cfg.Neo4jPassword
|
cfg.Neo4jURI, cfg.Neo4jUser, cfg.Neo4jPassword,
|
||||||
|
cfg.Neo4jMaxConnPoolSize, cfg.Neo4jFetchSize, cfg.Neo4jMaxTxRetrySeconds, cfg.Neo4jQueryResultLimit
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRateLimitConfigValues returns the rate limiting configuration values.
|
// GetRateLimitConfigValues returns the rate limiting configuration values.
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ import (
|
|||||||
"next.orly.dev/pkg/policy"
|
"next.orly.dev/pkg/policy"
|
||||||
"next.orly.dev/pkg/protocol/graph"
|
"next.orly.dev/pkg/protocol/graph"
|
||||||
"next.orly.dev/pkg/protocol/nip43"
|
"next.orly.dev/pkg/protocol/nip43"
|
||||||
|
"next.orly.dev/pkg/protocol/publish"
|
||||||
"git.mleku.dev/mleku/nostr/utils/normalize"
|
"git.mleku.dev/mleku/nostr/utils/normalize"
|
||||||
"git.mleku.dev/mleku/nostr/utils/pointers"
|
"git.mleku.dev/mleku/nostr/utils/pointers"
|
||||||
)
|
)
|
||||||
@@ -52,6 +53,51 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
|
|||||||
)
|
)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// NIP-46 signer-based authentication:
|
||||||
|
// If client is not authenticated and requests kind 24133 with exactly one #p tag,
|
||||||
|
// check if there's an active signer subscription for that pubkey.
|
||||||
|
// If so, authenticate the client as that pubkey.
|
||||||
|
const kindNIP46 = 24133
|
||||||
|
if len(l.authedPubkey.Load()) == 0 && len(*env.Filters) == 1 {
|
||||||
|
f := (*env.Filters)[0]
|
||||||
|
if f != nil && f.Kinds != nil && f.Kinds.Len() == 1 {
|
||||||
|
isNIP46Kind := false
|
||||||
|
for _, k := range f.Kinds.K {
|
||||||
|
if k.K == kindNIP46 {
|
||||||
|
isNIP46Kind = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if isNIP46Kind && f.Tags != nil {
|
||||||
|
pTag := f.Tags.GetFirst([]byte("p"))
|
||||||
|
// Must have exactly one pubkey in the #p tag
|
||||||
|
if pTag != nil && pTag.Len() == 2 {
|
||||||
|
signerPubkey := pTag.Value()
|
||||||
|
// Convert to binary if hex
|
||||||
|
var signerPubkeyBin []byte
|
||||||
|
if len(signerPubkey) == 64 {
|
||||||
|
signerPubkeyBin, _ = hexenc.Dec(string(signerPubkey))
|
||||||
|
} else if len(signerPubkey) == 32 {
|
||||||
|
signerPubkeyBin = signerPubkey
|
||||||
|
}
|
||||||
|
if len(signerPubkeyBin) == 32 {
|
||||||
|
// Check if there's an active signer for this pubkey
|
||||||
|
if socketPub := l.publishers.GetSocketPublisher(); socketPub != nil {
|
||||||
|
if checker, ok := socketPub.(publish.NIP46SignerChecker); ok {
|
||||||
|
if checker.HasActiveNIP46Signer(signerPubkeyBin) {
|
||||||
|
log.I.F("NIP-46 auth: client %s authenticated via active signer %s",
|
||||||
|
l.remote, hexenc.Enc(signerPubkeyBin))
|
||||||
|
l.authedPubkey.Store(signerPubkeyBin)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// send a challenge to the client to auth if an ACL is active, auth is required, or AuthToWrite is enabled
|
// send a challenge to the client to auth if an ACL is active, auth is required, or AuthToWrite is enabled
|
||||||
if len(l.authedPubkey.Load()) == 0 && (acl.Registry.Active.Load() != "none" || l.Config.AuthRequired || l.Config.AuthToWrite) {
|
if len(l.authedPubkey.Load()) == 0 && (acl.Registry.Active.Load() != "none" || l.Config.AuthRequired || l.Config.AuthToWrite) {
|
||||||
if err = authenvelope.NewChallengeWith(l.challenge.Load()).
|
if err = authenvelope.NewChallengeWith(l.challenge.Load()).
|
||||||
|
|||||||
@@ -320,6 +320,67 @@ func (p *P) removeSubscriber(ws *websocket.Conn) {
|
|||||||
delete(p.WriteChans, ws)
|
delete(p.WriteChans, ws)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasActiveNIP46Signer checks if there's an active subscription for kind 24133
|
||||||
|
// where the given pubkey is involved (either as author filter or in #p tag filter).
|
||||||
|
// This is used to authenticate clients by proving a signer is connected for that pubkey.
|
||||||
|
func (p *P) HasActiveNIP46Signer(signerPubkey []byte) bool {
|
||||||
|
const kindNIP46 = 24133
|
||||||
|
p.Mx.RLock()
|
||||||
|
defer p.Mx.RUnlock()
|
||||||
|
|
||||||
|
for _, subs := range p.Map {
|
||||||
|
for _, sub := range subs {
|
||||||
|
if sub.S == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, f := range *sub.S {
|
||||||
|
if f == nil || f.Kinds == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Check if filter is for kind 24133
|
||||||
|
hasNIP46Kind := false
|
||||||
|
for _, k := range f.Kinds.K {
|
||||||
|
if k.K == kindNIP46 {
|
||||||
|
hasNIP46Kind = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !hasNIP46Kind {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Check if the signer pubkey matches the #p tag filter
|
||||||
|
if f.Tags != nil {
|
||||||
|
pTag := f.Tags.GetFirst([]byte("p"))
|
||||||
|
if pTag != nil && pTag.Len() >= 2 {
|
||||||
|
for i := 1; i < pTag.Len(); i++ {
|
||||||
|
tagValue := pTag.T[i]
|
||||||
|
// Compare - handle both binary and hex formats
|
||||||
|
if len(tagValue) == 32 && len(signerPubkey) == 32 {
|
||||||
|
if utils.FastEqual(tagValue, signerPubkey) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
} else if len(tagValue) == 64 && len(signerPubkey) == 32 {
|
||||||
|
// tagValue is hex, signerPubkey is binary
|
||||||
|
if string(tagValue) == hex.Enc(signerPubkey) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
} else if len(tagValue) == 32 && len(signerPubkey) == 64 {
|
||||||
|
// tagValue is binary, signerPubkey is hex
|
||||||
|
if hex.Enc(tagValue) == string(signerPubkey) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
} else if utils.FastEqual(tagValue, signerPubkey) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// canSeePrivateEvent checks if the authenticated user can see an event with a private tag
|
// canSeePrivateEvent checks if the authenticated user can see an event with a private tag
|
||||||
func (p *P) canSeePrivateEvent(
|
func (p *P) canSeePrivateEvent(
|
||||||
authedPubkey, privatePubkey []byte, remote string,
|
authedPubkey, privatePubkey []byte, remote string,
|
||||||
|
|||||||
@@ -194,6 +194,12 @@ ORLY_DB_TYPE="neo4j"
|
|||||||
|
|
||||||
# Data Directory (for Badger metadata storage)
|
# Data Directory (for Badger metadata storage)
|
||||||
ORLY_DATA_DIR="~/.local/share/ORLY"
|
ORLY_DATA_DIR="~/.local/share/ORLY"
|
||||||
|
|
||||||
|
# Neo4j Driver Tuning (Memory Management)
|
||||||
|
ORLY_NEO4J_MAX_CONN_POOL=25 # Max connections (default: 25, driver default: 100)
|
||||||
|
ORLY_NEO4J_FETCH_SIZE=1000 # Records per fetch batch (default: 1000, -1=all)
|
||||||
|
ORLY_NEO4J_MAX_TX_RETRY_SEC=30 # Max transaction retry time in seconds
|
||||||
|
ORLY_NEO4J_QUERY_RESULT_LIMIT=10000 # Max results per query (0=unlimited)
|
||||||
```
|
```
|
||||||
|
|
||||||
### Example Docker Compose Setup
|
### Example Docker Compose Setup
|
||||||
@@ -209,6 +215,15 @@ services:
|
|||||||
environment:
|
environment:
|
||||||
- NEO4J_AUTH=neo4j/password
|
- NEO4J_AUTH=neo4j/password
|
||||||
- NEO4J_PLUGINS=["apoc"]
|
- NEO4J_PLUGINS=["apoc"]
|
||||||
|
# Memory tuning for production
|
||||||
|
- NEO4J_server_memory_heap_initial__size=512m
|
||||||
|
- NEO4J_server_memory_heap_max__size=1g
|
||||||
|
- NEO4J_server_memory_pagecache_size=512m
|
||||||
|
# Transaction memory limits (prevent runaway queries)
|
||||||
|
- NEO4J_dbms_memory_transaction_total__max=256m
|
||||||
|
- NEO4J_dbms_memory_transaction_max=64m
|
||||||
|
# Query timeout
|
||||||
|
- NEO4J_dbms_transaction_timeout=30s
|
||||||
volumes:
|
volumes:
|
||||||
- neo4j_data:/data
|
- neo4j_data:/data
|
||||||
- neo4j_logs:/logs
|
- neo4j_logs:/logs
|
||||||
@@ -222,6 +237,10 @@ services:
|
|||||||
- ORLY_NEO4J_URI=bolt://neo4j:7687
|
- ORLY_NEO4J_URI=bolt://neo4j:7687
|
||||||
- ORLY_NEO4J_USER=neo4j
|
- ORLY_NEO4J_USER=neo4j
|
||||||
- ORLY_NEO4J_PASSWORD=password
|
- ORLY_NEO4J_PASSWORD=password
|
||||||
|
# Driver tuning for memory management
|
||||||
|
- ORLY_NEO4J_MAX_CONN_POOL=25
|
||||||
|
- ORLY_NEO4J_FETCH_SIZE=1000
|
||||||
|
- ORLY_NEO4J_QUERY_RESULT_LIMIT=10000
|
||||||
depends_on:
|
depends_on:
|
||||||
- neo4j
|
- neo4j
|
||||||
|
|
||||||
@@ -248,15 +267,127 @@ volumes:
|
|||||||
- Composite: kind + created_at
|
- Composite: kind + created_at
|
||||||
- Tag type + value
|
- Tag type + value
|
||||||
|
|
||||||
2. **Cache Configuration**: Configure Neo4j's page cache and heap size:
|
2. **Cache Configuration**: Configure Neo4j's page cache and heap size (see Memory Tuning below)
|
||||||
```conf
|
|
||||||
# neo4j.conf
|
3. **Query Limits**: The relay automatically enforces `ORLY_NEO4J_QUERY_RESULT_LIMIT` (default: 10000) to prevent unbounded queries from exhausting memory
|
||||||
dbms.memory.heap.initial_size=2G
|
|
||||||
dbms.memory.heap.max_size=4G
|
## Memory Tuning
|
||||||
dbms.memory.pagecache.size=4G
|
|
||||||
|
Neo4j runs as a separate process (typically in Docker), so memory management involves both the relay driver settings and Neo4j server configuration.
|
||||||
|
|
||||||
|
### Understanding Memory Layers
|
||||||
|
|
||||||
|
1. **ORLY Relay Process** (~35MB RSS typical)
|
||||||
|
- Go driver connection pool
|
||||||
|
- Query result buffering
|
||||||
|
- Controlled by `ORLY_NEO4J_*` environment variables
|
||||||
|
|
||||||
|
2. **Neo4j Server Process** (512MB-4GB+ depending on data)
|
||||||
|
- JVM heap for Java objects
|
||||||
|
- Page cache for graph data
|
||||||
|
- Transaction memory for query execution
|
||||||
|
- Controlled by `NEO4J_*` environment variables
|
||||||
|
|
||||||
|
### Relay Driver Tuning (ORLY side)
|
||||||
|
|
||||||
|
| Variable | Default | Description |
|
||||||
|
|----------|---------|-------------|
|
||||||
|
| `ORLY_NEO4J_MAX_CONN_POOL` | 25 | Max connections in pool. Lower = less memory, but may bottleneck under high load. Driver default is 100. |
|
||||||
|
| `ORLY_NEO4J_FETCH_SIZE` | 1000 | Records fetched per batch. Lower = less memory per query, more round trips. Set to -1 for all (risky). |
|
||||||
|
| `ORLY_NEO4J_MAX_TX_RETRY_SEC` | 30 | Max seconds to retry failed transactions. |
|
||||||
|
| `ORLY_NEO4J_QUERY_RESULT_LIMIT` | 10000 | Hard cap on results per query. Prevents unbounded queries. Set to 0 for unlimited (not recommended). |
|
||||||
|
|
||||||
|
**Recommended settings for memory-constrained environments:**
|
||||||
|
```bash
|
||||||
|
ORLY_NEO4J_MAX_CONN_POOL=10
|
||||||
|
ORLY_NEO4J_FETCH_SIZE=500
|
||||||
|
ORLY_NEO4J_QUERY_RESULT_LIMIT=5000
|
||||||
```
|
```
|
||||||
|
|
||||||
3. **Query Limits**: Always use LIMIT in queries to prevent memory exhaustion
|
### Neo4j Server Tuning (Docker/neo4j.conf)
|
||||||
|
|
||||||
|
**JVM Heap Memory** - For Java objects and query processing:
|
||||||
|
```bash
|
||||||
|
# Docker environment variables
|
||||||
|
NEO4J_server_memory_heap_initial__size=512m
|
||||||
|
NEO4J_server_memory_heap_max__size=1g
|
||||||
|
|
||||||
|
# neo4j.conf equivalent
|
||||||
|
server.memory.heap.initial_size=512m
|
||||||
|
server.memory.heap.max_size=1g
|
||||||
|
```
|
||||||
|
|
||||||
|
**Page Cache** - For caching graph data from disk:
|
||||||
|
```bash
|
||||||
|
# Docker
|
||||||
|
NEO4J_server_memory_pagecache_size=512m
|
||||||
|
|
||||||
|
# neo4j.conf
|
||||||
|
server.memory.pagecache.size=512m
|
||||||
|
```
|
||||||
|
|
||||||
|
**Transaction Memory Limits** - Prevent runaway queries:
|
||||||
|
```bash
|
||||||
|
# Docker
|
||||||
|
NEO4J_dbms_memory_transaction_total__max=256m # Global limit across all transactions
|
||||||
|
NEO4J_dbms_memory_transaction_max=64m # Per-transaction limit
|
||||||
|
|
||||||
|
# neo4j.conf
|
||||||
|
dbms.memory.transaction.total.max=256m
|
||||||
|
db.memory.transaction.max=64m
|
||||||
|
```
|
||||||
|
|
||||||
|
**Query Timeout** - Kill long-running queries:
|
||||||
|
```bash
|
||||||
|
# Docker
|
||||||
|
NEO4J_dbms_transaction_timeout=30s
|
||||||
|
|
||||||
|
# neo4j.conf
|
||||||
|
dbms.transaction.timeout=30s
|
||||||
|
```
|
||||||
|
|
||||||
|
### Memory Sizing Guidelines
|
||||||
|
|
||||||
|
| Deployment Size | Heap | Page Cache | Total Neo4j | ORLY Pool |
|
||||||
|
|-----------------|------|------------|-------------|-----------|
|
||||||
|
| Development | 512m | 256m | ~1GB | 10 |
|
||||||
|
| Small relay (<100k events) | 1g | 512m | ~2GB | 25 |
|
||||||
|
| Medium relay (<1M events) | 2g | 1g | ~4GB | 50 |
|
||||||
|
| Large relay (>1M events) | 4g | 2g | ~8GB | 100 |
|
||||||
|
|
||||||
|
**Formula for Page Cache:**
|
||||||
|
```
|
||||||
|
Page Cache = Data Size on Disk × 1.2
|
||||||
|
```
|
||||||
|
|
||||||
|
Use `neo4j-admin server memory-recommendation` inside the container to get tailored recommendations.
|
||||||
|
|
||||||
|
### Monitoring Memory Usage
|
||||||
|
|
||||||
|
**Check Neo4j memory from relay logs:**
|
||||||
|
```bash
|
||||||
|
# Driver config is logged at startup
|
||||||
|
grep "connecting to neo4j" /path/to/orly.log
|
||||||
|
# Output: connecting to neo4j at bolt://... (pool=25, fetch=1000, txRetry=30s)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Check Neo4j server memory:**
|
||||||
|
```bash
|
||||||
|
# Inside Neo4j container
|
||||||
|
docker exec neo4j neo4j-admin server memory-recommendation
|
||||||
|
|
||||||
|
# Or query via Cypher
|
||||||
|
CALL dbms.listPools() YIELD pool, heapMemoryUsed, heapMemoryUsedBytes
|
||||||
|
RETURN pool, heapMemoryUsed
|
||||||
|
```
|
||||||
|
|
||||||
|
**Monitor transaction memory:**
|
||||||
|
```cypher
|
||||||
|
CALL dbms.listTransactions()
|
||||||
|
YIELD transactionId, currentQuery, allocatedBytes
|
||||||
|
RETURN transactionId, currentQuery, allocatedBytes
|
||||||
|
ORDER BY allocatedBytes DESC
|
||||||
|
```
|
||||||
|
|
||||||
## Implementation Details
|
## Implementation Details
|
||||||
|
|
||||||
|
|||||||
6
go.mod
6
go.mod
@@ -5,7 +5,9 @@ go 1.25.3
|
|||||||
require (
|
require (
|
||||||
git.mleku.dev/mleku/nostr v1.0.11
|
git.mleku.dev/mleku/nostr v1.0.11
|
||||||
github.com/adrg/xdg v0.5.3
|
github.com/adrg/xdg v0.5.3
|
||||||
|
github.com/alexflint/go-arg v1.6.1
|
||||||
github.com/aperturerobotics/go-indexeddb v0.2.3
|
github.com/aperturerobotics/go-indexeddb v0.2.3
|
||||||
|
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0
|
||||||
github.com/dgraph-io/badger/v4 v4.8.0
|
github.com/dgraph-io/badger/v4 v4.8.0
|
||||||
github.com/gorilla/websocket v1.5.3
|
github.com/gorilla/websocket v1.5.3
|
||||||
github.com/hack-pad/safejs v0.1.1
|
github.com/hack-pad/safejs v0.1.1
|
||||||
@@ -23,6 +25,7 @@ require (
|
|||||||
go.uber.org/atomic v1.11.0
|
go.uber.org/atomic v1.11.0
|
||||||
golang.org/x/crypto v0.46.0
|
golang.org/x/crypto v0.46.0
|
||||||
golang.org/x/lint v0.0.0-20241112194109-818c5a804067
|
golang.org/x/lint v0.0.0-20241112194109-818c5a804067
|
||||||
|
golang.zx2c4.com/wireguard v0.0.0-20250521234502-f333402bd9cb
|
||||||
honnef.co/go/tools v0.6.1
|
honnef.co/go/tools v0.6.1
|
||||||
lol.mleku.dev v1.0.5
|
lol.mleku.dev v1.0.5
|
||||||
lukechampine.com/frand v1.5.1
|
lukechampine.com/frand v1.5.1
|
||||||
@@ -31,7 +34,6 @@ require (
|
|||||||
require (
|
require (
|
||||||
github.com/BurntSushi/toml v1.5.0 // indirect
|
github.com/BurntSushi/toml v1.5.0 // indirect
|
||||||
github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 // indirect
|
github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 // indirect
|
||||||
github.com/alexflint/go-arg v1.6.1 // indirect
|
|
||||||
github.com/alexflint/go-scalar v1.2.0 // indirect
|
github.com/alexflint/go-scalar v1.2.0 // indirect
|
||||||
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
|
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
|
||||||
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect
|
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect
|
||||||
@@ -42,7 +44,6 @@ require (
|
|||||||
github.com/coder/websocket v1.8.12 // indirect
|
github.com/coder/websocket v1.8.12 // indirect
|
||||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||||
github.com/decred/dcrd/crypto/blake256 v1.1.0 // indirect
|
github.com/decred/dcrd/crypto/blake256 v1.1.0 // indirect
|
||||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
|
|
||||||
github.com/dgraph-io/ristretto/v2 v2.3.0 // indirect
|
github.com/dgraph-io/ristretto/v2 v2.3.0 // indirect
|
||||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||||
github.com/ebitengine/purego v0.9.1 // indirect
|
github.com/ebitengine/purego v0.9.1 // indirect
|
||||||
@@ -82,7 +83,6 @@ require (
|
|||||||
golang.org/x/time v0.7.0 // indirect
|
golang.org/x/time v0.7.0 // indirect
|
||||||
golang.org/x/tools v0.40.0 // indirect
|
golang.org/x/tools v0.40.0 // indirect
|
||||||
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
|
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
|
||||||
golang.zx2c4.com/wireguard v0.0.0-20250521234502-f333402bd9cb // indirect
|
|
||||||
google.golang.org/protobuf v1.36.10 // indirect
|
google.golang.org/protobuf v1.36.10 // indirect
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
gvisor.dev/gvisor v0.0.0-20250503011706-39ed1f5ac29c // indirect
|
gvisor.dev/gvisor v0.0.0-20250503011706-39ed1f5ac29c // indirect
|
||||||
|
|||||||
7
main.go
7
main.go
@@ -525,7 +525,8 @@ func makeDatabaseConfig(cfg *config.C) *database.DatabaseConfig {
|
|||||||
queryCacheDisabled,
|
queryCacheDisabled,
|
||||||
serialCachePubkeys, serialCacheEventIds,
|
serialCachePubkeys, serialCacheEventIds,
|
||||||
zstdLevel,
|
zstdLevel,
|
||||||
neo4jURI, neo4jUser, neo4jPassword := cfg.GetDatabaseConfigValues()
|
neo4jURI, neo4jUser, neo4jPassword,
|
||||||
|
neo4jMaxConnPoolSize, neo4jFetchSize, neo4jMaxTxRetrySeconds, neo4jQueryResultLimit := cfg.GetDatabaseConfigValues()
|
||||||
|
|
||||||
return &database.DatabaseConfig{
|
return &database.DatabaseConfig{
|
||||||
DataDir: dataDir,
|
DataDir: dataDir,
|
||||||
@@ -541,5 +542,9 @@ func makeDatabaseConfig(cfg *config.C) *database.DatabaseConfig {
|
|||||||
Neo4jURI: neo4jURI,
|
Neo4jURI: neo4jURI,
|
||||||
Neo4jUser: neo4jUser,
|
Neo4jUser: neo4jUser,
|
||||||
Neo4jPassword: neo4jPassword,
|
Neo4jPassword: neo4jPassword,
|
||||||
|
Neo4jMaxConnPoolSize: neo4jMaxConnPoolSize,
|
||||||
|
Neo4jFetchSize: neo4jFetchSize,
|
||||||
|
Neo4jMaxTxRetrySeconds: neo4jMaxTxRetrySeconds,
|
||||||
|
Neo4jQueryResultLimit: neo4jQueryResultLimit,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -35,6 +35,12 @@ type DatabaseConfig struct {
|
|||||||
Neo4jURI string // ORLY_NEO4J_URI
|
Neo4jURI string // ORLY_NEO4J_URI
|
||||||
Neo4jUser string // ORLY_NEO4J_USER
|
Neo4jUser string // ORLY_NEO4J_USER
|
||||||
Neo4jPassword string // ORLY_NEO4J_PASSWORD
|
Neo4jPassword string // ORLY_NEO4J_PASSWORD
|
||||||
|
|
||||||
|
// Neo4j driver tuning (memory and connection management)
|
||||||
|
Neo4jMaxConnPoolSize int // ORLY_NEO4J_MAX_CONN_POOL - max connection pool size (default: 25)
|
||||||
|
Neo4jFetchSize int // ORLY_NEO4J_FETCH_SIZE - max records per fetch batch (default: 1000)
|
||||||
|
Neo4jMaxTxRetrySeconds int // ORLY_NEO4J_MAX_TX_RETRY_SEC - max transaction retry time (default: 30)
|
||||||
|
Neo4jQueryResultLimit int // ORLY_NEO4J_QUERY_RESULT_LIMIT - max results per query (default: 10000, 0=unlimited)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDatabase creates a database instance based on the specified type.
|
// NewDatabase creates a database instance based on the specified type.
|
||||||
|
|||||||
@@ -31,6 +31,12 @@ type DatabaseConfig struct {
|
|||||||
Neo4jURI string // ORLY_NEO4J_URI
|
Neo4jURI string // ORLY_NEO4J_URI
|
||||||
Neo4jUser string // ORLY_NEO4J_USER
|
Neo4jUser string // ORLY_NEO4J_USER
|
||||||
Neo4jPassword string // ORLY_NEO4J_PASSWORD
|
Neo4jPassword string // ORLY_NEO4J_PASSWORD
|
||||||
|
|
||||||
|
// Neo4j driver tuning (memory and connection management)
|
||||||
|
Neo4jMaxConnPoolSize int // ORLY_NEO4J_MAX_CONN_POOL - max connection pool size (default: 25)
|
||||||
|
Neo4jFetchSize int // ORLY_NEO4J_FETCH_SIZE - max records per fetch batch (default: 1000)
|
||||||
|
Neo4jMaxTxRetrySeconds int // ORLY_NEO4J_MAX_TX_RETRY_SEC - max transaction retry time (default: 30)
|
||||||
|
Neo4jQueryResultLimit int // ORLY_NEO4J_QUERY_RESULT_LIMIT - max results per query (default: 10000, 0=unlimited)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDatabase creates a database instance based on the specified type.
|
// NewDatabase creates a database instance based on the specified type.
|
||||||
|
|||||||
@@ -20,9 +20,14 @@ import (
|
|||||||
"next.orly.dev/pkg/utils/apputil"
|
"next.orly.dev/pkg/utils/apputil"
|
||||||
)
|
)
|
||||||
|
|
||||||
// maxConcurrentQueries limits the number of concurrent Neo4j queries to prevent
|
// Default configuration values (used when config values are 0 or not set)
|
||||||
// authentication rate limiting and connection exhaustion
|
const (
|
||||||
const maxConcurrentQueries = 10
|
defaultMaxConcurrentQueries = 10
|
||||||
|
defaultMaxConnPoolSize = 25
|
||||||
|
defaultFetchSize = 1000
|
||||||
|
defaultMaxTxRetrySeconds = 30
|
||||||
|
defaultQueryResultLimit = 10000
|
||||||
|
)
|
||||||
|
|
||||||
// maxRetryAttempts is the maximum number of times to retry a query on rate limit
|
// maxRetryAttempts is the maximum number of times to retry a query on rate limit
|
||||||
const maxRetryAttempts = 3
|
const maxRetryAttempts = 3
|
||||||
@@ -45,6 +50,12 @@ type N struct {
|
|||||||
neo4jUser string
|
neo4jUser string
|
||||||
neo4jPassword string
|
neo4jPassword string
|
||||||
|
|
||||||
|
// Driver tuning options
|
||||||
|
maxConnPoolSize int // max connections in pool
|
||||||
|
fetchSize int // records per fetch batch
|
||||||
|
maxTxRetryTime time.Duration
|
||||||
|
queryResultLimit int // max results per query (0=unlimited)
|
||||||
|
|
||||||
ready chan struct{} // Closed when database is ready to serve requests
|
ready chan struct{} // Closed when database is ready to serve requests
|
||||||
|
|
||||||
// querySem limits concurrent queries to prevent rate limiting
|
// querySem limits concurrent queries to prevent rate limiting
|
||||||
@@ -118,6 +129,24 @@ func NewWithConfig(
|
|||||||
neo4jPassword = "password"
|
neo4jPassword = "password"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Apply defaults for driver tuning options
|
||||||
|
maxConnPoolSize := cfg.Neo4jMaxConnPoolSize
|
||||||
|
if maxConnPoolSize <= 0 {
|
||||||
|
maxConnPoolSize = defaultMaxConnPoolSize
|
||||||
|
}
|
||||||
|
fetchSize := cfg.Neo4jFetchSize
|
||||||
|
if fetchSize == 0 {
|
||||||
|
fetchSize = defaultFetchSize
|
||||||
|
}
|
||||||
|
maxTxRetrySeconds := cfg.Neo4jMaxTxRetrySeconds
|
||||||
|
if maxTxRetrySeconds <= 0 {
|
||||||
|
maxTxRetrySeconds = defaultMaxTxRetrySeconds
|
||||||
|
}
|
||||||
|
queryResultLimit := cfg.Neo4jQueryResultLimit
|
||||||
|
if queryResultLimit == 0 {
|
||||||
|
queryResultLimit = defaultQueryResultLimit
|
||||||
|
}
|
||||||
|
|
||||||
n = &N{
|
n = &N{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
@@ -126,8 +155,12 @@ func NewWithConfig(
|
|||||||
neo4jURI: neo4jURI,
|
neo4jURI: neo4jURI,
|
||||||
neo4jUser: neo4jUser,
|
neo4jUser: neo4jUser,
|
||||||
neo4jPassword: neo4jPassword,
|
neo4jPassword: neo4jPassword,
|
||||||
|
maxConnPoolSize: maxConnPoolSize,
|
||||||
|
fetchSize: fetchSize,
|
||||||
|
maxTxRetryTime: time.Duration(maxTxRetrySeconds) * time.Second,
|
||||||
|
queryResultLimit: queryResultLimit,
|
||||||
ready: make(chan struct{}),
|
ready: make(chan struct{}),
|
||||||
querySem: make(chan struct{}, maxConcurrentQueries),
|
querySem: make(chan struct{}, defaultMaxConcurrentQueries),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure the data directory exists
|
// Ensure the data directory exists
|
||||||
@@ -191,12 +224,24 @@ func New(
|
|||||||
|
|
||||||
// initNeo4jClient establishes connection to Neo4j server
|
// initNeo4jClient establishes connection to Neo4j server
|
||||||
func (n *N) initNeo4jClient() error {
|
func (n *N) initNeo4jClient() error {
|
||||||
n.Logger.Infof("connecting to neo4j at %s", n.neo4jURI)
|
n.Logger.Infof("connecting to neo4j at %s (pool=%d, fetch=%d, txRetry=%v)",
|
||||||
|
n.neo4jURI, n.maxConnPoolSize, n.fetchSize, n.maxTxRetryTime)
|
||||||
|
|
||||||
// Create Neo4j driver
|
// Create Neo4j driver with tuned configuration
|
||||||
driver, err := neo4j.NewDriverWithContext(
|
driver, err := neo4j.NewDriverWithContext(
|
||||||
n.neo4jURI,
|
n.neo4jURI,
|
||||||
neo4j.BasicAuth(n.neo4jUser, n.neo4jPassword, ""),
|
neo4j.BasicAuth(n.neo4jUser, n.neo4jPassword, ""),
|
||||||
|
func(config *neo4j.Config) {
|
||||||
|
// Limit connection pool size to reduce memory usage
|
||||||
|
config.MaxConnectionPoolSize = n.maxConnPoolSize
|
||||||
|
|
||||||
|
// Set fetch size to batch records and prevent memory overflow
|
||||||
|
// -1 means fetch all (driver default), positive value limits batch size
|
||||||
|
config.FetchSize = n.fetchSize
|
||||||
|
|
||||||
|
// Set max transaction retry time
|
||||||
|
config.MaxTransactionRetryTime = n.maxTxRetryTime
|
||||||
|
},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create neo4j driver: %w", err)
|
return fmt.Errorf("failed to create neo4j driver: %w", err)
|
||||||
@@ -462,3 +507,19 @@ func (n *N) QuerySem() chan struct{} {
|
|||||||
func (n *N) MaxConcurrentQueries() int {
|
func (n *N) MaxConcurrentQueries() int {
|
||||||
return cap(n.querySem)
|
return cap(n.querySem)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueryResultLimit returns the configured maximum results per query.
|
||||||
|
// Returns 0 if unlimited (no limit applied).
|
||||||
|
func (n *N) QueryResultLimit() int {
|
||||||
|
return n.queryResultLimit
|
||||||
|
}
|
||||||
|
|
||||||
|
// FetchSize returns the configured fetch batch size.
|
||||||
|
func (n *N) FetchSize() int {
|
||||||
|
return n.fetchSize
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxConnPoolSize returns the configured connection pool size.
|
||||||
|
func (n *N) MaxConnPoolSize() int {
|
||||||
|
return n.maxConnPoolSize
|
||||||
|
}
|
||||||
|
|||||||
@@ -223,10 +223,25 @@ RETURN e.id AS id,
|
|||||||
// Add ordering (most recent first)
|
// Add ordering (most recent first)
|
||||||
orderClause := " ORDER BY e.created_at DESC"
|
orderClause := " ORDER BY e.created_at DESC"
|
||||||
|
|
||||||
// Add limit if specified
|
// Add limit - use the smaller of requested limit and configured max limit
|
||||||
|
// This prevents unbounded queries that could exhaust memory
|
||||||
limitClause := ""
|
limitClause := ""
|
||||||
|
requestedLimit := 0
|
||||||
if f.Limit != nil && *f.Limit > 0 {
|
if f.Limit != nil && *f.Limit > 0 {
|
||||||
params["limit"] = *f.Limit
|
requestedLimit = int(*f.Limit)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply the configured query result limit as a safety cap
|
||||||
|
// If queryResultLimit is 0 (unlimited), only use the requested limit
|
||||||
|
effectiveLimit := requestedLimit
|
||||||
|
if n.queryResultLimit > 0 {
|
||||||
|
if effectiveLimit == 0 || effectiveLimit > n.queryResultLimit {
|
||||||
|
effectiveLimit = n.queryResultLimit
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if effectiveLimit > 0 {
|
||||||
|
params["limit"] = effectiveLimit
|
||||||
limitClause = " LIMIT $limit"
|
limitClause = " LIMIT $limit"
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -358,11 +373,16 @@ func (n *N) QueryForSerials(c context.Context, f *filter.F) (
|
|||||||
return nil, fmt.Errorf("invalid query structure")
|
return nil, fmt.Errorf("invalid query structure")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rebuild query with serial-only return
|
// Rebuild query with serial-only return, preserving ORDER BY and LIMIT
|
||||||
cypher = cypherParts[0] + returnClause
|
cypher = cypherParts[0] + returnClause
|
||||||
if strings.Contains(cypherParts[1], "ORDER BY") {
|
remainder := cypherParts[1]
|
||||||
orderPart := " ORDER BY" + strings.Split(cypherParts[1], "ORDER BY")[1]
|
if strings.Contains(remainder, "ORDER BY") {
|
||||||
cypher += orderPart
|
orderAndLimit := " ORDER BY" + strings.Split(remainder, "ORDER BY")[1]
|
||||||
|
cypher += orderAndLimit
|
||||||
|
} else if strings.Contains(remainder, "LIMIT") {
|
||||||
|
// No ORDER BY but has LIMIT
|
||||||
|
limitPart := " LIMIT" + strings.Split(remainder, "LIMIT")[1]
|
||||||
|
cypher += limitPart
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err := n.ExecuteRead(c, cypher, params)
|
result, err := n.ExecuteRead(c, cypher, params)
|
||||||
@@ -417,10 +437,16 @@ func (n *N) QueryForIds(c context.Context, f *filter.F) (
|
|||||||
return nil, fmt.Errorf("invalid query structure")
|
return nil, fmt.Errorf("invalid query structure")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Rebuild query preserving ORDER BY and LIMIT
|
||||||
cypher = cypherParts[0] + returnClause
|
cypher = cypherParts[0] + returnClause
|
||||||
if strings.Contains(cypherParts[1], "ORDER BY") {
|
remainder := cypherParts[1]
|
||||||
orderPart := " ORDER BY" + strings.Split(cypherParts[1], "ORDER BY")[1]
|
if strings.Contains(remainder, "ORDER BY") {
|
||||||
cypher += orderPart
|
orderAndLimit := " ORDER BY" + strings.Split(remainder, "ORDER BY")[1]
|
||||||
|
cypher += orderAndLimit
|
||||||
|
} else if strings.Contains(remainder, "LIMIT") {
|
||||||
|
// No ORDER BY but has LIMIT
|
||||||
|
limitPart := " LIMIT" + strings.Split(remainder, "LIMIT")[1]
|
||||||
|
cypher += limitPart
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err := n.ExecuteRead(c, cypher, params)
|
result, err := n.ExecuteRead(c, cypher, params)
|
||||||
|
|||||||
@@ -24,6 +24,11 @@ type WriteChanSetter interface {
|
|||||||
GetWriteChan(*websocket.Conn) (chan WriteRequest, bool)
|
GetWriteChan(*websocket.Conn) (chan WriteRequest, bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NIP46SignerChecker defines the interface for checking active NIP-46 signers
|
||||||
|
type NIP46SignerChecker interface {
|
||||||
|
HasActiveNIP46Signer(signerPubkey []byte) bool
|
||||||
|
}
|
||||||
|
|
||||||
// S is the control structure for the subscription management scheme.
|
// S is the control structure for the subscription management scheme.
|
||||||
type S struct {
|
type S struct {
|
||||||
publisher.Publishers
|
publisher.Publishers
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
v0.42.0
|
v0.43.0
|
||||||
|
|||||||
Reference in New Issue
Block a user