Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a565244435 | ||
|
|
bb858a0d6f |
@@ -146,6 +146,7 @@ type C struct {
|
|||||||
|
|
||||||
// Connection concurrency control
|
// Connection concurrency control
|
||||||
MaxHandlersPerConnection int `env:"ORLY_MAX_HANDLERS_PER_CONN" default:"100" usage:"max concurrent message handlers per WebSocket connection (limits goroutine growth under load)"`
|
MaxHandlersPerConnection int `env:"ORLY_MAX_HANDLERS_PER_CONN" default:"100" usage:"max concurrent message handlers per WebSocket connection (limits goroutine growth under load)"`
|
||||||
|
MaxConnectionsPerIP int `env:"ORLY_MAX_CONN_PER_IP" default:"25" usage:"max WebSocket connections per IP address (prevents resource exhaustion, hard limit 40)"`
|
||||||
|
|
||||||
// Adaptive rate limiting (PID-controlled)
|
// Adaptive rate limiting (PID-controlled)
|
||||||
RateLimitEnabled bool `env:"ORLY_RATE_LIMIT_ENABLED" default:"true" usage:"enable adaptive PID-controlled rate limiting for database operations"`
|
RateLimitEnabled bool `env:"ORLY_RATE_LIMIT_ENABLED" default:"true" usage:"enable adaptive PID-controlled rate limiting for database operations"`
|
||||||
|
|||||||
@@ -56,6 +56,42 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
whitelist:
|
whitelist:
|
||||||
|
// Extract IP from remote (strip port)
|
||||||
|
ip := remote
|
||||||
|
if idx := strings.LastIndex(remote, ":"); idx != -1 {
|
||||||
|
ip = remote[:idx]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check per-IP connection limit (hard limit 40, default 25)
|
||||||
|
maxConnPerIP := s.Config.MaxConnectionsPerIP
|
||||||
|
if maxConnPerIP <= 0 {
|
||||||
|
maxConnPerIP = 25
|
||||||
|
}
|
||||||
|
if maxConnPerIP > 40 {
|
||||||
|
maxConnPerIP = 40 // Hard limit
|
||||||
|
}
|
||||||
|
|
||||||
|
s.connPerIPMu.Lock()
|
||||||
|
currentConns := s.connPerIP[ip]
|
||||||
|
if currentConns >= maxConnPerIP {
|
||||||
|
s.connPerIPMu.Unlock()
|
||||||
|
log.W.F("connection limit exceeded for IP %s: %d/%d connections", ip, currentConns, maxConnPerIP)
|
||||||
|
http.Error(w, "too many connections from your IP", http.StatusTooManyRequests)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.connPerIP[ip]++
|
||||||
|
s.connPerIPMu.Unlock()
|
||||||
|
|
||||||
|
// Decrement connection count when this function returns
|
||||||
|
defer func() {
|
||||||
|
s.connPerIPMu.Lock()
|
||||||
|
s.connPerIP[ip]--
|
||||||
|
if s.connPerIP[ip] <= 0 {
|
||||||
|
delete(s.connPerIP, ip)
|
||||||
|
}
|
||||||
|
s.connPerIPMu.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
// Create an independent context for this connection
|
// Create an independent context for this connection
|
||||||
// This context will be cancelled when the connection closes or server shuts down
|
// This context will be cancelled when the connection closes or server shuts down
|
||||||
ctx, cancel := context.WithCancel(s.Ctx)
|
ctx, cancel := context.WithCancel(s.Ctx)
|
||||||
|
|||||||
@@ -87,6 +87,7 @@ func Run(
|
|||||||
rateLimiter: limiter,
|
rateLimiter: limiter,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
db: db,
|
db: db,
|
||||||
|
connPerIP: make(map[string]int),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize branding/white-label manager if enabled
|
// Initialize branding/white-label manager if enabled
|
||||||
|
|||||||
@@ -57,6 +57,10 @@ type Server struct {
|
|||||||
// optional reverse proxy for dev web server
|
// optional reverse proxy for dev web server
|
||||||
devProxy *httputil.ReverseProxy
|
devProxy *httputil.ReverseProxy
|
||||||
|
|
||||||
|
// Per-IP connection tracking to prevent resource exhaustion
|
||||||
|
connPerIPMu sync.RWMutex
|
||||||
|
connPerIP map[string]int
|
||||||
|
|
||||||
// Challenge storage for HTTP UI authentication
|
// Challenge storage for HTTP UI authentication
|
||||||
challengeMutex sync.RWMutex
|
challengeMutex sync.RWMutex
|
||||||
challenges map[string][]byte
|
challenges map[string][]byte
|
||||||
|
|||||||
@@ -590,14 +590,14 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete
|
|||||||
if f.Limit != nil && len(evs) > int(*f.Limit) {
|
if f.Limit != nil && len(evs) > int(*f.Limit) {
|
||||||
evs = evs[:*f.Limit]
|
evs = evs[:*f.Limit]
|
||||||
}
|
}
|
||||||
// delete the expired events in a background thread
|
// TODO: DISABLED - inline deletion of expired events causes Badger race conditions
|
||||||
go func() {
|
// under high concurrent load ("assignment to entry in nil map" panic).
|
||||||
for i, ser := range expDeletes {
|
// Expired events should be cleaned up by a separate, rate-limited background
|
||||||
if err = d.DeleteEventBySerial(c, ser, expEvs[i]); chk.E(err) {
|
// worker instead of being deleted inline during query processing.
|
||||||
continue
|
// See: pkg/storage/gc.go TODOs for proper batch deletion implementation.
|
||||||
}
|
if len(expDeletes) > 0 {
|
||||||
}
|
log.D.F("QueryEvents: found %d expired events (deletion disabled)", len(expDeletes))
|
||||||
}()
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
v0.52.8
|
v0.52.10
|
||||||
|
|||||||
Reference in New Issue
Block a user