- pkg/database/save-event.go - Added `noVerify` parameter to `SaveEvent` function - Added check for existing event using `GetSerialById` when `noVerify` is false - Modified logic to handle event verification based on `noVerify` flag - pkg/app/relay/server-publish.go - Added `false` as third argument to `SaveEvent` calls - pkg/database/export_test.go - Added `false` as third argument to `SaveEvent` call - pkg/database/query-for-tags_test.go - Added `false` as third argument to `SaveEvent` call - pkg/database/query-for-kinds-authors_test.go - Added `false` as third argument to `SaveEvent` call - pkg/database/query-for-kinds-tags_test.go - Added `false` as third argument to `SaveEvent` call - pkg/database/query-for-serials_test.go - Added `false` as third argument to `SaveEvent` call - main.go - Modified pprof handling to support different profiling types (cpu, memory, allocation) - Changed `Pprof` configuration from boolean to string with enum values - pkg/app/config/config.go - Changed `Pprof` field type from `bool` to `string` with enum values - pkg/database/query-for-kinds-authors-tags_test.go - Added `false` as third argument to `SaveEvent` call - pkg/version/version - Bumped version from v0.2.12 to v0.2.13 - pkg/database/fetch-event-by-serial_test.go - Added `false` as third argument to `SaveEvent` call - pkg/database/query-for-kinds_test.go - Added `false` as third argument to `SaveEvent` call - pkg/database/get-serials-by-range_test.go - Added `false` as third argument to `SaveEvent` call - pkg/database/query-events-multiple-param-replaceable_test.go - Added `false` as third argument to `SaveEvent` calls - pkg/database/query-events_test.go - Added `false` as third argument to `SaveEvent` calls - pkg/interfaces/store/store_interface.go - Updated `Saver` interface to include `noVerify` parameter in `SaveEvent` method - Added `SerialByIder` interface with `GetSerialById` method - pkg/database/save-event_test.go - Added `false` as third argument to `SaveEvent` calls - Added new test case for saving existing event - pkg/database/query-for-ids_test.go - Added `false` as third argument to `SaveEvent` call - pkg/protocol/ws/client.go - Changed comment about context cancellation from "context is canceled" to "context is cancelled" - pkg/app/relay/spider-fetch.go - Added signature checker for WebSocket connections - Modified logic to check for existing events before saving - Added logging and memory optimization improvements
588 lines
16 KiB
Go
588 lines
16 KiB
Go
package ws
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/tls"
|
|
"net/http"
|
|
"orly.dev/pkg/encoders/envelopes"
|
|
"orly.dev/pkg/encoders/envelopes/authenvelope"
|
|
"orly.dev/pkg/encoders/envelopes/closedenvelope"
|
|
"orly.dev/pkg/encoders/envelopes/countenvelope"
|
|
"orly.dev/pkg/encoders/envelopes/eoseenvelope"
|
|
"orly.dev/pkg/encoders/envelopes/eventenvelope"
|
|
"orly.dev/pkg/encoders/envelopes/noticeenvelope"
|
|
"orly.dev/pkg/encoders/envelopes/okenvelope"
|
|
"orly.dev/pkg/encoders/event"
|
|
"orly.dev/pkg/encoders/filter"
|
|
"orly.dev/pkg/encoders/filters"
|
|
"orly.dev/pkg/encoders/kind"
|
|
"orly.dev/pkg/interfaces/signer"
|
|
"orly.dev/pkg/protocol/auth"
|
|
"orly.dev/pkg/utils/atomic"
|
|
"orly.dev/pkg/utils/chk"
|
|
"orly.dev/pkg/utils/context"
|
|
"orly.dev/pkg/utils/errorf"
|
|
"orly.dev/pkg/utils/log"
|
|
"orly.dev/pkg/utils/normalize"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gobwas/ws"
|
|
"github.com/gobwas/ws/wsutil"
|
|
"github.com/puzpuzpuz/xsync/v3"
|
|
)
|
|
|
|
var subscriptionIDCounter atomic.Int32
|
|
|
|
type Client struct {
|
|
closeMutex sync.Mutex
|
|
|
|
URL string
|
|
|
|
RequestHeader http.Header // e.g. for origin header
|
|
|
|
Connection *Connection
|
|
|
|
Subscriptions *xsync.MapOf[string, *Subscription]
|
|
|
|
ConnectionError error
|
|
|
|
connectionContext context.T // will be canceled when the connection closes
|
|
|
|
connectionContextCancel context.F
|
|
|
|
challenge []byte // NIP-42 challenge, we only keep the last
|
|
|
|
notices chan []byte // NIP-01 NOTICEs
|
|
|
|
okCallbacks *xsync.MapOf[string, func(bool, string)]
|
|
|
|
writeQueue chan writeRequest
|
|
|
|
subscriptionChannelCloseQueue chan *Subscription
|
|
|
|
signatureChecker func(*event.E) bool
|
|
|
|
AssumeValid bool // this will skip verifying signatures for events received from this relay
|
|
}
|
|
|
|
type writeRequest struct {
|
|
msg []byte
|
|
answer chan error
|
|
}
|
|
|
|
// NewRelay returns a new relay. The relay connection will be closed when the
|
|
// context is cancelled.
|
|
func NewRelay(c context.T, url string, opts ...RelayOption) *Client {
|
|
ctx, cancel := context.Cancel(c)
|
|
r := &Client{
|
|
URL: string(normalize.URL([]byte(url))),
|
|
connectionContext: ctx,
|
|
connectionContextCancel: cancel,
|
|
Subscriptions: xsync.NewMapOf[string, *Subscription](),
|
|
okCallbacks: xsync.NewMapOf[string, func(
|
|
bool, string,
|
|
)](),
|
|
writeQueue: make(chan writeRequest),
|
|
subscriptionChannelCloseQueue: make(chan *Subscription),
|
|
signatureChecker: func(e *event.E) bool { ok, _ := e.Verify(); return ok },
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt.ApplyRelayOption(r)
|
|
}
|
|
|
|
return r
|
|
}
|
|
|
|
// RelayConnect returns a relay object connected to url. Once successfully
|
|
// connected, cancelling ctx has no effect. To close the connection, call
|
|
// r.Close().
|
|
func RelayConnect(ctx context.T, url string, opts ...RelayOption) (
|
|
*Client, error,
|
|
) {
|
|
r := NewRelay(context.Bg(), url, opts...)
|
|
err := r.Connect(ctx)
|
|
return r, err
|
|
}
|
|
|
|
// RelayOption is the type of the argument passed for that.
|
|
type RelayOption interface {
|
|
ApplyRelayOption(*Client)
|
|
}
|
|
|
|
var (
|
|
_ RelayOption = (WithNoticeHandler)(nil)
|
|
_ RelayOption = (WithSignatureChecker)(nil)
|
|
)
|
|
|
|
// WithNoticeHandler just takes notices and is expected to do something with
|
|
// them. when not given, defaults to logging the notices.
|
|
type WithNoticeHandler func(notice []byte)
|
|
|
|
func (nh WithNoticeHandler) ApplyRelayOption(r *Client) {
|
|
r.notices = make(chan []byte)
|
|
go func() {
|
|
for notice := range r.notices {
|
|
nh(notice)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// WithSignatureChecker must be a function that checks the signature of an event
|
|
// and returns true or false.
|
|
type WithSignatureChecker func(*event.E) bool
|
|
|
|
func (sc WithSignatureChecker) ApplyRelayOption(r *Client) {
|
|
r.signatureChecker = sc
|
|
}
|
|
|
|
// String just returns the relay URL.
|
|
func (r *Client) String() string {
|
|
return r.URL
|
|
}
|
|
|
|
// Context retrieves the context that is associated with this relay connection.
|
|
func (r *Client) Context() context.T { return r.connectionContext }
|
|
|
|
// IsConnected returns true if the connection to this relay seems to be active.
|
|
func (r *Client) IsConnected() bool { return r.connectionContext.Err() == nil }
|
|
|
|
// Connect tries to establish a websocket connection to r.URL. If the context
|
|
// expires before the connection is complete, an error is returned. Once
|
|
// successfully connected, context expiration has no effect: call r.Close to
|
|
// close the connection.
|
|
//
|
|
// The underlying relay connection will use a background context. If you want to
|
|
// pass a custom context to the underlying relay connection, use NewRelay() and
|
|
// then Client.Connect().
|
|
func (r *Client) Connect(c context.T) error { return r.ConnectWithTLS(c, nil) }
|
|
|
|
// ConnectWithTLS tries to establish a secured websocket connection to r.URL
|
|
// using customized tls.Config (CA's, etc.).
|
|
func (r *Client) ConnectWithTLS(ctx context.T, tlsConfig *tls.Config) error {
|
|
if r.connectionContext == nil || r.Subscriptions == nil {
|
|
return errorf.E("relay must be initialized with a call to NewRelay()")
|
|
}
|
|
if r.URL == "" {
|
|
return errorf.E("invalid relay URL '%s'", r.URL)
|
|
}
|
|
if _, ok := ctx.Deadline(); !ok {
|
|
// if no timeout is set, force it to 7 seconds
|
|
var cancel context.F
|
|
ctx, cancel = context.Timeout(ctx, 7*time.Second)
|
|
defer cancel()
|
|
}
|
|
conn, err := NewConnection(ctx, r.URL, r.RequestHeader, tlsConfig)
|
|
if err != nil {
|
|
return errorf.E(
|
|
"error opening websocket to '%s': %s", r.URL, err.Error(),
|
|
)
|
|
}
|
|
r.Connection = conn
|
|
// ping every 29 seconds (??)
|
|
ticker := time.NewTicker(29 * time.Second)
|
|
// to be used when the connection is closed
|
|
go func() {
|
|
<-r.connectionContext.Done()
|
|
// close these things when the connection is closed
|
|
if r.notices != nil {
|
|
close(r.notices)
|
|
}
|
|
// stop the ticker
|
|
ticker.Stop()
|
|
// close all subscriptions
|
|
r.Subscriptions.Range(
|
|
func(_ string, sub *Subscription) bool {
|
|
go sub.Unsub()
|
|
return true
|
|
},
|
|
)
|
|
}()
|
|
// queue all write operations here so we don't do mutex spaghetti
|
|
go func() {
|
|
var err error
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
err = wsutil.WriteClientMessage(
|
|
r.Connection.conn, ws.OpPing, nil,
|
|
)
|
|
if err != nil {
|
|
log.D.F(
|
|
"{%s} error writing ping: %v; closing websocket", r.URL,
|
|
err,
|
|
)
|
|
r.Close() // this should trigger a context cancelation
|
|
return
|
|
}
|
|
case writeReq := <-r.writeQueue:
|
|
// all write requests will go through this to prevent races
|
|
if err = r.Connection.WriteMessage(
|
|
r.connectionContext,
|
|
writeReq.msg,
|
|
); chk.T(err) {
|
|
writeReq.answer <- err
|
|
}
|
|
close(writeReq.answer)
|
|
case <-r.connectionContext.Done():
|
|
// stop here
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
// general message reader loop
|
|
go func() {
|
|
buf := new(bytes.Buffer)
|
|
var err error
|
|
for {
|
|
buf.Reset()
|
|
if err = conn.ReadMessage(r.connectionContext, buf); err != nil {
|
|
r.ConnectionError = err
|
|
r.Close()
|
|
break
|
|
}
|
|
message := buf.Bytes()
|
|
// log.D.F("{%s} %s\n", r.URL, message)
|
|
|
|
var t string
|
|
if t, message, err = envelopes.Identify(message); chk.E(err) {
|
|
continue
|
|
}
|
|
switch t {
|
|
case noticeenvelope.L:
|
|
env := noticeenvelope.New()
|
|
if env, message, err = noticeenvelope.Parse(message); chk.E(err) {
|
|
continue
|
|
}
|
|
// see WithNoticeHandler
|
|
if r.notices != nil {
|
|
r.notices <- env.Message
|
|
} else {
|
|
log.E.F("NOTICE from %s: '%s'\n", r.URL, env.Message)
|
|
}
|
|
case authenvelope.L:
|
|
env := authenvelope.NewChallenge()
|
|
if env, message, err = authenvelope.ParseChallenge(message); chk.E(err) {
|
|
continue
|
|
}
|
|
if len(env.Challenge) == 0 {
|
|
continue
|
|
}
|
|
r.challenge = env.Challenge
|
|
case eventenvelope.L:
|
|
// log.I.F("message: %s", message)
|
|
env := eventenvelope.NewResult()
|
|
if env, message, err = eventenvelope.ParseResult(message); err != nil {
|
|
continue
|
|
}
|
|
// log.I.F("%s", env.Event.Marshal(nil))
|
|
if len(env.Subscription.T) == 0 {
|
|
continue
|
|
}
|
|
if sub, ok := r.Subscriptions.Load(env.Subscription.String()); !ok {
|
|
// log.D.F(
|
|
// "{%s} no subscription with id '%s'\n", r.URL,
|
|
// env.Subscription,
|
|
// )
|
|
continue
|
|
} else {
|
|
// check if the event matches the desired filter, ignore
|
|
// otherwise
|
|
if !sub.Filters.Match(env.Event) {
|
|
log.D.F(
|
|
"{%s} filter does not match: %v ~ %v\n", r.URL,
|
|
sub.Filters, env.Event,
|
|
)
|
|
continue
|
|
}
|
|
// check signature, ignore invalid, except from trusted
|
|
// (AssumeValid) relays
|
|
if !r.AssumeValid {
|
|
if ok = r.signatureChecker(env.Event); !ok {
|
|
log.E.F(
|
|
"{%s} bad signature on %s\n", r.URL,
|
|
env.Event.Id,
|
|
)
|
|
continue
|
|
}
|
|
}
|
|
// dispatch this to the internal .events channel of the
|
|
// subscription
|
|
sub.dispatchEvent(env.Event)
|
|
}
|
|
case eoseenvelope.L:
|
|
env := eoseenvelope.New()
|
|
if env, message, err = eoseenvelope.Parse(message); chk.E(err) {
|
|
continue
|
|
}
|
|
if subscription, ok := r.Subscriptions.Load(env.Subscription.String()); ok {
|
|
subscription.dispatchEose()
|
|
}
|
|
case closedenvelope.L:
|
|
env := closedenvelope.New()
|
|
if env, message, err = closedenvelope.Parse(message); chk.E(err) {
|
|
continue
|
|
}
|
|
if subscription, ok := r.Subscriptions.Load(env.Subscription.String()); ok {
|
|
subscription.dispatchClosed(env.ReasonString())
|
|
}
|
|
case countenvelope.L:
|
|
env := countenvelope.NewResponse()
|
|
if env, message, err = countenvelope.Parse(message); chk.E(err) {
|
|
continue
|
|
}
|
|
if subscription, ok := r.Subscriptions.Load(env.ID.String()); ok && subscription.countResult != nil {
|
|
subscription.countResult <- env.Count
|
|
}
|
|
case okenvelope.L:
|
|
env := okenvelope.New()
|
|
if env, message, err = okenvelope.Parse(message); chk.E(err) {
|
|
continue
|
|
}
|
|
if okCallback, exist := r.okCallbacks.Load(env.EventID.String()); exist {
|
|
okCallback(env.OK, env.ReasonString())
|
|
} else {
|
|
log.I.F(
|
|
"{%s} got an unexpected OK message for event %s", r.URL,
|
|
env.EventID,
|
|
)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
// Write queues a message to be sent to the relay.
|
|
func (r *Client) Write(msg []byte) <-chan error {
|
|
ch := make(chan error)
|
|
select {
|
|
case r.writeQueue <- writeRequest{msg: msg, answer: ch}:
|
|
case <-r.connectionContext.Done():
|
|
go func() { ch <- errorf.E("connection closed") }()
|
|
}
|
|
return ch
|
|
}
|
|
|
|
// Publish sends an "EVENT" command to the relay r as in NIP-01 and waits for an
|
|
// OK response.
|
|
func (r *Client) Publish(c context.T, ev *event.E) error {
|
|
return r.publish(
|
|
c, ev,
|
|
)
|
|
}
|
|
|
|
// Auth sends an "AUTH" command client->relay as in NIP-42 and waits for an OK
|
|
// response.
|
|
func (r *Client) Auth(c context.T, sign signer.I) error {
|
|
authEvent := auth.CreateUnsigned(sign.Pub(), r.challenge, r.URL)
|
|
if err := authEvent.Sign(sign); chk.T(err) {
|
|
return errorf.E("error signing auth event: %w", err)
|
|
}
|
|
return r.publish(c, authEvent)
|
|
}
|
|
|
|
// publish can be used both for EVENT and for AUTH
|
|
func (r *Client) publish(ctx context.T, ev *event.E) (err error) {
|
|
var cancel context.F
|
|
if _, ok := ctx.Deadline(); !ok {
|
|
// if no timeout is set, force it to 7 seconds
|
|
ctx, cancel = context.TimeoutCause(
|
|
ctx, 7*time.Second,
|
|
errorf.E("given up waiting for an OK"),
|
|
)
|
|
defer cancel()
|
|
} else {
|
|
// otherwise make the context cancellable so we can stop everything upon
|
|
// receiving an "OK"
|
|
ctx, cancel = context.Cancel(ctx)
|
|
defer cancel()
|
|
}
|
|
// listen for an OK callback
|
|
gotOk := false
|
|
id := ev.IdString()
|
|
r.okCallbacks.Store(
|
|
id, func(ok bool, reason string) {
|
|
gotOk = true
|
|
if !ok {
|
|
err = errorf.E("msg: %s", reason)
|
|
}
|
|
cancel()
|
|
},
|
|
)
|
|
defer r.okCallbacks.Delete(id)
|
|
// publish event
|
|
var b []byte
|
|
if ev.Kind.Equal(kind.ClientAuthentication) {
|
|
if b = authenvelope.NewResponseWith(ev).Marshal(b); chk.E(err) {
|
|
return
|
|
}
|
|
} else {
|
|
if b = eventenvelope.NewSubmissionWith(ev).Marshal(b); chk.E(err) {
|
|
return
|
|
}
|
|
}
|
|
log.T.F("{%s} sending %s\n", r.URL, b)
|
|
if err = <-r.Write(b); chk.T(err) {
|
|
return err
|
|
}
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
// this will be called when we get an OK or when the context has
|
|
// been canceled
|
|
if gotOk {
|
|
return err
|
|
}
|
|
return ctx.Err()
|
|
case <-r.connectionContext.Done():
|
|
// this is caused when we lose connectivity
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Subscribe sends a "REQ" command to the relay r as in NIP-01. Events are
|
|
// returned through the channel sub.Events. The subscription is closed when
|
|
// context ctx is cancelled ("CLOSE" in NIP-01).
|
|
//
|
|
// Remember to cancel subscriptions, either by calling `.Unsub()` on them or
|
|
// ensuring their `context.Context` will be canceled at some point. Failure to
|
|
// do that will result in a huge number of halted goroutines being created.
|
|
func (r *Client) Subscribe(
|
|
c context.T, ff *filters.T,
|
|
opts ...SubscriptionOption,
|
|
) (*Subscription, error) {
|
|
sub := r.PrepareSubscription(c, ff, opts...)
|
|
if r.Connection == nil {
|
|
return nil, errorf.E("not connected to %s", r.URL)
|
|
}
|
|
if err := sub.Fire(); chk.T(err) {
|
|
return nil, errorf.E(
|
|
"couldn't subscribe to %v at %s: %w", ff, r.URL, err,
|
|
)
|
|
}
|
|
return sub, nil
|
|
}
|
|
|
|
// PrepareSubscription creates a subscription, but doesn't fire it.
|
|
//
|
|
// Remember to cancel subscriptions, either by calling `.Unsub()` on them or
|
|
// ensuring their `context.Context` will be canceled at some point. Failure to
|
|
// do that will result in a huge number of halted goroutines being created.
|
|
func (r *Client) PrepareSubscription(
|
|
c context.T, ff *filters.T,
|
|
opts ...SubscriptionOption,
|
|
) *Subscription {
|
|
current := subscriptionIDCounter.Add(1)
|
|
c, cancel := context.Cancel(c)
|
|
sub := &Subscription{
|
|
Relay: r,
|
|
Context: c,
|
|
cancel: cancel,
|
|
counter: int(current),
|
|
Events: make(event.C),
|
|
EndOfStoredEvents: make(chan struct{}, 1),
|
|
ClosedReason: make(chan string, 1),
|
|
Filters: ff,
|
|
}
|
|
for _, opt := range opts {
|
|
switch o := opt.(type) {
|
|
case WithLabel:
|
|
sub.label = string(o)
|
|
}
|
|
}
|
|
id := sub.GetID()
|
|
r.Subscriptions.Store(id.String(), sub)
|
|
// start handling events, eose, unsub etc:
|
|
go sub.start()
|
|
return sub
|
|
}
|
|
|
|
// QuerySync is only used in tests. The relay query method is synchronous now
|
|
// anyway (it ensures sort order is respected).
|
|
func (r *Client) QuerySync(
|
|
ctx context.T, f *filter.F,
|
|
opts ...SubscriptionOption,
|
|
) ([]*event.E, error) {
|
|
// log.T.F("QuerySync:\n%s", f.Marshal(nil))
|
|
sub, err := r.Subscribe(ctx, filters.New(f), opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
defer sub.Unsub()
|
|
|
|
if _, ok := ctx.Deadline(); !ok {
|
|
// if no timeout is set, force it to 7 seconds
|
|
var cancel context.F
|
|
ctx, cancel = context.Timeout(ctx, 7*time.Second)
|
|
defer cancel()
|
|
}
|
|
|
|
var events []*event.E
|
|
for {
|
|
select {
|
|
case evt := <-sub.Events:
|
|
if evt == nil {
|
|
// channel is closed
|
|
return events, nil
|
|
}
|
|
events = append(events, evt)
|
|
case <-sub.EndOfStoredEvents:
|
|
return events, nil
|
|
case <-ctx.Done():
|
|
return events, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// TODO: count is a dumb idea anyway, and nothing is using this
|
|
// func (r *Client) Count(c context.F, ff *filters.F, opts ...SubscriptionOption) (int, error) {
|
|
// sub := r.PrepareSubscription(c, ff, opts...)
|
|
// sub.countResult = make(chan int)
|
|
//
|
|
// if err := sub.Fire(); chk.F(err) {
|
|
// return 0, err
|
|
// }
|
|
//
|
|
// defer sub.Unsub()
|
|
//
|
|
// if _, ok := c.Deadline(); !ok {
|
|
// // if no timeout is set, force it to 7 seconds
|
|
// var cancel context.F
|
|
// c, cancel = context.Timeout(c, 7*time.Second)
|
|
// defer cancel()
|
|
// }
|
|
//
|
|
// for {
|
|
// select {
|
|
// case count := <-sub.countResult:
|
|
// return count, nil
|
|
// case <-c.Done():
|
|
// return 0, c.Err()
|
|
// }
|
|
// }
|
|
// }
|
|
|
|
// Close shuts down a websocket client connection.
|
|
func (r *Client) Close() error {
|
|
r.closeMutex.Lock()
|
|
defer r.closeMutex.Unlock()
|
|
if r.connectionContextCancel == nil {
|
|
return errorf.E("relay already closed")
|
|
}
|
|
r.connectionContextCancel()
|
|
r.connectionContextCancel = nil
|
|
if r.Connection == nil {
|
|
return errorf.E("relay not connected")
|
|
}
|
|
err := r.Connection.Close()
|
|
r.Connection = nil
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|