Optimize deletion timestamp lookup by replacing sorting logic with linear scan to improve performance. Add profiling support with cmd/benchmark/profile.sh, introduce network load testing in benchmarks, and update benchmark reports with additional latency metrics (P90, bottom 10%).

This commit is contained in:
2025-09-12 23:47:53 +01:00
parent fefa4d202e
commit c45276ef08
14 changed files with 1788 additions and 63 deletions

View File

@@ -9,16 +9,20 @@ import (
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
"time"
"next.orly.dev/pkg/crypto/p256k"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/encoders/envelopes/eventenvelope"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/filter"
"next.orly.dev/pkg/encoders/kind"
"next.orly.dev/pkg/encoders/tag"
"next.orly.dev/pkg/encoders/timestamp"
"next.orly.dev/pkg/protocol/ws"
)
type BenchmarkConfig struct {
@@ -28,6 +32,11 @@ type BenchmarkConfig struct {
TestDuration time.Duration
BurstPattern bool
ReportInterval time.Duration
// Network load options
RelayURL string
NetWorkers int
NetRate int // events/sec per worker
}
type BenchmarkResult struct {
@@ -36,8 +45,10 @@ type BenchmarkResult struct {
TotalEvents int
EventsPerSecond float64
AvgLatency time.Duration
P90Latency time.Duration
P95Latency time.Duration
P99Latency time.Duration
Bottom10Avg time.Duration
SuccessRate float64
ConcurrentWorkers int
MemoryUsed uint64
@@ -54,6 +65,12 @@ type Benchmark struct {
func main() {
config := parseFlags()
if config.RelayURL != "" {
// Network mode: connect to relay and generate traffic
runNetworkLoad(config)
return
}
fmt.Printf("Starting Nostr Relay Benchmark\n")
fmt.Printf("Data Directory: %s\n", config.DataDir)
fmt.Printf(
@@ -64,13 +81,12 @@ func main() {
benchmark := NewBenchmark(config)
defer benchmark.Close()
// Run benchmark tests
benchmark.RunPeakThroughputTest()
benchmark.RunBurstPatternTest()
benchmark.RunMixedReadWriteTest()
// Run benchmark suite twice with pauses
benchmark.RunSuite()
// Generate report
// Generate reports
benchmark.GenerateReport()
benchmark.GenerateAsciidocReport()
}
func parseFlags() *BenchmarkConfig {
@@ -80,7 +96,7 @@ func parseFlags() *BenchmarkConfig {
&config.DataDir, "datadir", "/tmp/benchmark_db", "Database directory",
)
flag.IntVar(
&config.NumEvents, "events", 10000, "Number of events to generate",
&config.NumEvents, "events", 100000, "Number of events to generate",
)
flag.IntVar(
&config.ConcurrentWorkers, "workers", runtime.NumCPU(),
@@ -97,10 +113,142 @@ func parseFlags() *BenchmarkConfig {
"Report interval",
)
// Network mode flags
flag.StringVar(
&config.RelayURL, "relay-url", "",
"Relay WebSocket URL (enables network mode if set)",
)
flag.IntVar(
&config.NetWorkers, "net-workers", runtime.NumCPU(),
"Network workers (connections)",
)
flag.IntVar(&config.NetRate, "net-rate", 20, "Events per second per worker")
flag.Parse()
return config
}
// runNetworkLoad generates publish/subscribe traffic against the relay at
// cfg.RelayURL until cfg.TestDuration has elapsed.
//
// It starts cfg.NetWorkers goroutines. Each worker opens its own relay
// connection, generates a fresh signing key, subscribes to kind-1 events
// authored by that key, and publishes one signed text note per tick at
// cfg.NetRate events per second. Non-positive NetWorkers or NetRate values are
// clamped to 1 (cfg is modified in place). The function returns once every
// worker, including its subscriber goroutine, has stopped.
func runNetworkLoad(cfg *BenchmarkConfig) {
	// Clamp before printing so the banner reports the values actually used.
	if cfg.NetWorkers <= 0 {
		cfg.NetWorkers = 1
	}
	if cfg.NetRate <= 0 {
		cfg.NetRate = 1
	}
	fmt.Printf(
		"Network mode: relay=%s workers=%d rate=%d ev/s per worker duration=%s\n",
		cfg.RelayURL, cfg.NetWorkers, cfg.NetRate, cfg.TestDuration,
	)

	// Integer division yields 0 for rates above 1e9 and time.NewTicker panics
	// on a non-positive interval, so fall back to the smallest valid tick.
	interval := time.Second / time.Duration(cfg.NetRate)
	if interval <= 0 {
		interval = time.Nanosecond
	}

	ctx, cancel := context.WithTimeout(context.Background(), cfg.TestDuration)
	defer cancel()

	var wg sync.WaitGroup
	for i := 0; i < cfg.NetWorkers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()

			// Connect to relay
			rl, err := ws.RelayConnect(ctx, cfg.RelayURL)
			if err != nil {
				fmt.Printf(
					"worker %d: failed to connect to %s: %v\n", workerID,
					cfg.RelayURL, err,
				)
				return
			}
			defer rl.Close()
			fmt.Printf("worker %d: connected to %s\n", workerID, cfg.RelayURL)

			// Signer for this worker
			var keys p256k.Signer
			if err := keys.Generate(); err != nil {
				fmt.Printf("worker %d: keygen failed: %v\n", workerID, err)
				return
			}

			// Start a concurrent subscriber that listens for events published
			// by this worker: author = this worker's pubkey, kind = text note,
			// created no earlier than now.
			since := time.Now().Unix()
			var subscriber sync.WaitGroup
			subscriber.Add(1)
			// Deferred after rl.Close, so it runs before it: the subscriber
			// gets to unsubscribe and log its total while the connection is
			// still open, instead of racing with Close and process exit.
			defer subscriber.Wait()
			go func() {
				defer subscriber.Done()
				f := filter.New()
				f.Kinds = kind.NewS(kind.TextNote)
				f.Authors = tag.NewWithCap(1)
				f.Authors.T = append(f.Authors.T, keys.Pub())
				f.Since = timestamp.FromUnix(since)
				sub, err := rl.Subscribe(ctx, filter.NewS(f))
				if err != nil {
					fmt.Printf("worker %d: subscribe error: %v\n", workerID, err)
					return
				}
				defer sub.Unsub()

				// NOTE(review): whether EndOfStoredEvents is signalled by a
				// send or by closing the channel is not visible from here.
				// Receiving from a nil channel blocks forever, so clearing the
				// local copy after the first signal is correct in both cases
				// and prevents the select from spinning on a closed channel.
				eose := sub.EndOfStoredEvents
				recv := 0
				for {
					select {
					case <-ctx.Done():
						fmt.Printf("worker %d: subscriber exiting after %d events\n", workerID, recv)
						return
					case <-eose:
						// Stored events are exhausted; continue streaming live events.
						eose = nil
					case ev, ok := <-sub.Events:
						if !ok {
							// A closed channel would otherwise be received
							// from in a tight loop until the context expires.
							fmt.Printf("worker %d: subscription closed after %d events\n", workerID, recv)
							return
						}
						if ev == nil {
							continue
						}
						recv++
						if recv%100 == 0 {
							fmt.Printf("worker %d: received %d matching events\n", workerID, recv)
						}
						ev.Free()
					}
				}
			}()

			ticker := time.NewTicker(interval)
			defer ticker.Stop()
			count := 0
			for {
				select {
				case <-ctx.Done():
					fmt.Printf(
						"worker %d: stopping after %d publishes\n", workerID,
						count,
					)
					return
				case <-ticker.C:
					// Build and sign a simple text note event
					ev := event.New()
					ev.Kind = uint16(1)
					ev.CreatedAt = time.Now().Unix()
					ev.Tags = tag.NewS()
					ev.Content = []byte(fmt.Sprintf(
						"bench worker=%d n=%d", workerID, count,
					))
					if err := ev.Sign(&keys); err != nil {
						fmt.Printf("worker %d: sign error: %v\n", workerID, err)
						ev.Free()
						continue
					}
					// Async publish: don't wait for OK; this greatly increases throughput
					ch := rl.Write(eventenvelope.NewSubmissionWith(ev).Marshal(nil))
					// Non-blocking error check: only an error that is already
					// available is reported; later ones are not observed.
					select {
					case err := <-ch:
						if err != nil {
							fmt.Printf("worker %d: write error: %v\n", workerID, err)
						}
					default:
					}
					ev.Free()
					// Count first so the progress line reports events actually
					// sent (100, 200, ...) rather than starting at "sent 0".
					count++
					if count%100 == 0 {
						fmt.Printf("worker %d: sent %d events\n", workerID, count)
					}
				}
			}
		}(i)
	}
	wg.Wait()
}
func NewBenchmark(config *BenchmarkConfig) *Benchmark {
// Clean up existing data directory
os.RemoveAll(config.DataDir)
@@ -113,11 +261,16 @@ func NewBenchmark(config *BenchmarkConfig) *Benchmark {
log.Fatalf("Failed to create database: %v", err)
}
return &Benchmark{
b := &Benchmark{
config: config,
db: db,
results: make([]*BenchmarkResult, 0),
}
// Trigger compaction/GC before starting tests
b.compactDatabase()
return b
}
func (b *Benchmark) Close() {
@@ -126,6 +279,32 @@ func (b *Benchmark) Close() {
}
}
// RunSuite executes the benchmark sequence (peak throughput, burst pattern,
// mixed read/write) for two rounds. It sleeps 10s between consecutive tests
// and 10s between the rounds; there is no pause after the final test.
func (b *Benchmark) RunSuite() {
	const (
		rounds = 2
		pause  = 10 * time.Second
	)
	tests := []func(){
		b.RunPeakThroughputTest,
		b.RunBurstPatternTest,
		b.RunMixedReadWriteTest,
	}
	for round := 1; round <= rounds; round++ {
		fmt.Printf("\n=== Starting test round %d/2 ===\n", round)
		for i, run := range tests {
			// Pause before every test except the first one of the round.
			if i > 0 {
				time.Sleep(pause)
			}
			run()
		}
		if round == rounds {
			break
		}
		fmt.Println("\nPausing 10s before next round...")
		time.Sleep(pause)
	}
}
// compactDatabase makes a single best-effort value log GC call on the
// underlying store before the tests start, passing 0.5 as the GC threshold
// (Badger calls this argument the discard ratio). It is a no-op when the
// benchmark has no database handle.
func (b *Benchmark) compactDatabase() {
	if b.db != nil && b.db.DB != nil {
		// The result is discarded on purpose: a failed or skipped GC pass
		// must not prevent the benchmark from running.
		_ = b.db.DB.RunValueLogGC(0.5)
	}
}
func (b *Benchmark) RunPeakThroughputTest() {
fmt.Println("\n=== Peak Throughput Test ===")
@@ -185,8 +364,10 @@ func (b *Benchmark) RunPeakThroughputTest() {
if len(latencies) > 0 {
result.AvgLatency = calculateAvgLatency(latencies)
result.P90Latency = calculatePercentileLatency(latencies, 0.90)
result.P95Latency = calculatePercentileLatency(latencies, 0.95)
result.P99Latency = calculatePercentileLatency(latencies, 0.99)
result.Bottom10Avg = calculateBottom10Avg(latencies)
}
result.SuccessRate = float64(totalEvents) / float64(b.config.NumEvents) * 100
@@ -206,8 +387,10 @@ func (b *Benchmark) RunPeakThroughputTest() {
fmt.Printf("Duration: %v\n", duration)
fmt.Printf("Events/sec: %.2f\n", result.EventsPerSecond)
fmt.Printf("Avg latency: %v\n", result.AvgLatency)
fmt.Printf("P90 latency: %v\n", result.P90Latency)
fmt.Printf("P95 latency: %v\n", result.P95Latency)
fmt.Printf("P99 latency: %v\n", result.P99Latency)
fmt.Printf("Bottom 10%% Avg latency: %v\n", result.Bottom10Avg)
}
func (b *Benchmark) RunBurstPatternTest() {
@@ -282,8 +465,10 @@ func (b *Benchmark) RunBurstPatternTest() {
if len(latencies) > 0 {
result.AvgLatency = calculateAvgLatency(latencies)
result.P90Latency = calculatePercentileLatency(latencies, 0.90)
result.P95Latency = calculatePercentileLatency(latencies, 0.95)
result.P99Latency = calculatePercentileLatency(latencies, 0.99)
result.Bottom10Avg = calculateBottom10Avg(latencies)
}
result.SuccessRate = float64(totalEvents) / float64(eventIndex) * 100
@@ -387,8 +572,10 @@ func (b *Benchmark) RunMixedReadWriteTest() {
allLatencies := append(writeLatencies, readLatencies...)
if len(allLatencies) > 0 {
result.AvgLatency = calculateAvgLatency(allLatencies)
result.P90Latency = calculatePercentileLatency(allLatencies, 0.90)
result.P95Latency = calculatePercentileLatency(allLatencies, 0.95)
result.P99Latency = calculatePercentileLatency(allLatencies, 0.99)
result.Bottom10Avg = calculateBottom10Avg(allLatencies)
}
result.SuccessRate = float64(totalWrites+totalReads) / float64(len(events)) * 100
@@ -460,8 +647,10 @@ func (b *Benchmark) GenerateReport() {
fmt.Printf("Concurrent Workers: %d\n", result.ConcurrentWorkers)
fmt.Printf("Memory Used: %d MB\n", result.MemoryUsed/(1024*1024))
fmt.Printf("Avg Latency: %v\n", result.AvgLatency)
fmt.Printf("P90 Latency: %v\n", result.P90Latency)
fmt.Printf("P95 Latency: %v\n", result.P95Latency)
fmt.Printf("P99 Latency: %v\n", result.P99Latency)
fmt.Printf("Bottom 10%% Avg Latency: %v\n", result.Bottom10Avg)
if len(result.Errors) > 0 {
fmt.Printf("Errors (%d):\n", len(result.Errors))
@@ -524,8 +713,14 @@ func (b *Benchmark) saveReportToFile(path string) error {
),
)
file.WriteString(fmt.Sprintf("Avg Latency: %v\n", result.AvgLatency))
file.WriteString(fmt.Sprintf("P90 Latency: %v\n", result.P90Latency))
file.WriteString(fmt.Sprintf("P95 Latency: %v\n", result.P95Latency))
file.WriteString(fmt.Sprintf("P99 Latency: %v\n", result.P99Latency))
file.WriteString(
fmt.Sprintf(
"Bottom 10%% Avg Latency: %v\n", result.Bottom10Avg,
),
)
file.WriteString(
fmt.Sprintf(
"Memory: %d MB\n", result.MemoryUsed/(1024*1024),
@@ -537,6 +732,41 @@ func (b *Benchmark) saveReportToFile(path string) error {
return nil
}
// GenerateAsciidocReport writes the collected results as an AsciiDoc table to
// benchmark_report.adoc inside the configured data directory.
//
// The table has one row per recorded result with the columns test name,
// events/sec, average latency, P90, P95 and bottom-10% average. It returns an
// error if the file cannot be created, written, or closed; the confirmation
// line is printed only after the file has been written and closed
// successfully.
func (b *Benchmark) GenerateAsciidocReport() error {
	path := filepath.Join(b.config.DataDir, "benchmark_report.adoc")

	// Render into memory first: the results lock is then held only while
	// formatting, and one error check covers the whole document.
	var doc strings.Builder
	doc.WriteString("= NOSTR Relay Benchmark Results\n\n")
	fmt.Fprintf(&doc, "Generated: %s\n\n", time.Now().Format(time.RFC3339))
	doc.WriteString("[cols=\"1,^1,^1,^1,^1,^1\",options=\"header\"]\n")
	doc.WriteString("|===\n")
	doc.WriteString("| Test | Events/sec | Avg Latency | P90 | P95 | Bottom 10% Avg\n")

	b.mu.RLock()
	for _, r := range b.results {
		// One AsciiDoc cell per line, six cells per row to match the header.
		fmt.Fprintf(
			&doc, "| %s\n| %.2f\n| %v\n| %v\n| %v\n| %v\n",
			r.TestName, r.EventsPerSecond, r.AvgLatency,
			r.P90Latency, r.P95Latency, r.Bottom10Avg,
		)
	}
	b.mu.RUnlock()
	doc.WriteString("|===\n")

	file, err := os.Create(path)
	if err != nil {
		return err
	}
	if _, err = file.WriteString(doc.String()); err != nil {
		// Best-effort close; the write error is the one worth reporting.
		_ = file.Close()
		return err
	}
	// Close can surface a deferred write failure, so its error is returned
	// instead of being dropped.
	if err = file.Close(); err != nil {
		return err
	}

	fmt.Printf("AsciiDoc report saved to: %s\n", path)
	return nil
}
// Helper functions
func calculateAvgLatency(latencies []time.Duration) time.Duration {
@@ -557,13 +787,48 @@ func calculatePercentileLatency(
if len(latencies) == 0 {
return 0
}
// Simple percentile calculation - in production would sort first
index := int(float64(len(latencies)) * percentile)
if index >= len(latencies) {
index = len(latencies) - 1
// Sort a copy to avoid mutating caller slice
copySlice := make([]time.Duration, len(latencies))
copy(copySlice, latencies)
sort.Slice(
copySlice, func(i, j int) bool { return copySlice[i] < copySlice[j] },
)
index := int(float64(len(copySlice)-1) * percentile)
if index < 0 {
index = 0
}
return latencies[index]
if index >= len(copySlice) {
index = len(copySlice) - 1
}
return copySlice[index]
}
// calculateBottom10Avg reports the mean of the worst-performing tenth of the
// samples: the latencies from the 90% position onward once sorted ascending.
// "Bottom" refers to performance, so these are the largest values.
//
// The input slice is left untouched. An empty input yields 0. With fewer than
// ten samples the average covers however many samples lie past the 90% cut,
// which is always at least one.
func calculateBottom10Avg(latencies []time.Duration) time.Duration {
	n := len(latencies)
	if n == 0 {
		return 0
	}

	// Sort a private copy so the caller's ordering is preserved.
	sorted := append([]time.Duration(nil), latencies...)
	sort.Slice(sorted, func(a, b int) bool { return sorted[a] < sorted[b] })

	// For n >= 1 the cut index int(n*0.9) lies in [0, n-1], so the tail
	// always holds at least one sample and the division below is safe.
	tail := sorted[int(float64(n)*0.9):]

	var sum time.Duration
	for _, d := range tail {
		sum += d
	}
	return sum / time.Duration(len(tail))
}
func getMemUsage() uint64 {