Compare commits

...

6 Commits

Author SHA1 Message Date
woikos
0dac41e35e Add documentation and improve BBolt import memory efficiency (v0.48.8)
- Add README.md table of contents for easier navigation
- Add Curation ACL documentation section to README.md
- Create detailed Curation Mode Guide (docs/CURATION_MODE_GUIDE.md)
- Fix OOM during BBolt index building by closing temp file before build
- Add GC calls before index building to reclaim batch buffer memory
- Improve import-export.go with processJSONLEventsReturningCount
- Add policy-aware import path for sync operations

Files modified:
- README.md: Added TOC and curation ACL documentation
- docs/CURATION_MODE_GUIDE.md: New comprehensive curation mode guide
- pkg/bbolt/import-export.go: Memory-safe import with deferred cleanup
- pkg/bbolt/import-minimal.go: Added GC before index build
- pkg/version/version: Bump to v0.48.8

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-06 15:37:06 +01:00
woikos
2480be3a73 Fix OOM in BuildIndexes by processing in chunks (v0.48.6)
- Process events in 200k chunks instead of loading all at once
- Write indexes to disk after each chunk, then free memory
- Call debug.FreeOSMemory() between chunks to release memory to OS
- Memory usage now ~150-200MB per chunk instead of 5GB+
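The chunking strategy this commit describes can be sketched as follows; `buildIndexesChunked` and `flush` are illustrative names, not the relay's actual API:

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// buildIndexesChunked walks event serials in fixed-size chunks so only one
// chunk's index entries are resident at a time, instead of the whole set.
func buildIndexesChunked(serials []uint64, chunkSize int, flush func([]uint64) error) (chunks int, err error) {
	for start := 0; start < len(serials); start += chunkSize {
		end := start + chunkSize
		if end > len(serials) {
			end = len(serials)
		}
		// Persist this chunk's index entries, then drop the references.
		if err = flush(serials[start:end]); err != nil {
			return
		}
		chunks++
		// Hand freed memory back to the OS between chunks, per the commit.
		debug.FreeOSMemory()
	}
	return
}

func main() {
	serials := make([]uint64, 450_000)
	chunks, _ := buildIndexesChunked(serials, 200_000, func([]uint64) error { return nil })
	fmt.Println(chunks) // 450k serials in 200k chunks -> 3 flushes
}
```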

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-06 09:10:50 +01:00
woikos
d363f5da04 Implement BBolt ImportEventsFromReader for migration (v0.48.1)
- Add import-export.go with full JSONL import support for bbolt
- Remove Import/Export/ImportEventsFromReader stubs from stubs.go
- Includes batched write flush after import completion
- Progress logging every 5 seconds during import

Files modified:
- pkg/bbolt/import-export.go: New file with import functionality
- pkg/bbolt/stubs.go: Remove implemented stubs
- pkg/version/version: v0.48.0 -> v0.48.1

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-06 06:57:06 +01:00
woikos
48a2b97b7e Fix: Add bbolt import for factory registration 2026-01-06 06:52:55 +01:00
woikos
9fed1261ad Add BBolt database backend for HDD-optimized archival relays (v0.48.0)
- BBolt B+tree backend with sequential access patterns for spinning disks
- Write batching (5000 events / 128MB / 30s flush) to reduce disk thrashing
- Adjacency list storage for graph data (one key per vertex, not per edge)
- Bloom filter for fast negative edge existence checks (~12MB for 10M edges)
- No query cache (saves RAM, B+tree reads are fast enough on HDD)
- Migration tool: orly migrate --from badger --to bbolt
- Configuration: ORLY_BBOLT_* environment variables

Files modified:
- app/config/config.go: Added BBolt configuration options
- main.go: Added migrate subcommand and BBolt config wiring
- pkg/database/factory.go: Added BBolt factory registration
- pkg/bbolt/*: New BBolt database backend implementation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-06 06:50:58 +01:00
woikos
8dfd25613d Fix corrupted events with zero-filled IDs/pubkeys/sigs (v0.47.1)
- Add validation in GetEventIdBySerial to ensure sei value is 32 bytes
- Fix fallback-to-legacy bug: return error instead of attempting legacy
  unmarshal on compact format data when event ID lookup fails
- Add upfront validation in UnmarshalCompactEvent for eventId length
- Prevents events with all-zero IDs from being returned to clients

Files modified:
- pkg/database/serial_cache.go: Validate sei value is exactly 32 bytes
- pkg/database/fetch-events-by-serials.go: Return error for compact format
  when eventId missing instead of falling back to legacy unmarshal
- pkg/database/fetch-event-by-serial.go: Same fix for single event fetch
- pkg/database/compact_event.go: Validate eventId is 32 bytes upfront
- pkg/version/version: Bump to v0.47.1

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-06 05:51:34 +01:00
30 changed files with 4240 additions and 23 deletions


@@ -12,6 +12,36 @@ zap me: ⚡mlekudev@getalby.com
follow me on [nostr](https://jumble.social/users/npub1fjqqy4a93z5zsjwsfxqhc2764kvykfdyttvldkkkdera8dr78vhsmmleku)
## Table of Contents
- [Bug Reports & Feature Requests](#%EF%B8%8F-bug-reports--feature-requests)
- [System Requirements](#%EF%B8%8F-system-requirements)
- [About](#about)
- [Performance & Cryptography](#performance--cryptography)
- [Building](#building)
- [Prerequisites](#prerequisites)
- [Basic Build](#basic-build)
- [Building with Web UI](#building-with-web-ui)
- [Core Features](#core-features)
- [Web UI](#web-ui)
- [Sprocket Event Processing](#sprocket-event-processing)
- [Policy System](#policy-system)
- [Deployment](#deployment)
- [Automated Deployment](#automated-deployment)
- [TLS Configuration](#tls-configuration)
- [systemd Service Management](#systemd-service-management)
- [Remote Deployment](#remote-deployment)
- [Configuration](#configuration)
- [Firewall Configuration](#firewall-configuration)
- [Monitoring](#monitoring)
- [Testing](#testing)
- [Command-Line Tools](#command-line-tools)
- [Access Control](#access-control)
- [Follows ACL](#follows-acl)
- [Curation ACL](#curation-acl)
- [Cluster Replication](#cluster-replication)
- [Developer Notes](#developer-notes)
## ⚠️ Bug Reports & Feature Requests
**Bug reports and feature requests that do not follow the protocol will not be accepted.**
@@ -566,9 +596,22 @@ go run ./cmd/subscription-test-simple -url ws://localhost:3334 -duration 120
## Access Control
ORLY provides four ACL (Access Control List) modes to control who can publish events to your relay:
| Mode | Description | Best For |
|------|-------------|----------|
| `none` | Open relay, anyone can write | Public relays |
| `follows` | Write access based on admin follow lists | Personal/community relays |
| `managed` | Explicit allow/deny lists via NIP-86 API | Private relays |
| `curating` | Three-tier classification with rate limiting | Curated community relays |
```bash
export ORLY_ACL_MODE=follows # or: none, managed, curating
```
### Follows ACL
The follows ACL (Access Control List) system provides flexible relay access control based on social relationships in the Nostr network.
The follows ACL system provides flexible relay access control based on social relationships in the Nostr network.
```bash
export ORLY_ACL_MODE=follows
@@ -578,6 +621,30 @@ export ORLY_ADMINS=npub1fjqqy4a93z5zsjwsfxqhc2764kvykfdyttvldkkkdera8dr78vhsmmle
The system grants write access to users followed by designated admins, with read-only access for others. Follow lists update dynamically as admins modify their relationships.
### Curation ACL
The curation ACL mode provides sophisticated content curation with a three-tier publisher classification system:
- **Trusted**: Unlimited publishing, bypass rate limits
- **Blacklisted**: Blocked from publishing, invisible to regular users
- **Unclassified**: Rate-limited publishing (default 50 events/day)
Key features:
- **Kind whitelisting**: Only allow specific event kinds (e.g., social, DMs, longform)
- **IP-based flood protection**: Auto-ban IPs that exceed rate limits
- **Spam flagging**: Mark events as spam without deleting
- **Web UI management**: Configure via the built-in curation interface
```bash
export ORLY_ACL_MODE=curating
export ORLY_OWNERS=npub1your_owner_key
./orly
```
After starting, publish a configuration event (kind 30078) to enable the relay. The web UI at `/#curation` provides a complete management interface.
For detailed configuration and API documentation, see the [Curation Mode Guide](docs/CURATION_MODE_GUIDE.md).
### Cluster Replication
ORLY supports distributed relay clusters using active replication. When configured with peer relays, ORLY will automatically synchronize events between cluster members using efficient HTTP polling.


@@ -95,8 +95,16 @@ type C struct {
NIP43InviteExpiry time.Duration `env:"ORLY_NIP43_INVITE_EXPIRY" default:"24h" usage:"how long invite codes remain valid"`
// Database configuration
DBType string `env:"ORLY_DB_TYPE" default:"badger" usage:"database backend to use: badger or neo4j"`
DBType string `env:"ORLY_DB_TYPE" default:"badger" usage:"database backend to use: badger, bbolt, or neo4j"`
QueryCacheDisabled bool `env:"ORLY_QUERY_CACHE_DISABLED" default:"true" usage:"disable query cache to reduce memory usage (trades memory for query performance)"`
// BBolt configuration (only used when ORLY_DB_TYPE=bbolt)
BboltBatchMaxEvents int `env:"ORLY_BBOLT_BATCH_MAX_EVENTS" default:"5000" usage:"max events before flush (tuned for HDD, only used when ORLY_DB_TYPE=bbolt)"`
BboltBatchMaxMB int `env:"ORLY_BBOLT_BATCH_MAX_MB" default:"128" usage:"max batch size in MB before flush (only used when ORLY_DB_TYPE=bbolt)"`
BboltFlushTimeout int `env:"ORLY_BBOLT_FLUSH_TIMEOUT_SEC" default:"30" usage:"max seconds before flush (only used when ORLY_DB_TYPE=bbolt)"`
BboltBloomSizeMB int `env:"ORLY_BBOLT_BLOOM_SIZE_MB" default:"16" usage:"bloom filter size in MB for edge queries (only used when ORLY_DB_TYPE=bbolt)"`
BboltNoSync bool `env:"ORLY_BBOLT_NO_SYNC" default:"false" usage:"disable fsync for performance (DANGEROUS - data loss risk, only used when ORLY_DB_TYPE=bbolt)"`
BboltMmapSizeMB int `env:"ORLY_BBOLT_MMAP_SIZE_MB" default:"8192" usage:"initial mmap size in MB (only used when ORLY_DB_TYPE=bbolt)"`
QueryCacheSizeMB int `env:"ORLY_QUERY_CACHE_SIZE_MB" default:"512" usage:"query cache size in MB (caches database query results for faster REQ responses)"`
QueryCacheMaxAge string `env:"ORLY_QUERY_CACHE_MAX_AGE" default:"5m" usage:"maximum age for cached query results (e.g., 5m, 10m, 1h)"`
@@ -357,6 +365,45 @@ func CuratingModeRequested() (requested bool, ownerKey string) {
return
}
// MigrateRequested checks if the first command line argument is "migrate"
// and returns the migration parameters.
//
// Return Values
// - requested: true if the 'migrate' subcommand was provided
// - fromType: source database type (badger, bbolt, neo4j)
// - toType: destination database type
// - targetPath: optional target path for destination database
func MigrateRequested() (requested bool, fromType, toType, targetPath string) {
	if len(os.Args) > 1 {
		switch strings.ToLower(os.Args[1]) {
		case "migrate":
			requested = true
			// Parse --from, --to, --target-path flags
			for i := 2; i < len(os.Args); i++ {
				arg := os.Args[i]
				switch {
				case strings.HasPrefix(arg, "--from="):
					fromType = strings.TrimPrefix(arg, "--from=")
				case strings.HasPrefix(arg, "--to="):
					toType = strings.TrimPrefix(arg, "--to=")
				case strings.HasPrefix(arg, "--target-path="):
					targetPath = strings.TrimPrefix(arg, "--target-path=")
				case arg == "--from" && i+1 < len(os.Args):
					i++
					fromType = os.Args[i]
				case arg == "--to" && i+1 < len(os.Args):
					i++
					toType = os.Args[i]
				case arg == "--target-path" && i+1 < len(os.Args):
					i++
					targetPath = os.Args[i]
				}
			}
		}
	}
	return
}
// KV is a key/value pair.
type KV struct{ Key, Value string }
@@ -488,18 +535,20 @@ func PrintHelp(cfg *C, printer io.Writer) {
)
_, _ = fmt.Fprintf(
printer,
`Usage: %s [env|help|identity|serve|version]
`Usage: %s [env|help|identity|migrate|serve|version]
- env: print environment variables configuring %s
- help: print this help text
- identity: print the relay identity secret and public key
- migrate: migrate data between database backends
Example: %s migrate --from badger --to bbolt
- serve: start ephemeral relay with RAM-based storage at /dev/shm/orlyserve
listening on 0.0.0.0:10547 with 'none' ACL mode (open relay)
useful for testing and benchmarking
- version: print version and exit (also: -v, --v, -version, --version)
`,
cfg.AppName, cfg.AppName,
cfg.AppName, cfg.AppName, cfg.AppName,
)
_, _ = fmt.Fprintf(
printer,
@@ -707,3 +756,22 @@ func (cfg *C) GetGraphConfigValues() (
cfg.GraphMaxResults,
cfg.GraphRateLimitRPM
}
// GetBboltConfigValues returns the BBolt database configuration values.
// This avoids circular imports with pkg/bbolt while allowing main.go to construct
// the BBolt-specific configuration.
func (cfg *C) GetBboltConfigValues() (
	batchMaxEvents int,
	batchMaxBytes int64,
	flushTimeoutSec int,
	bloomSizeMB int,
	noSync bool,
	mmapSizeBytes int,
) {
	return cfg.BboltBatchMaxEvents,
		int64(cfg.BboltBatchMaxMB) * 1024 * 1024,
		cfg.BboltFlushTimeout,
		cfg.BboltBloomSizeMB,
		cfg.BboltNoSync,
		cfg.BboltMmapSizeMB * 1024 * 1024
}

docs/CURATION_MODE_GUIDE.md

@@ -0,0 +1,411 @@
# Curation Mode Guide
Curation mode is a sophisticated access control system for Nostr relays that provides three-tier publisher classification, rate limiting, IP-based flood protection, and event kind whitelisting.
## Overview
Unlike simple allow/deny lists, curation mode classifies publishers into three tiers:
| Tier | Rate Limited | Daily Limit | Visibility |
|------|--------------|-------------|------------|
| **Trusted** | No | Unlimited | Full |
| **Blacklisted** | N/A (blocked) | 0 | Hidden from regular users |
| **Unclassified** | Yes | 50 events/day (default) | Full |
This allows relay operators to:
- Reward quality contributors with unlimited publishing
- Block bad actors while preserving their events for admin review
- Allow new users to participate with reasonable rate limits
- Prevent spam floods through automatic IP-based protections
## Quick Start
### 1. Start the Relay
```bash
export ORLY_ACL_MODE=curating
export ORLY_OWNERS=npub1your_owner_pubkey
./orly
```
### 2. Publish Configuration
The relay will not accept events until you publish a configuration event. Use the web UI at `http://your-relay/#curation` or publish a kind 30078 event:
```json
{
  "kind": 30078,
  "tags": [["d", "curating-config"]],
  "content": "{\"dailyLimit\":50,\"ipDailyLimit\":500,\"firstBanHours\":1,\"secondBanHours\":168,\"kindCategories\":[\"social\"]}"
}
```
### 3. Manage Publishers
Use the web UI or NIP-86 API to:
- Trust quality publishers
- Blacklist spammers
- Review unclassified users by activity
- Unblock IPs if needed
## Configuration
### Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `ORLY_ACL_MODE` | `none` | Set to `curating` to enable |
| `ORLY_OWNERS` | | Owner pubkeys (can configure relay) |
| `ORLY_ADMINS` | | Admin pubkeys (can manage publishers) |
### Configuration Event (Kind 30078)
Configuration is stored as a replaceable Nostr event (kind 30078) with d-tag `curating-config`. Only owners and admins can publish configuration.
```typescript
interface CuratingConfig {
  // Rate Limiting
  dailyLimit: number;       // Max events/day for unclassified users (default: 50)
  ipDailyLimit: number;     // Max events/day from single IP (default: 500)

  // IP Ban Durations
  firstBanHours: number;    // First offense ban duration (default: 1 hour)
  secondBanHours: number;   // Subsequent offense ban duration (default: 168 hours / 1 week)

  // Kind Filtering (choose one or combine)
  allowedKinds: number[];   // Explicit kind numbers: [0, 1, 3, 7]
  allowedRanges: string[];  // Kind ranges: ["1000-1999", "30000-39999"]
  kindCategories: string[]; // Pre-defined categories: ["social", "dm"]
}
```
### Event Kind Categories
Pre-defined categories for convenient kind whitelisting:
| Category | Kinds | Description |
|----------|-------|-------------|
| `social` | 0, 1, 3, 6, 7, 10002 | Profiles, notes, contacts, reposts, reactions |
| `dm` | 4, 14, 1059 | Direct messages (NIP-04, NIP-17, gift wraps) |
| `longform` | 30023, 30024 | Long-form articles and drafts |
| `media` | 1063, 20, 21, 22 | File metadata, picture/video/audio events |
| `marketplace` | 30017-30020, 1021, 1022 | Products, stalls, auctions, bids |
| `groups_nip29` | 9-12, 9000-9002, 39000-39002 | NIP-29 relay-based groups |
| `groups_nip72` | 34550, 1111, 4550 | NIP-72 moderated communities |
| `lists` | 10000, 10001, 10003, 30000, 30001, 30003 | Mute, pin, bookmark lists |
Example configuration allowing social interactions and DMs:
```json
{
  "kindCategories": ["social", "dm"],
  "dailyLimit": 100,
  "ipDailyLimit": 1000
}
```
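The category-to-kind mapping in the table above can be expanded into the relay's allowed-kind set roughly as follows; this sketch covers only the `social` and `dm` rows and is not the relay's actual code:

```go
package main

import "fmt"

// categoryKinds mirrors two rows of the category table above.
var categoryKinds = map[string][]int{
	"social": {0, 1, 3, 6, 7, 10002},
	"dm":     {4, 14, 1059},
}

// expandCategories unions the selected categories into one allowed-kind set,
// which the relay consults before accepting an event.
func expandCategories(categories []string) map[int]bool {
	allowed := make(map[int]bool)
	for _, c := range categories {
		for _, k := range categoryKinds[c] {
			allowed[k] = true
		}
	}
	return allowed
}

func main() {
	allowed := expandCategories([]string{"social", "dm"})
	fmt.Println(allowed[1], allowed[1059], allowed[30023]) // true true false
}
```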
## Three-Tier Classification
### Trusted Publishers
Trusted publishers have unlimited publishing rights:
- Bypass all rate limiting
- Can publish any allowed kind
- Events always visible to all users
**Use case**: Known quality contributors, verified community members, partner relays.
### Blacklisted Publishers
Blacklisted publishers are blocked from publishing:
- All events rejected with `"pubkey is blacklisted"` error
- Existing events become invisible to regular users
- Admins and owners can still see blacklisted events (for review)
**Use case**: Spammers, abusive users, bad actors.
### Unclassified Publishers
Everyone else falls into the unclassified tier:
- Subject to daily event limit (default: 50 events/day)
- Subject to IP-based flood protection
- Events visible to all users
- Can be promoted to trusted or demoted to blacklisted
**Use case**: New users, general public.
## Rate Limiting & Flood Protection
### Per-Pubkey Limits
Unclassified publishers are limited to a configurable number of events per day (default: 50). The count resets at midnight UTC.
When a user exceeds their limit:
1. Event is rejected with `"daily event limit exceeded"` error
2. Their IP is flagged for potential abuse
### Per-IP Limits
To prevent Sybil attacks (creating many pubkeys from one IP), there's also an IP-based daily limit (default: 500 events).
When an IP exceeds its limit:
1. All events from that IP are rejected
2. The IP is temporarily banned
### Automatic IP Banning
When rate limits are exceeded:
| Offense | Ban Duration | Description |
|---------|--------------|-------------|
| First | 1 hour | Quick timeout for accidental over-posting |
| Second+ | 1 week | Extended ban for repeated abuse |
Ban durations are configurable via `firstBanHours` and `secondBanHours`.
### Offense Tracking
The system tracks which pubkeys triggered rate limits from each IP:
```
IP 192.168.1.100:
- npub1abc... exceeded limit at 2024-01-15 10:30:00
- npub1xyz... exceeded limit at 2024-01-15 10:45:00
Offense count: 2
Status: Banned until 2024-01-22 10:45:00
```
This helps identify coordinated spam attacks.
## Spam Flagging
Events can be flagged as spam without deletion:
- Flagged events are hidden from regular users
- Admins can review flagged events
- Events can be unflagged if incorrectly marked
- Original event data is preserved
This is useful for:
- Moderation review queues
- Training spam detection systems
- Preserving evidence of abuse
## NIP-86 Management API
All management operations use NIP-98 HTTP authentication.
### Trust Management
```bash
# Trust a pubkey
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"trustpubkey","params":["<pubkey_hex>"]}'
# Untrust a pubkey
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"untrustpubkey","params":["<pubkey_hex>"]}'
# List trusted pubkeys
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"listtrustedpubkeys","params":[]}'
```
### Blacklist Management
```bash
# Blacklist a pubkey
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"blacklistpubkey","params":["<pubkey_hex>"]}'
# Remove from blacklist
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"unblacklistpubkey","params":["<pubkey_hex>"]}'
# List blacklisted pubkeys
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"listblacklistedpubkeys","params":[]}'
```
### Unclassified User Management
```bash
# List unclassified users sorted by event count
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"listunclassifiedusers","params":[]}'
```
Response includes pubkey, event count, and last activity for each user.
### Spam Management
```bash
# Mark event as spam
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"markspam","params":["<event_id_hex>"]}'
# Unmark spam
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"unmarkspam","params":["<event_id_hex>"]}'
# List spam events
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"listspamevents","params":[]}'
```
### IP Block Management
```bash
# List blocked IPs
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"listblockedips","params":[]}'
# Unblock an IP
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"unblockip","params":["<ip_address>"]}'
```
### Configuration Management
```bash
# Get current configuration
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"getcuratingconfig","params":[]}'
# Set allowed kind categories
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"setallowedkindcategories","params":[["social","dm","longform"]]}'
# Get allowed kind categories
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"getallowedkindcategories","params":[]}'
```
## Web UI
The curation web UI is available at `/#curation` and provides:
- **Configuration Panel**: Set rate limits, ban durations, and allowed kinds
- **Publisher Management**: Trust/blacklist pubkeys with one click
- **Unclassified Users**: View users sorted by activity, promote or blacklist
- **IP Blocks**: View and unblock banned IP addresses
- **Spam Queue**: Review flagged events, confirm or unflag
## Database Storage
Curation data is stored in the relay database with the following key prefixes:
| Prefix | Purpose |
|--------|---------|
| `CURATING_ACL_CONFIG` | Current configuration |
| `CURATING_ACL_TRUSTED_PUBKEY_{pubkey}` | Trusted publisher list |
| `CURATING_ACL_BLACKLISTED_PUBKEY_{pubkey}` | Blacklisted publisher list |
| `CURATING_ACL_EVENT_COUNT_{pubkey}_{date}` | Daily event counts per pubkey |
| `CURATING_ACL_IP_EVENT_COUNT_{ip}_{date}` | Daily event counts per IP |
| `CURATING_ACL_IP_OFFENSE_{ip}` | Offense tracking per IP |
| `CURATING_ACL_BLOCKED_IP_{ip}` | Active IP blocks |
| `CURATING_ACL_SPAM_EVENT_{eventID}` | Spam-flagged events |
## Caching
For performance, the following data is cached in memory:
- Trusted pubkey set
- Blacklisted pubkey set
- Allowed kinds set
- Current configuration
Caches are refreshed every hour by the background cleanup goroutine.
## Background Maintenance
A background goroutine runs hourly to:
1. Remove expired IP blocks
2. Clean up old event count entries (older than 2 days)
3. Refresh in-memory caches
4. Log maintenance statistics
## Best Practices
### Starting a New Curated Relay
1. Start with permissive settings:
```json
{"dailyLimit": 100, "ipDailyLimit": 1000, "kindCategories": ["social"]}
```
2. Monitor unclassified users for a few days
3. Trust active, quality contributors
4. Blacklist obvious spammers
5. Adjust rate limits based on observed patterns
### Handling Spam Waves
During spam attacks:
1. The IP-based flood protection will auto-ban attack sources
2. Review blocked IPs via web UI or API
3. Blacklist any pubkeys that got through
4. Consider temporarily lowering `ipDailyLimit`
### Recovering from Mistakes
- **Accidentally blacklisted someone**: Use `unblacklistpubkey` - their events become visible again
- **Wrongly flagged spam**: Use `unmarkspam` - event becomes visible again
- **Blocked legitimate IP**: Use `unblockip` - IP can publish again immediately
## Comparison with Other ACL Modes
| Feature | None | Follows | Managed | Curating |
|---------|------|---------|---------|----------|
| Default Access | Write | Write if followed | Explicit allow | Rate-limited |
| Rate Limiting | No | No | No | Yes |
| Kind Filtering | No | No | Optional | Yes |
| IP Protection | No | No | No | Yes |
| Spam Flagging | No | No | No | Yes |
| Configuration | Env vars | Follow lists | NIP-86 | Kind 30078 events |
| Web UI | Basic | Basic | Basic | Full curation panel |
## Troubleshooting
### "Relay not accepting events"
The relay requires a configuration event before accepting any events. Publish a kind 30078 event with d-tag `curating-config`.
### "daily event limit exceeded"
The user has exceeded their daily limit. Options:
1. Wait until midnight UTC for reset
2. Trust the pubkey if they're a quality contributor
3. Increase `dailyLimit` in configuration
### "pubkey is blacklisted"
The pubkey is on the blacklist. Use `unblacklistpubkey` if this was a mistake.
### "IP is blocked"
The IP has been auto-banned due to rate limit violations. Use `unblockip` if legitimate, or wait for the ban to expire.
### Events disappearing for users
Check if the event author has been blacklisted. Blacklisted authors' events are hidden from regular users but visible to admins.

go.mod

@@ -36,6 +36,8 @@ require (
github.com/BurntSushi/toml v1.5.0 // indirect
github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 // indirect
github.com/alexflint/go-scalar v1.2.0 // indirect
github.com/bits-and-blooms/bitset v1.24.2 // indirect
github.com/bits-and-blooms/bloom/v3 v3.7.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect
github.com/bytedance/sonic v1.13.1 // indirect
@@ -69,6 +71,7 @@ require (
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
go.etcd.io/bbolt v1.4.3 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel v1.38.0 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect

go.sum

@@ -12,6 +12,10 @@ github.com/alexflint/go-scalar v1.2.0 h1:WR7JPKkeNpnYIOfHRa7ivM21aWAdHD0gEWHCx+W
github.com/alexflint/go-scalar v1.2.0/go.mod h1:LoFvNMqS1CPrMVltza4LvnGKhaSpc3oyLEBUZVhhS2o=
github.com/aperturerobotics/go-indexeddb v0.2.3 h1:DfquIk9YEZjWD/lJyBWZWGCtRga43/a96bx0Ulv9VhQ=
github.com/aperturerobotics/go-indexeddb v0.2.3/go.mod h1:JV1XngOCCui7zrMSyRz+Wvz00nUSfotRKZqJzWpl5fQ=
github.com/bits-and-blooms/bitset v1.24.2 h1:M7/NzVbsytmtfHbumG+K2bremQPMJuqv1JD3vOaFxp0=
github.com/bits-and-blooms/bitset v1.24.2/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bits-and-blooms/bloom/v3 v3.7.1 h1:WXovk4TRKZttAMJfoQx6K2DM0zNIt8w+c67UqO+etV0=
github.com/bits-and-blooms/bloom/v3 v3.7.1/go.mod h1:rZzYLLje2dfzXfAkJNxQQHsKurAyK55KUnL43Euk0hU=
github.com/btcsuite/btcd/btcec/v2 v2.3.4 h1:3EJjcN70HCu/mwqlUsGK8GcNVyLVxFDlWurTXGPFfiQ=
github.com/btcsuite/btcd/btcec/v2 v2.3.4/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04=
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 h1:59Kx4K6lzOW5w6nFlA0v5+lk/6sjybR934QNHSJZPTQ=
@@ -155,10 +159,13 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/twmb/murmur3 v1.1.8/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/vertex-lab/nostr-sqlite v0.3.2 h1:8nZYYIwiKnWLA446qA/wL/Gy+bU0kuaxdLfUyfeTt/E=
github.com/vertex-lab/nostr-sqlite v0.3.2/go.mod h1:5bw1wMgJhSdrumsZAWxqy+P0u1g+q02PnlGQn15dnSM=
go-simpler.org/env v0.12.0 h1:kt/lBts0J1kjWJAnB740goNdvwNxt5emhYngL0Fzufs=
go-simpler.org/env v0.12.0/go.mod h1:cc/5Md9JCUM7LVLtN0HYjPTDcI3Q8TDaPlNTAlDU+WI=
go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo=
go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=

main.go

@@ -24,6 +24,7 @@ import (
	"next.orly.dev/pkg/acl"
	"git.mleku.dev/mleku/nostr/crypto/keys"
	"git.mleku.dev/mleku/nostr/encoders/bech32encoding"
	_ "next.orly.dev/pkg/bbolt" // Import for bbolt factory registration
	"next.orly.dev/pkg/database"
	neo4jdb "next.orly.dev/pkg/neo4j" // Import for neo4j factory and type
	"git.mleku.dev/mleku/nostr/encoders/hex"
@@ -73,6 +74,117 @@ func main() {
os.Exit(0)
}
	// Handle 'migrate' subcommand: migrate data between database backends
	if requested, fromType, toType, targetPath := config.MigrateRequested(); requested {
		if fromType == "" || toType == "" {
			fmt.Println("Usage: orly migrate --from <type> --to <type> [--target-path <path>]")
			fmt.Println("")
			fmt.Println("Migrate data between database backends.")
			fmt.Println("")
			fmt.Println("Options:")
			fmt.Println("  --from <type>         Source database type (badger, bbolt, neo4j)")
			fmt.Println("  --to <type>           Destination database type (badger, bbolt, neo4j)")
			fmt.Println("  --target-path <path>  Optional: destination data directory")
			fmt.Println("                        (default: $ORLY_DATA_DIR/<type>)")
			fmt.Println("")
			fmt.Println("Examples:")
			fmt.Println("  orly migrate --from badger --to bbolt")
			fmt.Println("  orly migrate --from badger --to bbolt --target-path /mnt/hdd/orly-bbolt")
			os.Exit(1)
		}
		// Set target path if not specified
		if targetPath == "" {
			targetPath = cfg.DataDir + "-" + toType
		}
		log.I.F("migrate: %s -> %s", fromType, toType)
		log.I.F("migrate: source path: %s", cfg.DataDir)
		log.I.F("migrate: target path: %s", targetPath)
		// Open source database
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		srcCfg := makeDatabaseConfig(cfg)
		var srcDB database.Database
		if srcDB, err = database.NewDatabaseWithConfig(ctx, cancel, fromType, srcCfg); chk.E(err) {
			log.E.F("migrate: failed to open source database: %v", err)
			os.Exit(1)
		}
		// Wait for source database to be ready
		select {
		case <-srcDB.Ready():
			log.I.F("migrate: source database ready")
		case <-time.After(60 * time.Second):
			log.E.F("migrate: timeout waiting for source database")
			os.Exit(1)
		}
		// Open destination database
		dstCfg := makeDatabaseConfig(cfg)
		dstCfg.DataDir = targetPath
		var dstDB database.Database
		if dstDB, err = database.NewDatabaseWithConfig(ctx, cancel, toType, dstCfg); chk.E(err) {
			log.E.F("migrate: failed to open destination database: %v", err)
			srcDB.Close()
			os.Exit(1)
		}
		// Wait for destination database to be ready
		select {
		case <-dstDB.Ready():
			log.I.F("migrate: destination database ready")
		case <-time.After(60 * time.Second):
			log.E.F("migrate: timeout waiting for destination database")
			srcDB.Close()
			os.Exit(1)
		}
		// Migrate using pipe (export from source, import to destination)
		log.I.F("migrate: starting data transfer...")
		pr, pw, pipeErr := os.Pipe()
		if pipeErr != nil {
			log.E.F("migrate: failed to create pipe: %v", pipeErr)
			srcDB.Close()
			dstDB.Close()
			os.Exit(1)
		}
		var wg sync.WaitGroup
		wg.Add(2)
		// Export goroutine
		go func() {
			defer wg.Done()
			defer pw.Close()
			srcDB.Export(ctx, pw)
			log.I.F("migrate: export complete")
		}()
		// Import goroutine
		go func() {
			defer wg.Done()
			if importErr := dstDB.ImportEventsFromReader(ctx, pr); importErr != nil {
				log.E.F("migrate: import error: %v", importErr)
			}
			log.I.F("migrate: import complete")
		}()
		wg.Wait()
		// Sync and close databases
		if err = dstDB.Sync(); chk.E(err) {
			log.W.F("migrate: sync warning: %v", err)
		}
		srcDB.Close()
		dstDB.Close()
		log.I.F("migrate: migration complete!")
		os.Exit(0)
	}
// Handle 'serve' subcommand: start ephemeral relay with RAM-based storage
if config.ServeRequested() {
const serveDataDir = "/dev/shm/orlyserve"
@@ -622,6 +734,9 @@ func makeDatabaseConfig(cfg *config.C) *database.DatabaseConfig {
neo4jURI, neo4jUser, neo4jPassword,
neo4jMaxConnPoolSize, neo4jFetchSize, neo4jMaxTxRetrySeconds, neo4jQueryResultLimit := cfg.GetDatabaseConfigValues()
// Get BBolt-specific configuration
batchMaxEvents, batchMaxBytes, flushTimeoutSec, bloomSizeMB, noSync, mmapSizeBytes := cfg.GetBboltConfigValues()
return &database.DatabaseConfig{
DataDir: dataDir,
LogLevel: logLevel,
@@ -640,6 +755,13 @@ func makeDatabaseConfig(cfg *config.C) *database.DatabaseConfig {
Neo4jFetchSize: neo4jFetchSize,
Neo4jMaxTxRetrySeconds: neo4jMaxTxRetrySeconds,
Neo4jQueryResultLimit: neo4jQueryResultLimit,
// BBolt-specific settings
BboltBatchMaxEvents: batchMaxEvents,
BboltBatchMaxBytes: batchMaxBytes,
BboltFlushTimeout: time.Duration(flushTimeoutSec) * time.Second,
BboltBloomSizeMB: bloomSizeMB,
BboltNoSync: noSync,
BboltMmapSize: mmapSizeBytes,
}
}

pkg/bbolt/batcher.go

@@ -0,0 +1,330 @@
//go:build !(js && wasm)
package bbolt
import (
"sync"
"time"
bolt "go.etcd.io/bbolt"
"lol.mleku.dev/chk"
)
// BatchedWrite represents a single write operation
type BatchedWrite struct {
BucketName []byte
Key []byte
Value []byte
IsDelete bool
}
// EventBatch represents a complete event with all its indexes and graph updates
type EventBatch struct {
Serial uint64
EventData []byte // Serialized compact event data
Indexes []BatchedWrite // Index entries
EventVertex *EventVertex // Graph vertex for this event
PubkeyUpdate *PubkeyVertexUpdate // Update to author's pubkey vertex
MentionUpdates []*PubkeyVertexUpdate // Updates to mentioned pubkeys
EdgeKeys []EdgeKey // Edge keys for bloom filter
}
// PubkeyVertexUpdate represents an update to a pubkey's vertex
type PubkeyVertexUpdate struct {
PubkeySerial uint64
AddAuthored uint64 // Event serial to add to authored (0 if none)
AddMention uint64 // Event serial to add to mentions (0 if none)
}
// WriteBatcher accumulates writes and flushes them in batches.
// Optimized for HDD with large batches and periodic flushes.
type WriteBatcher struct {
db *bolt.DB
bloom *EdgeBloomFilter
logger *Logger
mu sync.Mutex
pending []*EventBatch
pendingSize int64
stopped bool
// Configuration
maxEvents int
maxBytes int64
flushPeriod time.Duration
// Channels for coordination
flushCh chan struct{}
shutdownCh chan struct{}
doneCh chan struct{}
// Stats
stats BatcherStats
}
// BatcherStats contains batcher statistics
type BatcherStats struct {
TotalBatches uint64
TotalEvents uint64
TotalBytes uint64
AverageLatencyMs float64
LastFlushTime time.Time
LastFlushDuration time.Duration
}
// NewWriteBatcher creates a new write batcher
func NewWriteBatcher(db *bolt.DB, bloom *EdgeBloomFilter, cfg *BboltConfig, logger *Logger) *WriteBatcher {
wb := &WriteBatcher{
db: db,
bloom: bloom,
logger: logger,
maxEvents: cfg.BatchMaxEvents,
maxBytes: cfg.BatchMaxBytes,
flushPeriod: cfg.BatchFlushTimeout,
pending: make([]*EventBatch, 0, cfg.BatchMaxEvents),
flushCh: make(chan struct{}, 1),
shutdownCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
go wb.flushLoop()
return wb
}
// Add adds an event batch to the pending writes
func (wb *WriteBatcher) Add(batch *EventBatch) error {
wb.mu.Lock()
defer wb.mu.Unlock()
if wb.stopped {
return ErrBatcherStopped
}
wb.pending = append(wb.pending, batch)
wb.pendingSize += int64(len(batch.EventData))
for _, idx := range batch.Indexes {
wb.pendingSize += int64(len(idx.Key) + len(idx.Value))
}
// Check thresholds
if len(wb.pending) >= wb.maxEvents || wb.pendingSize >= wb.maxBytes {
wb.triggerFlush()
}
return nil
}
// triggerFlush signals the flush loop to flush (must be called with lock held)
func (wb *WriteBatcher) triggerFlush() {
select {
case wb.flushCh <- struct{}{}:
default:
// Already a flush pending
}
}
// flushLoop runs the background flush timer
func (wb *WriteBatcher) flushLoop() {
defer close(wb.doneCh)
timer := time.NewTimer(wb.flushPeriod)
defer timer.Stop()
for {
select {
case <-timer.C:
if err := wb.Flush(); err != nil {
wb.logger.Errorf("bbolt: flush error: %v", err)
}
timer.Reset(wb.flushPeriod)
case <-wb.flushCh:
if err := wb.Flush(); err != nil {
wb.logger.Errorf("bbolt: flush error: %v", err)
}
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(wb.flushPeriod)
case <-wb.shutdownCh:
// Final flush
if err := wb.Flush(); err != nil {
wb.logger.Errorf("bbolt: final flush error: %v", err)
}
return
}
}
}
// Flush writes all pending batches to BBolt
func (wb *WriteBatcher) Flush() error {
wb.mu.Lock()
if len(wb.pending) == 0 {
wb.mu.Unlock()
return nil
}
// Swap out pending slice
toFlush := wb.pending
toFlushSize := wb.pendingSize
wb.pending = make([]*EventBatch, 0, wb.maxEvents)
wb.pendingSize = 0
wb.mu.Unlock()
startTime := time.Now()
// Collect all edge keys for bloom filter update
var allEdgeKeys []EdgeKey
for _, batch := range toFlush {
allEdgeKeys = append(allEdgeKeys, batch.EdgeKeys...)
}
// Update bloom filter first (memory only)
if len(allEdgeKeys) > 0 {
wb.bloom.AddBatch(allEdgeKeys)
}
// Write all batches in a single transaction
err := wb.db.Update(func(tx *bolt.Tx) error {
for _, batch := range toFlush {
// Write compact event data
cmpBucket := tx.Bucket(bucketCmp)
if cmpBucket != nil {
key := makeSerialKey(batch.Serial)
if err := cmpBucket.Put(key, batch.EventData); err != nil {
return err
}
}
// Write all indexes
for _, idx := range batch.Indexes {
bucket := tx.Bucket(idx.BucketName)
if bucket == nil {
continue
}
if idx.IsDelete {
if err := bucket.Delete(idx.Key); err != nil {
return err
}
} else {
if err := bucket.Put(idx.Key, idx.Value); err != nil {
return err
}
}
}
// Write event vertex
if batch.EventVertex != nil {
evBucket := tx.Bucket(bucketEv)
if evBucket != nil {
key := makeSerialKey(batch.Serial)
if err := evBucket.Put(key, batch.EventVertex.Encode()); err != nil {
return err
}
}
}
// Update pubkey vertices
if err := wb.updatePubkeyVertex(tx, batch.PubkeyUpdate); err != nil {
return err
}
for _, mentionUpdate := range batch.MentionUpdates {
if err := wb.updatePubkeyVertex(tx, mentionUpdate); err != nil {
return err
}
}
}
return nil
})
// Update stats
duration := time.Since(startTime)
wb.mu.Lock()
wb.stats.TotalBatches++
wb.stats.TotalEvents += uint64(len(toFlush))
wb.stats.TotalBytes += uint64(toFlushSize)
wb.stats.LastFlushTime = time.Now()
wb.stats.LastFlushDuration = duration
latencyMs := float64(duration.Milliseconds())
wb.stats.AverageLatencyMs = (wb.stats.AverageLatencyMs*float64(wb.stats.TotalBatches-1) + latencyMs) / float64(wb.stats.TotalBatches)
wb.mu.Unlock()
if err == nil {
wb.logger.Debugf("bbolt: flushed %d events (%d bytes) in %v", len(toFlush), toFlushSize, duration)
}
return err
}
// updatePubkeyVertex reads, updates, and writes a pubkey vertex
func (wb *WriteBatcher) updatePubkeyVertex(tx *bolt.Tx, update *PubkeyVertexUpdate) error {
if update == nil {
return nil
}
pvBucket := tx.Bucket(bucketPv)
if pvBucket == nil {
return nil
}
key := makeSerialKey(update.PubkeySerial)
// Load existing vertex or create new
var pv PubkeyVertex
existing := pvBucket.Get(key)
if existing != nil {
if err := pv.Decode(existing); chk.E(err) {
// If decode fails, start fresh
pv = PubkeyVertex{}
}
}
// Apply updates
if update.AddAuthored != 0 {
pv.AddAuthored(update.AddAuthored)
}
if update.AddMention != 0 {
pv.AddMention(update.AddMention)
}
// Write back
return pvBucket.Put(key, pv.Encode())
}
// Shutdown gracefully shuts down the batcher
func (wb *WriteBatcher) Shutdown() error {
wb.mu.Lock()
wb.stopped = true
wb.mu.Unlock()
close(wb.shutdownCh)
<-wb.doneCh
return nil
}
// Stats returns current batcher statistics
func (wb *WriteBatcher) Stats() BatcherStats {
wb.mu.Lock()
defer wb.mu.Unlock()
return wb.stats
}
// PendingCount returns the number of pending events
func (wb *WriteBatcher) PendingCount() int {
wb.mu.Lock()
defer wb.mu.Unlock()
return len(wb.pending)
}
// ErrBatcherStopped is returned when adding to a stopped batcher
var ErrBatcherStopped = &batcherStoppedError{}
type batcherStoppedError struct{}
func (e *batcherStoppedError) Error() string {
return "batcher has been stopped"
}
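The `Flush` method maintains `AverageLatencyMs` as a running mean, folding each new flush duration in without storing the full history. A small self-contained check of that update rule (the `updateAverage` helper is illustrative, not part of the package):

```go
package main

import "fmt"

// updateAverage applies the incremental-mean formula used in Flush:
// newAvg = (oldAvg*(n-1) + latest) / n, where n is the batch count
// after the latest sample has been included.
func updateAverage(avg float64, n uint64, latest float64) float64 {
	return (avg*float64(n-1) + latest) / float64(n)
}

func main() {
	var avg float64
	for i, sample := range []float64{10, 20, 30} {
		avg = updateAverage(avg, uint64(i+1), sample)
	}
	fmt.Println(avg) // prints 20, the mean of 10, 20, 30
}
```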

pkg/bbolt/bbolt.go

@@ -0,0 +1,325 @@
//go:build !(js && wasm)
package bbolt
import (
"context"
"errors"
"os"
"path/filepath"
"sync"
"time"
bolt "go.etcd.io/bbolt"
"lol.mleku.dev"
"lol.mleku.dev/chk"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/utils/apputil"
)
// Bucket names - map to existing index prefixes but without the 3-byte prefix in keys
var (
bucketEvt = []byte("evt") // Event storage: serial -> compact event data
bucketEid = []byte("eid") // Event ID index
bucketFpc = []byte("fpc") // Full ID/pubkey index
bucketC = []byte("c--") // Created at index
bucketKc = []byte("kc-") // Kind + created index
bucketPc = []byte("pc-") // Pubkey + created index
bucketKpc = []byte("kpc") // Kind + pubkey + created
bucketTc = []byte("tc-") // Tag + created
bucketTkc = []byte("tkc") // Tag + kind + created
bucketTpc = []byte("tpc") // Tag + pubkey + created
bucketTkp = []byte("tkp") // Tag + kind + pubkey + created
bucketWrd = []byte("wrd") // Word search index
bucketExp = []byte("exp") // Expiration index
bucketPks = []byte("pks") // Pubkey hash -> serial
bucketSpk = []byte("spk") // Serial -> pubkey
bucketSei = []byte("sei") // Serial -> event ID
bucketCmp = []byte("cmp") // Compact event storage
bucketEv = []byte("ev") // Event vertices (adjacency list)
bucketPv = []byte("pv") // Pubkey vertices (adjacency list)
bucketMeta = []byte("_meta") // Markers, version, serial counter, bloom filter
)
// All buckets that need to be created on init
var allBuckets = [][]byte{
bucketEvt, bucketEid, bucketFpc, bucketC, bucketKc, bucketPc, bucketKpc,
bucketTc, bucketTkc, bucketTpc, bucketTkp, bucketWrd, bucketExp,
bucketPks, bucketSpk, bucketSei, bucketCmp, bucketEv, bucketPv, bucketMeta,
}
// B implements the database.Database interface using BBolt as the storage backend.
// Optimized for HDD with write batching and adjacency list graph storage.
type B struct {
ctx context.Context
cancel context.CancelFunc
dataDir string
Logger *Logger
db *bolt.DB
ready chan struct{}
// Write batching
batcher *WriteBatcher
// Serial management
serialMu sync.Mutex
nextSerial uint64
nextPubkeySeq uint64
// Edge bloom filter for fast negative lookups
edgeBloom *EdgeBloomFilter
// Configuration
cfg *BboltConfig
}
// BboltConfig holds bbolt-specific configuration
type BboltConfig struct {
DataDir string
LogLevel string
// Batch settings (tuned for 7200rpm HDD)
BatchMaxEvents int // Max events before flush (default: 5000)
BatchMaxBytes int64 // Max bytes before flush (default: 128MB)
BatchFlushTimeout time.Duration // Max time before flush (default: 30s)
// Bloom filter settings
BloomSizeMB int // Bloom filter size in MB (default: 16)
// BBolt settings
NoSync bool // Disable fsync for performance (DANGEROUS)
InitialMmapSize int // Initial mmap size in bytes
}
// Ensure B implements Database interface at compile time
var _ database.Database = (*B)(nil)
// New creates a new BBolt database instance with default configuration.
func New(
ctx context.Context, cancel context.CancelFunc, dataDir, logLevel string,
) (b *B, err error) {
cfg := &BboltConfig{
DataDir: dataDir,
LogLevel: logLevel,
BatchMaxEvents: 5000,
BatchMaxBytes: 128 * 1024 * 1024, // 128MB
BatchFlushTimeout: 30 * time.Second,
BloomSizeMB: 16,
InitialMmapSize: 8 * 1024 * 1024 * 1024, // 8GB
}
return NewWithConfig(ctx, cancel, cfg)
}
// NewWithConfig creates a new BBolt database instance with full configuration.
func NewWithConfig(
ctx context.Context, cancel context.CancelFunc, cfg *BboltConfig,
) (b *B, err error) {
// Apply defaults
if cfg.BatchMaxEvents <= 0 {
cfg.BatchMaxEvents = 5000
}
if cfg.BatchMaxBytes <= 0 {
cfg.BatchMaxBytes = 128 * 1024 * 1024
}
if cfg.BatchFlushTimeout <= 0 {
cfg.BatchFlushTimeout = 30 * time.Second
}
if cfg.BloomSizeMB <= 0 {
cfg.BloomSizeMB = 16
}
if cfg.InitialMmapSize <= 0 {
cfg.InitialMmapSize = 8 * 1024 * 1024 * 1024
}
b = &B{
ctx: ctx,
cancel: cancel,
dataDir: cfg.DataDir,
Logger: NewLogger(lol.GetLogLevel(cfg.LogLevel), cfg.DataDir),
ready: make(chan struct{}),
cfg: cfg,
}
// Ensure the data directory exists
if err = os.MkdirAll(cfg.DataDir, 0755); chk.E(err) {
return
}
if err = apputil.EnsureDir(filepath.Join(cfg.DataDir, "dummy")); chk.E(err) {
return
}
// Open BBolt database
dbPath := filepath.Join(cfg.DataDir, "orly.db")
opts := &bolt.Options{
Timeout: 10 * time.Second,
NoSync: cfg.NoSync,
InitialMmapSize: cfg.InitialMmapSize,
}
if b.db, err = bolt.Open(dbPath, 0600, opts); chk.E(err) {
return
}
// Create all buckets
if err = b.db.Update(func(tx *bolt.Tx) error {
for _, bucket := range allBuckets {
if _, err := tx.CreateBucketIfNotExists(bucket); err != nil {
return err
}
}
return nil
}); chk.E(err) {
return
}
// Initialize serial counters
if err = b.initSerialCounters(); chk.E(err) {
return
}
// Initialize bloom filter
b.edgeBloom, err = NewEdgeBloomFilter(cfg.BloomSizeMB, b.db)
if chk.E(err) {
return
}
// Initialize write batcher
b.batcher = NewWriteBatcher(b.db, b.edgeBloom, cfg, b.Logger)
// Run migrations
b.RunMigrations()
// Start warmup and mark ready
go b.warmup()
// Start background maintenance
go b.backgroundLoop()
return
}
// Path returns the path where the database files are stored.
func (b *B) Path() string { return b.dataDir }
// Init initializes the database with the given path.
func (b *B) Init(path string) error {
b.dataDir = path
return nil
}
// Sync flushes the database buffers to disk.
func (b *B) Sync() error {
// Flush pending writes
if err := b.batcher.Flush(); err != nil {
return err
}
// Persist bloom filter
if err := b.edgeBloom.Persist(b.db); err != nil {
return err
}
// Persist serial counters
if err := b.persistSerialCounters(); err != nil {
return err
}
// Sync BBolt
return b.db.Sync()
}
// Close releases resources and closes the database.
func (b *B) Close() (err error) {
b.Logger.Infof("bbolt: closing database...")
// Stop accepting new writes and flush pending
if b.batcher != nil {
if err = b.batcher.Shutdown(); chk.E(err) {
// Log but continue cleanup
}
}
// Persist bloom filter
if b.edgeBloom != nil {
if err = b.edgeBloom.Persist(b.db); chk.E(err) {
// Log but continue cleanup
}
}
// Persist serial counters
if err = b.persistSerialCounters(); chk.E(err) {
// Log but continue cleanup
}
// Close BBolt database
if b.db != nil {
if err = b.db.Close(); chk.E(err) {
return
}
}
b.Logger.Infof("bbolt: database closed")
return
}
// Wipe deletes all data in the database.
func (b *B) Wipe() error {
return b.db.Update(func(tx *bolt.Tx) error {
for _, bucket := range allBuckets {
if err := tx.DeleteBucket(bucket); err != nil && !errors.Is(err, bolt.ErrBucketNotFound) {
return err
}
if _, err := tx.CreateBucket(bucket); err != nil {
return err
}
}
// Reset serial counters
b.serialMu.Lock()
b.nextSerial = 1
b.nextPubkeySeq = 1
b.serialMu.Unlock()
// Reset bloom filter
b.edgeBloom.Reset()
return nil
})
}
// SetLogLevel changes the logging level.
func (b *B) SetLogLevel(level string) {
b.Logger.SetLogLevel(lol.GetLogLevel(level))
}
// Ready returns a channel that closes when the database is ready to serve requests.
func (b *B) Ready() <-chan struct{} {
return b.ready
}
// warmup performs database warmup operations and closes the ready channel when complete.
func (b *B) warmup() {
defer close(b.ready)
// Give the database time to settle
time.Sleep(1 * time.Second)
b.Logger.Infof("bbolt: database warmup complete, ready to serve requests")
}
// backgroundLoop runs periodic maintenance tasks.
func (b *B) backgroundLoop() {
expirationTicker := time.NewTicker(10 * time.Minute)
bloomPersistTicker := time.NewTicker(5 * time.Minute)
defer expirationTicker.Stop()
defer bloomPersistTicker.Stop()
for {
select {
case <-expirationTicker.C:
b.DeleteExpired()
case <-bloomPersistTicker.C:
if err := b.edgeBloom.Persist(b.db); chk.E(err) {
b.Logger.Warningf("bbolt: failed to persist bloom filter: %v", err)
}
case <-b.ctx.Done():
b.cancel()
return
}
}
}

pkg/bbolt/bloom.go

@@ -0,0 +1,192 @@
//go:build !(js && wasm)
package bbolt
import (
"bytes"
"encoding/binary"
"sync"
"github.com/bits-and-blooms/bloom/v3"
bolt "go.etcd.io/bbolt"
"lol.mleku.dev/chk"
)
const bloomFilterKey = "edge_bloom_filter"
// EdgeBloomFilter provides fast negative lookups for edge existence checks.
// Uses a bloom filter to avoid disk seeks when checking if an edge exists.
type EdgeBloomFilter struct {
mu sync.RWMutex
filter *bloom.BloomFilter
// Track if filter has been modified since last persist
dirty bool
}
// NewEdgeBloomFilter creates or loads the edge bloom filter.
// sizeMB is the approximate size in megabytes.
// At ~10 bits per element (roughly 1% false-positive rate), 16MB holds ~13 million edges.
func NewEdgeBloomFilter(sizeMB int, db *bolt.DB) (*EdgeBloomFilter, error) {
ebf := &EdgeBloomFilter{}
// Try to load from database
var loaded bool
err := db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketMeta)
if bucket == nil {
return nil
}
data := bucket.Get([]byte(bloomFilterKey))
if data == nil {
return nil
}
// Deserialize bloom filter
reader := bytes.NewReader(data)
filter := &bloom.BloomFilter{}
if _, err := filter.ReadFrom(reader); err != nil {
return err
}
ebf.filter = filter
loaded = true
return nil
})
if chk.E(err) {
return nil, err
}
if !loaded {
// Create new filter
// Calculate parameters: m bits, k hash functions
// For 1% false positive rate: m/n ≈ 9.6, k ≈ 7
bitsPerMB := 8 * 1024 * 1024
totalBits := uint(sizeMB * bitsPerMB)
// Estimate capacity based on 10 bits per element for 1% FPR
estimatedCapacity := uint(totalBits / 10)
ebf.filter = bloom.NewWithEstimates(estimatedCapacity, 0.01)
}
return ebf, nil
}
// Add adds an edge to the bloom filter.
// An edge is represented by source and destination serials plus edge type.
func (ebf *EdgeBloomFilter) Add(srcSerial, dstSerial uint64, edgeType byte) {
ebf.mu.Lock()
defer ebf.mu.Unlock()
key := ebf.makeKey(srcSerial, dstSerial, edgeType)
ebf.filter.Add(key)
ebf.dirty = true
}
// AddBatch adds multiple edges to the bloom filter.
func (ebf *EdgeBloomFilter) AddBatch(edges []EdgeKey) {
ebf.mu.Lock()
defer ebf.mu.Unlock()
for _, edge := range edges {
key := ebf.makeKey(edge.SrcSerial, edge.DstSerial, edge.EdgeType)
ebf.filter.Add(key)
}
ebf.dirty = true
}
// MayExist checks if an edge might exist.
// Returns false if definitely doesn't exist (no disk access needed).
// Returns true if might exist (need to check disk to confirm).
func (ebf *EdgeBloomFilter) MayExist(srcSerial, dstSerial uint64, edgeType byte) bool {
ebf.mu.RLock()
defer ebf.mu.RUnlock()
key := ebf.makeKey(srcSerial, dstSerial, edgeType)
return ebf.filter.Test(key)
}
// Persist saves the bloom filter to the database.
func (ebf *EdgeBloomFilter) Persist(db *bolt.DB) error {
ebf.mu.Lock()
if !ebf.dirty {
ebf.mu.Unlock()
return nil
}
// Serialize while holding lock
var buf bytes.Buffer
if _, err := ebf.filter.WriteTo(&buf); err != nil {
ebf.mu.Unlock()
return err
}
data := buf.Bytes()
ebf.dirty = false
ebf.mu.Unlock()
// Write to database
return db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketMeta)
if bucket == nil {
return nil
}
return bucket.Put([]byte(bloomFilterKey), data)
})
}
// Reset clears the bloom filter.
func (ebf *EdgeBloomFilter) Reset() {
ebf.mu.Lock()
defer ebf.mu.Unlock()
ebf.filter.ClearAll()
ebf.dirty = true
}
// makeKey creates a unique key for an edge.
func (ebf *EdgeBloomFilter) makeKey(srcSerial, dstSerial uint64, edgeType byte) []byte {
key := make([]byte, 17) // 8 + 8 + 1
binary.BigEndian.PutUint64(key[0:8], srcSerial)
binary.BigEndian.PutUint64(key[8:16], dstSerial)
key[16] = edgeType
return key
}
// Stats returns bloom filter statistics.
func (ebf *EdgeBloomFilter) Stats() BloomStats {
ebf.mu.RLock()
defer ebf.mu.RUnlock()
approxCount := uint64(ebf.filter.ApproximatedSize())
capBits := ebf.filter.Cap() // avoid shadowing the builtin cap
return BloomStats{
ApproxCount: approxCount,
Cap: capBits,
}
}
// BloomStats contains bloom filter statistics.
type BloomStats struct {
ApproxCount uint64 // Approximate number of elements
Cap uint // Capacity in bits
}
// EdgeKey represents an edge for batch operations.
type EdgeKey struct {
SrcSerial uint64
DstSerial uint64
EdgeType byte
}
// Edge type constants
const (
EdgeTypeAuthor byte = 0 // Event author relationship
EdgeTypePTag byte = 1 // P-tag reference (event mentions pubkey)
EdgeTypeETag byte = 2 // E-tag reference (event references event)
EdgeTypeFollows byte = 3 // Kind 3 follows relationship
EdgeTypeReaction byte = 4 // Kind 7 reaction
EdgeTypeRepost byte = 5 // Kind 6 repost
EdgeTypeReply byte = 6 // Reply (kind 1 with e-tag)
)
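The capacity estimate in `NewEdgeBloomFilter` follows from simple arithmetic: each megabyte contributes 8*1024*1024 bits, and at roughly 10 bits per element for a ~1% false-positive rate the capacity is `totalBits / 10`. A sketch of that calculation (the `estimatedCapacity` helper is illustrative):

```go
package main

import "fmt"

// estimatedCapacity mirrors the sizing arithmetic in NewEdgeBloomFilter:
// sizeMB megabytes of filter at ~10 bits per element (~1% FPR).
func estimatedCapacity(sizeMB int) uint {
	bitsPerMB := 8 * 1024 * 1024
	totalBits := uint(sizeMB * bitsPerMB)
	return totalBits / 10
}

func main() {
	fmt.Println(estimatedCapacity(16)) // prints 13421772: ~13.4 million edges
}
```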

pkg/bbolt/fetch-event.go

@@ -0,0 +1,134 @@
//go:build !(js && wasm)
package bbolt
import (
"bytes"
"context"
"errors"
bolt "go.etcd.io/bbolt"
"lol.mleku.dev/chk"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/database/indexes/types"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/filter"
)
// FetchEventBySerial fetches an event by its serial number.
func (b *B) FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error) {
if ser == nil {
return nil, errors.New("bbolt: nil serial")
}
serial := ser.Get()
key := makeSerialKey(serial)
err = b.db.View(func(tx *bolt.Tx) error {
// Get event ID first
seiBucket := tx.Bucket(bucketSei)
var eventId []byte
if seiBucket != nil {
eventId = seiBucket.Get(key)
}
// Try compact event storage first
cmpBucket := tx.Bucket(bucketCmp)
if cmpBucket != nil {
data := cmpBucket.Get(key)
if data != nil && eventId != nil && len(eventId) == 32 {
// Unmarshal compact event
resolver := &bboltSerialResolver{b: b}
ev, err = database.UnmarshalCompactEvent(data, eventId, resolver)
if err == nil {
return nil
}
// Fall through to try legacy format
}
}
// Try legacy event storage
evtBucket := tx.Bucket(bucketEvt)
if evtBucket != nil {
data := evtBucket.Get(key)
if data != nil {
ev = new(event.E)
reader := bytes.NewReader(data)
if err = ev.UnmarshalBinary(reader); err == nil {
return nil
}
}
}
return errors.New("bbolt: event not found")
})
return
}
// FetchEventsBySerials fetches multiple events by their serial numbers.
func (b *B) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*event.E, err error) {
events = make(map[uint64]*event.E, len(serials))
err = b.db.View(func(tx *bolt.Tx) error {
cmpBucket := tx.Bucket(bucketCmp)
evtBucket := tx.Bucket(bucketEvt)
seiBucket := tx.Bucket(bucketSei)
resolver := &bboltSerialResolver{b: b}
for _, ser := range serials {
if ser == nil {
continue
}
serial := ser.Get()
key := makeSerialKey(serial)
// Get event ID
var eventId []byte
if seiBucket != nil {
eventId = seiBucket.Get(key)
}
// Try compact event storage first
if cmpBucket != nil {
data := cmpBucket.Get(key)
if data != nil && eventId != nil && len(eventId) == 32 {
ev, e := database.UnmarshalCompactEvent(data, eventId, resolver)
if e == nil {
events[serial] = ev
continue
}
}
}
// Try legacy event storage
if evtBucket != nil {
data := evtBucket.Get(key)
if data != nil {
ev := new(event.E)
reader := bytes.NewReader(data)
if e := ev.UnmarshalBinary(reader); e == nil {
events[serial] = ev
}
}
}
}
return nil
})
return
}
// CountEvents counts events matching a filter.
func (b *B) CountEvents(c context.Context, f *filter.F) (count int, approximate bool, err error) {
// Get serials matching filter
var serials types.Uint40s
if serials, err = b.GetSerialsFromFilter(f); chk.E(err) {
return
}
count = len(serials)
approximate = false
return
}


@@ -0,0 +1,179 @@
//go:build !(js && wasm)
package bbolt
import (
"bytes"
"errors"
bolt "go.etcd.io/bbolt"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/database/indexes/types"
"next.orly.dev/pkg/interfaces/store"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/tag"
)
// GetSerialById gets the serial for an event ID.
func (b *B) GetSerialById(id []byte) (ser *types.Uint40, err error) {
if len(id) < 8 {
return nil, errors.New("bbolt: invalid event ID length")
}
idHash := hashEventId(id)
err = b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketEid)
if bucket == nil {
return errors.New("id not found in database")
}
// Scan for matching ID hash prefix
c := bucket.Cursor()
for k, _ := c.Seek(idHash); k != nil && bytes.HasPrefix(k, idHash); k, _ = c.Next() {
// Key format: id_hash(8) | serial(5)
if len(k) >= 13 {
ser = new(types.Uint40)
ser.Set(decodeUint40(k[8:13]))
return nil
}
}
return errors.New("id not found in database")
})
return
}
// GetSerialsByIds gets serials for multiple event IDs.
func (b *B) GetSerialsByIds(ids *tag.T) (serials map[string]*types.Uint40, err error) {
serials = make(map[string]*types.Uint40, ids.Len())
if ids == nil || ids.Len() == 0 {
return
}
err = b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketEid)
if bucket == nil {
return nil
}
// Iterate over the tag entries using the .T field
for _, id := range ids.T {
if len(id) < 8 {
continue
}
idHash := hashEventId(id)
c := bucket.Cursor()
for k, _ := c.Seek(idHash); k != nil && bytes.HasPrefix(k, idHash); k, _ = c.Next() {
if len(k) >= 13 {
ser := new(types.Uint40)
ser.Set(decodeUint40(k[8:13]))
serials[string(id)] = ser
break
}
}
}
return nil
})
return
}
// GetSerialsByIdsWithFilter gets serials with a filter function.
func (b *B) GetSerialsByIdsWithFilter(ids *tag.T, fn func(ev *event.E, ser *types.Uint40) bool) (serials map[string]*types.Uint40, err error) {
// For now, just call GetSerialsByIds - full implementation would apply filter
return b.GetSerialsByIds(ids)
}
// GetSerialsByRange gets serials within a key range.
func (b *B) GetSerialsByRange(idx database.Range) (serials types.Uint40s, err error) {
if len(idx.Start) < 3 || len(idx.End) < 3 {
return nil, errors.New("bbolt: invalid range bounds")
}
// Extract bucket name from prefix
bucketName := idx.Start[:3]
startKey := idx.Start[3:]
endKey := idx.End[3:]
err = b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketName)
if bucket == nil {
return nil
}
c := bucket.Cursor()
for k, _ := c.Seek(startKey); k != nil; k, _ = c.Next() {
// Check if we've passed the end
if len(endKey) > 0 && bytes.Compare(k, endKey) >= 0 {
break
}
// Extract serial from end of key (last 5 bytes)
if len(k) >= 5 {
ser := new(types.Uint40)
ser.Set(decodeUint40(k[len(k)-5:]))
serials = append(serials, ser)
}
}
return nil
})
return
}
// GetFullIdPubkeyBySerial gets full event ID and pubkey by serial.
func (b *B) GetFullIdPubkeyBySerial(ser *types.Uint40) (fidpk *store.IdPkTs, err error) {
if ser == nil {
return nil, errors.New("bbolt: nil serial")
}
serial := ser.Get()
key := makeSerialKey(serial)
err = b.db.View(func(tx *bolt.Tx) error {
// Get full ID/pubkey from fpc bucket
fpcBucket := tx.Bucket(bucketFpc)
if fpcBucket == nil {
return errors.New("bbolt: fpc bucket not found")
}
// Scan for matching serial prefix
c := fpcBucket.Cursor()
for k, _ := c.Seek(key); k != nil && bytes.HasPrefix(k, key); k, _ = c.Next() {
// Key format: serial(5) | id(32) | pubkey_hash(8) | created_at(8)
if len(k) >= 53 {
fidpk = &store.IdPkTs{
Ser: serial,
}
fidpk.Id = make([]byte, 32)
copy(fidpk.Id, k[5:37])
// Pubkey is only hash here, need to look up full pubkey
// For now return what we have
fidpk.Pub = make([]byte, 8)
copy(fidpk.Pub, k[37:45])
fidpk.Ts = int64(decodeUint64(k[45:53]))
return nil
}
}
return errors.New("bbolt: serial not found in fpc index")
})
return
}
// GetFullIdPubkeyBySerials gets full event IDs and pubkeys for multiple serials.
func (b *B) GetFullIdPubkeyBySerials(sers []*types.Uint40) (fidpks []*store.IdPkTs, err error) {
fidpks = make([]*store.IdPkTs, 0, len(sers))
for _, ser := range sers {
fidpk, e := b.GetFullIdPubkeyBySerial(ser)
if e == nil && fidpk != nil {
fidpks = append(fidpks, fidpk)
}
}
return
}
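The index keys above end in a 5-byte serial (e.g. `id_hash(8) | serial(5)`), read back with `decodeUint40`. The codec itself is not shown in this diff; a plausible big-endian 40-bit implementation, consistent with the 5-byte slices taken from key tails here, would be:

```go
package main

import "fmt"

// encodeUint40 writes the low 40 bits of v into buf (big-endian, 5 bytes).
// This is an assumed implementation; the real helpers live elsewhere in
// the package and are not part of this diff.
func encodeUint40(v uint64, buf []byte) {
	buf[0] = byte(v >> 32)
	buf[1] = byte(v >> 24)
	buf[2] = byte(v >> 16)
	buf[3] = byte(v >> 8)
	buf[4] = byte(v)
}

// decodeUint40 reads a big-endian 40-bit value from buf.
func decodeUint40(buf []byte) uint64 {
	return uint64(buf[0])<<32 | uint64(buf[1])<<24 | uint64(buf[2])<<16 |
		uint64(buf[3])<<8 | uint64(buf[4])
}

func main() {
	buf := make([]byte, 5)
	encodeUint40(0x0102030405, buf)
	fmt.Println(decodeUint40(buf) == 0x0102030405) // prints true
}
```

Five bytes give a maximum serial of 2^40-1 (about 1.1 trillion), which is why 5-byte serials appear throughout the key layouts.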

pkg/bbolt/graph.go

@@ -0,0 +1,250 @@
//go:build !(js && wasm)
package bbolt
import (
"bytes"
"encoding/binary"
"io"
)
// EventVertex stores the adjacency list for an event.
// Contains the author and all edges to other events/pubkeys.
type EventVertex struct {
AuthorSerial uint64 // Serial of the author pubkey
Kind uint16 // Event kind
PTagSerials []uint64 // Serials of pubkeys mentioned (p-tags)
ETagSerials []uint64 // Serials of events referenced (e-tags)
}
// Encode serializes the EventVertex to bytes.
// Format: author(5) | kind(2) | ptag_count(2) | [ptag_serials(5)...] | etag_count(2) | [etag_serials(5)...]
func (ev *EventVertex) Encode() []byte {
// Calculate size
size := 5 + 2 + 2 + len(ev.PTagSerials)*5 + 2 + len(ev.ETagSerials)*5
buf := make([]byte, 0, size)
// Author serial (5 bytes)
authorBuf := make([]byte, 5)
encodeUint40(ev.AuthorSerial, authorBuf)
buf = append(buf, authorBuf...)
// Kind (2 bytes)
kindBuf := make([]byte, 2)
binary.BigEndian.PutUint16(kindBuf, ev.Kind)
buf = append(buf, kindBuf...)
// P-tag count and serials
ptagCountBuf := make([]byte, 2)
binary.BigEndian.PutUint16(ptagCountBuf, uint16(len(ev.PTagSerials)))
buf = append(buf, ptagCountBuf...)
for _, serial := range ev.PTagSerials {
serialBuf := make([]byte, 5)
encodeUint40(serial, serialBuf)
buf = append(buf, serialBuf...)
}
// E-tag count and serials
etagCountBuf := make([]byte, 2)
binary.BigEndian.PutUint16(etagCountBuf, uint16(len(ev.ETagSerials)))
buf = append(buf, etagCountBuf...)
for _, serial := range ev.ETagSerials {
serialBuf := make([]byte, 5)
encodeUint40(serial, serialBuf)
buf = append(buf, serialBuf...)
}
return buf
}
// Decode deserializes bytes into an EventVertex.
func (ev *EventVertex) Decode(data []byte) error {
if len(data) < 9 { // minimum: author(5) + kind(2) + ptag_count(2)
return io.ErrUnexpectedEOF
}
reader := bytes.NewReader(data)
// Author serial (io.ReadFull rejects short reads, unlike Read)
authorBuf := make([]byte, 5)
if _, err := io.ReadFull(reader, authorBuf); err != nil {
return err
}
ev.AuthorSerial = decodeUint40(authorBuf)
// Kind
kindBuf := make([]byte, 2)
if _, err := io.ReadFull(reader, kindBuf); err != nil {
return err
}
ev.Kind = binary.BigEndian.Uint16(kindBuf)
// P-tags
ptagCountBuf := make([]byte, 2)
if _, err := io.ReadFull(reader, ptagCountBuf); err != nil {
return err
}
ptagCount := binary.BigEndian.Uint16(ptagCountBuf)
ev.PTagSerials = make([]uint64, ptagCount)
serialBuf := make([]byte, 5)
for i := uint16(0); i < ptagCount; i++ {
if _, err := io.ReadFull(reader, serialBuf); err != nil {
return err
}
ev.PTagSerials[i] = decodeUint40(serialBuf)
}
// E-tags
etagCountBuf := make([]byte, 2)
if _, err := io.ReadFull(reader, etagCountBuf); err != nil {
return err
}
etagCount := binary.BigEndian.Uint16(etagCountBuf)
ev.ETagSerials = make([]uint64, etagCount)
for i := uint16(0); i < etagCount; i++ {
if _, err := io.ReadFull(reader, serialBuf); err != nil {
return err
}
ev.ETagSerials[i] = decodeUint40(serialBuf)
}
return nil
}
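The Encode/Decode pair above implies a fixed size for every vertex given its tag counts. A quick check of that arithmetic against the documented layout, using an illustrative `encodedSize` helper:

```go
package main

import "fmt"

// encodedSize computes the EventVertex wire size from the layout:
// author(5) + kind(2) + ptag_count(2) + 5*ptags + etag_count(2) + 5*etags.
func encodedSize(ptags, etags int) int {
	return 5 + 2 + 2 + ptags*5 + 2 + etags*5
}

func main() {
	fmt.Println(encodedSize(0, 0)) // prints 11: a tag-free vertex
	fmt.Println(encodedSize(3, 2)) // prints 36: 3 p-tags, 2 e-tags
}
```

Note the minimum of 11 bytes is consistent with Decode's 9-byte precondition, which only covers the fields read before the first count is interpreted.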
// PubkeyVertex stores the adjacency list for a pubkey.
// Contains all events authored by or mentioning this pubkey.
type PubkeyVertex struct {
AuthoredEvents []uint64 // Event serials this pubkey authored
MentionedIn []uint64 // Event serials that mention this pubkey (p-tags)
}
// Encode serializes the PubkeyVertex to bytes.
// Format: authored_count(2) | [serials(5)...] | mentioned_count(2) | [serials(5)...]
func (pv *PubkeyVertex) Encode() []byte {
size := 2 + len(pv.AuthoredEvents)*5 + 2 + len(pv.MentionedIn)*5
buf := make([]byte, 0, size)
// Authored events
authoredCountBuf := make([]byte, 2)
binary.BigEndian.PutUint16(authoredCountBuf, uint16(len(pv.AuthoredEvents)))
buf = append(buf, authoredCountBuf...)
for _, serial := range pv.AuthoredEvents {
serialBuf := make([]byte, 5)
encodeUint40(serial, serialBuf)
buf = append(buf, serialBuf...)
}
// Mentioned in events
mentionedCountBuf := make([]byte, 2)
binary.BigEndian.PutUint16(mentionedCountBuf, uint16(len(pv.MentionedIn)))
buf = append(buf, mentionedCountBuf...)
for _, serial := range pv.MentionedIn {
serialBuf := make([]byte, 5)
encodeUint40(serial, serialBuf)
buf = append(buf, serialBuf...)
}
return buf
}
// Decode deserializes bytes into a PubkeyVertex.
func (pv *PubkeyVertex) Decode(data []byte) error {
if len(data) < 4 { // minimum: authored_count(2) + mentioned_count(2)
return io.ErrUnexpectedEOF
}
reader := bytes.NewReader(data)
// Authored events
authoredCountBuf := make([]byte, 2)
// io.ReadFull reports truncated input as an error instead of a silent short read
if _, err := io.ReadFull(reader, authoredCountBuf); err != nil {
return err
}
authoredCount := binary.BigEndian.Uint16(authoredCountBuf)
pv.AuthoredEvents = make([]uint64, authoredCount)
for i := uint16(0); i < authoredCount; i++ {
serialBuf := make([]byte, 5)
if _, err := io.ReadFull(reader, serialBuf); err != nil {
return err
}
pv.AuthoredEvents[i] = decodeUint40(serialBuf)
}
// Mentioned in events
mentionedCountBuf := make([]byte, 2)
if _, err := io.ReadFull(reader, mentionedCountBuf); err != nil {
return err
}
mentionedCount := binary.BigEndian.Uint16(mentionedCountBuf)
pv.MentionedIn = make([]uint64, mentionedCount)
for i := uint16(0); i < mentionedCount; i++ {
serialBuf := make([]byte, 5)
if _, err := io.ReadFull(reader, serialBuf); err != nil {
return err
}
pv.MentionedIn[i] = decodeUint40(serialBuf)
}
return nil
}
// AddAuthored adds an event serial to the authored list if not already present.
func (pv *PubkeyVertex) AddAuthored(eventSerial uint64) {
for _, s := range pv.AuthoredEvents {
if s == eventSerial {
return
}
}
pv.AuthoredEvents = append(pv.AuthoredEvents, eventSerial)
}
// AddMention adds an event serial to the mentioned list if not already present.
func (pv *PubkeyVertex) AddMention(eventSerial uint64) {
for _, s := range pv.MentionedIn {
if s == eventSerial {
return
}
}
pv.MentionedIn = append(pv.MentionedIn, eventSerial)
}
// RemoveAuthored removes an event serial from the authored list.
func (pv *PubkeyVertex) RemoveAuthored(eventSerial uint64) {
for i, s := range pv.AuthoredEvents {
if s == eventSerial {
pv.AuthoredEvents = append(pv.AuthoredEvents[:i], pv.AuthoredEvents[i+1:]...)
return
}
}
}
// RemoveMention removes an event serial from the mentioned list.
func (pv *PubkeyVertex) RemoveMention(eventSerial uint64) {
for i, s := range pv.MentionedIn {
if s == eventSerial {
pv.MentionedIn = append(pv.MentionedIn[:i], pv.MentionedIn[i+1:]...)
return
}
}
}
// HasAuthored checks if the pubkey authored the given event.
func (pv *PubkeyVertex) HasAuthored(eventSerial uint64) bool {
for _, s := range pv.AuthoredEvents {
if s == eventSerial {
return true
}
}
return false
}
// IsMentionedIn checks if the pubkey is mentioned in the given event.
func (pv *PubkeyVertex) IsMentionedIn(eventSerial uint64) bool {
for _, s := range pv.MentionedIn {
if s == eventSerial {
return true
}
}
return false
}
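The fixed-width wire format above (a 2-byte big-endian count followed by 5-byte serials, once per list) is easy to exercise in isolation. A minimal stand-alone sketch of the encode/decode pair — an illustrative copy, not the package's own methods — looks like:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// PubkeyVertex mirrors the type above for illustration only.
type PubkeyVertex struct {
	AuthoredEvents []uint64
	MentionedIn    []uint64
}

func put40(buf []byte, v uint64) {
	buf[0], buf[1], buf[2], buf[3], buf[4] =
		byte(v>>32), byte(v>>24), byte(v>>16), byte(v>>8), byte(v)
}

func get40(buf []byte) uint64 {
	return uint64(buf[0])<<32 | uint64(buf[1])<<24 | uint64(buf[2])<<16 |
		uint64(buf[3])<<8 | uint64(buf[4])
}

// Encode writes: count(2, big-endian) | serials(5 each), once per list.
func (pv *PubkeyVertex) Encode() []byte {
	buf := make([]byte, 0, 4+5*(len(pv.AuthoredEvents)+len(pv.MentionedIn)))
	for _, list := range [][]uint64{pv.AuthoredEvents, pv.MentionedIn} {
		var cnt [2]byte
		binary.BigEndian.PutUint16(cnt[:], uint16(len(list)))
		buf = append(buf, cnt[:]...)
		for _, s := range list {
			var sb [5]byte
			put40(sb[:], s)
			buf = append(buf, sb[:]...)
		}
	}
	return buf
}

// Decode is the inverse; io.ReadFull turns truncated input into an error.
func (pv *PubkeyVertex) Decode(data []byte) error {
	r := bytes.NewReader(data)
	for _, dst := range []*[]uint64{&pv.AuthoredEvents, &pv.MentionedIn} {
		var cnt [2]byte
		if _, err := io.ReadFull(r, cnt[:]); err != nil {
			return err
		}
		n := binary.BigEndian.Uint16(cnt[:])
		*dst = make([]uint64, n)
		for i := range *dst {
			var sb [5]byte
			if _, err := io.ReadFull(r, sb[:]); err != nil {
				return err
			}
			(*dst)[i] = get40(sb[:])
		}
	}
	return nil
}

func main() {
	in := &PubkeyVertex{AuthoredEvents: []uint64{1, 7}, MentionedIn: []uint64{42}}
	out := &PubkeyVertex{}
	if err := out.Decode(in.Encode()); err != nil {
		panic(err)
	}
	fmt.Println(out.AuthoredEvents, out.MentionedIn) // [1 7] [42]
}
```

A round trip through Encode/Decode preserves both lists; feeding a truncated byte slice into Decode fails with io.ErrUnexpectedEOF rather than decoding garbage.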

pkg/bbolt/helpers.go Normal file

@@ -0,0 +1,119 @@
//go:build !(js && wasm)
package bbolt
import (
"encoding/binary"
)
// encodeUint40 encodes a uint64 as 5 bytes (big-endian, truncated to 40 bits)
func encodeUint40(v uint64, buf []byte) {
buf[0] = byte(v >> 32)
buf[1] = byte(v >> 24)
buf[2] = byte(v >> 16)
buf[3] = byte(v >> 8)
buf[4] = byte(v)
}
// decodeUint40 decodes 5 bytes as uint64
func decodeUint40(buf []byte) uint64 {
return uint64(buf[0])<<32 |
uint64(buf[1])<<24 |
uint64(buf[2])<<16 |
uint64(buf[3])<<8 |
uint64(buf[4])
}
// encodeUint64 encodes a uint64 as 8 bytes (big-endian)
func encodeUint64(v uint64, buf []byte) {
binary.BigEndian.PutUint64(buf, v)
}
// decodeUint64 decodes 8 bytes as uint64
func decodeUint64(buf []byte) uint64 {
return binary.BigEndian.Uint64(buf)
}
// encodeUint32 encodes a uint32 as 4 bytes (big-endian)
func encodeUint32(v uint32, buf []byte) {
binary.BigEndian.PutUint32(buf, v)
}
// decodeUint32 decodes 4 bytes as uint32
func decodeUint32(buf []byte) uint32 {
return binary.BigEndian.Uint32(buf)
}
// encodeUint16 encodes a uint16 as 2 bytes (big-endian)
func encodeUint16(v uint16, buf []byte) {
binary.BigEndian.PutUint16(buf, v)
}
// decodeUint16 decodes 2 bytes as uint16
func decodeUint16(buf []byte) uint16 {
return binary.BigEndian.Uint16(buf)
}
// encodeVarint encodes a uint64 as a variable-length integer
// Returns the number of bytes written
func encodeVarint(v uint64, buf []byte) int {
return binary.PutUvarint(buf, v)
}
// decodeVarint decodes a variable-length integer
// Returns the value and the number of bytes read
func decodeVarint(buf []byte) (uint64, int) {
return binary.Uvarint(buf)
}
// makeSerialKey creates a 5-byte key from a serial number
func makeSerialKey(serial uint64) []byte {
key := make([]byte, 5)
encodeUint40(serial, key)
return key
}
// makePubkeyHashKey creates an 8-byte key from a pubkey hash
func makePubkeyHashKey(hash []byte) []byte {
key := make([]byte, 8)
copy(key, hash) // copy stops at the shorter slice, so inputs under 8 bytes cannot panic
return key
}
// makeIdHashKey creates an 8-byte key from an event ID hash
func makeIdHashKey(id []byte) []byte {
key := make([]byte, 8)
copy(key, id) // safe for inputs shorter than 8 bytes
return key
}
// hashPubkey returns the first 8 bytes of a 32-byte pubkey as a hash
func hashPubkey(pubkey []byte) []byte {
if len(pubkey) < 8 {
return pubkey
}
return pubkey[:8]
}
// hashEventId returns the first 8 bytes of a 32-byte event ID as a hash
func hashEventId(id []byte) []byte {
if len(id) < 8 {
return id
}
return id[:8]
}
// concatenate joins multiple byte slices into one
func concatenate(slices ...[]byte) []byte {
var totalLen int
for _, s := range slices {
totalLen += len(s)
}
result := make([]byte, totalLen)
var offset int
for _, s := range slices {
copy(result[offset:], s)
offset += len(s)
}
return result
}
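As a quick sanity check of the 40-bit encoding above, a stand-alone round-trip sketch (the helpers are reproduced here for illustration; the real ones live in this file):

```go
package main

import "fmt"

// encodeUint40 mirrors the helper above: big-endian, truncated to 40 bits.
func encodeUint40(v uint64, buf []byte) {
	buf[0] = byte(v >> 32)
	buf[1] = byte(v >> 24)
	buf[2] = byte(v >> 16)
	buf[3] = byte(v >> 8)
	buf[4] = byte(v)
}

func decodeUint40(buf []byte) uint64 {
	return uint64(buf[0])<<32 | uint64(buf[1])<<24 | uint64(buf[2])<<16 |
		uint64(buf[3])<<8 | uint64(buf[4])
}

func main() {
	buf := make([]byte, 5)
	encodeUint40(1<<40-1, buf) // largest representable serial
	fmt.Println(decodeUint40(buf) == 1<<40-1)

	// Values at or above 2^40 are silently truncated to the low 40 bits.
	encodeUint40(1<<40, buf)
	fmt.Println(decodeUint40(buf) == 0)
}
```

The truncation in the second case is worth noting: serials above 2^40-1 wrap silently, so callers are expected to stay within the 40-bit range.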

pkg/bbolt/identity.go Normal file

@@ -0,0 +1,66 @@
//go:build !(js && wasm)
package bbolt
import (
"crypto/rand"
"errors"
bolt "go.etcd.io/bbolt"
)
const identityKey = "relay_identity_secret"
// GetRelayIdentitySecret gets the relay's identity secret key.
func (b *B) GetRelayIdentitySecret() (skb []byte, err error) {
err = b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketMeta)
if bucket == nil {
return errors.New("bbolt: meta bucket not found")
}
data := bucket.Get([]byte(identityKey))
if data == nil {
return errors.New("bbolt: relay identity not set")
}
skb = make([]byte, len(data))
copy(skb, data)
return nil
})
return
}
// SetRelayIdentitySecret sets the relay's identity secret key.
func (b *B) SetRelayIdentitySecret(skb []byte) error {
if len(skb) != 32 {
return errors.New("bbolt: invalid secret key length (must be 32 bytes)")
}
return b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketMeta)
if bucket == nil {
return errors.New("bbolt: meta bucket not found")
}
return bucket.Put([]byte(identityKey), skb)
})
}
// GetOrCreateRelayIdentitySecret gets or creates the relay's identity secret.
func (b *B) GetOrCreateRelayIdentitySecret() (skb []byte, err error) {
// Try to get existing secret
skb, err = b.GetRelayIdentitySecret()
if err == nil && len(skb) == 32 {
return skb, nil
}
// Generate new secret
skb = make([]byte, 32)
if _, err = rand.Read(skb); err != nil {
return nil, err
}
// Store it
if err = b.SetRelayIdentitySecret(skb); err != nil {
return nil, err
}
return skb, nil
}

pkg/bbolt/import-export.go Normal file

@@ -0,0 +1,306 @@
//go:build !(js && wasm)
package bbolt
import (
"bufio"
"context"
"io"
"os"
"runtime/debug"
"strings"
"time"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"git.mleku.dev/mleku/nostr/encoders/event"
)
const maxLen = 500000000
// ImportEventsFromReader imports events from an io.Reader containing JSONL data
func (b *B) ImportEventsFromReader(ctx context.Context, rr io.Reader) error {
startTime := time.Now()
log.I.F("bbolt import: starting import operation")
// Store to disk so we can return fast
tmpPath := os.TempDir() + string(os.PathSeparator) + "orly"
if err := os.MkdirAll(tmpPath, 0700); chk.E(err) {
return err
}
tmp, err := os.CreateTemp(tmpPath, "")
if chk.E(err) {
return err
}
tmpName := tmp.Name()
defer os.Remove(tmpName) // clean up temp file when done
defer tmp.Close()        // runs before Remove; harmless no-op error if already closed below
log.I.F("bbolt import: buffering upload to %s", tmpName)
bufferStart := time.Now()
bytesBuffered, err := io.Copy(tmp, rr)
if chk.E(err) {
return err
}
bufferElapsed := time.Since(bufferStart)
log.I.F("bbolt import: buffered %.2f MB in %v (%.2f MB/sec)",
float64(bytesBuffered)/1024/1024, bufferElapsed.Round(time.Millisecond),
float64(bytesBuffered)/bufferElapsed.Seconds()/1024/1024)
if _, err = tmp.Seek(0, io.SeekStart); chk.E(err) {
return err
}
count, processErr := b.processJSONLEventsReturningCount(ctx, tmp)
// Close temp file to release resources before index building
tmp.Close()
if processErr != nil {
return processErr
}
// Build indexes after events are stored (minimal import mode)
if count > 0 {
// Force garbage collection to reclaim memory before index building
debug.FreeOSMemory()
log.I.F("bbolt import: building indexes for %d events...", count)
if err := b.BuildIndexes(ctx); err != nil {
log.E.F("bbolt import: failed to build indexes: %v", err)
return err
}
}
totalElapsed := time.Since(startTime)
log.I.F("bbolt import: total operation time: %v", totalElapsed.Round(time.Millisecond))
return nil
}
// ImportEventsFromStrings imports events from a slice of JSON strings with policy filtering
func (b *B) ImportEventsFromStrings(ctx context.Context, eventJSONs []string, policyManager interface {
CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error)
}) error {
// Create a reader from the string slice
reader := strings.NewReader(strings.Join(eventJSONs, "\n"))
return b.processJSONLEventsWithPolicy(ctx, reader, policyManager)
}
// processJSONLEvents processes JSONL events from a reader
func (b *B) processJSONLEvents(ctx context.Context, rr io.Reader) error {
_, err := b.processJSONLEventsReturningCount(ctx, rr)
return err
}
// processJSONLEventsReturningCount processes JSONL events and returns the count saved
// This is used by ImportEventsFromReader for migration mode (minimal import without inline indexes)
func (b *B) processJSONLEventsReturningCount(ctx context.Context, rr io.Reader) (int, error) {
// Create a scanner to read the buffer line by line
scan := bufio.NewScanner(rr)
scanBuf := make([]byte, 64*1024) // small initial buffer; Scanner grows it on demand up to maxLen, avoiding an eager 500 MB allocation
scan.Buffer(scanBuf, maxLen)
// Performance tracking
startTime := time.Now()
lastLogTime := startTime
const logInterval = 5 * time.Second
var count, total, skipped, unmarshalErrors, saveErrors int
for scan.Scan() {
select {
case <-ctx.Done():
log.I.F("bbolt import: context closed after %d events", count)
return count, ctx.Err()
default:
}
line := scan.Bytes()
total += len(line) + 1
if len(line) < 1 {
skipped++
continue
}
ev := event.New()
if _, err := ev.Unmarshal(line); err != nil {
ev.Free()
unmarshalErrors++
log.W.F("bbolt import: failed to unmarshal event: %v", err)
continue
}
// Minimal path for migration: store events only, indexes built later
if err := b.SaveEventMinimal(ev); err != nil {
ev.Free()
saveErrors++
log.W.F("bbolt import: failed to save event: %v", err)
continue
}
ev.Free()
line = nil
count++
// Progress logging every logInterval
if time.Since(lastLogTime) >= logInterval {
elapsed := time.Since(startTime)
eventsPerSec := float64(count) / elapsed.Seconds()
mbPerSec := float64(total) / elapsed.Seconds() / 1024 / 1024
log.I.F("bbolt import: progress %d events saved, %.2f MB read, %.0f events/sec, %.2f MB/sec",
count, float64(total)/1024/1024, eventsPerSec, mbPerSec)
lastLogTime = time.Now()
debug.FreeOSMemory()
}
}
// Flush any remaining batched events
if b.batcher != nil {
b.batcher.Flush()
}
// Final summary
elapsed := time.Since(startTime)
eventsPerSec := float64(count) / elapsed.Seconds()
mbPerSec := float64(total) / elapsed.Seconds() / 1024 / 1024
log.I.F("bbolt import: completed - %d events saved, %.2f MB in %v (%.0f events/sec, %.2f MB/sec)",
count, float64(total)/1024/1024, elapsed.Round(time.Millisecond), eventsPerSec, mbPerSec)
if unmarshalErrors > 0 || saveErrors > 0 || skipped > 0 {
log.I.F("bbolt import: stats - %d unmarshal errors, %d save errors, %d skipped empty lines",
unmarshalErrors, saveErrors, skipped)
}
if err := scan.Err(); err != nil {
return count, err
}
return count, nil
}
// processJSONLEventsWithPolicy processes JSONL events from a reader with optional policy filtering
func (b *B) processJSONLEventsWithPolicy(ctx context.Context, rr io.Reader, policyManager interface {
CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error)
}) error {
// Create a scanner to read the buffer line by line
scan := bufio.NewScanner(rr)
scanBuf := make([]byte, 64*1024) // grown on demand up to maxLen
scan.Buffer(scanBuf, maxLen)
// Performance tracking
startTime := time.Now()
lastLogTime := startTime
const logInterval = 5 * time.Second
var count, total, skipped, policyRejected, unmarshalErrors, saveErrors int
for scan.Scan() {
select {
case <-ctx.Done():
log.I.F("bbolt import: context closed after %d events", count)
return ctx.Err()
default:
}
line := scan.Bytes()
total += len(line) + 1
if len(line) < 1 {
skipped++
continue
}
ev := event.New()
if _, err := ev.Unmarshal(line); err != nil {
// return the pooled buffer on error
ev.Free()
unmarshalErrors++
log.W.F("bbolt import: failed to unmarshal event: %v", err)
continue
}
// Apply policy checking if policy manager is provided
if policyManager != nil {
// For sync imports, we treat events as coming from system/trusted source
// Use nil pubkey and empty remote to indicate system-level import
allowed, policyErr := policyManager.CheckPolicy("write", ev, nil, "")
if policyErr != nil {
log.W.F("bbolt import: policy check failed for event %x: %v", ev.ID, policyErr)
ev.Free()
policyRejected++
continue
}
if !allowed {
log.D.F("bbolt import: policy rejected event %x during sync import", ev.ID)
ev.Free()
policyRejected++
continue
}
log.D.F("bbolt import: policy allowed event %x during sync import", ev.ID)
// With policy checking, use regular SaveEvent path
if _, err := b.SaveEvent(ctx, ev); err != nil {
ev.Free()
saveErrors++
log.W.F("bbolt import: failed to save event: %v", err)
continue
}
} else {
// Minimal path for migration: store events only, build indexes later
if err := b.SaveEventMinimal(ev); err != nil {
ev.Free()
saveErrors++
log.W.F("bbolt import: failed to save event: %v", err)
continue
}
}
// return the pooled buffer after successful save
ev.Free()
line = nil
count++
// Progress logging every logInterval
if time.Since(lastLogTime) >= logInterval {
elapsed := time.Since(startTime)
eventsPerSec := float64(count) / elapsed.Seconds()
mbPerSec := float64(total) / elapsed.Seconds() / 1024 / 1024
log.I.F("bbolt import: progress %d events saved, %.2f MB read, %.0f events/sec, %.2f MB/sec",
count, float64(total)/1024/1024, eventsPerSec, mbPerSec)
lastLogTime = time.Now()
debug.FreeOSMemory()
}
}
// Flush any remaining batched events
if b.batcher != nil {
b.batcher.Flush()
}
// Final summary
elapsed := time.Since(startTime)
eventsPerSec := float64(count) / elapsed.Seconds()
mbPerSec := float64(total) / elapsed.Seconds() / 1024 / 1024
log.I.F("bbolt import: completed - %d events saved, %.2f MB in %v (%.0f events/sec, %.2f MB/sec)",
count, float64(total)/1024/1024, elapsed.Round(time.Millisecond), eventsPerSec, mbPerSec)
if unmarshalErrors > 0 || saveErrors > 0 || policyRejected > 0 || skipped > 0 {
log.I.F("bbolt import: stats - %d unmarshal errors, %d save errors, %d policy rejected, %d skipped empty lines",
unmarshalErrors, saveErrors, policyRejected, skipped)
}
if err := scan.Err(); err != nil {
return err
}
return nil
}
// Import imports events from a reader (legacy interface).
func (b *B) Import(rr io.Reader) {
ctx := context.Background()
if err := b.ImportEventsFromReader(ctx, rr); err != nil {
log.E.F("bbolt import: error: %v", err)
}
}
// Export exports events to a writer.
func (b *B) Export(c context.Context, w io.Writer, pubkeys ...[]byte) {
// TODO: Implement export functionality
log.W.F("bbolt export: not yet implemented")
}

pkg/bbolt/import-minimal.go Normal file

@@ -0,0 +1,232 @@
//go:build !(js && wasm)
package bbolt
import (
"bytes"
"context"
"errors"
"runtime/debug"
"sort"
"time"
bolt "go.etcd.io/bbolt"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/database/bufpool"
"git.mleku.dev/mleku/nostr/encoders/event"
)
// SaveEventMinimal stores only the essential event data for fast bulk import.
// It skips all indexes - call BuildIndexes after import completes.
func (b *B) SaveEventMinimal(ev *event.E) error {
if ev == nil {
return errors.New("nil event")
}
// Skip ephemeral events (kinds 20000-29999); they are never persisted
if ev.Kind >= 20000 && ev.Kind <= 29999 {
return nil
}
}
// Get the next serial number
serial := b.getNextEventSerial()
// Serialize event in raw binary format (not compact - preserves full pubkey)
// This allows index building to work without pubkey serial resolution
legacyBuf := bufpool.GetMedium()
defer bufpool.PutMedium(legacyBuf)
ev.MarshalBinary(legacyBuf)
eventData := bufpool.CopyBytes(legacyBuf)
// Create minimal batch - only event data and ID mappings
batch := &EventBatch{
Serial: serial,
EventData: eventData,
Indexes: []BatchedWrite{
// Event ID -> Serial (for lookups)
{BucketName: bucketEid, Key: ev.ID[:], Value: makeSerialKey(serial)},
// Serial -> Event ID (for reverse lookups)
{BucketName: bucketSei, Key: makeSerialKey(serial), Value: ev.ID[:]},
},
}
return b.batcher.Add(batch)
}
// BuildIndexes builds all query indexes from stored events.
// Call this after importing events with SaveEventMinimal.
// Processes events in chunks to avoid OOM on large databases.
func (b *B) BuildIndexes(ctx context.Context) error {
log.I.F("bbolt: starting index build...")
startTime := time.Now()
// Force GC before starting to reclaim batch buffer memory
debug.FreeOSMemory()
// Process in small chunks to avoid OOM on memory-constrained systems
// With ~15 indexes per event and ~50 bytes per key, 50k events = ~37.5MB per chunk
const chunkSize = 50000
var totalEvents int
var lastSerial uint64 = 0
var lastLogTime = time.Now()
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Collect indexes for this chunk
indexesByBucket := make(map[string][][]byte)
var chunkEvents int
var chunkSerial uint64
// Read a chunk of events
err := b.db.View(func(tx *bolt.Tx) error {
cmpBucket := tx.Bucket(bucketCmp)
if cmpBucket == nil {
return errors.New("cmp bucket not found")
}
cursor := cmpBucket.Cursor()
// Seek to start position
var k, v []byte
if lastSerial == 0 {
k, v = cursor.First()
} else {
// Seek past the last processed serial
seekKey := makeSerialKey(lastSerial + 1)
k, v = cursor.Seek(seekKey)
}
for ; k != nil && chunkEvents < chunkSize; k, v = cursor.Next() {
serial := decodeSerialKey(k)
chunkSerial = serial
// Decode event from raw binary format
ev := event.New()
if err := ev.UnmarshalBinary(bytes.NewBuffer(v)); err != nil {
log.W.F("bbolt: failed to unmarshal event at serial %d: %v", serial, err)
ev.Free() // return the pooled event on failure
continue
}
// Generate indexes for this event
rawIdxs, err := database.GetIndexesForEvent(ev, serial)
if chk.E(err) {
ev.Free()
continue
}
// Group by bucket (first 3 bytes)
for _, idx := range rawIdxs {
if len(idx) < 3 {
continue
}
bucketName := string(idx[:3])
key := idx[3:]
// Skip eid and sei - already stored during import
if bucketName == "eid" || bucketName == "sei" {
continue
}
// Make a copy of the key
keyCopy := make([]byte, len(key))
copy(keyCopy, key)
indexesByBucket[bucketName] = append(indexesByBucket[bucketName], keyCopy)
}
ev.Free()
chunkEvents++
}
return nil
})
if err != nil {
return err
}
// No more events to process
if chunkEvents == 0 {
break
}
totalEvents += chunkEvents
lastSerial = chunkSerial
// Progress logging
if time.Since(lastLogTime) >= 5*time.Second {
log.I.F("bbolt: index build progress: %d events processed", totalEvents)
lastLogTime = time.Now()
}
// Count total keys in this chunk
var totalKeys int
for _, keys := range indexesByBucket {
totalKeys += len(keys)
}
log.I.F("bbolt: writing %d index keys for chunk (%d events)", totalKeys, chunkEvents)
// Write this chunk's indexes
for bucketName, keys := range indexesByBucket {
if len(keys) == 0 {
continue
}
bucketBytes := []byte(bucketName)
// Sort keys for this bucket before writing
sort.Slice(keys, func(i, j int) bool {
return bytes.Compare(keys[i], keys[j]) < 0
})
// Write in batches
const batchSize = 50000
for i := 0; i < len(keys); i += batchSize {
end := i + batchSize
if end > len(keys) {
end = len(keys)
}
batch := keys[i:end]
err := b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketBytes)
if bucket == nil {
return nil
}
for _, key := range batch {
if err := bucket.Put(key, nil); err != nil {
return err
}
}
return nil
})
if err != nil {
log.E.F("bbolt: failed to write batch for bucket %s: %v", bucketName, err)
return err
}
}
}
// Clear for next chunk and release memory
indexesByBucket = nil
debug.FreeOSMemory()
}
elapsed := time.Since(startTime)
log.I.F("bbolt: index build complete in %v (%d events)", elapsed.Round(time.Second), totalEvents)
return nil
}
// decodeSerialKey decodes a 5-byte serial key to uint64
func decodeSerialKey(b []byte) uint64 {
if len(b) < 5 {
return 0
}
return uint64(b[0])<<32 | uint64(b[1])<<24 | uint64(b[2])<<16 | uint64(b[3])<<8 | uint64(b[4])
}
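BuildIndexes above groups raw index entries by their 3-byte bucket prefix, copies each key, and sorts per bucket before writing. That grouping step can be sketched on its own (the bucket names in the demo are made up; real bucket prefixes come from database.GetIndexesForEvent):

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// groupByBucket splits raw index keys into per-bucket sorted key lists,
// using the first 3 bytes as the bucket name, as BuildIndexes does.
func groupByBucket(rawIdxs [][]byte) map[string][][]byte {
	out := make(map[string][][]byte)
	for _, idx := range rawIdxs {
		if len(idx) < 3 {
			continue // malformed entry: no room for a bucket prefix
		}
		bucket := string(idx[:3])
		key := append([]byte(nil), idx[3:]...) // copy; idx may be reused
		out[bucket] = append(out[bucket], key)
	}
	// Sorted keys give bbolt sequential Put order, which is much cheaper
	// than random insertion into its B+tree pages.
	for _, keys := range out {
		sort.Slice(keys, func(i, j int) bool {
			return bytes.Compare(keys[i], keys[j]) < 0
		})
	}
	return out
}

func main() {
	raw := [][]byte{
		[]byte("kid\x02"), // hypothetical bucket names for the demo
		[]byte("kid\x01"),
		[]byte("aut\x09"),
		[]byte("x"), // too short, skipped
	}
	groups := groupByBucket(raw)
	fmt.Println(len(groups))          // 2
	fmt.Printf("%v\n", groups["kid"]) // [[1] [2]]
}
```

Sorting before the batched Update is the same idea the chunked writer relies on: bolt appends to mostly-full pages when keys arrive in order, keeping the write amplification of the index build low.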

pkg/bbolt/init.go Normal file

@@ -0,0 +1,55 @@
//go:build !(js && wasm)
package bbolt
import (
"context"
"time"
"next.orly.dev/pkg/database"
)
func init() {
database.RegisterBboltFactory(newBboltFromConfig)
}
// newBboltFromConfig creates a BBolt database from DatabaseConfig
func newBboltFromConfig(
ctx context.Context,
cancel context.CancelFunc,
cfg *database.DatabaseConfig,
) (database.Database, error) {
// Convert DatabaseConfig to BboltConfig
bboltCfg := &BboltConfig{
DataDir: cfg.DataDir,
LogLevel: cfg.LogLevel,
// Use bbolt-specific settings from DatabaseConfig if present
// These will be added to DatabaseConfig later
BatchMaxEvents: cfg.BboltBatchMaxEvents,
BatchMaxBytes: cfg.BboltBatchMaxBytes,
BatchFlushTimeout: cfg.BboltFlushTimeout,
BloomSizeMB: cfg.BboltBloomSizeMB,
NoSync: cfg.BboltNoSync,
InitialMmapSize: cfg.BboltMmapSize,
}
// Apply defaults if not set
if bboltCfg.BatchMaxEvents <= 0 {
bboltCfg.BatchMaxEvents = 5000
}
if bboltCfg.BatchMaxBytes <= 0 {
bboltCfg.BatchMaxBytes = 128 * 1024 * 1024 // 128MB
}
if bboltCfg.BatchFlushTimeout <= 0 {
bboltCfg.BatchFlushTimeout = 30 * time.Second
}
if bboltCfg.BloomSizeMB <= 0 {
bboltCfg.BloomSizeMB = 16
}
if bboltCfg.InitialMmapSize <= 0 {
bboltCfg.InitialMmapSize = 8 * 1024 * 1024 * 1024 // 8GB
}
return NewWithConfig(ctx, cancel, bboltCfg)
}

pkg/bbolt/logger.go Normal file

@@ -0,0 +1,81 @@
//go:build !(js && wasm)
package bbolt
import (
"fmt"
"runtime"
"strings"
"go.uber.org/atomic"
"lol.mleku.dev"
"lol.mleku.dev/log"
)
// Logger wraps the lol logger for BBolt
type Logger struct {
Level atomic.Int32
Label string
}
// NewLogger creates a new Logger instance
func NewLogger(level int, dataDir string) *Logger {
l := &Logger{Label: "bbolt"}
l.Level.Store(int32(level))
return l
}
// SetLogLevel updates the log level
func (l *Logger) SetLogLevel(level int) {
l.Level.Store(int32(level))
}
// Tracef logs a trace message
func (l *Logger) Tracef(format string, args ...interface{}) {
if l.Level.Load() >= int32(lol.Trace) {
s := l.Label + ": " + format
txt := fmt.Sprintf(s, args...)
_, file, line, _ := runtime.Caller(2)
log.T.F("%s\n%s:%d", strings.TrimSpace(txt), file, line)
}
}
// Debugf logs a debug message
func (l *Logger) Debugf(format string, args ...interface{}) {
if l.Level.Load() >= int32(lol.Debug) {
s := l.Label + ": " + format
txt := fmt.Sprintf(s, args...)
_, file, line, _ := runtime.Caller(2)
log.D.F("%s\n%s:%d", strings.TrimSpace(txt), file, line)
}
}
// Infof logs an info message
func (l *Logger) Infof(format string, args ...interface{}) {
if l.Level.Load() >= int32(lol.Info) {
s := l.Label + ": " + format
txt := fmt.Sprintf(s, args...)
_, file, line, _ := runtime.Caller(2)
log.I.F("%s\n%s:%d", strings.TrimSpace(txt), file, line)
}
}
// Warningf logs a warning message
func (l *Logger) Warningf(format string, args ...interface{}) {
if l.Level.Load() >= int32(lol.Warn) {
s := l.Label + ": " + format
txt := fmt.Sprintf(s, args...)
_, file, line, _ := runtime.Caller(2)
log.W.F("%s\n%s:%d", strings.TrimSpace(txt), file, line)
}
}
// Errorf logs an error message
func (l *Logger) Errorf(format string, args ...interface{}) {
if l.Level.Load() >= int32(lol.Error) {
s := l.Label + ": " + format
txt := fmt.Sprintf(s, args...)
_, file, line, _ := runtime.Caller(2)
log.E.F("%s\n%s:%d", strings.TrimSpace(txt), file, line)
}
}

pkg/bbolt/markers.go Normal file

@@ -0,0 +1,62 @@
//go:build !(js && wasm)
package bbolt
import (
bolt "go.etcd.io/bbolt"
)
const markerPrefix = "marker:"
// SetMarker sets a metadata marker.
func (b *B) SetMarker(key string, value []byte) error {
return b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketMeta)
if bucket == nil {
return nil
}
return bucket.Put([]byte(markerPrefix+key), value)
})
}
// GetMarker gets a metadata marker.
func (b *B) GetMarker(key string) (value []byte, err error) {
err = b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketMeta)
if bucket == nil {
return nil
}
data := bucket.Get([]byte(markerPrefix + key))
if data != nil {
value = make([]byte, len(data))
copy(value, data)
}
return nil
})
return
}
// HasMarker checks if a marker exists.
func (b *B) HasMarker(key string) bool {
var exists bool
b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketMeta)
if bucket == nil {
return nil
}
exists = bucket.Get([]byte(markerPrefix+key)) != nil
return nil
})
return exists
}
// DeleteMarker deletes a marker.
func (b *B) DeleteMarker(key string) error {
return b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketMeta)
if bucket == nil {
return nil
}
return bucket.Delete([]byte(markerPrefix + key))
})
}

pkg/bbolt/query-graph.go Normal file

@@ -0,0 +1,287 @@
//go:build !(js && wasm)
package bbolt
import (
"bytes"
bolt "go.etcd.io/bbolt"
"next.orly.dev/pkg/database/indexes/types"
)
// EdgeExists checks if an edge exists between two serials.
// Uses bloom filter for fast negative lookups.
func (b *B) EdgeExists(srcSerial, dstSerial uint64, edgeType byte) (bool, error) {
// Fast path: check bloom filter first
if !b.edgeBloom.MayExist(srcSerial, dstSerial, edgeType) {
return false, nil // Definitely doesn't exist
}
// Bloom says maybe - need to verify in adjacency list
return b.verifyEdgeInAdjacencyList(srcSerial, dstSerial, edgeType)
}
// verifyEdgeInAdjacencyList checks the adjacency list for edge existence.
func (b *B) verifyEdgeInAdjacencyList(srcSerial, dstSerial uint64, edgeType byte) (bool, error) {
var exists bool
err := b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketEv)
if bucket == nil {
return nil
}
key := makeSerialKey(srcSerial)
data := bucket.Get(key)
if data == nil {
return nil
}
vertex := &EventVertex{}
if err := vertex.Decode(data); err != nil {
return err
}
switch edgeType {
case EdgeTypeAuthor:
exists = vertex.AuthorSerial == dstSerial
case EdgeTypePTag:
for _, s := range vertex.PTagSerials {
if s == dstSerial {
exists = true
break
}
}
case EdgeTypeETag:
for _, s := range vertex.ETagSerials {
if s == dstSerial {
exists = true
break
}
}
}
return nil
})
return exists, err
}
// GetEventVertex retrieves the adjacency list for an event.
func (b *B) GetEventVertex(eventSerial uint64) (*EventVertex, error) {
var vertex *EventVertex
err := b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketEv)
if bucket == nil {
return nil
}
key := makeSerialKey(eventSerial)
data := bucket.Get(key)
if data == nil {
return nil
}
vertex = &EventVertex{}
return vertex.Decode(data)
})
return vertex, err
}
// GetPubkeyVertex retrieves the adjacency list for a pubkey.
func (b *B) GetPubkeyVertex(pubkeySerial uint64) (*PubkeyVertex, error) {
var vertex *PubkeyVertex
err := b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketPv)
if bucket == nil {
return nil
}
key := makeSerialKey(pubkeySerial)
data := bucket.Get(key)
if data == nil {
return nil
}
vertex = &PubkeyVertex{}
return vertex.Decode(data)
})
return vertex, err
}
// GetEventsAuthoredBy returns event serials authored by a pubkey.
func (b *B) GetEventsAuthoredBy(pubkeySerial uint64) ([]uint64, error) {
vertex, err := b.GetPubkeyVertex(pubkeySerial)
if err != nil || vertex == nil {
return nil, err
}
return vertex.AuthoredEvents, nil
}
// GetEventsMentioning returns event serials that mention a pubkey.
func (b *B) GetEventsMentioning(pubkeySerial uint64) ([]uint64, error) {
vertex, err := b.GetPubkeyVertex(pubkeySerial)
if err != nil || vertex == nil {
return nil, err
}
return vertex.MentionedIn, nil
}
// GetPTagsFromEvent returns pubkey serials tagged in an event.
func (b *B) GetPTagsFromEvent(eventSerial uint64) ([]uint64, error) {
vertex, err := b.GetEventVertex(eventSerial)
if err != nil || vertex == nil {
return nil, err
}
return vertex.PTagSerials, nil
}
// GetETagsFromEvent returns event serials referenced by an event.
func (b *B) GetETagsFromEvent(eventSerial uint64) ([]uint64, error) {
vertex, err := b.GetEventVertex(eventSerial)
if err != nil || vertex == nil {
return nil, err
}
return vertex.ETagSerials, nil
}
// GetFollowsFromPubkeySerial returns the pubkey serials that a user follows.
// This extracts p-tags from the user's kind-3 contact list event.
func (b *B) GetFollowsFromPubkeySerial(pubkeySerial *types.Uint40) ([]*types.Uint40, error) {
if pubkeySerial == nil {
return nil, nil
}
// Find the kind-3 event for this pubkey
contactEventSerial, err := b.FindEventByAuthorAndKind(pubkeySerial.Get(), 3)
if err != nil {
return nil, nil // No kind-3 event found is not an error
}
if contactEventSerial == 0 {
return nil, nil
}
// Get the p-tags from the event vertex
pTagSerials, err := b.GetPTagsFromEvent(contactEventSerial)
if err != nil {
return nil, err
}
// Convert to types.Uint40
result := make([]*types.Uint40, 0, len(pTagSerials))
for _, s := range pTagSerials {
ser := new(types.Uint40)
ser.Set(s)
result = append(result, ser)
}
return result, nil
}
// FindEventByAuthorAndKind finds an event serial by author and kind.
// For replaceable events like kind-3, returns the most recent one.
func (b *B) FindEventByAuthorAndKind(authorSerial uint64, kindNum uint16) (uint64, error) {
var resultSerial uint64
err := b.db.View(func(tx *bolt.Tx) error {
// First, get events authored by this pubkey
pvBucket := tx.Bucket(bucketPv)
if pvBucket == nil {
return nil
}
pvKey := makeSerialKey(authorSerial)
pvData := pvBucket.Get(pvKey)
if pvData == nil {
return nil
}
vertex := &PubkeyVertex{}
if err := vertex.Decode(pvData); err != nil {
return err
}
// Search through authored events for matching kind
evBucket := tx.Bucket(bucketEv)
if evBucket == nil {
return nil
}
var latestTs int64
for _, eventSerial := range vertex.AuthoredEvents {
evKey := makeSerialKey(eventSerial)
evData := evBucket.Get(evKey)
if evData == nil {
continue
}
evVertex := &EventVertex{}
if err := evVertex.Decode(evData); err != nil {
continue
}
if evVertex.Kind == kindNum {
// For replaceable events, we need to check timestamp
// Get event to compare timestamps
fpcBucket := tx.Bucket(bucketFpc)
if fpcBucket != nil {
// Scan for matching serial prefix in fpc bucket
c := fpcBucket.Cursor()
prefix := makeSerialKey(eventSerial)
// Only the first entry for this serial is needed, so read one key after Seek.
// Key format: serial(5) | id(32) | pubkey_hash(8) | created_at(8)
if k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix) && len(k) >= 53 {
ts := int64(decodeUint64(k[45:53]))
if ts > latestTs {
latestTs = ts
resultSerial = eventSerial
}
}
} else {
// If no fpc bucket, just take the first match
resultSerial = eventSerial
}
}
}
return nil
})
return resultSerial, err
}
// GetReferencingEvents returns event serials that reference a target event via e-tag.
func (b *B) GetReferencingEvents(targetSerial uint64) ([]uint64, error) {
var result []uint64
err := b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketEv)
if bucket == nil {
return nil
}
// Scan all event vertices looking for e-tag references
// Note: This is O(n) - for production, consider a reverse index
c := bucket.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
vertex := &EventVertex{}
if err := vertex.Decode(v); err != nil {
continue
}
for _, eTagSerial := range vertex.ETagSerials {
if eTagSerial == targetSerial {
eventSerial := decodeUint40(k)
result = append(result, eventSerial)
break
}
}
}
return nil
})
return result, err
}
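The O(n) scan above could be avoided with the reverse index the comment suggests. A minimal in-memory sketch of that idea — in bbolt this would be a bucket keyed by target serial, but the map and helper names here are hypothetical, purely to show the lookup becoming a direct read:

```go
package main

import "fmt"

// reverseETagIndex maps target event serial -> serials of events referencing it.
type reverseETagIndex map[uint64][]uint64

// add records that eventSerial references targetSerial via an e-tag.
func (idx reverseETagIndex) add(targetSerial, eventSerial uint64) {
	idx[targetSerial] = append(idx[targetSerial], eventSerial)
}

// referencing returns all recorded referencers of targetSerial.
func (idx reverseETagIndex) referencing(targetSerial uint64) []uint64 {
	return idx[targetSerial]
}

func main() {
	idx := reverseETagIndex{}
	idx.add(42, 100)
	idx.add(42, 101)
	idx.add(7, 102)
	fmt.Println(idx.referencing(42)) // [100 101]
}
```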


@@ -0,0 +1,96 @@
//go:build !(js && wasm)
package bbolt
import (
"errors"
"lol.mleku.dev/chk"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/database/bufpool"
"git.mleku.dev/mleku/nostr/encoders/event"
)
// SaveEventForImport saves an event optimized for bulk import.
// It skips duplicate checking, deletion checking, and graph vertex creation
// to maximize import throughput. Use only for trusted data migration.
func (b *B) SaveEventForImport(ev *event.E) error {
if ev == nil {
return errors.New("nil event")
}
// Skip ephemeral events (kinds 20000-29999); they are never persisted
if ev.Kind >= 20000 && ev.Kind <= 29999 {
return nil // silently skip during import
}
}
// Get the next serial number
serial := b.getNextEventSerial()
// Generate all indexes using the shared function
rawIdxs, err := database.GetIndexesForEvent(ev, serial)
if chk.E(err) {
return err
}
// Convert raw indexes to BatchedWrites, stripping the 3-byte prefix
batch := &EventBatch{
Serial: serial,
Indexes: make([]BatchedWrite, 0, len(rawIdxs)+1),
}
for _, idx := range rawIdxs {
if len(idx) < 3 {
continue
}
bucketName := idx[:3]
key := idx[3:]
batch.Indexes = append(batch.Indexes, BatchedWrite{
BucketName: bucketName,
Key: key,
Value: nil,
})
}
// Serialize event in compact format (without graph references for import)
resolver := &nullSerialResolver{}
compactData, compactErr := database.MarshalCompactEvent(ev, resolver)
if compactErr != nil {
// Fall back to legacy format
legacyBuf := bufpool.GetMedium()
defer bufpool.PutMedium(legacyBuf)
ev.MarshalBinary(legacyBuf)
compactData = bufpool.CopyBytes(legacyBuf)
}
batch.EventData = compactData
// Store serial -> event ID mapping
batch.Indexes = append(batch.Indexes, BatchedWrite{
BucketName: bucketSei,
Key: makeSerialKey(serial),
Value: ev.ID[:],
})
// Add to batcher (no graph vertex, no pubkey lookups)
return b.batcher.Add(batch)
}
// nullSerialResolver returns 0 for all lookups, used for fast import
// where we don't need pubkey/event serial references in compact format
type nullSerialResolver struct{}
func (r *nullSerialResolver) GetOrCreatePubkeySerial(pubkey []byte) (uint64, error) {
return 0, nil
}
func (r *nullSerialResolver) GetPubkeyBySerial(serial uint64) ([]byte, error) {
return nil, nil
}
func (r *nullSerialResolver) GetEventSerialById(eventID []byte) (uint64, bool, error) {
return 0, false, nil
}
func (r *nullSerialResolver) GetEventIdBySerial(serial uint64) ([]byte, error) {
return nil, nil
}

pkg/bbolt/save-event.go

@@ -0,0 +1,387 @@
//go:build !(js && wasm)
package bbolt
import (
"context"
"errors"
"fmt"
"strings"
bolt "go.etcd.io/bbolt"
"lol.mleku.dev/chk"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/database/bufpool"
"next.orly.dev/pkg/database/indexes/types"
"next.orly.dev/pkg/mode"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/filter"
"git.mleku.dev/mleku/nostr/encoders/hex"
"git.mleku.dev/mleku/nostr/encoders/kind"
"git.mleku.dev/mleku/nostr/encoders/tag"
)
// SaveEvent saves an event to the database using the write batcher.
func (b *B) SaveEvent(c context.Context, ev *event.E) (replaced bool, err error) {
if ev == nil {
err = errors.New("nil event")
return
}
// Reject ephemeral events (kinds 20000-29999)
if ev.Kind >= 20000 && ev.Kind <= 29999 {
err = errors.New("blocked: ephemeral events should not be stored")
return
}
// Validate kind 3 (follow list) events have at least one p tag
if ev.Kind == 3 {
hasPTag := false
if ev.Tags != nil {
for _, tag := range *ev.Tags {
if tag != nil && tag.Len() >= 2 {
key := tag.Key()
if len(key) == 1 && key[0] == 'p' {
hasPTag = true
break
}
}
}
}
if !hasPTag {
err = errors.New("blocked: kind 3 follow list events must have at least one p tag")
return
}
}
// Check if the event already exists
var ser *types.Uint40
if ser, err = b.GetSerialById(ev.ID); err == nil && ser != nil {
err = errors.New("blocked: event already exists: " + hex.Enc(ev.ID[:]))
return
}
// If the error is "id not found", we can proceed
if err != nil && strings.Contains(err.Error(), "id not found") {
err = nil
} else if err != nil {
return
}
// Check if the event has been deleted
if !mode.IsOpen() {
if err = b.CheckForDeleted(ev, nil); err != nil {
err = fmt.Errorf("blocked: %s", err.Error())
return
}
}
// Check for replacement
if kind.IsReplaceable(ev.Kind) || kind.IsParameterizedReplaceable(ev.Kind) {
var werr error
if replaced, _, werr = b.WouldReplaceEvent(ev); werr != nil {
if errors.Is(werr, database.ErrOlderThanExisting) {
if kind.IsReplaceable(ev.Kind) {
err = errors.New("blocked: event is older than existing replaceable event")
} else {
err = errors.New("blocked: event is older than existing addressable event")
}
return
}
if errors.Is(werr, database.ErrMissingDTag) {
err = database.ErrMissingDTag
return
}
return
}
}
// Get the next serial number
serial := b.getNextEventSerial()
// Generate all indexes using the shared function
var rawIdxs [][]byte
if rawIdxs, err = database.GetIndexesForEvent(ev, serial); chk.E(err) {
return
}
// Convert raw indexes to BatchedWrites, stripping the 3-byte prefix
// since we use separate buckets
batch := &EventBatch{
Serial: serial,
Indexes: make([]BatchedWrite, 0, len(rawIdxs)+1), // +1 for the serial -> event ID mapping
}
for _, idx := range rawIdxs {
if len(idx) < 3 {
continue
}
// Get bucket name from prefix
bucketName := idx[:3]
key := idx[3:] // Key without prefix
batch.Indexes = append(batch.Indexes, BatchedWrite{
BucketName: bucketName,
Key: key,
Value: nil, // Index entries have empty values
})
}
// Serialize event in compact format
resolver := &bboltSerialResolver{b: b}
compactData, compactErr := database.MarshalCompactEvent(ev, resolver)
if compactErr != nil {
// Fall back to legacy format
legacyBuf := bufpool.GetMedium()
defer bufpool.PutMedium(legacyBuf)
ev.MarshalBinary(legacyBuf)
compactData = bufpool.CopyBytes(legacyBuf)
}
batch.EventData = compactData
// Build event vertex for adjacency list
var authorSerial uint64
err = b.db.Update(func(tx *bolt.Tx) error {
var e error
authorSerial, e = b.getOrCreatePubkeySerial(tx, ev.Pubkey)
return e
})
if chk.E(err) {
return
}
eventVertex := &EventVertex{
AuthorSerial: authorSerial,
Kind: uint16(ev.Kind),
PTagSerials: make([]uint64, 0),
ETagSerials: make([]uint64, 0),
}
// Collect edge keys for bloom filter
edgeKeys := make([]EdgeKey, 0)
// Add author edge to bloom filter
edgeKeys = append(edgeKeys, EdgeKey{
SrcSerial: serial,
DstSerial: authorSerial,
EdgeType: EdgeTypeAuthor,
})
// Set up pubkey vertex update for author
batch.PubkeyUpdate = &PubkeyVertexUpdate{
PubkeySerial: authorSerial,
AddAuthored: serial,
}
// Process p-tags
batch.MentionUpdates = make([]*PubkeyVertexUpdate, 0)
pTags := ev.Tags.GetAll([]byte("p"))
for _, pTag := range pTags {
if pTag.Len() >= 2 {
var ptagPubkey []byte
if ptagPubkey, err = hex.Dec(string(pTag.ValueHex())); err == nil && len(ptagPubkey) == 32 {
var ptagSerial uint64
err = b.db.Update(func(tx *bolt.Tx) error {
var e error
ptagSerial, e = b.getOrCreatePubkeySerial(tx, ptagPubkey)
return e
})
if chk.E(err) {
continue
}
eventVertex.PTagSerials = append(eventVertex.PTagSerials, ptagSerial)
// Add p-tag edge to bloom filter
edgeKeys = append(edgeKeys, EdgeKey{
SrcSerial: serial,
DstSerial: ptagSerial,
EdgeType: EdgeTypePTag,
})
// Add mention update for this pubkey
batch.MentionUpdates = append(batch.MentionUpdates, &PubkeyVertexUpdate{
PubkeySerial: ptagSerial,
AddMention: serial,
})
}
}
}
// Process e-tags
eTags := ev.Tags.GetAll([]byte("e"))
for _, eTag := range eTags {
if eTag.Len() >= 2 {
var targetEventID []byte
if targetEventID, err = hex.Dec(string(eTag.ValueHex())); err != nil || len(targetEventID) != 32 {
continue
}
// Look up the target event's serial
var targetSerial *types.Uint40
if targetSerial, err = b.GetSerialById(targetEventID); err != nil {
err = nil
continue
}
targetSer := targetSerial.Get()
eventVertex.ETagSerials = append(eventVertex.ETagSerials, targetSer)
// Add e-tag edge to bloom filter
edgeKeys = append(edgeKeys, EdgeKey{
SrcSerial: serial,
DstSerial: targetSer,
EdgeType: EdgeTypeETag,
})
}
}
batch.EventVertex = eventVertex
batch.EdgeKeys = edgeKeys
// Store serial -> event ID mapping
batch.Indexes = append(batch.Indexes, BatchedWrite{
BucketName: bucketSei,
Key: makeSerialKey(serial),
Value: ev.ID[:],
})
// Add to batcher
if err = b.batcher.Add(batch); chk.E(err) {
return
}
// Process deletion events
if ev.Kind == kind.Deletion.K {
if err = b.ProcessDelete(ev, nil); chk.E(err) {
b.Logger.Warningf("failed to process deletion for event %x: %v", ev.ID, err)
err = nil
}
}
return
}
// GetSerialsFromFilter returns serials matching a filter.
func (b *B) GetSerialsFromFilter(f *filter.F) (sers types.Uint40s, err error) {
var idxs []database.Range
if idxs, err = database.GetIndexesFromFilter(f); chk.E(err) {
return
}
sers = make(types.Uint40s, 0, len(idxs)*100)
for _, idx := range idxs {
var s types.Uint40s
if s, err = b.GetSerialsByRange(idx); chk.E(err) {
continue
}
sers = append(sers, s...)
}
return
}
// WouldReplaceEvent checks if the event would replace existing events.
func (b *B) WouldReplaceEvent(ev *event.E) (bool, types.Uint40s, error) {
if !(kind.IsReplaceable(ev.Kind) || kind.IsParameterizedReplaceable(ev.Kind)) {
return false, nil, nil
}
var f *filter.F
if kind.IsReplaceable(ev.Kind) {
f = &filter.F{
Authors: tag.NewFromBytesSlice(ev.Pubkey),
Kinds: kind.NewS(kind.New(ev.Kind)),
}
} else {
dTag := ev.Tags.GetFirst([]byte("d"))
if dTag == nil {
return false, nil, database.ErrMissingDTag
}
f = &filter.F{
Authors: tag.NewFromBytesSlice(ev.Pubkey),
Kinds: kind.NewS(kind.New(ev.Kind)),
Tags: tag.NewS(
tag.NewFromAny("d", dTag.Value()),
),
}
}
sers, err := b.GetSerialsFromFilter(f)
if chk.E(err) {
return false, nil, err
}
if len(sers) == 0 {
return false, nil, nil
}
shouldReplace := true
for _, s := range sers {
oldEv, ferr := b.FetchEventBySerial(s)
if chk.E(ferr) {
continue
}
if ev.CreatedAt < oldEv.CreatedAt {
shouldReplace = false
break
}
}
if shouldReplace {
return true, nil, nil
}
return false, nil, database.ErrOlderThanExisting
}
// bboltSerialResolver implements database.SerialResolver for compact event encoding
type bboltSerialResolver struct {
b *B
}
func (r *bboltSerialResolver) GetOrCreatePubkeySerial(pubkey []byte) (serial uint64, err error) {
err = r.b.db.Update(func(tx *bolt.Tx) error {
var e error
serial, e = r.b.getOrCreatePubkeySerial(tx, pubkey)
return e
})
return
}
func (r *bboltSerialResolver) GetPubkeyBySerial(serial uint64) (pubkey []byte, err error) {
err = r.b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketSpk)
if bucket == nil {
return nil
}
val := bucket.Get(makeSerialKey(serial))
if val != nil {
pubkey = make([]byte, 32)
copy(pubkey, val)
}
return nil
})
return
}
func (r *bboltSerialResolver) GetEventSerialById(eventID []byte) (serial uint64, found bool, err error) {
ser, e := r.b.GetSerialById(eventID)
if e != nil || ser == nil {
return 0, false, nil
}
return ser.Get(), true, nil
}
func (r *bboltSerialResolver) GetEventIdBySerial(serial uint64) (eventID []byte, err error) {
err = r.b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketSei)
if bucket == nil {
return nil
}
val := bucket.Get(makeSerialKey(serial))
if val != nil {
eventID = make([]byte, 32)
copy(eventID, val)
}
return nil
})
return
}

pkg/bbolt/serial.go

@@ -0,0 +1,169 @@
//go:build !(js && wasm)
package bbolt
import (
"encoding/binary"
bolt "go.etcd.io/bbolt"
"lol.mleku.dev/chk"
)
const (
serialCounterKey = "serial_counter"
pubkeySerialCounterKey = "pubkey_serial_counter"
)
// initSerialCounters initializes or loads the serial counters from _meta bucket
func (b *B) initSerialCounters() error {
return b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketMeta)
if bucket == nil {
return nil
}
// Load event serial counter
val := bucket.Get([]byte(serialCounterKey))
if val == nil {
b.nextSerial = 1
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, 1)
if err := bucket.Put([]byte(serialCounterKey), buf); err != nil {
return err
}
} else {
b.nextSerial = binary.BigEndian.Uint64(val)
}
// Load pubkey serial counter
val = bucket.Get([]byte(pubkeySerialCounterKey))
if val == nil {
b.nextPubkeySeq = 1
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, 1)
if err := bucket.Put([]byte(pubkeySerialCounterKey), buf); err != nil {
return err
}
} else {
b.nextPubkeySeq = binary.BigEndian.Uint64(val)
}
return nil
})
}
// persistSerialCounters saves the current serial counters to disk
func (b *B) persistSerialCounters() error {
b.serialMu.Lock()
eventSerial := b.nextSerial
pubkeySerial := b.nextPubkeySeq
b.serialMu.Unlock()
return b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketMeta)
if bucket == nil {
return nil
}
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, eventSerial)
if err := bucket.Put([]byte(serialCounterKey), buf); chk.E(err) {
return err
}
binary.BigEndian.PutUint64(buf, pubkeySerial)
if err := bucket.Put([]byte(pubkeySerialCounterKey), buf); chk.E(err) {
return err
}
return nil
})
}
// getNextEventSerial returns the next event serial number (thread-safe)
func (b *B) getNextEventSerial() uint64 {
b.serialMu.Lock()
defer b.serialMu.Unlock()
serial := b.nextSerial
b.nextSerial++
// Persist every 1000 to reduce disk writes
if b.nextSerial%1000 == 0 {
go func() {
if err := b.persistSerialCounters(); chk.E(err) {
b.Logger.Warningf("bbolt: failed to persist serial counters: %v", err)
}
}()
}
return serial
}
// getNextPubkeySerial returns the next pubkey serial number (thread-safe)
func (b *B) getNextPubkeySerial() uint64 {
b.serialMu.Lock()
defer b.serialMu.Unlock()
serial := b.nextPubkeySeq
b.nextPubkeySeq++
// Persist every 1000 to reduce disk writes
if b.nextPubkeySeq%1000 == 0 {
go func() {
if err := b.persistSerialCounters(); chk.E(err) {
b.Logger.Warningf("bbolt: failed to persist serial counters: %v", err)
}
}()
}
return serial
}
// getOrCreatePubkeySerial gets or creates a serial for a pubkey
func (b *B) getOrCreatePubkeySerial(tx *bolt.Tx, pubkey []byte) (uint64, error) {
pksBucket := tx.Bucket(bucketPks)
spkBucket := tx.Bucket(bucketSpk)
if pksBucket == nil || spkBucket == nil {
// Returning serial 0 here would collide with the null-resolver sentinel
return 0, bolt.ErrBucketNotFound
}
// Check if pubkey already has a serial
pubkeyHash := hashPubkey(pubkey)
val := pksBucket.Get(pubkeyHash)
if val != nil {
return decodeUint40(val), nil
}
// Create new serial
serial := b.getNextPubkeySerial()
serialKey := makeSerialKey(serial)
// Store pubkey_hash -> serial
serialBuf := make([]byte, 5)
encodeUint40(serial, serialBuf)
if err := pksBucket.Put(pubkeyHash, serialBuf); err != nil {
return 0, err
}
// Store serial -> full pubkey
fullPubkey := make([]byte, 32)
copy(fullPubkey, pubkey)
if err := spkBucket.Put(serialKey, fullPubkey); err != nil {
return 0, err
}
return serial, nil
}
// getPubkeyBySerial retrieves the full 32-byte pubkey from a serial
func (b *B) getPubkeyBySerial(tx *bolt.Tx, serial uint64) ([]byte, error) {
spkBucket := tx.Bucket(bucketSpk)
if spkBucket == nil {
return nil, nil
}
serialKey := makeSerialKey(serial)
return spkBucket.Get(serialKey), nil
}
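The `encodeUint40`/`decodeUint40` helpers used above are defined elsewhere in the package. Assuming a 5-byte big-endian encoding (which matches the `serialBuf := make([]byte, 5)` usage), a minimal self-contained sketch of the round trip:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeUint40 writes the low 40 bits of v into a 5-byte big-endian buffer.
func encodeUint40(v uint64, buf []byte) {
	var tmp [8]byte
	binary.BigEndian.PutUint64(tmp[:], v)
	copy(buf, tmp[3:]) // keep only the low 5 bytes
}

// decodeUint40 reads a 5-byte big-endian value back into a uint64.
func decodeUint40(buf []byte) uint64 {
	var tmp [8]byte
	copy(tmp[3:], buf[:5])
	return binary.BigEndian.Uint64(tmp[:])
}

func main() {
	buf := make([]byte, 5)
	encodeUint40(123456789, buf)
	fmt.Println(decodeUint40(buf)) // 123456789
}
```

Values at or above 2^40 would be silently truncated by this scheme, which is why the serial counters start at 1 and grow monotonically.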

pkg/bbolt/stubs.go

@@ -0,0 +1,233 @@
//go:build !(js && wasm)
package bbolt
import (
"context"
"errors"
"time"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/database/indexes/types"
"next.orly.dev/pkg/interfaces/store"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/filter"
)
// This file contains stub implementations for interface methods that will be
// implemented fully in later phases. It allows the code to compile while
// we implement the core functionality.
var errNotImplemented = errors.New("bbolt: not implemented yet")
// QueryEvents queries events matching a filter.
func (b *B) QueryEvents(c context.Context, f *filter.F) (evs event.S, err error) {
// Get serials matching filter
var serials types.Uint40s
if serials, err = b.GetSerialsFromFilter(f); err != nil {
return
}
// Fetch events by serials
evs = make(event.S, 0, len(serials))
for _, ser := range serials {
ev, e := b.FetchEventBySerial(ser)
if e == nil && ev != nil {
evs = append(evs, ev)
}
}
return
}
// QueryAllVersions queries all versions of events matching a filter.
func (b *B) QueryAllVersions(c context.Context, f *filter.F) (evs event.S, err error) {
return b.QueryEvents(c, f)
}
// QueryEventsWithOptions queries events with additional options.
func (b *B) QueryEventsWithOptions(c context.Context, f *filter.F, includeDeleteEvents bool, showAllVersions bool) (evs event.S, err error) {
return b.QueryEvents(c, f)
}
// QueryDeleteEventsByTargetId queries delete events targeting a specific event.
func (b *B) QueryDeleteEventsByTargetId(c context.Context, targetEventId []byte) (evs event.S, err error) {
return nil, errNotImplemented
}
// QueryForSerials queries and returns only the serials.
func (b *B) QueryForSerials(c context.Context, f *filter.F) (serials types.Uint40s, err error) {
return b.GetSerialsFromFilter(f)
}
// QueryForIds queries and returns event ID/pubkey/timestamp tuples.
func (b *B) QueryForIds(c context.Context, f *filter.F) (idPkTs []*store.IdPkTs, err error) {
var serials types.Uint40s
if serials, err = b.GetSerialsFromFilter(f); err != nil {
return
}
return b.GetFullIdPubkeyBySerials(serials)
}
// DeleteEvent deletes an event by ID.
func (b *B) DeleteEvent(c context.Context, eid []byte) error {
return errNotImplemented
}
// DeleteEventBySerial deletes an event by serial.
func (b *B) DeleteEventBySerial(c context.Context, ser *types.Uint40, ev *event.E) error {
return errNotImplemented
}
// DeleteExpired deletes expired events.
func (b *B) DeleteExpired() {
// TODO: Implement
}
// ProcessDelete processes a deletion event.
// For migration from other backends, deletions have already been processed,
// so this is a no-op. Full implementation needed for production use.
func (b *B) ProcessDelete(ev *event.E, admins [][]byte) error {
// TODO: Implement full deletion processing for production use
// For now, just return nil to allow migrations to proceed
return nil
}
// CheckForDeleted checks if an event has been deleted.
func (b *B) CheckForDeleted(ev *event.E, admins [][]byte) error {
return nil // Not deleted by default
}
// GetSubscription gets a user's subscription.
func (b *B) GetSubscription(pubkey []byte) (*database.Subscription, error) {
return nil, errNotImplemented
}
// IsSubscriptionActive checks if a subscription is active.
func (b *B) IsSubscriptionActive(pubkey []byte) (bool, error) {
return false, errNotImplemented
}
// ExtendSubscription extends a subscription.
func (b *B) ExtendSubscription(pubkey []byte, days int) error {
return errNotImplemented
}
// RecordPayment records a payment.
func (b *B) RecordPayment(pubkey []byte, amount int64, invoice, preimage string) error {
return errNotImplemented
}
// GetPaymentHistory gets payment history.
func (b *B) GetPaymentHistory(pubkey []byte) ([]database.Payment, error) {
return nil, errNotImplemented
}
// ExtendBlossomSubscription extends a Blossom subscription.
func (b *B) ExtendBlossomSubscription(pubkey []byte, tier string, storageMB int64, daysExtended int) error {
return errNotImplemented
}
// GetBlossomStorageQuota gets Blossom storage quota.
func (b *B) GetBlossomStorageQuota(pubkey []byte) (quotaMB int64, err error) {
return 0, errNotImplemented
}
// IsFirstTimeUser checks if this is a first-time user.
func (b *B) IsFirstTimeUser(pubkey []byte) (bool, error) {
return true, nil
}
// AddNIP43Member adds a NIP-43 member.
func (b *B) AddNIP43Member(pubkey []byte, inviteCode string) error {
return errNotImplemented
}
// RemoveNIP43Member removes a NIP-43 member.
func (b *B) RemoveNIP43Member(pubkey []byte) error {
return errNotImplemented
}
// IsNIP43Member checks if pubkey is a NIP-43 member.
func (b *B) IsNIP43Member(pubkey []byte) (isMember bool, err error) {
return false, errNotImplemented
}
// GetNIP43Membership gets NIP-43 membership details.
func (b *B) GetNIP43Membership(pubkey []byte) (*database.NIP43Membership, error) {
return nil, errNotImplemented
}
// GetAllNIP43Members gets all NIP-43 members.
func (b *B) GetAllNIP43Members() ([][]byte, error) {
return nil, errNotImplemented
}
// StoreInviteCode stores an invite code.
func (b *B) StoreInviteCode(code string, expiresAt time.Time) error {
return errNotImplemented
}
// ValidateInviteCode validates an invite code.
func (b *B) ValidateInviteCode(code string) (valid bool, err error) {
return false, errNotImplemented
}
// DeleteInviteCode deletes an invite code.
func (b *B) DeleteInviteCode(code string) error {
return errNotImplemented
}
// PublishNIP43MembershipEvent publishes a NIP-43 membership event.
func (b *B) PublishNIP43MembershipEvent(kind int, pubkey []byte) error {
return errNotImplemented
}
// RunMigrations runs database migrations.
func (b *B) RunMigrations() {
// TODO: Implement if needed
}
// GetCachedJSON gets cached JSON for a filter (stub - no caching in bbolt).
func (b *B) GetCachedJSON(f *filter.F) ([][]byte, bool) {
return nil, false
}
// CacheMarshaledJSON caches JSON for a filter (stub - no caching in bbolt).
func (b *B) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {
// No-op: BBolt doesn't use query cache to save RAM
}
// GetCachedEvents gets cached events for a filter (stub - no caching in bbolt).
func (b *B) GetCachedEvents(f *filter.F) (event.S, bool) {
return nil, false
}
// CacheEvents caches events for a filter (stub - no caching in bbolt).
func (b *B) CacheEvents(f *filter.F, events event.S) {
// No-op: BBolt doesn't use query cache to save RAM
}
// InvalidateQueryCache invalidates the query cache (stub - no caching in bbolt).
func (b *B) InvalidateQueryCache() {
// No-op
}
// RecordEventAccess records an event access.
func (b *B) RecordEventAccess(serial uint64, connectionID string) error {
return nil // TODO: Implement if needed
}
// GetEventAccessInfo gets event access information.
func (b *B) GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) {
return 0, 0, errNotImplemented
}
// GetLeastAccessedEvents gets least accessed events.
func (b *B) GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error) {
return nil, errNotImplemented
}
// EventIdsBySerial gets event serials starting at a given serial.
func (b *B) EventIdsBySerial(start uint64, count int) (ids []uint64, err error) {
return nil, errNotImplemented
}
}


@@ -240,6 +240,11 @@ func readUint40(r io.Reader) (value uint64, err error) {
// The resolver is used to look up pubkeys and event IDs from serials.
// The eventId parameter is the full 32-byte event ID (from SerialEventId table).
func UnmarshalCompactEvent(data []byte, eventId []byte, resolver SerialResolver) (ev *event.E, err error) {
// Validate eventId upfront to prevent returning events with zero IDs
if len(eventId) != 32 {
return nil, errors.New("invalid eventId: must be exactly 32 bytes")
}
r := bytes.NewReader(data)
ev = new(event.E)


@@ -41,6 +41,14 @@ type DatabaseConfig struct {
Neo4jFetchSize int // ORLY_NEO4J_FETCH_SIZE - max records per fetch batch (default: 1000)
Neo4jMaxTxRetrySeconds int // ORLY_NEO4J_MAX_TX_RETRY_SEC - max transaction retry time (default: 30)
Neo4jQueryResultLimit int // ORLY_NEO4J_QUERY_RESULT_LIMIT - max results per query (default: 10000, 0=unlimited)
// BBolt-specific settings (optimized for HDD)
BboltBatchMaxEvents int // ORLY_BBOLT_BATCH_MAX_EVENTS - max events per batch (default: 5000)
BboltBatchMaxBytes int64 // ORLY_BBOLT_BATCH_MAX_MB * 1024 * 1024 (default: 128MB)
BboltFlushTimeout time.Duration // ORLY_BBOLT_BATCH_FLUSH_SEC * time.Second (default: 30s)
BboltBloomSizeMB int // ORLY_BBOLT_BLOOM_SIZE_MB - bloom filter size (default: 16MB)
BboltNoSync bool // ORLY_BBOLT_NO_SYNC - disable fsync (DANGEROUS)
BboltMmapSize int // ORLY_BBOLT_MMAP_SIZE_GB * 1024 * 1024 * 1024 (default: 8GB)
}
// NewDatabase creates a database instance based on the specified type.
@@ -84,8 +92,14 @@ func NewDatabaseWithConfig(
return nil, fmt.Errorf("wasmdb database backend not available (import _ \"next.orly.dev/pkg/wasmdb\")")
}
return newWasmDBDatabase(ctx, cancel, cfg)
case "bbolt", "bolt":
// Use the bbolt implementation (B+tree, optimized for HDD)
if newBboltDatabase == nil {
return nil, fmt.Errorf("bbolt database backend not available (import _ \"next.orly.dev/pkg/bbolt\")")
}
return newBboltDatabase(ctx, cancel, cfg)
default:
return nil, fmt.Errorf("unsupported database type: %s (supported: badger, neo4j, wasmdb)", dbType)
return nil, fmt.Errorf("unsupported database type: %s (supported: badger, neo4j, wasmdb, bbolt)", dbType)
}
}
@@ -108,3 +122,13 @@ var newWasmDBDatabase func(context.Context, context.CancelFunc, *DatabaseConfig)
func RegisterWasmDBFactory(factory func(context.Context, context.CancelFunc, *DatabaseConfig) (Database, error)) {
newWasmDBDatabase = factory
}
// newBboltDatabase creates a bbolt database instance
// This is defined here to avoid import cycles
var newBboltDatabase func(context.Context, context.CancelFunc, *DatabaseConfig) (Database, error)
// RegisterBboltFactory registers the bbolt database factory
// This is called from the bbolt package's init() function
func RegisterBboltFactory(factory func(context.Context, context.CancelFunc, *DatabaseConfig) (Database, error)) {
newBboltDatabase = factory
}


@@ -44,9 +44,12 @@ func (d *D) FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error) {
// Check if this is compact format
if len(eventData) > 0 && eventData[0] == CompactFormatVersion {
eventId, idErr := d.GetEventIdBySerial(ser)
if idErr == nil {
return UnmarshalCompactEvent(eventData, eventId, resolver)
if idErr != nil {
// Cannot decode compact format without event ID - return error
// DO NOT fall back to legacy unmarshal as compact format is not valid legacy format
return nil, fmt.Errorf("compact format inline but no event ID mapping for serial %d: %w", ser.Get(), idErr)
}
return UnmarshalCompactEvent(eventData, eventId, resolver)
}
// Legacy binary format
@@ -106,10 +109,14 @@ func (d *D) FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error) {
// Check if this is compact format
if len(v) > 0 && v[0] == CompactFormatVersion {
eventId, idErr := d.GetEventIdBySerial(ser)
if idErr == nil {
ev, err = UnmarshalCompactEvent(v, eventId, resolver)
if idErr != nil {
// Cannot decode compact format without event ID - return error
// DO NOT fall back to legacy unmarshal as compact format is not valid legacy format
err = fmt.Errorf("compact format evt but no event ID mapping for serial %d: %w", ser.Get(), idErr)
return
}
ev, err = UnmarshalCompactEvent(v, eventId, resolver)
return
}
// Check if we have valid data before attempting to unmarshal


@@ -149,12 +149,10 @@ func (d *D) fetchSmallEvent(txn *badger.Txn, ser *types.Uint40) (ev *event.E, er
resolver := NewDatabaseSerialResolver(d, d.serialCache)
eventId, idErr := d.GetEventIdBySerial(ser)
if idErr != nil {
// Fall back to legacy unmarshal
ev = new(event.E)
if err = ev.UnmarshalBinary(bytes.NewBuffer(eventData)); err != nil {
return nil, err
}
return ev, nil
// Cannot decode compact format without event ID - return error
// DO NOT fall back to legacy unmarshal as compact format is not valid legacy format
log.W.F("fetchSmallEvent: compact format but no event ID mapping for serial %d: %v", ser.Get(), idErr)
return nil, idErr
}
return UnmarshalCompactEvent(eventData, eventId, resolver)
}
@@ -196,12 +194,10 @@ func (d *D) fetchLegacyEvent(txn *badger.Txn, ser *types.Uint40) (ev *event.E, e
resolver := NewDatabaseSerialResolver(d, d.serialCache)
eventId, idErr := d.GetEventIdBySerial(ser)
if idErr != nil {
// Fall back to legacy unmarshal
ev = new(event.E)
if err = ev.UnmarshalBinary(bytes.NewBuffer(v)); err != nil {
return nil, err
}
return ev, nil
// Cannot decode compact format without event ID - return error
// DO NOT fall back to legacy unmarshal as compact format is not valid legacy format
log.W.F("fetchLegacyEvent: compact format but no event ID mapping for serial %d: %v", ser.Get(), idErr)
return nil, idErr
}
return UnmarshalCompactEvent(v, eventId, resolver)
}


@@ -251,7 +251,11 @@ func (d *D) GetEventIdBySerial(ser *types.Uint40) (eventId []byte, err error) {
}
return item.Value(func(val []byte) error {
eventId = make([]byte, len(val))
// Validate that the stored value is exactly 32 bytes
if len(val) != 32 {
return errors.New("corrupted event ID: expected 32 bytes")
}
eventId = make([]byte, 32)
copy(eventId, val)
return nil
})


@@ -1 +1 @@
v0.47.0
v0.48.8