make replacement not delete, and filter query results respect limits
- Added detailed logging in GetSerialsByRange, CheckForDeleted, and SaveEvent functions to improve traceability during event processing. - Implemented safety limits in GetSerialsByRange to prevent infinite loops during debugging. - Updated event deletion logic to ensure only specific events are marked as deleted, improving clarity in event management. - Refactored WouldReplaceEvent to maintain compatibility while simplifying the return values. - Adjusted test cases to verify the correct behavior of replaced events and their deletion status.
This commit is contained in:
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/log"
|
||||
"next.orly.dev/pkg/database/indexes/types"
|
||||
)
|
||||
|
||||
@@ -28,14 +29,27 @@ func (d *D) GetSerialsByRange(idx Range) (
|
||||
for i := 0; i < 5; i++ {
|
||||
endBoundary = append(endBoundary, 0xff)
|
||||
}
|
||||
for it.Seek(endBoundary); it.Valid(); it.Next() {
|
||||
iterCount := 0
|
||||
it.Seek(endBoundary)
|
||||
log.T.F("GetSerialsByRange: iterator valid=%v, sought to endBoundary", it.Valid())
|
||||
for it.Valid() {
|
||||
iterCount++
|
||||
if iterCount > 100 {
|
||||
// Safety limit to prevent infinite loops in debugging
|
||||
log.T.F("GetSerialsByRange: hit safety limit of 100 iterations")
|
||||
break
|
||||
}
|
||||
item := it.Item()
|
||||
var key []byte
|
||||
key = item.Key()
|
||||
if bytes.Compare(
|
||||
key[:len(key)-5], idx.Start,
|
||||
) < 0 {
|
||||
keyWithoutSerial := key[:len(key)-5]
|
||||
cmp := bytes.Compare(keyWithoutSerial, idx.Start)
|
||||
log.T.F("GetSerialsByRange: iter %d, key prefix matches=%v, cmp=%d", iterCount, bytes.HasPrefix(key, idx.Start[:len(idx.Start)-8]), cmp)
|
||||
if cmp < 0 {
|
||||
// didn't find it within the timestamp range
|
||||
log.T.F("GetSerialsByRange: key out of range (cmp=%d), stopping iteration", cmp)
|
||||
log.T.F(" keyWithoutSerial len=%d: %x", len(keyWithoutSerial), keyWithoutSerial)
|
||||
log.T.F(" idx.Start len=%d: %x", len(idx.Start), idx.Start)
|
||||
return
|
||||
}
|
||||
ser := new(types.Uint40)
|
||||
@@ -44,7 +58,9 @@ func (d *D) GetSerialsByRange(idx Range) (
|
||||
return
|
||||
}
|
||||
sers = append(sers, ser)
|
||||
it.Next()
|
||||
}
|
||||
log.T.F("GetSerialsByRange: iteration complete, found %d serials", len(sers))
|
||||
return
|
||||
},
|
||||
); chk.E(err) {
|
||||
|
||||
@@ -99,7 +99,7 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete
|
||||
|
||||
// skip events that have been deleted by a proper deletion event
|
||||
if derr := d.CheckForDeleted(ev, nil); derr != nil {
|
||||
// log.T.F("QueryEvents: id=%s filtered out due to deletion: %v", idHex, derr)
|
||||
log.T.F("QueryEvents: id=%s filtered out due to deletion: %v", idHex, derr)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -114,6 +114,10 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete
|
||||
return evs[i].CreatedAt > evs[j].CreatedAt
|
||||
},
|
||||
)
|
||||
// Apply limit after processing
|
||||
if f.Limit != nil && len(evs) > int(*f.Limit) {
|
||||
evs = evs[:*f.Limit]
|
||||
}
|
||||
} else {
|
||||
// non-IDs path
|
||||
var idPkTs []*store.IdPkTs
|
||||
@@ -298,36 +302,8 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete
|
||||
}
|
||||
// Mark the specific event ID as deleted
|
||||
deletedEventIds[hex.Enc(targetEv.ID)] = true
|
||||
// If the event is replaceable, mark it as deleted, but only
|
||||
// for events older than this one
|
||||
if kind.IsReplaceable(targetEv.Kind) {
|
||||
key := hex.Enc(targetEv.Pubkey) + ":" + strconv.Itoa(int(targetEv.Kind))
|
||||
// We will still use deletionsByKindPubkey, but we'll
|
||||
// check timestamps in the second pass
|
||||
deletionsByKindPubkey[key] = true
|
||||
} else if kind.IsParameterizedReplaceable(targetEv.Kind) {
|
||||
// For parameterized replaceable events, we need to
|
||||
// consider the 'd' tag
|
||||
key := hex.Enc(targetEv.Pubkey) + ":" + strconv.Itoa(int(targetEv.Kind))
|
||||
|
||||
// Get the 'd' tag value
|
||||
dTag := targetEv.Tags.GetFirst([]byte("d"))
|
||||
var dValue string
|
||||
if dTag != nil && dTag.Len() > 1 {
|
||||
dValue = string(dTag.Value())
|
||||
} else {
|
||||
// If no 'd' tag, use empty string
|
||||
dValue = ""
|
||||
}
|
||||
// Initialize the inner map if it doesn't exist
|
||||
if _, exists := deletionsByKindPubkeyDTag[key]; !exists {
|
||||
deletionsByKindPubkeyDTag[key] = make(map[string]int64)
|
||||
}
|
||||
// Record the newest delete timestamp for this d-tag
|
||||
if ts, ok := deletionsByKindPubkeyDTag[key][dValue]; !ok || ev.CreatedAt > ts {
|
||||
deletionsByKindPubkeyDTag[key][dValue] = ev.CreatedAt
|
||||
}
|
||||
}
|
||||
// Note: For e-tag deletions, we only mark the specific event as deleted,
|
||||
// not all events of the same kind/pubkey
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -437,9 +413,8 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete
|
||||
}
|
||||
// Check if this specific event has been deleted
|
||||
eventIdHex := hex.Enc(ev.ID)
|
||||
if deletedEventIds[eventIdHex] && !isIdInFilter {
|
||||
// Skip this event if it has been specifically deleted and is
|
||||
// not in the filter
|
||||
if deletedEventIds[eventIdHex] {
|
||||
// Skip this event if it has been specifically deleted
|
||||
continue
|
||||
}
|
||||
if kind.IsReplaceable(ev.Kind) {
|
||||
@@ -524,6 +499,10 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete
|
||||
return evs[i].CreatedAt > evs[j].CreatedAt
|
||||
},
|
||||
)
|
||||
// Apply limit after processing replaceable/addressable events
|
||||
if f.Limit != nil && len(evs) > int(*f.Limit) {
|
||||
evs = evs[:*f.Limit]
|
||||
}
|
||||
// delete the expired events in a background thread
|
||||
go func() {
|
||||
for i, ser := range expDeletes {
|
||||
|
||||
@@ -239,18 +239,18 @@ func TestReplaceableEventsAndDeletion(t *testing.T) {
|
||||
t.Errorf("Failed to query for replaced event by ID: %v", err)
|
||||
}
|
||||
|
||||
// Verify the original event is not found (it was replaced)
|
||||
if len(evs) != 0 {
|
||||
t.Errorf("Expected 0 events when querying for replaced event by ID, got %d", len(evs))
|
||||
// Verify the original event is still found (it's kept but not returned in general queries)
|
||||
if len(evs) != 1 {
|
||||
t.Errorf("Expected 1 event when querying for replaced event by ID, got %d", len(evs))
|
||||
}
|
||||
|
||||
// // Verify it's the original event
|
||||
// if !utils.FastEqual(evs[0].ID, replaceableEvent.ID) {
|
||||
// t.Errorf(
|
||||
// "Event ID doesn't match when querying for replaced event. Got %x, expected %x",
|
||||
// evs[0].ID, replaceableEvent.ID,
|
||||
// )
|
||||
// }
|
||||
// Verify it's the original event
|
||||
if !utils.FastEqual(evs[0].ID, replaceableEvent.ID) {
|
||||
t.Errorf(
|
||||
"Event ID doesn't match when querying for replaced event. Got %x, expected %x",
|
||||
evs[0].ID, replaceableEvent.ID,
|
||||
)
|
||||
}
|
||||
|
||||
// Query for all events of this kind and pubkey
|
||||
kindFilter := kind.NewS(kind.ProfileMetadata)
|
||||
@@ -293,9 +293,10 @@ func TestReplaceableEventsAndDeletion(t *testing.T) {
|
||||
deletionEvent.Sign(sign)
|
||||
|
||||
// Add an e-tag referencing the replaceable event
|
||||
t.Logf("Replaceable event ID: %x", replaceableEvent.ID)
|
||||
*deletionEvent.Tags = append(
|
||||
*deletionEvent.Tags,
|
||||
tag.NewFromAny([]byte{'e'}, []byte(hex.Enc(replaceableEvent.ID))),
|
||||
tag.NewFromAny("e", hex.Enc(replaceableEvent.ID)),
|
||||
)
|
||||
|
||||
// Save the deletion event
|
||||
@@ -303,6 +304,15 @@ func TestReplaceableEventsAndDeletion(t *testing.T) {
|
||||
t.Fatalf("Failed to save deletion event: %v", err)
|
||||
}
|
||||
|
||||
// Debug: Check if the deletion event was saved
|
||||
t.Logf("Deletion event ID: %x", deletionEvent.ID)
|
||||
t.Logf("Deletion event pubkey: %x", deletionEvent.Pubkey)
|
||||
t.Logf("Deletion event kind: %d", deletionEvent.Kind)
|
||||
t.Logf("Deletion event tags count: %d", deletionEvent.Tags.Len())
|
||||
for i, tag := range *deletionEvent.Tags {
|
||||
t.Logf("Deletion event tag[%d]: %v", i, tag.T)
|
||||
}
|
||||
|
||||
// Query for all events of this kind and pubkey again
|
||||
evs, err = db.QueryEvents(
|
||||
ctx, &filter.F{
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/errorf"
|
||||
"lol.mleku.dev/log"
|
||||
"next.orly.dev/pkg/database/indexes/types"
|
||||
"next.orly.dev/pkg/encoders/event"
|
||||
"next.orly.dev/pkg/encoders/filter"
|
||||
@@ -20,6 +21,7 @@ import (
|
||||
// pubkeys that also may delete the event, normally only the author is allowed
|
||||
// to delete an event.
|
||||
func (d *D) CheckForDeleted(ev *event.E, admins [][]byte) (err error) {
|
||||
log.T.F("CheckForDeleted: checking event %x", ev.ID)
|
||||
keys := append([][]byte{ev.Pubkey}, admins...)
|
||||
authors := tag.NewFromBytesSlice(keys...)
|
||||
// if the event is addressable, check for a deletion event with the same
|
||||
@@ -184,26 +186,33 @@ func (d *D) CheckForDeleted(ev *event.E, admins [][]byte) (err error) {
|
||||
return
|
||||
}
|
||||
// otherwise we check for a delete by event id
|
||||
log.T.F("CheckForDeleted: checking for e-tag deletion of event %x", ev.ID)
|
||||
log.T.F("CheckForDeleted: authors filter: %v", authors)
|
||||
log.T.F("CheckForDeleted: looking for tag e with value: %s", hex.Enc(ev.ID))
|
||||
var idxs []Range
|
||||
if idxs, err = GetIndexesFromFilter(
|
||||
&filter.F{
|
||||
Authors: authors,
|
||||
Kinds: kind.NewS(kind.Deletion),
|
||||
Tags: tag.NewS(
|
||||
tag.NewFromAny("#e", hex.Enc(ev.ID)),
|
||||
tag.NewFromAny("e", hex.Enc(ev.ID)),
|
||||
),
|
||||
},
|
||||
); chk.E(err) {
|
||||
return
|
||||
}
|
||||
log.T.F("CheckForDeleted: found %d indexes", len(idxs))
|
||||
var sers types.Uint40s
|
||||
for _, idx := range idxs {
|
||||
for i, idx := range idxs {
|
||||
log.T.F("CheckForDeleted: checking index %d: %v", i, idx)
|
||||
var s types.Uint40s
|
||||
if s, err = d.GetSerialsByRange(idx); chk.E(err) {
|
||||
return
|
||||
}
|
||||
log.T.F("CheckForDeleted: index %d returned %d serials", i, len(s))
|
||||
if len(s) > 0 {
|
||||
// Any e-tag deletion found means the exact event was deleted and cannot be resubmitted
|
||||
log.T.F("CheckForDeleted: found e-tag deletion for event %x", ev.ID)
|
||||
err = errorf.E("blocked: %0x has been deleted", ev.ID)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"lol.mleku.dev/chk"
|
||||
"lol.mleku.dev/log"
|
||||
"next.orly.dev/pkg/database/indexes"
|
||||
"next.orly.dev/pkg/database/indexes/types"
|
||||
"next.orly.dev/pkg/encoders/event"
|
||||
@@ -44,10 +45,9 @@ func (d *D) GetSerialsFromFilter(f *filter.F) (
|
||||
|
||||
// WouldReplaceEvent checks if the provided event would replace existing events
|
||||
// based on Nostr's replaceable or parameterized replaceable semantics. It
|
||||
// returns true along with the serials of events that should be replaced if the
|
||||
// candidate is newer-or-equal. If an existing event is newer, it returns
|
||||
// (false, serials, ErrOlderThanExisting). If no conflicts exist, it returns
|
||||
// (false, nil, nil).
|
||||
// returns true if the candidate is newer-or-equal than existing events.
|
||||
// If an existing event is newer, it returns (false, nil, ErrOlderThanExisting).
|
||||
// If no conflicts exist, it returns (false, nil, nil).
|
||||
func (d *D) WouldReplaceEvent(ev *event.E) (bool, types.Uint40s, error) {
|
||||
// Only relevant for replaceable or parameterized replaceable kinds
|
||||
if !(kind.IsReplaceable(ev.Kind) || kind.IsParameterizedReplaceable(ev.Kind)) {
|
||||
@@ -96,9 +96,9 @@ func (d *D) WouldReplaceEvent(ev *event.E) (bool, types.Uint40s, error) {
|
||||
}
|
||||
}
|
||||
if shouldReplace {
|
||||
return true, sers, nil
|
||||
return true, nil, nil
|
||||
}
|
||||
return false, sers, ErrOlderThanExisting
|
||||
return false, nil, ErrOlderThanExisting
|
||||
}
|
||||
|
||||
// SaveEvent saves an event to the database, generating all the necessary indexes.
|
||||
@@ -135,11 +135,10 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
|
||||
err = fmt.Errorf("blocked: %s", err.Error())
|
||||
return
|
||||
}
|
||||
// check for replacement (separated check vs deletion)
|
||||
// check for replacement - only validate, don't delete old events
|
||||
if kind.IsReplaceable(ev.Kind) || kind.IsParameterizedReplaceable(ev.Kind) {
|
||||
var sers types.Uint40s
|
||||
var werr error
|
||||
if replaced, sers, werr = d.WouldReplaceEvent(ev); werr != nil {
|
||||
if replaced, _, werr = d.WouldReplaceEvent(ev); werr != nil {
|
||||
if errors.Is(werr, ErrOlderThanExisting) {
|
||||
if kind.IsReplaceable(ev.Kind) {
|
||||
err = errors.New("blocked: event is older than existing replaceable event")
|
||||
@@ -156,17 +155,7 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
|
||||
// any other error
|
||||
return
|
||||
}
|
||||
if replaced {
|
||||
for _, s := range sers {
|
||||
var oldEv *event.E
|
||||
if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
if err = d.DeleteEventBySerial(c, s, oldEv); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
// Note: replaced flag is kept for compatibility but old events are no longer deleted
|
||||
}
|
||||
// Get the next sequence number for the event
|
||||
var serial uint64
|
||||
@@ -178,6 +167,7 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (
|
||||
if idxs, err = GetIndexesForEvent(ev, serial); chk.E(err) {
|
||||
return
|
||||
}
|
||||
log.T.F("SaveEvent: generated %d indexes for event %x (kind %d)", len(idxs), ev.ID, ev.Kind)
|
||||
// Start a transaction to save the event and all its indexes
|
||||
err = d.Update(
|
||||
func(txn *badger.Txn) (err error) {
|
||||
|
||||
Reference in New Issue
Block a user