completed bunker client and server
not using the dynamic signer because it looks buggy
This commit is contained in:
307
bunker/client.go
Normal file
307
bunker/client.go
Normal file
@@ -0,0 +1,307 @@
|
||||
package bunker
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"math/rand"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
||||
"github.com/puzpuzpuz/xsync/v3"
|
||||
|
||||
"relay.mleku.dev/atomic"
|
||||
"relay.mleku.dev/chk"
|
||||
"relay.mleku.dev/context"
|
||||
"relay.mleku.dev/encryption"
|
||||
"relay.mleku.dev/errorf"
|
||||
"relay.mleku.dev/event"
|
||||
"relay.mleku.dev/filter"
|
||||
"relay.mleku.dev/filters"
|
||||
"relay.mleku.dev/hex"
|
||||
"relay.mleku.dev/keys"
|
||||
"relay.mleku.dev/kind"
|
||||
"relay.mleku.dev/kinds"
|
||||
"relay.mleku.dev/p256k"
|
||||
"relay.mleku.dev/signer"
|
||||
"relay.mleku.dev/tag"
|
||||
"relay.mleku.dev/tags"
|
||||
"relay.mleku.dev/timestamp"
|
||||
"relay.mleku.dev/ws"
|
||||
)
|
||||
|
||||
type BunkerClient struct {
|
||||
serial atomic.Uint64
|
||||
clientSecretKey signer.I
|
||||
target []byte
|
||||
pool *ws.Pool
|
||||
relays []string
|
||||
conversationKey []byte // nip44
|
||||
listeners *xsync.MapOf[string, chan Response]
|
||||
expectingAuth *xsync.MapOf[string, struct{}]
|
||||
idPrefix string
|
||||
onAuth func(string)
|
||||
// memoized
|
||||
getPublicKeyResponse string
|
||||
// SkipSignatureCheck can be set if you don't want to double-check incoming signatures
|
||||
SkipSignatureCheck bool
|
||||
}
|
||||
|
||||
// ConnectBunker establishes an RPC connection to a NIP-46 signer using the relays and secret provided in the bunkerURL.
|
||||
// pool can be passed to reuse an existing pool, otherwise a new pool will be created.
|
||||
func ConnectBunker(
|
||||
ctx context.T,
|
||||
clientSecretKey signer.I,
|
||||
bunkerURLOrNIP05 string,
|
||||
pool *ws.Pool,
|
||||
onAuth func(string),
|
||||
) (client *BunkerClient, err error) {
|
||||
var parsed *url.URL
|
||||
if parsed, err = url.Parse(bunkerURLOrNIP05); chk.E(err) {
|
||||
return
|
||||
}
|
||||
// assume it's a bunker url (will fail later if not)
|
||||
secret := parsed.Query().Get("secret")
|
||||
relays := parsed.Query()["relay"]
|
||||
targetPublicKey := parsed.Host
|
||||
if parsed.Scheme == "" {
|
||||
// could be a NIP-05
|
||||
var pubkey string
|
||||
var relays_ []string
|
||||
if pubkey, relays_, err = queryWellKnownNostrJson(ctx, bunkerURLOrNIP05); chk.E(err) {
|
||||
return
|
||||
}
|
||||
targetPublicKey = pubkey
|
||||
relays = relays_
|
||||
} else if parsed.Scheme == "bunker" {
|
||||
// this is what we were expecting, so just move on
|
||||
} else {
|
||||
// otherwise fail here
|
||||
err = errorf.E("wrong scheme '%s', must be bunker://", parsed.Scheme)
|
||||
return
|
||||
}
|
||||
if !keys.IsValidPublicKey(targetPublicKey) {
|
||||
err = errorf.E("'%s' is not a valid public key hex", targetPublicKey)
|
||||
return
|
||||
}
|
||||
var targetPubkey, sec []byte
|
||||
if targetPubkey, err = keys.HexPubkeyToBytes(targetPublicKey); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if sec, err = hex.Dec(secret); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if client, err = NewBunker(
|
||||
ctx,
|
||||
clientSecretKey,
|
||||
targetPubkey,
|
||||
relays,
|
||||
pool,
|
||||
onAuth,
|
||||
); chk.E(err) {
|
||||
return
|
||||
}
|
||||
_, err = client.RPC(ctx, "connect", [][]byte{targetPubkey, sec})
|
||||
return
|
||||
}
|
||||
|
||||
func NewBunker(
|
||||
ctx context.T,
|
||||
clientSecretKey signer.I,
|
||||
targetPublicKey []byte,
|
||||
relays []string,
|
||||
pool *ws.Pool,
|
||||
onAuth func(string),
|
||||
) (client *BunkerClient, err error) {
|
||||
if pool == nil {
|
||||
pool = ws.NewPool(ctx)
|
||||
}
|
||||
clientSecret := new(p256k.Signer)
|
||||
if err = clientSecret.InitSec(clientSecretKey.Sec()); chk.E(err) {
|
||||
return
|
||||
}
|
||||
clientPubkey := clientSecret.Pub()
|
||||
var conversationKey, sharedSecret []byte
|
||||
if sharedSecret, err = encryption.ComputeSharedSecret(targetPublicKey,
|
||||
clientSecretKey.Sec()); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if conversationKey, err = encryption.GenerateConversationKey(targetPublicKey,
|
||||
clientSecret.Sec()); chk.E(err) {
|
||||
return
|
||||
}
|
||||
client = &BunkerClient{
|
||||
pool: pool,
|
||||
clientSecretKey: clientSecretKey,
|
||||
target: targetPublicKey,
|
||||
relays: relays,
|
||||
conversationKey: conversationKey,
|
||||
listeners: xsync.NewMapOf[string, chan Response](),
|
||||
expectingAuth: xsync.NewMapOf[string, struct{}](),
|
||||
onAuth: onAuth,
|
||||
idPrefix: "gn-" + strconv.Itoa(rand.Intn(65536)),
|
||||
}
|
||||
go func() {
|
||||
now := timestamp.Now()
|
||||
events := pool.SubMany(ctx, relays, filters.New(&filter.T{
|
||||
Tags: tags.New(tag.New([]byte("p"), clientPubkey)),
|
||||
Kinds: kinds.New(kind.NostrConnect),
|
||||
Since: now,
|
||||
}), ws.WithLabel("bunker46client"))
|
||||
for ev := range events {
|
||||
if !ev.Event.Kind.Equal(kind.NostrConnect) {
|
||||
err = errorf.E("event kind is %s, but we expected %s",
|
||||
ev.Event.Kind.Name(), kind.NostrConnect.Name())
|
||||
continue
|
||||
}
|
||||
var plain []byte
|
||||
if plain, err = encryption.Decrypt(ev.Event.Content, conversationKey); chk.E(err) {
|
||||
if plain, err = encryption.DecryptNip4(ev.Event.Content,
|
||||
sharedSecret); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
var resp Response
|
||||
if err = json.Unmarshal(plain, &resp); chk.E(err) {
|
||||
continue
|
||||
}
|
||||
if resp.Result == "auth_url" {
|
||||
// special case
|
||||
authURL := resp.Error
|
||||
if _, ok := client.expectingAuth.Load(resp.ID); ok {
|
||||
client.onAuth(authURL)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if dispatcher, ok := client.listeners.Load(resp.ID); ok {
|
||||
dispatcher <- resp
|
||||
continue
|
||||
}
|
||||
}
|
||||
}()
|
||||
return
|
||||
}
|
||||
|
||||
func (client *BunkerClient) RPC(ctx context.T, method string,
|
||||
params [][]byte) (result string, err error) {
|
||||
id := client.idPrefix + "-" + strconv.FormatUint(client.serial.Add(1), 10)
|
||||
var req []byte
|
||||
if req, err = json.Marshal(Request{
|
||||
ID: id,
|
||||
Method: method,
|
||||
Params: params,
|
||||
}); chk.E(err) {
|
||||
return
|
||||
}
|
||||
var content []byte
|
||||
if content, err = encryption.Encrypt(req, client.conversationKey); chk.E(err) {
|
||||
return
|
||||
}
|
||||
ev := &event.T{
|
||||
Content: content,
|
||||
CreatedAt: timestamp.Now(),
|
||||
Kind: kind.NostrConnect,
|
||||
Tags: tags.New(tag.New([]byte("p"), client.target)),
|
||||
}
|
||||
if err = ev.Sign(client.clientSecretKey); chk.E(err) {
|
||||
return
|
||||
}
|
||||
respWaiter := make(chan Response)
|
||||
client.listeners.Store(id, respWaiter)
|
||||
defer func() {
|
||||
client.listeners.Delete(id)
|
||||
close(respWaiter)
|
||||
}()
|
||||
hasWorked := make(chan struct{})
|
||||
for _, url := range client.relays {
|
||||
go func(url string) {
|
||||
var relay *ws.Client
|
||||
relay, err = client.pool.EnsureRelay(url)
|
||||
if err == nil {
|
||||
if err = relay.Publish(ctx, ev); chk.E(err) {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case hasWorked <- struct{}{}: // todo: shouldn't this be after success?
|
||||
default:
|
||||
}
|
||||
}
|
||||
}(url)
|
||||
}
|
||||
select {
|
||||
case <-hasWorked:
|
||||
// continue
|
||||
case <-ctx.Done():
|
||||
err = errorf.E("couldn't connect to any relay")
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = errorf.E("context canceled")
|
||||
return
|
||||
case resp := <-respWaiter:
|
||||
if resp.Error != "" {
|
||||
err = errorf.E("response error: %s", resp.Error)
|
||||
return
|
||||
}
|
||||
result = resp.Result
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (client *BunkerClient) Ping(ctx context.T) (err error) {
|
||||
if _, err = client.RPC(ctx, "ping", [][]byte{}); chk.E(err) {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (client *BunkerClient) GetPublicKey(ctx context.T) (resp string, err error) {
|
||||
if client.getPublicKeyResponse != "" {
|
||||
resp = client.getPublicKeyResponse
|
||||
return
|
||||
}
|
||||
resp, err = client.RPC(ctx, "get_public_key", [][]byte{})
|
||||
client.getPublicKeyResponse = resp
|
||||
return
|
||||
}
|
||||
|
||||
func (client *BunkerClient) SignEvent(ctx context.T, evt *event.T) (err error) {
|
||||
var resp string
|
||||
if resp, err = client.RPC(ctx, "sign_event", [][]byte{evt.Serialize()}); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if err = json.Unmarshal([]byte(resp), evt); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if !client.SkipSignatureCheck {
|
||||
var valid bool
|
||||
if valid, err = evt.Verify(); chk.E(err) {
|
||||
return
|
||||
}
|
||||
if !valid {
|
||||
err = errorf.E("sign_event response from bunker has invalid signature")
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (client *BunkerClient) NIP44Encrypt(ctx context.T,
|
||||
targetPublicKey, plaintext []byte) (string, error) {
|
||||
return client.RPC(ctx, "nip44_encrypt", [][]byte{targetPublicKey, plaintext})
|
||||
}
|
||||
|
||||
func (client *BunkerClient) NIP44Decrypt(ctx context.T,
|
||||
targetPublicKey, ciphertext []byte) (string, error) {
|
||||
return client.RPC(ctx, "nip44_decrypt", [][]byte{targetPublicKey, ciphertext})
|
||||
}
|
||||
|
||||
func (client *BunkerClient) NIP04Encrypt(ctx context.T,
|
||||
targetPublicKey, plaintext []byte) (string, error) {
|
||||
return client.RPC(ctx, "nip04_encrypt", [][]byte{targetPublicKey, plaintext})
|
||||
}
|
||||
|
||||
func (client *BunkerClient) NIP04Decrypt(ctx context.T,
|
||||
targetPublicKey, ciphertext []byte) (string, error) {
|
||||
return client.RPC(ctx, "nip04_decrypt", [][]byte{targetPublicKey, ciphertext})
|
||||
}
|
||||
@@ -42,8 +42,8 @@ func (r *Response) String() (s string) {
|
||||
}
|
||||
|
||||
type Signer interface {
|
||||
GetSession(clientPubkey string) (Session, bool)
|
||||
HandleRequest(context.T, *event.T) (req Request, resp Response,
|
||||
GetSession(clientPubkey string) (*Session, bool)
|
||||
HandleRequest(context.T, *event.T) (req *Request, resp *Response,
|
||||
eventResponse *event.T, err error)
|
||||
}
|
||||
|
||||
|
||||
24
bunker/nip46_test.go
Normal file
24
bunker/nip46_test.go
Normal file
@@ -0,0 +1,24 @@
|
||||
package bunker
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestValidBunkerURL(t *testing.T) {
|
||||
valid := IsValidBunkerURL("bunker://3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d?relay=wss%3A%2F%2Frelay.damus.io&relay=wss%3A%2F%2Frelay.snort.social&relay=wss%3A%2F%2Frelay.nsecbunker.com")
|
||||
assert.True(t, valid, "should be valid")
|
||||
|
||||
inValid := IsValidBunkerURL("askjdbkajdbv")
|
||||
assert.False(t, inValid, "should be invalid")
|
||||
|
||||
inValid1 := IsValidBunkerURL("asdjasbndksa@asjdnksa.com")
|
||||
assert.False(t, inValid1, "should be invalid")
|
||||
|
||||
inValid2 := IsValidBunkerURL("https://hello.com?relays=wss://xxxxxx.xxxx")
|
||||
assert.False(t, inValid2, "should be invalid")
|
||||
|
||||
inValid3 := IsValidBunkerURL("bunker://fa883d107ef9e558472c4eb9aaaefa459d?relay=wss%3A%2F%2Frelay.damus.io&relay=wss%3A%2F%2Frelay.snort.social&relay=wss%3A%2F%2Frelay.nsecbunker.com")
|
||||
assert.False(t, inValid3, "should be invalid")
|
||||
}
|
||||
@@ -13,9 +13,7 @@ import (
|
||||
)
|
||||
|
||||
type Session struct {
|
||||
Pubkey string
|
||||
SharedKey []byte
|
||||
ConversationKey []byte
|
||||
Pubkey, SharedKey, ConversationKey []byte
|
||||
}
|
||||
|
||||
func (s *Session) ParseRequest(ev *event.T) (req *Request, err error) {
|
||||
@@ -32,19 +30,19 @@ func (s *Session) ParseRequest(ev *event.T) (req *Request, err error) {
|
||||
}
|
||||
|
||||
func (s *Session) MakeResponse(id, requester, result string,
|
||||
inErr error) (resp *Response,
|
||||
ev *event.T, err error) {
|
||||
if inErr != nil {
|
||||
rErr error) (resp *Response, ev *event.T, err error) {
|
||||
if rErr != nil {
|
||||
resp = &Response{
|
||||
ID: string(id),
|
||||
Result: inErr.Error(),
|
||||
ID: id,
|
||||
Result: rErr.Error(),
|
||||
}
|
||||
} else if len(result) > 0 {
|
||||
resp = &Response{
|
||||
ID: string(id),
|
||||
Result: string(result),
|
||||
ID: id,
|
||||
Result: result,
|
||||
}
|
||||
}
|
||||
// todo: what if the response is empty? this shouldn't happen i think?
|
||||
var j []byte
|
||||
if j, err = json.Marshal(resp); chk.E(err) {
|
||||
return
|
||||
|
||||
@@ -17,6 +17,8 @@ import (
|
||||
"relay.mleku.dev/signer"
|
||||
)
|
||||
|
||||
var _ Signer = (*StaticKeySigner)(nil)
|
||||
|
||||
type StaticKeySigner struct {
|
||||
sync.Mutex
|
||||
secretKey signer.I
|
||||
@@ -31,6 +33,8 @@ func NewStaticKeySigner(secretKey signer.I) *StaticKeySigner {
|
||||
}
|
||||
|
||||
func (p *StaticKeySigner) GetSession(clientPubkey string) (s *Session, exists bool) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
s, exists = p.sessions[clientPubkey]
|
||||
return
|
||||
}
|
||||
@@ -47,8 +51,11 @@ func (p *StaticKeySigner) getOrCreateSession(clientPubkey []byte) (s *Session, e
|
||||
p.secretKey.Sec()); chk.E(err) {
|
||||
return
|
||||
}
|
||||
s.ConversationKey = p.secretKey.Pub()
|
||||
s.Pubkey = hex.Enc(p.secretKey.Pub())
|
||||
if s.ConversationKey, err = encryption.GenerateConversationKey(clientPubkey,
|
||||
p.secretKey.Pub()); chk.E(err) {
|
||||
return
|
||||
}
|
||||
s.Pubkey = p.secretKey.Pub()
|
||||
// add to pool
|
||||
p.sessions[string(clientPubkey)] = s
|
||||
return
|
||||
@@ -79,7 +86,7 @@ func (p *StaticKeySigner) HandleRequest(_ context.T, ev *event.T) (req *Request,
|
||||
result = []byte("ack")
|
||||
harmless = true
|
||||
case "get_public_key":
|
||||
result = []byte(session.Pubkey)
|
||||
result = session.Pubkey
|
||||
harmless = true
|
||||
case "sign_event":
|
||||
if len(req.Params) != 1 {
|
||||
|
||||
30
bunker/wellknownnostrjson.go
Normal file
30
bunker/wellknownnostrjson.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package bunker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"relay.mleku.dev/chk"
|
||||
"relay.mleku.dev/dns"
|
||||
"relay.mleku.dev/errorf"
|
||||
)
|
||||
|
||||
func queryWellKnownNostrJson(ctx context.Context, fullname string) (pubkey string,
|
||||
relays []string, err error) {
|
||||
var result *dns.WellKnownResponse
|
||||
var name string
|
||||
if result, name, err = dns.Fetch(ctx, fullname); chk.E(err) {
|
||||
return
|
||||
}
|
||||
|
||||
var ok bool
|
||||
if pubkey, ok = result.Names[name]; !ok {
|
||||
return "", nil, fmt.Errorf("no entry found for the '%s' name", name)
|
||||
}
|
||||
if relays, _ = result.NIP46[pubkey]; !ok {
|
||||
err = errorf.E("no bunker relays found for the '%s' name", name)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
@@ -73,13 +73,14 @@ func SecretBytesToPubKeyHex(skb []byte) (pk string, err error) {
|
||||
|
||||
// IsValid32ByteHex checks that a hex string is a valid 32 bytes lower case hex encoded value as
|
||||
// per nostr NIP-01 spec.
|
||||
func IsValid32ByteHex[V []byte | string](pk V) bool {
|
||||
func IsValid32ByteHex[V []byte | string](pk V) (valid bool) {
|
||||
if bytes.Equal(bytes.ToLower([]byte(pk)), []byte(pk)) {
|
||||
return false
|
||||
}
|
||||
var err error
|
||||
dec := make([]byte, 32)
|
||||
if _, err = hex.DecBytes(dec, []byte(pk)); chk.E(err) {
|
||||
return
|
||||
}
|
||||
return len(dec) == 32
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@ import (
|
||||
"relay.mleku.dev/ec/schnorr"
|
||||
"relay.mleku.dev/ec/secp256k1"
|
||||
"relay.mleku.dev/errorf"
|
||||
"relay.mleku.dev/log"
|
||||
"relay.mleku.dev/sha256"
|
||||
)
|
||||
|
||||
@@ -171,28 +170,6 @@ func Generate() (
|
||||
// Negate inverts a secret key so an odd prefix bit becomes even and vice versa.
|
||||
func Negate(uskb []byte) { C.secp256k1_ec_seckey_negate(ctx, ToUchar(uskb)) }
|
||||
|
||||
type ECPub struct {
|
||||
Key ECPubKey
|
||||
}
|
||||
|
||||
// ECPubFromSchnorrBytes converts a BIP-340 public key to its even standard 33 byte encoding.
|
||||
//
|
||||
// This function is for the purpose of getting a key to do ECDH from an x-only key.
|
||||
func ECPubFromSchnorrBytes(xkb []byte) (pub *ECPub, err error) {
|
||||
if err = AssertLen(xkb, schnorr.PubKeyBytesLen, "pubkey"); chk.E(err) {
|
||||
return
|
||||
}
|
||||
pub = &ECPub{}
|
||||
p := append([]byte{0}, xkb...)
|
||||
if C.secp256k1_ec_pubkey_parse(ctx, &pub.Key, ToUchar(p),
|
||||
secp256k1.PubKeyBytesLenCompressed) != 1 {
|
||||
err = errorf.E("failed to parse pubkey from %0x", p)
|
||||
log.I.S(pub)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Pub is a schnorr BIP-340 public key.
|
||||
type Pub struct {
|
||||
Key PubKey
|
||||
|
||||
44
ws/pool.go
44
ws/pool.go
@@ -140,18 +140,19 @@ func (pool *Pool) EnsureRelay(url string) (*Client, error) {
|
||||
|
||||
// SubMany opens a subscription with the given filters to multiple relays
|
||||
// the subscriptions only end when the context is canceled
|
||||
func (pool *Pool) SubMany(c context.T, urls []string, ff *filters.T) chan IncomingEvent {
|
||||
return pool.subMany(c, urls, ff, true)
|
||||
func (pool *Pool) SubMany(c context.T, urls []string, ff *filters.T,
|
||||
opts ...SubscriptionOption) chan IncomingEvent {
|
||||
return pool.subMany(c, urls, ff, true, opts)
|
||||
}
|
||||
|
||||
// SubManyNonUnique is like SubMany, but returns duplicate events if they come from different relays
|
||||
func (pool *Pool) SubManyNonUnique(c context.T, urls []string,
|
||||
ff *filters.T) chan IncomingEvent {
|
||||
return pool.subMany(c, urls, ff, false)
|
||||
ff *filters.T, opts ...SubscriptionOption) chan IncomingEvent {
|
||||
return pool.subMany(c, urls, ff, false, opts)
|
||||
}
|
||||
|
||||
func (pool *Pool) subMany(c context.T, urls []string, ff *filters.T,
|
||||
unique bool) chan IncomingEvent {
|
||||
unique bool, opts []SubscriptionOption) chan IncomingEvent {
|
||||
ctx, cancel := context.Cancel(c)
|
||||
_ = cancel // do this so `go vet` will stop complaining
|
||||
events := make(chan IncomingEvent)
|
||||
@@ -192,7 +193,7 @@ func (pool *Pool) subMany(c context.T, urls []string, ff *filters.T,
|
||||
}
|
||||
hasAuthed = false
|
||||
subscribe:
|
||||
if sub, err = relay.Subscribe(ctx, ff); chk.T(err) {
|
||||
if sub, err = relay.Subscribe(ctx, ff, opts...); chk.T(err) {
|
||||
goto reconnect
|
||||
}
|
||||
go func() {
|
||||
@@ -268,18 +269,20 @@ func (pool *Pool) subMany(c context.T, urls []string, ff *filters.T,
|
||||
}
|
||||
|
||||
// SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE
|
||||
func (pool *Pool) SubManyEose(c context.T, urls []string, ff *filters.T) chan IncomingEvent {
|
||||
return pool.subManyEose(c, urls, ff, true)
|
||||
func (pool *Pool) SubManyEose(c context.T, urls []string, ff *filters.T,
|
||||
opts ...SubscriptionOption) chan IncomingEvent {
|
||||
return pool.subManyEose(c, urls, ff, true, opts)
|
||||
}
|
||||
|
||||
// SubManyEoseNonUnique is like SubManyEose, but returns duplicate events if they come from different relays
|
||||
func (pool *Pool) SubManyEoseNonUnique(c context.T, urls []string,
|
||||
ff *filters.T) chan IncomingEvent {
|
||||
return pool.subManyEose(c, urls, ff, false)
|
||||
ff *filters.T,
|
||||
opts ...SubscriptionOption) chan IncomingEvent {
|
||||
return pool.subManyEose(c, urls, ff, false, opts)
|
||||
}
|
||||
|
||||
func (pool *Pool) subManyEose(c context.T, urls []string, ff *filters.T,
|
||||
unique bool) chan IncomingEvent {
|
||||
unique bool, opts []SubscriptionOption) chan IncomingEvent {
|
||||
ctx, cancel := context.Cancel(c)
|
||||
|
||||
events := make(chan IncomingEvent)
|
||||
@@ -307,7 +310,7 @@ func (pool *Pool) subManyEose(c context.T, urls []string, ff *filters.T,
|
||||
|
||||
subscribe:
|
||||
var sub *Subscription
|
||||
if sub, err = client.Subscribe(ctx, ff); chk.E(err) || sub == nil {
|
||||
if sub, err = client.Subscribe(ctx, ff, opts...); chk.E(err) || sub == nil {
|
||||
log.E.F("error subscribing to %s with %v: %s", client, ff, err)
|
||||
return
|
||||
}
|
||||
@@ -363,7 +366,7 @@ func (pool *Pool) subManyEose(c context.T, urls []string, ff *filters.T,
|
||||
func (pool *Pool) QuerySingle(c context.T, urls []string, f *filter.T) *IncomingEvent {
|
||||
ctx, cancel := context.Cancel(c)
|
||||
defer cancel()
|
||||
for ievt := range pool.SubManyEose(ctx, urls, filters.New(f)) {
|
||||
for ievt := range pool.SubManyEose(ctx, urls, filters.New(f), nil) {
|
||||
return &ievt
|
||||
}
|
||||
return nil
|
||||
@@ -372,13 +375,14 @@ func (pool *Pool) QuerySingle(c context.T, urls []string, f *filter.T) *Incoming
|
||||
func (pool *Pool) batchedSubMany(
|
||||
c context.T,
|
||||
dfs []DirectedFilters,
|
||||
subFn func(context.T, []string, *filters.T, bool) chan IncomingEvent,
|
||||
subFn func(context.T, []string, *filters.T, bool, []SubscriptionOption) chan IncomingEvent,
|
||||
opts []SubscriptionOption,
|
||||
) chan IncomingEvent {
|
||||
res := make(chan IncomingEvent)
|
||||
|
||||
for _, df := range dfs {
|
||||
go func(df DirectedFilters) {
|
||||
for ie := range subFn(c, []string{df.Client}, df.Filters, true) {
|
||||
for ie := range subFn(c, []string{df.Client}, df.Filters, true, opts) {
|
||||
res <- ie
|
||||
}
|
||||
}(df)
|
||||
@@ -388,11 +392,13 @@ func (pool *Pool) batchedSubMany(
|
||||
}
|
||||
|
||||
// BatchedSubMany fires subscriptions only to specific relays, but batches them when they are the same.
|
||||
func (pool *Pool) BatchedSubMany(c context.T, dfs []DirectedFilters) chan IncomingEvent {
|
||||
return pool.batchedSubMany(c, dfs, pool.subMany)
|
||||
func (pool *Pool) BatchedSubMany(c context.T, dfs []DirectedFilters,
|
||||
opts ...SubscriptionOption) chan IncomingEvent {
|
||||
return pool.batchedSubMany(c, dfs, pool.subMany, opts)
|
||||
}
|
||||
|
||||
// BatchedSubManyEose is like BatchedSubMany, but ends upon receiving EOSE from all relays.
|
||||
func (pool *Pool) BatchedSubManyEose(c context.T, dfs []DirectedFilters) chan IncomingEvent {
|
||||
return pool.batchedSubMany(c, dfs, pool.subManyEose)
|
||||
func (pool *Pool) BatchedSubManyEose(c context.T, dfs []DirectedFilters,
|
||||
opts ...SubscriptionOption) chan IncomingEvent {
|
||||
return pool.batchedSubMany(c, dfs, pool.subManyEose, opts)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user