Delete outdated benchmark reports and results.
Removed old benchmark reports and detailed logs from the repository to clean up unnecessary files. These reports appear to be auto-generated and no longer relevant for ongoing development.
@@ -14,6 +14,7 @@ import (
     "sync"
     "time"
 
+    lol "lol.mleku.dev"
     "next.orly.dev/pkg/crypto/p256k"
     "next.orly.dev/pkg/database"
     "next.orly.dev/pkg/encoders/envelopes/eventenvelope"
@@ -63,6 +64,7 @@ type Benchmark struct {
 }
 
 func main() {
+    lol.SetLogLevel("trace")
     config := parseFlags()
 
     if config.RelayURL != "" {
@@ -96,7 +98,7 @@ func parseFlags() *BenchmarkConfig {
         &config.DataDir, "datadir", "/tmp/benchmark_db", "Database directory",
     )
     flag.IntVar(
-        &config.NumEvents, "events", 100000, "Number of events to generate",
+        &config.NumEvents, "events", 10000, "Number of events to generate",
     )
     flag.IntVar(
         &config.ConcurrentWorkers, "workers", runtime.NumCPU(),
@@ -133,8 +135,14 @@ func runNetworkLoad(cfg *BenchmarkConfig) {
         "Network mode: relay=%s workers=%d rate=%d ev/s per worker duration=%s\n",
         cfg.RelayURL, cfg.NetWorkers, cfg.NetRate, cfg.TestDuration,
     )
-    ctx, cancel := context.WithTimeout(context.Background(), cfg.TestDuration)
+    // Create a timeout context for benchmark control only, not for connections
+    timeoutCtx, cancel := context.WithTimeout(context.Background(), cfg.TestDuration)
     defer cancel()
+
+    // Use a separate background context for relay connections to avoid
+    // cancelling the server when the benchmark timeout expires
+    connCtx := context.Background()
+
     var wg sync.WaitGroup
     if cfg.NetWorkers <= 0 {
         cfg.NetWorkers = 1
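The hunk above splits one context into two: timeoutCtx only bounds how long the benchmark runs, while connCtx stays alive so relay connections are not torn down when the timer fires. A minimal standalone sketch of that pattern, with illustrative names (openConn, fakeConn) that are not part of the benchmark code:

package main

import (
    "context"
    "fmt"
    "time"
)

type fakeConn struct{}

func openConn(ctx context.Context) *fakeConn { return &fakeConn{} } // stands in for ws.RelayConnect
func (c *fakeConn) send(s string)            {}
func (c *fakeConn) close()                   {}

func main() {
    // Bounds the benchmark window only.
    timeoutCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    // Long-lived context for connections; never cancelled by the timer.
    connCtx := context.Background()

    conn := openConn(connCtx)
    defer conn.close()

    for {
        select {
        case <-timeoutCtx.Done():
            fmt.Println("benchmark window elapsed:", timeoutCtx.Err())
            return
        default:
            conn.send("event")
            time.Sleep(100 * time.Millisecond)
        }
    }
}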
@@ -146,8 +154,8 @@ func runNetworkLoad(cfg *BenchmarkConfig) {
         wg.Add(1)
         go func(workerID int) {
             defer wg.Done()
-            // Connect to relay
-            rl, err := ws.RelayConnect(ctx, cfg.RelayURL)
+            // Connect to relay using non-cancellable context
+            rl, err := ws.RelayConnect(connCtx, cfg.RelayURL)
             if err != nil {
                 fmt.Printf(
                     "worker %d: failed to connect to %s: %v\n", workerID,
@@ -174,17 +182,28 @@ func runNetworkLoad(cfg *BenchmarkConfig) {
                 f.Authors = tag.NewWithCap(1)
                 f.Authors.T = append(f.Authors.T, keys.Pub())
                 f.Since = timestamp.FromUnix(since)
-                sub, err := rl.Subscribe(ctx, filter.NewS(f))
+                sub, err := rl.Subscribe(connCtx, filter.NewS(f))
                 if err != nil {
-                    fmt.Printf("worker %d: subscribe error: %v\n", workerID, err)
+                    fmt.Printf(
+                        "worker %d: subscribe error: %v\n", workerID, err,
+                    )
                     return
                 }
                 defer sub.Unsub()
                 recv := 0
                 for {
                     select {
-                    case <-ctx.Done():
-                        fmt.Printf("worker %d: subscriber exiting after %d events\n", workerID, recv)
+                    case <-timeoutCtx.Done():
+                        fmt.Printf(
+                            "worker %d: subscriber exiting after %d events (benchmark timeout: %v)\n",
+                            workerID, recv, timeoutCtx.Err(),
+                        )
                         return
+                    case <-rl.Context().Done():
+                        fmt.Printf(
+                            "worker %d: relay connection closed; cause=%v lastErr=%v\n",
+                            workerID, rl.ConnectionCause(), rl.LastError(),
+                        )
+                        return
                     case <-sub.EndOfStoredEvents:
                         // continue streaming live events
@@ -194,7 +213,10 @@ func runNetworkLoad(cfg *BenchmarkConfig) {
                         }
                         recv++
                         if recv%100 == 0 {
-                            fmt.Printf("worker %d: received %d matching events\n", workerID, recv)
+                            fmt.Printf(
+                                "worker %d: received %d matching events\n",
+                                workerID, recv,
+                            )
                         }
                         ev.Free()
                     }
@@ -207,7 +229,7 @@ func runNetworkLoad(cfg *BenchmarkConfig) {
             count := 0
             for {
                 select {
-                case <-ctx.Done():
+                case <-timeoutCtx.Done():
                     fmt.Printf(
                         "worker %d: stopping after %d publishes\n", workerID,
                         count,
@@ -233,12 +255,16 @@ func runNetworkLoad(cfg *BenchmarkConfig) {
                 select {
                 case err := <-ch:
                     if err != nil {
-                        fmt.Printf("worker %d: write error: %v\n", workerID, err)
+                        fmt.Printf(
+                            "worker %d: write error: %v\n", workerID, err,
+                        )
                     }
                 default:
                 }
                 if count%100 == 0 {
-                    fmt.Printf("worker %d: sent %d events\n", workerID, count)
+                    fmt.Printf(
+                        "worker %d: sent %d events\n", workerID, count,
+                    )
                 }
                 ev.Free()
                 count++
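The empty default case in the select above is a non-blocking receive: the publisher drains an error from the result channel if one is already waiting and otherwise keeps sending. A standalone sketch of the pattern (the channel and messages here are illustrative, not from the benchmark):

package main

import "fmt"

func main() {
    results := make(chan error, 1)

    // Pretend an earlier publish reported a failure asynchronously.
    results <- fmt.Errorf("write failed")

    for i := 0; i < 3; i++ {
        select {
        case err := <-results:
            // A result was already waiting; handle it without blocking.
            fmt.Println("pending error:", err)
        default:
            // Nothing pending; continue publishing immediately.
            fmt.Println("no pending result")
        }
    }
}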
@@ -284,15 +310,25 @@ func (b *Benchmark) Close() {
 func (b *Benchmark) RunSuite() {
     for round := 1; round <= 2; round++ {
         fmt.Printf("\n=== Starting test round %d/2 ===\n", round)
+        fmt.Printf("RunPeakThroughputTest..\n")
         b.RunPeakThroughputTest()
         time.Sleep(10 * time.Second)
+        fmt.Printf("RunBurstPatternTest..\n")
         b.RunBurstPatternTest()
         time.Sleep(10 * time.Second)
+        fmt.Printf("RunMixedReadWriteTest..\n")
         b.RunMixedReadWriteTest()
+        time.Sleep(10 * time.Second)
+        fmt.Printf("RunQueryTest..\n")
+        b.RunQueryTest()
+        time.Sleep(10 * time.Second)
+        fmt.Printf("RunConcurrentQueryStoreTest..\n")
+        b.RunConcurrentQueryStoreTest()
         if round < 2 {
             fmt.Println("\nPausing 10s before next round...")
             time.Sleep(10 * time.Second)
         }
         fmt.Println("\n=== Test round completed ===\n")
     }
 }
@@ -595,6 +631,330 @@ func (b *Benchmark) RunMixedReadWriteTest() {
     fmt.Printf("Combined ops/sec: %.2f\n", result.EventsPerSecond)
 }
 
+// RunQueryTest specifically benchmarks the QueryEvents function performance
+func (b *Benchmark) RunQueryTest() {
+    fmt.Println("\n=== Query Test ===")
+
+    start := time.Now()
+    var totalQueries int64
+    var queryLatencies []time.Duration
+    var errors []error
+    var mu sync.Mutex
+
+    // Pre-populate with events for querying
+    numSeedEvents := 10000
+    seedEvents := b.generateEvents(numSeedEvents)
+    ctx := context.Background()
+
+    fmt.Printf(
+        "Pre-populating database with %d events for query tests...\n",
+        numSeedEvents,
+    )
+    for _, ev := range seedEvents {
+        b.db.SaveEvent(ctx, ev)
+    }
+
+    // Create different types of filters for querying
+    filters := []*filter.F{
+        func() *filter.F { // Kind filter
+            f := filter.New()
+            f.Kinds = kind.NewS(kind.TextNote)
+            limit := uint(100)
+            f.Limit = &limit
+            return f
+        }(),
+        func() *filter.F { // Tag filter
+            f := filter.New()
+            f.Tags = tag.NewS(
+                tag.NewFromBytesSlice(
+                    []byte("t"), []byte("benchmark"),
+                ),
+            )
+            limit := uint(100)
+            f.Limit = &limit
+            return f
+        }(),
+        func() *filter.F { // Mixed filter
+            f := filter.New()
+            f.Kinds = kind.NewS(kind.TextNote)
+            f.Tags = tag.NewS(
+                tag.NewFromBytesSlice(
+                    []byte("t"), []byte("benchmark"),
+                ),
+            )
+            limit := uint(50)
+            f.Limit = &limit
+            return f
+        }(),
+    }
+
+    var wg sync.WaitGroup
+    // Start query workers
+    for i := 0; i < b.config.ConcurrentWorkers; i++ {
+        wg.Add(1)
+        go func(workerID int) {
+            defer wg.Done()
+
+            filterIndex := workerID % len(filters)
+            queryCount := 0
+
+            for time.Since(start) < b.config.TestDuration {
+                // Rotate through different filters
+                f := filters[filterIndex]
+                filterIndex = (filterIndex + 1) % len(filters)
+
+                // Execute query
+                queryStart := time.Now()
+                events, err := b.db.QueryEvents(ctx, f)
+                queryLatency := time.Since(queryStart)
+
+                mu.Lock()
+                if err != nil {
+                    errors = append(errors, err)
+                } else {
+                    totalQueries++
+                    queryLatencies = append(queryLatencies, queryLatency)
+
+                    // Free event memory
+                    for _, ev := range events {
+                        ev.Free()
+                    }
+                }
+                mu.Unlock()
+
+                queryCount++
+                if queryCount%10 == 0 {
+                    time.Sleep(10 * time.Millisecond) // Small delay every 10 queries
+                }
+            }
+        }(i)
+    }
+
+    wg.Wait()
+    duration := time.Since(start)
+
+    // Calculate metrics
+    result := &BenchmarkResult{
+        TestName:          "Query Performance",
+        Duration:          duration,
+        TotalEvents:       int(totalQueries),
+        EventsPerSecond:   float64(totalQueries) / duration.Seconds(),
+        ConcurrentWorkers: b.config.ConcurrentWorkers,
+        MemoryUsed:        getMemUsage(),
+    }
+
+    if len(queryLatencies) > 0 {
+        result.AvgLatency = calculateAvgLatency(queryLatencies)
+        result.P90Latency = calculatePercentileLatency(queryLatencies, 0.90)
+        result.P95Latency = calculatePercentileLatency(queryLatencies, 0.95)
+        result.P99Latency = calculatePercentileLatency(queryLatencies, 0.99)
+        result.Bottom10Avg = calculateBottom10Avg(queryLatencies)
+    }
+
+    result.SuccessRate = 100.0 // No specific target count for queries
+
+    for _, err := range errors {
+        result.Errors = append(result.Errors, err.Error())
+    }
+
+    b.mu.Lock()
+    b.results = append(b.results, result)
+    b.mu.Unlock()
+
+    fmt.Printf(
+        "Query test completed: %d queries in %v\n", totalQueries, duration,
+    )
+    fmt.Printf("Queries/sec: %.2f\n", result.EventsPerSecond)
+    fmt.Printf("Avg query latency: %v\n", result.AvgLatency)
+    fmt.Printf("P95 query latency: %v\n", result.P95Latency)
+    fmt.Printf("P99 query latency: %v\n", result.P99Latency)
+}
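The latency helpers used above (calculateAvgLatency, calculatePercentileLatency, calculateBottom10Avg) are defined elsewhere in the file and are not shown in this diff. A plausible sketch of what they compute, assuming percentiles are read from a sorted copy of the samples and that Bottom10Avg averages the slowest 10%:

package bench

import (
    "sort"
    "time"
)

func calculateAvgLatency(samples []time.Duration) time.Duration {
    if len(samples) == 0 {
        return 0
    }
    var sum time.Duration
    for _, d := range samples {
        sum += d
    }
    return sum / time.Duration(len(samples))
}

func calculatePercentileLatency(samples []time.Duration, p float64) time.Duration {
    if len(samples) == 0 {
        return 0
    }
    sorted := append([]time.Duration(nil), samples...)
    sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
    return sorted[int(float64(len(sorted)-1)*p)]
}

func calculateBottom10Avg(samples []time.Duration) time.Duration {
    if len(samples) == 0 {
        return 0
    }
    sorted := append([]time.Duration(nil), samples...)
    sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
    n := len(sorted) / 10
    if n == 0 {
        n = 1
    }
    return calculateAvgLatency(sorted[len(sorted)-n:])
}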
+
+// RunConcurrentQueryStoreTest benchmarks the performance of concurrent query and store operations
+func (b *Benchmark) RunConcurrentQueryStoreTest() {
+    fmt.Println("\n=== Concurrent Query/Store Test ===")
+
+    start := time.Now()
+    var totalQueries, totalWrites int64
+    var queryLatencies, writeLatencies []time.Duration
+    var errors []error
+    var mu sync.Mutex
+
+    // Pre-populate with some events
+    numSeedEvents := 5000
+    seedEvents := b.generateEvents(numSeedEvents)
+    ctx := context.Background()
+
+    fmt.Printf(
+        "Pre-populating database with %d events for concurrent query/store test...\n",
+        numSeedEvents,
+    )
+    for _, ev := range seedEvents {
+        b.db.SaveEvent(ctx, ev)
+    }
+
+    // Generate events for writing during the test
+    writeEvents := b.generateEvents(b.config.NumEvents)
+
+    // Create filters for querying
+    filters := []*filter.F{
+        func() *filter.F { // Recent events filter
+            f := filter.New()
+            f.Since = timestamp.FromUnix(time.Now().Add(-10 * time.Minute).Unix())
+            limit := uint(100)
+            f.Limit = &limit
+            return f
+        }(),
+        func() *filter.F { // Kind and tag filter
+            f := filter.New()
+            f.Kinds = kind.NewS(kind.TextNote)
+            f.Tags = tag.NewS(
+                tag.NewFromBytesSlice(
+                    []byte("t"), []byte("benchmark"),
+                ),
+            )
+            limit := uint(50)
+            f.Limit = &limit
+            return f
+        }(),
+    }
+
+    var wg sync.WaitGroup
+
+    // Half of the workers will be readers, half will be writers
+    numReaders := b.config.ConcurrentWorkers / 2
+    numWriters := b.config.ConcurrentWorkers - numReaders
+
+    // Start query workers (readers)
+    for i := 0; i < numReaders; i++ {
+        wg.Add(1)
+        go func(workerID int) {
+            defer wg.Done()
+
+            filterIndex := workerID % len(filters)
+            queryCount := 0
+
+            for time.Since(start) < b.config.TestDuration {
+                // Select a filter
+                f := filters[filterIndex]
+                filterIndex = (filterIndex + 1) % len(filters)
+
+                // Execute query
+                queryStart := time.Now()
+                events, err := b.db.QueryEvents(ctx, f)
+                queryLatency := time.Since(queryStart)
+
+                mu.Lock()
+                if err != nil {
+                    errors = append(errors, err)
+                } else {
+                    totalQueries++
+                    queryLatencies = append(queryLatencies, queryLatency)
+
+                    // Free event memory
+                    for _, ev := range events {
+                        ev.Free()
+                    }
+                }
+                mu.Unlock()
+
+                queryCount++
+                if queryCount%5 == 0 {
+                    time.Sleep(5 * time.Millisecond) // Small delay
+                }
+            }
+        }(i)
+    }
+
+    // Start write workers
+    for i := 0; i < numWriters; i++ {
+        wg.Add(1)
+        go func(workerID int) {
+            defer wg.Done()
+
+            eventIndex := workerID
+            writeCount := 0
+
+            for time.Since(start) < b.config.TestDuration && eventIndex < len(writeEvents) {
+                // Write operation
+                writeStart := time.Now()
+                _, _, err := b.db.SaveEvent(ctx, writeEvents[eventIndex])
+                writeLatency := time.Since(writeStart)
+
+                mu.Lock()
+                if err != nil {
+                    errors = append(errors, err)
+                } else {
+                    totalWrites++
+                    writeLatencies = append(writeLatencies, writeLatency)
+                }
+                mu.Unlock()
+
+                eventIndex += numWriters
+                writeCount++
+
+                if writeCount%10 == 0 {
+                    time.Sleep(10 * time.Millisecond) // Small delay every 10 writes
+                }
+            }
+        }(i)
+    }
+
+    wg.Wait()
+    duration := time.Since(start)
+
+    // Calculate metrics
+    totalOps := totalQueries + totalWrites
+    result := &BenchmarkResult{
+        TestName:          "Concurrent Query/Store",
+        Duration:          duration,
+        TotalEvents:       int(totalOps),
+        EventsPerSecond:   float64(totalOps) / duration.Seconds(),
+        ConcurrentWorkers: b.config.ConcurrentWorkers,
+        MemoryUsed:        getMemUsage(),
+    }
+
+    // Calculate combined latencies for overall metrics
+    allLatencies := append(queryLatencies, writeLatencies...)
+    if len(allLatencies) > 0 {
+        result.AvgLatency = calculateAvgLatency(allLatencies)
+        result.P90Latency = calculatePercentileLatency(allLatencies, 0.90)
+        result.P95Latency = calculatePercentileLatency(allLatencies, 0.95)
+        result.P99Latency = calculatePercentileLatency(allLatencies, 0.99)
+        result.Bottom10Avg = calculateBottom10Avg(allLatencies)
+    }
+
+    result.SuccessRate = 100.0 // No specific target
+
+    for _, err := range errors {
+        result.Errors = append(result.Errors, err.Error())
+    }
+
+    b.mu.Lock()
+    b.results = append(b.results, result)
+    b.mu.Unlock()
+
+    // Calculate separate metrics for queries and writes
+    var queryAvg, writeAvg time.Duration
+    if len(queryLatencies) > 0 {
+        queryAvg = calculateAvgLatency(queryLatencies)
+    }
+    if len(writeLatencies) > 0 {
+        writeAvg = calculateAvgLatency(writeLatencies)
+    }
+
+    fmt.Printf(
+        "Concurrent test completed: %d operations (%d queries, %d writes) in %v\n",
+        totalOps, totalQueries, totalWrites, duration,
+    )
+    fmt.Printf("Operations/sec: %.2f\n", result.EventsPerSecond)
+    fmt.Printf("Avg latency: %v\n", result.AvgLatency)
+    fmt.Printf("Avg query latency: %v\n", queryAvg)
+    fmt.Printf("Avg write latency: %v\n", writeAvg)
+    fmt.Printf("P95 latency: %v\n", result.P95Latency)
+    fmt.Printf("P99 latency: %v\n", result.P99Latency)
+}
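In the write workers above, each goroutine starts at its own offset and advances by numWriters (eventIndex += numWriters), so the pre-generated events are striped across writers without coordination or duplicate writes. A standalone sketch of that striping, with illustrative names:

package main

import "fmt"

func main() {
    items := []string{"e0", "e1", "e2", "e3", "e4", "e5", "e6", "e7"}
    numWriters := 3

    // Worker w handles items w, w+numWriters, w+2*numWriters, ...
    for w := 0; w < numWriters; w++ {
        var mine []string
        for idx := w; idx < len(items); idx += numWriters {
            mine = append(mine, items[idx])
        }
        fmt.Printf("writer %d -> %v\n", w, mine)
    }
}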
 func (b *Benchmark) generateEvents(count int) []*event.E {
     events := make([]*event.E, count)
     now := timestamp.Now()