Files
p9/cmd/wallet/main.go
Loki Verloren 0e2bba237a initial commit
2021-05-03 10:43:10 +02:00

288 lines
8.4 KiB
Go

package wallet
import (
"fmt"
// This enables pprof
// _ "net/http/pprof"
"sync"
"github.com/p9c/p9/pkg/qu"
"github.com/p9c/p9/pkg/log"
"github.com/p9c/p9/pkg/chaincfg"
"github.com/p9c/p9/pod/config"
"github.com/p9c/p9/pod/state"
"github.com/p9c/p9/pkg/interrupt"
"github.com/p9c/p9/pkg/chainclient"
)
// Main is a work-around main function that is required since deferred functions
// (such as log flushing) are not called with calls to os.Exit. Instead, main
// runs this function and checks for a non-nil error, at point any defers have
// already run, and if the error is non-nil, the program can be exited with an
// error exit status.
func Main(cx *state.State) (e error) {
// cx.WaitGroup.Add(1)
cx.WaitAdd()
// if *config.Profile != "" {
// go func() {
// listenAddr := net.JoinHostPort("127.0.0.1", *config.Profile)
// I.Ln("profile server listening on", listenAddr)
// profileRedirect := http.RedirectHandler("/debug/pprof",
// http.StatusSeeOther)
// http.Handle("/", profileRedirect)
// fmt.Println(http.ListenAndServe(listenAddr, nil))
// }()
// }
loader := NewLoader(cx.ActiveNet, cx.Config.WalletFile.V(), 250)
// Create and start HTTP server to serve wallet client connections. This will be updated with the wallet and chain
// server RPC client created below after each is created.
D.Ln("starting RPC servers")
var legacyServer *Server
if legacyServer, e = startRPCServers(cx, loader); E.Chk(e) {
E.Ln("unable to create RPC servers:", e)
return
}
loader.RunAfterLoad(
func(w *Wallet) {
D.Ln("starting wallet RPC services", w != nil)
startWalletRPCServices(w, legacyServer)
// cx.WalletChan <- w
},
)
if !cx.Config.NoInitialLoad.True() {
go func() {
D.Ln("loading wallet", cx.Config.WalletPass.V())
if e = LoadWallet(loader, cx, legacyServer); E.Chk(e) {
}
}()
}
interrupt.AddHandler(cx.WalletKill.Q)
select {
case <-cx.WalletKill.Wait():
D.Ln("wallet killswitch activated")
if legacyServer != nil {
D.Ln("stopping wallet RPC server")
legacyServer.Stop()
I.Ln("stopped wallet RPC server")
}
I.Ln("wallet shutdown from killswitch complete")
cx.WaitDone()
return
case <-cx.KillAll.Wait():
D.Ln("killall")
cx.WalletKill.Q()
case <-interrupt.HandlersDone.Wait():
}
I.Ln("wallet shutdown complete")
cx.WaitDone()
return
}
// LoadWallet ...
func LoadWallet(
loader *Loader, cx *state.State, legacyServer *Server,
) (e error) {
T.Ln("starting rpc client connection handler", cx.Config.WalletPass.V())
// Create and start chain RPC client so it's ready to connect to the wallet when
// loaded later. Load the wallet database. It must have been created already or
// this will return an appropriate error.
var w *Wallet
T.Ln("opening existing wallet, pass:", cx.Config.WalletPass.V())
if w, e = loader.OpenExistingWallet(cx.Config.WalletPass.Bytes(), true, cx.Config, nil); E.Chk(e) {
T.Ln("failed to open existing wallet")
return
}
T.Ln("opened existing wallet")
// go func() {
// W.Ln("refilling mining addresses", cx.Config, cx.StateCfg)
// addresses.RefillMiningAddresses(w, cx.Config, cx.StateCfg)
// W.Ln("done refilling mining addresses")
// D.S(*cx.Config.MiningAddrs)
// save.Save(cx.Config)
// }()
loader.Wallet = w
// D.Ln("^^^^^^^^^^^ sending back wallet")
// cx.WalletChan <- w
T.Ln("starting rpcClientConnectLoop")
go rpcClientConnectLoop(cx, legacyServer, loader)
T.Ln("adding interrupt handler to unload wallet")
// Add interrupt handlers to shutdown the various process components before
// exiting. Interrupt handlers run in LIFO order, so the wallet (which should be
// closed last) is added first.
interrupt.AddHandler(
func() {
D.Ln("wallet.CtlMain interrupt")
e := loader.UnloadWallet()
if e != nil && e != ErrNotLoaded {
E.Ln("failed to close wallet:", e)
}
},
)
if legacyServer != nil {
interrupt.AddHandler(
func() {
D.Ln("stopping wallet RPC server")
legacyServer.Stop()
D.Ln("wallet RPC server shutdown")
},
)
}
go func() {
select {
case <-cx.KillAll.Wait():
case <-legacyServer.RequestProcessShutdownChan().Wait():
}
interrupt.Request()
}()
return
}
// rpcClientConnectLoop continuously attempts a connection to the consensus RPC
// server. When a connection is established, the client is used to sync the
// loaded wallet, either immediately or when loaded at a later time.
//
// The legacy RPC is optional. If set, the connected RPC client will be
// associated with the server for RPC pass-through and to enable additional
// methods.
func rpcClientConnectLoop(
cx *state.State, legacyServer *Server,
loader *Loader,
) {
T.Ln("rpcClientConnectLoop", log.Caller("which was started at:", 2))
// var certs []byte
// if !cx.PodConfig.UseSPV {
certs := cx.Config.ReadCAFile()
// }
for {
var (
chainClient chainclient.Interface
e error
)
// if cx.PodConfig.UseSPV {
// var (
// chainService *neutrino.ChainService
// spvdb walletdb.DB
// )
// netDir := networkDir(cx.PodConfig.AppDataDir.value, ActiveNet.Params)
// spvdb, e = walletdb.Create("bdb",
// filepath.Join(netDir, "neutrino.db"))
// defer spvdb.Close()
// if e != nil {
// log<-cl.Errorf{"unable to create Neutrino DB: %s", e)
// continue
// }
// chainService, e = neutrino.NewChainService(
// neutrino.Config{
// DataDir: netDir,
// Database: spvdb,
// ChainParams: *ActiveNet.Params,
// ConnectPeers: cx.PodConfig.ConnectPeers,
// AddPeers: cx.PodConfig.AddPeers,
// })
// if e != nil {
// log<-cl.Errorf{"couldn't create Neutrino ChainService: %s", e)
// continue
// }
// chainClient = chain.NewNeutrinoClient(ActiveNet.Params, chainService)
// e = chainClient.Start()
// if e != nil {
// log<-cl.Errorf{"couldn't start Neutrino client: %s", e)
// }
// } else {
var cc *chainclient.RPCClient
T.Ln("starting wallet's ChainClient")
cc, e = StartChainRPC(cx.Config, cx.ActiveNet, certs, cx.KillAll)
if e != nil {
E.Ln(
"unable to open connection to consensus RPC server:", e,
)
continue
}
T.Ln("storing chain client")
cx.ChainClient = cc
cx.ChainClientReady.Q()
chainClient = cc
// Rather than inlining this logic directly into the loader callback, a function
// variable is used to avoid running any of this after the client disconnects by
// setting it to nil. This prevents the callback from associating a wallet
// loaded at a later time with a client that has already disconnected. A mutex
// is used to make this concurrent safe.
associateRPCClient := func(w *Wallet) {
T.Ln("associating chain client")
if w != nil {
w.SynchronizeRPC(chainClient)
}
if legacyServer != nil {
legacyServer.SetChainServer(chainClient)
}
}
T.Ln("adding wallet loader hook to connect to chain")
mu := new(sync.Mutex)
loader.RunAfterLoad(
func(w *Wallet) {
T.Ln("running associate chain client")
mu.Lock()
associate := associateRPCClient
mu.Unlock()
if associate != nil {
associate(w)
T.Ln("wallet is now associated by chain client")
} else {
T.Ln("wallet chain client associate function is nil")
}
},
)
chainClient.WaitForShutdown()
mu.Lock()
associateRPCClient = nil
mu.Unlock()
loadedWallet, ok := loader.LoadedWallet()
if ok {
// Do not attempt a reconnect when the wallet was explicitly stopped.
if loadedWallet.ShuttingDown() {
return
}
loadedWallet.SetChainSynced(false)
// TODO: Rework the wallet so changing the RPC client does not
// require stopping and restarting everything.
loadedWallet.Stop()
loadedWallet.WaitForShutdown()
loadedWallet.Start()
}
}
}
// StartChainRPC opens a RPC client connection to a pod server for blockchain
// services. This function uses the RPC options from the global config and there
// is no recovery in case the server is not available or if there is an
// authentication error. Instead, all requests to the client will simply error.
func StartChainRPC(
config *config.Config,
activeNet *chaincfg.Params,
certs []byte,
quit qu.C,
) (rpcC *chainclient.RPCClient, e error) {
D.F(
"attempting RPC client connection to %v, TLS: %s",
config.RPCConnect.V(),
fmt.Sprint(config.ClientTLS.True()),
)
if rpcC, e = chainclient.NewRPCClient(
activeNet,
config.RPCConnect.V(),
config.Username.V(),
config.Password.V(),
certs,
config.ClientTLS.True(),
0,
quit,
); E.Chk(e) {
return nil, e
}
e = rpcC.Start()
return rpcC, e
}