Enhance relay testing and event handling
- Updated TestRelay to include a wait mechanism for relay readiness, improving test reliability. - Refactored startTestRelay to return the assigned port, allowing dynamic port assignment. - Added timestamp validation in HandleEvent to reject events with timestamps more than one hour in the future. - Introduced channels for handling OK and COUNT messages in the Client struct, improving message processing. - Updated tests to reflect changes in event timestamp handling and increased wait times for event processing. - Bumped version to v0.20.6 to reflect these enhancements.
This commit is contained in:
@@ -176,6 +176,18 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// validate timestamp - reject events too far in the future (more than 1 hour)
|
||||||
|
now := time.Now().Unix()
|
||||||
|
if env.E.CreatedAt > now+3600 {
|
||||||
|
if err = Ok.Invalid(
|
||||||
|
l, env,
|
||||||
|
"timestamp too far in the future",
|
||||||
|
); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// verify the signature
|
// verify the signature
|
||||||
var ok bool
|
var ok bool
|
||||||
if ok, err = env.Verify(); chk.T(err) {
|
if ok, err = env.Verify(); chk.T(err) {
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
v0.20.4
|
v0.20.6
|
||||||
@@ -18,6 +18,8 @@ type Client struct {
|
|||||||
url string
|
url string
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
subs map[string]chan []byte
|
subs map[string]chan []byte
|
||||||
|
okCh chan []byte // Channel for OK messages
|
||||||
|
countCh chan []byte // Channel for COUNT messages
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
}
|
}
|
||||||
@@ -34,11 +36,13 @@ func NewClient(url string) (c *Client, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
c = &Client{
|
c = &Client{
|
||||||
conn: conn,
|
conn: conn,
|
||||||
url: url,
|
url: url,
|
||||||
subs: make(map[string]chan []byte),
|
subs: make(map[string]chan []byte),
|
||||||
ctx: ctx,
|
okCh: make(chan []byte, 100),
|
||||||
cancel: cancel,
|
countCh: make(chan []byte, 100),
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
}
|
}
|
||||||
go c.readLoop()
|
go c.readLoop()
|
||||||
return
|
return
|
||||||
@@ -106,11 +110,22 @@ func (c *Client) readLoop() {
|
|||||||
if subID, ok := raw[1].(string); ok {
|
if subID, ok := raw[1].(string); ok {
|
||||||
if ch, exists := c.subs[subID]; exists {
|
if ch, exists := c.subs[subID]; exists {
|
||||||
close(ch)
|
close(ch)
|
||||||
|
delete(c.subs, subID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case "OK":
|
case "OK":
|
||||||
// OK messages are handled by WaitForOK
|
// Route OK messages to okCh for WaitForOK
|
||||||
|
select {
|
||||||
|
case c.okCh <- msg:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
case "COUNT":
|
||||||
|
// Route COUNT messages to countCh for Count
|
||||||
|
select {
|
||||||
|
case c.countCh <- msg:
|
||||||
|
default:
|
||||||
|
}
|
||||||
case "NOTICE":
|
case "NOTICE":
|
||||||
// Notice messages are logged
|
// Notice messages are logged
|
||||||
case "CLOSED":
|
case "CLOSED":
|
||||||
@@ -140,7 +155,15 @@ func (c *Client) Subscribe(subID string, filters []interface{}) (ch chan []byte,
|
|||||||
func (c *Client) Unsubscribe(subID string) error {
|
func (c *Client) Unsubscribe(subID string) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
if ch, exists := c.subs[subID]; exists {
|
if ch, exists := c.subs[subID]; exists {
|
||||||
close(ch)
|
// Channel might already be closed by EOSE, so use recover to handle gracefully
|
||||||
|
func() {
|
||||||
|
defer func() {
|
||||||
|
if recover() != nil {
|
||||||
|
// Channel was already closed, ignore
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
delete(c.subs, subID)
|
delete(c.subs, subID)
|
||||||
}
|
}
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
@@ -149,10 +172,7 @@ func (c *Client) Unsubscribe(subID string) error {
|
|||||||
|
|
||||||
// Publish sends an EVENT message to the relay.
|
// Publish sends an EVENT message to the relay.
|
||||||
func (c *Client) Publish(ev *event.E) (err error) {
|
func (c *Client) Publish(ev *event.E) (err error) {
|
||||||
evJSON, err := json.Marshal(ev.Serialize())
|
evJSON := ev.Serialize()
|
||||||
if err != nil {
|
|
||||||
return errorf.E("failed to marshal event: %w", err)
|
|
||||||
}
|
|
||||||
var evMap map[string]interface{}
|
var evMap map[string]interface{}
|
||||||
if err = json.Unmarshal(evJSON, &evMap); err != nil {
|
if err = json.Unmarshal(evJSON, &evMap); err != nil {
|
||||||
return errorf.E("failed to unmarshal event: %w", err)
|
return errorf.E("failed to unmarshal event: %w", err)
|
||||||
@@ -169,21 +189,14 @@ func (c *Client) WaitForOK(eventID []byte, timeout time.Duration) (accepted bool
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return false, "", errorf.E("timeout waiting for OK response")
|
return false, "", errorf.E("timeout waiting for OK response")
|
||||||
default:
|
case msg := <-c.okCh:
|
||||||
}
|
var raw []interface{}
|
||||||
var msg []byte
|
if err = json.Unmarshal(msg, &raw); err != nil {
|
||||||
_, msg, err = c.conn.ReadMessage()
|
continue
|
||||||
if err != nil {
|
}
|
||||||
return false, "", errorf.E("connection closed: %w", err)
|
if len(raw) < 3 {
|
||||||
}
|
continue
|
||||||
var raw []interface{}
|
}
|
||||||
if err = json.Unmarshal(msg, &raw); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if len(raw) < 3 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if typ, ok := raw[0].(string); ok && typ == "OK" {
|
|
||||||
if id, ok := raw[1].(string); ok && id == idStr {
|
if id, ok := raw[1].(string); ok && id == idStr {
|
||||||
accepted, _ = raw[2].(bool)
|
accepted, _ = raw[2].(bool)
|
||||||
if len(raw) > 3 {
|
if len(raw) > 3 {
|
||||||
@@ -208,23 +221,16 @@ func (c *Client) Count(filters []interface{}) (count int64, err error) {
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return 0, errorf.E("timeout waiting for COUNT response")
|
return 0, errorf.E("timeout waiting for COUNT response")
|
||||||
default:
|
case msg := <-c.countCh:
|
||||||
}
|
var raw []interface{}
|
||||||
_, msg, err := c.conn.ReadMessage()
|
if err = json.Unmarshal(msg, &raw); err != nil {
|
||||||
if err != nil {
|
continue
|
||||||
return 0, errorf.E("connection closed: %w", err)
|
}
|
||||||
}
|
if len(raw) >= 3 {
|
||||||
var raw []interface{}
|
|
||||||
if err = json.Unmarshal(msg, &raw); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if len(raw) >= 3 {
|
|
||||||
if typ, ok := raw[0].(string); ok && typ == "COUNT" {
|
|
||||||
if subID, ok := raw[1].(string); ok && subID == "count-sub" {
|
if subID, ok := raw[1].(string); ok && subID == "count-sub" {
|
||||||
if countObj, ok := raw[2].(map[string]interface{}); ok {
|
// COUNT response format: ["COUNT", "subscription-id", count, approximate?]
|
||||||
if c, ok := countObj["count"].(float64); ok {
|
if cnt, ok := raw[2].(float64); ok {
|
||||||
return int64(c), nil
|
return int64(cnt), nil
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -234,12 +240,9 @@ func (c *Client) Count(filters []interface{}) (count int64, err error) {
|
|||||||
|
|
||||||
// Auth sends an AUTH message with the signed event.
|
// Auth sends an AUTH message with the signed event.
|
||||||
func (c *Client) Auth(ev *event.E) error {
|
func (c *Client) Auth(ev *event.E) error {
|
||||||
evJSON, err := json.Marshal(ev.Serialize())
|
evJSON := ev.Serialize()
|
||||||
if err != nil {
|
|
||||||
return errorf.E("failed to marshal event: %w", err)
|
|
||||||
}
|
|
||||||
var evMap map[string]interface{}
|
var evMap map[string]interface{}
|
||||||
if err = json.Unmarshal(evJSON, &evMap); err != nil {
|
if err := json.Unmarshal(evJSON, &evMap); err != nil {
|
||||||
return errorf.E("failed to unmarshal event: %w", err)
|
return errorf.E("failed to unmarshal event: %w", err)
|
||||||
}
|
}
|
||||||
return c.Send([]interface{}{"AUTH", evMap})
|
return c.Send([]interface{}{"AUTH", evMap})
|
||||||
|
|||||||
@@ -91,7 +91,8 @@ func CreateEphemeralEvent(signer *p256k.Signer, kindNum uint16, content string)
|
|||||||
func CreateDeleteEvent(signer *p256k.Signer, eventIDs [][]byte, reason string) (ev *event.E, err error) {
|
func CreateDeleteEvent(signer *p256k.Signer, eventIDs [][]byte, reason string) (ev *event.E, err error) {
|
||||||
tags := tag.NewS()
|
tags := tag.NewS()
|
||||||
for _, id := range eventIDs {
|
for _, id := range eventIDs {
|
||||||
tags.Append(tag.NewFromBytesSlice([]byte("e"), id))
|
// e tags must contain hex-encoded event IDs
|
||||||
|
tags.Append(tag.NewFromBytesSlice([]byte("e"), []byte(hex.Enc(id))))
|
||||||
}
|
}
|
||||||
if reason != "" {
|
if reason != "" {
|
||||||
tags.Append(tag.NewFromBytesSlice([]byte("content"), []byte(reason)))
|
tags.Append(tag.NewFromBytesSlice([]byte("content"), []byte(reason)))
|
||||||
|
|||||||
@@ -260,7 +260,7 @@ func testRejectFutureEvent(client *Client, key1, key2 *KeyPair) (result TestResu
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
|
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
|
||||||
}
|
}
|
||||||
ev.CreatedAt = time.Now().Unix() + 3600 // 1 hour in the future
|
ev.CreatedAt = time.Now().Unix() + 3601 // More than 1 hour in the future (should be rejected)
|
||||||
// Re-sign with new timestamp
|
// Re-sign with new timestamp
|
||||||
if err = ev.Sign(key1.Secret); err != nil {
|
if err = ev.Sign(key1.Secret); err != nil {
|
||||||
return TestResult{Pass: false, Info: fmt.Sprintf("failed to re-sign: %v", err)}
|
return TestResult{Pass: false, Info: fmt.Sprintf("failed to re-sign: %v", err)}
|
||||||
@@ -327,12 +327,14 @@ func testReplaceableEvents(client *Client, key1, key2 *KeyPair) (result TestResu
|
|||||||
if err != nil || !accepted {
|
if err != nil || !accepted {
|
||||||
return TestResult{Pass: false, Info: "second event not accepted"}
|
return TestResult{Pass: false, Info: "second event not accepted"}
|
||||||
}
|
}
|
||||||
time.Sleep(200 * time.Millisecond)
|
// Wait longer for replacement to complete
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
filter := map[string]interface{}{
|
filter := map[string]interface{}{
|
||||||
"kinds": []int{int(kind.ProfileMetadata.K)},
|
"kinds": []int{int(kind.ProfileMetadata.K)},
|
||||||
"authors": []string{hex.Enc(key1.Pubkey)},
|
"authors": []string{hex.Enc(key1.Pubkey)},
|
||||||
|
"limit": 2, // Set limit > 1 to get multiple versions of replaceable events
|
||||||
}
|
}
|
||||||
events, err := client.GetEvents("test-replaceable", []interface{}{filter}, 2*time.Second)
|
events, err := client.GetEvents("test-replaceable", []interface{}{filter}, 3*time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return TestResult{Pass: false, Info: fmt.Sprintf("failed to get events: %v", err)}
|
return TestResult{Pass: false, Info: fmt.Sprintf("failed to get events: %v", err)}
|
||||||
}
|
}
|
||||||
@@ -419,7 +421,8 @@ func testDeletionEvents(client *Client, key1, key2 *KeyPair) (result TestResult)
|
|||||||
if err != nil || !accepted {
|
if err != nil || !accepted {
|
||||||
return TestResult{Pass: false, Info: "target event not accepted"}
|
return TestResult{Pass: false, Info: "target event not accepted"}
|
||||||
}
|
}
|
||||||
time.Sleep(200 * time.Millisecond)
|
// Wait longer for event to be indexed
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
// Now create deletion event
|
// Now create deletion event
|
||||||
deleteEv, err := CreateDeleteEvent(key1.Secret, [][]byte{targetEv.ID}, "deletion reason")
|
deleteEv, err := CreateDeleteEvent(key1.Secret, [][]byte{targetEv.ID}, "deletion reason")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
102
relay_test.go
102
relay_test.go
@@ -2,10 +2,9 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"syscall"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -34,7 +33,8 @@ func TestRelay(t *testing.T) {
|
|||||||
relayURL = testRelayURL
|
relayURL = testRelayURL
|
||||||
} else {
|
} else {
|
||||||
// Start local relay for testing
|
// Start local relay for testing
|
||||||
if relay, err = startTestRelay(); err != nil {
|
var port int
|
||||||
|
if relay, port, err = startTestRelay(); err != nil {
|
||||||
t.Fatalf("Failed to start test relay: %v", err)
|
t.Fatalf("Failed to start test relay: %v", err)
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -42,20 +42,22 @@ func TestRelay(t *testing.T) {
|
|||||||
t.Logf("Error stopping relay: %v", stopErr)
|
t.Logf("Error stopping relay: %v", stopErr)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
port := relayPort
|
|
||||||
if port == 0 {
|
|
||||||
port = 3334 // Default port
|
|
||||||
}
|
|
||||||
relayURL = fmt.Sprintf("ws://127.0.0.1:%d", port)
|
relayURL = fmt.Sprintf("ws://127.0.0.1:%d", port)
|
||||||
// Wait for relay to be ready
|
t.Logf("Waiting for relay to be ready at %s...", relayURL)
|
||||||
time.Sleep(2 * time.Second)
|
// Wait for relay to be ready - try connecting to verify it's up
|
||||||
|
if err = waitForRelay(relayURL, 10*time.Second); err != nil {
|
||||||
|
t.Fatalf("Relay not ready after timeout: %v", err)
|
||||||
|
}
|
||||||
|
t.Logf("Relay is ready at %s", relayURL)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create test suite
|
// Create test suite
|
||||||
|
t.Logf("Creating test suite for %s...", relayURL)
|
||||||
suite, err := relaytester.NewTestSuite(relayURL)
|
suite, err := relaytester.NewTestSuite(relayURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to create test suite: %v", err)
|
t.Fatalf("Failed to create test suite: %v", err)
|
||||||
}
|
}
|
||||||
|
t.Logf("Test suite created, running tests...")
|
||||||
|
|
||||||
// Run tests
|
// Run tests
|
||||||
var results []relaytester.TestResult
|
var results []relaytester.TestResult
|
||||||
@@ -92,20 +94,43 @@ func TestRelay(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func startTestRelay() (relay *run.Relay, err error) {
|
func startTestRelay() (relay *run.Relay, port int, err error) {
|
||||||
cfg := &config.C{
|
cfg := &config.C{
|
||||||
AppName: "ORLY-TEST",
|
AppName: "ORLY-TEST",
|
||||||
DataDir: relayDataDir,
|
DataDir: relayDataDir,
|
||||||
Listen: "127.0.0.1",
|
Listen: "127.0.0.1",
|
||||||
Port: relayPort,
|
Port: 0, // Always use random port, unless overridden via -port flag
|
||||||
LogLevel: "warn",
|
HealthPort: 0,
|
||||||
DBLogLevel: "warn",
|
EnableShutdown: false,
|
||||||
ACLMode: "none",
|
LogLevel: "warn",
|
||||||
|
DBLogLevel: "warn",
|
||||||
|
DBBlockCacheMB: 512,
|
||||||
|
DBIndexCacheMB: 256,
|
||||||
|
LogToStdout: false,
|
||||||
|
PprofHTTP: false,
|
||||||
|
ACLMode: "none",
|
||||||
|
AuthRequired: false,
|
||||||
|
AuthToWrite: false,
|
||||||
|
SubscriptionEnabled: false,
|
||||||
|
MonthlyPriceSats: 6000,
|
||||||
|
FollowListFrequency: time.Hour,
|
||||||
|
WebDisableEmbedded: false,
|
||||||
|
SprocketEnabled: false,
|
||||||
|
SpiderMode: "none",
|
||||||
|
PolicyEnabled: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set default port if not specified
|
// Use explicitly set port if provided via flag, otherwise find an available port
|
||||||
if cfg.Port == 0 {
|
if relayPort > 0 {
|
||||||
cfg.Port = 3334
|
cfg.Port = relayPort
|
||||||
|
} else {
|
||||||
|
var listener net.Listener
|
||||||
|
if listener, err = net.Listen("tcp", "127.0.0.1:0"); err != nil {
|
||||||
|
return nil, 0, fmt.Errorf("failed to find available port: %w", err)
|
||||||
|
}
|
||||||
|
addr := listener.Addr().(*net.TCPAddr)
|
||||||
|
cfg.Port = addr.Port
|
||||||
|
listener.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set default data dir if not specified
|
// Set default data dir if not specified
|
||||||
@@ -125,21 +150,34 @@ func startTestRelay() (relay *run.Relay, err error) {
|
|||||||
|
|
||||||
// Start relay
|
// Start relay
|
||||||
if relay, err = run.Start(cfg, opts); err != nil {
|
if relay, err = run.Start(cfg, opts); err != nil {
|
||||||
return nil, fmt.Errorf("failed to start relay: %w", err)
|
return nil, 0, fmt.Errorf("failed to start relay: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set up signal handling for graceful shutdown
|
return relay, cfg.Port, nil
|
||||||
sigChan := make(chan os.Signal, 1)
|
}
|
||||||
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
|
|
||||||
go func() {
|
|
||||||
<-sigChan
|
|
||||||
if relay != nil {
|
|
||||||
relay.Stop()
|
|
||||||
}
|
|
||||||
os.Exit(0)
|
|
||||||
}()
|
|
||||||
|
|
||||||
return relay, nil
|
// waitForRelay waits for the relay to be ready by attempting to connect
|
||||||
|
func waitForRelay(url string, timeout time.Duration) error {
|
||||||
|
// Extract host:port from ws:// URL
|
||||||
|
addr := url
|
||||||
|
if len(url) > 7 && url[:5] == "ws://" {
|
||||||
|
addr = url[5:]
|
||||||
|
}
|
||||||
|
deadline := time.Now().Add(timeout)
|
||||||
|
attempts := 0
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
conn, err := net.DialTimeout("tcp", addr, 500*time.Millisecond)
|
||||||
|
if err == nil {
|
||||||
|
conn.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
attempts++
|
||||||
|
if attempts%10 == 0 {
|
||||||
|
// Log every 10th attempt (every second)
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("timeout waiting for relay at %s after %d attempts", url, attempts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func outputResults(results []relaytester.TestResult, t *testing.T) {
|
func outputResults(results []relaytester.TestResult, t *testing.T) {
|
||||||
|
|||||||
Reference in New Issue
Block a user