Compare commits

...

4 Commits

Author SHA1 Message Date
woikos
e75e6de59b Extend event fetch time window to 5 years (v0.52.7)
Some checks are pending
Go / build-and-release (push) Waiting to run
- Change default event query window from 30 days to 5 years in web UI
- Remove 6-month fallback retry logic (no longer needed with larger window)
- Simplify fetchAllEvents function in nostr.js

Files modified:
- app/web/src/nostr.js: Extended time window from 30 days to 5 years
- app/web/dist/bundle.js: Rebuilt with changes
- pkg/version/version: Bump to v0.52.7

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-20 18:24:42 +01:00
woikos
1297a45ee3 Fix standalone mode detection for port 7777 (v0.52.6)
Some checks are pending
Go / build-and-release (push) Waiting to run
- Add port 7777 to list of known relay ports in config.js
- Prevents false-positive standalone mode detection

Files modified:
- app/web/src/config.js: Add 7777 to relay port list
- pkg/version/version: Bump to v0.52.6

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-20 15:53:24 +01:00
woikos
138d5cbff9 Fix Neo4j parameterized replaceable event handling (v0.52.5)
Some checks are pending
Go / build-and-release (push) Waiting to run
- Add post-query filtering to return only latest version per (pubkey, kind, d-tag)
- Delete older versions on save for kinds 30000-39999 in Neo4j backend
- QueryAllVersions bypasses filtering for recovery UI compatibility
- Badger continues to keep old versions (filtered at query time)

Files modified:
- pkg/neo4j/query-events.go: Add replaceable event filtering logic
- pkg/neo4j/save-event.go: Add deleteOlderParameterizedReplaceable helper
- pkg/version/version: Bump to v0.52.5

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-20 15:26:39 +01:00
woikos
0c82307bf6 Fix log buffer race condition and add NRC IDS/chunking support (v0.52.4)
Some checks failed
Go / build-and-release (push) Has been cancelled
- Fix race condition in BufferedWriter causing slice bounds panics
  under concurrent writes by adding mutex protection
- Add IDS message type to NRC protocol for event manifest diffing
- Add chunking support for large NRC responses (>40KB)
- Skip rate limiting for NIP-46 bunker events (kind 24133) for
  realtime priority

