Refactor database filters and enhance query handling
Updated several database queries to support limits, improve efficiency, and add better range handling. Introduced new tests for filter functionality, corrected bugs, and added logging for debugging purposes.
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -86,3 +86,4 @@ node_modules/**
|
|||||||
/blocklist.json
|
/blocklist.json
|
||||||
/gui/gui/main.wasm
|
/gui/gui/main.wasm
|
||||||
/gui/gui/index.html
|
/gui/gui/index.html
|
||||||
|
database/testrealy
|
||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"x.realy.lol/database/indexes/types/varint"
|
"x.realy.lol/database/indexes/types/varint"
|
||||||
"x.realy.lol/filter"
|
"x.realy.lol/filter"
|
||||||
"x.realy.lol/hex"
|
"x.realy.lol/hex"
|
||||||
|
"x.realy.lol/log"
|
||||||
"x.realy.lol/timestamp"
|
"x.realy.lol/timestamp"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -37,7 +38,7 @@ func ToBitfield(f *filter.F) (b Bitfield) {
|
|||||||
if len(f.Authors) != 0 {
|
if len(f.Authors) != 0 {
|
||||||
b += hasAuthors
|
b += hasAuthors
|
||||||
}
|
}
|
||||||
if len(f.Kinds) != 0 {
|
if len(f.Tags) != 0 {
|
||||||
b += hasTags
|
b += hasTags
|
||||||
}
|
}
|
||||||
if f.Since != nil {
|
if f.Since != nil {
|
||||||
@@ -77,32 +78,58 @@ func (d *D) Filter(f filter.F, exclude []*pubhash.T) (evSerials varint.S, err er
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var since, until timestamp.Timestamp
|
var since, until *timestamp.Timestamp
|
||||||
|
if bf&hasSince == 0 || bf&hasUntil == 0 {
|
||||||
if bf&hasSince != 0 {
|
if bf&hasSince != 0 {
|
||||||
since = *f.Since
|
since = f.Since
|
||||||
}
|
}
|
||||||
if bf&hasUntil != 0 {
|
if bf&hasUntil != 0 {
|
||||||
until = *f.Until
|
until = f.Until
|
||||||
} else {
|
} else {
|
||||||
until = math.MaxInt64
|
m := timestamp.Timestamp(math.MaxInt64)
|
||||||
|
until = &m
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
limit := f.Limit
|
||||||
|
var postLimit bool
|
||||||
|
if limit != nil {
|
||||||
|
// put a reasonable cap on unlimited. the actual results may be a lot less for composite
|
||||||
|
// searches that intersect with tags.
|
||||||
|
limit = filter.IntToPointer(10000)
|
||||||
|
} else {
|
||||||
|
// this means trim the result at the end before returning.
|
||||||
|
postLimit = true
|
||||||
|
}
|
||||||
|
log.I.F("%b %b", bf, bf&(hasSince+hasUntil+hasLimit))
|
||||||
|
bf = bf &^ hasLimit
|
||||||
// next, check for filters that only have since and/or until
|
// next, check for filters that only have since and/or until
|
||||||
if bf&hasSince != 0 || bf&hasUntil != 0 {
|
if bf&(hasSince+hasUntil) != 0 && ^(hasUntil+hasSince)&bf == 0 {
|
||||||
if evs, err = d.GetEventSerialsByCreatedAtRange(since, until); chk.E(err) {
|
if evs, err = d.GetEventSerialsByCreatedAtRange(since, until, limit, postLimit); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
// next, kinds
|
// next, kinds
|
||||||
if bf&hasKinds == hasKinds && ^hasKinds&bf == 0 {
|
if bf&hasKinds == hasKinds && ^hasKinds&bf == 0 {
|
||||||
if evs, err = d.GetEventSerialsByKindsCreatedAtRange(f.Kinds, since, until); chk.E(err) {
|
log.I.F("kinds")
|
||||||
|
if evs, err = d.GetEventSerialsByKinds(f.Kinds, f.Limit); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
goto done
|
||||||
|
}
|
||||||
|
// next, kinds/created_at
|
||||||
|
if (bf&hasKinds+hasSince == hasKinds+hasSince ||
|
||||||
|
bf&hasKinds+hasUntil == hasKinds+hasUntil ||
|
||||||
|
bf&hasKinds+hasUntil+hasSince == hasKinds+hasUntil+hasSince) &&
|
||||||
|
^(hasKinds+hasUntil+hasSince)&bf == 0 {
|
||||||
|
if evs, err = d.GetEventSerialsByKindsCreatedAtRange(f.Kinds, since, until, limit); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
// next authors
|
// next authors
|
||||||
if bf&hasAuthors == hasAuthors && ^hasAuthors&bf == 0 {
|
if bf&hasAuthors == hasAuthors && ^hasAuthors&bf == 0 {
|
||||||
if evs, err = d.GetEventSerialsByAuthorsCreatedAtRange(f.Authors, since, until); chk.E(err) {
|
if evs, err = d.GetEventSerialsByAuthorsCreatedAtRange(f.Authors, since, until, limit); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
goto done
|
goto done
|
||||||
@@ -110,34 +137,35 @@ func (d *D) Filter(f filter.F, exclude []*pubhash.T) (evSerials varint.S, err er
|
|||||||
// next authors/kinds
|
// next authors/kinds
|
||||||
|
|
||||||
if ak := hasAuthors + hasKinds; bf&(ak) == ak && ^ak&bf == 0 {
|
if ak := hasAuthors + hasKinds; bf&(ak) == ak && ^ak&bf == 0 {
|
||||||
if evs, err = d.GetEventSerialsByKindsAuthorsCreatedAtRange(f.Kinds, f.Authors, since, until); chk.E(err) {
|
if evs, err = d.GetEventSerialsByKindsAuthorsCreatedAtRange(f.Kinds, f.Authors, since, until, limit); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
// if there is tags, assemble them into an array of tags with the
|
// if there is tags, assemble them into an array of tags with the
|
||||||
if bf&hasTags != 0 && bf&^hasTags == 0 {
|
if bf&hasTags != 0 && bf&^hasTags == 0 {
|
||||||
if evs, err = d.GetEventSerialsByTagsCreatedAtRange(f.Tags); chk.E(err) {
|
if evs, err = d.GetEventSerialsByTagsCreatedAtRange(f.Tags, limit); chk.E(err) {
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// next authors/tags
|
// next authors/tags
|
||||||
if at := hasAuthors + hasTags; bf&(at) == at && ^at&bf == 0 {
|
if at := hasAuthors + hasTags; bf&(at) == at && ^at&bf == 0 {
|
||||||
if evs, err = d.GetEventSerialsByAuthorsTagsCreatedAtRange(f.Tags, f.Authors, since, until); chk.E(err) {
|
if evs, err = d.GetEventSerialsByAuthorsTagsCreatedAtRange(f.Tags, f.Authors, since, until, limit); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
// next kinds/tags
|
// next kinds/tags
|
||||||
if kt := hasKinds + hasTags; bf&(kt) == kt && ^kt&bf == 0 {
|
if kt := hasKinds + hasTags; bf&(kt) == kt && ^kt&bf == 0 {
|
||||||
if evs, err = d.GetEventSerialsByKindsTagsCreatedAtRange(f.Tags, f.Kinds, since, until); chk.E(err) {
|
if evs, err = d.GetEventSerialsByKindsTagsCreatedAtRange(f.Tags, f.Kinds, since,
|
||||||
|
until, limit); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
// next kinds/authors/tags
|
// next kinds/authors/tags
|
||||||
if kat := hasAuthors + hasTags; bf&(kat) == kat && ^kat&bf == 0 {
|
if kat := hasAuthors + hasTags; bf&(kat) == kat && ^kat&bf == 0 {
|
||||||
if evs, err = d.GetEventSerialsByKindsAuthorsTagsCreatedAtRange(f.Tags, f.Kinds, f.Authors, since, until); chk.E(err) {
|
if evs, err = d.GetEventSerialsByKindsAuthorsTagsCreatedAtRange(f.Tags, f.Kinds, f.Authors, since, until, limit); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
goto done
|
goto done
|
||||||
|
|||||||
80
database/filter_test.go
Normal file
80
database/filter_test.go
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"x.realy.lol/apputil"
|
||||||
|
"x.realy.lol/chk"
|
||||||
|
"x.realy.lol/database/indexes/types/varint"
|
||||||
|
"x.realy.lol/event"
|
||||||
|
"x.realy.lol/filter"
|
||||||
|
"x.realy.lol/interrupt"
|
||||||
|
"x.realy.lol/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestD_Filter(t *testing.T) {
|
||||||
|
var err error
|
||||||
|
d := New()
|
||||||
|
tmpDir := "testrealy"
|
||||||
|
dbExists := !apputil.FileExists(tmpDir)
|
||||||
|
if err = d.Init(tmpDir); chk.E(err) {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
interrupt.AddHandler(func() {
|
||||||
|
d.Close()
|
||||||
|
})
|
||||||
|
if dbExists {
|
||||||
|
buf := bytes.NewBuffer(ExampleEvents)
|
||||||
|
scan := bufio.NewScanner(buf)
|
||||||
|
scan.Buffer(make([]byte, 5120000), 5120000)
|
||||||
|
var count, errs int
|
||||||
|
for scan.Scan() {
|
||||||
|
b := scan.Bytes()
|
||||||
|
ev := event.New()
|
||||||
|
if err = ev.Unmarshal(b); chk.E(err) {
|
||||||
|
t.Fatalf("%s:\n%s", err, b)
|
||||||
|
}
|
||||||
|
// verify the signature on the event
|
||||||
|
var ok bool
|
||||||
|
if ok, err = ev.Verify(); chk.E(err) {
|
||||||
|
errs++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
errs++
|
||||||
|
log.E.F("event signature is invalid\n%s", b)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
if count%1000 == 0 {
|
||||||
|
log.I.F("unmarshaled %d events", count)
|
||||||
|
}
|
||||||
|
if err = d.StoreEvent(ev); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.I.F("stored %d events", count)
|
||||||
|
}
|
||||||
|
// fetch some kind 0
|
||||||
|
var sers []*varint.V
|
||||||
|
if sers, err = d.Filter(filter.F{
|
||||||
|
Kinds: []int{0},
|
||||||
|
Limit: filter.IntToPointer(50),
|
||||||
|
}, nil); chk.E(err) {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// log.I.S(sers)
|
||||||
|
var fids [][]byte
|
||||||
|
for _, ser := range sers {
|
||||||
|
var evIds []byte
|
||||||
|
if evIds, err = d.GetEventIdFromSerial(ser); chk.E(err) {
|
||||||
|
// continue
|
||||||
|
log.I.S(ser)
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
fids = append(fids, evIds)
|
||||||
|
}
|
||||||
|
log.I.S(fids)
|
||||||
|
}
|
||||||
177
database/find.go
177
database/find.go
@@ -2,6 +2,7 @@ package database
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/dgraph-io/badger/v4"
|
"github.com/dgraph-io/badger/v4"
|
||||||
|
|
||||||
@@ -46,7 +47,7 @@ func (d *D) FindEventSerialById(evId []byte) (ser *varint.V, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if ser == nil {
|
if ser == nil {
|
||||||
err = errorf.E("event %0x not found", evId)
|
err = fmt.Errorf("event %0x not found", evId)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@@ -54,13 +55,13 @@ func (d *D) FindEventSerialById(evId []byte) (ser *varint.V, err error) {
|
|||||||
|
|
||||||
func (d *D) GetEventFromSerial(ser *varint.V) (ev *event.E, err error) {
|
func (d *D) GetEventFromSerial(ser *varint.V) (ev *event.E, err error) {
|
||||||
if err = d.View(func(txn *badger.Txn) (err error) {
|
if err = d.View(func(txn *badger.Txn) (err error) {
|
||||||
enc := indexes.EventDec(ser)
|
enc := indexes.EventEnc(ser)
|
||||||
kb := new(bytes.Buffer)
|
kb := new(bytes.Buffer)
|
||||||
if err = enc.MarshalWrite(kb); chk.E(err) {
|
if err = enc.MarshalWrite(kb); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var item *badger.Item
|
var item *badger.Item
|
||||||
if item, err = txn.Get(kb.Bytes()); chk.E(err) {
|
if item, err = txn.Get(kb.Bytes()); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var val []byte
|
var val []byte
|
||||||
@@ -73,13 +74,13 @@ func (d *D) GetEventFromSerial(ser *varint.V) (ev *event.E, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}); chk.E(err) {
|
}); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *D) GetEventFullIndexFromSerial(ser *varint.V) (id []byte, err error) {
|
func (d *D) GetEventIdFromSerial(ser *varint.V) (id []byte, err error) {
|
||||||
if err = d.View(func(txn *badger.Txn) (err error) {
|
if err = d.View(func(txn *badger.Txn) (err error) {
|
||||||
enc := indexes.New(prefix.New(prefixes.FullIndex), ser)
|
enc := indexes.New(prefix.New(prefixes.FullIndex), ser)
|
||||||
prf := new(bytes.Buffer)
|
prf := new(bytes.Buffer)
|
||||||
@@ -108,7 +109,7 @@ func (d *D) GetEventFullIndexFromSerial(ser *varint.V) (id []byte, err error) {
|
|||||||
|
|
||||||
func (d *D) GetEventById(evId []byte) (ev *event.E, err error) {
|
func (d *D) GetEventById(evId []byte) (ev *event.E, err error) {
|
||||||
var ser *varint.V
|
var ser *varint.V
|
||||||
if ser, err = d.FindEventSerialById(evId); chk.E(err) {
|
if ser, err = d.FindEventSerialById(evId); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ev, err = d.GetEventFromSerial(ser)
|
ev, err = d.GetEventFromSerial(ser)
|
||||||
@@ -117,7 +118,9 @@ func (d *D) GetEventById(evId []byte) (ev *event.E, err error) {
|
|||||||
|
|
||||||
// GetEventSerialsByCreatedAtRange returns the serials of events with the given since/until
|
// GetEventSerialsByCreatedAtRange returns the serials of events with the given since/until
|
||||||
// range in reverse chronological order (starting at until, going back to since).
|
// range in reverse chronological order (starting at until, going back to since).
|
||||||
func (d *D) GetEventSerialsByCreatedAtRange(since, until timestamp.Timestamp) (sers varint.S, err error) {
|
func (d *D) GetEventSerialsByCreatedAtRange(since, until *timestamp.Timestamp,
|
||||||
|
limit *int, postLimit bool) (sers varint.S, err error) {
|
||||||
|
log.I.F("GetEventSerialsByCreatedAtRange")
|
||||||
// get the start (end) max possible index prefix
|
// get the start (end) max possible index prefix
|
||||||
startCreatedAt, _ := indexes.CreatedAtVars()
|
startCreatedAt, _ := indexes.CreatedAtVars()
|
||||||
startCreatedAt.FromInt(until.ToInt())
|
startCreatedAt.FromInt(until.ToInt())
|
||||||
@@ -125,6 +128,7 @@ func (d *D) GetEventSerialsByCreatedAtRange(since, until timestamp.Timestamp) (s
|
|||||||
if err = indexes.CreatedAtEnc(startCreatedAt, nil).MarshalWrite(prf); chk.E(err) {
|
if err = indexes.CreatedAtEnc(startCreatedAt, nil).MarshalWrite(prf); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
var count int
|
||||||
if err = d.View(func(txn *badger.Txn) (err error) {
|
if err = d.View(func(txn *badger.Txn) (err error) {
|
||||||
it := txn.NewIterator(badger.IteratorOptions{Reverse: true, Prefix: prf.Bytes()})
|
it := txn.NewIterator(badger.IteratorOptions{Reverse: true, Prefix: prf.Bytes()})
|
||||||
defer it.Close()
|
defer it.Close()
|
||||||
@@ -138,19 +142,70 @@ func (d *D) GetEventSerialsByCreatedAtRange(since, until timestamp.Timestamp) (s
|
|||||||
// skip it then
|
// skip it then
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if ca.ToTimestamp() < since {
|
if ca.ToTimestamp() < *since {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
sers = append(sers, ser)
|
sers = append(sers, ser)
|
||||||
|
count++
|
||||||
|
if !postLimit && count > *limit {
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}); chk.E(err) {
|
}); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if postLimit && len(sers) > *limit {
|
||||||
|
sers = sers[:*limit]
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *D) GetEventSerialsByKindsCreatedAtRange(kinds []int, since, until timestamp.Timestamp) (sers varint.S, err error) {
|
func (d *D) GetEventSerialsByKinds(kinds []int, limit *int) (sers varint.S, err error) {
|
||||||
|
log.I.F("GetEventSerialsByKinds")
|
||||||
|
// get the start (end) max possible index prefix, one for each kind in the list
|
||||||
|
var searchIdxs [][]byte
|
||||||
|
kind, _ := indexes.KindVars()
|
||||||
|
for _, k := range kinds {
|
||||||
|
kind.Set(k)
|
||||||
|
prf := new(bytes.Buffer)
|
||||||
|
if err = indexes.KindEnc(kind, nil).MarshalWrite(prf); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
searchIdxs = append(searchIdxs, prf.Bytes())
|
||||||
|
}
|
||||||
|
// log.I.S(searchIdxs)
|
||||||
|
var count int
|
||||||
|
for _, idx := range searchIdxs {
|
||||||
|
if err = d.View(func(txn *badger.Txn) (err error) {
|
||||||
|
// it := txn.NewIterator(badger.IteratorOptions{Reverse: true, Prefix: idx})
|
||||||
|
it := txn.NewIterator(badger.DefaultIteratorOptions)
|
||||||
|
defer it.Close()
|
||||||
|
var key []byte
|
||||||
|
for it.Seek(idx); it.ValidForPrefix(idx); it.Next() {
|
||||||
|
item := it.Item()
|
||||||
|
key = item.KeyCopy(nil)
|
||||||
|
ki, ser := indexes.KindVars()
|
||||||
|
buf := bytes.NewBuffer(key)
|
||||||
|
if err = indexes.KindDec(ki, ser).UnmarshalRead(buf); chk.E(err) {
|
||||||
|
// skip it then
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
sers = append(sers, ser)
|
||||||
|
count++
|
||||||
|
if limit != nil && count >= *limit {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *D) GetEventSerialsByKindsCreatedAtRange(kinds []int, since, until *timestamp.Timestamp, limit *int) (sers varint.S, err error) {
|
||||||
// get the start (end) max possible index prefix, one for each kind in the list
|
// get the start (end) max possible index prefix, one for each kind in the list
|
||||||
var searchIdxs [][]byte
|
var searchIdxs [][]byte
|
||||||
kind, startCreatedAt, _ := indexes.KindCreatedAtVars()
|
kind, startCreatedAt, _ := indexes.KindCreatedAtVars()
|
||||||
@@ -163,11 +218,12 @@ func (d *D) GetEventSerialsByKindsCreatedAtRange(kinds []int, since, until times
|
|||||||
}
|
}
|
||||||
searchIdxs = append(searchIdxs, prf.Bytes())
|
searchIdxs = append(searchIdxs, prf.Bytes())
|
||||||
}
|
}
|
||||||
|
var count int
|
||||||
for _, idx := range searchIdxs {
|
for _, idx := range searchIdxs {
|
||||||
if err = d.View(func(txn *badger.Txn) (err error) {
|
if err = d.View(func(txn *badger.Txn) (err error) {
|
||||||
it := txn.NewIterator(badger.IteratorOptions{Reverse: true, Prefix: idx})
|
it := txn.NewIterator(badger.IteratorOptions{Reverse: true, Prefix: idx})
|
||||||
defer it.Close()
|
defer it.Close()
|
||||||
key := make([]byte, 10)
|
var key []byte
|
||||||
for it.Rewind(); it.Valid(); it.Next() {
|
for it.Rewind(); it.Valid(); it.Next() {
|
||||||
item := it.Item()
|
item := it.Item()
|
||||||
key = item.KeyCopy(key)
|
key = item.KeyCopy(key)
|
||||||
@@ -177,10 +233,14 @@ func (d *D) GetEventSerialsByKindsCreatedAtRange(kinds []int, since, until times
|
|||||||
// skip it then
|
// skip it then
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if ca.ToTimestamp() < since {
|
if ca.ToTimestamp() < *since {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
sers = append(sers, ser)
|
sers = append(sers, ser)
|
||||||
|
count++
|
||||||
|
if count > *limit {
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}); chk.E(err) {
|
}); chk.E(err) {
|
||||||
@@ -190,7 +250,58 @@ func (d *D) GetEventSerialsByKindsCreatedAtRange(kinds []int, since, until times
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *D) GetEventSerialsByAuthorsCreatedAtRange(pubkeys []string, since, until timestamp.Timestamp) (sers varint.S, err error) {
|
func (d *D) GetEventSerialsByAuthors(pubkeys []string, limit *int) (sers varint.S, err error) {
|
||||||
|
// get the start (end) max possible index prefix, one for each kind in the list
|
||||||
|
var searchIdxs [][]byte
|
||||||
|
var pkDecodeErrs int
|
||||||
|
pubkey, _ := indexes.PubkeyVars()
|
||||||
|
for _, p := range pubkeys {
|
||||||
|
if err = pubkey.FromPubkeyHex(p); chk.E(err) {
|
||||||
|
// gracefully ignore wrong keys
|
||||||
|
pkDecodeErrs++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if pkDecodeErrs == len(pubkeys) {
|
||||||
|
err = errorf.E("all pubkeys in authors field of filter failed to decode")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
prf := new(bytes.Buffer)
|
||||||
|
if err = indexes.PubkeyEnc(pubkey, nil).MarshalWrite(prf); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
searchIdxs = append(searchIdxs, prf.Bytes())
|
||||||
|
}
|
||||||
|
var count int
|
||||||
|
for _, idx := range searchIdxs {
|
||||||
|
if err = d.View(func(txn *badger.Txn) (err error) {
|
||||||
|
it := txn.NewIterator(badger.IteratorOptions{Reverse: true, Prefix: idx})
|
||||||
|
defer it.Close()
|
||||||
|
key := make([]byte, 10)
|
||||||
|
for it.Rewind(); it.Valid(); it.Next() {
|
||||||
|
item := it.Item()
|
||||||
|
key = item.KeyCopy(key)
|
||||||
|
kind, ca, ser := indexes.KindCreatedAtVars()
|
||||||
|
buf := bytes.NewBuffer(key)
|
||||||
|
if err = indexes.KindCreatedAtDec(kind, ca, ser).UnmarshalRead(buf); chk.E(err) {
|
||||||
|
// skip it then
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
sers = append(sers, ser)
|
||||||
|
count++
|
||||||
|
if count > *limit {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *D) GetEventSerialsByAuthorsCreatedAtRange(pubkeys []string,
|
||||||
|
since, until *timestamp.Timestamp, limit *int) (sers varint.S, err error) {
|
||||||
// get the start (end) max possible index prefix, one for each kind in the list
|
// get the start (end) max possible index prefix, one for each kind in the list
|
||||||
var searchIdxs [][]byte
|
var searchIdxs [][]byte
|
||||||
var pkDecodeErrs int
|
var pkDecodeErrs int
|
||||||
@@ -212,6 +323,7 @@ func (d *D) GetEventSerialsByAuthorsCreatedAtRange(pubkeys []string, since, unti
|
|||||||
}
|
}
|
||||||
searchIdxs = append(searchIdxs, prf.Bytes())
|
searchIdxs = append(searchIdxs, prf.Bytes())
|
||||||
}
|
}
|
||||||
|
var count int
|
||||||
for _, idx := range searchIdxs {
|
for _, idx := range searchIdxs {
|
||||||
if err = d.View(func(txn *badger.Txn) (err error) {
|
if err = d.View(func(txn *badger.Txn) (err error) {
|
||||||
it := txn.NewIterator(badger.IteratorOptions{Reverse: true, Prefix: idx})
|
it := txn.NewIterator(badger.IteratorOptions{Reverse: true, Prefix: idx})
|
||||||
@@ -226,10 +338,14 @@ func (d *D) GetEventSerialsByAuthorsCreatedAtRange(pubkeys []string, since, unti
|
|||||||
// skip it then
|
// skip it then
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if ca.ToTimestamp() < since {
|
if ca.ToTimestamp() < *since {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
sers = append(sers, ser)
|
sers = append(sers, ser)
|
||||||
|
count++
|
||||||
|
if count > *limit {
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}); chk.E(err) {
|
}); chk.E(err) {
|
||||||
@@ -239,7 +355,8 @@ func (d *D) GetEventSerialsByAuthorsCreatedAtRange(pubkeys []string, since, unti
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *D) GetEventSerialsByKindsAuthorsCreatedAtRange(kinds []int, pubkeys []string, since, until timestamp.Timestamp) (sers varint.S, err error) {
|
func (d *D) GetEventSerialsByKindsAuthorsCreatedAtRange(kinds []int, pubkeys []string,
|
||||||
|
since, until *timestamp.Timestamp, limit *int) (sers varint.S, err error) {
|
||||||
// get the start (end) max possible index prefix, one for each kind in the list
|
// get the start (end) max possible index prefix, one for each kind in the list
|
||||||
var searchIdxs [][]byte
|
var searchIdxs [][]byte
|
||||||
var pkDecodeErrs int
|
var pkDecodeErrs int
|
||||||
@@ -264,6 +381,7 @@ func (d *D) GetEventSerialsByKindsAuthorsCreatedAtRange(kinds []int, pubkeys []s
|
|||||||
searchIdxs = append(searchIdxs, prf.Bytes())
|
searchIdxs = append(searchIdxs, prf.Bytes())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
var count int
|
||||||
for _, idx := range searchIdxs {
|
for _, idx := range searchIdxs {
|
||||||
if err = d.View(func(txn *badger.Txn) (err error) {
|
if err = d.View(func(txn *badger.Txn) (err error) {
|
||||||
it := txn.NewIterator(badger.IteratorOptions{Reverse: true, Prefix: idx})
|
it := txn.NewIterator(badger.IteratorOptions{Reverse: true, Prefix: idx})
|
||||||
@@ -278,10 +396,14 @@ func (d *D) GetEventSerialsByKindsAuthorsCreatedAtRange(kinds []int, pubkeys []s
|
|||||||
// skip it then
|
// skip it then
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if ca.ToTimestamp() < since {
|
if ca.ToTimestamp() < *since {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
sers = append(sers, ser)
|
sers = append(sers, ser)
|
||||||
|
count++
|
||||||
|
if count > *limit {
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}); chk.E(err) {
|
}); chk.E(err) {
|
||||||
@@ -293,7 +415,7 @@ func (d *D) GetEventSerialsByKindsAuthorsCreatedAtRange(kinds []int, pubkeys []s
|
|||||||
|
|
||||||
// GetEventSerialsByTagsCreatedAtRange searches for events that match the tags in a filter and
|
// GetEventSerialsByTagsCreatedAtRange searches for events that match the tags in a filter and
|
||||||
// returns the list of serials that were found.
|
// returns the list of serials that were found.
|
||||||
func (d *D) GetEventSerialsByTagsCreatedAtRange(t filter.TagMap) (sers varint.S, err error) {
|
func (d *D) GetEventSerialsByTagsCreatedAtRange(t filter.TagMap, limit *int) (sers varint.S, err error) {
|
||||||
if len(t) < 1 {
|
if len(t) < 1 {
|
||||||
err = errorf.E("no tags provided")
|
err = errorf.E("no tags provided")
|
||||||
return
|
return
|
||||||
@@ -424,20 +546,21 @@ func (d *D) GetEventSerialsByTagsCreatedAtRange(t filter.TagMap) (sers varint.S,
|
|||||||
searchIdxs = append(searchIdxs, buf.Bytes())
|
searchIdxs = append(searchIdxs, buf.Bytes())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// todo: implement
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetEventSerialsByAuthorsTagsCreatedAtRange first performs
|
// GetEventSerialsByAuthorsTagsCreatedAtRange first performs
|
||||||
func (d *D) GetEventSerialsByAuthorsTagsCreatedAtRange(t filter.TagMap, pubkeys []string, since, until timestamp.Timestamp) (sers varint.S, err error) {
|
func (d *D) GetEventSerialsByAuthorsTagsCreatedAtRange(t filter.TagMap, pubkeys []string, since, until *timestamp.Timestamp, limit *int) (sers varint.S, err error) {
|
||||||
var acSers, tagSers varint.S
|
var acSers, tagSers varint.S
|
||||||
if acSers, err = d.GetEventSerialsByAuthorsCreatedAtRange(pubkeys, since, until); chk.E(err) {
|
if acSers, err = d.GetEventSerialsByAuthorsCreatedAtRange(pubkeys, since, until, limit); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// now we have the most limited set of serials that are included by the pubkeys, we can then
|
// now we have the most limited set of serials that are included by the pubkeys, we can then
|
||||||
// construct the tags searches for all of these serials to filter out the events that don't
|
// construct the tags searches for all of these serials to filter out the events that don't
|
||||||
// have both author AND one of the tags.
|
// have both author AND one of the tags.
|
||||||
if tagSers, err = d.GetEventSerialsByTagsCreatedAtRange(t); chk.E(err) {
|
if tagSers, err = d.GetEventSerialsByTagsCreatedAtRange(t, limit); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// remove the serials that are not present in both lists.
|
// remove the serials that are not present in both lists.
|
||||||
@@ -446,15 +569,15 @@ func (d *D) GetEventSerialsByAuthorsTagsCreatedAtRange(t filter.TagMap, pubkeys
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetEventSerialsByKindsTagsCreatedAtRange first performs
|
// GetEventSerialsByKindsTagsCreatedAtRange first performs
|
||||||
func (d *D) GetEventSerialsByKindsTagsCreatedAtRange(t filter.TagMap, kinds []int, since, until timestamp.Timestamp) (sers varint.S, err error) {
|
func (d *D) GetEventSerialsByKindsTagsCreatedAtRange(t filter.TagMap, kinds []int, since, until *timestamp.Timestamp, limit *int) (sers varint.S, err error) {
|
||||||
var acSers, tagSers varint.S
|
var acSers, tagSers varint.S
|
||||||
if acSers, err = d.GetEventSerialsByKindsCreatedAtRange(kinds, since, until); chk.E(err) {
|
if acSers, err = d.GetEventSerialsByKindsCreatedAtRange(kinds, since, until, limit); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// now we have the most limited set of serials that are included by the pubkeys, we can then
|
// now we have the most limited set of serials that are included by the pubkeys, we can then
|
||||||
// construct the tags searches for all of these serials to filter out the events that don't
|
// construct the tags searches for all of these serials to filter out the events that don't
|
||||||
// have both author AND one of the tags.
|
// have both author AND one of the tags.
|
||||||
if tagSers, err = d.GetEventSerialsByTagsCreatedAtRange(t); chk.E(err) {
|
if tagSers, err = d.GetEventSerialsByTagsCreatedAtRange(t, limit); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// remove the serials that are not present in both lists.
|
// remove the serials that are not present in both lists.
|
||||||
@@ -463,15 +586,18 @@ func (d *D) GetEventSerialsByKindsTagsCreatedAtRange(t filter.TagMap, kinds []in
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetEventSerialsByKindsAuthorsTagsCreatedAtRange first performs
|
// GetEventSerialsByKindsAuthorsTagsCreatedAtRange first performs
|
||||||
func (d *D) GetEventSerialsByKindsAuthorsTagsCreatedAtRange(t filter.TagMap, kinds []int, pubkeys []string, since, until timestamp.Timestamp) (sers varint.S, err error) {
|
func (d *D) GetEventSerialsByKindsAuthorsTagsCreatedAtRange(t filter.TagMap, kinds []int,
|
||||||
|
pubkeys []string, since, until *timestamp.Timestamp,
|
||||||
|
limit *int) (sers varint.S, err error) {
|
||||||
var acSers, tagSers varint.S
|
var acSers, tagSers varint.S
|
||||||
if acSers, err = d.GetEventSerialsByKindsAuthorsCreatedAtRange(kinds, pubkeys, since, until); chk.E(err) {
|
if acSers, err = d.GetEventSerialsByKindsAuthorsCreatedAtRange(kinds, pubkeys,
|
||||||
|
since, until, limit); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// now we have the most limited set of serials that are included by the pubkeys, we can then
|
// now we have the most limited set of serials that are included by the pubkeys, we can then
|
||||||
// construct the tags searches for all of these serials to filter out the events that don't
|
// construct the tags searches for all of these serials to filter out the events that don't
|
||||||
// have both author AND one of the tags.
|
// have both author AND one of the tags.
|
||||||
if tagSers, err = d.GetEventSerialsByTagsCreatedAtRange(t); chk.E(err) {
|
if tagSers, err = d.GetEventSerialsByTagsCreatedAtRange(t, limit); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// remove the serials that are not present in both lists.
|
// remove the serials that are not present in both lists.
|
||||||
@@ -480,6 +606,7 @@ func (d *D) GetEventSerialsByKindsAuthorsTagsCreatedAtRange(t filter.TagMap, kin
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *D) GetFullIndexesFromSerials(sers varint.S) (index []indexes.FullIndex, err error) {
|
func (d *D) GetFullIndexesFromSerials(sers varint.S) (index []indexes.FullIndex, err error) {
|
||||||
|
log.I.F("GetFullIndexesFromSerials")
|
||||||
for _, ser := range sers {
|
for _, ser := range sers {
|
||||||
if err = d.View(func(txn *badger.Txn) (err error) {
|
if err = d.View(func(txn *badger.Txn) (err error) {
|
||||||
buf := new(bytes.Buffer)
|
buf := new(bytes.Buffer)
|
||||||
|
|||||||
@@ -120,12 +120,16 @@ func TestGetEventIndexes(t *testing.T) {
|
|||||||
if indices, _, err = d.GetEventIndexes(ev); chk.E(err) {
|
if indices, _, err = d.GetEventIndexes(ev); chk.E(err) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
log.I.S(indices)
|
||||||
datasize += len(b)
|
datasize += len(b)
|
||||||
for _, v := range indices {
|
for _, v := range indices {
|
||||||
size += len(v)
|
size += len(v)
|
||||||
}
|
}
|
||||||
_ = indices
|
_ = indices
|
||||||
count++
|
count++
|
||||||
|
if count > 1 {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
log.I.F("unmarshaled, verified and indexed %d events in %s, %d Mb of indexes from %d Mb of events, %d Mb as binary, failed verify %d, failed encode %d", count, time.Now().Sub(start), size/units.Mb, datasize/units.Mb, binsize/units.Mb, errs, encErrs)
|
log.I.F("unmarshaled, verified and indexed %d events in %s, %d Mb of indexes from %d Mb of events, %d Mb as binary, failed verify %d, failed encode %d", count, time.Now().Sub(start), size/units.Mb, datasize/units.Mb, binsize/units.Mb, errs, encErrs)
|
||||||
d.Close()
|
d.Close()
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package indexes
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
"x.realy.lol/chk"
|
"x.realy.lol/chk"
|
||||||
"x.realy.lol/codec"
|
"x.realy.lol/codec"
|
||||||
@@ -32,10 +33,10 @@ func New(encoders ...codec.I) (i *T) { return &T{encoders} }
|
|||||||
|
|
||||||
func (t *T) MarshalWrite(w io.Writer) (err error) {
|
func (t *T) MarshalWrite(w io.Writer) (err error) {
|
||||||
for _, e := range t.Encs {
|
for _, e := range t.Encs {
|
||||||
if e == nil {
|
if e == nil || reflect.ValueOf(e).IsNil() {
|
||||||
// allow a field to be empty, as is needed for search indexes to create search
|
// allow a field to be empty, as is needed for search indexes to create search
|
||||||
// prefixes.
|
// prefixes.
|
||||||
continue
|
return
|
||||||
}
|
}
|
||||||
if err = e.MarshalWrite(w); chk.E(err) {
|
if err = e.MarshalWrite(w); chk.E(err) {
|
||||||
return
|
return
|
||||||
@@ -173,7 +174,7 @@ func KindCreatedAtVars() (ki *kindidx.T, ca *timestamp.T, ser *varint.V) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
func KindCreatedAtEnc(ki *kindidx.T, ca *timestamp.T, ser *varint.V) (enc *T) {
|
func KindCreatedAtEnc(ki *kindidx.T, ca *timestamp.T, ser *varint.V) (enc *T) {
|
||||||
return New(prefix.New(prefixes.Kind), ki, ca, ser)
|
return New(prefix.New(prefixes.KindCreatedAt), ki, ca, ser)
|
||||||
}
|
}
|
||||||
func KindCreatedAtDec(ki *kindidx.T, ca *timestamp.T, ser *varint.V) (enc *T) {
|
func KindCreatedAtDec(ki *kindidx.T, ca *timestamp.T, ser *varint.V) (enc *T) {
|
||||||
return New(prefix.New(), ki, ca, ser)
|
return New(prefix.New(), ki, ca, ser)
|
||||||
@@ -185,7 +186,7 @@ func KindPubkeyCreatedAtVars() (ki *kindidx.T, p *pubhash.T, ca *timestamp.T, se
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
func KindPubkeyCreatedAtEnc(ki *kindidx.T, p *pubhash.T, ca *timestamp.T, ser *varint.V) (enc *T) {
|
func KindPubkeyCreatedAtEnc(ki *kindidx.T, p *pubhash.T, ca *timestamp.T, ser *varint.V) (enc *T) {
|
||||||
return New(prefix.New(prefixes.Kind), ki, p, ca, ser)
|
return New(prefix.New(prefixes.KindPubkeyCreatedAt), ki, p, ca, ser)
|
||||||
}
|
}
|
||||||
func KindPubkeyCreatedAtDec(ki *kindidx.T, p *pubhash.T, ca *timestamp.T, ser *varint.V) (enc *T) {
|
func KindPubkeyCreatedAtDec(ki *kindidx.T, p *pubhash.T, ca *timestamp.T, ser *varint.V) (enc *T) {
|
||||||
return New(prefix.New(), ki, p, ca, ser)
|
return New(prefix.New(), ki, p, ca, ser)
|
||||||
|
|||||||
Reference in New Issue
Block a user