diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 21140994..134c591b 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -76,6 +76,16 @@ type BitcoindNotifier struct { bestBlock chainntnfs.BlockEpoch + // spendHintCache is a cache used to query and update the latest height + // hints for an outpoint. Each height hint represents the earliest + // height at which the outpoint could have been spent within the chain. + spendHintCache chainntnfs.SpendHintCache + + // confirmHintCache is a cache used to query the latest height hints for + // a transaction. Each height hint represents the earliest height at + // which the transaction could have confirmed within the chain. + confirmHintCache chainntnfs.ConfirmHintCache + wg sync.WaitGroup quit chan struct{} } @@ -87,7 +97,9 @@ var _ chainntnfs.ChainNotifier = (*BitcoindNotifier)(nil) // New returns a new BitcoindNotifier instance. This function assumes the // bitcoind node detailed in the passed configuration is already running, and // willing to accept RPC requests and new zmq clients. -func New(chainConn *chain.BitcoindConn) *BitcoindNotifier { +func New(chainConn *chain.BitcoindConn, spendHintCache chainntnfs.SpendHintCache, + confirmHintCache chainntnfs.ConfirmHintCache) *BitcoindNotifier { + notifier := &BitcoindNotifier{ notificationCancels: make(chan interface{}), notificationRegistry: make(chan interface{}), @@ -96,6 +108,9 @@ func New(chainConn *chain.BitcoindConn) *BitcoindNotifier { spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), + spendHintCache: spendHintCache, + confirmHintCache: confirmHintCache, + quit: make(chan struct{}), } @@ -127,7 +142,8 @@ func (b *BitcoindNotifier) Start() error { } b.txConfNotifier = chainntnfs.NewTxConfNotifier( - uint32(currentHeight), reorgSafetyLimit) + uint32(currentHeight), reorgSafetyLimit, b.confirmHintCache, + ) b.bestBlock = chainntnfs.BlockEpoch{ Height: currentHeight, @@ -571,9 +587,6 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err return fmt.Errorf("unable to get block: %v", err) } - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - block.Height, block.Hash) - txns := btcutil.NewBlock(rawBlock).Transactions() err = b.txConfNotifier.ConnectTip( block.Hash, uint32(block.Height), txns) @@ -581,6 +594,9 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err return fmt.Errorf("unable to connect tip: %v", err) } + chainntnfs.Log.Infof("New block: height=%v, sha=%v", block.Height, + block.Hash) + // We want to set the best block before dispatching notifications so // if any subscribers make queries based on their received block epoch, // our state is fully updated in time. @@ -588,6 +604,26 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err b.notifyBlockEpochs(block.Height, block.Hash) + // Finally, we'll update the spend height hint for all of our watched + // outpoints that have not been spent yet. This is safe to do as we do + // not watch already spent outpoints for spend notifications. + ops := make([]wire.OutPoint, 0, len(b.spendNotifications)) + for op := range b.spendNotifications { + ops = append(ops, op) + } + + if len(ops) > 0 { + err := b.spendHintCache.CommitSpendHint( + uint32(block.Height), ops..., + ) + if err != nil { + // The error is not fatal, so we should not return an + // error to the caller. + chainntnfs.Log.Errorf("Unable to update spend hint to "+ + "%d for %v: %v", block.Height, ops, err) + } + } + return nil } @@ -646,6 +682,18 @@ type spendCancel struct { func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) { + // Before proceeding to register the notification, we'll query our + // height hint cache to determine whether a better one exists. + if hint, err := b.spendHintCache.QuerySpendHint(*outpoint); err == nil { + if hint > heightHint { + chainntnfs.Log.Debugf("Using height hint %d retrieved "+ + "from cache for %v", hint, outpoint) + heightHint = hint + } + } + + // Construct a notification request for the outpoint and send it to the + // main event loop. ntfn := &spendNotification{ targetOutpoint: outpoint, spendChan: make(chan *chainntnfs.SpendDetail, 1), @@ -672,7 +720,20 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, return nil, err } - if txOut == nil { + // If the output is unspent, then we'll write it to the cache with the + // given height hint. This allows us to increase the height hint as the + // chain extends and the output remains unspent. + if txOut != nil { + err := b.spendHintCache.CommitSpendHint(heightHint, *outpoint) + if err != nil { + // The error is not fatal, so we should not return an + // error to the caller. + chainntnfs.Log.Error("Unable to update spend hint to "+ + "%d for %v: %v", heightHint, *outpoint, err) + } + } else { + // Otherwise, we'll determine when the output was spent. + // // First, we'll attempt to retrieve the transaction's block hash // using the backend's transaction index. tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash) @@ -702,7 +763,9 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, int64(heightHint), ) if err != nil { - return nil, err + return nil, fmt.Errorf("unable to retrieve "+ + "hash for block with height %d: %v", + heightHint, err) } } @@ -794,11 +857,13 @@ func (b *BitcoindNotifier) dispatchSpendDetailsManually(op wire.OutPoint, blockHash, err := b.chainConn.GetBlockHash(int64(height)) if err != nil { - return err + return fmt.Errorf("unable to retrieve hash for block "+ + "with height %d: %v", height, err) } block, err := b.chainConn.GetBlock(blockHash) if err != nil { - return err + return fmt.Errorf("unable to retrieve block with hash "+ + "%v: %v", blockHash, err) } for _, tx := range block.Transactions { @@ -853,6 +918,18 @@ type confirmationNotification struct { func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, _ []byte, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { + // Before proceeding to register the notification, we'll query our + // height hint cache to determine whether a better one exists. + if hint, err := b.confirmHintCache.QueryConfirmHint(*txid); err == nil { + if hint > heightHint { + chainntnfs.Log.Debugf("Using height hint %d retrieved "+ + "from cache for %v", hint, txid) + heightHint = hint + } + } + + // Construct a notification request for the transaction and send it to + // the main event loop. ntfn := &confirmationNotification{ ConfNtfn: chainntnfs.ConfNtfn{ ConfID: atomic.AddUint64(&b.confClientCounter, 1), diff --git a/chainntnfs/bitcoindnotify/bitcoind_debug.go b/chainntnfs/bitcoindnotify/bitcoind_debug.go index 33d1aa4d..85842ca5 100644 --- a/chainntnfs/bitcoindnotify/bitcoind_debug.go +++ b/chainntnfs/bitcoindnotify/bitcoind_debug.go @@ -29,7 +29,8 @@ func (b *BitcoindNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Has } b.txConfNotifier = chainntnfs.NewTxConfNotifier( - uint32(bestHeight), reorgSafetyLimit) + uint32(bestHeight), reorgSafetyLimit, b.confirmHintCache, + ) if generateBlocks != nil { // Ensure no block notifications are pending when we start the diff --git a/chainntnfs/bitcoindnotify/driver.go b/chainntnfs/bitcoindnotify/driver.go index b27d2c64..d6ef3705 100644 --- a/chainntnfs/bitcoindnotify/driver.go +++ b/chainntnfs/bitcoindnotify/driver.go @@ -1,6 +1,7 @@ package bitcoindnotify import ( + "errors" "fmt" "github.com/btcsuite/btcwallet/chain" @@ -10,18 +11,30 @@ import ( // createNewNotifier creates a new instance of the ChainNotifier interface // implemented by BitcoindNotifier. func createNewNotifier(args ...interface{}) (chainntnfs.ChainNotifier, error) { - if len(args) != 1 { + if len(args) != 3 { return nil, fmt.Errorf("incorrect number of arguments to "+ - ".New(...), expected 1, instead passed %v", len(args)) + ".New(...), expected 2, instead passed %v", len(args)) } chainConn, ok := args[0].(*chain.BitcoindConn) if !ok { - return nil, fmt.Errorf("first argument to bitcoindnotify.New " + + return nil, errors.New("first argument to bitcoindnotify.New " + "is incorrect, expected a *chain.BitcoindConn") } - return New(chainConn), nil + spendHintCache, ok := args[1].(chainntnfs.SpendHintCache) + if !ok { + return nil, errors.New("second argument to bitcoindnotify.New " + + "is incorrect, expected a chainntnfs.SpendHintCache") + } + + confirmHintCache, ok := args[2].(chainntnfs.ConfirmHintCache) + if !ok { + return nil, errors.New("third argument to bitcoindnotify.New " + + "is incorrect, expected a chainntnfs.ConfirmHintCache") + } + + return New(chainConn, spendHintCache, confirmHintCache), nil } // init registers a driver for the BtcdNotifier concrete implementation of the diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index c7cb6995..22febdd2 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -83,6 +83,16 @@ type BtcdNotifier struct { chainUpdates *chainntnfs.ConcurrentQueue txUpdates *chainntnfs.ConcurrentQueue + // spendHintCache is a cache used to query and update the latest height + // hints for an outpoint. Each height hint represents the earliest + // height at which the outpoint could have been spent within the chain. + spendHintCache chainntnfs.SpendHintCache + + // confirmHintCache is a cache used to query the latest height hints for + // a transaction. Each height hint represents the earliest height at + // which the transaction could have confirmed within the chain. + confirmHintCache chainntnfs.ConfirmHintCache + wg sync.WaitGroup quit chan struct{} } @@ -93,7 +103,9 @@ var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil) // New returns a new BtcdNotifier instance. This function assumes the btcd node // detailed in the passed configuration is already running, and willing to // accept new websockets clients. -func New(config *rpcclient.ConnConfig) (*BtcdNotifier, error) { +func New(config *rpcclient.ConnConfig, spendHintCache chainntnfs.SpendHintCache, + confirmHintCache chainntnfs.ConfirmHintCache) (*BtcdNotifier, error) { + notifier := &BtcdNotifier{ notificationCancels: make(chan interface{}), notificationRegistry: make(chan interface{}), @@ -105,6 +117,9 @@ func New(config *rpcclient.ConnConfig) (*BtcdNotifier, error) { chainUpdates: chainntnfs.NewConcurrentQueue(10), txUpdates: chainntnfs.NewConcurrentQueue(10), + spendHintCache: spendHintCache, + confirmHintCache: confirmHintCache, + quit: make(chan struct{}), } @@ -150,7 +165,8 @@ func (b *BtcdNotifier) Start() error { } b.txConfNotifier = chainntnfs.NewTxConfNotifier( - uint32(currentHeight), reorgSafetyLimit) + uint32(currentHeight), reorgSafetyLimit, b.confirmHintCache, + ) b.bestBlock = chainntnfs.BlockEpoch{ Height: currentHeight, @@ -646,23 +662,23 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { return fmt.Errorf("unable to get block: %v", err) } - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - epoch.Height, epoch.Hash) - - txns := btcutil.NewBlock(rawBlock).Transactions() - newBlock := &filteredBlock{ hash: *epoch.Hash, height: uint32(epoch.Height), - txns: txns, + txns: btcutil.NewBlock(rawBlock).Transactions(), connect: true, } - err = b.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, - newBlock.txns) + + err = b.txConfNotifier.ConnectTip( + &newBlock.hash, newBlock.height, newBlock.txns, + ) if err != nil { return fmt.Errorf("unable to connect tip: %v", err) } + chainntnfs.Log.Infof("New block: height=%v, sha=%v", epoch.Height, + epoch.Hash) + // We want to set the best block before dispatching notifications // so if any subscribers make queries based on their received // block epoch, our state is fully updated in time. @@ -671,8 +687,8 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { // Next we'll notify any subscribed clients of the block. b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) - // Finally, we'll scan over the list of relevant transactions and - // possibly dispatch notifications for confirmations and spends. + // Scan over the list of relevant transactions and possibly dispatch + // notifications for spends. for _, tx := range newBlock.txns { mtx := tx.MsgTx() txSha := mtx.TxHash() @@ -698,8 +714,10 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { } for _, ntfn := range clients { - chainntnfs.Log.Infof("Dispatching spend notification for "+ - "outpoint=%v", ntfn.targetOutpoint) + chainntnfs.Log.Infof("Dispatching spend "+ + "notification for outpoint=%v", + ntfn.targetOutpoint) + ntfn.spendChan <- spendDetails // Close spendChan to ensure that any calls to @@ -713,6 +731,26 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { } } + // Finally, we'll update the spend height hint for all of our watched + // outpoints that have not been spent yet. This is safe to do as we do + // not watch already spent outpoints for spend notifications. + ops := make([]wire.OutPoint, 0, len(b.spendNotifications)) + for op := range b.spendNotifications { + ops = append(ops, op) + } + + if len(ops) > 0 { + err := b.spendHintCache.CommitSpendHint( + uint32(epoch.Height), ops..., + ) + if err != nil { + // The error is not fatal, so we should not return an + // error to the caller. + chainntnfs.Log.Errorf("Unable to update spend hint to "+ + "%d for %v: %v", epoch.Height, ops, err) + } + } + return nil } @@ -771,6 +809,18 @@ type spendCancel struct { func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) { + // Before proceeding to register the notification, we'll query our + // height hint cache to determine whether a better one exists. + if hint, err := b.spendHintCache.QuerySpendHint(*outpoint); err == nil { + if hint > heightHint { + chainntnfs.Log.Debugf("Using height hint %d retrieved "+ + "from cache for %v", hint, outpoint) + heightHint = hint + } + } + + // Construct a notification request for the outpoint and send it to the + // main event loop. ntfn := &spendNotification{ targetOutpoint: outpoint, spendChan: make(chan *chainntnfs.SpendDetail, 1), @@ -799,7 +849,20 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, return nil, err } - if txOut == nil { + // If the output is unspent, then we'll write it to the cache with the + // given height hint. This allows us to increase the height hint as the + // chain extends and the output remains unspent. + if txOut != nil { + err := b.spendHintCache.CommitSpendHint(heightHint, *outpoint) + if err != nil { + // The error is not fatal, so we should not return an + // error to the caller. + chainntnfs.Log.Error("Unable to update spend hint to "+ + "%d for %v: %v", heightHint, *outpoint, err) + } + } else { + // Otherwise, we'll determine when the output was spent. + // // First, we'll attempt to retrieve the transaction's block hash // using the backend's transaction index. tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash) @@ -900,6 +963,18 @@ type confirmationNotification struct { func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, _ []byte, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { + // Before proceeding to register the notification, we'll query our + // height hint cache to determine whether a better one exists. + if hint, err := b.confirmHintCache.QueryConfirmHint(*txid); err == nil { + if hint > heightHint { + chainntnfs.Log.Debugf("Using height hint %d retrieved "+ + "from cache for %v", hint, txid) + heightHint = hint + } + } + + // Construct a notification request for the transaction and send it to + // the main event loop. ntfn := &confirmationNotification{ ConfNtfn: chainntnfs.ConfNtfn{ ConfID: atomic.AddUint64(&b.confClientCounter, 1), diff --git a/chainntnfs/btcdnotify/btcd_debug.go b/chainntnfs/btcdnotify/btcd_debug.go index 8ddffc39..47136f0b 100644 --- a/chainntnfs/btcdnotify/btcd_debug.go +++ b/chainntnfs/btcdnotify/btcd_debug.go @@ -28,7 +28,8 @@ func (b *BtcdNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Hash, } b.txConfNotifier = chainntnfs.NewTxConfNotifier( - uint32(bestHeight), reorgSafetyLimit) + uint32(bestHeight), reorgSafetyLimit, b.confirmHintCache, + ) b.chainUpdates.Start() b.txUpdates.Start() diff --git a/chainntnfs/btcdnotify/driver.go b/chainntnfs/btcdnotify/driver.go index 19e405a1..1cda9192 100644 --- a/chainntnfs/btcdnotify/driver.go +++ b/chainntnfs/btcdnotify/driver.go @@ -1,6 +1,7 @@ package btcdnotify import ( + "errors" "fmt" "github.com/btcsuite/btcd/rpcclient" @@ -10,18 +11,30 @@ import ( // createNewNotifier creates a new instance of the ChainNotifier interface // implemented by BtcdNotifier. func createNewNotifier(args ...interface{}) (chainntnfs.ChainNotifier, error) { - if len(args) != 1 { - return nil, fmt.Errorf("incorrect number of arguments to .New(...), "+ - "expected 1, instead passed %v", len(args)) + if len(args) != 3 { + return nil, fmt.Errorf("incorrect number of arguments to "+ + ".New(...), expected 2, instead passed %v", len(args)) } config, ok := args[0].(*rpcclient.ConnConfig) if !ok { - return nil, fmt.Errorf("first argument to btcdnotifier.New is " + - "incorrect, expected a *rpcclient.ConnConfig") + return nil, errors.New("first argument to btcdnotifier.New " + + "is incorrect, expected a *rpcclient.ConnConfig") } - return New(config) + spendHintCache, ok := args[1].(chainntnfs.SpendHintCache) + if !ok { + return nil, errors.New("second argument to btcdnotifier.New " + + "is incorrect, expected a chainntnfs.SpendHintCache") + } + + confirmHintCache, ok := args[2].(chainntnfs.ConfirmHintCache) + if !ok { + return nil, errors.New("third argument to btcdnotifier.New " + + "is incorrect, expected a chainntnfs.ConfirmHintCache") + } + + return New(config, spendHintCache, confirmHintCache) } // init registers a driver for the BtcdNotifier concrete implementation of the diff --git a/chainntnfs/height_hint_cache.go b/chainntnfs/height_hint_cache.go new file mode 100644 index 00000000..ba3c2224 --- /dev/null +++ b/chainntnfs/height_hint_cache.go @@ -0,0 +1,297 @@ +package chainntnfs + +import ( + "bytes" + "errors" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + bolt "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/channeldb" +) + +const ( + // dbName is the default name of the database storing the height hints. + dbName = "heighthint.db" + + // dbFilePermission is the default permission of the database file + // storing the height hints. + dbFilePermission = 0600 +) + +var ( + // spendHintBucket is the name of the bucket which houses the height + // hint for outpoints. Each height hint represents the earliest height + // at which its corresponding outpoint could have been spent within. + spendHintBucket = []byte("spend-hints") + + // confirmHintBucket is the name of the bucket which houses the height + // hints for transactions. Each height hint represents the earliest + // height at which its corresponding transaction could have been + // confirmed within. + confirmHintBucket = []byte("confirm-hints") + + // ErrCorruptedHeightHintCache indicates that the on-disk bucketing + // structure has altered since the height hint cache instance was + // initialized. + ErrCorruptedHeightHintCache = errors.New("height hint cache has been " + + "corrupted") + + // ErrSpendHintNotFound is an error returned when a spend hint for an + // outpoint was not found. + ErrSpendHintNotFound = errors.New("spend hint not found") + + // ErrConfirmHintNotFound is an error returned when a confirm hint for a + // transaction was not found. + ErrConfirmHintNotFound = errors.New("confirm hint not found") +) + +// SpendHintCache is an interface whose duty is to cache spend hints for +// outpoints. A spend hint is defined as the earliest height in the chain at +// which an outpoint could have been spent within. +type SpendHintCache interface { + // CommitSpendHint commits a spend hint for the outpoints to the cache. + CommitSpendHint(height uint32, ops ...wire.OutPoint) error + + // QuerySpendHint returns the latest spend hint for an outpoint. + // ErrSpendHintNotFound is returned if a spend hint does not exist + // within the cache for the outpoint. + QuerySpendHint(op wire.OutPoint) (uint32, error) + + // PurgeSpendHint removes the spend hint for the outpoints from the + // cache. + PurgeSpendHint(ops ...wire.OutPoint) error +} + +// ConfirmHintCache is an interface whose duty is to cache confirm hints for +// transactions. A confirm hint is defined as the earliest height in the chain +// at which a transaction could have been included in a block. +type ConfirmHintCache interface { + // CommitConfirmHint commits a confirm hint for the transactions to the + // cache. + CommitConfirmHint(height uint32, txids ...chainhash.Hash) error + + // QueryConfirmHint returns the latest confirm hint for a transaction + // hash. ErrConfirmHintNotFound is returned if a confirm hint does not + // exist within the cache for the transaction hash. + QueryConfirmHint(txid chainhash.Hash) (uint32, error) + + // PurgeConfirmHint removes the confirm hint for the transactions from + // the cache. + PurgeConfirmHint(txids ...chainhash.Hash) error +} + +// HeightHintCache is an implementation of the SpendHintCache and +// ConfirmHintCache interfaces backed by a channeldb DB instance where the hints +// will be stored. +type HeightHintCache struct { + db *channeldb.DB +} + +// Compile-time checks to ensure HeightHintCache satisfies the SpendHintCache +// and ConfirmHintCache interfaces. +var _ SpendHintCache = (*HeightHintCache)(nil) +var _ ConfirmHintCache = (*HeightHintCache)(nil) + +// NewHeightHintCache returns a new height hint cache backed by a database. +func NewHeightHintCache(db *channeldb.DB) (*HeightHintCache, error) { + cache := &HeightHintCache{db} + if err := cache.initBuckets(); err != nil { + return nil, err + } + + return cache, nil +} + +// initBuckets ensures that the primary buckets used by the circuit are +// initialized so that we can assume their existence after startup. +func (c *HeightHintCache) initBuckets() error { + return c.db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(spendHintBucket) + if err != nil { + return err + } + + _, err = tx.CreateBucketIfNotExists(confirmHintBucket) + return err + }) +} + +// CommitSpendHint commits a spend hint for the outpoints to the cache. +func (c *HeightHintCache) CommitSpendHint(height uint32, ops ...wire.OutPoint) error { + Log.Tracef("Updating spend hint to height %d for %v", height, ops) + + return c.db.Batch(func(tx *bolt.Tx) error { + spendHints := tx.Bucket(spendHintBucket) + if spendHints == nil { + return ErrCorruptedHeightHintCache + } + + var hint bytes.Buffer + if err := channeldb.WriteElement(&hint, height); err != nil { + return err + } + + for _, op := range ops { + var outpoint bytes.Buffer + err := channeldb.WriteElement(&outpoint, op) + if err != nil { + return err + } + + err = spendHints.Put(outpoint.Bytes(), hint.Bytes()) + if err != nil { + return err + } + } + + return nil + }) +} + +// QuerySpendHint returns the latest spend hint for an outpoint. +// ErrSpendHintNotFound is returned if a spend hint does not exist within the +// cache for the outpoint. +func (c *HeightHintCache) QuerySpendHint(op wire.OutPoint) (uint32, error) { + var hint uint32 + err := c.db.View(func(tx *bolt.Tx) error { + spendHints := tx.Bucket(spendHintBucket) + if spendHints == nil { + return ErrCorruptedHeightHintCache + } + + var outpoint bytes.Buffer + if err := channeldb.WriteElement(&outpoint, op); err != nil { + return err + } + + spendHint := spendHints.Get(outpoint.Bytes()) + if spendHint == nil { + return ErrSpendHintNotFound + } + + return channeldb.ReadElement(bytes.NewReader(spendHint), &hint) + }) + if err != nil { + return 0, err + } + + return hint, nil +} + +// PurgeSpendHint removes the spend hint for the outpoints from the cache. +func (c *HeightHintCache) PurgeSpendHint(ops ...wire.OutPoint) error { + Log.Tracef("Removing spend hints for %v", ops) + + return c.db.Batch(func(tx *bolt.Tx) error { + spendHints := tx.Bucket(spendHintBucket) + if spendHints == nil { + return ErrCorruptedHeightHintCache + } + + for _, op := range ops { + var outpoint bytes.Buffer + err := channeldb.WriteElement(&outpoint, op) + if err != nil { + return err + } + + err = spendHints.Delete(outpoint.Bytes()) + if err != nil { + return err + } + } + + return nil + }) +} + +// CommitConfirmHint commits a confirm hint for the transactions to the cache. +func (c *HeightHintCache) CommitConfirmHint(height uint32, txids ...chainhash.Hash) error { + Log.Tracef("Updating confirm hints to height %d for %v", height, txids) + + return c.db.Batch(func(tx *bolt.Tx) error { + confirmHints := tx.Bucket(confirmHintBucket) + if confirmHints == nil { + return ErrCorruptedHeightHintCache + } + + var hint bytes.Buffer + if err := channeldb.WriteElement(&hint, height); err != nil { + return err + } + + for _, txid := range txids { + var txHash bytes.Buffer + err := channeldb.WriteElement(&txHash, txid) + if err != nil { + return err + } + + err = confirmHints.Put(txHash.Bytes(), hint.Bytes()) + if err != nil { + return err + } + } + + return nil + }) +} + +// QueryConfirmHint returns the latest confirm hint for a transaction hash. +// ErrConfirmHintNotFound is returned if a confirm hint does not exist within +// the cache for the transaction hash. +func (c *HeightHintCache) QueryConfirmHint(txid chainhash.Hash) (uint32, error) { + var hint uint32 + err := c.db.View(func(tx *bolt.Tx) error { + confirmHints := tx.Bucket(confirmHintBucket) + if confirmHints == nil { + return ErrCorruptedHeightHintCache + } + + var txHash bytes.Buffer + if err := channeldb.WriteElement(&txHash, txid); err != nil { + return err + } + + confirmHint := confirmHints.Get(txHash.Bytes()) + if confirmHint == nil { + return ErrConfirmHintNotFound + } + + return channeldb.ReadElement(bytes.NewReader(confirmHint), &hint) + }) + if err != nil { + return 0, err + } + + return hint, nil +} + +// PurgeConfirmHint removes the confirm hint for the transactions from the +// cache. +func (c *HeightHintCache) PurgeConfirmHint(txids ...chainhash.Hash) error { + Log.Tracef("Removing confirm hints for %v", txids) + + return c.db.Batch(func(tx *bolt.Tx) error { + confirmHints := tx.Bucket(confirmHintBucket) + if confirmHints == nil { + return ErrCorruptedHeightHintCache + } + + for _, txid := range txids { + var txHash bytes.Buffer + err := channeldb.WriteElement(&txHash, txid) + if err != nil { + return err + } + + err = confirmHints.Delete(txHash.Bytes()) + if err != nil { + return err + } + } + + return nil + }) +} diff --git a/chainntnfs/height_hint_cache_test.go b/chainntnfs/height_hint_cache_test.go new file mode 100644 index 00000000..f444b18d --- /dev/null +++ b/chainntnfs/height_hint_cache_test.go @@ -0,0 +1,148 @@ +package chainntnfs + +import ( + "bytes" + "io/ioutil" + "testing" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/channeldb" +) + +func initHintCache(t *testing.T) *HeightHintCache { + t.Helper() + + tempDir, err := ioutil.TempDir("", "kek") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + db, err := channeldb.Open(tempDir) + if err != nil { + t.Fatalf("unable to create db: %v", err) + } + hintCache, err := NewHeightHintCache(db) + if err != nil { + t.Fatalf("unable to create hint cache: %v", err) + } + + return hintCache +} + +// TestHeightHintCacheConfirms ensures that the height hint cache properly +// caches confirm hints for transactions. +func TestHeightHintCacheConfirms(t *testing.T) { + t.Parallel() + + hintCache := initHintCache(t) + + // Querying for a transaction hash not found within the cache should + // return an error indication so. + var unknownHash chainhash.Hash + _, err := hintCache.QueryConfirmHint(unknownHash) + if err != ErrConfirmHintNotFound { + t.Fatalf("expected ErrConfirmHintNotFound, got: %v", err) + } + + // Now, we'll create some transaction hashes and commit them to the + // cache with the same confirm hint. + const height = 100 + const numHashes = 5 + txHashes := make([]chainhash.Hash, numHashes) + for i := 0; i < numHashes; i++ { + var txHash chainhash.Hash + copy(txHash[:], bytes.Repeat([]byte{byte(i)}, 32)) + txHashes[i] = txHash + } + + if err := hintCache.CommitConfirmHint(height, txHashes...); err != nil { + t.Fatalf("unable to add entries to cache: %v", err) + } + + // With the hashes committed, we'll now query the cache to ensure that + // we're able to properly retrieve the confirm hints. + for _, txHash := range txHashes { + confirmHint, err := hintCache.QueryConfirmHint(txHash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if confirmHint != height { + t.Fatalf("expected confirm hint %d, got %d", height, + confirmHint) + } + } + + // We'll also attempt to purge all of them in a single database + // transaction. + if err := hintCache.PurgeConfirmHint(txHashes...); err != nil { + t.Fatalf("unable to remove confirm hints: %v", err) + } + + // Finally, we'll attempt to query for each hash. We should expect not + // to find a hint for any of them. + for _, txHash := range txHashes { + _, err := hintCache.QueryConfirmHint(txHash) + if err != ErrConfirmHintNotFound { + t.Fatalf("expected ErrConfirmHintNotFound, got :%v", err) + } + } +} + +// TestHeightHintCacheSpends ensures that the height hint cache properly caches +// spend hints for outpoints. +func TestHeightHintCacheSpends(t *testing.T) { + t.Parallel() + + hintCache := initHintCache(t) + + // Querying for an outpoint not found within the cache should return an + // error indication so. + var unknownOutPoint wire.OutPoint + _, err := hintCache.QuerySpendHint(unknownOutPoint) + if err != ErrSpendHintNotFound { + t.Fatalf("expected ErrSpendHintNotFound, got: %v", err) + } + + // Now, we'll create some outpoints and commit them to the cache with + // the same spend hint. + const height = 100 + const numOutpoints = 5 + var txHash chainhash.Hash + copy(txHash[:], bytes.Repeat([]byte{0xFF}, 32)) + outpoints := make([]wire.OutPoint, numOutpoints) + for i := uint32(0); i < numOutpoints; i++ { + outpoints[i] = wire.OutPoint{Hash: txHash, Index: i} + } + + if err := hintCache.CommitSpendHint(height, outpoints...); err != nil { + t.Fatalf("unable to add entry to cache: %v", err) + } + + // With the outpoints committed, we'll now query the cache to ensure + // that we're able to properly retrieve the confirm hints. + for _, op := range outpoints { + spendHint, err := hintCache.QuerySpendHint(op) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if spendHint != height { + t.Fatalf("expected spend hint %d, got %d", height, + spendHint) + } + } + + // We'll also attempt to purge all of them in a single database + // transaction. + if err := hintCache.PurgeSpendHint(outpoints...); err != nil { + t.Fatalf("unable to remove spend hint: %v", err) + } + + // Finally, we'll attempt to query for each outpoint. We should expect + // not to find a hint for any of them. + for _, op := range outpoints { + _, err = hintCache.QuerySpendHint(op) + if err != ErrSpendHintNotFound { + t.Fatalf("expected ErrSpendHintNotFound, got: %v", err) + } + } +} diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index dade94ed..242e9ed9 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -26,6 +26,7 @@ import ( "github.com/btcsuite/btcwallet/walletdb" "github.com/lightninglabs/neutrino" "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/channeldb" // Required to auto-register the bitcoind backed ChainNotifier // implementation. @@ -1741,10 +1742,22 @@ func TestInterfaces(t *testing.T) { newNotifier func() (chainntnfs.TestChainNotifier, error) ) for _, notifierDriver := range chainntnfs.RegisteredNotifiers() { + // Initialize a height hint cache for each notifier. + tempDir, err := ioutil.TempDir("", "channeldb") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + db, err := channeldb.Open(tempDir) + if err != nil { + t.Fatalf("unable to create db: %v", err) + } + hintCache, err := chainntnfs.NewHeightHintCache(db) + if err != nil { + t.Fatalf("unable to create height hint cache: %v", err) + } + notifierType := notifierDriver.NotifierType - switch notifierType { - case "bitcoind": // Start a bitcoind instance. tempBitcoindDir, err := ioutil.TempDir("", "bitcoind") @@ -1807,12 +1820,16 @@ func TestInterfaces(t *testing.T) { cleanUp = cleanUp3 newNotifier = func() (chainntnfs.TestChainNotifier, error) { - return bitcoindnotify.New(chainConn), nil + return bitcoindnotify.New( + chainConn, hintCache, hintCache, + ), nil } case "btcd": newNotifier = func() (chainntnfs.TestChainNotifier, error) { - return btcdnotify.New(&rpcConfig) + return btcdnotify.New( + &rpcConfig, hintCache, hintCache, + ) } cleanUp = func() {} @@ -1855,7 +1872,9 @@ func TestInterfaces(t *testing.T) { time.Sleep(time.Millisecond * 100) } newNotifier = func() (chainntnfs.TestChainNotifier, error) { - return neutrinonotify.New(spvNode) + return neutrinonotify.New( + spvNode, hintCache, hintCache, + ) } } diff --git a/chainntnfs/neutrinonotify/driver.go b/chainntnfs/neutrinonotify/driver.go index c9dc1b36..6a6ffd8e 100644 --- a/chainntnfs/neutrinonotify/driver.go +++ b/chainntnfs/neutrinonotify/driver.go @@ -1,6 +1,7 @@ package neutrinonotify import ( + "errors" "fmt" "github.com/lightninglabs/neutrino" @@ -10,18 +11,30 @@ import ( // createNewNotifier creates a new instance of the ChainNotifier interface // implemented by NeutrinoNotifier. func createNewNotifier(args ...interface{}) (chainntnfs.ChainNotifier, error) { - if len(args) != 1 { - return nil, fmt.Errorf("incorrect number of arguments to .New(...), "+ - "expected 1, instead passed %v", len(args)) + if len(args) != 2 { + return nil, fmt.Errorf("incorrect number of arguments to "+ + ".New(...), expected 2, instead passed %v", len(args)) } config, ok := args[0].(*neutrino.ChainService) if !ok { - return nil, fmt.Errorf("first argument to neutrinonotify.New is " + - "incorrect, expected a *neutrino.ChainService") + return nil, errors.New("first argument to neutrinonotify.New " + + "is incorrect, expected a *neutrino.ChainService") } - return New(config) + spendHintCache, ok := args[1].(chainntnfs.SpendHintCache) + if !ok { + return nil, errors.New("second argument to neutrinonotify.New " + + "is incorrect, expected a chainntfs.SpendHintCache") + } + + confirmHintCache, ok := args[2].(chainntnfs.ConfirmHintCache) + if !ok { + return nil, errors.New("third argument to neutrinonotify.New " + + "is incorrect, expected a chainntfs.ConfirmHintCache") + } + + return New(config, spendHintCache, confirmHintCache) } // init registers a driver for the NeutrinoNotify concrete implementation of diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 07e297a6..5fb4d2a3 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -77,6 +77,16 @@ type NeutrinoNotifier struct { chainUpdates *chainntnfs.ConcurrentQueue + // spendHintCache is a cache used to query and update the latest height + // hints for an outpoint. Each height hint represents the earliest + // height at which the outpoint could have been spent within the chain. + spendHintCache chainntnfs.SpendHintCache + + // confirmHintCache is a cache used to query the latest height hints for + // a transaction. Each height hint represents the earliest height at + // which the transaction could have confirmed within the chain. + confirmHintCache chainntnfs.ConfirmHintCache + wg sync.WaitGroup quit chan struct{} } @@ -89,7 +99,9 @@ var _ chainntnfs.ChainNotifier = (*NeutrinoNotifier)(nil) // // NOTE: The passed neutrino node should already be running and active before // being passed into this function. -func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) { +func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache, + confirmHintCache chainntnfs.ConfirmHintCache) (*NeutrinoNotifier, error) { + notifier := &NeutrinoNotifier{ notificationCancels: make(chan interface{}), notificationRegistry: make(chan interface{}), @@ -104,6 +116,9 @@ func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) { chainUpdates: chainntnfs.NewConcurrentQueue(10), + spendHintCache: spendHintCache, + confirmHintCache: confirmHintCache, + quit: make(chan struct{}), } @@ -150,7 +165,7 @@ func (n *NeutrinoNotifier) Start() error { } n.txConfNotifier = chainntnfs.NewTxConfNotifier( - bestHeight, reorgSafetyLimit, + bestHeight, reorgSafetyLimit, n.confirmHintCache, ) n.chainConn = &NeutrinoChainConn{n.p2pNode} @@ -358,6 +373,7 @@ out: rescanUpdate := []neutrino.UpdateOption{ neutrino.AddAddrs(addrs...), neutrino.Rewind(currentHeight), + neutrino.DisableDisconnectedNtfns(true), } err = n.chainView.Update(rescanUpdate...) if err != nil { @@ -588,22 +604,23 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { // First process the block for our internal state. A new block has // been connected to the main chain. Send out any N confirmation // notifications which may have been triggered by this new block. - err := n.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, - newBlock.txns) + err := n.txConfNotifier.ConnectTip( + &newBlock.hash, newBlock.height, newBlock.txns, + ) if err != nil { return fmt.Errorf("unable to connect tip: %v", err) } - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - newBlock.height, newBlock.hash) + chainntnfs.Log.Infof("New block: height=%v, sha=%v", newBlock.height, + newBlock.hash) n.bestHeight = newBlock.height // Next, notify any subscribed clients of the block. n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) - // Finally, we'll scan over the list of relevant transactions and - // possibly dispatch notifications for confirmations and spends. + // Scan over the list of relevant transactions and possibly dispatch + // notifications for spends. for _, tx := range newBlock.txns { mtx := tx.MsgTx() txSha := mtx.TxHash() @@ -646,6 +663,24 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { } } + // Finally, we'll update the spend height hint for all of our watched + // outpoints that have not been spent yet. This is safe to do as we do + // not watch already spent outpoints for spend notifications. + ops := make([]wire.OutPoint, 0, len(n.spendNotifications)) + for op := range n.spendNotifications { + ops = append(ops, op) + } + + if len(ops) > 0 { + err := n.spendHintCache.CommitSpendHint(newBlock.height, ops...) + if err != nil { + // The error is not fatal, so we should not return an + // error to the caller. + chainntnfs.Log.Errorf("Unable to update spend hint to "+ + "%d for %v: %v", newBlock.height, ops, err) + } + } + return nil } @@ -725,15 +760,26 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, currentHeight := n.bestHeight n.heightMtx.RUnlock() - chainntnfs.Log.Infof("New spend notification for outpoint=%v, "+ - "height_hint=%v", outpoint, heightHint) + // Before proceeding to register the notification, we'll query our + // height hint cache to determine whether a better one exists. + if hint, err := n.spendHintCache.QuerySpendHint(*outpoint); err == nil { + if hint > heightHint { + chainntnfs.Log.Debugf("Using height hint %d retrieved "+ + "from cache for %v", hint, outpoint) + heightHint = hint + } + } + // Construct a notification request for the outpoint. We'll defer + // sending it to the main event loop until after we've guaranteed that + // the outpoint has not been spent. ntfn := &spendNotification{ targetOutpoint: outpoint, spendChan: make(chan *chainntnfs.SpendDetail, 1), spendID: atomic.AddUint64(&n.spendClientCounter, 1), heightHint: heightHint, } + spendEvent := &chainntnfs.SpendEvent{ Spend: ntfn.spendChan, Cancel: func() { @@ -745,8 +791,9 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // Submit spend cancellation to notification dispatcher. select { case n.notificationCancels <- cancel: - // Cancellation is being handled, drain the spend chan until it is - // closed before yielding to the caller. + // Cancellation is being handled, drain the + // spend chan until it is closed before yielding + // to the caller. for { select { case _, ok := <-ntfn.spendChan: @@ -824,6 +871,7 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, rescanUpdate := []neutrino.UpdateOption{ neutrino.AddInputs(inputToWatch), neutrino.Rewind(currentHeight), + neutrino.DisableDisconnectedNtfns(true), } if err := n.chainView.Update(rescanUpdate...); err != nil { @@ -836,6 +884,16 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, return nil, ErrChainNotifierShuttingDown } + // Finally, we'll add a spent hint with the current height to the cache + // in order to better keep track of when this outpoint is spent. + err = n.spendHintCache.CommitSpendHint(currentHeight, *outpoint) + if err != nil { + // The error is not fatal, so we should not return an error to + // the caller. + chainntnfs.Log.Errorf("Unable to update spend hint to %d for "+ + "%v: %v", currentHeight, outpoint, err) + } + return spendEvent, nil } @@ -854,6 +912,18 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, pkScript []byte, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { + // Before proceeding to register the notification, we'll query our + // height hint cache to determine whether a better one exists. + if hint, err := n.confirmHintCache.QueryConfirmHint(*txid); err == nil { + if hint > heightHint { + chainntnfs.Log.Debugf("Using height hint %d retrieved "+ + "from cache for %v", hint, txid) + heightHint = hint + } + } + + // Construct a notification request for the transaction and send it to + // the main event loop. ntfn := &confirmationsNotification{ ConfNtfn: chainntnfs.ConfNtfn{ ConfID: atomic.AddUint64(&n.confClientCounter, 1), diff --git a/chainntnfs/neutrinonotify/neutrino_debug.go b/chainntnfs/neutrinonotify/neutrino_debug.go index 56724c4f..ece4af27 100644 --- a/chainntnfs/neutrinonotify/neutrino_debug.go +++ b/chainntnfs/neutrinonotify/neutrino_debug.go @@ -51,7 +51,8 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Has } n.txConfNotifier = chainntnfs.NewTxConfNotifier( - uint32(bestHeight), reorgSafetyLimit) + uint32(bestHeight), reorgSafetyLimit, n.confirmHintCache, + ) n.chainConn = &NeutrinoChainConn{n.p2pNode} diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index bb049b8a..ff97b23e 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -89,6 +89,11 @@ type TxConfNotifier struct { // at which the transaction will have sufficient confirmations. ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{} + // hintCache is a cache used to maintain the latest height hints for + // transactions. Each height hint represents the earliest height at + // which the transactions could have been confirmed within the chain. + hintCache ConfirmHintCache + // quit is closed in order to signal that the notifier is gracefully // exiting. quit chan struct{} @@ -98,13 +103,16 @@ type TxConfNotifier struct { // NewTxConfNotifier creates a TxConfNotifier. The current height of the // blockchain is accepted as a parameter. -func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotifier { +func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32, + hintCache ConfirmHintCache) *TxConfNotifier { + return &TxConfNotifier{ currentHeight: startHeight, reorgSafetyLimit: reorgSafetyLimit, confNotifications: make(map[chainhash.Hash]map[uint64]*ConfNtfn), txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}), ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), + hintCache: hintCache, quit: make(chan struct{}), } } @@ -130,6 +138,16 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) error { if !ok { ntfns = make(map[uint64]*ConfNtfn) tcn.confNotifications[*ntfn.TxID] = ntfns + + err := tcn.hintCache.CommitConfirmHint( + tcn.currentHeight, *ntfn.TxID, + ) + if err != nil { + // The error is not fatal, so we should not return an + // error to the caller. + Log.Errorf("Unable to update confirm hint to %d for "+ + "%v: %v", tcn.currentHeight, *ntfn.TxID, err) + } } ntfns[ntfn.ConfID] = ntfn @@ -175,6 +193,14 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, return nil } + err := tcn.hintCache.CommitConfirmHint(details.BlockHeight, txid) + if err != nil { + // The error is not fatal, so we should not return an error to + // the caller. + Log.Errorf("Unable to update confirm hint to %d for %v: %v", + details.BlockHeight, txid, err) + } + // The notifier has yet to reach the height at which the transaction was // included in a block, so we should defer until handling it then within // ConnectTip. @@ -297,6 +323,48 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, } } + // In order to update the height hint for all the required transactions + // under one database transaction, we'll gather the set of unconfirmed + // transactions along with the ones that confirmed at the current + // height. To do so, we'll iterate over the confNotifications map, which + // contains the transactions we currently have notifications for. Since + // this map doesn't tell us whether the transaction hsa confirmed or + // not, we'll need to look at txsByInitialHeight to determine so. + var txsToUpdateHints []chainhash.Hash + for confirmedTx := range tcn.txsByInitialHeight[tcn.currentHeight] { + txsToUpdateHints = append(txsToUpdateHints, confirmedTx) + } +out: + for maybeUnconfirmedTx := range tcn.confNotifications { + for height, confirmedTxs := range tcn.txsByInitialHeight { + // Skip the transactions that confirmed at the new block + // height as those have already been added. + if height == blockHeight { + continue + } + + // If the transaction was found within the set of + // confirmed transactions at this height, we'll skip it. + if _, ok := confirmedTxs[maybeUnconfirmedTx]; ok { + continue out + } + } + txsToUpdateHints = append(txsToUpdateHints, maybeUnconfirmedTx) + } + + if len(txsToUpdateHints) > 0 { + err := tcn.hintCache.CommitConfirmHint( + tcn.currentHeight, txsToUpdateHints..., + ) + if err != nil { + // The error is not fatal, so we should not return an + // error to the caller. + Log.Errorf("Unable to update confirm hint to %d for "+ + "%v: %v", tcn.currentHeight, txsToUpdateHints, + err) + } + } + // Next, we'll dispatch an update to all of the notification clients for // our watched transactions with the number of confirmations left at // this new height. @@ -447,6 +515,20 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { } } + // Rewind the height hint for all watched transactions. + var txs []chainhash.Hash + for tx := range tcn.confNotifications { + txs = append(txs, tx) + } + + err := tcn.hintCache.CommitConfirmHint(tcn.currentHeight, txs...) + if err != nil { + // The error is not fatal, so we should not return an error to + // the caller. + Log.Errorf("Unable to update confirm hint to %d for %v: %v", + tcn.currentHeight, txs, err) + } + // Finally, we can remove the transactions we're currently watching that // were included in this block height. delete(tcn.txsByInitialHeight, blockHeight) diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index 2a57245a..fb996d0a 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -1,6 +1,7 @@ package chainntnfs_test import ( + "sync" "testing" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -11,6 +12,90 @@ import ( var zeroHash chainhash.Hash +type mockHintCache struct { + mu sync.Mutex + confHints map[chainhash.Hash]uint32 + spendHints map[wire.OutPoint]uint32 +} + +var _ chainntnfs.SpendHintCache = (*mockHintCache)(nil) +var _ chainntnfs.ConfirmHintCache = (*mockHintCache)(nil) + +func (c *mockHintCache) CommitSpendHint(heightHint uint32, ops ...wire.OutPoint) error { + c.mu.Lock() + defer c.mu.Unlock() + + for _, op := range ops { + c.spendHints[op] = heightHint + } + + return nil +} + +func (c *mockHintCache) QuerySpendHint(op wire.OutPoint) (uint32, error) { + c.mu.Lock() + defer c.mu.Unlock() + + hint, ok := c.spendHints[op] + if !ok { + return 0, chainntnfs.ErrSpendHintNotFound + } + + return hint, nil +} + +func (c *mockHintCache) PurgeSpendHint(ops ...wire.OutPoint) error { + c.mu.Lock() + defer c.mu.Unlock() + + for _, op := range ops { + delete(c.spendHints, op) + } + + return nil +} + +func (c *mockHintCache) CommitConfirmHint(heightHint uint32, txids ...chainhash.Hash) error { + c.mu.Lock() + defer c.mu.Unlock() + + for _, txid := range txids { + c.confHints[txid] = heightHint + } + + return nil +} + +func (c *mockHintCache) QueryConfirmHint(txid chainhash.Hash) (uint32, error) { + c.mu.Lock() + defer c.mu.Unlock() + + hint, ok := c.confHints[txid] + if !ok { + return 0, chainntnfs.ErrConfirmHintNotFound + } + + return hint, nil +} + +func (c *mockHintCache) PurgeConfirmHint(txids ...chainhash.Hash) error { + c.mu.Lock() + defer c.mu.Unlock() + + for _, txid := range txids { + delete(c.confHints, txid) + } + + return nil +} + +func newMockHintCache() *mockHintCache { + return &mockHintCache{ + confHints: make(map[chainhash.Hash]uint32), + spendHints: make(map[wire.OutPoint]uint32), + } +} + // TestTxConfFutureDispatch tests that the TxConfNotifier dispatches // registered notifications when the transaction confirms after registration. func TestTxConfFutureDispatch(t *testing.T) { @@ -27,7 +112,8 @@ func TestTxConfFutureDispatch(t *testing.T) { tx3 = wire.MsgTx{Version: 3} ) - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + hintCache := newMockHintCache() + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache) // Create the test transactions and register them with the // TxConfNotifier before including them in a block to receive future @@ -200,7 +286,8 @@ func TestTxConfHistoricalDispatch(t *testing.T) { tx3 = wire.MsgTx{Version: 3} ) - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + hintCache := newMockHintCache() + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache) // Create the test transactions at a height before the TxConfNotifier's // starting height so that they are confirmed once registering them. @@ -351,7 +438,8 @@ func TestTxConfChainReorg(t *testing.T) { tx3 = wire.MsgTx{Version: 3} ) - txConfNotifier := chainntnfs.NewTxConfNotifier(7, 100) + hintCache := newMockHintCache() + txConfNotifier := chainntnfs.NewTxConfNotifier(7, 100, hintCache) // Tx 1 will be confirmed in block 9 and requires 2 confs. tx1Hash := tx1.TxHash() @@ -586,6 +674,147 @@ func TestTxConfChainReorg(t *testing.T) { } } +// TestTxConfHeightHintCache ensures that the height hints for transactions are +// kept track of correctly with each new block connected/disconnected. +func TestTxConfHeightHintCache(t *testing.T) { + t.Parallel() + + const ( + startingHeight = 10 + tx1Height = 11 + tx2Height = 12 + ) + + // Initialize our TxConfNotifier instance backed by a height hint cache. + hintCache := newMockHintCache() + txConfNotifier := chainntnfs.NewTxConfNotifier( + startingHeight, 100, hintCache, + ) + + // Create two test transactions and register them for notifications. + tx1 := wire.MsgTx{Version: 1} + tx1Hash := tx1.TxHash() + ntfn1 := &chainntnfs.ConfNtfn{ + TxID: &tx1Hash, + NumConfirmations: 1, + Event: chainntnfs.NewConfirmationEvent(1), + } + + tx2 := wire.MsgTx{Version: 2} + tx2Hash := tx2.TxHash() + ntfn2 := &chainntnfs.ConfNtfn{ + TxID: &tx2Hash, + NumConfirmations: 2, + Event: chainntnfs.NewConfirmationEvent(2), + } + + if err := txConfNotifier.Register(ntfn1); err != nil { + t.Fatalf("unable to register tx1: %v", err) + } + if err := txConfNotifier.Register(ntfn2); err != nil { + t.Fatalf("unable to register tx2: %v", err) + } + + // Both transactions should have a height hint of the starting height + // due to registering notifications for them. + hint, err := hintCache.QueryConfirmHint(tx1Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != startingHeight { + t.Fatalf("expected hint %d, got %d", startingHeight, hint) + } + + hint, err = hintCache.QueryConfirmHint(tx2Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != startingHeight { + t.Fatalf("expected hint %d, got %d", startingHeight, hint) + } + + // Create a new block that will include the first transaction and extend + // the chain. + block1 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx1}, + }) + + err = txConfNotifier.ConnectTip( + block1.Hash(), tx1Height, block1.Transactions(), + ) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + // The height hint for the first transaction should now be updated to + // reflect its confirmation. + hint, err = hintCache.QueryConfirmHint(tx1Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != tx1Height { + t.Fatalf("expected hint %d, got %d", tx1Height, hint) + } + + // The height hint for the second transaction should also be updated due + // to it still being unconfirmed. + hint, err = hintCache.QueryConfirmHint(tx2Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != tx1Height { + t.Fatalf("expected hint %d, got %d", tx1Height, hint) + } + + // Now, we'll create another block that will include the second + // transaction and extend the chain. + block2 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx2}, + }) + + err = txConfNotifier.ConnectTip( + block2.Hash(), tx2Height, block2.Transactions(), + ) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + // The height hint for the first transaction should remain the same. + hint, err = hintCache.QueryConfirmHint(tx1Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != tx1Height { + t.Fatalf("expected hint %d, got %d", tx1Height, hint) + } + + // The height hint for the second transaction should now be updated to + // reflect its confirmation. + hint, err = hintCache.QueryConfirmHint(tx2Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != tx2Height { + t.Fatalf("expected hint %d, got %d", tx2Height, hint) + } + + // Now, we'll attempt do disconnect the last block in order to simulate + // a chain reorg. + if err := txConfNotifier.DisconnectTip(tx2Height); err != nil { + t.Fatalf("Failed to disconnect block: %v", err) + } + + // This should update the second transaction's height hint within the + // cache to the previous height. + hint, err = hintCache.QueryConfirmHint(tx2Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != tx1Height { + t.Fatalf("expected hint %d, got %d", tx1Height, hint) + } +} + func TestTxConfTearDown(t *testing.T) { t.Parallel() @@ -594,7 +823,8 @@ func TestTxConfTearDown(t *testing.T) { tx2 = wire.MsgTx{Version: 2} ) - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + hintCache := newMockHintCache() + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache) // Create the test transactions and register them with the // TxConfNotifier to receive notifications. diff --git a/chainregistry.go b/chainregistry.go index 27f763c1..108cfc55 100644 --- a/chainregistry.go +++ b/chainregistry.go @@ -181,6 +181,13 @@ func newChainControlFromConfig(cfg *config, chanDB *channeldb.DB, cleanUp func() ) + // Initialize the height hint cache within the chain directory. + hintCache, err := chainntnfs.NewHeightHintCache(chanDB) + if err != nil { + return nil, nil, fmt.Errorf("unable to initialize height hint "+ + "cache: %v", err) + } + // If spv mode is active, then we'll be using a distinct set of // chainControl interfaces that interface directly with the p2p network // of the selected chain. @@ -245,7 +252,9 @@ func newChainControlFromConfig(cfg *config, chanDB *channeldb.DB, // Next we'll create the instances of the ChainNotifier and // FilteredChainView interface which is backed by the neutrino // light client. - cc.chainNotifier, err = neutrinonotify.New(svc) + cc.chainNotifier, err = neutrinonotify.New( + svc, hintCache, hintCache, + ) if err != nil { return nil, nil, err } @@ -322,7 +331,9 @@ func newChainControlFromConfig(cfg *config, chanDB *channeldb.DB, "bitcoind: %v", err) } - cc.chainNotifier = bitcoindnotify.New(bitcoindConn) + cc.chainNotifier = bitcoindnotify.New( + bitcoindConn, hintCache, hintCache, + ) cc.chainView = chainview.NewBitcoindFilteredChainView(bitcoindConn) walletConfig.ChainSource = bitcoindConn.NewBitcoindClient(birthday) @@ -430,7 +441,9 @@ func newChainControlFromConfig(cfg *config, chanDB *channeldb.DB, DisableConnectOnNew: true, DisableAutoReconnect: false, } - cc.chainNotifier, err = btcdnotify.New(rpcConfig) + cc.chainNotifier, err = btcdnotify.New( + rpcConfig, hintCache, hintCache, + ) if err != nil { return nil, nil, err } diff --git a/lnd_test.go b/lnd_test.go index dce4e5f4..41b63763 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -1628,122 +1628,119 @@ func testChannelBalance(net *lntest.NetworkHarness, t *harnessTest) { // findForceClosedChannel searches a pending channel response for a particular // channel, returning the force closed channel upon success. -func findForceClosedChannel(t *harnessTest, - pendingChanResp *lnrpc.PendingChannelsResponse, - op *wire.OutPoint) *lnrpc.PendingChannelsResponse_ForceClosedChannel { +func findForceClosedChannel(pendingChanResp *lnrpc.PendingChannelsResponse, + op *wire.OutPoint) (*lnrpc.PendingChannelsResponse_ForceClosedChannel, error) { - var found bool - var forceClose *lnrpc.PendingChannelsResponse_ForceClosedChannel - for _, forceClose = range pendingChanResp.PendingForceClosingChannels { + for _, forceClose := range pendingChanResp.PendingForceClosingChannels { if forceClose.Channel.ChannelPoint == op.String() { - found = true - break + return forceClose, nil } } - if !found { - t.Fatalf("channel not marked as force closed") - } - return forceClose + return nil, errors.New("channel not marked as force closed") } // findWaitingCloseChannel searches a pending channel response for a particular // channel, returning the waiting close channel upon success. -func findWaitingCloseChannel(t *harnessTest, - pendingChanResp *lnrpc.PendingChannelsResponse, - op *wire.OutPoint) *lnrpc.PendingChannelsResponse_WaitingCloseChannel { +func findWaitingCloseChannel(pendingChanResp *lnrpc.PendingChannelsResponse, + op *wire.OutPoint) (*lnrpc.PendingChannelsResponse_WaitingCloseChannel, error) { - var found bool - var waitingClose *lnrpc.PendingChannelsResponse_WaitingCloseChannel - for _, waitingClose = range pendingChanResp.WaitingCloseChannels { + for _, waitingClose := range pendingChanResp.WaitingCloseChannels { if waitingClose.Channel.ChannelPoint == op.String() { - found = true - break + return waitingClose, nil } } - if !found { - t.Fatalf("channel not marked as waiting close") - } - return waitingClose + return nil, errors.New("channel not marked as waiting close") } -func assertCommitmentMaturity(t *harnessTest, +func checkCommitmentMaturity( forceClose *lnrpc.PendingChannelsResponse_ForceClosedChannel, - maturityHeight uint32, blocksTilMaturity int32) { + maturityHeight uint32, blocksTilMaturity int32) error { if forceClose.MaturityHeight != maturityHeight { - t.Fatalf("expected commitment maturity height to be %d, "+ - "found %d instead", maturityHeight, + return fmt.Errorf("expected commitment maturity height to be "+ + "%d, found %d instead", maturityHeight, forceClose.MaturityHeight) } if forceClose.BlocksTilMaturity != blocksTilMaturity { - t.Fatalf("expected commitment blocks til maturity to be %d, "+ - "found %d instead", blocksTilMaturity, + return fmt.Errorf("expected commitment blocks til maturity to "+ + "be %d, found %d instead", blocksTilMaturity, forceClose.BlocksTilMaturity) } + + return nil } -// assertForceClosedChannelNumHtlcs verifies that a force closed channel has the +// checkForceClosedChannelNumHtlcs verifies that a force closed channel has the // proper number of htlcs. -func assertPendingChannelNumHtlcs(t *harnessTest, +func checkPendingChannelNumHtlcs( forceClose *lnrpc.PendingChannelsResponse_ForceClosedChannel, - expectedNumHtlcs int) { + expectedNumHtlcs int) error { if len(forceClose.PendingHtlcs) != expectedNumHtlcs { - t.Fatalf("expected force closed channel to have %d pending "+ - "htlcs, found %d instead", expectedNumHtlcs, + return fmt.Errorf("expected force closed channel to have %d "+ + "pending htlcs, found %d instead", expectedNumHtlcs, len(forceClose.PendingHtlcs)) } + + return nil } -// assertNumForceClosedChannels checks that a pending channel response has the +// checkNumForceClosedChannels checks that a pending channel response has the // expected number of force closed channels. -func assertNumForceClosedChannels(t *harnessTest, - pendingChanResp *lnrpc.PendingChannelsResponse, expectedNumChans int) { +func checkNumForceClosedChannels(pendingChanResp *lnrpc.PendingChannelsResponse, + expectedNumChans int) error { if len(pendingChanResp.PendingForceClosingChannels) != expectedNumChans { - t.Fatalf("expected to find %d force closed channels, got %d", - expectedNumChans, + return fmt.Errorf("expected to find %d force closed channels, "+ + "got %d", expectedNumChans, len(pendingChanResp.PendingForceClosingChannels)) } + + return nil } -// assertNumWaitingCloseChannels checks that a pending channel response has the +// checkNumWaitingCloseChannels checks that a pending channel response has the // expected number of channels waiting for closing tx to confirm. -func assertNumWaitingCloseChannels(t *harnessTest, - pendingChanResp *lnrpc.PendingChannelsResponse, expectedNumChans int) { +func checkNumWaitingCloseChannels(pendingChanResp *lnrpc.PendingChannelsResponse, + expectedNumChans int) error { if len(pendingChanResp.WaitingCloseChannels) != expectedNumChans { - t.Fatalf("expected to find %d channels waiting closure, got %d", - expectedNumChans, + return fmt.Errorf("expected to find %d channels waiting "+ + "closure, got %d", expectedNumChans, len(pendingChanResp.WaitingCloseChannels)) } + + return nil } -// assertPendingHtlcStageAndMaturity uniformly tests all pending htlc's -// belonging to a force closed channel, testing for the expected stage number, -// blocks till maturity, and the maturity height. -func assertPendingHtlcStageAndMaturity(t *harnessTest, +// checkPendingHtlcStageAndMaturity uniformly tests all pending htlc's belonging +// to a force closed channel, testing for the expected stage number, blocks till +// maturity, and the maturity height. +func checkPendingHtlcStageAndMaturity( forceClose *lnrpc.PendingChannelsResponse_ForceClosedChannel, - stage, maturityHeight uint32, blocksTillMaturity int32) { + stage, maturityHeight uint32, blocksTillMaturity int32) error { for _, pendingHtlc := range forceClose.PendingHtlcs { if pendingHtlc.Stage != stage { - t.Fatalf("expected pending htlc to be stage %d, "+ - "found %d", stage, pendingHtlc.Stage) + return fmt.Errorf("expected pending htlc to be stage "+ + "%d, found %d", stage, pendingHtlc.Stage) } if pendingHtlc.MaturityHeight != maturityHeight { - t.Fatalf("expected pending htlc maturity height to be "+ - "%d, instead has %d", maturityHeight, - pendingHtlc.MaturityHeight) + return fmt.Errorf("expected pending htlc maturity "+ + "height to be %d, instead has %d", + maturityHeight, pendingHtlc.MaturityHeight) } if pendingHtlc.BlocksTilMaturity != blocksTillMaturity { - t.Fatalf("expected pending htlc blocks til maturity "+ - "to be %d, instead has %d", blocksTillMaturity, + return fmt.Errorf("expected pending htlc blocks til "+ + "maturity to be %d, instead has %d", + blocksTillMaturity, pendingHtlc.BlocksTilMaturity) } } + + return nil } // testChannelForceClosure performs a test to exercise the behavior of "force" @@ -1887,8 +1884,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { htlcCsvMaturityHeight = startHeight + defaultCLTV + 1 + defaultCSV ) - time.Sleep(200 * time.Millisecond) - aliceChan, err := getAliceChanInfo() if err != nil { t.Fatalf("unable to get alice's channel info: %v", err) @@ -1913,7 +1908,10 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err != nil { t.Fatalf("unable to query for pending channels: %v", err) } - assertNumWaitingCloseChannels(t, pendingChanResp, 1) + err = checkNumWaitingCloseChannels(pendingChanResp, 1) + if err != nil { + t.Fatalf(err.Error()) + } // Compute the outpoint of the channel, which we will use repeatedly to // locate the pending channel information in the rpc responses. @@ -1930,7 +1928,10 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { Index: chanPoint.OutputIndex, } - waitingClose := findWaitingCloseChannel(t, pendingChanResp, &op) + waitingClose, err := findWaitingCloseChannel(pendingChanResp, &op) + if err != nil { + t.Fatalf(err.Error()) + } // Immediately after force closing, all of the funds should be in limbo. if waitingClose.LimboBalance == 0 { @@ -1953,37 +1954,56 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to generate block: %v", err) } - // The following sleep provides time for the UTXO nursery to move the - // output from the preschool to the kindergarten database buckets - // prior to RestartNode() being triggered. Without this sleep, the - // database update may fail, causing the UTXO nursery to retry the move - // operation upon restart. This will change the blockheights from what - // is expected by the test. - // TODO(bvu): refactor out this sleep. - duration := time.Millisecond * 300 - time.Sleep(duration) - // Now that the commitment has been confirmed, the channel should be // marked as force closed. - pendingChanResp, err = net.Alice.PendingChannels(ctxb, pendingChansRequest) + err = lntest.WaitPredicate(func() bool { + pendingChanResp, err := net.Alice.PendingChannels( + ctxb, pendingChansRequest, + ) + if err != nil { + predErr = fmt.Errorf("unable to query for pending "+ + "channels: %v", err) + return false + } + + predErr = checkNumForceClosedChannels(pendingChanResp, 1) + if predErr != nil { + return false + } + + forceClose, predErr := findForceClosedChannel( + pendingChanResp, &op, + ) + if predErr != nil { + return false + } + + // Now that the channel has been force closed, it should now + // have the height and number of blocks to confirm populated. + predErr = checkCommitmentMaturity( + forceClose, commCsvMaturityHeight, int32(defaultCSV), + ) + if predErr != nil { + return false + } + + // None of our outputs have been swept, so they should all be in + // limbo. + if forceClose.LimboBalance == 0 { + predErr = errors.New("all funds should still be in " + + "limbo") + return false + } + if forceClose.RecoveredBalance != 0 { + predErr = errors.New("no funds should yet be shown " + + "as recovered") + return false + } + + return true + }, 15*time.Second) if err != nil { - t.Fatalf("unable to query for pending channels: %v", err) - } - assertNumForceClosedChannels(t, pendingChanResp, 1) - - forceClose := findForceClosedChannel(t, pendingChanResp, &op) - - // Now that the channel has been force closed, it should now have the - // height and number of blocks to confirm populated. - assertCommitmentMaturity(t, forceClose, commCsvMaturityHeight, - int32(defaultCSV)) - - // None of our outputs have been swept, so they should all be limbo. - if forceClose.LimboBalance == 0 { - t.Fatalf("all funds should still be in limbo") - } - if forceClose.RecoveredBalance != 0 { - t.Fatalf("no funds should yet be shown as recovered") + t.Fatalf(predErr.Error()) } // The following restart is intended to ensure that outputs from the @@ -2009,26 +2029,58 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("Node restart failed: %v", err) } - pendingChanResp, err = net.Alice.PendingChannels(ctxb, pendingChansRequest) + // Alice should see the channel in her set of pending force closed + // channels with her funds still in limbo. + err = lntest.WaitPredicate(func() bool { + pendingChanResp, err := net.Alice.PendingChannels( + ctxb, pendingChansRequest, + ) + if err != nil { + predErr = fmt.Errorf("unable to query for pending "+ + "channels: %v", err) + return false + } + + predErr = checkNumForceClosedChannels(pendingChanResp, 1) + if predErr != nil { + return false + } + + forceClose, predErr := findForceClosedChannel( + pendingChanResp, &op, + ) + if predErr != nil { + return false + } + + // At this point, the nursery should show that the commitment + // output has 1 block left before its CSV delay expires. In + // total, we have mined exactly defaultCSV blocks, so the htlc + // outputs should also reflect that this many blocks have + // passed. + predErr = checkCommitmentMaturity( + forceClose, commCsvMaturityHeight, 1, + ) + if predErr != nil { + return false + } + + // All funds should still be shown in limbo. + if forceClose.LimboBalance == 0 { + predErr = errors.New("all funds should still be in " + + "limbo") + return false + } + if forceClose.RecoveredBalance != 0 { + predErr = errors.New("no funds should yet be shown " + + "as recovered") + return false + } + + return true + }, 15*time.Second) if err != nil { - t.Fatalf("unable to query for pending channels: %v", err) - } - assertNumForceClosedChannels(t, pendingChanResp, 1) - - forceClose = findForceClosedChannel(t, pendingChanResp, &op) - - // At this point, the nursery should show that the commitment output has - // 1 block left before its CSV delay expires. In total, we have mined - // exactly defaultCSV blocks, so the htlc outputs should also reflect - // that this many blocks have passed. - assertCommitmentMaturity(t, forceClose, commCsvMaturityHeight, 1) - - // All funds should still be shown in limbo. - if forceClose.LimboBalance == 0 { - t.Fatalf("all funds should still be in limbo") - } - if forceClose.RecoveredBalance != 0 { - t.Fatalf("no funds should yet be shown as recovered") + t.Fatalf(predErr.Error()) } // Generate an additional block, which should cause the CSV delayed @@ -2079,22 +2131,25 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { assertTxInBlock(t, block, sweepTx.Hash()) - // We sleep here to ensure that Alice has enough time to receive a - // confirmation for the commitment transaction, which we already - // asserted was in the last block. - time.Sleep(300 * time.Millisecond) - // Now that the commit output has been fully swept, check to see that // the channel remains open for the pending htlc outputs. pendingChanResp, err = net.Alice.PendingChannels(ctxb, pendingChansRequest) if err != nil { t.Fatalf("unable to query for pending channels: %v", err) } - assertNumForceClosedChannels(t, pendingChanResp, 1) + + err = checkNumForceClosedChannels(pendingChanResp, 1) + if err != nil { + t.Fatalf(err.Error()) + } // The htlc funds will still be shown as limbo, since they are still in // their first stage. The commitment funds will have been recovered // after the commit txn was included in the last block. + forceClose, err := findForceClosedChannel(pendingChanResp, &op) + if err != nil { + t.Fatalf(err.Error()) + } if forceClose.LimboBalance == 0 { t.Fatalf("htlc funds should still be in limbo") } @@ -2113,7 +2168,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err != nil { t.Fatalf("unable to generate block: %v", err) } - time.Sleep(duration) // We now restart Alice, to ensure that she will broadcast the presigned // htlc timeout txns after the delay expires after experiencing a while @@ -2121,26 +2175,58 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err := net.RestartNode(net.Alice, nil); err != nil { t.Fatalf("Node restart failed: %v", err) } - time.Sleep(duration) - pendingChanResp, err = net.Alice.PendingChannels(ctxb, pendingChansRequest) + // Alice should now see the channel in her set of pending force closed + // channels with one pending HTLC. + err = lntest.WaitPredicate(func() bool { + pendingChanResp, err = net.Alice.PendingChannels( + ctxb, pendingChansRequest, + ) + if err != nil { + predErr = fmt.Errorf("unable to query for pending "+ + "channels: %v", err) + return false + } + + predErr = checkNumForceClosedChannels(pendingChanResp, 1) + if predErr != nil { + return false + } + + forceClose, predErr = findForceClosedChannel( + pendingChanResp, &op, + ) + if predErr != nil { + return false + } + + // We should now be at the block just before the utxo nursery + // will attempt to broadcast the htlc timeout transactions. + predErr = checkPendingChannelNumHtlcs(forceClose, numInvoices) + if predErr != nil { + return false + } + predErr = checkPendingHtlcStageAndMaturity( + forceClose, 1, htlcExpiryHeight, 1, + ) + if predErr != nil { + return false + } + + // Now that our commitment confirmation depth has been + // surpassed, we should now see a non-zero recovered balance. + // All htlc outputs are still left in limbo, so it should be + // non-zero as well. + if forceClose.LimboBalance == 0 { + predErr = errors.New("htlc funds should still be in " + + "limbo") + return false + } + + return true + }, 15*time.Second) if err != nil { - t.Fatalf("unable to query for pending channels: %v", err) - } - assertNumForceClosedChannels(t, pendingChanResp, 1) - - forceClose = findForceClosedChannel(t, pendingChanResp, &op) - - // We should now be at the block just before the utxo nursery will - // attempt to broadcast the htlc timeout transactions. - assertPendingChannelNumHtlcs(t, forceClose, numInvoices) - assertPendingHtlcStageAndMaturity(t, forceClose, 1, htlcExpiryHeight, 1) - - // Now that our commitment confirmation depth has been surpassed, we - // should now see a non-zero recovered balance. All htlc outputs are - // still left in limbo, so it should be non-zero as well. - if forceClose.LimboBalance == 0 { - t.Fatalf("htlc funds should still be in limbo") + t.Fatalf(predErr.Error()) } // Now, generate the block which will cause Alice to broadcast the @@ -2191,7 +2277,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err := net.RestartNode(net.Alice, nil); err != nil { t.Fatalf("Node restart failed: %v", err) } - time.Sleep(duration) // Generate a block that mines the htlc timeout txns. Doing so now // activates the 2nd-stage CSV delayed outputs. @@ -2199,9 +2284,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err != nil { t.Fatalf("unable to generate block: %v", err) } - // This sleep gives Alice enough to time move the crib outputs into the - // kindergarten bucket. - time.Sleep(duration) // Alice is restarted here to ensure that she promptly moved the crib // outputs to the kindergarten bucket after the htlc timeout txns were @@ -2229,15 +2311,24 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err != nil { t.Fatalf("unable to query for pending channels: %v", err) } - assertNumForceClosedChannels(t, pendingChanResp, 1) + err = checkNumForceClosedChannels(pendingChanResp, 1) + if err != nil { + t.Fatalf(err.Error()) + } - forceClose = findForceClosedChannel(t, pendingChanResp, &op) + forceClose, err = findForceClosedChannel(pendingChanResp, &op) + if err != nil { + t.Fatalf(err.Error()) + } if forceClose.LimboBalance == 0 { t.Fatalf("htlc funds should still be in limbo") } - assertPendingChannelNumHtlcs(t, forceClose, numInvoices) + err = checkPendingChannelNumHtlcs(forceClose, numInvoices) + if err != nil { + t.Fatalf(err.Error()) + } // Generate a block that causes Alice to sweep the htlc outputs in the // kindergarten bucket. @@ -2296,7 +2387,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err := net.RestartNode(net.Alice, nil); err != nil { t.Fatalf("Node restart failed: %v", err) } - time.Sleep(duration) // Now that the channel has been fully swept, it should no longer show // incubated, check to see that Alice's node still reports the channel @@ -2305,14 +2395,27 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err != nil { t.Fatalf("unable to query for pending channels: %v", err) } - assertNumForceClosedChannels(t, pendingChanResp, 1) + err = checkNumForceClosedChannels(pendingChanResp, 1) + if err != nil { + t.Fatalf(err.Error()) + } // All htlcs should show zero blocks until maturity, as evidenced by // having checked the sweep transaction in the mempool. - forceClose = findForceClosedChannel(t, pendingChanResp, &op) - assertPendingChannelNumHtlcs(t, forceClose, numInvoices) - assertPendingHtlcStageAndMaturity(t, forceClose, 2, - htlcCsvMaturityHeight, 0) + forceClose, err = findForceClosedChannel(pendingChanResp, &op) + if err != nil { + t.Fatalf(err.Error()) + } + err = checkPendingChannelNumHtlcs(forceClose, numInvoices) + if err != nil { + t.Fatalf(err.Error()) + } + err = checkPendingHtlcStageAndMaturity( + forceClose, 2, htlcCsvMaturityHeight, 0, + ) + if err != nil { + t.Fatalf(err.Error()) + } // Generate the final block that sweeps all htlc funds into the user's // wallet. @@ -2320,20 +2423,37 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err != nil { t.Fatalf("unable to generate block: %v", err) } - time.Sleep(3 * duration) // Now that the channel has been fully swept, it should no longer show // up within the pending channels RPC. - pendingChanResp, err = net.Alice.PendingChannels(ctxb, pendingChansRequest) - if err != nil { - t.Fatalf("unable to query for pending channels: %v", err) - } - assertNumForceClosedChannels(t, pendingChanResp, 0) + err = lntest.WaitPredicate(func() bool { + pendingChanResp, err := net.Alice.PendingChannels( + ctxb, pendingChansRequest, + ) + if err != nil { + predErr = fmt.Errorf("unable to query for pending "+ + "channels: %v", err) + return false + } - // In addition to there being no pending channels, we verify that - // pending channels does not report any money still in limbo. - if pendingChanResp.TotalLimboBalance != 0 { - t.Fatalf("no user funds should be left in limbo after incubation") + predErr = checkNumForceClosedChannels(pendingChanResp, 0) + if predErr != nil { + return false + } + + // In addition to there being no pending channels, we verify + // that pending channels does not report any money still in + // limbo. + if pendingChanResp.TotalLimboBalance != 0 { + predErr = errors.New("no user funds should be left " + + "in limbo after incubation") + return false + } + + return true + }, 15*time.Second) + if err != nil { + t.Fatalf(predErr.Error()) } // At this point, Bob should now be aware of his new immediately diff --git a/lnwallet/interface_test.go b/lnwallet/interface_test.go index d2ded0f3..2ca744c4 100644 --- a/lnwallet/interface_test.go +++ b/lnwallet/interface_test.go @@ -2068,7 +2068,19 @@ func TestLightningWallet(t *testing.T) { rpcConfig := miningNode.RPCConfig() - chainNotifier, err := btcdnotify.New(&rpcConfig) + tempDir, err := ioutil.TempDir("", "channeldb") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + db, err := channeldb.Open(tempDir) + if err != nil { + t.Fatalf("unable to create db: %v", err) + } + hintCache, err := chainntnfs.NewHeightHintCache(db) + if err != nil { + t.Fatalf("unable to create height hint cache: %v", err) + } + chainNotifier, err := btcdnotify.New(&rpcConfig, hintCache, hintCache) if err != nil { t.Fatalf("unable to create notifier: %v", err) }