Add support for expiration indexing and event deletion

- pkg/database/database.go
  - Added `RunMigrations` to handle new index versions.
  - Integrated `DeleteExpired` for scheduled cleanup of expired events within a goroutine.

- pkg/database/delete-event.go
  - Refactored the existing deletion logic into `DeleteEventBySerial`.

- pkg/database/delete-expired.go
  - Added new implementation to handle deletion of expired events using expiration indexes.

- pkg/database/migrations.go
  - Implemented `RunMigrations` to handle database versioning and reindexing when new keys are introduced.

- pkg/database/indexes/keys.go
  - Added `ExpirationPrefix` and `VersionPrefix` for new expiration and version indexes.
  - Implemented encoding structs for expiration and version handling.

- pkg/encoders/event/writer.go
  - Added JSON marshaling logic to serialize events with or without whitespace.

- pkg/encoders/event/reader.go
  - Refined unmarshaling logic for handling event keys and values robustly.

- pkg/protocol/socketapi/handleEvent.go
  - Formatted log statements and updated logging verbosity for event handling.

- pkg/app/relay/handleRelayinfo.go
  - Re-enabled relay handling for expiration timestamps.

- pkg/database/indexes/types.go (new file)
  - Introduced structures for `Uint40s` and other types used in indexes.
This commit is contained in:
2025-08-15 15:50:31 +01:00
parent a2cce3f38b
commit 66be769f7a
12 changed files with 621 additions and 90 deletions

View File

@@ -2,7 +2,7 @@
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go
#
# Release Process:
# 1. Update the version in pkg/version/version file (e.g., v1.2.3)
# 1. Update the version in the pkg/version/version file (e.g. v1.2.3)
# 2. Create and push a tag matching the version:
# git tag v1.2.3
# git push origin v1.2.3
@@ -28,7 +28,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.24'
go-version: '1.25'
- name: Install libsecp256k1
run: ./scripts/ubuntu_install_libsecp256k1.sh
@@ -61,7 +61,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.24'
go-version: '1.25'
- name: Install libsecp256k1
run: ./scripts/ubuntu_install_libsecp256k1.sh
@@ -77,20 +77,20 @@ jobs:
mkdir -p release-binaries
# Build for different platforms
GOOS=linux GOARCH=amd64 CGO_ENABLED=1 go build --ldflags '-extldflags "-static"' -o release-binaries/orly-${VERSION}-linux-amd64 .
GOOS=linux GOARCH=arm64 CGO_ENABLED=1 go build --ldflags '-extldflags "-static"' -o release-binaries/orly-${VERSION}-linux-arm64 .
GOOS=darwin GOARCH=amd64 CGO_ENABLED=0 go build -o release-binaries/orly-${VERSION}-darwin-amd64 .
GOOS=darwin GOARCH=arm64 CGO_ENABLED=0 go build -o release-binaries/orly-${VERSION}-darwin-arm64 .
GOOS=windows GOARCH=amd64 CGO_ENABLED=0 go build -o release-binaries/orly-${VERSION}-windows-amd64.exe .
GOEXPERIMENT=greenteagc,jsonv2 GOOS=linux GOARCH=amd64 CGO_ENABLED=1 go build --ldflags '-extldflags "-static"' -o release-binaries/orly-${VERSION}-linux-amd64 .
GOEXPERIMENT=greenteagc,jsonv2 GOOS=linux GOARCH=arm64 CGO_ENABLED=1 go build --ldflags '-extldflags "-static"' -o release-binaries/orly-${VERSION}-linux-arm64 .
GOEXPERIMENT=greenteagc,jsonv2 GOOS=darwin GOARCH=amd64 CGO_ENABLED=0 go build -o release-binaries/orly-${VERSION}-darwin-amd64 .
GOEXPERIMENT=greenteagc,jsonv2 GOOS=darwin GOARCH=arm64 CGO_ENABLED=0 go build -o release-binaries/orly-${VERSION}-darwin-arm64 .
GOEXPERIMENT=greenteagc,jsonv2 GOOS=windows GOARCH=amd64 CGO_ENABLED=0 go build -o release-binaries/orly-${VERSION}-windows-amd64.exe .
# Build cmd executables
for cmd in lerproxy nauth nurl vainstr walletcli; do
echo "Building $cmd"
GOOS=linux GOARCH=amd64 CGO_ENABLED=1 go build --ldflags '-extldflags "-static"' -o release-binaries/${cmd}-${VERSION}-linux-amd64 ./cmd/${cmd}
GOOS=linux GOARCH=arm64 CGO_ENABLED=1 go build --ldflags '-extldflags "-static"' -o release-binaries/${cmd}-${VERSION}-linux-arm64 ./cmd/${cmd}
GOOS=darwin GOARCH=amd64 CGO_ENABLED=0 go build -o release-binaries/${cmd}-${VERSION}-darwin-amd64 ./cmd/${cmd}
GOOS=darwin GOARCH=arm64 CGO_ENABLED=0 go build -o release-binaries/${cmd}-${VERSION}-darwin-arm64 ./cmd/${cmd}
GOOS=windows GOARCH=amd64 CGO_ENABLED=0 go build -o release-binaries/${cmd}-${VERSION}-windows-amd64.exe ./cmd/${cmd}
GOEXPERIMENT=greenteagc,jsonv2 GOOS=linux GOARCH=amd64 CGO_ENABLED=1 go build --ldflags '-extldflags "-static"' -o release-binaries/${cmd}-${VERSION}-linux-amd64 ./cmd/${cmd}
GOEXPERIMENT=greenteagc,jsonv2 GOOS=linux GOARCH=arm64 CGO_ENABLED=1 go build --ldflags '-extldflags "-static"' -o release-binaries/${cmd}-${VERSION}-linux-arm64 ./cmd/${cmd}
GOEXPERIMENT=greenteagc,jsonv2 GOOS=darwin GOARCH=amd64 CGO_ENABLED=0 go build -o release-binaries/${cmd}-${VERSION}-darwin-amd64 ./cmd/${cmd}
GOEXPERIMENT=greenteagc,jsonv2 GOOS=darwin GOARCH=arm64 CGO_ENABLED=0 go build -o release-binaries/${cmd}-${VERSION}-darwin-arm64 ./cmd/${cmd}
GOEXPERIMENT=greenteagc,jsonv2 GOOS=windows GOARCH=amd64 CGO_ENABLED=0 go build -o release-binaries/${cmd}-${VERSION}-windows-amd64.exe ./cmd/${cmd}
done
# Create checksums

