diff --git a/pkg/engine/dispatcher_test.go b/pkg/engine/dispatcher_test.go index 9754f697..0a7aa03c 100644 --- a/pkg/engine/dispatcher_test.go +++ b/pkg/engine/dispatcher_test.go @@ -1,93 +1,44 @@ package engine import ( - "context" "github.com/indra-labs/indra" - "net/netip" "os" "testing" "time" - "github.com/multiformats/go-multiaddr" - - "github.com/indra-labs/indra/pkg/crypto" - "github.com/indra-labs/indra/pkg/engine/node" "github.com/indra-labs/indra/pkg/engine/transport" log2 "github.com/indra-labs/indra/pkg/proc/log" ) func TestEngine_Dispatcher(t *testing.T) { - if indra.CI=="false" { + if indra.CI == "false" { log2.SetLogLevel(log2.Trace) } - var e error const nTotal = 26 - ctx, cancel := context.WithCancel(context.Background()) - var listeners []*transport.Listener - var keys []*crypto.Keys - var nodes []*node.Node + var cancel func() + var e error var engines []*Engine var seed string for i := 0; i < nTotal; i++ { - var k *crypto.Keys - if k, e = crypto.GenerateKeys(); fails(e) { - t.FailNow() - } - keys = append(keys, k) - var l *transport.Listener dataPath, err := os.MkdirTemp(os.TempDir(), "badger") if err != nil { t.FailNow() } - if l, e = transport.NewListener(seed, transport.LocalhostZeroIPv4TCP, - dataPath, k, ctx, transport.DefaultMTU); fails(e) { - os.RemoveAll(dataPath) - t.FailNow() - } - sa := transport.GetHostAddress(l.Host) - if i == 0 { - seed = sa - } - listeners = append(listeners, l) - var addr netip.AddrPort - var ma multiaddr.Multiaddr - if ma, e = multiaddr.NewMultiaddr(sa); fails(e) { - os.RemoveAll(dataPath) - t.FailNow() - } - - var ip, port string - if ip, e = ma.ValueForProtocol(multiaddr.P_IP4); fails(e) { - // we specified ipv4 previously. - os.RemoveAll(dataPath) - t.FailNow() - } - if port, e = ma.ValueForProtocol(multiaddr.P_TCP); fails(e) { - os.RemoveAll(dataPath) - t.FailNow() - } - if addr, e = netip.ParseAddrPort(ip + ":" + port); fails(e) { - os.RemoveAll(dataPath) - t.FailNow() - } - var nod *node.Node - if nod, _ = node.NewNode(&addr, k, nil, 50000); fails(e) { - os.RemoveAll(dataPath) - t.FailNow() - } - nodes = append(nodes, nod) var eng *Engine - if eng, e = NewEngine(Params{ - Listener: l, - Keys: k, - Node: nod, - }); fails(e) { - os.RemoveAll(dataPath) - t.FailNow() + if eng, cancel, e = CreateMockEngine(seed, dataPath); fails(e) { + return } engines = append(engines, eng) + if i == 0 { + seed = transport.GetHostAddress(eng.Manager.Listener.Host) + } defer os.RemoveAll(dataPath) + go eng.Start() + log.D.Ln("started engine", i) } - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 1) cancel() + for i := range engines { + engines[i].Shutdown() + } } diff --git a/pkg/engine/mock.go b/pkg/engine/mock.go index 0de4caf1..e10873eb 100644 --- a/pkg/engine/mock.go +++ b/pkg/engine/mock.go @@ -11,6 +11,9 @@ import ( "github.com/indra-labs/indra/pkg/engine/transport" log2 "github.com/indra-labs/indra/pkg/proc/log" "github.com/indra-labs/indra/pkg/util/slice" + "github.com/multiformats/go-multiaddr" + "net/netip" + "os" ) var ( @@ -84,3 +87,60 @@ func createNMockCircuits(inclSessions bool, nCircuits int, } return } + +// CreateMockEngine creates an indra Engine with a random localhost listener. +func CreateMockEngine(seed, dataPath string) (ng *Engine, cancel func(), e error) { + var ctx context.Context + ctx, cancel = context.WithCancel(context.Background()) + var keys []*crypto.Keys + var nodes []*node.Node + var k *crypto.Keys + if k, e = crypto.GenerateKeys(); fails(e) { + return + } + keys = append(keys, k) + var l *transport.Listener + if l, e = transport.NewListener(seed, transport.LocalhostZeroIPv4TCP, + dataPath, k, ctx, transport.DefaultMTU); fails(e) { + os.RemoveAll(dataPath) + return + } + sa := transport.GetHostAddress(l.Host) + var addr netip.AddrPort + var ma multiaddr.Multiaddr + if ma, e = multiaddr.NewMultiaddr(sa); fails(e) { + e = os.RemoveAll(dataPath) + return + } + + var ip, port string + if ip, e = ma.ValueForProtocol(multiaddr.P_IP4); fails(e) { + // we specified ipv4 previously. + fails(os.RemoveAll(dataPath)) + return + } + if port, e = ma.ValueForProtocol(multiaddr.P_TCP); fails(e) { + fails(os.RemoveAll(dataPath)) + return + } + if addr, e = netip.ParseAddrPort(ip + ":" + port); fails(e) { + fails(os.RemoveAll(dataPath)) + return + } + var nod *node.Node + if nod, _ = node.NewNode(&addr, k, nil, 50000); fails(e) { + fails(os.RemoveAll(dataPath)) + return + } + nodes = append(nodes, nod) + if ng, e = NewEngine(Params{ + Transport: transport.NewByteChan(transport.ConnBufs), + Listener: l, + Keys: k, + Node: nod, + }); fails(e) { + os.RemoveAll(dataPath) + return + } + return +} diff --git a/pkg/engine/transport/peerstore.go b/pkg/engine/transport/peerstore.go new file mode 100644 index 00000000..0af57f94 --- /dev/null +++ b/pkg/engine/transport/peerstore.go @@ -0,0 +1,11 @@ +package transport + +import "github.com/libp2p/go-libp2p/core/peer" + +func (l *Listener) Publish(p peer.ID, key string, val interface{}) error { + return l.Host.Peerstore().Put(p, key, val) +} + +func (l *Listener) FindPeerRecord(p peer.ID, key string) (val interface{}, e error) { + return l.Host.Peerstore().Get(p, key) +} diff --git a/pkg/engine/transport/transport_test.go b/pkg/engine/transport/transport_test.go index d716fbf8..6086072b 100644 --- a/pkg/engine/transport/transport_test.go +++ b/pkg/engine/transport/transport_test.go @@ -14,7 +14,7 @@ import ( func TestNewListener(t *testing.T) { if indra.CI=="false" { - log2.SetLogLevel(log2.Debug) + log2.SetLogLevel(log2.Trace) } var e error var l1, l2 *Listener