Compare commits

..

12 Commits

Author SHA1 Message Date
54f65d8740 Enhance relay testing and event handling
Some checks failed
Go / build (push) Has been cancelled
Go / release (push) Has been cancelled
- Updated TestRelay to include a wait mechanism for relay readiness, improving test reliability.
- Refactored startTestRelay to return the assigned port, allowing dynamic port assignment.
- Added timestamp validation in HandleEvent to reject events with timestamps more than one hour in the future.
- Introduced channels for handling OK and COUNT messages in the Client struct, improving message processing.
- Updated tests to reflect changes in event timestamp handling and increased wait times for event processing.
- Bumped version to v0.20.6 to reflect these enhancements.
2025-10-30 19:12:11 +00:00
2ff8b47410 bump to v0.20.5
Some checks failed
Go / build (push) Has been cancelled
Go / release (push) Has been cancelled
2025-10-30 18:37:30 +00:00
ba2d35012c Enhance WebSocket connection management and error handling
Some checks failed
Go / build (push) Has been cancelled
Go / release (push) Has been cancelled
- Set initial read deadline for connections to prevent premature timeouts on idle connections.
- Improved pong and ping handlers to extend read deadlines and handle timeout errors more effectively.
- Refined error logging for connection issues, distinguishing between timeouts and connection errors to enhance debugging.
- Updated subscriber delivery logic to handle timeouts gracefully, allowing for potential recovery without immediate disconnection.
2025-10-30 18:32:03 +00:00
b70f03bce0 Refactor policy script handling and improve fallback logic
Some checks failed
Go / build (push) Has been cancelled
Go / release (push) Has been cancelled
- Renamed test functions for clarity, changing "NotRunning" to "Disabled" to better reflect the policy state.
- Updated policy checks to ensure that if the policy is disabled, it falls back to the default policy immediately.
- Enhanced error handling in the policy manager to ensure proper startup and running state management.
- Introduced a new method to ensure the policy is running, with timeout handling for startup completion.
- Bumped version to v0.20.3 to reflect these changes.
2025-10-30 18:22:56 +00:00
8954846864 Add relay testing framework and utilities
- Introduced a new `relaytester` package to facilitate testing of relay functionalities.
- Implemented a `TestSuite` structure to manage and execute various test cases against the relay.
- Added multiple test cases for event publishing, retrieval, and validation, ensuring comprehensive coverage of relay behavior.
- Created utility functions for generating key pairs and events, enhancing test reliability and maintainability.
- Established a WebSocket client for interacting with the relay during tests, including subscription and message handling.
- Included JSON formatting for test results to improve output readability.
- This commit lays the groundwork for robust integration testing of relay features.
2025-10-30 18:14:22 +00:00
5e6c0b80aa Add Relay functionality for managing startup and shutdown processes
- Introduced a new package `run` with a `Relay` struct to manage the lifecycle of a relay instance.
- Implemented `Start` and `Stop` methods for initializing and gracefully shutting down the relay, including options for log capturing and data directory cleanup.
- Added methods to retrieve captured stdout and stderr logs.
- Enhanced configuration handling for data directory and logging based on user-defined options.
2025-10-30 17:57:57 +00:00
80ab3caa5f Implement policy-based event filtering and add integration tests
Some checks failed
Go / build (push) Has been cancelled
Go / release (push) Has been cancelled
- Enhanced the HandleReq function to incorporate policy checks for privileged events, ensuring only authorized users can access sensitive data.
- Introduced a new integration test suite for policy filtering, validating the behavior of event access based on user authentication and policy rules.
- Added a script to automate the policy filter integration tests, improving testing efficiency and reliability.
- Updated version to v0.20.2 to reflect the new features and improvements.
2025-10-30 17:51:15 +00:00
62f244d114 Refactor event handling and testing utilities
Some checks failed
Go / build (push) Has been cancelled
Go / release (push) Has been cancelled
- Updated the HandleReq function to improve event filtering logic, ensuring that privileged events are consistently checked against user access levels.
- Refactored event deduplication to utilize filtered events instead of all events, enhancing performance and clarity.
- Enhanced test utilities by generating keypairs for event creation, ensuring proper signing and validation in tests.
- Updated various test cases to use the new event creation methods, improving reliability and maintainability of tests.
- Bumped version to reflect changes made.
2025-10-30 15:53:02 +00:00
88ebf6eccc Update WebSocket implementation to use Gorilla WebSocket library
Some checks failed
Go / build (push) Has been cancelled
Go / release (push) Has been cancelled
- Replaced the existing `github.com/coder/websocket` package with `github.com/gorilla/websocket` for improved functionality and compatibility.
- Adjusted WebSocket connection handling, including message reading and writing, to align with the new library's API.
- Enhanced error handling and logging for WebSocket operations.
- Bumped version to v0.20.0 to reflect the changes made.
2025-10-30 15:20:39 +00:00
4f97cb9a42 Improve error handling and logging in message processing
Some checks failed
Go / build (push) Has been cancelled
Go / release (push) Has been cancelled
- Updated HandleMessage function to include the actual message content in error notices for invalid messages, enhancing clarity for debugging.
- Added nil check in Value method of tag encoder to prevent potential nil pointer dereference.
- Refactored tag handling in spider to utilize hex-encoded pubkeys for improved compatibility and clarity in filter creation.
- bump to v0.19.10
2025-10-28 20:26:09 +00:00
df67538af2 Refactor Marshal function in filter encoder
Some checks failed
Go / build (push) Has been cancelled
Go / release (push) Has been cancelled
- Simplified the key-value appending logic in the Marshal function by replacing manual string appending with the text.AppendQuote method for better readability and maintainability.
- Updated version number to v0.19.9 to reflect the changes made.
2025-10-28 19:23:27 +00:00
f5d13a6807 Update error handling and logging in message processing
Some checks failed
Go / build (push) Has been cancelled
Go / release (push) Has been cancelled
- Modified the HandleMessage function to avoid logging actual message content and instead send generic error notices to clients for invalid or malformed messages, enhancing security by preventing exposure of binary data.
- Updated the NostrEscape function to escape all control characters to ensure valid JSON, preventing parsing errors with binary data.
- Adjusted policy checks to handle hex-encoded pubkeys correctly, ensuring compatibility with the updated encoding scheme.
- Introduced blackout period for relay connections after exceeding maximum reconnection delays, improving connection stability and management.
2025-10-28 19:12:02 +00:00
40 changed files with 4031 additions and 397 deletions

View File