View File

@@ -44,7 +44,7 @@ func (s *Server) HandleRelayInfo(w http.ResponseWriter, r *http.Request) {
relayinfo.EventTreatment,
// relayinfo.CommandResults,
relayinfo.ParameterizedReplaceableEvents,
// relayinfo.ExpirationTimestamp,
relayinfo.ExpirationTimestamp,
relayinfo.ProtectedEvents,
// relayinfo.RelayListMetadata,
)

View File

@@ -11,6 +11,7 @@ import (
"orly.dev/pkg/utils/units"
"os"
"path/filepath"
"time"
)
type D struct {
@@ -57,8 +58,19 @@ func New(ctx context.T, cancel context.F, dataDir, logLevel string) (
if d.seq, err = d.DB.GetSequence([]byte("EVENTS"), 1000); chk.E(err) {
return
}
// run code that updates indexes when new indexes have been added and bumps
// the version so they aren't run again.
d.RunMigrations()
// start up the expiration tag processing and shut down and clean up the
// database after the context is canceled.
go func() {
<-d.ctx.Done()
expirationTicker := time.NewTicker(time.Minute * 10)
select {
case <-expirationTicker.C:
d.DeleteExpired()
return
case <-d.ctx.Done():
}
d.cancel()
d.seq.Release()
d.DB.Close()

View File

@@ -33,9 +33,18 @@ func (d *D) DeleteEvent(c context.T, eid *eventid.T) (err error) {
return
}
if ev == nil {
// Event wasn't found, nothing to delete
// Event wasn't found, nothing to delete. this shouldn't happen.
return
}
if err = d.DeleteEventBySerial(c, ser, ev); chk.E(err) {
return
}
return
}
func (d *D) DeleteEventBySerial(
c context.T, ser *types.Uint40, ev *event.E,
) (err error) {
// Get all indexes for the event
var idxs [][]byte
idxs, err = GetIndexesForEvent(ev, ser.Get())

View File

@@ -0,0 +1,59 @@
package database
import (
"bytes"
"github.com/dgraph-io/badger/v4"
"orly.dev/pkg/database/indexes"
"orly.dev/pkg/database/indexes/types"
"orly.dev/pkg/encoders/event"
"orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/context"
"time"
)
func (d *D) DeleteExpired() {
var err error
var expiredSerials types.Uint40s
// make the operation atomic and save on accesses to the system clock by
// setting the boundary at the current second
now := time.Now().Unix()
// search the expiration indexes for expiry timestamps that are now past
if err = d.View(
func(txn *badger.Txn) (err error) {
exp, ser := indexes.ExpirationVars()
expPrf := new(bytes.Buffer)
if _, err = indexes.ExpirationPrefix.Write(expPrf); chk.E(err) {
return
}
it := txn.NewIterator(badger.IteratorOptions{Prefix: expPrf.Bytes()})
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
key := item.KeyCopy(nil)
buf := bytes.NewBuffer(key)
if err = indexes.ExpirationDec(
exp, ser,
).UnmarshalRead(buf); chk.E(err) {
continue
}
if int64(exp.Get()) > now {
// not expired yet
continue
}
expiredSerials = append(expiredSerials, ser)
}
return
},
); chk.E(err) {
}
// delete the events and their indexes
for _, ser := range expiredSerials {
var ev *event.E
if ev, err = d.FetchEventBySerial(ser); chk.E(err) {
continue
}
if err = d.DeleteEventBySerial(context.Bg(), ser, ev); chk.E(err) {
continue
}
}
}

View File

@@ -16,10 +16,8 @@ func appendIndexBytes(idxs *[][]byte, idx *indexes.T) (err error) {
return
}
// Copy the buffer's bytes to a new byte slice
bytes := make([]byte, buf.Len())
copy(bytes, buf.Bytes())
// Append the byte slice to the idxs slice
*idxs = append(*idxs, bytes)
*idxs = append(*idxs, buf.Bytes())
return
}

View File

@@ -67,6 +67,9 @@ const (
TagKindPrefix = I("tkc") // tag, kind, created at
TagPubkeyPrefix = I("tpc") // tag, pubkey, created at
TagKindPubkeyPrefix = I("tkp") // tag, kind, pubkey, created at
ExpirationPrefix = I("exp") // timestamp of expiration
VersionPrefix = I("ver") // database version number, for triggering reindexes when new keys are added (policy is add-only).
)
// Prefix returns the three byte human-readable prefixes that go in front of
@@ -97,6 +100,11 @@ func Prefix(prf int) (i I) {
return TagPubkeyPrefix
case TagKindPubkey:
return TagKindPubkeyPrefix
case Expiration:
return ExpirationPrefix
case Version:
return VersionPrefix
}
return
}
@@ -135,6 +143,9 @@ func Identify(r io.Reader) (i int, err error) {
i = TagPubkey
case TagKindPubkeyPrefix:
i = TagKindPubkey
case ExpirationPrefix:
i = Expiration
}
return
}
@@ -146,7 +157,7 @@ type Encs []codec.I
type T struct{ Encs }
// New creates a new indexes.T. The helper functions below have an encode and
// decode variant, the decode variant does not add the prefix encoder because it
// decode variant, the decode variant doesn't add the prefix encoder because it
// has been read by Identify or just is being read, and found because it was
// written for the prefix in the iteration.
func New(encoders ...codec.I) (i *T) { return &T{encoders} }
@@ -359,7 +370,7 @@ func TagPubkeyDec(
// TagKindPubkey
//
// 3 prefix|1 key letter|8 value hash|2 kind|8 pubkey hash|8 bytes timestamp|5 byte serial
// 3 prefix|1 key letter|8 value hash|2 kind|8 pubkey hash|8 bytes timestamp|5 serial
var TagKindPubkey = next()
func TagKindPubkeyVars() (
@@ -383,3 +394,45 @@ func TagKindPubkeyDec(
) (enc *T) {
return New(NewPrefix(), ki, p, k, v, ca, ser)
}
// Expiration
//
// 3 prefix|8 timestamp|5 serial
var Expiration = next()
func ExpirationVars() (
exp *types.Uint64, ser *types.Uint40,
) {
return new(types.Uint64), new(types.Uint40)
}
func ExpirationEnc(
exp *types.Uint64, ser *types.Uint40,
) (enc *T) {
return New(NewPrefix(Expiration), exp, ser)
}
func ExpirationDec(
exp *types.Uint64, ser *types.Uint40,
) (enc *T) {
return New(NewPrefix(), exp, ser)
}
// Version
//
// 3 prefix|4 version
var Version = next()
func VersionVars() (
ver *types.Uint32,
) {
return new(types.Uint32)
}
func VersionEnc(
ver *types.Uint32,
) (enc *T) {
return New(NewPrefix(Version), ver)
}
func VersionDec(
ver *types.Uint32,
) (enc *T) {
return New(NewPrefix(), ver)
}

152
pkg/database/migrations.go Normal file
View File

@@ -0,0 +1,152 @@
package database
import (
"bytes"
"github.com/dgraph-io/badger/v4"
"orly.dev/pkg/database/indexes"
"orly.dev/pkg/database/indexes/types"
"orly.dev/pkg/encoders/event"
"orly.dev/pkg/encoders/ints"
"orly.dev/pkg/encoders/tag"
"orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/log"
"sort"
)
const (
currentVersion uint32 = 0
)
func (d *D) RunMigrations() {
log.I.F("running migrations...")
var err error
var dbVersion uint32
// first find the current version tag if any
if err = d.View(
func(txn *badger.Txn) (err error) {
buf := new(bytes.Buffer)
if err = indexes.VersionEnc(nil).MarshalWrite(buf); chk.E(err) {
return
}
verPrf := new(bytes.Buffer)
if _, err = indexes.VersionPrefix.Write(verPrf); chk.E(err) {
return
}
it := txn.NewIterator(
badger.IteratorOptions{
Prefix: verPrf.Bytes(),
},
)
defer it.Close()
ver := indexes.VersionVars()
for it.Rewind(); it.Valid(); it.Next() {
// there should only be one
item := it.Item()
key := item.KeyCopy(nil)
if err = indexes.VersionDec(ver).UnmarshalRead(
bytes.NewBuffer(key),
); chk.E(err) {
return
}
dbVersion = ver.Get()
}
return
},
); chk.E(err) {
}
if dbVersion == 0 {
log.D.F("no version tag found, creating...")
// write the version tag now
if err = d.Update(
func(txn *badger.Txn) (err error) {
buf := new(bytes.Buffer)
vv := new(types.Uint32)
vv.Set(currentVersion)
if err = indexes.VersionEnc(vv).MarshalWrite(buf); chk.E(err) {
return
}
return
},
); chk.E(err) {
return
}
}
if dbVersion < 1 {
// the first migration is expiration tags
d.UpdateExpirationTags()
}
}
func (d *D) UpdateExpirationTags() {
log.T.F("updating expiration tag indexes...")
var err error
var expIndexes [][]byte
// iterate all event records and decode and look for version tags
if err = d.View(
func(txn *badger.Txn) (err error) {
prf := new(bytes.Buffer)
if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) {
return
}
it := txn.NewIterator(badger.IteratorOptions{Prefix: prf.Bytes()})
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
var val []byte
if val, err = item.ValueCopy(nil); chk.E(err) {
continue
}
// decode the event
ev := new(event.E)
if err = ev.UnmarshalRead(bytes.NewBuffer(val)); chk.E(err) {
continue
}
expTag := ev.Tags.GetFirst(tag.New("expiration"))
if expTag == nil {
continue
}
expTS := ints.New(0)
if _, err = expTS.Unmarshal(expTag.Value()); chk.E(err) {
continue
}
key := item.KeyCopy(nil)
ser := indexes.EventVars()
if err = indexes.EventDec(ser).UnmarshalRead(
bytes.NewBuffer(key),
); chk.E(err) {
continue
}
// create the expiration tag
exp, _ := indexes.ExpirationVars()
exp.Set(expTS.N)
expBuf := new(bytes.Buffer)
if err = indexes.ExpirationEnc(
exp, ser,
).MarshalWrite(expBuf); chk.E(err) {
continue
}
expIndexes = append(expIndexes, expBuf.Bytes())
}
return
},
); chk.E(err) {
return
}
// sort the indexes first so they're written in order, improving compaction
// and iteration.
sort.Slice(
expIndexes, func(i, j int) bool {
return bytes.Compare(expIndexes[i], expIndexes[j]) < 0
},
)
// write the collected indexes
batch := d.NewWriteBatch()
for _, v := range expIndexes {
if err = batch.Set(v, nil); chk.E(err) {
continue
}
}
if err = batch.Flush(); chk.E(err) {
return
}
}

