Compare commits

..

2 Commits

Author SHA1 Message Date
woikos
0dac41e35e Add documentation and improve BBolt import memory efficiency (v0.48.8)
Some checks failed
Go / build-and-release (push) Has been cancelled
- Add README.md table of contents for easier navigation
- Add Curation ACL documentation section to README.md
- Create detailed Curation Mode Guide (docs/CURATION_MODE_GUIDE.md)
- Fix OOM during BBolt index building by closing temp file before build
- Add GC calls before index building to reclaim batch buffer memory
- Improve import-export.go with processJSONLEventsReturningCount
- Add policy-aware import path for sync operations

Files modified:
- README.md: Added TOC and curation ACL documentation
- docs/CURATION_MODE_GUIDE.md: New comprehensive curation mode guide
- pkg/bbolt/import-export.go: Memory-safe import with deferred cleanup
- pkg/bbolt/import-minimal.go: Added GC before index build
- pkg/version/version: Bump to v0.48.8

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-06 15:37:06 +01:00
woikos
2480be3a73 Fix OOM in BuildIndexes by processing in chunks (v0.48.6)
- Process events in 200k chunks instead of loading all at once
- Write indexes to disk after each chunk, then free memory
- Call debug.FreeOSMemory() between chunks to release memory to OS
- Memory usage now ~150-200MB per chunk instead of 5GB+

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-06 09:10:50 +01:00
7 changed files with 940 additions and 16 deletions

View File

