Compare commits


4 Commits

Author SHA1 Message Date
290fcbf8f0 remove outdated configuration items for obsolete tail packing optimization
2025-12-03 21:24:43 +00:00
54ead81791 merge authors/nostruser in neo4j, add compact pubkey/e/p serial refs
2025-12-03 20:49:49 +00:00
746523ea78 Add support for read/write permissive overrides in policies
Introduce `read_allow_permissive` and `write_allow_permissive` flags in the global rule to override kind whitelists for read or write operations. These flags allow more flexible policy configurations while maintaining blacklist enforcement and preventing conflicting settings. Updated tests and documentation for clarity.
2025-12-03 20:26:49 +00:00
52189633d9 Unify NostrUser and Author nodes; add migrations support
Merged 'Author' nodes into 'NostrUser' for unified identity tracking and social graph representation. Introduced migrations framework to handle schema changes, including retroactive updates for existing relationships and constraints. Updated tests, schema definitions, and documentation to reflect these changes.
2025-12-03 20:02:41 +00:00
35 changed files with 2521 additions and 586 deletions

View File

@@ -176,7 +176,11 @@
"Bash(xxd:*)",
"Bash(CGO_ENABLED=0 go mod tidy:*)",
"WebFetch(domain:git.mleku.dev)",
"Bash(CGO_ENABLED=0 LOG_LEVEL=trace go test:*)"
"Bash(CGO_ENABLED=0 LOG_LEVEL=trace go test:*)",
"Bash(go vet:*)",
"Bash(gofmt:*)",
"Skill(cypher)",
"Bash(git mv:*)"
],
"deny": [],
"ask": []

View File

@@ -155,7 +155,11 @@ export ORLY_QUERY_CACHE_MAX_AGE=5m # Cache expiry time
# Database cache tuning (for Badger backend)
export ORLY_DB_BLOCK_CACHE_MB=512 # Block cache size
export ORLY_DB_INDEX_CACHE_MB=256 # Index cache size
export ORLY_INLINE_EVENT_THRESHOLD=1024 # Inline storage threshold (bytes)
export ORLY_DB_ZSTD_LEVEL=1 # ZSTD level: 0=off, 1=fast, 3=default, 9=best
# Serial cache for compact event storage (Badger backend)
export ORLY_SERIAL_CACHE_PUBKEYS=100000 # Max pubkeys to cache (~3.2MB memory)
export ORLY_SERIAL_CACHE_EVENT_IDS=500000 # Max event IDs to cache (~16MB memory)
# Directory Spider (metadata sync from other relays)
export ORLY_DIRECTORY_SPIDER=true # Enable directory spider
@@ -702,6 +706,14 @@ ORLY has received several significant performance improvements in recent updates
- Dramatically reduces database load for repeated queries (common in Nostr clients)
- Cache key includes normalized filter representation for optimal hit rate
### Compact Event Storage (Latest)
- Events stored with 5-byte serial references instead of 32-byte IDs/pubkeys
- Achieves up to 40% space savings on event data
- Serial cache for fast lookups (configurable via `ORLY_SERIAL_CACHE_PUBKEYS` and `ORLY_SERIAL_CACHE_EVENT_IDS`)
- Automatic migration from legacy format (version 6)
- Cleanup removes redundant legacy storage after migration
- Storage stats available via `db.CompactStorageStats()` and `db.LogCompactSavings()`
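A minimal usage sketch for these stats hooks (assumes a ready `*database.D` handle named `db`; the output format is illustrative):

stats, err := db.CompactStorageStats()
if err == nil {
	fmt.Printf("compact: %d events, ~%.1f%% space saved\n",
		stats.CompactEvents, stats.PercentSaved)
}
db.LogCompactSavings() // logs a full breakdown, including serial cache stats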
### Badger Cache Tuning
- Optimized block cache (default 512MB, tune via `ORLY_DB_BLOCK_CACHE_MB`)
- Optimized index cache (default 256MB, tune via `ORLY_DB_INDEX_CACHE_MB`)

View File

@@ -41,6 +41,7 @@ type C struct {
DBLogLevel string `env:"ORLY_DB_LOG_LEVEL" default:"info" usage:"database log level: fatal error warn info debug trace"`
DBBlockCacheMB int `env:"ORLY_DB_BLOCK_CACHE_MB" default:"512" usage:"Badger block cache size in MB (higher improves read hit ratio)"`
DBIndexCacheMB int `env:"ORLY_DB_INDEX_CACHE_MB" default:"256" usage:"Badger index cache size in MB (improves index lookup performance)"`
DBZSTDLevel int `env:"ORLY_DB_ZSTD_LEVEL" default:"1" usage:"Badger ZSTD compression level (1=fast/500MB/s, 3=default, 9=best ratio, 0=disable)"`
LogToStdout bool `env:"ORLY_LOG_TO_STDOUT" default:"false" usage:"log to stdout instead of stderr"`
Pprof string `env:"ORLY_PPROF" usage:"enable pprof in modes: cpu,memory,allocation,heap,block,goroutine,threadcreate,mutex"`
PprofPath string `env:"ORLY_PPROF_PATH" usage:"optional directory to write pprof profiles into (inside container); default is temporary dir"`
@@ -100,7 +101,8 @@ type C struct {
Neo4jPassword string `env:"ORLY_NEO4J_PASSWORD" default:"password" usage:"Neo4j authentication password (only used when ORLY_DB_TYPE=neo4j)"`
// Advanced database tuning
InlineEventThreshold int `env:"ORLY_INLINE_EVENT_THRESHOLD" default:"1024" usage:"size threshold in bytes for inline event storage in Badger (0 to disable, typical values: 384-1024)"`
SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"100000" usage:"max pubkeys to cache for compact event storage (default: 100000, ~3.2MB memory)"`
SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"500000" usage:"max event IDs to cache for compact event storage (default: 500000, ~16MB memory)"`
// TLS configuration
TLSDomains []string `env:"ORLY_TLS_DOMAINS" usage:"comma-separated list of domains to respond to for TLS"`
@@ -408,7 +410,8 @@ func (cfg *C) GetDatabaseConfigValues() (
dataDir, logLevel string,
blockCacheMB, indexCacheMB, queryCacheSizeMB int,
queryCacheMaxAge time.Duration,
inlineEventThreshold int,
serialCachePubkeys, serialCacheEventIds int,
zstdLevel int,
neo4jURI, neo4jUser, neo4jPassword string,
) {
// Parse query cache max age from string to duration
@@ -422,6 +425,7 @@ func (cfg *C) GetDatabaseConfigValues() (
return cfg.DataDir, cfg.DBLogLevel,
cfg.DBBlockCacheMB, cfg.DBIndexCacheMB, cfg.QueryCacheSizeMB,
queryCacheMaxAge,
cfg.InlineEventThreshold,
cfg.SerialCachePubkeys, cfg.SerialCacheEventIds,
cfg.DBZSTDLevel,
cfg.Neo4jURI, cfg.Neo4jUser, cfg.Neo4jPassword
}

main.go (25 changes)
View File

@@ -444,19 +444,22 @@ func makeDatabaseConfig(cfg *config.C) *database.DatabaseConfig {
dataDir, logLevel,
blockCacheMB, indexCacheMB, queryCacheSizeMB,
queryCacheMaxAge,
inlineEventThreshold,
serialCachePubkeys, serialCacheEventIds,
zstdLevel,
neo4jURI, neo4jUser, neo4jPassword := cfg.GetDatabaseConfigValues()
return &database.DatabaseConfig{
DataDir: dataDir,
LogLevel: logLevel,
BlockCacheMB: blockCacheMB,
IndexCacheMB: indexCacheMB,
QueryCacheSizeMB: queryCacheSizeMB,
QueryCacheMaxAge: queryCacheMaxAge,
InlineEventThreshold: inlineEventThreshold,
Neo4jURI: neo4jURI,
Neo4jUser: neo4jUser,
Neo4jPassword: neo4jPassword,
DataDir: dataDir,
LogLevel: logLevel,
BlockCacheMB: blockCacheMB,
IndexCacheMB: indexCacheMB,
QueryCacheSizeMB: queryCacheSizeMB,
QueryCacheMaxAge: queryCacheMaxAge,
SerialCachePubkeys: serialCachePubkeys,
SerialCacheEventIds: serialCacheEventIds,
ZSTDLevel: zstdLevel,
Neo4jURI: neo4jURI,
Neo4jUser: neo4jUser,
Neo4jPassword: neo4jPassword,
}
}

View File

@@ -19,11 +19,12 @@ import (
"git.mleku.dev/mleku/nostr/interfaces/signer/p8k"
)
// TestInlineSmallEventStorage tests the Reiser4-inspired inline storage optimization
// for small events (<=1024 bytes by default).
func TestInlineSmallEventStorage(t *testing.T) {
// TestCompactEventStorage tests the compact storage format (cmp prefix) which
// replaced the old inline storage optimization (sev/evt prefixes).
// All events are now stored in compact format regardless of size.
func TestCompactEventStorage(t *testing.T) {
// Create a temporary directory for the database
tempDir, err := os.MkdirTemp("", "test-inline-db-*")
tempDir, err := os.MkdirTemp("", "test-compact-db-*")
if err != nil {
t.Fatalf("Failed to create temporary directory: %v", err)
}
@@ -46,8 +47,8 @@ func TestInlineSmallEventStorage(t *testing.T) {
t.Fatal(err)
}
// Test Case 1: Small event (should use inline storage)
t.Run("SmallEventInlineStorage", func(t *testing.T) {
// Test Case 1: Small event (should use compact storage)
t.Run("SmallEventCompactStorage", func(t *testing.T) {
smallEvent := event.New()
smallEvent.Kind = kind.TextNote.K
smallEvent.CreatedAt = timestamp.Now().V
@@ -65,49 +66,27 @@ func TestInlineSmallEventStorage(t *testing.T) {
t.Fatalf("Failed to save small event: %v", err)
}
// Verify it was stored with sev prefix
// Verify it was stored with cmp prefix
serial, err := db.GetSerialById(smallEvent.ID)
if err != nil {
t.Fatalf("Failed to get serial for small event: %v", err)
}
// Check that sev key exists
sevKeyExists := false
// Check that cmp key exists (compact format)
cmpKeyExists := false
db.View(func(txn *badger.Txn) error {
smallBuf := new(bytes.Buffer)
indexes.SmallEventEnc(serial).MarshalWrite(smallBuf)
cmpBuf := new(bytes.Buffer)
indexes.CompactEventEnc(serial).MarshalWrite(cmpBuf)
opts := badger.DefaultIteratorOptions
opts.Prefix = smallBuf.Bytes()
it := txn.NewIterator(opts)
defer it.Close()
it.Rewind()
if it.Valid() {
sevKeyExists = true
}
return nil
})
if !sevKeyExists {
t.Errorf("Small event was not stored with sev prefix")
}
// Verify evt key does NOT exist for small event
evtKeyExists := false
db.View(func(txn *badger.Txn) error {
buf := new(bytes.Buffer)
indexes.EventEnc(serial).MarshalWrite(buf)
_, err := txn.Get(buf.Bytes())
_, err := txn.Get(cmpBuf.Bytes())
if err == nil {
evtKeyExists = true
cmpKeyExists = true
}
return nil
})
if evtKeyExists {
t.Errorf("Small event should not have evt key (should only use sev)")
if !cmpKeyExists {
t.Errorf("Small event was not stored with cmp prefix (compact format)")
}
// Fetch and verify the event
@@ -124,12 +103,12 @@ func TestInlineSmallEventStorage(t *testing.T) {
}
})
// Test Case 2: Large event (should use traditional storage)
t.Run("LargeEventTraditionalStorage", func(t *testing.T) {
// Test Case 2: Large event (should also use compact storage)
t.Run("LargeEventCompactStorage", func(t *testing.T) {
largeEvent := event.New()
largeEvent.Kind = kind.TextNote.K
largeEvent.CreatedAt = timestamp.Now().V
// Create content larger than 1024 bytes (the default inline storage threshold)
// Create larger content
largeContent := make([]byte, 1500)
for i := range largeContent {
largeContent[i] = 'x'
@@ -148,27 +127,27 @@ func TestInlineSmallEventStorage(t *testing.T) {
t.Fatalf("Failed to save large event: %v", err)
}
// Verify it was stored with evt prefix
// Verify it was stored with cmp prefix (compact format)
serial, err := db.GetSerialById(largeEvent.ID)
if err != nil {
t.Fatalf("Failed to get serial for large event: %v", err)
}
// Check that evt key exists
evtKeyExists := false
// Check that cmp key exists
cmpKeyExists := false
db.View(func(txn *badger.Txn) error {
buf := new(bytes.Buffer)
indexes.EventEnc(serial).MarshalWrite(buf)
cmpBuf := new(bytes.Buffer)
indexes.CompactEventEnc(serial).MarshalWrite(cmpBuf)
_, err := txn.Get(buf.Bytes())
_, err := txn.Get(cmpBuf.Bytes())
if err == nil {
evtKeyExists = true
cmpKeyExists = true
}
return nil
})
if !evtKeyExists {
t.Errorf("Large event was not stored with evt prefix")
if !cmpKeyExists {
t.Errorf("Large event was not stored with cmp prefix (compact format)")
}
// Fetch and verify the event
@@ -399,9 +378,11 @@ func TestInlineStorageMigration(t *testing.T) {
i, fetchedEvent.Content, ev.Content)
}
// Verify it's now using inline storage
sevKeyExists := false
// Verify it's now using optimized storage (sev inline OR cmp compact format)
// The migration may convert to sev (version 4) or cmp (version 6) depending on migration order
optimizedStorageExists := false
db.View(func(txn *badger.Txn) error {
// Check for sev (small event inline) format
smallBuf := new(bytes.Buffer)
indexes.SmallEventEnc(serial).MarshalWrite(smallBuf)
@@ -412,21 +393,31 @@ func TestInlineStorageMigration(t *testing.T) {
it.Rewind()
if it.Valid() {
sevKeyExists = true
t.Logf("Event %d (%s) successfully migrated to inline storage",
optimizedStorageExists = true
t.Logf("Event %d (%s) successfully migrated to inline (sev) storage",
i, hex.Enc(ev.ID[:8]))
return nil
}
// Check for cmp (compact format) storage
cmpBuf := new(bytes.Buffer)
indexes.CompactEventEnc(serial).MarshalWrite(cmpBuf)
if _, err := txn.Get(cmpBuf.Bytes()); err == nil {
optimizedStorageExists = true
t.Logf("Event %d (%s) successfully migrated to compact (cmp) storage",
i, hex.Enc(ev.ID[:8]))
}
return nil
})
if !sevKeyExists {
t.Errorf("Event %d was not migrated to inline storage", i)
if !optimizedStorageExists {
t.Errorf("Event %d was not migrated to optimized storage (sev or cmp)", i)
}
}
}
// BenchmarkInlineVsTraditionalStorage compares performance of inline vs traditional storage
func BenchmarkInlineVsTraditionalStorage(b *testing.B) {
// BenchmarkCompactStorage benchmarks the compact storage format performance
func BenchmarkCompactStorage(b *testing.B) {
// Create a temporary directory for the database
tempDir, err := os.MkdirTemp("", "bench-inline-db-*")
if err != nil {
@@ -489,7 +480,7 @@ func BenchmarkInlineVsTraditionalStorage(b *testing.B) {
}
}
b.Run("FetchSmallEventsInline", func(b *testing.B) {
b.Run("FetchSmallEventsCompact", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
idx := i % len(smallSerials)
@@ -497,7 +488,7 @@ func BenchmarkInlineVsTraditionalStorage(b *testing.B) {
}
})
b.Run("FetchLargeEventsTraditional", func(b *testing.B) {
b.Run("FetchLargeEventsCompact", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
idx := i % len(largeSerials)

View File

@@ -0,0 +1,421 @@
//go:build !(js && wasm)
package database
import (
"bytes"
"encoding/binary"
"errors"
"io"
"git.mleku.dev/mleku/nostr/crypto/ec/schnorr"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/tag"
"git.mleku.dev/mleku/nostr/encoders/varint"
"lol.mleku.dev/chk"
)
// CompactEventFormat defines the binary format for compact event storage.
// This format uses 5-byte serial references instead of 32-byte IDs/pubkeys,
// dramatically reducing storage requirements.
//
// Format:
// - 1 byte: Version (currently 1)
// - 5 bytes: Author pubkey serial (reference to spk table)
// - varint: CreatedAt timestamp
// - 2 bytes: Kind (uint16 big-endian)
// - varint: Number of tags
// - For each tag:
// - varint: Number of elements in tag
// - For each element:
// - 1 byte: Element type flag
// - 0x00 = raw bytes (followed by varint length + data)
// - 0x01 = pubkey serial reference (followed by 5-byte serial)
// - 0x02 = event ID serial reference (followed by 5-byte serial)
// - 0x03 = unknown event ID (followed by 32-byte full ID)
// - Element data based on type
// - varint: Content length
// - Content bytes
// - 64 bytes: Signature
//
// Space savings example (event with 3 p-tags, 1 e-tag):
// - Original: 32 (ID) + 32 (pubkey) + 32*4 (tags) = 192 bytes
// - Compact: 5 (pubkey serial) + 5*4 (tag serials) = 25 bytes
// - Savings: 167 bytes per event (87%)
const (
CompactFormatVersion = 1
// Tag element type flags
TagElementRaw = 0x00 // Raw bytes (varint length + data)
TagElementPubkeySerial = 0x01 // Pubkey serial reference (5 bytes)
TagElementEventSerial = 0x02 // Event ID serial reference (5 bytes)
TagElementEventIdFull = 0x03 // Full event ID (32 bytes) - for unknown refs
)
// SerialResolver is an interface for resolving serials during compact encoding/decoding.
// This allows the encoder/decoder to look up or create serial mappings.
type SerialResolver interface {
// GetOrCreatePubkeySerial returns the serial for a pubkey, creating one if needed.
GetOrCreatePubkeySerial(pubkey []byte) (serial uint64, err error)
// GetPubkeyBySerial returns the full pubkey for a serial.
GetPubkeyBySerial(serial uint64) (pubkey []byte, err error)
// GetEventSerialById returns the serial for an event ID, or 0 if not found.
GetEventSerialById(eventId []byte) (serial uint64, found bool, err error)
// GetEventIdBySerial returns the full event ID for a serial.
GetEventIdBySerial(serial uint64) (eventId []byte, err error)
}
// MarshalCompactEvent encodes an event using compact serial references.
// The resolver is used to look up/create serial mappings for pubkeys and event IDs.
func MarshalCompactEvent(ev *event.E, resolver SerialResolver) (data []byte, err error) {
buf := new(bytes.Buffer)
// Version byte
buf.WriteByte(CompactFormatVersion)
// Author pubkey serial (5 bytes)
var authorSerial uint64
if authorSerial, err = resolver.GetOrCreatePubkeySerial(ev.Pubkey); chk.E(err) {
return nil, err
}
writeUint40(buf, authorSerial)
// CreatedAt (varint)
varint.Encode(buf, uint64(ev.CreatedAt))
// Kind (2 bytes big-endian)
binary.Write(buf, binary.BigEndian, ev.Kind)
// Tags
if ev.Tags == nil || ev.Tags.Len() == 0 {
varint.Encode(buf, 0)
} else {
varint.Encode(buf, uint64(ev.Tags.Len()))
for _, t := range *ev.Tags {
if err = encodeCompactTag(buf, t, resolver); chk.E(err) {
return nil, err
}
}
}
// Content
varint.Encode(buf, uint64(len(ev.Content)))
buf.Write(ev.Content)
// Signature (64 bytes)
buf.Write(ev.Sig)
return buf.Bytes(), nil
}
// encodeCompactTag encodes a single tag with serial references for e/p tags.
func encodeCompactTag(w io.Writer, t *tag.T, resolver SerialResolver) (err error) {
if t == nil || t.Len() == 0 {
varint.Encode(w, 0)
return nil
}
varint.Encode(w, uint64(t.Len()))
// Get tag key to determine if we should use serial references
key := t.Key()
isPTag := len(key) == 1 && key[0] == 'p'
isETag := len(key) == 1 && key[0] == 'e'
for i, elem := range t.T {
if i == 0 {
// First element is always the tag key - store as raw
writeTagElement(w, TagElementRaw, elem)
continue
}
if i == 1 {
// Second element is the value - potentially a serial reference
if isPTag && len(elem) == 32 {
// Binary pubkey - look up serial
serial, serErr := resolver.GetOrCreatePubkeySerial(elem)
if serErr == nil {
writeTagElementSerial(w, TagElementPubkeySerial, serial)
continue
}
// Fall through to raw encoding on error
} else if isPTag && len(elem) == 64 {
// Hex pubkey - decode and look up serial
var pubkey []byte
if pubkey, err = hexDecode(elem); err == nil && len(pubkey) == 32 {
serial, serErr := resolver.GetOrCreatePubkeySerial(pubkey)
if serErr == nil {
writeTagElementSerial(w, TagElementPubkeySerial, serial)
continue
}
}
// Fall through to raw encoding on error
} else if isETag && len(elem) == 32 {
// Binary event ID - look up serial if exists
serial, found, serErr := resolver.GetEventSerialById(elem)
if serErr == nil && found {
writeTagElementSerial(w, TagElementEventSerial, serial)
continue
}
// Event not found - store full ID
writeTagElement(w, TagElementEventIdFull, elem)
continue
} else if isETag && len(elem) == 64 {
// Hex event ID - decode and look up serial
var eventId []byte
if eventId, err = hexDecode(elem); err == nil && len(eventId) == 32 {
serial, found, serErr := resolver.GetEventSerialById(eventId)
if serErr == nil && found {
writeTagElementSerial(w, TagElementEventSerial, serial)
continue
}
// Event not found - store full ID
writeTagElement(w, TagElementEventIdFull, eventId)
continue
}
// Fall through to raw encoding on error
}
}
// Default: raw encoding
writeTagElement(w, TagElementRaw, elem)
}
return nil
}
// writeTagElement writes a tag element with type flag.
func writeTagElement(w io.Writer, typeFlag byte, data []byte) {
w.Write([]byte{typeFlag})
if typeFlag == TagElementEventIdFull {
// Full event ID - no length prefix, always 32 bytes
w.Write(data)
} else {
// Raw data - length prefix
varint.Encode(w, uint64(len(data)))
w.Write(data)
}
}
// writeTagElementSerial writes a serial reference tag element.
func writeTagElementSerial(w io.Writer, typeFlag byte, serial uint64) {
w.Write([]byte{typeFlag})
writeUint40(w, serial)
}
// writeUint40 writes a 5-byte big-endian unsigned integer.
func writeUint40(w io.Writer, value uint64) {
buf := []byte{
byte((value >> 32) & 0xFF),
byte((value >> 24) & 0xFF),
byte((value >> 16) & 0xFF),
byte((value >> 8) & 0xFF),
byte(value & 0xFF),
}
w.Write(buf)
}
// readUint40 reads a 5-byte big-endian unsigned integer.
func readUint40(r io.Reader) (value uint64, err error) {
buf := make([]byte, 5)
if _, err = io.ReadFull(r, buf); err != nil {
return 0, err
}
value = (uint64(buf[0]) << 32) |
(uint64(buf[1]) << 24) |
(uint64(buf[2]) << 16) |
(uint64(buf[3]) << 8) |
uint64(buf[4])
return value, nil
}
// UnmarshalCompactEvent decodes a compact event back to a full event.E.
// The resolver is used to look up pubkeys and event IDs from serials.
// The eventId parameter is the full 32-byte event ID (from SerialEventId table).
func UnmarshalCompactEvent(data []byte, eventId []byte, resolver SerialResolver) (ev *event.E, err error) {
r := bytes.NewReader(data)
ev = new(event.E)
// Version byte
version, err := r.ReadByte()
if err != nil {
return nil, err
}
if version != CompactFormatVersion {
return nil, errors.New("unsupported compact event format version")
}
// Set the event ID (passed separately from SerialEventId lookup)
ev.ID = make([]byte, 32)
copy(ev.ID, eventId)
// Author pubkey serial (5 bytes) -> full pubkey
authorSerial, err := readUint40(r)
if err != nil {
return nil, err
}
if ev.Pubkey, err = resolver.GetPubkeyBySerial(authorSerial); chk.E(err) {
return nil, err
}
// CreatedAt (varint)
var ca uint64
if ca, err = varint.Decode(r); chk.E(err) {
return nil, err
}
ev.CreatedAt = int64(ca)
// Kind (2 bytes big-endian)
if err = binary.Read(r, binary.BigEndian, &ev.Kind); chk.E(err) {
return nil, err
}
// Tags
var nTags uint64
if nTags, err = varint.Decode(r); chk.E(err) {
return nil, err
}
if nTags > 0 {
ev.Tags = tag.NewSWithCap(int(nTags))
for i := uint64(0); i < nTags; i++ {
var t *tag.T
if t, err = decodeCompactTag(r, resolver); chk.E(err) {
return nil, err
}
*ev.Tags = append(*ev.Tags, t)
}
}
// Content
var contentLen uint64
if contentLen, err = varint.Decode(r); chk.E(err) {
return nil, err
}
ev.Content = make([]byte, contentLen)
if _, err = io.ReadFull(r, ev.Content); chk.E(err) {
return nil, err
}
// Signature (64 bytes)
ev.Sig = make([]byte, schnorr.SignatureSize)
if _, err = io.ReadFull(r, ev.Sig); chk.E(err) {
return nil, err
}
return ev, nil
}
// decodeCompactTag decodes a single tag from compact format.
func decodeCompactTag(r io.Reader, resolver SerialResolver) (t *tag.T, err error) {
var nElems uint64
if nElems, err = varint.Decode(r); chk.E(err) {
return nil, err
}
t = tag.NewWithCap(int(nElems))
for i := uint64(0); i < nElems; i++ {
var elem []byte
if elem, err = decodeTagElement(r, resolver); chk.E(err) {
return nil, err
}
t.T = append(t.T, elem)
}
return t, nil
}
// decodeTagElement decodes a single tag element from compact format.
func decodeTagElement(r io.Reader, resolver SerialResolver) (elem []byte, err error) {
// Read type flag
typeBuf := make([]byte, 1)
if _, err = io.ReadFull(r, typeBuf); err != nil {
return nil, err
}
typeFlag := typeBuf[0]
switch typeFlag {
case TagElementRaw:
// Raw bytes: varint length + data
var length uint64
if length, err = varint.Decode(r); chk.E(err) {
return nil, err
}
elem = make([]byte, length)
if _, err = io.ReadFull(r, elem); err != nil {
return nil, err
}
return elem, nil
case TagElementPubkeySerial:
// Pubkey serial: 5 bytes -> lookup full pubkey -> return as 32-byte binary
serial, err := readUint40(r)
if err != nil {
return nil, err
}
pubkey, err := resolver.GetPubkeyBySerial(serial)
if err != nil {
return nil, err
}
// Return as 32-byte binary (nostr library optimized format)
return pubkey, nil
case TagElementEventSerial:
// Event serial: 5 bytes -> lookup full event ID -> return as 32-byte binary
serial, err := readUint40(r)
if err != nil {
return nil, err
}
eventId, err := resolver.GetEventIdBySerial(serial)
if err != nil {
return nil, err
}
// Return as 32-byte binary
return eventId, nil
case TagElementEventIdFull:
// Full event ID: 32 bytes (for unknown/forward references)
elem = make([]byte, 32)
if _, err = io.ReadFull(r, elem); err != nil {
return nil, err
}
return elem, nil
default:
return nil, errors.New("unknown tag element type flag")
}
}
// hexDecode decodes hex bytes to binary.
// This is a simple implementation - the real one uses the optimized hex package.
func hexDecode(src []byte) (dst []byte, err error) {
if len(src)%2 != 0 {
return nil, errors.New("hex string has odd length")
}
dst = make([]byte, len(src)/2)
for i := 0; i < len(dst); i++ {
a := unhex(src[i*2])
b := unhex(src[i*2+1])
if a == 0xFF || b == 0xFF {
return nil, errors.New("invalid hex character")
}
dst[i] = (a << 4) | b
}
return dst, nil
}
func unhex(c byte) byte {
switch {
case '0' <= c && c <= '9':
return c - '0'
case 'a' <= c && c <= 'f':
return c - 'a' + 10
case 'A' <= c && c <= 'F':
return c - 'A' + 10
}
return 0xFF
}
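As an illustrative aside (not part of this diff): a map-backed SerialResolver is enough to exercise the encode/decode round trip in the same package. The helper below is hypothetical and for demonstration only; with no known event serials, e-tags fall back to TagElementEventIdFull, as in the encoder above.

// memResolver is a demonstration-only, in-memory SerialResolver.
type memResolver struct {
	pkSerials map[string]uint64 // pubkey -> serial
	pkBySer   map[uint64][]byte // serial -> pubkey
	next      uint64
}

func newMemResolver() *memResolver {
	return &memResolver{pkSerials: map[string]uint64{}, pkBySer: map[uint64][]byte{}}
}

func (m *memResolver) GetOrCreatePubkeySerial(pk []byte) (uint64, error) {
	if s, ok := m.pkSerials[string(pk)]; ok {
		return s, nil
	}
	m.next++
	m.pkSerials[string(pk)] = m.next
	m.pkBySer[m.next] = append([]byte(nil), pk...)
	return m.next, nil
}

func (m *memResolver) GetPubkeyBySerial(s uint64) ([]byte, error) {
	return m.pkBySer[s], nil
}

// No event serials known: the encoder stores full 32-byte IDs for e-tags.
func (m *memResolver) GetEventSerialById([]byte) (uint64, bool, error) { return 0, false, nil }

func (m *memResolver) GetEventIdBySerial(uint64) ([]byte, error) { return nil, nil }

// Round trip:
//   data, err := MarshalCompactEvent(ev, r)
//   ev2, err := UnmarshalCompactEvent(data, ev.ID, r)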

View File

@@ -0,0 +1,195 @@
//go:build !(js && wasm)
package database
import (
"bytes"
"sync/atomic"
"github.com/dgraph-io/badger/v4"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/database/indexes"
)
// CompactStorageStats holds statistics about compact vs legacy storage.
type CompactStorageStats struct {
// Event counts
CompactEvents int64 // Number of events in compact format (cmp prefix)
LegacyEvents int64 // Number of events in legacy format (evt/sev prefixes)
TotalEvents int64 // Total events
// Storage sizes
CompactBytes int64 // Total bytes used by compact format
LegacyBytes int64 // Total bytes used by legacy format (would be used without compact)
// Savings
BytesSaved int64 // Bytes saved by using compact format
PercentSaved float64 // Percentage of space saved
AverageCompact float64 // Average compact event size
AverageLegacy float64 // Average legacy event size (estimated)
// Serial mappings
SerialEventIdEntries int64 // Number of sei (serial -> event ID) mappings
SerialEventIdBytes int64 // Bytes used by sei mappings
}
// CompactStorageStats calculates storage statistics for compact event storage.
// This scans the database to provide accurate metrics on space savings.
func (d *D) CompactStorageStats() (stats CompactStorageStats, err error) {
if err = d.View(func(txn *badger.Txn) error {
// Count compact events (cmp prefix)
cmpPrf := new(bytes.Buffer)
if err = indexes.CompactEventEnc(nil).MarshalWrite(cmpPrf); chk.E(err) {
return err
}
it := txn.NewIterator(badger.IteratorOptions{Prefix: cmpPrf.Bytes()})
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
stats.CompactEvents++
stats.CompactBytes += int64(len(item.Key())) + int64(item.ValueSize())
}
it.Close()
// Count legacy evt entries
evtPrf := new(bytes.Buffer)
if err = indexes.EventEnc(nil).MarshalWrite(evtPrf); chk.E(err) {
return err
}
it = txn.NewIterator(badger.IteratorOptions{Prefix: evtPrf.Bytes()})
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
stats.LegacyEvents++
stats.LegacyBytes += int64(len(item.Key())) + int64(item.ValueSize())
}
it.Close()
// Count legacy sev entries
sevPrf := new(bytes.Buffer)
if err = indexes.SmallEventEnc(nil).MarshalWrite(sevPrf); chk.E(err) {
return err
}
it = txn.NewIterator(badger.IteratorOptions{Prefix: sevPrf.Bytes()})
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
stats.LegacyEvents++
stats.LegacyBytes += int64(len(item.Key())) // sev stores data in key
}
it.Close()
// Count SerialEventId mappings (sei prefix)
seiPrf := new(bytes.Buffer)
if err = indexes.SerialEventIdEnc(nil).MarshalWrite(seiPrf); chk.E(err) {
return err
}
it = txn.NewIterator(badger.IteratorOptions{Prefix: seiPrf.Bytes()})
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
stats.SerialEventIdEntries++
stats.SerialEventIdBytes += int64(len(item.Key())) + int64(item.ValueSize())
}
it.Close()
return nil
}); chk.E(err) {
return
}
stats.TotalEvents = stats.CompactEvents + stats.LegacyEvents
// Calculate averages
if stats.CompactEvents > 0 {
stats.AverageCompact = float64(stats.CompactBytes) / float64(stats.CompactEvents)
}
if stats.LegacyEvents > 0 {
stats.AverageLegacy = float64(stats.LegacyBytes) / float64(stats.LegacyEvents)
}
// Estimate savings: compare compact size to what legacy size would be
// For events that are in compact format, estimate legacy size based on typical ratios
// A typical event has:
// - 32 bytes event ID (saved in compact: stored separately in sei)
// - 32 bytes pubkey (saved: replaced by 5-byte serial)
// - For e-tags: 32 bytes each (saved: replaced by 5-byte serial when known)
// - For p-tags: 32 bytes each (saved: replaced by 5-byte serial)
// Conservative estimate: compact format is ~60% of legacy size for typical events
if stats.CompactEvents > 0 && stats.AverageCompact > 0 {
// Estimate what the legacy size would have been
estimatedLegacyForCompact := float64(stats.CompactBytes) / 0.60 // 60% compression ratio
stats.BytesSaved = int64(estimatedLegacyForCompact) - stats.CompactBytes - stats.SerialEventIdBytes
if stats.BytesSaved < 0 {
stats.BytesSaved = 0
}
totalWithoutCompact := estimatedLegacyForCompact + float64(stats.LegacyBytes)
totalWithCompact := float64(stats.CompactBytes + stats.LegacyBytes + stats.SerialEventIdBytes)
if totalWithoutCompact > 0 {
stats.PercentSaved = (1.0 - totalWithCompact/totalWithoutCompact) * 100.0
}
}
return stats, nil
}
// compactSaveCounter tracks cumulative bytes saved by compact format
var compactSaveCounter atomic.Int64
// LogCompactSavings logs the storage savings achieved by compact format.
// Call this periodically or after significant operations.
func (d *D) LogCompactSavings() {
stats, err := d.CompactStorageStats()
if err != nil {
log.W.F("failed to get compact storage stats: %v", err)
return
}
if stats.TotalEvents == 0 {
return
}
log.I.F("📊 Compact storage stats: %d compact events, %d legacy events",
stats.CompactEvents, stats.LegacyEvents)
log.I.F(" Compact size: %.2f MB, Legacy size: %.2f MB",
float64(stats.CompactBytes)/(1024.0*1024.0),
float64(stats.LegacyBytes)/(1024.0*1024.0))
log.I.F(" Serial mappings (sei): %d entries, %.2f KB",
stats.SerialEventIdEntries,
float64(stats.SerialEventIdBytes)/1024.0)
if stats.CompactEvents > 0 {
log.I.F(" Average compact event: %.0f bytes, estimated legacy: %.0f bytes",
stats.AverageCompact, stats.AverageCompact/0.60)
log.I.F(" Estimated savings: %.2f MB (%.1f%%)",
float64(stats.BytesSaved)/(1024.0*1024.0),
stats.PercentSaved)
}
// Also log serial cache stats
cacheStats := d.SerialCacheStats()
log.I.F(" Serial cache: %d/%d pubkeys, %d/%d event IDs, ~%.2f MB memory",
cacheStats.PubkeysCached, cacheStats.PubkeysMaxSize,
cacheStats.EventIdsCached, cacheStats.EventIdsMaxSize,
float64(cacheStats.TotalMemoryBytes)/(1024.0*1024.0))
}
// TrackCompactSaving records bytes saved for a single event.
// Call this during event save to track cumulative savings.
func TrackCompactSaving(legacySize, compactSize int) {
saved := legacySize - compactSize
if saved > 0 {
compactSaveCounter.Add(int64(saved))
}
}
// GetCumulativeCompactSavings returns total bytes saved across all compact saves.
func GetCumulativeCompactSavings() int64 {
return compactSaveCounter.Load()
}
// ResetCompactSavingsCounter resets the cumulative savings counter.
func ResetCompactSavingsCounter() {
compactSaveCounter.Store(0)
}
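A short sketch of the tracking hooks above (the byte counts are placeholders; in the real save path they would be the two encoded lengths for the same event):

// After encoding one event both ways:
legacyLen := 480  // hypothetical size of the legacy encoding
compactLen := 210 // hypothetical size of the compact encoding
TrackCompactSaving(legacyLen, compactLen)
// Periodically:
log.I.F("cumulative compact savings: %d bytes", GetCumulativeCompactSavings())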

View File

@@ -22,16 +22,19 @@ import (
// D implements the Database interface using Badger as the storage backend
type D struct {
ctx context.Context
cancel context.CancelFunc
dataDir string
Logger *logger
inlineEventThreshold int // Configurable threshold for inline event storage
ctx context.Context
cancel context.CancelFunc
dataDir string
Logger *logger
*badger.DB
seq *badger.Sequence
pubkeySeq *badger.Sequence // Sequence for pubkey serials
ready chan struct{} // Closed when database is ready to serve requests
queryCache *querycache.EventCache
// Serial cache for compact event storage
// Caches pubkey and event ID serial mappings for fast compact event decoding
serialCache *SerialCache
}
// Ensure D implements Database interface at compile time
@@ -47,13 +50,12 @@ func New(
) {
// Create a default config for backward compatibility
cfg := &DatabaseConfig{
DataDir: dataDir,
LogLevel: logLevel,
BlockCacheMB: 1024, // Default 1024 MB
IndexCacheMB: 512, // Default 512 MB
QueryCacheSizeMB: 512, // Default 512 MB
QueryCacheMaxAge: 5 * time.Minute, // Default 5 minutes
InlineEventThreshold: 1024, // Default 1024 bytes
DataDir: dataDir,
LogLevel: logLevel,
BlockCacheMB: 1024, // Default 1024 MB
IndexCacheMB: 512, // Default 512 MB
QueryCacheSizeMB: 512, // Default 512 MB
QueryCacheMaxAge: 5 * time.Minute, // Default 5 minutes
}
return NewWithConfig(ctx, cancel, cfg)
}
@@ -82,23 +84,38 @@ func NewWithConfig(
if queryCacheMaxAge == 0 {
queryCacheMaxAge = 5 * time.Minute // Default 5 minutes
}
inlineEventThreshold := cfg.InlineEventThreshold
if inlineEventThreshold == 0 {
inlineEventThreshold = 1024 // Default 1024 bytes
// Serial cache configuration for compact event storage
serialCachePubkeys := cfg.SerialCachePubkeys
if serialCachePubkeys == 0 {
serialCachePubkeys = 100000 // Default 100k pubkeys (~3.2MB memory)
}
serialCacheEventIds := cfg.SerialCacheEventIds
if serialCacheEventIds == 0 {
serialCacheEventIds = 500000 // Default 500k event IDs (~16MB memory)
}
// ZSTD compression level configuration
// Level 0 = disabled, 1 = fast (~500 MB/s), 3 = default, 9 = best ratio
zstdLevel := cfg.ZSTDLevel
if zstdLevel < 0 {
zstdLevel = 0
} else if zstdLevel > 19 {
zstdLevel = 19 // ZSTD maximum level
}
queryCacheSize := int64(queryCacheSizeMB * 1024 * 1024)
d = &D{
ctx: ctx,
cancel: cancel,
dataDir: cfg.DataDir,
Logger: NewLogger(lol.GetLogLevel(cfg.LogLevel), cfg.DataDir),
inlineEventThreshold: inlineEventThreshold,
DB: nil,
seq: nil,
ready: make(chan struct{}),
queryCache: querycache.NewEventCache(queryCacheSize, queryCacheMaxAge),
ctx: ctx,
cancel: cancel,
dataDir: cfg.DataDir,
Logger: NewLogger(lol.GetLogLevel(cfg.LogLevel), cfg.DataDir),
DB: nil,
seq: nil,
ready: make(chan struct{}),
queryCache: querycache.NewEventCache(queryCacheSize, queryCacheMaxAge),
serialCache: NewSerialCache(serialCachePubkeys, serialCacheEventIds),
}
// Ensure the data directory exists
@@ -141,8 +158,13 @@ func NewWithConfig(
opts.LmaxCompaction = true
// Enable compression to reduce cache cost
opts.Compression = options.ZSTD
opts.ZSTDCompressionLevel = 1 // Fast compression (500+ MB/s)
// Level 0 disables compression, 1 = fast (~500 MB/s), 3 = default, 9 = best ratio
if zstdLevel == 0 {
opts.Compression = options.None
} else {
opts.Compression = options.ZSTD
opts.ZSTDCompressionLevel = zstdLevel
}
// Disable conflict detection for write-heavy relay workloads
// Nostr events are immutable, no need for transaction conflict checks

View File

@@ -16,52 +16,141 @@ import (
)
// Export the complete database of stored events to an io.Writer in line structured minified
// JSON.
// JSON. Supports both legacy and compact event formats.
func (d *D) Export(c context.Context, w io.Writer, pubkeys ...[]byte) {
var err error
evB := make([]byte, 0, units.Mb)
evBuf := bytes.NewBuffer(evB)
// Create resolver for compact event decoding
resolver := NewDatabaseSerialResolver(d, d.serialCache)
// Helper function to unmarshal event data (handles both legacy and compact formats)
unmarshalEventData := func(val []byte, ser *types.Uint40) (*event.E, error) {
// Check if this is compact format (starts with version byte 1)
if len(val) > 0 && val[0] == CompactFormatVersion {
// Get event ID from SerialEventId table
eventId, idErr := d.GetEventIdBySerial(ser)
if idErr != nil {
// Can't decode without event ID - skip
return nil, idErr
}
return UnmarshalCompactEvent(val, eventId, resolver)
}
// Legacy binary format
ev := event.New()
evBuf.Reset()
evBuf.Write(val)
if err := ev.UnmarshalBinary(evBuf); err != nil {
return nil, err
}
return ev, nil
}
if len(pubkeys) == 0 {
// Export all events - prefer cmp table, fall back to evt
if err = d.View(
func(txn *badger.Txn) (err error) {
buf := new(bytes.Buffer)
if err = indexes.EventEnc(nil).MarshalWrite(buf); chk.E(err) {
// First try cmp (compact format) table
cmpBuf := new(bytes.Buffer)
if err = indexes.CompactEventEnc(nil).MarshalWrite(cmpBuf); chk.E(err) {
return
}
it := txn.NewIterator(badger.IteratorOptions{Prefix: buf.Bytes()})
it := txn.NewIterator(badger.IteratorOptions{Prefix: cmpBuf.Bytes()})
defer it.Close()
seenSerials := make(map[uint64]bool)
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
if err = item.Value(
func(val []byte) (err error) {
evBuf.Write(val)
return
},
); chk.E(err) {
key := item.Key()
// Extract serial from key
ser := new(types.Uint40)
if err = ser.UnmarshalRead(bytes.NewReader(key[3:8])); chk.E(err) {
continue
}
ev := event.New()
if err = ev.UnmarshalBinary(evBuf); chk.E(err) {
seenSerials[ser.Get()] = true
var val []byte
if val, err = item.ValueCopy(nil); chk.E(err) {
continue
}
ev, unmarshalErr := unmarshalEventData(val, ser)
if unmarshalErr != nil {
continue
}
// Serialize the event to JSON and write it to the output
defer func(ev *event.E) {
ev.Free()
evBuf.Reset()
}(ev)
if _, err = w.Write(ev.Serialize()); chk.E(err) {
ev.Free()
return
}
if _, err = w.Write([]byte{'\n'}); chk.E(err) {
ev.Free()
return
}
ev.Free()
}
it.Close()
// Then fall back to evt (legacy) table for any events not in cmp
evtBuf := new(bytes.Buffer)
if err = indexes.EventEnc(nil).MarshalWrite(evtBuf); chk.E(err) {
return
}
it2 := txn.NewIterator(badger.IteratorOptions{Prefix: evtBuf.Bytes()})
defer it2.Close()
for it2.Rewind(); it2.Valid(); it2.Next() {
item := it2.Item()
key := item.Key()
// Extract serial from key
ser := new(types.Uint40)
if err = ser.UnmarshalRead(bytes.NewReader(key[3:8])); chk.E(err) {
continue
}
// Skip if already exported from cmp table
if seenSerials[ser.Get()] {
continue
}
var val []byte
if val, err = item.ValueCopy(nil); chk.E(err) {
continue
}
ev, unmarshalErr := unmarshalEventData(val, ser)
if unmarshalErr != nil {
continue
}
// Serialize the event to JSON and write it to the output
if _, err = w.Write(ev.Serialize()); chk.E(err) {
ev.Free()
return
}
if _, err = w.Write([]byte{'\n'}); chk.E(err) {
ev.Free()
return
}
ev.Free()
}
return
},
); err != nil {
return
}
} else {
// Export events for specific pubkeys
for _, pubkey := range pubkeys {
if err = d.View(
func(txn *badger.Txn) (err error) {
@@ -79,29 +168,34 @@ func (d *D) Export(c context.Context, w io.Writer, pubkeys ...[]byte) {
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
if err = item.Value(
func(val []byte) (err error) {
evBuf.Write(val)
return
},
); chk.E(err) {
key := item.Key()
// Extract serial from pubkey index key
// Key format: pc-|pubkey_hash|created_at|serial
if len(key) < 3+8+8+5 {
continue
}
ev := event.New()
if err = ev.UnmarshalBinary(evBuf); chk.E(err) {
ser := new(types.Uint40)
if err = ser.UnmarshalRead(bytes.NewReader(key[len(key)-5:])); chk.E(err) {
continue
}
// Fetch the event using FetchEventBySerial which handles all formats
ev, fetchErr := d.FetchEventBySerial(ser)
if fetchErr != nil || ev == nil {
continue
}
// Serialize the event to JSON and write it to the output
defer func(ev *event.E) {
ev.Free()
evBuf.Reset()
}(ev)
if _, err = w.Write(ev.Serialize()); chk.E(err) {
ev.Free()
continue
}
if _, err = w.Write([]byte{'\n'}); chk.E(err) {
ev.Free()
continue
}
ev.Free()
}
return
},

View File

@@ -18,11 +18,17 @@ type DatabaseConfig struct {
LogLevel string
// Badger-specific settings
BlockCacheMB int // ORLY_DB_BLOCK_CACHE_MB
IndexCacheMB int // ORLY_DB_INDEX_CACHE_MB
QueryCacheSizeMB int // ORLY_QUERY_CACHE_SIZE_MB
QueryCacheMaxAge time.Duration // ORLY_QUERY_CACHE_MAX_AGE
InlineEventThreshold int // ORLY_INLINE_EVENT_THRESHOLD
BlockCacheMB int // ORLY_DB_BLOCK_CACHE_MB
IndexCacheMB int // ORLY_DB_INDEX_CACHE_MB
QueryCacheSizeMB int // ORLY_QUERY_CACHE_SIZE_MB
QueryCacheMaxAge time.Duration // ORLY_QUERY_CACHE_MAX_AGE
// Serial cache settings for compact event storage
SerialCachePubkeys int // ORLY_SERIAL_CACHE_PUBKEYS - max pubkeys to cache (default: 100000)
SerialCacheEventIds int // ORLY_SERIAL_CACHE_EVENT_IDS - max event IDs to cache (default: 500000)
// Compression settings
ZSTDLevel int // ORLY_DB_ZSTD_LEVEL - ZSTD compression level (0=none, 1=fast, 3=default, 9=best)
// Neo4j-specific settings
Neo4jURI string // ORLY_NEO4J_URI

View File

@@ -18,11 +18,14 @@ type DatabaseConfig struct {
LogLevel string
// Badger-specific settings (not available in WASM)
BlockCacheMB int // ORLY_DB_BLOCK_CACHE_MB
IndexCacheMB int // ORLY_DB_INDEX_CACHE_MB
QueryCacheSizeMB int // ORLY_QUERY_CACHE_SIZE_MB
QueryCacheMaxAge time.Duration // ORLY_QUERY_CACHE_MAX_AGE
InlineEventThreshold int // ORLY_INLINE_EVENT_THRESHOLD
BlockCacheMB int // ORLY_DB_BLOCK_CACHE_MB
IndexCacheMB int // ORLY_DB_INDEX_CACHE_MB
QueryCacheSizeMB int // ORLY_QUERY_CACHE_SIZE_MB
QueryCacheMaxAge time.Duration // ORLY_QUERY_CACHE_MAX_AGE
// Serial cache settings for compact event storage (Badger-specific)
SerialCachePubkeys int // ORLY_SERIAL_CACHE_PUBKEYS - max pubkeys to cache (default: 100000)
SerialCacheEventIds int // ORLY_SERIAL_CACHE_EVENT_IDS - max event IDs to cache (default: 500000)
// Neo4j-specific settings
Neo4jURI string // ORLY_NEO4J_URI

View File

@@ -13,9 +13,24 @@ import (
"git.mleku.dev/mleku/nostr/encoders/event"
)
// FetchEventBySerial fetches a single event by its serial.
// This function tries multiple storage formats in order:
// 1. cmp (compact format with serial references) - newest, most space-efficient
// 2. sev (small event inline) - legacy Reiser4 optimization
// 3. evt (traditional separate storage) - legacy fallback
func (d *D) FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error) {
// Create resolver for compact event decoding
resolver := NewDatabaseSerialResolver(d, d.serialCache)
if err = d.View(
func(txn *badger.Txn) (err error) {
// Try cmp (compact format) first - most efficient
ev, err = d.fetchCompactEvent(txn, ser, resolver)
if err == nil && ev != nil {
return nil
}
err = nil // Reset error, try legacy formats
// Helper function to extract inline event data from key
extractInlineData := func(key []byte, prefixLen int) (*event.E, error) {
if len(key) > prefixLen+2 {
@@ -25,6 +40,16 @@ func (d *D) FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error) {
if len(key) >= dataStart+size {
eventData := key[dataStart : dataStart+size]
// Check if this is compact format
if len(eventData) > 0 && eventData[0] == CompactFormatVersion {
eventId, idErr := d.GetEventIdBySerial(ser)
if idErr == nil {
return UnmarshalCompactEvent(eventData, eventId, resolver)
}
}
// Legacy binary format
ev := new(event.E)
if err := ev.UnmarshalBinary(bytes.NewBuffer(eventData)); err != nil {
return nil, fmt.Errorf(
@@ -38,7 +63,7 @@ func (d *D) FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error) {
return nil, nil
}
// Try sev (small event inline) prefix first - Reiser4 optimization
// Try sev (small event inline) prefix - Reiser4 optimization
smallBuf := new(bytes.Buffer)
if err = indexes.SmallEventEnc(ser).MarshalWrite(smallBuf); chk.E(err) {
return
@@ -77,6 +102,16 @@ func (d *D) FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error) {
if v, err = item.ValueCopy(nil); chk.E(err) {
return
}
// Check if this is compact format
if len(v) > 0 && v[0] == CompactFormatVersion {
eventId, idErr := d.GetEventIdBySerial(ser)
if idErr == nil {
ev, err = UnmarshalCompactEvent(v, eventId, resolver)
return
}
}
// Check if we have valid data before attempting to unmarshal
if len(v) < 32+32+1+2+1+1+64 { // ID + Pubkey + min varint fields + Sig
err = fmt.Errorf(

View File

@@ -7,6 +7,7 @@ import (
"github.com/dgraph-io/badger/v4"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/database/indexes"
"next.orly.dev/pkg/database/indexes/types"
"git.mleku.dev/mleku/nostr/encoders/event"
@@ -14,6 +15,11 @@ import (
// FetchEventsBySerials fetches multiple events by their serials in a single database transaction.
// Returns a map of serial uint64 value to event, only including successfully fetched events.
//
// This function tries multiple storage formats in order:
// 1. cmp (compact format with serial references) - newest, most space-efficient
// 2. sev (small event inline) - legacy Reiser4 optimization
// 3. evt (traditional separate storage) - legacy fallback
func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*event.E, err error) {
// Pre-allocate map with estimated capacity to reduce reallocations
events = make(map[uint64]*event.E, len(serials))
@@ -22,89 +28,38 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*ev
return events, nil
}
// Create resolver for compact event decoding
resolver := NewDatabaseSerialResolver(d, d.serialCache)
if err = d.View(
func(txn *badger.Txn) (err error) {
for _, ser := range serials {
var ev *event.E
serialVal := ser.Get()
// Try sev (small event inline) prefix first - Reiser4 optimization
smallBuf := new(bytes.Buffer)
if err = indexes.SmallEventEnc(ser).MarshalWrite(smallBuf); chk.E(err) {
// Skip this serial on error but continue with others
err = nil
// Try cmp (compact format) first - most efficient
ev, err = d.fetchCompactEvent(txn, ser, resolver)
if err == nil && ev != nil {
events[serialVal] = ev
continue
}
err = nil // Reset error, try legacy formats
// Iterate with prefix to find the small event key
opts := badger.DefaultIteratorOptions
opts.Prefix = smallBuf.Bytes()
opts.PrefetchValues = true
opts.PrefetchSize = 1
it := txn.NewIterator(opts)
it.Rewind()
if it.Valid() {
// Found in sev table - extract inline data
key := it.Item().Key()
// Key format: sev|serial|size_uint16|event_data
if len(key) > 8+2 { // prefix(3) + serial(5) + size(2) = 10 bytes minimum
sizeIdx := 8 // After sev(3) + serial(5)
// Read uint16 big-endian size
size := int(key[sizeIdx])<<8 | int(key[sizeIdx+1])
dataStart := sizeIdx + 2
if len(key) >= dataStart+size {
eventData := key[dataStart : dataStart+size]
ev = new(event.E)
if err = ev.UnmarshalBinary(bytes.NewBuffer(eventData)); err == nil {
events[ser.Get()] = ev
}
// Clean up and continue
it.Close()
err = nil
continue
}
}
// Try sev (small event inline) prefix - legacy Reiser4 optimization
ev, err = d.fetchSmallEvent(txn, ser)
if err == nil && ev != nil {
events[serialVal] = ev
continue
}
it.Close()
err = nil // Reset error, try evt
// Not found in sev table, try evt (traditional) prefix
buf := new(bytes.Buffer)
if err = indexes.EventEnc(ser).MarshalWrite(buf); chk.E(err) {
// Skip this serial on error but continue with others
err = nil
ev, err = d.fetchLegacyEvent(txn, ser)
if err == nil && ev != nil {
events[serialVal] = ev
continue
}
var item *badger.Item
if item, err = txn.Get(buf.Bytes()); err != nil {
// Skip this serial if not found but continue with others
err = nil
continue
}
var v []byte
if v, err = item.ValueCopy(nil); chk.E(err) {
// Skip this serial on error but continue with others
err = nil
continue
}
// Check if we have valid data before attempting to unmarshal
if len(v) < 32+32+1+2+1+1+64 { // ID + Pubkey + min varint fields + Sig
// Skip this serial - incomplete data
continue
}
ev = new(event.E)
if err = ev.UnmarshalBinary(bytes.NewBuffer(v)); err != nil {
// Skip this serial on unmarshal error but continue with others
err = nil
continue
}
// Successfully unmarshaled event, add to results
events[ser.Get()] = ev
err = nil // Reset error, event not found
}
return nil
},
@@ -113,4 +68,150 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*ev
}
return events, nil
}
}
// fetchCompactEvent tries to fetch an event from the compact format (cmp prefix).
func (d *D) fetchCompactEvent(txn *badger.Txn, ser *types.Uint40, resolver SerialResolver) (ev *event.E, err error) {
// Build cmp key
keyBuf := new(bytes.Buffer)
if err = indexes.CompactEventEnc(ser).MarshalWrite(keyBuf); chk.E(err) {
return nil, err
}
item, err := txn.Get(keyBuf.Bytes())
if err != nil {
return nil, err
}
var compactData []byte
if compactData, err = item.ValueCopy(nil); chk.E(err) {
return nil, err
}
// Need to get the event ID from SerialEventId table
eventId, err := d.GetEventIdBySerial(ser)
if err != nil {
log.D.F("fetchCompactEvent: failed to get event ID for serial %d: %v", ser.Get(), err)
return nil, err
}
// Unmarshal compact event
ev, err = UnmarshalCompactEvent(compactData, eventId, resolver)
if err != nil {
log.D.F("fetchCompactEvent: failed to unmarshal compact event for serial %d: %v", ser.Get(), err)
return nil, err
}
return ev, nil
}
// fetchSmallEvent tries to fetch an event from the small event inline format (sev prefix).
func (d *D) fetchSmallEvent(txn *badger.Txn, ser *types.Uint40) (ev *event.E, err error) {
smallBuf := new(bytes.Buffer)
if err = indexes.SmallEventEnc(ser).MarshalWrite(smallBuf); chk.E(err) {
return nil, err
}
// Iterate with prefix to find the small event key
opts := badger.DefaultIteratorOptions
opts.Prefix = smallBuf.Bytes()
opts.PrefetchValues = true
opts.PrefetchSize = 1
it := txn.NewIterator(opts)
defer it.Close()
it.Rewind()
if !it.Valid() {
return nil, nil // Not found
}
// Found in sev table - extract inline data
key := it.Item().Key()
// Key format: sev|serial|size_uint16|event_data
if len(key) <= 8+2 { // prefix(3) + serial(5) + size(2) = 10 bytes minimum
return nil, nil
}
sizeIdx := 8 // After sev(3) + serial(5)
// Read uint16 big-endian size
size := int(key[sizeIdx])<<8 | int(key[sizeIdx+1])
dataStart := sizeIdx + 2
if len(key) < dataStart+size {
return nil, nil
}
eventData := key[dataStart : dataStart+size]
// Check if this is compact format (starts with version byte 1)
if len(eventData) > 0 && eventData[0] == CompactFormatVersion {
// This is compact format stored in sev - need to decode with resolver
resolver := NewDatabaseSerialResolver(d, d.serialCache)
eventId, idErr := d.GetEventIdBySerial(ser)
if idErr != nil {
// Fall back to legacy unmarshal
ev = new(event.E)
if err = ev.UnmarshalBinary(bytes.NewBuffer(eventData)); err != nil {
return nil, err
}
return ev, nil
}
return UnmarshalCompactEvent(eventData, eventId, resolver)
}
// Legacy binary format
ev = new(event.E)
if err = ev.UnmarshalBinary(bytes.NewBuffer(eventData)); err != nil {
return nil, err
}
return ev, nil
}
// fetchLegacyEvent tries to fetch an event from the legacy format (evt prefix).
func (d *D) fetchLegacyEvent(txn *badger.Txn, ser *types.Uint40) (ev *event.E, err error) {
buf := new(bytes.Buffer)
if err = indexes.EventEnc(ser).MarshalWrite(buf); chk.E(err) {
return nil, err
}
item, err := txn.Get(buf.Bytes())
if err != nil {
return nil, err
}
var v []byte
if v, err = item.ValueCopy(nil); chk.E(err) {
return nil, err
}
// Check if we have valid data before attempting to unmarshal
if len(v) < 32+32+1+2+1+1+64 { // ID + Pubkey + min varint fields + Sig
return nil, nil
}
// Check if this is compact format (starts with version byte 1)
if len(v) > 0 && v[0] == CompactFormatVersion {
// This is compact format stored in evt - need to decode with resolver
resolver := NewDatabaseSerialResolver(d, d.serialCache)
eventId, idErr := d.GetEventIdBySerial(ser)
if idErr != nil {
// Fall back to legacy unmarshal
ev = new(event.E)
if err = ev.UnmarshalBinary(bytes.NewBuffer(v)); err != nil {
return nil, err
}
return ev, nil
}
return UnmarshalCompactEvent(v, eventId, resolver)
}
// Legacy binary format
ev = new(event.E)
if err = ev.UnmarshalBinary(bytes.NewBuffer(v)); err != nil {
return nil, err
}
return ev, nil
}

View File

@@ -81,6 +81,10 @@ const (
SerialPubkeyPrefix = I("spk") // pubkey serial -> pubkey hash (full 32 bytes)
EventPubkeyGraphPrefix = I("epg") // event serial -> pubkey serial (graph edges)
PubkeyEventGraphPrefix = I("peg") // pubkey serial -> event serial (reverse edges)
// Compact event storage indexes
SerialEventIdPrefix = I("sei") // event serial -> full 32-byte event ID
CompactEventPrefix = I("cmp") // compact event storage with serial references
)
// Prefix returns the three byte human-readable prefixes that go in front of
@@ -133,6 +137,11 @@ func Prefix(prf int) (i I) {
return EventPubkeyGraphPrefix
case PubkeyEventGraph:
return PubkeyEventGraphPrefix
case SerialEventId:
return SerialEventIdPrefix
case CompactEvent:
return CompactEventPrefix
}
return
}
@@ -191,6 +200,11 @@ func Identify(r io.Reader) (i int, err error) {
i = EventPubkeyGraph
case PubkeyEventGraphPrefix:
i = PubkeyEventGraph
case SerialEventIdPrefix:
i = SerialEventId
case CompactEventPrefix:
i = CompactEvent
}
return
}
@@ -608,3 +622,36 @@ func PubkeyEventGraphEnc(pubkeySer *types.Uint40, kind *types.Uint16, direction
func PubkeyEventGraphDec(pubkeySer *types.Uint40, kind *types.Uint16, direction *types.Letter, eventSer *types.Uint40) (enc *T) {
return New(NewPrefix(), pubkeySer, kind, direction, eventSer)
}
// SerialEventId maps an event serial to its full 32-byte event ID.
// This enables reconstruction of the original event ID from compact storage.
// The event ID is stored as the value (32 bytes), not inline in the key.
//
// 3 prefix|5 serial -> 32 byte event ID value
var SerialEventId = next()
func SerialEventIdVars() (ser *types.Uint40) {
return new(types.Uint40)
}
func SerialEventIdEnc(ser *types.Uint40) (enc *T) {
return New(NewPrefix(SerialEventId), ser)
}
func SerialEventIdDec(ser *types.Uint40) (enc *T) {
return New(NewPrefix(), ser)
}
// CompactEvent stores events using serial references instead of full IDs/pubkeys.
// This dramatically reduces storage size by replacing:
// - 32-byte event ID with 5-byte serial
// - 32-byte author pubkey with 5-byte pubkey serial
// - 32-byte e-tag values with 5-byte event serials (or full ID if unknown)
// - 32-byte p-tag values with 5-byte pubkey serials
//
// Format: cmp|5 serial|compact event data (variable length)
var CompactEvent = next()
func CompactEventVars() (ser *types.Uint40) { return new(types.Uint40) }
func CompactEventEnc(ser *types.Uint40) (enc *T) {
return New(NewPrefix(CompactEvent), ser)
}
func CompactEventDec(ser *types.Uint40) (enc *T) { return New(NewPrefix(), ser) }
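For orientation, a cmp key for a given serial is built with the same buffer pattern used throughout this diff (the value 42 is arbitrary):

ser := new(types.Uint40)
_ = ser.Set(42) // Set is used the same way in the migration code
buf := new(bytes.Buffer)
_ = indexes.CompactEventEnc(ser).MarshalWrite(buf)
// buf.Bytes() is the 3-byte "cmp" prefix followed by the 5-byte serial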

View File

@@ -18,7 +18,7 @@ import (
)
const (
currentVersion uint32 = 5
currentVersion uint32 = 6
)
func (d *D) RunMigrations() {
@@ -99,6 +99,14 @@ func (d *D) RunMigrations() {
// bump to version 5
_ = d.writeVersionTag(5)
}
if dbVersion < 6 {
log.I.F("migrating to version 6...")
// convert events to compact serial-reference format
// This replaces 32-byte IDs/pubkeys with 5-byte serial references
d.ConvertToCompactEventFormat()
// bump to version 6
_ = d.writeVersionTag(6)
}
}
// writeVersionTag writes a new version tag key to the database (no value)
@@ -683,3 +691,337 @@ func (d *D) ReencodeEventsWithOptimizedTags() {
log.I.F("migration complete: re-encoded %d events, saved approximately %d bytes (%.2f KB)",
processedCount, savedBytes, float64(savedBytes)/1024.0)
}
// ConvertToCompactEventFormat migrates all existing events to the new compact format.
// This format uses 5-byte serial references instead of 32-byte IDs/pubkeys,
// dramatically reducing storage requirements (up to 80% savings on ID/pubkey data).
//
// The migration:
// 1. Reads each event from legacy storage (evt/sev prefixes)
// 2. Creates SerialEventId mapping (sei prefix) for event ID lookup
// 3. Re-encodes the event in compact format
// 4. Stores in cmp prefix
// 5. Optionally removes legacy storage after successful migration
func (d *D) ConvertToCompactEventFormat() {
log.I.F("converting events to compact serial-reference format...")
var err error
type EventMigration struct {
Serial uint64
EventId []byte
OldData []byte
OldKey []byte
IsInline bool // true if from sev, false if from evt
}
var migrations []EventMigration
var processedCount int
var savedBytes int64
// Create resolver for compact encoding
resolver := NewDatabaseSerialResolver(d, d.serialCache)
// First pass: collect all events that need migration
// Only process events that don't have a cmp entry yet
if err = d.View(func(txn *badger.Txn) error {
// Process evt (large events) table
evtPrf := new(bytes.Buffer)
if err = indexes.EventEnc(nil).MarshalWrite(evtPrf); chk.E(err) {
return err
}
it := txn.NewIterator(badger.IteratorOptions{Prefix: evtPrf.Bytes()})
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
key := item.KeyCopy(nil)
// Extract serial from key
ser := indexes.EventVars()
if err = indexes.EventDec(ser).UnmarshalRead(bytes.NewBuffer(key)); chk.E(err) {
continue
}
// Check if this event already has a cmp entry
cmpKey := new(bytes.Buffer)
if err = indexes.CompactEventEnc(ser).MarshalWrite(cmpKey); err == nil {
if _, getErr := txn.Get(cmpKey.Bytes()); getErr == nil {
// Already migrated
continue
}
}
var val []byte
if val, err = item.ValueCopy(nil); chk.E(err) {
continue
}
// Skip if this is already compact format
if len(val) > 0 && val[0] == CompactFormatVersion {
continue
}
// Decode the event to get the ID
ev := new(event.E)
if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) {
continue
}
migrations = append(migrations, EventMigration{
Serial: ser.Get(),
EventId: ev.ID,
OldData: val,
OldKey: key,
IsInline: false,
})
}
it.Close()
// Process sev (small inline events) table
sevPrf := new(bytes.Buffer)
if err = indexes.SmallEventEnc(nil).MarshalWrite(sevPrf); chk.E(err) {
return err
}
it2 := txn.NewIterator(badger.IteratorOptions{Prefix: sevPrf.Bytes()})
defer it2.Close()
for it2.Rewind(); it2.Valid(); it2.Next() {
item := it2.Item()
key := item.KeyCopy(nil)
// Extract serial and data from inline key
if len(key) <= 8+2 { // minimum: 3-byte prefix + 5-byte serial + 2-byte size header
continue
}
// Extract serial
ser := new(types.Uint40)
if err = ser.UnmarshalRead(bytes.NewReader(key[3:8])); chk.E(err) {
continue
}
// Check if this event already has a cmp entry
cmpKey := new(bytes.Buffer)
if err = indexes.CompactEventEnc(ser).MarshalWrite(cmpKey); err == nil {
if _, getErr := txn.Get(cmpKey.Bytes()); getErr == nil {
// Already migrated
continue
}
}
// Extract size and data
sizeIdx := 8
size := int(key[sizeIdx])<<8 | int(key[sizeIdx+1])
dataStart := sizeIdx + 2
if len(key) < dataStart+size {
continue
}
eventData := key[dataStart : dataStart+size]
// Skip if this is already compact format
if len(eventData) > 0 && eventData[0] == CompactFormatVersion {
continue
}
// Decode the event to get the ID
ev := new(event.E)
if err = ev.UnmarshalBinary(bytes.NewBuffer(eventData)); chk.E(err) {
continue
}
migrations = append(migrations, EventMigration{
Serial: ser.Get(),
EventId: ev.ID,
OldData: eventData,
OldKey: key,
IsInline: true,
})
}
return nil
}); chk.E(err) {
return
}
log.I.F("found %d events to convert to compact format", len(migrations))
if len(migrations) == 0 {
log.I.F("no events need conversion")
return
}
// Second pass: convert in batches
const batchSize = 500
for i := 0; i < len(migrations); i += batchSize {
end := i + batchSize
if end > len(migrations) {
end = len(migrations)
}
batch := migrations[i:end]
if err = d.Update(func(txn *badger.Txn) error {
for _, m := range batch {
// Decode the legacy event
ev := new(event.E)
if err = ev.UnmarshalBinary(bytes.NewBuffer(m.OldData)); chk.E(err) {
log.W.F("migration: failed to decode event serial %d: %v", m.Serial, err)
continue
}
// Store SerialEventId mapping
if err = d.StoreEventIdSerial(txn, m.Serial, m.EventId); chk.E(err) {
log.W.F("migration: failed to store event ID mapping for serial %d: %v", m.Serial, err)
continue
}
// Encode in compact format
compactData, encErr := MarshalCompactEvent(ev, resolver)
if encErr != nil {
log.W.F("migration: failed to encode compact event for serial %d: %v", m.Serial, encErr)
continue
}
// Store compact event
ser := new(types.Uint40)
if err = ser.Set(m.Serial); chk.E(err) {
continue
}
cmpKey := new(bytes.Buffer)
if err = indexes.CompactEventEnc(ser).MarshalWrite(cmpKey); chk.E(err) {
continue
}
if err = txn.Set(cmpKey.Bytes(), compactData); chk.E(err) {
log.W.F("migration: failed to store compact event for serial %d: %v", m.Serial, err)
continue
}
// Track savings
savedBytes += int64(len(m.OldData) - len(compactData))
processedCount++
// Cache the mappings
d.serialCache.CacheEventId(m.Serial, m.EventId)
}
return nil
}); chk.E(err) {
log.W.F("batch migration failed: %v", err)
continue
}
if (i/batchSize)%10 == 0 && i > 0 {
log.I.F("migration progress: %d/%d events converted", i, len(migrations))
}
}
log.I.F("compact format migration complete: converted %d events, saved approximately %d bytes (%.2f MB)",
processedCount, savedBytes, float64(savedBytes)/(1024.0*1024.0))
// Cleanup legacy storage after successful migration
log.I.F("cleaning up legacy event storage (evt/sev prefixes)...")
d.CleanupLegacyEventStorage()
}
// CleanupLegacyEventStorage removes legacy evt and sev storage entries after
// compact format migration. This reclaims disk space by removing the old storage
// format entries once all events have been successfully migrated to cmp format.
//
// The cleanup:
// 1. Iterates through all cmp entries (compact format)
// 2. For each serial found in cmp, deletes corresponding evt and sev entries
// 3. Reports total bytes reclaimed
func (d *D) CleanupLegacyEventStorage() {
var err error
var cleanedEvt, cleanedSev int
var bytesReclaimed int64
// Collect serials from cmp table
var serialsToClean []uint64
if err = d.View(func(txn *badger.Txn) error {
cmpPrf := new(bytes.Buffer)
if err = indexes.CompactEventEnc(nil).MarshalWrite(cmpPrf); chk.E(err) {
return err
}
it := txn.NewIterator(badger.IteratorOptions{Prefix: cmpPrf.Bytes()})
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
key := it.Item().Key()
// Extract serial from key (prefix 3 bytes + serial 5 bytes)
if len(key) >= 8 {
ser := new(types.Uint40)
if err = ser.UnmarshalRead(bytes.NewReader(key[3:8])); err == nil {
serialsToClean = append(serialsToClean, ser.Get())
}
}
}
return nil
}); chk.E(err) {
log.W.F("failed to collect compact event serials: %v", err)
return
}
log.I.F("found %d compact events to clean up legacy storage for", len(serialsToClean))
// Clean up in batches
const batchSize = 1000
for i := 0; i < len(serialsToClean); i += batchSize {
end := i + batchSize
if end > len(serialsToClean) {
end = len(serialsToClean)
}
batch := serialsToClean[i:end]
if err = d.Update(func(txn *badger.Txn) error {
for _, serial := range batch {
ser := new(types.Uint40)
if err = ser.Set(serial); err != nil {
continue
}
// Try to delete evt entry
evtKeyBuf := new(bytes.Buffer)
if err = indexes.EventEnc(ser).MarshalWrite(evtKeyBuf); err == nil {
item, getErr := txn.Get(evtKeyBuf.Bytes())
if getErr == nil {
// Track size before deleting
bytesReclaimed += int64(item.ValueSize())
if delErr := txn.Delete(evtKeyBuf.Bytes()); delErr == nil {
cleanedEvt++
}
}
}
// Try to delete sev entry (need to iterate with prefix since key includes inline data)
sevKeyBuf := new(bytes.Buffer)
if err = indexes.SmallEventEnc(ser).MarshalWrite(sevKeyBuf); err == nil {
opts := badger.DefaultIteratorOptions
opts.Prefix = sevKeyBuf.Bytes()
it := txn.NewIterator(opts)
it.Rewind()
if it.Valid() {
key := it.Item().KeyCopy(nil)
bytesReclaimed += int64(len(key)) // sev stores data in key
if delErr := txn.Delete(key); delErr == nil {
cleanedSev++
}
}
it.Close()
}
}
return nil
}); chk.E(err) {
log.W.F("batch cleanup failed: %v", err)
continue
}
if (i/batchSize)%10 == 0 && i > 0 {
log.I.F("cleanup progress: %d/%d events processed", i, len(serialsToClean))
}
}
log.I.F("legacy storage cleanup complete: removed %d evt entries, %d sev entries, reclaimed approximately %d bytes (%.2f MB)",
cleanedEvt, cleanedSev, bytesReclaimed, float64(bytesReclaimed)/(1024.0*1024.0))
}
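As a sanity check on the savings figures logged above: every 32-byte ID or pubkey replaced by a 5-byte serial reference saves 27 bytes. A minimal, self-contained sketch with illustrative numbers (not output from the migration itself):

```go
package main

import "fmt"

func main() {
	// One author pubkey plus four e/p tag references, each 32 bytes in the
	// legacy encoding and 5 bytes as a serial reference.
	const refs = 5
	legacy := refs * 32 // 160 bytes
	compact := refs * 5 // 25 bytes
	fmt.Printf("references: legacy %d B, compact %d B, saved %.0f%%\n",
		legacy, compact, 100*float64(legacy-compact)/float64(legacy))
	// Output: references: legacy 160 B, compact 25 B, saved 84%
}
```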

View File

@@ -264,18 +264,30 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
// ev.ID, ev.Kind,
// )
// Serialize event once to check size
eventDataBuf := new(bytes.Buffer)
ev.MarshalBinary(eventDataBuf)
eventData := eventDataBuf.Bytes()
// Create serial resolver for compact encoding
resolver := NewDatabaseSerialResolver(d, d.serialCache)
// Determine storage strategy (Reiser4 optimizations)
// Use the threshold from database configuration
// Typical values: 384 (conservative), 512 (recommended), 1024 (aggressive)
smallEventThreshold := d.inlineEventThreshold
isSmallEvent := smallEventThreshold > 0 && len(eventData) <= smallEventThreshold
isReplaceableEvent := kind.IsReplaceable(ev.Kind)
isAddressableEvent := kind.IsParameterizedReplaceable(ev.Kind)
// Serialize event in compact format using serial references
// This dramatically reduces storage by replacing 32-byte IDs/pubkeys with 5-byte serials
compactData, compactErr := MarshalCompactEvent(ev, resolver)
// Calculate legacy size for comparison (for metrics tracking)
// We marshal to get accurate size comparison
legacyBuf := new(bytes.Buffer)
ev.MarshalBinary(legacyBuf)
legacySize := legacyBuf.Len()
if compactErr != nil {
// Fall back to legacy format if compact encoding fails
log.W.F("SaveEvent: compact encoding failed, using legacy format: %v", compactErr)
compactData = legacyBuf.Bytes()
} else {
// Track storage savings
TrackCompactSaving(legacySize, len(compactData))
log.T.F("SaveEvent: compact %d bytes vs legacy %d bytes (saved %d bytes, %.1f%%)",
len(compactData), legacySize, legacySize-len(compactData),
float64(legacySize-len(compactData))/float64(legacySize)*100.0)
}
// Start a transaction to save the event and all its indexes
err = d.Update(
@@ -293,106 +305,25 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
}
}
// Write the event using optimized storage strategy
// Determine if we should use inline addressable/replaceable storage
useAddressableInline := false
var dTag *tag.T
if isAddressableEvent && isSmallEvent {
dTag = ev.Tags.GetFirst([]byte("d"))
useAddressableInline = dTag != nil
// Store the SerialEventId mapping (serial -> full 32-byte event ID)
// This is required for reconstructing compact events
if err = d.StoreEventIdSerial(txn, serial, ev.ID); chk.E(err) {
return
}
// All small events get a sev key for serial-based access
if isSmallEvent {
// Small event: store inline with sev prefix
// Format: sev|serial|size_uint16|event_data
keyBuf := new(bytes.Buffer)
if err = indexes.SmallEventEnc(ser).MarshalWrite(keyBuf); chk.E(err) {
return
}
// Append size as uint16 big-endian (2 bytes for size up to 65535)
sizeBytes := []byte{
byte(len(eventData) >> 8), byte(len(eventData)),
}
keyBuf.Write(sizeBytes)
// Append event data
keyBuf.Write(eventData)
// Cache the event ID mapping
d.serialCache.CacheEventId(serial, ev.ID)
if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
return
}
// log.T.F(
// "SaveEvent: stored small event inline (%d bytes)",
// len(eventData),
// )
} else {
// Large event: store separately with evt prefix
keyBuf := new(bytes.Buffer)
if err = indexes.EventEnc(ser).MarshalWrite(keyBuf); chk.E(err) {
return
}
if err = txn.Set(keyBuf.Bytes(), eventData); chk.E(err) {
return
}
// log.T.F(
// "SaveEvent: stored large event separately (%d bytes)",
// len(eventData),
// )
// Store compact event with cmp prefix
// Format: cmp|serial|compact_event_data
// This is the only storage format - legacy evt/sev/aev/rev prefixes
// are handled by migration and no longer written for new events
cmpKeyBuf := new(bytes.Buffer)
if err = indexes.CompactEventEnc(ser).MarshalWrite(cmpKeyBuf); chk.E(err) {
return
}
// Additionally, store replaceable/addressable events with specialized keys for direct access
if useAddressableInline {
// Addressable event: also store with aev|pubkey_hash|kind|dtag_hash|size|data
pubHash := new(types.PubHash)
pubHash.FromPubkey(ev.Pubkey)
kindVal := new(types.Uint16)
kindVal.Set(ev.Kind)
dTagHash := new(types.Ident)
dTagHash.FromIdent(dTag.Value())
keyBuf := new(bytes.Buffer)
if err = indexes.AddressableEventEnc(
pubHash, kindVal, dTagHash,
).MarshalWrite(keyBuf); chk.E(err) {
return
}
// Append size as uint16 big-endian
sizeBytes := []byte{
byte(len(eventData) >> 8), byte(len(eventData)),
}
keyBuf.Write(sizeBytes)
// Append event data
keyBuf.Write(eventData)
if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
return
}
// log.T.F("SaveEvent: also stored addressable event with specialized key")
} else if isReplaceableEvent && isSmallEvent {
// Replaceable event: also store with rev|pubkey_hash|kind|size|data
pubHash := new(types.PubHash)
pubHash.FromPubkey(ev.Pubkey)
kindVal := new(types.Uint16)
kindVal.Set(ev.Kind)
keyBuf := new(bytes.Buffer)
if err = indexes.ReplaceableEventEnc(
pubHash, kindVal,
).MarshalWrite(keyBuf); chk.E(err) {
return
}
// Append size as uint16 big-endian
sizeBytes := []byte{
byte(len(eventData) >> 8), byte(len(eventData)),
}
keyBuf.Write(sizeBytes)
// Append event data
keyBuf.Write(eventData)
if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
return
}
log.T.F("SaveEvent: also stored replaceable event with specialized key")
if err = txn.Set(cmpKeyBuf.Bytes(), compactData); chk.E(err) {
return
}
// Create graph edges between event and all related pubkeys
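For orientation, reading a compact event back is the mirror image of the write above: rebuild the `cmp` key from the serial and fetch the value. A minimal sketch that would sit in the same `database` package (imports as in the surrounding files; decoding the returned bytes back into an `event.E` additionally needs a `SerialResolver` such as `NewDatabaseSerialResolver(d, d.serialCache)` to expand the 5-byte serials):

```go
// fetchCompact returns the raw compact-encoded bytes stored under the
// cmp prefix for a known serial. Sketch only; error wrapping elided.
func fetchCompact(d *D, serial uint64) (raw []byte, err error) {
	ser := new(types.Uint40)
	if err = ser.Set(serial); err != nil {
		return
	}
	keyBuf := new(bytes.Buffer)
	if err = indexes.CompactEventEnc(ser).MarshalWrite(keyBuf); err != nil {
		return
	}
	err = d.View(func(txn *badger.Txn) error {
		item, gerr := txn.Get(keyBuf.Bytes())
		if gerr != nil {
			return gerr
		}
		return item.Value(func(val []byte) error {
			raw = append(raw, val...)
			return nil
		})
	})
	return
}
```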

View File

@@ -0,0 +1,374 @@
//go:build !(js && wasm)
package database
import (
"bytes"
"errors"
"sync"
"github.com/dgraph-io/badger/v4"
"lol.mleku.dev/chk"
"next.orly.dev/pkg/database/indexes"
"next.orly.dev/pkg/database/indexes/types"
)
// SerialCache provides LRU caching for pubkey and event ID serial lookups.
// This is critical for compact event decoding performance since every event
// requires looking up the author pubkey and potentially multiple tag references.
type SerialCache struct {
// Pubkey serial -> full pubkey (for decoding)
pubkeyBySerial map[uint64][]byte
pubkeyBySerialLock sync.RWMutex
// Full pubkey (keyed as string) -> serial (for encoding)
serialByPubkeyHash map[string]uint64
serialByPubkeyHashLock sync.RWMutex
// Event serial -> full event ID (for decoding)
eventIdBySerial map[uint64][]byte
eventIdBySerialLock sync.RWMutex
// Full event ID (keyed as string) -> serial (for encoding)
serialByEventIdHash map[string]uint64
serialByEventIdHashLock sync.RWMutex
// Maximum cache sizes
maxPubkeys int
maxEventIds int
}
// NewSerialCache creates a new serial cache with the specified sizes.
func NewSerialCache(maxPubkeys, maxEventIds int) *SerialCache {
if maxPubkeys <= 0 {
maxPubkeys = 100000 // Default 100k pubkeys (~3.2MB)
}
if maxEventIds <= 0 {
maxEventIds = 500000 // Default 500k event IDs (~16MB)
}
return &SerialCache{
pubkeyBySerial: make(map[uint64][]byte, maxPubkeys),
serialByPubkeyHash: make(map[string]uint64, maxPubkeys),
eventIdBySerial: make(map[uint64][]byte, maxEventIds),
serialByEventIdHash: make(map[string]uint64, maxEventIds),
maxPubkeys: maxPubkeys,
maxEventIds: maxEventIds,
}
}
// CachePubkey adds a pubkey to the cache.
func (c *SerialCache) CachePubkey(serial uint64, pubkey []byte) {
if len(pubkey) != 32 {
return
}
// Cache serial -> pubkey
c.pubkeyBySerialLock.Lock()
if len(c.pubkeyBySerial) >= c.maxPubkeys {
// Simple eviction: clear half the cache
// A proper LRU would be better but this is simpler
count := 0
for k := range c.pubkeyBySerial {
delete(c.pubkeyBySerial, k)
count++
if count >= c.maxPubkeys/2 {
break
}
}
}
pk := make([]byte, 32)
copy(pk, pubkey)
c.pubkeyBySerial[serial] = pk
c.pubkeyBySerialLock.Unlock()
// Cache pubkey -> serial (map key is the raw 32-byte pubkey as a string)
c.serialByPubkeyHashLock.Lock()
if len(c.serialByPubkeyHash) >= c.maxPubkeys {
count := 0
for k := range c.serialByPubkeyHash {
delete(c.serialByPubkeyHash, k)
count++
if count >= c.maxPubkeys/2 {
break
}
}
}
c.serialByPubkeyHash[string(pubkey)] = serial
c.serialByPubkeyHashLock.Unlock()
}
// GetPubkeyBySerial returns the pubkey for a serial from cache.
func (c *SerialCache) GetPubkeyBySerial(serial uint64) (pubkey []byte, found bool) {
c.pubkeyBySerialLock.RLock()
pubkey, found = c.pubkeyBySerial[serial]
c.pubkeyBySerialLock.RUnlock()
return
}
// GetSerialByPubkey returns the serial for a pubkey from cache.
func (c *SerialCache) GetSerialByPubkey(pubkey []byte) (serial uint64, found bool) {
c.serialByPubkeyHashLock.RLock()
serial, found = c.serialByPubkeyHash[string(pubkey)]
c.serialByPubkeyHashLock.RUnlock()
return
}
// CacheEventId adds an event ID to the cache.
func (c *SerialCache) CacheEventId(serial uint64, eventId []byte) {
if len(eventId) != 32 {
return
}
// Cache serial -> event ID
c.eventIdBySerialLock.Lock()
if len(c.eventIdBySerial) >= c.maxEventIds {
count := 0
for k := range c.eventIdBySerial {
delete(c.eventIdBySerial, k)
count++
if count >= c.maxEventIds/2 {
break
}
}
}
eid := make([]byte, 32)
copy(eid, eventId)
c.eventIdBySerial[serial] = eid
c.eventIdBySerialLock.Unlock()
// Cache event ID -> serial (map key is the raw 32-byte ID as a string)
c.serialByEventIdHashLock.Lock()
if len(c.serialByEventIdHash) >= c.maxEventIds {
count := 0
for k := range c.serialByEventIdHash {
delete(c.serialByEventIdHash, k)
count++
if count >= c.maxEventIds/2 {
break
}
}
}
c.serialByEventIdHash[string(eventId)] = serial
c.serialByEventIdHashLock.Unlock()
}
// GetEventIdBySerial returns the event ID for a serial from cache.
func (c *SerialCache) GetEventIdBySerial(serial uint64) (eventId []byte, found bool) {
c.eventIdBySerialLock.RLock()
eventId, found = c.eventIdBySerial[serial]
c.eventIdBySerialLock.RUnlock()
return
}
// GetSerialByEventId returns the serial for an event ID from cache.
func (c *SerialCache) GetSerialByEventId(eventId []byte) (serial uint64, found bool) {
c.serialByEventIdHashLock.RLock()
serial, found = c.serialByEventIdHash[string(eventId)]
c.serialByEventIdHashLock.RUnlock()
return
}
// DatabaseSerialResolver implements SerialResolver using the database and cache.
type DatabaseSerialResolver struct {
db *D
cache *SerialCache
}
// NewDatabaseSerialResolver creates a new resolver.
func NewDatabaseSerialResolver(db *D, cache *SerialCache) *DatabaseSerialResolver {
return &DatabaseSerialResolver{db: db, cache: cache}
}
// GetOrCreatePubkeySerial implements SerialResolver.
func (r *DatabaseSerialResolver) GetOrCreatePubkeySerial(pubkey []byte) (serial uint64, err error) {
if len(pubkey) != 32 {
return 0, errors.New("pubkey must be 32 bytes")
}
// Check cache first
if s, found := r.cache.GetSerialByPubkey(pubkey); found {
return s, nil
}
// Use existing function which handles creation
ser, err := r.db.GetOrCreatePubkeySerial(pubkey)
if err != nil {
return 0, err
}
serial = ser.Get()
// Cache it
r.cache.CachePubkey(serial, pubkey)
return serial, nil
}
// GetPubkeyBySerial implements SerialResolver.
func (r *DatabaseSerialResolver) GetPubkeyBySerial(serial uint64) (pubkey []byte, err error) {
// Check cache first
if pk, found := r.cache.GetPubkeyBySerial(serial); found {
return pk, nil
}
// Look up in database
ser := new(types.Uint40)
if err = ser.Set(serial); err != nil {
return nil, err
}
pubkey, err = r.db.GetPubkeyBySerial(ser)
if err != nil {
return nil, err
}
// Cache it
r.cache.CachePubkey(serial, pubkey)
return pubkey, nil
}
// GetEventSerialById implements SerialResolver.
func (r *DatabaseSerialResolver) GetEventSerialById(eventId []byte) (serial uint64, found bool, err error) {
if len(eventId) != 32 {
return 0, false, errors.New("event ID must be 32 bytes")
}
// Check cache first
if s, ok := r.cache.GetSerialByEventId(eventId); ok {
return s, true, nil
}
// Look up in database using existing GetSerialById
ser, err := r.db.GetSerialById(eventId)
if err != nil {
// Not found is not an error - just return found=false
return 0, false, nil
}
serial = ser.Get()
// Cache it
r.cache.CacheEventId(serial, eventId)
return serial, true, nil
}
// GetEventIdBySerial implements SerialResolver.
func (r *DatabaseSerialResolver) GetEventIdBySerial(serial uint64) (eventId []byte, err error) {
// Check cache first
if eid, found := r.cache.GetEventIdBySerial(serial); found {
return eid, nil
}
// Look up in database - use SerialEventId index
ser := new(types.Uint40)
if err = ser.Set(serial); err != nil {
return nil, err
}
eventId, err = r.db.GetEventIdBySerial(ser)
if err != nil {
return nil, err
}
// Cache it
r.cache.CacheEventId(serial, eventId)
return eventId, nil
}
// GetEventIdBySerial looks up an event ID by its serial number.
// Uses the SerialEventId index (sei prefix).
func (d *D) GetEventIdBySerial(ser *types.Uint40) (eventId []byte, err error) {
keyBuf := new(bytes.Buffer)
if err = indexes.SerialEventIdEnc(ser).MarshalWrite(keyBuf); chk.E(err) {
return nil, err
}
err = d.View(func(txn *badger.Txn) error {
item, gerr := txn.Get(keyBuf.Bytes())
if chk.E(gerr) {
return gerr
}
return item.Value(func(val []byte) error {
eventId = make([]byte, len(val))
copy(eventId, val)
return nil
})
})
if err != nil {
return nil, errors.New("event ID not found for serial")
}
return eventId, nil
}
// StoreEventIdSerial stores the mapping from event serial to full event ID.
// This is called during event save to enable later reconstruction.
func (d *D) StoreEventIdSerial(txn *badger.Txn, serial uint64, eventId []byte) error {
if len(eventId) != 32 {
return errors.New("event ID must be 32 bytes")
}
ser := new(types.Uint40)
if err := ser.Set(serial); err != nil {
return err
}
keyBuf := new(bytes.Buffer)
if err := indexes.SerialEventIdEnc(ser).MarshalWrite(keyBuf); chk.E(err) {
return err
}
return txn.Set(keyBuf.Bytes(), eventId)
}
// SerialCacheStats holds statistics about the serial cache.
type SerialCacheStats struct {
PubkeysCached int // Number of pubkeys currently cached
PubkeysMaxSize int // Maximum pubkey cache size
EventIdsCached int // Number of event IDs currently cached
EventIdsMaxSize int // Maximum event ID cache size
PubkeyMemoryBytes int // Estimated memory usage for pubkey cache
EventIdMemoryBytes int // Estimated memory usage for event ID cache
TotalMemoryBytes int // Total estimated memory usage
}
// Stats returns statistics about the serial cache.
func (c *SerialCache) Stats() SerialCacheStats {
c.pubkeyBySerialLock.RLock()
pubkeysCached := len(c.pubkeyBySerial)
c.pubkeyBySerialLock.RUnlock()
c.eventIdBySerialLock.RLock()
eventIdsCached := len(c.eventIdBySerial)
c.eventIdBySerialLock.RUnlock()
// Memory estimation:
// - Each pubkey entry: 8 bytes (uint64 key) + 32 bytes (pubkey value) = 40 bytes
// - Each event ID entry: 8 bytes (uint64 key) + 32 bytes (event ID value) = 40 bytes
// - Map overhead is roughly 2x the entry size for buckets
pubkeyMemory := pubkeysCached * 40 * 2
eventIdMemory := eventIdsCached * 40 * 2
return SerialCacheStats{
PubkeysCached: pubkeysCached,
PubkeysMaxSize: c.maxPubkeys,
EventIdsCached: eventIdsCached,
EventIdsMaxSize: c.maxEventIds,
PubkeyMemoryBytes: pubkeyMemory,
EventIdMemoryBytes: eventIdMemory,
TotalMemoryBytes: pubkeyMemory + eventIdMemory,
}
}
// SerialCacheStats returns statistics about the serial cache.
func (d *D) SerialCacheStats() SerialCacheStats {
if d.serialCache == nil {
return SerialCacheStats{}
}
return d.serialCache.Stats()
}
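A minimal usage sketch for the cache in isolation (test-style, inside the same package; `bytes` and `fmt` imports assumed):

```go
func ExampleSerialCache() {
	cache := NewSerialCache(0, 0) // zero selects the defaults: 100k pubkeys, 500k event IDs
	pk := bytes.Repeat([]byte{0xab}, 32)
	cache.CachePubkey(42, pk)
	if ser, ok := cache.GetSerialByPubkey(pk); ok {
		fmt.Println("serial:", ser)
	}
	st := cache.Stats()
	fmt.Printf("pubkeys cached: %d (max %d)\n", st.PubkeysCached, st.PubkeysMaxSize)
	// Output:
	// serial: 42
	// pubkeys cached: 1 (max 100000)
}
```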

View File

@@ -16,12 +16,13 @@ This document provides a comprehensive guide to the Neo4j database schema used b
## Architecture Overview
The Neo4j implementation uses a **dual-node architecture** to separate concerns:
The Neo4j implementation uses a **unified node architecture**:
1. **NIP-01 Base Layer**: Stores Nostr events with `Event`, `Author`, and `Tag` nodes for standard relay operations
2. **WoT Extension Layer**: Stores social graph data with `NostrUser` nodes and relationship types (`FOLLOWS`, `MUTES`, `REPORTS`) for trust calculations
1. **Event Storage**: `Event` and `Tag` nodes store Nostr events for standard relay operations
2. **User Identity**: `NostrUser` nodes represent all Nostr users (both event authors and social graph participants)
3. **Social Graph**: Relationship types (`FOLLOWS`, `MUTES`, `REPORTS`) between `NostrUser` nodes for trust calculations
This separation allows the WoT extension to be modified independently without affecting NIP-01 compliance.
**Note:** The `Author` label was deprecated and merged into `NostrUser` to eliminate redundancy. A migration automatically converts existing `Author` nodes when the relay starts.
### Data Model Summary
@@ -72,16 +73,17 @@ From the specification document:
These elements are **required** for a NIP-01 compliant relay.
### Constraints (schema.go:30-43)
### Constraints (schema.go:30-44)
```cypher
-- Event ID uniqueness (for "ids" filter)
CREATE CONSTRAINT event_id_unique IF NOT EXISTS
FOR (e:Event) REQUIRE e.id IS UNIQUE
-- Author pubkey uniqueness (for "authors" filter)
CREATE CONSTRAINT author_pubkey_unique IF NOT EXISTS
FOR (a:Author) REQUIRE a.pubkey IS UNIQUE
-- NostrUser pubkey uniqueness (for "authors" filter and social graph)
-- NostrUser unifies both NIP-01 author tracking and WoT social graph
CREATE CONSTRAINT nostrUser_pubkey IF NOT EXISTS
FOR (n:NostrUser) REQUIRE n.pubkey IS UNIQUE
```
### Indexes (schema.go:84-108)
@@ -122,14 +124,14 @@ Created in `save-event.go:buildEventCreationCypher()`:
Created in `save-event.go:buildEventCreationCypher()`:
```cypher
-- Event → Author relationship
(e:Event)-[:AUTHORED_BY]->(a:Author {pubkey: ...})
-- Event → NostrUser relationship (author)
(e:Event)-[:AUTHORED_BY]->(u:NostrUser {pubkey: ...})
-- Event → Event reference (e-tags)
(e:Event)-[:REFERENCES]->(ref:Event)
-- Event → Author mention (p-tags)
(e:Event)-[:MENTIONS]->(mentioned:Author)
-- Event → NostrUser mention (p-tags)
(e:Event)-[:MENTIONS]->(mentioned:NostrUser)
-- Event → Tag (other tags like #t, #d, etc.)
(e:Event)-[:TAGGED_WITH]->(t:Tag {type: ..., value: ...})
@@ -146,7 +148,7 @@ The `query-events.go` file translates Nostr REQ filters into Cypher queries.
| NIP-01 Filter | Cypher Translation | Index Used |
|---------------|-------------------|------------|
| `ids: ["abc..."]` | `e.id = $id_0` or `e.id STARTS WITH $id_0` | `event_id_unique` |
| `authors: ["def..."]` | `e.pubkey = $author_0` or `e.pubkey STARTS WITH $author_0` | `author_pubkey_unique` |
| `authors: ["def..."]` | `e.pubkey = $author_0` or `e.pubkey STARTS WITH $author_0` | `nostrUser_pubkey` |
| `kinds: [1, 7]` | `e.kind IN $kinds` | `event_kind` |
| `since: 1234567890` | `e.created_at >= $since` | `event_created_at` |
| `until: 1234567890` | `e.created_at <= $until` | `event_created_at` |
@@ -435,25 +437,28 @@ if ev.Kind == 1 {
### Adding NostrEventTag → NostrUser REFERENCES
Per the specification update, p-tags should create `REFERENCES` relationships to `NostrUser` nodes:
The current implementation creates `MENTIONS` relationships from Events to `NostrUser` nodes for p-tags:
```go
// In save-event.go buildEventCreationCypher(), modify p-tag handling:
// In save-event.go buildEventCreationCypher(), p-tag handling:
case "p":
// Current implementation: creates MENTIONS to Author
// Creates MENTIONS to NostrUser (unified node for both author and social graph)
cypher += fmt.Sprintf(`
MERGE (mentioned%d:Author {pubkey: $%s})
MERGE (mentioned%d:NostrUser {pubkey: $%s})
ON CREATE SET mentioned%d.created_at = timestamp()
CREATE (e)-[:MENTIONS]->(mentioned%d)
`, pTagIndex, paramName, pTagIndex)
`, pTagIndex, paramName, pTagIndex, pTagIndex)
```
// NEW: Also reference NostrUser for WoT traversal
To add additional tag nodes for enhanced query patterns:
```go
// Optional: Also create a Tag node for the p-tag
cypher += fmt.Sprintf(`
MERGE (user%d:NostrUser {pubkey: $%s})
// Create a Tag node for the p-tag
MERGE (pTag%d:NostrEventTag {tag_name: 'p', tag_value: $%s})
CREATE (e)-[:HAS_TAG]->(pTag%d)
CREATE (pTag%d)-[:REFERENCES]->(user%d)
`, pTagIndex, paramName, pTagIndex, paramName, pTagIndex, pTagIndex, pTagIndex)
CREATE (pTag%d)-[:REFERENCES]->(mentioned%d)
`, pTagIndex, paramName, pTagIndex, pTagIndex, pTagIndex)
```
---

View File

@@ -168,9 +168,9 @@ RETURN e
### Social graph query
```cypher
MATCH (author:Author {pubkey: "abc123..."})
MATCH (author:NostrUser {pubkey: "abc123..."})
<-[:AUTHORED_BY]-(e:Event)
-[:MENTIONS]->(mentioned:Author)
-[:MENTIONS]->(mentioned:NostrUser)
RETURN author, e, mentioned
```

View File

@@ -55,7 +55,7 @@ func TestExpiration_SaveEventWithExpiration(t *testing.T) {
ev.CreatedAt = timestamp.Now().V
ev.Kind = 1
ev.Content = []byte("Event with expiration")
ev.Tags = tag.NewS(tag.NewFromAny("expiration", timestamp.From(futureExpiration).String()))
ev.Tags = tag.NewS(tag.NewFromAny("expiration", timestamp.FromUnix(futureExpiration).String()))
if err := ev.Sign(signer); err != nil {
t.Fatalf("Failed to sign event: %v", err)
@@ -118,7 +118,7 @@ func TestExpiration_DeleteExpiredEvents(t *testing.T) {
expiredEv.CreatedAt = timestamp.Now().V - 7200 // 2 hours ago
expiredEv.Kind = 1
expiredEv.Content = []byte("Expired event")
expiredEv.Tags = tag.NewS(tag.NewFromAny("expiration", timestamp.From(pastExpiration).String()))
expiredEv.Tags = tag.NewS(tag.NewFromAny("expiration", timestamp.FromUnix(pastExpiration).String()))
if err := expiredEv.Sign(signer); err != nil {
t.Fatalf("Failed to sign expired event: %v", err)
@@ -136,7 +136,7 @@ func TestExpiration_DeleteExpiredEvents(t *testing.T) {
validEv.CreatedAt = timestamp.Now().V
validEv.Kind = 1
validEv.Content = []byte("Valid event")
validEv.Tags = tag.NewS(tag.NewFromAny("expiration", timestamp.From(futureExpiration).String()))
validEv.Tags = tag.NewS(tag.NewFromAny("expiration", timestamp.FromUnix(futureExpiration).String()))
if err := validEv.Sign(signer); err != nil {
t.Fatalf("Failed to sign valid event: %v", err)

View File

@@ -331,7 +331,7 @@ func TestGetSerialsByIds(t *testing.T) {
}
// Create and save multiple events
ids := tag.NewS()
ids := tag.New()
for i := 0; i < 3; i++ {
ev := event.New()
ev.Pubkey = signer.Pub()
@@ -347,7 +347,8 @@ func TestGetSerialsByIds(t *testing.T) {
t.Fatalf("Failed to save event: %v", err)
}
ids.Append(tag.NewFromAny("", hex.Enc(ev.ID[:])))
// Append ID to the tag's T slice
ids.T = append(ids.T, []byte(hex.Enc(ev.ID[:])))
}
// Get serials by IDs

197 pkg/neo4j/migrations.go Normal file
View File

@@ -0,0 +1,197 @@
package neo4j
import (
"context"
"fmt"
)
// Migration represents a database migration with a version identifier
type Migration struct {
Version string
Description string
Migrate func(ctx context.Context, n *N) error
}
// migrations is the ordered list of database migrations
// Migrations are applied in order and tracked via Marker nodes
var migrations = []Migration{
{
Version: "v1",
Description: "Merge Author nodes into NostrUser nodes",
Migrate: migrateAuthorToNostrUser,
},
}
// RunMigrations executes all pending migrations
func (n *N) RunMigrations() {
ctx := context.Background()
for _, migration := range migrations {
// Check if migration has already been applied
if n.migrationApplied(ctx, migration.Version) {
n.Logger.Infof("migration %s already applied, skipping", migration.Version)
continue
}
n.Logger.Infof("applying migration %s: %s", migration.Version, migration.Description)
if err := migration.Migrate(ctx, n); err != nil {
n.Logger.Errorf("migration %s failed: %v", migration.Version, err)
// Continue to next migration - don't fail startup
continue
}
// Mark migration as complete
if err := n.markMigrationComplete(ctx, migration.Version, migration.Description); err != nil {
n.Logger.Warningf("failed to mark migration %s as complete: %v", migration.Version, err)
}
n.Logger.Infof("migration %s completed successfully", migration.Version)
}
}
// migrationApplied checks if a migration has already been applied
func (n *N) migrationApplied(ctx context.Context, version string) bool {
cypher := `
MATCH (m:Migration {version: $version})
RETURN m.version
`
result, err := n.ExecuteRead(ctx, cypher, map[string]any{"version": version})
if err != nil {
return false
}
return result.Next(ctx)
}
// markMigrationComplete marks a migration as applied
func (n *N) markMigrationComplete(ctx context.Context, version, description string) error {
cypher := `
CREATE (m:Migration {
version: $version,
description: $description,
applied_at: timestamp()
})
`
_, err := n.ExecuteWrite(ctx, cypher, map[string]any{
"version": version,
"description": description,
})
return err
}
// migrateAuthorToNostrUser migrates Author nodes to NostrUser nodes
// This consolidates the separate Author (NIP-01) and NostrUser (WoT) labels
// into a unified NostrUser label for the social graph
func migrateAuthorToNostrUser(ctx context.Context, n *N) error {
// Step 1: Check if there are any Author nodes to migrate
countCypher := `MATCH (a:Author) RETURN count(a) AS count`
countResult, err := n.ExecuteRead(ctx, countCypher, nil)
if err != nil {
return fmt.Errorf("failed to count Author nodes: %w", err)
}
var authorCount int64
if countResult.Next(ctx) {
record := countResult.Record()
if count, ok := record.Values[0].(int64); ok {
authorCount = count
}
}
if authorCount == 0 {
n.Logger.Infof("no Author nodes to migrate")
return nil
}
n.Logger.Infof("migrating %d Author nodes to NostrUser", authorCount)
// Step 2: For each Author node, merge into NostrUser with same pubkey
// This uses MERGE to either match existing NostrUser or create new one
// Then copies any relationships from Author to NostrUser
mergeCypher := `
// Match all Author nodes
MATCH (a:Author)
// For each Author, merge into NostrUser (creates if doesn't exist)
MERGE (u:NostrUser {pubkey: a.pubkey})
ON CREATE SET u.created_at = timestamp(), u.migrated_from_author = true
// Return count for logging
RETURN count(DISTINCT a) AS migrated
`
result, err := n.ExecuteWrite(ctx, mergeCypher, nil)
if err != nil {
return fmt.Errorf("failed to merge Author nodes to NostrUser: %w", err)
}
// Log result (result consumption happens within the session)
_ = result
// Step 3: Migrate AUTHORED_BY relationships from Author to NostrUser
// Events should now point to NostrUser instead of Author
relationshipCypher := `
// Find events linked to Author via AUTHORED_BY
MATCH (e:Event)-[r:AUTHORED_BY]->(a:Author)
// Get or create the corresponding NostrUser
MATCH (u:NostrUser {pubkey: a.pubkey})
// Create new relationship to NostrUser if it doesn't exist
MERGE (e)-[:AUTHORED_BY]->(u)
// Delete old relationship to Author
DELETE r
RETURN count(r) AS migrated_relationships
`
_, err = n.ExecuteWrite(ctx, relationshipCypher, nil)
if err != nil {
return fmt.Errorf("failed to migrate AUTHORED_BY relationships: %w", err)
}
// Step 4: Migrate MENTIONS relationships from Author to NostrUser
mentionsCypher := `
// Find events with MENTIONS to Author
MATCH (e:Event)-[r:MENTIONS]->(a:Author)
// Get or create the corresponding NostrUser
MATCH (u:NostrUser {pubkey: a.pubkey})
// Create new relationship to NostrUser if it doesn't exist
MERGE (e)-[:MENTIONS]->(u)
// Delete old relationship to Author
DELETE r
RETURN count(r) AS migrated_mentions
`
_, err = n.ExecuteWrite(ctx, mentionsCypher, nil)
if err != nil {
return fmt.Errorf("failed to migrate MENTIONS relationships: %w", err)
}
// Step 5: Delete orphaned Author nodes (no longer needed)
deleteCypher := `
// Find Author nodes with no remaining relationships
MATCH (a:Author)
WHERE NOT (a)<-[:AUTHORED_BY]-() AND NOT (a)<-[:MENTIONS]-()
DETACH DELETE a
RETURN count(a) AS deleted
`
_, err = n.ExecuteWrite(ctx, deleteCypher, nil)
if err != nil {
return fmt.Errorf("failed to delete orphaned Author nodes: %w", err)
}
// Step 6: Drop the old Author constraint if it exists
dropConstraintCypher := `DROP CONSTRAINT author_pubkey_unique IF EXISTS`
_, _ = n.ExecuteWrite(ctx, dropConstraintCypher, nil)
// Ignore error as constraint may not exist
n.Logger.Infof("completed Author to NostrUser migration")
return nil
}
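Extending the framework later means appending to the ordered list; `RunMigrations` skips any version that already has a `Migration` marker node. A hypothetical `v2` entry, shown only to illustrate the pattern (the version string and Cypher body are invented):

```go
func init() {
	migrations = append(migrations, Migration{
		Version:     "v2",
		Description: "backfill NostrUser.created_at where missing (hypothetical)",
		Migrate: func(ctx context.Context, n *N) error {
			_, err := n.ExecuteWrite(ctx, `
				MATCH (u:NostrUser) WHERE u.created_at IS NULL
				SET u.created_at = timestamp()`, nil)
			return err
		},
	})
}
```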

View File

@@ -135,6 +135,9 @@ func NewWithConfig(
return
}
// Run database migrations (e.g., Author -> NostrUser consolidation)
n.RunMigrations()
// Initialize serial counter
if err = n.initSerialCounter(); chk.E(err) {
return
@@ -298,10 +301,8 @@ func (n *N) EventIdsBySerial(start uint64, count int) (
return
}
// RunMigrations runs database migrations (no-op for neo4j)
func (n *N) RunMigrations() {
// No-op for neo4j
}
// RunMigrations is implemented in migrations.go
// It handles schema migrations like the Author -> NostrUser consolidation
// Ready returns a channel that closes when the database is ready to serve requests.
// This allows callers to wait for database warmup to complete.

View File

@@ -290,16 +290,16 @@ func TestQueryEventsWithLimit(t *testing.T) {
}
// Query with limit
limit := 5
limit := uint(5)
evs, err := db.QueryEvents(ctx, &filter.F{
Kinds: kind.NewS(kind.New(1)),
Limit: limit,
Limit: &limit,
})
if err != nil {
t.Fatalf("Failed to query events with limit: %v", err)
}
if len(evs) != limit {
if len(evs) != int(limit) {
t.Fatalf("Expected %d events with limit, got %d", limit, len(evs))
}
@@ -406,8 +406,7 @@ func TestQueryEventsMultipleAuthors(t *testing.T) {
createAndSaveEvent(t, ctx, db, charlie, 1, "Charlie", nil, baseTs+2)
// Query for Alice and Bob's events
authors := tag.NewFromBytesSlice(alice.Pub())
authors.Append(tag.NewFromBytesSlice(bob.Pub()).GetFirst(nil))
authors := tag.NewFromBytesSlice(alice.Pub(), bob.Pub())
evs, err := db.QueryEvents(ctx, &filter.F{
Authors: authors,
@@ -437,7 +436,7 @@ func TestCountEvents(t *testing.T) {
}
// Count events
count, err := db.CountEvents(ctx, &filter.F{
count, _, err := db.CountEvents(ctx, &filter.F{
Kinds: kind.NewS(kind.New(1)),
})
if err != nil {

View File

@@ -84,7 +84,7 @@ func (n *N) SaveEvent(c context.Context, ev *event.E) (exists bool, err error) {
// buildEventCreationCypher constructs a Cypher query to create an event node with all relationships
// This is a single atomic operation that creates:
// - Event node with all properties
// - Author node and AUTHORED_BY relationship
// - NostrUser node and AUTHORED_BY relationship (unified author + WoT node)
// - Tag nodes and TAGGED_WITH relationships
// - Reference relationships (REFERENCES for 'e' tags, MENTIONS for 'p' tags)
func (n *N) buildEventCreationCypher(ev *event.E, serial uint64) (string, map[string]any) {
@@ -124,10 +124,12 @@ func (n *N) buildEventCreationCypher(ev *event.E, serial uint64) (string, map[st
params["tags"] = string(tagsJSON)
// Start building the Cypher query
// Use MERGE to ensure idempotency for author nodes
// Use MERGE to ensure idempotency for NostrUser nodes
// NostrUser serves both NIP-01 author tracking and WoT social graph
cypher := `
// Create or match author node
MERGE (a:Author {pubkey: $pubkey})
// Create or match NostrUser node (unified author + social graph)
MERGE (a:NostrUser {pubkey: $pubkey})
ON CREATE SET a.created_at = timestamp(), a.first_seen_event = $eventId
// Create event node with expiration for NIP-40 support
CREATE (e:Event {
@@ -177,22 +179,16 @@ CREATE (e)-[:AUTHORED_BY]->(a)
paramName := fmt.Sprintf("eTag_%d", eTagIndex)
params[paramName] = tagValue
// Add WITH clause before OPTIONAL MATCH
// This is required because:
// 1. Cypher doesn't allow MATCH after CREATE without WITH
// 2. Cypher doesn't allow MATCH after FOREACH without WITH
// So we need WITH before EVERY OPTIONAL MATCH, not just the first
// Add WITH clause before first OPTIONAL MATCH only
// This is required because Cypher doesn't allow MATCH after CREATE without WITH.
// However, you CAN chain multiple OPTIONAL MATCH + FOREACH pairs without
// additional WITH clauses between them - Cypher allows OPTIONAL MATCH after FOREACH.
if needsWithClause {
cypher += `
// Carry forward event and author nodes for tag processing
WITH e, a
`
needsWithClause = false
} else {
// After a FOREACH, we need WITH to transition back to MATCH
cypher += `
WITH e, a
`
}
cypher += fmt.Sprintf(`
@@ -212,15 +208,16 @@ FOREACH (ignoreMe IN CASE WHEN ref%d IS NOT NULL THEN [1] ELSE [] END |
continue // Skip invalid p-tags
}
// Create mention to another author
// Create mention to another NostrUser
paramName := fmt.Sprintf("pTag_%d", pTagIndex)
params[paramName] = tagValue
cypher += fmt.Sprintf(`
// Mention of author (p-tag)
MERGE (mentioned%d:Author {pubkey: $%s})
// Mention of NostrUser (p-tag)
MERGE (mentioned%d:NostrUser {pubkey: $%s})
ON CREATE SET mentioned%d.created_at = timestamp()
CREATE (e)-[:MENTIONS]->(mentioned%d)
`, pTagIndex, paramName, pTagIndex)
`, pTagIndex, paramName, pTagIndex, pTagIndex)
pTagIndex++

View File

@@ -542,7 +542,7 @@ func TestSaveEvent_ETagReference(t *testing.T) {
// Verify MENTIONS relationship was also created for the p-tag
mentionsCypher := `
MATCH (reply:Event {id: $replyId})-[:MENTIONS]->(author:Author {pubkey: $authorPubkey})
MATCH (reply:Event {id: $replyId})-[:MENTIONS]->(author:NostrUser {pubkey: $authorPubkey})
RETURN author.pubkey AS pubkey
`
mentionsParams := map[string]any{

View File

@@ -37,10 +37,11 @@ func (n *N) applySchema(ctx context.Context) error {
// REQ filters can specify: {"ids": ["<event_id>", ...]}
"CREATE CONSTRAINT event_id_unique IF NOT EXISTS FOR (e:Event) REQUIRE e.id IS UNIQUE",
// MANDATORY (NIP-01): Author.pubkey uniqueness for "authors" filter
// MANDATORY (NIP-01): NostrUser.pubkey uniqueness for "authors" filter
// REQ filters can specify: {"authors": ["<pubkey>", ...]}
// Events are linked to Author nodes via AUTHORED_BY relationship
"CREATE CONSTRAINT author_pubkey_unique IF NOT EXISTS FOR (a:Author) REQUIRE a.pubkey IS UNIQUE",
// Events are linked to NostrUser nodes via AUTHORED_BY relationship
// NOTE: NostrUser unifies both NIP-01 author tracking and WoT social graph
"CREATE CONSTRAINT nostrUser_pubkey IF NOT EXISTS FOR (n:NostrUser) REQUIRE n.pubkey IS UNIQUE",
// ============================================================
// === OPTIONAL: Internal Relay Operations ===
@@ -66,9 +67,8 @@ func (n *N) applySchema(ctx context.Context) error {
// Not required for NIP-01 compliance
// ============================================================
// OPTIONAL (WoT): NostrUser nodes for social graph/trust metrics
// Separate from Author nodes - Author is for NIP-01, NostrUser for WoT
"CREATE CONSTRAINT nostrUser_pubkey IF NOT EXISTS FOR (n:NostrUser) REQUIRE n.pubkey IS UNIQUE",
// NOTE: NostrUser constraint is defined above in MANDATORY section
// It serves both NIP-01 (author tracking) and WoT (social graph) purposes
// OPTIONAL (WoT): Container for WoT metrics cards per observee
"CREATE CONSTRAINT setOfNostrUserWotMetricsCards_observee_pubkey IF NOT EXISTS FOR (n:SetOfNostrUserWotMetricsCards) REQUIRE n.observee_pubkey IS UNIQUE",
@@ -200,6 +200,9 @@ func (n *N) dropAll(ctx context.Context) error {
constraints := []string{
// MANDATORY (NIP-01) constraints
"DROP CONSTRAINT event_id_unique IF EXISTS",
"DROP CONSTRAINT nostrUser_pubkey IF EXISTS", // Unified author + WoT constraint
// Legacy constraint (removed in migration)
"DROP CONSTRAINT author_pubkey_unique IF EXISTS",
// OPTIONAL (Internal) constraints
@@ -207,9 +210,6 @@ func (n *N) dropAll(ctx context.Context) error {
// OPTIONAL (Social Graph) constraints
"DROP CONSTRAINT processedSocialEvent_event_id IF EXISTS",
// OPTIONAL (WoT) constraints
"DROP CONSTRAINT nostrUser_pubkey IF EXISTS",
"DROP CONSTRAINT setOfNostrUserWotMetricsCards_observee_pubkey IF EXISTS",
"DROP CONSTRAINT nostrUserWotMetricsCard_unique_combination_1 IF EXISTS",
"DROP CONSTRAINT nostrUserWotMetricsCard_unique_combination_2 IF EXISTS",

View File

@@ -5,179 +5,12 @@ import (
"os"
"testing"
"git.mleku.dev/mleku/nostr/encoders/filter"
"git.mleku.dev/mleku/nostr/encoders/kind"
"git.mleku.dev/mleku/nostr/interfaces/signer/p8k"
)
func TestSubscriptions_AddAndRemove(t *testing.T) {
neo4jURI := os.Getenv("ORLY_NEO4J_URI")
if neo4jURI == "" {
t.Skip("Skipping Neo4j test: ORLY_NEO4J_URI not set")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tempDir := t.TempDir()
db, err := New(ctx, cancel, tempDir, "debug")
if err != nil {
t.Fatalf("Failed to create database: %v", err)
}
defer db.Close()
<-db.Ready()
// Create a subscription
subID := "test-sub-123"
f := &filter.F{
Kinds: kind.NewS(kind.New(1)),
}
// Add subscription
db.AddSubscription(subID, f)
// Get subscription count (should be 1)
count := db.GetSubscriptionCount()
if count != 1 {
t.Fatalf("Expected 1 subscription, got %d", count)
}
// Remove subscription
db.RemoveSubscription(subID)
// Get subscription count (should be 0)
count = db.GetSubscriptionCount()
if count != 0 {
t.Fatalf("Expected 0 subscriptions after removal, got %d", count)
}
t.Logf("✓ Subscription add/remove works correctly")
}
func TestSubscriptions_MultipleSubscriptions(t *testing.T) {
neo4jURI := os.Getenv("ORLY_NEO4J_URI")
if neo4jURI == "" {
t.Skip("Skipping Neo4j test: ORLY_NEO4J_URI not set")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tempDir := t.TempDir()
db, err := New(ctx, cancel, tempDir, "debug")
if err != nil {
t.Fatalf("Failed to create database: %v", err)
}
defer db.Close()
<-db.Ready()
// Add multiple subscriptions
for i := 0; i < 5; i++ {
subID := string(rune('A' + i))
f := &filter.F{
Kinds: kind.NewS(kind.New(uint16(i + 1))),
}
db.AddSubscription(subID, f)
}
// Get subscription count
count := db.GetSubscriptionCount()
if count != 5 {
t.Fatalf("Expected 5 subscriptions, got %d", count)
}
// Remove some subscriptions
db.RemoveSubscription("A")
db.RemoveSubscription("C")
count = db.GetSubscriptionCount()
if count != 3 {
t.Fatalf("Expected 3 subscriptions after removal, got %d", count)
}
// Clear all subscriptions
db.ClearSubscriptions()
count = db.GetSubscriptionCount()
if count != 0 {
t.Fatalf("Expected 0 subscriptions after clear, got %d", count)
}
t.Logf("✓ Multiple subscriptions managed correctly")
}
func TestSubscriptions_DuplicateID(t *testing.T) {
neo4jURI := os.Getenv("ORLY_NEO4J_URI")
if neo4jURI == "" {
t.Skip("Skipping Neo4j test: ORLY_NEO4J_URI not set")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tempDir := t.TempDir()
db, err := New(ctx, cancel, tempDir, "debug")
if err != nil {
t.Fatalf("Failed to create database: %v", err)
}
defer db.Close()
<-db.Ready()
subID := "duplicate-test"
// Add first subscription
f1 := &filter.F{
Kinds: kind.NewS(kind.New(1)),
}
db.AddSubscription(subID, f1)
// Add subscription with same ID (should replace)
f2 := &filter.F{
Kinds: kind.NewS(kind.New(7)),
}
db.AddSubscription(subID, f2)
// Should still have only 1 subscription
count := db.GetSubscriptionCount()
if count != 1 {
t.Fatalf("Expected 1 subscription (duplicate replaced), got %d", count)
}
t.Logf("✓ Duplicate subscription ID handling works correctly")
}
func TestSubscriptions_RemoveNonExistent(t *testing.T) {
neo4jURI := os.Getenv("ORLY_NEO4J_URI")
if neo4jURI == "" {
t.Skip("Skipping Neo4j test: ORLY_NEO4J_URI not set")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tempDir := t.TempDir()
db, err := New(ctx, cancel, tempDir, "debug")
if err != nil {
t.Fatalf("Failed to create database: %v", err)
}
defer db.Close()
<-db.Ready()
// Try to remove non-existent subscription (should not panic)
db.RemoveSubscription("non-existent")
// Should still have 0 subscriptions
count := db.GetSubscriptionCount()
if count != 0 {
t.Fatalf("Expected 0 subscriptions, got %d", count)
}
t.Logf("✓ Removing non-existent subscription handled gracefully")
}
// Note: WebSocket subscription management (AddSubscription, GetSubscriptionCount,
// RemoveSubscription, ClearSubscriptions) is handled at the app layer, not the
// database layer. Tests for those methods have been removed.
func TestMarkers_SetGetDelete(t *testing.T) {
neo4jURI := os.Getenv("ORLY_NEO4J_URI")
@@ -371,24 +204,36 @@ func TestIdentity(t *testing.T) {
<-db.Ready()
// Wipe to ensure clean state
if err := db.Wipe(); err != nil {
t.Fatalf("Failed to wipe database: %v", err)
}
// Get identity (creates if not exists)
signer := db.Identity()
if signer == nil {
t.Fatal("Expected non-nil signer from Identity()")
secret1, err := db.GetOrCreateRelayIdentitySecret()
if err != nil {
t.Fatalf("Failed to get identity: %v", err)
}
if secret1 == nil {
t.Fatal("Expected non-nil secret from GetOrCreateRelayIdentitySecret()")
}
// Get identity again (should return same one)
signer2 := db.Identity()
if signer2 == nil {
t.Fatal("Expected non-nil signer from second Identity() call")
secret2, err := db.GetOrCreateRelayIdentitySecret()
if err != nil {
t.Fatalf("Failed to get identity second time: %v", err)
}
if secret2 == nil {
t.Fatal("Expected non-nil secret from second GetOrCreateRelayIdentitySecret() call")
}
// Public keys should match
pub1 := signer.Pub()
pub2 := signer2.Pub()
for i := range pub1 {
if pub1[i] != pub2[i] {
t.Fatal("Identity pubkeys don't match across calls")
// Secrets should match
if len(secret1) != len(secret2) {
t.Fatalf("Secret lengths don't match: %d vs %d", len(secret1), len(secret2))
}
for i := range secret1 {
if secret1[i] != secret2[i] {
t.Fatal("Identity secrets don't match across calls")
}
}

View File

@@ -19,6 +19,7 @@ The policy system provides fine-grained control over event storage and retrieval
- [Dynamic Policy Updates](#dynamic-policy-updates)
- [Evaluation Order](#evaluation-order)
- [Examples](#examples)
- [Permissive Mode Examples](#permissive-mode-examples)
## Overview
@@ -271,6 +272,38 @@ Validates that tag values match the specified regex patterns. Only validates tag
See [Follows-Based Whitelisting](#follows-based-whitelisting) for details.
#### Permissive Mode Overrides
| Field | Type | Description |
|-------|------|-------------|
| `read_allow_permissive` | boolean | Override kind whitelist for READ access (reads allowed for all kinds) |
| `write_allow_permissive` | boolean | Override kind whitelist for WRITE access (writes use global rule only) |
These fields, when set on the **global** rule, allow independent control over read and write access relative to the kind whitelist/blacklist:
```json
{
"kind": {
"whitelist": [1, 3, 5, 7]
},
"global": {
"read_allow_permissive": true,
"size_limit": 100000
}
}
```
In this example:
- **READ**: Allowed for ALL kinds (permissive override ignores whitelist)
- **WRITE**: Only kinds 1, 3, 5, 7 can be written (whitelist applies)
**Important constraints:**
- These flags only work on the **global** rule (ignored on kind-specific rules)
- You cannot enable BOTH `read_allow_permissive` AND `write_allow_permissive` when a kind whitelist/blacklist is configured (this would make the whitelist meaningless)
- Blacklists always take precedence—permissive flags do NOT override explicit blacklist entries
See [Permissive Mode Examples](#permissive-mode-examples) for detailed use cases.
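Condensed, the explicit-whitelist decision for a given access type reduces to the following schematic (a sketch of the logic, not the relay's actual code path):

```go
// allowed reports whether a kind passes an explicit kind whitelist check,
// given the two global permissive flags. Schematic only.
func allowed(access string, inWhitelist, readPermissive, writePermissive bool) bool {
	if inWhitelist {
		return true
	}
	if access == "read" && readPermissive {
		return true
	}
	if access == "write" && writePermissive {
		return true
	}
	return false
}
```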
#### Rate Limiting
| Field | Type | Unit | Description |
@@ -809,6 +842,83 @@ access_allowed = (
}
```
### Permissive Mode Examples
#### Read-Permissive Relay (Write-Restricted)
Allow anyone to read all events, but restrict writes to specific kinds:
```json
{
"default_policy": "allow",
"kind": {
"whitelist": [1, 3, 7, 9735]
},
"global": {
"read_allow_permissive": true,
"size_limit": 100000
}
}
```
**Behavior:**
- **READ**: Any kind can be read (permissive override)
- **WRITE**: Only kinds 1, 3, 7, 9735 can be written
This is useful for relays that want to serve as aggregators (read any event type) but only accept specific event types from clients.
#### Write-Permissive with Read Restrictions
Allow writes of any kind (with global constraints), but restrict reads:
```json
{
"default_policy": "allow",
"kind": {
"whitelist": [0, 1, 3]
},
"global": {
"write_allow_permissive": true,
"size_limit": 50000,
"max_age_of_event": 86400
}
}
```
**Behavior:**
- **READ**: Only kinds 0, 1, 3 can be read (whitelist applies)
- **WRITE**: Any kind can be written (with size and age limits from global rule)
This is useful for relays that want to accept any event type but only serve a curated subset.
#### Archive Relay (Read Any, Accept Specific)
Perfect for archive/backup relays:
```json
{
"default_policy": "allow",
"kind": {
"whitelist": [0, 1, 3, 4, 7, 30023]
},
"global": {
"read_allow_permissive": true,
"size_limit": 500000
},
"rules": {
"30023": {
"description": "Long-form articles with validation",
"identifier_regex": "^[a-z0-9-]{1,64}$",
"max_expiry_duration": "P365D"
}
}
}
```
**Behavior:**
- **READ**: All kinds can be read (historical data)
- **WRITE**: Only whitelisted kinds accepted, with specific rules for articles
## Testing
### Run Policy Tests

View File

@@ -40,7 +40,7 @@ func BenchmarkCheckKindsPolicy(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
policy.checkKindsPolicy(1)
policy.checkKindsPolicy("write", 1)
}
}

View File

@@ -168,8 +168,8 @@ func TestBugReproduction_DebugPolicyFlow(t *testing.T) {
t.Logf("=== Policy Check Flow ===")
// Step 1: Check kinds policy
kindsAllowed := policy.checkKindsPolicy(event.Kind)
t.Logf("1. checkKindsPolicy(kind=%d) returned: %v", event.Kind, kindsAllowed)
kindsAllowed := policy.checkKindsPolicy("write", event.Kind)
t.Logf("1. checkKindsPolicy(access=write, kind=%d) returned: %v", event.Kind, kindsAllowed)
// Full policy check
allowed, err := policy.CheckPolicy("write", event, testPubkey, "127.0.0.1")

View File

@@ -1351,6 +1351,57 @@ func TestValidateJSONNewFields(t *testing.T) {
}`,
expectError: false,
},
// Tests for read_allow_permissive and write_allow_permissive
{
name: "valid read_allow_permissive alone with whitelist",
json: `{
"kind": {"whitelist": [1, 3, 5]},
"global": {"read_allow_permissive": true}
}`,
expectError: false,
},
{
name: "valid write_allow_permissive alone with whitelist",
json: `{
"kind": {"whitelist": [1, 3, 5]},
"global": {"write_allow_permissive": true}
}`,
expectError: false,
},
{
name: "invalid both permissive flags with whitelist",
json: `{
"kind": {"whitelist": [1, 3, 5]},
"global": {
"read_allow_permissive": true,
"write_allow_permissive": true
}
}`,
expectError: true,
errorMatch: "read_allow_permissive and write_allow_permissive cannot be enabled together",
},
{
name: "invalid both permissive flags with blacklist",
json: `{
"kind": {"blacklist": [2, 4, 6]},
"global": {
"read_allow_permissive": true,
"write_allow_permissive": true
}
}`,
expectError: true,
errorMatch: "read_allow_permissive and write_allow_permissive cannot be enabled together",
},
{
name: "valid both permissive flags without any kind restriction",
json: `{
"global": {
"read_allow_permissive": true,
"write_allow_permissive": true
}
}`,
expectError: false,
},
}
for _, tt := range tests {

View File

@@ -144,6 +144,23 @@ type Rule struct {
// Example: "^[a-z0-9-]{1,64}$" requires lowercase alphanumeric with hyphens, max 64 chars.
IdentifierRegex string `json:"identifier_regex,omitempty"`
// ReadAllowPermissive when set on a GLOBAL rule, allows read access for ALL kinds,
// even when a kind whitelist is configured. This allows the kind whitelist to
// restrict WRITE operations while keeping reads permissive.
// When true:
// - READ: Allowed for all kinds (global rule still applies for other read restrictions)
// - WRITE: Kind whitelist/blacklist applies as normal
// Only meaningful on the Global rule - ignored on kind-specific rules.
ReadAllowPermissive bool `json:"read_allow_permissive,omitempty"`
// WriteAllowPermissive when set on a GLOBAL rule, allows write access for kinds
// that don't have specific rules defined, bypassing the implicit kind whitelist.
// When true:
// - Kinds without specific rules apply global rule constraints only
// - Kind whitelist still blocks reads for unlisted kinds (unless ReadAllowPermissive is also set)
// Only meaningful on the Global rule - ignored on kind-specific rules.
WriteAllowPermissive bool `json:"write_allow_permissive,omitempty"`
// Binary caches for faster comparison (populated from hex strings above)
// These are not exported and not serialized to JSON
writeAllowBin [][]byte
@@ -178,7 +195,8 @@ func (r *Rule) hasAnyRules() bool {
len(r.ReadFollowsWhitelist) > 0 || len(r.WriteFollowsWhitelist) > 0 ||
len(r.readFollowsWhitelistBin) > 0 || len(r.writeFollowsWhitelistBin) > 0 ||
len(r.TagValidation) > 0 ||
r.ProtectedRequired || r.IdentifierRegex != ""
r.ProtectedRequired || r.IdentifierRegex != "" ||
r.ReadAllowPermissive || r.WriteAllowPermissive
}
// populateBinaryCache converts hex-encoded pubkey strings to binary for faster comparison.
@@ -1280,7 +1298,7 @@ func (p *P) CheckPolicy(
// ==========================================================================
// STEP 1: Check kinds whitelist/blacklist (applies before any rule checks)
// ==========================================================================
if !p.checkKindsPolicy(ev.Kind) {
if !p.checkKindsPolicy(access, ev.Kind) {
return false, nil
}
@@ -1341,19 +1359,32 @@ func (p *P) CheckPolicy(
return p.getDefaultPolicyAction(), nil
}
// checkKindsPolicy checks if the event kind is allowed.
// checkKindsPolicy checks if the event kind is allowed for the given access type.
// Logic:
// 1. If explicit whitelist exists, use it (backwards compatibility)
// 2. If explicit blacklist exists, use it (backwards compatibility)
// 3. Otherwise, kinds with defined rules are implicitly allowed, others denied
func (p *P) checkKindsPolicy(kind uint16) bool {
// If whitelist is present, only allow whitelisted kinds
// 1. If explicit whitelist exists, use it (but respect permissive flags for read/write)
// 2. If explicit blacklist exists, use it (but respect permissive flags for read/write)
// 3. Otherwise, kinds with defined rules are implicitly allowed, others denied (with permissive overrides)
//
// Permissive flags (set on Global rule):
// - ReadAllowPermissive: Allows READ access for kinds not in whitelist (write still restricted)
// - WriteAllowPermissive: Allows WRITE access for kinds not in whitelist (uses global rule constraints)
func (p *P) checkKindsPolicy(access string, kind uint16) bool {
// If whitelist is present, only allow whitelisted kinds (with permissive overrides)
if len(p.Kind.Whitelist) > 0 {
for _, allowedKind := range p.Kind.Whitelist {
if kind == uint16(allowedKind) {
return true
}
}
// Kind not in whitelist - check permissive flags
if access == "read" && p.Global.ReadAllowPermissive {
log.D.F("read_allow_permissive: allowing read for kind %d not in whitelist", kind)
return true // Allow read even though kind not whitelisted
}
if access == "write" && p.Global.WriteAllowPermissive {
log.D.F("write_allow_permissive: allowing write for kind %d not in whitelist (global rules apply)", kind)
return true // Allow write even though kind not whitelisted, global rule will be applied
}
return false
}
@@ -1361,12 +1392,25 @@ func (p *P) checkKindsPolicy(kind uint16) bool {
if len(p.Kind.Blacklist) > 0 {
for _, deniedKind := range p.Kind.Blacklist {
if kind == uint16(deniedKind) {
// Kind is explicitly blacklisted - permissive flags don't override blacklist
return false
}
}
// Not in blacklist - check if rule exists for implicit whitelist
_, hasRule := p.rules[int(kind)]
return hasRule // Only allow if there's a rule defined
if hasRule {
return true
}
// No kind-specific rule - check permissive flags
if access == "read" && p.Global.ReadAllowPermissive {
log.D.F("read_allow_permissive: allowing read for kind %d (not blacklisted, no rule)", kind)
return true
}
if access == "write" && p.Global.WriteAllowPermissive {
log.D.F("write_allow_permissive: allowing write for kind %d (not blacklisted, no rule)", kind)
return true
}
return false // No kind rule and no permissive override
}
// No explicit whitelist or blacklist
@@ -1374,6 +1418,7 @@ func (p *P) checkKindsPolicy(kind uint16) bool {
// - If default_policy is explicitly "allow", allow all kinds (rules add constraints, not restrictions)
// - If default_policy is unset or "deny", use implicit whitelist (only allow kinds with rules)
// - If global rule has any configuration, allow kinds through for global rule checking
// - Permissive flags can override implicit whitelist behavior
if len(p.rules) > 0 {
// If default_policy is explicitly "allow", don't use implicit whitelist
if p.DefaultPolicy == "allow" {
@@ -1388,6 +1433,15 @@ func (p *P) checkKindsPolicy(kind uint16) bool {
if p.Global.hasAnyRules() {
return true // Allow through for global rule check
}
// Check permissive flags for implicit whitelist override
if access == "read" && p.Global.ReadAllowPermissive {
log.D.F("read_allow_permissive: allowing read for kind %d (implicit whitelist override)", kind)
return true
}
if access == "write" && p.Global.WriteAllowPermissive {
log.D.F("write_allow_permissive: allowing write for kind %d (implicit whitelist override)", kind)
return true
}
return false
}
// No kind-specific rules - check if global rule exists
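
The overall precedence, then, is: explicit whitelist (permissive flags may let non-listed kinds through), explicit blacklist (never overridden), then the implicit whitelist built from kind-specific rules. Per the comments above, an explicit default_policy of "allow" switches the implicit whitelist off entirely. A minimal sketch, again using the in-package test-style construction (rules is unexported):

    policy := &P{
        DefaultPolicy: "allow",
        rules:         map[int]Rule{1: {Description: "constrains kind 1"}},
    }
    policy.checkKindsPolicy("write", 7) // true: default_policy "allow" disables the implicit whitelist
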
@@ -2052,6 +2106,13 @@ func (p *P) ValidateJSON(policyJSON []byte) error {
return fmt.Errorf("invalid default_policy value: %q (must be \"allow\" or \"deny\")", tempPolicy.DefaultPolicy)
}
// Validate permissive flags: enabling both read_allow_permissive and
// write_allow_permissive alongside a kind whitelist or blacklist would
// make that kind restriction meaningless, so reject the combination.
hasKindRestriction := len(tempPolicy.Kind.Whitelist) > 0 || len(tempPolicy.Kind.Blacklist) > 0
if hasKindRestriction && tempPolicy.Global.ReadAllowPermissive && tempPolicy.Global.WriteAllowPermissive {
return fmt.Errorf("invalid policy: both read_allow_permissive and write_allow_permissive cannot be enabled together with a kind whitelist or blacklist (this would make the kind restriction meaningless)")
}
log.D.F("policy JSON validation passed")
return nil
}
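
A quick way to see this guard in action is to feed ValidateJSON a policy that combines both flags with a kind whitelist. A hedged sketch: the permissive flag names match the error message above, while the kind/whitelist JSON keys are assumed to mirror the Go field names:

    var p P
    bad := []byte(`{
      "kind": {"whitelist": [1]},
      "global": {
        "read_allow_permissive": true,
        "write_allow_permissive": true
      }
    }`)
    if err := p.ValidateJSON(bad); err != nil {
        // expected: rejected, since both flags plus a kind whitelist make the restriction meaningless
    }
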

View File

@@ -146,6 +146,7 @@ func TestCheckKindsPolicy(t *testing.T) {
tests := []struct {
name string
policy *P
access string // "read" or "write"
kind uint16
expected bool
}{
@@ -155,6 +156,7 @@ func TestCheckKindsPolicy(t *testing.T) {
Kind: Kinds{},
rules: map[int]Rule{}, // No rules defined
},
access: "write",
kind: 1,
expected: true, // Should be allowed (no rules = allow all kinds)
},
@@ -166,6 +168,7 @@ func TestCheckKindsPolicy(t *testing.T) {
2: {Description: "Rule for kind 2"},
},
},
access: "write",
kind: 1,
expected: false, // Should be denied (implicit whitelist, no rule for kind 1)
},
@@ -177,6 +180,7 @@ func TestCheckKindsPolicy(t *testing.T) {
1: {Description: "Rule for kind 1"},
},
},
access: "write",
kind: 1,
expected: true, // Should be allowed (has rule)
},
@@ -189,6 +193,7 @@ func TestCheckKindsPolicy(t *testing.T) {
},
rules: map[int]Rule{}, // No specific rules
},
access: "write",
kind: 1,
expected: true, // Should be allowed (global rule exists)
},
@@ -199,6 +204,7 @@ func TestCheckKindsPolicy(t *testing.T) {
Whitelist: []int{1, 3, 5},
},
},
access: "write",
kind: 1,
expected: true,
},
@@ -209,6 +215,7 @@ func TestCheckKindsPolicy(t *testing.T) {
Whitelist: []int{1, 3, 5},
},
},
access: "write",
kind: 2,
expected: false,
},
@@ -222,6 +229,7 @@ func TestCheckKindsPolicy(t *testing.T) {
3: {Description: "Rule for kind 3"}, // Has at least one rule
},
},
access: "write",
kind: 1,
expected: false, // Should be denied (not blacklisted but no rule for kind 1)
},
@@ -235,6 +243,7 @@ func TestCheckKindsPolicy(t *testing.T) {
1: {Description: "Rule for kind 1"},
},
},
access: "write",
kind: 1,
expected: true, // Should be allowed (not blacklisted and has rule)
},
@@ -245,6 +254,7 @@ func TestCheckKindsPolicy(t *testing.T) {
Blacklist: []int{2, 4, 6},
},
},
access: "write",
kind: 2,
expected: false,
},
@@ -256,14 +266,87 @@ func TestCheckKindsPolicy(t *testing.T) {
Blacklist: []int{1, 2, 3},
},
},
access: "write",
kind: 1,
expected: true,
},
// Tests for new permissive flags
{
name: "read_allow_permissive - allows read for non-whitelisted kind",
policy: &P{
Kind: Kinds{
Whitelist: []int{1, 3, 5},
},
Global: Rule{
ReadAllowPermissive: true,
},
},
access: "read",
kind: 2,
expected: true, // Should be allowed (read permissive overrides whitelist)
},
{
name: "read_allow_permissive - write still blocked for non-whitelisted kind",
policy: &P{
Kind: Kinds{
Whitelist: []int{1, 3, 5},
},
Global: Rule{
ReadAllowPermissive: true,
},
},
access: "write",
kind: 2,
expected: false, // Should be denied (only read is permissive)
},
{
name: "write_allow_permissive - allows write for non-whitelisted kind",
policy: &P{
Kind: Kinds{
Whitelist: []int{1, 3, 5},
},
Global: Rule{
WriteAllowPermissive: true,
},
},
access: "write",
kind: 2,
expected: true, // Should be allowed (write permissive overrides whitelist)
},
{
name: "write_allow_permissive - read still blocked for non-whitelisted kind",
policy: &P{
Kind: Kinds{
Whitelist: []int{1, 3, 5},
},
Global: Rule{
WriteAllowPermissive: true,
},
},
access: "read",
kind: 2,
expected: false, // Should be denied (only write is permissive)
},
{
name: "blacklist - permissive flags do NOT override blacklist",
policy: &P{
Kind: Kinds{
Blacklist: []int{2, 4, 6},
},
Global: Rule{
ReadAllowPermissive: true,
WriteAllowPermissive: true,
},
},
access: "write",
kind: 2,
expected: false, // Should be denied (blacklist always applies)
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tt.policy.checkKindsPolicy(tt.kind)
result := tt.policy.checkKindsPolicy(tt.access, tt.kind)
if result != tt.expected {
t.Errorf("Expected %v, got %v", tt.expected, result)
}
@@ -996,19 +1079,19 @@ func TestEdgeCasesWhitelistBlacklistConflict(t *testing.T) {
}
// Test kind in both whitelist and blacklist - whitelist should win
allowed := policy.checkKindsPolicy(1)
allowed := policy.checkKindsPolicy("write", 1)
if !allowed {
t.Error("Expected whitelist to override blacklist")
}
// Test kind in blacklist but not whitelist
allowed = policy.checkKindsPolicy(2)
allowed = policy.checkKindsPolicy("write", 2)
if allowed {
t.Error("Expected kind in blacklist but not whitelist to be blocked")
}
// Test kind in whitelist but not blacklist
allowed = policy.checkKindsPolicy(5)
allowed = policy.checkKindsPolicy("write", 5)
if !allowed {
t.Error("Expected kind in whitelist to be allowed")
}

View File

@@ -1 +1 @@
v0.32.5
v0.33.1