From b78cd533f28927789fd1e4350e39953a683727d4 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Mon, 21 May 2018 11:47:39 -0400 Subject: [PATCH 1/9] chainntnfs: add height hint cache interfaces and channeldb implementation In this commit, we introduce two new interfaces: SpendHintCache and ConfirmHintCache for a height hint cache. The SpendHintCache is responsible for maintaining the earliest height at which an outpoint could have been spent within the chain, while the ConfirmHintCache is responsible for maintaining the earliest height at which a transaction confirms within the chain. We also add an implementation of these interfaces with a single struct HeightHintCache, backed by a channeldb instance, which will cary the duties of the interfaces above. --- chainntnfs/height_hint_cache.go | 297 +++++++++++++++++++++++++++ chainntnfs/height_hint_cache_test.go | 148 +++++++++++++ 2 files changed, 445 insertions(+) create mode 100644 chainntnfs/height_hint_cache.go create mode 100644 chainntnfs/height_hint_cache_test.go diff --git a/chainntnfs/height_hint_cache.go b/chainntnfs/height_hint_cache.go new file mode 100644 index 00000000..ba3c2224 --- /dev/null +++ b/chainntnfs/height_hint_cache.go @@ -0,0 +1,297 @@ +package chainntnfs + +import ( + "bytes" + "errors" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + bolt "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/channeldb" +) + +const ( + // dbName is the default name of the database storing the height hints. + dbName = "heighthint.db" + + // dbFilePermission is the default permission of the database file + // storing the height hints. + dbFilePermission = 0600 +) + +var ( + // spendHintBucket is the name of the bucket which houses the height + // hint for outpoints. Each height hint represents the earliest height + // at which its corresponding outpoint could have been spent within. + spendHintBucket = []byte("spend-hints") + + // confirmHintBucket is the name of the bucket which houses the height + // hints for transactions. Each height hint represents the earliest + // height at which its corresponding transaction could have been + // confirmed within. + confirmHintBucket = []byte("confirm-hints") + + // ErrCorruptedHeightHintCache indicates that the on-disk bucketing + // structure has altered since the height hint cache instance was + // initialized. + ErrCorruptedHeightHintCache = errors.New("height hint cache has been " + + "corrupted") + + // ErrSpendHintNotFound is an error returned when a spend hint for an + // outpoint was not found. + ErrSpendHintNotFound = errors.New("spend hint not found") + + // ErrConfirmHintNotFound is an error returned when a confirm hint for a + // transaction was not found. + ErrConfirmHintNotFound = errors.New("confirm hint not found") +) + +// SpendHintCache is an interface whose duty is to cache spend hints for +// outpoints. A spend hint is defined as the earliest height in the chain at +// which an outpoint could have been spent within. +type SpendHintCache interface { + // CommitSpendHint commits a spend hint for the outpoints to the cache. + CommitSpendHint(height uint32, ops ...wire.OutPoint) error + + // QuerySpendHint returns the latest spend hint for an outpoint. + // ErrSpendHintNotFound is returned if a spend hint does not exist + // within the cache for the outpoint. + QuerySpendHint(op wire.OutPoint) (uint32, error) + + // PurgeSpendHint removes the spend hint for the outpoints from the + // cache. + PurgeSpendHint(ops ...wire.OutPoint) error +} + +// ConfirmHintCache is an interface whose duty is to cache confirm hints for +// transactions. A confirm hint is defined as the earliest height in the chain +// at which a transaction could have been included in a block. +type ConfirmHintCache interface { + // CommitConfirmHint commits a confirm hint for the transactions to the + // cache. + CommitConfirmHint(height uint32, txids ...chainhash.Hash) error + + // QueryConfirmHint returns the latest confirm hint for a transaction + // hash. ErrConfirmHintNotFound is returned if a confirm hint does not + // exist within the cache for the transaction hash. + QueryConfirmHint(txid chainhash.Hash) (uint32, error) + + // PurgeConfirmHint removes the confirm hint for the transactions from + // the cache. + PurgeConfirmHint(txids ...chainhash.Hash) error +} + +// HeightHintCache is an implementation of the SpendHintCache and +// ConfirmHintCache interfaces backed by a channeldb DB instance where the hints +// will be stored. +type HeightHintCache struct { + db *channeldb.DB +} + +// Compile-time checks to ensure HeightHintCache satisfies the SpendHintCache +// and ConfirmHintCache interfaces. +var _ SpendHintCache = (*HeightHintCache)(nil) +var _ ConfirmHintCache = (*HeightHintCache)(nil) + +// NewHeightHintCache returns a new height hint cache backed by a database. +func NewHeightHintCache(db *channeldb.DB) (*HeightHintCache, error) { + cache := &HeightHintCache{db} + if err := cache.initBuckets(); err != nil { + return nil, err + } + + return cache, nil +} + +// initBuckets ensures that the primary buckets used by the circuit are +// initialized so that we can assume their existence after startup. +func (c *HeightHintCache) initBuckets() error { + return c.db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(spendHintBucket) + if err != nil { + return err + } + + _, err = tx.CreateBucketIfNotExists(confirmHintBucket) + return err + }) +} + +// CommitSpendHint commits a spend hint for the outpoints to the cache. +func (c *HeightHintCache) CommitSpendHint(height uint32, ops ...wire.OutPoint) error { + Log.Tracef("Updating spend hint to height %d for %v", height, ops) + + return c.db.Batch(func(tx *bolt.Tx) error { + spendHints := tx.Bucket(spendHintBucket) + if spendHints == nil { + return ErrCorruptedHeightHintCache + } + + var hint bytes.Buffer + if err := channeldb.WriteElement(&hint, height); err != nil { + return err + } + + for _, op := range ops { + var outpoint bytes.Buffer + err := channeldb.WriteElement(&outpoint, op) + if err != nil { + return err + } + + err = spendHints.Put(outpoint.Bytes(), hint.Bytes()) + if err != nil { + return err + } + } + + return nil + }) +} + +// QuerySpendHint returns the latest spend hint for an outpoint. +// ErrSpendHintNotFound is returned if a spend hint does not exist within the +// cache for the outpoint. +func (c *HeightHintCache) QuerySpendHint(op wire.OutPoint) (uint32, error) { + var hint uint32 + err := c.db.View(func(tx *bolt.Tx) error { + spendHints := tx.Bucket(spendHintBucket) + if spendHints == nil { + return ErrCorruptedHeightHintCache + } + + var outpoint bytes.Buffer + if err := channeldb.WriteElement(&outpoint, op); err != nil { + return err + } + + spendHint := spendHints.Get(outpoint.Bytes()) + if spendHint == nil { + return ErrSpendHintNotFound + } + + return channeldb.ReadElement(bytes.NewReader(spendHint), &hint) + }) + if err != nil { + return 0, err + } + + return hint, nil +} + +// PurgeSpendHint removes the spend hint for the outpoints from the cache. +func (c *HeightHintCache) PurgeSpendHint(ops ...wire.OutPoint) error { + Log.Tracef("Removing spend hints for %v", ops) + + return c.db.Batch(func(tx *bolt.Tx) error { + spendHints := tx.Bucket(spendHintBucket) + if spendHints == nil { + return ErrCorruptedHeightHintCache + } + + for _, op := range ops { + var outpoint bytes.Buffer + err := channeldb.WriteElement(&outpoint, op) + if err != nil { + return err + } + + err = spendHints.Delete(outpoint.Bytes()) + if err != nil { + return err + } + } + + return nil + }) +} + +// CommitConfirmHint commits a confirm hint for the transactions to the cache. +func (c *HeightHintCache) CommitConfirmHint(height uint32, txids ...chainhash.Hash) error { + Log.Tracef("Updating confirm hints to height %d for %v", height, txids) + + return c.db.Batch(func(tx *bolt.Tx) error { + confirmHints := tx.Bucket(confirmHintBucket) + if confirmHints == nil { + return ErrCorruptedHeightHintCache + } + + var hint bytes.Buffer + if err := channeldb.WriteElement(&hint, height); err != nil { + return err + } + + for _, txid := range txids { + var txHash bytes.Buffer + err := channeldb.WriteElement(&txHash, txid) + if err != nil { + return err + } + + err = confirmHints.Put(txHash.Bytes(), hint.Bytes()) + if err != nil { + return err + } + } + + return nil + }) +} + +// QueryConfirmHint returns the latest confirm hint for a transaction hash. +// ErrConfirmHintNotFound is returned if a confirm hint does not exist within +// the cache for the transaction hash. +func (c *HeightHintCache) QueryConfirmHint(txid chainhash.Hash) (uint32, error) { + var hint uint32 + err := c.db.View(func(tx *bolt.Tx) error { + confirmHints := tx.Bucket(confirmHintBucket) + if confirmHints == nil { + return ErrCorruptedHeightHintCache + } + + var txHash bytes.Buffer + if err := channeldb.WriteElement(&txHash, txid); err != nil { + return err + } + + confirmHint := confirmHints.Get(txHash.Bytes()) + if confirmHint == nil { + return ErrConfirmHintNotFound + } + + return channeldb.ReadElement(bytes.NewReader(confirmHint), &hint) + }) + if err != nil { + return 0, err + } + + return hint, nil +} + +// PurgeConfirmHint removes the confirm hint for the transactions from the +// cache. +func (c *HeightHintCache) PurgeConfirmHint(txids ...chainhash.Hash) error { + Log.Tracef("Removing confirm hints for %v", txids) + + return c.db.Batch(func(tx *bolt.Tx) error { + confirmHints := tx.Bucket(confirmHintBucket) + if confirmHints == nil { + return ErrCorruptedHeightHintCache + } + + for _, txid := range txids { + var txHash bytes.Buffer + err := channeldb.WriteElement(&txHash, txid) + if err != nil { + return err + } + + err = confirmHints.Delete(txHash.Bytes()) + if err != nil { + return err + } + } + + return nil + }) +} diff --git a/chainntnfs/height_hint_cache_test.go b/chainntnfs/height_hint_cache_test.go new file mode 100644 index 00000000..f444b18d --- /dev/null +++ b/chainntnfs/height_hint_cache_test.go @@ -0,0 +1,148 @@ +package chainntnfs + +import ( + "bytes" + "io/ioutil" + "testing" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/channeldb" +) + +func initHintCache(t *testing.T) *HeightHintCache { + t.Helper() + + tempDir, err := ioutil.TempDir("", "kek") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + db, err := channeldb.Open(tempDir) + if err != nil { + t.Fatalf("unable to create db: %v", err) + } + hintCache, err := NewHeightHintCache(db) + if err != nil { + t.Fatalf("unable to create hint cache: %v", err) + } + + return hintCache +} + +// TestHeightHintCacheConfirms ensures that the height hint cache properly +// caches confirm hints for transactions. +func TestHeightHintCacheConfirms(t *testing.T) { + t.Parallel() + + hintCache := initHintCache(t) + + // Querying for a transaction hash not found within the cache should + // return an error indication so. + var unknownHash chainhash.Hash + _, err := hintCache.QueryConfirmHint(unknownHash) + if err != ErrConfirmHintNotFound { + t.Fatalf("expected ErrConfirmHintNotFound, got: %v", err) + } + + // Now, we'll create some transaction hashes and commit them to the + // cache with the same confirm hint. + const height = 100 + const numHashes = 5 + txHashes := make([]chainhash.Hash, numHashes) + for i := 0; i < numHashes; i++ { + var txHash chainhash.Hash + copy(txHash[:], bytes.Repeat([]byte{byte(i)}, 32)) + txHashes[i] = txHash + } + + if err := hintCache.CommitConfirmHint(height, txHashes...); err != nil { + t.Fatalf("unable to add entries to cache: %v", err) + } + + // With the hashes committed, we'll now query the cache to ensure that + // we're able to properly retrieve the confirm hints. + for _, txHash := range txHashes { + confirmHint, err := hintCache.QueryConfirmHint(txHash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if confirmHint != height { + t.Fatalf("expected confirm hint %d, got %d", height, + confirmHint) + } + } + + // We'll also attempt to purge all of them in a single database + // transaction. + if err := hintCache.PurgeConfirmHint(txHashes...); err != nil { + t.Fatalf("unable to remove confirm hints: %v", err) + } + + // Finally, we'll attempt to query for each hash. We should expect not + // to find a hint for any of them. + for _, txHash := range txHashes { + _, err := hintCache.QueryConfirmHint(txHash) + if err != ErrConfirmHintNotFound { + t.Fatalf("expected ErrConfirmHintNotFound, got :%v", err) + } + } +} + +// TestHeightHintCacheSpends ensures that the height hint cache properly caches +// spend hints for outpoints. +func TestHeightHintCacheSpends(t *testing.T) { + t.Parallel() + + hintCache := initHintCache(t) + + // Querying for an outpoint not found within the cache should return an + // error indication so. + var unknownOutPoint wire.OutPoint + _, err := hintCache.QuerySpendHint(unknownOutPoint) + if err != ErrSpendHintNotFound { + t.Fatalf("expected ErrSpendHintNotFound, got: %v", err) + } + + // Now, we'll create some outpoints and commit them to the cache with + // the same spend hint. + const height = 100 + const numOutpoints = 5 + var txHash chainhash.Hash + copy(txHash[:], bytes.Repeat([]byte{0xFF}, 32)) + outpoints := make([]wire.OutPoint, numOutpoints) + for i := uint32(0); i < numOutpoints; i++ { + outpoints[i] = wire.OutPoint{Hash: txHash, Index: i} + } + + if err := hintCache.CommitSpendHint(height, outpoints...); err != nil { + t.Fatalf("unable to add entry to cache: %v", err) + } + + // With the outpoints committed, we'll now query the cache to ensure + // that we're able to properly retrieve the confirm hints. + for _, op := range outpoints { + spendHint, err := hintCache.QuerySpendHint(op) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if spendHint != height { + t.Fatalf("expected spend hint %d, got %d", height, + spendHint) + } + } + + // We'll also attempt to purge all of them in a single database + // transaction. + if err := hintCache.PurgeSpendHint(outpoints...); err != nil { + t.Fatalf("unable to remove spend hint: %v", err) + } + + // Finally, we'll attempt to query for each outpoint. We should expect + // not to find a hint for any of them. + for _, op := range outpoints { + _, err = hintCache.QuerySpendHint(op) + if err != ErrSpendHintNotFound { + t.Fatalf("expected ErrSpendHintNotFound, got: %v", err) + } + } +} From 6be642a0338dce469ceb2813e8005d3656372cd7 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 22 May 2018 15:55:32 -0400 Subject: [PATCH 2/9] chainntnfs: cache confirm hints within TxConfNotifier In this commit, we extend our TxConfNotifier to cache height hints for our confirmation events. Each transaction we've requested a confirmation notification for will have its initial height hint cached. We increment this height hint at every new block for unconfirmed transactions. This allows us to retrieve the *exact* height at which the transaction has been included in a block. By doing this, we optimize the different ChainNotifier implementations since they will no longer have to scan forward (and possibly fetch blocks in the neutrino/pruned node case) from the initial height hint looking for the confirmation. --- chainntnfs/txconfnotifier.go | 84 ++++++++++- chainntnfs/txconfnotifier_test.go | 238 +++++++++++++++++++++++++++++- 2 files changed, 317 insertions(+), 5 deletions(-) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index bb049b8a..ff97b23e 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -89,6 +89,11 @@ type TxConfNotifier struct { // at which the transaction will have sufficient confirmations. ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{} + // hintCache is a cache used to maintain the latest height hints for + // transactions. Each height hint represents the earliest height at + // which the transactions could have been confirmed within the chain. + hintCache ConfirmHintCache + // quit is closed in order to signal that the notifier is gracefully // exiting. quit chan struct{} @@ -98,13 +103,16 @@ type TxConfNotifier struct { // NewTxConfNotifier creates a TxConfNotifier. The current height of the // blockchain is accepted as a parameter. -func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotifier { +func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32, + hintCache ConfirmHintCache) *TxConfNotifier { + return &TxConfNotifier{ currentHeight: startHeight, reorgSafetyLimit: reorgSafetyLimit, confNotifications: make(map[chainhash.Hash]map[uint64]*ConfNtfn), txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}), ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), + hintCache: hintCache, quit: make(chan struct{}), } } @@ -130,6 +138,16 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) error { if !ok { ntfns = make(map[uint64]*ConfNtfn) tcn.confNotifications[*ntfn.TxID] = ntfns + + err := tcn.hintCache.CommitConfirmHint( + tcn.currentHeight, *ntfn.TxID, + ) + if err != nil { + // The error is not fatal, so we should not return an + // error to the caller. + Log.Errorf("Unable to update confirm hint to %d for "+ + "%v: %v", tcn.currentHeight, *ntfn.TxID, err) + } } ntfns[ntfn.ConfID] = ntfn @@ -175,6 +193,14 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, return nil } + err := tcn.hintCache.CommitConfirmHint(details.BlockHeight, txid) + if err != nil { + // The error is not fatal, so we should not return an error to + // the caller. + Log.Errorf("Unable to update confirm hint to %d for %v: %v", + details.BlockHeight, txid, err) + } + // The notifier has yet to reach the height at which the transaction was // included in a block, so we should defer until handling it then within // ConnectTip. @@ -297,6 +323,48 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, } } + // In order to update the height hint for all the required transactions + // under one database transaction, we'll gather the set of unconfirmed + // transactions along with the ones that confirmed at the current + // height. To do so, we'll iterate over the confNotifications map, which + // contains the transactions we currently have notifications for. Since + // this map doesn't tell us whether the transaction hsa confirmed or + // not, we'll need to look at txsByInitialHeight to determine so. + var txsToUpdateHints []chainhash.Hash + for confirmedTx := range tcn.txsByInitialHeight[tcn.currentHeight] { + txsToUpdateHints = append(txsToUpdateHints, confirmedTx) + } +out: + for maybeUnconfirmedTx := range tcn.confNotifications { + for height, confirmedTxs := range tcn.txsByInitialHeight { + // Skip the transactions that confirmed at the new block + // height as those have already been added. + if height == blockHeight { + continue + } + + // If the transaction was found within the set of + // confirmed transactions at this height, we'll skip it. + if _, ok := confirmedTxs[maybeUnconfirmedTx]; ok { + continue out + } + } + txsToUpdateHints = append(txsToUpdateHints, maybeUnconfirmedTx) + } + + if len(txsToUpdateHints) > 0 { + err := tcn.hintCache.CommitConfirmHint( + tcn.currentHeight, txsToUpdateHints..., + ) + if err != nil { + // The error is not fatal, so we should not return an + // error to the caller. + Log.Errorf("Unable to update confirm hint to %d for "+ + "%v: %v", tcn.currentHeight, txsToUpdateHints, + err) + } + } + // Next, we'll dispatch an update to all of the notification clients for // our watched transactions with the number of confirmations left at // this new height. @@ -447,6 +515,20 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { } } + // Rewind the height hint for all watched transactions. + var txs []chainhash.Hash + for tx := range tcn.confNotifications { + txs = append(txs, tx) + } + + err := tcn.hintCache.CommitConfirmHint(tcn.currentHeight, txs...) + if err != nil { + // The error is not fatal, so we should not return an error to + // the caller. + Log.Errorf("Unable to update confirm hint to %d for %v: %v", + tcn.currentHeight, txs, err) + } + // Finally, we can remove the transactions we're currently watching that // were included in this block height. delete(tcn.txsByInitialHeight, blockHeight) diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index 2a57245a..fb996d0a 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -1,6 +1,7 @@ package chainntnfs_test import ( + "sync" "testing" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -11,6 +12,90 @@ import ( var zeroHash chainhash.Hash +type mockHintCache struct { + mu sync.Mutex + confHints map[chainhash.Hash]uint32 + spendHints map[wire.OutPoint]uint32 +} + +var _ chainntnfs.SpendHintCache = (*mockHintCache)(nil) +var _ chainntnfs.ConfirmHintCache = (*mockHintCache)(nil) + +func (c *mockHintCache) CommitSpendHint(heightHint uint32, ops ...wire.OutPoint) error { + c.mu.Lock() + defer c.mu.Unlock() + + for _, op := range ops { + c.spendHints[op] = heightHint + } + + return nil +} + +func (c *mockHintCache) QuerySpendHint(op wire.OutPoint) (uint32, error) { + c.mu.Lock() + defer c.mu.Unlock() + + hint, ok := c.spendHints[op] + if !ok { + return 0, chainntnfs.ErrSpendHintNotFound + } + + return hint, nil +} + +func (c *mockHintCache) PurgeSpendHint(ops ...wire.OutPoint) error { + c.mu.Lock() + defer c.mu.Unlock() + + for _, op := range ops { + delete(c.spendHints, op) + } + + return nil +} + +func (c *mockHintCache) CommitConfirmHint(heightHint uint32, txids ...chainhash.Hash) error { + c.mu.Lock() + defer c.mu.Unlock() + + for _, txid := range txids { + c.confHints[txid] = heightHint + } + + return nil +} + +func (c *mockHintCache) QueryConfirmHint(txid chainhash.Hash) (uint32, error) { + c.mu.Lock() + defer c.mu.Unlock() + + hint, ok := c.confHints[txid] + if !ok { + return 0, chainntnfs.ErrConfirmHintNotFound + } + + return hint, nil +} + +func (c *mockHintCache) PurgeConfirmHint(txids ...chainhash.Hash) error { + c.mu.Lock() + defer c.mu.Unlock() + + for _, txid := range txids { + delete(c.confHints, txid) + } + + return nil +} + +func newMockHintCache() *mockHintCache { + return &mockHintCache{ + confHints: make(map[chainhash.Hash]uint32), + spendHints: make(map[wire.OutPoint]uint32), + } +} + // TestTxConfFutureDispatch tests that the TxConfNotifier dispatches // registered notifications when the transaction confirms after registration. func TestTxConfFutureDispatch(t *testing.T) { @@ -27,7 +112,8 @@ func TestTxConfFutureDispatch(t *testing.T) { tx3 = wire.MsgTx{Version: 3} ) - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + hintCache := newMockHintCache() + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache) // Create the test transactions and register them with the // TxConfNotifier before including them in a block to receive future @@ -200,7 +286,8 @@ func TestTxConfHistoricalDispatch(t *testing.T) { tx3 = wire.MsgTx{Version: 3} ) - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + hintCache := newMockHintCache() + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache) // Create the test transactions at a height before the TxConfNotifier's // starting height so that they are confirmed once registering them. @@ -351,7 +438,8 @@ func TestTxConfChainReorg(t *testing.T) { tx3 = wire.MsgTx{Version: 3} ) - txConfNotifier := chainntnfs.NewTxConfNotifier(7, 100) + hintCache := newMockHintCache() + txConfNotifier := chainntnfs.NewTxConfNotifier(7, 100, hintCache) // Tx 1 will be confirmed in block 9 and requires 2 confs. tx1Hash := tx1.TxHash() @@ -586,6 +674,147 @@ func TestTxConfChainReorg(t *testing.T) { } } +// TestTxConfHeightHintCache ensures that the height hints for transactions are +// kept track of correctly with each new block connected/disconnected. +func TestTxConfHeightHintCache(t *testing.T) { + t.Parallel() + + const ( + startingHeight = 10 + tx1Height = 11 + tx2Height = 12 + ) + + // Initialize our TxConfNotifier instance backed by a height hint cache. + hintCache := newMockHintCache() + txConfNotifier := chainntnfs.NewTxConfNotifier( + startingHeight, 100, hintCache, + ) + + // Create two test transactions and register them for notifications. + tx1 := wire.MsgTx{Version: 1} + tx1Hash := tx1.TxHash() + ntfn1 := &chainntnfs.ConfNtfn{ + TxID: &tx1Hash, + NumConfirmations: 1, + Event: chainntnfs.NewConfirmationEvent(1), + } + + tx2 := wire.MsgTx{Version: 2} + tx2Hash := tx2.TxHash() + ntfn2 := &chainntnfs.ConfNtfn{ + TxID: &tx2Hash, + NumConfirmations: 2, + Event: chainntnfs.NewConfirmationEvent(2), + } + + if err := txConfNotifier.Register(ntfn1); err != nil { + t.Fatalf("unable to register tx1: %v", err) + } + if err := txConfNotifier.Register(ntfn2); err != nil { + t.Fatalf("unable to register tx2: %v", err) + } + + // Both transactions should have a height hint of the starting height + // due to registering notifications for them. + hint, err := hintCache.QueryConfirmHint(tx1Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != startingHeight { + t.Fatalf("expected hint %d, got %d", startingHeight, hint) + } + + hint, err = hintCache.QueryConfirmHint(tx2Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != startingHeight { + t.Fatalf("expected hint %d, got %d", startingHeight, hint) + } + + // Create a new block that will include the first transaction and extend + // the chain. + block1 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx1}, + }) + + err = txConfNotifier.ConnectTip( + block1.Hash(), tx1Height, block1.Transactions(), + ) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + // The height hint for the first transaction should now be updated to + // reflect its confirmation. + hint, err = hintCache.QueryConfirmHint(tx1Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != tx1Height { + t.Fatalf("expected hint %d, got %d", tx1Height, hint) + } + + // The height hint for the second transaction should also be updated due + // to it still being unconfirmed. + hint, err = hintCache.QueryConfirmHint(tx2Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != tx1Height { + t.Fatalf("expected hint %d, got %d", tx1Height, hint) + } + + // Now, we'll create another block that will include the second + // transaction and extend the chain. + block2 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx2}, + }) + + err = txConfNotifier.ConnectTip( + block2.Hash(), tx2Height, block2.Transactions(), + ) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + // The height hint for the first transaction should remain the same. + hint, err = hintCache.QueryConfirmHint(tx1Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != tx1Height { + t.Fatalf("expected hint %d, got %d", tx1Height, hint) + } + + // The height hint for the second transaction should now be updated to + // reflect its confirmation. + hint, err = hintCache.QueryConfirmHint(tx2Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != tx2Height { + t.Fatalf("expected hint %d, got %d", tx2Height, hint) + } + + // Now, we'll attempt do disconnect the last block in order to simulate + // a chain reorg. + if err := txConfNotifier.DisconnectTip(tx2Height); err != nil { + t.Fatalf("Failed to disconnect block: %v", err) + } + + // This should update the second transaction's height hint within the + // cache to the previous height. + hint, err = hintCache.QueryConfirmHint(tx2Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != tx1Height { + t.Fatalf("expected hint %d, got %d", tx1Height, hint) + } +} + func TestTxConfTearDown(t *testing.T) { t.Parallel() @@ -594,7 +823,8 @@ func TestTxConfTearDown(t *testing.T) { tx2 = wire.MsgTx{Version: 2} ) - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + hintCache := newMockHintCache() + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache) // Create the test transactions and register them with the // TxConfNotifier to receive notifications. From 30fd219b1c010c0c66528be727c251ba3d78ed37 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 14 Aug 2018 17:53:34 -0700 Subject: [PATCH 3/9] chainntnfs: add height hint caches to chain notifiers --- chainntnfs/bitcoindnotify/bitcoind.go | 20 ++++++++++++-- chainntnfs/bitcoindnotify/bitcoind_debug.go | 3 ++- chainntnfs/bitcoindnotify/driver.go | 21 ++++++++++++--- chainntnfs/btcdnotify/btcd.go | 20 ++++++++++++-- chainntnfs/btcdnotify/btcd_debug.go | 3 ++- chainntnfs/btcdnotify/driver.go | 25 +++++++++++++----- chainntnfs/interface_test.go | 29 +++++++++++++++++---- chainntnfs/neutrinonotify/driver.go | 25 +++++++++++++----- chainntnfs/neutrinonotify/neutrino.go | 19 ++++++++++++-- chainntnfs/neutrinonotify/neutrino_debug.go | 3 ++- 10 files changed, 138 insertions(+), 30 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 21140994..3fd41156 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -76,6 +76,16 @@ type BitcoindNotifier struct { bestBlock chainntnfs.BlockEpoch + // spendHintCache is a cache used to query and update the latest height + // hints for an outpoint. Each height hint represents the earliest + // height at which the outpoint could have been spent within the chain. + spendHintCache chainntnfs.SpendHintCache + + // confirmHintCache is a cache used to query the latest height hints for + // a transaction. Each height hint represents the earliest height at + // which the transaction could have confirmed within the chain. + confirmHintCache chainntnfs.ConfirmHintCache + wg sync.WaitGroup quit chan struct{} } @@ -87,7 +97,9 @@ var _ chainntnfs.ChainNotifier = (*BitcoindNotifier)(nil) // New returns a new BitcoindNotifier instance. This function assumes the // bitcoind node detailed in the passed configuration is already running, and // willing to accept RPC requests and new zmq clients. -func New(chainConn *chain.BitcoindConn) *BitcoindNotifier { +func New(chainConn *chain.BitcoindConn, spendHintCache chainntnfs.SpendHintCache, + confirmHintCache chainntnfs.ConfirmHintCache) *BitcoindNotifier { + notifier := &BitcoindNotifier{ notificationCancels: make(chan interface{}), notificationRegistry: make(chan interface{}), @@ -96,6 +108,9 @@ func New(chainConn *chain.BitcoindConn) *BitcoindNotifier { spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), + spendHintCache: spendHintCache, + confirmHintCache: confirmHintCache, + quit: make(chan struct{}), } @@ -127,7 +142,8 @@ func (b *BitcoindNotifier) Start() error { } b.txConfNotifier = chainntnfs.NewTxConfNotifier( - uint32(currentHeight), reorgSafetyLimit) + uint32(currentHeight), reorgSafetyLimit, b.confirmHintCache, + ) b.bestBlock = chainntnfs.BlockEpoch{ Height: currentHeight, diff --git a/chainntnfs/bitcoindnotify/bitcoind_debug.go b/chainntnfs/bitcoindnotify/bitcoind_debug.go index 33d1aa4d..85842ca5 100644 --- a/chainntnfs/bitcoindnotify/bitcoind_debug.go +++ b/chainntnfs/bitcoindnotify/bitcoind_debug.go @@ -29,7 +29,8 @@ func (b *BitcoindNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Has } b.txConfNotifier = chainntnfs.NewTxConfNotifier( - uint32(bestHeight), reorgSafetyLimit) + uint32(bestHeight), reorgSafetyLimit, b.confirmHintCache, + ) if generateBlocks != nil { // Ensure no block notifications are pending when we start the diff --git a/chainntnfs/bitcoindnotify/driver.go b/chainntnfs/bitcoindnotify/driver.go index b27d2c64..d6ef3705 100644 --- a/chainntnfs/bitcoindnotify/driver.go +++ b/chainntnfs/bitcoindnotify/driver.go @@ -1,6 +1,7 @@ package bitcoindnotify import ( + "errors" "fmt" "github.com/btcsuite/btcwallet/chain" @@ -10,18 +11,30 @@ import ( // createNewNotifier creates a new instance of the ChainNotifier interface // implemented by BitcoindNotifier. func createNewNotifier(args ...interface{}) (chainntnfs.ChainNotifier, error) { - if len(args) != 1 { + if len(args) != 3 { return nil, fmt.Errorf("incorrect number of arguments to "+ - ".New(...), expected 1, instead passed %v", len(args)) + ".New(...), expected 2, instead passed %v", len(args)) } chainConn, ok := args[0].(*chain.BitcoindConn) if !ok { - return nil, fmt.Errorf("first argument to bitcoindnotify.New " + + return nil, errors.New("first argument to bitcoindnotify.New " + "is incorrect, expected a *chain.BitcoindConn") } - return New(chainConn), nil + spendHintCache, ok := args[1].(chainntnfs.SpendHintCache) + if !ok { + return nil, errors.New("second argument to bitcoindnotify.New " + + "is incorrect, expected a chainntnfs.SpendHintCache") + } + + confirmHintCache, ok := args[2].(chainntnfs.ConfirmHintCache) + if !ok { + return nil, errors.New("third argument to bitcoindnotify.New " + + "is incorrect, expected a chainntnfs.ConfirmHintCache") + } + + return New(chainConn, spendHintCache, confirmHintCache), nil } // init registers a driver for the BtcdNotifier concrete implementation of the diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index c7cb6995..13ad76d7 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -83,6 +83,16 @@ type BtcdNotifier struct { chainUpdates *chainntnfs.ConcurrentQueue txUpdates *chainntnfs.ConcurrentQueue + // spendHintCache is a cache used to query and update the latest height + // hints for an outpoint. Each height hint represents the earliest + // height at which the outpoint could have been spent within the chain. + spendHintCache chainntnfs.SpendHintCache + + // confirmHintCache is a cache used to query the latest height hints for + // a transaction. Each height hint represents the earliest height at + // which the transaction could have confirmed within the chain. + confirmHintCache chainntnfs.ConfirmHintCache + wg sync.WaitGroup quit chan struct{} } @@ -93,7 +103,9 @@ var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil) // New returns a new BtcdNotifier instance. This function assumes the btcd node // detailed in the passed configuration is already running, and willing to // accept new websockets clients. -func New(config *rpcclient.ConnConfig) (*BtcdNotifier, error) { +func New(config *rpcclient.ConnConfig, spendHintCache chainntnfs.SpendHintCache, + confirmHintCache chainntnfs.ConfirmHintCache) (*BtcdNotifier, error) { + notifier := &BtcdNotifier{ notificationCancels: make(chan interface{}), notificationRegistry: make(chan interface{}), @@ -105,6 +117,9 @@ func New(config *rpcclient.ConnConfig) (*BtcdNotifier, error) { chainUpdates: chainntnfs.NewConcurrentQueue(10), txUpdates: chainntnfs.NewConcurrentQueue(10), + spendHintCache: spendHintCache, + confirmHintCache: confirmHintCache, + quit: make(chan struct{}), } @@ -150,7 +165,8 @@ func (b *BtcdNotifier) Start() error { } b.txConfNotifier = chainntnfs.NewTxConfNotifier( - uint32(currentHeight), reorgSafetyLimit) + uint32(currentHeight), reorgSafetyLimit, b.confirmHintCache, + ) b.bestBlock = chainntnfs.BlockEpoch{ Height: currentHeight, diff --git a/chainntnfs/btcdnotify/btcd_debug.go b/chainntnfs/btcdnotify/btcd_debug.go index 8ddffc39..47136f0b 100644 --- a/chainntnfs/btcdnotify/btcd_debug.go +++ b/chainntnfs/btcdnotify/btcd_debug.go @@ -28,7 +28,8 @@ func (b *BtcdNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Hash, } b.txConfNotifier = chainntnfs.NewTxConfNotifier( - uint32(bestHeight), reorgSafetyLimit) + uint32(bestHeight), reorgSafetyLimit, b.confirmHintCache, + ) b.chainUpdates.Start() b.txUpdates.Start() diff --git a/chainntnfs/btcdnotify/driver.go b/chainntnfs/btcdnotify/driver.go index 19e405a1..1cda9192 100644 --- a/chainntnfs/btcdnotify/driver.go +++ b/chainntnfs/btcdnotify/driver.go @@ -1,6 +1,7 @@ package btcdnotify import ( + "errors" "fmt" "github.com/btcsuite/btcd/rpcclient" @@ -10,18 +11,30 @@ import ( // createNewNotifier creates a new instance of the ChainNotifier interface // implemented by BtcdNotifier. func createNewNotifier(args ...interface{}) (chainntnfs.ChainNotifier, error) { - if len(args) != 1 { - return nil, fmt.Errorf("incorrect number of arguments to .New(...), "+ - "expected 1, instead passed %v", len(args)) + if len(args) != 3 { + return nil, fmt.Errorf("incorrect number of arguments to "+ + ".New(...), expected 2, instead passed %v", len(args)) } config, ok := args[0].(*rpcclient.ConnConfig) if !ok { - return nil, fmt.Errorf("first argument to btcdnotifier.New is " + - "incorrect, expected a *rpcclient.ConnConfig") + return nil, errors.New("first argument to btcdnotifier.New " + + "is incorrect, expected a *rpcclient.ConnConfig") } - return New(config) + spendHintCache, ok := args[1].(chainntnfs.SpendHintCache) + if !ok { + return nil, errors.New("second argument to btcdnotifier.New " + + "is incorrect, expected a chainntnfs.SpendHintCache") + } + + confirmHintCache, ok := args[2].(chainntnfs.ConfirmHintCache) + if !ok { + return nil, errors.New("third argument to btcdnotifier.New " + + "is incorrect, expected a chainntnfs.ConfirmHintCache") + } + + return New(config, spendHintCache, confirmHintCache) } // init registers a driver for the BtcdNotifier concrete implementation of the diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index dade94ed..242e9ed9 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -26,6 +26,7 @@ import ( "github.com/btcsuite/btcwallet/walletdb" "github.com/lightninglabs/neutrino" "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/channeldb" // Required to auto-register the bitcoind backed ChainNotifier // implementation. @@ -1741,10 +1742,22 @@ func TestInterfaces(t *testing.T) { newNotifier func() (chainntnfs.TestChainNotifier, error) ) for _, notifierDriver := range chainntnfs.RegisteredNotifiers() { + // Initialize a height hint cache for each notifier. + tempDir, err := ioutil.TempDir("", "channeldb") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + db, err := channeldb.Open(tempDir) + if err != nil { + t.Fatalf("unable to create db: %v", err) + } + hintCache, err := chainntnfs.NewHeightHintCache(db) + if err != nil { + t.Fatalf("unable to create height hint cache: %v", err) + } + notifierType := notifierDriver.NotifierType - switch notifierType { - case "bitcoind": // Start a bitcoind instance. tempBitcoindDir, err := ioutil.TempDir("", "bitcoind") @@ -1807,12 +1820,16 @@ func TestInterfaces(t *testing.T) { cleanUp = cleanUp3 newNotifier = func() (chainntnfs.TestChainNotifier, error) { - return bitcoindnotify.New(chainConn), nil + return bitcoindnotify.New( + chainConn, hintCache, hintCache, + ), nil } case "btcd": newNotifier = func() (chainntnfs.TestChainNotifier, error) { - return btcdnotify.New(&rpcConfig) + return btcdnotify.New( + &rpcConfig, hintCache, hintCache, + ) } cleanUp = func() {} @@ -1855,7 +1872,9 @@ func TestInterfaces(t *testing.T) { time.Sleep(time.Millisecond * 100) } newNotifier = func() (chainntnfs.TestChainNotifier, error) { - return neutrinonotify.New(spvNode) + return neutrinonotify.New( + spvNode, hintCache, hintCache, + ) } } diff --git a/chainntnfs/neutrinonotify/driver.go b/chainntnfs/neutrinonotify/driver.go index c9dc1b36..6a6ffd8e 100644 --- a/chainntnfs/neutrinonotify/driver.go +++ b/chainntnfs/neutrinonotify/driver.go @@ -1,6 +1,7 @@ package neutrinonotify import ( + "errors" "fmt" "github.com/lightninglabs/neutrino" @@ -10,18 +11,30 @@ import ( // createNewNotifier creates a new instance of the ChainNotifier interface // implemented by NeutrinoNotifier. func createNewNotifier(args ...interface{}) (chainntnfs.ChainNotifier, error) { - if len(args) != 1 { - return nil, fmt.Errorf("incorrect number of arguments to .New(...), "+ - "expected 1, instead passed %v", len(args)) + if len(args) != 2 { + return nil, fmt.Errorf("incorrect number of arguments to "+ + ".New(...), expected 2, instead passed %v", len(args)) } config, ok := args[0].(*neutrino.ChainService) if !ok { - return nil, fmt.Errorf("first argument to neutrinonotify.New is " + - "incorrect, expected a *neutrino.ChainService") + return nil, errors.New("first argument to neutrinonotify.New " + + "is incorrect, expected a *neutrino.ChainService") } - return New(config) + spendHintCache, ok := args[1].(chainntnfs.SpendHintCache) + if !ok { + return nil, errors.New("second argument to neutrinonotify.New " + + "is incorrect, expected a chainntfs.SpendHintCache") + } + + confirmHintCache, ok := args[2].(chainntnfs.ConfirmHintCache) + if !ok { + return nil, errors.New("third argument to neutrinonotify.New " + + "is incorrect, expected a chainntfs.ConfirmHintCache") + } + + return New(config, spendHintCache, confirmHintCache) } // init registers a driver for the NeutrinoNotify concrete implementation of diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 07e297a6..7ce0934e 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -77,6 +77,16 @@ type NeutrinoNotifier struct { chainUpdates *chainntnfs.ConcurrentQueue + // spendHintCache is a cache used to query and update the latest height + // hints for an outpoint. Each height hint represents the earliest + // height at which the outpoint could have been spent within the chain. + spendHintCache chainntnfs.SpendHintCache + + // confirmHintCache is a cache used to query the latest height hints for + // a transaction. Each height hint represents the earliest height at + // which the transaction could have confirmed within the chain. + confirmHintCache chainntnfs.ConfirmHintCache + wg sync.WaitGroup quit chan struct{} } @@ -89,7 +99,9 @@ var _ chainntnfs.ChainNotifier = (*NeutrinoNotifier)(nil) // // NOTE: The passed neutrino node should already be running and active before // being passed into this function. -func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) { +func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache, + confirmHintCache chainntnfs.ConfirmHintCache) (*NeutrinoNotifier, error) { + notifier := &NeutrinoNotifier{ notificationCancels: make(chan interface{}), notificationRegistry: make(chan interface{}), @@ -104,6 +116,9 @@ func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) { chainUpdates: chainntnfs.NewConcurrentQueue(10), + spendHintCache: spendHintCache, + confirmHintCache: confirmHintCache, + quit: make(chan struct{}), } @@ -150,7 +165,7 @@ func (n *NeutrinoNotifier) Start() error { } n.txConfNotifier = chainntnfs.NewTxConfNotifier( - bestHeight, reorgSafetyLimit, + bestHeight, reorgSafetyLimit, n.confirmHintCache, ) n.chainConn = &NeutrinoChainConn{n.p2pNode} diff --git a/chainntnfs/neutrinonotify/neutrino_debug.go b/chainntnfs/neutrinonotify/neutrino_debug.go index 56724c4f..ece4af27 100644 --- a/chainntnfs/neutrinonotify/neutrino_debug.go +++ b/chainntnfs/neutrinonotify/neutrino_debug.go @@ -51,7 +51,8 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Has } n.txConfNotifier = chainntnfs.NewTxConfNotifier( - uint32(bestHeight), reorgSafetyLimit) + uint32(bestHeight), reorgSafetyLimit, n.confirmHintCache, + ) n.chainConn = &NeutrinoChainConn{n.p2pNode} From 7e872566c46baff742ee5e20ca3652e6a99255bc Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 14 Aug 2018 17:54:21 -0700 Subject: [PATCH 4/9] chainntnfs: query the hint cache before registering a conf ntfn In this commit, we alter the different chain notifiers to query their height hint cache before registering a confimation notification. We do this as it's possible that the cache has a higher height hint, which can potentially reduce the amount of blocked fetched when attempting historical dispatches. --- chainntnfs/bitcoindnotify/bitcoind.go | 12 ++++++++++++ chainntnfs/btcdnotify/btcd.go | 12 ++++++++++++ chainntnfs/neutrinonotify/neutrino.go | 12 ++++++++++++ 3 files changed, 36 insertions(+) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 3fd41156..3fc60524 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -869,6 +869,18 @@ type confirmationNotification struct { func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, _ []byte, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { + // Before proceeding to register the notification, we'll query our + // height hint cache to determine whether a better one exists. + if hint, err := b.confirmHintCache.QueryConfirmHint(*txid); err == nil { + if hint > heightHint { + chainntnfs.Log.Debugf("Using height hint %d retrieved "+ + "from cache for %v", hint, txid) + heightHint = hint + } + } + + // Construct a notification request for the transaction and send it to + // the main event loop. ntfn := &confirmationNotification{ ConfNtfn: chainntnfs.ConfNtfn{ ConfID: atomic.AddUint64(&b.confClientCounter, 1), diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 13ad76d7..d8b4c909 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -916,6 +916,18 @@ type confirmationNotification struct { func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, _ []byte, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { + // Before proceeding to register the notification, we'll query our + // height hint cache to determine whether a better one exists. + if hint, err := b.confirmHintCache.QueryConfirmHint(*txid); err == nil { + if hint > heightHint { + chainntnfs.Log.Debugf("Using height hint %d retrieved "+ + "from cache for %v", hint, txid) + heightHint = hint + } + } + + // Construct a notification request for the transaction and send it to + // the main event loop. ntfn := &confirmationNotification{ ConfNtfn: chainntnfs.ConfNtfn{ ConfID: atomic.AddUint64(&b.confClientCounter, 1), diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 7ce0934e..d3aa0d6d 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -869,6 +869,18 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, pkScript []byte, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { + // Before proceeding to register the notification, we'll query our + // height hint cache to determine whether a better one exists. + if hint, err := n.confirmHintCache.QueryConfirmHint(*txid); err == nil { + if hint > heightHint { + chainntnfs.Log.Debugf("Using height hint %d retrieved "+ + "from cache for %v", hint, txid) + heightHint = hint + } + } + + // Construct a notification request for the transaction and send it to + // the main event loop. ntfn := &confirmationsNotification{ ConfNtfn: chainntnfs.ConfNtfn{ ConfID: atomic.AddUint64(&n.confClientCounter, 1), From 94beabf34b6698613b85beff2759f35668f2c6d5 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 14 Aug 2018 17:55:29 -0700 Subject: [PATCH 5/9] chainntnfs: cache spend hints within the different chain notifiers In this commit, we extend the different ChainNotifier implementations to cache height hints for our spend events. Each outpoint we've requested a spend notification for will have its initial height hint cached. We then increment this height hint at every new block for unspent outpoints. This allows us to retrieve the *exact* height at which the outpoint has been spent. By doing this, we optimize the different ChainNotifier implementations since they will no longer have to rescan forward (and possibly fetch blocks in the neutrino/pruned node case) from the initial height hint. --- chainntnfs/bitcoindnotify/bitcoind.go | 63 ++++++++++++++++++++--- chainntnfs/btcdnotify/btcd.go | 73 ++++++++++++++++++++++----- chainntnfs/neutrinonotify/neutrino.go | 61 ++++++++++++++++++---- 3 files changed, 167 insertions(+), 30 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 3fc60524..134c591b 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -587,9 +587,6 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err return fmt.Errorf("unable to get block: %v", err) } - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - block.Height, block.Hash) - txns := btcutil.NewBlock(rawBlock).Transactions() err = b.txConfNotifier.ConnectTip( block.Hash, uint32(block.Height), txns) @@ -597,6 +594,9 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err return fmt.Errorf("unable to connect tip: %v", err) } + chainntnfs.Log.Infof("New block: height=%v, sha=%v", block.Height, + block.Hash) + // We want to set the best block before dispatching notifications so // if any subscribers make queries based on their received block epoch, // our state is fully updated in time. @@ -604,6 +604,26 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err b.notifyBlockEpochs(block.Height, block.Hash) + // Finally, we'll update the spend height hint for all of our watched + // outpoints that have not been spent yet. This is safe to do as we do + // not watch already spent outpoints for spend notifications. + ops := make([]wire.OutPoint, 0, len(b.spendNotifications)) + for op := range b.spendNotifications { + ops = append(ops, op) + } + + if len(ops) > 0 { + err := b.spendHintCache.CommitSpendHint( + uint32(block.Height), ops..., + ) + if err != nil { + // The error is not fatal, so we should not return an + // error to the caller. + chainntnfs.Log.Errorf("Unable to update spend hint to "+ + "%d for %v: %v", block.Height, ops, err) + } + } + return nil } @@ -662,6 +682,18 @@ type spendCancel struct { func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) { + // Before proceeding to register the notification, we'll query our + // height hint cache to determine whether a better one exists. + if hint, err := b.spendHintCache.QuerySpendHint(*outpoint); err == nil { + if hint > heightHint { + chainntnfs.Log.Debugf("Using height hint %d retrieved "+ + "from cache for %v", hint, outpoint) + heightHint = hint + } + } + + // Construct a notification request for the outpoint and send it to the + // main event loop. ntfn := &spendNotification{ targetOutpoint: outpoint, spendChan: make(chan *chainntnfs.SpendDetail, 1), @@ -688,7 +720,20 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, return nil, err } - if txOut == nil { + // If the output is unspent, then we'll write it to the cache with the + // given height hint. This allows us to increase the height hint as the + // chain extends and the output remains unspent. + if txOut != nil { + err := b.spendHintCache.CommitSpendHint(heightHint, *outpoint) + if err != nil { + // The error is not fatal, so we should not return an + // error to the caller. + chainntnfs.Log.Error("Unable to update spend hint to "+ + "%d for %v: %v", heightHint, *outpoint, err) + } + } else { + // Otherwise, we'll determine when the output was spent. + // // First, we'll attempt to retrieve the transaction's block hash // using the backend's transaction index. tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash) @@ -718,7 +763,9 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, int64(heightHint), ) if err != nil { - return nil, err + return nil, fmt.Errorf("unable to retrieve "+ + "hash for block with height %d: %v", + heightHint, err) } } @@ -810,11 +857,13 @@ func (b *BitcoindNotifier) dispatchSpendDetailsManually(op wire.OutPoint, blockHash, err := b.chainConn.GetBlockHash(int64(height)) if err != nil { - return err + return fmt.Errorf("unable to retrieve hash for block "+ + "with height %d: %v", height, err) } block, err := b.chainConn.GetBlock(blockHash) if err != nil { - return err + return fmt.Errorf("unable to retrieve block with hash "+ + "%v: %v", blockHash, err) } for _, tx := range block.Transactions { diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index d8b4c909..22febdd2 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -662,23 +662,23 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { return fmt.Errorf("unable to get block: %v", err) } - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - epoch.Height, epoch.Hash) - - txns := btcutil.NewBlock(rawBlock).Transactions() - newBlock := &filteredBlock{ hash: *epoch.Hash, height: uint32(epoch.Height), - txns: txns, + txns: btcutil.NewBlock(rawBlock).Transactions(), connect: true, } - err = b.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, - newBlock.txns) + + err = b.txConfNotifier.ConnectTip( + &newBlock.hash, newBlock.height, newBlock.txns, + ) if err != nil { return fmt.Errorf("unable to connect tip: %v", err) } + chainntnfs.Log.Infof("New block: height=%v, sha=%v", epoch.Height, + epoch.Hash) + // We want to set the best block before dispatching notifications // so if any subscribers make queries based on their received // block epoch, our state is fully updated in time. @@ -687,8 +687,8 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { // Next we'll notify any subscribed clients of the block. b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) - // Finally, we'll scan over the list of relevant transactions and - // possibly dispatch notifications for confirmations and spends. + // Scan over the list of relevant transactions and possibly dispatch + // notifications for spends. for _, tx := range newBlock.txns { mtx := tx.MsgTx() txSha := mtx.TxHash() @@ -714,8 +714,10 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { } for _, ntfn := range clients { - chainntnfs.Log.Infof("Dispatching spend notification for "+ - "outpoint=%v", ntfn.targetOutpoint) + chainntnfs.Log.Infof("Dispatching spend "+ + "notification for outpoint=%v", + ntfn.targetOutpoint) + ntfn.spendChan <- spendDetails // Close spendChan to ensure that any calls to @@ -729,6 +731,26 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { } } + // Finally, we'll update the spend height hint for all of our watched + // outpoints that have not been spent yet. This is safe to do as we do + // not watch already spent outpoints for spend notifications. + ops := make([]wire.OutPoint, 0, len(b.spendNotifications)) + for op := range b.spendNotifications { + ops = append(ops, op) + } + + if len(ops) > 0 { + err := b.spendHintCache.CommitSpendHint( + uint32(epoch.Height), ops..., + ) + if err != nil { + // The error is not fatal, so we should not return an + // error to the caller. + chainntnfs.Log.Errorf("Unable to update spend hint to "+ + "%d for %v: %v", epoch.Height, ops, err) + } + } + return nil } @@ -787,6 +809,18 @@ type spendCancel struct { func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) { + // Before proceeding to register the notification, we'll query our + // height hint cache to determine whether a better one exists. + if hint, err := b.spendHintCache.QuerySpendHint(*outpoint); err == nil { + if hint > heightHint { + chainntnfs.Log.Debugf("Using height hint %d retrieved "+ + "from cache for %v", hint, outpoint) + heightHint = hint + } + } + + // Construct a notification request for the outpoint and send it to the + // main event loop. ntfn := &spendNotification{ targetOutpoint: outpoint, spendChan: make(chan *chainntnfs.SpendDetail, 1), @@ -815,7 +849,20 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, return nil, err } - if txOut == nil { + // If the output is unspent, then we'll write it to the cache with the + // given height hint. This allows us to increase the height hint as the + // chain extends and the output remains unspent. + if txOut != nil { + err := b.spendHintCache.CommitSpendHint(heightHint, *outpoint) + if err != nil { + // The error is not fatal, so we should not return an + // error to the caller. + chainntnfs.Log.Error("Unable to update spend hint to "+ + "%d for %v: %v", heightHint, *outpoint, err) + } + } else { + // Otherwise, we'll determine when the output was spent. + // // First, we'll attempt to retrieve the transaction's block hash // using the backend's transaction index. tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash) diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index d3aa0d6d..c7cf321e 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -603,22 +603,23 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { // First process the block for our internal state. A new block has // been connected to the main chain. Send out any N confirmation // notifications which may have been triggered by this new block. - err := n.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, - newBlock.txns) + err := n.txConfNotifier.ConnectTip( + &newBlock.hash, newBlock.height, newBlock.txns, + ) if err != nil { return fmt.Errorf("unable to connect tip: %v", err) } - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - newBlock.height, newBlock.hash) + chainntnfs.Log.Infof("New block: height=%v, sha=%v", newBlock.height, + newBlock.hash) n.bestHeight = newBlock.height // Next, notify any subscribed clients of the block. n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) - // Finally, we'll scan over the list of relevant transactions and - // possibly dispatch notifications for confirmations and spends. + // Scan over the list of relevant transactions and possibly dispatch + // notifications for spends. for _, tx := range newBlock.txns { mtx := tx.MsgTx() txSha := mtx.TxHash() @@ -661,6 +662,24 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { } } + // Finally, we'll update the spend height hint for all of our watched + // outpoints that have not been spent yet. This is safe to do as we do + // not watch already spent outpoints for spend notifications. + ops := make([]wire.OutPoint, 0, len(n.spendNotifications)) + for op := range n.spendNotifications { + ops = append(ops, op) + } + + if len(ops) > 0 { + err := n.spendHintCache.CommitSpendHint(newBlock.height, ops...) + if err != nil { + // The error is not fatal, so we should not return an + // error to the caller. + chainntnfs.Log.Errorf("Unable to update spend hint to "+ + "%d for %v: %v", newBlock.height, ops, err) + } + } + return nil } @@ -740,15 +759,26 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, currentHeight := n.bestHeight n.heightMtx.RUnlock() - chainntnfs.Log.Infof("New spend notification for outpoint=%v, "+ - "height_hint=%v", outpoint, heightHint) + // Before proceeding to register the notification, we'll query our + // height hint cache to determine whether a better one exists. + if hint, err := n.spendHintCache.QuerySpendHint(*outpoint); err == nil { + if hint > heightHint { + chainntnfs.Log.Debugf("Using height hint %d retrieved "+ + "from cache for %v", hint, outpoint) + heightHint = hint + } + } + // Construct a notification request for the outpoint. We'll defer + // sending it to the main event loop until after we've guaranteed that + // the outpoint has not been spent. ntfn := &spendNotification{ targetOutpoint: outpoint, spendChan: make(chan *chainntnfs.SpendDetail, 1), spendID: atomic.AddUint64(&n.spendClientCounter, 1), heightHint: heightHint, } + spendEvent := &chainntnfs.SpendEvent{ Spend: ntfn.spendChan, Cancel: func() { @@ -760,8 +790,9 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // Submit spend cancellation to notification dispatcher. select { case n.notificationCancels <- cancel: - // Cancellation is being handled, drain the spend chan until it is - // closed before yielding to the caller. + // Cancellation is being handled, drain the + // spend chan until it is closed before yielding + // to the caller. for { select { case _, ok := <-ntfn.spendChan: @@ -851,6 +882,16 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, return nil, ErrChainNotifierShuttingDown } + // Finally, we'll add a spent hint with the current height to the cache + // in order to better keep track of when this outpoint is spent. + err = n.spendHintCache.CommitSpendHint(currentHeight, *outpoint) + if err != nil { + // The error is not fatal, so we should not return an error to + // the caller. + chainntnfs.Log.Errorf("Unable to update spend hint to %d for "+ + "%v: %v", currentHeight, outpoint, err) + } + return spendEvent, nil } From 4c2a0970b472e9576dc2cd5864ca1aa9eb274e37 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 22 May 2018 15:48:47 -0400 Subject: [PATCH 6/9] chainregistry: modify backends to use a height hint cache --- chainregistry.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/chainregistry.go b/chainregistry.go index 27f763c1..108cfc55 100644 --- a/chainregistry.go +++ b/chainregistry.go @@ -181,6 +181,13 @@ func newChainControlFromConfig(cfg *config, chanDB *channeldb.DB, cleanUp func() ) + // Initialize the height hint cache within the chain directory. + hintCache, err := chainntnfs.NewHeightHintCache(chanDB) + if err != nil { + return nil, nil, fmt.Errorf("unable to initialize height hint "+ + "cache: %v", err) + } + // If spv mode is active, then we'll be using a distinct set of // chainControl interfaces that interface directly with the p2p network // of the selected chain. @@ -245,7 +252,9 @@ func newChainControlFromConfig(cfg *config, chanDB *channeldb.DB, // Next we'll create the instances of the ChainNotifier and // FilteredChainView interface which is backed by the neutrino // light client. - cc.chainNotifier, err = neutrinonotify.New(svc) + cc.chainNotifier, err = neutrinonotify.New( + svc, hintCache, hintCache, + ) if err != nil { return nil, nil, err } @@ -322,7 +331,9 @@ func newChainControlFromConfig(cfg *config, chanDB *channeldb.DB, "bitcoind: %v", err) } - cc.chainNotifier = bitcoindnotify.New(bitcoindConn) + cc.chainNotifier = bitcoindnotify.New( + bitcoindConn, hintCache, hintCache, + ) cc.chainView = chainview.NewBitcoindFilteredChainView(bitcoindConn) walletConfig.ChainSource = bitcoindConn.NewBitcoindClient(birthday) @@ -430,7 +441,9 @@ func newChainControlFromConfig(cfg *config, chanDB *channeldb.DB, DisableConnectOnNew: true, DisableAutoReconnect: false, } - cc.chainNotifier, err = btcdnotify.New(rpcConfig) + cc.chainNotifier, err = btcdnotify.New( + rpcConfig, hintCache, hintCache, + ) if err != nil { return nil, nil, err } From 2931b2d3b2e071c46ed4bdda3fb62037a1de0f0c Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 22 May 2018 15:49:38 -0400 Subject: [PATCH 7/9] lnwallet: add height hint cache to backend tests --- lnwallet/interface_test.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/lnwallet/interface_test.go b/lnwallet/interface_test.go index d2ded0f3..2ca744c4 100644 --- a/lnwallet/interface_test.go +++ b/lnwallet/interface_test.go @@ -2068,7 +2068,19 @@ func TestLightningWallet(t *testing.T) { rpcConfig := miningNode.RPCConfig() - chainNotifier, err := btcdnotify.New(&rpcConfig) + tempDir, err := ioutil.TempDir("", "channeldb") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + db, err := channeldb.Open(tempDir) + if err != nil { + t.Fatalf("unable to create db: %v", err) + } + hintCache, err := chainntnfs.NewHeightHintCache(db) + if err != nil { + t.Fatalf("unable to create height hint cache: %v", err) + } + chainNotifier, err := btcdnotify.New(&rpcConfig, hintCache, hintCache) if err != nil { t.Fatalf("unable to create notifier: %v", err) } From 96a079873a6da2f0a93adc30945971fc07c5e610 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 23 May 2018 13:54:12 -0400 Subject: [PATCH 8/9] test: remove sleep timeouts used within channel force closure In this commit, we remove the sleep timeouts used within the channel force closure integration test. This is needed because the recent changes within the ChainNotifier require a longer timeout due to making a database transaction on every new block to update the confirm/spend hints of transactions. Rather than increasing the timeouts, we simply remove them to ensure this isn't an issue down the road. --- lnd_test.go | 432 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 276 insertions(+), 156 deletions(-) diff --git a/lnd_test.go b/lnd_test.go index dce4e5f4..41b63763 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -1628,122 +1628,119 @@ func testChannelBalance(net *lntest.NetworkHarness, t *harnessTest) { // findForceClosedChannel searches a pending channel response for a particular // channel, returning the force closed channel upon success. -func findForceClosedChannel(t *harnessTest, - pendingChanResp *lnrpc.PendingChannelsResponse, - op *wire.OutPoint) *lnrpc.PendingChannelsResponse_ForceClosedChannel { +func findForceClosedChannel(pendingChanResp *lnrpc.PendingChannelsResponse, + op *wire.OutPoint) (*lnrpc.PendingChannelsResponse_ForceClosedChannel, error) { - var found bool - var forceClose *lnrpc.PendingChannelsResponse_ForceClosedChannel - for _, forceClose = range pendingChanResp.PendingForceClosingChannels { + for _, forceClose := range pendingChanResp.PendingForceClosingChannels { if forceClose.Channel.ChannelPoint == op.String() { - found = true - break + return forceClose, nil } } - if !found { - t.Fatalf("channel not marked as force closed") - } - return forceClose + return nil, errors.New("channel not marked as force closed") } // findWaitingCloseChannel searches a pending channel response for a particular // channel, returning the waiting close channel upon success. -func findWaitingCloseChannel(t *harnessTest, - pendingChanResp *lnrpc.PendingChannelsResponse, - op *wire.OutPoint) *lnrpc.PendingChannelsResponse_WaitingCloseChannel { +func findWaitingCloseChannel(pendingChanResp *lnrpc.PendingChannelsResponse, + op *wire.OutPoint) (*lnrpc.PendingChannelsResponse_WaitingCloseChannel, error) { - var found bool - var waitingClose *lnrpc.PendingChannelsResponse_WaitingCloseChannel - for _, waitingClose = range pendingChanResp.WaitingCloseChannels { + for _, waitingClose := range pendingChanResp.WaitingCloseChannels { if waitingClose.Channel.ChannelPoint == op.String() { - found = true - break + return waitingClose, nil } } - if !found { - t.Fatalf("channel not marked as waiting close") - } - return waitingClose + return nil, errors.New("channel not marked as waiting close") } -func assertCommitmentMaturity(t *harnessTest, +func checkCommitmentMaturity( forceClose *lnrpc.PendingChannelsResponse_ForceClosedChannel, - maturityHeight uint32, blocksTilMaturity int32) { + maturityHeight uint32, blocksTilMaturity int32) error { if forceClose.MaturityHeight != maturityHeight { - t.Fatalf("expected commitment maturity height to be %d, "+ - "found %d instead", maturityHeight, + return fmt.Errorf("expected commitment maturity height to be "+ + "%d, found %d instead", maturityHeight, forceClose.MaturityHeight) } if forceClose.BlocksTilMaturity != blocksTilMaturity { - t.Fatalf("expected commitment blocks til maturity to be %d, "+ - "found %d instead", blocksTilMaturity, + return fmt.Errorf("expected commitment blocks til maturity to "+ + "be %d, found %d instead", blocksTilMaturity, forceClose.BlocksTilMaturity) } + + return nil } -// assertForceClosedChannelNumHtlcs verifies that a force closed channel has the +// checkForceClosedChannelNumHtlcs verifies that a force closed channel has the // proper number of htlcs. -func assertPendingChannelNumHtlcs(t *harnessTest, +func checkPendingChannelNumHtlcs( forceClose *lnrpc.PendingChannelsResponse_ForceClosedChannel, - expectedNumHtlcs int) { + expectedNumHtlcs int) error { if len(forceClose.PendingHtlcs) != expectedNumHtlcs { - t.Fatalf("expected force closed channel to have %d pending "+ - "htlcs, found %d instead", expectedNumHtlcs, + return fmt.Errorf("expected force closed channel to have %d "+ + "pending htlcs, found %d instead", expectedNumHtlcs, len(forceClose.PendingHtlcs)) } + + return nil } -// assertNumForceClosedChannels checks that a pending channel response has the +// checkNumForceClosedChannels checks that a pending channel response has the // expected number of force closed channels. -func assertNumForceClosedChannels(t *harnessTest, - pendingChanResp *lnrpc.PendingChannelsResponse, expectedNumChans int) { +func checkNumForceClosedChannels(pendingChanResp *lnrpc.PendingChannelsResponse, + expectedNumChans int) error { if len(pendingChanResp.PendingForceClosingChannels) != expectedNumChans { - t.Fatalf("expected to find %d force closed channels, got %d", - expectedNumChans, + return fmt.Errorf("expected to find %d force closed channels, "+ + "got %d", expectedNumChans, len(pendingChanResp.PendingForceClosingChannels)) } + + return nil } -// assertNumWaitingCloseChannels checks that a pending channel response has the +// checkNumWaitingCloseChannels checks that a pending channel response has the // expected number of channels waiting for closing tx to confirm. -func assertNumWaitingCloseChannels(t *harnessTest, - pendingChanResp *lnrpc.PendingChannelsResponse, expectedNumChans int) { +func checkNumWaitingCloseChannels(pendingChanResp *lnrpc.PendingChannelsResponse, + expectedNumChans int) error { if len(pendingChanResp.WaitingCloseChannels) != expectedNumChans { - t.Fatalf("expected to find %d channels waiting closure, got %d", - expectedNumChans, + return fmt.Errorf("expected to find %d channels waiting "+ + "closure, got %d", expectedNumChans, len(pendingChanResp.WaitingCloseChannels)) } + + return nil } -// assertPendingHtlcStageAndMaturity uniformly tests all pending htlc's -// belonging to a force closed channel, testing for the expected stage number, -// blocks till maturity, and the maturity height. -func assertPendingHtlcStageAndMaturity(t *harnessTest, +// checkPendingHtlcStageAndMaturity uniformly tests all pending htlc's belonging +// to a force closed channel, testing for the expected stage number, blocks till +// maturity, and the maturity height. +func checkPendingHtlcStageAndMaturity( forceClose *lnrpc.PendingChannelsResponse_ForceClosedChannel, - stage, maturityHeight uint32, blocksTillMaturity int32) { + stage, maturityHeight uint32, blocksTillMaturity int32) error { for _, pendingHtlc := range forceClose.PendingHtlcs { if pendingHtlc.Stage != stage { - t.Fatalf("expected pending htlc to be stage %d, "+ - "found %d", stage, pendingHtlc.Stage) + return fmt.Errorf("expected pending htlc to be stage "+ + "%d, found %d", stage, pendingHtlc.Stage) } if pendingHtlc.MaturityHeight != maturityHeight { - t.Fatalf("expected pending htlc maturity height to be "+ - "%d, instead has %d", maturityHeight, - pendingHtlc.MaturityHeight) + return fmt.Errorf("expected pending htlc maturity "+ + "height to be %d, instead has %d", + maturityHeight, pendingHtlc.MaturityHeight) } if pendingHtlc.BlocksTilMaturity != blocksTillMaturity { - t.Fatalf("expected pending htlc blocks til maturity "+ - "to be %d, instead has %d", blocksTillMaturity, + return fmt.Errorf("expected pending htlc blocks til "+ + "maturity to be %d, instead has %d", + blocksTillMaturity, pendingHtlc.BlocksTilMaturity) } } + + return nil } // testChannelForceClosure performs a test to exercise the behavior of "force" @@ -1887,8 +1884,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { htlcCsvMaturityHeight = startHeight + defaultCLTV + 1 + defaultCSV ) - time.Sleep(200 * time.Millisecond) - aliceChan, err := getAliceChanInfo() if err != nil { t.Fatalf("unable to get alice's channel info: %v", err) @@ -1913,7 +1908,10 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err != nil { t.Fatalf("unable to query for pending channels: %v", err) } - assertNumWaitingCloseChannels(t, pendingChanResp, 1) + err = checkNumWaitingCloseChannels(pendingChanResp, 1) + if err != nil { + t.Fatalf(err.Error()) + } // Compute the outpoint of the channel, which we will use repeatedly to // locate the pending channel information in the rpc responses. @@ -1930,7 +1928,10 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { Index: chanPoint.OutputIndex, } - waitingClose := findWaitingCloseChannel(t, pendingChanResp, &op) + waitingClose, err := findWaitingCloseChannel(pendingChanResp, &op) + if err != nil { + t.Fatalf(err.Error()) + } // Immediately after force closing, all of the funds should be in limbo. if waitingClose.LimboBalance == 0 { @@ -1953,37 +1954,56 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to generate block: %v", err) } - // The following sleep provides time for the UTXO nursery to move the - // output from the preschool to the kindergarten database buckets - // prior to RestartNode() being triggered. Without this sleep, the - // database update may fail, causing the UTXO nursery to retry the move - // operation upon restart. This will change the blockheights from what - // is expected by the test. - // TODO(bvu): refactor out this sleep. - duration := time.Millisecond * 300 - time.Sleep(duration) - // Now that the commitment has been confirmed, the channel should be // marked as force closed. - pendingChanResp, err = net.Alice.PendingChannels(ctxb, pendingChansRequest) + err = lntest.WaitPredicate(func() bool { + pendingChanResp, err := net.Alice.PendingChannels( + ctxb, pendingChansRequest, + ) + if err != nil { + predErr = fmt.Errorf("unable to query for pending "+ + "channels: %v", err) + return false + } + + predErr = checkNumForceClosedChannels(pendingChanResp, 1) + if predErr != nil { + return false + } + + forceClose, predErr := findForceClosedChannel( + pendingChanResp, &op, + ) + if predErr != nil { + return false + } + + // Now that the channel has been force closed, it should now + // have the height and number of blocks to confirm populated. + predErr = checkCommitmentMaturity( + forceClose, commCsvMaturityHeight, int32(defaultCSV), + ) + if predErr != nil { + return false + } + + // None of our outputs have been swept, so they should all be in + // limbo. + if forceClose.LimboBalance == 0 { + predErr = errors.New("all funds should still be in " + + "limbo") + return false + } + if forceClose.RecoveredBalance != 0 { + predErr = errors.New("no funds should yet be shown " + + "as recovered") + return false + } + + return true + }, 15*time.Second) if err != nil { - t.Fatalf("unable to query for pending channels: %v", err) - } - assertNumForceClosedChannels(t, pendingChanResp, 1) - - forceClose := findForceClosedChannel(t, pendingChanResp, &op) - - // Now that the channel has been force closed, it should now have the - // height and number of blocks to confirm populated. - assertCommitmentMaturity(t, forceClose, commCsvMaturityHeight, - int32(defaultCSV)) - - // None of our outputs have been swept, so they should all be limbo. - if forceClose.LimboBalance == 0 { - t.Fatalf("all funds should still be in limbo") - } - if forceClose.RecoveredBalance != 0 { - t.Fatalf("no funds should yet be shown as recovered") + t.Fatalf(predErr.Error()) } // The following restart is intended to ensure that outputs from the @@ -2009,26 +2029,58 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("Node restart failed: %v", err) } - pendingChanResp, err = net.Alice.PendingChannels(ctxb, pendingChansRequest) + // Alice should see the channel in her set of pending force closed + // channels with her funds still in limbo. + err = lntest.WaitPredicate(func() bool { + pendingChanResp, err := net.Alice.PendingChannels( + ctxb, pendingChansRequest, + ) + if err != nil { + predErr = fmt.Errorf("unable to query for pending "+ + "channels: %v", err) + return false + } + + predErr = checkNumForceClosedChannels(pendingChanResp, 1) + if predErr != nil { + return false + } + + forceClose, predErr := findForceClosedChannel( + pendingChanResp, &op, + ) + if predErr != nil { + return false + } + + // At this point, the nursery should show that the commitment + // output has 1 block left before its CSV delay expires. In + // total, we have mined exactly defaultCSV blocks, so the htlc + // outputs should also reflect that this many blocks have + // passed. + predErr = checkCommitmentMaturity( + forceClose, commCsvMaturityHeight, 1, + ) + if predErr != nil { + return false + } + + // All funds should still be shown in limbo. + if forceClose.LimboBalance == 0 { + predErr = errors.New("all funds should still be in " + + "limbo") + return false + } + if forceClose.RecoveredBalance != 0 { + predErr = errors.New("no funds should yet be shown " + + "as recovered") + return false + } + + return true + }, 15*time.Second) if err != nil { - t.Fatalf("unable to query for pending channels: %v", err) - } - assertNumForceClosedChannels(t, pendingChanResp, 1) - - forceClose = findForceClosedChannel(t, pendingChanResp, &op) - - // At this point, the nursery should show that the commitment output has - // 1 block left before its CSV delay expires. In total, we have mined - // exactly defaultCSV blocks, so the htlc outputs should also reflect - // that this many blocks have passed. - assertCommitmentMaturity(t, forceClose, commCsvMaturityHeight, 1) - - // All funds should still be shown in limbo. - if forceClose.LimboBalance == 0 { - t.Fatalf("all funds should still be in limbo") - } - if forceClose.RecoveredBalance != 0 { - t.Fatalf("no funds should yet be shown as recovered") + t.Fatalf(predErr.Error()) } // Generate an additional block, which should cause the CSV delayed @@ -2079,22 +2131,25 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { assertTxInBlock(t, block, sweepTx.Hash()) - // We sleep here to ensure that Alice has enough time to receive a - // confirmation for the commitment transaction, which we already - // asserted was in the last block. - time.Sleep(300 * time.Millisecond) - // Now that the commit output has been fully swept, check to see that // the channel remains open for the pending htlc outputs. pendingChanResp, err = net.Alice.PendingChannels(ctxb, pendingChansRequest) if err != nil { t.Fatalf("unable to query for pending channels: %v", err) } - assertNumForceClosedChannels(t, pendingChanResp, 1) + + err = checkNumForceClosedChannels(pendingChanResp, 1) + if err != nil { + t.Fatalf(err.Error()) + } // The htlc funds will still be shown as limbo, since they are still in // their first stage. The commitment funds will have been recovered // after the commit txn was included in the last block. + forceClose, err := findForceClosedChannel(pendingChanResp, &op) + if err != nil { + t.Fatalf(err.Error()) + } if forceClose.LimboBalance == 0 { t.Fatalf("htlc funds should still be in limbo") } @@ -2113,7 +2168,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err != nil { t.Fatalf("unable to generate block: %v", err) } - time.Sleep(duration) // We now restart Alice, to ensure that she will broadcast the presigned // htlc timeout txns after the delay expires after experiencing a while @@ -2121,26 +2175,58 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err := net.RestartNode(net.Alice, nil); err != nil { t.Fatalf("Node restart failed: %v", err) } - time.Sleep(duration) - pendingChanResp, err = net.Alice.PendingChannels(ctxb, pendingChansRequest) + // Alice should now see the channel in her set of pending force closed + // channels with one pending HTLC. + err = lntest.WaitPredicate(func() bool { + pendingChanResp, err = net.Alice.PendingChannels( + ctxb, pendingChansRequest, + ) + if err != nil { + predErr = fmt.Errorf("unable to query for pending "+ + "channels: %v", err) + return false + } + + predErr = checkNumForceClosedChannels(pendingChanResp, 1) + if predErr != nil { + return false + } + + forceClose, predErr = findForceClosedChannel( + pendingChanResp, &op, + ) + if predErr != nil { + return false + } + + // We should now be at the block just before the utxo nursery + // will attempt to broadcast the htlc timeout transactions. + predErr = checkPendingChannelNumHtlcs(forceClose, numInvoices) + if predErr != nil { + return false + } + predErr = checkPendingHtlcStageAndMaturity( + forceClose, 1, htlcExpiryHeight, 1, + ) + if predErr != nil { + return false + } + + // Now that our commitment confirmation depth has been + // surpassed, we should now see a non-zero recovered balance. + // All htlc outputs are still left in limbo, so it should be + // non-zero as well. + if forceClose.LimboBalance == 0 { + predErr = errors.New("htlc funds should still be in " + + "limbo") + return false + } + + return true + }, 15*time.Second) if err != nil { - t.Fatalf("unable to query for pending channels: %v", err) - } - assertNumForceClosedChannels(t, pendingChanResp, 1) - - forceClose = findForceClosedChannel(t, pendingChanResp, &op) - - // We should now be at the block just before the utxo nursery will - // attempt to broadcast the htlc timeout transactions. - assertPendingChannelNumHtlcs(t, forceClose, numInvoices) - assertPendingHtlcStageAndMaturity(t, forceClose, 1, htlcExpiryHeight, 1) - - // Now that our commitment confirmation depth has been surpassed, we - // should now see a non-zero recovered balance. All htlc outputs are - // still left in limbo, so it should be non-zero as well. - if forceClose.LimboBalance == 0 { - t.Fatalf("htlc funds should still be in limbo") + t.Fatalf(predErr.Error()) } // Now, generate the block which will cause Alice to broadcast the @@ -2191,7 +2277,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err := net.RestartNode(net.Alice, nil); err != nil { t.Fatalf("Node restart failed: %v", err) } - time.Sleep(duration) // Generate a block that mines the htlc timeout txns. Doing so now // activates the 2nd-stage CSV delayed outputs. @@ -2199,9 +2284,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err != nil { t.Fatalf("unable to generate block: %v", err) } - // This sleep gives Alice enough to time move the crib outputs into the - // kindergarten bucket. - time.Sleep(duration) // Alice is restarted here to ensure that she promptly moved the crib // outputs to the kindergarten bucket after the htlc timeout txns were @@ -2229,15 +2311,24 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err != nil { t.Fatalf("unable to query for pending channels: %v", err) } - assertNumForceClosedChannels(t, pendingChanResp, 1) + err = checkNumForceClosedChannels(pendingChanResp, 1) + if err != nil { + t.Fatalf(err.Error()) + } - forceClose = findForceClosedChannel(t, pendingChanResp, &op) + forceClose, err = findForceClosedChannel(pendingChanResp, &op) + if err != nil { + t.Fatalf(err.Error()) + } if forceClose.LimboBalance == 0 { t.Fatalf("htlc funds should still be in limbo") } - assertPendingChannelNumHtlcs(t, forceClose, numInvoices) + err = checkPendingChannelNumHtlcs(forceClose, numInvoices) + if err != nil { + t.Fatalf(err.Error()) + } // Generate a block that causes Alice to sweep the htlc outputs in the // kindergarten bucket. @@ -2296,7 +2387,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err := net.RestartNode(net.Alice, nil); err != nil { t.Fatalf("Node restart failed: %v", err) } - time.Sleep(duration) // Now that the channel has been fully swept, it should no longer show // incubated, check to see that Alice's node still reports the channel @@ -2305,14 +2395,27 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err != nil { t.Fatalf("unable to query for pending channels: %v", err) } - assertNumForceClosedChannels(t, pendingChanResp, 1) + err = checkNumForceClosedChannels(pendingChanResp, 1) + if err != nil { + t.Fatalf(err.Error()) + } // All htlcs should show zero blocks until maturity, as evidenced by // having checked the sweep transaction in the mempool. - forceClose = findForceClosedChannel(t, pendingChanResp, &op) - assertPendingChannelNumHtlcs(t, forceClose, numInvoices) - assertPendingHtlcStageAndMaturity(t, forceClose, 2, - htlcCsvMaturityHeight, 0) + forceClose, err = findForceClosedChannel(pendingChanResp, &op) + if err != nil { + t.Fatalf(err.Error()) + } + err = checkPendingChannelNumHtlcs(forceClose, numInvoices) + if err != nil { + t.Fatalf(err.Error()) + } + err = checkPendingHtlcStageAndMaturity( + forceClose, 2, htlcCsvMaturityHeight, 0, + ) + if err != nil { + t.Fatalf(err.Error()) + } // Generate the final block that sweeps all htlc funds into the user's // wallet. @@ -2320,20 +2423,37 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { if err != nil { t.Fatalf("unable to generate block: %v", err) } - time.Sleep(3 * duration) // Now that the channel has been fully swept, it should no longer show // up within the pending channels RPC. - pendingChanResp, err = net.Alice.PendingChannels(ctxb, pendingChansRequest) - if err != nil { - t.Fatalf("unable to query for pending channels: %v", err) - } - assertNumForceClosedChannels(t, pendingChanResp, 0) + err = lntest.WaitPredicate(func() bool { + pendingChanResp, err := net.Alice.PendingChannels( + ctxb, pendingChansRequest, + ) + if err != nil { + predErr = fmt.Errorf("unable to query for pending "+ + "channels: %v", err) + return false + } - // In addition to there being no pending channels, we verify that - // pending channels does not report any money still in limbo. - if pendingChanResp.TotalLimboBalance != 0 { - t.Fatalf("no user funds should be left in limbo after incubation") + predErr = checkNumForceClosedChannels(pendingChanResp, 0) + if predErr != nil { + return false + } + + // In addition to there being no pending channels, we verify + // that pending channels does not report any money still in + // limbo. + if pendingChanResp.TotalLimboBalance != 0 { + predErr = errors.New("no user funds should be left " + + "in limbo after incubation") + return false + } + + return true + }, 15*time.Second) + if err != nil { + t.Fatalf(predErr.Error()) } // At this point, Bob should now be aware of his new immediately From 2f644d9b62d4d50acbc1c2bbceeb2b434faf6331 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 15 Aug 2018 14:51:31 -0700 Subject: [PATCH 9/9] chainntnfs/neutrinonotify: disable disconnected block ntfns while rescanning In this commit, we modify the rescan options Neutrino uses when performing a rescan for historical chain events to disable disconnected block notifications. This is needed as the Neutrino backend will mutate its internal state while rewinding, which causes disconnected block notifications to be sent. Since the notifier acts upon these notifications, they would cause it to also rewind unnecessarily. --- chainntnfs/neutrinonotify/neutrino.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index c7cf321e..5fb4d2a3 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -373,6 +373,7 @@ out: rescanUpdate := []neutrino.UpdateOption{ neutrino.AddAddrs(addrs...), neutrino.Rewind(currentHeight), + neutrino.DisableDisconnectedNtfns(true), } err = n.chainView.Update(rescanUpdate...) if err != nil { @@ -870,6 +871,7 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, rescanUpdate := []neutrino.UpdateOption{ neutrino.AddInputs(inputToWatch), neutrino.Rewind(currentHeight), + neutrino.DisableDisconnectedNtfns(true), } if err := n.chainView.Update(rescanUpdate...); err != nil {