Fix log buffer race condition and add NRC IDS/chunking support (v0.52.4)
Some checks are pending
Go / build-and-release (push) Waiting to run
Some checks are pending
Go / build-and-release (push) Waiting to run
- Fix race condition in BufferedWriter causing slice bounds panics under concurrent writes by adding mutex protection - Add IDS message type to NRC protocol for event manifest diffing - Add chunking support for large NRC responses (>40KB) - Skip rate limiting for NIP-46 bunker events (kind 24133) for realtime priority Files modified: - pkg/logbuffer/writer.go: Add mutex to protect lineBuf operations - pkg/event/processing/processing.go: Skip rate limit for NIP-46 - pkg/protocol/nrc/bridge.go: Add IDS handler and chunked responses - pkg/protocol/nrc/client.go: Add IDS client and chunk reassembly - pkg/protocol/nrc/session.go: Add IDS/Chunk message types - pkg/version/version: Bump to v0.52.4 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -183,8 +183,9 @@ func (s *Service) saveEvent(ctx context.Context, ev *event.E) Result {
|
||||
saveCtx, cancel := context.WithTimeout(ctx, s.cfg.WriteTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Apply rate limiting
|
||||
if s.rateLimiter != nil && s.rateLimiter.IsEnabled() {
|
||||
// Apply rate limiting (skip for NIP-46 bunker events which need realtime priority)
|
||||
const kindNIP46 = 24133
|
||||
if s.rateLimiter != nil && s.rateLimiter.IsEnabled() && ev.Kind != uint16(kindNIP46) {
|
||||
const writeOpType = 1 // ratelimit.Write
|
||||
s.rateLimiter.Wait(saveCtx, writeOpType)
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -14,6 +15,7 @@ type BufferedWriter struct {
|
||||
original io.Writer
|
||||
buffer *Buffer
|
||||
lineBuf bytes.Buffer
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// Log format regex patterns
|
||||
@@ -42,10 +44,12 @@ func (w *BufferedWriter) Write(p []byte) (n int, err error) {
|
||||
|
||||
// Store in buffer if we have one
|
||||
if w.buffer != nil {
|
||||
w.mu.Lock()
|
||||
// Accumulate data in line buffer
|
||||
w.lineBuf.Write(p)
|
||||
|
||||
// Process complete lines
|
||||
var entries []LogEntry
|
||||
for {
|
||||
line, lineErr := w.lineBuf.ReadString('\n')
|
||||
if lineErr != nil {
|
||||
@@ -56,12 +60,18 @@ func (w *BufferedWriter) Write(p []byte) (n int, err error) {
|
||||
break
|
||||
}
|
||||
|
||||
// Parse and store the complete line
|
||||
// Parse the complete line
|
||||
entry := w.parseLine(strings.TrimSuffix(line, "\n"))
|
||||
if entry.Message != "" {
|
||||
w.buffer.Add(entry)
|
||||
entries = append(entries, entry)
|
||||
}
|
||||
}
|
||||
w.mu.Unlock()
|
||||
|
||||
// Add entries outside the lock to avoid holding it during buffer.Add
|
||||
for _, entry := range entries {
|
||||
w.buffer.Add(entry)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
@@ -2,6 +2,8 @@ package nrc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
@@ -25,6 +27,8 @@ const (
|
||||
KindNRCRequest = 24891
|
||||
// KindNRCResponse is the event kind for NRC responses.
|
||||
KindNRCResponse = 24892
|
||||
// MaxChunkSize is the maximum size for a single chunk (40KB to stay under 65KB limit after NIP-44 + base64).
|
||||
MaxChunkSize = 40000
|
||||
)
|
||||
|
||||
// BridgeConfig holds configuration for the NRC bridge.
|
||||
@@ -300,6 +304,8 @@ func (b *Bridge) forwardToLocalRelay(ctx context.Context, session *Session, reqE
|
||||
return b.handleCLOSE(ctx, session, reqEvent, reqMsg)
|
||||
case "COUNT":
|
||||
return b.handleCOUNT(ctx, session, reqEvent, reqMsg, localConn)
|
||||
case "IDS":
|
||||
return b.handleIDS(ctx, session, reqEvent, reqMsg, localConn)
|
||||
default:
|
||||
return fmt.Errorf("unsupported message type: %s", reqMsg.Type)
|
||||
}
|
||||
@@ -462,6 +468,158 @@ func (b *Bridge) handleCOUNT(ctx context.Context, session *Session, reqEvent *ev
|
||||
return b.sendResponse(ctx, reqEvent, session, resp)
|
||||
}
|
||||
|
||||
// handleIDS handles an IDS message - returns event manifests for diffing.
|
||||
func (b *Bridge) handleIDS(ctx context.Context, session *Session, reqEvent *event.E, reqMsg *RequestMessage, conn *ws.Client) error {
|
||||
// Extract subscription ID and filters from payload
|
||||
// Payload: ["IDS", "<sub_id>", filter1, filter2, ...]
|
||||
if len(reqMsg.Payload) < 3 {
|
||||
return fmt.Errorf("invalid IDS payload")
|
||||
}
|
||||
subID, ok := reqMsg.Payload[1].(string)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid subscription ID")
|
||||
}
|
||||
|
||||
// Parse filters from payload
|
||||
var filters []*filter.F
|
||||
for i := 2; i < len(reqMsg.Payload); i++ {
|
||||
filterMap, ok := reqMsg.Payload[i].(map[string]any)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
filterBytes, err := json.Marshal(filterMap)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
var f filter.F
|
||||
if err := json.Unmarshal(filterBytes, &f); err != nil {
|
||||
continue
|
||||
}
|
||||
filters = append(filters, &f)
|
||||
}
|
||||
|
||||
if len(filters) == 0 {
|
||||
return fmt.Errorf("no valid filters in IDS")
|
||||
}
|
||||
|
||||
// Add subscription to session
|
||||
if err := session.AddSubscription(subID); err != nil {
|
||||
return err
|
||||
}
|
||||
defer session.RemoveSubscription(subID)
|
||||
|
||||
// Create filter set
|
||||
filterSet := filter.NewS(filters...)
|
||||
|
||||
// Subscribe to local relay
|
||||
sub, err := conn.Subscribe(ctx, filterSet)
|
||||
if chk.E(err) {
|
||||
return fmt.Errorf("local subscribe failed: %w", err)
|
||||
}
|
||||
defer sub.Unsub()
|
||||
|
||||
// Collect events and build manifest
|
||||
var manifest []EventManifestEntry
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case ev := <-sub.Events:
|
||||
if ev == nil {
|
||||
// Subscription closed, send IDS response
|
||||
return b.sendIDSResponse(ctx, reqEvent, session, subID, manifest)
|
||||
}
|
||||
|
||||
// Build manifest entry
|
||||
entry := EventManifestEntry{
|
||||
Kind: int(ev.Kind),
|
||||
ID: string(hex.Enc(ev.ID[:])),
|
||||
CreatedAt: ev.CreatedAt,
|
||||
}
|
||||
|
||||
// Check for d tag (parameterized replaceable events)
|
||||
dTag := ev.Tags.GetFirst([]byte("d"))
|
||||
if dTag != nil && dTag.Len() >= 2 {
|
||||
entry.D = string(dTag.Value())
|
||||
}
|
||||
|
||||
manifest = append(manifest, entry)
|
||||
case <-sub.EndOfStoredEvents:
|
||||
// Send IDS response with manifest
|
||||
return b.sendIDSResponse(ctx, reqEvent, session, subID, manifest)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sendIDSResponse sends an IDS response with the event manifest, chunking if necessary.
|
||||
func (b *Bridge) sendIDSResponse(ctx context.Context, reqEvent *event.E, session *Session, subID string, manifest []EventManifestEntry) error {
|
||||
resp := &ResponseMessage{
|
||||
Type: "IDS",
|
||||
Payload: []any{"IDS", subID, manifest},
|
||||
}
|
||||
return b.sendResponseChunked(ctx, reqEvent, session, resp)
|
||||
}
|
||||
|
||||
// sendResponseChunked sends a response, chunking if necessary for large payloads.
|
||||
func (b *Bridge) sendResponseChunked(ctx context.Context, reqEvent *event.E, session *Session, resp *ResponseMessage) error {
|
||||
// Marshal response content
|
||||
content, err := MarshalResponseContent(resp)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal failed: %w", err)
|
||||
}
|
||||
|
||||
// If small enough, send directly
|
||||
if len(content) <= MaxChunkSize {
|
||||
return b.sendResponse(ctx, reqEvent, session, resp)
|
||||
}
|
||||
|
||||
// Need to chunk - encode to base64 for safe transmission
|
||||
encoded := base64.StdEncoding.EncodeToString(content)
|
||||
var chunks []string
|
||||
|
||||
// Split into chunks
|
||||
for i := 0; i < len(encoded); i += MaxChunkSize {
|
||||
end := i + MaxChunkSize
|
||||
if end > len(encoded) {
|
||||
end = len(encoded)
|
||||
}
|
||||
chunks = append(chunks, encoded[i:end])
|
||||
}
|
||||
|
||||
// Generate message ID
|
||||
messageID := generateMessageID()
|
||||
log.D.F("NRC: chunking large message (%d bytes) into %d chunks", len(content), len(chunks))
|
||||
|
||||
// Send each chunk
|
||||
for i, chunkData := range chunks {
|
||||
chunkMsg := ChunkMessage{
|
||||
Type: "CHUNK",
|
||||
MessageID: messageID,
|
||||
Index: i,
|
||||
Total: len(chunks),
|
||||
Data: chunkData,
|
||||
}
|
||||
|
||||
chunkResp := &ResponseMessage{
|
||||
Type: "CHUNK",
|
||||
Payload: []any{chunkMsg},
|
||||
}
|
||||
|
||||
if err := b.sendResponse(ctx, reqEvent, session, chunkResp); err != nil {
|
||||
return fmt.Errorf("failed to send chunk %d/%d: %w", i+1, len(chunks), err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// generateMessageID generates a random message ID for chunking.
|
||||
func generateMessageID() string {
|
||||
b := make([]byte, 16)
|
||||
rand.Read(b)
|
||||
return string(hex.Enc(b))
|
||||
}
|
||||
|
||||
// sendResponse encrypts and sends a response to the client.
|
||||
func (b *Bridge) sendResponse(ctx context.Context, reqEvent *event.E, session *Session, resp *ResponseMessage) error {
|
||||
// Marshal response content
|
||||
|
||||
@@ -2,6 +2,7 @@ package nrc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
@@ -21,6 +22,13 @@ import (
|
||||
"lol.mleku.dev/log"
|
||||
)
|
||||
|
||||
// chunkBuffer holds chunks for a message being reassembled.
|
||||
type chunkBuffer struct {
|
||||
chunks map[int]string
|
||||
total int
|
||||
receivedAt time.Time
|
||||
}
|
||||
|
||||
// Client connects to a private relay through the NRC tunnel.
|
||||
type Client struct {
|
||||
uri *ConnectionURI
|
||||
@@ -38,6 +46,10 @@ type Client struct {
|
||||
subscriptions map[string]chan *event.E
|
||||
subscriptionsMu sync.Mutex
|
||||
|
||||
// chunkBuffers holds partially received chunked messages.
|
||||
chunkBuffers map[string]*chunkBuffer
|
||||
chunkBuffersMu sync.Mutex
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
@@ -61,6 +73,7 @@ func NewClient(connectionURI string) (*Client, error) {
|
||||
clientSigner: uri.GetClientSigner(),
|
||||
pending: make(map[string]chan *ResponseMessage),
|
||||
subscriptions: make(map[string]chan *event.E),
|
||||
chunkBuffers: make(map[string]*chunkBuffer),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}, nil
|
||||
@@ -127,6 +140,11 @@ func (c *Client) Close() {
|
||||
}
|
||||
c.subscriptions = make(map[string]chan *event.E)
|
||||
c.subscriptionsMu.Unlock()
|
||||
|
||||
// Clear chunk buffers
|
||||
c.chunkBuffersMu.Lock()
|
||||
c.chunkBuffers = make(map[string]*chunkBuffer)
|
||||
c.chunkBuffersMu.Unlock()
|
||||
}
|
||||
|
||||
// handleResponses processes incoming NRC response events.
|
||||
@@ -186,6 +204,10 @@ func (c *Client) processResponse(ev *event.E) {
|
||||
c.handleCountResponse(resp.Payload, requestEventID)
|
||||
case "AUTH":
|
||||
c.handleAuthResponse(resp.Payload, requestEventID)
|
||||
case "IDS":
|
||||
c.handleIDSResponse(resp.Payload, requestEventID)
|
||||
case "CHUNK":
|
||||
c.handleChunkResponse(resp.Payload, requestEventID)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -315,6 +337,127 @@ func (c *Client) handleAuthResponse(payload []any, requestEventID string) {
|
||||
}
|
||||
}
|
||||
|
||||
// handleIDSResponse handles an IDS response.
|
||||
func (c *Client) handleIDSResponse(payload []any, requestEventID string) {
|
||||
c.pendingMu.Lock()
|
||||
ch, exists := c.pending[requestEventID]
|
||||
c.pendingMu.Unlock()
|
||||
|
||||
if exists {
|
||||
resp := &ResponseMessage{Type: "IDS", Payload: payload}
|
||||
select {
|
||||
case ch <- resp:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleChunkResponse handles a CHUNK response and reassembles the message.
|
||||
func (c *Client) handleChunkResponse(payload []any, requestEventID string) {
|
||||
if len(payload) < 1 {
|
||||
return
|
||||
}
|
||||
|
||||
// Parse chunk message from payload
|
||||
chunkData, ok := payload[0].(map[string]any)
|
||||
if !ok {
|
||||
log.W.F("NRC: invalid chunk payload format")
|
||||
return
|
||||
}
|
||||
|
||||
messageID, _ := chunkData["messageId"].(string)
|
||||
indexFloat, _ := chunkData["index"].(float64)
|
||||
totalFloat, _ := chunkData["total"].(float64)
|
||||
data, _ := chunkData["data"].(string)
|
||||
|
||||
if messageID == "" || data == "" {
|
||||
log.W.F("NRC: chunk missing required fields")
|
||||
return
|
||||
}
|
||||
|
||||
index := int(indexFloat)
|
||||
total := int(totalFloat)
|
||||
|
||||
c.chunkBuffersMu.Lock()
|
||||
defer c.chunkBuffersMu.Unlock()
|
||||
|
||||
// Get or create buffer for this message
|
||||
buf, exists := c.chunkBuffers[messageID]
|
||||
if !exists {
|
||||
buf = &chunkBuffer{
|
||||
chunks: make(map[int]string),
|
||||
total: total,
|
||||
receivedAt: time.Now(),
|
||||
}
|
||||
c.chunkBuffers[messageID] = buf
|
||||
}
|
||||
|
||||
// Store the chunk
|
||||
buf.chunks[index] = data
|
||||
log.D.F("NRC: received chunk %d/%d for message %s", index+1, total, messageID[:8])
|
||||
|
||||
// Check if we have all chunks
|
||||
if len(buf.chunks) == buf.total {
|
||||
// Reassemble the message
|
||||
var encoded string
|
||||
for i := 0; i < buf.total; i++ {
|
||||
part, ok := buf.chunks[i]
|
||||
if !ok {
|
||||
log.W.F("NRC: missing chunk %d for message %s", i, messageID)
|
||||
delete(c.chunkBuffers, messageID)
|
||||
return
|
||||
}
|
||||
encoded += part
|
||||
}
|
||||
|
||||
// Decode from base64
|
||||
decoded, err := base64.StdEncoding.DecodeString(encoded)
|
||||
if err != nil {
|
||||
log.W.F("NRC: failed to decode chunked message: %v", err)
|
||||
delete(c.chunkBuffers, messageID)
|
||||
return
|
||||
}
|
||||
|
||||
// Parse the reassembled response
|
||||
var resp struct {
|
||||
Type string `json:"type"`
|
||||
Payload []any `json:"payload"`
|
||||
}
|
||||
if err := json.Unmarshal(decoded, &resp); err != nil {
|
||||
log.W.F("NRC: failed to parse reassembled message: %v", err)
|
||||
delete(c.chunkBuffers, messageID)
|
||||
return
|
||||
}
|
||||
|
||||
log.D.F("NRC: reassembled chunked message: %s", resp.Type)
|
||||
|
||||
// Clean up buffer
|
||||
delete(c.chunkBuffers, messageID)
|
||||
|
||||
// Route the reassembled response
|
||||
c.pendingMu.Lock()
|
||||
ch, exists := c.pending[requestEventID]
|
||||
c.pendingMu.Unlock()
|
||||
|
||||
if exists {
|
||||
respMsg := &ResponseMessage{Type: resp.Type, Payload: resp.Payload}
|
||||
select {
|
||||
case ch <- respMsg:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up stale buffers (older than 60 seconds)
|
||||
now := time.Now()
|
||||
for id, b := range c.chunkBuffers {
|
||||
if now.Sub(b.receivedAt) > 60*time.Second {
|
||||
log.W.F("NRC: discarding stale chunk buffer: %s", id)
|
||||
delete(c.chunkBuffers, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sendRequest sends an NRC request and waits for response.
|
||||
func (c *Client) sendRequest(ctx context.Context, msgType string, payload []any) (*ResponseMessage, error) {
|
||||
// Build request content
|
||||
@@ -511,3 +654,61 @@ func (c *Client) Count(ctx context.Context, subID string, filters ...*filter.F)
|
||||
func (c *Client) RelayURL() string {
|
||||
return "nrc://" + string(hex.Enc(c.uri.RelayPubkey))
|
||||
}
|
||||
|
||||
// RequestIDs sends an IDS request to get event manifests for diffing.
|
||||
func (c *Client) RequestIDs(ctx context.Context, subID string, filters ...*filter.F) ([]EventManifestEntry, error) {
|
||||
// Build payload: ["IDS", "<sub_id>", filter1, filter2, ...]
|
||||
payload := []any{"IDS", subID}
|
||||
for _, f := range filters {
|
||||
filterBytes, err := json.Marshal(f)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal filter failed: %w", err)
|
||||
}
|
||||
var filterMap map[string]any
|
||||
if err := json.Unmarshal(filterBytes, &filterMap); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal filter failed: %w", err)
|
||||
}
|
||||
payload = append(payload, filterMap)
|
||||
}
|
||||
|
||||
resp, err := c.sendRequest(ctx, "IDS", payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Parse IDS response: ["IDS", "<sub_id>", [...manifest...]]
|
||||
if resp.Type != "IDS" || len(resp.Payload) < 3 {
|
||||
return nil, fmt.Errorf("unexpected response type: %s", resp.Type)
|
||||
}
|
||||
|
||||
// Parse manifest entries
|
||||
manifestData, ok := resp.Payload[2].([]any)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid manifest response")
|
||||
}
|
||||
|
||||
var manifest []EventManifestEntry
|
||||
for _, item := range manifestData {
|
||||
entryMap, ok := item.(map[string]any)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
entry := EventManifestEntry{}
|
||||
if k, ok := entryMap["kind"].(float64); ok {
|
||||
entry.Kind = int(k)
|
||||
}
|
||||
if id, ok := entryMap["id"].(string); ok {
|
||||
entry.ID = id
|
||||
}
|
||||
if ca, ok := entryMap["created_at"].(float64); ok {
|
||||
entry.CreatedAt = int64(ca)
|
||||
}
|
||||
if d, ok := entryMap["d"].(string); ok {
|
||||
entry.D = d
|
||||
}
|
||||
manifest = append(manifest, entry)
|
||||
}
|
||||
|
||||
return manifest, nil
|
||||
}
|
||||
|
||||
@@ -276,16 +276,33 @@ func (m *SessionManager) Close() {
|
||||
|
||||
// RequestMessage represents a parsed NRC request message.
|
||||
type RequestMessage struct {
|
||||
Type string // EVENT, REQ, CLOSE, AUTH, COUNT
|
||||
Type string // EVENT, REQ, CLOSE, AUTH, COUNT, IDS
|
||||
Payload []any
|
||||
}
|
||||
|
||||
// ResponseMessage represents an NRC response message to be sent.
|
||||
type ResponseMessage struct {
|
||||
Type string // EVENT, OK, EOSE, NOTICE, CLOSED, COUNT, AUTH
|
||||
Type string // EVENT, OK, EOSE, NOTICE, CLOSED, COUNT, AUTH, IDS, CHUNK
|
||||
Payload []any
|
||||
}
|
||||
|
||||
// EventManifestEntry describes an event for manifest diffing (used by IDS).
|
||||
type EventManifestEntry struct {
|
||||
Kind int `json:"kind"`
|
||||
ID string `json:"id"`
|
||||
CreatedAt int64 `json:"created_at"`
|
||||
D string `json:"d,omitempty"` // For parameterized replaceable events (kinds 30000-39999)
|
||||
}
|
||||
|
||||
// ChunkMessage represents a chunk of a large message.
|
||||
type ChunkMessage struct {
|
||||
Type string `json:"type"` // Always "CHUNK"
|
||||
MessageID string `json:"messageId"` // Unique ID for the chunked message
|
||||
Index int `json:"index"` // 0-based chunk index
|
||||
Total int `json:"total"` // Total number of chunks
|
||||
Data string `json:"data"` // Base64 encoded chunk data
|
||||
}
|
||||
|
||||
// ParseRequestContent parses the decrypted content of an NRC request.
|
||||
func ParseRequestContent(content []byte) (*RequestMessage, error) {
|
||||
// Content format: {"type": "EVENT|REQ|...", "payload": [...]}
|
||||
|
||||
@@ -1 +1 @@
|
||||
v0.52.3
|
||||
v0.52.4
|
||||
|
||||
Reference in New Issue
Block a user