Compare commits
30 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
6e06905773
|
|||
|
d1316a5b7a
|
|||
|
b45f0a2c51
|
|||
|
e2b7152221
|
|||
|
bf7ca1da43
|
|||
|
bb8998fef6
|
|||
|
57ac3667e6
|
|||
|
cb54891473
|
|||
|
fdcfd863e0
|
|||
|
4e96c9e2f7
|
|||
|
fb956ff09c
|
|||
|
eac6ba1410
|
|||
|
6b4b035f0c
|
|||
|
c2c6720e01
|
|||
|
dddcc682b9
|
|||
|
ddaab70d2b
|
|||
|
61cec63ca9
|
|||
|
b063dab2a3
|
|||
|
9e59d5f72b
|
|||
|
fe3893addf
|
|||
|
5eb192f208
|
|||
|
2385d1f752
|
|||
|
faad7ddc93
|
|||
|
c9314bdbd0
|
|||
|
85d806b157
|
|||
|
6207f9d426
|
|||
|
ebb5e2c0f3
|
|||
|
9dec51cd40
|
|||
|
f570660f37
|
|||
|
3d3a0fa520
|
@@ -89,3 +89,7 @@ A good typical example:
|
|||||||
// - Initializes the relay, starting its operation in a separate goroutine.
|
// - Initializes the relay, starting its operation in a separate goroutine.
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
use the source of the relay-tester to help guide what expectations the test has,
|
||||||
|
and use context7 for information about the nostr protocol, and use additional
|
||||||
|
log statements to help locate the cause of bugs
|
||||||
@@ -29,6 +29,7 @@ type C struct {
|
|||||||
Port int `env:"ORLY_PORT" default:"3334" usage:"port to listen on"`
|
Port int `env:"ORLY_PORT" default:"3334" usage:"port to listen on"`
|
||||||
LogLevel string `env:"ORLY_LOG_LEVEL" default:"info" usage:"relay log level: fatal error warn info debug trace"`
|
LogLevel string `env:"ORLY_LOG_LEVEL" default:"info" usage:"relay log level: fatal error warn info debug trace"`
|
||||||
DBLogLevel string `env:"ORLY_DB_LOG_LEVEL" default:"info" usage:"database log level: fatal error warn info debug trace"`
|
DBLogLevel string `env:"ORLY_DB_LOG_LEVEL" default:"info" usage:"database log level: fatal error warn info debug trace"`
|
||||||
|
LogToStdout bool `env:"ORLY_LOG_TO_STDOUT" default:"false" usage:"log to stdout instead of stderr"`
|
||||||
Pprof string `env:"ORLY_PPROF" usage:"enable pprof in modes: cpu,memory,allocation"`
|
Pprof string `env:"ORLY_PPROF" usage:"enable pprof in modes: cpu,memory,allocation"`
|
||||||
IPWhitelist []string `env:"ORLY_IP_WHITELIST" usage:"comma-separated list of IP addresses to allow access from, matches on prefixes to allow private subnets, eg 10.0.0 = 10.0.0.0/8"`
|
IPWhitelist []string `env:"ORLY_IP_WHITELIST" usage:"comma-separated list of IP addresses to allow access from, matches on prefixes to allow private subnets, eg 10.0.0 = 10.0.0.0/8"`
|
||||||
Admins []string `env:"ORLY_ADMINS" usage:"comma-separated list of admin npubs"`
|
Admins []string `env:"ORLY_ADMINS" usage:"comma-separated list of admin npubs"`
|
||||||
@@ -73,6 +74,9 @@ func New() (cfg *C, err error) {
|
|||||||
PrintHelp(cfg, os.Stderr)
|
PrintHelp(cfg, os.Stderr)
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
}
|
}
|
||||||
|
if cfg.LogToStdout {
|
||||||
|
lol.Writer = os.Stdout
|
||||||
|
}
|
||||||
lol.SetLogLevel(cfg.LogLevel)
|
lol.SetLogLevel(cfg.LogLevel)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,7 +46,7 @@ func (l *Listener) HandleAuth(b []byte) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.D.F(
|
log.D.F(
|
||||||
"%s authed to pubkey,%0x", l.remote,
|
"%s authed to pubkey %0x", l.remote,
|
||||||
env.Event.Pubkey,
|
env.Event.Pubkey,
|
||||||
)
|
)
|
||||||
l.authedPubkey.Store(env.Event.Pubkey)
|
l.authedPubkey.Store(env.Event.Pubkey)
|
||||||
|
|||||||
@@ -23,14 +23,14 @@ func (l *Listener) GetSerialsFromFilter(f *filter.F) (
|
|||||||
return l.D.GetSerialsFromFilter(f)
|
return l.D.GetSerialsFromFilter(f)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
|
func (l *Listener) HandleDelete(env *eventenvelope.Submission) (err error) {
|
||||||
log.I.C(
|
// log.I.C(
|
||||||
func() string {
|
// func() string {
|
||||||
return fmt.Sprintf(
|
// return fmt.Sprintf(
|
||||||
"delete event\n%s", env.E.Serialize(),
|
// "delete event\n%s", env.E.Serialize(),
|
||||||
)
|
// )
|
||||||
},
|
// },
|
||||||
)
|
// )
|
||||||
var ownerDelete bool
|
var ownerDelete bool
|
||||||
for _, pk := range l.Admins {
|
for _, pk := range l.Admins {
|
||||||
if utils.FastEqual(pk, env.E.Pubkey) {
|
if utils.FastEqual(pk, env.E.Pubkey) {
|
||||||
@@ -39,15 +39,17 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// process the tags in the delete event
|
// process the tags in the delete event
|
||||||
var err error
|
var deleteErr error
|
||||||
|
var validDeletionFound bool
|
||||||
for _, t := range *env.E.Tags {
|
for _, t := range *env.E.Tags {
|
||||||
// first search for a tags, as these are the simplest to process
|
// first search for a tags, as these are the simplest to process
|
||||||
if utils.FastEqual(t.Key(), []byte("a")) {
|
if utils.FastEqual(t.Key(), []byte("a")) {
|
||||||
at := new(atag.T)
|
at := new(atag.T)
|
||||||
if _, err = at.Unmarshal(t.Value()); chk.E(err) {
|
if _, deleteErr = at.Unmarshal(t.Value()); chk.E(deleteErr) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if ownerDelete || utils.FastEqual(env.E.Pubkey, at.Pubkey) {
|
if ownerDelete || utils.FastEqual(env.E.Pubkey, at.Pubkey) {
|
||||||
|
validDeletionFound = true
|
||||||
// find the event and delete it
|
// find the event and delete it
|
||||||
f := &filter.F{
|
f := &filter.F{
|
||||||
Authors: tag.NewFromBytesSlice(at.Pubkey),
|
Authors: tag.NewFromBytesSlice(at.Pubkey),
|
||||||
@@ -69,13 +71,43 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
|
|||||||
if ev, err = l.FetchEventBySerial(s); chk.E(err) {
|
if ev, err = l.FetchEventBySerial(s); chk.E(err) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !(kind.IsReplaceable(ev.Kind) && len(at.DTag) == 0) {
|
// Only delete events that match the a-tag criteria:
|
||||||
// skip a tags with no dtag if the kind is not
|
// - For parameterized replaceable events: must have matching d-tag
|
||||||
// replaceable.
|
// - For regular replaceable events: should not have d-tag constraint
|
||||||
|
if kind.IsParameterizedReplaceable(ev.Kind) {
|
||||||
|
// For parameterized replaceable, we need a DTag to match
|
||||||
|
if len(at.DTag) == 0 {
|
||||||
|
log.I.F(
|
||||||
|
"HandleDelete: skipping parameterized replaceable event %s - no DTag in a-tag",
|
||||||
|
hex.Enc(ev.ID),
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
} else if !kind.IsReplaceable(ev.Kind) {
|
||||||
|
// For non-replaceable events, a-tags don't apply
|
||||||
|
log.I.F(
|
||||||
|
"HandleDelete: skipping non-replaceable event %s - a-tags only apply to replaceable events",
|
||||||
|
hex.Enc(ev.ID),
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Only delete events that are older than or equal to the delete event timestamp
|
||||||
|
if ev.CreatedAt > env.E.CreatedAt {
|
||||||
|
log.I.F(
|
||||||
|
"HandleDelete: skipping newer event %s (created_at=%d) - delete event timestamp is %d",
|
||||||
|
hex.Enc(ev.ID), ev.CreatedAt, env.E.CreatedAt,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.I.F(
|
||||||
|
"HandleDelete: deleting event %s via a-tag %d:%s:%s (event_time=%d, delete_time=%d)",
|
||||||
|
hex.Enc(ev.ID), at.Kind.K, hex.Enc(at.Pubkey),
|
||||||
|
string(at.DTag), ev.CreatedAt, env.E.CreatedAt,
|
||||||
|
)
|
||||||
if err = l.DeleteEventBySerial(
|
if err = l.DeleteEventBySerial(
|
||||||
l.Ctx, s, ev,
|
l.Ctx(), s, ev,
|
||||||
); chk.E(err) {
|
); chk.E(err) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -87,10 +119,16 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
|
|||||||
// if e tags are found, delete them if the author is signer, or one of
|
// if e tags are found, delete them if the author is signer, or one of
|
||||||
// the owners is signer
|
// the owners is signer
|
||||||
if utils.FastEqual(t.Key(), []byte("e")) {
|
if utils.FastEqual(t.Key(), []byte("e")) {
|
||||||
var dst []byte
|
val := t.Value()
|
||||||
if _, err = hex.DecBytes(dst, t.Value()); chk.E(err) {
|
if len(val) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
var dst []byte
|
||||||
|
if b, e := hex.Dec(string(val)); chk.E(e) {
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
dst = b
|
||||||
|
}
|
||||||
f := &filter.F{
|
f := &filter.F{
|
||||||
Ids: tag.NewFromBytesSlice(dst),
|
Ids: tag.NewFromBytesSlice(dst),
|
||||||
}
|
}
|
||||||
@@ -108,16 +146,26 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// check that the author is the same as the signer of the
|
// check that the author is the same as the signer of the
|
||||||
// delete, for the k tag case the author is the signer of
|
// delete, for the e tag case the author is the signer of
|
||||||
// the event.
|
// the event.
|
||||||
if !utils.FastEqual(env.E.Pubkey, ev.Pubkey) {
|
if !utils.FastEqual(env.E.Pubkey, ev.Pubkey) {
|
||||||
|
log.W.F(
|
||||||
|
"HandleDelete: attempted deletion of event %s by different user - delete pubkey=%s, event pubkey=%s",
|
||||||
|
hex.Enc(ev.ID), hex.Enc(env.E.Pubkey),
|
||||||
|
hex.Enc(ev.Pubkey),
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
validDeletionFound = true
|
||||||
// exclude delete events
|
// exclude delete events
|
||||||
if ev.Kind == kind.EventDeletion.K {
|
if ev.Kind == kind.EventDeletion.K {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err = l.DeleteEventBySerial(l.Ctx, s, ev); chk.E(err) {
|
log.I.F(
|
||||||
|
"HandleDelete: deleting event %s by authorized user %s",
|
||||||
|
hex.Enc(ev.ID), hex.Enc(env.E.Pubkey),
|
||||||
|
)
|
||||||
|
if err = l.DeleteEventBySerial(l.Ctx(), s, ev); chk.E(err) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -164,5 +212,11 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) {
|
|||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If no valid deletions were found, return an error
|
||||||
|
if !validDeletionFound {
|
||||||
|
return fmt.Errorf("blocked: cannot delete events that belong to other users")
|
||||||
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,8 +1,10 @@
|
|||||||
package app
|
package app
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
acl "acl.orly"
|
acl "acl.orly"
|
||||||
"encoders.orly/envelopes/authenvelope"
|
"encoders.orly/envelopes/authenvelope"
|
||||||
@@ -62,7 +64,7 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// check permissions of user
|
// check permissions of user
|
||||||
accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load())
|
accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load(), l.remote)
|
||||||
switch accessLevel {
|
switch accessLevel {
|
||||||
case "none":
|
case "none":
|
||||||
log.D.F(
|
log.D.F(
|
||||||
@@ -99,11 +101,21 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
|
|||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
// user has write access or better, continue
|
// user has write access or better, continue
|
||||||
log.D.F("user has %s access", accessLevel)
|
// log.D.F("user has %s access", accessLevel)
|
||||||
}
|
}
|
||||||
// if the event is a delete, process the delete
|
// if the event is a delete, process the delete
|
||||||
if env.E.Kind == kind.EventDeletion.K {
|
if env.E.Kind == kind.EventDeletion.K {
|
||||||
l.HandleDelete(env)
|
if err = l.HandleDelete(env); err != nil {
|
||||||
|
if strings.HasPrefix(err.Error(), "blocked:") {
|
||||||
|
errStr := err.Error()[len("blocked: "):len(err.Error())]
|
||||||
|
if err = Ok.Error(
|
||||||
|
l, env, errStr,
|
||||||
|
); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// check if the event was deleted
|
// check if the event was deleted
|
||||||
if err = l.CheckForDeleted(env.E, l.Admins); err != nil {
|
if err = l.CheckForDeleted(env.E, l.Admins); err != nil {
|
||||||
@@ -117,21 +129,49 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// store the event
|
// store the event - use a separate context to prevent cancellation issues
|
||||||
log.I.F("saving event %0x, %s", env.E.ID, env.E.Serialize())
|
saveCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
if _, _, err = l.SaveEvent(l.Ctx, env.E); chk.E(err) {
|
defer cancel()
|
||||||
|
// log.I.F("saving event %0x, %s", env.E.ID, env.E.Serialize())
|
||||||
|
if _, _, err = l.SaveEvent(saveCtx, env.E); err != nil {
|
||||||
|
if strings.HasPrefix(err.Error(), "blocked:") {
|
||||||
|
errStr := err.Error()[len("blocked: "):len(err.Error())]
|
||||||
|
if err = Ok.Error(
|
||||||
|
l, env, errStr,
|
||||||
|
); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
chk.E(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// if a follow list was saved, reconfigure ACLs now that it is persisted
|
|
||||||
if env.E.Kind == kind.FollowList.K {
|
|
||||||
if err = acl.Registry.Configure(); chk.E(err) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
l.publishers.Deliver(env.E)
|
|
||||||
// Send a success response storing
|
// Send a success response storing
|
||||||
if err = Ok.Ok(l, env, ""); chk.E(err) {
|
if err = Ok.Ok(l, env, ""); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// Deliver the event to subscribers immediately after sending OK response
|
||||||
|
l.publishers.Deliver(env.E)
|
||||||
log.D.F("saved event %0x", env.E.ID)
|
log.D.F("saved event %0x", env.E.ID)
|
||||||
|
var isNewFromAdmin bool
|
||||||
|
for _, admin := range l.Admins {
|
||||||
|
if utils.FastEqual(admin, env.E.Pubkey) {
|
||||||
|
isNewFromAdmin = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if isNewFromAdmin {
|
||||||
|
log.I.F("new event from admin %0x", env.E.Pubkey)
|
||||||
|
// if a follow list was saved, reconfigure ACLs now that it is persisted
|
||||||
|
if env.E.Kind == kind.FollowList.K ||
|
||||||
|
env.E.Kind == kind.RelayListMetadata.K {
|
||||||
|
// Run ACL reconfiguration asynchronously to prevent blocking websocket operations
|
||||||
|
go func() {
|
||||||
|
if err := acl.Registry.Configure(); chk.E(err) {
|
||||||
|
log.E.F("failed to reconfigure ACL: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,8 +1,6 @@
|
|||||||
package app
|
package app
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"encoders.orly/envelopes"
|
"encoders.orly/envelopes"
|
||||||
"encoders.orly/envelopes/authenvelope"
|
"encoders.orly/envelopes/authenvelope"
|
||||||
"encoders.orly/envelopes/closeenvelope"
|
"encoders.orly/envelopes/closeenvelope"
|
||||||
@@ -15,42 +13,36 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (l *Listener) HandleMessage(msg []byte, remote string) {
|
func (l *Listener) HandleMessage(msg []byte, remote string) {
|
||||||
log.D.C(
|
log.D.F("%s received message:\n%s", remote, msg)
|
||||||
func() string {
|
|
||||||
return fmt.Sprintf(
|
|
||||||
"%s received message:\n%s", remote, msg,
|
|
||||||
)
|
|
||||||
},
|
|
||||||
)
|
|
||||||
var err error
|
var err error
|
||||||
var t string
|
var t string
|
||||||
var rem []byte
|
var rem []byte
|
||||||
if t, rem, err = envelopes.Identify(msg); !chk.E(err) {
|
if t, rem, err = envelopes.Identify(msg); !chk.E(err) {
|
||||||
switch t {
|
switch t {
|
||||||
case eventenvelope.L:
|
case eventenvelope.L:
|
||||||
log.D.F("eventenvelope: %s", rem)
|
// log.D.F("eventenvelope: %s %s", remote, rem)
|
||||||
err = l.HandleEvent(rem)
|
err = l.HandleEvent(rem)
|
||||||
case reqenvelope.L:
|
case reqenvelope.L:
|
||||||
log.D.F("reqenvelope: %s", rem)
|
// log.D.F("reqenvelope: %s %s", remote, rem)
|
||||||
err = l.HandleReq(rem)
|
err = l.HandleReq(rem)
|
||||||
case closeenvelope.L:
|
case closeenvelope.L:
|
||||||
log.D.F("closeenvelope: %s", rem)
|
// log.D.F("closeenvelope: %s %s", remote, rem)
|
||||||
err = l.HandleClose(rem)
|
err = l.HandleClose(rem)
|
||||||
case authenvelope.L:
|
case authenvelope.L:
|
||||||
log.D.F("authenvelope: %s", rem)
|
// log.D.F("authenvelope: %s %s", remote, rem)
|
||||||
err = l.HandleAuth(rem)
|
err = l.HandleAuth(rem)
|
||||||
default:
|
default:
|
||||||
err = errorf.E("unknown envelope type %s\n%s", t, rem)
|
err = errorf.E("unknown envelope type %s\n%s", t, rem)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.D.C(
|
// log.D.C(
|
||||||
func() string {
|
// func() string {
|
||||||
return fmt.Sprintf(
|
// return fmt.Sprintf(
|
||||||
"notice->%s %s", remote, err,
|
// "notice->%s %s", remote, err,
|
||||||
)
|
// )
|
||||||
},
|
// },
|
||||||
)
|
// )
|
||||||
if err = noticeenvelope.NewFrom(err.Error()).Write(l); chk.E(err) {
|
if err = noticeenvelope.NewFrom(err.Error()).Write(l); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,32 +33,32 @@ func (s *Server) HandleRelayInfo(w http.ResponseWriter, r *http.Request) {
|
|||||||
relayinfo.BasicProtocol,
|
relayinfo.BasicProtocol,
|
||||||
// relayinfo.Authentication,
|
// relayinfo.Authentication,
|
||||||
// relayinfo.EncryptedDirectMessage,
|
// relayinfo.EncryptedDirectMessage,
|
||||||
// relayinfo.EventDeletion,
|
relayinfo.EventDeletion,
|
||||||
relayinfo.RelayInformationDocument,
|
relayinfo.RelayInformationDocument,
|
||||||
// relayinfo.GenericTagQueries,
|
// relayinfo.GenericTagQueries,
|
||||||
// relayinfo.NostrMarketplace,
|
// relayinfo.NostrMarketplace,
|
||||||
// relayinfo.EventTreatment,
|
relayinfo.EventTreatment,
|
||||||
// relayinfo.CommandResults,
|
// relayinfo.CommandResults,
|
||||||
// relayinfo.ParameterizedReplaceableEvents,
|
relayinfo.ParameterizedReplaceableEvents,
|
||||||
// relayinfo.ExpirationTimestamp,
|
// relayinfo.ExpirationTimestamp,
|
||||||
// relayinfo.ProtectedEvents,
|
relayinfo.ProtectedEvents,
|
||||||
// relayinfo.RelayListMetadata,
|
relayinfo.RelayListMetadata,
|
||||||
)
|
)
|
||||||
if s.Config.ACLMode != "none" {
|
if s.Config.ACLMode != "none" {
|
||||||
supportedNIPs = relayinfo.GetList(
|
supportedNIPs = relayinfo.GetList(
|
||||||
relayinfo.BasicProtocol,
|
relayinfo.BasicProtocol,
|
||||||
relayinfo.Authentication,
|
relayinfo.Authentication,
|
||||||
// relayinfo.EncryptedDirectMessage,
|
// relayinfo.EncryptedDirectMessage,
|
||||||
// relayinfo.EventDeletion,
|
relayinfo.EventDeletion,
|
||||||
relayinfo.RelayInformationDocument,
|
relayinfo.RelayInformationDocument,
|
||||||
// relayinfo.GenericTagQueries,
|
// relayinfo.GenericTagQueries,
|
||||||
// relayinfo.NostrMarketplace,
|
// relayinfo.NostrMarketplace,
|
||||||
// relayinfo.EventTreatment,
|
relayinfo.EventTreatment,
|
||||||
// relayinfo.CommandResults,
|
// relayinfo.CommandResults,
|
||||||
// relayinfo.ParameterizedReplaceableEvents,
|
// relayinfo.ParameterizedReplaceableEvents,
|
||||||
// relayinfo.ExpirationTimestamp,
|
// relayinfo.ExpirationTimestamp,
|
||||||
// relayinfo.ProtectedEvents,
|
relayinfo.ProtectedEvents,
|
||||||
// relayinfo.RelayListMetadata,
|
relayinfo.RelayListMetadata,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
sort.Sort(supportedNIPs)
|
sort.Sort(supportedNIPs)
|
||||||
|
|||||||
@@ -1,7 +1,10 @@
|
|||||||
package app
|
package app
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
acl "acl.orly"
|
acl "acl.orly"
|
||||||
"encoders.orly/envelopes/authenvelope"
|
"encoders.orly/envelopes/authenvelope"
|
||||||
@@ -24,16 +27,15 @@ import (
|
|||||||
"utils.orly/pointers"
|
"utils.orly/pointers"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (l *Listener) HandleReq(msg []byte) (
|
func (l *Listener) HandleReq(msg []byte) (err error) {
|
||||||
err error,
|
log.T.F("HandleReq: START processing from %s\n%s\n", l.remote, msg)
|
||||||
) {
|
|
||||||
var rem []byte
|
var rem []byte
|
||||||
env := reqenvelope.New()
|
env := reqenvelope.New()
|
||||||
if rem, err = env.Unmarshal(msg); chk.E(err) {
|
if rem, err = env.Unmarshal(msg); chk.E(err) {
|
||||||
return normalize.Error.Errorf(err.Error())
|
return normalize.Error.Errorf(err.Error())
|
||||||
}
|
}
|
||||||
if len(rem) > 0 {
|
if len(rem) > 0 {
|
||||||
log.I.F("extra '%s'", rem)
|
log.I.F("REQ extra bytes: '%s'", rem)
|
||||||
}
|
}
|
||||||
// send a challenge to the client to auth if an ACL is active
|
// send a challenge to the client to auth if an ACL is active
|
||||||
if acl.Registry.Active.Load() != "none" {
|
if acl.Registry.Active.Load() != "none" {
|
||||||
@@ -43,7 +45,7 @@ func (l *Listener) HandleReq(msg []byte) (
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// check permissions of user
|
// check permissions of user
|
||||||
accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load())
|
accessLevel := acl.Registry.GetAccessLevel(l.authedPubkey.Load(), l.remote)
|
||||||
switch accessLevel {
|
switch accessLevel {
|
||||||
case "none":
|
case "none":
|
||||||
if err = okenvelope.NewFrom(
|
if err = okenvelope.NewFrom(
|
||||||
@@ -59,17 +61,71 @@ func (l *Listener) HandleReq(msg []byte) (
|
|||||||
}
|
}
|
||||||
var events event.S
|
var events event.S
|
||||||
for _, f := range *env.Filters {
|
for _, f := range *env.Filters {
|
||||||
|
idsLen := 0
|
||||||
|
kindsLen := 0
|
||||||
|
authorsLen := 0
|
||||||
|
tagsLen := 0
|
||||||
|
if f != nil {
|
||||||
|
if f.Ids != nil {
|
||||||
|
idsLen = f.Ids.Len()
|
||||||
|
}
|
||||||
|
if f.Kinds != nil {
|
||||||
|
kindsLen = f.Kinds.Len()
|
||||||
|
}
|
||||||
|
if f.Authors != nil {
|
||||||
|
authorsLen = f.Authors.Len()
|
||||||
|
}
|
||||||
|
if f.Tags != nil {
|
||||||
|
tagsLen = f.Tags.Len()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.T.F(
|
||||||
|
"REQ %s: filter summary ids=%d kinds=%d authors=%d tags=%d",
|
||||||
|
env.Subscription, idsLen, kindsLen, authorsLen, tagsLen,
|
||||||
|
)
|
||||||
|
if f != nil && f.Authors != nil && f.Authors.Len() > 0 {
|
||||||
|
var authors []string
|
||||||
|
for _, a := range f.Authors.T {
|
||||||
|
authors = append(authors, hex.Enc(a))
|
||||||
|
}
|
||||||
|
log.T.F("REQ %s: authors=%v", env.Subscription, authors)
|
||||||
|
}
|
||||||
|
if f != nil && f.Kinds != nil && f.Kinds.Len() > 0 {
|
||||||
|
log.T.F("REQ %s: kinds=%v", env.Subscription, f.Kinds.ToUint16())
|
||||||
|
}
|
||||||
|
if f != nil && f.Ids != nil && f.Ids.Len() > 0 {
|
||||||
|
var ids []string
|
||||||
|
for _, id := range f.Ids.T {
|
||||||
|
ids = append(ids, hex.Enc(id))
|
||||||
|
}
|
||||||
|
var lim any
|
||||||
|
if pointers.Present(f.Limit) {
|
||||||
|
lim = *f.Limit
|
||||||
|
} else {
|
||||||
|
lim = nil
|
||||||
|
}
|
||||||
|
log.T.F(
|
||||||
|
"REQ %s: ids filter count=%d ids=%v limit=%v", env.Subscription,
|
||||||
|
f.Ids.Len(), ids, lim,
|
||||||
|
)
|
||||||
|
}
|
||||||
if pointers.Present(f.Limit) {
|
if pointers.Present(f.Limit) {
|
||||||
if *f.Limit == 0 {
|
if *f.Limit == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if events, err = l.QueryEvents(l.Ctx, f); chk.E(err) {
|
// Use a separate context for QueryEvents to prevent cancellation issues
|
||||||
if errors.Is(err, badger.ErrDBClosed) {
|
queryCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
return
|
defer cancel()
|
||||||
}
|
log.T.F("HandleReq: About to QueryEvents for %s, main context done: %v", l.remote, l.ctx.Err() != nil)
|
||||||
err = nil
|
if events, err = l.QueryEvents(queryCtx, f); chk.E(err) {
|
||||||
}
|
if errors.Is(err, badger.ErrDBClosed) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.T.F("HandleReq: QueryEvents error for %s: %v", l.remote, err)
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
log.T.F("HandleReq: QueryEvents completed for %s, found %d events", l.remote, len(events))
|
||||||
}
|
}
|
||||||
var tmp event.S
|
var tmp event.S
|
||||||
privCheck:
|
privCheck:
|
||||||
@@ -115,8 +171,15 @@ privCheck:
|
|||||||
events = tmp
|
events = tmp
|
||||||
seen := make(map[string]struct{})
|
seen := make(map[string]struct{})
|
||||||
for _, ev := range events {
|
for _, ev := range events {
|
||||||
// track the IDs we've sent
|
log.T.F(
|
||||||
seen[string(ev.ID)] = struct{}{}
|
"REQ %s: sending EVENT id=%s kind=%d", env.Subscription,
|
||||||
|
hex.Enc(ev.ID), ev.Kind,
|
||||||
|
)
|
||||||
|
log.T.C(
|
||||||
|
func() string {
|
||||||
|
return fmt.Sprintf("event:\n%s\n", ev.Serialize())
|
||||||
|
},
|
||||||
|
)
|
||||||
var res *eventenvelope.Result
|
var res *eventenvelope.Result
|
||||||
if res, err = eventenvelope.NewResultWith(
|
if res, err = eventenvelope.NewResultWith(
|
||||||
env.Subscription, ev,
|
env.Subscription, ev,
|
||||||
@@ -126,6 +189,8 @@ privCheck:
|
|||||||
if err = res.Write(l); chk.E(err) {
|
if err = res.Write(l); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// track the IDs we've sent (use hex encoding for stable key)
|
||||||
|
seen[hex.Enc(ev.ID)] = struct{}{}
|
||||||
}
|
}
|
||||||
// write the EOSE to signal to the client that all events found have been
|
// write the EOSE to signal to the client that all events found have been
|
||||||
// sent.
|
// sent.
|
||||||
@@ -137,6 +202,10 @@ privCheck:
|
|||||||
// if the query was for just Ids, we know there can't be any more results,
|
// if the query was for just Ids, we know there can't be any more results,
|
||||||
// so cancel the subscription.
|
// so cancel the subscription.
|
||||||
cancel := true
|
cancel := true
|
||||||
|
log.T.F(
|
||||||
|
"REQ %s: computing cancel/subscription; events_sent=%d",
|
||||||
|
env.Subscription, len(events),
|
||||||
|
)
|
||||||
var subbedFilters filter.S
|
var subbedFilters filter.S
|
||||||
for _, f := range *env.Filters {
|
for _, f := range *env.Filters {
|
||||||
if f.Ids.Len() < 1 {
|
if f.Ids.Len() < 1 {
|
||||||
@@ -145,12 +214,16 @@ privCheck:
|
|||||||
} else {
|
} else {
|
||||||
// remove the IDs that we already sent
|
// remove the IDs that we already sent
|
||||||
var notFounds [][]byte
|
var notFounds [][]byte
|
||||||
for _, ev := range events {
|
for _, id := range f.Ids.T {
|
||||||
if _, ok := seen[string(ev.ID)]; ok {
|
if _, ok := seen[hex.Enc(id)]; ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
notFounds = append(notFounds, ev.ID)
|
notFounds = append(notFounds, id)
|
||||||
}
|
}
|
||||||
|
log.T.F(
|
||||||
|
"REQ %s: ids outstanding=%d of %d", env.Subscription,
|
||||||
|
len(notFounds), f.Ids.Len(),
|
||||||
|
)
|
||||||
// if all were found, don't add to subbedFilters
|
// if all were found, don't add to subbedFilters
|
||||||
if len(notFounds) == 0 {
|
if len(notFounds) == 0 {
|
||||||
continue
|
continue
|
||||||
@@ -172,11 +245,12 @@ privCheck:
|
|||||||
if !cancel {
|
if !cancel {
|
||||||
l.publishers.Receive(
|
l.publishers.Receive(
|
||||||
&W{
|
&W{
|
||||||
Conn: l.conn,
|
Conn: l.conn,
|
||||||
remote: l.remote,
|
remote: l.remote,
|
||||||
Id: string(env.Subscription),
|
Id: string(env.Subscription),
|
||||||
Receiver: receiver,
|
Receiver: receiver,
|
||||||
Filters: env.Filters,
|
Filters: env.Filters,
|
||||||
|
AuthedPubkey: l.authedPubkey.Load(),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
@@ -186,5 +260,6 @@ privCheck:
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
log.T.F("HandleReq: COMPLETED processing from %s", l.remote)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"encoders.orly/envelopes/authenvelope"
|
||||||
"encoders.orly/hex"
|
"encoders.orly/hex"
|
||||||
"github.com/coder/websocket"
|
"github.com/coder/websocket"
|
||||||
"lol.mleku.dev/chk"
|
"lol.mleku.dev/chk"
|
||||||
@@ -18,6 +19,8 @@ const (
|
|||||||
DefaultWriteWait = 10 * time.Second
|
DefaultWriteWait = 10 * time.Second
|
||||||
DefaultPongWait = 60 * time.Second
|
DefaultPongWait = 60 * time.Second
|
||||||
DefaultPingWait = DefaultPongWait / 2
|
DefaultPingWait = DefaultPongWait / 2
|
||||||
|
DefaultReadTimeout = 3 * time.Second // Read timeout to detect stalled connections
|
||||||
|
DefaultWriteTimeout = 3 * time.Second
|
||||||
DefaultMaxMessageSize = 1 * units.Mb
|
DefaultMaxMessageSize = 1 * units.Mb
|
||||||
|
|
||||||
// CloseMessage denotes a close control message. The optional message
|
// CloseMessage denotes a close control message. The optional message
|
||||||
@@ -70,10 +73,18 @@ whitelist:
|
|||||||
chal := make([]byte, 32)
|
chal := make([]byte, 32)
|
||||||
rand.Read(chal)
|
rand.Read(chal)
|
||||||
listener.challenge.Store([]byte(hex.Enc(chal)))
|
listener.challenge.Store([]byte(hex.Enc(chal)))
|
||||||
|
// If admins are configured, immediately prompt client to AUTH (NIP-42)
|
||||||
|
if len(s.Config.Admins) > 0 {
|
||||||
|
// log.D.F("sending initial AUTH challenge to %s", remote)
|
||||||
|
if err = authenvelope.NewChallengeWith(listener.challenge.Load()).
|
||||||
|
Write(listener); chk.E(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
ticker := time.NewTicker(DefaultPingWait)
|
ticker := time.NewTicker(DefaultPingWait)
|
||||||
go s.Pinger(ctx, conn, ticker, cancel)
|
go s.Pinger(ctx, conn, ticker, cancel)
|
||||||
defer func() {
|
defer func() {
|
||||||
log.D.F("closing websocket connection from %s", remote)
|
// log.D.F("closing websocket connection from %s", remote)
|
||||||
cancel()
|
cancel()
|
||||||
ticker.Stop()
|
ticker.Stop()
|
||||||
listener.publishers.Receive(&W{Cancel: true})
|
listener.publishers.Receive(&W{Cancel: true})
|
||||||
@@ -87,12 +98,33 @@ whitelist:
|
|||||||
var typ websocket.MessageType
|
var typ websocket.MessageType
|
||||||
var msg []byte
|
var msg []byte
|
||||||
log.T.F("waiting for message from %s", remote)
|
log.T.F("waiting for message from %s", remote)
|
||||||
if typ, msg, err = conn.Read(ctx); chk.E(err) {
|
|
||||||
|
// Create a read context with timeout to prevent indefinite blocking
|
||||||
|
readCtx, readCancel := context.WithTimeout(ctx, DefaultReadTimeout)
|
||||||
|
typ, msg, err = conn.Read(readCtx)
|
||||||
|
readCancel()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
if strings.Contains(
|
if strings.Contains(
|
||||||
err.Error(), "use of closed network connection",
|
err.Error(), "use of closed network connection",
|
||||||
) {
|
) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// Handle timeout errors - occurs when client becomes unresponsive
|
||||||
|
if strings.Contains(err.Error(), "context deadline exceeded") {
|
||||||
|
log.T.F(
|
||||||
|
"connection from %s timed out after %v", remote,
|
||||||
|
DefaultReadTimeout,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Handle EOF errors gracefully - these occur when client closes connection
|
||||||
|
// or sends incomplete/malformed WebSocket frames
|
||||||
|
if strings.Contains(err.Error(), "EOF") ||
|
||||||
|
strings.Contains(err.Error(), "failed to read frame header") {
|
||||||
|
log.T.F("connection from %s closed: %v", remote, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
status := websocket.CloseStatus(err)
|
status := websocket.CloseStatus(err)
|
||||||
switch status {
|
switch status {
|
||||||
case websocket.StatusNormalClosure,
|
case websocket.StatusNormalClosure,
|
||||||
@@ -100,17 +132,27 @@ whitelist:
|
|||||||
websocket.StatusNoStatusRcvd,
|
websocket.StatusNoStatusRcvd,
|
||||||
websocket.StatusAbnormalClosure,
|
websocket.StatusAbnormalClosure,
|
||||||
websocket.StatusProtocolError:
|
websocket.StatusProtocolError:
|
||||||
|
log.T.F(
|
||||||
|
"connection from %s closed with status: %v", remote, status,
|
||||||
|
)
|
||||||
default:
|
default:
|
||||||
log.E.F("unexpected close error from %s: %v", remote, err)
|
log.E.F("unexpected close error from %s: %v", remote, err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if typ == PingMessage {
|
if typ == PingMessage {
|
||||||
if err = conn.Write(ctx, PongMessage, msg); chk.E(err) {
|
// Create a write context with timeout for pong response
|
||||||
|
writeCtx, writeCancel := context.WithTimeout(
|
||||||
|
ctx, DefaultWriteTimeout,
|
||||||
|
)
|
||||||
|
if err = conn.Write(writeCtx, PongMessage, msg); chk.E(err) {
|
||||||
|
writeCancel()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
writeCancel()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
log.T.F("received message from %s: %s", remote, string(msg))
|
||||||
go listener.HandleMessage(msg, remote)
|
go listener.HandleMessage(msg, remote)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -127,9 +169,13 @@ func (s *Server) Pinger(
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if err = conn.Ping(ctx); chk.E(err) {
|
// Create a write context with timeout for ping operation
|
||||||
|
pingCtx, pingCancel := context.WithTimeout(ctx, DefaultWriteTimeout)
|
||||||
|
if err = conn.Ping(pingCtx); chk.E(err) {
|
||||||
|
pingCancel()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
pingCancel()
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,8 +19,21 @@ type Listener struct {
|
|||||||
authedPubkey atomic.Bytes
|
authedPubkey atomic.Bytes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ctx returns the listener's context, but creates a new context for each operation
|
||||||
|
// to prevent cancellation from affecting subsequent operations
|
||||||
|
func (l *Listener) Ctx() context.Context {
|
||||||
|
return l.ctx
|
||||||
|
}
|
||||||
|
|
||||||
func (l *Listener) Write(p []byte) (n int, err error) {
|
func (l *Listener) Write(p []byte) (n int, err error) {
|
||||||
if err = l.conn.Write(l.ctx, websocket.MessageText, p); chk.E(err) {
|
// Use a separate context with timeout for writes to prevent race conditions
|
||||||
|
// where the main connection context gets cancelled while writing events
|
||||||
|
writeCtx, cancel := context.WithTimeout(
|
||||||
|
context.Background(), DefaultWriteTimeout,
|
||||||
|
)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err = l.conn.Write(writeCtx, websocket.MessageText, p); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
n = len(p)
|
n = len(p)
|
||||||
|
|||||||
@@ -28,6 +28,9 @@ func Run(
|
|||||||
var err error
|
var err error
|
||||||
var adminKeys [][]byte
|
var adminKeys [][]byte
|
||||||
for _, admin := range cfg.Admins {
|
for _, admin := range cfg.Admins {
|
||||||
|
if len(admin) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
var pk []byte
|
var pk []byte
|
||||||
if pk, err = bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(err) {
|
if pk, err = bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(err) {
|
||||||
continue
|
continue
|
||||||
|
|||||||
145
app/publisher.go
145
app/publisher.go
@@ -8,17 +8,21 @@ import (
|
|||||||
"encoders.orly/envelopes/eventenvelope"
|
"encoders.orly/envelopes/eventenvelope"
|
||||||
"encoders.orly/event"
|
"encoders.orly/event"
|
||||||
"encoders.orly/filter"
|
"encoders.orly/filter"
|
||||||
|
"encoders.orly/hex"
|
||||||
|
"encoders.orly/kind"
|
||||||
"github.com/coder/websocket"
|
"github.com/coder/websocket"
|
||||||
"interfaces.orly/publisher"
|
"interfaces.orly/publisher"
|
||||||
"interfaces.orly/typer"
|
"interfaces.orly/typer"
|
||||||
"lol.mleku.dev/chk"
|
"lol.mleku.dev/chk"
|
||||||
"lol.mleku.dev/log"
|
"lol.mleku.dev/log"
|
||||||
|
utils "utils.orly"
|
||||||
)
|
)
|
||||||
|
|
||||||
const Type = "socketapi"
|
const Type = "socketapi"
|
||||||
|
|
||||||
type Subscription struct {
|
type Subscription struct {
|
||||||
remote string
|
remote string
|
||||||
|
AuthedPubkey []byte
|
||||||
*filter.S
|
*filter.S
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -46,6 +50,9 @@ type W struct {
|
|||||||
// associated with this WebSocket connection. It is used to determine which
|
// associated with this WebSocket connection. It is used to determine which
|
||||||
// notifications or data should be received by the subscriber.
|
// notifications or data should be received by the subscriber.
|
||||||
Filters *filter.S
|
Filters *filter.S
|
||||||
|
|
||||||
|
// AuthedPubkey is the authenticated pubkey associated with the listener (if any).
|
||||||
|
AuthedPubkey []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *W) Type() (typeName string) { return Type }
|
func (w *W) Type() (typeName string) { return Type }
|
||||||
@@ -56,7 +63,7 @@ func (w *W) Type() (typeName string) { return Type }
|
|||||||
type P struct {
|
type P struct {
|
||||||
c context.Context
|
c context.Context
|
||||||
// Mx is the mutex for the Map.
|
// Mx is the mutex for the Map.
|
||||||
Mx sync.Mutex
|
Mx sync.RWMutex
|
||||||
// Map is the map of subscribers and subscriptions from the websocket api.
|
// Map is the map of subscribers and subscriptions from the websocket api.
|
||||||
Map
|
Map
|
||||||
}
|
}
|
||||||
@@ -112,7 +119,9 @@ func (p *P) Receive(msg typer.T) {
|
|||||||
defer p.Mx.Unlock()
|
defer p.Mx.Unlock()
|
||||||
if subs, ok := p.Map[m.Conn]; !ok {
|
if subs, ok := p.Map[m.Conn]; !ok {
|
||||||
subs = make(map[string]Subscription)
|
subs = make(map[string]Subscription)
|
||||||
subs[m.Id] = Subscription{S: m.Filters, remote: m.remote}
|
subs[m.Id] = Subscription{
|
||||||
|
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
|
||||||
|
}
|
||||||
p.Map[m.Conn] = subs
|
p.Map[m.Conn] = subs
|
||||||
log.D.C(
|
log.D.C(
|
||||||
func() string {
|
func() string {
|
||||||
@@ -124,7 +133,9 @@ func (p *P) Receive(msg typer.T) {
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
subs[m.Id] = Subscription{S: m.Filters, remote: m.remote}
|
subs[m.Id] = Subscription{
|
||||||
|
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
|
||||||
|
}
|
||||||
log.D.C(
|
log.D.C(
|
||||||
func() string {
|
func() string {
|
||||||
return fmt.Sprintf(
|
return fmt.Sprintf(
|
||||||
@@ -150,71 +161,111 @@ func (p *P) Receive(msg typer.T) {
|
|||||||
// for unauthenticated users when events are privileged.
|
// for unauthenticated users when events are privileged.
|
||||||
func (p *P) Deliver(ev *event.E) {
|
func (p *P) Deliver(ev *event.E) {
|
||||||
var err error
|
var err error
|
||||||
p.Mx.Lock()
|
// Snapshot the deliveries under read lock to avoid holding locks during I/O
|
||||||
defer p.Mx.Unlock()
|
p.Mx.RLock()
|
||||||
log.D.C(
|
type delivery struct {
|
||||||
func() string {
|
w *websocket.Conn
|
||||||
return fmt.Sprintf(
|
id string
|
||||||
"delivering event %0x to websocket subscribers %d", ev.ID,
|
sub Subscription
|
||||||
len(p.Map),
|
}
|
||||||
)
|
var deliveries []delivery
|
||||||
},
|
|
||||||
)
|
|
||||||
for w, subs := range p.Map {
|
for w, subs := range p.Map {
|
||||||
for id, subscriber := range subs {
|
for id, subscriber := range subs {
|
||||||
if !subscriber.Match(ev) {
|
if subscriber.Match(ev) {
|
||||||
continue
|
deliveries = append(
|
||||||
|
deliveries, delivery{w: w, id: id, sub: subscriber},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
// if p.Server.AuthRequired() {
|
|
||||||
// if !auth.CheckPrivilege(w.AuthedPubkey(), ev) {
|
|
||||||
// continue
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
var res *eventenvelope.Result
|
|
||||||
if res, err = eventenvelope.NewResultWith(id, ev); chk.E(err) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if err = w.Write(
|
|
||||||
p.c, websocket.MessageText, res.Marshal(nil),
|
|
||||||
); chk.E(err) {
|
|
||||||
p.removeSubscriber(w)
|
|
||||||
if err = w.CloseNow(); chk.E(err) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
log.D.C(
|
|
||||||
func() string {
|
|
||||||
return fmt.Sprintf(
|
|
||||||
"dispatched event %0x to subscription %s, %s",
|
|
||||||
ev.ID, id, subscriber.remote,
|
|
||||||
)
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
p.Mx.RUnlock()
|
||||||
|
if len(deliveries) > 0 {
|
||||||
|
log.D.C(
|
||||||
|
func() string {
|
||||||
|
return fmt.Sprintf(
|
||||||
|
"delivering event %0x to websocket subscribers %d", ev.ID,
|
||||||
|
len(deliveries),
|
||||||
|
)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
for _, d := range deliveries {
|
||||||
|
// If the event is privileged, enforce that the subscriber's authed pubkey matches
|
||||||
|
// either the event pubkey or appears in any 'p' tag of the event.
|
||||||
|
if kind.IsPrivileged(ev.Kind) && len(d.sub.AuthedPubkey) > 0 {
|
||||||
|
pk := d.sub.AuthedPubkey
|
||||||
|
allowed := false
|
||||||
|
// Direct author match
|
||||||
|
if utils.FastEqual(ev.Pubkey, pk) {
|
||||||
|
allowed = true
|
||||||
|
} else if ev.Tags != nil {
|
||||||
|
for _, pTag := range ev.Tags.GetAll([]byte("p")) {
|
||||||
|
// pTag.Value() returns []byte hex string; decode to bytes
|
||||||
|
dec, derr := hex.Dec(string(pTag.Value()))
|
||||||
|
if derr != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if utils.FastEqual(dec, pk) {
|
||||||
|
allowed = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !allowed {
|
||||||
|
// Skip delivery for this subscriber
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var res *eventenvelope.Result
|
||||||
|
if res, err = eventenvelope.NewResultWith(d.id, ev); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Use a separate context with timeout for writes to prevent race conditions
|
||||||
|
// where the publisher context gets cancelled while writing events
|
||||||
|
writeCtx, cancel := context.WithTimeout(
|
||||||
|
context.Background(), DefaultWriteTimeout,
|
||||||
|
)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err = d.w.Write(
|
||||||
|
writeCtx, websocket.MessageText, res.Marshal(nil),
|
||||||
|
); chk.E(err) {
|
||||||
|
// On error, remove the subscriber connection safely
|
||||||
|
p.removeSubscriber(d.w)
|
||||||
|
_ = d.w.CloseNow()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.D.C(
|
||||||
|
func() string {
|
||||||
|
return fmt.Sprintf(
|
||||||
|
"dispatched event %0x to subscription %s, %s",
|
||||||
|
ev.ID, d.id, d.sub.remote,
|
||||||
|
)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// removeSubscriberId removes a specific subscription from a subscriber
|
// removeSubscriberId removes a specific subscription from a subscriber
|
||||||
// websocket.
|
// websocket.
|
||||||
func (p *P) removeSubscriberId(ws *websocket.Conn, id string) {
|
func (p *P) removeSubscriberId(ws *websocket.Conn, id string) {
|
||||||
p.Mx.Lock()
|
p.Mx.Lock()
|
||||||
|
defer p.Mx.Unlock()
|
||||||
var subs map[string]Subscription
|
var subs map[string]Subscription
|
||||||
var ok bool
|
var ok bool
|
||||||
if subs, ok = p.Map[ws]; ok {
|
if subs, ok = p.Map[ws]; ok {
|
||||||
delete(p.Map[ws], id)
|
delete(subs, id)
|
||||||
_ = subs
|
// Check the actual map after deletion, not the original reference
|
||||||
if len(subs) == 0 {
|
if len(p.Map[ws]) == 0 {
|
||||||
delete(p.Map, ws)
|
delete(p.Map, ws)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
p.Mx.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// removeSubscriber removes a websocket from the P collection.
|
// removeSubscriber removes a websocket from the P collection.
|
||||||
func (p *P) removeSubscriber(ws *websocket.Conn) {
|
func (p *P) removeSubscriber(ws *websocket.Conn) {
|
||||||
p.Mx.Lock()
|
p.Mx.Lock()
|
||||||
|
defer p.Mx.Unlock()
|
||||||
clear(p.Map[ws])
|
clear(p.Map[ws])
|
||||||
delete(p.Map, ws)
|
delete(p.Map, ws)
|
||||||
p.Mx.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,14 +2,12 @@ package app
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"database.orly"
|
"database.orly"
|
||||||
"lol.mleku.dev/chk"
|
"lol.mleku.dev/chk"
|
||||||
"lol.mleku.dev/log"
|
|
||||||
"next.orly.dev/app/config"
|
"next.orly.dev/app/config"
|
||||||
"protocol.orly/publish"
|
"protocol.orly/publish"
|
||||||
)
|
)
|
||||||
@@ -25,11 +23,11 @@ type Server struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
log.T.C(
|
// log.T.C(
|
||||||
func() string {
|
// func() string {
|
||||||
return fmt.Sprintf("path %v header %v", r.URL, r.Header)
|
// return fmt.Sprintf("path %v header %v", r.URL, r.Header)
|
||||||
},
|
// },
|
||||||
)
|
// )
|
||||||
if r.Header.Get("Upgrade") == "websocket" {
|
if r.Header.Get("Upgrade") == "websocket" {
|
||||||
s.HandleWebsocket(w, r)
|
s.HandleWebsocket(w, r)
|
||||||
} else if r.Header.Get("Accept") == "application/nostr+json" {
|
} else if r.Header.Get("Accept") == "application/nostr+json" {
|
||||||
|
|||||||
@@ -28,10 +28,10 @@ func (s *S) Configure(cfg ...any) (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *S) GetAccessLevel(pub []byte) (level string) {
|
func (s *S) GetAccessLevel(pub []byte, address string) (level string) {
|
||||||
for _, i := range s.ACL {
|
for _, i := range s.ACL {
|
||||||
if i.Type() == s.Active.Load() {
|
if i.Type() == s.Active.Load() {
|
||||||
level = i.GetAccessLevel(pub)
|
level = i.GetAccessLevel(pub, address)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -45,13 +45,13 @@ func (f *Follows) Configure(cfg ...any) (err error) {
|
|||||||
for _, ca := range cfg {
|
for _, ca := range cfg {
|
||||||
switch c := ca.(type) {
|
switch c := ca.(type) {
|
||||||
case *config.C:
|
case *config.C:
|
||||||
log.D.F("setting ACL config: %v", c)
|
// log.D.F("setting ACL config: %v", c)
|
||||||
f.cfg = c
|
f.cfg = c
|
||||||
case *database.D:
|
case *database.D:
|
||||||
log.D.F("setting ACL database: %s", c.Path())
|
// log.D.F("setting ACL database: %s", c.Path())
|
||||||
f.D = c
|
f.D = c
|
||||||
case context.Context:
|
case context.Context:
|
||||||
log.D.F("setting ACL context: %s", c.Value("id"))
|
// log.D.F("setting ACL context: %s", c.Value("id"))
|
||||||
f.Ctx = c
|
f.Ctx = c
|
||||||
default:
|
default:
|
||||||
err = errorf.E("invalid type: %T", reflect.TypeOf(ca))
|
err = errorf.E("invalid type: %T", reflect.TypeOf(ca))
|
||||||
@@ -64,13 +64,15 @@ func (f *Follows) Configure(cfg ...any) (err error) {
|
|||||||
// find admin follow lists
|
// find admin follow lists
|
||||||
f.followsMx.Lock()
|
f.followsMx.Lock()
|
||||||
defer f.followsMx.Unlock()
|
defer f.followsMx.Unlock()
|
||||||
log.I.F("finding admins")
|
// log.I.F("finding admins")
|
||||||
f.follows, f.admins = nil, nil
|
f.follows, f.admins = nil, nil
|
||||||
for _, admin := range f.cfg.Admins {
|
for _, admin := range f.cfg.Admins {
|
||||||
log.I.F("%s", admin)
|
// log.I.F("%s", admin)
|
||||||
var adm []byte
|
var adm []byte
|
||||||
if adm, err = bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(err) {
|
if a, e := bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(e) {
|
||||||
continue
|
continue
|
||||||
|
} else {
|
||||||
|
adm = a
|
||||||
}
|
}
|
||||||
log.I.F("admin: %0x", adm)
|
log.I.F("admin: %0x", adm)
|
||||||
f.admins = append(f.admins, adm)
|
f.admins = append(f.admins, adm)
|
||||||
@@ -96,12 +98,14 @@ func (f *Follows) Configure(cfg ...any) (err error) {
|
|||||||
if ev, err = f.D.FetchEventBySerial(s); chk.E(err) {
|
if ev, err = f.D.FetchEventBySerial(s); chk.E(err) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.I.F("admin follow list:\n%s", ev.Serialize())
|
// log.I.F("admin follow list:\n%s", ev.Serialize())
|
||||||
for _, v := range ev.Tags.GetAll([]byte("p")) {
|
for _, v := range ev.Tags.GetAll([]byte("p")) {
|
||||||
log.I.F("adding follow: %s", v.Value())
|
// log.I.F("adding follow: %s", v.Value())
|
||||||
var a []byte
|
var a []byte
|
||||||
if a, err = hex.Dec(string(v.Value())); chk.E(err) {
|
if b, e := hex.Dec(string(v.Value())); chk.E(e) {
|
||||||
continue
|
continue
|
||||||
|
} else {
|
||||||
|
a = b
|
||||||
}
|
}
|
||||||
f.follows = append(f.follows, a)
|
f.follows = append(f.follows, a)
|
||||||
}
|
}
|
||||||
@@ -116,7 +120,7 @@ func (f *Follows) Configure(cfg ...any) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Follows) GetAccessLevel(pub []byte) (level string) {
|
func (f *Follows) GetAccessLevel(pub []byte, address string) (level string) {
|
||||||
if f.cfg == nil {
|
if f.cfg == nil {
|
||||||
return "write"
|
return "write"
|
||||||
}
|
}
|
||||||
@@ -203,7 +207,7 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
|
|||||||
log.W.F("follows syncer: no admin relays found in DB (kind 10002)")
|
log.W.F("follows syncer: no admin relays found in DB (kind 10002)")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.I.F(
|
log.T.F(
|
||||||
"follows syncer: subscribing to %d relays for %d authors", len(urls),
|
"follows syncer: subscribing to %d relays for %d authors", len(urls),
|
||||||
len(authors),
|
len(authors),
|
||||||
)
|
)
|
||||||
@@ -240,13 +244,13 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
*ff = append(*ff, f1)
|
*ff = append(*ff, f1)
|
||||||
req := reqenvelope.NewFrom([]byte("follows-sync"), ff)
|
req := reqenvelope.NewFrom([]byte("follows-sync"), ff)
|
||||||
if err := c.Write(
|
if err = c.Write(
|
||||||
ctx, websocket.MessageText, req.Marshal(nil),
|
ctx, websocket.MessageText, req.Marshal(nil),
|
||||||
); chk.E(err) {
|
); chk.E(err) {
|
||||||
_ = c.Close(websocket.StatusInternalError, "write failed")
|
_ = c.Close(websocket.StatusInternalError, "write failed")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.I.F("sent REQ to %s for follows subscription", u)
|
log.T.F("sent REQ to %s for follows subscription", u)
|
||||||
// read loop
|
// read loop
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@@ -274,11 +278,11 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
|
|||||||
if ok, err := res.Event.Verify(); chk.T(err) || !ok {
|
if ok, err := res.Event.Verify(); chk.T(err) || !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if _, _, err := f.D.SaveEvent(
|
if _, _, err = f.D.SaveEvent(
|
||||||
ctx, res.Event,
|
ctx, res.Event,
|
||||||
); err != nil {
|
); err != nil {
|
||||||
if !strings.HasPrefix(
|
if !strings.HasPrefix(
|
||||||
err.Error(), "event already exists",
|
err.Error(), "blocked:",
|
||||||
) {
|
) {
|
||||||
log.W.F(
|
log.W.F(
|
||||||
"follows syncer: save event failed: %v",
|
"follows syncer: save event failed: %v",
|
||||||
@@ -333,6 +337,7 @@ func (f *Follows) Syncer() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
f.updated <- struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ type None struct{}
|
|||||||
|
|
||||||
func (n None) Configure(cfg ...any) (err error) { return }
|
func (n None) Configure(cfg ...any) (err error) { return }
|
||||||
|
|
||||||
func (n None) GetAccessLevel(pub []byte) (level string) {
|
func (n None) GetAccessLevel(pub []byte, address string) (level string) {
|
||||||
return "write"
|
return "write"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ import (
|
|||||||
type Signer struct {
|
type Signer struct {
|
||||||
SecretKey *secp256k1.SecretKey
|
SecretKey *secp256k1.SecretKey
|
||||||
PublicKey *secp256k1.PublicKey
|
PublicKey *secp256k1.PublicKey
|
||||||
BTCECSec *ec.SecretKey
|
BTCECSec *secp256k1.SecretKey
|
||||||
pkb, skb []byte
|
pkb, skb []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -23,11 +23,11 @@ var _ signer.I = &Signer{}
|
|||||||
|
|
||||||
// Generate creates a new Signer.
|
// Generate creates a new Signer.
|
||||||
func (s *Signer) Generate() (err error) {
|
func (s *Signer) Generate() (err error) {
|
||||||
if s.SecretKey, err = ec.NewSecretKey(); chk.E(err) {
|
if s.SecretKey, err = secp256k1.GenerateSecretKey(); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.skb = s.SecretKey.Serialize()
|
s.skb = s.SecretKey.Serialize()
|
||||||
s.BTCECSec, _ = ec.PrivKeyFromBytes(s.skb)
|
s.BTCECSec = secp256k1.PrivKeyFromBytes(s.skb)
|
||||||
s.PublicKey = s.SecretKey.PubKey()
|
s.PublicKey = s.SecretKey.PubKey()
|
||||||
s.pkb = schnorr.SerializePubKey(s.PublicKey)
|
s.pkb = schnorr.SerializePubKey(s.PublicKey)
|
||||||
return
|
return
|
||||||
@@ -43,7 +43,7 @@ func (s *Signer) InitSec(sec []byte) (err error) {
|
|||||||
s.SecretKey = secp256k1.SecKeyFromBytes(sec)
|
s.SecretKey = secp256k1.SecKeyFromBytes(sec)
|
||||||
s.PublicKey = s.SecretKey.PubKey()
|
s.PublicKey = s.SecretKey.PubKey()
|
||||||
s.pkb = schnorr.SerializePubKey(s.PublicKey)
|
s.pkb = schnorr.SerializePubKey(s.PublicKey)
|
||||||
s.BTCECSec, _ = ec.PrivKeyFromBytes(s.skb)
|
s.BTCECSec = secp256k1.PrivKeyFromBytes(s.skb)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -142,7 +142,7 @@ func (s *Signer) ECDH(pubkeyBytes []byte) (secret []byte, err error) {
|
|||||||
); chk.E(err) {
|
); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
secret = ec.GenerateSharedSecret(s.BTCECSec, pub)
|
secret = secp256k1.GenerateSharedSecret(s.BTCECSec, pub)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -154,7 +154,7 @@ type Keygen struct {
|
|||||||
// Generate a new key pair. If the result is suitable, the embedded Signer can have its contents
|
// Generate a new key pair. If the result is suitable, the embedded Signer can have its contents
|
||||||
// extracted.
|
// extracted.
|
||||||
func (k *Keygen) Generate() (pubBytes []byte, err error) {
|
func (k *Keygen) Generate() (pubBytes []byte, err error) {
|
||||||
if k.Signer.SecretKey, err = ec.NewSecretKey(); chk.E(err) {
|
if k.Signer.SecretKey, err = secp256k1.GenerateSecretKey(); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
k.Signer.PublicKey = k.SecretKey.PubKey()
|
k.Signer.PublicKey = k.SecretKey.PubKey()
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
types2 "database.orly/indexes/types"
|
types2 "database.orly/indexes/types"
|
||||||
"encoders.orly/filter"
|
"encoders.orly/filter"
|
||||||
"lol.mleku.dev/chk"
|
"lol.mleku.dev/chk"
|
||||||
|
"lol.mleku.dev/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Range struct {
|
type Range struct {
|
||||||
@@ -89,12 +90,20 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
buf := new(bytes.Buffer)
|
buf := new(bytes.Buffer)
|
||||||
|
// Create an index prefix without the serial number
|
||||||
idx := indexes.IdEnc(i, nil)
|
idx := indexes.IdEnc(i, nil)
|
||||||
if err = idx.MarshalWrite(buf); chk.E(err) {
|
if err = idx.MarshalWrite(buf); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
b := buf.Bytes()
|
b := buf.Bytes()
|
||||||
r := Range{b, b}
|
// Create range that will match any serial value with this ID prefix
|
||||||
|
end := make([]byte, len(b))
|
||||||
|
copy(end, b)
|
||||||
|
// Fill the end range with 0xff bytes to match all possible serial values
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
end = append(end, 0xff)
|
||||||
|
}
|
||||||
|
r := Range{b, end}
|
||||||
idxs = append(idxs, r)
|
idxs = append(idxs, r)
|
||||||
return
|
return
|
||||||
}(); chk.E(err) {
|
}(); chk.E(err) {
|
||||||
@@ -230,6 +239,7 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
|
|||||||
for _, t := range *f.Tags {
|
for _, t := range *f.Tags {
|
||||||
if t.Len() >= 2 && (len(t.Key()) == 1 || (len(t.Key()) == 2 && t.Key()[0] == '#')) {
|
if t.Len() >= 2 && (len(t.Key()) == 1 || (len(t.Key()) == 2 && t.Key()[0] == '#')) {
|
||||||
var p *types2.PubHash
|
var p *types2.PubHash
|
||||||
|
log.I.S(author)
|
||||||
if p, err = CreatePubHashFromData(author); chk.E(err) {
|
if p, err = CreatePubHashFromData(author); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -306,8 +316,8 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
|
|||||||
for _, author := range f.Authors.T {
|
for _, author := range f.Authors.T {
|
||||||
kind := new(types2.Uint16)
|
kind := new(types2.Uint16)
|
||||||
kind.Set(k)
|
kind.Set(k)
|
||||||
p := new(types2.PubHash)
|
var p *types2.PubHash
|
||||||
if err = p.FromPubkey(author); chk.E(err) {
|
if p, err = CreatePubHashFromData(author); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
start, end := new(bytes.Buffer), new(bytes.Buffer)
|
start, end := new(bytes.Buffer), new(bytes.Buffer)
|
||||||
@@ -351,8 +361,9 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
|
|||||||
// Pubkey pc-
|
// Pubkey pc-
|
||||||
if f.Authors != nil && f.Authors.Len() > 0 {
|
if f.Authors != nil && f.Authors.Len() > 0 {
|
||||||
for _, author := range f.Authors.T {
|
for _, author := range f.Authors.T {
|
||||||
p := new(types2.PubHash)
|
var p *types2.PubHash
|
||||||
if err = p.FromPubkey(author); chk.E(err) {
|
log.I.S(author)
|
||||||
|
if p, err = CreatePubHashFromData(author); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
start, end := new(bytes.Buffer), new(bytes.Buffer)
|
start, end := new(bytes.Buffer), new(bytes.Buffer)
|
||||||
|
|||||||
@@ -5,20 +5,33 @@ import (
|
|||||||
|
|
||||||
"database.orly/indexes/types"
|
"database.orly/indexes/types"
|
||||||
"encoders.orly/filter"
|
"encoders.orly/filter"
|
||||||
|
"encoders.orly/hex"
|
||||||
"encoders.orly/tag"
|
"encoders.orly/tag"
|
||||||
"github.com/dgraph-io/badger/v4"
|
"github.com/dgraph-io/badger/v4"
|
||||||
"lol.mleku.dev/chk"
|
"lol.mleku.dev/chk"
|
||||||
"lol.mleku.dev/errorf"
|
"lol.mleku.dev/errorf"
|
||||||
|
"lol.mleku.dev/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
|
func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
|
||||||
|
log.T.F("GetSerialById: input id=%s", hex.Enc(id))
|
||||||
var idxs []Range
|
var idxs []Range
|
||||||
if idxs, err = GetIndexesFromFilter(&filter.F{Ids: tag.NewFromBytesSlice(id)}); chk.E(err) {
|
if idxs, err = GetIndexesFromFilter(&filter.F{Ids: tag.NewFromBytesSlice(id)}); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for i, idx := range idxs {
|
||||||
|
log.T.F(
|
||||||
|
"GetSerialById: searching range %d: start=%x, end=%x", i, idx.Start,
|
||||||
|
idx.End,
|
||||||
|
)
|
||||||
|
}
|
||||||
if len(idxs) == 0 {
|
if len(idxs) == 0 {
|
||||||
err = errorf.E("no indexes found for id %0x", id)
|
err = errorf.E("no indexes found for id %0x", id)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
idFound := false
|
||||||
if err = d.View(
|
if err = d.View(
|
||||||
func(txn *badger.Txn) (err error) {
|
func(txn *badger.Txn) (err error) {
|
||||||
it := txn.NewIterator(badger.DefaultIteratorOptions)
|
it := txn.NewIterator(badger.DefaultIteratorOptions)
|
||||||
@@ -33,15 +46,24 @@ func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) {
|
|||||||
if err = ser.UnmarshalRead(buf); chk.E(err) {
|
if err = ser.UnmarshalRead(buf); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
idFound = true
|
||||||
} else {
|
} else {
|
||||||
// just don't return what we don't have? others may be
|
// Item not found in database
|
||||||
// found tho.
|
log.T.F(
|
||||||
|
"GetSerialById: ID not found in database: %s", hex.Enc(id),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
},
|
},
|
||||||
); chk.E(err) {
|
); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !idFound {
|
||||||
|
err = errorf.T("id not found in database: %s", hex.Enc(id))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -20,7 +20,15 @@ func (d *D) GetSerialsByRange(idx Range) (
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
defer it.Close()
|
defer it.Close()
|
||||||
for it.Seek(idx.End); it.Valid(); it.Next() {
|
// Start from a position that includes the end boundary (until timestamp)
|
||||||
|
// We create an end boundary that's slightly beyond the actual end to ensure inclusivity
|
||||||
|
endBoundary := make([]byte, len(idx.End))
|
||||||
|
copy(endBoundary, idx.End)
|
||||||
|
// Add 0xff bytes to ensure we capture all events at the exact until timestamp
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
endBoundary = append(endBoundary, 0xff)
|
||||||
|
}
|
||||||
|
for it.Seek(endBoundary); it.Valid(); it.Next() {
|
||||||
item := it.Item()
|
item := it.Item()
|
||||||
var key []byte
|
var key []byte
|
||||||
key = item.Key()
|
key = item.Key()
|
||||||
|
|||||||
@@ -15,10 +15,13 @@ const PubHashLen = 8
|
|||||||
type PubHash struct{ val [PubHashLen]byte }
|
type PubHash struct{ val [PubHashLen]byte }
|
||||||
|
|
||||||
func (ph *PubHash) FromPubkey(pk []byte) (err error) {
|
func (ph *PubHash) FromPubkey(pk []byte) (err error) {
|
||||||
|
if len(pk) == 0 {
|
||||||
|
panic("nil pubkey")
|
||||||
|
}
|
||||||
if len(pk) != schnorr.PubKeyBytesLen {
|
if len(pk) != schnorr.PubKeyBytesLen {
|
||||||
err = errorf.E(
|
err = errorf.E(
|
||||||
"invalid Pubkey length, got %d require %d",
|
"invalid Pubkey length, got %d require %d %0x",
|
||||||
len(pk), schnorr.PubKeyBytesLen,
|
len(pk), schnorr.PubKeyBytesLen, pk,
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"crypto.orly/sha256"
|
"crypto.orly/sha256"
|
||||||
@@ -42,23 +43,71 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
|
|||||||
var expDeletes types.Uint40s
|
var expDeletes types.Uint40s
|
||||||
var expEvs event.S
|
var expEvs event.S
|
||||||
if f.Ids != nil && f.Ids.Len() > 0 {
|
if f.Ids != nil && f.Ids.Len() > 0 {
|
||||||
|
for _, id := range f.Ids.T {
|
||||||
|
log.T.F("QueryEvents: looking for ID=%s", hex.Enc(id))
|
||||||
|
}
|
||||||
|
log.T.F("QueryEvents: ids path, count=%d", f.Ids.Len())
|
||||||
for _, idx := range f.Ids.T {
|
for _, idx := range f.Ids.T {
|
||||||
|
log.T.F("QueryEvents: lookup id=%s", hex.Enc(idx))
|
||||||
// we know there is only Ids in this, so run the ID query and fetch.
|
// we know there is only Ids in this, so run the ID query and fetch.
|
||||||
var ser *types.Uint40
|
var ser *types.Uint40
|
||||||
if ser, err = d.GetSerialById(idx); chk.E(err) {
|
var idErr error
|
||||||
|
if ser, idErr = d.GetSerialById(idx); idErr != nil {
|
||||||
|
// Check if this is a "not found" error which is expected for IDs we don't have
|
||||||
|
if strings.Contains(idErr.Error(), "id not found in database") {
|
||||||
|
log.T.F(
|
||||||
|
"QueryEvents: ID not found in database: %s",
|
||||||
|
hex.Enc(idx),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
// Log unexpected errors but continue processing other IDs
|
||||||
|
log.E.F(
|
||||||
|
"QueryEvents: error looking up id=%s err=%v",
|
||||||
|
hex.Enc(idx), idErr,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Check if the serial is nil, which indicates the ID wasn't found
|
||||||
|
if ser == nil {
|
||||||
|
log.T.F("QueryEvents: Serial is nil for ID: %s", hex.Enc(idx))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// fetch the events
|
// fetch the events
|
||||||
var ev *event.E
|
var ev *event.E
|
||||||
if ev, err = d.FetchEventBySerial(ser); err != nil {
|
if ev, err = d.FetchEventBySerial(ser); err != nil {
|
||||||
|
log.T.F(
|
||||||
|
"QueryEvents: fetch by serial failed for id=%s ser=%v err=%v",
|
||||||
|
hex.Enc(idx), ser, err,
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
log.T.F(
|
||||||
|
"QueryEvents: found id=%s kind=%d created_at=%d",
|
||||||
|
hex.Enc(ev.ID), ev.Kind, ev.CreatedAt,
|
||||||
|
)
|
||||||
// check for an expiration tag and delete after returning the result
|
// check for an expiration tag and delete after returning the result
|
||||||
if CheckExpiration(ev) {
|
if CheckExpiration(ev) {
|
||||||
|
log.T.F(
|
||||||
|
"QueryEvents: id=%s filtered out due to expiration",
|
||||||
|
hex.Enc(ev.ID),
|
||||||
|
)
|
||||||
expDeletes = append(expDeletes, ser)
|
expDeletes = append(expDeletes, ser)
|
||||||
expEvs = append(expEvs, ev)
|
expEvs = append(expEvs, ev)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// skip events that have been deleted by a proper deletion event
|
||||||
|
if derr := d.CheckForDeleted(ev, nil); derr != nil {
|
||||||
|
log.T.F(
|
||||||
|
"QueryEvents: id=%s filtered out due to deletion: %v",
|
||||||
|
hex.Enc(ev.ID), derr,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.T.F(
|
||||||
|
"QueryEvents: id=%s SUCCESSFULLY FOUND, adding to results",
|
||||||
|
hex.Enc(ev.ID),
|
||||||
|
)
|
||||||
evs = append(evs, ev)
|
evs = append(evs, ev)
|
||||||
}
|
}
|
||||||
// sort the events by timestamp
|
// sort the events by timestamp
|
||||||
@@ -68,10 +117,15 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
|
// non-IDs path
|
||||||
var idPkTs []*store.IdPkTs
|
var idPkTs []*store.IdPkTs
|
||||||
|
// if f.Authors != nil && f.Authors.Len() > 0 && f.Kinds != nil && f.Kinds.Len() > 0 {
|
||||||
|
// log.T.F("QueryEvents: authors+kinds path, authors=%d kinds=%d", f.Authors.Len(), f.Kinds.Len())
|
||||||
|
// }
|
||||||
if idPkTs, err = d.QueryForIds(c, f); chk.E(err) {
|
if idPkTs, err = d.QueryForIds(c, f); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// log.T.F("QueryEvents: QueryForIds returned %d candidates", len(idPkTs))
|
||||||
// Create a map to store the latest version of replaceable events
|
// Create a map to store the latest version of replaceable events
|
||||||
replaceableEvents := make(map[string]*event.E)
|
replaceableEvents := make(map[string]*event.E)
|
||||||
// Create a map to store the latest version of parameterized replaceable
|
// Create a map to store the latest version of parameterized replaceable
|
||||||
@@ -83,8 +137,8 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
|
|||||||
// events)
|
// events)
|
||||||
deletionsByKindPubkey := make(map[string]bool)
|
deletionsByKindPubkey := make(map[string]bool)
|
||||||
// Map to track deletion events by kind, pubkey, and d-tag (for
|
// Map to track deletion events by kind, pubkey, and d-tag (for
|
||||||
// parameterized replaceable events)
|
// parameterized replaceable events). We store the newest delete timestamp per d-tag.
|
||||||
deletionsByKindPubkeyDTag := make(map[string]map[string]bool)
|
deletionsByKindPubkeyDTag := make(map[string]map[string]int64)
|
||||||
// Map to track specific event IDs that have been deleted
|
// Map to track specific event IDs that have been deleted
|
||||||
deletedEventIds := make(map[string]bool)
|
deletedEventIds := make(map[string]bool)
|
||||||
// Query for deletion events separately if we have authors in the filter
|
// Query for deletion events separately if we have authors in the filter
|
||||||
@@ -169,11 +223,13 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
|
|||||||
key := hex.Enc(pk) + ":" + strconv.Itoa(int(kk.K))
|
key := hex.Enc(pk) + ":" + strconv.Itoa(int(kk.K))
|
||||||
// Initialize the inner map if it doesn't exist
|
// Initialize the inner map if it doesn't exist
|
||||||
if _, exists := deletionsByKindPubkeyDTag[key]; !exists {
|
if _, exists := deletionsByKindPubkeyDTag[key]; !exists {
|
||||||
deletionsByKindPubkeyDTag[key] = make(map[string]bool)
|
deletionsByKindPubkeyDTag[key] = make(map[string]int64)
|
||||||
}
|
}
|
||||||
// Mark this d-tag as deleted
|
// Record the newest delete timestamp for this d-tag
|
||||||
dValue := string(split[2])
|
dValue := string(split[2])
|
||||||
deletionsByKindPubkeyDTag[key][dValue] = true
|
if ts, ok := deletionsByKindPubkeyDTag[key][dValue]; !ok || ev.CreatedAt > ts {
|
||||||
|
deletionsByKindPubkeyDTag[key][dValue] = ev.CreatedAt
|
||||||
|
}
|
||||||
// Debug logging
|
// Debug logging
|
||||||
}
|
}
|
||||||
// For replaceable events, we need to check if there are any
|
// For replaceable events, we need to check if there are any
|
||||||
@@ -225,15 +281,16 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
|
|||||||
}
|
}
|
||||||
// Initialize the inner map if it doesn't exist
|
// Initialize the inner map if it doesn't exist
|
||||||
if _, exists := deletionsByKindPubkeyDTag[key]; !exists {
|
if _, exists := deletionsByKindPubkeyDTag[key]; !exists {
|
||||||
deletionsByKindPubkeyDTag[key] = make(map[string]bool)
|
deletionsByKindPubkeyDTag[key] = make(map[string]int64)
|
||||||
|
}
|
||||||
|
// Record the newest delete timestamp for this d-tag
|
||||||
|
if ts, ok := deletionsByKindPubkeyDTag[key][dValue]; !ok || ev.CreatedAt > ts {
|
||||||
|
deletionsByKindPubkeyDTag[key][dValue] = ev.CreatedAt
|
||||||
}
|
}
|
||||||
// Mark this d-tag as deleted
|
|
||||||
deletionsByKindPubkeyDTag[key][dValue] = true
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Second pass: process all events, filtering out deleted ones
|
// Second pass: process all events, filtering out deleted ones
|
||||||
for _, idpk := range idPkTs {
|
for _, idpk := range idPkTs {
|
||||||
var ev *event.E
|
var ev *event.E
|
||||||
@@ -244,6 +301,83 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
|
|||||||
if ev, err = d.FetchEventBySerial(ser); err != nil {
|
if ev, err = d.FetchEventBySerial(ser); err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add logging for tag filter debugging
|
||||||
|
if f.Tags != nil && f.Tags.Len() > 0 {
|
||||||
|
var eventTags []string
|
||||||
|
if ev.Tags != nil && ev.Tags.Len() > 0 {
|
||||||
|
for _, t := range *ev.Tags {
|
||||||
|
if t.Len() >= 2 {
|
||||||
|
eventTags = append(
|
||||||
|
eventTags,
|
||||||
|
string(t.Key())+"="+string(t.Value()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// log.T.F(
|
||||||
|
// "QueryEvents: processing event ID=%s kind=%d tags=%v",
|
||||||
|
// hex.Enc(ev.ID), ev.Kind, eventTags,
|
||||||
|
// )
|
||||||
|
// Check if this event matches ALL required tags in the filter
|
||||||
|
tagMatches := 0
|
||||||
|
for _, filterTag := range *f.Tags {
|
||||||
|
if filterTag.Len() >= 2 {
|
||||||
|
filterKey := filterTag.Key()
|
||||||
|
// Handle filter keys that start with # (remove the prefix for comparison)
|
||||||
|
var actualKey []byte
|
||||||
|
if len(filterKey) == 2 && filterKey[0] == '#' {
|
||||||
|
actualKey = filterKey[1:]
|
||||||
|
} else {
|
||||||
|
actualKey = filterKey
|
||||||
|
}
|
||||||
|
// Check if event has this tag key with any of the filter's values
|
||||||
|
eventHasTag := false
|
||||||
|
if ev.Tags != nil {
|
||||||
|
for _, eventTag := range *ev.Tags {
|
||||||
|
if eventTag.Len() >= 2 && bytes.Equal(
|
||||||
|
eventTag.Key(), actualKey,
|
||||||
|
) {
|
||||||
|
// Check if the event's tag value matches any of the filter's values
|
||||||
|
for _, filterValue := range filterTag.T[1:] {
|
||||||
|
if bytes.Equal(
|
||||||
|
eventTag.Value(), filterValue,
|
||||||
|
) {
|
||||||
|
eventHasTag = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if eventHasTag {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if eventHasTag {
|
||||||
|
tagMatches++
|
||||||
|
}
|
||||||
|
// log.T.F(
|
||||||
|
// "QueryEvents: tag filter %s (actual key: %s) matches: %v (total matches: %d/%d)",
|
||||||
|
// string(filterKey), string(actualKey), eventHasTag,
|
||||||
|
// tagMatches, f.Tags.Len(),
|
||||||
|
// )
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If not all tags match, skip this event
|
||||||
|
if tagMatches < f.Tags.Len() {
|
||||||
|
// log.T.F(
|
||||||
|
// "QueryEvents: event ID=%s SKIPPED - only matches %d/%d required tags",
|
||||||
|
// hex.Enc(ev.ID), tagMatches, f.Tags.Len(),
|
||||||
|
// )
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// log.T.F(
|
||||||
|
// "QueryEvents: event ID=%s PASSES all tag filters",
|
||||||
|
// hex.Enc(ev.ID),
|
||||||
|
// )
|
||||||
|
}
|
||||||
|
|
||||||
// Skip events with kind 5 (Deletion)
|
// Skip events with kind 5 (Deletion)
|
||||||
if ev.Kind == kind.Deletion.K {
|
if ev.Kind == kind.Deletion.K {
|
||||||
continue
|
continue
|
||||||
@@ -308,10 +442,10 @@ func (d *D) QueryEvents(c context.Context, f *filter.F) (
|
|||||||
|
|
||||||
// Check if this event has been deleted via an a-tag
|
// Check if this event has been deleted via an a-tag
|
||||||
if deletionMap, exists := deletionsByKindPubkeyDTag[key]; exists {
|
if deletionMap, exists := deletionsByKindPubkeyDTag[key]; exists {
|
||||||
// If the d-tag value is in the deletion map and this event
|
// If there is a deletion timestamp and this event is older than the deletion,
|
||||||
// is not specifically requested by ID, skip it
|
// and this event is not specifically requested by ID, skip it
|
||||||
if deletionMap[dValue] && !isIdInFilter {
|
if delTs, ok := deletionMap[dValue]; ok && ev.CreatedAt < delTs && !isIdInFilter {
|
||||||
log.T.F("Debug: Event deleted - skipping")
|
log.T.F("Debug: Event deleted by a-tag (older than delete) - skipping")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -199,25 +199,12 @@ func (d *D) CheckForDeleted(ev *event.E, admins [][]byte) (err error) {
|
|||||||
sers = append(sers, s...)
|
sers = append(sers, s...)
|
||||||
}
|
}
|
||||||
if len(sers) > 0 {
|
if len(sers) > 0 {
|
||||||
var idPkTss []*store.IdPkTs
|
// For e-tag deletions (delete by ID), any deletion event means the event cannot be resubmitted
|
||||||
var tmp []*store.IdPkTs
|
// regardless of timestamp, since it's a specific deletion of this exact event
|
||||||
if tmp, err = d.GetFullIdPubkeyBySerials(sers); chk.E(err) {
|
err = errorf.E(
|
||||||
return
|
"blocked: %0x was deleted by ID and cannot be resubmitted",
|
||||||
}
|
ev.ID,
|
||||||
idPkTss = append(idPkTss, tmp...)
|
|
||||||
// sort by timestamp, so the first is the newest
|
|
||||||
sort.Slice(
|
|
||||||
idPkTss, func(i, j int) bool {
|
|
||||||
return idPkTss[i].Ts > idPkTss[j].Ts
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
if ev.CreatedAt < idPkTss[0].Ts {
|
|
||||||
err = errorf.E(
|
|
||||||
"blocked: %0x was deleted because it is older than the delete: event: %d delete: %d",
|
|
||||||
ev.ID, ev.CreatedAt, idPkTss[0].Ts,
|
|
||||||
)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,11 +3,14 @@ package database
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"database.orly/indexes"
|
"database.orly/indexes"
|
||||||
"database.orly/indexes/types"
|
"database.orly/indexes/types"
|
||||||
"encoders.orly/event"
|
"encoders.orly/event"
|
||||||
"encoders.orly/filter"
|
"encoders.orly/filter"
|
||||||
|
"encoders.orly/hex"
|
||||||
"encoders.orly/kind"
|
"encoders.orly/kind"
|
||||||
"encoders.orly/tag"
|
"encoders.orly/tag"
|
||||||
"github.com/dgraph-io/badger/v4"
|
"github.com/dgraph-io/badger/v4"
|
||||||
@@ -42,12 +45,32 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
|
|||||||
// check if the event already exists
|
// check if the event already exists
|
||||||
var ser *types.Uint40
|
var ser *types.Uint40
|
||||||
if ser, err = d.GetSerialById(ev.ID); err == nil && ser != nil {
|
if ser, err = d.GetSerialById(ev.ID); err == nil && ser != nil {
|
||||||
err = errorf.E("event already exists: %0x", ev.ID)
|
err = errorf.E("blocked: event already exists: %0x", ev.ID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the error is "id not found", we can proceed with saving the event
|
||||||
|
if err != nil && strings.Contains(err.Error(), "id not found in database") {
|
||||||
|
// Reset error since this is expected for new events
|
||||||
|
err = nil
|
||||||
|
} else if err != nil {
|
||||||
|
// For any other error, return it
|
||||||
|
log.E.F("error checking if event exists: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the event has been deleted before allowing resubmission
|
||||||
|
if err = d.CheckForDeleted(ev, nil); err != nil {
|
||||||
|
// log.I.F(
|
||||||
|
// "SaveEvent: rejecting resubmission of deleted event ID=%s: %v",
|
||||||
|
// hex.Enc(ev.ID), err,
|
||||||
|
// )
|
||||||
|
err = errorf.E("blocked: %s", err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// check for replacement
|
// check for replacement
|
||||||
if kind.IsReplaceable(ev.Kind) {
|
if kind.IsReplaceable(ev.Kind) {
|
||||||
// find the events and delete them
|
// find the events and check timestamps before deleting
|
||||||
f := &filter.F{
|
f := &filter.F{
|
||||||
Authors: tag.NewFromBytesSlice(ev.Pubkey),
|
Authors: tag.NewFromBytesSlice(ev.Pubkey),
|
||||||
Kinds: kind.NewS(kind.New(ev.Kind)),
|
Kinds: kind.NewS(kind.New(ev.Kind)),
|
||||||
@@ -56,22 +79,50 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
|
|||||||
if sers, err = d.GetSerialsFromFilter(f); chk.E(err) {
|
if sers, err = d.GetSerialsFromFilter(f); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// if found, delete them
|
// if found, check timestamps before deleting
|
||||||
if len(sers) > 0 {
|
if len(sers) > 0 {
|
||||||
|
var shouldReplace bool = true
|
||||||
for _, s := range sers {
|
for _, s := range sers {
|
||||||
var oldEv *event.E
|
var oldEv *event.E
|
||||||
if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
|
if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err = d.DeleteEventBySerial(
|
// Only replace if the new event is newer or same timestamp
|
||||||
c, s, oldEv,
|
if ev.CreatedAt < oldEv.CreatedAt {
|
||||||
); chk.E(err) {
|
log.I.F(
|
||||||
continue
|
"SaveEvent: rejecting older replaceable event ID=%s (created_at=%d) - existing event ID=%s (created_at=%d)",
|
||||||
|
hex.Enc(ev.ID), ev.CreatedAt, hex.Enc(oldEv.ID),
|
||||||
|
oldEv.CreatedAt,
|
||||||
|
)
|
||||||
|
shouldReplace = false
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if shouldReplace {
|
||||||
|
for _, s := range sers {
|
||||||
|
var oldEv *event.E
|
||||||
|
if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.I.F(
|
||||||
|
"SaveEvent: replacing older replaceable event ID=%s (created_at=%d) with newer event ID=%s (created_at=%d)",
|
||||||
|
hex.Enc(oldEv.ID), oldEv.CreatedAt, hex.Enc(ev.ID),
|
||||||
|
ev.CreatedAt,
|
||||||
|
)
|
||||||
|
if err = d.DeleteEventBySerial(
|
||||||
|
c, s, oldEv,
|
||||||
|
); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Don't save the older event - return an error
|
||||||
|
err = errorf.E("blocked: event is older than existing replaceable event")
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if kind.IsParameterizedReplaceable(ev.Kind) {
|
} else if kind.IsParameterizedReplaceable(ev.Kind) {
|
||||||
// find the events and delete them
|
// find the events and check timestamps before deleting
|
||||||
dTag := ev.Tags.GetFirst([]byte("d"))
|
dTag := ev.Tags.GetFirst([]byte("d"))
|
||||||
if dTag == nil {
|
if dTag == nil {
|
||||||
err = errorf.E("event is missing a d tag identifier")
|
err = errorf.E("event is missing a d tag identifier")
|
||||||
@@ -88,19 +139,47 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
|
|||||||
if sers, err = d.GetSerialsFromFilter(f); chk.E(err) {
|
if sers, err = d.GetSerialsFromFilter(f); chk.E(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// if found, delete them
|
// if found, check timestamps before deleting
|
||||||
if len(sers) > 0 {
|
if len(sers) > 0 {
|
||||||
|
var shouldReplace bool = true
|
||||||
for _, s := range sers {
|
for _, s := range sers {
|
||||||
var oldEv *event.E
|
var oldEv *event.E
|
||||||
if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
|
if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err = d.DeleteEventBySerial(
|
// Only replace if the new event is newer or same timestamp
|
||||||
c, s, oldEv,
|
if ev.CreatedAt < oldEv.CreatedAt {
|
||||||
); chk.E(err) {
|
log.I.F(
|
||||||
continue
|
"SaveEvent: rejecting older addressable event ID=%s (created_at=%d) - existing event ID=%s (created_at=%d)",
|
||||||
|
hex.Enc(ev.ID), ev.CreatedAt, hex.Enc(oldEv.ID),
|
||||||
|
oldEv.CreatedAt,
|
||||||
|
)
|
||||||
|
shouldReplace = false
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if shouldReplace {
|
||||||
|
for _, s := range sers {
|
||||||
|
var oldEv *event.E
|
||||||
|
if oldEv, err = d.FetchEventBySerial(s); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.I.F(
|
||||||
|
"SaveEvent: replacing older addressable event ID=%s (created_at=%d) with newer event ID=%s (created_at=%d)",
|
||||||
|
hex.Enc(oldEv.ID), oldEv.CreatedAt, hex.Enc(ev.ID),
|
||||||
|
ev.CreatedAt,
|
||||||
|
)
|
||||||
|
if err = d.DeleteEventBySerial(
|
||||||
|
c, s, oldEv,
|
||||||
|
); chk.E(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Don't save the older event - return an error
|
||||||
|
err = errorf.E("blocked: event is older than existing addressable event")
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Get the next sequence number for the event
|
// Get the next sequence number for the event
|
||||||
@@ -153,6 +232,14 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
|
|||||||
return
|
return
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
log.T.F("total data written: %d bytes keys %d bytes values", kc, vc)
|
log.T.F(
|
||||||
|
"total data written: %d bytes keys %d bytes values for event ID %s", kc,
|
||||||
|
vc, hex.Enc(ev.ID),
|
||||||
|
)
|
||||||
|
log.T.C(
|
||||||
|
func() string {
|
||||||
|
return fmt.Sprintf("event:\n%s\n", ev.Serialize())
|
||||||
|
},
|
||||||
|
)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,7 +11,6 @@ import (
|
|||||||
"interfaces.orly/codec"
|
"interfaces.orly/codec"
|
||||||
"lol.mleku.dev/chk"
|
"lol.mleku.dev/chk"
|
||||||
"lol.mleku.dev/errorf"
|
"lol.mleku.dev/errorf"
|
||||||
"lol.mleku.dev/log"
|
|
||||||
"utils.orly/constraints"
|
"utils.orly/constraints"
|
||||||
"utils.orly/units"
|
"utils.orly/units"
|
||||||
)
|
)
|
||||||
@@ -55,7 +54,7 @@ func (en *Challenge) Label() string { return L }
|
|||||||
func (en *Challenge) Write(w io.Writer) (err error) {
|
func (en *Challenge) Write(w io.Writer) (err error) {
|
||||||
var b []byte
|
var b []byte
|
||||||
b = en.Marshal(b)
|
b = en.Marshal(b)
|
||||||
log.D.F("writing out challenge envelope: '%s'", b)
|
// log.D.F("writing out challenge envelope: '%s'", b)
|
||||||
_, err = w.Write(b)
|
_, err = w.Write(b)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ import (
|
|||||||
"encoders.orly/hex"
|
"encoders.orly/hex"
|
||||||
"encoders.orly/ints"
|
"encoders.orly/ints"
|
||||||
"encoders.orly/text"
|
"encoders.orly/text"
|
||||||
"lol.mleku.dev/log"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ToCanonical converts the event to the canonical encoding used to derive the
|
// ToCanonical converts the event to the canonical encoding used to derive the
|
||||||
@@ -23,7 +22,7 @@ func (ev *E) ToCanonical(dst []byte) (b []byte) {
|
|||||||
b = append(b, ',')
|
b = append(b, ',')
|
||||||
b = text.AppendQuote(b, ev.Content, text.NostrEscape)
|
b = text.AppendQuote(b, ev.Content, text.NostrEscape)
|
||||||
b = append(b, ']')
|
b = append(b, ']')
|
||||||
log.D.F("canonical: %s", b)
|
// log.D.F("canonical: %s", b)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -333,7 +333,7 @@ var (
|
|||||||
CommunityDefinition = &K{34550}
|
CommunityDefinition = &K{34550}
|
||||||
ACLEvent = &K{39998}
|
ACLEvent = &K{39998}
|
||||||
// ParameterizedReplaceableEnd is an event type that...
|
// ParameterizedReplaceableEnd is an event type that...
|
||||||
ParameterizedReplaceableEnd = &K{39999}
|
ParameterizedReplaceableEnd = &K{40000}
|
||||||
)
|
)
|
||||||
|
|
||||||
var MapMx sync.RWMutex
|
var MapMx sync.RWMutex
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ const (
|
|||||||
type I interface {
|
type I interface {
|
||||||
Configure(cfg ...any) (err error)
|
Configure(cfg ...any) (err error)
|
||||||
// GetAccessLevel returns the access level string for a given pubkey.
|
// GetAccessLevel returns the access level string for a given pubkey.
|
||||||
GetAccessLevel(pub []byte) (level string)
|
GetAccessLevel(pub []byte, address string) (level string)
|
||||||
// GetACLInfo returns the name and a description of the ACL, which should
|
// GetACLInfo returns the name and a description of the ACL, which should
|
||||||
// explain briefly how it works, and then a long text of documentation of
|
// explain briefly how it works, and then a long text of documentation of
|
||||||
// the ACL's rules and configuration (in asciidoc or markdown).
|
// the ACL's rules and configuration (in asciidoc or markdown).
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
v0.2.0
|
v0.4.2
|
||||||
19
scripts/relaytester-install.sh
Executable file
19
scripts/relaytester-install.sh
Executable file
@@ -0,0 +1,19 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
## rust must be installed
|
||||||
|
if ! command -v "cargo" &> /dev/null; then
|
||||||
|
echo "rust and cargo is not installed."
|
||||||
|
echo "run this command to install:"
|
||||||
|
echo
|
||||||
|
echo "curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh"
|
||||||
|
exit
|
||||||
|
else
|
||||||
|
echo "cargo is installed."
|
||||||
|
fi
|
||||||
|
|
||||||
|
rm -rf relay-tester
|
||||||
|
git clone https://github.com/mikedilger/relay-tester.git
|
||||||
|
cd relay-tester
|
||||||
|
cargo build -r
|
||||||
|
cp target/release/relay-tester $GOBIN/
|
||||||
|
cd ..
|
||||||
|
#rm -rf relay-tester
|
||||||
23
scripts/relaytester-test.sh
Executable file
23
scripts/relaytester-test.sh
Executable file
@@ -0,0 +1,23 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
## relay-tester must be installed
|
||||||
|
if ! command -v "relay-tester" &> /dev/null; then
|
||||||
|
echo "relay-tester is not installed."
|
||||||
|
echo "run this command to install:"
|
||||||
|
echo
|
||||||
|
echo "./scripts/relaytester-install.sh"
|
||||||
|
exit
|
||||||
|
fi
|
||||||
|
rm -rf /tmp/orlytest
|
||||||
|
export ORLY_LOG_LEVEL=trace
|
||||||
|
export ORLY_LOG_TO_STDOUT=true
|
||||||
|
export ORLY_LISTEN=127.0.0.1
|
||||||
|
export ORLY_PORT=3334
|
||||||
|
export ORLY_IP_WHITELIST=127.0.0
|
||||||
|
export ORLY_ADMINS=6d9b216ec1dc329ca43c56634e0dba6aaaf3d45ab878bdf4fa910c7117db0bfa,c284f03a874668eded145490e436b87f1a1fc565cf320e7dea93a7e96e3629d7
|
||||||
|
export ORLY_ACL_MODE=none
|
||||||
|
export ORLY_DATA_DIR=/tmp/orlytest
|
||||||
|
go run . &
|
||||||
|
sleep 5
|
||||||
|
relay-tester ws://127.0.0.1:3334 nsec12l4072hvvyjpmkyjtdxn48xf8qj299zw60u7ddg58s2aphv3rpjqtg0tvr nsec1syvtjgqauyeezgrev5nqrp36d87apjk87043tgu2usgv8umyy6wq4yl6tu
|
||||||
|
killall next.orly.dev
|
||||||
|
rm -rf /tmp/orlytest
|
||||||
14972
stacktrace.txt
Normal file
14972
stacktrace.txt
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user