refactor index rescan and add fulltext and lang indexes

This commit is contained in:
2025-05-13 19:38:00 -01:06
parent 001ccb0d5b
commit 80162bea84
5 changed files with 146 additions and 51 deletions

View File

@@ -162,6 +162,40 @@ func (k *T) IsDirectoryEvent() bool {
return false
}
var Text = []*T{
ProfileMetadata,
TextNote,
Article,
Thread,
Reply,
Repost,
Issue,
Thread,
Reply,
WikiMergeRequest,
Wiki,
Issue,
IssueOpen,
IssueApplied,
IssueClosed,
IssueDraft,
Torrent,
TorrentComment,
DateBasedCalendarEvent,
TimeBasedCalendarEvent,
Calendar,
CalendarEventRSVP,
}
func (k *T) IsText() bool {
for i := range Text {
if k.Equal(Text[i]) {
return true
}
}
return false
}
var (
// ProfileMetadata is an event type that stores user profile data, pet
// names, bio, lightning address, etc.
@@ -188,6 +222,8 @@ var (
Reaction = &T{7}
// BadgeAward is an event type
BadgeAward = &T{8}
// Thread is an OP for another type of text note
Thread = &T{11}
// Seal is an event that wraps a PrivateDirectMessage and is placed inside a
// GiftWrap or GiftWrapWithKind4
Seal = &T{13}
@@ -212,6 +248,11 @@ var (
ChannelHideMessage = &T{43}
// ChannelMuteUser is an event type that...
ChannelMuteUser = &T{44}
// Reply is a non-OP type of text note that is used with Thread.
Reply = &T{111}
// WikiMergeRequest is a request to have another user merge their version of a wiki page
// into their version.
WikiMergeRequest = &T{818}
// Bid is an event type that...
Bid = &T{1021}
// BidConfirmation is an event type that...
@@ -226,16 +267,30 @@ var (
LiveChatMessage = &T{1311}
// BitcoinBlock is an event type created for the Nostrocket
BitcoinBlock = &T{1517}
// Issue is a conversation post for a git issue.
Issue = &T{1621}
// IssueOpen is an Issue comment that marks an issue open.
IssueOpen = &T{1630}
// IssueApplied is an Issue comment that marks status to applied/merged/resolved
IssueApplied = &T{1631}
// IssueClosed is an Issue comment that marks an Issue as closed
IssueClosed = &T{1632}
// IssueDraft is an Issue comment that marks an issue as a Draft.
IssueDraft = &T{1633}
// LiveStream from zap.stream
LiveStream = &T{1808}
// ProblemTracker is an event type used by Nostrocket
ProblemTracker = &T{1971}
// MemoryHole is an event type contains a report about an event (usually
// text note or other human readable)
// text note or other human-readable)
MemoryHole = &T{1984}
Reporting = &T{1984}
// Label is an event type has L and l tags, namespace and type - NIP-32
Label = &T{1985}
// Torrent is an announcement of a torrent.
Torrent = &T{2003}
// TorrentComment is a comment on a Torrent
TorrentComment = &T{2004}
// CommunityPostApproval is an event type that...
CommunityPostApproval = &T{4550}
JobRequestStart = &T{5000}
@@ -325,12 +380,14 @@ var (
UserStatuses = &T{30315}
ClassifiedListing = &T{30402}
DraftClassifiedListing = &T{30403}
DateBasedCalendarEvent = &T{31922}
TimeBasedCalendarEvent = &T{31923}
Calendar = &T{31924}
CalendarEventRSVP = &T{31925}
HandlerRecommendation = &T{31989}
HandlerInformation = &T{31990}
// Wiki is a page in a wiki
Wiki = &T{30818}
DateBasedCalendarEvent = &T{31922}
TimeBasedCalendarEvent = &T{31923}
Calendar = &T{31924}
CalendarEventRSVP = &T{31925}
HandlerRecommendation = &T{31989}
HandlerInformation = &T{31990}
// WaveLakeTrack which has no spec and uses malformed tags
WaveLakeTrack = &T{32123}
CommunityDefinition = &T{34550}

View File

@@ -13,7 +13,6 @@ import (
"realy.lol/chk"
"realy.lol/event"
"realy.lol/kind"
"realy.lol/log"
"realy.lol/ratel/keys/arb"
"realy.lol/ratel/keys/serial"
@@ -32,6 +31,8 @@ type Words struct {
func (r *T) FulltextIndex() (err error) {
r.WG.Add(1)
defer r.WG.Done()
r.IndexMx.Lock()
defer r.IndexMx.Unlock()
wordsChan := make(chan Words)
go func() {
for {
@@ -151,7 +152,7 @@ func (r *T) FulltextIndex() (err error) {
func (r *T) GetWordsFromContent(ev *event.T) (wordMap map[string]struct{}) {
wordMap = make(map[string]struct{})
if ev.Kind.OneOf(kind.TextNote, kind.Article) {
if ev.Kind.IsText() {
content := ev.Content
seg := words.NewSegmenter(content)
for seg.Next() {

View File

@@ -7,7 +7,6 @@ import (
"realy.lol/chk"
"realy.lol/event"
"realy.lol/kind"
"realy.lol/log"
"realy.lol/ratel/keys/lang"
"realy.lol/ratel/keys/serial"
@@ -25,6 +24,8 @@ type Langs struct {
func (r *T) LangIndex() (err error) {
r.WG.Add(1)
defer r.WG.Done()
r.IndexMx.Lock()
defer r.IndexMx.Unlock()
log.I.F("indexing language tags")
defer log.I.F("finished indexing language tags")
langChan := make(chan Langs)
@@ -124,7 +125,7 @@ func (r *T) LangIndex() (err error) {
}
func (r *T) GetLangTags(ev *event.T) (langs []string) {
if ev.Kind.OneOf(kind.TextNote, kind.Article) {
if ev.Kind.IsText() {
tgs := ev.Tags.GetAll(tag.New("l"))
tgsl := tgs.ToStringsSlice()
for _, v := range tgsl {

View File

@@ -45,6 +45,8 @@ type T struct {
// Binary sets whether to use a fast streaming binary codec for events, to change to this,
// events must be exported, the database nuked and the events re-imported.
Binary bool
// IndexMx is a lock to ensure that indexing doesn't happen concurrently.
IndexMx sync.Mutex
}
func (r *T) SetLogLevel(level string) {

View File

@@ -14,9 +14,53 @@ import (
"realy.lol/timestamp"
)
type Event struct {
ser *serial.T
ev *event.T
}
// Rescan regenerates all indexes of events to add new indexes in a new version.
func (r *T) Rescan() (err error) {
var evKeys [][]byte
r.WG.Add(1)
defer r.WG.Done()
evChan := make(chan *Event)
go func() {
var count int
for {
select {
case <-r.Ctx.Done():
log.I.F("completed rescanning %d events", count)
return
case e := <-evChan:
if e == nil {
log.I.F("completed rescanning %d events", count)
return
}
retry:
if err = r.Update(func(txn *badger.Txn) (err error) {
// rewrite the indexes
var indexKeys [][]byte
indexKeys = GetIndexKeysForEvent(e.ev, e.ser)
for _, k := range indexKeys {
var val []byte
if k[0] == prefixes.Counter.B() {
val = keys.Write(createdat.New(timestamp.Now()))
}
if err = txn.Set(k, val); chk.E(err) {
return
}
}
count++
if count%1000 == 0 {
log.I.F("rescanned %d events", count)
}
return
}); chk.E(err) {
goto retry
}
}
}
}()
err = r.View(func(txn *badger.Txn) (err error) {
prf := []byte{prefixes.Event.B()}
it := txn.NewIterator(badger.IteratorOptions{})
@@ -26,49 +70,39 @@ func (r *T) Rescan() (err error) {
if it.Item().ValueSize() == sha256.Size {
continue
}
evKeys = append(evKeys, item.KeyCopy(nil))
k := item.KeyCopy(nil)
ser := serial.New(k[1:])
var val []byte
if val, err = item.ValueCopy(nil); chk.E(err) {
continue
}
ev := &event.T{}
if _, err = r.Unmarshal(ev, val); chk.E(err) {
return
}
evChan <- &Event{ser: ser, ev: ev}
}
return
})
var i int
var key []byte
for i, key = range evKeys {
err = r.Update(func(txn *badger.Txn) (err error) {
it := txn.NewIterator(badger.IteratorOptions{})
defer it.Close()
it.Seek(key)
if it.Valid() {
item := it.Item()
var evB []byte
if evB, err = item.ValueCopy(nil); chk.E(err) {
return
}
ser := serial.FromKey(key)
ev := &event.T{}
if _, err = r.Unmarshal(ev, evB); chk.E(err) {
return
}
// add the indexes
var indexKeys [][]byte
indexKeys = GetIndexKeysForEvent(ev, ser)
// log.I.S(indexKeys)
for _, k := range indexKeys {
var val []byte
if k[0] == prefixes.Counter.B() {
val = keys.Write(createdat.New(timestamp.Now()))
}
if err = txn.Set(k, val); chk.E(err) {
return
}
}
if i%1000 == 0 {
log.I.F("rescanned %d events", i)
}
}
r.IndexMx.Lock()
if err = r.Update(func(txn *badger.Txn) (err error) {
// reset the last indexed for fulltext
lprf := prefixes.FulltextLastIndexed.Key()
if err = txn.Set(lprf, make([]byte, serial.Len)); chk.E(err) {
return
})
}
// reset the last indexed for lang
lprf = prefixes.LangLastIndexed.Key()
if err = txn.Set(lprf, make([]byte, serial.Len)); chk.E(err) {
return
}
return
}); chk.E(err) {
}
r.IndexMx.Unlock()
if err = r.FulltextIndex(); chk.E(err) {
}
if err = r.LangIndex(); chk.E(err) {
}
chk.E(err)
log.I.F("completed rescanning %d events", i)
return err
}