Optimize e and p tags
This commit is contained in:
@@ -16,7 +16,7 @@ import (
|
||||
)
|
||||
|
||||
const (
	// currentVersion is the database schema version this build expects.
	// RunMigrations upgrades older stores step-by-step up to this value;
	// version 5 introduces the optimized binary encoding for e/p tags.
	currentVersion uint32 = 5
)
|
||||
|
||||
func (d *D) RunMigrations() {
|
||||
@@ -90,6 +90,13 @@ func (d *D) RunMigrations() {
|
||||
// bump to version 4
|
||||
_ = d.writeVersionTag(4)
|
||||
}
|
||||
if dbVersion < 5 {
|
||||
log.I.F("migrating to version 5...")
|
||||
// re-encode events with optimized tag binary format (e/p tags)
|
||||
d.ReencodeEventsWithOptimizedTags()
|
||||
// bump to version 5
|
||||
_ = d.writeVersionTag(5)
|
||||
}
|
||||
}
|
||||
|
||||
// writeVersionTag writes a new version tag key to the database (no value)
|
||||
@@ -537,3 +544,140 @@ func (d *D) ConvertSmallEventsToInline() {
|
||||
|
||||
log.I.F("migration complete: converted %d events to optimized inline storage, deleted %d old keys", convertedCount, deletedCount)
|
||||
}
|
||||
|
||||
// ReencodeEventsWithOptimizedTags re-encodes all events to use the new binary
|
||||
// tag format that stores e/p tag values as 33-byte binary (32-byte hash + null)
|
||||
// instead of 64-byte hex strings. This reduces memory usage by ~48% for these tags.
|
||||
func (d *D) ReencodeEventsWithOptimizedTags() {
|
||||
log.I.F("re-encoding events with optimized tag binary format...")
|
||||
var err error
|
||||
|
||||
type EventUpdate struct {
|
||||
Key []byte
|
||||
OldData []byte
|
||||
NewData []byte
|
||||
}
|
||||
|
||||
var updates []EventUpdate
|
||||
var processedCount int
|
||||
|
||||
// Helper to collect event updates from iterator
|
||||
// Only processes regular events (evt prefix) - inline storage already benefits
|
||||
collectUpdates := func(it *badger.Iterator, prefix []byte) error {
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
item := it.Item()
|
||||
key := item.KeyCopy(nil)
|
||||
|
||||
var val []byte
|
||||
if val, err = item.ValueCopy(nil); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Regular event storage - data is in value
|
||||
eventData := val
|
||||
if len(eventData) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Decode the event
|
||||
ev := new(event.E)
|
||||
if err = ev.UnmarshalBinary(bytes.NewBuffer(eventData)); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if this event has e or p tags that could benefit from optimization
|
||||
hasOptimizableTags := false
|
||||
if ev.Tags != nil && ev.Tags.Len() > 0 {
|
||||
for _, t := range *ev.Tags {
|
||||
if t.Len() >= 2 {
|
||||
key := t.Key()
|
||||
if len(key) == 1 && (key[0] == 'e' || key[0] == 'p') {
|
||||
hasOptimizableTags = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !hasOptimizableTags {
|
||||
continue
|
||||
}
|
||||
|
||||
// Re-encode the event (this will apply the new tag optimization)
|
||||
newData := ev.MarshalBinaryToBytes(nil)
|
||||
|
||||
// Only update if the data actually changed
|
||||
if !bytes.Equal(eventData, newData) {
|
||||
updates = append(updates, EventUpdate{
|
||||
Key: key,
|
||||
OldData: eventData,
|
||||
NewData: newData,
|
||||
})
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Only process regular "evt" prefix events (not inline storage)
|
||||
// Inline storage (sev, rev, aev) already benefits from the optimization
|
||||
// because the binary data is stored directly in the key
|
||||
prf := new(bytes.Buffer)
|
||||
if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) {
|
||||
return
|
||||
}
|
||||
evtPrefix := prf.Bytes()
|
||||
|
||||
// Collect updates from regular events only
|
||||
if err = d.View(func(txn *badger.Txn) error {
|
||||
it := txn.NewIterator(badger.IteratorOptions{Prefix: evtPrefix})
|
||||
defer it.Close()
|
||||
return collectUpdates(it, evtPrefix)
|
||||
}); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
log.I.F("found %d events with e/p tags to re-encode", len(updates))
|
||||
|
||||
if len(updates) == 0 {
|
||||
log.I.F("no events need re-encoding")
|
||||
return
|
||||
}
|
||||
|
||||
// Apply updates in batches
|
||||
const batchSize = 1000
|
||||
for i := 0; i < len(updates); i += batchSize {
|
||||
end := i + batchSize
|
||||
if end > len(updates) {
|
||||
end = len(updates)
|
||||
}
|
||||
batch := updates[i:end]
|
||||
|
||||
if err = d.Update(func(txn *badger.Txn) error {
|
||||
for _, upd := range batch {
|
||||
// Since we're only processing regular events (evt prefix),
|
||||
// we just update the value directly
|
||||
if err = txn.Set(upd.Key, upd.NewData); chk.E(err) {
|
||||
log.W.F("failed to update event: %v", err)
|
||||
continue
|
||||
}
|
||||
processedCount++
|
||||
}
|
||||
return nil
|
||||
}); chk.E(err) {
|
||||
log.W.F("batch update failed: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if (i/batchSize)%10 == 0 && i > 0 {
|
||||
log.I.F("progress: %d/%d events re-encoded", i, len(updates))
|
||||
}
|
||||
}
|
||||
|
||||
savedBytes := 0
|
||||
for _, upd := range updates {
|
||||
savedBytes += len(upd.OldData) - len(upd.NewData)
|
||||
}
|
||||
|
||||
log.I.F("migration complete: re-encoded %d events, saved approximately %d bytes (%.2f KB)",
|
||||
processedCount, savedBytes, float64(savedBytes)/1024.0)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user