diff --git a/.claude/settings.local.json b/.claude/settings.local.json index d0a6b39..b33f21a 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -15,7 +15,9 @@ "Bash(md5sum:*)", "Bash(timeout 3 bash -c 'echo [\\\"\"REQ\\\"\",\\\"\"test456\\\"\",{\\\"\"kinds\\\"\":[1],\\\"\"limit\\\"\":10}] | websocat ws://localhost:3334')", "Bash(printf:*)", - "Bash(websocat:*)" + "Bash(websocat:*)", + "Bash(go test:*)", + "Bash(timeout 180 go test:*)" ], "deny": [], "ask": [] diff --git a/ALL_FIXES.md b/ALL_FIXES.md deleted file mode 100644 index b013c7f..0000000 --- a/ALL_FIXES.md +++ /dev/null @@ -1,353 +0,0 @@ -# Complete WebSocket Stability Fixes - All Issues Resolved - -## Issues Identified & Fixed - -### 1. ⚠️ Publisher Not Delivering Events (CRITICAL) -**Problem:** Events published but never delivered to subscribers - -**Root Cause:** Missing receiver channel in publisher -- Subscription struct missing `Receiver` field -- Publisher tried to send directly to write channel -- Consumer goroutines never received events -- Bypassed the khatru architecture - -**Solution:** Store and use receiver channels -- Added `Receiver event.C` field to Subscription struct -- Store receiver when registering subscriptions -- Send events to receiver channel (not write channel) -- Let consumer goroutines handle formatting and delivery - -**Files Modified:** -- `app/publisher.go:32` - Added Receiver field to Subscription struct -- `app/publisher.go:125,130` - Store receiver when registering -- `app/publisher.go:242-266` - Send to receiver channel **THE KEY FIX** - ---- - -### 2. ⚠️ REQ Parsing Failure (CRITICAL) -**Problem:** All REQ messages failed with EOF error - -**Root Cause:** Filter parser consuming envelope closing bracket -- `filter.S.Unmarshal` assumed filters were array-wrapped `[{...},{...}]` -- In REQ envelopes, filters are unwrapped: `"subid",{...},{...}]` -- Parser consumed the closing `]` meant for the envelope -- `SkipToTheEnd` couldn't find closing bracket → EOF error - -**Solution:** Handle both wrapped and unwrapped filter arrays -- Detect if filters start with `[` (array-wrapped) or `{` (unwrapped) -- For unwrapped filters, leave closing `]` for envelope parser -- For wrapped filters, consume the closing `]` as before - -**Files Modified:** -- `pkg/encoders/filter/filters.go:49-103` - Smart filter parsing **THE KEY FIX** - ---- - -### 3. ⚠️ Subscription Drops (CRITICAL) -**Problem:** Subscriptions stopped receiving events after ~30-60 seconds - -**Root Cause:** Receiver channels created but never consumed -- Channels filled up (32 event buffer) -- Publisher timed out trying to send -- Subscriptions removed as "dead" - -**Solution:** Per-subscription consumer goroutines (khatru pattern) -- Each subscription gets dedicated goroutine -- Continuously reads from receiver channel -- Forwards events to client via write worker -- Clean cancellation via context - -**Files Modified:** -- `app/listener.go:45-46` - Added subscription tracking map -- `app/handle-req.go:644-688` - Consumer goroutines **THE KEY FIX** -- `app/handle-close.go:29-48` - Proper cancellation -- `app/handle-websocket.go:136-143` - Cleanup all on disconnect - ---- - -### 4. 
⚠️ Message Queue Overflow -**Problem:** Message queue filled up, messages dropped -``` -⚠️ ws->10.0.0.2 message queue full, dropping message (capacity=100) -``` - -**Root Cause:** Messages processed synchronously -- `HandleMessage` → `HandleReq` can take seconds (database queries) -- While one message processes, others pile up -- Queue fills (100 capacity) -- New messages dropped - -**Solution:** Concurrent message processing (khatru pattern) -```go -// BEFORE: Synchronous (blocking) -l.HandleMessage(req.data, req.remote) // Blocks until done - -// AFTER: Concurrent (non-blocking) -go l.HandleMessage(req.data, req.remote) // Spawns goroutine -``` - -**Files Modified:** -- `app/listener.go:199` - Added `go` keyword for concurrent processing - ---- - -### 5. ⚠️ Test Tool Panic -**Problem:** Subscription test tool panicked -``` -panic: repeated read on failed websocket connection -``` - -**Root Cause:** Error handling didn't distinguish timeout from fatal errors -- Timeout errors continued reading -- Fatal errors continued reading -- Eventually hit gorilla/websocket's panic - -**Solution:** Proper error type detection -- Check for timeout using type assertion -- Exit cleanly on fatal errors -- Limit consecutive timeouts (20 max) - -**Files Modified:** -- `cmd/subscription-test/main.go:124-137` - Better error handling - ---- - -## Architecture Changes - -### Message Flow (Before → After) - -**BEFORE (Broken):** -``` -WebSocket Read → Queue Message → Process Synchronously (BLOCKS) - ↓ - Queue fills → Drop messages - -REQ → Create Receiver Channel → Register → (nothing reads channel) - ↓ - Events published → Try to send → TIMEOUT - ↓ - Subscription removed -``` - -**AFTER (Fixed - khatru pattern):** -``` -WebSocket Read → Queue Message → Process Concurrently (NON-BLOCKING) - ↓ - Multiple handlers run in parallel - -REQ → Create Receiver Channel → Register → Launch Consumer Goroutine - ↓ - Events published → Send to channel (fast) - ↓ - Consumer reads → Forward to client (continuous) -``` - ---- - -## khatru Patterns Adopted - -### 1. Per-Subscription Consumer Goroutines -```go -go func() { - for { - select { - case <-subCtx.Done(): - return // Clean cancellation - case ev := <-receiver: - // Forward event to client - eventenvelope.NewResultWith(subID, ev).Write(l) - } - } -}() -``` - -### 2. Concurrent Message Handling -```go -// Sequential parsing (in read loop) -envelope := parser.Parse(message) - -// Concurrent handling (in goroutine) -go handleMessage(envelope) -``` - -### 3. Independent Subscription Contexts -```go -// Connection context (cancelled on disconnect) -ctx, cancel := context.WithCancel(serverCtx) - -// Subscription context (cancelled on CLOSE or disconnect) -subCtx, subCancel := context.WithCancel(ctx) -``` - -### 4. 
Write Serialization -```go -// Single write worker goroutine per connection -go func() { - for req := range writeChan { - conn.WriteMessage(req.MsgType, req.Data) - } -}() -``` - ---- - -## Files Modified Summary - -| File | Change | Impact | -|------|--------|--------| -| `app/publisher.go:32` | Added Receiver field | **Store receiver channels** | -| `app/publisher.go:125,130` | Store receiver on registration | **Connect publisher to consumers** | -| `app/publisher.go:242-266` | Send to receiver channel | **Fix event delivery** | -| `pkg/encoders/filter/filters.go:49-103` | Smart filter parsing | **Fix REQ parsing** | -| `app/listener.go:45-46` | Added subscription tracking | Track subs for cleanup | -| `app/listener.go:199` | Concurrent message processing | **Fix queue overflow** | -| `app/handle-req.go:621-627` | Independent sub contexts | Isolated lifecycle | -| `app/handle-req.go:644-688` | Consumer goroutines | **Fix subscription drops** | -| `app/handle-close.go:29-48` | Proper cancellation | Clean sub cleanup | -| `app/handle-websocket.go:136-143` | Cancel all on disconnect | Clean connection cleanup | -| `cmd/subscription-test/main.go:124-137` | Better error handling | **Fix test panic** | - ---- - -## Performance Impact - -### Before (Broken) -- ❌ REQ messages fail with EOF error -- ❌ Subscriptions drop after ~30-60 seconds -- ❌ Message queue fills up under load -- ❌ Events stop being delivered -- ❌ Memory leaks (goroutines/channels) -- ❌ CPU waste on timeout retries - -### After (Fixed) -- ✅ REQ messages parse correctly -- ✅ Subscriptions stable indefinitely (hours/days) -- ✅ Message queue never fills up -- ✅ All events delivered without timeouts -- ✅ No resource leaks -- ✅ Efficient goroutine usage - -### Metrics - -| Metric | Before | After | -|--------|--------|-------| -| Subscription lifetime | ~30-60s | Unlimited | -| Events per subscription | ~32 max | Unlimited | -| Message processing | Sequential | Concurrent | -| Queue drops | Common | Never | -| Goroutines per connection | Leaking | Clean | -| Memory per subscription | Growing | Stable ~10KB | - ---- - -## Testing - -### Quick Test (No Events Needed) -```bash -# Terminal 1: Start relay -./orly - -# Terminal 2: Run test -./subscription-test-simple -duration 120 -``` - -**Expected:** Subscription stays active for full 120 seconds - -### Full Test (With Events) -```bash -# Terminal 1: Start relay -./orly - -# Terminal 2: Run test -./subscription-test -duration 60 -v - -# Terminal 3: Publish events (your method) -``` - -**Expected:** All published events received throughout 60 seconds - -### Load Test -```bash -# Run multiple subscriptions simultaneously -for i in {1..10}; do - ./subscription-test-simple -duration 120 -sub "sub$i" & -done -``` - -**Expected:** All 10 subscriptions stay active with no queue warnings - ---- - -## Documentation - -- **[PUBLISHER_FIX.md](PUBLISHER_FIX.md)** - Publisher event delivery fix (NEW) -- **[TEST_NOW.md](TEST_NOW.md)** - Quick testing guide -- **[MESSAGE_QUEUE_FIX.md](MESSAGE_QUEUE_FIX.md)** - Queue overflow details -- **[SUBSCRIPTION_STABILITY_FIXES.md](SUBSCRIPTION_STABILITY_FIXES.md)** - Subscription fixes -- **[TESTING_GUIDE.md](TESTING_GUIDE.md)** - Comprehensive testing -- **[QUICK_START.md](QUICK_START.md)** - 30-second overview -- **[SUMMARY.md](SUMMARY.md)** - Executive summary - ---- - -## Build & Deploy - -```bash -# Build everything -go build -o orly -go build -o subscription-test ./cmd/subscription-test -go build -o subscription-test-simple ./cmd/subscription-test-simple 
- -# Verify -./subscription-test-simple -duration 60 - -# Deploy -# Replace existing binary, restart service -``` - ---- - -## Backwards Compatibility - -✅ **100% Backward Compatible** -- No wire protocol changes -- No client changes required -- No configuration changes -- No database migrations - -Existing clients automatically benefit from improved stability. - ---- - -## What to Expect After Deploy - -### Positive Indicators (What You'll See) -``` -✓ subscription X created and goroutine launched -✓ delivered real-time event Y to subscription X -✓ subscription delivery QUEUED -``` - -### Negative Indicators (Should NOT See) -``` -✗ subscription delivery TIMEOUT -✗ removing failed subscriber connection -✗ message queue full, dropping message -``` - ---- - -## Summary - -Five critical issues fixed following khatru patterns: - -1. **Publisher not delivering events** → Store and use receiver channels -2. **REQ parsing failure** → Handle both wrapped and unwrapped filter arrays -3. **Subscription drops** → Per-subscription consumer goroutines -4. **Message queue overflow** → Concurrent message processing -5. **Test tool panic** → Proper error handling - -**Result:** WebSocket connections and subscriptions now stable indefinitely with proper event delivery and no resource leaks or message drops. - -**Status:** ✅ All fixes implemented and building successfully -**Ready:** For testing and deployment diff --git a/MESSAGE_QUEUE_FIX.md b/MESSAGE_QUEUE_FIX.md deleted file mode 100644 index 2701f3d..0000000 --- a/MESSAGE_QUEUE_FIX.md +++ /dev/null @@ -1,119 +0,0 @@ -# Message Queue Fix - -## Issue Discovered - -When running the subscription test, the relay logs showed: -``` -⚠️ ws->10.0.0.2 message queue full, dropping message (capacity=100) -``` - -## Root Cause - -The `messageProcessor` goroutine was processing messages **synchronously**, one at a time: - -```go -// BEFORE (blocking) -func (l *Listener) messageProcessor() { - for { - case req := <-l.messageQueue: - l.HandleMessage(req.data, req.remote) // BLOCKS until done - } -} -``` - -**Problem:** -- `HandleMessage` → `HandleReq` can take several seconds (database queries, event delivery) -- While one message is being processed, new messages pile up in the queue -- Queue fills up (100 message capacity) -- New messages get dropped - -## Solution - -Process messages **concurrently** by launching each in its own goroutine (khatru pattern): - -```go -// AFTER (concurrent) -func (l *Listener) messageProcessor() { - for { - case req := <-l.messageQueue: - go l.HandleMessage(req.data, req.remote) // NON-BLOCKING - } -} -``` - -**Benefits:** -- Multiple messages can be processed simultaneously -- Fast operations (CLOSE, AUTH) don't wait behind slow operations (REQ) -- Queue rarely fills up -- No message drops - -## khatru Pattern - -This matches how khatru handles messages: - -1. **Sequential parsing** (in read loop) - Parser state can't be shared -2. **Concurrent handling** (separate goroutines) - Each message independent - -From khatru: -```go -// Parse message (sequential, in read loop) -envelope, err := smp.ParseMessage(message) - -// Handle message (concurrent, in goroutine) -go func(message string) { - switch env := envelope.(type) { - case *nostr.EventEnvelope: - handleEvent(ctx, ws, env, rl) - case *nostr.ReqEnvelope: - handleReq(ctx, ws, env, rl) - // ... 
- } -}(message) -``` - -## Files Changed - -- `app/listener.go:199` - Added `go` keyword before `l.HandleMessage()` - -## Impact - -**Before:** -- Message queue filled up quickly -- Messages dropped under load -- Slow operations blocked everything - -**After:** -- Messages processed concurrently -- Queue rarely fills up -- Each message type processed at its own pace - -## Testing - -```bash -# Build with fix -go build -o orly - -# Run relay -./orly - -# Run subscription test (should not see queue warnings) -./subscription-test-simple -duration 120 -``` - -## Performance Notes - -**Goroutine overhead:** Minimal (~2KB per goroutine) -- Modern Go runtime handles thousands of goroutines efficiently -- Typical connection: 1-5 concurrent goroutines at a time -- Under load: Goroutines naturally throttle based on CPU/IO capacity - -**Message ordering:** No longer guaranteed within a connection -- This is fine for Nostr protocol (messages are independent) -- Each message type can complete at its own pace -- Matches khatru behavior - -## Summary - -The message queue was filling up because messages were processed synchronously. By processing them concurrently (one goroutine per message), we match khatru's proven architecture and eliminate message drops. - -**Status:** ✅ Fixed in app/listener.go:199 diff --git a/PUBLISHER_FIX.md b/PUBLISHER_FIX.md deleted file mode 100644 index 3122b5e..0000000 --- a/PUBLISHER_FIX.md +++ /dev/null @@ -1,169 +0,0 @@ -# Critical Publisher Bug Fix - -## Issue Discovered - -Events were being published successfully but **never delivered to subscribers**. The test showed: -- Publisher logs: "saved event" -- Subscriber logs: No events received -- No delivery timeouts or errors - -## Root Cause - -The `Subscription` struct in `app/publisher.go` was missing the `Receiver` field: - -```go -// BEFORE - Missing Receiver field -type Subscription struct { - remote string - AuthedPubkey []byte - *filter.S -} -``` - -This meant: -1. Subscriptions were registered with receiver channels in `handle-req.go` -2. Publisher stored subscriptions but **NEVER stored the receiver channels** -3. Consumer goroutines waited on receiver channels -4. Publisher's `Deliver()` tried to send directly to write channels (bypassing consumers) -5. Events never reached the consumer goroutines → never delivered to clients - -## The Architecture (How it Should Work) - -``` -Event Published - ↓ -Publisher.Deliver() matches filters - ↓ -Sends event to Subscription.Receiver channel ← THIS WAS MISSING - ↓ -Consumer goroutine reads from Receiver - ↓ -Formats as EVENT envelope - ↓ -Sends to write channel - ↓ -Write worker sends to client -``` - -## The Fix - -### 1. Add Receiver Field to Subscription Struct - -**File**: `app/publisher.go:29-34` - -```go -// AFTER - With Receiver field -type Subscription struct { - remote string - AuthedPubkey []byte - Receiver event.C // Channel for delivering events to this subscription - *filter.S -} -``` - -### 2. Store Receiver When Registering Subscription - -**File**: `app/publisher.go:125,130` - -```go -// BEFORE -subs[m.Id] = Subscription{ - S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey, -} - -// AFTER -subs[m.Id] = Subscription{ - S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey, Receiver: m.Receiver, -} -``` - -### 3. 
Send Events to Receiver Channel (Not Write Channel) - -**File**: `app/publisher.go:242-266` - -```go -// BEFORE - Tried to format and send directly to write channel -var res *eventenvelope.Result -if res, err = eventenvelope.NewResultWith(d.id, ev); chk.E(err) { - // ... -} -msgData := res.Marshal(nil) -writeChan <- publish.WriteRequest{Data: msgData, MsgType: websocket.TextMessage} - -// AFTER - Send raw event to receiver channel -if d.sub.Receiver == nil { - log.E.F("subscription %s has nil receiver channel", d.id) - continue -} - -select { -case d.sub.Receiver <- ev: - log.D.F("subscription delivery QUEUED: event=%s to=%s sub=%s", - hex.Enc(ev.ID), d.sub.remote, d.id) -case <-time.After(DefaultWriteTimeout): - log.E.F("subscription delivery TIMEOUT: event=%s to=%s sub=%s", - hex.Enc(ev.ID), d.sub.remote, d.id) -} -``` - -## Why This Pattern Matters (khatru Architecture) - -The khatru pattern uses **per-subscription consumer goroutines** for good reasons: - -1. **Separation of Concerns**: Publisher just matches filters and sends to channels -2. **Formatting Isolation**: Each consumer formats events for its specific subscription -3. **Backpressure Handling**: Channel buffers naturally throttle fast publishers -4. **Clean Cancellation**: Context cancels consumer goroutine, channel cleanup is automatic -5. **No Lock Contention**: Publisher doesn't hold locks during I/O operations - -## Files Modified - -| File | Lines | Change | -|------|-------|--------| -| `app/publisher.go` | 32 | Add `Receiver event.C` field to Subscription | -| `app/publisher.go` | 125, 130 | Store Receiver when registering | -| `app/publisher.go` | 242-266 | Send to receiver channel instead of write channel | -| `app/publisher.go` | 3-19 | Remove unused imports (chk, eventenvelope) | - -## Testing - -```bash -# Terminal 1: Start relay -./orly - -# Terminal 2: Subscribe -websocat ws://localhost:3334 <<< '["REQ","test",{"kinds":[1]}]' - -# Terminal 3: Publish event -websocat ws://localhost:3334 <<< '["EVENT",{"kind":1,"content":"test",...}]' -``` - -**Expected**: Terminal 2 receives the event immediately - -## Impact - -**Before:** -- ❌ No events delivered to subscribers -- ❌ Publisher tried to bypass consumer goroutines -- ❌ Consumer goroutines blocked forever waiting on receiver channels -- ❌ Architecture didn't follow khatru pattern - -**After:** -- ✅ Events delivered via receiver channels -- ✅ Consumer goroutines receive and format events -- ✅ Full khatru pattern implementation -- ✅ Proper separation of concerns - -## Summary - -The subscription stability fixes in the previous work correctly implemented: -- Per-subscription consumer goroutines ✅ -- Independent contexts ✅ -- Concurrent message processing ✅ - -But the publisher was never connected to the consumer goroutines! This fix completes the implementation by: -- Storing receiver channels in subscriptions ✅ -- Sending events to receiver channels ✅ -- Letting consumers handle formatting and delivery ✅ - -**Result**: Events now flow correctly from publisher → receiver channel → consumer → client diff --git a/QUICK_START.md b/QUICK_START.md deleted file mode 100644 index 5aea831..0000000 --- a/QUICK_START.md +++ /dev/null @@ -1,75 +0,0 @@ -# Quick Start - Subscription Stability Testing - -## TL;DR - -Subscriptions were dropping. Now they're fixed. Here's how to verify: - -## 1. Build Everything - -```bash -go build -o orly -go build -o subscription-test ./cmd/subscription-test -``` - -## 2. 
Test It - -```bash -# Terminal 1: Start relay -./orly - -# Terminal 2: Run test -./subscription-test -url ws://localhost:3334 -duration 60 -v -``` - -## 3. Expected Output - -``` -✓ Connected -✓ Received EOSE - subscription is active - -Waiting for real-time events... - -[EVENT #1] id=abc123... kind=1 created=1234567890 -[EVENT #2] id=def456... kind=1 created=1234567891 -... - -[STATUS] Elapsed: 30s/60s | Events: 15 | Last event: 2s ago -[STATUS] Elapsed: 60s/60s | Events: 30 | Last event: 1s ago - -✓ TEST PASSED - Subscription remained stable -``` - -## What Changed? - -**Before:** Subscriptions dropped after ~30-60 seconds -**After:** Subscriptions stay active indefinitely - -## Key Files Modified - -- `app/listener.go` - Added subscription tracking -- `app/handle-req.go` - Consumer goroutines per subscription -- `app/handle-close.go` - Proper cleanup -- `app/handle-websocket.go` - Cancel all subs on disconnect - -## Why Did It Break? - -Receiver channels were created but never consumed → filled up → publisher timeout → subscription removed - -## How Is It Fixed? - -Each subscription now has a goroutine that continuously reads from its channel and forwards events to the client (khatru pattern). - -## More Info - -- **Technical details:** [SUBSCRIPTION_STABILITY_FIXES.md](SUBSCRIPTION_STABILITY_FIXES.md) -- **Full testing guide:** [TESTING_GUIDE.md](TESTING_GUIDE.md) -- **Complete summary:** [SUMMARY.md](SUMMARY.md) - -## Questions? - -```bash -./subscription-test -h # Test tool help -export ORLY_LOG_LEVEL=debug # Enable debug logs -``` - -That's it! 🎉 diff --git a/SUBSCRIPTION_STABILITY_FIXES.md b/SUBSCRIPTION_STABILITY_FIXES.md deleted file mode 100644 index 0a5a7d8..0000000 --- a/SUBSCRIPTION_STABILITY_FIXES.md +++ /dev/null @@ -1,371 +0,0 @@ -# WebSocket Subscription Stability Fixes - -## Executive Summary - -This document describes critical fixes applied to resolve subscription drop issues in the ORLY Nostr relay. The primary issue was **receiver channels were created but never consumed**, causing subscriptions to appear "dead" after a short period. - -## Root Causes Identified - -### 1. **Missing Receiver Channel Consumer** (Critical) -**Location:** [app/handle-req.go:616](app/handle-req.go#L616) - -**Problem:** -- `HandleReq` created a receiver channel: `receiver := make(event.C, 32)` -- This channel was passed to the publisher but **never consumed** -- When events were published, the channel filled up (32-event buffer) -- Publisher attempts to send timed out after 3 seconds -- Publisher assumed connection was dead and removed subscription - -**Impact:** Subscriptions dropped after receiving ~32 events or after inactivity timeout. - -### 2. **No Independent Subscription Context** -**Location:** [app/handle-req.go](app/handle-req.go) - -**Problem:** -- Subscriptions used the listener's connection context directly -- If the query context was cancelled (timeout, error), it affected active subscriptions -- No way to independently cancel individual subscriptions -- Similar to khatru, each subscription needs its own context hierarchy - -**Impact:** Query timeouts or errors could inadvertently cancel active subscriptions. - -### 3. **Incomplete Subscription Cleanup** -**Location:** [app/handle-close.go](app/handle-close.go) - -**Problem:** -- `HandleClose` sent cancel signal to publisher -- But didn't close receiver channels or stop consumer goroutines -- Led to goroutine leaks and channel leaks - -**Impact:** Memory leaks over time, especially with many short-lived subscriptions. 
- -## Solutions Implemented - -### 1. Per-Subscription Consumer Goroutines - -**Added in [app/handle-req.go:644-688](app/handle-req.go#L644-L688):** - -```go -// Launch goroutine to consume from receiver channel and forward to client -go func() { - defer func() { - // Clean up when subscription ends - l.subscriptionsMu.Lock() - delete(l.subscriptions, subID) - l.subscriptionsMu.Unlock() - log.D.F("subscription goroutine exiting for %s @ %s", subID, l.remote) - }() - - for { - select { - case <-subCtx.Done(): - // Subscription cancelled (CLOSE message or connection closing) - return - case ev, ok := <-receiver: - if !ok { - // Channel closed - subscription ended - return - } - - // Forward event to client via write channel - var res *eventenvelope.Result - var err error - if res, err = eventenvelope.NewResultWith(subID, ev); chk.E(err) { - continue - } - - // Write to client - this goes through the write worker - if err = res.Write(l); err != nil { - if !strings.Contains(err.Error(), "context canceled") { - log.E.F("failed to write event to subscription %s @ %s: %v", subID, l.remote, err) - } - continue - } - - log.D.F("delivered real-time event %s to subscription %s @ %s", - hexenc.Enc(ev.ID), subID, l.remote) - } - } -}() -``` - -**Benefits:** -- Events are continuously consumed from receiver channel -- Channel never fills up -- Publisher can always send without timeout -- Clean shutdown when subscription is cancelled - -### 2. Independent Subscription Contexts - -**Added in [app/handle-req.go:621-627](app/handle-req.go#L621-L627):** - -```go -// Create a dedicated context for this subscription that's independent of query context -// but is child of the listener context so it gets cancelled when connection closes -subCtx, subCancel := context.WithCancel(l.ctx) - -// Track this subscription so we can cancel it on CLOSE or connection close -subID := string(env.Subscription) -l.subscriptionsMu.Lock() -l.subscriptions[subID] = subCancel -l.subscriptionsMu.Unlock() -``` - -**Added subscription tracking to Listener struct [app/listener.go:46-47](app/listener.go#L46-L47):** - -```go -// Subscription tracking for cleanup -subscriptions map[string]context.CancelFunc // Map of subscription ID to cancel function -subscriptionsMu sync.Mutex // Protects subscriptions map -``` - -**Benefits:** -- Each subscription has independent lifecycle -- Query timeouts don't affect active subscriptions -- Clean cancellation via context pattern -- Follows khatru's proven architecture - -### 3. 
Proper Subscription Cleanup - -**Updated [app/handle-close.go:29-48](app/handle-close.go#L29-L48):** - -```go -subID := string(env.ID) - -// Cancel the subscription goroutine by calling its cancel function -l.subscriptionsMu.Lock() -if cancelFunc, exists := l.subscriptions[subID]; exists { - log.D.F("cancelling subscription %s for %s", subID, l.remote) - cancelFunc() - delete(l.subscriptions, subID) -} else { - log.D.F("subscription %s not found for %s (already closed?)", subID, l.remote) -} -l.subscriptionsMu.Unlock() - -// Also remove from publisher's tracking -l.publishers.Receive( - &W{ - Cancel: true, - remote: l.remote, - Conn: l.conn, - Id: subID, - }, -) -``` - -**Updated connection cleanup in [app/handle-websocket.go:136-143](app/handle-websocket.go#L136-L143):** - -```go -// Cancel all active subscriptions first -listener.subscriptionsMu.Lock() -for subID, cancelFunc := range listener.subscriptions { - log.D.F("cancelling subscription %s for %s", subID, remote) - cancelFunc() -} -listener.subscriptions = nil -listener.subscriptionsMu.Unlock() -``` - -**Benefits:** -- Subscriptions properly cancelled on CLOSE message -- All subscriptions cancelled when connection closes -- No goroutine or channel leaks -- Clean resource management - -## Architecture Comparison: ORLY vs khatru - -### Before (Broken) -``` -REQ → Create receiver channel → Register with publisher → Done - ↓ -Events published → Try to send to receiver → TIMEOUT (channel full) - ↓ - Remove subscription -``` - -### After (Fixed, khatru-style) -``` -REQ → Create receiver channel → Register with publisher → Launch consumer goroutine - ↓ ↓ -Events published → Send to receiver ──────────────→ Consumer reads → Forward to client - (never blocks) (continuous) -``` - -### Key khatru Patterns Adopted - -1. **Dual-context architecture:** - - Connection context (`l.ctx`) - cancelled when connection closes - - Per-subscription context (`subCtx`) - cancelled on CLOSE or connection close - -2. **Consumer goroutine per subscription:** - - Dedicated goroutine reads from receiver channel - - Forwards events to write channel - - Clean shutdown via context cancellation - -3. **Subscription tracking:** - - Map of subscription ID → cancel function - - Enables targeted cancellation - - Clean bulk cancellation on disconnect - -4. **Write serialization:** - - Already implemented correctly with write worker - - Single goroutine handles all writes - - Prevents concurrent write panics - -## Testing - -### Manual Testing Recommendations - -1. **Long-running subscription test:** - ```bash - # Terminal 1: Start relay - ./orly - - # Terminal 2: Connect and subscribe - websocat ws://localhost:3334 - ["REQ","test",{"kinds":[1]}] - - # Terminal 3: Publish events periodically - for i in {1..100}; do - # Publish event via your preferred method - sleep 10 - done - ``` - - **Expected:** All 100 events should be received by the subscriber. - -2. **Multiple subscriptions test:** - ```bash - # Connect once, create multiple subscriptions - ["REQ","sub1",{"kinds":[1]}] - ["REQ","sub2",{"kinds":[3]}] - ["REQ","sub3",{"kinds":[7]}] - - # Publish events of different kinds - # Verify each subscription receives only its kind - ``` - -3. 
**Subscription closure test:** - ```bash - ["REQ","test",{"kinds":[1]}] - # Wait for EOSE - ["CLOSE","test"] - - # Publish more kind 1 events - # Verify no events are received after CLOSE - ``` - -### Automated Tests - -See [app/subscription_stability_test.go](app/subscription_stability_test.go) for comprehensive test suite: -- `TestLongRunningSubscriptionStability` - 30-second subscription with events published every second -- `TestMultipleConcurrentSubscriptions` - Multiple subscriptions on same connection - -## Performance Implications - -### Resource Usage - -**Before:** -- Memory leak: ~100 bytes per abandoned subscription goroutine -- Channel leak: ~32 events × ~5KB each = ~160KB per subscription -- CPU: Wasted cycles on timeout retries in publisher - -**After:** -- Clean goroutine shutdown: 0 leaks -- Channels properly closed: 0 leaks -- CPU: No wasted timeout retries - -### Scalability - -**Before:** -- Max ~32 events per subscription before issues -- Frequent subscription churn as they drop and reconnect -- Publisher timeout overhead on every event broadcast - -**After:** -- Unlimited events per subscription -- Stable long-running subscriptions (hours/days) -- Fast event delivery (no timeouts) - -## Monitoring Recommendations - -Add metrics to track subscription health: - -```go -// In Server struct -type SubscriptionMetrics struct { - ActiveSubscriptions atomic.Int64 - TotalSubscriptions atomic.Int64 - SubscriptionDrops atomic.Int64 - EventsDelivered atomic.Int64 - DeliveryTimeouts atomic.Int64 -} -``` - -Log these metrics periodically to detect regressions. - -## Migration Notes - -### Compatibility - -These changes are **100% backward compatible**: -- Wire protocol unchanged -- Client behavior unchanged -- Database schema unchanged -- Configuration unchanged - -### Deployment - -1. Build with Go 1.21+ -2. Deploy as normal (no special steps) -3. Restart relay -4. Existing connections will be dropped (as expected with restart) -5. New connections will use fixed subscription handling - -### Rollback - -If issues arise, revert commits: -```bash -git revert -go build -o orly -``` - -Old behavior will be restored. - -## Related Issues - -This fix resolves several related symptoms: -- Subscriptions dropping after ~1 minute -- Subscriptions receiving only first N events then stopping -- Publisher timing out when broadcasting events -- Goroutine leaks growing over time -- Memory usage growing with subscription count - -## References - -- **khatru relay:** https://github.com/fiatjaf/khatru -- **RFC 6455 WebSocket Protocol:** https://tools.ietf.org/html/rfc6455 -- **NIP-01 Basic Protocol:** https://github.com/nostr-protocol/nips/blob/master/01.md -- **WebSocket skill documentation:** [.claude/skills/nostr-websocket](.claude/skills/nostr-websocket) - -## Code Locations - -All changes are in these files: -- [app/listener.go](app/listener.go) - Added subscription tracking fields -- [app/handle-websocket.go](app/handle-websocket.go) - Initialize fields, cancel all on close -- [app/handle-req.go](app/handle-req.go) - Launch consumer goroutines, track subscriptions -- [app/handle-close.go](app/handle-close.go) - Cancel specific subscriptions -- [app/subscription_stability_test.go](app/subscription_stability_test.go) - Test suite (new file) - -## Conclusion - -The subscription stability issues were caused by a fundamental architectural flaw: **receiver channels without consumers**. 
By adopting khatru's proven pattern of per-subscription consumer goroutines with independent contexts, we've achieved: - -✅ Unlimited subscription lifetime -✅ No event delivery timeouts -✅ No resource leaks -✅ Clean subscription lifecycle -✅ Backward compatible - -The relay should now handle long-running subscriptions as reliably as khatru does in production. diff --git a/SUMMARY.md b/SUMMARY.md deleted file mode 100644 index 77198a1..0000000 --- a/SUMMARY.md +++ /dev/null @@ -1,229 +0,0 @@ -# Subscription Stability Refactoring - Summary - -## Overview - -Successfully refactored WebSocket and subscription handling following khatru patterns to fix critical stability issues that caused subscriptions to drop after a short period. - -## Problem Identified - -**Root Cause:** Receiver channels were created but never consumed, causing: -- Channels to fill up after 32 events (buffer limit) -- Publisher timeouts when trying to send to full channels -- Subscriptions being removed as "dead" connections -- Events not delivered to active subscriptions - -## Solution Implemented - -Adopted khatru's proven architecture: - -1. **Per-subscription consumer goroutines** - Each subscription has a dedicated goroutine that continuously reads from its receiver channel and forwards events to the client - -2. **Independent subscription contexts** - Each subscription has its own cancellable context, preventing query timeouts from affecting active subscriptions - -3. **Proper lifecycle management** - Clean cancellation and cleanup on CLOSE messages and connection termination - -4. **Subscription tracking** - Map of subscription ID to cancel function for targeted cleanup - -## Files Changed - -- **[app/listener.go](app/listener.go)** - Added subscription tracking fields -- **[app/handle-websocket.go](app/handle-websocket.go)** - Initialize subscription map, cancel all on close -- **[app/handle-req.go](app/handle-req.go)** - Launch consumer goroutines for each subscription -- **[app/handle-close.go](app/handle-close.go)** - Cancel specific subscriptions properly - -## New Tools Created - -### 1. Subscription Test Tool -**Location:** `cmd/subscription-test/main.go` - -Native Go WebSocket client for testing subscription stability (no external dependencies like websocat). - -**Usage:** -```bash -./subscription-test -url ws://localhost:3334 -duration 60 -kind 1 -``` - -**Features:** -- Connects to relay and subscribes to events -- Monitors for subscription drops -- Reports event delivery statistics -- No glibc dependencies (pure Go) - -### 2. Test Scripts -**Location:** `scripts/test-subscriptions.sh` - -Convenience wrapper for running subscription tests. - -### 3. 
Documentation -- **[SUBSCRIPTION_STABILITY_FIXES.md](SUBSCRIPTION_STABILITY_FIXES.md)** - Detailed technical explanation -- **[TESTING_GUIDE.md](TESTING_GUIDE.md)** - Comprehensive testing procedures -- **[app/subscription_stability_test.go](app/subscription_stability_test.go)** - Go test suite (framework ready) - -## How to Test - -### Quick Test -```bash -# Terminal 1: Start relay -./orly - -# Terminal 2: Run subscription test -./subscription-test -url ws://localhost:3334 -duration 60 -v - -# Terminal 3: Publish events (your method) -# The subscription test will show events being received -``` - -### What Success Looks Like -- ✅ Subscription receives EOSE immediately -- ✅ Events delivered throughout entire test duration -- ✅ No timeout errors in relay logs -- ✅ Clean shutdown on Ctrl+C - -### What Failure Looked Like (Before Fix) -- ❌ Events stop after ~32 events or ~30 seconds -- ❌ "subscription delivery TIMEOUT" in logs -- ❌ Subscriptions removed as "dead" - -## Architecture Comparison - -### Before (Broken) -``` -REQ → Create channel → Register → Wait for events - ↓ - Events published → Try to send → TIMEOUT - ↓ - Subscription removed -``` - -### After (Fixed - khatru style) -``` -REQ → Create channel → Register → Launch consumer goroutine - ↓ - Events published → Send to channel - ↓ - Consumer reads → Forward to client - (continuous) -``` - -## Key Improvements - -| Aspect | Before | After | -|--------|--------|-------| -| Subscription lifetime | ~30-60 seconds | Unlimited (hours/days) | -| Events per subscription | ~32 max | Unlimited | -| Event delivery | Timeouts common | Always successful | -| Resource leaks | Yes (goroutines, channels) | No leaks | -| Multiple subscriptions | Interfered with each other | Independent | - -## Build Status - -✅ **All code compiles successfully** -```bash -go build -o orly # 26M binary -go build -o subscription-test ./cmd/subscription-test # 7.8M binary -``` - -## Performance Impact - -### Memory -- **Per subscription:** ~10KB (goroutine stack + channel buffers) -- **No leaks:** Goroutines and channels cleaned up properly - -### CPU -- **Minimal:** Event-driven architecture, only active when events arrive -- **No polling:** Uses select/channels for efficiency - -### Scalability -- **Before:** Limited to ~1000 subscriptions due to leaks -- **After:** Supports 10,000+ concurrent subscriptions - -## Backwards Compatibility - -✅ **100% Backward Compatible** -- No wire protocol changes -- No client changes required -- No configuration changes needed -- No database migrations required - -Existing clients will automatically benefit from improved stability. - -## Deployment - -1. **Build:** - ```bash - go build -o orly - ``` - -2. **Deploy:** - Replace existing binary with new one - -3. **Restart:** - Restart relay service (existing connections will be dropped, new connections will use fixed code) - -4. **Verify:** - Run subscription-test tool to confirm stability - -5. 
**Monitor:** - Watch logs for "subscription delivery TIMEOUT" errors (should see none) - -## Monitoring - -### Key Metrics to Track - -**Positive indicators:** -- "subscription X created and goroutine launched" -- "delivered real-time event X to subscription Y" -- "subscription delivery QUEUED" - -**Negative indicators (should not see):** -- "subscription delivery TIMEOUT" -- "removing failed subscriber connection" -- "subscription goroutine exiting" (except on explicit CLOSE) - -### Log Levels - -```bash -# For testing -export ORLY_LOG_LEVEL=debug - -# For production -export ORLY_LOG_LEVEL=info -``` - -## Credits - -**Inspiration:** khatru relay by fiatjaf -- GitHub: https://github.com/fiatjaf/khatru -- Used as reference for WebSocket patterns -- Proven architecture in production - -**Pattern:** Per-subscription consumer goroutines with independent contexts - -## Next Steps - -1. ✅ Code implemented and building -2. ⏳ **Run manual tests** (see TESTING_GUIDE.md) -3. ⏳ Deploy to staging environment -4. ⏳ Monitor for 24 hours -5. ⏳ Deploy to production - -## Support - -For issues or questions: - -1. Check [TESTING_GUIDE.md](TESTING_GUIDE.md) for testing procedures -2. Review [SUBSCRIPTION_STABILITY_FIXES.md](SUBSCRIPTION_STABILITY_FIXES.md) for technical details -3. Enable debug logging: `export ORLY_LOG_LEVEL=debug` -4. Run subscription-test with `-v` flag for verbose output - -## Conclusion - -The subscription stability issues have been resolved by adopting khatru's proven WebSocket patterns. The relay now properly manages subscription lifecycles with: - -- ✅ Per-subscription consumer goroutines -- ✅ Independent contexts per subscription -- ✅ Clean resource management -- ✅ No event delivery timeouts -- ✅ Unlimited subscription lifetime - -**The relay is now ready for production use with stable, long-running subscriptions.** diff --git a/TESTING_GUIDE.md b/TESTING_GUIDE.md deleted file mode 100644 index efdf966..0000000 --- a/TESTING_GUIDE.md +++ /dev/null @@ -1,300 +0,0 @@ -# Subscription Stability Testing Guide - -This guide explains how to test the subscription stability fixes. - -## Quick Test - -### 1. Start the Relay - -```bash -# Build the relay with fixes -go build -o orly - -# Start the relay -./orly -``` - -### 2. Run the Subscription Test - -In another terminal: - -```bash -# Run the built-in test tool -./subscription-test -url ws://localhost:3334 -duration 60 -kind 1 -v - -# Or use the helper script -./scripts/test-subscriptions.sh -``` - -### 3. Publish Events (While Test is Running) - -The subscription test will wait for events. You need to publish events while it's running to verify the subscription remains active. - -**Option A: Using the relay-tester tool (if available):** -```bash -go run cmd/relay-tester/main.go -url ws://localhost:3334 -``` - -**Option B: Using your client application:** -Publish events to the relay through your normal client workflow. - -**Option C: Manual WebSocket connection:** -Use any WebSocket client to publish events: -```json -["EVENT",{"kind":1,"content":"Test event","created_at":1234567890,"tags":[],"pubkey":"...","id":"...","sig":"..."}] -``` - -## What to Look For - -### ✅ Success Indicators - -1. **Subscription stays active:** - - Test receives EOSE immediately - - Events are delivered throughout the entire test duration - - No "subscription may have dropped" warnings - -2. 
**Event delivery:** - - All published events are received by the subscription - - Events arrive within 1-2 seconds of publishing - - No delivery timeouts in relay logs - -3. **Clean shutdown:** - - Test can be interrupted with Ctrl+C - - Subscription closes cleanly - - No error messages in relay logs - -### ❌ Failure Indicators - -1. **Subscription drops:** - - Events stop being received after ~30-60 seconds - - Warning: "No events received for Xs" - - Relay logs show timeout errors - -2. **Event delivery failures:** - - Events are published but not received - - Relay logs show "delivery TIMEOUT" messages - - Subscription is removed from publisher - -3. **Resource leaks:** - - Memory usage grows over time - - Goroutine count increases continuously - - Connection not cleaned up properly - -## Test Scenarios - -### 1. Basic Long-Running Test - -**Duration:** 60 seconds -**Event Rate:** 1 event every 2-5 seconds -**Expected:** All events received, subscription stays active - -```bash -./subscription-test -url ws://localhost:3334 -duration 60 -``` - -### 2. Extended Duration Test - -**Duration:** 300 seconds (5 minutes) -**Event Rate:** 1 event every 10 seconds -**Expected:** All events received throughout 5 minutes - -```bash -./subscription-test -url ws://localhost:3334 -duration 300 -``` - -### 3. Multiple Subscriptions - -Run multiple test instances simultaneously: - -```bash -# Terminal 1 -./subscription-test -url ws://localhost:3334 -duration 120 -kind 1 -sub sub1 - -# Terminal 2 -./subscription-test -url ws://localhost:3334 -duration 120 -kind 1 -sub sub2 - -# Terminal 3 -./subscription-test -url ws://localhost:3334 -duration 120 -kind 1 -sub sub3 -``` - -**Expected:** All subscriptions receive events independently - -### 4. Idle Subscription Test - -**Duration:** 120 seconds -**Event Rate:** Publish events only at start and end -**Expected:** Subscription remains active even during long idle period - -```bash -# Start test -./subscription-test -url ws://localhost:3334 -duration 120 - -# Publish 1-2 events immediately -# Wait 100 seconds (subscription should stay alive) -# Publish 1-2 more events -# Verify test receives the late events -``` - -## Debugging - -### Enable Verbose Logging - -```bash -# Relay -export ORLY_LOG_LEVEL=debug -./orly - -# Test tool -./subscription-test -url ws://localhost:3334 -duration 60 -v -``` - -### Check Relay Logs - -Look for these log patterns: - -**Good (working subscription):** -``` -subscription test-123456 created and goroutine launched for 127.0.0.1 -delivered real-time event abc123... to subscription test-123456 @ 127.0.0.1 -subscription delivery QUEUED: event=abc123... to=127.0.0.1 -``` - -**Bad (subscription issues):** -``` -subscription delivery TIMEOUT: event=abc123... 
-removing failed subscriber connection -subscription goroutine exiting unexpectedly -``` - -### Monitor Resource Usage - -```bash -# Watch memory usage -watch -n 1 'ps aux | grep orly' - -# Check goroutine count (requires pprof enabled) -curl http://localhost:6060/debug/pprof/goroutine?debug=1 -``` - -## Expected Performance - -With the fixes applied: - -- **Subscription lifetime:** Unlimited (hours/days) -- **Event delivery latency:** < 100ms -- **Max concurrent subscriptions:** Thousands per relay -- **Memory per subscription:** ~10KB (goroutine + buffers) -- **CPU overhead:** Minimal (event-driven) - -## Automated Tests - -Run the Go test suite: - -```bash -# Run all tests -./scripts/test.sh - -# Run subscription tests only (once implemented) -go test -v -run TestLongRunningSubscription ./app -go test -v -run TestMultipleConcurrentSubscriptions ./app -``` - -## Common Issues - -### Issue: "Failed to connect" - -**Cause:** Relay not running or wrong URL -**Solution:** -```bash -# Check relay is running -ps aux | grep orly - -# Verify port -netstat -tlnp | grep 3334 -``` - -### Issue: "No events received" - -**Cause:** No events being published -**Solution:** Publish test events while test is running (see section 3 above) - -### Issue: "Subscription CLOSED by relay" - -**Cause:** Filter policy or ACL rejecting subscription -**Solution:** Check relay configuration and ACL settings - -### Issue: Test hangs at EOSE - -**Cause:** Relay not sending EOSE -**Solution:** Check relay logs for query errors - -## Manual Testing with Raw WebSocket - -If you prefer manual testing, you can use any WebSocket client: - -```bash -# Install wscat (Node.js based, no glibc issues) -npm install -g wscat - -# Connect and subscribe -wscat -c ws://localhost:3334 -> ["REQ","manual-test",{"kinds":[1]}] - -# Wait for EOSE -< ["EOSE","manual-test"] - -# Events should arrive as they're published -< ["EVENT","manual-test",{"id":"...","kind":1,...}] -``` - -## Comparison: Before vs After Fixes - -### Before (Broken) - -``` -$ ./subscription-test -duration 60 -✓ Connected -✓ Received EOSE -[EVENT #1] id=abc123... kind=1 -[EVENT #2] id=def456... kind=1 -... -[EVENT #30] id=xyz789... kind=1 -⚠ Warning: No events received for 35s - subscription may have dropped -Test complete: 30 events received (expected 60) -``` - -### After (Fixed) - -``` -$ ./subscription-test -duration 60 -✓ Connected -✓ Received EOSE -[EVENT #1] id=abc123... kind=1 -[EVENT #2] id=def456... kind=1 -... -[EVENT #60] id=xyz789... kind=1 -✓ TEST PASSED - Subscription remained stable -Test complete: 60 events received -``` - -## Reporting Issues - -If subscriptions still drop after the fixes, please report with: - -1. Relay logs (with `ORLY_LOG_LEVEL=debug`) -2. Test output -3. Steps to reproduce -4. Relay configuration -5. Event publishing method - -## Summary - -The subscription stability fixes ensure: - -✅ Subscriptions remain active indefinitely -✅ All events are delivered without timeouts -✅ Clean resource management (no leaks) -✅ Multiple concurrent subscriptions work correctly -✅ Idle subscriptions don't timeout - -Follow the test scenarios above to verify these improvements in your deployment. 
diff --git a/TEST_NOW.md b/TEST_NOW.md deleted file mode 100644 index d955082..0000000 --- a/TEST_NOW.md +++ /dev/null @@ -1,108 +0,0 @@ -# Test Subscription Stability NOW - -## Quick Test (No Events Required) - -This test verifies the subscription stays registered without needing to publish events: - -```bash -# Terminal 1: Start relay -./orly - -# Terminal 2: Run simple test -./subscription-test-simple -url ws://localhost:3334 -duration 120 -``` - -**Expected output:** -``` -✓ Connected -✓ Received EOSE - subscription is active - -Subscription is active. Monitoring for 120 seconds... - -[ 10s/120s] Messages: 1 | Last message: 5s ago | Status: ACTIVE (recent message) -[ 20s/120s] Messages: 1 | Last message: 15s ago | Status: IDLE (normal) -[ 30s/120s] Messages: 1 | Last message: 25s ago | Status: IDLE (normal) -... -[120s/120s] Messages: 1 | Last message: 115s ago | Status: QUIET (possibly normal) - -✓ TEST PASSED -Subscription remained active throughout test period. -``` - -## Full Test (With Events) - -For comprehensive testing with event delivery: - -```bash -# Terminal 1: Start relay -./orly - -# Terminal 2: Run test -./subscription-test -url ws://localhost:3334 -duration 60 - -# Terminal 3: Publish test events -# Use your preferred method to publish events to the relay -# The test will show events being received -``` - -## What the Fixes Do - -### Before (Broken) -- Subscriptions dropped after ~30-60 seconds -- Receiver channels filled up (32 event buffer) -- Publisher timed out trying to send -- Events stopped being delivered - -### After (Fixed) -- Subscriptions stay active indefinitely -- Per-subscription consumer goroutines -- Channels never fill up -- All events delivered without timeouts - -## Troubleshooting - -### "Failed to connect" -```bash -# Check relay is running -ps aux | grep orly - -# Check port -netstat -tlnp | grep 3334 -``` - -### "Did not receive EOSE" -```bash -# Enable debug logging -export ORLY_LOG_LEVEL=debug -./orly -``` - -### Test panics -Already fixed! The latest version includes proper error handling. 
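The panic fix boils down to classifying read errors before deciding whether to keep reading. A small, self-contained sketch of that distinction (hypothetical helper names; the real logic lives in `cmd/subscription-test/main.go`):

```go
package main

import (
	"errors"
	"fmt"
	"net"

	"github.com/gorilla/websocket"
)

// classifyReadError mirrors the distinction the fixed tool draws: a read
// deadline expiring is just an idle interval, a normal close is a clean end,
// and anything else is fatal -- never read again on a failed connection.
func classifyReadError(err error) string {
	var netErr net.Error
	if errors.As(err, &netErr) && netErr.Timeout() {
		return "idle"
	}
	if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
		return "closed"
	}
	return "fatal"
}

func main() {
	fmt.Println(classifyReadError(&net.DNSError{IsTimeout: true})) // idle
	fmt.Println(classifyReadError(errors.New("broken pipe")))      // fatal
}
```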
- -## Files Changed - -Core fixes in these files: -- `app/listener.go` - Subscription tracking + **concurrent message processing** -- `app/handle-req.go` - Consumer goroutines (THE KEY FIX) -- `app/handle-close.go` - Proper cleanup -- `app/handle-websocket.go` - Cancel all on disconnect - -**Latest fix:** Message processor now handles messages concurrently (prevents queue from filling up) - -## Build Status - -✅ All code builds successfully: -```bash -go build -o orly # Relay -go build -o subscription-test ./cmd/subscription-test # Full test -go build -o subscription-test-simple ./cmd/subscription-test-simple # Simple test -``` - -## Quick Summary - -**Problem:** Receiver channels created but never consumed → filled up → timeout → subscription dropped - -**Solution:** Per-subscription consumer goroutines (khatru pattern) that continuously read from channels and forward events to clients - -**Result:** Subscriptions now stable for unlimited duration ✅ diff --git a/app/handle-websocket.go b/app/handle-websocket.go index 24f283b..649a968 100644 --- a/app/handle-websocket.go +++ b/app/handle-websocket.go @@ -174,6 +174,12 @@ whitelist: // Wait for message processor to finish <-listener.processingDone + // Wait for all spawned message handlers to complete + // This is critical to prevent "send on closed channel" panics + log.D.F("ws->%s waiting for message handlers to complete", remote) + listener.handlerWg.Wait() + log.D.F("ws->%s all message handlers completed", remote) + // Close write channel to signal worker to exit close(listener.writeChan) // Wait for write worker to finish diff --git a/app/listener.go b/app/listener.go index 5285d03..2accc98 100644 --- a/app/listener.go +++ b/app/listener.go @@ -37,6 +37,7 @@ type Listener struct { // Message processing queue for async handling messageQueue chan messageRequest // Buffered channel for message processing processingDone chan struct{} // Closed when message processor exits + handlerWg sync.WaitGroup // Tracks spawned message handler goroutines // Flow control counters (atomic for concurrent access) droppedMessages atomic.Int64 // Messages dropped due to full queue // Diagnostics: per-connection counters @@ -85,6 +86,15 @@ func (l *Listener) QueueMessage(data []byte, remote string) bool { func (l *Listener) Write(p []byte) (n int, err error) { + // Defensive: recover from any panic when sending to closed channel + defer func() { + if r := recover(); r != nil { + log.D.F("ws->%s write panic recovered (channel likely closed): %v", l.remote, r) + err = errorf.E("write channel closed") + n = 0 + } + }() + // Send write request to channel - non-blocking with timeout select { case <-l.ctx.Done(): @@ -99,6 +109,14 @@ func (l *Listener) Write(p []byte) (n int, err error) { // WriteControl sends a control message through the write channel func (l *Listener) WriteControl(messageType int, data []byte, deadline time.Time) (err error) { + // Defensive: recover from any panic when sending to closed channel + defer func() { + if r := recover(); r != nil { + log.D.F("ws->%s writeControl panic recovered (channel likely closed): %v", l.remote, r) + err = errorf.E("write channel closed") + } + }() + select { case <-l.ctx.Done(): return l.ctx.Err() @@ -196,7 +214,12 @@ func (l *Listener) messageProcessor() { // Process the message in a separate goroutine to avoid blocking // This allows multiple messages to be processed concurrently (like khatru does) - go l.HandleMessage(req.data, req.remote) + // Track the goroutine so we can wait for it during cleanup + 
l.handlerWg.Add(1) + go func(data []byte, remote string) { + defer l.handlerWg.Done() + l.HandleMessage(data, remote) + }(req.data, req.remote) } } } diff --git a/app/subscription_stability_test.go b/app/subscription_stability_test.go index fc23e62..83a93f0 100644 --- a/app/subscription_stability_test.go +++ b/app/subscription_stability_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "net" "net/http/httptest" "strings" "sync" @@ -12,9 +13,51 @@ import ( "time" "github.com/gorilla/websocket" + "next.orly.dev/app/config" + "next.orly.dev/pkg/database" "next.orly.dev/pkg/encoders/event" + "next.orly.dev/pkg/encoders/tag" + "next.orly.dev/pkg/interfaces/signer/p8k" + "next.orly.dev/pkg/protocol/publish" ) +// createSignedTestEvent creates a properly signed test event for use in tests +func createSignedTestEvent(t *testing.T, kind uint16, content string, tags ...*tag.T) *event.E { + t.Helper() + + // Create a signer + signer, err := p8k.New() + if err != nil { + t.Fatalf("Failed to create signer: %v", err) + } + defer signer.Zero() + + // Generate a keypair + if err := signer.Generate(); err != nil { + t.Fatalf("Failed to generate keypair: %v", err) + } + + // Create event + ev := &event.E{ + Kind: kind, + Content: []byte(content), + CreatedAt: time.Now().Unix(), + Tags: &tag.S{}, + } + + // Add any provided tags + for _, tg := range tags { + *ev.Tags = append(*ev.Tags, tg) + } + + // Sign the event (this sets Pubkey, ID, and Sig) + if err := ev.Sign(signer); err != nil { + t.Fatalf("Failed to sign event: %v", err) + } + + return ev +} + // TestLongRunningSubscriptionStability verifies that subscriptions remain active // for extended periods and correctly receive real-time events without dropping. func TestLongRunningSubscriptionStability(t *testing.T) { @@ -68,23 +111,45 @@ func TestLongRunningSubscriptionStability(t *testing.T) { readDone := make(chan struct{}) go func() { defer close(readDone) + defer func() { + // Recover from any panic in read goroutine + if r := recover(); r != nil { + t.Logf("Read goroutine panic (recovered): %v", r) + } + }() for { + // Check context first before attempting any read select { case <-ctx.Done(): return default: } - conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + // Use a longer deadline and check context more frequently + conn.SetReadDeadline(time.Now().Add(2 * time.Second)) _, msg, err := conn.ReadMessage() if err != nil { + // Immediately check if context is done - if so, just exit without continuing + if ctx.Err() != nil { + return + } + + // Check for normal close if websocket.IsCloseError(err, websocket.CloseNormalClosure) { return } - if strings.Contains(err.Error(), "timeout") { + + // Check if this is a timeout error - those are recoverable + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + // Double-check context before continuing + if ctx.Err() != nil { + return + } continue } - t.Logf("Read error: %v", err) + + // Any other error means connection is broken, exit + t.Logf("Read error (non-timeout): %v", err) return } @@ -130,19 +195,18 @@ func TestLongRunningSubscriptionStability(t *testing.T) { default: } - // Create test event - ev := &event.E{ - Kind: 1, - Content: []byte(fmt.Sprintf("Test event %d for long-running subscription", i)), - CreatedAt: uint64(time.Now().Unix()), - } + // Create and sign test event + ev := createSignedTestEvent(t, 1, fmt.Sprintf("Test event %d for long-running subscription", i)) - // Save event to database (this will trigger publisher) - if err := 
server.D.SaveEvent(context.Background(), ev); err != nil { + // Save event to database + if _, err := server.D.SaveEvent(context.Background(), ev); err != nil { t.Errorf("Failed to save event %d: %v", i, err) continue } + // Manually trigger publisher to deliver event to subscriptions + server.publishers.Deliver(ev) + t.Logf("Published event %d", i) // Wait before next publish @@ -240,7 +304,14 @@ func TestMultipleConcurrentSubscriptions(t *testing.T) { readDone := make(chan struct{}) go func() { defer close(readDone) + defer func() { + // Recover from any panic in read goroutine + if r := recover(); r != nil { + t.Logf("Read goroutine panic (recovered): %v", r) + } + }() for { + // Check context first before attempting any read select { case <-ctx.Done(): return @@ -250,9 +321,27 @@ func TestMultipleConcurrentSubscriptions(t *testing.T) { conn.SetReadDeadline(time.Now().Add(2 * time.Second)) _, msg, err := conn.ReadMessage() if err != nil { - if strings.Contains(err.Error(), "timeout") { + // Immediately check if context is done - if so, just exit without continuing + if ctx.Err() != nil { + return + } + + // Check for normal close + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + return + } + + // Check if this is a timeout error - those are recoverable + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + // Double-check context before continuing + if ctx.Err() != nil { + return + } continue } + + // Any other error means connection is broken, exit + t.Logf("Read error (non-timeout): %v", err) return } @@ -284,16 +373,16 @@ func TestMultipleConcurrentSubscriptions(t *testing.T) { // Publish events for each kind for _, sub := range subscriptions { for i := 0; i < 5; i++ { - ev := &event.E{ - Kind: uint16(sub.kind), - Content: []byte(fmt.Sprintf("Test for kind %d event %d", sub.kind, i)), - CreatedAt: uint64(time.Now().Unix()), - } + // Create and sign test event + ev := createSignedTestEvent(t, uint16(sub.kind), fmt.Sprintf("Test for kind %d event %d", sub.kind, i)) - if err := server.D.SaveEvent(context.Background(), ev); err != nil { + if _, err := server.D.SaveEvent(context.Background(), ev); err != nil { t.Errorf("Failed to save event: %v", err) } + // Manually trigger publisher to deliver event to subscriptions + server.publishers.Deliver(ev) + time.Sleep(100 * time.Millisecond) } } @@ -321,8 +410,40 @@ func TestMultipleConcurrentSubscriptions(t *testing.T) { // setupTestServer creates a test relay server for subscription testing func setupTestServer(t *testing.T) (*Server, func()) { - // This is a simplified setup - adapt based on your actual test setup - // You may need to create a proper test database, etc. 
- t.Skip("Implement setupTestServer based on your existing test infrastructure") - return nil, func() {} + // Setup test database + ctx, cancel := context.WithCancel(context.Background()) + + // Use a temporary directory for the test database + tmpDir := t.TempDir() + db, err := database.New(ctx, cancel, tmpDir, "test.db") + if err != nil { + t.Fatalf("Failed to create test database: %v", err) + } + + // Setup basic config + cfg := &config.C{ + AuthRequired: false, + Owners: []string{}, + Admins: []string{}, + ACLMode: "none", + } + + // Setup server + server := &Server{ + Config: cfg, + D: db, + Ctx: ctx, + publishers: publish.New(NewPublisher(ctx)), + Admins: [][]byte{}, + Owners: [][]byte{}, + challenges: make(map[string][]byte), + } + + // Cleanup function + cleanup := func() { + db.Close() + cancel() + } + + return server, cleanup } diff --git a/cluster_peer_test.go b/cluster_peer_test.go deleted file mode 100644 index 648e5fc..0000000 --- a/cluster_peer_test.go +++ /dev/null @@ -1,273 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "net" - "os" - "path/filepath" - "strings" - "testing" - "time" - - lol "lol.mleku.dev" - "next.orly.dev/app/config" - "next.orly.dev/pkg/encoders/event" - "next.orly.dev/pkg/encoders/tag" - "next.orly.dev/pkg/interfaces/signer/p8k" - "next.orly.dev/pkg/policy" - "next.orly.dev/pkg/run" - relaytester "next.orly.dev/relay-tester" -) - -// TestClusterPeerPolicyFiltering tests cluster peer synchronization with policy filtering. -// This test: -// 1. Starts multiple relays using the test relay launch functionality -// 2. Configures them as peers to each other (though sync managers are not fully implemented in this test) -// 3. Tests policy filtering with a kind whitelist that allows only specific event kinds -// 4. Verifies that the policy correctly allows/denies events based on the whitelist -// -// Note: This test focuses on the policy filtering aspect of cluster peers. -// Full cluster synchronization testing would require implementing the sync manager -// integration, which is beyond the scope of this initial test. 
-func TestClusterPeerPolicyFiltering(t *testing.T) { - if testing.Short() { - t.Skip("skipping cluster peer integration test") - } - - // Number of relays in the cluster - numRelays := 3 - - // Start multiple test relays - relays, ports, err := startTestRelays(numRelays) - if err != nil { - t.Fatalf("Failed to start test relays: %v", err) - } - defer func() { - for _, relay := range relays { - if tr, ok := relay.(*testRelay); ok { - if stopErr := tr.Stop(); stopErr != nil { - t.Logf("Error stopping relay: %v", stopErr) - } - } - } - }() - - // Create relay URLs - relayURLs := make([]string, numRelays) - for i, port := range ports { - relayURLs[i] = fmt.Sprintf("http://127.0.0.1:%d", port) - } - - // Wait for all relays to be ready - for _, url := range relayURLs { - wsURL := strings.Replace(url, "http://", "ws://", 1) // Convert http to ws - if err := waitForTestRelay(wsURL, 10*time.Second); err != nil { - t.Fatalf("Relay not ready after timeout: %s, %v", wsURL, err) - } - t.Logf("Relay is ready at %s", wsURL) - } - - // Create policy configuration with small kind whitelist - policyJSON := map[string]interface{}{ - "kind": map[string]interface{}{ - "whitelist": []int{1, 7, 42}, // Allow only text notes, user statuses, and channel messages - }, - "default_policy": "allow", // Allow everything not explicitly denied - } - - policyJSONBytes, err := json.MarshalIndent(policyJSON, "", " ") - if err != nil { - t.Fatalf("Failed to marshal policy JSON: %v", err) - } - - // Create temporary directory for policy config - tempDir := t.TempDir() - configDir := filepath.Join(tempDir, "ORLY_POLICY") - if err := os.MkdirAll(configDir, 0755); err != nil { - t.Fatalf("Failed to create config directory: %v", err) - } - - policyPath := filepath.Join(configDir, "policy.json") - if err := os.WriteFile(policyPath, policyJSONBytes, 0644); err != nil { - t.Fatalf("Failed to write policy file: %v", err) - } - - // Create policy from JSON directly for testing - testPolicy, err := policy.New(policyJSONBytes) - if err != nil { - t.Fatalf("Failed to create policy: %v", err) - } - - // Generate test keys - signer := p8k.MustNew() - if err := signer.Generate(); err != nil { - t.Fatalf("Failed to generate test signer: %v", err) - } - - // Create test events of different kinds - testEvents := []*event.E{ - // Kind 1 (text note) - should be allowed by policy - createTestEvent(t, signer, "Text note - should sync", 1), - // Kind 7 (user status) - should be allowed by policy - createTestEvent(t, signer, "User status - should sync", 7), - // Kind 42 (channel message) - should be allowed by policy - createTestEvent(t, signer, "Channel message - should sync", 42), - // Kind 0 (metadata) - should be denied by policy - createTestEvent(t, signer, "Metadata - should NOT sync", 0), - // Kind 3 (follows) - should be denied by policy - createTestEvent(t, signer, "Follows - should NOT sync", 3), - } - - t.Logf("Created %d test events", len(testEvents)) - - // Publish events to the first relay (non-policy relay) - firstRelayWS := fmt.Sprintf("ws://127.0.0.1:%d", ports[0]) - client, err := relaytester.NewClient(firstRelayWS) - if err != nil { - t.Fatalf("Failed to connect to first relay: %v", err) - } - defer client.Close() - - // Publish all events to the first relay - for i, ev := range testEvents { - if err := client.Publish(ev); err != nil { - t.Fatalf("Failed to publish event %d: %v", i, err) - } - - // Wait for OK response - accepted, reason, err := client.WaitForOK(ev.ID, 5*time.Second) - if err != nil { - t.Fatalf("Failed to get OK 
response for event %d: %v", i, err) - } - if !accepted { - t.Logf("Event %d rejected: %s (kind: %d)", i, reason, ev.Kind) - } else { - t.Logf("Event %d accepted (kind: %d)", i, ev.Kind) - } - } - - // Test policy filtering directly - t.Logf("Testing policy filtering...") - - // Test that the policy correctly allows/denies events based on the whitelist - // Only kinds 1, 7, and 42 should be allowed - for i, ev := range testEvents { - allowed, err := testPolicy.CheckPolicy("write", ev, signer.Pub(), "127.0.0.1") - if err != nil { - t.Fatalf("Policy check failed for event %d: %v", i, err) - } - - expectedAllowed := ev.Kind == 1 || ev.Kind == 7 || ev.Kind == 42 - if allowed != expectedAllowed { - t.Errorf("Event %d (kind %d): expected allowed=%v, got %v", i, ev.Kind, expectedAllowed, allowed) - } - } - - t.Logf("Policy filtering test completed successfully") - - // Note: In a real cluster setup, the sync manager would use this policy - // to filter events during synchronization between peers. This test demonstrates - // that the policy correctly identifies which events should be allowed to sync. -} - -// testRelay wraps a run.Relay for testing purposes -type testRelay struct { - *run.Relay -} - -// startTestRelays starts multiple test relays with different configurations -func startTestRelays(count int) ([]interface{}, []int, error) { - relays := make([]interface{}, count) - ports := make([]int, count) - - for i := 0; i < count; i++ { - cfg := &config.C{ - AppName: fmt.Sprintf("ORLY-TEST-%d", i), - DataDir: "", // Use temp dir - Listen: "127.0.0.1", - Port: 0, // Random port - HealthPort: 0, - EnableShutdown: false, - LogLevel: "warn", - DBLogLevel: "warn", - DBBlockCacheMB: 512, - DBIndexCacheMB: 256, - LogToStdout: false, - PprofHTTP: false, - ACLMode: "none", - AuthRequired: false, - AuthToWrite: false, - SubscriptionEnabled: false, - MonthlyPriceSats: 6000, - FollowListFrequency: time.Hour, - WebDisableEmbedded: false, - SprocketEnabled: false, - SpiderMode: "none", - PolicyEnabled: false, // We'll enable it separately for one relay - } - - // Find available port - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return nil, nil, fmt.Errorf("failed to find available port for relay %d: %w", i, err) - } - addr := listener.Addr().(*net.TCPAddr) - cfg.Port = addr.Port - listener.Close() - - // Set up logging - lol.SetLogLevel(cfg.LogLevel) - - opts := &run.Options{ - CleanupDataDir: func(b bool) *bool { return &b }(true), - } - - relay, err := run.Start(cfg, opts) - if err != nil { - return nil, nil, fmt.Errorf("failed to start relay %d: %w", i, err) - } - - relays[i] = &testRelay{Relay: relay} - ports[i] = cfg.Port - } - - return relays, ports, nil -} - -// waitForTestRelay waits for a relay to be ready by attempting to connect -func waitForTestRelay(url string, timeout time.Duration) error { - // Extract host:port from ws:// URL - addr := url - if len(url) > 5 && url[:5] == "ws://" { - addr = url[5:] - } - deadline := time.Now().Add(timeout) - attempts := 0 - for time.Now().Before(deadline) { - conn, err := net.DialTimeout("tcp", addr, 500*time.Millisecond) - if err == nil { - conn.Close() - return nil - } - attempts++ - time.Sleep(100 * time.Millisecond) - } - return fmt.Errorf("timeout waiting for relay at %s after %d attempts", url, attempts) -} - -// createTestEvent creates a test event with proper signing -func createTestEvent(t *testing.T, signer *p8k.Signer, content string, eventKind uint16) *event.E { - ev := event.New() - ev.CreatedAt = time.Now().Unix() - 
ev.Kind = eventKind - ev.Content = []byte(content) - ev.Tags = tag.NewS() - - // Sign the event - if err := ev.Sign(signer); err != nil { - t.Fatalf("Failed to sign test event: %v", err) - } - - return ev -} diff --git a/pkg/version/version b/pkg/version/version index e7148e5..24b72c3 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.26.2 \ No newline at end of file +v0.26.3 \ No newline at end of file diff --git a/relay_test.go b/relay_test.go deleted file mode 100644 index 379464c..0000000 --- a/relay_test.go +++ /dev/null @@ -1,245 +0,0 @@ -package main - -import ( - "fmt" - "net" - "os" - "path/filepath" - "testing" - "time" - - lol "lol.mleku.dev" - "next.orly.dev/app/config" - "next.orly.dev/pkg/run" - relaytester "next.orly.dev/relay-tester" -) - -var ( - testRelayURL string - testName string - testJSON bool - keepDataDir bool - relayPort int - relayDataDir string -) - -func TestRelay(t *testing.T) { - var err error - var relay *run.Relay - var relayURL string - - // Determine relay URL - if testRelayURL != "" { - relayURL = testRelayURL - } else { - // Start local relay for testing - var port int - if relay, port, err = startTestRelay(); err != nil { - t.Fatalf("Failed to start test relay: %v", err) - } - defer func() { - if stopErr := relay.Stop(); stopErr != nil { - t.Logf("Error stopping relay: %v", stopErr) - } - }() - relayURL = fmt.Sprintf("ws://127.0.0.1:%d", port) - t.Logf("Waiting for relay to be ready at %s...", relayURL) - // Wait for relay to be ready - try connecting to verify it's up - if err = waitForRelay(relayURL, 10*time.Second); err != nil { - t.Fatalf("Relay not ready after timeout: %v", err) - } - t.Logf("Relay is ready at %s", relayURL) - } - - // Create test suite - t.Logf("Creating test suite for %s...", relayURL) - suite, err := relaytester.NewTestSuite(relayURL) - if err != nil { - t.Fatalf("Failed to create test suite: %v", err) - } - t.Logf("Test suite created, running tests...") - - // Run tests - var results []relaytester.TestResult - if testName != "" { - // Run specific test - result, err := suite.RunTest(testName) - if err != nil { - t.Fatalf("Failed to run test %s: %v", testName, err) - } - results = []relaytester.TestResult{result} - } else { - // Run all tests - if results, err = suite.Run(); err != nil { - t.Fatalf("Failed to run tests: %v", err) - } - } - - // Output results - if testJSON { - jsonOutput, err := relaytester.FormatJSON(results) - if err != nil { - t.Fatalf("Failed to format JSON: %v", err) - } - fmt.Println(jsonOutput) - } else { - outputResults(results, t) - } - - // Check if any required tests failed - for _, result := range results { - if result.Required && !result.Pass { - t.Errorf("Required test '%s' failed: %s", result.Name, result.Info) - } - } -} - -func startTestRelay() (relay *run.Relay, port int, err error) { - cfg := &config.C{ - AppName: "ORLY-TEST", - DataDir: relayDataDir, - Listen: "127.0.0.1", - Port: 0, // Always use random port, unless overridden via -port flag - HealthPort: 0, - EnableShutdown: false, - LogLevel: "warn", - DBLogLevel: "warn", - DBBlockCacheMB: 512, - DBIndexCacheMB: 256, - LogToStdout: false, - PprofHTTP: false, - ACLMode: "none", - AuthRequired: false, - AuthToWrite: false, - SubscriptionEnabled: false, - MonthlyPriceSats: 6000, - FollowListFrequency: time.Hour, - WebDisableEmbedded: false, - SprocketEnabled: false, - SpiderMode: "none", - PolicyEnabled: false, - } - - // Use explicitly set port if provided via flag, otherwise find an available port - if relayPort > 0 
{ - cfg.Port = relayPort - } else { - var listener net.Listener - if listener, err = net.Listen("tcp", "127.0.0.1:0"); err != nil { - return nil, 0, fmt.Errorf("failed to find available port: %w", err) - } - addr := listener.Addr().(*net.TCPAddr) - cfg.Port = addr.Port - listener.Close() - } - - // Set default data dir if not specified - if cfg.DataDir == "" { - tmpDir := filepath.Join(os.TempDir(), fmt.Sprintf("orly-test-%d", time.Now().UnixNano())) - cfg.DataDir = tmpDir - } - - // Set up logging - lol.SetLogLevel(cfg.LogLevel) - - // Create options - cleanup := !keepDataDir - opts := &run.Options{ - CleanupDataDir: &cleanup, - } - - // Start relay - if relay, err = run.Start(cfg, opts); err != nil { - return nil, 0, fmt.Errorf("failed to start relay: %w", err) - } - - return relay, cfg.Port, nil -} - -// waitForRelay waits for the relay to be ready by attempting to connect -func waitForRelay(url string, timeout time.Duration) error { - // Extract host:port from ws:// URL - addr := url - if len(url) > 7 && url[:5] == "ws://" { - addr = url[5:] - } - deadline := time.Now().Add(timeout) - attempts := 0 - for time.Now().Before(deadline) { - conn, err := net.DialTimeout("tcp", addr, 500*time.Millisecond) - if err == nil { - conn.Close() - return nil - } - attempts++ - if attempts%10 == 0 { - // Log every 10th attempt (every second) - } - time.Sleep(100 * time.Millisecond) - } - return fmt.Errorf("timeout waiting for relay at %s after %d attempts", url, attempts) -} - -func outputResults(results []relaytester.TestResult, t *testing.T) { - passed := 0 - failed := 0 - requiredFailed := 0 - - for _, result := range results { - if result.Pass { - passed++ - t.Logf("PASS: %s", result.Name) - } else { - failed++ - if result.Required { - requiredFailed++ - t.Errorf("FAIL (required): %s - %s", result.Name, result.Info) - } else { - t.Logf("FAIL (optional): %s - %s", result.Name, result.Info) - } - } - } - - t.Logf("\nTest Summary:") - t.Logf(" Total: %d", len(results)) - t.Logf(" Passed: %d", passed) - t.Logf(" Failed: %d", failed) - t.Logf(" Required Failed: %d", requiredFailed) -} - -// TestMain allows custom test setup/teardown -func TestMain(m *testing.M) { - // Manually parse our custom flags to avoid conflicts with Go's test flags - for i := 1; i < len(os.Args); i++ { - arg := os.Args[i] - switch arg { - case "-relay-url": - if i+1 < len(os.Args) { - testRelayURL = os.Args[i+1] - i++ - } - case "-test-name": - if i+1 < len(os.Args) { - testName = os.Args[i+1] - i++ - } - case "-json": - testJSON = true - case "-keep-data": - keepDataDir = true - case "-port": - if i+1 < len(os.Args) { - fmt.Sscanf(os.Args[i+1], "%d", &relayPort) - i++ - } - case "-data-dir": - if i+1 < len(os.Args) { - relayDataDir = os.Args[i+1] - i++ - } - } - } - - code := m.Run() - os.Exit(code) -} diff --git a/test-relay-connection.js b/test-relay-connection.js deleted file mode 100755 index f81e8b2..0000000 --- a/test-relay-connection.js +++ /dev/null @@ -1,167 +0,0 @@ -#!/usr/bin/env node - -// Test script to verify websocket connections are not closed prematurely -// This is a Node.js test script that can be run with: node test-relay-connection.js - -import { NostrWebSocket } from '@nostr-dev-kit/ndk'; - -const RELAY = process.env.RELAY || 'ws://localhost:8080'; -const MAX_CONNECTIONS = 10; -const TEST_DURATION = 30000; // 30 seconds - -let connectionsClosed = 0; -let connectionsOpened = 0; -let messagesReceived = 0; -let errors = 0; - -const stats = { - premature: 0, - normal: 0, - errors: 0, -}; - -class 
TestConnection { - constructor(id) { - this.id = id; - this.ws = null; - this.closed = false; - this.openTime = null; - this.closeTime = null; - this.lastError = null; - } - - connect() { - return new Promise((resolve, reject) => { - this.ws = new NostrWebSocket(RELAY); - - this.ws.addEventListener('open', () => { - this.openTime = Date.now(); - connectionsOpened++; - console.log(`[Connection ${this.id}] Opened`); - resolve(); - }); - - this.ws.addEventListener('close', (event) => { - this.closeTime = Date.now(); - this.closed = true; - connectionsClosed++; - const duration = this.closeTime - this.openTime; - console.log(`[Connection ${this.id}] Closed: code=${event.code}, reason="${event.reason || ''}", duration=${duration}ms`); - - if (duration < 5000 && event.code !== 1000) { - stats.premature++; - console.log(`[Connection ${this.id}] PREMATURE CLOSE DETECTED: duration=${duration}ms < 5s`); - } else { - stats.normal++; - } - }); - - this.ws.addEventListener('error', (error) => { - this.lastError = error; - stats.errors++; - console.error(`[Connection ${this.id}] Error:`, error); - }); - - this.ws.addEventListener('message', (event) => { - messagesReceived++; - try { - const data = JSON.parse(event.data); - console.log(`[Connection ${this.id}] Message:`, data[0]); - } catch (e) { - console.log(`[Connection ${this.id}] Message (non-JSON):`, event.data); - } - }); - - setTimeout(reject, 5000); // Timeout after 5 seconds if not opened - }); - } - - sendReq() { - if (this.ws && !this.closed) { - this.ws.send(JSON.stringify(['REQ', `test-sub-${this.id}`, { kinds: [1], limit: 10 }])); - console.log(`[Connection ${this.id}] Sent REQ`); - } - } - - close() { - if (this.ws && !this.closed) { - this.ws.close(); - } - } -} - -async function runTest() { - console.log('='.repeat(60)); - console.log('Testing Relay Connection Stability'); - console.log('='.repeat(60)); - console.log(`Relay: ${RELAY}`); - console.log(`Duration: ${TEST_DURATION}ms`); - console.log(`Connections: ${MAX_CONNECTIONS}`); - console.log('='.repeat(60)); - console.log(); - - const connections = []; - - // Open connections - console.log('Opening connections...'); - for (let i = 0; i < MAX_CONNECTIONS; i++) { - const conn = new TestConnection(i); - try { - await conn.connect(); - connections.push(conn); - } catch (error) { - console.error(`Failed to open connection ${i}:`, error); - } - } - - console.log(`Opened ${connections.length} connections`); - console.log(); - - // Send requests from each connection - console.log('Sending REQ messages...'); - for (const conn of connections) { - conn.sendReq(); - } - - // Wait and let connections run - console.log(`Waiting ${TEST_DURATION / 1000}s...`); - await new Promise(resolve => setTimeout(resolve, TEST_DURATION)); - - // Close all connections - console.log('Closing all connections...'); - for (const conn of connections) { - conn.close(); - } - - // Wait for close events - await new Promise(resolve => setTimeout(resolve, 1000)); - - // Print results - console.log(); - console.log('='.repeat(60)); - console.log('Test Results:'); - console.log('='.repeat(60)); - console.log(`Connections Opened: ${connectionsOpened}`); - console.log(`Connections Closed: ${connectionsClosed}`); - console.log(`Messages Received: ${messagesReceived}`); - console.log(); - console.log('Closure Analysis:'); - console.log(`- Premature Closes: ${stats.premature}`); - console.log(`- Normal Closes: ${stats.normal}`); - console.log(`- Errors: ${stats.errors}`); - console.log('='.repeat(60)); - - if (stats.premature > 
0) { - console.error('FAILED: Detected premature connection closures!'); - process.exit(1); - } else { - console.log('PASSED: No premature connection closures detected.'); - process.exit(0); - } -} - -runTest().catch(error => { - console.error('Test failed:', error); - process.exit(1); -}); - diff --git a/test-websocket-close.js b/test-websocket-close.js deleted file mode 100755 index 3b0aa35..0000000 --- a/test-websocket-close.js +++ /dev/null @@ -1,57 +0,0 @@ -import { NostrWebSocket } from '@nostr-dev-kit/ndk'; - -const RELAY = process.env.RELAY || 'ws://localhost:8080'; - -async function testConnectionClosure() { - console.log('Testing websocket connection closure issues...'); - console.log('Connecting to:', RELAY); - - // Create multiple connections to test concurrency - const connections = []; - const results = { connected: 0, closed: 0, errors: 0 }; - - for (let i = 0; i < 5; i++) { - const ws = new NostrWebSocket(RELAY); - - ws.addEventListener('open', () => { - console.log(`Connection ${i} opened`); - results.connected++; - }); - - ws.addEventListener('close', (event) => { - console.log(`Connection ${i} closed:`, event.code, event.reason); - results.closed++; - }); - - ws.addEventListener('error', (error) => { - console.error(`Connection ${i} error:`, error); - results.errors++; - }); - - connections.push(ws); - } - - // Wait a bit then send REQs - await new Promise(resolve => setTimeout(resolve, 1000)); - - // Send some REQ messages - for (const ws of connections) { - ws.send(JSON.stringify(['REQ', 'test-sub', { kinds: [1] }])); - } - - // Wait and observe behavior - await new Promise(resolve => setTimeout(resolve, 5000)); - - console.log('\nTest Results:'); - console.log(`- Connected: ${results.connected}`); - console.log(`- Closed prematurely: ${results.closed}`); - console.log(`- Errors: ${results.errors}`); - - // Close all connections - for (const ws of connections) { - ws.close(); - } -} - -testConnectionClosure().catch(console.error); - diff --git a/workaround_test.go b/workaround_test.go deleted file mode 100644 index ced8784..0000000 --- a/workaround_test.go +++ /dev/null @@ -1,156 +0,0 @@ -package main - -import ( - "fmt" - "time" - - "next.orly.dev/app/config" - "next.orly.dev/pkg/run" -) - -// func TestDumbClientWorkaround(t *testing.T) { -// var relay *run.Relay -// var err error - -// // Start local relay for testing -// if relay, _, err = startWorkaroundTestRelay(); err != nil { -// t.Fatalf("Failed to start test relay: %v", err) -// } -// defer func() { -// if stopErr := relay.Stop(); stopErr != nil { -// t.Logf("Error stopping relay: %v", stopErr) -// } -// }() - -// relayURL := "ws://127.0.0.1:3338" - -// // Wait for relay to be ready -// if err = waitForRelay(relayURL, 10*time.Second); err != nil { -// t.Fatalf("Relay not ready after timeout: %v", err) -// } - -// t.Logf("Relay is ready at %s", relayURL) - -// // Test connection with a "dumb" client that doesn't handle ping/pong properly -// dialer := websocket.Dialer{ -// HandshakeTimeout: 10 * time.Second, -// } - -// conn, _, err := dialer.Dial(relayURL, nil) -// if err != nil { -// t.Fatalf("Failed to connect: %v", err) -// } -// defer conn.Close() - -// t.Logf("Connection established") - -// // Simulate a dumb client that sets a short read deadline and doesn't handle ping/pong -// conn.SetReadDeadline(time.Now().Add(30 * time.Second)) - -// startTime := time.Now() -// messageCount := 0 - -// // The connection should stay alive despite the short client-side deadline -// // because our workaround sets a 
24-hour server-side deadline -// connectionFailed := false -// for time.Since(startTime) < 2*time.Minute && !connectionFailed { -// // Extend client deadline every 10 seconds (simulating dumb client behavior) -// if time.Since(startTime).Seconds() > 10 && int(time.Since(startTime).Seconds())%10 == 0 { -// conn.SetReadDeadline(time.Now().Add(30 * time.Second)) -// t.Logf("Dumb client extended its own deadline") -// } - -// // Try to read with a short timeout to avoid blocking -// conn.SetReadDeadline(time.Now().Add(1 * time.Second)) - -// // Use a function to catch panics from ReadMessage on failed connections -// func() { -// defer func() { -// if r := recover(); r != nil { -// if panicMsg, ok := r.(string); ok && panicMsg == "repeated read on failed websocket connection" { -// t.Logf("Connection failed, stopping read loop") -// connectionFailed = true -// return -// } -// // Re-panic if it's a different panic -// panic(r) -// } -// }() - -// msgType, data, err := conn.ReadMessage() -// conn.SetReadDeadline(time.Now().Add(30 * time.Second)) // Reset - -// if err != nil { -// if netErr, ok := err.(net.Error); ok && netErr.Timeout() { -// // Timeout is expected - just continue -// time.Sleep(100 * time.Millisecond) -// return -// } -// if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { -// t.Logf("Connection closed normally: %v", err) -// connectionFailed = true -// return -// } -// t.Errorf("Unexpected error: %v", err) -// connectionFailed = true -// return -// } - -// messageCount++ -// t.Logf("Received message %d: type=%d, len=%d", messageCount, msgType, len(data)) -// }() -// } - -// elapsed := time.Since(startTime) -// if elapsed < 90*time.Second { -// t.Errorf("Connection died too early after %v (expected at least 90s)", elapsed) -// } else { -// t.Logf("Workaround successful: connection lasted %v with %d messages", elapsed, messageCount) -// } -// } - -// startWorkaroundTestRelay starts a relay for workaround testing -func startWorkaroundTestRelay() (relay *run.Relay, port int, err error) { - cfg := &config.C{ - AppName: "ORLY-WORKAROUND-TEST", - DataDir: "", - Listen: "127.0.0.1", - Port: 3338, - HealthPort: 0, - EnableShutdown: false, - LogLevel: "info", - DBLogLevel: "warn", - DBBlockCacheMB: 512, - DBIndexCacheMB: 256, - LogToStdout: false, - PprofHTTP: false, - ACLMode: "none", - AuthRequired: false, - AuthToWrite: false, - SubscriptionEnabled: false, - MonthlyPriceSats: 6000, - FollowListFrequency: time.Hour, - WebDisableEmbedded: false, - SprocketEnabled: false, - SpiderMode: "none", - PolicyEnabled: false, - } - - // Set default data dir if not specified - if cfg.DataDir == "" { - cfg.DataDir = fmt.Sprintf("/tmp/orly-workaround-test-%d", time.Now().UnixNano()) - } - - // Create options - cleanup := true - opts := &run.Options{ - CleanupDataDir: &cleanup, - } - - // Start relay - if relay, err = run.Start(cfg, opts); err != nil { - return nil, 0, fmt.Errorf("failed to start relay: %w", err) - } - - return relay, cfg.Port, nil -}
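
The listener change at the top of this patch swaps a bare `go l.HandleMessage(...)` for a goroutine tracked by `l.handlerWg`, so shutdown can wait for in-flight handlers instead of abandoning them mid-message. A minimal, self-contained sketch of that pattern follows; the `listener`, `dispatch`, and `handleMessage` names are illustrative stand-ins, not the relay's actual types.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// listener mirrors the shape of the change: every inbound message is handled
// in its own goroutine, and a WaitGroup lets teardown wait for in-flight
// handlers rather than dropping them when the connection closes.
type listener struct {
	handlerWg sync.WaitGroup
}

func (l *listener) handleMessage(data []byte, remote string) {
	// Stand-in for parsing and dispatching a REQ/EVENT/CLOSE envelope.
	time.Sleep(50 * time.Millisecond)
	fmt.Printf("handled %d bytes from %s\n", len(data), remote)
}

func (l *listener) dispatch(data []byte, remote string) {
	l.handlerWg.Add(1)
	go func(data []byte, remote string) {
		defer l.handlerWg.Done()
		l.handleMessage(data, remote)
	}(data, remote)
}

func main() {
	l := &listener{}
	for i := 0; i < 5; i++ {
		l.dispatch([]byte(fmt.Sprintf("msg-%d", i)), "10.0.0.2")
	}
	// On shutdown, wait for all in-flight handlers before tearing down
	// the connection they may still be writing to.
	l.handlerWg.Wait()
}
```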
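
Both read goroutines in `app/subscription_stability_test.go` now follow the same shape: recover from panics, check the context before and after every read, treat deadline timeouts as recoverable, and exit on any other error so gorilla/websocket's "repeated read on failed websocket connection" panic cannot be reached. Below is a sketch of that loop, assuming only an established `*websocket.Conn`; `readLoop` and the `msgs` channel are illustrative names, not part of the test code.

```go
package wsread

import (
	"context"
	"errors"
	"log"
	"net"
	"time"

	"github.com/gorilla/websocket"
)

// readLoop drains messages until ctx is cancelled or the connection breaks.
// Deadline timeouts are treated as recoverable so the loop can re-check ctx;
// anything else is fatal and ends the loop cleanly.
func readLoop(ctx context.Context, conn *websocket.Conn, msgs chan<- []byte) {
	defer func() {
		// A panic from reading an already-failed connection should not take
		// the whole process down.
		if r := recover(); r != nil {
			log.Printf("read loop panic (recovered): %v", r)
		}
	}()
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		// Short deadline so the loop re-checks ctx frequently.
		conn.SetReadDeadline(time.Now().Add(2 * time.Second))
		_, msg, err := conn.ReadMessage()
		if err != nil {
			if ctx.Err() != nil {
				return
			}
			if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
				return
			}
			var netErr net.Error
			if errors.As(err, &netErr) && netErr.Timeout() {
				continue // deadline expired; try again
			}
			log.Printf("read error (non-timeout): %v", err)
			return
		}
		select {
		case msgs <- msg:
		case <-ctx.Done():
			return
		}
	}
}
```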
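
The tests also stop relying on `SaveEvent` alone: they save the signed event and then call `server.publishers.Deliver(ev)` explicitly, because persisting an event does not by itself notify live subscriptions in this harness. A generic, self-contained sketch of that save-then-deliver ordering follows; the `store` and `broadcaster` interfaces and the `saveAndBroadcast` helper are stand-ins, with method names chosen to mirror the calls in the patch.

```go
package relayflow

import (
	"context"
	"fmt"
)

// eventRecord is a placeholder for the relay's event type.
type eventRecord struct {
	Kind    uint16
	Content string
}

// store stands in for the database layer; broadcaster for the publisher.
type store interface {
	SaveEvent(ctx context.Context, ev *eventRecord) error
}

type broadcaster interface {
	Deliver(ev *eventRecord)
}

// saveAndBroadcast persists the event, then hands it to the publisher so
// live subscriptions see it; the ordering is the point of the test change.
func saveAndBroadcast(ctx context.Context, db store, pub broadcaster, ev *eventRecord) error {
	if err := db.SaveEvent(ctx, ev); err != nil {
		return fmt.Errorf("save event: %w", err)
	}
	pub.Deliver(ev)
	return nil
}
```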