lmdb/badger/bolt: actually stop the cursor when we have pulled enough.
Some checks failed
build cli / make-release (push) Has been cancelled
build cli / build-linux (push) Has been cancelled

This commit is contained in:
fiatjaf
2024-04-18 20:32:58 -03:00
parent fc2884d7aa
commit 4c1b0497df
3 changed files with 23 additions and 15 deletions

View File

@@ -5,6 +5,7 @@ import (
"context" "context"
"encoding/binary" "encoding/binary"
"encoding/hex" "encoding/hex"
"errors"
"fmt" "fmt"
"log" "log"
@@ -26,6 +27,8 @@ type queryEvent struct {
query int query int
} }
var exit = errors.New("exit")
func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) { func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
ch := make(chan *nostr.Event) ch := make(chan *nostr.Event)
@@ -87,7 +90,8 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
idx, q.prefix, key, err) idx, q.prefix, key, err)
return err return err
} }
item.Value(func(val []byte) error {
if err := item.Value(func(val []byte) error {
evt := &nostr.Event{} evt := &nostr.Event{}
if err := nostr_binary.Unmarshal(val, evt); err != nil { if err := nostr_binary.Unmarshal(val, evt); err != nil {
log.Printf("badger: value read error (id %x): %s\n", val[0:32], err) log.Printf("badger: value read error (id %x): %s\n", val[0:32], err)
@@ -100,15 +104,19 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
case q.results <- evt: case q.results <- evt:
pulled++ pulled++
if pulled > limit { if pulled > limit {
break return exit
} }
case <-ctx.Done(): case <-ctx.Done():
break return exit
} }
} }
return nil return nil
}) }); err == exit {
return nil
} else if err != nil {
return err
}
} }
return nil return nil

View File

@@ -69,7 +69,7 @@ func (b *BoltBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
if !q.skipTimestamp { if !q.skipTimestamp {
createdAt := binary.BigEndian.Uint32(k[len(k)-4:]) createdAt := binary.BigEndian.Uint32(k[len(k)-4:])
if createdAt < since { if createdAt < since {
break return nil
} }
} }
@@ -78,7 +78,7 @@ func (b *BoltBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
evt := &nostr.Event{} evt := &nostr.Event{}
if err := nostr_binary.Unmarshal(val, evt); err != nil { if err := nostr_binary.Unmarshal(val, evt); err != nil {
log.Printf("bolt: value read error (id %x): %s\n", val[0:32], err) log.Printf("bolt: value read error (id %x): %s\n", val[0:32], err)
break return fmt.Errorf("error: %w", err)
} }
// check if this matches the other filters that were not part of the index before yielding // check if this matches the other filters that were not part of the index before yielding
@@ -87,13 +87,14 @@ func (b *BoltBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
case q.results <- evt: case q.results <- evt:
pulled++ pulled++
if pulled > limit { if pulled > limit {
break return nil
} }
case <-ctx.Done(): case <-ctx.Done():
break return nil
} }
} }
} }
return nil return nil
}) })
} }

View File

@@ -84,14 +84,14 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
len(k) != q.prefixSize+q.timestampSize || len(k) != q.prefixSize+q.timestampSize ||
!bytes.Equal(k[:q.prefixSize], q.prefix) { !bytes.Equal(k[:q.prefixSize], q.prefix) {
// either iteration has errored or we reached the end of this prefix // either iteration has errored or we reached the end of this prefix
break // stop this cursor and move to the next one return nil
} }
// "id" indexes don't contain a timestamp // "id" indexes don't contain a timestamp
if q.timestampSize == 4 { if q.timestampSize == 4 {
createdAt := binary.BigEndian.Uint32(k[len(k)-4:]) createdAt := binary.BigEndian.Uint32(k[len(k)-4:])
if createdAt < since { if createdAt < since {
break return nil
} }
} }
@@ -101,13 +101,13 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
log.Printf( log.Printf(
"lmdb: failed to get %x based on prefix %x, index key %x from raw event store: %s\n", "lmdb: failed to get %x based on prefix %x, index key %x from raw event store: %s\n",
idx, q.prefix, k, err) idx, q.prefix, k, err)
break return fmt.Errorf("error: %w", err)
} }
evt := &nostr.Event{} evt := &nostr.Event{}
if err := nostr_binary.Unmarshal(val, evt); err != nil { if err := nostr_binary.Unmarshal(val, evt); err != nil {
log.Printf("lmdb: value read error (id %x): %s\n", val[0:32], err) log.Printf("lmdb: value read error (id %x): %s\n", val[0:32], err)
break return fmt.Errorf("error: %w", err)
} }
// check if this matches the other filters that were not part of the index before yielding // check if this matches the other filters that were not part of the index before yielding
@@ -116,17 +116,16 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
case q.results <- evt: case q.results <- evt:
pulled++ pulled++
if pulled >= limit { if pulled >= limit {
break return nil
} }
case <-ctx.Done(): case <-ctx.Done():
break return nil
} }
} }
// move one back (we'll look into k and v and err in the next iteration) // move one back (we'll look into k and v and err in the next iteration)
k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev) k, idx, iterr = cursor.Get(nil, nil, lmdb.Prev)
} }
return nil
}) })
} }
if err != nil { if err != nil {