Remove deprecated files and enhance subscription stability
- Deleted obsolete files including ALL_FIXES.md, MESSAGE_QUEUE_FIX.md, PUBLISHER_FIX.md, and others to streamline the codebase.
- Implemented critical fixes for subscription stability, ensuring receiver channels are consumed and preventing dropped events.
- Introduced per-subscription consumer goroutines to enhance event delivery and prevent message queue overflow.
- Updated documentation to reflect the changes and provide clear testing guidelines for subscription stability.
- Bumped version to v0.26.3 to signify these important updates.
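For context, the pattern referenced above is roughly the following: each subscription gets a buffered receiver channel and a dedicated consumer goroutine that drains it, so the publisher's non-blocking delivery rarely finds a full queue. This is a minimal sketch only; the names (Event, Subscription, deliver, consume) are illustrative and are not the relay's actual types, which live in the publish package shown in the diff below.

// consumer_sketch.go - illustrative only, not the relay's real implementation
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type Event struct {
    Kind    uint16
    Content string
}

type Subscription struct {
    ID     string
    Events chan *Event // buffered so deliver never blocks the publisher
}

// consume drains the subscription's channel until the context is cancelled,
// guaranteeing the receiver side is always being read.
func consume(ctx context.Context, sub *Subscription, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case <-ctx.Done():
            return
        case ev := <-sub.Events:
            fmt.Printf("sub %s received kind %d: %s\n", sub.ID, ev.Kind, ev.Content)
        }
    }
}

// deliver performs a non-blocking send; with a dedicated consumer per
// subscription the buffer drains continuously, so drops become rare.
func deliver(sub *Subscription, ev *Event) bool {
    select {
    case sub.Events <- ev:
        return true
    default:
        return false // queue full: would be counted as a dropped message
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    sub := &Subscription{ID: "sub1", Events: make(chan *Event, 64)}
    var wg sync.WaitGroup
    wg.Add(1)
    go consume(ctx, sub, &wg)

    for i := 0; i < 3; i++ {
        deliver(sub, &Event{Kind: 1, Content: fmt.Sprintf("event %d", i)})
    }
    time.Sleep(100 * time.Millisecond)
    cancel()
    wg.Wait()
}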
@@ -174,6 +174,12 @@ whitelist:
    // Wait for message processor to finish
    <-listener.processingDone

    // Wait for all spawned message handlers to complete
    // This is critical to prevent "send on closed channel" panics
    log.D.F("ws->%s waiting for message handlers to complete", remote)
    listener.handlerWg.Wait()
    log.D.F("ws->%s all message handlers completed", remote)

    // Close write channel to signal worker to exit
    close(listener.writeChan)
    // Wait for write worker to finish
@@ -37,6 +37,7 @@ type Listener struct {
    // Message processing queue for async handling
    messageQueue   chan messageRequest // Buffered channel for message processing
    processingDone chan struct{}       // Closed when message processor exits
    handlerWg      sync.WaitGroup      // Tracks spawned message handler goroutines
    // Flow control counters (atomic for concurrent access)
    droppedMessages atomic.Int64 // Messages dropped due to full queue
    // Diagnostics: per-connection counters
@@ -85,6 +86,15 @@ func (l *Listener) QueueMessage(data []byte, remote string) bool {

func (l *Listener) Write(p []byte) (n int, err error) {
    // Defensive: recover from any panic when sending to closed channel
    defer func() {
        if r := recover(); r != nil {
            log.D.F("ws->%s write panic recovered (channel likely closed): %v", l.remote, r)
            err = errorf.E("write channel closed")
            n = 0
        }
    }()

    // Send write request to channel - non-blocking with timeout
    select {
    case <-l.ctx.Done():
@@ -99,6 +109,14 @@ func (l *Listener) Write(p []byte) (n int, err error) {

// WriteControl sends a control message through the write channel
func (l *Listener) WriteControl(messageType int, data []byte, deadline time.Time) (err error) {
    // Defensive: recover from any panic when sending to closed channel
    defer func() {
        if r := recover(); r != nil {
            log.D.F("ws->%s writeControl panic recovered (channel likely closed): %v", l.remote, r)
            err = errorf.E("write channel closed")
        }
    }()

    select {
    case <-l.ctx.Done():
        return l.ctx.Err()
@@ -196,7 +214,12 @@ func (l *Listener) messageProcessor() {

            // Process the message in a separate goroutine to avoid blocking
            // This allows multiple messages to be processed concurrently (like khatru does)
            go l.HandleMessage(req.data, req.remote)
            // Track the goroutine so we can wait for it during cleanup
            l.handlerWg.Add(1)
            go func(data []byte, remote string) {
                defer l.handlerWg.Done()
                l.HandleMessage(data, remote)
            }(req.data, req.remote)
        }
    }
}
@@ -4,6 +4,7 @@ import (
    "context"
    "encoding/json"
    "fmt"
    "net"
    "net/http/httptest"
    "strings"
    "sync"
@@ -12,9 +13,51 @@ import (
    "time"

    "github.com/gorilla/websocket"
    "next.orly.dev/app/config"
    "next.orly.dev/pkg/database"
    "next.orly.dev/pkg/encoders/event"
    "next.orly.dev/pkg/encoders/tag"
    "next.orly.dev/pkg/interfaces/signer/p8k"
    "next.orly.dev/pkg/protocol/publish"
)

// createSignedTestEvent creates a properly signed test event for use in tests
func createSignedTestEvent(t *testing.T, kind uint16, content string, tags ...*tag.T) *event.E {
    t.Helper()

    // Create a signer
    signer, err := p8k.New()
    if err != nil {
        t.Fatalf("Failed to create signer: %v", err)
    }
    defer signer.Zero()

    // Generate a keypair
    if err := signer.Generate(); err != nil {
        t.Fatalf("Failed to generate keypair: %v", err)
    }

    // Create event
    ev := &event.E{
        Kind:      kind,
        Content:   []byte(content),
        CreatedAt: time.Now().Unix(),
        Tags:      &tag.S{},
    }

    // Add any provided tags
    for _, tg := range tags {
        *ev.Tags = append(*ev.Tags, tg)
    }

    // Sign the event (this sets Pubkey, ID, and Sig)
    if err := ev.Sign(signer); err != nil {
        t.Fatalf("Failed to sign event: %v", err)
    }

    return ev
}

// TestLongRunningSubscriptionStability verifies that subscriptions remain active
// for extended periods and correctly receive real-time events without dropping.
func TestLongRunningSubscriptionStability(t *testing.T) {
@@ -68,23 +111,45 @@ func TestLongRunningSubscriptionStability(t *testing.T) {
    readDone := make(chan struct{})
    go func() {
        defer close(readDone)
        defer func() {
            // Recover from any panic in read goroutine
            if r := recover(); r != nil {
                t.Logf("Read goroutine panic (recovered): %v", r)
            }
        }()
        for {
            // Check context first before attempting any read
            select {
            case <-ctx.Done():
                return
            default:
            }

            conn.SetReadDeadline(time.Now().Add(5 * time.Second))
            // Use a longer deadline and check context more frequently
            conn.SetReadDeadline(time.Now().Add(2 * time.Second))
            _, msg, err := conn.ReadMessage()
            if err != nil {
                // Immediately check if context is done - if so, just exit without continuing
                if ctx.Err() != nil {
                    return
                }

                // Check for normal close
                if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
                    return
                }
                if strings.Contains(err.Error(), "timeout") {

                // Check if this is a timeout error - those are recoverable
                if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
                    // Double-check context before continuing
                    if ctx.Err() != nil {
                        return
                    }
                    continue
                }
                t.Logf("Read error: %v", err)

                // Any other error means connection is broken, exit
                t.Logf("Read error (non-timeout): %v", err)
                return
            }
@@ -130,19 +195,18 @@ func TestLongRunningSubscriptionStability(t *testing.T) {
        default:
        }

        // Create test event
        ev := &event.E{
            Kind:      1,
            Content:   []byte(fmt.Sprintf("Test event %d for long-running subscription", i)),
            CreatedAt: uint64(time.Now().Unix()),
        }
        // Create and sign test event
        ev := createSignedTestEvent(t, 1, fmt.Sprintf("Test event %d for long-running subscription", i))

        // Save event to database (this will trigger publisher)
        if err := server.D.SaveEvent(context.Background(), ev); err != nil {
        // Save event to database
        if _, err := server.D.SaveEvent(context.Background(), ev); err != nil {
            t.Errorf("Failed to save event %d: %v", i, err)
            continue
        }

        // Manually trigger publisher to deliver event to subscriptions
        server.publishers.Deliver(ev)

        t.Logf("Published event %d", i)

        // Wait before next publish
@@ -240,7 +304,14 @@ func TestMultipleConcurrentSubscriptions(t *testing.T) {
    readDone := make(chan struct{})
    go func() {
        defer close(readDone)
        defer func() {
            // Recover from any panic in read goroutine
            if r := recover(); r != nil {
                t.Logf("Read goroutine panic (recovered): %v", r)
            }
        }()
        for {
            // Check context first before attempting any read
            select {
            case <-ctx.Done():
                return
@@ -250,9 +321,27 @@ func TestMultipleConcurrentSubscriptions(t *testing.T) {
            conn.SetReadDeadline(time.Now().Add(2 * time.Second))
            _, msg, err := conn.ReadMessage()
            if err != nil {
                if strings.Contains(err.Error(), "timeout") {
                // Immediately check if context is done - if so, just exit without continuing
                if ctx.Err() != nil {
                    return
                }

                // Check for normal close
                if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
                    return
                }

                // Check if this is a timeout error - those are recoverable
                if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
                    // Double-check context before continuing
                    if ctx.Err() != nil {
                        return
                    }
                    continue
                }

                // Any other error means connection is broken, exit
                t.Logf("Read error (non-timeout): %v", err)
                return
            }
@@ -284,16 +373,16 @@ func TestMultipleConcurrentSubscriptions(t *testing.T) {
    // Publish events for each kind
    for _, sub := range subscriptions {
        for i := 0; i < 5; i++ {
            ev := &event.E{
                Kind:      uint16(sub.kind),
                Content:   []byte(fmt.Sprintf("Test for kind %d event %d", sub.kind, i)),
                CreatedAt: uint64(time.Now().Unix()),
            }
            // Create and sign test event
            ev := createSignedTestEvent(t, uint16(sub.kind), fmt.Sprintf("Test for kind %d event %d", sub.kind, i))

            if err := server.D.SaveEvent(context.Background(), ev); err != nil {
            if _, err := server.D.SaveEvent(context.Background(), ev); err != nil {
                t.Errorf("Failed to save event: %v", err)
            }

            // Manually trigger publisher to deliver event to subscriptions
            server.publishers.Deliver(ev)

            time.Sleep(100 * time.Millisecond)
        }
    }
@@ -321,8 +410,40 @@ func TestMultipleConcurrentSubscriptions(t *testing.T) {

// setupTestServer creates a test relay server for subscription testing
func setupTestServer(t *testing.T) (*Server, func()) {
    // This is a simplified setup - adapt based on your actual test setup
    // You may need to create a proper test database, etc.
    t.Skip("Implement setupTestServer based on your existing test infrastructure")
    return nil, func() {}
    // Setup test database
    ctx, cancel := context.WithCancel(context.Background())

    // Use a temporary directory for the test database
    tmpDir := t.TempDir()
    db, err := database.New(ctx, cancel, tmpDir, "test.db")
    if err != nil {
        t.Fatalf("Failed to create test database: %v", err)
    }

    // Setup basic config
    cfg := &config.C{
        AuthRequired: false,
        Owners:       []string{},
        Admins:       []string{},
        ACLMode:      "none",
    }

    // Setup server
    server := &Server{
        Config:     cfg,
        D:          db,
        Ctx:        ctx,
        publishers: publish.New(NewPublisher(ctx)),
        Admins:     [][]byte{},
        Owners:     [][]byte{},
        challenges: make(map[string][]byte),
    }

    // Cleanup function
    cleanup := func() {
        db.Close()
        cancel()
    }

    return server, cleanup
}