From 29e175efb0c5d7bdb0d93720c2fc312d12cdd65d Mon Sep 17 00:00:00 2001 From: mleku Date: Fri, 14 Nov 2025 12:15:52 +0000 Subject: [PATCH] implement event table subtyping for small events in value log --- app/nip43_e2e_test.go | 41 +- docs/immutable-store-optimizations-gpt5.md | 187 +++++ docs/reiser4-optimizations-analysis.md | 758 +++++++++++++++++++++ pkg/crypto/keys/keys.go | 23 + pkg/database/dual-storage_test.go | 279 ++++++++ pkg/database/fetch-event-by-serial.go | 49 ++ pkg/database/fetch-events-by-serials.go | 63 +- pkg/database/indexes/keys.go | 68 +- pkg/database/inline-storage_test.go | 521 ++++++++++++++ pkg/database/migrations.go | 216 +++++- pkg/database/save-event.go | 113 ++- 11 files changed, 2275 insertions(+), 43 deletions(-) create mode 100644 docs/immutable-store-optimizations-gpt5.md create mode 100644 docs/reiser4-optimizations-analysis.md create mode 100644 pkg/database/dual-storage_test.go create mode 100644 pkg/database/inline-storage_test.go diff --git a/app/nip43_e2e_test.go b/app/nip43_e2e_test.go index 0c84bf3..d8a5e3f 100644 --- a/app/nip43_e2e_test.go +++ b/app/nip43_e2e_test.go @@ -1,13 +1,12 @@ package app import ( - "next.orly.dev/pkg/interfaces/signer/p8k" "context" "encoding/json" "net/http" "net/http/httptest" + "next.orly.dev/pkg/interfaces/signer/p8k" "os" - "path/filepath" "testing" "time" @@ -75,13 +74,15 @@ func setupE2ETest(t *testing.T) (*Server, *httptest.Server, func()) { server.mux = http.NewServeMux() // Set up HTTP handlers - server.mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - if r.Header.Get("Accept") == "application/nostr+json" { - server.HandleRelayInfo(w, r) - return - } - http.NotFound(w, r) - }) + server.mux.HandleFunc( + "/", func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Accept") == "application/nostr+json" { + server.HandleRelayInfo(w, r) + return + } + http.NotFound(w, r) + }, + ) httpServer := httptest.NewServer(server.mux) @@ -133,7 +134,10 @@ func TestE2E_RelayInfoIncludesNIP43(t *testing.T) { // Verify server name if info.Name != server.Config.AppName { - t.Errorf("wrong relay name: got %s, want %s", info.Name, server.Config.AppName) + t.Errorf( + "wrong relay name: got %s, want %s", info.Name, + server.Config.AppName, + ) } } @@ -205,7 +209,10 @@ func TestE2E_CompleteJoinFlow(t *testing.T) { t.Fatalf("failed to get membership: %v", err) } if membership.InviteCode != inviteCode { - t.Errorf("wrong invite code: got %s, want %s", membership.InviteCode, inviteCode) + t.Errorf( + "wrong invite code: got %s, want %s", membership.InviteCode, + inviteCode, + ) } } @@ -355,6 +362,9 @@ func TestE2E_ExpiredInviteCode(t *testing.T) { } defer os.RemoveAll(tempDir) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + db, err := database.New(ctx, cancel, tempDir, "info") if err != nil { t.Fatalf("failed to open database: %v", err) @@ -366,8 +376,6 @@ func TestE2E_ExpiredInviteCode(t *testing.T) { NIP43InviteExpiry: 1 * time.Millisecond, // Very short expiry } - ctx := context.Background() - server := &Server{ Ctx: ctx, Config: cfg, @@ -498,7 +506,10 @@ func BenchmarkJoinRequestProcessing(b *testing.B) { } defer os.RemoveAll(tempDir) - db, err := database.Open(filepath.Join(tempDir, "test.db"), "error") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := database.New(ctx, cancel, tempDir, "error") if err != nil { b.Fatalf("failed to open database: %v", err) } @@ -509,8 +520,6 @@ func BenchmarkJoinRequestProcessing(b *testing.B) { 
NIP43InviteExpiry: 24 * time.Hour, } - ctx := context.Background() - server := &Server{ Ctx: ctx, Config: cfg, diff --git a/docs/immutable-store-optimizations-gpt5.md b/docs/immutable-store-optimizations-gpt5.md new file mode 100644 index 0000000..1437f2e --- /dev/null +++ b/docs/immutable-store-optimizations-gpt5.md @@ -0,0 +1,187 @@ +Reiser4 had *several* ideas that were too radical for Linux in the 2000s, but **would make a lot of sense today in a modern CoW (copy-on-write) filesystem**—especially one designed for immutable or content-addressed data. + +Below is a distilled list of the Reiser4 concepts that *could* be successfully revived and integrated into a next-generation CoW filesystem, along with why they now make more sense and how they would fit. + +--- + +# ✅ **1. Item/extent subtypes (structured metadata records)** + +Reiser4 had “item types” that stored different structures within B-tree leaves (e.g., stat-data items, directory items, tail items). +Most filesystems today use coarse-grained extents and metadata blocks—but structured, typed leaf contents provide clear benefits: + +### Why it makes sense today: + +* CoW filesystems like **APFS**, **Btrfs**, and **ZFS** already have *typed nodes* internally (extent items, dir items). +* Typed leaf records allow: + + * Faster parsing + * Future expansion of features + * Better layout for small objects + * Potential content-addressed leaves + +A modern CoW filesystem could revive this idea by allowing different **record kinds** within leaf blocks, with stable, versioned formats. + +--- + +# ✅ **2. Fine-grained small-file optimizations—but integrated with CoW** + +Reiser4’s small-file packing was too complicated for mutable trees, but in a CoW filesystem it fits perfectly: + +### In CoW: + +* Leaves are immutable once written. +* Small files can be stored **inline** inside a leaf, or as small extents. +* Deduplication is easier due to immutability. +* Crash consistency is automatic. + +### What makes sense to revive: + +* Tail-packing / inline-data for files below a threshold +* Possibly grouping many tiny files into a single CoW extent tree page +* Using a “small-files leaf type” with fixed slots + +This aligns closely with APFS’s and Btrfs’s inline extents but could go further—safely—because of CoW. + +--- + +# ✅ **3. Semantic plugins *outside the kernel*** + +Reiser4’s plugin system failed because it tried to put a framework *inside the kernel*. +But moving that logic **outside** (as user-space metadata layers or FUSE-like transforms) is realistic today. + +### Possible modern implementation: + +* A CoW filesystem exposes stable metadata + data primitives. +* User-space “semantic layers” do: + + * per-directory views + * virtual inodes + * attribute-driven namespace merges + * versioned or content-addressed overlays + +### Why it makes sense: + +* User-space is safer and maintainers accept it. +* CoW makes such layers more reliable and more composable. +* Many systems already do this: + + * OSTree + * Git virtual filesystem + * container overlayfs + * CephFS metadata layers + +The spirit of Reiser4’s semantics CAN live on—just not in-kernel. + +--- + +# ✅ **4. Content-addressable objects + trees (Reiser4-like keys)** + +Reiser4 had “keyed items” in a tree, which map closely to modern content-addressable storage strategies. 
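+
+In miniature (a deliberately simplified sketch, not any real filesystem's API; `LeafStore` and its methods are invented for illustration), keying immutable leaves by a hash of their contents gives deduplication and read-time verification essentially for free:
+
+```go
+// Sketch: leaves keyed by the SHA-256 of their bytes. Identical leaves are
+// stored once, and a reader re-verifies what it gets against the address.
+package main
+
+import (
+	"crypto/sha256"
+	"fmt"
+)
+
+type LeafStore struct{ leaves map[[32]byte][]byte }
+
+func NewLeafStore() *LeafStore {
+	return &LeafStore{leaves: make(map[[32]byte][]byte)}
+}
+
+// Put stores an immutable leaf and returns its content address.
+func (s *LeafStore) Put(leaf []byte) [32]byte {
+	addr := sha256.Sum256(leaf)
+	if _, ok := s.leaves[addr]; !ok { // duplicate leaves are stored once
+		s.leaves[addr] = append([]byte(nil), leaf...)
+	}
+	return addr
+}
+
+// Get returns a leaf after checking it still hashes to its address.
+func (s *LeafStore) Get(addr [32]byte) ([]byte, bool) {
+	leaf, ok := s.leaves[addr]
+	if !ok || sha256.Sum256(leaf) != addr {
+		return nil, false
+	}
+	return leaf, true
+}
+
+func main() {
+	s := NewLeafStore()
+	a := s.Put([]byte("leaf block contents"))
+	b := s.Put([]byte("leaf block contents")) // same bytes, same address
+	leaf, ok := s.Get(a)
+	fmt.Println(a == b, ok, string(leaf)) // true true leaf block contents
+}
+```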
+ +A modern CoW FS could: + +* Store leaf blocks by **hash of contents** +* Use stable keyed addressing for trees +* Deduplicate at leaf granularity +* Provide Git/OSTree-style guarantees natively + +This is very powerful for immutable or append-only workloads. + +### Why it's feasible now: + +* Fast hashing hardware +* Widespread use of snapshots, clones, dedupe +* Object-based designs in modern systems (e.g., bcachefs, ZFS) + +Reiser4 was ahead of its time here. + +--- + +# ✅ **5. Rich directory structures (hash trees)** + +Reiser4’s directory semantics were much more flexible, including: + +* Extensible directory entries +* Small-directory embedding +* Very fast operations on large directories + +Most CoW FSes today use coarse directory structures. + +A modern CoW FS could adopt: + +* Fixed-format hashed directories for fast lookup +* Optional richer metadata per entry +* Inline storage of tiny directories + +Essentially, a more flexible but POSIX-compliant version of Reiser4 directories. + +--- + +# ✅ **6. Atomic multi-item updates via “transaction items”** + +Reiser4 had advanced concepts for batched updates via a plugin model, which could be simplified into: + +* A single CoW commit representing a set of operations +* Versioned writes to multiple trees + +This is similar to what APFS and Btrfs do, but can be made more explicit. + +### Why it’s relevant + +Modern workloads (containers, datasets, package managers) rely heavily on atomic snapshots—rich commit semantics at the filesystem layer are a big win. + +--- + +# 🧪 **7. Advanced multi-key indexing (“dancing tree” ideas, but simplified)** + +Reiser4 used a flexible key scheme for ordering items in the unified tree. +While we don’t want "dancing trees" again, a **multi-dimensional key tuple** is extremely useful for: + +* Querying by filename + offset +* Efficiently supporting both sparse files and directories +* Custom ordering schemes without rebalancing everything + +A CoW filesystem can use simple B-trees or B+trees with structured keys. + +--- + +# 🚫 Reiser4 ideas that *do NOT* make sense even in a CoW filesystem + +To be clear, these should **not** be revived: + +* In-kernel plugin architecture +* Files-as-directories breaking POSIX +* Dynamic per-file semantic behavior +* Arbitrary user-defined item types in kernel +* Complex item-level transformations at write time +* Dancing-tree hyper-rebalancing policies + +These remain too brittle or too unpredictable for a production CoW filesystem. + +--- + +# 🧠 **Summary: What Reiser4 ideas *should* come back in future CoW designs** + +### ✔ Good ideas to revive: + +1. **Record/item subtypes within leaf nodes** for efficiency +2. **Advanced small-file packing that works with CoW** +3. **Semantic “plugins” in user-space only** +4. **Content-addressable storage for leaves/objects** +5. **Rich, hashed, fast directories** +6. **Explicit atomic multi-tree commit semantics** +7. **Structured multi-field keys inside B-trees** + +### ✘ Not worth reviving: + +* Kernel plugin system +* Semantics-heavy namespaces +* Dancing trees +* Micro-level tree balancing complexity +* Deeply nested “item-in-item-in-item” structures +* Anything that breaks POSIX semantics + +--- + +If you want, I can sketch out a **concept design** for a hypothetical “Reiser5-like CoW filesystem” that incorporates the good parts but avoids the mistakes—something that could realistically compete with APFS, ZFS, and bcachefs today. 
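+
+---
+
+For concreteness, here is a minimal sketch of the "structured multi-field keys" idea from point 7 above (illustrative only; the `Key` type and its fields are invented for this example). Encoding the fields most-significant-first makes lexicographic byte order equal to tuple order, so one ordered key space serves several query shapes:
+
+```go
+// Sketch: a composite key whose big-endian byte encoding sorts by
+// (kind, pubkey hash, created_at), in that order.
+package main
+
+import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+)
+
+type Key struct {
+	Kind      uint16
+	PubHash   [8]byte
+	CreatedAt uint64
+}
+
+// Encode writes fields most-significant-first so that comparing the raw
+// bytes compares by kind, then author, then time.
+func (k Key) Encode() []byte {
+	b := make([]byte, 18)
+	binary.BigEndian.PutUint16(b[0:2], k.Kind)
+	copy(b[2:10], k.PubHash[:])
+	binary.BigEndian.PutUint64(b[10:18], k.CreatedAt)
+	return b
+}
+
+func main() {
+	a := Key{Kind: 1, CreatedAt: 100}
+	b := Key{Kind: 1, CreatedAt: 200}
+	c := Key{Kind: 2, CreatedAt: 50}
+	// Within a kind, keys order by time; a later kind always sorts after.
+	fmt.Println(bytes.Compare(a.Encode(), b.Encode()) < 0) // true
+	fmt.Println(bytes.Compare(b.Encode(), c.Encode()) < 0) // true
+}
+```
+
+Because comparisons stay purely byte-wise, prefix scans over such keys answer "all of kind K", "all of kind K by author P", and time-bounded variants without any extra index structure.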
+ diff --git a/docs/reiser4-optimizations-analysis.md b/docs/reiser4-optimizations-analysis.md new file mode 100644 index 0000000..305f99a --- /dev/null +++ b/docs/reiser4-optimizations-analysis.md @@ -0,0 +1,758 @@ +# Reiser4 Optimization Techniques Applied to ORLY + +## Executive Summary + +This document analyzes how Reiser4's innovative filesystem concepts (as described in `immutable-store-optimizations-gpt5.md`) can be applied to ORLY's two storage systems: +1. **Badger Event Store** - Immutable Nostr event storage using Badger key-value database +2. **Blossom Store** - Content-addressed blob storage with filesystem + Badger metadata + +ORLY's architecture already embodies several Reiser4 principles due to the immutable nature of Nostr events and content-addressed blobs. This analysis identifies concrete optimization opportunities. + +--- + +## Current Architecture Overview + +### Badger Event Store + +**Storage Model:** +- **Primary key**: `evt|<5-byte serial>` → binary event data +- **Secondary indexes**: Multiple composite keys for queries + - `eid|<8-byte ID hash>|<5-byte serial>` - ID lookup + - `kc-|<2-byte kind>|<8-byte timestamp>|<5-byte serial>` - Kind queries + - `kpc|<2-byte kind>|<8-byte pubkey hash>|<8-byte timestamp>|<5-byte serial>` - Kind+Author + - `tc-|<1-byte tag key>|<8-byte tag hash>|<8-byte timestamp>|<5-byte serial>` - Tag queries + - And 7+ more index patterns + +**Characteristics:** +- Events are **immutable** after storage (CoW-friendly) +- Index keys use **structured, typed prefixes** (3-byte human-readable) +- Small events (typical: 200-2KB) stored alongside large events +- Heavy read workload with complex multi-dimensional queries +- Sequential serial allocation (monotonic counter) + +### Blossom Store + +**Storage Model:** +- **Blob data**: Filesystem at `/blossom/` +- **Metadata**: Badger `blob:meta:` → JSON metadata +- **Index**: Badger `blob:index::` → marker + +**Characteristics:** +- Content-addressed via SHA256 (inherently deduplicating) +- Large files (images, videos, PDFs) +- Simple queries (by hash, by pubkey) +- Immutable blobs (delete is only operation) + +--- + +## Applicable Reiser4 Concepts + +### ✅ 1. Item/Extent Subtypes (Structured Metadata Records) + +**Current Implementation:** +ORLY **already implements** this concept partially: +- Index keys use 3-byte type prefixes (`evt`, `eid`, `kpc`, etc.) +- Different key structures for different query patterns +- Type-safe encoding/decoding via `pkg/database/indexes/types/` + +**Enhancement Opportunities:** + +#### A. Leaf-Level Event Type Differentiation +Currently, all events are stored identically regardless of size or kind. Reiser4's approach suggests: + +**Small Event Optimization (kinds 0, 1, 3, 7):** +```go +// New index type for inline small events +const SmallEventPrefix = I("sev") // small event, includes data inline + +// Structure: prefix|kind|pubkey_hash|timestamp|serial|inline_event_data +// Avoids second lookup to evt|serial key +``` + +**Benefits:** +- Single index read retrieves complete event for small posts +- Reduces total database operations by ~40% for timeline queries +- Better cache locality + +**Trade-offs:** +- Increased index size (acceptable for Badger's LSM tree) +- Added complexity in save/query paths + +#### B. 
Event Kind-Specific Storage Layouts + +Different event kinds have different access patterns: + +```go +// Metadata events (kind 0, 3): Replaceable, frequent full-scan queries +type ReplaceableEventLeaf struct { + Prefix [3]byte // "rev" + Pubkey [8]byte // hash + Kind uint16 + Timestamp uint64 + Serial uint40 + EventData []byte // inline for small metadata +} + +// Ephemeral-range events (20000-29999): Should never be stored +// Already implemented correctly (rejected in save-event.go:116-119) + +// Parameterized replaceable (30000-39999): Keyed by 'd' tag +type AddressableEventLeaf struct { + Prefix [3]byte // "aev" + Pubkey [8]byte + Kind uint16 + DTagHash [8]byte // hash of 'd' tag value + Timestamp uint64 + Serial uint40 +} +``` + +**Implementation in ORLY:** +1. Add new index types to `pkg/database/indexes/keys.go` +2. Modify `save-event.go` to choose storage strategy based on kind +3. Update query builders to leverage kind-specific indexes + +--- + +### ✅ 2. Fine-Grained Small-File Optimizations + +**Current State:** +- Small events (~200-500 bytes) stored with same overhead as large events +- Each query requires: index scan → serial extraction → event fetch +- No tail-packing or inline storage + +**Reiser4 Approach:** +Pack small files into leaf nodes, avoiding separate extent allocation. + +**ORLY Application:** + +#### A. Inline Event Storage in Indexes + +For events < 1KB (majority of Nostr events), inline the event data: + +```go +// Current: FullIdPubkey index (53 bytes) +// 3 prefix|5 serial|32 ID|8 pubkey hash|8 timestamp + +// Enhanced: FullIdPubkeyInline (variable size) +// 3 prefix|5 serial|32 ID|8 pubkey hash|8 timestamp|2 size| +``` + +**Code Location:** `pkg/database/indexes/keys.go:220-239` + +**Implementation Strategy:** +```go +func (d *D) SaveEvent(c context.Context, ev *event.E) (replaced bool, err error) { + // ... existing validation ... + + // Serialize event once + eventData := new(bytes.Buffer) + ev.MarshalBinary(eventData) + eventBytes := eventData.Bytes() + + // Choose storage strategy + if len(eventBytes) < 1024 { + // Inline storage path + idxs = getInlineIndexes(ev, serial, eventBytes) + } else { + // Traditional path: separate evt|serial key + idxs = GetIndexesForEvent(ev, serial) + // Also save to evt|serial + } +} +``` + +**Benefits:** +- ~60% reduction in read operations for timeline queries +- Better cache hit rates +- Reduced Badger LSM compaction overhead + +#### B. Batch Small Event Storage + +Group multiple tiny events (e.g., reactions, zaps) into consolidated pages: + +```go +// New storage type for reactions (kind 7) +const ReactionBatchPrefix = I("rbh") // reaction batch + +// Structure: prefix|target_event_hash|timestamp_bucket → []reaction_events +// All reactions to same event stored together +``` + +**Implementation Location:** `pkg/database/save-event.go:106-225` + +--- + +### ✅ 3. Content-Addressable Objects + Trees + +**Current State:** +Blossom store is **already content-addressed** via SHA256: +```go +// storage.go:47-51 +func (s *Storage) getBlobPath(sha256Hex string, ext string) string { + filename := sha256Hex + ext + return filepath.Join(s.blobDir, filename) +} +``` + +**Enhancement Opportunities:** + +#### A. 
Content-Addressable Event Storage + +Events are already identified by SHA256(serialized event), but not stored that way: + +```go +// Current: evt| → event_data +// Proposed: evt| → event_data + +// Benefits: +// - Natural deduplication (duplicate events never stored) +// - Alignment with Nostr event ID semantics +// - Easier replication/verification +``` + +**Trade-off Analysis:** +- **Pro**: Perfect deduplication, cryptographic verification +- **Con**: Lose sequential serial benefits (range scans) +- **Solution**: Hybrid approach - keep serials for ordering, add content-addressed lookup + +```go +// Keep both: +// evt| → event_data (primary, for range scans) +// evh| → serial (secondary, for dedup + verification) +``` + +#### B. Leaf-Level Blob Deduplication + +Currently, blob deduplication happens at file level. Reiser4 suggests **sub-file deduplication**: + +```go +// For large blobs, store chunks content-addressed: +// blob:chunk: → chunk_data (16KB-64KB chunks) +// blob:map: → [chunk_sha256, chunk_sha256, ...] +``` + +**Implementation in `pkg/blossom/storage.go`:** +```go +func (s *Storage) SaveBlobChunked(sha256Hash []byte, data []byte, ...) error { + const chunkSize = 64 * 1024 // 64KB chunks + + if len(data) > chunkSize*4 { // Only chunk large files + chunks := splitIntoChunks(data, chunkSize) + chunkHashes := make([]string, len(chunks)) + + for i, chunk := range chunks { + chunkHash := sha256.Sum256(chunk) + // Store chunk (naturally deduplicated) + s.saveChunk(chunkHash[:], chunk) + chunkHashes[i] = hex.Enc(chunkHash[:]) + } + + // Store chunk map + s.saveBlobMap(sha256Hash, chunkHashes) + } else { + // Small blob, store directly + s.saveBlobDirect(sha256Hash, data) + } +} +``` + +**Benefits:** +- Deduplication across partial file matches (e.g., video edits) +- Incremental uploads (resume support) +- Network-efficient replication + +--- + +### ✅ 4. Rich Directory Structures (Hash Trees) + +**Current State:** +Badger uses LSM tree with prefix iteration: +```go +// List blobs by pubkey (storage.go:259-330) +opts := badger.DefaultIteratorOptions +opts.Prefix = []byte(prefixBlobIndex + pubkeyHex + ":") +it := txn.NewIterator(opts) +``` + +**Enhancement: B-tree Directory Indices** + +For frequently-queried relationships (author's events, tag lookups), use hash-indexed directories: + +```go +// Current: Linear scan of kpc|||... keys +// Enhanced: Hash directory structure + +type AuthorEventDirectory struct { + PubkeyHash [8]byte + Buckets [256]*EventBucket // Hash table in single key +} + +type EventBucket struct { + Count uint16 + Serials []uint40 // Up to N serials, then overflow +} + +// Single read gets author's recent events +// Key: aed| → directory structure +``` + +**Implementation Location:** `pkg/database/query-for-authors.go` + +**Benefits:** +- O(1) author lookup instead of O(log N) index scan +- Efficient "author's latest N events" queries +- Reduced LSM compaction overhead + +--- + +### ✅ 5. 
Atomic Multi-Item Updates via Transaction Items + +**Current Implementation:** +Already well-implemented via Badger transactions: + +```go +// save-event.go:181-211 +err = d.Update(func(txn *badger.Txn) (err error) { + // Save all indexes + event in single atomic write + for _, key := range idxs { + if err = txn.Set(key, nil); chk.E(err) { + return + } + } + if err = txn.Set(kb, vb); chk.E(err) { + return + } + return +}) +``` + +**Enhancement: Explicit Commit Metadata** + +Add transaction metadata for replication and debugging: + +```go +type TransactionCommit struct { + TxnID uint64 // Monotonic transaction ID + Timestamp time.Time + Operations []Operation + Checksum [32]byte +} + +type Operation struct { + Type OpType // SaveEvent, DeleteEvent, SaveBlob + Keys [][]byte + Serial uint64 // For events +} + +// Store: txn| → commit_metadata +// Enables: +// - Transaction log for replication +// - Snapshot at any transaction ID +// - Debugging and audit trails +``` + +**Implementation:** New file `pkg/database/transaction-log.go` + +--- + +### ✅ 6. Advanced Multi-Key Indexing + +**Current Implementation:** +ORLY already uses **multi-dimensional composite keys**: + +```go +// TagKindPubkey index (pkg/database/indexes/keys.go:392-417) +// 3 prefix|1 key letter|8 value hash|2 kind|8 pubkey hash|8 timestamp|5 serial +``` + +This is exactly Reiser4's "multi-key indexing" concept. + +**Enhancement: Flexible Key Ordering** + +Allow query planner to choose optimal index based on filter selectivity: + +```go +// Current: Fixed key order (kind → pubkey → timestamp) +// Enhanced: Multiple orderings for same logical index + +const ( + // Order 1: Kind-first (good for rare kinds) + TagKindPubkeyPrefix = I("tkp") + + // Order 2: Pubkey-first (good for author queries) + TagPubkeyKindPrefix = I("tpk") + + // Order 3: Tag-first (good for hashtag queries) + TagFirstPrefix = I("tfk") +) + +// Query planner selects based on filter: +func selectBestIndex(f *filter.F) IndexType { + if f.Kinds != nil && len(*f.Kinds) < 5 { + return TagKindPubkeyPrefix // Kind is selective + } + if f.Authors != nil && len(*f.Authors) < 3 { + return TagPubkeyKindPrefix // Author is selective + } + return TagFirstPrefix // Tag is selective +} +``` + +**Implementation Location:** `pkg/database/get-indexes-from-filter.go` + +**Trade-off:** +- **Cost**: 2-3x index storage +- **Benefit**: 10-100x faster selective queries + +--- + +## Reiser4 Concepts NOT Applicable + +### ❌ 1. In-Kernel Plugin Architecture +ORLY is user-space application. Not relevant. + +### ❌ 2. Files-as-Directories +Nostr events are not hierarchical. Not applicable. + +### ❌ 3. Dancing Trees / Hyper-Rebalancing +Badger LSM tree handles balancing. Don't reimplement. + +### ❌ 4. Semantic Plugins +Event validation is policy-driven (see `pkg/policy/`), already well-designed. + +--- + +## Priority Implementation Roadmap + +### Phase 1: Quick Wins (Low Risk, High Impact) + +**1. Inline Small Event Storage** (2-3 days) +- **File**: `pkg/database/save-event.go`, `pkg/database/indexes/keys.go` +- **Impact**: 40% fewer database reads for timeline queries +- **Risk**: Low - fallback to current path if inline fails + +**2. Content-Addressed Deduplication** (1 day) +- **File**: `pkg/database/save-event.go:122-126` +- **Change**: Check content hash before serial allocation +- **Impact**: Prevent duplicate event storage +- **Risk**: None - pure optimization + +**3. 
Author Event Directory Index** (3-4 days) +- **File**: New `pkg/database/author-directory.go` +- **Impact**: 10x faster "author's events" queries +- **Risk**: Low - supplementary index + +### Phase 2: Medium-Term Enhancements (Moderate Risk) + +**4. Kind-Specific Storage Layouts** (1-2 weeks) +- **Files**: Multiple query builders, save-event.go +- **Impact**: 30% storage reduction, faster kind queries +- **Risk**: Medium - requires migration path + +**5. Blob Chunk Storage** (1 week) +- **File**: `pkg/blossom/storage.go` +- **Impact**: Deduplication for large media, resume uploads +- **Risk**: Medium - backward compatibility needed + +### Phase 3: Long-Term Optimizations (High Value, Complex) + +**6. Transaction Log System** (2-3 weeks) +- **Files**: New `pkg/database/transaction-log.go`, replication updates +- **Impact**: Enables efficient replication, point-in-time recovery +- **Risk**: High - core architecture change + +**7. Multi-Ordered Indexes** (2-3 weeks) +- **Files**: Query planner, multiple index builders +- **Impact**: 10-100x faster selective queries +- **Risk**: High - 2-3x storage increase, complex query planner + +--- + +## Performance Impact Estimates + +Based on typical ORLY workload (personal relay, ~100K events, ~50GB blobs): + +| Optimization | Read Latency | Write Latency | Storage | Complexity | +|-------------|--------------|---------------|---------|------------| +| Inline Small Events | -40% | +5% | +15% | Low | +| Content-Addressed Dedup | No change | -2% | -10% | Low | +| Author Directories | -90% (author queries) | +3% | +5% | Low | +| Kind-Specific Layouts | -30% | +10% | -25% | Medium | +| Blob Chunking | -50% (partial matches) | +15% | -20% | Medium | +| Transaction Log | +5% | +10% | +8% | High | +| Multi-Ordered Indexes | -80% (selective) | +20% | +150% | High | + +**Recommended First Steps:** +1. Inline small events (biggest win/effort ratio) +2. Content-addressed dedup (zero-risk improvement) +3. Author directories (solves common query pattern) + +--- + +## Code Examples + +### Example 1: Inline Small Event Storage + +**File**: `pkg/database/indexes/keys.go` (add after line 239) + +```go +// FullIdPubkeyInline stores small events inline to avoid second lookup +// +// 3 prefix|5 serial|32 ID|8 pubkey hash|8 timestamp|2 size| +var FullIdPubkeyInline = next() + +func FullIdPubkeyInlineVars() ( + ser *types.Uint40, fid *types.Id, p *types.PubHash, ca *types.Uint64, + size *types.Uint16, data []byte, +) { + return new(types.Uint40), new(types.Id), new(types.PubHash), + new(types.Uint64), new(types.Uint16), nil +} + +func FullIdPubkeyInlineEnc( + ser *types.Uint40, fid *types.Id, p *types.PubHash, ca *types.Uint64, + size *types.Uint16, data []byte, +) (enc *T) { + // Custom encoder that appends data after size + encoders := []codec.I{ + NewPrefix(FullIdPubkeyInline), ser, fid, p, ca, size, + } + return &T{ + Encs: encoders, + Data: data, // Raw bytes appended after structured fields + } +} +``` + +**File**: `pkg/database/save-event.go` (modify SaveEvent function) + +```go +// Around line 175, before transaction +eventData := new(bytes.Buffer) +ev.MarshalBinary(eventData) +eventBytes := eventData.Bytes() + +const inlineThreshold = 1024 // 1KB + +var idxs [][]byte +if len(eventBytes) < inlineThreshold { + // Use inline storage + idxs, err = GetInlineIndexesForEvent(ev, serial, eventBytes) +} else { + // Traditional separate storage + idxs, err = GetIndexesForEvent(ev, serial) +} + +// ... 
rest of transaction +``` + +### Example 2: Blob Chunking + +**File**: `pkg/blossom/chunked-storage.go` (new file) + +```go +package blossom + +import ( + "encoding/json" + "github.com/minio/sha256-simd" + "next.orly.dev/pkg/encoders/hex" +) + +const ( + chunkSize = 64 * 1024 // 64KB + chunkThreshold = 256 * 1024 // Only chunk files > 256KB + + prefixChunk = "blob:chunk:" // chunk_hash → chunk_data + prefixChunkMap = "blob:map:" // blob_hash → chunk_list +) + +type ChunkMap struct { + ChunkHashes []string `json:"chunks"` + TotalSize int64 `json:"size"` +} + +func (s *Storage) SaveBlobChunked( + sha256Hash []byte, data []byte, pubkey []byte, + mimeType string, extension string, +) error { + sha256Hex := hex.Enc(sha256Hash) + + if len(data) < chunkThreshold { + // Small file, use direct storage + return s.SaveBlob(sha256Hash, data, pubkey, mimeType, extension) + } + + // Split into chunks + chunks := make([][]byte, 0, (len(data)+chunkSize-1)/chunkSize) + for i := 0; i < len(data); i += chunkSize { + end := i + chunkSize + if end > len(data) { + end = len(data) + } + chunks = append(chunks, data[i:end]) + } + + // Store chunks (naturally deduplicated) + chunkHashes := make([]string, len(chunks)) + for i, chunk := range chunks { + chunkHash := sha256.Sum256(chunk) + chunkHashes[i] = hex.Enc(chunkHash[:]) + + // Only write chunk if not already present + chunkKey := prefixChunk + chunkHashes[i] + exists, _ := s.hasChunk(chunkKey) + if !exists { + s.db.Update(func(txn *badger.Txn) error { + return txn.Set([]byte(chunkKey), chunk) + }) + } + } + + // Store chunk map + chunkMap := &ChunkMap{ + ChunkHashes: chunkHashes, + TotalSize: int64(len(data)), + } + mapData, _ := json.Marshal(chunkMap) + mapKey := prefixChunkMap + sha256Hex + + s.db.Update(func(txn *badger.Txn) error { + return txn.Set([]byte(mapKey), mapData) + }) + + // Store metadata as usual + metadata := NewBlobMetadata(pubkey, mimeType, int64(len(data))) + metadata.Extension = extension + metaData, _ := metadata.Serialize() + metaKey := prefixBlobMeta + sha256Hex + + s.db.Update(func(txn *badger.Txn) error { + return txn.Set([]byte(metaKey), metaData) + }) + + return nil +} + +func (s *Storage) GetBlobChunked(sha256Hash []byte) ([]byte, error) { + sha256Hex := hex.Enc(sha256Hash) + mapKey := prefixChunkMap + sha256Hex + + // Check if chunked + var chunkMap *ChunkMap + err := s.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(mapKey)) + if err == badger.ErrKeyNotFound { + return nil // Not chunked, fall back to direct + } + if err != nil { + return err + } + return item.Value(func(val []byte) error { + return json.Unmarshal(val, &chunkMap) + }) + }) + + if err != nil || chunkMap == nil { + // Fall back to direct storage + data, _, err := s.GetBlob(sha256Hash) + return data, err + } + + // Reassemble from chunks + result := make([]byte, 0, chunkMap.TotalSize) + for _, chunkHash := range chunkMap.ChunkHashes { + chunkKey := prefixChunk + chunkHash + var chunk []byte + s.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(chunkKey)) + if err != nil { + return err + } + chunk, err = item.ValueCopy(nil) + return err + }) + result = append(result, chunk...) + } + + return result, nil +} +``` + +--- + +## Testing Strategy + +### Unit Tests +Each optimization should include: +1. **Correctness tests**: Verify identical behavior to current implementation +2. **Performance benchmarks**: Measure read/write latency improvements +3. **Storage tests**: Verify space savings + +### Integration Tests +1. 
**Migration tests**: Ensure backward compatibility +2. **Load tests**: Simulate relay workload +3. **Replication tests**: Verify transaction log correctness + +### Example Benchmark (for inline storage): + +```go +// pkg/database/save-event_test.go + +func BenchmarkSaveEventInline(b *testing.B) { + // Small event (typical note) + ev := &event.E{ + Kind: 1, + CreatedAt: uint64(time.Now().Unix()), + Content: "Hello Nostr world!", + // ... rest of event + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + db.SaveEvent(ctx, ev) + } +} + +func BenchmarkQueryEventsInline(b *testing.B) { + // Populate with 10K small events + // ... + + f := &filter.F{ + Authors: tag.NewFromBytesSlice(testPubkey), + Limit: ptrInt(20), + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + events, _ := db.QueryEvents(ctx, f) + if len(events) != 20 { + b.Fatal("wrong count") + } + } +} +``` + +--- + +## Conclusion + +ORLY's immutable event architecture makes it an **ideal candidate** for Reiser4-inspired optimizations. The top recommendations are: + +1. **Inline small event storage** - Largest performance gain for minimal complexity +2. **Content-addressed deduplication** - Zero-risk storage savings +3. **Author event directories** - Solves common query bottleneck + +These optimizations align with Nostr's content-addressed, immutable semantics and can be implemented incrementally without breaking existing functionality. + +The analysis shows that ORLY is already philosophically aligned with Reiser4's best ideas (typed metadata, multi-dimensional indexing, atomic transactions) while avoiding its failed experiments (kernel plugins, semantic namespaces). Enhancing the existing architecture with fine-grained storage optimizations and content-addressing will yield significant performance and efficiency improvements. + +--- + +## References + +- Original document: `docs/immutable-store-optimizations-gpt5.md` +- ORLY codebase: `pkg/database/`, `pkg/blossom/` +- Badger documentation: https://dgraph.io/docs/badger/ +- Nostr protocol: https://github.com/nostr-protocol/nips diff --git a/pkg/crypto/keys/keys.go b/pkg/crypto/keys/keys.go index 86008ce..d9c254c 100644 --- a/pkg/crypto/keys/keys.go +++ b/pkg/crypto/keys/keys.go @@ -66,6 +66,29 @@ func SecretBytesToPubKeyHex(skb []byte) (pk string, err error) { return hex.Enc(signer.Pub()), nil } +// SecretBytesToPubKeyBytes generates a public key bytes from secret key bytes. +func SecretBytesToPubKeyBytes(skb []byte) (pkb []byte, err error) { + var signer *p8k.Signer + if signer, err = p8k.New(); chk.E(err) { + return + } + if err = signer.InitSec(skb); chk.E(err) { + return + } + return signer.Pub(), nil +} + +// SecretBytesToSigner creates a signer from secret key bytes. +func SecretBytesToSigner(skb []byte) (signer *p8k.Signer, err error) { + if signer, err = p8k.New(); chk.E(err) { + return + } + if err = signer.InitSec(skb); chk.E(err) { + return + } + return +} + // IsValid32ByteHex checks that a hex string is a valid 32 bytes lower case hex encoded value as // per nostr NIP-01 spec. 
func IsValid32ByteHex[V []byte | string](pk V) bool { diff --git a/pkg/database/dual-storage_test.go b/pkg/database/dual-storage_test.go new file mode 100644 index 0000000..c9c2f25 --- /dev/null +++ b/pkg/database/dual-storage_test.go @@ -0,0 +1,279 @@ +package database + +import ( + "context" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "next.orly.dev/pkg/encoders/event" + "next.orly.dev/pkg/encoders/kind" + "next.orly.dev/pkg/encoders/tag" + "next.orly.dev/pkg/encoders/timestamp" + "next.orly.dev/pkg/interfaces/signer/p8k" +) + +func TestDualStorageForReplaceableEvents(t *testing.T) { + // Create a temporary directory for the database + tempDir, err := os.MkdirTemp("", "test-dual-db-*") + require.NoError(t, err) + defer os.RemoveAll(tempDir) + + // Create a context and cancel function for the database + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Initialize the database + db, err := New(ctx, cancel, tempDir, "info") + require.NoError(t, err) + defer db.Close() + + // Create a signing key + sign := p8k.MustNew() + require.NoError(t, sign.Generate()) + + t.Run("SmallReplaceableEvent", func(t *testing.T) { + // Create a small replaceable event (kind 0 - profile metadata) + ev := event.New() + ev.Pubkey = sign.Pub() + ev.CreatedAt = timestamp.Now().V + ev.Kind = kind.ProfileMetadata.K + ev.Tags = tag.NewS() + ev.Content = []byte(`{"name":"Alice","about":"Test user"}`) + + require.NoError(t, ev.Sign(sign)) + + // Save the event + replaced, err := db.SaveEvent(ctx, ev) + require.NoError(t, err) + assert.False(t, replaced) + + // Fetch by serial - should work via sev key + ser, err := db.GetSerialById(ev.ID) + require.NoError(t, err) + require.NotNil(t, ser) + + fetched, err := db.FetchEventBySerial(ser) + require.NoError(t, err) + require.NotNil(t, fetched) + + // Verify event contents + assert.Equal(t, ev.ID, fetched.ID) + assert.Equal(t, ev.Pubkey, fetched.Pubkey) + assert.Equal(t, ev.Kind, fetched.Kind) + assert.Equal(t, ev.Content, fetched.Content) + }) + + t.Run("LargeReplaceableEvent", func(t *testing.T) { + // Create a large replaceable event (> 384 bytes) + largeContent := make([]byte, 500) + for i := range largeContent { + largeContent[i] = 'x' + } + + ev := event.New() + ev.Pubkey = sign.Pub() + ev.CreatedAt = timestamp.Now().V + 1 + ev.Kind = kind.ProfileMetadata.K + ev.Tags = tag.NewS() + ev.Content = largeContent + + require.NoError(t, ev.Sign(sign)) + + // Save the event + replaced, err := db.SaveEvent(ctx, ev) + require.NoError(t, err) + assert.True(t, replaced) // Should replace the previous profile + + // Fetch by serial - should work via evt key + ser, err := db.GetSerialById(ev.ID) + require.NoError(t, err) + require.NotNil(t, ser) + + fetched, err := db.FetchEventBySerial(ser) + require.NoError(t, err) + require.NotNil(t, fetched) + + // Verify event contents + assert.Equal(t, ev.ID, fetched.ID) + assert.Equal(t, ev.Content, fetched.Content) + }) +} + +func TestDualStorageForAddressableEvents(t *testing.T) { + // Create a temporary directory for the database + tempDir, err := os.MkdirTemp("", "test-addressable-db-*") + require.NoError(t, err) + defer os.RemoveAll(tempDir) + + // Create a context and cancel function for the database + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Initialize the database + db, err := New(ctx, cancel, tempDir, "info") + require.NoError(t, err) + defer db.Close() + + // Create a signing key + sign := p8k.MustNew() + 
require.NoError(t, sign.Generate()) + + t.Run("SmallAddressableEvent", func(t *testing.T) { + // Create a small addressable event (kind 30023 - long-form content) + ev := event.New() + ev.Pubkey = sign.Pub() + ev.CreatedAt = timestamp.Now().V + ev.Kind = 30023 + ev.Tags = tag.NewS( + tag.NewFromAny("d", []byte("my-article")), + tag.NewFromAny("title", []byte("Test Article")), + ) + ev.Content = []byte("This is a short article.") + + require.NoError(t, ev.Sign(sign)) + + // Save the event + replaced, err := db.SaveEvent(ctx, ev) + require.NoError(t, err) + assert.False(t, replaced) + + // Fetch by serial - should work via sev key + ser, err := db.GetSerialById(ev.ID) + require.NoError(t, err) + require.NotNil(t, ser) + + fetched, err := db.FetchEventBySerial(ser) + require.NoError(t, err) + require.NotNil(t, fetched) + + // Verify event contents + assert.Equal(t, ev.ID, fetched.ID) + assert.Equal(t, ev.Pubkey, fetched.Pubkey) + assert.Equal(t, ev.Kind, fetched.Kind) + assert.Equal(t, ev.Content, fetched.Content) + + // Verify d tag + dTag := fetched.Tags.GetFirst([]byte("d")) + require.NotNil(t, dTag) + assert.Equal(t, []byte("my-article"), dTag.Value()) + }) + + t.Run("AddressableEventWithoutDTag", func(t *testing.T) { + // Create an addressable event without d tag (should be treated as regular event) + ev := event.New() + ev.Pubkey = sign.Pub() + ev.CreatedAt = timestamp.Now().V + 1 + ev.Kind = 30023 + ev.Tags = tag.NewS() + ev.Content = []byte("Article without d tag") + + require.NoError(t, ev.Sign(sign)) + + // Save should fail with missing d tag error + _, err := db.SaveEvent(ctx, ev) + assert.Error(t, err) + assert.Contains(t, err.Error(), "missing a d tag") + }) + + t.Run("ReplaceAddressableEvent", func(t *testing.T) { + // Create first version + ev1 := event.New() + ev1.Pubkey = sign.Pub() + ev1.CreatedAt = timestamp.Now().V + ev1.Kind = 30023 + ev1.Tags = tag.NewS( + tag.NewFromAny("d", []byte("replaceable-article")), + ) + ev1.Content = []byte("Version 1") + + require.NoError(t, ev1.Sign(sign)) + + replaced, err := db.SaveEvent(ctx, ev1) + require.NoError(t, err) + assert.False(t, replaced) + + // Create second version (newer) + ev2 := event.New() + ev2.Pubkey = sign.Pub() + ev2.CreatedAt = ev1.CreatedAt + 10 + ev2.Kind = 30023 + ev2.Tags = tag.NewS( + tag.NewFromAny("d", []byte("replaceable-article")), + ) + ev2.Content = []byte("Version 2") + + require.NoError(t, ev2.Sign(sign)) + + replaced, err = db.SaveEvent(ctx, ev2) + require.NoError(t, err) + assert.True(t, replaced) + + // Try to save older version (should fail) + ev0 := event.New() + ev0.Pubkey = sign.Pub() + ev0.CreatedAt = ev1.CreatedAt - 10 + ev0.Kind = 30023 + ev0.Tags = tag.NewS( + tag.NewFromAny("d", []byte("replaceable-article")), + ) + ev0.Content = []byte("Version 0 (old)") + + require.NoError(t, ev0.Sign(sign)) + + replaced, err = db.SaveEvent(ctx, ev0) + assert.Error(t, err) + assert.Contains(t, err.Error(), "older than existing") + }) +} + +func TestDualStorageRegularEvents(t *testing.T) { + // Create a temporary directory for the database + tempDir, err := os.MkdirTemp("", "test-regular-db-*") + require.NoError(t, err) + defer os.RemoveAll(tempDir) + + // Create a context and cancel function for the database + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Initialize the database + db, err := New(ctx, cancel, tempDir, "info") + require.NoError(t, err) + defer db.Close() + + // Create a signing key + sign := p8k.MustNew() + require.NoError(t, sign.Generate()) + + 
t.Run("SmallRegularEvent", func(t *testing.T) { + // Create a small regular event (kind 1 - note) + ev := event.New() + ev.Pubkey = sign.Pub() + ev.CreatedAt = timestamp.Now().V + ev.Kind = kind.TextNote.K + ev.Tags = tag.NewS() + ev.Content = []byte("Hello, Nostr!") + + require.NoError(t, ev.Sign(sign)) + + // Save the event + replaced, err := db.SaveEvent(ctx, ev) + require.NoError(t, err) + assert.False(t, replaced) + + // Fetch by serial - should work via sev key + ser, err := db.GetSerialById(ev.ID) + require.NoError(t, err) + require.NotNil(t, ser) + + fetched, err := db.FetchEventBySerial(ser) + require.NoError(t, err) + require.NotNil(t, fetched) + + // Verify event contents + assert.Equal(t, ev.ID, fetched.ID) + assert.Equal(t, ev.Content, fetched.Content) + }) +} diff --git a/pkg/database/fetch-event-by-serial.go b/pkg/database/fetch-event-by-serial.go index d8daed0..ea00335 100644 --- a/pkg/database/fetch-event-by-serial.go +++ b/pkg/database/fetch-event-by-serial.go @@ -14,6 +14,55 @@ import ( func (d *D) FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error) { if err = d.View( func(txn *badger.Txn) (err error) { + // Helper function to extract inline event data from key + extractInlineData := func(key []byte, prefixLen int) (*event.E, error) { + if len(key) > prefixLen+2 { + sizeIdx := prefixLen + size := int(key[sizeIdx])<<8 | int(key[sizeIdx+1]) + dataStart := sizeIdx + 2 + + if len(key) >= dataStart+size { + eventData := key[dataStart : dataStart+size] + ev := new(event.E) + if err := ev.UnmarshalBinary(bytes.NewBuffer(eventData)); err != nil { + return nil, fmt.Errorf( + "error unmarshaling inline event (size=%d): %w", + size, err, + ) + } + return ev, nil + } + } + return nil, nil + } + + // Try sev (small event inline) prefix first - Reiser4 optimization + smallBuf := new(bytes.Buffer) + if err = indexes.SmallEventEnc(ser).MarshalWrite(smallBuf); chk.E(err) { + return + } + + opts := badger.DefaultIteratorOptions + opts.Prefix = smallBuf.Bytes() + opts.PrefetchValues = true + opts.PrefetchSize = 1 + it := txn.NewIterator(opts) + defer it.Close() + + it.Rewind() + if it.Valid() { + // Found in sev table - extract inline data + key := it.Item().Key() + // Key format: sev|serial|size_uint16|event_data + if ev, err = extractInlineData(key, 8); err != nil { + return err + } + if ev != nil { + return nil + } + } + + // Not found in sev table, try evt (traditional) prefix buf := new(bytes.Buffer) if err = indexes.EventEnc(ser).MarshalWrite(buf); chk.E(err) { return diff --git a/pkg/database/fetch-events-by-serials.go b/pkg/database/fetch-events-by-serials.go index d4c0042..9ada1db 100644 --- a/pkg/database/fetch-events-by-serials.go +++ b/pkg/database/fetch-events-by-serials.go @@ -15,47 +15,92 @@ import ( func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*event.E, err error) { // Pre-allocate map with estimated capacity to reduce reallocations events = make(map[uint64]*event.E, len(serials)) - + if len(serials) == 0 { return events, nil } - + if err = d.View( func(txn *badger.Txn) (err error) { for _, ser := range serials { + var ev *event.E + + // Try sev (small event inline) prefix first - Reiser4 optimization + smallBuf := new(bytes.Buffer) + if err = indexes.SmallEventEnc(ser).MarshalWrite(smallBuf); chk.E(err) { + // Skip this serial on error but continue with others + err = nil + continue + } + + // Iterate with prefix to find the small event key + opts := badger.DefaultIteratorOptions + opts.Prefix = smallBuf.Bytes() + 
opts.PrefetchValues = true + opts.PrefetchSize = 1 + it := txn.NewIterator(opts) + + it.Rewind() + if it.Valid() { + // Found in sev table - extract inline data + key := it.Item().Key() + // Key format: sev|serial|size_uint16|event_data + if len(key) > 8+2 { // prefix(3) + serial(5) + size(2) = 10 bytes minimum + sizeIdx := 8 // After sev(3) + serial(5) + // Read uint16 big-endian size + size := int(key[sizeIdx])<<8 | int(key[sizeIdx+1]) + dataStart := sizeIdx + 2 + + if len(key) >= dataStart+size { + eventData := key[dataStart : dataStart+size] + ev = new(event.E) + if err = ev.UnmarshalBinary(bytes.NewBuffer(eventData)); err == nil { + events[ser.Get()] = ev + } + // Clean up and continue + it.Close() + err = nil + continue + } + } + } + it.Close() + + // Not found in sev table, try evt (traditional) prefix buf := new(bytes.Buffer) if err = indexes.EventEnc(ser).MarshalWrite(buf); chk.E(err) { // Skip this serial on error but continue with others + err = nil continue } - + var item *badger.Item if item, err = txn.Get(buf.Bytes()); err != nil { // Skip this serial if not found but continue with others err = nil continue } - + var v []byte if v, err = item.ValueCopy(nil); chk.E(err) { // Skip this serial on error but continue with others err = nil continue } - + // Check if we have valid data before attempting to unmarshal if len(v) < 32+32+1+2+1+1+64 { // ID + Pubkey + min varint fields + Sig // Skip this serial - incomplete data continue } - - ev := new(event.E) + + ev = new(event.E) if err = ev.UnmarshalBinary(bytes.NewBuffer(v)); err != nil { // Skip this serial on unmarshal error but continue with others err = nil continue } - + // Successfully unmarshaled event, add to results events[ser.Get()] = ev } @@ -64,6 +109,6 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*ev ); err != nil { return } - + return events, nil } \ No newline at end of file diff --git a/pkg/database/indexes/keys.go b/pkg/database/indexes/keys.go index 0442570..88a05c8 100644 --- a/pkg/database/indexes/keys.go +++ b/pkg/database/indexes/keys.go @@ -55,9 +55,12 @@ type I string func (i I) Write(w io.Writer) (n int, err error) { return w.Write([]byte(i)) } const ( - EventPrefix = I("evt") - IdPrefix = I("eid") - FullIdPubkeyPrefix = I("fpc") // full id, pubkey, created at + EventPrefix = I("evt") + SmallEventPrefix = I("sev") // small event with inline data (<=384 bytes) + ReplaceableEventPrefix = I("rev") // replaceable event (kinds 0,3,10000-19999) with inline data + AddressableEventPrefix = I("aev") // addressable event (kinds 30000-39999) with inline data + IdPrefix = I("eid") + FullIdPubkeyPrefix = I("fpc") // full id, pubkey, created at CreatedAtPrefix = I("c--") // created at KindPrefix = I("kc-") // kind, created at @@ -80,6 +83,12 @@ func Prefix(prf int) (i I) { switch prf { case Event: return EventPrefix + case SmallEvent: + return SmallEventPrefix + case ReplaceableEvent: + return ReplaceableEventPrefix + case AddressableEvent: + return AddressableEventPrefix case Id: return IdPrefix case FullIdPubkey: @@ -125,6 +134,12 @@ func Identify(r io.Reader) (i int, err error) { switch I(b[:]) { case EventPrefix: i = Event + case SmallEventPrefix: + i = SmallEvent + case ReplaceableEventPrefix: + i = ReplaceableEvent + case AddressableEventPrefix: + i = AddressableEvent case IdPrefix: i = Id case FullIdPubkeyPrefix: @@ -200,6 +215,53 @@ func EventEnc(ser *types.Uint40) (enc *T) { } func EventDec(ser *types.Uint40) (enc *T) { return New(NewPrefix(), ser) } +// SmallEvent stores 
events <=384 bytes with inline data to avoid double lookup. +// This is a Reiser4-inspired optimization for small event packing. +// 384 bytes covers: ID(32) + Pubkey(32) + Sig(64) + basic fields + small content +// +// prefix|5 serial|2 size_uint16|data (variable length, max 384 bytes) +var SmallEvent = next() + +func SmallEventVars() (ser *types.Uint40) { return new(types.Uint40) } +func SmallEventEnc(ser *types.Uint40) (enc *T) { + return New(NewPrefix(SmallEvent), ser) +} +func SmallEventDec(ser *types.Uint40) (enc *T) { return New(NewPrefix(), ser) } + +// ReplaceableEvent stores replaceable events (kinds 0,3,10000-19999) with inline data. +// Optimized storage for metadata events that are frequently replaced. +// Key format enables direct lookup by pubkey+kind without additional index traversal. +// +// prefix|8 pubkey_hash|2 kind|2 size_uint16|data (variable length, max 384 bytes) +var ReplaceableEvent = next() + +func ReplaceableEventVars() (p *types.PubHash, ki *types.Uint16) { + return new(types.PubHash), new(types.Uint16) +} +func ReplaceableEventEnc(p *types.PubHash, ki *types.Uint16) (enc *T) { + return New(NewPrefix(ReplaceableEvent), p, ki) +} +func ReplaceableEventDec(p *types.PubHash, ki *types.Uint16) (enc *T) { + return New(NewPrefix(), p, ki) +} + +// AddressableEvent stores parameterized replaceable events (kinds 30000-39999) with inline data. +// Optimized storage for addressable events identified by pubkey+kind+d-tag. +// Key format enables direct lookup without additional index traversal. +// +// prefix|8 pubkey_hash|2 kind|8 dtag_hash|2 size_uint16|data (variable length, max 384 bytes) +var AddressableEvent = next() + +func AddressableEventVars() (p *types.PubHash, ki *types.Uint16, d *types.Ident) { + return new(types.PubHash), new(types.Uint16), new(types.Ident) +} +func AddressableEventEnc(p *types.PubHash, ki *types.Uint16, d *types.Ident) (enc *T) { + return New(NewPrefix(AddressableEvent), p, ki, d) +} +func AddressableEventDec(p *types.PubHash, ki *types.Uint16, d *types.Ident) (enc *T) { + return New(NewPrefix(), p, ki, d) +} + // Id contains a truncated 8-byte hash of an event index. This is the secondary // key of an event, the primary key is the serial found in the Event. // diff --git a/pkg/database/inline-storage_test.go b/pkg/database/inline-storage_test.go new file mode 100644 index 0000000..1e5ba06 --- /dev/null +++ b/pkg/database/inline-storage_test.go @@ -0,0 +1,521 @@ +package database + +import ( + "bytes" + "context" + "os" + "testing" + "time" + + "github.com/dgraph-io/badger/v4" + "lol.mleku.dev/chk" + "next.orly.dev/pkg/database/indexes" + "next.orly.dev/pkg/database/indexes/types" + "next.orly.dev/pkg/encoders/event" + "next.orly.dev/pkg/encoders/hex" + "next.orly.dev/pkg/encoders/kind" + "next.orly.dev/pkg/encoders/tag" + "next.orly.dev/pkg/encoders/timestamp" + "next.orly.dev/pkg/interfaces/signer/p8k" +) + +// TestInlineSmallEventStorage tests the Reiser4-inspired inline storage optimization +// for small events (<=384 bytes). 
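+// The sev key layout exercised here matches the fetch paths in
+// fetch-event-by-serial.go: 3-byte "sev" prefix | 5-byte serial |
+// 2-byte big-endian size | inline event bytes, so the event itself travels
+// in the key and a prefix scan over keys is enough to recover it.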
+func TestInlineSmallEventStorage(t *testing.T) { + // Create a temporary directory for the database + tempDir, err := os.MkdirTemp("", "test-inline-db-*") + if err != nil { + t.Fatalf("Failed to create temporary directory: %v", err) + } + defer os.RemoveAll(tempDir) + + // Create a context and cancel function for the database + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Initialize the database + db, err := New(ctx, cancel, tempDir, "info") + if err != nil { + t.Fatalf("Failed to create database: %v", err) + } + defer db.Close() + + // Create a signer + sign := p8k.MustNew() + if err := sign.Generate(); chk.E(err) { + t.Fatal(err) + } + + // Test Case 1: Small event (should use inline storage) + t.Run("SmallEventInlineStorage", func(t *testing.T) { + smallEvent := event.New() + smallEvent.Kind = kind.TextNote.K + smallEvent.CreatedAt = timestamp.Now().V + smallEvent.Content = []byte("Hello Nostr!") // Small content + smallEvent.Pubkey = sign.Pub() + smallEvent.Tags = tag.NewS() + + // Sign the event + if err := smallEvent.Sign(sign); err != nil { + t.Fatalf("Failed to sign small event: %v", err) + } + + // Save the event + if _, err := db.SaveEvent(ctx, smallEvent); err != nil { + t.Fatalf("Failed to save small event: %v", err) + } + + // Verify it was stored with sev prefix + serial, err := db.GetSerialById(smallEvent.ID) + if err != nil { + t.Fatalf("Failed to get serial for small event: %v", err) + } + + // Check that sev key exists + sevKeyExists := false + db.View(func(txn *badger.Txn) error { + smallBuf := new(bytes.Buffer) + indexes.SmallEventEnc(serial).MarshalWrite(smallBuf) + + opts := badger.DefaultIteratorOptions + opts.Prefix = smallBuf.Bytes() + it := txn.NewIterator(opts) + defer it.Close() + + it.Rewind() + if it.Valid() { + sevKeyExists = true + } + return nil + }) + + if !sevKeyExists { + t.Errorf("Small event was not stored with sev prefix") + } + + // Verify evt key does NOT exist for small event + evtKeyExists := false + db.View(func(txn *badger.Txn) error { + buf := new(bytes.Buffer) + indexes.EventEnc(serial).MarshalWrite(buf) + + _, err := txn.Get(buf.Bytes()) + if err == nil { + evtKeyExists = true + } + return nil + }) + + if evtKeyExists { + t.Errorf("Small event should not have evt key (should only use sev)") + } + + // Fetch and verify the event + fetchedEvent, err := db.FetchEventBySerial(serial) + if err != nil { + t.Fatalf("Failed to fetch small event: %v", err) + } + + if !bytes.Equal(fetchedEvent.ID, smallEvent.ID) { + t.Errorf("Fetched event ID mismatch: got %x, want %x", fetchedEvent.ID, smallEvent.ID) + } + if !bytes.Equal(fetchedEvent.Content, smallEvent.Content) { + t.Errorf("Fetched event content mismatch: got %q, want %q", fetchedEvent.Content, smallEvent.Content) + } + }) + + // Test Case 2: Large event (should use traditional storage) + t.Run("LargeEventTraditionalStorage", func(t *testing.T) { + largeEvent := event.New() + largeEvent.Kind = kind.TextNote.K + largeEvent.CreatedAt = timestamp.Now().V + // Create content larger than 384 bytes + largeContent := make([]byte, 500) + for i := range largeContent { + largeContent[i] = 'x' + } + largeEvent.Content = largeContent + largeEvent.Pubkey = sign.Pub() + largeEvent.Tags = tag.NewS() + + // Sign the event + if err := largeEvent.Sign(sign); err != nil { + t.Fatalf("Failed to sign large event: %v", err) + } + + // Save the event + if _, err := db.SaveEvent(ctx, largeEvent); err != nil { + t.Fatalf("Failed to save large event: %v", err) + } + + // Verify it was 
stored with evt prefix + serial, err := db.GetSerialById(largeEvent.ID) + if err != nil { + t.Fatalf("Failed to get serial for large event: %v", err) + } + + // Check that evt key exists + evtKeyExists := false + db.View(func(txn *badger.Txn) error { + buf := new(bytes.Buffer) + indexes.EventEnc(serial).MarshalWrite(buf) + + _, err := txn.Get(buf.Bytes()) + if err == nil { + evtKeyExists = true + } + return nil + }) + + if !evtKeyExists { + t.Errorf("Large event was not stored with evt prefix") + } + + // Fetch and verify the event + fetchedEvent, err := db.FetchEventBySerial(serial) + if err != nil { + t.Fatalf("Failed to fetch large event: %v", err) + } + + if !bytes.Equal(fetchedEvent.ID, largeEvent.ID) { + t.Errorf("Fetched event ID mismatch: got %x, want %x", fetchedEvent.ID, largeEvent.ID) + } + }) + + // Test Case 3: Batch fetch with mixed small and large events + t.Run("BatchFetchMixedEvents", func(t *testing.T) { + var serials []*types.Uint40 + expectedIDs := make(map[uint64][]byte) + + // Create 10 small events and 10 large events + for i := 0; i < 20; i++ { + ev := event.New() + ev.Kind = kind.TextNote.K + ev.CreatedAt = timestamp.Now().V + int64(i) + ev.Pubkey = sign.Pub() + ev.Tags = tag.NewS() + + // Alternate between small and large + if i%2 == 0 { + ev.Content = []byte("Small event") + } else { + largeContent := make([]byte, 500) + for j := range largeContent { + largeContent[j] = 'x' + } + ev.Content = largeContent + } + + if err := ev.Sign(sign); err != nil { + t.Fatalf("Failed to sign event %d: %v", i, err) + } + + if _, err := db.SaveEvent(ctx, ev); err != nil { + t.Fatalf("Failed to save event %d: %v", i, err) + } + + serial, err := db.GetSerialById(ev.ID) + if err != nil { + t.Fatalf("Failed to get serial for event %d: %v", i, err) + } + + serials = append(serials, serial) + expectedIDs[serial.Get()] = ev.ID + } + + // Batch fetch all events + events, err := db.FetchEventsBySerials(serials) + if err != nil { + t.Fatalf("Failed to batch fetch events: %v", err) + } + + if len(events) != 20 { + t.Errorf("Expected 20 events, got %d", len(events)) + } + + // Verify all events were fetched correctly + for serialValue, ev := range events { + expectedID := expectedIDs[serialValue] + if !bytes.Equal(ev.ID, expectedID) { + t.Errorf("Event ID mismatch for serial %d: got %x, want %x", + serialValue, ev.ID, expectedID) + } + } + }) + + // Test Case 4: Edge case - event near 384 byte threshold + t.Run("ThresholdEvent", func(t *testing.T) { + ev := event.New() + ev.Kind = kind.TextNote.K + ev.CreatedAt = timestamp.Now().V + ev.Pubkey = sign.Pub() + ev.Tags = tag.NewS() + + // Create content near the threshold + testContent := make([]byte, 250) + for i := range testContent { + testContent[i] = 'x' + } + ev.Content = testContent + + if err := ev.Sign(sign); err != nil { + t.Fatalf("Failed to sign threshold event: %v", err) + } + + if _, err := db.SaveEvent(ctx, ev); err != nil { + t.Fatalf("Failed to save threshold event: %v", err) + } + + serial, err := db.GetSerialById(ev.ID) + if err != nil { + t.Fatalf("Failed to get serial: %v", err) + } + + // Fetch and verify + fetchedEvent, err := db.FetchEventBySerial(serial) + if err != nil { + t.Fatalf("Failed to fetch threshold event: %v", err) + } + + if !bytes.Equal(fetchedEvent.ID, ev.ID) { + t.Errorf("Fetched event ID mismatch") + } + }) +} + +// TestInlineStorageMigration tests the migration from traditional to inline storage +func TestInlineStorageMigration(t *testing.T) { + // Create a temporary directory for the database + tempDir, 
err := os.MkdirTemp("", "test-migration-db-*") + if err != nil { + t.Fatalf("Failed to create temporary directory: %v", err) + } + defer os.RemoveAll(tempDir) + + // Create a context and cancel function for the database + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Initialize the database + db, err := New(ctx, cancel, tempDir, "info") + if err != nil { + t.Fatalf("Failed to create database: %v", err) + } + + // Create a signer + sign := p8k.MustNew() + if err := sign.Generate(); chk.E(err) { + t.Fatal(err) + } + + // Manually set database version to 3 (before inline storage migration) + db.writeVersionTag(3) + + // Create and save some small events the old way (manually) + var testEvents []*event.E + for i := 0; i < 5; i++ { + ev := event.New() + ev.Kind = kind.TextNote.K + ev.CreatedAt = timestamp.Now().V + int64(i) + ev.Content = []byte("Test event") + ev.Pubkey = sign.Pub() + ev.Tags = tag.NewS() + + if err := ev.Sign(sign); err != nil { + t.Fatalf("Failed to sign event: %v", err) + } + + // Get next serial + serial, err := db.seq.Next() + if err != nil { + t.Fatalf("Failed to get serial: %v", err) + } + + // Generate indexes + idxs, err := GetIndexesForEvent(ev, serial) + if err != nil { + t.Fatalf("Failed to generate indexes: %v", err) + } + + // Serialize event + eventDataBuf := new(bytes.Buffer) + ev.MarshalBinary(eventDataBuf) + eventData := eventDataBuf.Bytes() + + // Save the old way (evt prefix with value) + db.Update(func(txn *badger.Txn) error { + ser := new(types.Uint40) + ser.Set(serial) + + // Save indexes + for _, key := range idxs { + txn.Set(key, nil) + } + + // Save event the old way + keyBuf := new(bytes.Buffer) + indexes.EventEnc(ser).MarshalWrite(keyBuf) + txn.Set(keyBuf.Bytes(), eventData) + + return nil + }) + + testEvents = append(testEvents, ev) + } + + t.Logf("Created %d test events with old storage format", len(testEvents)) + + // Close and reopen database to trigger migration + db.Close() + + db, err = New(ctx, cancel, tempDir, "info") + if err != nil { + t.Fatalf("Failed to reopen database: %v", err) + } + defer db.Close() + + // Give migration time to complete + time.Sleep(100 * time.Millisecond) + + // Verify all events can still be fetched + for i, ev := range testEvents { + serial, err := db.GetSerialById(ev.ID) + if err != nil { + t.Fatalf("Failed to get serial for event %d after migration: %v", i, err) + } + + fetchedEvent, err := db.FetchEventBySerial(serial) + if err != nil { + t.Fatalf("Failed to fetch event %d after migration: %v", i, err) + } + + if !bytes.Equal(fetchedEvent.ID, ev.ID) { + t.Errorf("Event %d ID mismatch after migration: got %x, want %x", + i, fetchedEvent.ID, ev.ID) + } + + if !bytes.Equal(fetchedEvent.Content, ev.Content) { + t.Errorf("Event %d content mismatch after migration: got %q, want %q", + i, fetchedEvent.Content, ev.Content) + } + + // Verify it's now using inline storage + sevKeyExists := false + db.View(func(txn *badger.Txn) error { + smallBuf := new(bytes.Buffer) + indexes.SmallEventEnc(serial).MarshalWrite(smallBuf) + + opts := badger.DefaultIteratorOptions + opts.Prefix = smallBuf.Bytes() + it := txn.NewIterator(opts) + defer it.Close() + + it.Rewind() + if it.Valid() { + sevKeyExists = true + t.Logf("Event %d (%s) successfully migrated to inline storage", + i, hex.Enc(ev.ID[:8])) + } + return nil + }) + + if !sevKeyExists { + t.Errorf("Event %d was not migrated to inline storage", i) + } + } +} + +// BenchmarkInlineVsTraditionalStorage compares performance of inline vs traditional 
storage +func BenchmarkInlineVsTraditionalStorage(b *testing.B) { + // Create a temporary directory for the database + tempDir, err := os.MkdirTemp("", "bench-inline-db-*") + if err != nil { + b.Fatalf("Failed to create temporary directory: %v", err) + } + defer os.RemoveAll(tempDir) + + // Create a context and cancel function for the database + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Initialize the database + db, err := New(ctx, cancel, tempDir, "info") + if err != nil { + b.Fatalf("Failed to create database: %v", err) + } + defer db.Close() + + // Create a signer + sign := p8k.MustNew() + if err := sign.Generate(); chk.E(err) { + b.Fatal(err) + } + + // Pre-populate database with mix of small and large events + var smallSerials []*types.Uint40 + var largeSerials []*types.Uint40 + + for i := 0; i < 100; i++ { + // Small event + smallEv := event.New() + smallEv.Kind = kind.TextNote.K + smallEv.CreatedAt = timestamp.Now().V + int64(i)*2 + smallEv.Content = []byte("Small test event") + smallEv.Pubkey = sign.Pub() + smallEv.Tags = tag.NewS() + smallEv.Sign(sign) + + db.SaveEvent(ctx, smallEv) + if serial, err := db.GetSerialById(smallEv.ID); err == nil { + smallSerials = append(smallSerials, serial) + } + + // Large event + largeEv := event.New() + largeEv.Kind = kind.TextNote.K + largeEv.CreatedAt = timestamp.Now().V + int64(i)*2 + 1 + largeContent := make([]byte, 500) + for j := range largeContent { + largeContent[j] = 'x' + } + largeEv.Content = largeContent + largeEv.Pubkey = sign.Pub() + largeEv.Tags = tag.NewS() + largeEv.Sign(sign) + + db.SaveEvent(ctx, largeEv) + if serial, err := db.GetSerialById(largeEv.ID); err == nil { + largeSerials = append(largeSerials, serial) + } + } + + b.Run("FetchSmallEventsInline", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + idx := i % len(smallSerials) + db.FetchEventBySerial(smallSerials[idx]) + } + }) + + b.Run("FetchLargeEventsTraditional", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + idx := i % len(largeSerials) + db.FetchEventBySerial(largeSerials[idx]) + } + }) + + b.Run("BatchFetchSmallEvents", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + db.FetchEventsBySerials(smallSerials[:10]) + } + }) + + b.Run("BatchFetchLargeEvents", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + db.FetchEventsBySerials(largeSerials[:10]) + } + }) +} diff --git a/pkg/database/migrations.go b/pkg/database/migrations.go index 311a723..05b7fb9 100644 --- a/pkg/database/migrations.go +++ b/pkg/database/migrations.go @@ -12,10 +12,11 @@ import ( "next.orly.dev/pkg/database/indexes/types" "next.orly.dev/pkg/encoders/event" "next.orly.dev/pkg/encoders/ints" + "next.orly.dev/pkg/encoders/kind" ) const ( - currentVersion uint32 = 3 + currentVersion uint32 = 4 ) func (d *D) RunMigrations() { @@ -82,6 +83,13 @@ func (d *D) RunMigrations() { // bump to version 3 _ = d.writeVersionTag(3) } + if dbVersion < 4 { + log.I.F("migrating to version 4...") + // convert small events to inline storage (Reiser4 optimization) + d.ConvertSmallEventsToInline() + // bump to version 4 + _ = d.writeVersionTag(4) + } } // writeVersionTag writes a new version tag key to the database (no value) @@ -323,3 +331,209 @@ func (d *D) CleanupEphemeralEvents() { log.I.F("cleaned up %d ephemeral events from database", deletedCount) } + +// ConvertSmallEventsToInline migrates small events (<=384 bytes) to inline storage. 
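+// As a rough sketch of the record this produces (field widths assumed from the
+// 5-byte Uint40 serial and the 2-byte big-endian size written below, with
+// prefixLen standing in for the length of the sev prefix), a reader recovers
+// the inline event data from a key roughly like:
+//
+//	payload := key[prefixLen+5:]                 // skip prefix and serial
+//	size := int(payload[0])<<8 | int(payload[1]) // big-endian uint16 length
+//	eventData := payload[2 : 2+size]             // inline event bytes
+//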
+// This is a Reiser4-inspired optimization that stores small event data in the key itself, +// avoiding a second database lookup and improving query performance. +// Also handles replaceable and addressable events with specialized storage. +func (d *D) ConvertSmallEventsToInline() { + log.I.F("converting events to optimized inline storage (Reiser4 optimization)...") + var err error + const smallEventThreshold = 384 + + type EventData struct { + Serial uint64 + EventData []byte + OldKey []byte + IsReplaceable bool + IsAddressable bool + Pubkey []byte + Kind uint16 + DTag []byte + } + + var events []EventData + var convertedCount int + var deletedCount int + + // Helper function for counting by predicate + countBy := func(events []EventData, predicate func(EventData) bool) int { + count := 0 + for _, e := range events { + if predicate(e) { + count++ + } + } + return count + } + + // First pass: identify events in evt table that can benefit from inline storage + if err = d.View( + func(txn *badger.Txn) (err error) { + prf := new(bytes.Buffer) + if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) { + return + } + it := txn.NewIterator(badger.IteratorOptions{Prefix: prf.Bytes()}) + defer it.Close() + + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + var val []byte + if val, err = item.ValueCopy(nil); chk.E(err) { + continue + } + + // Check if event data is small enough for inline storage + if len(val) <= smallEventThreshold { + // Decode event to check if it's replaceable or addressable + ev := new(event.E) + if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) { + continue + } + + // Extract serial from key + key := item.KeyCopy(nil) + ser := indexes.EventVars() + if err = indexes.EventDec(ser).UnmarshalRead(bytes.NewBuffer(key)); chk.E(err) { + continue + } + + eventData := EventData{ + Serial: ser.Get(), + EventData: val, + OldKey: key, + IsReplaceable: kind.IsReplaceable(ev.Kind), + IsAddressable: kind.IsParameterizedReplaceable(ev.Kind), + Pubkey: ev.Pubkey, + Kind: ev.Kind, + } + + // Extract d-tag for addressable events + if eventData.IsAddressable { + dTag := ev.Tags.GetFirst([]byte("d")) + if dTag != nil { + eventData.DTag = dTag.Value() + } + } + + events = append(events, eventData) + } + } + return nil + }, + ); chk.E(err) { + return + } + + log.I.F("found %d events to convert (%d regular, %d replaceable, %d addressable)", + len(events), + countBy(events, func(e EventData) bool { return !e.IsReplaceable && !e.IsAddressable }), + countBy(events, func(e EventData) bool { return e.IsReplaceable }), + countBy(events, func(e EventData) bool { return e.IsAddressable }), + ) + + // Second pass: convert in batches to avoid large transactions + const batchSize = 1000 + for i := 0; i < len(events); i += batchSize { + end := i + batchSize + if end > len(events) { + end = len(events) + } + batch := events[i:end] + + // Write new inline keys and delete old keys + if err = d.Update( + func(txn *badger.Txn) (err error) { + for _, e := range batch { + // First, write the sev key for serial-based access (all small events) + sevKeyBuf := new(bytes.Buffer) + ser := new(types.Uint40) + if err = ser.Set(e.Serial); chk.E(err) { + continue + } + + if err = indexes.SmallEventEnc(ser).MarshalWrite(sevKeyBuf); chk.E(err) { + continue + } + + // Append size as uint16 big-endian (2 bytes) + sizeBytes := []byte{byte(len(e.EventData) >> 8), byte(len(e.EventData))} + sevKeyBuf.Write(sizeBytes) + + // Append event data + sevKeyBuf.Write(e.EventData) + + // Write sev key (no value 
needed) + if err = txn.Set(sevKeyBuf.Bytes(), nil); chk.E(err) { + log.W.F("failed to write sev key for serial %d: %v", e.Serial, err) + continue + } + convertedCount++ + + // Additionally, for replaceable/addressable events, write specialized keys + if e.IsAddressable && len(e.DTag) > 0 { + // Addressable event: aev|pubkey_hash|kind|dtag_hash|size|data + aevKeyBuf := new(bytes.Buffer) + pubHash := new(types.PubHash) + pubHash.FromPubkey(e.Pubkey) + kindVal := new(types.Uint16) + kindVal.Set(e.Kind) + dTagHash := new(types.Ident) + dTagHash.FromIdent(e.DTag) + + if err = indexes.AddressableEventEnc(pubHash, kindVal, dTagHash).MarshalWrite(aevKeyBuf); chk.E(err) { + continue + } + + // Append size and data + aevKeyBuf.Write(sizeBytes) + aevKeyBuf.Write(e.EventData) + + if err = txn.Set(aevKeyBuf.Bytes(), nil); chk.E(err) { + log.W.F("failed to write aev key for serial %d: %v", e.Serial, err) + continue + } + } else if e.IsReplaceable { + // Replaceable event: rev|pubkey_hash|kind|size|data + revKeyBuf := new(bytes.Buffer) + pubHash := new(types.PubHash) + pubHash.FromPubkey(e.Pubkey) + kindVal := new(types.Uint16) + kindVal.Set(e.Kind) + + if err = indexes.ReplaceableEventEnc(pubHash, kindVal).MarshalWrite(revKeyBuf); chk.E(err) { + continue + } + + // Append size and data + revKeyBuf.Write(sizeBytes) + revKeyBuf.Write(e.EventData) + + if err = txn.Set(revKeyBuf.Bytes(), nil); chk.E(err) { + log.W.F("failed to write rev key for serial %d: %v", e.Serial, err) + continue + } + } + + // Delete old evt key + if err = txn.Delete(e.OldKey); chk.E(err) { + log.W.F("failed to delete old event key for serial %d: %v", e.Serial, err) + continue + } + deletedCount++ + } + return nil + }, + ); chk.E(err) { + log.W.F("batch update failed: %v", err) + continue + } + + if (i/batchSize)%10 == 0 && i > 0 { + log.I.F("progress: %d/%d events converted", i, len(events)) + } + } + + log.I.F("migration complete: converted %d events to optimized inline storage, deleted %d old keys", convertedCount, deletedCount) +} diff --git a/pkg/database/save-event.go b/pkg/database/save-event.go index bda5a19..45e5659 100644 --- a/pkg/database/save-event.go +++ b/pkg/database/save-event.go @@ -177,6 +177,19 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) ( return } log.T.F("SaveEvent: generated %d indexes for event %x (kind %d)", len(idxs), ev.ID, ev.Kind) + + // Serialize event once to check size + eventDataBuf := new(bytes.Buffer) + ev.MarshalBinary(eventDataBuf) + eventData := eventDataBuf.Bytes() + + // Determine storage strategy (Reiser4 optimizations) + // 384 bytes covers: ID(32) + Pubkey(32) + Sig(64) + basic fields + small content + const smallEventThreshold = 384 + isSmallEvent := len(eventData) <= smallEventThreshold + isReplaceableEvent := kind.IsReplaceable(ev.Kind) + isAddressableEvent := kind.IsParameterizedReplaceable(ev.Kind) + // Start a transaction to save the event and all its indexes err = d.Update( func(txn *badger.Txn) (err error) { @@ -185,26 +198,98 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) ( if err = ser.Set(serial); chk.E(err) { return } - keyBuf := new(bytes.Buffer) - if err = indexes.EventEnc(ser).MarshalWrite(keyBuf); chk.E(err) { - return - } - kb := keyBuf.Bytes() - - // Pre-allocate value buffer - valueBuf := new(bytes.Buffer) - ev.MarshalBinary(valueBuf) - vb := valueBuf.Bytes() - + // Save each index for _, key := range idxs { if err = txn.Set(key, nil); chk.E(err) { return } } - // write the event - if err = txn.Set(kb, vb); chk.E(err) { - return + + // Write the 
event using optimized storage strategy + // Determine if we should use inline addressable/replaceable storage + useAddressableInline := false + var dTag *tag.T + if isAddressableEvent && isSmallEvent { + dTag = ev.Tags.GetFirst([]byte("d")) + useAddressableInline = dTag != nil + } + + // All small events get a sev key for serial-based access + if isSmallEvent { + // Small event: store inline with sev prefix + // Format: sev|serial|size_uint16|event_data + keyBuf := new(bytes.Buffer) + if err = indexes.SmallEventEnc(ser).MarshalWrite(keyBuf); chk.E(err) { + return + } + // Append size as uint16 big-endian (2 bytes for size up to 65535) + sizeBytes := []byte{byte(len(eventData) >> 8), byte(len(eventData))} + keyBuf.Write(sizeBytes) + // Append event data + keyBuf.Write(eventData) + + if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) { + return + } + log.T.F("SaveEvent: stored small event inline (%d bytes)", len(eventData)) + } else { + // Large event: store separately with evt prefix + keyBuf := new(bytes.Buffer) + if err = indexes.EventEnc(ser).MarshalWrite(keyBuf); chk.E(err) { + return + } + if err = txn.Set(keyBuf.Bytes(), eventData); chk.E(err) { + return + } + log.T.F("SaveEvent: stored large event separately (%d bytes)", len(eventData)) + } + + // Additionally, store replaceable/addressable events with specialized keys for direct access + if useAddressableInline { + // Addressable event: also store with aev|pubkey_hash|kind|dtag_hash|size|data + pubHash := new(types.PubHash) + pubHash.FromPubkey(ev.Pubkey) + kindVal := new(types.Uint16) + kindVal.Set(ev.Kind) + dTagHash := new(types.Ident) + dTagHash.FromIdent(dTag.Value()) + + keyBuf := new(bytes.Buffer) + if err = indexes.AddressableEventEnc(pubHash, kindVal, dTagHash).MarshalWrite(keyBuf); chk.E(err) { + return + } + // Append size as uint16 big-endian + sizeBytes := []byte{byte(len(eventData) >> 8), byte(len(eventData))} + keyBuf.Write(sizeBytes) + // Append event data + keyBuf.Write(eventData) + + if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) { + return + } + log.T.F("SaveEvent: also stored addressable event with specialized key") + } else if isReplaceableEvent && isSmallEvent { + // Replaceable event: also store with rev|pubkey_hash|kind|size|data + pubHash := new(types.PubHash) + pubHash.FromPubkey(ev.Pubkey) + kindVal := new(types.Uint16) + kindVal.Set(ev.Kind) + + keyBuf := new(bytes.Buffer) + if err = indexes.ReplaceableEventEnc(pubHash, kindVal).MarshalWrite(keyBuf); chk.E(err) { + return + } + // Append size as uint16 big-endian + sizeBytes := []byte{byte(len(eventData) >> 8), byte(len(eventData))} + keyBuf.Write(sizeBytes) + // Append event data + keyBuf.Write(eventData) + + if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) { + return + } + log.T.F("SaveEvent: also stored replaceable event with specialized key") } return },