diff --git a/lmdb/count.go b/lmdb/count.go index 8e4774a..728859a 100644 --- a/lmdb/count.go +++ b/lmdb/count.go @@ -13,7 +13,7 @@ import ( func (b *LMDBBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) { var count int64 = 0 - dbi, queries, extraFilter, since, prefixLen, err := b.prepareQueries(filter) + queries, extraFilter, since, prefixLen, err := b.prepareQueries(filter) if err != nil { return 0, err } @@ -21,7 +21,7 @@ func (b *LMDBBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int err = b.lmdbEnv.View(func(txn *lmdb.Txn) error { // actually iterate for _, q := range queries { - cursor, err := txn.OpenCursor(dbi) + cursor, err := txn.OpenCursor(q.dbi) if err != nil { continue } diff --git a/lmdb/lib.go b/lmdb/lib.go index 1382962..7993212 100644 --- a/lmdb/lib.go +++ b/lmdb/lib.go @@ -31,6 +31,7 @@ type LMDBBackend struct { indexPubkey lmdb.DBI indexPubkeyKind lmdb.DBI indexTag lmdb.DBI + indexTag32 lmdb.DBI lastId atomic.Uint32 } @@ -46,7 +47,7 @@ func (b *LMDBBackend) Init() error { return err } - env.SetMaxDBs(7) + env.SetMaxDBs(8) env.SetMaxReaders(500) env.SetMapSize(1 << 38) // ~273GB @@ -103,6 +104,11 @@ func (b *LMDBBackend) Init() error { } else { b.indexTag = dbi } + if dbi, err := txn.OpenDBI("tag32", lmdb.Create); err != nil { + return err + } else { + b.indexTag32 = dbi + } return nil }); err != nil { return err @@ -193,17 +199,20 @@ func (b *LMDBBackend) getIndexKeysForEvent(evt *nostr.Event) []key { } var v []byte + var dbi lmdb.DBI if vb, _ := hex.DecodeString(tag[1]); len(vb) == 32 { // store value as bytes v = vb + dbi = b.indexTag32 } else { v = []byte(tag[1]) + dbi = b.indexTag } k := make([]byte, len(v)+4) copy(k[:], v) binary.BigEndian.PutUint32(k[len(v):], uint32(evt.CreatedAt)) - keys = append(keys, key{dbi: b.indexTag, key: k}) + keys = append(keys, key{dbi: dbi, key: k}) } { diff --git a/lmdb/query.go b/lmdb/query.go index 8157971..7c5f373 100644 --- a/lmdb/query.go +++ b/lmdb/query.go @@ -15,6 +15,7 @@ import ( type query struct { i int + dbi lmdb.DBI prefix []byte startingPoint []byte results chan *nostr.Event @@ -29,7 +30,7 @@ type queryEvent struct { func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) { ch := make(chan *nostr.Event) - dbi, queries, extraFilter, since, prefixLen, err := b.prepareQueries(filter) + queries, extraFilter, since, prefixLen, err := b.prepareQueries(filter) if err != nil { return nil, err } @@ -42,7 +43,7 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha go func(i int, q query) { defer close(q.results) - cursor, err := txn.OpenCursor(dbi) + cursor, err := txn.OpenCursor(q.dbi) if err != nil { return } @@ -74,7 +75,7 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha } // we already have a k and a v and an err from the cursor setup, so check and use these - if iterr != nil || !bytes.Equal(q.prefix, k[0:prefixLen]) { + if iterr != nil || !bytes.HasPrefix(q.prefix, k) { return } @@ -204,7 +205,6 @@ func (pq *priorityQueue) Pop() any { } func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( - dbi lmdb.DBI, queries []query, extraFilter *nostr.Filter, since uint32, @@ -212,48 +212,43 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( err error, ) { if len(filter.IDs) > 0 { - dbi = b.indexId queries = make([]query, len(filter.IDs)) for i, idHex := range filter.IDs { prefix, _ := hex.DecodeString(idHex) if len(prefix) != 32 { - return dbi, nil, nil, 0, 0, fmt.Errorf("invalid id '%s'", idHex) + return nil, nil, 0, 0, fmt.Errorf("invalid id '%s'", idHex) } - queries[i] = query{i: i, prefix: prefix, skipTimestamp: true} + queries[i] = query{i: i, dbi: b.indexId, prefix: prefix, skipTimestamp: true} } } else if len(filter.Authors) > 0 { if len(filter.Kinds) == 0 { - dbi = b.indexPubkey queries = make([]query, len(filter.Authors)) for i, pubkeyHex := range filter.Authors { prefix, _ := hex.DecodeString(pubkeyHex) if len(prefix) != 32 { - return dbi, nil, nil, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) + return nil, nil, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) } - queries[i] = query{i: i, prefix: prefix} + queries[i] = query{i: i, dbi: b.indexPubkey, prefix: prefix} } } else { - dbi = b.indexPubkeyKind queries = make([]query, len(filter.Authors)*len(filter.Kinds)) i := 0 for _, pubkeyHex := range filter.Authors { for _, kind := range filter.Kinds { pubkey, _ := hex.DecodeString(pubkeyHex) if len(pubkey) != 32 { - return dbi, nil, nil, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) + return nil, nil, 0, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) } prefix := make([]byte, 32+2) copy(prefix[:], pubkey) binary.BigEndian.PutUint16(prefix[+32:], uint16(kind)) - queries[i] = query{i: i, prefix: prefix} + queries[i] = query{i: i, dbi: b.indexPubkeyKind, prefix: prefix} i++ } } } extraFilter = &nostr.Filter{Tags: filter.Tags} } else if len(filter.Tags) > 0 { - dbi = b.indexTag - // determine the size of the queries array by inspecting all tags sizes size := 0 for _, values := range filter.Tags { @@ -265,35 +260,36 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( i := 0 for _, values := range filter.Tags { for _, value := range values { + var dbi lmdb.DBI bv, _ := hex.DecodeString(value) var size int if len(bv) == 32 { // hex tag size = 32 + dbi = b.indexTag32 } else { // string tag bv = []byte(value) size = len(bv) + dbi = b.indexTag } prefix := make([]byte, size) copy(prefix[:], bv) - queries[i] = query{i: i, prefix: prefix} + queries[i] = query{i: i, dbi: dbi, prefix: prefix} i++ } } } else if len(filter.Kinds) > 0 { - dbi = b.indexKind queries = make([]query, len(filter.Kinds)) for i, kind := range filter.Kinds { prefix := make([]byte, 2) binary.BigEndian.PutUint16(prefix[:], uint16(kind)) - queries[i] = query{i: i, prefix: prefix} + queries[i] = query{i: i, dbi: b.indexKind, prefix: prefix} } } else { - dbi = b.indexCreatedAt queries = make([]query, 1) prefix := make([]byte, 0) - queries[0] = query{i: 0, prefix: prefix} + queries[0] = query{i: 0, dbi: b.indexCreatedAt, prefix: prefix} extraFilter = nil } @@ -317,5 +313,5 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( } } - return dbi, queries, extraFilter, since, prefixLen, nil + return queries, extraFilter, since, prefixLen, nil }