prevent relay queriers that exit before EOSE from locking the QueryEvents() function forever.
This commit is contained in:
@@ -127,7 +127,11 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
|
||||
for {
|
||||
// emit latest event in queue
|
||||
latest := emitQueue[0]
|
||||
ch <- latest.Event
|
||||
select {
|
||||
case ch <- latest.Event:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
// stop when reaching limit
|
||||
emittedEvents++
|
||||
|
||||
@@ -114,7 +114,11 @@ func (b *BoltBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
|
||||
for {
|
||||
// emit latest event in queue
|
||||
latest := emitQueue[0]
|
||||
ch <- latest.Event
|
||||
select {
|
||||
case ch <- latest.Event:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
// stop when reaching limit
|
||||
emittedEvents++
|
||||
|
||||
@@ -123,9 +123,15 @@ func (ess *ElasticsearchStorage) QueryEvents(ctx context.Context, filter nostr.F
|
||||
// optimization: get by id
|
||||
if isGetByID(filter) {
|
||||
if evts, err := ess.getByID(filter); err == nil {
|
||||
for _, evt := range evts {
|
||||
ch <- evt
|
||||
}
|
||||
go func() {
|
||||
for _, evt := range evts {
|
||||
select {
|
||||
case ch <- evt:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
close(ch)
|
||||
} else {
|
||||
return nil, fmt.Errorf("error getting by id: %w", err)
|
||||
|
||||
@@ -146,7 +146,11 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
|
||||
for {
|
||||
// emit latest event in queue
|
||||
latest := emitQueue[0]
|
||||
ch <- latest.Event
|
||||
select {
|
||||
case ch <- latest.Event:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
// stop when reaching limit
|
||||
emittedEvents++
|
||||
|
||||
@@ -37,7 +37,11 @@ func (b MySQLBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
|
||||
return
|
||||
}
|
||||
evt.CreatedAt = nostr.Timestamp(timestamp)
|
||||
ch <- &evt
|
||||
select {
|
||||
case ch <- &evt:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
@@ -34,7 +34,11 @@ func (b PostgresBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (
|
||||
return
|
||||
}
|
||||
evt.CreatedAt = nostr.Timestamp(timestamp)
|
||||
ch <- &evt
|
||||
select {
|
||||
case ch <- &evt:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
@@ -55,7 +55,11 @@ func (b *SliceStore) QueryEvents(ctx context.Context, filter nostr.Filter) (chan
|
||||
break
|
||||
}
|
||||
if filter.Matches(event) {
|
||||
ch <- event
|
||||
select {
|
||||
case ch <- event:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
count++
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,7 +34,11 @@ func (b SQLite3Backend) QueryEvents(ctx context.Context, filter nostr.Filter) (c
|
||||
return
|
||||
}
|
||||
evt.CreatedAt = nostr.Timestamp(timestamp)
|
||||
ch <- &evt
|
||||
select {
|
||||
case ch <- &evt:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user