@@ -176,6 +176,18 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
}
return
}
// validate timestamp - reject events too far in the future (more than 1 hour)
now := time.Now().Unix()
if env.E.CreatedAt > now+3600 {
if err = Ok.Invalid(
l, env,
"timestamp too far in the future",
); chk.E(err) {
return
}
return
}
// verify the signature
var ok bool
if ok, err = env.Verify(); chk.T(err) {

View File

@@ -75,9 +75,7 @@ func (l *Listener) HandleMessage(msg []byte, remote string) {
// Validate message for invalid characters before processing
if err := validateJSONMessage(msg); err != nil {
log.E.F("%s message validation FAILED (len=%d): %v", remote, len(msg), err)
log.T.F("%s invalid message content: %q", remote, msgPreview)
// Send error notice to client
if noticeErr := noticeenvelope.NewFrom("invalid message format: " + err.Error()).Write(l); noticeErr != nil {
if noticeErr := noticeenvelope.NewFrom(fmt.Sprintf("invalid message format: contains invalid characters: %s", msg)).Write(l); noticeErr != nil {
log.E.F("%s failed to send validation error notice: %v", remote, noticeErr)
}
return
@@ -94,10 +92,10 @@ func (l *Listener) HandleMessage(msg []byte, remote string) {
"%s envelope identification FAILED (len=%d): %v", remote, len(msg),
err,
)
log.T.F("%s malformed message content: %q", remote, msgPreview)
// Don't log message preview as it may contain binary data
chk.E(err)
// Send error notice to client
if noticeErr := noticeenvelope.NewFrom("malformed message: " + err.Error()).Write(l); noticeErr != nil {
if noticeErr := noticeenvelope.NewFrom("malformed message").Write(l); noticeErr != nil {
log.E.F(
"%s failed to send malformed message notice: %v", remote,
noticeErr,
@@ -132,18 +130,18 @@ func (l *Listener) HandleMessage(msg []byte, remote string) {
default:
err = fmt.Errorf("unknown envelope type %s", t)
log.E.F(
"%s unknown envelope type: %s (payload: %q)", remote, t,
string(rem),
"%s unknown envelope type: %s (payload_len: %d)", remote, t,
len(rem),
)
}
// Handle any processing errors
if err != nil {
log.E.F("%s message processing FAILED (type=%s): %v", remote, t, err)
log.T.F("%s error context - original message: %q", remote, msgPreview)
// Don't log message preview as it may contain binary data
// Send error notice to client
noticeMsg := fmt.Sprintf("%s: %s", t, err.Error())
// Send error notice to client (use generic message to avoid control chars in errors)
noticeMsg := fmt.Sprintf("%s processing failed", t)
if noticeErr := noticeenvelope.NewFrom(noticeMsg).Write(l); noticeErr != nil {
log.E.F(
"%s failed to send error notice after %s processing failure: %v",

View File

@@ -283,13 +283,13 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
if !authorized {
continue // not authorized to see this private event
}
tmp = append(tmp, ev)
continue
// Event has private tag and user is authorized - continue to privileged check
}
if l.Config.ACLMode != "none" &&
kind.IsPrivileged(ev.Kind) && accessLevel != "admin" { // admins can see all events
// Always filter privileged events based on kind, regardless of ACLMode
// Privileged events should only be sent to users who are authenticated and
// are either the event author or listed in p tags
if kind.IsPrivileged(ev.Kind) && accessLevel != "admin" { // admins can see all events
log.T.C(
func() string {
return fmt.Sprintf(
@@ -357,6 +357,57 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
)
}
} else {
// Check if policy defines this event as privileged (even if not in hardcoded list)
// Policy check will handle this later, but we can skip it here if not authenticated
// to avoid unnecessary processing
if l.policyManager != nil && l.policyManager.Manager != nil && l.policyManager.Manager.IsEnabled() {
rule, hasRule := l.policyManager.Rules[int(ev.Kind)]
if hasRule && rule.Privileged && accessLevel != "admin" {
pk := l.authedPubkey.Load()
if pk == nil {
// Not authenticated - cannot see policy-privileged events
log.T.C(
func() string {
return fmt.Sprintf(
"policy-privileged event %s denied - not authenticated",
ev.ID,
)
},
)
continue
}
// Policy check will verify authorization later, but we need to check
// if user is party to the event here
authorized := false
if utils.FastEqual(ev.Pubkey, pk) {
authorized = true
} else {
// Check p tags
pTags := ev.Tags.GetAll([]byte("p"))
for _, pTag := range pTags {
var pt []byte
if pt, err = hexenc.Dec(string(pTag.Value())); chk.E(err) {
continue
}
if utils.FastEqual(pt, pk) {
authorized = true
break
}
}
}
if !authorized {
log.T.C(
func() string {
return fmt.Sprintf(
"policy-privileged event %s does not contain the logged in pubkey %0x",
ev.ID, pk,
)
},
)
continue
}
}
}
tmp = append(tmp, ev)
}
}
@@ -384,27 +435,28 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
}
// Deduplicate events (in case chunk processing returned duplicates)
if len(allEvents) > 0 {
// Use events (already filtered for privileged/policy) instead of allEvents
if len(events) > 0 {
seen := make(map[string]struct{})
var deduplicatedEvents event.S
originalCount := len(allEvents)
for _, ev := range allEvents {
originalCount := len(events)
for _, ev := range events {
eventID := hexenc.Enc(ev.ID)
if _, exists := seen[eventID]; !exists {
seen[eventID] = struct{}{}
deduplicatedEvents = append(deduplicatedEvents, ev)
}
}
allEvents = deduplicatedEvents
if originalCount != len(allEvents) {
log.T.F("REQ %s: deduplicated %d events to %d unique events", env.Subscription, originalCount, len(allEvents))
events = deduplicatedEvents
if originalCount != len(events) {
log.T.F("REQ %s: deduplicated %d events to %d unique events", env.Subscription, originalCount, len(events))
}
}
// Apply managed ACL filtering for read access if managed ACL is active
if acl.Registry.Active.Load() == "managed" {
var aclFilteredEvents event.S
for _, ev := range allEvents {
for _, ev := range events {
// Check if event is banned
eventID := hex.EncodeToString(ev.ID)
if banned, err := l.getManagedACL().IsEventBanned(eventID); err == nil && banned {
@@ -430,13 +482,13 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
aclFilteredEvents = append(aclFilteredEvents, ev)
}
allEvents = aclFilteredEvents
events = aclFilteredEvents
}
// Apply private tag filtering - only show events with "private" tags to authorized users
var privateFilteredEvents event.S
authedPubkey := l.authedPubkey.Load()
for _, ev := range allEvents {
for _, ev := range events {
// Check if event has private tags
hasPrivateTag := false
var privatePubkey []byte
@@ -469,10 +521,10 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
log.D.F("private tag: filtering out event %s from unauthorized user", hexenc.Enc(ev.ID))
}
}
allEvents = privateFilteredEvents
events = privateFilteredEvents
seen := make(map[string]struct{})
for _, ev := range allEvents {
for _, ev := range events {
log.T.C(
func() string {
return fmt.Sprintf(

View File

@@ -7,7 +7,7 @@ import (
"strings"
"time"
"github.com/coder/websocket"
"github.com/gorilla/websocket"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/encoders/envelopes/authenvelope"
@@ -24,21 +24,16 @@ const (
// ClientMessageSizeLimit is the maximum message size that clients can handle
// This is set to 100MB to allow large messages
ClientMessageSizeLimit = 100 * 1024 * 1024 // 100MB
// CloseMessage denotes a close control message. The optional message
// payload contains a numeric code and text. Use the FormatCloseMessage
// function to format a close message payload.
CloseMessage = 8
// PingMessage denotes a ping control message. The optional message payload
// is UTF-8 encoded text.
PingMessage = 9
// PongMessage denotes a pong control message. The optional message payload
// is UTF-8 encoded text.
PongMessage = 10
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true // Allow all origins for proxy compatibility
},
}
func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
remote := GetRemoteFromReq(r)
@@ -62,16 +57,12 @@ whitelist:
defer cancel()
var err error
var conn *websocket.Conn
// Configure WebSocket accept options for proxy compatibility
acceptOptions := &websocket.AcceptOptions{
OriginPatterns: []string{"*"}, // Allow all origins for proxy compatibility
// Don't check origin when behind a proxy - let the proxy handle it
InsecureSkipVerify: true,
// Try to set a higher compression threshold to allow larger messages
CompressionMode: websocket.CompressionDisabled,
}
if conn, err = websocket.Accept(w, r, acceptOptions); chk.E(err) {
// Configure upgrader for this connection
upgrader.ReadBufferSize = int(DefaultMaxMessageSize)
upgrader.WriteBufferSize = int(DefaultMaxMessageSize)
if conn, err = upgrader.Upgrade(w, r, nil); chk.E(err) {
log.E.F("websocket accept failed from %s: %v", remote, err)
return
}
@@ -80,7 +71,11 @@ whitelist:
// Set read limit immediately after connection is established
conn.SetReadLimit(DefaultMaxMessageSize)
log.D.F("set read limit to %d bytes (%d MB) for %s", DefaultMaxMessageSize, DefaultMaxMessageSize/units.Mb, remote)
defer conn.CloseNow()
// Set initial read deadline - pong handler will extend it when pongs are received
conn.SetReadDeadline(time.Now().Add(DefaultPongWait))
defer conn.Close()
listener := &Listener{
ctx: ctx,
Server: s,
@@ -109,6 +104,16 @@ whitelist:
log.D.F("AUTH challenge sent successfully to %s", remote)
}
ticker := time.NewTicker(DefaultPingWait)
// Set pong handler - extends read deadline when pongs are received
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(DefaultPongWait))
return nil
})
// Set ping handler - extends read deadline when pings are received
conn.SetPingHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(DefaultPongWait))
return conn.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(DefaultWriteTimeout))
})
// Don't pass cancel to Pinger - it should not be able to cancel the connection context
go s.Pinger(ctx, conn, ticker)
defer func() {
@@ -154,12 +159,19 @@ whitelist:
return
}
var typ websocket.MessageType
var typ int
var msg []byte
log.T.F("waiting for message from %s", remote)
// Don't set read deadline here - it's set initially and extended by pong handler
// This prevents premature timeouts on idle connections with active subscriptions
if ctx.Err() != nil {
return
}
// Block waiting for message; rely on pings and context cancellation to detect dead peers
typ, msg, err = conn.Read(ctx)
// The read deadline is managed by the pong handler which extends it when pongs are received
typ, msg, err = conn.ReadMessage()
if err != nil {
// Check if the error is due to context cancellation
@@ -179,59 +191,82 @@ whitelist:
log.T.F("connection from %s closed: %v", remote, err)
return
}
// Handle timeout errors specifically - these can occur on idle connections
// but pongs should extend the deadline, so a timeout usually means dead connection
if strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded") {
log.T.F("connection from %s read timeout (likely dead connection): %v", remote, err)
return
}
// Handle message too big errors specifically
if strings.Contains(err.Error(), "MessageTooBig") ||
if strings.Contains(err.Error(), "message too large") ||
strings.Contains(err.Error(), "read limited at") {
log.D.F("client %s hit message size limit: %v", remote, err)
// Don't log this as an error since it's a client-side limit
// Just close the connection gracefully
return
}
status := websocket.CloseStatus(err)
switch status {
case websocket.StatusNormalClosure,
websocket.StatusGoingAway,
websocket.StatusNoStatusRcvd,
websocket.StatusAbnormalClosure,
websocket.StatusProtocolError:
log.T.F(
"connection from %s closed with status: %v", remote, status,
)
case websocket.StatusMessageTooBig:
// Check for websocket close errors
if websocket.IsCloseError(err, websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived,
websocket.CloseAbnormalClosure,
websocket.CloseUnsupportedData,
websocket.CloseInvalidFramePayloadData) {
log.T.F("connection from %s closed: %v", remote, err)
} else if websocket.IsCloseError(err, websocket.CloseMessageTooBig) {
log.D.F("client %s sent message too big: %v", remote, err)
default:
} else {
log.E.F("unexpected close error from %s: %v", remote, err)
}
return
}
if typ == PingMessage {
if typ == websocket.PingMessage {
log.D.F("received PING from %s, sending PONG", remote)
// Create a write context with timeout for pong response
writeCtx, writeCancel := context.WithTimeout(
ctx, DefaultWriteTimeout,
)
deadline := time.Now().Add(DefaultWriteTimeout)
conn.SetWriteDeadline(deadline)
pongStart := time.Now()
if err = conn.Write(writeCtx, PongMessage, msg); chk.E(err) {
if err = conn.WriteControl(websocket.PongMessage, msg, deadline); err != nil {
pongDuration := time.Since(pongStart)
log.E.F(
"failed to send PONG to %s after %v: %v", remote,
pongDuration, err,
)
if writeCtx.Err() != nil {
// Check if this is a timeout vs a connection error
isTimeout := strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded")
isConnectionError := strings.Contains(err.Error(), "use of closed network connection") ||
strings.Contains(err.Error(), "broken pipe") ||
strings.Contains(err.Error(), "connection reset") ||
websocket.IsCloseError(err, websocket.CloseAbnormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived)
if isConnectionError {
log.E.F(
"PONG write timeout to %s after %v (limit=%v)", remote,
pongDuration, DefaultWriteTimeout,
"failed to send PONG to %s after %v (connection error): %v", remote,
pongDuration, err,
)
return
} else if isTimeout {
// Timeout on pong - log but don't close immediately
// The read deadline will catch dead connections
log.W.F(
"failed to send PONG to %s after %v (timeout, but connection may still be alive): %v", remote,
pongDuration, err,
)
// Continue - don't close connection on pong timeout
} else {
// Unknown error - log and continue
log.E.F(
"failed to send PONG to %s after %v (unknown error): %v", remote,
pongDuration, err,
)
// Continue - don't close on unknown errors
}
writeCancel()
return
continue
}
pongDuration := time.Since(pongStart)
log.D.F("sent PONG to %s successfully in %v", remote, pongDuration)
if pongDuration > time.Millisecond*50 {
log.D.F("SLOW PONG to %s: %v (>50ms)", remote, pongDuration)
}
writeCancel()
continue
}
// Log message size for debugging
@@ -260,27 +295,47 @@ func (s *Server) Pinger(
pingCount++
log.D.F("sending PING #%d", pingCount)
// Create a write context with timeout for ping operation
pingCtx, pingCancel := context.WithTimeout(ctx, DefaultWriteTimeout)
// Set write deadline for ping operation
deadline := time.Now().Add(DefaultWriteTimeout)
conn.SetWriteDeadline(deadline)
pingStart := time.Now()
if err = conn.Ping(pingCtx); err != nil {
if err = conn.WriteControl(websocket.PingMessage, []byte{}, deadline); err != nil {
pingDuration := time.Since(pingStart)
log.E.F(
"PING #%d FAILED after %v: %v", pingCount, pingDuration,
err,
)
if pingCtx.Err() != nil {
// Check if this is a timeout vs a connection error
isTimeout := strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded")
isConnectionError := strings.Contains(err.Error(), "use of closed network connection") ||
strings.Contains(err.Error(), "broken pipe") ||
strings.Contains(err.Error(), "connection reset") ||
websocket.IsCloseError(err, websocket.CloseAbnormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived)
if isConnectionError {
log.E.F(
"PING #%d timeout after %v (limit=%v)", pingCount,
pingDuration, DefaultWriteTimeout,
"PING #%d FAILED after %v (connection error): %v", pingCount, pingDuration,
err,
)
chk.E(err)
return
} else if isTimeout {
// Timeout on ping - log but don't stop pinger immediately
// The read deadline will catch dead connections
log.W.F(
"PING #%d timeout after %v (connection may still be alive): %v", pingCount, pingDuration,
err,
)
// Continue - don't stop pinger on timeout
} else {
// Unknown error - log and continue
log.E.F(
"PING #%d FAILED after %v (unknown error): %v", pingCount, pingDuration,
err,
)
// Continue - don't stop pinger on unknown errors
}
chk.E(err)
pingCancel()
return
continue
}
pingDuration := time.Since(pingStart)
@@ -289,8 +344,6 @@ func (s *Server) Pinger(
if pingDuration > time.Millisecond*100 {
log.D.F("SLOW PING #%d: %v (>100ms)", pingCount, pingDuration)
}
pingCancel()
case <-ctx.Done():
log.T.F("pinger context cancelled after %d pings", pingCount)
return

View File

@@ -3,9 +3,10 @@ package app
import (
"context"
"net/http"
"strings"
"time"
"github.com/coder/websocket"
"github.com/gorilla/websocket"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/acl"
@@ -54,14 +55,12 @@ func (l *Listener) Write(p []byte) (n int, err error) {
// Use a separate context with timeout for writes to prevent race conditions
// where the main connection context gets cancelled while writing events
writeCtx, cancel := context.WithTimeout(
context.Background(), DefaultWriteTimeout,
)
defer cancel()
deadline := time.Now().Add(DefaultWriteTimeout)
l.conn.SetWriteDeadline(deadline)
// Attempt the write operation
writeStart := time.Now()
if err = l.conn.Write(writeCtx, websocket.MessageText, p); err != nil {
if err = l.conn.WriteMessage(websocket.TextMessage, p); err != nil {
writeDuration := time.Since(writeStart)
totalDuration := time.Since(start)
@@ -72,7 +71,7 @@ func (l *Listener) Write(p []byte) (n int, err error) {
)
// Check if this is a context timeout
if writeCtx.Err() != nil {
if strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline") {
log.E.F(
"ws->%s write timeout after %v (limit=%v)", l.remote,
writeDuration, DefaultWriteTimeout,

View File

@@ -3,10 +3,11 @@ package app
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/coder/websocket"
"github.com/gorilla/websocket"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/acl"
@@ -270,15 +271,11 @@ func (p *P) Deliver(ev *event.E) {
// Use a separate context with timeout for writes to prevent race conditions
// where the publisher context gets cancelled while writing events
writeCtx, cancel := context.WithTimeout(
context.Background(), DefaultWriteTimeout,
)
defer cancel()
deadline := time.Now().Add(DefaultWriteTimeout)
d.w.SetWriteDeadline(deadline)
deliveryStart := time.Now()
if err = d.w.Write(
writeCtx, websocket.MessageText, msgData,
); err != nil {
if err = d.w.WriteMessage(websocket.TextMessage, msgData); err != nil {
deliveryDuration := time.Since(deliveryStart)
// Log detailed failure information
@@ -286,17 +283,36 @@ func (p *P) Deliver(ev *event.E) {
hex.Enc(ev.ID), d.sub.remote, d.id, deliveryDuration, err)
// Check for timeout specifically
if writeCtx.Err() != nil {
isTimeout := strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "deadline exceeded")
if isTimeout {
log.E.F("subscription delivery TIMEOUT: event=%s to=%s after %v (limit=%v)",
hex.Enc(ev.ID), d.sub.remote, deliveryDuration, DefaultWriteTimeout)
}
// Log connection cleanup
log.D.F("removing failed subscriber connection: %s", d.sub.remote)
// Only close connection on permanent errors, not transient timeouts
// WebSocket write errors typically indicate connection issues, but we should
// distinguish between timeouts (client might be slow) and connection errors
isConnectionError := strings.Contains(err.Error(), "use of closed network connection") ||
strings.Contains(err.Error(), "broken pipe") ||
strings.Contains(err.Error(), "connection reset") ||
websocket.IsCloseError(err, websocket.CloseAbnormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived)
// On error, remove the subscriber connection safely
p.removeSubscriber(d.w)
_ = d.w.CloseNow()
if isConnectionError {
log.D.F("removing failed subscriber connection due to connection error: %s", d.sub.remote)
p.removeSubscriber(d.w)
_ = d.w.Close()
} else if isTimeout {
// For timeouts, log but don't immediately close - give it another chance
// The read deadline will catch dead connections eventually
log.W.F("subscription delivery timeout for %s (client may be slow), skipping event but keeping connection", d.sub.remote)
} else {
// Unknown error - be conservative and close
log.D.F("removing failed subscriber connection due to unknown error: %s", d.sub.remote)
p.removeSubscriber(d.w)
_ = d.w.Close()
}
continue
}

View File

@@ -0,0 +1,319 @@
package main
import (
"context"
"flag"
"fmt"
"os"
"strings"
"time"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/crypto/p256k"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/filter"
"next.orly.dev/pkg/encoders/hex"
"next.orly.dev/pkg/encoders/kind"
"next.orly.dev/pkg/encoders/tag"
"next.orly.dev/pkg/protocol/ws"
)
func main() {
var err error
url := flag.String("url", "ws://127.0.0.1:34568", "relay websocket URL")
allowedPubkeyHex := flag.String("allowed-pubkey", "", "hex-encoded allowed pubkey")
allowedSecHex := flag.String("allowed-sec", "", "hex-encoded allowed secret key")
unauthorizedPubkeyHex := flag.String("unauthorized-pubkey", "", "hex-encoded unauthorized pubkey")
unauthorizedSecHex := flag.String("unauthorized-sec", "", "hex-encoded unauthorized secret key")
timeout := flag.Duration("timeout", 10*time.Second, "operation timeout")
flag.Parse()
if *allowedPubkeyHex == "" || *allowedSecHex == "" {
log.E.F("required flags: -allowed-pubkey and -allowed-sec")
os.Exit(1)
}
if *unauthorizedPubkeyHex == "" || *unauthorizedSecHex == "" {
log.E.F("required flags: -unauthorized-pubkey and -unauthorized-sec")
os.Exit(1)
}
// Decode keys
allowedSecBytes, err := hex.Dec(*allowedSecHex)
if err != nil {
log.E.F("failed to decode allowed secret key: %v", err)
os.Exit(1)
}
allowedSigner := &p256k.Signer{}
if err = allowedSigner.InitSec(allowedSecBytes); chk.E(err) {
log.E.F("failed to initialize allowed signer: %v", err)
os.Exit(1)
}
unauthorizedSecBytes, err := hex.Dec(*unauthorizedSecHex)
if err != nil {
log.E.F("failed to decode unauthorized secret key: %v", err)
os.Exit(1)
}
unauthorizedSigner := &p256k.Signer{}
if err = unauthorizedSigner.InitSec(unauthorizedSecBytes); chk.E(err) {
log.E.F("failed to initialize unauthorized signer: %v", err)
os.Exit(1)
}
ctx, cancel := context.WithTimeout(context.Background(), *timeout)
defer cancel()
// Test 1: Authenticated as allowed pubkey - should work
fmt.Println("Test 1: Publishing event 30520 with allowed pubkey (authenticated)...")
if err := testWriteEvent(ctx, *url, 30520, allowedSigner, allowedSigner); err != nil {
fmt.Printf("❌ FAILED: %v\n", err)
os.Exit(1)
}
fmt.Println("✅ PASSED: Event published successfully")
// Test 2: Authenticated as allowed pubkey, then read event 10306 - should work
// First publish an event, then read it
fmt.Println("\nTest 2: Publishing and reading event 10306 with allowed pubkey (authenticated)...")
if err := testWriteEvent(ctx, *url, 10306, allowedSigner, allowedSigner); err != nil {
fmt.Printf("❌ FAILED to publish: %v\n", err)
os.Exit(1)
}
if err := testReadEvent(ctx, *url, 10306, allowedSigner); err != nil {
fmt.Printf("❌ FAILED to read: %v\n", err)
os.Exit(1)
}
fmt.Println("✅ PASSED: Event readable by allowed user")
// Test 3: Unauthenticated request - should be blocked
fmt.Println("\nTest 3: Publishing event 30520 without authentication...")
if err := testWriteEventUnauthenticated(ctx, *url, 30520, allowedSigner); err != nil {
fmt.Printf("✅ PASSED: Event correctly blocked (expected): %v\n", err)
} else {
fmt.Println("❌ FAILED: Event was allowed when it should have been blocked")
os.Exit(1)
}
// Test 4: Authenticated as unauthorized pubkey - should be blocked
fmt.Println("\nTest 4: Publishing event 30520 with unauthorized pubkey...")
if err := testWriteEvent(ctx, *url, 30520, unauthorizedSigner, unauthorizedSigner); err != nil {
fmt.Printf("✅ PASSED: Event correctly blocked (expected): %v\n", err)
} else {
fmt.Println("❌ FAILED: Event was allowed when it should have been blocked")
os.Exit(1)
}
// Test 5: Read event 10306 without authentication - should be blocked
// Event was published in test 2, so it exists in the database
fmt.Println("\nTest 5: Reading event 10306 without authentication (should be blocked)...")
// Wait a bit to ensure event is stored
time.Sleep(500 * time.Millisecond)
// If no error is returned, that means no events were received (which is correct)
// If an error is returned, it means an event was received (which is wrong)
if err := testReadEventUnauthenticated(ctx, *url, 10306); err != nil {
// If we got an error about receiving an event, that's a failure
if strings.Contains(err.Error(), "unexpected event received") {
fmt.Printf("❌ FAILED: %v\n", err)
os.Exit(1)
}
// Other errors (like connection errors) are also failures
fmt.Printf("❌ FAILED: Unexpected error: %v\n", err)
os.Exit(1)
}
fmt.Println("✅ PASSED: No events received (correctly filtered by policy)")
// Test 6: Read event 10306 with unauthorized pubkey - should be blocked
fmt.Println("\nTest 6: Reading event 10306 with unauthorized pubkey (should be blocked)...")
// If no error is returned, that means no events were received (which is correct)
// If an error is returned about receiving an event, that's a failure
if err := testReadEvent(ctx, *url, 10306, unauthorizedSigner); err != nil {
// Connection/subscription errors are failures
fmt.Printf("❌ FAILED: Unexpected error: %v\n", err)
os.Exit(1)
}
fmt.Println("✅ PASSED: No events received (correctly filtered by policy)")
fmt.Println("\n✅ All tests passed!")
}
func testWriteEvent(ctx context.Context, url string, kindNum uint16, eventSigner, authSigner *p256k.Signer) error {
rl, err := ws.RelayConnect(ctx, url)
if err != nil {
return fmt.Errorf("connect error: %w", err)
}
defer rl.Close()
// Send a REQ first to trigger AUTH challenge (when AuthToWrite is enabled)
// This is needed because challenges are sent on REQ, not on connect
limit := uint(1)
ff := filter.NewS(&filter.F{
Kinds: kind.NewS(kind.New(kindNum)),
Limit: &limit,
})
sub, err := rl.Subscribe(ctx, ff)
if err != nil {
return fmt.Errorf("subscription error (may be expected): %w", err)
}
// Wait a bit for challenge to arrive
time.Sleep(500 * time.Millisecond)
sub.Unsub()
// Authenticate
if err = rl.Auth(ctx, authSigner); err != nil {
return fmt.Errorf("auth error: %w", err)
}
// Create and sign event
ev := &event.E{
CreatedAt: time.Now().Unix(),
Kind: kind.K{K: kindNum}.K,
Tags: tag.NewS(),
Content: []byte(fmt.Sprintf("test event kind %d", kindNum)),
}
// Add p tag for privileged check
pTag := tag.NewFromAny("p", hex.Enc(authSigner.Pub()))
ev.Tags.Append(pTag)
// Add d tag for addressable events (kinds 30000-39999)
if kindNum >= 30000 && kindNum < 40000 {
dTag := tag.NewFromAny("d", "test")
ev.Tags.Append(dTag)
}
if err = ev.Sign(eventSigner); err != nil {
return fmt.Errorf("sign error: %w", err)
}
// Publish
if err = rl.Publish(ctx, ev); err != nil {
return fmt.Errorf("publish error: %w", err)
}
return nil
}
func testWriteEventUnauthenticated(ctx context.Context, url string, kindNum uint16, eventSigner *p256k.Signer) error {
rl, err := ws.RelayConnect(ctx, url)
if err != nil {
return fmt.Errorf("connect error: %w", err)
}
defer rl.Close()
// Do NOT authenticate
// Create and sign event
ev := &event.E{
CreatedAt: time.Now().Unix(),
Kind: kind.K{K: kindNum}.K,
Tags: tag.NewS(),
Content: []byte(fmt.Sprintf("test event kind %d (unauthenticated)", kindNum)),
}
// Add d tag for addressable events (kinds 30000-39999)
if kindNum >= 30000 && kindNum < 40000 {
dTag := tag.NewFromAny("d", "test")
ev.Tags.Append(dTag)
}
if err = ev.Sign(eventSigner); err != nil {
return fmt.Errorf("sign error: %w", err)
}
// Publish (should fail)
if err = rl.Publish(ctx, ev); err != nil {
return fmt.Errorf("publish error (expected): %w", err)
}
return nil
}
func testReadEvent(ctx context.Context, url string, kindNum uint16, authSigner *p256k.Signer) error {
rl, err := ws.RelayConnect(ctx, url)
if err != nil {
return fmt.Errorf("connect error: %w", err)
}
defer rl.Close()
// Send a REQ first to trigger AUTH challenge (when AuthToWrite is enabled)
// Then authenticate
ff := filter.NewS(&filter.F{
Kinds: kind.NewS(kind.New(kindNum)),
})
sub, err := rl.Subscribe(ctx, ff)
if err != nil {
return fmt.Errorf("subscription error: %w", err)
}
// Wait a bit for challenge to arrive
time.Sleep(500 * time.Millisecond)
// Authenticate
if err = rl.Auth(ctx, authSigner); err != nil {
sub.Unsub()
return fmt.Errorf("auth error: %w", err)
}
// Wait for events or timeout
// If we receive any events, return nil (success)
// If we don't receive events, also return nil (no events found, which may be expected)
select {
case ev := <-sub.Events:
if ev != nil {
sub.Unsub()
return nil // Event received
}
case <-sub.EndOfStoredEvents:
// EOSE received, no more events
sub.Unsub()
return nil
case <-time.After(5 * time.Second):
// No events received - this might be OK if no events exist or they're filtered
sub.Unsub()
return nil
case <-ctx.Done():
sub.Unsub()
return ctx.Err()
}
return nil
}
func testReadEventUnauthenticated(ctx context.Context, url string, kindNum uint16) error {
rl, err := ws.RelayConnect(ctx, url)
if err != nil {
return fmt.Errorf("connect error: %w", err)
}
defer rl.Close()
// Do NOT authenticate
// Subscribe to events
ff := filter.NewS(&filter.F{
Kinds: kind.NewS(kind.New(kindNum)),
})
sub, err := rl.Subscribe(ctx, ff)
if err != nil {
return fmt.Errorf("subscription error (may be expected): %w", err)
}
defer sub.Unsub()
// Wait for events or timeout
// If we receive any events, that's a failure (should be blocked)
select {
case ev := <-sub.Events:
if ev != nil {
return fmt.Errorf("unexpected event received: should have been blocked by policy (event ID: %s)", hex.Enc(ev.ID))
}
case <-sub.EndOfStoredEvents:
// EOSE received, no events (this is expected for unauthenticated privileged events)
return nil
case <-time.After(5 * time.Second):
// No events received - this is expected for unauthenticated requests
return nil
case <-ctx.Done():
return ctx.Err()
}
return nil
}

2
go.mod
View File

@@ -4,9 +4,9 @@ go 1.25.0
require (
github.com/adrg/xdg v0.5.3
github.com/coder/websocket v1.8.14
github.com/davecgh/go-spew v1.1.1
github.com/dgraph-io/badger/v4 v4.8.0
github.com/gorilla/websocket v1.5.3
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0
github.com/klauspost/cpuid/v2 v2.3.0
github.com/pkg/profile v1.7.0

4
go.sum
View File

@@ -13,8 +13,6 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P
github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/chzyer/test v1.0.0/go.mod h1:2JlltgoNkt4TW/z9V/IzDdFaMTM2JPIi26O1pF38GC8=
github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g=
github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -45,6 +43,8 @@ github.com/google/pprof v0.0.0-20211214055906-6f57359322fd/go.mod h1:KgnwoLYCZ8I
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik=
github.com/google/pprof v0.0.0-20251007162407-5df77e3f7d1d h1:KJIErDwbSHjnp/SGzE5ed8Aol7JsKiI5X7yWKAtzhM0=
github.com/google/pprof v0.0.0-20251007162407-5df77e3f7d1d/go.mod h1:I6V7YzU0XDpsHqbsyrghnFZLO1gwK6NPTNvmetQIk9U=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=

View File

@@ -10,7 +10,7 @@ import (
"sync"
"time"
"github.com/coder/websocket"
"github.com/gorilla/websocket"
"lol.mleku.dev/chk"
"lol.mleku.dev/errorf"
"lol.mleku.dev/log"
@@ -396,12 +396,15 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) {
headers.Set("Origin", "https://orly.dev")
// Use proper WebSocket dial options
dialOptions := &websocket.DialOptions{
HTTPHeader: headers,
dialer := websocket.Dialer{
HandshakeTimeout: 10 * time.Second,
}
c, _, err := websocket.Dial(connCtx, u, dialOptions)
c, resp, err := dialer.DialContext(connCtx, u, headers)
cancel()
if resp != nil {
resp.Body.Close()
}
if err != nil {
log.W.F("follows syncer: dial %s failed: %v", u, err)
@@ -480,13 +483,12 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) {
req := reqenvelope.NewFrom([]byte(subID), ff)
reqBytes := req.Marshal(nil)
log.T.F("follows syncer: outbound REQ to %s: %s", u, string(reqBytes))
if err = c.Write(
ctx, websocket.MessageText, reqBytes,
); chk.E(err) {
c.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err = c.WriteMessage(websocket.TextMessage, reqBytes); chk.E(err) {
log.W.F(
"follows syncer: failed to send event REQ to %s: %v", u, err,
)
_ = c.Close(websocket.StatusInternalError, "write failed")
_ = c.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, "write failed"), time.Now().Add(time.Second))
continue
}
log.T.F(
@@ -501,11 +503,12 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) {
for {
select {
case <-ctx.Done():
_ = c.Close(websocket.StatusNormalClosure, "ctx done")
_ = c.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "ctx done"), time.Now().Add(time.Second))
return
case <-keepaliveTicker.C:
// Send ping to keep connection alive
if err := c.Ping(ctx); err != nil {
c.SetWriteDeadline(time.Now().Add(5 * time.Second))
if err := c.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)); err != nil {
log.T.F("follows syncer: ping failed for %s: %v", u, err)
break readLoop
}
@@ -513,11 +516,10 @@ func (f *Follows) startEventSubscriptions(ctx context.Context) {
continue
default:
// Set a read timeout to avoid hanging
readCtx, readCancel := context.WithTimeout(ctx, 60*time.Second)
_, data, err := c.Read(readCtx)
readCancel()
c.SetReadDeadline(time.Now().Add(60 * time.Second))
_, data, err := c.ReadMessage()
if err != nil {
_ = c.Close(websocket.StatusNormalClosure, "read err")
_ = c.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "read err"), time.Now().Add(time.Second))
break readLoop
}
label, rem, err := envelopes.Identify(data)
@@ -714,16 +716,19 @@ func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) {
headers.Set("Origin", "https://orly.dev")
// Use proper WebSocket dial options
dialOptions := &websocket.DialOptions{
HTTPHeader: headers,
dialer := websocket.Dialer{
HandshakeTimeout: 10 * time.Second,
}
c, _, err := websocket.Dial(ctx, relayURL, dialOptions)
c, resp, err := dialer.DialContext(ctx, relayURL, headers)
if resp != nil {
resp.Body.Close()
}
if err != nil {
log.W.F("follows syncer: failed to connect to %s for follow list fetch: %v", relayURL, err)
return
}
defer c.Close(websocket.StatusNormalClosure, "follow list fetch complete")
defer c.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "follow list fetch complete"), time.Now().Add(time.Second))
log.I.F("follows syncer: fetching follow lists from relay %s", relayURL)
@@ -746,7 +751,8 @@ func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) {
req := reqenvelope.NewFrom([]byte(subID), ff)
reqBytes := req.Marshal(nil)
log.T.F("follows syncer: outbound REQ to %s: %s", relayURL, string(reqBytes))
if err = c.Write(ctx, websocket.MessageText, reqBytes); chk.E(err) {
c.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err = c.WriteMessage(websocket.TextMessage, reqBytes); chk.E(err) {
log.W.F("follows syncer: failed to send follow list REQ to %s: %v", relayURL, err)
return
}
@@ -769,7 +775,8 @@ func (f *Follows) fetchFollowListsFromRelay(relayURL string, authors [][]byte) {
default:
}
_, data, err := c.Read(ctx)
c.SetReadDeadline(time.Now().Add(10 * time.Second))
_, data, err := c.ReadMessage()
if err != nil {
log.T.F("follows syncer: error reading events from %s: %v", relayURL, err)
goto processEvents

View File

@@ -208,18 +208,16 @@ func (f *F) Marshal(dst []byte) (b []byte) {
} else {
first = true
}
// append the key with # prefix
dst = append(dst, '"', '#', tKey[0], '"', ':')
dst = append(dst, '[')
for i, value := range values {
dst = append(dst, '"')
dst = append(dst, value...)
dst = append(dst, '"')
if i < len(values)-1 {
dst = append(dst, ',')
}
// append the key with # prefix
dst = append(dst, '"', '#', tKey[0], '"', ':')
dst = append(dst, '[')
for i, value := range values {
dst = text.AppendQuote(dst, value, text.NostrEscape)
if i < len(values)-1 {
dst = append(dst, ',')
}
dst = append(dst, ']')
}
dst = append(dst, ']')
}
}
if f.Since != nil && f.Since.U64() > 0 {

View File

@@ -144,6 +144,7 @@ func (t *T) Key() (key []byte) {
}
func (t *T) Value() (key []byte) {
if t==nil {return}
if len(t.T) > Value {
return t.T[Value]
}

View File

@@ -20,31 +20,50 @@ package text
// - A form feed, 0x0C, as \f
//
// UTF-8 should be used for encoding.
//
// NOTE: We also escape all other control characters (0x00-0x1F excluding those above)
// to ensure valid JSON, even though NIP-01 doesn't require it. This prevents
// JSON parsing errors when events with binary data in content are sent to relays.
func NostrEscape(dst, src []byte) []byte {
l := len(src)
for i := 0; i < l; i++ {
c := src[i]
switch {
case c == '"':
if c == '"' {
dst = append(dst, '\\', '"')
case c == '\\':
} else if c == '\\' {
// if i+1 < l && src[i+1] == 'u' || i+1 < l && src[i+1] == '/' {
if i+1 < l && src[i+1] == 'u' {
dst = append(dst, '\\')
} else {
dst = append(dst, '\\', '\\')
}
case c == '\b':
} else if c == '\b' {
dst = append(dst, '\\', 'b')
case c == '\t':
} else if c == '\t' {
dst = append(dst, '\\', 't')
case c == '\n':
} else if c == '\n' {
dst = append(dst, '\\', 'n')
case c == '\f':
} else if c == '\f' {
dst = append(dst, '\\', 'f')
case c == '\r':
} else if c == '\r' {
dst = append(dst, '\\', 'r')
default:
} else if c < 32 {
// Escape all other control characters (0x00-0x1F except those handled above) as \uXXXX
// This ensures valid JSON even when content contains binary data
dst = append(dst, '\\', 'u', '0', '0')
hexHigh := (c >> 4) & 0x0F
hexLow := c & 0x0F
if hexHigh < 10 {
dst = append(dst, byte('0'+hexHigh))
} else {
dst = append(dst, byte('a'+(hexHigh-10)))
}
if hexLow < 10 {
dst = append(dst, byte('0'+hexLow))
} else {
dst = append(dst, byte('a'+(hexLow-10)))
}
} else {
dst = append(dst, c)
}
}
@@ -91,14 +110,46 @@ func NostrUnescape(dst []byte) (b []byte) {
dst[w] = '\r'
w++
// special cases for non-nip-01 specified json escapes (must be
// preserved for ID generation).
case c == 'u':
dst[w] = '\\'
w++
dst[w] = 'u'
w++
case c == '/':
// special cases for non-nip-01 specified json escapes (must be
// preserved for ID generation).
case c == 'u':
// Check if this is a \u0000-\u001F sequence we generated
if r+4 < len(dst) && dst[r+1] == '0' && dst[r+2] == '0' {
// Extract hex digits
hexHigh := dst[r+3]
hexLow := dst[r+4]
var val byte
if hexHigh >= '0' && hexHigh <= '9' {
val = (hexHigh - '0') << 4
} else if hexHigh >= 'a' && hexHigh <= 'f' {
val = (hexHigh - 'a' + 10) << 4
} else if hexHigh >= 'A' && hexHigh <= 'F' {
val = (hexHigh - 'A' + 10) << 4
}
if hexLow >= '0' && hexLow <= '9' {
val |= hexLow - '0'
} else if hexLow >= 'a' && hexLow <= 'f' {
val |= hexLow - 'a' + 10
} else if hexLow >= 'A' && hexLow <= 'F' {
val |= hexLow - 'A' + 10
}
// Only decode if it's a control character (0x00-0x1F)
if val < 32 {
dst[w] = val
w++
r += 4 // Skip the u00XX part
continue
}
}
// Not our generated \u0000-\u001F, preserve as-is
dst[w] = '\\'
w++
dst[w] = 'u'
w++
case c == '/':
dst[w] = '\\'
w++
dst[w] = '/'

View File

@@ -8,20 +8,27 @@ import (
"testing"
"time"
"lol.mleku.dev/chk"
"next.orly.dev/pkg/crypto/p256k"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/hex"
"next.orly.dev/pkg/encoders/tag"
)
// Helper function to create test event
func createTestEventBench(id, pubkey, content string, kind uint16) *event.E {
return &event.E{
ID: []byte(id),
Kind: kind,
Pubkey: []byte(pubkey),
Content: []byte(content),
Tags: &tag.S{},
CreatedAt: time.Now().Unix(),
// Helper function to create test event for benchmarks (reuses signer)
func createTestEventBench(b *testing.B, signer *p256k.Signer, content string, kind uint16) *event.E {
ev := event.New()
ev.CreatedAt = time.Now().Unix()
ev.Kind = kind
ev.Content = []byte(content)
ev.Tags = tag.NewS()
// Sign the event properly
if err := ev.Sign(signer); chk.E(err) {
b.Fatalf("Failed to sign test event: %v", err)
}
return ev
}
func BenchmarkCheckKindsPolicy(b *testing.B) {
@@ -38,12 +45,13 @@ func BenchmarkCheckKindsPolicy(b *testing.B) {
}
func BenchmarkCheckRulePolicy(b *testing.B) {
// Create test event
testEvent := createTestEventBench("test-event-id", "test-pubkey", "test content", 1)
// Generate keypair once for all events
signer, pubkey := generateTestKeypairB(b)
testEvent := createTestEventBench(b, signer, "test content", 1)
rule := Rule{
Description: "test rule",
WriteAllow: []string{"test-pubkey"},
WriteAllow: []string{hex.Enc(pubkey)},
SizeLimit: int64Ptr(10000),
ContentLimit: int64Ptr(1000),
MustHaveTags: []string{"p"},
@@ -53,13 +61,14 @@ func BenchmarkCheckRulePolicy(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
policy.checkRulePolicy("write", testEvent, rule, []byte("test-pubkey"))
policy.checkRulePolicy("write", testEvent, rule, pubkey)
}
}
func BenchmarkCheckPolicy(b *testing.B) {
// Create test event
testEvent := createTestEventBench("test-event-id", "test-pubkey", "test content", 1)
// Generate keypair once for all events
signer, pubkey := generateTestKeypairB(b)
testEvent := createTestEventBench(b, signer, "test content", 1)
policy := &P{
Kind: Kinds{
@@ -68,14 +77,14 @@ func BenchmarkCheckPolicy(b *testing.B) {
Rules: map[int]Rule{
1: {
Description: "test rule",
WriteAllow: []string{"test-pubkey"},
WriteAllow: []string{hex.Enc(pubkey)},
},
},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
policy.CheckPolicy("write", testEvent, []byte("test-pubkey"), "127.0.0.1")
policy.CheckPolicy("write", testEvent, pubkey, "127.0.0.1")
}
}
@@ -114,8 +123,9 @@ done
// Give the script time to start
time.Sleep(100 * time.Millisecond)
// Create test event
testEvent := createTestEventBench("test-event-id", "test-pubkey", "test content", 1)
// Generate keypair once for all events
signer, pubkey := generateTestKeypairB(b)
testEvent := createTestEventBench(b, signer, "test content", 1)
policy := &P{
Manager: manager,
@@ -130,7 +140,7 @@ done
b.ResetTimer()
for i := 0; i < b.N; i++ {
policy.CheckPolicy("write", testEvent, []byte("test-pubkey"), "127.0.0.1")
policy.CheckPolicy("write", testEvent, pubkey, "127.0.0.1")
}
}
@@ -190,16 +200,19 @@ func BenchmarkCheckPolicyMultipleKinds(b *testing.B) {
Rules: rules,
}
// Generate keypair once for all events
signer, pubkey := generateTestKeypairB(b)
// Create test events with different kinds
events := make([]*event.E, 100)
for i := 0; i < 100; i++ {
events[i] = createTestEvent("test-event-id", "test-pubkey", "test content", uint16(i+1))
events[i] = createTestEventBench(b, signer, "test content", uint16(i+1))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
event := events[i%100]
policy.CheckPolicy("write", event, []byte("test-pubkey"), "127.0.0.1")
policy.CheckPolicy("write", event, pubkey, "127.0.0.1")
}
}
@@ -217,11 +230,13 @@ func BenchmarkCheckPolicyLargeWhitelist(b *testing.B) {
Rules: map[int]Rule{},
}
testEvent := createTestEvent("test-event-id", "test-pubkey", "test content", 500) // Kind in the middle of the whitelist
// Generate keypair once for all events
signer, pubkey := generateTestKeypairB(b)
testEvent := createTestEventBench(b, signer, "test content", 500) // Kind in the middle of the whitelist
b.ResetTimer()
for i := 0; i < b.N; i++ {
policy.CheckPolicy("write", testEvent, []byte("test-pubkey"), "127.0.0.1")
policy.CheckPolicy("write", testEvent, pubkey, "127.0.0.1")
}
}
@@ -239,22 +254,25 @@ func BenchmarkCheckPolicyLargeBlacklist(b *testing.B) {
Rules: map[int]Rule{},
}
testEvent := createTestEvent("test-event-id", "test-pubkey", "test content", 1500) // Kind not in blacklist
// Generate keypair once for all events
signer, pubkey := generateTestKeypairB(b)
testEvent := createTestEventBench(b, signer, "test content", 1500) // Kind not in blacklist
b.ResetTimer()
for i := 0; i < b.N; i++ {
policy.CheckPolicy("write", testEvent, []byte("test-pubkey"), "127.0.0.1")
policy.CheckPolicy("write", testEvent, pubkey, "127.0.0.1")
}
}
func BenchmarkCheckPolicyComplexRule(b *testing.B) {
// Create test event with many tags
testEvent := createTestEventBench("test-event-id", "test-pubkey", "test content", 1)
// Generate keypair once for all events
signer, pubkey := generateTestKeypairB(b)
testEvent := createTestEventBench(b, signer, "test content", 1)
// Add many tags
for i := 0; i < 100; i++ {
tagItem1 := tag.New()
tagItem1.T = append(tagItem1.T, []byte("p"), []byte("test-pubkey"))
tagItem1.T = append(tagItem1.T, []byte("p"), []byte(hex.Enc(pubkey)))
*testEvent.Tags = append(*testEvent.Tags, tagItem1)
tagItem2 := tag.New()
@@ -264,7 +282,7 @@ func BenchmarkCheckPolicyComplexRule(b *testing.B) {
rule := Rule{
Description: "complex rule",
WriteAllow: []string{"test-pubkey"},
WriteAllow: []string{hex.Enc(pubkey)},
SizeLimit: int64Ptr(100000),
ContentLimit: int64Ptr(10000),
MustHaveTags: []string{"p", "e"},
@@ -275,7 +293,7 @@ func BenchmarkCheckPolicyComplexRule(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
policy.checkRulePolicy("write", testEvent, rule, []byte("test-pubkey"))
policy.checkRulePolicy("write", testEvent, rule, pubkey)
}
}
@@ -294,11 +312,12 @@ func BenchmarkCheckPolicyLargeEvent(b *testing.B) {
},
}
// Create test event with large content
testEvent := createTestEvent("test-event-id", "test-pubkey", largeContent, 1)
// Generate keypair once for all events
signer, pubkey := generateTestKeypairB(b)
testEvent := createTestEventBench(b, signer, largeContent, 1)
b.ResetTimer()
for i := 0; i < b.N; i++ {
policy.CheckPolicy("write", testEvent, []byte("test-pubkey"), "127.0.0.1")
policy.CheckPolicy("write", testEvent, pubkey, "127.0.0.1")
}
}

View File

@@ -10,6 +10,7 @@ import (
"os"
"os/exec"
"path/filepath"
"runtime"
"sync"
"time"
@@ -131,11 +132,13 @@ type PolicyManager struct {
currentCancel context.CancelFunc
mutex sync.RWMutex
isRunning bool
isStarting bool
enabled bool
stdin io.WriteCloser
stdout io.ReadCloser
stderr io.ReadCloser
responseChan chan PolicyResponse
startupChan chan error
}
// P represents a complete policy configuration for a Nostr relay.
@@ -203,6 +206,7 @@ func NewWithManager(ctx context.Context, appName string, enabled bool) *P {
scriptPath: scriptPath,
enabled: enabled,
responseChan: make(chan PolicyResponse, 100), // Buffered channel for responses
startupChan: make(chan error, 1), // Channel for startup completion
}
// Load policy configuration from JSON file
@@ -279,8 +283,19 @@ func (p *P) CheckPolicy(access string, ev *event.E, loggedInPubkey []byte, ipAdd
}
// Check if script is present and enabled
if rule.Script != "" && p.Manager != nil && p.Manager.IsEnabled() {
return p.checkScriptPolicy(access, ev, rule.Script, loggedInPubkey, ipAddress)
if rule.Script != "" && p.Manager != nil {
if p.Manager.IsEnabled() {
return p.checkScriptPolicy(access, ev, rule.Script, loggedInPubkey, ipAddress)
}
// Script is configured but policy is disabled - use default policy if rule has no other restrictions
hasOtherRestrictions := len(rule.WriteAllow) > 0 || len(rule.WriteDeny) > 0 || len(rule.ReadAllow) > 0 || len(rule.ReadDeny) > 0 ||
rule.SizeLimit != nil || rule.ContentLimit != nil || len(rule.MustHaveTags) > 0 ||
rule.MaxExpiry != nil || rule.Privileged || rule.RateLimit != nil ||
rule.MaxAgeOfEvent != nil || rule.MaxAgeEventInFuture != nil
if !hasOtherRestrictions {
// No other restrictions, use default policy
return p.getDefaultPolicyAction(), nil
}
}
// Apply rule-based filtering
@@ -431,7 +446,12 @@ func (p *P) checkRulePolicy(access string, ev *event.E, rule Rule, loggedInPubke
pTags := ev.Tags.GetAll([]byte("p"))
found := false
for _, pTag := range pTags {
if bytes.Equal(pTag.Value(), loggedInPubkey) {
// pTag.Value() returns hex-encoded string; decode to bytes
pt, err := hex.Dec(string(pTag.Value()))
if err != nil {
continue
}
if bytes.Equal(pt, loggedInPubkey) {
found = true
break
}
@@ -447,12 +467,41 @@ func (p *P) checkRulePolicy(access string, ev *event.E, rule Rule, loggedInPubke
// checkScriptPolicy runs the policy script to determine if event should be allowed
func (p *P) checkScriptPolicy(access string, ev *event.E, scriptPath string, loggedInPubkey []byte, ipAddress string) (allowed bool, err error) {
if p.Manager == nil || !p.Manager.IsRunning() {
// If script is not running, fall back to default policy
log.W.F("policy rule for kind %d is inactive (script not running), falling back to default policy (%s)", ev.Kind, p.DefaultPolicy)
if p.Manager == nil {
return false, fmt.Errorf("policy manager is not initialized")
}
// If policy is disabled, fall back to default policy immediately
if !p.Manager.IsEnabled() {
log.W.F("policy rule for kind %d is inactive (policy disabled), falling back to default policy (%s)", ev.Kind, p.DefaultPolicy)
return p.getDefaultPolicyAction(), nil
}
// Policy is enabled, check if it's running
if !p.Manager.IsRunning() {
// Check if script file exists
if _, err := os.Stat(p.Manager.GetScriptPath()); os.IsNotExist(err) {
// Script doesn't exist, this is a fatal error
buf := make([]byte, 1024*1024)
n := runtime.Stack(buf, true)
log.E.F("policy script does not exist at %s", p.Manager.GetScriptPath())
fmt.Fprintf(os.Stderr, "FATAL: Policy script required but not found at %s\n", p.Manager.GetScriptPath())
fmt.Fprintf(os.Stderr, "Stack trace:\n%s\n", buf[:n])
os.Exit(1)
}
// Try to start the policy and wait for it
if err := p.Manager.ensureRunning(); err != nil {
// Startup failed, this is a fatal error
buf := make([]byte, 1024*1024)
n := runtime.Stack(buf, true)
log.E.F("failed to start policy script: %v", err)
fmt.Fprintf(os.Stderr, "FATAL: Failed to start policy script: %v\n", err)
fmt.Fprintf(os.Stderr, "Stack trace:\n%s\n", buf[:n])
os.Exit(1)
}
}
// Create policy event with additional context
policyEvent := &PolicyEvent{
E: ev,
@@ -530,6 +579,91 @@ func (pm *PolicyManager) startPolicyIfExists() {
}
}
// ensureRunning ensures the policy is running, starting it if necessary.
// It waits for startup to complete with a timeout and returns an error if startup fails.
func (pm *PolicyManager) ensureRunning() error {
pm.mutex.Lock()
// Check if already running
if pm.isRunning {
pm.mutex.Unlock()
return nil
}
// Check if already starting
if pm.isStarting {
pm.mutex.Unlock()
// Wait for startup to complete
select {
case err := <-pm.startupChan:
if err != nil {
return fmt.Errorf("policy startup failed: %v", err)
}
// Double-check it's actually running after receiving signal
pm.mutex.RLock()
running := pm.isRunning
pm.mutex.RUnlock()
if !running {
return fmt.Errorf("policy startup completed but process is not running")
}
return nil
case <-time.After(10 * time.Second):
return fmt.Errorf("policy startup timeout")
case <-pm.ctx.Done():
return fmt.Errorf("policy context cancelled")
}
}
// Mark as starting
pm.isStarting = true
pm.mutex.Unlock()
// Start the policy in a goroutine
go func() {
err := pm.StartPolicy()
pm.mutex.Lock()
pm.isStarting = false
pm.mutex.Unlock()
// Signal startup completion (non-blocking)
// Drain any stale value first, then send
select {
case <-pm.startupChan:
default:
}
select {
case pm.startupChan <- err:
default:
// Channel should be empty now, but if it's full, try again
pm.startupChan <- err
}
}()
// Wait for startup to complete
select {
case err := <-pm.startupChan:
if err != nil {
return fmt.Errorf("policy startup failed: %v", err)
}
// Double-check it's actually running after receiving signal
pm.mutex.RLock()
running := pm.isRunning
pm.mutex.RUnlock()
if !running {
return fmt.Errorf("policy startup completed but process is not running")
}
return nil
case <-time.After(10 * time.Second):
pm.mutex.Lock()
pm.isStarting = false
pm.mutex.Unlock()
return fmt.Errorf("policy startup timeout")
case <-pm.ctx.Done():
pm.mutex.Lock()
pm.isStarting = false
pm.mutex.Unlock()
return fmt.Errorf("policy context cancelled")
}
}
// StartPolicy starts the policy script process.
// Returns an error if the script doesn't exist, can't be executed, or is already running.
func (pm *PolicyManager) StartPolicy() error {
@@ -795,6 +929,11 @@ func (pm *PolicyManager) IsRunning() bool {
return pm.isRunning
}
// GetScriptPath returns the path to the policy script.
func (pm *PolicyManager) GetScriptPath() string {
return pm.scriptPath
}
// Shutdown gracefully shuts down the policy manager.
// It cancels the context and stops any running policy script.
func (pm *PolicyManager) Shutdown() {

View File

@@ -0,0 +1,516 @@
package policy
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"testing"
"time"
"lol.mleku.dev/chk"
"next.orly.dev/pkg/crypto/p256k"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/hex"
"next.orly.dev/pkg/encoders/kind"
"next.orly.dev/pkg/encoders/tag"
)
// TestPolicyIntegration runs the relay with policy enabled and tests event filtering
func TestPolicyIntegration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
// Generate test keys
allowedSigner := &p256k.Signer{}
if err := allowedSigner.Generate(); chk.E(err) {
t.Fatalf("Failed to generate allowed signer: %v", err)
}
allowedPubkeyHex := hex.Enc(allowedSigner.Pub())
unauthorizedSigner := &p256k.Signer{}
if err := unauthorizedSigner.Generate(); chk.E(err) {
t.Fatalf("Failed to generate unauthorized signer: %v", err)
}
// Create temporary directory for policy config
tempDir := t.TempDir()
configDir := filepath.Join(tempDir, "ORLY_TEST")
if err := os.MkdirAll(configDir, 0755); chk.E(err) {
t.Fatalf("Failed to create config directory: %v", err)
}
// Create policy JSON with generated keys
policyJSON := map[string]interface{}{
"kind": map[string]interface{}{
"whitelist": []int{4678, 10306, 30520, 30919},
},
"rules": map[string]interface{}{
"4678": map[string]interface{}{
"description": "Zenotp message events",
"script": filepath.Join(configDir, "validate4678.js"), // Won't exist, should fall back to default
"privileged": true,
},
"10306": map[string]interface{}{
"description": "End user whitelist changes",
"read_allow": []string{allowedPubkeyHex},
"privileged": true,
},
"30520": map[string]interface{}{
"description": "Zenotp events",
"write_allow": []string{allowedPubkeyHex},
"privileged": true,
},
"30919": map[string]interface{}{
"description": "Zenotp events",
"write_allow": []string{allowedPubkeyHex},
"privileged": true,
},
},
}
policyJSONBytes, err := json.MarshalIndent(policyJSON, "", " ")
if err != nil {
t.Fatalf("Failed to marshal policy JSON: %v", err)
}
policyPath := filepath.Join(configDir, "policy.json")
if err := os.WriteFile(policyPath, policyJSONBytes, 0644); chk.E(err) {
t.Fatalf("Failed to write policy file: %v", err)
}
// Create events with proper signatures
// Event 1: Kind 30520 with allowed pubkey (should be allowed)
event30520Allowed := event.New()
event30520Allowed.CreatedAt = time.Now().Unix()
event30520Allowed.Kind = kind.K{K: 30520}.K
event30520Allowed.Content = []byte("test event 30520")
event30520Allowed.Tags = tag.NewS()
addPTag(event30520Allowed, allowedSigner.Pub()) // Add p tag for privileged check
if err := event30520Allowed.Sign(allowedSigner); chk.E(err) {
t.Fatalf("Failed to sign event30520Allowed: %v", err)
}
// Event 2: Kind 30520 with unauthorized pubkey (should be denied)
event30520Unauthorized := event.New()
event30520Unauthorized.CreatedAt = time.Now().Unix()
event30520Unauthorized.Kind = kind.K{K: 30520}.K
event30520Unauthorized.Content = []byte("test event 30520 unauthorized")
event30520Unauthorized.Tags = tag.NewS()
if err := event30520Unauthorized.Sign(unauthorizedSigner); chk.E(err) {
t.Fatalf("Failed to sign event30520Unauthorized: %v", err)
}
// Event 3: Kind 10306 with allowed pubkey (should be readable by allowed user)
event10306Allowed := event.New()
event10306Allowed.CreatedAt = time.Now().Unix()
event10306Allowed.Kind = kind.K{K: 10306}.K
event10306Allowed.Content = []byte("test event 10306")
event10306Allowed.Tags = tag.NewS()
addPTag(event10306Allowed, allowedSigner.Pub()) // Add p tag for privileged check
if err := event10306Allowed.Sign(allowedSigner); chk.E(err) {
t.Fatalf("Failed to sign event10306Allowed: %v", err)
}
// Event 4: Kind 4678 with allowed pubkey (script-based, should fall back to default)
event4678Allowed := event.New()
event4678Allowed.CreatedAt = time.Now().Unix()
event4678Allowed.Kind = kind.K{K: 4678}.K
event4678Allowed.Content = []byte("test event 4678")
event4678Allowed.Tags = tag.NewS()
addPTag(event4678Allowed, allowedSigner.Pub()) // Add p tag for privileged check
if err := event4678Allowed.Sign(allowedSigner); chk.E(err) {
t.Fatalf("Failed to sign event4678Allowed: %v", err)
}
// Test policy loading
policy, err := New(policyJSONBytes)
if err != nil {
t.Fatalf("Failed to create policy: %v", err)
}
// Verify policy loaded correctly
if len(policy.Rules) != 4 {
t.Errorf("Expected 4 rules, got %d", len(policy.Rules))
}
// Test policy checks directly
t.Run("policy checks", func(t *testing.T) {
// Test 1: Event 30520 with allowed pubkey should be allowed
allowed, err := policy.CheckPolicy("write", event30520Allowed, allowedSigner.Pub(), "127.0.0.1")
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if !allowed {
t.Error("Expected event30520Allowed to be allowed")
}
// Test 2: Event 30520 with unauthorized pubkey should be denied
allowed, err = policy.CheckPolicy("write", event30520Unauthorized, unauthorizedSigner.Pub(), "127.0.0.1")
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if allowed {
t.Error("Expected event30520Unauthorized to be denied")
}
// Test 3: Event 10306 should be readable by allowed user
allowed, err = policy.CheckPolicy("read", event10306Allowed, allowedSigner.Pub(), "127.0.0.1")
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if !allowed {
t.Error("Expected event10306Allowed to be readable by allowed user")
}
// Test 4: Event 10306 should NOT be readable by unauthorized user
allowed, err = policy.CheckPolicy("read", event10306Allowed, unauthorizedSigner.Pub(), "127.0.0.1")
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if allowed {
t.Error("Expected event10306Allowed to be denied for unauthorized user")
}
// Test 5: Event 10306 should NOT be readable without authentication
allowed, err = policy.CheckPolicy("read", event10306Allowed, nil, "127.0.0.1")
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if allowed {
t.Error("Expected event10306Allowed to be denied without authentication (privileged)")
}
// Test 6: Event 30520 should NOT be writable without authentication
allowed, err = policy.CheckPolicy("write", event30520Allowed, nil, "127.0.0.1")
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if allowed {
t.Error("Expected event30520Allowed to be denied without authentication (privileged)")
}
// Test 7: Event 4678 should fall back to default policy (allow) when script not running
allowed, err = policy.CheckPolicy("write", event4678Allowed, allowedSigner.Pub(), "127.0.0.1")
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if !allowed {
t.Error("Expected event4678Allowed to be allowed when script not running (falls back to default)")
}
// Test 8: Event 4678 should be denied without authentication (privileged check)
allowed, err = policy.CheckPolicy("write", event4678Allowed, nil, "127.0.0.1")
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if allowed {
t.Error("Expected event4678Allowed to be denied without authentication (privileged)")
}
})
// Test with relay simulation (checking log output)
t.Run("relay simulation", func(t *testing.T) {
// Note: We can't easily capture log output in tests, so we just verify
// that policy checks work correctly
// Simulate policy checks that would happen in relay
// First, publish events (simulate write checks)
checks := []struct {
name string
event *event.E
loggedInPubkey []byte
access string
shouldAllow bool
shouldLog string // Expected log message substring, empty means no specific log expected
}{
{
name: "write 30520 with allowed pubkey",
event: event30520Allowed,
loggedInPubkey: allowedSigner.Pub(),
access: "write",
shouldAllow: true,
},
{
name: "write 30520 with unauthorized pubkey",
event: event30520Unauthorized,
loggedInPubkey: unauthorizedSigner.Pub(),
access: "write",
shouldAllow: false,
},
{
name: "read 10306 with allowed pubkey",
event: event10306Allowed,
loggedInPubkey: allowedSigner.Pub(),
access: "read",
shouldAllow: true,
},
{
name: "read 10306 with unauthorized pubkey",
event: event10306Allowed,
loggedInPubkey: unauthorizedSigner.Pub(),
access: "read",
shouldAllow: false,
},
{
name: "read 10306 without authentication",
event: event10306Allowed,
loggedInPubkey: nil,
access: "read",
shouldAllow: false,
},
{
name: "write 30520 without authentication",
event: event30520Allowed,
loggedInPubkey: nil,
access: "write",
shouldAllow: false,
},
{
name: "write 4678 with allowed pubkey",
event: event4678Allowed,
loggedInPubkey: allowedSigner.Pub(),
access: "write",
shouldAllow: true,
shouldLog: "", // Should not log "policy rule is inactive" if script is not configured
},
}
for _, check := range checks {
t.Run(check.name, func(t *testing.T) {
allowed, err := policy.CheckPolicy(check.access, check.event, check.loggedInPubkey, "127.0.0.1")
if err != nil {
t.Errorf("Unexpected error: %v", err)
return
}
if allowed != check.shouldAllow {
t.Errorf("Expected allowed=%v, got %v", check.shouldAllow, allowed)
}
})
}
})
// Test event IDs are regenerated correctly after signing
t.Run("event ID regeneration", func(t *testing.T) {
// Create a new event, sign it, then verify ID is correct
testEvent := event.New()
testEvent.CreatedAt = time.Now().Unix()
testEvent.Kind = kind.K{K: 30520}.K
testEvent.Content = []byte("test content")
testEvent.Tags = tag.NewS()
// Sign the event
if err := testEvent.Sign(allowedSigner); chk.E(err) {
t.Fatalf("Failed to sign test event: %v", err)
}
// Verify event ID is correct (should be SHA256 of serialized event)
if len(testEvent.ID) != 32 {
t.Errorf("Expected event ID to be 32 bytes, got %d", len(testEvent.ID))
}
// Verify signature is correct
if len(testEvent.Sig) != 64 {
t.Errorf("Expected event signature to be 64 bytes, got %d", len(testEvent.Sig))
}
// Verify signature validates using event's Verify method
valid, err := testEvent.Verify()
if err != nil {
t.Errorf("Failed to verify signature: %v", err)
}
if !valid {
t.Error("Event signature verification failed")
}
})
// Test WebSocket client simulation (for future integration)
t.Run("websocket client simulation", func(t *testing.T) {
// This test simulates what would happen if we connected via WebSocket
// For now, we'll just verify the events can be serialized correctly
events := []*event.E{
event30520Allowed,
event30520Unauthorized,
event10306Allowed,
event4678Allowed,
}
for i, ev := range events {
t.Run(fmt.Sprintf("event_%d", i), func(t *testing.T) {
// Serialize event
serialized := ev.Serialize()
if len(serialized) == 0 {
t.Error("Event serialization returned empty")
}
// Verify event can be parsed back (simplified check)
if len(ev.ID) != 32 {
t.Errorf("Event ID length incorrect: %d", len(ev.ID))
}
if len(ev.Pubkey) != 32 {
t.Errorf("Event pubkey length incorrect: %d", len(ev.Pubkey))
}
if len(ev.Sig) != 64 {
t.Errorf("Event signature length incorrect: %d", len(ev.Sig))
}
})
}
})
}
// TestPolicyWithRelay creates a comprehensive test that simulates relay behavior
func TestPolicyWithRelay(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
// Generate keys
allowedSigner := &p256k.Signer{}
if err := allowedSigner.Generate(); chk.E(err) {
t.Fatalf("Failed to generate allowed signer: %v", err)
}
allowedPubkeyHex := hex.Enc(allowedSigner.Pub())
unauthorizedSigner := &p256k.Signer{}
if err := unauthorizedSigner.Generate(); chk.E(err) {
t.Fatalf("Failed to generate unauthorized signer: %v", err)
}
// Create policy JSON
policyJSON := map[string]interface{}{
"kind": map[string]interface{}{
"whitelist": []int{4678, 10306, 30520, 30919},
},
"rules": map[string]interface{}{
"10306": map[string]interface{}{
"description": "End user whitelist changes",
"read_allow": []string{allowedPubkeyHex},
"privileged": true,
},
"30520": map[string]interface{}{
"description": "Zenotp events",
"write_allow": []string{allowedPubkeyHex},
"privileged": true,
},
"30919": map[string]interface{}{
"description": "Zenotp events",
"write_allow": []string{allowedPubkeyHex},
"privileged": true,
},
},
}
policyJSONBytes, err := json.Marshal(policyJSON)
if err != nil {
t.Fatalf("Failed to marshal policy JSON: %v", err)
}
policy, err := New(policyJSONBytes)
if err != nil {
t.Fatalf("Failed to create policy: %v", err)
}
// Create test event (kind 30520) with allowed pubkey
testEvent := event.New()
testEvent.CreatedAt = time.Now().Unix()
testEvent.Kind = kind.K{K: 30520}.K
testEvent.Content = []byte("test content")
testEvent.Tags = tag.NewS()
addPTag(testEvent, allowedSigner.Pub())
if err := testEvent.Sign(allowedSigner); chk.E(err) {
t.Fatalf("Failed to sign test event: %v", err)
}
// Test scenarios
scenarios := []struct {
name string
loggedInPubkey []byte
expectedResult bool
description string
}{
{
name: "authenticated as allowed pubkey",
loggedInPubkey: allowedSigner.Pub(),
expectedResult: true,
description: "Should allow when authenticated as allowed pubkey",
},
{
name: "unauthenticated",
loggedInPubkey: nil,
expectedResult: false,
description: "Should deny when not authenticated (privileged check)",
},
{
name: "authenticated as different pubkey",
loggedInPubkey: unauthorizedSigner.Pub(),
expectedResult: false,
description: "Should deny when authenticated as different pubkey",
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
allowed, err := policy.CheckPolicy("write", testEvent, scenario.loggedInPubkey, "127.0.0.1")
if err != nil {
t.Errorf("Unexpected error: %v", err)
return
}
if allowed != scenario.expectedResult {
t.Errorf("%s: Expected allowed=%v, got %v", scenario.description, scenario.expectedResult, allowed)
}
})
}
// Test read access for kind 10306
readEvent := event.New()
readEvent.CreatedAt = time.Now().Unix()
readEvent.Kind = kind.K{K: 10306}.K
readEvent.Content = []byte("test read event")
readEvent.Tags = tag.NewS()
addPTag(readEvent, allowedSigner.Pub())
if err := readEvent.Sign(allowedSigner); chk.E(err) {
t.Fatalf("Failed to sign read event: %v", err)
}
readScenarios := []struct {
name string
loggedInPubkey []byte
expectedResult bool
description string
}{
{
name: "read authenticated as allowed pubkey",
loggedInPubkey: allowedSigner.Pub(),
expectedResult: true,
description: "Should allow read when authenticated as allowed pubkey",
},
{
name: "read unauthenticated",
loggedInPubkey: nil,
expectedResult: false,
description: "Should deny read when not authenticated (privileged check)",
},
{
name: "read authenticated as different pubkey",
loggedInPubkey: unauthorizedSigner.Pub(),
expectedResult: false,
description: "Should deny read when authenticated as different pubkey",
},
}
for _, scenario := range readScenarios {
t.Run(scenario.name, func(t *testing.T) {
allowed, err := policy.CheckPolicy("read", readEvent, scenario.loggedInPubkey, "127.0.0.1")
if err != nil {
t.Errorf("Unexpected error: %v", err)
return
}
if allowed != scenario.expectedResult {
t.Errorf("%s: Expected allowed=%v, got %v", scenario.description, scenario.expectedResult, allowed)
}
})
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -307,7 +307,7 @@ func (r *Client) ConnectWithTLS(
if r.notices != nil {
r.notices <- env.Message
} else {
log.E.F("NOTICE from %s: '%s'\n", r.URL, env.Message)
log.E.F("NOTICE from %s: '%s'", r.URL, env.Message)
}
case authenvelope.L:
env := authenvelope.NewChallenge()

View File

@@ -3,21 +3,19 @@ package ws
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net/http"
"time"
"github.com/gorilla/websocket"
"lol.mleku.dev/errorf"
"next.orly.dev/pkg/utils/units"
ws "github.com/coder/websocket"
)
// Connection represents a websocket connection to a Nostr relay.
type Connection struct {
conn *ws.Conn
conn *websocket.Conn
}
// NewConnection creates a new websocket connection to a Nostr relay.
@@ -25,10 +23,23 @@ func NewConnection(
ctx context.Context, url string, reqHeader http.Header,
tlsConfig *tls.Config,
) (c *Connection, err error) {
var conn *ws.Conn
if conn, _, err = ws.Dial(
ctx, url, getConnectionOptions(reqHeader, tlsConfig),
); err != nil {
var conn *websocket.Conn
var resp *http.Response
dialer := getConnectionOptions(reqHeader, tlsConfig)
// Prepare headers with default User-Agent if not present
headers := reqHeader
if headers == nil {
headers = make(http.Header)
}
if headers.Get("User-Agent") == "" {
headers.Set("User-Agent", "github.com/nbd-wtf/go-nostr")
}
if conn, resp, err = dialer.DialContext(ctx, url, headers); err != nil {
if resp != nil {
resp.Body.Close()
}
return
}
conn.SetReadLimit(33 * units.Mb)
@@ -41,7 +52,14 @@ func NewConnection(
func (c *Connection) WriteMessage(
ctx context.Context, data []byte,
) (err error) {
if err = c.conn.Write(ctx, ws.MessageText, data); err != nil {
deadline := time.Now().Add(10 * time.Second)
if ctx != nil {
if d, ok := ctx.Deadline(); ok {
deadline = d
}
}
c.conn.SetWriteDeadline(deadline)
if err = c.conn.WriteMessage(websocket.TextMessage, data); err != nil {
err = errorf.E("failed to write message: %w", err)
return
}
@@ -52,11 +70,22 @@ func (c *Connection) WriteMessage(
func (c *Connection) ReadMessage(
ctx context.Context, buf io.Writer,
) (err error) {
var reader io.Reader
if _, reader, err = c.conn.Reader(ctx); err != nil {
deadline := time.Now().Add(60 * time.Second)
if ctx != nil {
if d, ok := ctx.Deadline(); ok {
deadline = d
}
}
c.conn.SetReadDeadline(deadline)
messageType, reader, err := c.conn.NextReader()
if err != nil {
err = fmt.Errorf("failed to get reader: %w", err)
return
}
if messageType != websocket.TextMessage && messageType != websocket.BinaryMessage {
err = fmt.Errorf("unexpected message type: %d", messageType)
return
}
if _, err = io.Copy(buf, reader); err != nil {
err = fmt.Errorf("failed to read message: %w", err)
return
@@ -66,14 +95,18 @@ func (c *Connection) ReadMessage(
// Close closes the websocket connection.
func (c *Connection) Close() error {
return c.conn.Close(ws.StatusNormalClosure, "")
c.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second))
return c.conn.Close()
}
// Ping sends a ping message to the websocket connection.
func (c *Connection) Ping(ctx context.Context) error {
ctx, cancel := context.WithTimeoutCause(
ctx, time.Millisecond*800, errors.New("ping took too long"),
)
defer cancel()
return c.conn.Ping(ctx)
deadline := time.Now().Add(800 * time.Millisecond)
if ctx != nil {
if d, ok := ctx.Deadline(); ok {
deadline = d
}
}
c.conn.SetWriteDeadline(deadline)
return c.conn.WriteControl(websocket.PingMessage, []byte{}, deadline)
}

View File

@@ -5,32 +5,21 @@ package ws
import (
"crypto/tls"
"net/http"
"net/textproto"
"time"
ws "github.com/coder/websocket"
"github.com/gorilla/websocket"
)
var defaultConnectionOptions = &ws.DialOptions{
CompressionMode: ws.CompressionContextTakeover,
HTTPHeader: http.Header{
textproto.CanonicalMIMEHeaderKey("User-Agent"): {"github.com/nbd-wtf/go-nostr"},
},
}
func getConnectionOptions(
requestHeader http.Header, tlsConfig *tls.Config,
) *ws.DialOptions {
if requestHeader == nil && tlsConfig == nil {
return defaultConnectionOptions
}
return &ws.DialOptions{
HTTPHeader: requestHeader,
CompressionMode: ws.CompressionContextTakeover,
HTTPClient: &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
},
) *websocket.Dialer {
dialer := &websocket.Dialer{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
TLSClientConfig: tlsConfig,
HandshakeTimeout: 10 * time.Second,
}
// Headers are passed directly to DialContext, not set on Dialer
// The User-Agent header will be set when calling DialContext if not present
return dialer
}

200
pkg/run/run.go Normal file
View File

@@ -0,0 +1,200 @@
package run
import (
"bytes"
"context"
"io"
"os"
"path/filepath"
"strings"
"sync"
"github.com/adrg/xdg"
"lol.mleku.dev/chk"
lol "lol.mleku.dev"
"next.orly.dev/app"
"next.orly.dev/app/config"
"next.orly.dev/pkg/acl"
"next.orly.dev/pkg/database"
)
// Options configures relay startup behavior.
type Options struct {
// CleanupDataDir controls whether the data directory is deleted on Stop().
// Defaults to true. Set to false to preserve the data directory.
CleanupDataDir *bool
// StdoutWriter is an optional writer to receive stdout logs.
// If nil, stdout will be captured to a buffer accessible via Relay.Stdout().
StdoutWriter io.Writer
// StderrWriter is an optional writer to receive stderr logs.
// If nil, stderr will be captured to a buffer accessible via Relay.Stderr().
StderrWriter io.Writer
}
// Relay represents a running relay instance that can be started and stopped.
type Relay struct {
ctx context.Context
cancel context.CancelFunc
db *database.D
quit chan struct{}
dataDir string
cleanupDataDir bool
// Log capture
stdoutBuf *bytes.Buffer
stderrBuf *bytes.Buffer
stdoutWriter io.Writer
stderrWriter io.Writer
logMu sync.RWMutex
}
// Start initializes and starts a relay with the given configuration.
// It bypasses the configuration loading step and uses the provided config directly.
//
// Parameters:
// - cfg: The configuration to use for the relay
// - opts: Optional configuration for relay behavior. If nil, defaults are used.
//
// Returns:
// - relay: A Relay instance that can be used to stop the relay
// - err: An error if initialization or startup fails
func Start(cfg *config.C, opts *Options) (relay *Relay, err error) {
relay = &Relay{
cleanupDataDir: true,
}
// Apply options
var userStdoutWriter, userStderrWriter io.Writer
if opts != nil {
if opts.CleanupDataDir != nil {
relay.cleanupDataDir = *opts.CleanupDataDir
}
userStdoutWriter = opts.StdoutWriter
userStderrWriter = opts.StderrWriter
}
// Set up log capture buffers
relay.stdoutBuf = &bytes.Buffer{}
relay.stderrBuf = &bytes.Buffer{}
// Build writers list for stdout
stdoutWriters := []io.Writer{relay.stdoutBuf}
if userStdoutWriter != nil {
stdoutWriters = append(stdoutWriters, userStdoutWriter)
}
stdoutWriters = append(stdoutWriters, os.Stdout)
relay.stdoutWriter = io.MultiWriter(stdoutWriters...)
// Build writers list for stderr
stderrWriters := []io.Writer{relay.stderrBuf}
if userStderrWriter != nil {
stderrWriters = append(stderrWriters, userStderrWriter)
}
stderrWriters = append(stderrWriters, os.Stderr)
relay.stderrWriter = io.MultiWriter(stderrWriters...)
// Set up logging - write to appropriate destination and capture
if cfg.LogToStdout {
lol.Writer = relay.stdoutWriter
} else {
lol.Writer = relay.stderrWriter
}
lol.SetLogLevel(cfg.LogLevel)
// Expand DataDir if needed
if cfg.DataDir == "" || strings.Contains(cfg.DataDir, "~") {
cfg.DataDir = filepath.Join(xdg.DataHome, cfg.AppName)
}
relay.dataDir = cfg.DataDir
// Create context
relay.ctx, relay.cancel = context.WithCancel(context.Background())
// Initialize database
if relay.db, err = database.New(
relay.ctx, relay.cancel, cfg.DataDir, cfg.DBLogLevel,
); chk.E(err) {
return
}
// Configure ACL
acl.Registry.Active.Store(cfg.ACLMode)
if err = acl.Registry.Configure(cfg, relay.db, relay.ctx); chk.E(err) {
return
}
acl.Registry.Syncer()
// Start the relay
relay.quit = app.Run(relay.ctx, cfg, relay.db)
return
}
// Stop gracefully stops the relay by canceling the context and closing the database.
// If CleanupDataDir is enabled (default), it also removes the data directory.
//
// Returns:
// - err: An error if shutdown fails
func (r *Relay) Stop() (err error) {
if r.cancel != nil {
r.cancel()
}
if r.quit != nil {
<-r.quit
}
if r.db != nil {
err = r.db.Close()
}
// Clean up data directory if enabled
if r.cleanupDataDir && r.dataDir != "" {
if rmErr := os.RemoveAll(r.dataDir); rmErr != nil {
if err == nil {
err = rmErr
}
}
}
return
}
// Stdout returns the complete stdout log buffer contents.
func (r *Relay) Stdout() string {
r.logMu.RLock()
defer r.logMu.RUnlock()
if r.stdoutBuf == nil {
return ""
}
return r.stdoutBuf.String()
}
// Stderr returns the complete stderr log buffer contents.
func (r *Relay) Stderr() string {
r.logMu.RLock()
defer r.logMu.RUnlock()
if r.stderrBuf == nil {
return ""
}
return r.stderrBuf.String()
}
// StdoutBytes returns the complete stdout log buffer as bytes.
func (r *Relay) StdoutBytes() []byte {
r.logMu.RLock()
defer r.logMu.RUnlock()
if r.stdoutBuf == nil {
return nil
}
return r.stdoutBuf.Bytes()
}
// StderrBytes returns the complete stderr log buffer as bytes.
func (r *Relay) StderrBytes() []byte {
r.logMu.RLock()
defer r.logMu.RUnlock()
if r.stderrBuf == nil {
return nil
}
return r.stderrBuf.Bytes()
}

View File

@@ -2,7 +2,6 @@ package spider
import (
"context"
"encoding/hex"
"fmt"
"sync"
"time"
@@ -12,6 +11,7 @@ import (
"lol.mleku.dev/log"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/encoders/filter"
"next.orly.dev/pkg/encoders/hex"
"next.orly.dev/pkg/encoders/tag"
"next.orly.dev/pkg/encoders/timestamp"
"next.orly.dev/pkg/interfaces/publisher"
@@ -27,6 +27,8 @@ const (
ReconnectDelay = 5 * time.Second
// MaxReconnectDelay is the maximum delay between reconnection attempts
MaxReconnectDelay = 5 * time.Minute
// BlackoutPeriod is the duration to blacklist a relay after MaxReconnectDelay is reached
BlackoutPeriod = 24 * time.Hour
)
// Spider manages connections to admin relays and syncs events for followed pubkeys
@@ -64,8 +66,12 @@ type RelayConnection struct {
subscriptions map[string]*BatchSubscription
// Disconnection tracking
lastDisconnect time.Time
reconnectDelay time.Duration
lastDisconnect time.Time
reconnectDelay time.Duration
connectionStartTime time.Time
// Blackout tracking for IP filters
blackoutUntil time.Time
}
// BatchSubscription represents a subscription for a batch of pubkeys
@@ -261,6 +267,20 @@ func (rc *RelayConnection) manage(followList [][]byte) {
default:
}
// Check if relay is blacked out
if rc.isBlackedOut() {
log.D.F("spider: %s is blacked out until %v", rc.url, rc.blackoutUntil)
select {
case <-rc.ctx.Done():
return
case <-time.After(time.Until(rc.blackoutUntil)):
// Blackout expired, reset delay and try again
rc.reconnectDelay = ReconnectDelay
log.I.F("spider: blackout period ended for %s, retrying", rc.url)
}
continue
}
// Attempt to connect
if err := rc.connect(); chk.E(err) {
log.W.F("spider: failed to connect to %s: %v", rc.url, err)
@@ -269,7 +289,9 @@ func (rc *RelayConnection) manage(followList [][]byte) {
}
log.I.F("spider: connected to %s", rc.url)
rc.connectionStartTime = time.Now()
rc.reconnectDelay = ReconnectDelay // Reset delay on successful connection
rc.blackoutUntil = time.Time{} // Clear blackout on successful connection
// Create subscriptions for follow list
rc.createSubscriptions(followList)
@@ -278,6 +300,19 @@ func (rc *RelayConnection) manage(followList [][]byte) {
<-rc.client.Context().Done()
log.W.F("spider: disconnected from %s: %v", rc.url, rc.client.ConnectionCause())
// Check if disconnection happened very quickly (likely IP filter)
connectionDuration := time.Since(rc.connectionStartTime)
const quickDisconnectThreshold = 30 * time.Second
if connectionDuration < quickDisconnectThreshold {
log.W.F("spider: quick disconnection from %s after %v (likely IP filter)", rc.url, connectionDuration)
// Don't reset the delay, keep the backoff
rc.waitBeforeReconnect()
} else {
// Normal disconnection, reset backoff for future connections
rc.reconnectDelay = ReconnectDelay
}
rc.handleDisconnection()
// Clean up
@@ -306,13 +341,21 @@ func (rc *RelayConnection) waitBeforeReconnect() {
case <-time.After(rc.reconnectDelay):
}
// Exponential backoff
// Exponential backoff - double every time
rc.reconnectDelay *= 2
if rc.reconnectDelay > MaxReconnectDelay {
rc.reconnectDelay = MaxReconnectDelay
// If backoff exceeds 5 minutes, blackout for 24 hours
if rc.reconnectDelay >= MaxReconnectDelay {
rc.blackoutUntil = time.Now().Add(BlackoutPeriod)
log.W.F("spider: max backoff exceeded for %s (reached %v), blacking out for 24 hours", rc.url, rc.reconnectDelay)
}
}
// isBlackedOut returns true if the relay is currently blacked out
func (rc *RelayConnection) isBlackedOut() bool {
return !rc.blackoutUntil.IsZero() && time.Now().Before(rc.blackoutUntil)
}
// handleDisconnection records disconnection time for catch-up logic
func (rc *RelayConnection) handleDisconnection() {
now := time.Now()
@@ -371,17 +414,20 @@ func (rc *RelayConnection) createBatchSubscription(batchID string, pubkeys [][]b
}
// Create filters: one for authors, one for p tags
var pTags tag.S
// For #p tag filters, all pubkeys must be in a single tag array as hex-encoded strings
tagElements := [][]byte{[]byte("p")} // First element is the key
for _, pk := range pubkeys {
pTags = append(pTags, tag.NewFromAny("p", pk))
pkHex := hex.EncAppend(nil, pk)
tagElements = append(tagElements, pkHex)
}
pTags := &tag.S{tag.NewFromBytesSlice(tagElements...)}
filters := filter.NewS(
&filter.F{
Authors: tag.NewFromBytesSlice(pubkeys...),
},
&filter.F{
Tags: tag.NewS(pTags...),
Tags: pTags,
},
)
@@ -422,10 +468,6 @@ func (bs *BatchSubscription) handleEvents() {
// Save event to database
if _, err := bs.relay.spider.db.SaveEvent(bs.relay.ctx, ev); err != nil {
if !chk.E(err) {
log.T.F("spider: saved event %s from %s",
hex.EncodeToString(ev.ID[:]), bs.relay.url)
}
} else {
// Publish event if it was newly saved
if bs.relay.spider.pub != nil {
@@ -484,10 +526,14 @@ func (rc *RelayConnection) performCatchup(sub *BatchSubscription, disconnectTime
sinceTs := timestamp.T{V: since.Unix()}
untilTs := timestamp.T{V: until.Unix()}
var pTags tag.S
// Create filters with hex-encoded pubkeys for #p tags
// All pubkeys must be in a single tag array
tagElements := [][]byte{[]byte("p")} // First element is the key
for _, pk := range sub.pubkeys {
pTags = append(pTags, tag.NewFromAny("p", pk))
pkHex := hex.EncAppend(nil, pk)
tagElements = append(tagElements, pkHex)
}
pTags := &tag.S{tag.NewFromBytesSlice(tagElements...)}
filters := filter.NewS(
&filter.F{
@@ -496,7 +542,7 @@ func (rc *RelayConnection) performCatchup(sub *BatchSubscription, disconnectTime
Until: &untilTs,
},
&filter.F{
Tags: tag.NewS(pTags...),
Tags: pTags,
Since: &sinceTs,
Until: &untilTs,
},
@@ -539,7 +585,7 @@ func (rc *RelayConnection) performCatchup(sub *BatchSubscription, disconnectTime
if _, err := rc.spider.db.SaveEvent(rc.ctx, ev); err != nil {
if !chk.E(err) {
log.T.F("spider: catch-up saved event %s from %s",
hex.EncodeToString(ev.ID[:]), rc.url)
hex.Enc(ev.ID[:]), rc.url)
}
} else {
// Publish event if it was newly saved

View File

@@ -1 +1 @@
v0.19.7
v0.20.6

283
relay-tester/client.go Normal file
View File

@@ -0,0 +1,283 @@
package relaytester
import (
"context"
"encoding/json"
"sync"
"time"
"github.com/gorilla/websocket"
"lol.mleku.dev/errorf"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/hex"
)
// Client wraps a WebSocket connection to a relay for testing.
type Client struct {
conn *websocket.Conn
url string
mu sync.Mutex
subs map[string]chan []byte
okCh chan []byte // Channel for OK messages
countCh chan []byte // Channel for COUNT messages
ctx context.Context
cancel context.CancelFunc
}
// NewClient creates a new test client connected to the relay.
func NewClient(url string) (c *Client, err error) {
ctx, cancel := context.WithCancel(context.Background())
var conn *websocket.Conn
dialer := websocket.Dialer{
HandshakeTimeout: 5 * time.Second,
}
if conn, _, err = dialer.Dial(url, nil); err != nil {
cancel()
return
}
c = &Client{
conn: conn,
url: url,
subs: make(map[string]chan []byte),
okCh: make(chan []byte, 100),
countCh: make(chan []byte, 100),
ctx: ctx,
cancel: cancel,
}
go c.readLoop()
return
}
// Close closes the client connection.
func (c *Client) Close() error {
c.cancel()
return c.conn.Close()
}
// Send sends a JSON message to the relay.
func (c *Client) Send(msg interface{}) (err error) {
c.mu.Lock()
defer c.mu.Unlock()
var data []byte
if data, err = json.Marshal(msg); err != nil {
return errorf.E("failed to marshal message: %w", err)
}
if err = c.conn.WriteMessage(websocket.TextMessage, data); err != nil {
return errorf.E("failed to write message: %w", err)
}
return
}
// readLoop reads messages from the relay and routes them to subscriptions.
func (c *Client) readLoop() {
defer c.conn.Close()
for {
select {
case <-c.ctx.Done():
return
default:
}
_, msg, err := c.conn.ReadMessage()
if err != nil {
return
}
var raw []interface{}
if err = json.Unmarshal(msg, &raw); err != nil {
continue
}
if len(raw) < 2 {
continue
}
typ, ok := raw[0].(string)
if !ok {
continue
}
c.mu.Lock()
switch typ {
case "EVENT":
if len(raw) >= 2 {
if subID, ok := raw[1].(string); ok {
if ch, exists := c.subs[subID]; exists {
select {
case ch <- msg:
default:
}
}
}
}
case "EOSE":
if len(raw) >= 2 {
if subID, ok := raw[1].(string); ok {
if ch, exists := c.subs[subID]; exists {
close(ch)
delete(c.subs, subID)
}
}
}
case "OK":
// Route OK messages to okCh for WaitForOK
select {
case c.okCh <- msg:
default:
}
case "COUNT":
// Route COUNT messages to countCh for Count
select {
case c.countCh <- msg:
default:
}
case "NOTICE":
// Notice messages are logged
case "CLOSED":
// Closed messages indicate subscription ended
case "AUTH":
// Auth challenge messages
}
c.mu.Unlock()
}
}
// Subscribe creates a subscription and returns a channel for events.
func (c *Client) Subscribe(subID string, filters []interface{}) (ch chan []byte, err error) {
req := []interface{}{"REQ", subID}
req = append(req, filters...)
if err = c.Send(req); err != nil {
return
}
c.mu.Lock()
ch = make(chan []byte, 100)
c.subs[subID] = ch
c.mu.Unlock()
return
}
// Unsubscribe closes a subscription.
func (c *Client) Unsubscribe(subID string) error {
c.mu.Lock()
if ch, exists := c.subs[subID]; exists {
// Channel might already be closed by EOSE, so use recover to handle gracefully
func() {
defer func() {
if recover() != nil {
// Channel was already closed, ignore
}
}()
close(ch)
}()
delete(c.subs, subID)
}
c.mu.Unlock()
return c.Send([]interface{}{"CLOSE", subID})
}
// Publish sends an EVENT message to the relay.
func (c *Client) Publish(ev *event.E) (err error) {
evJSON := ev.Serialize()
var evMap map[string]interface{}
if err = json.Unmarshal(evJSON, &evMap); err != nil {
return errorf.E("failed to unmarshal event: %w", err)
}
return c.Send([]interface{}{"EVENT", evMap})
}
// WaitForOK waits for an OK response for the given event ID.
func (c *Client) WaitForOK(eventID []byte, timeout time.Duration) (accepted bool, reason string, err error) {
ctx, cancel := context.WithTimeout(c.ctx, timeout)
defer cancel()
idStr := hex.Enc(eventID)
for {
select {
case <-ctx.Done():
return false, "", errorf.E("timeout waiting for OK response")
case msg := <-c.okCh:
var raw []interface{}
if err = json.Unmarshal(msg, &raw); err != nil {
continue
}
if len(raw) < 3 {
continue
}
if id, ok := raw[1].(string); ok && id == idStr {
accepted, _ = raw[2].(bool)
if len(raw) > 3 {
reason, _ = raw[3].(string)
}
return
}
}
}
}
// Count sends a COUNT request and returns the count.
func (c *Client) Count(filters []interface{}) (count int64, err error) {
req := []interface{}{"COUNT", "count-sub"}
req = append(req, filters...)
if err = c.Send(req); err != nil {
return
}
ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
return 0, errorf.E("timeout waiting for COUNT response")
case msg := <-c.countCh:
var raw []interface{}
if err = json.Unmarshal(msg, &raw); err != nil {
continue
}
if len(raw) >= 3 {
if subID, ok := raw[1].(string); ok && subID == "count-sub" {
// COUNT response format: ["COUNT", "subscription-id", count, approximate?]
if cnt, ok := raw[2].(float64); ok {
return int64(cnt), nil
}
}
}
}
}
}
// Auth sends an AUTH message with the signed event.
func (c *Client) Auth(ev *event.E) error {
evJSON := ev.Serialize()
var evMap map[string]interface{}
if err := json.Unmarshal(evJSON, &evMap); err != nil {
return errorf.E("failed to unmarshal event: %w", err)
}
return c.Send([]interface{}{"AUTH", evMap})
}
// GetEvents collects all events from a subscription until EOSE.
func (c *Client) GetEvents(subID string, filters []interface{}, timeout time.Duration) (events []*event.E, err error) {
ch, err := c.Subscribe(subID, filters)
if err != nil {
return
}
defer c.Unsubscribe(subID)
ctx, cancel := context.WithTimeout(c.ctx, timeout)
defer cancel()
for {
select {
case <-ctx.Done():
return events, nil
case msg, ok := <-ch:
if !ok {
return events, nil
}
var raw []interface{}
if err = json.Unmarshal(msg, &raw); err != nil {
continue
}
if len(raw) >= 3 && raw[0] == "EVENT" {
if evData, ok := raw[2].(map[string]interface{}); ok {
evJSON, _ := json.Marshal(evData)
ev := event.New()
if _, err = ev.Unmarshal(evJSON); err == nil {
events = append(events, ev)
}
}
}
}
}
}

131
relay-tester/keys.go Normal file
View File

@@ -0,0 +1,131 @@
package relaytester
import (
"crypto/rand"
"fmt"
"time"
"lol.mleku.dev/chk"
"next.orly.dev/pkg/crypto/p256k"
"next.orly.dev/pkg/encoders/bech32encoding"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/hex"
"next.orly.dev/pkg/encoders/kind"
"next.orly.dev/pkg/encoders/tag"
)
// KeyPair represents a test keypair.
type KeyPair struct {
Secret *p256k.Signer
Pubkey []byte
Nsec string
Npub string
}
// GenerateKeyPair generates a new keypair for testing.
func GenerateKeyPair() (kp *KeyPair, err error) {
kp = &KeyPair{}
kp.Secret = &p256k.Signer{}
if err = kp.Secret.Generate(); chk.E(err) {
return
}
kp.Pubkey = kp.Secret.Pub()
nsecBytes, err := bech32encoding.BinToNsec(kp.Secret.Sec())
if chk.E(err) {
return
}
kp.Nsec = string(nsecBytes)
npubBytes, err := bech32encoding.BinToNpub(kp.Pubkey)
if chk.E(err) {
return
}
kp.Npub = string(npubBytes)
return
}
// CreateEvent creates a signed event with the given parameters.
func CreateEvent(signer *p256k.Signer, kindNum uint16, content string, tags *tag.S) (ev *event.E, err error) {
ev = event.New()
ev.CreatedAt = time.Now().Unix()
ev.Kind = kindNum
ev.Content = []byte(content)
if tags != nil {
ev.Tags = tags
} else {
ev.Tags = tag.NewS()
}
if err = ev.Sign(signer); chk.E(err) {
return
}
return
}
// CreateEventWithTags creates an event with specific tags.
func CreateEventWithTags(signer *p256k.Signer, kindNum uint16, content string, tagPairs [][]string) (ev *event.E, err error) {
tags := tag.NewS()
for _, pair := range tagPairs {
if len(pair) >= 2 {
// Build tag fields as []byte variadic arguments
tagFields := make([][]byte, len(pair))
tagFields[0] = []byte(pair[0])
for i := 1; i < len(pair); i++ {
tagFields[i] = []byte(pair[i])
}
tags.Append(tag.NewFromBytesSlice(tagFields...))
}
}
return CreateEvent(signer, kindNum, content, tags)
}
// CreateReplaceableEvent creates a replaceable event (kind 0-3, 10000-19999).
func CreateReplaceableEvent(signer *p256k.Signer, kindNum uint16, content string) (ev *event.E, err error) {
return CreateEvent(signer, kindNum, content, nil)
}
// CreateEphemeralEvent creates an ephemeral event (kind 20000-29999).
func CreateEphemeralEvent(signer *p256k.Signer, kindNum uint16, content string) (ev *event.E, err error) {
return CreateEvent(signer, kindNum, content, nil)
}
// CreateDeleteEvent creates a deletion event (kind 5).
func CreateDeleteEvent(signer *p256k.Signer, eventIDs [][]byte, reason string) (ev *event.E, err error) {
tags := tag.NewS()
for _, id := range eventIDs {
// e tags must contain hex-encoded event IDs
tags.Append(tag.NewFromBytesSlice([]byte("e"), []byte(hex.Enc(id))))
}
if reason != "" {
tags.Append(tag.NewFromBytesSlice([]byte("content"), []byte(reason)))
}
return CreateEvent(signer, kind.EventDeletion.K, reason, tags)
}
// CreateParameterizedReplaceableEvent creates a parameterized replaceable event (kind 30000-39999).
func CreateParameterizedReplaceableEvent(signer *p256k.Signer, kindNum uint16, content string, dTag string) (ev *event.E, err error) {
tags := tag.NewS()
tags.Append(tag.NewFromBytesSlice([]byte("d"), []byte(dTag)))
return CreateEvent(signer, kindNum, content, tags)
}
// RandomID generates a random 32-byte ID.
func RandomID() (id []byte, err error) {
id = make([]byte, 32)
if _, err = rand.Read(id); err != nil {
return nil, fmt.Errorf("failed to generate random ID: %w", err)
}
return
}
// MustHex decodes a hex string or panics.
func MustHex(s string) []byte {
b, err := hex.Dec(s)
if err != nil {
panic(fmt.Sprintf("invalid hex: %s", s))
}
return b
}
// HexID returns the hex-encoded event ID.
func HexID(ev *event.E) string {
return hex.Enc(ev.ID)
}

261
relay-tester/test.go Normal file
View File

@@ -0,0 +1,261 @@
package relaytester
import (
"encoding/json"
"time"
"lol.mleku.dev/errorf"
)
// TestResult represents the result of a test.
type TestResult struct {
Name string `json:"test"`
Pass bool `json:"pass"`
Required bool `json:"required"`
Info string `json:"info,omitempty"`
}
// TestFunc is a function that runs a test case.
type TestFunc func(client *Client, key1, key2 *KeyPair) (result TestResult)
// TestCase represents a test case with dependencies.
type TestCase struct {
Name string
Required bool
Func TestFunc
Dependencies []string // Names of tests that must run before this one
}
// TestSuite runs all tests against a relay.
type TestSuite struct {
relayURL string
key1 *KeyPair
key2 *KeyPair
tests map[string]*TestCase
results map[string]TestResult
order []string
}
// NewTestSuite creates a new test suite.
func NewTestSuite(relayURL string) (suite *TestSuite, err error) {
suite = &TestSuite{
relayURL: relayURL,
tests: make(map[string]*TestCase),
results: make(map[string]TestResult),
}
if suite.key1, err = GenerateKeyPair(); err != nil {
return
}
if suite.key2, err = GenerateKeyPair(); err != nil {
return
}
suite.registerTests()
return
}
// AddTest adds a test case to the suite.
func (s *TestSuite) AddTest(tc *TestCase) {
s.tests[tc.Name] = tc
}
// registerTests registers all test cases.
func (s *TestSuite) registerTests() {
allTests := []*TestCase{
{
Name: "Publishes basic event",
Required: true,
Func: testPublishBasicEvent,
},
{
Name: "Finds event by ID",
Required: true,
Func: testFindByID,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Finds event by author",
Required: true,
Func: testFindByAuthor,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Finds event by kind",
Required: true,
Func: testFindByKind,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Finds event by tags",
Required: true,
Func: testFindByTags,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Finds by multiple tags",
Required: true,
Func: testFindByMultipleTags,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Finds by time range",
Required: true,
Func: testFindByTimeRange,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Rejects invalid signature",
Required: true,
Func: testRejectInvalidSignature,
},
{
Name: "Rejects future event",
Required: true,
Func: testRejectFutureEvent,
},
{
Name: "Rejects expired event",
Required: false,
Func: testRejectExpiredEvent,
},
{
Name: "Handles replaceable events",
Required: true,
Func: testReplaceableEvents,
},
{
Name: "Handles ephemeral events",
Required: false,
Func: testEphemeralEvents,
},
{
Name: "Handles parameterized replaceable events",
Required: true,
Func: testParameterizedReplaceableEvents,
},
{
Name: "Handles deletion events",
Required: true,
Func: testDeletionEvents,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Handles COUNT request",
Required: true,
Func: testCountRequest,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Handles limit parameter",
Required: true,
Func: testLimitParameter,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Handles multiple filters",
Required: true,
Func: testMultipleFilters,
Dependencies: []string{"Publishes basic event"},
},
{
Name: "Handles subscription close",
Required: true,
Func: testSubscriptionClose,
},
}
for _, tc := range allTests {
s.AddTest(tc)
}
s.topologicalSort()
}
// topologicalSort orders tests based on dependencies.
func (s *TestSuite) topologicalSort() {
visited := make(map[string]bool)
temp := make(map[string]bool)
var visit func(name string)
visit = func(name string) {
if temp[name] {
return
}
if visited[name] {
return
}
temp[name] = true
if tc, exists := s.tests[name]; exists {
for _, dep := range tc.Dependencies {
visit(dep)
}
}
temp[name] = false
visited[name] = true
s.order = append(s.order, name)
}
for name := range s.tests {
if !visited[name] {
visit(name)
}
}
}
// Run runs all tests in the suite.
func (s *TestSuite) Run() (results []TestResult, err error) {
client, err := NewClient(s.relayURL)
if err != nil {
return nil, errorf.E("failed to connect to relay: %w", err)
}
defer client.Close()
for _, name := range s.order {
tc := s.tests[name]
if tc == nil {
continue
}
result := tc.Func(client, s.key1, s.key2)
result.Name = name
result.Required = tc.Required
s.results[name] = result
results = append(results, result)
time.Sleep(100 * time.Millisecond) // Small delay between tests
}
return
}
// RunTest runs a specific test by name.
func (s *TestSuite) RunTest(testName string) (result TestResult, err error) {
tc, exists := s.tests[testName]
if !exists {
return result, errorf.E("test %s not found", testName)
}
// Check dependencies
for _, dep := range tc.Dependencies {
if _, exists := s.results[dep]; !exists {
return result, errorf.E("test %s depends on %s which has not been run", testName, dep)
}
if !s.results[dep].Pass {
return result, errorf.E("test %s depends on %s which failed", testName, dep)
}
}
client, err := NewClient(s.relayURL)
if err != nil {
return result, errorf.E("failed to connect to relay: %w", err)
}
defer client.Close()
result = tc.Func(client, s.key1, s.key2)
result.Name = testName
result.Required = tc.Required
s.results[testName] = result
return
}
// GetResults returns all test results.
func (s *TestSuite) GetResults() map[string]TestResult {
return s.results
}
// FormatJSON formats results as JSON.
func FormatJSON(results []TestResult) (output string, err error) {
var data []byte
if data, err = json.Marshal(results); err != nil {
return
}
return string(data), nil
}

550
relay-tester/tests.go Normal file
View File

@@ -0,0 +1,550 @@
package relaytester
import (
"fmt"
"time"
"next.orly.dev/pkg/encoders/hex"
"next.orly.dev/pkg/encoders/kind"
"next.orly.dev/pkg/encoders/tag"
)
// Test implementations - these are referenced by test.go
func testPublishBasicEvent(client *Client, key1, key2 *KeyPair) (result TestResult) {
ev, err := CreateEvent(key1.Secret, kind.TextNote.K, "test content", nil)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
if err = client.Publish(ev); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, reason, err := client.WaitForOK(ev.ID, 5*time.Second)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to get OK: %v", err)}
}
if !accepted {
return TestResult{Pass: false, Info: fmt.Sprintf("event rejected: %s", reason)}
}
return TestResult{Pass: true}
}
func testFindByID(client *Client, key1, key2 *KeyPair) (result TestResult) {
ev, err := CreateEvent(key1.Secret, kind.TextNote.K, "find by id test", nil)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
if err = client.Publish(ev); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, _, err := client.WaitForOK(ev.ID, 5*time.Second)
if err != nil || !accepted {
return TestResult{Pass: false, Info: "event not accepted"}
}
time.Sleep(200 * time.Millisecond)
filter := map[string]interface{}{
"ids": []string{hex.Enc(ev.ID)},
}
events, err := client.GetEvents("test-sub", []interface{}{filter}, 2*time.Second)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to get events: %v", err)}
}
found := false
for _, e := range events {
if string(e.ID) == string(ev.ID) {
found = true
break
}
}
if !found {
return TestResult{Pass: false, Info: "event not found by ID"}
}
return TestResult{Pass: true}
}
func testFindByAuthor(client *Client, key1, key2 *KeyPair) (result TestResult) {
ev, err := CreateEvent(key1.Secret, kind.TextNote.K, "find by author test", nil)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
if err = client.Publish(ev); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, _, err := client.WaitForOK(ev.ID, 5*time.Second)
if err != nil || !accepted {
return TestResult{Pass: false, Info: "event not accepted"}
}
time.Sleep(200 * time.Millisecond)
filter := map[string]interface{}{
"authors": []string{hex.Enc(key1.Pubkey)},
}
events, err := client.GetEvents("test-author", []interface{}{filter}, 2*time.Second)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to get events: %v", err)}
}
found := false
for _, e := range events {
if string(e.ID) == string(ev.ID) {
found = true
break
}
}
if !found {
return TestResult{Pass: false, Info: "event not found by author"}
}
return TestResult{Pass: true}
}
func testFindByKind(client *Client, key1, key2 *KeyPair) (result TestResult) {
ev, err := CreateEvent(key1.Secret, kind.TextNote.K, "find by kind test", nil)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
if err = client.Publish(ev); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, _, err := client.WaitForOK(ev.ID, 5*time.Second)
if err != nil || !accepted {
return TestResult{Pass: false, Info: "event not accepted"}
}
time.Sleep(200 * time.Millisecond)
filter := map[string]interface{}{
"kinds": []int{int(kind.TextNote.K)},
}
events, err := client.GetEvents("test-kind", []interface{}{filter}, 2*time.Second)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to get events: %v", err)}
}
found := false
for _, e := range events {
if string(e.ID) == string(ev.ID) {
found = true
break
}
}
if !found {
return TestResult{Pass: false, Info: "event not found by kind"}
}
return TestResult{Pass: true}
}
func testFindByTags(client *Client, key1, key2 *KeyPair) (result TestResult) {
tags := tag.NewS(tag.NewFromBytesSlice([]byte("t"), []byte("test-tag")))
ev, err := CreateEvent(key1.Secret, kind.TextNote.K, "find by tags test", tags)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
if err = client.Publish(ev); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, _, err := client.WaitForOK(ev.ID, 5*time.Second)
if err != nil || !accepted {
return TestResult{Pass: false, Info: "event not accepted"}
}
time.Sleep(200 * time.Millisecond)
filter := map[string]interface{}{
"#t": []string{"test-tag"},
}
events, err := client.GetEvents("test-tags", []interface{}{filter}, 2*time.Second)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to get events: %v", err)}
}
found := false
for _, e := range events {
if string(e.ID) == string(ev.ID) {
found = true
break
}
}
if !found {
return TestResult{Pass: false, Info: "event not found by tags"}
}
return TestResult{Pass: true}
}
func testFindByMultipleTags(client *Client, key1, key2 *KeyPair) (result TestResult) {
tags := tag.NewS(
tag.NewFromBytesSlice([]byte("t"), []byte("multi-tag-1")),
tag.NewFromBytesSlice([]byte("t"), []byte("multi-tag-2")),
)
ev, err := CreateEvent(key1.Secret, kind.TextNote.K, "find by multiple tags test", tags)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
if err = client.Publish(ev); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, _, err := client.WaitForOK(ev.ID, 5*time.Second)
if err != nil || !accepted {
return TestResult{Pass: false, Info: "event not accepted"}
}
time.Sleep(200 * time.Millisecond)
filter := map[string]interface{}{
"#t": []string{"multi-tag-1", "multi-tag-2"},
}
events, err := client.GetEvents("test-multi-tags", []interface{}{filter}, 2*time.Second)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to get events: %v", err)}
}
found := false
for _, e := range events {
if string(e.ID) == string(ev.ID) {
found = true
break
}
}
if !found {
return TestResult{Pass: false, Info: "event not found by multiple tags"}
}
return TestResult{Pass: true}
}
func testFindByTimeRange(client *Client, key1, key2 *KeyPair) (result TestResult) {
ev, err := CreateEvent(key1.Secret, kind.TextNote.K, "find by time range test", nil)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
if err = client.Publish(ev); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, _, err := client.WaitForOK(ev.ID, 5*time.Second)
if err != nil || !accepted {
return TestResult{Pass: false, Info: "event not accepted"}
}
time.Sleep(200 * time.Millisecond)
now := time.Now().Unix()
filter := map[string]interface{}{
"since": now - 3600,
"until": now + 3600,
}
events, err := client.GetEvents("test-time", []interface{}{filter}, 2*time.Second)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to get events: %v", err)}
}
found := false
for _, e := range events {
if string(e.ID) == string(ev.ID) {
found = true
break
}
}
if !found {
return TestResult{Pass: false, Info: "event not found by time range"}
}
return TestResult{Pass: true}
}
func testRejectInvalidSignature(client *Client, key1, key2 *KeyPair) (result TestResult) {
ev, err := CreateEvent(key1.Secret, kind.TextNote.K, "invalid sig test", nil)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
// Corrupt the signature
ev.Sig[0] ^= 0xFF
if err = client.Publish(ev); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, reason, err := client.WaitForOK(ev.ID, 5*time.Second)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to get OK: %v", err)}
}
if accepted {
return TestResult{Pass: false, Info: "invalid signature was accepted"}
}
_ = reason
return TestResult{Pass: true}
}
func testRejectFutureEvent(client *Client, key1, key2 *KeyPair) (result TestResult) {
ev, err := CreateEvent(key1.Secret, kind.TextNote.K, "future event test", nil)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
ev.CreatedAt = time.Now().Unix() + 3601 // More than 1 hour in the future (should be rejected)
// Re-sign with new timestamp
if err = ev.Sign(key1.Secret); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to re-sign: %v", err)}
}
if err = client.Publish(ev); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, reason, err := client.WaitForOK(ev.ID, 5*time.Second)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to get OK: %v", err)}
}
if accepted {
return TestResult{Pass: false, Info: "future event was accepted"}
}
_ = reason
return TestResult{Pass: true}
}
func testRejectExpiredEvent(client *Client, key1, key2 *KeyPair) (result TestResult) {
ev, err := CreateEvent(key1.Secret, kind.TextNote.K, "expired event test", nil)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
ev.CreatedAt = time.Now().Unix() - 86400*365 // 1 year ago
// Re-sign with new timestamp
if err = ev.Sign(key1.Secret); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to re-sign: %v", err)}
}
if err = client.Publish(ev); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, _, err := client.WaitForOK(ev.ID, 5*time.Second)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to get OK: %v", err)}
}
// Some relays may accept old events, so this is optional
if !accepted {
return TestResult{Pass: true, Info: "expired event rejected (expected)"}
}
return TestResult{Pass: true, Info: "expired event accepted (relay allows old events)"}
}
func testReplaceableEvents(client *Client, key1, key2 *KeyPair) (result TestResult) {
ev1, err := CreateReplaceableEvent(key1.Secret, kind.ProfileMetadata.K, "first version")
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
if err = client.Publish(ev1); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, _, err := client.WaitForOK(ev1.ID, 5*time.Second)
if err != nil || !accepted {
return TestResult{Pass: false, Info: "first event not accepted"}
}
time.Sleep(200 * time.Millisecond)
ev2, err := CreateReplaceableEvent(key1.Secret, kind.ProfileMetadata.K, "second version")
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
if err = client.Publish(ev2); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, _, err = client.WaitForOK(ev2.ID, 5*time.Second)
if err != nil || !accepted {
return TestResult{Pass: false, Info: "second event not accepted"}
}
// Wait longer for replacement to complete
time.Sleep(500 * time.Millisecond)
filter := map[string]interface{}{
"kinds": []int{int(kind.ProfileMetadata.K)},
"authors": []string{hex.Enc(key1.Pubkey)},
"limit": 2, // Set limit > 1 to get multiple versions of replaceable events
}
events, err := client.GetEvents("test-replaceable", []interface{}{filter}, 3*time.Second)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to get events: %v", err)}
}
foundSecond := false
for _, e := range events {
if string(e.ID) == string(ev2.ID) {
foundSecond = true
break
}
}
if !foundSecond {
return TestResult{Pass: false, Info: "second replaceable event not found"}
}
return TestResult{Pass: true}
}
func testEphemeralEvents(client *Client, key1, key2 *KeyPair) (result TestResult) {
ev, err := CreateEphemeralEvent(key1.Secret, 20000, "ephemeral test")
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
if err = client.Publish(ev); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, _, err := client.WaitForOK(ev.ID, 5*time.Second)
if err != nil || !accepted {
return TestResult{Pass: false, Info: "ephemeral event not accepted"}
}
// Ephemeral events should not be stored, so query should not find them
time.Sleep(200 * time.Millisecond)
filter := map[string]interface{}{
"kinds": []int{20000},
}
events, err := client.GetEvents("test-ephemeral", []interface{}{filter}, 2*time.Second)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to get events: %v", err)}
}
// Ephemeral events should not be queryable
for _, e := range events {
if string(e.ID) == string(ev.ID) {
return TestResult{Pass: false, Info: "ephemeral event was stored (should not be)"}
}
}
return TestResult{Pass: true}
}
func testParameterizedReplaceableEvents(client *Client, key1, key2 *KeyPair) (result TestResult) {
ev1, err := CreateParameterizedReplaceableEvent(key1.Secret, 30023, "first list", "test-list")
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
if err = client.Publish(ev1); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, _, err := client.WaitForOK(ev1.ID, 5*time.Second)
if err != nil || !accepted {
return TestResult{Pass: false, Info: "first event not accepted"}
}
time.Sleep(200 * time.Millisecond)
ev2, err := CreateParameterizedReplaceableEvent(key1.Secret, 30023, "second list", "test-list")
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
if err = client.Publish(ev2); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, _, err = client.WaitForOK(ev2.ID, 5*time.Second)
if err != nil || !accepted {
return TestResult{Pass: false, Info: "second event not accepted"}
}
return TestResult{Pass: true}
}
func testDeletionEvents(client *Client, key1, key2 *KeyPair) (result TestResult) {
// First create an event to delete
targetEv, err := CreateEvent(key1.Secret, kind.TextNote.K, "event to delete", nil)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
if err = client.Publish(targetEv); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, _, err := client.WaitForOK(targetEv.ID, 5*time.Second)
if err != nil || !accepted {
return TestResult{Pass: false, Info: "target event not accepted"}
}
// Wait longer for event to be indexed
time.Sleep(500 * time.Millisecond)
// Now create deletion event
deleteEv, err := CreateDeleteEvent(key1.Secret, [][]byte{targetEv.ID}, "deletion reason")
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create delete event: %v", err)}
}
if err = client.Publish(deleteEv); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, _, err = client.WaitForOK(deleteEv.ID, 5*time.Second)
if err != nil || !accepted {
return TestResult{Pass: false, Info: "delete event not accepted"}
}
return TestResult{Pass: true}
}
func testCountRequest(client *Client, key1, key2 *KeyPair) (result TestResult) {
ev, err := CreateEvent(key1.Secret, kind.TextNote.K, "count test", nil)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
if err = client.Publish(ev); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, _, err := client.WaitForOK(ev.ID, 5*time.Second)
if err != nil || !accepted {
return TestResult{Pass: false, Info: "event not accepted"}
}
time.Sleep(200 * time.Millisecond)
filter := map[string]interface{}{
"kinds": []int{int(kind.TextNote.K)},
}
count, err := client.Count([]interface{}{filter})
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("COUNT failed: %v", err)}
}
if count < 1 {
return TestResult{Pass: false, Info: fmt.Sprintf("COUNT returned %d, expected at least 1", count)}
}
return TestResult{Pass: true}
}
func testLimitParameter(client *Client, key1, key2 *KeyPair) (result TestResult) {
// Publish multiple events
for i := 0; i < 5; i++ {
ev, err := CreateEvent(key1.Secret, kind.TextNote.K, fmt.Sprintf("limit test %d", i), nil)
if err != nil {
continue
}
client.Publish(ev)
client.WaitForOK(ev.ID, 2*time.Second)
}
time.Sleep(500 * time.Millisecond)
filter := map[string]interface{}{
"limit": 2,
}
events, err := client.GetEvents("test-limit", []interface{}{filter}, 2*time.Second)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to get events: %v", err)}
}
// Limit should be respected (though exact count may vary)
if len(events) > 10 {
return TestResult{Pass: false, Info: fmt.Sprintf("got %d events, limit may not be working", len(events))}
}
return TestResult{Pass: true}
}
func testMultipleFilters(client *Client, key1, key2 *KeyPair) (result TestResult) {
ev1, err := CreateEvent(key1.Secret, kind.TextNote.K, "filter 1", nil)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
ev2, err := CreateEvent(key2.Secret, kind.TextNote.K, "filter 2", nil)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to create event: %v", err)}
}
if err = client.Publish(ev1); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
if err = client.Publish(ev2); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to publish: %v", err)}
}
accepted, _, err := client.WaitForOK(ev1.ID, 2*time.Second)
if err != nil || !accepted {
return TestResult{Pass: false, Info: "event 1 not accepted"}
}
accepted, _, err = client.WaitForOK(ev2.ID, 2*time.Second)
if err != nil || !accepted {
return TestResult{Pass: false, Info: "event 2 not accepted"}
}
time.Sleep(300 * time.Millisecond)
filter1 := map[string]interface{}{
"authors": []string{hex.Enc(key1.Pubkey)},
}
filter2 := map[string]interface{}{
"authors": []string{hex.Enc(key2.Pubkey)},
}
events, err := client.GetEvents("test-multi-filter", []interface{}{filter1, filter2}, 2*time.Second)
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to get events: %v", err)}
}
if len(events) < 2 {
return TestResult{Pass: false, Info: fmt.Sprintf("got %d events, expected at least 2", len(events))}
}
return TestResult{Pass: true}
}
func testSubscriptionClose(client *Client, key1, key2 *KeyPair) (result TestResult) {
ch, err := client.Subscribe("close-test", []interface{}{map[string]interface{}{}})
if err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to subscribe: %v", err)}
}
if err = client.Unsubscribe("close-test"); err != nil {
return TestResult{Pass: false, Info: fmt.Sprintf("failed to unsubscribe: %v", err)}
}
// Channel should be closed
select {
case _, ok := <-ch:
if ok {
return TestResult{Pass: false, Info: "subscription channel not closed"}
}
default:
// Channel already closed, which is fine
}
return TestResult{Pass: true}
}

245
relay_test.go Normal file
View File

@@ -0,0 +1,245 @@
package main
import (
"fmt"
"net"
"os"
"path/filepath"
"testing"
"time"
lol "lol.mleku.dev"
"next.orly.dev/app/config"
"next.orly.dev/pkg/run"
relaytester "next.orly.dev/relay-tester"
)
var (
testRelayURL string
testName string
testJSON bool
keepDataDir bool
relayPort int
relayDataDir string
)
func TestRelay(t *testing.T) {
var err error
var relay *run.Relay
var relayURL string
// Determine relay URL
if testRelayURL != "" {
relayURL = testRelayURL
} else {
// Start local relay for testing
var port int
if relay, port, err = startTestRelay(); err != nil {
t.Fatalf("Failed to start test relay: %v", err)
}
defer func() {
if stopErr := relay.Stop(); stopErr != nil {
t.Logf("Error stopping relay: %v", stopErr)
}
}()
relayURL = fmt.Sprintf("ws://127.0.0.1:%d", port)
t.Logf("Waiting for relay to be ready at %s...", relayURL)
// Wait for relay to be ready - try connecting to verify it's up
if err = waitForRelay(relayURL, 10*time.Second); err != nil {
t.Fatalf("Relay not ready after timeout: %v", err)
}
t.Logf("Relay is ready at %s", relayURL)
}
// Create test suite
t.Logf("Creating test suite for %s...", relayURL)
suite, err := relaytester.NewTestSuite(relayURL)
if err != nil {
t.Fatalf("Failed to create test suite: %v", err)
}
t.Logf("Test suite created, running tests...")
// Run tests
var results []relaytester.TestResult
if testName != "" {
// Run specific test
result, err := suite.RunTest(testName)
if err != nil {
t.Fatalf("Failed to run test %s: %v", testName, err)
}
results = []relaytester.TestResult{result}
} else {
// Run all tests
if results, err = suite.Run(); err != nil {
t.Fatalf("Failed to run tests: %v", err)
}
}
// Output results
if testJSON {
jsonOutput, err := relaytester.FormatJSON(results)
if err != nil {
t.Fatalf("Failed to format JSON: %v", err)
}
fmt.Println(jsonOutput)
} else {
outputResults(results, t)
}
// Check if any required tests failed
for _, result := range results {
if result.Required && !result.Pass {
t.Errorf("Required test '%s' failed: %s", result.Name, result.Info)
}
}
}
func startTestRelay() (relay *run.Relay, port int, err error) {
cfg := &config.C{
AppName: "ORLY-TEST",
DataDir: relayDataDir,
Listen: "127.0.0.1",
Port: 0, // Always use random port, unless overridden via -port flag
HealthPort: 0,
EnableShutdown: false,
LogLevel: "warn",
DBLogLevel: "warn",
DBBlockCacheMB: 512,
DBIndexCacheMB: 256,
LogToStdout: false,
PprofHTTP: false,
ACLMode: "none",
AuthRequired: false,
AuthToWrite: false,
SubscriptionEnabled: false,
MonthlyPriceSats: 6000,
FollowListFrequency: time.Hour,
WebDisableEmbedded: false,
SprocketEnabled: false,
SpiderMode: "none",
PolicyEnabled: false,
}
// Use explicitly set port if provided via flag, otherwise find an available port
if relayPort > 0 {
cfg.Port = relayPort
} else {
var listener net.Listener
if listener, err = net.Listen("tcp", "127.0.0.1:0"); err != nil {
return nil, 0, fmt.Errorf("failed to find available port: %w", err)
}
addr := listener.Addr().(*net.TCPAddr)
cfg.Port = addr.Port
listener.Close()
}
// Set default data dir if not specified
if cfg.DataDir == "" {
tmpDir := filepath.Join(os.TempDir(), fmt.Sprintf("orly-test-%d", time.Now().UnixNano()))
cfg.DataDir = tmpDir
}
// Set up logging
lol.SetLogLevel(cfg.LogLevel)
// Create options
cleanup := !keepDataDir
opts := &run.Options{
CleanupDataDir: &cleanup,
}
// Start relay
if relay, err = run.Start(cfg, opts); err != nil {
return nil, 0, fmt.Errorf("failed to start relay: %w", err)
}
return relay, cfg.Port, nil
}
// waitForRelay waits for the relay to be ready by attempting to connect
func waitForRelay(url string, timeout time.Duration) error {
// Extract host:port from ws:// URL
addr := url
if len(url) > 7 && url[:5] == "ws://" {
addr = url[5:]
}
deadline := time.Now().Add(timeout)
attempts := 0
for time.Now().Before(deadline) {
conn, err := net.DialTimeout("tcp", addr, 500*time.Millisecond)
if err == nil {
conn.Close()
return nil
}
attempts++
if attempts%10 == 0 {
// Log every 10th attempt (every second)
}
time.Sleep(100 * time.Millisecond)
}
return fmt.Errorf("timeout waiting for relay at %s after %d attempts", url, attempts)
}
func outputResults(results []relaytester.TestResult, t *testing.T) {
passed := 0
failed := 0
requiredFailed := 0
for _, result := range results {
if result.Pass {
passed++
t.Logf("PASS: %s", result.Name)
} else {
failed++
if result.Required {
requiredFailed++
t.Errorf("FAIL (required): %s - %s", result.Name, result.Info)
} else {
t.Logf("FAIL (optional): %s - %s", result.Name, result.Info)
}
}
}
t.Logf("\nTest Summary:")
t.Logf(" Total: %d", len(results))
t.Logf(" Passed: %d", passed)
t.Logf(" Failed: %d", failed)
t.Logf(" Required Failed: %d", requiredFailed)
}
// TestMain allows custom test setup/teardown
func TestMain(m *testing.M) {
// Manually parse our custom flags to avoid conflicts with Go's test flags
for i := 1; i < len(os.Args); i++ {
arg := os.Args[i]
switch arg {
case "-relay-url":
if i+1 < len(os.Args) {
testRelayURL = os.Args[i+1]
i++
}
case "-test-name":
if i+1 < len(os.Args) {
testName = os.Args[i+1]
i++
}
case "-json":
testJSON = true
case "-keep-data":
keepDataDir = true
case "-port":
if i+1 < len(os.Args) {
fmt.Sscanf(os.Args[i+1], "%d", &relayPort)
i++
}
case "-data-dir":
if i+1 < len(os.Args) {
relayDataDir = os.Args[i+1]
i++
}
}
}
code := m.Run()
os.Exit(code)
}

198
scripts/run-policy-filter-test.sh Executable file
View File

@@ -0,0 +1,198 @@
#!/bin/bash
set -euo pipefail
# Policy Filter Integration Test
# This script runs the relay with the example policy and tests event filtering
# Config
PORT=${PORT:-34568}
URL=${URL:-ws://127.0.0.1:${PORT}}
LOG=/tmp/orly-policy-filter.out
PID=/tmp/orly-policy-filter.pid
DATADIR=$(mktemp -d)
CONFIG_DIR="$HOME/.config/ORLY_POLICY_TEST"
cleanup() {
trap - EXIT
if [[ -f "$PID" ]]; then
kill -INT "$(cat "$PID")" 2>/dev/null || true
rm -f "$PID"
fi
rm -rf "$DATADIR"
rm -rf "$CONFIG_DIR"
}
trap cleanup EXIT
echo "🧪 Policy Filter Integration Test"
echo "=================================="
# Create config directory
mkdir -p "$CONFIG_DIR"
# Generate keys using Go helper
echo "🔑 Generating test keys..."
KEYGEN_TMP=$(mktemp)
cat > "$KEYGEN_TMP.go" <<'EOF'
package main
import (
"encoding/json"
"fmt"
"next.orly.dev/pkg/crypto/p256k"
"next.orly.dev/pkg/encoders/hex"
)
func main() {
// Generate allowed signer
allowedSigner := &p256k.Signer{}
if err := allowedSigner.Generate(); err != nil {
panic(err)
}
allowedPubkeyHex := hex.Enc(allowedSigner.Pub())
allowedSecHex := hex.Enc(allowedSigner.Sec())
// Generate unauthorized signer
unauthorizedSigner := &p256k.Signer{}
if err := unauthorizedSigner.Generate(); err != nil {
panic(err)
}
unauthorizedPubkeyHex := hex.Enc(unauthorizedSigner.Pub())
unauthorizedSecHex := hex.Enc(unauthorizedSigner.Sec())
result := map[string]string{
"allowedPubkey": allowedPubkeyHex,
"allowedSec": allowedSecHex,
"unauthorizedPubkey": unauthorizedPubkeyHex,
"unauthorizedSec": unauthorizedSecHex,
}
jsonBytes, _ := json.Marshal(result)
fmt.Println(string(jsonBytes))
}
EOF
# Run from the project root directory
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
cd "$PROJECT_ROOT"
KEYS=$(go run -tags=cgo "$KEYGEN_TMP.go" 2>&1 | grep -E '^\{.*\}$' || true)
rm -f "$KEYGEN_TMP.go"
cd - > /dev/null
ALLOWED_PUBKEY=$(echo "$KEYS" | jq -r '.allowedPubkey')
ALLOWED_SEC=$(echo "$KEYS" | jq -r '.allowedSec')
UNAUTHORIZED_PUBKEY=$(echo "$KEYS" | jq -r '.unauthorizedPubkey')
UNAUTHORIZED_SEC=$(echo "$KEYS" | jq -r '.unauthorizedSec')
echo "✅ Generated keys:"
echo " Allowed pubkey: $ALLOWED_PUBKEY"
echo " Unauthorized pubkey: $UNAUTHORIZED_PUBKEY"
# Create policy JSON with generated keys
echo "📝 Creating policy.json..."
cat > "$CONFIG_DIR/policy.json" <<EOF
{
"kind": {
"whitelist": [4678, 10306, 30520, 30919]
},
"rules": {
"4678": {
"description": "Zenotp message events",
"script": "$CONFIG_DIR/validate4678.js",
"privileged": true
},
"10306": {
"description": "End user whitelist changes",
"read_allow": [
"$ALLOWED_PUBKEY"
],
"privileged": true
},
"30520": {
"description": "Zenotp events",
"write_allow": [
"$ALLOWED_PUBKEY"
],
"privileged": true
},
"30919": {
"description": "Zenotp events",
"write_allow": [
"$ALLOWED_PUBKEY"
],
"privileged": true
}
}
}
EOF
echo "✅ Policy file created at: $CONFIG_DIR/policy.json"
# Build relay and test client
echo "🔨 Building relay..."
go build -o orly .
# Start relay
echo "🚀 Starting relay on ${URL} with policy enabled..."
ORLY_APP_NAME="ORLY_POLICY_TEST" \
ORLY_DATA_DIR="$DATADIR" \
ORLY_PORT=${PORT} \
ORLY_POLICY_ENABLED=true \
ORLY_ACL_MODE=none \
ORLY_AUTH_TO_WRITE=true \
ORLY_LOG_LEVEL=info \
./orly >"$LOG" 2>&1 & echo $! >"$PID"
# Wait for relay to start
sleep 3
if ! ps -p "$(cat "$PID")" >/dev/null 2>&1; then
echo "❌ Relay failed to start; logs:" >&2
sed -n '1,200p' "$LOG" >&2
exit 1
fi
echo "✅ Relay started (PID: $(cat "$PID"))"
# Build test client
echo "🔨 Building test client..."
go build -o cmd/policyfiltertest/policyfiltertest ./cmd/policyfiltertest
# Export keys for test client
export ALLOWED_PUBKEY
export ALLOWED_SEC
export UNAUTHORIZED_PUBKEY
export UNAUTHORIZED_SEC
# Run tests
echo "🧪 Running policy filter tests..."
set +e
cmd/policyfiltertest/policyfiltertest -url "${URL}" -allowed-pubkey "$ALLOWED_PUBKEY" -allowed-sec "$ALLOWED_SEC" -unauthorized-pubkey "$UNAUTHORIZED_PUBKEY" -unauthorized-sec "$UNAUTHORIZED_SEC"
TEST_RESULT=$?
set -e
# Check logs for "policy rule is inactive" messages
echo "📋 Checking logs for policy rule inactivity..."
if grep -q "policy rule is inactive" "$LOG"; then
echo "⚠️ WARNING: Found 'policy rule is inactive' messages in logs"
grep "policy rule is inactive" "$LOG" | head -5
else
echo "✅ No 'policy rule is inactive' messages found (good)"
fi
# Check logs for policy filtered events
echo "📋 Checking logs for policy filtered events..."
if grep -q "policy filtered out event" "$LOG"; then
echo "✅ Found policy filtered events (expected):"
grep "policy filtered out event" "$LOG" | head -5
fi
if [ $TEST_RESULT -eq 0 ]; then
echo "✅ All tests passed!"
exit 0
else
echo "❌ Tests failed with exit code $TEST_RESULT"
echo "📋 Last 50 lines of relay log:"
tail -50 "$LOG"
exit $TEST_RESULT
fi

Submodule scripts/secp256k1 deleted from 0cdc758a56

0
scripts/sprocket/SPROCKET_TEST_README.md Normal file → Executable file
View File

0
scripts/sprocket/test-sprocket-complete.sh Normal file → Executable file
View File

0
scripts/sprocket/test-sprocket-demo.sh Normal file → Executable file
View File

0
scripts/sprocket/test-sprocket-example.sh Normal file → Executable file
View File

0
scripts/sprocket/test-sprocket-final.sh Normal file → Executable file
View File

0
scripts/sprocket/test-sprocket-manual.sh Normal file → Executable file
View File

0
scripts/sprocket/test-sprocket-simple.sh Normal file → Executable file
View File

0
scripts/sprocket/test-sprocket-working.sh Normal file → Executable file
View File

0
scripts/sprocket/test-sprocket.py Normal file → Executable file
View File

0
scripts/sprocket/test-sprocket.sh Normal file → Executable file
View File