diff --git a/pkg/crypto/ec/musig2/keys.go b/pkg/crypto/ec/musig2/keys.go
index 7bfac60..fea4af3 100644
--- a/pkg/crypto/ec/musig2/keys.go
+++ b/pkg/crypto/ec/musig2/keys.go
@@ -5,6 +5,7 @@ package musig2
 import (
 	"bytes"
 	"fmt"
+	"orly.dev/pkg/utils"
 	"sort"
 
 	"orly.dev/pkg/crypto/ec"
diff --git a/pkg/crypto/ec/musig2/sign.go b/pkg/crypto/ec/musig2/sign.go
index 23b92a4..9e96b29 100644
--- a/pkg/crypto/ec/musig2/sign.go
+++ b/pkg/crypto/ec/musig2/sign.go
@@ -6,6 +6,7 @@ import (
 	"bytes"
 	"fmt"
 	"io"
+	"orly.dev/pkg/utils"
 
 	"orly.dev/pkg/crypto/ec"
 	"orly.dev/pkg/crypto/ec/chainhash"
diff --git a/pkg/crypto/ec/taproot/taproot.go b/pkg/crypto/ec/taproot/taproot.go
index 29f6bb7..bfc7e2b 100644
--- a/pkg/crypto/ec/taproot/taproot.go
+++ b/pkg/crypto/ec/taproot/taproot.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"orly.dev/pkg/crypto/ec/bech32"
 	"orly.dev/pkg/crypto/ec/chaincfg"
+	"orly.dev/pkg/utils"
 	"orly.dev/pkg/utils/chk"
 )
 
diff --git a/pkg/crypto/encryption/nip44.go b/pkg/crypto/encryption/nip44.go
index b2e946a..d91bdd2 100644
--- a/pkg/crypto/encryption/nip44.go
+++ b/pkg/crypto/encryption/nip44.go
@@ -1,13 +1,13 @@
 package encryption
 
 import (
-	"bytes"
 	"crypto/hmac"
 	"crypto/rand"
 	"encoding/base64"
 	"encoding/binary"
 	"io"
 	"math"
+	"orly.dev/pkg/utils"
 
 	"golang.org/x/crypto/chacha20"
 	"golang.org/x/crypto/hkdf"
diff --git a/pkg/crypto/keys/keys.go b/pkg/crypto/keys/keys.go
index 9538764..81c4175 100644
--- a/pkg/crypto/keys/keys.go
+++ b/pkg/crypto/keys/keys.go
@@ -7,6 +7,7 @@ import (
 	"orly.dev/pkg/crypto/ec/schnorr"
 	"orly.dev/pkg/crypto/p256k"
 	"orly.dev/pkg/encoders/hex"
+	"orly.dev/pkg/utils"
 	"orly.dev/pkg/utils/chk"
 )
 
diff --git a/pkg/database/database.go b/pkg/database/database.go
index 31e93d9..6141966 100644
--- a/pkg/database/database.go
+++ b/pkg/database/database.go
@@ -2,6 +2,7 @@ package database
 
 import (
 	"github.com/dgraph-io/badger/v4"
+	"github.com/dgraph-io/badger/v4/options"
 	"orly.dev/pkg/encoders/eventidserial"
 	"orly.dev/pkg/utils/apputil"
 	"orly.dev/pkg/utils/chk"
@@ -51,6 +52,7 @@ func New(ctx context.T, cancel context.F, dataDir, logLevel string) (
 	opts.BlockSize = units.Gb
 	opts.CompactL0OnClose = true
 	opts.LmaxCompaction = true
+	opts.Compression = options.None
 	opts.Logger = d.Logger
 	if d.DB, err = badger.Open(opts); chk.E(err) {
 		return
diff --git a/pkg/database/get-fullidpubkey-by-serial.go b/pkg/database/get-fullidpubkey-by-serial.go
index 556f3be..0e22f24 100644
--- a/pkg/database/get-fullidpubkey-by-serial.go
+++ b/pkg/database/get-fullidpubkey-by-serial.go
@@ -53,56 +53,3 @@ func (d *D) GetFullIdPubkeyBySerial(ser *types.Uint40) (
 	}
 	return
 }
-func (d *D) GetFullIdPubkeyBySerials(sers []*types.Uint40) (
-	fidpks []*store.IdPkTs, err error,
-) {
-	if err = d.View(
-		func(txn *badger.Txn) (err error) {
-			prf := []byte(indexes.FullIdPubkeyPrefix)
-			it := txn.NewIterator(
-				badger.IteratorOptions{
-					Prefix: prf,
-				},
-			)
-			defer it.Close()
-			for it.Seek(prf); it.Valid(); it.Next() {
-				item := it.Item()
-				key := item.Key()
-				ser, fid, p, ca := indexes.FullIdPubkeyVars()
-				buf2 := bytes.NewBuffer(key)
-				if err = indexes.FullIdPubkeyDec(
-					ser, fid, p, ca,
-				).UnmarshalRead(buf2); chk.E(err) {
-					return
-				}
-				for i, v := range sers {
-					if v == nil {
-						continue
-					}
-					if v.Get() == ser.Get() {
-						fidpks = append(
-							fidpks, &store.IdPkTs{
-								Id:  fid.Bytes(),
-								Pub: p.Bytes(),
-								Ts:  int64(ca.Get()),
-								Ser: ser.Get(),
-							},
-						)
-						sers[i] = nil
-					}
-				}
-				idpkts := &store.IdPkTs{
-					Id:  fid.Bytes(),
-					Pub: p.Bytes(),
-					Ts:  int64(ca.Get()),
-					Ser: ser.Get(),
-				}
-				fidpks = append(fidpks, idpkts)
-			}
-			return
-		},
-	); chk.E(err) {
-		return
-	}
-	return
-}
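The function removed above scanned the entire FullIdPubkey table on every call, and the unconditional append after its per-serial matching loop pushed an IdPkTs for every key visited, so callers received the whole table rather than the requested subset. The replacement in the next file seeks straight to each serial instead. A regression-test sketch for the new behaviour follows; newTestDB and seedEvents are hypothetical helpers, not part of this patch:

func TestGetFullIdPubkeyBySerials_OnlyRequested(t *testing.T) {
	d := newTestDB(t)            // hypothetical: open a throwaway database
	sers := seedEvents(t, d, 10) // hypothetical: store 10 events, return sorted serials
	want := sers[:3]             // request a strict subset
	got, err := d.GetFullIdPubkeyBySerials(want)
	if err != nil {
		t.Fatal(err)
	}
	if len(got) != len(want) {
		// the removed implementation failed this check: its unconditional
		// append returned an entry for every key under the table prefix
		t.Fatalf("got %d results, want %d", len(got), len(want))
	}
}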
diff --git a/pkg/database/get-fullidpubkey-by-serials.go b/pkg/database/get-fullidpubkey-by-serials.go
new file mode 100644
index 0000000..43ad4bc
--- /dev/null
+++ b/pkg/database/get-fullidpubkey-by-serials.go
@@ -0,0 +1,65 @@
+package database
+
+import (
+	"bytes"
+	"github.com/dgraph-io/badger/v4"
+	"orly.dev/pkg/database/indexes"
+	"orly.dev/pkg/database/indexes/types"
+	"orly.dev/pkg/interfaces/store"
+	"orly.dev/pkg/utils/chk"
+)
+
+// GetFullIdPubkeyBySerials seeks directly to each serial's prefix in the
+// FullIdPubkey index. The input sers slice is expected to be sorted in
+// ascending order, allowing efficient forward-only iteration via a single
+// Badger iterator.
+func (d *D) GetFullIdPubkeyBySerials(sers []*types.Uint40) (
+	fidpks []*store.IdPkTs, err error,
+) {
+	if len(sers) == 0 {
+		return
+	}
+	if err = d.View(func(txn *badger.Txn) (err error) {
+		// Scope the iterator to the FullIdPubkey table using its 3-byte prefix.
+		buf := new(bytes.Buffer)
+		if err = indexes.NewPrefix(indexes.FullIdPubkey).MarshalWrite(buf); chk.E(err) {
+			return
+		}
+		tablePrefix := buf.Bytes()
+		it := txn.NewIterator(badger.IteratorOptions{Prefix: tablePrefix})
+		defer it.Close()
+
+		for _, s := range sers {
+			if s == nil {
+				continue
+			}
+			// Build the serial-specific prefix: 3-byte table prefix + 5-byte serial.
+			sbuf := new(bytes.Buffer)
+			if err = indexes.FullIdPubkeyEnc(s, nil, nil, nil).MarshalWrite(sbuf); chk.E(err) {
+				return
+			}
+			serialPrefix := sbuf.Bytes()
+
+			// Seek to the first key for this serial and verify it matches the prefix.
+			it.Seek(serialPrefix)
+			if it.ValidForPrefix(serialPrefix) {
+				item := it.Item()
+				key := item.Key()
+				ser, fid, p, ca := indexes.FullIdPubkeyVars()
+				if err = indexes.FullIdPubkeyDec(ser, fid, p, ca).UnmarshalRead(bytes.NewBuffer(key)); chk.E(err) {
+					return
+				}
+				fidpks = append(fidpks, &store.IdPkTs{
+					Id:  fid.Bytes(),
+					Pub: p.Bytes(),
+					Ts:  int64(ca.Get()),
+					Ser: ser.Get(),
+				})
+			}
+		}
+		return
+	}); chk.E(err) {
+		return
+	}
+	return
+}
diff --git a/pkg/database/get-serial-by-id.go b/pkg/database/get-serial-by-id.go
index 5b56088..e6f011c 100644
--- a/pkg/database/get-serial-by-id.go
+++ b/pkg/database/get-serial-by-id.go
@@ -43,3 +43,34 @@ func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
 	}
 	return
 }
+
+//
+// func (d *D) GetSerialBytesById(id []byte) (ser []byte, err error) {
+// 	var idxs []Range
+// 	if idxs, err = GetIndexesFromFilter(&filter.F{Ids: tag.New(id)}); chk.E(err) {
+// 		return
+// 	}
+// 	if len(idxs) == 0 {
+// 		err = errorf.E("no indexes found for id %0x", id)
+// 	}
+// 	if err = d.View(
+// 		func(txn *badger.Txn) (err error) {
+// 			it := txn.NewIterator(badger.DefaultIteratorOptions)
+// 			var key []byte
+// 			defer it.Close()
+// 			it.Seek(idxs[0].Start)
+// 			if it.ValidForPrefix(idxs[0].Start) {
+// 				item := it.Item()
+// 				key = item.Key()
+// 				ser = key[len(key)-5:]
+// 			} else {
+// 				// just don't return what we don't have; others may still
+// 				// be found, though.
+// 			}
+// 			return
+// 		},
+// 	); chk.E(err) {
+// 		return
+// 	}
+// 	return
+// }
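GetFullIdPubkeyBySerials trades the old full-table scan for one seek per serial, but per its doc comment that only stays a single forward pass when the input really is ascending. A caller-side sketch of the precondition, assuming sers is an unsorted []*types.Uint40 already in hand:

// sort ascending so the batched lookup's single Badger iterator only ever
// moves forward; the GetSerialsByRange change below sorts its output for
// exactly this reason
sort.Slice(sers, func(i, j int) bool { return sers[i].Get() < sers[j].Get() })
idpks, err := d.GetFullIdPubkeyBySerials(sers)
if chk.E(err) {
	return
}
// idpks holds one IdPkTs per serial present in the index; serials with no
// entry are skipped silently rather than reported as errors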
diff --git a/pkg/database/get-serials-by-range.go b/pkg/database/get-serials-by-range.go
index 0dfbb14..4d9d76c 100644
--- a/pkg/database/get-serials-by-range.go
+++ b/pkg/database/get-serials-by-range.go
@@ -5,6 +5,7 @@ import (
 	"github.com/dgraph-io/badger/v4"
 	"orly.dev/pkg/database/indexes/types"
 	"orly.dev/pkg/utils/chk"
+	"sort"
 )
 
 func (d *D) GetSerialsByRange(idx Range) (
@@ -40,6 +41,10 @@
 	); chk.E(err) {
 		return
 	}
-
+	sort.Slice(
+		sers, func(i, j int) bool {
+			return sers[i].Get() < sers[j].Get()
+		},
+	)
 	return
 }
diff --git a/pkg/database/query-for-ids.go b/pkg/database/query-for-ids.go
index 986c403..b88125f 100644
--- a/pkg/database/query-for-ids.go
+++ b/pkg/database/query-for-ids.go
@@ -26,88 +26,37 @@ func (d *D) QueryForIds(c context.T, f *filter.F) (
 	if idxs, err = GetIndexesFromFilter(f); chk.E(err) {
 		return
 	}
-	var idOnly bool
-	var tagIdPkTs []*store.IdPkTs
+	var results []*store.IdPkTs
+	var founds []*types.Uint40
 	for _, idx := range idxs {
 		if f.Tags != nil && f.Tags.Len() > 1 {
-			if len(tagIdPkTs) == 0 {
-				// first
-				var founds types.Uint40s
-				if founds, err = d.GetSerialsByRange(idx); chk.E(err) {
-					return
-				}
-				// fetch the events full id indexes
-				for _, ser := range founds {
-					// scan for the IdPkTs
-					var fidpk *store.IdPkTs
-					if fidpk, err = d.GetFullIdPubkeyBySerial(ser); chk.E(err) {
-						return
-					}
-					if fidpk == nil {
-						continue
-					}
-					tagIdPkTs = append(tagIdPkTs, fidpk)
-				}
-			} else {
-				// second and subsequent
-				var founds types.Uint40s
-				var temp []*store.IdPkTs
-				if founds, err = d.GetSerialsByRange(idx); chk.E(err) {
-					return
-				}
-				// fetch the events full id indexes
-				for _, ser := range founds {
-					// scan for the IdPkTs
-					var fidpk *store.IdPkTs
-					if fidpk, err = d.GetFullIdPubkeyBySerial(ser); chk.E(err) {
-						return
-					}
-					if fidpk == nil {
-						continue
-					}
-					temp = append(temp, fidpk)
-				}
-				var intersecting []*store.IdPkTs
-				for _, idpk := range temp {
-					for _, tagIdPk := range tagIdPkTs {
-						if tagIdPk.Ser == idpk.Ser {
-							intersecting = append(intersecting, idpk)
-						}
-					}
-				}
-				tagIdPkTs = intersecting
-			}
-			// deduplicate in case this somehow happened (such as two or more
-			// from one tag matched, only need it once)
-			seen := make(map[uint64]struct{})
-			for _, idpk := range tagIdPkTs {
-				if _, ok := seen[idpk.Ser]; !ok {
-					seen[idpk.Ser] = struct{}{}
-					idPkTs = append(idPkTs, idpk)
-				}
-			}
-			idPkTs = tagIdPkTs
-		} else {
-			var founds types.Uint40s
 			if founds, err = d.GetSerialsByRange(idx); chk.E(err) {
 				return
 			}
-			// fetch the events full id indexes
-			for _, ser := range founds {
-				// scan for the IdPkTs
-				var fidpk *store.IdPkTs
-				if fidpk, err = d.GetFullIdPubkeyBySerial(ser); chk.E(err) {
-					return
-				}
-				if fidpk == nil {
-					continue
-				}
-				idPkTs = append(idPkTs, fidpk)
+			var tmp []*store.IdPkTs
+			if tmp, err = d.GetFullIdPubkeyBySerials(founds); chk.E(err) {
+				return
 			}
+			results = append(results, tmp...)
+		} else {
+			if founds, err = d.GetSerialsByRange(idx); chk.E(err) {
+				return
+			}
+			var tmp []*store.IdPkTs
+			if tmp, err = d.GetFullIdPubkeyBySerials(founds); chk.E(err) {
+				return
+			}
+			results = append(results, tmp...)
 		}
 	}
-	if idOnly {
-		return
+	// deduplicate in case an event matched more than once (such as two or
+	// more values of one tag matching); it is only needed once
+	seen := make(map[uint64]struct{})
+	for _, idpk := range results {
+		if _, ok := seen[idpk.Ser]; !ok {
+			seen[idpk.Ser] = struct{}{}
+			idPkTs = append(idPkTs, idpk)
+		}
 	}
 	// sort results by timestamp in reverse chronological order
 	sort.Slice(
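With the batched lookup in place, both branches of QueryForIds collapse to the same three steps (range scan, batch fetch, accumulate), and the old pairwise tag intersection gives way to a single union of all ranges followed by one dedupe over results. The seen-map idiom in isolation, written as a hypothetical standalone helper rather than code from this patch:

// dedupeBySer keeps the first occurrence of each serial, preserving order.
func dedupeBySer(in []*store.IdPkTs) (out []*store.IdPkTs) {
	seen := make(map[uint64]struct{}, len(in))
	for _, v := range in {
		if _, ok := seen[v.Ser]; ok {
			continue // this event's serial was already emitted
		}
		seen[v.Ser] = struct{}{}
		out = append(out, v)
	}
	return
}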
diff --git a/pkg/database/query-for-serials.go b/pkg/database/query-for-serials.go
index 8fe4979..047f4a3 100644
--- a/pkg/database/query-for-serials.go
+++ b/pkg/database/query-for-serials.go
@@ -6,7 +6,6 @@ import (
 	"orly.dev/pkg/interfaces/store"
 	"orly.dev/pkg/utils/chk"
 	"orly.dev/pkg/utils/context"
-	"sort"
 )
 
 // QueryForSerials takes a filter and returns the serials of events that match,
@@ -14,7 +13,7 @@ import (
 func (d *D) QueryForSerials(c context.T, f *filter.F) (
 	sers types.Uint40s, err error,
 ) {
-	var founds types.Uint40s
+	var founds []*types.Uint40
 	var idPkTs []*store.IdPkTs
 	if f.Ids != nil && f.Ids.Len() > 0 {
 		for _, id := range f.Ids.ToSliceOfBytes() {
@@ -24,24 +23,30 @@
 			}
 			founds = append(founds, ser)
 		}
-		// fetch the events full id indexes so we can sort them
-		for _, ser := range founds {
-			// scan for the IdPkTs
-			var fidpk *store.IdPkTs
-			if fidpk, err = d.GetFullIdPubkeyBySerial(ser); chk.E(err) {
-				return
-			}
-			if fidpk == nil {
-				continue
-			}
-			idPkTs = append(idPkTs, fidpk)
-			// sort by timestamp
-			sort.Slice(
-				idPkTs, func(i, j int) bool {
-					return idPkTs[i].Ts > idPkTs[j].Ts
-				},
-			)
+		var tmp []*store.IdPkTs
+		if tmp, err = d.GetFullIdPubkeyBySerials(founds); chk.E(err) {
+			return
 		}
+		idPkTs = append(idPkTs, tmp...)
+
+		// // fetch the events full id indexes so we can sort them
+		// for _, ser := range founds {
+		// 	// scan for the IdPkTs
+		// 	var fidpk *store.IdPkTs
+		// 	if fidpk, err = d.GetFullIdPubkeyBySerial(ser); chk.E(err) {
+		// 		return
+		// 	}
+		// 	if fidpk == nil {
+		// 		continue
+		// 	}
+		// 	idPkTs = append(idPkTs, fidpk)
+		// 	// sort by timestamp
+		// 	sort.Slice(
+		// 		idPkTs, func(i, j int) bool {
+		// 			return idPkTs[i].Ts > idPkTs[j].Ts
+		// 		},
+		// 	)
+		// }
 	} else {
 		if idPkTs, err = d.QueryForIds(c, f); chk.E(err) {
 			return
diff --git a/pkg/database/save-event.go b/pkg/database/save-event.go
index 38ee8a7..9eda915 100644
--- a/pkg/database/save-event.go
+++ b/pkg/database/save-event.go
@@ -67,16 +67,21 @@ func (d *D) SaveEvent(
 	// to get the timestamp and ensure that the event post-dates it.
 	// otherwise, it should be rejected.
 	var idPkTss []*store.IdPkTs
-	for _, ser := range sers {
-		var fidpk *store.IdPkTs
-		if fidpk, err = d.GetFullIdPubkeyBySerial(ser); chk.E(err) {
-			return
-		}
-		if fidpk == nil {
-			continue
-		}
-		idPkTss = append(idPkTss, fidpk)
+	var tmp []*store.IdPkTs
+	if tmp, err = d.GetFullIdPubkeyBySerials(sers); chk.E(err) {
+		return
 	}
+	idPkTss = append(idPkTss, tmp...)
+	// for _, ser := range sers {
+	// 	var fidpk *store.IdPkTs
+	// 	if fidpk, err = d.GetFullIdPubkeyBySerial(ser); chk.E(err) {
+	// 		return
+	// 	}
+	// 	if fidpk == nil {
+	// 		continue
+	// 	}
+	// 	idPkTss = append(idPkTss, fidpk)
+	// }
 	// sort by timestamp, so the first is the newest
 	sort.Slice(
 		idPkTss, func(i, j int) bool {
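SaveEvent sorts the batched results newest-first so that, per the comment retained in the hunk above, the post-dating rule reduces to a single comparison against the head of the slice. A sketch of that check under stated assumptions: evCreatedAt stands in for the candidate event's created_at as an int64, and the error construction is illustrative, not taken from this patch:

if len(idPkTss) > 0 && evCreatedAt <= idPkTss[0].Ts {
	// the candidate does not post-date the newest stored match; reject it
	err = errorf.E("event does not post-date the newest stored event")
	return
}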
diff --git a/pkg/encoders/event/codectester/divider/main.go b/pkg/encoders/event/codectester/divider/main.go
index f2e86b1..09a3839 100644
--- a/pkg/encoders/event/codectester/divider/main.go
+++ b/pkg/encoders/event/codectester/divider/main.go
@@ -5,9 +5,9 @@ package main
 import (
 	"bufio"
-	"bytes"
 	"fmt"
 	"orly.dev/pkg/encoders/event"
+	"orly.dev/pkg/utils"
 	"orly.dev/pkg/utils/chk"
 	"orly.dev/pkg/utils/interrupt"
 	"orly.dev/pkg/utils/log"
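One change worth a closing note, back in pkg/database/database.go near the top of this patch: opts.Compression = options.None turns off Badger's block compression, trading larger tables on disk for skipping the compress/decompress step on every block read and write. The knob in isolation, as a minimal sketch with dataDir as a placeholder path:

opts := badger.DefaultOptions(dataDir) // dataDir: placeholder directory path
opts.Compression = options.None        // override Badger's default block compression
db, err := badger.Open(opts)
if chk.E(err) {
	return
}
defer db.Close()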