diff --git a/channel_notifier.go b/channel_notifier.go index 36a2224a..960ba0d9 100644 --- a/channel_notifier.go +++ b/channel_notifier.go @@ -7,6 +7,7 @@ import ( "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/chanbackup" + "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channelnotifier" ) @@ -51,6 +52,35 @@ func (c *channelNotifier) SubscribeChans(startingChans map[wire.OutPoint]struct{ quit := make(chan struct{}) chanUpdates := make(chan chanbackup.ChannelEvent, 1) + // sendChanOpenUpdate is a closure that sends a ChannelEvent to the + // chanUpdates channel to inform subscribers about new pending or + // confirmed channels. + sendChanOpenUpdate := func(newOrPendingChan *channeldb.OpenChannel) { + nodeAddrs, err := c.addrs.AddrsForNode( + newOrPendingChan.IdentityPub, + ) + if err != nil { + pub := newOrPendingChan.IdentityPub + ltndLog.Errorf("unable to fetch addrs for %x: %v", + pub.SerializeCompressed(), err) + } + + chanEvent := chanbackup.ChannelEvent{ + NewChans: []chanbackup.ChannelWithAddrs{ + { + OpenChannel: newOrPendingChan, + Addrs: nodeAddrs, + }, + }, + } + + select { + case chanUpdates <- chanEvent: + case <-quit: + return + } + } + // In order to adhere to the interface, we'll proxy the events from the // channel notifier to the sub-swapper in a format it understands. go func() { @@ -74,37 +104,18 @@ func (c *channelNotifier) SubscribeChans(startingChans map[wire.OutPoint]struct{ // TODO(roasbeef): batch dispatch ntnfs switch event := e.(type) { + // A new channel has been opened and is still + // pending. We can still create a backup, even + // if the final channel ID is not yet available. + case channelnotifier.PendingOpenChannelEvent: + pendingChan := event.PendingChannel + sendChanOpenUpdate(pendingChan) - // A new channel has been opened, we'll obtain - // the node address, then send to the + // A new channel has been confirmed, we'll + // obtain the node address, then send to the // sub-swapper. case channelnotifier.OpenChannelEvent: - nodeAddrs, err := c.addrs.AddrsForNode( - event.Channel.IdentityPub, - ) - if err != nil { - pub := event.Channel.IdentityPub - ltndLog.Errorf("unable to "+ - "fetch addrs for %x: %v", - pub.SerializeCompressed(), - err) - } - - channel := event.Channel - chanEvent := chanbackup.ChannelEvent{ - NewChans: []chanbackup.ChannelWithAddrs{ - { - OpenChannel: channel, - Addrs: nodeAddrs, - }, - }, - } - - select { - case chanUpdates <- chanEvent: - case <-quit: - return - } + sendChanOpenUpdate(event.Channel) // An existing channel has been closed, we'll // send only the chanPoint of the closed diff --git a/channelnotifier/channelnotifier.go b/channelnotifier/channelnotifier.go index e484b9a6..e6a1fada 100644 --- a/channelnotifier/channelnotifier.go +++ b/channelnotifier/channelnotifier.go @@ -23,8 +23,14 @@ type ChannelNotifier struct { // PendingOpenChannelEvent represents a new event where a new channel has // entered a pending open state. type PendingOpenChannelEvent struct { - // ChannelPoint is the channelpoint for the new channel. + // ChannelPoint is the channel outpoint for the new channel. ChannelPoint *wire.OutPoint + + // PendingChannel is the channel configuration for the newly created + // channel. This might not have been persisted to the channel DB yet + // because we are still waiting for the final message from the remote + // peer. + PendingChannel *channeldb.OpenChannel } // OpenChannelEvent represents a new event where a channel goes from pending @@ -89,10 +95,18 @@ func (c *ChannelNotifier) SubscribeChannelEvents() (*subscribe.Client, error) { return c.ntfnServer.Subscribe() } -// NotifyPendingOpenChannelEvent notifies the channelEventNotifier goroutine that a -// new channel is pending. -func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint) { - event := PendingOpenChannelEvent{ChannelPoint: &chanPoint} +// NotifyPendingOpenChannelEvent notifies the channelEventNotifier goroutine +// that a new channel is pending. The pending channel is passed as a parameter +// instead of read from the database because it might not yet have been +// persisted to the DB because we still wait for the final message from the +// remote peer. +func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint, + pendingChan *channeldb.OpenChannel) { + + event := PendingOpenChannelEvent{ + ChannelPoint: &chanPoint, + PendingChannel: pendingChan, + } if err := c.ntfnServer.SendUpdate(event); err != nil { log.Warnf("Unable to send pending open channel update: %v", err) diff --git a/fundingmanager.go b/fundingmanager.go index 7913a11f..1eaa1a66 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -366,7 +366,7 @@ type fundingConfig struct { // NotifyPendingOpenChannelEvent informs the ChannelNotifier when channels // enter a pending state. - NotifyPendingOpenChannelEvent func(wire.OutPoint) + NotifyPendingOpenChannelEvent func(wire.OutPoint, *channeldb.OpenChannel) } // fundingManager acts as an orchestrator/bridge between the wallet's @@ -1705,7 +1705,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // Inform the ChannelNotifier that the channel has entered // pending open state. - f.cfg.NotifyPendingOpenChannelEvent(fundingOut) + f.cfg.NotifyPendingOpenChannelEvent(fundingOut, completeChan) // At this point we have sent our last funding message to the // initiating peer before the funding transaction will be broadcast. @@ -1853,7 +1853,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { case resCtx.updates <- upd: // Inform the ChannelNotifier that the channel has entered // pending open state. - f.cfg.NotifyPendingOpenChannelEvent(*fundingPoint) + f.cfg.NotifyPendingOpenChannelEvent(*fundingPoint, completeChan) case <-f.quit: return } diff --git a/fundingmanager_test.go b/fundingmanager_test.go index c9f9a421..f1ff528d 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -25,6 +25,7 @@ import ( "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chanacceptor" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/input" @@ -48,6 +49,10 @@ const ( // testPollSleepMs is the number of milliseconds to sleep between // each attempt to access the database to check its state. testPollSleepMs = 500 + + // maxPending is the maximum number of channels we allow opening to the + // same peer in the max pending channels test. + maxPending = 4 ) var ( @@ -138,6 +143,24 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte, }, nil } +type mockChanEvent struct { + openEvent chan wire.OutPoint + pendingOpenEvent chan channelnotifier.PendingOpenChannelEvent +} + +func (m *mockChanEvent) NotifyOpenChannelEvent(outpoint wire.OutPoint) { + m.openEvent <- outpoint +} + +func (m *mockChanEvent) NotifyPendingOpenChannelEvent(outpoint wire.OutPoint, + pendingChannel *channeldb.OpenChannel) { + + m.pendingOpenEvent <- channelnotifier.PendingOpenChannelEvent{ + ChannelPoint: &outpoint, + PendingChannel: pendingChannel, + } +} + type testNode struct { privKey *btcec.PrivateKey addr *lnwire.NetAddress @@ -147,6 +170,7 @@ type testNode struct { fundingMgr *fundingManager newChannels chan *newChannelMsg mockNotifier *mockNotifier + mockChanEvent *mockChanEvent testDir string shutdownChannel chan struct{} remoteFeatures []lnwire.FeatureBit @@ -274,6 +298,17 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, bestHeight: fundingBroadcastHeight, } + // The mock channel event notifier will receive events for each pending + // open and open channel. Because some tests will create multiple + // channels in a row before advancing to the next step, these channels + // need to be buffered. + evt := &mockChanEvent{ + openEvent: make(chan wire.OutPoint, maxPending), + pendingOpenEvent: make( + chan channelnotifier.PendingOpenChannelEvent, maxPending, + ), + } + dbDir := filepath.Join(tempTestDir, "cdb") cdb, err := channeldb.Open(dbDir) if err != nil { @@ -379,9 +414,9 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, ZombieSweeperInterval: 1 * time.Hour, ReservationTimeout: 1 * time.Nanosecond, MaxPendingChannels: DefaultMaxPendingChannels, - NotifyOpenChannelEvent: func(wire.OutPoint) {}, + NotifyOpenChannelEvent: evt.NotifyOpenChannelEvent, OpenChannelPredicate: chainedAcceptor, - NotifyPendingOpenChannelEvent: func(wire.OutPoint) {}, + NotifyPendingOpenChannelEvent: evt.NotifyPendingOpenChannelEvent, } for _, op := range options { @@ -404,6 +439,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, publTxChan: publTxChan, fundingMgr: f, mockNotifier: chainNotifier, + mockChanEvent: evt, testDir: tempTestDir, shutdownChannel: shutdownChan, addr: addr, @@ -685,6 +721,18 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt, t.Fatalf("alice did not publish funding tx") } + // Make sure the notification about the pending channel was sent out. + select { + case <-alice.mockChanEvent.pendingOpenEvent: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send pending channel event") + } + select { + case <-bob.mockChanEvent.pendingOpenEvent: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send pending channel event") + } + // Finally, make sure neither have active reservation for the channel // now pending open in the database. assertNumPendingReservations(t, alice, bobPubKey, 0) @@ -867,6 +915,18 @@ func assertMarkedOpen(t *testing.T, alice, bob *testNode, fundingOutPoint *wire.OutPoint) { t.Helper() + // Make sure the notification about the pending channel was sent out. + select { + case <-alice.mockChanEvent.openEvent: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send open channel event") + } + select { + case <-bob.mockChanEvent.openEvent: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send open channel event") + } + assertDatabaseState(t, alice, fundingOutPoint, markedOpen) assertDatabaseState(t, bob, fundingOutPoint, markedOpen) } @@ -2558,8 +2618,6 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) { func TestFundingManagerMaxPendingChannels(t *testing.T) { t.Parallel() - const maxPending = 4 - alice, bob := setupFundingManagers( t, func(cfg *fundingConfig) { cfg.MaxPendingChannels = maxPending diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go index 665426a3..98edf8a4 100644 --- a/lntest/itest/lnd_test.go +++ b/lntest/itest/lnd_test.go @@ -13318,6 +13318,15 @@ func testAbandonChannel(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to find channel") } + // To make sure the channel is removed from the backup file as well when + // being abandoned, grab a backup snapshot so we can compare it with the + // later state. + bkupBefore, err := ioutil.ReadFile(net.Alice.ChanBackupPath()) + if err != nil { + t.Fatalf("could not get channel backup before abandoning "+ + "channel: %v", err) + } + // Send request to abandon channel. abandonChannelRequest := &lnrpc.AbandonChannelRequest{ ChannelPoint: chanPoint, @@ -13388,6 +13397,16 @@ func testAbandonChannel(net *lntest.NetworkHarness, t *harnessTest) { "graph!") } + // Make sure the channel is no longer in the channel backup list. + bkupAfter, err := ioutil.ReadFile(net.Alice.ChanBackupPath()) + if err != nil { + t.Fatalf("could not get channel backup before abandoning "+ + "channel: %v", err) + } + if len(bkupAfter) >= len(bkupBefore) { + t.Fatalf("channel wasn't removed from channel backup file") + } + // Calling AbandonChannel again, should result in no new errors, as the // channel has already been removed. ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) @@ -13690,9 +13709,9 @@ func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) { } } - // As these two channels were just open, we should've got two - // notifications for channel backups. - assertBackupNtfns(2) + // As these two channels were just opened, we should've got two times + // the pending and open notifications for channel backups. + assertBackupNtfns(2 * 2) // The on disk file should also exactly match the latest backup that we // have. @@ -14002,6 +14021,21 @@ func testChanRestoreScenario(t *harnessTest, net *lntest.NetworkHarness, t.Fatalf("couldn't open pending channel: %v", err) } + // Give the pubsub some time to update the channel backup. + err = wait.NoError(func() error { + fi, err := os.Stat(dave.ChanBackupPath()) + if err != nil { + return err + } + if fi.Size() <= chanbackup.NilMultiSizePacked { + return fmt.Errorf("backup file empty") + } + return nil + }, defaultTimeout) + if err != nil { + t.Fatalf("channel backup not updated in time: %v", err) + } + default: ctxt, _ = context.WithTimeout(ctxb, channelOpenTimeout) chanPoint := openChannelAndAssert( @@ -14218,7 +14252,7 @@ func testChannelBackupRestore(net *lntest.NetworkHarness, t *harnessTest) { mnemonic []string) (nodeRestorer, error) { // Read the entire Multi backup stored within - // this node's chaannels.backup file. + // this node's channels.backup file. multi, err := ioutil.ReadFile(backupFilePath) if err != nil { return nil, err @@ -14327,7 +14361,7 @@ func testChannelBackupRestore(net *lntest.NetworkHarness, t *harnessTest) { mnemonic []string) (nodeRestorer, error) { // Read the entire Multi backup stored within - // this node's chaannels.backup file. + // this node's channels.backup file. multi, err := ioutil.ReadFile(backupFilePath) if err != nil { return nil, err @@ -14382,10 +14416,48 @@ func testChannelBackupRestore(net *lntest.NetworkHarness, t *harnessTest) { }, }, - // Create a backup from an unconfirmed channel and make sure - // recovery works as well. + // Use the channel backup file that contains an unconfirmed + // channel and make sure recovery works as well. { - name: "restore unconfirmed channel", + name: "restore unconfirmed channel file", + channelsUpdated: false, + initiator: true, + private: false, + unconfirmed: true, + restoreMethod: func(oldNode *lntest.HarnessNode, + backupFilePath string, + mnemonic []string) (nodeRestorer, error) { + + // Read the entire Multi backup stored within + // this node's channels.backup file. + multi, err := ioutil.ReadFile(backupFilePath) + if err != nil { + return nil, err + } + + // Let's assume time passes, the channel + // confirms in the meantime but for some reason + // the backup we made while it was still + // unconfirmed is the only backup we have. We + // should still be able to restore it. To + // simulate time passing, we mine some blocks + // to get the channel confirmed _after_ we saved + // the backup. + mineBlocks(t, net, 6, 1) + + // In our nodeRestorer function, we'll restore + // the node from seed, then manually recover + // the channel backup. + return chanRestoreViaRPC( + net, password, mnemonic, multi, + ) + }, + }, + + // Create a backup using RPC that contains an unconfirmed + // channel and make sure recovery works as well. + { + name: "restore unconfirmed channel RPC", channelsUpdated: false, initiator: true, private: false, diff --git a/rpcserver.go b/rpcserver.go index 024bfb84..f2ea166b 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -2253,6 +2253,10 @@ func (r *rpcServer) AbandonChannel(ctx context.Context, return nil, err } + // Finally, notify the backup listeners that the channel can be removed + // from any channel backups. + r.server.channelNotifier.NotifyClosedChannelEvent(*chanPoint) + return &lnrpc.AbandonChannelResponse{}, nil } @@ -5711,7 +5715,7 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription updateStream lnrpc.Lightning_SubscribeChannelBackupsServer) error { // First, we'll subscribe to the primary channel notifier so we can - // obtain events for new opened/closed channels. + // obtain events for new pending/opened/closed channels. chanSubscription, err := r.server.channelNotifier.SubscribeChannelEvents() if err != nil { return err @@ -5728,9 +5732,10 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription switch e.(type) { // We only care about new/closed channels, so we'll - // skip any events for pending/active/inactive channels. - case channelnotifier.PendingOpenChannelEvent: - continue + // skip any events for active/inactive channels. + // To make the subscription behave the same way as the + // synchronous call and the file based backup, we also + // include pending channels in the update. case channelnotifier.ActiveChannelEvent: continue case channelnotifier.InactiveChannelEvent: