fixed bug with client message buffer causing overwriting of tag slices

This commit is contained in:
2025-07-27 11:46:04 +01:00
parent 978d9b88cd
commit 0d7943be89
12 changed files with 9 additions and 621 deletions

View File

@@ -148,6 +148,7 @@ func (s *Server) SpiderFetch(
// Process each event immediately
for i, ev := range evss {
// log.I.S(ev)
// Create a key based on pubkey and kind for deduplication
pkKindKey := string(ev.Pubkey) + string(ev.Kind.Marshal(nil))

View File

@@ -6,7 +6,6 @@ import (
"io"
"orly.dev/pkg/database/indexes"
"orly.dev/pkg/database/indexes/types"
"orly.dev/pkg/encoders/codecbuf"
"orly.dev/pkg/encoders/event"
"orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/context"
@@ -22,8 +21,7 @@ func (d *D) Export(c context.T, w io.Writer, pubkeys ...[]byte) {
if len(pubkeys) == 0 {
if err = d.View(
func(txn *badger.Txn) (err error) {
buf := codecbuf.Get()
defer codecbuf.Put(buf)
buf := new(bytes.Buffer)
if err = indexes.EventEnc(nil).MarshalWrite(buf); chk.E(err) {
return
}
@@ -61,8 +59,7 @@ func (d *D) Export(c context.T, w io.Writer, pubkeys ...[]byte) {
for _, pubkey := range pubkeys {
if err = d.View(
func(txn *badger.Txn) (err error) {
pkBuf := codecbuf.Get()
defer codecbuf.Put(pkBuf)
pkBuf := new(bytes.Buffer)
ph := &types.PubHash{}
if err = ph.FromPubkey(pubkey); chk.E(err) {
return

View File

@@ -5,7 +5,6 @@ import (
"github.com/dgraph-io/badger/v4"
"orly.dev/pkg/database/indexes"
"orly.dev/pkg/database/indexes/types"
"orly.dev/pkg/encoders/codecbuf"
"orly.dev/pkg/encoders/event"
"orly.dev/pkg/utils/chk"
)
@@ -13,8 +12,7 @@ import (
func (d *D) FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error) {
if err = d.View(
func(txn *badger.Txn) (err error) {
buf := codecbuf.Get()
defer codecbuf.Put(buf)
buf := new(bytes.Buffer)
if err = indexes.EventEnc(ser).MarshalWrite(buf); chk.E(err) {
return
}

View File

@@ -5,7 +5,6 @@ import (
"github.com/dgraph-io/badger/v4"
"orly.dev/pkg/database/indexes"
"orly.dev/pkg/database/indexes/types"
"orly.dev/pkg/encoders/codecbuf"
"orly.dev/pkg/interfaces/store"
"orly.dev/pkg/utils/chk"
)
@@ -15,8 +14,7 @@ func (d *D) GetFullIdPubkeyBySerial(ser *types.Uint40) (
) {
if err = d.View(
func(txn *badger.Txn) (err error) {
buf := codecbuf.Get()
defer codecbuf.Put(buf)
buf := new(bytes.Buffer)
if err = indexes.FullIdPubkeyEnc(
ser, nil, nil, nil,
).MarshalWrite(buf); chk.E(err) {

View File

@@ -3,7 +3,6 @@ package types
import (
"bytes"
"io"
"orly.dev/pkg/encoders/codecbuf"
"orly.dev/pkg/utils/chk"
)
@@ -29,7 +28,7 @@ func (ts *Timestamp) ToTimestamp() (timestamp int64) {
func (ts *Timestamp) Bytes() (b []byte, err error) {
v := new(Uint64)
v.Set(uint64(ts.val))
buf := codecbuf.Get()
buf := new(bytes.Buffer)
if err = v.MarshalWrite(buf); chk.E(err) {
return
}

View File

@@ -1,8 +1,8 @@
package types
import (
"bytes"
"io"
"orly.dev/pkg/encoders/codecbuf"
"orly.dev/pkg/utils/chk"
)
@@ -35,8 +35,7 @@ func (w *Word) MarshalWrite(wr io.Writer) (err error) {
// UnmarshalRead reads the word from the reader, stopping at the zero-byte marker
func (w *Word) UnmarshalRead(r io.Reader) error {
buf := codecbuf.Get()
defer codecbuf.Put(buf)
buf := new(bytes.Buffer)
tmp := make([]byte, 1)
foundEndMarker := false

View File

@@ -1,52 +0,0 @@
// Package bytesbuf provides a concurrent-safe []byte buffer pool for encoding
// data.
package bytesbuf
import (
"orly.dev/pkg/utils/units"
"sync"
)
// Pool is a concurrent-safe pool of []byte objects.
type Pool struct {
pool sync.Pool
}
// NewPool creates a new buffer pool.
func NewPool() *Pool {
return &Pool{
pool: sync.Pool{
New: func() interface{} {
return make([]byte, 0, units.Mb) // Initial capacity of 64 bytes
},
},
}
}
// Get returns a buffer from the pool or creates a new one if the pool is empty.
func (p *Pool) Get() []byte {
return p.pool.Get().([]byte)
}
// Put returns a buffer to the pool after zeroing its bytes for security and resetting it.
func (p *Pool) Put(buf []byte) {
// Zero out the bytes for security
for i := range buf {
buf[i] = 0
}
// Reset the slice length to 0 while preserving capacity
p.pool.Put(buf[:0])
}
// DefaultPool is the default buffer pool for the application.
var DefaultPool = NewPool()
// Get returns a buffer from the default pool.
func Get() []byte {
return DefaultPool.Get()
}
// Put returns a buffer to the default pool after zeroing its bytes for security.
func Put(buf []byte) {
DefaultPool.Put(buf)
}

View File

@@ -1,205 +0,0 @@
package bytesbuf
import (
"bytes"
"testing"
)
func TestPool(t *testing.T) {
// Create a new pool
pool := NewPool()
// Get a buffer from the pool
buf := pool.Get()
if buf == nil {
t.Fatal("Expected non-nil buffer from pool")
}
// Write some data to the buffer
testData := []byte("test data")
buf = append(buf, testData...)
// Verify the buffer contains the expected data
if !bytes.Equal(buf, testData) {
t.Fatalf(
"Expected buffer to contain %q, got %q", testData, buf,
)
}
// Put the buffer back in the pool
pool.Put(buf)
// Get another buffer from the pool (should be the same one, reset)
buf2 := pool.Get()
if buf2 == nil {
t.Fatal("Expected non-nil buffer from pool")
}
// Verify the buffer is empty (was reset)
if len(buf2) != 0 {
t.Fatalf("Expected empty buffer, got buffer with length %d", len(buf2))
}
// Write different data to the buffer
testData2 := []byte("different data")
buf2 = append(buf2, testData2...)
// Verify the buffer contains the new data
if !bytes.Equal(buf2, testData2) {
t.Fatalf(
"Expected buffer to contain %q, got %q", testData2, buf2,
)
}
}
func TestDefaultPool(t *testing.T) {
// Get a buffer from the default pool
buf := Get()
if buf == nil {
t.Fatal("Expected non-nil buffer from default pool")
}
// Write some data to the buffer
testData := []byte("test data for default pool")
buf = append(buf, testData...)
// Verify the buffer contains the expected data
if !bytes.Equal(buf, testData) {
t.Fatalf(
"Expected buffer to contain %q, got %q", testData, buf,
)
}
// Put the buffer back in the pool
Put(buf)
// Get another buffer from the pool (should be reset)
buf2 := Get()
if buf2 == nil {
t.Fatal("Expected non-nil buffer from default pool")
}
// Verify the buffer is empty (was reset)
if len(buf2) != 0 {
t.Fatalf("Expected empty buffer, got buffer with length %d", len(buf2))
}
}
func TestZeroBytes(t *testing.T) {
// Create a new pool
pool := NewPool()
// Get a buffer from the pool
buf := pool.Get()
if buf == nil {
t.Fatal("Expected non-nil buffer from pool")
}
// Write some sensitive data to the buffer
sensitiveData := []byte{0x01, 0x02, 0x03, 0x04, 0x05}
buf = append(buf, sensitiveData...)
// Get the capacity before putting it back
capacity := cap(buf)
// Put the buffer back in the pool
pool.Put(buf)
// Get another buffer from the pool (should be the same one, reset)
buf2 := pool.Get()
if buf2 == nil {
t.Fatal("Expected non-nil buffer from pool")
}
// Verify the buffer is empty (was reset)
if len(buf2) != 0 {
t.Fatalf("Expected empty buffer, got buffer with length %d", len(buf2))
}
// Verify the capacity is at least the same (should be the same buffer)
if cap(buf2) < capacity {
t.Fatalf("Expected capacity at least %d, got %d", capacity, cap(buf2))
}
// Grow the buffer to expose the underlying memory
buf2 = append(buf2, make([]byte, len(sensitiveData))...)
// Write some new data to the buffer to expose the underlying memory
newData := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
copy(buf2, newData)
// Verify that the sensitive data was zeroed out
// The new data should be there, but no trace of the old data
for i, b := range buf2[:len(newData)] {
if b != newData[i] {
t.Fatalf("Expected byte %d to be %d, got %d", i, newData[i], b)
}
}
}
func TestDefaultPoolZeroBytes(t *testing.T) {
// Get a buffer from the default pool
buf := Get()
if buf == nil {
t.Fatal("Expected non-nil buffer from default pool")
}
// Write some sensitive data to the buffer
sensitiveData := []byte{0x01, 0x02, 0x03, 0x04, 0x05}
buf = append(buf, sensitiveData...)
// Get the capacity before putting it back
capacity := cap(buf)
// Put the buffer back in the pool
Put(buf)
// Get another buffer from the pool (should be the same one, reset)
buf2 := Get()
if buf2 == nil {
t.Fatal("Expected non-nil buffer from default pool")
}
// Verify the buffer is empty (was reset)
if len(buf2) != 0 {
t.Fatalf("Expected empty buffer, got buffer with length %d", len(buf2))
}
// Verify the capacity is at least the same (should be the same buffer)
if cap(buf2) < capacity {
t.Fatalf("Expected capacity at least %d, got %d", capacity, cap(buf2))
}
// Grow the buffer to expose the underlying memory
buf2 = append(buf2, make([]byte, len(sensitiveData))...)
// Write some new data to the buffer to expose the underlying memory
newData := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
copy(buf2, newData)
// Verify that the sensitive data was zeroed out
// The new data should be there, but no trace of the old data
for i, b := range buf2[:len(newData)] {
if b != newData[i] {
t.Fatalf("Expected byte %d to be %d, got %d", i, newData[i], b)
}
}
}
func BenchmarkWithPool(b *testing.B) {
pool := NewPool()
b.ResetTimer()
for i := 0; i < b.N; i++ {
buf := pool.Get()
buf = append(buf, []byte("benchmark test data")...)
pool.Put(buf)
}
}
func BenchmarkWithoutPool(b *testing.B) {
for i := 0; i < b.N; i++ {
buf := make([]byte, 0, 64)
buf = append(buf, []byte("benchmark test data")...)
}
}

View File

@@ -1,59 +0,0 @@
# Codecbuf - Concurrent-Safe Bytes Buffer Pool
This package provides a concurrent-safe pool of `bytes.Buffer` objects for encoding data. It helps reduce memory allocations and improve performance by reusing buffers instead of creating new ones for each operation.
## Usage
### Basic Usage
```go
// Get a buffer from the default pool
buf := codecbuf.Get()
// Use the buffer
buf.WriteString("Hello, World!")
// ... do more operations with the buffer ...
// Return the buffer to the pool when done
codecbuf.Put(buf)
```
### Using with defer
```go
func ProcessData() {
// Get a buffer from the default pool
buf := codecbuf.Get()
// Return the buffer to the pool when the function exits
defer codecbuf.Put(buf)
// Use the buffer
buf.WriteString("Hello, World!")
// ... do more operations with the buffer ...
}
```
### Creating a Custom Pool
```go
// Create a new buffer pool
pool := codecbuf.NewPool()
// Get a buffer from the custom pool
buf := pool.Get()
// Use the buffer
buf.WriteString("Hello, World!")
// Return the buffer to the custom pool
pool.Put(buf)
```
## Performance
Using a buffer pool can significantly improve performance in applications that frequently create and use byte buffers, especially in high-throughput scenarios. The pool reduces garbage collection pressure by reusing buffers instead of allocating new ones.
## Thread Safety
The buffer pool is safe for concurrent use by multiple goroutines. However, individual buffers obtained from the pool should not be used concurrently by multiple goroutines without additional synchronization.

View File

@@ -1,53 +0,0 @@
// Package codecbuf provides a concurrent-safe bytes buffer pool for encoding
// data.
package codecbuf
import (
"bytes"
"sync"
)
// Pool is a concurrent-safe pool of bytes.Buffer objects.
type Pool struct {
pool sync.Pool
}
// NewPool creates a new buffer pool.
func NewPool() *Pool {
return &Pool{
pool: sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
},
}
}
// Get returns a buffer from the pool or creates a new one if the pool is empty.
func (p *Pool) Get() *bytes.Buffer {
return p.pool.Get().(*bytes.Buffer)
}
// Put returns a buffer to the pool after zeroing its bytes for security and resetting it.
func (p *Pool) Put(buf *bytes.Buffer) {
// Zero out the bytes for security
data := buf.Bytes()
for i := range data {
data[i] = 0
}
buf.Reset()
p.pool.Put(buf)
}
// DefaultPool is the default buffer pool for the application.
var DefaultPool = NewPool()
// Get returns a buffer from the default pool.
func Get() *bytes.Buffer {
return DefaultPool.Get()
}
// Put returns a buffer to the default pool after zeroing its bytes for security.
func Put(buf *bytes.Buffer) {
DefaultPool.Put(buf)
}

View File

@@ -1,234 +0,0 @@
package codecbuf
import (
"bytes"
"testing"
)
func TestPool(t *testing.T) {
// Create a new pool
pool := NewPool()
// Get a buffer from the pool
buf := pool.Get()
if buf == nil {
t.Fatal("Expected non-nil buffer from pool")
}
// Write some data to the buffer
testData := "test data"
_, err := buf.WriteString(testData)
if err != nil {
t.Fatalf("Failed to write to buffer: %v", err)
}
// Verify the buffer contains the expected data
if buf.String() != testData {
t.Fatalf(
"Expected buffer to contain %q, got %q", testData, buf.String(),
)
}
// Put the buffer back in the pool
pool.Put(buf)
// Get another buffer from the pool (should be the same one, reset)
buf2 := pool.Get()
if buf2 == nil {
t.Fatal("Expected non-nil buffer from pool")
}
// Verify the buffer is empty (was reset)
if buf2.Len() != 0 {
t.Fatalf("Expected empty buffer, got buffer with length %d", buf2.Len())
}
// Write different data to the buffer
testData2 := "different data"
_, err = buf2.WriteString(testData2)
if err != nil {
t.Fatalf("Failed to write to buffer: %v", err)
}
// Verify the buffer contains the new data
if buf2.String() != testData2 {
t.Fatalf(
"Expected buffer to contain %q, got %q", testData2, buf2.String(),
)
}
}
func TestDefaultPool(t *testing.T) {
// Get a buffer from the default pool
buf := Get()
if buf == nil {
t.Fatal("Expected non-nil buffer from default pool")
}
// Write some data to the buffer
testData := "test data for default pool"
_, err := buf.WriteString(testData)
if err != nil {
t.Fatalf("Failed to write to buffer: %v", err)
}
// Verify the buffer contains the expected data
if buf.String() != testData {
t.Fatalf(
"Expected buffer to contain %q, got %q", testData, buf.String(),
)
}
// Put the buffer back in the pool
Put(buf)
// Get another buffer from the pool (should be reset)
buf2 := Get()
if buf2 == nil {
t.Fatal("Expected non-nil buffer from default pool")
}
// Verify the buffer is empty (was reset)
if buf2.Len() != 0 {
t.Fatalf("Expected empty buffer, got buffer with length %d", buf2.Len())
}
}
func TestZeroBytes(t *testing.T) {
// Create a new pool
pool := NewPool()
// Get a buffer from the pool
buf := pool.Get()
if buf == nil {
t.Fatal("Expected non-nil buffer from pool")
}
// Write some sensitive data to the buffer
sensitiveData := []byte{0x01, 0x02, 0x03, 0x04, 0x05}
_, err := buf.Write(sensitiveData)
if err != nil {
t.Fatalf("Failed to write to buffer: %v", err)
}
// Get the capacity before putting it back
capacity := buf.Cap()
// Put the buffer back in the pool
pool.Put(buf)
// Get another buffer from the pool (should be the same one, reset)
buf2 := pool.Get()
if buf2 == nil {
t.Fatal("Expected non-nil buffer from pool")
}
// Verify the buffer is empty (was reset)
if buf2.Len() != 0 {
t.Fatalf("Expected empty buffer, got buffer with length %d", buf2.Len())
}
// Verify the capacity is the same (should be the same buffer)
if buf2.Cap() != capacity {
t.Fatalf("Expected capacity %d, got %d", capacity, buf2.Cap())
}
// Get the underlying bytes directly
// We need to grow the buffer to the same size as before to access the same memory
buf2.Grow(len(sensitiveData))
// Write some new data to the buffer to expose the underlying memory
newData := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
_, err = buf2.Write(newData)
if err != nil {
t.Fatalf("Failed to write to buffer: %v", err)
}
// Read the buffer bytes
bufBytes := buf2.Bytes()
// Verify that the sensitive data was zeroed out
// The new data should be there, but no trace of the old data
for i, b := range bufBytes[:len(newData)] {
if b != newData[i] {
t.Fatalf("Expected byte %d to be %d, got %d", i, newData[i], b)
}
}
}
func TestDefaultPoolZeroBytes(t *testing.T) {
// Get a buffer from the default pool
buf := Get()
if buf == nil {
t.Fatal("Expected non-nil buffer from default pool")
}
// Write some sensitive data to the buffer
sensitiveData := []byte{0x01, 0x02, 0x03, 0x04, 0x05}
_, err := buf.Write(sensitiveData)
if err != nil {
t.Fatalf("Failed to write to buffer: %v", err)
}
// Get the capacity before putting it back
capacity := buf.Cap()
// Put the buffer back in the pool
Put(buf)
// Get another buffer from the pool (should be the same one, reset)
buf2 := Get()
if buf2 == nil {
t.Fatal("Expected non-nil buffer from default pool")
}
// Verify the buffer is empty (was reset)
if buf2.Len() != 0 {
t.Fatalf("Expected empty buffer, got buffer with length %d", buf2.Len())
}
// Verify the capacity is the same (should be the same buffer)
if buf2.Cap() != capacity {
t.Fatalf("Expected capacity %d, got %d", capacity, buf2.Cap())
}
// Get the underlying bytes directly
// We need to grow the buffer to the same size as before to access the same memory
buf2.Grow(len(sensitiveData))
// Write some new data to the buffer to expose the underlying memory
newData := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
_, err = buf2.Write(newData)
if err != nil {
t.Fatalf("Failed to write to buffer: %v", err)
}
// Read the buffer bytes
bufBytes := buf2.Bytes()
// Verify that the sensitive data was zeroed out
// The new data should be there, but no trace of the old data
for i, b := range bufBytes[:len(newData)] {
if b != newData[i] {
t.Fatalf("Expected byte %d to be %d, got %d", i, newData[i], b)
}
}
}
func BenchmarkWithPool(b *testing.B) {
pool := NewPool()
b.ResetTimer()
for i := 0; i < b.N; i++ {
buf := pool.Get()
buf.WriteString("benchmark test data")
pool.Put(buf)
}
}
func BenchmarkWithoutPool(b *testing.B) {
for i := 0; i < b.N; i++ {
buf := new(bytes.Buffer)
buf.WriteString("benchmark test data")
}
}

View File

@@ -233,10 +233,9 @@ func (r *Client) ConnectWithTLS(ctx context.T, tlsConfig *tls.Config) error {
}()
// general message reader loop
go func() {
buf := new(bytes.Buffer)
var err error
for {
buf.Reset()
buf := new(bytes.Buffer)
if err = conn.ReadMessage(r.connectionContext, buf); err != nil {
r.ConnectionError = err
r.Close()