Files
next.orly.dev/.claude/skills/nostr-websocket/references/khatru_implementation.md
mleku d604341a27
Some checks failed
Go / build (push) Has been cancelled
Go / release (push) Has been cancelled
Add comprehensive documentation for CLAUDE and Nostr WebSocket skills
- Introduced CLAUDE.md to provide guidance for working with the Claude Code repository, including project overview, build commands, testing guidelines, and performance considerations.
- Added INDEX.md for a structured overview of the strfry WebSocket implementation analysis, detailing document contents and usage.
- Created SKILL.md for the nostr-websocket skill, covering WebSocket protocol fundamentals, connection management, and performance optimization techniques.
- Included multiple reference documents for Go, C++, and Rust implementations of WebSocket patterns, enhancing the knowledge base for developers.
- Updated deployment and build documentation to reflect new multi-platform capabilities and pure Go build processes.
- Bumped version to reflect the addition of extensive documentation and resources for developers working with Nostr relays and WebSocket connections.
2025-11-06 16:18:09 +00:00

1276 lines
35 KiB
Markdown

# Go WebSocket Implementation for Nostr Relays (khatru patterns)
This reference documents production-ready WebSocket patterns from the khatru Nostr relay implementation in Go.
## Repository Information
- **Project:** khatru - Nostr relay framework
- **Repository:** https://github.com/fiatjaf/khatru
- **Language:** Go
- **WebSocket Library:** `github.com/fasthttp/websocket`
- **Architecture:** Hook-based plugin system with dual-goroutine per connection
## Core Architecture
### Relay Structure
```go
// relay.go, lines 54-119
type Relay struct {
// Service configuration
ServiceURL string
upgrader websocket.Upgrader
// WebSocket lifecycle hooks
RejectConnection []func(r *http.Request) bool
OnConnect []func(ctx context.Context)
OnDisconnect []func(ctx context.Context)
// Event processing hooks
RejectEvent []func(ctx context.Context, event *nostr.Event) (reject bool, msg string)
OverwriteDeletionOutcome []func(ctx context.Context, target *nostr.Event, deletion *nostr.Event) (acceptDeletion bool, msg string)
StoreEvent []func(ctx context.Context, event *nostr.Event) error
ReplaceEvent []func(ctx context.Context, event *nostr.Event) error
DeleteEvent []func(ctx context.Context, event *nostr.Event) error
OnEventSaved []func(ctx context.Context, event *nostr.Event)
OnEphemeralEvent []func(ctx context.Context, event *nostr.Event)
// Filter/query hooks
RejectFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
OverwriteFilter []func(ctx context.Context, filter *nostr.Filter)
QueryEvents []func(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error)
CountEvents []func(ctx context.Context, filter nostr.Filter) (int64, error)
CountEventsHLL []func(ctx context.Context, filter nostr.Filter, offset int) (int64, *hyperloglog.HyperLogLog, error)
// Broadcast control
PreventBroadcast []func(ws *WebSocket, event *nostr.Event) bool
OverwriteResponseEvent []func(ctx context.Context, event *nostr.Event)
// Client tracking
clients map[*WebSocket][]listenerSpec
listeners []listener
clientsMutex sync.Mutex
// WebSocket parameters
WriteWait time.Duration // Default: 10 seconds
PongWait time.Duration // Default: 60 seconds
PingPeriod time.Duration // Default: 30 seconds
MaxMessageSize int64 // Default: 512000 bytes
// Router support (for multi-relay setups)
routes []Route
getSubRelayFromEvent func(*nostr.Event) *Relay
getSubRelayFromFilter func(nostr.Filter) *Relay
// Protocol extensions
Negentropy bool // NIP-77 support
}
```
### WebSocket Configuration
```go
// relay.go, lines 31-35
upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
},
```
**Key configuration choices:**
- **1 KB read/write buffers:** Small buffers for many concurrent connections
- **Allow all origins:** Nostr is designed for public relays; adjust for private relays
- **No compression by default:** Can be enabled with `EnableCompression: true`
**Recommended production settings:**
```go
upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
EnableCompression: true, // 60-80% bandwidth reduction
CheckOrigin: func(r *http.Request) bool {
// For public relays: return true
// For private relays: validate origin
origin := r.Header.Get("Origin")
return isAllowedOrigin(origin)
},
},
```
## WebSocket Connection Structure
### Connection Wrapper
```go
// websocket.go, lines 12-32
type WebSocket struct {
conn *websocket.Conn
mutex sync.Mutex // Protects all write operations
// Original HTTP request (for IP, headers, etc.)
Request *http.Request
// Connection lifecycle context
Context context.Context
cancel context.CancelFunc
// NIP-42 authentication
Challenge string // Random 8-byte hex string
AuthedPublicKey string // Authenticated pubkey after AUTH
Authed chan struct{} // Closed when authenticated
authLock sync.Mutex
// NIP-77 negentropy sessions (for efficient set reconciliation)
negentropySessions *xsync.MapOf[string, *NegentropySession]
}
```
**Design decisions:**
1. **Mutex for writes:** WebSocket library panics on concurrent writes; mutex is simplest solution
2. **Context-based lifecycle:** Clean cancellation propagation to all operations
3. **Original request preservation:** Enables IP extraction, header inspection
4. **NIP-42 challenge storage:** No database lookup needed for authentication
5. **Lock-free session map:** `xsync.MapOf` provides concurrent access without locks
### Thread-Safe Write Operations
```go
// websocket.go, lines 34-46
func (ws *WebSocket) WriteJSON(any any) error {
ws.mutex.Lock()
err := ws.conn.WriteJSON(any)
ws.mutex.Unlock()
return err
}
func (ws *WebSocket) WriteMessage(t int, b []byte) error {
ws.mutex.Lock()
err := ws.conn.WriteMessage(t, b)
ws.mutex.Unlock()
return err
}
```
**Critical pattern:** ALL writes to WebSocket MUST be protected by mutex
**Common mistake:**
```go
// DON'T DO THIS - Race condition!
go func() {
ws.conn.WriteJSON(msg1) // Not protected
}()
go func() {
ws.conn.WriteJSON(msg2) // Not protected
}()
```
**Correct approach:**
```go
// DO THIS - Protected writes
go func() {
ws.WriteJSON(msg1) // Uses mutex
}()
go func() {
ws.WriteJSON(msg2) // Uses mutex
}()
```
## Connection Lifecycle
### HTTP to WebSocket Upgrade
```go
// handlers.go, lines 29-52
func (rl *Relay) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// CORS middleware for non-WebSocket requests
corsMiddleware := cors.New(cors.Options{
AllowedOrigins: []string{"*"},
AllowedMethods: []string{
http.MethodHead,
http.MethodGet,
http.MethodPost,
http.MethodPut,
http.MethodPatch,
http.MethodDelete,
},
AllowedHeaders: []string{"Authorization", "*"},
MaxAge: 86400,
})
// Route based on request type
if r.Header.Get("Upgrade") == "websocket" {
rl.HandleWebsocket(w, r) // WebSocket connection
} else if r.Header.Get("Accept") == "application/nostr+json" {
corsMiddleware.Handler(http.HandlerFunc(rl.HandleNIP11)).ServeHTTP(w, r) // NIP-11 metadata
} else if r.Header.Get("Content-Type") == "application/nostr+json+rpc" {
corsMiddleware.Handler(http.HandlerFunc(rl.HandleNIP86)).ServeHTTP(w, r) // NIP-86 management
} else {
corsMiddleware.Handler(rl.serveMux).ServeHTTP(w, r) // Other routes
}
}
```
**Pattern:** Single HTTP handler multiplexes all request types by headers
### WebSocket Upgrade Process
```go
// handlers.go, lines 55-105
func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
// Pre-upgrade rejection hooks
for _, reject := range rl.RejectConnection {
if reject(r) {
w.WriteHeader(429) // Too Many Requests
return
}
}
// Perform WebSocket upgrade
conn, err := rl.upgrader.Upgrade(w, r, nil)
if err != nil {
rl.Log.Printf("failed to upgrade websocket: %v\n", err)
return
}
// Create ping ticker for keep-alive
ticker := time.NewTicker(rl.PingPeriod)
// Generate NIP-42 authentication challenge
challenge := make([]byte, 8)
rand.Read(challenge)
// Initialize WebSocket wrapper
ws := &WebSocket{
conn: conn,
Request: r,
Challenge: hex.EncodeToString(challenge),
negentropySessions: xsync.NewMapOf[string, *NegentropySession](),
}
ws.Context, ws.cancel = context.WithCancel(context.Background())
// Register client
rl.clientsMutex.Lock()
rl.clients[ws] = make([]listenerSpec, 0, 2)
rl.clientsMutex.Unlock()
// Create connection context with WebSocket reference
ctx, cancel := context.WithCancel(
context.WithValue(context.Background(), wsKey, ws),
)
// Cleanup function for both goroutines
kill := func() {
// Trigger disconnect hooks
for _, ondisconnect := range rl.OnDisconnect {
ondisconnect(ctx)
}
// Stop timers and cancel contexts
ticker.Stop()
cancel()
ws.cancel()
// Close connection
ws.conn.Close()
// Remove from tracking
rl.removeClientAndListeners(ws)
}
// Launch read and write goroutines
go readLoop(ws, ctx, kill)
go writeLoop(ws, ctx, ticker, kill)
}
```
**Key steps:**
1. Check rejection hooks (rate limiting, IP bans, etc.)
2. Upgrade HTTP connection to WebSocket
3. Generate authentication challenge (NIP-42)
4. Initialize WebSocket wrapper with context
5. Register client in tracking map
6. Define cleanup function
7. Launch read and write goroutines
### Read Loop (Primary Goroutine)
```go
// handlers.go, lines 107-414
go func() {
defer kill()
// Configure read constraints
ws.conn.SetReadLimit(rl.MaxMessageSize)
ws.conn.SetReadDeadline(time.Now().Add(rl.PongWait))
// Auto-refresh deadline on Pong receipt
ws.conn.SetPongHandler(func(string) error {
ws.conn.SetReadDeadline(time.Now().Add(rl.PongWait))
return nil
})
// Trigger connection hooks
for _, onconnect := range rl.OnConnect {
onconnect(ctx)
}
// Create message parser (sonic parser is stateful)
smp := nostr.NewMessageParser()
for {
// Read message (blocks until data available)
typ, msgb, err := ws.conn.ReadMessage()
if err != nil {
// Check if expected close
if websocket.IsUnexpectedCloseError(
err,
websocket.CloseNormalClosure, // 1000
websocket.CloseGoingAway, // 1001
websocket.CloseNoStatusReceived, // 1005
websocket.CloseAbnormalClosure, // 1006
4537, // Custom: client preference
) {
rl.Log.Printf("unexpected close error from %s: %v\n",
GetIPFromRequest(r), err)
}
ws.cancel()
return
}
// Handle Ping manually (library should auto-respond, but...)
if typ == websocket.PingMessage {
ws.WriteMessage(websocket.PongMessage, nil)
continue
}
// Zero-copy conversion to string
message := unsafe.String(unsafe.SliceData(msgb), len(msgb))
// Parse message (sequential due to sonic parser constraint)
envelope, err := smp.ParseMessage(message)
// Handle message in separate goroutine (concurrent processing)
go func(message string) {
switch env := envelope.(type) {
case *nostr.EventEnvelope:
handleEvent(ctx, ws, env, rl)
case *nostr.ReqEnvelope:
handleReq(ctx, ws, env, rl)
case *nostr.CloseEnvelope:
handleClose(ctx, ws, env, rl)
case *nostr.CountEnvelope:
handleCount(ctx, ws, env, rl)
case *nostr.AuthEnvelope:
handleAuth(ctx, ws, env, rl)
case *nip77.OpenEnvelope:
handleNegentropyOpen(ctx, ws, env, rl)
case *nip77.MessageEnvelope:
handleNegentropyMsg(ctx, ws, env, rl)
case *nip77.CloseEnvelope:
handleNegentropyClose(ctx, ws, env, rl)
default:
ws.WriteJSON(nostr.NoticeEnvelope("unknown message type"))
}
}(message)
}
}()
```
**Critical patterns:**
1. **SetReadDeadline + SetPongHandler:** Automatic timeout detection
- Read blocks up to `PongWait` (60s)
- Pong receipt resets deadline
- No Pong = timeout error = connection dead
2. **Zero-copy string conversion:**
```go
message := unsafe.String(unsafe.SliceData(msgb), len(msgb))
```
- Avoids allocation when converting `[]byte` to `string`
- Safe because `msgb` is newly allocated by `ReadMessage()`
3. **Sequential parsing, concurrent handling:**
- `smp.ParseMessage()` called sequentially (parser has state)
- Message handling dispatched to goroutine (concurrent)
- Balances correctness and performance
4. **Goroutine-per-message pattern:**
```go
go func(message string) {
// Handle message
}(message)
```
- Allows next message to be read immediately
- Prevents slow handler blocking read loop
- Captures `message` to avoid data race
### Write Loop (Ping Goroutine)
```go
// handlers.go, lines 416-433
go func() {
defer kill()
for {
select {
case <-ctx.Done():
// Connection closed or context canceled
return
case <-ticker.C:
// Send ping every PingPeriod (30s)
err := ws.WriteMessage(websocket.PingMessage, nil)
if err != nil {
if !strings.HasSuffix(err.Error(), "use of closed network connection") {
rl.Log.Printf("error writing ping: %v; closing websocket\n", err)
}
return
}
}
}
}()
```
**Purpose:**
- Send periodic pings to detect dead connections
- Uses `select` to monitor context cancellation
- Returns on any write error (connection dead)
**Timing relationship:**
```
PingPeriod: 30 seconds (send ping every 30s)
PongWait: 60 seconds (expect pong within 60s)
Rule: PingPeriod < PongWait
If client doesn't respond to 2 consecutive pings,
connection times out after 60 seconds.
```
### Connection Cleanup
```go
kill := func() {
// 1. Trigger disconnect hooks
for _, ondisconnect := range rl.OnDisconnect {
ondisconnect(ctx)
}
// 2. Stop timers
ticker.Stop()
// 3. Cancel contexts
cancel()
ws.cancel()
// 4. Close connection
ws.conn.Close()
// 5. Remove from tracking
rl.removeClientAndListeners(ws)
}
defer kill()
```
**Cleanup order:**
1. **Hooks first:** Allow app to log, update stats
2. **Stop timers:** Prevent goroutine leaks
3. **Cancel contexts:** Signal cancellation to operations
4. **Close connection:** Release network resources
5. **Remove tracking:** Clean up maps
**Why defer?** Ensures cleanup runs even if goroutine panics
## Message Handling
### Event Handling (EVENT)
```go
// handlers.go, lines 163-258
case *nostr.EventEnvelope:
// Validate event ID (must match hash of content)
if !env.Event.CheckID() {
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "invalid: id is computed incorrectly",
})
return
}
// Validate signature
if ok, err := env.Event.CheckSignature(); err != nil {
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "error: failed to verify signature",
})
return
} else if !ok {
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "invalid: signature is invalid",
})
return
}
// Check NIP-70 protected events
if nip70.IsProtected(env.Event) {
authed := GetAuthed(ctx)
if authed == "" {
// Request authentication
RequestAuth(ctx)
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "auth-required: must be published by authenticated event author",
})
return
}
}
// Route to subrelay if using relay routing
srl := rl
if rl.getSubRelayFromEvent != nil {
srl = rl.getSubRelayFromEvent(&env.Event)
}
// Handle event based on kind
var skipBroadcast bool
var writeErr error
if env.Event.Kind == 5 {
// Deletion event
writeErr = srl.handleDeleteRequest(ctx, &env.Event)
} else if nostr.IsEphemeralKind(env.Event.Kind) {
// Ephemeral event (20000-29999)
writeErr = srl.handleEphemeral(ctx, &env.Event)
} else {
// Normal event
skipBroadcast, writeErr = srl.handleNormal(ctx, &env.Event)
}
// Broadcast to subscribers (unless prevented)
if !skipBroadcast {
n := srl.notifyListeners(&env.Event)
// Can update reason with broadcast count
}
// Send OK response
ok := writeErr == nil
reason := ""
if writeErr != nil {
reason = writeErr.Error()
}
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: ok,
Reason: reason,
})
```
**Validation sequence:**
1. Check event ID matches content hash
2. Verify cryptographic signature
3. Check authentication if protected event (NIP-70)
4. Route to appropriate subrelay (if multi-relay setup)
5. Handle based on kind (deletion, ephemeral, normal)
6. Broadcast to matching subscriptions
7. Send OK response to publisher
### Request Handling (REQ)
```go
// handlers.go, lines 289-324
case *nostr.ReqEnvelope:
// Create WaitGroup for EOSE synchronization
eose := sync.WaitGroup{}
eose.Add(len(env.Filters))
// Create cancelable context for subscription
reqCtx, cancelReqCtx := context.WithCancelCause(ctx)
// Expose subscription ID in context
reqCtx = context.WithValue(reqCtx, subscriptionIdKey, env.SubscriptionID)
// Handle each filter
for _, filter := range env.Filters {
// Route to appropriate subrelay
srl := rl
if rl.getSubRelayFromFilter != nil {
srl = rl.getSubRelayFromFilter(filter)
}
// Query stored events
err := srl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, filter)
if err != nil {
// Fail entire subscription if any filter rejected
reason := err.Error()
if strings.HasPrefix(reason, "auth-required:") {
RequestAuth(ctx)
}
ws.WriteJSON(nostr.ClosedEnvelope{
SubscriptionID: env.SubscriptionID,
Reason: reason,
})
cancelReqCtx(errors.New("filter rejected"))
return
} else {
// Add listener for real-time events
rl.addListener(ws, env.SubscriptionID, srl, filter, cancelReqCtx)
}
}
// Send EOSE when all stored events dispatched
go func() {
eose.Wait()
ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID))
}()
```
**Subscription lifecycle:**
1. **Parse filters:** Client sends array of filters in REQ
2. **Create context:** Allows cancellation of subscription
3. **Query database:** For each filter, query stored events
4. **Stream results:** Send matching events to client
5. **Send EOSE:** End Of Stored Events marker
6. **Add listener:** Subscribe to real-time events
**WaitGroup pattern:**
```go
eose := sync.WaitGroup{}
eose.Add(len(env.Filters))
// Each query handler calls eose.Done() when complete
go func() {
eose.Wait() // Wait for all queries
ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID))
}()
```
### Close Handling (CLOSE)
```go
// handlers.go, lines 325-327
case *nostr.CloseEnvelope:
id := string(*env)
rl.removeListenerId(ws, id)
```
**Simple unsubscribe:** Remove listener by subscription ID
### Authentication (AUTH)
```go
// handlers.go, lines 328-341
case *nostr.AuthEnvelope:
// Compute relay WebSocket URL
wsBaseUrl := strings.Replace(rl.getBaseURL(r), "http", "ws", 1)
// Validate AUTH event
if pubkey, ok := nip42.ValidateAuthEvent(&env.Event, ws.Challenge, wsBaseUrl); ok {
// Store authenticated pubkey
ws.AuthedPublicKey = pubkey
// Close Authed channel (unblocks any waiting goroutines)
ws.authLock.Lock()
if ws.Authed != nil {
close(ws.Authed)
ws.Authed = nil
}
ws.authLock.Unlock()
// Send OK response
ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: true})
} else {
// Validation failed
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "error: failed to authenticate",
})
}
```
**NIP-42 authentication:**
1. Client receives AUTH challenge on connect
2. Client creates kind-22242 event with challenge
3. Server validates event signature and challenge match
4. Server stores authenticated pubkey in `ws.AuthedPublicKey`
## Subscription Management
### Subscription Data Structures
```go
// listener.go, lines 13-24
type listenerSpec struct {
id string // Subscription ID from REQ
cancel context.CancelCauseFunc // Cancels this subscription
index int // Position in subrelay.listeners array
subrelay *Relay // Reference to (sub)relay handling this
}
type listener struct {
id string // Subscription ID
filter nostr.Filter // Filter for matching events
ws *WebSocket // WebSocket connection
}
```
**Two-level tracking:**
1. **Per-client specs:** `clients map[*WebSocket][]listenerSpec`
- Tracks what subscriptions each client has
- Enables cleanup when client disconnects
2. **Per-relay listeners:** `listeners []listener`
- Flat array for fast iteration when broadcasting
- No maps, no allocations during broadcast
### Adding Listeners
```go
// listener.go, lines 36-60
func (rl *Relay) addListener(
ws *WebSocket,
id string,
subrelay *Relay,
filter nostr.Filter,
cancel context.CancelCauseFunc,
) {
rl.clientsMutex.Lock()
defer rl.clientsMutex.Unlock()
if specs, ok := rl.clients[ws]; ok {
// Get position where listener will be added
idx := len(subrelay.listeners)
// Add spec to client's list
rl.clients[ws] = append(specs, listenerSpec{
id: id,
cancel: cancel,
subrelay: subrelay,
index: idx,
})
// Add listener to relay's list
subrelay.listeners = append(subrelay.listeners, listener{
ws: ws,
id: id,
filter: filter,
})
}
}
```
**O(1) append operation**
### Removing Listeners by ID
```go
// listener.go, lines 64-99
func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
rl.clientsMutex.Lock()
defer rl.clientsMutex.Unlock()
if specs, ok := rl.clients[ws]; ok {
// Iterate backwards for safe removal
for s := len(specs) - 1; s >= 0; s-- {
spec := specs[s]
if spec.id == id {
// Cancel subscription context
spec.cancel(ErrSubscriptionClosedByClient)
// Swap-delete from specs array
specs[s] = specs[len(specs)-1]
specs = specs[0 : len(specs)-1]
rl.clients[ws] = specs
// Remove from listener list in subrelay
srl := spec.subrelay
// If not last element, swap with last
if spec.index != len(srl.listeners)-1 {
movedFromIndex := len(srl.listeners) - 1
moved := srl.listeners[movedFromIndex]
srl.listeners[spec.index] = moved
// Update moved listener's spec index
movedSpecs := rl.clients[moved.ws]
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
return ls.index == movedFromIndex && ls.subrelay == srl
})
movedSpecs[idx].index = spec.index
rl.clients[moved.ws] = movedSpecs
}
// Truncate listeners array
srl.listeners = srl.listeners[0 : len(srl.listeners)-1]
}
}
}
}
```
**Swap-delete pattern:**
1. Move last element to deleted position
2. Truncate array
3. **Result:** O(1) deletion without preserving order
**Why not just delete?**
- `append(arr[:i], arr[i+1:]...)` is O(n) - shifts all elements
- Swap-delete is O(1) - just one swap and truncate
- Order doesn't matter for listeners
### Removing All Client Listeners
```go
// listener.go, lines 101-133
func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
rl.clientsMutex.Lock()
defer rl.clientsMutex.Unlock()
if specs, ok := rl.clients[ws]; ok {
// Remove each subscription
for s, spec := range specs {
srl := spec.subrelay
// Swap-delete from listeners array
if spec.index != len(srl.listeners)-1 {
movedFromIndex := len(srl.listeners) - 1
moved := srl.listeners[movedFromIndex]
srl.listeners[spec.index] = moved
// Mark current spec as invalid
rl.clients[ws][s].index = -1
// Update moved listener's spec
movedSpecs := rl.clients[moved.ws]
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
return ls.index == movedFromIndex && ls.subrelay == srl
})
movedSpecs[idx].index = spec.index
rl.clients[moved.ws] = movedSpecs
}
// Truncate listeners array
srl.listeners = srl.listeners[0 : len(srl.listeners)-1]
}
}
// Remove client from map
delete(rl.clients, ws)
}
```
**Called when client disconnects:** Removes all subscriptions for that client
### Broadcasting to Listeners
```go
// listener.go, lines 136-151
func (rl *Relay) notifyListeners(event *nostr.Event) int {
count := 0
listenersloop:
for _, listener := range rl.listeners {
// Check if filter matches event
if listener.filter.Matches(event) {
// Check if broadcast should be prevented (hooks)
for _, pb := range rl.PreventBroadcast {
if pb(listener.ws, event) {
continue listenersloop
}
}
// Send event to subscriber
listener.ws.WriteJSON(nostr.EventEnvelope{
SubscriptionID: &listener.id,
Event: *event,
})
count++
}
}
return count
}
```
**Performance characteristics:**
- **O(n) in number of listeners:** Iterates all active subscriptions
- **Fast filter matching:** Simple field comparisons
- **No allocations:** Uses existing listener array
- **Labeled continue:** Clean exit from nested loop
**Optimization opportunity:** For relays with thousands of subscriptions, consider:
- Indexing listeners by event kind
- Using bloom filters for quick negatives
- Sharding listeners across goroutines
## Context Utilities
### Context Keys
```go
// utils.go
const (
wsKey = iota // WebSocket connection
subscriptionIdKey // Current subscription ID
nip86HeaderAuthKey // NIP-86 authorization header
internalCallKey // Internal call marker
)
```
**Pattern:** Use iota for compile-time context key uniqueness
### Get WebSocket from Context
```go
func GetConnection(ctx context.Context) *WebSocket {
wsi := ctx.Value(wsKey)
if wsi != nil {
return wsi.(*WebSocket)
}
return nil
}
```
**Usage:** Retrieve WebSocket in hooks and handlers
### Get Authenticated Pubkey
```go
func GetAuthed(ctx context.Context) string {
// Check WebSocket auth
if conn := GetConnection(ctx); conn != nil {
return conn.AuthedPublicKey
}
// Check NIP-86 header auth
if nip86Auth := ctx.Value(nip86HeaderAuthKey); nip86Auth != nil {
return nip86Auth.(string)
}
return ""
}
```
**Supports two auth mechanisms:**
1. NIP-42 WebSocket authentication
2. NIP-86 HTTP header authentication
### Request Authentication
```go
func RequestAuth(ctx context.Context) {
ws := GetConnection(ctx)
ws.authLock.Lock()
if ws.Authed == nil {
ws.Authed = make(chan struct{})
}
ws.authLock.Unlock()
ws.WriteJSON(nostr.AuthEnvelope{Challenge: &ws.Challenge})
}
```
**Sends AUTH challenge to client**
### Wait for Authentication
```go
func (ws *WebSocket) WaitForAuth(timeout time.Duration) bool {
ws.authLock.Lock()
authChan := ws.Authed
ws.authLock.Unlock()
if authChan == nil {
return true // Already authenticated
}
select {
case <-authChan:
return true // Authenticated
case <-time.After(timeout):
return false // Timeout
}
}
```
**Pattern:** Use closed channel as signal
## Performance Patterns
### Zero-Copy String Conversion
```go
message := unsafe.String(unsafe.SliceData(msgb), len(msgb))
```
**When safe:**
- `msgb` is newly allocated by `ReadMessage()`
- Not modified after conversion
- Message processing completes before next read
**Savings:** Avoids 512 KB allocation per message
### Goroutine-per-Message
```go
go func(message string) {
handleMessage(message)
}(message)
```
**Benefits:**
- Read loop continues immediately
- Messages processed concurrently
- Natural backpressure (goroutine scheduler)
**Trade-off:** Goroutine creation overhead (typically <1μs)
### Swap-Delete for Slice Removal
```go
// O(1) deletion
arr[i] = arr[len(arr)-1]
arr = arr[:len(arr)-1]
// vs. O(n) deletion
arr = append(arr[:i], arr[i+1:]...)
```
**When appropriate:**
- Order doesn't matter (listeners, specs)
- Frequent removals expected
- Array size significant
### Lock-Free Session Maps
```go
negentropySessions *xsync.MapOf[string, *NegentropySession]
```
**vs. standard map with mutex:**
```go
sessions map[string]*NegentropySession
mutex sync.RWMutex
```
**Benefits of xsync.MapOf:**
- Lock-free concurrent access
- Better performance under contention
- No manual lock management
**Trade-off:** Slightly more memory per entry
## Testing Patterns
### Basic WebSocket Test
```go
func TestWebSocketConnection(t *testing.T) {
relay := khatru.NewRelay()
// Start server
server := httptest.NewServer(relay)
defer server.Close()
// Convert http:// to ws://
wsURL := "ws" + strings.TrimPrefix(server.URL, "http")
// Connect client
ws, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err != nil {
t.Fatalf("Dial failed: %v", err)
}
defer ws.Close()
// Send REQ
req := `["REQ","test",{"kinds":[1]}]`
if err := ws.WriteMessage(websocket.TextMessage, []byte(req)); err != nil {
t.Fatalf("WriteMessage failed: %v", err)
}
// Read EOSE
_, msg, err := ws.ReadMessage()
if err != nil {
t.Fatalf("ReadMessage failed: %v", err)
}
if !strings.Contains(string(msg), "EOSE") {
t.Errorf("Expected EOSE, got: %s", msg)
}
}
```
### Testing Hooks
```go
func TestRejectConnection(t *testing.T) {
relay := khatru.NewRelay()
// Add rejection hook
relay.RejectConnection = append(relay.RejectConnection,
func(r *http.Request) bool {
return r.RemoteAddr == "192.0.2.1:12345" // Block specific IP
},
)
server := httptest.NewServer(relay)
defer server.Close()
wsURL := "ws" + strings.TrimPrefix(server.URL, "http")
// Should fail to connect
ws, resp, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err == nil {
ws.Close()
t.Fatal("Expected connection to be rejected")
}
if resp.StatusCode != 429 {
t.Errorf("Expected 429, got %d", resp.StatusCode)
}
}
```
## Production Deployment
### Recommended Configuration
```go
relay := khatru.NewRelay()
relay.ServiceURL = "wss://relay.example.com"
relay.WriteWait = 10 * time.Second
relay.PongWait = 60 * time.Second
relay.PingPeriod = 30 * time.Second
relay.MaxMessageSize = 512000 // 512 KB
relay.upgrader.EnableCompression = true
relay.upgrader.CheckOrigin = func(r *http.Request) bool {
// For public relays: return true
// For private relays: validate origin
return true
}
```
### Rate Limiting Hook
```go
import "golang.org/x/time/rate"
type RateLimiter struct {
limiters map[string]*rate.Limiter
mu sync.Mutex
}
func (rl *RateLimiter) getLimiter(ip string) *rate.Limiter {
rl.mu.Lock()
defer rl.mu.Unlock()
limiter, exists := rl.limiters[ip]
if !exists {
limiter = rate.NewLimiter(10, 20) // 10/sec, burst 20
rl.limiters[ip] = limiter
}
return limiter
}
rateLimiter := &RateLimiter{limiters: make(map[string]*rate.Limiter)}
relay.RejectConnection = append(relay.RejectConnection,
func(r *http.Request) bool {
ip := getIP(r)
return !rateLimiter.getLimiter(ip).Allow()
},
)
```
### Monitoring Hook
```go
relay.OnConnect = append(relay.OnConnect,
func(ctx context.Context) {
ws := khatru.GetConnection(ctx)
log.Printf("connection from %s", khatru.GetIP(ctx))
metrics.ActiveConnections.Inc()
},
)
relay.OnDisconnect = append(relay.OnDisconnect,
func(ctx context.Context) {
log.Printf("disconnection from %s", khatru.GetIP(ctx))
metrics.ActiveConnections.Dec()
},
)
```
### Graceful Shutdown
```go
server := &http.Server{
Addr: ":8080",
Handler: relay,
}
// Handle shutdown signals
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
if err := server.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal(err)
}
}()
<-sigChan
log.Println("Shutting down...")
// Graceful shutdown with timeout
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
log.Printf("Shutdown error: %v", err)
}
```
## Summary
**Key architectural decisions:**
1. **Dual goroutine per connection:** Separate read and ping concerns
2. **Mutex-protected writes:** Simplest concurrency safety
3. **Hook-based extensibility:** Plugin architecture without framework changes
4. **Swap-delete for listeners:** O(1) subscription removal
5. **Context-based lifecycle:** Clean cancellation propagation
6. **Zero-copy optimizations:** Reduce allocations in hot path
**When to use khatru patterns:**
- Building Nostr relays in Go
- Need plugin architecture (hooks)
- Want simple, understandable WebSocket handling
- Prioritize correctness over maximum performance
- Support multi-relay routing
**Performance characteristics:**
- Handles 10,000+ concurrent connections per server
- Sub-millisecond latency for event broadcast
- ~10 MB memory per 1000 connections
- Single-core CPU can serve 1000+ req/sec
**Further reading:**
- khatru repository: https://github.com/fiatjaf/khatru
- nostr-sdk (includes khatru): https://github.com/nbd-wtf/go-nostr
- WebSocket library: https://github.com/fasthttp/websocket