Files modified:
- pkg/logbuffer/writer.go: Add mutex to protect lineBuf operations
- pkg/event/processing/processing.go: Skip rate limit for NIP-46
- pkg/protocol/nrc/bridge.go: Add IDS handler and chunked responses
- pkg/protocol/nrc/client.go: Add IDS client and chunk reassembly
- pkg/protocol/nrc/session.go: Add IDS/Chunk message types
- pkg/version/version: Bump to v0.52.4

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-19 15:53:49 +01:00
12 changed files with 518 additions and 27 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -26,7 +26,7 @@ export function initConfig() {
// 4. Not running on a typical relay port (3334) - likely a static server
const hasStoredRelay = !!localStorage.getItem("relayUrl");
const isFileProtocol = window.location.protocol === 'file:';
const isNonRelayPort = !['3334', '443', '80', ''].includes(window.location.port);
const isNonRelayPort = !['3334', '7777', '443', '80', ''].includes(window.location.port);
const standalone = BUILD_STANDALONE_MODE || hasStoredRelay || isFileProtocol || isNonRelayPort;
isStandaloneMode.set(standalone);

View File

@@ -951,11 +951,10 @@ export async function fetchAllEvents(options = {}) {
} = options;
const now = Math.floor(Date.now() / 1000);
const thirtyDaysAgo = now - (30 * 24 * 60 * 60);
const sixMonthsAgo = now - (180 * 24 * 60 * 60);
const fiveYearsAgo = now - (5 * 365 * 24 * 60 * 60);
// Start with 30 days if no since specified
const initialSince = since || thirtyDaysAgo;
// Start with 5 years if no since specified
const initialSince = since || fiveYearsAgo;
const filters = [{ ...rest }];
filters[0].since = initialSince;
@@ -964,21 +963,10 @@ export async function fetchAllEvents(options = {}) {
if (kinds) filters[0].kinds = kinds;
if (limit) filters[0].limit = limit;
let events = await fetchEvents(filters, {
const events = await fetchEvents(filters, {
timeout: 30000
});
// If we got few results and weren't already using a longer window, retry with 6 months
const fewResultsThreshold = Math.min(20, limit / 2);
if (events.length < fewResultsThreshold && initialSince > sixMonthsAgo && !since) {
console.log(`[fetchAllEvents] Only got ${events.length} events, retrying with 6-month window...`);
filters[0].since = sixMonthsAgo;
events = await fetchEvents(filters, {
timeout: 30000
});
console.log(`[fetchAllEvents] 6-month window returned ${events.length} events`);
}
return events;
}

View File

@@ -183,8 +183,9 @@ func (s *Service) saveEvent(ctx context.Context, ev *event.E) Result {
saveCtx, cancel := context.WithTimeout(ctx, s.cfg.WriteTimeout)
defer cancel()
// Apply rate limiting
if s.rateLimiter != nil && s.rateLimiter.IsEnabled() {
// Apply rate limiting (skip for NIP-46 bunker events which need realtime priority)
const kindNIP46 = 24133
if s.rateLimiter != nil && s.rateLimiter.IsEnabled() && ev.Kind != uint16(kindNIP46) {
const writeOpType = 1 // ratelimit.Write
s.rateLimiter.Wait(saveCtx, writeOpType)
}

View File

@@ -6,6 +6,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"
)
@@ -14,6 +15,7 @@ type BufferedWriter struct {
original io.Writer
buffer *Buffer
lineBuf bytes.Buffer
mu sync.Mutex
}
// Log format regex patterns
@@ -42,10 +44,12 @@ func (w *BufferedWriter) Write(p []byte) (n int, err error) {
// Store in buffer if we have one
if w.buffer != nil {
w.mu.Lock()
// Accumulate data in line buffer
w.lineBuf.Write(p)
// Process complete lines
var entries []LogEntry
for {
line, lineErr := w.lineBuf.ReadString('\n')
if lineErr != nil {
@@ -56,12 +60,18 @@ func (w *BufferedWriter) Write(p []byte) (n int, err error) {
break
}
// Parse and store the complete line
// Parse the complete line
entry := w.parseLine(strings.TrimSuffix(line, "\n"))
if entry.Message != "" {
w.buffer.Add(entry)
entries = append(entries, entry)
}
}
w.mu.Unlock()
// Add entries outside the lock to avoid holding it during buffer.Add
for _, entry := range entries {
w.buffer.Add(entry)
}
}
return

View File

@@ -3,12 +3,15 @@ package neo4j
import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"time"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/filter"
"git.mleku.dev/mleku/nostr/encoders/hex"
"git.mleku.dev/mleku/nostr/encoders/kind"
"git.mleku.dev/mleku/nostr/encoders/tag"
"lol.mleku.dev/log"
"next.orly.dev/pkg/database/indexes/types"
@@ -41,11 +44,81 @@ func (n *N) QueryEventsWithOptions(
}
// Parse response
evs, err = n.parseEventsFromResult(result)
allEvents, err := n.parseEventsFromResult(result)
if err != nil {
return nil, fmt.Errorf("failed to parse events: %w", err)
}
// Filter replaceable events to only return the latest version
// unless showAllVersions is true
if showAllVersions {
return allEvents, nil
}
// Separate events by type and filter replaceables
replaceableEvents := make(map[string]*event.E) // key: pubkey:kind
paramReplaceableEvents := make(map[string]map[string]*event.E) // key: pubkey:kind -> d-tag -> event
var regularEvents event.S
for _, ev := range allEvents {
if kind.IsReplaceable(ev.Kind) {
// For replaceable events, keep only the latest per pubkey:kind
key := hex.Enc(ev.Pubkey) + ":" + strconv.Itoa(int(ev.Kind))
existing, exists := replaceableEvents[key]
if !exists || ev.CreatedAt > existing.CreatedAt {
replaceableEvents[key] = ev
}
} else if kind.IsParameterizedReplaceable(ev.Kind) {
// For parameterized replaceable events, keep only the latest per pubkey:kind:d-tag
key := hex.Enc(ev.Pubkey) + ":" + strconv.Itoa(int(ev.Kind))
// Get the 'd' tag value
dTag := ev.Tags.GetFirst([]byte("d"))
var dValue string
if dTag != nil && dTag.Len() > 1 {
dValue = string(dTag.Value())
}
// Initialize inner map if needed
if _, exists := paramReplaceableEvents[key]; !exists {
paramReplaceableEvents[key] = make(map[string]*event.E)
}
// Keep only the newest version
existing, exists := paramReplaceableEvents[key][dValue]
if !exists || ev.CreatedAt > existing.CreatedAt {
paramReplaceableEvents[key][dValue] = ev
}
} else {
regularEvents = append(regularEvents, ev)
}
}
// Combine results
evs = make(event.S, 0, len(replaceableEvents)+len(paramReplaceableEvents)+len(regularEvents))
for _, ev := range replaceableEvents {
evs = append(evs, ev)
}
for _, innerMap := range paramReplaceableEvents {
for _, ev := range innerMap {
evs = append(evs, ev)
}
}
evs = append(evs, regularEvents...)
// Re-sort by timestamp (newest first)
sort.Slice(evs, func(i, j int) bool {
return evs[i].CreatedAt > evs[j].CreatedAt
})
// Re-apply limit after filtering
if f.Limit != nil && len(evs) > int(*f.Limit) {
evs = evs[:*f.Limit]
}
return evs, nil
}

View File

@@ -56,6 +56,15 @@ func (n *N) SaveEvent(c context.Context, ev *event.E) (exists bool, err error) {
return true, nil // Event already exists
}
// For parameterized replaceable events (kinds 30000-39999), delete older versions
// before saving the new one. This ensures Neo4j only stores the latest version.
if ev.Kind >= 30000 && ev.Kind < 40000 {
if err := n.deleteOlderParameterizedReplaceable(c, ev); err != nil {
n.Logger.Warningf("failed to delete older replaceable events: %v", err)
// Continue with save - older events will be filtered at query time
}
}
// Get next serial number
serial, err := n.getNextSerial()
if err != nil {
@@ -444,3 +453,37 @@ ORDER BY e.created_at DESC`
return wouldReplace, serials, nil
}
// deleteOlderParameterizedReplaceable deletes older versions of parameterized replaceable events
// (kinds 30000-39999) that have the same pubkey, kind, and d-tag value.
// This is called before saving a new event to ensure only the latest version is stored.
// Returns an error if the Cypher delete fails; the caller may treat that as
// non-fatal since stale versions can also be filtered out at query time.
func (n *N) deleteOlderParameterizedReplaceable(c context.Context, ev *event.E) error {
authorPubkey := hex.Enc(ev.Pubkey[:])
// Get the d-tag value. An absent 'd' tag is treated as the empty string,
// so events without a d tag replace each other per (pubkey, kind).
dTag := ev.Tags.GetFirst([]byte{'d'})
dValue := ""
if dTag != nil && len(dTag.T) >= 2 {
dValue = string(dTag.T[1])
}
// Delete older events with same pubkey, kind, and d-tag
// Only delete if the existing event is older than the new one
// (strict < means same-timestamp duplicates are left in place).
cypher := `
MATCH (e:Event {kind: $kind, pubkey: $pubkey})-[:TAGGED_WITH]->(t:Tag {type: 'd', value: $dValue})
WHERE e.created_at < $createdAt
DETACH DELETE e`
params := map[string]any{
"pubkey": authorPubkey,
"kind": int64(ev.Kind),
"dValue": dValue,
"createdAt": ev.CreatedAt,
}
if _, err := n.ExecuteWrite(c, cypher, params); err != nil {
return fmt.Errorf("failed to delete older replaceable events: %w", err)
}
return nil
}

View File

@@ -2,6 +2,8 @@ package nrc
import (
"context"
"crypto/rand"
"encoding/base64"
"encoding/json"
"fmt"
"sync"
@@ -25,6 +27,8 @@ const (
KindNRCRequest = 24891
// KindNRCResponse is the event kind for NRC responses.
KindNRCResponse = 24892
// MaxChunkSize is the maximum size for a single chunk (40KB to stay under 65KB limit after NIP-44 + base64).
MaxChunkSize = 40000
)
// BridgeConfig holds configuration for the NRC bridge.
@@ -300,6 +304,8 @@ func (b *Bridge) forwardToLocalRelay(ctx context.Context, session *Session, reqE
return b.handleCLOSE(ctx, session, reqEvent, reqMsg)
case "COUNT":
return b.handleCOUNT(ctx, session, reqEvent, reqMsg, localConn)
case "IDS":
return b.handleIDS(ctx, session, reqEvent, reqMsg, localConn)
default:
return fmt.Errorf("unsupported message type: %s", reqMsg.Type)
}
@@ -462,6 +468,158 @@ func (b *Bridge) handleCOUNT(ctx context.Context, session *Session, reqEvent *ev
return b.sendResponse(ctx, reqEvent, session, resp)
}
// handleIDS handles an IDS message - returns event manifests for diffing.
// It subscribes to the local relay with the client-supplied filters, builds a
// lightweight (kind, id, created_at, d) entry for each matching event, and
// sends the collected manifest back when EOSE or subscription close occurs.
func (b *Bridge) handleIDS(ctx context.Context, session *Session, reqEvent *event.E, reqMsg *RequestMessage, conn *ws.Client) error {
// Extract subscription ID and filters from payload
// Payload: ["IDS", "<sub_id>", filter1, filter2, ...]
if len(reqMsg.Payload) < 3 {
return fmt.Errorf("invalid IDS payload")
}
subID, ok := reqMsg.Payload[1].(string)
if !ok {
return fmt.Errorf("invalid subscription ID")
}
// Parse filters from payload. Malformed entries are skipped rather than
// failing the request; only a fully empty filter set is an error.
var filters []*filter.F
for i := 2; i < len(reqMsg.Payload); i++ {
filterMap, ok := reqMsg.Payload[i].(map[string]any)
if !ok {
continue
}
// Round-trip through JSON to convert the generic map into a filter.F.
filterBytes, err := json.Marshal(filterMap)
if err != nil {
continue
}
var f filter.F
if err := json.Unmarshal(filterBytes, &f); err != nil {
continue
}
filters = append(filters, &f)
}
if len(filters) == 0 {
return fmt.Errorf("no valid filters in IDS")
}
// Add subscription to session; it is removed again when this handler
// returns, since IDS is a one-shot request/response exchange.
if err := session.AddSubscription(subID); err != nil {
return err
}
defer session.RemoveSubscription(subID)
// Create filter set
filterSet := filter.NewS(filters...)
// Subscribe to local relay
sub, err := conn.Subscribe(ctx, filterSet)
if chk.E(err) {
return fmt.Errorf("local subscribe failed: %w", err)
}
defer sub.Unsub()
// Collect events and build manifest
var manifest []EventManifestEntry
for {
select {
case <-ctx.Done():
return ctx.Err()
case ev := <-sub.Events:
if ev == nil {
// Subscription closed, send IDS response
return b.sendIDSResponse(ctx, reqEvent, session, subID, manifest)
}
// Build manifest entry
entry := EventManifestEntry{
Kind: int(ev.Kind),
ID: string(hex.Enc(ev.ID[:])),
CreatedAt: ev.CreatedAt,
}
// Check for d tag (parameterized replaceable events)
dTag := ev.Tags.GetFirst([]byte("d"))
if dTag != nil && dTag.Len() >= 2 {
entry.D = string(dTag.Value())
}
manifest = append(manifest, entry)
case <-sub.EndOfStoredEvents:
// Send IDS response with manifest
return b.sendIDSResponse(ctx, reqEvent, session, subID, manifest)
}
}
}
// sendIDSResponse sends an IDS response carrying the event manifest back to
// the requesting client, delegating to the chunked sender so oversized
// manifests are split transparently.
func (b *Bridge) sendIDSResponse(ctx context.Context, reqEvent *event.E, session *Session, subID string, manifest []EventManifestEntry) error {
	payload := []any{"IDS", subID, manifest}
	return b.sendResponseChunked(ctx, reqEvent, session, &ResponseMessage{Type: "IDS", Payload: payload})
}
// sendResponseChunked sends a response, splitting it into CHUNK messages when
// the marshalled content exceeds MaxChunkSize. The content is base64-encoded
// before slicing so every chunk is safe to embed in a JSON payload.
func (b *Bridge) sendResponseChunked(ctx context.Context, reqEvent *event.E, session *Session, resp *ResponseMessage) error {
	content, err := MarshalResponseContent(resp)
	if err != nil {
		return fmt.Errorf("marshal failed: %w", err)
	}
	// Small payloads go out as a single ordinary response.
	if len(content) <= MaxChunkSize {
		return b.sendResponse(ctx, reqEvent, session, resp)
	}
	// Base64-encode for safe transmission, then slice the encoded text
	// into MaxChunkSize pieces.
	encoded := base64.StdEncoding.EncodeToString(content)
	var pieces []string
	for off := 0; off < len(encoded); off += MaxChunkSize {
		limit := off + MaxChunkSize
		if limit > len(encoded) {
			limit = len(encoded)
		}
		pieces = append(pieces, encoded[off:limit])
	}
	// All chunks share one random message ID so the receiver can reassemble.
	msgID := generateMessageID()
	log.D.F("NRC: chunking large message (%d bytes) into %d chunks", len(content), len(pieces))
	for idx, piece := range pieces {
		wrapper := &ResponseMessage{
			Type: "CHUNK",
			Payload: []any{ChunkMessage{
				Type:      "CHUNK",
				MessageID: msgID,
				Index:     idx,
				Total:     len(pieces),
				Data:      piece,
			}},
		}
		if err := b.sendResponse(ctx, reqEvent, session, wrapper); err != nil {
			return fmt.Errorf("failed to send chunk %d/%d: %w", idx+1, len(pieces), err)
		}
	}
	return nil
}
// generateMessageID generates a random 16-byte (32 hex character) message ID
// used to correlate the chunks of a single chunked response.
func generateMessageID() string {
	b := make([]byte, 16)
	// crypto/rand.Read does not fail on supported platforms; a failure
	// indicates a broken system RNG, which is unrecoverable, so panic
	// rather than silently returning a predictable/short ID (the original
	// ignored this error entirely).
	if _, err := rand.Read(b); err != nil {
		panic(fmt.Errorf("crypto/rand failed: %w", err))
	}
	return string(hex.Enc(b))
}
// sendResponse encrypts and sends a response to the client.
func (b *Bridge) sendResponse(ctx context.Context, reqEvent *event.E, session *Session, resp *ResponseMessage) error {
// Marshal response content

View File

@@ -2,6 +2,7 @@ package nrc
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"sync"
@@ -21,6 +22,13 @@ import (
"lol.mleku.dev/log"
)
// chunkBuffer holds chunks for a message being reassembled.
// One buffer exists per in-flight chunked message, keyed by message ID.
type chunkBuffer struct {
chunks map[int]string // chunk index -> base64 chunk data
total int // total chunk count expected (taken from the first chunk seen)
receivedAt time.Time // creation time, used to expire stale buffers
}
// Client connects to a private relay through the NRC tunnel.
type Client struct {
uri *ConnectionURI
@@ -38,6 +46,10 @@ type Client struct {
subscriptions map[string]chan *event.E
subscriptionsMu sync.Mutex
// chunkBuffers holds partially received chunked messages.
chunkBuffers map[string]*chunkBuffer
chunkBuffersMu sync.Mutex
ctx context.Context
cancel context.CancelFunc
}
@@ -61,6 +73,7 @@ func NewClient(connectionURI string) (*Client, error) {
clientSigner: uri.GetClientSigner(),
pending: make(map[string]chan *ResponseMessage),
subscriptions: make(map[string]chan *event.E),
chunkBuffers: make(map[string]*chunkBuffer),
ctx: ctx,
cancel: cancel,
}, nil
@@ -127,6 +140,11 @@ func (c *Client) Close() {
}
c.subscriptions = make(map[string]chan *event.E)
c.subscriptionsMu.Unlock()
// Clear chunk buffers
c.chunkBuffersMu.Lock()
c.chunkBuffers = make(map[string]*chunkBuffer)
c.chunkBuffersMu.Unlock()
}
// handleResponses processes incoming NRC response events.
@@ -186,6 +204,10 @@ func (c *Client) processResponse(ev *event.E) {
c.handleCountResponse(resp.Payload, requestEventID)
case "AUTH":
c.handleAuthResponse(resp.Payload, requestEventID)
case "IDS":
c.handleIDSResponse(resp.Payload, requestEventID)
case "CHUNK":
c.handleChunkResponse(resp.Payload, requestEventID)
}
}
@@ -315,6 +337,127 @@ func (c *Client) handleAuthResponse(payload []any, requestEventID string) {
}
}
// handleIDSResponse routes an IDS response payload to the goroutine that is
// waiting on the originating request, if any.
func (c *Client) handleIDSResponse(payload []any, requestEventID string) {
	c.pendingMu.Lock()
	waiter, ok := c.pending[requestEventID]
	c.pendingMu.Unlock()
	if !ok {
		return
	}
	// Non-blocking send: if the waiter has already given up, drop it.
	select {
	case waiter <- &ResponseMessage{Type: "IDS", Payload: payload}:
	default:
	}
}
// handleChunkResponse handles a CHUNK response and reassembles the message.
// Chunks may arrive in any order; once all pieces for a message ID are
// present, the base64 payload is decoded, re-parsed as a ResponseMessage,
// and routed to the waiting request channel. Stale partial buffers are
// expired on every call.
func (c *Client) handleChunkResponse(payload []any, requestEventID string) {
	if len(payload) < 1 {
		return
	}
	// Parse chunk message from payload
	chunkData, ok := payload[0].(map[string]any)
	if !ok {
		log.W.F("NRC: invalid chunk payload format")
		return
	}
	messageID, _ := chunkData["messageId"].(string)
	indexFloat, _ := chunkData["index"].(float64)
	totalFloat, _ := chunkData["total"].(float64)
	data, _ := chunkData["data"].(string)
	if messageID == "" || data == "" {
		log.W.F("NRC: chunk missing required fields")
		return
	}
	index := int(indexFloat)
	total := int(totalFloat)
	// Validate remote-supplied chunk coordinates before they reach the
	// reassembly buffer; a hostile or buggy bridge could otherwise park
	// data at arbitrary indices or register a buffer that never completes.
	if index < 0 || total <= 0 || index >= total {
		log.W.F("NRC: chunk index %d out of range (total %d)", index, total)
		return
	}
	c.chunkBuffersMu.Lock()
	defer c.chunkBuffersMu.Unlock()
	// Get or create buffer for this message
	buf, exists := c.chunkBuffers[messageID]
	if !exists {
		buf = &chunkBuffer{
			chunks:     make(map[int]string),
			total:      total,
			receivedAt: time.Now(),
		}
		c.chunkBuffers[messageID] = buf
	}
	// Store the chunk
	buf.chunks[index] = data
	// messageID is remote-supplied: guard the log prefix so a short ID
	// cannot panic the client (the original sliced messageID[:8] blindly).
	idPrefix := messageID
	if len(idPrefix) > 8 {
		idPrefix = idPrefix[:8]
	}
	log.D.F("NRC: received chunk %d/%d for message %s", index+1, total, idPrefix)
	// Check if we have all chunks
	if len(buf.chunks) == buf.total {
		// Reassemble the message in index order.
		var encoded string
		for i := 0; i < buf.total; i++ {
			part, ok := buf.chunks[i]
			if !ok {
				// Defensive: cannot happen now that indices are
				// validated against total, but keep the guard.
				log.W.F("NRC: missing chunk %d for message %s", i, messageID)
				delete(c.chunkBuffers, messageID)
				return
			}
			encoded += part
		}
		// Decode from base64
		decoded, err := base64.StdEncoding.DecodeString(encoded)
		if err != nil {
			log.W.F("NRC: failed to decode chunked message: %v", err)
			delete(c.chunkBuffers, messageID)
			return
		}
		// Parse the reassembled response
		var resp struct {
			Type    string `json:"type"`
			Payload []any  `json:"payload"`
		}
		if err := json.Unmarshal(decoded, &resp); err != nil {
			log.W.F("NRC: failed to parse reassembled message: %v", err)
			delete(c.chunkBuffers, messageID)
			return
		}
		log.D.F("NRC: reassembled chunked message: %s", resp.Type)
		// Clean up buffer
		delete(c.chunkBuffers, messageID)
		// Route the reassembled response to the pending request, dropping
		// it (non-blocking send) if the waiter is gone.
		c.pendingMu.Lock()
		ch, exists := c.pending[requestEventID]
		c.pendingMu.Unlock()
		if exists {
			respMsg := &ResponseMessage{Type: resp.Type, Payload: resp.Payload}
			select {
			case ch <- respMsg:
			default:
			}
		}
	}
	// Clean up stale buffers (older than 60 seconds)
	now := time.Now()
	for id, b := range c.chunkBuffers {
		if now.Sub(b.receivedAt) > 60*time.Second {
			log.W.F("NRC: discarding stale chunk buffer: %s", id)
			delete(c.chunkBuffers, id)
		}
	}
}
// sendRequest sends an NRC request and waits for response.
func (c *Client) sendRequest(ctx context.Context, msgType string, payload []any) (*ResponseMessage, error) {
// Build request content
@@ -511,3 +654,61 @@ func (c *Client) Count(ctx context.Context, subID string, filters ...*filter.F)
func (c *Client) RelayURL() string {
return "nrc://" + string(hex.Enc(c.uri.RelayPubkey))
}
// RequestIDs sends an IDS request to obtain event manifests for diffing.
// Request payload shape: ["IDS", "<sub_id>", filter1, filter2, ...]; the
// response carries a list of (kind, id, created_at, d) manifest entries.
func (c *Client) RequestIDs(ctx context.Context, subID string, filters ...*filter.F) ([]EventManifestEntry, error) {
	payload := []any{"IDS", subID}
	for _, flt := range filters {
		// Round-trip each filter through JSON so it becomes a plain map
		// suitable for the generic NRC payload encoding.
		raw, err := json.Marshal(flt)
		if err != nil {
			return nil, fmt.Errorf("marshal filter failed: %w", err)
		}
		var asMap map[string]any
		if err := json.Unmarshal(raw, &asMap); err != nil {
			return nil, fmt.Errorf("unmarshal filter failed: %w", err)
		}
		payload = append(payload, asMap)
	}
	resp, err := c.sendRequest(ctx, "IDS", payload)
	if err != nil {
		return nil, err
	}
	// Expected response shape: ["IDS", "<sub_id>", [...manifest...]]
	if resp.Type != "IDS" || len(resp.Payload) < 3 {
		return nil, fmt.Errorf("unexpected response type: %s", resp.Type)
	}
	items, ok := resp.Payload[2].([]any)
	if !ok {
		return nil, fmt.Errorf("invalid manifest response")
	}
	// Decode each manifest entry, skipping anything that is not an object.
	var manifest []EventManifestEntry
	for _, raw := range items {
		m, ok := raw.(map[string]any)
		if !ok {
			continue
		}
		var entry EventManifestEntry
		if v, ok := m["kind"].(float64); ok {
			entry.Kind = int(v)
		}
		if v, ok := m["id"].(string); ok {
			entry.ID = v
		}
		if v, ok := m["created_at"].(float64); ok {
			entry.CreatedAt = int64(v)
		}
		if v, ok := m["d"].(string); ok {
			entry.D = v
		}
		manifest = append(manifest, entry)
	}
	return manifest, nil
}

View File

@@ -276,16 +276,33 @@ func (m *SessionManager) Close() {
// RequestMessage represents a parsed NRC request message.
type RequestMessage struct {
Type string // EVENT, REQ, CLOSE, AUTH, COUNT
Type string // EVENT, REQ, CLOSE, AUTH, COUNT, IDS
Payload []any
}
// ResponseMessage represents an NRC response message to be sent.
type ResponseMessage struct {
Type string // EVENT, OK, EOSE, NOTICE, CLOSED, COUNT, AUTH
Type string // EVENT, OK, EOSE, NOTICE, CLOSED, COUNT, AUTH, IDS, CHUNK
Payload []any
}
// EventManifestEntry describes an event for manifest diffing (used by IDS).
// It carries just enough metadata for a client to decide which events it is
// missing without transferring full event bodies.
type EventManifestEntry struct {
Kind int `json:"kind"`
ID string `json:"id"`
CreatedAt int64 `json:"created_at"`
D string `json:"d,omitempty"` // For parameterized replaceable events (kinds 30000-39999)
}
// ChunkMessage represents a chunk of a large message.
// Oversized responses are base64-encoded and split; receivers reassemble by
// (MessageID, Index) once Total chunks have arrived.
type ChunkMessage struct {
Type string `json:"type"` // Always "CHUNK"
MessageID string `json:"messageId"` // Unique ID for the chunked message
Index int `json:"index"` // 0-based chunk index
Total int `json:"total"` // Total number of chunks
Data string `json:"data"` // Base64 encoded chunk data
}
// ParseRequestContent parses the decrypted content of an NRC request.
func ParseRequestContent(content []byte) (*RequestMessage, error) {
// Content format: {"type": "EVENT|REQ|...", "payload": [...]}

View File

@@ -1 +1 @@
v0.52.3
v0.52.7