From a3b6562b038687797c4d88892055f1e1efbdd89d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D1=85=D0=B5=D1=80=D0=B5=D1=82=D0=B8=D0=BA?= Date: Mon, 27 Feb 2023 11:03:08 +0000 Subject: [PATCH] merged sending stuff together --- pkg/relay/hiddenservice.go | 43 +---------------- pkg/relay/introductions.go | 37 ++++++++++++++ pkg/relay/postacct.go | 99 -------------------------------------- pkg/relay/send.go | 93 +++++++++++++++++++++++++++++++++++ 4 files changed, 131 insertions(+), 141 deletions(-) delete mode 100644 pkg/relay/postacct.go diff --git a/pkg/relay/hiddenservice.go b/pkg/relay/hiddenservice.go index 7696938d..2540b8d3 100644 --- a/pkg/relay/hiddenservice.go +++ b/pkg/relay/hiddenservice.go @@ -1,10 +1,6 @@ package relay import ( - "time" - - "github.com/cybriq/qu" - "git-indra.lan/indra-labs/indra/pkg/crypto/key/prv" "git-indra.lan/indra-labs/indra/pkg/crypto/key/pub" "git-indra.lan/indra-labs/indra/pkg/crypto/key/signer" @@ -12,48 +8,11 @@ import ( "git-indra.lan/indra-labs/indra/pkg/messages/hiddenservice" "git-indra.lan/indra-labs/indra/pkg/messages/intro" "git-indra.lan/indra-labs/indra/pkg/types" - "git-indra.lan/indra-labs/indra/pkg/util/cryptorand" "git-indra.lan/indra-labs/indra/pkg/util/slice" ) type Referrers map[pub.Bytes][]pub.Bytes -func (eng *Engine) hiddenserviceBroadcaster(hs *intro.Layer) { - log.D.F("propagating hidden service introduction for %x", hs.Key.ToBytes()) - done := qu.T() - intr := &intro.Layer{ - Key: hs.Key, AddrPort: hs.AddrPort, Bytes: hs.Bytes, - } - msg := make(slice.Bytes, intro.Len) - c := slice.NewCursor() - intr.Encode(msg, c) - nPeers := eng.NodesLen() - peerIndices := make([]int, nPeers) - for i := 0; i < nPeers; i++ { - peerIndices[i] = i - } - cryptorand.Shuffle(nPeers, func(i, j int) { - peerIndices[i], peerIndices[j] = peerIndices[j], peerIndices[i] - }) - // Since relays will also gossip this information, we will start a ticker - // that sends out the hidden service introduction once a second until it - // runs out of known relays to gossip to. - ticker := time.NewTicker(time.Second) - var cursor int - for { - select { - case <-eng.C.Wait(): - return - case <-done: - return - case <-ticker.C: - n := eng.FindNodeByIndex(peerIndices[cursor]) - n.Transport.Send(msg) - cursor++ - } - } -} - func HiddenService(id nonce.ID, il *intro.Layer, client *Session, s Circuit, ks *signer.KeySet) Skins { @@ -85,5 +44,5 @@ func (eng *Engine) hiddenservice(hs *hiddenservice.Layer, b slice.Bytes, hs.Layer.Key.ToBase32()) eng.Introductions.AddIntro(hs.Layer.Key, b[*c:]) log.I.Ln("stored new introduction, starting broadcast") - go eng.hiddenserviceBroadcaster(&hs.Layer) + go eng.introductionBroadcaster(&hs.Layer) } diff --git a/pkg/relay/introductions.go b/pkg/relay/introductions.go index fb317ab8..db5beef6 100644 --- a/pkg/relay/introductions.go +++ b/pkg/relay/introductions.go @@ -2,11 +2,15 @@ package relay import ( "sync" + "time" + + "github.com/cybriq/qu" "git-indra.lan/indra-labs/indra/pkg/crypto/key/pub" "git-indra.lan/indra-labs/indra/pkg/crypto/nonce" "git-indra.lan/indra-labs/indra/pkg/messages/intro" "git-indra.lan/indra-labs/indra/pkg/types" + "git-indra.lan/indra-labs/indra/pkg/util/cryptorand" "git-indra.lan/indra-labs/indra/pkg/util/slice" ) @@ -91,3 +95,36 @@ func (eng *Engine) intro(intr *intro.Layer, b slice.Bytes, intr.Key.ToBase32(), intr.AddrPort.String()) } } + +func (eng *Engine) introductionBroadcaster(intr *intro.Layer) { + log.D.F("propagating hidden service introduction for %x", intr.Key.ToBytes()) + done := qu.T() + msg := make(slice.Bytes, intro.Len) + c := slice.NewCursor() + intr.Encode(msg, c) + nPeers := eng.NodesLen() + peerIndices := make([]int, nPeers) + for i := 0; i < nPeers; i++ { + peerIndices[i] = i + } + cryptorand.Shuffle(nPeers, func(i, j int) { + peerIndices[i], peerIndices[j] = peerIndices[j], peerIndices[i] + }) + // Since relays will also gossip this information, we will start a ticker + // that sends out the hidden service introduction once a second until it + // runs out of known relays to gossip to. + ticker := time.NewTicker(time.Second) + var cursor int + for { + select { + case <-eng.C.Wait(): + return + case <-done: + return + case <-ticker.C: + n := eng.FindNodeByIndex(peerIndices[cursor]) + n.Transport.Send(msg) + cursor++ + } + } +} diff --git a/pkg/relay/postacct.go b/pkg/relay/postacct.go deleted file mode 100644 index b97d40d2..00000000 --- a/pkg/relay/postacct.go +++ /dev/null @@ -1,99 +0,0 @@ -package relay - -import ( - "git-indra.lan/indra-labs/lnd/lnd/lnwire" - - "git-indra.lan/indra-labs/indra/pkg/crypto/nonce" - "git-indra.lan/indra-labs/indra/pkg/messages/balance" - "git-indra.lan/indra-labs/indra/pkg/messages/confirm" - "git-indra.lan/indra-labs/indra/pkg/messages/crypt" - "git-indra.lan/indra-labs/indra/pkg/messages/exit" - "git-indra.lan/indra-labs/indra/pkg/messages/forward" - "git-indra.lan/indra-labs/indra/pkg/messages/getbalance" - "git-indra.lan/indra-labs/indra/pkg/messages/hiddenservice" - "git-indra.lan/indra-labs/indra/pkg/messages/reverse" - "git-indra.lan/indra-labs/indra/pkg/util/slice" -) - -type SendData struct { - b slice.Bytes - sessions Sessions - billable []nonce.ID - ret, last nonce.ID - port uint16 - postAcct []func() -} - -// PostAcctOnion takes a slice of Skins and calculates their costs and -// the list of sessions inside them and attaches accounting operations to -// apply when the associated confirmation(s) or response hooks are executed. -func (eng *Engine) PostAcctOnion(o Skins) (res SendData) { - res.b = Encode(o.Assemble()) - // do client accounting - skip := false - for i := range o { - if skip { - skip = false - continue - } - switch on := o[i].(type) { - case *crypt.Layer: - s := eng.FindSessionByHeaderPub(on.ToHeaderPub) - if s == nil { - continue - } - res.sessions = append(res.sessions, s) - // The last hop needs no accounting as it's us! - if i == len(o)-1 { - // The session used for the last hop is stored, however. - res.ret = s.ID - res.billable = append(res.billable, s.ID) - break - } - switch on2 := o[i+1].(type) { - case *forward.Layer: - res.billable = append(res.billable, s.ID) - res.postAcct = append(res.postAcct, - func() { - eng.DecSession(s.ID, - s.RelayRate* - lnwire.MilliSatoshi(len(res.b))/1024/1024, true, - "forward") - }) - case *hiddenservice.Layer: - res.last = on2.ID - res.billable = append(res.billable, s.ID) - skip = true - case *reverse.Layer: - res.billable = append(res.billable, s.ID) - case *exit.Layer: - for j := range s.Services { - if s.Services[j].Port != on2.Port { - continue - } - res.port = on2.Port - res.postAcct = append(res.postAcct, - func() { - eng.DecSession(s.ID, - s.Services[j].RelayRate* - lnwire.MilliSatoshi(len(res.b)/2)/1024/1024, - true, "exit") - }) - break - } - res.billable = append(res.billable, s.ID) - res.last = on2.ID - skip = true - case *getbalance.Layer: - res.last = s.ID - res.billable = append(res.billable, s.ID) - skip = true - } - case *confirm.Layer: - res.last = on.ID - case *balance.Layer: - res.last = on.ID - } - } - return -} diff --git a/pkg/relay/send.go b/pkg/relay/send.go index 6b97173d..bcc57f40 100644 --- a/pkg/relay/send.go +++ b/pkg/relay/send.go @@ -4,7 +4,17 @@ import ( "net/netip" "runtime" + "git-indra.lan/indra-labs/lnd/lnd/lnwire" + "git-indra.lan/indra-labs/indra/pkg/crypto/nonce" + "git-indra.lan/indra-labs/indra/pkg/messages/balance" + "git-indra.lan/indra-labs/indra/pkg/messages/confirm" + "git-indra.lan/indra-labs/indra/pkg/messages/crypt" + "git-indra.lan/indra-labs/indra/pkg/messages/exit" + "git-indra.lan/indra-labs/indra/pkg/messages/forward" + "git-indra.lan/indra-labs/indra/pkg/messages/getbalance" + "git-indra.lan/indra-labs/indra/pkg/messages/hiddenservice" + "git-indra.lan/indra-labs/indra/pkg/messages/reverse" "git-indra.lan/indra-labs/indra/pkg/util/slice" ) @@ -25,6 +35,89 @@ func (eng *Engine) Send(addr *netip.AddrPort, b slice.Bytes) { }) } +type SendData struct { + b slice.Bytes + sessions Sessions + billable []nonce.ID + ret, last nonce.ID + port uint16 + postAcct []func() +} + +// PostAcctOnion takes a slice of Skins and calculates their costs and +// the list of sessions inside them and attaches accounting operations to +// apply when the associated confirmation(s) or response hooks are executed. +func (eng *Engine) PostAcctOnion(o Skins) (res SendData) { + res.b = Encode(o.Assemble()) + // do client accounting + skip := false + for i := range o { + if skip { + skip = false + continue + } + switch on := o[i].(type) { + case *crypt.Layer: + s := eng.FindSessionByHeaderPub(on.ToHeaderPub) + if s == nil { + continue + } + res.sessions = append(res.sessions, s) + // The last hop needs no accounting as it's us! + if i == len(o)-1 { + // The session used for the last hop is stored, however. + res.ret = s.ID + res.billable = append(res.billable, s.ID) + break + } + switch on2 := o[i+1].(type) { + case *forward.Layer: + res.billable = append(res.billable, s.ID) + res.postAcct = append(res.postAcct, + func() { + eng.DecSession(s.ID, + s.RelayRate* + lnwire.MilliSatoshi(len(res.b))/1024/1024, true, + "forward") + }) + case *hiddenservice.Layer: + res.last = on2.ID + res.billable = append(res.billable, s.ID) + skip = true + case *reverse.Layer: + res.billable = append(res.billable, s.ID) + case *exit.Layer: + for j := range s.Services { + if s.Services[j].Port != on2.Port { + continue + } + res.port = on2.Port + res.postAcct = append(res.postAcct, + func() { + eng.DecSession(s.ID, + s.Services[j].RelayRate* + lnwire.MilliSatoshi(len(res.b)/2)/1024/1024, + true, "exit") + }) + break + } + res.billable = append(res.billable, s.ID) + res.last = on2.ID + skip = true + case *getbalance.Layer: + res.last = s.ID + res.billable = append(res.billable, s.ID) + skip = true + } + case *confirm.Layer: + res.last = on.ID + case *balance.Layer: + res.last = on.ID + } + } + return +} + // SendWithOneHook is used for onions with only one confirmation hook. Usually // as returned from PostAcctOnion this is the last, confirmation or response // layer in an onion.Skins.