Compare commits

...

7 Commits

Author SHA1 Message Date
0ba555c6a8 Update version to v0.21.0 and enhance relay client functionality
- Bumped version from v0.20.6 to v0.21.0.
- Added a `complete` map in the Client struct to track subscription completion status.
- Improved event handling in the read loop to manage EOSE messages and subscription closures.
- Introduced new tests for filtering, event ordering, and subscription behaviors, enhancing test coverage and reliability.
2025-10-30 19:26:42 +00:00
54f65d8740 Enhance relay testing and event handling
- Updated TestRelay to include a wait mechanism for relay readiness, improving test reliability.
- Refactored startTestRelay to return the assigned port, allowing dynamic port assignment.
- Added timestamp validation in HandleEvent to reject events with timestamps more than one hour in the future.
- Introduced channels for handling OK and COUNT messages in the Client struct, improving message processing.
- Updated tests to reflect changes in event timestamp handling and increased wait times for event processing.
- Bumped version to v0.20.6 to reflect these enhancements.
2025-10-30 19:12:11 +00:00
2ff8b47410 bump to v0.20.5 2025-10-30 18:37:30 +00:00
ba2d35012c Enhance WebSocket connection management and error handling
- Set initial read deadline for connections to prevent premature timeouts on idle connections.
- Improved pong and ping handlers to extend read deadlines and handle timeout errors more effectively.
- Refined error logging for connection issues, distinguishing between timeouts and connection errors to enhance debugging.
- Updated subscriber delivery logic to handle timeouts gracefully, allowing for potential recovery without immediate disconnection.
2025-10-30 18:32:03 +00:00
b70f03bce0 Refactor policy script handling and improve fallback logic
- Renamed test functions for clarity, changing "NotRunning" to "Disabled" to better reflect the policy state.
- Updated policy checks to ensure that if the policy is disabled, it falls back to the default policy immediately.
- Enhanced error handling in the policy manager to ensure proper startup and running state management.
- Introduced a new method to ensure the policy is running, with timeout handling for startup completion.
- Bumped version to v0.20.3 to reflect these changes.
2025-10-30 18:22:56 +00:00
8954846864 Add relay testing framework and utilities
- Introduced a new `relaytester` package to facilitate testing of relay functionalities.
- Implemented a `TestSuite` structure to manage and execute various test cases against the relay.
- Added multiple test cases for event publishing, retrieval, and validation, ensuring comprehensive coverage of relay behavior.
- Created utility functions for generating key pairs and events, enhancing test reliability and maintainability.
- Established a WebSocket client for interacting with the relay during tests, including subscription and message handling.
- Included JSON formatting for test results to improve output readability.
- This commit lays the groundwork for robust integration testing of relay features.
2025-10-30 18:14:22 +00:00
5e6c0b80aa Add Relay functionality for managing startup and shutdown processes
- Introduced a new package `run` with a `Relay` struct to manage the lifecycle of a relay instance.
- Implemented `Start` and `Stop` methods for initializing and gracefully shutting down the relay, including options for log capturing and data directory cleanup.
- Added methods to retrieve captured stdout and stderr logs.
- Enhanced configuration handling for data directory and logging based on user-defined options.
2025-10-30 17:57:57 +00:00
12 changed files with 3460 additions and 44 deletions

View File

