implement event table subtyping for small events in value log
This commit is contained in:
@@ -66,6 +66,29 @@ func SecretBytesToPubKeyHex(skb []byte) (pk string, err error) {
|
||||
return hex.Enc(signer.Pub()), nil
|
||||
}
|
||||
|
||||
// SecretBytesToPubKeyBytes generates a public key bytes from secret key bytes.
|
||||
func SecretBytesToPubKeyBytes(skb []byte) (pkb []byte, err error) {
|
||||
var signer *p8k.Signer
|
||||
if signer, err = p8k.New(); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if err = signer.InitSec(skb); chk.E(err) {
|
||||
return
|
||||
}
|
||||
return signer.Pub(), nil
|
||||
}
|
||||
|
||||
// SecretBytesToSigner creates a signer from secret key bytes.
|
||||
func SecretBytesToSigner(skb []byte) (signer *p8k.Signer, err error) {
|
||||
if signer, err = p8k.New(); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if err = signer.InitSec(skb); chk.E(err) {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// IsValid32ByteHex checks that a hex string is a valid 32 bytes lower case hex encoded value as
|
||||
// per nostr NIP-01 spec.
|
||||
func IsValid32ByteHex[V []byte | string](pk V) bool {
|
||||
|
||||
279
pkg/database/dual-storage_test.go
Normal file
279
pkg/database/dual-storage_test.go
Normal file
@@ -0,0 +1,279 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"next.orly.dev/pkg/encoders/event"
|
||||
"next.orly.dev/pkg/encoders/kind"
|
||||
"next.orly.dev/pkg/encoders/tag"
|
||||
"next.orly.dev/pkg/encoders/timestamp"
|
||||
"next.orly.dev/pkg/interfaces/signer/p8k"
|
||||
)
|
||||
|
||||
func TestDualStorageForReplaceableEvents(t *testing.T) {
|
||||
// Create a temporary directory for the database
|
||||
tempDir, err := os.MkdirTemp("", "test-dual-db-*")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
// Create a context and cancel function for the database
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Initialize the database
|
||||
db, err := New(ctx, cancel, tempDir, "info")
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
// Create a signing key
|
||||
sign := p8k.MustNew()
|
||||
require.NoError(t, sign.Generate())
|
||||
|
||||
t.Run("SmallReplaceableEvent", func(t *testing.T) {
|
||||
// Create a small replaceable event (kind 0 - profile metadata)
|
||||
ev := event.New()
|
||||
ev.Pubkey = sign.Pub()
|
||||
ev.CreatedAt = timestamp.Now().V
|
||||
ev.Kind = kind.ProfileMetadata.K
|
||||
ev.Tags = tag.NewS()
|
||||
ev.Content = []byte(`{"name":"Alice","about":"Test user"}`)
|
||||
|
||||
require.NoError(t, ev.Sign(sign))
|
||||
|
||||
// Save the event
|
||||
replaced, err := db.SaveEvent(ctx, ev)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, replaced)
|
||||
|
||||
// Fetch by serial - should work via sev key
|
||||
ser, err := db.GetSerialById(ev.ID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, ser)
|
||||
|
||||
fetched, err := db.FetchEventBySerial(ser)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, fetched)
|
||||
|
||||
// Verify event contents
|
||||
assert.Equal(t, ev.ID, fetched.ID)
|
||||
assert.Equal(t, ev.Pubkey, fetched.Pubkey)
|
||||
assert.Equal(t, ev.Kind, fetched.Kind)
|
||||
assert.Equal(t, ev.Content, fetched.Content)
|
||||
})
|
||||
|
||||
t.Run("LargeReplaceableEvent", func(t *testing.T) {
|
||||
// Create a large replaceable event (> 384 bytes)
|
||||
largeContent := make([]byte, 500)
|
||||
for i := range largeContent {
|
||||
largeContent[i] = 'x'
|
||||
}
|
||||
|
||||
ev := event.New()
|
||||
ev.Pubkey = sign.Pub()
|
||||
ev.CreatedAt = timestamp.Now().V + 1
|
||||
ev.Kind = kind.ProfileMetadata.K
|
||||
ev.Tags = tag.NewS()
|
||||
ev.Content = largeContent
|
||||
|
||||
require.NoError(t, ev.Sign(sign))
|
||||
|
||||
// Save the event
|
||||
replaced, err := db.SaveEvent(ctx, ev)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, replaced) // Should replace the previous profile
|
||||
|
||||
// Fetch by serial - should work via evt key
|
||||
ser, err := db.GetSerialById(ev.ID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, ser)
|
||||
|
||||
fetched, err := db.FetchEventBySerial(ser)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, fetched)
|
||||
|
||||
// Verify event contents
|
||||
assert.Equal(t, ev.ID, fetched.ID)
|
||||
assert.Equal(t, ev.Content, fetched.Content)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDualStorageForAddressableEvents(t *testing.T) {
|
||||
// Create a temporary directory for the database
|
||||
tempDir, err := os.MkdirTemp("", "test-addressable-db-*")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
// Create a context and cancel function for the database
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Initialize the database
|
||||
db, err := New(ctx, cancel, tempDir, "info")
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
// Create a signing key
|
||||
sign := p8k.MustNew()
|
||||
require.NoError(t, sign.Generate())
|
||||
|
||||
t.Run("SmallAddressableEvent", func(t *testing.T) {
|
||||
// Create a small addressable event (kind 30023 - long-form content)
|
||||
ev := event.New()
|
||||
ev.Pubkey = sign.Pub()
|
||||
ev.CreatedAt = timestamp.Now().V
|
||||
ev.Kind = 30023
|
||||
ev.Tags = tag.NewS(
|
||||
tag.NewFromAny("d", []byte("my-article")),
|
||||
tag.NewFromAny("title", []byte("Test Article")),
|
||||
)
|
||||
ev.Content = []byte("This is a short article.")
|
||||
|
||||
require.NoError(t, ev.Sign(sign))
|
||||
|
||||
// Save the event
|
||||
replaced, err := db.SaveEvent(ctx, ev)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, replaced)
|
||||
|
||||
// Fetch by serial - should work via sev key
|
||||
ser, err := db.GetSerialById(ev.ID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, ser)
|
||||
|
||||
fetched, err := db.FetchEventBySerial(ser)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, fetched)
|
||||
|
||||
// Verify event contents
|
||||
assert.Equal(t, ev.ID, fetched.ID)
|
||||
assert.Equal(t, ev.Pubkey, fetched.Pubkey)
|
||||
assert.Equal(t, ev.Kind, fetched.Kind)
|
||||
assert.Equal(t, ev.Content, fetched.Content)
|
||||
|
||||
// Verify d tag
|
||||
dTag := fetched.Tags.GetFirst([]byte("d"))
|
||||
require.NotNil(t, dTag)
|
||||
assert.Equal(t, []byte("my-article"), dTag.Value())
|
||||
})
|
||||
|
||||
t.Run("AddressableEventWithoutDTag", func(t *testing.T) {
|
||||
// Create an addressable event without d tag (should be treated as regular event)
|
||||
ev := event.New()
|
||||
ev.Pubkey = sign.Pub()
|
||||
ev.CreatedAt = timestamp.Now().V + 1
|
||||
ev.Kind = 30023
|
||||
ev.Tags = tag.NewS()
|
||||
ev.Content = []byte("Article without d tag")
|
||||
|
||||
require.NoError(t, ev.Sign(sign))
|
||||
|
||||
// Save should fail with missing d tag error
|
||||
_, err := db.SaveEvent(ctx, ev)
|
||||
assert.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "missing a d tag")
|
||||
})
|
||||
|
||||
t.Run("ReplaceAddressableEvent", func(t *testing.T) {
|
||||
// Create first version
|
||||
ev1 := event.New()
|
||||
ev1.Pubkey = sign.Pub()
|
||||
ev1.CreatedAt = timestamp.Now().V
|
||||
ev1.Kind = 30023
|
||||
ev1.Tags = tag.NewS(
|
||||
tag.NewFromAny("d", []byte("replaceable-article")),
|
||||
)
|
||||
ev1.Content = []byte("Version 1")
|
||||
|
||||
require.NoError(t, ev1.Sign(sign))
|
||||
|
||||
replaced, err := db.SaveEvent(ctx, ev1)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, replaced)
|
||||
|
||||
// Create second version (newer)
|
||||
ev2 := event.New()
|
||||
ev2.Pubkey = sign.Pub()
|
||||
ev2.CreatedAt = ev1.CreatedAt + 10
|
||||
ev2.Kind = 30023
|
||||
ev2.Tags = tag.NewS(
|
||||
tag.NewFromAny("d", []byte("replaceable-article")),
|
||||
)
|
||||
ev2.Content = []byte("Version 2")
|
||||
|
||||
require.NoError(t, ev2.Sign(sign))
|
||||
|
||||
replaced, err = db.SaveEvent(ctx, ev2)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, replaced)
|
||||
|
||||
// Try to save older version (should fail)
|
||||
ev0 := event.New()
|
||||
ev0.Pubkey = sign.Pub()
|
||||
ev0.CreatedAt = ev1.CreatedAt - 10
|
||||
ev0.Kind = 30023
|
||||
ev0.Tags = tag.NewS(
|
||||
tag.NewFromAny("d", []byte("replaceable-article")),
|
||||
)
|
||||
ev0.Content = []byte("Version 0 (old)")
|
||||
|
||||
require.NoError(t, ev0.Sign(sign))
|
||||
|
||||
replaced, err = db.SaveEvent(ctx, ev0)
|
||||
assert.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "older than existing")
|
||||
})
|
||||
}
|
||||
|
||||
func TestDualStorageRegularEvents(t *testing.T) {
|
||||
// Create a temporary directory for the database
|
||||
tempDir, err := os.MkdirTemp("", "test-regular-db-*")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
// Create a context and cancel function for the database
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Initialize the database
|
||||
db, err := New(ctx, cancel, tempDir, "info")
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
// Create a signing key
|
||||
sign := p8k.MustNew()
|
||||
require.NoError(t, sign.Generate())
|
||||
|
||||
t.Run("SmallRegularEvent", func(t *testing.T) {
|
||||
// Create a small regular event (kind 1 - note)
|
||||
ev := event.New()
|
||||
ev.Pubkey = sign.Pub()
|
||||
ev.CreatedAt = timestamp.Now().V
|
||||
ev.Kind = kind.TextNote.K
|
||||
ev.Tags = tag.NewS()
|
||||
ev.Content = []byte("Hello, Nostr!")
|
||||
|
||||
require.NoError(t, ev.Sign(sign))
|
||||
|
||||
// Save the event
|
||||
replaced, err := db.SaveEvent(ctx, ev)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, replaced)
|
||||
|
||||
// Fetch by serial - should work via sev key
|
||||
ser, err := db.GetSerialById(ev.ID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, ser)
|
||||
|
||||
fetched, err := db.FetchEventBySerial(ser)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, fetched)
|
||||
|
||||
// Verify event contents
|
||||
assert.Equal(t, ev.ID, fetched.ID)
|
||||
assert.Equal(t, ev.Content, fetched.Content)
|
||||
})
|
||||
}
|
||||
@@ -14,6 +14,55 @@ import (
|
||||
func (d *D) FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error) {
|
||||
if err = d.View(
|
||||
func(txn *badger.Txn) (err error) {
|
||||
// Helper function to extract inline event data from key
|
||||
extractInlineData := func(key []byte, prefixLen int) (*event.E, error) {
|
||||
if len(key) > prefixLen+2 {
|
||||
sizeIdx := prefixLen
|
||||
size := int(key[sizeIdx])<<8 | int(key[sizeIdx+1])
|
||||
dataStart := sizeIdx + 2
|
||||
|
||||
if len(key) >= dataStart+size {
|
||||
eventData := key[dataStart : dataStart+size]
|
||||
ev := new(event.E)
|
||||
if err := ev.UnmarshalBinary(bytes.NewBuffer(eventData)); err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"error unmarshaling inline event (size=%d): %w",
|
||||
size, err,
|
||||
)
|
||||
}
|
||||
return ev, nil
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Try sev (small event inline) prefix first - Reiser4 optimization
|
||||
smallBuf := new(bytes.Buffer)
|
||||
if err = indexes.SmallEventEnc(ser).MarshalWrite(smallBuf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.Prefix = smallBuf.Bytes()
|
||||
opts.PrefetchValues = true
|
||||
opts.PrefetchSize = 1
|
||||
it := txn.NewIterator(opts)
|
||||
defer it.Close()
|
||||
|
||||
it.Rewind()
|
||||
if it.Valid() {
|
||||
// Found in sev table - extract inline data
|
||||
key := it.Item().Key()
|
||||
// Key format: sev|serial|size_uint16|event_data
|
||||
if ev, err = extractInlineData(key, 8); err != nil {
|
||||
return err
|
||||
}
|
||||
if ev != nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Not found in sev table, try evt (traditional) prefix
|
||||
buf := new(bytes.Buffer)
|
||||
if err = indexes.EventEnc(ser).MarshalWrite(buf); chk.E(err) {
|
||||
return
|
||||
|
||||
@@ -15,47 +15,92 @@ import (
|
||||
func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*event.E, err error) {
|
||||
// Pre-allocate map with estimated capacity to reduce reallocations
|
||||
events = make(map[uint64]*event.E, len(serials))
|
||||
|
||||
|
||||
if len(serials) == 0 {
|
||||
return events, nil
|
||||
}
|
||||
|
||||
|
||||
if err = d.View(
|
||||
func(txn *badger.Txn) (err error) {
|
||||
for _, ser := range serials {
|
||||
var ev *event.E
|
||||
|
||||
// Try sev (small event inline) prefix first - Reiser4 optimization
|
||||
smallBuf := new(bytes.Buffer)
|
||||
if err = indexes.SmallEventEnc(ser).MarshalWrite(smallBuf); chk.E(err) {
|
||||
// Skip this serial on error but continue with others
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
|
||||
// Iterate with prefix to find the small event key
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.Prefix = smallBuf.Bytes()
|
||||
opts.PrefetchValues = true
|
||||
opts.PrefetchSize = 1
|
||||
it := txn.NewIterator(opts)
|
||||
|
||||
it.Rewind()
|
||||
if it.Valid() {
|
||||
// Found in sev table - extract inline data
|
||||
key := it.Item().Key()
|
||||
// Key format: sev|serial|size_uint16|event_data
|
||||
if len(key) > 8+2 { // prefix(3) + serial(5) + size(2) = 10 bytes minimum
|
||||
sizeIdx := 8 // After sev(3) + serial(5)
|
||||
// Read uint16 big-endian size
|
||||
size := int(key[sizeIdx])<<8 | int(key[sizeIdx+1])
|
||||
dataStart := sizeIdx + 2
|
||||
|
||||
if len(key) >= dataStart+size {
|
||||
eventData := key[dataStart : dataStart+size]
|
||||
ev = new(event.E)
|
||||
if err = ev.UnmarshalBinary(bytes.NewBuffer(eventData)); err == nil {
|
||||
events[ser.Get()] = ev
|
||||
}
|
||||
// Clean up and continue
|
||||
it.Close()
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
it.Close()
|
||||
|
||||
// Not found in sev table, try evt (traditional) prefix
|
||||
buf := new(bytes.Buffer)
|
||||
if err = indexes.EventEnc(ser).MarshalWrite(buf); chk.E(err) {
|
||||
// Skip this serial on error but continue with others
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
var item *badger.Item
|
||||
if item, err = txn.Get(buf.Bytes()); err != nil {
|
||||
// Skip this serial if not found but continue with others
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
var v []byte
|
||||
if v, err = item.ValueCopy(nil); chk.E(err) {
|
||||
// Skip this serial on error but continue with others
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
// Check if we have valid data before attempting to unmarshal
|
||||
if len(v) < 32+32+1+2+1+1+64 { // ID + Pubkey + min varint fields + Sig
|
||||
// Skip this serial - incomplete data
|
||||
continue
|
||||
}
|
||||
|
||||
ev := new(event.E)
|
||||
|
||||
ev = new(event.E)
|
||||
if err = ev.UnmarshalBinary(bytes.NewBuffer(v)); err != nil {
|
||||
// Skip this serial on unmarshal error but continue with others
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
// Successfully unmarshaled event, add to results
|
||||
events[ser.Get()] = ev
|
||||
}
|
||||
@@ -64,6 +109,6 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*ev
|
||||
); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
return events, nil
|
||||
}
|
||||
@@ -55,9 +55,12 @@ type I string
|
||||
func (i I) Write(w io.Writer) (n int, err error) { return w.Write([]byte(i)) }
|
||||
|
||||
const (
|
||||
EventPrefix = I("evt")
|
||||
IdPrefix = I("eid")
|
||||
FullIdPubkeyPrefix = I("fpc") // full id, pubkey, created at
|
||||
EventPrefix = I("evt")
|
||||
SmallEventPrefix = I("sev") // small event with inline data (<=384 bytes)
|
||||
ReplaceableEventPrefix = I("rev") // replaceable event (kinds 0,3,10000-19999) with inline data
|
||||
AddressableEventPrefix = I("aev") // addressable event (kinds 30000-39999) with inline data
|
||||
IdPrefix = I("eid")
|
||||
FullIdPubkeyPrefix = I("fpc") // full id, pubkey, created at
|
||||
|
||||
CreatedAtPrefix = I("c--") // created at
|
||||
KindPrefix = I("kc-") // kind, created at
|
||||
@@ -80,6 +83,12 @@ func Prefix(prf int) (i I) {
|
||||
switch prf {
|
||||
case Event:
|
||||
return EventPrefix
|
||||
case SmallEvent:
|
||||
return SmallEventPrefix
|
||||
case ReplaceableEvent:
|
||||
return ReplaceableEventPrefix
|
||||
case AddressableEvent:
|
||||
return AddressableEventPrefix
|
||||
case Id:
|
||||
return IdPrefix
|
||||
case FullIdPubkey:
|
||||
@@ -125,6 +134,12 @@ func Identify(r io.Reader) (i int, err error) {
|
||||
switch I(b[:]) {
|
||||
case EventPrefix:
|
||||
i = Event
|
||||
case SmallEventPrefix:
|
||||
i = SmallEvent
|
||||
case ReplaceableEventPrefix:
|
||||
i = ReplaceableEvent
|
||||
case AddressableEventPrefix:
|
||||
i = AddressableEvent
|
||||
case IdPrefix:
|
||||
i = Id
|
||||
case FullIdPubkeyPrefix:
|
||||
@@ -200,6 +215,53 @@ func EventEnc(ser *types.Uint40) (enc *T) {
|
||||
}
|
||||
func EventDec(ser *types.Uint40) (enc *T) { return New(NewPrefix(), ser) }
|
||||
|
||||
// SmallEvent stores events <=384 bytes with inline data to avoid double lookup.
|
||||
// This is a Reiser4-inspired optimization for small event packing.
|
||||
// 384 bytes covers: ID(32) + Pubkey(32) + Sig(64) + basic fields + small content
|
||||
//
|
||||
// prefix|5 serial|2 size_uint16|data (variable length, max 384 bytes)
|
||||
var SmallEvent = next()
|
||||
|
||||
func SmallEventVars() (ser *types.Uint40) { return new(types.Uint40) }
|
||||
func SmallEventEnc(ser *types.Uint40) (enc *T) {
|
||||
return New(NewPrefix(SmallEvent), ser)
|
||||
}
|
||||
func SmallEventDec(ser *types.Uint40) (enc *T) { return New(NewPrefix(), ser) }
|
||||
|
||||
// ReplaceableEvent stores replaceable events (kinds 0,3,10000-19999) with inline data.
|
||||
// Optimized storage for metadata events that are frequently replaced.
|
||||
// Key format enables direct lookup by pubkey+kind without additional index traversal.
|
||||
//
|
||||
// prefix|8 pubkey_hash|2 kind|2 size_uint16|data (variable length, max 384 bytes)
|
||||
var ReplaceableEvent = next()
|
||||
|
||||
func ReplaceableEventVars() (p *types.PubHash, ki *types.Uint16) {
|
||||
return new(types.PubHash), new(types.Uint16)
|
||||
}
|
||||
func ReplaceableEventEnc(p *types.PubHash, ki *types.Uint16) (enc *T) {
|
||||
return New(NewPrefix(ReplaceableEvent), p, ki)
|
||||
}
|
||||
func ReplaceableEventDec(p *types.PubHash, ki *types.Uint16) (enc *T) {
|
||||
return New(NewPrefix(), p, ki)
|
||||
}
|
||||
|
||||
// AddressableEvent stores parameterized replaceable events (kinds 30000-39999) with inline data.
|
||||
// Optimized storage for addressable events identified by pubkey+kind+d-tag.
|
||||
// Key format enables direct lookup without additional index traversal.
|
||||
//
|
||||
// prefix|8 pubkey_hash|2 kind|8 dtag_hash|2 size_uint16|data (variable length, max 384 bytes)
|
||||
var AddressableEvent = next()
|
||||
|
||||
func AddressableEventVars() (p *types.PubHash, ki *types.Uint16, d *types.Ident) {
|
||||
return new(types.PubHash), new(types.Uint16), new(types.Ident)
|
||||
}
|
||||
func AddressableEventEnc(p *types.PubHash, ki *types.Uint16, d *types.Ident) (enc *T) {
|
||||
return New(NewPrefix(AddressableEvent), p, ki, d)
|
||||
}
|
||||
func AddressableEventDec(p *types.PubHash, ki *types.Uint16, d *types.Ident) (enc *T) {
|
||||
return New(NewPrefix(), p, ki, d)
|
||||
}
|
||||
|
||||
// Id contains a truncated 8-byte hash of an event index. This is the secondary
|
||||
// key of an event, the primary key is the serial found in the Event.
|
||||
//
|
||||
|
||||
521
pkg/database/inline-storage_test.go
Normal file
521
pkg/database/inline-storage_test.go
Normal file
@@ -0,0 +1,521 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"lol.mleku.dev/chk"
|
||||
"next.orly.dev/pkg/database/indexes"
|
||||
"next.orly.dev/pkg/database/indexes/types"
|
||||
"next.orly.dev/pkg/encoders/event"
|
||||
"next.orly.dev/pkg/encoders/hex"
|
||||
"next.orly.dev/pkg/encoders/kind"
|
||||
"next.orly.dev/pkg/encoders/tag"
|
||||
"next.orly.dev/pkg/encoders/timestamp"
|
||||
"next.orly.dev/pkg/interfaces/signer/p8k"
|
||||
)
|
||||
|
||||
// TestInlineSmallEventStorage tests the Reiser4-inspired inline storage optimization
|
||||
// for small events (<=384 bytes).
|
||||
func TestInlineSmallEventStorage(t *testing.T) {
|
||||
// Create a temporary directory for the database
|
||||
tempDir, err := os.MkdirTemp("", "test-inline-db-*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create temporary directory: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
// Create a context and cancel function for the database
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Initialize the database
|
||||
db, err := New(ctx, cancel, tempDir, "info")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Create a signer
|
||||
sign := p8k.MustNew()
|
||||
if err := sign.Generate(); chk.E(err) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Test Case 1: Small event (should use inline storage)
|
||||
t.Run("SmallEventInlineStorage", func(t *testing.T) {
|
||||
smallEvent := event.New()
|
||||
smallEvent.Kind = kind.TextNote.K
|
||||
smallEvent.CreatedAt = timestamp.Now().V
|
||||
smallEvent.Content = []byte("Hello Nostr!") // Small content
|
||||
smallEvent.Pubkey = sign.Pub()
|
||||
smallEvent.Tags = tag.NewS()
|
||||
|
||||
// Sign the event
|
||||
if err := smallEvent.Sign(sign); err != nil {
|
||||
t.Fatalf("Failed to sign small event: %v", err)
|
||||
}
|
||||
|
||||
// Save the event
|
||||
if _, err := db.SaveEvent(ctx, smallEvent); err != nil {
|
||||
t.Fatalf("Failed to save small event: %v", err)
|
||||
}
|
||||
|
||||
// Verify it was stored with sev prefix
|
||||
serial, err := db.GetSerialById(smallEvent.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get serial for small event: %v", err)
|
||||
}
|
||||
|
||||
// Check that sev key exists
|
||||
sevKeyExists := false
|
||||
db.View(func(txn *badger.Txn) error {
|
||||
smallBuf := new(bytes.Buffer)
|
||||
indexes.SmallEventEnc(serial).MarshalWrite(smallBuf)
|
||||
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.Prefix = smallBuf.Bytes()
|
||||
it := txn.NewIterator(opts)
|
||||
defer it.Close()
|
||||
|
||||
it.Rewind()
|
||||
if it.Valid() {
|
||||
sevKeyExists = true
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if !sevKeyExists {
|
||||
t.Errorf("Small event was not stored with sev prefix")
|
||||
}
|
||||
|
||||
// Verify evt key does NOT exist for small event
|
||||
evtKeyExists := false
|
||||
db.View(func(txn *badger.Txn) error {
|
||||
buf := new(bytes.Buffer)
|
||||
indexes.EventEnc(serial).MarshalWrite(buf)
|
||||
|
||||
_, err := txn.Get(buf.Bytes())
|
||||
if err == nil {
|
||||
evtKeyExists = true
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if evtKeyExists {
|
||||
t.Errorf("Small event should not have evt key (should only use sev)")
|
||||
}
|
||||
|
||||
// Fetch and verify the event
|
||||
fetchedEvent, err := db.FetchEventBySerial(serial)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to fetch small event: %v", err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(fetchedEvent.ID, smallEvent.ID) {
|
||||
t.Errorf("Fetched event ID mismatch: got %x, want %x", fetchedEvent.ID, smallEvent.ID)
|
||||
}
|
||||
if !bytes.Equal(fetchedEvent.Content, smallEvent.Content) {
|
||||
t.Errorf("Fetched event content mismatch: got %q, want %q", fetchedEvent.Content, smallEvent.Content)
|
||||
}
|
||||
})
|
||||
|
||||
// Test Case 2: Large event (should use traditional storage)
|
||||
t.Run("LargeEventTraditionalStorage", func(t *testing.T) {
|
||||
largeEvent := event.New()
|
||||
largeEvent.Kind = kind.TextNote.K
|
||||
largeEvent.CreatedAt = timestamp.Now().V
|
||||
// Create content larger than 384 bytes
|
||||
largeContent := make([]byte, 500)
|
||||
for i := range largeContent {
|
||||
largeContent[i] = 'x'
|
||||
}
|
||||
largeEvent.Content = largeContent
|
||||
largeEvent.Pubkey = sign.Pub()
|
||||
largeEvent.Tags = tag.NewS()
|
||||
|
||||
// Sign the event
|
||||
if err := largeEvent.Sign(sign); err != nil {
|
||||
t.Fatalf("Failed to sign large event: %v", err)
|
||||
}
|
||||
|
||||
// Save the event
|
||||
if _, err := db.SaveEvent(ctx, largeEvent); err != nil {
|
||||
t.Fatalf("Failed to save large event: %v", err)
|
||||
}
|
||||
|
||||
// Verify it was stored with evt prefix
|
||||
serial, err := db.GetSerialById(largeEvent.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get serial for large event: %v", err)
|
||||
}
|
||||
|
||||
// Check that evt key exists
|
||||
evtKeyExists := false
|
||||
db.View(func(txn *badger.Txn) error {
|
||||
buf := new(bytes.Buffer)
|
||||
indexes.EventEnc(serial).MarshalWrite(buf)
|
||||
|
||||
_, err := txn.Get(buf.Bytes())
|
||||
if err == nil {
|
||||
evtKeyExists = true
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if !evtKeyExists {
|
||||
t.Errorf("Large event was not stored with evt prefix")
|
||||
}
|
||||
|
||||
// Fetch and verify the event
|
||||
fetchedEvent, err := db.FetchEventBySerial(serial)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to fetch large event: %v", err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(fetchedEvent.ID, largeEvent.ID) {
|
||||
t.Errorf("Fetched event ID mismatch: got %x, want %x", fetchedEvent.ID, largeEvent.ID)
|
||||
}
|
||||
})
|
||||
|
||||
// Test Case 3: Batch fetch with mixed small and large events
|
||||
t.Run("BatchFetchMixedEvents", func(t *testing.T) {
|
||||
var serials []*types.Uint40
|
||||
expectedIDs := make(map[uint64][]byte)
|
||||
|
||||
// Create 10 small events and 10 large events
|
||||
for i := 0; i < 20; i++ {
|
||||
ev := event.New()
|
||||
ev.Kind = kind.TextNote.K
|
||||
ev.CreatedAt = timestamp.Now().V + int64(i)
|
||||
ev.Pubkey = sign.Pub()
|
||||
ev.Tags = tag.NewS()
|
||||
|
||||
// Alternate between small and large
|
||||
if i%2 == 0 {
|
||||
ev.Content = []byte("Small event")
|
||||
} else {
|
||||
largeContent := make([]byte, 500)
|
||||
for j := range largeContent {
|
||||
largeContent[j] = 'x'
|
||||
}
|
||||
ev.Content = largeContent
|
||||
}
|
||||
|
||||
if err := ev.Sign(sign); err != nil {
|
||||
t.Fatalf("Failed to sign event %d: %v", i, err)
|
||||
}
|
||||
|
||||
if _, err := db.SaveEvent(ctx, ev); err != nil {
|
||||
t.Fatalf("Failed to save event %d: %v", i, err)
|
||||
}
|
||||
|
||||
serial, err := db.GetSerialById(ev.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get serial for event %d: %v", i, err)
|
||||
}
|
||||
|
||||
serials = append(serials, serial)
|
||||
expectedIDs[serial.Get()] = ev.ID
|
||||
}
|
||||
|
||||
// Batch fetch all events
|
||||
events, err := db.FetchEventsBySerials(serials)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to batch fetch events: %v", err)
|
||||
}
|
||||
|
||||
if len(events) != 20 {
|
||||
t.Errorf("Expected 20 events, got %d", len(events))
|
||||
}
|
||||
|
||||
// Verify all events were fetched correctly
|
||||
for serialValue, ev := range events {
|
||||
expectedID := expectedIDs[serialValue]
|
||||
if !bytes.Equal(ev.ID, expectedID) {
|
||||
t.Errorf("Event ID mismatch for serial %d: got %x, want %x",
|
||||
serialValue, ev.ID, expectedID)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// Test Case 4: Edge case - event near 384 byte threshold
|
||||
t.Run("ThresholdEvent", func(t *testing.T) {
|
||||
ev := event.New()
|
||||
ev.Kind = kind.TextNote.K
|
||||
ev.CreatedAt = timestamp.Now().V
|
||||
ev.Pubkey = sign.Pub()
|
||||
ev.Tags = tag.NewS()
|
||||
|
||||
// Create content near the threshold
|
||||
testContent := make([]byte, 250)
|
||||
for i := range testContent {
|
||||
testContent[i] = 'x'
|
||||
}
|
||||
ev.Content = testContent
|
||||
|
||||
if err := ev.Sign(sign); err != nil {
|
||||
t.Fatalf("Failed to sign threshold event: %v", err)
|
||||
}
|
||||
|
||||
if _, err := db.SaveEvent(ctx, ev); err != nil {
|
||||
t.Fatalf("Failed to save threshold event: %v", err)
|
||||
}
|
||||
|
||||
serial, err := db.GetSerialById(ev.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get serial: %v", err)
|
||||
}
|
||||
|
||||
// Fetch and verify
|
||||
fetchedEvent, err := db.FetchEventBySerial(serial)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to fetch threshold event: %v", err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(fetchedEvent.ID, ev.ID) {
|
||||
t.Errorf("Fetched event ID mismatch")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// TestInlineStorageMigration tests the migration from traditional to inline storage
|
||||
func TestInlineStorageMigration(t *testing.T) {
|
||||
// Create a temporary directory for the database
|
||||
tempDir, err := os.MkdirTemp("", "test-migration-db-*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create temporary directory: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
// Create a context and cancel function for the database
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Initialize the database
|
||||
db, err := New(ctx, cancel, tempDir, "info")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create database: %v", err)
|
||||
}
|
||||
|
||||
// Create a signer
|
||||
sign := p8k.MustNew()
|
||||
if err := sign.Generate(); chk.E(err) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Manually set database version to 3 (before inline storage migration)
|
||||
db.writeVersionTag(3)
|
||||
|
||||
// Create and save some small events the old way (manually)
|
||||
var testEvents []*event.E
|
||||
for i := 0; i < 5; i++ {
|
||||
ev := event.New()
|
||||
ev.Kind = kind.TextNote.K
|
||||
ev.CreatedAt = timestamp.Now().V + int64(i)
|
||||
ev.Content = []byte("Test event")
|
||||
ev.Pubkey = sign.Pub()
|
||||
ev.Tags = tag.NewS()
|
||||
|
||||
if err := ev.Sign(sign); err != nil {
|
||||
t.Fatalf("Failed to sign event: %v", err)
|
||||
}
|
||||
|
||||
// Get next serial
|
||||
serial, err := db.seq.Next()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get serial: %v", err)
|
||||
}
|
||||
|
||||
// Generate indexes
|
||||
idxs, err := GetIndexesForEvent(ev, serial)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to generate indexes: %v", err)
|
||||
}
|
||||
|
||||
// Serialize event
|
||||
eventDataBuf := new(bytes.Buffer)
|
||||
ev.MarshalBinary(eventDataBuf)
|
||||
eventData := eventDataBuf.Bytes()
|
||||
|
||||
// Save the old way (evt prefix with value)
|
||||
db.Update(func(txn *badger.Txn) error {
|
||||
ser := new(types.Uint40)
|
||||
ser.Set(serial)
|
||||
|
||||
// Save indexes
|
||||
for _, key := range idxs {
|
||||
txn.Set(key, nil)
|
||||
}
|
||||
|
||||
// Save event the old way
|
||||
keyBuf := new(bytes.Buffer)
|
||||
indexes.EventEnc(ser).MarshalWrite(keyBuf)
|
||||
txn.Set(keyBuf.Bytes(), eventData)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
testEvents = append(testEvents, ev)
|
||||
}
|
||||
|
||||
t.Logf("Created %d test events with old storage format", len(testEvents))
|
||||
|
||||
// Close and reopen database to trigger migration
|
||||
db.Close()
|
||||
|
||||
db, err = New(ctx, cancel, tempDir, "info")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to reopen database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Give migration time to complete
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Verify all events can still be fetched
|
||||
for i, ev := range testEvents {
|
||||
serial, err := db.GetSerialById(ev.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get serial for event %d after migration: %v", i, err)
|
||||
}
|
||||
|
||||
fetchedEvent, err := db.FetchEventBySerial(serial)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to fetch event %d after migration: %v", i, err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(fetchedEvent.ID, ev.ID) {
|
||||
t.Errorf("Event %d ID mismatch after migration: got %x, want %x",
|
||||
i, fetchedEvent.ID, ev.ID)
|
||||
}
|
||||
|
||||
if !bytes.Equal(fetchedEvent.Content, ev.Content) {
|
||||
t.Errorf("Event %d content mismatch after migration: got %q, want %q",
|
||||
i, fetchedEvent.Content, ev.Content)
|
||||
}
|
||||
|
||||
// Verify it's now using inline storage
|
||||
sevKeyExists := false
|
||||
db.View(func(txn *badger.Txn) error {
|
||||
smallBuf := new(bytes.Buffer)
|
||||
indexes.SmallEventEnc(serial).MarshalWrite(smallBuf)
|
||||
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.Prefix = smallBuf.Bytes()
|
||||
it := txn.NewIterator(opts)
|
||||
defer it.Close()
|
||||
|
||||
it.Rewind()
|
||||
if it.Valid() {
|
||||
sevKeyExists = true
|
||||
t.Logf("Event %d (%s) successfully migrated to inline storage",
|
||||
i, hex.Enc(ev.ID[:8]))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if !sevKeyExists {
|
||||
t.Errorf("Event %d was not migrated to inline storage", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkInlineVsTraditionalStorage compares performance of inline vs traditional storage
|
||||
func BenchmarkInlineVsTraditionalStorage(b *testing.B) {
|
||||
// Create a temporary directory for the database
|
||||
tempDir, err := os.MkdirTemp("", "bench-inline-db-*")
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to create temporary directory: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
// Create a context and cancel function for the database
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Initialize the database
|
||||
db, err := New(ctx, cancel, tempDir, "info")
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to create database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Create a signer
|
||||
sign := p8k.MustNew()
|
||||
if err := sign.Generate(); chk.E(err) {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
// Pre-populate database with mix of small and large events
|
||||
var smallSerials []*types.Uint40
|
||||
var largeSerials []*types.Uint40
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
// Small event
|
||||
smallEv := event.New()
|
||||
smallEv.Kind = kind.TextNote.K
|
||||
smallEv.CreatedAt = timestamp.Now().V + int64(i)*2
|
||||
smallEv.Content = []byte("Small test event")
|
||||
smallEv.Pubkey = sign.Pub()
|
||||
smallEv.Tags = tag.NewS()
|
||||
smallEv.Sign(sign)
|
||||
|
||||
db.SaveEvent(ctx, smallEv)
|
||||
if serial, err := db.GetSerialById(smallEv.ID); err == nil {
|
||||
smallSerials = append(smallSerials, serial)
|
||||
}
|
||||
|
||||
// Large event
|
||||
largeEv := event.New()
|
||||
largeEv.Kind = kind.TextNote.K
|
||||
largeEv.CreatedAt = timestamp.Now().V + int64(i)*2 + 1
|
||||
largeContent := make([]byte, 500)
|
||||
for j := range largeContent {
|
||||
largeContent[j] = 'x'
|
||||
}
|
||||
largeEv.Content = largeContent
|
||||
largeEv.Pubkey = sign.Pub()
|
||||
largeEv.Tags = tag.NewS()
|
||||
largeEv.Sign(sign)
|
||||
|
||||
db.SaveEvent(ctx, largeEv)
|
||||
if serial, err := db.GetSerialById(largeEv.ID); err == nil {
|
||||
largeSerials = append(largeSerials, serial)
|
||||
}
|
||||
}
|
||||
|
||||
b.Run("FetchSmallEventsInline", func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
idx := i % len(smallSerials)
|
||||
db.FetchEventBySerial(smallSerials[idx])
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("FetchLargeEventsTraditional", func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
idx := i % len(largeSerials)
|
||||
db.FetchEventBySerial(largeSerials[idx])
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("BatchFetchSmallEvents", func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
db.FetchEventsBySerials(smallSerials[:10])
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("BatchFetchLargeEvents", func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
db.FetchEventsBySerials(largeSerials[:10])
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -12,10 +12,11 @@ import (
|
||||
"next.orly.dev/pkg/database/indexes/types"
|
||||
"next.orly.dev/pkg/encoders/event"
|
||||
"next.orly.dev/pkg/encoders/ints"
|
||||
"next.orly.dev/pkg/encoders/kind"
|
||||
)
|
||||
|
||||
const (
|
||||
currentVersion uint32 = 3
|
||||
currentVersion uint32 = 4
|
||||
)
|
||||
|
||||
func (d *D) RunMigrations() {
|
||||
@@ -82,6 +83,13 @@ func (d *D) RunMigrations() {
|
||||
// bump to version 3
|
||||
_ = d.writeVersionTag(3)
|
||||
}
|
||||
if dbVersion < 4 {
|
||||
log.I.F("migrating to version 4...")
|
||||
// convert small events to inline storage (Reiser4 optimization)
|
||||
d.ConvertSmallEventsToInline()
|
||||
// bump to version 4
|
||||
_ = d.writeVersionTag(4)
|
||||
}
|
||||
}
|
||||
|
||||
// writeVersionTag writes a new version tag key to the database (no value)
|
||||
@@ -323,3 +331,209 @@ func (d *D) CleanupEphemeralEvents() {
|
||||
|
||||
log.I.F("cleaned up %d ephemeral events from database", deletedCount)
|
||||
}
|
||||
|
||||
// ConvertSmallEventsToInline migrates small events (<=384 bytes) to inline storage.
|
||||
// This is a Reiser4-inspired optimization that stores small event data in the key itself,
|
||||
// avoiding a second database lookup and improving query performance.
|
||||
// Also handles replaceable and addressable events with specialized storage.
|
||||
func (d *D) ConvertSmallEventsToInline() {
|
||||
log.I.F("converting events to optimized inline storage (Reiser4 optimization)...")
|
||||
var err error
|
||||
const smallEventThreshold = 384
|
||||
|
||||
type EventData struct {
|
||||
Serial uint64
|
||||
EventData []byte
|
||||
OldKey []byte
|
||||
IsReplaceable bool
|
||||
IsAddressable bool
|
||||
Pubkey []byte
|
||||
Kind uint16
|
||||
DTag []byte
|
||||
}
|
||||
|
||||
var events []EventData
|
||||
var convertedCount int
|
||||
var deletedCount int
|
||||
|
||||
// Helper function for counting by predicate
|
||||
countBy := func(events []EventData, predicate func(EventData) bool) int {
|
||||
count := 0
|
||||
for _, e := range events {
|
||||
if predicate(e) {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
// First pass: identify events in evt table that can benefit from inline storage
|
||||
if err = d.View(
|
||||
func(txn *badger.Txn) (err error) {
|
||||
prf := new(bytes.Buffer)
|
||||
if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
it := txn.NewIterator(badger.IteratorOptions{Prefix: prf.Bytes()})
|
||||
defer it.Close()
|
||||
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
item := it.Item()
|
||||
var val []byte
|
||||
if val, err = item.ValueCopy(nil); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if event data is small enough for inline storage
|
||||
if len(val) <= smallEventThreshold {
|
||||
// Decode event to check if it's replaceable or addressable
|
||||
ev := new(event.E)
|
||||
if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Extract serial from key
|
||||
key := item.KeyCopy(nil)
|
||||
ser := indexes.EventVars()
|
||||
if err = indexes.EventDec(ser).UnmarshalRead(bytes.NewBuffer(key)); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
|
||||
eventData := EventData{
|
||||
Serial: ser.Get(),
|
||||
EventData: val,
|
||||
OldKey: key,
|
||||
IsReplaceable: kind.IsReplaceable(ev.Kind),
|
||||
IsAddressable: kind.IsParameterizedReplaceable(ev.Kind),
|
||||
Pubkey: ev.Pubkey,
|
||||
Kind: ev.Kind,
|
||||
}
|
||||
|
||||
// Extract d-tag for addressable events
|
||||
if eventData.IsAddressable {
|
||||
dTag := ev.Tags.GetFirst([]byte("d"))
|
||||
if dTag != nil {
|
||||
eventData.DTag = dTag.Value()
|
||||
}
|
||||
}
|
||||
|
||||
events = append(events, eventData)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
log.I.F("found %d events to convert (%d regular, %d replaceable, %d addressable)",
|
||||
len(events),
|
||||
countBy(events, func(e EventData) bool { return !e.IsReplaceable && !e.IsAddressable }),
|
||||
countBy(events, func(e EventData) bool { return e.IsReplaceable }),
|
||||
countBy(events, func(e EventData) bool { return e.IsAddressable }),
|
||||
)
|
||||
|
||||
// Second pass: convert in batches to avoid large transactions
|
||||
const batchSize = 1000
|
||||
for i := 0; i < len(events); i += batchSize {
|
||||
end := i + batchSize
|
||||
if end > len(events) {
|
||||
end = len(events)
|
||||
}
|
||||
batch := events[i:end]
|
||||
|
||||
// Write new inline keys and delete old keys
|
||||
if err = d.Update(
|
||||
func(txn *badger.Txn) (err error) {
|
||||
for _, e := range batch {
|
||||
// First, write the sev key for serial-based access (all small events)
|
||||
sevKeyBuf := new(bytes.Buffer)
|
||||
ser := new(types.Uint40)
|
||||
if err = ser.Set(e.Serial); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
|
||||
if err = indexes.SmallEventEnc(ser).MarshalWrite(sevKeyBuf); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Append size as uint16 big-endian (2 bytes)
|
||||
sizeBytes := []byte{byte(len(e.EventData) >> 8), byte(len(e.EventData))}
|
||||
sevKeyBuf.Write(sizeBytes)
|
||||
|
||||
// Append event data
|
||||
sevKeyBuf.Write(e.EventData)
|
||||
|
||||
// Write sev key (no value needed)
|
||||
if err = txn.Set(sevKeyBuf.Bytes(), nil); chk.E(err) {
|
||||
log.W.F("failed to write sev key for serial %d: %v", e.Serial, err)
|
||||
continue
|
||||
}
|
||||
convertedCount++
|
||||
|
||||
// Additionally, for replaceable/addressable events, write specialized keys
|
||||
if e.IsAddressable && len(e.DTag) > 0 {
|
||||
// Addressable event: aev|pubkey_hash|kind|dtag_hash|size|data
|
||||
aevKeyBuf := new(bytes.Buffer)
|
||||
pubHash := new(types.PubHash)
|
||||
pubHash.FromPubkey(e.Pubkey)
|
||||
kindVal := new(types.Uint16)
|
||||
kindVal.Set(e.Kind)
|
||||
dTagHash := new(types.Ident)
|
||||
dTagHash.FromIdent(e.DTag)
|
||||
|
||||
if err = indexes.AddressableEventEnc(pubHash, kindVal, dTagHash).MarshalWrite(aevKeyBuf); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Append size and data
|
||||
aevKeyBuf.Write(sizeBytes)
|
||||
aevKeyBuf.Write(e.EventData)
|
||||
|
||||
if err = txn.Set(aevKeyBuf.Bytes(), nil); chk.E(err) {
|
||||
log.W.F("failed to write aev key for serial %d: %v", e.Serial, err)
|
||||
continue
|
||||
}
|
||||
} else if e.IsReplaceable {
|
||||
// Replaceable event: rev|pubkey_hash|kind|size|data
|
||||
revKeyBuf := new(bytes.Buffer)
|
||||
pubHash := new(types.PubHash)
|
||||
pubHash.FromPubkey(e.Pubkey)
|
||||
kindVal := new(types.Uint16)
|
||||
kindVal.Set(e.Kind)
|
||||
|
||||
if err = indexes.ReplaceableEventEnc(pubHash, kindVal).MarshalWrite(revKeyBuf); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Append size and data
|
||||
revKeyBuf.Write(sizeBytes)
|
||||
revKeyBuf.Write(e.EventData)
|
||||
|
||||
if err = txn.Set(revKeyBuf.Bytes(), nil); chk.E(err) {
|
||||
log.W.F("failed to write rev key for serial %d: %v", e.Serial, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Delete old evt key
|
||||
if err = txn.Delete(e.OldKey); chk.E(err) {
|
||||
log.W.F("failed to delete old event key for serial %d: %v", e.Serial, err)
|
||||
continue
|
||||
}
|
||||
deletedCount++
|
||||
}
|
||||
return nil
|
||||
},
|
||||
); chk.E(err) {
|
||||
log.W.F("batch update failed: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if (i/batchSize)%10 == 0 && i > 0 {
|
||||
log.I.F("progress: %d/%d events converted", i, len(events))
|
||||
}
|
||||
}
|
||||
|
||||
log.I.F("migration complete: converted %d events to optimized inline storage, deleted %d old keys", convertedCount, deletedCount)
|
||||
}
|
||||
|
||||
@@ -177,6 +177,19 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
|
||||
return
|
||||
}
|
||||
log.T.F("SaveEvent: generated %d indexes for event %x (kind %d)", len(idxs), ev.ID, ev.Kind)
|
||||
|
||||
// Serialize event once to check size
|
||||
eventDataBuf := new(bytes.Buffer)
|
||||
ev.MarshalBinary(eventDataBuf)
|
||||
eventData := eventDataBuf.Bytes()
|
||||
|
||||
// Determine storage strategy (Reiser4 optimizations)
|
||||
// 384 bytes covers: ID(32) + Pubkey(32) + Sig(64) + basic fields + small content
|
||||
const smallEventThreshold = 384
|
||||
isSmallEvent := len(eventData) <= smallEventThreshold
|
||||
isReplaceableEvent := kind.IsReplaceable(ev.Kind)
|
||||
isAddressableEvent := kind.IsParameterizedReplaceable(ev.Kind)
|
||||
|
||||
// Start a transaction to save the event and all its indexes
|
||||
err = d.Update(
|
||||
func(txn *badger.Txn) (err error) {
|
||||
@@ -185,26 +198,98 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
|
||||
if err = ser.Set(serial); chk.E(err) {
|
||||
return
|
||||
}
|
||||
keyBuf := new(bytes.Buffer)
|
||||
if err = indexes.EventEnc(ser).MarshalWrite(keyBuf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
kb := keyBuf.Bytes()
|
||||
|
||||
// Pre-allocate value buffer
|
||||
valueBuf := new(bytes.Buffer)
|
||||
ev.MarshalBinary(valueBuf)
|
||||
vb := valueBuf.Bytes()
|
||||
|
||||
|
||||
// Save each index
|
||||
for _, key := range idxs {
|
||||
if err = txn.Set(key, nil); chk.E(err) {
|
||||
return
|
||||
}
|
||||
}
|
||||
// write the event
|
||||
if err = txn.Set(kb, vb); chk.E(err) {
|
||||
return
|
||||
|
||||
// Write the event using optimized storage strategy
|
||||
// Determine if we should use inline addressable/replaceable storage
|
||||
useAddressableInline := false
|
||||
var dTag *tag.T
|
||||
if isAddressableEvent && isSmallEvent {
|
||||
dTag = ev.Tags.GetFirst([]byte("d"))
|
||||
useAddressableInline = dTag != nil
|
||||
}
|
||||
|
||||
// All small events get a sev key for serial-based access
|
||||
if isSmallEvent {
|
||||
// Small event: store inline with sev prefix
|
||||
// Format: sev|serial|size_uint16|event_data
|
||||
keyBuf := new(bytes.Buffer)
|
||||
if err = indexes.SmallEventEnc(ser).MarshalWrite(keyBuf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
// Append size as uint16 big-endian (2 bytes for size up to 65535)
|
||||
sizeBytes := []byte{byte(len(eventData) >> 8), byte(len(eventData))}
|
||||
keyBuf.Write(sizeBytes)
|
||||
// Append event data
|
||||
keyBuf.Write(eventData)
|
||||
|
||||
if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
|
||||
return
|
||||
}
|
||||
log.T.F("SaveEvent: stored small event inline (%d bytes)", len(eventData))
|
||||
} else {
|
||||
// Large event: store separately with evt prefix
|
||||
keyBuf := new(bytes.Buffer)
|
||||
if err = indexes.EventEnc(ser).MarshalWrite(keyBuf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if err = txn.Set(keyBuf.Bytes(), eventData); chk.E(err) {
|
||||
return
|
||||
}
|
||||
log.T.F("SaveEvent: stored large event separately (%d bytes)", len(eventData))
|
||||
}
|
||||
|
||||
// Additionally, store replaceable/addressable events with specialized keys for direct access
|
||||
if useAddressableInline {
|
||||
// Addressable event: also store with aev|pubkey_hash|kind|dtag_hash|size|data
|
||||
pubHash := new(types.PubHash)
|
||||
pubHash.FromPubkey(ev.Pubkey)
|
||||
kindVal := new(types.Uint16)
|
||||
kindVal.Set(ev.Kind)
|
||||
dTagHash := new(types.Ident)
|
||||
dTagHash.FromIdent(dTag.Value())
|
||||
|
||||
keyBuf := new(bytes.Buffer)
|
||||
if err = indexes.AddressableEventEnc(pubHash, kindVal, dTagHash).MarshalWrite(keyBuf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
// Append size as uint16 big-endian
|
||||
sizeBytes := []byte{byte(len(eventData) >> 8), byte(len(eventData))}
|
||||
keyBuf.Write(sizeBytes)
|
||||
// Append event data
|
||||
keyBuf.Write(eventData)
|
||||
|
||||
if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
|
||||
return
|
||||
}
|
||||
log.T.F("SaveEvent: also stored addressable event with specialized key")
|
||||
} else if isReplaceableEvent && isSmallEvent {
|
||||
// Replaceable event: also store with rev|pubkey_hash|kind|size|data
|
||||
pubHash := new(types.PubHash)
|
||||
pubHash.FromPubkey(ev.Pubkey)
|
||||
kindVal := new(types.Uint16)
|
||||
kindVal.Set(ev.Kind)
|
||||
|
||||
keyBuf := new(bytes.Buffer)
|
||||
if err = indexes.ReplaceableEventEnc(pubHash, kindVal).MarshalWrite(keyBuf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
// Append size as uint16 big-endian
|
||||
sizeBytes := []byte{byte(len(eventData) >> 8), byte(len(eventData))}
|
||||
keyBuf.Write(sizeBytes)
|
||||
// Append event data
|
||||
keyBuf.Write(eventData)
|
||||
|
||||
if err = txn.Set(keyBuf.Bytes(), nil); chk.E(err) {
|
||||
return
|
||||
}
|
||||
log.T.F("SaveEvent: also stored replaceable event with specialized key")
|
||||
}
|
||||
return
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user