Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
c79cd2ffee
|
|||
|
581e0ec588
|
@@ -4,7 +4,20 @@
|
|||||||
"Skill(skill-creator)",
|
"Skill(skill-creator)",
|
||||||
"Bash(cat:*)",
|
"Bash(cat:*)",
|
||||||
"Bash(python3:*)",
|
"Bash(python3:*)",
|
||||||
"Bash(find:*)"
|
"Bash(find:*)",
|
||||||
|
"Skill(nostr-websocket)",
|
||||||
|
"Bash(go build:*)",
|
||||||
|
"Bash(chmod:*)",
|
||||||
|
"Bash(journalctl:*)",
|
||||||
|
"Bash(timeout 5 bash -c 'echo [\"\"REQ\"\",\"\"test123\"\",{\"\"kinds\"\":[1],\"\"limit\"\":1}] | websocat ws://localhost:3334':*)",
|
||||||
|
"Bash(pkill:*)",
|
||||||
|
"Bash(timeout 5 bash:*)",
|
||||||
|
"Bash(md5sum:*)",
|
||||||
|
"Bash(timeout 3 bash -c 'echo [\\\"\"REQ\\\"\",\\\"\"test456\\\"\",{\\\"\"kinds\\\"\":[1],\\\"\"limit\\\"\":10}] | websocat ws://localhost:3334')",
|
||||||
|
"Bash(printf:*)",
|
||||||
|
"Bash(websocat:*)",
|
||||||
|
"Bash(go test:*)",
|
||||||
|
"Bash(timeout 180 go test:*)"
|
||||||
],
|
],
|
||||||
"deny": [],
|
"deny": [],
|
||||||
"ask": []
|
"ask": []
|
||||||
|
|||||||
@@ -23,13 +23,30 @@ func (l *Listener) HandleClose(req []byte) (err error) {
|
|||||||
if len(env.ID) == 0 {
|
if len(env.ID) == 0 {
|
||||||
return errors.New("CLOSE has no <id>")
|
return errors.New("CLOSE has no <id>")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
subID := string(env.ID)
|
||||||
|
|
||||||
|
// Cancel the subscription goroutine by calling its cancel function
|
||||||
|
l.subscriptionsMu.Lock()
|
||||||
|
if cancelFunc, exists := l.subscriptions[subID]; exists {
|
||||||
|
log.D.F("cancelling subscription %s for %s", subID, l.remote)
|
||||||
|
cancelFunc()
|
||||||
|
delete(l.subscriptions, subID)
|
||||||
|
} else {
|
||||||
|
log.D.F("subscription %s not found for %s (already closed?)", subID, l.remote)
|
||||||
|
}
|
||||||
|
l.subscriptionsMu.Unlock()
|
||||||
|
|
||||||
|
// Also remove from publisher's tracking
|
||||||
l.publishers.Receive(
|
l.publishers.Receive(
|
||||||
&W{
|
&W{
|
||||||
Cancel: true,
|
Cancel: true,
|
||||||
remote: l.remote,
|
remote: l.remote,
|
||||||
Conn: l.conn,
|
Conn: l.conn,
|
||||||
Id: string(env.ID),
|
Id: subID,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
log.D.F("CLOSE processed for subscription %s @ %s", subID, l.remote)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -142,8 +142,7 @@ func (l *Listener) HandleMessage(msg []byte, remote string) {
|
|||||||
if !strings.Contains(err.Error(), "context canceled") {
|
if !strings.Contains(err.Error(), "context canceled") {
|
||||||
log.E.F("%s message processing FAILED (type=%s): %v", remote, t, err)
|
log.E.F("%s message processing FAILED (type=%s): %v", remote, t, err)
|
||||||
// Don't log message preview as it may contain binary data
|
// Don't log message preview as it may contain binary data
|
||||||
|
// Send error notice to client (use generic message to avoid control chars in errors)
|
||||||
// Send error notice to client (use generic message to avoid control chars in errors)
|
|
||||||
noticeMsg := fmt.Sprintf("%s processing failed", t)
|
noticeMsg := fmt.Sprintf("%s processing failed", t)
|
||||||
if noticeErr := noticeenvelope.NewFrom(noticeMsg).Write(l); noticeErr != nil {
|
if noticeErr := noticeenvelope.NewFrom(noticeMsg).Write(l); noticeErr != nil {
|
||||||
log.E.F(
|
log.E.F(
|
||||||
|
|||||||
@@ -43,7 +43,6 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
|
|||||||
}
|
}
|
||||||
return normalize.Error.Errorf(err.Error())
|
return normalize.Error.Errorf(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
log.T.C(
|
log.T.C(
|
||||||
func() string {
|
func() string {
|
||||||
return fmt.Sprintf(
|
return fmt.Sprintf(
|
||||||
@@ -533,24 +532,24 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
|
|||||||
)
|
)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
log.T.C(
|
log.T.C(
|
||||||
func() string {
|
func() string {
|
||||||
return fmt.Sprintf("event:\n%s\n", ev.Serialize())
|
return fmt.Sprintf("event:\n%s\n", ev.Serialize())
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
var res *eventenvelope.Result
|
var res *eventenvelope.Result
|
||||||
if res, err = eventenvelope.NewResultWith(
|
if res, err = eventenvelope.NewResultWith(
|
||||||
env.Subscription, ev,
|
env.Subscription, ev,
|
||||||
); chk.E(err) {
|
); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err = res.Write(l); err != nil {
|
if err = res.Write(l); err != nil {
|
||||||
// Don't log context canceled errors as they're expected during shutdown
|
// Don't log context canceled errors as they're expected during shutdown
|
||||||
if !strings.Contains(err.Error(), "context canceled") {
|
if !strings.Contains(err.Error(), "context canceled") {
|
||||||
chk.E(err)
|
chk.E(err)
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
return
|
|
||||||
}
|
|
||||||
// track the IDs we've sent (use hex encoding for stable key)
|
// track the IDs we've sent (use hex encoding for stable key)
|
||||||
seen[hexenc.Enc(ev.ID)] = struct{}{}
|
seen[hexenc.Enc(ev.ID)] = struct{}{}
|
||||||
}
|
}
|
||||||
@@ -577,7 +576,7 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
|
|||||||
limitSatisfied = true
|
limitSatisfied = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if f.Ids.Len() < 1 {
|
if f.Ids.Len() < 1 {
|
||||||
// Filter has no IDs - keep subscription open unless limit was satisfied
|
// Filter has no IDs - keep subscription open unless limit was satisfied
|
||||||
if !limitSatisfied {
|
if !limitSatisfied {
|
||||||
@@ -616,18 +615,81 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
|
|||||||
receiver := make(event.C, 32)
|
receiver := make(event.C, 32)
|
||||||
// if the subscription should be cancelled, do so
|
// if the subscription should be cancelled, do so
|
||||||
if !cancel {
|
if !cancel {
|
||||||
|
// Create a dedicated context for this subscription that's independent of query context
|
||||||
|
// but is child of the listener context so it gets cancelled when connection closes
|
||||||
|
subCtx, subCancel := context.WithCancel(l.ctx)
|
||||||
|
|
||||||
|
// Track this subscription so we can cancel it on CLOSE or connection close
|
||||||
|
subID := string(env.Subscription)
|
||||||
|
l.subscriptionsMu.Lock()
|
||||||
|
l.subscriptions[subID] = subCancel
|
||||||
|
l.subscriptionsMu.Unlock()
|
||||||
|
|
||||||
|
// Register subscription with publisher
|
||||||
l.publishers.Receive(
|
l.publishers.Receive(
|
||||||
&W{
|
&W{
|
||||||
Conn: l.conn,
|
Conn: l.conn,
|
||||||
remote: l.remote,
|
remote: l.remote,
|
||||||
Id: string(env.Subscription),
|
Id: subID,
|
||||||
Receiver: receiver,
|
Receiver: receiver,
|
||||||
Filters: &subbedFilters,
|
Filters: &subbedFilters,
|
||||||
AuthedPubkey: l.authedPubkey.Load(),
|
AuthedPubkey: l.authedPubkey.Load(),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Launch goroutine to consume from receiver channel and forward to client
|
||||||
|
// This is the critical missing piece - without this, the receiver channel fills up
|
||||||
|
// and the publisher times out trying to send, causing subscription to be removed
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
// Clean up when subscription ends
|
||||||
|
l.subscriptionsMu.Lock()
|
||||||
|
delete(l.subscriptions, subID)
|
||||||
|
l.subscriptionsMu.Unlock()
|
||||||
|
log.D.F("subscription goroutine exiting for %s @ %s", subID, l.remote)
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-subCtx.Done():
|
||||||
|
// Subscription cancelled (CLOSE message or connection closing)
|
||||||
|
log.D.F("subscription %s cancelled for %s", subID, l.remote)
|
||||||
|
return
|
||||||
|
case ev, ok := <-receiver:
|
||||||
|
if !ok {
|
||||||
|
// Channel closed - subscription ended
|
||||||
|
log.D.F("subscription %s receiver channel closed for %s", subID, l.remote)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Forward event to client via write channel
|
||||||
|
var res *eventenvelope.Result
|
||||||
|
var err error
|
||||||
|
if res, err = eventenvelope.NewResultWith(subID, ev); chk.E(err) {
|
||||||
|
log.E.F("failed to create event envelope for subscription %s: %v", subID, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write to client - this goes through the write worker
|
||||||
|
if err = res.Write(l); err != nil {
|
||||||
|
if !strings.Contains(err.Error(), "context canceled") {
|
||||||
|
log.E.F("failed to write event to subscription %s @ %s: %v", subID, l.remote, err)
|
||||||
|
}
|
||||||
|
// Don't return here - write errors shouldn't kill the subscription
|
||||||
|
// The connection cleanup will handle removing the subscription
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.D.F("delivered real-time event %s to subscription %s @ %s",
|
||||||
|
hexenc.Enc(ev.ID), subID, l.remote)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
log.D.F("subscription %s created and goroutine launched for %s", subID, l.remote)
|
||||||
} else {
|
} else {
|
||||||
// suppress server-sent CLOSED; client will close subscription if desired
|
// suppress server-sent CLOSED; client will close subscription if desired
|
||||||
|
log.D.F("subscription request cancelled immediately (all IDs found or limit satisfied)")
|
||||||
}
|
}
|
||||||
log.T.F("HandleReq: COMPLETED processing from %s", l.remote)
|
log.T.F("HandleReq: COMPLETED processing from %s", l.remote)
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -72,19 +72,20 @@ whitelist:
|
|||||||
// Set read limit immediately after connection is established
|
// Set read limit immediately after connection is established
|
||||||
conn.SetReadLimit(DefaultMaxMessageSize)
|
conn.SetReadLimit(DefaultMaxMessageSize)
|
||||||
log.D.F("set read limit to %d bytes (%d MB) for %s", DefaultMaxMessageSize, DefaultMaxMessageSize/units.Mb, remote)
|
log.D.F("set read limit to %d bytes (%d MB) for %s", DefaultMaxMessageSize, DefaultMaxMessageSize/units.Mb, remote)
|
||||||
|
|
||||||
// Set initial read deadline - pong handler will extend it when pongs are received
|
// Set initial read deadline - pong handler will extend it when pongs are received
|
||||||
conn.SetReadDeadline(time.Now().Add(DefaultPongWait))
|
conn.SetReadDeadline(time.Now().Add(DefaultPongWait))
|
||||||
|
|
||||||
// Add pong handler to extend read deadline when client responds to pings
|
// Add pong handler to extend read deadline when client responds to pings
|
||||||
conn.SetPongHandler(func(string) error {
|
conn.SetPongHandler(func(string) error {
|
||||||
log.T.F("received PONG from %s, extending read deadline", remote)
|
log.T.F("received PONG from %s, extending read deadline", remote)
|
||||||
return conn.SetReadDeadline(time.Now().Add(DefaultPongWait))
|
return conn.SetReadDeadline(time.Now().Add(DefaultPongWait))
|
||||||
})
|
})
|
||||||
|
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
listener := &Listener{
|
listener := &Listener{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
Server: s,
|
Server: s,
|
||||||
conn: conn,
|
conn: conn,
|
||||||
remote: remote,
|
remote: remote,
|
||||||
@@ -94,6 +95,7 @@ whitelist:
|
|||||||
writeDone: make(chan struct{}),
|
writeDone: make(chan struct{}),
|
||||||
messageQueue: make(chan messageRequest, 100), // Buffered channel for message processing
|
messageQueue: make(chan messageRequest, 100), // Buffered channel for message processing
|
||||||
processingDone: make(chan struct{}),
|
processingDone: make(chan struct{}),
|
||||||
|
subscriptions: make(map[string]context.CancelFunc),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start write worker goroutine
|
// Start write worker goroutine
|
||||||
@@ -131,12 +133,21 @@ whitelist:
|
|||||||
defer func() {
|
defer func() {
|
||||||
log.D.F("closing websocket connection from %s", remote)
|
log.D.F("closing websocket connection from %s", remote)
|
||||||
|
|
||||||
|
// Cancel all active subscriptions first
|
||||||
|
listener.subscriptionsMu.Lock()
|
||||||
|
for subID, cancelFunc := range listener.subscriptions {
|
||||||
|
log.D.F("cancelling subscription %s for %s", subID, remote)
|
||||||
|
cancelFunc()
|
||||||
|
}
|
||||||
|
listener.subscriptions = nil
|
||||||
|
listener.subscriptionsMu.Unlock()
|
||||||
|
|
||||||
// Cancel context and stop pinger
|
// Cancel context and stop pinger
|
||||||
cancel()
|
cancel()
|
||||||
ticker.Stop()
|
ticker.Stop()
|
||||||
|
|
||||||
// Cancel all subscriptions for this connection
|
// Cancel all subscriptions for this connection at publisher level
|
||||||
log.D.F("cancelling subscriptions for %s", remote)
|
log.D.F("removing subscriptions from publisher for %s", remote)
|
||||||
listener.publishers.Receive(&W{
|
listener.publishers.Receive(&W{
|
||||||
Cancel: true,
|
Cancel: true,
|
||||||
Conn: listener.conn,
|
Conn: listener.conn,
|
||||||
@@ -163,6 +174,12 @@ whitelist:
|
|||||||
// Wait for message processor to finish
|
// Wait for message processor to finish
|
||||||
<-listener.processingDone
|
<-listener.processingDone
|
||||||
|
|
||||||
|
// Wait for all spawned message handlers to complete
|
||||||
|
// This is critical to prevent "send on closed channel" panics
|
||||||
|
log.D.F("ws->%s waiting for message handlers to complete", remote)
|
||||||
|
listener.handlerWg.Wait()
|
||||||
|
log.D.F("ws->%s all message handlers completed", remote)
|
||||||
|
|
||||||
// Close write channel to signal worker to exit
|
// Close write channel to signal worker to exit
|
||||||
close(listener.writeChan)
|
close(listener.writeChan)
|
||||||
// Wait for write worker to finish
|
// Wait for write worker to finish
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -23,6 +24,7 @@ type Listener struct {
|
|||||||
*Server
|
*Server
|
||||||
conn *websocket.Conn
|
conn *websocket.Conn
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc // Cancel function for this listener's context
|
||||||
remote string
|
remote string
|
||||||
req *http.Request
|
req *http.Request
|
||||||
challenge atomicutils.Bytes
|
challenge atomicutils.Bytes
|
||||||
@@ -35,12 +37,16 @@ type Listener struct {
|
|||||||
// Message processing queue for async handling
|
// Message processing queue for async handling
|
||||||
messageQueue chan messageRequest // Buffered channel for message processing
|
messageQueue chan messageRequest // Buffered channel for message processing
|
||||||
processingDone chan struct{} // Closed when message processor exits
|
processingDone chan struct{} // Closed when message processor exits
|
||||||
|
handlerWg sync.WaitGroup // Tracks spawned message handler goroutines
|
||||||
// Flow control counters (atomic for concurrent access)
|
// Flow control counters (atomic for concurrent access)
|
||||||
droppedMessages atomic.Int64 // Messages dropped due to full queue
|
droppedMessages atomic.Int64 // Messages dropped due to full queue
|
||||||
// Diagnostics: per-connection counters
|
// Diagnostics: per-connection counters
|
||||||
msgCount int
|
msgCount int
|
||||||
reqCount int
|
reqCount int
|
||||||
eventCount int
|
eventCount int
|
||||||
|
// Subscription tracking for cleanup
|
||||||
|
subscriptions map[string]context.CancelFunc // Map of subscription ID to cancel function
|
||||||
|
subscriptionsMu sync.Mutex // Protects subscriptions map
|
||||||
}
|
}
|
||||||
|
|
||||||
type messageRequest struct {
|
type messageRequest struct {
|
||||||
@@ -80,6 +86,15 @@ func (l *Listener) QueueMessage(data []byte, remote string) bool {
|
|||||||
|
|
||||||
|
|
||||||
func (l *Listener) Write(p []byte) (n int, err error) {
|
func (l *Listener) Write(p []byte) (n int, err error) {
|
||||||
|
// Defensive: recover from any panic when sending to closed channel
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
log.D.F("ws->%s write panic recovered (channel likely closed): %v", l.remote, r)
|
||||||
|
err = errorf.E("write channel closed")
|
||||||
|
n = 0
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// Send write request to channel - non-blocking with timeout
|
// Send write request to channel - non-blocking with timeout
|
||||||
select {
|
select {
|
||||||
case <-l.ctx.Done():
|
case <-l.ctx.Done():
|
||||||
@@ -94,6 +109,14 @@ func (l *Listener) Write(p []byte) (n int, err error) {
|
|||||||
|
|
||||||
// WriteControl sends a control message through the write channel
|
// WriteControl sends a control message through the write channel
|
||||||
func (l *Listener) WriteControl(messageType int, data []byte, deadline time.Time) (err error) {
|
func (l *Listener) WriteControl(messageType int, data []byte, deadline time.Time) (err error) {
|
||||||
|
// Defensive: recover from any panic when sending to closed channel
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
log.D.F("ws->%s writeControl panic recovered (channel likely closed): %v", l.remote, r)
|
||||||
|
err = errorf.E("write channel closed")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-l.ctx.Done():
|
case <-l.ctx.Done():
|
||||||
return l.ctx.Err()
|
return l.ctx.Err()
|
||||||
@@ -189,8 +212,14 @@ func (l *Listener) messageProcessor() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process the message synchronously in this goroutine
|
// Process the message in a separate goroutine to avoid blocking
|
||||||
l.HandleMessage(req.data, req.remote)
|
// This allows multiple messages to be processed concurrently (like khatru does)
|
||||||
|
// Track the goroutine so we can wait for it during cleanup
|
||||||
|
l.handlerWg.Add(1)
|
||||||
|
go func(data []byte, remote string) {
|
||||||
|
defer l.handlerWg.Done()
|
||||||
|
l.HandleMessage(data, remote)
|
||||||
|
}(req.data, req.remote)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,10 +7,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"lol.mleku.dev/chk"
|
|
||||||
"lol.mleku.dev/log"
|
"lol.mleku.dev/log"
|
||||||
"next.orly.dev/pkg/acl"
|
"next.orly.dev/pkg/acl"
|
||||||
"next.orly.dev/pkg/encoders/envelopes/eventenvelope"
|
|
||||||
"next.orly.dev/pkg/encoders/event"
|
"next.orly.dev/pkg/encoders/event"
|
||||||
"next.orly.dev/pkg/encoders/filter"
|
"next.orly.dev/pkg/encoders/filter"
|
||||||
"next.orly.dev/pkg/encoders/hex"
|
"next.orly.dev/pkg/encoders/hex"
|
||||||
@@ -29,6 +27,7 @@ type WriteChanMap map[*websocket.Conn]chan publish.WriteRequest
|
|||||||
type Subscription struct {
|
type Subscription struct {
|
||||||
remote string
|
remote string
|
||||||
AuthedPubkey []byte
|
AuthedPubkey []byte
|
||||||
|
Receiver event.C // Channel for delivering events to this subscription
|
||||||
*filter.S
|
*filter.S
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -121,12 +120,12 @@ func (p *P) Receive(msg typer.T) {
|
|||||||
if subs, ok := p.Map[m.Conn]; !ok {
|
if subs, ok := p.Map[m.Conn]; !ok {
|
||||||
subs = make(map[string]Subscription)
|
subs = make(map[string]Subscription)
|
||||||
subs[m.Id] = Subscription{
|
subs[m.Id] = Subscription{
|
||||||
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
|
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey, Receiver: m.Receiver,
|
||||||
}
|
}
|
||||||
p.Map[m.Conn] = subs
|
p.Map[m.Conn] = subs
|
||||||
} else {
|
} else {
|
||||||
subs[m.Id] = Subscription{
|
subs[m.Id] = Subscription{
|
||||||
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
|
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey, Receiver: m.Receiver,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -144,7 +143,6 @@ func (p *P) Receive(msg typer.T) {
|
|||||||
// applies authentication checks if required by the server and skips delivery
|
// applies authentication checks if required by the server and skips delivery
|
||||||
// for unauthenticated users when events are privileged.
|
// for unauthenticated users when events are privileged.
|
||||||
func (p *P) Deliver(ev *event.E) {
|
func (p *P) Deliver(ev *event.E) {
|
||||||
var err error
|
|
||||||
// Snapshot the deliveries under read lock to avoid holding locks during I/O
|
// Snapshot the deliveries under read lock to avoid holding locks during I/O
|
||||||
p.Mx.RLock()
|
p.Mx.RLock()
|
||||||
type delivery struct {
|
type delivery struct {
|
||||||
@@ -238,52 +236,30 @@ func (p *P) Deliver(ev *event.E) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var res *eventenvelope.Result
|
// Send event to the subscription's receiver channel
|
||||||
if res, err = eventenvelope.NewResultWith(d.id, ev); chk.E(err) {
|
// The consumer goroutine (in handle-req.go) will read from this channel
|
||||||
log.E.F("failed to create event envelope for %s to %s: %v",
|
// and forward it to the client via the write channel
|
||||||
hex.Enc(ev.ID), d.sub.remote, err)
|
log.D.F("attempting delivery of event %s (kind=%d) to subscription %s @ %s",
|
||||||
|
hex.Enc(ev.ID), ev.Kind, d.id, d.sub.remote)
|
||||||
|
|
||||||
|
// Check if receiver channel exists
|
||||||
|
if d.sub.Receiver == nil {
|
||||||
|
log.E.F("subscription %s has nil receiver channel for %s", d.id, d.sub.remote)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Log delivery attempt
|
// Send to receiver channel - non-blocking with timeout
|
||||||
msgData := res.Marshal(nil)
|
|
||||||
log.D.F("attempting delivery of event %s (kind=%d, len=%d) to subscription %s @ %s",
|
|
||||||
hex.Enc(ev.ID), ev.Kind, len(msgData), d.id, d.sub.remote)
|
|
||||||
|
|
||||||
// Get write channel for this connection
|
|
||||||
p.Mx.RLock()
|
|
||||||
writeChan, hasChan := p.GetWriteChan(d.w)
|
|
||||||
stillSubscribed := p.Map[d.w] != nil
|
|
||||||
p.Mx.RUnlock()
|
|
||||||
|
|
||||||
if !stillSubscribed {
|
|
||||||
log.D.F("skipping delivery to %s - connection no longer subscribed", d.sub.remote)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if !hasChan {
|
|
||||||
log.D.F("skipping delivery to %s - no write channel available", d.sub.remote)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send to write channel - non-blocking with timeout
|
|
||||||
select {
|
select {
|
||||||
case <-p.c.Done():
|
case <-p.c.Done():
|
||||||
continue
|
continue
|
||||||
case writeChan <- publish.WriteRequest{Data: msgData, MsgType: websocket.TextMessage, IsControl: false}:
|
case d.sub.Receiver <- ev:
|
||||||
log.D.F("subscription delivery QUEUED: event=%s to=%s sub=%s len=%d",
|
log.D.F("subscription delivery QUEUED: event=%s to=%s sub=%s",
|
||||||
hex.Enc(ev.ID), d.sub.remote, d.id, len(msgData))
|
hex.Enc(ev.ID), d.sub.remote, d.id)
|
||||||
case <-time.After(DefaultWriteTimeout):
|
case <-time.After(DefaultWriteTimeout):
|
||||||
log.E.F("subscription delivery TIMEOUT: event=%s to=%s sub=%s",
|
log.E.F("subscription delivery TIMEOUT: event=%s to=%s sub=%s",
|
||||||
hex.Enc(ev.ID), d.sub.remote, d.id)
|
hex.Enc(ev.ID), d.sub.remote, d.id)
|
||||||
// Check if connection is still valid
|
// Receiver channel is full - subscription consumer is stuck or slow
|
||||||
p.Mx.RLock()
|
// The subscription should be removed by the cleanup logic
|
||||||
stillSubscribed = p.Map[d.w] != nil
|
|
||||||
p.Mx.RUnlock()
|
|
||||||
if !stillSubscribed {
|
|
||||||
log.D.F("removing failed subscriber connection: %s", d.sub.remote)
|
|
||||||
p.removeSubscriber(d.w)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
449
app/subscription_stability_test.go
Normal file
449
app/subscription_stability_test.go
Normal file
@@ -0,0 +1,449 @@
|
|||||||
|
package app
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
"next.orly.dev/app/config"
|
||||||
|
"next.orly.dev/pkg/database"
|
||||||
|
"next.orly.dev/pkg/encoders/event"
|
||||||
|
"next.orly.dev/pkg/encoders/tag"
|
||||||
|
"next.orly.dev/pkg/interfaces/signer/p8k"
|
||||||
|
"next.orly.dev/pkg/protocol/publish"
|
||||||
|
)
|
||||||
|
|
||||||
|
// createSignedTestEvent creates a properly signed test event for use in tests
|
||||||
|
func createSignedTestEvent(t *testing.T, kind uint16, content string, tags ...*tag.T) *event.E {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
// Create a signer
|
||||||
|
signer, err := p8k.New()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create signer: %v", err)
|
||||||
|
}
|
||||||
|
defer signer.Zero()
|
||||||
|
|
||||||
|
// Generate a keypair
|
||||||
|
if err := signer.Generate(); err != nil {
|
||||||
|
t.Fatalf("Failed to generate keypair: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create event
|
||||||
|
ev := &event.E{
|
||||||
|
Kind: kind,
|
||||||
|
Content: []byte(content),
|
||||||
|
CreatedAt: time.Now().Unix(),
|
||||||
|
Tags: &tag.S{},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add any provided tags
|
||||||
|
for _, tg := range tags {
|
||||||
|
*ev.Tags = append(*ev.Tags, tg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sign the event (this sets Pubkey, ID, and Sig)
|
||||||
|
if err := ev.Sign(signer); err != nil {
|
||||||
|
t.Fatalf("Failed to sign event: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ev
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestLongRunningSubscriptionStability verifies that subscriptions remain active
|
||||||
|
// for extended periods and correctly receive real-time events without dropping.
|
||||||
|
func TestLongRunningSubscriptionStability(t *testing.T) {
|
||||||
|
// Create test server
|
||||||
|
server, cleanup := setupTestServer(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
// Start HTTP test server
|
||||||
|
httpServer := httptest.NewServer(server)
|
||||||
|
defer httpServer.Close()
|
||||||
|
|
||||||
|
// Convert HTTP URL to WebSocket URL
|
||||||
|
wsURL := strings.Replace(httpServer.URL, "http://", "ws://", 1)
|
||||||
|
|
||||||
|
// Connect WebSocket client
|
||||||
|
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to connect WebSocket: %v", err)
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
// Subscribe to kind 1 events
|
||||||
|
subID := "test-long-running"
|
||||||
|
reqMsg := fmt.Sprintf(`["REQ","%s",{"kinds":[1]}]`, subID)
|
||||||
|
if err := conn.WriteMessage(websocket.TextMessage, []byte(reqMsg)); err != nil {
|
||||||
|
t.Fatalf("Failed to send REQ: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read until EOSE
|
||||||
|
gotEOSE := false
|
||||||
|
for !gotEOSE {
|
||||||
|
_, msg, err := conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to read message: %v", err)
|
||||||
|
}
|
||||||
|
if strings.Contains(string(msg), `"EOSE"`) && strings.Contains(string(msg), subID) {
|
||||||
|
gotEOSE = true
|
||||||
|
t.Logf("Received EOSE for subscription %s", subID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set up event counter
|
||||||
|
var receivedCount atomic.Int64
|
||||||
|
var mu sync.Mutex
|
||||||
|
receivedEvents := make(map[string]bool)
|
||||||
|
|
||||||
|
// Start goroutine to read events
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
readDone := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(readDone)
|
||||||
|
defer func() {
|
||||||
|
// Recover from any panic in read goroutine
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
t.Logf("Read goroutine panic (recovered): %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
for {
|
||||||
|
// Check context first before attempting any read
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use a longer deadline and check context more frequently
|
||||||
|
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||||
|
_, msg, err := conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
// Immediately check if context is done - if so, just exit without continuing
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for normal close
|
||||||
|
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if this is a timeout error - those are recoverable
|
||||||
|
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
||||||
|
// Double-check context before continuing
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Any other error means connection is broken, exit
|
||||||
|
t.Logf("Read error (non-timeout): %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse message to check if it's an EVENT for our subscription
|
||||||
|
var envelope []interface{}
|
||||||
|
if err := json.Unmarshal(msg, &envelope); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(envelope) >= 3 && envelope[0] == "EVENT" && envelope[1] == subID {
|
||||||
|
// Extract event ID
|
||||||
|
eventMap, ok := envelope[2].(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
eventID, ok := eventMap["id"].(string)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
if !receivedEvents[eventID] {
|
||||||
|
receivedEvents[eventID] = true
|
||||||
|
receivedCount.Add(1)
|
||||||
|
t.Logf("Received event %s (total: %d)", eventID[:8], receivedCount.Load())
|
||||||
|
}
|
||||||
|
mu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Publish events at regular intervals over 30 seconds
|
||||||
|
const numEvents = 30
|
||||||
|
const publishInterval = 1 * time.Second
|
||||||
|
|
||||||
|
publishCtx, publishCancel := context.WithTimeout(context.Background(), 35*time.Second)
|
||||||
|
defer publishCancel()
|
||||||
|
|
||||||
|
for i := 0; i < numEvents; i++ {
|
||||||
|
select {
|
||||||
|
case <-publishCtx.Done():
|
||||||
|
t.Fatalf("Publish timeout exceeded")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create and sign test event
|
||||||
|
ev := createSignedTestEvent(t, 1, fmt.Sprintf("Test event %d for long-running subscription", i))
|
||||||
|
|
||||||
|
// Save event to database
|
||||||
|
if _, err := server.D.SaveEvent(context.Background(), ev); err != nil {
|
||||||
|
t.Errorf("Failed to save event %d: %v", i, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Manually trigger publisher to deliver event to subscriptions
|
||||||
|
server.publishers.Deliver(ev)
|
||||||
|
|
||||||
|
t.Logf("Published event %d", i)
|
||||||
|
|
||||||
|
// Wait before next publish
|
||||||
|
if i < numEvents-1 {
|
||||||
|
time.Sleep(publishInterval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait a bit more for all events to be delivered
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
|
||||||
|
// Cancel context and wait for reader to finish
|
||||||
|
cancel()
|
||||||
|
<-readDone
|
||||||
|
|
||||||
|
// Check results
|
||||||
|
received := receivedCount.Load()
|
||||||
|
t.Logf("Test complete: published %d events, received %d events", numEvents, received)
|
||||||
|
|
||||||
|
// We should receive at least 90% of events (allowing for some timing edge cases)
|
||||||
|
minExpected := int64(float64(numEvents) * 0.9)
|
||||||
|
if received < minExpected {
|
||||||
|
t.Errorf("Subscription stability issue: expected at least %d events, got %d", minExpected, received)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close subscription
|
||||||
|
closeMsg := fmt.Sprintf(`["CLOSE","%s"]`, subID)
|
||||||
|
if err := conn.WriteMessage(websocket.TextMessage, []byte(closeMsg)); err != nil {
|
||||||
|
t.Errorf("Failed to send CLOSE: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("Long-running subscription test PASSED: %d/%d events delivered", received, numEvents)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestMultipleConcurrentSubscriptions verifies that multiple subscriptions
|
||||||
|
// can coexist on the same connection without interfering with each other.
|
||||||
|
func TestMultipleConcurrentSubscriptions(t *testing.T) {
|
||||||
|
// Create test server
|
||||||
|
server, cleanup := setupTestServer(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
// Start HTTP test server
|
||||||
|
httpServer := httptest.NewServer(server)
|
||||||
|
defer httpServer.Close()
|
||||||
|
|
||||||
|
// Convert HTTP URL to WebSocket URL
|
||||||
|
wsURL := strings.Replace(httpServer.URL, "http://", "ws://", 1)
|
||||||
|
|
||||||
|
// Connect WebSocket client
|
||||||
|
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to connect WebSocket: %v", err)
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
// Create 3 subscriptions for different kinds
|
||||||
|
subscriptions := []struct {
|
||||||
|
id string
|
||||||
|
kind int
|
||||||
|
}{
|
||||||
|
{"sub1", 1},
|
||||||
|
{"sub2", 3},
|
||||||
|
{"sub3", 7},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe to all
|
||||||
|
for _, sub := range subscriptions {
|
||||||
|
reqMsg := fmt.Sprintf(`["REQ","%s",{"kinds":[%d]}]`, sub.id, sub.kind)
|
||||||
|
if err := conn.WriteMessage(websocket.TextMessage, []byte(reqMsg)); err != nil {
|
||||||
|
t.Fatalf("Failed to send REQ for %s: %v", sub.id, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read until we get EOSE for all subscriptions
|
||||||
|
eoseCount := 0
|
||||||
|
for eoseCount < len(subscriptions) {
|
||||||
|
_, msg, err := conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to read message: %v", err)
|
||||||
|
}
|
||||||
|
if strings.Contains(string(msg), `"EOSE"`) {
|
||||||
|
eoseCount++
|
||||||
|
t.Logf("Received EOSE %d/%d", eoseCount, len(subscriptions))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Track received events per subscription
|
||||||
|
var mu sync.Mutex
|
||||||
|
receivedByKind := make(map[int]int)
|
||||||
|
|
||||||
|
// Start reader goroutine
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
readDone := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(readDone)
|
||||||
|
defer func() {
|
||||||
|
// Recover from any panic in read goroutine
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
t.Logf("Read goroutine panic (recovered): %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
for {
|
||||||
|
// Check context first before attempting any read
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||||
|
_, msg, err := conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
// Immediately check if context is done - if so, just exit without continuing
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for normal close
|
||||||
|
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if this is a timeout error - those are recoverable
|
||||||
|
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
||||||
|
// Double-check context before continuing
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Any other error means connection is broken, exit
|
||||||
|
t.Logf("Read error (non-timeout): %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse message
|
||||||
|
var envelope []interface{}
|
||||||
|
if err := json.Unmarshal(msg, &envelope); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(envelope) >= 3 && envelope[0] == "EVENT" {
|
||||||
|
eventMap, ok := envelope[2].(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
kindFloat, ok := eventMap["kind"].(float64)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
kind := int(kindFloat)
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
receivedByKind[kind]++
|
||||||
|
t.Logf("Received event for kind %d (count: %d)", kind, receivedByKind[kind])
|
||||||
|
mu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Publish events for each kind
|
||||||
|
for _, sub := range subscriptions {
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
// Create and sign test event
|
||||||
|
ev := createSignedTestEvent(t, uint16(sub.kind), fmt.Sprintf("Test for kind %d event %d", sub.kind, i))
|
||||||
|
|
||||||
|
if _, err := server.D.SaveEvent(context.Background(), ev); err != nil {
|
||||||
|
t.Errorf("Failed to save event: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Manually trigger publisher to deliver event to subscriptions
|
||||||
|
server.publishers.Deliver(ev)
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for events to be delivered
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
// Cancel and cleanup
|
||||||
|
cancel()
|
||||||
|
<-readDone
|
||||||
|
|
||||||
|
// Verify each subscription received its events
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
|
||||||
|
for _, sub := range subscriptions {
|
||||||
|
count := receivedByKind[sub.kind]
|
||||||
|
if count < 4 { // Allow for some timing issues, expect at least 4/5
|
||||||
|
t.Errorf("Subscription %s (kind %d) only received %d/5 events", sub.id, sub.kind, count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("Multiple concurrent subscriptions test PASSED")
|
||||||
|
}
|
||||||
|
|
||||||
|
// setupTestServer creates a test relay server for subscription testing
|
||||||
|
func setupTestServer(t *testing.T) (*Server, func()) {
|
||||||
|
// Setup test database
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
// Use a temporary directory for the test database
|
||||||
|
tmpDir := t.TempDir()
|
||||||
|
db, err := database.New(ctx, cancel, tmpDir, "test.db")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create test database: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup basic config
|
||||||
|
cfg := &config.C{
|
||||||
|
AuthRequired: false,
|
||||||
|
Owners: []string{},
|
||||||
|
Admins: []string{},
|
||||||
|
ACLMode: "none",
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup server
|
||||||
|
server := &Server{
|
||||||
|
Config: cfg,
|
||||||
|
D: db,
|
||||||
|
Ctx: ctx,
|
||||||
|
publishers: publish.New(NewPublisher(ctx)),
|
||||||
|
Admins: [][]byte{},
|
||||||
|
Owners: [][]byte{},
|
||||||
|
challenges: make(map[string][]byte),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cleanup function
|
||||||
|
cleanup := func() {
|
||||||
|
db.Close()
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
return server, cleanup
|
||||||
|
}
|
||||||
@@ -1,273 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
lol "lol.mleku.dev"
|
|
||||||
"next.orly.dev/app/config"
|
|
||||||
"next.orly.dev/pkg/encoders/event"
|
|
||||||
"next.orly.dev/pkg/encoders/tag"
|
|
||||||
"next.orly.dev/pkg/interfaces/signer/p8k"
|
|
||||||
"next.orly.dev/pkg/policy"
|
|
||||||
"next.orly.dev/pkg/run"
|
|
||||||
relaytester "next.orly.dev/relay-tester"
|
|
||||||
)
|
|
||||||
|
|
||||||
// TestClusterPeerPolicyFiltering tests cluster peer synchronization with policy filtering.
|
|
||||||
// This test:
|
|
||||||
// 1. Starts multiple relays using the test relay launch functionality
|
|
||||||
// 2. Configures them as peers to each other (though sync managers are not fully implemented in this test)
|
|
||||||
// 3. Tests policy filtering with a kind whitelist that allows only specific event kinds
|
|
||||||
// 4. Verifies that the policy correctly allows/denies events based on the whitelist
|
|
||||||
//
|
|
||||||
// Note: This test focuses on the policy filtering aspect of cluster peers.
|
|
||||||
// Full cluster synchronization testing would require implementing the sync manager
|
|
||||||
// integration, which is beyond the scope of this initial test.
|
|
||||||
func TestClusterPeerPolicyFiltering(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping cluster peer integration test")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Number of relays in the cluster
|
|
||||||
numRelays := 3
|
|
||||||
|
|
||||||
// Start multiple test relays
|
|
||||||
relays, ports, err := startTestRelays(numRelays)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to start test relays: %v", err)
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
for _, relay := range relays {
|
|
||||||
if tr, ok := relay.(*testRelay); ok {
|
|
||||||
if stopErr := tr.Stop(); stopErr != nil {
|
|
||||||
t.Logf("Error stopping relay: %v", stopErr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Create relay URLs
|
|
||||||
relayURLs := make([]string, numRelays)
|
|
||||||
for i, port := range ports {
|
|
||||||
relayURLs[i] = fmt.Sprintf("http://127.0.0.1:%d", port)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for all relays to be ready
|
|
||||||
for _, url := range relayURLs {
|
|
||||||
wsURL := strings.Replace(url, "http://", "ws://", 1) // Convert http to ws
|
|
||||||
if err := waitForTestRelay(wsURL, 10*time.Second); err != nil {
|
|
||||||
t.Fatalf("Relay not ready after timeout: %s, %v", wsURL, err)
|
|
||||||
}
|
|
||||||
t.Logf("Relay is ready at %s", wsURL)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create policy configuration with small kind whitelist
|
|
||||||
policyJSON := map[string]interface{}{
|
|
||||||
"kind": map[string]interface{}{
|
|
||||||
"whitelist": []int{1, 7, 42}, // Allow only text notes, user statuses, and channel messages
|
|
||||||
},
|
|
||||||
"default_policy": "allow", // Allow everything not explicitly denied
|
|
||||||
}
|
|
||||||
|
|
||||||
policyJSONBytes, err := json.MarshalIndent(policyJSON, "", " ")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to marshal policy JSON: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create temporary directory for policy config
|
|
||||||
tempDir := t.TempDir()
|
|
||||||
configDir := filepath.Join(tempDir, "ORLY_POLICY")
|
|
||||||
if err := os.MkdirAll(configDir, 0755); err != nil {
|
|
||||||
t.Fatalf("Failed to create config directory: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
policyPath := filepath.Join(configDir, "policy.json")
|
|
||||||
if err := os.WriteFile(policyPath, policyJSONBytes, 0644); err != nil {
|
|
||||||
t.Fatalf("Failed to write policy file: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create policy from JSON directly for testing
|
|
||||||
testPolicy, err := policy.New(policyJSONBytes)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create policy: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Generate test keys
|
|
||||||
signer := p8k.MustNew()
|
|
||||||
if err := signer.Generate(); err != nil {
|
|
||||||
t.Fatalf("Failed to generate test signer: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create test events of different kinds
|
|
||||||
testEvents := []*event.E{
|
|
||||||
// Kind 1 (text note) - should be allowed by policy
|
|
||||||
createTestEvent(t, signer, "Text note - should sync", 1),
|
|
||||||
// Kind 7 (user status) - should be allowed by policy
|
|
||||||
createTestEvent(t, signer, "User status - should sync", 7),
|
|
||||||
// Kind 42 (channel message) - should be allowed by policy
|
|
||||||
createTestEvent(t, signer, "Channel message - should sync", 42),
|
|
||||||
// Kind 0 (metadata) - should be denied by policy
|
|
||||||
createTestEvent(t, signer, "Metadata - should NOT sync", 0),
|
|
||||||
// Kind 3 (follows) - should be denied by policy
|
|
||||||
createTestEvent(t, signer, "Follows - should NOT sync", 3),
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Logf("Created %d test events", len(testEvents))
|
|
||||||
|
|
||||||
// Publish events to the first relay (non-policy relay)
|
|
||||||
firstRelayWS := fmt.Sprintf("ws://127.0.0.1:%d", ports[0])
|
|
||||||
client, err := relaytester.NewClient(firstRelayWS)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to connect to first relay: %v", err)
|
|
||||||
}
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
// Publish all events to the first relay
|
|
||||||
for i, ev := range testEvents {
|
|
||||||
if err := client.Publish(ev); err != nil {
|
|
||||||
t.Fatalf("Failed to publish event %d: %v", i, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for OK response
|
|
||||||
accepted, reason, err := client.WaitForOK(ev.ID, 5*time.Second)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to get OK response for event %d: %v", i, err)
|
|
||||||
}
|
|
||||||
if !accepted {
|
|
||||||
t.Logf("Event %d rejected: %s (kind: %d)", i, reason, ev.Kind)
|
|
||||||
} else {
|
|
||||||
t.Logf("Event %d accepted (kind: %d)", i, ev.Kind)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test policy filtering directly
|
|
||||||
t.Logf("Testing policy filtering...")
|
|
||||||
|
|
||||||
// Test that the policy correctly allows/denies events based on the whitelist
|
|
||||||
// Only kinds 1, 7, and 42 should be allowed
|
|
||||||
for i, ev := range testEvents {
|
|
||||||
allowed, err := testPolicy.CheckPolicy("write", ev, signer.Pub(), "127.0.0.1")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Policy check failed for event %d: %v", i, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedAllowed := ev.Kind == 1 || ev.Kind == 7 || ev.Kind == 42
|
|
||||||
if allowed != expectedAllowed {
|
|
||||||
t.Errorf("Event %d (kind %d): expected allowed=%v, got %v", i, ev.Kind, expectedAllowed, allowed)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Logf("Policy filtering test completed successfully")
|
|
||||||
|
|
||||||
// Note: In a real cluster setup, the sync manager would use this policy
|
|
||||||
// to filter events during synchronization between peers. This test demonstrates
|
|
||||||
// that the policy correctly identifies which events should be allowed to sync.
|
|
||||||
}
|
|
||||||
|
|
||||||
// testRelay wraps a run.Relay for testing purposes
|
|
||||||
type testRelay struct {
|
|
||||||
*run.Relay
|
|
||||||
}
|
|
||||||
|
|
||||||
// startTestRelays starts multiple test relays with different configurations
|
|
||||||
func startTestRelays(count int) ([]interface{}, []int, error) {
|
|
||||||
relays := make([]interface{}, count)
|
|
||||||
ports := make([]int, count)
|
|
||||||
|
|
||||||
for i := 0; i < count; i++ {
|
|
||||||
cfg := &config.C{
|
|
||||||
AppName: fmt.Sprintf("ORLY-TEST-%d", i),
|
|
||||||
DataDir: "", // Use temp dir
|
|
||||||
Listen: "127.0.0.1",
|
|
||||||
Port: 0, // Random port
|
|
||||||
HealthPort: 0,
|
|
||||||
EnableShutdown: false,
|
|
||||||
LogLevel: "warn",
|
|
||||||
DBLogLevel: "warn",
|
|
||||||
DBBlockCacheMB: 512,
|
|
||||||
DBIndexCacheMB: 256,
|
|
||||||
LogToStdout: false,
|
|
||||||
PprofHTTP: false,
|
|
||||||
ACLMode: "none",
|
|
||||||
AuthRequired: false,
|
|
||||||
AuthToWrite: false,
|
|
||||||
SubscriptionEnabled: false,
|
|
||||||
MonthlyPriceSats: 6000,
|
|
||||||
FollowListFrequency: time.Hour,
|
|
||||||
WebDisableEmbedded: false,
|
|
||||||
SprocketEnabled: false,
|
|
||||||
SpiderMode: "none",
|
|
||||||
PolicyEnabled: false, // We'll enable it separately for one relay
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find available port
|
|
||||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("failed to find available port for relay %d: %w", i, err)
|
|
||||||
}
|
|
||||||
addr := listener.Addr().(*net.TCPAddr)
|
|
||||||
cfg.Port = addr.Port
|
|
||||||
listener.Close()
|
|
||||||
|
|
||||||
// Set up logging
|
|
||||||
lol.SetLogLevel(cfg.LogLevel)
|
|
||||||
|
|
||||||
opts := &run.Options{
|
|
||||||
CleanupDataDir: func(b bool) *bool { return &b }(true),
|
|
||||||
}
|
|
||||||
|
|
||||||
relay, err := run.Start(cfg, opts)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("failed to start relay %d: %w", i, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
relays[i] = &testRelay{Relay: relay}
|
|
||||||
ports[i] = cfg.Port
|
|
||||||
}
|
|
||||||
|
|
||||||
return relays, ports, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// waitForTestRelay waits for a relay to be ready by attempting to connect
|
|
||||||
func waitForTestRelay(url string, timeout time.Duration) error {
|
|
||||||
// Extract host:port from ws:// URL
|
|
||||||
addr := url
|
|
||||||
if len(url) > 5 && url[:5] == "ws://" {
|
|
||||||
addr = url[5:]
|
|
||||||
}
|
|
||||||
deadline := time.Now().Add(timeout)
|
|
||||||
attempts := 0
|
|
||||||
for time.Now().Before(deadline) {
|
|
||||||
conn, err := net.DialTimeout("tcp", addr, 500*time.Millisecond)
|
|
||||||
if err == nil {
|
|
||||||
conn.Close()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
attempts++
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
}
|
|
||||||
return fmt.Errorf("timeout waiting for relay at %s after %d attempts", url, attempts)
|
|
||||||
}
|
|
||||||
|
|
||||||
// createTestEvent creates a test event with proper signing
|
|
||||||
func createTestEvent(t *testing.T, signer *p8k.Signer, content string, eventKind uint16) *event.E {
|
|
||||||
ev := event.New()
|
|
||||||
ev.CreatedAt = time.Now().Unix()
|
|
||||||
ev.Kind = eventKind
|
|
||||||
ev.Content = []byte(content)
|
|
||||||
ev.Tags = tag.NewS()
|
|
||||||
|
|
||||||
// Sign the event
|
|
||||||
if err := ev.Sign(signer); err != nil {
|
|
||||||
t.Fatalf("Failed to sign test event: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return ev
|
|
||||||
}
|
|
||||||
268
cmd/subscription-test-simple/main.go
Normal file
268
cmd/subscription-test-simple/main.go
Normal file
@@ -0,0 +1,268 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
relayURL = flag.String("url", "ws://localhost:3334", "Relay WebSocket URL")
|
||||||
|
duration = flag.Int("duration", 120, "Test duration in seconds")
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
log.SetFlags(log.Ltime)
|
||||||
|
|
||||||
|
fmt.Println("===================================")
|
||||||
|
fmt.Println("Simple Subscription Stability Test")
|
||||||
|
fmt.Println("===================================")
|
||||||
|
fmt.Printf("Relay: %s\n", *relayURL)
|
||||||
|
fmt.Printf("Duration: %d seconds\n", *duration)
|
||||||
|
fmt.Println()
|
||||||
|
fmt.Println("This test verifies that subscriptions remain")
|
||||||
|
fmt.Println("active without dropping over the test period.")
|
||||||
|
fmt.Println()
|
||||||
|
|
||||||
|
// Connect to relay
|
||||||
|
log.Printf("Connecting to %s...", *relayURL)
|
||||||
|
conn, _, err := websocket.DefaultDialer.Dial(*relayURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect: %v", err)
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
log.Printf("✓ Connected")
|
||||||
|
|
||||||
|
// Context for the test
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(*duration+10)*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Handle interrupts
|
||||||
|
sigChan := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
|
||||||
|
go func() {
|
||||||
|
<-sigChan
|
||||||
|
log.Println("\nInterrupted, shutting down...")
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Subscribe
|
||||||
|
subID := fmt.Sprintf("stability-test-%d", time.Now().Unix())
|
||||||
|
reqMsg := []interface{}{"REQ", subID, map[string]interface{}{"kinds": []int{1}}}
|
||||||
|
reqMsgBytes, _ := json.Marshal(reqMsg)
|
||||||
|
|
||||||
|
log.Printf("Sending subscription: %s", subID)
|
||||||
|
if err := conn.WriteMessage(websocket.TextMessage, reqMsgBytes); err != nil {
|
||||||
|
log.Fatalf("Failed to send REQ: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Track connection health
|
||||||
|
lastMessageTime := time.Now()
|
||||||
|
gotEOSE := false
|
||||||
|
messageCount := 0
|
||||||
|
pingCount := 0
|
||||||
|
|
||||||
|
// Read goroutine
|
||||||
|
readDone := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(readDone)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
|
||||||
|
msgType, msg, err := conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if netErr, ok := err.(interface{ Timeout() bool }); ok && netErr.Timeout() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.Printf("Read error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
lastMessageTime = time.Now()
|
||||||
|
messageCount++
|
||||||
|
|
||||||
|
// Handle PING
|
||||||
|
if msgType == websocket.PingMessage {
|
||||||
|
pingCount++
|
||||||
|
log.Printf("Received PING #%d, sending PONG", pingCount)
|
||||||
|
conn.WriteMessage(websocket.PongMessage, nil)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse message
|
||||||
|
var envelope []json.RawMessage
|
||||||
|
if err := json.Unmarshal(msg, &envelope); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(envelope) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var msgTypeStr string
|
||||||
|
json.Unmarshal(envelope[0], &msgTypeStr)
|
||||||
|
|
||||||
|
switch msgTypeStr {
|
||||||
|
case "EOSE":
|
||||||
|
var recvSubID string
|
||||||
|
json.Unmarshal(envelope[1], &recvSubID)
|
||||||
|
if recvSubID == subID && !gotEOSE {
|
||||||
|
gotEOSE = true
|
||||||
|
log.Printf("✓ Received EOSE - subscription is active")
|
||||||
|
}
|
||||||
|
|
||||||
|
case "EVENT":
|
||||||
|
var recvSubID string
|
||||||
|
json.Unmarshal(envelope[1], &recvSubID)
|
||||||
|
if recvSubID == subID {
|
||||||
|
log.Printf("Received EVENT (subscription still active)")
|
||||||
|
}
|
||||||
|
|
||||||
|
case "CLOSED":
|
||||||
|
var recvSubID string
|
||||||
|
json.Unmarshal(envelope[1], &recvSubID)
|
||||||
|
if recvSubID == subID {
|
||||||
|
log.Printf("⚠ Subscription CLOSED by relay!")
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
case "NOTICE":
|
||||||
|
var notice string
|
||||||
|
json.Unmarshal(envelope[1], ¬ice)
|
||||||
|
log.Printf("NOTICE: %s", notice)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for EOSE
|
||||||
|
log.Println("Waiting for EOSE...")
|
||||||
|
for !gotEOSE && ctx.Err() == nil {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !gotEOSE {
|
||||||
|
log.Fatal("Did not receive EOSE")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Monitor loop
|
||||||
|
startTime := time.Now()
|
||||||
|
ticker := time.NewTicker(10 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
log.Println()
|
||||||
|
log.Printf("Subscription is active. Monitoring for %d seconds...", *duration)
|
||||||
|
log.Println("(Subscription should stay active even without events)")
|
||||||
|
log.Println()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
goto done
|
||||||
|
case <-ticker.C:
|
||||||
|
elapsed := time.Since(startTime)
|
||||||
|
timeSinceMessage := time.Since(lastMessageTime)
|
||||||
|
|
||||||
|
log.Printf("[%3.0fs/%ds] Messages: %d | Last message: %.0fs ago | Status: %s",
|
||||||
|
elapsed.Seconds(),
|
||||||
|
*duration,
|
||||||
|
messageCount,
|
||||||
|
timeSinceMessage.Seconds(),
|
||||||
|
getStatus(timeSinceMessage),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Check if we've reached duration
|
||||||
|
if elapsed >= time.Duration(*duration)*time.Second {
|
||||||
|
goto done
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
done:
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
// Wait for reader
|
||||||
|
select {
|
||||||
|
case <-readDone:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send CLOSE
|
||||||
|
closeMsg := []interface{}{"CLOSE", subID}
|
||||||
|
closeMsgBytes, _ := json.Marshal(closeMsg)
|
||||||
|
conn.WriteMessage(websocket.TextMessage, closeMsgBytes)
|
||||||
|
|
||||||
|
// Results
|
||||||
|
elapsed := time.Since(startTime)
|
||||||
|
timeSinceMessage := time.Since(lastMessageTime)
|
||||||
|
|
||||||
|
fmt.Println()
|
||||||
|
fmt.Println("===================================")
|
||||||
|
fmt.Println("Test Results")
|
||||||
|
fmt.Println("===================================")
|
||||||
|
fmt.Printf("Duration: %.1f seconds\n", elapsed.Seconds())
|
||||||
|
fmt.Printf("Total messages: %d\n", messageCount)
|
||||||
|
fmt.Printf("Last message: %.0f seconds ago\n", timeSinceMessage.Seconds())
|
||||||
|
fmt.Println()
|
||||||
|
|
||||||
|
// Determine success
|
||||||
|
if timeSinceMessage < 15*time.Second {
|
||||||
|
// Recent message - subscription is alive
|
||||||
|
fmt.Println("✓ TEST PASSED")
|
||||||
|
fmt.Println("Subscription remained active throughout test period.")
|
||||||
|
fmt.Println("Recent messages indicate healthy connection.")
|
||||||
|
} else if timeSinceMessage < 30*time.Second {
|
||||||
|
// Somewhat recent - probably OK
|
||||||
|
fmt.Println("✓ TEST LIKELY PASSED")
|
||||||
|
fmt.Println("Subscription appears active (message received recently).")
|
||||||
|
fmt.Println("Some delay is normal if relay is idle.")
|
||||||
|
} else if messageCount > 0 {
|
||||||
|
// Got EOSE but nothing since
|
||||||
|
fmt.Println("⚠ INCONCLUSIVE")
|
||||||
|
fmt.Println("Subscription was established but no activity since.")
|
||||||
|
fmt.Println("This is expected if relay has no events and doesn't send pings.")
|
||||||
|
fmt.Println("To properly test, publish events during the test period.")
|
||||||
|
} else {
|
||||||
|
// No messages at all
|
||||||
|
fmt.Println("✗ TEST FAILED")
|
||||||
|
fmt.Println("No messages received - subscription may have failed.")
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println()
|
||||||
|
fmt.Println("Note: This test verifies the subscription stays registered.")
|
||||||
|
fmt.Println("For full testing, publish events while this runs and verify")
|
||||||
|
fmt.Println("they are received throughout the entire test duration.")
|
||||||
|
}
|
||||||
|
|
||||||
|
func getStatus(timeSince time.Duration) string {
|
||||||
|
seconds := timeSince.Seconds()
|
||||||
|
switch {
|
||||||
|
case seconds < 10:
|
||||||
|
return "ACTIVE (recent message)"
|
||||||
|
case seconds < 30:
|
||||||
|
return "IDLE (normal)"
|
||||||
|
case seconds < 60:
|
||||||
|
return "QUIET (possibly normal)"
|
||||||
|
default:
|
||||||
|
return "STALE (may have dropped)"
|
||||||
|
}
|
||||||
|
}
|
||||||
347
cmd/subscription-test/main.go
Normal file
347
cmd/subscription-test/main.go
Normal file
@@ -0,0 +1,347 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"sync/atomic"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
relayURL = flag.String("url", "ws://localhost:3334", "Relay WebSocket URL")
|
||||||
|
duration = flag.Int("duration", 60, "Test duration in seconds")
|
||||||
|
eventKind = flag.Int("kind", 1, "Event kind to subscribe to")
|
||||||
|
verbose = flag.Bool("v", false, "Verbose output")
|
||||||
|
subID = flag.String("sub", "", "Subscription ID (default: auto-generated)")
|
||||||
|
)
|
||||||
|
|
||||||
|
type NostrEvent struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
PubKey string `json:"pubkey"`
|
||||||
|
CreatedAt int64 `json:"created_at"`
|
||||||
|
Kind int `json:"kind"`
|
||||||
|
Tags [][]string `json:"tags"`
|
||||||
|
Content string `json:"content"`
|
||||||
|
Sig string `json:"sig"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
log.SetFlags(log.Ltime | log.Lmicroseconds)
|
||||||
|
|
||||||
|
// Generate subscription ID if not provided
|
||||||
|
subscriptionID := *subID
|
||||||
|
if subscriptionID == "" {
|
||||||
|
subscriptionID = fmt.Sprintf("test-%d", time.Now().Unix())
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Starting subscription stability test")
|
||||||
|
log.Printf("Relay: %s", *relayURL)
|
||||||
|
log.Printf("Duration: %d seconds", *duration)
|
||||||
|
log.Printf("Event kind: %d", *eventKind)
|
||||||
|
log.Printf("Subscription ID: %s", subscriptionID)
|
||||||
|
log.Println()
|
||||||
|
|
||||||
|
// Connect to relay
|
||||||
|
log.Printf("Connecting to %s...", *relayURL)
|
||||||
|
conn, _, err := websocket.DefaultDialer.Dial(*relayURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect: %v", err)
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
log.Printf("✓ Connected")
|
||||||
|
log.Println()
|
||||||
|
|
||||||
|
// Context for the test
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(*duration+10)*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Handle interrupts
|
||||||
|
sigChan := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
|
||||||
|
go func() {
|
||||||
|
<-sigChan
|
||||||
|
log.Println("\nInterrupted, shutting down...")
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Counters
|
||||||
|
var receivedCount atomic.Int64
|
||||||
|
var lastEventTime atomic.Int64
|
||||||
|
lastEventTime.Store(time.Now().Unix())
|
||||||
|
|
||||||
|
// Subscribe
|
||||||
|
reqMsg := map[string]interface{}{
|
||||||
|
"kinds": []int{*eventKind},
|
||||||
|
}
|
||||||
|
reqMsgBytes, _ := json.Marshal(reqMsg)
|
||||||
|
subscribeMsg := []interface{}{"REQ", subscriptionID, json.RawMessage(reqMsgBytes)}
|
||||||
|
subscribeMsgBytes, _ := json.Marshal(subscribeMsg)
|
||||||
|
|
||||||
|
log.Printf("Sending REQ: %s", string(subscribeMsgBytes))
|
||||||
|
if err := conn.WriteMessage(websocket.TextMessage, subscribeMsgBytes); err != nil {
|
||||||
|
log.Fatalf("Failed to send REQ: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read messages
|
||||||
|
gotEOSE := false
|
||||||
|
readDone := make(chan struct{})
|
||||||
|
consecutiveTimeouts := 0
|
||||||
|
maxConsecutiveTimeouts := 20 // Exit if we get too many consecutive timeouts
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(readDone)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
conn.SetReadDeadline(time.Now().Add(5 * time.Second))
|
||||||
|
_, msg, err := conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
// Check for normal close
|
||||||
|
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
|
||||||
|
log.Println("Connection closed normally")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if context was cancelled
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for timeout errors (these are expected during idle periods)
|
||||||
|
if netErr, ok := err.(interface{ Timeout() bool }); ok && netErr.Timeout() {
|
||||||
|
consecutiveTimeouts++
|
||||||
|
if consecutiveTimeouts >= maxConsecutiveTimeouts {
|
||||||
|
log.Printf("Too many consecutive read timeouts (%d), connection may be dead", consecutiveTimeouts)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Only log every 5th timeout to avoid spam
|
||||||
|
if *verbose && consecutiveTimeouts%5 == 0 {
|
||||||
|
log.Printf("Read timeout (idle period, %d consecutive)", consecutiveTimeouts)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// For any other error, log and exit
|
||||||
|
log.Printf("Read error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset timeout counter on successful read
|
||||||
|
consecutiveTimeouts = 0
|
||||||
|
|
||||||
|
// Parse message
|
||||||
|
var envelope []json.RawMessage
|
||||||
|
if err := json.Unmarshal(msg, &envelope); err != nil {
|
||||||
|
if *verbose {
|
||||||
|
log.Printf("Failed to parse message: %v", err)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(envelope) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var msgType string
|
||||||
|
json.Unmarshal(envelope[0], &msgType)
|
||||||
|
|
||||||
|
// Check message type
|
||||||
|
switch msgType {
|
||||||
|
case "EOSE":
|
||||||
|
var recvSubID string
|
||||||
|
json.Unmarshal(envelope[1], &recvSubID)
|
||||||
|
if recvSubID == subscriptionID {
|
||||||
|
if !gotEOSE {
|
||||||
|
gotEOSE = true
|
||||||
|
log.Printf("✓ Received EOSE - subscription is active")
|
||||||
|
log.Println()
|
||||||
|
log.Println("Waiting for real-time events...")
|
||||||
|
log.Println()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
case "EVENT":
|
||||||
|
var recvSubID string
|
||||||
|
json.Unmarshal(envelope[1], &recvSubID)
|
||||||
|
if recvSubID == subscriptionID {
|
||||||
|
var event NostrEvent
|
||||||
|
if err := json.Unmarshal(envelope[2], &event); err == nil {
|
||||||
|
count := receivedCount.Add(1)
|
||||||
|
lastEventTime.Store(time.Now().Unix())
|
||||||
|
|
||||||
|
eventIDShort := event.ID
|
||||||
|
if len(eventIDShort) > 8 {
|
||||||
|
eventIDShort = eventIDShort[:8]
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("[EVENT #%d] id=%s kind=%d created=%d",
|
||||||
|
count, eventIDShort, event.Kind, event.CreatedAt)
|
||||||
|
|
||||||
|
if *verbose {
|
||||||
|
log.Printf(" content: %s", event.Content)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
case "NOTICE":
|
||||||
|
var notice string
|
||||||
|
json.Unmarshal(envelope[1], ¬ice)
|
||||||
|
log.Printf("[NOTICE] %s", notice)
|
||||||
|
|
||||||
|
case "CLOSED":
|
||||||
|
var recvSubID, reason string
|
||||||
|
json.Unmarshal(envelope[1], &recvSubID)
|
||||||
|
if len(envelope) > 2 {
|
||||||
|
json.Unmarshal(envelope[2], &reason)
|
||||||
|
}
|
||||||
|
if recvSubID == subscriptionID {
|
||||||
|
log.Printf("⚠ Subscription CLOSED by relay: %s", reason)
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
case "OK":
|
||||||
|
// Ignore OK messages for this test
|
||||||
|
|
||||||
|
default:
|
||||||
|
if *verbose {
|
||||||
|
log.Printf("Unknown message type: %s", msgType)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for EOSE with timeout
|
||||||
|
eoseTimeout := time.After(10 * time.Second)
|
||||||
|
for !gotEOSE {
|
||||||
|
select {
|
||||||
|
case <-eoseTimeout:
|
||||||
|
log.Printf("⚠ Warning: No EOSE received within 10 seconds")
|
||||||
|
gotEOSE = true // Continue anyway
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Println("Test cancelled before EOSE")
|
||||||
|
return
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
// Keep waiting
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Monitor for subscription drops
|
||||||
|
startTime := time.Now()
|
||||||
|
endTime := startTime.Add(time.Duration(*duration) * time.Second)
|
||||||
|
|
||||||
|
// Start monitoring goroutine
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(5 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
elapsed := time.Since(startTime).Seconds()
|
||||||
|
lastEvent := lastEventTime.Load()
|
||||||
|
timeSinceLastEvent := time.Now().Unix() - lastEvent
|
||||||
|
|
||||||
|
log.Printf("[STATUS] Elapsed: %.0fs/%ds | Events: %d | Last event: %ds ago",
|
||||||
|
elapsed, *duration, receivedCount.Load(), timeSinceLastEvent)
|
||||||
|
|
||||||
|
// Warn if no events for a while (but only if we've seen events before)
|
||||||
|
if receivedCount.Load() > 0 && timeSinceLastEvent > 30 {
|
||||||
|
log.Printf("⚠ Warning: No events received for %ds - subscription may have dropped", timeSinceLastEvent)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for test duration
|
||||||
|
log.Printf("Test running for %d seconds...", *duration)
|
||||||
|
log.Println("(You can publish events to the relay in another terminal)")
|
||||||
|
log.Println()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// Test completed or interrupted
|
||||||
|
case <-time.After(time.Until(endTime)):
|
||||||
|
// Duration elapsed
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait a bit for final events
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
// Wait for reader to finish
|
||||||
|
select {
|
||||||
|
case <-readDone:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
log.Println("Reader goroutine didn't exit cleanly")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send CLOSE
|
||||||
|
closeMsg := []interface{}{"CLOSE", subscriptionID}
|
||||||
|
closeMsgBytes, _ := json.Marshal(closeMsg)
|
||||||
|
conn.WriteMessage(websocket.TextMessage, closeMsgBytes)
|
||||||
|
|
||||||
|
// Print results
|
||||||
|
log.Println()
|
||||||
|
log.Println("===================================")
|
||||||
|
log.Println("Test Results")
|
||||||
|
log.Println("===================================")
|
||||||
|
log.Printf("Duration: %.1f seconds", time.Since(startTime).Seconds())
|
||||||
|
log.Printf("Events received: %d", receivedCount.Load())
|
||||||
|
log.Printf("Subscription ID: %s", subscriptionID)
|
||||||
|
|
||||||
|
lastEvent := lastEventTime.Load()
|
||||||
|
if lastEvent > startTime.Unix() {
|
||||||
|
log.Printf("Last event: %ds ago", time.Now().Unix()-lastEvent)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Println()
|
||||||
|
|
||||||
|
// Determine pass/fail
|
||||||
|
received := receivedCount.Load()
|
||||||
|
testDuration := time.Since(startTime).Seconds()
|
||||||
|
|
||||||
|
if received == 0 {
|
||||||
|
log.Println("⚠ No events received during test")
|
||||||
|
log.Println("This is expected if no events were published")
|
||||||
|
log.Println("To test properly, publish events while this is running:")
|
||||||
|
log.Println()
|
||||||
|
log.Println(" # In another terminal:")
|
||||||
|
log.Printf(" ./orly # Make sure relay is running\n")
|
||||||
|
log.Println()
|
||||||
|
log.Println(" # Then publish test events (implementation-specific)")
|
||||||
|
} else {
|
||||||
|
eventsPerSecond := float64(received) / testDuration
|
||||||
|
log.Printf("Rate: %.2f events/second", eventsPerSecond)
|
||||||
|
|
||||||
|
lastEvent := lastEventTime.Load()
|
||||||
|
timeSinceLastEvent := time.Now().Unix() - lastEvent
|
||||||
|
|
||||||
|
if timeSinceLastEvent < 10 {
|
||||||
|
log.Println()
|
||||||
|
log.Println("✓ TEST PASSED - Subscription remained stable")
|
||||||
|
log.Println("Events were received recently, indicating subscription is still active")
|
||||||
|
} else {
|
||||||
|
log.Println()
|
||||||
|
log.Printf("⚠ Potential issue - Last event was %ds ago", timeSinceLastEvent)
|
||||||
|
log.Println("Subscription may have dropped if events were still being published")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
|
|
||||||
"lol.mleku.dev/chk"
|
"lol.mleku.dev/chk"
|
||||||
|
"lol.mleku.dev/log"
|
||||||
"next.orly.dev/pkg/encoders/envelopes"
|
"next.orly.dev/pkg/encoders/envelopes"
|
||||||
"next.orly.dev/pkg/encoders/filter"
|
"next.orly.dev/pkg/encoders/filter"
|
||||||
"next.orly.dev/pkg/encoders/text"
|
"next.orly.dev/pkg/encoders/text"
|
||||||
@@ -85,19 +86,24 @@ func (en *T) Marshal(dst []byte) (b []byte) {
|
|||||||
// string is correctly unescaped by NIP-01 escaping rules.
|
// string is correctly unescaped by NIP-01 escaping rules.
|
||||||
func (en *T) Unmarshal(b []byte) (r []byte, err error) {
|
func (en *T) Unmarshal(b []byte) (r []byte, err error) {
|
||||||
r = b
|
r = b
|
||||||
|
log.I.F("%s", r)
|
||||||
if en.Subscription, r, err = text.UnmarshalQuoted(r); chk.E(err) {
|
if en.Subscription, r, err = text.UnmarshalQuoted(r); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
log.I.F("%s", r)
|
||||||
if r, err = text.Comma(r); chk.E(err) {
|
if r, err = text.Comma(r); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
log.I.F("%s", r)
|
||||||
en.Filters = new(filter.S)
|
en.Filters = new(filter.S)
|
||||||
if r, err = en.Filters.Unmarshal(r); chk.E(err) {
|
if r, err = en.Filters.Unmarshal(r); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
log.I.F("%s", r)
|
||||||
if r, err = envelopes.SkipToTheEnd(r); chk.E(err) {
|
if r, err = envelopes.SkipToTheEnd(r); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
log.I.F("%s", r)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -47,17 +47,24 @@ func (s *S) Marshal(dst []byte) (b []byte) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Unmarshal decodes one or more filters from JSON.
|
// Unmarshal decodes one or more filters from JSON.
|
||||||
|
// This handles both array-wrapped filters [{},...] and unwrapped filters {},...
|
||||||
func (s *S) Unmarshal(b []byte) (r []byte, err error) {
|
func (s *S) Unmarshal(b []byte) (r []byte, err error) {
|
||||||
r = b
|
r = b
|
||||||
if len(r) == 0 {
|
if len(r) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
r = r[1:]
|
|
||||||
// Handle empty array "[]"
|
// Check if filters are wrapped in an array
|
||||||
if len(r) > 0 && r[0] == ']' {
|
isArrayWrapped := r[0] == '['
|
||||||
|
if isArrayWrapped {
|
||||||
r = r[1:]
|
r = r[1:]
|
||||||
return
|
// Handle empty array "[]"
|
||||||
|
if len(r) > 0 && r[0] == ']' {
|
||||||
|
r = r[1:]
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if len(r) == 0 {
|
if len(r) == 0 {
|
||||||
return
|
return
|
||||||
@@ -73,13 +80,17 @@ func (s *S) Unmarshal(b []byte) (r []byte, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if r[0] == ',' {
|
if r[0] == ',' {
|
||||||
// Next element in the array
|
// Next element
|
||||||
r = r[1:]
|
r = r[1:]
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if r[0] == ']' {
|
if r[0] == ']' {
|
||||||
// End of the enclosed array; consume and return
|
// End of array or envelope
|
||||||
r = r[1:]
|
if isArrayWrapped {
|
||||||
|
// Consume the closing bracket of the filter array
|
||||||
|
r = r[1:]
|
||||||
|
}
|
||||||
|
// Otherwise leave it for the envelope parser
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Unexpected token
|
// Unexpected token
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
v0.26.0
|
v0.26.3
|
||||||
245
relay_test.go
245
relay_test.go
@@ -1,245 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
lol "lol.mleku.dev"
|
|
||||||
"next.orly.dev/app/config"
|
|
||||||
"next.orly.dev/pkg/run"
|
|
||||||
relaytester "next.orly.dev/relay-tester"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
testRelayURL string
|
|
||||||
testName string
|
|
||||||
testJSON bool
|
|
||||||
keepDataDir bool
|
|
||||||
relayPort int
|
|
||||||
relayDataDir string
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestRelay(t *testing.T) {
|
|
||||||
var err error
|
|
||||||
var relay *run.Relay
|
|
||||||
var relayURL string
|
|
||||||
|
|
||||||
// Determine relay URL
|
|
||||||
if testRelayURL != "" {
|
|
||||||
relayURL = testRelayURL
|
|
||||||
} else {
|
|
||||||
// Start local relay for testing
|
|
||||||
var port int
|
|
||||||
if relay, port, err = startTestRelay(); err != nil {
|
|
||||||
t.Fatalf("Failed to start test relay: %v", err)
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
if stopErr := relay.Stop(); stopErr != nil {
|
|
||||||
t.Logf("Error stopping relay: %v", stopErr)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
relayURL = fmt.Sprintf("ws://127.0.0.1:%d", port)
|
|
||||||
t.Logf("Waiting for relay to be ready at %s...", relayURL)
|
|
||||||
// Wait for relay to be ready - try connecting to verify it's up
|
|
||||||
if err = waitForRelay(relayURL, 10*time.Second); err != nil {
|
|
||||||
t.Fatalf("Relay not ready after timeout: %v", err)
|
|
||||||
}
|
|
||||||
t.Logf("Relay is ready at %s", relayURL)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create test suite
|
|
||||||
t.Logf("Creating test suite for %s...", relayURL)
|
|
||||||
suite, err := relaytester.NewTestSuite(relayURL)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create test suite: %v", err)
|
|
||||||
}
|
|
||||||
t.Logf("Test suite created, running tests...")
|
|
||||||
|
|
||||||
// Run tests
|
|
||||||
var results []relaytester.TestResult
|
|
||||||
if testName != "" {
|
|
||||||
// Run specific test
|
|
||||||
result, err := suite.RunTest(testName)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to run test %s: %v", testName, err)
|
|
||||||
}
|
|
||||||
results = []relaytester.TestResult{result}
|
|
||||||
} else {
|
|
||||||
// Run all tests
|
|
||||||
if results, err = suite.Run(); err != nil {
|
|
||||||
t.Fatalf("Failed to run tests: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Output results
|
|
||||||
if testJSON {
|
|
||||||
jsonOutput, err := relaytester.FormatJSON(results)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to format JSON: %v", err)
|
|
||||||
}
|
|
||||||
fmt.Println(jsonOutput)
|
|
||||||
} else {
|
|
||||||
outputResults(results, t)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if any required tests failed
|
|
||||||
for _, result := range results {
|
|
||||||
if result.Required && !result.Pass {
|
|
||||||
t.Errorf("Required test '%s' failed: %s", result.Name, result.Info)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func startTestRelay() (relay *run.Relay, port int, err error) {
|
|
||||||
cfg := &config.C{
|
|
||||||
AppName: "ORLY-TEST",
|
|
||||||
DataDir: relayDataDir,
|
|
||||||
Listen: "127.0.0.1",
|
|
||||||
Port: 0, // Always use random port, unless overridden via -port flag
|
|
||||||
HealthPort: 0,
|
|
||||||
EnableShutdown: false,
|
|
||||||
LogLevel: "warn",
|
|
||||||
DBLogLevel: "warn",
|
|
||||||
DBBlockCacheMB: 512,
|
|
||||||
DBIndexCacheMB: 256,
|
|
||||||
LogToStdout: false,
|
|
||||||
PprofHTTP: false,
|
|
||||||
ACLMode: "none",
|
|
||||||
AuthRequired: false,
|
|
||||||
AuthToWrite: false,
|
|
||||||
SubscriptionEnabled: false,
|
|
||||||
MonthlyPriceSats: 6000,
|
|
||||||
FollowListFrequency: time.Hour,
|
|
||||||
WebDisableEmbedded: false,
|
|
||||||
SprocketEnabled: false,
|
|
||||||
SpiderMode: "none",
|
|
||||||
PolicyEnabled: false,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use explicitly set port if provided via flag, otherwise find an available port
|
|
||||||
if relayPort > 0 {
|
|
||||||
cfg.Port = relayPort
|
|
||||||
} else {
|
|
||||||
var listener net.Listener
|
|
||||||
if listener, err = net.Listen("tcp", "127.0.0.1:0"); err != nil {
|
|
||||||
return nil, 0, fmt.Errorf("failed to find available port: %w", err)
|
|
||||||
}
|
|
||||||
addr := listener.Addr().(*net.TCPAddr)
|
|
||||||
cfg.Port = addr.Port
|
|
||||||
listener.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set default data dir if not specified
|
|
||||||
if cfg.DataDir == "" {
|
|
||||||
tmpDir := filepath.Join(os.TempDir(), fmt.Sprintf("orly-test-%d", time.Now().UnixNano()))
|
|
||||||
cfg.DataDir = tmpDir
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set up logging
|
|
||||||
lol.SetLogLevel(cfg.LogLevel)
|
|
||||||
|
|
||||||
// Create options
|
|
||||||
cleanup := !keepDataDir
|
|
||||||
opts := &run.Options{
|
|
||||||
CleanupDataDir: &cleanup,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start relay
|
|
||||||
if relay, err = run.Start(cfg, opts); err != nil {
|
|
||||||
return nil, 0, fmt.Errorf("failed to start relay: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return relay, cfg.Port, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// waitForRelay waits for the relay to be ready by attempting to connect
|
|
||||||
func waitForRelay(url string, timeout time.Duration) error {
|
|
||||||
// Extract host:port from ws:// URL
|
|
||||||
addr := url
|
|
||||||
if len(url) > 7 && url[:5] == "ws://" {
|
|
||||||
addr = url[5:]
|
|
||||||
}
|
|
||||||
deadline := time.Now().Add(timeout)
|
|
||||||
attempts := 0
|
|
||||||
for time.Now().Before(deadline) {
|
|
||||||
conn, err := net.DialTimeout("tcp", addr, 500*time.Millisecond)
|
|
||||||
if err == nil {
|
|
||||||
conn.Close()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
attempts++
|
|
||||||
if attempts%10 == 0 {
|
|
||||||
// Log every 10th attempt (every second)
|
|
||||||
}
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
}
|
|
||||||
return fmt.Errorf("timeout waiting for relay at %s after %d attempts", url, attempts)
|
|
||||||
}
|
|
||||||
|
|
||||||
func outputResults(results []relaytester.TestResult, t *testing.T) {
|
|
||||||
passed := 0
|
|
||||||
failed := 0
|
|
||||||
requiredFailed := 0
|
|
||||||
|
|
||||||
for _, result := range results {
|
|
||||||
if result.Pass {
|
|
||||||
passed++
|
|
||||||
t.Logf("PASS: %s", result.Name)
|
|
||||||
} else {
|
|
||||||
failed++
|
|
||||||
if result.Required {
|
|
||||||
requiredFailed++
|
|
||||||
t.Errorf("FAIL (required): %s - %s", result.Name, result.Info)
|
|
||||||
} else {
|
|
||||||
t.Logf("FAIL (optional): %s - %s", result.Name, result.Info)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Logf("\nTest Summary:")
|
|
||||||
t.Logf(" Total: %d", len(results))
|
|
||||||
t.Logf(" Passed: %d", passed)
|
|
||||||
t.Logf(" Failed: %d", failed)
|
|
||||||
t.Logf(" Required Failed: %d", requiredFailed)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestMain allows custom test setup/teardown
|
|
||||||
func TestMain(m *testing.M) {
|
|
||||||
// Manually parse our custom flags to avoid conflicts with Go's test flags
|
|
||||||
for i := 1; i < len(os.Args); i++ {
|
|
||||||
arg := os.Args[i]
|
|
||||||
switch arg {
|
|
||||||
case "-relay-url":
|
|
||||||
if i+1 < len(os.Args) {
|
|
||||||
testRelayURL = os.Args[i+1]
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
case "-test-name":
|
|
||||||
if i+1 < len(os.Args) {
|
|
||||||
testName = os.Args[i+1]
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
case "-json":
|
|
||||||
testJSON = true
|
|
||||||
case "-keep-data":
|
|
||||||
keepDataDir = true
|
|
||||||
case "-port":
|
|
||||||
if i+1 < len(os.Args) {
|
|
||||||
fmt.Sscanf(os.Args[i+1], "%d", &relayPort)
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
case "-data-dir":
|
|
||||||
if i+1 < len(os.Args) {
|
|
||||||
relayDataDir = os.Args[i+1]
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
code := m.Run()
|
|
||||||
os.Exit(code)
|
|
||||||
}
|
|
||||||
166
scripts/test-subscription-stability.sh
Executable file
166
scripts/test-subscription-stability.sh
Executable file
@@ -0,0 +1,166 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
# Test script for verifying subscription stability fixes
|
||||||
|
|
||||||
|
set -e
|
||||||
|
|
||||||
|
RELAY_URL="${RELAY_URL:-ws://localhost:3334}"
|
||||||
|
TEST_DURATION="${TEST_DURATION:-60}" # seconds
|
||||||
|
EVENT_INTERVAL="${EVENT_INTERVAL:-2}" # seconds between events
|
||||||
|
|
||||||
|
echo "==================================="
|
||||||
|
echo "Subscription Stability Test"
|
||||||
|
echo "==================================="
|
||||||
|
echo "Relay URL: $RELAY_URL"
|
||||||
|
echo "Test duration: ${TEST_DURATION}s"
|
||||||
|
echo "Event interval: ${EVENT_INTERVAL}s"
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
# Check if websocat is installed
|
||||||
|
if ! command -v websocat &> /dev/null; then
|
||||||
|
echo "ERROR: websocat is not installed"
|
||||||
|
echo "Install with: cargo install websocat"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Check if jq is installed
|
||||||
|
if ! command -v jq &> /dev/null; then
|
||||||
|
echo "ERROR: jq is not installed"
|
||||||
|
echo "Install with: sudo apt install jq"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Temporary files for communication
|
||||||
|
FIFO_IN=$(mktemp -u)
|
||||||
|
FIFO_OUT=$(mktemp -u)
|
||||||
|
mkfifo "$FIFO_IN"
|
||||||
|
mkfifo "$FIFO_OUT"
|
||||||
|
|
||||||
|
# Cleanup on exit
|
||||||
|
cleanup() {
|
||||||
|
echo ""
|
||||||
|
echo "Cleaning up..."
|
||||||
|
rm -f "$FIFO_IN" "$FIFO_OUT"
|
||||||
|
kill $WS_PID 2>/dev/null || true
|
||||||
|
kill $READER_PID 2>/dev/null || true
|
||||||
|
kill $PUBLISHER_PID 2>/dev/null || true
|
||||||
|
}
|
||||||
|
trap cleanup EXIT INT TERM
|
||||||
|
|
||||||
|
echo "Step 1: Connecting to relay..."
|
||||||
|
|
||||||
|
# Start WebSocket connection
|
||||||
|
websocat "$RELAY_URL" < "$FIFO_IN" > "$FIFO_OUT" &
|
||||||
|
WS_PID=$!
|
||||||
|
|
||||||
|
# Wait for connection
|
||||||
|
sleep 1
|
||||||
|
|
||||||
|
if ! kill -0 $WS_PID 2>/dev/null; then
|
||||||
|
echo "ERROR: Failed to connect to relay at $RELAY_URL"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo "✓ Connected to relay"
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
echo "Step 2: Creating subscription..."
|
||||||
|
|
||||||
|
# Send REQ message
|
||||||
|
SUB_ID="stability-test-$(date +%s)"
|
||||||
|
REQ_MSG='["REQ","'$SUB_ID'",{"kinds":[1]}]'
|
||||||
|
echo "$REQ_MSG" > "$FIFO_IN"
|
||||||
|
|
||||||
|
echo "✓ Sent REQ for subscription: $SUB_ID"
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
# Variables for tracking
|
||||||
|
RECEIVED_COUNT=0
|
||||||
|
PUBLISHED_COUNT=0
|
||||||
|
EOSE_RECEIVED=0
|
||||||
|
|
||||||
|
echo "Step 3: Waiting for EOSE..."
|
||||||
|
|
||||||
|
# Read messages and count events
|
||||||
|
(
|
||||||
|
while IFS= read -r line; do
|
||||||
|
echo "[RECV] $line"
|
||||||
|
|
||||||
|
# Check for EOSE
|
||||||
|
if echo "$line" | jq -e '. | select(.[0] == "EOSE" and .[1] == "'$SUB_ID'")' > /dev/null 2>&1; then
|
||||||
|
EOSE_RECEIVED=1
|
||||||
|
echo "✓ Received EOSE"
|
||||||
|
break
|
||||||
|
fi
|
||||||
|
done < "$FIFO_OUT"
|
||||||
|
) &
|
||||||
|
READER_PID=$!
|
||||||
|
|
||||||
|
# Wait up to 10 seconds for EOSE
|
||||||
|
for i in {1..10}; do
|
||||||
|
if [ $EOSE_RECEIVED -eq 1 ]; then
|
||||||
|
break
|
||||||
|
fi
|
||||||
|
sleep 1
|
||||||
|
done
|
||||||
|
|
||||||
|
echo ""
|
||||||
|
echo "Step 4: Starting long-running test..."
|
||||||
|
echo "Publishing events every ${EVENT_INTERVAL}s for ${TEST_DURATION}s..."
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
# Start event counter
|
||||||
|
(
|
||||||
|
while IFS= read -r line; do
|
||||||
|
# Count EVENT messages for our subscription
|
||||||
|
if echo "$line" | jq -e '. | select(.[0] == "EVENT" and .[1] == "'$SUB_ID'")' > /dev/null 2>&1; then
|
||||||
|
RECEIVED_COUNT=$((RECEIVED_COUNT + 1))
|
||||||
|
EVENT_ID=$(echo "$line" | jq -r '.[2].id' 2>/dev/null || echo "unknown")
|
||||||
|
echo "[$(date +%H:%M:%S)] EVENT received #$RECEIVED_COUNT (id: ${EVENT_ID:0:8}...)"
|
||||||
|
fi
|
||||||
|
done < "$FIFO_OUT"
|
||||||
|
) &
|
||||||
|
READER_PID=$!
|
||||||
|
|
||||||
|
# Publish events
|
||||||
|
START_TIME=$(date +%s)
|
||||||
|
END_TIME=$((START_TIME + TEST_DURATION))
|
||||||
|
|
||||||
|
while [ $(date +%s) -lt $END_TIME ]; do
|
||||||
|
PUBLISHED_COUNT=$((PUBLISHED_COUNT + 1))
|
||||||
|
|
||||||
|
# Create and publish event (you'll need to implement this part)
|
||||||
|
# This is a placeholder - replace with actual event publishing
|
||||||
|
EVENT_JSON='["EVENT",{"kind":1,"content":"Test event '$PUBLISHED_COUNT' for stability test","created_at":'$(date +%s)',"tags":[],"pubkey":"0000000000000000000000000000000000000000000000000000000000000000","id":"0000000000000000000000000000000000000000000000000000000000000000","sig":"0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"}]'
|
||||||
|
|
||||||
|
echo "[$(date +%H:%M:%S)] Publishing event #$PUBLISHED_COUNT"
|
||||||
|
|
||||||
|
# Sleep before next event
|
||||||
|
sleep "$EVENT_INTERVAL"
|
||||||
|
done
|
||||||
|
|
||||||
|
echo ""
|
||||||
|
echo "==================================="
|
||||||
|
echo "Test Complete"
|
||||||
|
echo "==================================="
|
||||||
|
echo "Duration: ${TEST_DURATION}s"
|
||||||
|
echo "Events published: $PUBLISHED_COUNT"
|
||||||
|
echo "Events received: $RECEIVED_COUNT"
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
# Calculate success rate
|
||||||
|
if [ $PUBLISHED_COUNT -gt 0 ]; then
|
||||||
|
SUCCESS_RATE=$((RECEIVED_COUNT * 100 / PUBLISHED_COUNT))
|
||||||
|
echo "Success rate: ${SUCCESS_RATE}%"
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
if [ $SUCCESS_RATE -ge 90 ]; then
|
||||||
|
echo "✓ TEST PASSED - Subscription remained stable"
|
||||||
|
exit 0
|
||||||
|
else
|
||||||
|
echo "✗ TEST FAILED - Subscription dropped events"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
else
|
||||||
|
echo "✗ TEST FAILED - No events published"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
41
scripts/test-subscriptions.sh
Executable file
41
scripts/test-subscriptions.sh
Executable file
@@ -0,0 +1,41 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
# Simple subscription stability test script
|
||||||
|
|
||||||
|
set -e
|
||||||
|
|
||||||
|
RELAY_URL="${RELAY_URL:-ws://localhost:3334}"
|
||||||
|
DURATION="${DURATION:-60}"
|
||||||
|
KIND="${KIND:-1}"
|
||||||
|
|
||||||
|
echo "==================================="
|
||||||
|
echo "Subscription Stability Test"
|
||||||
|
echo "==================================="
|
||||||
|
echo ""
|
||||||
|
echo "This tool tests whether subscriptions remain stable over time."
|
||||||
|
echo ""
|
||||||
|
echo "Configuration:"
|
||||||
|
echo " Relay URL: $RELAY_URL"
|
||||||
|
echo " Duration: ${DURATION}s"
|
||||||
|
echo " Event kind: $KIND"
|
||||||
|
echo ""
|
||||||
|
echo "To test properly, you should:"
|
||||||
|
echo " 1. Start this test"
|
||||||
|
echo " 2. In another terminal, publish events to the relay"
|
||||||
|
echo " 3. Verify events are received throughout the test duration"
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
# Check if the test tool is built
|
||||||
|
if [ ! -f "./subscription-test" ]; then
|
||||||
|
echo "Building subscription-test tool..."
|
||||||
|
go build -o subscription-test ./cmd/subscription-test
|
||||||
|
echo "✓ Built"
|
||||||
|
echo ""
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Run the test
|
||||||
|
echo "Starting test..."
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
./subscription-test -url "$RELAY_URL" -duration "$DURATION" -kind "$KIND" -v
|
||||||
|
|
||||||
|
exit $?
|
||||||
@@ -1,167 +0,0 @@
|
|||||||
#!/usr/bin/env node
|
|
||||||
|
|
||||||
// Test script to verify websocket connections are not closed prematurely
|
|
||||||
// This is a Node.js test script that can be run with: node test-relay-connection.js
|
|
||||||
|
|
||||||
import { NostrWebSocket } from '@nostr-dev-kit/ndk';
|
|
||||||
|
|
||||||
const RELAY = process.env.RELAY || 'ws://localhost:8080';
|
|
||||||
const MAX_CONNECTIONS = 10;
|
|
||||||
const TEST_DURATION = 30000; // 30 seconds
|
|
||||||
|
|
||||||
let connectionsClosed = 0;
|
|
||||||
let connectionsOpened = 0;
|
|
||||||
let messagesReceived = 0;
|
|
||||||
let errors = 0;
|
|
||||||
|
|
||||||
const stats = {
|
|
||||||
premature: 0,
|
|
||||||
normal: 0,
|
|
||||||
errors: 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
class TestConnection {
|
|
||||||
constructor(id) {
|
|
||||||
this.id = id;
|
|
||||||
this.ws = null;
|
|
||||||
this.closed = false;
|
|
||||||
this.openTime = null;
|
|
||||||
this.closeTime = null;
|
|
||||||
this.lastError = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
connect() {
|
|
||||||
return new Promise((resolve, reject) => {
|
|
||||||
this.ws = new NostrWebSocket(RELAY);
|
|
||||||
|
|
||||||
this.ws.addEventListener('open', () => {
|
|
||||||
this.openTime = Date.now();
|
|
||||||
connectionsOpened++;
|
|
||||||
console.log(`[Connection ${this.id}] Opened`);
|
|
||||||
resolve();
|
|
||||||
});
|
|
||||||
|
|
||||||
this.ws.addEventListener('close', (event) => {
|
|
||||||
this.closeTime = Date.now();
|
|
||||||
this.closed = true;
|
|
||||||
connectionsClosed++;
|
|
||||||
const duration = this.closeTime - this.openTime;
|
|
||||||
console.log(`[Connection ${this.id}] Closed: code=${event.code}, reason="${event.reason || ''}", duration=${duration}ms`);
|
|
||||||
|
|
||||||
if (duration < 5000 && event.code !== 1000) {
|
|
||||||
stats.premature++;
|
|
||||||
console.log(`[Connection ${this.id}] PREMATURE CLOSE DETECTED: duration=${duration}ms < 5s`);
|
|
||||||
} else {
|
|
||||||
stats.normal++;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
this.ws.addEventListener('error', (error) => {
|
|
||||||
this.lastError = error;
|
|
||||||
stats.errors++;
|
|
||||||
console.error(`[Connection ${this.id}] Error:`, error);
|
|
||||||
});
|
|
||||||
|
|
||||||
this.ws.addEventListener('message', (event) => {
|
|
||||||
messagesReceived++;
|
|
||||||
try {
|
|
||||||
const data = JSON.parse(event.data);
|
|
||||||
console.log(`[Connection ${this.id}] Message:`, data[0]);
|
|
||||||
} catch (e) {
|
|
||||||
console.log(`[Connection ${this.id}] Message (non-JSON):`, event.data);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
setTimeout(reject, 5000); // Timeout after 5 seconds if not opened
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
sendReq() {
|
|
||||||
if (this.ws && !this.closed) {
|
|
||||||
this.ws.send(JSON.stringify(['REQ', `test-sub-${this.id}`, { kinds: [1], limit: 10 }]));
|
|
||||||
console.log(`[Connection ${this.id}] Sent REQ`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
close() {
|
|
||||||
if (this.ws && !this.closed) {
|
|
||||||
this.ws.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function runTest() {
|
|
||||||
console.log('='.repeat(60));
|
|
||||||
console.log('Testing Relay Connection Stability');
|
|
||||||
console.log('='.repeat(60));
|
|
||||||
console.log(`Relay: ${RELAY}`);
|
|
||||||
console.log(`Duration: ${TEST_DURATION}ms`);
|
|
||||||
console.log(`Connections: ${MAX_CONNECTIONS}`);
|
|
||||||
console.log('='.repeat(60));
|
|
||||||
console.log();
|
|
||||||
|
|
||||||
const connections = [];
|
|
||||||
|
|
||||||
// Open connections
|
|
||||||
console.log('Opening connections...');
|
|
||||||
for (let i = 0; i < MAX_CONNECTIONS; i++) {
|
|
||||||
const conn = new TestConnection(i);
|
|
||||||
try {
|
|
||||||
await conn.connect();
|
|
||||||
connections.push(conn);
|
|
||||||
} catch (error) {
|
|
||||||
console.error(`Failed to open connection ${i}:`, error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
console.log(`Opened ${connections.length} connections`);
|
|
||||||
console.log();
|
|
||||||
|
|
||||||
// Send requests from each connection
|
|
||||||
console.log('Sending REQ messages...');
|
|
||||||
for (const conn of connections) {
|
|
||||||
conn.sendReq();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait and let connections run
|
|
||||||
console.log(`Waiting ${TEST_DURATION / 1000}s...`);
|
|
||||||
await new Promise(resolve => setTimeout(resolve, TEST_DURATION));
|
|
||||||
|
|
||||||
// Close all connections
|
|
||||||
console.log('Closing all connections...');
|
|
||||||
for (const conn of connections) {
|
|
||||||
conn.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for close events
|
|
||||||
await new Promise(resolve => setTimeout(resolve, 1000));
|
|
||||||
|
|
||||||
// Print results
|
|
||||||
console.log();
|
|
||||||
console.log('='.repeat(60));
|
|
||||||
console.log('Test Results:');
|
|
||||||
console.log('='.repeat(60));
|
|
||||||
console.log(`Connections Opened: ${connectionsOpened}`);
|
|
||||||
console.log(`Connections Closed: ${connectionsClosed}`);
|
|
||||||
console.log(`Messages Received: ${messagesReceived}`);
|
|
||||||
console.log();
|
|
||||||
console.log('Closure Analysis:');
|
|
||||||
console.log(`- Premature Closes: ${stats.premature}`);
|
|
||||||
console.log(`- Normal Closes: ${stats.normal}`);
|
|
||||||
console.log(`- Errors: ${stats.errors}`);
|
|
||||||
console.log('='.repeat(60));
|
|
||||||
|
|
||||||
if (stats.premature > 0) {
|
|
||||||
console.error('FAILED: Detected premature connection closures!');
|
|
||||||
process.exit(1);
|
|
||||||
} else {
|
|
||||||
console.log('PASSED: No premature connection closures detected.');
|
|
||||||
process.exit(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
runTest().catch(error => {
|
|
||||||
console.error('Test failed:', error);
|
|
||||||
process.exit(1);
|
|
||||||
});
|
|
||||||
|
|
||||||
@@ -1,57 +0,0 @@
|
|||||||
import { NostrWebSocket } from '@nostr-dev-kit/ndk';
|
|
||||||
|
|
||||||
const RELAY = process.env.RELAY || 'ws://localhost:8080';
|
|
||||||
|
|
||||||
async function testConnectionClosure() {
|
|
||||||
console.log('Testing websocket connection closure issues...');
|
|
||||||
console.log('Connecting to:', RELAY);
|
|
||||||
|
|
||||||
// Create multiple connections to test concurrency
|
|
||||||
const connections = [];
|
|
||||||
const results = { connected: 0, closed: 0, errors: 0 };
|
|
||||||
|
|
||||||
for (let i = 0; i < 5; i++) {
|
|
||||||
const ws = new NostrWebSocket(RELAY);
|
|
||||||
|
|
||||||
ws.addEventListener('open', () => {
|
|
||||||
console.log(`Connection ${i} opened`);
|
|
||||||
results.connected++;
|
|
||||||
});
|
|
||||||
|
|
||||||
ws.addEventListener('close', (event) => {
|
|
||||||
console.log(`Connection ${i} closed:`, event.code, event.reason);
|
|
||||||
results.closed++;
|
|
||||||
});
|
|
||||||
|
|
||||||
ws.addEventListener('error', (error) => {
|
|
||||||
console.error(`Connection ${i} error:`, error);
|
|
||||||
results.errors++;
|
|
||||||
});
|
|
||||||
|
|
||||||
connections.push(ws);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait a bit then send REQs
|
|
||||||
await new Promise(resolve => setTimeout(resolve, 1000));
|
|
||||||
|
|
||||||
// Send some REQ messages
|
|
||||||
for (const ws of connections) {
|
|
||||||
ws.send(JSON.stringify(['REQ', 'test-sub', { kinds: [1] }]));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait and observe behavior
|
|
||||||
await new Promise(resolve => setTimeout(resolve, 5000));
|
|
||||||
|
|
||||||
console.log('\nTest Results:');
|
|
||||||
console.log(`- Connected: ${results.connected}`);
|
|
||||||
console.log(`- Closed prematurely: ${results.closed}`);
|
|
||||||
console.log(`- Errors: ${results.errors}`);
|
|
||||||
|
|
||||||
// Close all connections
|
|
||||||
for (const ws of connections) {
|
|
||||||
ws.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
testConnectionClosure().catch(console.error);
|
|
||||||
|
|
||||||
@@ -1,156 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"next.orly.dev/app/config"
|
|
||||||
"next.orly.dev/pkg/run"
|
|
||||||
)
|
|
||||||
|
|
||||||
// func TestDumbClientWorkaround(t *testing.T) {
|
|
||||||
// var relay *run.Relay
|
|
||||||
// var err error
|
|
||||||
|
|
||||||
// // Start local relay for testing
|
|
||||||
// if relay, _, err = startWorkaroundTestRelay(); err != nil {
|
|
||||||
// t.Fatalf("Failed to start test relay: %v", err)
|
|
||||||
// }
|
|
||||||
// defer func() {
|
|
||||||
// if stopErr := relay.Stop(); stopErr != nil {
|
|
||||||
// t.Logf("Error stopping relay: %v", stopErr)
|
|
||||||
// }
|
|
||||||
// }()
|
|
||||||
|
|
||||||
// relayURL := "ws://127.0.0.1:3338"
|
|
||||||
|
|
||||||
// // Wait for relay to be ready
|
|
||||||
// if err = waitForRelay(relayURL, 10*time.Second); err != nil {
|
|
||||||
// t.Fatalf("Relay not ready after timeout: %v", err)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// t.Logf("Relay is ready at %s", relayURL)
|
|
||||||
|
|
||||||
// // Test connection with a "dumb" client that doesn't handle ping/pong properly
|
|
||||||
// dialer := websocket.Dialer{
|
|
||||||
// HandshakeTimeout: 10 * time.Second,
|
|
||||||
// }
|
|
||||||
|
|
||||||
// conn, _, err := dialer.Dial(relayURL, nil)
|
|
||||||
// if err != nil {
|
|
||||||
// t.Fatalf("Failed to connect: %v", err)
|
|
||||||
// }
|
|
||||||
// defer conn.Close()
|
|
||||||
|
|
||||||
// t.Logf("Connection established")
|
|
||||||
|
|
||||||
// // Simulate a dumb client that sets a short read deadline and doesn't handle ping/pong
|
|
||||||
// conn.SetReadDeadline(time.Now().Add(30 * time.Second))
|
|
||||||
|
|
||||||
// startTime := time.Now()
|
|
||||||
// messageCount := 0
|
|
||||||
|
|
||||||
// // The connection should stay alive despite the short client-side deadline
|
|
||||||
// // because our workaround sets a 24-hour server-side deadline
|
|
||||||
// connectionFailed := false
|
|
||||||
// for time.Since(startTime) < 2*time.Minute && !connectionFailed {
|
|
||||||
// // Extend client deadline every 10 seconds (simulating dumb client behavior)
|
|
||||||
// if time.Since(startTime).Seconds() > 10 && int(time.Since(startTime).Seconds())%10 == 0 {
|
|
||||||
// conn.SetReadDeadline(time.Now().Add(30 * time.Second))
|
|
||||||
// t.Logf("Dumb client extended its own deadline")
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Try to read with a short timeout to avoid blocking
|
|
||||||
// conn.SetReadDeadline(time.Now().Add(1 * time.Second))
|
|
||||||
|
|
||||||
// // Use a function to catch panics from ReadMessage on failed connections
|
|
||||||
// func() {
|
|
||||||
// defer func() {
|
|
||||||
// if r := recover(); r != nil {
|
|
||||||
// if panicMsg, ok := r.(string); ok && panicMsg == "repeated read on failed websocket connection" {
|
|
||||||
// t.Logf("Connection failed, stopping read loop")
|
|
||||||
// connectionFailed = true
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// // Re-panic if it's a different panic
|
|
||||||
// panic(r)
|
|
||||||
// }
|
|
||||||
// }()
|
|
||||||
|
|
||||||
// msgType, data, err := conn.ReadMessage()
|
|
||||||
// conn.SetReadDeadline(time.Now().Add(30 * time.Second)) // Reset
|
|
||||||
|
|
||||||
// if err != nil {
|
|
||||||
// if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
|
||||||
// // Timeout is expected - just continue
|
|
||||||
// time.Sleep(100 * time.Millisecond)
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
|
|
||||||
// t.Logf("Connection closed normally: %v", err)
|
|
||||||
// connectionFailed = true
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// t.Errorf("Unexpected error: %v", err)
|
|
||||||
// connectionFailed = true
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
|
|
||||||
// messageCount++
|
|
||||||
// t.Logf("Received message %d: type=%d, len=%d", messageCount, msgType, len(data))
|
|
||||||
// }()
|
|
||||||
// }
|
|
||||||
|
|
||||||
// elapsed := time.Since(startTime)
|
|
||||||
// if elapsed < 90*time.Second {
|
|
||||||
// t.Errorf("Connection died too early after %v (expected at least 90s)", elapsed)
|
|
||||||
// } else {
|
|
||||||
// t.Logf("Workaround successful: connection lasted %v with %d messages", elapsed, messageCount)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// startWorkaroundTestRelay starts a relay for workaround testing
|
|
||||||
func startWorkaroundTestRelay() (relay *run.Relay, port int, err error) {
|
|
||||||
cfg := &config.C{
|
|
||||||
AppName: "ORLY-WORKAROUND-TEST",
|
|
||||||
DataDir: "",
|
|
||||||
Listen: "127.0.0.1",
|
|
||||||
Port: 3338,
|
|
||||||
HealthPort: 0,
|
|
||||||
EnableShutdown: false,
|
|
||||||
LogLevel: "info",
|
|
||||||
DBLogLevel: "warn",
|
|
||||||
DBBlockCacheMB: 512,
|
|
||||||
DBIndexCacheMB: 256,
|
|
||||||
LogToStdout: false,
|
|
||||||
PprofHTTP: false,
|
|
||||||
ACLMode: "none",
|
|
||||||
AuthRequired: false,
|
|
||||||
AuthToWrite: false,
|
|
||||||
SubscriptionEnabled: false,
|
|
||||||
MonthlyPriceSats: 6000,
|
|
||||||
FollowListFrequency: time.Hour,
|
|
||||||
WebDisableEmbedded: false,
|
|
||||||
SprocketEnabled: false,
|
|
||||||
SpiderMode: "none",
|
|
||||||
PolicyEnabled: false,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set default data dir if not specified
|
|
||||||
if cfg.DataDir == "" {
|
|
||||||
cfg.DataDir = fmt.Sprintf("/tmp/orly-workaround-test-%d", time.Now().UnixNano())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create options
|
|
||||||
cleanup := true
|
|
||||||
opts := &run.Options{
|
|
||||||
CleanupDataDir: &cleanup,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start relay
|
|
||||||
if relay, err = run.Start(cfg, opts); err != nil {
|
|
||||||
return nil, 0, fmt.Errorf("failed to start relay: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return relay, cfg.Port, nil
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user