Refactor SpiderFetch to use IdPkTs with Kind and optimize memory usage
This commit is contained in:
@@ -14,40 +14,32 @@ import (
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
// IdPkTs is a map of event IDs to their id, pubkey, and timestamp
|
||||
// IdPkTs holds the id, pubkey, kind, and timestamp of a single event.
|
||||
// It is used to reduce memory usage by storing only the essential information
|
||||
// instead of the full events. SpiderFetch keeps values of this type in a map.
|
||||
type IdPkTs struct {
|
||||
Id []byte // event id; SpiderFetch sets it from ev.Id
|
||||
Pubkey []byte // author public key; SpiderFetch sets it from ev.Pubkey
|
||||
Kind uint16 // event kind; SpiderFetch sets it from ev.Kind.ToU16()
|
||||
Timestamp int64 // SpiderFetch sets it from ev.CreatedAtInt64() and compares it to keep the newer event per pubkey+kind key
|
||||
}
|
||||
|
||||
func (s *Server) SpiderFetch(
|
||||
k *kinds.T, noFetch, noExtract bool, pubkeys ...[]byte,
|
||||
) (pks [][]byte, err error) {
|
||||
// Map to store id, pubkey, kind, and timestamp for each event
|
||||
// Key is a combination of pubkey and kind for deduplication
|
||||
pkKindMap := make(map[string]*IdPkTs)
|
||||
// Map to collect pubkeys from p tags
|
||||
pkMap := make(map[string]struct{})
|
||||
|
||||
// first search the local database
|
||||
pkList := tag.New(pubkeys...)
|
||||
f := &filter.F{
|
||||
Kinds: k,
|
||||
Authors: pkList,
|
||||
}
|
||||
var evs event.S
|
||||
// Map to store id, pubkey, and timestamp for each event
|
||||
idPkTsMap := make(map[string]*IdPkTs)
|
||||
if evs, err = s.Storage().QueryEvents(s.Ctx, f); chk.E(err) {
|
||||
// none were found, so we need to scan the spiders
|
||||
err = nil
|
||||
}
|
||||
// Extract id, pubkey, and timestamp from initial events
|
||||
for _, ev := range evs {
|
||||
idStr := ev.IdString()
|
||||
idPkTsMap[idStr] = &IdPkTs{
|
||||
Id: ev.Id,
|
||||
Pubkey: ev.Pubkey,
|
||||
Timestamp: ev.CreatedAtInt64(),
|
||||
}
|
||||
}
|
||||
|
||||
var kindsList string
|
||||
for i, kk := range k.K {
|
||||
if i > 0 {
|
||||
@@ -55,7 +47,55 @@ func (s *Server) SpiderFetch(
|
||||
}
|
||||
kindsList += kk.Name()
|
||||
}
|
||||
log.I.F("%d events found of type %s", len(evs), kindsList)
|
||||
|
||||
// Query local database
|
||||
var localEvents event.S
|
||||
if localEvents, err = s.Storage().QueryEvents(s.Ctx, f); chk.E(err) {
|
||||
// none were found, so we need to scan the spiders
|
||||
err = nil
|
||||
}
|
||||
|
||||
// Process local events
|
||||
for _, ev := range localEvents {
|
||||
// Create a key based on pubkey and kind for deduplication
|
||||
pkKindKey := string(ev.Pubkey) + string(ev.Kind.Marshal(nil))
|
||||
|
||||
// Check if we already have an event with this pubkey and kind
|
||||
existing, exists := pkKindMap[pkKindKey]
|
||||
|
||||
// If it doesn't exist or the new event is newer, store it
|
||||
if !exists || ev.CreatedAtInt64() > existing.Timestamp {
|
||||
pkKindMap[pkKindKey] = &IdPkTs{
|
||||
Id: ev.Id,
|
||||
Pubkey: ev.Pubkey,
|
||||
Kind: ev.Kind.ToU16(),
|
||||
Timestamp: ev.CreatedAtInt64(),
|
||||
}
|
||||
|
||||
// Extract p tags if not in noExtract mode
|
||||
if !noExtract {
|
||||
t := ev.Tags.GetAll(tag.New("p"))
|
||||
for _, tt := range t.ToSliceOfTags() {
|
||||
pkh := tt.Value()
|
||||
if len(pkh) != 2*schnorr.PubKeyBytesLen {
|
||||
continue
|
||||
}
|
||||
pk := make([]byte, schnorr.PubKeyBytesLen)
|
||||
if _, err = hex.DecBytes(pk, pkh); err != nil {
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
pkMap[string(pk)] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Nil the event to free memory
|
||||
ev = nil
|
||||
}
|
||||
|
||||
log.I.F("%d events found of type %s", len(pkKindMap), kindsList)
|
||||
|
||||
if !noFetch {
|
||||
// we need to search the spider seeds.
|
||||
// Break up pubkeys into batches of 128
|
||||
@@ -97,88 +137,72 @@ func (s *Server) SpiderFetch(
|
||||
err = nil
|
||||
return
|
||||
}
|
||||
// save the events to the database and extract id, pubkey, and timestamp
|
||||
for i, ev := range evss {
|
||||
log.I.F("saving event:\n%s", ev.Marshal(nil))
|
||||
if _, _, err = s.Storage().SaveEvent(
|
||||
s.Ctx, ev,
|
||||
); chk.E(err) {
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
|
||||
// Extract id, pubkey, and timestamp
|
||||
idStr := ev.IdString()
|
||||
idPkTsMap[idStr] = &IdPkTs{
|
||||
Id: ev.Id,
|
||||
Pubkey: ev.Pubkey,
|
||||
Timestamp: ev.CreatedAtInt64(),
|
||||
// Process each event immediately
|
||||
for i, ev := range evss {
|
||||
// Create a key based on pubkey and kind for deduplication
|
||||
pkKindKey := string(ev.Pubkey) + string(ev.Kind.Marshal(nil))
|
||||
|
||||
// Check if we already have an event with this pubkey and kind
|
||||
existing, exists := pkKindMap[pkKindKey]
|
||||
|
||||
// If it doesn't exist or the new event is newer, store it and save to database
|
||||
if !exists || ev.CreatedAtInt64() > existing.Timestamp {
|
||||
// Save the event to the database
|
||||
log.I.F("saving event:\n%s", ev.Marshal(nil))
|
||||
if _, _, err = s.Storage().SaveEvent(
|
||||
s.Ctx, ev,
|
||||
); chk.E(err) {
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
|
||||
// Store the essential information
|
||||
pkKindMap[pkKindKey] = &IdPkTs{
|
||||
Id: ev.Id,
|
||||
Pubkey: ev.Pubkey,
|
||||
Kind: ev.Kind.ToU16(),
|
||||
Timestamp: ev.CreatedAtInt64(),
|
||||
}
|
||||
|
||||
// Extract p tags if not in noExtract mode
|
||||
if !noExtract {
|
||||
t := ev.Tags.GetAll(tag.New("p"))
|
||||
for _, tt := range t.ToSliceOfTags() {
|
||||
pkh := tt.Value()
|
||||
if len(pkh) != 2*schnorr.PubKeyBytesLen {
|
||||
continue
|
||||
}
|
||||
pk := make([]byte, schnorr.PubKeyBytesLen)
|
||||
if _, err = hex.DecBytes(pk, pkh); err != nil {
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
pkMap[string(pk)] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Append the event to evs for further processing
|
||||
evs = append(evs, ev)
|
||||
|
||||
// Nil the event in the slice to free memory
|
||||
evss[i] = nil
|
||||
}
|
||||
|
||||
chk.E(s.Storage().Sync())
|
||||
debug.FreeOSMemory()
|
||||
}
|
||||
}
|
||||
}
|
||||
// deduplicate and take the newest
|
||||
// We need to query the database for the events we need to extract p tags from
|
||||
// since the in-memory events have already been set to nil
|
||||
|
||||
// If we're in noExtract mode, just return
|
||||
if noExtract {
|
||||
return
|
||||
}
|
||||
|
||||
// Create a list of event IDs to query
|
||||
var eventIds [][]byte
|
||||
for _, idPkTs := range idPkTsMap {
|
||||
eventIds = append(eventIds, idPkTs.Id)
|
||||
}
|
||||
|
||||
// Query the database for the events
|
||||
var eventsForExtraction event.S
|
||||
if len(eventIds) > 0 {
|
||||
// Create a filter for the event IDs
|
||||
idFilter := &filter.F{
|
||||
Ids: tag.New(eventIds...),
|
||||
}
|
||||
|
||||
// Query the database
|
||||
if eventsForExtraction, err = s.Storage().QueryEvents(
|
||||
s.Ctx, idFilter,
|
||||
); chk.E(err) {
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Extract the p tags
|
||||
pkMap := make(map[string]struct{})
|
||||
for _, ev := range eventsForExtraction {
|
||||
t := ev.Tags.GetAll(tag.New("p"))
|
||||
for _, tt := range t.ToSliceOfTags() {
|
||||
pkh := tt.Value()
|
||||
if len(pkh) != 2*schnorr.PubKeyBytesLen {
|
||||
continue
|
||||
}
|
||||
pk := make([]byte, schnorr.PubKeyBytesLen)
|
||||
if _, err = hex.DecBytes(pk, pkh); err != nil {
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
pkMap[string(pk)] = struct{}{}
|
||||
}
|
||||
|
||||
// Nil the event after extraction to free memory
|
||||
ev = nil
|
||||
}
|
||||
chk.E(s.Storage().Sync())
|
||||
debug.FreeOSMemory()
|
||||
// Convert the collected pubkeys to the return format
|
||||
for pk := range pkMap {
|
||||
pks = append(pks, []byte(pk))
|
||||
}
|
||||
|
||||
log.I.F("found %d pks", len(pks))
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user