diff --git a/badger/helpers.go b/badger/helpers.go index 0f50993..0c3b139 100644 --- a/badger/helpers.go +++ b/badger/helpers.go @@ -3,29 +3,18 @@ package badger import ( "encoding/binary" "encoding/hex" - "strconv" "strings" + "github.com/fiatjaf/eventstore" "github.com/nbd-wtf/go-nostr" + "golang.org/x/exp/slices" ) -func getAddrTagElements(tagValue string) (kind uint16, pkb []byte, d string) { - spl := strings.Split(tagValue, ":") - if len(spl) == 3 { - if pkb, _ := hex.DecodeString(spl[1]); len(pkb) == 32 { - if kind, err := strconv.ParseUint(spl[0], 10, 16); err == nil { - return uint16(kind), pkb, spl[2] - } - } - } - return 0, nil, "" -} - func getTagIndexPrefix(tagValue string) ([]byte, int) { var k []byte // the key with full length for created_at and idx at the end, but not filled with these var offset int // the offset -- i.e. where the prefix ends and the created_at and idx would start - if kind, pkb, d := getAddrTagElements(tagValue); len(pkb) == 32 { + if kind, pkb, d := eventstore.GetAddrTagElements(tagValue); len(pkb) == 32 { // store value in the new special "a" tag index k = make([]byte, 1+2+32+len(d)+4+4) k[0] = indexTagAddrPrefix @@ -98,8 +87,14 @@ func getIndexKeysForEvent(evt *nostr.Event, idx []byte) [][]byte { } // ~ by tagvalue+date - for _, tag := range evt.Tags { + slices.SortFunc(evt.Tags, func(a, b nostr.Tag) int { return strings.Compare(a[1], b[1]) }) + for i, tag := range evt.Tags { if len(tag) < 2 || len(tag[0]) != 1 || len(tag[1]) == 0 || len(tag[1]) > 100 { + // not indexable + continue + } + if i > 0 && evt.Tags[i-1][1] == tag[1] { + // duplicate continue } diff --git a/badger/migrations.go b/badger/migrations.go index 32dc631..00e6336 100644 --- a/badger/migrations.go +++ b/badger/migrations.go @@ -5,6 +5,7 @@ import ( "log" "github.com/dgraph-io/badger/v4" + "github.com/fiatjaf/eventstore" ) func (b *BadgerBackend) runMigrations() error { @@ -74,7 +75,7 @@ func (b *BadgerBackend) runMigrations() error { item := it.Item() key := item.Key() - if kind, pkb, d := getAddrTagElements(string(key[1 : len(key)-4-4])); len(pkb) == 32 { + if kind, pkb, d := eventstore.GetAddrTagElements(string(key[1 : len(key)-4-4])); len(pkb) == 32 { // it's an 'a' tag or alike if err := txn.Delete(key); err != nil { return err diff --git a/helpers.go b/helpers.go new file mode 100644 index 0000000..665ae10 --- /dev/null +++ b/helpers.go @@ -0,0 +1,8 @@ +package eventstore + +import "github.com/nbd-wtf/go-nostr" + +func isOlder(previous, next *nostr.Event) bool { + return previous.CreatedAt < next.CreatedAt || + (previous.CreatedAt == next.CreatedAt && previous.ID > next.ID) +} diff --git a/lmdb/helpers.go b/lmdb/helpers.go new file mode 100644 index 0000000..1dcb980 --- /dev/null +++ b/lmdb/helpers.go @@ -0,0 +1,110 @@ +package lmdb + +import ( + "encoding/binary" + "encoding/hex" + "strings" + + "github.com/PowerDNS/lmdb-go/lmdb" + "github.com/fiatjaf/eventstore" + "github.com/nbd-wtf/go-nostr" + "golang.org/x/exp/slices" +) + +func (b *LMDBBackend) getTagIndexPrefix(tagValue string) (lmdb.DBI, []byte, int) { + var k []byte // the key with full length for created_at and idx at the end, but not filled with these + var offset int // the offset -- i.e. where the prefix ends and the created_at and idx would start + var dbi lmdb.DBI + + if kind, pkb, d := eventstore.GetAddrTagElements(tagValue); len(pkb) == 32 { + // store value in the new special "a" tag index + k = make([]byte, 2+8+len(d)+4) + binary.BigEndian.PutUint16(k[1:], kind) + copy(k[2:], pkb[0:8]) + copy(k[2+8:], d) + offset = 2 + 8 + len(d) + dbi = b.indexTagAddr + } else if vb, _ := hex.DecodeString(tagValue); len(vb) == 32 { + // store value as bytes + k = make([]byte, 8+4) + copy(k, vb[0:8]) + offset = 8 + dbi = b.indexTag32 + } else { + // store whatever as utf-8 + k = make([]byte, len(tagValue)+4) + copy(k, tagValue) + offset = len(tagValue) + dbi = b.indexTag + } + + return dbi, k, offset +} + +func (b *LMDBBackend) getIndexKeysForEvent(evt *nostr.Event) []key { + keys := make([]key, 0, 18) + + // indexes + { + // ~ by id + k, _ := hex.DecodeString(evt.ID) + keys = append(keys, key{dbi: b.indexId, key: k}) + } + + { + // ~ by pubkey+date + pubkey, _ := hex.DecodeString(evt.PubKey) + k := make([]byte, 8+4) + copy(k[:], pubkey[0:8]) + binary.BigEndian.PutUint32(k[8:], uint32(evt.CreatedAt)) + keys = append(keys, key{dbi: b.indexPubkey, key: k}) + } + + { + // ~ by kind+date + k := make([]byte, 2+4) + binary.BigEndian.PutUint16(k[:], uint16(evt.Kind)) + binary.BigEndian.PutUint32(k[2:], uint32(evt.CreatedAt)) + keys = append(keys, key{dbi: b.indexKind, key: k}) + } + + { + // ~ by pubkey+kind+date + pubkey, _ := hex.DecodeString(evt.PubKey) + k := make([]byte, 8+2+4) + copy(k[:], pubkey[0:8]) + binary.BigEndian.PutUint16(k[8:], uint16(evt.Kind)) + binary.BigEndian.PutUint32(k[8+2:], uint32(evt.CreatedAt)) + keys = append(keys, key{dbi: b.indexPubkeyKind, key: k}) + } + + // ~ by tagvalue+date + slices.SortFunc(evt.Tags, func(a, b nostr.Tag) int { return strings.Compare(a[1], b[1]) }) + for i, tag := range evt.Tags { + if len(tag) < 2 || len(tag[0]) != 1 || len(tag[1]) == 0 || len(tag[1]) > 100 { + // not indexable + continue + } + if i > 0 && evt.Tags[i-1][1] == tag[1] { + // duplicate + continue + } + + // get key prefix (with full length) and offset where to write the created_at + dbi, k, offset := b.getTagIndexPrefix(tag[1]) + + // write the created_at + binary.BigEndian.PutUint32(k[offset:], uint32(evt.CreatedAt)) + + keys = append(keys, key{dbi: dbi, key: k}) + } + + { + // ~ by date only + k := make([]byte, 4) + binary.BigEndian.PutUint32(k[:], uint32(evt.CreatedAt)) + keys = append(keys, key{dbi: b.indexCreatedAt, key: k}) + } + + return keys +} diff --git a/lmdb/lib.go b/lmdb/lib.go index e4f65b3..33bd006 100644 --- a/lmdb/lib.go +++ b/lmdb/lib.go @@ -2,13 +2,11 @@ package lmdb import ( "encoding/binary" - "encoding/hex" "os" "sync/atomic" "github.com/PowerDNS/lmdb-go/lmdb" "github.com/fiatjaf/eventstore" - "github.com/nbd-wtf/go-nostr" ) const ( @@ -33,6 +31,7 @@ type LMDBBackend struct { indexPubkeyKind lmdb.DBI indexTag lmdb.DBI indexTag32 lmdb.DBI + indexTagAddr lmdb.DBI lastId atomic.Uint32 } @@ -48,7 +47,7 @@ func (b *LMDBBackend) Init() error { return err } - env.SetMaxDBs(9) + env.SetMaxDBs(10) env.SetMaxReaders(500) env.SetMapSize(1 << 38) // ~273GB @@ -110,6 +109,11 @@ func (b *LMDBBackend) Init() error { } else { b.indexTag32 = dbi } + if dbi, err := txn.OpenDBI("tagaddr", lmdb.Create); err != nil { + return err + } else { + b.indexTagAddr = dbi + } return nil }); err != nil { return err @@ -155,73 +159,3 @@ type key struct { dbi lmdb.DBI key []byte } - -func (b *LMDBBackend) getIndexKeysForEvent(evt *nostr.Event) []key { - keys := make([]key, 0, 18) - - // indexes - { - // ~ by id - k, _ := hex.DecodeString(evt.ID) - keys = append(keys, key{dbi: b.indexId, key: k}) - } - - { - // ~ by pubkey+date - pubkey, _ := hex.DecodeString(evt.PubKey) - k := make([]byte, 32+4) - copy(k[:], pubkey) - binary.BigEndian.PutUint32(k[32:], uint32(evt.CreatedAt)) - keys = append(keys, key{dbi: b.indexPubkey, key: k}) - } - - { - // ~ by kind+date - k := make([]byte, 2+4) - binary.BigEndian.PutUint16(k[:], uint16(evt.Kind)) - binary.BigEndian.PutUint32(k[2:], uint32(evt.CreatedAt)) - keys = append(keys, key{dbi: b.indexKind, key: k}) - } - - { - // ~ by pubkey+kind+date - pubkey, _ := hex.DecodeString(evt.PubKey) - k := make([]byte, 32+2+4) - copy(k[:], pubkey) - binary.BigEndian.PutUint16(k[32:], uint16(evt.Kind)) - binary.BigEndian.PutUint32(k[32+2:], uint32(evt.CreatedAt)) - keys = append(keys, key{dbi: b.indexPubkeyKind, key: k}) - } - - // ~ by tagvalue+date - for _, tag := range evt.Tags { - if len(tag) < 2 || len(tag[0]) != 1 || len(tag[1]) == 0 || len(tag[1]) > 100 { - continue - } - - var v []byte - var dbi lmdb.DBI - if vb, _ := hex.DecodeString(tag[1]); len(vb) == 32 { - // store value as bytes - v = vb - dbi = b.indexTag32 - } else { - v = []byte(tag[1]) - dbi = b.indexTag - } - - k := make([]byte, len(v)+4) - copy(k[:], v) - binary.BigEndian.PutUint32(k[len(v):], uint32(evt.CreatedAt)) - keys = append(keys, key{dbi: dbi, key: k}) - } - - { - // ~ by date only - k := make([]byte, 4) - binary.BigEndian.PutUint32(k[:], uint32(evt.CreatedAt)) - keys = append(keys, key{dbi: b.indexCreatedAt, key: k}) - } - - return keys -} diff --git a/lmdb/migration.go b/lmdb/migration.go index 6a4de1b..16e7b58 100644 --- a/lmdb/migration.go +++ b/lmdb/migration.go @@ -6,6 +6,7 @@ import ( "log" "github.com/PowerDNS/lmdb-go/lmdb" + "github.com/fiatjaf/eventstore" ) const ( @@ -41,11 +42,15 @@ func (b *LMDBBackend) runMigrations() error { for err == nil { if len(key)-4 /* uint32 created_at */ == 32 { log.Printf("moving key %x->%x", key, val) - txn.Put(b.indexTag32, key, val, 0) - txn.Del(b.indexTag, key, val) + if err := txn.Put(b.indexTag32, key, val, 0); err != nil { + return err + } + if err := txn.Del(b.indexTag, key, val); err != nil { + return err + } } - // next + // next -- will end on err key, val, err = cursor.Get(nil, nil, lmdb.Next) } if lmdbErr, ok := err.(*lmdb.OpError); ok && lmdbErr.Errno != lmdb.NotFound { @@ -60,6 +65,89 @@ func (b *LMDBBackend) runMigrations() error { } if version < 2 { + log.Println("migration 2: use just 8 bytes for pubkeys and ids instead of 32 bytes") + // rewrite all keys from indexTag32, indexId, indexPubkey and indexPubkeyKind + for _, dbi := range []lmdb.DBI{b.indexTag32, b.indexId, b.indexPubkey, b.indexPubkeyKind} { + cursor, err := txn.OpenCursor(dbi) + if err != nil { + return fmt.Errorf("failed to open cursor in migration 2: %w", err) + } + defer cursor.Close() + key, val, err := cursor.Get(nil, nil, lmdb.First) + for err == nil { + if err := txn.Del(dbi, key, val); err != nil { + return err + } + oldkey := fmt.Sprintf("%x", key) + + // these keys are always 32 bytes of an id or pubkey, then something afterwards, doesn't matter + // so we just keep 8 bytes and overwrite the rest + if len(key) > 32 { + copy(key[8:], key[32:]) + key = key[0 : len(key)-24] + if err := txn.Put(dbi, key, val, 0); err != nil { + return err + } + log.Printf("moved key %s:%x to %x:%x", oldkey, val, key, val) + } + + // next -- will end on err + key, val, err = cursor.Get(nil, nil, lmdb.Next) + } + if lmdbErr, ok := err.(*lmdb.OpError); ok && lmdbErr.Errno != lmdb.NotFound { + // exited the loop with an error different from NOTFOUND + return err + } + } + + // bump version + if err := b.bumpVersion(txn, 2); err != nil { + return err + } + } + + if version < 3 { + log.Println("migration 3: move all keys from indexTag to indexTagAddr if they are like 'a' tags") + cursor, err := txn.OpenCursor(b.indexTag) + if err != nil { + return fmt.Errorf("failed to open cursor in migration 2: %w", err) + } + defer cursor.Close() + + key, val, err := cursor.Get(nil, nil, lmdb.First) + for err == nil { + if kind, pkb, d := eventstore.GetAddrTagElements(string(key[1 : len(key)-4])); len(pkb) == 32 { + // it's an 'a' tag or alike + if err := txn.Del(b.indexTag, key, val); err != nil { + return err + } + + k := make([]byte, 2+8+len(d)+4) + binary.BigEndian.PutUint16(k[1:], kind) + copy(k[2:], pkb[0:8]) // use only the first 8 bytes of the public key in the index + copy(k[2+8:], d) + copy(k[2+8+len(d):], key[len(key)-4:]) + if err := txn.Put(b.indexTagAddr, k, val, 0); err != nil { + return err + } + log.Printf("moved key %x:%x to %x:%x", key, val, k, val) + } + + // next -- will end on err + key, val, err = cursor.Get(nil, nil, lmdb.Next) + } + if lmdbErr, ok := err.(*lmdb.OpError); ok && lmdbErr.Errno != lmdb.NotFound { + // exited the loop with an error different from NOTFOUND + return err + } + + // bump version + if err := b.bumpVersion(txn, 3); err != nil { + return err + } + } + + if version < 4 { // ... } diff --git a/lmdb/query.go b/lmdb/query.go index 4cebe14..6ce2439 100644 --- a/lmdb/query.go +++ b/lmdb/query.go @@ -216,20 +216,20 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( if len(filter.IDs) > 0 { queries = make([]query, len(filter.IDs)) for i, idHex := range filter.IDs { - prefix, _ := hex.DecodeString(idHex) - if len(prefix) != 32 { + if len(idHex) != 64 { return nil, nil, 0, fmt.Errorf("invalid id '%s'", idHex) } + prefix, _ := hex.DecodeString(idHex[0 : 8*2]) queries[i] = query{i: i, dbi: b.indexId, prefix: prefix, skipTimestamp: true} } } else if len(filter.Authors) > 0 { if len(filter.Kinds) == 0 { queries = make([]query, len(filter.Authors)) for i, pubkeyHex := range filter.Authors { - prefix, _ := hex.DecodeString(pubkeyHex) - if len(prefix) != 32 { + if len(pubkeyHex) != 64 { return nil, nil, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) } + prefix, _ := hex.DecodeString(pubkeyHex[0 : 8*2]) queries[i] = query{i: i, dbi: b.indexPubkey, prefix: prefix} } } else { @@ -237,13 +237,11 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( i := 0 for _, pubkeyHex := range filter.Authors { for _, kind := range filter.Kinds { - pubkey, _ := hex.DecodeString(pubkeyHex) - if len(pubkey) != 32 { + if len(pubkeyHex) != 64 { return nil, nil, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex) } - prefix := make([]byte, 32+2) - copy(prefix[:], pubkey) - binary.BigEndian.PutUint16(prefix[+32:], uint16(kind)) + pubkey, _ := hex.DecodeString(pubkeyHex[0 : 8*2]) + prefix := binary.BigEndian.AppendUint16(pubkey, uint16(kind)) queries[i] = query{i: i, dbi: b.indexPubkeyKind, prefix: prefix} i++ } @@ -266,21 +264,10 @@ func (b *LMDBBackend) prepareQueries(filter nostr.Filter) ( i := 0 for _, values := range filter.Tags { for _, value := range values { - var dbi lmdb.DBI - bv, _ := hex.DecodeString(value) - var size int - if len(bv) == 32 { - // hex tag - size = 32 - dbi = b.indexTag32 - } else { - // string tag - bv = []byte(value) - size = len(bv) - dbi = b.indexTag - } - prefix := make([]byte, size) - copy(prefix[:], bv) + // get key prefix (with full length) and offset where to write the created_at + dbi, k, offset := b.getTagIndexPrefix(value) + // remove the last parts part to get just the prefix we want here + prefix := k[0:offset] queries[i] = query{i: i, dbi: dbi, prefix: prefix} i++ } diff --git a/utils.go b/utils.go index 665ae10..3718cc4 100644 --- a/utils.go +++ b/utils.go @@ -1,8 +1,19 @@ package eventstore -import "github.com/nbd-wtf/go-nostr" +import ( + "encoding/hex" + "strconv" + "strings" +) -func isOlder(previous, next *nostr.Event) bool { - return previous.CreatedAt < next.CreatedAt || - (previous.CreatedAt == next.CreatedAt && previous.ID > next.ID) +func GetAddrTagElements(tagValue string) (kind uint16, pkb []byte, d string) { + spl := strings.Split(tagValue, ":") + if len(spl) == 3 { + if pkb, _ := hex.DecodeString(spl[1]); len(pkb) == 32 { + if kind, err := strconv.ParseUint(spl[0], 10, 16); err == nil { + return uint16(kind), pkb, spl[2] + } + } + } + return 0, nil, "" }