reintegrating p2p

This commit is contained in:
greg stone
2023-03-01 07:25:16 +00:00
parent f6099db3bf
commit 7bbd0869c8
11 changed files with 239 additions and 143 deletions

View File

@@ -3,6 +3,7 @@ package main
import (
"context"
"git-indra.lan/indra-labs/indra"
"git-indra.lan/indra-labs/indra/pkg/cfg"
"git-indra.lan/indra-labs/indra/pkg/interrupt"
"git-indra.lan/indra-labs/indra/pkg/p2p"
log2 "git-indra.lan/indra-labs/indra/pkg/proc/log"
@@ -30,6 +31,8 @@ var seedServeCmd = &cobra.Command{
log.I.Ln("-- ", log2.App, "("+viper.GetString("network")+") -", indra.SemVer, "- Network Freedom. --")
cfg.SelectNetworkParams(viper.GetString("network"))
ctx, cancel := context.WithCancel(context.Background())
interrupt.AddHandler(cancel)

View File

@@ -1,28 +1,80 @@
package p2p
import (
"git-indra.lan/indra-labs/indra/pkg/cfg"
"git-indra.lan/indra-labs/indra/pkg/storage"
"github.com/dgraph-io/badger/v3"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/multiformats/go-multiaddr"
"github.com/spf13/viper"
)
func configure() {
log.I.Ln("initializing p2p")
configureKey()
configureListeners()
configureSeeds()
}
func configureKey() {
if viper.GetString(keyFlag) == "" {
log.I.Ln("looking for key in storage")
var err error
var item *badger.Item
var keyBytes []byte = make([]byte, 32)
err = storage.View(func(txn *badger.Txn) error {
if item, err = txn.Get([]byte(keyFlag)); err != nil {
return err
}
item.ValueCopy(keyBytes)
return nil
})
if err == badger.ErrKeyNotFound {
log.I.Ln("key not found, generating a new one")
if privKey, _, err = crypto.GenerateKeyPair(crypto.Secp256k1, 0); check(err) {
return
}
if keyBytes, err = privKey.Raw(); check(err) {
return
}
err = storage.Update(func(txn *badger.Txn) error {
err = txn.Set([]byte(keyFlag), keyBytes)
check(err)
return nil
})
return
}
if privKey, err = crypto.UnmarshalSecp256k1PrivateKey(keyBytes); check(err) {
return
}
log.I.Ln("key found")
}
func configureListeners() {
if len(viper.GetString(listenFlag)) > 0 {
if len(viper.GetStringSlice(listenFlag)) == 0 {
log.I.Ln("no listeners found, using defaults")
return
}
for _, listener := range viper.GetStringSlice(listenFlag) {
listenAddresses = append(listenAddresses, multiaddr.StringCast(listener))
}
}
func configureSeeds() {
@@ -40,6 +92,8 @@ func configureSeeds() {
var err error
netParams = cfg.SelectNetworkParams(viper.GetString("network"))
if seedAddresses, err = netParams.ParseSeedMultiAddresses(); err != nil {
return
}

View File

@@ -87,13 +87,15 @@ func Bootstrap(ctx context.Context, host host.Host, seeds []multiaddr.Multiaddr)
log.I.Ln("[introducer.bootstrap] is ready")
select {
case <-c.Done():
go func() {
select {
case <-c.Done():
log.I.Ln("shutting down [introducer.bootstrap]")
log.I.Ln("shutting down [introducer.bootstrap]")
return
}
return
}
}()
return
}

1
pkg/p2p/keys.go Normal file
View File

@@ -0,0 +1 @@
package p2p

View File

@@ -39,23 +39,23 @@ func HostStatus(ctx context.Context, host host.Host) {
log.I.Ln("[metrics.hoststatus] is ready")
for {
go func() {
for {
select {
case <-time.After(hostStatusInterval):
select {
log.I.Ln()
log.I.Ln("---- host status ----")
log.I.Ln("-- peers:", len(host.Network().Peers()))
log.I.Ln("-- connections:", len(host.Network().Conns()))
log.I.Ln("---- ---- ------ ----")
case <-time.After(hostStatusInterval):
case <-ctx.Done():
log.I.Ln()
log.I.Ln("---- host status ----")
log.I.Ln("-- peers:", len(host.Network().Peers()))
log.I.Ln("-- connections:", len(host.Network().Conns()))
log.I.Ln("---- ---- ------ ----")
log.I.Ln("shutting down [metrics.hoststatus]")
case <-ctx.Done():
log.I.Ln("shutting down [metrics.hoststatus]")
return
return
}
}
}
}()
}

View File

@@ -4,11 +4,11 @@ import (
"context"
"git-indra.lan/indra-labs/indra/pkg/cfg"
"git-indra.lan/indra-labs/indra/pkg/p2p/metrics"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/multiformats/go-multiaddr"
"git-indra.lan/indra-labs/indra"
@@ -33,28 +33,15 @@ func init() {
listenAddresses = []multiaddr.Multiaddr{}
}
type Server struct {
context.Context
func run() {
config *Config
log.I.Ln("starting p2p server")
host host.Host
}
log.I.Ln("host id:")
log.I.Ln("-", p2pHost.ID())
func (srv *Server) Shutdown() (err error) {
if err = srv.host.Close(); check(err) {
// continue
}
log.I.Ln("shutdown complete")
return
}
func (srv *Server) Serve() (err error) {
log.I.Ln("starting the p2p server")
log.I.Ln("p2p listeners:")
log.I.Ln("-", p2pHost.Addrs())
// Here we create a context with cancel and add it to the interrupt handler
var ctx context.Context
@@ -64,64 +51,48 @@ func (srv *Server) Serve() (err error) {
interrupt.AddHandler(cancel)
// Introduce your node to the network
go introducer.Bootstrap(ctx, srv.host, srv.config.SeedAddresses)
introducer.Bootstrap(ctx, p2pHost, seedAddresses)
// Get some basic metrics for the host
// metrics.Init()
// metrics.Set('indra.host.status.reporting.interval', 30 * time.Second)
// metrics.Enable('indra.host.status')
metrics.SetInterval(30 * time.Second)
go metrics.HostStatus(ctx, srv.host)
metrics.HostStatus(ctx, p2pHost)
select {
case <-ctx.Done():
log.I.Ln("shutting down p2p server")
srv.Shutdown()
}
return nil
isReadyChan <- true
}
func New(config *Config) (*Server, error) {
func Run() {
log.I.Ln("initializing the p2p server")
//storage.Update(func(txn *badger.Txn) error {
// txn.Delete([]byte(keyFlag))
// return nil
//})
configure()
var err error
var s Server
s.config = config
p2pHost, err = libp2p.New(
libp2p.Identity(privKey),
libp2p.UserAgent(userAgent),
libp2p.ListenAddrs(listenAddresses...),
)
if s.host, err = libp2p.New(libp2p.Identity(config.PrivKey), libp2p.UserAgent(userAgent), libp2p.ListenAddrs(config.ListenAddresses...)); check(err) {
return nil, err
if check(err) {
return
}
log.I.Ln("host id:")
log.I.Ln("-", s.host.ID())
log.I.Ln("p2p listeners:")
log.I.Ln("-", s.host.Addrs())
if len(config.ConnectAddresses) > 0 {
log.I.Ln("connect detected, using only the connect seed addresses")
config.SeedAddresses = config.ConnectAddresses
return &s, nil
}
var seedAddresses []multiaddr.Multiaddr
if seedAddresses, err = config.Params.ParseSeedMultiAddresses(); check(err) {
return nil, err
}
config.SeedAddresses = append(config.SeedAddresses, seedAddresses...)
return &s, err
run()
}
func Shutdown() (err error) {
log.I.Ln("shutting down p2p server")
if err = p2pHost.Close(); check(err) {
// continue
}
log.I.Ln("- p2p server shutdown complete")
return
}

19
pkg/p2p/signals.go Normal file
View File

@@ -0,0 +1,19 @@
package p2p
var (
startupErrors = make(chan error, 32)
isReadyChan = make(chan bool, 1)
isShutdownChan = make(chan bool, 1)
)
func WhenStartFailed() chan error {
return startupErrors
}
func WhenReady() chan bool {
return isReadyChan
}
func WhenShutdown() chan bool {
return isShutdownChan
}

View File

@@ -3,19 +3,18 @@ package p2p
import (
"github.com/btcsuite/btcd/btcutil/base58"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/spf13/viper"
)
func GeneratePrivKey() (privKey crypto.PrivKey) {
var err error
if privKey, _, err = crypto.GenerateKeyPair(crypto.Secp256k1, 0); check(err) {
return
}
return
}
//func GeneratePrivKey() (privKey crypto.PrivKey) {
//
// var err error
//
// if privKey, _, err = crypto.GenerateKeyPair(crypto.Secp256k1, 0); check(err) {
// return
// }
//
// return
//}
func Base58Encode(priv crypto.PrivKey) (key string, err error) {
@@ -41,24 +40,24 @@ func Base58Decode(key string) (priv crypto.PrivKey, err error) {
return
}
func GetOrGeneratePrivKey(key string) (privKey crypto.PrivKey, err error) {
if key == "" {
privKey = GeneratePrivKey()
if key, err = Base58Encode(privKey); check(err) {
return
}
viper.Set(keyFlag, key)
return
}
if privKey, err = Base58Decode(key); check(err) {
return
}
return
}
//func GetOrGeneratePrivKey(key string) (privKey crypto.PrivKey, err error) {
//
// if key == "" {
//
// privKey = GeneratePrivKey()
//
// if key, err = Base58Encode(privKey); check(err) {
// return
// }
//
// viper.Set(keyFlag, key)
//
// return
// }
//
// if privKey, err = Base58Decode(key); check(err) {
// return
// }
//
// return
//}

View File

@@ -2,6 +2,7 @@ package seed
import (
"context"
"git-indra.lan/indra-labs/indra/pkg/p2p"
"git-indra.lan/indra-labs/indra/pkg/storage"
"sync"
)
@@ -19,29 +20,51 @@ func Run(ctx context.Context) {
log.I.Ln("running seed")
//
// Storage
//
go storage.Run()
signals:
for {
select {
case err := <-storage.WhenStartupFailed():
log.E.Ln("storage can't start:", err)
startupErrors <- err
return
case <-storage.WhenIsReady():
break signals
case <-ctx.Done():
Shutdown()
return
}
select {
case err := <-storage.WhenStartFailed():
log.E.Ln("storage can't start:", err)
startupErrors <- err
return
case <-storage.WhenReady():
// continue
case <-ctx.Done():
Shutdown()
return
}
// Startup all RPC services
//
// P2P
//
// Startup P2P services
go p2p.Run()
select {
case err := <-p2p.WhenStartFailed():
log.E.Ln("p2p can't start:", err)
startupErrors <- err
return
case <-p2p.WhenReady():
// continue
case <-ctx.Done():
Shutdown()
return
}
//
// RPC
//
//
// Ready!
//
log.I.Ln("seed is ready")
isReadyChan <- true
select {
@@ -55,7 +78,12 @@ func Shutdown() {
log.I.Ln("shutting down seed")
err := storage.Shutdown()
var err error
err = p2p.Shutdown()
check(err)
err = storage.Shutdown()
check(err)
log.I.Ln("seed shutdown completed")

View File

@@ -72,6 +72,9 @@ signals:
}
}
log.I.Ln("running garbage collection before ready")
db.RunValueLogGC(0.5)
log.I.Ln("storage is ready")
isReadyChan <- true
}
@@ -98,9 +101,21 @@ func Shutdown() (err error) {
return
}
func Txn(tx func(txn *badger.Txn) error, update bool) error {
func Txn(tx func(txn *badger.Txn) error, update bool) (err error) {
txn := db.NewTransaction(update)
return tx(txn)
if err = tx(txn); err != nil {
return
}
return txn.Commit()
}
func View(fn func(txn *badger.Txn) error) error {
return db.View(fn)
}
func Update(fn func(txn *badger.Txn) error) error {
return db.Update(fn)
}

View File

@@ -7,7 +7,11 @@ var (
isReadyChan = make(chan bool, 1)
)
func WhenStartupFailed() chan error {
var (
isReady bool
)
func WhenStartFailed() chan error {
return startupErrors
}
@@ -19,6 +23,6 @@ func WhenIsUnlocked() chan bool {
return isUnlockedChan
}
func WhenIsReady() chan bool {
func WhenReady() chan bool {
return isReadyChan
}