lmdb: use 8-byte keys instead of 32, use indexAddr, migrations.
This commit is contained in:
@@ -3,29 +3,18 @@ package badger
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/fiatjaf/eventstore"
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
// getAddrTagElements splits a tag value of the form "kind:pubkey:d" into its
// three parts.
//
// It returns the kind parsed as a base-10 uint16, the bytes obtained from
// hex-decoding the middle part, and the third part verbatim. When the value
// does not have exactly three colon-separated parts, the middle part does not
// decode to 32 bytes, or the kind does not parse, it returns (0, nil, "").
func getAddrTagElements(tagValue string) (kind uint16, pkb []byte, d string) {
	parts := strings.Split(tagValue, ":")
	if len(parts) != 3 {
		return 0, nil, ""
	}

	// the decode error is ignored on purpose: only the decoded length decides
	pubkey, _ := hex.DecodeString(parts[1])
	if len(pubkey) != 32 {
		return 0, nil, ""
	}

	parsed, err := strconv.ParseUint(parts[0], 10, 16)
	if err != nil {
		return 0, nil, ""
	}

	return uint16(parsed), pubkey, parts[2]
}
|
||||
|
||||
func getTagIndexPrefix(tagValue string) ([]byte, int) {
|
||||
var k []byte // the key with full length for created_at and idx at the end, but not filled with these
|
||||
var offset int // the offset -- i.e. where the prefix ends and the created_at and idx would start
|
||||
|
||||
if kind, pkb, d := getAddrTagElements(tagValue); len(pkb) == 32 {
|
||||
if kind, pkb, d := eventstore.GetAddrTagElements(tagValue); len(pkb) == 32 {
|
||||
// store value in the new special "a" tag index
|
||||
k = make([]byte, 1+2+32+len(d)+4+4)
|
||||
k[0] = indexTagAddrPrefix
|
||||
@@ -98,8 +87,14 @@ func getIndexKeysForEvent(evt *nostr.Event, idx []byte) [][]byte {
|
||||
}
|
||||
|
||||
// ~ by tagvalue+date
|
||||
for _, tag := range evt.Tags {
|
||||
slices.SortFunc(evt.Tags, func(a, b nostr.Tag) int { return strings.Compare(a[1], b[1]) })
|
||||
for i, tag := range evt.Tags {
|
||||
if len(tag) < 2 || len(tag[0]) != 1 || len(tag[1]) == 0 || len(tag[1]) > 100 {
|
||||
// not indexable
|
||||
continue
|
||||
}
|
||||
if i > 0 && evt.Tags[i-1][1] == tag[1] {
|
||||
// duplicate
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"log"
|
||||
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/fiatjaf/eventstore"
|
||||
)
|
||||
|
||||
func (b *BadgerBackend) runMigrations() error {
|
||||
@@ -74,7 +75,7 @@ func (b *BadgerBackend) runMigrations() error {
|
||||
item := it.Item()
|
||||
key := item.Key()
|
||||
|
||||
if kind, pkb, d := getAddrTagElements(string(key[1 : len(key)-4-4])); len(pkb) == 32 {
|
||||
if kind, pkb, d := eventstore.GetAddrTagElements(string(key[1 : len(key)-4-4])); len(pkb) == 32 {
|
||||
// it's an 'a' tag or alike
|
||||
if err := txn.Delete(key); err != nil {
|
||||
return err
|
||||
|
||||
8
helpers.go
Normal file
8
helpers.go
Normal file
@@ -0,0 +1,8 @@
|
||||
package eventstore
|
||||
|
||||
import "github.com/nbd-wtf/go-nostr"
|
||||
|
||||
func isOlder(previous, next *nostr.Event) bool {
|
||||
return previous.CreatedAt < next.CreatedAt ||
|
||||
(previous.CreatedAt == next.CreatedAt && previous.ID > next.ID)
|
||||
}
|
||||
110
lmdb/helpers.go
Normal file
110
lmdb/helpers.go
Normal file
@@ -0,0 +1,110 @@
|
||||
package lmdb
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"strings"
|
||||
|
||||
"github.com/PowerDNS/lmdb-go/lmdb"
|
||||
"github.com/fiatjaf/eventstore"
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
func (b *LMDBBackend) getTagIndexPrefix(tagValue string) (lmdb.DBI, []byte, int) {
|
||||
var k []byte // the key with full length for created_at and idx at the end, but not filled with these
|
||||
var offset int // the offset -- i.e. where the prefix ends and the created_at and idx would start
|
||||
var dbi lmdb.DBI
|
||||
|
||||
if kind, pkb, d := eventstore.GetAddrTagElements(tagValue); len(pkb) == 32 {
|
||||
// store value in the new special "a" tag index
|
||||
k = make([]byte, 2+8+len(d)+4)
|
||||
binary.BigEndian.PutUint16(k[1:], kind)
|
||||
copy(k[2:], pkb[0:8])
|
||||
copy(k[2+8:], d)
|
||||
offset = 2 + 8 + len(d)
|
||||
dbi = b.indexTagAddr
|
||||
} else if vb, _ := hex.DecodeString(tagValue); len(vb) == 32 {
|
||||
// store value as bytes
|
||||
k = make([]byte, 8+4)
|
||||
copy(k, vb[0:8])
|
||||
offset = 8
|
||||
dbi = b.indexTag32
|
||||
} else {
|
||||
// store whatever as utf-8
|
||||
k = make([]byte, len(tagValue)+4)
|
||||
copy(k, tagValue)
|
||||
offset = len(tagValue)
|
||||
dbi = b.indexTag
|
||||
}
|
||||
|
||||
return dbi, k, offset
|
||||
}
|
||||
|
||||
func (b *LMDBBackend) getIndexKeysForEvent(evt *nostr.Event) []key {
|
||||
keys := make([]key, 0, 18)
|
||||
|
||||
// indexes
|
||||
{
|
||||
// ~ by id
|
||||
k, _ := hex.DecodeString(evt.ID)
|
||||
keys = append(keys, key{dbi: b.indexId, key: k})
|
||||
}
|
||||
|
||||
{
|
||||
// ~ by pubkey+date
|
||||
pubkey, _ := hex.DecodeString(evt.PubKey)
|
||||
k := make([]byte, 8+4)
|
||||
copy(k[:], pubkey[0:8])
|
||||
binary.BigEndian.PutUint32(k[8:], uint32(evt.CreatedAt))
|
||||
keys = append(keys, key{dbi: b.indexPubkey, key: k})
|
||||
}
|
||||
|
||||
{
|
||||
// ~ by kind+date
|
||||
k := make([]byte, 2+4)
|
||||
binary.BigEndian.PutUint16(k[:], uint16(evt.Kind))
|
||||
binary.BigEndian.PutUint32(k[2:], uint32(evt.CreatedAt))
|
||||
keys = append(keys, key{dbi: b.indexKind, key: k})
|
||||
}
|
||||
|
||||
{
|
||||
// ~ by pubkey+kind+date
|
||||
pubkey, _ := hex.DecodeString(evt.PubKey)
|
||||
k := make([]byte, 8+2+4)
|
||||
copy(k[:], pubkey[0:8])
|
||||
binary.BigEndian.PutUint16(k[8:], uint16(evt.Kind))
|
||||
binary.BigEndian.PutUint32(k[8+2:], uint32(evt.CreatedAt))
|
||||
keys = append(keys, key{dbi: b.indexPubkeyKind, key: k})
|
||||
}
|
||||
|
||||
// ~ by tagvalue+date
|
||||
slices.SortFunc(evt.Tags, func(a, b nostr.Tag) int { return strings.Compare(a[1], b[1]) })
|
||||
for i, tag := range evt.Tags {
|
||||
if len(tag) < 2 || len(tag[0]) != 1 || len(tag[1]) == 0 || len(tag[1]) > 100 {
|
||||
// not indexable
|
||||
continue
|
||||
}
|
||||
if i > 0 && evt.Tags[i-1][1] == tag[1] {
|
||||
// duplicate
|
||||
continue
|
||||
}
|
||||
|
||||
// get key prefix (with full length) and offset where to write the created_at
|
||||
dbi, k, offset := b.getTagIndexPrefix(tag[1])
|
||||
|
||||
// write the created_at
|
||||
binary.BigEndian.PutUint32(k[offset:], uint32(evt.CreatedAt))
|
||||
|
||||
keys = append(keys, key{dbi: dbi, key: k})
|
||||
}
|
||||
|
||||
{
|
||||
// ~ by date only
|
||||
k := make([]byte, 4)
|
||||
binary.BigEndian.PutUint32(k[:], uint32(evt.CreatedAt))
|
||||
keys = append(keys, key{dbi: b.indexCreatedAt, key: k})
|
||||
}
|
||||
|
||||
return keys
|
||||
}
|
||||
80
lmdb/lib.go
80
lmdb/lib.go
@@ -2,13 +2,11 @@ package lmdb
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/PowerDNS/lmdb-go/lmdb"
|
||||
"github.com/fiatjaf/eventstore"
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -33,6 +31,7 @@ type LMDBBackend struct {
|
||||
indexPubkeyKind lmdb.DBI
|
||||
indexTag lmdb.DBI
|
||||
indexTag32 lmdb.DBI
|
||||
indexTagAddr lmdb.DBI
|
||||
|
||||
lastId atomic.Uint32
|
||||
}
|
||||
@@ -48,7 +47,7 @@ func (b *LMDBBackend) Init() error {
|
||||
return err
|
||||
}
|
||||
|
||||
env.SetMaxDBs(9)
|
||||
env.SetMaxDBs(10)
|
||||
env.SetMaxReaders(500)
|
||||
env.SetMapSize(1 << 38) // ~273GB
|
||||
|
||||
@@ -110,6 +109,11 @@ func (b *LMDBBackend) Init() error {
|
||||
} else {
|
||||
b.indexTag32 = dbi
|
||||
}
|
||||
if dbi, err := txn.OpenDBI("tagaddr", lmdb.Create); err != nil {
|
||||
return err
|
||||
} else {
|
||||
b.indexTagAddr = dbi
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
@@ -155,73 +159,3 @@ type key struct {
|
||||
dbi lmdb.DBI
|
||||
key []byte
|
||||
}
|
||||
|
||||
func (b *LMDBBackend) getIndexKeysForEvent(evt *nostr.Event) []key {
|
||||
keys := make([]key, 0, 18)
|
||||
|
||||
// indexes
|
||||
{
|
||||
// ~ by id
|
||||
k, _ := hex.DecodeString(evt.ID)
|
||||
keys = append(keys, key{dbi: b.indexId, key: k})
|
||||
}
|
||||
|
||||
{
|
||||
// ~ by pubkey+date
|
||||
pubkey, _ := hex.DecodeString(evt.PubKey)
|
||||
k := make([]byte, 32+4)
|
||||
copy(k[:], pubkey)
|
||||
binary.BigEndian.PutUint32(k[32:], uint32(evt.CreatedAt))
|
||||
keys = append(keys, key{dbi: b.indexPubkey, key: k})
|
||||
}
|
||||
|
||||
{
|
||||
// ~ by kind+date
|
||||
k := make([]byte, 2+4)
|
||||
binary.BigEndian.PutUint16(k[:], uint16(evt.Kind))
|
||||
binary.BigEndian.PutUint32(k[2:], uint32(evt.CreatedAt))
|
||||
keys = append(keys, key{dbi: b.indexKind, key: k})
|
||||
}
|
||||
|
||||
{
|
||||
// ~ by pubkey+kind+date
|
||||
pubkey, _ := hex.DecodeString(evt.PubKey)
|
||||
k := make([]byte, 32+2+4)
|
||||
copy(k[:], pubkey)
|
||||
binary.BigEndian.PutUint16(k[32:], uint16(evt.Kind))
|
||||
binary.BigEndian.PutUint32(k[32+2:], uint32(evt.CreatedAt))
|
||||
keys = append(keys, key{dbi: b.indexPubkeyKind, key: k})
|
||||
}
|
||||
|
||||
// ~ by tagvalue+date
|
||||
for _, tag := range evt.Tags {
|
||||
if len(tag) < 2 || len(tag[0]) != 1 || len(tag[1]) == 0 || len(tag[1]) > 100 {
|
||||
continue
|
||||
}
|
||||
|
||||
var v []byte
|
||||
var dbi lmdb.DBI
|
||||
if vb, _ := hex.DecodeString(tag[1]); len(vb) == 32 {
|
||||
// store value as bytes
|
||||
v = vb
|
||||
dbi = b.indexTag32
|
||||
} else {
|
||||
v = []byte(tag[1])
|
||||
dbi = b.indexTag
|
||||
}
|
||||
|
||||
k := make([]byte, len(v)+4)
|
||||
copy(k[:], v)
|
||||
binary.BigEndian.PutUint32(k[len(v):], uint32(evt.CreatedAt))
|
||||
keys = append(keys, key{dbi: dbi, key: k})
|
||||
}
|
||||
|
||||
{
|
||||
// ~ by date only
|
||||
k := make([]byte, 4)
|
||||
binary.BigEndian.PutUint32(k[:], uint32(evt.CreatedAt))
|
||||
keys = append(keys, key{dbi: b.indexCreatedAt, key: k})
|
||||
}
|
||||
|
||||
return keys
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"log"
|
||||
|
||||
"github.com/PowerDNS/lmdb-go/lmdb"
|
||||
"github.com/fiatjaf/eventstore"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -41,11 +42,15 @@ func (b *LMDBBackend) runMigrations() error {
|
||||
for err == nil {
|
||||
if len(key)-4 /* uint32 created_at */ == 32 {
|
||||
log.Printf("moving key %x->%x", key, val)
|
||||
txn.Put(b.indexTag32, key, val, 0)
|
||||
txn.Del(b.indexTag, key, val)
|
||||
if err := txn.Put(b.indexTag32, key, val, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := txn.Del(b.indexTag, key, val); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// next
|
||||
// next -- will end on err
|
||||
key, val, err = cursor.Get(nil, nil, lmdb.Next)
|
||||
}
|
||||
if lmdbErr, ok := err.(*lmdb.OpError); ok && lmdbErr.Errno != lmdb.NotFound {
|
||||
@@ -60,6 +65,89 @@ func (b *LMDBBackend) runMigrations() error {
|
||||
}
|
||||
|
||||
if version < 2 {
|
||||
log.Println("migration 2: use just 8 bytes for pubkeys and ids instead of 32 bytes")
|
||||
// rewrite all keys from indexTag32, indexId, indexPubkey and indexPubkeyKind
|
||||
for _, dbi := range []lmdb.DBI{b.indexTag32, b.indexId, b.indexPubkey, b.indexPubkeyKind} {
|
||||
cursor, err := txn.OpenCursor(dbi)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open cursor in migration 2: %w", err)
|
||||
}
|
||||
defer cursor.Close()
|
||||
key, val, err := cursor.Get(nil, nil, lmdb.First)
|
||||
for err == nil {
|
||||
if err := txn.Del(dbi, key, val); err != nil {
|
||||
return err
|
||||
}
|
||||
oldkey := fmt.Sprintf("%x", key)
|
||||
|
||||
// these keys are always 32 bytes of an id or pubkey, then something afterwards, doesn't matter
|
||||
// so we just keep 8 bytes and overwrite the rest
|
||||
if len(key) > 32 {
|
||||
copy(key[8:], key[32:])
|
||||
key = key[0 : len(key)-24]
|
||||
if err := txn.Put(dbi, key, val, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf("moved key %s:%x to %x:%x", oldkey, val, key, val)
|
||||
}
|
||||
|
||||
// next -- will end on err
|
||||
key, val, err = cursor.Get(nil, nil, lmdb.Next)
|
||||
}
|
||||
if lmdbErr, ok := err.(*lmdb.OpError); ok && lmdbErr.Errno != lmdb.NotFound {
|
||||
// exited the loop with an error different from NOTFOUND
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// bump version
|
||||
if err := b.bumpVersion(txn, 2); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if version < 3 {
|
||||
log.Println("migration 3: move all keys from indexTag to indexTagAddr if they are like 'a' tags")
|
||||
cursor, err := txn.OpenCursor(b.indexTag)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open cursor in migration 2: %w", err)
|
||||
}
|
||||
defer cursor.Close()
|
||||
|
||||
key, val, err := cursor.Get(nil, nil, lmdb.First)
|
||||
for err == nil {
|
||||
if kind, pkb, d := eventstore.GetAddrTagElements(string(key[1 : len(key)-4])); len(pkb) == 32 {
|
||||
// it's an 'a' tag or alike
|
||||
if err := txn.Del(b.indexTag, key, val); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
k := make([]byte, 2+8+len(d)+4)
|
||||
binary.BigEndian.PutUint16(k[1:], kind)
|
||||
copy(k[2:], pkb[0:8]) // use only the first 8 bytes of the public key in the index
|
||||
copy(k[2+8:], d)
|
||||
copy(k[2+8+len(d):], key[len(key)-4:])
|
||||
if err := txn.Put(b.indexTagAddr, k, val, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf("moved key %x:%x to %x:%x", key, val, k, val)
|
||||
}
|
||||
|
||||
// next -- will end on err
|
||||
key, val, err = cursor.Get(nil, nil, lmdb.Next)
|
||||
}
|
||||
if lmdbErr, ok := err.(*lmdb.OpError); ok && lmdbErr.Errno != lmdb.NotFound {
|
||||
// exited the loop with an error different from NOTFOUND
|
||||
return err
|
||||
}
|
||||
|
||||
// bump version
|
||||
if err := b.bumpVersion(txn, 3); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if version < 4 {
|
||||
// ...
|
||||
}
|
||||
|
||||
|
||||
@@ -216,20 +216,20 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) (
|
||||
if len(filter.IDs) > 0 {
|
||||
queries = make([]query, len(filter.IDs))
|
||||
for i, idHex := range filter.IDs {
|
||||
prefix, _ := hex.DecodeString(idHex)
|
||||
if len(prefix) != 32 {
|
||||
if len(idHex) != 64 {
|
||||
return nil, nil, 0, fmt.Errorf("invalid id '%s'", idHex)
|
||||
}
|
||||
prefix, _ := hex.DecodeString(idHex[0 : 8*2])
|
||||
queries[i] = query{i: i, dbi: b.indexId, prefix: prefix, skipTimestamp: true}
|
||||
}
|
||||
} else if len(filter.Authors) > 0 {
|
||||
if len(filter.Kinds) == 0 {
|
||||
queries = make([]query, len(filter.Authors))
|
||||
for i, pubkeyHex := range filter.Authors {
|
||||
prefix, _ := hex.DecodeString(pubkeyHex)
|
||||
if len(prefix) != 32 {
|
||||
if len(pubkeyHex) != 64 {
|
||||
return nil, nil, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex)
|
||||
}
|
||||
prefix, _ := hex.DecodeString(pubkeyHex[0 : 8*2])
|
||||
queries[i] = query{i: i, dbi: b.indexPubkey, prefix: prefix}
|
||||
}
|
||||
} else {
|
||||
@@ -237,13 +237,11 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) (
|
||||
i := 0
|
||||
for _, pubkeyHex := range filter.Authors {
|
||||
for _, kind := range filter.Kinds {
|
||||
pubkey, _ := hex.DecodeString(pubkeyHex)
|
||||
if len(pubkey) != 32 {
|
||||
if len(pubkeyHex) != 64 {
|
||||
return nil, nil, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex)
|
||||
}
|
||||
prefix := make([]byte, 32+2)
|
||||
copy(prefix[:], pubkey)
|
||||
binary.BigEndian.PutUint16(prefix[+32:], uint16(kind))
|
||||
pubkey, _ := hex.DecodeString(pubkeyHex[0 : 8*2])
|
||||
prefix := binary.BigEndian.AppendUint16(pubkey, uint16(kind))
|
||||
queries[i] = query{i: i, dbi: b.indexPubkeyKind, prefix: prefix}
|
||||
i++
|
||||
}
|
||||
@@ -266,21 +264,10 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) (
|
||||
i := 0
|
||||
for _, values := range filter.Tags {
|
||||
for _, value := range values {
|
||||
var dbi lmdb.DBI
|
||||
bv, _ := hex.DecodeString(value)
|
||||
var size int
|
||||
if len(bv) == 32 {
|
||||
// hex tag
|
||||
size = 32
|
||||
dbi = b.indexTag32
|
||||
} else {
|
||||
// string tag
|
||||
bv = []byte(value)
|
||||
size = len(bv)
|
||||
dbi = b.indexTag
|
||||
}
|
||||
prefix := make([]byte, size)
|
||||
copy(prefix[:], bv)
|
||||
// get key prefix (with full length) and offset where to write the created_at
|
||||
dbi, k, offset := b.getTagIndexPrefix(value)
|
||||
// remove the last parts part to get just the prefix we want here
|
||||
prefix := k[0:offset]
|
||||
queries[i] = query{i: i, dbi: dbi, prefix: prefix}
|
||||
i++
|
||||
}
|
||||
|
||||
19
utils.go
19
utils.go
@@ -1,8 +1,19 @@
|
||||
package eventstore
|
||||
|
||||
import "github.com/nbd-wtf/go-nostr"
|
||||
import (
|
||||
"encoding/hex"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func isOlder(previous, next *nostr.Event) bool {
|
||||
return previous.CreatedAt < next.CreatedAt ||
|
||||
(previous.CreatedAt == next.CreatedAt && previous.ID > next.ID)
|
||||
// GetAddrTagElements splits a tag value of the form "kind:pubkey:d" into its
// three parts.
//
// It returns the kind parsed as a base-10 uint16, the bytes obtained from
// hex-decoding the middle part, and the third part verbatim. When the value
// does not have exactly three colon-separated parts, the middle part does not
// decode to 32 bytes, or the kind does not parse, it returns (0, nil, "").
func GetAddrTagElements(tagValue string) (kind uint16, pkb []byte, d string) {
	fields := strings.Split(tagValue, ":")
	if len(fields) != 3 {
		return 0, nil, ""
	}

	// the decode error is ignored on purpose: only the decoded length decides
	decoded, _ := hex.DecodeString(fields[1])
	parsedKind, err := strconv.ParseUint(fields[0], 10, 16)
	if len(decoded) != 32 || err != nil {
		return 0, nil, ""
	}

	return uint16(parsedKind), decoded, fields[2]
}
|
||||
|
||||
Reference in New Issue
Block a user