View File

@@ -8,18 +8,37 @@ import (
"orly.dev/pkg/encoders/event"
"orly.dev/pkg/encoders/filter"
"orly.dev/pkg/encoders/hex"
"orly.dev/pkg/encoders/ints"
"orly.dev/pkg/encoders/kind"
"orly.dev/pkg/encoders/kinds"
"orly.dev/pkg/encoders/tag"
"orly.dev/pkg/interfaces/store"
"orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/context"
"orly.dev/pkg/utils/log"
"sort"
"strconv"
"time"
)
func CheckExpiration(ev *event.E) (expired bool) {
var err error
expTag := ev.Tags.GetFirst(tag.New("expiration"))
if expTag != nil {
expTS := ints.New(0)
if _, err = expTS.Unmarshal(expTag.Value()); !chk.E(err) {
if int64(expTS.N) < time.Now().Unix() {
return true
}
}
}
return
}
func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
// if there is Ids in the query, this overrides anything else
var expDeletes types.Uint40s
var expEvs event.S
if f.Ids != nil && f.Ids.Len() > 0 {
for _, idx := range f.Ids.ToSliceOfBytes() {
// we know there is only Ids in this, so run the ID query and fetch.
@@ -32,6 +51,12 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
if ev, err = d.FetchEventBySerial(ser); err != nil {
continue
}
// check for an expiration tag and delete after returning the result
if CheckExpiration(ev) {
expDeletes = append(expDeletes, ser)
expEvs = append(expEvs, ev)
continue
}
evs = append(evs, ev)
}
// sort the events by timestamp
@@ -45,16 +70,13 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
if idPkTs, err = d.QueryForIds(c, f); chk.E(err) {
return
}
// Create a map to store the latest version of replaceable events
replaceableEvents := make(map[string]*event.E)
// Create a map to store the latest version of parameterized replaceable
// events
paramReplaceableEvents := make(map[string]map[string]*event.E)
// Regular events that are not replaceable
var regularEvents event.S
// Map to track deletion events by kind and pubkey (for replaceable
// events)
deletionsByKindPubkey := make(map[string]bool)
@@ -63,7 +85,6 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
deletionsByKindPubkeyDTag := make(map[string]map[string]bool)
// Map to track specific event IDs that have been deleted
deletedEventIds := make(map[string]bool)
// Query for deletion events separately if we have authors in the filter
if f.Authors != nil && f.Authors.Len() > 0 {
// Create a filter for deletion events with the same authors
@@ -71,18 +92,21 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
Kinds: kinds.New(kind.New(5)), // Kind 5 is deletion
Authors: f.Authors,
}
var deletionIdPkTs []store.IdPkTs
if deletionIdPkTs, err = d.QueryForIds(c, deletionFilter); chk.E(err) {
if deletionIdPkTs, err = d.QueryForIds(
c, deletionFilter,
); chk.E(err) {
return
}
// Add deletion events to the list of events to process
idPkTs = append(idPkTs, deletionIdPkTs...)
}
// First pass: collect all deletion events
fmt.Printf("Debug: Starting first pass - processing %d events\n", len(idPkTs))
fmt.Printf(
"Debug: Starting first pass - processing %d events\n", len(idPkTs),
)
for _, idpk := range idPkTs {
var ev *event.E
ser := new(types.Uint40)
@@ -92,10 +116,17 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
if ev, err = d.FetchEventBySerial(ser); err != nil {
continue
}
// check for an expiration tag and delete after returning the result
if CheckExpiration(ev) {
expDeletes = append(expDeletes, ser)
expEvs = append(expEvs, ev)
continue
}
// Process deletion events to build our deletion maps
if ev.Kind.Equal(kind.Deletion) {
fmt.Printf("Debug: Found deletion event with ID: %s\n", hex.Enc(ev.ID))
log.D.F(
"found deletion event with ID: %s\n", hex.Enc(ev.ID),
)
// Check for 'e' tags that directly reference event IDs
eTags := ev.Tags.GetAll(tag.New([]byte{'e'}))
for _, eTag := range eTags.ToSliceOfTags() {
@@ -103,25 +134,25 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
continue
}
// We don't need to do anything with direct event ID
// references as we'll filter those out in the second pass
// references as we will filter those out in the second pass
}
// Check for 'a' tags that reference parameterized replaceable
// events
fmt.Printf("Debug: Processing deletion event with ID: %s\n", hex.Enc(ev.ID))
log.D.F(
"processing deletion event with ID: %s\n",
hex.Enc(ev.ID),
)
aTags := ev.Tags.GetAll(tag.New([]byte{'a'}))
fmt.Printf("Debug: Found %d a-tags\n", aTags.Len())
log.D.F("Debug: Found %d a-tags\n", aTags.Len())
for _, aTag := range aTags.ToSliceOfTags() {
if aTag.Len() < 2 {
continue
}
// Parse the 'a' tag value: kind:pubkey:d-tag
split := bytes.Split(aTag.Value(), []byte{':'})
if len(split) != 3 {
continue
}
// Parse the kind
kindStr := string(split[0])
kindInt, err := strconv.Atoi(kindStr)
@@ -129,53 +160,49 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
continue
}
kk := kind.New(uint16(kindInt))
// Only process parameterized replaceable events
if !kk.IsParameterizedReplaceable() {
continue
}
// Parse the pubkey
var pk []byte
if pk, err = hex.DecAppend(nil, split[1]); err != nil {
continue
}
// Only allow users to delete their own events
if !bytes.Equal(pk, ev.Pubkey) {
continue
}
// Create the key for the deletion map using hex representation of pubkey
// Create the key for the deletion map using hex
// representation of pubkey
key := hex.Enc(pk) + ":" + strconv.Itoa(int(kk.K))
// Initialize the inner map if it doesn't exist
if _, exists := deletionsByKindPubkeyDTag[key]; !exists {
deletionsByKindPubkeyDTag[key] = make(map[string]bool)
}
// Mark this d-tag as deleted
dValue := string(split[2])
deletionsByKindPubkeyDTag[key][dValue] = true
// Debug logging
fmt.Printf("Debug: Processing a-tag: %s\n", string(aTag.Value()))
fmt.Printf("Debug: Adding to deletion map - key: %s, d-tag: %s\n", key, dValue)
log.D.F(
"processing a-tag: %s\n", string(aTag.Value()),
)
log.D.F(
"adding to deletion map - key: %s, d-tag: %s\n",
key, dValue,
)
}
// For replaceable events, we need to check if there are any
// e-tags that reference events with the same kind and pubkey
for _, eTag := range eTags.ToSliceOfTags() {
if eTag.Len() < 2 {
continue
}
// Get the event ID from the e-tag
evId := make([]byte, sha256.Size)
if _, err = hex.DecBytes(evId, eTag.Value()); err != nil {
continue
}
// Query for the event
var targetEvs event.S
targetEvs, err = d.QueryEvents(
@@ -184,24 +211,23 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
if err != nil || len(targetEvs) == 0 {
continue
}
targetEv := targetEvs[0]
// Only allow users to delete their own events
if !bytes.Equal(targetEv.Pubkey, ev.Pubkey) {
continue
}
// Mark the specific event ID as deleted
deletedEventIds[hex.Enc(targetEv.ID)] = true
// If the event is replaceable, mark it as deleted, but only for events older than this one
// If the event is replaceable, mark it as deleted, but only
// for events older than this one
if targetEv.Kind.IsReplaceable() {
key := hex.Enc(targetEv.Pubkey) + ":" + strconv.Itoa(int(targetEv.Kind.K))
// We'll still use deletionsByKindPubkey, but we'll check timestamps in the second pass
// We will still use deletionsByKindPubkey, but we'll
// check timestamps in the second pass
deletionsByKindPubkey[key] = true
} else if targetEv.Kind.IsParameterizedReplaceable() {
// For parameterized replaceable events, we need to consider the 'd' tag
// For parameterized replaceable events, we need to
// consider the 'd' tag
key := hex.Enc(targetEv.Pubkey) + ":" + strconv.Itoa(int(targetEv.Kind.K))
// Get the 'd' tag value
@@ -213,12 +239,10 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
// If no 'd' tag, use empty string
dValue = ""
}
// Initialize the inner map if it doesn't exist
if _, exists := deletionsByKindPubkeyDTag[key]; !exists {
deletionsByKindPubkeyDTag[key] = make(map[string]bool)
}
// Mark this d-tag as deleted
deletionsByKindPubkeyDTag[key][dValue] = true
}
@@ -236,12 +260,10 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
if ev, err = d.FetchEventBySerial(ser); err != nil {
continue
}
// Skip events with kind 5 (Deletion)
if ev.Kind.Equal(kind.Deletion) {
continue
}
// Check if this event's ID is in the filter
isIdInFilter := false
if f.Ids != nil && f.Ids.Len() > 0 {
@@ -252,22 +274,21 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
}
}
}
// Check if this specific event has been deleted
eventIdHex := hex.Enc(ev.ID)
if deletedEventIds[eventIdHex] && !isIdInFilter {
// Skip this event if it has been specifically deleted and is not in the filter
// Skip this event if it has been specifically deleted and is
// not in the filter
continue
}
if ev.Kind.IsReplaceable() {
// For replaceable events, we only keep the latest version for
// each pubkey and kind, and only if it hasn't been deleted
key := hex.Enc(ev.Pubkey) + ":" + strconv.Itoa(int(ev.Kind.K))
// For replaceable events, we need to be more careful with deletion
// Only skip this event if it has been deleted by kind/pubkey and is not in the filter
// AND there isn't a newer event with the same kind/pubkey
// For replaceable events, we need to be more careful with
// deletion Only skip this event if it has been deleted by
// kind/pubkey and is not in the filter AND there isn't a newer
// event with the same kind/pubkey
if deletionsByKindPubkey[key] && !isIdInFilter {
// Check if there's a newer event with the same kind/pubkey
// that hasn't been specifically deleted
@@ -304,11 +325,17 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
// Check if this event has been deleted via an a-tag
if deletionMap, exists := deletionsByKindPubkeyDTag[key]; exists {
// Debug logging
fmt.Printf("Debug: Checking deletion map - key: %s, d-tag: %s\n", key, dValue)
fmt.Printf("Debug: Deletion map contains key: %v, d-tag in map: %v\n", exists, deletionMap[dValue])
// If the d-tag value is in the deletion map and this event is not
// specifically requested by ID, skip it
fmt.Printf(
"Debug: Checking deletion map - key: %s, d-tag: %s\n",
key, dValue,
)
fmt.Printf(
"Debug: Deletion map contains key: %v, d-tag in map: %v\n",
exists, deletionMap[dValue],
)
// If the d-tag value is in the deletion map and this event
// is not specifically requested by ID, skip it
if deletionMap[dValue] && !isIdInFilter {
fmt.Printf("Debug: Event deleted - skipping\n")
continue
@@ -336,7 +363,6 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
regularEvents = append(regularEvents, ev)
}
}
// Add all the latest replaceable events to the result
for _, ev := range replaceableEvents {
evs = append(evs, ev)
@@ -348,16 +374,22 @@ func (d *D) QueryEvents(c context.T, f *filter.F) (evs event.S, err error) {
evs = append(evs, ev)
}
}
// Add all regular events to the result
evs = append(evs, regularEvents...)
// Sort all events by timestamp (newest first)
sort.Slice(
evs, func(i, j int) bool {
return evs[i].CreatedAt.I64() > evs[j].CreatedAt.I64()
},
)
// delete the expired events in a background thread
go func() {
for i, ser := range expDeletes {
if err = d.DeleteEventBySerial(c, ser, expEvs[i]); chk.E(err) {
continue
}
}
}()
}
return
}

