- Introduced test files for the blossom and database packages to improve test coverage and ensure functionality. - Updated logging practices by suppressing unnecessary log outputs during tests to enhance clarity and focus on relevant information. - Refactored error handling in the `handle-message` and `handle-req` functions to avoid logging expected context cancellation errors during shutdown. - Bumped version to v0.25.2 to reflect these updates.
326 lines
7.9 KiB
Go
326 lines
7.9 KiB
Go
package database
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"sort"
|
|
|
|
"github.com/dgraph-io/badger/v4"
|
|
"lol.mleku.dev/chk"
|
|
"lol.mleku.dev/log"
|
|
"next.orly.dev/pkg/database/indexes"
|
|
"next.orly.dev/pkg/database/indexes/types"
|
|
"next.orly.dev/pkg/encoders/event"
|
|
"next.orly.dev/pkg/encoders/ints"
|
|
)
|
|
|
|
// currentVersion is the newest database schema version known to this file; it
// equals the highest migration step applied by RunMigrations.
const currentVersion uint32 = 3
|
|
|
|
func (d *D) RunMigrations() {
|
|
var err error
|
|
var dbVersion uint32
|
|
// first find the current version tag if any
|
|
if err = d.View(
|
|
func(txn *badger.Txn) (err error) {
|
|
buf := new(bytes.Buffer)
|
|
if err = indexes.VersionEnc(nil).MarshalWrite(buf); chk.E(err) {
|
|
return
|
|
}
|
|
verPrf := new(bytes.Buffer)
|
|
if _, err = indexes.VersionPrefix.Write(verPrf); chk.E(err) {
|
|
return
|
|
}
|
|
it := txn.NewIterator(
|
|
badger.IteratorOptions{
|
|
Prefix: verPrf.Bytes(),
|
|
},
|
|
)
|
|
defer it.Close()
|
|
ver := indexes.VersionVars()
|
|
for it.Rewind(); it.Valid(); it.Next() {
|
|
// there should only be one
|
|
item := it.Item()
|
|
key := item.Key()
|
|
if err = indexes.VersionDec(ver).UnmarshalRead(
|
|
bytes.NewBuffer(key),
|
|
); chk.E(err) {
|
|
return
|
|
}
|
|
log.I.F("found version tag: %d", ver.Get())
|
|
dbVersion = ver.Get()
|
|
}
|
|
return
|
|
},
|
|
); chk.E(err) {
|
|
}
|
|
if dbVersion == 0 {
|
|
// write the version tag now (ensure any old tags are removed first)
|
|
if err = d.writeVersionTag(currentVersion); chk.E(err) {
|
|
return
|
|
}
|
|
}
|
|
if dbVersion < 1 {
|
|
log.I.F("migrating to version 1...")
|
|
// the first migration is expiration tags
|
|
d.UpdateExpirationTags()
|
|
// bump to version 1
|
|
_ = d.writeVersionTag(1)
|
|
}
|
|
if dbVersion < 2 {
|
|
log.I.F("migrating to version 2...")
|
|
// backfill word indexes
|
|
d.UpdateWordIndexes()
|
|
// bump to version 2
|
|
_ = d.writeVersionTag(2)
|
|
}
|
|
if dbVersion < 3 {
|
|
log.I.F("migrating to version 3...")
|
|
// clean up ephemeral events that should never have been stored
|
|
d.CleanupEphemeralEvents()
|
|
// bump to version 3
|
|
_ = d.writeVersionTag(3)
|
|
}
|
|
}
|
|
|
|
// writeVersionTag writes a new version tag key to the database (no value)
|
|
func (d *D) writeVersionTag(ver uint32) (err error) {
|
|
return d.Update(
|
|
func(txn *badger.Txn) (err error) {
|
|
// delete any existing version keys first (there should only be one, but be safe)
|
|
verPrf := new(bytes.Buffer)
|
|
if _, err = indexes.VersionPrefix.Write(verPrf); chk.E(err) {
|
|
return
|
|
}
|
|
it := txn.NewIterator(badger.IteratorOptions{Prefix: verPrf.Bytes()})
|
|
defer it.Close()
|
|
for it.Rewind(); it.Valid(); it.Next() {
|
|
item := it.Item()
|
|
key := item.KeyCopy(nil)
|
|
if err = txn.Delete(key); chk.E(err) {
|
|
return
|
|
}
|
|
}
|
|
|
|
// now write the new version key
|
|
buf := new(bytes.Buffer)
|
|
vv := new(types.Uint32)
|
|
vv.Set(ver)
|
|
if err = indexes.VersionEnc(vv).MarshalWrite(buf); chk.E(err) {
|
|
return
|
|
}
|
|
return txn.Set(buf.Bytes(), nil)
|
|
},
|
|
)
|
|
}
|
|
|
|
func (d *D) UpdateWordIndexes() {
|
|
var err error
|
|
var wordIndexes [][]byte
|
|
// iterate all events and generate word index keys from content and tags
|
|
if err = d.View(
|
|
func(txn *badger.Txn) (err error) {
|
|
prf := new(bytes.Buffer)
|
|
if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) {
|
|
return
|
|
}
|
|
it := txn.NewIterator(badger.IteratorOptions{Prefix: prf.Bytes()})
|
|
defer it.Close()
|
|
for it.Rewind(); it.Valid(); it.Next() {
|
|
item := it.Item()
|
|
var val []byte
|
|
if val, err = item.ValueCopy(nil); chk.E(err) {
|
|
continue
|
|
}
|
|
// decode the event
|
|
ev := new(event.E)
|
|
if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) {
|
|
continue
|
|
}
|
|
// log.I.F("updating word indexes for event: %s", ev.Serialize())
|
|
// read serial from key
|
|
key := item.Key()
|
|
ser := indexes.EventVars()
|
|
if err = indexes.EventDec(ser).UnmarshalRead(bytes.NewBuffer(key)); chk.E(err) {
|
|
continue
|
|
}
|
|
// collect unique word hashes for this event
|
|
seen := make(map[string]struct{})
|
|
// from content
|
|
if len(ev.Content) > 0 {
|
|
for _, h := range TokenHashes(ev.Content) {
|
|
seen[string(h)] = struct{}{}
|
|
}
|
|
}
|
|
// from all tag fields (key and values)
|
|
if ev.Tags != nil && ev.Tags.Len() > 0 {
|
|
for _, t := range *ev.Tags {
|
|
for _, field := range t.T {
|
|
if len(field) == 0 {
|
|
continue
|
|
}
|
|
for _, h := range TokenHashes(field) {
|
|
seen[string(h)] = struct{}{}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
// build keys
|
|
for k := range seen {
|
|
w := new(types.Word)
|
|
w.FromWord([]byte(k))
|
|
buf := new(bytes.Buffer)
|
|
if err = indexes.WordEnc(
|
|
w, ser,
|
|
).MarshalWrite(buf); chk.E(err) {
|
|
continue
|
|
}
|
|
wordIndexes = append(wordIndexes, buf.Bytes())
|
|
}
|
|
}
|
|
return
|
|
},
|
|
); chk.E(err) {
|
|
return
|
|
}
|
|
// sort the indexes for ordered writes
|
|
sort.Slice(
|
|
wordIndexes, func(i, j int) bool {
|
|
return bytes.Compare(
|
|
wordIndexes[i], wordIndexes[j],
|
|
) < 0
|
|
},
|
|
)
|
|
// write in a batch
|
|
batch := d.NewWriteBatch()
|
|
for _, v := range wordIndexes {
|
|
if err = batch.Set(v, nil); chk.E(err) {
|
|
continue
|
|
}
|
|
}
|
|
_ = batch.Flush()
|
|
}
|
|
|
|
func (d *D) UpdateExpirationTags() {
|
|
var err error
|
|
var expIndexes [][]byte
|
|
// iterate all event records and decode and look for version tags
|
|
if err = d.View(
|
|
func(txn *badger.Txn) (err error) {
|
|
prf := new(bytes.Buffer)
|
|
if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) {
|
|
return
|
|
}
|
|
it := txn.NewIterator(badger.IteratorOptions{Prefix: prf.Bytes()})
|
|
defer it.Close()
|
|
for it.Rewind(); it.Valid(); it.Next() {
|
|
item := it.Item()
|
|
var val []byte
|
|
if val, err = item.ValueCopy(nil); chk.E(err) {
|
|
continue
|
|
}
|
|
// decode the event
|
|
ev := new(event.E)
|
|
if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) {
|
|
continue
|
|
}
|
|
expTag := ev.Tags.GetFirst([]byte("expiration"))
|
|
if expTag == nil {
|
|
continue
|
|
}
|
|
expTS := ints.New(0)
|
|
if _, err = expTS.Unmarshal(expTag.Value()); chk.E(err) {
|
|
continue
|
|
}
|
|
key := item.Key()
|
|
ser := indexes.EventVars()
|
|
if err = indexes.EventDec(ser).UnmarshalRead(
|
|
bytes.NewBuffer(key),
|
|
); chk.E(err) {
|
|
continue
|
|
}
|
|
// create the expiration tag
|
|
exp, _ := indexes.ExpirationVars()
|
|
exp.Set(expTS.N)
|
|
expBuf := new(bytes.Buffer)
|
|
if err = indexes.ExpirationEnc(
|
|
exp, ser,
|
|
).MarshalWrite(expBuf); chk.E(err) {
|
|
continue
|
|
}
|
|
expIndexes = append(expIndexes, expBuf.Bytes())
|
|
}
|
|
return
|
|
},
|
|
); chk.E(err) {
|
|
return
|
|
}
|
|
// sort the indexes first so they're written in order, improving compaction
|
|
// and iteration.
|
|
sort.Slice(
|
|
expIndexes, func(i, j int) bool {
|
|
return bytes.Compare(expIndexes[i], expIndexes[j]) < 0
|
|
},
|
|
)
|
|
// write the collected indexes
|
|
batch := d.NewWriteBatch()
|
|
for _, v := range expIndexes {
|
|
if err = batch.Set(v, nil); chk.E(err) {
|
|
continue
|
|
}
|
|
}
|
|
if err = batch.Flush(); chk.E(err) {
|
|
return
|
|
}
|
|
}
|
|
|
|
func (d *D) CleanupEphemeralEvents() {
|
|
log.I.F("cleaning up ephemeral events (kinds 20000-29999)...")
|
|
var err error
|
|
var ephemeralEvents [][]byte
|
|
|
|
// iterate all event records and find ephemeral events
|
|
if err = d.View(
|
|
func(txn *badger.Txn) (err error) {
|
|
prf := new(bytes.Buffer)
|
|
if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) {
|
|
return
|
|
}
|
|
it := txn.NewIterator(badger.IteratorOptions{Prefix: prf.Bytes()})
|
|
defer it.Close()
|
|
for it.Rewind(); it.Valid(); it.Next() {
|
|
item := it.Item()
|
|
var val []byte
|
|
if val, err = item.ValueCopy(nil); chk.E(err) {
|
|
continue
|
|
}
|
|
// decode the event
|
|
ev := new(event.E)
|
|
if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) {
|
|
continue
|
|
}
|
|
// check if it's an ephemeral event (kinds 20000-29999)
|
|
if ev.Kind >= 20000 && ev.Kind <= 29999 {
|
|
ephemeralEvents = append(ephemeralEvents, ev.ID)
|
|
}
|
|
}
|
|
return
|
|
},
|
|
); chk.E(err) {
|
|
return
|
|
}
|
|
|
|
// delete all found ephemeral events
|
|
deletedCount := 0
|
|
for _, eventId := range ephemeralEvents {
|
|
if err = d.DeleteEvent(context.Background(), eventId); chk.E(err) {
|
|
log.W.F("failed to delete ephemeral event %x: %v", eventId, err)
|
|
continue
|
|
}
|
|
deletedCount++
|
|
}
|
|
|
|
log.I.F("cleaned up %d ephemeral events from database", deletedCount)
|
|
}
|