From 22cde96f3fbe994119ffae7d0edb147e5c7aee7d Mon Sep 17 00:00:00 2001 From: mleku Date: Fri, 19 Sep 2025 16:17:44 +0100 Subject: [PATCH] Remove `bufpool` references and unused imports, optimize memory operations. - Removed `bufpool` usage throughout `tag`, `tags`, and `event` packages for memory efficiency. - Replaced in-place buffer modifications with independent, deep-copied allocations to prevent unintended mutations. - Added new `Clone` method for deep copying `event.E`. - Ensured valid JSON emission for nil `Tags` in `event` marshaling. - Introduced `cmd/stresstest` for relay stress-testing with detailed workload generation and query simulation. --- app/handle-event.go | 4 +- cmd/stresstest/main.go | 605 ++++++++++++++++++ .../envelopes/eventenvelope/eventenvelope.go | 7 +- pkg/encoders/event/event.go | 88 ++- pkg/encoders/tag/tag.go | 25 +- pkg/encoders/tag/tags.go | 6 - pkg/encoders/text/helpers.go | 5 +- 7 files changed, 681 insertions(+), 59 deletions(-) create mode 100644 cmd/stresstest/main.go diff --git a/app/handle-event.go b/app/handle-event.go index 417f96a..982600b 100644 --- a/app/handle-event.go +++ b/app/handle-event.go @@ -151,7 +151,9 @@ func (l *Listener) HandleEvent(msg []byte) (err error) { return } // Deliver the event to subscribers immediately after sending OK response - go l.publishers.Deliver(env.E) + // Clone the event to prevent corruption when the original is freed + clonedEvent := env.E.Clone() + go l.publishers.Deliver(clonedEvent) log.D.F("saved event %0x", env.E.ID) var isNewFromAdmin bool for _, admin := range l.Admins { diff --git a/cmd/stresstest/main.go b/cmd/stresstest/main.go new file mode 100644 index 0000000..82462a4 --- /dev/null +++ b/cmd/stresstest/main.go @@ -0,0 +1,605 @@ +package main + +import ( + "bufio" + "bytes" + "context" + "flag" + "fmt" + "math/rand" + "os" + "os/signal" + "strings" + "sync" + "sync/atomic" + "time" + + "lol.mleku.dev/log" + "next.orly.dev/pkg/crypto/p256k" + "next.orly.dev/pkg/encoders/event" + "next.orly.dev/pkg/encoders/event/examples" + "next.orly.dev/pkg/encoders/filter" + "next.orly.dev/pkg/encoders/hex" + "next.orly.dev/pkg/encoders/kind" + "next.orly.dev/pkg/encoders/tag" + "next.orly.dev/pkg/encoders/timestamp" + "next.orly.dev/pkg/protocol/ws" +) + +// randomHex returns a hex-encoded string of n random bytes (2n hex chars) +func randomHex(n int) string { + b := make([]byte, n) + _, _ = rand.Read(b) + return hex.Enc(b) +} + +func makeEvent(rng *rand.Rand, signer *p256k.Signer) (*event.E, error) { + ev := &event.E{ + CreatedAt: time.Now().Unix(), + Kind: kind.TextNote.K, + Tags: tag.NewS(), + Content: []byte(fmt.Sprintf("stresstest %d", rng.Int63())), + } + + // Random number of p-tags up to 100 + nPTags := rng.Intn(101) // 0..100 inclusive + for i := 0; i < nPTags; i++ { + // random 32-byte pubkey in hex (64 chars) + phex := randomHex(32) + ev.Tags.Append(tag.NewFromAny("p", phex)) + } + + // Sign and verify to ensure pubkey, id and signature are coherent + if err := ev.Sign(signer); err != nil { + return nil, err + } + if ok, err := ev.Verify(); err != nil || !ok { + return nil, fmt.Errorf("event signature verification failed: %v", err) + } + return ev, nil +} + +type RelayConn struct { + mu sync.RWMutex + client *ws.Client + url string +} + +type CacheIndex struct { + events []*event.E + ids [][]byte + authors [][]byte + times []int64 + tags map[byte][][]byte // single-letter tag -> list of values +} + +func (rc *RelayConn) Get() *ws.Client { + rc.mu.RLock() + defer rc.mu.RUnlock() + return rc.client +} + +func (rc *RelayConn) Reconnect(ctx context.Context) error { + rc.mu.Lock() + defer rc.mu.Unlock() + if rc.client != nil { + _ = rc.client.Close() + } + c, err := ws.RelayConnect(ctx, rc.url) + if err != nil { + return err + } + rc.client = c + return nil +} + +// loadCacheAndIndex parses examples.Cache (JSONL of events) and builds an index +func loadCacheAndIndex() (*CacheIndex, error) { + scanner := bufio.NewScanner(bytes.NewReader(examples.Cache)) + idx := &CacheIndex{tags: make(map[byte][][]byte)} + for scanner.Scan() { + line := scanner.Bytes() + if len(bytes.TrimSpace(line)) == 0 { + continue + } + ev := event.New() + rem, err := ev.Unmarshal(line) + _ = rem + if err != nil { + // skip malformed lines + continue + } + idx.events = append(idx.events, ev) + // collect fields + if len(ev.ID) > 0 { + idx.ids = append(idx.ids, append([]byte(nil), ev.ID...)) + } + if len(ev.Pubkey) > 0 { + idx.authors = append(idx.authors, append([]byte(nil), ev.Pubkey...)) + } + idx.times = append(idx.times, ev.CreatedAt) + if ev.Tags != nil { + for _, tg := range *ev.Tags { + if tg == nil || tg.Len() < 2 { + continue + } + k := tg.Key() + if len(k) != 1 { + continue // only single-letter keys per requirement + } + key := k[0] + for _, v := range tg.T[1:] { + idx.tags[key] = append( + idx.tags[key], append([]byte(nil), v...), + ) + } + } + } + } + return idx, nil +} + +// publishCacheEvents uploads all cache events to the relay, waiting for OKs +func publishCacheEvents( + ctx context.Context, rc *RelayConn, idx *CacheIndex, + publishTimeout time.Duration, +) (okCount int) { + // Use an index-based loop so we can truly retry the same event on transient errors + for i := 0; i < len(idx.events); i++ { + // allow cancellation + select { + case <-ctx.Done(): + return okCount + default: + } + ev := idx.events[i] + client := rc.Get() + if client == nil { + _ = rc.Reconnect(ctx) + // retry same index + i-- + continue + } + // Create per-publish timeout context and add diagnostics to understand cancellations + pubCtx, cancel := context.WithTimeout(ctx, publishTimeout) + if dl, ok := pubCtx.Deadline(); ok { + log.T.F( + "cache publish %d/%d: deadline=%s now=%s remain=%s", + i+1, len(idx.events), dl.Format(time.RFC3339Nano), + time.Now().Format(time.RFC3339Nano), + time.Until(dl).Truncate(time.Millisecond), + ) + } + err := client.Publish(pubCtx, ev) + log.I.F("event: %s", ev.Serialize()) + // it's safe to cancel our per-publish context after Publish returns + cancel() + if err != nil { + log.E.F("cache publish error: %v (ctxErr=%v)", err, pubCtx.Err()) + errStr := err.Error() + if strings.Contains(errStr, "connection closed") || + strings.Contains(errStr, "context deadline exceeded") || + strings.Contains(errStr, "given up waiting for an OK") { + _ = rc.Reconnect(ctx) + // retry this event by decrementing i so the for-loop will attempt it again + i-- + continue + } + // small backoff and move on to next event on non-transient errors + time.Sleep(50 * time.Millisecond) + continue + } + okCount++ + } + return okCount +} + +// buildRandomFilter builds a filter combining random subsets of id, author, timestamp, and a single-letter tag value. +func buildRandomFilter(idx *CacheIndex, rng *rand.Rand, mask int) *filter.F { + // pick a random base event as anchor for fields + i := rng.Intn(len(idx.events)) + ev := idx.events[i] + f := filter.New() + // clear defaults we don't set + f.Kinds = kind.NewS() // we don't constrain kinds + // include fields based on mask bits: 1=id, 2=author, 4=timestamp, 8=tag + if mask&1 != 0 { + f.Ids.T = append(f.Ids.T, append([]byte(nil), ev.ID...)) + } + if mask&2 != 0 { + f.Authors.T = append(f.Authors.T, append([]byte(nil), ev.Pubkey...)) + } + if mask&4 != 0 { + // use a tight window around the event timestamp (exact match) + f.Since = timestamp.FromUnix(ev.CreatedAt) + f.Until = timestamp.FromUnix(ev.CreatedAt) + } + if mask&8 != 0 { + // choose a random single-letter tag from this event if present; fallback to global index + var key byte + var val []byte + chosen := false + if ev.Tags != nil { + for _, tg := range *ev.Tags { + if tg == nil || tg.Len() < 2 { + continue + } + k := tg.Key() + if len(k) == 1 { + key = k[0] + vv := tg.T[1:] + val = vv[rng.Intn(len(vv))] + chosen = true + break + } + } + } + if !chosen && len(idx.tags) > 0 { + // pick a random entry from global tags map + keys := make([]byte, 0, len(idx.tags)) + for k := range idx.tags { + keys = append(keys, k) + } + key = keys[rng.Intn(len(keys))] + vals := idx.tags[key] + val = vals[rng.Intn(len(vals))] + } + if key != 0 && len(val) > 0 { + f.Tags.Append(tag.NewFromBytesSlice([]byte{key}, val)) + } + } + return f +} + +func publisherWorker( + ctx context.Context, rc *RelayConn, id int, stats *uint64, + publishTimeout time.Duration, +) { + // Unique RNG per worker + src := rand.NewSource(time.Now().UnixNano() ^ int64(id<<16)) + rng := rand.New(src) + // Generate and reuse signing key per worker + signer := &p256k.Signer{} + if err := signer.Generate(); err != nil { + log.E.F("worker %d: signer generate error: %v", id, err) + return + } + + for { + select { + case <-ctx.Done(): + return + default: + } + + ev, err := makeEvent(rng, signer) + if err != nil { + log.E.F("worker %d: makeEvent error: %v", id, err) + return + } + + // Publish waits for OK (ws.Client.Publish does the waiting) + client := rc.Get() + if client == nil { + _ = rc.Reconnect(ctx) + continue + } + // per-publish timeout context + pubCtx, cancel := context.WithTimeout(ctx, publishTimeout) + if err := client.Publish(pubCtx, ev); err != nil { + cancel() + log.E.F("worker %d: publish error: %v", id, err) + errStr := err.Error() + if strings.Contains( + errStr, "connection closed", + ) || strings.Contains( + errStr, "context deadline exceeded", + ) || strings.Contains(errStr, "given up waiting for an OK") { + for attempt := 0; attempt < 5; attempt++ { + if ctx.Err() != nil { + return + } + if err := rc.Reconnect(ctx); err == nil { + log.I.F("worker %d: reconnected to %s", id, rc.url) + break + } + select { + case <-time.After(200 * time.Millisecond): + case <-ctx.Done(): + return + } + } + } + // back off briefly on error to avoid tight loop if relay misbehaves + select { + case <-time.After(100 * time.Millisecond): + case <-ctx.Done(): + return + } + continue + } + cancel() + + atomic.AddUint64(stats, 1) + + // Randomly fluctuate pacing: small random sleep 0..50ms plus occasional longer jitter + sleep := time.Duration(rng.Intn(50)) * time.Millisecond + if rng.Intn(10) == 0 { // 10% chance add extra 100..400ms + sleep += time.Duration(100+rng.Intn(300)) * time.Millisecond + } + select { + case <-time.After(sleep): + case <-ctx.Done(): + return + } + } +} + +func queryWorker( + ctx context.Context, rc *RelayConn, idx *CacheIndex, id int, + queries, results *uint64, subTimeout time.Duration, + minInterval, maxInterval time.Duration, +) { + rng := rand.New(rand.NewSource(time.Now().UnixNano() ^ int64(id<<24))) + mask := 1 + for { + select { + case <-ctx.Done(): + return + default: + } + if len(idx.events) == 0 { + time.Sleep(200 * time.Millisecond) + continue + } + f := buildRandomFilter(idx, rng, mask) + mask++ + if mask > 15 { // all combinations of 4 criteria (excluding 0) + mask = 1 + } + client := rc.Get() + if client == nil { + _ = rc.Reconnect(ctx) + continue + } + ff := filter.S{f} + sCtx, cancel := context.WithTimeout(ctx, subTimeout) + sub, err := client.Subscribe( + sCtx, &ff, ws.WithLabel("stresstest-query"), + ) + if err != nil { + cancel() + // reconnect on connection issues + errStr := err.Error() + if strings.Contains(errStr, "connection closed") { + _ = rc.Reconnect(ctx) + } + continue + } + atomic.AddUint64(queries, 1) + // read until EOSE or timeout + innerDone := false + for !innerDone { + select { + case <-sCtx.Done(): + innerDone = true + case <-sub.EndOfStoredEvents: + innerDone = true + case ev, ok := <-sub.Events: + if !ok { + innerDone = true + break + } + if ev != nil { + atomic.AddUint64(results, 1) + } + } + } + sub.Unsub() + cancel() + // wait a random interval between queries + interval := minInterval + if maxInterval > minInterval { + delta := rng.Int63n(int64(maxInterval - minInterval)) + interval += time.Duration(delta) + } + select { + case <-time.After(interval): + case <-ctx.Done(): + return + } + } +} + +func startReader(ctx context.Context, rl *ws.Client, received *uint64) error { + // Broad filter: subscribe to text notes since now-5m to catch our own writes + f := filter.New() + f.Kinds = kind.NewS(kind.TextNote) + // We don't set authors to ensure we read all text notes coming in + ff := filter.S{f} + sub, err := rl.Subscribe(ctx, &ff, ws.WithLabel("stresstest-reader")) + if err != nil { + return err + } + + go func() { + for { + select { + case <-ctx.Done(): + return + case ev, ok := <-sub.Events: + if !ok { + return + } + if ev != nil { + atomic.AddUint64(received, 1) + } + } + } + }() + + return nil +} + +func main() { + var ( + address string + port int + workers int + duration time.Duration + publishTimeout time.Duration + queryWorkers int + queryTimeout time.Duration + queryMinInt time.Duration + queryMaxInt time.Duration + skipCache bool + ) + + flag.StringVar( + &address, "address", "localhost", "relay address (host or IP)", + ) + flag.IntVar(&port, "port", 3334, "relay port") + flag.IntVar( + &workers, "workers", 8, "number of concurrent publisher workers", + ) + flag.DurationVar( + &duration, "duration", 60*time.Second, + "how long to run the stress test", + ) + flag.DurationVar( + &publishTimeout, "publish-timeout", 15*time.Second, + "timeout waiting for OK per publish", + ) + flag.IntVar( + &queryWorkers, "query-workers", 4, "number of concurrent query workers", + ) + flag.DurationVar( + &queryTimeout, "query-timeout", 3*time.Second, + "subscription timeout for queries", + ) + flag.DurationVar( + &queryMinInt, "query-min-interval", 50*time.Millisecond, + "minimum interval between queries per worker", + ) + flag.DurationVar( + &queryMaxInt, "query-max-interval", 300*time.Millisecond, + "maximum interval between queries per worker", + ) + flag.BoolVar( + &skipCache, "skip-cache", false, + "skip uploading examples.Cache before running", + ) + flag.Parse() + + relayURL := fmt.Sprintf("ws://%s:%d", address, port) + log.I.F("stresstest: connecting to %s", relayURL) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Handle Ctrl+C + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, os.Interrupt) + go func() { + select { + case <-sigc: + log.I.Ln("interrupt received, shutting down...") + cancel() + case <-ctx.Done(): + } + }() + + rl, err := ws.RelayConnect(ctx, relayURL) + if err != nil { + log.E.F("failed to connect to relay %s: %v", relayURL, err) + os.Exit(1) + } + defer rl.Close() + + rc := &RelayConn{client: rl, url: relayURL} + + // Load and publish cache events first (unless skipped) + idx, err := loadCacheAndIndex() + if err != nil { + log.E.F("failed to load examples.Cache: %v", err) + } + cachePublished := 0 + if !skipCache && idx != nil && len(idx.events) > 0 { + log.I.F("uploading %d events from examples.Cache...", len(idx.events)) + cachePublished = publishCacheEvents(ctx, rc, idx, publishTimeout) + log.I.F("uploaded %d/%d cache events", cachePublished, len(idx.events)) + } + + var pubOK uint64 + var recvCount uint64 + var qCount uint64 + var qResults uint64 + + if err := startReader(ctx, rl, &recvCount); err != nil { + log.E.F("reader subscribe error: %v", err) + // continue anyway, we can still write + } + + wg := sync.WaitGroup{} + // Start publisher workers + wg.Add(workers) + for i := 0; i < workers; i++ { + i := i + go func() { + defer wg.Done() + publisherWorker(ctx, rc, i, &pubOK, publishTimeout) + }() + } + // Start query workers + if idx != nil && len(idx.events) > 0 && queryWorkers > 0 { + wg.Add(queryWorkers) + for i := 0; i < queryWorkers; i++ { + i := i + go func() { + defer wg.Done() + queryWorker( + ctx, rc, idx, i, &qCount, &qResults, queryTimeout, + queryMinInt, queryMaxInt, + ) + }() + } + } + + // Timer for duration and periodic stats + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + end := time.NewTimer(duration) + start := time.Now() + +loop: + for { + select { + case <-ticker.C: + elapsed := time.Since(start).Seconds() + p := atomic.LoadUint64(&pubOK) + r := atomic.LoadUint64(&recvCount) + qc := atomic.LoadUint64(&qCount) + qr := atomic.LoadUint64(&qResults) + log.I.F( + "elapsed=%.1fs published_ok=%d (%.0f/s) received=%d cache_published=%d queries=%d results=%d", + elapsed, p, float64(p)/elapsed, r, cachePublished, qc, qr, + ) + case <-end.C: + break loop + case <-ctx.Done(): + break loop + } + } + + cancel() + wg.Wait() + p := atomic.LoadUint64(&pubOK) + r := atomic.LoadUint64(&recvCount) + qc := atomic.LoadUint64(&qCount) + qr := atomic.LoadUint64(&qResults) + log.I.F( + "stresstest complete: cache_published=%d published_ok=%d received=%d queries=%d results=%d duration=%s", + cachePublished, p, r, qc, qr, + time.Since(start).Truncate(time.Millisecond), + ) +} diff --git a/pkg/encoders/envelopes/eventenvelope/eventenvelope.go b/pkg/encoders/envelopes/eventenvelope/eventenvelope.go index eded6cf..d62857e 100644 --- a/pkg/encoders/envelopes/eventenvelope/eventenvelope.go +++ b/pkg/encoders/envelopes/eventenvelope/eventenvelope.go @@ -7,12 +7,10 @@ import ( "lol.mleku.dev/chk" "lol.mleku.dev/errorf" - "lol.mleku.dev/log" "next.orly.dev/pkg/encoders/envelopes" "next.orly.dev/pkg/encoders/event" "next.orly.dev/pkg/encoders/text" "next.orly.dev/pkg/interfaces/codec" - "next.orly.dev/pkg/utils/bufpool" "next.orly.dev/pkg/utils/constraints" "next.orly.dev/pkg/utils/units" ) @@ -76,8 +74,8 @@ func (en *Submission) Unmarshal(b []byte) (r []byte, err error) { if r, err = en.E.Unmarshal(r); chk.T(err) { return } - buf := bufpool.Get() - r = en.E.Marshal(buf) + // after parsing the event object, r points just after the event JSON + // now skip to the end of the envelope (consume comma/closing bracket etc.) if r, err = envelopes.SkipToTheEnd(r); chk.E(err) { return } @@ -162,7 +160,6 @@ func (en *Result) Unmarshal(b []byte) (r []byte, err error) { return } en.Event = event.New() - log.I.F("unmarshal: '%s'", b) if r, err = en.Event.Unmarshal(r); err != nil { return } diff --git a/pkg/encoders/event/event.go b/pkg/encoders/event/event.go index fa28e79..73d3084 100644 --- a/pkg/encoders/event/event.go +++ b/pkg/encoders/event/event.go @@ -15,13 +15,10 @@ import ( "next.orly.dev/pkg/encoders/tag" "next.orly.dev/pkg/encoders/text" "next.orly.dev/pkg/utils" - "next.orly.dev/pkg/utils/bufpool" ) // E is the primary datatype of nostr. This is the form of the structure that -// defines its JSON string-based format. Always use New() and Free() to create -// and free event.E to take advantage of the bufpool which greatly improves -// memory allocation behaviour when encoding and decoding nostr events. +// defines its JSON string-based format. // // WARNING: DO NOT use json.Marshal with this type because it will not properly // encode <, >, and & characters due to legacy bullcrap in the encoding/json @@ -57,10 +54,6 @@ type E struct { // Sig is the signature on the ID hash that validates as coming from the // Pubkey in binary format. Sig []byte - - // b is the decode buffer for the event.E. this is where the UnmarshalJSON - // will source the memory to store all of the fields except for the tags. - b bufpool.B } var ( @@ -73,25 +66,66 @@ var ( jSig = []byte("sig") ) -// New returns a new event.E. The returned event.E should be freed with Free() -// to return the unmarshalling buffer to the bufpool. +// New returns a new event.E. func New() *E { - return &E{ - b: bufpool.Get(), - } + return &E{} } -// Free returns the event.E to the pool, as well as nilling all of the fields. -// This should hint to the GC that the event.E can be freed, and the memory -// reused. The decode buffer will be returned to the pool for reuse. +// Free nils all of the fields to hint to the GC that the event.E can be freed. func (ev *E) Free() { - bufpool.Put(ev.b) ev.ID = nil ev.Pubkey = nil ev.Tags = nil ev.Content = nil ev.Sig = nil - ev.b = nil +} + +// Clone creates a deep copy of the event with independent memory allocations. +// The clone does not use bufpool, ensuring it has a separate lifetime from +// the original event. This prevents corruption when the original is freed +// while the clone is still in use (e.g., in asynchronous delivery). +func (ev *E) Clone() *E { + clone := &E{ + CreatedAt: ev.CreatedAt, + Kind: ev.Kind, + } + + // Deep copy all byte slices with independent memory + if ev.ID != nil { + clone.ID = make([]byte, len(ev.ID)) + copy(clone.ID, ev.ID) + } + if ev.Pubkey != nil { + clone.Pubkey = make([]byte, len(ev.Pubkey)) + copy(clone.Pubkey, ev.Pubkey) + } + if ev.Content != nil { + clone.Content = make([]byte, len(ev.Content)) + copy(clone.Content, ev.Content) + } + if ev.Sig != nil { + clone.Sig = make([]byte, len(ev.Sig)) + copy(clone.Sig, ev.Sig) + } + + // Deep copy tags + if ev.Tags != nil { + clone.Tags = tag.NewS() + for _, tg := range *ev.Tags { + if tg != nil { + // Create new tag with deep-copied elements + newTag := tag.NewWithCap(len(tg.T)) + for _, element := range tg.T { + newElement := make([]byte, len(element)) + copy(newElement, element) + newTag.T = append(newTag.T, newElement) + } + clone.Tags.Append(newTag) + } + } + } + + return clone } // EstimateSize returns a size for the event that allows for worst case scenario @@ -135,6 +169,9 @@ func (ev *E) Marshal(dst []byte) (b []byte) { b = append(b, `":`...) if ev.Tags != nil { b = ev.Tags.Marshal(b) + } else { + // Emit empty array for nil tags to keep JSON valid + b = append(b, '[', ']') } b = append(b, `,"`...) b = append(b, jContent...) @@ -151,29 +188,22 @@ func (ev *E) Marshal(dst []byte) (b []byte) { // MarshalJSON marshals an event.E into a JSON byte string. // -// Call bufpool.PutBytes(b) to return the buffer to the bufpool after use. -// // WARNING: if json.Marshal is called in the hopes of invoking this function on // an event, if it has <, > or * in the content or tags they are escaped into // unicode escapes and break the event ID. Call this function directly in order // to bypass this issue. func (ev *E) MarshalJSON() (b []byte, err error) { - b = bufpool.Get() - b = ev.Marshal(b[:0]) + b = ev.Marshal(nil) return } func (ev *E) Serialize() (b []byte) { - b = bufpool.Get() - b = ev.Marshal(b[:0]) + b = ev.Marshal(nil) return } // Unmarshal unmarshalls a JSON string into an event.E. -// -// Call ev.Free() to return the provided buffer to the bufpool afterwards. func (ev *E) Unmarshal(b []byte) (rem []byte, err error) { - log.I.F("Unmarshal\n%s\n", string(b)) key := make([]byte, 0, 9) for ; len(b) > 0; b = b[1:] { // Skip whitespace @@ -185,7 +215,6 @@ func (ev *E) Unmarshal(b []byte) (rem []byte, err error) { goto BetweenKeys } } - log.I.F("start") goto eof BetweenKeys: for ; len(b) > 0; b = b[1:] { @@ -198,7 +227,6 @@ BetweenKeys: goto InKey } } - log.I.F("BetweenKeys") goto eof InKey: for ; len(b) > 0; b = b[1:] { @@ -208,7 +236,6 @@ InKey: } key = append(key, b[0]) } - log.I.F("InKey") goto eof InKV: for ; len(b) > 0; b = b[1:] { @@ -221,7 +248,6 @@ InKV: goto InVal } } - log.I.F("InKV") goto eof InVal: // Skip whitespace before value diff --git a/pkg/encoders/tag/tag.go b/pkg/encoders/tag/tag.go index 8d6db65..722e4b5 100644 --- a/pkg/encoders/tag/tag.go +++ b/pkg/encoders/tag/tag.go @@ -9,7 +9,6 @@ import ( "lol.mleku.dev/errorf" "next.orly.dev/pkg/encoders/text" utils "next.orly.dev/pkg/utils" - "next.orly.dev/pkg/utils/bufpool" ) // The tag position meanings, so they are clear when reading. @@ -21,18 +20,17 @@ const ( type T struct { T [][]byte - b bufpool.B } -func New() *T { return &T{b: bufpool.Get()} } +func New() *T { return &T{} } func NewFromBytesSlice(t ...[]byte) (tt *T) { - tt = &T{T: t, b: bufpool.Get()} + tt = &T{T: t} return } func NewFromAny(t ...any) (tt *T) { - tt = &T{b: bufpool.Get()} + tt = &T{} for _, v := range t { switch vv := v.(type) { case []byte: @@ -47,11 +45,10 @@ func NewFromAny(t ...any) (tt *T) { } func NewWithCap(c int) *T { - return &T{T: make([][]byte, 0, c), b: bufpool.Get()} + return &T{T: make([][]byte, 0, c)} } func (t *T) Free() { - bufpool.Put(t.b) t.T = nil } @@ -99,18 +96,12 @@ func (t *T) Marshal(dst []byte) (b []byte) { // in an event as you will have a bad time. Use the json.Marshal function in the // pkg/encoders/json package instead, this has a fork of the json library that // disables html escaping for json.Marshal. -// -// Call bufpool.PutBytes(b) to return the buffer to the bufpool after use. func (t *T) MarshalJSON() (b []byte, err error) { - b = bufpool.Get() - b = t.Marshal(b) + b = t.Marshal(nil) return } // Unmarshal decodes a standard minified JSON array of strings to a tags.T. -// -// Call bufpool.PutBytes(b) to return the buffer to the bufpool after use if it -// was originally created using bufpool.Get(). func (t *T) Unmarshal(b []byte) (r []byte, err error) { var inQuotes, openedBracket bool var quoteStart int @@ -127,7 +118,11 @@ func (t *T) Unmarshal(b []byte) (r []byte, err error) { i++ } else if b[i] == '"' { inQuotes = false - t.T = append(t.T, text.NostrUnescape(b[quoteStart:i])) + // Copy the quoted substring before unescaping so we don't mutate the + // original JSON buffer in-place (which would corrupt subsequent parsing). + copyBuf := make([]byte, i-quoteStart) + copy(copyBuf, b[quoteStart:i]) + t.T = append(t.T, text.NostrUnescape(copyBuf)) } } if !openedBracket || inQuotes { diff --git a/pkg/encoders/tag/tags.go b/pkg/encoders/tag/tags.go index a246321..8c426cb 100644 --- a/pkg/encoders/tag/tags.go +++ b/pkg/encoders/tag/tags.go @@ -5,7 +5,6 @@ import ( "lol.mleku.dev/chk" "next.orly.dev/pkg/utils" - "next.orly.dev/pkg/utils/bufpool" ) // S is a list of tag.T - which are lists of string elements with ordering and @@ -70,10 +69,7 @@ func (s *S) ContainsAny(tagName []byte, values [][]byte) bool { } // MarshalJSON encodes a tags.T appended to a provided byte slice in JSON form. -// -// Call bufpool.PutBytes(b) to return the buffer to the bufpool after use. func (s *S) MarshalJSON() (b []byte, err error) { - b = bufpool.Get() b = append(b, '[') for i, ss := range *s { b = ss.Marshal(b) @@ -100,8 +96,6 @@ func (s *S) Marshal(dst []byte) (b []byte) { // UnmarshalJSON a tags.T from a provided byte slice and return what remains // after the end of the array. -// -// Call bufpool.PutBytes(b) to return the buffer to the bufpool after use. func (s *S) UnmarshalJSON(b []byte) (err error) { _, err = s.Unmarshal(b) return diff --git a/pkg/encoders/text/helpers.go b/pkg/encoders/text/helpers.go index ccfe395..e65d52e 100644 --- a/pkg/encoders/text/helpers.go +++ b/pkg/encoders/text/helpers.go @@ -94,7 +94,10 @@ func UnmarshalQuoted(b []byte) (content, rem []byte, err error) { if !escaping { rem = rem[1:] content = content[:contentLen] - content = NostrUnescape(content) + // Create a copy of the content to avoid corrupting the original input buffer + contentCopy := make([]byte, len(content)) + copy(contentCopy, content) + content = NostrUnescape(contentCopy) return } contentLen++