Compare commits

...

30 Commits

Author SHA1 Message Date
6e06905773 Replace WriteTimeout with DefaultWriteTimeout in publisher for consistency and bump version to v0.4.2.
Some checks failed
Go / build (push) Has been cancelled
2025-09-11 16:32:40 +01:00
d1316a5b7a Introduce DefaultWriteTimeout for WebSocket operations, replace hardcoded timeouts, and upgrade version to v0.4.1.
Some checks failed
Go / build (push) Has been cancelled
2025-09-11 16:29:43 +01:00
b45f0a2c51 Bump version to v0.4.0.
Some checks failed
Go / build (push) Has been cancelled
2025-09-11 12:35:15 +01:00
e2b7152221 Introduce Ctx() for proper context management in Listener and replace direct context usage in HandleDelete with Ctx().
also introduce a 3 second timeout for websocket read failure
2025-09-11 12:34:01 +01:00
bf7ca1da43 Improve logging consistency across the application, handle context cancellation during WebSocket writes, and introduce async ACL reconfiguration for admin events. 2025-09-11 11:37:25 +01:00
bb8998fef6 Update relay tester scripts to use /tmp/orlytest for temporary data storage and adjust cleanup logic 2025-09-10 22:48:14 +01:00
57ac3667e6 Bump version to v0.3.2.
Some checks failed
Go / build (push) Has been cancelled
2025-09-10 22:26:57 +01:00
cb54891473 Remove verbose and debug logging across HandleDelete, HandleEvent, and acl/follows for consistency. 2025-09-10 22:26:41 +01:00
fdcfd863e0 Update SaveEvent to improve logging consistency, block duplicate event creations more explicitly, and handle multi-line log formatting updates. Bump version to v0.3.1.
Some checks failed
Go / build (push) Has been cancelled
2025-09-10 22:23:35 +01:00
4e96c9e2f7 Remove debug logging across the codebase and update version to v0.3.0.
Some checks failed
Go / build (push) Has been cancelled
2025-09-10 22:12:54 +01:00
fb956ff09c Block resubmission of deleted events by ID in SaveEvent and simplify deletion timestamp checks in QueryForDeleted. 2025-09-10 20:43:53 +01:00
eac6ba1410 Enhance HandleDelete to skip newer events based on delete event timestamp and improve logging for skipped and deleted events. 2025-09-10 20:34:35 +01:00
6b4b035f0c Refine HandleDelete logic to enforce a-tag criteria for replaceable events, improve parameterized replaceable event handling, and enhance logging for skipped and deleted events. 2025-09-10 20:27:02 +01:00
c2c6720e01 Enhance SaveEvent logic to handle older event rejection with error reporting, validate timestamps in parameterized replaceable events, and improve HandleEvent error handling for blocked events. 2025-09-10 20:11:59 +01:00
dddcc682b9 Improve HandleDelete error handling, add validation for deletion ownership, and enhance logging for unauthorized deletion attempts. 2025-09-10 19:56:11 +01:00
ddaab70d2b Improve HandleDelete error handling, add validation for deletion ownership, and enhance logging for unauthorized deletion attempts. 2025-09-10 19:32:42 +01:00
61cec63ca9 Add detailed tag filter debugging logs in QueryEvents and update rules.md with context and debugging guidance. 2025-09-10 19:24:24 +01:00
b063dab2a3 Improve logging, error handling for ID queries, and ensure inclusive range boundaries in event management. 2025-09-10 19:04:54 +01:00
9e59d5f72b Set default value for LogToStdout, enhance logging for request handling, query events, and filters, and fix ID handling in relaytester-test.sh. 2025-09-10 16:29:55 +01:00
fe3893addf Add LogToStdout config option, improve tag decoding, and fix ID tracking in event handling 2025-09-10 15:16:33 +01:00
5eb192f208 Send initial AUTH challenge if admins are configured and clean up leftover ORLY data in relaytester-test.sh. 2025-09-10 14:39:22 +01:00
2385d1f752 Update relaytester-test.sh log level to off and improve follows key decoding logic in ACL implementation for clarity and error handling. 2025-09-10 14:30:31 +01:00
faad7ddc93 add relay-tester scripts 2025-09-10 14:23:57 +01:00
c9314bdbd0 Refactor GetAccessLevel to include address parameter, update all ACL implementations and handlers for enhanced contextual access control. 2025-09-08 07:42:47 +01:00
85d806b157 Bump version to v0.2.1
Some checks failed
Go / build (push) Has been cancelled
2025-09-07 23:44:06 +01:00
6207f9d426 Enforce authenticated pubkey checks for privileged events, refactor delivery logic for improved efficiency, and extend Subscription with AuthedPubkey. 2025-09-07 23:41:45 +01:00
ebb5e2c0f3 Refactor publisher to clean up dead code, streamline event filtering, and optimize subscriber removal logic. 2025-09-07 23:35:01 +01:00
9dec51cd40 Switch sync.Mutex to sync.RWMutex in publisher for improved concurrent read performance. 2025-09-07 23:06:46 +01:00
f570660f37 Uncomment and enable additional relayinfo features and fix order of event response handling in SaveEvent. 2025-09-07 23:01:26 +01:00
3d3a0fa520 Refactor Signer to use secp256k1 directly and enhance ACL reconfiguration for admin-triggered events 2025-09-07 21:59:50 +01:00
32 changed files with 15776 additions and 227 deletions

View File