@@ -176,6 +176,18 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
}
return
}
// validate timestamp - reject events too far in the future (more than 1 hour)
now := time.Now().Unix()
if env.E.CreatedAt > now+3600 {
if err = Ok.Invalid(
l, env,
"timestamp too far in the future",
); chk.E(err) {
return
}
return
}
// verify the signature
var ok bool
if ok, err = env.Verify(); chk.T(err) {

View File

@@ -71,6 +71,10 @@ whitelist:
// Set read limit immediately after connection is established
conn.SetReadLimit(DefaultMaxMessageSize)
log.D.F("set read limit to %d bytes (%d MB) for %s", DefaultMaxMessageSize, DefaultMaxMessageSize/units.Mb, remote)
// Set initial read deadline - pong handler will extend it when pongs are received
conn.SetReadDeadline(time.Now().Add(DefaultPongWait))
defer conn.Close()
listener := &Listener{
ctx: ctx,
@@ -100,12 +104,12 @@ whitelist:
log.D.F("AUTH challenge sent successfully to %s", remote)
}
ticker := time.NewTicker(DefaultPingWait)
// Set pong handler
// Set pong handler - extends read deadline when pongs are received
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(DefaultPongWait))
return nil
})
// Set ping handler
// Set ping handler - extends read deadline when pings are received
conn.SetPingHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(DefaultPongWait))
return conn.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(DefaultWriteTimeout))
@@ -159,14 +163,14 @@ whitelist:
var msg []byte
log.T.F("waiting for message from %s", remote)
// Set read deadline for context cancellation
deadline := time.Now().Add(DefaultPongWait)
// Don't set read deadline here - it's set initially and extended by pong handler
// This prevents premature timeouts on idle connections with active subscriptions
if ctx.Err() != nil {
return
}
conn.SetReadDeadline(deadline)
// Block waiting for message; rely on pings and context cancellation to detect dead peers
// The read deadline is managed by the pong handler which extends it when pongs are received
typ, msg, err = conn.ReadMessage()
if err != nil {
@@ -187,6 +191,12 @@ whitelist:
log.T.F("connection from %s closed: %v", remote, err)
return
}
// Handle timeout errors specifically - these can occur on idle connections
// but pongs should extend the deadline, so a timeout usually means dead connection
if strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded") {
log.T.F("connection from %s read timeout (likely dead connection): %v", remote, err)
return
}
// Handle message too big errors specifically
if strings.Contains(err.Error(), "message too large") ||
strings.Contains(err.Error(), "read limited at") {
@@ -216,13 +226,41 @@ whitelist:
deadline := time.Now().Add(DefaultWriteTimeout)
conn.SetWriteDeadline(deadline)
pongStart := time.Now()
if err = conn.WriteControl(websocket.PongMessage, msg, deadline); chk.E(err) {
if err = conn.WriteControl(websocket.PongMessage, msg, deadline); err != nil {
pongDuration := time.Since(pongStart)
log.E.F(
"failed to send PONG to %s after %v: %v", remote,
pongDuration, err,
)
return
// Check if this is a timeout vs a connection error
isTimeout := strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded")
isConnectionError := strings.Contains(err.Error(), "use of closed network connection") ||
strings.Contains(err.Error(), "broken pipe") ||
strings.Contains(err.Error(), "connection reset") ||
websocket.IsCloseError(err, websocket.CloseAbnormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived)
if isConnectionError {
log.E.F(
"failed to send PONG to %s after %v (connection error): %v", remote,
pongDuration, err,
)
return
} else if isTimeout {
// Timeout on pong - log but don't close immediately
// The read deadline will catch dead connections
log.W.F(
"failed to send PONG to %s after %v (timeout, but connection may still be alive): %v", remote,
pongDuration, err,
)
// Continue - don't close connection on pong timeout
} else {
// Unknown error - log and continue
log.E.F(
"failed to send PONG to %s after %v (unknown error): %v", remote,
pongDuration, err,
)
// Continue - don't close on unknown errors
}
continue
}
pongDuration := time.Since(pongStart)
log.D.F("sent PONG to %s successfully in %v", remote, pongDuration)
@@ -264,12 +302,40 @@ func (s *Server) Pinger(
if err = conn.WriteControl(websocket.PingMessage, []byte{}, deadline); err != nil {
pingDuration := time.Since(pingStart)
log.E.F(
"PING #%d FAILED after %v: %v", pingCount, pingDuration,
err,
)
chk.E(err)
return
// Check if this is a timeout vs a connection error
isTimeout := strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded")
isConnectionError := strings.Contains(err.Error(), "use of closed network connection") ||
strings.Contains(err.Error(), "broken pipe") ||
strings.Contains(err.Error(), "connection reset") ||
websocket.IsCloseError(err, websocket.CloseAbnormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived)
if isConnectionError {
log.E.F(
"PING #%d FAILED after %v (connection error): %v", pingCount, pingDuration,
err,
)
chk.E(err)
return
} else if isTimeout {
// Timeout on ping - log but don't stop pinger immediately
// The read deadline will catch dead connections
log.W.F(
"PING #%d timeout after %v (connection may still be alive): %v", pingCount, pingDuration,
err,
)
// Continue - don't stop pinger on timeout
} else {
// Unknown error - log and continue
log.E.F(
"PING #%d FAILED after %v (unknown error): %v", pingCount, pingDuration,
err,
)
// Continue - don't stop pinger on unknown errors
}
continue
}
pingDuration := time.Since(pingStart)

View File

@@ -283,17 +283,36 @@ func (p *P) Deliver(ev *event.E) {
hex.Enc(ev.ID), d.sub.remote, d.id, deliveryDuration, err)
// Check for timeout specifically
if strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline") {
isTimeout := strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded")
if isTimeout {
log.E.F("subscription delivery TIMEOUT: event=%s to=%s after %v (limit=%v)",
hex.Enc(ev.ID), d.sub.remote, deliveryDuration, DefaultWriteTimeout)
}
// Log connection cleanup
log.D.F("removing failed subscriber connection: %s", d.sub.remote)
// Only close connection on permanent errors, not transient timeouts
// WebSocket write errors typically indicate connection issues, but we should
// distinguish between timeouts (client might be slow) and connection errors
isConnectionError := strings.Contains(err.Error(), "use of closed network connection") ||
strings.Contains(err.Error(), "broken pipe") ||
strings.Contains(err.Error(), "connection reset") ||
websocket.IsCloseError(err, websocket.CloseAbnormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived)
// On error, remove the subscriber connection safely
p.removeSubscriber(d.w)
_ = d.w.Close()
if isConnectionError {
log.D.F("removing failed subscriber connection due to connection error: %s", d.sub.remote)
p.removeSubscriber(d.w)
_ = d.w.Close()
} else if isTimeout {
// For timeouts, log but don't immediately close - give it another chance
// The read deadline will catch dead connections eventually
log.W.F("subscription delivery timeout for %s (client may be slow), skipping event but keeping connection", d.sub.remote)
} else {
// Unknown error - be conservative and close
log.D.F("removing failed subscriber connection due to unknown error: %s", d.sub.remote)
p.removeSubscriber(d.w)
_ = d.w.Close()
}
continue
}

View File

@@ -10,6 +10,7 @@ import (
"os"
"os/exec"
"path/filepath"
"runtime"
"sync"
"time"
@@ -131,11 +132,13 @@ type PolicyManager struct {
currentCancel context.CancelFunc
mutex sync.RWMutex
isRunning bool
isStarting bool
enabled bool
stdin io.WriteCloser
stdout io.ReadCloser
stderr io.ReadCloser
responseChan chan PolicyResponse
startupChan chan error
}
// P represents a complete policy configuration for a Nostr relay.
@@ -203,6 +206,7 @@ func NewWithManager(ctx context.Context, appName string, enabled bool) *P {
scriptPath: scriptPath,
enabled: enabled,
responseChan: make(chan PolicyResponse, 100), // Buffered channel for responses
startupChan: make(chan error, 1), // Channel for startup completion
}
// Load policy configuration from JSON file
@@ -279,8 +283,19 @@ func (p *P) CheckPolicy(access string, ev *event.E, loggedInPubkey []byte, ipAdd
}
// Check if script is present and enabled
if rule.Script != "" && p.Manager != nil && p.Manager.IsEnabled() {
return p.checkScriptPolicy(access, ev, rule.Script, loggedInPubkey, ipAddress)
if rule.Script != "" && p.Manager != nil {
if p.Manager.IsEnabled() {
return p.checkScriptPolicy(access, ev, rule.Script, loggedInPubkey, ipAddress)
}
// Script is configured but policy is disabled - use default policy if rule has no other restrictions
hasOtherRestrictions := len(rule.WriteAllow) > 0 || len(rule.WriteDeny) > 0 || len(rule.ReadAllow) > 0 || len(rule.ReadDeny) > 0 ||
rule.SizeLimit != nil || rule.ContentLimit != nil || len(rule.MustHaveTags) > 0 ||
rule.MaxExpiry != nil || rule.Privileged || rule.RateLimit != nil ||
rule.MaxAgeOfEvent != nil || rule.MaxAgeEventInFuture != nil
if !hasOtherRestrictions {
// No other restrictions, use default policy
return p.getDefaultPolicyAction(), nil
}
}
// Apply rule-based filtering
@@ -452,12 +467,41 @@ func (p *P) checkRulePolicy(access string, ev *event.E, rule Rule, loggedInPubke
// checkScriptPolicy runs the policy script to determine if event should be allowed
func (p *P) checkScriptPolicy(access string, ev *event.E, scriptPath string, loggedInPubkey []byte, ipAddress string) (allowed bool, err error) {
if p.Manager == nil || !p.Manager.IsRunning() {
// If script is not running, fall back to default policy
log.W.F("policy rule for kind %d is inactive (script not running), falling back to default policy (%s)", ev.Kind, p.DefaultPolicy)
if p.Manager == nil {
return false, fmt.Errorf("policy manager is not initialized")
}
// If policy is disabled, fall back to default policy immediately
if !p.Manager.IsEnabled() {
log.W.F("policy rule for kind %d is inactive (policy disabled), falling back to default policy (%s)", ev.Kind, p.DefaultPolicy)
return p.getDefaultPolicyAction(), nil
}
// Policy is enabled, check if it's running
if !p.Manager.IsRunning() {
// Check if script file exists
if _, err := os.Stat(p.Manager.GetScriptPath()); os.IsNotExist(err) {
// Script doesn't exist, this is a fatal error
buf := make([]byte, 1024*1024)
n := runtime.Stack(buf, true)
log.E.F("policy script does not exist at %s", p.Manager.GetScriptPath())
fmt.Fprintf(os.Stderr, "FATAL: Policy script required but not found at %s\n", p.Manager.GetScriptPath())
fmt.Fprintf(os.Stderr, "Stack trace:\n%s\n", buf[:n])
os.Exit(1)
}
// Try to start the policy and wait for it
if err := p.Manager.ensureRunning(); err != nil {
// Startup failed, this is a fatal error
buf := make([]byte, 1024*1024)
n := runtime.Stack(buf, true)
log.E.F("failed to start policy script: %v", err)
fmt.Fprintf(os.Stderr, "FATAL: Failed to start policy script: %v\n", err)
fmt.Fprintf(os.Stderr, "Stack trace:\n%s\n", buf[:n])
os.Exit(1)
}
}
// Create policy event with additional context
policyEvent := &PolicyEvent{
E: ev,
@@ -535,6 +579,91 @@ func (pm *PolicyManager) startPolicyIfExists() {
}
}
// ensureRunning ensures the policy is running, starting it if necessary.
// It waits for startup to complete with a timeout and returns an error if startup fails.
func (pm *PolicyManager) ensureRunning() error {
pm.mutex.Lock()
// Check if already running
if pm.isRunning {
pm.mutex.Unlock()
return nil
}
// Check if already starting
if pm.isStarting {
pm.mutex.Unlock()
// Wait for startup to complete
select {
case err := <-pm.startupChan:
if err != nil {
return fmt.Errorf("policy startup failed: %v", err)
}
// Double-check it's actually running after receiving signal
pm.mutex.RLock()
running := pm.isRunning
pm.mutex.RUnlock()
if !running {
return fmt.Errorf("policy startup completed but process is not running")
}
return nil
case <-time.After(10 * time.Second):
return fmt.Errorf("policy startup timeout")
case <-pm.ctx.Done():
return fmt.Errorf("policy context cancelled")
}
}
// Mark as starting
pm.isStarting = true
pm.mutex.Unlock()
// Start the policy in a goroutine
go func() {
err := pm.StartPolicy()
pm.mutex.Lock()
pm.isStarting = false
pm.mutex.Unlock()
// Signal startup completion (non-blocking)
// Drain any stale value first, then send
select {
case <-pm.startupChan:
default:
}
select {
case pm.startupChan <- err:
default:
// Channel should be empty now, but if it's full, try again
pm.startupChan <- err
}
}()
// Wait for startup to complete
select {
case err := <-pm.startupChan:
if err != nil {
return fmt.Errorf("policy startup failed: %v", err)
}
// Double-check it's actually running after receiving signal
pm.mutex.RLock()
running := pm.isRunning
pm.mutex.RUnlock()
if !running {
return fmt.Errorf("policy startup completed but process is not running")
}
return nil
case <-time.After(10 * time.Second):
pm.mutex.Lock()
pm.isStarting = false
pm.mutex.Unlock()
return fmt.Errorf("policy startup timeout")
case <-pm.ctx.Done():
pm.mutex.Lock()
pm.isStarting = false
pm.mutex.Unlock()
return fmt.Errorf("policy context cancelled")
}
}
// StartPolicy starts the policy script process.
// Returns an error if the script doesn't exist, can't be executed, or is already running.
func (pm *PolicyManager) StartPolicy() error {
@@ -800,6 +929,11 @@ func (pm *PolicyManager) IsRunning() bool {
return pm.isRunning
}
// GetScriptPath returns the path to the policy script.
func (pm *PolicyManager) GetScriptPath() string {
return pm.scriptPath
}
// Shutdown gracefully shuts down the policy manager.
// It cancels the context and stops any running policy script.
func (pm *PolicyManager) Shutdown() {

View File

@@ -1136,11 +1136,11 @@ func TestMaxAgeChecks(t *testing.T) {
}
}
func TestScriptPolicyNotRunningFallsBackToDefault(t *testing.T) {
func TestScriptPolicyDisabledFallsBackToDefault(t *testing.T) {
// Generate real keypair for testing
eventSigner, eventPubkey := generateTestKeypair(t)
// Create a policy with a script rule but no running manager, default policy is "allow"
// Create a policy with a script rule but policy is disabled, default policy is "allow"
policy := &P{
DefaultPolicy: "allow",
Rules: map[int]Rule{
@@ -1150,21 +1150,21 @@ func TestScriptPolicyNotRunningFallsBackToDefault(t *testing.T) {
},
},
Manager: &PolicyManager{
enabled: true,
isRunning: false, // Script is not running
enabled: false, // Policy is disabled
isRunning: false,
},
}
// Create real test event with proper signing
testEvent := createTestEvent(t, eventSigner, "test content", 1)
// Should allow the event when script is configured but not running (falls back to default "allow")
// Should allow the event when policy is disabled (falls back to default "allow")
allowed, err := policy.CheckPolicy("write", testEvent, eventPubkey, "127.0.0.1")
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if !allowed {
t.Error("Expected event to be allowed when script is not running (should fall back to default policy 'allow')")
t.Error("Expected event to be allowed when policy is disabled (should fall back to default policy 'allow')")
}
// Test with default policy "deny"
@@ -1174,7 +1174,7 @@ func TestScriptPolicyNotRunningFallsBackToDefault(t *testing.T) {
t.Errorf("Unexpected error: %v", err2)
}
if allowed2 {
t.Error("Expected event to be denied when script is not running and default policy is 'deny'")
t.Error("Expected event to be denied when policy is disabled and default policy is 'deny'")
}
}
@@ -1340,12 +1340,11 @@ func TestNewPolicyWithDefaultPolicyJSON(t *testing.T) {
}
}
func TestScriptProcessingFailureFallsBackToDefault(t *testing.T) {
func TestScriptProcessingDisabledFallsBackToDefault(t *testing.T) {
// Generate real keypair for testing
eventSigner, eventPubkey := generateTestKeypair(t)
// Test that script processing failures fall back to default policy
// We'll test this by using a manager that's not running (simulating failure)
// Test that when policy is disabled, it falls back to default policy
policy := &P{
DefaultPolicy: "allow",
Rules: map[int]Rule{
@@ -1355,21 +1354,21 @@ func TestScriptProcessingFailureFallsBackToDefault(t *testing.T) {
},
},
Manager: &PolicyManager{
enabled: true,
isRunning: false, // Script is not running (simulating failure)
enabled: false, // Policy is disabled
isRunning: false,
},
}
// Create real test event with proper signing
testEvent := createTestEvent(t, eventSigner, "test content", 1)
// Should allow the event when script is not running (falls back to default "allow")
// Should allow the event when policy is disabled (falls back to default "allow")
allowed, err := policy.checkScriptPolicy("write", testEvent, "policy.sh", eventPubkey, "127.0.0.1")
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if !allowed {
t.Error("Expected event to be allowed when script is not running (should fall back to default policy 'allow')")
t.Error("Expected event to be allowed when policy is disabled (should fall back to default policy 'allow')")
}
// Test with default policy "deny"
@@ -1379,7 +1378,7 @@ func TestScriptProcessingFailureFallsBackToDefault(t *testing.T) {
t.Errorf("Unexpected error: %v", err2)
}
if allowed2 {
t.Error("Expected event to be denied when script is not running and default policy is 'deny'")
t.Error("Expected event to be denied when policy is disabled and default policy is 'deny'")
}
}

200
pkg/run/run.go Normal file
View File

@@ -0,0 +1,200 @@
package run
import (
"bytes"
"context"
"io"
"os"
"path/filepath"
"strings"
"sync"
"github.com/adrg/xdg"
"lol.mleku.dev/chk"
lol "lol.mleku.dev"
"next.orly.dev/app"
"next.orly.dev/app/config"
"next.orly.dev/pkg/acl"
"next.orly.dev/pkg/database"
)
// Options configures relay startup behavior.
type Options struct {
// CleanupDataDir controls whether the data directory is deleted on Stop().
// Defaults to true. Set to false to preserve the data directory.
CleanupDataDir *bool
// StdoutWriter is an optional writer to receive stdout logs.
// If nil, stdout will be captured to a buffer accessible via Relay.Stdout().
StdoutWriter io.Writer
// StderrWriter is an optional writer to receive stderr logs.
// If nil, stderr will be captured to a buffer accessible via Relay.Stderr().
StderrWriter io.Writer
}
// Relay represents a running relay instance that can be started and stopped.
type Relay struct {
ctx context.Context
cancel context.CancelFunc
db *database.D
quit chan struct{}
dataDir string
cleanupDataDir bool
// Log capture
stdoutBuf *bytes.Buffer
stderrBuf *bytes.Buffer
stdoutWriter io.Writer
stderrWriter io.Writer
logMu sync.RWMutex
}
// Start initializes and starts a relay with the given configuration.
// It bypasses the configuration loading step and uses the provided config directly.
//
// Parameters:
// - cfg: The configuration to use for the relay
// - opts: Optional configuration for relay behavior. If nil, defaults are used.
//
// Returns:
// - relay: A Relay instance that can be used to stop the relay
// - err: An error if initialization or startup fails
func Start(cfg *config.C, opts *Options) (relay *Relay, err error) {
relay = &Relay{
cleanupDataDir: true,
}
// Apply options
var userStdoutWriter, userStderrWriter io.Writer
if opts != nil {
if opts.CleanupDataDir != nil {
relay.cleanupDataDir = *opts.CleanupDataDir
}
userStdoutWriter = opts.StdoutWriter
userStderrWriter = opts.StderrWriter
}
// Set up log capture buffers
relay.stdoutBuf = &bytes.Buffer{}
relay.stderrBuf = &bytes.Buffer{}
// Build writers list for stdout
stdoutWriters := []io.Writer{relay.stdoutBuf}
if userStdoutWriter != nil {
stdoutWriters = append(stdoutWriters, userStdoutWriter)
}
stdoutWriters = append(stdoutWriters, os.Stdout)
relay.stdoutWriter = io.MultiWriter(stdoutWriters...)
// Build writers list for stderr
stderrWriters := []io.Writer{relay.stderrBuf}
if userStderrWriter != nil {
stderrWriters = append(stderrWriters, userStderrWriter)
}
stderrWriters = append(stderrWriters, os.Stderr)
relay.stderrWriter = io.MultiWriter(stderrWriters...)
// Set up logging - write to appropriate destination and capture
if cfg.LogToStdout {
lol.Writer = relay.stdoutWriter
} else {
lol.Writer = relay.stderrWriter
}
lol.SetLogLevel(cfg.LogLevel)
// Expand DataDir if needed
if cfg.DataDir == "" || strings.Contains(cfg.DataDir, "~") {
cfg.DataDir = filepath.Join(xdg.DataHome, cfg.AppName)
}
relay.dataDir = cfg.DataDir
// Create context
relay.ctx, relay.cancel = context.WithCancel(context.Background())
// Initialize database
if relay.db, err = database.New(
relay.ctx, relay.cancel, cfg.DataDir, cfg.DBLogLevel,
); chk.E(err) {
return
}
// Configure ACL
acl.Registry.Active.Store(cfg.ACLMode)
if err = acl.Registry.Configure(cfg, relay.db, relay.ctx); chk.E(err) {
return
}
acl.Registry.Syncer()
// Start the relay
relay.quit = app.Run(relay.ctx, cfg, relay.db)
return
}
// Stop gracefully stops the relay by canceling the context and closing the database.
// If CleanupDataDir is enabled (default), it also removes the data directory.
//
// Returns:
// - err: An error if shutdown fails
func (r *Relay) Stop() (err error) {
if r.cancel != nil {
r.cancel()
}
if r.quit != nil {
<-r.quit
}
if r.db != nil {
err = r.db.Close()
}
// Clean up data directory if enabled
if r.cleanupDataDir && r.dataDir != "" {
if rmErr := os.RemoveAll(r.dataDir); rmErr != nil {
if err == nil {
err = rmErr
}
}
}
return
}
// Stdout returns the complete stdout log buffer contents.
func (r *Relay) Stdout() string {
r.logMu.RLock()
defer r.logMu.RUnlock()
if r.stdoutBuf == nil {
return ""
}
return r.stdoutBuf.String()
}
// Stderr returns the complete stderr log buffer contents.
func (r *Relay) Stderr() string {
r.logMu.RLock()
defer r.logMu.RUnlock()
if r.stderrBuf == nil {
return ""
}
return r.stderrBuf.String()
}
// StdoutBytes returns the complete stdout log buffer as bytes.
func (r *Relay) StdoutBytes() []byte {
r.logMu.RLock()
defer r.logMu.RUnlock()
if r.stdoutBuf == nil {
return nil
}
return r.stdoutBuf.Bytes()
}
// StderrBytes returns the complete stderr log buffer as bytes.
func (r *Relay) StderrBytes() []byte {
r.logMu.RLock()
defer r.logMu.RUnlock()
if r.stderrBuf == nil {
return nil
}
return r.stderrBuf.Bytes()
}

View File

@@ -1 +1 @@
v0.20.2
v0.21.0

321
relay-tester/client.go Normal file
View File

@@ -0,0 +1,321 @@
package relaytester
import (
"context"
"encoding/json"
"sync"
"time"
"github.com/gorilla/websocket"
"lol.mleku.dev/errorf"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/hex"
)
// Client wraps a WebSocket connection to a relay for testing.
type Client struct {
conn *websocket.Conn
url string
mu sync.Mutex
subs map[string]chan []byte
complete map[string]bool // Track if subscription is complete (e.g., by ID)
okCh chan []byte // Channel for OK messages
countCh chan []byte // Channel for COUNT messages
ctx context.Context
cancel context.CancelFunc
}
// NewClient creates a new test client connected to the relay.
func NewClient(url string) (c *Client, err error) {
ctx, cancel := context.WithCancel(context.Background())
var conn *websocket.Conn
dialer := websocket.Dialer{
HandshakeTimeout: 5 * time.Second,
}
if conn, _, err = dialer.Dial(url, nil); err != nil {
cancel()
return
}
c = &Client{
conn: conn,
url: url,
subs: make(map[string]chan []byte),
complete: make(map[string]bool),
okCh: make(chan []byte, 100),
countCh: make(chan []byte, 100),
ctx: ctx,
cancel: cancel,
}
go c.readLoop()
return
}
// Close closes the client connection.
func (c *Client) Close() error {
c.cancel()
return c.conn.Close()
}
// Send sends a JSON message to the relay.
func (c *Client) Send(msg interface{}) (err error) {
c.mu.Lock()
defer c.mu.Unlock()
var data []byte
if data, err = json.Marshal(msg); err != nil {
return errorf.E("failed to marshal message: %w", err)
}
if err = c.conn.WriteMessage(websocket.TextMessage, data); err != nil {
return errorf.E("failed to write message: %w", err)
}
return
}
// readLoop reads messages from the relay and routes them to subscriptions.
func (c *Client) readLoop() {
defer c.conn.Close()
for {
select {
case <-c.ctx.Done():
return
default:
}
_, msg, err := c.conn.ReadMessage()
if err != nil {
return
}
var raw []interface{}
if err = json.Unmarshal(msg, &raw); err != nil {
continue
}
if len(raw) < 2 {
continue
}
typ, ok := raw[0].(string)
if !ok {
continue
}
c.mu.Lock()
switch typ {
case "EVENT":
if len(raw) >= 2 {
if subID, ok := raw[1].(string); ok {
if ch, exists := c.subs[subID]; exists {
select {
case ch <- msg:
default:
}
}
}
}
case "EOSE":
if len(raw) >= 2 {
if subID, ok := raw[1].(string); ok {
if ch, exists := c.subs[subID]; exists {
// Send EOSE message to channel
select {
case ch <- msg:
default:
}
// For complete subscriptions (by ID), close the channel after EOSE
if c.complete[subID] {
close(ch)
delete(c.subs, subID)
delete(c.complete, subID)
}
}
}
}
case "OK":
// Route OK messages to okCh for WaitForOK
select {
case c.okCh <- msg:
default:
}
case "COUNT":
// Route COUNT messages to countCh for Count
select {
case c.countCh <- msg:
default:
}
case "NOTICE":
// Notice messages are logged
case "CLOSED":
// Closed messages indicate subscription ended
case "AUTH":
// Auth challenge messages
}
c.mu.Unlock()
}
}
// Subscribe creates a subscription and returns a channel for events.
func (c *Client) Subscribe(subID string, filters []interface{}) (ch chan []byte, err error) {
req := []interface{}{"REQ", subID}
req = append(req, filters...)
if err = c.Send(req); err != nil {
return
}
c.mu.Lock()
ch = make(chan []byte, 100)
c.subs[subID] = ch
// Check if subscription is complete (has 'ids' filter)
isComplete := false
for _, f := range filters {
if fMap, ok := f.(map[string]interface{}); ok {
if ids, exists := fMap["ids"]; exists {
if idList, ok := ids.([]string); ok && len(idList) > 0 {
isComplete = true
break
}
}
}
}
c.complete[subID] = isComplete
c.mu.Unlock()
return
}
// Unsubscribe closes a subscription.
func (c *Client) Unsubscribe(subID string) error {
c.mu.Lock()
if ch, exists := c.subs[subID]; exists {
// Channel might already be closed by EOSE, so use recover to handle gracefully
func() {
defer func() {
if recover() != nil {
// Channel was already closed, ignore
}
}()
close(ch)
}()
delete(c.subs, subID)
delete(c.complete, subID)
}
c.mu.Unlock()
return c.Send([]interface{}{"CLOSE", subID})
}
// Publish sends an EVENT message to the relay.
func (c *Client) Publish(ev *event.E) (err error) {
evJSON := ev.Serialize()
var evMap map[string]interface{}
if err = json.Unmarshal(evJSON, &evMap); err != nil {
return errorf.E("failed to unmarshal event: %w", err)
}
return c.Send([]interface{}{"EVENT", evMap})
}
// WaitForOK waits for an OK response for the given event ID.
func (c *Client) WaitForOK(eventID []byte, timeout time.Duration) (accepted bool, reason string, err error) {
ctx, cancel := context.WithTimeout(c.ctx, timeout)
defer cancel()
idStr := hex.Enc(eventID)
for {
select {
case <-ctx.Done():
return false, "", errorf.E("timeout waiting for OK response")
case msg := <-c.okCh:
var raw []interface{}
if err = json.Unmarshal(msg, &raw); err != nil {
continue
}
if len(raw) < 3 {
continue
}
if id, ok := raw[1].(string); ok && id == idStr {
accepted, _ = raw[2].(bool)
if len(raw) > 3 {
reason, _ = raw[3].(string)
}
return
}
}
}
}
// Count sends a COUNT request and returns the count.
func (c *Client) Count(filters []interface{}) (count int64, err error) {
req := []interface{}{"COUNT", "count-sub"}
req = append(req, filters...)
if err = c.Send(req); err != nil {
return
}
ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
return 0, errorf.E("timeout waiting for COUNT response")
case msg := <-c.countCh:
var raw []interface{}
if err = json.Unmarshal(msg, &raw); err != nil {
continue
}
if len(raw) >= 3 {
if subID, ok := raw[1].(string); ok && subID == "count-sub" {
// COUNT response format: ["COUNT", "subscription-id", count, approximate?]
if cnt, ok := raw[2].(float64); ok {
return int64(cnt), nil
}
}
}
}
}
}
// Auth sends an AUTH message with the signed event.
func (c *Client) Auth(ev *event.E) error {
evJSON := ev.Serialize()
var evMap map[string]interface{}
if err := json.Unmarshal(evJSON, &evMap); err != nil {
return errorf.E("failed to unmarshal event: %w", err)
}
return c.Send([]interface{}{"AUTH", evMap})
}
// GetEvents collects all events from a subscription until EOSE.
func (c *Client) GetEvents(subID string, filters []interface{}, timeout time.Duration) (events []*event.E, err error) {
ch, err := c.Subscribe(subID, filters)
if err != nil {
return
}
defer c.Unsubscribe(subID)
ctx, cancel := context.WithTimeout(c.ctx, timeout)
defer cancel()
for {
select {
case <-ctx.Done():
return events, nil
case msg, ok := <-ch:
if !ok {
return events, nil
}
var raw []interface{}
if err = json.Unmarshal(msg, &raw); err != nil {
continue
}
if len(raw) < 2 {
continue
}
typ, ok := raw[0].(string)
if !ok {
continue
}
switch typ {
case "EVENT":
if len(raw) >= 3 {
if evData, ok := raw[2].(map[string]interface{}); ok {
evJSON, _ := json.Marshal(evData)
ev := event.New()
if _, err = ev.Unmarshal(evJSON); err == nil {
events = append(events, ev)
}
}
}
case "EOSE":
// End of stored events - return what we have
return events, nil
}
}
}
}

131
relay-tester/keys.go Normal file
View File

@@ -0,0 +1,131 @@
package relaytester
import (
"crypto/rand"
"fmt"
"time"
"lol.mleku.dev/chk"
"next.orly.dev/pkg/crypto/p256k"
"next.orly.dev/pkg/encoders/bech32encoding"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/hex"
"next.orly.dev/pkg/encoders/kind"
"next.orly.dev/pkg/encoders/tag"
)
// KeyPair represents a test keypair.
type KeyPair struct {
Secret *p256k.Signer
Pubkey []byte
Nsec string
Npub string
}
// GenerateKeyPair generates a new keypair for testing.
func GenerateKeyPair() (kp *KeyPair, err error) {
kp = &KeyPair{}
kp.Secret = &p256k.Signer{}
if err = kp.Secret.Generate(); chk.E(err) {
return
}
kp.Pubkey = kp.Secret.Pub()
nsecBytes, err := bech32encoding.BinToNsec(kp.Secret.Sec())
if chk.E(err) {
return
}
kp.Nsec = string(nsecBytes)
npubBytes, err := bech32encoding.BinToNpub(kp.Pubkey)
if chk.E(err) {
return
}
kp.Npub = string(npubBytes)
return
}
// CreateEvent creates a signed event with the given parameters.
func CreateEvent(signer *p256k.Signer, kindNum uint16, content string, tags *tag.S) (ev *event.E, err error) {
ev = event.New()
ev.CreatedAt = time.Now().Unix()
ev.Kind = kindNum
ev.Content = []byte(content)
if tags != nil {
ev.Tags = tags
} else {
ev.Tags = tag.NewS()
}
if err = ev.Sign(signer); chk.E(err) {
return
}
return
}
// CreateEventWithTags creates an event with specific tags.
func CreateEventWithTags(signer *p256k.Signer, kindNum uint16, content string, tagPairs [][]string) (ev *event.E, err error) {
tags := tag.NewS()
for _, pair := range tagPairs {
if len(pair) >= 2 {
// Build tag fields as []byte variadic arguments
tagFields := make([][]byte, len(pair))
tagFields[0] = []byte(pair[0])
for i := 1; i < len(pair); i++ {
tagFields[i] = []byte(pair[i])
}
tags.Append(tag.NewFromBytesSlice(tagFields...))
}
}
return CreateEvent(signer, kindNum, content, tags)
}
// CreateReplaceableEvent creates a replaceable event (kind 0-3, 10000-19999).
func CreateReplaceableEvent(signer *p256k.Signer, kindNum uint16, content string) (ev *event.E, err error) {
return CreateEvent(signer, kindNum, content, nil)
}
// CreateEphemeralEvent creates an ephemeral event (kind 20000-29999).
func CreateEphemeralEvent(signer *p256k.Signer, kindNum uint16, content string) (ev *event.E, err error) {
return CreateEvent(signer, kindNum, content, nil)
}
// CreateDeleteEvent creates a deletion event (kind 5).
func CreateDeleteEvent(signer *p256k.Signer, eventIDs [][]byte, reason string) (ev *event.E, err error) {
tags := tag.NewS()
for _, id := range eventIDs {
// e tags must contain hex-encoded event IDs
tags.Append(tag.NewFromBytesSlice([]byte("e"), []byte(hex.Enc(id))))
}
if reason != "" {
tags.Append(tag.NewFromBytesSlice([]byte("content"), []byte(reason)))
}
return CreateEvent(signer, kind.EventDeletion.K, reason, tags)
}
// CreateParameterizedReplaceableEvent creates a parameterized replaceable event (kind 30000-39999).
func CreateParameterizedReplaceableEvent(signer *p256k.Signer, kindNum uint16, content string, dTag string) (ev *event.E, err error) {
tags := tag.NewS()
tags.Append(tag.NewFromBytesSlice([]byte("d"), []byte(dTag)))
return CreateEvent(signer, kindNum, content, tags)
}
// RandomID generates a random 32-byte ID.
func RandomID() (id []byte, err error) {
id = make([]byte, 32)
if _, err = rand.Read(id); err != nil {
return nil, fmt.Errorf("failed to generate random ID: %w", err)
}
return
}
// MustHex decodes a hex string or panics.
func MustHex(s string) []byte {
b, err := hex.Dec(s)
if err != nil {
panic(fmt.Sprintf("invalid hex: %s", s))
}
return b
}
// HexID returns the hex-encoded event ID.
func HexID(ev *event.E) string {
return hex.Enc(ev.ID)
}

430
relay-tester/test.go Normal file
View File

@@ -0,0 +1,430 @@
package relaytester
import (
"encoding/json"
"time"
"lol.mleku.dev/errorf"
)
// TestResult represents the result of a test.
type TestResult struct {
Name string `json:"test"`
Pass bool `json:"pass"`
Required bool `json:"required"`
Info string `json:"info,omitempty"`
}
// TestFunc is a function that runs a test case.
type TestFunc func(client *Client, key1, key2 *KeyPair) (result TestResult)
// TestCase represents a test case with dependencies.
type TestCase struct {
Name string
Required bool
Func TestFunc
Dependencies []string // Names of tests that must run before this one
}
// TestSuite runs all tests against a relay.
type TestSuite struct {
relayURL string
key1 *KeyPair
key2 *KeyPair
tests map[string]*TestCase
results map[string]TestResult
order []string
}
// NewTestSuite creates a new test suite.
func NewTestSuite(relayURL string) (suite *TestSuite, err error) {
suite = &TestSuite{
relayURL: relayURL,
tests: make(map[string]*TestCase),
results: make(map[string]TestResult),
}
if suite.key1, err = GenerateKeyPair(); err != nil {
return
}
if suite.key2, err = GenerateKeyPair(); err != nil {
return
}
suite.registerTests()
return
}
// AddTest adds a test case to the suite.
func (s *TestSuite) AddTest(tc *TestCase) {
s.tests[tc.Name] = tc
}
// registerTests registers all test cases.
func (s *TestSuite) registerTests() {
allTests := []*TestCase{
{
Name: "Publishes basic event",
Required: true,
Func: testPublishBasicEvent,
},
{
Name: "Finds event by ID",
Required: true,
Func: testFindByID,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Finds event by author",
Required: true,
Func: testFindByAuthor,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Finds event by kind",
Required: true,
Func: testFindByKind,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Finds event by tags",
Required: true,
Func: testFindByTags,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Finds by multiple tags",
Required: true,
Func: testFindByMultipleTags,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Finds by time range",
Required: true,
Func: testFindByTimeRange,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Rejects invalid signature",
Required: true,
Func: testRejectInvalidSignature,
},
{
Name: "Rejects future event",
Required: true,
Func: testRejectFutureEvent,
},
{
Name: "Rejects expired event",
Required: false,
Func: testRejectExpiredEvent,
},
{
Name: "Handles replaceable events",
Required: true,
Func: testReplaceableEvents,
},
{
Name: "Handles ephemeral events",
Required: false,
Func: testEphemeralEvents,
},
{
Name: "Handles parameterized replaceable events",
Required: true,
Func: testParameterizedReplaceableEvents,
},
{
Name: "Handles deletion events",
Required: true,
Func: testDeletionEvents,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Handles COUNT request",
Required: true,
Func: testCountRequest,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Handles limit parameter",
Required: true,
Func: testLimitParameter,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Handles multiple filters",
Required: true,
Func: testMultipleFilters,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Handles subscription close",
Required: true,
Func: testSubscriptionClose,
},
// Filter tests
{
Name: "Since and until filters are inclusive",
Required: true,
Func: testSinceUntilAreInclusive,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Limit zero works",
Required: true,
Func: testLimitZero,
},
// Find tests
{
Name: "Events are ordered from newest to oldest",
Required: true,
Func: testEventsOrderedFromNewestToOldest,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Newest events are returned when filter is limited",
Required: true,
Func: testNewestEventsWhenLimited,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Finds by pubkey and kind",
Required: true,
Func: testFindByPubkeyAndKind,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Finds by pubkey and tags",
Required: true,
Func: testFindByPubkeyAndTags,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Finds by kind and tags",
Required: true,
Func: testFindByKindAndTags,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Finds by scrape",
Required: true,
Func: testFindByScrape,
Dependencies: []string{"Publishes basic event"},
},
// Replaceable event tests
{
Name: "Replaces metadata",
Required: true,
Func: testReplacesMetadata,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Replaces contact list",
Required: true,
Func: testReplacesContactList,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Replaced events are still available by ID",
Required: false,
Func: testReplacedEventsStillAvailableByID,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Replaceable events replace older ones",
Required: true,
Func: testReplaceableEventRemovesPrevious,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Replaceable events rejected if a newer one exists",
Required: true,
Func: testReplaceableEventRejectedIfFuture,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Addressable events replace older ones",
Required: true,
Func: testAddressableEventRemovesPrevious,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Addressable events rejected if a newer one exists",
Required: true,
Func: testAddressableEventRejectedIfFuture,
Dependencies: []string{"Publishes basic event"},
},
// Deletion tests
{
Name: "Deletes by a-tag address",
Required: true,
Func: testDeleteByAddr,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Delete by a-tag deletes older but not newer",
Required: true,
Func: testDeleteByAddrOnlyDeletesOlder,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Delete by a-tag is bound by a-tag",
Required: true,
Func: testDeleteByAddrIsBoundByTag,
Dependencies: []string{"Publishes basic event"},
},
// Ephemeral tests
{
Name: "Ephemeral subscriptions work",
Required: false,
Func: testEphemeralSubscriptionsWork,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Persists ephemeral events",
Required: false,
Func: testPersistsEphemeralEvents,
Dependencies: []string{"Publishes basic event"},
},
// EOSE tests
{
Name: "Supports EOSE",
Required: true,
Func: testSupportsEose,
},
{
Name: "Closes complete subscriptions after EOSE",
Required: false,
Func: testClosesCompleteSubscriptionsAfterEose,
},
{
Name: "Keeps open incomplete subscriptions after EOSE",
Required: true,
Func: testKeepsOpenIncompleteSubscriptionsAfterEose,
},
// JSON tests
{
Name: "Accepts events with empty tags",
Required: false,
Func: testAcceptsEventsWithEmptyTags,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Accepts NIP-01 JSON escape sequences",
Required: true,
Func: testAcceptsNip1JsonEscapeSequences,
Dependencies: []string{"Publishes basic event"},
},
// Registration tests
{
Name: "Sends OK after EVENT",
Required: true,
Func: testSendsOkAfterEvent,
},
{
Name: "Verifies event signatures",
Required: true,
Func: testVerifiesSignatures,
},
{
Name: "Verifies event ID hashes",
Required: true,
Func: testVerifiesIdHashes,
},
}
for _, tc := range allTests {
s.AddTest(tc)
}
s.topologicalSort()
}
// topologicalSort orders tests based on dependencies.
func (s *TestSuite) topologicalSort() {
visited := make(map[string]bool)
temp := make(map[string]bool)
var visit func(name string)
visit = func(name string) {
if temp[name] {
return
}
if visited[name] {
return
}
temp[name] = true
if tc, exists := s.tests[name]; exists {
for _, dep := range tc.Dependencies {
visit(dep)
}
}
temp[name] = false
visited[name] = true
s.order = append(s.order, name)
}
for name := range s.tests {
if !visited[name] {
visit(name)
}
}
}
// Run runs all tests in the suite.
func (s *TestSuite) Run() (results []TestResult, err error) {
client, err := NewClient(s.relayURL)
if err != nil {
return nil, errorf.E("failed to connect to relay: %w", err)
}
defer client.Close()
for _, name := range s.order {
tc := s.tests[name]
if tc == nil {
continue
}
result := tc.Func(client, s.key1, s.key2)
result.Name = name
result.Required = tc.Required
s.results[name] = result
results = append(results, result)
time.Sleep(100 * time.Millisecond) // Small delay between tests
}
return
}
// RunTest runs a specific test by name.
func (s *TestSuite) RunTest(testName string) (result TestResult, err error) {
tc, exists := s.tests[testName]
if !exists {
return result, errorf.E("test %s not found", testName)
}
// Check dependencies
for _, dep := range tc.Dependencies {
if _, exists := s.results[dep]; !exists {
return result, errorf.E("test %s depends on %s which has not been run", testName, dep)
}
if !s.results[dep].Pass {
return result, errorf.E("test %s depends on %s which failed", testName, dep)
}
}
client, err := NewClient(s.relayURL)
if err != nil {
return result, errorf.E("failed to connect to relay: %w", err)
}
defer client.Close()
result = tc.Func(client, s.key1, s.key2)
result.Name = testName
result.Required = tc.Required
s.results[testName] = result
return
}
// GetResults returns all test results.
func (s *TestSuite) GetResults() map[string]TestResult {
return s.results
}
// FormatJSON formats results as JSON.
func FormatJSON(results []TestResult) (output string, err error) {
var data []byte
if data, err = json.Marshal(results); err != nil {
return
}
return string(data), nil
}

1859
relay-tester/tests.go Normal file

File diff suppressed because it is too large Load Diff

245
relay_test.go Normal file
View File

@@ -0,0 +1,245 @@
package main
import (
"fmt"
"net"
"os"
"path/filepath"
"testing"
"time"
lol "lol.mleku.dev"
"next.orly.dev/app/config"
"next.orly.dev/pkg/run"
relaytester "next.orly.dev/relay-tester"
)
var (
testRelayURL string
testName string
testJSON bool
keepDataDir bool
relayPort int
relayDataDir string
)
func TestRelay(t *testing.T) {
var err error
var relay *run.Relay
var relayURL string
// Determine relay URL
if testRelayURL != "" {
relayURL = testRelayURL
} else {
// Start local relay for testing
var port int
if relay, port, err = startTestRelay(); err != nil {
t.Fatalf("Failed to start test relay: %v", err)
}
defer func() {
if stopErr := relay.Stop(); stopErr != nil {
t.Logf("Error stopping relay: %v", stopErr)
}
}()
relayURL = fmt.Sprintf("ws://127.0.0.1:%d", port)
t.Logf("Waiting for relay to be ready at %s...", relayURL)
// Wait for relay to be ready - try connecting to verify it's up
if err = waitForRelay(relayURL, 10*time.Second); err != nil {
t.Fatalf("Relay not ready after timeout: %v", err)
}
t.Logf("Relay is ready at %s", relayURL)
}
// Create test suite
t.Logf("Creating test suite for %s...", relayURL)
suite, err := relaytester.NewTestSuite(relayURL)
if err != nil {
t.Fatalf("Failed to create test suite: %v", err)
}
t.Logf("Test suite created, running tests...")
// Run tests
var results []relaytester.TestResult
if testName != "" {
// Run specific test
result, err := suite.RunTest(testName)
if err != nil {
t.Fatalf("Failed to run test %s: %v", testName, err)
}
results = []relaytester.TestResult{result}
} else {
// Run all tests
if results, err = suite.Run(); err != nil {
t.Fatalf("Failed to run tests: %v", err)
}
}
// Output results
if testJSON {
jsonOutput, err := relaytester.FormatJSON(results)
if err != nil {
t.Fatalf("Failed to format JSON: %v", err)
}
fmt.Println(jsonOutput)
} else {
outputResults(results, t)
}
// Check if any required tests failed
for _, result := range results {
if result.Required && !result.Pass {
t.Errorf("Required test '%s' failed: %s", result.Name, result.Info)
}
}
}
func startTestRelay() (relay *run.Relay, port int, err error) {
cfg := &config.C{
AppName: "ORLY-TEST",
DataDir: relayDataDir,
Listen: "127.0.0.1",
Port: 0, // Always use random port, unless overridden via -port flag
HealthPort: 0,
EnableShutdown: false,
LogLevel: "warn",
DBLogLevel: "warn",
DBBlockCacheMB: 512,
DBIndexCacheMB: 256,
LogToStdout: false,
PprofHTTP: false,
ACLMode: "none",
AuthRequired: false,
AuthToWrite: false,
SubscriptionEnabled: false,
MonthlyPriceSats: 6000,
FollowListFrequency: time.Hour,
WebDisableEmbedded: false,
SprocketEnabled: false,
SpiderMode: "none",
PolicyEnabled: false,
}
// Use explicitly set port if provided via flag, otherwise find an available port
if relayPort > 0 {
cfg.Port = relayPort
} else {
var listener net.Listener
if listener, err = net.Listen("tcp", "127.0.0.1:0"); err != nil {
return nil, 0, fmt.Errorf("failed to find available port: %w", err)
}
addr := listener.Addr().(*net.TCPAddr)
cfg.Port = addr.Port
listener.Close()
}
// Set default data dir if not specified
if cfg.DataDir == "" {
tmpDir := filepath.Join(os.TempDir(), fmt.Sprintf("orly-test-%d", time.Now().UnixNano()))
cfg.DataDir = tmpDir
}
// Set up logging
lol.SetLogLevel(cfg.LogLevel)
// Create options
cleanup := !keepDataDir
opts := &run.Options{
CleanupDataDir: &cleanup,
}
// Start relay
if relay, err = run.Start(cfg, opts); err != nil {
return nil, 0, fmt.Errorf("failed to start relay: %w", err)
}
return relay, cfg.Port, nil
}
// waitForRelay waits for the relay to be ready by attempting to connect
func waitForRelay(url string, timeout time.Duration) error {
// Extract host:port from ws:// URL
addr := url
if len(url) > 7 && url[:5] == "ws://" {
addr = url[5:]
}
deadline := time.Now().Add(timeout)
attempts := 0
for time.Now().Before(deadline) {
conn, err := net.DialTimeout("tcp", addr, 500*time.Millisecond)
if err == nil {
conn.Close()
return nil
}
attempts++
if attempts%10 == 0 {
// Log every 10th attempt (every second)
}
time.Sleep(100 * time.Millisecond)
}
return fmt.Errorf("timeout waiting for relay at %s after %d attempts", url, attempts)
}
func outputResults(results []relaytester.TestResult, t *testing.T) {
passed := 0
failed := 0
requiredFailed := 0
for _, result := range results {
if result.Pass {
passed++
t.Logf("PASS: %s", result.Name)
} else {
failed++
if result.Required {
requiredFailed++
t.Errorf("FAIL (required): %s - %s", result.Name, result.Info)
} else {
t.Logf("FAIL (optional): %s - %s", result.Name, result.Info)
}
}
}
t.Logf("\nTest Summary:")
t.Logf(" Total: %d", len(results))
t.Logf(" Passed: %d", passed)
t.Logf(" Failed: %d", failed)
t.Logf(" Required Failed: %d", requiredFailed)
}
// TestMain allows custom test setup/teardown
func TestMain(m *testing.M) {
// Manually parse our custom flags to avoid conflicts with Go's test flags
for i := 1; i < len(os.Args); i++ {
arg := os.Args[i]
switch arg {
case "-relay-url":
if i+1 < len(os.Args) {
testRelayURL = os.Args[i+1]
i++
}
case "-test-name":
if i+1 < len(os.Args) {
testName = os.Args[i+1]
i++
}
case "-json":
testJSON = true
case "-keep-data":
keepDataDir = true
case "-port":
if i+1 < len(os.Args) {
fmt.Sscanf(os.Args[i+1], "%d", &relayPort)
i++
}
case "-data-dir":
if i+1 < len(os.Args) {
relayDataDir = os.Args[i+1]
i++
}
}
}
code := m.Run()
os.Exit(code)
}