Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a768be22e6 | ||
|
|
779f341dda | ||
|
|
a565244435 | ||
|
|
bb858a0d6f | ||
|
|
b478845e1c | ||
|
|
e75e6de59b | ||
|
|
1297a45ee3 |
@@ -146,6 +146,10 @@ type C struct {
|
||||
|
||||
// Connection concurrency control
|
||||
MaxHandlersPerConnection int `env:"ORLY_MAX_HANDLERS_PER_CONN" default:"100" usage:"max concurrent message handlers per WebSocket connection (limits goroutine growth under load)"`
|
||||
MaxConnectionsPerIP int `env:"ORLY_MAX_CONN_PER_IP" default:"25" usage:"max WebSocket connections per IP address (prevents resource exhaustion, hard limit 40)"`
|
||||
|
||||
// Query result limits (prevents memory exhaustion from unbounded queries)
|
||||
QueryResultLimit int `env:"ORLY_QUERY_RESULT_LIMIT" default:"256" usage:"max events returned per REQ filter (prevents unbounded memory usage, 0=unlimited)"`
|
||||
|
||||
// Adaptive rate limiting (PID-controlled)
|
||||
RateLimitEnabled bool `env:"ORLY_RATE_LIMIT_ENABLED" default:"true" usage:"enable adaptive PID-controlled rate limiting for database operations"`
|
||||
@@ -207,8 +211,11 @@ type C struct {
|
||||
ArchiveCacheTTLHrs int `env:"ORLY_ARCHIVE_CACHE_TTL_HRS" default:"24" usage:"hours to cache query fingerprints to avoid repeated archive requests"`
|
||||
|
||||
// Storage management configuration (access-based garbage collection)
|
||||
// TODO: GC implementation needs batch transaction handling to avoid Badger race conditions
|
||||
// TODO: GC should use smaller batches with delays between transactions on large datasets
|
||||
// TODO: GC deletion should be serialized or use transaction pools to prevent concurrent txn issues
|
||||
MaxStorageBytes int64 `env:"ORLY_MAX_STORAGE_BYTES" default:"0" usage:"maximum storage in bytes (0=auto-detect 80%% of filesystem)"`
|
||||
GCEnabled bool `env:"ORLY_GC_ENABLED" default:"true" usage:"enable continuous garbage collection based on access patterns"`
|
||||
GCEnabled bool `env:"ORLY_GC_ENABLED" default:"false" usage:"enable continuous garbage collection based on access patterns (EXPERIMENTAL - may cause crashes under load)"`
|
||||
GCIntervalSec int `env:"ORLY_GC_INTERVAL_SEC" default:"60" usage:"seconds between GC runs when storage exceeds limit"`
|
||||
GCBatchSize int `env:"ORLY_GC_BATCH_SIZE" default:"1000" usage:"number of events to consider per GC run"`
|
||||
|
||||
|
||||
@@ -316,8 +316,27 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
|
||||
|
||||
// Collect all events from all filters
|
||||
var allEvents event.S
|
||||
|
||||
// Server-side query result limit to prevent memory exhaustion
|
||||
serverLimit := l.Config.QueryResultLimit
|
||||
if serverLimit <= 0 {
|
||||
serverLimit = 256 // Default if not configured
|
||||
}
|
||||
|
||||
for _, f := range *env.Filters {
|
||||
if f != nil {
|
||||
// Enforce server-side limit on each filter
|
||||
if serverLimit > 0 {
|
||||
if f.Limit == nil {
|
||||
// No client limit - apply server limit
|
||||
limitVal := uint(serverLimit)
|
||||
f.Limit = &limitVal
|
||||
} else if int(*f.Limit) > serverLimit {
|
||||
// Client limit exceeds server limit - cap it
|
||||
limitVal := uint(serverLimit)
|
||||
f.Limit = &limitVal
|
||||
}
|
||||
}
|
||||
// Summarize filter details for diagnostics (avoid internal fields)
|
||||
var kindsLen int
|
||||
if f.Kinds != nil {
|
||||
|
||||
@@ -56,6 +56,42 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
whitelist:
|
||||
// Extract IP from remote (strip port)
|
||||
ip := remote
|
||||
if idx := strings.LastIndex(remote, ":"); idx != -1 {
|
||||
ip = remote[:idx]
|
||||
}
|
||||
|
||||
// Check per-IP connection limit (hard limit 40, default 25)
|
||||
maxConnPerIP := s.Config.MaxConnectionsPerIP
|
||||
if maxConnPerIP <= 0 {
|
||||
maxConnPerIP = 25
|
||||
}
|
||||
if maxConnPerIP > 40 {
|
||||
maxConnPerIP = 40 // Hard limit
|
||||
}
|
||||
|
||||
s.connPerIPMu.Lock()
|
||||
currentConns := s.connPerIP[ip]
|
||||
if currentConns >= maxConnPerIP {
|
||||
s.connPerIPMu.Unlock()
|
||||
log.W.F("connection limit exceeded for IP %s: %d/%d connections", ip, currentConns, maxConnPerIP)
|
||||
http.Error(w, "too many connections from your IP", http.StatusTooManyRequests)
|
||||
return
|
||||
}
|
||||
s.connPerIP[ip]++
|
||||
s.connPerIPMu.Unlock()
|
||||
|
||||
// Decrement connection count when this function returns
|
||||
defer func() {
|
||||
s.connPerIPMu.Lock()
|
||||
s.connPerIP[ip]--
|
||||
if s.connPerIP[ip] <= 0 {
|
||||
delete(s.connPerIP, ip)
|
||||
}
|
||||
s.connPerIPMu.Unlock()
|
||||
}()
|
||||
|
||||
// Create an independent context for this connection
|
||||
// This context will be cancelled when the connection closes or server shuts down
|
||||
ctx, cancel := context.WithCancel(s.Ctx)
|
||||
@@ -64,8 +100,10 @@ whitelist:
|
||||
var conn *websocket.Conn
|
||||
|
||||
// Configure upgrader for this connection
|
||||
upgrader.ReadBufferSize = int(DefaultMaxMessageSize)
|
||||
upgrader.WriteBufferSize = int(DefaultMaxMessageSize)
|
||||
// Use reasonable buffer sizes (64KB) instead of max message size (10MB)
|
||||
// to prevent memory exhaustion with many connections
|
||||
upgrader.ReadBufferSize = 64 * 1024 // 64KB
|
||||
upgrader.WriteBufferSize = 64 * 1024 // 64KB
|
||||
|
||||
if conn, err = upgrader.Upgrade(w, r, nil); chk.E(err) {
|
||||
log.E.F("websocket accept failed from %s: %v", remote, err)
|
||||
|
||||
@@ -87,6 +87,7 @@ func Run(
|
||||
rateLimiter: limiter,
|
||||
cfg: cfg,
|
||||
db: db,
|
||||
connPerIP: make(map[string]int),
|
||||
}
|
||||
|
||||
// Initialize branding/white-label manager if enabled
|
||||
|
||||
@@ -57,6 +57,10 @@ type Server struct {
|
||||
// optional reverse proxy for dev web server
|
||||
devProxy *httputil.ReverseProxy
|
||||
|
||||
// Per-IP connection tracking to prevent resource exhaustion
|
||||
connPerIPMu sync.RWMutex
|
||||
connPerIP map[string]int
|
||||
|
||||
// Challenge storage for HTTP UI authentication
|
||||
challengeMutex sync.RWMutex
|
||||
challenges map[string][]byte
|
||||
|
||||
2
app/web/dist/bundle.js
vendored
2
app/web/dist/bundle.js
vendored
File diff suppressed because one or more lines are too long
2
app/web/dist/bundle.js.map
vendored
2
app/web/dist/bundle.js.map
vendored
File diff suppressed because one or more lines are too long
@@ -26,7 +26,7 @@ export function initConfig() {
|
||||
// 4. Not running on a typical relay port (3334) - likely a static server
|
||||
const hasStoredRelay = !!localStorage.getItem("relayUrl");
|
||||
const isFileProtocol = window.location.protocol === 'file:';
|
||||
const isNonRelayPort = !['3334', '443', '80', ''].includes(window.location.port);
|
||||
const isNonRelayPort = !['3334', '7777', '443', '80', ''].includes(window.location.port);
|
||||
|
||||
const standalone = BUILD_STANDALONE_MODE || hasStoredRelay || isFileProtocol || isNonRelayPort;
|
||||
isStandaloneMode.set(standalone);
|
||||
|
||||
@@ -951,11 +951,10 @@ export async function fetchAllEvents(options = {}) {
|
||||
} = options;
|
||||
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
const thirtyDaysAgo = now - (30 * 24 * 60 * 60);
|
||||
const sixMonthsAgo = now - (180 * 24 * 60 * 60);
|
||||
const fiveYearsAgo = now - (5 * 365 * 24 * 60 * 60);
|
||||
|
||||
// Start with 30 days if no since specified
|
||||
const initialSince = since || thirtyDaysAgo;
|
||||
// Start with 5 years if no since specified
|
||||
const initialSince = since || fiveYearsAgo;
|
||||
|
||||
const filters = [{ ...rest }];
|
||||
filters[0].since = initialSince;
|
||||
@@ -964,21 +963,10 @@ export async function fetchAllEvents(options = {}) {
|
||||
if (kinds) filters[0].kinds = kinds;
|
||||
if (limit) filters[0].limit = limit;
|
||||
|
||||
let events = await fetchEvents(filters, {
|
||||
const events = await fetchEvents(filters, {
|
||||
timeout: 30000
|
||||
});
|
||||
|
||||
// If we got few results and weren't already using a longer window, retry with 6 months
|
||||
const fewResultsThreshold = Math.min(20, limit / 2);
|
||||
if (events.length < fewResultsThreshold && initialSince > sixMonthsAgo && !since) {
|
||||
console.log(`[fetchAllEvents] Only got ${events.length} events, retrying with 6-month window...`);
|
||||
filters[0].since = sixMonthsAgo;
|
||||
events = await fetchEvents(filters, {
|
||||
timeout: 30000
|
||||
});
|
||||
console.log(`[fetchAllEvents] 6-month window returned ${events.length} events`);
|
||||
}
|
||||
|
||||
return events;
|
||||
}
|
||||
|
||||
|
||||
@@ -590,14 +590,14 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete
|
||||
if f.Limit != nil && len(evs) > int(*f.Limit) {
|
||||
evs = evs[:*f.Limit]
|
||||
}
|
||||
// delete the expired events in a background thread
|
||||
go func() {
|
||||
for i, ser := range expDeletes {
|
||||
if err = d.DeleteEventBySerial(c, ser, expEvs[i]); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
}()
|
||||
// TODO: DISABLED - inline deletion of expired events causes Badger race conditions
|
||||
// under high concurrent load ("assignment to entry in nil map" panic).
|
||||
// Expired events should be cleaned up by a separate, rate-limited background
|
||||
// worker instead of being deleted inline during query processing.
|
||||
// See: pkg/storage/gc.go TODOs for proper batch deletion implementation.
|
||||
if len(expDeletes) > 0 {
|
||||
log.D.F("QueryEvents: found %d expired events (deletion disabled)", len(expDeletes))
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
@@ -2,6 +2,24 @@
|
||||
|
||||
package storage
|
||||
|
||||
// TODO: IMPORTANT - This GC implementation is EXPERIMENTAL and may cause crashes under high load.
|
||||
// The current implementation has the following issues that need to be addressed:
|
||||
//
|
||||
// 1. Badger race condition: DeleteEventBySerial runs transactions that can trigger
|
||||
// "assignment to entry in nil map" panics in Badger v4.8.0 under concurrent load.
|
||||
// This happens when GC deletes events while many REQ queries are being processed.
|
||||
//
|
||||
// 2. Batch transaction handling: On large datasets (14+ GB), deletions should be:
|
||||
// - Serialized or use a transaction pool to prevent concurrent txn issues
|
||||
// - Batched with proper delays between batches to avoid overwhelming Badger
|
||||
// - Rate-limited based on current system load
|
||||
//
|
||||
// 3. The current 10ms delay every 100 events (line ~237) is insufficient for busy relays.
|
||||
// Consider adaptive rate limiting based on pending transaction count.
|
||||
//
|
||||
// 4. Consider using Badger's WriteBatch API instead of individual Update transactions
|
||||
// for bulk deletions, which may be more efficient and avoid some race conditions.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
@@ -1 +1 @@
|
||||
v0.52.5
|
||||
v0.52.12
|
||||
|
||||
Reference in New Issue
Block a user