diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 2148eded..db008058 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -10,7 +10,7 @@ import ( "github.com/indra-labs/indra/pkg/engine/tpt" "github.com/indra-labs/indra/pkg/engine/transport" "github.com/indra-labs/indra/pkg/onions/hidden" - onions2 "github.com/indra-labs/indra/pkg/onions/ont" + "github.com/indra-labs/indra/pkg/onions/ont" "github.com/indra-labs/indra/pkg/onions/reg" "github.com/indra-labs/indra/pkg/util/qu" "github.com/indra-labs/indra/pkg/util/slice" @@ -18,7 +18,7 @@ import ( "go.uber.org/atomic" ) -var _ onions2.Ngin = &Engine{} +var _ ont.Ngin = &Engine{} type ( // Engine processes onion messages, forwarding the relevant data to other relays @@ -53,7 +53,7 @@ func (ng *Engine) GetHidden() *hidden.Hidden { return ng.h } func (ng *Engine) GetLoad() byte { return byte(ng.Load.Load()) } -func (ng *Engine) HandleMessage(s *splice.Splice, pr onions2.Onion) { +func (ng *Engine) HandleMessage(s *splice.Splice, pr ont.Onion) { log.D.F("%s handling received message", ng.Manager.GetLocalNodeAddressString()) s.SetCursor(0) @@ -72,7 +72,7 @@ func (ng *Engine) HandleMessage(s *splice.Splice, pr onions2.Onion) { log.D.Ln("did not get onion") return } - if fails(m.(onions2.Onion).Handle(s, pr, ng)) { + if fails(m.(ont.Onion).Handle(s, pr, ng)) { log.W.S("unrecognised packet", s.GetAll().ToBytes()) } } @@ -82,7 +82,7 @@ func (ng *Engine) Handler() (out bool) { log.T.C(func() string { return ng.Manager.GetLocalNodeAddressString() + " awaiting message" }) - var prev onions2.Onion + var prev ont.Onion select { case <-ng.C.Wait(): ng.Shutdown() diff --git a/pkg/engine/engine_test.go b/pkg/engine/engine_test.go index 873772f2..12edf677 100644 --- a/pkg/engine/engine_test.go +++ b/pkg/engine/engine_test.go @@ -23,7 +23,7 @@ import ( ) func TestClient_SendExit(t *testing.T) { - if indra.CI=="false" { + if indra.CI == "false" { log2.SetLogLevel(log2.Debug) } var clients []*Engine @@ -124,7 +124,7 @@ out: } func TestClient_SendPing(t *testing.T) { - if indra.CI=="false" { + if indra.CI == "false" { log2.SetLogLevel(log2.Debug) } var clients []*Engine @@ -177,7 +177,7 @@ out: } func TestClient_SendSessionKeys(t *testing.T) { - if indra.CI=="false" { + if indra.CI == "false" { log2.SetLogLevel(log2.Debug) } var clients []*Engine @@ -219,8 +219,8 @@ func TestClient_SendSessionKeys(t *testing.T) { counter.Dec() } wg.Wait() - for j := range clients[0].Manager.SessionCache { - log.D.F("%d %s %v", i, j, clients[0].Manager.SessionCache[j]) + for j := range clients[0].Manager.CircuitCache { + log.D.F("%d %s %v", i, j, clients[0].Manager.CircuitCache[j]) } quit.Q() } @@ -229,4 +229,3 @@ func TestClient_SendSessionKeys(t *testing.T) { } cancel() } - diff --git a/pkg/engine/sess/select.go b/pkg/engine/sess/select.go index 27fe0b45..9d0d327c 100644 --- a/pkg/engine/sess/select.go +++ b/pkg/engine/sess/select.go @@ -91,19 +91,19 @@ func (sm *Manager) SelectUnusedCircuit() (c [5]*node.Node) { nodeList := make([]*node.Node, len(sm.nodes)-1) copy(nodeList, sm.nodes[1:]) for i := range nodeList { - if _, ok := sm.SessionCache[nodeList[i].ID]; !ok { + if _, ok := sm.CircuitCache[nodeList[i].ID]; !ok { log.T.F("adding session cache entry for node %s", nodeList[i].ID) - sm.SessionCache[nodeList[i].ID] = &sessions.Circuit{} + sm.CircuitCache[nodeList[i].ID] = &sessions.Circuit{} } } var counter int out: for counter < 5 { - for i := range sm.SessionCache { + for i := range sm.CircuitCache { if counter == 5 { break out } - if sm.SessionCache[i][counter] == nil { + if sm.CircuitCache[i][counter] == nil { for j := range nodeList { if nodeList[j].ID == i { c[counter] = nodeList[j] diff --git a/pkg/engine/sess/sessionmanager.go b/pkg/engine/sess/sessionmanager.go index 021d4177..60cb25d9 100644 --- a/pkg/engine/sess/sessionmanager.go +++ b/pkg/engine/sess/sessionmanager.go @@ -5,8 +5,8 @@ import ( "net/netip" "sync" - "github.com/lightningnetwork/lnd/lnwire" "github.com/gookit/color" + "github.com/lightningnetwork/lnd/lnwire" "github.com/indra-labs/indra" "github.com/indra-labs/indra/pkg/crypto" @@ -26,7 +26,7 @@ var ( fails = log.E.Chk ) -func (sc SessionCache) Add(s *sessions.Data) SessionCache { +func (sc CircuitCache) Add(s *sessions.Data) CircuitCache { var sce *sessions.Circuit var exists bool if sce, exists = sc[s.Node.ID]; !exists { @@ -49,6 +49,8 @@ func (sm *Manager) AddNodes(nn ...*node.Node) { // PendingPayment accessors. For the same reason as the sessions, pending // payments need to be accessed only with the node's mutex locked. +// AddPendingPayment adds a received incoming payment message to await the +// session keys. func (sm *Manager) AddPendingPayment(np *payments.Payment) { sm.Lock() defer sm.Unlock() @@ -58,12 +60,14 @@ func (sm *Manager) AddPendingPayment(np *payments.Payment) { sm.PendingPayments = sm.PendingPayments.Add(np) } +// AddServiceToLocalNode adds a service to the local node. func (sm *Manager) AddServiceToLocalNode(s *services.Service) (e error) { sm.Lock() defer sm.Unlock() return sm.GetLocalNode().AddService(s) } +// AddSession adds a session to the session cache. func (sm *Manager) AddSession(s *sessions.Data) { sm.Lock() defer sm.Unlock() @@ -75,10 +79,10 @@ func (sm *Manager) AddSession(s *sessions.Data) { } } sm.Sessions = append(sm.Sessions, s) - // Hop 5, the return session( s) are not added to the SessionCache as they + // Hop 5, the return session( s) are not added to the CircuitCache as they // are not Billable and are only related to the node of the Engine. if s.Hop < 5 { - sm.SessionCache = sm.SessionCache.Add(s) + sm.CircuitCache = sm.CircuitCache.Add(s) } } @@ -96,10 +100,11 @@ func (sm *Manager) ClearSessions() { sm.Sessions = sm.Sessions[:1] } +// DecSession decrements credit (mSat) on a session. func (sm *Manager) DecSession(id crypto.PubBytes, msats int, sender bool, typ string) bool { - sess := sm.FindSession(id) + sess := sm.FindSessionByPubkey(id) if sess != nil { sm.Lock() defer sm.Unlock() @@ -109,15 +114,16 @@ func (sm *Manager) DecSession(id crypto.PubBytes, msats int, sender bool, return false } +// DeleteNodeAndSessions deletes a node and all the sessions for it. func (sm *Manager) DeleteNodeAndSessions(id nonce.ID) { sm.Lock() defer sm.Unlock() var exists bool - // If the node exists its Keys is in the SessionCache. - if _, exists = sm.SessionCache[id]; !exists { + // If the node exists its Keys is in the CircuitCache. + if _, exists = sm.CircuitCache[id]; !exists { return } - delete(sm.SessionCache, id) + delete(sm.CircuitCache, id) // ProcessAndDelete from the nodes list. for i := range sm.nodes { if sm.nodes[i].ID == id { @@ -175,6 +181,7 @@ func (sm *Manager) DeleteNodeByID(ii nonce.ID) (e error) { return } +// DeletePendingPayment deletes a pending payment by the preimage hash. func (sm *Manager) DeletePendingPayment(preimage sha256.Hash) { sm.Lock() defer sm.Unlock() @@ -186,7 +193,7 @@ func (sm *Manager) DeleteSession(id crypto.PubBytes) { for i := range sm.Sessions { if sm.Sessions[i].Header.Bytes == id { // ProcessAndDelete from Data cache. - sm.SessionCache[sm.Sessions[i].Node.ID][sm.Sessions[i].Hop] = nil + sm.CircuitCache[sm.Sessions[i].Node.ID][sm.Sessions[i].Hop] = nil // ProcessAndDelete from sm.Sessions = append(sm.Sessions[:i], sm.Sessions[i+1:]...) } @@ -261,24 +268,30 @@ func (sm *Manager) FindNodeByIdentity(id *crypto.Pub) (no *node.Node) { return } +// FindNodeByIndex returns the node at a given position in the array. func (sm *Manager) FindNodeByIndex(i int) (no *node.Node) { sm.Lock() defer sm.Unlock() return sm.nodes[i] } +// FindPendingPayment searches for a pending payment with the matching ID. func (sm *Manager) FindPendingPayment(id nonce.ID) (pp *payments.Payment) { sm.Lock() defer sm.Unlock() return sm.PendingPayments.Find(id) } + +// FindPendingPreimage searches for a pending payment with e matching preimage. func (sm *Manager) FindPendingPreimage(pi sha256.Hash) (pp *payments.Payment) { log.T.F("searching preimage %s", pi) sm.Lock() defer sm.Unlock() return sm.PendingPayments.FindPreimage(pi) } -func (sm *Manager) FindSession(id crypto.PubBytes) *sessions.Data { + +// FindSessionByPubkey searches for a session with a matching public key. +func (sm *Manager) FindSessionByPubkey(id crypto.PubBytes) *sessions.Data { sm.Lock() defer sm.Unlock() for i := range sm.Sessions { @@ -288,6 +301,8 @@ func (sm *Manager) FindSession(id crypto.PubBytes) *sessions.Data { } return nil } + +// FindSessionByHeader searches for a session with a matching header private key. func (sm *Manager) FindSessionByHeader(prvKey *crypto.Prv) *sessions.Data { sm.Lock() defer sm.Unlock() @@ -298,6 +313,9 @@ func (sm *Manager) FindSessionByHeader(prvKey *crypto.Prv) *sessions.Data { } return nil } + +// FindSessionByHeaderPub searches for a session with a matching header public +// key. func (sm *Manager) FindSessionByHeaderPub(pubKey *crypto.Pub) *sessions.Data { sm.Lock() defer sm.Unlock() @@ -308,6 +326,8 @@ func (sm *Manager) FindSessionByHeaderPub(pubKey *crypto.Pub) *sessions.Data { } return nil } + +// FindSessionPreimage searches for a session with a matching preimage hash. func (sm *Manager) FindSessionPreimage(pr sha256.Hash) *sessions.Data { sm.Lock() defer sm.Unlock() @@ -340,22 +360,26 @@ func (sm *Manager) ForEachNode(fn func(n *node.Node) bool) { // GetLocalNode returns the engine's local Node. func (sm *Manager) GetLocalNode() *node.Node { return sm.nodes[0] } +// GetLocalNodeAddress returns the AddrPort of the local node. func (sm *Manager) GetLocalNodeAddress() (addr *netip.AddrPort) { - // sm.Lock() - // defer sm.Unlock() + //sm.Lock() + //defer sm.Unlock() return sm.GetLocalNode().AddrPort } +// GetLocalNodeAddressString returns the string form of the local node address. func (sm *Manager) GetLocalNodeAddressString() (s string) { return color.Yellow.Sprint(sm.GetLocalNodeAddress()) } +// GetLocalNodeIdentityBytes returns the public key bytes of the local node. func (sm *Manager) GetLocalNodeIdentityBytes() (ident crypto.PubBytes) { sm.Lock() defer sm.Unlock() return sm.GetLocalNode().Identity.Bytes } +// GetLocalNodeIdentityPrv returns the identity private key of the local node. func (sm *Manager) GetLocalNodeIdentityPrv() (ident *crypto.Prv) { sm.Lock() defer sm.Unlock() @@ -367,21 +391,26 @@ func (sm *Manager) GetLocalNodePaymentChan() payments.Chan { return sm.nodes[0].Chan } +// GetLocalNodeRelayRate returns the relay rate for the local node. func (sm *Manager) GetLocalNodeRelayRate() (rate int) { sm.Lock() defer sm.Unlock() return sm.GetLocalNode().RelayRate } +// GetNodeCircuit gets the set of 5 sessions associated with a node with a given +// ID. func (sm *Manager) GetNodeCircuit(id nonce.ID) (sce *sessions.Circuit, exists bool) { sm.Lock() defer sm.Unlock() - sce, exists = sm.SessionCache[id] + sce, exists = sm.CircuitCache[id] return } +// GetSessionByIndex returns the session with the given index in the main session +// cache. func (sm *Manager) GetSessionByIndex(i int) (s *sessions.Data) { sm.Lock() defer sm.Unlock() @@ -391,6 +420,8 @@ func (sm *Manager) GetSessionByIndex(i int) (s *sessions.Data) { return } +// GetSessionsAtHop returns all of the sessions designated for a given hop in the +// circuit. func (sm *Manager) GetSessionsAtHop(hop byte) (s sessions.Sessions) { sm.Lock() defer sm.Unlock() @@ -402,10 +433,11 @@ func (sm *Manager) GetSessionsAtHop(hop byte) (s sessions.Sessions) { return } +// IncSession adds an amount of mSat to the balance of a session. func (sm *Manager) IncSession(id crypto.PubBytes, msats lnwire.MilliSatoshi, sender bool, typ string) { - sess := sm.FindSession(id) + sess := sm.FindSessionByPubkey(id) if sess != nil { sm.Lock() defer sm.Unlock() @@ -413,7 +445,7 @@ func (sm *Manager) IncSession(id crypto.PubBytes, msats lnwire.MilliSatoshi, } } -// IterateSessionCache calls a function for each entry in the SessionCache +// IterateSessionCache calls a function for each entry in the CircuitCache // that provides also access to the related node. // // Do not call Manager methods within this function. @@ -423,10 +455,10 @@ func (sm *Manager) IterateSessionCache(fn func(n *node.Node, sm.Lock() defer sm.Unlock() out: - for i := range sm.SessionCache { + for i := range sm.CircuitCache { for j := range sm.nodes { if sm.nodes[j].ID == i { - if fn(sm.nodes[j], sm.SessionCache[i]) { + if fn(sm.nodes[j], sm.CircuitCache[i]) { break out } break @@ -455,12 +487,15 @@ func (sm *Manager) NodesLen() int { return len(sm.nodes) } +// ReceiveToLocalNode returns a channel that will receive messages for the local +// node, that arrived from the internet. func (sm *Manager) ReceiveToLocalNode() <-chan slice.Bytes { sm.Lock() defer sm.Unlock() return sm.GetLocalNode().Transport.Receive() } +// SendFromLocalNode delivers a message to a local service. func (sm *Manager) SendFromLocalNode(port uint16, b slice.Bytes) (e error) { @@ -476,44 +511,50 @@ func (sm *Manager) SetLocalNode(n *node.Node) { sm.nodes[0] = n } +// SetLocalNodeAddress changes the local node address. func (sm *Manager) SetLocalNodeAddress(addr *netip.AddrPort) { sm.Lock() defer sm.Unlock() sm.GetLocalNode().AddrPort = addr } +// UpdateSessionCache reads the main Sessions cache and populates the +// CircuitCache where circuits are aggregated. func (sm *Manager) UpdateSessionCache() { sm.Lock() defer sm.Unlock() - // First we create SessionCache entries for all existing nodes. + // First we create CircuitCache entries for all existing nodes. for i := range sm.nodes { - _, exists := sm.SessionCache[sm.nodes[i].ID] + _, exists := sm.CircuitCache[sm.nodes[i].ID] if !exists { - sm.SessionCache[sm.nodes[i].ID] = &sessions.Circuit{} + sm.CircuitCache[sm.nodes[i].ID] = &sessions.Circuit{} } } // Place all sessions in their slots respective to their node. for _, v := range sm.Sessions { - sm.SessionCache[v.Node.ID][v.Hop] = v + sm.CircuitCache[v.Node.ID][v.Hop] = v } } type ( - // A SessionCache stores each of the 5 hops of a peer node. - SessionCache map[nonce.ID]*sessions.Circuit + // A CircuitCache stores each of the 5 hops of a peer node. + CircuitCache map[nonce.ID]*sessions.Circuit + + // Manager is a session manager for Indra, handling sessions and services. Manager struct { nodes []*node.Node Listener *transport.Listener PendingPayments payments.PendingPayments sessions.Sessions - SessionCache + CircuitCache sync.Mutex } ) +// NewSessionManager creates a new session manager. func NewSessionManager(listener *transport.Listener) *Manager { return &Manager{ - SessionCache: make(SessionCache), + CircuitCache: make(CircuitCache), PendingPayments: make(payments.PendingPayments, 0), Listener: listener, } diff --git a/pkg/engine/transport/transport.go b/pkg/engine/transport/transport.go index ceabaff2..8d600f1c 100644 --- a/pkg/engine/transport/transport.go +++ b/pkg/engine/transport/transport.go @@ -34,14 +34,21 @@ import ( ) const ( - LocalhostZeroIPv4TCP = "/ip4/127.0.0.1/tcp/0" + // LocalhostZeroIPv4TCP is the default localhost to bind to any address. Used in + // tests. + LocalhostZeroIPv4TCP = "/ip4/127.0.0.1/tcp/0" + // LocalhostZeroIPv4QUIC - Don't use. Buffer problems on linux and fails on CI. - //LocalhostZeroIPv4QUIC = "/ip4/127.0.0.1/udp/0/quic" - DefaultMTU = 1382 - ConnBufs = 8192 - IndraLibP2PID = "/indra/relay/" + indra.SemVer - IndraServiceName = "org.indra.relay" - ProtocolPrefix = "/indra/" + indra.SemVer + // LocalhostZeroIPv4QUIC = "/ip4/127.0.0.1/udp/0/quic" + + // DefaultMTU is the default maximum size for a packet. + DefaultMTU = 1382 + + // ConnBufs is the number of buffers to use in message dispatch channels. + ConnBufs = 8192 + + // IndraLibP2PID is the indra protocol identifier. + IndraLibP2PID = "/indra/relay/" + indra.SemVer ) var ( @@ -298,7 +305,7 @@ func NewDHT(ctx context.Context, host host.Host, options = append(options, dht.Mode(dht.ModeServer)) } options = append(options, - dht.ProtocolPrefix(ProtocolPrefix), + dht.ProtocolPrefix(IndraLibP2PID), ) if d, e = dht.New(ctx, host, options...); fails(e) { return diff --git a/pkg/onions/balance/balance.go b/pkg/onions/balance/balance.go index dda177b2..44e1db94 100644 --- a/pkg/onions/balance/balance.go +++ b/pkg/onions/balance/balance.go @@ -67,7 +67,7 @@ func (x *Balance) Handle(s *splice.Splice, p ont.Onion, ng ont.Ngin) (e error) { if pending := ng.Pending().Find(x.ID); pending != nil { log.D.S("found pending", pending.ID) for i := range pending.Billable { - session := ng.Mgr().FindSession(pending.Billable[i]) + session := ng.Mgr().FindSessionByPubkey(pending.Billable[i]) out := session.Node.RelayRate * s.Len() if session != nil { in := session.Node.RelayRate * pending.SentSize diff --git a/pkg/onions/response/response.go b/pkg/onions/response/response.go index 44f25057..bcd15b28 100644 --- a/pkg/onions/response/response.go +++ b/pkg/onions/response/response.go @@ -76,7 +76,7 @@ func (x *Response) Handle(s *splice.Splice, p ont.Onion, ng ont.Ngin) (e error) log.T.F("searching for pending Keys %s", x.ID) if pending != nil { for i := range pending.Billable { - se := ng.Mgr().FindSession(pending.Billable[i]) + se := ng.Mgr().FindSessionByPubkey(pending.Billable[i]) if se != nil { typ := "response" relayRate := se.Node.RelayRate