From 58a9e830387e6c7a766f15bb65c92e79a2af3cb8 Mon Sep 17 00:00:00 2001
From: mleku
Date: Sat, 20 Sep 2025 03:48:50 +0100
Subject: [PATCH] Refactor `publishCacheEvents` and `publisherWorker` to use fire-and-forget publishing.

- Replaced `Publish` calls with direct event envelope writes, removing wait-for-OK behavior.
- Simplified `publishCacheEvents` logic, removed per-publish timeout contexts, and updated return values.
- Adjusted log messages to reflect "sent" instead of "published."
- Enhanced relay stability with delays between successful publishes.
- Removed unused `publishTimeout` parameter from `publisherWorker` and main logic.
---
 cmd/stresstest/main.go | 86 +++++++++++++++++++-----------------------
 1 file changed, 39 insertions(+), 47 deletions(-)

diff --git a/cmd/stresstest/main.go b/cmd/stresstest/main.go
index 82462a4..532a9f6 100644
--- a/cmd/stresstest/main.go
+++ b/cmd/stresstest/main.go
@@ -16,6 +16,7 @@ import (
 
 	"lol.mleku.dev/log"
 	"next.orly.dev/pkg/crypto/p256k"
+	"next.orly.dev/pkg/encoders/envelopes/eventenvelope"
 	"next.orly.dev/pkg/encoders/event"
 	"next.orly.dev/pkg/encoders/event/examples"
 	"next.orly.dev/pkg/encoders/filter"
@@ -139,17 +140,16 @@ func loadCacheAndIndex() (*CacheIndex, error) {
 	return idx, nil
 }
 
-// publishCacheEvents uploads all cache events to the relay, waiting for OKs
+// publishCacheEvents uploads all cache events to the relay without waiting for OKs
 func publishCacheEvents(
 	ctx context.Context, rc *RelayConn, idx *CacheIndex,
-	publishTimeout time.Duration,
-) (okCount int) {
+) (sentCount int) {
 	// Use an index-based loop so we can truly retry the same event on transient errors
 	for i := 0; i < len(idx.events); i++ {
 		// allow cancellation
 		select {
 		case <-ctx.Done():
-			return okCount
+			return sentCount
 		default:
 		}
 		ev := idx.events[i]
@@ -160,26 +160,15 @@ func publishCacheEvents(
 			i--
 			continue
 		}
-		// Create per-publish timeout context and add diagnostics to understand cancellations
-		pubCtx, cancel := context.WithTimeout(ctx, publishTimeout)
-		if dl, ok := pubCtx.Deadline(); ok {
-			log.T.F(
-				"cache publish %d/%d: deadline=%s now=%s remain=%s",
-				i+1, len(idx.events), dl.Format(time.RFC3339Nano),
-				time.Now().Format(time.RFC3339Nano),
-				time.Until(dl).Truncate(time.Millisecond),
-			)
-		}
-		err := client.Publish(pubCtx, ev)
-		log.I.F("event: %s", ev.Serialize())
-		// it's safe to cancel our per-publish context after Publish returns
-		cancel()
-		if err != nil {
-			log.E.F("cache publish error: %v (ctxErr=%v)", err, pubCtx.Err())
+
+		log.T.F("cache publish %d/%d", i+1, len(idx.events))
+		// Send event without waiting for OK response (fire-and-forget)
+		envelope := eventenvelope.NewSubmissionWith(ev)
+		envBytes := envelope.Marshal(nil)
+		if err := <-client.Write(envBytes); err != nil {
+			log.E.F("cache write error: %v", err)
 			errStr := err.Error()
-			if strings.Contains(errStr, "connection closed") ||
-				strings.Contains(errStr, "context deadline exceeded") ||
-				strings.Contains(errStr, "given up waiting for an OK") {
+			if strings.Contains(errStr, "connection closed") {
 				_ = rc.Reconnect(ctx)
 				// retry this event by decrementing i so the for-loop will attempt it again
 				i--
@@ -189,9 +178,18 @@ func publishCacheEvents(
 			time.Sleep(50 * time.Millisecond)
 			continue
 		}
-		okCount++
+		log.I.F("event: %s", ev.Serialize())
+		sentCount++
+
+		// Add a longer delay between successful publishes to prevent overwhelming the relay
+		// This helps maintain stable connections during sustained cache publishing
+		select {
+		case <-time.After(100 * time.Millisecond):
+		case <-ctx.Done():
+			return sentCount
+		}
 	}
-	return okCount
+	return sentCount
 }
 
 // buildRandomFilter builds a filter combining random subsets of id, author, timestamp, and a single-letter tag value.
@@ -253,7 +251,6 @@ func buildRandomFilter(idx *CacheIndex, rng *rand.Rand, mask int) *filter.F {
 
 func publisherWorker(
 	ctx context.Context, rc *RelayConn, id int, stats *uint64,
-	publishTimeout time.Duration,
 ) {
 	// Unique RNG per worker
 	src := rand.NewSource(time.Now().UnixNano() ^ int64(id<<16))
@@ -278,23 +275,19 @@ func publisherWorker(
 			return
 		}
 
-		// Publish waits for OK (ws.Client.Publish does the waiting)
+		// Send event without waiting for OK response (fire-and-forget)
 		client := rc.Get()
 		if client == nil {
 			_ = rc.Reconnect(ctx)
 			continue
 		}
-		// per-publish timeout context
-		pubCtx, cancel := context.WithTimeout(ctx, publishTimeout)
-		if err := client.Publish(pubCtx, ev); err != nil {
-			cancel()
-			log.E.F("worker %d: publish error: %v", id, err)
+		// Create EVENT envelope and send directly without waiting for OK
+		envelope := eventenvelope.NewSubmissionWith(ev)
+		envBytes := envelope.Marshal(nil)
+		if err := <-client.Write(envBytes); err != nil {
+			log.E.F("worker %d: write error: %v", id, err)
 			errStr := err.Error()
-			if strings.Contains(
-				errStr, "connection closed",
-			) || strings.Contains(
-				errStr, "context deadline exceeded",
-			) || strings.Contains(errStr, "given up waiting for an OK") {
+			if strings.Contains(errStr, "connection closed") {
 				for attempt := 0; attempt < 5; attempt++ {
 					if ctx.Err() != nil {
 						return
@@ -318,7 +311,6 @@ func publisherWorker(
 			}
 			continue
 		}
-		cancel()
 
 		atomic.AddUint64(stats, 1)
 
@@ -523,11 +515,11 @@ func main() {
 	if err != nil {
 		log.E.F("failed to load examples.Cache: %v", err)
 	}
-	cachePublished := 0
+	cacheSent := 0
 	if !skipCache && idx != nil && len(idx.events) > 0 {
-		log.I.F("uploading %d events from examples.Cache...", len(idx.events))
-		cachePublished = publishCacheEvents(ctx, rc, idx, publishTimeout)
-		log.I.F("uploaded %d/%d cache events", cachePublished, len(idx.events))
+		log.I.F("sending %d events from examples.Cache...", len(idx.events))
+		cacheSent = publishCacheEvents(ctx, rc, idx)
+		log.I.F("sent %d/%d cache events", cacheSent, len(idx.events))
 	}
 
 	var pubOK uint64
@@ -547,7 +539,7 @@ func main() {
 		i := i
 		go func() {
 			defer wg.Done()
-			publisherWorker(ctx, rc, i, &pubOK, publishTimeout)
+			publisherWorker(ctx, rc, i, &pubOK)
 		}()
 	}
 	// Start query workers
@@ -581,8 +573,8 @@ loop:
 			qc := atomic.LoadUint64(&qCount)
 			qr := atomic.LoadUint64(&qResults)
 			log.I.F(
-				"elapsed=%.1fs published_ok=%d (%.0f/s) received=%d cache_published=%d queries=%d results=%d",
-				elapsed, p, float64(p)/elapsed, r, cachePublished, qc, qr,
+				"elapsed=%.1fs sent=%d (%.0f/s) received=%d cache_sent=%d queries=%d results=%d",
+				elapsed, p, float64(p)/elapsed, r, cacheSent, qc, qr,
 			)
 		case <-end.C:
 			break loop
@@ -598,8 +590,8 @@ loop:
 	qc := atomic.LoadUint64(&qCount)
 	qr := atomic.LoadUint64(&qResults)
 	log.I.F(
-		"stresstest complete: cache_published=%d published_ok=%d received=%d queries=%d results=%d duration=%s",
-		cachePublished, p, r, qc, qr,
+		"stresstest complete: cache_sent=%d sent=%d received=%d queries=%d results=%d duration=%s",
+		cacheSent, p, r, qc, qr,
 		time.Since(start).Truncate(time.Millisecond),
 	)
 }
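
Reviewer note (not part of the patch): the hunks above replace the blocking Publish call with a raw EVENT envelope write plus a 100ms pacing delay. The Go sketch below illustrates that fire-and-forget loop in isolation. Only the shape of client.Write (an asynchronous write whose error arrives on a channel) and the 100ms delay are taken from the diff; frameWriter, stubWriter, and sendFireAndForget are hypothetical names introduced for illustration only.

package main

import (
	"context"
	"fmt"
	"time"
)

// frameWriter mirrors the single call the pattern relies on: an asynchronous
// write whose result is delivered on a channel, like client.Write in the diff.
type frameWriter interface {
	Write(frame []byte) <-chan error
}

// stubWriter is a stand-in for the websocket client so the sketch runs on its own.
type stubWriter struct{}

func (stubWriter) Write(frame []byte) <-chan error {
	ch := make(chan error, 1)
	ch <- nil // pretend the frame was flushed to the socket
	return ch
}

// sendFireAndForget writes each pre-marshaled EVENT envelope without waiting
// for an OK, pacing successful writes by 100ms (the delay the patch uses for
// cache publishing) so the relay is not flooded.
func sendFireAndForget(ctx context.Context, w frameWriter, frames [][]byte) (sent int) {
	for _, frame := range frames {
		select {
		case <-ctx.Done():
			return sent
		default:
		}
		if err := <-w.Write(frame); err != nil {
			// The patch reconnects and retries the same event when the error
			// contains "connection closed"; this sketch simply skips the frame.
			continue
		}
		sent++
		select {
		case <-time.After(100 * time.Millisecond):
		case <-ctx.Done():
			return sent
		}
	}
	return sent
}

func main() {
	frames := [][]byte{[]byte(`["EVENT",{}]`), []byte(`["EVENT",{}]`)}
	fmt.Println("sent:", sendFireAndForget(context.Background(), stubWriter{}, frames))
}

The trade-off is that a successful write no longer implies the relay accepted the event, which is why the patch renames the counters and log fields from "published" to "sent".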