From 81a40c04e5d2d89712bd8e06582c160d13e1e1ed Mon Sep 17 00:00:00 2001 From: mleku Date: Sat, 20 Sep 2025 04:10:59 +0100 Subject: [PATCH] Refactor `publishCacheEvents` for concurrent publishing and optimize database access. - Updated `publishCacheEvents` to utilize multiple concurrent connections for event publishing. - Introduced worker-based architecture leveraging `runtime.NumCPU` for parallel uploads. - Optimized database fetch logic in `FetchEventsBySerials` for improved maintainability and performance. - Bumped version to `v0.4.8`. --- cmd/stresstest/main.go | 133 +++++++++++++++--------- pkg/database/fetch-events-by-serials.go | 71 ++++++++----- pkg/version/version | 2 +- 3 files changed, 131 insertions(+), 75 deletions(-) diff --git a/cmd/stresstest/main.go b/cmd/stresstest/main.go index 532a9f6..f559aa3 100644 --- a/cmd/stresstest/main.go +++ b/cmd/stresstest/main.go @@ -9,6 +9,7 @@ import ( "math/rand" "os" "os/signal" + "runtime" "strings" "sync" "sync/atomic" @@ -140,56 +141,92 @@ func loadCacheAndIndex() (*CacheIndex, error) { return idx, nil } -// publishCacheEvents uploads all cache events to the relay without waiting for OKs +// publishCacheEvents uploads all cache events to the relay using multiple concurrent connections func publishCacheEvents( - ctx context.Context, rc *RelayConn, idx *CacheIndex, + ctx context.Context, relayURL string, idx *CacheIndex, ) (sentCount int) { - // Use an index-based loop so we can truly retry the same event on transient errors - for i := 0; i < len(idx.events); i++ { - // allow cancellation - select { - case <-ctx.Done(): - return sentCount - default: - } - ev := idx.events[i] - client := rc.Get() - if client == nil { - _ = rc.Reconnect(ctx) - // retry same index - i-- - continue - } - - log.T.F("cache publish %d/%d", i+1, len(idx.events)) - // Send event without waiting for OK response (fire-and-forget) - envelope := eventenvelope.NewSubmissionWith(ev) - envBytes := envelope.Marshal(nil) - if err := <-client.Write(envBytes); err != nil { - log.E.F("cache write error: %v", err) - errStr := err.Error() - if strings.Contains(errStr, "connection closed") { - _ = rc.Reconnect(ctx) - // retry this event by decrementing i so the for-loop will attempt it again - i-- - continue - } - // small backoff and move on to next event on non-transient errors - time.Sleep(50 * time.Millisecond) - continue - } - log.I.F("event: %s", ev.Serialize()) - sentCount++ - - // Add a longer delay between successful publishes to prevent overwhelming the relay - // This helps maintain stable connections during sustained cache publishing - select { - case <-time.After(100 * time.Millisecond): - case <-ctx.Done(): - return sentCount - } + numWorkers := runtime.NumCPU() + log.I.F("using %d concurrent connections for cache upload", numWorkers) + + // Channel to distribute events to workers + eventChan := make(chan *event.E, len(idx.events)) + var totalSent atomic.Int64 + + // Fill the event channel + for _, ev := range idx.events { + eventChan <- ev } - return sentCount + close(eventChan) + + // Start worker goroutines + var wg sync.WaitGroup + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + + // Create separate connection for this worker + client, err := ws.RelayConnect(ctx, relayURL) + if err != nil { + log.E.F("worker %d: failed to connect: %v", workerID, err) + return + } + defer client.Close() + + rc := &RelayConn{client: client, url: relayURL} + workerSent := 0 + + // Process events from the channel + for ev := range eventChan { + select { + case <-ctx.Done(): + return + default: + } + + // Get client connection + wsClient := rc.Get() + if wsClient == nil { + if err := rc.Reconnect(ctx); err != nil { + log.E.F("worker %d: reconnect failed: %v", workerID, err) + continue + } + wsClient = rc.Get() + } + + // Send event without waiting for OK response (fire-and-forget) + envelope := eventenvelope.NewSubmissionWith(ev) + envBytes := envelope.Marshal(nil) + if err := <-wsClient.Write(envBytes); err != nil { + log.E.F("worker %d: write error: %v", workerID, err) + errStr := err.Error() + if strings.Contains(errStr, "connection closed") { + _ = rc.Reconnect(ctx) + } + time.Sleep(50 * time.Millisecond) + continue + } + + workerSent++ + totalSent.Add(1) + log.T.F("worker %d: sent event %d (total: %d)", workerID, workerSent, totalSent.Load()) + + // Small delay to prevent overwhelming the relay + select { + case <-time.After(10 * time.Millisecond): + case <-ctx.Done(): + return + } + } + + log.I.F("worker %d: completed, sent %d events", workerID, workerSent) + }(i) + } + + // Wait for all workers to complete + wg.Wait() + + return int(totalSent.Load()) } // buildRandomFilter builds a filter combining random subsets of id, author, timestamp, and a single-letter tag value. @@ -518,7 +555,7 @@ func main() { cacheSent := 0 if !skipCache && idx != nil && len(idx.events) > 0 { log.I.F("sending %d events from examples.Cache...", len(idx.events)) - cacheSent = publishCacheEvents(ctx, rc, idx) + cacheSent = publishCacheEvents(ctx, relayURL, idx) log.I.F("sent %d/%d cache events", cacheSent, len(idx.events)) } diff --git a/pkg/database/fetch-events-by-serials.go b/pkg/database/fetch-events-by-serials.go index b27a6cf..aab0e2e 100644 --- a/pkg/database/fetch-events-by-serials.go +++ b/pkg/database/fetch-events-by-serials.go @@ -13,34 +13,38 @@ import ( "next.orly.dev/pkg/encoders/event" ) -// FetchEventsBySerials processes multiple serials in ascending order and retrieves +// FetchEventsBySerials processes multiple serials in ascending order and retrieves // the corresponding events from the database. It optimizes database access by // sorting the serials and seeking to each one sequentially. -func (d *D) FetchEventsBySerials(serials []*types.Uint40) (evMap map[string]*event.E, err error) { +func (d *D) FetchEventsBySerials(serials []*types.Uint40) ( + evMap map[string]*event.E, err error, +) { log.T.F("FetchEventsBySerials: processing %d serials", len(serials)) - + // Initialize the result map evMap = make(map[string]*event.E) - + // Return early if no serials are provided if len(serials) == 0 { return } - + // Sort serials in ascending order for more efficient database access sortedSerials := make([]*types.Uint40, len(serials)) copy(sortedSerials, serials) - sort.Slice(sortedSerials, func(i, j int) bool { - return sortedSerials[i].Get() < sortedSerials[j].Get() - }) - + sort.Slice( + sortedSerials, func(i, j int) bool { + return sortedSerials[i].Get() < sortedSerials[j].Get() + }, + ) + // Process all serials in a single transaction if err = d.View( func(txn *badger.Txn) (err error) { // Create an iterator with default options it := txn.NewIterator(badger.DefaultIteratorOptions) defer it.Close() - + // Process each serial sequentially for _, ser := range sortedSerials { // Create the key for this serial @@ -49,31 +53,43 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (evMap map[string]*eve continue } key := buf.Bytes() - + // Seek to this key in the database it.Seek(key) if it.Valid() { item := it.Item() - + // Verify the key matches exactly (should always be true after a Seek) if !bytes.Equal(item.Key(), key) { continue } - - // Get the item value - var v []byte - if v, err = item.ValueCopy(nil); chk.E(err) { - continue - } - - // Unmarshal the event + ev := new(event.E) - if err = ev.UnmarshalBinary(bytes.NewBuffer(v)); chk.E(err) { + if err = item.Value( + func(val []byte) (err error) { + // Unmarshal the event + if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) { + return + } + // Store the event in the result map using the serial value as string key + return + }, + ); chk.E(err) { continue } - - // Store the event in the result map using the serial value as string key evMap[strconv.FormatUint(ser.Get(), 10)] = ev + // // Get the item value + // var v []byte + // if v, err = item.ValueCopy(nil); chk.E(err) { + // continue + // } + // + // // Unmarshal the event + // ev := new(event.E) + // if err = ev.UnmarshalBinary(bytes.NewBuffer(v)); chk.E(err) { + // continue + // } + } } return @@ -81,7 +97,10 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (evMap map[string]*eve ); chk.E(err) { return } - - log.T.F("FetchEventsBySerials: found %d events out of %d requested serials", len(evMap), len(serials)) + + log.T.F( + "FetchEventsBySerials: found %d events out of %d requested serials", + len(evMap), len(serials), + ) return -} \ No newline at end of file +} diff --git a/pkg/version/version b/pkg/version/version index e534dbb..d5d76e7 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.4.7 \ No newline at end of file +v0.4.8 \ No newline at end of file