Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4522081506 | ||
|
|
a768be22e6 | ||
|
|
779f341dda | ||
|
|
a565244435 | ||
|
|
bb858a0d6f | ||
|
|
b478845e1c |
@@ -146,6 +146,10 @@ type C struct {
|
|||||||
|
|
||||||
// Connection concurrency control
|
// Connection concurrency control
|
||||||
MaxHandlersPerConnection int `env:"ORLY_MAX_HANDLERS_PER_CONN" default:"100" usage:"max concurrent message handlers per WebSocket connection (limits goroutine growth under load)"`
|
MaxHandlersPerConnection int `env:"ORLY_MAX_HANDLERS_PER_CONN" default:"100" usage:"max concurrent message handlers per WebSocket connection (limits goroutine growth under load)"`
|
||||||
|
MaxConnectionsPerIP int `env:"ORLY_MAX_CONN_PER_IP" default:"25" usage:"max WebSocket connections per IP address (prevents resource exhaustion, hard limit 40)"`
|
||||||
|
|
||||||
|
// Query result limits (prevents memory exhaustion from unbounded queries)
|
||||||
|
QueryResultLimit int `env:"ORLY_QUERY_RESULT_LIMIT" default:"256" usage:"max events returned per REQ filter (prevents unbounded memory usage, 0=unlimited)"`
|
||||||
|
|
||||||
// Adaptive rate limiting (PID-controlled)
|
// Adaptive rate limiting (PID-controlled)
|
||||||
RateLimitEnabled bool `env:"ORLY_RATE_LIMIT_ENABLED" default:"true" usage:"enable adaptive PID-controlled rate limiting for database operations"`
|
RateLimitEnabled bool `env:"ORLY_RATE_LIMIT_ENABLED" default:"true" usage:"enable adaptive PID-controlled rate limiting for database operations"`
|
||||||
@@ -207,8 +211,11 @@ type C struct {
|
|||||||
ArchiveCacheTTLHrs int `env:"ORLY_ARCHIVE_CACHE_TTL_HRS" default:"24" usage:"hours to cache query fingerprints to avoid repeated archive requests"`
|
ArchiveCacheTTLHrs int `env:"ORLY_ARCHIVE_CACHE_TTL_HRS" default:"24" usage:"hours to cache query fingerprints to avoid repeated archive requests"`
|
||||||
|
|
||||||
// Storage management configuration (access-based garbage collection)
|
// Storage management configuration (access-based garbage collection)
|
||||||
|
// TODO: GC implementation needs batch transaction handling to avoid Badger race conditions
|
||||||
|
// TODO: GC should use smaller batches with delays between transactions on large datasets
|
||||||
|
// TODO: GC deletion should be serialized or use transaction pools to prevent concurrent txn issues
|
||||||
MaxStorageBytes int64 `env:"ORLY_MAX_STORAGE_BYTES" default:"0" usage:"maximum storage in bytes (0=auto-detect 80%% of filesystem)"`
|
MaxStorageBytes int64 `env:"ORLY_MAX_STORAGE_BYTES" default:"0" usage:"maximum storage in bytes (0=auto-detect 80%% of filesystem)"`
|
||||||
GCEnabled bool `env:"ORLY_GC_ENABLED" default:"true" usage:"enable continuous garbage collection based on access patterns"`
|
GCEnabled bool `env:"ORLY_GC_ENABLED" default:"false" usage:"enable continuous garbage collection based on access patterns (EXPERIMENTAL - may cause crashes under load)"`
|
||||||
GCIntervalSec int `env:"ORLY_GC_INTERVAL_SEC" default:"60" usage:"seconds between GC runs when storage exceeds limit"`
|
GCIntervalSec int `env:"ORLY_GC_INTERVAL_SEC" default:"60" usage:"seconds between GC runs when storage exceeds limit"`
|
||||||
GCBatchSize int `env:"ORLY_GC_BATCH_SIZE" default:"1000" usage:"number of events to consider per GC run"`
|
GCBatchSize int `env:"ORLY_GC_BATCH_SIZE" default:"1000" usage:"number of events to consider per GC run"`
|
||||||
|
|
||||||
|
|||||||
@@ -316,8 +316,27 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
|
|||||||
|
|
||||||
// Collect all events from all filters
|
// Collect all events from all filters
|
||||||
var allEvents event.S
|
var allEvents event.S
|
||||||
|
|
||||||
|
// Server-side query result limit to prevent memory exhaustion
|
||||||
|
serverLimit := l.Config.QueryResultLimit
|
||||||
|
if serverLimit <= 0 {
|
||||||
|
serverLimit = 256 // Default if not configured
|
||||||
|
}
|
||||||
|
|
||||||
for _, f := range *env.Filters {
|
for _, f := range *env.Filters {
|
||||||
if f != nil {
|
if f != nil {
|
||||||
|
// Enforce server-side limit on each filter
|
||||||
|
if serverLimit > 0 {
|
||||||
|
if f.Limit == nil {
|
||||||
|
// No client limit - apply server limit
|
||||||
|
limitVal := uint(serverLimit)
|
||||||
|
f.Limit = &limitVal
|
||||||
|
} else if int(*f.Limit) > serverLimit {
|
||||||
|
// Client limit exceeds server limit - cap it
|
||||||
|
limitVal := uint(serverLimit)
|
||||||
|
f.Limit = &limitVal
|
||||||
|
}
|
||||||
|
}
|
||||||
// Summarize filter details for diagnostics (avoid internal fields)
|
// Summarize filter details for diagnostics (avoid internal fields)
|
||||||
var kindsLen int
|
var kindsLen int
|
||||||
if f.Kinds != nil {
|
if f.Kinds != nil {
|
||||||
@@ -715,6 +734,10 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
|
|||||||
if l.accessTracker != nil && len(events) > 0 {
|
if l.accessTracker != nil && len(events) > 0 {
|
||||||
go func(evts event.S, connID string) {
|
go func(evts event.S, connID string) {
|
||||||
for _, ev := range evts {
|
for _, ev := range evts {
|
||||||
|
// Validate event ID before calling GetSerialById
|
||||||
|
if len(ev.ID) != 32 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
if ser, err := l.DB.GetSerialById(ev.ID); err == nil && ser != nil {
|
if ser, err := l.DB.GetSerialById(ev.ID); err == nil && ser != nil {
|
||||||
l.accessTracker.RecordAccess(ser.Get(), connID)
|
l.accessTracker.RecordAccess(ser.Get(), connID)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,6 +56,42 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
whitelist:
|
whitelist:
|
||||||
|
// Extract IP from remote (strip port)
|
||||||
|
ip := remote
|
||||||
|
if idx := strings.LastIndex(remote, ":"); idx != -1 {
|
||||||
|
ip = remote[:idx]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check per-IP connection limit (hard limit 40, default 25)
|
||||||
|
maxConnPerIP := s.Config.MaxConnectionsPerIP
|
||||||
|
if maxConnPerIP <= 0 {
|
||||||
|
maxConnPerIP = 25
|
||||||
|
}
|
||||||
|
if maxConnPerIP > 40 {
|
||||||
|
maxConnPerIP = 40 // Hard limit
|
||||||
|
}
|
||||||
|
|
||||||
|
s.connPerIPMu.Lock()
|
||||||
|
currentConns := s.connPerIP[ip]
|
||||||
|
if currentConns >= maxConnPerIP {
|
||||||
|
s.connPerIPMu.Unlock()
|
||||||
|
log.W.F("connection limit exceeded for IP %s: %d/%d connections", ip, currentConns, maxConnPerIP)
|
||||||
|
http.Error(w, "too many connections from your IP", http.StatusTooManyRequests)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.connPerIP[ip]++
|
||||||
|
s.connPerIPMu.Unlock()
|
||||||
|
|
||||||
|
// Decrement connection count when this function returns
|
||||||
|
defer func() {
|
||||||
|
s.connPerIPMu.Lock()
|
||||||
|
s.connPerIP[ip]--
|
||||||
|
if s.connPerIP[ip] <= 0 {
|
||||||
|
delete(s.connPerIP, ip)
|
||||||
|
}
|
||||||
|
s.connPerIPMu.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
// Create an independent context for this connection
|
// Create an independent context for this connection
|
||||||
// This context will be cancelled when the connection closes or server shuts down
|
// This context will be cancelled when the connection closes or server shuts down
|
||||||
ctx, cancel := context.WithCancel(s.Ctx)
|
ctx, cancel := context.WithCancel(s.Ctx)
|
||||||
@@ -64,8 +100,10 @@ whitelist:
|
|||||||
var conn *websocket.Conn
|
var conn *websocket.Conn
|
||||||
|
|
||||||
// Configure upgrader for this connection
|
// Configure upgrader for this connection
|
||||||
upgrader.ReadBufferSize = int(DefaultMaxMessageSize)
|
// Use reasonable buffer sizes (64KB) instead of max message size (10MB)
|
||||||
upgrader.WriteBufferSize = int(DefaultMaxMessageSize)
|
// to prevent memory exhaustion with many connections
|
||||||
|
upgrader.ReadBufferSize = 64 * 1024 // 64KB
|
||||||
|
upgrader.WriteBufferSize = 64 * 1024 // 64KB
|
||||||
|
|
||||||
if conn, err = upgrader.Upgrade(w, r, nil); chk.E(err) {
|
if conn, err = upgrader.Upgrade(w, r, nil); chk.E(err) {
|
||||||
log.E.F("websocket accept failed from %s: %v", remote, err)
|
log.E.F("websocket accept failed from %s: %v", remote, err)
|
||||||
|
|||||||
@@ -87,6 +87,7 @@ func Run(
|
|||||||
rateLimiter: limiter,
|
rateLimiter: limiter,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
db: db,
|
db: db,
|
||||||
|
connPerIP: make(map[string]int),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize branding/white-label manager if enabled
|
// Initialize branding/white-label manager if enabled
|
||||||
|
|||||||
@@ -57,6 +57,10 @@ type Server struct {
|
|||||||
// optional reverse proxy for dev web server
|
// optional reverse proxy for dev web server
|
||||||
devProxy *httputil.ReverseProxy
|
devProxy *httputil.ReverseProxy
|
||||||
|
|
||||||
|
// Per-IP connection tracking to prevent resource exhaustion
|
||||||
|
connPerIPMu sync.RWMutex
|
||||||
|
connPerIP map[string]int
|
||||||
|
|
||||||
// Challenge storage for HTTP UI authentication
|
// Challenge storage for HTTP UI authentication
|
||||||
challengeMutex sync.RWMutex
|
challengeMutex sync.RWMutex
|
||||||
challenges map[string][]byte
|
challenges map[string][]byte
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ import (
|
|||||||
"github.com/dgraph-io/badger/v4/options"
|
"github.com/dgraph-io/badger/v4/options"
|
||||||
"lol.mleku.dev"
|
"lol.mleku.dev"
|
||||||
"lol.mleku.dev/chk"
|
"lol.mleku.dev/chk"
|
||||||
|
"lol.mleku.dev/log"
|
||||||
"next.orly.dev/pkg/database/querycache"
|
"next.orly.dev/pkg/database/querycache"
|
||||||
"git.mleku.dev/mleku/nostr/encoders/event"
|
"git.mleku.dev/mleku/nostr/encoders/event"
|
||||||
"git.mleku.dev/mleku/nostr/encoders/filter"
|
"git.mleku.dev/mleku/nostr/encoders/filter"
|
||||||
@@ -20,6 +21,12 @@ import (
|
|||||||
"git.mleku.dev/mleku/nostr/utils/units"
|
"git.mleku.dev/mleku/nostr/utils/units"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// DefaultMaxConcurrentQueries limits concurrent database queries to prevent memory exhaustion.
|
||||||
|
// Each query creates Badger iterators that consume significant memory. With many concurrent
|
||||||
|
// connections, unlimited queries can exhaust available memory in seconds.
|
||||||
|
// Set very low (3) because each query can internally create many iterators.
|
||||||
|
const DefaultMaxConcurrentQueries = 3
|
||||||
|
|
||||||
// RateLimiterInterface defines the minimal interface for rate limiting during import
|
// RateLimiterInterface defines the minimal interface for rate limiting during import
|
||||||
type RateLimiterInterface interface {
|
type RateLimiterInterface interface {
|
||||||
IsEnabled() bool
|
IsEnabled() bool
|
||||||
@@ -47,6 +54,10 @@ type D struct {
|
|||||||
|
|
||||||
// Rate limiter for controlling memory pressure during bulk operations
|
// Rate limiter for controlling memory pressure during bulk operations
|
||||||
rateLimiter RateLimiterInterface
|
rateLimiter RateLimiterInterface
|
||||||
|
|
||||||
|
// Query semaphore limits concurrent database queries to prevent memory exhaustion.
|
||||||
|
// Each query with iterators consumes significant memory; this prevents OOM under load.
|
||||||
|
querySem chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetRateLimiter sets the rate limiter for controlling memory during import/export
|
// SetRateLimiter sets the rate limiter for controlling memory during import/export
|
||||||
@@ -139,6 +150,7 @@ func NewWithConfig(
|
|||||||
ready: make(chan struct{}),
|
ready: make(chan struct{}),
|
||||||
queryCache: qc,
|
queryCache: qc,
|
||||||
serialCache: NewSerialCache(serialCachePubkeys, serialCacheEventIds),
|
serialCache: NewSerialCache(serialCachePubkeys, serialCacheEventIds),
|
||||||
|
querySem: make(chan struct{}, DefaultMaxConcurrentQueries),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure the data directory exists
|
// Ensure the data directory exists
|
||||||
@@ -353,3 +365,24 @@ func (d *D) Close() (err error) {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AcquireQuerySlot acquires a slot from the query semaphore to limit concurrent queries.
|
||||||
|
// This blocks until a slot is available or the context is cancelled.
|
||||||
|
// Returns true if slot was acquired, false if context cancelled.
|
||||||
|
func (d *D) AcquireQuerySlot(ctx context.Context) bool {
|
||||||
|
select {
|
||||||
|
case d.querySem <- struct{}{}:
|
||||||
|
return true
|
||||||
|
case <-ctx.Done():
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReleaseQuerySlot releases a previously acquired query slot.
|
||||||
|
func (d *D) ReleaseQuerySlot() {
|
||||||
|
select {
|
||||||
|
case <-d.querySem:
|
||||||
|
default:
|
||||||
|
log.W.F("ReleaseQuerySlot called without matching AcquireQuerySlot")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -33,6 +33,14 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*ev
|
|||||||
|
|
||||||
if err = d.View(
|
if err = d.View(
|
||||||
func(txn *badger.Txn) (err error) {
|
func(txn *badger.Txn) (err error) {
|
||||||
|
// Create ONE iterator for sev prefix lookups - reused for all serials
|
||||||
|
// This dramatically reduces memory usage vs creating one per serial
|
||||||
|
sevOpts := badger.DefaultIteratorOptions
|
||||||
|
sevOpts.PrefetchValues = false // We read from key, not value
|
||||||
|
sevOpts.PrefetchSize = 1
|
||||||
|
sevIt := txn.NewIterator(sevOpts)
|
||||||
|
defer sevIt.Close()
|
||||||
|
|
||||||
for _, ser := range serials {
|
for _, ser := range serials {
|
||||||
var ev *event.E
|
var ev *event.E
|
||||||
serialVal := ser.Get()
|
serialVal := ser.Get()
|
||||||
@@ -45,8 +53,8 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*ev
|
|||||||
}
|
}
|
||||||
err = nil // Reset error, try legacy formats
|
err = nil // Reset error, try legacy formats
|
||||||
|
|
||||||
// Try sev (small event inline) prefix - legacy Reiser4 optimization
|
// Try sev (small event inline) using shared iterator
|
||||||
ev, err = d.fetchSmallEvent(txn, ser)
|
ev, err = d.fetchSmallEventWithIterator(txn, ser, sevIt)
|
||||||
if err == nil && ev != nil {
|
if err == nil && ev != nil {
|
||||||
events[serialVal] = ev
|
events[serialVal] = ev
|
||||||
continue
|
continue
|
||||||
@@ -70,6 +78,61 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*ev
|
|||||||
return events, nil
|
return events, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fetchSmallEventWithIterator uses a provided iterator for sev lookups (memory efficient)
|
||||||
|
func (d *D) fetchSmallEventWithIterator(txn *badger.Txn, ser *types.Uint40, it *badger.Iterator) (ev *event.E, err error) {
|
||||||
|
smallBuf := new(bytes.Buffer)
|
||||||
|
if err = indexes.SmallEventEnc(ser).MarshalWrite(smallBuf); chk.E(err) {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
prefix := smallBuf.Bytes()
|
||||||
|
|
||||||
|
// Seek to the prefix for this serial
|
||||||
|
it.Seek(prefix)
|
||||||
|
if !it.ValidForPrefix(prefix) {
|
||||||
|
return nil, nil // Not found
|
||||||
|
}
|
||||||
|
|
||||||
|
// Found in sev table - extract inline data
|
||||||
|
key := it.Item().KeyCopy(nil) // Copy key as iterator may be reused
|
||||||
|
// Key format: sev|serial|size_uint16|event_data
|
||||||
|
if len(key) <= 8+2 { // prefix(3) + serial(5) + size(2) = 10 bytes minimum
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
sizeIdx := 8 // After sev(3) + serial(5)
|
||||||
|
// Read uint16 big-endian size
|
||||||
|
size := int(key[sizeIdx])<<8 | int(key[sizeIdx+1])
|
||||||
|
dataStart := sizeIdx + 2
|
||||||
|
|
||||||
|
if len(key) < dataStart+size {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
eventData := key[dataStart : dataStart+size]
|
||||||
|
|
||||||
|
// Check if this is compact format (starts with version byte 1)
|
||||||
|
if len(eventData) > 0 && eventData[0] == CompactFormatVersion {
|
||||||
|
// This is compact format stored in sev - need to decode with resolver
|
||||||
|
resolver := NewDatabaseSerialResolver(d, d.serialCache)
|
||||||
|
eventId, idErr := d.GetEventIdBySerial(ser)
|
||||||
|
if idErr != nil {
|
||||||
|
// Cannot decode compact format without event ID - return error
|
||||||
|
// DO NOT fall back to legacy unmarshal as compact format is not valid legacy format
|
||||||
|
log.W.F("fetchSmallEventWithIterator: compact format but no event ID mapping for serial %d: %v", ser.Get(), idErr)
|
||||||
|
return nil, idErr
|
||||||
|
}
|
||||||
|
return UnmarshalCompactEvent(eventData, eventId, resolver)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Legacy binary format
|
||||||
|
ev = new(event.E)
|
||||||
|
if err = ev.UnmarshalBinary(bytes.NewBuffer(eventData)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ev, nil
|
||||||
|
}
|
||||||
|
|
||||||
// fetchCompactEvent tries to fetch an event from the compact format (cmp prefix).
|
// fetchCompactEvent tries to fetch an event from the compact format (cmp prefix).
|
||||||
func (d *D) fetchCompactEvent(txn *badger.Txn, ser *types.Uint40, resolver SerialResolver) (ev *event.E, err error) {
|
func (d *D) fetchCompactEvent(txn *badger.Txn, ser *types.Uint40, resolver SerialResolver) (ev *event.E, err error) {
|
||||||
// Build cmp key
|
// Build cmp key
|
||||||
|
|||||||
@@ -21,7 +21,8 @@ import (
|
|||||||
func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
|
func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
|
||||||
// log.T.F("GetSerialById: input id=%s", hex.Enc(id))
|
// log.T.F("GetSerialById: input id=%s", hex.Enc(id))
|
||||||
if len(id) == 0 {
|
if len(id) == 0 {
|
||||||
err = errorf.E("GetSerialById: called with empty ID")
|
// Return error without logging - caller should validate ID before calling
|
||||||
|
err = errorf.E("empty event ID")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var idxs []Range
|
var idxs []Range
|
||||||
|
|||||||
@@ -54,6 +54,13 @@ func (d *D) QueryAllVersions(c context.Context, f *filter.F) (
|
|||||||
func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDeleteEvents bool, showAllVersions bool) (
|
func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDeleteEvents bool, showAllVersions bool) (
|
||||||
evs event.S, err error,
|
evs event.S, err error,
|
||||||
) {
|
) {
|
||||||
|
// Acquire query slot to limit concurrent queries and prevent memory exhaustion
|
||||||
|
if !d.AcquireQuerySlot(c) {
|
||||||
|
err = c.Err()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer d.ReleaseQuerySlot()
|
||||||
|
|
||||||
// Determine if we should return multiple versions of replaceable events
|
// Determine if we should return multiple versions of replaceable events
|
||||||
// based on the limit parameter
|
// based on the limit parameter
|
||||||
wantMultipleVersions := showAllVersions || (f.Limit != nil && *f.Limit > 1)
|
wantMultipleVersions := showAllVersions || (f.Limit != nil && *f.Limit > 1)
|
||||||
@@ -310,18 +317,13 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If not found in current batch, try to fetch it directly
|
// DISABLED: Fetching target events not in current batch causes memory
|
||||||
|
// exhaustion under high concurrent load. Each GetSerialById call creates
|
||||||
|
// a Badger iterator that consumes significant memory. With many connections
|
||||||
|
// processing deletion events, this explodes memory usage (~300MB/connection).
|
||||||
|
// The deletion will still work when the target event is directly queried.
|
||||||
if targetEv == nil {
|
if targetEv == nil {
|
||||||
// Get serial for the event ID
|
continue
|
||||||
ser, serErr := d.GetSerialById(evId)
|
|
||||||
if serErr != nil || ser == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Fetch the event by serial
|
|
||||||
targetEv, serErr = d.FetchEventBySerial(ser)
|
|
||||||
if serErr != nil || targetEv == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only allow users to delete their own events
|
// Only allow users to delete their own events
|
||||||
@@ -590,14 +592,14 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete
|
|||||||
if f.Limit != nil && len(evs) > int(*f.Limit) {
|
if f.Limit != nil && len(evs) > int(*f.Limit) {
|
||||||
evs = evs[:*f.Limit]
|
evs = evs[:*f.Limit]
|
||||||
}
|
}
|
||||||
// delete the expired events in a background thread
|
// TODO: DISABLED - inline deletion of expired events causes Badger race conditions
|
||||||
go func() {
|
// under high concurrent load ("assignment to entry in nil map" panic).
|
||||||
for i, ser := range expDeletes {
|
// Expired events should be cleaned up by a separate, rate-limited background
|
||||||
if err = d.DeleteEventBySerial(c, ser, expEvs[i]); chk.E(err) {
|
// worker instead of being deleted inline during query processing.
|
||||||
continue
|
// See: pkg/storage/gc.go TODOs for proper batch deletion implementation.
|
||||||
}
|
if len(expDeletes) > 0 {
|
||||||
}
|
log.D.F("QueryEvents: found %d expired events (deletion disabled)", len(expDeletes))
|
||||||
}()
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -2,6 +2,24 @@
|
|||||||
|
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
|
// TODO: IMPORTANT - This GC implementation is EXPERIMENTAL and may cause crashes under high load.
|
||||||
|
// The current implementation has the following issues that need to be addressed:
|
||||||
|
//
|
||||||
|
// 1. Badger race condition: DeleteEventBySerial runs transactions that can trigger
|
||||||
|
// "assignment to entry in nil map" panics in Badger v4.8.0 under concurrent load.
|
||||||
|
// This happens when GC deletes events while many REQ queries are being processed.
|
||||||
|
//
|
||||||
|
// 2. Batch transaction handling: On large datasets (14+ GB), deletions should be:
|
||||||
|
// - Serialized or use a transaction pool to prevent concurrent txn issues
|
||||||
|
// - Batched with proper delays between batches to avoid overwhelming Badger
|
||||||
|
// - Rate-limited based on current system load
|
||||||
|
//
|
||||||
|
// 3. The current 10ms delay every 100 events (line ~237) is insufficient for busy relays.
|
||||||
|
// Consider adaptive rate limiting based on pending transaction count.
|
||||||
|
//
|
||||||
|
// 4. Consider using Badger's WriteBatch API instead of individual Update transactions
|
||||||
|
// for bulk deletions, which may be more efficient and avoid some race conditions.
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
v0.52.7
|
v0.52.17
|
||||||
|
|||||||
Reference in New Issue
Block a user