broadcast intro and gossiping

This commit is contained in:
херетик
2023-02-27 18:54:20 +00:00
parent 85e04caa07
commit e58884a8e3
3 changed files with 121 additions and 19 deletions

View File

@@ -308,7 +308,7 @@ func TestClient_HiddenServiceBroadcast(t *testing.T) {
var clients []*Engine
var e error
const returns = 2
if clients, e = CreateNMockCircuits(false, 5, returns); check(e) {
if clients, e = CreateNMockCircuits(false, 10, returns); check(e) {
t.Error(e)
t.FailNow()
}
@@ -349,7 +349,63 @@ func TestClient_HiddenServiceBroadcast(t *testing.T) {
il, func(id nonce.ID, b slice.Bytes) {
log.I.Ln("success")
})
time.Sleep(time.Second * 5)
time.Sleep(time.Second * 60)
for _, v := range clients {
v.Shutdown()
}
}
func TestClient_HiddenServiceRequest(t *testing.T) {
log2.SetLogLevel(log2.Info)
var clients []*Engine
var e error
const returns = 2
if clients, e = CreateNMockCircuits(false, 10, returns); check(e) {
t.Error(e)
t.FailNow()
}
// Start up the clients.
for _, v := range clients {
go v.Start()
}
// Fund the client for all hops on all nodes.
var wg sync.WaitGroup
var counter atomic.Int32
for i := 0; i < 25; i++ {
log.D.Ln("buying sessions", i)
wg.Add(1)
counter.Inc()
e = clients[0].BuyNewSessions(1000000, func() {
wg.Done()
counter.Dec()
})
if check(e) {
wg.Done()
counter.Dec()
}
wg.Wait()
for j := range clients[0].SessionCache {
log.D.F("%d %s %v", i, j, clients[0].SessionCache[j])
}
}
var identPrv *prv.Key
if identPrv, e = prv.GenerateKey(); check(e) {
t.Error(e)
t.FailNow()
}
log2.SetLogLevel(log2.Trace)
// identPub := pub.Derive(identPrv)
id := nonce.NewID()
il := intro.New(identPrv, clients[0].GetLocalNodeAddress())
clients[0].SendIntro(id, clients[0].Sessions[returns],
il, func(id nonce.ID, b slice.Bytes) {
log.I.Ln("success")
})
// In this test environment generally every node has the intro after 1
// second.
time.Sleep(time.Second)
// Now to test nodes requesting the address (even though they already know
// it).
for _, v := range clients {
v.Shutdown()
}

View File

@@ -2,7 +2,6 @@ package relay
import (
"sync"
"time"
"github.com/cybriq/qu"
@@ -18,6 +17,8 @@ type Intros map[pub.Bytes]slice.Bytes
type NotifiedIntroducers map[pub.Bytes][]nonce.ID
type KnownIntros map[pub.Bytes]*intro.Layer
// Introductions is a map of existing known hidden service keys and the
// routing header for requesting a new one on behalf of the client.
//
@@ -29,14 +30,25 @@ type Introductions struct {
sync.Mutex
Intros
NotifiedIntroducers
KnownIntros
}
func NewIntroductions() *Introductions {
return &Introductions{Intros: make(Intros),
NotifiedIntroducers: make(NotifiedIntroducers)}
NotifiedIntroducers: make(NotifiedIntroducers),
KnownIntros: make(KnownIntros)}
}
func (in *Introductions) Find(key pub.Bytes) (header slice.Bytes) {
in.Lock()
var ok bool
if header, ok = in.Intros[key]; ok {
}
in.Unlock()
return
}
func (in *Introductions) Delete(key pub.Bytes) (header slice.Bytes) {
in.Lock()
var ok bool
if header, ok = in.Intros[key]; ok {
@@ -87,15 +99,6 @@ func (eng *Engine) SendIntro(id nonce.ID, target *Session, intr *intro.Layer,
eng.SendWithOneHook(c[0].AddrPort, res, hook)
}
func (eng *Engine) intro(intr *intro.Layer, b slice.Bytes,
c *slice.Cursor, prev types.Onion) {
if intr.Validate() {
log.D.F("sending out intro to %s at %s to all known peers",
intr.Key.ToBase32(), intr.AddrPort.String())
}
}
func (eng *Engine) introductionBroadcaster(intr *intro.Layer) {
log.D.F("propagating hidden service introduction for %x", intr.Key.ToBytes())
done := qu.T()
@@ -104,7 +107,7 @@ func (eng *Engine) introductionBroadcaster(intr *intro.Layer) {
intr.Encode(msg, c)
nPeers := eng.NodesLen()
peerIndices := make([]int, nPeers)
for i := 0; i < nPeers; i++ {
for i := 1; i < nPeers; i++ {
peerIndices[i] = i
}
cryptorand.Shuffle(nPeers, func(i, j int) {
@@ -113,7 +116,6 @@ func (eng *Engine) introductionBroadcaster(intr *intro.Layer) {
// Since relays will also gossip this information, we will start a ticker
// that sends out the hidden service introduction once a second until it
// runs out of known relays to gossip to.
ticker := time.NewTicker(time.Second)
var cursor int
for {
select {
@@ -121,10 +123,51 @@ func (eng *Engine) introductionBroadcaster(intr *intro.Layer) {
return
case <-done:
return
case <-ticker.C:
n := eng.FindNodeByIndex(peerIndices[cursor])
n.Transport.Send(msg)
cursor++
default:
}
n := eng.FindNodeByIndex(peerIndices[cursor])
n.Transport.Send(msg)
cursor++
if cursor > len(peerIndices)-1 {
break
}
}
log.T.Ln("finished broadcasting intro")
}
func (eng *Engine) intro(intr *intro.Layer, b slice.Bytes,
c *slice.Cursor, prev types.Onion) {
eng.Introductions.Lock()
if intr.Validate() {
if _, ok := eng.Introductions.KnownIntros[intr.Key.ToBytes()]; ok {
log.T.Ln("received intro we already know about")
eng.Introductions.Unlock()
return
}
log.T.F("storing intro for %s", intr.Key.ToBase32())
eng.Introductions.KnownIntros[intr.Key.ToBytes()] = intr
log.D.F("%s sending out intro to %s at %s to all known peers",
eng.GetLocalNodeAddress(), intr.Key.ToBase32(),
intr.AddrPort.String())
sender := eng.SessionManager.FindNodeByAddrPort(intr.AddrPort)
nodes := make(map[nonce.ID]*Node)
eng.SessionManager.ForEachNode(func(n *Node) bool {
if n.ID != sender.ID {
nodes[n.ID] = n
}
return false
})
counter := 0
for i := range nodes {
log.T.F("sending intro to %s", nodes[i].AddrPort.String())
nodes[i].Transport.Send(b)
counter++
if counter < 2 {
continue
}
break
}
eng.Introductions.Unlock()
}
}

View File

@@ -166,6 +166,9 @@ func (sm *SessionManager) ForEachNode(fn func(n *Node) bool) {
sm.Lock()
defer sm.Unlock()
for i := range sm.nodes {
if i == 0 {
continue
}
if fn(sm.nodes[i]) {
return
}