Fix broken submodule and add import memory optimization plan
- Remove broken submodule reference for pkg/protocol/blossom/blossom and track blossom spec files as regular files instead - Add IMPORT_MEMORY_OPTIMIZATION_PLAN.md documenting strategies to constrain import memory usage to ≤1.5GB through cache reduction, batched syncs, batch transactions, and adaptive rate limiting - Based on test results: 2.1M events imported in 48min at 736 events/sec with peak memory of 6.4GB (target is 1.5GB) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
||||
"os"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/log"
|
||||
@@ -20,6 +21,9 @@ const maxLen = 500000000
|
||||
|
||||
// ImportEventsFromReader imports events from an io.Reader containing JSONL data
|
||||
func (d *D) ImportEventsFromReader(ctx context.Context, rr io.Reader) error {
|
||||
startTime := time.Now()
|
||||
log.I.F("import: starting import operation")
|
||||
|
||||
// store to disk so we can return fast
|
||||
tmpPath := os.TempDir() + string(os.PathSeparator) + "orly"
|
||||
os.MkdirAll(tmpPath, 0700)
|
||||
@@ -29,15 +33,27 @@ func (d *D) ImportEventsFromReader(ctx context.Context, rr io.Reader) error {
|
||||
}
|
||||
defer os.Remove(tmp.Name()) // Clean up temp file when done
|
||||
|
||||
log.I.F("buffering upload to %s", tmp.Name())
|
||||
if _, err = io.Copy(tmp, rr); chk.E(err) {
|
||||
log.I.F("import: buffering upload to %s", tmp.Name())
|
||||
bufferStart := time.Now()
|
||||
bytesBuffered, err := io.Copy(tmp, rr)
|
||||
if chk.E(err) {
|
||||
return err
|
||||
}
|
||||
bufferElapsed := time.Since(bufferStart)
|
||||
log.I.F("import: buffered %.2f MB in %v (%.2f MB/sec)",
|
||||
float64(bytesBuffered)/1024/1024, bufferElapsed.Round(time.Millisecond),
|
||||
float64(bytesBuffered)/bufferElapsed.Seconds()/1024/1024)
|
||||
|
||||
if _, err = tmp.Seek(0, 0); chk.E(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
return d.processJSONLEvents(ctx, tmp)
|
||||
processErr := d.processJSONLEvents(ctx, tmp)
|
||||
|
||||
totalElapsed := time.Since(startTime)
|
||||
log.I.F("import: total operation time: %v", totalElapsed.Round(time.Millisecond))
|
||||
|
||||
return processErr
|
||||
}
|
||||
|
||||
// ImportEventsFromStrings imports events from a slice of JSON strings with policy filtering
|
||||
@@ -59,11 +75,16 @@ func (d *D) processJSONLEventsWithPolicy(ctx context.Context, rr io.Reader, poli
|
||||
scanBuf := make([]byte, maxLen)
|
||||
scan.Buffer(scanBuf, maxLen)
|
||||
|
||||
var count, total int
|
||||
// Performance tracking
|
||||
startTime := time.Now()
|
||||
lastLogTime := startTime
|
||||
const logInterval = 5 * time.Second
|
||||
|
||||
var count, total, skipped, policyRejected, unmarshalErrors, saveErrors int
|
||||
for scan.Scan() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.I.F("context closed")
|
||||
log.I.F("import: context closed after %d events", count)
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
@@ -71,6 +92,7 @@ func (d *D) processJSONLEventsWithPolicy(ctx context.Context, rr io.Reader, poli
|
||||
b := scan.Bytes()
|
||||
total += len(b) + 1
|
||||
if len(b) < 1 {
|
||||
skipped++
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -78,6 +100,7 @@ func (d *D) processJSONLEventsWithPolicy(ctx context.Context, rr io.Reader, poli
|
||||
if _, err := ev.Unmarshal(b); err != nil {
|
||||
// return the pooled buffer on error
|
||||
ev.Free()
|
||||
unmarshalErrors++
|
||||
log.W.F("failed to unmarshal event: %v", err)
|
||||
continue
|
||||
}
|
||||
@@ -90,11 +113,13 @@ func (d *D) processJSONLEventsWithPolicy(ctx context.Context, rr io.Reader, poli
|
||||
if policyErr != nil {
|
||||
log.W.F("policy check failed for event %x: %v", ev.ID, policyErr)
|
||||
ev.Free()
|
||||
policyRejected++
|
||||
continue
|
||||
}
|
||||
if !allowed {
|
||||
log.D.F("policy rejected event %x during sync import", ev.ID)
|
||||
ev.Free()
|
||||
policyRejected++
|
||||
continue
|
||||
}
|
||||
log.D.F("policy allowed event %x during sync import", ev.ID)
|
||||
@@ -103,6 +128,7 @@ func (d *D) processJSONLEventsWithPolicy(ctx context.Context, rr io.Reader, poli
|
||||
if _, err := d.SaveEvent(ctx, ev); err != nil {
|
||||
// return the pooled buffer on error paths too
|
||||
ev.Free()
|
||||
saveErrors++
|
||||
log.W.F("failed to save event: %v", err)
|
||||
continue
|
||||
}
|
||||
@@ -111,13 +137,30 @@ func (d *D) processJSONLEventsWithPolicy(ctx context.Context, rr io.Reader, poli
|
||||
ev.Free()
|
||||
b = nil
|
||||
count++
|
||||
if count%100 == 0 {
|
||||
log.I.F("processed %d events", count)
|
||||
|
||||
// Progress logging every logInterval
|
||||
if time.Since(lastLogTime) >= logInterval {
|
||||
elapsed := time.Since(startTime)
|
||||
eventsPerSec := float64(count) / elapsed.Seconds()
|
||||
mbPerSec := float64(total) / elapsed.Seconds() / 1024 / 1024
|
||||
log.I.F("import: progress %d events saved, %.2f MB read, %.0f events/sec, %.2f MB/sec",
|
||||
count, float64(total)/1024/1024, eventsPerSec, mbPerSec)
|
||||
lastLogTime = time.Now()
|
||||
debug.FreeOSMemory()
|
||||
}
|
||||
}
|
||||
|
||||
log.I.F("read %d bytes and saved %d events", total, count)
|
||||
// Final summary
|
||||
elapsed := time.Since(startTime)
|
||||
eventsPerSec := float64(count) / elapsed.Seconds()
|
||||
mbPerSec := float64(total) / elapsed.Seconds() / 1024 / 1024
|
||||
log.I.F("import: completed - %d events saved, %.2f MB in %v (%.0f events/sec, %.2f MB/sec)",
|
||||
count, float64(total)/1024/1024, elapsed.Round(time.Millisecond), eventsPerSec, mbPerSec)
|
||||
if unmarshalErrors > 0 || saveErrors > 0 || policyRejected > 0 || skipped > 0 {
|
||||
log.I.F("import: stats - %d unmarshal errors, %d save errors, %d policy rejected, %d skipped empty lines",
|
||||
unmarshalErrors, saveErrors, policyRejected, skipped)
|
||||
}
|
||||
|
||||
if err := scan.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user