Files
p9/cmd/wallet/chainntfns.go
Loki Verloren 0e2bba237a initial commit
2021-05-03 10:43:10 +02:00

315 lines
9.2 KiB
Go

package wallet
import (
"bytes"
"github.com/p9c/p9/pkg/btcaddr"
"strings"
"github.com/p9c/p9/pkg/chainclient"
"github.com/p9c/p9/pkg/txscript"
wm "github.com/p9c/p9/pkg/waddrmgr"
"github.com/p9c/p9/pkg/walletdb"
tm "github.com/p9c/p9/pkg/wtxmgr"
)
func (w *Wallet) handleChainNotifications() {
defer w.wg.Done()
if w == nil {
panic("w should not be nil")
}
chainClient, e := w.requireChainClient()
if e != nil {
E.Ln("handleChainNotifications called without RPC client", e)
return
}
sync := func(w *Wallet) {
if w.db != nil {
// At the moment there is no recourse if the rescan fails for some reason, however, the wallet will not be
// marked synced and many methods will error early since the wallet is known to be out of date.
e := w.syncWithChain()
if e != nil && !w.ShuttingDown() {
W.Ln("unable to synchronize wallet to chain:", e)
}
}
}
catchUpHashes := func(
w *Wallet, client chainclient.Interface,
height int32,
) (e error) {
// TODO(aakselrod): There's a race condition here, which happens when a reorg occurs between the rescanProgress
// notification and the last GetBlockHash call. The solution when using pod is to make pod send blockconnected
// notifications with each block the way Neutrino does, and get rid of the loop. The other alternative is to
// check the final hash and, if it doesn't match the original hash returned by the notification, to roll back
// and restart the rescan.
I.F(
"handleChainNotifications: catching up block hashes to height %d, this might take a while", height,
)
e = walletdb.Update(
w.db, func(tx walletdb.ReadWriteTx) (e error) {
ns := tx.ReadWriteBucket(waddrmgrNamespaceKey)
startBlock := w.Manager.SyncedTo()
for i := startBlock.Height + 1; i <= height; i++ {
hash, e := client.GetBlockHash(int64(i))
if e != nil {
return e
}
header, e := chainClient.GetBlockHeader(hash)
if e != nil {
return e
}
bs := wm.BlockStamp{
Height: i,
Hash: *hash,
Timestamp: header.Timestamp,
}
e = w.Manager.SetSyncedTo(ns, &bs)
if e != nil {
return e
}
}
return nil
},
)
if e != nil {
E.F(
"failed to update address manager sync state for height %d: %v",
height, e,
)
}
I.Ln("done catching up block hashes")
return e
}
for {
select {
case n, ok := <-chainClient.Notifications():
if !ok {
return
}
var notificationName string
var e error
switch n := n.(type) {
case chainclient.ClientConnected:
if w != nil {
go sync(w)
}
case chainclient.BlockConnected:
e = walletdb.Update(
w.db, func(tx walletdb.ReadWriteTx) (e error) {
return w.connectBlock(tx, tm.BlockMeta(n))
},
)
notificationName = "blockconnected"
case chainclient.BlockDisconnected:
e = walletdb.Update(
w.db, func(tx walletdb.ReadWriteTx) (e error) {
return w.disconnectBlock(tx, tm.BlockMeta(n))
},
)
notificationName = "blockdisconnected"
case chainclient.RelevantTx:
e = walletdb.Update(
w.db, func(tx walletdb.ReadWriteTx) (e error) {
return w.addRelevantTx(tx, n.TxRecord, n.Block)
},
)
notificationName = "recvtx/redeemingtx"
case chainclient.FilteredBlockConnected:
// Atomically update for the whole block.
if len(n.RelevantTxs) > 0 {
e = walletdb.Update(
w.db, func(
tx walletdb.ReadWriteTx,
) (e error) {
for _, rec := range n.RelevantTxs {
e = w.addRelevantTx(
tx, rec,
n.Block,
)
if e != nil {
return e
}
}
return nil
},
)
}
notificationName = "filteredblockconnected"
// The following require some database maintenance, but also need to be reported to the wallet's rescan
// goroutine.
case *chainclient.RescanProgress:
e = catchUpHashes(w, chainClient, n.Height)
notificationName = "rescanprogress"
select {
case w.rescanNotifications <- n:
case <-w.quitChan().Wait():
return
}
case *chainclient.RescanFinished:
e = catchUpHashes(w, chainClient, n.Height)
notificationName = "rescanprogress"
w.SetChainSynced(true)
select {
case w.rescanNotifications <- n:
case <-w.quitChan().Wait():
return
}
}
if e != nil {
// On out-of-sync blockconnected notifications, only send a debug message.
errStr := "failed to process consensus server " +
"notification (name: `%s`, detail: `%v`)"
if notificationName == "blockconnected" &&
strings.Contains(
e.Error(),
"couldn't get hash from database",
) {
D.F(errStr, notificationName, e)
} else {
E.F(errStr, notificationName, e)
}
}
case <-w.quit.Wait():
return
}
}
}
// connectBlock handles a chain server notification by marking a wallet that's currently in-sync with the chain server
// as being synced up to the passed block.
func (w *Wallet) connectBlock(dbtx walletdb.ReadWriteTx, b tm.BlockMeta) (e error) {
addrmgrNs := dbtx.ReadWriteBucket(waddrmgrNamespaceKey)
bs := wm.BlockStamp{
Height: b.Height,
Hash: b.Hash,
Timestamp: b.Time,
}
e = w.Manager.SetSyncedTo(addrmgrNs, &bs)
if e != nil {
return e
}
// Notify interested clients of the connected block.
//
// TODO: move all notifications outside of the database transaction.
w.NtfnServer.notifyAttachedBlock(dbtx, &b)
return nil
}
// disconnectBlock handles a chain server reorganize by rolling back all block history from the reorged block for a
// wallet in-sync with the chain server.
func (w *Wallet) disconnectBlock(dbtx walletdb.ReadWriteTx, b tm.BlockMeta) (e error) {
addrmgrNs := dbtx.ReadWriteBucket(waddrmgrNamespaceKey)
txmgrNs := dbtx.ReadWriteBucket(wtxmgrNamespaceKey)
if !w.ChainSynced() {
return nil
}
// Disconnect the removed block and all blocks after it if we know about the disconnected block. Otherwise, the
// block is in the future.
if b.Height <= w.Manager.SyncedTo().Height {
hash, e := w.Manager.BlockHash(addrmgrNs, b.Height)
if e != nil {
return e
}
if bytes.Equal(hash[:], b.Hash[:]) {
bs := wm.BlockStamp{
Height: b.Height - 1,
}
hash, e = w.Manager.BlockHash(addrmgrNs, bs.Height)
if e != nil {
return e
}
b.Hash = *hash
client := w.ChainClient()
header, e := client.GetBlockHeader(hash)
if e != nil {
return e
}
bs.Timestamp = header.Timestamp
e = w.Manager.SetSyncedTo(addrmgrNs, &bs)
if e != nil {
return e
}
e = w.TxStore.Rollback(txmgrNs, b.Height)
if e != nil {
return e
}
}
}
// Notify interested clients of the disconnected block.
w.NtfnServer.notifyDetachedBlock(&b.Hash)
return nil
}
func (w *Wallet) addRelevantTx(dbtx walletdb.ReadWriteTx, rec *tm.TxRecord, block *tm.BlockMeta) (e error) {
addrmgrNs := dbtx.ReadWriteBucket(waddrmgrNamespaceKey)
txmgrNs := dbtx.ReadWriteBucket(wtxmgrNamespaceKey)
// At the moment all notified transactions are assumed to actually be relevant. This assumption will not hold true
// when SPV support is added, but until then, simply insert the transaction because there should either be one or
// more relevant inputs or outputs.
e = w.TxStore.InsertTx(txmgrNs, rec, block)
if e != nil {
return e
}
// Chk every output to determine whether it is controlled by a wallet key. If so, mark the output as a credit.
for i, output := range rec.MsgTx.TxOut {
var addrs []btcaddr.Address
_, addrs, _, e = txscript.ExtractPkScriptAddrs(
output.PkScript,
w.chainParams,
)
if e != nil {
// Non-standard outputs are skipped.
continue
}
for _, addr := range addrs {
ma, e := w.Manager.Address(addrmgrNs, addr)
if e == nil {
// TODO: Credits should be added with the account they belong to, so tm is able to track per-account
// balances.
e = w.TxStore.AddCredit(
txmgrNs, rec, block, uint32(i),
ma.Internal(),
)
if e != nil {
return e
}
e = w.Manager.MarkUsed(addrmgrNs, addr)
if e != nil {
return e
}
T.Ln("marked address used:", addr)
continue
}
// Missing addresses are skipped. Other errors should be propagated.
if !wm.IsError(e, wm.ErrAddressNotFound) {
return e
}
}
}
// Send notification of mined or unmined transaction to any interested clients.
//
// TODO: Avoid the extra db hits.
if block == nil {
details, e := w.TxStore.UniqueTxDetails(txmgrNs, &rec.Hash, nil)
if e != nil {
E.Ln("cannot query transaction details for notification:", e)
}
// It's possible that the transaction was not found within the wallet's set of unconfirmed transactions due to
// it already being confirmed, so we'll avoid notifying it.
//
// TODO(wilmer): ideally we should find the culprit to why we're receiving an additional unconfirmed
// chain.RelevantTx notification from the chain backend.
if details != nil {
w.NtfnServer.notifyUnminedTransaction(dbtx, details)
}
} else {
details, e := w.TxStore.UniqueTxDetails(txmgrNs, &rec.Hash, &block.Block)
if e != nil {
E.Ln("cannot query transaction details for notification:", e)
}
// We'll only notify the transaction if it was found within the wallet's set of confirmed transactions.
if details != nil {
w.NtfnServer.notifyMinedTransaction(dbtx, details, block)
}
}
return nil
}