@@ -12,6 +12,36 @@ zap me: <20>mlekudev@getalby.com
follow me on [nostr](https://jumble.social/users/npub1fjqqy4a93z5zsjwsfxqhc2764kvykfdyttvldkkkdera8dr78vhsmmleku)
## Table of Contents
- [Bug Reports & Feature Requests](#%EF%B8%8F-bug-reports--feature-requests)
- [System Requirements](#%EF%B8%8F-system-requirements)
- [About](#about)
- [Performance & Cryptography](#performance--cryptography)
- [Building](#building)
- [Prerequisites](#prerequisites)
- [Basic Build](#basic-build)
- [Building with Web UI](#building-with-web-ui)
- [Core Features](#core-features)
- [Web UI](#web-ui)
- [Sprocket Event Processing](#sprocket-event-processing)
- [Policy System](#policy-system)
- [Deployment](#deployment)
- [Automated Deployment](#automated-deployment)
- [TLS Configuration](#tls-configuration)
- [systemd Service Management](#systemd-service-management)
- [Remote Deployment](#remote-deployment)
- [Configuration](#configuration)
- [Firewall Configuration](#firewall-configuration)
- [Monitoring](#monitoring)
- [Testing](#testing)
- [Command-Line Tools](#command-line-tools)
- [Access Control](#access-control)
- [Follows ACL](#follows-acl)
- [Curation ACL](#curation-acl)
- [Cluster Replication](#cluster-replication)
- [Developer Notes](#developer-notes)
## ⚠️ Bug Reports & Feature Requests
**Bug reports and feature requests that do not follow the protocol will not be accepted.**
@@ -566,9 +596,22 @@ go run ./cmd/subscription-test-simple -url ws://localhost:3334 -duration 120
## Access Control
ORLY provides four ACL (Access Control List) modes to control who can publish events to your relay:
| Mode | Description | Best For |
|------|-------------|----------|
| `none` | Open relay, anyone can write | Public relays |
| `follows` | Write access based on admin follow lists | Personal/community relays |
| `managed` | Explicit allow/deny lists via NIP-86 API | Private relays |
| `curating` | Three-tier classification with rate limiting | Curated community relays |
```bash
export ORLY_ACL_MODE=follows # or: none, managed, curating
```
### Follows ACL
The follows ACL (Access Control List) system provides flexible relay access control based on social relationships in the Nostr network.
The follows ACL system provides flexible relay access control based on social relationships in the Nostr network.
```bash
export ORLY_ACL_MODE=follows
@@ -578,6 +621,30 @@ export ORLY_ADMINS=npub1fjqqy4a93z5zsjwsfxqhc2764kvykfdyttvldkkkdera8dr78vhsmmle
The system grants write access to users followed by designated admins, with read-only access for others. Follow lists update dynamically as admins modify their relationships.
### Curation ACL
The curation ACL mode provides sophisticated content curation with a three-tier publisher classification system:
- **Trusted**: Unlimited publishing, bypass rate limits
- **Blacklisted**: Blocked from publishing, invisible to regular users
- **Unclassified**: Rate-limited publishing (default 50 events/day)
Key features:
- **Kind whitelisting**: Only allow specific event kinds (e.g., social, DMs, longform)
- **IP-based flood protection**: Auto-ban IPs that exceed rate limits
- **Spam flagging**: Mark events as spam without deleting
- **Web UI management**: Configure via the built-in curation interface
```bash
export ORLY_ACL_MODE=curating
export ORLY_OWNERS=npub1your_owner_key
./orly
```
After starting, publish a configuration event (kind 30078) to enable the relay. The web UI at `/#curation` provides a complete management interface.
For detailed configuration and API documentation, see the [Curation Mode Guide](docs/CURATION_MODE_GUIDE.md).
### Cluster Replication
ORLY supports distributed relay clusters using active replication. When configured with peer relays, ORLY will automatically synchronize events between cluster members using efficient HTTP polling.

411
docs/CURATION_MODE_GUIDE.md Normal file
View File

@@ -0,0 +1,411 @@
# Curation Mode Guide
Curation mode is a sophisticated access control system for Nostr relays that provides three-tier publisher classification, rate limiting, IP-based flood protection, and event kind whitelisting.
## Overview
Unlike simple allow/deny lists, curation mode classifies publishers into three tiers:
| Tier | Rate Limited | Daily Limit | Visibility |
|------|--------------|-------------|------------|
| **Trusted** | No | Unlimited | Full |
| **Blacklisted** | N/A (blocked) | 0 | Hidden from regular users |
| **Unclassified** | Yes | 50 events/day (default) | Full |
This allows relay operators to:
- Reward quality contributors with unlimited publishing
- Block bad actors while preserving their events for admin review
- Allow new users to participate with reasonable rate limits
- Prevent spam floods through automatic IP-based protections
## Quick Start
### 1. Start the Relay
```bash
export ORLY_ACL_MODE=curating
export ORLY_OWNERS=npub1your_owner_pubkey
./orly
```
### 2. Publish Configuration
The relay will not accept events until you publish a configuration event. Use the web UI at `http://your-relay/#curation` or publish a kind 30078 event:
```json
{
"kind": 30078,
"tags": [["d", "curating-config"]],
"content": "{\"dailyLimit\":50,\"ipDailyLimit\":500,\"firstBanHours\":1,\"secondBanHours\":168,\"kindCategories\":[\"social\"]}"
}
```
### 3. Manage Publishers
Use the web UI or NIP-86 API to:
- Trust quality publishers
- Blacklist spammers
- Review unclassified users by activity
- Unblock IPs if needed
## Configuration
### Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `ORLY_ACL_MODE` | `none` | Set to `curating` to enable |
| `ORLY_OWNERS` | | Owner pubkeys (can configure relay) |
| `ORLY_ADMINS` | | Admin pubkeys (can manage publishers) |
### Configuration Event (Kind 30078)
Configuration is stored as a replaceable Nostr event (kind 30078) with d-tag `curating-config`. Only owners and admins can publish configuration.
```typescript
interface CuratingConfig {
// Rate Limiting
dailyLimit: number; // Max events/day for unclassified users (default: 50)
ipDailyLimit: number; // Max events/day from single IP (default: 500)
// IP Ban Durations
firstBanHours: number; // First offense ban duration (default: 1 hour)
secondBanHours: number; // Subsequent offense ban duration (default: 168 hours / 1 week)
// Kind Filtering (choose one or combine)
allowedKinds: number[]; // Explicit kind numbers: [0, 1, 3, 7]
allowedRanges: string[]; // Kind ranges: ["1000-1999", "30000-39999"]
kindCategories: string[]; // Pre-defined categories: ["social", "dm"]
}
```
### Event Kind Categories
Pre-defined categories for convenient kind whitelisting:
| Category | Kinds | Description |
|----------|-------|-------------|
| `social` | 0, 1, 3, 6, 7, 10002 | Profiles, notes, contacts, reposts, reactions |
| `dm` | 4, 14, 1059 | Direct messages (NIP-04, NIP-17, gift wraps) |
| `longform` | 30023, 30024 | Long-form articles and drafts |
| `media` | 1063, 20, 21, 22 | File metadata, picture/video/audio events |
| `marketplace` | 30017-30020, 1021, 1022 | Products, stalls, auctions, bids |
| `groups_nip29` | 9-12, 9000-9002, 39000-39002 | NIP-29 relay-based groups |
| `groups_nip72` | 34550, 1111, 4550 | NIP-72 moderated communities |
| `lists` | 10000, 10001, 10003, 30000, 30001, 30003 | Mute, pin, bookmark lists |
Example configuration allowing social interactions and DMs:
```json
{
"kindCategories": ["social", "dm"],
"dailyLimit": 100,
"ipDailyLimit": 1000
}
```
## Three-Tier Classification
### Trusted Publishers
Trusted publishers have unlimited publishing rights:
- Bypass all rate limiting
- Can publish any allowed kind
- Events always visible to all users
**Use case**: Known quality contributors, verified community members, partner relays.
### Blacklisted Publishers
Blacklisted publishers are blocked from publishing:
- All events rejected with `"pubkey is blacklisted"` error
- Existing events become invisible to regular users
- Admins and owners can still see blacklisted events (for review)
**Use case**: Spammers, abusive users, bad actors.
### Unclassified Publishers
Everyone else falls into the unclassified tier:
- Subject to daily event limit (default: 50 events/day)
- Subject to IP-based flood protection
- Events visible to all users
- Can be promoted to trusted or demoted to blacklisted
**Use case**: New users, general public.
## Rate Limiting & Flood Protection
### Per-Pubkey Limits
Unclassified publishers are limited to a configurable number of events per day (default: 50). The count resets at midnight UTC.
When a user exceeds their limit:
1. Event is rejected with `"daily event limit exceeded"` error
2. Their IP is flagged for potential abuse
### Per-IP Limits
To prevent Sybil attacks (creating many pubkeys from one IP), there's also an IP-based daily limit (default: 500 events).
When an IP exceeds its limit:
1. All events from that IP are rejected
2. The IP is temporarily banned
### Automatic IP Banning
When rate limits are exceeded:
| Offense | Ban Duration | Description |
|---------|--------------|-------------|
| First | 1 hour | Quick timeout for accidental over-posting |
| Second+ | 1 week | Extended ban for repeated abuse |
Ban durations are configurable via `firstBanHours` and `secondBanHours`.
### Offense Tracking
The system tracks which pubkeys triggered rate limits from each IP:
```
IP 192.168.1.100:
- npub1abc... exceeded limit at 2024-01-15 10:30:00
- npub1xyz... exceeded limit at 2024-01-15 10:45:00
Offense count: 2
Status: Banned until 2024-01-22 10:45:00
```
This helps identify coordinated spam attacks.
## Spam Flagging
Events can be flagged as spam without deletion:
- Flagged events are hidden from regular users
- Admins can review flagged events
- Events can be unflagged if incorrectly marked
- Original event data is preserved
This is useful for:
- Moderation review queues
- Training spam detection systems
- Preserving evidence of abuse
## NIP-86 Management API
All management operations use NIP-98 HTTP authentication.
### Trust Management
```bash
# Trust a pubkey
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"trustpubkey","params":["<pubkey_hex>"]}'
# Untrust a pubkey
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"untrustpubkey","params":["<pubkey_hex>"]}'
# List trusted pubkeys
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"listtrustedpubkeys","params":[]}'
```
### Blacklist Management
```bash
# Blacklist a pubkey
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"blacklistpubkey","params":["<pubkey_hex>"]}'
# Remove from blacklist
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"unblacklistpubkey","params":["<pubkey_hex>"]}'
# List blacklisted pubkeys
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"listblacklistedpubkeys","params":[]}'
```
### Unclassified User Management
```bash
# List unclassified users sorted by event count
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"listunclassifiedusers","params":[]}'
```
Response includes pubkey, event count, and last activity for each user.
### Spam Management
```bash
# Mark event as spam
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"markspam","params":["<event_id_hex>"]}'
# Unmark spam
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"unmarkspam","params":["<event_id_hex>"]}'
# List spam events
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"listspamevents","params":[]}'
```
### IP Block Management
```bash
# List blocked IPs
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"listblockedips","params":[]}'
# Unblock an IP
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"unblockip","params":["<ip_address>"]}'
```
### Configuration Management
```bash
# Get current configuration
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"getcuratingconfig","params":[]}'
# Set allowed kind categories
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"setallowedkindcategories","params":[["social","dm","longform"]]}'
# Get allowed kind categories
curl -X POST https://relay.example.com \
-H "Authorization: Nostr <nip98_token>" \
-d '{"method":"getallowedkindcategories","params":[]}'
```
## Web UI
The curation web UI is available at `/#curation` and provides:
- **Configuration Panel**: Set rate limits, ban durations, and allowed kinds
- **Publisher Management**: Trust/blacklist pubkeys with one click
- **Unclassified Users**: View users sorted by activity, promote or blacklist
- **IP Blocks**: View and unblock banned IP addresses
- **Spam Queue**: Review flagged events, confirm or unflag
## Database Storage
Curation data is stored in the relay database with the following key prefixes:
| Prefix | Purpose |
|--------|---------|
| `CURATING_ACL_CONFIG` | Current configuration |
| `CURATING_ACL_TRUSTED_PUBKEY_{pubkey}` | Trusted publisher list |
| `CURATING_ACL_BLACKLISTED_PUBKEY_{pubkey}` | Blacklisted publisher list |
| `CURATING_ACL_EVENT_COUNT_{pubkey}_{date}` | Daily event counts per pubkey |
| `CURATING_ACL_IP_EVENT_COUNT_{ip}_{date}` | Daily event counts per IP |
| `CURATING_ACL_IP_OFFENSE_{ip}` | Offense tracking per IP |
| `CURATING_ACL_BLOCKED_IP_{ip}` | Active IP blocks |
| `CURATING_ACL_SPAM_EVENT_{eventID}` | Spam-flagged events |
## Caching
For performance, the following data is cached in memory:
- Trusted pubkey set
- Blacklisted pubkey set
- Allowed kinds set
- Current configuration
Caches are refreshed every hour by the background cleanup goroutine.
## Background Maintenance
A background goroutine runs hourly to:
1. Remove expired IP blocks
2. Clean up old event count entries (older than 2 days)
3. Refresh in-memory caches
4. Log maintenance statistics
## Best Practices
### Starting a New Curated Relay
1. Start with permissive settings:
```json
{"dailyLimit": 100, "ipDailyLimit": 1000, "kindCategories": ["social"]}
```
2. Monitor unclassified users for a few days
3. Trust active, quality contributors
4. Blacklist obvious spammers
5. Adjust rate limits based on observed patterns
### Handling Spam Waves
During spam attacks:
1. The IP-based flood protection will auto-ban attack sources
2. Review blocked IPs via web UI or API
3. Blacklist any pubkeys that got through
4. Consider temporarily lowering `ipDailyLimit`
### Recovering from Mistakes
- **Accidentally blacklisted someone**: Use `unblacklistpubkey` - their events become visible again
- **Wrongly flagged spam**: Use `unmarkspam` - event becomes visible again
- **Blocked legitimate IP**: Use `unblockip` - IP can publish again immediately
## Comparison with Other ACL Modes
| Feature | None | Follows | Managed | Curating |
|---------|------|---------|---------|----------|
| Default Access | Write | Write if followed | Explicit allow | Rate-limited |
| Rate Limiting | No | No | No | Yes |
| Kind Filtering | No | No | Optional | Yes |
| IP Protection | No | No | No | Yes |
| Spam Flagging | No | No | No | Yes |
| Configuration | Env vars | Follow lists | NIP-86 | Kind 30078 events |
| Web UI | Basic | Basic | Basic | Full curation panel |
## Troubleshooting
### "Relay not accepting events"
The relay requires a configuration event before accepting any events. Publish a kind 30078 event with d-tag `curating-config`.
### "daily event limit exceeded"
The user has exceeded their daily limit. Options:
1. Wait until midnight UTC for reset
2. Trust the pubkey if they're a quality contributor
3. Increase `dailyLimit` in configuration
### "pubkey is blacklisted"
The pubkey is on the blacklist. Use `unblacklistpubkey` if this was a mistake.
### "IP is blocked"
The IP has been auto-banned due to rate limit violations. Use `unblockip` if legitimate, or wait for the ban to expire.
### Events disappearing for users
Check if the event author has been blacklisted. Blacklisted authors' events are hidden from regular users but visible to admins.

View File

@@ -31,9 +31,10 @@ func (b *B) ImportEventsFromReader(ctx context.Context, rr io.Reader) error {
if chk.E(err) {
return err
}
defer os.Remove(tmp.Name()) // Clean up temp file when done
tmpName := tmp.Name()
defer os.Remove(tmpName) // Clean up temp file when done
log.I.F("bbolt import: buffering upload to %s", tmp.Name())
log.I.F("bbolt import: buffering upload to %s", tmpName)
bufferStart := time.Now()
bytesBuffered, err := io.Copy(tmp, rr)
if chk.E(err) {
@@ -48,12 +49,30 @@ func (b *B) ImportEventsFromReader(ctx context.Context, rr io.Reader) error {
return err
}
processErr := b.processJSONLEvents(ctx, tmp)
count, processErr := b.processJSONLEventsReturningCount(ctx, tmp)
// Close temp file to release resources before index building
tmp.Close()
if processErr != nil {
return processErr
}
// Build indexes after events are stored (minimal import mode)
if count > 0 {
// Force garbage collection to reclaim memory before index building
debug.FreeOSMemory()
log.I.F("bbolt import: building indexes for %d events...", count)
if err := b.BuildIndexes(ctx); err != nil {
log.E.F("bbolt import: failed to build indexes: %v", err)
return err
}
}
totalElapsed := time.Since(startTime)
log.I.F("bbolt import: total operation time: %v", totalElapsed.Round(time.Millisecond))
return processErr
return nil
}
// ImportEventsFromStrings imports events from a slice of JSON strings with policy filtering
@@ -67,7 +86,95 @@ func (b *B) ImportEventsFromStrings(ctx context.Context, eventJSONs []string, po
// processJSONLEvents processes JSONL events from a reader
func (b *B) processJSONLEvents(ctx context.Context, rr io.Reader) error {
return b.processJSONLEventsWithPolicy(ctx, rr, nil)
_, err := b.processJSONLEventsReturningCount(ctx, rr)
return err
}
// processJSONLEventsReturningCount processes JSONL events and returns the count saved
// This is used by ImportEventsFromReader for migration mode (minimal import without inline indexes)
func (b *B) processJSONLEventsReturningCount(ctx context.Context, rr io.Reader) (int, error) {
// Create a scanner to read the buffer line by line
scan := bufio.NewScanner(rr)
scanBuf := make([]byte, maxLen)
scan.Buffer(scanBuf, maxLen)
// Performance tracking
startTime := time.Now()
lastLogTime := startTime
const logInterval = 5 * time.Second
var count, total, skipped, unmarshalErrors, saveErrors int
for scan.Scan() {
select {
case <-ctx.Done():
log.I.F("bbolt import: context closed after %d events", count)
return count, ctx.Err()
default:
}
line := scan.Bytes()
total += len(line) + 1
if len(line) < 1 {
skipped++
continue
}
ev := event.New()
if _, err := ev.Unmarshal(line); err != nil {
ev.Free()
unmarshalErrors++
log.W.F("bbolt import: failed to unmarshal event: %v", err)
continue
}
// Minimal path for migration: store events only, indexes built later
if err := b.SaveEventMinimal(ev); err != nil {
ev.Free()
saveErrors++
log.W.F("bbolt import: failed to save event: %v", err)
continue
}
ev.Free()
line = nil
count++
// Progress logging every logInterval
if time.Since(lastLogTime) >= logInterval {
elapsed := time.Since(startTime)
eventsPerSec := float64(count) / elapsed.Seconds()
mbPerSec := float64(total) / elapsed.Seconds() / 1024 / 1024
log.I.F("bbolt import: progress %d events saved, %.2f MB read, %.0f events/sec, %.2f MB/sec",
count, float64(total)/1024/1024, eventsPerSec, mbPerSec)
lastLogTime = time.Now()
debug.FreeOSMemory()
}
}
// Flush any remaining batched events
if b.batcher != nil {
b.batcher.Flush()
}
// Final summary
elapsed := time.Since(startTime)
eventsPerSec := float64(count) / elapsed.Seconds()
mbPerSec := float64(total) / elapsed.Seconds() / 1024 / 1024
log.I.F("bbolt import: completed - %d events saved, %.2f MB in %v (%.0f events/sec, %.2f MB/sec)",
count, float64(total)/1024/1024, elapsed.Round(time.Millisecond), eventsPerSec, mbPerSec)
if unmarshalErrors > 0 || saveErrors > 0 || skipped > 0 {
log.I.F("bbolt import: stats - %d unmarshal errors, %d save errors, %d skipped empty lines",
unmarshalErrors, saveErrors, skipped)
}
if err := scan.Err(); err != nil {
return count, err
}
// Clear scanner buffer to help GC
scanBuf = nil
return count, nil
}
// processJSONLEventsWithPolicy processes JSONL events from a reader with optional policy filtering
@@ -127,14 +234,21 @@ func (b *B) processJSONLEventsWithPolicy(ctx context.Context, rr io.Reader, poli
continue
}
log.D.F("bbolt import: policy allowed event %x during sync import", ev.ID)
}
if _, err := b.SaveEvent(ctx, ev); err != nil {
// return the pooled buffer on error paths too
ev.Free()
saveErrors++
log.W.F("bbolt import: failed to save event: %v", err)
continue
// With policy checking, use regular SaveEvent path
if _, err := b.SaveEvent(ctx, ev); err != nil {
ev.Free()
saveErrors++
log.W.F("bbolt import: failed to save event: %v", err)
continue
}
} else {
// Minimal path for migration: store events only, build indexes later
if err := b.SaveEventMinimal(ev); err != nil {
ev.Free()
saveErrors++
log.W.F("bbolt import: failed to save event: %v", err)
continue
}
}
// return the pooled buffer after successful save

232
pkg/bbolt/import-minimal.go Normal file
View File

@@ -0,0 +1,232 @@
//go:build !(js && wasm)
package bbolt
import (
"bytes"
"context"
"errors"
"runtime/debug"
"sort"
"time"
bolt "go.etcd.io/bbolt"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/database/bufpool"
"git.mleku.dev/mleku/nostr/encoders/event"
)
// SaveEventMinimal stores only the essential event data for fast bulk import.
// It skips all indexes - call BuildIndexes after import completes.
func (b *B) SaveEventMinimal(ev *event.E) error {
if ev == nil {
return errors.New("nil event")
}
// Reject ephemeral events
if ev.Kind >= 20000 && ev.Kind <= 29999 {
return nil
}
// Get the next serial number
serial := b.getNextEventSerial()
// Serialize event in raw binary format (not compact - preserves full pubkey)
// This allows index building to work without pubkey serial resolution
legacyBuf := bufpool.GetMedium()
defer bufpool.PutMedium(legacyBuf)
ev.MarshalBinary(legacyBuf)
eventData := bufpool.CopyBytes(legacyBuf)
// Create minimal batch - only event data and ID mappings
batch := &EventBatch{
Serial: serial,
EventData: eventData,
Indexes: []BatchedWrite{
// Event ID -> Serial (for lookups)
{BucketName: bucketEid, Key: ev.ID[:], Value: makeSerialKey(serial)},
// Serial -> Event ID (for reverse lookups)
{BucketName: bucketSei, Key: makeSerialKey(serial), Value: ev.ID[:]},
},
}
return b.batcher.Add(batch)
}
// BuildIndexes builds all query indexes from stored events.
// Call this after importing events with SaveEventMinimal.
// Processes events in chunks to avoid OOM on large databases.
func (b *B) BuildIndexes(ctx context.Context) error {
log.I.F("bbolt: starting index build...")
startTime := time.Now()
// Force GC before starting to reclaim batch buffer memory
debug.FreeOSMemory()
// Process in small chunks to avoid OOM on memory-constrained systems
// With ~15 indexes per event and ~50 bytes per key, 50k events = ~37.5MB per chunk
const chunkSize = 50000
var totalEvents int
var lastSerial uint64 = 0
var lastLogTime = time.Now()
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Collect indexes for this chunk
indexesByBucket := make(map[string][][]byte)
var chunkEvents int
var chunkSerial uint64
// Read a chunk of events
err := b.db.View(func(tx *bolt.Tx) error {
cmpBucket := tx.Bucket(bucketCmp)
if cmpBucket == nil {
return errors.New("cmp bucket not found")
}
cursor := cmpBucket.Cursor()
// Seek to start position
var k, v []byte
if lastSerial == 0 {
k, v = cursor.First()
} else {
// Seek past the last processed serial
seekKey := makeSerialKey(lastSerial + 1)
k, v = cursor.Seek(seekKey)
}
for ; k != nil && chunkEvents < chunkSize; k, v = cursor.Next() {
serial := decodeSerialKey(k)
chunkSerial = serial
// Decode event from raw binary format
ev := event.New()
if err := ev.UnmarshalBinary(bytes.NewBuffer(v)); err != nil {
log.W.F("bbolt: failed to unmarshal event at serial %d: %v", serial, err)
continue
}
// Generate indexes for this event
rawIdxs, err := database.GetIndexesForEvent(ev, serial)
if chk.E(err) {
ev.Free()
continue
}
// Group by bucket (first 3 bytes)
for _, idx := range rawIdxs {
if len(idx) < 3 {
continue
}
bucketName := string(idx[:3])
key := idx[3:]
// Skip eid and sei - already stored during import
if bucketName == "eid" || bucketName == "sei" {
continue
}
// Make a copy of the key
keyCopy := make([]byte, len(key))
copy(keyCopy, key)
indexesByBucket[bucketName] = append(indexesByBucket[bucketName], keyCopy)
}
ev.Free()
chunkEvents++
}
return nil
})
if err != nil {
return err
}
// No more events to process
if chunkEvents == 0 {
break
}
totalEvents += chunkEvents
lastSerial = chunkSerial
// Progress logging
if time.Since(lastLogTime) >= 5*time.Second {
log.I.F("bbolt: index build progress: %d events processed", totalEvents)
lastLogTime = time.Now()
}
// Count total keys in this chunk
var totalKeys int
for _, keys := range indexesByBucket {
totalKeys += len(keys)
}
log.I.F("bbolt: writing %d index keys for chunk (%d events)", totalKeys, chunkEvents)
// Write this chunk's indexes
for bucketName, keys := range indexesByBucket {
if len(keys) == 0 {
continue
}
bucketBytes := []byte(bucketName)
// Sort keys for this bucket before writing
sort.Slice(keys, func(i, j int) bool {
return bytes.Compare(keys[i], keys[j]) < 0
})
// Write in batches
const batchSize = 50000
for i := 0; i < len(keys); i += batchSize {
end := i + batchSize
if end > len(keys) {
end = len(keys)
}
batch := keys[i:end]
err := b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketBytes)
if bucket == nil {
return nil
}
for _, key := range batch {
if err := bucket.Put(key, nil); err != nil {
return err
}
}
return nil
})
if err != nil {
log.E.F("bbolt: failed to write batch for bucket %s: %v", bucketName, err)
return err
}
}
}
// Clear for next chunk and release memory
indexesByBucket = nil
debug.FreeOSMemory()
}
elapsed := time.Since(startTime)
log.I.F("bbolt: index build complete in %v (%d events)", elapsed.Round(time.Second), totalEvents)
return nil
}
// decodeSerialKey decodes a 5-byte serial key to uint64
func decodeSerialKey(b []byte) uint64 {
if len(b) < 5 {
return 0
}
return uint64(b[0])<<32 | uint64(b[1])<<24 | uint64(b[2])<<16 | uint64(b[3])<<8 | uint64(b[4])
}

View File

@@ -0,0 +1,96 @@
//go:build !(js && wasm)
package bbolt
import (
"errors"
"lol.mleku.dev/chk"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/database/bufpool"
"git.mleku.dev/mleku/nostr/encoders/event"
)
// SaveEventForImport saves an event optimized for bulk import.
// It skips duplicate checking, deletion checking, and graph vertex creation
// to maximize import throughput. Use only for trusted data migration.
func (b *B) SaveEventForImport(ev *event.E) error {
if ev == nil {
return errors.New("nil event")
}
// Reject ephemeral events (kinds 20000-29999)
if ev.Kind >= 20000 && ev.Kind <= 29999 {
return nil // silently skip
}
// Get the next serial number
serial := b.getNextEventSerial()
// Generate all indexes using the shared function
rawIdxs, err := database.GetIndexesForEvent(ev, serial)
if chk.E(err) {
return err
}
// Convert raw indexes to BatchedWrites, stripping the 3-byte prefix
batch := &EventBatch{
Serial: serial,
Indexes: make([]BatchedWrite, 0, len(rawIdxs)+1),
}
for _, idx := range rawIdxs {
if len(idx) < 3 {
continue
}
bucketName := idx[:3]
key := idx[3:]
batch.Indexes = append(batch.Indexes, BatchedWrite{
BucketName: bucketName,
Key: key,
Value: nil,
})
}
// Serialize event in compact format (without graph references for import)
resolver := &nullSerialResolver{}
compactData, compactErr := database.MarshalCompactEvent(ev, resolver)
if compactErr != nil {
// Fall back to legacy format
legacyBuf := bufpool.GetMedium()
defer bufpool.PutMedium(legacyBuf)
ev.MarshalBinary(legacyBuf)
compactData = bufpool.CopyBytes(legacyBuf)
}
batch.EventData = compactData
// Store serial -> event ID mapping
batch.Indexes = append(batch.Indexes, BatchedWrite{
BucketName: bucketSei,
Key: makeSerialKey(serial),
Value: ev.ID[:],
})
// Add to batcher (no graph vertex, no pubkey lookups)
return b.batcher.Add(batch)
}
// nullSerialResolver returns 0 for all lookups, used for fast import
// where we don't need pubkey/event serial references in compact format
type nullSerialResolver struct{}
func (r *nullSerialResolver) GetOrCreatePubkeySerial(pubkey []byte) (uint64, error) {
return 0, nil
}
func (r *nullSerialResolver) GetPubkeyBySerial(serial uint64) ([]byte, error) {
return nil, nil
}
func (r *nullSerialResolver) GetEventSerialById(eventID []byte) (uint64, bool, error) {
return 0, false, nil
}
func (r *nullSerialResolver) GetEventIdBySerial(serial uint64) ([]byte, error) {
return nil, nil
}

View File

@@ -84,8 +84,12 @@ func (b *B) DeleteExpired() {
}
// ProcessDelete processes a deletion event.
// For migration from other backends, deletions have already been processed,
// so this is a no-op. Full implementation needed for production use.
func (b *B) ProcessDelete(ev *event.E, admins [][]byte) error {
return errNotImplemented
// TODO: Implement full deletion processing for production use
// For now, just return nil to allow migrations to proceed
return nil
}
// CheckForDeleted checks if an event has been deleted.

View File

@@ -1 +1 @@
v0.48.1
v0.48.8