Merge pull request #12 from fiatjaf/broken-connectons-locking
prevent deadlocking when `QueryEvents()` is not read fully
This commit is contained in:
@@ -151,7 +151,11 @@ func (b BadgerBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
|
||||
for {
|
||||
// emit latest event in queue
|
||||
latest := emitQueue[0]
|
||||
ch <- latest.Event
|
||||
select {
|
||||
case ch <- latest.Event:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
// stop when reaching limit
|
||||
emittedEvents++
|
||||
|
||||
@@ -127,7 +127,11 @@ func (b *BoltBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
|
||||
for {
|
||||
// emit latest event in queue
|
||||
latest := emitQueue[0]
|
||||
ch <- latest.Event
|
||||
select {
|
||||
case ch <- latest.Event:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
// stop when reaching limit
|
||||
emittedEvents++
|
||||
|
||||
@@ -123,10 +123,16 @@ func (ess *ElasticsearchStorage) QueryEvents(ctx context.Context, filter nostr.F
|
||||
// optimization: get by id
|
||||
if isGetByID(filter) {
|
||||
if evts, err := ess.getByID(filter); err == nil {
|
||||
for _, evt := range evts {
|
||||
ch <- evt
|
||||
}
|
||||
close(ch)
|
||||
go func() {
|
||||
defer close(ch)
|
||||
for _, evt := range evts {
|
||||
select {
|
||||
case ch <- evt:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
return nil, fmt.Errorf("error getting by id: %w", err)
|
||||
}
|
||||
|
||||
@@ -161,7 +161,11 @@ func (b *LMDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (cha
|
||||
for {
|
||||
// emit latest event in queue
|
||||
latest := emitQueue[0]
|
||||
ch <- latest.Event
|
||||
select {
|
||||
case ch <- latest.Event:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
// stop when reaching limit
|
||||
emittedEvents++
|
||||
|
||||
@@ -40,6 +40,7 @@ func (b MySQLBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (ch
|
||||
select {
|
||||
case ch <- &evt:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -37,6 +37,7 @@ func (b PostgresBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (
|
||||
select {
|
||||
case ch <- &evt:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -56,7 +56,11 @@ func (b *SliceStore) QueryEvents(ctx context.Context, filter nostr.Filter) (chan
|
||||
}
|
||||
|
||||
if filter.Matches(event) {
|
||||
ch <- event
|
||||
select {
|
||||
case ch <- event:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
count++
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,6 +37,7 @@ func (b SQLite3Backend) QueryEvents(ctx context.Context, filter nostr.Filter) (c
|
||||
select {
|
||||
case ch <- &evt:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
Reference in New Issue
Block a user