diff --git a/cmd/indra/seed_rpc.go b/cmd/indra/seed_rpc.go index 4fcfd783..b0be899b 100644 --- a/cmd/indra/seed_rpc.go +++ b/cmd/indra/seed_rpc.go @@ -13,10 +13,10 @@ import ( func init() { //// Init flags belonging to the seed package - //seed.InitFlags(seedServeCommand) + //seed.InitFlags(seedServeCmd) // //// Init flags belonging to the rpc package - //rpc.InitFlags(seedServeCommand) + //rpc.InitFlags(seedServeCmd) seedCommand.AddCommand(seedRPCCmd) diff --git a/cmd/indra/seed_serve.go b/cmd/indra/seed_serve.go index 4552a8ae..41f5078c 100644 --- a/cmd/indra/seed_serve.go +++ b/cmd/indra/seed_serve.go @@ -15,14 +15,14 @@ import ( func init() { - storage.InitFlags(seedServeCommand) - p2p.InitFlags(seedServeCommand) - rpc.InitFlags(seedServeCommand) + storage.InitFlags(seedServeCmd) + p2p.InitFlags(seedServeCmd) + rpc.InitFlags(seedServeCmd) - seedCommand.AddCommand(seedServeCommand) + seedCommand.AddCommand(seedServeCmd) } -var seedServeCommand = &cobra.Command{ +var seedServeCmd = &cobra.Command{ Use: "serve", Short: "Serves an instance of the seed node", Long: `Serves an instance of the seed node.`, @@ -31,7 +31,6 @@ var seedServeCommand = &cobra.Command{ log.I.Ln("-- ", log2.App, "("+viper.GetString("network")+") -", indra.SemVer, "- Network Freedom. --") ctx, cancel := context.WithCancel(context.Background()) - interrupt.AddHandler(cancel) // Seed // @@ -39,10 +38,9 @@ var seedServeCommand = &cobra.Command{ go seed.Run(ctx) select { - case err := <-seed.WhenStartFailed(): - log.E.Ln("startup error:", err) - return - case <-seed.IsShutdown(): + case <-seed.WhenStartFailed(): + log.I.Ln("stopped") + case <-seed.WhenShutdown(): log.I.Ln("shutdown complete") } diff --git a/pkg/rpc/server.go b/pkg/rpc/server.go index 550b2eba..c092c539 100644 --- a/pkg/rpc/server.go +++ b/pkg/rpc/server.go @@ -4,14 +4,14 @@ import ( "google.golang.org/grpc" ) -func init() { - server = grpc.NewServer() -} - var ( server *grpc.Server ) +var ( + running bool +) + func RunWith(r func(srv *grpc.Server), opts ...ServerOption) { log.I.Ln("initializing the rpc server") @@ -21,7 +21,9 @@ func RunWith(r func(srv *grpc.Server), opts ...ServerOption) { for _, opt := range opts { opt.apply(&serverOpts) } - + + server = grpc.NewServer() + configureUnixSocket() configureTunnel() @@ -36,13 +38,15 @@ func Start() { var err error + if err = startTunnel(server); check(err) { + startupErrors <- err + } + if err = startUnixSocket(server); check(err) { startupErrors <- err } - if err = startTunnel(server); check(err) { - startupErrors <- err - } + running = true log.I.Ln("rpc server is ready") @@ -51,10 +55,25 @@ func Start() { func Shutdown() { + if !running { + return + } + log.I.Ln("shutting down rpc server") - stopUnixSocket() - stopTunnel() + server.GracefulStop() - server.Stop() + var err error + + //err = stopUnixSocket() + // + //check(err) + + err = stopTunnel() + + check(err) + + running = false + + log.I.Ln("- rpc server shutdown completed") } diff --git a/pkg/rpc/socket_unix.go b/pkg/rpc/socket_unix.go index 7703edbc..a06fd833 100644 --- a/pkg/rpc/socket_unix.go +++ b/pkg/rpc/socket_unix.go @@ -34,7 +34,7 @@ func stopUnixSocket() (err error) { } if unixSock != nil { - if err = unixSock.Close(); check(err) { + if err = unixSock.Close(); err != nil { // continue } } diff --git a/pkg/seed/server.go b/pkg/seed/server.go index b1ee9291..fb68eecc 100644 --- a/pkg/seed/server.go +++ b/pkg/seed/server.go @@ -2,9 +2,7 @@ package seed import ( "context" - "git-indra.lan/indra-labs/indra/pkg/rpc" "git-indra.lan/indra-labs/indra/pkg/storage" - "google.golang.org/grpc" "sync" ) @@ -26,41 +24,42 @@ func Run(ctx context.Context) { signals: for { select { - case <-storage.WhenIsLocked(): - - log.I.Ln("storage is locked") - - // Run an unlock RPC server - go rpc.RunWith( - func(srv *grpc.Server) { - storage.RegisterUnlockServiceServer(srv, storage.NewUnlockService()) - }, - rpc.WithDisableTunnel(), - ) - - case err := <-rpc.WhenStartFailed(): + case err := <-storage.WhenStartupFailed(): + log.E.Ln("storage can't start:", err) startupErrors <- err - case <-rpc.IsReady(): - log.I.Ln("waiting for unlock") - case <-storage.WhenIsUnlocked(): - - log.I.Ln("restarting rpc server") - - // Shut down unlock RPC server to we can launch the main one - rpc.Shutdown() - - //go rpc.RunWith(func(srv *grpc.Server) { - // chat.RegisterChatServiceServer(srv, &chat.Server{}) - //}) - + return case <-storage.WhenIsReady(): + + //log.I.Ln("shutting down rpc server, with unlock service") + // + //rpc.Shutdown() + break signals + //case <-storage.WhenIsLocked(): + // + // log.I.Ln("running rpc server, with unlock service") + // + // go rpc.RunWith( + // func(srv *grpc.Server) { + // storage.RegisterUnlockServiceServer(srv, storage.NewUnlockService()) + // }, + // rpc.WithDisableTunnel(), + // ) + // + //case err := <-rpc.WhenStartFailed(): + // startupErrors <- err + //case <-rpc.IsReady(): + // log.I.Ln("... awaiting unlock over rpc") case <-ctx.Done(): Shutdown() return } } + // Startup all RPC services + + // Startup P2P services + log.I.Ln("seed is ready") isReadyChan <- true @@ -76,10 +75,10 @@ func Shutdown() { log.I.Ln("shutting down seed") - rpc.Shutdown() - err := storage.Shutdown() check(err) + log.I.Ln("seed shutdown completed") + isShutdownChan <- true } diff --git a/pkg/seed/signals.go b/pkg/seed/signals.go index 2cec9a98..50c069a2 100644 --- a/pkg/seed/signals.go +++ b/pkg/seed/signals.go @@ -10,10 +10,10 @@ func WhenStartFailed() chan error { return startupErrors } -func IsReady() chan bool { +func WhenReady() chan bool { return isReadyChan } -func IsShutdown() chan bool { +func WhenShutdown() chan bool { return isShutdownChan } diff --git a/pkg/storage/service.go b/pkg/storage/service.go index 3b42dd26..54da563d 100644 --- a/pkg/storage/service.go +++ b/pkg/storage/service.go @@ -1,8 +1,9 @@ package storage import ( + "git-indra.lan/indra-labs/indra/pkg/rpc" "github.com/dgraph-io/badger/v3" - "github.com/spf13/viper" + "google.golang.org/grpc" "sync" ) @@ -16,6 +17,27 @@ var ( running sync.Mutex ) +func run() { + + if noKeyProvided { + log.I.Ln("storage is locked") + isLockedChan <- true + + return + } + + log.I.Ln("attempting to unlock database") + isUnlocked, err := attempt_unlock() + + if !isUnlocked { + log.I.Ln("unlock attempt failed") + startupErrors <- err + } + + log.I.Ln("successfully unlocked database") + isUnlockedChan <- true +} + func Run() { if !running.TryLock() { @@ -24,9 +46,30 @@ func Run() { configure() - if !attempt_unlock() { - isLockedChan <- true - return + run() + +signals: + for { + select { + case err := <-rpc.WhenStartFailed(): + startupErrors <- err + return + case <-WhenIsUnlocked(): + rpc.Shutdown() + break signals + case <-WhenIsLocked(): + + log.I.Ln("running rpc server, with unlock service") + + go rpc.RunWith( + func(srv *grpc.Server) { + RegisterUnlockServiceServer(srv, NewUnlockService()) + }, + rpc.WithDisableTunnel(), + ) + case <-rpc.IsReady(): + log.I.Ln("... awaiting unlock over rpc") + } } log.I.Ln("storage is ready") @@ -35,21 +78,22 @@ func Run() { func Shutdown() (err error) { + rpc.Shutdown() + log.I.Ln("shutting down storage") if db == nil { + log.I.Ln("- storage was never started") return nil } - if db.IsClosed() { - return nil + log.I.Ln("- storage db closing, it may take a minute...") + + if err = db.Close(); err != nil { + log.W.Ln("- storage shutdown warning: ", err) } - if err = db.Close(); check(err) { - return - } - - log.I.Ln("storage shutdown completed") + log.I.Ln("- storage shutdown completed") return } @@ -60,29 +104,3 @@ func Txn(tx func(txn *badger.Txn) error, update bool) error { return tx(txn) } - -func attempt_unlock() bool { - - if noKeyProvided { - return false - } - - var err error - - log.I.Ln("attempting to unlock database") - - opts = badger.DefaultOptions(viper.GetString(storeFilePathFlag)) - opts.Logger = nil - opts.IndexCacheSize = 128 << 20 - opts.EncryptionKey = key.Bytes() - - if db, err = badger.Open(opts); check(err) { - startupErrors <- err - return false - } - - log.I.Ln("successfully unlocked database") - isUnlockedChan <- true - - return true -} diff --git a/pkg/storage/service_unlock.go b/pkg/storage/service_unlock.go index 8cca378d..04df9b53 100644 --- a/pkg/storage/service_unlock.go +++ b/pkg/storage/service_unlock.go @@ -2,52 +2,29 @@ package storage import ( "context" - "github.com/dgraph-io/badger/v3" - "github.com/spf13/viper" ) -type Service struct { - success chan bool -} - -func (s *Service) IsSuccessful() chan bool { - return s.success -} +type Service struct{} func (s *Service) Unlock(ctx context.Context, req *UnlockRequest) (res *UnlockResponse, err error) { - var key Key - key.Decode(req.Key) - opts = badger.DefaultOptions(viper.GetString(storeFilePathFlag)) - opts.Logger = nil - opts.IndexCacheSize = 128 << 20 - opts.EncryptionKey = key.Bytes() + isUnlocked, err := attempt_unlock() - if db, err = badger.Open(opts); err != nil { + if !isUnlocked { log.I.Ln("unlock attempt failed:", err) - return &UnlockResponse{ - Success: false, - }, err + return &UnlockResponse{Success: false}, err } - s.success <- true + log.I.Ln("successfully unlocked database") isUnlockedChan <- true - log.I.Ln("unlock successful") - - return &UnlockResponse{ - Success: true, - }, nil + return &UnlockResponse{Success: true}, nil } func (s *Service) mustEmbedUnimplementedUnlockServiceServer() {} -func NewUnlockService() *Service { - return &Service{ - success: make(chan bool, 1), - } -} +func NewUnlockService() *Service { return &Service{} } diff --git a/pkg/storage/signals.go b/pkg/storage/signals.go index 07cedd13..ca530760 100644 --- a/pkg/storage/signals.go +++ b/pkg/storage/signals.go @@ -7,7 +7,7 @@ var ( isReadyChan = make(chan bool, 1) ) -func CantStart() chan error { +func WhenStartupFailed() chan error { return startupErrors } diff --git a/pkg/storage/unlock.go b/pkg/storage/unlock.go new file mode 100644 index 00000000..f2ba7c5c --- /dev/null +++ b/pkg/storage/unlock.go @@ -0,0 +1,23 @@ +package storage + +import ( + "github.com/dgraph-io/badger/v3" + "github.com/spf13/viper" +) + +func attempt_unlock() (isUnlocked bool, err error) { + + opts = badger.DefaultOptions(viper.GetString(storeFilePathFlag)) + opts.Logger = nil + opts.IndexCacheSize = 128 << 20 + opts.EncryptionKey = key.Bytes() + + if db, err = badger.Open(opts); err != nil { + + db = nil + + return false, err + } + + return true, nil +}