Refactor publishCacheEvents for concurrent publishing and optimize database access.

- Updated `publishCacheEvents` to use multiple concurrent relay connections for event publishing.
- Introduced a worker-pool design sized by `runtime.NumCPU`, with one connection per worker, for parallel uploads.
- Optimized the database fetch in `FetchEventsBySerials` by reading item values in place via a value callback instead of copying them, improving maintainability and performance.
- Bumped version to `v0.4.8`.
2025-09-20 04:10:59 +01:00
parent 58a9e83038
commit 81a40c04e5
3 changed files with 131 additions and 75 deletions

View File

@@ -9,6 +9,7 @@ import (
 	"math/rand"
 	"os"
 	"os/signal"
+	"runtime"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -140,56 +141,92 @@ func loadCacheAndIndex() (*CacheIndex, error) {
 	return idx, nil
 }
 
-// publishCacheEvents uploads all cache events to the relay without waiting for OKs
+// publishCacheEvents uploads all cache events to the relay using multiple concurrent connections
 func publishCacheEvents(
-	ctx context.Context, rc *RelayConn, idx *CacheIndex,
+	ctx context.Context, relayURL string, idx *CacheIndex,
 ) (sentCount int) {
-	// Use an index-based loop so we can truly retry the same event on transient errors
-	for i := 0; i < len(idx.events); i++ {
-		// allow cancellation
-		select {
-		case <-ctx.Done():
-			return sentCount
-		default:
-		}
-		ev := idx.events[i]
-		client := rc.Get()
-		if client == nil {
-			_ = rc.Reconnect(ctx)
-			// retry same index
-			i--
-			continue
-		}
-		log.T.F("cache publish %d/%d", i+1, len(idx.events))
-		// Send event without waiting for OK response (fire-and-forget)
-		envelope := eventenvelope.NewSubmissionWith(ev)
-		envBytes := envelope.Marshal(nil)
-		if err := <-client.Write(envBytes); err != nil {
-			log.E.F("cache write error: %v", err)
-			errStr := err.Error()
-			if strings.Contains(errStr, "connection closed") {
-				_ = rc.Reconnect(ctx)
-				// retry this event by decrementing i so the for-loop will attempt it again
-				i--
-				continue
-			}
-			// small backoff and move on to next event on non-transient errors
-			time.Sleep(50 * time.Millisecond)
-			continue
-		}
-		log.I.F("event: %s", ev.Serialize())
-		sentCount++
-		// Add a longer delay between successful publishes to prevent overwhelming the relay
-		// This helps maintain stable connections during sustained cache publishing
-		select {
-		case <-time.After(100 * time.Millisecond):
-		case <-ctx.Done():
-			return sentCount
-		}
-	}
-	return sentCount
+	numWorkers := runtime.NumCPU()
+	log.I.F("using %d concurrent connections for cache upload", numWorkers)
+	// Channel to distribute events to workers
+	eventChan := make(chan *event.E, len(idx.events))
+	var totalSent atomic.Int64
+	// Fill the event channel
+	for _, ev := range idx.events {
+		eventChan <- ev
+	}
+	close(eventChan)
+	// Start worker goroutines
+	var wg sync.WaitGroup
+	for i := 0; i < numWorkers; i++ {
+		wg.Add(1)
+		go func(workerID int) {
+			defer wg.Done()
+			// Create separate connection for this worker
+			client, err := ws.RelayConnect(ctx, relayURL)
+			if err != nil {
+				log.E.F("worker %d: failed to connect: %v", workerID, err)
+				return
+			}
+			defer client.Close()
+			rc := &RelayConn{client: client, url: relayURL}
+			workerSent := 0
+			// Process events from the channel
+			for ev := range eventChan {
+				select {
+				case <-ctx.Done():
+					return
+				default:
+				}
+				// Get client connection
+				wsClient := rc.Get()
+				if wsClient == nil {
+					if err := rc.Reconnect(ctx); err != nil {
+						log.E.F("worker %d: reconnect failed: %v", workerID, err)
+						continue
+					}
+					wsClient = rc.Get()
+				}
+				// Send event without waiting for OK response (fire-and-forget)
+				envelope := eventenvelope.NewSubmissionWith(ev)
+				envBytes := envelope.Marshal(nil)
+				if err := <-wsClient.Write(envBytes); err != nil {
+					log.E.F("worker %d: write error: %v", workerID, err)
+					errStr := err.Error()
+					if strings.Contains(errStr, "connection closed") {
+						_ = rc.Reconnect(ctx)
+					}
+					time.Sleep(50 * time.Millisecond)
+					continue
+				}
+				workerSent++
+				totalSent.Add(1)
+				log.T.F("worker %d: sent event %d (total: %d)", workerID, workerSent, totalSent.Load())
+				// Small delay to prevent overwhelming the relay
+				select {
+				case <-time.After(10 * time.Millisecond):
+				case <-ctx.Done():
+					return
+				}
+			}
+			log.I.F("worker %d: completed, sent %d events", workerID, workerSent)
+		}(i)
+	}
+	// Wait for all workers to complete
+	wg.Wait()
+	return int(totalSent.Load())
 }
 
 // buildRandomFilter builds a filter combining random subsets of id, author, timestamp, and a single-letter tag value.
@@ -518,7 +555,7 @@ func main() {
 	cacheSent := 0
 	if !skipCache && idx != nil && len(idx.events) > 0 {
 		log.I.F("sending %d events from examples.Cache...", len(idx.events))
-		cacheSent = publishCacheEvents(ctx, rc, idx)
+		cacheSent = publishCacheEvents(ctx, relayURL, idx)
 		log.I.F("sent %d/%d cache events", cacheSent, len(idx.events))
 	}
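
The rewritten `publishCacheEvents` above is a fan-out worker pool: the events are queued into a buffered channel which is then closed, a fixed number of workers (one relay connection each, sized by `runtime.NumCPU`) drain it, and an atomic counter tallies successful sends. The following is a minimal, self-contained sketch of that pattern; the `publishOne` stub and the string events are placeholders standing in for the relay-specific pieces (`RelayConn`, `ws.RelayConnect`, `eventenvelope`) and not the project's actual API.

// Minimal sketch of the fan-out pattern used by publishCacheEvents: a buffered
// channel feeds a fixed pool of workers and an atomic counter tallies successful
// sends. publishOne is a placeholder for the per-worker connect-and-write logic.
package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// publishOne stands in for writing a single event over a worker's own connection.
func publishOne(ctx context.Context, workerID int, ev string) error {
	_ = ctx
	fmt.Printf("worker %d published %s\n", workerID, ev)
	return nil
}

func publishAll(ctx context.Context, events []string) int {
	numWorkers := runtime.NumCPU()
	// Buffer the channel so every event can be queued up front, then close it
	// so workers exit once the queue drains.
	eventChan := make(chan string, len(events))
	for _, ev := range events {
		eventChan <- ev
	}
	close(eventChan)

	var totalSent atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for ev := range eventChan {
				// Stop early if the caller cancels the context.
				select {
				case <-ctx.Done():
					return
				default:
				}
				if err := publishOne(ctx, workerID, ev); err != nil {
					continue // the real code reconnects and retries here
				}
				totalSent.Add(1)
			}
		}(i)
	}
	wg.Wait()
	return int(totalSent.Load())
}

func main() {
	sent := publishAll(context.Background(), []string{"a", "b", "c", "d"})
	fmt.Println("total sent:", sent)
}

Because the channel is filled and closed before any worker starts, each worker simply ranges over it until the queue drains, and `wg.Wait` plus the atomic counter yield a single aggregate count to return.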

View File

@@ -16,7 +16,9 @@ import (
 // FetchEventsBySerials processes multiple serials in ascending order and retrieves
 // the corresponding events from the database. It optimizes database access by
 // sorting the serials and seeking to each one sequentially.
-func (d *D) FetchEventsBySerials(serials []*types.Uint40) (evMap map[string]*event.E, err error) {
+func (d *D) FetchEventsBySerials(serials []*types.Uint40) (
+	evMap map[string]*event.E, err error,
+) {
 	log.T.F("FetchEventsBySerials: processing %d serials", len(serials))
 
 	// Initialize the result map
@@ -30,9 +32,11 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (evMap map[string]*eve
 	// Sort serials in ascending order for more efficient database access
 	sortedSerials := make([]*types.Uint40, len(serials))
 	copy(sortedSerials, serials)
-	sort.Slice(sortedSerials, func(i, j int) bool {
-		return sortedSerials[i].Get() < sortedSerials[j].Get()
-	})
+	sort.Slice(
+		sortedSerials, func(i, j int) bool {
+			return sortedSerials[i].Get() < sortedSerials[j].Get()
+		},
+	)
 
 	// Process all serials in a single transaction
 	if err = d.View(
@@ -60,20 +64,32 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (evMap map[string]*eve
 				continue
 			}
-			// Get the item value
-			var v []byte
-			if v, err = item.ValueCopy(nil); chk.E(err) {
-				continue
-			}
-
-			// Unmarshal the event
 			ev := new(event.E)
-			if err = ev.UnmarshalBinary(bytes.NewBuffer(v)); chk.E(err) {
+			if err = item.Value(
+				func(val []byte) (err error) {
+					// Unmarshal the event
+					if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) {
+						return
+					}
+					// Store the event in the result map using the serial value as string key
+					return
+				},
+			); chk.E(err) {
 				continue
 			}
-
-			// Store the event in the result map using the serial value as string key
 			evMap[strconv.FormatUint(ser.Get(), 10)] = ev
+			// // Get the item value
+			// var v []byte
+			// if v, err = item.ValueCopy(nil); chk.E(err) {
+			// 	continue
+			// }
+			//
+			// // Unmarshal the event
+			// ev := new(event.E)
+			// if err = ev.UnmarshalBinary(bytes.NewBuffer(v)); chk.E(err) {
+			// 	continue
+			// }
 			}
 		}
 		return
@@ -82,6 +98,9 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (evMap map[string]*eve
 		return
 	}
-	log.T.F("FetchEventsBySerials: found %d events out of %d requested serials", len(evMap), len(serials))
+	log.T.F(
+		"FetchEventsBySerials: found %d events out of %d requested serials",
+		len(evMap), len(serials),
+	)
 	return
 }
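
The change above swaps `item.ValueCopy(nil)` for `item.Value` with a closure, decoding each event in place instead of allocating a copy per lookup. The calls match Badger's transaction API; assuming that (github.com/dgraph-io/badger/v4 below), here is a small standalone sketch of the same pattern, with an illustrative directory, key, and payload. The important constraint is that `val` is only valid inside the closure, so anything that must outlive the call has to be decoded or copied there.

// Standalone sketch of reading a value in place with item.Value instead of
// copying it out with ValueCopy. Assumes a Badger store; the directory, key,
// and payload below are illustrative only.
package main

import (
	"fmt"
	"log"

	"github.com/dgraph-io/badger/v4"
)

func main() {
	db, err := badger.Open(badger.DefaultOptions("/tmp/badger-example"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	key := []byte("example-key")

	// Seed a value so the read below has something to find.
	if err := db.Update(func(txn *badger.Txn) error {
		return txn.Set(key, []byte("example-value"))
	}); err != nil {
		log.Fatal(err)
	}

	var decoded string
	if err := db.View(func(txn *badger.Txn) error {
		item, err := txn.Get(key)
		if err != nil {
			return err
		}
		// val is only valid inside this closure; string(val) copies the bytes,
		// which is what lets decoded be used after the transaction ends.
		return item.Value(func(val []byte) error {
			decoded = string(val)
			return nil
		})
	}); err != nil {
		log.Fatal(err)
	}
	fmt.Println("read back:", decoded)
}

Compared with `ValueCopy(nil)`, this saves an allocation and a copy per read, at the cost of keeping the decode step inside the value callback.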

View File

@@ -1 +1 @@
-v0.4.7
+v0.4.8