diff --git a/pkg/event/processing/processing.go b/pkg/event/processing/processing.go index 0676f18..9994235 100644 --- a/pkg/event/processing/processing.go +++ b/pkg/event/processing/processing.go @@ -183,8 +183,9 @@ func (s *Service) saveEvent(ctx context.Context, ev *event.E) Result { saveCtx, cancel := context.WithTimeout(ctx, s.cfg.WriteTimeout) defer cancel() - // Apply rate limiting - if s.rateLimiter != nil && s.rateLimiter.IsEnabled() { + // Apply rate limiting (skip for NIP-46 bunker events which need realtime priority) + const kindNIP46 = 24133 + if s.rateLimiter != nil && s.rateLimiter.IsEnabled() && ev.Kind != uint16(kindNIP46) { const writeOpType = 1 // ratelimit.Write s.rateLimiter.Wait(saveCtx, writeOpType) } diff --git a/pkg/logbuffer/writer.go b/pkg/logbuffer/writer.go index bdbc508..ab03eb3 100644 --- a/pkg/logbuffer/writer.go +++ b/pkg/logbuffer/writer.go @@ -6,6 +6,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" ) @@ -14,6 +15,7 @@ type BufferedWriter struct { original io.Writer buffer *Buffer lineBuf bytes.Buffer + mu sync.Mutex } // Log format regex patterns @@ -42,10 +44,12 @@ func (w *BufferedWriter) Write(p []byte) (n int, err error) { // Store in buffer if we have one if w.buffer != nil { + w.mu.Lock() // Accumulate data in line buffer w.lineBuf.Write(p) // Process complete lines + var entries []LogEntry for { line, lineErr := w.lineBuf.ReadString('\n') if lineErr != nil { @@ -56,12 +60,18 @@ func (w *BufferedWriter) Write(p []byte) (n int, err error) { break } - // Parse and store the complete line + // Parse the complete line entry := w.parseLine(strings.TrimSuffix(line, "\n")) if entry.Message != "" { - w.buffer.Add(entry) + entries = append(entries, entry) } } + w.mu.Unlock() + + // Add entries outside the lock to avoid holding it during buffer.Add + for _, entry := range entries { + w.buffer.Add(entry) + } } return diff --git a/pkg/protocol/nrc/bridge.go b/pkg/protocol/nrc/bridge.go index 57eaf99..03064a8 100644 --- a/pkg/protocol/nrc/bridge.go +++ b/pkg/protocol/nrc/bridge.go @@ -2,6 +2,8 @@ package nrc import ( "context" + "crypto/rand" + "encoding/base64" "encoding/json" "fmt" "sync" @@ -25,6 +27,8 @@ const ( KindNRCRequest = 24891 // KindNRCResponse is the event kind for NRC responses. KindNRCResponse = 24892 + // MaxChunkSize is the maximum size for a single chunk (40KB to stay under 65KB limit after NIP-44 + base64). + MaxChunkSize = 40000 ) // BridgeConfig holds configuration for the NRC bridge. @@ -300,6 +304,8 @@ func (b *Bridge) forwardToLocalRelay(ctx context.Context, session *Session, reqE return b.handleCLOSE(ctx, session, reqEvent, reqMsg) case "COUNT": return b.handleCOUNT(ctx, session, reqEvent, reqMsg, localConn) + case "IDS": + return b.handleIDS(ctx, session, reqEvent, reqMsg, localConn) default: return fmt.Errorf("unsupported message type: %s", reqMsg.Type) } @@ -462,6 +468,158 @@ func (b *Bridge) handleCOUNT(ctx context.Context, session *Session, reqEvent *ev return b.sendResponse(ctx, reqEvent, session, resp) } +// handleIDS handles an IDS message - returns event manifests for diffing. +func (b *Bridge) handleIDS(ctx context.Context, session *Session, reqEvent *event.E, reqMsg *RequestMessage, conn *ws.Client) error { + // Extract subscription ID and filters from payload + // Payload: ["IDS", "", filter1, filter2, ...] + if len(reqMsg.Payload) < 3 { + return fmt.Errorf("invalid IDS payload") + } + subID, ok := reqMsg.Payload[1].(string) + if !ok { + return fmt.Errorf("invalid subscription ID") + } + + // Parse filters from payload + var filters []*filter.F + for i := 2; i < len(reqMsg.Payload); i++ { + filterMap, ok := reqMsg.Payload[i].(map[string]any) + if !ok { + continue + } + filterBytes, err := json.Marshal(filterMap) + if err != nil { + continue + } + var f filter.F + if err := json.Unmarshal(filterBytes, &f); err != nil { + continue + } + filters = append(filters, &f) + } + + if len(filters) == 0 { + return fmt.Errorf("no valid filters in IDS") + } + + // Add subscription to session + if err := session.AddSubscription(subID); err != nil { + return err + } + defer session.RemoveSubscription(subID) + + // Create filter set + filterSet := filter.NewS(filters...) + + // Subscribe to local relay + sub, err := conn.Subscribe(ctx, filterSet) + if chk.E(err) { + return fmt.Errorf("local subscribe failed: %w", err) + } + defer sub.Unsub() + + // Collect events and build manifest + var manifest []EventManifestEntry + for { + select { + case <-ctx.Done(): + return ctx.Err() + case ev := <-sub.Events: + if ev == nil { + // Subscription closed, send IDS response + return b.sendIDSResponse(ctx, reqEvent, session, subID, manifest) + } + + // Build manifest entry + entry := EventManifestEntry{ + Kind: int(ev.Kind), + ID: string(hex.Enc(ev.ID[:])), + CreatedAt: ev.CreatedAt, + } + + // Check for d tag (parameterized replaceable events) + dTag := ev.Tags.GetFirst([]byte("d")) + if dTag != nil && dTag.Len() >= 2 { + entry.D = string(dTag.Value()) + } + + manifest = append(manifest, entry) + case <-sub.EndOfStoredEvents: + // Send IDS response with manifest + return b.sendIDSResponse(ctx, reqEvent, session, subID, manifest) + } + } +} + +// sendIDSResponse sends an IDS response with the event manifest, chunking if necessary. +func (b *Bridge) sendIDSResponse(ctx context.Context, reqEvent *event.E, session *Session, subID string, manifest []EventManifestEntry) error { + resp := &ResponseMessage{ + Type: "IDS", + Payload: []any{"IDS", subID, manifest}, + } + return b.sendResponseChunked(ctx, reqEvent, session, resp) +} + +// sendResponseChunked sends a response, chunking if necessary for large payloads. +func (b *Bridge) sendResponseChunked(ctx context.Context, reqEvent *event.E, session *Session, resp *ResponseMessage) error { + // Marshal response content + content, err := MarshalResponseContent(resp) + if err != nil { + return fmt.Errorf("marshal failed: %w", err) + } + + // If small enough, send directly + if len(content) <= MaxChunkSize { + return b.sendResponse(ctx, reqEvent, session, resp) + } + + // Need to chunk - encode to base64 for safe transmission + encoded := base64.StdEncoding.EncodeToString(content) + var chunks []string + + // Split into chunks + for i := 0; i < len(encoded); i += MaxChunkSize { + end := i + MaxChunkSize + if end > len(encoded) { + end = len(encoded) + } + chunks = append(chunks, encoded[i:end]) + } + + // Generate message ID + messageID := generateMessageID() + log.D.F("NRC: chunking large message (%d bytes) into %d chunks", len(content), len(chunks)) + + // Send each chunk + for i, chunkData := range chunks { + chunkMsg := ChunkMessage{ + Type: "CHUNK", + MessageID: messageID, + Index: i, + Total: len(chunks), + Data: chunkData, + } + + chunkResp := &ResponseMessage{ + Type: "CHUNK", + Payload: []any{chunkMsg}, + } + + if err := b.sendResponse(ctx, reqEvent, session, chunkResp); err != nil { + return fmt.Errorf("failed to send chunk %d/%d: %w", i+1, len(chunks), err) + } + } + + return nil +} + +// generateMessageID generates a random message ID for chunking. +func generateMessageID() string { + b := make([]byte, 16) + rand.Read(b) + return string(hex.Enc(b)) +} + // sendResponse encrypts and sends a response to the client. func (b *Bridge) sendResponse(ctx context.Context, reqEvent *event.E, session *Session, resp *ResponseMessage) error { // Marshal response content diff --git a/pkg/protocol/nrc/client.go b/pkg/protocol/nrc/client.go index 828d295..e31dd57 100644 --- a/pkg/protocol/nrc/client.go +++ b/pkg/protocol/nrc/client.go @@ -2,6 +2,7 @@ package nrc import ( "context" + "encoding/base64" "encoding/json" "fmt" "sync" @@ -21,6 +22,13 @@ import ( "lol.mleku.dev/log" ) +// chunkBuffer holds chunks for a message being reassembled. +type chunkBuffer struct { + chunks map[int]string + total int + receivedAt time.Time +} + // Client connects to a private relay through the NRC tunnel. type Client struct { uri *ConnectionURI @@ -38,6 +46,10 @@ type Client struct { subscriptions map[string]chan *event.E subscriptionsMu sync.Mutex + // chunkBuffers holds partially received chunked messages. + chunkBuffers map[string]*chunkBuffer + chunkBuffersMu sync.Mutex + ctx context.Context cancel context.CancelFunc } @@ -61,6 +73,7 @@ func NewClient(connectionURI string) (*Client, error) { clientSigner: uri.GetClientSigner(), pending: make(map[string]chan *ResponseMessage), subscriptions: make(map[string]chan *event.E), + chunkBuffers: make(map[string]*chunkBuffer), ctx: ctx, cancel: cancel, }, nil @@ -127,6 +140,11 @@ func (c *Client) Close() { } c.subscriptions = make(map[string]chan *event.E) c.subscriptionsMu.Unlock() + + // Clear chunk buffers + c.chunkBuffersMu.Lock() + c.chunkBuffers = make(map[string]*chunkBuffer) + c.chunkBuffersMu.Unlock() } // handleResponses processes incoming NRC response events. @@ -186,6 +204,10 @@ func (c *Client) processResponse(ev *event.E) { c.handleCountResponse(resp.Payload, requestEventID) case "AUTH": c.handleAuthResponse(resp.Payload, requestEventID) + case "IDS": + c.handleIDSResponse(resp.Payload, requestEventID) + case "CHUNK": + c.handleChunkResponse(resp.Payload, requestEventID) } } @@ -315,6 +337,127 @@ func (c *Client) handleAuthResponse(payload []any, requestEventID string) { } } +// handleIDSResponse handles an IDS response. +func (c *Client) handleIDSResponse(payload []any, requestEventID string) { + c.pendingMu.Lock() + ch, exists := c.pending[requestEventID] + c.pendingMu.Unlock() + + if exists { + resp := &ResponseMessage{Type: "IDS", Payload: payload} + select { + case ch <- resp: + default: + } + } +} + +// handleChunkResponse handles a CHUNK response and reassembles the message. +func (c *Client) handleChunkResponse(payload []any, requestEventID string) { + if len(payload) < 1 { + return + } + + // Parse chunk message from payload + chunkData, ok := payload[0].(map[string]any) + if !ok { + log.W.F("NRC: invalid chunk payload format") + return + } + + messageID, _ := chunkData["messageId"].(string) + indexFloat, _ := chunkData["index"].(float64) + totalFloat, _ := chunkData["total"].(float64) + data, _ := chunkData["data"].(string) + + if messageID == "" || data == "" { + log.W.F("NRC: chunk missing required fields") + return + } + + index := int(indexFloat) + total := int(totalFloat) + + c.chunkBuffersMu.Lock() + defer c.chunkBuffersMu.Unlock() + + // Get or create buffer for this message + buf, exists := c.chunkBuffers[messageID] + if !exists { + buf = &chunkBuffer{ + chunks: make(map[int]string), + total: total, + receivedAt: time.Now(), + } + c.chunkBuffers[messageID] = buf + } + + // Store the chunk + buf.chunks[index] = data + log.D.F("NRC: received chunk %d/%d for message %s", index+1, total, messageID[:8]) + + // Check if we have all chunks + if len(buf.chunks) == buf.total { + // Reassemble the message + var encoded string + for i := 0; i < buf.total; i++ { + part, ok := buf.chunks[i] + if !ok { + log.W.F("NRC: missing chunk %d for message %s", i, messageID) + delete(c.chunkBuffers, messageID) + return + } + encoded += part + } + + // Decode from base64 + decoded, err := base64.StdEncoding.DecodeString(encoded) + if err != nil { + log.W.F("NRC: failed to decode chunked message: %v", err) + delete(c.chunkBuffers, messageID) + return + } + + // Parse the reassembled response + var resp struct { + Type string `json:"type"` + Payload []any `json:"payload"` + } + if err := json.Unmarshal(decoded, &resp); err != nil { + log.W.F("NRC: failed to parse reassembled message: %v", err) + delete(c.chunkBuffers, messageID) + return + } + + log.D.F("NRC: reassembled chunked message: %s", resp.Type) + + // Clean up buffer + delete(c.chunkBuffers, messageID) + + // Route the reassembled response + c.pendingMu.Lock() + ch, exists := c.pending[requestEventID] + c.pendingMu.Unlock() + + if exists { + respMsg := &ResponseMessage{Type: resp.Type, Payload: resp.Payload} + select { + case ch <- respMsg: + default: + } + } + } + + // Clean up stale buffers (older than 60 seconds) + now := time.Now() + for id, b := range c.chunkBuffers { + if now.Sub(b.receivedAt) > 60*time.Second { + log.W.F("NRC: discarding stale chunk buffer: %s", id) + delete(c.chunkBuffers, id) + } + } +} + // sendRequest sends an NRC request and waits for response. func (c *Client) sendRequest(ctx context.Context, msgType string, payload []any) (*ResponseMessage, error) { // Build request content @@ -511,3 +654,61 @@ func (c *Client) Count(ctx context.Context, subID string, filters ...*filter.F) func (c *Client) RelayURL() string { return "nrc://" + string(hex.Enc(c.uri.RelayPubkey)) } + +// RequestIDs sends an IDS request to get event manifests for diffing. +func (c *Client) RequestIDs(ctx context.Context, subID string, filters ...*filter.F) ([]EventManifestEntry, error) { + // Build payload: ["IDS", "", filter1, filter2, ...] + payload := []any{"IDS", subID} + for _, f := range filters { + filterBytes, err := json.Marshal(f) + if err != nil { + return nil, fmt.Errorf("marshal filter failed: %w", err) + } + var filterMap map[string]any + if err := json.Unmarshal(filterBytes, &filterMap); err != nil { + return nil, fmt.Errorf("unmarshal filter failed: %w", err) + } + payload = append(payload, filterMap) + } + + resp, err := c.sendRequest(ctx, "IDS", payload) + if err != nil { + return nil, err + } + + // Parse IDS response: ["IDS", "", [...manifest...]] + if resp.Type != "IDS" || len(resp.Payload) < 3 { + return nil, fmt.Errorf("unexpected response type: %s", resp.Type) + } + + // Parse manifest entries + manifestData, ok := resp.Payload[2].([]any) + if !ok { + return nil, fmt.Errorf("invalid manifest response") + } + + var manifest []EventManifestEntry + for _, item := range manifestData { + entryMap, ok := item.(map[string]any) + if !ok { + continue + } + + entry := EventManifestEntry{} + if k, ok := entryMap["kind"].(float64); ok { + entry.Kind = int(k) + } + if id, ok := entryMap["id"].(string); ok { + entry.ID = id + } + if ca, ok := entryMap["created_at"].(float64); ok { + entry.CreatedAt = int64(ca) + } + if d, ok := entryMap["d"].(string); ok { + entry.D = d + } + manifest = append(manifest, entry) + } + + return manifest, nil +} diff --git a/pkg/protocol/nrc/session.go b/pkg/protocol/nrc/session.go index c5072b9..8025981 100644 --- a/pkg/protocol/nrc/session.go +++ b/pkg/protocol/nrc/session.go @@ -276,16 +276,33 @@ func (m *SessionManager) Close() { // RequestMessage represents a parsed NRC request message. type RequestMessage struct { - Type string // EVENT, REQ, CLOSE, AUTH, COUNT + Type string // EVENT, REQ, CLOSE, AUTH, COUNT, IDS Payload []any } // ResponseMessage represents an NRC response message to be sent. type ResponseMessage struct { - Type string // EVENT, OK, EOSE, NOTICE, CLOSED, COUNT, AUTH + Type string // EVENT, OK, EOSE, NOTICE, CLOSED, COUNT, AUTH, IDS, CHUNK Payload []any } +// EventManifestEntry describes an event for manifest diffing (used by IDS). +type EventManifestEntry struct { + Kind int `json:"kind"` + ID string `json:"id"` + CreatedAt int64 `json:"created_at"` + D string `json:"d,omitempty"` // For parameterized replaceable events (kinds 30000-39999) +} + +// ChunkMessage represents a chunk of a large message. +type ChunkMessage struct { + Type string `json:"type"` // Always "CHUNK" + MessageID string `json:"messageId"` // Unique ID for the chunked message + Index int `json:"index"` // 0-based chunk index + Total int `json:"total"` // Total number of chunks + Data string `json:"data"` // Base64 encoded chunk data +} + // ParseRequestContent parses the decrypted content of an NRC request. func ParseRequestContent(content []byte) (*RequestMessage, error) { // Content format: {"type": "EVENT|REQ|...", "payload": [...]} diff --git a/pkg/version/version b/pkg/version/version index 303f57c..476935b 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.52.3 +v0.52.4