next.orly.dev/cmd/benchmark/event_stream.go

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"math"
	"math/rand"
	"os"
	"path/filepath"
	"time"

	"next.orly.dev/pkg/encoders/event"
	"next.orly.dev/pkg/encoders/tag"
	"next.orly.dev/pkg/encoders/timestamp"
	"next.orly.dev/pkg/interfaces/signer/p8k"
)

// EventStream manages disk-based event generation to avoid memory bloat
type EventStream struct {
	baseDir   string
	count     int
	chunkSize int
	rng       *rand.Rand
}

// NewEventStream creates a new event stream that stores events on disk
func NewEventStream(baseDir string, count int) (*EventStream, error) {
	// Create the events directory
	eventsDir := filepath.Join(baseDir, "events")
	if err := os.MkdirAll(eventsDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create events directory: %w", err)
	}
	return &EventStream{
		baseDir:   eventsDir,
		count:     count,
		chunkSize: 1000, // Store 1000 events per file to balance I/O
		rng:       rand.New(rand.NewSource(time.Now().UnixNano())),
	}, nil
}

// Generate creates all events and stores them in chunk files
func (es *EventStream) Generate() error {
	numChunks := (es.count + es.chunkSize - 1) / es.chunkSize
	for chunk := 0; chunk < numChunks; chunk++ {
		chunkFile := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", chunk))
		f, err := os.Create(chunkFile)
		if err != nil {
			return fmt.Errorf("failed to create chunk file %s: %w", chunkFile, err)
		}
		writer := bufio.NewWriter(f)
		startIdx := chunk * es.chunkSize
		endIdx := min(startIdx+es.chunkSize, es.count)
		for i := startIdx; i < endIdx; i++ {
			ev, err := es.generateEvent(i)
			if err != nil {
				f.Close()
				return fmt.Errorf("failed to generate event %d: %w", i, err)
			}
			// Marshal event to JSON
			eventJSON, err := json.Marshal(ev)
			if err != nil {
				f.Close()
				return fmt.Errorf("failed to marshal event %d: %w", i, err)
			}
			// Write JSON line
			if _, err := writer.Write(eventJSON); err != nil {
				f.Close()
				return fmt.Errorf("failed to write event %d: %w", i, err)
			}
			if _, err := writer.WriteString("\n"); err != nil {
				f.Close()
				return fmt.Errorf("failed to write newline after event %d: %w", i, err)
			}
		}
		if err := writer.Flush(); err != nil {
			f.Close()
			return fmt.Errorf("failed to flush chunk file %s: %w", chunkFile, err)
		}
		if err := f.Close(); err != nil {
			return fmt.Errorf("failed to close chunk file %s: %w", chunkFile, err)
		}
		if (chunk+1)%10 == 0 || chunk == numChunks-1 {
			fmt.Printf(" Generated %d/%d events (%.1f%%)\n",
				endIdx, es.count, float64(endIdx)/float64(es.count)*100)
		}
	}
	return nil
}

// generateEvent creates a single event with realistic size distribution
func (es *EventStream) generateEvent(index int) (*event.E, error) {
	// Create a signer for this event
	keys, err := p8k.New()
	if err != nil {
		return nil, fmt.Errorf("failed to create signer: %w", err)
	}
	if err := keys.Generate(); err != nil {
		return nil, fmt.Errorf("failed to generate keys: %w", err)
	}
	ev := event.New()
	ev.Kind = 1 // Text note
	ev.CreatedAt = timestamp.Now().I64()
	// Add some tags for realism
	numTags := es.rng.Intn(5)
	tags := make([]*tag.T, 0, numTags)
	for i := 0; i < numTags; i++ {
		tags = append(tags, tag.NewFromBytesSlice(
			[]byte("t"),
			[]byte(fmt.Sprintf("tag%d", es.rng.Intn(100))),
		))
	}
	ev.Tags = tag.NewS(tags...)
	// Generate content with log-distributed size
	contentSize := es.generateLogDistributedSize()
	ev.Content = []byte(es.generateRandomContent(contentSize))
	// Sign the event
	if err := ev.Sign(keys); err != nil {
		return nil, fmt.Errorf("failed to sign event: %w", err)
	}
	return ev, nil
}

// generateLogDistributedSize generates sizes following a power law distribution:
// a uniform sample is raised to the 4th power and scaled to 100 KiB, which
// skews sizes heavily toward the small end. The resulting CDF is
// P(size <= s) = (s / 100 KiB)^(1/4), so roughly a third of events fall under
// 1 KiB, the median lands near 6 KiB, and the tail extends up to 100 KiB.
func (es *EventStream) generateLogDistributedSize() int {
	// Use power law with exponent 4.0 for strong skew toward small sizes
	const powerExponent = 4.0
	uniform := es.rng.Float64()
	skewed := math.Pow(uniform, powerExponent)
	// Scale to max size of 100KB
	const maxSize = 100 * 1024
	size := int(skewed * maxSize)
	// Ensure minimum size of 10 bytes
	if size < 10 {
		size = 10
	}
	return size
}
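
// The following is an illustrative sketch, not part of the benchmark: it
// empirically checks the size distribution described above. The sample count
// and the two thresholds (1 KiB and the analytic median of ~6.25 KiB) are
// arbitrary choices for demonstration.
func (es *EventStream) checkSizeDistribution(n int) {
	var underOneKiB, underMedian int
	for i := 0; i < n; i++ {
		size := es.generateLogDistributedSize()
		if size < 1024 {
			underOneKiB++
		}
		if size < 6400 { // analytic median of uniform^4 * 100 KiB
			underMedian++
		}
	}
	fmt.Printf("under 1 KiB: %.1f%%, under 6.25 KiB: %.1f%%\n",
		float64(underOneKiB)/float64(n)*100,
		float64(underMedian)/float64(n)*100)
}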

// generateRandomContent creates random text content of the specified size
func (es *EventStream) generateRandomContent(size int) string {
	const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 \n"
	content := make([]byte, size)
	for i := range content {
		content[i] = charset[es.rng.Intn(len(charset))]
	}
	return string(content)
}

// GetEventChannel returns a channel that streams events from disk.
// bufferSize controls memory usage - larger buffers improve throughput but use more memory
func (es *EventStream) GetEventChannel(bufferSize int) (<-chan *event.E, <-chan error) {
	eventChan := make(chan *event.E, bufferSize)
	errChan := make(chan error, 1)
	go func() {
		defer close(eventChan)
		defer close(errChan)
		numChunks := (es.count + es.chunkSize - 1) / es.chunkSize
		for chunk := 0; chunk < numChunks; chunk++ {
			chunkFile := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", chunk))
			f, err := os.Open(chunkFile)
			if err != nil {
				errChan <- fmt.Errorf("failed to open chunk file %s: %w", chunkFile, err)
				return
			}
			scanner := bufio.NewScanner(f)
			// Increase buffer size for large events
			buf := make([]byte, 0, 64*1024)
			scanner.Buffer(buf, 1024*1024) // Max 1MB per line
			for scanner.Scan() {
				var ev event.E
				if err := json.Unmarshal(scanner.Bytes(), &ev); err != nil {
					f.Close()
					errChan <- fmt.Errorf("failed to unmarshal event: %w", err)
					return
				}
				eventChan <- &ev
			}
			if err := scanner.Err(); err != nil {
				f.Close()
				errChan <- fmt.Errorf("error reading chunk file %s: %w", chunkFile, err)
				return
			}
			f.Close()
		}
	}()
	return eventChan, errChan
}
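
// The following is an illustrative sketch, not part of the benchmark: one way
// a caller might consume GetEventChannel. The buffer size of 100 is an
// arbitrary choice. Because the producer goroutine closes both channels when
// it exits, draining eventChan and then receiving once from errChan picks up
// either the buffered error or nil.
func consumeStream(es *EventStream) error {
	eventChan, errChan := es.GetEventChannel(100)
	var processed int
	for ev := range eventChan {
		_ = ev // process the event here
		processed++
	}
	if err := <-errChan; err != nil {
		return err
	}
	fmt.Printf("processed %d events\n", processed)
	return nil
}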

// ForEach iterates over all events without loading them all into memory
func (es *EventStream) ForEach(fn func(*event.E) error) error {
	numChunks := (es.count + es.chunkSize - 1) / es.chunkSize
	for chunk := 0; chunk < numChunks; chunk++ {
		chunkFile := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", chunk))
		f, err := os.Open(chunkFile)
		if err != nil {
			return fmt.Errorf("failed to open chunk file %s: %w", chunkFile, err)
		}
		scanner := bufio.NewScanner(f)
		buf := make([]byte, 0, 64*1024)
		scanner.Buffer(buf, 1024*1024)
		for scanner.Scan() {
			var ev event.E
			if err := json.Unmarshal(scanner.Bytes(), &ev); err != nil {
				f.Close()
				return fmt.Errorf("failed to unmarshal event: %w", err)
			}
			if err := fn(&ev); err != nil {
				f.Close()
				return err
			}
		}
		if err := scanner.Err(); err != nil {
			f.Close()
			return fmt.Errorf("error reading chunk file %s: %w", chunkFile, err)
		}
		f.Close()
	}
	return nil
}
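
// The following is an illustrative sketch, not part of the benchmark: a
// minimal end-to-end use of EventStream. The temporary directory and the
// event count of 5000 are arbitrary choices. Generate writes the chunk files,
// then ForEach replays every event without holding the full set in memory.
func exampleRun() error {
	es, err := NewEventStream(os.TempDir(), 5000)
	if err != nil {
		return err
	}
	if err := es.Generate(); err != nil {
		return err
	}
	var total int
	if err := es.ForEach(func(ev *event.E) error {
		total++
		return nil
	}); err != nil {
		return err
	}
	fmt.Printf("replayed %d events\n", total)
	return nil
}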