migrating p2p to its own package.

This commit is contained in:
greg stone
2023-02-27 10:45:29 +00:00
parent 5eb60fc94f
commit 1703cf3a79
15 changed files with 301 additions and 291 deletions

View File

@@ -4,28 +4,19 @@ import (
"context"
"git-indra.lan/indra-labs/indra"
"git-indra.lan/indra-labs/indra/pkg/interrupt"
"git-indra.lan/indra-labs/indra/pkg/p2p"
log2 "git-indra.lan/indra-labs/indra/pkg/proc/log"
"git-indra.lan/indra-labs/indra/pkg/rpc"
"git-indra.lan/indra-labs/indra/pkg/seed"
"git-indra.lan/indra-labs/indra/pkg/storage"
"github.com/multiformats/go-multiaddr"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"os"
)
var (
err error
)
func init() {
storage.InitFlags(seedServeCommand)
// Init flags belonging to the seed package
seed.InitFlags(seedServeCommand)
// Init flags belonging to the rpc package
p2p.InitFlags(seedServeCommand)
rpc.InitFlags(seedServeCommand)
seedCommand.AddCommand(seedServeCommand)
@@ -39,116 +30,19 @@ var seedServeCommand = &cobra.Command{
log.I.Ln("-- ", log2.App, "("+viper.GetString("network")+") -", indra.SemVer, "- Network Freedom. --")
log.I.Ln("running seed")
var ctx context.Context
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
interrupt.AddHandler(cancel)
//
// Storage
//
// Seed //
go storage.Run(ctx)
go seed.Run(ctx)
select {
case err = <-storage.CantStart():
log.E.Ln("storage:", err)
log.I.Ln("shutting down")
os.Exit(0)
case <-storage.IsReady():
log.I.Ln("storage is ready")
case <-ctx.Done():
log.I.Ln("shutting down")
os.Exit(0)
}
//
////
//// RPC
////
//
//rpc.RunWith(ctx, func(srv *grpc.Server) {
// chat.RegisterChatServiceServer(srv, &chat.Server{})
//})
//
//select {
//case <-rpc.CantStart():
//
// log.I.Ln("issues starting the rpc server")
// log.I.Ln("attempting a graceful shutdown")
//
// ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
//
// rpc.Shutdown(ctx)
//
// select {
// case <-ctx.Done():
//
// log.I.Ln("can't shutdown gracefully, exiting.")
//
// os.Exit(1)
//
// default:
//
// log.I.Ln("graceful shutdown complete")
//
// os.Exit(0)
// }
//
//case <-rpc.IsReady():
//
// log.I.Ln("rpc server is ready")
//}
//
//examples.TunnelHello(ctx)
//
// P2P
//
var config = seed.DefaultConfig
config.SetNetwork(viper.GetString("network"))
if config.PrivKey, err = seed.GetOrGeneratePrivKey(viper.GetString("key")); check(err) {
return
}
for _, listener := range viper.GetStringSlice("listen") {
config.ListenAddresses = append(config.ListenAddresses, multiaddr.StringCast(listener))
}
for _, seed := range viper.GetStringSlice("seed") {
config.SeedAddresses = append(config.SeedAddresses, multiaddr.StringCast(seed))
}
for _, connector := range viper.GetStringSlice("connect") {
config.ConnectAddresses = append(config.ConnectAddresses, multiaddr.StringCast(connector))
}
var srv *seed.Server
if srv, err = seed.New(config); check(err) {
return
}
if err = srv.Serve(); check(err) {
return
case <-seed.IsShutdown():
log.I.Ln("shutdown complete")
}
log.I.Ln("-- fin --")
return
},
}

View File

