Compare commits

..

30 Commits

SHA1 Message Date
6e06905773 Replace WriteTimeout with DefaultWriteTimeout in publisher for consistency and bump version to v0.4.2. 2025-09-11 16:32:40 +01:00
d1316a5b7a Introduce DefaultWriteTimeout for WebSocket operations, replace hardcoded timeouts, and upgrade version to v0.4.1. 2025-09-11 16:29:43 +01:00
b45f0a2c51 Bump version to v0.4.0. 2025-09-11 12:35:15 +01:00
e2b7152221 Introduce Ctx() for proper context management in Listener and replace direct context usage in HandleDelete with Ctx().
also introduce a 3 second timeout for websocket read failure
2025-09-11 12:34:01 +01:00
bf7ca1da43 Improve logging consistency across the application, handle context cancellation during WebSocket writes, and introduce async ACL reconfiguration for admin events. 2025-09-11 11:37:25 +01:00
bb8998fef6 Update relay tester scripts to use /tmp/orlytest for temporary data storage and adjust cleanup logic 2025-09-10 22:48:14 +01:00
57ac3667e6 Bump version to v0.3.2. 2025-09-10 22:26:57 +01:00
cb54891473 Remove verbose and debug logging across HandleDelete, HandleEvent, and acl/follows for consistency. 2025-09-10 22:26:41 +01:00
fdcfd863e0 Update SaveEvent to improve logging consistency, block duplicate event creations more explicitly, and handle multi-line log formatting updates. Bump version to v0.3.1. 2025-09-10 22:23:35 +01:00
4e96c9e2f7 Remove debug logging across the codebase and update version to v0.3.0. 2025-09-10 22:12:54 +01:00
fb956ff09c Block resubmission of deleted events by ID in SaveEvent and simplify deletion timestamp checks in QueryForDeleted. 2025-09-10 20:43:53 +01:00
eac6ba1410 Enhance HandleDelete to skip newer events based on delete event timestamp and improve logging for skipped and deleted events. 2025-09-10 20:34:35 +01:00
6b4b035f0c Refine HandleDelete logic to enforce a-tag criteria for replaceable events, improve parameterized replaceable event handling, and enhance logging for skipped and deleted events. 2025-09-10 20:27:02 +01:00
c2c6720e01 Enhance SaveEvent logic to handle older event rejection with error reporting, validate timestamps in parameterized replaceable events, and improve HandleEvent error handling for blocked events. 2025-09-10 20:11:59 +01:00
dddcc682b9 Improve HandleDelete error handling, add validation for deletion ownership, and enhance logging for unauthorized deletion attempts. 2025-09-10 19:56:11 +01:00
ddaab70d2b Improve HandleDelete error handling, add validation for deletion ownership, and enhance logging for unauthorized deletion attempts. 2025-09-10 19:32:42 +01:00
61cec63ca9 Add detailed tag filter debugging logs in QueryEvents and update rules.md with context and debugging guidance. 2025-09-10 19:24:24 +01:00
b063dab2a3 Improve logging, error handling for ID queries, and ensure inclusive range boundaries in event management. 2025-09-10 19:04:54 +01:00
9e59d5f72b Set default value for LogToStdout, enhance logging for request handling, query events, and filters, and fix ID handling in relaytester-test.sh. 2025-09-10 16:29:55 +01:00
fe3893addf Add LogToStdout config option, improve tag decoding, and fix ID tracking in event handling 2025-09-10 15:16:33 +01:00
5eb192f208 Send initial AUTH challenge if admins are configured and clean up leftover ORLY data in relaytester-test.sh. 2025-09-10 14:39:22 +01:00
2385d1f752 Update relaytester-test.sh log level to off and improve follows key decoding logic in ACL implementation for clarity and error handling. 2025-09-10 14:30:31 +01:00
faad7ddc93 add relay-tester scripts 2025-09-10 14:23:57 +01:00
c9314bdbd0 Refactor GetAccessLevel to include address parameter, update all ACL implementations and handlers for enhanced contextual access control. 2025-09-08 07:42:47 +01:00
85d806b157 Bump version to v0.2.1 2025-09-07 23:44:06 +01:00
6207f9d426 Enforce authenticated pubkey checks for privileged events, refactor delivery logic for improved efficiency, and extend Subscription with AuthedPubkey. 2025-09-07 23:41:45 +01:00
ebb5e2c0f3 Refactor publisher to clean up dead code, streamline event filtering, and optimize subscriber removal logic. 2025-09-07 23:35:01 +01:00
9dec51cd40 Switch sync.Mutex to sync.RWMutex in publisher for improved concurrent read performance. 2025-09-07 23:06:46 +01:00
f570660f37 Uncomment and enable additional relayinfo features and fix order of event response handling in SaveEvent. 2025-09-07 23:01:26 +01:00
3d3a0fa520 Refactor Signer to use secp256k1 directly and enhance ACL reconfiguration for admin-triggered events 2025-09-07 21:59:50 +01:00
32 changed files with 15776 additions and 227 deletions

View File

@@ -89,3 +89,7 @@ A good typical example:
 // - Initializes the relay, starting its operation in a separate goroutine.
 ```
+
+use the source of the relay-tester to help guide what expectations the test has,
+and use context7 for information about the nostr protocol, and use additional
+log statements to help locate the cause of bugs

View File

@@ -29,6 +29,7 @@ type C struct {
 	Port        int      `env:"ORLY_PORT" default:"3334" usage:"port to listen on"`
 	LogLevel    string   `env:"ORLY_LOG_LEVEL" default:"info" usage:"relay log level: fatal error warn info debug trace"`
 	DBLogLevel  string   `env:"ORLY_DB_LOG_LEVEL" default:"info" usage:"database log level: fatal error warn info debug trace"`
+	LogToStdout bool     `env:"ORLY_LOG_TO_STDOUT" default:"false" usage:"log to stdout instead of stderr"`
 	Pprof       string   `env:"ORLY_PPROF" usage:"enable pprof in modes: cpu,memory,allocation"`
 	IPWhitelist []string `env:"ORLY_IP_WHITELIST" usage:"comma-separated list of IP addresses to allow access from, matches on prefixes to allow private subnets, eg 10.0.0 = 10.0.0.0/8"`
 	Admins      []string `env:"ORLY_ADMINS" usage:"comma-separated list of admin npubs"`
@@ -73,6 +74,9 @@ func New() (cfg *C, err error) {
 		PrintHelp(cfg, os.Stderr)
 		os.Exit(0)
 	}
+	if cfg.LogToStdout {
+		lol.Writer = os.Stdout
+	}
 	lol.SetLogLevel(cfg.LogLevel)
 	return
 }

View File

@@ -46,7 +46,7 @@ func (l *Listener) HandleAuth(b []byte) (err error) {
 		return
 	}
 	log.D.F(
-		"%s authed to pubkey,%0x", l.remote,
+		"%s authed to pubkey %0x", l.remote,
 		env.Event.Pubkey,
 	)
 	l.authedPubkey.Store(env.Event.Pubkey)

View File

@@ -23,14 +23,14 @@ func (l *Listener) GetSerialsFromFilter(f *filter.F) (
 	return l.D.GetSerialsFromFilter(f)
 }

-func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
-	log.I.C(
-		func() string {
-			return fmt.Sprintf(
-				"delete event\n%s", env.E.Serialize(),
-			)
-		},
-	)
+func (l *Listener) HandleDelete(env *eventenvelope.Submission) (err error) {
+	// log.I.C(
+	// 	func() string {
+	// 		return fmt.Sprintf(
+	// 			"delete event\n%s", env.E.Serialize(),
+	// 		)
+	// 	},
+	// )
 	var ownerDelete bool
 	for _, pk := range l.Admins {
 		if utils.FastEqual(pk, env.E.Pubkey) {
@@ -39,15 +39,17 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
 		}
 	}
 	// process the tags in the delete event
-	var err error
+	var deleteErr error
+	var validDeletionFound bool
 	for _, t := range *env.E.Tags {
 		// first search for a tags, as these are the simplest to process
 		if utils.FastEqual(t.Key(), []byte("a")) {
 			at := new(atag.T)
-			if _, err = at.Unmarshal(t.Value()); chk.E(err) {
+			if _, deleteErr = at.Unmarshal(t.Value()); chk.E(deleteErr) {
 				continue
 			}
 			if ownerDelete || utils.FastEqual(env.E.Pubkey, at.Pubkey) {
+				validDeletionFound = true
 				// find the event and delete it
 				f := &filter.F{
 					Authors: tag.NewFromBytesSlice(at.Pubkey),
@@ -69,13 +71,43 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
 				if ev, err = l.FetchEventBySerial(s); chk.E(err) {
 					continue
 				}
-				if !(kind.IsReplaceable(ev.Kind) && len(at.DTag) == 0) {
-					// skip a tags with no dtag if the kind is not
-					// replaceable.
+				// Only delete events that match the a-tag criteria:
+				// - For parameterized replaceable events: must have matching d-tag
+				// - For regular replaceable events: should not have d-tag constraint
+				if kind.IsParameterizedReplaceable(ev.Kind) {
+					// For parameterized replaceable, we need a DTag to match
+					if len(at.DTag) == 0 {
+						log.I.F(
+							"HandleDelete: skipping parameterized replaceable event %s - no DTag in a-tag",
+							hex.Enc(ev.ID),
+						)
+						continue
+					}
+				} else if !kind.IsReplaceable(ev.Kind) {
+					// For non-replaceable events, a-tags don't apply
+					log.I.F(
+						"HandleDelete: skipping non-replaceable event %s - a-tags only apply to replaceable events",
+						hex.Enc(ev.ID),
+					)
 					continue
 				}
+				// Only delete events that are older than or equal to the delete event timestamp
+				if ev.CreatedAt > env.E.CreatedAt {
+					log.I.F(
+						"HandleDelete: skipping newer event %s (created_at=%d) - delete event timestamp is %d",
+						hex.Enc(ev.ID), ev.CreatedAt, env.E.CreatedAt,
+					)
+					continue
+				}
+				log.I.F(
+					"HandleDelete: deleting event %s via a-tag %d:%s:%s (event_time=%d, delete_time=%d)",
+					hex.Enc(ev.ID), at.Kind.K, hex.Enc(at.Pubkey),
+					string(at.DTag), ev.CreatedAt, env.E.CreatedAt,
+				)
 				if err = l.DeleteEventBySerial(
-					l.Ctx, s, ev,
+					l.Ctx(), s, ev,
 				); chk.E(err) {
 					continue
 				}
@@ -87,10 +119,16 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
 		// if e tags are found, delete them if the author is signer, or one of
 		// the owners is signer
 		if utils.FastEqual(t.Key(), []byte("e")) {
-			var dst []byte
-			if _, err = hex.DecBytes(dst, t.Value()); chk.E(err) {
+			val := t.Value()
+			if len(val) == 0 {
 				continue
 			}
+			var dst []byte
+			if b, e := hex.Dec(string(val)); chk.E(e) {
+				continue
+			} else {
+				dst = b
+			}
 			f := &filter.F{
 				Ids: tag.NewFromBytesSlice(dst),
 			}
@@ -108,16 +146,26 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
 				continue
 			}
 			// check that the author is the same as the signer of the
-			// delete, for the k tag case the author is the signer of
+			// delete, for the e tag case the author is the signer of
 			// the event.
 			if !utils.FastEqual(env.E.Pubkey, ev.Pubkey) {
+				log.W.F(
+					"HandleDelete: attempted deletion of event %s by different user - delete pubkey=%s, event pubkey=%s",
+					hex.Enc(ev.ID), hex.Enc(env.E.Pubkey),
+					hex.Enc(ev.Pubkey),
+				)
 				continue
 			}
+			validDeletionFound = true
 			// exclude delete events
 			if ev.Kind == kind.EventDeletion.K {
 				continue
 			}
-			if err = l.DeleteEventBySerial(l.Ctx, s, ev); chk.E(err) {
+			log.I.F(
+				"HandleDelete: deleting event %s by authorized user %s",
+				hex.Enc(ev.ID), hex.Enc(env.E.Pubkey),
+			)
+			if err = l.DeleteEventBySerial(l.Ctx(), s, ev); chk.E(err) {
 				continue
 			}
@@ -164,5 +212,11 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
 			}
 			continue
 		}
+	}
+	// If no valid deletions were found, return an error
+	if !validDeletionFound {
+		return fmt.Errorf("blocked: cannot delete events that belong to other users")
+	}
 	return
 }
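Taken together, these hunks encode the NIP-09 checks the relay now enforces before removing a stored event: the delete must come from the event's author (or a relay admin), and the target must not be newer than the delete request. A condensed sketch of that predicate, as a hypothetical standalone helper rather than code from this repository, assuming raw 32-byte pubkeys and unix timestamps:

```go
package main

import (
	"bytes"
	"fmt"
)

// deletable mirrors the checks above: the deletion must be authorized
// (same author, or admin override), and the stored event must not be
// newer than the delete request itself.
func deletable(
	evPubkey, delPubkey []byte,
	evCreatedAt, delCreatedAt int64,
	ownerDelete bool,
) bool {
	if !ownerDelete && !bytes.Equal(evPubkey, delPubkey) {
		return false // unauthorized: events belong to another user
	}
	return evCreatedAt <= delCreatedAt // never delete newer events
}

func main() {
	alice := []byte{0x01}
	bob := []byte{0x02}
	fmt.Println(deletable(alice, bob, 100, 200, false)) // false: wrong author
	fmt.Println(deletable(alice, alice, 300, 200, false)) // false: target is newer
	fmt.Println(deletable(alice, alice, 100, 200, false)) // true
}
```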

View File

@@ -1,8 +1,10 @@
 package app

 import (
+	"context"
 	"fmt"
 	"strings"
+	"time"

 	acl "acl.orly"
 	"encoders.orly/envelopes/authenvelope"
@@ -62,7 +64,7 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
 		return
 	}
 	// check permissions of user
-	accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load())
+	accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load(), l.remote)
 	switch accessLevel {
 	case "none":
 		log.D.F(
@@ -99,11 +101,21 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
 		return
 	default:
 		// user has write access or better, continue
-		log.D.F("user has %s access", accessLevel)
+		// log.D.F("user has %s access", accessLevel)
 	}
 	// if the event is a delete, process the delete
 	if env.E.Kind == kind.EventDeletion.K {
-		l.HandleDelete(env)
+		if err = l.HandleDelete(env); err != nil {
+			if strings.HasPrefix(err.Error(), "blocked:") {
+				errStr := err.Error()[len("blocked: "):len(err.Error())]
+				if err = Ok.Error(
+					l, env, errStr,
+				); chk.E(err) {
+					return
+				}
+				return
+			}
+		}
 	} else {
 		// check if the event was deleted
 		if err = l.CheckForDeleted(env.E, l.Admins); err != nil {
@@ -117,21 +129,49 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
 			}
 		}
 	}
-	// store the event
-	log.I.F("saving event %0x, %s", env.E.ID, env.E.Serialize())
-	if _, _, err = l.SaveEvent(l.Ctx, env.E); chk.E(err) {
+	// store the event - use a separate context to prevent cancellation issues
+	saveCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	// log.I.F("saving event %0x, %s", env.E.ID, env.E.Serialize())
+	if _, _, err = l.SaveEvent(saveCtx, env.E); err != nil {
+		if strings.HasPrefix(err.Error(), "blocked:") {
+			errStr := err.Error()[len("blocked: "):len(err.Error())]
+			if err = Ok.Error(
+				l, env, errStr,
+			); chk.E(err) {
+				return
+			}
+			return
+		}
+		chk.E(err)
 		return
 	}
-	// if a follow list was saved, reconfigure ACLs now that it is persisted
-	if env.E.Kind == kind.FollowList.K {
-		if err = acl.Registry.Configure(); chk.E(err) {
-		}
-	}
-	l.publishers.Deliver(env.E)
 	// Send a success response storing
 	if err = Ok.Ok(l, env, ""); chk.E(err) {
 		return
 	}
+	// Deliver the event to subscribers immediately after sending OK response
+	l.publishers.Deliver(env.E)
 	log.D.F("saved event %0x", env.E.ID)
+	var isNewFromAdmin bool
+	for _, admin := range l.Admins {
+		if utils.FastEqual(admin, env.E.Pubkey) {
+			isNewFromAdmin = true
+			break
+		}
+	}
+	if isNewFromAdmin {
+		log.I.F("new event from admin %0x", env.E.Pubkey)
+		// if a follow list was saved, reconfigure ACLs now that it is persisted
+		if env.E.Kind == kind.FollowList.K ||
+			env.E.Kind == kind.RelayListMetadata.K {
+			// Run ACL reconfiguration asynchronously to prevent blocking websocket operations
+			go func() {
+				if err := acl.Registry.Configure(); chk.E(err) {
+					log.E.F("failed to reconfigure ACL: %v", err)
+				}
+			}()
+		}
+	}
 	return
 }
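The `blocked:` handling above follows the machine-readable OK-message convention of NIP-01/NIP-20: the internal error carries the prefix, and the handler strips it before handing the remainder to the OK envelope. A minimal standalone sketch of that convention (hypothetical helper names; `Ok.Error` and the envelope types are the relay's own):

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// splitBlocked mirrors the handler above: a "blocked:" error becomes a
// rejected OK response whose message is the text after the prefix.
func splitBlocked(err error) (rejected bool, msg string) {
	if err != nil && strings.HasPrefix(err.Error(), "blocked:") {
		return true, strings.TrimPrefix(err.Error(), "blocked: ")
	}
	return false, ""
}

func main() {
	rejected, msg := splitBlocked(
		errors.New("blocked: cannot delete events that belong to other users"),
	)
	fmt.Println(rejected, msg)
}
```

The diff's slice expression `err.Error()[len("blocked: "):len(err.Error())]` is equivalent to the `TrimPrefix` call used here.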

View File

@@ -1,8 +1,6 @@
 package app

 import (
-	"fmt"
-
 	"encoders.orly/envelopes"
 	"encoders.orly/envelopes/authenvelope"
 	"encoders.orly/envelopes/closeenvelope"
@@ -15,42 +13,36 @@ import (
 )

 func (l *Listener) HandleMessage(msg []byte, remote string) {
-	log.D.C(
-		func() string {
-			return fmt.Sprintf(
-				"%s received message:\n%s", remote, msg,
-			)
-		},
-	)
+	log.D.F("%s received message:\n%s", remote, msg)
 	var err error
 	var t string
 	var rem []byte
 	if t, rem, err = envelopes.Identify(msg); !chk.E(err) {
 		switch t {
 		case eventenvelope.L:
-			log.D.F("eventenvelope: %s", rem)
+			// log.D.F("eventenvelope: %s %s", remote, rem)
 			err = l.HandleEvent(rem)
 		case reqenvelope.L:
-			log.D.F("reqenvelope: %s", rem)
+			// log.D.F("reqenvelope: %s %s", remote, rem)
 			err = l.HandleReq(rem)
 		case closeenvelope.L:
-			log.D.F("closeenvelope: %s", rem)
+			// log.D.F("closeenvelope: %s %s", remote, rem)
 			err = l.HandleClose(rem)
 		case authenvelope.L:
-			log.D.F("authenvelope: %s", rem)
+			// log.D.F("authenvelope: %s %s", remote, rem)
 			err = l.HandleAuth(rem)
 		default:
 			err = errorf.E("unknown envelope type %s\n%s", t, rem)
 		}
 	}
 	if err != nil {
-		log.D.C(
-			func() string {
-				return fmt.Sprintf(
-					"notice->%s %s", remote, err,
-				)
-			},
-		)
+		// log.D.C(
+		// 	func() string {
+		// 		return fmt.Sprintf(
+		// 			"notice->%s %s", remote, err,
+		// 		)
+		// 	},
+		// )
 		if err = noticeenvelope.NewFrom(err.Error()).Write(l); chk.E(err) {
 			return
 		}

View File

@@ -33,32 +33,32 @@ func (s *Server) HandleRelayInfo(w http.ResponseWriter, r *http.Request) {
 		relayinfo.BasicProtocol,
 		// relayinfo.Authentication,
 		// relayinfo.EncryptedDirectMessage,
-		// relayinfo.EventDeletion,
+		relayinfo.EventDeletion,
 		relayinfo.RelayInformationDocument,
 		// relayinfo.GenericTagQueries,
 		// relayinfo.NostrMarketplace,
-		// relayinfo.EventTreatment,
+		relayinfo.EventTreatment,
 		// relayinfo.CommandResults,
-		// relayinfo.ParameterizedReplaceableEvents,
+		relayinfo.ParameterizedReplaceableEvents,
 		// relayinfo.ExpirationTimestamp,
-		// relayinfo.ProtectedEvents,
-		// relayinfo.RelayListMetadata,
+		relayinfo.ProtectedEvents,
+		relayinfo.RelayListMetadata,
 	)
 	if s.Config.ACLMode != "none" {
 		supportedNIPs = relayinfo.GetList(
 			relayinfo.BasicProtocol,
 			relayinfo.Authentication,
 			// relayinfo.EncryptedDirectMessage,
-			// relayinfo.EventDeletion,
+			relayinfo.EventDeletion,
 			relayinfo.RelayInformationDocument,
 			// relayinfo.GenericTagQueries,
 			// relayinfo.NostrMarketplace,
-			// relayinfo.EventTreatment,
+			relayinfo.EventTreatment,
 			// relayinfo.CommandResults,
 			// relayinfo.ParameterizedReplaceableEvents,
 			// relayinfo.ExpirationTimestamp,
-			// relayinfo.ProtectedEvents,
-			// relayinfo.RelayListMetadata,
+			relayinfo.ProtectedEvents,
+			relayinfo.RelayListMetadata,
 		)
 	}
 	sort.Sort(supportedNIPs)

View File

@@ -1,7 +1,10 @@
 package app

 import (
+	"context"
 	"errors"
+	"fmt"
+	"time"

 	acl "acl.orly"
 	"encoders.orly/envelopes/authenvelope"
@@ -24,16 +27,15 @@ import (
 	"utils.orly/pointers"
 )

-func (l *Listener) HandleReq(msg []byte) (
-	err error,
-) {
+func (l *Listener) HandleReq(msg []byte) (err error) {
+	log.T.F("HandleReq: START processing from %s\n%s\n", l.remote, msg)
 	var rem []byte
 	env := reqenvelope.New()
 	if rem, err = env.Unmarshal(msg); chk.E(err) {
 		return normalize.Error.Errorf(err.Error())
 	}
 	if len(rem) > 0 {
-		log.I.F("extra '%s'", rem)
+		log.I.F("REQ extra bytes: '%s'", rem)
 	}
 	// send a challenge to the client to auth if an ACL is active
 	if acl.Registry.Active.Load() != "none" {
@@ -43,7 +45,7 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 		}
 	}
 	// check permissions of user
-	accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load())
+	accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load(), l.remote)
 	switch accessLevel {
 	case "none":
 		if err = okenvelope.NewFrom(
@@ -59,17 +61,71 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 	}
 	var events event.S
 	for _, f := range *env.Filters {
+		idsLen := 0
+		kindsLen := 0
+		authorsLen := 0
+		tagsLen := 0
+		if f != nil {
+			if f.Ids != nil {
+				idsLen = f.Ids.Len()
+			}
+			if f.Kinds != nil {
+				kindsLen = f.Kinds.Len()
+			}
+			if f.Authors != nil {
+				authorsLen = f.Authors.Len()
+			}
+			if f.Tags != nil {
+				tagsLen = f.Tags.Len()
+			}
+		}
+		log.T.F(
+			"REQ %s: filter summary ids=%d kinds=%d authors=%d tags=%d",
+			env.Subscription, idsLen, kindsLen, authorsLen, tagsLen,
+		)
+		if f != nil && f.Authors != nil && f.Authors.Len() > 0 {
+			var authors []string
+			for _, a := range f.Authors.T {
+				authors = append(authors, hex.Enc(a))
+			}
+			log.T.F("REQ %s: authors=%v", env.Subscription, authors)
+		}
+		if f != nil && f.Kinds != nil && f.Kinds.Len() > 0 {
+			log.T.F("REQ %s: kinds=%v", env.Subscription, f.Kinds.ToUint16())
+		}
+		if f != nil && f.Ids != nil && f.Ids.Len() > 0 {
+			var ids []string
+			for _, id := range f.Ids.T {
+				ids = append(ids, hex.Enc(id))
+			}
+			var lim any
+			if pointers.Present(f.Limit) {
+				lim = *f.Limit
+			} else {
+				lim = nil
+			}
+			log.T.F(
+				"REQ %s: ids filter count=%d ids=%v limit=%v", env.Subscription,
+				f.Ids.Len(), ids, lim,
+			)
+		}
 		if pointers.Present(f.Limit) {
 			if *f.Limit == 0 {
 				continue
 			}
 		}
-		if events, err = l.QueryEvents(l.Ctx, f); chk.E(err) {
-			if errors.Is(err, badger.ErrDBClosed) {
-				return
-			}
-			err = nil
-		}
+		// Use a separate context for QueryEvents to prevent cancellation issues
+		queryCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+		defer cancel()
+		log.T.F("HandleReq: About to QueryEvents for %s, main context done: %v", l.remote, l.ctx.Err() != nil)
+		if events, err = l.QueryEvents(queryCtx, f); chk.E(err) {
+			if errors.Is(err, badger.ErrDBClosed) {
+				return
+			}
+			log.T.F("HandleReq: QueryEvents error for %s: %v", l.remote, err)
+			err = nil
+		}
+		log.T.F("HandleReq: QueryEvents completed for %s, found %d events", l.remote, len(events))
 	}
 	var tmp event.S
 privCheck:
@@ -115,8 +171,15 @@ privCheck:
 	events = tmp
 	seen := make(map[string]struct{})
 	for _, ev := range events {
-		// track the IDs we've sent
-		seen[string(ev.ID)] = struct{}{}
+		log.T.F(
+			"REQ %s: sending EVENT id=%s kind=%d", env.Subscription,
+			hex.Enc(ev.ID), ev.Kind,
+		)
+		log.T.C(
+			func() string {
+				return fmt.Sprintf("event:\n%s\n", ev.Serialize())
+			},
+		)
 		var res *eventenvelope.Result
 		if res, err = eventenvelope.NewResultWith(
 			env.Subscription, ev,
@@ -126,6 +189,8 @@ privCheck:
 		if err = res.Write(l); chk.E(err) {
 			return
 		}
+		// track the IDs we've sent (use hex encoding for stable key)
+		seen[hex.Enc(ev.ID)] = struct{}{}
 	}
 	// write the EOSE to signal to the client that all events found have been
 	// sent.
@@ -137,6 +202,10 @@ privCheck:
 	// if the query was for just Ids, we know there can't be any more results,
 	// so cancel the subscription.
 	cancel := true
+	log.T.F(
+		"REQ %s: computing cancel/subscription; events_sent=%d",
+		env.Subscription, len(events),
+	)
 	var subbedFilters filter.S
 	for _, f := range *env.Filters {
 		if f.Ids.Len() < 1 {
@@ -145,12 +214,16 @@ privCheck:
 		} else {
 			// remove the IDs that we already sent
 			var notFounds [][]byte
-			for _, ev := range events {
-				if _, ok := seen[string(ev.ID)]; ok {
+			for _, id := range f.Ids.T {
+				if _, ok := seen[hex.Enc(id)]; ok {
 					continue
 				}
-				notFounds = append(notFounds, ev.ID)
+				notFounds = append(notFounds, id)
 			}
+			log.T.F(
+				"REQ %s: ids outstanding=%d of %d", env.Subscription,
+				len(notFounds), f.Ids.Len(),
+			)
 			// if all were found, don't add to subbedFilters
 			if len(notFounds) == 0 {
 				continue
@@ -172,11 +245,12 @@ privCheck:
 	if !cancel {
 		l.publishers.Receive(
 			&W{
 				Conn:     l.conn,
 				remote:   l.remote,
 				Id:       string(env.Subscription),
 				Receiver: receiver,
 				Filters:  env.Filters,
+				AuthedPubkey: l.authedPubkey.Load(),
 			},
 		)
 	} else {
@@ -186,5 +260,6 @@ privCheck:
 			return
 		}
 	}
+	log.T.F("HandleReq: COMPLETED processing from %s", l.remote)
 	return
 }

View File

@@ -7,6 +7,7 @@ import (
 	"strings"
 	"time"

+	"encoders.orly/envelopes/authenvelope"
 	"encoders.orly/hex"
 	"github.com/coder/websocket"
 	"lol.mleku.dev/chk"
@@ -18,6 +19,8 @@ const (
 	DefaultWriteWait      = 10 * time.Second
 	DefaultPongWait       = 60 * time.Second
 	DefaultPingWait       = DefaultPongWait / 2
+	DefaultReadTimeout    = 3 * time.Second // Read timeout to detect stalled connections
+	DefaultWriteTimeout   = 3 * time.Second
 	DefaultMaxMessageSize = 1 * units.Mb

 	// CloseMessage denotes a close control message. The optional message
@@ -70,10 +73,18 @@ whitelist:
 	chal := make([]byte, 32)
 	rand.Read(chal)
 	listener.challenge.Store([]byte(hex.Enc(chal)))
+	// If admins are configured, immediately prompt client to AUTH (NIP-42)
+	if len(s.Config.Admins) > 0 {
+		// log.D.F("sending initial AUTH challenge to %s", remote)
+		if err = authenvelope.NewChallengeWith(listener.challenge.Load()).
+			Write(listener); chk.E(err) {
+			return
+		}
+	}
 	ticker := time.NewTicker(DefaultPingWait)
 	go s.Pinger(ctx, conn, ticker, cancel)
 	defer func() {
-		log.D.F("closing websocket connection from %s", remote)
+		// log.D.F("closing websocket connection from %s", remote)
 		cancel()
 		ticker.Stop()
 		listener.publishers.Receive(&W{Cancel: true})
@@ -87,12 +98,33 @@ whitelist:
 		var typ websocket.MessageType
 		var msg []byte
 		log.T.F("waiting for message from %s", remote)
-		if typ, msg, err = conn.Read(ctx); chk.E(err) {
+		// Create a read context with timeout to prevent indefinite blocking
+		readCtx, readCancel := context.WithTimeout(ctx, DefaultReadTimeout)
+		typ, msg, err = conn.Read(readCtx)
+		readCancel()
+		if err != nil {
 			if strings.Contains(
 				err.Error(), "use of closed network connection",
 			) {
 				return
 			}
+			// Handle timeout errors - occurs when client becomes unresponsive
+			if strings.Contains(err.Error(), "context deadline exceeded") {
+				log.T.F(
+					"connection from %s timed out after %v", remote,
+					DefaultReadTimeout,
+				)
+				return
+			}
+			// Handle EOF errors gracefully - these occur when client closes connection
+			// or sends incomplete/malformed WebSocket frames
+			if strings.Contains(err.Error(), "EOF") ||
+				strings.Contains(err.Error(), "failed to read frame header") {
+				log.T.F("connection from %s closed: %v", remote, err)
+				return
+			}
 			status := websocket.CloseStatus(err)
 			switch status {
 			case websocket.StatusNormalClosure,
@@ -100,17 +132,27 @@ whitelist:
 				websocket.StatusNoStatusRcvd,
 				websocket.StatusAbnormalClosure,
 				websocket.StatusProtocolError:
+				log.T.F(
+					"connection from %s closed with status: %v", remote, status,
+				)
 			default:
 				log.E.F("unexpected close error from %s: %v", remote, err)
 			}
 			return
 		}
 		if typ == PingMessage {
-			if err = conn.Write(ctx, PongMessage, msg); chk.E(err) {
+			// Create a write context with timeout for pong response
+			writeCtx, writeCancel := context.WithTimeout(
+				ctx, DefaultWriteTimeout,
+			)
+			if err = conn.Write(writeCtx, PongMessage, msg); chk.E(err) {
+				writeCancel()
 				return
 			}
+			writeCancel()
 			continue
 		}
+		log.T.F("received message from %s: %s", remote, string(msg))
 		go listener.HandleMessage(msg, remote)
 	}
 }
@@ -127,9 +169,13 @@ func (s *Server) Pinger(
 	for {
 		select {
 		case <-ticker.C:
-			if err = conn.Ping(ctx); chk.E(err) {
+			// Create a write context with timeout for ping operation
+			pingCtx, pingCancel := context.WithTimeout(ctx, DefaultWriteTimeout)
+			if err = conn.Ping(pingCtx); chk.E(err) {
+				pingCancel()
 				return
 			}
+			pingCancel()
 		case <-ctx.Done():
 			return
 		}
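The read, pong, and ping changes above all apply the same pattern: derive a short-lived context per network operation so one stalled peer cannot block the connection loop, while the parent context still propagates connection-level cancellation. A self-contained sketch of that pattern, assuming nothing beyond the standard library:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

const defaultWriteTimeout = 3 * time.Second

// withOpTimeout runs a single network operation under its own deadline.
// The parent context still cancels the operation if the whole connection
// is torn down; the per-op deadline only bounds this one call.
func withOpTimeout(
	parent context.Context, op func(ctx context.Context) error,
) error {
	ctx, cancel := context.WithTimeout(parent, defaultWriteTimeout)
	defer cancel()
	return op(ctx)
}

func main() {
	err := withOpTimeout(context.Background(), func(ctx context.Context) error {
		select {
		case <-time.After(5 * time.Second): // simulated stalled write
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})
	fmt.Println(err) // context deadline exceeded
}
```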

View File

@@ -19,8 +19,21 @@ type Listener struct {
 	authedPubkey atomic.Bytes
 }

+// Ctx returns the listener's context, but creates a new context for each operation
+// to prevent cancellation from affecting subsequent operations
+func (l *Listener) Ctx() context.Context {
+	return l.ctx
+}
+
 func (l *Listener) Write(p []byte) (n int, err error) {
-	if err = l.conn.Write(l.ctx, websocket.MessageText, p); chk.E(err) {
+	// Use a separate context with timeout for writes to prevent race conditions
+	// where the main connection context gets cancelled while writing events
+	writeCtx, cancel := context.WithTimeout(
+		context.Background(), DefaultWriteTimeout,
+	)
+	defer cancel()
+	if err = l.conn.Write(writeCtx, websocket.MessageText, p); chk.E(err) {
 		return
 	}
 	n = len(p)
View File

@@ -28,6 +28,9 @@ func Run(
 	var err error
 	var adminKeys [][]byte
 	for _, admin := range cfg.Admins {
+		if len(admin) == 0 {
+			continue
+		}
 		var pk []byte
 		if pk, err = bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(err) {
 			continue

View File

@@ -8,17 +8,21 @@ import (
 	"encoders.orly/envelopes/eventenvelope"
 	"encoders.orly/event"
 	"encoders.orly/filter"
+	"encoders.orly/hex"
+	"encoders.orly/kind"
 	"github.com/coder/websocket"
 	"interfaces.orly/publisher"
 	"interfaces.orly/typer"
 	"lol.mleku.dev/chk"
 	"lol.mleku.dev/log"
+	utils "utils.orly"
 )

 const Type = "socketapi"

 type Subscription struct {
 	remote string
+	AuthedPubkey []byte
 	*filter.S
 }
@@ -46,6 +50,9 @@ type W struct {
 	// associated with this WebSocket connection. It is used to determine which
 	// notifications or data should be received by the subscriber.
 	Filters *filter.S
+	// AuthedPubkey is the authenticated pubkey associated with the listener (if any).
+	AuthedPubkey []byte
 }

 func (w *W) Type() (typeName string) { return Type }
@@ -56,7 +63,7 @@ func (w *W) Type() (typeName string) { return Type }
 type P struct {
 	c context.Context
 	// Mx is the mutex for the Map.
-	Mx sync.Mutex
+	Mx sync.RWMutex
 	// Map is the map of subscribers and subscriptions from the websocket api.
 	Map
 }
@@ -112,7 +119,9 @@ func (p *P) Receive(msg typer.T) {
 	defer p.Mx.Unlock()
 	if subs, ok := p.Map[m.Conn]; !ok {
 		subs = make(map[string]Subscription)
-		subs[m.Id] = Subscription{S: m.Filters, remote: m.remote}
+		subs[m.Id] = Subscription{
+			S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
+		}
 		p.Map[m.Conn] = subs
 		log.D.C(
 			func() string {
@@ -124,7 +133,9 @@ func (p *P) Receive(msg typer.T) {
 			},
 		)
 	} else {
-		subs[m.Id] = Subscription{S: m.Filters, remote: m.remote}
+		subs[m.Id] = Subscription{
+			S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
+		}
 		log.D.C(
 			func() string {
 				return fmt.Sprintf(
@@ -150,71 +161,111 @@ func (p *P) Receive(msg typer.T) {
 // for unauthenticated users when events are privileged.
 func (p *P) Deliver(ev *event.E) {
 	var err error
-	p.Mx.Lock()
-	defer p.Mx.Unlock()
-	log.D.C(
-		func() string {
-			return fmt.Sprintf(
-				"delivering event %0x to websocket subscribers %d", ev.ID,
-				len(p.Map),
-			)
-		},
-	)
+	// Snapshot the deliveries under read lock to avoid holding locks during I/O
+	p.Mx.RLock()
+	type delivery struct {
+		w   *websocket.Conn
+		id  string
+		sub Subscription
+	}
+	var deliveries []delivery
 	for w, subs := range p.Map {
 		for id, subscriber := range subs {
-			if !subscriber.Match(ev) {
-				continue
+			if subscriber.Match(ev) {
+				deliveries = append(
+					deliveries, delivery{w: w, id: id, sub: subscriber},
+				)
 			}
-			// if p.Server.AuthRequired() {
-			// 	if !auth.CheckPrivilege(w.AuthedPubkey(), ev) {
-			// 		continue
-			// 	}
-			// }
-			var res *eventenvelope.Result
-			if res, err = eventenvelope.NewResultWith(id, ev); chk.E(err) {
-				continue
-			}
-			if err = w.Write(
-				p.c, websocket.MessageText, res.Marshal(nil),
-			); chk.E(err) {
-				p.removeSubscriber(w)
-				if err = w.CloseNow(); chk.E(err) {
-					continue
-				}
-				continue
-			}
-			log.D.C(
-				func() string {
-					return fmt.Sprintf(
-						"dispatched event %0x to subscription %s, %s",
-						ev.ID, id, subscriber.remote,
-					)
-				},
-			)
 		}
 	}
+	p.Mx.RUnlock()
+	if len(deliveries) > 0 {
+		log.D.C(
+			func() string {
+				return fmt.Sprintf(
+					"delivering event %0x to websocket subscribers %d", ev.ID,
+					len(deliveries),
+				)
+			},
+		)
+	}
+	for _, d := range deliveries {
+		// If the event is privileged, enforce that the subscriber's authed pubkey matches
+		// either the event pubkey or appears in any 'p' tag of the event.
+		if kind.IsPrivileged(ev.Kind) && len(d.sub.AuthedPubkey) > 0 {
+			pk := d.sub.AuthedPubkey
+			allowed := false
+			// Direct author match
+			if utils.FastEqual(ev.Pubkey, pk) {
+				allowed = true
+			} else if ev.Tags != nil {
+				for _, pTag := range ev.Tags.GetAll([]byte("p")) {
+					// pTag.Value() returns []byte hex string; decode to bytes
+					dec, derr := hex.Dec(string(pTag.Value()))
+					if derr != nil {
+						continue
+					}
+					if utils.FastEqual(dec, pk) {
+						allowed = true
+						break
+					}
+				}
+			}
+			if !allowed {
+				// Skip delivery for this subscriber
+				continue
+			}
+		}
+		var res *eventenvelope.Result
+		if res, err = eventenvelope.NewResultWith(d.id, ev); chk.E(err) {
+			continue
+		}
+		// Use a separate context with timeout for writes to prevent race conditions
+		// where the publisher context gets cancelled while writing events
+		writeCtx, cancel := context.WithTimeout(
+			context.Background(), DefaultWriteTimeout,
+		)
+		defer cancel()
+		if err = d.w.Write(
+			writeCtx, websocket.MessageText, res.Marshal(nil),
+		); chk.E(err) {
+			// On error, remove the subscriber connection safely
+			p.removeSubscriber(d.w)
+			_ = d.w.CloseNow()
+			continue
+		}
+		log.D.C(
+			func() string {
+				return fmt.Sprintf(
+					"dispatched event %0x to subscription %s, %s",
+					ev.ID, d.id, d.sub.remote,
+				)
+			},
+		)
+	}
 }

 // removeSubscriberId removes a specific subscription from a subscriber
 // websocket.
 func (p *P) removeSubscriberId(ws *websocket.Conn, id string) {
 	p.Mx.Lock()
-	defer p.Mx.Unlock()
 	var subs map[string]Subscription
 	var ok bool
 	if subs, ok = p.Map[ws]; ok {
-		delete(p.Map[ws], id)
-		_ = subs
-		if len(subs) == 0 {
+		delete(subs, id)
+		// Check the actual map after deletion, not the original reference
+		if len(p.Map[ws]) == 0 {
 			delete(p.Map, ws)
 		}
 	}
+	p.Mx.Unlock()
 }

 // removeSubscriber removes a websocket from the P collection.
 func (p *P) removeSubscriber(ws *websocket.Conn) {
 	p.Mx.Lock()
-	defer p.Mx.Unlock()
 	clear(p.Map[ws])
 	delete(p.Map, ws)
+	p.Mx.Unlock()
 }
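The Deliver rewrite is an instance of the snapshot pattern: collect matching targets under a read lock, release it, then perform the slow socket writes with no lock held, so one blocked client cannot stall Receive or the remove operations. A stripped-down sketch of the shape, with types simplified and not the relay's actual API:

```go
package main

import (
	"fmt"
	"sync"
)

type hub struct {
	mu   sync.RWMutex
	subs map[int]func(string) error // subscription id -> send function
}

// broadcast snapshots the subscriber set under a read lock, then performs
// the (potentially slow) sends without holding any lock.
func (h *hub) broadcast(msg string) {
	h.mu.RLock()
	targets := make([]func(string) error, 0, len(h.subs))
	for _, send := range h.subs {
		targets = append(targets, send)
	}
	h.mu.RUnlock()
	for _, send := range targets {
		// Failed sends would trigger removal, which re-acquires the write lock.
		_ = send(msg)
	}
}

func main() {
	h := &hub{subs: map[int]func(string) error{
		1: func(m string) error { fmt.Println("got:", m); return nil },
	}}
	h.broadcast("hello")
}
```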

View File

@@ -2,14 +2,12 @@ package app

 import (
 	"context"
-	"fmt"
 	"net/http"
 	"strconv"
 	"strings"

 	"database.orly"
 	"lol.mleku.dev/chk"
-	"lol.mleku.dev/log"
 	"next.orly.dev/app/config"
 	"protocol.orly/publish"
 )
@@ -25,11 +23,11 @@ type Server struct {
 }

 func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	log.T.C(
-		func() string {
-			return fmt.Sprintf("path %v header %v", r.URL, r.Header)
-		},
-	)
+	// log.T.C(
+	// 	func() string {
+	// 		return fmt.Sprintf("path %v header %v", r.URL, r.Header)
+	// 	},
+	// )
 	if r.Header.Get("Upgrade") == "websocket" {
 		s.HandleWebsocket(w, r)
 	} else if r.Header.Get("Accept") == "application/nostr+json" {

View File

@@ -28,10 +28,10 @@ func (s *S) Configure(cfg ...any) (err error) {
 	return err
 }

-func (s *S) GetAccessLevel(pub []byte) (level string) {
+func (s *S) GetAccessLevel(pub []byte, address string) (level string) {
 	for _, i := range s.ACL {
 		if i.Type() == s.Active.Load() {
-			level = i.GetAccessLevel(pub)
+			level = i.GetAccessLevel(pub, address)
 			break
 		}
 	}

View File

@@ -45,13 +45,13 @@ func (f *Follows) Configure(cfg ...any) (err error) {
 	for _, ca := range cfg {
 		switch c := ca.(type) {
 		case *config.C:
-			log.D.F("setting ACL config: %v", c)
+			// log.D.F("setting ACL config: %v", c)
 			f.cfg = c
 		case *database.D:
-			log.D.F("setting ACL database: %s", c.Path())
+			// log.D.F("setting ACL database: %s", c.Path())
 			f.D = c
 		case context.Context:
-			log.D.F("setting ACL context: %s", c.Value("id"))
+			// log.D.F("setting ACL context: %s", c.Value("id"))
 			f.Ctx = c
 		default:
 			err = errorf.E("invalid type: %T", reflect.TypeOf(ca))
@@ -64,13 +64,15 @@ func (f *Follows) Configure(cfg ...any) (err error) {
 	// find admin follow lists
 	f.followsMx.Lock()
 	defer f.followsMx.Unlock()
-	log.I.F("finding admins")
+	// log.I.F("finding admins")
 	f.follows, f.admins = nil, nil
 	for _, admin := range f.cfg.Admins {
-		log.I.F("%s", admin)
+		// log.I.F("%s", admin)
 		var adm []byte
-		if adm, err = bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(err) {
+		if a, e := bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(e) {
 			continue
+		} else {
+			adm = a
 		}
 		log.I.F("admin: %0x", adm)
 		f.admins = append(f.admins, adm)
@@ -96,12 +98,14 @@ func (f *Follows) Configure(cfg ...any) (err error) {
 		if ev, err = f.D.FetchEventBySerial(s); chk.E(err) {
 			continue
 		}
-		log.I.F("admin follow list:\n%s", ev.Serialize())
+		// log.I.F("admin follow list:\n%s", ev.Serialize())
 		for _, v := range ev.Tags.GetAll([]byte("p")) {
-			log.I.F("adding follow: %s", v.Value())
+			// log.I.F("adding follow: %s", v.Value())
 			var a []byte
-			if a, err = hex.Dec(string(v.Value())); chk.E(err) {
+			if b, e := hex.Dec(string(v.Value())); chk.E(e) {
 				continue
+			} else {
+				a = b
 			}
 			f.follows = append(f.follows, a)
 		}
@@ -116,7 +120,7 @@ func (f *Follows) Configure(cfg ...any) (err error) {
 	return
 }

-func (f *Follows) GetAccessLevel(pub []byte) (level string) {
+func (f *Follows) GetAccessLevel(pub []byte, address string) (level string) {
 	if f.cfg == nil {
 		return "write"
 	}
@@ -203,7 +207,7 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
 		log.W.F("follows syncer: no admin relays found in DB (kind 10002)")
 		return
 	}
-	log.I.F(
+	log.T.F(
 		"follows syncer: subscribing to %d relays for %d authors", len(urls),
 		len(authors),
 	)
@@ -240,13 +244,13 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
 		}
 		*ff = append(*ff, f1)
 		req := reqenvelope.NewFrom([]byte("follows-sync"), ff)
-		if err := c.Write(
+		if err = c.Write(
 			ctx, websocket.MessageText, req.Marshal(nil),
 		); chk.E(err) {
 			_ = c.Close(websocket.StatusInternalError, "write failed")
 			continue
 		}
-		log.I.F("sent REQ to %s for follows subscription", u)
+		log.T.F("sent REQ to %s for follows subscription", u)
 		// read loop
 		for {
 			select {
@@ -274,11 +278,11 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
 				if ok, err := res.Event.Verify(); chk.T(err) || !ok {
 					continue
 				}
-				if _, _, err := f.D.SaveEvent(
+				if _, _, err = f.D.SaveEvent(
 					ctx, res.Event,
 				); err != nil {
 					if !strings.HasPrefix(
-						err.Error(), "event already exists",
+						err.Error(), "blocked:",
 					) {
 						log.W.F(
 							"follows syncer: save event failed: %v",
@@ -333,6 +337,7 @@ func (f *Follows) Syncer() {
 		}
 	}
 	}()
+	f.updated <- struct{}{}
 }

 func init() {

View File

@@ -8,7 +8,7 @@ type None struct{}

 func (n None) Configure(cfg ...any) (err error) { return }

-func (n None) GetAccessLevel(pub []byte) (level string) {
+func (n None) GetAccessLevel(pub []byte, address string) (level string) {
 	return "write"
 }

View File

@@ -15,7 +15,7 @@ import (
 type Signer struct {
 	SecretKey *secp256k1.SecretKey
 	PublicKey *secp256k1.PublicKey
-	BTCECSec  *ec.SecretKey
+	BTCECSec  *secp256k1.SecretKey
 	pkb, skb  []byte
 }
@@ -23,11 +23,11 @@ var _ signer.I = &Signer{}

 // Generate creates a new Signer.
 func (s *Signer) Generate() (err error) {
-	if s.SecretKey, err = ec.NewSecretKey(); chk.E(err) {
+	if s.SecretKey, err = secp256k1.GenerateSecretKey(); chk.E(err) {
 		return
 	}
 	s.skb = s.SecretKey.Serialize()
-	s.BTCECSec, _ = ec.PrivKeyFromBytes(s.skb)
+	s.BTCECSec = secp256k1.PrivKeyFromBytes(s.skb)
 	s.PublicKey = s.SecretKey.PubKey()
 	s.pkb = schnorr.SerializePubKey(s.PublicKey)
 	return
@@ -43,7 +43,7 @@ func (s *Signer) InitSec(sec []byte) (err error) {
 	s.SecretKey = secp256k1.SecKeyFromBytes(sec)
 	s.PublicKey = s.SecretKey.PubKey()
 	s.pkb = schnorr.SerializePubKey(s.PublicKey)
-	s.BTCECSec, _ = ec.PrivKeyFromBytes(s.skb)
+	s.BTCECSec = secp256k1.PrivKeyFromBytes(s.skb)
 	return
 }
@@ -142,7 +142,7 @@ func (s *Signer) ECDH(pubkeyBytes []byte) (secret []byte, err error) {
 	); chk.E(err) {
 		return
 	}
-	secret = ec.GenerateSharedSecret(s.BTCECSec, pub)
+	secret = secp256k1.GenerateSharedSecret(s.BTCECSec, pub)
 	return
 }
@@ -154,7 +154,7 @@ type Keygen struct {
 // Generate a new key pair. If the result is suitable, the embedded Signer can have its contents
 // extracted.
 func (k *Keygen) Generate() (pubBytes []byte, err error) {
-	if k.Signer.SecretKey, err = ec.NewSecretKey(); chk.E(err) {
+	if k.Signer.SecretKey, err = secp256k1.GenerateSecretKey(); chk.E(err) {
 		return
 	}
 	k.Signer.PublicKey = k.SecretKey.PubKey()

View File

@@ -9,6 +9,7 @@ import (
 	types2 "database.orly/indexes/types"
 	"encoders.orly/filter"
 	"lol.mleku.dev/chk"
+	"lol.mleku.dev/log"
 )

 type Range struct {
@@ -89,12 +90,20 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
 				return
 			}
 			buf := new(bytes.Buffer)
+			// Create an index prefix without the serial number
 			idx := indexes.IdEnc(i, nil)
 			if err = idx.MarshalWrite(buf); chk.E(err) {
 				return
 			}
 			b := buf.Bytes()
-			r := Range{b, b}
+			// Create range that will match any serial value with this ID prefix
+			end := make([]byte, len(b))
+			copy(end, b)
+			// Fill the end range with 0xff bytes to match all possible serial values
+			for i := 0; i < 5; i++ {
+				end = append(end, 0xff)
+			}
+			r := Range{b, end}
 			idxs = append(idxs, r)
 			return
 		}(); chk.E(err) {
@@ -230,6 +239,7 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
 		for _, t := range *f.Tags {
 			if t.Len() >= 2 && (len(t.Key()) == 1 || (len(t.Key()) == 2 && t.Key()[0] == '#')) {
 				var p *types2.PubHash
+				log.I.S(author)
 				if p, err = CreatePubHashFromData(author); chk.E(err) {
 					return
 				}
@@ -306,8 +316,8 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
 		for _, author := range f.Authors.T {
 			kind := new(types2.Uint16)
 			kind.Set(k)
-			p := new(types2.PubHash)
-			if err = p.FromPubkey(author); chk.E(err) {
+			var p *types2.PubHash
+			if p, err = CreatePubHashFromData(author); chk.E(err) {
 				return
 			}
 			start, end := new(bytes.Buffer), new(bytes.Buffer)
@@ -351,8 +361,9 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
 	// Pubkey pc-
 	if f.Authors != nil && f.Authors.Len() > 0 {
 		for _, author := range f.Authors.T {
-			p := new(types2.PubHash)
-			if err = p.FromPubkey(author); chk.E(err) {
+			var p *types2.PubHash
+			log.I.S(author)
+			if p, err = CreatePubHashFromData(author); chk.E(err) {
 				return
 			}
 			start, end := new(bytes.Buffer), new(bytes.Buffer)
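Both this change and the GetSerialsByRange change below rely on the same key-ordering trick: to match every key that shares a prefix in a lexicographically ordered store like Badger, extend the end bound with 0xff bytes (here five, the width of the serial suffix). A minimal sketch of the idea, assuming plain byte-wise key comparison:

```go
package main

import (
	"bytes"
	"fmt"
)

// prefixRange returns [start, end] bounds that cover every key consisting
// of prefix plus a fixed-width suffix (serialWidth bytes), under byte-wise
// key ordering: no valid suffix byte can exceed 0xff.
func prefixRange(prefix []byte, serialWidth int) (start, end []byte) {
	start = prefix
	end = make([]byte, len(prefix), len(prefix)+serialWidth)
	copy(end, prefix)
	for i := 0; i < serialWidth; i++ {
		end = append(end, 0xff)
	}
	return start, end
}

func main() {
	start, end := prefixRange([]byte{0x01, 0x02}, 5)
	key := []byte{0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x2a} // prefix + serial
	inRange := bytes.Compare(key, start) >= 0 && bytes.Compare(key, end) <= 0
	fmt.Println(inRange) // true
}
```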

View File

@@ -5,20 +5,33 @@ import (
 	"database.orly/indexes/types"
 	"encoders.orly/filter"
+	"encoders.orly/hex"
 	"encoders.orly/tag"
 	"github.com/dgraph-io/badger/v4"
 	"lol.mleku.dev/chk"
 	"lol.mleku.dev/errorf"
+	"lol.mleku.dev/log"
 )

 func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
+	log.T.F("GetSerialById: input id=%s", hex.Enc(id))
 	var idxs []Range
 	if idxs, err = GetIndexesFromFilter(&filter.F{Ids: tag.NewFromBytesSlice(id)}); chk.E(err) {
 		return
 	}
+	for i, idx := range idxs {
+		log.T.F(
+			"GetSerialById: searching range %d: start=%x, end=%x", i, idx.Start,
+			idx.End,
+		)
+	}
 	if len(idxs) == 0 {
 		err = errorf.E("no indexes found for id %0x", id)
+		return
 	}
+	idFound := false
 	if err = d.View(
 		func(txn *badger.Txn) (err error) {
 			it := txn.NewIterator(badger.DefaultIteratorOptions)
@@ -33,15 +46,24 @@ func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
 				if err = ser.UnmarshalRead(buf); chk.E(err) {
 					return
 				}
+				idFound = true
 			} else {
-				// just don't return what we don't have? others may be
-				// found tho.
+				// Item not found in database
+				log.T.F(
+					"GetSerialById: ID not found in database: %s", hex.Enc(id),
+				)
 			}
 			return
 		},
 	); chk.E(err) {
 		return
 	}
+	if !idFound {
+		err = errorf.T("id not found in database: %s", hex.Enc(id))
+		return
+	}
 	return
 }

View File

@@ -20,7 +20,15 @@ func (d *D) GetSerialsByRange(idx Range) (
 		},
 	)
 	defer it.Close()
-	for it.Seek(idx.End); it.Valid(); it.Next() {
+	// Start from a position that includes the end boundary (until timestamp)
+	// We create an end boundary that's slightly beyond the actual end to ensure inclusivity
+	endBoundary := make([]byte, len(idx.End))
+	copy(endBoundary, idx.End)
+	// Add 0xff bytes to ensure we capture all events at the exact until timestamp
+	for i := 0; i < 5; i++ {
+		endBoundary = append(endBoundary, 0xff)
+	}
+	for it.Seek(endBoundary); it.Valid(); it.Next() {
 		item := it.Item()
 		var key []byte
 		key = item.Key()

View File

@@ -15,10 +15,13 @@ const PubHashLen = 8
 type PubHash struct{ val [PubHashLen]byte }

 func (ph *PubHash) FromPubkey(pk []byte) (err error) {
+	if len(pk) == 0 {
+		panic("nil pubkey")
+	}
 	if len(pk) != schnorr.PubKeyBytesLen {
 		err = errorf.E(
-			"invalid Pubkey length, got %d require %d",
-			len(pk), schnorr.PubKeyBytesLen,
+			"invalid Pubkey length, got %d require %d %0x",
+			len(pk), schnorr.PubKeyBytesLen, pk,
 		)
 		return
 	}

View File

@@ -5,6 +5,7 @@ import (
"context" "context"
"sort" "sort"
"strconv" "strconv"
"strings"
"time" "time"
"crypto.orly/sha256" "crypto.orly/sha256"
@@ -42,23 +43,71 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
var expDeletes types.Uint40s var expDeletes types.Uint40s
var expEvs event.S var expEvs event.S
if f.Ids != nil && f.Ids.Len() > 0 { if f.Ids != nil && f.Ids.Len() > 0 {
for _, id := range f.Ids.T {
log.T.F("QueryEvents: looking for ID=%s", hex.Enc(id))
}
log.T.F("QueryEvents: ids path, count=%d", f.Ids.Len())
for _, idx := range f.Ids.T { for _, idx := range f.Ids.T {
log.T.F("QueryEvents: lookup id=%s", hex.Enc(idx))
// we know there is only Ids in this, so run the ID query and fetch. // we know there is only Ids in this, so run the ID query and fetch.
var ser *types.Uint40 var ser *types.Uint40
if ser, err = d.GetSerialById(idx); chk.E(err) { var idErr error
if ser, idErr = d.GetSerialById(idx); idErr != nil {
// Check if this is a "not found" error which is expected for IDs we don't have
if strings.Contains(idErr.Error(), "id not found in database") {
log.T.F(
"QueryEvents: ID not found in database: %s",
hex.Enc(idx),
)
} else {
// Log unexpected errors but continue processing other IDs
log.E.F(
"QueryEvents: error looking up id=%s err=%v",
hex.Enc(idx), idErr,
)
}
continue
}
// Check if the serial is nil, which indicates the ID wasn't found
if ser == nil {
log.T.F("QueryEvents: Serial is nil for ID: %s", hex.Enc(idx))
continue continue
} }
// fetch the events // fetch the events
var ev *event.E var ev *event.E
if ev, err = d.FetchEventBySerial(ser); err != nil { if ev, err = d.FetchEventBySerial(ser); err != nil {
log.T.F(
"QueryEvents: fetch by serial failed for id=%s ser=%v err=%v",
hex.Enc(idx), ser, err,
)
continue continue
} }
log.T.F(
"QueryEvents: found id=%s kind=%d created_at=%d",
hex.Enc(ev.ID), ev.Kind, ev.CreatedAt,
)
// check for an expiration tag and delete after returning the result // check for an expiration tag and delete after returning the result
if CheckExpiration(ev) { if CheckExpiration(ev) {
log.T.F(
"QueryEvents: id=%s filtered out due to expiration",
hex.Enc(ev.ID),
)
expDeletes = append(expDeletes, ser) expDeletes = append(expDeletes, ser)
expEvs = append(expEvs, ev) expEvs = append(expEvs, ev)
continue continue
} }
// skip events that have been deleted by a proper deletion event
if derr := d.CheckForDeleted(ev, nil); derr != nil {
log.T.F(
"QueryEvents: id=%s filtered out due to deletion: %v",
hex.Enc(ev.ID), derr,
)
continue
}
log.T.F(
"QueryEvents: id=%s SUCCESSFULLY FOUND, adding to results",
hex.Enc(ev.ID),
)
evs = append(evs, ev) evs = append(evs, ev)
} }
// sort the events by timestamp // sort the events by timestamp
@@ -68,10 +117,15 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
}, },
) )
} else { } else {
// non-IDs path
var idPkTs []*store.IdPkTs var idPkTs []*store.IdPkTs
// if f.Authors != nil && f.Authors.Len() > 0 && f.Kinds != nil && f.Kinds.Len() > 0 {
// log.T.F("QueryEvents: authors+kinds path, authors=%d kinds=%d", f.Authors.Len(), f.Kinds.Len())
// }
if idPkTs, err = d.QueryForIds(c, f); chk.E(err) { if idPkTs, err = d.QueryForIds(c, f); chk.E(err) {
return return
} }
// log.T.F("QueryEvents: QueryForIds returned %d candidates", len(idPkTs))
// Create a map to store the latest version of replaceable events // Create a map to store the latest version of replaceable events
replaceableEvents := make(map[string]*event.E) replaceableEvents := make(map[string]*event.E)
// Create a map to store the latest version of parameterized replaceable // Create a map to store the latest version of parameterized replaceable
@@ -83,8 +137,8 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
 	// events)
 	deletionsByKindPubkey := make(map[string]bool)
 	// Map to track deletion events by kind, pubkey, and d-tag (for
-	// parameterized replaceable events)
-	deletionsByKindPubkeyDTag := make(map[string]map[string]bool)
+	// parameterized replaceable events). We store the newest delete
+	// timestamp per d-tag.
+	deletionsByKindPubkeyDTag := make(map[string]map[string]int64)
 	// Map to track specific event IDs that have been deleted
 	deletedEventIds := make(map[string]bool)
 	// Query for deletion events separately if we have authors in the filter
@@ -169,11 +223,13 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
 	key := hex.Enc(pk) + ":" + strconv.Itoa(int(kk.K))
 	// Initialize the inner map if it doesn't exist
 	if _, exists := deletionsByKindPubkeyDTag[key]; !exists {
-		deletionsByKindPubkeyDTag[key] = make(map[string]bool)
+		deletionsByKindPubkeyDTag[key] = make(map[string]int64)
 	}
-	// Mark this d-tag as deleted
+	// Record the newest delete timestamp for this d-tag
 	dValue := string(split[2])
-	deletionsByKindPubkeyDTag[key][dValue] = true
+	if ts, ok := deletionsByKindPubkeyDTag[key][dValue]; !ok || ev.CreatedAt > ts {
+		deletionsByKindPubkeyDTag[key][dValue] = ev.CreatedAt
+	}
 	// Debug logging
 }
 // For replaceable events, we need to check if there are any
@@ -225,15 +281,16 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
 		}
 		// Initialize the inner map if it doesn't exist
 		if _, exists := deletionsByKindPubkeyDTag[key]; !exists {
-			deletionsByKindPubkeyDTag[key] = make(map[string]bool)
+			deletionsByKindPubkeyDTag[key] = make(map[string]int64)
 		}
-		// Mark this d-tag as deleted
-		deletionsByKindPubkeyDTag[key][dValue] = true
+		// Record the newest delete timestamp for this d-tag
+		if ts, ok := deletionsByKindPubkeyDTag[key][dValue]; !ok || ev.CreatedAt > ts {
+			deletionsByKindPubkeyDTag[key][dValue] = ev.CreatedAt
+		}
 			}
 		}
 	}
 }
 // Second pass: process all events, filtering out deleted ones
 for _, idpk := range idPkTs {
 	var ev *event.E
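Note: switching the inner map from bool to int64 lets the relay keep only the newest delete timestamp per d-tag, which the second pass then compares against each candidate's created_at. The pattern in isolation — keys and values here are illustrative:

package main

import "fmt"

func main() {
	// deletions maps "pubkey:kind" -> d-tag -> newest delete created_at,
	// mirroring deletionsByKindPubkeyDTag in the hunks above.
	deletions := make(map[string]map[string]int64)
	record := func(key, dValue string, createdAt int64) {
		if _, ok := deletions[key]; !ok {
			deletions[key] = make(map[string]int64)
		}
		// Only keep the newest delete timestamp for this d-tag.
		if ts, ok := deletions[key][dValue]; !ok || createdAt > ts {
			deletions[key][dValue] = createdAt
		}
	}
	record("abcd:30023", "my-article", 100)
	record("abcd:30023", "my-article", 90) // older delete: ignored
	fmt.Println(deletions["abcd:30023"]["my-article"]) // prints 100
}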
@@ -244,6 +301,83 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
 	if ev, err = d.FetchEventBySerial(ser); err != nil {
 		continue
 	}
+	// Add logging for tag filter debugging
+	if f.Tags != nil && f.Tags.Len() > 0 {
+		var eventTags []string
+		if ev.Tags != nil && ev.Tags.Len() > 0 {
+			for _, t := range *ev.Tags {
+				if t.Len() >= 2 {
+					eventTags = append(
+						eventTags,
+						string(t.Key())+"="+string(t.Value()),
+					)
+				}
+			}
+		}
+		// log.T.F(
+		// 	"QueryEvents: processing event ID=%s kind=%d tags=%v",
+		// 	hex.Enc(ev.ID), ev.Kind, eventTags,
+		// )
+		// Check if this event matches ALL required tags in the filter
+		tagMatches := 0
+		for _, filterTag := range *f.Tags {
+			if filterTag.Len() >= 2 {
+				filterKey := filterTag.Key()
+				// Filter keys may start with '#'; strip the prefix for comparison
+				var actualKey []byte
+				if len(filterKey) == 2 && filterKey[0] == '#' {
+					actualKey = filterKey[1:]
+				} else {
+					actualKey = filterKey
+				}
+				// Check if the event has this tag key with any of the filter's values
+				eventHasTag := false
+				if ev.Tags != nil {
+					for _, eventTag := range *ev.Tags {
+						if eventTag.Len() >= 2 && bytes.Equal(
+							eventTag.Key(), actualKey,
+						) {
+							// Check if the event's tag value matches any of the filter's values
+							for _, filterValue := range filterTag.T[1:] {
+								if bytes.Equal(
+									eventTag.Value(), filterValue,
+								) {
+									eventHasTag = true
+									break
+								}
+							}
+							if eventHasTag {
+								break
+							}
+						}
+					}
+				}
+				if eventHasTag {
+					tagMatches++
+				}
+				// log.T.F(
+				// 	"QueryEvents: tag filter %s (actual key: %s) matches: %v (total matches: %d/%d)",
+				// 	string(filterKey), string(actualKey), eventHasTag,
+				// 	tagMatches, f.Tags.Len(),
+				// )
+			}
+		}
+		// If not all tags match, skip this event
+		if tagMatches < f.Tags.Len() {
+			// log.T.F(
+			// 	"QueryEvents: event ID=%s SKIPPED - only matches %d/%d required tags",
+			// 	hex.Enc(ev.ID), tagMatches, f.Tags.Len(),
+			// )
+			continue
+		}
+		// log.T.F(
+		// 	"QueryEvents: event ID=%s PASSES all tag filters",
+		// 	hex.Enc(ev.ID),
+		// )
+	}
 	// Skip events with kind 5 (Deletion)
 	if ev.Kind == kind.Deletion.K {
 		continue
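Note: the tag-filter block above requires every filter entry to match (AND across filter keys) while any listed value may match (OR across values), stripping a leading '#' from filter keys before comparing. The same predicate as a standalone sketch over plain string maps — a simplified model, not the encoders.orly tag types:

package main

import "fmt"

// matchesAllTagFilters reports whether an event's tags satisfy every filter
// entry: for each filter key, at least one of the filter's values must appear
// on the event under that key.
func matchesAllTagFilters(eventTags map[string][]string, filters map[string][]string) bool {
	for key, wanted := range filters {
		// Filter keys arrive as "#e", "#p", ...; the event stores bare keys.
		if len(key) == 2 && key[0] == '#' {
			key = key[1:]
		}
		found := false
		for _, have := range eventTags[key] {
			for _, w := range wanted {
				if have == w {
					found = true
					break
				}
			}
			if found {
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}

func main() {
	tags := map[string][]string{"e": {"aa"}, "p": {"bb"}}
	fmt.Println(matchesAllTagFilters(tags, map[string][]string{"#e": {"aa"}}))               // true
	fmt.Println(matchesAllTagFilters(tags, map[string][]string{"#e": {"aa"}, "#p": {"cc"}})) // false
}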
@@ -308,10 +442,10 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
 	// Check if this event has been deleted via an a-tag
 	if deletionMap, exists := deletionsByKindPubkeyDTag[key]; exists {
-		// If the d-tag value is in the deletion map and this event
-		// is not specifically requested by ID, skip it
-		if deletionMap[dValue] && !isIdInFilter {
-			log.T.F("Debug: Event deleted - skipping")
+		// If there is a deletion timestamp, this event is older than the
+		// delete, and it is not specifically requested by ID, skip it
+		if delTs, ok := deletionMap[dValue]; ok && ev.CreatedAt < delTs && !isIdInFilter {
+			log.T.F("Debug: Event deleted by a-tag (older than delete) - skipping")
 			continue
 		}
 	}
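Note: with timestamps in the map, an a-tag deletion now only hides events that predate the newest delete, so an author can republish an addressable event after deleting an older version. The check reduced to a pure function — argument names are illustrative:

package main

import "fmt"

// suppressedByATag mirrors the condition above: an a-tag deletion hides an
// event only when the event predates the newest delete for its d-tag and the
// event was not explicitly requested by ID.
func suppressedByATag(evCreatedAt, delTs int64, hasDelete, isIdInFilter bool) bool {
	return hasDelete && evCreatedAt < delTs && !isIdInFilter
}

func main() {
	fmt.Println(suppressedByATag(90, 100, true, false))  // true: older than the delete
	fmt.Println(suppressedByATag(110, 100, true, false)) // false: republished after the delete
}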

View File

@@ -199,25 +199,12 @@ func (d *D) CheckForDeleted(ev *event.E, admins [][]byte) (err error) {
 		sers = append(sers, s...)
 	}
 	if len(sers) > 0 {
-		var idPkTss []*store.IdPkTs
-		var tmp []*store.IdPkTs
-		if tmp, err = d.GetFullIdPubkeyBySerials(sers); chk.E(err) {
-			return
-		}
-		idPkTss = append(idPkTss, tmp...)
-		// sort by timestamp, so the first is the newest
-		sort.Slice(
-			idPkTss, func(i, j int) bool {
-				return idPkTss[i].Ts > idPkTss[j].Ts
-			},
-		)
-		if ev.CreatedAt < idPkTss[0].Ts {
-			err = errorf.E(
-				"blocked: %0x was deleted because it is older than the delete: event: %d delete: %d",
-				ev.ID, ev.CreatedAt, idPkTss[0].Ts,
-			)
-			return
-		}
+		// For e-tag deletions (delete by ID), any deletion event means the
+		// event cannot be resubmitted regardless of timestamp, since it is a
+		// specific deletion of this exact event
+		err = errorf.E(
+			"blocked: %0x was deleted by ID and cannot be resubmitted",
+			ev.ID,
+		)
 		return
 	}
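Note: after this change, CheckForDeleted treats e-tag (delete-by-ID) deletions as permanent, while a-tag (address) deletions remain timestamp-scoped in QueryEvents. A minimal sketch of the combined rule, with simplified stand-in arguments rather than the store package's actual signatures:

package main

import (
	"errors"
	"fmt"
)

var errDeletedByID = errors.New("blocked: event was deleted by ID and cannot be resubmitted")

// checkResubmission sketches the two deletion rules: e-tag deletions (by ID)
// block resubmission forever; a-tag deletions only block events older than
// the newest delete for that address.
func checkResubmission(deletedByID bool, evCreatedAt, newestATagDelete int64) error {
	if deletedByID {
		return errDeletedByID
	}
	if evCreatedAt < newestATagDelete {
		return fmt.Errorf("blocked: event (%d) is older than address delete (%d)", evCreatedAt, newestATagDelete)
	}
	return nil
}

func main() {
	fmt.Println(checkResubmission(true, 200, 0))    // permanent block
	fmt.Println(checkResubmission(false, 90, 100))  // blocked: older than the delete
	fmt.Println(checkResubmission(false, 110, 100)) // <nil>: allowed
}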

View File

@@ -3,11 +3,14 @@ package database
 import (
 	"bytes"
 	"context"
+	"fmt"
+	"strings"
 	"database.orly/indexes"
 	"database.orly/indexes/types"
 	"encoders.orly/event"
 	"encoders.orly/filter"
+	"encoders.orly/hex"
 	"encoders.orly/kind"
 	"encoders.orly/tag"
 	"github.com/dgraph-io/badger/v4"
@@ -42,12 +45,32 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
 // check if the event already exists
 var ser *types.Uint40
 if ser, err = d.GetSerialById(ev.ID); err == nil && ser != nil {
-	err = errorf.E("event already exists: %0x", ev.ID)
+	err = errorf.E("blocked: event already exists: %0x", ev.ID)
+	return
+}
+// If the error is "id not found", we can proceed with saving the event
+if err != nil && strings.Contains(err.Error(), "id not found in database") {
+	// Reset error since this is expected for new events
+	err = nil
+} else if err != nil {
+	// For any other error, return it
+	log.E.F("error checking if event exists: %s", err)
+	return
+}
+// Check if the event has been deleted before allowing resubmission
+if err = d.CheckForDeleted(ev, nil); err != nil {
+	// log.I.F(
+	// 	"SaveEvent: rejecting resubmission of deleted event ID=%s: %v",
+	// 	hex.Enc(ev.ID), err,
+	// )
+	err = errorf.E("blocked: %s", err.Error())
 	return
 }
 // check for replacement
 if kind.IsReplaceable(ev.Kind) {
-	// find the events and delete them
+	// find the events and check timestamps before deleting
 	f := &filter.F{
 		Authors: tag.NewFromBytesSlice(ev.Pubkey),
 		Kinds:   kind.NewS(kind.New(ev.Kind)),
@@ -56,22 +79,50 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
 	if sers, err = d.GetSerialsFromFilter(f); chk.E(err) {
 		return
 	}
-	// if found, delete them
+	// if found, check timestamps before deleting
 	if len(sers) > 0 {
+		shouldReplace := true
 		for _, s := range sers {
 			var oldEv *event.E
 			if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
 				continue
 			}
-			if err = d.DeleteEventBySerial(
-				c, s, oldEv,
-			); chk.E(err) {
-				continue
+			// Only replace if the new event is newer or has the same timestamp
+			if ev.CreatedAt < oldEv.CreatedAt {
+				log.I.F(
+					"SaveEvent: rejecting older replaceable event ID=%s (created_at=%d) - existing event ID=%s (created_at=%d)",
+					hex.Enc(ev.ID), ev.CreatedAt, hex.Enc(oldEv.ID),
+					oldEv.CreatedAt,
+				)
+				shouldReplace = false
+				break
 			}
 		}
+		if shouldReplace {
+			for _, s := range sers {
+				var oldEv *event.E
+				if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
+					continue
+				}
+				log.I.F(
+					"SaveEvent: replacing older replaceable event ID=%s (created_at=%d) with newer event ID=%s (created_at=%d)",
+					hex.Enc(oldEv.ID), oldEv.CreatedAt, hex.Enc(ev.ID),
+					ev.CreatedAt,
+				)
+				if err = d.DeleteEventBySerial(
+					c, s, oldEv,
+				); chk.E(err) {
+					continue
+				}
+			}
+		} else {
+			// Don't save the older event - return an error
+			err = errorf.E("blocked: event is older than existing replaceable event")
+			return
+		}
 	}
 } else if kind.IsParameterizedReplaceable(ev.Kind) {
-	// find the events and delete them
+	// find the events and check timestamps before deleting
 	dTag := ev.Tags.GetFirst([]byte("d"))
 	if dTag == nil {
 		err = errorf.E("event is missing a d tag identifier")
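Note: the replaceable-event path now runs two passes over the existing events: first decide whether any existing event is strictly newer, then delete only if the new event wins. A sketch of that decision as a pure helper, assuming ties count as replaceable per the "newer or same timestamp" comment above:

package main

import "fmt"

// shouldReplace mirrors the first pass: a new replaceable event wins only if
// it is not older than every existing event of the same author+kind.
func shouldReplace(newCreatedAt int64, existing []int64) bool {
	for _, old := range existing {
		if newCreatedAt < old {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(shouldReplace(100, []int64{90, 95})) // true: replaces both older events
	fmt.Println(shouldReplace(100, []int64{120}))    // false: rejected as older
}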
@@ -88,19 +139,47 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
 	if sers, err = d.GetSerialsFromFilter(f); chk.E(err) {
 		return
 	}
-	// if found, delete them
+	// if found, check timestamps before deleting
 	if len(sers) > 0 {
+		shouldReplace := true
 		for _, s := range sers {
 			var oldEv *event.E
 			if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
 				continue
 			}
-			if err = d.DeleteEventBySerial(
-				c, s, oldEv,
-			); chk.E(err) {
-				continue
+			// Only replace if the new event is newer or has the same timestamp
+			if ev.CreatedAt < oldEv.CreatedAt {
+				log.I.F(
+					"SaveEvent: rejecting older addressable event ID=%s (created_at=%d) - existing event ID=%s (created_at=%d)",
+					hex.Enc(ev.ID), ev.CreatedAt, hex.Enc(oldEv.ID),
+					oldEv.CreatedAt,
+				)
+				shouldReplace = false
+				break
 			}
 		}
+		if shouldReplace {
+			for _, s := range sers {
+				var oldEv *event.E
+				if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
+					continue
+				}
+				log.I.F(
+					"SaveEvent: replacing older addressable event ID=%s (created_at=%d) with newer event ID=%s (created_at=%d)",
+					hex.Enc(oldEv.ID), oldEv.CreatedAt, hex.Enc(ev.ID),
+					ev.CreatedAt,
+				)
+				if err = d.DeleteEventBySerial(
+					c, s, oldEv,
+				); chk.E(err) {
+					continue
+				}
+			}
+		} else {
+			// Don't save the older event - return an error
+			err = errorf.E("blocked: event is older than existing addressable event")
+			return
+		}
 	}
 }
 // Get the next sequence number for the event
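Note: the deletion maps and the addressable replacement path both identify an event by kind, pubkey, and d-tag — a NIP-33-style address. A tiny sketch of that coordinate; the helper name is illustrative, not part of the codebase:

package main

import "fmt"

// addressableKey builds the "kind:pubkey:d-tag" coordinate that identifies an
// addressable (parameterized replaceable) event; the diffs above encode the
// same identity as a pubkey+kind key with a nested d-tag map.
func addressableKey(kind int, pubkeyHex, dTag string) string {
	return fmt.Sprintf("%d:%s:%s", kind, pubkeyHex, dTag)
}

func main() {
	fmt.Println(addressableKey(30023, "abcd", "my-article")) // 30023:abcd:my-article
}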
@@ -153,6 +232,14 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
 		return
 	},
 )
-log.T.F("total data written: %d bytes keys %d bytes values", kc, vc)
+log.T.F(
+	"total data written: %d bytes keys %d bytes values for event ID %s", kc,
+	vc, hex.Enc(ev.ID),
+)
+log.T.C(
+	func() string {
+		return fmt.Sprintf("event:\n%s\n", ev.Serialize())
+	},
+)
 return
 }
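Note: log.T.C takes a closure, so the ev.Serialize() call above only runs when trace logging is enabled. A sketch of the underlying lazy-logging pattern with a hypothetical logger type, not the lol.mleku.dev API:

package main

import "fmt"

// logger sketches the pattern behind log.T.C: the message is produced by a
// closure, so an expensive Serialize-style call runs only when the level is on.
type logger struct{ traceEnabled bool }

func (l logger) TraceC(msg func() string) {
	if l.traceEnabled {
		fmt.Println(msg())
	}
}

func main() {
	l := logger{traceEnabled: false}
	l.TraceC(func() string {
		// Imagine ev.Serialize() here: skipped entirely when trace is off.
		return "event: {...}"
	})
	l.traceEnabled = true
	l.TraceC(func() string { return "event: {...}" }) // now printed
}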

View File

@@ -11,7 +11,6 @@ import (
"interfaces.orly/codec" "interfaces.orly/codec"
"lol.mleku.dev/chk" "lol.mleku.dev/chk"
"lol.mleku.dev/errorf" "lol.mleku.dev/errorf"
"lol.mleku.dev/log"
"utils.orly/constraints" "utils.orly/constraints"
"utils.orly/units" "utils.orly/units"
) )
@@ -55,7 +54,7 @@ func (en *Challenge) Label() string { return L }
 func (en *Challenge) Write(w io.Writer) (err error) {
 	var b []byte
 	b = en.Marshal(b)
-	log.D.F("writing out challenge envelope: '%s'", b)
+	// log.D.F("writing out challenge envelope: '%s'", b)
 	_, err = w.Write(b)
 	return
 }

View File

@@ -5,7 +5,6 @@ import (
"encoders.orly/hex" "encoders.orly/hex"
"encoders.orly/ints" "encoders.orly/ints"
"encoders.orly/text" "encoders.orly/text"
"lol.mleku.dev/log"
) )
// ToCanonical converts the event to the canonical encoding used to derive the // ToCanonical converts the event to the canonical encoding used to derive the
@@ -23,7 +22,7 @@ func (ev *E) ToCanonical(dst []byte) (b []byte) {
 	b = append(b, ',')
 	b = text.AppendQuote(b, ev.Content, text.NostrEscape)
 	b = append(b, ']')
-	log.D.F("canonical: %s", b)
+	// log.D.F("canonical: %s", b)
 	return
 }

View File

@@ -333,7 +333,7 @@ var (
 	CommunityDefinition = &K{34550}
 	ACLEvent            = &K{39998}
 	// ParameterizedReplaceableEnd is an event type that...
-	ParameterizedReplaceableEnd = &K{39999}
+	ParameterizedReplaceableEnd = &K{40000}
 )
 var MapMx sync.RWMutex
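Note: NIP-01 reserves addressable (parameterized replaceable) kinds for 30000 <= kind < 40000, so an exclusive upper bound must be 40000; with the old value of 39999, kind 39999 itself would fall outside the range if the check compares with '<'. A sketch of the bound, assuming an exclusive comparison:

package main

import "fmt"

const (
	parameterizedReplaceableStart = 30000
	parameterizedReplaceableEnd   = 40000 // exclusive, per the fix above
)

// isParameterizedReplaceable assumes the range check treats the end bound as
// exclusive; with an end of 39999, the top addressable kind would be missed.
func isParameterizedReplaceable(k int) bool {
	return k >= parameterizedReplaceableStart && k < parameterizedReplaceableEnd
}

func main() {
	fmt.Println(isParameterizedReplaceable(39999)) // true after the fix
	fmt.Println(isParameterizedReplaceable(40000)) // false
}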

View File

@@ -22,7 +22,7 @@ const (
 type I interface {
 	Configure(cfg ...any) (err error)
 	// GetAccessLevel returns the access level string for a given pubkey.
-	GetAccessLevel(pub []byte) (level string)
+	GetAccessLevel(pub []byte, address string) (level string)
 	// GetACLInfo returns the name and a description of the ACL, which should
 	// explain briefly how it works, and then a long text of documentation of
 	// the ACL's rules and configuration (in asciidoc or markdown).
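Note: GetAccessLevel now also receives the client's address, letting implementations combine key identity with network origin. A hypothetical implementation sketch — the type and field names are illustrative, not the acl package's:

package main

import (
	"fmt"
	"strings"
)

// ipACL is a hypothetical implementation of the updated method signature
// GetAccessLevel(pub []byte, address string): it grants "admin" to known
// pubkeys, but only when connecting from a whitelisted address prefix.
type ipACL struct {
	admins      map[string]bool
	localPrefix string
}

func (a ipACL) GetAccessLevel(pub []byte, address string) string {
	if a.admins[fmt.Sprintf("%x", pub)] && strings.HasPrefix(address, a.localPrefix) {
		return "admin"
	}
	return "read"
}

func main() {
	acl := ipACL{admins: map[string]bool{"01ff": true}, localPrefix: "127.0.0."}
	fmt.Println(acl.GetAccessLevel([]byte{0x01, 0xff}, "127.0.0.1:3334")) // admin
	fmt.Println(acl.GetAccessLevel([]byte{0x01, 0xff}, "203.0.113.9:80")) // read
}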

View File

@@ -1 +1 @@
-v0.2.0
+v0.4.2

scripts/relaytester-install.sh (19 lines, executable file)
View File

@@ -0,0 +1,19 @@
#!/usr/bin/env bash
## rust must be installed
if ! command -v "cargo" &> /dev/null; then
echo "rust and cargo is not installed."
echo "run this command to install:"
echo
echo "curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh"
exit
else
echo "cargo is installed."
fi
rm -rf relay-tester
git clone https://github.com/mikedilger/relay-tester.git
cd relay-tester
cargo build -r
cp target/release/relay-tester $GOBIN/
cd ..
#rm -rf relay-tester

scripts/relaytester-test.sh (23 lines, executable file)
View File

@@ -0,0 +1,23 @@
#!/usr/bin/env bash
## relay-tester must be installed
if ! command -v "relay-tester" &> /dev/null; then
echo "relay-tester is not installed."
echo "run this command to install:"
echo
echo "./scripts/relaytester-install.sh"
  exit 1
fi
rm -rf /tmp/orlytest
export ORLY_LOG_LEVEL=trace
export ORLY_LOG_TO_STDOUT=true
export ORLY_LISTEN=127.0.0.1
export ORLY_PORT=3334
export ORLY_IP_WHITELIST=127.0.0
export ORLY_ADMINS=6d9b216ec1dc329ca43c56634e0dba6aaaf3d45ab878bdf4fa910c7117db0bfa,c284f03a874668eded145490e436b87f1a1fc565cf320e7dea93a7e96e3629d7
export ORLY_ACL_MODE=none
export ORLY_DATA_DIR=/tmp/orlytest
go run . &
sleep 5
relay-tester ws://127.0.0.1:3334 nsec12l4072hvvyjpmkyjtdxn48xf8qj299zw60u7ddg58s2aphv3rpjqtg0tvr nsec1syvtjgqauyeezgrev5nqrp36d87apjk87043tgu2usgv8umyy6wq4yl6tu
killall next.orly.dev
rm -rf /tmp/orlytest

stacktrace.txt (14,972 lines, new file)

File diff suppressed because it is too large.