lmdb/badger: change (fix?) and simplify queries (offsets and created_at stuff should be simpler and q-dependent).

This commit is contained in:
fiatjaf
2023-11-28 16:04:29 -03:00
parent 348bed02e9
commit b95cfc42cc
8 changed files with 114 additions and 51 deletions

View File

@@ -13,16 +13,16 @@ import (
func (b BadgerBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) { func (b BadgerBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) {
var count int64 = 0 var count int64 = 0
queries, extraFilter, since, prefixLen, idxOffset, err := prepareQueries(filter) queries, extraFilter, since, err := prepareQueries(filter)
if err != nil { if err != nil {
return 0, err return 0, err
} }
err = b.View(func(txn *badger.Txn) error { err = b.View(func(txn *badger.Txn) error {
// iterate only through keys and in reverse order // iterate only through keys and in reverse order
opts := badger.DefaultIteratorOptions opts := badger.IteratorOptions{
opts.PrefetchValues = false Reverse: true,
opts.Reverse = true }
// actually iterate // actually iterate
for _, q := range queries { for _, q := range queries {
@@ -33,8 +33,11 @@ func (b BadgerBackend) CountEvents(ctx context.Context, filter nostr.Filter) (in
item := it.Item() item := it.Item()
key := item.Key() key := item.Key()
idxOffset := len(key) - 4 // this is where the idx actually starts
// "id" indexes don't contain a timestamp
if !q.skipTimestamp { if !q.skipTimestamp {
createdAt := binary.BigEndian.Uint32(key[prefixLen:idxOffset]) createdAt := binary.BigEndian.Uint32(key[idxOffset-4 : idxOffset])
if createdAt < since { if createdAt < since {
break break
} }

View File

@@ -24,8 +24,9 @@ func (b *BadgerBackend) DeleteEvent(ctx context.Context, evt *nostr.Event) error
prefix := make([]byte, 1+32) prefix := make([]byte, 1+32)
prefix[0] = indexIdPrefix prefix[0] = indexIdPrefix
copy(prefix[1:], id) copy(prefix[1:], id)
opts := badger.DefaultIteratorOptions opts := badger.IteratorOptions{
opts.PrefetchValues = false PrefetchValues: false,
}
it := txn.NewIterator(opts) it := txn.NewIterator(opts)
it.Seek(prefix) it.Seek(prefix)
if it.ValidForPrefix(prefix) { if it.ValidForPrefix(prefix) {

View File

@@ -10,6 +10,7 @@ import (
) )
const ( const (
dbVersionKey byte = 255
rawEventStorePrefix byte = 0 rawEventStorePrefix byte = 0
indexCreatedAtPrefix byte = 1 indexCreatedAtPrefix byte = 1
indexIdPrefix byte = 2 indexIdPrefix byte = 2
@@ -17,6 +18,7 @@ const (
indexPubkeyPrefix byte = 4 indexPubkeyPrefix byte = 4
indexPubkeyKindPrefix byte = 5 indexPubkeyKindPrefix byte = 5
indexTagPrefix byte = 6 indexTagPrefix byte = 6
indexTag32Prefix byte = 7
) )
var _ eventstore.Store = (*BadgerBackend)(nil) var _ eventstore.Store = (*BadgerBackend)(nil)
@@ -114,15 +116,18 @@ func getIndexKeysForEvent(evt *nostr.Event, idx []byte) [][]byte {
} }
var v []byte var v []byte
var indexPrefix byte
if vb, _ := hex.DecodeString(tag[1]); len(vb) == 32 { if vb, _ := hex.DecodeString(tag[1]); len(vb) == 32 {
// store value as bytes // store value as bytes
v = vb v = vb
indexPrefix = indexTag32Prefix
} else { } else {
v = []byte(tag[1]) v = []byte(tag[1])
indexPrefix = indexTagPrefix
} }
k := make([]byte, 1+len(v)+4+4) k := make([]byte, 1+len(v)+4+4)
k[0] = indexTagPrefix k[0] = indexPrefix
copy(k[1:], v) copy(k[1:], v)
binary.BigEndian.PutUint32(k[1+len(v):], uint32(evt.CreatedAt)) binary.BigEndian.PutUint32(k[1+len(v):], uint32(evt.CreatedAt))
copy(k[1+len(v)+4:], idx) copy(k[1+len(v)+4:], idx)

68
badger/migrations.go Normal file
View File

@@ -0,0 +1,68 @@
package badger
import (
"encoding/binary"
"log"
"github.com/dgraph-io/badger/v4"
)
// runMigrations applies schema migrations in increasing steps (there is
// no rollback). The current schema version is stored under dbVersionKey
// as a big-endian uint16; a missing key means a fresh database (version 0).
func (b *BadgerBackend) runMigrations() error {
	// must be Update, not View: migrations delete/rewrite index keys and
	// bump the stored version, all of which are writes.
	return b.Update(func(txn *badger.Txn) error {
		var version uint16
		item, err := txn.Get([]byte{dbVersionKey})
		if err == badger.ErrKeyNotFound {
			// fresh database: no version key yet, start from 0
			version = 0
		} else if err != nil {
			return err
		} else if err := item.Value(func(val []byte) error {
			version = binary.BigEndian.Uint16(val)
			return nil
		}); err != nil {
			return err
		}

		// do the migrations in increasing steps (there is no rollback)

		if version < 1 {
			log.Println("migration 1: move all keys from indexTag to indexTag32 if they are 32-bytes")
			prefix := []byte{indexTagPrefix}
			it := txn.NewIterator(badger.IteratorOptions{
				// we only rewrite keys, values are never read
				PrefetchValues: false,
				Prefix:         prefix,
			})
			defer it.Close()
			for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
				// KeyCopy: the iterator's key buffer is invalidated by
				// Next(), and we mutate and re-store it below
				key := it.Item().KeyCopy(nil)
				// tag keys are prefix(1) + value(32) + created_at(4) + idx(4);
				// NOTE(review): original checked 1+32+2+4 (39), which can
				// never match that layout — confirm against the old format
				if len(key) == 1+32+4+4 {
					// the tag value is 32 bytes, so it belongs in indexTag32
					log.Printf("moving key %x", key)
					if err := txn.Delete(key); err != nil {
						return err
					}
					key[0] = indexTag32Prefix
					if err := txn.Set(key, nil); err != nil {
						return err
					}
				}
			}

			// bump version
			if err := b.bumpVersion(txn, 1); err != nil {
				return err
			}
		}

		if version < 2 {
			// ...
		}

		return nil
	})
}
// bumpVersion records the given schema version under dbVersionKey as a
// big-endian uint16, inside the caller's transaction.
func (b *BadgerBackend) bumpVersion(txn *badger.Txn, version uint16) error {
	return txn.Set([]byte{dbVersionKey}, binary.BigEndian.AppendUint16(nil, version))
}

View File

@@ -29,17 +29,19 @@ type queryEvent struct {
func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) { func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
ch := make(chan *nostr.Event) ch := make(chan *nostr.Event)
queries, extraFilter, since, prefixLen, idxOffset, err := prepareQueries(filter) queries, extraFilter, since, err := prepareQueries(filter)
if err != nil { if err != nil {
return nil, err return nil, err
} }
fmt.Println(filter)
go func() { go func() {
err := b.View(func(txn *badger.Txn) error { err := b.View(func(txn *badger.Txn) error {
// iterate only through keys and in reverse order // iterate only through keys and in reverse order
opts := badger.DefaultIteratorOptions opts := badger.IteratorOptions{
opts.PrefetchValues = false Reverse: true,
opts.Reverse = true }
// actually iterate // actually iterate
iteratorClosers := make([]func(), len(queries)) iteratorClosers := make([]func(), len(queries))
@@ -54,12 +56,11 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
item := it.Item() item := it.Item()
key := item.Key() key := item.Key()
if len(key)-4 != idxOffset { idxOffset := len(key) - 4 // this is where the idx actually starts
continue
}
// "id" indexes don't contain a timestamp
if !q.skipTimestamp { if !q.skipTimestamp {
createdAt := binary.BigEndian.Uint32(key[prefixLen:idxOffset]) createdAt := binary.BigEndian.Uint32(key[idxOffset-4 : idxOffset])
if createdAt < since { if createdAt < since {
break break
} }
@@ -199,8 +200,6 @@ func prepareQueries(filter nostr.Filter) (
queries []query, queries []query,
extraFilter *nostr.Filter, extraFilter *nostr.Filter,
since uint32, since uint32,
prefixLen int,
idxOffset int,
err error, err error,
) { ) {
var index byte var index byte
@@ -213,7 +212,7 @@ func prepareQueries(filter nostr.Filter) (
prefix[0] = index prefix[0] = index
id, _ := hex.DecodeString(idHex) id, _ := hex.DecodeString(idHex)
if len(id) != 32 { if len(id) != 32 {
return nil, nil, 0, 0, 0, fmt.Errorf("invalid id '%s'", idHex) return nil, nil, 0, fmt.Errorf("invalid id '%s'", idHex)
} }
copy(prefix[1:], id) copy(prefix[1:], id)
queries[i] = query{i: i, prefix: prefix, skipTimestamp: true} queries[i] = query{i: i, prefix: prefix, skipTimestamp: true}
@@ -225,7 +224,7 @@ func prepareQueries(filter nostr.Filter) (
for i, pubkeyHex := range filter.Authors { for i, pubkeyHex := range filter.Authors {
pubkey, _ := hex.DecodeString(pubkeyHex) pubkey, _ := hex.DecodeString(pubkeyHex)
if len(pubkey) != 32 { if len(pubkey) != 32 {
return nil, nil, 0, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) return nil, nil, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex)
} }
prefix := make([]byte, 1+32) prefix := make([]byte, 1+32)
prefix[0] = index prefix[0] = index
@@ -240,7 +239,7 @@ func prepareQueries(filter nostr.Filter) (
for _, kind := range filter.Kinds { for _, kind := range filter.Kinds {
pubkey, _ := hex.DecodeString(pubkeyHex) pubkey, _ := hex.DecodeString(pubkeyHex)
if len(pubkey) != 32 { if len(pubkey) != 32 {
return nil, nil, 0, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) return nil, nil, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex)
} }
prefix := make([]byte, 1+32+2) prefix := make([]byte, 1+32+2)
prefix[0] = index prefix[0] = index
@@ -253,15 +252,13 @@ func prepareQueries(filter nostr.Filter) (
} }
extraFilter = &nostr.Filter{Tags: filter.Tags} extraFilter = &nostr.Filter{Tags: filter.Tags}
} else if len(filter.Tags) > 0 { } else if len(filter.Tags) > 0 {
index = indexTagPrefix
// determine the size of the queries array by inspecting all tags sizes // determine the size of the queries array by inspecting all tags sizes
size := 0 size := 0
for _, values := range filter.Tags { for _, values := range filter.Tags {
size += len(values) size += len(values)
} }
if size == 0 { if size == 0 {
return nil, nil, 0, 0, 0, fmt.Errorf("empty tag filters") return nil, nil, 0, fmt.Errorf("empty tag filters")
} }
queries = make([]query, size) queries = make([]query, size)
@@ -275,10 +272,12 @@ func prepareQueries(filter nostr.Filter) (
if len(bv) == 32 { if len(bv) == 32 {
// hex tag // hex tag
size = 32 size = 32
index = indexTag32Prefix
} else { } else {
// string tag // string tag
bv = []byte(value) bv = []byte(value)
size = len(bv) size = len(bv)
index = indexTagPrefix
} }
prefix := make([]byte, 1+size) prefix := make([]byte, 1+size)
prefix[0] = index prefix[0] = index
@@ -305,16 +304,6 @@ func prepareQueries(filter nostr.Filter) (
extraFilter = nil extraFilter = nil
} }
prefixLen = len(queries[0].prefix)
// the idx -- i.e. the key to the raw event store -- is at the end of
// the index key, not in the value, this is the offset for us to read it
if index == indexIdPrefix {
idxOffset = prefixLen
} else {
idxOffset = prefixLen + 4 // add 4 bytes for the created_at
}
var until uint32 = 4294967295 var until uint32 = 4294967295
if filter.Until != nil { if filter.Until != nil {
if fu := uint32(*filter.Until); fu < until { if fu := uint32(*filter.Until); fu < until {
@@ -333,5 +322,5 @@ func prepareQueries(filter nostr.Filter) (
} }
} }
return queries, extraFilter, since, prefixLen, idxOffset, nil return queries, extraFilter, since, nil
} }

View File

@@ -17,9 +17,7 @@ func (b *BadgerBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error {
prefix := make([]byte, 1+32) prefix := make([]byte, 1+32)
prefix[0] = indexIdPrefix prefix[0] = indexIdPrefix
copy(prefix[1:], id) copy(prefix[1:], id)
opts := badger.DefaultIteratorOptions it := txn.NewIterator(badger.IteratorOptions{})
opts.PrefetchValues = false
it := txn.NewIterator(opts)
defer it.Close() defer it.Close()
it.Seek(prefix) it.Seek(prefix)
if it.ValidForPrefix(prefix) { if it.ValidForPrefix(prefix) {

View File

@@ -13,7 +13,7 @@ import (
func (b *LMDBBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) { func (b *LMDBBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) {
var count int64 = 0 var count int64 = 0
queries, extraFilter, since, prefixLen, err := b.prepareQueries(filter) queries, extraFilter, since, err := b.prepareQueries(filter)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@@ -46,12 +46,13 @@ func (b *LMDBBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int
for { for {
// we already have a k and a v and an err from the cursor setup, so check and use these // we already have a k and a v and an err from the cursor setup, so check and use these
if iterr != nil || !bytes.Equal(q.prefix, k[0:prefixLen]) { if iterr != nil || !bytes.HasPrefix(k, q.prefix) {
break break
} }
// "id" indexes don't contain a timestamp
if !q.skipTimestamp { if !q.skipTimestamp {
createdAt := binary.BigEndian.Uint32(k[prefixLen:]) createdAt := binary.BigEndian.Uint32(k[len(k)-4:])
if createdAt < since { if createdAt < since {
break break
} }

View File

@@ -30,7 +30,7 @@ type queryEvent struct {
func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) { func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
ch := make(chan *nostr.Event) ch := make(chan *nostr.Event)
queries, extraFilter, since, prefixLen, err := b.prepareQueries(filter) queries, extraFilter, since, err := b.prepareQueries(filter)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -81,8 +81,9 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
return return
} }
// "id" indexes don't contain a timestamp
if !q.skipTimestamp { if !q.skipTimestamp {
createdAt := binary.BigEndian.Uint32(k[prefixLen:]) createdAt := binary.BigEndian.Uint32(k[len(k)-4:])
if createdAt < since { if createdAt < since {
break break
} }
@@ -210,7 +211,6 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) (
queries []query, queries []query,
extraFilter *nostr.Filter, extraFilter *nostr.Filter,
since uint32, since uint32,
prefixLen int,
err error, err error,
) { ) {
if len(filter.IDs) > 0 { if len(filter.IDs) > 0 {
@@ -218,7 +218,7 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) (
for i, idHex := range filter.IDs { for i, idHex := range filter.IDs {
prefix, _ := hex.DecodeString(idHex) prefix, _ := hex.DecodeString(idHex)
if len(prefix) != 32 { if len(prefix) != 32 {
return nil, nil, 0, 0, fmt.Errorf("invalid id '%s'", idHex) return nil, nil, 0, fmt.Errorf("invalid id '%s'", idHex)
} }
queries[i] = query{i: i, dbi: b.indexId, prefix: prefix, skipTimestamp: true} queries[i] = query{i: i, dbi: b.indexId, prefix: prefix, skipTimestamp: true}
} }
@@ -228,7 +228,7 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) (
for i, pubkeyHex := range filter.Authors { for i, pubkeyHex := range filter.Authors {
prefix, _ := hex.DecodeString(pubkeyHex) prefix, _ := hex.DecodeString(pubkeyHex)
if len(prefix) != 32 { if len(prefix) != 32 {
return nil, nil, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) return nil, nil, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex)
} }
queries[i] = query{i: i, dbi: b.indexPubkey, prefix: prefix} queries[i] = query{i: i, dbi: b.indexPubkey, prefix: prefix}
} }
@@ -239,7 +239,7 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) (
for _, kind := range filter.Kinds { for _, kind := range filter.Kinds {
pubkey, _ := hex.DecodeString(pubkeyHex) pubkey, _ := hex.DecodeString(pubkeyHex)
if len(pubkey) != 32 { if len(pubkey) != 32 {
return nil, nil, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) return nil, nil, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex)
} }
prefix := make([]byte, 32+2) prefix := make([]byte, 32+2)
copy(prefix[:], pubkey) copy(prefix[:], pubkey)
@@ -257,7 +257,7 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) (
size += len(values) size += len(values)
} }
if size == 0 { if size == 0 {
return nil, nil, 0, 0, fmt.Errorf("empty tag filters") return nil, nil, 0, fmt.Errorf("empty tag filters")
} }
queries = make([]query, size) queries = make([]query, size)
@@ -299,8 +299,6 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) (
extraFilter = nil extraFilter = nil
} }
prefixLen = len(queries[0].prefix)
var until uint32 = 4294967295 var until uint32 = 4294967295
if filter.Until != nil { if filter.Until != nil {
if fu := uint32(*filter.Until); fu < until { if fu := uint32(*filter.Until); fu < until {
@@ -319,5 +317,5 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) (
} }
} }
return queries, extraFilter, since, prefixLen, nil return queries, extraFilter, since, nil
} }