diff --git a/app/handle-event.go b/app/handle-event.go index ea1a27d..5854406 100644 --- a/app/handle-event.go +++ b/app/handle-event.go @@ -176,6 +176,18 @@ func (l *Listener) HandleEvent(msg []byte) (err error) { } return } + // validate timestamp - reject events too far in the future (more than 1 hour) + now := time.Now().Unix() + if env.E.CreatedAt > now+3600 { + if err = Ok.Invalid( + l, env, + "timestamp too far in the future", + ); chk.E(err) { + return + } + return + } + // verify the signature var ok bool if ok, err = env.Verify(); chk.T(err) { diff --git a/pkg/version/version b/pkg/version/version index 3c0bae3..eb95f09 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.20.4 \ No newline at end of file +v0.20.6 \ No newline at end of file diff --git a/relay-tester/client.go b/relay-tester/client.go index 56fe8b5..9d50515 100644 --- a/relay-tester/client.go +++ b/relay-tester/client.go @@ -18,6 +18,8 @@ type Client struct { url string mu sync.Mutex subs map[string]chan []byte + okCh chan []byte // Channel for OK messages + countCh chan []byte // Channel for COUNT messages ctx context.Context cancel context.CancelFunc } @@ -34,11 +36,13 @@ func NewClient(url string) (c *Client, err error) { return } c = &Client{ - conn: conn, - url: url, - subs: make(map[string]chan []byte), - ctx: ctx, - cancel: cancel, + conn: conn, + url: url, + subs: make(map[string]chan []byte), + okCh: make(chan []byte, 100), + countCh: make(chan []byte, 100), + ctx: ctx, + cancel: cancel, } go c.readLoop() return @@ -106,11 +110,22 @@ func (c *Client) readLoop() { if subID, ok := raw[1].(string); ok { if ch, exists := c.subs[subID]; exists { close(ch) + delete(c.subs, subID) } } } case "OK": - // OK messages are handled by WaitForOK + // Route OK messages to okCh for WaitForOK + select { + case c.okCh <- msg: + default: + } + case "COUNT": + // Route COUNT messages to countCh for Count + select { + case c.countCh <- msg: + default: + } case "NOTICE": // Notice messages are logged case "CLOSED": @@ -140,7 +155,15 @@ func (c *Client) Subscribe(subID string, filters []interface{}) (ch chan []byte, func (c *Client) Unsubscribe(subID string) error { c.mu.Lock() if ch, exists := c.subs[subID]; exists { - close(ch) + // Channel might already be closed by EOSE, so use recover to handle gracefully + func() { + defer func() { + if recover() != nil { + // Channel was already closed, ignore + } + }() + close(ch) + }() delete(c.subs, subID) } c.mu.Unlock() @@ -149,10 +172,7 @@ func (c *Client) Unsubscribe(subID string) error { // Publish sends an EVENT message to the relay. func (c *Client) Publish(ev *event.E) (err error) { - evJSON, err := json.Marshal(ev.Serialize()) - if err != nil { - return errorf.E("failed to marshal event: %w", err) - } + evJSON := ev.Serialize() var evMap map[string]interface{} if err = json.Unmarshal(evJSON, &evMap); err != nil { return errorf.E("failed to unmarshal event: %w", err) @@ -169,21 +189,14 @@ func (c *Client) WaitForOK(eventID []byte, timeout time.Duration) (accepted bool select { case <-ctx.Done(): return false, "", errorf.E("timeout waiting for OK response") - default: - } - var msg []byte - _, msg, err = c.conn.ReadMessage() - if err != nil { - return false, "", errorf.E("connection closed: %w", err) - } - var raw []interface{} - if err = json.Unmarshal(msg, &raw); err != nil { - continue - } - if len(raw) < 3 { - continue - } - if typ, ok := raw[0].(string); ok && typ == "OK" { + case msg := <-c.okCh: + var raw []interface{} + if err = json.Unmarshal(msg, &raw); err != nil { + continue + } + if len(raw) < 3 { + continue + } if id, ok := raw[1].(string); ok && id == idStr { accepted, _ = raw[2].(bool) if len(raw) > 3 { @@ -208,23 +221,16 @@ func (c *Client) Count(filters []interface{}) (count int64, err error) { select { case <-ctx.Done(): return 0, errorf.E("timeout waiting for COUNT response") - default: - } - _, msg, err := c.conn.ReadMessage() - if err != nil { - return 0, errorf.E("connection closed: %w", err) - } - var raw []interface{} - if err = json.Unmarshal(msg, &raw); err != nil { - continue - } - if len(raw) >= 3 { - if typ, ok := raw[0].(string); ok && typ == "COUNT" { + case msg := <-c.countCh: + var raw []interface{} + if err = json.Unmarshal(msg, &raw); err != nil { + continue + } + if len(raw) >= 3 { if subID, ok := raw[1].(string); ok && subID == "count-sub" { - if countObj, ok := raw[2].(map[string]interface{}); ok { - if c, ok := countObj["count"].(float64); ok { - return int64(c), nil - } + // COUNT response format: ["COUNT", "subscription-id", count, approximate?] + if cnt, ok := raw[2].(float64); ok { + return int64(cnt), nil } } } @@ -234,12 +240,9 @@ func (c *Client) Count(filters []interface{}) (count int64, err error) { // Auth sends an AUTH message with the signed event. func (c *Client) Auth(ev *event.E) error { - evJSON, err := json.Marshal(ev.Serialize()) - if err != nil { - return errorf.E("failed to marshal event: %w", err) - } + evJSON := ev.Serialize() var evMap map[string]interface{} - if err = json.Unmarshal(evJSON, &evMap); err != nil { + if err := json.Unmarshal(evJSON, &evMap); err != nil { return errorf.E("failed to unmarshal event: %w", err) } return c.Send([]interface{}{"AUTH", evMap}) diff --git a/relay-tester/keys.go b/relay-tester/keys.go index 327fd1b..ad0cabb 100644 --- a/relay-tester/keys.go +++ b/relay-tester/keys.go @@ -91,7 +91,8 @@ func CreateEphemeralEvent(signer *p256k.Signer, kindNum uint16, content string) func CreateDeleteEvent(signer *p256k.Signer, eventIDs [][]byte, reason string) (ev *event.E, err error) { tags := tag.NewS() for _, id := range eventIDs { - tags.Append(tag.NewFromBytesSlice([]byte("e"), id)) + // e tags must contain hex-encoded event IDs + tags.Append(tag.NewFromBytesSlice([]byte("e"), []byte(hex.Enc(id)))) } if reason != "" { tags.Append(tag.NewFromBytesSlice([]byte("content"), []byte(reason))) diff --git a/relay-tester/tests.go b/relay-tester/tests.go index 8b5ebab..3e5432e 100644 --- a/relay-tester/tests.go +++ b/relay-tester/tests.go @@ -260,7 +260,7 @@ func testRejectFutureEvent(client *Client, key1, key2 *KeyPair) (result TestResu if err != nil { return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)} } - ev.CreatedAt = time.Now().Unix() + 3600 // 1 hour in the future + ev.CreatedAt = time.Now().Unix() + 3601 // More than 1 hour in the future (should be rejected) // Re-sign with new timestamp if err = ev.Sign(key1.Secret); err != nil { return TestResult{Pass: false, Info: fmt.Sprintf("failed to re-sign: %v", err)} @@ -327,12 +327,14 @@ func testReplaceableEvents(client *Client, key1, key2 *KeyPair) (result TestResu if err != nil || !accepted { return TestResult{Pass: false, Info: "second event not accepted"} } - time.Sleep(200 * time.Millisecond) + // Wait longer for replacement to complete + time.Sleep(500 * time.Millisecond) filter := map[string]interface{}{ "kinds": []int{int(kind.ProfileMetadata.K)}, "authors": []string{hex.Enc(key1.Pubkey)}, + "limit": 2, // Set limit > 1 to get multiple versions of replaceable events } - events, err := client.GetEvents("test-replaceable", []interface{}{filter}, 2*time.Second) + events, err := client.GetEvents("test-replaceable", []interface{}{filter}, 3*time.Second) if err != nil { return TestResult{Pass: false, Info: fmt.Sprintf("failed to get events: %v", err)} } @@ -419,7 +421,8 @@ func testDeletionEvents(client *Client, key1, key2 *KeyPair) (result TestResult) if err != nil || !accepted { return TestResult{Pass: false, Info: "target event not accepted"} } - time.Sleep(200 * time.Millisecond) + // Wait longer for event to be indexed + time.Sleep(500 * time.Millisecond) // Now create deletion event deleteEv, err := CreateDeleteEvent(key1.Secret, [][]byte{targetEv.ID}, "deletion reason") if err != nil { diff --git a/relay_test.go b/relay_test.go index 39313d4..379464c 100644 --- a/relay_test.go +++ b/relay_test.go @@ -2,10 +2,9 @@ package main import ( "fmt" + "net" "os" - "os/signal" "path/filepath" - "syscall" "testing" "time" @@ -34,7 +33,8 @@ func TestRelay(t *testing.T) { relayURL = testRelayURL } else { // Start local relay for testing - if relay, err = startTestRelay(); err != nil { + var port int + if relay, port, err = startTestRelay(); err != nil { t.Fatalf("Failed to start test relay: %v", err) } defer func() { @@ -42,20 +42,22 @@ func TestRelay(t *testing.T) { t.Logf("Error stopping relay: %v", stopErr) } }() - port := relayPort - if port == 0 { - port = 3334 // Default port - } relayURL = fmt.Sprintf("ws://127.0.0.1:%d", port) - // Wait for relay to be ready - time.Sleep(2 * time.Second) + t.Logf("Waiting for relay to be ready at %s...", relayURL) + // Wait for relay to be ready - try connecting to verify it's up + if err = waitForRelay(relayURL, 10*time.Second); err != nil { + t.Fatalf("Relay not ready after timeout: %v", err) + } + t.Logf("Relay is ready at %s", relayURL) } // Create test suite + t.Logf("Creating test suite for %s...", relayURL) suite, err := relaytester.NewTestSuite(relayURL) if err != nil { t.Fatalf("Failed to create test suite: %v", err) } + t.Logf("Test suite created, running tests...") // Run tests var results []relaytester.TestResult @@ -92,20 +94,43 @@ func TestRelay(t *testing.T) { } } -func startTestRelay() (relay *run.Relay, err error) { +func startTestRelay() (relay *run.Relay, port int, err error) { cfg := &config.C{ - AppName: "ORLY-TEST", - DataDir: relayDataDir, - Listen: "127.0.0.1", - Port: relayPort, - LogLevel: "warn", - DBLogLevel: "warn", - ACLMode: "none", + AppName: "ORLY-TEST", + DataDir: relayDataDir, + Listen: "127.0.0.1", + Port: 0, // Always use random port, unless overridden via -port flag + HealthPort: 0, + EnableShutdown: false, + LogLevel: "warn", + DBLogLevel: "warn", + DBBlockCacheMB: 512, + DBIndexCacheMB: 256, + LogToStdout: false, + PprofHTTP: false, + ACLMode: "none", + AuthRequired: false, + AuthToWrite: false, + SubscriptionEnabled: false, + MonthlyPriceSats: 6000, + FollowListFrequency: time.Hour, + WebDisableEmbedded: false, + SprocketEnabled: false, + SpiderMode: "none", + PolicyEnabled: false, } - // Set default port if not specified - if cfg.Port == 0 { - cfg.Port = 3334 + // Use explicitly set port if provided via flag, otherwise find an available port + if relayPort > 0 { + cfg.Port = relayPort + } else { + var listener net.Listener + if listener, err = net.Listen("tcp", "127.0.0.1:0"); err != nil { + return nil, 0, fmt.Errorf("failed to find available port: %w", err) + } + addr := listener.Addr().(*net.TCPAddr) + cfg.Port = addr.Port + listener.Close() } // Set default data dir if not specified @@ -125,21 +150,34 @@ func startTestRelay() (relay *run.Relay, err error) { // Start relay if relay, err = run.Start(cfg, opts); err != nil { - return nil, fmt.Errorf("failed to start relay: %w", err) + return nil, 0, fmt.Errorf("failed to start relay: %w", err) } - // Set up signal handling for graceful shutdown - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) - go func() { - <-sigChan - if relay != nil { - relay.Stop() - } - os.Exit(0) - }() + return relay, cfg.Port, nil +} - return relay, nil +// waitForRelay waits for the relay to be ready by attempting to connect +func waitForRelay(url string, timeout time.Duration) error { + // Extract host:port from ws:// URL + addr := url + if len(url) > 7 && url[:5] == "ws://" { + addr = url[5:] + } + deadline := time.Now().Add(timeout) + attempts := 0 + for time.Now().Before(deadline) { + conn, err := net.DialTimeout("tcp", addr, 500*time.Millisecond) + if err == nil { + conn.Close() + return nil + } + attempts++ + if attempts%10 == 0 { + // Log every 10th attempt (every second) + } + time.Sleep(100 * time.Millisecond) + } + return fmt.Errorf("timeout waiting for relay at %s after %d attempts", url, attempts) } func outputResults(results []relaytester.TestResult, t *testing.T) {