refactoring rpc.

This commit is contained in:
greg stone
2023-02-22 11:04:39 +00:00
parent fa367dea27
commit 0531f7c726
8 changed files with 259 additions and 129 deletions

View File

@@ -1,17 +1,20 @@
package main
import (
"context"
"git-indra.lan/indra-labs/indra"
"git-indra.lan/indra-labs/indra/pkg/cfg"
"git-indra.lan/indra-labs/indra/pkg/interrupt"
log2 "git-indra.lan/indra-labs/indra/pkg/proc/log"
"git-indra.lan/indra-labs/indra/pkg/rpc"
"git-indra.lan/indra-labs/indra/pkg/seed"
"github.com/multiformats/go-multiaddr"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"math/rand"
"os"
"time"
)
var (
err error
)
var (
@@ -19,6 +22,7 @@ var (
listeners []string
seeds []string
connectors []string
rpc_enable bool
rpc_listen_port uint16
rpc_key string
rpc_whitelist_peer []string
@@ -32,38 +36,28 @@ func init() {
seedCmd.PersistentFlags().StringSliceVarP(&listeners, "listen", "l", []string{"/ip4/127.0.0.1/tcp/8337", "/ip6/::1/tcp/8337"}, "binds to an interface")
seedCmd.PersistentFlags().StringSliceVarP(&seeds, "seed", "s", []string{}, "adds an additional seed connection (e.g /dns4/seed0.indra.org/tcp/8337/p2p/<pub_key>)")
seedCmd.PersistentFlags().StringSliceVarP(&connectors, "connect", "c", []string{}, "connects only to the seed multi-addresses specified")
seedCmd.PersistentFlags().BoolVarP(&rpc_enable, "rpc-enable", "", false, "enables the rpc server")
seedCmd.PersistentFlags().Uint16VarP(&rpc_listen_port, "rpc-listen-port", "", 0, "binds the udp server to port (random if not selected)")
seedCmd.PersistentFlags().StringVarP(&rpc_key, "rpc-key", "", "", "the base58 encoded pre-shared key for accessing the rpc")
seedCmd.PersistentFlags().StringSliceVarP(&rpc_whitelist_peer, "rpc-whitelist-peer", "", []string{}, "adds a peer id to the whitelist for access")
seedCmd.PersistentFlags().StringSliceVarP(&rpc_whitelist_ip, "rpc-whitelist-ip", "", []string{}, "adds a cidr ip range to the whitelist for access (e.g /ip4/127.0.0.1/ipcidr/32)")
seedCmd.PersistentFlags().StringVarP(&rpc_unix_path, "rpc-listen-unix", "", "", "binds to a unix socket with path (default is $HOME/.indra/indra.sock)")
seedCmd.PersistentFlags().StringVarP(&rpc_unix_path, "rpc-listen-unix", "", "/tmp/indra.sock", "binds to a unix socket with path (default is /tmp/indra.sock)")
viper.BindPFlag("key", seedCmd.PersistentFlags().Lookup("key"))
viper.BindPFlag("listen", seedCmd.PersistentFlags().Lookup("listen"))
viper.BindPFlag("seed", seedCmd.PersistentFlags().Lookup("seed"))
viper.BindPFlag("connect", seedCmd.PersistentFlags().Lookup("connect"))
viper.BindPFlag("rpc-enable", seedCmd.PersistentFlags().Lookup("rpc-enable"))
viper.BindPFlag("rpc-listen-port", seedCmd.PersistentFlags().Lookup("rpc-listen-port"))
viper.BindPFlag("rpc-key", seedCmd.PersistentFlags().Lookup("rpc-key"))
viper.BindPFlag("rpc-whitelist-peer", seedCmd.PersistentFlags().Lookup("rpc-whitelist-peer"))
viper.BindPFlag("rpc-whitelist-ip", seedCmd.PersistentFlags().Lookup("rpc-whitelist-ip"))
viper.BindPFlag("rpc-listen-unix", seedCmd.PersistentFlags().Lookup("rpc-listen-unix"))
cobra.OnInitialize(initUnixSocket)
rootCmd.AddCommand(seedCmd)
}
func initUnixSocket() {
if rpc_unix_path == "" {
home, err := os.UserHomeDir()
cobra.CheckErr(err)
rpc_unix_path = home + "/.indra/indra.sock"
}
}
var seedCmd = &cobra.Command{
Use: "seed",
Short: "Serves an instance of the seed node",
@@ -72,40 +66,62 @@ var seedCmd = &cobra.Command{
log.I.Ln("-- ", log2.App, "("+viper.GetString("network")+") -", indra.SemVer, "- Network Freedom. --")
var err error
var config = seed.DefaultConfig
log.I.Ln("running seed")
config.Params = cfg.SelectNetworkParams(viper.GetString("network"))
//
// Set the context
//
config.RPCConfig.Key.Decode(viper.GetString("rpc-key"))
var ctx context.Context
var cancel context.CancelFunc
if config.RPCConfig.IsEnabled() {
ctx, cancel = context.WithCancel(context.Background())
config.RPCConfig.ListenPort = viper.GetUint16("rpc-listen-port")
interrupt.AddHandler(cancel)
if config.RPCConfig.ListenPort == 0 {
//
// RPC
//
rand.Seed(time.Now().Unix())
if viper.GetBool("rpc-enable") {
config.RPCConfig.ListenPort = uint16(rand.Intn(45534) + 10000)
log.I.Ln("enabling rpc server")
viper.Set("rpc-listen-port", config.RPCConfig.ListenPort)
if err = rpc.ConfigureWithViper(); check(err) {
os.Exit(1)
}
for _, ip := range viper.GetStringSlice("rpc-whitelist-ip") {
config.RPCConfig.IP_Whitelist = append(config.RPCConfig.IP_Whitelist, multiaddr.StringCast(ip))
}
// We need to enable specific gRPC services here
// rpc.Register()
for _, peer := range viper.GetStringSlice("rpc-whitelist-peer") {
log.I.Ln("starting rpc server")
var pubKey rpc.RPCPublicKey
go rpc.Start(ctx)
pubKey.Decode(peer)
select {
case <-rpc.CantStart():
config.RPCConfig.Peer_Whitelist = append(config.RPCConfig.Peer_Whitelist, pubKey)
log.I.Ln("issues starting the rpc server")
log.I.Ln("attempting a graceful shutdown")
rpc.Shutdown()
os.Exit(1)
case <-rpc.IsReady():
log.I.Ln("rpc server is ready!")
}
}
//
// P2P
//
var config = seed.DefaultConfig
config.SetNetwork(viper.GetString("network"))
if config.PrivKey, err = seed.GetOrGeneratePrivKey(viper.GetString("key")); check(err) {
return
}
@@ -124,8 +140,6 @@ var seedCmd = &cobra.Command{
var srv *seed.Server
log.I.Ln("running serve.")
if srv, err = seed.New(config); check(err) {
return
}

View File

@@ -68,7 +68,7 @@ func NewClient(config *ClientConfig) (*RPCClient, error) {
deviceConf := "" +
"public_key=" + config.Peer.PublicKey.HexString() + "\n" +
"endpoint=0.0.0.0:18222" + "\n" +
"allowed_ip=" + DefaultIPAddress.String() + "/32\n" +
"allowed_ip=" + deviceIP.String() + "/32\n" +
"persistent_keepalive_interval=" + strconv.Itoa(int(config.Peer.KeepAliveInterval)) + "\n"
if err = r.device.IpcSet(deviceConf); check(err) {

56
pkg/rpc/config.go Normal file
View File

@@ -0,0 +1,56 @@
package rpc
import (
"github.com/multiformats/go-multiaddr"
"math/rand"
"time"
)
type rpcConfig struct {
Key *RPCPrivateKey
ListenPort uint16
Peer_Whitelist []RPCPublicKey
IP_Whitelist []multiaddr.Multiaddr
UnixPath string
}
func (c *rpcConfig) NewKey() {
var err error
if c.Key, err = NewPrivateKey(); check(err) {
panic(err)
}
}
func (c *rpcConfig) SetKey(key string) {
c.Key.Decode(key)
}
func (c *rpcConfig) IsNullKey() bool {
return c.Key.IsZero()
}
func (c *rpcConfig) SetPort(port uint16) {
c.ListenPort = port
}
func (c *rpcConfig) IsNullPort() bool {
return c.ListenPort == NullPort
}
func (c *rpcConfig) SetRandomPort() uint16 {
rand.Seed(time.Now().Unix())
c.ListenPort = uint16(rand.Intn(45534) + 10000)
return c.ListenPort
}
func (c *rpcConfig) SetUnixPath(path string) {
c.UnixPath = path
}
func (conf *rpcConfig) IsEnabled() bool {
return !conf.Key.IsZero()
}

View File

@@ -20,18 +20,21 @@ type (
)
var (
DefaultRPCPrivateKey RPCPrivateKey
DefaultRPCPublicKey RPCPublicKey
nullRPCPrivateKey RPCPrivateKey
DefaultRPCPublicKey RPCPublicKey
)
func NewPrivateKey() (sk RPCPrivateKey, err error) {
func NewPrivateKey() (*RPCPrivateKey, error) {
var err error
var sk RPCPrivateKey
_, err = rand.Read(sk[:])
sk[0] &= 248
sk[31] = (sk[31] & 127) | 64
return
return &sk, err
}
func (key RPCPrivateKey) Equals(tar RPCPrivateKey) bool {

View File

@@ -1,6 +1,7 @@
package rpc
import (
"context"
"git-indra.lan/indra-labs/indra"
log2 "git-indra.lan/indra-labs/indra/pkg/proc/log"
"github.com/multiformats/go-multiaddr"
@@ -8,78 +9,70 @@ import (
"golang.zx2c4.com/wireguard/device"
"golang.zx2c4.com/wireguard/tun"
"golang.zx2c4.com/wireguard/tun/netstack"
"gvisor.dev/gvisor/pkg/tcpip/adapters/gonet"
"net"
"net/netip"
"net/rpc"
"os"
"strconv"
)
const NullPort = 0
var (
log = log2.GetLogger(indra.PathBase)
check = log.E.Chk
)
var (
DefaultIPAddress = netip.MustParseAddr("127.0.37.1")
config = rpcConfig{
Key: &nullRPCPrivateKey,
ListenPort: NullPort,
Peer_Whitelist: []RPCPublicKey{},
IP_Whitelist: []multiaddr.Multiaddr{},
}
)
type RPCConfig struct {
Key *RPCPrivateKey
ListenPort uint16
Peer_Whitelist []RPCPublicKey
IP_Whitelist []multiaddr.Multiaddr
var (
isReady = make(chan bool)
startupErrors = make(chan error)
)
func IsReady() chan bool {
return isReady
}
func (conf *RPCConfig) IsEnabled() bool {
return !conf.Key.IsZero()
func CantStart() chan error {
return startupErrors
}
type RPC struct {
device *device.Device
network *netstack.Net
tunnel tun.Device
}
var (
deviceIP netip.Addr = netip.MustParseAddr("127.0.37.1")
devicePort int = 0
deviceMTU int = 1420
)
func (r *RPC) Start() error {
var (
dev *device.Device
network *netstack.Net
tunnel tun.Device
unixSock net.Listener
tcpSock net.Listener
)
log.I.Ln("starting rpc server")
r.device.Up()
func Start(ctx context.Context) {
var err error
var listener *gonet.TCPListener
var config = config
if listener, err = r.network.ListenTCP(&net.TCPAddr{Port: 80}); check(err) {
return err
// Initializing the tunnel
if tunnel, network, err = netstack.CreateNetTUN([]netip.Addr{deviceIP}, []netip.Addr{}, deviceMTU); check(err) {
startupErrors <- err
return
}
rpc.HandleHTTP()
dev = device.NewDevice(tunnel, conn.NewDefaultBind(), device.NewLogger(device.LogLevelError, "server "))
go rpc.Accept(listener)
return nil
}
func (rpc *RPC) Stop() {
rpc.device.Close()
}
func New(config *RPCConfig) (*RPC, error) {
var err error
var r RPC
if r.tunnel, r.network, err = netstack.CreateNetTUN([]netip.Addr{DefaultIPAddress}, []netip.Addr{}, 1420); check(err) {
return nil, err
}
r.device = device.NewDevice(r.tunnel, conn.NewDefaultBind(), device.NewLogger(device.LogLevelError, "server "))
r.device.SetPrivateKey(config.Key.AsDeviceKey())
r.device.IpcSet("listen_port=" + strconv.Itoa(int(config.ListenPort)))
dev.SetPrivateKey(config.Key.AsDeviceKey())
dev.IpcSet("listen_port=" + strconv.Itoa(int(config.ListenPort)))
for _, peer_whitelist := range config.Peer_Whitelist {
@@ -87,10 +80,51 @@ func New(config *RPCConfig) (*RPC, error) {
"public_key=" + peer_whitelist.HexString() + "\n" +
"allowed_ip=" + "127.0.37.2" + "/32\n"
if err = r.device.IpcSet(deviceConf); check(err) {
return nil, err
if err = dev.IpcSet(deviceConf); check(err) {
startupErrors <- err
return
}
}
return &r, nil
if err = dev.Up(); check(err) {
startupErrors <- err
return
}
if unixSock, err = net.Listen("unix", config.UnixPath); check(err) {
startupErrors <- err
return
}
if tcpSock, err = network.ListenTCP(&net.TCPAddr{Port: devicePort}); check(err) {
startupErrors <- err
return
}
isReady <- true
select {
case <-ctx.Done():
Shutdown()
}
}
func Shutdown() {
log.I.Ln("shutting down rpc server")
if unixSock != nil {
unixSock.Close()
os.Remove(config.UnixPath)
}
if tcpSock != nil {
tcpSock.Close()
}
if dev != nil {
dev.Close()
}
}

50
pkg/rpc/viper.go Normal file
View File

@@ -0,0 +1,50 @@
package rpc
import (
"github.com/multiformats/go-multiaddr"
"github.com/spf13/viper"
"strconv"
)
func ConfigureWithViper() (err error) {
log.I.Ln("initializing the rpc server")
config.SetKey(viper.GetString("rpc-key"))
if config.IsNullKey() {
log.I.Ln("rpc key not provided, generating a new one.")
config.NewKey()
}
log.I.Ln("rpc public key:")
log.I.Ln("-", config.Key.PubKey().Encode())
config.SetUnixPath(viper.GetString("rpc-listen-unix"))
config.SetPort(viper.GetUint16("rpc-listen-port"))
if viper.GetUint16("rpc-listen-port") == NullPort {
viper.Set("rpc-listen-port", config.SetRandomPort())
}
log.I.Ln("rpc listeners:")
log.I.Ln("- [/ip4/0.0.0.0/udp/"+strconv.Itoa(int(config.ListenPort)), "/ip6/:::/udp/"+strconv.Itoa(int(config.ListenPort))+" /unix"+config.UnixPath+"]")
for _, ip := range viper.GetStringSlice("rpc-whitelist-deviceIP") {
config.IP_Whitelist = append(config.IP_Whitelist, multiaddr.StringCast(ip))
}
for _, peer := range viper.GetStringSlice("rpc-whitelist-peer") {
var pubKey RPCPublicKey
pubKey.Decode(peer)
config.Peer_Whitelist = append(config.Peer_Whitelist, pubKey)
}
return
}

View File

@@ -2,7 +2,6 @@ package seed
import (
"git-indra.lan/indra-labs/indra/pkg/cfg"
"git-indra.lan/indra-labs/indra/pkg/rpc"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/multiformats/go-multiaddr"
)
@@ -22,12 +21,6 @@ var DefaultConfig = &Config{
ListenAddresses: []multiaddr.Multiaddr{},
SeedAddresses: []multiaddr.Multiaddr{},
ConnectAddresses: []multiaddr.Multiaddr{},
RPCConfig: &rpc.RPCConfig{
Key: &rpc.DefaultRPCPrivateKey,
ListenPort: 0,
Peer_Whitelist: []rpc.RPCPublicKey{},
IP_Whitelist: []multiaddr.Multiaddr{},
},
}
type Config struct {
@@ -39,6 +32,9 @@ type Config struct {
ListenAddresses []multiaddr.Multiaddr
Params *cfg.Params
RPCConfig *rpc.RPCConfig
}
func (c *Config) SetNetwork(network string) {
c.Params = cfg.SelectNetworkParams(network)
}

View File

@@ -2,8 +2,6 @@ package seed
import (
"context"
"git-indra.lan/indra-labs/indra/pkg/rpc"
"strconv"
"time"
"github.com/libp2p/go-libp2p"
@@ -32,8 +30,6 @@ type Server struct {
config *Config
host host.Host
rpc *rpc.RPC
}
func (srv *Server) Restart() (err error) {
@@ -47,22 +43,18 @@ func (srv *Server) Shutdown() (err error) {
log.I.Ln("shutting down [p2p.host]")
if srv.host.Close(); check(err) {
return
if err = srv.host.Close(); check(err) {
// continue
}
log.I.Ln("shutdown complete")
return nil
return
}
func (srv *Server) Serve() (err error) {
log.I.Ln("starting the server")
if srv.config.RPCConfig.IsEnabled() {
srv.rpc.Start()
}
log.I.Ln("starting the p2p server")
// Here we create a context with cancel and add it to the interrupt handler
var ctx context.Context
@@ -87,7 +79,7 @@ func (srv *Server) Serve() (err error) {
case <-ctx.Done():
log.I.Ln("shutting down server")
log.I.Ln("shutting down p2p server")
srv.Shutdown()
}
@@ -97,28 +89,13 @@ func (srv *Server) Serve() (err error) {
func New(config *Config) (*Server, error) {
log.I.Ln("initializing the server")
log.I.Ln("initializing the p2p server")
var err error
var s Server
s.config = config
if config.RPCConfig.IsEnabled() {
log.I.Ln("enabling rpc server")
if s.rpc, err = rpc.New(config.RPCConfig); check(err) {
return nil, err
}
log.I.Ln("rpc public key:")
log.I.Ln("-", config.RPCConfig.Key.PubKey().Encode())
log.I.Ln("rpc listeners:")
log.I.Ln("- [/ip4/0.0.0.0/udp/"+strconv.Itoa(int(config.RPCConfig.ListenPort)), "/ip6/:::/udp/"+strconv.Itoa(int(config.RPCConfig.ListenPort))+"]")
}
//var client *rpc.RPCClient
//
//if client, err = rpc.NewClient(rpc.DefaultClientConfig); check(err) {