starting on intro broadcaster

This commit is contained in:
херетик
2023-02-25 20:09:13 +00:00
parent 09eedffa31
commit a0c690ae64
14 changed files with 188 additions and 121 deletions

View File

@@ -15,20 +15,20 @@ import (
)
var (
eng *relay.Engine
p2p []string
rpc []string
eng *relay.Engine
engineP2P []string
engineRPC []string
)
func init() {
pf := relayCmd.PersistentFlags()
pf.StringSliceVarP(&p2p, "p2p-relay", "P",
pf.StringSliceVarP(&engineP2P, "engineP2P-relay", "P",
[]string{"127.0.0.1:8337", "::1:8337"},
"address/ports for IPv4 and v6 listeners")
pf.StringSliceVarP(&rpc, "relay-control", "r",
pf.StringSliceVarP(&engineRPC, "relay-control", "r",
[]string{"127.0.0.1:8339", "::1:8339"},
"address/ports for IPv4 and v6 listeners")
viper.BindPFlag("p2p-relay", seedCmd.PersistentFlags().Lookup("p2p-relay"))
viper.BindPFlag("engineP2P-relay", seedCmd.PersistentFlags().Lookup("engineP2P-relay"))
viper.BindPFlag("relay-control", seedCmd.PersistentFlags().Lookup(
"relay-control"))
rootCmd.AddCommand(relayCmd)

View File

@@ -3,10 +3,10 @@ package main
import "github.com/spf13/cobra"
func init() {
rootCmd.AddCommand(seedCommand)
rootCmd.AddCommand(seedCmd)
}
var seedCommand = &cobra.Command{
var seedCmd = &cobra.Command{
Use: "seed",
Short: "run and manage your seed node",
Long: `run and manage your seed node`,

View File

@@ -5,14 +5,14 @@ import (
)
func init() {
//// Init flags belonging to the seed package
//seed.InitFlags(seedServeCommand)
// // Init flags belonging to the seed package
// seed.InitFlags(seedServeCommand)
//
//// Init flags belonging to the rpc package
//rpc.InitFlags(seedServeCommand)
seedCommand.AddCommand(seedRPCCmd)
// // Init flags belonging to the rpc package
// rpc.InitFlags(seedServeCommand)
seedCmd.AddCommand(seedRPCCmd)
}
var seedRPCCmd = &cobra.Command{
@@ -20,6 +20,6 @@ var seedRPCCmd = &cobra.Command{
Short: "A list of commands for interacting with a seed",
Long: `A list of commands for interacting with a seed.`,
Run: func(cmd *cobra.Command, args []string) {
},
}

View File

@@ -2,18 +2,20 @@ package main
import (
"context"
"git-indra.lan/indra-labs/indra"
"git-indra.lan/indra-labs/indra/pkg/interrupt"
log2 "git-indra.lan/indra-labs/indra/pkg/proc/log"
"git-indra.lan/indra-labs/indra/pkg/rpc"
"git-indra.lan/indra-labs/indra/pkg/seed"
"os"
"time"
"github.com/multiformats/go-multiaddr"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/tutorialedge/go-grpc-tutorial/chat"
"google.golang.org/grpc"
"os"
"time"
"git-indra.lan/indra-labs/indra"
"git-indra.lan/indra-labs/indra/pkg/interrupt"
log2 "git-indra.lan/indra-labs/indra/pkg/proc/log"
"git-indra.lan/indra-labs/indra/pkg/rpc"
"git-indra.lan/indra-labs/indra/pkg/seed"
)
var (
@@ -21,14 +23,14 @@ var (
)
func init() {
// Init flags belonging to the seed package
seed.InitFlags(seedServeCommand)
// Init flags belonging to the rpc package
rpc.InitFlags(seedServeCommand)
seedCommand.AddCommand(seedServeCommand)
seedCmd.AddCommand(seedServeCommand)
}
var seedServeCommand = &cobra.Command{
@@ -36,93 +38,93 @@ var seedServeCommand = &cobra.Command{
Short: "Serves an instance of the seed node",
Long: `Serves an instance of the seed node.`,
Run: func(cmd *cobra.Command, args []string) {
log.I.Ln("-- ", log2.App, "("+viper.GetString("network")+") -", indra.SemVer, "- Network Freedom. --")
log.I.Ln("running seed")
var ctx context.Context
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(context.Background())
interrupt.AddHandler(cancel)
//
// RPC
//
rpc.RunWith(ctx, func(srv *grpc.Server) {
chat.RegisterChatServiceServer(srv, &chat.Server{})
})
select {
case <-rpc.CantStart():
log.I.Ln("issues starting the rpc server")
log.I.Ln("attempting a graceful shutdown")
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
rpc.Shutdown(ctx)
select {
case <-ctx.Done():
log.I.Ln("can't shutdown gracefully, exiting.")
os.Exit(1)
default:
log.I.Ln("graceful shutdown complete")
os.Exit(0)
}
case <-rpc.IsReady():
log.I.Ln("rpc server is ready")
}
rpc.Run(ctx)
//
// P2P
//
var config = seed.DefaultConfig
config.SetNetwork(viper.GetString("network"))
if config.PrivKey, err = seed.GetOrGeneratePrivKey(viper.GetString("key")); check(err) {
return
}
for _, listener := range viper.GetStringSlice("listen") {
config.ListenAddresses = append(config.ListenAddresses, multiaddr.StringCast(listener))
}
for _, seed := range viper.GetStringSlice("seed") {
config.SeedAddresses = append(config.SeedAddresses, multiaddr.StringCast(seed))
}
for _, connector := range viper.GetStringSlice("connect") {
config.ConnectAddresses = append(config.ConnectAddresses, multiaddr.StringCast(connector))
}
var srv *seed.Server
if srv, err = seed.New(config); check(err) {
return
}
if err = srv.Serve(); check(err) {
return
}
log.I.Ln("-- fin --")
return
},
}

View File

@@ -7,12 +7,13 @@ import (
"git-indra.lan/indra-labs/indra/pkg/crypto/key/pub"
"git-indra.lan/indra-labs/indra/pkg/messages/magicbytes"
"git-indra.lan/indra-labs/indra/pkg/splice"
"git-indra.lan/indra-labs/indra/pkg/types"
"git-indra.lan/indra-labs/indra/pkg/util/slice"
)
const (
MagicString = "in"
AddrLen = net.IPv6len + 2
AddrLen = net.IPv6len + 3
Len = magicbytes.Len + pub.KeyLen + AddrLen
)
@@ -20,21 +21,20 @@ var (
Magic = slice.Bytes(MagicString)
)
type IntroductionMessage struct {
type Layer struct {
*pub.Key
*netip.AddrPort
}
func Encode(im *IntroductionMessage) (o slice.Bytes) {
b, c := make(slice.Bytes, Len), slice.NewCursor()
func (im *Layer) Insert(o types.Onion) {}
func (im *Layer) Len() int { return Len }
func (im *Layer) Encode(b slice.Bytes, c *slice.Cursor) {
splice.Splice(b, c).Magic(Magic).Pubkey(im.Key).AddrPort(im.AddrPort)
return
}
func Decode(b slice.Bytes) (o *IntroductionMessage) {
o = &IntroductionMessage{}
c := slice.NewCursor()
c.Inc(magicbytes.Len)
splice.Splice(b, c).ReadPubkey(&o.Key).ReadAddrPort(&o.AddrPort)
func (im *Layer) Decode(b slice.Bytes, c *slice.Cursor) (e error) {
splice.Splice(b, c).ReadPubkey(&im.Key).ReadAddrPort(&im.AddrPort)
return
}

View File

@@ -31,7 +31,7 @@ func (p handlemessages) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func main() {
typesList := []string{"balance", "confirm", "crypt", "delay", "dxresponse",
"exit", "forward", "getbalance", "hiddenservice", "reverse",
"exit", "forward", "getbalance", "hiddenservice", "intro", "reverse",
"response", "session"}
sort.Strings(typesList)
tpl := `package relay
@@ -41,19 +41,20 @@ import (
"github.com/davecgh/go-spew/spew"
"git-indra.lan/indra-labs/indra/pkg/onion/balance"
"git-indra.lan/indra-labs/indra/pkg/onion/confirm"
"git-indra.lan/indra-labs/indra/pkg/onion/crypt"
"git-indra.lan/indra-labs/indra/pkg/onion/delay"
"git-indra.lan/indra-labs/indra/pkg/onion/dxresponse"
"git-indra.lan/indra-labs/indra/pkg/onion/exit"
"git-indra.lan/indra-labs/indra/pkg/onion/forward"
"git-indra.lan/indra-labs/indra/pkg/onion/getbalance"
"git-indra.lan/indra-labs/indra/pkg/onion/hiddenservice"
"git-indra.lan/indra-labs/indra/pkg/onion/magicbytes"
"git-indra.lan/indra-labs/indra/pkg/onion/response"
"git-indra.lan/indra-labs/indra/pkg/onion/reverse"
"git-indra.lan/indra-labs/indra/pkg/onion/session"
"git-indra.lan/indra-labs/indra/pkg/messages/balance"
"git-indra.lan/indra-labs/indra/pkg/messages/confirm"
"git-indra.lan/indra-labs/indra/pkg/messages/crypt"
"git-indra.lan/indra-labs/indra/pkg/messages/delay"
"git-indra.lan/indra-labs/indra/pkg/messages/dxresponse"
"git-indra.lan/indra-labs/indra/pkg/messages/exit"
"git-indra.lan/indra-labs/indra/pkg/messages/forward"
"git-indra.lan/indra-labs/indra/pkg/messages/getbalance"
"git-indra.lan/indra-labs/indra/pkg/messages/hiddenservice"
"git-indra.lan/indra-labs/indra/pkg/messages/intro"
"git-indra.lan/indra-labs/indra/pkg/messages/magicbytes"
"git-indra.lan/indra-labs/indra/pkg/messages/response"
"git-indra.lan/indra-labs/indra/pkg/messages/reverse"
"git-indra.lan/indra-labs/indra/pkg/messages/session"
"git-indra.lan/indra-labs/indra/pkg/types"
"git-indra.lan/indra-labs/indra/pkg/util/slice"
)
@@ -100,6 +101,7 @@ func Peel(b slice.Bytes, c *slice.Cursor) (on types.Onion, e error) {
{"forward", true},
{"getbalance", true},
{"hiddenservice", true},
{"intro", false},
{"reverse", false},
{"response", true},
{"session", true},
@@ -111,17 +113,18 @@ func Peel(b slice.Bytes, c *slice.Cursor) (on types.Onion, e error) {
import (
"reflect"
"git-indra.lan/indra-labs/indra/pkg/onion/balance"
"git-indra.lan/indra-labs/indra/pkg/onion/confirm"
"git-indra.lan/indra-labs/indra/pkg/onion/crypt"
"git-indra.lan/indra-labs/indra/pkg/onion/delay"
"git-indra.lan/indra-labs/indra/pkg/onion/exit"
"git-indra.lan/indra-labs/indra/pkg/onion/forward"
"git-indra.lan/indra-labs/indra/pkg/onion/getbalance"
"git-indra.lan/indra-labs/indra/pkg/onion/hiddenservice"
"git-indra.lan/indra-labs/indra/pkg/onion/response"
"git-indra.lan/indra-labs/indra/pkg/onion/reverse"
"git-indra.lan/indra-labs/indra/pkg/onion/session"
"git-indra.lan/indra-labs/indra/pkg/messages/balance"
"git-indra.lan/indra-labs/indra/pkg/messages/confirm"
"git-indra.lan/indra-labs/indra/pkg/messages/crypt"
"git-indra.lan/indra-labs/indra/pkg/messages/delay"
"git-indra.lan/indra-labs/indra/pkg/messages/exit"
"git-indra.lan/indra-labs/indra/pkg/messages/forward"
"git-indra.lan/indra-labs/indra/pkg/messages/getbalance"
"git-indra.lan/indra-labs/indra/pkg/messages/hiddenservice"
"git-indra.lan/indra-labs/indra/pkg/messages/intro"
"git-indra.lan/indra-labs/indra/pkg/messages/response"
"git-indra.lan/indra-labs/indra/pkg/messages/reverse"
"git-indra.lan/indra-labs/indra/pkg/messages/session"
"git-indra.lan/indra-labs/indra/pkg/types"
"git-indra.lan/indra-labs/indra/pkg/util/slice"
)

View File

@@ -2,7 +2,7 @@ package relay
import (
"reflect"
"git-indra.lan/indra-labs/indra/pkg/messages/balance"
"git-indra.lan/indra-labs/indra/pkg/messages/confirm"
"git-indra.lan/indra-labs/indra/pkg/messages/crypt"
@@ -11,6 +11,7 @@ import (
"git-indra.lan/indra-labs/indra/pkg/messages/forward"
"git-indra.lan/indra-labs/indra/pkg/messages/getbalance"
"git-indra.lan/indra-labs/indra/pkg/messages/hiddenservice"
"git-indra.lan/indra-labs/indra/pkg/messages/intro"
"git-indra.lan/indra-labs/indra/pkg/messages/response"
"git-indra.lan/indra-labs/indra/pkg/messages/reverse"
"git-indra.lan/indra-labs/indra/pkg/messages/session"
@@ -19,8 +20,7 @@ import (
)
func (eng *Engine) handleMessage(b slice.Bytes, prev types.Onion) {
log.T.F("%v handling received message %v", eng.GetLocalNodeAddress(),
prev == nil)
log.T.F("%v handling received message", eng.GetLocalNodeAddress())
var on1 types.Onion
var e error
c := slice.NewCursor()
@@ -80,6 +80,9 @@ func (eng *Engine) handleMessage(b slice.Bytes, prev types.Onion) {
}
log.T.C(recLog(on, b, eng))
eng.hiddenservice(on, b, c, prev)
case *intro.Layer:
log.T.C(recLog(on, b, eng))
eng.intro(on, b, c, prev)
case *response.Layer:
if prev == nil {
log.E.Ln(reflect.TypeOf(on), "requests from outside? absurd!")

View File

@@ -11,8 +11,7 @@ func (eng *Engine) hiddenservice(hs *hiddenservice.Layer, b slice.Bytes,
log.D.F("%s adding introduction for key %x", eng.GetLocalNodeAddress(),
hs.Identity.ToBytes())
hsk := hs.Identity.ToBytes()
eng.Introductions.AddIntro(hsk, b[*c:])
eng.Introductions.AddIntro(hs.Identity, b[*c:])
log.I.Ln("stored new introduction, starting broadcast")
go eng.hiddenserviceBroadcaster(hsk)
go eng.hiddenserviceBroadcaster(hs.Identity)
}

View File

@@ -0,0 +1,13 @@
package relay
import (
"git-indra.lan/indra-labs/indra/pkg/messages/intro"
"git-indra.lan/indra-labs/indra/pkg/types"
"git-indra.lan/indra-labs/indra/pkg/util/slice"
)
func (eng *Engine) intro(intr *intro.Layer, b slice.Bytes,
c *slice.Cursor, prev types.Onion) {
log.D.S(intr)
}

View File

@@ -0,0 +1,51 @@
package relay
import (
"time"
"github.com/cybriq/qu"
"git-indra.lan/indra-labs/indra/pkg/crypto/key/pub"
"git-indra.lan/indra-labs/indra/pkg/messages/intro"
"git-indra.lan/indra-labs/indra/pkg/util/cryptorand"
"git-indra.lan/indra-labs/indra/pkg/util/slice"
)
type Referrers map[pub.Bytes][]pub.Bytes
func (eng *Engine) hiddenserviceBroadcaster(hsk *pub.Key) {
log.D.F("propagating hidden service introduction for %x", hsk.ToBytes())
done := qu.T()
me := eng.GetLocalNodeAddress()
intr := &intro.Layer{
Key: hsk, AddrPort: me,
}
msg := make(slice.Bytes, intro.Len)
c := slice.NewCursor()
intr.Encode(msg, c)
nPeers := eng.NodesLen()
peerIndices := make([]int, nPeers)
for i := 0; i < nPeers; i++ {
peerIndices[i] = i
}
cryptorand.Shuffle(nPeers, func(i, j int) {
peerIndices[i], peerIndices[j] = peerIndices[j], peerIndices[i]
})
// Since relays will also gossip this information, we will start a ticker
// that sends out the hidden service introduction once a second until it
// runs out of known relays to gossip to.
ticker := time.NewTicker(time.Second)
var cursor int
for {
select {
case <-eng.C.Wait():
return
case <-done:
return
case <-ticker.C:
n := eng.FindNodeByIndex(peerIndices[cursor])
n.Transport.Send(msg)
cursor++
}
}
}

View File

@@ -1,16 +0,0 @@
package relay
import "git-indra.lan/indra-labs/indra/pkg/crypto/key/pub"
type Referrers map[pub.Bytes][]pub.Bytes
func (eng *Engine) hiddenserviceBroadcaster(hsk pub.Bytes) {
log.D.Ln("propagating hidden service introduction for %x", hsk)
for {
select {
case <-eng.C.Wait():
return
}
}
}

View File

@@ -41,9 +41,10 @@ func (in *Introductions) Find(key pub.Bytes) (header slice.Bytes) {
return
}
func (in *Introductions) AddIntro(key pub.Bytes, header slice.Bytes) {
func (in *Introductions) AddIntro(pk *pub.Key, header slice.Bytes) {
in.Lock()
var ok bool
key := pk.ToBytes()
if _, ok = in.Intros[key]; ok {
log.D.Ln("entry already exists for key %x", key)
} else {

View File

@@ -3,9 +3,9 @@ package relay
import (
"fmt"
"net/netip"
"git-indra.lan/indra-labs/lnd/lnd/lnwire"
"git-indra.lan/indra-labs/indra/pkg/crypto/key/prv"
"git-indra.lan/indra-labs/indra/pkg/crypto/key/pub"
"git-indra.lan/indra-labs/indra/pkg/crypto/nonce"
@@ -44,7 +44,7 @@ func (sm *SessionManager) SetLocalNodeAddress(addr *netip.AddrPort) {
func (sm *SessionManager) SendFromLocalNode(port uint16,
b slice.Bytes) (e error) {
sm.Lock()
defer sm.Unlock()
return sm.GetLocalNode().SendTo(port, b)
@@ -96,6 +96,11 @@ func (sm *SessionManager) AddNodes(nn ...*Node) {
defer sm.Unlock()
sm.nodes = append(sm.nodes, nn...)
}
func (sm *SessionManager) FindNodeByIndex(i int) (no *Node) {
sm.Lock()
defer sm.Unlock()
return sm.nodes[i]
}
// FindNodeByID searches for a Node by ID.
func (sm *SessionManager) FindNodeByID(i nonce.ID) (no *Node) {

View File

@@ -2,9 +2,9 @@ package relay
import (
"fmt"
"github.com/davecgh/go-spew/spew"
"git-indra.lan/indra-labs/indra/pkg/messages/balance"
"git-indra.lan/indra-labs/indra/pkg/messages/confirm"
"git-indra.lan/indra-labs/indra/pkg/messages/crypt"
@@ -14,6 +14,7 @@ import (
"git-indra.lan/indra-labs/indra/pkg/messages/forward"
"git-indra.lan/indra-labs/indra/pkg/messages/getbalance"
"git-indra.lan/indra-labs/indra/pkg/messages/hiddenservice"
"git-indra.lan/indra-labs/indra/pkg/messages/intro"
"git-indra.lan/indra-labs/indra/pkg/messages/magicbytes"
"git-indra.lan/indra-labs/indra/pkg/messages/response"
"git-indra.lan/indra-labs/indra/pkg/messages/reverse"
@@ -69,6 +70,11 @@ func Peel(b slice.Bytes, c *slice.Cursor) (on types.Onion, e error) {
if e = on.Decode(b, c); check(e) {
return
}
case intro.MagicString:
on = &intro.Layer{}
if e = on.Decode(b, c); check(e) {
return
}
case response.MagicString:
on = &response.Layer{}
if e = on.Decode(b, c); check(e) {