merged sending stuff together

This commit is contained in:
херетик
2023-02-27 11:03:08 +00:00
parent 71a26c80f6
commit a3b6562b03
4 changed files with 131 additions and 141 deletions

View File

@@ -1,10 +1,6 @@
package relay
import (
"time"
"github.com/cybriq/qu"
"git-indra.lan/indra-labs/indra/pkg/crypto/key/prv"
"git-indra.lan/indra-labs/indra/pkg/crypto/key/pub"
"git-indra.lan/indra-labs/indra/pkg/crypto/key/signer"
@@ -12,48 +8,11 @@ import (
"git-indra.lan/indra-labs/indra/pkg/messages/hiddenservice"
"git-indra.lan/indra-labs/indra/pkg/messages/intro"
"git-indra.lan/indra-labs/indra/pkg/types"
"git-indra.lan/indra-labs/indra/pkg/util/cryptorand"
"git-indra.lan/indra-labs/indra/pkg/util/slice"
)
type Referrers map[pub.Bytes][]pub.Bytes
func (eng *Engine) hiddenserviceBroadcaster(hs *intro.Layer) {
log.D.F("propagating hidden service introduction for %x", hs.Key.ToBytes())
done := qu.T()
intr := &intro.Layer{
Key: hs.Key, AddrPort: hs.AddrPort, Bytes: hs.Bytes,
}
msg := make(slice.Bytes, intro.Len)
c := slice.NewCursor()
intr.Encode(msg, c)
nPeers := eng.NodesLen()
peerIndices := make([]int, nPeers)
for i := 0; i < nPeers; i++ {
peerIndices[i] = i
}
cryptorand.Shuffle(nPeers, func(i, j int) {
peerIndices[i], peerIndices[j] = peerIndices[j], peerIndices[i]
})
// Since relays will also gossip this information, we will start a ticker
// that sends out the hidden service introduction once a second until it
// runs out of known relays to gossip to.
ticker := time.NewTicker(time.Second)
var cursor int
for {
select {
case <-eng.C.Wait():
return
case <-done:
return
case <-ticker.C:
n := eng.FindNodeByIndex(peerIndices[cursor])
n.Transport.Send(msg)
cursor++
}
}
}
func HiddenService(id nonce.ID, il *intro.Layer, client *Session, s Circuit,
ks *signer.KeySet) Skins {
@@ -85,5 +44,5 @@ func (eng *Engine) hiddenservice(hs *hiddenservice.Layer, b slice.Bytes,
hs.Layer.Key.ToBase32())
eng.Introductions.AddIntro(hs.Layer.Key, b[*c:])
log.I.Ln("stored new introduction, starting broadcast")
go eng.hiddenserviceBroadcaster(&hs.Layer)
go eng.introductionBroadcaster(&hs.Layer)
}

View File

@@ -2,11 +2,15 @@ package relay
import (
"sync"
"time"
"github.com/cybriq/qu"
"git-indra.lan/indra-labs/indra/pkg/crypto/key/pub"
"git-indra.lan/indra-labs/indra/pkg/crypto/nonce"
"git-indra.lan/indra-labs/indra/pkg/messages/intro"
"git-indra.lan/indra-labs/indra/pkg/types"
"git-indra.lan/indra-labs/indra/pkg/util/cryptorand"
"git-indra.lan/indra-labs/indra/pkg/util/slice"
)
@@ -91,3 +95,36 @@ func (eng *Engine) intro(intr *intro.Layer, b slice.Bytes,
intr.Key.ToBase32(), intr.AddrPort.String())
}
}
func (eng *Engine) introductionBroadcaster(intr *intro.Layer) {
log.D.F("propagating hidden service introduction for %x", intr.Key.ToBytes())
done := qu.T()
msg := make(slice.Bytes, intro.Len)
c := slice.NewCursor()
intr.Encode(msg, c)
nPeers := eng.NodesLen()
peerIndices := make([]int, nPeers)
for i := 0; i < nPeers; i++ {
peerIndices[i] = i
}
cryptorand.Shuffle(nPeers, func(i, j int) {
peerIndices[i], peerIndices[j] = peerIndices[j], peerIndices[i]
})
// Since relays will also gossip this information, we will start a ticker
// that sends out the hidden service introduction once a second until it
// runs out of known relays to gossip to.
ticker := time.NewTicker(time.Second)
var cursor int
for {
select {
case <-eng.C.Wait():
return
case <-done:
return
case <-ticker.C:
n := eng.FindNodeByIndex(peerIndices[cursor])
n.Transport.Send(msg)
cursor++
}
}
}

View File

