Compare commits: 2 commits (0dac41e35e → 2480be3a73)

README.md (69 lines changed)
@@ -12,6 +12,36 @@ zap me: mlekudev@getalby.com

follow me on [nostr](https://jumble.social/users/npub1fjqqy4a93z5zsjwsfxqhc2764kvykfdyttvldkkkdera8dr78vhsmmleku)

## Table of Contents

- [Bug Reports & Feature Requests](#%EF%B8%8F-bug-reports--feature-requests)
- [System Requirements](#%EF%B8%8F-system-requirements)
- [About](#about)
- [Performance & Cryptography](#performance--cryptography)
- [Building](#building)
  - [Prerequisites](#prerequisites)
  - [Basic Build](#basic-build)
  - [Building with Web UI](#building-with-web-ui)
- [Core Features](#core-features)
  - [Web UI](#web-ui)
  - [Sprocket Event Processing](#sprocket-event-processing)
  - [Policy System](#policy-system)
- [Deployment](#deployment)
  - [Automated Deployment](#automated-deployment)
  - [TLS Configuration](#tls-configuration)
  - [systemd Service Management](#systemd-service-management)
  - [Remote Deployment](#remote-deployment)
  - [Configuration](#configuration)
  - [Firewall Configuration](#firewall-configuration)
  - [Monitoring](#monitoring)
- [Testing](#testing)
- [Command-Line Tools](#command-line-tools)
- [Access Control](#access-control)
  - [Follows ACL](#follows-acl)
  - [Curation ACL](#curation-acl)
  - [Cluster Replication](#cluster-replication)
- [Developer Notes](#developer-notes)

## ⚠️ Bug Reports & Feature Requests

**Bug reports and feature requests that do not follow the protocol will not be accepted.**

@@ -566,9 +596,22 @@ go run ./cmd/subscription-test-simple -url ws://localhost:3334 -duration 120

## Access Control

ORLY provides four ACL (Access Control List) modes to control who can publish events to your relay:

| Mode | Description | Best For |
|------|-------------|----------|
| `none` | Open relay, anyone can write | Public relays |
| `follows` | Write access based on admin follow lists | Personal/community relays |
| `managed` | Explicit allow/deny lists via NIP-86 API | Private relays |
| `curating` | Three-tier classification with rate limiting | Curated community relays |

```bash
export ORLY_ACL_MODE=follows # or: none, managed, curating
```

### Follows ACL

-The follows ACL (Access Control List) system provides flexible relay access control based on social relationships in the Nostr network.
+The follows ACL system provides flexible relay access control based on social relationships in the Nostr network.

```bash
export ORLY_ACL_MODE=follows
```

@@ -578,6 +621,30 @@ export ORLY_ADMINS=npub1fjqqy4a93z5zsjwsfxqhc2764kvykfdyttvldkkkdera8dr78vhsmmleku

The system grants write access to users followed by designated admins, with read-only access for others. Follow lists update dynamically as admins modify their relationships.
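To make that rule concrete, here is a minimal sketch of the decision it implies; the type and helper names are illustrative, not ORLY's actual internals, and follow sets are assumed to be rebuilt from each admin's kind-3 contact list:

```go
// Illustrative follows-ACL sketch: write access is the union of the admin
// set and every pubkey any admin currently follows.
type followsACL struct {
	admins  map[string]bool            // admin pubkeys (hex)
	follows map[string]map[string]bool // admin -> set of followed pubkeys
}

// setContactList replaces an admin's follow set when they publish a new
// kind-3 contact list ("p" tags carry the followed pubkeys).
func (a *followsACL) setContactList(admin string, pTags []string) {
	if !a.admins[admin] {
		return // only admins' lists grant access
	}
	set := make(map[string]bool, len(pTags))
	for _, pk := range pTags {
		set[pk] = true
	}
	a.follows[admin] = set
}

// canWrite: admins always can; others need at least one admin following them.
func (a *followsACL) canWrite(pubkey string) bool {
	if a.admins[pubkey] {
		return true
	}
	for _, set := range a.follows {
		if set[pubkey] {
			return true
		}
	}
	return false
}
```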
### Curation ACL

The curation ACL mode provides sophisticated content curation with a three-tier publisher classification system:

- **Trusted**: Unlimited publishing, bypasses rate limits
- **Blacklisted**: Blocked from publishing, invisible to regular users
- **Unclassified**: Rate-limited publishing (default 50 events/day)

Key features:

- **Kind whitelisting**: Only allow specific event kinds (e.g., social, DMs, longform)
- **IP-based flood protection**: Auto-ban IPs that exceed rate limits
- **Spam flagging**: Mark events as spam without deleting them
- **Web UI management**: Configure via the built-in curation interface

```bash
export ORLY_ACL_MODE=curating
export ORLY_OWNERS=npub1your_owner_key
./orly
```

After starting, publish a configuration event (kind 30078) to enable the relay. The web UI at `/#curation` provides a complete management interface.

For detailed configuration and API documentation, see the [Curation Mode Guide](docs/CURATION_MODE_GUIDE.md).

### Cluster Replication

ORLY supports distributed relay clusters using active replication. When configured with peer relays, ORLY automatically synchronizes events between cluster members using efficient HTTP polling.
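The polling protocol itself is not shown in this changeset; conceptually, each node periodically asks its peers for events newer than its last sync point and stores anything it is missing. A rough sketch under that assumption (the `fetchSince` and `store` callbacks and the `Event` type are placeholders, not ORLY's real API):

```go
// Hypothetical replication loop: poll each peer on a ticker, store missing
// events. A real implementation would track per-peer cursors rather than a
// single coarse watermark.
func replicate(ctx context.Context, peers []string, every time.Duration,
	fetchSince func(peer string, since int64) ([]Event, error),
	store func(Event) error) {
	t := time.NewTicker(every)
	defer t.Stop()
	since := time.Now().Unix()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			start := time.Now().Unix()
			for _, peer := range peers {
				evs, err := fetchSince(peer, since)
				if err != nil {
					continue // peer unreachable; retry next tick
				}
				for _, ev := range evs {
					_ = store(ev) // assumed idempotent: duplicates skipped
				}
			}
			since = start
		}
	}
}
```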
docs/CURATION_MODE_GUIDE.md (new file, 411 lines)
@@ -0,0 +1,411 @@
# Curation Mode Guide

Curation mode is a sophisticated access control system for Nostr relays that provides three-tier publisher classification, rate limiting, IP-based flood protection, and event kind whitelisting.

## Overview

Unlike simple allow/deny lists, curation mode classifies publishers into three tiers:

| Tier | Rate Limited | Daily Limit | Visibility |
|------|--------------|-------------|------------|
| **Trusted** | No | Unlimited | Full |
| **Blacklisted** | N/A (blocked) | 0 | Hidden from regular users |
| **Unclassified** | Yes | 50 events/day (default) | Full |

This allows relay operators to:

- Reward quality contributors with unlimited publishing
- Block bad actors while preserving their events for admin review
- Allow new users to participate with reasonable rate limits
- Prevent spam floods through automatic IP-based protections

## Quick Start

### 1. Start the Relay

```bash
export ORLY_ACL_MODE=curating
export ORLY_OWNERS=npub1your_owner_pubkey
./orly
```

### 2. Publish Configuration

The relay will not accept events until you publish a configuration event. Use the web UI at `http://your-relay/#curation` or publish a kind 30078 event:

```json
{
  "kind": 30078,
  "tags": [["d", "curating-config"]],
  "content": "{\"dailyLimit\":50,\"ipDailyLimit\":500,\"firstBanHours\":1,\"secondBanHours\":168,\"kindCategories\":[\"social\"]}"
}
```

### 3. Manage Publishers

Use the web UI or NIP-86 API to:

- Trust quality publishers
- Blacklist spammers
- Review unclassified users by activity
- Unblock IPs if needed

## Configuration

### Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `ORLY_ACL_MODE` | `none` | Set to `curating` to enable |
| `ORLY_OWNERS` | | Owner pubkeys (can configure the relay) |
| `ORLY_ADMINS` | | Admin pubkeys (can manage publishers) |

### Configuration Event (Kind 30078)

Configuration is stored as a replaceable Nostr event (kind 30078) with d-tag `curating-config`. Only owners and admins can publish configuration.

```typescript
interface CuratingConfig {
  // Rate Limiting
  dailyLimit: number;      // Max events/day for unclassified users (default: 50)
  ipDailyLimit: number;    // Max events/day from a single IP (default: 500)

  // IP Ban Durations
  firstBanHours: number;   // First offense ban duration (default: 1 hour)
  secondBanHours: number;  // Subsequent offense ban duration (default: 168 hours / 1 week)

  // Kind Filtering (choose one or combine)
  allowedKinds: number[];     // Explicit kind numbers: [0, 1, 3, 7]
  allowedRanges: string[];    // Kind ranges: ["1000-1999", "30000-39999"]
  kindCategories: string[];   // Pre-defined categories: ["social", "dm"]
}
```
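A relay has to turn these three fields into a single allow/deny decision per event kind. A minimal sketch of one way to evaluate them, assuming the three sources are OR'ed together (the Go struct and helper mirror the interface above but are illustrative):

```go
type curatingConfig struct {
	AllowedKinds   []int
	AllowedRanges  []string // "low-high", e.g. "30000-39999"
	KindCategories []string
}

// kindAllowed reports whether kind matches any explicit kind, any range,
// or any kind in a named category table.
func kindAllowed(kind int, cfg curatingConfig, categories map[string][]int) bool {
	for _, k := range cfg.AllowedKinds {
		if k == kind {
			return true
		}
	}
	for _, r := range cfg.AllowedRanges {
		var lo, hi int
		if _, err := fmt.Sscanf(r, "%d-%d", &lo, &hi); err == nil && kind >= lo && kind <= hi {
			return true
		}
	}
	for _, name := range cfg.KindCategories {
		for _, k := range categories[name] { // e.g. categories["social"] = []int{0, 1, 3, 6, 7, 10002}
			if k == kind {
				return true
			}
		}
	}
	return false
}
```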
### Event Kind Categories

Pre-defined categories for convenient kind whitelisting:

| Category | Kinds | Description |
|----------|-------|-------------|
| `social` | 0, 1, 3, 6, 7, 10002 | Profiles, notes, contacts, reposts, reactions |
| `dm` | 4, 14, 1059 | Direct messages (NIP-04, NIP-17, gift wraps) |
| `longform` | 30023, 30024 | Long-form articles and drafts |
| `media` | 1063, 20, 21, 22 | File metadata, picture/video/audio events |
| `marketplace` | 30017-30020, 1021, 1022 | Products, stalls, auctions, bids |
| `groups_nip29` | 9-12, 9000-9002, 39000-39002 | NIP-29 relay-based groups |
| `groups_nip72` | 34550, 1111, 4550 | NIP-72 moderated communities |
| `lists` | 10000, 10001, 10003, 30000, 30001, 30003 | Mute, pin, bookmark lists |

Example configuration allowing social interactions and DMs:

```json
{
  "kindCategories": ["social", "dm"],
  "dailyLimit": 100,
  "ipDailyLimit": 1000
}
```

## Three-Tier Classification

### Trusted Publishers

Trusted publishers have unlimited publishing rights:

- Bypass all rate limiting
- Can publish any allowed kind
- Events are always visible to all users

**Use case**: Known quality contributors, verified community members, partner relays.

### Blacklisted Publishers

Blacklisted publishers are blocked from publishing:

- All events are rejected with a `"pubkey is blacklisted"` error
- Existing events become invisible to regular users
- Admins and owners can still see blacklisted events (for review)

**Use case**: Spammers, abusive users, bad actors.

### Unclassified Publishers

Everyone else falls into the unclassified tier:

- Subject to the daily event limit (default: 50 events/day)
- Subject to IP-based flood protection
- Events visible to all users
- Can be promoted to trusted or demoted to blacklisted

**Use case**: New users, general public.

## Rate Limiting & Flood Protection

### Per-Pubkey Limits

Unclassified publishers are limited to a configurable number of events per day (default: 50). The count resets at midnight UTC; a minimal sketch of this check follows the list below.

When a user exceeds their limit:

1. The event is rejected with a `"daily event limit exceeded"` error
2. Their IP is flagged for potential abuse
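The sketch below shows what this check implies, keyed the same way as the `CURATING_ACL_EVENT_COUNT_{pubkey}_{date}` entries described under Database Storage; the counter-store interface is simplified for illustration:

```go
// counterStore is a stand-in for the relay database.
type counterStore interface {
	GetInt(key string) int // 0 when the key is absent
	SetInt(key string, v int)
}

// checkDailyLimit enforces the per-pubkey limit for unclassified users.
// Keys include the UTC date, so counts naturally reset at midnight UTC.
func checkDailyLimit(db counterStore, pubkey string, limit int) error {
	key := fmt.Sprintf("CURATING_ACL_EVENT_COUNT_%s_%s",
		pubkey, time.Now().UTC().Format("2006-01-02"))
	n := db.GetInt(key)
	if n >= limit {
		return errors.New("daily event limit exceeded")
	}
	db.SetInt(key, n+1)
	return nil
}
```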
### Per-IP Limits

To prevent Sybil attacks (creating many pubkeys from one IP), there is also an IP-based daily limit (default: 500 events).

When an IP exceeds its limit:

1. All events from that IP are rejected
2. The IP is temporarily banned

### Automatic IP Banning

When rate limits are exceeded:

| Offense | Ban Duration | Description |
|---------|--------------|-------------|
| First | 1 hour | Quick timeout for accidental over-posting |
| Second+ | 1 week | Extended ban for repeated abuse |

Ban durations are configurable via `firstBanHours` and `secondBanHours`.
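In code, the escalation in the table reduces to a single branch on the per-IP offense count (a sketch; the real implementation sits behind the `CURATING_ACL_IP_OFFENSE_{ip}` keys listed under Database Storage):

```go
// banDuration picks how long to block an IP: the first offense gets the
// short timeout, every subsequent offense the extended one.
func banDuration(offenses, firstBanHours, secondBanHours int) time.Duration {
	if offenses <= 1 {
		return time.Duration(firstBanHours) * time.Hour
	}
	return time.Duration(secondBanHours) * time.Hour
}
```

With the defaults (`firstBanHours: 1`, `secondBanHours: 168`), a second offense yields a one-week ban.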
### Offense Tracking

The system tracks which pubkeys triggered rate limits from each IP:

```
IP 192.168.1.100:
  - npub1abc... exceeded limit at 2024-01-15 10:30:00
  - npub1xyz... exceeded limit at 2024-01-15 10:45:00
  Offense count: 2
  Status: Banned until 2024-01-22 10:45:00
```

This helps identify coordinated spam attacks.

## Spam Flagging

Events can be flagged as spam without deletion:

- Flagged events are hidden from regular users
- Admins can review flagged events
- Events can be unflagged if incorrectly marked
- Original event data is preserved

This is useful for:

- Moderation review queues
- Training spam detection systems
- Preserving evidence of abuse

## NIP-86 Management API

All management operations use NIP-98 HTTP authentication.

### Trust Management

```bash
# Trust a pubkey
curl -X POST https://relay.example.com \
  -H "Authorization: Nostr <nip98_token>" \
  -d '{"method":"trustpubkey","params":["<pubkey_hex>"]}'

# Untrust a pubkey
curl -X POST https://relay.example.com \
  -H "Authorization: Nostr <nip98_token>" \
  -d '{"method":"untrustpubkey","params":["<pubkey_hex>"]}'

# List trusted pubkeys
curl -X POST https://relay.example.com \
  -H "Authorization: Nostr <nip98_token>" \
  -d '{"method":"listtrustedpubkeys","params":[]}'
```

### Blacklist Management

```bash
# Blacklist a pubkey
curl -X POST https://relay.example.com \
  -H "Authorization: Nostr <nip98_token>" \
  -d '{"method":"blacklistpubkey","params":["<pubkey_hex>"]}'

# Remove from blacklist
curl -X POST https://relay.example.com \
  -H "Authorization: Nostr <nip98_token>" \
  -d '{"method":"unblacklistpubkey","params":["<pubkey_hex>"]}'

# List blacklisted pubkeys
curl -X POST https://relay.example.com \
  -H "Authorization: Nostr <nip98_token>" \
  -d '{"method":"listblacklistedpubkeys","params":[]}'
```

### Unclassified User Management

```bash
# List unclassified users sorted by event count
curl -X POST https://relay.example.com \
  -H "Authorization: Nostr <nip98_token>" \
  -d '{"method":"listunclassifiedusers","params":[]}'
```

The response includes pubkey, event count, and last activity for each user.

### Spam Management

```bash
# Mark an event as spam
curl -X POST https://relay.example.com \
  -H "Authorization: Nostr <nip98_token>" \
  -d '{"method":"markspam","params":["<event_id_hex>"]}'

# Unmark spam
curl -X POST https://relay.example.com \
  -H "Authorization: Nostr <nip98_token>" \
  -d '{"method":"unmarkspam","params":["<event_id_hex>"]}'

# List spam events
curl -X POST https://relay.example.com \
  -H "Authorization: Nostr <nip98_token>" \
  -d '{"method":"listspamevents","params":[]}'
```

### IP Block Management

```bash
# List blocked IPs
curl -X POST https://relay.example.com \
  -H "Authorization: Nostr <nip98_token>" \
  -d '{"method":"listblockedips","params":[]}'

# Unblock an IP
curl -X POST https://relay.example.com \
  -H "Authorization: Nostr <nip98_token>" \
  -d '{"method":"unblockip","params":["<ip_address>"]}'
```

### Configuration Management

```bash
# Get current configuration
curl -X POST https://relay.example.com \
  -H "Authorization: Nostr <nip98_token>" \
  -d '{"method":"getcuratingconfig","params":[]}'

# Set allowed kind categories
curl -X POST https://relay.example.com \
  -H "Authorization: Nostr <nip98_token>" \
  -d '{"method":"setallowedkindcategories","params":[["social","dm","longform"]]}'

# Get allowed kind categories
curl -X POST https://relay.example.com \
  -H "Authorization: Nostr <nip98_token>" \
  -d '{"method":"getallowedkindcategories","params":[]}'
```

## Web UI

The curation web UI is available at `/#curation` and provides:

- **Configuration Panel**: Set rate limits, ban durations, and allowed kinds
- **Publisher Management**: Trust/blacklist pubkeys with one click
- **Unclassified Users**: View users sorted by activity, promote or blacklist
- **IP Blocks**: View and unblock banned IP addresses
- **Spam Queue**: Review flagged events, confirm or unflag

## Database Storage

Curation data is stored in the relay database with the following key prefixes:

| Prefix | Purpose |
|--------|---------|
| `CURATING_ACL_CONFIG` | Current configuration |
| `CURATING_ACL_TRUSTED_PUBKEY_{pubkey}` | Trusted publisher list |
| `CURATING_ACL_BLACKLISTED_PUBKEY_{pubkey}` | Blacklisted publisher list |
| `CURATING_ACL_EVENT_COUNT_{pubkey}_{date}` | Daily event counts per pubkey |
| `CURATING_ACL_IP_EVENT_COUNT_{ip}_{date}` | Daily event counts per IP |
| `CURATING_ACL_IP_OFFENSE_{ip}` | Offense tracking per IP |
| `CURATING_ACL_BLOCKED_IP_{ip}` | Active IP blocks |
| `CURATING_ACL_SPAM_EVENT_{eventID}` | Spam-flagged events |

## Caching

For performance, the following data is cached in memory:

- Trusted pubkey set
- Blacklisted pubkey set
- Allowed kinds set
- Current configuration

Caches are refreshed every hour by the background cleanup goroutine.

## Background Maintenance

A background goroutine runs hourly to (see the sketch after this list):

1. Remove expired IP blocks
2. Clean up old event count entries (older than 2 days)
3. Refresh in-memory caches
4. Log maintenance statistics
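Structurally this is an ordinary ticker goroutine; a minimal sketch with the four tasks passed in as stand-in functions (the real implementations are internal to the relay):

```go
// runMaintenance runs the hourly cleanup until ctx is cancelled. The task
// callbacks are placeholders for the relay's real maintenance routines.
func runMaintenance(ctx context.Context,
	removeExpiredIPBlocks, refreshCaches, logStats func(),
	cleanupOldCounts func(olderThan time.Duration)) {
	t := time.NewTicker(time.Hour)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			removeExpiredIPBlocks()
			cleanupOldCounts(48 * time.Hour) // event counts older than 2 days
			refreshCaches()
			logStats()
		}
	}
}
```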
## Best Practices

### Starting a New Curated Relay

1. Start with permissive settings:
   ```json
   {"dailyLimit": 100, "ipDailyLimit": 1000, "kindCategories": ["social"]}
   ```
2. Monitor unclassified users for a few days
3. Trust active, quality contributors
4. Blacklist obvious spammers
5. Adjust rate limits based on observed patterns

### Handling Spam Waves

During spam attacks:

1. IP-based flood protection will auto-ban attack sources
2. Review blocked IPs via the web UI or API
3. Blacklist any pubkeys that got through
4. Consider temporarily lowering `ipDailyLimit`

### Recovering from Mistakes

- **Accidentally blacklisted someone**: Use `unblacklistpubkey`; their events become visible again
- **Wrongly flagged spam**: Use `unmarkspam`; the event becomes visible again
- **Blocked a legitimate IP**: Use `unblockip`; the IP can publish again immediately

## Comparison with Other ACL Modes

| Feature | None | Follows | Managed | Curating |
|---------|------|---------|---------|----------|
| Default Access | Write | Write if followed | Explicit allow | Rate-limited |
| Rate Limiting | No | No | No | Yes |
| Kind Filtering | No | No | Optional | Yes |
| IP Protection | No | No | No | Yes |
| Spam Flagging | No | No | No | Yes |
| Configuration | Env vars | Follow lists | NIP-86 | Kind 30078 events |
| Web UI | Basic | Basic | Basic | Full curation panel |

## Troubleshooting

### "Relay not accepting events"

The relay requires a configuration event before accepting any events. Publish a kind 30078 event with d-tag `curating-config`.

### "daily event limit exceeded"

The user has exceeded their daily limit. Options:

1. Wait until midnight UTC for the reset
2. Trust the pubkey if they are a quality contributor
3. Increase `dailyLimit` in the configuration

### "pubkey is blacklisted"

The pubkey is on the blacklist. Use `unblacklistpubkey` if this was a mistake.

### "IP is blocked"

The IP has been auto-banned due to rate limit violations. Use `unblockip` if it is legitimate, or wait for the ban to expire.

### Events disappearing for users

Check whether the event author has been blacklisted. Blacklisted authors' events are hidden from regular users but remain visible to admins.
@@ -31,9 +31,10 @@ func (b *B) ImportEventsFromReader(ctx context.Context, rr io.Reader) error {
	if chk.E(err) {
		return err
	}
-	defer os.Remove(tmp.Name()) // Clean up temp file when done
+	tmpName := tmp.Name()
+	defer os.Remove(tmpName) // Clean up temp file when done

-	log.I.F("bbolt import: buffering upload to %s", tmp.Name())
+	log.I.F("bbolt import: buffering upload to %s", tmpName)
	bufferStart := time.Now()
	bytesBuffered, err := io.Copy(tmp, rr)
	if chk.E(err) {
@@ -48,12 +49,30 @@ func (b *B) ImportEventsFromReader(ctx context.Context, rr io.Reader) error {
		return err
	}

-	processErr := b.processJSONLEvents(ctx, tmp)
+	count, processErr := b.processJSONLEventsReturningCount(ctx, tmp)

	// Close temp file to release resources before index building
	tmp.Close()

	if processErr != nil {
		return processErr
	}

+	// Build indexes after events are stored (minimal import mode)
+	if count > 0 {
+		// Force garbage collection to reclaim memory before index building
+		debug.FreeOSMemory()
+		log.I.F("bbolt import: building indexes for %d events...", count)
+		if err := b.BuildIndexes(ctx); err != nil {
+			log.E.F("bbolt import: failed to build indexes: %v", err)
+			return err
+		}
+	}
+
	totalElapsed := time.Since(startTime)
	log.I.F("bbolt import: total operation time: %v", totalElapsed.Round(time.Millisecond))

-	return processErr
+	return nil
}

// ImportEventsFromStrings imports events from a slice of JSON strings with policy filtering
@@ -67,7 +86,95 @@ func (b *B) ImportEventsFromStrings(ctx context.Context, eventJSONs []string, po

// processJSONLEvents processes JSONL events from a reader
func (b *B) processJSONLEvents(ctx context.Context, rr io.Reader) error {
-	return b.processJSONLEventsWithPolicy(ctx, rr, nil)
+	_, err := b.processJSONLEventsReturningCount(ctx, rr)
+	return err
}

// processJSONLEventsReturningCount processes JSONL events and returns the count saved.
// This is used by ImportEventsFromReader for migration mode (minimal import without inline indexes).
func (b *B) processJSONLEventsReturningCount(ctx context.Context, rr io.Reader) (int, error) {
	// Create a scanner to read the buffer line by line
	scan := bufio.NewScanner(rr)
	scanBuf := make([]byte, maxLen)
	scan.Buffer(scanBuf, maxLen)

	// Performance tracking
	startTime := time.Now()
	lastLogTime := startTime
	const logInterval = 5 * time.Second

	var count, total, skipped, unmarshalErrors, saveErrors int
	for scan.Scan() {
		select {
		case <-ctx.Done():
			log.I.F("bbolt import: context closed after %d events", count)
			return count, ctx.Err()
		default:
		}

		line := scan.Bytes()
		total += len(line) + 1
		if len(line) < 1 {
			skipped++
			continue
		}

		ev := event.New()
		if _, err := ev.Unmarshal(line); err != nil {
			ev.Free()
			unmarshalErrors++
			log.W.F("bbolt import: failed to unmarshal event: %v", err)
			continue
		}

		// Minimal path for migration: store events only, indexes built later
		if err := b.SaveEventMinimal(ev); err != nil {
			ev.Free()
			saveErrors++
			log.W.F("bbolt import: failed to save event: %v", err)
			continue
		}

		ev.Free()
		line = nil
		count++

		// Progress logging every logInterval
		if time.Since(lastLogTime) >= logInterval {
			elapsed := time.Since(startTime)
			eventsPerSec := float64(count) / elapsed.Seconds()
			mbPerSec := float64(total) / elapsed.Seconds() / 1024 / 1024
			log.I.F("bbolt import: progress %d events saved, %.2f MB read, %.0f events/sec, %.2f MB/sec",
				count, float64(total)/1024/1024, eventsPerSec, mbPerSec)
			lastLogTime = time.Now()
			debug.FreeOSMemory()
		}
	}

	// Flush any remaining batched events
	if b.batcher != nil {
		b.batcher.Flush()
	}

	// Final summary
	elapsed := time.Since(startTime)
	eventsPerSec := float64(count) / elapsed.Seconds()
	mbPerSec := float64(total) / elapsed.Seconds() / 1024 / 1024
	log.I.F("bbolt import: completed - %d events saved, %.2f MB in %v (%.0f events/sec, %.2f MB/sec)",
		count, float64(total)/1024/1024, elapsed.Round(time.Millisecond), eventsPerSec, mbPerSec)
	if unmarshalErrors > 0 || saveErrors > 0 || skipped > 0 {
		log.I.F("bbolt import: stats - %d unmarshal errors, %d save errors, %d skipped empty lines",
			unmarshalErrors, saveErrors, skipped)
	}

	if err := scan.Err(); err != nil {
		return count, err
	}

	// Clear scanner buffer to help GC
	scanBuf = nil

	return count, nil
}

// processJSONLEventsWithPolicy processes JSONL events from a reader with optional policy filtering
@@ -127,14 +234,21 @@ func (b *B) processJSONLEventsWithPolicy(ctx context.Context, rr io.Reader, poli
				continue
			}
			log.D.F("bbolt import: policy allowed event %x during sync import", ev.ID)
		}

-		if _, err := b.SaveEvent(ctx, ev); err != nil {
-			// return the pooled buffer on error paths too
-			ev.Free()
-			saveErrors++
-			log.W.F("bbolt import: failed to save event: %v", err)
-			continue
+			// With policy checking, use regular SaveEvent path
+			if _, err := b.SaveEvent(ctx, ev); err != nil {
+				ev.Free()
+				saveErrors++
+				log.W.F("bbolt import: failed to save event: %v", err)
+				continue
+			}
+		} else {
+			// Minimal path for migration: store events only, build indexes later
+			if err := b.SaveEventMinimal(ev); err != nil {
+				ev.Free()
+				saveErrors++
+				log.W.F("bbolt import: failed to save event: %v", err)
+				continue
+			}
		}

		// return the pooled buffer after successful save
pkg/bbolt/import-minimal.go (new file, 232 lines)
@@ -0,0 +1,232 @@
//go:build !(js && wasm)

package bbolt

import (
	"bytes"
	"context"
	"errors"
	"runtime/debug"
	"sort"
	"time"

	bolt "go.etcd.io/bbolt"
	"lol.mleku.dev/chk"
	"lol.mleku.dev/log"
	"next.orly.dev/pkg/database"
	"next.orly.dev/pkg/database/bufpool"
	"git.mleku.dev/mleku/nostr/encoders/event"
)

// SaveEventMinimal stores only the essential event data for fast bulk import.
// It skips all indexes - call BuildIndexes after import completes.
func (b *B) SaveEventMinimal(ev *event.E) error {
	if ev == nil {
		return errors.New("nil event")
	}

	// Reject ephemeral events
	if ev.Kind >= 20000 && ev.Kind <= 29999 {
		return nil
	}

	// Get the next serial number
	serial := b.getNextEventSerial()

	// Serialize event in raw binary format (not compact - preserves full pubkey).
	// This allows index building to work without pubkey serial resolution.
	legacyBuf := bufpool.GetMedium()
	defer bufpool.PutMedium(legacyBuf)
	ev.MarshalBinary(legacyBuf)
	eventData := bufpool.CopyBytes(legacyBuf)

	// Create minimal batch - only event data and ID mappings
	batch := &EventBatch{
		Serial:    serial,
		EventData: eventData,
		Indexes: []BatchedWrite{
			// Event ID -> Serial (for lookups)
			{BucketName: bucketEid, Key: ev.ID[:], Value: makeSerialKey(serial)},
			// Serial -> Event ID (for reverse lookups)
			{BucketName: bucketSei, Key: makeSerialKey(serial), Value: ev.ID[:]},
		},
	}

	return b.batcher.Add(batch)
}

// BuildIndexes builds all query indexes from stored events.
// Call this after importing events with SaveEventMinimal.
// Processes events in chunks to avoid OOM on large databases.
func (b *B) BuildIndexes(ctx context.Context) error {
	log.I.F("bbolt: starting index build...")
	startTime := time.Now()

	// Force GC before starting to reclaim batch buffer memory
	debug.FreeOSMemory()

	// Process in small chunks to avoid OOM on memory-constrained systems.
	// With ~15 indexes per event and ~50 bytes per key, 50k events = ~37.5MB per chunk.
	const chunkSize = 50000

	var totalEvents int
	var lastSerial uint64 = 0
	var lastLogTime = time.Now()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		// Collect indexes for this chunk
		indexesByBucket := make(map[string][][]byte)
		var chunkEvents int
		var chunkSerial uint64

		// Read a chunk of events
		err := b.db.View(func(tx *bolt.Tx) error {
			cmpBucket := tx.Bucket(bucketCmp)
			if cmpBucket == nil {
				return errors.New("cmp bucket not found")
			}

			cursor := cmpBucket.Cursor()

			// Seek to start position
			var k, v []byte
			if lastSerial == 0 {
				k, v = cursor.First()
			} else {
				// Seek past the last processed serial
				seekKey := makeSerialKey(lastSerial + 1)
				k, v = cursor.Seek(seekKey)
			}

			for ; k != nil && chunkEvents < chunkSize; k, v = cursor.Next() {
				serial := decodeSerialKey(k)
				chunkSerial = serial

				// Decode event from raw binary format
				ev := event.New()
				if err := ev.UnmarshalBinary(bytes.NewBuffer(v)); err != nil {
					log.W.F("bbolt: failed to unmarshal event at serial %d: %v", serial, err)
					continue
				}

				// Generate indexes for this event
				rawIdxs, err := database.GetIndexesForEvent(ev, serial)
				if chk.E(err) {
					ev.Free()
					continue
				}

				// Group by bucket (first 3 bytes)
				for _, idx := range rawIdxs {
					if len(idx) < 3 {
						continue
					}
					bucketName := string(idx[:3])
					key := idx[3:]

					// Skip eid and sei - already stored during import
					if bucketName == "eid" || bucketName == "sei" {
						continue
					}

					// Make a copy of the key
					keyCopy := make([]byte, len(key))
					copy(keyCopy, key)
					indexesByBucket[bucketName] = append(indexesByBucket[bucketName], keyCopy)
				}

				ev.Free()
				chunkEvents++
			}
			return nil
		})
		if err != nil {
			return err
		}

		// No more events to process
		if chunkEvents == 0 {
			break
		}

		totalEvents += chunkEvents
		lastSerial = chunkSerial

		// Progress logging
		if time.Since(lastLogTime) >= 5*time.Second {
			log.I.F("bbolt: index build progress: %d events processed", totalEvents)
			lastLogTime = time.Now()
		}

		// Count total keys in this chunk
		var totalKeys int
		for _, keys := range indexesByBucket {
			totalKeys += len(keys)
		}
		log.I.F("bbolt: writing %d index keys for chunk (%d events)", totalKeys, chunkEvents)

		// Write this chunk's indexes
		for bucketName, keys := range indexesByBucket {
			if len(keys) == 0 {
				continue
			}

			bucketBytes := []byte(bucketName)

			// Sort keys for this bucket before writing
			sort.Slice(keys, func(i, j int) bool {
				return bytes.Compare(keys[i], keys[j]) < 0
			})

			// Write in batches
			const batchSize = 50000
			for i := 0; i < len(keys); i += batchSize {
				end := i + batchSize
				if end > len(keys) {
					end = len(keys)
				}
				batch := keys[i:end]

				err := b.db.Update(func(tx *bolt.Tx) error {
					bucket := tx.Bucket(bucketBytes)
					if bucket == nil {
						return nil
					}
					for _, key := range batch {
						if err := bucket.Put(key, nil); err != nil {
							return err
						}
					}
					return nil
				})
				if err != nil {
					log.E.F("bbolt: failed to write batch for bucket %s: %v", bucketName, err)
					return err
				}
			}
		}

		// Clear for next chunk and release memory
		indexesByBucket = nil
		debug.FreeOSMemory()
	}

	elapsed := time.Since(startTime)
	log.I.F("bbolt: index build complete in %v (%d events)", elapsed.Round(time.Second), totalEvents)

	return nil
}

// decodeSerialKey decodes a 5-byte serial key to uint64
func decodeSerialKey(b []byte) uint64 {
	if len(b) < 5 {
		return 0
	}
	return uint64(b[0])<<32 | uint64(b[1])<<24 | uint64(b[2])<<16 | uint64(b[3])<<8 | uint64(b[4])
}
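`makeSerialKey` is called throughout these new files but its definition is outside this diff; judging from `decodeSerialKey` above, it is presumably the inverse fixed-width 5-byte big-endian encoding, along the lines of:

```go
// Presumed counterpart to decodeSerialKey (not part of this diff): encode a
// serial as a 5-byte big-endian key so bbolt keys sort in insertion order.
func makeSerialKey(serial uint64) []byte {
	return []byte{
		byte(serial >> 32),
		byte(serial >> 24),
		byte(serial >> 16),
		byte(serial >> 8),
		byte(serial),
	}
}
```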
pkg/bbolt/save-event-bulk.go (new file, 96 lines)
@@ -0,0 +1,96 @@
//go:build !(js && wasm)

package bbolt

import (
	"errors"

	"lol.mleku.dev/chk"
	"next.orly.dev/pkg/database"
	"next.orly.dev/pkg/database/bufpool"
	"git.mleku.dev/mleku/nostr/encoders/event"
)

// SaveEventForImport saves an event optimized for bulk import.
// It skips duplicate checking, deletion checking, and graph vertex creation
// to maximize import throughput. Use only for trusted data migration.
func (b *B) SaveEventForImport(ev *event.E) error {
	if ev == nil {
		return errors.New("nil event")
	}

	// Reject ephemeral events (kinds 20000-29999)
	if ev.Kind >= 20000 && ev.Kind <= 29999 {
		return nil // silently skip
	}

	// Get the next serial number
	serial := b.getNextEventSerial()

	// Generate all indexes using the shared function
	rawIdxs, err := database.GetIndexesForEvent(ev, serial)
	if chk.E(err) {
		return err
	}

	// Convert raw indexes to BatchedWrites, stripping the 3-byte prefix
	batch := &EventBatch{
		Serial:  serial,
		Indexes: make([]BatchedWrite, 0, len(rawIdxs)+1),
	}

	for _, idx := range rawIdxs {
		if len(idx) < 3 {
			continue
		}
		bucketName := idx[:3]
		key := idx[3:]
		batch.Indexes = append(batch.Indexes, BatchedWrite{
			BucketName: bucketName,
			Key:        key,
			Value:      nil,
		})
	}

	// Serialize event in compact format (without graph references for import)
	resolver := &nullSerialResolver{}
	compactData, compactErr := database.MarshalCompactEvent(ev, resolver)
	if compactErr != nil {
		// Fall back to legacy format
		legacyBuf := bufpool.GetMedium()
		defer bufpool.PutMedium(legacyBuf)
		ev.MarshalBinary(legacyBuf)
		compactData = bufpool.CopyBytes(legacyBuf)
	}
	batch.EventData = compactData

	// Store serial -> event ID mapping
	batch.Indexes = append(batch.Indexes, BatchedWrite{
		BucketName: bucketSei,
		Key:        makeSerialKey(serial),
		Value:      ev.ID[:],
	})

	// Add to batcher (no graph vertex, no pubkey lookups)
	return b.batcher.Add(batch)
}

// nullSerialResolver returns 0 for all lookups, used for fast import
// where we don't need pubkey/event serial references in compact format
type nullSerialResolver struct{}

func (r *nullSerialResolver) GetOrCreatePubkeySerial(pubkey []byte) (uint64, error) {
	return 0, nil
}

func (r *nullSerialResolver) GetPubkeyBySerial(serial uint64) ([]byte, error) {
	return nil, nil
}

func (r *nullSerialResolver) GetEventSerialById(eventID []byte) (uint64, bool, error) {
	return 0, false, nil
}

func (r *nullSerialResolver) GetEventIdBySerial(serial uint64) ([]byte, error) {
	return nil, nil
}
@@ -84,8 +84,12 @@ func (b *B) DeleteExpired() {
}

// ProcessDelete processes a deletion event.
+// For migration from other backends, deletions have already been processed,
+// so this is a no-op. Full implementation needed for production use.
func (b *B) ProcessDelete(ev *event.E, admins [][]byte) error {
-	return errNotImplemented
+	// TODO: Implement full deletion processing for production use
+	// For now, just return nil to allow migrations to proceed
+	return nil
}

// CheckForDeleted checks if an event has been deleted.
@@ -1 +1 @@
-v0.48.1
+v0.48.8