Go WebSocket Implementation for Nostr Relays (khatru patterns)
This reference documents production-ready WebSocket patterns from the khatru Nostr relay implementation in Go.
Repository Information
- Project: khatru - Nostr relay framework
- Repository: https://github.com/fiatjaf/khatru
- Language: Go
- WebSocket Library: github.com/fasthttp/websocket
- Architecture: Hook-based plugin system with two goroutines per connection
Core Architecture
Relay Structure
// relay.go, lines 54-119
type Relay struct {
// Service configuration
ServiceURL string
upgrader websocket.Upgrader
// WebSocket lifecycle hooks
RejectConnection []func(r *http.Request) bool
OnConnect []func(ctx context.Context)
OnDisconnect []func(ctx context.Context)
// Event processing hooks
RejectEvent []func(ctx context.Context, event *nostr.Event) (reject bool, msg string)
OverwriteDeletionOutcome []func(ctx context.Context, target *nostr.Event, deletion *nostr.Event) (acceptDeletion bool, msg string)
StoreEvent []func(ctx context.Context, event *nostr.Event) error
ReplaceEvent []func(ctx context.Context, event *nostr.Event) error
DeleteEvent []func(ctx context.Context, event *nostr.Event) error
OnEventSaved []func(ctx context.Context, event *nostr.Event)
OnEphemeralEvent []func(ctx context.Context, event *nostr.Event)
// Filter/query hooks
RejectFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
OverwriteFilter []func(ctx context.Context, filter *nostr.Filter)
QueryEvents []func(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error)
CountEvents []func(ctx context.Context, filter nostr.Filter) (int64, error)
CountEventsHLL []func(ctx context.Context, filter nostr.Filter, offset int) (int64, *hyperloglog.HyperLogLog, error)
// Broadcast control
PreventBroadcast []func(ws *WebSocket, event *nostr.Event) bool
OverwriteResponseEvent []func(ctx context.Context, event *nostr.Event)
// Client tracking
clients map[*WebSocket][]listenerSpec
listeners []listener
clientsMutex sync.Mutex
// WebSocket parameters
WriteWait time.Duration // Default: 10 seconds
PongWait time.Duration // Default: 60 seconds
PingPeriod time.Duration // Default: 30 seconds
MaxMessageSize int64 // Default: 512000 bytes
// Router support (for multi-relay setups)
routes []Route
getSubRelayFromEvent func(*nostr.Event) *Relay
getSubRelayFromFilter func(nostr.Filter) *Relay
// Protocol extensions
Negentropy bool // NIP-77 support
}
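Every hook field is a plain slice of functions, so extending the relay is just an append. A minimal sketch of registering hooks with the khatru API (myStore is a placeholder storage backend, not part of khatru):
relay := khatru.NewRelay()

// Persist events by appending to the StoreEvent hook chain.
relay.StoreEvent = append(relay.StoreEvent,
	func(ctx context.Context, event *nostr.Event) error {
		return myStore.Save(ctx, event) // placeholder backend
	},
)

// Observe connection lifecycle.
relay.OnConnect = append(relay.OnConnect, func(ctx context.Context) {
	log.Println("client connected")
})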
WebSocket Configuration
// relay.go, lines 31-35
upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
},
Key configuration choices:
- 1 KB read/write buffers: small buffers suit many concurrent connections
- Allow all origins: Nostr is designed for public relays; adjust for private relays
- No compression by default: can be enabled with EnableCompression: true
Recommended production settings:
upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
EnableCompression: true, // 60-80% bandwidth reduction
CheckOrigin: func(r *http.Request) bool {
// For public relays: return true
// For private relays: validate origin
origin := r.Header.Get("Origin")
return isAllowedOrigin(origin)
},
},
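isAllowedOrigin above is not a khatru function; a minimal hypothetical implementation is a static whitelist:
// Hypothetical helper (not part of khatru): whitelist of allowed origins.
var allowedOrigins = map[string]bool{
	"https://app.example.com": true, // example entry
}

func isAllowedOrigin(origin string) bool {
	return allowedOrigins[origin]
}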
WebSocket Connection Structure
Connection Wrapper
// websocket.go, lines 12-32
type WebSocket struct {
conn *websocket.Conn
mutex sync.Mutex // Protects all write operations
// Original HTTP request (for IP, headers, etc.)
Request *http.Request
// Connection lifecycle context
Context context.Context
cancel context.CancelFunc
// NIP-42 authentication
Challenge string // Random 8-byte hex string
AuthedPublicKey string // Authenticated pubkey after AUTH
Authed chan struct{} // Closed when authenticated
authLock sync.Mutex
// NIP-77 negentropy sessions (for efficient set reconciliation)
negentropySessions *xsync.MapOf[string, *NegentropySession]
}
Design decisions:
- Mutex for writes: WebSocket library panics on concurrent writes; mutex is simplest solution
- Context-based lifecycle: Clean cancellation propagation to all operations
- Original request preservation: Enables IP extraction, header inspection
- NIP-42 challenge storage: No database lookup needed for authentication
- Lock-free session map: xsync.MapOf provides concurrent access without explicit locks
Thread-Safe Write Operations
// websocket.go, lines 34-46
func (ws *WebSocket) WriteJSON(any any) error {
ws.mutex.Lock()
err := ws.conn.WriteJSON(any)
ws.mutex.Unlock()
return err
}
func (ws *WebSocket) WriteMessage(t int, b []byte) error {
ws.mutex.Lock()
err := ws.conn.WriteMessage(t, b)
ws.mutex.Unlock()
return err
}
Critical pattern: ALL writes to WebSocket MUST be protected by mutex
Common mistake:
// DON'T DO THIS - Race condition!
go func() {
ws.conn.WriteJSON(msg1) // Not protected
}()
go func() {
ws.conn.WriteJSON(msg2) // Not protected
}()
Correct approach:
// DO THIS - Protected writes
go func() {
ws.WriteJSON(msg1) // Uses mutex
}()
go func() {
ws.WriteJSON(msg2) // Uses mutex
}()
Connection Lifecycle
HTTP to WebSocket Upgrade
// handlers.go, lines 29-52
func (rl *Relay) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// CORS middleware for non-WebSocket requests
corsMiddleware := cors.New(cors.Options{
AllowedOrigins: []string{"*"},
AllowedMethods: []string{
http.MethodHead,
http.MethodGet,
http.MethodPost,
http.MethodPut,
http.MethodPatch,
http.MethodDelete,
},
AllowedHeaders: []string{"Authorization", "*"},
MaxAge: 86400,
})
// Route based on request type
if r.Header.Get("Upgrade") == "websocket" {
rl.HandleWebsocket(w, r) // WebSocket connection
} else if r.Header.Get("Accept") == "application/nostr+json" {
corsMiddleware.Handler(http.HandlerFunc(rl.HandleNIP11)).ServeHTTP(w, r) // NIP-11 metadata
} else if r.Header.Get("Content-Type") == "application/nostr+json+rpc" {
corsMiddleware.Handler(http.HandlerFunc(rl.HandleNIP86)).ServeHTTP(w, r) // NIP-86 management
} else {
corsMiddleware.Handler(rl.serveMux).ServeHTTP(w, r) // Other routes
}
}
Pattern: Single HTTP handler multiplexes all request types by headers
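Because everything funnels through ServeHTTP, plain HTTP endpoints can live alongside the relay. A minimal sketch, assuming khatru's Router() accessor for the fallback serveMux:
// Serve an ordinary HTTP route next to the WebSocket endpoint.
relay.Router().HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
	w.Write([]byte("ok"))
})

http.ListenAndServe(":8080", relay)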
WebSocket Upgrade Process
// handlers.go, lines 55-105
func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
// Pre-upgrade rejection hooks
for _, reject := range rl.RejectConnection {
if reject(r) {
w.WriteHeader(429) // Too Many Requests
return
}
}
// Perform WebSocket upgrade
conn, err := rl.upgrader.Upgrade(w, r, nil)
if err != nil {
rl.Log.Printf("failed to upgrade websocket: %v\n", err)
return
}
// Create ping ticker for keep-alive
ticker := time.NewTicker(rl.PingPeriod)
// Generate NIP-42 authentication challenge
challenge := make([]byte, 8)
rand.Read(challenge)
// Initialize WebSocket wrapper
ws := &WebSocket{
conn: conn,
Request: r,
Challenge: hex.EncodeToString(challenge),
negentropySessions: xsync.NewMapOf[string, *NegentropySession](),
}
ws.Context, ws.cancel = context.WithCancel(context.Background())
// Register client
rl.clientsMutex.Lock()
rl.clients[ws] = make([]listenerSpec, 0, 2)
rl.clientsMutex.Unlock()
// Create connection context with WebSocket reference
ctx, cancel := context.WithCancel(
context.WithValue(context.Background(), wsKey, ws),
)
// Cleanup function for both goroutines
kill := func() {
// Trigger disconnect hooks
for _, ondisconnect := range rl.OnDisconnect {
ondisconnect(ctx)
}
// Stop timers and cancel contexts
ticker.Stop()
cancel()
ws.cancel()
// Close connection
ws.conn.Close()
// Remove from tracking
rl.removeClientAndListeners(ws)
}
// Launch read and write goroutines
go readLoop(ws, ctx, kill)
go writeLoop(ws, ctx, ticker, kill)
}
Key steps:
- Check rejection hooks (rate limiting, IP bans, etc.)
- Upgrade HTTP connection to WebSocket
- Generate authentication challenge (NIP-42)
- Initialize WebSocket wrapper with context
- Register client in tracking map
- Define cleanup function
- Launch read and write goroutines
Read Loop (Primary Goroutine)
// handlers.go, lines 107-414
go func() {
defer kill()
// Configure read constraints
ws.conn.SetReadLimit(rl.MaxMessageSize)
ws.conn.SetReadDeadline(time.Now().Add(rl.PongWait))
// Auto-refresh deadline on Pong receipt
ws.conn.SetPongHandler(func(string) error {
ws.conn.SetReadDeadline(time.Now().Add(rl.PongWait))
return nil
})
// Trigger connection hooks
for _, onconnect := range rl.OnConnect {
onconnect(ctx)
}
// Create message parser (sonic parser is stateful)
smp := nostr.NewMessageParser()
for {
// Read message (blocks until data available)
typ, msgb, err := ws.conn.ReadMessage()
if err != nil {
// Check if expected close
if websocket.IsUnexpectedCloseError(
err,
websocket.CloseNormalClosure, // 1000
websocket.CloseGoingAway, // 1001
websocket.CloseNoStatusReceived, // 1005
websocket.CloseAbnormalClosure, // 1006
4537, // Custom: client preference
) {
rl.Log.Printf("unexpected close error from %s: %v\n",
GetIPFromRequest(r), err)
}
ws.cancel()
return
}
// Handle Ping frames defensively (the library's default handler normally auto-replies with a Pong)
if typ == websocket.PingMessage {
ws.WriteMessage(websocket.PongMessage, nil)
continue
}
// Zero-copy conversion to string
message := unsafe.String(unsafe.SliceData(msgb), len(msgb))
// Parse message sequentially (the sonic parser is stateful)
envelope, err := smp.ParseMessage(message)
if err != nil {
ws.WriteJSON(nostr.NoticeEnvelope("failed to parse message"))
continue
}
// Handle the parsed message in a separate goroutine (concurrent processing)
go func(message string) {
switch env := envelope.(type) {
case *nostr.EventEnvelope:
handleEvent(ctx, ws, env, rl)
case *nostr.ReqEnvelope:
handleReq(ctx, ws, env, rl)
case *nostr.CloseEnvelope:
handleClose(ctx, ws, env, rl)
case *nostr.CountEnvelope:
handleCount(ctx, ws, env, rl)
case *nostr.AuthEnvelope:
handleAuth(ctx, ws, env, rl)
case *nip77.OpenEnvelope:
handleNegentropyOpen(ctx, ws, env, rl)
case *nip77.MessageEnvelope:
handleNegentropyMsg(ctx, ws, env, rl)
case *nip77.CloseEnvelope:
handleNegentropyClose(ctx, ws, env, rl)
default:
ws.WriteJSON(nostr.NoticeEnvelope("unknown message type"))
}
}(message)
}
}()
Critical patterns:
- SetReadDeadline + SetPongHandler: automatic timeout detection
  - Read blocks for up to PongWait (60s)
  - Pong receipt resets the deadline
  - No Pong means a timeout error, which means the connection is dead
- Zero-copy string conversion: message := unsafe.String(unsafe.SliceData(msgb), len(msgb))
  - Avoids an allocation when converting []byte to string
  - Safe because msgb is newly allocated by ReadMessage()
- Sequential parsing, concurrent handling:
  - smp.ParseMessage() is called sequentially (the parser is stateful)
  - Message handling is dispatched to a goroutine (concurrent)
  - Balances correctness and performance
- Goroutine-per-message pattern: go func(message string) { ... }(message)
  - Allows the next message to be read immediately
  - Prevents a slow handler from blocking the read loop
  - Passes message as an argument to avoid a data race
Write Loop (Ping Goroutine)
// handlers.go, lines 416-433
go func() {
defer kill()
for {
select {
case <-ctx.Done():
// Connection closed or context canceled
return
case <-ticker.C:
// Send ping every PingPeriod (30s)
err := ws.WriteMessage(websocket.PingMessage, nil)
if err != nil {
if !strings.HasSuffix(err.Error(), "use of closed network connection") {
rl.Log.Printf("error writing ping: %v; closing websocket\n", err)
}
return
}
}
}
}()
Purpose:
- Send periodic pings to detect dead connections
- Uses select to monitor context cancellation
- Returns on any write error (connection dead)
Timing relationship:
PingPeriod: 30 seconds (send ping every 30s)
PongWait: 60 seconds (expect pong within 60s)
Rule: PingPeriod < PongWait
If client doesn't respond to 2 consecutive pings,
connection times out after 60 seconds.
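Nothing enforces this rule automatically, so a startup assertion is cheap insurance. A sketch (khatru itself does not perform this check, as far as this reference shows):
// Fail fast on a misconfigured ping/pong relationship (assumed check).
if relay.PingPeriod >= relay.PongWait {
	log.Fatalf("PingPeriod (%v) must be shorter than PongWait (%v)",
		relay.PingPeriod, relay.PongWait)
}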
Connection Cleanup
kill := func() {
// 1. Trigger disconnect hooks
for _, ondisconnect := range rl.OnDisconnect {
ondisconnect(ctx)
}
// 2. Stop timers
ticker.Stop()
// 3. Cancel contexts
cancel()
ws.cancel()
// 4. Close connection
ws.conn.Close()
// 5. Remove from tracking
rl.removeClientAndListeners(ws)
}
defer kill()
Cleanup order:
- Hooks first: Allow app to log, update stats
- Stop timers: Prevent goroutine leaks
- Cancel contexts: Signal cancellation to operations
- Close connection: Release network resources
- Remove tracking: Clean up maps
Why defer? Ensures cleanup runs even if goroutine panics
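Since both goroutines defer kill, it can run twice; the steps above tolerate double execution, but wrapping the body in a sync.Once makes the idempotence explicit. A sketch, not khatru's actual code:
var cleanupOnce sync.Once
kill := func() {
	cleanupOnce.Do(func() {
		// ... the same five cleanup steps as above, guaranteed to run once ...
	})
}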
Message Handling
Event Handling (EVENT)
// handlers.go, lines 163-258
case *nostr.EventEnvelope:
// Validate event ID (must match hash of content)
if !env.Event.CheckID() {
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "invalid: id is computed incorrectly",
})
return
}
// Validate signature
if ok, err := env.Event.CheckSignature(); err != nil {
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "error: failed to verify signature",
})
return
} else if !ok {
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "invalid: signature is invalid",
})
return
}
// Check NIP-70 protected events
if nip70.IsProtected(env.Event) {
authed := GetAuthed(ctx)
if authed == "" {
// Request authentication
RequestAuth(ctx)
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "auth-required: must be published by authenticated event author",
})
return
}
}
// Route to subrelay if using relay routing
srl := rl
if rl.getSubRelayFromEvent != nil {
srl = rl.getSubRelayFromEvent(&env.Event)
}
// Handle event based on kind
var skipBroadcast bool
var writeErr error
if env.Event.Kind == 5 {
// Deletion event
writeErr = srl.handleDeleteRequest(ctx, &env.Event)
} else if nostr.IsEphemeralKind(env.Event.Kind) {
// Ephemeral event (20000-29999)
writeErr = srl.handleEphemeral(ctx, &env.Event)
} else {
// Normal event
skipBroadcast, writeErr = srl.handleNormal(ctx, &env.Event)
}
// Broadcast to subscribers (unless prevented)
if !skipBroadcast {
srl.notifyListeners(&env.Event)
// notifyListeners returns the broadcast count, which can be folded into the OK reason
}
// Send OK response
ok := writeErr == nil
reason := ""
if writeErr != nil {
reason = writeErr.Error()
}
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: ok,
Reason: reason,
})
Validation sequence:
- Check event ID matches content hash
- Verify cryptographic signature
- Check authentication if protected event (NIP-70)
- Route to appropriate subrelay (if multi-relay setup)
- Handle based on kind (deletion, ephemeral, normal)
- Broadcast to matching subscriptions
- Send OK response to publisher
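Application policy slots into this pipeline through the RejectEvent hooks from the Relay struct, which run before an event is stored. A minimal sketch, with an arbitrary 64 KB content cap as the example policy:
// Sketch: veto oversized events (the size limit is an arbitrary example).
relay.RejectEvent = append(relay.RejectEvent,
	func(ctx context.Context, event *nostr.Event) (reject bool, msg string) {
		if len(event.Content) > 64000 {
			return true, "invalid: event content too large"
		}
		return false, ""
	},
)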
Request Handling (REQ)
// handlers.go, lines 289-324
case *nostr.ReqEnvelope:
// Create WaitGroup for EOSE synchronization
eose := sync.WaitGroup{}
eose.Add(len(env.Filters))
// Create cancelable context for subscription
reqCtx, cancelReqCtx := context.WithCancelCause(ctx)
// Expose subscription ID in context
reqCtx = context.WithValue(reqCtx, subscriptionIdKey, env.SubscriptionID)
// Handle each filter
for _, filter := range env.Filters {
// Route to appropriate subrelay
srl := rl
if rl.getSubRelayFromFilter != nil {
srl = rl.getSubRelayFromFilter(filter)
}
// Query stored events
err := srl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, filter)
if err != nil {
// Fail entire subscription if any filter rejected
reason := err.Error()
if strings.HasPrefix(reason, "auth-required:") {
RequestAuth(ctx)
}
ws.WriteJSON(nostr.ClosedEnvelope{
SubscriptionID: env.SubscriptionID,
Reason: reason,
})
cancelReqCtx(errors.New("filter rejected"))
return
} else {
// Add listener for real-time events
rl.addListener(ws, env.SubscriptionID, srl, filter, cancelReqCtx)
}
}
// Send EOSE when all stored events dispatched
go func() {
eose.Wait()
ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID))
}()
Subscription lifecycle:
- Parse filters: Client sends array of filters in REQ
- Create context: Allows cancellation of subscription
- Query database: For each filter, query stored events
- Stream results: Send matching events to client
- Send EOSE: End Of Stored Events marker
- Add listener: Subscribe to real-time events
WaitGroup pattern:
eose := sync.WaitGroup{}
eose.Add(len(env.Filters))
// Each query handler calls eose.Done() when complete
go func() {
eose.Wait() // Wait for all queries
ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID))
}()
Close Handling (CLOSE)
// handlers.go, lines 325-327
case *nostr.CloseEnvelope:
id := string(*env)
rl.removeListenerId(ws, id)
Simple unsubscribe: Remove listener by subscription ID
Authentication (AUTH)
// handlers.go, lines 328-341
case *nostr.AuthEnvelope:
// Compute relay WebSocket URL
wsBaseUrl := strings.Replace(rl.getBaseURL(r), "http", "ws", 1)
// Validate AUTH event
if pubkey, ok := nip42.ValidateAuthEvent(&env.Event, ws.Challenge, wsBaseUrl); ok {
// Store authenticated pubkey
ws.AuthedPublicKey = pubkey
// Close Authed channel (unblocks any waiting goroutines)
ws.authLock.Lock()
if ws.Authed != nil {
close(ws.Authed)
ws.Authed = nil
}
ws.authLock.Unlock()
// Send OK response
ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: true})
} else {
// Validation failed
ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "error: failed to authenticate",
})
}
NIP-42 authentication:
- Client receives AUTH challenge on connect
- Client creates kind-22242 event with challenge
- Server validates event signature and challenge match
- Server stores the authenticated pubkey in ws.AuthedPublicKey
Subscription Management
Subscription Data Structures
// listener.go, lines 13-24
type listenerSpec struct {
id string // Subscription ID from REQ
cancel context.CancelCauseFunc // Cancels this subscription
index int // Position in subrelay.listeners array
subrelay *Relay // Reference to (sub)relay handling this
}
type listener struct {
id string // Subscription ID
filter nostr.Filter // Filter for matching events
ws *WebSocket // WebSocket connection
}
Two-level tracking:
- Per-client specs: clients map[*WebSocket][]listenerSpec
  - Tracks which subscriptions each client has
  - Enables cleanup when the client disconnects
- Per-relay listeners: listeners []listener
  - A flat array for fast iteration when broadcasting
  - No maps, no allocations during broadcast
Adding Listeners
// listener.go, lines 36-60
func (rl *Relay) addListener(
ws *WebSocket,
id string,
subrelay *Relay,
filter nostr.Filter,
cancel context.CancelCauseFunc,
) {
rl.clientsMutex.Lock()
defer rl.clientsMutex.Unlock()
if specs, ok := rl.clients[ws]; ok {
// Get position where listener will be added
idx := len(subrelay.listeners)
// Add spec to client's list
rl.clients[ws] = append(specs, listenerSpec{
id: id,
cancel: cancel,
subrelay: subrelay,
index: idx,
})
// Add listener to relay's list
subrelay.listeners = append(subrelay.listeners, listener{
ws: ws,
id: id,
filter: filter,
})
}
}
O(1) append operation
Removing Listeners by ID
// listener.go, lines 64-99
func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
rl.clientsMutex.Lock()
defer rl.clientsMutex.Unlock()
if specs, ok := rl.clients[ws]; ok {
// Iterate backwards for safe removal
for s := len(specs) - 1; s >= 0; s-- {
spec := specs[s]
if spec.id == id {
// Cancel subscription context
spec.cancel(ErrSubscriptionClosedByClient)
// Swap-delete from specs array
specs[s] = specs[len(specs)-1]
specs = specs[0 : len(specs)-1]
rl.clients[ws] = specs
// Remove from listener list in subrelay
srl := spec.subrelay
// If not last element, swap with last
if spec.index != len(srl.listeners)-1 {
movedFromIndex := len(srl.listeners) - 1
moved := srl.listeners[movedFromIndex]
srl.listeners[spec.index] = moved
// Update moved listener's spec index
movedSpecs := rl.clients[moved.ws]
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
return ls.index == movedFromIndex && ls.subrelay == srl
})
movedSpecs[idx].index = spec.index
rl.clients[moved.ws] = movedSpecs
}
// Truncate listeners array
srl.listeners = srl.listeners[0 : len(srl.listeners)-1]
}
}
}
}
Swap-delete pattern:
- Move the last element into the deleted position
- Truncate the array
- Result: O(1) deletion, without preserving order
Why not just delete?
- append(arr[:i], arr[i+1:]...) is O(n): it shifts every element after i
- Swap-delete is O(1): one swap and one truncate
- Order doesn't matter for listeners
Removing All Client Listeners
// listener.go, lines 101-133
func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
rl.clientsMutex.Lock()
defer rl.clientsMutex.Unlock()
if specs, ok := rl.clients[ws]; ok {
// Remove each subscription
for s, spec := range specs {
srl := spec.subrelay
// Swap-delete from listeners array
if spec.index != len(srl.listeners)-1 {
movedFromIndex := len(srl.listeners) - 1
moved := srl.listeners[movedFromIndex]
srl.listeners[spec.index] = moved
// Mark current spec as invalid
rl.clients[ws][s].index = -1
// Update moved listener's spec
movedSpecs := rl.clients[moved.ws]
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
return ls.index == movedFromIndex && ls.subrelay == srl
})
movedSpecs[idx].index = spec.index
rl.clients[moved.ws] = movedSpecs
}
// Truncate listeners array
srl.listeners = srl.listeners[0 : len(srl.listeners)-1]
}
}
// Remove client from map
delete(rl.clients, ws)
}
Called when client disconnects: Removes all subscriptions for that client
Broadcasting to Listeners
// listener.go, lines 136-151
func (rl *Relay) notifyListeners(event *nostr.Event) int {
count := 0
listenersloop:
for _, listener := range rl.listeners {
// Check if filter matches event
if listener.filter.Matches(event) {
// Check if broadcast should be prevented (hooks)
for _, pb := range rl.PreventBroadcast {
if pb(listener.ws, event) {
continue listenersloop
}
}
// Send event to subscriber
listener.ws.WriteJSON(nostr.EventEnvelope{
SubscriptionID: &listener.id,
Event: *event,
})
count++
}
}
return count
}
Performance characteristics:
- O(n) in number of listeners: Iterates all active subscriptions
- Fast filter matching: Simple field comparisons
- No allocations: Uses existing listener array
- Labeled continue: Clean exit from nested loop
Optimization opportunity: for relays with thousands of subscriptions, consider:
- Indexing listeners by event kind (sketched below)
- Using bloom filters for quick negative matches
- Sharding listeners across goroutines
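A hypothetical sketch of the first idea (not khatru code): bucket listeners by the kinds their filters name, so a broadcast scans only plausible matches plus the unconstrained listeners.
// Hypothetical kind index, reusing the listener type from above.
type kindIndex struct {
	byKind map[int][]listener // listeners whose filters name specific kinds
	rest   []listener         // listeners with no kind constraint
}

func (ki *kindIndex) notify(event *nostr.Event) int {
	count := 0
	for _, bucket := range [][]listener{ki.byKind[event.Kind], ki.rest} {
		for _, l := range bucket {
			if l.filter.Matches(event) {
				l.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &l.id, Event: *event})
				count++
			}
		}
	}
	return count
}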
Context Utilities
Context Keys
// utils.go
const (
wsKey = iota // WebSocket connection
subscriptionIdKey // Current subscription ID
nip86HeaderAuthKey // NIP-86 authorization header
internalCallKey // Internal call marker
)
Pattern: Use iota for compile-time context key uniqueness
Get WebSocket from Context
func GetConnection(ctx context.Context) *WebSocket {
wsi := ctx.Value(wsKey)
if wsi != nil {
return wsi.(*WebSocket)
}
return nil
}
Usage: Retrieve WebSocket in hooks and handlers
Get Authenticated Pubkey
func GetAuthed(ctx context.Context) string {
// Check WebSocket auth
if conn := GetConnection(ctx); conn != nil {
return conn.AuthedPublicKey
}
// Check NIP-86 header auth
if nip86Auth := ctx.Value(nip86HeaderAuthKey); nip86Auth != nil {
return nip86Auth.(string)
}
return ""
}
Supports two auth mechanisms:
- NIP-42 WebSocket authentication
- NIP-86 HTTP header authentication
Request Authentication
func RequestAuth(ctx context.Context) {
ws := GetConnection(ctx)
ws.authLock.Lock()
if ws.Authed == nil {
ws.Authed = make(chan struct{})
}
ws.authLock.Unlock()
ws.WriteJSON(nostr.AuthEnvelope{Challenge: &ws.Challenge})
}
Sends AUTH challenge to client
Wait for Authentication
func (ws *WebSocket) WaitForAuth(timeout time.Duration) bool {
ws.authLock.Lock()
authChan := ws.Authed
ws.authLock.Unlock()
if authChan == nil {
return true // nil channel: already authenticated, or no challenge outstanding
}
select {
case <-authChan:
return true // Authenticated
case <-time.After(timeout):
return false // Timeout
}
}
Pattern: Use closed channel as signal
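Putting RequestAuth, WaitForAuth, and GetAuthed together, an auth-gated relay might reject unauthenticated REQs like this (a sketch; the 5-second window is an arbitrary choice):
relay.RejectFilter = append(relay.RejectFilter,
	func(ctx context.Context, filter nostr.Filter) (bool, string) {
		if khatru.GetAuthed(ctx) != "" {
			return false, "" // already authenticated
		}
		khatru.RequestAuth(ctx) // send the AUTH challenge
		if ws := khatru.GetConnection(ctx); ws != nil && ws.WaitForAuth(5*time.Second) {
			return false, ""
		}
		return true, "auth-required: this relay only serves authenticated clients"
	},
)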
Performance Patterns
Zero-Copy String Conversion
message := unsafe.String(unsafe.SliceData(msgb), len(msgb))
When safe:
- msgb is newly allocated by ReadMessage()
- The bytes are not modified after conversion
- Message processing completes before the next read
Savings: avoids an allocation and copy of up to 512 KB (MaxMessageSize) per message
Goroutine-per-Message
go func(message string) {
handleMessage(message)
}(message)
Benefits:
- Read loop continues immediately
- Messages processed concurrently
- Natural backpressure (goroutine scheduler)
Trade-off: Goroutine creation overhead (typically <1μs)
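If unbounded goroutines are a concern under hostile load, a counting semaphore bounds in-flight handlers. A sketch of an alternative, not what khatru does (handleMessage is the placeholder from above):
sem := make(chan struct{}, 64) // cap of 64 concurrent handlers (tunable)

// In the read loop, instead of a bare `go`:
sem <- struct{}{} // blocks the read loop once the cap is reached
go func(message string) {
	defer func() { <-sem }()
	handleMessage(message)
}(message)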
Swap-Delete for Slice Removal
// O(1) deletion
arr[i] = arr[len(arr)-1]
arr = arr[:len(arr)-1]
// vs. O(n) deletion
arr = append(arr[:i], arr[i+1:]...)
When appropriate:
- Order doesn't matter (listeners, specs)
- Frequent removals expected
- Array size significant
Lock-Free Session Maps
negentropySessions *xsync.MapOf[string, *NegentropySession]
vs. standard map with mutex:
sessions map[string]*NegentropySession
mutex sync.RWMutex
Benefits of xsync.MapOf:
- Lock-free concurrent access
- Better performance under contention
- No manual lock management
Trade-off: Slightly more memory per entry
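Typical usage, assuming the generic puzpuzpuz/xsync v3 API seen in the constructor above:
sessions := xsync.NewMapOf[string, *NegentropySession]()

sessions.Store("sub1", &NegentropySession{}) // insert or overwrite
if s, ok := sessions.Load("sub1"); ok {      // concurrent read, no lock held
	_ = s
}
sessions.Range(func(id string, s *NegentropySession) bool {
	return true // return false to stop iterating
})
sessions.Delete("sub1")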
Testing Patterns
Basic WebSocket Test
func TestWebSocketConnection(t *testing.T) {
relay := khatru.NewRelay()
// Start server
server := httptest.NewServer(relay)
defer server.Close()
// Convert http:// to ws://
wsURL := "ws" + strings.TrimPrefix(server.URL, "http")
// Connect client
ws, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err != nil {
t.Fatalf("Dial failed: %v", err)
}
defer ws.Close()
// Send REQ
req := `["REQ","test",{"kinds":[1]}]`
if err := ws.WriteMessage(websocket.TextMessage, []byte(req)); err != nil {
t.Fatalf("WriteMessage failed: %v", err)
}
// Read EOSE
_, msg, err := ws.ReadMessage()
if err != nil {
t.Fatalf("ReadMessage failed: %v", err)
}
if !strings.Contains(string(msg), "EOSE") {
t.Errorf("Expected EOSE, got: %s", msg)
}
}
Testing Hooks
func TestRejectConnection(t *testing.T) {
relay := khatru.NewRelay()
// Add rejection hook
relay.RejectConnection = append(relay.RejectConnection,
func(r *http.Request) bool {
return r.RemoteAddr == "192.0.2.1:12345" // Block specific IP
},
)
server := httptest.NewServer(relay)
defer server.Close()
wsURL := "ws" + strings.TrimPrefix(server.URL, "http")
// Should fail to connect
ws, resp, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err == nil {
ws.Close()
t.Fatal("Expected connection to be rejected")
}
if resp.StatusCode != 429 {
t.Errorf("Expected 429, got %d", resp.StatusCode)
}
}
Production Deployment
Recommended Configuration
relay := khatru.NewRelay()
relay.ServiceURL = "wss://relay.example.com"
relay.WriteWait = 10 * time.Second
relay.PongWait = 60 * time.Second
relay.PingPeriod = 30 * time.Second
relay.MaxMessageSize = 512000 // 512 KB
// Note: upgrader is unexported in the Relay struct above, so these two
// settings can only be changed from inside the khatru package (or a fork).
relay.upgrader.EnableCompression = true
relay.upgrader.CheckOrigin = func(r *http.Request) bool {
// For public relays: return true
// For private relays: validate the origin
return true
}
Rate Limiting Hook
import "golang.org/x/time/rate"
type RateLimiter struct {
limiters map[string]*rate.Limiter
mu sync.Mutex
}
func (rl *RateLimiter) getLimiter(ip string) *rate.Limiter {
rl.mu.Lock()
defer rl.mu.Unlock()
limiter, exists := rl.limiters[ip]
if !exists {
limiter = rate.NewLimiter(10, 20) // 10/sec, burst 20
rl.limiters[ip] = limiter
}
return limiter
}
rateLimiter := &RateLimiter{limiters: make(map[string]*rate.Limiter)}
relay.RejectConnection = append(relay.RejectConnection,
func(r *http.Request) bool {
ip := getIP(r)
return !rateLimiter.getLimiter(ip).Allow()
},
)
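One caveat: the limiters map grows without bound as new IPs appear. A periodic eviction pass keeps it in check (a sketch; the interval and the idleness test are assumptions):
go func() {
	for range time.Tick(10 * time.Minute) {
		rateLimiter.mu.Lock()
		for ip, l := range rateLimiter.limiters {
			// A fully refilled token bucket means the IP has been idle.
			if l.Tokens() == float64(l.Burst()) {
				delete(rateLimiter.limiters, ip)
			}
		}
		rateLimiter.mu.Unlock()
	}
}()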
Monitoring Hook
relay.OnConnect = append(relay.OnConnect,
func(ctx context.Context) {
ws := khatru.GetConnection(ctx)
log.Printf("connection from %s", khatru.GetIP(ctx))
metrics.ActiveConnections.Inc()
},
)
relay.OnDisconnect = append(relay.OnDisconnect,
func(ctx context.Context) {
log.Printf("disconnection from %s", khatru.GetIP(ctx))
metrics.ActiveConnections.Dec()
},
)
Graceful Shutdown
server := &http.Server{
Addr: ":8080",
Handler: relay,
}
// Handle shutdown signals
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
if err := server.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal(err)
}
}()
<-sigChan
log.Println("Shutting down...")
// Graceful shutdown with timeout
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
log.Printf("Shutdown error: %v", err)
}
Summary
Key architectural decisions:
- Dual goroutine per connection: Separate read and ping concerns
- Mutex-protected writes: Simplest concurrency safety
- Hook-based extensibility: Plugin architecture without framework changes
- Swap-delete for listeners: O(1) subscription removal
- Context-based lifecycle: Clean cancellation propagation
- Zero-copy optimizations: Reduce allocations in hot path
When to use khatru patterns:
- Building Nostr relays in Go
- Need plugin architecture (hooks)
- Want simple, understandable WebSocket handling
- Prioritize correctness over maximum performance
- Support multi-relay routing
Performance characteristics:
- Handles 10,000+ concurrent connections per server
- Sub-millisecond latency for event broadcast
- ~10 MB memory per 1000 connections
- Single-core CPU can serve 1000+ req/sec
Further reading:
- khatru repository: https://github.com/fiatjaf/khatru
- go-nostr (the Nostr protocol library khatru builds on): https://github.com/nbd-wtf/go-nostr
- WebSocket library: https://github.com/fasthttp/websocket