Compare commits

..

4 Commits

18 changed files with 15201 additions and 125 deletions

View File

@@ -107,7 +107,7 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) (err error) {
string(at.DTag), ev.CreatedAt, env.E.CreatedAt,
)
if err = l.DeleteEventBySerial(
l.Ctx, s, ev,
l.Ctx(), s, ev,
); chk.E(err) {
continue
}
@@ -165,7 +165,7 @@ func (l *Listener) HandleDelete(env *eventenvelope.Submission) (err error) {
"HandleDelete: deleting event %s by authorized user %s",
hex.Enc(ev.ID), hex.Enc(env.E.Pubkey),
)
if err = l.DeleteEventBySerial(l.Ctx, s, ev); chk.E(err) {
if err = l.DeleteEventBySerial(l.Ctx(), s, ev); chk.E(err) {
continue
}
}

View File

@@ -1,8 +1,10 @@
package app
import (
"context"
"fmt"
"strings"
"time"
acl "acl.orly"
"encoders.orly/envelopes/authenvelope"
@@ -99,7 +101,7 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
return
default:
// user has write access or better, continue
log.D.F("user has %s access", accessLevel)
// log.D.F("user has %s access", accessLevel)
}
// if the event is a delete, process the delete
if env.E.Kind == kind.EventDeletion.K {
@@ -127,9 +129,11 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
}
}
}
// store the event
// store the event - use a separate context to prevent cancellation issues
saveCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// log.I.F("saving event %0x, %s", env.E.ID, env.E.Serialize())
if _, _, err = l.SaveEvent(l.Ctx, env.E); err != nil {
if _, _, err = l.SaveEvent(saveCtx, env.E); err != nil {
if strings.HasPrefix(err.Error(), "blocked:") {
errStr := err.Error()[len("blocked: "):len(err.Error())]
if err = Ok.Error(
@@ -146,7 +150,8 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
if err = Ok.Ok(l, env, ""); chk.E(err) {
return
}
defer l.publishers.Deliver(env.E)
// Deliver the event to subscribers immediately after sending OK response
l.publishers.Deliver(env.E)
log.D.F("saved event %0x", env.E.ID)
var isNewFromAdmin bool
for _, admin := range l.Admins {
@@ -156,11 +161,16 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
}
}
if isNewFromAdmin {
log.I.F("new event from admin %0x", env.E.Pubkey)
// if a follow list was saved, reconfigure ACLs now that it is persisted
if env.E.Kind == kind.FollowList.K ||
env.E.Kind == kind.RelayListMetadata.K {
if err = acl.Registry.Configure(); chk.E(err) {
}
// Run ACL reconfiguration asynchronously to prevent blocking websocket operations
go func() {
if err := acl.Registry.Configure(); chk.E(err) {
log.E.F("failed to reconfigure ACL: %v", err)
}
}()
}
}
return

View File

@@ -9,10 +9,11 @@ import (
"encoders.orly/envelopes/reqenvelope"
"lol.mleku.dev/chk"
"lol.mleku.dev/errorf"
"lol.mleku.dev/log"
)
func (l *Listener) HandleMessage(msg []byte, remote string) {
// log.D.F("%s received message:\n%s", remote, msg)
log.D.F("%s received message:\n%s", remote, msg)
var err error
var t string
var rem []byte

View File

@@ -1,7 +1,10 @@
package app
import (
"context"
"errors"
"fmt"
"time"
acl "acl.orly"
"encoders.orly/envelopes/authenvelope"
@@ -25,7 +28,7 @@ import (
)
func (l *Listener) HandleReq(msg []byte) (err error) {
// log.T.F("HandleReq: from %s", l.remote)
log.T.F("HandleReq: START processing from %s\n%s\n", l.remote, msg)
var rem []byte
env := reqenvelope.New()
if rem, err = env.Unmarshal(msg); chk.E(err) {
@@ -54,69 +57,91 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
return
default:
// user has read access or better, continue
// log.D.F("user has %s access", accessLevel)
log.D.F("user has %s access", accessLevel)
}
var events event.S
for _, f := range *env.Filters {
// idsLen := 0; kindsLen := 0; authorsLen := 0; tagsLen := 0
// if f != nil {
// if f.Ids != nil { idsLen = f.Ids.Len() }
// if f.Kinds != nil { kindsLen = f.Kinds.Len() }
// if f.Authors != nil { authorsLen = f.Authors.Len() }
// if f.Tags != nil { tagsLen = f.Tags.Len() }
// }
// log.T.F("REQ %s: filter summary ids=%d kinds=%d authors=%d tags=%d", env.Subscription, idsLen, kindsLen, authorsLen, tagsLen)
// if f != nil && f.Authors != nil && f.Authors.Len() > 0 {
// var authors []string
// for _, a := range f.Authors.T { authors = append(authors, hex.Enc(a)) }
// log.T.F("REQ %s: authors=%v", env.Subscription, authors)
// }
// if f != nil && f.Kinds != nil && f.Kinds.Len() > 0 {
// log.T.F("REQ %s: kinds=%v", env.Subscription, f.Kinds.ToUint16())
// }
// if f != nil && f.Ids != nil && f.Ids.Len() > 0 {
// var ids []string
// for _, id := range f.Ids.T {
// ids = append(ids, hex.Enc(id))
// }
// var lim any
// if pointers.Present(f.Limit) {
// lim = *f.Limit
// } else {
// lim = nil
// }
// log.T.F(
// "REQ %s: ids filter count=%d ids=%v limit=%v", env.Subscription,
// f.Ids.Len(), ids, lim,
// )
// }
idsLen := 0
kindsLen := 0
authorsLen := 0
tagsLen := 0
if f != nil {
if f.Ids != nil {
idsLen = f.Ids.Len()
}
if f.Kinds != nil {
kindsLen = f.Kinds.Len()
}
if f.Authors != nil {
authorsLen = f.Authors.Len()
}
if f.Tags != nil {
tagsLen = f.Tags.Len()
}
}
log.T.F(
"REQ %s: filter summary ids=%d kinds=%d authors=%d tags=%d",
env.Subscription, idsLen, kindsLen, authorsLen, tagsLen,
)
if f != nil && f.Authors != nil && f.Authors.Len() > 0 {
var authors []string
for _, a := range f.Authors.T {
authors = append(authors, hex.Enc(a))
}
log.T.F("REQ %s: authors=%v", env.Subscription, authors)
}
if f != nil && f.Kinds != nil && f.Kinds.Len() > 0 {
log.T.F("REQ %s: kinds=%v", env.Subscription, f.Kinds.ToUint16())
}
if f != nil && f.Ids != nil && f.Ids.Len() > 0 {
var ids []string
for _, id := range f.Ids.T {
ids = append(ids, hex.Enc(id))
}
var lim any
if pointers.Present(f.Limit) {
lim = *f.Limit
} else {
lim = nil
}
log.T.F(
"REQ %s: ids filter count=%d ids=%v limit=%v", env.Subscription,
f.Ids.Len(), ids, lim,
)
}
if pointers.Present(f.Limit) {
if *f.Limit == 0 {
continue
}
}
if events, err = l.QueryEvents(l.Ctx, f); chk.E(err) {
if errors.Is(err, badger.ErrDBClosed) {
return
}
err = nil
}
// Use a separate context for QueryEvents to prevent cancellation issues.
// NOTE(review): the defer cancel() below runs at function return, not at
// the end of each loop iteration, so one timeout context per filter stays
// alive until HandleReq exits — consider cancelling per-iteration.
queryCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
log.T.F("HandleReq: About to QueryEvents for %s, main context done: %v", l.remote, l.ctx.Err() != nil)
if events, err = l.QueryEvents(queryCtx, f); chk.E(err) {
if errors.Is(err, badger.ErrDBClosed) {
return
}
log.T.F("HandleReq: QueryEvents error for %s: %v", l.remote, err)
err = nil
}
log.T.F("HandleReq: QueryEvents completed for %s, found %d events", l.remote, len(events))
}
var tmp event.S
privCheck:
for _, ev := range events {
if kind.IsPrivileged(ev.Kind) &&
accessLevel != "admin" { // admins can see all events
// log.I.F("checking privileged event %s", ev.ID)
log.I.F("checking privileged event %s", ev.ID)
pk := l.authedPubkey.Load()
if pk == nil {
continue
}
if utils.FastEqual(ev.Pubkey, pk) {
// log.I.F(
// "privileged event %s is for logged in pubkey %0x", ev.ID,
// pk,
// )
log.I.F(
"privileged event %s is for logged in pubkey %0x", ev.ID,
pk,
)
tmp = append(tmp, ev)
continue
}
@@ -127,10 +152,10 @@ privCheck:
continue
}
if utils.FastEqual(pt, pk) {
// log.I.F(
// "privileged event %s is for logged in pubkey %0x",
// ev.ID, pk,
// )
log.I.F(
"privileged event %s is for logged in pubkey %0x",
ev.ID, pk,
)
tmp = append(tmp, ev)
continue privCheck
}
@@ -146,10 +171,15 @@ privCheck:
events = tmp
seen := make(map[string]struct{})
for _, ev := range events {
// log.T.F(
// "REQ %s: sending EVENT id=%s kind=%d", env.Subscription,
// hex.Enc(ev.ID), ev.Kind,
// )
log.T.F(
"REQ %s: sending EVENT id=%s kind=%d", env.Subscription,
hex.Enc(ev.ID), ev.Kind,
)
log.T.C(
func() string {
return fmt.Sprintf("event:\n%s\n", ev.Serialize())
},
)
var res *eventenvelope.Result
if res, err = eventenvelope.NewResultWith(
env.Subscription, ev,
@@ -164,7 +194,7 @@ privCheck:
}
// write the EOSE to signal to the client that all events found have been
// sent.
// log.T.F("sending EOSE to %s", l.remote)
log.T.F("sending EOSE to %s", l.remote)
if err = eoseenvelope.NewFrom(env.Subscription).
Write(l); chk.E(err) {
return
@@ -172,10 +202,10 @@ privCheck:
// if the query was for just Ids, we know there can't be any more results,
// so cancel the subscription.
cancel := true
// log.T.F(
// "REQ %s: computing cancel/subscription; events_sent=%d",
// env.Subscription, len(events),
// )
log.T.F(
"REQ %s: computing cancel/subscription; events_sent=%d",
env.Subscription, len(events),
)
var subbedFilters filter.S
for _, f := range *env.Filters {
if f.Ids.Len() < 1 {
@@ -190,10 +220,10 @@ privCheck:
}
notFounds = append(notFounds, id)
}
// log.T.F(
// "REQ %s: ids outstanding=%d of %d", env.Subscription,
// len(notFounds), f.Ids.Len(),
// )
log.T.F(
"REQ %s: ids outstanding=%d of %d", env.Subscription,
len(notFounds), f.Ids.Len(),
)
// if all were found, don't add to subbedFilters
if len(notFounds) == 0 {
continue
@@ -230,5 +260,6 @@ privCheck:
return
}
}
log.T.F("HandleReq: COMPLETED processing from %s", l.remote)
return
}

View File

@@ -19,6 +19,7 @@ const (
DefaultWriteWait = 10 * time.Second
DefaultPongWait = 60 * time.Second
DefaultPingWait = DefaultPongWait / 2
DefaultReadTimeout = 3 * time.Second // Read timeout to detect stalled connections
DefaultMaxMessageSize = 1 * units.Mb
// CloseMessage denotes a close control message. The optional message
@@ -95,13 +96,34 @@ whitelist:
}
var typ websocket.MessageType
var msg []byte
// log.T.F("waiting for message from %s", remote)
if typ, msg, err = conn.Read(ctx); chk.E(err) {
log.T.F("waiting for message from %s", remote)
// Create a read context with timeout to prevent indefinite blocking.
// NOTE(review): DefaultReadTimeout (3s) is much shorter than
// DefaultPongWait (60s); a client that is merely idle for 3s between
// messages will be disconnected here — confirm this is intended.
readCtx, readCancel := context.WithTimeout(ctx, DefaultReadTimeout)
typ, msg, err = conn.Read(readCtx)
readCancel()
if err != nil {
if strings.Contains(
err.Error(), "use of closed network connection",
) {
return
}
// Handle timeout errors - occurs when client becomes unresponsive
if strings.Contains(err.Error(), "context deadline exceeded") {
log.T.F(
"connection from %s timed out after %v", remote,
DefaultReadTimeout,
)
return
}
// Handle EOF errors gracefully - these occur when client closes connection
// or sends incomplete/malformed WebSocket frames
if strings.Contains(err.Error(), "EOF") ||
strings.Contains(err.Error(), "failed to read frame header") {
log.T.F("connection from %s closed: %v", remote, err)
return
}
status := websocket.CloseStatus(err)
switch status {
case websocket.StatusNormalClosure,
@@ -109,6 +131,9 @@ whitelist:
websocket.StatusNoStatusRcvd,
websocket.StatusAbnormalClosure,
websocket.StatusProtocolError:
log.T.F(
"connection from %s closed with status: %v", remote, status,
)
default:
log.E.F("unexpected close error from %s: %v", remote, err)
}
@@ -120,6 +145,7 @@ whitelist:
}
continue
}
log.T.F("received message from %s: %s", remote, string(msg))
go listener.HandleMessage(msg, remote)
}
}

View File

@@ -3,12 +3,15 @@ package app
import (
"context"
"net/http"
"time"
"github.com/coder/websocket"
"lol.mleku.dev/chk"
"utils.orly/atomic"
)
const WriteTimeout = 10 * time.Second
type Listener struct {
*Server
conn *websocket.Conn
@@ -19,8 +22,19 @@ type Listener struct {
authedPubkey atomic.Bytes
}
// Ctx returns the listener's base connection context. Note: it does NOT
// create a new context per operation — callers that need isolation from
// connection cancellation derive their own timeout contexts at the call
// site (see Write and the save path in HandleEvent).
func (l *Listener) Ctx() context.Context {
return l.ctx
}
func (l *Listener) Write(p []byte) (n int, err error) {
if err = l.conn.Write(l.ctx, websocket.MessageText, p); chk.E(err) {
// Use a separate context with timeout for writes to prevent race conditions
// where the main connection context gets cancelled while writing events
writeCtx, cancel := context.WithTimeout(context.Background(), WriteTimeout)
defer cancel()
if err = l.conn.Write(writeCtx, websocket.MessageText, p); chk.E(err) {
return
}
n = len(p)

View File

@@ -28,6 +28,9 @@ func Run(
var err error
var adminKeys [][]byte
for _, admin := range cfg.Admins {
if len(admin) == 0 {
continue
}
var pk []byte
if pk, err = bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(err) {
continue

View File

@@ -101,17 +101,17 @@ func (p *P) Receive(msg typer.T) {
if m.Cancel {
if m.Id == "" {
p.removeSubscriber(m.Conn)
// log.D.F("removed listener %s", m.remote)
log.D.F("removed listener %s", m.remote)
} else {
p.removeSubscriberId(m.Conn, m.Id)
// log.D.C(
// func() string {
// return fmt.Sprintf(
// "removed subscription %s for %s", m.Id,
// m.remote,
// )
// },
// )
log.D.C(
func() string {
return fmt.Sprintf(
"removed subscription %s for %s", m.Id,
m.remote,
)
},
)
}
return
}
@@ -123,27 +123,27 @@ func (p *P) Receive(msg typer.T) {
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
}
p.Map[m.Conn] = subs
// log.D.C(
// func() string {
// return fmt.Sprintf(
// "created new subscription for %s, %s",
// m.remote,
// m.Filters.Marshal(nil),
// )
// },
// )
log.D.C(
func() string {
return fmt.Sprintf(
"created new subscription for %s, %s",
m.remote,
m.Filters.Marshal(nil),
)
},
)
} else {
subs[m.Id] = Subscription{
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey,
}
// log.D.C(
// func() string {
// return fmt.Sprintf(
// "added subscription %s for %s", m.Id,
// m.remote,
// )
// },
// )
log.D.C(
func() string {
return fmt.Sprintf(
"added subscription %s for %s", m.Id,
m.remote,
)
},
)
}
}
}
@@ -179,14 +179,16 @@ func (p *P) Deliver(ev *event.E) {
}
}
p.Mx.RUnlock()
log.D.C(
func() string {
return fmt.Sprintf(
"delivering event %0x to websocket subscribers %d", ev.ID,
len(deliveries),
)
},
)
if len(deliveries) > 0 {
log.D.C(
func() string {
return fmt.Sprintf(
"delivering event %0x to websocket subscribers %d", ev.ID,
len(deliveries),
)
},
)
}
for _, d := range deliveries {
// If the event is privileged, enforce that the subscriber's authed pubkey matches
// either the event pubkey or appears in any 'p' tag of the event.
@@ -218,8 +220,15 @@ func (p *P) Deliver(ev *event.E) {
if res, err = eventenvelope.NewResultWith(d.id, ev); chk.E(err) {
continue
}
// Use a separate context with timeout for writes to prevent race conditions
// where the publisher context gets cancelled while writing events.
// NOTE(review): the defer cancel() fires when Deliver returns, not after
// each subscriber write, so contexts accumulate across the deliveries loop.
writeCtx, cancel := context.WithTimeout(
context.Background(), WriteTimeout,
)
defer cancel()
if err = d.w.Write(
p.c, websocket.MessageText, res.Marshal(nil),
writeCtx, websocket.MessageText, res.Marshal(nil),
); chk.E(err) {
// On error, remove the subscriber connection safely
p.removeSubscriber(d.w)
@@ -245,9 +254,9 @@ func (p *P) removeSubscriberId(ws *websocket.Conn, id string) {
var subs map[string]Subscription
var ok bool
if subs, ok = p.Map[ws]; ok {
delete(p.Map[ws], id)
_ = subs
if len(subs) == 0 {
delete(subs, id)
// Re-check via p.Map[ws] for clarity; subs and p.Map[ws] reference the
// same underlying map (Go maps are reference types), so either length
// check is equivalent after the delete.
if len(p.Map[ws]) == 0 {
delete(p.Map, ws)
}
}

View File

@@ -282,7 +282,7 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
ctx, res.Event,
); err != nil {
if !strings.HasPrefix(
err.Error(), "event already exists",
err.Error(), "blocked:",
) {
log.W.F(
"follows syncer: save event failed: %v",

View File

@@ -9,6 +9,7 @@ import (
types2 "database.orly/indexes/types"
"encoders.orly/filter"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
)
type Range struct {
@@ -95,16 +96,13 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
return
}
b := buf.Bytes()
// Create range that will match any serial value with this ID prefix
end := make([]byte, len(b))
copy(end, b)
// Fill the end range with 0xff bytes to match all possible serial values
for i := 0; i < 5; i++ {
end = append(end, 0xff)
}
r := Range{b, end}
idxs = append(idxs, r)
return
@@ -241,6 +239,7 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
for _, t := range *f.Tags {
if t.Len() >= 2 && (len(t.Key()) == 1 || (len(t.Key()) == 2 && t.Key()[0] == '#')) {
var p *types2.PubHash
log.I.S(author)
if p, err = CreatePubHashFromData(author); chk.E(err) {
return
}
@@ -363,6 +362,7 @@ func GetIndexesFromFilter(f *filter.F) (idxs []Range, err error) {
if f.Authors != nil && f.Authors.Len() > 0 {
for _, author := range f.Authors.T {
var p *types2.PubHash
log.I.S(author)
if p, err = CreatePubHashFromData(author); chk.E(err) {
return
}

View File

@@ -15,10 +15,13 @@ const PubHashLen = 8
type PubHash struct{ val [PubHashLen]byte }
func (ph *PubHash) FromPubkey(pk []byte) (err error) {
if len(pk) == 0 {
panic("nil pubkey")
}
if len(pk) != schnorr.PubKeyBytesLen {
err = errorf.E(
"invalid Pubkey length, got %d require %d",
len(pk), schnorr.PubKeyBytesLen,
"invalid Pubkey length, got %d require %d %0x",
len(pk), schnorr.PubKeyBytesLen, pk,
)
return
}

View File

@@ -3,6 +3,7 @@ package database
import (
"bytes"
"context"
"fmt"
"strings"
"database.orly/indexes"
@@ -235,5 +236,10 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) (kc, vc int, err error) {
"total data written: %d bytes keys %d bytes values for event ID %s", kc,
vc, hex.Enc(ev.ID),
)
log.T.C(
func() string {
return fmt.Sprintf("event:\n%s\n", ev.Serialize())
},
)
return
}

View File

@@ -5,7 +5,6 @@ import (
"encoders.orly/hex"
"encoders.orly/ints"
"encoders.orly/text"
"lol.mleku.dev/log"
)
// ToCanonical converts the event to the canonical encoding used to derive the
@@ -23,7 +22,7 @@ func (ev *E) ToCanonical(dst []byte) (b []byte) {
b = append(b, ',')
b = text.AppendQuote(b, ev.Content, text.NostrEscape)
b = append(b, ']')
log.D.F("canonical: %s", b)
// log.D.F("canonical: %s", b)
return
}

View File

@@ -333,7 +333,7 @@ var (
CommunityDefinition = &K{34550}
ACLEvent = &K{39998}
// ParameterizedReplaceableEnd is an event type that...
ParameterizedReplaceableEnd = &K{39999}
ParameterizedReplaceableEnd = &K{40000}
)
var MapMx sync.RWMutex

View File

@@ -1 +1 @@
v0.3.2
v0.4.0

View File

@@ -16,4 +16,4 @@ cd relay-tester
cargo build -r
cp target/release/relay-tester $GOBIN/
cd ..
rm -rf relay-tester
#rm -rf relay-tester

View File

@@ -7,7 +7,7 @@ if ! command -v "relay-tester" &> /dev/null; then
echo "./scripts/relaytester-install.sh"
exit
fi
rm -rf ~/.local/share/ORLY
rm -rf /tmp/orlytest
export ORLY_LOG_LEVEL=trace
export ORLY_LOG_TO_STDOUT=true
export ORLY_LISTEN=127.0.0.1
@@ -15,7 +15,9 @@ export ORLY_PORT=3334
export ORLY_IP_WHITELIST=127.0.0
export ORLY_ADMINS=6d9b216ec1dc329ca43c56634e0dba6aaaf3d45ab878bdf4fa910c7117db0bfa,c284f03a874668eded145490e436b87f1a1fc565cf320e7dea93a7e96e3629d7
export ORLY_ACL_MODE=none
export ORLY_DATA_DIR=/tmp/orlytest
go run . &
sleep 5
relay-tester ws://127.0.0.1:3334 nsec12l4072hvvyjpmkyjtdxn48xf8qj299zw60u7ddg58s2aphv3rpjqtg0tvr nsec1syvtjgqauyeezgrev5nqrp36d87apjk87043tgu2usgv8umyy6wq4yl6tu
killall next.orly.dev
killall next.orly.dev
rm -rf /tmp/orlytest

14972
stacktrace.txt Normal file

File diff suppressed because it is too large Load Diff