From 830d51e96d46f62f1049ff530f15abef8cf0779a Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Thu, 8 Feb 2024 11:08:28 -0300 Subject: [PATCH] badger: use a different transaction for each goroutine. --- badger/query.go | 200 +++++++++++++++++++++++------------------------- 1 file changed, 94 insertions(+), 106 deletions(-) diff --git a/badger/query.go b/badger/query.go index c1ffaa9..9618233 100644 --- a/badger/query.go +++ b/badger/query.go @@ -35,131 +35,119 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch } go func() { - err := b.View(func(txn *badger.Txn) error { - // iterate only through keys and in reverse order - opts := badger.IteratorOptions{ - Reverse: true, - } + defer close(ch) - // actually iterate - iteratorClosers := make([]func(), len(queries)) - for i, q := range queries { - go func(i int, q query) { - it := txn.NewIterator(opts) - iteratorClosers[i] = it.Close + // actually iterate + for _, q := range queries { + q := q + go b.View(func(txn *badger.Txn) error { + // iterate only through keys and in reverse order + opts := badger.IteratorOptions{ + Reverse: true, + } - defer close(q.results) + it := txn.NewIterator(opts) + defer it.Close() + defer close(q.results) - for it.Seek(q.startingPoint); it.ValidForPrefix(q.prefix); it.Next() { - item := it.Item() - key := item.Key() + for it.Seek(q.startingPoint); it.ValidForPrefix(q.prefix); it.Next() { + item := it.Item() + key := item.Key() - idxOffset := len(key) - 4 // this is where the idx actually starts + idxOffset := len(key) - 4 // this is where the idx actually starts - // "id" indexes don't contain a timestamp - if !q.skipTimestamp { - createdAt := binary.BigEndian.Uint32(key[idxOffset-4 : idxOffset]) - if createdAt < since { - break - } + // "id" indexes don't contain a timestamp + if !q.skipTimestamp { + createdAt := binary.BigEndian.Uint32(key[idxOffset-4 : idxOffset]) + if createdAt < since { + break } - - idx := make([]byte, 5) - idx[0] = rawEventStorePrefix - copy(idx[1:], key[idxOffset:]) - - // fetch actual event - item, err := txn.Get(idx) - if err != nil { - if err == badger.ErrDiscardedTxn { - return - } - log.Printf("badger: failed to get %x based on prefix %x, index key %x from raw event store: %s\n", - idx, q.prefix, key, err) - return - } - item.Value(func(val []byte) error { - evt := &nostr.Event{} - if err := nostr_binary.Unmarshal(val, evt); err != nil { - log.Printf("badger: value read error (id %x): %s\n", val[0:32], err) - return err - } - - // check if this matches the other filters that were not part of the index - if extraFilter == nil || extraFilter.Matches(evt) { - q.results <- evt - } - - return nil - }) } - }(i, q) - } - // max number of events we'll return - limit := b.MaxLimit - if filter.Limit > 0 && filter.Limit < limit { - limit = filter.Limit - } + idx := make([]byte, 5) + idx[0] = rawEventStorePrefix + copy(idx[1:], key[idxOffset:]) - // receive results and ensure we only return the most recent ones always - emittedEvents := 0 + // fetch actual event + item, err := txn.Get(idx) + if err != nil { + if err == badger.ErrDiscardedTxn { + return err + } + log.Printf("badger: failed to get %x based on prefix %x, index key %x from raw event store: %s\n", + idx, q.prefix, key, err) + return err + } + item.Value(func(val []byte) error { + evt := &nostr.Event{} + if err := nostr_binary.Unmarshal(val, evt); err != nil { + log.Printf("badger: value read error (id %x): %s\n", val[0:32], err) + return err + } - // first pass - emitQueue := make(priorityQueue, 0, len(queries)+limit) - for _, q := range queries { - evt, ok := <-q.results - if ok { - emitQueue = append(emitQueue, &queryEvent{Event: evt, query: q.i}) + // check if this matches the other filters that were not part of the index + if extraFilter == nil || extraFilter.Matches(evt) { + q.results <- evt + } + + return nil + }) } - } - // now it's a good time to schedule this - defer func() { - close(ch) - for _, itclose := range iteratorClosers { - itclose() - } - }() - - // queue may be empty here if we have literally nothing - if len(emitQueue) == 0 { return nil + }) + } + + // max number of events we'll return + limit := b.MaxLimit + if filter.Limit > 0 && filter.Limit < limit { + limit = filter.Limit + } + + // receive results and ensure we only return the most recent ones always + emittedEvents := 0 + + // first pass + emitQueue := make(priorityQueue, 0, len(queries)+limit) + for _, q := range queries { + evt, ok := <-q.results + if ok { + emitQueue = append(emitQueue, &queryEvent{Event: evt, query: q.i}) + } + } + + // queue may be empty here if we have literally nothing + if len(emitQueue) == 0 { + return + } + + heap.Init(&emitQueue) + + // iterate until we've emitted all events required + for { + // emit latest event in queue + latest := emitQueue[0] + ch <- latest.Event + + // stop when reaching limit + emittedEvents++ + if emittedEvents == limit { + break } - heap.Init(&emitQueue) + // fetch a new one from query results and replace the previous one with it + if evt, ok := <-queries[latest.query].results; ok { + emitQueue[0].Event = evt + heap.Fix(&emitQueue, 0) + } else { + // if this query has no more events we just remove this and proceed normally + heap.Remove(&emitQueue, 0) - // iterate until we've emitted all events required - for { - // emit latest event in queue - latest := emitQueue[0] - ch <- latest.Event - - // stop when reaching limit - emittedEvents++ - if emittedEvents == limit { + // check if the list is empty and end + if len(emitQueue) == 0 { break } - - // fetch a new one from query results and replace the previous one with it - if evt, ok := <-queries[latest.query].results; ok { - emitQueue[0].Event = evt - heap.Fix(&emitQueue, 0) - } else { - // if this query has no more events we just remove this and proceed normally - heap.Remove(&emitQueue, 0) - - // check if the list is empty and end - if len(emitQueue) == 0 { - break - } - } } - - return nil - }) - if err != nil { - log.Printf("badger: query txn error: %s\n", err) } }()