From 4c1b0497dfc1bb8050b3d0766322289225364a54 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Thu, 18 Apr 2024 20:32:58 -0300 Subject: [PATCH] lmdb/badger/bolt: actually stop the cursor when we have pulled enough. --- badger/query.go | 16 ++++++++++++---- bolt/query.go | 9 +++++---- lmdb/query.go | 13 ++++++------- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/badger/query.go b/badger/query.go index eaf2e74..de1a9e0 100644 --- a/badger/query.go +++ b/badger/query.go @@ -5,6 +5,7 @@ import ( "context" "encoding/binary" "encoding/hex" + "errors" "fmt" "log" @@ -26,6 +27,8 @@ type queryEvent struct { query int } +var exit = errors.New("exit") + func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) { ch := make(chan *nostr.Event) @@ -87,7 +90,8 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch idx, q.prefix, key, err) return err } - item.Value(func(val []byte) error { + + if err := item.Value(func(val []byte) error { evt := &nostr.Event{} if err := nostr_binary.Unmarshal(val, evt); err != nil { log.Printf("badger: value read error (id %x): %s\n", val[0:32], err) @@ -100,15 +104,19 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch case q.results <- evt: pulled++ if pulled > limit { - break + return exit } case <-ctx.Done(): - break + return exit } } return nil - }) + }); err == exit { + return nil + } else if err != nil { + return err + } } return nil diff --git a/bolt/query.go b/bolt/query.go index dc657ca..16937d4 100644 --- a/bolt/query.go +++ b/bolt/query.go @@ -69,7 +69,7 @@ func (b *BoltBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha if !q.skipTimestamp { createdAt := binary.BigEndian.Uint32(k[len(k)-4:]) if createdAt < since { - break + return nil } } @@ -78,7 +78,7 @@ func (b *BoltBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha evt := &nostr.Event{} if err := nostr_binary.Unmarshal(val, evt); err != nil { log.Printf("bolt: value read error (id %x): %s\n", val[0:32], err) - break + return fmt.Errorf("error: %w", err) } // check if this matches the other filters that were not part of the index before yielding @@ -87,13 +87,14 @@ func (b *BoltBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha case q.results <- evt: pulled++ if pulled > limit { - break + return nil } case <-ctx.Done(): - break + return nil } } } + return nil }) } diff --git a/lmdb/query.go b/lmdb/query.go index aacd3f3..aabf222 100644 --- a/lmdb/query.go +++ b/lmdb/query.go @@ -84,14 +84,14 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha len(k) != q.prefixSize+q.timestampSize || !bytes.Equal(k[:q.prefixSize], q.prefix) { // either iteration has errored or we reached the end of this prefix - break // stop this cursor and move to the next one + return nil } // "id" indexes don't contain a timestamp if q.timestampSize == 4 { createdAt := binary.BigEndian.Uint32(k[len(k)-4:]) if createdAt < since { - break + return nil } } @@ -101,13 +101,13 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha log.Printf( "lmdb: failed to get %x based on prefix %x, index key %x from raw event store: %s\n", idx, q.prefix, k, err) - break + return fmt.Errorf("error: %w", err) } evt := &nostr.Event{} if err := nostr_binary.Unmarshal(val, evt); err != nil { log.Printf("lmdb: value read error (id %x): %s\n", val[0:32], err) - break + return fmt.Errorf("error: %w", err) } // check if this matches the other filters that were not part of the index before yielding @@ -116,17 +116,16 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha case q.results <- evt: pulled++ if pulled >= limit { - break + return nil } case <-ctx.Done(): - break + return nil } } // move one back (we'll look into k and v and err in the next iteration) k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev) } - return nil }) } if err != nil {