add SimplePool implementation for managing relay connections
- pkg/protocol/ws/pool.go - Added `SimplePool` struct to manage connections to multiple relays. - Introduced associated methods for relay connection, publishing, and subscribing. - Added middleware support for events, duplicates, and queries. - Implemented penalty box for managing failed relay connections. - Provided various options for customizing behavior (e.g. relays, authentication, event handling). - pkg/protocol/ws/subscription.go - Removed unnecessary `ReplaceableKey` struct. - Cleaned up redundant spaces and comments in subscription methods.
This commit is contained in:
852
pkg/protocol/ws/pool.go
Normal file
852
pkg/protocol/ws/pool.go
Normal file
@@ -0,0 +1,852 @@
|
||||
package ws
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
"orly.dev/pkg/encoders/event"
|
||||
"orly.dev/pkg/encoders/filter"
|
||||
"orly.dev/pkg/encoders/filters"
|
||||
"orly.dev/pkg/encoders/kind"
|
||||
"orly.dev/pkg/encoders/timestamp"
|
||||
"orly.dev/pkg/interfaces/signer"
|
||||
"orly.dev/pkg/utils/context"
|
||||
"orly.dev/pkg/utils/log"
|
||||
"orly.dev/pkg/utils/normalize"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/puzpuzpuz/xsync/v3"
|
||||
)
|
||||
|
||||
const (
|
||||
seenAlreadyDropTick = time.Minute
|
||||
)
|
||||
|
||||
// SimplePool manages connections to multiple relays, ensures they are reopened when necessary and not duplicated.
|
||||
type SimplePool struct {
|
||||
Relays *xsync.MapOf[string, *Client]
|
||||
Context context.T
|
||||
|
||||
authHandler signer.I
|
||||
cancel context.C
|
||||
|
||||
eventMiddleware func(RelayEvent)
|
||||
duplicateMiddleware func(relay string, id string)
|
||||
queryMiddleware func(relay string, pubkey []byte, k *kind.T)
|
||||
|
||||
// custom things not often used
|
||||
penaltyBoxMu sync.Mutex
|
||||
penaltyBox map[string][2]float64
|
||||
relayOptions []RelayOption
|
||||
}
|
||||
|
||||
// DirectedFilter combines a Filter with a specific relay URL.
|
||||
type DirectedFilter struct {
|
||||
Filter *filter.F
|
||||
Relay string
|
||||
}
|
||||
|
||||
// RelayEvent represents an event received from a specific relay.
|
||||
type RelayEvent struct {
|
||||
*event.E
|
||||
Relay *Client
|
||||
}
|
||||
|
||||
func (ie RelayEvent) String() string {
|
||||
return fmt.Sprintf(
|
||||
"[%s] >> %s", ie.Relay.URL, ie.E.Serialize(),
|
||||
)
|
||||
}
|
||||
|
||||
// PoolOption is an interface for options that can be applied to a SimplePool.
|
||||
type PoolOption interface {
|
||||
ApplyPoolOption(*SimplePool)
|
||||
}
|
||||
|
||||
// NewSimplePool creates a new SimplePool with the given context and options.
|
||||
func NewSimplePool(ctx context.T, opts ...PoolOption) *SimplePool {
|
||||
ctx, cancel := context.Cause(ctx)
|
||||
|
||||
pool := &SimplePool{
|
||||
Relays: xsync.NewMapOf[string, *Client](),
|
||||
|
||||
Context: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt.ApplyPoolOption(pool)
|
||||
}
|
||||
|
||||
return pool
|
||||
}
|
||||
|
||||
// WithRelayOptions sets options that will be used on every relay instance created by this pool.
|
||||
func WithRelayOptions(ropts ...RelayOption) withRelayOptionsOpt {
|
||||
return ropts
|
||||
}
|
||||
|
||||
type withRelayOptionsOpt []RelayOption
|
||||
|
||||
func (h withRelayOptionsOpt) ApplyPoolOption(pool *SimplePool) {
|
||||
pool.relayOptions = h
|
||||
}
|
||||
|
||||
// WithAuthHandler must be a function that signs the auth event when called.
|
||||
// it will be called whenever any relay in the pool returns a `CLOSED` message
|
||||
// with the "auth-required:" prefix, only once for each relay
|
||||
type WithAuthHandler struct {
|
||||
signer.I
|
||||
}
|
||||
|
||||
func (h *WithAuthHandler) ApplyPoolOption(pool *SimplePool) {
|
||||
pool.authHandler = h.I
|
||||
}
|
||||
|
||||
// WithPenaltyBox just sets the penalty box mechanism so relays that fail to connect
|
||||
// or that disconnect will be ignored for a while and we won't attempt to connect again.
|
||||
func WithPenaltyBox() withPenaltyBoxOpt { return withPenaltyBoxOpt{} }
|
||||
|
||||
type withPenaltyBoxOpt struct{}
|
||||
|
||||
func (h withPenaltyBoxOpt) ApplyPoolOption(pool *SimplePool) {
|
||||
pool.penaltyBox = make(map[string][2]float64)
|
||||
go func() {
|
||||
sleep := 30.0
|
||||
for {
|
||||
time.Sleep(time.Duration(sleep) * time.Second)
|
||||
|
||||
pool.penaltyBoxMu.Lock()
|
||||
nextSleep := 300.0
|
||||
for url, v := range pool.penaltyBox {
|
||||
remainingSeconds := v[1]
|
||||
remainingSeconds -= sleep
|
||||
if remainingSeconds <= 0 {
|
||||
pool.penaltyBox[url] = [2]float64{v[0], 0}
|
||||
continue
|
||||
} else {
|
||||
pool.penaltyBox[url] = [2]float64{v[0], remainingSeconds}
|
||||
}
|
||||
|
||||
if remainingSeconds < nextSleep {
|
||||
nextSleep = remainingSeconds
|
||||
}
|
||||
}
|
||||
|
||||
sleep = nextSleep
|
||||
pool.penaltyBoxMu.Unlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// WithEventMiddleware is a function that will be called with all events received.
|
||||
type WithEventMiddleware func(RelayEvent)
|
||||
|
||||
func (h WithEventMiddleware) ApplyPoolOption(pool *SimplePool) {
|
||||
pool.eventMiddleware = h
|
||||
}
|
||||
|
||||
// WithDuplicateMiddleware is a function that will be called with all duplicate ids received.
|
||||
type WithDuplicateMiddleware func(relay string, id string)
|
||||
|
||||
func (h WithDuplicateMiddleware) ApplyPoolOption(pool *SimplePool) {
|
||||
pool.duplicateMiddleware = h
|
||||
}
|
||||
|
||||
// WithAuthorKindQueryMiddleware is a function that will be called with every combination of relay+pubkey+kind queried
|
||||
// in a .SubMany*() call -- when applicable (i.e. when the query contains a pubkey and a kind).
|
||||
type WithAuthorKindQueryMiddleware func(
|
||||
relay string, pubkey []byte, kind *kind.T,
|
||||
)
|
||||
|
||||
func (h WithAuthorKindQueryMiddleware) ApplyPoolOption(pool *SimplePool) {
|
||||
pool.queryMiddleware = h
|
||||
}
|
||||
|
||||
type WithAuthMiddleware func(RelayEvent) error
|
||||
|
||||
var (
|
||||
_ PoolOption = (*WithAuthHandler)(nil)
|
||||
_ PoolOption = (WithEventMiddleware)(nil)
|
||||
_ PoolOption = WithPenaltyBox()
|
||||
_ PoolOption = WithRelayOptions(WithRequestHeader(http.Header{}))
|
||||
)
|
||||
|
||||
const MAX_LOCKS = 50
|
||||
|
||||
var (
|
||||
namedMutexPool = make([]sync.Mutex, MAX_LOCKS)
|
||||
)
|
||||
|
||||
//go:noescape
|
||||
//go:linkname memhash runtime.memhash
|
||||
func memhash(p unsafe.Pointer, h, s uintptr) uintptr
|
||||
|
||||
func namedLock(name string) (unlock func()) {
|
||||
sptr := unsafe.StringData(name)
|
||||
idx := uint64(
|
||||
memhash(
|
||||
unsafe.Pointer(sptr), 0, uintptr(len(name)),
|
||||
),
|
||||
) % MAX_LOCKS
|
||||
namedMutexPool[idx].Lock()
|
||||
return namedMutexPool[idx].Unlock
|
||||
}
|
||||
|
||||
// EnsureRelay ensures that a relay connection exists and is active.
|
||||
// If the relay is not connected, it attempts to connect.
|
||||
func (pool *SimplePool) EnsureRelay(url string) (*Client, error) {
|
||||
nm := string(normalize.URL(url))
|
||||
defer namedLock(nm)()
|
||||
|
||||
relay, ok := pool.Relays.Load(nm)
|
||||
if ok && relay == nil {
|
||||
if pool.penaltyBox != nil {
|
||||
pool.penaltyBoxMu.Lock()
|
||||
defer pool.penaltyBoxMu.Unlock()
|
||||
v, _ := pool.penaltyBox[nm]
|
||||
if v[1] > 0 {
|
||||
return nil, fmt.Errorf("in penalty box, %fs remaining", v[1])
|
||||
}
|
||||
}
|
||||
} else if ok && relay.IsConnected() {
|
||||
// already connected, unlock and return
|
||||
return relay, nil
|
||||
}
|
||||
|
||||
// try to connect
|
||||
// we use this ctx here so when the pool dies everything dies
|
||||
ctx, cancel := context.TimeoutCause(
|
||||
pool.Context,
|
||||
time.Second*15,
|
||||
errors.New("connecting to the relay took too long"),
|
||||
)
|
||||
defer cancel()
|
||||
|
||||
relay = NewRelay(context.Bg(), url, pool.relayOptions...)
|
||||
if err := relay.Connect(ctx); err != nil {
|
||||
if pool.penaltyBox != nil {
|
||||
// putting relay in penalty box
|
||||
pool.penaltyBoxMu.Lock()
|
||||
defer pool.penaltyBoxMu.Unlock()
|
||||
v, _ := pool.penaltyBox[nm]
|
||||
pool.penaltyBox[nm] = [2]float64{
|
||||
v[0] + 1, 30.0 + math.Pow(2, v[0]+1),
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("failed to connect: %w", err)
|
||||
}
|
||||
|
||||
pool.Relays.Store(nm, relay)
|
||||
return relay, nil
|
||||
}
|
||||
|
||||
// PublishResult represents the result of publishing an event to a relay.
|
||||
type PublishResult struct {
|
||||
Error error
|
||||
RelayURL string
|
||||
Relay *Client
|
||||
}
|
||||
|
||||
// PublishMany publishes an event to multiple relays and returns a channel of results emitted as they're received.
|
||||
func (pool *SimplePool) PublishMany(
|
||||
ctx context.T, urls []string, evt *event.E,
|
||||
) chan PublishResult {
|
||||
ch := make(chan PublishResult, len(urls))
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(urls))
|
||||
go func() {
|
||||
for _, url := range urls {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
relay, err := pool.EnsureRelay(url)
|
||||
if err != nil {
|
||||
ch <- PublishResult{err, url, nil}
|
||||
return
|
||||
}
|
||||
|
||||
if err := relay.Publish(ctx, evt); err == nil {
|
||||
// success with no auth required
|
||||
ch <- PublishResult{nil, url, relay}
|
||||
} else if strings.HasPrefix(
|
||||
err.Error(), "msg: auth-required:",
|
||||
) && pool.authHandler != nil {
|
||||
// try to authenticate if we can
|
||||
if authErr := relay.Auth(
|
||||
ctx, pool.authHandler,
|
||||
); authErr == nil {
|
||||
if err := relay.Publish(ctx, evt); err == nil {
|
||||
// success after auth
|
||||
ch <- PublishResult{nil, url, relay}
|
||||
} else {
|
||||
// failure after auth
|
||||
ch <- PublishResult{err, url, relay}
|
||||
}
|
||||
} else {
|
||||
// failure to auth
|
||||
ch <- PublishResult{
|
||||
fmt.Errorf(
|
||||
"failed to auth: %w", authErr,
|
||||
), url, relay,
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// direct failure
|
||||
ch <- PublishResult{err, url, relay}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
// SubscribeMany opens a subscription with the given filter to multiple relays
|
||||
// the subscriptions ends when the context is canceled or when all relays return a CLOSED.
|
||||
func (pool *SimplePool) SubscribeMany(
|
||||
ctx context.T,
|
||||
urls []string,
|
||||
filter *filter.F,
|
||||
opts ...SubscriptionOption,
|
||||
) chan RelayEvent {
|
||||
return pool.subMany(ctx, urls, filters.New(filter), nil, opts...)
|
||||
}
|
||||
|
||||
// FetchMany opens a subscription, much like SubscribeMany, but it ends as soon as all Relays
|
||||
// return an EOSE message.
|
||||
func (pool *SimplePool) FetchMany(
|
||||
ctx context.T,
|
||||
urls []string,
|
||||
filter *filter.F,
|
||||
opts ...SubscriptionOption,
|
||||
) chan RelayEvent {
|
||||
return pool.SubManyEose(ctx, urls, filters.New(filter), opts...)
|
||||
}
|
||||
|
||||
// Deprecated: SubMany is deprecated: use SubscribeMany instead.
|
||||
func (pool *SimplePool) SubMany(
|
||||
ctx context.T,
|
||||
urls []string,
|
||||
filters *filters.T,
|
||||
opts ...SubscriptionOption,
|
||||
) chan RelayEvent {
|
||||
return pool.subMany(ctx, urls, filters, nil, opts...)
|
||||
}
|
||||
|
||||
// SubscribeManyNotifyEOSE is like SubscribeMany, but takes a channel that is closed when
|
||||
// all subscriptions have received an EOSE
|
||||
func (pool *SimplePool) SubscribeManyNotifyEOSE(
|
||||
ctx context.T,
|
||||
urls []string,
|
||||
filter *filter.F,
|
||||
eoseChan chan struct{},
|
||||
opts ...SubscriptionOption,
|
||||
) chan RelayEvent {
|
||||
return pool.subMany(ctx, urls, filters.New(filter), eoseChan, opts...)
|
||||
}
|
||||
|
||||
type ReplaceableKey struct {
|
||||
PubKey string
|
||||
D string
|
||||
}
|
||||
|
||||
// FetchManyReplaceable is like FetchMany, but deduplicates replaceable and addressable events and returns
|
||||
// only the latest for each "d" tag.
|
||||
func (pool *SimplePool) FetchManyReplaceable(
|
||||
ctx context.T,
|
||||
urls []string,
|
||||
filter *filter.F,
|
||||
opts ...SubscriptionOption,
|
||||
) *xsync.MapOf[ReplaceableKey, *event.E] {
|
||||
ctx, cancel := context.Cause(ctx)
|
||||
|
||||
results := xsync.NewMapOf[ReplaceableKey, *event.E]()
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(urls))
|
||||
|
||||
seenAlreadyLatest := xsync.NewMapOf[ReplaceableKey, *timestamp.T]()
|
||||
opts = append(
|
||||
opts, WithCheckDuplicateReplaceable(
|
||||
func(rk ReplaceableKey, ts *timestamp.T) bool {
|
||||
updated := false
|
||||
seenAlreadyLatest.Compute(
|
||||
rk, func(latest *timestamp.T, _ bool) (
|
||||
newValue *timestamp.T, delete bool,
|
||||
) {
|
||||
if ts.I64() > latest.I64() {
|
||||
updated = true // we are updating the most recent
|
||||
return ts, false
|
||||
}
|
||||
return latest, false // the one we had was already more recent
|
||||
},
|
||||
)
|
||||
return updated
|
||||
},
|
||||
),
|
||||
)
|
||||
|
||||
for _, url := range urls {
|
||||
go func(nm string) {
|
||||
defer wg.Done()
|
||||
|
||||
if mh := pool.queryMiddleware; mh != nil {
|
||||
if filter.Kinds != nil && filter.Authors != nil {
|
||||
for _, kind := range filter.Kinds.K {
|
||||
for _, author := range filter.Authors.ToSliceOfBytes() {
|
||||
mh(nm, author, kind)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
relay, err := pool.EnsureRelay(nm)
|
||||
if err != nil {
|
||||
log.D.F("error connecting to %s with %v: %s", nm, filter, err)
|
||||
return
|
||||
}
|
||||
|
||||
hasAuthed := false
|
||||
|
||||
subscribe:
|
||||
sub, err := relay.Subscribe(ctx, filters.New(filter), opts...)
|
||||
if err != nil {
|
||||
log.D.F(
|
||||
"error subscribing to %s with %v: %s", relay, filter, err,
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-sub.EndOfStoredEvents:
|
||||
return
|
||||
case reason := <-sub.ClosedReason:
|
||||
if strings.HasPrefix(
|
||||
reason, "auth-required:",
|
||||
) && pool.authHandler != nil && !hasAuthed {
|
||||
// relay is requesting auth. if we can we will perform auth and try again
|
||||
err = relay.Auth(ctx, pool.authHandler)
|
||||
if err == nil {
|
||||
hasAuthed = true // so we don't keep doing AUTH again and again
|
||||
goto subscribe
|
||||
}
|
||||
}
|
||||
log.D.F("CLOSED from %s: '%s'\n", nm, reason)
|
||||
return
|
||||
case evt, more := <-sub.Events:
|
||||
if !more {
|
||||
return
|
||||
}
|
||||
|
||||
ie := RelayEvent{E: evt, Relay: relay}
|
||||
if mh := pool.eventMiddleware; mh != nil {
|
||||
mh(ie)
|
||||
}
|
||||
|
||||
results.Store(
|
||||
ReplaceableKey{evt.PubKeyString(), evt.Tags.GetD()},
|
||||
evt,
|
||||
)
|
||||
}
|
||||
}
|
||||
}(string(normalize.URL(url)))
|
||||
}
|
||||
|
||||
// this will happen when all subscriptions get an eose (or when they die)
|
||||
wg.Wait()
|
||||
cancel(errors.New("all subscriptions ended"))
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
func (pool *SimplePool) subMany(
|
||||
ctx context.T,
|
||||
urls []string,
|
||||
filters *filters.T,
|
||||
eoseChan chan struct{},
|
||||
opts ...SubscriptionOption,
|
||||
) chan RelayEvent {
|
||||
ctx, cancel := context.Cause(ctx)
|
||||
_ = cancel // do this so `go vet` will stop complaining
|
||||
events := make(chan RelayEvent)
|
||||
seenAlready := xsync.NewMapOf[string, *timestamp.T]()
|
||||
ticker := time.NewTicker(seenAlreadyDropTick)
|
||||
|
||||
eoseWg := sync.WaitGroup{}
|
||||
eoseWg.Add(len(urls))
|
||||
if eoseChan != nil {
|
||||
go func() {
|
||||
eoseWg.Wait()
|
||||
close(eoseChan)
|
||||
}()
|
||||
}
|
||||
|
||||
pending := xsync.NewCounter()
|
||||
pending.Add(int64(len(urls)))
|
||||
for i, url := range urls {
|
||||
url = string(normalize.URL(url))
|
||||
urls[i] = url
|
||||
if idx := slices.Index(urls, url); idx != i {
|
||||
// skip duplicate relays in the list
|
||||
eoseWg.Done()
|
||||
continue
|
||||
}
|
||||
|
||||
eosed := atomic.Bool{}
|
||||
firstConnection := true
|
||||
|
||||
go func(nm string) {
|
||||
defer func() {
|
||||
pending.Dec()
|
||||
if pending.Value() == 0 {
|
||||
close(events)
|
||||
cancel(fmt.Errorf("aborted: %w", context.GetCause(ctx)))
|
||||
}
|
||||
if eosed.CompareAndSwap(false, true) {
|
||||
eoseWg.Done()
|
||||
}
|
||||
}()
|
||||
|
||||
hasAuthed := false
|
||||
interval := 3 * time.Second
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
var sub *Subscription
|
||||
|
||||
if mh := pool.queryMiddleware; mh != nil {
|
||||
for _, filter := range filters.F {
|
||||
if filter.Kinds != nil && filter.Authors != nil {
|
||||
for _, kind := range filter.Kinds.K {
|
||||
for _, author := range filter.Authors.ToSliceOfBytes() {
|
||||
mh(nm, author, kind)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
relay, err := pool.EnsureRelay(nm)
|
||||
if err != nil {
|
||||
// if we never connected to this just fail
|
||||
if firstConnection {
|
||||
return
|
||||
}
|
||||
|
||||
// otherwise (if we were connected and got disconnected) keep trying to reconnect
|
||||
log.D.F("%s reconnecting because connection failed\n", nm)
|
||||
goto reconnect
|
||||
}
|
||||
firstConnection = false
|
||||
hasAuthed = false
|
||||
|
||||
subscribe:
|
||||
sub, err = relay.Subscribe(
|
||||
ctx, filters, append(
|
||||
opts, WithCheckDuplicate(
|
||||
func(id, relay string) bool {
|
||||
_, exists := seenAlready.Load(id)
|
||||
if exists && pool.duplicateMiddleware != nil {
|
||||
pool.duplicateMiddleware(relay, id)
|
||||
}
|
||||
return exists
|
||||
},
|
||||
),
|
||||
)...,
|
||||
)
|
||||
if err != nil {
|
||||
log.D.F("%s reconnecting because subscription died\n", nm)
|
||||
goto reconnect
|
||||
}
|
||||
|
||||
go func() {
|
||||
<-sub.EndOfStoredEvents
|
||||
|
||||
// guard here otherwise a resubscription will trigger a duplicate call to eoseWg.Done()
|
||||
if eosed.CompareAndSwap(false, true) {
|
||||
eoseWg.Done()
|
||||
}
|
||||
}()
|
||||
|
||||
// reset interval when we get a good subscription
|
||||
interval = 3 * time.Second
|
||||
|
||||
for {
|
||||
select {
|
||||
case evt, more := <-sub.Events:
|
||||
if !more {
|
||||
// this means the connection was closed for weird reasons, like the server shut down
|
||||
// so we will update the filters here to include only events seem from now on
|
||||
// and try to reconnect until we succeed
|
||||
now := timestamp.Now()
|
||||
for i := range filters.F {
|
||||
filters.F[i].Since = now
|
||||
}
|
||||
log.D.F(
|
||||
"%s reconnecting because sub.Events is broken\n",
|
||||
nm,
|
||||
)
|
||||
goto reconnect
|
||||
}
|
||||
|
||||
ie := RelayEvent{E: evt, Relay: relay}
|
||||
if mh := pool.eventMiddleware; mh != nil {
|
||||
mh(ie)
|
||||
}
|
||||
|
||||
select {
|
||||
case events <- ie:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
case <-ticker.C:
|
||||
if eosed.Load() {
|
||||
old := timestamp.New(time.Now().Add(-seenAlreadyDropTick).Unix())
|
||||
for id, value := range seenAlready.Range {
|
||||
if value.I64() < old.I64() {
|
||||
seenAlready.Delete(id)
|
||||
}
|
||||
}
|
||||
}
|
||||
case reason := <-sub.ClosedReason:
|
||||
if strings.HasPrefix(
|
||||
reason, "auth-required:",
|
||||
) && pool.authHandler != nil && !hasAuthed {
|
||||
// relay is requesting auth. if we can we will perform auth and try again
|
||||
err := relay.Auth(
|
||||
ctx, pool.authHandler,
|
||||
)
|
||||
if err == nil {
|
||||
hasAuthed = true // so we don't keep doing AUTH again and again
|
||||
goto subscribe
|
||||
}
|
||||
} else {
|
||||
log.D.F("CLOSED from %s: '%s'\n", nm, reason)
|
||||
}
|
||||
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
reconnect:
|
||||
// we will go back to the beginning of the loop and try to connect again and again
|
||||
// until the context is canceled
|
||||
time.Sleep(interval)
|
||||
interval = interval * 17 / 10 // the next time we try we will wait longer
|
||||
}
|
||||
}(url)
|
||||
}
|
||||
|
||||
return events
|
||||
}
|
||||
|
||||
// Deprecated: SubManyEose is deprecated: use FetchMany instead.
|
||||
func (pool *SimplePool) SubManyEose(
|
||||
ctx context.T,
|
||||
urls []string,
|
||||
filters *filters.T,
|
||||
opts ...SubscriptionOption,
|
||||
) chan RelayEvent {
|
||||
seenAlready := xsync.NewMapOf[string, struct{}]()
|
||||
return pool.subManyEoseNonOverwriteCheckDuplicate(
|
||||
ctx, urls, filters,
|
||||
WithCheckDuplicate(
|
||||
func(id, relay string) bool {
|
||||
_, exists := seenAlready.LoadOrStore(id, struct{}{})
|
||||
if exists && pool.duplicateMiddleware != nil {
|
||||
pool.duplicateMiddleware(relay, id)
|
||||
}
|
||||
return exists
|
||||
},
|
||||
),
|
||||
opts...,
|
||||
)
|
||||
}
|
||||
|
||||
func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate(
|
||||
ctx context.T,
|
||||
urls []string,
|
||||
filters *filters.T,
|
||||
wcd WithCheckDuplicate,
|
||||
opts ...SubscriptionOption,
|
||||
) chan RelayEvent {
|
||||
ctx, cancel := context.Cause(ctx)
|
||||
|
||||
events := make(chan RelayEvent)
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(urls))
|
||||
|
||||
opts = append(opts, wcd)
|
||||
|
||||
go func() {
|
||||
// this will happen when all subscriptions get an eose (or when they die)
|
||||
wg.Wait()
|
||||
cancel(errors.New("all subscriptions ended"))
|
||||
close(events)
|
||||
}()
|
||||
|
||||
for _, url := range urls {
|
||||
go func(nm string) {
|
||||
defer wg.Done()
|
||||
|
||||
if mh := pool.queryMiddleware; mh != nil {
|
||||
for _, filter := range filters.F {
|
||||
if filter.Kinds != nil && filter.Authors != nil {
|
||||
for _, kind := range filter.Kinds.K {
|
||||
for _, author := range filter.Authors.ToSliceOfBytes() {
|
||||
mh(nm, author, kind)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
relay, err := pool.EnsureRelay(nm)
|
||||
if err != nil {
|
||||
log.D.F(
|
||||
"error connecting to %s with %v: %s", nm, filters, err,
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
hasAuthed := false
|
||||
|
||||
subscribe:
|
||||
sub, err := relay.Subscribe(ctx, filters, opts...)
|
||||
if err != nil {
|
||||
log.D.F(
|
||||
"error subscribing to %s with %v: %s", relay, filters, err,
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-sub.EndOfStoredEvents:
|
||||
return
|
||||
case reason := <-sub.ClosedReason:
|
||||
if strings.HasPrefix(
|
||||
reason, "auth-required:",
|
||||
) && pool.authHandler != nil && !hasAuthed {
|
||||
// relay is requesting auth. if we can we will perform auth and try again
|
||||
err := relay.Auth(
|
||||
ctx, pool.authHandler,
|
||||
)
|
||||
if err == nil {
|
||||
hasAuthed = true // so we don't keep doing AUTH again and again
|
||||
goto subscribe
|
||||
}
|
||||
}
|
||||
log.D.F("CLOSED from %s: '%s'\n", nm, reason)
|
||||
return
|
||||
case evt, more := <-sub.Events:
|
||||
if !more {
|
||||
return
|
||||
}
|
||||
|
||||
ie := RelayEvent{E: evt, Relay: relay}
|
||||
if mh := pool.eventMiddleware; mh != nil {
|
||||
mh(ie)
|
||||
}
|
||||
|
||||
select {
|
||||
case events <- ie:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}(string(normalize.URL(url)))
|
||||
}
|
||||
|
||||
return events
|
||||
}
|
||||
|
||||
// QuerySingle returns the first event returned by the first relay, cancels everything else.
|
||||
func (pool *SimplePool) QuerySingle(
|
||||
ctx context.T,
|
||||
urls []string,
|
||||
filter *filter.F,
|
||||
opts ...SubscriptionOption,
|
||||
) *RelayEvent {
|
||||
ctx, cancel := context.Cause(ctx)
|
||||
for ievt := range pool.SubManyEose(
|
||||
ctx, urls, filters.New(filter), opts...,
|
||||
) {
|
||||
cancel(errors.New("got the first event and ended successfully"))
|
||||
return &ievt
|
||||
}
|
||||
cancel(errors.New("SubManyEose() didn't get yield events"))
|
||||
return nil
|
||||
}
|
||||
|
||||
// BatchedSubManyEose performs batched subscriptions to multiple relays with different filters.
|
||||
func (pool *SimplePool) BatchedSubManyEose(
|
||||
ctx context.T,
|
||||
dfs []DirectedFilter,
|
||||
opts ...SubscriptionOption,
|
||||
) chan RelayEvent {
|
||||
res := make(chan RelayEvent)
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(dfs))
|
||||
seenAlready := xsync.NewMapOf[string, struct{}]()
|
||||
|
||||
for _, df := range dfs {
|
||||
go func(df DirectedFilter) {
|
||||
for ie := range pool.subManyEoseNonOverwriteCheckDuplicate(
|
||||
ctx,
|
||||
[]string{df.Relay},
|
||||
filters.New(df.Filter),
|
||||
func(id, relay string) bool {
|
||||
_, exists := seenAlready.LoadOrStore(id, struct{}{})
|
||||
if exists && pool.duplicateMiddleware != nil {
|
||||
pool.duplicateMiddleware(relay, id)
|
||||
}
|
||||
return exists
|
||||
}, opts...,
|
||||
) {
|
||||
select {
|
||||
case res <- ie:
|
||||
case <-ctx.Done():
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
}(df)
|
||||
}
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(res)
|
||||
}()
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
// Close closes the pool with the given reason.
|
||||
func (pool *SimplePool) Close(reason string) {
|
||||
pool.cancel(fmt.Errorf("pool closed with reason: '%s'", reason))
|
||||
}
|
||||
@@ -14,11 +14,6 @@ import (
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
type ReplaceableKey struct {
|
||||
PubKey string
|
||||
D string
|
||||
}
|
||||
|
||||
// Subscription represents a subscription to a relay.
|
||||
type Subscription struct {
|
||||
counter int64
|
||||
@@ -88,10 +83,8 @@ var (
|
||||
|
||||
func (sub *Subscription) start() {
|
||||
<-sub.Context.Done()
|
||||
|
||||
// the subscription ends once the context is canceled (if not already)
|
||||
sub.unsub(errors.New("context done on start()")) // this will set sub.live to false
|
||||
|
||||
// do this so we don't have the possibility of closing the Events channel and then trying to send to it
|
||||
sub.mu.Lock()
|
||||
close(sub.Events)
|
||||
@@ -107,7 +100,6 @@ func (sub *Subscription) dispatchEvent(evt *event.E) {
|
||||
sub.storedwg.Add(1)
|
||||
added = true
|
||||
}
|
||||
|
||||
go func() {
|
||||
sub.mu.Lock()
|
||||
defer sub.mu.Unlock()
|
||||
@@ -118,7 +110,6 @@ func (sub *Subscription) dispatchEvent(evt *event.E) {
|
||||
case <-sub.Context.Done():
|
||||
}
|
||||
}
|
||||
|
||||
if added {
|
||||
sub.storedwg.Done()
|
||||
}
|
||||
@@ -154,12 +145,10 @@ func (sub *Subscription) Unsub() {
|
||||
func (sub *Subscription) unsub(err error) {
|
||||
// cancel the context (if it's not canceled already)
|
||||
sub.cancel(err)
|
||||
|
||||
// mark subscription as closed and send a CLOSE to the relay (naïve sync.Once implementation)
|
||||
if sub.live.CompareAndSwap(true, false) {
|
||||
sub.Close()
|
||||
}
|
||||
|
||||
// remove subscription from our map
|
||||
sub.Client.Subscriptions.Delete(sub.id.String())
|
||||
}
|
||||
@@ -190,6 +179,5 @@ func (sub *Subscription) Fire() (err error) {
|
||||
sub.cancel(err)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user