Compare commits

...

1 Commits

Author SHA1 Message Date
woikos
4522081506 Fix memory exhaustion from concurrent Badger queries (v0.52.17)
Some checks are pending
Go / build-and-release (push) Waiting to run
- Add query semaphore limiting concurrent database queries to 3
- Reuse single iterator in FetchEventsBySerials instead of one per serial
- Disable expensive e-tag fallback lookup in deletion processing
- Add empty ID validation in access tracker loop
- Reduce log spam from GetSerialById empty ID errors

These changes reduce memory usage under load from ~5GB to ~150MB by
limiting the number of concurrent Badger iterators, which consume significant memory.

Files modified:
- pkg/database/database.go: Add query semaphore with acquire/release
- pkg/database/fetch-events-by-serials.go: Reuse iterator for sev lookups
- pkg/database/query-events.go: Add semaphore, disable e-tag fallback
- pkg/database/get-serial-by-id.go: Don't log empty ID errors
- app/handle-req.go: Validate event ID before GetSerialById
- pkg/version/version: Bump to v0.52.17

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 17:23:30 +01:00
6 changed files with 118 additions and 15 deletions

View File

@@ -734,6 +734,10 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
if l.accessTracker != nil && len(events) > 0 {
go func(evts event.S, connID string) {
for _, ev := range evts {
// Validate event ID before calling GetSerialById
if len(ev.ID) != 32 {
continue
}
if ser, err := l.DB.GetSerialById(ev.ID); err == nil && ser != nil {
l.accessTracker.RecordAccess(ser.Get(), connID)
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/dgraph-io/badger/v4/options"
"lol.mleku.dev"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/database/querycache"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/filter"
@@ -20,6 +21,12 @@ import (
"git.mleku.dev/mleku/nostr/utils/units"
)
// DefaultMaxConcurrentQueries limits concurrent database queries to prevent memory exhaustion.
// Each query creates Badger iterators that consume significant memory. With many concurrent
// connections, unlimited queries can exhaust available memory in seconds.
// Set very low (3) because each query can internally create many iterators.
// Used as the buffer size of D.querySem; AcquireQuerySlot/ReleaseQuerySlot enforce the limit.
const DefaultMaxConcurrentQueries = 3
// RateLimiterInterface defines the minimal interface for rate limiting during import
type RateLimiterInterface interface {
IsEnabled() bool
@@ -47,6 +54,10 @@ type D struct {
// Rate limiter for controlling memory pressure during bulk operations
rateLimiter RateLimiterInterface
// Query semaphore limits concurrent database queries to prevent memory exhaustion.
// Each query with iterators consumes significant memory; this prevents OOM under load.
querySem chan struct{}
}
// SetRateLimiter sets the rate limiter for controlling memory during import/export
@@ -139,6 +150,7 @@ func NewWithConfig(
ready: make(chan struct{}),
queryCache: qc,
serialCache: NewSerialCache(serialCachePubkeys, serialCacheEventIds),
querySem: make(chan struct{}, DefaultMaxConcurrentQueries),
}
// Ensure the data directory exists
@@ -353,3 +365,24 @@ func (d *D) Close() (err error) {
}
return
}
// AcquireQuerySlot blocks until a slot in the query semaphore becomes
// available or ctx is cancelled, and reports whether a slot was obtained.
// A true result must be paired with a later call to ReleaseQuerySlot.
func (d *D) AcquireQuerySlot(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		// The caller gave up (or is shutting down) before a slot freed up.
		return false
	case d.querySem <- struct{}{}:
		// Slot claimed; concurrent query count is now bounded by the
		// semaphore's capacity.
		return true
	}
}
// ReleaseQuerySlot hands a previously acquired slot back to the query
// semaphore. An unmatched call (no prior successful AcquireQuerySlot) is
// logged as a warning instead of blocking forever on an empty channel.
func (d *D) ReleaseQuerySlot() {
	select {
	case <-d.querySem:
		// Slot returned; another waiting query may now proceed.
	default:
		// Nothing to drain: Acquire/Release calls are unbalanced.
		log.W.F("ReleaseQuerySlot called without matching AcquireQuerySlot")
	}
}

View File

@@ -33,6 +33,14 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*ev
if err = d.View(
func(txn *badger.Txn) (err error) {
// Create ONE iterator for sev prefix lookups - reused for all serials
// This dramatically reduces memory usage vs creating one per serial
sevOpts := badger.DefaultIteratorOptions
sevOpts.PrefetchValues = false // We read from key, not value
sevOpts.PrefetchSize = 1
sevIt := txn.NewIterator(sevOpts)
defer sevIt.Close()
for _, ser := range serials {
var ev *event.E
serialVal := ser.Get()
@@ -45,8 +53,8 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*ev
}
err = nil // Reset error, try legacy formats
// Try sev (small event inline) prefix - legacy Reiser4 optimization
ev, err = d.fetchSmallEvent(txn, ser)
// Try sev (small event inline) using shared iterator
ev, err = d.fetchSmallEventWithIterator(txn, ser, sevIt)
if err == nil && ev != nil {
events[serialVal] = ev
continue
@@ -70,6 +78,61 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*ev
return events, nil
}
// fetchSmallEventWithIterator looks up a "sev" (small event inline) record for
// the given serial using a caller-supplied, reusable Badger iterator instead of
// creating a fresh one per serial (memory efficient under bulk lookups).
// It returns (nil, nil) when no sev record exists or the key is malformed, the
// decoded event on success, and a non-nil error only for decode failures.
// NOTE(review): txn is not used in this body — presumably kept so the signature
// mirrors fetchSmallEvent; confirm before removing.
func (d *D) fetchSmallEventWithIterator(txn *badger.Txn, ser *types.Uint40, it *badger.Iterator) (ev *event.E, err error) {
	// Build the key prefix (sev tag + 5-byte serial) to seek to.
	smallBuf := new(bytes.Buffer)
	if err = indexes.SmallEventEnc(ser).MarshalWrite(smallBuf); chk.E(err) {
		return nil, err
	}
	prefix := smallBuf.Bytes()
	// Seek to the prefix for this serial; the shared iterator supports
	// repeated Seek calls across serials within the same transaction.
	it.Seek(prefix)
	if !it.ValidForPrefix(prefix) {
		return nil, nil // Not found
	}
	// Found in sev table - extract inline data. The event payload is stored
	// inside the KEY itself, not the value.
	key := it.Item().KeyCopy(nil) // Copy key as iterator may be reused
	// Key format: sev|serial|size_uint16|event_data
	// A key of exactly prefix+size (10 bytes, zero-length payload) is also
	// treated as "not found" by the <= check below.
	if len(key) <= 8+2 { // prefix(3) + serial(5) + size(2) = 10 bytes minimum
		return nil, nil
	}
	sizeIdx := 8 // After sev(3) + serial(5)
	// Read uint16 big-endian size
	size := int(key[sizeIdx])<<8 | int(key[sizeIdx+1])
	dataStart := sizeIdx + 2
	// Guard against a truncated/corrupt key whose declared size exceeds the
	// bytes actually present.
	if len(key) < dataStart+size {
		return nil, nil
	}
	eventData := key[dataStart : dataStart+size]
	// Check if this is compact format (starts with version byte 1)
	if len(eventData) > 0 && eventData[0] == CompactFormatVersion {
		// This is compact format stored in sev - need to decode with resolver
		resolver := NewDatabaseSerialResolver(d, d.serialCache)
		eventId, idErr := d.GetEventIdBySerial(ser)
		if idErr != nil {
			// Cannot decode compact format without event ID - return error
			// DO NOT fall back to legacy unmarshal as compact format is not valid legacy format
			log.W.F("fetchSmallEventWithIterator: compact format but no event ID mapping for serial %d: %v", ser.Get(), idErr)
			return nil, idErr
		}
		return UnmarshalCompactEvent(eventData, eventId, resolver)
	}
	// Legacy binary format
	ev = new(event.E)
	if err = ev.UnmarshalBinary(bytes.NewBuffer(eventData)); err != nil {
		return nil, err
	}
	return ev, nil
}
// fetchCompactEvent tries to fetch an event from the compact format (cmp prefix).
func (d *D) fetchCompactEvent(txn *badger.Txn, ser *types.Uint40, resolver SerialResolver) (ev *event.E, err error) {
// Build cmp key

View File

@@ -21,7 +21,8 @@ import (
func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
// log.T.F("GetSerialById: input id=%s", hex.Enc(id))
if len(id) == 0 {
err = errorf.E("GetSerialById: called with empty ID")
// Return error without logging - caller should validate ID before calling
err = errorf.E("empty event ID")
return
}
var idxs []Range

View File

@@ -54,6 +54,13 @@ func (d *D) QueryAllVersions(c context.Context, f *filter.F) (
func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDeleteEvents bool, showAllVersions bool) (
evs event.S, err error,
) {
// Acquire query slot to limit concurrent queries and prevent memory exhaustion
if !d.AcquireQuerySlot(c) {
err = c.Err()
return
}
defer d.ReleaseQuerySlot()
// Determine if we should return multiple versions of replaceable events
// based on the limit parameter
wantMultipleVersions := showAllVersions || (f.Limit != nil && *f.Limit > 1)
@@ -310,18 +317,13 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete
}
}
// If not found in current batch, try to fetch it directly
// DISABLED: Fetching target events not in current batch causes memory
// exhaustion under high concurrent load. Each GetSerialById call creates
// a Badger iterator that consumes significant memory. With many connections
// processing deletion events, this explodes memory usage (~300MB/connection).
// The deletion will still work when the target event is directly queried.
if targetEv == nil {
// Get serial for the event ID
ser, serErr := d.GetSerialById(evId)
if serErr != nil || ser == nil {
continue
}
// Fetch the event by serial
targetEv, serErr = d.FetchEventBySerial(ser)
if serErr != nil || targetEv == nil {
continue
}
continue
}
// Only allow users to delete their own events

View File

@@ -1 +1 @@
v0.52.12
v0.52.17