From eddd05eabf9f96be4405cbdec2c2b593e0863ea4 Mon Sep 17 00:00:00 2001 From: mleku Date: Thu, 25 Dec 2025 06:03:53 +0100 Subject: [PATCH] Add memory optimization improvements for reduced GC pressure (v0.36.16) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add buffer pool (pkg/database/bufpool) with SmallPool (64B) and MediumPool (1KB) for reusing bytes.Buffer instances on hot paths - Fix escape analysis in index types (uint40, letter, word) by using fixed-size arrays instead of make() calls that escape to heap - Add handler concurrency limiter (ORLY_MAX_HANDLERS_PER_CONN, default 100) to prevent unbounded goroutine growth under WebSocket load - Add pre-allocation hints to Uint40s.Union/Intersection/Difference methods - Update compact_event.go, save-event.go, serial_cache.go, and get-indexes-for-event.go to use pooled buffers Files modified: - app/config/config.go: Add MaxHandlersPerConnection config - app/handle-websocket.go: Initialize handler semaphore - app/listener.go: Add semaphore acquire/release in messageProcessor - pkg/database/bufpool/pool.go: New buffer pool package - pkg/database/compact_event.go: Use buffer pool, fix escape analysis - pkg/database/get-indexes-for-event.go: Reuse single buffer for all indexes - pkg/database/indexes/types/letter.go: Fixed array in UnmarshalRead - pkg/database/indexes/types/uint40.go: Fixed arrays, pre-allocation hints - pkg/database/indexes/types/word.go: Fixed array in UnmarshalRead - pkg/database/save-event.go: Use buffer pool for key encoding - pkg/database/serial_cache.go: Use buffer pool for lookups 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../quick_validate.cpython-310.pyc | Bin 0 -> 1675 bytes app/config/config.go | 3 + app/handle-websocket.go | 7 ++ app/listener.go | 14 ++- pkg/database/bufpool/pool.go | 94 ++++++++++++++++++ pkg/database/compact_event.go | 17 ++-- pkg/database/get-indexes-for-event.go | 41 ++++---- pkg/database/indexes/types/letter.go | 4 +- pkg/database/indexes/types/uint40.go | 30 +++--- pkg/database/indexes/types/word.go | 4 +- pkg/database/save-event.go | 48 ++++----- pkg/database/serial_cache.go | 10 +- pkg/version/version | 2 +- 13 files changed, 204 insertions(+), 70 deletions(-) create mode 100644 .claude/skills/skill-creator/scripts/__pycache__/quick_validate.cpython-310.pyc create mode 100644 pkg/database/bufpool/pool.go diff --git a/.claude/skills/skill-creator/scripts/__pycache__/quick_validate.cpython-310.pyc b/.claude/skills/skill-creator/scripts/__pycache__/quick_validate.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..d54036600f1253a6e476a9a3fb2305cd63b101a2 GIT binary patch literal 1675 zcmZWpU2hvj6y4b`uh))iQYfS;(jXNjZo78jp@Jx+6hftnG=LHwFrll>&cvR2y=!LH zZoOJdsNs=+fROA*{?fei)So~e5O;Q+V9}L!?wy%)=bn4!BUxK>HTeD+*4fz6w7*nm z^)sOJIsD=`Xo%L;2qJofx~Q;vS1+v5HNYB?IkdVK(uhf{U$w4HE5sh?e4jcLJ+Gc4 zQW+Th9(9Rxj*v#tyhf@E<=JYUR5!G9bfS0b-8JgcwMz`G^M?e*sKgTeNK=2y%^tYl zjafLrlOSRwNLdnN5pp(4@ma#L7_cZ3*uz5>vtbb73FQJX?rW$-J8U6oejB9y5UK9+ ztAe%Q{}O)Tz)Y&;z-b08L7UnP5=u^o@kt1VXv21SjQo67g;KKg}=%x zD_I=!PVP)J@e8@3EPqAb1&;55+8f#C7lB~ms@TaH4oaCb=jiJv2M6sTY3Yx%^~(;9 z6SWuPm}G|Md1`LsFu^}OdU}A*coL_>AWbP>LYmJ2a}sa2?>uas#7q3mCvma8|HE^L ze64mav*KV#chu7FKV?F&xQDmY&=zL#YEV`uv^*Vf1`7P`EBwmLn_-tGI^= z>#-DU5R)ZV;3STwtqbI3y60t|ZQ(G8m3ATxc-o-=e8JNG5{=<-hag@61%eec9H(q@ znO9`jSIXI062CyZ7idqZ^zr_GGF+y)TrG%u5yhuG2nRG3czai++H1A+tH+b^NZNu1 z9EL5C!j+W{6j_9{s<42{<&WTYN~iRPL{=0_8mGLb^43<)k95@@`DGeY`O;MyEJtc9 zdi<&J4G6zD?(~x(?F=J27VHAuB?UZjwX}l0hOcUNIPm$>S ztR5eK`S{TGXet|*9lr=vM8zl2IJ#pv$V86eLfbSp@~(JjZ%4HRrTP;?!EB%KC$E?UsJ5( literal 0 HcmV?d00001 diff --git a/app/config/config.go b/app/config/config.go index e18869e..c5274e0 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -106,6 +106,9 @@ type C struct { SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"100000" usage:"max pubkeys to cache for compact event storage (default: 100000, ~3.2MB memory)"` SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"500000" usage:"max event IDs to cache for compact event storage (default: 500000, ~16MB memory)"` + // Connection concurrency control + MaxHandlersPerConnection int `env:"ORLY_MAX_HANDLERS_PER_CONN" default:"100" usage:"max concurrent message handlers per WebSocket connection (limits goroutine growth under load)"` + // Adaptive rate limiting (PID-controlled) RateLimitEnabled bool `env:"ORLY_RATE_LIMIT_ENABLED" default:"true" usage:"enable adaptive PID-controlled rate limiting for database operations"` RateLimitTargetMB int `env:"ORLY_RATE_LIMIT_TARGET_MB" default:"0" usage:"target memory limit in MB (0=auto-detect: 66% of available, min 500MB)"` diff --git a/app/handle-websocket.go b/app/handle-websocket.go index 3723292..b5000ed 100644 --- a/app/handle-websocket.go +++ b/app/handle-websocket.go @@ -86,6 +86,12 @@ whitelist: }) defer conn.Close() + // Determine handler semaphore size from config + handlerSemSize := s.Config.MaxHandlersPerConnection + if handlerSemSize <= 0 { + handlerSemSize = 100 // Default if not configured + } + listener := &Listener{ ctx: ctx, cancel: cancel, @@ -98,6 +104,7 @@ whitelist: writeDone: make(chan struct{}), messageQueue: make(chan messageRequest, 100), // Buffered channel for message processing processingDone: make(chan struct{}), + handlerSem: make(chan struct{}, handlerSemSize), // Limits concurrent handlers subscriptions: make(map[string]context.CancelFunc), } diff --git a/app/listener.go b/app/listener.go index 63410d6..badd48c 100644 --- a/app/listener.go +++ b/app/listener.go @@ -39,6 +39,7 @@ type Listener struct { messageQueue chan messageRequest // Buffered channel for message processing processingDone chan struct{} // Closed when message processor exits handlerWg sync.WaitGroup // Tracks spawned message handler goroutines + handlerSem chan struct{} // Limits concurrent message handlers per connection authProcessing sync.RWMutex // Ensures AUTH completes before other messages check authentication // Flow control counters (atomic for concurrent access) droppedMessages atomic.Int64 // Messages dropped due to full queue @@ -240,9 +241,20 @@ func (l *Listener) messageProcessor() { // Not AUTH - unlock immediately and process concurrently // The next message can now be dequeued (possibly another non-AUTH to process concurrently) l.authProcessing.Unlock() + + // Acquire semaphore to limit concurrent handlers (blocking with context awareness) + select { + case l.handlerSem <- struct{}{}: + // Semaphore acquired + case <-l.ctx.Done(): + return + } l.handlerWg.Add(1) go func(data []byte, remote string) { - defer l.handlerWg.Done() + defer func() { + <-l.handlerSem // Release semaphore + l.handlerWg.Done() + }() l.HandleMessage(data, remote) }(req.data, req.remote) } diff --git a/pkg/database/bufpool/pool.go b/pkg/database/bufpool/pool.go new file mode 100644 index 0000000..e636475 --- /dev/null +++ b/pkg/database/bufpool/pool.go @@ -0,0 +1,94 @@ +//go:build !(js && wasm) + +// Package bufpool provides buffer pools for reducing GC pressure in hot paths. +// +// Two pool sizes are provided: +// - SmallPool (64 bytes): For index keys, serial encoding, short buffers +// - MediumPool (1KB): For event encoding, larger serialization buffers +// +// Usage: +// +// buf := bufpool.GetSmall() +// defer bufpool.PutSmall(buf) +// // Use buf... +// // IMPORTANT: Copy buf.Bytes() before Put if data is needed after +package bufpool + +import ( + "bytes" + "sync" +) + +const ( + // SmallBufferSize for index keys (8-64 bytes typical) + SmallBufferSize = 64 + + // MediumBufferSize for event encoding (300-1000 bytes typical) + MediumBufferSize = 1024 +) + +var ( + // smallPool for index keys and short encodings + smallPool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, SmallBufferSize)) + }, + } + + // mediumPool for event encoding and larger buffers + mediumPool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, MediumBufferSize)) + }, + } +) + +// GetSmall returns a small buffer (64 bytes) from the pool. +// Call PutSmall when done to return it to the pool. +// +// WARNING: Copy buf.Bytes() before calling PutSmall if the data +// is needed after the buffer is returned to the pool. +func GetSmall() *bytes.Buffer { + return smallPool.Get().(*bytes.Buffer) +} + +// PutSmall returns a small buffer to the pool. +// The buffer is reset before being returned. +func PutSmall(buf *bytes.Buffer) { + if buf == nil { + return + } + buf.Reset() + smallPool.Put(buf) +} + +// GetMedium returns a medium buffer (1KB) from the pool. +// Call PutMedium when done to return it to the pool. +// +// WARNING: Copy buf.Bytes() before calling PutMedium if the data +// is needed after the buffer is returned to the pool. +func GetMedium() *bytes.Buffer { + return mediumPool.Get().(*bytes.Buffer) +} + +// PutMedium returns a medium buffer to the pool. +// The buffer is reset before being returned. +func PutMedium(buf *bytes.Buffer) { + if buf == nil { + return + } + buf.Reset() + mediumPool.Put(buf) +} + +// CopyBytes copies the buffer contents to a new slice. +// Use this before returning the buffer to the pool if the +// data needs to persist. +func CopyBytes(buf *bytes.Buffer) []byte { + if buf == nil || buf.Len() == 0 { + return nil + } + result := make([]byte, buf.Len()) + copy(result, buf.Bytes()) + return result +} diff --git a/pkg/database/compact_event.go b/pkg/database/compact_event.go index be06caf..32e70a3 100644 --- a/pkg/database/compact_event.go +++ b/pkg/database/compact_event.go @@ -13,6 +13,7 @@ import ( "git.mleku.dev/mleku/nostr/encoders/tag" "git.mleku.dev/mleku/nostr/encoders/varint" "lol.mleku.dev/chk" + "next.orly.dev/pkg/database/bufpool" ) // CompactEventFormat defines the binary format for compact event storage. @@ -72,7 +73,8 @@ type SerialResolver interface { // MarshalCompactEvent encodes an event using compact serial references. // The resolver is used to look up/create serial mappings for pubkeys and event IDs. func MarshalCompactEvent(ev *event.E, resolver SerialResolver) (data []byte, err error) { - buf := new(bytes.Buffer) + buf := bufpool.GetMedium() + defer bufpool.PutMedium(buf) // Version byte buf.WriteByte(CompactFormatVersion) @@ -109,7 +111,8 @@ func MarshalCompactEvent(ev *event.E, resolver SerialResolver) (data []byte, err // Signature (64 bytes) buf.Write(ev.Sig) - return buf.Bytes(), nil + // Copy bytes before returning buffer to pool + return bufpool.CopyBytes(buf), nil } // encodeCompactTag encodes a single tag with serial references for e/p tags. @@ -221,8 +224,8 @@ func writeUint40(w io.Writer, value uint64) { // readUint40 reads a 5-byte big-endian unsigned integer. func readUint40(r io.Reader) (value uint64, err error) { - buf := make([]byte, 5) - if _, err = io.ReadFull(r, buf); err != nil { + var buf [5]byte // Fixed array avoids heap escape + if _, err = io.ReadFull(r, buf[:]); err != nil { return 0, err } value = (uint64(buf[0]) << 32) | @@ -331,9 +334,9 @@ func decodeCompactTag(r io.Reader, resolver SerialResolver) (t *tag.T, err error // decodeTagElement decodes a single tag element from compact format. func decodeTagElement(r io.Reader, resolver SerialResolver) (elem []byte, err error) { - // Read type flag - typeBuf := make([]byte, 1) - if _, err = io.ReadFull(r, typeBuf); err != nil { + // Read type flag (fixed array avoids heap escape) + var typeBuf [1]byte + if _, err = io.ReadFull(r, typeBuf[:]); err != nil { return nil, err } typeFlag := typeBuf[0] diff --git a/pkg/database/get-indexes-for-event.go b/pkg/database/get-indexes-for-event.go index 2e5b934..72757bb 100644 --- a/pkg/database/get-indexes-for-event.go +++ b/pkg/database/get-indexes-for-event.go @@ -4,21 +4,22 @@ import ( "bytes" "lol.mleku.dev/chk" + "next.orly.dev/pkg/database/bufpool" "next.orly.dev/pkg/database/indexes" . "next.orly.dev/pkg/database/indexes/types" "git.mleku.dev/mleku/nostr/encoders/event" ) -// appendIndexBytes marshals an index to a byte slice and appends it to the idxs slice -func appendIndexBytes(idxs *[][]byte, idx *indexes.T) (err error) { - buf := new(bytes.Buffer) +// appendIndexBytes marshals an index to a byte slice and appends it to the idxs slice. +// It reuses the provided buffer (resetting it first) to avoid allocations. +func appendIndexBytes(idxs *[][]byte, idx *indexes.T, buf *bytes.Buffer) (err error) { + buf.Reset() // Marshal the index to the buffer if err = idx.MarshalWrite(buf); chk.E(err) { return } - // Copy the buffer's bytes to a new byte slice - // Append the byte slice to the idxs slice - *idxs = append(*idxs, buf.Bytes()) + // Copy the buffer's bytes to a new byte slice and append + *idxs = append(*idxs, bufpool.CopyBytes(buf)) return } @@ -28,6 +29,10 @@ func appendIndexBytes(idxs *[][]byte, idx *indexes.T) (err error) { func GetIndexesForEvent(ev *event.E, serial uint64) ( idxs [][]byte, err error, ) { + // Get a reusable buffer for all index serializations + buf := bufpool.GetSmall() + defer bufpool.PutSmall(buf) + defer func() { if chk.E(err) { idxs = nil @@ -44,7 +49,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) ( return } idIndex := indexes.IdEnc(idHash, ser) - if err = appendIndexBytes(&idxs, idIndex); chk.E(err) { + if err = appendIndexBytes(&idxs, idIndex, buf); chk.E(err) { return } // FullIdPubkey index @@ -61,17 +66,17 @@ func GetIndexesForEvent(ev *event.E, serial uint64) ( idPubkeyIndex := indexes.FullIdPubkeyEnc( ser, fullID, pubHash, createdAt, ) - if err = appendIndexBytes(&idxs, idPubkeyIndex); chk.E(err) { + if err = appendIndexBytes(&idxs, idPubkeyIndex, buf); chk.E(err) { return } // CreatedAt index createdAtIndex := indexes.CreatedAtEnc(createdAt, ser) - if err = appendIndexBytes(&idxs, createdAtIndex); chk.E(err) { + if err = appendIndexBytes(&idxs, createdAtIndex, buf); chk.E(err) { return } // PubkeyCreatedAt index pubkeyIndex := indexes.PubkeyEnc(pubHash, createdAt, ser) - if err = appendIndexBytes(&idxs, pubkeyIndex); chk.E(err) { + if err = appendIndexBytes(&idxs, pubkeyIndex, buf); chk.E(err) { return } // Process tags for tag-related indexes @@ -101,7 +106,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) ( key, valueHash, pubHash, createdAt, ser, ) if err = appendIndexBytes( - &idxs, pubkeyTagIndex, + &idxs, pubkeyTagIndex, buf, ); chk.E(err) { return } @@ -110,7 +115,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) ( key, valueHash, createdAt, ser, ) if err = appendIndexBytes( - &idxs, tagIndex, + &idxs, tagIndex, buf, ); chk.E(err) { return } @@ -122,7 +127,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) ( key, valueHash, kind, createdAt, ser, ) if err = appendIndexBytes( - &idxs, kindTagIndex, + &idxs, kindTagIndex, buf, ); chk.E(err) { return } @@ -131,7 +136,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) ( key, valueHash, kind, pubHash, createdAt, ser, ) if err = appendIndexBytes( - &idxs, kindPubkeyTagIndex, + &idxs, kindPubkeyTagIndex, buf, ); chk.E(err) { return } @@ -142,7 +147,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) ( kind.Set(uint16(ev.Kind)) // Kind index kindIndex := indexes.KindEnc(kind, createdAt, ser) - if err = appendIndexBytes(&idxs, kindIndex); chk.E(err) { + if err = appendIndexBytes(&idxs, kindIndex, buf); chk.E(err) { return } // KindPubkey index @@ -150,7 +155,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) ( kindPubkeyIndex := indexes.KindPubkeyEnc( kind, pubHash, createdAt, ser, ) - if err = appendIndexBytes(&idxs, kindPubkeyIndex); chk.E(err) { + if err = appendIndexBytes(&idxs, kindPubkeyIndex, buf); chk.E(err) { return } @@ -160,7 +165,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) ( w := new(Word) w.FromWord(h) // 8-byte truncated hash wIdx := indexes.WordEnc(w, ser) - if err = appendIndexBytes(&idxs, wIdx); chk.E(err) { + if err = appendIndexBytes(&idxs, wIdx, buf); chk.E(err) { return } } @@ -176,7 +181,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) ( w := new(Word) w.FromWord(h) wIdx := indexes.WordEnc(w, ser) - if err = appendIndexBytes(&idxs, wIdx); chk.E(err) { + if err = appendIndexBytes(&idxs, wIdx, buf); chk.E(err) { return } } diff --git a/pkg/database/indexes/types/letter.go b/pkg/database/indexes/types/letter.go index 1b2faea..d83ea06 100644 --- a/pkg/database/indexes/types/letter.go +++ b/pkg/database/indexes/types/letter.go @@ -35,8 +35,8 @@ func (p *Letter) MarshalWrite(w io.Writer) (err error) { } func (p *Letter) UnmarshalRead(r io.Reader) (err error) { - val := make([]byte, 1) - if _, err = r.Read(val); chk.E(err) { + var val [1]byte // Fixed array avoids heap escape + if _, err = r.Read(val[:]); chk.E(err) { return } p.val = val[0] diff --git a/pkg/database/indexes/types/uint40.go b/pkg/database/indexes/types/uint40.go index 3d58520..99fa0d2 100644 --- a/pkg/database/indexes/types/uint40.go +++ b/pkg/database/indexes/types/uint40.go @@ -46,23 +46,23 @@ func (c *Uint40) MarshalWrite(w io.Writer) (err error) { if c.value > MaxUint40 { return errors.New("value exceeds 40-bit range") } - // Buffer for the 5 bytes - buf := make([]byte, 5) + // Fixed array avoids heap escape + var buf [5]byte // Write the upper 5 bytes (ignoring the most significant 3 bytes of uint64) buf[0] = byte((c.value >> 32) & 0xFF) // Most significant byte buf[1] = byte((c.value >> 24) & 0xFF) buf[2] = byte((c.value >> 16) & 0xFF) buf[3] = byte((c.value >> 8) & 0xFF) buf[4] = byte(c.value & 0xFF) // Least significant byte - _, err = w.Write(buf) + _, err = w.Write(buf[:]) return err } // UnmarshalRead reads 5 bytes from the provided reader and decodes it into a 40-bit unsigned integer. func (c *Uint40) UnmarshalRead(r io.Reader) (err error) { - // Buffer for the 5 bytes - buf := make([]byte, 5) - _, err = r.Read(buf) + // Fixed array avoids heap escape + var buf [5]byte + _, err = r.Read(buf[:]) if chk.E(err) { return err } @@ -81,8 +81,9 @@ type Uint40s []*Uint40 // Union computes the union of the current Uint40s slice with another Uint40s slice. The result // contains all unique elements from both slices. func (s Uint40s) Union(other Uint40s) Uint40s { - valueMap := make(map[uint64]bool) - var result Uint40s + totalCap := len(s) + len(other) + valueMap := make(map[uint64]bool, totalCap) + result := make(Uint40s, 0, totalCap) // Pre-allocate for worst case // Add elements from the current Uint40s slice to the result for _, item := range s { @@ -108,8 +109,13 @@ func (s Uint40s) Union(other Uint40s) Uint40s { // Intersection computes the intersection of the current Uint40s slice with another Uint40s // slice. The result contains only the elements that exist in both slices. func (s Uint40s) Intersection(other Uint40s) Uint40s { - valueMap := make(map[uint64]bool) - var result Uint40s + // Result can be at most the size of the smaller slice + smallerLen := len(s) + if len(other) < smallerLen { + smallerLen = len(other) + } + valueMap := make(map[uint64]bool, len(other)) + result := make(Uint40s, 0, smallerLen) // Pre-allocate for worst case // Add all elements from the other Uint40s slice to the map for _, item := range other { @@ -131,8 +137,8 @@ func (s Uint40s) Intersection(other Uint40s) Uint40s { // The result contains only the elements that are in the current slice but not in the other // slice. func (s Uint40s) Difference(other Uint40s) Uint40s { - valueMap := make(map[uint64]bool) - var result Uint40s + valueMap := make(map[uint64]bool, len(other)) + result := make(Uint40s, 0, len(s)) // Pre-allocate for worst case (no overlap) // Mark all elements in the other Uint40s slice for _, item := range other { diff --git a/pkg/database/indexes/types/word.go b/pkg/database/indexes/types/word.go index c9a86f5..372214c 100644 --- a/pkg/database/indexes/types/word.go +++ b/pkg/database/indexes/types/word.go @@ -37,12 +37,12 @@ func (w *Word) MarshalWrite(wr io.Writer) (err error) { // UnmarshalRead reads the word from the reader, stopping at the zero-byte marker func (w *Word) UnmarshalRead(r io.Reader) error { buf := new(bytes.Buffer) - tmp := make([]byte, 1) + var tmp [1]byte // Fixed array avoids heap escape foundEndMarker := false // Read bytes until the zero byte is encountered for { - n, err := r.Read(tmp) + n, err := r.Read(tmp[:]) if n > 0 { if tmp[0] == 0x00 { // Stop on encountering the zero-byte marker foundEndMarker = true diff --git a/pkg/database/save-event.go b/pkg/database/save-event.go index c138ffa..d61ea1b 100644 --- a/pkg/database/save-event.go +++ b/pkg/database/save-event.go @@ -3,7 +3,6 @@ package database import ( - "bytes" "context" "errors" "fmt" @@ -12,6 +11,7 @@ import ( "github.com/dgraph-io/badger/v4" "lol.mleku.dev/chk" "lol.mleku.dev/log" + "next.orly.dev/pkg/database/bufpool" "next.orly.dev/pkg/database/indexes" "next.orly.dev/pkg/database/indexes/types" "next.orly.dev/pkg/mode" @@ -277,14 +277,15 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) ( // Calculate legacy size for comparison (for metrics tracking) // We marshal to get accurate size comparison - legacyBuf := new(bytes.Buffer) + legacyBuf := bufpool.GetMedium() + defer bufpool.PutMedium(legacyBuf) ev.MarshalBinary(legacyBuf) legacySize := legacyBuf.Len() if compactErr != nil { // Fall back to legacy format if compact encoding fails log.W.F("SaveEvent: compact encoding failed, using legacy format: %v", compactErr) - compactData = legacyBuf.Bytes() + compactData = bufpool.CopyBytes(legacyBuf) } else { // Track storage savings TrackCompactSaving(legacySize, len(compactData)) @@ -322,13 +323,16 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) ( // Format: cmp|serial|compact_event_data // This is the only storage format - legacy evt/sev/aev/rev prefixes // are handled by migration and no longer written for new events - cmpKeyBuf := new(bytes.Buffer) + cmpKeyBuf := bufpool.GetSmall() if err = indexes.CompactEventEnc(ser).MarshalWrite(cmpKeyBuf); chk.E(err) { + bufpool.PutSmall(cmpKeyBuf) return } - if err = txn.Set(cmpKeyBuf.Bytes(), compactData); chk.E(err) { + if err = txn.Set(bufpool.CopyBytes(cmpKeyBuf), compactData); chk.E(err) { + bufpool.PutSmall(cmpKeyBuf) return } + bufpool.PutSmall(cmpKeyBuf) // Create graph edges between event and all related pubkeys // This creates bidirectional edges: event->pubkey and pubkey->event @@ -336,6 +340,10 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) ( eventKind := new(types.Uint16) eventKind.Set(ev.Kind) + // Reuse a single buffer for graph edge keys (reset between uses) + graphKeyBuf := bufpool.GetSmall() + defer bufpool.PutSmall(graphKeyBuf) + for _, pkInfo := range pubkeysForGraph { // Determine direction for forward edge (event -> pubkey perspective) directionForward := new(types.Letter) @@ -353,23 +361,20 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) ( } // Create event -> pubkey edge (with kind and direction) - epgKeyBuf := new(bytes.Buffer) - if err = indexes.EventPubkeyGraphEnc(ser, pkInfo.serial, eventKind, directionForward).MarshalWrite(epgKeyBuf); chk.E(err) { + graphKeyBuf.Reset() + if err = indexes.EventPubkeyGraphEnc(ser, pkInfo.serial, eventKind, directionForward).MarshalWrite(graphKeyBuf); chk.E(err) { return } - // Make a copy of the key bytes to avoid buffer reuse issues in txn - epgKey := make([]byte, epgKeyBuf.Len()) - copy(epgKey, epgKeyBuf.Bytes()) - if err = txn.Set(epgKey, nil); chk.E(err) { + if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) { return } // Create pubkey -> event edge (reverse, with kind and direction for filtering) - pegKeyBuf := new(bytes.Buffer) - if err = indexes.PubkeyEventGraphEnc(pkInfo.serial, eventKind, directionReverse, ser).MarshalWrite(pegKeyBuf); chk.E(err) { + graphKeyBuf.Reset() + if err = indexes.PubkeyEventGraphEnc(pkInfo.serial, eventKind, directionReverse, ser).MarshalWrite(graphKeyBuf); chk.E(err) { return } - if err = txn.Set(pegKeyBuf.Bytes(), nil); chk.E(err) { + if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) { return } } @@ -397,25 +402,22 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) ( // Create forward edge: source event -> target event (outbound e-tag) directionOut := new(types.Letter) directionOut.Set(types.EdgeDirectionETagOut) - eegKeyBuf := new(bytes.Buffer) - if err = indexes.EventEventGraphEnc(ser, targetSerial, eventKind, directionOut).MarshalWrite(eegKeyBuf); chk.E(err) { + graphKeyBuf.Reset() + if err = indexes.EventEventGraphEnc(ser, targetSerial, eventKind, directionOut).MarshalWrite(graphKeyBuf); chk.E(err) { return } - // Make a copy of the key bytes to avoid buffer reuse issues in txn - eegKey := make([]byte, eegKeyBuf.Len()) - copy(eegKey, eegKeyBuf.Bytes()) - if err = txn.Set(eegKey, nil); chk.E(err) { + if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) { return } // Create reverse edge: target event -> source event (inbound e-tag) directionIn := new(types.Letter) directionIn.Set(types.EdgeDirectionETagIn) - geeKeyBuf := new(bytes.Buffer) - if err = indexes.GraphEventEventEnc(targetSerial, eventKind, directionIn, ser).MarshalWrite(geeKeyBuf); chk.E(err) { + graphKeyBuf.Reset() + if err = indexes.GraphEventEventEnc(targetSerial, eventKind, directionIn, ser).MarshalWrite(graphKeyBuf); chk.E(err) { return } - if err = txn.Set(geeKeyBuf.Bytes(), nil); chk.E(err) { + if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) { return } } diff --git a/pkg/database/serial_cache.go b/pkg/database/serial_cache.go index 81f9bef..7bf0121 100644 --- a/pkg/database/serial_cache.go +++ b/pkg/database/serial_cache.go @@ -3,12 +3,12 @@ package database import ( - "bytes" "errors" "sync" "github.com/dgraph-io/badger/v4" "lol.mleku.dev/chk" + "next.orly.dev/pkg/database/bufpool" "next.orly.dev/pkg/database/indexes" "next.orly.dev/pkg/database/indexes/types" ) @@ -281,7 +281,8 @@ func (r *DatabaseSerialResolver) GetEventIdBySerial(serial uint64) (eventId []by // GetEventIdBySerial looks up an event ID by its serial number. // Uses the SerialEventId index (sei prefix). func (d *D) GetEventIdBySerial(ser *types.Uint40) (eventId []byte, err error) { - keyBuf := new(bytes.Buffer) + keyBuf := bufpool.GetSmall() + defer bufpool.PutSmall(keyBuf) if err = indexes.SerialEventIdEnc(ser).MarshalWrite(keyBuf); chk.E(err) { return nil, err } @@ -318,12 +319,13 @@ func (d *D) StoreEventIdSerial(txn *badger.Txn, serial uint64, eventId []byte) e return err } - keyBuf := new(bytes.Buffer) + keyBuf := bufpool.GetSmall() + defer bufpool.PutSmall(keyBuf) if err := indexes.SerialEventIdEnc(ser).MarshalWrite(keyBuf); chk.E(err) { return err } - return txn.Set(keyBuf.Bytes(), eventId) + return txn.Set(bufpool.CopyBytes(keyBuf), eventId) } // SerialCacheStats holds statistics about the serial cache. diff --git a/pkg/version/version b/pkg/version/version index 4c0f21d..057b551 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.36.15 +v0.36.16