Enhance SaveEvent logic to handle older event rejection with error reporting, validate timestamps in parameterized replaceable events, and improve HandleEvent error handling for blocked events.
This commit is contained in:
@@ -129,7 +129,17 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
|
|||||||
}
|
}
|
||||||
// store the event
|
// store the event
|
||||||
log.I.F("saving event %0x, %s", env.E.ID, env.E.Serialize())
|
log.I.F("saving event %0x, %s", env.E.ID, env.E.Serialize())
|
||||||
if _, _, err = l.SaveEvent(l.Ctx, env.E); chk.E(err) {
|
if _, _, err = l.SaveEvent(l.Ctx, env.E); err != nil {
|
||||||
|
if strings.HasPrefix(err.Error(), "blocked:") {
|
||||||
|
errStr := err.Error()[len("blocked: "):len(err.Error())]
|
||||||
|
if err = Ok.Error(
|
||||||
|
l, env, errStr,
|
||||||
|
); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
chk.E(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Send a success response storing
|
// Send a success response storing
|
||||||
|
|||||||
@@ -99,12 +99,13 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Don't save the older event
|
// Don't save the older event - return an error
|
||||||
|
err = errorf.E("blocked: event is older than existing replaceable event")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if kind.IsParameterizedReplaceable(ev.Kind) {
|
} else if kind.IsParameterizedReplaceable(ev.Kind) {
|
||||||
// find the events and delete them
|
// find the events and check timestamps before deleting
|
||||||
dTag := ev.Tags.GetFirst([]byte("d"))
|
dTag := ev.Tags.GetFirst([]byte("d"))
|
||||||
if dTag == nil {
|
if dTag == nil {
|
||||||
err = errorf.E("event is missing a d tag identifier")
|
err = errorf.E("event is missing a d tag identifier")
|
||||||
@@ -121,19 +122,41 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
|
|||||||
if sers, err = d.GetSerialsFromFilter(f); chk.E(err) {
|
if sers, err = d.GetSerialsFromFilter(f); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// if found, delete them
|
// if found, check timestamps before deleting
|
||||||
if len(sers) > 0 {
|
if len(sers) > 0 {
|
||||||
|
var shouldReplace bool = true
|
||||||
for _, s := range sers {
|
for _, s := range sers {
|
||||||
var oldEv *event.E
|
var oldEv *event.E
|
||||||
if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
|
if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err = d.DeleteEventBySerial(
|
// Only replace if the new event is newer or same timestamp
|
||||||
c, s, oldEv,
|
if ev.CreatedAt < oldEv.CreatedAt {
|
||||||
); chk.E(err) {
|
log.I.F("SaveEvent: rejecting older addressable event ID=%s (created_at=%d) - existing event ID=%s (created_at=%d)",
|
||||||
continue
|
hex.Enc(ev.ID), ev.CreatedAt, hex.Enc(oldEv.ID), oldEv.CreatedAt)
|
||||||
|
shouldReplace = false
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if shouldReplace {
|
||||||
|
for _, s := range sers {
|
||||||
|
var oldEv *event.E
|
||||||
|
if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.I.F("SaveEvent: replacing older addressable event ID=%s (created_at=%d) with newer event ID=%s (created_at=%d)",
|
||||||
|
hex.Enc(oldEv.ID), oldEv.CreatedAt, hex.Enc(ev.ID), ev.CreatedAt)
|
||||||
|
if err = d.DeleteEventBySerial(
|
||||||
|
c, s, oldEv,
|
||||||
|
); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Don't save the older event - return an error
|
||||||
|
err = errorf.E("blocked: event is older than existing addressable event")
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Get the next sequence number for the event
|
// Get the next sequence number for the event
|
||||||
|
|||||||
Reference in New Issue
Block a user