@@ -89,3 +89,7 @@ A good typical example:
// - Initializes the relay, starting its operation in a separate goroutine.
```
use the source of the relay-tester to help guide what expectations the test has,
and use context7 for information about the nostr protocol, and use additional
log statements to help locate the cause of bugs

View File

@@ -29,6 +29,7 @@ type C struct {
Port int `env:"ORLY_PORT" default:"3334" usage:"port to listen on"`
LogLevel string `env:"ORLY_LOG_LEVEL" default:"info" usage:"relay log level: fatal error warn info debug trace"`
DBLogLevel string `env:"ORLY_DB_LOG_LEVEL" default:"info" usage:"database log level: fatal error warn info debug trace"`
LogToStdout bool `env:"ORLY_LOG_TO_STDOUT" default:"false" usage:"log to stdout instead of stderr"`
Pprof string `env:"ORLY_PPROF" usage:"enable pprof in modes: cpu,memory,allocation"`
IPWhitelist []string `env:"ORLY_IP_WHITELIST" usage:"comma-separated list of IP addresses to allow access from, matches on prefixes to allow private subnets, eg 10.0.0 = 10.0.0.0/8"`
Admins []string `env:"ORLY_ADMINS" usage:"comma-separated list of admin npubs"`
@@ -73,6 +74,9 @@ func New() (cfg *C, err error) {
PrintHelp(cfg, os.Stderr)
os.Exit(0)
}
if cfg.LogToStdout {
lol.Writer = os.Stdout
}
lol.SetLogLevel(cfg.LogLevel)
return
}

View File

@@ -46,7 +46,7 @@ func (l *Listener) HandleAuth(b []byte) (err error) {
return
}
log.D.F(
"%s authed to pubkey,%0x", l.remote,
"%s authed to pubkey %0x", l.remote,
env.Event.Pubkey,
)
l.authedPubkey.Store(env.Event.Pubkey)

View File

@@ -23,14 +23,14 @@ func (l *Listener) GetSerialsFromFilter(f *filter.F) (
return l.D.GetSerialsFromFilter(f)
}
func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
log.I.C(
func() string {
return fmt.Sprintf(
"delete event\n%s", env.E.Serialize(),
)
},
)
func (l *Listener) HandleDelete(env *eventenvelope.Submission) (err error) {
// log.I.C(
// func() string {
// return fmt.Sprintf(
// "delete event\n%s", env.E.Serialize(),
// )
// },
// )
var ownerDelete bool
for _, pk := range l.Admins {
if utils.FastEqual(pk, env.E.Pubkey) {
@@ -39,15 +39,17 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
}
}
// process the tags in the delete event
var err error
var deleteErr error
var validDeletionFound bool
for _, t := range *env.E.Tags {
// first search for a tags, as these are the simplest to process
if utils.FastEqual(t.Key(), []byte("a")) {
at := new(atag.T)
if _, err = at.Unmarshal(t.Value()); chk.E(err) {
if _, deleteErr = at.Unmarshal(t.Value()); chk.E(deleteErr) {
continue
}
if ownerDelete || utils.FastEqual(env.E.Pubkey, at.Pubkey) {
validDeletionFound = true
// find the event and delete it
f := &filter.F{
Authors: tag.NewFromBytesSlice(at.Pubkey),
@@ -69,13 +71,43 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
if ev, err = l.FetchEventBySerial(s); chk.E(err) {
continue
}
if !(kind.IsReplaceable(ev.Kind) && len(at.DTag) == 0) {
// skip a tags with no dtag if the kind is not
// replaceable.
// Only delete events that match the a-tag criteria:
// - For parameterized replaceable events: must have matching d-tag
// - For regular replaceable events: should not have d-tag constraint
if kind.IsParameterizedReplaceable(ev.Kind) {
// For parameterized replaceable, we need a DTag to match
if len(at.DTag) == 0 {
log.I.F(
"HandleDelete: skipping parameterized replaceable event %s - no DTag in a-tag",
hex.Enc(ev.ID),
)
continue
}
} else if !kind.IsReplaceable(ev.Kind) {
// For non-replaceable events, a-tags don't apply
log.I.F(
"HandleDelete: skipping non-replaceable event %s - a-tags only apply to replaceable events",
hex.Enc(ev.ID),
)
continue
}
// Only delete events that are older than or equal to the delete event timestamp
if ev.CreatedAt > env.E.CreatedAt {
log.I.F(
"HandleDelete: skipping newer event %s (created_at=%d) - delete event timestamp is %d",
hex.Enc(ev.ID), ev.CreatedAt, env.E.CreatedAt,
)
continue
}
log.I.F(
"HandleDelete: deleting event %s via a-tag %d:%s:%s (event_time=%d, delete_time=%d)",
hex.Enc(ev.ID), at.Kind.K, hex.Enc(at.Pubkey),
string(at.DTag), ev.CreatedAt, env.E.CreatedAt,
)
if err = l.DeleteEventBySerial(
l.Ctx, s, ev,
l.Ctx(), s, ev,
); chk.E(err) {
continue
}
@@ -87,10 +119,16 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
// if e tags are found, delete them if the author is signer, or one of
// the owners is signer
if utils.FastEqual(t.Key(), []byte("e")) {
var dst []byte
if _, err = hex.DecBytes(dst, t.Value()); chk.E(err) {
val := t.Value()
if len(val) == 0 {
continue
}
var dst []byte
if b, e := hex.Dec(string(val)); chk.E(e) {
continue
} else {
dst = b
}
f := &filter.F{
Ids: tag.NewFromBytesSlice(dst),
}
@@ -108,16 +146,26 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
continue
}
// check that the author is the same as the signer of the
// delete, for the k tag case the author is the signer of
// delete, for the e tag case the author is the signer of
// the event.
if !utils.FastEqual(env.E.Pubkey, ev.Pubkey) {
log.W.F(
"HandleDelete: attempted deletion of event %s by different user - delete pubkey=%s, event pubkey=%s",
hex.Enc(ev.ID), hex.Enc(env.E.Pubkey),
hex.Enc(ev.Pubkey),
)
continue
}
validDeletionFound = true
// exclude delete events
if ev.Kind == kind.EventDeletion.K {
continue
}
if err = l.DeleteEventBySerial(l.Ctx, s, ev); chk.E(err) {
log.I.F(
"HandleDelete: deleting event %s by authorized user %s",
hex.Enc(ev.ID), hex.Enc(env.E.Pubkey),
)
if err = l.DeleteEventBySerial(l.Ctx(), s, ev); chk.E(err) {
continue
}
}
@@ -164,5 +212,11 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
}
continue
}
// If no valid deletions were found, return an error
if !validDeletionFound {
return fmt.Errorf("blocked: cannot delete events that belong to other users")
}
return
}

View File

@@ -1,8 +1,10 @@
package app
import (
"context"
"fmt"
"strings"
"time"
acl "acl.orly"
"encoders.orly/envelopes/authenvelope"
@@ -62,7 +64,7 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
return
}
// check permissions of user
accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load())
accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load(), l.remote)
switch accessLevel {
case "none":
log.D.F(
@@ -99,11 +101,21 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
return
default:
// user has write access or better, continue
log.D.F("user has %s access", accessLevel)
// log.D.F("user has %s access", accessLevel)
}
// if the event is a delete, process the delete
if env.E.Kind == kind.EventDeletion.K {
l.HandleDelete(env)
if err = l.HandleDelete(env); err != nil {
if strings.HasPrefix(err.Error(), "blocked:") {
errStr := err.Error()[len("blocked: "):len(err.Error())]
if err = Ok.Error(
l, env, errStr,
); chk.E(err) {
return
}
return
}
}
} else {
// check if the event was deleted
if err = l.CheckForDeleted(env.E, l.Admins); err != nil {
@@ -117,21 +129,49 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
}
}
}
// store the event
log.I.F("saving event %0x, %s", env.E.ID, env.E.Serialize())
if _, _, err = l.SaveEvent(l.Ctx, env.E); chk.E(err) {
// store the event - use a separate context to prevent cancellation issues
saveCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// log.I.F("saving event %0x, %s", env.E.ID, env.E.Serialize())
if _, _, err = l.SaveEvent(saveCtx, env.E); err != nil {
if strings.HasPrefix(err.Error(), "blocked:") {
errStr := err.Error()[len("blocked: "):len(err.Error())]
if err = Ok.Error(
l, env, errStr,
); chk.E(err) {
return
}
return
}
chk.E(err)
return
}
// if a follow list was saved, reconfigure ACLs now that it is persisted
if env.E.Kind == kind.FollowList.K {
if err = acl.Registry.Configure(); chk.E(err) {
}
}
l.publishers.Deliver(env.E)
// Send a success response storing
if err = Ok.Ok(l, env, ""); chk.E(err) {
return
}
// Deliver the event to subscribers immediately after sending OK response
l.publishers.Deliver(env.E)
log.D.F("saved event %0x", env.E.ID)
var isNewFromAdmin bool
for _, admin := range l.Admins {
if utils.FastEqual(admin, env.E.Pubkey) {
isNewFromAdmin = true
break
}
}
if isNewFromAdmin {
log.I.F("new event from admin %0x", env.E.Pubkey)
// if a follow list was saved, reconfigure ACLs now that it is persisted
if env.E.Kind == kind.FollowList.K ||
env.E.Kind == kind.RelayListMetadata.K {
// Run ACL reconfiguration asynchronously to prevent blocking websocket operations
go func() {
if err := acl.Registry.Configure(); chk.E(err) {
log.E.F("failed to reconfigure ACL: %v", err)
}
}()
}
}
return
}

View File

@@ -1,8 +1,6 @@
package app
import (
"fmt"
"encoders.orly/envelopes"
"encoders.orly/envelopes/authenvelope"
"encoders.orly/envelopes/closeenvelope"
@@ -15,42 +13,36 @@ import (
)
func (l *Listener) HandleMessage(msg []byte, remote string) {
log.D.C(
func() string {
return fmt.Sprintf(
"%s received message:\n%s", remote, msg,
)
},
)
log.D.F("%s received message:\n%s", remote, msg)
var err error
var t string
var rem []byte
if t, rem, err = envelopes.Identify(msg); !chk.E(err) {
switch t {
case eventenvelope.L:
log.D.F("eventenvelope: %s", rem)
// log.D.F("eventenvelope: %s %s", remote, rem)
err = l.HandleEvent(rem)
case reqenvelope.L:
log.D.F("reqenvelope: %s", rem)
// log.D.F("reqenvelope: %s %s", remote, rem)
err = l.HandleReq(rem)
case closeenvelope.L:
log.D.F("closeenvelope: %s", rem)
// log.D.F("closeenvelope: %s %s", remote, rem)
err = l.HandleClose(rem)
case authenvelope.L:
log.D.F("authenvelope: %s", rem)
// log.D.F("authenvelope: %s %s", remote, rem)
err = l.HandleAuth(rem)
default:
err = errorf.E("unknown envelope type %s\n%s", t, rem)
}
}
if err != nil {
log.D.C(
func() string {
return fmt.Sprintf(
"notice->%s %s", remote, err,
)
},
)
// log.D.C(
// func() string {
// return fmt.Sprintf(
// "notice->%s %s", remote, err,
// )
// },
// )
if err = noticeenvelope.NewFrom(err.Error()).Write(l); chk.E(err) {
return
}

View File

@@ -33,32 +33,32 @@ func (s *Server) HandleRelayInfo(w http.ResponseWriter, r *http.Request) {
relayinfo.BasicProtocol,
// relayinfo.Authentication,
// relayinfo.EncryptedDirectMessage,
// relayinfo.EventDeletion,
relayinfo.EventDeletion,
relayinfo.RelayInformationDocument,
// relayinfo.GenericTagQueries,
// relayinfo.NostrMarketplace,
// relayinfo.EventTreatment,
relayinfo.EventTreatment,
// relayinfo.CommandResults,
// relayinfo.ParameterizedReplaceableEvents,
relayinfo.ParameterizedReplaceableEvents,
// relayinfo.ExpirationTimestamp,
// relayinfo.ProtectedEvents,
// relayinfo.RelayListMetadata,
relayinfo.ProtectedEvents,
relayinfo.RelayListMetadata,
)
if s.Config.ACLMode != "none" {
supportedNIPs = relayinfo.GetList(
relayinfo.BasicProtocol,
relayinfo.Authentication,
// relayinfo.EncryptedDirectMessage,
// relayinfo.EventDeletion,
relayinfo.EventDeletion,
relayinfo.RelayInformationDocument,
// relayinfo.GenericTagQueries,
// relayinfo.NostrMarketplace,
// relayinfo.EventTreatment,
relayinfo.EventTreatment,
// relayinfo.CommandResults,
// relayinfo.ParameterizedReplaceableEvents,
// relayinfo.ExpirationTimestamp,
// relayinfo.ProtectedEvents,
// relayinfo.RelayListMetadata,
relayinfo.ProtectedEvents,
relayinfo.RelayListMetadata,
)
}
sort.Sort(supportedNIPs)

View File

@@ -1,7 +1,10 @@
package app
import (
"context"
"errors"
"fmt"
"time"
acl "acl.orly"
"encoders.orly/envelopes/authenvelope"
@@ -24,16 +27,15 @@ import (
"utils.orly/pointers"
)
func (l *Listener) HandleReq(msg []byte) (
err error,
) {
func (l *Listener) HandleReq(msg []byte) (err error) {
log.T.F("HandleReq: START processing from %s\n%s\n", l.remote, msg)
var rem []byte
env := reqenvelope.New()
if rem, err = env.Unmarshal(msg); chk.E(err) {
return normalize.Error.Errorf(err.Error())
}
if len(rem) > 0 {
log.I.F("extra '%s'", rem)
log.I.F("REQ extra bytes: '%s'", rem)
}
// send a challenge to the client to auth if an ACL is active
if acl.Registry.Active.Load() != "none" {
@@ -43,7 +45,7 @@ func (l *Listener) HandleReq(msg []byte) (
}
}
// check permissions of user
accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load())
accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load(), l.remote)
switch accessLevel {
case "none":
if err = okenvelope.NewFrom(
@@ -59,17 +61,71 @@ func (l *Listener) HandleReq(msg []byte) (
}
var events event.S
for _, f := range *env.Filters {
idsLen := 0
kindsLen := 0
authorsLen := 0
tagsLen := 0
if f != nil {
if f.Ids != nil {
idsLen = f.Ids.Len()
}
if f.Kinds != nil {
kindsLen = f.Kinds.Len()
}
if f.Authors != nil {
authorsLen = f.Authors.Len()
}
if f.Tags != nil {
tagsLen = f.Tags.Len()
}
}
log.T.F(
"REQ %s: filter summary ids=%d kinds=%d authors=%d tags=%d",
env.Subscription, idsLen, kindsLen, authorsLen, tagsLen,
)
if f != nil && f.Authors != nil && f.Authors.Len() > 0 {
var authors []string
for _, a := range f.Authors.T {
authors = append(authors, hex.Enc(a))
}
log.T.F("REQ %s: authors=%v", env.Subscription, authors)
}
if f != nil && f.Kinds != nil && f.Kinds.Len() > 0 {
log.T.F("REQ %s: kinds=%v", env.Subscription, f.Kinds.ToUint16())
}
if f != nil && f.Ids != nil && f.Ids.Len() > 0 {
var ids []string
for _, id := range f.Ids.T {
ids = append(ids, hex.Enc(id))
}
var lim any
if pointers.Present(f.Limit) {
lim = *f.Limit
} else {
lim = nil
}
log.T.F(
"REQ %s: ids filter count=%d ids=%v limit=%v", env.Subscription,
f.Ids.Len(), ids, lim,
)
}
if pointers.Present(f.Limit) {
if *f.Limit == 0 {
continue
}
}
if events, err = l.QueryEvents(l.Ctx, f); chk.E(err) {
if errors.Is(err, badger.ErrDBClosed) {
return
}
err = nil
}
// Use a separate context for QueryEvents to prevent cancellation issues
queryCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
log.T.F("HandleReq: About to QueryEvents for %s, main context done: %v", l.remote, l.ctx.Err() != nil)
if events, err = l.QueryEvents(queryCtx, f); chk.E(err) {
if errors.Is(err, badger.ErrDBClosed) {
return
}
log.T.F("HandleReq: QueryEvents error for %s: %v", l.remote, err)
err = nil
}
log.T.F("HandleReq: QueryEvents completed for %s, found %d events", l.remote, len(events))
}
var tmp event.S
privCheck:
@@ -115,8 +171,15 @@ privCheck:
events = tmp
seen := make(map[string]struct{})
for _, ev := range events {
// track the IDs we've sent
seen[string(ev.ID)] = struct{}{}
log.T.F(
"REQ %s: sending EVENT id=%s kind=%d", env.Subscription,
hex.Enc(ev.ID), ev.Kind,
)
log.T.C(
func() string {
return fmt.Sprintf("event:\n%s\n", ev.Serialize())
},
)
var res *eventenvelope.Result
if res, err = eventenvelope.NewResultWith(
env.Subscription, ev,
@@ -126,6 +189,8 @@ privCheck:
if err = res.Write(l); chk.E(err) {
return
}
// track the IDs we've sent (use hex encoding for stable key)
seen[hex.Enc(ev.ID)] = struct{}{}
}
// write the EOSE to signal to the client that all events found have been
// sent.
@@ -137,6 +202,10 @@ privCheck:
// if the query was for just Ids, we know there can't be any more results,
// so cancel the subscription.
cancel := true
log.T.F(
"REQ %s: computing cancel/subscription; events_sent=%d",
env.Subscription, len(events),
)
var subbedFilters filter.S
for _, f := range *env.Filters {
if f.Ids.Len() < 1 {
@@ -145,12 +214,16 @@ privCheck:
} else {
// remove the IDs that we already sent
var notFounds [][]byte
for _, ev := range events {
if _, ok := seen[string(ev.ID)]; ok {
for _, id := range f.Ids.T {
if _, ok := seen[hex.Enc(id)]; ok {
continue
}
notFounds = append(notFounds, ev.ID)
notFounds = append(notFounds, id)
}
log.T.F(
"REQ %s: ids outstanding=%d of %d", env.Subscription,
len(notFounds), f.Ids.Len(),
)
// if all were found, don't add to subbedFilters
if len(notFounds) == 0 {
continue
@@ -172,11 +245,12 @@ privCheck:
if !cancel {
l.publishers.Receive(
&W{
Conn: l.conn,
remote: l.remote,
Id: string(env.Subscription),
Receiver: receiver,
Filters: env.Filters,
Conn: l.conn,
remote: l.remote,
Id: string(env.Subscription),
Receiver: receiver,
Filters: env.Filters,
AuthedPubkey: l.authedPubkey.Load(),
},
)
} else {
@@ -186,5 +260,6 @@ privCheck:
return
}
}
log.T.F("HandleReq: COMPLETED processing from %s", l.remote)
return
}

View File

@@ -7,6 +7,7 @@ import (
"strings"
"time"
"encoders.orly/envelopes/authenvelope"
"encoders.orly/hex"
"github.com/coder/websocket"
"lol.mleku.dev/chk"
@@ -18,6 +19,8 @@ const (
DefaultWriteWait = 10 * time.Second
DefaultPongWait = 60 * time.Second
DefaultPingWait = DefaultPongWait / 2
DefaultReadTimeout = 3 * time.Second // Read timeout to detect stalled connections
DefaultWriteTimeout = 3 * time.Second
DefaultMaxMessageSize = 1 * units.Mb
// CloseMessage denotes a close control message. The optional message
@@ -70,10 +73,18 @@ whitelist:
chal := make([]byte, 32)
rand.Read(chal)
listener.challenge.Store([]byte(hex.Enc(chal)))
// If admins are configured, immediately prompt client to AUTH (NIP-42)
if len(s.Config.Admins) > 0 {
// log.D.F("sending initial AUTH challenge to %s", remote)
if err = authenvelope.NewChallengeWith(listener.challenge.Load()).
Write(listener); chk.E(err) {
return
}
}
ticker := time.NewTicker(DefaultPingWait)
go s.Pinger(ctx, conn, ticker, cancel)
defer func() {
log.D.F("closing websocket connection from %s", remote)
// log.D.F("closing websocket connection from %s", remote)
cancel()
ticker.Stop()
listener.publishers.Receive(&W{Cancel: true})
@@ -87,12 +98,33 @@ whitelist:
var typ websocket.MessageType
var msg []byte
log.T.F("waiting for message from %s", remote)
if typ, msg, err = conn.Read(ctx); chk.E(err) {
// Create a read context with timeout to prevent indefinite blocking
readCtx, readCancel := context.WithTimeout(ctx, DefaultReadTimeout)
typ, msg, err = conn.Read(readCtx)
readCancel()
if err != nil {
if strings.Contains(
err.Error(), "use of closed network connection",
) {
return
}
// Handle timeout errors - occurs when client becomes unresponsive
if strings.Contains(err.Error(), "context deadline exceeded") {
log.T.F(
"connection from %s timed out after %v", remote,
DefaultReadTimeout,
)
return
}
// Handle EOF errors gracefully - these occur when client closes connection
// or sends incomplete/malformed WebSocket frames
if strings.Contains(err.Error(), "EOF") ||
strings.Contains(err.Error(), "failed to read frame header") {
log.T.F("connection from %s closed: %v", remote, err)
return
}
status := websocket.CloseStatus(err)
switch status {
case websocket.StatusNormalClosure,
@@ -100,17 +132,27 @@ whitelist:
websocket.StatusNoStatusRcvd,
websocket.StatusAbnormalClosure,
websocket.StatusProtocolError:
log.T.F(
"connection from %s closed with status: %v", remote, status,
)
default:
log.E.F("unexpected close error from %s: %v", remote, err)
}
return
}
if typ == PingMessage {
if err = conn.Write(ctx, PongMessage, msg); chk.E(err) {
// Create a write context with timeout for pong response
writeCtx, writeCancel := context.WithTimeout(
ctx, DefaultWriteTimeout,
)
if err = conn.Write(writeCtx, PongMessage, msg); chk.E(err) {
writeCancel()
return
}
writeCancel()
continue
}
log.T.F("received message from %s: %s", remote, string(msg))
go listener.HandleMessage(msg, remote)
}
}
@@ -127,9 +169,13 @@ func (s *Server) Pinger(
for {
select {
case <-ticker.C:
if err = conn.Ping(ctx); chk.E(err) {
// Create a write context with timeout for ping operation
pingCtx, pingCancel := context.WithTimeout(ctx, DefaultWriteTimeout)
if err = conn.Ping(pingCtx); chk.E(err) {
pingCancel()
return
}
pingCancel()
case <-ctx.Done():
return
}

View File

@@ -19,8 +19,21 @@ type Listener struct {
authedPubkey atomic.Bytes
}
// Ctx returns the listener's context, but creates a new context for each operation
// to prevent cancellation from affecting subsequent operations
func (l *Listener) Ctx() context.Context {
return l.ctx
}
func (l *Listener) Write(p []byte) (n int, err error) {
if err = l.conn.Write(l.ctx, websocket.MessageText, p); chk.E(err) {
// Use a separate context with timeout for writes to prevent race conditions
// where the main connection context gets cancelled while writing events
writeCtx, cancel := context.WithTimeout(
context.Background(), DefaultWriteTimeout,
)
defer cancel()
if err = l.conn.Write(writeCtx, websocket.MessageText, p); chk.E(err) {
return
}
n = len(p)

View File

@@ -28,6 +28,9 @@ func Run(
var err error
var adminKeys [][]byte
for _, admin := range cfg.Admins {
if len(admin) == 0 {
continue
}
var pk []byte
if pk, err = bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(err) {
continue

View File

@@ -8,17 +8,21 @@ import (
"encoders.orly/envelopes/eventenvelope"
"encoders.orly/event"
"encoders.orly/filter"
"encoders.orly/hex"
"encoders.orly/kind"
"github.com/coder/websocket"
"interfaces.orly/publisher"
"interfaces.orly/typer"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
utils "utils.orly"
)
const Type = "socketapi"
type Subscription struct {
remote string
remote string
AuthedPubkey []byte
*filter.S
}
@@ -46,6 +50,9 @@ type W struct {
// associated with this WebSocket connection. It is used to determine which
// notifications or data should be received by the subscriber.
Filters *filter.S
// AuthedPubkey is the authenticated pubkey associated with the listener (if any).
AuthedPubkey []byte
}
func (w *W) Type() (typeName string) { return Type }
@@ -56,7 +63,7 @@ func (w *W) Type() (typeName string) { return Type }
type P struct {
c context.Context
// Mx is the mutex for the Map.
Mx sync.Mutex
Mx sync.RWMutex
// Map is the map of subscribers and subscriptions from the websocket api.
Map
}
@@ -112,7 +119,9 @@ func (p *P) Receive(msg typer.T) {
defer p.Mx.Unlock()
if subs, ok := p.Map[m.Conn]; !ok {
subs = make(map[string]Subscription)
subs[m.Id] = Subscription{S: m.Filters, remote: m.remote}
subs[m.Id] = Subscription{
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
}
p.Map[m.Conn] = subs
log.D.C(
func() string {
@@ -124,7 +133,9 @@ func (p *P) Receive(msg typer.T) {
},
)
} else {
subs[m.Id] = Subscription{S: m.Filters, remote: m.remote}
subs[m.Id] = Subscription{
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
}
log.D.C(
func() string {
return fmt.Sprintf(
@@ -150,71 +161,111 @@ func (p *P) Receive(msg typer.T) {
// for unauthenticated users when events are privileged.
func (p *P) Deliver(ev *event.E) {
var err error
p.Mx.Lock()
defer p.Mx.Unlock()
log.D.C(
func() string {
return fmt.Sprintf(
"delivering event %0x to websocket subscribers %d", ev.ID,
len(p.Map),
)
},
)
// Snapshot the deliveries under read lock to avoid holding locks during I/O
p.Mx.RLock()
type delivery struct {
w *websocket.Conn
id string
sub Subscription
}
var deliveries []delivery
for w, subs := range p.Map {
for id, subscriber := range subs {
if !subscriber.Match(ev) {
continue
if subscriber.Match(ev) {
deliveries = append(
deliveries, delivery{w: w, id: id, sub: subscriber},
)
}
// if p.Server.AuthRequired() {
// if !auth.CheckPrivilege(w.AuthedPubkey(), ev) {
// continue
// }
// }
var res *eventenvelope.Result
if res, err = eventenvelope.NewResultWith(id, ev); chk.E(err) {
continue
}
if err = w.Write(
p.c, websocket.MessageText, res.Marshal(nil),
); chk.E(err) {
p.removeSubscriber(w)
if err = w.CloseNow(); chk.E(err) {
continue
}
continue
}
log.D.C(
func() string {
return fmt.Sprintf(
"dispatched event %0x to subscription %s, %s",
ev.ID, id, subscriber.remote,
)
},
)
}
}
p.Mx.RUnlock()
if len(deliveries) > 0 {
log.D.C(
func() string {
return fmt.Sprintf(
"delivering event %0x to websocket subscribers %d", ev.ID,
len(deliveries),
)
},
)
}
for _, d := range deliveries {
// If the event is privileged, enforce that the subscriber's authed pubkey matches
// either the event pubkey or appears in any 'p' tag of the event.
if kind.IsPrivileged(ev.Kind) && len(d.sub.AuthedPubkey) > 0 {
pk := d.sub.AuthedPubkey
allowed := false
// Direct author match
if utils.FastEqual(ev.Pubkey, pk) {
allowed = true
} else if ev.Tags != nil {
for _, pTag := range ev.Tags.GetAll([]byte("p")) {
// pTag.Value() returns []byte hex string; decode to bytes
dec, derr := hex.Dec(string(pTag.Value()))
if derr != nil {
continue
}
if utils.FastEqual(dec, pk) {
allowed = true
break
}
}
}
if !allowed {
// Skip delivery for this subscriber
continue
}
}
var res *eventenvelope.Result
if res, err = eventenvelope.NewResultWith(d.id, ev); chk.E(err) {
continue
}
// Use a separate context with timeout for writes to prevent race conditions
// where the publisher context gets cancelled while writing events
writeCtx, cancel := context.WithTimeout(
context.Background(), DefaultWriteTimeout,
)
defer cancel()
if err = d.w.Write(
writeCtx, websocket.MessageText, res.Marshal(nil),
); chk.E(err) {
// On error, remove the subscriber connection safely
p.removeSubscriber(d.w)
_ = d.w.CloseNow()
continue
}
log.D.C(
func() string {
return fmt.Sprintf(
"dispatched event %0x to subscription %s, %s",
ev.ID, d.id, d.sub.remote,
)
},
)
}
}
// removeSubscriberId removes a specific subscription from a subscriber
// websocket.
func (p *P) removeSubscriberId(ws *websocket.Conn, id string) {
p.Mx.Lock()
defer p.Mx.Unlock()
var subs map[string]Subscription
var ok bool
if subs, ok = p.Map[ws]; ok {
delete(p.Map[ws], id)
_ = subs
if len(subs) == 0 {
delete(subs, id)
// Check the actual map after deletion, not the original reference
if len(p.Map[ws]) == 0 {
delete(p.Map, ws)
}
}
p.Mx.Unlock()
}
// removeSubscriber removes a websocket from the P collection.
func (p *P) removeSubscriber(ws *websocket.Conn) {
p.Mx.Lock()
defer p.Mx.Unlock()
clear(p.Map[ws])
delete(p.Map, ws)
p.Mx.Unlock()
}

View File

@@ -2,14 +2,12 @@ package app
import (
"context"
"fmt"
"net/http"
"strconv"
"strings"
"database.orly"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/app/config"
"protocol.orly/publish"
)
@@ -25,11 +23,11 @@ type Server struct {
}
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.T.C(
func() string {
return fmt.Sprintf("path %v header %v", r.URL, r.Header)
},
)
// log.T.C(
// func() string {
// return fmt.Sprintf("path %v header %v", r.URL, r.Header)
// },
// )
if r.Header.Get("Upgrade") == "websocket" {
s.HandleWebsocket(w, r)
} else if r.Header.Get("Accept") == "application/nostr+json" {

View File

@@ -28,10 +28,10 @@ func (s *S) Configure(cfg ...any) (err error) {
return err
}
func (s *S) GetAccessLevel(pub []byte) (level string) {
func (s *S) GetAccessLevel(pub []byte, address string) (level string) {
for _, i := range s.ACL {
if i.Type() == s.Active.Load() {
level = i.GetAccessLevel(pub)
level = i.GetAccessLevel(pub, address)
break
}
}

View File

@@ -45,13 +45,13 @@ func (f *Follows) Configure(cfg ...any) (err error) {
for _, ca := range cfg {
switch c := ca.(type) {
case *config.C:
log.D.F("setting ACL config: %v", c)
// log.D.F("setting ACL config: %v", c)
f.cfg = c
case *database.D:
log.D.F("setting ACL database: %s", c.Path())
// log.D.F("setting ACL database: %s", c.Path())
f.D = c
case context.Context:
log.D.F("setting ACL context: %s", c.Value("id"))
// log.D.F("setting ACL context: %s", c.Value("id"))
f.Ctx = c
default:
err = errorf.E("invalid type: %T", reflect.TypeOf(ca))
@@ -64,13 +64,15 @@ func (f *Follows) Configure(cfg ...any) (err error) {
// find admin follow lists
f.followsMx.Lock()
defer f.followsMx.Unlock()
log.I.F("finding admins")
// log.I.F("finding admins")
f.follows, f.admins = nil, nil
for _, admin := range f.cfg.Admins {
log.I.F("%s", admin)
// log.I.F("%s", admin)
var adm []byte
if adm, err = bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(err) {
if a, e := bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(e) {
continue
} else {
adm = a
}
log.I.F("admin: %0x", adm)
f.admins = append(f.admins, adm)
@@ -96,12 +98,14 @@ func (f *Follows) Configure(cfg ...any) (err error) {
if ev, err = f.D.FetchEventBySerial(s); chk.E(err) {
continue
}
log.I.F("admin follow list:\n%s", ev.Serialize())
// log.I.F("admin follow list:\n%s", ev.Serialize())
for _, v := range ev.Tags.GetAll([]byte("p")) {
log.I.F("adding follow: %s", v.Value())
// log.I.F("adding follow: %s", v.Value())
var a []byte
if a, err = hex.Dec(string(v.Value())); chk.E(err) {
if b, e := hex.Dec(string(v.Value())); chk.E(e) {
continue
} else {
a = b
}
f.follows = append(f.follows, a)
}
@@ -116,7 +120,7 @@ func (f *Follows) Configure(cfg ...any) (err error) {
return
}
func (f *Follows) GetAccessLevel(pub []byte) (level string) {
func (f *Follows) GetAccessLevel(pub []byte, address string) (level string) {
if f.cfg == nil {
return "write"
}
@@ -203,7 +207,7 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
log.W.F("follows syncer: no admin relays found in DB (kind 10002)")
return
}
log.I.F(
log.T.F(
"follows syncer: subscribing to %d relays for %d authors", len(urls),
len(authors),
)
@@ -240,13 +244,13 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
}
*ff = append(*ff, f1)
req := reqenvelope.NewFrom([]byte("follows-sync"), ff)
if err := c.Write(
if err = c.Write(
ctx, websocket.MessageText, req.Marshal(nil),
); chk.E(err) {
_ = c.Close(websocket.StatusInternalError, "write failed")
continue
}
log.I.F("sent REQ to %s for follows subscription", u)
log.T.F("sent REQ to %s for follows subscription", u)
// read loop
for {
select {
@@ -274,11 +278,11 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
if ok, err := res.Event.Verify(); chk.T(err) || !ok {
continue
}
if _, _, err := f.D.SaveEvent(
if _, _, err = f.D.SaveEvent(
ctx, res.Event,
); err != nil {
if !strings.HasPrefix(
err.Error(), "event already exists",
err.Error(), "blocked:",
) {
log.W.F(
"follows syncer: save event failed: %v",
@@ -333,6 +337,7 @@ func (f *Follows) Syncer() {
}
}
}()
f.updated <- struct{}{}
}
func init() {

View File

@@ -8,7 +8,7 @@ type None struct{}
func (n None) Configure(cfg ...any) (err error) { return }
func (n None) GetAccessLevel(pub []byte) (level string) {
func (n None) GetAccessLevel(pub []byte, address string) (level string) {
return "write"
}

View File

@@ -15,7 +15,7 @@ import (
type Signer struct {
SecretKey *secp256k1.SecretKey
PublicKey *secp256k1.PublicKey
BTCECSec *ec.SecretKey
BTCECSec *secp256k1.SecretKey
pkb, skb []byte
}
@@ -23,11 +23,11 @@ var _ signer.I = &Signer{}
// Generate creates a new Signer.
func (s *Signer) Generate() (err error) {
if s.SecretKey, err = ec.NewSecretKey(); chk.E(err) {
if s.SecretKey, err = secp256k1.GenerateSecretKey(); chk.E(err) {
return
}
s.skb = s.SecretKey.Serialize()
s.BTCECSec, _ = ec.PrivKeyFromBytes(s.skb)
s.BTCECSec = secp256k1.PrivKeyFromBytes(s.skb)
s.PublicKey = s.SecretKey.PubKey()
s.pkb = schnorr.SerializePubKey(s.PublicKey)
return
@@ -43,7 +43,7 @@ func (s *Signer) InitSec(sec []byte) (err error) {
s.SecretKey = secp256k1.SecKeyFromBytes(sec)
s.PublicKey = s.SecretKey.PubKey()
s.pkb = schnorr.SerializePubKey(s.PublicKey)
s.BTCECSec, _ = ec.PrivKeyFromBytes(s.skb)
s.BTCECSec = secp256k1.PrivKeyFromBytes(s.skb)
return
}
@@ -142,7 +142,7 @@ func (s *Signer) ECDH(pubkeyBytes []byte) (secret []byte, err error) {
); chk.E(err) {
return
}
secret = ec.GenerateSharedSecret(s.BTCECSec, pub)
secret = secp256k1.GenerateSharedSecret(s.BTCECSec, pub)
return
}
@@ -154,7 +154,7 @@ type Keygen struct {
// Generate a new key pair. If the result is suitable, the embedded Signer can have its contents
// extracted.
func (k *Keygen) Generate() (pubBytes []byte, err error) {
if k.Signer.SecretKey, err = ec.NewSecretKey(); chk.E(err) {
if k.Signer.SecretKey, err = secp256k1.GenerateSecretKey(); chk.E(err) {
return
}
k.Signer.PublicKey = k.SecretKey.PubKey()

View File

@@ -9,6 +9,7 @@ import (
types2 "database.orly/indexes/types"
"encoders.orly/filter"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
)
type Range struct {
@@ -89,12 +90,20 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
return
}
buf := new(bytes.Buffer)
// Create an index prefix without the serial number
idx := indexes.IdEnc(i, nil)
if err = idx.MarshalWrite(buf); chk.E(err) {
return
}
b := buf.Bytes()
r := Range{b, b}
// Create range that will match any serial value with this ID prefix
end := make([]byte, len(b))
copy(end, b)
// Fill the end range with 0xff bytes to match all possible serial values
for i := 0; i < 5; i++ {
end = append(end, 0xff)
}
r := Range{b, end}
idxs = append(idxs, r)
return
}(); chk.E(err) {
@@ -230,6 +239,7 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
for _, t := range *f.Tags {
if t.Len() >= 2 && (len(t.Key()) == 1 || (len(t.Key()) == 2 && t.Key()[0] == '#')) {
var p *types2.PubHash
log.I.S(author)
if p, err = CreatePubHashFromData(author); chk.E(err) {
return
}
@@ -306,8 +316,8 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
for _, author := range f.Authors.T {
kind := new(types2.Uint16)
kind.Set(k)
p := new(types2.PubHash)
if err = p.FromPubkey(author); chk.E(err) {
var p *types2.PubHash
if p, err = CreatePubHashFromData(author); chk.E(err) {
return
}
start, end := new(bytes.Buffer), new(bytes.Buffer)
@@ -351,8 +361,9 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
// Pubkey pc-
if f.Authors != nil && f.Authors.Len() > 0 {
for _, author := range f.Authors.T {
p := new(types2.PubHash)
if err = p.FromPubkey(author); chk.E(err) {
var p *types2.PubHash
log.I.S(author)
if p, err = CreatePubHashFromData(author); chk.E(err) {
return
}
start, end := new(bytes.Buffer), new(bytes.Buffer)

View File

@@ -5,20 +5,33 @@ import (
"database.orly/indexes/types"
"encoders.orly/filter"
"encoders.orly/hex"
"encoders.orly/tag"
"github.com/dgraph-io/badger/v4"
"lol.mleku.dev/chk"
"lol.mleku.dev/errorf"
"lol.mleku.dev/log"
)
func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
log.T.F("GetSerialById: input id=%s", hex.Enc(id))
var idxs []Range
if idxs, err = GetIndexesFromFilter(&filter.F{Ids: tag.NewFromBytesSlice(id)}); chk.E(err) {
return
}
for i, idx := range idxs {
log.T.F(
"GetSerialById: searching range %d: start=%x, end=%x", i, idx.Start,
idx.End,
)
}
if len(idxs) == 0 {
err = errorf.E("no indexes found for id %0x", id)
return
}
idFound := false
if err = d.View(
func(txn *badger.Txn) (err error) {
it := txn.NewIterator(badger.DefaultIteratorOptions)
@@ -33,15 +46,24 @@ func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
if err = ser.UnmarshalRead(buf); chk.E(err) {
return
}
idFound = true
} else {
// just don't return what we don't have? others may be
// found tho.
// Item not found in database
log.T.F(
"GetSerialById: ID not found in database: %s", hex.Enc(id),
)
}
return
},
); chk.E(err) {
return
}
if !idFound {
err = errorf.T("id not found in database: %s", hex.Enc(id))
return
}
return
}

View File

@@ -20,7 +20,15 @@ func (d *D) GetSerialsByRange(idx Range) (
},
)
defer it.Close()
for it.Seek(idx.End); it.Valid(); it.Next() {
// Start from a position that includes the end boundary (until timestamp)
// We create an end boundary that's slightly beyond the actual end to ensure inclusivity
endBoundary := make([]byte, len(idx.End))
copy(endBoundary, idx.End)
// Add 0xff bytes to ensure we capture all events at the exact until timestamp
for i := 0; i < 5; i++ {
endBoundary = append(endBoundary, 0xff)
}
for it.Seek(endBoundary); it.Valid(); it.Next() {
item := it.Item()
var key []byte
key = item.Key()

View File

@@ -15,10 +15,13 @@ const PubHashLen = 8
type PubHash struct{ val [PubHashLen]byte }
func (ph *PubHash) FromPubkey(pk []byte) (err error) {
if len(pk) == 0 {
panic("nil pubkey")
}
if len(pk) != schnorr.PubKeyBytesLen {
err = errorf.E(
"invalid Pubkey length, got %d require %d",
len(pk), schnorr.PubKeyBytesLen,
"invalid Pubkey length, got %d require %d %0x",
len(pk), schnorr.PubKeyBytesLen, pk,
)
return
}

View File

@@ -5,6 +5,7 @@ import (
"context"
"sort"
"strconv"
"strings"
"time"
"crypto.orly/sha256"
@@ -42,23 +43,71 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
var expDeletes types.Uint40s
var expEvs event.S
if f.Ids != nil && f.Ids.Len() > 0 {
for _, id := range f.Ids.T {
log.T.F("QueryEvents: looking for ID=%s", hex.Enc(id))
}
log.T.F("QueryEvents: ids path, count=%d", f.Ids.Len())
for _, idx := range f.Ids.T {
log.T.F("QueryEvents: lookup id=%s", hex.Enc(idx))
// we know there is only Ids in this, so run the ID query and fetch.
var ser *types.Uint40
if ser, err = d.GetSerialById(idx); chk.E(err) {
var idErr error
if ser, idErr = d.GetSerialById(idx); idErr != nil {
// Check if this is a "not found" error which is expected for IDs we don't have
if strings.Contains(idErr.Error(), "id not found in database") {
log.T.F(
"QueryEvents: ID not found in database: %s",
hex.Enc(idx),
)
} else {
// Log unexpected errors but continue processing other IDs
log.E.F(
"QueryEvents: error looking up id=%s err=%v",
hex.Enc(idx), idErr,
)
}
continue
}
// Check if the serial is nil, which indicates the ID wasn't found
if ser == nil {
log.T.F("QueryEvents: Serial is nil for ID: %s", hex.Enc(idx))
continue
}
// fetch the events
var ev *event.E
if ev, err = d.FetchEventBySerial(ser); err != nil {
log.T.F(
"QueryEvents: fetch by serial failed for id=%s ser=%v err=%v",
hex.Enc(idx), ser, err,
)
continue
}
log.T.F(
"QueryEvents: found id=%s kind=%d created_at=%d",
hex.Enc(ev.ID), ev.Kind, ev.CreatedAt,
)
// check for an expiration tag and delete after returning the result
if CheckExpiration(ev) {
log.T.F(
"QueryEvents: id=%s filtered out due to expiration",
hex.Enc(ev.ID),
)
expDeletes = append(expDeletes, ser)
expEvs = append(expEvs, ev)
continue
}
// skip events that have been deleted by a proper deletion event
if derr := d.CheckForDeleted(ev, nil); derr != nil {
log.T.F(
"QueryEvents: id=%s filtered out due to deletion: %v",
hex.Enc(ev.ID), derr,
)
continue
}
log.T.F(
"QueryEvents: id=%s SUCCESSFULLY FOUND, adding to results",
hex.Enc(ev.ID),
)
evs = append(evs, ev)
}
// sort the events by timestamp
@@ -68,10 +117,15 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
},
)
} else {
// non-IDs path
var idPkTs []*store.IdPkTs
// if f.Authors != nil && f.Authors.Len() > 0 && f.Kinds != nil && f.Kinds.Len() > 0 {
// log.T.F("QueryEvents: authors+kinds path, authors=%d kinds=%d", f.Authors.Len(), f.Kinds.Len())
// }
if idPkTs, err = d.QueryForIds(c, f); chk.E(err) {
return
}
// log.T.F("QueryEvents: QueryForIds returned %d candidates", len(idPkTs))
// Create a map to store the latest version of replaceable events
replaceableEvents := make(map[string]*event.E)
// Create a map to store the latest version of parameterized replaceable
@@ -83,8 +137,8 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
// events)
deletionsByKindPubkey := make(map[string]bool)
// Map to track deletion events by kind, pubkey, and d-tag (for
// parameterized replaceable events)
deletionsByKindPubkeyDTag := make(map[string]map[string]bool)
// parameterized replaceable events). We store the newest delete timestamp per d-tag.
deletionsByKindPubkeyDTag := make(map[string]map[string]int64)
// Map to track specific event IDs that have been deleted
deletedEventIds := make(map[string]bool)
// Query for deletion events separately if we have authors in the filter
@@ -169,11 +223,13 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
key := hex.Enc(pk) + ":" + strconv.Itoa(int(kk.K))
// Initialize the inner map if it doesn't exist
if _, exists := deletionsByKindPubkeyDTag[key]; !exists {
deletionsByKindPubkeyDTag[key] = make(map[string]bool)
deletionsByKindPubkeyDTag[key] = make(map[string]int64)
}
// Mark this d-tag as deleted
// Record the newest delete timestamp for this d-tag
dValue := string(split[2])
deletionsByKindPubkeyDTag[key][dValue] = true
if ts, ok := deletionsByKindPubkeyDTag[key][dValue]; !ok || ev.CreatedAt > ts {
deletionsByKindPubkeyDTag[key][dValue] = ev.CreatedAt
}
// Debug logging
}
// For replaceable events, we need to check if there are any
@@ -225,15 +281,16 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
}
// Initialize the inner map if it doesn't exist
if _, exists := deletionsByKindPubkeyDTag[key]; !exists {
deletionsByKindPubkeyDTag[key] = make(map[string]bool)
deletionsByKindPubkeyDTag[key] = make(map[string]int64)
}
// Record the newest delete timestamp for this d-tag
if ts, ok := deletionsByKindPubkeyDTag[key][dValue]; !ok || ev.CreatedAt > ts {
deletionsByKindPubkeyDTag[key][dValue] = ev.CreatedAt
}
// Mark this d-tag as deleted
deletionsByKindPubkeyDTag[key][dValue] = true
}
}
}
}
// Second pass: process all events, filtering out deleted ones
for _, idpk := range idPkTs {
var ev *event.E
@@ -244,6 +301,83 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
if ev, err = d.FetchEventBySerial(ser); err != nil {
continue
}
// Add logging for tag filter debugging
if f.Tags != nil && f.Tags.Len() > 0 {
var eventTags []string
if ev.Tags != nil && ev.Tags.Len() > 0 {
for _, t := range *ev.Tags {
if t.Len() >= 2 {
eventTags = append(
eventTags,
string(t.Key())+"="+string(t.Value()),
)
}
}
}
// log.T.F(
// "QueryEvents: processing event ID=%s kind=%d tags=%v",
// hex.Enc(ev.ID), ev.Kind, eventTags,
// )
// Check if this event matches ALL required tags in the filter
tagMatches := 0
for _, filterTag := range *f.Tags {
if filterTag.Len() >= 2 {
filterKey := filterTag.Key()
// Handle filter keys that start with # (remove the prefix for comparison)
var actualKey []byte
if len(filterKey) == 2 && filterKey[0] == '#' {
actualKey = filterKey[1:]
} else {
actualKey = filterKey
}
// Check if event has this tag key with any of the filter's values
eventHasTag := false
if ev.Tags != nil {
for _, eventTag := range *ev.Tags {
if eventTag.Len() >= 2 && bytes.Equal(
eventTag.Key(), actualKey,
) {
// Check if the event's tag value matches any of the filter's values
for _, filterValue := range filterTag.T[1:] {
if bytes.Equal(
eventTag.Value(), filterValue,
) {
eventHasTag = true
break
}
}
if eventHasTag {
break
}
}
}
}
if eventHasTag {
tagMatches++
}
// log.T.F(
// "QueryEvents: tag filter %s (actual key: %s) matches: %v (total matches: %d/%d)",
// string(filterKey), string(actualKey), eventHasTag,
// tagMatches, f.Tags.Len(),
// )
}
}
// If not all tags match, skip this event
if tagMatches < f.Tags.Len() {
// log.T.F(
// "QueryEvents: event ID=%s SKIPPED - only matches %d/%d required tags",
// hex.Enc(ev.ID), tagMatches, f.Tags.Len(),
// )
continue
}
// log.T.F(
// "QueryEvents: event ID=%s PASSES all tag filters",
// hex.Enc(ev.ID),
// )
}
// Skip events with kind 5 (Deletion)
if ev.Kind == kind.Deletion.K {
continue
@@ -308,10 +442,10 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
// Check if this event has been deleted via an a-tag
if deletionMap, exists := deletionsByKindPubkeyDTag[key]; exists {
// If the d-tag value is in the deletion map and this event
// is not specifically requested by ID, skip it
if deletionMap[dValue] && !isIdInFilter {
log.T.F("Debug: Event deleted - skipping")
// If there is a deletion timestamp and this event is older than the deletion,
// and this event is not specifically requested by ID, skip it
if delTs, ok := deletionMap[dValue]; ok && ev.CreatedAt < delTs && !isIdInFilter {
log.T.F("Debug: Event deleted by a-tag (older than delete) - skipping")
continue
}
}

View File

@@ -199,25 +199,12 @@ func (d *D) CheckForDeleted(ev *event.E, admins [][]byte) (err error) {
sers = append(sers, s...)
}
if len(sers) > 0 {
var idPkTss []*store.IdPkTs
var tmp []*store.IdPkTs
if tmp, err = d.GetFullIdPubkeyBySerials(sers); chk.E(err) {
return
}
idPkTss = append(idPkTss, tmp...)
// sort by timestamp, so the first is the newest
sort.Slice(
idPkTss, func(i, j int) bool {
return idPkTss[i].Ts > idPkTss[j].Ts
},
// For e-tag deletions (delete by ID), any deletion event means the event cannot be resubmitted
// regardless of timestamp, since it's a specific deletion of this exact event
err = errorf.E(
"blocked: %0x was deleted by ID and cannot be resubmitted",
ev.ID,
)
if ev.CreatedAt < idPkTss[0].Ts {
err = errorf.E(
"blocked: %0x was deleted because it is older than the delete: event: %d delete: %d",
ev.ID, ev.CreatedAt, idPkTss[0].Ts,
)
return
}
return
}

View File

@@ -3,11 +3,14 @@ package database
import (
"bytes"
"context"
"fmt"
"strings"
"database.orly/indexes"
"database.orly/indexes/types"
"encoders.orly/event"
"encoders.orly/filter"
"encoders.orly/hex"
"encoders.orly/kind"
"encoders.orly/tag"
"github.com/dgraph-io/badger/v4"
@@ -42,12 +45,32 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
// check if the event already exists
var ser *types.Uint40
if ser, err = d.GetSerialById(ev.ID); err == nil && ser != nil {
err = errorf.E("event already exists: %0x", ev.ID)
err = errorf.E("blocked: event already exists: %0x", ev.ID)
return
}
// If the error is "id not found", we can proceed with saving the event
if err != nil && strings.Contains(err.Error(), "id not found in database") {
// Reset error since this is expected for new events
err = nil
} else if err != nil {
// For any other error, return it
log.E.F("error checking if event exists: %s", err)
return
}
// Check if the event has been deleted before allowing resubmission
if err = d.CheckForDeleted(ev, nil); err != nil {
// log.I.F(
// "SaveEvent: rejecting resubmission of deleted event ID=%s: %v",
// hex.Enc(ev.ID), err,
// )
err = errorf.E("blocked: %s", err.Error())
return
}
// check for replacement
if kind.IsReplaceable(ev.Kind) {
// find the events and delete them
// find the events and check timestamps before deleting
f := &filter.F{
Authors: tag.NewFromBytesSlice(ev.Pubkey),
Kinds: kind.NewS(kind.New(ev.Kind)),
@@ -56,22 +79,50 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
if sers, err = d.GetSerialsFromFilter(f); chk.E(err) {
return
}
// if found, delete them
// if found, check timestamps before deleting
if len(sers) > 0 {
var shouldReplace bool = true
for _, s := range sers {
var oldEv *event.E
if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
continue
}
if err = d.DeleteEventBySerial(
c, s, oldEv,
); chk.E(err) {
continue
// Only replace if the new event is newer or same timestamp
if ev.CreatedAt < oldEv.CreatedAt {
log.I.F(
"SaveEvent: rejecting older replaceable event ID=%s (created_at=%d) - existing event ID=%s (created_at=%d)",
hex.Enc(ev.ID), ev.CreatedAt, hex.Enc(oldEv.ID),
oldEv.CreatedAt,
)
shouldReplace = false
break
}
}
if shouldReplace {
for _, s := range sers {
var oldEv *event.E
if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
continue
}
log.I.F(
"SaveEvent: replacing older replaceable event ID=%s (created_at=%d) with newer event ID=%s (created_at=%d)",
hex.Enc(oldEv.ID), oldEv.CreatedAt, hex.Enc(ev.ID),
ev.CreatedAt,
)
if err = d.DeleteEventBySerial(
c, s, oldEv,
); chk.E(err) {
continue
}
}
} else {
// Don't save the older event - return an error
err = errorf.E("blocked: event is older than existing replaceable event")
return
}
}
} else if kind.IsParameterizedReplaceable(ev.Kind) {
// find the events and delete them
// find the events and check timestamps before deleting
dTag := ev.Tags.GetFirst([]byte("d"))
if dTag == nil {
err = errorf.E("event is missing a d tag identifier")
@@ -88,19 +139,47 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
if sers, err = d.GetSerialsFromFilter(f); chk.E(err) {
return
}
// if found, delete them
// if found, check timestamps before deleting
if len(sers) > 0 {
var shouldReplace bool = true
for _, s := range sers {
var oldEv *event.E
if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
continue
}
if err = d.DeleteEventBySerial(
c, s, oldEv,
); chk.E(err) {
continue
// Only replace if the new event is newer or same timestamp
if ev.CreatedAt < oldEv.CreatedAt {
log.I.F(
"SaveEvent: rejecting older addressable event ID=%s (created_at=%d) - existing event ID=%s (created_at=%d)",
hex.Enc(ev.ID), ev.CreatedAt, hex.Enc(oldEv.ID),
oldEv.CreatedAt,
)
shouldReplace = false
break
}
}
if shouldReplace {
for _, s := range sers {
var oldEv *event.E
if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
continue
}
log.I.F(
"SaveEvent: replacing older addressable event ID=%s (created_at=%d) with newer event ID=%s (created_at=%d)",
hex.Enc(oldEv.ID), oldEv.CreatedAt, hex.Enc(ev.ID),
ev.CreatedAt,
)
if err = d.DeleteEventBySerial(
c, s, oldEv,
); chk.E(err) {
continue
}
}
} else {
// Don't save the older event - return an error
err = errorf.E("blocked: event is older than existing addressable event")
return
}
}
}
// Get the next sequence number for the event
@@ -153,6 +232,14 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
return
},
)
log.T.F("total data written: %d bytes keys %d bytes values", kc, vc)
log.T.F(
"total data written: %d bytes keys %d bytes values for event ID %s", kc,
vc, hex.Enc(ev.ID),
)
log.T.C(
func() string {
return fmt.Sprintf("event:\n%s\n", ev.Serialize())
},
)
return
}

View File

@@ -11,7 +11,6 @@ import (
"interfaces.orly/codec"
"lol.mleku.dev/chk"
"lol.mleku.dev/errorf"
"lol.mleku.dev/log"
"utils.orly/constraints"
"utils.orly/units"
)
@@ -55,7 +54,7 @@ func (en *Challenge) Label() string { return L }
func (en *Challenge) Write(w io.Writer) (err error) {
var b []byte
b = en.Marshal(b)
log.D.F("writing out challenge envelope: '%s'", b)
// log.D.F("writing out challenge envelope: '%s'", b)
_, err = w.Write(b)
return
}

View File

@@ -5,7 +5,6 @@ import (
"encoders.orly/hex"
"encoders.orly/ints"
"encoders.orly/text"
"lol.mleku.dev/log"
)
// ToCanonical converts the event to the canonical encoding used to derive the
@@ -23,7 +22,7 @@ func (ev *E) ToCanonical(dst []byte) (b []byte) {
b = append(b, ',')
b = text.AppendQuote(b, ev.Content, text.NostrEscape)
b = append(b, ']')
log.D.F("canonical: %s", b)
// log.D.F("canonical: %s", b)
return
}

View File

@@ -333,7 +333,7 @@ var (
CommunityDefinition = &K{34550}
ACLEvent = &K{39998}
// ParameterizedReplaceableEnd is an event type that...
ParameterizedReplaceableEnd = &K{39999}
ParameterizedReplaceableEnd = &K{40000}
)
var MapMx sync.RWMutex

View File

@@ -22,7 +22,7 @@ const (
type I interface {
Configure(cfg ...any) (err error)
// GetAccessLevel returns the access level string for a given pubkey.
GetAccessLevel(pub []byte) (level string)
GetAccessLevel(pub []byte, address string) (level string)
// GetACLInfo returns the name and a description of the ACL, which should
// explain briefly how it works, and then a long text of documentation of
// the ACL's rules and configuration (in asciidoc or markdown).

View File

@@ -1 +1 @@
v0.2.0
v0.4.2

19
scripts/relaytester-install.sh Executable file
View File

@@ -0,0 +1,19 @@
#!/usr/bin/env bash
## rust must be installed
if ! command -v "cargo" &> /dev/null; then
echo "rust and cargo is not installed."
echo "run this command to install:"
echo
echo "curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh"
exit
else
echo "cargo is installed."
fi
rm -rf relay-tester
git clone https://github.com/mikedilger/relay-tester.git
cd relay-tester
cargo build -r
cp target/release/relay-tester $GOBIN/
cd ..
#rm -rf relay-tester

23
scripts/relaytester-test.sh Executable file
View File

@@ -0,0 +1,23 @@
#!/usr/bin/env bash
## relay-tester must be installed
if ! command -v "relay-tester" &> /dev/null; then
echo "relay-tester is not installed."
echo "run this command to install:"
echo
echo "./scripts/relaytester-install.sh"
exit
fi
rm -rf /tmp/orlytest
export ORLY_LOG_LEVEL=trace
export ORLY_LOG_TO_STDOUT=true
export ORLY_LISTEN=127.0.0.1
export ORLY_PORT=3334
export ORLY_IP_WHITELIST=127.0.0
export ORLY_ADMINS=6d9b216ec1dc329ca43c56634e0dba6aaaf3d45ab878bdf4fa910c7117db0bfa,c284f03a874668eded145490e436b87f1a1fc565cf320e7dea93a7e96e3629d7
export ORLY_ACL_MODE=none
export ORLY_DATA_DIR=/tmp/orlytest
go run . &
sleep 5
relay-tester ws://127.0.0.1:3334 nsec12l4072hvvyjpmkyjtdxn48xf8qj299zw60u7ddg58s2aphv3rpjqtg0tvr nsec1syvtjgqauyeezgrev5nqrp36d87apjk87043tgu2usgv8umyy6wq4yl6tu
killall next.orly.dev
rm -rf /tmp/orlytest

14972
stacktrace.txt Normal file

File diff suppressed because it is too large Load Diff