From e58884a8e3800890e876b92cc92741819e4dd153 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D1=85=D0=B5=D1=80=D0=B5=D1=82=D0=B8=D0=BA?= Date: Mon, 27 Feb 2023 18:54:20 +0000 Subject: [PATCH] broadcast intro and gossiping --- pkg/relay/engine_test.go | 60 ++++++++++++++++++++++++++++- pkg/relay/introductions.go | 77 +++++++++++++++++++++++++++++--------- pkg/relay/nodes.go | 3 ++ 3 files changed, 121 insertions(+), 19 deletions(-) diff --git a/pkg/relay/engine_test.go b/pkg/relay/engine_test.go index aef8962d..07e8991e 100644 --- a/pkg/relay/engine_test.go +++ b/pkg/relay/engine_test.go @@ -308,7 +308,7 @@ func TestClient_HiddenServiceBroadcast(t *testing.T) { var clients []*Engine var e error const returns = 2 - if clients, e = CreateNMockCircuits(false, 5, returns); check(e) { + if clients, e = CreateNMockCircuits(false, 10, returns); check(e) { t.Error(e) t.FailNow() } @@ -349,7 +349,63 @@ func TestClient_HiddenServiceBroadcast(t *testing.T) { il, func(id nonce.ID, b slice.Bytes) { log.I.Ln("success") }) - time.Sleep(time.Second * 5) + time.Sleep(time.Second * 60) + for _, v := range clients { + v.Shutdown() + } +} + +func TestClient_HiddenServiceRequest(t *testing.T) { + log2.SetLogLevel(log2.Info) + var clients []*Engine + var e error + const returns = 2 + if clients, e = CreateNMockCircuits(false, 10, returns); check(e) { + t.Error(e) + t.FailNow() + } + // Start up the clients. + for _, v := range clients { + go v.Start() + } + // Fund the client for all hops on all nodes. + var wg sync.WaitGroup + var counter atomic.Int32 + for i := 0; i < 25; i++ { + log.D.Ln("buying sessions", i) + wg.Add(1) + counter.Inc() + e = clients[0].BuyNewSessions(1000000, func() { + wg.Done() + counter.Dec() + }) + if check(e) { + wg.Done() + counter.Dec() + } + wg.Wait() + for j := range clients[0].SessionCache { + log.D.F("%d %s %v", i, j, clients[0].SessionCache[j]) + } + } + var identPrv *prv.Key + if identPrv, e = prv.GenerateKey(); check(e) { + t.Error(e) + t.FailNow() + } + log2.SetLogLevel(log2.Trace) + // identPub := pub.Derive(identPrv) + id := nonce.NewID() + il := intro.New(identPrv, clients[0].GetLocalNodeAddress()) + clients[0].SendIntro(id, clients[0].Sessions[returns], + il, func(id nonce.ID, b slice.Bytes) { + log.I.Ln("success") + }) + // In this test environment generally every node has the intro after 1 + // second. + time.Sleep(time.Second) + // Now to test nodes requesting the address (even though they already know + // it). for _, v := range clients { v.Shutdown() } diff --git a/pkg/relay/introductions.go b/pkg/relay/introductions.go index db5beef6..0f1869d9 100644 --- a/pkg/relay/introductions.go +++ b/pkg/relay/introductions.go @@ -2,7 +2,6 @@ package relay import ( "sync" - "time" "github.com/cybriq/qu" @@ -18,6 +17,8 @@ type Intros map[pub.Bytes]slice.Bytes type NotifiedIntroducers map[pub.Bytes][]nonce.ID +type KnownIntros map[pub.Bytes]*intro.Layer + // Introductions is a map of existing known hidden service keys and the // routing header for requesting a new one on behalf of the client. // @@ -29,14 +30,25 @@ type Introductions struct { sync.Mutex Intros NotifiedIntroducers + KnownIntros } func NewIntroductions() *Introductions { return &Introductions{Intros: make(Intros), - NotifiedIntroducers: make(NotifiedIntroducers)} + NotifiedIntroducers: make(NotifiedIntroducers), + KnownIntros: make(KnownIntros)} } func (in *Introductions) Find(key pub.Bytes) (header slice.Bytes) { + in.Lock() + var ok bool + if header, ok = in.Intros[key]; ok { + } + in.Unlock() + return +} + +func (in *Introductions) Delete(key pub.Bytes) (header slice.Bytes) { in.Lock() var ok bool if header, ok = in.Intros[key]; ok { @@ -87,15 +99,6 @@ func (eng *Engine) SendIntro(id nonce.ID, target *Session, intr *intro.Layer, eng.SendWithOneHook(c[0].AddrPort, res, hook) } -func (eng *Engine) intro(intr *intro.Layer, b slice.Bytes, - c *slice.Cursor, prev types.Onion) { - - if intr.Validate() { - log.D.F("sending out intro to %s at %s to all known peers", - intr.Key.ToBase32(), intr.AddrPort.String()) - } -} - func (eng *Engine) introductionBroadcaster(intr *intro.Layer) { log.D.F("propagating hidden service introduction for %x", intr.Key.ToBytes()) done := qu.T() @@ -104,7 +107,7 @@ func (eng *Engine) introductionBroadcaster(intr *intro.Layer) { intr.Encode(msg, c) nPeers := eng.NodesLen() peerIndices := make([]int, nPeers) - for i := 0; i < nPeers; i++ { + for i := 1; i < nPeers; i++ { peerIndices[i] = i } cryptorand.Shuffle(nPeers, func(i, j int) { @@ -113,7 +116,6 @@ func (eng *Engine) introductionBroadcaster(intr *intro.Layer) { // Since relays will also gossip this information, we will start a ticker // that sends out the hidden service introduction once a second until it // runs out of known relays to gossip to. - ticker := time.NewTicker(time.Second) var cursor int for { select { @@ -121,10 +123,51 @@ func (eng *Engine) introductionBroadcaster(intr *intro.Layer) { return case <-done: return - case <-ticker.C: - n := eng.FindNodeByIndex(peerIndices[cursor]) - n.Transport.Send(msg) - cursor++ + default: + } + n := eng.FindNodeByIndex(peerIndices[cursor]) + n.Transport.Send(msg) + cursor++ + if cursor > len(peerIndices)-1 { + break } } + log.T.Ln("finished broadcasting intro") +} + +func (eng *Engine) intro(intr *intro.Layer, b slice.Bytes, + c *slice.Cursor, prev types.Onion) { + + eng.Introductions.Lock() + if intr.Validate() { + if _, ok := eng.Introductions.KnownIntros[intr.Key.ToBytes()]; ok { + log.T.Ln("received intro we already know about") + eng.Introductions.Unlock() + return + } + log.T.F("storing intro for %s", intr.Key.ToBase32()) + eng.Introductions.KnownIntros[intr.Key.ToBytes()] = intr + log.D.F("%s sending out intro to %s at %s to all known peers", + eng.GetLocalNodeAddress(), intr.Key.ToBase32(), + intr.AddrPort.String()) + sender := eng.SessionManager.FindNodeByAddrPort(intr.AddrPort) + nodes := make(map[nonce.ID]*Node) + eng.SessionManager.ForEachNode(func(n *Node) bool { + if n.ID != sender.ID { + nodes[n.ID] = n + } + return false + }) + counter := 0 + for i := range nodes { + log.T.F("sending intro to %s", nodes[i].AddrPort.String()) + nodes[i].Transport.Send(b) + counter++ + if counter < 2 { + continue + } + break + } + eng.Introductions.Unlock() + } } diff --git a/pkg/relay/nodes.go b/pkg/relay/nodes.go index 2da16516..2aa4c64c 100644 --- a/pkg/relay/nodes.go +++ b/pkg/relay/nodes.go @@ -166,6 +166,9 @@ func (sm *SessionManager) ForEachNode(fn func(n *Node) bool) { sm.Lock() defer sm.Unlock() for i := range sm.nodes { + if i == 0 { + continue + } if fn(sm.nodes[i]) { return }