diff --git a/badger/query.go b/badger/query.go index 9618233..e26e9f8 100644 --- a/badger/query.go +++ b/badger/query.go @@ -127,7 +127,11 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch for { // emit latest event in queue latest := emitQueue[0] - ch <- latest.Event + select { + case ch <- latest.Event: + case <-ctx.Done(): + return + } // stop when reaching limit emittedEvents++ diff --git a/bolt/query.go b/bolt/query.go index 36c5367..649bdc4 100644 --- a/bolt/query.go +++ b/bolt/query.go @@ -114,7 +114,11 @@ func (b *BoltBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha for { // emit latest event in queue latest := emitQueue[0] - ch <- latest.Event + select { + case ch <- latest.Event: + case <-ctx.Done(): + return + } // stop when reaching limit emittedEvents++ diff --git a/elasticsearch/query.go b/elasticsearch/query.go index fb29f09..3942199 100644 --- a/elasticsearch/query.go +++ b/elasticsearch/query.go @@ -123,9 +123,15 @@ func (ess *ElasticsearchStorage) QueryEvents(ctx context.Context, filter nostr.F // optimization: get by id if isGetByID(filter) { if evts, err := ess.getByID(filter); err == nil { - for _, evt := range evts { - ch <- evt - } + go func() { + for _, evt := range evts { + select { + case ch <- evt: + case <-ctx.Done(): + return + } + } + }() close(ch) } else { return nil, fmt.Errorf("error getting by id: %w", err) diff --git a/lmdb/query.go b/lmdb/query.go index 1c71fee..5bac868 100644 --- a/lmdb/query.go +++ b/lmdb/query.go @@ -146,7 +146,11 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha for { // emit latest event in queue latest := emitQueue[0] - ch <- latest.Event + select { + case ch <- latest.Event: + case <-ctx.Done(): + return + } // stop when reaching limit emittedEvents++ diff --git a/mysql/query.go b/mysql/query.go index c94546d..a2e217f 100644 --- a/mysql/query.go +++ b/mysql/query.go @@ -37,7 +37,11 @@ func (b MySQLBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch return } evt.CreatedAt = nostr.Timestamp(timestamp) - ch <- &evt + select { + case ch <- &evt: + case <-ctx.Done(): + return + } } }() diff --git a/postgresql/query.go b/postgresql/query.go index 1cc99fe..30c82be 100644 --- a/postgresql/query.go +++ b/postgresql/query.go @@ -34,7 +34,11 @@ func (b PostgresBackend) QueryEvents(ctx context.Context, filter nostr.Filter) ( return } evt.CreatedAt = nostr.Timestamp(timestamp) - ch <- &evt + select { + case ch <- &evt: + case <-ctx.Done(): + return + } } }() diff --git a/slicestore/lib.go b/slicestore/lib.go index 518e8dc..caea9e1 100644 --- a/slicestore/lib.go +++ b/slicestore/lib.go @@ -55,7 +55,11 @@ func (b *SliceStore) QueryEvents(ctx context.Context, filter nostr.Filter) (chan break } if filter.Matches(event) { - ch <- event + select { + case ch <- event: + case <-ctx.Done(): + return + } count++ } } diff --git a/sqlite3/query.go b/sqlite3/query.go index dfbfe7a..2bd33dc 100644 --- a/sqlite3/query.go +++ b/sqlite3/query.go @@ -34,7 +34,11 @@ func (b SQLite3Backend) QueryEvents(ctx context.Context, filter nostr.Filter) (c return } evt.CreatedAt = nostr.Timestamp(timestamp) - ch <- &evt + select { + case ch <- &evt: + case <-ctx.Done(): + return + } } }()