diff --git a/chainntnfs/bitcoindnotify/bitcoind_test.go b/chainntnfs/bitcoindnotify/bitcoind_test.go index 16bbc2ed..e65945b0 100644 --- a/chainntnfs/bitcoindnotify/bitcoind_test.go +++ b/chainntnfs/bitcoindnotify/bitcoind_test.go @@ -25,7 +25,7 @@ func initHintCache(t *testing.T) *chainntnfs.HeightHintCache { if err != nil { t.Fatalf("unable to create db: %v", err) } - hintCache, err := chainntnfs.NewHeightHintCache(db) + hintCache, err := chainntnfs.NewHeightHintCache(db, true) if err != nil { t.Fatalf("unable to create hint cache: %v", err) } diff --git a/chainntnfs/btcdnotify/btcd_test.go b/chainntnfs/btcdnotify/btcd_test.go index 34f53d5b..239e0d9d 100644 --- a/chainntnfs/btcdnotify/btcd_test.go +++ b/chainntnfs/btcdnotify/btcd_test.go @@ -23,7 +23,7 @@ func initHintCache(t *testing.T) *chainntnfs.HeightHintCache { if err != nil { t.Fatalf("unable to create db: %v", err) } - hintCache, err := chainntnfs.NewHeightHintCache(db) + hintCache, err := chainntnfs.NewHeightHintCache(db, true) if err != nil { t.Fatalf("unable to create hint cache: %v", err) } diff --git a/chainntnfs/height_hint_cache.go b/chainntnfs/height_hint_cache.go index ba3c2224..33dac971 100644 --- a/chainntnfs/height_hint_cache.go +++ b/chainntnfs/height_hint_cache.go @@ -85,7 +85,8 @@ type ConfirmHintCache interface { // ConfirmHintCache interfaces backed by a channeldb DB instance where the hints // will be stored. type HeightHintCache struct { - db *channeldb.DB + db *channeldb.DB + disabled bool } // Compile-time checks to ensure HeightHintCache satisfies the SpendHintCache @@ -94,8 +95,11 @@ var _ SpendHintCache = (*HeightHintCache)(nil) var _ ConfirmHintCache = (*HeightHintCache)(nil) // NewHeightHintCache returns a new height hint cache backed by a database. -func NewHeightHintCache(db *channeldb.DB) (*HeightHintCache, error) { - cache := &HeightHintCache{db} +func NewHeightHintCache(db *channeldb.DB, disable bool) (*HeightHintCache, error) { + cache := &HeightHintCache{ + db: db, + disabled: disable, + } if err := cache.initBuckets(); err != nil { return nil, err } @@ -119,6 +123,10 @@ func (c *HeightHintCache) initBuckets() error { // CommitSpendHint commits a spend hint for the outpoints to the cache. func (c *HeightHintCache) CommitSpendHint(height uint32, ops ...wire.OutPoint) error { + if c.disabled { + return nil + } + Log.Tracef("Updating spend hint to height %d for %v", height, ops) return c.db.Batch(func(tx *bolt.Tx) error { @@ -153,6 +161,10 @@ func (c *HeightHintCache) CommitSpendHint(height uint32, ops ...wire.OutPoint) e // ErrSpendHintNotFound is returned if a spend hint does not exist within the // cache for the outpoint. func (c *HeightHintCache) QuerySpendHint(op wire.OutPoint) (uint32, error) { + if c.disabled { + return 0, ErrSpendHintNotFound + } + var hint uint32 err := c.db.View(func(tx *bolt.Tx) error { spendHints := tx.Bucket(spendHintBucket) @@ -181,6 +193,10 @@ func (c *HeightHintCache) QuerySpendHint(op wire.OutPoint) (uint32, error) { // PurgeSpendHint removes the spend hint for the outpoints from the cache. func (c *HeightHintCache) PurgeSpendHint(ops ...wire.OutPoint) error { + if c.disabled { + return nil + } + Log.Tracef("Removing spend hints for %v", ops) return c.db.Batch(func(tx *bolt.Tx) error { @@ -208,6 +224,10 @@ func (c *HeightHintCache) PurgeSpendHint(ops ...wire.OutPoint) error { // CommitConfirmHint commits a confirm hint for the transactions to the cache. func (c *HeightHintCache) CommitConfirmHint(height uint32, txids ...chainhash.Hash) error { + if c.disabled { + return nil + } + Log.Tracef("Updating confirm hints to height %d for %v", height, txids) return c.db.Batch(func(tx *bolt.Tx) error { @@ -242,6 +262,10 @@ func (c *HeightHintCache) CommitConfirmHint(height uint32, txids ...chainhash.Ha // ErrConfirmHintNotFound is returned if a confirm hint does not exist within // the cache for the transaction hash. func (c *HeightHintCache) QueryConfirmHint(txid chainhash.Hash) (uint32, error) { + if c.disabled { + return 0, ErrConfirmHintNotFound + } + var hint uint32 err := c.db.View(func(tx *bolt.Tx) error { confirmHints := tx.Bucket(confirmHintBucket) @@ -271,6 +295,10 @@ func (c *HeightHintCache) QueryConfirmHint(txid chainhash.Hash) (uint32, error) // PurgeConfirmHint removes the confirm hint for the transactions from the // cache. func (c *HeightHintCache) PurgeConfirmHint(txids ...chainhash.Hash) error { + if c.disabled { + return nil + } + Log.Tracef("Removing confirm hints for %v", txids) return c.db.Batch(func(tx *bolt.Tx) error { diff --git a/chainntnfs/height_hint_cache_test.go b/chainntnfs/height_hint_cache_test.go index f444b18d..2f09bce7 100644 --- a/chainntnfs/height_hint_cache_test.go +++ b/chainntnfs/height_hint_cache_test.go @@ -10,7 +10,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb" ) -func initHintCache(t *testing.T) *HeightHintCache { +func initHintCache(t *testing.T, disable bool) *HeightHintCache { t.Helper() tempDir, err := ioutil.TempDir("", "kek") @@ -21,7 +21,7 @@ func initHintCache(t *testing.T) *HeightHintCache { if err != nil { t.Fatalf("unable to create db: %v", err) } - hintCache, err := NewHeightHintCache(db) + hintCache, err := NewHeightHintCache(db, disable) if err != nil { t.Fatalf("unable to create hint cache: %v", err) } @@ -34,7 +34,7 @@ func initHintCache(t *testing.T) *HeightHintCache { func TestHeightHintCacheConfirms(t *testing.T) { t.Parallel() - hintCache := initHintCache(t) + hintCache := initHintCache(t, false) // Querying for a transaction hash not found within the cache should // return an error indication so. @@ -93,7 +93,7 @@ func TestHeightHintCacheConfirms(t *testing.T) { func TestHeightHintCacheSpends(t *testing.T) { t.Parallel() - hintCache := initHintCache(t) + hintCache := initHintCache(t, false) // Querying for an outpoint not found within the cache should return an // error indication so. @@ -146,3 +146,76 @@ func TestHeightHintCacheSpends(t *testing.T) { } } } + +// TestHeightHintCacheDisabled asserts that a disabled height hint cache never +// returns spend or confirm hints that are committed. +func TestHeightHintCacheDisabled(t *testing.T) { + t.Parallel() + + const height uint32 = 100 + + // Create a disabled height hint cache. + hintCache := initHintCache(t, true) + + // Querying a disabled cache w/ no spend hint should return not found. + var outpoint wire.OutPoint + _, err := hintCache.QuerySpendHint(outpoint) + if err != ErrSpendHintNotFound { + t.Fatalf("expected ErrSpendHintNotFound, got: %v", err) + } + + // Commit a spend hint to the disabled cache, which should be a noop. + if err := hintCache.CommitSpendHint(height, outpoint); err != nil { + t.Fatalf("unable to commit spend hint: %v", err) + } + + // Querying a disabled cache after commit noop should return not found. + _, err = hintCache.QuerySpendHint(outpoint) + if err != ErrSpendHintNotFound { + t.Fatalf("expected ErrSpendHintNotFound, got: %v", err) + } + + // Reenable the cache, this time actually committing a spend hint. + hintCache.disabled = false + if err := hintCache.CommitSpendHint(height, outpoint); err != nil { + t.Fatalf("unable to commit spend hint: %v", err) + } + + // Disable the cache again, spend hint should not be found. + hintCache.disabled = true + _, err = hintCache.QuerySpendHint(outpoint) + if err != ErrSpendHintNotFound { + t.Fatalf("expected ErrSpendHintNotFound, got: %v", err) + } + + // Querying a disabled cache w/ no conf hint should return not found. + var txid chainhash.Hash + _, err = hintCache.QueryConfirmHint(txid) + if err != ErrConfirmHintNotFound { + t.Fatalf("expected ErrConfirmHintNotFound, got: %v", err) + } + + // Commit a conf hint to the disabled cache, which should be a noop. + if err := hintCache.CommitConfirmHint(height, txid); err != nil { + t.Fatalf("unable to commit spend hint: %v", err) + } + + // Querying a disabled cache after commit noop should return not found. + _, err = hintCache.QueryConfirmHint(txid) + if err != ErrConfirmHintNotFound { + t.Fatalf("expected ErrConfirmHintNotFound, got: %v", err) + } + + // Reenable the cache, this time actually committing a conf hint. + hintCache.disabled = false + if err := hintCache.CommitConfirmHint(height, txid); err != nil { + t.Fatalf("unable to commit spend hint: %v", err) + } + + // Disable the cache again, conf hint should not be found. + hintCache.disabled = true + _, err = hintCache.QueryConfirmHint(txid) + if err != ErrConfirmHintNotFound { + t.Fatalf("expected ErrConfirmHintNotFound, got: %v", err) + } +} diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index bdd4dca5..384cd7ee 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -1572,7 +1572,7 @@ func TestInterfaces(t *testing.T) { if err != nil { t.Fatalf("unable to create db: %v", err) } - hintCache, err := chainntnfs.NewHeightHintCache(db) + hintCache, err := chainntnfs.NewHeightHintCache(db, true) if err != nil { t.Fatalf("unable to create height hint cache: %v", err) } diff --git a/chainregistry.go b/chainregistry.go index 108cfc55..fcebfbc8 100644 --- a/chainregistry.go +++ b/chainregistry.go @@ -181,8 +181,8 @@ func newChainControlFromConfig(cfg *config, chanDB *channeldb.DB, cleanUp func() ) - // Initialize the height hint cache within the chain directory. - hintCache, err := chainntnfs.NewHeightHintCache(chanDB) + // Initialize disabled height hint cache within the chain directory. + hintCache, err := chainntnfs.NewHeightHintCache(chanDB, true) if err != nil { return nil, nil, fmt.Errorf("unable to initialize height hint "+ "cache: %v", err) diff --git a/lnd_test.go b/lnd_test.go index e8236f24..11b99934 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -2411,15 +2411,22 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf(err.Error()) } - // The htlc funds will still be shown as limbo, since they are still in - // their first stage. The commitment funds will have been recovered - // after the commit txn was included in the last block. + // The commitment funds will have been recovered after the commit txn + // was included in the last block. The htlc funds will not be shown in + // limbo, since they are still in their first stage and the nursery + // hasn't received them from the contract court. forceClose, err := findForceClosedChannel(pendingChanResp, &op) if err != nil { t.Fatalf(err.Error()) } - if forceClose.LimboBalance == 0 { - t.Fatalf("htlc funds should still be in limbo") + err = checkPendingChannelNumHtlcs(forceClose, 0) + if err != nil { + t.Fatalf("expected 0 pending htlcs, found %d", + len(forceClose.PendingHtlcs)) + } + if forceClose.LimboBalance != 0 { + t.Fatalf("expected 0 funds in limbo, found %d", + forceClose.LimboBalance) } // Compute the height preceding that which will cause the htlc CLTV diff --git a/lnwallet/interface_test.go b/lnwallet/interface_test.go index 6bdffc24..d71025a8 100644 --- a/lnwallet/interface_test.go +++ b/lnwallet/interface_test.go @@ -2076,7 +2076,7 @@ func TestLightningWallet(t *testing.T) { if err != nil { t.Fatalf("unable to create db: %v", err) } - hintCache, err := chainntnfs.NewHeightHintCache(db) + hintCache, err := chainntnfs.NewHeightHintCache(db, true) if err != nil { t.Fatalf("unable to create height hint cache: %v", err) } diff --git a/peer.go b/peer.go index c82ea843..4b5daa02 100644 --- a/peer.go +++ b/peer.go @@ -804,7 +804,7 @@ func (ms *msgStream) msgConsumer() { // AddMsg adds a new message to the msgStream. This function is safe for // concurrent access. -func (ms *msgStream) AddMsg(msg lnwire.Message) { +func (ms *msgStream) AddMsg(msg lnwire.Message, quit chan struct{}) { // First, we'll attempt to receive from the producerSema struct. This // acts as a sempahore to prevent us from indefinitely buffering // incoming items from the wire. Either the msg queue isn't full, and @@ -812,6 +812,8 @@ func (ms *msgStream) AddMsg(msg lnwire.Message) { // we're signalled to quit, or a slot is freed up. select { case <-ms.producerSema: + case <-quit: + return case <-ms.quit: return } @@ -1020,7 +1022,7 @@ out: // forward the error to all channels with this peer. case msg.ChanID == lnwire.ConnectionWideID: for chanID, chanStream := range chanMsgStreams { - chanStream.AddMsg(nextMsg) + chanStream.AddMsg(nextMsg, p.quit) // Also marked this channel as failed, // so we won't try to restart it on @@ -1082,7 +1084,7 @@ out: *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd: - discStream.AddMsg(msg) + discStream.AddMsg(msg, p.quit) default: peerLog.Errorf("unknown message %v received from peer "+ @@ -1105,7 +1107,7 @@ out: // With the stream obtained, add the message to the // stream so we can continue processing message. - chanStream.AddMsg(nextMsg) + chanStream.AddMsg(nextMsg, p.quit) } idleTimer.Reset(idleTimeout)