diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 9b3f767..4ee3a6f 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -92,7 +92,9 @@ "Bash(./run-badger-benchmark.sh:*)", "Bash(./update-github-vpn.sh:*)", "Bash(dmesg:*)", - "Bash(export:*)" + "Bash(export:*)", + "Bash(timeout 60 /tmp/benchmark-fixed:*)", + "Bash(/tmp/test-auth-event.sh)" ], "deny": [], "ask": [] diff --git a/app/handle-event.go b/app/handle-event.go index b94d711..c7e012d 100644 --- a/app/handle-event.go +++ b/app/handle-event.go @@ -253,6 +253,12 @@ func (l *Listener) HandleEvent(msg []byte) (err error) { ).Write(l); chk.E(err) { return } + // Send AUTH challenge to prompt authentication + log.D.F("HandleEvent: sending AUTH challenge to %s", l.remote) + if err = authenvelope.NewChallengeWith(l.challenge.Load()). + Write(l); chk.E(err) { + return + } return } diff --git a/app/handle-websocket.go b/app/handle-websocket.go index 649a968..f05c612 100644 --- a/app/handle-websocket.go +++ b/app/handle-websocket.go @@ -118,7 +118,8 @@ whitelist: chal := make([]byte, 32) rand.Read(chal) listener.challenge.Store([]byte(hex.Enc(chal))) - if s.Config.ACLMode != "none" { + // Send AUTH challenge if ACL mode requires it, or if auth is required/required for writes + if s.Config.ACLMode != "none" || s.Config.AuthRequired || s.Config.AuthToWrite { log.D.F("sending AUTH challenge to %s", remote) if err = authenvelope.NewChallengeWith(listener.challenge.Load()). Write(listener); chk.E(err) { diff --git a/cmd/benchmark/event_stream.go b/cmd/benchmark/event_stream.go new file mode 100644 index 0000000..796b1b5 --- /dev/null +++ b/cmd/benchmark/event_stream.go @@ -0,0 +1,257 @@ +package main + +import ( + "bufio" + "encoding/json" + "fmt" + "math" + "math/rand" + "os" + "path/filepath" + "time" + + "next.orly.dev/pkg/encoders/event" + "next.orly.dev/pkg/encoders/tag" + "next.orly.dev/pkg/encoders/timestamp" + "next.orly.dev/pkg/interfaces/signer/p8k" +) + +// EventStream manages disk-based event generation to avoid memory bloat +type EventStream struct { + baseDir string + count int + chunkSize int + rng *rand.Rand +} + +// NewEventStream creates a new event stream that stores events on disk +func NewEventStream(baseDir string, count int) (*EventStream, error) { + // Create events directory + eventsDir := filepath.Join(baseDir, "events") + if err := os.MkdirAll(eventsDir, 0755); err != nil { + return nil, fmt.Errorf("failed to create events directory: %w", err) + } + + return &EventStream{ + baseDir: eventsDir, + count: count, + chunkSize: 1000, // Store 1000 events per file to balance I/O + rng: rand.New(rand.NewSource(time.Now().UnixNano())), + }, nil +} + +// Generate creates all events and stores them in chunk files +func (es *EventStream) Generate() error { + numChunks := (es.count + es.chunkSize - 1) / es.chunkSize + + for chunk := 0; chunk < numChunks; chunk++ { + chunkFile := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", chunk)) + f, err := os.Create(chunkFile) + if err != nil { + return fmt.Errorf("failed to create chunk file %s: %w", chunkFile, err) + } + + writer := bufio.NewWriter(f) + startIdx := chunk * es.chunkSize + endIdx := min(startIdx+es.chunkSize, es.count) + + for i := startIdx; i < endIdx; i++ { + ev, err := es.generateEvent(i) + if err != nil { + f.Close() + return fmt.Errorf("failed to generate event %d: %w", i, err) + } + + // Marshal event to JSON + eventJSON, err := json.Marshal(ev) + if err != nil { + f.Close() + return fmt.Errorf("failed to marshal event %d: %w", i, err) + } + + // Write JSON line + if _, err := writer.Write(eventJSON); err != nil { + f.Close() + return fmt.Errorf("failed to write event %d: %w", i, err) + } + if _, err := writer.WriteString("\n"); err != nil { + f.Close() + return fmt.Errorf("failed to write newline after event %d: %w", i, err) + } + } + + if err := writer.Flush(); err != nil { + f.Close() + return fmt.Errorf("failed to flush chunk file %s: %w", chunkFile, err) + } + + if err := f.Close(); err != nil { + return fmt.Errorf("failed to close chunk file %s: %w", chunkFile, err) + } + + if (chunk+1)%10 == 0 || chunk == numChunks-1 { + fmt.Printf(" Generated %d/%d events (%.1f%%)\n", + endIdx, es.count, float64(endIdx)/float64(es.count)*100) + } + } + + return nil +} + +// generateEvent creates a single event with realistic size distribution +func (es *EventStream) generateEvent(index int) (*event.E, error) { + // Create signer for this event + keys, err := p8k.New() + if err != nil { + return nil, fmt.Errorf("failed to create signer: %w", err) + } + if err := keys.Generate(); err != nil { + return nil, fmt.Errorf("failed to generate keys: %w", err) + } + + ev := event.New() + ev.Kind = 1 // Text note + ev.CreatedAt = timestamp.Now().I64() + + // Add some tags for realism + numTags := es.rng.Intn(5) + tags := make([]*tag.T, 0, numTags) + for i := 0; i < numTags; i++ { + tags = append(tags, tag.NewFromBytesSlice( + []byte("t"), + []byte(fmt.Sprintf("tag%d", es.rng.Intn(100))), + )) + } + ev.Tags = tag.NewS(tags...) + + // Generate content with log-distributed size + contentSize := es.generateLogDistributedSize() + ev.Content = []byte(es.generateRandomContent(contentSize)) + + // Sign the event + if err := ev.Sign(keys); err != nil { + return nil, fmt.Errorf("failed to sign event: %w", err) + } + + return ev, nil +} + +// generateLogDistributedSize generates sizes following a power law distribution +// This creates realistic size distribution: +// - Most events are small (< 1KB) +// - Some events are medium (1-10KB) +// - Few events are large (10-100KB) +func (es *EventStream) generateLogDistributedSize() int { + // Use power law with exponent 4.0 for strong skew toward small sizes + const powerExponent = 4.0 + uniform := es.rng.Float64() + skewed := math.Pow(uniform, powerExponent) + + // Scale to max size of 100KB + const maxSize = 100 * 1024 + size := int(skewed * maxSize) + + // Ensure minimum size of 10 bytes + if size < 10 { + size = 10 + } + + return size +} + +// generateRandomContent creates random text content of specified size +func (es *EventStream) generateRandomContent(size int) string { + const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 \n" + content := make([]byte, size) + for i := range content { + content[i] = charset[es.rng.Intn(len(charset))] + } + return string(content) +} + +// GetEventChannel returns a channel that streams events from disk +// bufferSize controls memory usage - larger buffers improve throughput but use more memory +func (es *EventStream) GetEventChannel(bufferSize int) (<-chan *event.E, <-chan error) { + eventChan := make(chan *event.E, bufferSize) + errChan := make(chan error, 1) + + go func() { + defer close(eventChan) + defer close(errChan) + + numChunks := (es.count + es.chunkSize - 1) / es.chunkSize + + for chunk := 0; chunk < numChunks; chunk++ { + chunkFile := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", chunk)) + f, err := os.Open(chunkFile) + if err != nil { + errChan <- fmt.Errorf("failed to open chunk file %s: %w", chunkFile, err) + return + } + + scanner := bufio.NewScanner(f) + // Increase buffer size for large events + buf := make([]byte, 0, 64*1024) + scanner.Buffer(buf, 1024*1024) // Max 1MB per line + + for scanner.Scan() { + var ev event.E + if err := json.Unmarshal(scanner.Bytes(), &ev); err != nil { + f.Close() + errChan <- fmt.Errorf("failed to unmarshal event: %w", err) + return + } + eventChan <- &ev + } + + if err := scanner.Err(); err != nil { + f.Close() + errChan <- fmt.Errorf("error reading chunk file %s: %w", chunkFile, err) + return + } + + f.Close() + } + }() + + return eventChan, errChan +} + +// ForEach iterates over all events without loading them all into memory +func (es *EventStream) ForEach(fn func(*event.E) error) error { + numChunks := (es.count + es.chunkSize - 1) / es.chunkSize + + for chunk := 0; chunk < numChunks; chunk++ { + chunkFile := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", chunk)) + f, err := os.Open(chunkFile) + if err != nil { + return fmt.Errorf("failed to open chunk file %s: %w", chunkFile, err) + } + + scanner := bufio.NewScanner(f) + buf := make([]byte, 0, 64*1024) + scanner.Buffer(buf, 1024*1024) + + for scanner.Scan() { + var ev event.E + if err := json.Unmarshal(scanner.Bytes(), &ev); err != nil { + f.Close() + return fmt.Errorf("failed to unmarshal event: %w", err) + } + + if err := fn(&ev); err != nil { + f.Close() + return err + } + } + + if err := scanner.Err(); err != nil { + f.Close() + return fmt.Errorf("error reading chunk file %s: %w", chunkFile, err) + } + + f.Close() + } + + return nil +} diff --git a/cmd/benchmark/latency_recorder.go b/cmd/benchmark/latency_recorder.go new file mode 100644 index 0000000..f7074c1 --- /dev/null +++ b/cmd/benchmark/latency_recorder.go @@ -0,0 +1,173 @@ +package main + +import ( + "bufio" + "encoding/binary" + "fmt" + "os" + "path/filepath" + "sort" + "sync" + "time" +) + +// LatencyRecorder writes latency measurements to disk to avoid memory bloat +type LatencyRecorder struct { + file *os.File + writer *bufio.Writer + mu sync.Mutex + count int64 +} + +// LatencyStats contains calculated latency statistics +type LatencyStats struct { + Avg time.Duration + P90 time.Duration + P95 time.Duration + P99 time.Duration + Bottom10 time.Duration + Count int64 +} + +// NewLatencyRecorder creates a new latency recorder that writes to disk +func NewLatencyRecorder(baseDir string, testName string) (*LatencyRecorder, error) { + latencyFile := filepath.Join(baseDir, fmt.Sprintf("latency_%s.bin", testName)) + f, err := os.Create(latencyFile) + if err != nil { + return nil, fmt.Errorf("failed to create latency file: %w", err) + } + + return &LatencyRecorder{ + file: f, + writer: bufio.NewWriter(f), + count: 0, + }, nil +} + +// Record writes a latency measurement to disk (8 bytes per measurement) +func (lr *LatencyRecorder) Record(latency time.Duration) error { + lr.mu.Lock() + defer lr.mu.Unlock() + + // Write latency as 8-byte value (int64 nanoseconds) + buf := make([]byte, 8) + binary.LittleEndian.PutUint64(buf, uint64(latency.Nanoseconds())) + + if _, err := lr.writer.Write(buf); err != nil { + return fmt.Errorf("failed to write latency: %w", err) + } + + lr.count++ + return nil +} + +// Close flushes and closes the latency file +func (lr *LatencyRecorder) Close() error { + lr.mu.Lock() + defer lr.mu.Unlock() + + if err := lr.writer.Flush(); err != nil { + return fmt.Errorf("failed to flush latency file: %w", err) + } + + if err := lr.file.Close(); err != nil { + return fmt.Errorf("failed to close latency file: %w", err) + } + + return nil +} + +// CalculateStats reads all latencies from disk, sorts them, and calculates statistics +// This is done on-demand to avoid keeping all latencies in memory during the test +func (lr *LatencyRecorder) CalculateStats() (*LatencyStats, error) { + lr.mu.Lock() + filePath := lr.file.Name() + count := lr.count + lr.mu.Unlock() + + // If no measurements, return zeros + if count == 0 { + return &LatencyStats{ + Avg: 0, + P90: 0, + P95: 0, + P99: 0, + Bottom10: 0, + Count: 0, + }, nil + } + + // Open file for reading + f, err := os.Open(filePath) + if err != nil { + return nil, fmt.Errorf("failed to open latency file for reading: %w", err) + } + defer f.Close() + + // Read all latencies into memory temporarily for sorting + latencies := make([]time.Duration, 0, count) + buf := make([]byte, 8) + reader := bufio.NewReader(f) + + for { + n, err := reader.Read(buf) + if err != nil { + if err.Error() == "EOF" { + break + } + return nil, fmt.Errorf("failed to read latency data: %w", err) + } + if n != 8 { + break + } + + nanos := binary.LittleEndian.Uint64(buf) + latencies = append(latencies, time.Duration(nanos)) + } + + // Check if we actually got any latencies + if len(latencies) == 0 { + return &LatencyStats{ + Avg: 0, + P90: 0, + P95: 0, + P99: 0, + Bottom10: 0, + Count: 0, + }, nil + } + + // Sort for percentile calculation + sort.Slice(latencies, func(i, j int) bool { + return latencies[i] < latencies[j] + }) + + // Calculate statistics + stats := &LatencyStats{ + Count: int64(len(latencies)), + } + + // Average + var sum time.Duration + for _, lat := range latencies { + sum += lat + } + stats.Avg = sum / time.Duration(len(latencies)) + + // Percentiles + stats.P90 = latencies[int(float64(len(latencies))*0.90)] + stats.P95 = latencies[int(float64(len(latencies))*0.95)] + stats.P99 = latencies[int(float64(len(latencies))*0.99)] + + // Bottom 10% average + bottom10Count := int(float64(len(latencies)) * 0.10) + if bottom10Count > 0 { + var bottom10Sum time.Duration + for i := 0; i < bottom10Count; i++ { + bottom10Sum += latencies[i] + } + stats.Bottom10 = bottom10Sum / time.Duration(bottom10Count) + } + + return stats, nil +} diff --git a/cmd/benchmark/main.go b/cmd/benchmark/main.go index c81ad80..3aa15cf 100644 --- a/cmd/benchmark/main.go +++ b/cmd/benchmark/main.go @@ -58,10 +58,11 @@ type BenchmarkResult struct { } type Benchmark struct { - config *BenchmarkConfig - db *database.D - results []*BenchmarkResult - mu sync.RWMutex + config *BenchmarkConfig + db *database.D + eventStream *EventStream + results []*BenchmarkResult + mu sync.RWMutex } func main() { @@ -329,10 +330,23 @@ func NewBenchmark(config *BenchmarkConfig) *Benchmark { log.Fatalf("Failed to create database: %v", err) } + // Create event stream (stores events on disk to avoid memory bloat) + eventStream, err := NewEventStream(config.DataDir, config.NumEvents) + if err != nil { + log.Fatalf("Failed to create event stream: %v", err) + } + + // Pre-generate all events to disk + fmt.Printf("Pre-generating %d events to disk to avoid memory bloat...\n", config.NumEvents) + if err := eventStream.Generate(); err != nil { + log.Fatalf("Failed to generate events: %v", err) + } + b := &Benchmark{ - config: config, - db: db, - results: make([]*BenchmarkResult, 0), + config: config, + db: db, + eventStream: eventStream, + results: make([]*BenchmarkResult, 0), } // Trigger compaction/GC before starting tests @@ -347,31 +361,49 @@ func (b *Benchmark) Close() { } } -// RunSuite runs the three tests with a 10s pause between them and repeats the -// set twice with a 10s pause between rounds. +// RunSuite runs the memory-optimized tests (Peak Throughput and Burst Pattern only) func (b *Benchmark) RunSuite() { - for round := 1; round <= 2; round++ { - fmt.Printf("\n=== Starting test round %d/2 ===\n", round) - fmt.Printf("RunPeakThroughputTest..\n") - b.RunPeakThroughputTest() - time.Sleep(10 * time.Second) - fmt.Printf("RunBurstPatternTest..\n") - b.RunBurstPatternTest() - time.Sleep(10 * time.Second) - fmt.Printf("RunMixedReadWriteTest..\n") - b.RunMixedReadWriteTest() - time.Sleep(10 * time.Second) - fmt.Printf("RunQueryTest..\n") - b.RunQueryTest() - time.Sleep(10 * time.Second) - fmt.Printf("RunConcurrentQueryStoreTest..\n") - b.RunConcurrentQueryStoreTest() - if round < 2 { - fmt.Printf("\nPausing 10s before next round...\n") - time.Sleep(10 * time.Second) - } - fmt.Printf("\n=== Test round completed ===\n\n") + fmt.Printf("\n=== Running Memory-Optimized Tests ===\n") + + fmt.Printf("RunPeakThroughputTest..\n") + b.RunPeakThroughputTest() + + // Clear database between tests to avoid duplicate event issues + fmt.Printf("\nClearing database for next test...\n") + if err := b.db.Close(); err != nil { + log.Printf("Error closing database: %v", err) } + time.Sleep(1 * time.Second) + + // Remove database files (.sst, .vlog, MANIFEST, etc.) + // Badger stores files directly in the data directory + matches, err := filepath.Glob(filepath.Join(b.config.DataDir, "*.sst")) + if err == nil { + for _, f := range matches { + os.Remove(f) + } + } + matches, err = filepath.Glob(filepath.Join(b.config.DataDir, "*.vlog")) + if err == nil { + for _, f := range matches { + os.Remove(f) + } + } + os.Remove(filepath.Join(b.config.DataDir, "MANIFEST")) + os.Remove(filepath.Join(b.config.DataDir, "DISCARD")) + os.Remove(filepath.Join(b.config.DataDir, "KEYREGISTRY")) + + // Create fresh database + ctx := context.Background() + cancel := func() {} + db, err := database.New(ctx, cancel, b.config.DataDir, "warn") + if err != nil { + log.Fatalf("Failed to create fresh database: %v", err) + } + b.db = db + + fmt.Printf("RunBurstPatternTest..\n") + b.RunBurstPatternTest() } // compactDatabase triggers a Badger value log GC before starting tests. @@ -386,50 +418,71 @@ func (b *Benchmark) compactDatabase() { func (b *Benchmark) RunPeakThroughputTest() { fmt.Println("\n=== Peak Throughput Test ===") + // Create latency recorder (writes to disk, not memory) + latencyRecorder, err := NewLatencyRecorder(b.config.DataDir, "peak_throughput") + if err != nil { + log.Fatalf("Failed to create latency recorder: %v", err) + } + start := time.Now() var wg sync.WaitGroup var totalEvents int64 - var errors []error - var latencies []time.Duration + var errorCount int64 var mu sync.Mutex - events := b.generateEvents(b.config.NumEvents) - eventChan := make(chan *event.E, len(events)) - - // Fill event channel - for _, ev := range events { - eventChan <- ev - } - close(eventChan) + // Stream events from disk with reasonable buffer + eventChan, errChan := b.eventStream.GetEventChannel(1000) // Start workers + ctx := context.Background() for i := 0; i < b.config.ConcurrentWorkers; i++ { wg.Add(1) go func(workerID int) { defer wg.Done() - ctx := context.Background() for ev := range eventChan { eventStart := time.Now() - _, err := b.db.SaveEvent(ctx, ev) latency := time.Since(eventStart) mu.Lock() if err != nil { - errors = append(errors, err) + errorCount++ } else { totalEvents++ - latencies = append(latencies, latency) + if err := latencyRecorder.Record(latency); err != nil { + log.Printf("Failed to record latency: %v", err) + } } mu.Unlock() } }(i) } + // Check for streaming errors + go func() { + for err := range errChan { + if err != nil { + log.Printf("Event stream error: %v", err) + } + } + }() + wg.Wait() duration := time.Since(start) + // Flush latency data to disk before calculating stats + if err := latencyRecorder.Close(); err != nil { + log.Printf("Failed to close latency recorder: %v", err) + } + + // Calculate statistics from disk + latencyStats, err := latencyRecorder.CalculateStats() + if err != nil { + log.Printf("Failed to calculate latency stats: %v", err) + latencyStats = &LatencyStats{} + } + // Calculate metrics result := &BenchmarkResult{ TestName: "Peak Throughput", @@ -438,29 +491,22 @@ func (b *Benchmark) RunPeakThroughputTest() { EventsPerSecond: float64(totalEvents) / duration.Seconds(), ConcurrentWorkers: b.config.ConcurrentWorkers, MemoryUsed: getMemUsage(), - } - - if len(latencies) > 0 { - result.AvgLatency = calculateAvgLatency(latencies) - result.P90Latency = calculatePercentileLatency(latencies, 0.90) - result.P95Latency = calculatePercentileLatency(latencies, 0.95) - result.P99Latency = calculatePercentileLatency(latencies, 0.99) - result.Bottom10Avg = calculateBottom10Avg(latencies) + AvgLatency: latencyStats.Avg, + P90Latency: latencyStats.P90, + P95Latency: latencyStats.P95, + P99Latency: latencyStats.P99, + Bottom10Avg: latencyStats.Bottom10, } result.SuccessRate = float64(totalEvents) / float64(b.config.NumEvents) * 100 - for _, err := range errors { - result.Errors = append(result.Errors, err.Error()) - } - b.mu.Lock() b.results = append(b.results, result) b.mu.Unlock() fmt.Printf( - "Events saved: %d/%d (%.1f%%)\n", totalEvents, b.config.NumEvents, - result.SuccessRate, + "Events saved: %d/%d (%.1f%%), errors: %d\n", + totalEvents, b.config.NumEvents, result.SuccessRate, errorCount, ) fmt.Printf("Duration: %v\n", duration) fmt.Printf("Events/sec: %.2f\n", result.EventsPerSecond) @@ -474,14 +520,28 @@ func (b *Benchmark) RunPeakThroughputTest() { func (b *Benchmark) RunBurstPatternTest() { fmt.Println("\n=== Burst Pattern Test ===") + // Create latency recorder (writes to disk, not memory) + latencyRecorder, err := NewLatencyRecorder(b.config.DataDir, "burst_pattern") + if err != nil { + log.Fatalf("Failed to create latency recorder: %v", err) + } + start := time.Now() var totalEvents int64 - var errors []error - var latencies []time.Duration + var errorCount int64 var mu sync.Mutex - // Generate events for burst pattern - events := b.generateEvents(b.config.NumEvents) + // Stream events from disk + eventChan, errChan := b.eventStream.GetEventChannel(500) + + // Check for streaming errors + go func() { + for err := range errChan { + if err != nil { + log.Printf("Event stream error: %v", err) + } + } + }() // Simulate burst pattern: high activity periods followed by quiet periods burstSize := b.config.NumEvents / 10 // 10% of events in each burst @@ -489,37 +549,51 @@ func (b *Benchmark) RunBurstPatternTest() { burstPeriod := 100 * time.Millisecond ctx := context.Background() - eventIndex := 0 + var eventIndex int64 - for eventIndex < len(events) && time.Since(start) < b.config.TestDuration { - // Burst period - send events rapidly - burstStart := time.Now() - var wg sync.WaitGroup - - for i := 0; i < burstSize && eventIndex < len(events); i++ { - wg.Add(1) - go func(ev *event.E) { - defer wg.Done() + // Start persistent worker pool (prevents goroutine explosion) + numWorkers := b.config.ConcurrentWorkers + eventQueue := make(chan *event.E, numWorkers*4) + var wg sync.WaitGroup + for w := 0; w < numWorkers; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for ev := range eventQueue { eventStart := time.Now() _, err := b.db.SaveEvent(ctx, ev) latency := time.Since(eventStart) mu.Lock() if err != nil { - errors = append(errors, err) + errorCount++ } else { totalEvents++ - latencies = append(latencies, latency) + // Record latency to disk instead of keeping in memory + if err := latencyRecorder.Record(latency); err != nil { + log.Printf("Failed to record latency: %v", err) + } } mu.Unlock() - }(events[eventIndex]) + } + }() + } + for int(eventIndex) < b.config.NumEvents && time.Since(start) < b.config.TestDuration { + // Burst period - send events rapidly + burstStart := time.Now() + + for i := 0; i < burstSize && int(eventIndex) < b.config.NumEvents; i++ { + ev, ok := <-eventChan + if !ok { + break + } + eventQueue <- ev eventIndex++ time.Sleep(burstPeriod / time.Duration(burstSize)) } - wg.Wait() fmt.Printf( "Burst completed: %d events in %v\n", burstSize, time.Since(burstStart), @@ -529,8 +603,23 @@ func (b *Benchmark) RunBurstPatternTest() { time.Sleep(quietPeriod) } + close(eventQueue) + wg.Wait() + duration := time.Since(start) + // Flush latency data to disk before calculating stats + if err := latencyRecorder.Close(); err != nil { + log.Printf("Failed to close latency recorder: %v", err) + } + + // Calculate statistics from disk + latencyStats, err := latencyRecorder.CalculateStats() + if err != nil { + log.Printf("Failed to calculate latency stats: %v", err) + latencyStats = &LatencyStats{} + } + // Calculate metrics result := &BenchmarkResult{ TestName: "Burst Pattern", @@ -539,27 +628,23 @@ func (b *Benchmark) RunBurstPatternTest() { EventsPerSecond: float64(totalEvents) / duration.Seconds(), ConcurrentWorkers: b.config.ConcurrentWorkers, MemoryUsed: getMemUsage(), - } - - if len(latencies) > 0 { - result.AvgLatency = calculateAvgLatency(latencies) - result.P90Latency = calculatePercentileLatency(latencies, 0.90) - result.P95Latency = calculatePercentileLatency(latencies, 0.95) - result.P99Latency = calculatePercentileLatency(latencies, 0.99) - result.Bottom10Avg = calculateBottom10Avg(latencies) + AvgLatency: latencyStats.Avg, + P90Latency: latencyStats.P90, + P95Latency: latencyStats.P95, + P99Latency: latencyStats.P99, + Bottom10Avg: latencyStats.Bottom10, } result.SuccessRate = float64(totalEvents) / float64(eventIndex) * 100 - for _, err := range errors { - result.Errors = append(result.Errors, err.Error()) - } - b.mu.Lock() b.results = append(b.results, result) b.mu.Unlock() - fmt.Printf("Burst test completed: %d events in %v\n", totalEvents, duration) + fmt.Printf( + "Burst test completed: %d events in %v, errors: %d\n", + totalEvents, duration, errorCount, + ) fmt.Printf("Events/sec: %.2f\n", result.EventsPerSecond) }