View File

@@ -0,0 +1,211 @@
package event
import (
"bytes"
"fmt"
"io"
"orly.dev/pkg/crypto/ec/schnorr"
"orly.dev/pkg/crypto/sha256"
"orly.dev/pkg/encoders/kind"
"orly.dev/pkg/encoders/tags"
text2 "orly.dev/pkg/encoders/text"
"orly.dev/pkg/encoders/timestamp"
"orly.dev/pkg/utils/chk"
"orly.dev/pkg/utils/errorf"
)
func (ev *E) UnmarshalRead(rd io.Reader) (err error) {
key := make([]byte, 0, 9)
// Read entire content from the io.Reader into a buffer to reuse existing slice-based parser.
var b []byte
if rd != nil {
var readErr error
b, readErr = io.ReadAll(rd)
if readErr != nil {
return readErr
}
}
r := b
for ; len(r) > 0; r = r[1:] {
// Skip whitespace
if isWhitespace(r[0]) {
continue
}
if r[0] == '{' {
r = r[1:]
goto BetweenKeys
}
}
goto eof
BetweenKeys:
for ; len(r) > 0; r = r[1:] {
// Skip whitespace
if isWhitespace(r[0]) {
continue
}
if r[0] == '"' {
r = r[1:]
goto InKey
}
}
goto eof
InKey:
for ; len(r) > 0; r = r[1:] {
if r[0] == '"' {
r = r[1:]
goto InKV
}
key = append(key, r[0])
}
goto eof
InKV:
for ; len(r) > 0; r = r[1:] {
// Skip whitespace
if isWhitespace(r[0]) {
continue
}
if r[0] == ':' {
r = r[1:]
goto InVal
}
}
goto eof
InVal:
// Skip whitespace before value
for len(r) > 0 && isWhitespace(r[0]) {
r = r[1:]
}
switch key[0] {
case jId[0]:
if !bytes.Equal(jId, key) {
goto invalid
}
var id []byte
if id, r, err = text2.UnmarshalHex(r); chk.E(err) {
return
}
if len(id) != sha256.Size {
err = errorf.E(
"invalid ID, require %d got %d", sha256.Size,
len(id),
)
return
}
ev.ID = id
goto BetweenKV
case jPubkey[0]:
if !bytes.Equal(jPubkey, key) {
goto invalid
}
var pk []byte
if pk, r, err = text2.UnmarshalHex(r); chk.E(err) {
return
}
if len(pk) != schnorr.PubKeyBytesLen {
err = errorf.E(
"invalid pubkey, require %d got %d",
schnorr.PubKeyBytesLen, len(pk),
)
return
}
ev.Pubkey = pk
goto BetweenKV
case jKind[0]:
if !bytes.Equal(jKind, key) {
goto invalid
}
ev.Kind = kind.New(0)
if r, err = ev.Kind.Unmarshal(r); chk.E(err) {
return
}
goto BetweenKV
case jTags[0]:
if !bytes.Equal(jTags, key) {
goto invalid
}
ev.Tags = tags.New()
if r, err = ev.Tags.Unmarshal(r); chk.E(err) {
return
}
goto BetweenKV
case jSig[0]:
if !bytes.Equal(jSig, key) {
goto invalid
}
var sig []byte
if sig, r, err = text2.UnmarshalHex(r); chk.E(err) {
return
}
if len(sig) != schnorr.SignatureSize {
err = errorf.E(
"invalid sig length, require %d got %d '%s'\n%s",
schnorr.SignatureSize, len(sig), r, b,
)
return
}
ev.Sig = sig
goto BetweenKV
case jContent[0]:
if key[1] == jContent[1] {
if !bytes.Equal(jContent, key) {
goto invalid
}
if ev.Content, r, err = text2.UnmarshalQuoted(r); chk.T(err) {
return
}
goto BetweenKV
} else if key[1] == jCreatedAt[1] {
if !bytes.Equal(jCreatedAt, key) {
goto invalid
}
ev.CreatedAt = timestamp.New(int64(0))
if r, err = ev.CreatedAt.Unmarshal(r); chk.T(err) {
return
}
goto BetweenKV
} else {
goto invalid
}
default:
goto invalid
}
BetweenKV:
key = key[:0]
for ; len(r) > 0; r = r[1:] {
// Skip whitespace
if isWhitespace(r[0]) {
continue
}
switch {
case len(r) == 0:
return
case r[0] == '}':
r = r[1:]
goto AfterClose
case r[0] == ',':
r = r[1:]
goto BetweenKeys
case r[0] == '"':
r = r[1:]
goto InKey
}
}
goto eof
AfterClose:
// Skip any trailing whitespace
for len(r) > 0 && isWhitespace(r[0]) {
r = r[1:]
}
return
invalid:
err = fmt.Errorf(
"invalid key,\n'%s'\n'%s'\n'%s'", string(b), string(b[:len(r)]),
string(r),
)
return
eof:
err = io.EOF
return
}

