complete the marshal/unmarshal of events using the new pool enabled tag codecs

This commit is contained in:
2025-08-22 14:29:55 +01:00
parent 8add32bb78
commit bf178eae4e
25 changed files with 1547 additions and 44 deletions

View File

@@ -0,0 +1,78 @@
package bufpool
import (
"fmt"
"sync"
"unsafe"
"lol.mleku.dev/log"
"next.orly.dev/pkg/utils/units"
)
const (
// BufferSize is the size of each buffer in the pool (1kb)
BufferSize = units.Kb / 2
)
type B []byte
func (b B) ToBytes() []byte { return b }
var Pool = sync.Pool{
New: func() interface{} {
// Create a new buffer when the pool is empty
b := make([]byte, 0, BufferSize)
log.T.C(
func() string {
ptr := unsafe.SliceData(b)
return fmt.Sprintf("creating buffer at: %p", ptr)
},
)
return B(b)
},
}
// Get returns a buffer from the pool or creates a new one if the pool is empty.
//
// Example usage:
//
// buf := bufpool.Get()
// defer bufpool.Put(buf)
// // Use buf...
func Get() B {
b := Pool.Get().(B)
log.T.C(
func() string {
ptr := unsafe.SliceData(b)
return fmt.Sprintf("getting buffer at: %p", ptr)
},
)
return b
}
// Put returns a buffer to the pool.
// Buffers should be returned to the pool when no longer needed to allow reuse.
func Put(b B) {
for i := range b {
(b)[i] = 0
}
b = b[:0]
log.T.C(
func() string {
ptr := unsafe.SliceData(b)
return fmt.Sprintf("returning to buffer: %p", ptr)
},
)
Pool.Put(b)
}
// PutBytes returns a buffer was not necessarily created by Get().
func PutBytes(b []byte) {
log.T.C(
func() string {
ptr := unsafe.SliceData(b)
return fmt.Sprintf("returning bytes to buffer: %p", ptr)
},
)
Put(b)
}

View File

@@ -0,0 +1,71 @@
package bufpool
import (
"testing"
)
func TestBufferPoolGetPut(t *testing.T) {
// Get a buffer from the pool
buf1 := Get()
// Verify the buffer is the correct size
if len(*buf1) != BufferSize {
t.Errorf("Expected buffer size of %d, got %d", BufferSize, len(*buf1))
}
// Write some data to the buffer
(*buf1)[0] = 42
// Return the buffer to the pool
Put(buf1)
// Get another buffer, which should be the same one we just returned
buf2 := Get()
// Buffer may or may not be cleared, but we should be able to use it
// Let's check if we have the expected buffer size
if len(*buf2) != BufferSize {
t.Errorf("Expected buffer size of %d, got %d", BufferSize, len(*buf2))
}
}
func TestMultipleBuffers(t *testing.T) {
// Get multiple buffers at once to ensure the pool can handle it
const numBuffers = 10
buffers := make([]B, numBuffers)
// Get buffers from the pool
for i := 0; i < numBuffers; i++ {
buffers[i] = Get()
// Verify each buffer is the correct size
if len(*buffers[i]) != BufferSize {
t.Errorf(
"Buffer %d: Expected size of %d, got %d", i, BufferSize,
len(*buffers[i]),
)
}
}
// Return all buffers to the pool
for i := 0; i < numBuffers; i++ {
Put(buffers[i])
}
}
func BenchmarkGetPut(b *testing.B) {
for i := 0; i < b.N; i++ {
buf := Get()
Put(buf)
}
}
func BenchmarkGetPutParallel(b *testing.B) {
b.RunParallel(
func(pb *testing.PB) {
for pb.Next() {
buf := Get()
Put(buf)
}
},
)
}