Add memory optimization improvements for reduced GC pressure (v0.36.16)
Some checks failed
Go / build-and-release (push) Has been cancelled
Some checks failed
Go / build-and-release (push) Has been cancelled
- Add buffer pool (pkg/database/bufpool) with SmallPool (64B) and MediumPool (1KB) for reusing bytes.Buffer instances on hot paths - Fix escape analysis in index types (uint40, letter, word) by using fixed-size arrays instead of make() calls that escape to heap - Add handler concurrency limiter (ORLY_MAX_HANDLERS_PER_CONN, default 100) to prevent unbounded goroutine growth under WebSocket load - Add pre-allocation hints to Uint40s.Union/Intersection/Difference methods - Update compact_event.go, save-event.go, serial_cache.go, and get-indexes-for-event.go to use pooled buffers Files modified: - app/config/config.go: Add MaxHandlersPerConnection config - app/handle-websocket.go: Initialize handler semaphore - app/listener.go: Add semaphore acquire/release in messageProcessor - pkg/database/bufpool/pool.go: New buffer pool package - pkg/database/compact_event.go: Use buffer pool, fix escape analysis - pkg/database/get-indexes-for-event.go: Reuse single buffer for all indexes - pkg/database/indexes/types/letter.go: Fixed array in UnmarshalRead - pkg/database/indexes/types/uint40.go: Fixed arrays, pre-allocation hints - pkg/database/indexes/types/word.go: Fixed array in UnmarshalRead - pkg/database/save-event.go: Use buffer pool for key encoding - pkg/database/serial_cache.go: Use buffer pool for lookups 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Binary file not shown.
@@ -106,6 +106,9 @@ type C struct {
|
||||
SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"100000" usage:"max pubkeys to cache for compact event storage (default: 100000, ~3.2MB memory)"`
|
||||
SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"500000" usage:"max event IDs to cache for compact event storage (default: 500000, ~16MB memory)"`
|
||||
|
||||
// Connection concurrency control
|
||||
MaxHandlersPerConnection int `env:"ORLY_MAX_HANDLERS_PER_CONN" default:"100" usage:"max concurrent message handlers per WebSocket connection (limits goroutine growth under load)"`
|
||||
|
||||
// Adaptive rate limiting (PID-controlled)
|
||||
RateLimitEnabled bool `env:"ORLY_RATE_LIMIT_ENABLED" default:"true" usage:"enable adaptive PID-controlled rate limiting for database operations"`
|
||||
RateLimitTargetMB int `env:"ORLY_RATE_LIMIT_TARGET_MB" default:"0" usage:"target memory limit in MB (0=auto-detect: 66% of available, min 500MB)"`
|
||||
|
||||
@@ -86,6 +86,12 @@ whitelist:
|
||||
})
|
||||
|
||||
defer conn.Close()
|
||||
// Determine handler semaphore size from config
|
||||
handlerSemSize := s.Config.MaxHandlersPerConnection
|
||||
if handlerSemSize <= 0 {
|
||||
handlerSemSize = 100 // Default if not configured
|
||||
}
|
||||
|
||||
listener := &Listener{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
@@ -98,6 +104,7 @@ whitelist:
|
||||
writeDone: make(chan struct{}),
|
||||
messageQueue: make(chan messageRequest, 100), // Buffered channel for message processing
|
||||
processingDone: make(chan struct{}),
|
||||
handlerSem: make(chan struct{}, handlerSemSize), // Limits concurrent handlers
|
||||
subscriptions: make(map[string]context.CancelFunc),
|
||||
}
|
||||
|
||||
|
||||
@@ -39,6 +39,7 @@ type Listener struct {
|
||||
messageQueue chan messageRequest // Buffered channel for message processing
|
||||
processingDone chan struct{} // Closed when message processor exits
|
||||
handlerWg sync.WaitGroup // Tracks spawned message handler goroutines
|
||||
handlerSem chan struct{} // Limits concurrent message handlers per connection
|
||||
authProcessing sync.RWMutex // Ensures AUTH completes before other messages check authentication
|
||||
// Flow control counters (atomic for concurrent access)
|
||||
droppedMessages atomic.Int64 // Messages dropped due to full queue
|
||||
@@ -240,9 +241,20 @@ func (l *Listener) messageProcessor() {
|
||||
// Not AUTH - unlock immediately and process concurrently
|
||||
// The next message can now be dequeued (possibly another non-AUTH to process concurrently)
|
||||
l.authProcessing.Unlock()
|
||||
|
||||
// Acquire semaphore to limit concurrent handlers (blocking with context awareness)
|
||||
select {
|
||||
case l.handlerSem <- struct{}{}:
|
||||
// Semaphore acquired
|
||||
case <-l.ctx.Done():
|
||||
return
|
||||
}
|
||||
l.handlerWg.Add(1)
|
||||
go func(data []byte, remote string) {
|
||||
defer l.handlerWg.Done()
|
||||
defer func() {
|
||||
<-l.handlerSem // Release semaphore
|
||||
l.handlerWg.Done()
|
||||
}()
|
||||
l.HandleMessage(data, remote)
|
||||
}(req.data, req.remote)
|
||||
}
|
||||
|
||||
94
pkg/database/bufpool/pool.go
Normal file
94
pkg/database/bufpool/pool.go
Normal file
@@ -0,0 +1,94 @@
|
||||
//go:build !(js && wasm)
|
||||
|
||||
// Package bufpool provides buffer pools for reducing GC pressure in hot paths.
|
||||
//
|
||||
// Two pool sizes are provided:
|
||||
// - SmallPool (64 bytes): For index keys, serial encoding, short buffers
|
||||
// - MediumPool (1KB): For event encoding, larger serialization buffers
|
||||
//
|
||||
// Usage:
|
||||
//
|
||||
// buf := bufpool.GetSmall()
|
||||
// defer bufpool.PutSmall(buf)
|
||||
// // Use buf...
|
||||
// // IMPORTANT: Copy buf.Bytes() before Put if data is needed after
|
||||
package bufpool
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
// SmallBufferSize for index keys (8-64 bytes typical)
|
||||
SmallBufferSize = 64
|
||||
|
||||
// MediumBufferSize for event encoding (300-1000 bytes typical)
|
||||
MediumBufferSize = 1024
|
||||
)
|
||||
|
||||
var (
|
||||
// smallPool for index keys and short encodings
|
||||
smallPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return bytes.NewBuffer(make([]byte, 0, SmallBufferSize))
|
||||
},
|
||||
}
|
||||
|
||||
// mediumPool for event encoding and larger buffers
|
||||
mediumPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return bytes.NewBuffer(make([]byte, 0, MediumBufferSize))
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
// GetSmall returns a small buffer (64 bytes) from the pool.
|
||||
// Call PutSmall when done to return it to the pool.
|
||||
//
|
||||
// WARNING: Copy buf.Bytes() before calling PutSmall if the data
|
||||
// is needed after the buffer is returned to the pool.
|
||||
func GetSmall() *bytes.Buffer {
|
||||
return smallPool.Get().(*bytes.Buffer)
|
||||
}
|
||||
|
||||
// PutSmall returns a small buffer to the pool.
|
||||
// The buffer is reset before being returned.
|
||||
func PutSmall(buf *bytes.Buffer) {
|
||||
if buf == nil {
|
||||
return
|
||||
}
|
||||
buf.Reset()
|
||||
smallPool.Put(buf)
|
||||
}
|
||||
|
||||
// GetMedium returns a medium buffer (1KB) from the pool.
|
||||
// Call PutMedium when done to return it to the pool.
|
||||
//
|
||||
// WARNING: Copy buf.Bytes() before calling PutMedium if the data
|
||||
// is needed after the buffer is returned to the pool.
|
||||
func GetMedium() *bytes.Buffer {
|
||||
return mediumPool.Get().(*bytes.Buffer)
|
||||
}
|
||||
|
||||
// PutMedium returns a medium buffer to the pool.
|
||||
// The buffer is reset before being returned.
|
||||
func PutMedium(buf *bytes.Buffer) {
|
||||
if buf == nil {
|
||||
return
|
||||
}
|
||||
buf.Reset()
|
||||
mediumPool.Put(buf)
|
||||
}
|
||||
|
||||
// CopyBytes copies the buffer contents to a new slice.
|
||||
// Use this before returning the buffer to the pool if the
|
||||
// data needs to persist.
|
||||
func CopyBytes(buf *bytes.Buffer) []byte {
|
||||
if buf == nil || buf.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
result := make([]byte, buf.Len())
|
||||
copy(result, buf.Bytes())
|
||||
return result
|
||||
}
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"git.mleku.dev/mleku/nostr/encoders/tag"
|
||||
"git.mleku.dev/mleku/nostr/encoders/varint"
|
||||
"lol.mleku.dev/chk"
|
||||
"next.orly.dev/pkg/database/bufpool"
|
||||
)
|
||||
|
||||
// CompactEventFormat defines the binary format for compact event storage.
|
||||
@@ -72,7 +73,8 @@ type SerialResolver interface {
|
||||
// MarshalCompactEvent encodes an event using compact serial references.
|
||||
// The resolver is used to look up/create serial mappings for pubkeys and event IDs.
|
||||
func MarshalCompactEvent(ev *event.E, resolver SerialResolver) (data []byte, err error) {
|
||||
buf := new(bytes.Buffer)
|
||||
buf := bufpool.GetMedium()
|
||||
defer bufpool.PutMedium(buf)
|
||||
|
||||
// Version byte
|
||||
buf.WriteByte(CompactFormatVersion)
|
||||
@@ -109,7 +111,8 @@ func MarshalCompactEvent(ev *event.E, resolver SerialResolver) (data []byte, err
|
||||
// Signature (64 bytes)
|
||||
buf.Write(ev.Sig)
|
||||
|
||||
return buf.Bytes(), nil
|
||||
// Copy bytes before returning buffer to pool
|
||||
return bufpool.CopyBytes(buf), nil
|
||||
}
|
||||
|
||||
// encodeCompactTag encodes a single tag with serial references for e/p tags.
|
||||
@@ -221,8 +224,8 @@ func writeUint40(w io.Writer, value uint64) {
|
||||
|
||||
// readUint40 reads a 5-byte big-endian unsigned integer.
|
||||
func readUint40(r io.Reader) (value uint64, err error) {
|
||||
buf := make([]byte, 5)
|
||||
if _, err = io.ReadFull(r, buf); err != nil {
|
||||
var buf [5]byte // Fixed array avoids heap escape
|
||||
if _, err = io.ReadFull(r, buf[:]); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
value = (uint64(buf[0]) << 32) |
|
||||
@@ -331,9 +334,9 @@ func decodeCompactTag(r io.Reader, resolver SerialResolver) (t *tag.T, err error
|
||||
|
||||
// decodeTagElement decodes a single tag element from compact format.
|
||||
func decodeTagElement(r io.Reader, resolver SerialResolver) (elem []byte, err error) {
|
||||
// Read type flag
|
||||
typeBuf := make([]byte, 1)
|
||||
if _, err = io.ReadFull(r, typeBuf); err != nil {
|
||||
// Read type flag (fixed array avoids heap escape)
|
||||
var typeBuf [1]byte
|
||||
if _, err = io.ReadFull(r, typeBuf[:]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
typeFlag := typeBuf[0]
|
||||
|
||||
@@ -4,21 +4,22 @@ import (
|
||||
"bytes"
|
||||
|
||||
"lol.mleku.dev/chk"
|
||||
"next.orly.dev/pkg/database/bufpool"
|
||||
"next.orly.dev/pkg/database/indexes"
|
||||
. "next.orly.dev/pkg/database/indexes/types"
|
||||
"git.mleku.dev/mleku/nostr/encoders/event"
|
||||
)
|
||||
|
||||
// appendIndexBytes marshals an index to a byte slice and appends it to the idxs slice
|
||||
func appendIndexBytes(idxs *[][]byte, idx *indexes.T) (err error) {
|
||||
buf := new(bytes.Buffer)
|
||||
// appendIndexBytes marshals an index to a byte slice and appends it to the idxs slice.
|
||||
// It reuses the provided buffer (resetting it first) to avoid allocations.
|
||||
func appendIndexBytes(idxs *[][]byte, idx *indexes.T, buf *bytes.Buffer) (err error) {
|
||||
buf.Reset()
|
||||
// Marshal the index to the buffer
|
||||
if err = idx.MarshalWrite(buf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
// Copy the buffer's bytes to a new byte slice
|
||||
// Append the byte slice to the idxs slice
|
||||
*idxs = append(*idxs, buf.Bytes())
|
||||
// Copy the buffer's bytes to a new byte slice and append
|
||||
*idxs = append(*idxs, bufpool.CopyBytes(buf))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -28,6 +29,10 @@ func appendIndexBytes(idxs *[][]byte, idx *indexes.T) (err error) {
|
||||
func GetIndexesForEvent(ev *event.E, serial uint64) (
|
||||
idxs [][]byte, err error,
|
||||
) {
|
||||
// Get a reusable buffer for all index serializations
|
||||
buf := bufpool.GetSmall()
|
||||
defer bufpool.PutSmall(buf)
|
||||
|
||||
defer func() {
|
||||
if chk.E(err) {
|
||||
idxs = nil
|
||||
@@ -44,7 +49,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) (
|
||||
return
|
||||
}
|
||||
idIndex := indexes.IdEnc(idHash, ser)
|
||||
if err = appendIndexBytes(&idxs, idIndex); chk.E(err) {
|
||||
if err = appendIndexBytes(&idxs, idIndex, buf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
// FullIdPubkey index
|
||||
@@ -61,17 +66,17 @@ func GetIndexesForEvent(ev *event.E, serial uint64) (
|
||||
idPubkeyIndex := indexes.FullIdPubkeyEnc(
|
||||
ser, fullID, pubHash, createdAt,
|
||||
)
|
||||
if err = appendIndexBytes(&idxs, idPubkeyIndex); chk.E(err) {
|
||||
if err = appendIndexBytes(&idxs, idPubkeyIndex, buf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
// CreatedAt index
|
||||
createdAtIndex := indexes.CreatedAtEnc(createdAt, ser)
|
||||
if err = appendIndexBytes(&idxs, createdAtIndex); chk.E(err) {
|
||||
if err = appendIndexBytes(&idxs, createdAtIndex, buf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
// PubkeyCreatedAt index
|
||||
pubkeyIndex := indexes.PubkeyEnc(pubHash, createdAt, ser)
|
||||
if err = appendIndexBytes(&idxs, pubkeyIndex); chk.E(err) {
|
||||
if err = appendIndexBytes(&idxs, pubkeyIndex, buf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
// Process tags for tag-related indexes
|
||||
@@ -101,7 +106,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) (
|
||||
key, valueHash, pubHash, createdAt, ser,
|
||||
)
|
||||
if err = appendIndexBytes(
|
||||
&idxs, pubkeyTagIndex,
|
||||
&idxs, pubkeyTagIndex, buf,
|
||||
); chk.E(err) {
|
||||
return
|
||||
}
|
||||
@@ -110,7 +115,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) (
|
||||
key, valueHash, createdAt, ser,
|
||||
)
|
||||
if err = appendIndexBytes(
|
||||
&idxs, tagIndex,
|
||||
&idxs, tagIndex, buf,
|
||||
); chk.E(err) {
|
||||
return
|
||||
}
|
||||
@@ -122,7 +127,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) (
|
||||
key, valueHash, kind, createdAt, ser,
|
||||
)
|
||||
if err = appendIndexBytes(
|
||||
&idxs, kindTagIndex,
|
||||
&idxs, kindTagIndex, buf,
|
||||
); chk.E(err) {
|
||||
return
|
||||
}
|
||||
@@ -131,7 +136,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) (
|
||||
key, valueHash, kind, pubHash, createdAt, ser,
|
||||
)
|
||||
if err = appendIndexBytes(
|
||||
&idxs, kindPubkeyTagIndex,
|
||||
&idxs, kindPubkeyTagIndex, buf,
|
||||
); chk.E(err) {
|
||||
return
|
||||
}
|
||||
@@ -142,7 +147,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) (
|
||||
kind.Set(uint16(ev.Kind))
|
||||
// Kind index
|
||||
kindIndex := indexes.KindEnc(kind, createdAt, ser)
|
||||
if err = appendIndexBytes(&idxs, kindIndex); chk.E(err) {
|
||||
if err = appendIndexBytes(&idxs, kindIndex, buf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
// KindPubkey index
|
||||
@@ -150,7 +155,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) (
|
||||
kindPubkeyIndex := indexes.KindPubkeyEnc(
|
||||
kind, pubHash, createdAt, ser,
|
||||
)
|
||||
if err = appendIndexBytes(&idxs, kindPubkeyIndex); chk.E(err) {
|
||||
if err = appendIndexBytes(&idxs, kindPubkeyIndex, buf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -160,7 +165,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) (
|
||||
w := new(Word)
|
||||
w.FromWord(h) // 8-byte truncated hash
|
||||
wIdx := indexes.WordEnc(w, ser)
|
||||
if err = appendIndexBytes(&idxs, wIdx); chk.E(err) {
|
||||
if err = appendIndexBytes(&idxs, wIdx, buf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -176,7 +181,7 @@ func GetIndexesForEvent(ev *event.E, serial uint64) (
|
||||
w := new(Word)
|
||||
w.FromWord(h)
|
||||
wIdx := indexes.WordEnc(w, ser)
|
||||
if err = appendIndexBytes(&idxs, wIdx); chk.E(err) {
|
||||
if err = appendIndexBytes(&idxs, wIdx, buf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,8 +35,8 @@ func (p *Letter) MarshalWrite(w io.Writer) (err error) {
|
||||
}
|
||||
|
||||
func (p *Letter) UnmarshalRead(r io.Reader) (err error) {
|
||||
val := make([]byte, 1)
|
||||
if _, err = r.Read(val); chk.E(err) {
|
||||
var val [1]byte // Fixed array avoids heap escape
|
||||
if _, err = r.Read(val[:]); chk.E(err) {
|
||||
return
|
||||
}
|
||||
p.val = val[0]
|
||||
|
||||
@@ -46,23 +46,23 @@ func (c *Uint40) MarshalWrite(w io.Writer) (err error) {
|
||||
if c.value > MaxUint40 {
|
||||
return errors.New("value exceeds 40-bit range")
|
||||
}
|
||||
// Buffer for the 5 bytes
|
||||
buf := make([]byte, 5)
|
||||
// Fixed array avoids heap escape
|
||||
var buf [5]byte
|
||||
// Write the upper 5 bytes (ignoring the most significant 3 bytes of uint64)
|
||||
buf[0] = byte((c.value >> 32) & 0xFF) // Most significant byte
|
||||
buf[1] = byte((c.value >> 24) & 0xFF)
|
||||
buf[2] = byte((c.value >> 16) & 0xFF)
|
||||
buf[3] = byte((c.value >> 8) & 0xFF)
|
||||
buf[4] = byte(c.value & 0xFF) // Least significant byte
|
||||
_, err = w.Write(buf)
|
||||
_, err = w.Write(buf[:])
|
||||
return err
|
||||
}
|
||||
|
||||
// UnmarshalRead reads 5 bytes from the provided reader and decodes it into a 40-bit unsigned integer.
|
||||
func (c *Uint40) UnmarshalRead(r io.Reader) (err error) {
|
||||
// Buffer for the 5 bytes
|
||||
buf := make([]byte, 5)
|
||||
_, err = r.Read(buf)
|
||||
// Fixed array avoids heap escape
|
||||
var buf [5]byte
|
||||
_, err = r.Read(buf[:])
|
||||
if chk.E(err) {
|
||||
return err
|
||||
}
|
||||
@@ -81,8 +81,9 @@ type Uint40s []*Uint40
|
||||
// Union computes the union of the current Uint40s slice with another Uint40s slice. The result
|
||||
// contains all unique elements from both slices.
|
||||
func (s Uint40s) Union(other Uint40s) Uint40s {
|
||||
valueMap := make(map[uint64]bool)
|
||||
var result Uint40s
|
||||
totalCap := len(s) + len(other)
|
||||
valueMap := make(map[uint64]bool, totalCap)
|
||||
result := make(Uint40s, 0, totalCap) // Pre-allocate for worst case
|
||||
|
||||
// Add elements from the current Uint40s slice to the result
|
||||
for _, item := range s {
|
||||
@@ -108,8 +109,13 @@ func (s Uint40s) Union(other Uint40s) Uint40s {
|
||||
// Intersection computes the intersection of the current Uint40s slice with another Uint40s
|
||||
// slice. The result contains only the elements that exist in both slices.
|
||||
func (s Uint40s) Intersection(other Uint40s) Uint40s {
|
||||
valueMap := make(map[uint64]bool)
|
||||
var result Uint40s
|
||||
// Result can be at most the size of the smaller slice
|
||||
smallerLen := len(s)
|
||||
if len(other) < smallerLen {
|
||||
smallerLen = len(other)
|
||||
}
|
||||
valueMap := make(map[uint64]bool, len(other))
|
||||
result := make(Uint40s, 0, smallerLen) // Pre-allocate for worst case
|
||||
|
||||
// Add all elements from the other Uint40s slice to the map
|
||||
for _, item := range other {
|
||||
@@ -131,8 +137,8 @@ func (s Uint40s) Intersection(other Uint40s) Uint40s {
|
||||
// The result contains only the elements that are in the current slice but not in the other
|
||||
// slice.
|
||||
func (s Uint40s) Difference(other Uint40s) Uint40s {
|
||||
valueMap := make(map[uint64]bool)
|
||||
var result Uint40s
|
||||
valueMap := make(map[uint64]bool, len(other))
|
||||
result := make(Uint40s, 0, len(s)) // Pre-allocate for worst case (no overlap)
|
||||
|
||||
// Mark all elements in the other Uint40s slice
|
||||
for _, item := range other {
|
||||
|
||||
@@ -37,12 +37,12 @@ func (w *Word) MarshalWrite(wr io.Writer) (err error) {
|
||||
// UnmarshalRead reads the word from the reader, stopping at the zero-byte marker
|
||||
func (w *Word) UnmarshalRead(r io.Reader) error {
|
||||
buf := new(bytes.Buffer)
|
||||
tmp := make([]byte, 1)
|
||||
var tmp [1]byte // Fixed array avoids heap escape
|
||||
foundEndMarker := false
|
||||
|
||||
// Read bytes until the zero byte is encountered
|
||||
for {
|
||||
n, err := r.Read(tmp)
|
||||
n, err := r.Read(tmp[:])
|
||||
if n > 0 {
|
||||
if tmp[0] == 0x00 { // Stop on encountering the zero-byte marker
|
||||
foundEndMarker = true
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -12,6 +11,7 @@ import (
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/log"
|
||||
"next.orly.dev/pkg/database/bufpool"
|
||||
"next.orly.dev/pkg/database/indexes"
|
||||
"next.orly.dev/pkg/database/indexes/types"
|
||||
"next.orly.dev/pkg/mode"
|
||||
@@ -277,14 +277,15 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
|
||||
|
||||
// Calculate legacy size for comparison (for metrics tracking)
|
||||
// We marshal to get accurate size comparison
|
||||
legacyBuf := new(bytes.Buffer)
|
||||
legacyBuf := bufpool.GetMedium()
|
||||
defer bufpool.PutMedium(legacyBuf)
|
||||
ev.MarshalBinary(legacyBuf)
|
||||
legacySize := legacyBuf.Len()
|
||||
|
||||
if compactErr != nil {
|
||||
// Fall back to legacy format if compact encoding fails
|
||||
log.W.F("SaveEvent: compact encoding failed, using legacy format: %v", compactErr)
|
||||
compactData = legacyBuf.Bytes()
|
||||
compactData = bufpool.CopyBytes(legacyBuf)
|
||||
} else {
|
||||
// Track storage savings
|
||||
TrackCompactSaving(legacySize, len(compactData))
|
||||
@@ -322,13 +323,16 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
|
||||
// Format: cmp|serial|compact_event_data
|
||||
// This is the only storage format - legacy evt/sev/aev/rev prefixes
|
||||
// are handled by migration and no longer written for new events
|
||||
cmpKeyBuf := new(bytes.Buffer)
|
||||
cmpKeyBuf := bufpool.GetSmall()
|
||||
if err = indexes.CompactEventEnc(ser).MarshalWrite(cmpKeyBuf); chk.E(err) {
|
||||
bufpool.PutSmall(cmpKeyBuf)
|
||||
return
|
||||
}
|
||||
if err = txn.Set(cmpKeyBuf.Bytes(), compactData); chk.E(err) {
|
||||
if err = txn.Set(bufpool.CopyBytes(cmpKeyBuf), compactData); chk.E(err) {
|
||||
bufpool.PutSmall(cmpKeyBuf)
|
||||
return
|
||||
}
|
||||
bufpool.PutSmall(cmpKeyBuf)
|
||||
|
||||
// Create graph edges between event and all related pubkeys
|
||||
// This creates bidirectional edges: event->pubkey and pubkey->event
|
||||
@@ -336,6 +340,10 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
|
||||
eventKind := new(types.Uint16)
|
||||
eventKind.Set(ev.Kind)
|
||||
|
||||
// Reuse a single buffer for graph edge keys (reset between uses)
|
||||
graphKeyBuf := bufpool.GetSmall()
|
||||
defer bufpool.PutSmall(graphKeyBuf)
|
||||
|
||||
for _, pkInfo := range pubkeysForGraph {
|
||||
// Determine direction for forward edge (event -> pubkey perspective)
|
||||
directionForward := new(types.Letter)
|
||||
@@ -353,23 +361,20 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
|
||||
}
|
||||
|
||||
// Create event -> pubkey edge (with kind and direction)
|
||||
epgKeyBuf := new(bytes.Buffer)
|
||||
if err = indexes.EventPubkeyGraphEnc(ser, pkInfo.serial, eventKind, directionForward).MarshalWrite(epgKeyBuf); chk.E(err) {
|
||||
graphKeyBuf.Reset()
|
||||
if err = indexes.EventPubkeyGraphEnc(ser, pkInfo.serial, eventKind, directionForward).MarshalWrite(graphKeyBuf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
// Make a copy of the key bytes to avoid buffer reuse issues in txn
|
||||
epgKey := make([]byte, epgKeyBuf.Len())
|
||||
copy(epgKey, epgKeyBuf.Bytes())
|
||||
if err = txn.Set(epgKey, nil); chk.E(err) {
|
||||
if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
// Create pubkey -> event edge (reverse, with kind and direction for filtering)
|
||||
pegKeyBuf := new(bytes.Buffer)
|
||||
if err = indexes.PubkeyEventGraphEnc(pkInfo.serial, eventKind, directionReverse, ser).MarshalWrite(pegKeyBuf); chk.E(err) {
|
||||
graphKeyBuf.Reset()
|
||||
if err = indexes.PubkeyEventGraphEnc(pkInfo.serial, eventKind, directionReverse, ser).MarshalWrite(graphKeyBuf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if err = txn.Set(pegKeyBuf.Bytes(), nil); chk.E(err) {
|
||||
if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) {
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -397,25 +402,22 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
|
||||
// Create forward edge: source event -> target event (outbound e-tag)
|
||||
directionOut := new(types.Letter)
|
||||
directionOut.Set(types.EdgeDirectionETagOut)
|
||||
eegKeyBuf := new(bytes.Buffer)
|
||||
if err = indexes.EventEventGraphEnc(ser, targetSerial, eventKind, directionOut).MarshalWrite(eegKeyBuf); chk.E(err) {
|
||||
graphKeyBuf.Reset()
|
||||
if err = indexes.EventEventGraphEnc(ser, targetSerial, eventKind, directionOut).MarshalWrite(graphKeyBuf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
// Make a copy of the key bytes to avoid buffer reuse issues in txn
|
||||
eegKey := make([]byte, eegKeyBuf.Len())
|
||||
copy(eegKey, eegKeyBuf.Bytes())
|
||||
if err = txn.Set(eegKey, nil); chk.E(err) {
|
||||
if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
// Create reverse edge: target event -> source event (inbound e-tag)
|
||||
directionIn := new(types.Letter)
|
||||
directionIn.Set(types.EdgeDirectionETagIn)
|
||||
geeKeyBuf := new(bytes.Buffer)
|
||||
if err = indexes.GraphEventEventEnc(targetSerial, eventKind, directionIn, ser).MarshalWrite(geeKeyBuf); chk.E(err) {
|
||||
graphKeyBuf.Reset()
|
||||
if err = indexes.GraphEventEventEnc(targetSerial, eventKind, directionIn, ser).MarshalWrite(graphKeyBuf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if err = txn.Set(geeKeyBuf.Bytes(), nil); chk.E(err) {
|
||||
if err = txn.Set(bufpool.CopyBytes(graphKeyBuf), nil); chk.E(err) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,12 +3,12 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"lol.mleku.dev/chk"
|
||||
"next.orly.dev/pkg/database/bufpool"
|
||||
"next.orly.dev/pkg/database/indexes"
|
||||
"next.orly.dev/pkg/database/indexes/types"
|
||||
)
|
||||
@@ -281,7 +281,8 @@ func (r *DatabaseSerialResolver) GetEventIdBySerial(serial uint64) (eventId []by
|
||||
// GetEventIdBySerial looks up an event ID by its serial number.
|
||||
// Uses the SerialEventId index (sei prefix).
|
||||
func (d *D) GetEventIdBySerial(ser *types.Uint40) (eventId []byte, err error) {
|
||||
keyBuf := new(bytes.Buffer)
|
||||
keyBuf := bufpool.GetSmall()
|
||||
defer bufpool.PutSmall(keyBuf)
|
||||
if err = indexes.SerialEventIdEnc(ser).MarshalWrite(keyBuf); chk.E(err) {
|
||||
return nil, err
|
||||
}
|
||||
@@ -318,12 +319,13 @@ func (d *D) StoreEventIdSerial(txn *badger.Txn, serial uint64, eventId []byte) e
|
||||
return err
|
||||
}
|
||||
|
||||
keyBuf := new(bytes.Buffer)
|
||||
keyBuf := bufpool.GetSmall()
|
||||
defer bufpool.PutSmall(keyBuf)
|
||||
if err := indexes.SerialEventIdEnc(ser).MarshalWrite(keyBuf); chk.E(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
return txn.Set(keyBuf.Bytes(), eventId)
|
||||
return txn.Set(bufpool.CopyBytes(keyBuf), eventId)
|
||||
}
|
||||
|
||||
// SerialCacheStats holds statistics about the serial cache.
|
||||
|
||||
@@ -1 +1 @@
|
||||
v0.36.15
|
||||
v0.36.16
|
||||
|
||||
Reference in New Issue
Block a user