@@ -1,4 +1,4 @@
package seed
package p2p
import (
"git-indra.lan/indra-labs/indra/pkg/cfg"

57
pkg/p2p/configure.go Normal file
View File

@@ -0,0 +1,57 @@
package p2p
import (
"github.com/multiformats/go-multiaddr"
"github.com/spf13/viper"
)
func configure() {
configureSeeds()
}
func configureKey() {
if viper.GetString(keyFlag) == "" {
}
}
func configureListeners() {
if len(viper.GetString(listenFlag)) > 0 {
}
}
func configureSeeds() {
if len(viper.GetStringSlice(connectFlag)) > 0 {
log.I.Ln("connect only detected, using only the connect seed addresses")
for _, connector := range viper.GetStringSlice(connectFlag) {
seedAddresses = append(seedAddresses, multiaddr.StringCast(connector))
}
return
}
var err error
if seedAddresses, err = netParams.ParseSeedMultiAddresses(); err != nil {
return
}
if len(viper.GetStringSlice("seed")) > 0 {
log.I.Ln("found", len(viper.GetStringSlice("seed")), "additional seeds.")
for _, seed := range viper.GetStringSlice("seed") {
seedAddresses = append(seedAddresses, multiaddr.StringCast(seed))
}
}
return
}

53
pkg/p2p/flags.go Normal file
View File

@@ -0,0 +1,53 @@
package p2p
import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var (
keyFlag = "p2p-key"
listenFlag = "p2p-listen"
seedFlag = "p2p-seed"
connectFlag = "p2p-connect"
)
var (
key string
listeners []string
seeds []string
connectors []string
)
func InitFlags(cmd *cobra.Command) {
cmd.Flags().StringVarP(&key, keyFlag, "",
"",
"the base58 encoded private key for the seed node")
viper.BindPFlag(keyFlag, cmd.Flags().Lookup(keyFlag))
cmd.PersistentFlags().StringSliceVarP(&listeners, listenFlag, "",
[]string{
"/ip4/127.0.0.1/tcp/8337",
"/ip6/::1/tcp/8337",
},
"binds to an interface",
)
viper.BindPFlag(listenFlag, cmd.PersistentFlags().Lookup(listenFlag))
cmd.PersistentFlags().StringSliceVarP(&seeds, seedFlag, "",
[]string{},
"adds an additional seed connection (e.g /dns4/seed0.indra.org/tcp/8337/p2p/<pub_key>)",
)
viper.BindPFlag(seedFlag, cmd.PersistentFlags().Lookup(seedFlag))
cmd.PersistentFlags().StringSliceVarP(&connectors, connectFlag, "",
[]string{},
"connects only to the seed multi-addresses specified",
)
viper.BindPFlag(connectFlag, cmd.PersistentFlags().Lookup(connectFlag))
}

11
pkg/p2p/log.go Normal file
View File

@@ -0,0 +1,11 @@
package p2p
import (
"git-indra.lan/indra-labs/indra"
log2 "git-indra.lan/indra-labs/indra/pkg/proc/log"
)
var (
log = log2.GetLogger(indra.PathBase)
check = log.E.Chk
)

View File

@@ -1,7 +1,10 @@
package seed
package p2p
import (
"context"
"git-indra.lan/indra-labs/indra/pkg/cfg"
"git-indra.lan/indra-labs/indra/pkg/p2p/metrics"
"github.com/libp2p/go-libp2p/core/crypto"
"time"
"github.com/libp2p/go-libp2p"
@@ -11,19 +14,25 @@ import (
"git-indra.lan/indra-labs/indra"
"git-indra.lan/indra-labs/indra/pkg/interrupt"
"git-indra.lan/indra-labs/indra/pkg/p2p/introducer"
log2 "git-indra.lan/indra-labs/indra/pkg/proc/log"
"git-indra.lan/indra-labs/indra/pkg/seed/metrics"
)
var (
log = log2.GetLogger(indra.PathBase)
check = log.E.Chk
)
var (
userAgent = "/indra:" + indra.SemVer + "/"
)
var (
privKey crypto.PrivKey
p2pHost host.Host
seedAddresses []multiaddr.Multiaddr
listenAddresses []multiaddr.Multiaddr
netParams *cfg.Params
)
func init() {
seedAddresses = []multiaddr.Multiaddr{}
listenAddresses = []multiaddr.Multiaddr{}
}
type Server struct {
context.Context
@@ -32,13 +41,6 @@ type Server struct {
host host.Host
}
func (srv *Server) Restart() (err error) {
log.I.Ln("restarting the server.")
return nil
}
func (srv *Server) Shutdown() (err error) {
if err = srv.host.Close(); check(err) {
@@ -73,14 +75,6 @@ func (srv *Server) Serve() (err error) {
go metrics.HostStatus(ctx, srv.host)
//var client *rpc.RPCClient
//
//if client, err = rpc.NewClient(rpc.DefaultClientConfig); check(err) {
// return err
//}
//
//client.Start()
select {
case <-ctx.Done():

View File

@@ -1,49 +1,11 @@
package seed
package p2p
import (
"github.com/btcsuite/btcd/btcutil/base58"
"github.com/btcsuite/btcd/btcutil/bech32"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/spf13/viper"
)
func bech32encode(key crypto.PrivKey) (keyStr string, err error) {
var raw []byte
if raw, err = key.Raw(); check(err) {
return
}
var conv []byte
if conv, err = bech32.ConvertBits(raw, 8, 5, true); check(err) {
return
}
if keyStr, err = bech32.Encode("ind", conv); check(err) {
return
}
return
}
func bech32decode(keyStr string) (privKey crypto.PrivKey, err error) {
// var hnd string
var key []byte
if _, key, err = bech32.Decode(keyStr); check(err) {
return
}
if privKey, err = crypto.UnmarshalSecp256k1PrivateKey(key); check(err) {
return
}
return privKey, nil
}
func GeneratePrivKey() (privKey crypto.PrivKey) {
var err error
@@ -89,7 +51,7 @@ func GetOrGeneratePrivKey(key string) (privKey crypto.PrivKey, err error) {
return
}
viper.Set("key", key)
viper.Set(keyFlag, key)
return
}

View File

@@ -1,4 +1,4 @@
package seed
package p2p
import (
"crypto/rand"

View File

@@ -59,8 +59,6 @@ func Start(ctx context.Context) {
func Shutdown(ctx context.Context) {
defer ctx.Done()
log.I.Ln("shutting down rpc server")
stopUnixSocket()

View File

@@ -1,25 +0,0 @@
package seed
import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var (
key string
listeners []string
seeds []string
connectors []string
)
func InitFlags(cmd *cobra.Command) {
cmd.PersistentFlags().StringVarP(&key, "key", "k", "", "the base58 encoded private key for the seed node")
cmd.PersistentFlags().StringSliceVarP(&listeners, "listen", "l", []string{"/ip4/127.0.0.1/tcp/8337", "/ip6/::1/tcp/8337"}, "binds to an interface")
cmd.PersistentFlags().StringSliceVarP(&seeds, "seed", "s", []string{}, "adds an additional seed connection (e.g /dns4/seed0.indra.org/tcp/8337/p2p/<pub_key>)")
cmd.PersistentFlags().StringSliceVarP(&connectors, "connect", "c", []string{}, "connects only to the seed multi-addresses specified")
viper.BindPFlag("key", cmd.PersistentFlags().Lookup("key"))
viper.BindPFlag("listen", cmd.PersistentFlags().Lookup("listen"))
viper.BindPFlag("seed", cmd.PersistentFlags().Lookup("seed"))
viper.BindPFlag("connect", cmd.PersistentFlags().Lookup("connect"))
}

11
pkg/seed/log.go Normal file
View File

@@ -0,0 +1,11 @@
package seed
import (
"git-indra.lan/indra-labs/indra"
log2 "git-indra.lan/indra-labs/indra/pkg/proc/log"
)
var (
log = log2.GetLogger(indra.PathBase)
check = log.E.Chk
)

80
pkg/seed/proc.go Normal file
View File

@@ -0,0 +1,80 @@
package seed
import (
"context"
"git-indra.lan/indra-labs/indra/pkg/storage"
"sync"
)
var (
inUse sync.Mutex
)
var (
startupErrors = make(chan error, 32)
isReadyChan = make(chan bool, 1)
isShutdownChan = make(chan bool, 1)
)
func CantStart() chan error {
return startupErrors
}
func IsReady() chan bool {
return isReadyChan
}
func IsShutdown() chan bool {
return isShutdownChan
}
func Shutdown() {
log.I.Ln("shutting down seed")
err := storage.Shutdown()
check(err)
isShutdownChan <- true
}
func Run(ctx context.Context) {
if !inUse.TryLock() {
log.E.Ln("seed is in use")
return
}
log.I.Ln("running seed")
var err error
go storage.Run(ctx)
signals:
for {
select {
case err = <-CantStart():
log.E.Ln("startup error:", err)
return
case <-storage.IsLocked():
log.I.Ln("storage is locked, waiting for unlock")
//go rpc.RunWith(ctx, func(srv *grpc.Server) {
// storage.RegisterUnlockServiceServer(srv, storage.NewUnlockService())
//})
// Run RPC unlock
case <-storage.IsReady():
break signals
case <-ctx.Done():
Shutdown()
return
}
}
log.I.Ln("seed is ready")
isReadyChan <- true
}

View File

@@ -1,28 +1,25 @@
package storage
import (
"errors"
"fmt"
"github.com/spf13/viper"
"golang.org/x/term"
"os"
"strings"
"syscall"
)
var (
isNewKey bool
isRPCUnlockable bool
key Key
isNewKey bool
isNewDB bool
isLocked bool
key Key
)
func configure() {
log.I.Ln("initializing storage")
configureKey()
configureDirPath()
configureFile()
configureKey()
}
func configureKey() {
@@ -51,6 +48,8 @@ func configureKey() {
return
}
log.I.Ln("keyfile found")
if fileInfo.Mode() != 0600 {
log.W.Ln("keyfile permissions are too open:", fileInfo.Mode())
log.W.Ln("It is recommended that you change them to 0600")
@@ -68,34 +67,16 @@ func configureKey() {
return
}
log.I.Ln("no keyfile found, checking for rpc unlock")
if !isNewDB {
if viper.GetBool(storeKeyRPCFlag) {
log.I.Ln("no keyfile found")
log.I.Ln("attempting rpc unlock")
isRPCUnlockable = true
isLocked = true
return
}
log.I.Ln("rpc unlock disabled, checking for a user prompt")
if viper.GetBool(storeAskPassFlag) {
log.I.Ln("prompting user for key")
var password []byte
fmt.Print("Enter Encryption Key: ")
password, err = term.ReadPassword(int(syscall.Stdin))
key.Decode(string(password))
return
}
log.I.Ln("no prompt specified, generating a new key")
log.I.Ln("no keyfile found, generating a new key")
isNewKey = true
@@ -162,28 +143,18 @@ func configureFile() {
log.I.Ln("using storage db path:")
log.I.Ln("-", viper.GetString(storeFilePathFlag))
log.I.Ln("checking if database exists")
var err error
if _, err = os.Stat(viper.GetString(storeFilePathFlag)); err != nil {
log.I.Ln("none found, creating a new one")
log.I.Ln("no database found, creating a new one")
//file, err := os.OpenFile(viper.GetString(storeFilePathFlag), os.O_WRONLY, 0666)
//
//if err != nil && os.IsPermission(err) {
// startupErrors <- err
// return
//}
//
//file.Close()
//
//os.Remove(viper.GetString(storeFilePathFlag))
isNewDB = true
return
}
if isNewKey {
startupErrors <- errors.New("new key generated for an existing database. check your configuration.")
return
}
log.I.Ln("database found")
}

View File

@@ -2,10 +2,9 @@ package storage
import (
"context"
"git-indra.lan/indra-labs/indra/pkg/rpc"
"git-indra.lan/indra-labs/indra/pkg/interrupt"
"github.com/dgraph-io/badger/v3"
"github.com/spf13/viper"
"google.golang.org/grpc"
"sync"
)
@@ -17,15 +16,20 @@ var (
db *badger.DB
opts badger.Options
startupErrors = make(chan error, 128)
isReady = make(chan bool, 1)
isLockedChan = make(chan bool, 1)
isReadyChan = make(chan bool, 1)
)
func CantStart() chan error {
return startupErrors
}
func IsLocked() chan bool {
return isLockedChan
}
func IsReady() chan bool {
return isReady
return isReadyChan
}
func Shutdown() (err error) {
@@ -40,7 +44,7 @@ func Shutdown() (err error) {
return
}
log.I.Ln("storage shutdown complete")
log.I.Ln("storage shutdown completed")
return
}
@@ -56,6 +60,23 @@ var (
running sync.Mutex
)
func open() {
var err error
opts.EncryptionKey = key.Bytes()
if db, err = badger.Open(opts); check(err) {
startupErrors <- err
return
}
log.I.Ln("successfully opened database")
log.I.Ln("storage is ready")
isReadyChan <- true
}
func Run(ctx context.Context) {
if !running.TryLock() {
@@ -68,50 +89,33 @@ func Run(ctx context.Context) {
opts.IndexCacheSize = 128 << 20
opts.Logger = nil
if isRPCUnlockable {
if !isLocked {
var unlock = NewUnlockService()
log.I.Ln("attempting to open database with key")
go rpc.RunWith(ctx, func(srv *grpc.Server) {
RegisterUnlockServiceServer(srv, unlock)
})
open()
}
for {
select {
case <-rpc.IsReady():
isLockedChan <- true
log.I.Ln("waiting for unlock")
lockedCtx, cancel := context.WithCancel(context.Background())
case <-unlock.IsSuccessful():
interrupt.AddHandler(cancel)
log.I.Ln("storage successfully unlocked")
for {
select {
case <-IsReady():
log.I.Ln("storage is ready")
isReady <- true
//case <-unlock.IsSuccessful():
//
// log.I.Ln("storage successfully unlocked")
//
// isReadyChan <- true
case <-ctx.Done():
Shutdown()
return
}
case <-lockedCtx.Done():
Shutdown()
return
}
}
var err error
opts.EncryptionKey = key.Bytes()
if db, err = badger.Open(opts); check(err) {
startupErrors <- err
return
}
log.I.Ln("running storage")
isReady <- true
select {
case <-ctx.Done():
Shutdown()
}
return
}