Moved listener out of sessionmanager into engine.

This commit is contained in:
херетик
2023-06-07 14:52:30 +01:00
parent 16b6766d74
commit c0f4fe517c
8 changed files with 56 additions and 111 deletions

View File

@@ -5,7 +5,7 @@
<go_parameters value="-v ./... -tags local" />
<root_directory value="$PROJECT_DIR$" />
<kind value="DIRECTORY" />
<package value="git-indra.lan/indra-labs/indra/pkg/engine" />
<package value="git-indra.lan/indra-labs/indra/pkg/onions" />
<directory value="$PROJECT_DIR$/pkg/onions" />
<filePath value="$PROJECT_DIR$" />
<framework value="gotest" />

View File

@@ -1,8 +1,8 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="go test git-indra.lan/indra-labs/indra/pkg (1)" type="GoTestRunConfiguration" factoryName="Go Test" singleton="true">
<configuration default="false" name="go test local CI" type="GoTestRunConfiguration" factoryName="Go Test">
<module name="indra" />
<working_directory value="$PROJECT_DIR$/pkg" />
<go_parameters value="-v ./... -tags local -ldflags &quot;-X indra.CI=true&quot;" />
<go_parameters value="-v ./... -tags local -ldflags '-X indra.CI=true'" />
<root_directory value="$PROJECT_DIR$" />
<kind value="DIRECTORY" />
<package value="git-indra.lan/indra-labs/indra/pkg/engine" />

View File

@@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="go test git-indra.lan/indra-labs/indra/pkg (2)" type="GoTestRunConfiguration" factoryName="Go Test">
<configuration default="false" name="go test local" type="GoTestRunConfiguration" factoryName="Go Test">
<module name="indra" />
<working_directory value="$PROJECT_DIR$/pkg" />
<go_parameters value="-v ./... -tags local" />

View File