View File

@@ -196,8 +196,3 @@ func (ev *E) MarshalWriteWithWhitespace(w io.Writer, on bool) (err error) {
}
return
}
func (ev *E) UnmarshalRead(r io.Reader) (err error) {
// TODO implement me
panic("implement me")
}

View File

@@ -79,8 +79,10 @@ func (a *A) HandleEvent(
// Check if the IP is blocked due to too many failed auth attempts
if iptracker.Global.IsBlocked(remoteIP) {
blockedUntil := iptracker.Global.GetBlockedUntil(remoteIP)
blockMsg := fmt.Sprintf("Too many failed authentication attempts. Blocked until %s",
blockedUntil.Format(time.RFC3339))
blockMsg := fmt.Sprintf(
"Too many failed authentication attempts. Blocked until %s",
blockedUntil.Format(time.RFC3339),
)
// Send a notice to the client explaining why they're blocked
if err = noticeenvelope.NewFrom(blockMsg).Write(a.Listener); chk.E(err) {
@@ -88,7 +90,10 @@ func (a *A) HandleEvent(
}
// Close the connection
log.I.F("closing connection from %s due to too many failed auth attempts", remoteIP)
log.I.F(
"closing connection from %s due to too many failed auth attempts",
remoteIP,
)
a.Listener.Close()
return
}
@@ -98,8 +103,10 @@ func (a *A) HandleEvent(
if blocked {
// If this attempt caused the IP to be blocked, close the connection
blockedUntil := iptracker.Global.GetBlockedUntil(remoteIP)
blockMsg := fmt.Sprintf("Too many failed authentication attempts. Blocked until %s",
blockedUntil.Format(time.RFC3339))
blockMsg := fmt.Sprintf(
"Too many failed authentication attempts. Blocked until %s",
blockedUntil.Format(time.RFC3339),
)
// Send a notice to the client explaining why they're blocked
if err = noticeenvelope.NewFrom(blockMsg).Write(a.Listener); chk.E(err) {
@@ -107,7 +114,10 @@ func (a *A) HandleEvent(
}
// Close the connection
log.I.F("closing connection from %s due to too many failed auth attempts", remoteIP)
log.I.F(
"closing connection from %s due to too many failed auth attempts",
remoteIP,
)
a.Listener.Close()
return
}
@@ -170,7 +180,7 @@ func (a *A) HandleEvent(
}
return
}
log.I.F("checking if policy allows this event")
log.T.F("checking if policy allows this event")
// check that relay policy allows this event
accept, notice, _ := srv.AcceptEvent(
c, env.E, a.Listener.Request, a.Listener.AuthedPubkey(),
@@ -187,7 +197,7 @@ func (a *A) HandleEvent(
}
return
}
log.I.F("checking for protected tag")
log.T.F("checking for protected tag")
// check for protected tag (NIP-70)
protectedTag := env.E.Tags.GetFirst(tag.New("-"))
if protectedTag != nil && a.AuthRequired() {