Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a565244435 | ||
|
|
bb858a0d6f |
@@ -146,6 +146,7 @@ type C struct {
|
||||
|
||||
// Connection concurrency control
|
||||
MaxHandlersPerConnection int `env:"ORLY_MAX_HANDLERS_PER_CONN" default:"100" usage:"max concurrent message handlers per WebSocket connection (limits goroutine growth under load)"`
|
||||
MaxConnectionsPerIP int `env:"ORLY_MAX_CONN_PER_IP" default:"25" usage:"max WebSocket connections per IP address (prevents resource exhaustion, hard limit 40)"`
|
||||
|
||||
// Adaptive rate limiting (PID-controlled)
|
||||
RateLimitEnabled bool `env:"ORLY_RATE_LIMIT_ENABLED" default:"true" usage:"enable adaptive PID-controlled rate limiting for database operations"`
|
||||
|
||||
@@ -56,6 +56,42 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
whitelist:
|
||||
// Extract IP from remote (strip port)
|
||||
ip := remote
|
||||
if idx := strings.LastIndex(remote, ":"); idx != -1 {
|
||||
ip = remote[:idx]
|
||||
}
|
||||
|
||||
// Check per-IP connection limit (hard limit 40, default 25)
|
||||
maxConnPerIP := s.Config.MaxConnectionsPerIP
|
||||
if maxConnPerIP <= 0 {
|
||||
maxConnPerIP = 25
|
||||
}
|
||||
if maxConnPerIP > 40 {
|
||||
maxConnPerIP = 40 // Hard limit
|
||||
}
|
||||
|
||||
s.connPerIPMu.Lock()
|
||||
currentConns := s.connPerIP[ip]
|
||||
if currentConns >= maxConnPerIP {
|
||||
s.connPerIPMu.Unlock()
|
||||
log.W.F("connection limit exceeded for IP %s: %d/%d connections", ip, currentConns, maxConnPerIP)
|
||||
http.Error(w, "too many connections from your IP", http.StatusTooManyRequests)
|
||||
return
|
||||
}
|
||||
s.connPerIP[ip]++
|
||||
s.connPerIPMu.Unlock()
|
||||
|
||||
// Decrement connection count when this function returns
|
||||
defer func() {
|
||||
s.connPerIPMu.Lock()
|
||||
s.connPerIP[ip]--
|
||||
if s.connPerIP[ip] <= 0 {
|
||||
delete(s.connPerIP, ip)
|
||||
}
|
||||
s.connPerIPMu.Unlock()
|
||||
}()
|
||||
|
||||
// Create an independent context for this connection
|
||||
// This context will be cancelled when the connection closes or server shuts down
|
||||
ctx, cancel := context.WithCancel(s.Ctx)
|
||||
|
||||
@@ -87,6 +87,7 @@ func Run(
|
||||
rateLimiter: limiter,
|
||||
cfg: cfg,
|
||||
db: db,
|
||||
connPerIP: make(map[string]int),
|
||||
}
|
||||
|
||||
// Initialize branding/white-label manager if enabled
|
||||
|
||||
@@ -57,6 +57,10 @@ type Server struct {
|
||||
// optional reverse proxy for dev web server
|
||||
devProxy *httputil.ReverseProxy
|
||||
|
||||
// Per-IP connection tracking to prevent resource exhaustion
|
||||
connPerIPMu sync.RWMutex
|
||||
connPerIP map[string]int
|
||||
|
||||
// Challenge storage for HTTP UI authentication
|
||||
challengeMutex sync.RWMutex
|
||||
challenges map[string][]byte
|
||||
|
||||
@@ -590,14 +590,14 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete
|
||||
if f.Limit != nil && len(evs) > int(*f.Limit) {
|
||||
evs = evs[:*f.Limit]
|
||||
}
|
||||
// delete the expired events in a background thread
|
||||
go func() {
|
||||
for i, ser := range expDeletes {
|
||||
if err = d.DeleteEventBySerial(c, ser, expEvs[i]); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
}()
|
||||
// TODO: DISABLED - inline deletion of expired events causes Badger race conditions
|
||||
// under high concurrent load ("assignment to entry in nil map" panic).
|
||||
// Expired events should be cleaned up by a separate, rate-limited background
|
||||
// worker instead of being deleted inline during query processing.
|
||||
// See: pkg/storage/gc.go TODOs for proper batch deletion implementation.
|
||||
if len(expDeletes) > 0 {
|
||||
log.D.F("QueryEvents: found %d expired events (deletion disabled)", len(expDeletes))
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
@@ -1 +1 @@
|
||||
v0.52.8
|
||||
v0.52.10
|
||||
|
||||
Reference in New Issue
Block a user