@@ -13,39 +13,6 @@ import (
log2 "github.com/indra-labs/indra/pkg/proc/log"
)
func TestEngine_Dispatcher(t *testing.T) {
if indra.CI == "false" {
log2.SetLogLevel(log2.Trace)
}
const nTotal = 26
var cancel func()
var e error
var engines []*Engine
var seed string
for i := 0; i < nTotal; i++ {
dataPath, err := os.MkdirTemp(os.TempDir(), "badger")
if err != nil {
t.FailNow()
}
var eng *Engine
if eng, cancel, e = CreateMockEngine(seed, dataPath); fails(e) {
return
}
engines = append(engines, eng)
if i == 0 {
seed = transport.GetHostAddress(eng.Manager.Listener.Host)
}
defer os.RemoveAll(dataPath)
go eng.Start()
log.D.Ln("started engine", i)
}
time.Sleep(time.Second * 1)
cancel()
for i := range engines {
engines[i].Shutdown()
}
}
func TestEngine_PeerStore(t *testing.T) {
if indra.CI == "false" {
log2.SetLogLevel(log2.Trace)
@@ -66,7 +33,7 @@ func TestEngine_PeerStore(t *testing.T) {
}
engines = append(engines, eng)
if i == 0 {
seed = transport.GetHostAddress(eng.Manager.Listener.Host)
seed = transport.GetHostAddress(eng.Listener.Host)
}
defer os.RemoveAll(dataPath)
go eng.Start()
@@ -81,16 +48,8 @@ func TestEngine_PeerStore(t *testing.T) {
t.FailNow()
}
const key = "testkey"
//if e = engines[0].Publish(engines[0].Manager.Listener.Host.ID(),
// key, s.GetAll().ToBytes());fails(e){
// t.FailNow()
//}
//var val interface{}
//val, e = engines[1].FindPeerRecord(engines[0].Manager.Listener.Host.ID(), key)
//log.D.S("val", val)
//val, e = engines[0].FindPeerRecord(engines[0].Manager.Listener.Host.ID(), key)
//log.D.S("val", val)
time.Sleep(time.Second*3)
// todo: read and write things
time.Sleep(time.Second * 3)
cancel()
for i := range engines {
engines[i].Shutdown()

View File

@@ -27,6 +27,7 @@ type (
Engine struct {
Responses *responses.Pending
Manager *sess.Manager
Listener *transport.Listener
h *hidden.Hidden
KeySet *crypto.KeySet
Load atomic.Uint32
@@ -35,7 +36,7 @@ type (
}
Params struct {
Transport tpt.Transport
Listener *transport.Listener
Listener *transport.Listener
*crypto.Keys
Node *node.Node
Nodes []*node.Node
@@ -88,8 +89,9 @@ func (ng *Engine) Handler() (out bool) {
ng.Shutdown()
out = true
break
case c := <-ng.Manager.Listener.Accept():
case c := <-ng.Listener.Accept():
go func() {
log.D.Ln("new connection inbound (TODO):", c.Host.Addrs()[0])
_ = c
}()
case b := <-ng.Manager.ReceiveToLocalNode():
@@ -176,7 +178,8 @@ func NewEngine(p Params) (c *Engine, e error) {
c = &Engine{
Responses: &responses.Pending{},
KeySet: ks,
Manager: sess.NewSessionManager(p.Listener),
Listener: p.Listener,
Manager: sess.NewSessionManager(),
h: hidden.NewHiddenrouting(),
Pause: qu.T(),
C: qu.T(),

View File

@@ -11,12 +11,11 @@ import (
"github.com/indra-labs/indra/pkg/engine/tpt"
"github.com/indra-labs/indra/pkg/engine/transport"
log2 "github.com/indra-labs/indra/pkg/proc/log"
"github.com/indra-labs/indra/pkg/util/multi"
"github.com/indra-labs/indra/pkg/util/slice"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"net/netip"
"os"
"strconv"
)
var (
@@ -101,7 +100,6 @@ func CreateMockEngine(seed, dataPath string) (ng *Engine, cancel func(), e error
var ctx context.Context
ctx, cancel = context.WithCancel(context.Background())
var keys []*crypto.Keys
var nodes []*node.Node
var k *crypto.Keys
if k, e = crypto.GenerateKeys(); fails(e) {
return
@@ -112,21 +110,28 @@ func CreateMockEngine(seed, dataPath string) (ng *Engine, cancel func(), e error
dataPath, k, ctx, transport.DefaultMTU); fails(e) {
return
}
log.D.Ln("listener", l != nil)
//time.Sleep(time.Second/2)
if l == nil {
cancel()
return nil, nil, errors.New("got nil listener")
}
log.D.Ln("getting address")
sa := transport.GetHostAddress(l.Host)
var ap *netip.AddrPort
var ap netip.AddrPort
var ma multiaddr.Multiaddr
if ma, e = multiaddr.NewMultiaddr(sa); fails(e) {
return
}
if ap = MultiaddrToAddrPort(ma); ap == nil {
e = errors.New("unable to parse multiaddr")
if ap, e = multi.AddrToAddrPort(ma); fails(e) {
return
}
log.D.Ln("making node for engine")
var nod *node.Node
if nod, _ = node.NewNode(ap, k, nil, 50000); fails(e) {
if nod, _ = node.NewNode(&ap, k, nil, 50000); fails(e) {
return
}
nodes = append(nodes, nod)
log.D.Ln("appending engine")
if ng, e = NewEngine(Params{
Transport: transport.NewDuplexByteChan(transport.ConnBufs),
Listener: l,
@@ -136,22 +141,3 @@ func CreateMockEngine(seed, dataPath string) (ng *Engine, cancel func(), e error
}
return
}
// MultiaddrToAddrPort returns the ip and port for a. p should be either ma.P_TCP or ma.P_UDP.
// a must be an (ip, TCP) or (ip, udp) address.
func MultiaddrToAddrPort(a multiaddr.Multiaddr) (ap *netip.AddrPort) {
ip, _ := manet.ToIP(a)
var port string
var e error
for _, p := range []int{multiaddr.P_TCP, multiaddr.P_UDP} {
if port, e = a.ValueForProtocol(p); e == nil {
break
} else {
return
}
}
pi, _ := strconv.Atoi(port)
addr, _ := netip.AddrFromSlice(ip)
aap := netip.AddrPortFrom(addr, uint16(pi))
return &aap
}

View File

@@ -36,9 +36,8 @@ type Node struct {
Transport tpt.Transport
}
// NewNode creates a new Node. The Node for a client's self should use true in
// the local parameter to not initialise the peer state ring buffers as it won't
// use them.
// NewNode creates a new Node. The transport should be from either dialing out or
// a peer dialing in and the self model does not need to do this.
func NewNode(addr *netip.AddrPort, keys *crypto.Keys, tpt tpt.Transport,
relayRate int) (n *Node, id nonce.ID) {
id = nonce.NewID()

View File

@@ -16,7 +16,6 @@ import (
"github.com/indra-labs/indra/pkg/engine/payments"
"github.com/indra-labs/indra/pkg/engine/services"
"github.com/indra-labs/indra/pkg/engine/sessions"
"github.com/indra-labs/indra/pkg/engine/transport"
log2 "github.com/indra-labs/indra/pkg/proc/log"
"github.com/indra-labs/indra/pkg/util/slice"
)
@@ -39,6 +38,21 @@ func (sc CircuitCache) Add(s *sessions.Data) CircuitCache {
return sc
}
type (
// A CircuitCache stores each of the 5 hops of a peer node.
CircuitCache map[nonce.ID]*sessions.Circuit
)
type (
// Manager is a session manager for Indra, handling sessions and services.
Manager struct {
nodes []*node.Node
PendingPayments payments.PendingPayments
sessions.Sessions
CircuitCache
sync.Mutex
}
)
// AddNodes adds a Node to a Nodes.
func (sm *Manager) AddNodes(nn ...*node.Node) {
sm.Lock()
@@ -290,18 +304,6 @@ func (sm *Manager) FindPendingPreimage(pi sha256.Hash) (pp *payments.Payment) {
return sm.PendingPayments.FindPreimage(pi)
}
// FindSessionByPubkey searches for a session with a matching public key.
func (sm *Manager) FindSessionByPubkey(id crypto.PubBytes) *sessions.Data {
sm.Lock()
defer sm.Unlock()
for i := range sm.Sessions {
if sm.Sessions[i].Header.Bytes == id {
return sm.Sessions[i]
}
}
return nil
}
// FindSessionByHeader searches for a session with a matching header private key.
func (sm *Manager) FindSessionByHeader(prvKey *crypto.Prv) *sessions.Data {
sm.Lock()
@@ -327,6 +329,18 @@ func (sm *Manager) FindSessionByHeaderPub(pubKey *crypto.Pub) *sessions.Data {
return nil
}
// FindSessionByPubkey searches for a session with a matching public key.
func (sm *Manager) FindSessionByPubkey(id crypto.PubBytes) *sessions.Data {
sm.Lock()
defer sm.Unlock()
for i := range sm.Sessions {
if sm.Sessions[i].Header.Bytes == id {
return sm.Sessions[i]
}
}
return nil
}
// FindSessionPreimage searches for a session with a matching preimage hash.
func (sm *Manager) FindSessionPreimage(pr sha256.Hash) *sessions.Data {
sm.Lock()
@@ -536,26 +550,10 @@ func (sm *Manager) UpdateSessionCache() {
}
}
type (
// A CircuitCache stores each of the 5 hops of a peer node.
CircuitCache map[nonce.ID]*sessions.Circuit
// Manager is a session manager for Indra, handling sessions and services.
Manager struct {
nodes []*node.Node
Listener *transport.Listener
PendingPayments payments.PendingPayments
sessions.Sessions
CircuitCache
sync.Mutex
}
)
// NewSessionManager creates a new session manager.
func NewSessionManager(listener *transport.Listener) *Manager {
func NewSessionManager() *Manager {
return &Manager{
CircuitCache: make(CircuitCache),
PendingPayments: make(payments.PendingPayments, 0),
Listener: listener,
}
}