Make gossip methods part of listener, eliminate storing own ads, up peerstore test node count
This commit is contained in:
@@ -216,9 +216,11 @@ func (ng *Engine) HandleAd(p *pubsub.Message) (e error) {
|
||||
if id, e = peer.IDFromPublicKey(addr.Key); fails(e) {
|
||||
return
|
||||
}
|
||||
if e = ng.Listener.Host.
|
||||
Peerstore().Put(id, addresses.Magic, s.GetAll().ToBytes()); fails(e) {
|
||||
return
|
||||
if id != ng.Listener.Host.ID() {
|
||||
if e = ng.Listener.Host.
|
||||
Peerstore().Put(id, addresses.Magic, s.GetAll().ToBytes()); fails(e) {
|
||||
return
|
||||
}
|
||||
}
|
||||
case *intro.Ad:
|
||||
var intr *intro.Ad
|
||||
|
||||
@@ -96,7 +96,7 @@ func TestEngine_PeerStoreDiscovery(t *testing.T) {
|
||||
if indra.CI == "false" {
|
||||
log2.SetLogLevel(log2.Trace)
|
||||
}
|
||||
const nTotal = 10
|
||||
const nTotal = 50
|
||||
var e error
|
||||
var engines []*Engine
|
||||
var cleanup func()
|
||||
@@ -105,7 +105,7 @@ func TestEngine_PeerStoreDiscovery(t *testing.T) {
|
||||
t.FailNow()
|
||||
}
|
||||
_ = engines
|
||||
time.Sleep(time.Second * 3)
|
||||
time.Sleep(time.Second * 8)
|
||||
cleanup()
|
||||
pauza()
|
||||
}
|
||||
|
||||
@@ -2,7 +2,9 @@ package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/indra-labs/indra/pkg/crypto"
|
||||
dht "github.com/libp2p/go-libp2p-kad-dht"
|
||||
ic "github.com/libp2p/go-libp2p/core/crypto"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
@@ -39,8 +41,7 @@ func NewDHT(ctx context.Context, host host.Host,
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
if e := host.Connect(ctx, *peerinfo); fails(e) {
|
||||
log.D.F("Error while connecting to node %q",
|
||||
peerinfo)
|
||||
log.D.F("Error while connecting to node %q", peerinfo)
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
@@ -57,7 +58,7 @@ func NewDHT(ctx context.Context, host host.Host,
|
||||
|
||||
// Discover uses the DHT to share and distribute peer lists between nodes on
|
||||
// Indranet.
|
||||
func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT,
|
||||
func (l *Listener) Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT,
|
||||
rendezvous []multiaddr.Multiaddr) {
|
||||
|
||||
var disco = routing.NewRoutingDiscovery(dht)
|
||||
@@ -67,7 +68,7 @@ func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT,
|
||||
if _, e = disco.Advertise(ctx, rendezvous[i].String()); e != nil {
|
||||
}
|
||||
}
|
||||
if e = Tick(h, rendezvous, peers, disco, ctx); fails(e) {
|
||||
if e = l.Tick(h, rendezvous, peers, disco, ctx); fails(e) {
|
||||
}
|
||||
ticker := time.NewTicker(time.Second * 1)
|
||||
defer ticker.Stop()
|
||||
@@ -76,16 +77,17 @@ func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT,
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if e = Tick(h, rendezvous, peers, disco, ctx); fails(e) {
|
||||
if e = l.Tick(h, rendezvous, peers, disco, ctx); fails(e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Tick(h host.Host, rendezvous []multiaddr.Multiaddr,
|
||||
func (l *Listener) Tick(h host.Host, rendezvous []multiaddr.Multiaddr,
|
||||
peers <-chan peer.AddrInfo, disco *routing.RoutingDiscovery,
|
||||
ctx context.Context) (e error) {
|
||||
|
||||
log.T.Ln()
|
||||
for i := range rendezvous {
|
||||
if peers, e = disco.FindPeers(ctx,
|
||||
rendezvous[i].String()); fails(e) {
|
||||
@@ -104,8 +106,29 @@ func Tick(h host.Host, rendezvous []multiaddr.Multiaddr,
|
||||
|
||||
continue
|
||||
}
|
||||
log.T.Ln(h.Addrs()[0].String(), "Connected to peer",
|
||||
blue(p.Addrs[0]))
|
||||
var them, us ic.PubKey
|
||||
if them, e = p.ID.ExtractPublicKey(); fails(e) {
|
||||
continue
|
||||
}
|
||||
if us, e = h.ID().ExtractPublicKey(); fails(e) {
|
||||
continue
|
||||
}
|
||||
var themR, usR []byte
|
||||
if themR, e = them.Raw(); fails(e) {
|
||||
continue
|
||||
}
|
||||
if usR, e = us.Raw(); fails(e) {
|
||||
continue
|
||||
}
|
||||
var theirPubkey, ourPubkey *crypto.Pub
|
||||
if theirPubkey, e = crypto.PubFromBytes(themR); fails(e) {
|
||||
continue
|
||||
}
|
||||
if ourPubkey, e = crypto.PubFromBytes(usR); fails(e) {
|
||||
continue
|
||||
}
|
||||
log.T.Ln(ourPubkey.Fingerprint(), "Connected to peer",
|
||||
theirPubkey.Fingerprint())
|
||||
}
|
||||
}
|
||||
return
|
||||
|
||||
@@ -79,7 +79,7 @@ func (c *Conn) GetMTU() int {
|
||||
// messages.
|
||||
func (c *Conn) GetRecv() tpt.Transport { return c.Transport.Receiver }
|
||||
|
||||
// GetRemoteKey ruturns the current remote receiver public key we want to encrypt to (with ECDH).
|
||||
// GetRemoteKey returns the current remote receiver public key we want to encrypt to (with ECDH).
|
||||
func (c *Conn) GetRemoteKey() (remoteKey *crypto.Pub) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
@@ -427,12 +427,12 @@ func NewListener(rendezvous, multiAddr []string, storePath string,
|
||||
libp2p.UserAgent(DefaultUserAgent),
|
||||
libp2p.ListenAddrs(ma...),
|
||||
libp2p.EnableHolePunching(),
|
||||
//libp2p.Transport(libp2pquic.NewTransport),
|
||||
// libp2p.Transport(libp2pquic.NewTransport),
|
||||
libp2p.Transport(tcp.NewTCPTransport),
|
||||
//libp2p.Transport(websocket.New),
|
||||
//libp2p.Security(libp2ptls.ID, libp2ptls.New),
|
||||
// libp2p.Transport(websocket.New),
|
||||
// libp2p.Security(libp2ptls.ID, libp2ptls.New),
|
||||
libp2p.Security(noise.ID, noise.New),
|
||||
//libp2p.NoSecurity,
|
||||
// libp2p.NoSecurity,
|
||||
libp2p.Peerstore(st),
|
||||
); fails(e) {
|
||||
return
|
||||
@@ -441,7 +441,7 @@ func NewListener(rendezvous, multiAddr []string, storePath string,
|
||||
if c.DHT, e = NewDHT(ctx, c.Host, rdv); fails(e) {
|
||||
return
|
||||
}
|
||||
go Discover(ctx, c.Host, c.DHT, rdv)
|
||||
go c.Discover(ctx, c.Host, c.DHT, rdv)
|
||||
c.Host.SetStreamHandler(IndraLibP2PID, c.handle)
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user