Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4522081506 |
@@ -734,6 +734,10 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
|
||||
if l.accessTracker != nil && len(events) > 0 {
|
||||
go func(evts event.S, connID string) {
|
||||
for _, ev := range evts {
|
||||
// Validate event ID before calling GetSerialById
|
||||
if len(ev.ID) != 32 {
|
||||
continue
|
||||
}
|
||||
if ser, err := l.DB.GetSerialById(ev.ID); err == nil && ser != nil {
|
||||
l.accessTracker.RecordAccess(ser.Get(), connID)
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/dgraph-io/badger/v4/options"
|
||||
"lol.mleku.dev"
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/log"
|
||||
"next.orly.dev/pkg/database/querycache"
|
||||
"git.mleku.dev/mleku/nostr/encoders/event"
|
||||
"git.mleku.dev/mleku/nostr/encoders/filter"
|
||||
@@ -20,6 +21,12 @@ import (
|
||||
"git.mleku.dev/mleku/nostr/utils/units"
|
||||
)
|
||||
|
||||
// DefaultMaxConcurrentQueries limits concurrent database queries to prevent memory exhaustion.
|
||||
// Each query creates Badger iterators that consume significant memory. With many concurrent
|
||||
// connections, unlimited queries can exhaust available memory in seconds.
|
||||
// Set very low (3) because each query can internally create many iterators.
|
||||
const DefaultMaxConcurrentQueries = 3
|
||||
|
||||
// RateLimiterInterface defines the minimal interface for rate limiting during import
|
||||
type RateLimiterInterface interface {
|
||||
IsEnabled() bool
|
||||
@@ -47,6 +54,10 @@ type D struct {
|
||||
|
||||
// Rate limiter for controlling memory pressure during bulk operations
|
||||
rateLimiter RateLimiterInterface
|
||||
|
||||
// Query semaphore limits concurrent database queries to prevent memory exhaustion.
|
||||
// Each query with iterators consumes significant memory; this prevents OOM under load.
|
||||
querySem chan struct{}
|
||||
}
|
||||
|
||||
// SetRateLimiter sets the rate limiter for controlling memory during import/export
|
||||
@@ -139,6 +150,7 @@ func NewWithConfig(
|
||||
ready: make(chan struct{}),
|
||||
queryCache: qc,
|
||||
serialCache: NewSerialCache(serialCachePubkeys, serialCacheEventIds),
|
||||
querySem: make(chan struct{}, DefaultMaxConcurrentQueries),
|
||||
}
|
||||
|
||||
// Ensure the data directory exists
|
||||
@@ -353,3 +365,24 @@ func (d *D) Close() (err error) {
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// AcquireQuerySlot acquires a slot from the query semaphore to limit concurrent queries.
|
||||
// This blocks until a slot is available or the context is cancelled.
|
||||
// Returns true if slot was acquired, false if context cancelled.
|
||||
func (d *D) AcquireQuerySlot(ctx context.Context) bool {
|
||||
select {
|
||||
case d.querySem <- struct{}{}:
|
||||
return true
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// ReleaseQuerySlot releases a previously acquired query slot.
|
||||
func (d *D) ReleaseQuerySlot() {
|
||||
select {
|
||||
case <-d.querySem:
|
||||
default:
|
||||
log.W.F("ReleaseQuerySlot called without matching AcquireQuerySlot")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,6 +33,14 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*ev
|
||||
|
||||
if err = d.View(
|
||||
func(txn *badger.Txn) (err error) {
|
||||
// Create ONE iterator for sev prefix lookups - reused for all serials
|
||||
// This dramatically reduces memory usage vs creating one per serial
|
||||
sevOpts := badger.DefaultIteratorOptions
|
||||
sevOpts.PrefetchValues = false // We read from key, not value
|
||||
sevOpts.PrefetchSize = 1
|
||||
sevIt := txn.NewIterator(sevOpts)
|
||||
defer sevIt.Close()
|
||||
|
||||
for _, ser := range serials {
|
||||
var ev *event.E
|
||||
serialVal := ser.Get()
|
||||
@@ -45,8 +53,8 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*ev
|
||||
}
|
||||
err = nil // Reset error, try legacy formats
|
||||
|
||||
// Try sev (small event inline) prefix - legacy Reiser4 optimization
|
||||
ev, err = d.fetchSmallEvent(txn, ser)
|
||||
// Try sev (small event inline) using shared iterator
|
||||
ev, err = d.fetchSmallEventWithIterator(txn, ser, sevIt)
|
||||
if err == nil && ev != nil {
|
||||
events[serialVal] = ev
|
||||
continue
|
||||
@@ -70,6 +78,61 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*ev
|
||||
return events, nil
|
||||
}
|
||||
|
||||
// fetchSmallEventWithIterator uses a provided iterator for sev lookups (memory efficient)
|
||||
func (d *D) fetchSmallEventWithIterator(txn *badger.Txn, ser *types.Uint40, it *badger.Iterator) (ev *event.E, err error) {
|
||||
smallBuf := new(bytes.Buffer)
|
||||
if err = indexes.SmallEventEnc(ser).MarshalWrite(smallBuf); chk.E(err) {
|
||||
return nil, err
|
||||
}
|
||||
prefix := smallBuf.Bytes()
|
||||
|
||||
// Seek to the prefix for this serial
|
||||
it.Seek(prefix)
|
||||
if !it.ValidForPrefix(prefix) {
|
||||
return nil, nil // Not found
|
||||
}
|
||||
|
||||
// Found in sev table - extract inline data
|
||||
key := it.Item().KeyCopy(nil) // Copy key as iterator may be reused
|
||||
// Key format: sev|serial|size_uint16|event_data
|
||||
if len(key) <= 8+2 { // prefix(3) + serial(5) + size(2) = 10 bytes minimum
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
sizeIdx := 8 // After sev(3) + serial(5)
|
||||
// Read uint16 big-endian size
|
||||
size := int(key[sizeIdx])<<8 | int(key[sizeIdx+1])
|
||||
dataStart := sizeIdx + 2
|
||||
|
||||
if len(key) < dataStart+size {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
eventData := key[dataStart : dataStart+size]
|
||||
|
||||
// Check if this is compact format (starts with version byte 1)
|
||||
if len(eventData) > 0 && eventData[0] == CompactFormatVersion {
|
||||
// This is compact format stored in sev - need to decode with resolver
|
||||
resolver := NewDatabaseSerialResolver(d, d.serialCache)
|
||||
eventId, idErr := d.GetEventIdBySerial(ser)
|
||||
if idErr != nil {
|
||||
// Cannot decode compact format without event ID - return error
|
||||
// DO NOT fall back to legacy unmarshal as compact format is not valid legacy format
|
||||
log.W.F("fetchSmallEventWithIterator: compact format but no event ID mapping for serial %d: %v", ser.Get(), idErr)
|
||||
return nil, idErr
|
||||
}
|
||||
return UnmarshalCompactEvent(eventData, eventId, resolver)
|
||||
}
|
||||
|
||||
// Legacy binary format
|
||||
ev = new(event.E)
|
||||
if err = ev.UnmarshalBinary(bytes.NewBuffer(eventData)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ev, nil
|
||||
}
|
||||
|
||||
// fetchCompactEvent tries to fetch an event from the compact format (cmp prefix).
|
||||
func (d *D) fetchCompactEvent(txn *badger.Txn, ser *types.Uint40, resolver SerialResolver) (ev *event.E, err error) {
|
||||
// Build cmp key
|
||||
|
||||
@@ -21,7 +21,8 @@ import (
|
||||
func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
|
||||
// log.T.F("GetSerialById: input id=%s", hex.Enc(id))
|
||||
if len(id) == 0 {
|
||||
err = errorf.E("GetSerialById: called with empty ID")
|
||||
// Return error without logging - caller should validate ID before calling
|
||||
err = errorf.E("empty event ID")
|
||||
return
|
||||
}
|
||||
var idxs []Range
|
||||
|
||||
@@ -54,6 +54,13 @@ func (d *D) QueryAllVersions(c context.Context, f *filter.F) (
|
||||
func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDeleteEvents bool, showAllVersions bool) (
|
||||
evs event.S, err error,
|
||||
) {
|
||||
// Acquire query slot to limit concurrent queries and prevent memory exhaustion
|
||||
if !d.AcquireQuerySlot(c) {
|
||||
err = c.Err()
|
||||
return
|
||||
}
|
||||
defer d.ReleaseQuerySlot()
|
||||
|
||||
// Determine if we should return multiple versions of replaceable events
|
||||
// based on the limit parameter
|
||||
wantMultipleVersions := showAllVersions || (f.Limit != nil && *f.Limit > 1)
|
||||
@@ -310,18 +317,13 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete
|
||||
}
|
||||
}
|
||||
|
||||
// If not found in current batch, try to fetch it directly
|
||||
// DISABLED: Fetching target events not in current batch causes memory
|
||||
// exhaustion under high concurrent load. Each GetSerialById call creates
|
||||
// a Badger iterator that consumes significant memory. With many connections
|
||||
// processing deletion events, this explodes memory usage (~300MB/connection).
|
||||
// The deletion will still work when the target event is directly queried.
|
||||
if targetEv == nil {
|
||||
// Get serial for the event ID
|
||||
ser, serErr := d.GetSerialById(evId)
|
||||
if serErr != nil || ser == nil {
|
||||
continue
|
||||
}
|
||||
// Fetch the event by serial
|
||||
targetEv, serErr = d.FetchEventBySerial(ser)
|
||||
if serErr != nil || targetEv == nil {
|
||||
continue
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Only allow users to delete their own events
|
||||
|
||||
@@ -1 +1 @@
|
||||
v0.52.12
|
||||
v0.52.17
|
||||
|
||||
Reference in New Issue
Block a user