Compare commits (9 commits):

- a768be22e6
- 779f341dda
- a565244435
- bb858a0d6f
- b478845e1c
- e75e6de59b
- 1297a45ee3
- 138d5cbff9
- 0c82307bf6
```diff
@@ -146,6 +146,10 @@ type C struct {
 
 	// Connection concurrency control
 	MaxHandlersPerConnection int `env:"ORLY_MAX_HANDLERS_PER_CONN" default:"100" usage:"max concurrent message handlers per WebSocket connection (limits goroutine growth under load)"`
+	MaxConnectionsPerIP int `env:"ORLY_MAX_CONN_PER_IP" default:"25" usage:"max WebSocket connections per IP address (prevents resource exhaustion, hard limit 40)"`
+
+	// Query result limits (prevents memory exhaustion from unbounded queries)
+	QueryResultLimit int `env:"ORLY_QUERY_RESULT_LIMIT" default:"256" usage:"max events returned per REQ filter (prevents unbounded memory usage, 0=unlimited)"`
 
 	// Adaptive rate limiting (PID-controlled)
 	RateLimitEnabled bool `env:"ORLY_RATE_LIMIT_ENABLED" default:"true" usage:"enable adaptive PID-controlled rate limiting for database operations"`
@@ -207,8 +211,11 @@ type C struct {
 	ArchiveCacheTTLHrs int `env:"ORLY_ARCHIVE_CACHE_TTL_HRS" default:"24" usage:"hours to cache query fingerprints to avoid repeated archive requests"`
 
 	// Storage management configuration (access-based garbage collection)
+	// TODO: GC implementation needs batch transaction handling to avoid Badger race conditions
+	// TODO: GC should use smaller batches with delays between transactions on large datasets
+	// TODO: GC deletion should be serialized or use transaction pools to prevent concurrent txn issues
 	MaxStorageBytes int64 `env:"ORLY_MAX_STORAGE_BYTES" default:"0" usage:"maximum storage in bytes (0=auto-detect 80%% of filesystem)"`
-	GCEnabled bool `env:"ORLY_GC_ENABLED" default:"true" usage:"enable continuous garbage collection based on access patterns"`
+	GCEnabled bool `env:"ORLY_GC_ENABLED" default:"false" usage:"enable continuous garbage collection based on access patterns (EXPERIMENTAL - may cause crashes under load)"`
 	GCIntervalSec int `env:"ORLY_GC_INTERVAL_SEC" default:"60" usage:"seconds between GC runs when storage exceeds limit"`
 	GCBatchSize int `env:"ORLY_GC_BATCH_SIZE" default:"1000" usage:"number of events to consider per GC run"`
```
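The new fields follow the struct-tag convention used throughout the config: `env` names the environment variable, `default` supplies the fallback, `usage` is help text. A minimal sketch of how such tags can be resolved at startup, assuming a hand-rolled reflective loader (the relay's actual loader is not part of this diff):

```go
package main

import (
	"fmt"
	"os"
	"reflect"
	"strconv"
)

// C is a reduced copy of the config struct for illustration.
type C struct {
	MaxConnectionsPerIP int   `env:"ORLY_MAX_CONN_PER_IP" default:"25"`
	QueryResultLimit    int   `env:"ORLY_QUERY_RESULT_LIMIT" default:"256"`
	MaxStorageBytes     int64 `env:"ORLY_MAX_STORAGE_BYTES" default:"0"`
	GCEnabled           bool  `env:"ORLY_GC_ENABLED" default:"false"`
}

// Load fills cfg from the environment, falling back to each field's
// `default` tag. Handles only the int, int64, bool and string kinds
// used by the fields shown in the diff.
func Load(cfg any) error {
	v := reflect.ValueOf(cfg).Elem()
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		raw, ok := os.LookupEnv(f.Tag.Get("env"))
		if !ok {
			raw = f.Tag.Get("default")
		}
		switch v.Field(i).Kind() {
		case reflect.Int, reflect.Int64:
			n, err := strconv.ParseInt(raw, 10, 64)
			if err != nil {
				return fmt.Errorf("%s: %w", f.Name, err)
			}
			v.Field(i).SetInt(n)
		case reflect.Bool:
			b, err := strconv.ParseBool(raw)
			if err != nil {
				return fmt.Errorf("%s: %w", f.Name, err)
			}
			v.Field(i).SetBool(b)
		case reflect.String:
			v.Field(i).SetString(raw)
		}
	}
	return nil
}

func main() {
	var cfg C
	if err := Load(&cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // defaults unless env vars override
}
```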
```diff
@@ -316,8 +316,27 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 	// Collect all events from all filters
 	var allEvents event.S
 
+	// Server-side query result limit to prevent memory exhaustion
+	serverLimit := l.Config.QueryResultLimit
+	if serverLimit <= 0 {
+		serverLimit = 256 // Default if not configured
+	}
+
 	for _, f := range *env.Filters {
 		if f != nil {
+			// Enforce server-side limit on each filter
+			if serverLimit > 0 {
+				if f.Limit == nil {
+					// No client limit - apply server limit
+					limitVal := uint(serverLimit)
+					f.Limit = &limitVal
+				} else if int(*f.Limit) > serverLimit {
+					// Client limit exceeds server limit - cap it
+					limitVal := uint(serverLimit)
+					f.Limit = &limitVal
+				}
+			}
 			// Summarize filter details for diagnostics (avoid internal fields)
 			var kindsLen int
 			if f.Kinds != nil {
```
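The capping logic factors cleanly into a pure helper. A sketch (capFilterLimit is a hypothetical name, not part of the diff):

```go
package main

import "fmt"

// capFilterLimit mirrors the inline logic above: the client's requested
// limit is capped at serverLimit, and a missing limit gets serverLimit.
func capFilterLimit(clientLimit *uint, serverLimit int) uint {
	if clientLimit == nil || int(*clientLimit) > serverLimit {
		return uint(serverLimit)
	}
	return *clientLimit
}

func main() {
	big, small := uint(10000), uint(50)
	fmt.Println(capFilterLimit(nil, 256))    // 256 (no client limit)
	fmt.Println(capFilterLimit(&big, 256))   // 256 (capped)
	fmt.Println(capFilterLimit(&small, 256)) // 50 (client limit honored)
}
```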
```diff
@@ -56,6 +56,42 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 whitelist:
+	// Extract IP from remote (strip port)
+	ip := remote
+	if idx := strings.LastIndex(remote, ":"); idx != -1 {
+		ip = remote[:idx]
+	}
+
+	// Check per-IP connection limit (hard limit 40, default 25)
+	maxConnPerIP := s.Config.MaxConnectionsPerIP
+	if maxConnPerIP <= 0 {
+		maxConnPerIP = 25
+	}
+	if maxConnPerIP > 40 {
+		maxConnPerIP = 40 // Hard limit
+	}
+
+	s.connPerIPMu.Lock()
+	currentConns := s.connPerIP[ip]
+	if currentConns >= maxConnPerIP {
+		s.connPerIPMu.Unlock()
+		log.W.F("connection limit exceeded for IP %s: %d/%d connections", ip, currentConns, maxConnPerIP)
+		http.Error(w, "too many connections from your IP", http.StatusTooManyRequests)
+		return
+	}
+	s.connPerIP[ip]++
+	s.connPerIPMu.Unlock()
+
+	// Decrement connection count when this function returns
+	defer func() {
+		s.connPerIPMu.Lock()
+		s.connPerIP[ip]--
+		if s.connPerIP[ip] <= 0 {
+			delete(s.connPerIP, ip)
+		}
+		s.connPerIPMu.Unlock()
+	}()
+
 	// Create an independent context for this connection
 	// This context will be cancelled when the connection closes or server shuts down
 	ctx, cancel := context.WithCancel(s.Ctx)
```
```diff
@@ -64,8 +100,10 @@ whitelist:
 	var conn *websocket.Conn
 
 	// Configure upgrader for this connection
-	upgrader.ReadBufferSize = int(DefaultMaxMessageSize)
-	upgrader.WriteBufferSize = int(DefaultMaxMessageSize)
+	// Use reasonable buffer sizes (64KB) instead of max message size (10MB)
+	// to prevent memory exhaustion with many connections
+	upgrader.ReadBufferSize = 64 * 1024  // 64KB
+	upgrader.WriteBufferSize = 64 * 1024 // 64KB
 
 	if conn, err = upgrader.Upgrade(w, r, nil); chk.E(err) {
 		log.E.F("websocket accept failed from %s: %v", remote, err)
```
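The same counting discipline can live in a small standalone type. An illustrative sketch (the relay keeps the map and mutex directly on Server instead):

```go
package iplimit

import "sync"

// ipLimiter bounds concurrent connections per IP address.
type ipLimiter struct {
	mu    sync.Mutex
	conns map[string]int
	max   int
}

func newIPLimiter(max int) *ipLimiter {
	return &ipLimiter{conns: make(map[string]int), max: max}
}

// acquire reserves a slot for ip, returning false when the cap is reached.
func (l *ipLimiter) acquire(ip string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.conns[ip] >= l.max {
		return false
	}
	l.conns[ip]++
	return true
}

// release frees a slot and evicts the map entry at zero so idle IPs do
// not leak memory (mirrors the deferred decrement in the handler above).
func (l *ipLimiter) release(ip string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.conns[ip]--; l.conns[ip] <= 0 {
		delete(l.conns, ip)
	}
}
```

Usage follows the handler's shape: call acquire before upgrading the connection, reject with 429 on false, and defer release.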
```diff
@@ -87,6 +87,7 @@ func Run(
 		rateLimiter: limiter,
 		cfg:         cfg,
 		db:          db,
+		connPerIP:   make(map[string]int),
 	}
 
 	// Initialize branding/white-label manager if enabled
```
```diff
@@ -57,6 +57,10 @@ type Server struct {
 	// optional reverse proxy for dev web server
 	devProxy *httputil.ReverseProxy
 
+	// Per-IP connection tracking to prevent resource exhaustion
+	connPerIPMu sync.RWMutex
+	connPerIP   map[string]int
+
 	// Challenge storage for HTTP UI authentication
 	challengeMutex sync.RWMutex
 	challenges     map[string][]byte
```
app/web/dist/bundle.js (vendored): file diff suppressed because one or more lines are too long

app/web/dist/bundle.js.map (vendored): file diff suppressed because one or more lines are too long
```diff
@@ -26,7 +26,7 @@ export function initConfig() {
 	// 4. Not running on a typical relay port (3334) - likely a static server
 	const hasStoredRelay = !!localStorage.getItem("relayUrl");
 	const isFileProtocol = window.location.protocol === 'file:';
-	const isNonRelayPort = !['3334', '443', '80', ''].includes(window.location.port);
+	const isNonRelayPort = !['3334', '7777', '443', '80', ''].includes(window.location.port);
 
 	const standalone = BUILD_STANDALONE_MODE || hasStoredRelay || isFileProtocol || isNonRelayPort;
 	isStandaloneMode.set(standalone);
```
```diff
@@ -951,11 +951,10 @@ export async function fetchAllEvents(options = {}) {
 	} = options;
 
 	const now = Math.floor(Date.now() / 1000);
-	const thirtyDaysAgo = now - (30 * 24 * 60 * 60);
-	const sixMonthsAgo = now - (180 * 24 * 60 * 60);
+	const fiveYearsAgo = now - (5 * 365 * 24 * 60 * 60);
 
-	// Start with 30 days if no since specified
-	const initialSince = since || thirtyDaysAgo;
+	// Start with 5 years if no since specified
+	const initialSince = since || fiveYearsAgo;
 
 	const filters = [{ ...rest }];
 	filters[0].since = initialSince;
@@ -964,21 +963,10 @@ export async function fetchAllEvents(options = {}) {
 	if (kinds) filters[0].kinds = kinds;
 	if (limit) filters[0].limit = limit;
 
-	let events = await fetchEvents(filters, {
+	const events = await fetchEvents(filters, {
 		timeout: 30000
 	});
 
-	// If we got few results and weren't already using a longer window, retry with 6 months
-	const fewResultsThreshold = Math.min(20, limit / 2);
-	if (events.length < fewResultsThreshold && initialSince > sixMonthsAgo && !since) {
-		console.log(`[fetchAllEvents] Only got ${events.length} events, retrying with 6-month window...`);
-		filters[0].since = sixMonthsAgo;
-		events = await fetchEvents(filters, {
-			timeout: 30000
-		});
-		console.log(`[fetchAllEvents] 6-month window returned ${events.length} events`);
-	}
-
 	return events;
 }
```
```diff
@@ -590,14 +590,14 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete
 		if f.Limit != nil && len(evs) > int(*f.Limit) {
 			evs = evs[:*f.Limit]
 		}
-		// delete the expired events in a background thread
-		go func() {
-			for i, ser := range expDeletes {
-				if err = d.DeleteEventBySerial(c, ser, expEvs[i]); chk.E(err) {
-					continue
-				}
-			}
-		}()
+		// TODO: DISABLED - inline deletion of expired events causes Badger race conditions
+		// under high concurrent load ("assignment to entry in nil map" panic).
+		// Expired events should be cleaned up by a separate, rate-limited background
+		// worker instead of being deleted inline during query processing.
+		// See: pkg/storage/gc.go TODOs for proper batch deletion implementation.
+		if len(expDeletes) > 0 {
+			log.D.F("QueryEvents: found %d expired events (deletion disabled)", len(expDeletes))
+		}
 	}
 
 	return
```
```diff
@@ -183,8 +183,9 @@ func (s *Service) saveEvent(ctx context.Context, ev *event.E) Result {
 	saveCtx, cancel := context.WithTimeout(ctx, s.cfg.WriteTimeout)
 	defer cancel()
 
-	// Apply rate limiting
-	if s.rateLimiter != nil && s.rateLimiter.IsEnabled() {
+	// Apply rate limiting (skip for NIP-46 bunker events which need realtime priority)
+	const kindNIP46 = 24133
+	if s.rateLimiter != nil && s.rateLimiter.IsEnabled() && ev.Kind != uint16(kindNIP46) {
 		const writeOpType = 1 // ratelimit.Write
 		s.rateLimiter.Wait(saveCtx, writeOpType)
 	}
```
```diff
@@ -6,6 +6,7 @@ import (
 	"regexp"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 )
@@ -14,6 +15,7 @@ type BufferedWriter struct {
 	original io.Writer
 	buffer   *Buffer
 	lineBuf  bytes.Buffer
+	mu       sync.Mutex
 }
 
 // Log format regex patterns
@@ -42,10 +44,12 @@ func (w *BufferedWriter) Write(p []byte) (n int, err error) {
 
 	// Store in buffer if we have one
 	if w.buffer != nil {
+		w.mu.Lock()
 		// Accumulate data in line buffer
 		w.lineBuf.Write(p)
 
 		// Process complete lines
+		var entries []LogEntry
 		for {
 			line, lineErr := w.lineBuf.ReadString('\n')
 			if lineErr != nil {
@@ -56,12 +60,18 @@ func (w *BufferedWriter) Write(p []byte) (n int, err error) {
 				break
 			}
 
-			// Parse and store the complete line
+			// Parse the complete line
 			entry := w.parseLine(strings.TrimSuffix(line, "\n"))
 			if entry.Message != "" {
-				w.buffer.Add(entry)
+				entries = append(entries, entry)
 			}
 		}
+		w.mu.Unlock()
+
+		// Add entries outside the lock to avoid holding it during buffer.Add
+		for _, entry := range entries {
+			w.buffer.Add(entry)
+		}
 	}
 
 	return
```
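The rework narrows the lock to the shared lineBuf and defers buffer.Add until after unlock, so a slow or re-entrant Add cannot serialize concurrent writers. A minimal standalone sketch of that pattern, with sink standing in for the log buffer:

```go
package logbuf

import (
	"bytes"
	"strings"
	"sync"
)

// sink stands in for the log Buffer; Add may itself lock or block.
type sink interface{ Add(string) }

type lineWriter struct {
	mu      sync.Mutex
	lineBuf bytes.Buffer
	out     sink
}

// flushLines splits input into complete lines while holding the mutex,
// then publishes them only after releasing it.
func (w *lineWriter) flushLines(p []byte) {
	w.mu.Lock()
	w.lineBuf.Write(p)
	var lines []string
	for {
		line, err := w.lineBuf.ReadString('\n')
		if err != nil {
			// Incomplete trailing line: keep it buffered for the next call.
			w.lineBuf.WriteString(line)
			break
		}
		lines = append(lines, strings.TrimSuffix(line, "\n"))
	}
	w.mu.Unlock() // release before calling out

	for _, l := range lines {
		w.out.Add(l)
	}
}
```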
```diff
@@ -3,12 +3,15 @@ package neo4j
 import (
 	"context"
 	"fmt"
+	"sort"
+	"strconv"
 	"strings"
 	"time"
 
 	"git.mleku.dev/mleku/nostr/encoders/event"
 	"git.mleku.dev/mleku/nostr/encoders/filter"
 	"git.mleku.dev/mleku/nostr/encoders/hex"
+	"git.mleku.dev/mleku/nostr/encoders/kind"
 	"git.mleku.dev/mleku/nostr/encoders/tag"
 	"lol.mleku.dev/log"
 	"next.orly.dev/pkg/database/indexes/types"
@@ -41,11 +44,81 @@ func (n *N) QueryEventsWithOptions(
 	}
 
 	// Parse response
-	evs, err = n.parseEventsFromResult(result)
+	allEvents, err := n.parseEventsFromResult(result)
 	if err != nil {
 		return nil, fmt.Errorf("failed to parse events: %w", err)
 	}
 
+	// Filter replaceable events to only return the latest version
+	// unless showAllVersions is true
+	if showAllVersions {
+		return allEvents, nil
+	}
+
+	// Separate events by type and filter replaceables
+	replaceableEvents := make(map[string]*event.E)                 // key: pubkey:kind
+	paramReplaceableEvents := make(map[string]map[string]*event.E) // key: pubkey:kind -> d-tag -> event
+	var regularEvents event.S
+
+	for _, ev := range allEvents {
+		if kind.IsReplaceable(ev.Kind) {
+			// For replaceable events, keep only the latest per pubkey:kind
+			key := hex.Enc(ev.Pubkey) + ":" + strconv.Itoa(int(ev.Kind))
+			existing, exists := replaceableEvents[key]
+			if !exists || ev.CreatedAt > existing.CreatedAt {
+				replaceableEvents[key] = ev
+			}
+		} else if kind.IsParameterizedReplaceable(ev.Kind) {
+			// For parameterized replaceable events, keep only the latest per pubkey:kind:d-tag
+			key := hex.Enc(ev.Pubkey) + ":" + strconv.Itoa(int(ev.Kind))
+
+			// Get the 'd' tag value
+			dTag := ev.Tags.GetFirst([]byte("d"))
+			var dValue string
+			if dTag != nil && dTag.Len() > 1 {
+				dValue = string(dTag.Value())
+			}
+
+			// Initialize inner map if needed
+			if _, exists := paramReplaceableEvents[key]; !exists {
+				paramReplaceableEvents[key] = make(map[string]*event.E)
+			}
+
+			// Keep only the newest version
+			existing, exists := paramReplaceableEvents[key][dValue]
+			if !exists || ev.CreatedAt > existing.CreatedAt {
+				paramReplaceableEvents[key][dValue] = ev
+			}
+		} else {
+			regularEvents = append(regularEvents, ev)
+		}
+	}
+
+	// Combine results
+	evs = make(event.S, 0, len(replaceableEvents)+len(paramReplaceableEvents)+len(regularEvents))
+	for _, ev := range replaceableEvents {
+		evs = append(evs, ev)
+	}
+	for _, innerMap := range paramReplaceableEvents {
+		for _, ev := range innerMap {
+			evs = append(evs, ev)
+		}
+	}
+	evs = append(evs, regularEvents...)
+
+	// Re-sort by timestamp (newest first)
+	sort.Slice(evs, func(i, j int) bool {
+		return evs[i].CreatedAt > evs[j].CreatedAt
+	})
+
+	// Re-apply limit after filtering
+	if f.Limit != nil && len(evs) > int(*f.Limit) {
+		evs = evs[:*f.Limit]
+	}
+
 	return evs, nil
 }
```
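The two maps above implement latest-wins deduplication keyed by the event's replaceability class. The key shape can be stated as one standalone helper, sketched here (replaceKey is a hypothetical name, not in the diff); kind ranges follow NIP-01:

```go
package nostrkinds

import "fmt"

// replaceKey returns the dedup key the filtering above effectively groups
// by. Per NIP-01: kinds 0, 3 and 10000-19999 are replaceable (latest per
// pubkey:kind wins); 30000-39999 are parameterized replaceable and are
// additionally keyed by the d-tag value. Regular events return "" and are
// never deduplicated.
func replaceKey(pubkeyHex string, k uint16, dValue string) string {
	switch {
	case k == 0 || k == 3 || (k >= 10000 && k < 20000):
		return fmt.Sprintf("%s:%d", pubkeyHex, k)
	case k >= 30000 && k < 40000:
		return fmt.Sprintf("%s:%d:%s", pubkeyHex, k, dValue)
	default:
		return ""
	}
}
```

A single map keyed this way, keeping the entry with the higher created_at, reproduces the two-map filtering in one O(n) pass.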
```diff
@@ -56,6 +56,15 @@ func (n *N) SaveEvent(c context.Context, ev *event.E) (exists bool, err error) {
 		return true, nil // Event already exists
 	}
 
+	// For parameterized replaceable events (kinds 30000-39999), delete older versions
+	// before saving the new one. This ensures Neo4j only stores the latest version.
+	if ev.Kind >= 30000 && ev.Kind < 40000 {
+		if err := n.deleteOlderParameterizedReplaceable(c, ev); err != nil {
+			n.Logger.Warningf("failed to delete older replaceable events: %v", err)
+			// Continue with save - older events will be filtered at query time
+		}
+	}
+
 	// Get next serial number
 	serial, err := n.getNextSerial()
 	if err != nil {
@@ -444,3 +453,37 @@ ORDER BY e.created_at DESC`
 
 	return wouldReplace, serials, nil
 }
+
+// deleteOlderParameterizedReplaceable deletes older versions of parameterized replaceable events
+// (kinds 30000-39999) that have the same pubkey, kind, and d-tag value.
+// This is called before saving a new event to ensure only the latest version is stored.
+func (n *N) deleteOlderParameterizedReplaceable(c context.Context, ev *event.E) error {
+	authorPubkey := hex.Enc(ev.Pubkey[:])
+
+	// Get the d-tag value
+	dTag := ev.Tags.GetFirst([]byte{'d'})
+	dValue := ""
+	if dTag != nil && len(dTag.T) >= 2 {
+		dValue = string(dTag.T[1])
+	}
+
+	// Delete older events with same pubkey, kind, and d-tag
+	// Only delete if the existing event is older than the new one
+	cypher := `
+MATCH (e:Event {kind: $kind, pubkey: $pubkey})-[:TAGGED_WITH]->(t:Tag {type: 'd', value: $dValue})
+WHERE e.created_at < $createdAt
+DETACH DELETE e`
+
+	params := map[string]any{
+		"pubkey":    authorPubkey,
+		"kind":      int64(ev.Kind),
+		"dValue":    dValue,
+		"createdAt": ev.CreatedAt,
+	}
+
+	if _, err := n.ExecuteWrite(c, cypher, params); err != nil {
+		return fmt.Errorf("failed to delete older replaceable events: %w", err)
+	}
+
+	return nil
+}
```
```diff
@@ -2,6 +2,8 @@ package nrc
 
 import (
 	"context"
+	"crypto/rand"
+	"encoding/base64"
 	"encoding/json"
 	"fmt"
 	"sync"
@@ -25,6 +27,8 @@ const (
 	KindNRCRequest = 24891
 	// KindNRCResponse is the event kind for NRC responses.
 	KindNRCResponse = 24892
+	// MaxChunkSize is the maximum size for a single chunk (40KB to stay under 65KB limit after NIP-44 + base64).
+	MaxChunkSize = 40000
 )
 
 // BridgeConfig holds configuration for the NRC bridge.
@@ -300,6 +304,8 @@ func (b *Bridge) forwardToLocalRelay(ctx context.Context, session *Session, reqE
 		return b.handleCLOSE(ctx, session, reqEvent, reqMsg)
 	case "COUNT":
 		return b.handleCOUNT(ctx, session, reqEvent, reqMsg, localConn)
+	case "IDS":
+		return b.handleIDS(ctx, session, reqEvent, reqMsg, localConn)
 	default:
 		return fmt.Errorf("unsupported message type: %s", reqMsg.Type)
 	}
@@ -462,6 +468,158 @@ func (b *Bridge) handleCOUNT(ctx context.Context, session *Session, reqEvent *ev
 	return b.sendResponse(ctx, reqEvent, session, resp)
 }
 
+// handleIDS handles an IDS message - returns event manifests for diffing.
+func (b *Bridge) handleIDS(ctx context.Context, session *Session, reqEvent *event.E, reqMsg *RequestMessage, conn *ws.Client) error {
+	// Extract subscription ID and filters from payload
+	// Payload: ["IDS", "<sub_id>", filter1, filter2, ...]
+	if len(reqMsg.Payload) < 3 {
+		return fmt.Errorf("invalid IDS payload")
+	}
+	subID, ok := reqMsg.Payload[1].(string)
+	if !ok {
+		return fmt.Errorf("invalid subscription ID")
+	}
+
+	// Parse filters from payload
+	var filters []*filter.F
+	for i := 2; i < len(reqMsg.Payload); i++ {
+		filterMap, ok := reqMsg.Payload[i].(map[string]any)
+		if !ok {
+			continue
+		}
+		filterBytes, err := json.Marshal(filterMap)
+		if err != nil {
+			continue
+		}
+		var f filter.F
+		if err := json.Unmarshal(filterBytes, &f); err != nil {
+			continue
+		}
+		filters = append(filters, &f)
+	}
+
+	if len(filters) == 0 {
+		return fmt.Errorf("no valid filters in IDS")
+	}
+
+	// Add subscription to session
+	if err := session.AddSubscription(subID); err != nil {
+		return err
+	}
+	defer session.RemoveSubscription(subID)
+
+	// Create filter set
+	filterSet := filter.NewS(filters...)
+
+	// Subscribe to local relay
+	sub, err := conn.Subscribe(ctx, filterSet)
+	if chk.E(err) {
+		return fmt.Errorf("local subscribe failed: %w", err)
+	}
+	defer sub.Unsub()
+
+	// Collect events and build manifest
+	var manifest []EventManifestEntry
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case ev := <-sub.Events:
+			if ev == nil {
+				// Subscription closed, send IDS response
+				return b.sendIDSResponse(ctx, reqEvent, session, subID, manifest)
+			}
+
+			// Build manifest entry
+			entry := EventManifestEntry{
+				Kind:      int(ev.Kind),
+				ID:        string(hex.Enc(ev.ID[:])),
+				CreatedAt: ev.CreatedAt,
+			}
+
+			// Check for d tag (parameterized replaceable events)
+			dTag := ev.Tags.GetFirst([]byte("d"))
+			if dTag != nil && dTag.Len() >= 2 {
+				entry.D = string(dTag.Value())
+			}
+
+			manifest = append(manifest, entry)
+		case <-sub.EndOfStoredEvents:
+			// Send IDS response with manifest
+			return b.sendIDSResponse(ctx, reqEvent, session, subID, manifest)
+		}
+	}
+}
+
+// sendIDSResponse sends an IDS response with the event manifest, chunking if necessary.
+func (b *Bridge) sendIDSResponse(ctx context.Context, reqEvent *event.E, session *Session, subID string, manifest []EventManifestEntry) error {
+	resp := &ResponseMessage{
+		Type:    "IDS",
+		Payload: []any{"IDS", subID, manifest},
+	}
+	return b.sendResponseChunked(ctx, reqEvent, session, resp)
+}
+
+// sendResponseChunked sends a response, chunking if necessary for large payloads.
+func (b *Bridge) sendResponseChunked(ctx context.Context, reqEvent *event.E, session *Session, resp *ResponseMessage) error {
+	// Marshal response content
+	content, err := MarshalResponseContent(resp)
+	if err != nil {
+		return fmt.Errorf("marshal failed: %w", err)
+	}
+
+	// If small enough, send directly
+	if len(content) <= MaxChunkSize {
+		return b.sendResponse(ctx, reqEvent, session, resp)
+	}
+
+	// Need to chunk - encode to base64 for safe transmission
+	encoded := base64.StdEncoding.EncodeToString(content)
+	var chunks []string
+
+	// Split into chunks
+	for i := 0; i < len(encoded); i += MaxChunkSize {
+		end := i + MaxChunkSize
+		if end > len(encoded) {
+			end = len(encoded)
+		}
+		chunks = append(chunks, encoded[i:end])
+	}
+
+	// Generate message ID
+	messageID := generateMessageID()
+	log.D.F("NRC: chunking large message (%d bytes) into %d chunks", len(content), len(chunks))
+
+	// Send each chunk
+	for i, chunkData := range chunks {
+		chunkMsg := ChunkMessage{
+			Type:      "CHUNK",
+			MessageID: messageID,
+			Index:     i,
+			Total:     len(chunks),
+			Data:      chunkData,
+		}
+
+		chunkResp := &ResponseMessage{
+			Type:    "CHUNK",
+			Payload: []any{chunkMsg},
+		}
+
+		if err := b.sendResponse(ctx, reqEvent, session, chunkResp); err != nil {
+			return fmt.Errorf("failed to send chunk %d/%d: %w", i+1, len(chunks), err)
+		}
+	}
+
+	return nil
+}
+
+// generateMessageID generates a random message ID for chunking.
+func generateMessageID() string {
+	b := make([]byte, 16)
+	rand.Read(b)
+	return string(hex.Enc(b))
+}
+
 // sendResponse encrypts and sends a response to the client.
 func (b *Bridge) sendResponse(ctx context.Context, reqEvent *event.E, session *Session, resp *ResponseMessage) error {
 	// Marshal response content
```
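Why 40000: base64 inflates by 4/3, so a 40KB chunk becomes about 53.4KB of text, which stays under the ~65KB relay message limit even after NIP-44 encryption framing. A self-contained sketch of the split/rejoin round trip (splitChunks/joinChunks are hypothetical names mirroring the logic above):

```go
package main

import (
	"bytes"
	"encoding/base64"
	"fmt"
)

const maxChunkSize = 40000 // mirrors MaxChunkSize in the diff

// splitChunks base64-encodes payload and slices the encoding into pieces
// of at most maxChunkSize bytes each.
func splitChunks(payload []byte) []string {
	encoded := base64.StdEncoding.EncodeToString(payload)
	var chunks []string
	for i := 0; i < len(encoded); i += maxChunkSize {
		end := i + maxChunkSize
		if end > len(encoded) {
			end = len(encoded)
		}
		chunks = append(chunks, encoded[i:end])
	}
	return chunks
}

// joinChunks reverses splitChunks; chunks must be supplied in index order.
func joinChunks(chunks []string) ([]byte, error) {
	var sb bytes.Buffer
	for _, c := range chunks {
		sb.WriteString(c)
	}
	return base64.StdEncoding.DecodeString(sb.String())
}

func main() {
	payload := bytes.Repeat([]byte("x"), 100_000) // > maxChunkSize, forces chunking
	chunks := splitChunks(payload)
	back, err := joinChunks(chunks)
	fmt.Println(len(chunks), err == nil && bytes.Equal(back, payload)) // 4 true
}
```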
```diff
@@ -2,6 +2,7 @@ package nrc
 
 import (
 	"context"
+	"encoding/base64"
 	"encoding/json"
 	"fmt"
 	"sync"
@@ -21,6 +22,13 @@ import (
 	"lol.mleku.dev/log"
 )
 
+// chunkBuffer holds chunks for a message being reassembled.
+type chunkBuffer struct {
+	chunks     map[int]string
+	total      int
+	receivedAt time.Time
+}
+
 // Client connects to a private relay through the NRC tunnel.
 type Client struct {
 	uri *ConnectionURI
@@ -38,6 +46,10 @@ type Client struct {
 	subscriptions   map[string]chan *event.E
 	subscriptionsMu sync.Mutex
 
+	// chunkBuffers holds partially received chunked messages.
+	chunkBuffers   map[string]*chunkBuffer
+	chunkBuffersMu sync.Mutex
+
 	ctx    context.Context
 	cancel context.CancelFunc
 }
@@ -61,6 +73,7 @@ func NewClient(connectionURI string) (*Client, error) {
 		clientSigner:  uri.GetClientSigner(),
 		pending:       make(map[string]chan *ResponseMessage),
 		subscriptions: make(map[string]chan *event.E),
+		chunkBuffers:  make(map[string]*chunkBuffer),
 		ctx:           ctx,
 		cancel:        cancel,
 	}, nil
@@ -127,6 +140,11 @@ func (c *Client) Close() {
 	}
 	c.subscriptions = make(map[string]chan *event.E)
 	c.subscriptionsMu.Unlock()
+
+	// Clear chunk buffers
+	c.chunkBuffersMu.Lock()
+	c.chunkBuffers = make(map[string]*chunkBuffer)
+	c.chunkBuffersMu.Unlock()
 }
 
 // handleResponses processes incoming NRC response events.
@@ -186,6 +204,10 @@ func (c *Client) processResponse(ev *event.E) {
 		c.handleCountResponse(resp.Payload, requestEventID)
 	case "AUTH":
 		c.handleAuthResponse(resp.Payload, requestEventID)
+	case "IDS":
+		c.handleIDSResponse(resp.Payload, requestEventID)
+	case "CHUNK":
+		c.handleChunkResponse(resp.Payload, requestEventID)
 	}
 }
 
@@ -315,6 +337,127 @@ func (c *Client) handleAuthResponse(payload []any, requestEventID string) {
 	}
 }
 
+// handleIDSResponse handles an IDS response.
+func (c *Client) handleIDSResponse(payload []any, requestEventID string) {
+	c.pendingMu.Lock()
+	ch, exists := c.pending[requestEventID]
+	c.pendingMu.Unlock()
+
+	if exists {
+		resp := &ResponseMessage{Type: "IDS", Payload: payload}
+		select {
+		case ch <- resp:
+		default:
+		}
+	}
+}
+
+// handleChunkResponse handles a CHUNK response and reassembles the message.
+func (c *Client) handleChunkResponse(payload []any, requestEventID string) {
+	if len(payload) < 1 {
+		return
+	}
+
+	// Parse chunk message from payload
+	chunkData, ok := payload[0].(map[string]any)
+	if !ok {
+		log.W.F("NRC: invalid chunk payload format")
+		return
+	}
+
+	messageID, _ := chunkData["messageId"].(string)
+	indexFloat, _ := chunkData["index"].(float64)
+	totalFloat, _ := chunkData["total"].(float64)
+	data, _ := chunkData["data"].(string)
+
+	if messageID == "" || data == "" {
+		log.W.F("NRC: chunk missing required fields")
+		return
+	}
+
+	index := int(indexFloat)
+	total := int(totalFloat)
+
+	c.chunkBuffersMu.Lock()
+	defer c.chunkBuffersMu.Unlock()
+
+	// Get or create buffer for this message
+	buf, exists := c.chunkBuffers[messageID]
+	if !exists {
+		buf = &chunkBuffer{
+			chunks:     make(map[int]string),
+			total:      total,
+			receivedAt: time.Now(),
+		}
+		c.chunkBuffers[messageID] = buf
+	}
+
+	// Store the chunk
+	buf.chunks[index] = data
+	log.D.F("NRC: received chunk %d/%d for message %s", index+1, total, messageID[:8])
+
+	// Check if we have all chunks
+	if len(buf.chunks) == buf.total {
+		// Reassemble the message
+		var encoded string
+		for i := 0; i < buf.total; i++ {
+			part, ok := buf.chunks[i]
+			if !ok {
+				log.W.F("NRC: missing chunk %d for message %s", i, messageID)
+				delete(c.chunkBuffers, messageID)
+				return
+			}
+			encoded += part
+		}
+
+		// Decode from base64
+		decoded, err := base64.StdEncoding.DecodeString(encoded)
+		if err != nil {
+			log.W.F("NRC: failed to decode chunked message: %v", err)
+			delete(c.chunkBuffers, messageID)
+			return
+		}
+
+		// Parse the reassembled response
+		var resp struct {
+			Type    string `json:"type"`
+			Payload []any  `json:"payload"`
+		}
+		if err := json.Unmarshal(decoded, &resp); err != nil {
+			log.W.F("NRC: failed to parse reassembled message: %v", err)
+			delete(c.chunkBuffers, messageID)
+			return
+		}
+
+		log.D.F("NRC: reassembled chunked message: %s", resp.Type)
+
+		// Clean up buffer
+		delete(c.chunkBuffers, messageID)
+
+		// Route the reassembled response
+		c.pendingMu.Lock()
+		ch, exists := c.pending[requestEventID]
+		c.pendingMu.Unlock()
+
+		if exists {
+			respMsg := &ResponseMessage{Type: resp.Type, Payload: resp.Payload}
+			select {
+			case ch <- respMsg:
+			default:
+			}
+		}
+	}
+
+	// Clean up stale buffers (older than 60 seconds)
+	now := time.Now()
+	for id, b := range c.chunkBuffers {
+		if now.Sub(b.receivedAt) > 60*time.Second {
+			log.W.F("NRC: discarding stale chunk buffer: %s", id)
+			delete(c.chunkBuffers, id)
+		}
+	}
+}
+
 // sendRequest sends an NRC request and waits for response.
 func (c *Client) sendRequest(ctx context.Context, msgType string, payload []any) (*ResponseMessage, error) {
 	// Build request content
@@ -511,3 +654,61 @@ func (c *Client) Count(ctx context.Context, subID string, filters ...*filter.F)
 func (c *Client) RelayURL() string {
 	return "nrc://" + string(hex.Enc(c.uri.RelayPubkey))
 }
+
+// RequestIDs sends an IDS request to get event manifests for diffing.
+func (c *Client) RequestIDs(ctx context.Context, subID string, filters ...*filter.F) ([]EventManifestEntry, error) {
+	// Build payload: ["IDS", "<sub_id>", filter1, filter2, ...]
+	payload := []any{"IDS", subID}
+	for _, f := range filters {
+		filterBytes, err := json.Marshal(f)
+		if err != nil {
+			return nil, fmt.Errorf("marshal filter failed: %w", err)
+		}
+		var filterMap map[string]any
+		if err := json.Unmarshal(filterBytes, &filterMap); err != nil {
+			return nil, fmt.Errorf("unmarshal filter failed: %w", err)
+		}
+		payload = append(payload, filterMap)
+	}
+
+	resp, err := c.sendRequest(ctx, "IDS", payload)
+	if err != nil {
+		return nil, err
+	}
+
+	// Parse IDS response: ["IDS", "<sub_id>", [...manifest...]]
+	if resp.Type != "IDS" || len(resp.Payload) < 3 {
+		return nil, fmt.Errorf("unexpected response type: %s", resp.Type)
+	}
+
+	// Parse manifest entries
+	manifestData, ok := resp.Payload[2].([]any)
+	if !ok {
+		return nil, fmt.Errorf("invalid manifest response")
+	}
+
+	var manifest []EventManifestEntry
+	for _, item := range manifestData {
+		entryMap, ok := item.(map[string]any)
+		if !ok {
+			continue
+		}
+
+		entry := EventManifestEntry{}
+		if k, ok := entryMap["kind"].(float64); ok {
+			entry.Kind = int(k)
+		}
+		if id, ok := entryMap["id"].(string); ok {
+			entry.ID = id
+		}
+		if ca, ok := entryMap["created_at"].(float64); ok {
+			entry.CreatedAt = int64(ca)
+		}
+		if d, ok := entryMap["d"].(string); ok {
+			entry.D = d
+		}
+		manifest = append(manifest, entry)
+	}
+
+	return manifest, nil
+}
```
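The diff adds RequestIDs but no consumer. A hedged sketch of how a caller might diff the manifest against local storage; Entry, client, and have are local stand-ins, not types from the codebase:

```go
package nrcsync

import "context"

// Entry mirrors the EventManifestEntry fields used here.
type Entry struct{ ID string }

// client abstracts the RequestIDs method added in this diff.
type client interface {
	RequestIDs(ctx context.Context, subID string, filters ...any) ([]Entry, error)
}

// syncMissing fetches the remote manifest and returns the IDs not known
// locally; have is a caller-supplied lookup (hypothetical, since the diff
// does not include a consumer of RequestIDs).
func syncMissing(ctx context.Context, c client, have func(id string) bool, f any) (missing []string, err error) {
	manifest, err := c.RequestIDs(ctx, "sync-1", f)
	if err != nil {
		return nil, err
	}
	for _, e := range manifest {
		if !have(e.ID) {
			missing = append(missing, e.ID)
		}
	}
	return missing, nil
}
```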
```diff
@@ -276,16 +276,33 @@ func (m *SessionManager) Close() {
 
 // RequestMessage represents a parsed NRC request message.
 type RequestMessage struct {
-	Type    string // EVENT, REQ, CLOSE, AUTH, COUNT
+	Type    string // EVENT, REQ, CLOSE, AUTH, COUNT, IDS
 	Payload []any
 }
 
 // ResponseMessage represents an NRC response message to be sent.
 type ResponseMessage struct {
-	Type    string // EVENT, OK, EOSE, NOTICE, CLOSED, COUNT, AUTH
+	Type    string // EVENT, OK, EOSE, NOTICE, CLOSED, COUNT, AUTH, IDS, CHUNK
 	Payload []any
 }
 
+// EventManifestEntry describes an event for manifest diffing (used by IDS).
+type EventManifestEntry struct {
+	Kind      int    `json:"kind"`
+	ID        string `json:"id"`
+	CreatedAt int64  `json:"created_at"`
+	D         string `json:"d,omitempty"` // For parameterized replaceable events (kinds 30000-39999)
+}
+
+// ChunkMessage represents a chunk of a large message.
+type ChunkMessage struct {
+	Type      string `json:"type"`      // Always "CHUNK"
+	MessageID string `json:"messageId"` // Unique ID for the chunked message
+	Index     int    `json:"index"`     // 0-based chunk index
+	Total     int    `json:"total"`     // Total number of chunks
+	Data      string `json:"data"`      // Base64 encoded chunk data
+}
+
 // ParseRequestContent parses the decrypted content of an NRC request.
 func ParseRequestContent(content []byte) (*RequestMessage, error) {
 	// Content format: {"type": "EVENT|REQ|...", "payload": [...]}
```
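One reason the client-side reassembly reads fields back as float64: json.Unmarshal into map[string]any decodes every JSON number as float64. A self-contained sketch of the wire form these tags produce (ChunkMessage redeclared locally; values are illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ChunkMessage redeclared locally so the sketch compiles on its own;
// the struct tags match the diff exactly.
type ChunkMessage struct {
	Type      string `json:"type"`
	MessageID string `json:"messageId"`
	Index     int    `json:"index"`
	Total     int    `json:"total"`
	Data      string `json:"data"`
}

func main() {
	// Illustrative values; Data carries one base64 slice of the payload.
	msg := ChunkMessage{Type: "CHUNK", MessageID: "4f2a9c0e", Index: 0, Total: 4, Data: "eyJ0eXBlIjoiSURTIi..."}
	out, _ := json.Marshal(msg)
	fmt.Println(string(out))
	// {"type":"CHUNK","messageId":"4f2a9c0e","index":0,"total":4,"data":"eyJ0eXBlIjoiSURTIi..."}
}
```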
```diff
@@ -2,6 +2,24 @@
 
 package storage
 
+// TODO: IMPORTANT - This GC implementation is EXPERIMENTAL and may cause crashes under high load.
+// The current implementation has the following issues that need to be addressed:
+//
+// 1. Badger race condition: DeleteEventBySerial runs transactions that can trigger
+//    "assignment to entry in nil map" panics in Badger v4.8.0 under concurrent load.
+//    This happens when GC deletes events while many REQ queries are being processed.
+//
+// 2. Batch transaction handling: On large datasets (14+ GB), deletions should be:
+//    - Serialized or use a transaction pool to prevent concurrent txn issues
+//    - Batched with proper delays between batches to avoid overwhelming Badger
+//    - Rate-limited based on current system load
+//
+// 3. The current 10ms delay every 100 events (line ~237) is insufficient for busy relays.
+//    Consider adaptive rate limiting based on pending transaction count.
+//
+// 4. Consider using Badger's WriteBatch API instead of individual Update transactions
+//    for bulk deletions, which may be more efficient and avoid some race conditions.
+
 import (
 	"context"
 	"sync"
```
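TODO item 4 points at Badger's WriteBatch API. A minimal sketch of batched deletion with pauses between flushes, assuming badger/v4 and a caller-supplied key list (deleteKeysBatched is a hypothetical helper, not the relay's GC):

```go
package gcutil

import (
	"time"

	badger "github.com/dgraph-io/badger/v4"
)

// deleteKeysBatched deletes keys via Badger's WriteBatch API, flushing
// every batchSize keys and pausing between batches so foreground queries
// keep making progress. Sketch only; batch size and delay need tuning.
func deleteKeysBatched(db *badger.DB, keys [][]byte, batchSize int, pause time.Duration) error {
	for start := 0; start < len(keys); start += batchSize {
		end := start + batchSize
		if end > len(keys) {
			end = len(keys)
		}
		wb := db.NewWriteBatch()
		for _, k := range keys[start:end] {
			if err := wb.Delete(k); err != nil {
				wb.Cancel()
				return err
			}
		}
		if err := wb.Flush(); err != nil {
			return err
		}
		time.Sleep(pause) // yield to concurrent readers and writers
	}
	return nil
}
```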
```diff
@@ -1 +1 @@
-v0.52.3
+v0.52.12
```