Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a565244435 | ||
|
|
bb858a0d6f | ||
|
|
b478845e1c |
@@ -146,6 +146,7 @@ type C struct {
|
|||||||
|
|
||||||
// Connection concurrency control
|
// Connection concurrency control
|
||||||
MaxHandlersPerConnection int `env:"ORLY_MAX_HANDLERS_PER_CONN" default:"100" usage:"max concurrent message handlers per WebSocket connection (limits goroutine growth under load)"`
|
MaxHandlersPerConnection int `env:"ORLY_MAX_HANDLERS_PER_CONN" default:"100" usage:"max concurrent message handlers per WebSocket connection (limits goroutine growth under load)"`
|
||||||
|
MaxConnectionsPerIP int `env:"ORLY_MAX_CONN_PER_IP" default:"25" usage:"max WebSocket connections per IP address (prevents resource exhaustion, hard limit 40)"`
|
||||||
|
|
||||||
// Adaptive rate limiting (PID-controlled)
|
// Adaptive rate limiting (PID-controlled)
|
||||||
RateLimitEnabled bool `env:"ORLY_RATE_LIMIT_ENABLED" default:"true" usage:"enable adaptive PID-controlled rate limiting for database operations"`
|
RateLimitEnabled bool `env:"ORLY_RATE_LIMIT_ENABLED" default:"true" usage:"enable adaptive PID-controlled rate limiting for database operations"`
|
||||||
@@ -207,8 +208,11 @@ type C struct {
|
|||||||
ArchiveCacheTTLHrs int `env:"ORLY_ARCHIVE_CACHE_TTL_HRS" default:"24" usage:"hours to cache query fingerprints to avoid repeated archive requests"`
|
ArchiveCacheTTLHrs int `env:"ORLY_ARCHIVE_CACHE_TTL_HRS" default:"24" usage:"hours to cache query fingerprints to avoid repeated archive requests"`
|
||||||
|
|
||||||
// Storage management configuration (access-based garbage collection)
|
// Storage management configuration (access-based garbage collection)
|
||||||
|
// TODO: GC implementation needs batch transaction handling to avoid Badger race conditions
|
||||||
|
// TODO: GC should use smaller batches with delays between transactions on large datasets
|
||||||
|
// TODO: GC deletion should be serialized or use transaction pools to prevent concurrent txn issues
|
||||||
MaxStorageBytes int64 `env:"ORLY_MAX_STORAGE_BYTES" default:"0" usage:"maximum storage in bytes (0=auto-detect 80%% of filesystem)"`
|
MaxStorageBytes int64 `env:"ORLY_MAX_STORAGE_BYTES" default:"0" usage:"maximum storage in bytes (0=auto-detect 80%% of filesystem)"`
|
||||||
GCEnabled bool `env:"ORLY_GC_ENABLED" default:"true" usage:"enable continuous garbage collection based on access patterns"`
|
GCEnabled bool `env:"ORLY_GC_ENABLED" default:"false" usage:"enable continuous garbage collection based on access patterns (EXPERIMENTAL - may cause crashes under load)"`
|
||||||
GCIntervalSec int `env:"ORLY_GC_INTERVAL_SEC" default:"60" usage:"seconds between GC runs when storage exceeds limit"`
|
GCIntervalSec int `env:"ORLY_GC_INTERVAL_SEC" default:"60" usage:"seconds between GC runs when storage exceeds limit"`
|
||||||
GCBatchSize int `env:"ORLY_GC_BATCH_SIZE" default:"1000" usage:"number of events to consider per GC run"`
|
GCBatchSize int `env:"ORLY_GC_BATCH_SIZE" default:"1000" usage:"number of events to consider per GC run"`
|
||||||
|
|
||||||
|
|||||||
@@ -56,6 +56,42 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
whitelist:
|
whitelist:
|
||||||
|
// Extract IP from remote (strip port)
|
||||||
|
ip := remote
|
||||||
|
if idx := strings.LastIndex(remote, ":"); idx != -1 {
|
||||||
|
ip = remote[:idx]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check per-IP connection limit (hard limit 40, default 25)
|
||||||
|
maxConnPerIP := s.Config.MaxConnectionsPerIP
|
||||||
|
if maxConnPerIP <= 0 {
|
||||||
|
maxConnPerIP = 25
|
||||||
|
}
|
||||||
|
if maxConnPerIP > 40 {
|
||||||
|
maxConnPerIP = 40 // Hard limit
|
||||||
|
}
|
||||||
|
|
||||||
|
s.connPerIPMu.Lock()
|
||||||
|
currentConns := s.connPerIP[ip]
|
||||||
|
if currentConns >= maxConnPerIP {
|
||||||
|
s.connPerIPMu.Unlock()
|
||||||
|
log.W.F("connection limit exceeded for IP %s: %d/%d connections", ip, currentConns, maxConnPerIP)
|
||||||
|
http.Error(w, "too many connections from your IP", http.StatusTooManyRequests)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.connPerIP[ip]++
|
||||||
|
s.connPerIPMu.Unlock()
|
||||||
|
|
||||||
|
// Decrement connection count when this function returns
|
||||||
|
defer func() {
|
||||||
|
s.connPerIPMu.Lock()
|
||||||
|
s.connPerIP[ip]--
|
||||||
|
if s.connPerIP[ip] <= 0 {
|
||||||
|
delete(s.connPerIP, ip)
|
||||||
|
}
|
||||||
|
s.connPerIPMu.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
// Create an independent context for this connection
|
// Create an independent context for this connection
|
||||||
// This context will be cancelled when the connection closes or server shuts down
|
// This context will be cancelled when the connection closes or server shuts down
|
||||||
ctx, cancel := context.WithCancel(s.Ctx)
|
ctx, cancel := context.WithCancel(s.Ctx)
|
||||||
|
|||||||
@@ -87,6 +87,7 @@ func Run(
|
|||||||
rateLimiter: limiter,
|
rateLimiter: limiter,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
db: db,
|
db: db,
|
||||||
|
connPerIP: make(map[string]int),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize branding/white-label manager if enabled
|
// Initialize branding/white-label manager if enabled
|
||||||
|
|||||||
@@ -57,6 +57,10 @@ type Server struct {
|
|||||||
// optional reverse proxy for dev web server
|
// optional reverse proxy for dev web server
|
||||||
devProxy *httputil.ReverseProxy
|
devProxy *httputil.ReverseProxy
|
||||||
|
|
||||||
|
// Per-IP connection tracking to prevent resource exhaustion
|
||||||
|
connPerIPMu sync.RWMutex
|
||||||
|
connPerIP map[string]int
|
||||||
|
|
||||||
// Challenge storage for HTTP UI authentication
|
// Challenge storage for HTTP UI authentication
|
||||||
challengeMutex sync.RWMutex
|
challengeMutex sync.RWMutex
|
||||||
challenges map[string][]byte
|
challenges map[string][]byte
|
||||||
|
|||||||
@@ -590,15 +590,15 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete
|
|||||||
if f.Limit != nil && len(evs) > int(*f.Limit) {
|
if f.Limit != nil && len(evs) > int(*f.Limit) {
|
||||||
evs = evs[:*f.Limit]
|
evs = evs[:*f.Limit]
|
||||||
}
|
}
|
||||||
// delete the expired events in a background thread
|
// TODO: DISABLED - inline deletion of expired events causes Badger race conditions
|
||||||
go func() {
|
// under high concurrent load ("assignment to entry in nil map" panic).
|
||||||
for i, ser := range expDeletes {
|
// Expired events should be cleaned up by a separate, rate-limited background
|
||||||
if err = d.DeleteEventBySerial(c, ser, expEvs[i]); chk.E(err) {
|
// worker instead of being deleted inline during query processing.
|
||||||
continue
|
// See: pkg/storage/gc.go TODOs for proper batch deletion implementation.
|
||||||
|
if len(expDeletes) > 0 {
|
||||||
|
log.D.F("QueryEvents: found %d expired events (deletion disabled)", len(expDeletes))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,24 @@
|
|||||||
|
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
|
// TODO: IMPORTANT - This GC implementation is EXPERIMENTAL and may cause crashes under high load.
|
||||||
|
// The current implementation has the following issues that need to be addressed:
|
||||||
|
//
|
||||||
|
// 1. Badger race condition: DeleteEventBySerial runs transactions that can trigger
|
||||||
|
// "assignment to entry in nil map" panics in Badger v4.8.0 under concurrent load.
|
||||||
|
// This happens when GC deletes events while many REQ queries are being processed.
|
||||||
|
//
|
||||||
|
// 2. Batch transaction handling: On large datasets (14+ GB), deletions should be:
|
||||||
|
// - Serialized or use a transaction pool to prevent concurrent txn issues
|
||||||
|
// - Batched with proper delays between batches to avoid overwhelming Badger
|
||||||
|
// - Rate-limited based on current system load
|
||||||
|
//
|
||||||
|
// 3. The current 10ms delay every 100 events (line ~237) is insufficient for busy relays.
|
||||||
|
// Consider adaptive rate limiting based on pending transaction count.
|
||||||
|
//
|
||||||
|
// 4. Consider using Badger's WriteBatch API instead of individual Update transactions
|
||||||
|
// for bulk deletions, which may be more efficient and avoid some race conditions.
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
v0.52.7
|
v0.52.10
|
||||||
|
|||||||
Reference in New Issue
Block a user