From b95cfc42cc2ec529b1cbe08098d228170777b32d Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Tue, 28 Nov 2023 16:04:29 -0300 Subject: [PATCH] lmdb/badger: change (fix?) and simplify queries (offsets and created_at stuff should be simpler and q-dependent). --- badger/count.go | 13 +++++---- badger/delete.go | 5 ++-- badger/lib.go | 7 ++++- badger/migrations.go | 68 ++++++++++++++++++++++++++++++++++++++++++++ badger/query.go | 43 +++++++++++----------------- badger/save.go | 4 +-- lmdb/count.go | 7 +++-- lmdb/query.go | 18 ++++++------ 8 files changed, 114 insertions(+), 51 deletions(-) create mode 100644 badger/migrations.go diff --git a/badger/count.go b/badger/count.go index ceef89e..3e1d77e 100644 --- a/badger/count.go +++ b/badger/count.go @@ -13,16 +13,16 @@ import ( func (b BadgerBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) { var count int64 = 0 - queries, extraFilter, since, prefixLen, idxOffset, err := prepareQueries(filter) + queries, extraFilter, since, err := prepareQueries(filter) if err != nil { return 0, err } err = b.View(func(txn *badger.Txn) error { // iterate only through keys and in reverse order - opts := badger.DefaultIteratorOptions - opts.PrefetchValues = false - opts.Reverse = true + opts := badger.IteratorOptions{ + Reverse: true, + } // actually iterate for _, q := range queries { @@ -33,8 +33,11 @@ func (b BadgerBackend) CountEvents(ctx context.Context, filter nostr.Filter) (in item := it.Item() key := item.Key() + idxOffset := len(key) - 4 // this is where the idx actually starts + + // "id" indexes don't contain a timestamp if !q.skipTimestamp { - createdAt := binary.BigEndian.Uint32(key[prefixLen:idxOffset]) + createdAt := binary.BigEndian.Uint32(key[idxOffset-4 : idxOffset]) if createdAt < since { break } diff --git a/badger/delete.go b/badger/delete.go index 167d9cf..7137d62 100644 --- a/badger/delete.go +++ b/badger/delete.go @@ -24,8 +24,9 @@ func (b *BadgerBackend) DeleteEvent(ctx context.Context, evt *nostr.Event) error prefix := make([]byte, 1+32) prefix[0] = indexIdPrefix copy(prefix[1:], id) - opts := badger.DefaultIteratorOptions - opts.PrefetchValues = false + opts := badger.IteratorOptions{ + PrefetchValues: false, + } it := txn.NewIterator(opts) it.Seek(prefix) if it.ValidForPrefix(prefix) { diff --git a/badger/lib.go b/badger/lib.go index f895589..4b4a6f4 100644 --- a/badger/lib.go +++ b/badger/lib.go @@ -10,6 +10,7 @@ import ( ) const ( + dbVersionKey byte = 255 rawEventStorePrefix byte = 0 indexCreatedAtPrefix byte = 1 indexIdPrefix byte = 2 @@ -17,6 +18,7 @@ const ( indexPubkeyPrefix byte = 4 indexPubkeyKindPrefix byte = 5 indexTagPrefix byte = 6 + indexTag32Prefix byte = 7 ) var _ eventstore.Store = (*BadgerBackend)(nil) @@ -114,15 +116,18 @@ func getIndexKeysForEvent(evt *nostr.Event, idx []byte) [][]byte { } var v []byte + var indexPrefix byte if vb, _ := hex.DecodeString(tag[1]); len(vb) == 32 { // store value as bytes v = vb + indexPrefix = indexTag32Prefix } else { v = []byte(tag[1]) + indexPrefix = indexTagPrefix } k := make([]byte, 1+len(v)+4+4) - k[0] = indexTagPrefix + k[0] = indexPrefix copy(k[1:], v) binary.BigEndian.PutUint32(k[1+len(v):], uint32(evt.CreatedAt)) copy(k[1+len(v)+4:], idx) diff --git a/badger/migrations.go b/badger/migrations.go new file mode 100644 index 0000000..8236bf8 --- /dev/null +++ b/badger/migrations.go @@ -0,0 +1,68 @@ +package badger + +import ( + "encoding/binary" + "log" + + "github.com/dgraph-io/badger/v4" +) + +func (b *BadgerBackend) runMigrations() error { + return 
b.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte{dbVersionKey}) + if err != nil { + return err + } + item.Value(func(val []byte) error { + var version uint16 + + // do the migrations in increasing steps (there is no rollback) + // + + if version < 1 { + log.Println("migration 1: move all keys from indexTag to indexTag32 if they are 32-bytes") + prefix := []byte{indexTagPrefix} + it := txn.NewIterator(badger.IteratorOptions{ + PrefetchValues: true, + PrefetchSize: 100, + Prefix: prefix, + }) + defer it.Close() + + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + key := item.Key() + + if len(key) == 1+32+2+4 { + // it's 32 bytes + log.Printf("moving key %x", key) + if err := txn.Delete(key); err != nil { + return err + } + key[0] = indexTag32Prefix + txn.Set(key, nil) + } + } + + // bump version + if err := b.bumpVersion(txn, 1); err != nil { + return err + } + } + + if version < 2 { + // ... + } + + return nil + }) + + return nil + }) +} + +func (b *BadgerBackend) bumpVersion(txn *badger.Txn, version uint16) error { + buf := make([]byte, 2) + binary.BigEndian.PutUint16(buf, version) + return txn.Set([]byte{dbVersionKey}, buf) +} diff --git a/badger/query.go b/badger/query.go index b54fdf9..a1c7d57 100644 --- a/badger/query.go +++ b/badger/query.go @@ -29,17 +29,19 @@ type queryEvent struct { func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) { ch := make(chan *nostr.Event) - queries, extraFilter, since, prefixLen, idxOffset, err := prepareQueries(filter) + queries, extraFilter, since, err := prepareQueries(filter) if err != nil { return nil, err } + fmt.Println(filter) + go func() { err := b.View(func(txn *badger.Txn) error { // iterate only through keys and in reverse order - opts := badger.DefaultIteratorOptions - opts.PrefetchValues = false - opts.Reverse = true + opts := badger.IteratorOptions{ + Reverse: true, + } // actually iterate iteratorClosers := make([]func(), len(queries)) @@ -54,12 +56,11 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch item := it.Item() key := item.Key() - if len(key)-4 != idxOffset { - continue - } + idxOffset := len(key) - 4 // this is where the idx actually starts + // "id" indexes don't contain a timestamp if !q.skipTimestamp { - createdAt := binary.BigEndian.Uint32(key[prefixLen:idxOffset]) + createdAt := binary.BigEndian.Uint32(key[idxOffset-4 : idxOffset]) if createdAt < since { break } @@ -199,8 +200,6 @@ func prepareQueries(filter nostr.Filter) ( queries []query, extraFilter *nostr.Filter, since uint32, - prefixLen int, - idxOffset int, err error, ) { var index byte @@ -213,7 +212,7 @@ func prepareQueries(filter nostr.Filter) ( prefix[0] = index id, _ := hex.DecodeString(idHex) if len(id) != 32 { - return nil, nil, 0, 0, 0, fmt.Errorf("invalid id '%s'", idHex) + return nil, nil, 0, fmt.Errorf("invalid id '%s'", idHex) } copy(prefix[1:], id) queries[i] = query{i: i, prefix: prefix, skipTimestamp: true} @@ -225,7 +224,7 @@ func prepareQueries(filter nostr.Filter) ( for i, pubkeyHex := range filter.Authors { pubkey, _ := hex.DecodeString(pubkeyHex) if len(pubkey) != 32 { - return nil, nil, 0, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) + return nil, nil, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) } prefix := make([]byte, 1+32) prefix[0] = index @@ -240,7 +239,7 @@ func prepareQueries(filter nostr.Filter) ( for _, kind := range filter.Kinds { pubkey, _ := hex.DecodeString(pubkeyHex) if len(pubkey) != 32 { 
- return nil, nil, 0, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) + return nil, nil, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) } prefix := make([]byte, 1+32+2) prefix[0] = index @@ -253,15 +252,13 @@ func prepareQueries(filter nostr.Filter) ( } extraFilter = &nostr.Filter{Tags: filter.Tags} } else if len(filter.Tags) > 0 { - index = indexTagPrefix - // determine the size of the queries array by inspecting all tags sizes size := 0 for _, values := range filter.Tags { size += len(values) } if size == 0 { - return nil, nil, 0, 0, 0, fmt.Errorf("empty tag filters") + return nil, nil, 0, fmt.Errorf("empty tag filters") } queries = make([]query, size) @@ -275,10 +272,12 @@ func prepareQueries(filter nostr.Filter) ( if len(bv) == 32 { // hex tag size = 32 + index = indexTag32Prefix } else { // string tag bv = []byte(value) size = len(bv) + index = indexTagPrefix } prefix := make([]byte, 1+size) prefix[0] = index @@ -305,16 +304,6 @@ func prepareQueries(filter nostr.Filter) ( extraFilter = nil } - prefixLen = len(queries[0].prefix) - - // the idx -- i.e. the key to the raw event store -- is at the end of - // the index key, not in the value, this is the offset for us to read it - if index == indexIdPrefix { - idxOffset = prefixLen - } else { - idxOffset = prefixLen + 4 // add 4 bytes for the created_at - } - var until uint32 = 4294967295 if filter.Until != nil { if fu := uint32(*filter.Until); fu < until { @@ -333,5 +322,5 @@ func prepareQueries(filter nostr.Filter) ( } } - return queries, extraFilter, since, prefixLen, idxOffset, nil + return queries, extraFilter, since, nil } diff --git a/badger/save.go b/badger/save.go index 4760126..b9ee044 100644 --- a/badger/save.go +++ b/badger/save.go @@ -17,9 +17,7 @@ func (b *BadgerBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error { prefix := make([]byte, 1+32) prefix[0] = indexIdPrefix copy(prefix[1:], id) - opts := badger.DefaultIteratorOptions - opts.PrefetchValues = false - it := txn.NewIterator(opts) + it := txn.NewIterator(badger.IteratorOptions{}) defer it.Close() it.Seek(prefix) if it.ValidForPrefix(prefix) { diff --git a/lmdb/count.go b/lmdb/count.go index 728859a..81cdd86 100644 --- a/lmdb/count.go +++ b/lmdb/count.go @@ -13,7 +13,7 @@ import ( func (b *LMDBBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) { var count int64 = 0 - queries, extraFilter, since, prefixLen, err := b.prepareQueries(filter) + queries, extraFilter, since, err := b.prepareQueries(filter) if err != nil { return 0, err } @@ -46,12 +46,13 @@ func (b *LMDBBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int for { // we already have a k and a v and an err from the cursor setup, so check and use these - if iterr != nil || !bytes.Equal(q.prefix, k[0:prefixLen]) { + if iterr != nil || !bytes.HasPrefix(k, q.prefix) { break } + // "id" indexes don't contain a timestamp if !q.skipTimestamp { - createdAt := binary.BigEndian.Uint32(k[prefixLen:]) + createdAt := binary.BigEndian.Uint32(k[len(k)-4:]) if createdAt < since { break } diff --git a/lmdb/query.go b/lmdb/query.go index 091b0af..d62e761 100644 --- a/lmdb/query.go +++ b/lmdb/query.go @@ -30,7 +30,7 @@ type queryEvent struct { func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) { ch := make(chan *nostr.Event) - queries, extraFilter, since, prefixLen, err := b.prepareQueries(filter) + queries, extraFilter, since, err := b.prepareQueries(filter) if err != nil { return nil, err } @@ -81,8 +81,9 @@ func (b 
*LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha return } + // "id" indexes don't contain a timestamp if !q.skipTimestamp { - createdAt := binary.BigEndian.Uint32(k[prefixLen:]) + createdAt := binary.BigEndian.Uint32(k[len(k)-4:]) if createdAt < since { break } @@ -210,7 +211,6 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( queries []query, extraFilter *nostr.Filter, since uint32, - prefixLen int, err error, ) { if len(filter.IDs) > 0 { @@ -218,7 +218,7 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( for i, idHex := range filter.IDs { prefix, _ := hex.DecodeString(idHex) if len(prefix) != 32 { - return nil, nil, 0, 0, fmt.Errorf("invalid id '%s'", idHex) + return nil, nil, 0, fmt.Errorf("invalid id '%s'", idHex) } queries[i] = query{i: i, dbi: b.indexId, prefix: prefix, skipTimestamp: true} } @@ -228,7 +228,7 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( for i, pubkeyHex := range filter.Authors { prefix, _ := hex.DecodeString(pubkeyHex) if len(prefix) != 32 { - return nil, nil, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) + return nil, nil, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) } queries[i] = query{i: i, dbi: b.indexPubkey, prefix: prefix} } @@ -239,7 +239,7 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( for _, kind := range filter.Kinds { pubkey, _ := hex.DecodeString(pubkeyHex) if len(pubkey) != 32 { - return nil, nil, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) + return nil, nil, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) } prefix := make([]byte, 32+2) copy(prefix[:], pubkey) @@ -257,7 +257,7 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( size += len(values) } if size == 0 { - return nil, nil, 0, 0, fmt.Errorf("empty tag filters") + return nil, nil, 0, fmt.Errorf("empty tag filters") } queries = make([]query, size) @@ -299,8 +299,6 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( extraFilter = nil } - prefixLen = len(queries[0].prefix) - var until uint32 = 4294967295 if filter.Until != nil { if fu := uint32(*filter.Until); fu < until { @@ -319,5 +317,5 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( } } - return queries, extraFilter, since, prefixLen, nil + return queries, extraFilter, since, nil }
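
A note on the offset change above: after this patch every badger index key ends with the 4-byte position of the raw event (the "idx"), and, for every index except the "id" one, the 4 bytes right before that hold the big-endian created_at (see getIndexKeysForEvent in badger/lib.go). That is why prepareQueries no longer needs to return prefixLen and idxOffset; both offsets fall out of len(key) per query. The helper below is illustrative only (decodeIndexKey is not a function in the patch), assuming exactly that layout:

package badger

import "encoding/binary"

// decodeIndexKey is an illustrative helper, not part of the patch. It shows
// the key layout the queries above rely on: the last 4 bytes of every index
// key are the idx into the raw event store, and, for every index except the
// "id" one, the 4 bytes just before that are the big-endian created_at.
func decodeIndexKey(key []byte, skipTimestamp bool) (idx []byte, createdAt uint32) {
	idxOffset := len(key) - 4 // the idx always sits at the end of the key
	idx = key[idxOffset:]
	if !skipTimestamp {
		createdAt = binary.BigEndian.Uint32(key[idxOffset-4 : idxOffset])
	}
	return idx, createdAt
}

Separately, the fmt.Println(filter) added to QueryEvents in badger/query.go reads like leftover debug output rather than part of the intended change.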
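
On the LMDB side the same idea applies: the cursor check now uses bytes.HasPrefix against the query prefix instead of slicing with a shared prefixLen, and created_at is read from the last 4 bytes of the key. A minimal illustrative helper (keyMatches is not in the patch) that folds together the two break conditions used in CountEvents and QueryEvents:

package lmdb

import (
	"bytes"
	"encoding/binary"
)

// keyMatches is an illustrative helper, not part of the patch. A cursor entry
// still belongs to the query while its key carries the query prefix, and,
// unless this is an "id" query, the trailing 4 bytes of the key (the
// big-endian created_at) must not be older than since.
func keyMatches(k, prefix []byte, skipTimestamp bool, since uint32) bool {
	if !bytes.HasPrefix(k, prefix) {
		return false
	}
	if skipTimestamp {
		return true
	}
	return binary.BigEndian.Uint32(k[len(k)-4:]) >= since
}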
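
The new badger/migrations.go has a few rough edges worth noting: runMigrations wraps everything in a read-only b.View transaction yet calls txn.Delete and txn.Set; the value stored under dbVersionKey is never decoded into version; a fresh database with no dbVersionKey would make txn.Get fail outright; the moved keys come from item.Key(), which badger only guarantees until the iterator advances; and the length check 1+32+2+4 does not line up with the 1+len(v)+4+4 tag-key layout in badger/lib.go, under which a 32-byte value yields a 41-byte (1+32+4+4) key. Below is a sketch of how the same migration could look against badger's documented API, assuming BadgerBackend exposes the embedded DB's Update the same way it already exposes View; it is an illustration, not the code this commit ships:

package badger

import (
	"encoding/binary"
	"errors"
	"log"

	"github.com/dgraph-io/badger/v4"
)

// runMigrationsSketch is an illustration, not the code in this commit. It
// reads the stored schema version (treating a missing dbVersionKey as 0),
// runs the indexTag -> indexTag32 move inside a read-write transaction, and
// bumps the version afterwards.
func (b *BadgerBackend) runMigrationsSketch() error {
	return b.Update(func(txn *badger.Txn) error {
		var version uint16
		item, err := txn.Get([]byte{dbVersionKey})
		if err == nil {
			val, cerr := item.ValueCopy(nil)
			if cerr != nil {
				return cerr
			}
			if len(val) >= 2 {
				version = binary.BigEndian.Uint16(val)
			}
		} else if !errors.Is(err, badger.ErrKeyNotFound) {
			return err
		}

		if version < 1 {
			log.Println("migration 1: move 32-byte tag values from indexTag to indexTag32")

			// collect the keys first so nothing is mutated under the iterator
			prefix := []byte{indexTagPrefix}
			var toMove [][]byte
			it := txn.NewIterator(badger.IteratorOptions{Prefix: prefix})
			for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
				key := it.Item().KeyCopy(nil)
				if len(key) == 1+32+4+4 { // prefix + 32-byte value + created_at + idx
					toMove = append(toMove, key)
				}
			}
			it.Close()

			for _, oldKey := range toMove {
				if err := txn.Delete(oldKey); err != nil {
					return err
				}
				// build a fresh key instead of mutating oldKey, since the
				// transaction keeps a reference to slices passed to Delete/Set
				newKey := make([]byte, len(oldKey))
				copy(newKey, oldKey)
				newKey[0] = indexTag32Prefix
				if err := txn.Set(newKey, nil); err != nil {
					return err
				}
			}

			if err := b.bumpVersion(txn, 1); err != nil {
				return err
			}
		}

		return nil
	})
}

One practical caveat: a single read-write transaction can hit badger's size limit (ErrTxnTooBig) on a large database, so a real implementation would probably batch the deletes and writes or use db.NewWriteBatch; the sketch only illustrates the intended control flow.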