fixes memory problem with many pubkeys in query
This commit is contained in:
@@ -129,6 +129,87 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
|
||||
)
|
||||
},
|
||||
)
|
||||
|
||||
// Process large author lists by breaking them into chunks
|
||||
if f.Authors != nil && f.Authors.Len() > 50 {
|
||||
log.W.F("REQ %s: breaking down large author list (%d authors) into chunks", env.Subscription, f.Authors.Len())
|
||||
|
||||
// Calculate chunk size based on kinds to avoid OOM
|
||||
chunkSize := 50
|
||||
if f.Kinds != nil && f.Kinds.Len() > 0 {
|
||||
// Reduce chunk size if there are multiple kinds to prevent too many index ranges
|
||||
chunkSize = 50 / f.Kinds.Len()
|
||||
if chunkSize < 10 {
|
||||
chunkSize = 10 // Minimum chunk size
|
||||
}
|
||||
}
|
||||
|
||||
// Process authors in chunks
|
||||
for i := 0; i < f.Authors.Len(); i += chunkSize {
|
||||
end := i + chunkSize
|
||||
if end > f.Authors.Len() {
|
||||
end = f.Authors.Len()
|
||||
}
|
||||
|
||||
// Create a chunk filter
|
||||
chunkAuthors := tag.NewFromBytesSlice(f.Authors.T[i:end]...)
|
||||
chunkFilter := &filter.F{
|
||||
Kinds: f.Kinds,
|
||||
Authors: chunkAuthors,
|
||||
Ids: f.Ids,
|
||||
Tags: f.Tags,
|
||||
Since: f.Since,
|
||||
Until: f.Until,
|
||||
Limit: f.Limit,
|
||||
Search: f.Search,
|
||||
}
|
||||
|
||||
log.T.F("REQ %s: processing chunk %d-%d of %d authors", env.Subscription, i+1, end, f.Authors.Len())
|
||||
|
||||
// Process this chunk
|
||||
var chunkEvents event.S
|
||||
showAllVersions := false
|
||||
if chunkFilter.Tags != nil {
|
||||
if showAllTag := chunkFilter.Tags.GetFirst([]byte("show_all_versions")); showAllTag != nil {
|
||||
if string(showAllTag.Value()) == "true" {
|
||||
showAllVersions = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if showAllVersions {
|
||||
if chunkEvents, err = l.QueryAllVersions(queryCtx, chunkFilter); chk.E(err) {
|
||||
if errors.Is(err, badger.ErrDBClosed) {
|
||||
return
|
||||
}
|
||||
log.E.F("QueryAllVersions failed for chunk filter: %v", err)
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
if chunkEvents, err = l.QueryEvents(queryCtx, chunkFilter); chk.E(err) {
|
||||
if errors.Is(err, badger.ErrDBClosed) {
|
||||
return
|
||||
}
|
||||
log.E.F("QueryEvents failed for chunk filter: %v", err)
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Add chunk results to overall results
|
||||
allEvents = append(allEvents, chunkEvents...)
|
||||
|
||||
// Check if we've hit the limit
|
||||
if f.Limit != nil && len(allEvents) >= int(*f.Limit) {
|
||||
log.T.F("REQ %s: reached limit of %d events, stopping chunk processing", env.Subscription, *f.Limit)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Skip the normal processing since we handled it in chunks
|
||||
continue
|
||||
}
|
||||
}
|
||||
if f != nil && pointers.Present(f.Limit) {
|
||||
if *f.Limit == 0 {
|
||||
@@ -136,13 +217,35 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
|
||||
}
|
||||
}
|
||||
var filterEvents event.S
|
||||
if filterEvents, err = l.QueryEvents(queryCtx, f); chk.E(err) {
|
||||
if errors.Is(err, badger.ErrDBClosed) {
|
||||
return
|
||||
// Check if the filter has the special "show_all_versions" tag
|
||||
showAllVersions := false
|
||||
if f.Tags != nil {
|
||||
if showAllTag := f.Tags.GetFirst([]byte("show_all_versions")); showAllTag != nil {
|
||||
if string(showAllTag.Value()) == "true" {
|
||||
showAllVersions = true
|
||||
log.T.F("REQ %s: detected show_all_versions tag, using QueryAllVersions", env.Subscription)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if showAllVersions {
|
||||
if filterEvents, err = l.QueryAllVersions(queryCtx, f); chk.E(err) {
|
||||
if errors.Is(err, badger.ErrDBClosed) {
|
||||
return
|
||||
}
|
||||
log.E.F("QueryAllVersions failed for filter: %v", err)
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
if filterEvents, err = l.QueryEvents(queryCtx, f); chk.E(err) {
|
||||
if errors.Is(err, badger.ErrDBClosed) {
|
||||
return
|
||||
}
|
||||
log.E.F("QueryEvents failed for filter: %v", err)
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
log.E.F("QueryEvents failed for filter: %v", err)
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
// Append events from this filter to the overall collection
|
||||
allEvents = append(allEvents, filterEvents...)
|
||||
@@ -275,10 +378,28 @@ privCheck:
|
||||
events = policyFilteredEvents
|
||||
}
|
||||
|
||||
// Deduplicate events (in case chunk processing returned duplicates)
|
||||
if len(allEvents) > 0 {
|
||||
seen := make(map[string]struct{})
|
||||
var deduplicatedEvents event.S
|
||||
originalCount := len(allEvents)
|
||||
for _, ev := range allEvents {
|
||||
eventID := hexenc.Enc(ev.ID)
|
||||
if _, exists := seen[eventID]; !exists {
|
||||
seen[eventID] = struct{}{}
|
||||
deduplicatedEvents = append(deduplicatedEvents, ev)
|
||||
}
|
||||
}
|
||||
allEvents = deduplicatedEvents
|
||||
if originalCount != len(allEvents) {
|
||||
log.T.F("REQ %s: deduplicated %d events to %d unique events", env.Subscription, originalCount, len(allEvents))
|
||||
}
|
||||
}
|
||||
|
||||
// Apply managed ACL filtering for read access if managed ACL is active
|
||||
if acl.Registry.Active.Load() == "managed" {
|
||||
var aclFilteredEvents event.S
|
||||
for _, ev := range events {
|
||||
for _, ev := range allEvents {
|
||||
// Check if event is banned
|
||||
eventID := hex.EncodeToString(ev.ID)
|
||||
if banned, err := l.getManagedACL().IsEventBanned(eventID); err == nil && banned {
|
||||
@@ -304,11 +425,11 @@ privCheck:
|
||||
|
||||
aclFilteredEvents = append(aclFilteredEvents, ev)
|
||||
}
|
||||
events = aclFilteredEvents
|
||||
allEvents = aclFilteredEvents
|
||||
}
|
||||
|
||||
seen := make(map[string]struct{})
|
||||
for _, ev := range events {
|
||||
for _, ev := range allEvents {
|
||||
log.T.C(
|
||||
func() string {
|
||||
return fmt.Sprintf(
|
||||
|
||||
Reference in New Issue
Block a user