From 80162bea8415b7561b97ccf1769cea97bf09083e Mon Sep 17 00:00:00 2001 From: mleku Date: Tue, 13 May 2025 19:38:00 -0106 Subject: [PATCH] refactor index rescan and add fulltext and lang indexes --- kind/kind.go | 71 +++++++++++++++++++++++++--- ratel/fulltext.go | 5 +- ratel/langindex.go | 5 +- ratel/main.go | 2 + ratel/rescan.go | 114 +++++++++++++++++++++++++++++---------------- 5 files changed, 146 insertions(+), 51 deletions(-) diff --git a/kind/kind.go b/kind/kind.go index 165b72f..65e0286 100644 --- a/kind/kind.go +++ b/kind/kind.go @@ -162,6 +162,40 @@ func (k *T) IsDirectoryEvent() bool { return false } +var Text = []*T{ + ProfileMetadata, + TextNote, + Article, + Thread, + Reply, + Repost, + Issue, + Thread, + Reply, + WikiMergeRequest, + Wiki, + Issue, + IssueOpen, + IssueApplied, + IssueClosed, + IssueDraft, + Torrent, + TorrentComment, + DateBasedCalendarEvent, + TimeBasedCalendarEvent, + Calendar, + CalendarEventRSVP, +} + +func (k *T) IsText() bool { + for i := range Text { + if k.Equal(Text[i]) { + return true + } + } + return false +} + var ( // ProfileMetadata is an event type that stores user profile data, pet // names, bio, lightning address, etc. @@ -188,6 +222,8 @@ var ( Reaction = &T{7} // BadgeAward is an event type BadgeAward = &T{8} + // Thread is an OP for another type of text note + Thread = &T{11} // Seal is an event that wraps a PrivateDirectMessage and is placed inside a // GiftWrap or GiftWrapWithKind4 Seal = &T{13} @@ -212,6 +248,11 @@ var ( ChannelHideMessage = &T{43} // ChannelMuteUser is an event type that... ChannelMuteUser = &T{44} + // Reply is a non-OP type of text note that is used with Thread. + Reply = &T{111} + // WikiMergeRequest is a request to have another user merge their version of a wiki page + // into their version. + WikiMergeRequest = &T{818} // Bid is an event type that... Bid = &T{1021} // BidConfirmation is an event type that... @@ -226,16 +267,30 @@ var ( LiveChatMessage = &T{1311} // BitcoinBlock is an event type created for the Nostrocket BitcoinBlock = &T{1517} + // Issue is a conversation post for a git issue. + Issue = &T{1621} + // IssueOpen is an Issue comment that marks an issue open. + IssueOpen = &T{1630} + // IssueApplied is an Issue comment that marks status to applied/merged/resolved + IssueApplied = &T{1631} + // IssueClosed is an Issue comment that marks an Issue as closed + IssueClosed = &T{1632} + // IssueDraft is an Issue comment that marks an issue as a Draft. + IssueDraft = &T{1633} // LiveStream from zap.stream LiveStream = &T{1808} // ProblemTracker is an event type used by Nostrocket ProblemTracker = &T{1971} // MemoryHole is an event type contains a report about an event (usually - // text note or other human readable) + // text note or other human-readable) MemoryHole = &T{1984} Reporting = &T{1984} // Label is an event type has L and l tags, namespace and type - NIP-32 Label = &T{1985} + // Torrent is an announcement of a torrent. + Torrent = &T{2003} + // TorrentComment is a comment on a Torrent + TorrentComment = &T{2004} // CommunityPostApproval is an event type that... CommunityPostApproval = &T{4550} JobRequestStart = &T{5000} @@ -325,12 +380,14 @@ var ( UserStatuses = &T{30315} ClassifiedListing = &T{30402} DraftClassifiedListing = &T{30403} - DateBasedCalendarEvent = &T{31922} - TimeBasedCalendarEvent = &T{31923} - Calendar = &T{31924} - CalendarEventRSVP = &T{31925} - HandlerRecommendation = &T{31989} - HandlerInformation = &T{31990} + // Wiki is a page in a wiki + Wiki = &T{30818} + DateBasedCalendarEvent = &T{31922} + TimeBasedCalendarEvent = &T{31923} + Calendar = &T{31924} + CalendarEventRSVP = &T{31925} + HandlerRecommendation = &T{31989} + HandlerInformation = &T{31990} // WaveLakeTrack which has no spec and uses malformed tags WaveLakeTrack = &T{32123} CommunityDefinition = &T{34550} diff --git a/ratel/fulltext.go b/ratel/fulltext.go index d8622bd..c14f5ba 100644 --- a/ratel/fulltext.go +++ b/ratel/fulltext.go @@ -13,7 +13,6 @@ import ( "realy.lol/chk" "realy.lol/event" - "realy.lol/kind" "realy.lol/log" "realy.lol/ratel/keys/arb" "realy.lol/ratel/keys/serial" @@ -32,6 +31,8 @@ type Words struct { func (r *T) FulltextIndex() (err error) { r.WG.Add(1) defer r.WG.Done() + r.IndexMx.Lock() + defer r.IndexMx.Unlock() wordsChan := make(chan Words) go func() { for { @@ -151,7 +152,7 @@ func (r *T) FulltextIndex() (err error) { func (r *T) GetWordsFromContent(ev *event.T) (wordMap map[string]struct{}) { wordMap = make(map[string]struct{}) - if ev.Kind.OneOf(kind.TextNote, kind.Article) { + if ev.Kind.IsText() { content := ev.Content seg := words.NewSegmenter(content) for seg.Next() { diff --git a/ratel/langindex.go b/ratel/langindex.go index 3a76d32..346ab4b 100644 --- a/ratel/langindex.go +++ b/ratel/langindex.go @@ -7,7 +7,6 @@ import ( "realy.lol/chk" "realy.lol/event" - "realy.lol/kind" "realy.lol/log" "realy.lol/ratel/keys/lang" "realy.lol/ratel/keys/serial" @@ -25,6 +24,8 @@ type Langs struct { func (r *T) LangIndex() (err error) { r.WG.Add(1) defer r.WG.Done() + r.IndexMx.Lock() + defer r.IndexMx.Unlock() log.I.F("indexing language tags") defer log.I.F("finished indexing language tags") langChan := make(chan Langs) @@ -124,7 +125,7 @@ func (r *T) LangIndex() (err error) { } func (r *T) GetLangTags(ev *event.T) (langs []string) { - if ev.Kind.OneOf(kind.TextNote, kind.Article) { + if ev.Kind.IsText() { tgs := ev.Tags.GetAll(tag.New("l")) tgsl := tgs.ToStringsSlice() for _, v := range tgsl { diff --git a/ratel/main.go b/ratel/main.go index 4f3bd36..8e8629d 100644 --- a/ratel/main.go +++ b/ratel/main.go @@ -45,6 +45,8 @@ type T struct { // Binary sets whether to use a fast streaming binary codec for events, to change to this, // events must be exported, the database nuked and the events re-imported. Binary bool + // IndexMx is a lock to ensure that indexing doesn't happen concurrently. + IndexMx sync.Mutex } func (r *T) SetLogLevel(level string) { diff --git a/ratel/rescan.go b/ratel/rescan.go index 7e4e5bb..f208368 100644 --- a/ratel/rescan.go +++ b/ratel/rescan.go @@ -14,9 +14,53 @@ import ( "realy.lol/timestamp" ) +type Event struct { + ser *serial.T + ev *event.T +} + // Rescan regenerates all indexes of events to add new indexes in a new version. func (r *T) Rescan() (err error) { - var evKeys [][]byte + r.WG.Add(1) + defer r.WG.Done() + evChan := make(chan *Event) + go func() { + var count int + for { + select { + case <-r.Ctx.Done(): + log.I.F("completed rescanning %d events", count) + return + case e := <-evChan: + if e == nil { + log.I.F("completed rescanning %d events", count) + return + } + retry: + if err = r.Update(func(txn *badger.Txn) (err error) { + // rewrite the indexes + var indexKeys [][]byte + indexKeys = GetIndexKeysForEvent(e.ev, e.ser) + for _, k := range indexKeys { + var val []byte + if k[0] == prefixes.Counter.B() { + val = keys.Write(createdat.New(timestamp.Now())) + } + if err = txn.Set(k, val); chk.E(err) { + return + } + } + count++ + if count%1000 == 0 { + log.I.F("rescanned %d events", count) + } + return + }); chk.E(err) { + goto retry + } + } + } + }() err = r.View(func(txn *badger.Txn) (err error) { prf := []byte{prefixes.Event.B()} it := txn.NewIterator(badger.IteratorOptions{}) @@ -26,49 +70,39 @@ func (r *T) Rescan() (err error) { if it.Item().ValueSize() == sha256.Size { continue } - evKeys = append(evKeys, item.KeyCopy(nil)) + k := item.KeyCopy(nil) + ser := serial.New(k[1:]) + var val []byte + if val, err = item.ValueCopy(nil); chk.E(err) { + continue + } + ev := &event.T{} + if _, err = r.Unmarshal(ev, val); chk.E(err) { + return + } + evChan <- &Event{ser: ser, ev: ev} } return }) - var i int - var key []byte - for i, key = range evKeys { - err = r.Update(func(txn *badger.Txn) (err error) { - it := txn.NewIterator(badger.IteratorOptions{}) - defer it.Close() - it.Seek(key) - if it.Valid() { - item := it.Item() - var evB []byte - if evB, err = item.ValueCopy(nil); chk.E(err) { - return - } - ser := serial.FromKey(key) - ev := &event.T{} - if _, err = r.Unmarshal(ev, evB); chk.E(err) { - return - } - // add the indexes - var indexKeys [][]byte - indexKeys = GetIndexKeysForEvent(ev, ser) - // log.I.S(indexKeys) - for _, k := range indexKeys { - var val []byte - if k[0] == prefixes.Counter.B() { - val = keys.Write(createdat.New(timestamp.Now())) - } - if err = txn.Set(k, val); chk.E(err) { - return - } - } - if i%1000 == 0 { - log.I.F("rescanned %d events", i) - } - } + r.IndexMx.Lock() + if err = r.Update(func(txn *badger.Txn) (err error) { + // reset the last indexed for fulltext + lprf := prefixes.FulltextLastIndexed.Key() + if err = txn.Set(lprf, make([]byte, serial.Len)); chk.E(err) { return - }) + } + // reset the last indexed for lang + lprf = prefixes.LangLastIndexed.Key() + if err = txn.Set(lprf, make([]byte, serial.Len)); chk.E(err) { + return + } + return + }); chk.E(err) { + } + r.IndexMx.Unlock() + if err = r.FulltextIndex(); chk.E(err) { + } + if err = r.LangIndex(); chk.E(err) { } - chk.E(err) - log.I.F("completed rescanning %d events", i) return err }