send session keys test working for real

This commit is contained in:
херетик
2023-02-03 19:30:18 +00:00
parent e3b3aa06b2
commit 815cd2e5a1
11 changed files with 168 additions and 59 deletions

View File

@@ -20,7 +20,7 @@ import (
// have steadily increasing scores from successful pings.
func Ping(id nonce.ID, client *traffic.Session, s traffic.Circuit,
ks *signer.KeySet) Skins {
n := GenPingNonces()
return Skins{}.
ForwardCrypt(s[0], ks.Next(), n[0]).
@@ -51,7 +51,7 @@ func Ping(id nonce.ID, client *traffic.Session, s traffic.Circuit,
// remainder with noise, so it always looks like the first hop.
func SendExit(port uint16, payload slice.Bytes, id nonce.ID,
client *traffic.Session, s traffic.Circuit, ks *signer.KeySet) Skins {
var prvs [3]*prv.Key
for i := range prvs {
prvs[i] = ks.Next()
@@ -97,11 +97,11 @@ func SendExit(port uint16, payload slice.Bytes, id nonce.ID,
// the HeaderPub instead. Not allowing free relay at all prevents spam attacks.
func SendKeys(id nonce.ID, s [5]*session.Layer,
client *traffic.Session, hop []*traffic.Node, ks *signer.KeySet) Skins {
n := GenNonces(6)
sk := Skins{}
for i := range s {
sk.ForwardSession(hop[i], ks.Next(), n[i], s[i])
sk = sk.ForwardSession(hop[i], ks.Next(), n[i], s[i])
}
return sk.
ForwardCrypt(client, ks.Next(), n[5]).
@@ -121,7 +121,7 @@ func SendKeys(id nonce.ID, s [5]*session.Layer,
// addressed to the Header key.
func GetBalance(s traffic.Circuit, target int, returns [3]*traffic.Session,
ks *signer.KeySet, id nonce.ID) (o Skins) {
if target == 0 || target == 4 {
n := GenNonces(2)
o = o.ForwardCrypt(s[target], ks.Next(), n[0]).

View File

@@ -27,6 +27,7 @@ type Engine struct {
*traffic.SessionManager
*signer.KeySet
Load byte
Pause qu.C
ShuttingDown atomic.Bool
qu.C
}
@@ -45,6 +46,7 @@ func NewEngine(tpt types.Transport, hdrPrv *prv.Key, no *traffic.Node,
PendingResponses: &PendingResponses{},
KeySet: ks,
SessionManager: traffic.NewSessionManager(),
Pause: qu.T(),
C: qu.T(),
}
c.AddNodes(append([]*traffic.Node{no}, nodes...)...)
@@ -78,6 +80,7 @@ func (eng *Engine) Shutdown() {
log.T.C(func() string {
return "shutting down client " + eng.GetLocalNodeAddress().String()
})
eng.Cleanup()
eng.ShuttingDown.Store(true)
eng.C.Q()
}

View File

@@ -5,10 +5,10 @@ import (
"sync"
"testing"
"time"
"github.com/cybriq/qu"
"go.uber.org/atomic"
"git-indra.lan/indra-labs/indra/pkg/crypto/nonce"
"git-indra.lan/indra-labs/indra/pkg/crypto/sha256"
log2 "git-indra.lan/indra-labs/indra/pkg/proc/log"
@@ -43,7 +43,7 @@ func TestClient_SendSessionKeys(t *testing.T) {
for i := 0; i < int(counter.Load()); i++ {
wg.Done()
}
t.Error("SendExit test failed")
t.Error("SendSessionKeys test failed")
os.Exit(1)
}()
wg.Add(1)
@@ -54,17 +54,16 @@ func TestClient_SendSessionKeys(t *testing.T) {
}
})
wg.Wait()
time.Sleep(time.Second)
for _, v := range clients {
v.Shutdown()
}
}
func TestClient_SendPing(t *testing.T) {
log2.SetLogLevel(log2.Debug)
func TestClient_ExitTxFailureDiagnostics(t *testing.T) {
log2.SetLogLevel(log2.Trace)
var clients []*Engine
var e error
if clients, e = CreateNMockCircuits(true, 2); check(e) {
if clients, e = CreateNMockCircuits(false, 1); check(e) {
t.Error(e)
t.FailNow()
}
@@ -72,43 +71,72 @@ func TestClient_SendPing(t *testing.T) {
for _, v := range clients {
go v.Start()
}
quit := qu.T()
var wg sync.WaitGroup
var counter atomic.Int32
go func() {
select {
case <-time.After(time.Second):
case <-quit:
return
case <-time.After(time.Second * 2):
}
quit.Q()
t.Error("SendPing test failed")
for i := 0; i < int(counter.Load()); i++ {
wg.Done()
}
t.Error("TxFailureDiagnostics test failed")
os.Exit(1)
}()
out:
for i := 1; i < len(clients[0].Sessions)-1; i++ {
wg.Add(1)
var c traffic.Circuit
sess := clients[0].Sessions[i]
c[sess.Hop] = clients[0].Sessions[i]
clients[0].SendPing(c,
func(id nonce.ID, b slice.Bytes) {
log.I.Ln("success")
wg.Done()
})
select {
case <-quit:
break out
default:
}
wg.Wait()
}
quit.Q()
wg.Add(1)
counter.Inc()
clients[0].BuyNewSessions(1000000, func() {
wg.Done()
counter.Dec()
log.D.Ln("session buy done")
})
wg.Wait()
log.D.Ln("starting fail test")
const port = 3455
// // Now we will disable each of the nodes one by one and run a discovery
// // process to find the "failed" node.
// for _, v := range clients[1:] {
// // Pause the node (also clearing its channels and caches).
// v.Pause.Signal()
// // Try to send out an Exit message.
// var msg slice.Bytes
// if msg, _, e = tests.GenMessage(64, "request"); check(e) {
// t.Error(e)
// t.FailNow()
// }
// var respMsg slice.Bytes
// var respHash sha256.Hash
// if respMsg, respHash, e = tests.GenMessage(32, "response"); check(e) {
// t.Error(e)
// t.FailNow()
// }
// _ = respHash
// sess := clients[0].Sessions[1]
// var c traffic.Circuit
// c[sess.Hop] = clients[0].Sessions[1]
// id := nonce.NewID()
// log.D.Ln("sending out onion that will fail")
// clients[0].SendExit(port, msg, id, clients[0].Sessions[1],
// func(idd nonce.ID, b slice.Bytes) {
// log.D.Ln("this shouldn't print!")
// })
// log.D.Ln("listening for message")
// bb := <-clients[3].ReceiveToLocalNode(port)
// log.T.S(bb.ToBytes())
// if e = clients[3].SendFromLocalNode(port, respMsg); check(e) {
// t.Error("fail send")
// }
// log.T.Ln("response sent")
// // Resume the node.
// v.Pause.Signal()
// }
for _, v := range clients {
v.Shutdown()
}
}
func TestClient_SendExit(t *testing.T) {
log2.SetLogLevel(log2.Debug)
log2.SetLogLevel(log2.Trace)
var clients []*Engine
var e error
if clients, e = CreateNMockCircuits(true, 2); check(e) {
@@ -191,6 +219,53 @@ out:
}
}
func TestClient_SendPing(t *testing.T) {
log2.SetLogLevel(log2.Debug)
var clients []*Engine
var e error
if clients, e = CreateNMockCircuits(true, 2); check(e) {
t.Error(e)
t.FailNow()
}
// Start up the clients.
for _, v := range clients {
go v.Start()
}
quit := qu.T()
var wg sync.WaitGroup
go func() {
select {
case <-time.After(time.Second):
case <-quit:
return
}
quit.Q()
t.Error("SendPing test failed")
}()
out:
for i := 1; i < len(clients[0].Sessions)-1; i++ {
wg.Add(1)
var c traffic.Circuit
sess := clients[0].Sessions[i]
c[sess.Hop] = clients[0].Sessions[i]
clients[0].SendPing(c,
func(id nonce.ID, b slice.Bytes) {
log.I.Ln("success")
wg.Done()
})
select {
case <-quit:
break out
default:
}
wg.Wait()
}
quit.Q()
for _, v := range clients {
v.Shutdown()
}
}
func TestClient_SendGetBalance(t *testing.T) {
log2.SetLogLevel(log2.Debug)
var clients []*Engine

View File

@@ -2,7 +2,7 @@ package relay
import (
"git-indra.lan/indra-labs/lnd/lnd/lnwire"
"git-indra.lan/indra-labs/indra/pkg/crypto/key/pub"
"git-indra.lan/indra-labs/indra/pkg/crypto/nonce"
"git-indra.lan/indra-labs/indra/pkg/onion"
@@ -18,7 +18,7 @@ import (
func (eng *Engine) crypt(on *crypt.Layer, b slice.Bytes,
c *slice.Cursor, prev types.Onion) {
// this is probably an encrypted crypt for us.
hdr, _, sess, identity := eng.FindCloaked(on.Cloak)
if hdr == nil {
@@ -28,13 +28,12 @@ func (eng *Engine) crypt(on *crypt.Layer, b slice.Bytes,
on.ToPriv = hdr
on.Decrypt(hdr, b, c)
if identity {
log.T.F("identity")
if string(b[*c:][:magicbytes.Len]) != session.MagicString {
log.T.Ln("dropping message due to identity key with" +
" no following session")
return
}
eng.handleMessage(BudgeUp(b, *c), on)
return
}

View File

@@ -3,9 +3,9 @@ package relay
import (
"fmt"
"reflect"
"github.com/davecgh/go-spew/spew"
"git-indra.lan/indra-labs/indra/pkg/onion"
"git-indra.lan/indra-labs/indra/pkg/onion/layers/balance"
"git-indra.lan/indra-labs/indra/pkg/onion/layers/confirm"
@@ -59,12 +59,22 @@ func (eng *Engine) handler() (out bool) {
// Later this will wait with a timeout on the lnd node returning the
// success to trigger this.
p.ConfirmChan <- true
case <-eng.Pause:
log.D.Ln("pausing", eng.GetLocalNodeAddress())
// For testing purposes we need to halt this handler and clear
// all the channels after the resume signal.
select {
case <-eng.C.Wait():
break
case <-eng.Pause:
// This will then resume to the top level select.
}
}
return
}
func (eng *Engine) handleMessage(b slice.Bytes, prev types.Onion) {
log.T.Ln("process received message")
log.T.F("%v handling received message", eng.GetLocalNodeAddress())
var on types.Onion
var e error
c := slice.NewCursor()

View File

@@ -2,7 +2,7 @@ package relay
import (
"git-indra.lan/indra-labs/lnd/lnd/lnwire"
"git-indra.lan/indra-labs/indra/pkg/crypto/nonce"
"git-indra.lan/indra-labs/indra/pkg/onion"
"git-indra.lan/indra-labs/indra/pkg/onion/layers/session"
@@ -16,6 +16,8 @@ import (
// the sessions will be paid the amount specified, not divided up.
func (eng *Engine) BuyNewSessions(amount lnwire.MilliSatoshi,
hook func()) {
log.D.Ln("buying new sessions")
var nodes [5]*traffic.Node
nodes = eng.SessionManager.SelectUnusedCircuit(nodes)
// Get a random return hop session (index 5).
@@ -51,19 +53,19 @@ func (eng *Engine) BuyNewSessions(amount lnwire.MilliSatoshi,
if success {
pendingConfirms--
} else {
}
case success = <-confirmChans[1]:
if success {
pendingConfirms--
} else {
}
case success = <-confirmChans[2]:
if success {
pendingConfirms--
} else {
}
case success = <-confirmChans[3]:
if success {
@@ -73,7 +75,7 @@ func (eng *Engine) BuyNewSessions(amount lnwire.MilliSatoshi,
if success {
pendingConfirms--
} else {
}
}
}
@@ -86,6 +88,7 @@ func (eng *Engine) BuyNewSessions(amount lnwire.MilliSatoshi,
sessions[i] = traffic.NewSession(s[i].ID, nodes[i], amount,
s[i].Header, s[i].Payload, byte(i))
eng.SessionManager.Add(sessions[i])
eng.SessionManager.AddSession(sessions[i])
eng.SessionManager.DeletePendingPayment(s[i].PreimageHash())
}
hook()

View File

@@ -2,7 +2,7 @@ package relay
import (
"net/netip"
"git-indra.lan/indra-labs/indra/pkg/traffic"
"git-indra.lan/indra-labs/indra/pkg/util/slice"
)
@@ -12,15 +12,20 @@ func (eng *Engine) Send(addr *netip.AddrPort, b slice.Bytes) {
// first search if we already have the node available with connection
// open.
as := addr.String()
failed := false
eng.ForEachNode(func(n *traffic.Node) bool {
if as == n.AddrPort.String() {
n.Transport.Send(b)
log.D.F("sending message to %v", addr)
return true
}
failed = true
return false
})
// If we got to here none of the addresses matched, and we need to
// establish a new peer connection to them, if we know of them (this
// would usually be the reason this happens).
log.T.Ln("could not find peer with address", addr.String())
if failed {
log.T.Ln("could not find peer with address", addr.String())
}
}

View File

@@ -9,7 +9,7 @@ import (
func (eng *Engine) SendExit(port uint16, message slice.Bytes, id nonce.ID,
target *traffic.Session, hook func(id nonce.ID, b slice.Bytes)) {
hops := []byte{0, 1, 2, 3, 4, 5}
s := make(traffic.Sessions, len(hops))
s[2] = target
@@ -17,5 +17,6 @@ func (eng *Engine) SendExit(port uint16, message slice.Bytes, id nonce.ID,
var c traffic.Circuit
copy(c[:], se)
o := onion.SendExit(port, message, id, se[len(se)-1], c, eng.KeySet)
log.D.Ln("sending out exit onion")
eng.SendOnion(c[0].AddrPort, o, hook)
}

View File

@@ -2,9 +2,9 @@ package relay
import (
"net/netip"
"git-indra.lan/indra-labs/lnd/lnd/lnwire"
"git-indra.lan/indra-labs/indra/pkg/crypto/nonce"
"git-indra.lan/indra-labs/indra/pkg/onion"
"git-indra.lan/indra-labs/indra/pkg/onion/layers/balance"
@@ -20,7 +20,7 @@ import (
func (eng *Engine) SendOnion(ap *netip.AddrPort, o onion.Skins,
responseHook func(id nonce.ID, b slice.Bytes)) {
b := onion.Encode(o.Assemble())
var billable, accounted []nonce.ID
var ret nonce.ID
@@ -100,5 +100,5 @@ func (eng *Engine) SendOnion(ap *netip.AddrPort, o onion.Skins,
eng.PendingResponses.Add(last, len(b), billable, accounted, ret, port, responseHook)
log.T.Ln("sending out onion")
eng.Send(ap, b)
}

View File

@@ -6,7 +6,7 @@ import (
func (sm *SessionManager) SelectHops(hops []byte,
alreadyHave Sessions) (so Sessions) {
sm.Lock()
defer sm.Unlock()
ws := make(Sessions, 0)
@@ -87,7 +87,6 @@ func (sm *SessionManager) SelectUnusedCircuit(nodes [5]*Node) (c [5]*Node) {
continue
}
if _, ok := sm.SessionCache[nodeList[j].ID]; !ok {
log.D.Ln("adding node")
c[i] = nodeList[j]
// nil the entry so it isn't selected again
nodeList[j] = nil

View File

@@ -36,6 +36,20 @@ func NewSessionManager() *SessionManager {
}
}
// ClearPendingPayments is used only for debugging, removing all pending
// payments, making the engine forget about payments it received.
func (sm *SessionManager) ClearPendingPayments() {
log.D.Ln("clearing pending payments")
sm.pendingPayments = sm.pendingPayments[:0]
}
// ClearSessions is used only for debugging, removing all but the first session,
// which is the engine's initial return session.
func (sm *SessionManager) ClearSessions() {
log.D.Ln("clearing sessions")
sm.Sessions = sm.Sessions[:1]
}
func (sm *SessionManager) IncSession(id nonce.ID, sats lnwire.MilliSatoshi,
sender bool, typ string) {