Update event handling and replaceable event logic

- **socketapi/handleEvent.go**: Add conditional check to wrap `publish.P.Deliver` in a goroutine, ensuring asynchronous delivery only when the event is valid.
- **database/query-events.go**: Introduce logic to handle replaceable events by checking and processing duplicates based on `Pubkey` and kind before appending.
- **database/save-event.go**: Implement filtering for replaceable events, query for previous events of the same kind, and log replacements for improved traceability.
This commit is contained in:
2025-07-11 13:55:28 +01:00
parent 4135f1fd48
commit b5a4a11bcd
4 changed files with 46 additions and 7 deletions

10
.idea/workspace.xml generated
View File

@@ -18,9 +18,11 @@
<option name="autoReloadType" value="ALL" />
</component>
<component name="ChangeListManager">
<list default="true" id="848aca24-a3ec-4e50-a3d5-7b132d168000" name="Changes" comment="Update socket API to improve connection handling and subscription delivery&#10;&#10;- **socketapi/socketapi.go**: Remove redundant defer logic for connection close and logging; streamline the cleanup process after connection termination.&#10;- **socketapi/publisher.go**: Add detailed subscription event logs to track data sent to subscribers.">
<list default="true" id="848aca24-a3ec-4e50-a3d5-7b132d168000" name="Changes" comment="Update error handling in `socketapi/handleReq.go`&#10;&#10;- **socketapi/handleReq.go**: Replace `return` statements with `continue` in error conditions to ensure subsequent events are processed without interruption.">
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/socketapi/handleReq.go" beforeDir="false" afterPath="$PROJECT_DIR$/socketapi/handleReq.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/database/query-events.go" beforeDir="false" afterPath="$PROJECT_DIR$/database/query-events.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/database/save-event.go" beforeDir="false" afterPath="$PROJECT_DIR$/database/save-event.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/socketapi/handleEvent.go" beforeDir="false" afterPath="$PROJECT_DIR$/socketapi/handleEvent.go" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
@@ -260,7 +262,6 @@
<component name="VcsManagerConfiguration">
<option name="CHECK_CODE_SMELLS_BEFORE_PROJECT_COMMIT" value="false" />
<option name="CHECK_NEW_TODO" value="false" />
<MESSAGE value="refactor `GetIndexesFromFilter` and related index encodings to simplify serialization logic; add comprehensive event index tests" />
<MESSAGE value="change filter indexes to be pairs with created_at ranges" />
<MESSAGE value="partly implemented query for ids" />
<MESSAGE value="log event processing duration and throughput in save-event test" />
@@ -285,7 +286,8 @@
<MESSAGE value="Refactor socket API event handling and subscription delivery&#10;&#10;- **socketapi/handleEvent.go**: Add `publish` package for event delivery and replace redundant logging with `publish.P.Deliver` for streamlined event handling.&#10;- **socketapi/handleReq.go**: Simplify logging in `HandleReq` by removing unnecessary newline formatting.&#10;- **socketapi/socketapi.go**: Add context logging for improved debugging when context cancels.&#10;- **socketapi/publisher.go**: Add logs for subscription additions; commented out redundant lock handling in `Deliver` for concurrency refinement." />
<MESSAGE value="Update `socketapi` concurrency and logging improvements&#10;&#10;- **socketapi/socketapi.go**: Clean up unused defer logic for connection handling, removing redundant concurrency and logging code.&#10;- **socketapi/publisher.go**: Add detailed logs for subscription event deliveries to improve traceability during runtime." />
<MESSAGE value="Update socket API to improve connection handling and subscription delivery&#10;&#10;- **socketapi/socketapi.go**: Remove redundant defer logic for connection close and logging; streamline the cleanup process after connection termination.&#10;- **socketapi/publisher.go**: Add detailed subscription event logs to track data sent to subscribers." />
<option name="LAST_COMMIT_MESSAGE" value="Update socket API to improve connection handling and subscription delivery&#10;&#10;- **socketapi/socketapi.go**: Remove redundant defer logic for connection close and logging; streamline the cleanup process after connection termination.&#10;- **socketapi/publisher.go**: Add detailed subscription event logs to track data sent to subscribers." />
<MESSAGE value="Update error handling in `socketapi/handleReq.go`&#10;&#10;- **socketapi/handleReq.go**: Replace `return` statements with `continue` in error conditions to ensure subsequent events are processed without interruption." />
<option name="LAST_COMMIT_MESSAGE" value="Update error handling in `socketapi/handleReq.go`&#10;&#10;- **socketapi/handleReq.go**: Replace `return` statements with `continue` in error conditions to ensure subsequent events are processed without interruption." />
<option name="NON_MODAL_COMMIT_POSTPONE_SLOW_CHECKS" value="false" />
</component>
<component name="VgoProject">

View File

@@ -1,6 +1,7 @@
package database
import (
"bytes"
"orly.dev/chk"
"orly.dev/context"
"orly.dev/database/indexes/types"
@@ -56,8 +57,19 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
if ev, err = d.FetchEventBySerial(ser); err != nil {
continue
}
// we already know these are in correct order because QueryForIds
// already sorts them.
if ev.Kind.IsReplaceable() {
for _, e := range evs {
if bytes.Equal(
ev.Pubkey, e.Pubkey,
) && ev.Kind.K == e.Kind.K {
}
}
// } else if ev.Kind.IsParameterizedReplaceable(){
} else {
}
evs = append(evs, ev)
}
}

View File

@@ -8,6 +8,8 @@ import (
"orly.dev/database/indexes"
"orly.dev/database/indexes/types"
"orly.dev/event"
"orly.dev/filter"
"orly.dev/hex"
)
// SaveEvent saves an event to the database, generating all the necessary indexes.
@@ -16,6 +18,27 @@ func (d *D) SaveEvent(c context.T, ev *event.E) (kc, vc int, err error) {
buf := new(bytes.Buffer)
// Marshal the event to binary
ev.MarshalBinary(buf)
// Check if the event is replaceable
if ev.Kind != nil && ev.Kind.IsReplaceable() {
// Create a filter to find previous events of the same kind from the same pubkey
f := filter.New()
f.Kinds.K = append(f.Kinds.K, ev.Kind)
f.Authors = f.Authors.Append(ev.Pubkey)
// Query for previous events
var prevEvents event.S
if prevEvents, err = d.QueryEvents(c, f); chk.E(err) {
return
}
// If there are previous events, log that we're replacing one
if len(prevEvents) > 0 {
d.Logger.Infof("Saving new version of replaceable event kind %d from pubkey %s",
ev.Kind.K, hex.Enc(ev.Pubkey))
}
}
// Get the next sequence number for the event
var serial uint64
if serial, err = d.seq.Next(); chk.E(err) {

View File

@@ -48,7 +48,9 @@ func (a *A) HandleEvent(r []byte, s server.I, remote string) (msg []byte) {
ok, reason = s.AddEvent(
a.Context(), env.E, a.Listener.Req(), remote,
)
publish.P.Deliver(env.E)
if ok {
go publish.P.Deliver(env.E)
}
if err = okenvelope.NewFrom(
env.Id(), ok, reason,
).Write(a.Listener); chk.E(err) {