documenting and tidying of engine, session manager and transport
This commit is contained in:
@@ -10,7 +10,7 @@ import (
|
|||||||
"github.com/indra-labs/indra/pkg/engine/tpt"
|
"github.com/indra-labs/indra/pkg/engine/tpt"
|
||||||
"github.com/indra-labs/indra/pkg/engine/transport"
|
"github.com/indra-labs/indra/pkg/engine/transport"
|
||||||
"github.com/indra-labs/indra/pkg/onions/hidden"
|
"github.com/indra-labs/indra/pkg/onions/hidden"
|
||||||
onions2 "github.com/indra-labs/indra/pkg/onions/ont"
|
"github.com/indra-labs/indra/pkg/onions/ont"
|
||||||
"github.com/indra-labs/indra/pkg/onions/reg"
|
"github.com/indra-labs/indra/pkg/onions/reg"
|
||||||
"github.com/indra-labs/indra/pkg/util/qu"
|
"github.com/indra-labs/indra/pkg/util/qu"
|
||||||
"github.com/indra-labs/indra/pkg/util/slice"
|
"github.com/indra-labs/indra/pkg/util/slice"
|
||||||
@@ -18,7 +18,7 @@ import (
|
|||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ onions2.Ngin = &Engine{}
|
var _ ont.Ngin = &Engine{}
|
||||||
|
|
||||||
type (
|
type (
|
||||||
// Engine processes onion messages, forwarding the relevant data to other relays
|
// Engine processes onion messages, forwarding the relevant data to other relays
|
||||||
@@ -53,7 +53,7 @@ func (ng *Engine) GetHidden() *hidden.Hidden { return ng.h }
|
|||||||
|
|
||||||
func (ng *Engine) GetLoad() byte { return byte(ng.Load.Load()) }
|
func (ng *Engine) GetLoad() byte { return byte(ng.Load.Load()) }
|
||||||
|
|
||||||
func (ng *Engine) HandleMessage(s *splice.Splice, pr onions2.Onion) {
|
func (ng *Engine) HandleMessage(s *splice.Splice, pr ont.Onion) {
|
||||||
log.D.F("%s handling received message",
|
log.D.F("%s handling received message",
|
||||||
ng.Manager.GetLocalNodeAddressString())
|
ng.Manager.GetLocalNodeAddressString())
|
||||||
s.SetCursor(0)
|
s.SetCursor(0)
|
||||||
@@ -72,7 +72,7 @@ func (ng *Engine) HandleMessage(s *splice.Splice, pr onions2.Onion) {
|
|||||||
log.D.Ln("did not get onion")
|
log.D.Ln("did not get onion")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if fails(m.(onions2.Onion).Handle(s, pr, ng)) {
|
if fails(m.(ont.Onion).Handle(s, pr, ng)) {
|
||||||
log.W.S("unrecognised packet", s.GetAll().ToBytes())
|
log.W.S("unrecognised packet", s.GetAll().ToBytes())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -82,7 +82,7 @@ func (ng *Engine) Handler() (out bool) {
|
|||||||
log.T.C(func() string {
|
log.T.C(func() string {
|
||||||
return ng.Manager.GetLocalNodeAddressString() + " awaiting message"
|
return ng.Manager.GetLocalNodeAddressString() + " awaiting message"
|
||||||
})
|
})
|
||||||
var prev onions2.Onion
|
var prev ont.Onion
|
||||||
select {
|
select {
|
||||||
case <-ng.C.Wait():
|
case <-ng.C.Wait():
|
||||||
ng.Shutdown()
|
ng.Shutdown()
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestClient_SendExit(t *testing.T) {
|
func TestClient_SendExit(t *testing.T) {
|
||||||
if indra.CI=="false" {
|
if indra.CI == "false" {
|
||||||
log2.SetLogLevel(log2.Debug)
|
log2.SetLogLevel(log2.Debug)
|
||||||
}
|
}
|
||||||
var clients []*Engine
|
var clients []*Engine
|
||||||
@@ -124,7 +124,7 @@ out:
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestClient_SendPing(t *testing.T) {
|
func TestClient_SendPing(t *testing.T) {
|
||||||
if indra.CI=="false" {
|
if indra.CI == "false" {
|
||||||
log2.SetLogLevel(log2.Debug)
|
log2.SetLogLevel(log2.Debug)
|
||||||
}
|
}
|
||||||
var clients []*Engine
|
var clients []*Engine
|
||||||
@@ -177,7 +177,7 @@ out:
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestClient_SendSessionKeys(t *testing.T) {
|
func TestClient_SendSessionKeys(t *testing.T) {
|
||||||
if indra.CI=="false" {
|
if indra.CI == "false" {
|
||||||
log2.SetLogLevel(log2.Debug)
|
log2.SetLogLevel(log2.Debug)
|
||||||
}
|
}
|
||||||
var clients []*Engine
|
var clients []*Engine
|
||||||
@@ -219,8 +219,8 @@ func TestClient_SendSessionKeys(t *testing.T) {
|
|||||||
counter.Dec()
|
counter.Dec()
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
for j := range clients[0].Manager.SessionCache {
|
for j := range clients[0].Manager.CircuitCache {
|
||||||
log.D.F("%d %s %v", i, j, clients[0].Manager.SessionCache[j])
|
log.D.F("%d %s %v", i, j, clients[0].Manager.CircuitCache[j])
|
||||||
}
|
}
|
||||||
quit.Q()
|
quit.Q()
|
||||||
}
|
}
|
||||||
@@ -229,4 +229,3 @@ func TestClient_SendSessionKeys(t *testing.T) {
|
|||||||
}
|
}
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -91,19 +91,19 @@ func (sm *Manager) SelectUnusedCircuit() (c [5]*node.Node) {
|
|||||||
nodeList := make([]*node.Node, len(sm.nodes)-1)
|
nodeList := make([]*node.Node, len(sm.nodes)-1)
|
||||||
copy(nodeList, sm.nodes[1:])
|
copy(nodeList, sm.nodes[1:])
|
||||||
for i := range nodeList {
|
for i := range nodeList {
|
||||||
if _, ok := sm.SessionCache[nodeList[i].ID]; !ok {
|
if _, ok := sm.CircuitCache[nodeList[i].ID]; !ok {
|
||||||
log.T.F("adding session cache entry for node %s", nodeList[i].ID)
|
log.T.F("adding session cache entry for node %s", nodeList[i].ID)
|
||||||
sm.SessionCache[nodeList[i].ID] = &sessions.Circuit{}
|
sm.CircuitCache[nodeList[i].ID] = &sessions.Circuit{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var counter int
|
var counter int
|
||||||
out:
|
out:
|
||||||
for counter < 5 {
|
for counter < 5 {
|
||||||
for i := range sm.SessionCache {
|
for i := range sm.CircuitCache {
|
||||||
if counter == 5 {
|
if counter == 5 {
|
||||||
break out
|
break out
|
||||||
}
|
}
|
||||||
if sm.SessionCache[i][counter] == nil {
|
if sm.CircuitCache[i][counter] == nil {
|
||||||
for j := range nodeList {
|
for j := range nodeList {
|
||||||
if nodeList[j].ID == i {
|
if nodeList[j].ID == i {
|
||||||
c[counter] = nodeList[j]
|
c[counter] = nodeList[j]
|
||||||
|
|||||||
@@ -5,8 +5,8 @@ import (
|
|||||||
"net/netip"
|
"net/netip"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
|
||||||
"github.com/gookit/color"
|
"github.com/gookit/color"
|
||||||
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
|
|
||||||
"github.com/indra-labs/indra"
|
"github.com/indra-labs/indra"
|
||||||
"github.com/indra-labs/indra/pkg/crypto"
|
"github.com/indra-labs/indra/pkg/crypto"
|
||||||
@@ -26,7 +26,7 @@ var (
|
|||||||
fails = log.E.Chk
|
fails = log.E.Chk
|
||||||
)
|
)
|
||||||
|
|
||||||
func (sc SessionCache) Add(s *sessions.Data) SessionCache {
|
func (sc CircuitCache) Add(s *sessions.Data) CircuitCache {
|
||||||
var sce *sessions.Circuit
|
var sce *sessions.Circuit
|
||||||
var exists bool
|
var exists bool
|
||||||
if sce, exists = sc[s.Node.ID]; !exists {
|
if sce, exists = sc[s.Node.ID]; !exists {
|
||||||
@@ -49,6 +49,8 @@ func (sm *Manager) AddNodes(nn ...*node.Node) {
|
|||||||
// PendingPayment accessors. For the same reason as the sessions, pending
|
// PendingPayment accessors. For the same reason as the sessions, pending
|
||||||
// payments need to be accessed only with the node's mutex locked.
|
// payments need to be accessed only with the node's mutex locked.
|
||||||
|
|
||||||
|
// AddPendingPayment adds a received incoming payment message to await the
|
||||||
|
// session keys.
|
||||||
func (sm *Manager) AddPendingPayment(np *payments.Payment) {
|
func (sm *Manager) AddPendingPayment(np *payments.Payment) {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
@@ -58,12 +60,14 @@ func (sm *Manager) AddPendingPayment(np *payments.Payment) {
|
|||||||
sm.PendingPayments = sm.PendingPayments.Add(np)
|
sm.PendingPayments = sm.PendingPayments.Add(np)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddServiceToLocalNode adds a service to the local node.
|
||||||
func (sm *Manager) AddServiceToLocalNode(s *services.Service) (e error) {
|
func (sm *Manager) AddServiceToLocalNode(s *services.Service) (e error) {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
return sm.GetLocalNode().AddService(s)
|
return sm.GetLocalNode().AddService(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddSession adds a session to the session cache.
|
||||||
func (sm *Manager) AddSession(s *sessions.Data) {
|
func (sm *Manager) AddSession(s *sessions.Data) {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
@@ -75,10 +79,10 @@ func (sm *Manager) AddSession(s *sessions.Data) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
sm.Sessions = append(sm.Sessions, s)
|
sm.Sessions = append(sm.Sessions, s)
|
||||||
// Hop 5, the return session( s) are not added to the SessionCache as they
|
// Hop 5, the return session( s) are not added to the CircuitCache as they
|
||||||
// are not Billable and are only related to the node of the Engine.
|
// are not Billable and are only related to the node of the Engine.
|
||||||
if s.Hop < 5 {
|
if s.Hop < 5 {
|
||||||
sm.SessionCache = sm.SessionCache.Add(s)
|
sm.CircuitCache = sm.CircuitCache.Add(s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -96,10 +100,11 @@ func (sm *Manager) ClearSessions() {
|
|||||||
sm.Sessions = sm.Sessions[:1]
|
sm.Sessions = sm.Sessions[:1]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DecSession decrements credit (mSat) on a session.
|
||||||
func (sm *Manager) DecSession(id crypto.PubBytes, msats int, sender bool,
|
func (sm *Manager) DecSession(id crypto.PubBytes, msats int, sender bool,
|
||||||
typ string) bool {
|
typ string) bool {
|
||||||
|
|
||||||
sess := sm.FindSession(id)
|
sess := sm.FindSessionByPubkey(id)
|
||||||
if sess != nil {
|
if sess != nil {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
@@ -109,15 +114,16 @@ func (sm *Manager) DecSession(id crypto.PubBytes, msats int, sender bool,
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteNodeAndSessions deletes a node and all the sessions for it.
|
||||||
func (sm *Manager) DeleteNodeAndSessions(id nonce.ID) {
|
func (sm *Manager) DeleteNodeAndSessions(id nonce.ID) {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
var exists bool
|
var exists bool
|
||||||
// If the node exists its Keys is in the SessionCache.
|
// If the node exists its Keys is in the CircuitCache.
|
||||||
if _, exists = sm.SessionCache[id]; !exists {
|
if _, exists = sm.CircuitCache[id]; !exists {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
delete(sm.SessionCache, id)
|
delete(sm.CircuitCache, id)
|
||||||
// ProcessAndDelete from the nodes list.
|
// ProcessAndDelete from the nodes list.
|
||||||
for i := range sm.nodes {
|
for i := range sm.nodes {
|
||||||
if sm.nodes[i].ID == id {
|
if sm.nodes[i].ID == id {
|
||||||
@@ -175,6 +181,7 @@ func (sm *Manager) DeleteNodeByID(ii nonce.ID) (e error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeletePendingPayment deletes a pending payment by the preimage hash.
|
||||||
func (sm *Manager) DeletePendingPayment(preimage sha256.Hash) {
|
func (sm *Manager) DeletePendingPayment(preimage sha256.Hash) {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
@@ -186,7 +193,7 @@ func (sm *Manager) DeleteSession(id crypto.PubBytes) {
|
|||||||
for i := range sm.Sessions {
|
for i := range sm.Sessions {
|
||||||
if sm.Sessions[i].Header.Bytes == id {
|
if sm.Sessions[i].Header.Bytes == id {
|
||||||
// ProcessAndDelete from Data cache.
|
// ProcessAndDelete from Data cache.
|
||||||
sm.SessionCache[sm.Sessions[i].Node.ID][sm.Sessions[i].Hop] = nil
|
sm.CircuitCache[sm.Sessions[i].Node.ID][sm.Sessions[i].Hop] = nil
|
||||||
// ProcessAndDelete from
|
// ProcessAndDelete from
|
||||||
sm.Sessions = append(sm.Sessions[:i], sm.Sessions[i+1:]...)
|
sm.Sessions = append(sm.Sessions[:i], sm.Sessions[i+1:]...)
|
||||||
}
|
}
|
||||||
@@ -261,24 +268,30 @@ func (sm *Manager) FindNodeByIdentity(id *crypto.Pub) (no *node.Node) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FindNodeByIndex returns the node at a given position in the array.
|
||||||
func (sm *Manager) FindNodeByIndex(i int) (no *node.Node) {
|
func (sm *Manager) FindNodeByIndex(i int) (no *node.Node) {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
return sm.nodes[i]
|
return sm.nodes[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FindPendingPayment searches for a pending payment with the matching ID.
|
||||||
func (sm *Manager) FindPendingPayment(id nonce.ID) (pp *payments.Payment) {
|
func (sm *Manager) FindPendingPayment(id nonce.ID) (pp *payments.Payment) {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
return sm.PendingPayments.Find(id)
|
return sm.PendingPayments.Find(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FindPendingPreimage searches for a pending payment with e matching preimage.
|
||||||
func (sm *Manager) FindPendingPreimage(pi sha256.Hash) (pp *payments.Payment) {
|
func (sm *Manager) FindPendingPreimage(pi sha256.Hash) (pp *payments.Payment) {
|
||||||
log.T.F("searching preimage %s", pi)
|
log.T.F("searching preimage %s", pi)
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
return sm.PendingPayments.FindPreimage(pi)
|
return sm.PendingPayments.FindPreimage(pi)
|
||||||
}
|
}
|
||||||
func (sm *Manager) FindSession(id crypto.PubBytes) *sessions.Data {
|
|
||||||
|
// FindSessionByPubkey searches for a session with a matching public key.
|
||||||
|
func (sm *Manager) FindSessionByPubkey(id crypto.PubBytes) *sessions.Data {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
for i := range sm.Sessions {
|
for i := range sm.Sessions {
|
||||||
@@ -288,6 +301,8 @@ func (sm *Manager) FindSession(id crypto.PubBytes) *sessions.Data {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FindSessionByHeader searches for a session with a matching header private key.
|
||||||
func (sm *Manager) FindSessionByHeader(prvKey *crypto.Prv) *sessions.Data {
|
func (sm *Manager) FindSessionByHeader(prvKey *crypto.Prv) *sessions.Data {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
@@ -298,6 +313,9 @@ func (sm *Manager) FindSessionByHeader(prvKey *crypto.Prv) *sessions.Data {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FindSessionByHeaderPub searches for a session with a matching header public
|
||||||
|
// key.
|
||||||
func (sm *Manager) FindSessionByHeaderPub(pubKey *crypto.Pub) *sessions.Data {
|
func (sm *Manager) FindSessionByHeaderPub(pubKey *crypto.Pub) *sessions.Data {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
@@ -308,6 +326,8 @@ func (sm *Manager) FindSessionByHeaderPub(pubKey *crypto.Pub) *sessions.Data {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FindSessionPreimage searches for a session with a matching preimage hash.
|
||||||
func (sm *Manager) FindSessionPreimage(pr sha256.Hash) *sessions.Data {
|
func (sm *Manager) FindSessionPreimage(pr sha256.Hash) *sessions.Data {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
@@ -340,22 +360,26 @@ func (sm *Manager) ForEachNode(fn func(n *node.Node) bool) {
|
|||||||
// GetLocalNode returns the engine's local Node.
|
// GetLocalNode returns the engine's local Node.
|
||||||
func (sm *Manager) GetLocalNode() *node.Node { return sm.nodes[0] }
|
func (sm *Manager) GetLocalNode() *node.Node { return sm.nodes[0] }
|
||||||
|
|
||||||
|
// GetLocalNodeAddress returns the AddrPort of the local node.
|
||||||
func (sm *Manager) GetLocalNodeAddress() (addr *netip.AddrPort) {
|
func (sm *Manager) GetLocalNodeAddress() (addr *netip.AddrPort) {
|
||||||
// sm.Lock()
|
//sm.Lock()
|
||||||
// defer sm.Unlock()
|
//defer sm.Unlock()
|
||||||
return sm.GetLocalNode().AddrPort
|
return sm.GetLocalNode().AddrPort
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetLocalNodeAddressString returns the string form of the local node address.
|
||||||
func (sm *Manager) GetLocalNodeAddressString() (s string) {
|
func (sm *Manager) GetLocalNodeAddressString() (s string) {
|
||||||
return color.Yellow.Sprint(sm.GetLocalNodeAddress())
|
return color.Yellow.Sprint(sm.GetLocalNodeAddress())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetLocalNodeIdentityBytes returns the public key bytes of the local node.
|
||||||
func (sm *Manager) GetLocalNodeIdentityBytes() (ident crypto.PubBytes) {
|
func (sm *Manager) GetLocalNodeIdentityBytes() (ident crypto.PubBytes) {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
return sm.GetLocalNode().Identity.Bytes
|
return sm.GetLocalNode().Identity.Bytes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetLocalNodeIdentityPrv returns the identity private key of the local node.
|
||||||
func (sm *Manager) GetLocalNodeIdentityPrv() (ident *crypto.Prv) {
|
func (sm *Manager) GetLocalNodeIdentityPrv() (ident *crypto.Prv) {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
@@ -367,21 +391,26 @@ func (sm *Manager) GetLocalNodePaymentChan() payments.Chan {
|
|||||||
return sm.nodes[0].Chan
|
return sm.nodes[0].Chan
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetLocalNodeRelayRate returns the relay rate for the local node.
|
||||||
func (sm *Manager) GetLocalNodeRelayRate() (rate int) {
|
func (sm *Manager) GetLocalNodeRelayRate() (rate int) {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
return sm.GetLocalNode().RelayRate
|
return sm.GetLocalNode().RelayRate
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetNodeCircuit gets the set of 5 sessions associated with a node with a given
|
||||||
|
// ID.
|
||||||
func (sm *Manager) GetNodeCircuit(id nonce.ID) (sce *sessions.Circuit,
|
func (sm *Manager) GetNodeCircuit(id nonce.ID) (sce *sessions.Circuit,
|
||||||
exists bool) {
|
exists bool) {
|
||||||
|
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
sce, exists = sm.SessionCache[id]
|
sce, exists = sm.CircuitCache[id]
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetSessionByIndex returns the session with the given index in the main session
|
||||||
|
// cache.
|
||||||
func (sm *Manager) GetSessionByIndex(i int) (s *sessions.Data) {
|
func (sm *Manager) GetSessionByIndex(i int) (s *sessions.Data) {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
@@ -391,6 +420,8 @@ func (sm *Manager) GetSessionByIndex(i int) (s *sessions.Data) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetSessionsAtHop returns all of the sessions designated for a given hop in the
|
||||||
|
// circuit.
|
||||||
func (sm *Manager) GetSessionsAtHop(hop byte) (s sessions.Sessions) {
|
func (sm *Manager) GetSessionsAtHop(hop byte) (s sessions.Sessions) {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
@@ -402,10 +433,11 @@ func (sm *Manager) GetSessionsAtHop(hop byte) (s sessions.Sessions) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IncSession adds an amount of mSat to the balance of a session.
|
||||||
func (sm *Manager) IncSession(id crypto.PubBytes, msats lnwire.MilliSatoshi,
|
func (sm *Manager) IncSession(id crypto.PubBytes, msats lnwire.MilliSatoshi,
|
||||||
sender bool, typ string) {
|
sender bool, typ string) {
|
||||||
|
|
||||||
sess := sm.FindSession(id)
|
sess := sm.FindSessionByPubkey(id)
|
||||||
if sess != nil {
|
if sess != nil {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
@@ -413,7 +445,7 @@ func (sm *Manager) IncSession(id crypto.PubBytes, msats lnwire.MilliSatoshi,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterateSessionCache calls a function for each entry in the SessionCache
|
// IterateSessionCache calls a function for each entry in the CircuitCache
|
||||||
// that provides also access to the related node.
|
// that provides also access to the related node.
|
||||||
//
|
//
|
||||||
// Do not call Manager methods within this function.
|
// Do not call Manager methods within this function.
|
||||||
@@ -423,10 +455,10 @@ func (sm *Manager) IterateSessionCache(fn func(n *node.Node,
|
|||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
out:
|
out:
|
||||||
for i := range sm.SessionCache {
|
for i := range sm.CircuitCache {
|
||||||
for j := range sm.nodes {
|
for j := range sm.nodes {
|
||||||
if sm.nodes[j].ID == i {
|
if sm.nodes[j].ID == i {
|
||||||
if fn(sm.nodes[j], sm.SessionCache[i]) {
|
if fn(sm.nodes[j], sm.CircuitCache[i]) {
|
||||||
break out
|
break out
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
@@ -455,12 +487,15 @@ func (sm *Manager) NodesLen() int {
|
|||||||
return len(sm.nodes)
|
return len(sm.nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReceiveToLocalNode returns a channel that will receive messages for the local
|
||||||
|
// node, that arrived from the internet.
|
||||||
func (sm *Manager) ReceiveToLocalNode() <-chan slice.Bytes {
|
func (sm *Manager) ReceiveToLocalNode() <-chan slice.Bytes {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
return sm.GetLocalNode().Transport.Receive()
|
return sm.GetLocalNode().Transport.Receive()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SendFromLocalNode delivers a message to a local service.
|
||||||
func (sm *Manager) SendFromLocalNode(port uint16,
|
func (sm *Manager) SendFromLocalNode(port uint16,
|
||||||
b slice.Bytes) (e error) {
|
b slice.Bytes) (e error) {
|
||||||
|
|
||||||
@@ -476,44 +511,50 @@ func (sm *Manager) SetLocalNode(n *node.Node) {
|
|||||||
sm.nodes[0] = n
|
sm.nodes[0] = n
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetLocalNodeAddress changes the local node address.
|
||||||
func (sm *Manager) SetLocalNodeAddress(addr *netip.AddrPort) {
|
func (sm *Manager) SetLocalNodeAddress(addr *netip.AddrPort) {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
sm.GetLocalNode().AddrPort = addr
|
sm.GetLocalNode().AddrPort = addr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateSessionCache reads the main Sessions cache and populates the
|
||||||
|
// CircuitCache where circuits are aggregated.
|
||||||
func (sm *Manager) UpdateSessionCache() {
|
func (sm *Manager) UpdateSessionCache() {
|
||||||
sm.Lock()
|
sm.Lock()
|
||||||
defer sm.Unlock()
|
defer sm.Unlock()
|
||||||
// First we create SessionCache entries for all existing nodes.
|
// First we create CircuitCache entries for all existing nodes.
|
||||||
for i := range sm.nodes {
|
for i := range sm.nodes {
|
||||||
_, exists := sm.SessionCache[sm.nodes[i].ID]
|
_, exists := sm.CircuitCache[sm.nodes[i].ID]
|
||||||
if !exists {
|
if !exists {
|
||||||
sm.SessionCache[sm.nodes[i].ID] = &sessions.Circuit{}
|
sm.CircuitCache[sm.nodes[i].ID] = &sessions.Circuit{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Place all sessions in their slots respective to their node.
|
// Place all sessions in their slots respective to their node.
|
||||||
for _, v := range sm.Sessions {
|
for _, v := range sm.Sessions {
|
||||||
sm.SessionCache[v.Node.ID][v.Hop] = v
|
sm.CircuitCache[v.Node.ID][v.Hop] = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type (
|
type (
|
||||||
// A SessionCache stores each of the 5 hops of a peer node.
|
// A CircuitCache stores each of the 5 hops of a peer node.
|
||||||
SessionCache map[nonce.ID]*sessions.Circuit
|
CircuitCache map[nonce.ID]*sessions.Circuit
|
||||||
|
|
||||||
|
// Manager is a session manager for Indra, handling sessions and services.
|
||||||
Manager struct {
|
Manager struct {
|
||||||
nodes []*node.Node
|
nodes []*node.Node
|
||||||
Listener *transport.Listener
|
Listener *transport.Listener
|
||||||
PendingPayments payments.PendingPayments
|
PendingPayments payments.PendingPayments
|
||||||
sessions.Sessions
|
sessions.Sessions
|
||||||
SessionCache
|
CircuitCache
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// NewSessionManager creates a new session manager.
|
||||||
func NewSessionManager(listener *transport.Listener) *Manager {
|
func NewSessionManager(listener *transport.Listener) *Manager {
|
||||||
return &Manager{
|
return &Manager{
|
||||||
SessionCache: make(SessionCache),
|
CircuitCache: make(CircuitCache),
|
||||||
PendingPayments: make(payments.PendingPayments, 0),
|
PendingPayments: make(payments.PendingPayments, 0),
|
||||||
Listener: listener,
|
Listener: listener,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -34,14 +34,21 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
LocalhostZeroIPv4TCP = "/ip4/127.0.0.1/tcp/0"
|
// LocalhostZeroIPv4TCP is the default localhost to bind to any address. Used in
|
||||||
|
// tests.
|
||||||
|
LocalhostZeroIPv4TCP = "/ip4/127.0.0.1/tcp/0"
|
||||||
|
|
||||||
// LocalhostZeroIPv4QUIC - Don't use. Buffer problems on linux and fails on CI.
|
// LocalhostZeroIPv4QUIC - Don't use. Buffer problems on linux and fails on CI.
|
||||||
//LocalhostZeroIPv4QUIC = "/ip4/127.0.0.1/udp/0/quic"
|
// LocalhostZeroIPv4QUIC = "/ip4/127.0.0.1/udp/0/quic"
|
||||||
DefaultMTU = 1382
|
|
||||||
ConnBufs = 8192
|
// DefaultMTU is the default maximum size for a packet.
|
||||||
IndraLibP2PID = "/indra/relay/" + indra.SemVer
|
DefaultMTU = 1382
|
||||||
IndraServiceName = "org.indra.relay"
|
|
||||||
ProtocolPrefix = "/indra/" + indra.SemVer
|
// ConnBufs is the number of buffers to use in message dispatch channels.
|
||||||
|
ConnBufs = 8192
|
||||||
|
|
||||||
|
// IndraLibP2PID is the indra protocol identifier.
|
||||||
|
IndraLibP2PID = "/indra/relay/" + indra.SemVer
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -298,7 +305,7 @@ func NewDHT(ctx context.Context, host host.Host,
|
|||||||
options = append(options, dht.Mode(dht.ModeServer))
|
options = append(options, dht.Mode(dht.ModeServer))
|
||||||
}
|
}
|
||||||
options = append(options,
|
options = append(options,
|
||||||
dht.ProtocolPrefix(ProtocolPrefix),
|
dht.ProtocolPrefix(IndraLibP2PID),
|
||||||
)
|
)
|
||||||
if d, e = dht.New(ctx, host, options...); fails(e) {
|
if d, e = dht.New(ctx, host, options...); fails(e) {
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -67,7 +67,7 @@ func (x *Balance) Handle(s *splice.Splice, p ont.Onion, ng ont.Ngin) (e error) {
|
|||||||
if pending := ng.Pending().Find(x.ID); pending != nil {
|
if pending := ng.Pending().Find(x.ID); pending != nil {
|
||||||
log.D.S("found pending", pending.ID)
|
log.D.S("found pending", pending.ID)
|
||||||
for i := range pending.Billable {
|
for i := range pending.Billable {
|
||||||
session := ng.Mgr().FindSession(pending.Billable[i])
|
session := ng.Mgr().FindSessionByPubkey(pending.Billable[i])
|
||||||
out := session.Node.RelayRate * s.Len()
|
out := session.Node.RelayRate * s.Len()
|
||||||
if session != nil {
|
if session != nil {
|
||||||
in := session.Node.RelayRate * pending.SentSize
|
in := session.Node.RelayRate * pending.SentSize
|
||||||
|
|||||||
@@ -76,7 +76,7 @@ func (x *Response) Handle(s *splice.Splice, p ont.Onion, ng ont.Ngin) (e error)
|
|||||||
log.T.F("searching for pending Keys %s", x.ID)
|
log.T.F("searching for pending Keys %s", x.ID)
|
||||||
if pending != nil {
|
if pending != nil {
|
||||||
for i := range pending.Billable {
|
for i := range pending.Billable {
|
||||||
se := ng.Mgr().FindSession(pending.Billable[i])
|
se := ng.Mgr().FindSessionByPubkey(pending.Billable[i])
|
||||||
if se != nil {
|
if se != nil {
|
||||||
typ := "response"
|
typ := "response"
|
||||||
relayRate := se.Node.RelayRate
|
relayRate := se.Node.RelayRate
|
||||||
|
|||||||
Reference in New Issue
Block a user