From 3777e2f8f67e03c8fb836db7882aa243d379680f Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Thu, 7 Dec 2023 11:36:23 -0300 Subject: [PATCH] lmdb: do not use goroutines for each query. thanks to @wojas at https://github.com/PowerDNS/lmdb-go/issues/28#issuecomment-1845056613 --- go.mod | 1 + go.sum | 1 + lmdb/query.go | 258 ++++++++++++++++++++++++-------------------------- 3 files changed, 125 insertions(+), 135 deletions(-) diff --git a/go.mod b/go.mod index 8d85c3c..8f1bb2c 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/urfave/cli/v2 v2.25.7 golang.org/x/exp v0.0.0-20231006140011-7918f672742d + golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 ) require ( diff --git a/go.sum b/go.sum index bdffb76..48efbf9 100644 --- a/go.sum +++ b/go.sum @@ -179,6 +179,7 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/lmdb/query.go b/lmdb/query.go index d589c8e..643501f 100644 --- a/lmdb/query.go +++ b/lmdb/query.go @@ -7,7 +7,7 @@ import ( "encoding/binary" "encoding/hex" "fmt" - "sync" + "log" "github.com/PowerDNS/lmdb-go/lmdb" "github.com/nbd-wtf/go-nostr" @@ -29,157 +29,145 @@ type queryEvent struct { } func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) { - ch := make(chan *nostr.Event) - queries, extraFilter, since, err := b.prepareQueries(filter) if err != nil { return nil, err } + ch := make(chan *nostr.Event) go func() { + defer close(ch) + err := b.lmdbEnv.View(func(txn *lmdb.Txn) error { - txn.RawRead = true - wg := sync.WaitGroup{} - wg.Add(len(queries)) - - // actually iterate - cursorClosers := make([]func(), len(queries)) - for i, q := range queries { - go func(i int, q query) { - defer close(q.results) - defer wg.Done() - - cursor, err := txn.OpenCursor(q.dbi) - if err != nil { - return - } - cursorClosers[i] = cursor.Close - - var k []byte - var idx []byte - var iterr error - - if _, _, errsr := cursor.Get(q.startingPoint, nil, lmdb.SetRange); errsr != nil { - if operr, ok := errsr.(*lmdb.OpError); !ok || operr.Errno != lmdb.NotFound { - // in this case it's really an error - panic(err) - } else { - // we're at the end and we just want notes before this, - // so we just need to set the cursor the last key, this is not a real error - k, idx, iterr = cursor.Get(nil, nil, lmdb.Last) - } - } else { - // move one back as the first step - k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev) - } - - for { - select { - case <-ctx.Done(): - break - default: - } - - // we already have a k and a v and an err from the cursor setup, so check and use these - if iterr != nil || !bytes.HasPrefix(k, q.prefix) { - return - } - - // "id" indexes don't contain a timestamp - if !q.skipTimestamp { - createdAt := binary.BigEndian.Uint32(k[len(k)-4:]) - if createdAt < since { - break - } - } - - // fetch actual event - val, err := txn.Get(b.rawEventStore, idx) - if err != nil { - panic(err) - } - - evt := &nostr.Event{} - if err := nostr_binary.Unmarshal(val, evt); err != nil { - panic(err) - } - - // check if this matches the other filters that were not part of the index - if extraFilter == nil || extraFilter.Matches(evt) { - q.results <- evt - } - - // move one back (we'll look into k and v and err in the next iteration) - k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev) - } - }(i, q) - } - - // max number of events we'll return - limit := b.MaxLimit - if filter.Limit > 0 && filter.Limit < limit { - limit = filter.Limit - } - - // receive results and ensure we only return the most recent ones always - emittedEvents := 0 - - // first pass - emitQueue := make(priorityQueue, 0, len(queries)+limit) for _, q := range queries { - evt, ok := <-q.results - if ok { - emitQueue = append(emitQueue, &queryEvent{Event: evt, query: q.i}) + txn.RawRead = true + defer close(q.results) + + cursor, err := txn.OpenCursor(q.dbi) + if err != nil { + return err } - } + defer cursor.Close() - // now it's a good time to schedule this - defer func() { - close(ch) - for _, cclose := range cursorClosers { - cclose() - } - }() + var k []byte + var idx []byte + var iterr error - // queue may be empty here if we have literally nothing - if len(emitQueue) == 0 { - return nil - } - - heap.Init(&emitQueue) - - // iterate until we've emitted all events required - for { - // emit latest event in queue - latest := emitQueue[0] - ch <- latest.Event - - // stop when reaching limit - emittedEvents++ - if emittedEvents >= limit { - break - } - - // fetch a new one from query results and replace the previous one with it - if evt, ok := <-queries[latest.query].results; ok { - emitQueue[0].Event = evt - heap.Fix(&emitQueue, 0) - } else { - // if this query has no more events we just remove this and proceed normally - heap.Remove(&emitQueue, 0) - - // check if the list is empty and end - if len(emitQueue) == 0 { - break + if _, _, errsr := cursor.Get(q.startingPoint, nil, lmdb.SetRange); errsr != nil { + if operr, ok := errsr.(*lmdb.OpError); !ok || operr.Errno != lmdb.NotFound { + // in this case it's really an error + panic(err) + } else { + // we're at the end and we just want notes before this, + // so we just need to set the cursor the last key, this is not a real error + k, idx, iterr = cursor.Get(nil, nil, lmdb.Last) } + } else { + // move one back as the first step + k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev) + } + + for { + select { + case <-ctx.Done(): + break + default: + } + + // we already have a k and a v and an err from the cursor setup, so check and use these + if iterr != nil || !bytes.HasPrefix(k, q.prefix) { + // either iteration has errored or we reached the end of this prefix + break // stop this cursor and move to the next one + } + + // "id" indexes don't contain a timestamp + if !q.skipTimestamp { + createdAt := binary.BigEndian.Uint32(k[len(k)-4:]) + if createdAt < since { + break + } + } + + // fetch actual event + val, err := txn.Get(b.rawEventStore, idx) + if err != nil { + log.Printf( + "lmdb: failed to get %x based on prefix %x, index key %x from raw event store: %s\n", + idx, q.prefix, k, err) + continue + } + + evt := &nostr.Event{} + if err := nostr_binary.Unmarshal(val, evt); err != nil { + log.Printf("lmdb: value read error: %s\n", err) + continue + } + + // check if this matches the other filters that were not part of the index + if extraFilter == nil || extraFilter.Matches(evt) { + q.results <- evt + } + + // move one back (we'll look into k and v and err in the next iteration) + k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev) } } - - wg.Wait() return nil }) if err != nil { - panic(err) + log.Printf("lmdb: error on cursor iteration: %v\n", err) + } + + // max number of events we'll return + limit := b.MaxLimit + if filter.Limit > 0 && filter.Limit < limit { + limit = filter.Limit + } + + // receive results and ensure we only return the most recent ones always + emittedEvents := 0 + + // first pass + emitQueue := make(priorityQueue, 0, len(queries)+limit) + for _, q := range queries { + evt, ok := <-q.results + if ok { + emitQueue = append(emitQueue, &queryEvent{Event: evt, query: q.i}) + } + } + + // queue may be empty here if we have literally nothing + if len(emitQueue) == 0 { + return + } + + heap.Init(&emitQueue) + + // iterate until we've emitted all events required + for { + // emit latest event in queue + latest := emitQueue[0] + ch <- latest.Event + + // stop when reaching limit + emittedEvents++ + if emittedEvents >= limit { + break + } + + // fetch a new one from query results and replace the previous one with it + if evt, ok := <-queries[latest.query].results; ok { + emitQueue[0].Event = evt + heap.Fix(&emitQueue, 0) + } else { + // if this query has no more events we just remove this and proceed normally + heap.Remove(&emitQueue, 0) + + // check if the list is empty and end + if len(emitQueue) == 0 { + break + } + } } }()