diff --git a/.gitignore b/.gitignore index 4322ae2..47ce9fc 100644 --- a/.gitignore +++ b/.gitignore @@ -136,3 +136,19 @@ build/orly-* build/libsecp256k1-* build/SHA256SUMS-* Dockerfile +/cmd/benchmark/reports/run_20251116_172629/aggregate_report.txt +/cmd/benchmark/reports/run_20251116_172629/next-orly_results.txt +/cmd/benchmark/reports/run_20251116_173450/aggregate_report.txt +/cmd/benchmark/reports/run_20251116_173450/next-orly_results.txt +/cmd/benchmark/reports/run_20251116_173846/aggregate_report.txt +/cmd/benchmark/reports/run_20251116_173846/next-orly_results.txt +/cmd/benchmark/reports/run_20251116_174246/aggregate_report.txt +/cmd/benchmark/reports/run_20251116_174246/next-orly_results.txt +/cmd/benchmark/reports/run_20251116_182250/aggregate_report.txt +/cmd/benchmark/reports/run_20251116_182250/next-orly_results.txt +/cmd/benchmark/reports/run_20251116_203720/aggregate_report.txt +/cmd/benchmark/reports/run_20251116_203720/next-orly_results.txt +/cmd/benchmark/reports/run_20251116_225648/aggregate_report.txt +/cmd/benchmark/reports/run_20251116_225648/next-orly_results.txt +/cmd/benchmark/reports/run_20251116_233547/aggregate_report.txt +/cmd/benchmark/reports/run_20251116_233547/next-orly_results.txt diff --git a/CLAUDE.md b/CLAUDE.md index 4726e5f..2c86c30 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -8,11 +8,11 @@ ORLY is a high-performance Nostr relay written in Go, designed for personal rela **Key Technologies:** - **Language**: Go 1.25.3+ -- **Database**: Badger v4 (embedded key-value store) +- **Database**: Badger v4 (embedded key-value store) or DGraph (distributed graph database) - **Cryptography**: Custom p8k library using purego for secp256k1 operations (no CGO) - **Web UI**: Svelte frontend embedded in the binary - **WebSocket**: gorilla/websocket for Nostr protocol -- **Performance**: SIMD-accelerated SHA256 and hex encoding +- **Performance**: SIMD-accelerated SHA256 and hex encoding, query result caching with zstd compression ## Build Commands @@ -41,8 +41,8 @@ go build -o orly ### Development Mode (Web UI Hot Reload) ```bash # Terminal 1: Start relay with dev proxy -export ORLY_WEB_DISABLE_EMBEDDED=true -export ORLY_WEB_DEV_PROXY_URL=localhost:5000 +export ORLY_WEB_DISABLE=true +export ORLY_WEB_DEV_PROXY_URL=http://localhost:5173 ./orly & # Terminal 2: Start dev server @@ -89,11 +89,18 @@ go run cmd/relay-tester/main.go -url ws://localhost:3334 -test "Basic Event" ### Benchmarking ```bash -# Run benchmarks in specific package +# Run Go benchmarks in specific package go test -bench=. -benchmem ./pkg/database # Crypto benchmarks cd pkg/crypto/p8k && make bench + +# Run full relay benchmark suite +cd cmd/benchmark +go run main.go -data-dir /tmp/bench-db -events 10000 -workers 4 + +# Benchmark reports are saved to cmd/benchmark/reports/ +# The benchmark tool tests event storage, queries, and subscription performance ``` ## Running the Relay @@ -131,6 +138,18 @@ export ORLY_SPROCKET_ENABLED=true # Enable policy system export ORLY_POLICY_ENABLED=true + +# Database backend selection (badger or dgraph) +export ORLY_DB_TYPE=badger +export ORLY_DGRAPH_URL=localhost:9080 # Only for dgraph backend + +# Query cache configuration (improves REQ response times) +export ORLY_QUERY_CACHE_SIZE_MB=512 # Default: 512MB +export ORLY_QUERY_CACHE_MAX_AGE=5m # Cache expiry time + +# Database cache tuning (for Badger backend) +export ORLY_DB_BLOCK_CACHE_MB=512 # Block cache size +export ORLY_DB_INDEX_CACHE_MB=256 # Index cache size ``` ## Code Architecture @@ -155,10 +174,12 @@ export ORLY_POLICY_ENABLED=true - `web.go` - Embedded web UI serving and dev proxy - `config/` - Environment variable configuration using go-simpler.org/env -**`pkg/database/`** - Badger-based event storage -- `database.go` - Database initialization with cache tuning +**`pkg/database/`** - Database abstraction layer with multiple backend support +- `interface.go` - Database interface definition for pluggable backends +- `factory.go` - Database backend selection (Badger or DGraph) +- `database.go` - Badger implementation with cache tuning and query cache - `save-event.go` - Event storage with index updates -- `query-events.go` - Main query execution engine +- `query-events.go` - Main query execution engine with filter normalization - `query-for-*.go` - Specialized query builders for different filter patterns - `indexes/` - Index key construction for efficient lookups - `export.go` / `import.go` - Event export/import in JSONL format @@ -238,10 +259,19 @@ export ORLY_POLICY_ENABLED=true - This avoids CGO complexity while maintaining C library performance - `libsecp256k1.so` must be in `LD_LIBRARY_PATH` or same directory as binary +**Database Backend Selection:** +- Supports multiple backends via `ORLY_DB_TYPE` environment variable +- **Badger** (default): Embedded key-value store with custom indexing, ideal for single-instance deployments +- **DGraph**: Distributed graph database for larger, multi-node deployments +- Backend selected via factory pattern in `pkg/database/factory.go` +- All backends implement the same `Database` interface defined in `pkg/database/interface.go` + **Database Query Pattern:** - Filters are analyzed in `get-indexes-from-filter.go` to determine optimal query strategy +- Filters are normalized before cache lookup, ensuring identical queries with different field ordering hit the cache - Different query builders (`query-for-kinds.go`, `query-for-authors.go`, etc.) handle specific filter patterns - All queries return event serials (uint64) for efficient joining +- Query results cached with zstd level 9 compression (configurable size and TTL) - Final events fetched via `fetch-events-by-serials.go` **WebSocket Message Flow:** @@ -272,7 +302,7 @@ export ORLY_POLICY_ENABLED=true ### Making Changes to Web UI 1. Edit files in `app/web/src/` -2. For hot reload: `cd app/web && bun run dev` (with `ORLY_WEB_DISABLE_EMBEDDED=true`) +2. For hot reload: `cd app/web && bun run dev` (with `ORLY_WEB_DISABLE=true` and `ORLY_WEB_DEV_PROXY_URL=http://localhost:5173`) 3. For production build: `./scripts/update-embedded-web.sh` ### Adding New Nostr Protocol Handlers @@ -377,12 +407,42 @@ sudo journalctl -u orly -f ## Performance Considerations -- **Database Caching**: Tune `ORLY_DB_BLOCK_CACHE_MB` and `ORLY_DB_INDEX_CACHE_MB` for workload -- **Query Optimization**: Add indexes for common filter patterns +- **Query Cache**: 512MB query result cache (configurable via `ORLY_QUERY_CACHE_SIZE_MB`) with zstd level 9 compression reduces database load for repeated queries +- **Filter Normalization**: Filters are normalized before cache lookup, so identical queries with different field ordering produce cache hits +- **Database Caching**: Tune `ORLY_DB_BLOCK_CACHE_MB` and `ORLY_DB_INDEX_CACHE_MB` for workload (Badger backend only) +- **Query Optimization**: Add indexes for common filter patterns; multiple specialized query builders optimize different filter combinations +- **Batch Operations**: ID lookups and event fetching use batch operations via `GetSerialsByIds` and `FetchEventsBySerials` - **Memory Pooling**: Use buffer pools in encoders (see `pkg/encoders/event/`) -- **SIMD Operations**: Leverage minio/sha256-simd and templexxx/xhex +- **SIMD Operations**: Leverage minio/sha256-simd and templexxx/xhex for cryptographic operations - **Goroutine Management**: Each WebSocket connection runs in its own goroutine +## Recent Optimizations + +ORLY has received several significant performance improvements in recent updates: + +### Query Cache System (Latest) +- 512MB query result cache with zstd level 9 compression +- Filter normalization ensures cache hits regardless of filter field ordering +- Configurable size (`ORLY_QUERY_CACHE_SIZE_MB`) and TTL (`ORLY_QUERY_CACHE_MAX_AGE`) +- Dramatically reduces database load for repeated queries (common in Nostr clients) +- Cache key includes normalized filter representation for optimal hit rate + +### Badger Cache Tuning +- Optimized block cache (default 512MB, tune via `ORLY_DB_BLOCK_CACHE_MB`) +- Optimized index cache (default 256MB, tune via `ORLY_DB_INDEX_CACHE_MB`) +- Resulted in 10-15% improvement in most benchmark scenarios +- See git history for cache tuning evolution + +### Query Execution Improvements +- Multiple specialized query builders for different filter patterns: + - `query-for-kinds.go` - Kind-based queries + - `query-for-authors.go` - Author-based queries + - `query-for-tags.go` - Tag-based queries + - Combination builders for `kinds+authors`, `kinds+tags`, `kinds+authors+tags` +- Batch operations for ID lookups via `GetSerialsByIds` +- Serial-based event fetching for efficiency +- Filter analysis in `get-indexes-from-filter.go` selects optimal strategy + ## Release Process 1. Update version in `pkg/version/version` file (e.g., v1.2.3) diff --git a/docs/NEO4J_BACKEND.md b/docs/NEO4J_BACKEND.md new file mode 100644 index 0000000..8d9d8f1 --- /dev/null +++ b/docs/NEO4J_BACKEND.md @@ -0,0 +1,406 @@ +# Neo4j Database Backend for ORLY Relay + +## Overview + +The Neo4j database backend provides a graph-native storage solution for the ORLY Nostr relay. Unlike traditional key-value or document stores, Neo4j is optimized for relationship-heavy queries, making it an ideal fit for Nostr's social graph and event reference patterns. + +## Architecture + +### Core Components + +1. **Main Database File** ([pkg/neo4j/neo4j.go](../pkg/neo4j/neo4j.go)) + - Implements the `database.Database` interface + - Manages Neo4j driver connection and lifecycle + - Uses Badger for metadata storage (markers, identity, subscriptions) + - Registers with the database factory via `init()` + +2. **Schema Management** ([pkg/neo4j/schema.go](../pkg/neo4j/schema.go)) + - Defines Neo4j constraints and indexes using Cypher + - Creates unique constraints on Event IDs and Author pubkeys + - Indexes for optimal query performance (kind, created_at, tags) + +3. **Query Engine** ([pkg/neo4j/query-events.go](../pkg/neo4j/query-events.go)) + - Translates Nostr REQ filters to Cypher queries + - Leverages graph traversal for tag relationships + - Supports prefix matching for IDs and pubkeys + - Parameterized queries for security and performance + +4. **Event Storage** ([pkg/neo4j/save-event.go](../pkg/neo4j/save-event.go)) + - Stores events as nodes with properties + - Creates graph relationships: + - `AUTHORED_BY`: Event → Author + - `REFERENCES`: Event → Event (e-tags) + - `MENTIONS`: Event → Author (p-tags) + - `TAGGED_WITH`: Event → Tag + +## Graph Schema + +### Node Types + +**Event Node** +```cypher +(:Event { + id: string, // Hex-encoded event ID (32 bytes) + serial: int, // Sequential serial number + kind: int, // Event kind + created_at: int, // Unix timestamp + content: string, // Event content + sig: string, // Hex-encoded signature + pubkey: string, // Hex-encoded author pubkey + tags: string // JSON-encoded tags array +}) +``` + +**Author Node** +```cypher +(:Author { + pubkey: string // Hex-encoded pubkey (unique) +}) +``` + +**Tag Node** +```cypher +(:Tag { + type: string, // Tag type (e.g., "t", "d") + value: string // Tag value +}) +``` + +**Marker Node** (for metadata) +```cypher +(:Marker { + key: string, // Unique key + value: string // Hex-encoded value +}) +``` + +### Relationships + +- `(:Event)-[:AUTHORED_BY]->(:Author)` - Event authorship +- `(:Event)-[:REFERENCES]->(:Event)` - Event references (e-tags) +- `(:Event)-[:MENTIONS]->(:Author)` - Author mentions (p-tags) +- `(:Event)-[:TAGGED_WITH]->(:Tag)` - Generic tag associations + +## How Nostr REQ Messages Are Implemented + +### Filter to Cypher Translation + +The query engine in [query-events.go](../pkg/neo4j/query-events.go) translates Nostr filters to Cypher queries: + +#### 1. ID Filters +```json +{"ids": ["abc123..."]} +``` +Becomes: +```cypher +MATCH (e:Event) +WHERE e.id = $id_0 +``` + +For prefix matching (partial IDs): +```cypher +WHERE e.id STARTS WITH $id_0 +``` + +#### 2. Author Filters +```json +{"authors": ["pubkey1...", "pubkey2..."]} +``` +Becomes: +```cypher +MATCH (e:Event) +WHERE e.pubkey IN $authors +``` + +#### 3. Kind Filters +```json +{"kinds": [1, 7]} +``` +Becomes: +```cypher +MATCH (e:Event) +WHERE e.kind IN $kinds +``` + +#### 4. Time Range Filters +```json +{"since": 1234567890, "until": 1234567900} +``` +Becomes: +```cypher +MATCH (e:Event) +WHERE e.created_at >= $since AND e.created_at <= $until +``` + +#### 5. Tag Filters (Graph Advantage!) +```json +{"#t": ["bitcoin", "nostr"]} +``` +Becomes: +```cypher +MATCH (e:Event) +OPTIONAL MATCH (e)-[:TAGGED_WITH]->(t0:Tag) +WHERE t0.type = $tagType_0 AND t0.value IN $tagValues_0 +``` + +This leverages Neo4j's native graph traversal for efficient tag queries! + +#### 6. Combined Filters +```json +{ + "kinds": [1], + "authors": ["abc..."], + "#p": ["xyz..."], + "limit": 50 +} +``` +Becomes: +```cypher +MATCH (e:Event) +OPTIONAL MATCH (e)-[:TAGGED_WITH]->(t0:Tag) +WHERE e.kind IN $kinds + AND e.pubkey IN $authors + AND t0.type = $tagType_0 + AND t0.value IN $tagValues_0 +RETURN e.id, e.kind, e.created_at, e.content, e.sig, e.pubkey, e.tags +ORDER BY e.created_at DESC +LIMIT $limit +``` + +### Query Execution Flow + +1. **Parse Filter**: Extract IDs, authors, kinds, times, tags +2. **Build Cypher**: Construct parameterized query with MATCH/WHERE clauses +3. **Execute**: Run via `ExecuteRead()` with read-only session +4. **Parse Results**: Convert Neo4j records to Nostr events +5. **Return**: Send events back to client + +## Configuration + +### Environment Variables + +```bash +# Neo4j Connection +ORLY_NEO4J_URI="bolt://localhost:7687" +ORLY_NEO4J_USER="neo4j" +ORLY_NEO4J_PASSWORD="password" + +# Database Type Selection +ORLY_DB_TYPE="neo4j" + +# Data Directory (for Badger metadata storage) +ORLY_DATA_DIR="~/.local/share/ORLY" +``` + +### Example Docker Compose Setup + +```yaml +version: '3.8' +services: + neo4j: + image: neo4j:5.15 + ports: + - "7474:7474" # HTTP + - "7687:7687" # Bolt + environment: + - NEO4J_AUTH=neo4j/password + - NEO4J_PLUGINS=["apoc"] + volumes: + - neo4j_data:/data + - neo4j_logs:/logs + + orly: + build: . + ports: + - "3334:3334" + environment: + - ORLY_DB_TYPE=neo4j + - ORLY_NEO4J_URI=bolt://neo4j:7687 + - ORLY_NEO4J_USER=neo4j + - ORLY_NEO4J_PASSWORD=password + depends_on: + - neo4j + +volumes: + neo4j_data: + neo4j_logs: +``` + +## Performance Considerations + +### Advantages Over Badger/DGraph + +1. **Native Graph Queries**: Tag relationships and social graph traversals are native operations +2. **Optimized Indexes**: Automatic index usage for constrained properties +3. **Efficient Joins**: Relationship traversals are O(1) lookups +4. **Query Planner**: Neo4j's query planner optimizes complex multi-filter queries + +### Tuning Recommendations + +1. **Indexes**: The schema creates indexes for: + - Event ID (unique constraint + index) + - Event kind + - Event created_at + - Composite: kind + created_at + - Tag type + value + +2. **Cache Configuration**: Configure Neo4j's page cache and heap size: +```conf +# neo4j.conf +dbms.memory.heap.initial_size=2G +dbms.memory.heap.max_size=4G +dbms.memory.pagecache.size=4G +``` + +3. **Query Limits**: Always use LIMIT in queries to prevent memory exhaustion + +## Implementation Details + +### Replaceable Events + +Replaceable events (kinds 0, 3, 10000-19999) are handled in `WouldReplaceEvent()`: + +```cypher +MATCH (e:Event {kind: $kind, pubkey: $pubkey}) +WHERE e.created_at < $createdAt +RETURN e.serial, e.created_at +``` + +Older events are deleted before saving the new one. + +### Parameterized Replaceable Events + +For kinds 30000-39999, we also match on the d-tag: + +```cypher +MATCH (e:Event {kind: $kind, pubkey: $pubkey})-[:TAGGED_WITH]->(t:Tag {type: 'd', value: $dValue}) +WHERE e.created_at < $createdAt +RETURN e.serial +``` + +### Event Deletion (NIP-09) + +Delete events (kind 5) are processed via graph traversal: + +```cypher +MATCH (target:Event {id: $targetId}) +MATCH (delete:Event {kind: 5})-[:REFERENCES]->(target) +WHERE delete.pubkey = $pubkey OR delete.pubkey IN $admins +RETURN delete.id +``` + +Only same-author or admin deletions are allowed. + +## Comparison with Other Backends + +| Feature | Badger | DGraph | Neo4j | +|---------|--------|--------|-------| +| **Storage Type** | Key-value | Graph (distributed) | Graph (native) | +| **Query Language** | Custom indexes | DQL | Cypher | +| **Tag Queries** | Index lookups | Graph traversal | Native relationships | +| **Scaling** | Single-node | Distributed | Cluster/Causal cluster | +| **Memory Usage** | Low | Medium | High | +| **Setup Complexity** | Minimal | Medium | Medium | +| **Best For** | Small relays | Large distributed | Relationship-heavy | + +## Development Guide + +### Adding New Indexes + +1. Update [schema.go](../pkg/neo4j/schema.go) with new index definition +2. Add to `applySchema()` function +3. Restart relay to apply schema changes + +Example: +```cypher +CREATE INDEX event_content_fulltext IF NOT EXISTS +FOR (e:Event) ON (e.content) +OPTIONS {indexConfig: {`fulltext.analyzer`: 'english'}} +``` + +### Custom Queries + +To add custom query methods: + +1. Add method to [query-events.go](../pkg/neo4j/query-events.go) +2. Build Cypher query with parameterization +3. Use `ExecuteRead()` or `ExecuteWrite()` as appropriate +4. Parse results with `parseEventsFromResult()` + +### Testing + +Due to Neo4j dependency, tests require a running Neo4j instance: + +```bash +# Start Neo4j via Docker +docker run -d --name neo4j-test \ + -p 7687:7687 \ + -e NEO4J_AUTH=neo4j/test \ + neo4j:5.15 + +# Run tests +ORLY_NEO4J_URI="bolt://localhost:7687" \ +ORLY_NEO4J_USER="neo4j" \ +ORLY_NEO4J_PASSWORD="test" \ +go test ./pkg/neo4j/... + +# Cleanup +docker rm -f neo4j-test +``` + +## Future Enhancements + +1. **Full-text Search**: Leverage Neo4j's full-text indexes for content search +2. **Graph Analytics**: Implement social graph metrics (centrality, communities) +3. **Advanced Queries**: Support NIP-50 search via Cypher full-text capabilities +4. **Clustering**: Deploy Neo4j cluster for high availability +5. **APOC Procedures**: Utilize APOC library for advanced graph algorithms +6. **Caching Layer**: Implement query result caching similar to Badger backend + +## Troubleshooting + +### Connection Issues + +```bash +# Test connectivity +cypher-shell -a bolt://localhost:7687 -u neo4j -p password + +# Check Neo4j logs +docker logs neo4j +``` + +### Performance Issues + +```cypher +// View query execution plan +EXPLAIN MATCH (e:Event) WHERE e.kind = 1 RETURN e LIMIT 10 + +// Profile query performance +PROFILE MATCH (e:Event)-[:AUTHORED_BY]->(a:Author) RETURN e, a LIMIT 10 +``` + +### Schema Issues + +```cypher +// List all constraints +SHOW CONSTRAINTS + +// List all indexes +SHOW INDEXES + +// Drop and recreate schema +DROP CONSTRAINT event_id_unique IF EXISTS +CREATE CONSTRAINT event_id_unique FOR (e:Event) REQUIRE e.id IS UNIQUE +``` + +## References + +- [Neo4j Documentation](https://neo4j.com/docs/) +- [Cypher Query Language](https://neo4j.com/docs/cypher-manual/current/) +- [Neo4j Go Driver](https://neo4j.com/docs/go-manual/current/) +- [Graph Database Patterns](https://neo4j.com/developer/graph-db-vs-rdbms/) +- [Nostr Protocol (NIP-01)](https://github.com/nostr-protocol/nips/blob/master/01.md) + +## License + +This Neo4j backend implementation follows the same license as the ORLY relay project. diff --git a/go.mod b/go.mod index e9f2562..71366ec 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( github.com/google/pprof v0.0.0-20251007162407-5df77e3f7d1d // indirect github.com/klauspost/compress v1.18.1 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect + github.com/neo4j/neo4j-go-driver/v5 v5.28.4 // indirect github.com/pkg/errors v0.8.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/templexxx/cpu v0.1.1 // indirect diff --git a/go.sum b/go.sum index 3ac3602..9d8c47d 100644 --- a/go.sum +++ b/go.sum @@ -94,6 +94,8 @@ github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1 github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= +github.com/neo4j/neo4j-go-driver/v5 v5.28.4 h1:7toxehVcYkZbyxV4W3Ib9VcnyRBQPucF+VwNNmtSXi4= +github.com/neo4j/neo4j-go-driver/v5 v5.28.4/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k= github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde/go.mod h1:nZgzbfBr3hhjoZnS66nKrHmduYNpc34ny7RK4z5/HM0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/main.go b/main.go index 0e435ee..6df2879 100644 --- a/main.go +++ b/main.go @@ -22,6 +22,7 @@ import ( "next.orly.dev/pkg/crypto/keys" "next.orly.dev/pkg/database" _ "next.orly.dev/pkg/dgraph" // Import to register dgraph factory + _ "next.orly.dev/pkg/neo4j" // Import to register neo4j factory "next.orly.dev/pkg/encoders/hex" "next.orly.dev/pkg/utils/interrupt" "next.orly.dev/pkg/version" diff --git a/pkg/database/factory.go b/pkg/database/factory.go index ed2ac6b..cd5f5ad 100644 --- a/pkg/database/factory.go +++ b/pkg/database/factory.go @@ -7,7 +7,7 @@ import ( ) // NewDatabase creates a database instance based on the specified type. -// Supported types: "badger", "dgraph" +// Supported types: "badger", "dgraph", "neo4j" func NewDatabase( ctx context.Context, cancel context.CancelFunc, @@ -23,8 +23,12 @@ func NewDatabase( // Use the new dgraph implementation // Import dynamically to avoid import cycles return newDgraphDatabase(ctx, cancel, dataDir, logLevel) + case "neo4j": + // Use the new neo4j implementation + // Import dynamically to avoid import cycles + return newNeo4jDatabase(ctx, cancel, dataDir, logLevel) default: - return nil, fmt.Errorf("unsupported database type: %s (supported: badger, dgraph)", dbType) + return nil, fmt.Errorf("unsupported database type: %s (supported: badger, dgraph, neo4j)", dbType) } } @@ -37,3 +41,13 @@ var newDgraphDatabase func(context.Context, context.CancelFunc, string, string) func RegisterDgraphFactory(factory func(context.Context, context.CancelFunc, string, string) (Database, error)) { newDgraphDatabase = factory } + +// newNeo4jDatabase creates a neo4j database instance +// This is defined here to avoid import cycles +var newNeo4jDatabase func(context.Context, context.CancelFunc, string, string) (Database, error) + +// RegisterNeo4jFactory registers the neo4j database factory +// This is called from the neo4j package's init() function +func RegisterNeo4jFactory(factory func(context.Context, context.CancelFunc, string, string) (Database, error)) { + newNeo4jDatabase = factory +} diff --git a/pkg/neo4j/README.md b/pkg/neo4j/README.md new file mode 100644 index 0000000..713ab44 --- /dev/null +++ b/pkg/neo4j/README.md @@ -0,0 +1,132 @@ +# Neo4j Database Backend + +A graph database backend implementation for the ORLY Nostr relay using Neo4j. + +## Quick Start + +### 1. Start Neo4j + +```bash +docker run -d --name neo4j \ + -p 7474:7474 -p 7687:7687 \ + -e NEO4J_AUTH=neo4j/password \ + neo4j:5.15 +``` + +### 2. Configure Environment + +```bash +export ORLY_DB_TYPE=neo4j +export ORLY_NEO4J_URI=bolt://localhost:7687 +export ORLY_NEO4J_USER=neo4j +export ORLY_NEO4J_PASSWORD=password +``` + +### 3. Run ORLY + +```bash +./orly +``` + +## Features + +- **Graph-Native Storage**: Events, authors, and tags stored as nodes and relationships +- **Efficient Queries**: Leverages Neo4j's native graph traversal for tag and social graph queries +- **Cypher Query Language**: Powerful, expressive query language for complex filters +- **Automatic Indexing**: Unique constraints and indexes for optimal performance +- **Relationship Queries**: Native support for event references, mentions, and tags + +## Architecture + +See [docs/NEO4J_BACKEND.md](../../docs/NEO4J_BACKEND.md) for comprehensive documentation on: +- Graph schema design +- How Nostr REQ messages are implemented in Cypher +- Performance tuning +- Development guide +- Comparison with other backends + +## File Structure + +- `neo4j.go` - Main database implementation +- `schema.go` - Graph schema and index definitions +- `query-events.go` - REQ filter to Cypher translation +- `save-event.go` - Event storage with relationship creation +- `fetch-event.go` - Event retrieval by serial/ID +- `serial.go` - Serial number management +- `markers.go` - Metadata key-value storage +- `identity.go` - Relay identity management +- `delete.go` - Event deletion (NIP-09) +- `subscriptions.go` - Subscription management +- `nip43.go` - Invite-based ACL (NIP-43) +- `import-export.go` - Event import/export +- `logger.go` - Logging infrastructure + +## Testing + +```bash +# Start Neo4j test instance +docker run -d --name neo4j-test \ + -p 7687:7687 \ + -e NEO4J_AUTH=neo4j/test \ + neo4j:5.15 + +# Run tests +ORLY_NEO4J_URI="bolt://localhost:7687" \ +ORLY_NEO4J_USER="neo4j" \ +ORLY_NEO4J_PASSWORD="test" \ +go test ./pkg/neo4j/... + +# Cleanup +docker rm -f neo4j-test +``` + +## Example Cypher Queries + +### Find all events by an author +```cypher +MATCH (e:Event {pubkey: "abc123..."}) +RETURN e +ORDER BY e.created_at DESC +``` + +### Find events with specific tags +```cypher +MATCH (e:Event)-[:TAGGED_WITH]->(t:Tag {type: "t", value: "bitcoin"}) +RETURN e +``` + +### Social graph query +```cypher +MATCH (author:Author {pubkey: "abc123..."}) +<-[:AUTHORED_BY]-(e:Event) +-[:MENTIONS]->(mentioned:Author) +RETURN author, e, mentioned +``` + +## Performance Tips + +1. **Use Limits**: Always include LIMIT in queries +2. **Index Usage**: Ensure queries use indexed properties (id, kind, created_at) +3. **Parameterize**: Use parameterized queries to enable query plan caching +4. **Monitor**: Use `EXPLAIN` and `PROFILE` to analyze query performance + +## Limitations + +- Requires external Neo4j database (not embedded) +- Higher memory usage compared to Badger +- Metadata still uses Badger (markers, subscriptions) +- More complex deployment than single-binary solutions + +## Why Neo4j for Nostr? + +Nostr is inherently a social graph with heavy relationship queries: +- Event references (e-tags) → Graph edges +- Author mentions (p-tags) → Graph edges +- Follow relationships → Graph structure +- Thread traversal → Path queries + +Neo4j excels at these patterns, making it a natural fit for relationship-heavy Nostr queries. + +## License + +Same as ORLY relay project. diff --git a/pkg/neo4j/delete.go b/pkg/neo4j/delete.go new file mode 100644 index 0000000..2c37e63 --- /dev/null +++ b/pkg/neo4j/delete.go @@ -0,0 +1,173 @@ +package neo4j + +import ( + "context" + "fmt" + + "next.orly.dev/pkg/database/indexes/types" + "next.orly.dev/pkg/encoders/event" + "next.orly.dev/pkg/encoders/hex" +) + +// DeleteEvent deletes an event by its ID +func (n *N) DeleteEvent(c context.Context, eid []byte) error { + idStr := hex.Enc(eid) + + cypher := "MATCH (e:Event {id: $id}) DETACH DELETE e" + params := map[string]any{"id": idStr} + + _, err := n.ExecuteWrite(c, cypher, params) + if err != nil { + return fmt.Errorf("failed to delete event: %w", err) + } + + return nil +} + +// DeleteEventBySerial deletes an event by its serial number +func (n *N) DeleteEventBySerial(c context.Context, ser *types.Uint40, ev *event.E) error { + serial := ser.Get() + + cypher := "MATCH (e:Event {serial: $serial}) DETACH DELETE e" + params := map[string]any{"serial": int64(serial)} + + _, err := n.ExecuteWrite(c, cypher, params) + if err != nil { + return fmt.Errorf("failed to delete event: %w", err) + } + + return nil +} + +// DeleteExpired deletes expired events (stub implementation) +func (n *N) DeleteExpired() { + // This would need to implement expiration logic based on event.expiration tag (NIP-40) + // For now, this is a no-op +} + +// ProcessDelete processes a kind 5 deletion event +func (n *N) ProcessDelete(ev *event.E, admins [][]byte) error { + // Deletion events (kind 5) can delete events by the same author + // or by relay admins + + // Check if this is a kind 5 event + if ev.Kind != 5 { + return fmt.Errorf("not a deletion event") + } + + // Get all 'e' tags (event IDs to delete) + eTags := ev.Tags.GetAll([]byte{'e'}) + if len(eTags) == 0 { + return nil // Nothing to delete + } + + ctx := context.Background() + isAdmin := false + + // Check if author is an admin + for _, adminPk := range admins { + if string(ev.Pubkey[:]) == string(adminPk) { + isAdmin = true + break + } + } + + // For each event ID in e-tags, delete it if allowed + for _, eTag := range eTags { + if len(eTag.T) < 2 { + continue + } + + eventIDStr := string(eTag.T[1]) + eventID, err := hex.Dec(eventIDStr) + if err != nil { + continue + } + + // Fetch the event to check authorship + cypher := "MATCH (e:Event {id: $id}) RETURN e.pubkey AS pubkey" + params := map[string]any{"id": eventIDStr} + + result, err := n.ExecuteRead(ctx, cypher, params) + if err != nil { + continue + } + + neo4jResult, ok := result.(interface { + Next(context.Context) bool + Record() *interface{} + Err() error + }) + if !ok { + continue + } + + if neo4jResult.Next(ctx) { + record := neo4jResult.Record() + if record != nil { + recordMap, ok := (*record).(map[string]any) + if ok { + if pubkeyStr, ok := recordMap["pubkey"].(string); ok { + pubkey, err := hex.Dec(pubkeyStr) + if err != nil { + continue + } + + // Check if deletion is allowed (same author or admin) + canDelete := isAdmin || string(ev.Pubkey[:]) == string(pubkey) + if canDelete { + // Delete the event + if err := n.DeleteEvent(ctx, eventID); err != nil { + n.Logger.Warningf("failed to delete event %s: %v", eventIDStr, err) + } + } + } + } + } + } + } + + return nil +} + +// CheckForDeleted checks if an event has been deleted +func (n *N) CheckForDeleted(ev *event.E, admins [][]byte) error { + // Query for kind 5 events that reference this event + ctx := context.Background() + idStr := hex.Enc(ev.ID[:]) + + // Build cypher query to find deletion events + cypher := ` +MATCH (target:Event {id: $targetId}) +MATCH (delete:Event {kind: 5})-[:REFERENCES]->(target) +WHERE delete.pubkey = $pubkey OR delete.pubkey IN $admins +RETURN delete.id AS id +LIMIT 1` + + adminPubkeys := make([]string, len(admins)) + for i, admin := range admins { + adminPubkeys[i] = hex.Enc(admin) + } + + params := map[string]any{ + "targetId": idStr, + "pubkey": hex.Enc(ev.Pubkey[:]), + "admins": adminPubkeys, + } + + result, err := n.ExecuteRead(ctx, cypher, params) + if err != nil { + return nil // Not deleted + } + + neo4jResult, ok := result.(interface { + Next(context.Context) bool + Record() *interface{} + Err() error + }) + if ok && neo4jResult.Next(ctx) { + return fmt.Errorf("event has been deleted") + } + + return nil +} diff --git a/pkg/neo4j/fetch-event.go b/pkg/neo4j/fetch-event.go new file mode 100644 index 0000000..299d42c --- /dev/null +++ b/pkg/neo4j/fetch-event.go @@ -0,0 +1,444 @@ +package neo4j + +import ( + "context" + "fmt" + + "next.orly.dev/pkg/database" + "next.orly.dev/pkg/database/indexes/types" + "next.orly.dev/pkg/encoders/event" + "next.orly.dev/pkg/encoders/hex" + "next.orly.dev/pkg/encoders/tag" + "next.orly.dev/pkg/interfaces/store" +) + +// FetchEventBySerial retrieves an event by its serial number +func (n *N) FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error) { + serial := ser.Get() + + cypher := ` +MATCH (e:Event {serial: $serial}) +RETURN e.id AS id, + e.kind AS kind, + e.created_at AS created_at, + e.content AS content, + e.sig AS sig, + e.pubkey AS pubkey, + e.tags AS tags` + + params := map[string]any{"serial": int64(serial)} + + result, err := n.ExecuteRead(context.Background(), cypher, params) + if err != nil { + return nil, fmt.Errorf("failed to fetch event by serial: %w", err) + } + + evs, err := n.parseEventsFromResult(result) + if err != nil { + return nil, err + } + + if len(evs) == 0 { + return nil, fmt.Errorf("event not found") + } + + return evs[0], nil +} + +// FetchEventsBySerials retrieves multiple events by their serial numbers +func (n *N) FetchEventsBySerials(serials []*types.Uint40) ( + events map[uint64]*event.E, err error, +) { + if len(serials) == 0 { + return make(map[uint64]*event.E), nil + } + + // Build list of serial numbers + serialNums := make([]int64, len(serials)) + for i, ser := range serials { + serialNums[i] = int64(ser.Get()) + } + + cypher := ` +MATCH (e:Event) +WHERE e.serial IN $serials +RETURN e.id AS id, + e.kind AS kind, + e.created_at AS created_at, + e.content AS content, + e.sig AS sig, + e.pubkey AS pubkey, + e.tags AS tags, + e.serial AS serial` + + params := map[string]any{"serials": serialNums} + + result, err := n.ExecuteRead(context.Background(), cypher, params) + if err != nil { + return nil, fmt.Errorf("failed to fetch events by serials: %w", err) + } + + // Parse events and map by serial + events = make(map[uint64]*event.E) + ctx := context.Background() + + neo4jResult, ok := result.(interface { + Next(context.Context) bool + Record() *interface{} + Err() error + }) + if !ok { + return events, nil + } + + for neo4jResult.Next(ctx) { + record := neo4jResult.Record() + if record == nil { + continue + } + + recordMap, ok := (*record).(map[string]any) + if !ok { + continue + } + + // Parse event + idStr, _ := recordMap["id"].(string) + kind, _ := recordMap["kind"].(int64) + createdAt, _ := recordMap["created_at"].(int64) + content, _ := recordMap["content"].(string) + sigStr, _ := recordMap["sig"].(string) + pubkeyStr, _ := recordMap["pubkey"].(string) + tagsStr, _ := recordMap["tags"].(string) + serialVal, _ := recordMap["serial"].(int64) + + id, err := hex.Dec(idStr) + if err != nil { + continue + } + sig, err := hex.Dec(sigStr) + if err != nil { + continue + } + pubkey, err := hex.Dec(pubkeyStr) + if err != nil { + continue + } + + tags := tag.NewS() + if tagsStr != "" { + _ = tags.UnmarshalJSON([]byte(tagsStr)) + } + + e := &event.E{ + Kind: uint16(kind), + CreatedAt: createdAt, + Content: []byte(content), + Tags: tags, + } + + copy(e.ID[:], id) + copy(e.Sig[:], sig) + copy(e.Pubkey[:], pubkey) + + events[uint64(serialVal)] = e + } + + return events, nil +} + +// GetSerialById retrieves the serial number for an event ID +func (n *N) GetSerialById(id []byte) (ser *types.Uint40, err error) { + idStr := hex.Enc(id) + + cypher := "MATCH (e:Event {id: $id}) RETURN e.serial AS serial" + params := map[string]any{"id": idStr} + + result, err := n.ExecuteRead(context.Background(), cypher, params) + if err != nil { + return nil, fmt.Errorf("failed to get serial by ID: %w", err) + } + + ctx := context.Background() + neo4jResult, ok := result.(interface { + Next(context.Context) bool + Record() *interface{} + Err() error + }) + if !ok { + return nil, fmt.Errorf("invalid result type") + } + + if neo4jResult.Next(ctx) { + record := neo4jResult.Record() + if record != nil { + recordMap, ok := (*record).(map[string]any) + if ok { + if serialVal, ok := recordMap["serial"].(int64); ok { + ser = &types.Uint40{} + ser.Set(uint64(serialVal)) + return ser, nil + } + } + } + } + + return nil, fmt.Errorf("event not found") +} + +// GetSerialsByIds retrieves serial numbers for multiple event IDs +func (n *N) GetSerialsByIds(ids *tag.T) ( + serials map[string]*types.Uint40, err error, +) { + serials = make(map[string]*types.Uint40) + + if len(ids.T) == 0 { + return serials, nil + } + + // Extract ID strings + idStrs := make([]string, 0, len(ids.T)) + for _, idTag := range ids.T { + if len(idTag) >= 2 { + idStrs = append(idStrs, string(idTag[1])) + } + } + + if len(idStrs) == 0 { + return serials, nil + } + + cypher := ` +MATCH (e:Event) +WHERE e.id IN $ids +RETURN e.id AS id, e.serial AS serial` + + params := map[string]any{"ids": idStrs} + + result, err := n.ExecuteRead(context.Background(), cypher, params) + if err != nil { + return nil, fmt.Errorf("failed to get serials by IDs: %w", err) + } + + ctx := context.Background() + neo4jResult, ok := result.(interface { + Next(context.Context) bool + Record() *interface{} + Err() error + }) + if !ok { + return serials, nil + } + + for neo4jResult.Next(ctx) { + record := neo4jResult.Record() + if record == nil { + continue + } + + recordMap, ok := (*record).(map[string]any) + if !ok { + continue + } + + idStr, _ := recordMap["id"].(string) + serialVal, _ := recordMap["serial"].(int64) + + serial := &types.Uint40{} + serial.Set(uint64(serialVal)) + serials[idStr] = serial + } + + return serials, nil +} + +// GetSerialsByIdsWithFilter retrieves serials with a filter function +func (n *N) GetSerialsByIdsWithFilter( + ids *tag.T, fn func(ev *event.E, ser *types.Uint40) bool, +) (serials map[string]*types.Uint40, err error) { + serials = make(map[string]*types.Uint40) + + if fn == nil { + // No filter, just return all + return n.GetSerialsByIds(ids) + } + + // With filter, need to fetch events + for _, idTag := range ids.T { + if len(idTag) < 2 { + continue + } + + idBytes, err := hex.Dec(string(idTag[1])) + if err != nil { + continue + } + + serial, err := n.GetSerialById(idBytes) + if err != nil { + continue + } + + ev, err := n.FetchEventBySerial(serial) + if err != nil { + continue + } + + if fn(ev, serial) { + serials[string(idTag[1])] = serial + } + } + + return serials, nil +} + +// GetSerialsByRange retrieves serials within a range +func (n *N) GetSerialsByRange(idx database.Range) ( + serials types.Uint40s, err error, +) { + // This would need to be implemented based on how ranges are defined + // For now, returning not implemented + err = fmt.Errorf("not implemented") + return +} + +// GetFullIdPubkeyBySerial retrieves ID and pubkey for a serial number +func (n *N) GetFullIdPubkeyBySerial(ser *types.Uint40) ( + fidpk *store.IdPkTs, err error, +) { + serial := ser.Get() + + cypher := ` +MATCH (e:Event {serial: $serial}) +RETURN e.id AS id, + e.pubkey AS pubkey, + e.created_at AS created_at` + + params := map[string]any{"serial": int64(serial)} + + result, err := n.ExecuteRead(context.Background(), cypher, params) + if err != nil { + return nil, fmt.Errorf("failed to get ID and pubkey by serial: %w", err) + } + + ctx := context.Background() + neo4jResult, ok := result.(interface { + Next(context.Context) bool + Record() *interface{} + Err() error + }) + if !ok { + return nil, fmt.Errorf("invalid result type") + } + + if neo4jResult.Next(ctx) { + record := neo4jResult.Record() + if record != nil { + recordMap, ok := (*record).(map[string]any) + if ok { + idStr, _ := recordMap["id"].(string) + pubkeyStr, _ := recordMap["pubkey"].(string) + createdAt, _ := recordMap["created_at"].(int64) + + id, err := hex.Dec(idStr) + if err != nil { + return nil, err + } + + pubkey, err := hex.Dec(pubkeyStr) + if err != nil { + return nil, err + } + + fidpk = &store.IdPkTs{ + Id: id, + Pub: pubkey, + Ts: createdAt, + Ser: serial, + } + + return fidpk, nil + } + } + } + + return nil, fmt.Errorf("event not found") +} + +// GetFullIdPubkeyBySerials retrieves IDs and pubkeys for multiple serials +func (n *N) GetFullIdPubkeyBySerials(sers []*types.Uint40) ( + fidpks []*store.IdPkTs, err error, +) { + fidpks = make([]*store.IdPkTs, 0, len(sers)) + + if len(sers) == 0 { + return fidpks, nil + } + + // Build list of serial numbers + serialNums := make([]int64, len(sers)) + for i, ser := range sers { + serialNums[i] = int64(ser.Get()) + } + + cypher := ` +MATCH (e:Event) +WHERE e.serial IN $serials +RETURN e.id AS id, + e.pubkey AS pubkey, + e.created_at AS created_at, + e.serial AS serial` + + params := map[string]any{"serials": serialNums} + + result, err := n.ExecuteRead(context.Background(), cypher, params) + if err != nil { + return nil, fmt.Errorf("failed to get IDs and pubkeys by serials: %w", err) + } + + ctx := context.Background() + neo4jResult, ok := result.(interface { + Next(context.Context) bool + Record() *interface{} + Err() error + }) + if !ok { + return fidpks, nil + } + + for neo4jResult.Next(ctx) { + record := neo4jResult.Record() + if record == nil { + continue + } + + recordMap, ok := (*record).(map[string]any) + if !ok { + continue + } + + idStr, _ := recordMap["id"].(string) + pubkeyStr, _ := recordMap["pubkey"].(string) + createdAt, _ := recordMap["created_at"].(int64) + serialVal, _ := recordMap["serial"].(int64) + + id, err := hex.Dec(idStr) + if err != nil { + continue + } + + pubkey, err := hex.Dec(pubkeyStr) + if err != nil { + continue + } + + fidpks = append(fidpks, &store.IdPkTs{ + Id: id, + Pub: pubkey, + Ts: createdAt, + Ser: uint64(serialVal), + }) + } + + return fidpks, nil +} diff --git a/pkg/neo4j/identity.go b/pkg/neo4j/identity.go new file mode 100644 index 0000000..a0d8772 --- /dev/null +++ b/pkg/neo4j/identity.go @@ -0,0 +1,44 @@ +package neo4j + +import ( + "fmt" + + "next.orly.dev/pkg/crypto/keys" +) + +// Relay identity methods +// We use the marker system to store the relay's private key + +const relayIdentityMarkerKey = "relay_identity_secret" + +// GetRelayIdentitySecret retrieves the relay's identity secret key +func (n *N) GetRelayIdentitySecret() (skb []byte, err error) { + return n.GetMarker(relayIdentityMarkerKey) +} + +// SetRelayIdentitySecret sets the relay's identity secret key +func (n *N) SetRelayIdentitySecret(skb []byte) error { + return n.SetMarker(relayIdentityMarkerKey, skb) +} + +// GetOrCreateRelayIdentitySecret retrieves or creates the relay identity +func (n *N) GetOrCreateRelayIdentitySecret() (skb []byte, err error) { + skb, err = n.GetRelayIdentitySecret() + if err == nil { + return skb, nil + } + + // Generate new identity + skb, err = keys.GenerateSecretKey() + if err != nil { + return nil, fmt.Errorf("failed to generate identity: %w", err) + } + + // Store it + if err = n.SetRelayIdentitySecret(skb); err != nil { + return nil, fmt.Errorf("failed to store identity: %w", err) + } + + n.Logger.Infof("generated new relay identity") + return skb, nil +} diff --git a/pkg/neo4j/import-export.go b/pkg/neo4j/import-export.go new file mode 100644 index 0000000..8909c75 --- /dev/null +++ b/pkg/neo4j/import-export.go @@ -0,0 +1,97 @@ +package neo4j + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + + "next.orly.dev/pkg/encoders/event" +) + +// Import imports events from a reader (JSONL format) +func (n *N) Import(rr io.Reader) { + n.ImportEventsFromReader(context.Background(), rr) +} + +// Export exports events to a writer (JSONL format) +func (n *N) Export(c context.Context, w io.Writer, pubkeys ...[]byte) { + // Query all events or events for specific pubkeys + // Write as JSONL + + // Stub implementation + fmt.Fprintf(w, "# Export not yet implemented for neo4j\n") +} + +// ImportEventsFromReader imports events from a reader +func (n *N) ImportEventsFromReader(ctx context.Context, rr io.Reader) error { + scanner := bufio.NewScanner(rr) + scanner.Buffer(make([]byte, 1024*1024), 10*1024*1024) // 10MB max line size + + count := 0 + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + + // Skip comments + if line[0] == '#' { + continue + } + + // Parse event + ev := &event.E{} + if err := json.Unmarshal(line, ev); err != nil { + n.Logger.Warningf("failed to parse event: %v", err) + continue + } + + // Save event + if _, err := n.SaveEvent(ctx, ev); err != nil { + n.Logger.Warningf("failed to import event: %v", err) + continue + } + + count++ + if count%1000 == 0 { + n.Logger.Infof("imported %d events", count) + } + } + + if err := scanner.Err(); err != nil { + return fmt.Errorf("scanner error: %w", err) + } + + n.Logger.Infof("import complete: %d events", count) + return nil +} + +// ImportEventsFromStrings imports events from JSON strings +func (n *N) ImportEventsFromStrings( + ctx context.Context, + eventJSONs []string, + policyManager interface{ CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error) }, +) error { + for _, eventJSON := range eventJSONs { + ev := &event.E{} + if err := json.Unmarshal([]byte(eventJSON), ev); err != nil { + continue + } + + // Check policy if manager is provided + if policyManager != nil { + if allowed, err := policyManager.CheckPolicy("write", ev, ev.Pubkey[:], "import"); err != nil || !allowed { + continue + } + } + + // Save event + if _, err := n.SaveEvent(ctx, ev); err != nil { + n.Logger.Warningf("failed to import event: %v", err) + } + } + + return nil +} diff --git a/pkg/neo4j/logger.go b/pkg/neo4j/logger.go new file mode 100644 index 0000000..afce78b --- /dev/null +++ b/pkg/neo4j/logger.go @@ -0,0 +1,68 @@ +package neo4j + +import ( + "fmt" + "runtime" + "strings" + + "go.uber.org/atomic" + "lol.mleku.dev" + "lol.mleku.dev/log" +) + +// NewLogger creates a new dgraph logger. +func NewLogger(logLevel int, label string) (l *logger) { + l = &logger{Label: label} + l.Level.Store(int32(logLevel)) + return +} + +type logger struct { + Level atomic.Int32 + Label string +} + +// SetLogLevel atomically adjusts the log level to the given log level code. +func (l *logger) SetLogLevel(level int) { + l.Level.Store(int32(level)) +} + +// Errorf is a log printer for this level of message. +func (l *logger) Errorf(s string, i ...interface{}) { + if l.Level.Load() >= lol.Error { + s = l.Label + ": " + s + txt := fmt.Sprintf(s, i...) + _, file, line, _ := runtime.Caller(2) + log.E.F("%s\n%s:%d", strings.TrimSpace(txt), file, line) + } +} + +// Warningf is a log printer for this level of message. +func (l *logger) Warningf(s string, i ...interface{}) { + if l.Level.Load() >= lol.Warn { + s = l.Label + ": " + s + txt := fmt.Sprintf(s, i...) + _, file, line, _ := runtime.Caller(2) + log.W.F("%s\n%s:%d", strings.TrimSpace(txt), file, line) + } +} + +// Infof is a log printer for this level of message. +func (l *logger) Infof(s string, i ...interface{}) { + if l.Level.Load() >= lol.Info { + s = l.Label + ": " + s + txt := fmt.Sprintf(s, i...) + _, file, line, _ := runtime.Caller(2) + log.I.F("%s\n%s:%d", strings.TrimSpace(txt), file, line) + } +} + +// Debugf is a log printer for this level of message. +func (l *logger) Debugf(s string, i ...interface{}) { + if l.Level.Load() >= lol.Debug { + s = l.Label + ": " + s + txt := fmt.Sprintf(s, i...) + _, file, line, _ := runtime.Caller(2) + log.D.F("%s\n%s:%d", strings.TrimSpace(txt), file, line) + } +} diff --git a/pkg/neo4j/markers.go b/pkg/neo4j/markers.go new file mode 100644 index 0000000..1fa63df --- /dev/null +++ b/pkg/neo4j/markers.go @@ -0,0 +1,91 @@ +package neo4j + +import ( + "context" + "fmt" + + "next.orly.dev/pkg/encoders/hex" +) + +// Markers provide metadata key-value storage using Neo4j Marker nodes +// We store markers as special nodes with label "Marker" + +// SetMarker sets a metadata marker +func (n *N) SetMarker(key string, value []byte) error { + valueHex := hex.Enc(value) + + cypher := ` +MERGE (m:Marker {key: $key}) +SET m.value = $value` + + params := map[string]any{ + "key": key, + "value": valueHex, + } + + _, err := n.ExecuteWrite(context.Background(), cypher, params) + if err != nil { + return fmt.Errorf("failed to set marker: %w", err) + } + + return nil +} + +// GetMarker retrieves a metadata marker +func (n *N) GetMarker(key string) (value []byte, err error) { + cypher := "MATCH (m:Marker {key: $key}) RETURN m.value AS value" + params := map[string]any{"key": key} + + result, err := n.ExecuteRead(context.Background(), cypher, params) + if err != nil { + return nil, fmt.Errorf("failed to get marker: %w", err) + } + + ctx := context.Background() + neo4jResult, ok := result.(interface { + Next(context.Context) bool + Record() *interface{} + Err() error + }) + if !ok { + return nil, fmt.Errorf("invalid result type") + } + + if neo4jResult.Next(ctx) { + record := neo4jResult.Record() + if record != nil { + recordMap, ok := (*record).(map[string]any) + if ok { + if valueStr, ok := recordMap["value"].(string); ok { + // Decode hex value + value, err = hex.Dec(valueStr) + if err != nil { + return nil, fmt.Errorf("failed to decode marker value: %w", err) + } + return value, nil + } + } + } + } + + return nil, fmt.Errorf("marker not found: %s", key) +} + +// HasMarker checks if a marker exists +func (n *N) HasMarker(key string) bool { + _, err := n.GetMarker(key) + return err == nil +} + +// DeleteMarker removes a metadata marker +func (n *N) DeleteMarker(key string) error { + cypher := "MATCH (m:Marker {key: $key}) DELETE m" + params := map[string]any{"key": key} + + _, err := n.ExecuteWrite(context.Background(), cypher, params) + if err != nil { + return fmt.Errorf("failed to delete marker: %w", err) + } + + return nil +} diff --git a/pkg/neo4j/neo4j.go b/pkg/neo4j/neo4j.go new file mode 100644 index 0000000..a16a405 --- /dev/null +++ b/pkg/neo4j/neo4j.go @@ -0,0 +1,321 @@ +// Package neo4j provides a Neo4j-based implementation of the database interface. +// Neo4j is a native graph database optimized for relationship-heavy queries, +// making it ideal for Nostr's social graph and event reference patterns. +package neo4j + +import ( + "context" + "fmt" + "os" + "path/filepath" + + "github.com/dgraph-io/badger/v4" + "github.com/neo4j/neo4j-go-driver/v5/neo4j" + "lol.mleku.dev" + "lol.mleku.dev/chk" + "next.orly.dev/pkg/database" + "next.orly.dev/pkg/encoders/filter" + "next.orly.dev/pkg/utils/apputil" +) + +// N implements the database.Database interface using Neo4j as the storage backend +type N struct { + ctx context.Context + cancel context.CancelFunc + dataDir string + Logger *logger + + // Neo4j client connection + driver neo4j.DriverWithContext + + // Fallback badger storage for metadata (markers, identity, etc.) + pstore *badger.DB + + // Configuration + neo4jURI string + neo4jUser string + neo4jPassword string + + ready chan struct{} // Closed when database is ready to serve requests +} + +// Ensure N implements database.Database interface at compile time +var _ database.Database = (*N)(nil) + +// init registers the neo4j database factory +func init() { + database.RegisterNeo4jFactory(func( + ctx context.Context, + cancel context.CancelFunc, + dataDir string, + logLevel string, + ) (database.Database, error) { + return New(ctx, cancel, dataDir, logLevel) + }) +} + +// Config holds configuration options for the Neo4j database +type Config struct { + DataDir string + LogLevel string + Neo4jURI string // Neo4j bolt URI (e.g., "bolt://localhost:7687") + Neo4jUser string // Authentication username + Neo4jPassword string // Authentication password +} + +// New creates a new Neo4j-based database instance +func New( + ctx context.Context, cancel context.CancelFunc, dataDir, logLevel string, +) ( + n *N, err error, +) { + // Get Neo4j connection details from environment + neo4jURI := os.Getenv("ORLY_NEO4J_URI") + if neo4jURI == "" { + neo4jURI = "bolt://localhost:7687" + } + neo4jUser := os.Getenv("ORLY_NEO4J_USER") + if neo4jUser == "" { + neo4jUser = "neo4j" + } + neo4jPassword := os.Getenv("ORLY_NEO4J_PASSWORD") + if neo4jPassword == "" { + neo4jPassword = "password" + } + + n = &N{ + ctx: ctx, + cancel: cancel, + dataDir: dataDir, + Logger: NewLogger(lol.GetLogLevel(logLevel), dataDir), + neo4jURI: neo4jURI, + neo4jUser: neo4jUser, + neo4jPassword: neo4jPassword, + ready: make(chan struct{}), + } + + // Ensure the data directory exists + if err = os.MkdirAll(dataDir, 0755); chk.E(err) { + return + } + + // Ensure directory structure + dummyFile := filepath.Join(dataDir, "dummy.sst") + if err = apputil.EnsureDir(dummyFile); chk.E(err) { + return + } + + // Initialize neo4j client connection + if err = n.initNeo4jClient(); chk.E(err) { + return + } + + // Initialize badger for metadata storage + if err = n.initStorage(); chk.E(err) { + return + } + + // Apply Nostr schema to neo4j (create constraints and indexes) + if err = n.applySchema(ctx); chk.E(err) { + return + } + + // Initialize serial counter + if err = n.initSerialCounter(); chk.E(err) { + return + } + + // Start warmup goroutine to signal when database is ready + go n.warmup() + + // Setup shutdown handler + go func() { + <-n.ctx.Done() + n.cancel() + if n.driver != nil { + n.driver.Close(context.Background()) + } + if n.pstore != nil { + n.pstore.Close() + } + }() + + return +} + +// initNeo4jClient establishes connection to Neo4j server +func (n *N) initNeo4jClient() error { + n.Logger.Infof("connecting to neo4j at %s", n.neo4jURI) + + // Create Neo4j driver + driver, err := neo4j.NewDriverWithContext( + n.neo4jURI, + neo4j.BasicAuth(n.neo4jUser, n.neo4jPassword, ""), + ) + if err != nil { + return fmt.Errorf("failed to create neo4j driver: %w", err) + } + + n.driver = driver + + // Verify connectivity + ctx := context.Background() + if err := driver.VerifyConnectivity(ctx); err != nil { + return fmt.Errorf("failed to verify neo4j connectivity: %w", err) + } + + n.Logger.Infof("successfully connected to neo4j") + return nil +} + +// initStorage opens Badger database for metadata storage +func (n *N) initStorage() error { + metadataDir := filepath.Join(n.dataDir, "metadata") + + if err := os.MkdirAll(metadataDir, 0755); err != nil { + return fmt.Errorf("failed to create metadata directory: %w", err) + } + + opts := badger.DefaultOptions(metadataDir) + + var err error + n.pstore, err = badger.Open(opts) + if err != nil { + return fmt.Errorf("failed to open badger metadata store: %w", err) + } + + n.Logger.Infof("metadata storage initialized") + return nil +} + +// ExecuteRead executes a read query against Neo4j +func (n *N) ExecuteRead(ctx context.Context, cypher string, params map[string]any) (neo4j.ResultWithContext, error) { + session := n.driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead}) + defer session.Close(ctx) + + result, err := session.Run(ctx, cypher, params) + if err != nil { + return nil, fmt.Errorf("neo4j read query failed: %w", err) + } + + return result, nil +} + +// ExecuteWrite executes a write query against Neo4j +func (n *N) ExecuteWrite(ctx context.Context, cypher string, params map[string]any) (neo4j.ResultWithContext, error) { + session := n.driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite}) + defer session.Close(ctx) + + result, err := session.Run(ctx, cypher, params) + if err != nil { + return nil, fmt.Errorf("neo4j write query failed: %w", err) + } + + return result, nil +} + +// ExecuteWriteTransaction executes a transactional write operation +func (n *N) ExecuteWriteTransaction(ctx context.Context, work func(tx neo4j.ManagedTransaction) (any, error)) (any, error) { + session := n.driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite}) + defer session.Close(ctx) + + return session.ExecuteWrite(ctx, work) +} + +// Path returns the data directory path +func (n *N) Path() string { return n.dataDir } + +// Init initializes the database with a given path (no-op, path set in New) +func (n *N) Init(path string) (err error) { + // Path already set in New() + return nil +} + +// Sync flushes pending writes +func (n *N) Sync() (err error) { + if n.pstore != nil { + return n.pstore.Sync() + } + return nil +} + +// Close closes the database +func (n *N) Close() (err error) { + n.cancel() + if n.driver != nil { + if e := n.driver.Close(context.Background()); e != nil { + err = e + } + } + if n.pstore != nil { + if e := n.pstore.Close(); e != nil && err == nil { + err = e + } + } + return +} + +// Wipe removes all data +func (n *N) Wipe() (err error) { + // Close and remove badger metadata + if n.pstore != nil { + if err = n.pstore.Close(); chk.E(err) { + return + } + } + if err = os.RemoveAll(n.dataDir); chk.E(err) { + return + } + + // Delete all nodes and relationships in Neo4j + ctx := context.Background() + _, err = n.ExecuteWrite(ctx, "MATCH (n) DETACH DELETE n", nil) + if err != nil { + return fmt.Errorf("failed to wipe neo4j database: %w", err) + } + + return n.initStorage() +} + +// SetLogLevel sets the logging level +func (n *N) SetLogLevel(level string) { + // n.Logger.SetLevel(lol.GetLogLevel(level)) +} + +// EventIdsBySerial retrieves event IDs by serial range (stub) +func (n *N) EventIdsBySerial(start uint64, count int) ( + evs []uint64, err error, +) { + err = fmt.Errorf("not implemented") + return +} + +// RunMigrations runs database migrations (no-op for neo4j) +func (n *N) RunMigrations() { + // No-op for neo4j +} + +// Ready returns a channel that closes when the database is ready to serve requests. +// This allows callers to wait for database warmup to complete. +func (n *N) Ready() <-chan struct{} { + return n.ready +} + +// warmup performs database warmup operations and closes the ready channel when complete. +// For Neo4j, warmup ensures the connection is healthy and constraints are applied. +func (n *N) warmup() { + defer close(n.ready) + + // Neo4j connection and schema are already verified during initialization + // Just give a brief moment for any background processes to settle + n.Logger.Infof("neo4j database warmup complete, ready to serve requests") +} + +// GetCachedJSON returns cached query results (not implemented for Neo4j) +func (n *N) GetCachedJSON(f *filter.F) ([][]byte, bool) { return nil, false } + +// CacheMarshaledJSON caches marshaled JSON results (not implemented for Neo4j) +func (n *N) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {} + +// InvalidateQueryCache invalidates the query cache (not implemented for Neo4j) +func (n *N) InvalidateQueryCache() {} diff --git a/pkg/neo4j/nip43.go b/pkg/neo4j/nip43.go new file mode 100644 index 0000000..809e262 --- /dev/null +++ b/pkg/neo4j/nip43.go @@ -0,0 +1,212 @@ +package neo4j + +import ( + "encoding/json" + "fmt" + "time" + + "next.orly.dev/pkg/database" + "next.orly.dev/pkg/encoders/hex" +) + +// NIP-43 Invite-based ACL methods +// Simplified implementation using marker-based storage via Badger +// For production, these could use Neo4j nodes with relationships + +// AddNIP43Member adds a member using an invite code +func (n *N) AddNIP43Member(pubkey []byte, inviteCode string) error { + key := "nip43_" + hex.Enc(pubkey) + + member := database.NIP43Membership{ + InviteCode: inviteCode, + AddedAt: time.Now(), + } + copy(member.Pubkey[:], pubkey) + + data, err := json.Marshal(member) + if err != nil { + return fmt.Errorf("failed to marshal membership: %w", err) + } + + // Also add to members list + if err := n.addToMembersList(pubkey); err != nil { + return err + } + + return n.SetMarker(key, data) +} + +// RemoveNIP43Member removes a member +func (n *N) RemoveNIP43Member(pubkey []byte) error { + key := "nip43_" + hex.Enc(pubkey) + + // Remove from members list + if err := n.removeFromMembersList(pubkey); err != nil { + return err + } + + return n.DeleteMarker(key) +} + +// IsNIP43Member checks if a pubkey is a member +func (n *N) IsNIP43Member(pubkey []byte) (isMember bool, err error) { + _, err = n.GetNIP43Membership(pubkey) + return err == nil, nil +} + +// GetNIP43Membership retrieves membership information +func (n *N) GetNIP43Membership(pubkey []byte) (*database.NIP43Membership, error) { + key := "nip43_" + hex.Enc(pubkey) + + data, err := n.GetMarker(key) + if err != nil { + return nil, err + } + + var member database.NIP43Membership + if err := json.Unmarshal(data, &member); err != nil { + return nil, fmt.Errorf("failed to unmarshal membership: %w", err) + } + + return &member, nil +} + +// GetAllNIP43Members retrieves all member pubkeys +func (n *N) GetAllNIP43Members() ([][]byte, error) { + data, err := n.GetMarker("nip43_members_list") + if err != nil { + return nil, nil // No members = empty list + } + + var members []string + if err := json.Unmarshal(data, &members); err != nil { + return nil, fmt.Errorf("failed to unmarshal members list: %w", err) + } + + result := make([][]byte, 0, len(members)) + for _, hexPubkey := range members { + pubkey, err := hex.Dec(hexPubkey) + if err != nil { + continue + } + result = append(result, pubkey) + } + + return result, nil +} + +// StoreInviteCode stores an invite code with expiration +func (n *N) StoreInviteCode(code string, expiresAt time.Time) error { + key := "invite_" + code + + inviteData := map[string]interface{}{ + "code": code, + "expiresAt": expiresAt, + } + + data, err := json.Marshal(inviteData) + if err != nil { + return fmt.Errorf("failed to marshal invite: %w", err) + } + + return n.SetMarker(key, data) +} + +// ValidateInviteCode checks if an invite code is valid +func (n *N) ValidateInviteCode(code string) (valid bool, err error) { + key := "invite_" + code + + data, err := n.GetMarker(key) + if err != nil { + return false, nil // Code doesn't exist + } + + var inviteData map[string]interface{} + if err := json.Unmarshal(data, &inviteData); err != nil { + return false, fmt.Errorf("failed to unmarshal invite: %w", err) + } + + // Check expiration + if expiresStr, ok := inviteData["expiresAt"].(string); ok { + expiresAt, err := time.Parse(time.RFC3339, expiresStr) + if err == nil && time.Now().After(expiresAt) { + return false, nil // Expired + } + } + + return true, nil +} + +// DeleteInviteCode removes an invite code +func (n *N) DeleteInviteCode(code string) error { + key := "invite_" + code + return n.DeleteMarker(key) +} + +// PublishNIP43MembershipEvent publishes a membership event +func (n *N) PublishNIP43MembershipEvent(kind int, pubkey []byte) error { + // This would require publishing an actual Nostr event + // For now, just log it + n.Logger.Infof("would publish NIP-43 event kind %d for %s", kind, hex.Enc(pubkey)) + return nil +} + +// addToMembersList adds a pubkey to the members list +func (n *N) addToMembersList(pubkey []byte) error { + data, err := n.GetMarker("nip43_members_list") + + var members []string + if err == nil { + if err := json.Unmarshal(data, &members); err != nil { + return fmt.Errorf("failed to unmarshal members list: %w", err) + } + } + + hexPubkey := hex.Enc(pubkey) + + // Check if already in list + for _, member := range members { + if member == hexPubkey { + return nil // Already in list + } + } + + members = append(members, hexPubkey) + + data, err = json.Marshal(members) + if err != nil { + return fmt.Errorf("failed to marshal members list: %w", err) + } + + return n.SetMarker("nip43_members_list", data) +} + +// removeFromMembersList removes a pubkey from the members list +func (n *N) removeFromMembersList(pubkey []byte) error { + data, err := n.GetMarker("nip43_members_list") + if err != nil { + return nil // No list = nothing to remove + } + + var members []string + if err := json.Unmarshal(data, &members); err != nil { + return fmt.Errorf("failed to unmarshal members list: %w", err) + } + + hexPubkey := hex.Enc(pubkey) + + // Filter out the pubkey + filtered := make([]string, 0, len(members)) + for _, member := range members { + if member != hexPubkey { + filtered = append(filtered, member) + } + } + + data, err = json.Marshal(filtered) + if err != nil { + return fmt.Errorf("failed to marshal members list: %w", err) + } + + return n.SetMarker("nip43_members_list", data) +} diff --git a/pkg/neo4j/query-events.go b/pkg/neo4j/query-events.go new file mode 100644 index 0000000..3e68e7d --- /dev/null +++ b/pkg/neo4j/query-events.go @@ -0,0 +1,480 @@ +package neo4j + +import ( + "context" + "fmt" + "strings" + + "next.orly.dev/pkg/database/indexes/types" + "next.orly.dev/pkg/encoders/event" + "next.orly.dev/pkg/encoders/filter" + "next.orly.dev/pkg/encoders/hex" + "next.orly.dev/pkg/encoders/tag" + "next.orly.dev/pkg/interfaces/store" +) + +// QueryEvents retrieves events matching the given filter +func (n *N) QueryEvents(c context.Context, f *filter.F) (evs event.S, err error) { + return n.QueryEventsWithOptions(c, f, false, false) +} + +// QueryAllVersions retrieves all versions of events matching the filter +func (n *N) QueryAllVersions(c context.Context, f *filter.F) (evs event.S, err error) { + return n.QueryEventsWithOptions(c, f, false, true) +} + +// QueryEventsWithOptions retrieves events with specific options +func (n *N) QueryEventsWithOptions( + c context.Context, f *filter.F, includeDeleteEvents bool, showAllVersions bool, +) (evs event.S, err error) { + // Build Cypher query from Nostr filter + cypher, params := n.buildCypherQuery(f, includeDeleteEvents) + + // Execute query + result, err := n.ExecuteRead(c, cypher, params) + if err != nil { + return nil, fmt.Errorf("failed to execute query: %w", err) + } + + // Parse response + evs, err = n.parseEventsFromResult(result) + if err != nil { + return nil, fmt.Errorf("failed to parse events: %w", err) + } + + return evs, nil +} + +// buildCypherQuery constructs a Cypher query from a Nostr filter +// This is the core translation layer between Nostr's REQ filter format and Neo4j's Cypher +func (n *N) buildCypherQuery(f *filter.F, includeDeleteEvents bool) (string, map[string]any) { + params := make(map[string]any) + var whereClauses []string + + // Start with basic MATCH clause + matchClause := "MATCH (e:Event)" + + // IDs filter - uses exact match or prefix matching + if len(f.Ids.T) > 0 { + idConditions := make([]string, len(f.Ids.T)) + for i, id := range f.Ids.T { + paramName := fmt.Sprintf("id_%d", i) + hexID := hex.Enc(id) + + // Handle prefix matching for partial IDs + if len(id) < 32 { // Full event ID is 32 bytes (64 hex chars) + idConditions[i] = fmt.Sprintf("e.id STARTS WITH $%s", paramName) + } else { + idConditions[i] = fmt.Sprintf("e.id = $%s", paramName) + } + params[paramName] = hexID + } + whereClauses = append(whereClauses, "("+strings.Join(idConditions, " OR ")+")") + } + + // Authors filter - supports prefix matching for partial pubkeys + if len(f.Authors.T) > 0 { + authorConditions := make([]string, len(f.Authors.T)) + for i, author := range f.Authors.T { + paramName := fmt.Sprintf("author_%d", i) + hexAuthor := hex.Enc(author) + + // Handle prefix matching for partial pubkeys + if len(author) < 32 { // Full pubkey is 32 bytes (64 hex chars) + authorConditions[i] = fmt.Sprintf("e.pubkey STARTS WITH $%s", paramName) + } else { + authorConditions[i] = fmt.Sprintf("e.pubkey = $%s", paramName) + } + params[paramName] = hexAuthor + } + whereClauses = append(whereClauses, "("+strings.Join(authorConditions, " OR ")+")") + } + + // Kinds filter - matches event types + if len(f.Kinds.K) > 0 { + kinds := make([]int64, len(f.Kinds.K)) + for i, k := range f.Kinds.K { + kinds[i] = int64(k.K) + } + params["kinds"] = kinds + whereClauses = append(whereClauses, "e.kind IN $kinds") + } + + // Time range filters - for temporal queries + if f.Since != nil { + params["since"] = f.Since.V + whereClauses = append(whereClauses, "e.created_at >= $since") + } + if f.Until != nil { + params["until"] = f.Until.V + whereClauses = append(whereClauses, "e.created_at <= $until") + } + + // Tag filters - this is where Neo4j's graph capabilities shine + // We can efficiently traverse tag relationships + tagIndex := 0 + for tagType, tagValues := range *f.Tags { + if len(tagValues.T) > 0 { + tagVarName := fmt.Sprintf("t%d", tagIndex) + tagTypeParam := fmt.Sprintf("tagType_%d", tagIndex) + tagValuesParam := fmt.Sprintf("tagValues_%d", tagIndex) + + // Add tag relationship to MATCH clause + matchClause += fmt.Sprintf(" OPTIONAL MATCH (e)-[:TAGGED_WITH]->(%s:Tag)", tagVarName) + + // Convert tag values to strings + tagValueStrings := make([]string, len(tagValues.T)) + for i, tv := range tagValues.T { + tagValueStrings[i] = string(tv) + } + + // Add WHERE conditions for this tag + params[tagTypeParam] = string(tagType) + params[tagValuesParam] = tagValueStrings + whereClauses = append(whereClauses, + fmt.Sprintf("(%s.type = $%s AND %s.value IN $%s)", + tagVarName, tagTypeParam, tagVarName, tagValuesParam)) + + tagIndex++ + } + } + + // Exclude delete events unless requested + if !includeDeleteEvents { + whereClauses = append(whereClauses, "e.kind <> 5") + } + + // Build WHERE clause + whereClause := "" + if len(whereClauses) > 0 { + whereClause = " WHERE " + strings.Join(whereClauses, " AND ") + } + + // Build RETURN clause with all event properties + returnClause := ` +RETURN e.id AS id, + e.kind AS kind, + e.created_at AS created_at, + e.content AS content, + e.sig AS sig, + e.pubkey AS pubkey, + e.tags AS tags, + e.serial AS serial` + + // Add ordering (most recent first) + orderClause := " ORDER BY e.created_at DESC" + + // Add limit if specified + limitClause := "" + if *f.Limit > 0 { + params["limit"] = *f.Limit + limitClause = " LIMIT $limit" + } + + // Combine all parts + cypher := matchClause + whereClause + returnClause + orderClause + limitClause + + return cypher, params +} + +// parseEventsFromResult converts Neo4j query results to Nostr events +func (n *N) parseEventsFromResult(result any) ([]*event.E, error) { + // Type assert to Neo4j result + neo4jResult, ok := result.(interface { + Next(context.Context) bool + Record() *interface{} + Err() error + }) + if !ok { + return nil, fmt.Errorf("invalid result type") + } + + events := make([]*event.E, 0) + ctx := context.Background() + + // Iterate through result records + for neo4jResult.Next(ctx) { + record := neo4jResult.Record() + if record == nil { + continue + } + + // Extract fields from record + recordMap, ok := (*record).(map[string]any) + if !ok { + continue + } + + // Parse event fields + idStr, _ := recordMap["id"].(string) + kind, _ := recordMap["kind"].(int64) + createdAt, _ := recordMap["created_at"].(int64) + content, _ := recordMap["content"].(string) + sigStr, _ := recordMap["sig"].(string) + pubkeyStr, _ := recordMap["pubkey"].(string) + tagsStr, _ := recordMap["tags"].(string) + + // Decode hex strings + id, err := hex.Dec(idStr) + if err != nil { + continue + } + sig, err := hex.Dec(sigStr) + if err != nil { + continue + } + pubkey, err := hex.Dec(pubkeyStr) + if err != nil { + continue + } + + // Parse tags from JSON + tags := tag.NewS() + if tagsStr != "" { + _ = tags.UnmarshalJSON([]byte(tagsStr)) + } + + // Create event + e := &event.E{ + Kind: uint16(kind), + CreatedAt: createdAt, + Content: []byte(content), + Tags: tags, + } + + // Copy fixed-size arrays + copy(e.ID[:], id) + copy(e.Sig[:], sig) + copy(e.Pubkey[:], pubkey) + + events = append(events, e) + } + + if err := neo4jResult.Err(); err != nil { + return nil, fmt.Errorf("error iterating results: %w", err) + } + + return events, nil +} + +// QueryDeleteEventsByTargetId retrieves delete events targeting a specific event ID +func (n *N) QueryDeleteEventsByTargetId(c context.Context, targetEventId []byte) ( + evs event.S, err error, +) { + targetIDStr := hex.Enc(targetEventId) + + // Query for kind 5 events that reference this event + // This uses Neo4j's graph traversal to find delete events + cypher := ` +MATCH (target:Event {id: $targetId}) +MATCH (e:Event {kind: 5})-[:REFERENCES]->(target) +RETURN e.id AS id, + e.kind AS kind, + e.created_at AS created_at, + e.content AS content, + e.sig AS sig, + e.pubkey AS pubkey, + e.tags AS tags, + e.serial AS serial +ORDER BY e.created_at DESC` + + params := map[string]any{"targetId": targetIDStr} + + result, err := n.ExecuteRead(c, cypher, params) + if err != nil { + return nil, fmt.Errorf("failed to query delete events: %w", err) + } + + evs, err = n.parseEventsFromResult(result) + if err != nil { + return nil, fmt.Errorf("failed to parse delete events: %w", err) + } + + return evs, nil +} + +// QueryForSerials retrieves event serials matching a filter +func (n *N) QueryForSerials(c context.Context, f *filter.F) ( + serials types.Uint40s, err error, +) { + // Build query but only return serial numbers + cypher, params := n.buildCypherQuery(f, false) + + // Replace RETURN clause to only fetch serials + returnClause := " RETURN e.serial AS serial" + cypherParts := strings.Split(cypher, "RETURN") + if len(cypherParts) < 2 { + return nil, fmt.Errorf("invalid query structure") + } + + // Rebuild query with serial-only return + cypher = cypherParts[0] + returnClause + if strings.Contains(cypherParts[1], "ORDER BY") { + orderPart := " ORDER BY" + strings.Split(cypherParts[1], "ORDER BY")[1] + cypher += orderPart + } + + result, err := n.ExecuteRead(c, cypher, params) + if err != nil { + return nil, fmt.Errorf("failed to query serials: %w", err) + } + + // Parse serials from result + serials = make([]*types.Uint40, 0) + ctx := context.Background() + + neo4jResult, ok := result.(interface { + Next(context.Context) bool + Record() *interface{} + Err() error + }) + if !ok { + return nil, fmt.Errorf("invalid result type") + } + + for neo4jResult.Next(ctx) { + record := neo4jResult.Record() + if record == nil { + continue + } + + recordMap, ok := (*record).(map[string]any) + if !ok { + continue + } + + serialVal, _ := recordMap["serial"].(int64) + serial := types.Uint40{} + serial.Set(uint64(serialVal)) + serials = append(serials, &serial) + } + + return serials, nil +} + +// QueryForIds retrieves event IDs matching a filter +func (n *N) QueryForIds(c context.Context, f *filter.F) ( + idPkTs []*store.IdPkTs, err error, +) { + // Build query but only return ID, pubkey, created_at, serial + cypher, params := n.buildCypherQuery(f, false) + + // Replace RETURN clause + returnClause := ` + RETURN e.id AS id, + e.pubkey AS pubkey, + e.created_at AS created_at, + e.serial AS serial` + + cypherParts := strings.Split(cypher, "RETURN") + if len(cypherParts) < 2 { + return nil, fmt.Errorf("invalid query structure") + } + + cypher = cypherParts[0] + returnClause + if strings.Contains(cypherParts[1], "ORDER BY") { + orderPart := " ORDER BY" + strings.Split(cypherParts[1], "ORDER BY")[1] + cypher += orderPart + } + + result, err := n.ExecuteRead(c, cypher, params) + if err != nil { + return nil, fmt.Errorf("failed to query IDs: %w", err) + } + + // Parse IDs from result + idPkTs = make([]*store.IdPkTs, 0) + ctx := context.Background() + + neo4jResult, ok := result.(interface { + Next(context.Context) bool + Record() *interface{} + Err() error + }) + if !ok { + return nil, fmt.Errorf("invalid result type") + } + + for neo4jResult.Next(ctx) { + record := neo4jResult.Record() + if record == nil { + continue + } + + recordMap, ok := (*record).(map[string]any) + if !ok { + continue + } + + idStr, _ := recordMap["id"].(string) + pubkeyStr, _ := recordMap["pubkey"].(string) + createdAt, _ := recordMap["created_at"].(int64) + serialVal, _ := recordMap["serial"].(int64) + + id, err := hex.Dec(idStr) + if err != nil { + continue + } + pubkey, err := hex.Dec(pubkeyStr) + if err != nil { + continue + } + + idPkTs = append(idPkTs, &store.IdPkTs{ + Id: id, + Pub: pubkey, + Ts: createdAt, + Ser: uint64(serialVal), + }) + } + + return idPkTs, nil +} + +// CountEvents counts events matching a filter +func (n *N) CountEvents(c context.Context, f *filter.F) ( + count int, approximate bool, err error, +) { + // Build query but only count results + cypher, params := n.buildCypherQuery(f, false) + + // Replace RETURN clause with COUNT + returnClause := " RETURN count(e) AS count" + cypherParts := strings.Split(cypher, "RETURN") + if len(cypherParts) < 2 { + return 0, false, fmt.Errorf("invalid query structure") + } + + // Remove ORDER BY and LIMIT for count query + cypher = cypherParts[0] + returnClause + delete(params, "limit") // Remove limit parameter if it exists + + result, err := n.ExecuteRead(c, cypher, params) + if err != nil { + return 0, false, fmt.Errorf("failed to count events: %w", err) + } + + // Parse count from result + ctx := context.Background() + neo4jResult, ok := result.(interface { + Next(context.Context) bool + Record() *interface{} + Err() error + }) + if !ok { + return 0, false, fmt.Errorf("invalid result type") + } + + if neo4jResult.Next(ctx) { + record := neo4jResult.Record() + if record != nil { + recordMap, ok := (*record).(map[string]any) + if ok { + countVal, _ := recordMap["count"].(int64) + count = int(countVal) + } + } + } + + return count, false, nil +} diff --git a/pkg/neo4j/save-event.go b/pkg/neo4j/save-event.go new file mode 100644 index 0000000..5e90a9f --- /dev/null +++ b/pkg/neo4j/save-event.go @@ -0,0 +1,266 @@ +package neo4j + +import ( + "context" + "fmt" + + "next.orly.dev/pkg/database/indexes/types" + "next.orly.dev/pkg/encoders/event" + "next.orly.dev/pkg/encoders/filter" + "next.orly.dev/pkg/encoders/hex" +) + +// SaveEvent stores a Nostr event in the Neo4j database. +// It creates event nodes and relationships for authors, tags, and references. +// This method leverages Neo4j's graph capabilities to model Nostr's social graph naturally. +func (n *N) SaveEvent(c context.Context, ev *event.E) (exists bool, err error) { + eventID := hex.Enc(ev.ID[:]) + + // Check if event already exists + checkCypher := "MATCH (e:Event {id: $id}) RETURN e.id AS id" + checkParams := map[string]any{"id": eventID} + + result, err := n.ExecuteRead(c, checkCypher, checkParams) + if err != nil { + return false, fmt.Errorf("failed to check event existence: %w", err) + } + + // Check if we got a result + ctx := context.Background() + neo4jResult, ok := result.(interface { + Next(context.Context) bool + Record() *interface{} + Err() error + }) + if ok && neo4jResult.Next(ctx) { + return true, nil // Event already exists + } + + // Get next serial number + serial, err := n.getNextSerial() + if err != nil { + return false, fmt.Errorf("failed to get serial number: %w", err) + } + + // Build and execute Cypher query to create event with all relationships + cypher, params := n.buildEventCreationCypher(ev, serial) + + if _, err = n.ExecuteWrite(c, cypher, params); err != nil { + return false, fmt.Errorf("failed to save event: %w", err) + } + + return false, nil +} + +// buildEventCreationCypher constructs a Cypher query to create an event node with all relationships +// This is a single atomic operation that creates: +// - Event node with all properties +// - Author node and AUTHORED_BY relationship +// - Tag nodes and TAGGED_WITH relationships +// - Reference relationships (REFERENCES for 'e' tags, MENTIONS for 'p' tags) +func (n *N) buildEventCreationCypher(ev *event.E, serial uint64) (string, map[string]any) { + params := make(map[string]any) + + // Event properties + eventID := hex.Enc(ev.ID[:]) + authorPubkey := hex.Enc(ev.Pubkey[:]) + + params["eventId"] = eventID + params["serial"] = serial + params["kind"] = int64(ev.Kind) + params["createdAt"] = ev.CreatedAt + params["content"] = string(ev.Content) + params["sig"] = hex.Enc(ev.Sig[:]) + params["pubkey"] = authorPubkey + + // Serialize tags as JSON string for storage + tagsJSON, _ := ev.Tags.MarshalJSON() + params["tags"] = string(tagsJSON) + + // Start building the Cypher query + // Use MERGE to ensure idempotency for author nodes + cypher := ` +// Create or match author node +MERGE (a:Author {pubkey: $pubkey}) + +// Create event node +CREATE (e:Event { + id: $eventId, + serial: $serial, + kind: $kind, + created_at: $createdAt, + content: $content, + sig: $sig, + pubkey: $pubkey, + tags: $tags +}) + +// Link event to author +CREATE (e)-[:AUTHORED_BY]->(a) +` + + // Process tags to create relationships + // Different tag types create different relationship patterns + tagNodeIndex := 0 + eTagIndex := 0 + pTagIndex := 0 + + for _, tagItem := range *ev.Tags { + if len(tagItem.T) < 2 { + continue + } + + tagType := string(tagItem.T[0]) + tagValue := string(tagItem.T[1]) + + switch tagType { + case "e": // Event reference - creates REFERENCES relationship + // Create reference to another event (if it exists) + paramName := fmt.Sprintf("eTag_%d", eTagIndex) + params[paramName] = tagValue + + cypher += fmt.Sprintf(` +// Reference to event (e-tag) +OPTIONAL MATCH (ref%d:Event {id: $%s}) +FOREACH (ignoreMe IN CASE WHEN ref%d IS NOT NULL THEN [1] ELSE [] END | + CREATE (e)-[:REFERENCES]->(ref%d) +) +`, eTagIndex, paramName, eTagIndex, eTagIndex) + + eTagIndex++ + + case "p": // Pubkey mention - creates MENTIONS relationship + // Create mention to another author + paramName := fmt.Sprintf("pTag_%d", pTagIndex) + params[paramName] = tagValue + + cypher += fmt.Sprintf(` +// Mention of author (p-tag) +MERGE (mentioned%d:Author {pubkey: $%s}) +CREATE (e)-[:MENTIONS]->(mentioned%d) +`, pTagIndex, paramName, pTagIndex) + + pTagIndex++ + + default: // Other tags - creates Tag nodes and TAGGED_WITH relationships + // Create tag node and relationship + typeParam := fmt.Sprintf("tagType_%d", tagNodeIndex) + valueParam := fmt.Sprintf("tagValue_%d", tagNodeIndex) + params[typeParam] = tagType + params[valueParam] = tagValue + + cypher += fmt.Sprintf(` +// Generic tag relationship +MERGE (tag%d:Tag {type: $%s, value: $%s}) +CREATE (e)-[:TAGGED_WITH]->(tag%d) +`, tagNodeIndex, typeParam, valueParam, tagNodeIndex) + + tagNodeIndex++ + } + } + + // Return the created event + cypher += ` +RETURN e.id AS id` + + return cypher, params +} + +// GetSerialsFromFilter returns event serials matching a filter +func (n *N) GetSerialsFromFilter(f *filter.F) (serials types.Uint40s, err error) { + // Use QueryForSerials with background context + return n.QueryForSerials(context.Background(), f) +} + +// WouldReplaceEvent checks if an event would replace existing events +// This handles replaceable events (kinds 0, 3, and 10000-19999) +// and parameterized replaceable events (kinds 30000-39999) +func (n *N) WouldReplaceEvent(ev *event.E) (bool, types.Uint40s, error) { + // Check for replaceable events (kinds 0, 3, and 10000-19999) + isReplaceable := ev.Kind == 0 || ev.Kind == 3 || (ev.Kind >= 10000 && ev.Kind < 20000) + + // Check for parameterized replaceable events (kinds 30000-39999) + isParameterizedReplaceable := ev.Kind >= 30000 && ev.Kind < 40000 + + if !isReplaceable && !isParameterizedReplaceable { + return false, nil, nil + } + + authorPubkey := hex.Enc(ev.Pubkey[:]) + ctx := context.Background() + + var cypher string + params := map[string]any{ + "pubkey": authorPubkey, + "kind": int64(ev.Kind), + "createdAt": ev.CreatedAt, + } + + if isParameterizedReplaceable { + // For parameterized replaceable events, we need to match on d-tag as well + dTag := ev.Tags.GetFirst([]byte{'d'}) + if dTag == nil { + return false, nil, nil + } + + dValue := "" + if len(dTag.T) >= 2 { + dValue = string(dTag.T[1]) + } + + params["dValue"] = dValue + + // Query for existing parameterized replaceable events with same kind, pubkey, and d-tag + cypher = ` +MATCH (e:Event {kind: $kind, pubkey: $pubkey})-[:TAGGED_WITH]->(t:Tag {type: 'd', value: $dValue}) +WHERE e.created_at < $createdAt +RETURN e.serial AS serial, e.created_at AS created_at +ORDER BY e.created_at DESC` + + } else { + // Query for existing replaceable events with same kind and pubkey + cypher = ` +MATCH (e:Event {kind: $kind, pubkey: $pubkey}) +WHERE e.created_at < $createdAt +RETURN e.serial AS serial, e.created_at AS created_at +ORDER BY e.created_at DESC` + } + + result, err := n.ExecuteRead(ctx, cypher, params) + if err != nil { + return false, nil, fmt.Errorf("failed to query replaceable events: %w", err) + } + + // Parse results + neo4jResult, ok := result.(interface { + Next(context.Context) bool + Record() *interface{} + Err() error + }) + if !ok { + return false, nil, fmt.Errorf("invalid result type") + } + + var serials types.Uint40s + wouldReplace := false + + for neo4jResult.Next(ctx) { + record := neo4jResult.Record() + if record == nil { + continue + } + + recordMap, ok := (*record).(map[string]any) + if !ok { + continue + } + + serialVal, _ := recordMap["serial"].(int64) + wouldReplace = true + serial := types.Uint40{} + serial.Set(uint64(serialVal)) + serials = append(serials, &serial) + } + + return wouldReplace, serials, nil +} diff --git a/pkg/neo4j/schema.go b/pkg/neo4j/schema.go new file mode 100644 index 0000000..fc22073 --- /dev/null +++ b/pkg/neo4j/schema.go @@ -0,0 +1,108 @@ +package neo4j + +import ( + "context" + "fmt" +) + +// applySchema creates Neo4j constraints and indexes for Nostr events +// Neo4j uses Cypher queries to define schema constraints and indexes +func (n *N) applySchema(ctx context.Context) error { + n.Logger.Infof("applying Nostr schema to neo4j") + + // Create constraints and indexes using Cypher queries + // Constraints ensure uniqueness and are automatically indexed + constraints := []string{ + // Unique constraint on Event.id (event ID must be unique) + "CREATE CONSTRAINT event_id_unique IF NOT EXISTS FOR (e:Event) REQUIRE e.id IS UNIQUE", + + // Unique constraint on Author.pubkey (author public key must be unique) + "CREATE CONSTRAINT author_pubkey_unique IF NOT EXISTS FOR (a:Author) REQUIRE a.pubkey IS UNIQUE", + + // Unique constraint on Marker.key (marker key must be unique) + "CREATE CONSTRAINT marker_key_unique IF NOT EXISTS FOR (m:Marker) REQUIRE m.key IS UNIQUE", + } + + // Additional indexes for query optimization + indexes := []string{ + // Index on Event.kind for kind-based queries + "CREATE INDEX event_kind IF NOT EXISTS FOR (e:Event) ON (e.kind)", + + // Index on Event.created_at for time-range queries + "CREATE INDEX event_created_at IF NOT EXISTS FOR (e:Event) ON (e.created_at)", + + // Index on Event.serial for serial-based lookups + "CREATE INDEX event_serial IF NOT EXISTS FOR (e:Event) ON (e.serial)", + + // Composite index for common query patterns (kind + created_at) + "CREATE INDEX event_kind_created_at IF NOT EXISTS FOR (e:Event) ON (e.kind, e.created_at)", + + // Index on Tag.type for tag-type queries + "CREATE INDEX tag_type IF NOT EXISTS FOR (t:Tag) ON (t.type)", + + // Index on Tag.value for tag-value queries + "CREATE INDEX tag_value IF NOT EXISTS FOR (t:Tag) ON (t.value)", + + // Composite index for tag queries (type + value) + "CREATE INDEX tag_type_value IF NOT EXISTS FOR (t:Tag) ON (t.type, t.value)", + } + + // Execute all constraint creation queries + for _, constraint := range constraints { + if _, err := n.ExecuteWrite(ctx, constraint, nil); err != nil { + return fmt.Errorf("failed to create constraint: %w", err) + } + } + + // Execute all index creation queries + for _, index := range indexes { + if _, err := n.ExecuteWrite(ctx, index, nil); err != nil { + return fmt.Errorf("failed to create index: %w", err) + } + } + + n.Logger.Infof("schema applied successfully") + return nil +} + +// dropAll drops all data from neo4j (useful for testing) +func (n *N) dropAll(ctx context.Context) error { + n.Logger.Warningf("dropping all data from neo4j") + + // Delete all nodes and relationships + _, err := n.ExecuteWrite(ctx, "MATCH (n) DETACH DELETE n", nil) + if err != nil { + return fmt.Errorf("failed to drop all data: %w", err) + } + + // Drop all constraints + constraints := []string{ + "DROP CONSTRAINT event_id_unique IF EXISTS", + "DROP CONSTRAINT author_pubkey_unique IF EXISTS", + "DROP CONSTRAINT marker_key_unique IF EXISTS", + } + + for _, constraint := range constraints { + _, _ = n.ExecuteWrite(ctx, constraint, nil) + // Ignore errors as constraints may not exist + } + + // Drop all indexes + indexes := []string{ + "DROP INDEX event_kind IF EXISTS", + "DROP INDEX event_created_at IF EXISTS", + "DROP INDEX event_serial IF EXISTS", + "DROP INDEX event_kind_created_at IF EXISTS", + "DROP INDEX tag_type IF EXISTS", + "DROP INDEX tag_value IF EXISTS", + "DROP INDEX tag_type_value IF EXISTS", + } + + for _, index := range indexes { + _, _ = n.ExecuteWrite(ctx, index, nil) + // Ignore errors as indexes may not exist + } + + // Reapply schema after dropping + return n.applySchema(ctx) +} diff --git a/pkg/neo4j/serial.go b/pkg/neo4j/serial.go new file mode 100644 index 0000000..6f51853 --- /dev/null +++ b/pkg/neo4j/serial.go @@ -0,0 +1,113 @@ +package neo4j + +import ( + "context" + "fmt" + "sync" +) + +// Serial number management +// We use a special Marker node in Neo4j to track the next available serial number + +const serialCounterKey = "serial_counter" + +var ( + serialMutex sync.Mutex +) + +// getNextSerial atomically increments and returns the next serial number +func (n *N) getNextSerial() (uint64, error) { + serialMutex.Lock() + defer serialMutex.Unlock() + + ctx := context.Background() + + // Query current serial value + cypher := "MATCH (m:Marker {key: $key}) RETURN m.value AS value" + params := map[string]any{"key": serialCounterKey} + + result, err := n.ExecuteRead(ctx, cypher, params) + if err != nil { + return 0, fmt.Errorf("failed to query serial counter: %w", err) + } + + neo4jResult, ok := result.(interface { + Next(context.Context) bool + Record() *interface{} + Err() error + }) + if !ok { + return 1, nil + } + + var currentSerial uint64 = 1 + if neo4jResult.Next(ctx) { + record := neo4jResult.Record() + if record != nil { + recordMap, ok := (*record).(map[string]any) + if ok { + if value, ok := recordMap["value"].(int64); ok { + currentSerial = uint64(value) + } + } + } + } + + // Increment serial + nextSerial := currentSerial + 1 + + // Update counter + updateCypher := ` +MERGE (m:Marker {key: $key}) +SET m.value = $value` + updateParams := map[string]any{ + "key": serialCounterKey, + "value": int64(nextSerial), + } + + _, err = n.ExecuteWrite(ctx, updateCypher, updateParams) + if err != nil { + return 0, fmt.Errorf("failed to update serial counter: %w", err) + } + + return currentSerial, nil +} + +// initSerialCounter initializes the serial counter if it doesn't exist +func (n *N) initSerialCounter() error { + ctx := context.Background() + + // Check if counter exists + cypher := "MATCH (m:Marker {key: $key}) RETURN m.value AS value" + params := map[string]any{"key": serialCounterKey} + + result, err := n.ExecuteRead(ctx, cypher, params) + if err != nil { + return fmt.Errorf("failed to check serial counter: %w", err) + } + + neo4jResult, ok := result.(interface { + Next(context.Context) bool + Record() *interface{} + Err() error + }) + if ok && neo4jResult.Next(ctx) { + // Counter already exists + return nil + } + + // Initialize counter at 1 + initCypher := "CREATE (m:Marker {key: $key, value: $value})" + initParams := map[string]any{ + "key": serialCounterKey, + "value": int64(1), + } + + _, err = n.ExecuteWrite(ctx, initCypher, initParams) + if err != nil { + return fmt.Errorf("failed to initialize serial counter: %w", err) + } + + n.Logger.Infof("initialized serial counter") + return nil +} diff --git a/pkg/neo4j/subscriptions.go b/pkg/neo4j/subscriptions.go new file mode 100644 index 0000000..5269a09 --- /dev/null +++ b/pkg/neo4j/subscriptions.go @@ -0,0 +1,181 @@ +package neo4j + +import ( + "encoding/json" + "fmt" + "time" + + "next.orly.dev/pkg/database" + "next.orly.dev/pkg/encoders/hex" +) + +// Subscription and payment methods +// Simplified implementation using marker-based storage via Badger +// For production graph-based storage, these could use Neo4j nodes with relationships + +// GetSubscription retrieves subscription information for a pubkey +func (n *N) GetSubscription(pubkey []byte) (*database.Subscription, error) { + key := "sub_" + hex.Enc(pubkey) + data, err := n.GetMarker(key) + if err != nil { + return nil, err + } + + var sub database.Subscription + if err := json.Unmarshal(data, &sub); err != nil { + return nil, fmt.Errorf("failed to unmarshal subscription: %w", err) + } + + return &sub, nil +} + +// IsSubscriptionActive checks if a pubkey has an active subscription +func (n *N) IsSubscriptionActive(pubkey []byte) (bool, error) { + sub, err := n.GetSubscription(pubkey) + if err != nil { + return false, nil // No subscription = not active + } + + return sub.PaidUntil.After(time.Now()), nil +} + +// ExtendSubscription extends a subscription by the specified number of days +func (n *N) ExtendSubscription(pubkey []byte, days int) error { + key := "sub_" + hex.Enc(pubkey) + + // Get existing subscription or create new + var sub database.Subscription + data, err := n.GetMarker(key) + if err == nil { + if err := json.Unmarshal(data, &sub); err != nil { + return fmt.Errorf("failed to unmarshal subscription: %w", err) + } + } else { + // New subscription - set trial period + sub.TrialEnd = time.Now() + sub.PaidUntil = time.Now() + } + + // Extend expiration + if sub.PaidUntil.Before(time.Now()) { + sub.PaidUntil = time.Now() + } + sub.PaidUntil = sub.PaidUntil.Add(time.Duration(days) * 24 * time.Hour) + + // Save + data, err = json.Marshal(sub) + if err != nil { + return fmt.Errorf("failed to marshal subscription: %w", err) + } + + return n.SetMarker(key, data) +} + +// RecordPayment records a payment for subscription extension +func (n *N) RecordPayment( + pubkey []byte, amount int64, invoice, preimage string, +) error { + // Store payment in payments list + key := "payments_" + hex.Enc(pubkey) + + var payments []database.Payment + data, err := n.GetMarker(key) + if err == nil { + if err := json.Unmarshal(data, &payments); err != nil { + return fmt.Errorf("failed to unmarshal payments: %w", err) + } + } + + payment := database.Payment{ + Amount: amount, + Timestamp: time.Now(), + Invoice: invoice, + Preimage: preimage, + } + + payments = append(payments, payment) + + data, err = json.Marshal(payments) + if err != nil { + return fmt.Errorf("failed to marshal payments: %w", err) + } + + return n.SetMarker(key, data) +} + +// GetPaymentHistory retrieves payment history for a pubkey +func (n *N) GetPaymentHistory(pubkey []byte) ([]database.Payment, error) { + key := "payments_" + hex.Enc(pubkey) + + data, err := n.GetMarker(key) + if err != nil { + return nil, nil // No payments = empty list + } + + var payments []database.Payment + if err := json.Unmarshal(data, &payments); err != nil { + return nil, fmt.Errorf("failed to unmarshal payments: %w", err) + } + + return payments, nil +} + +// ExtendBlossomSubscription extends a Blossom storage subscription +func (n *N) ExtendBlossomSubscription( + pubkey []byte, tier string, storageMB int64, daysExtended int, +) error { + key := "blossom_" + hex.Enc(pubkey) + + // Simple implementation - just store tier and expiry + data := map[string]interface{}{ + "tier": tier, + "storageMB": storageMB, + "extended": daysExtended, + "updated": time.Now(), + } + + jsonData, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("failed to marshal blossom subscription: %w", err) + } + + return n.SetMarker(key, jsonData) +} + +// GetBlossomStorageQuota retrieves the storage quota for a pubkey +func (n *N) GetBlossomStorageQuota(pubkey []byte) (quotaMB int64, err error) { + key := "blossom_" + hex.Enc(pubkey) + + data, err := n.GetMarker(key) + if err != nil { + return 0, nil // No subscription = 0 quota + } + + var subData map[string]interface{} + if err := json.Unmarshal(data, &subData); err != nil { + return 0, fmt.Errorf("failed to unmarshal blossom data: %w", err) + } + + if storageMB, ok := subData["storageMB"].(float64); ok { + return int64(storageMB), nil + } + + return 0, nil +} + +// IsFirstTimeUser checks if this is the first time a user is accessing the relay +func (n *N) IsFirstTimeUser(pubkey []byte) (bool, error) { + key := "first_seen_" + hex.Enc(pubkey) + + // If marker exists, not first time + if n.HasMarker(key) { + return false, nil + } + + // Mark as seen + if err := n.SetMarker(key, []byte{1}); err != nil { + return true, err + } + + return true, nil +} diff --git a/pkg/neo4j/testmain_test.go b/pkg/neo4j/testmain_test.go new file mode 100644 index 0000000..2e0d2bf --- /dev/null +++ b/pkg/neo4j/testmain_test.go @@ -0,0 +1,15 @@ +package neo4j + +import ( + "os" + "testing" +) + +// skipIfNeo4jNotAvailable skips the test if Neo4j is not available +func skipIfNeo4jNotAvailable(t *testing.T) { + // Check if Neo4j connection details are provided + uri := os.Getenv("ORLY_NEO4J_URI") + if uri == "" { + t.Skip("Neo4j not available (set ORLY_NEO4J_URI to enable tests)") + } +}