From 1703cf3a791093bd7f1d70b0233e1f73a85014d5 Mon Sep 17 00:00:00 2001 From: greg stone Date: Mon, 27 Feb 2023 10:45:29 +0000 Subject: [PATCH] migrating p2p to its own package. --- cmd/indra/seed_serve.go | 120 ++----------------- pkg/{seed => p2p}/config.go | 2 +- pkg/p2p/configure.go | 57 +++++++++ pkg/p2p/flags.go | 53 ++++++++ pkg/p2p/log.go | 11 ++ pkg/{seed => p2p}/metrics/host.go | 0 pkg/{seed => p2p}/server.go | 40 +++---- pkg/{seed/utils.go => p2p/util.go} | 42 +------ pkg/{seed/utils_test.go => p2p/util_test.go} | 2 +- pkg/rpc/server.go | 2 - pkg/seed/flags.go | 25 ---- pkg/seed/log.go | 11 ++ pkg/seed/proc.go | 80 +++++++++++++ pkg/storage/cmd.go | 61 +++------- pkg/storage/service.go | 86 ++++++------- 15 files changed, 301 insertions(+), 291 deletions(-) rename pkg/{seed => p2p}/config.go (98%) create mode 100644 pkg/p2p/configure.go create mode 100644 pkg/p2p/flags.go create mode 100644 pkg/p2p/log.go rename pkg/{seed => p2p}/metrics/host.go (100%) rename pkg/{seed => p2p}/server.go (83%) rename pkg/{seed/utils.go => p2p/util.go} (57%) rename pkg/{seed/utils_test.go => p2p/util_test.go} (98%) delete mode 100644 pkg/seed/flags.go create mode 100644 pkg/seed/log.go create mode 100644 pkg/seed/proc.go diff --git a/cmd/indra/seed_serve.go b/cmd/indra/seed_serve.go index ec35d176..e1245528 100644 --- a/cmd/indra/seed_serve.go +++ b/cmd/indra/seed_serve.go @@ -4,28 +4,19 @@ import ( "context" "git-indra.lan/indra-labs/indra" "git-indra.lan/indra-labs/indra/pkg/interrupt" + "git-indra.lan/indra-labs/indra/pkg/p2p" log2 "git-indra.lan/indra-labs/indra/pkg/proc/log" "git-indra.lan/indra-labs/indra/pkg/rpc" "git-indra.lan/indra-labs/indra/pkg/seed" "git-indra.lan/indra-labs/indra/pkg/storage" - "github.com/multiformats/go-multiaddr" "github.com/spf13/cobra" "github.com/spf13/viper" - "os" -) - -var ( - err error ) func init() { storage.InitFlags(seedServeCommand) - - // Init flags belonging to the seed package - seed.InitFlags(seedServeCommand) - - // Init flags belonging to the rpc package + p2p.InitFlags(seedServeCommand) rpc.InitFlags(seedServeCommand) seedCommand.AddCommand(seedServeCommand) @@ -39,116 +30,19 @@ var seedServeCommand = &cobra.Command{ log.I.Ln("-- ", log2.App, "("+viper.GetString("network")+") -", indra.SemVer, "- Network Freedom. --") - log.I.Ln("running seed") - - var ctx context.Context - var cancel context.CancelFunc - - ctx, cancel = context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) interrupt.AddHandler(cancel) - // - // Storage - // + // Seed // - go storage.Run(ctx) + go seed.Run(ctx) select { - case err = <-storage.CantStart(): - - log.E.Ln("storage:", err) - log.I.Ln("shutting down") - - os.Exit(0) - - case <-storage.IsReady(): - - log.I.Ln("storage is ready") - - case <-ctx.Done(): - - log.I.Ln("shutting down") - - os.Exit(0) - } - - // - //// - //// RPC - //// - // - //rpc.RunWith(ctx, func(srv *grpc.Server) { - // chat.RegisterChatServiceServer(srv, &chat.Server{}) - //}) - // - //select { - //case <-rpc.CantStart(): - // - // log.I.Ln("issues starting the rpc server") - // log.I.Ln("attempting a graceful shutdown") - // - // ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) - // - // rpc.Shutdown(ctx) - // - // select { - // case <-ctx.Done(): - // - // log.I.Ln("can't shutdown gracefully, exiting.") - // - // os.Exit(1) - // - // default: - // - // log.I.Ln("graceful shutdown complete") - // - // os.Exit(0) - // } - // - //case <-rpc.IsReady(): - // - // log.I.Ln("rpc server is ready") - //} - // - //examples.TunnelHello(ctx) - - // - // P2P - // - - var config = seed.DefaultConfig - - config.SetNetwork(viper.GetString("network")) - - if config.PrivKey, err = seed.GetOrGeneratePrivKey(viper.GetString("key")); check(err) { - return - } - - for _, listener := range viper.GetStringSlice("listen") { - config.ListenAddresses = append(config.ListenAddresses, multiaddr.StringCast(listener)) - } - - for _, seed := range viper.GetStringSlice("seed") { - config.SeedAddresses = append(config.SeedAddresses, multiaddr.StringCast(seed)) - } - - for _, connector := range viper.GetStringSlice("connect") { - config.ConnectAddresses = append(config.ConnectAddresses, multiaddr.StringCast(connector)) - } - - var srv *seed.Server - - if srv, err = seed.New(config); check(err) { - return - } - - if err = srv.Serve(); check(err) { - return + case <-seed.IsShutdown(): + log.I.Ln("shutdown complete") } log.I.Ln("-- fin --") - - return }, } diff --git a/pkg/seed/config.go b/pkg/p2p/config.go similarity index 98% rename from pkg/seed/config.go rename to pkg/p2p/config.go index ce22c11e..dc40d585 100644 --- a/pkg/seed/config.go +++ b/pkg/p2p/config.go @@ -1,4 +1,4 @@ -package seed +package p2p import ( "git-indra.lan/indra-labs/indra/pkg/cfg" diff --git a/pkg/p2p/configure.go b/pkg/p2p/configure.go new file mode 100644 index 00000000..9ea07312 --- /dev/null +++ b/pkg/p2p/configure.go @@ -0,0 +1,57 @@ +package p2p + +import ( + "github.com/multiformats/go-multiaddr" + "github.com/spf13/viper" +) + +func configure() { + configureSeeds() +} + +func configureKey() { + + if viper.GetString(keyFlag) == "" { + + } + +} + +func configureListeners() { + + if len(viper.GetString(listenFlag)) > 0 { + + } + +} + +func configureSeeds() { + + if len(viper.GetStringSlice(connectFlag)) > 0 { + + log.I.Ln("connect only detected, using only the connect seed addresses") + + for _, connector := range viper.GetStringSlice(connectFlag) { + seedAddresses = append(seedAddresses, multiaddr.StringCast(connector)) + } + + return + } + + var err error + + if seedAddresses, err = netParams.ParseSeedMultiAddresses(); err != nil { + return + } + + if len(viper.GetStringSlice("seed")) > 0 { + + log.I.Ln("found", len(viper.GetStringSlice("seed")), "additional seeds.") + + for _, seed := range viper.GetStringSlice("seed") { + seedAddresses = append(seedAddresses, multiaddr.StringCast(seed)) + } + } + + return +} diff --git a/pkg/p2p/flags.go b/pkg/p2p/flags.go new file mode 100644 index 00000000..86eb1125 --- /dev/null +++ b/pkg/p2p/flags.go @@ -0,0 +1,53 @@ +package p2p + +import ( + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +var ( + keyFlag = "p2p-key" + listenFlag = "p2p-listen" + seedFlag = "p2p-seed" + connectFlag = "p2p-connect" +) + +var ( + key string + listeners []string + seeds []string + connectors []string +) + +func InitFlags(cmd *cobra.Command) { + + cmd.Flags().StringVarP(&key, keyFlag, "", + "", + "the base58 encoded private key for the seed node") + + viper.BindPFlag(keyFlag, cmd.Flags().Lookup(keyFlag)) + + cmd.PersistentFlags().StringSliceVarP(&listeners, listenFlag, "", + []string{ + "/ip4/127.0.0.1/tcp/8337", + "/ip6/::1/tcp/8337", + }, + "binds to an interface", + ) + + viper.BindPFlag(listenFlag, cmd.PersistentFlags().Lookup(listenFlag)) + + cmd.PersistentFlags().StringSliceVarP(&seeds, seedFlag, "", + []string{}, + "adds an additional seed connection (e.g /dns4/seed0.indra.org/tcp/8337/p2p/)", + ) + + viper.BindPFlag(seedFlag, cmd.PersistentFlags().Lookup(seedFlag)) + + cmd.PersistentFlags().StringSliceVarP(&connectors, connectFlag, "", + []string{}, + "connects only to the seed multi-addresses specified", + ) + + viper.BindPFlag(connectFlag, cmd.PersistentFlags().Lookup(connectFlag)) +} diff --git a/pkg/p2p/log.go b/pkg/p2p/log.go new file mode 100644 index 00000000..23441ecf --- /dev/null +++ b/pkg/p2p/log.go @@ -0,0 +1,11 @@ +package p2p + +import ( + "git-indra.lan/indra-labs/indra" + log2 "git-indra.lan/indra-labs/indra/pkg/proc/log" +) + +var ( + log = log2.GetLogger(indra.PathBase) + check = log.E.Chk +) diff --git a/pkg/seed/metrics/host.go b/pkg/p2p/metrics/host.go similarity index 100% rename from pkg/seed/metrics/host.go rename to pkg/p2p/metrics/host.go diff --git a/pkg/seed/server.go b/pkg/p2p/server.go similarity index 83% rename from pkg/seed/server.go rename to pkg/p2p/server.go index 9f12fcb1..bf87ec21 100644 --- a/pkg/seed/server.go +++ b/pkg/p2p/server.go @@ -1,7 +1,10 @@ -package seed +package p2p import ( "context" + "git-indra.lan/indra-labs/indra/pkg/cfg" + "git-indra.lan/indra-labs/indra/pkg/p2p/metrics" + "github.com/libp2p/go-libp2p/core/crypto" "time" "github.com/libp2p/go-libp2p" @@ -11,19 +14,25 @@ import ( "git-indra.lan/indra-labs/indra" "git-indra.lan/indra-labs/indra/pkg/interrupt" "git-indra.lan/indra-labs/indra/pkg/p2p/introducer" - log2 "git-indra.lan/indra-labs/indra/pkg/proc/log" - "git-indra.lan/indra-labs/indra/pkg/seed/metrics" -) - -var ( - log = log2.GetLogger(indra.PathBase) - check = log.E.Chk ) var ( userAgent = "/indra:" + indra.SemVer + "/" ) +var ( + privKey crypto.PrivKey + p2pHost host.Host + seedAddresses []multiaddr.Multiaddr + listenAddresses []multiaddr.Multiaddr + netParams *cfg.Params +) + +func init() { + seedAddresses = []multiaddr.Multiaddr{} + listenAddresses = []multiaddr.Multiaddr{} +} + type Server struct { context.Context @@ -32,13 +41,6 @@ type Server struct { host host.Host } -func (srv *Server) Restart() (err error) { - - log.I.Ln("restarting the server.") - - return nil -} - func (srv *Server) Shutdown() (err error) { if err = srv.host.Close(); check(err) { @@ -73,14 +75,6 @@ func (srv *Server) Serve() (err error) { go metrics.HostStatus(ctx, srv.host) - //var client *rpc.RPCClient - // - //if client, err = rpc.NewClient(rpc.DefaultClientConfig); check(err) { - // return err - //} - // - //client.Start() - select { case <-ctx.Done(): diff --git a/pkg/seed/utils.go b/pkg/p2p/util.go similarity index 57% rename from pkg/seed/utils.go rename to pkg/p2p/util.go index 0c287984..1e0cb4dc 100644 --- a/pkg/seed/utils.go +++ b/pkg/p2p/util.go @@ -1,49 +1,11 @@ -package seed +package p2p import ( "github.com/btcsuite/btcd/btcutil/base58" - "github.com/btcsuite/btcd/btcutil/bech32" "github.com/libp2p/go-libp2p/core/crypto" "github.com/spf13/viper" ) -func bech32encode(key crypto.PrivKey) (keyStr string, err error) { - - var raw []byte - - if raw, err = key.Raw(); check(err) { - return - } - - var conv []byte - - if conv, err = bech32.ConvertBits(raw, 8, 5, true); check(err) { - return - } - - if keyStr, err = bech32.Encode("ind", conv); check(err) { - return - } - - return -} - -func bech32decode(keyStr string) (privKey crypto.PrivKey, err error) { - - // var hnd string - var key []byte - - if _, key, err = bech32.Decode(keyStr); check(err) { - return - } - - if privKey, err = crypto.UnmarshalSecp256k1PrivateKey(key); check(err) { - return - } - - return privKey, nil -} - func GeneratePrivKey() (privKey crypto.PrivKey) { var err error @@ -89,7 +51,7 @@ func GetOrGeneratePrivKey(key string) (privKey crypto.PrivKey, err error) { return } - viper.Set("key", key) + viper.Set(keyFlag, key) return } diff --git a/pkg/seed/utils_test.go b/pkg/p2p/util_test.go similarity index 98% rename from pkg/seed/utils_test.go rename to pkg/p2p/util_test.go index dca0b302..a2b77362 100644 --- a/pkg/seed/utils_test.go +++ b/pkg/p2p/util_test.go @@ -1,4 +1,4 @@ -package seed +package p2p import ( "crypto/rand" diff --git a/pkg/rpc/server.go b/pkg/rpc/server.go index 640de4e5..6809b6c3 100644 --- a/pkg/rpc/server.go +++ b/pkg/rpc/server.go @@ -59,8 +59,6 @@ func Start(ctx context.Context) { func Shutdown(ctx context.Context) { - defer ctx.Done() - log.I.Ln("shutting down rpc server") stopUnixSocket() diff --git a/pkg/seed/flags.go b/pkg/seed/flags.go deleted file mode 100644 index b476ebad..00000000 --- a/pkg/seed/flags.go +++ /dev/null @@ -1,25 +0,0 @@ -package seed - -import ( - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -var ( - key string - listeners []string - seeds []string - connectors []string -) - -func InitFlags(cmd *cobra.Command) { - cmd.PersistentFlags().StringVarP(&key, "key", "k", "", "the base58 encoded private key for the seed node") - cmd.PersistentFlags().StringSliceVarP(&listeners, "listen", "l", []string{"/ip4/127.0.0.1/tcp/8337", "/ip6/::1/tcp/8337"}, "binds to an interface") - cmd.PersistentFlags().StringSliceVarP(&seeds, "seed", "s", []string{}, "adds an additional seed connection (e.g /dns4/seed0.indra.org/tcp/8337/p2p/)") - cmd.PersistentFlags().StringSliceVarP(&connectors, "connect", "c", []string{}, "connects only to the seed multi-addresses specified") - - viper.BindPFlag("key", cmd.PersistentFlags().Lookup("key")) - viper.BindPFlag("listen", cmd.PersistentFlags().Lookup("listen")) - viper.BindPFlag("seed", cmd.PersistentFlags().Lookup("seed")) - viper.BindPFlag("connect", cmd.PersistentFlags().Lookup("connect")) -} diff --git a/pkg/seed/log.go b/pkg/seed/log.go new file mode 100644 index 00000000..9d23cb07 --- /dev/null +++ b/pkg/seed/log.go @@ -0,0 +1,11 @@ +package seed + +import ( + "git-indra.lan/indra-labs/indra" + log2 "git-indra.lan/indra-labs/indra/pkg/proc/log" +) + +var ( + log = log2.GetLogger(indra.PathBase) + check = log.E.Chk +) diff --git a/pkg/seed/proc.go b/pkg/seed/proc.go new file mode 100644 index 00000000..47cca06e --- /dev/null +++ b/pkg/seed/proc.go @@ -0,0 +1,80 @@ +package seed + +import ( + "context" + "git-indra.lan/indra-labs/indra/pkg/storage" + "sync" +) + +var ( + inUse sync.Mutex +) + +var ( + startupErrors = make(chan error, 32) + isReadyChan = make(chan bool, 1) + isShutdownChan = make(chan bool, 1) +) + +func CantStart() chan error { + return startupErrors +} + +func IsReady() chan bool { + return isReadyChan +} + +func IsShutdown() chan bool { + return isShutdownChan +} + +func Shutdown() { + + log.I.Ln("shutting down seed") + + err := storage.Shutdown() + check(err) + + isShutdownChan <- true +} + +func Run(ctx context.Context) { + + if !inUse.TryLock() { + log.E.Ln("seed is in use") + return + } + + log.I.Ln("running seed") + + var err error + + go storage.Run(ctx) + +signals: + for { + select { + case err = <-CantStart(): + log.E.Ln("startup error:", err) + return + case <-storage.IsLocked(): + + log.I.Ln("storage is locked, waiting for unlock") + + //go rpc.RunWith(ctx, func(srv *grpc.Server) { + // storage.RegisterUnlockServiceServer(srv, storage.NewUnlockService()) + //}) + + // Run RPC unlock + case <-storage.IsReady(): + break signals + case <-ctx.Done(): + Shutdown() + return + } + } + + log.I.Ln("seed is ready") + + isReadyChan <- true +} diff --git a/pkg/storage/cmd.go b/pkg/storage/cmd.go index 7b33b80b..a4876ba4 100644 --- a/pkg/storage/cmd.go +++ b/pkg/storage/cmd.go @@ -1,28 +1,25 @@ package storage import ( - "errors" - "fmt" "github.com/spf13/viper" - "golang.org/x/term" "os" "strings" - "syscall" ) var ( - isNewKey bool - isRPCUnlockable bool - key Key + isNewKey bool + isNewDB bool + isLocked bool + key Key ) func configure() { log.I.Ln("initializing storage") - configureKey() configureDirPath() configureFile() + configureKey() } func configureKey() { @@ -51,6 +48,8 @@ func configureKey() { return } + log.I.Ln("keyfile found") + if fileInfo.Mode() != 0600 { log.W.Ln("keyfile permissions are too open:", fileInfo.Mode()) log.W.Ln("It is recommended that you change them to 0600") @@ -68,34 +67,16 @@ func configureKey() { return } - log.I.Ln("no keyfile found, checking for rpc unlock") + if !isNewDB { - if viper.GetBool(storeKeyRPCFlag) { + log.I.Ln("no keyfile found") - log.I.Ln("attempting rpc unlock") - - isRPCUnlockable = true + isLocked = true return } - log.I.Ln("rpc unlock disabled, checking for a user prompt") - - if viper.GetBool(storeAskPassFlag) { - - log.I.Ln("prompting user for key") - - var password []byte - - fmt.Print("Enter Encryption Key: ") - password, err = term.ReadPassword(int(syscall.Stdin)) - - key.Decode(string(password)) - - return - } - - log.I.Ln("no prompt specified, generating a new key") + log.I.Ln("no keyfile found, generating a new key") isNewKey = true @@ -162,28 +143,18 @@ func configureFile() { log.I.Ln("using storage db path:") log.I.Ln("-", viper.GetString(storeFilePathFlag)) + log.I.Ln("checking if database exists") + var err error if _, err = os.Stat(viper.GetString(storeFilePathFlag)); err != nil { - log.I.Ln("none found, creating a new one") + log.I.Ln("no database found, creating a new one") - //file, err := os.OpenFile(viper.GetString(storeFilePathFlag), os.O_WRONLY, 0666) - // - //if err != nil && os.IsPermission(err) { - // startupErrors <- err - // return - //} - // - //file.Close() - // - //os.Remove(viper.GetString(storeFilePathFlag)) + isNewDB = true return } - if isNewKey { - startupErrors <- errors.New("new key generated for an existing database. check your configuration.") - return - } + log.I.Ln("database found") } diff --git a/pkg/storage/service.go b/pkg/storage/service.go index ba61ded2..75d00946 100644 --- a/pkg/storage/service.go +++ b/pkg/storage/service.go @@ -2,10 +2,9 @@ package storage import ( "context" - "git-indra.lan/indra-labs/indra/pkg/rpc" + "git-indra.lan/indra-labs/indra/pkg/interrupt" "github.com/dgraph-io/badger/v3" "github.com/spf13/viper" - "google.golang.org/grpc" "sync" ) @@ -17,15 +16,20 @@ var ( db *badger.DB opts badger.Options startupErrors = make(chan error, 128) - isReady = make(chan bool, 1) + isLockedChan = make(chan bool, 1) + isReadyChan = make(chan bool, 1) ) func CantStart() chan error { return startupErrors } +func IsLocked() chan bool { + return isLockedChan +} + func IsReady() chan bool { - return isReady + return isReadyChan } func Shutdown() (err error) { @@ -40,7 +44,7 @@ func Shutdown() (err error) { return } - log.I.Ln("storage shutdown complete") + log.I.Ln("storage shutdown completed") return } @@ -56,6 +60,23 @@ var ( running sync.Mutex ) +func open() { + + var err error + + opts.EncryptionKey = key.Bytes() + + if db, err = badger.Open(opts); check(err) { + startupErrors <- err + return + } + + log.I.Ln("successfully opened database") + log.I.Ln("storage is ready") + + isReadyChan <- true +} + func Run(ctx context.Context) { if !running.TryLock() { @@ -68,50 +89,33 @@ func Run(ctx context.Context) { opts.IndexCacheSize = 128 << 20 opts.Logger = nil - if isRPCUnlockable { + if !isLocked { - var unlock = NewUnlockService() + log.I.Ln("attempting to open database with key") - go rpc.RunWith(ctx, func(srv *grpc.Server) { - RegisterUnlockServiceServer(srv, unlock) - }) + open() + } - for { - select { - case <-rpc.IsReady(): + isLockedChan <- true - log.I.Ln("waiting for unlock") + lockedCtx, cancel := context.WithCancel(context.Background()) - case <-unlock.IsSuccessful(): + interrupt.AddHandler(cancel) - log.I.Ln("storage successfully unlocked") + for { + select { + case <-IsReady(): + log.I.Ln("storage is ready") - isReady <- true + //case <-unlock.IsSuccessful(): + // + // log.I.Ln("storage successfully unlocked") + // + // isReadyChan <- true - case <-ctx.Done(): - Shutdown() - return - } + case <-lockedCtx.Done(): + Shutdown() + return } } - - var err error - - opts.EncryptionKey = key.Bytes() - - if db, err = badger.Open(opts); check(err) { - startupErrors <- err - return - } - - log.I.Ln("running storage") - - isReady <- true - - select { - case <-ctx.Done(): - Shutdown() - } - - return }