From 9d87d1fd8ad6ed383a0cd920856721d8ef30f72f Mon Sep 17 00:00:00 2001
From: fiatjaf
Date: Thu, 8 Feb 2024 12:36:34 -0300
Subject: [PATCH] boltdb support.

---
 bolt/count.go     |  66 ++++++++++++
 bolt/delete.go    |  31 ++++++
 bolt/helpers.go   | 116 ++++++++++++++++++++
 bolt/lib.go       |  79 ++++++++++++++
 bolt/migration.go |  33 ++++++
 bolt/query.go     | 268 ++++++++++++++++++++++++++++++++++++++++++++++
 bolt/save.go      |  54 ++++++++++
 go.mod            |   1 +
 go.sum            |   2 +
 9 files changed, 650 insertions(+)
 create mode 100644 bolt/count.go
 create mode 100644 bolt/delete.go
 create mode 100644 bolt/helpers.go
 create mode 100644 bolt/lib.go
 create mode 100644 bolt/migration.go
 create mode 100644 bolt/query.go
 create mode 100644 bolt/save.go

diff --git a/bolt/count.go b/bolt/count.go
new file mode 100644
index 0000000..49e3c1f
--- /dev/null
+++ b/bolt/count.go
@@ -0,0 +1,66 @@
+package bolt
+
+import (
+	"bytes"
+	"context"
+	"encoding/binary"
+	"log"
+
+	"github.com/boltdb/bolt"
+	"github.com/nbd-wtf/go-nostr"
+	nostr_binary "github.com/nbd-wtf/go-nostr/binary"
+)
+
+func (b *BoltBackend) CountEvents(ctx context.Context, filter nostr.Filter) (int64, error) {
+	var count int64 = 0
+
+	queries, extraFilter, since, err := prepareQueries(filter)
+	if err != nil {
+		return 0, err
+	}
+
+	err = b.db.View(func(txn *bolt.Tx) error {
+		// actually iterate
+		for _, q := range queries {
+			bucket := txn.Bucket(q.bucket)
+			raw := txn.Bucket(bucketRaw)
+
+			c := bucket.Cursor()
+
+			// seek to the starting point, then walk backwards (same approach as QueryEvents)
+			k, v := c.Seek(q.startingPoint)
+			if k == nil {
+				k, v = c.Last()
+			} else {
+				k, v = c.Prev()
+			}
+
+			for ; k != nil && bytes.HasPrefix(k, q.prefix); k, v = c.Prev() {
+				// "id" indexes don't contain a timestamp
+				if !q.skipTimestamp {
+					createdAt := binary.BigEndian.Uint32(k[len(k)-4:])
+					if createdAt < since {
+						break
+					}
+				}
+
+				// fetch actual event
+				val := raw.Get(v)
+				evt := &nostr.Event{}
+				if err := nostr_binary.Unmarshal(val, evt); err != nil {
+					log.Printf("bolt: value read error (id %x): %s\n", val[0:32], err)
+					break
+				}
+
+				// check if this matches the other filters that were not part of the index before counting
+				if extraFilter == nil || extraFilter.Matches(evt) {
+					count++
+				}
+			}
+		}
+
+		return nil
+	})
+
+	return count, err
+}
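A note on the cursor logic above: bolt's Seek positions the cursor at the first key greater than or equal to the target (or returns nil if no such key exists), so a reverse scan has to step backwards before iterating. A minimal sketch of the pattern, with `bucket`, `start` and `prefix` standing in for the query fields:

    c := bucket.Cursor()
    k, v := c.Seek(start)
    if k == nil {
        k, v = c.Last() // start is past the last key: begin at the very end
    } else {
        k, v = c.Prev() // Seek landed at the first key >= start, so step back
    }
    for ; k != nil && bytes.HasPrefix(k, prefix); k, v = c.Prev() {
        _ = v // each value is a pointer into the raw-events bucket
    }
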
diff --git a/bolt/delete.go b/bolt/delete.go
new file mode 100644
index 0000000..b0b0691
--- /dev/null
+++ b/bolt/delete.go
@@ -0,0 +1,31 @@
+package bolt
+
+import (
+	"context"
+	"encoding/hex"
+
+	"github.com/boltdb/bolt"
+	"github.com/nbd-wtf/go-nostr"
+)
+
+func (b *BoltBackend) DeleteEvent(ctx context.Context, evt *nostr.Event) error {
+	return b.db.Update(func(txn *bolt.Tx) error {
+		idPrefix8, _ := hex.DecodeString(evt.ID[0 : 8*2])
+
+		// if we don't have this event there is nothing to delete
+		bucket := txn.Bucket(bucketId)
+		seqb := bucket.Get(idPrefix8)
+		if seqb == nil {
+			return nil
+		}
+
+		// calculate all index keys we have for this event and delete them
+		for _, k := range getIndexKeysForEvent(evt) {
+			bucket := txn.Bucket(k.bucket)
+			bucket.Delete(k.key)
+		}
+
+		// delete the raw event
+		return txn.Bucket(bucketRaw).Delete(seqb)
+	})
+}
diff --git a/bolt/helpers.go b/bolt/helpers.go
new file mode 100644
index 0000000..afc297e
--- /dev/null
+++ b/bolt/helpers.go
@@ -0,0 +1,116 @@
+package bolt
+
+import (
+	"encoding/binary"
+	"encoding/hex"
+
+	"github.com/fiatjaf/eventstore"
+	"github.com/nbd-wtf/go-nostr"
+	"golang.org/x/exp/slices"
+)
+
+// getTagIndexPrefix returns
+// - the bucket where this tag value will be indexed
+// - the key with full length, with room for created_at at the end, but not yet filled in
+// - the offset -- i.e. where the prefix ends and the created_at would start
+func getTagIndexPrefix(tagValue string) (bucket []byte, key []byte, offset int) {
+	if kind, pkb, d := eventstore.GetAddrTagElements(tagValue); len(pkb) == 32 {
+		// store value in the new special "a" tag index
+		key = make([]byte, 2+8+len(d)+4)
+		binary.BigEndian.PutUint16(key[0:], kind)
+		copy(key[2:], pkb[0:8])
+		copy(key[2+8:], d)
+		offset = 2 + 8 + len(d)
+		bucket = bucketTagAddr
+	} else if vb, _ := hex.DecodeString(tagValue); len(vb) == 32 {
+		// store value as bytes
+		key = make([]byte, 8+4)
+		copy(key, vb[0:8])
+		offset = 8
+		bucket = bucketTag32
+	} else {
+		// store whatever as utf-8
+		key = make([]byte, len(tagValue)+4)
+		copy(key, tagValue)
+		offset = len(tagValue)
+		bucket = bucketTag
+	}
+
+	return bucket, key, offset
+}
+
+type keymeta struct {
+	bucket []byte
+	key    []byte
+}
+
+// getIndexKeysForEvent computes every index key for this event. keys are
+// unique per bucket: events sharing all indexed attributes and date collide.
+func getIndexKeysForEvent(evt *nostr.Event) []keymeta {
+	keys := make([]keymeta, 0, 24)
+
+	// indexes
+	{
+		// ~ by id
+		idPrefix8, _ := hex.DecodeString(evt.ID[0 : 8*2])
+		k := idPrefix8
+		keys = append(keys, keymeta{bucket: bucketId, key: k})
+	}
+
+	{
+		// ~ by pubkey+date
+		pubkeyPrefix8, _ := hex.DecodeString(evt.PubKey[0 : 8*2])
+		k := make([]byte, 8+4)
+		copy(k[:], pubkeyPrefix8)
+		binary.BigEndian.PutUint32(k[8:], uint32(evt.CreatedAt))
+		keys = append(keys, keymeta{bucket: bucketPubkey, key: k})
+	}
+
+	{
+		// ~ by kind+date
+		k := make([]byte, 2+4)
+		binary.BigEndian.PutUint16(k[:], uint16(evt.Kind))
+		binary.BigEndian.PutUint32(k[2:], uint32(evt.CreatedAt))
+		keys = append(keys, keymeta{bucket: bucketKind, key: k})
+	}
+
+	{
+		// ~ by pubkey+kind+date
+		pubkeyPrefix8, _ := hex.DecodeString(evt.PubKey[0 : 8*2])
+		k := make([]byte, 8+2+4)
+		copy(k[:], pubkeyPrefix8)
+		binary.BigEndian.PutUint16(k[8:], uint16(evt.Kind))
+		binary.BigEndian.PutUint32(k[8+2:], uint32(evt.CreatedAt))
+		keys = append(keys, keymeta{bucket: bucketPubkeyKind, key: k})
+	}
+
+	// ~ by tagvalue+date
+	for i, tag := range evt.Tags {
+		if len(tag) < 2 || len(tag[0]) != 1 || len(tag[1]) == 0 || len(tag[1]) > 100 {
+			// not indexable
+			continue
+		}
+		firstIndex := slices.IndexFunc(evt.Tags, func(t nostr.Tag) bool { return len(t) >= 2 && t[1] == tag[1] })
+		if firstIndex != i {
+			// duplicate
+			continue
+		}
+
+		// get key prefix (with full length) and offset where to write the created_at
+		bucket, k, offset := getTagIndexPrefix(tag[1])
+
+		// write the created_at
+		binary.BigEndian.PutUint32(k[offset:], uint32(evt.CreatedAt))
+
+		keys = append(keys, keymeta{bucket: bucket, key: k})
+	}
+
+	{
+		// ~ by date only
+		k := make([]byte, 4)
+		binary.BigEndian.PutUint32(k[:], uint32(evt.CreatedAt))
+		keys = append(keys, keymeta{bucket: bucketCreatedAt, key: k})
+	}
+
+	return keys
+}
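To make the three layouts produced by getTagIndexPrefix concrete, here is a sketch of the resulting keys (byte widths in brackets; the trailing 4 bytes are filled with the big-endian created_at by the caller):

    // bucketTagAddr: [kind:2][pubkey-prefix:8][d-tag:n][created_at:4]   <- "a" tag values
    // bucketTag32:   [value-prefix:8][created_at:4]                     <- 64-char hex values
    // bucketTag:     [utf-8 value:n][created_at:4]                      <- everything else
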
diff --git a/bolt/lib.go b/bolt/lib.go
new file mode 100644
index 0000000..7e68222
--- /dev/null
+++ b/bolt/lib.go
@@ -0,0 +1,79 @@
+package bolt
+
+import (
+	"sync/atomic"
+
+	"github.com/boltdb/bolt"
+	"github.com/fiatjaf/eventstore"
+)
+
+const (
+	maxuint16 = 65535
+	maxuint32 = 4294967295
+)
+
+var (
+	bucketSettings   = []byte{99}
+	bucketRaw        = []byte{1}
+	bucketCreatedAt  = []byte{2}
+	bucketId         = []byte{3}
+	bucketKind       = []byte{4}
+	bucketPubkey     = []byte{5}
+	bucketPubkeyKind = []byte{6}
+	bucketTag        = []byte{7}
+	bucketTag32      = []byte{8}
+	bucketTagAddr    = []byte{9}
+)
+
+var _ eventstore.Store = (*BoltBackend)(nil)
+
+type BoltBackend struct {
+	Path     string
+	MaxLimit int
+
+	db *bolt.DB
+
+	lastId atomic.Uint32
+}
+
+func (b *BoltBackend) Init() error {
+	if b.MaxLimit == 0 {
+		b.MaxLimit = 500
+	}
+
+	// open boltdb
+	db, err := bolt.Open(b.Path, 0644, nil)
+	if err != nil {
+		return err
+	}
+	b.db = db
+
+	// create all buckets, if they don't exist yet
+	if err := b.db.Update(func(txn *bolt.Tx) error {
+		for _, bucket := range [][]byte{
+			bucketSettings,
+			bucketRaw,
+			bucketCreatedAt,
+			bucketId,
+			bucketKind,
+			bucketPubkey,
+			bucketPubkeyKind,
+			bucketTag,
+			bucketTag32,
+			bucketTagAddr,
+		} {
+			if _, err := txn.CreateBucketIfNotExists(bucket); err != nil {
+				return err
+			}
+		}
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	return b.runMigrations()
+}
+
+func (b *BoltBackend) Close() {
+	b.db.Close()
+}
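Wiring the backend up is just a matter of setting Path and calling Init; a minimal sketch from inside the package (the file path is made up):

    db := &BoltBackend{Path: "/tmp/nostr-events.db"}
    if err := db.Init(); err != nil {
        panic(err)
    }
    defer db.Close()
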
diff --git a/bolt/migration.go b/bolt/migration.go
new file mode 100644
index 0000000..c5c6dae
--- /dev/null
+++ b/bolt/migration.go
@@ -0,0 +1,33 @@
+package bolt
+
+import (
+	"encoding/binary"
+
+	"github.com/boltdb/bolt"
+)
+
+const (
+	DB_VERSION byte = 'v'
+)
+
+func (b *BoltBackend) runMigrations() error {
+	return b.db.Update(func(txn *bolt.Tx) error {
+		var version uint16
+		v := txn.Bucket(bucketSettings).Get([]byte{DB_VERSION})
+		if v == nil {
+			version = 0
+		} else {
+			version = binary.BigEndian.Uint16(v)
+		}
+
+		// do the migrations in increasing steps (there is no rollback)
+		//
+
+		// (placeholder: version is unsigned, so this branch never runs; adjust it when adding the first migration)
+		if version < 0 {
+			// ...
+		}
+
+		return nil
+	})
+}
diff --git a/bolt/query.go b/bolt/query.go
new file mode 100644
index 0000000..36c5367
--- /dev/null
+++ b/bolt/query.go
@@ -0,0 +1,268 @@
+package bolt
+
+import (
+	"bytes"
+	"container/heap"
+	"context"
+	"encoding/binary"
+	"encoding/hex"
+	"fmt"
+	"log"
+
+	"github.com/boltdb/bolt"
+	"github.com/nbd-wtf/go-nostr"
+	nostr_binary "github.com/nbd-wtf/go-nostr/binary"
+)
+
+type query struct {
+	i             int
+	bucket        []byte
+	prefix        []byte
+	startingPoint []byte
+	results       chan *nostr.Event
+	skipTimestamp bool
+}
+
+type queryEvent struct {
+	*nostr.Event
+	query int
+}
+
+func (b *BoltBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
+	queries, extraFilter, since, err := prepareQueries(filter)
+	if err != nil {
+		return nil, err
+	}
+
+	ch := make(chan *nostr.Event)
+	go func() {
+		defer close(ch)
+
+		for _, q := range queries {
+			q := q
+			go b.db.View(func(txn *bolt.Tx) error {
+				defer close(q.results)
+
+				bucket := txn.Bucket(q.bucket)
+				raw := txn.Bucket(bucketRaw)
+
+				c := bucket.Cursor()
+
+				k, v := c.Seek(q.startingPoint)
+				if k == nil {
+					k, v = c.Last()
+				} else {
+					k, v = c.Prev()
+				}
+
+				for ; k != nil && bytes.HasPrefix(k, q.prefix); k, v = c.Prev() {
+					// "id" indexes don't contain a timestamp
+					if !q.skipTimestamp {
+						createdAt := binary.BigEndian.Uint32(k[len(k)-4:])
+						if createdAt < since {
+							break
+						}
+					}
+
+					// fetch actual event
+					val := raw.Get(v)
+					evt := &nostr.Event{}
+					if err := nostr_binary.Unmarshal(val, evt); err != nil {
+						log.Printf("bolt: value read error (id %x): %s\n", val[0:32], err)
+						break
+					}
+
+					// check if this matches the other filters that were not part of the index before yielding
+					if extraFilter == nil || extraFilter.Matches(evt) {
+						select {
+						case q.results <- evt:
+						case <-ctx.Done():
+							// a bare break would only exit the select, so end the whole scan instead
+							return nil
+						}
+					}
+				}
+				return nil
+			})
+		}
+
+		// max number of events we'll return
+		limit := b.MaxLimit
+		if filter.Limit > 0 && filter.Limit < limit {
+			limit = filter.Limit
+		}
+
+		// receive results and ensure we only return the most recent ones always
+		emittedEvents := 0
+
+		// first pass
+		emitQueue := make(priorityQueue, 0, len(queries)+limit)
+		for _, q := range queries {
+			evt, ok := <-q.results
+			if ok {
+				emitQueue = append(emitQueue, &queryEvent{Event: evt, query: q.i})
+			}
+		}
+
+		// queue may be empty here if we have literally nothing
+		if len(emitQueue) == 0 {
+			return
+		}
+
+		heap.Init(&emitQueue)
+
+		// iterate until we've emitted all events required
+		for {
+			// emit latest event in queue
+			latest := emitQueue[0]
+			select {
+			case ch <- latest.Event:
+			case <-ctx.Done():
+				return
+			}
+
+			// stop when reaching limit
+			emittedEvents++
+			if emittedEvents >= limit {
+				break
+			}
+
+			// fetch a new one from query results and replace the previous one with it
+			if evt, ok := <-queries[latest.query].results; ok {
+				emitQueue[0].Event = evt
+				heap.Fix(&emitQueue, 0)
+			} else {
+				// if this query has no more events we just remove this and proceed normally
+				heap.Remove(&emitQueue, 0)
+
+				// check if the list is empty and end
+				if len(emitQueue) == 0 {
+					break
+				}
+			}
+		}
+	}()
+
+	return ch, nil
+}
+
+type priorityQueue []*queryEvent
+
+func (pq priorityQueue) Len() int { return len(pq) }
+
+func (pq priorityQueue) Less(i, j int) bool {
+	return pq[i].CreatedAt > pq[j].CreatedAt
+}
+
+func (pq priorityQueue) Swap(i, j int) {
+	pq[i], pq[j] = pq[j], pq[i]
+}
+
+func (pq *priorityQueue) Push(x any) {
+	item := x.(*queryEvent)
+	*pq = append(*pq, item)
+}
+
+func (pq *priorityQueue) Pop() any {
+	old := *pq
+	n := len(old)
+	item := old[n-1]
+	old[n-1] = nil // avoid memory leak
+	*pq = old[0 : n-1]
+	return item
+}
+
+func prepareQueries(filter nostr.Filter) (queries []query, extraFilter *nostr.Filter, since uint32, err error) {
+	if len(filter.IDs) > 0 {
+		queries = make([]query, len(filter.IDs))
+		for i, idHex := range filter.IDs {
+			if len(idHex) != 64 {
+				return nil, nil, 0, fmt.Errorf("invalid id '%s'", idHex)
+			}
+			prefix, _ := hex.DecodeString(idHex[0 : 8*2])
+			queries[i] = query{i: i, bucket: bucketId, prefix: prefix, skipTimestamp: true}
+		}
+	} else if len(filter.Authors) > 0 {
+		if len(filter.Kinds) == 0 {
+			queries = make([]query, len(filter.Authors))
+			for i, pubkeyHex := range filter.Authors {
+				if len(pubkeyHex) != 64 {
+					return nil, nil, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex)
+				}
+				prefix, _ := hex.DecodeString(pubkeyHex[0 : 8*2])
+				queries[i] = query{i: i, bucket: bucketPubkey, prefix: prefix}
+			}
+		} else {
+			queries = make([]query, len(filter.Authors)*len(filter.Kinds))
+			i := 0
+			for _, pubkeyHex := range filter.Authors {
+				for _, kind := range filter.Kinds {
+					if len(pubkeyHex) != 64 {
+						return nil, nil, 0, fmt.Errorf("invalid pubkey '%s'", pubkeyHex)
+					}
+					pubkey, _ := hex.DecodeString(pubkeyHex[0 : 8*2])
+					prefix := binary.BigEndian.AppendUint16(pubkey, uint16(kind))
+					queries[i] = query{i: i, bucket: bucketPubkeyKind, prefix: prefix}
+					i++
+				}
+			}
+		}
+		extraFilter = &nostr.Filter{Tags: filter.Tags}
+	} else if len(filter.Tags) > 0 {
+		// determine the size of the queries array by inspecting all tags sizes
+		size := 0
+		for _, values := range filter.Tags {
+			size += len(values)
+		}
+		if size == 0 {
+			return nil, nil, 0, fmt.Errorf("empty tag filters")
+		}
+
+		queries = make([]query, size)
+
+		extraFilter = &nostr.Filter{Kinds: filter.Kinds}
+		i := 0
+		for _, values := range filter.Tags {
+			for _, value := range values {
+				// get key prefix (with full length) and offset where to write the created_at
+				bucket, k, offset := getTagIndexPrefix(value)
+				// trim the created_at part to get just the prefix we want here
+				prefix := k[0:offset]
+				queries[i] = query{i: i, bucket: bucket, prefix: prefix}
+				i++
+			}
+		}
+	} else if len(filter.Kinds) > 0 {
+		queries = make([]query, len(filter.Kinds))
+		for i, kind := range filter.Kinds {
+			prefix := make([]byte, 2)
+			binary.BigEndian.PutUint16(prefix[:], uint16(kind))
+			queries[i] = query{i: i, bucket: bucketKind, prefix: prefix}
+		}
+	} else {
+		queries = make([]query, 1)
+		prefix := make([]byte, 0)
+		queries[0] = query{i: 0, bucket: bucketCreatedAt, prefix: prefix}
+		extraFilter = nil
+	}
+
+	var until uint32 = maxuint32
+	if filter.Until != nil {
+		if fu := uint32(*filter.Until); fu < until {
+			until = fu + 1
+		}
+	}
+	for i, q := range queries {
+		queries[i].startingPoint = binary.BigEndian.AppendUint32(q.prefix, until)
+		queries[i].results = make(chan *nostr.Event, 12)
+	}
+
+	// this is where we'll end the iteration
+	if filter.Since != nil {
+		if fs := uint32(*filter.Since); fs > since {
+			since = fs
+		}
+	}
+
+	return queries, extraFilter, since, nil
+}
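As an illustration of how prepareQueries picks an index: a filter carrying both authors and kinds fans out into one reverse scan per (author, kind) pair over the pubkey+kind bucket, while any tag conditions stay behind in extraFilter for post-filtering. A sketch with a made-up, abbreviated pubkey (a real one is 64 hex chars):

    // nostr.Filter{Authors: []string{"ab..."}, Kinds: []int{1, 30023}}
    // becomes two queries over bucketPubkeyKind with the prefixes
    //   [8-byte pubkey prefix][0x00 0x01]  and  [8-byte pubkey prefix][0x75 0x47]
    // (30023 == 0x7547), each scanned backwards from Until down to Since.
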
diff --git a/bolt/save.go b/bolt/save.go
new file mode 100644
index 0000000..14fae0f
--- /dev/null
+++ b/bolt/save.go
@@ -0,0 +1,54 @@
+package bolt
+
+import (
+	"context"
+	"encoding/binary"
+	"encoding/hex"
+	"fmt"
+
+	"github.com/boltdb/bolt"
+	"github.com/fiatjaf/eventstore"
+	"github.com/nbd-wtf/go-nostr"
+	nostr_binary "github.com/nbd-wtf/go-nostr/binary"
+)
+
+func (b *BoltBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error {
+	// sanity checking
+	if evt.CreatedAt > maxuint32 || evt.Kind > maxuint16 {
+		return fmt.Errorf("event with values out of expected boundaries")
+	}
+
+	return b.db.Update(func(txn *bolt.Tx) error {
+		idPrefix8, _ := hex.DecodeString(evt.ID[0 : 8*2])
+
+		// check if we already have this id (the id index keys are 8-byte prefixes)
+		bucket := txn.Bucket(bucketId)
+		res := bucket.Get(idPrefix8)
+		if res != nil {
+			return eventstore.ErrDupEvent
+		}
+
+		// encode to binary form so we can save it
+		bin, err := nostr_binary.Marshal(evt)
+		if err != nil {
+			return err
+		}
+
+		// raw event store
+		raw := txn.Bucket(bucketRaw)
+		seq, _ := raw.NextSequence()
+		seqb := binary.BigEndian.AppendUint64(nil, seq)
+		if err := raw.Put(seqb, bin); err != nil {
+			return err
+		}
+
+		for _, km := range getIndexKeysForEvent(evt) {
+			bucket := txn.Bucket(km.bucket)
+			if err := bucket.Put(km.key, seqb); err != nil {
+				return err
+			}
+		}
+
+		return nil
+	})
+}
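Since the raw bucket stores each event exactly once under an auto-incremented sequence number, and every index entry points at that sequence, fetching an event by id costs two bucket reads. A sketch of the lookup, assuming `id` is a 64-char hex string, inside a bolt View transaction:

    err := b.db.View(func(txn *bolt.Tx) error {
        idPrefix8, _ := hex.DecodeString(id[0 : 8*2])
        seqb := txn.Bucket(bucketId).Get(idPrefix8)
        if seqb == nil {
            return nil // unknown id
        }
        evt := &nostr.Event{}
        return nostr_binary.Unmarshal(txn.Bucket(bucketRaw).Get(seqb), evt)
    })
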
diff --git a/go.mod b/go.mod
index a63253f..fd64695 100644
--- a/go.mod
+++ b/go.mod
@@ -5,6 +5,7 @@ go 1.21
 require (
 	github.com/PowerDNS/lmdb-go v1.9.2
 	github.com/aquasecurity/esquery v0.2.0
+	github.com/boltdb/bolt v1.3.1
 	github.com/dgraph-io/badger/v4 v4.2.0
 	github.com/elastic/go-elasticsearch/v8 v8.10.1
 	github.com/go-sql-driver/mysql v1.7.1
diff --git a/go.sum b/go.sum
index 21691b8..edccdab 100644
--- a/go.sum
+++ b/go.sum
@@ -4,6 +4,8 @@ github.com/PowerDNS/lmdb-go v1.9.2 h1:Cmgerh9y3ZKBZGz1irxSShhfmFyRUh+Zdk4cZk7ZJv
 github.com/PowerDNS/lmdb-go v1.9.2/go.mod h1:TE0l+EZK8Z1B4dx070ZxkWTlp8RG1mjN0/+FkFRQMtU=
 github.com/aquasecurity/esquery v0.2.0 h1:9WWXve95TE8hbm3736WB7nS6Owl8UGDeu+0jiyE9ttA=
 github.com/aquasecurity/esquery v0.2.0/go.mod h1:VU+CIFR6C+H142HHZf9RUkp4Eedpo9UrEKeCQHWf9ao=
+github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
+github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
 github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U=
 github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04=
 github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 h1:KdUfX2zKommPRa+PD0sWZUyXe9w277ABlgELO7H04IM=
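Finally, an end-to-end sketch of driving the new backend through the eventstore interface (the path and event contents are made up; Sign fills in the ID, PubKey and Sig fields):

    package main

    import (
        "context"
        "fmt"

        "github.com/fiatjaf/eventstore/bolt"
        "github.com/nbd-wtf/go-nostr"
    )

    func main() {
        db := &bolt.BoltBackend{Path: "/tmp/events.db"}
        if err := db.Init(); err != nil {
            panic(err)
        }
        defer db.Close()
        ctx := context.Background()

        // create, sign and store an event
        sk := nostr.GeneratePrivateKey()
        evt := &nostr.Event{
            Kind:      1,
            CreatedAt: nostr.Now(),
            Tags:      nostr.Tags{},
            Content:   "hello from boltdb",
        }
        evt.Sign(sk)
        if err := db.SaveEvent(ctx, evt); err != nil {
            panic(err)
        }

        // query it back through the kind index
        ch, err := db.QueryEvents(ctx, nostr.Filter{Kinds: []int{1}, Limit: 10})
        if err != nil {
            panic(err)
        }
        for ev := range ch {
            fmt.Println(ev.ID, ev.Content)
        }
    }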