refactoring seed service.

This commit is contained in:
greg stone
2023-03-01 04:06:24 +00:00
parent c5bb21e4d8
commit d83f46b9f9
10 changed files with 158 additions and 124 deletions

View File

@@ -13,10 +13,10 @@ import (
func init() { func init() {
//// Init flags belonging to the seed package //// Init flags belonging to the seed package
//seed.InitFlags(seedServeCommand) //seed.InitFlags(seedServeCmd)
// //
//// Init flags belonging to the rpc package //// Init flags belonging to the rpc package
//rpc.InitFlags(seedServeCommand) //rpc.InitFlags(seedServeCmd)
seedCommand.AddCommand(seedRPCCmd) seedCommand.AddCommand(seedRPCCmd)

View File

@@ -15,14 +15,14 @@ import (
func init() { func init() {
storage.InitFlags(seedServeCommand) storage.InitFlags(seedServeCmd)
p2p.InitFlags(seedServeCommand) p2p.InitFlags(seedServeCmd)
rpc.InitFlags(seedServeCommand) rpc.InitFlags(seedServeCmd)
seedCommand.AddCommand(seedServeCommand) seedCommand.AddCommand(seedServeCmd)
} }
var seedServeCommand = &cobra.Command{ var seedServeCmd = &cobra.Command{
Use: "serve", Use: "serve",
Short: "Serves an instance of the seed node", Short: "Serves an instance of the seed node",
Long: `Serves an instance of the seed node.`, Long: `Serves an instance of the seed node.`,
@@ -31,7 +31,6 @@ var seedServeCommand = &cobra.Command{
log.I.Ln("-- ", log2.App, "("+viper.GetString("network")+") -", indra.SemVer, "- Network Freedom. --") log.I.Ln("-- ", log2.App, "("+viper.GetString("network")+") -", indra.SemVer, "- Network Freedom. --")
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
interrupt.AddHandler(cancel) interrupt.AddHandler(cancel)
// Seed // // Seed //
@@ -39,10 +38,9 @@ var seedServeCommand = &cobra.Command{
go seed.Run(ctx) go seed.Run(ctx)
select { select {
case err := <-seed.WhenStartFailed(): case <-seed.WhenStartFailed():
log.E.Ln("startup error:", err) log.I.Ln("stopped")
return case <-seed.WhenShutdown():
case <-seed.IsShutdown():
log.I.Ln("shutdown complete") log.I.Ln("shutdown complete")
} }

View File

@@ -4,14 +4,14 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
func init() {
server = grpc.NewServer()
}
var ( var (
server *grpc.Server server *grpc.Server
) )
var (
running bool
)
func RunWith(r func(srv *grpc.Server), opts ...ServerOption) { func RunWith(r func(srv *grpc.Server), opts ...ServerOption) {
log.I.Ln("initializing the rpc server") log.I.Ln("initializing the rpc server")
@@ -21,7 +21,9 @@ func RunWith(r func(srv *grpc.Server), opts ...ServerOption) {
for _, opt := range opts { for _, opt := range opts {
opt.apply(&serverOpts) opt.apply(&serverOpts)
} }
server = grpc.NewServer()
configureUnixSocket() configureUnixSocket()
configureTunnel() configureTunnel()
@@ -36,13 +38,15 @@ func Start() {
var err error var err error
if err = startTunnel(server); check(err) {
startupErrors <- err
}
if err = startUnixSocket(server); check(err) { if err = startUnixSocket(server); check(err) {
startupErrors <- err startupErrors <- err
} }
if err = startTunnel(server); check(err) { running = true
startupErrors <- err
}
log.I.Ln("rpc server is ready") log.I.Ln("rpc server is ready")
@@ -51,10 +55,25 @@ func Start() {
func Shutdown() { func Shutdown() {
if !running {
return
}
log.I.Ln("shutting down rpc server") log.I.Ln("shutting down rpc server")
stopUnixSocket() server.GracefulStop()
stopTunnel()
server.Stop() var err error
//err = stopUnixSocket()
//
//check(err)
err = stopTunnel()
check(err)
running = false
log.I.Ln("- rpc server shutdown completed")
} }

View File

@@ -34,7 +34,7 @@ func stopUnixSocket() (err error) {
} }
if unixSock != nil { if unixSock != nil {
if err = unixSock.Close(); check(err) { if err = unixSock.Close(); err != nil {
// continue // continue
} }
} }

View File

@@ -2,9 +2,7 @@ package seed
import ( import (
"context" "context"
"git-indra.lan/indra-labs/indra/pkg/rpc"
"git-indra.lan/indra-labs/indra/pkg/storage" "git-indra.lan/indra-labs/indra/pkg/storage"
"google.golang.org/grpc"
"sync" "sync"
) )
@@ -26,41 +24,42 @@ func Run(ctx context.Context) {
signals: signals:
for { for {
select { select {
case <-storage.WhenIsLocked(): case err := <-storage.WhenStartupFailed():
log.E.Ln("storage can't start:", err)
log.I.Ln("storage is locked")
// Run an unlock RPC server
go rpc.RunWith(
func(srv *grpc.Server) {
storage.RegisterUnlockServiceServer(srv, storage.NewUnlockService())
},
rpc.WithDisableTunnel(),
)
case err := <-rpc.WhenStartFailed():
startupErrors <- err startupErrors <- err
case <-rpc.IsReady(): return
log.I.Ln("waiting for unlock")
case <-storage.WhenIsUnlocked():
log.I.Ln("restarting rpc server")
// Shut down unlock RPC server to we can launch the main one
rpc.Shutdown()
//go rpc.RunWith(func(srv *grpc.Server) {
// chat.RegisterChatServiceServer(srv, &chat.Server{})
//})
case <-storage.WhenIsReady(): case <-storage.WhenIsReady():
//log.I.Ln("shutting down rpc server, with unlock service")
//
//rpc.Shutdown()
break signals break signals
//case <-storage.WhenIsLocked():
//
// log.I.Ln("running rpc server, with unlock service")
//
// go rpc.RunWith(
// func(srv *grpc.Server) {
// storage.RegisterUnlockServiceServer(srv, storage.NewUnlockService())
// },
// rpc.WithDisableTunnel(),
// )
//
//case err := <-rpc.WhenStartFailed():
// startupErrors <- err
//case <-rpc.IsReady():
// log.I.Ln("... awaiting unlock over rpc")
case <-ctx.Done(): case <-ctx.Done():
Shutdown() Shutdown()
return return
} }
} }
// Startup all RPC services
// Startup P2P services
log.I.Ln("seed is ready") log.I.Ln("seed is ready")
isReadyChan <- true isReadyChan <- true
@@ -76,10 +75,10 @@ func Shutdown() {
log.I.Ln("shutting down seed") log.I.Ln("shutting down seed")
rpc.Shutdown()
err := storage.Shutdown() err := storage.Shutdown()
check(err) check(err)
log.I.Ln("seed shutdown completed")
isShutdownChan <- true isShutdownChan <- true
} }

View File

@@ -10,10 +10,10 @@ func WhenStartFailed() chan error {
return startupErrors return startupErrors
} }
func IsReady() chan bool { func WhenReady() chan bool {
return isReadyChan return isReadyChan
} }
func IsShutdown() chan bool { func WhenShutdown() chan bool {
return isShutdownChan return isShutdownChan
} }

View File

@@ -1,8 +1,9 @@
package storage package storage
import ( import (
"git-indra.lan/indra-labs/indra/pkg/rpc"
"github.com/dgraph-io/badger/v3" "github.com/dgraph-io/badger/v3"
"github.com/spf13/viper" "google.golang.org/grpc"
"sync" "sync"
) )
@@ -16,6 +17,27 @@ var (
running sync.Mutex running sync.Mutex
) )
func run() {
if noKeyProvided {
log.I.Ln("storage is locked")
isLockedChan <- true
return
}
log.I.Ln("attempting to unlock database")
isUnlocked, err := attempt_unlock()
if !isUnlocked {
log.I.Ln("unlock attempt failed")
startupErrors <- err
}
log.I.Ln("successfully unlocked database")
isUnlockedChan <- true
}
func Run() { func Run() {
if !running.TryLock() { if !running.TryLock() {
@@ -24,9 +46,30 @@ func Run() {
configure() configure()
if !attempt_unlock() { run()
isLockedChan <- true
return signals:
for {
select {
case err := <-rpc.WhenStartFailed():
startupErrors <- err
return
case <-WhenIsUnlocked():
rpc.Shutdown()
break signals
case <-WhenIsLocked():
log.I.Ln("running rpc server, with unlock service")
go rpc.RunWith(
func(srv *grpc.Server) {
RegisterUnlockServiceServer(srv, NewUnlockService())
},
rpc.WithDisableTunnel(),
)
case <-rpc.IsReady():
log.I.Ln("... awaiting unlock over rpc")
}
} }
log.I.Ln("storage is ready") log.I.Ln("storage is ready")
@@ -35,21 +78,22 @@ func Run() {
func Shutdown() (err error) { func Shutdown() (err error) {
rpc.Shutdown()
log.I.Ln("shutting down storage") log.I.Ln("shutting down storage")
if db == nil { if db == nil {
log.I.Ln("- storage was never started")
return nil return nil
} }
if db.IsClosed() { log.I.Ln("- storage db closing, it may take a minute...")
return nil
if err = db.Close(); err != nil {
log.W.Ln("- storage shutdown warning: ", err)
} }
if err = db.Close(); check(err) { log.I.Ln("- storage shutdown completed")
return
}
log.I.Ln("storage shutdown completed")
return return
} }
@@ -60,29 +104,3 @@ func Txn(tx func(txn *badger.Txn) error, update bool) error {
return tx(txn) return tx(txn)
} }
func attempt_unlock() bool {
if noKeyProvided {
return false
}
var err error
log.I.Ln("attempting to unlock database")
opts = badger.DefaultOptions(viper.GetString(storeFilePathFlag))
opts.Logger = nil
opts.IndexCacheSize = 128 << 20
opts.EncryptionKey = key.Bytes()
if db, err = badger.Open(opts); check(err) {
startupErrors <- err
return false
}
log.I.Ln("successfully unlocked database")
isUnlockedChan <- true
return true
}

View File

@@ -2,52 +2,29 @@ package storage
import ( import (
"context" "context"
"github.com/dgraph-io/badger/v3"
"github.com/spf13/viper"
) )
type Service struct { type Service struct{}
success chan bool
}
func (s *Service) IsSuccessful() chan bool {
return s.success
}
func (s *Service) Unlock(ctx context.Context, req *UnlockRequest) (res *UnlockResponse, err error) { func (s *Service) Unlock(ctx context.Context, req *UnlockRequest) (res *UnlockResponse, err error) {
var key Key
key.Decode(req.Key) key.Decode(req.Key)
opts = badger.DefaultOptions(viper.GetString(storeFilePathFlag)) isUnlocked, err := attempt_unlock()
opts.Logger = nil
opts.IndexCacheSize = 128 << 20
opts.EncryptionKey = key.Bytes()
if db, err = badger.Open(opts); err != nil { if !isUnlocked {
log.I.Ln("unlock attempt failed:", err) log.I.Ln("unlock attempt failed:", err)
return &UnlockResponse{ return &UnlockResponse{Success: false}, err
Success: false,
}, err
} }
s.success <- true log.I.Ln("successfully unlocked database")
isUnlockedChan <- true isUnlockedChan <- true
log.I.Ln("unlock successful") return &UnlockResponse{Success: true}, nil
return &UnlockResponse{
Success: true,
}, nil
} }
func (s *Service) mustEmbedUnimplementedUnlockServiceServer() {} func (s *Service) mustEmbedUnimplementedUnlockServiceServer() {}
func NewUnlockService() *Service { func NewUnlockService() *Service { return &Service{} }
return &Service{
success: make(chan bool, 1),
}
}

View File

@@ -7,7 +7,7 @@ var (
isReadyChan = make(chan bool, 1) isReadyChan = make(chan bool, 1)
) )
func CantStart() chan error { func WhenStartupFailed() chan error {
return startupErrors return startupErrors
} }

23
pkg/storage/unlock.go Normal file
View File

@@ -0,0 +1,23 @@
package storage
import (
"github.com/dgraph-io/badger/v3"
"github.com/spf13/viper"
)
func attempt_unlock() (isUnlocked bool, err error) {
opts = badger.DefaultOptions(viper.GetString(storeFilePathFlag))
opts.Logger = nil
opts.IndexCacheSize = 128 << 20
opts.EncryptionKey = key.Bytes()
if db, err = badger.Open(opts); err != nil {
db = nil
return false, err
}
return true, nil
}