refactoring some ugly long parameters

would like to redo a lotta this stuff but another time
This commit is contained in:
херетик
2023-03-01 06:54:21 +00:00
parent 35c8f1415f
commit 919d4187e6
12 changed files with 138 additions and 103 deletions

View File

@@ -3,9 +3,9 @@ package relay
import (
"sync"
"time"
"github.com/cybriq/qu"
"git-indra.lan/indra-labs/indra/pkg/crypto/nonce"
"git-indra.lan/indra-labs/indra/pkg/util/slice"
)
@@ -41,22 +41,30 @@ func (p *PendingResponses) GetOldestPending() (pr *PendingResponse) {
return
}
func (p *PendingResponses) Add(id nonce.ID, sentSize int, s Sessions,
billable []nonce.ID, ret nonce.ID, port uint16,
callback func(id nonce.ID, b slice.Bytes), postAcct []func()) {
type PendingResponseParams struct {
ID nonce.ID
SentSize int
S Sessions
Billable []nonce.ID
Ret nonce.ID
Port uint16
Callback func(id nonce.ID, b slice.Bytes)
PostAcct []func()
}
func (p *PendingResponses) Add(pr PendingResponseParams) {
p.Lock()
defer p.Unlock()
log.T.F("adding response hook %s", id)
log.T.F("adding response hook %s", pr.ID)
r := &PendingResponse{
ID: id,
SentSize: sentSize,
ID: pr.ID,
SentSize: pr.SentSize,
Time: time.Now(),
Billable: billable,
Return: ret,
Port: port,
PostAcct: postAcct,
Callback: callback,
Billable: pr.Billable,
Return: pr.Ret,
Port: pr.Port,
PostAcct: pr.PostAcct,
Callback: pr.Callback,
Success: qu.T(),
}
p.responses = append(p.responses, r)
@@ -84,9 +92,9 @@ func (p *PendingResponses) Find(id nonce.ID) (pr *PendingResponse) {
return
}
// Delete runs the callback and post accounting function list and deletes the
// ProcessAndDelete runs the callback and post accounting function list and deletes the
// pending response.
func (p *PendingResponses) Delete(id nonce.ID, b slice.Bytes) {
func (p *PendingResponses) ProcessAndDelete(id nonce.ID, b slice.Bytes) {
p.Lock()
defer p.Unlock()
log.T.F("deleting response %s", id)

View File

@@ -47,7 +47,7 @@ func (eng *Engine) balance(on *balance.Layer,
}
return false
})
eng.PendingResponses.Delete(pending.ID, nil)
eng.PendingResponses.ProcessAndDelete(pending.ID, nil)
if se != nil {
log.D.F("got %v, expected %v", se.Remaining, on.MilliSatoshi)
se.Remaining = on.MilliSatoshi
@@ -55,30 +55,36 @@ func (eng *Engine) balance(on *balance.Layer,
}
}
// GetBalance sends out a request in a similar way to SendExit except the node
type GetBalanceParams struct {
ID, ConfID nonce.ID
Client *Session
S Circuit
KS *signer.KeySet
}
// GetBalance sends out a request in a similar way to Exit except the node
// being queried can be any of the 5.
func GetBalance(id, confID nonce.ID, client *Session,
s Circuit, ks *signer.KeySet) Skins {
func GetBalance(p GetBalanceParams) Skins {
var prvs [3]*prv.Key
for i := range prvs {
prvs[i] = ks.Next()
prvs[i] = p.KS.Next()
}
n := GenNonces(6)
var retNonces [3]nonce.IV
copy(retNonces[:], n[3:])
var pubs [3]*pub.Key
pubs[0] = s[3].PayloadPub
pubs[1] = s[4].PayloadPub
pubs[2] = client.PayloadPub
pubs[0] = p.S[3].PayloadPub
pubs[1] = p.S[4].PayloadPub
pubs[2] = p.Client.PayloadPub
return Skins{}.
ReverseCrypt(s[0], ks.Next(), n[0], 3).
ReverseCrypt(s[1], ks.Next(), n[1], 2).
ReverseCrypt(s[2], ks.Next(), n[2], 1).
GetBalance(id, confID, prvs, pubs, retNonces).
ReverseCrypt(s[3], prvs[0], n[3], 0).
ReverseCrypt(s[4], prvs[1], n[4], 0).
ReverseCrypt(client, prvs[2], n[5], 0)
ReverseCrypt(p.S[0], p.KS.Next(), n[0], 3).
ReverseCrypt(p.S[1], p.KS.Next(), n[1], 2).
ReverseCrypt(p.S[2], p.KS.Next(), n[2], 1).
GetBalance(p.ID, p.ConfID, prvs, pubs, retNonces).
ReverseCrypt(p.S[3], prvs[0], n[3], 0).
ReverseCrypt(p.S[4], prvs[1], n[4], 0).
ReverseCrypt(p.Client, prvs[2], n[5], 0)
}
func (eng *Engine) SendGetBalance(target *Session, hook Callback) {
@@ -89,7 +95,7 @@ func (eng *Engine) SendGetBalance(target *Session, hook Callback) {
var c Circuit
copy(c[:], se)
confID := nonce.NewID()
o := GetBalance(target.ID, confID, se[5], c, eng.KeySet)
o := GetBalance(GetBalanceParams{target.ID, confID, se[5], c, eng.KeySet})
log.D.Ln("sending out getbalance onion")
res := eng.PostAcctOnion(o)
eng.SendWithOneHook(c[0].AddrPort, res, hook)

View File

@@ -1,8 +1,6 @@
package relay
import (
"time"
"github.com/cybriq/qu"
"go.uber.org/atomic"
@@ -20,8 +18,6 @@ var (
check = log.E.Chk
)
const DefaultTimeout = time.Second
type Engine struct {
*PendingResponses
*SessionManager
@@ -34,12 +30,18 @@ type Engine struct {
qu.C
}
func NewEngine(tpt types.Transport, idPrv *prv.Key, no *Node,
nodes []*Node, nReturnSessions int) (c *Engine, e error) {
no.Transport = tpt
no.IdentityPrv = idPrv
no.IdentityPub = pub.Derive(idPrv)
type EngineParams struct {
Tpt types.Transport
IDPrv *prv.Key
No *Node
Nodes []*Node
NReturnSessions int
}
func NewEngine(p EngineParams) (c *Engine, e error) {
p.No.Transport = p.Tpt
p.No.IdentityPrv = p.IDPrv
p.No.IdentityPub = pub.Derive(p.IDPrv)
var ks *signer.KeySet
if _, ks, e = signer.New(); check(e) {
return
@@ -53,11 +55,11 @@ func NewEngine(tpt types.Transport, idPrv *prv.Key, no *Node,
Pause: qu.T(),
C: qu.T(),
}
c.AddNodes(append([]*Node{no}, nodes...)...)
c.AddNodes(append([]*Node{p.No}, p.Nodes...)...)
// AddIntro a return session for receiving responses, ideally more of these will
// be generated during operation and rotated out over time.
for i := 0; i < nReturnSessions; i++ {
c.AddSession(NewSession(nonce.NewID(), no, 0, nil, nil, 5))
for i := 0; i < p.NReturnSessions; i++ {
c.AddSession(NewSession(nonce.NewID(), p.No, 0, nil, nil, 5))
}
return
}

View File

@@ -27,7 +27,7 @@ func TestClient_SendSessionKeys(t *testing.T) {
log2.SetLogLevel(log2.Trace)
var clients []*Engine
var e error
if clients, e = CreateNMockCircuits(false, 2, 2); check(e) {
if clients, e = CreateNMockCircuits(2, 2); check(e) {
t.Error(e)
t.FailNow()
}
@@ -77,7 +77,7 @@ func TestClient_SendExit(t *testing.T) {
log2.SetLogLevel(log2.Trace)
var clients []*Engine
var e error
if clients, e = CreateNMockCircuits(true, 2, 2); check(e) {
if clients, e = CreateNMockCircuitsWithSessions(2, 2); check(e) {
t.Error(e)
t.FailNow()
}
@@ -111,7 +111,7 @@ func TestClient_SendExit(t *testing.T) {
return
}
quit.Q()
t.Error("SendExit test failed")
t.Error("Exit test failed")
}()
out:
for i := 3; i < len(clients[0].Sessions)-1; i++ {
@@ -164,7 +164,7 @@ func TestClient_SendPing(t *testing.T) {
log2.SetLogLevel(log2.Debug)
var clients []*Engine
var e error
if clients, e = CreateNMockCircuits(true, 1, 2); check(e) {
if clients, e = CreateNMockCircuitsWithSessions(1, 2); check(e) {
t.Error(e)
t.FailNow()
}
@@ -211,7 +211,7 @@ func TestClient_SendGetBalance(t *testing.T) {
log2.SetLogLevel(log2.Trace)
var clients []*Engine
var e error
if clients, e = CreateNMockCircuits(true, 2, 2); check(e) {
if clients, e = CreateNMockCircuitsWithSessions(2, 2); check(e) {
t.Error(e)
t.FailNow()
}
@@ -256,7 +256,7 @@ func TestClient_HiddenService(t *testing.T) {
var clients []*Engine
var e error
const returns = 2
if clients, e = CreateNMockCircuits(false, 5, returns); check(e) {
if clients, e = CreateNMockCircuits(5, returns); check(e) {
t.Error(e)
t.FailNow()
}
@@ -317,7 +317,7 @@ func TestClient_HiddenServiceRequest(t *testing.T) {
var clients []*Engine
var e error
const returns = 2
if clients, e = CreateNMockCircuits(false, 10, returns); check(e) {
if clients, e = CreateNMockCircuits(10, returns); check(e) {
t.Error(e)
t.FailNow()
}

View File

@@ -15,7 +15,16 @@ import (
"git-indra.lan/indra-labs/indra/pkg/util/slice"
)
// SendExit constructs a message containing an arbitrary payload to a node (3rd
type ExitParams struct {
Port uint16
Payload slice.Bytes
ID nonce.ID
Client *Session
S Circuit
KS *signer.KeySet
}
// Exit constructs a message containing an arbitrary payload to a node (3rd
// hop) with a set of 3 ciphers derived from the hidden PayloadPub of the return
// hops that are layered progressively after the Exit message.
//
@@ -32,28 +41,27 @@ import (
// The header remains a constant size and each node in the Reverse trims off
// their section at the top, moves the next crypt header to the top and pads the
// remainder with noise, so it always looks like the first hop.
func SendExit(port uint16, payload slice.Bytes, id nonce.ID,
client *Session, s Circuit, ks *signer.KeySet) Skins {
func Exit(p ExitParams) Skins {
var prvs [3]*prv.Key
for i := range prvs {
prvs[i] = ks.Next()
prvs[i] = p.KS.Next()
}
n := GenNonces(6)
var returnNonces [3]nonce.IV
copy(returnNonces[:], n[3:])
var pubs [3]*pub.Key
pubs[0] = s[3].PayloadPub
pubs[1] = s[4].PayloadPub
pubs[2] = client.PayloadPub
pubs[0] = p.S[3].PayloadPub
pubs[1] = p.S[4].PayloadPub
pubs[2] = p.Client.PayloadPub
return Skins{}.
ReverseCrypt(s[0], ks.Next(), n[0], 3).
ReverseCrypt(s[1], ks.Next(), n[1], 2).
ReverseCrypt(s[2], ks.Next(), n[2], 1).
Exit(port, prvs, pubs, returnNonces, id, payload).
ReverseCrypt(s[3], prvs[0], n[3], 3).
ReverseCrypt(s[4], prvs[1], n[4], 2).
ReverseCrypt(client, prvs[2], n[5], 1)
ReverseCrypt(p.S[0], p.KS.Next(), n[0], 3).
ReverseCrypt(p.S[1], p.KS.Next(), n[1], 2).
ReverseCrypt(p.S[2], p.KS.Next(), n[2], 1).
Exit(p.Port, prvs, pubs, returnNonces, p.ID, p.Payload).
ReverseCrypt(p.S[3], prvs[0], n[3], 3).
ReverseCrypt(p.S[4], prvs[1], n[4], 2).
ReverseCrypt(p.Client, prvs[2], n[5], 1)
}
func (eng *Engine) exit(ex *exit.Layer, b slice.Bytes,
@@ -113,7 +121,7 @@ func (eng *Engine) SendExit(port uint16, msg slice.Bytes, id nonce.ID,
se := eng.SelectHops(hops, s)
var c Circuit
copy(c[:], se)
o := SendExit(port, msg, id, se[len(se)-1], c, eng.KeySet)
o := Exit(ExitParams{port, msg, id, se[len(se)-1], c, eng.KeySet})
log.D.Ln("sending out exit onion")
res := eng.PostAcctOnion(o)
eng.SendWithOneHook(c[0].AddrPort, res, hook)
@@ -127,7 +135,7 @@ func (eng *Engine) MakeExit(port uint16, msg slice.Bytes, id nonce.ID,
s[2] = exit
se := eng.SelectHops(hops, s)
copy(c[:], se)
o = SendExit(port, msg, id, se[len(se)-1], c, eng.KeySet)
o = Exit(ExitParams{port, msg, id, se[len(se)-1], c, eng.KeySet})
return
}

View File

@@ -21,7 +21,7 @@ func (eng *Engine) confirm(on *confirm.Layer,
// When a confirmation arrives check if it is registered for and run the
// hook that was registered with it.
eng.PendingResponses.Delete(on.ID, nil)
eng.PendingResponses.ProcessAndDelete(on.ID, nil)
}
func (eng *Engine) crypt(on *crypt.Layer, b slice.Bytes,
@@ -111,7 +111,7 @@ func (eng *Engine) response(on *response.Layer, b slice.Bytes,
eng.DecSession(s.ID, relayRate*dataSize, true, typ)
}
}
eng.PendingResponses.Delete(on.ID, on.Bytes)
eng.PendingResponses.ProcessAndDelete(on.ID, on.Bytes)
}
}

View File

@@ -45,8 +45,7 @@ const (
// available. The Node for a client's self should use true in the local
// parameter to not initialise the peer state ring buffers as it won't use them.
func NewNode(addr *netip.AddrPort, idPub *pub.Key, idPrv *prv.Key,
tpt types.Transport, relayRate int,
local bool) (n *Node, id nonce.ID) {
tpt types.Transport, relayRate int, local bool) (n *Node, id nonce.ID) {
id = nonce.NewID()
n = &Node{

View File

@@ -75,9 +75,7 @@ func (p PendingPayments) FindPreimage(pi sha256.Hash) (pp *Payment) {
// PendingPayment accessors. For the same reason as the sessions, pending
// payments need to be accessed only with the node's mutex locked.
func (sm *SessionManager) AddPendingPayment(
np *Payment) {
func (sm *SessionManager) AddPendingPayment(np *Payment) {
sm.Lock()
defer sm.Unlock()
log.D.F("%s adding pending payment %s for %v",
@@ -85,23 +83,17 @@ func (sm *SessionManager) AddPendingPayment(
np.Amount)
sm.PendingPayments = sm.PendingPayments.Add(np)
}
func (sm *SessionManager) DeletePendingPayment(
preimage sha256.Hash) {
func (sm *SessionManager) DeletePendingPayment(preimage sha256.Hash) {
sm.Lock()
defer sm.Unlock()
sm.PendingPayments = sm.PendingPayments.Delete(preimage)
}
func (sm *SessionManager) FindPendingPayment(
id nonce.ID) (pp *Payment) {
func (sm *SessionManager) FindPendingPayment(id nonce.ID) (pp *Payment) {
sm.Lock()
defer sm.Unlock()
return sm.PendingPayments.Find(id)
}
func (sm *SessionManager) FindPendingPreimage(
pi sha256.Hash) (pp *Payment) {
func (sm *SessionManager) FindPendingPreimage(pi sha256.Hash) (pp *Payment) {
log.T.F("searching preimage %x", pi)
sm.Lock()
defer sm.Unlock()

View File

@@ -117,8 +117,15 @@ func (eng *Engine) SendWithOneHook(ap *netip.AddrPort, res SendData,
log.D.Ln("nil response hook")
}
}
eng.PendingResponses.Add(res.last, len(res.b), res.sessions, res.billable,
res.ret, res.port, responseHook, res.postAcct)
eng.PendingResponses.Add(PendingResponseParams{
res.last,
len(res.b),
res.sessions,
res.billable,
res.ret,
res.port,
responseHook,
res.postAcct})
log.T.Ln("sending out onion")
eng.Send(ap, res.b)
}

View File

@@ -70,8 +70,7 @@ func NewSession(
// IncSats adds to the Remaining counter, used when new data allowance has been
// purchased.
func (s *Session) IncSats(sats lnwire.MilliSatoshi,
sender bool, typ string) {
func (s *Session) IncSats(sats lnwire.MilliSatoshi, sender bool, typ string) {
who := "relay"
if sender {
who = "client"
@@ -84,8 +83,9 @@ func (s *Session) IncSats(sats lnwire.MilliSatoshi,
// DecSats reduces the amount Remaining, if the requested amount would put
// the total below zero it returns false, signalling that new data allowance
// needs to be purchased before any further messages can be sent.
func (s *Session) DecSats(sats lnwire.MilliSatoshi,
sender bool, typ string) bool {
func (s *Session) DecSats(sats lnwire.MilliSatoshi, sender bool,
typ string) bool {
if s.Remaining < sats {
return false
}
@@ -141,7 +141,7 @@ func (eng *Engine) session(on *session.Layer, b slice.Bytes,
// different hop numbers to relays with existing sessions. Note that all 5 of
// the sessions will be paid the amount specified, not divided up.
func (eng *Engine) BuyNewSessions(amount lnwire.MilliSatoshi,
hook func()) (e error) {
fn func()) (e error) {
var nodes [5]*Node
nodes = eng.SessionManager.SelectUnusedCircuit()
@@ -219,7 +219,7 @@ func (eng *Engine) BuyNewSessions(amount lnwire.MilliSatoshi,
eng.Sessions = append(eng.Sessions, sessions[i])
eng.SessionManager.PendingPayments.Delete(s[i].PreimageHash())
}
hook()
fn()
})
return
}

View File

@@ -96,7 +96,7 @@ func (sm *SessionManager) AddSession(s *Session) {
sm.Sessions = append(sm.Sessions, s)
// log.D.S(s.ID, sm.Sessions)
// Hop 5, the return session( s) are not added to the SessionCache as they
// are not billable and are only related to the node of the Engine.
// are not Billable and are only related to the node of the Engine.
if s.Hop < 5 {
log.D.Ln("storing session", s)
sm.SessionCache = sm.SessionCache.Add(s)
@@ -158,9 +158,9 @@ func (sm *SessionManager) DeleteSession(id nonce.ID) {
defer sm.Unlock()
for i := range sm.Sessions {
if sm.Sessions[i].ID == id {
// Delete from Session cache.
// ProcessAndDelete from Session cache.
sm.SessionCache[sm.Sessions[i].Node.ID][sm.Sessions[i].Hop] = nil
// Delete from Sessions.
// ProcessAndDelete from Sessions.
sm.Sessions = append(sm.Sessions[:i], sm.Sessions[i+1:]...)
}
}
@@ -219,7 +219,7 @@ func (sm *SessionManager) DeleteNodeAndSessions(id nonce.ID) {
return
}
delete(sm.SessionCache, id)
// Delete from the nodes list.
// ProcessAndDelete from the nodes list.
for i := range sm.nodes {
if sm.nodes[i].ID == id {
sm.nodes = append(sm.nodes[:i], sm.nodes[i+1:]...)

View File

@@ -9,15 +9,16 @@ import (
"git-indra.lan/indra-labs/indra/pkg/util/slice"
)
func CreateNMockCircuits(inclSessions bool, nCircuits int, nReturnSessions int) (cl []*Engine, e error) {
func createNMockCircuits(inclSessions bool, nCircuits int,
nReturnSessions int) (cl []*Engine, e error) {
nTotal := 1 + nCircuits*5
cl = make([]*Engine, nTotal)
nodes := make([]*Node, nTotal)
transports := make([]types.Transport, nTotal)
tpts := make([]types.Transport, nTotal)
sessions := make(Sessions, nTotal-1)
for i := range transports {
transports[i] = transport.NewSim(nTotal)
for i := range tpts {
tpts[i] = transport.NewSim(nTotal)
}
for i := range nodes {
var idPrv *prv.Key
@@ -30,10 +31,14 @@ func CreateNMockCircuits(inclSessions bool, nCircuits int, nReturnSessions int)
if i == 0 {
local = true
}
nodes[i], _ = NewNode(addr, idPub, idPrv, transports[i], 50000,
local)
if cl[i], e = NewEngine(transports[i], idPrv, nodes[i], nil,
nReturnSessions); check(e) {
nodes[i], _ = NewNode(addr, idPub, idPrv, tpts[i], 50000, local)
if cl[i], e = NewEngine(EngineParams{
tpts[i],
idPrv,
nodes[i],
nil,
nReturnSessions},
); check(e) {
return
}
cl[i].SetLocalNodeAddress(nodes[i].AddrPort)
@@ -66,3 +71,11 @@ func CreateNMockCircuits(inclSessions bool, nCircuits int, nReturnSessions int)
}
return
}
func CreateNMockCircuits(nCirc int, nReturns int) (cl []*Engine, e error) {
return createNMockCircuits(false, nCirc, nReturns)
}
func CreateNMockCircuitsWithSessions(nCirc int, nReturns int) (cl []*Engine, e error) {
return createNMockCircuits(true, nCirc, nReturns)
}