@@ -1,99 +0,0 @@
package relay
import (
"git-indra.lan/indra-labs/lnd/lnd/lnwire"
"git-indra.lan/indra-labs/indra/pkg/crypto/nonce"
"git-indra.lan/indra-labs/indra/pkg/messages/balance"
"git-indra.lan/indra-labs/indra/pkg/messages/confirm"
"git-indra.lan/indra-labs/indra/pkg/messages/crypt"
"git-indra.lan/indra-labs/indra/pkg/messages/exit"
"git-indra.lan/indra-labs/indra/pkg/messages/forward"
"git-indra.lan/indra-labs/indra/pkg/messages/getbalance"
"git-indra.lan/indra-labs/indra/pkg/messages/hiddenservice"
"git-indra.lan/indra-labs/indra/pkg/messages/reverse"
"git-indra.lan/indra-labs/indra/pkg/util/slice"
)
type SendData struct {
b slice.Bytes
sessions Sessions
billable []nonce.ID
ret, last nonce.ID
port uint16
postAcct []func()
}
// PostAcctOnion takes a slice of Skins and calculates their costs and
// the list of sessions inside them and attaches accounting operations to
// apply when the associated confirmation(s) or response hooks are executed.
func (eng *Engine) PostAcctOnion(o Skins) (res SendData) {
res.b = Encode(o.Assemble())
// do client accounting
skip := false
for i := range o {
if skip {
skip = false
continue
}
switch on := o[i].(type) {
case *crypt.Layer:
s := eng.FindSessionByHeaderPub(on.ToHeaderPub)
if s == nil {
continue
}
res.sessions = append(res.sessions, s)
// The last hop needs no accounting as it's us!
if i == len(o)-1 {
// The session used for the last hop is stored, however.
res.ret = s.ID
res.billable = append(res.billable, s.ID)
break
}
switch on2 := o[i+1].(type) {
case *forward.Layer:
res.billable = append(res.billable, s.ID)
res.postAcct = append(res.postAcct,
func() {
eng.DecSession(s.ID,
s.RelayRate*
lnwire.MilliSatoshi(len(res.b))/1024/1024, true,
"forward")
})
case *hiddenservice.Layer:
res.last = on2.ID
res.billable = append(res.billable, s.ID)
skip = true
case *reverse.Layer:
res.billable = append(res.billable, s.ID)
case *exit.Layer:
for j := range s.Services {
if s.Services[j].Port != on2.Port {
continue
}
res.port = on2.Port
res.postAcct = append(res.postAcct,
func() {
eng.DecSession(s.ID,
s.Services[j].RelayRate*
lnwire.MilliSatoshi(len(res.b)/2)/1024/1024,
true, "exit")
})
break
}
res.billable = append(res.billable, s.ID)
res.last = on2.ID
skip = true
case *getbalance.Layer:
res.last = s.ID
res.billable = append(res.billable, s.ID)
skip = true
}
case *confirm.Layer:
res.last = on.ID
case *balance.Layer:
res.last = on.ID
}
}
return
}

View File

@@ -4,7 +4,17 @@ import (
"net/netip"
"runtime"
"git-indra.lan/indra-labs/lnd/lnd/lnwire"
"git-indra.lan/indra-labs/indra/pkg/crypto/nonce"
"git-indra.lan/indra-labs/indra/pkg/messages/balance"
"git-indra.lan/indra-labs/indra/pkg/messages/confirm"
"git-indra.lan/indra-labs/indra/pkg/messages/crypt"
"git-indra.lan/indra-labs/indra/pkg/messages/exit"
"git-indra.lan/indra-labs/indra/pkg/messages/forward"
"git-indra.lan/indra-labs/indra/pkg/messages/getbalance"
"git-indra.lan/indra-labs/indra/pkg/messages/hiddenservice"
"git-indra.lan/indra-labs/indra/pkg/messages/reverse"
"git-indra.lan/indra-labs/indra/pkg/util/slice"
)
@@ -25,6 +35,89 @@ func (eng *Engine) Send(addr *netip.AddrPort, b slice.Bytes) {
})
}
type SendData struct {
b slice.Bytes
sessions Sessions
billable []nonce.ID
ret, last nonce.ID
port uint16
postAcct []func()
}
// PostAcctOnion takes a slice of Skins and calculates their costs and
// the list of sessions inside them and attaches accounting operations to
// apply when the associated confirmation(s) or response hooks are executed.
func (eng *Engine) PostAcctOnion(o Skins) (res SendData) {
res.b = Encode(o.Assemble())
// do client accounting
skip := false
for i := range o {
if skip {
skip = false
continue
}
switch on := o[i].(type) {
case *crypt.Layer:
s := eng.FindSessionByHeaderPub(on.ToHeaderPub)
if s == nil {
continue
}
res.sessions = append(res.sessions, s)
// The last hop needs no accounting as it's us!
if i == len(o)-1 {
// The session used for the last hop is stored, however.
res.ret = s.ID
res.billable = append(res.billable, s.ID)
break
}
switch on2 := o[i+1].(type) {
case *forward.Layer:
res.billable = append(res.billable, s.ID)
res.postAcct = append(res.postAcct,
func() {
eng.DecSession(s.ID,
s.RelayRate*
lnwire.MilliSatoshi(len(res.b))/1024/1024, true,
"forward")
})
case *hiddenservice.Layer:
res.last = on2.ID
res.billable = append(res.billable, s.ID)
skip = true
case *reverse.Layer:
res.billable = append(res.billable, s.ID)
case *exit.Layer:
for j := range s.Services {
if s.Services[j].Port != on2.Port {
continue
}
res.port = on2.Port
res.postAcct = append(res.postAcct,
func() {
eng.DecSession(s.ID,
s.Services[j].RelayRate*
lnwire.MilliSatoshi(len(res.b)/2)/1024/1024,
true, "exit")
})
break
}
res.billable = append(res.billable, s.ID)
res.last = on2.ID
skip = true
case *getbalance.Layer:
res.last = s.ID
res.billable = append(res.billable, s.ID)
skip = true
}
case *confirm.Layer:
res.last = on.ID
case *balance.Layer:
res.last = on.ID
}
}
return
}
// SendWithOneHook is used for onions with only one confirmation hook. Usually
// as returned from PostAcctOnion this is the last, confirmation or response
// layer in an onion.Skins.