Refactor publishCacheEvents and publisherWorker to use fire-and-forget publishing.

- Replaced `Publish` calls with direct event envelope writes, removing wait-for-OK behavior.
- Simplified `publishCacheEvents` logic, removed per-publish timeout contexts, and updated return values.
- Adjusted log messages to reflect "sent" instead of "published."
- Enhanced relay stability with delays between successful publishes.
- Removed unused `publishTimeout` parameter from `publisherWorker` and main logic.
This commit is contained in:
2025-09-20 03:48:50 +01:00
parent 22cde96f3f
commit 58a9e83038

View File

@@ -16,6 +16,7 @@ import (
"lol.mleku.dev/log" "lol.mleku.dev/log"
"next.orly.dev/pkg/crypto/p256k" "next.orly.dev/pkg/crypto/p256k"
"next.orly.dev/pkg/encoders/envelopes/eventenvelope"
"next.orly.dev/pkg/encoders/event" "next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/event/examples" "next.orly.dev/pkg/encoders/event/examples"
"next.orly.dev/pkg/encoders/filter" "next.orly.dev/pkg/encoders/filter"
@@ -139,17 +140,16 @@ func loadCacheAndIndex() (*CacheIndex, error) {
return idx, nil return idx, nil
} }
// publishCacheEvents uploads all cache events to the relay, waiting for OKs // publishCacheEvents uploads all cache events to the relay without waiting for OKs
func publishCacheEvents( func publishCacheEvents(
ctx context.Context, rc *RelayConn, idx *CacheIndex, ctx context.Context, rc *RelayConn, idx *CacheIndex,
publishTimeout time.Duration, ) (sentCount int) {
) (okCount int) {
// Use an index-based loop so we can truly retry the same event on transient errors // Use an index-based loop so we can truly retry the same event on transient errors
for i := 0; i < len(idx.events); i++ { for i := 0; i < len(idx.events); i++ {
// allow cancellation // allow cancellation
select { select {
case <-ctx.Done(): case <-ctx.Done():
return okCount return sentCount
default: default:
} }
ev := idx.events[i] ev := idx.events[i]
@@ -160,26 +160,15 @@ func publishCacheEvents(
i-- i--
continue continue
} }
// Create per-publish timeout context and add diagnostics to understand cancellations
pubCtx, cancel := context.WithTimeout(ctx, publishTimeout) log.T.F("cache publish %d/%d", i+1, len(idx.events))
if dl, ok := pubCtx.Deadline(); ok { // Send event without waiting for OK response (fire-and-forget)
log.T.F( envelope := eventenvelope.NewSubmissionWith(ev)
"cache publish %d/%d: deadline=%s now=%s remain=%s", envBytes := envelope.Marshal(nil)
i+1, len(idx.events), dl.Format(time.RFC3339Nano), if err := <-client.Write(envBytes); err != nil {
time.Now().Format(time.RFC3339Nano), log.E.F("cache write error: %v", err)
time.Until(dl).Truncate(time.Millisecond),
)
}
err := client.Publish(pubCtx, ev)
log.I.F("event: %s", ev.Serialize())
// it's safe to cancel our per-publish context after Publish returns
cancel()
if err != nil {
log.E.F("cache publish error: %v (ctxErr=%v)", err, pubCtx.Err())
errStr := err.Error() errStr := err.Error()
if strings.Contains(errStr, "connection closed") || if strings.Contains(errStr, "connection closed") {
strings.Contains(errStr, "context deadline exceeded") ||
strings.Contains(errStr, "given up waiting for an OK") {
_ = rc.Reconnect(ctx) _ = rc.Reconnect(ctx)
// retry this event by decrementing i so the for-loop will attempt it again // retry this event by decrementing i so the for-loop will attempt it again
i-- i--
@@ -189,9 +178,18 @@ func publishCacheEvents(
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
continue continue
} }
okCount++ log.I.F("event: %s", ev.Serialize())
sentCount++
// Add a longer delay between successful publishes to prevent overwhelming the relay
// This helps maintain stable connections during sustained cache publishing
select {
case <-time.After(100 * time.Millisecond):
case <-ctx.Done():
return sentCount
}
} }
return okCount return sentCount
} }
// buildRandomFilter builds a filter combining random subsets of id, author, timestamp, and a single-letter tag value. // buildRandomFilter builds a filter combining random subsets of id, author, timestamp, and a single-letter tag value.
@@ -253,7 +251,6 @@ func buildRandomFilter(idx *CacheIndex, rng *rand.Rand, mask int) *filter.F {
func publisherWorker( func publisherWorker(
ctx context.Context, rc *RelayConn, id int, stats *uint64, ctx context.Context, rc *RelayConn, id int, stats *uint64,
publishTimeout time.Duration,
) { ) {
// Unique RNG per worker // Unique RNG per worker
src := rand.NewSource(time.Now().UnixNano() ^ int64(id<<16)) src := rand.NewSource(time.Now().UnixNano() ^ int64(id<<16))
@@ -278,23 +275,19 @@ func publisherWorker(
return return
} }
// Publish waits for OK (ws.Client.Publish does the waiting) // Send event without waiting for OK response (fire-and-forget)
client := rc.Get() client := rc.Get()
if client == nil { if client == nil {
_ = rc.Reconnect(ctx) _ = rc.Reconnect(ctx)
continue continue
} }
// per-publish timeout context // Create EVENT envelope and send directly without waiting for OK
pubCtx, cancel := context.WithTimeout(ctx, publishTimeout) envelope := eventenvelope.NewSubmissionWith(ev)
if err := client.Publish(pubCtx, ev); err != nil { envBytes := envelope.Marshal(nil)
cancel() if err := <-client.Write(envBytes); err != nil {
log.E.F("worker %d: publish error: %v", id, err) log.E.F("worker %d: write error: %v", id, err)
errStr := err.Error() errStr := err.Error()
if strings.Contains( if strings.Contains(errStr, "connection closed") {
errStr, "connection closed",
) || strings.Contains(
errStr, "context deadline exceeded",
) || strings.Contains(errStr, "given up waiting for an OK") {
for attempt := 0; attempt < 5; attempt++ { for attempt := 0; attempt < 5; attempt++ {
if ctx.Err() != nil { if ctx.Err() != nil {
return return
@@ -318,7 +311,6 @@ func publisherWorker(
} }
continue continue
} }
cancel()
atomic.AddUint64(stats, 1) atomic.AddUint64(stats, 1)
@@ -523,11 +515,11 @@ func main() {
if err != nil { if err != nil {
log.E.F("failed to load examples.Cache: %v", err) log.E.F("failed to load examples.Cache: %v", err)
} }
cachePublished := 0 cacheSent := 0
if !skipCache && idx != nil && len(idx.events) > 0 { if !skipCache && idx != nil && len(idx.events) > 0 {
log.I.F("uploading %d events from examples.Cache...", len(idx.events)) log.I.F("sending %d events from examples.Cache...", len(idx.events))
cachePublished = publishCacheEvents(ctx, rc, idx, publishTimeout) cacheSent = publishCacheEvents(ctx, rc, idx)
log.I.F("uploaded %d/%d cache events", cachePublished, len(idx.events)) log.I.F("sent %d/%d cache events", cacheSent, len(idx.events))
} }
var pubOK uint64 var pubOK uint64
@@ -547,7 +539,7 @@ func main() {
i := i i := i
go func() { go func() {
defer wg.Done() defer wg.Done()
publisherWorker(ctx, rc, i, &pubOK, publishTimeout) publisherWorker(ctx, rc, i, &pubOK)
}() }()
} }
// Start query workers // Start query workers
@@ -581,8 +573,8 @@ loop:
qc := atomic.LoadUint64(&qCount) qc := atomic.LoadUint64(&qCount)
qr := atomic.LoadUint64(&qResults) qr := atomic.LoadUint64(&qResults)
log.I.F( log.I.F(
"elapsed=%.1fs published_ok=%d (%.0f/s) received=%d cache_published=%d queries=%d results=%d", "elapsed=%.1fs sent=%d (%.0f/s) received=%d cache_sent=%d queries=%d results=%d",
elapsed, p, float64(p)/elapsed, r, cachePublished, qc, qr, elapsed, p, float64(p)/elapsed, r, cacheSent, qc, qr,
) )
case <-end.C: case <-end.C:
break loop break loop
@@ -598,8 +590,8 @@ loop:
qc := atomic.LoadUint64(&qCount) qc := atomic.LoadUint64(&qCount)
qr := atomic.LoadUint64(&qResults) qr := atomic.LoadUint64(&qResults)
log.I.F( log.I.F(
"stresstest complete: cache_published=%d published_ok=%d received=%d queries=%d results=%d duration=%s", "stresstest complete: cache_sent=%d sent=%d received=%d queries=%d results=%d duration=%s",
cachePublished, p, r, qc, qr, cacheSent, p, r, qc, qr,
time.Since(start).Truncate(time.Millisecond), time.Since(start).Truncate(time.Millisecond),
) )
} }