From eadbd6988200f6019dac39fac0d54f2a4ea30ab1 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 5 Aug 2021 20:37:36 +0800 Subject: [PATCH 01/14] routing: increase log level when notifying topology change --- routing/notifications.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/routing/notifications.go b/routing/notifications.go index 0fc8feba..02956c84 100644 --- a/routing/notifications.go +++ b/routing/notifications.go @@ -125,7 +125,7 @@ func (r *ChannelRouter) notifyTopologyChange(topologyDiff *TopologyChange) { return } - log.Tracef("Sending topology notification to %v clients %v", + log.Debugf("Sending topology notification to %v clients %v", numClients, newLogClosure(func() string { return spew.Sdump(topologyDiff) From a1024163fea06369c31a2389cc4c4dda952e2585 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sun, 8 Aug 2021 17:16:49 +0800 Subject: [PATCH 02/14] itest: add more verbose log and print node state --- lntest/itest/assertions.go | 8 --- lntest/itest/lnd_channel_force_close.go | 5 -- lntest/itest/lnd_funding_test.go | 2 +- lntest/itest/lnd_multi-hop_test.go | 2 +- lntest/itest/lnd_test.go | 4 +- lntest/node.go | 74 +++++++++++++++++++++++-- 6 files changed, 73 insertions(+), 22 deletions(-) diff --git a/lntest/itest/assertions.go b/lntest/itest/assertions.go index 40c5bdc6..160d65a9 100644 --- a/lntest/itest/assertions.go +++ b/lntest/itest/assertions.go @@ -26,14 +26,6 @@ import ( "google.golang.org/protobuf/proto" ) -// AddToNodeLog adds a line to the log file and asserts there's no error. -func AddToNodeLog(t *testing.T, - node *lntest.HarnessNode, logLine string) { - - err := node.AddToLog(logLine) - require.NoError(t, err, "unable to add to log") -} - // openChannelStream blocks until an OpenChannel request for a channel funding // by alice succeeds. If it does, a stream client is returned to receive events // about the opening channel. 
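With the `AddToNodeLog` helper removed, tests call the printf-style `HarnessNode.AddToLog` directly and no longer need to check an error. A minimal sketch of the resulting call-site pattern (the subtest name is illustrative):

	// Annotate both default nodes' logs before running a subtest. AddToLog
	// formats its arguments like fmt.Printf and handles write errors
	// internally, so the call site needs no error handling.
	logLine := fmt.Sprintf("---- channel funding subtest %s ----\n", testName)
	net.Alice.AddToLog(logLine)
	net.Bob.AddToLog(logLine)
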
diff --git a/lntest/itest/lnd_channel_force_close.go b/lntest/itest/lnd_channel_force_close.go index 908d4116..461be62c 100644 --- a/lntest/itest/lnd_channel_force_close.go +++ b/lntest/itest/lnd_channel_force_close.go @@ -251,11 +251,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) { for _, channelType := range commitTypes { testName := fmt.Sprintf("committype=%v", channelType) - logLine := fmt.Sprintf( - "---- channel force close subtest %s ----\n", - testName, - ) - AddToNodeLog(t.t, net.Alice, logLine) channelType := channelType success := t.t.Run(testName, func(t *testing.T) { diff --git a/lntest/itest/lnd_funding_test.go b/lntest/itest/lnd_funding_test.go index f9a963b2..17bb01b0 100644 --- a/lntest/itest/lnd_funding_test.go +++ b/lntest/itest/lnd_funding_test.go @@ -139,7 +139,7 @@ test: "---- basic channel funding subtest %s ----\n", testName, ) - AddToNodeLog(t.t, net.Alice, logLine) + net.Alice.AddToLog(logLine) success := t.t.Run(testName, func(t *testing.T) { testFunding(cc, dc) diff --git a/lntest/itest/lnd_multi-hop_test.go b/lntest/itest/lnd_multi-hop_test.go index 0d33e47d..70154366 100644 --- a/lntest/itest/lnd_multi-hop_test.go +++ b/lntest/itest/lnd_multi-hop_test.go @@ -97,7 +97,7 @@ func testMultiHopHtlcClaims(net *lntest.NetworkHarness, t *harnessTest) { "%s/%s ----\n", testName, subTest.name, ) - AddToNodeLog(t, net.Alice, logLine) + net.Alice.AddToLog(logLine) success := ht.t.Run(subTest.name, func(t *testing.T) { ht := newHarnessTest(t, net) diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go index 5b152ddc..01468ee6 100644 --- a/lntest/itest/lnd_test.go +++ b/lntest/itest/lnd_test.go @@ -232,8 +232,8 @@ func TestLightningNetworkDaemon(t *testing.T) { testCase.name, ) - AddToNodeLog(t, lndHarness.Alice, logLine) - AddToNodeLog(t, lndHarness.Bob, logLine) + lndHarness.Alice.AddToLog(logLine) + lndHarness.Bob.AddToLog(logLine) // Start every test with the default static fee estimate. lndHarness.SetFeeEstimate(12500) diff --git a/lntest/node.go b/lntest/node.go index da60349f..1cb493ec 100644 --- a/lntest/node.go +++ b/lntest/node.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/hex" + "encoding/json" "flag" "fmt" "io" @@ -519,6 +520,62 @@ func NewMiner(baseLogDir, logFilename string, netParams *chaincfg.Params, return miner, cleanUp, nil } +// String gives the internal state of the node which is useful for debugging. 
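+// The state is rendered as indented JSON, for example (values shown here are
+// illustrative only):
+//
+//	node state: {
+//		"NodeID": 0,
+//		"Name": "Alice",
+//		"PubKey": "03...",
+//		"OpenChans": {},
+//		"ClosedChans": {},
+//		"NodeCfg": { ... }
+//	}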
+func (hn *HarnessNode) String() string {
+	type nodeCfg struct {
+		LogFilenamePrefix string
+		ExtraArgs         []string
+		HasSeed           bool
+		P2PPort           int
+		RPCPort           int
+		RESTPort          int
+		ProfilePort       int
+		AcceptKeySend     bool
+		AcceptAMP         bool
+		FeeURL            string
+	}
+
+	nodeState := struct {
+		NodeID      int
+		Name        string
+		PubKey      string
+		OpenChans   map[string]int
+		ClosedChans map[string]struct{}
+		NodeCfg     nodeCfg
+	}{
+		NodeID:      hn.NodeID,
+		Name:        hn.Cfg.Name,
+		PubKey:      hn.PubKeyStr,
+		OpenChans:   make(map[string]int),
+		ClosedChans: make(map[string]struct{}),
+		NodeCfg: nodeCfg{
+			LogFilenamePrefix: hn.Cfg.LogFilenamePrefix,
+			ExtraArgs:         hn.Cfg.ExtraArgs,
+			HasSeed:           hn.Cfg.HasSeed,
+			P2PPort:           hn.Cfg.P2PPort,
+			RPCPort:           hn.Cfg.RPCPort,
+			RESTPort:          hn.Cfg.RESTPort,
+			ProfilePort:       hn.Cfg.ProfilePort,
+			AcceptKeySend:     hn.Cfg.AcceptKeySend,
+			AcceptAMP:         hn.Cfg.AcceptAMP,
+			FeeURL:            hn.Cfg.FeeURL,
+		},
+	}
+
+	for outpoint, count := range hn.openChans {
+		nodeState.OpenChans[outpoint.String()] = count
+	}
+	for outpoint := range hn.closedChans {
+		nodeState.ClosedChans[outpoint.String()] = struct{}{}
+	}
+
+	stateBytes, err := json.MarshalIndent(nodeState, "", "\t")
+	if err != nil {
+		return fmt.Sprintf("\nfailed to encode node state: %v", err)
+	}
+
+	return fmt.Sprintf("\nnode state: %s", stateBytes)
+}
+
 // DBPath returns the filepath to the channeldb database file for this node.
 func (hn *HarnessNode) DBPath() string {
 	return hn.Cfg.DBPath()
@@ -1066,15 +1123,16 @@ func (hn *HarnessNode) FetchNodeInfo() error {
 
 // AddToLog adds a line of choice to the node's logfile. This is useful
 // to interleave test output with output from the node.
-func (hn *HarnessNode) AddToLog(line string) error {
+func (hn *HarnessNode) AddToLog(format string, a ...interface{}) {
 	// If this node was not set up with a log file, just return early.
 	if hn.logFile == nil {
-		return nil
+		return
 	}
-	if _, err := hn.logFile.WriteString(line); err != nil {
-		return err
+
+	desc := fmt.Sprintf("itest: %s\n", fmt.Sprintf(format, a...))
+	if _, err := hn.logFile.WriteString(desc); err != nil {
+		hn.PrintErr("write to log err: %v", err)
 	}
-	return nil
 }
 
 // writePidFile writes the process ID of the running lnd process to a .pid file.
@@ -1616,3 +1674,9 @@ func (hn *HarnessNode) WaitForBalance(expectedBalance btcutil.Amount, confirmed
 
 	return nil
 }
+
+// PrintErr prints an error to the console. 
+func (hn *HarnessNode) PrintErr(format string, a ...interface{}) { + fmt.Printf("itest error from [node:%s]: %s\n", + hn.Cfg.Name, fmt.Sprintf(format, a...)) +} From 0701834a5d928694776cc01067eaf41e5efa4900 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sun, 8 Aug 2021 17:19:25 +0800 Subject: [PATCH 03/14] lntest: refactor handle update open channel --- lntest/itest/lnd_channel_backup_test.go | 15 +-- lntest/node.go | 160 ++++++++++++++---------- 2 files changed, 97 insertions(+), 78 deletions(-) diff --git a/lntest/itest/lnd_channel_backup_test.go b/lntest/itest/lnd_channel_backup_test.go index ec61dd38..a2acf74f 100644 --- a/lntest/itest/lnd_channel_backup_test.go +++ b/lntest/itest/lnd_channel_backup_test.go @@ -1346,14 +1346,11 @@ func copyPorts(oldNode *lntest.HarnessNode) lntest.NodeOption { } } -func rpcPointToWirePoint(t *harnessTest, chanPoint *lnrpc.ChannelPoint) wire.OutPoint { - txid, err := lnrpc.GetChanPointFundingTxid(chanPoint) - if err != nil { - t.Fatalf("unable to get txid: %v", err) - } +func rpcPointToWirePoint(t *harnessTest, + chanPoint *lnrpc.ChannelPoint) wire.OutPoint { - return wire.OutPoint{ - Hash: *txid, - Index: chanPoint.OutputIndex, - } + op, err := lntest.MakeOutpoint(chanPoint) + require.NoError(t.t, err, "unable to get txid") + + return op } diff --git a/lntest/node.go b/lntest/node.go index 1cb493ec..ce58752b 100644 --- a/lntest/node.go +++ b/lntest/node.go @@ -370,8 +370,8 @@ type HarnessNode struct { // edges seen for that channel within the network. When this number // reaches 2, then it means that both edge advertisements has propagated // through the network. - openChans map[wire.OutPoint]int - openClients map[wire.OutPoint][]chan struct{} + openChans map[wire.OutPoint]int + openChanWatchers map[wire.OutPoint][]chan struct{} closedChans map[wire.OutPoint]struct{} closeClients map[wire.OutPoint][]chan struct{} @@ -447,7 +447,7 @@ func newNode(cfg NodeConfig) (*HarnessNode, error) { NodeID: nodeNum, chanWatchRequests: make(chan *chanWatchRequest), openChans: make(map[wire.OutPoint]int), - openClients: make(map[wire.OutPoint][]chan struct{}), + openChanWatchers: make(map[wire.OutPoint][]chan struct{}), closedChans: make(map[wire.OutPoint]struct{}), closeClients: make(map[wire.OutPoint][]chan struct{}), @@ -1438,31 +1438,7 @@ func (hn *HarnessNode) lightningNetworkWatcher(subscribed chan error) { // the current set of registered clients to see if we can // dispatch any requests. case graphUpdate := <-graphUpdates: - // For each new channel, we'll increment the number of - // edges seen by one. - for _, newChan := range graphUpdate.ChannelUpdates { - txidHash, _ := getChanPointFundingTxid(newChan.ChanPoint) - txid, _ := chainhash.NewHash(txidHash) - op := wire.OutPoint{ - Hash: *txid, - Index: newChan.ChanPoint.OutputIndex, - } - hn.openChans[op]++ - - // For this new channel, if the number of edges - // seen is less than two, then the channel - // hasn't been fully announced yet. - if numEdges := hn.openChans[op]; numEdges < 2 { - continue - } - - // Otherwise, we'll notify all the registered - // clients and remove the dispatched clients. 
- for _, eventChan := range hn.openClients[op] { - close(eventChan) - } - delete(hn.openClients, op) - } + hn.handleChannelEdgeUpdates(graphUpdate.ChannelUpdates) // For each channel closed, we'll mark that we've // detected a channel closure while lnd was pruning the @@ -1493,35 +1469,7 @@ func (hn *HarnessNode) lightningNetworkWatcher(subscribed chan error) { // TODO(roasbeef): add update type also, checks for // multiple of 2 if watchRequest.chanOpen { - // If this is an open request, then it can be - // dispatched if the number of edges seen for - // the channel is at least two. - if numEdges := hn.openChans[targetChan]; numEdges >= 2 { - close(watchRequest.eventChan) - continue - } - - // Before we add the channel to our set of open - // clients, we'll check to see if the channel - // is already in the channel graph of the - // target node. This lets us handle the case - // where a node has already seen a channel - // before a notification has been requested, - // causing us to miss it. - chanFound := checkChanPointInGraph( - context.Background(), hn, targetChan, - ) - if chanFound { - close(watchRequest.eventChan) - continue - } - - // Otherwise, we'll add this to the list of - // watch open clients for this out point. - hn.openClients[targetChan] = append( - hn.openClients[targetChan], - watchRequest.eventChan, - ) + hn.handleOpenChannelWatchRequest(watchRequest) continue } @@ -1551,24 +1499,18 @@ func (hn *HarnessNode) lightningNetworkWatcher(subscribed chan error) { // considered "fully advertised" once both of its directional edges has been // advertised within the test Lightning Network. func (hn *HarnessNode) WaitForNetworkChannelOpen(ctx context.Context, - op *lnrpc.ChannelPoint) error { + chanPoint *lnrpc.ChannelPoint) error { eventChan := make(chan struct{}) - txidHash, err := getChanPointFundingTxid(op) + op, err := MakeOutpoint(chanPoint) if err != nil { - return err - } - txid, err := chainhash.NewHash(txidHash) - if err != nil { - return err + return fmt.Errorf("failed to create outpoint for %v "+ + "got err: %v", chanPoint, err) } hn.chanWatchRequests <- &chanWatchRequest{ - chanPoint: wire.OutPoint{ - Hash: *txid, - Index: op.OutputIndex, - }, + chanPoint: op, eventChan: eventChan, chanOpen: true, } @@ -1577,7 +1519,8 @@ func (hn *HarnessNode) WaitForNetworkChannelOpen(ctx context.Context, case <-eventChan: return nil case <-ctx.Done(): - return fmt.Errorf("channel not opened before timeout") + return fmt.Errorf("channel:%s not opened before timeout: %s", + op, hn) } } @@ -1680,3 +1623,82 @@ func (hn *HarnessNode) PrintErr(format string, a ...interface{}) { fmt.Printf("itest error from [node:%s]: %s\n", hn.Cfg.Name, fmt.Sprintf(format, a...)) } + +// MakeOutpoint returns the outpoint of the channel's funding transaction. +func MakeOutpoint(chanPoint *lnrpc.ChannelPoint) (wire.OutPoint, error) { + fundingTxID, err := lnrpc.GetChanPointFundingTxid(chanPoint) + if err != nil { + return wire.OutPoint{}, err + } + + return wire.OutPoint{ + Hash: *fundingTxID, + Index: chanPoint.OutputIndex, + }, nil +} + +// handleChannelEdgeUpdates takes a series of channel edge updates, extracts +// the outpoints, and saves them to harness node's internal state. +func (hn *HarnessNode) handleChannelEdgeUpdates( + updates []*lnrpc.ChannelEdgeUpdate) { + + // For each new channel, we'll increment the number of + // edges seen by one. 
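+	// Note that every channel announcement carries two directed edges
+	// (one per direction), so an outpoint only counts as fully announced
+	// once both edge updates have been seen, hence the threshold of two
+	// checked below.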
+	for _, newChan := range updates {
+		op, err := MakeOutpoint(newChan.ChanPoint)
+		if err != nil {
+			hn.PrintErr("failed to create outpoint for %v "+
+				"got err: %v", newChan.ChanPoint, err)
+			continue
+		}
+		hn.openChans[op]++
+
+		// For this new channel, if the number of edges seen is less
+		// than two, then the channel hasn't been fully announced yet.
+		if numEdges := hn.openChans[op]; numEdges < 2 {
+			continue
+		}
+
+		// Otherwise, we'll notify all the registered watchers and
+		// remove the dispatched watchers.
+		for _, eventChan := range hn.openChanWatchers[op] {
+			close(eventChan)
+		}
+		delete(hn.openChanWatchers, op)
+	}
+}
+
+// handleOpenChannelWatchRequest processes a watch open channel request by
+// checking the number of edges seen for a given channel point. If the
+// number is at least two, then the channel is considered open. Otherwise, we
+// will attempt to find it in its channel graph. If neither can be found, the
+// request is added to a watch request list that will be handled by
+// handleChannelEdgeUpdates.
+func (hn *HarnessNode) handleOpenChannelWatchRequest(req *chanWatchRequest) {
+	targetChan := req.chanPoint
+
+	// If this is an open request, then it can be dispatched if the number
+	// of edges seen for the channel is at least two.
+	if numEdges := hn.openChans[targetChan]; numEdges >= 2 {
+		close(req.eventChan)
+		return
+	}
+
+	// Before we add the channel to our set of open clients, we'll check to
+	// see if the channel is already in the channel graph of the target
+	// node. This lets us handle the case where a node has already seen a
+	// channel before a notification has been requested, causing us to miss
+	// it.
+	chanFound := checkChanPointInGraph(context.Background(), hn, targetChan)
+	if chanFound {
+		close(req.eventChan)
+		return
+	}
+
+	// Otherwise, we'll add this to the list of open channel watchers for
+	// this out point.
+	hn.openChanWatchers[targetChan] = append(
+		hn.openChanWatchers[targetChan],
+		req.eventChan,
+	)
+}
From 92cd6657c541c3b106dde404cabcfa04eb854b77 Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Sun, 8 Aug 2021 17:20:04 +0800
Subject: [PATCH 04/14] lntest: refactor handle close channel update

---
 lntest/node.go | 115 +++++++++++++++++++++++++++----------------------
 1 file changed, 63 insertions(+), 52 deletions(-)

diff --git a/lntest/node.go b/lntest/node.go
index ce58752b..4cd42856 100644
--- a/lntest/node.go
+++ b/lntest/node.go
@@ -373,8 +373,8 @@ type HarnessNode struct {
 	openChans        map[wire.OutPoint]int
 	openChanWatchers map[wire.OutPoint][]chan struct{}
 
-	closedChans  map[wire.OutPoint]struct{}
-	closeClients map[wire.OutPoint][]chan struct{}
+	closedChans       map[wire.OutPoint]struct{}
+	closeChanWatchers map[wire.OutPoint][]chan struct{}
 
 	quit chan struct{}
 	wg   sync.WaitGroup
@@ -449,8 +449,8 @@ func newNode(cfg NodeConfig) (*HarnessNode, error) {
 		openChans:        make(map[wire.OutPoint]int),
 		openChanWatchers: make(map[wire.OutPoint][]chan struct{}),
 
-		closedChans:  make(map[wire.OutPoint]struct{}),
-		closeClients: make(map[wire.OutPoint][]chan struct{}),
+		closedChans:       make(map[wire.OutPoint]struct{}),
+		closeChanWatchers: make(map[wire.OutPoint][]chan struct{}),
 	}, nil
 }
 
@@ -1439,33 +1439,13 @@ func (hn *HarnessNode) lightningNetworkWatcher(subscribed chan error) {
 		// the current set of registered clients to see if we can
 		// dispatch any requests.
 		case graphUpdate := <-graphUpdates:
 			hn.handleChannelEdgeUpdates(graphUpdate.ChannelUpdates)
-
-			// For each channel closed, we'll mark that we've
-			// detected a channel closure while lnd was pruning the
-			// channel graph.
-			for _, closedChan := range graphUpdate.ClosedChans {
-				txidHash, _ := getChanPointFundingTxid(closedChan.ChanPoint)
-				txid, _ := chainhash.NewHash(txidHash)
-				op := wire.OutPoint{
-					Hash:  *txid,
-					Index: closedChan.ChanPoint.OutputIndex,
-				}
-				hn.closedChans[op] = struct{}{}
-
-				// As the channel has been closed, we'll notify
-				// all register clients.
-				for _, eventChan := range hn.closeClients[op] {
-					close(eventChan)
-				}
-				delete(hn.closeClients, op)
-			}
+			hn.handleClosedChannelUpdate(graphUpdate.ClosedChans)
+			// TODO(yy): handle node updates too
 
 		// A new watch request, has just arrived. We'll either be able
 		// to dispatch immediately, or need to add the client for
 		// processing later.
 		case watchRequest := <-hn.chanWatchRequests:
-			targetChan := watchRequest.chanPoint
-
 			// TODO(roasbeef): add update type also, checks for
 			// multiple of 2
 			if watchRequest.chanOpen {
@@ -1473,20 +1453,7 @@ func (hn *HarnessNode) lightningNetworkWatcher(subscribed chan error) {
 				continue
 			}
 
-			// If this is a close request, then it can be
-			// immediately dispatched if we've already seen a
-			// channel closure for this channel.
-			if _, ok := hn.closedChans[targetChan]; ok {
-				close(watchRequest.eventChan)
-				continue
-			}
-
-			// Otherwise, we'll add this to the list of close watch
-			// clients for this out point.
-			hn.closeClients[targetChan] = append(
-				hn.closeClients[targetChan],
-				watchRequest.eventChan,
-			)
+			hn.handleCloseChannelWatchRequest(watchRequest)
 
 		case <-hn.quit:
 			return
@@ -1529,24 +1496,18 @@ func (hn *HarnessNode) WaitForNetworkChannelOpen(ctx context.Context,
 // closed once a transaction spending the funding outpoint is seen within a
 // confirmed block.
 func (hn *HarnessNode) WaitForNetworkChannelClose(ctx context.Context,
-	op *lnrpc.ChannelPoint) error {
+	chanPoint *lnrpc.ChannelPoint) error {
 
 	eventChan := make(chan struct{})
 
-	txidHash, err := getChanPointFundingTxid(op)
+	op, err := MakeOutpoint(chanPoint)
 	if err != nil {
-		return err
-	}
-	txid, err := chainhash.NewHash(txidHash)
-	if err != nil {
-		return err
+		return fmt.Errorf("failed to create outpoint for %v "+
+			"got err: %v", chanPoint, err)
 	}
 
 	hn.chanWatchRequests <- &chanWatchRequest{
-		chanPoint: wire.OutPoint{
-			Hash:  *txid,
-			Index: op.OutputIndex,
-		},
+		chanPoint: op,
 		eventChan: eventChan,
 		chanOpen:  false,
 	}
@@ -1555,7 +1516,8 @@ func (hn *HarnessNode) WaitForNetworkChannelClose(ctx context.Context,
 	case <-eventChan:
 		return nil
 	case <-ctx.Done():
-		return fmt.Errorf("channel not closed before timeout")
+		return fmt.Errorf("channel:%s not closed before timeout: "+
+			"%s", op, hn)
 	}
 }
 
@@ -1702,3 +1664,52 @@ func (hn *HarnessNode) handleOpenChannelWatchRequest(req *chanWatchRequest) {
 		req.eventChan,
 	)
 }
+
+// handleClosedChannelUpdate takes a series of closed channel updates, extracts
+// the outpoints, saves them to harness node's internal state, and notifies all
+// registered clients.
+func (hn *HarnessNode) handleClosedChannelUpdate(
+	updates []*lnrpc.ClosedChannelUpdate) {
+
+	// For each channel closed, we'll mark that we've detected a channel
+	// closure while lnd was pruning the channel graph.
+	for _, closedChan := range updates {
+		op, err := MakeOutpoint(closedChan.ChanPoint)
+		if err != nil {
+			hn.PrintErr("failed to create outpoint for %v "+
+				"got err: %v", closedChan.ChanPoint, err)
+			continue
+		}
+
+		hn.closedChans[op] = struct{}{}
+
+		// As the channel has been closed, we'll notify all registered
+		// watchers.
+		for _, eventChan := range hn.closeChanWatchers[op] {
+			close(eventChan)
+		}
+		delete(hn.closeChanWatchers, op)
+	}
+}
+
+// handleCloseChannelWatchRequest processes a watch close channel request by
+// checking whether the given channel point can be found in the node's internal
+// state. If not, the request is added to a watch request list that will be
+// handled by handleClosedChannelUpdate.
+func (hn *HarnessNode) handleCloseChannelWatchRequest(req *chanWatchRequest) {
+	targetChan := req.chanPoint
+
+	// If this is a close request, then it can be immediately dispatched if
+	// we've already seen a channel closure for this channel.
+	if _, ok := hn.closedChans[targetChan]; ok {
+		close(req.eventChan)
+		return
+	}
+
+	// Otherwise, we'll add this to the list of close channel watchers for
+	// this out point.
+	hn.closeChanWatchers[targetChan] = append(
+		hn.closeChanWatchers[targetChan],
+		req.eventChan,
+	)
+}
From a58543d1c7d6e7d6b7957fe07e69878520a4da8a Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Sun, 8 Aug 2021 16:57:24 +0800
Subject: [PATCH 05/14] itest: remove extra graph topology subscription

---
 lntest/node.go      | 162 ++++++++++++++++++++++++++------------------
 lntest/wait/wait.go |   7 +-
 2 files changed, 99 insertions(+), 70 deletions(-)

diff --git a/lntest/node.go b/lntest/node.go
index 4cd42856..1cdd2f0b 100644
--- a/lntest/node.go
+++ b/lntest/node.go
@@ -1055,50 +1055,12 @@ func (hn *HarnessNode) initLightningClient(conn *grpc.ClientConn) error {
 		return err
 	}
 
-	// Due to a race condition between the ChannelRouter starting and us
-	// making the subscription request, it's possible for our graph
-	// subscription to fail. To ensure we don't start listening for updates
-	// until then, we'll create a dummy subscription to ensure we can do so
-	// successfully before proceeding. We use a dummy subscription in order
-	// to not consume an update from the real one.
-	err := wait.NoError(func() error {
-		req := &lnrpc.GraphTopologySubscription{}
-		ctx, cancelFunc := context.WithCancel(context.Background())
-		topologyClient, err := hn.SubscribeChannelGraph(ctx, req)
-		if err != nil {
-			return err
-		}
-
-		// We'll wait to receive an error back within a one second
-		// timeout. This is needed since creating the client's stream is
-		// independent of the graph subscription being created. The
-		// stream is closed from the server's side upon an error.
-		errChan := make(chan error, 1)
-		go func() {
-			if _, err := topologyClient.Recv(); err != nil {
-				errChan <- err
-			}
-		}()
-
-		select {
-		case err = <-errChan:
-		case <-time.After(time.Second):
-		}
-
-		cancelFunc()
-		return err
-	}, DefaultTimeout)
-	if err != nil {
-		return err
-	}
-
 	// Launch the watcher that will hook into graph related topology change
 	// from the PoV of this node.
 	hn.wg.Add(1)
-	subscribed := make(chan error)
-	go hn.lightningNetworkWatcher(subscribed)
+	go hn.lightningNetworkWatcher()
 
-	return <-subscribed
+	return nil
 }
 
 // FetchNodeInfo queries an unlocked node to retrieve its public key.
@@ -1394,40 +1356,19 @@ func checkChanPointInGraph(ctx context.Context,
 // closed or opened within the network. In order to dispatch these
 // notifications, the GraphTopologySubscription client exposed as part of the
 // gRPC interface is used.
-func (hn *HarnessNode) lightningNetworkWatcher(subscribed chan error) {
+func (hn *HarnessNode) lightningNetworkWatcher() {
 	defer hn.wg.Done()
 
 	graphUpdates := make(chan *lnrpc.GraphTopologyUpdate)
+
+	// Start a goroutine to receive graph updates. 
 	hn.wg.Add(1)
 	go func() {
 		defer hn.wg.Done()
-
-		req := &lnrpc.GraphTopologySubscription{}
-		ctx, cancelFunc := context.WithCancel(context.Background())
-		defer cancelFunc()
-		topologyClient, err := hn.SubscribeChannelGraph(ctx, req)
+		err := hn.receiveTopologyClientStream(graphUpdates)
 		if err != nil {
-			msg := fmt.Sprintf("%s(%d): unable to create topology "+
-				"client: %v (%s)", hn.Name(), hn.NodeID, err,
-				time.Now().String())
-			subscribed <- fmt.Errorf(msg)
-			return
-		}
-		close(subscribed)
-
-		for {
-			update, err := topologyClient.Recv()
-			if err == io.EOF {
-				return
-			} else if err != nil {
-				return
-			}
-
-			select {
-			case graphUpdates <- update:
-			case <-hn.quit:
-				return
-			}
+			hn.PrintErr("receive topology client stream "+
+				"got err: %v", err)
 		}
 	}()
 
 	for {
 		select {
@@ -1702,3 +1664,90 @@ func (hn *HarnessNode) handleCloseChannelWatchRequest(req *chanWatchRequest) {
 		req.eventChan,
 	)
 }
+
+type topologyClient lnrpc.Lightning_SubscribeChannelGraphClient
+
+// newTopologyClient creates a topology client.
+func (hn *HarnessNode) newTopologyClient(
+	ctx context.Context) (topologyClient, error) {
+
+	req := &lnrpc.GraphTopologySubscription{}
+	client, err := hn.SubscribeChannelGraph(ctx, req)
+	if err != nil {
+		return nil, fmt.Errorf("%s(%d): unable to create topology "+
+			"client: %v (%s)", hn.Name(), hn.NodeID, err,
+			time.Now().String())
+	}
+
+	return client, nil
+}
+
+// receiveTopologyClientStream initializes a topologyClient to subscribe to
+// topology update events. Due to a race condition between the ChannelRouter
+// starting and us making the subscription request, it's possible for our graph
+// subscription to fail. In that case, we will retry the subscription until it
+// succeeds or fails once DefaultTimeout has elapsed.
+//
+// NOTE: must be run as a goroutine.
+func (hn *HarnessNode) receiveTopologyClientStream(
+	receiver chan *lnrpc.GraphTopologyUpdate) error {
+
+	ctxb := context.Background()
+
+	// Create a topology client to receive graph updates.
+	client, err := hn.newTopologyClient(ctxb)
+	if err != nil {
+		return fmt.Errorf("create topologyClient failed: %v", err)
+	}
+
+	// We use the context to time out when retrying graph subscription.
+	ctxt, cancel := context.WithTimeout(ctxb, DefaultTimeout)
+	defer cancel()
+
+	for {
+		update, err := client.Recv()
+
+		switch {
+		case err == nil:
+			// Good case. We will send the update to the receiver.
+
+		case strings.Contains(err.Error(), "router not started"):
+			// If the router hasn't been started, we will retry
+			// every wait.PollInterval until it has been started or
+			// fail once ctxt times out.
+			select {
+			case <-ctxt.Done():
+				return fmt.Errorf("graph subscription: " +
+					"router not started before timeout")
+			case <-time.After(wait.PollInterval):
+			case <-hn.quit:
+				return nil
+			}
+
+			// Re-create the topology client.
+			client, err = hn.newTopologyClient(ctxb)
+			if err != nil {
+				return fmt.Errorf("create topologyClient "+
+					"failed: %v", err)
+			}
+
+			continue
+
+		case strings.Contains(err.Error(), "EOF"):
+			// End of subscription stream. Do nothing and quit.
+			return nil
+
+		default:
+			// An unexpected error occurred; return it to be
+			// handled by the caller.
+			return fmt.Errorf("graph subscription err: %v", err)
+		}
+
+		// Send the update or quit. 
+ select { + case receiver <- update: + case <-hn.quit: + return nil + } + } +} diff --git a/lntest/wait/wait.go b/lntest/wait/wait.go index a2981816..4bd8c7a2 100644 --- a/lntest/wait/wait.go +++ b/lntest/wait/wait.go @@ -5,17 +5,18 @@ import ( "time" ) +// PollInterval is a constant specifying a 200 ms interval. +const PollInterval = 200 * time.Millisecond + // Predicate is a helper test function that will wait for a timeout period of // time until the passed predicate returns true. This function is helpful as // timing doesn't always line up well when running integration tests with // several running lnd nodes. This function gives callers a way to assert that // some property is upheld within a particular time frame. func Predicate(pred func() bool, timeout time.Duration) error { - const pollInterval = 200 * time.Millisecond - exitTimer := time.After(timeout) for { - <-time.After(pollInterval) + <-time.After(PollInterval) select { case <-exitTimer: From d2277ac915eb4c6fc90da8a547e5c930bae2ddff Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sun, 8 Aug 2021 17:01:56 +0800 Subject: [PATCH 06/14] itest: replace chanOpen bool with chanWatchType --- lntest/node.go | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/lntest/node.go b/lntest/node.go index 1cdd2f0b..d4207ed4 100644 --- a/lntest/node.go +++ b/lntest/node.go @@ -1296,13 +1296,25 @@ func (hn *HarnessNode) kill() error { return hn.cmd.Process.Kill() } +type chanWatchType uint8 + +const ( + // watchOpenChannel specifies that this is a request to watch an open + // channel event. + watchOpenChannel chanWatchType = iota + + // watchCloseChannel specifies that this is a request to watch a close + // channel event. + watchCloseChannel +) + // closeChanWatchRequest is a request to the lightningNetworkWatcher to be // notified once it's detected within the test Lightning Network, that a // channel has either been added or closed. type chanWatchRequest struct { chanPoint wire.OutPoint - chanOpen bool + chanWatchType chanWatchType eventChan chan struct{} } @@ -1387,14 +1399,15 @@ func (hn *HarnessNode) lightningNetworkWatcher() { // to dispatch immediately, or need to add the client for // processing later. 
case watchRequest := <-hn.chanWatchRequests: - // TODO(roasbeef): add update type also, checks for - // multiple of 2 - if watchRequest.chanOpen { + switch watchRequest.chanWatchType { + case watchOpenChannel: + // TODO(roasbeef): add update type also, checks + // for multiple of 2 hn.handleOpenChannelWatchRequest(watchRequest) - continue - } - hn.handleCloseChannelWatchRequest(watchRequest) + case watchCloseChannel: + hn.handleCloseChannelWatchRequest(watchRequest) + } case <-hn.quit: return @@ -1418,9 +1431,9 @@ func (hn *HarnessNode) WaitForNetworkChannelOpen(ctx context.Context, } hn.chanWatchRequests <- &chanWatchRequest{ - chanPoint: op, - eventChan: eventChan, - chanOpen: true, + chanPoint: op, + eventChan: eventChan, + chanWatchType: watchOpenChannel, } select { @@ -1448,9 +1461,9 @@ func (hn *HarnessNode) WaitForNetworkChannelClose(ctx context.Context, } hn.chanWatchRequests <- &chanWatchRequest{ - chanPoint: op, - eventChan: eventChan, - chanOpen: false, + chanPoint: op, + eventChan: eventChan, + chanWatchType: watchCloseChannel, } select { From 87c13d31b4c9a38fc8d6db92f7e7f41a44bb38e0 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sun, 8 Aug 2021 17:08:37 +0800 Subject: [PATCH 07/14] itest: watch channel policy updates in harness node --- lntest/itest/assertions.go | 35 +------ lntest/node.go | 199 +++++++++++++++++++++++++++++++++++++ 2 files changed, 201 insertions(+), 33 deletions(-) diff --git a/lntest/itest/assertions.go b/lntest/itest/assertions.go index 160d65a9..fe881f1d 100644 --- a/lntest/itest/assertions.go +++ b/lntest/itest/assertions.go @@ -686,7 +686,7 @@ out: continue } - err := checkChannelPolicy( + err := lntest.CheckChannelPolicy( update.RoutingPolicy, exp.expectedPolicy, ) @@ -810,44 +810,13 @@ func assertChannelPolicy(t *harnessTest, node *lntest.HarnessNode, policies := getChannelPolicies(t, node, advertisingNode, chanPoints...) for _, policy := range policies { - err := checkChannelPolicy(policy, expectedPolicy) + err := lntest.CheckChannelPolicy(policy, expectedPolicy) if err != nil { t.Fatalf(err.Error()) } } } -// checkChannelPolicy checks that the policy matches the expected one. -func checkChannelPolicy(policy, expectedPolicy *lnrpc.RoutingPolicy) error { - if policy.FeeBaseMsat != expectedPolicy.FeeBaseMsat { - return fmt.Errorf("expected base fee %v, got %v", - expectedPolicy.FeeBaseMsat, policy.FeeBaseMsat) - } - if policy.FeeRateMilliMsat != expectedPolicy.FeeRateMilliMsat { - return fmt.Errorf("expected fee rate %v, got %v", - expectedPolicy.FeeRateMilliMsat, - policy.FeeRateMilliMsat) - } - if policy.TimeLockDelta != expectedPolicy.TimeLockDelta { - return fmt.Errorf("expected time lock delta %v, got %v", - expectedPolicy.TimeLockDelta, - policy.TimeLockDelta) - } - if policy.MinHtlc != expectedPolicy.MinHtlc { - return fmt.Errorf("expected min htlc %v, got %v", - expectedPolicy.MinHtlc, policy.MinHtlc) - } - if policy.MaxHtlcMsat != expectedPolicy.MaxHtlcMsat { - return fmt.Errorf("expected max htlc %v, got %v", - expectedPolicy.MaxHtlcMsat, policy.MaxHtlcMsat) - } - if policy.Disabled != expectedPolicy.Disabled { - return errors.New("edge should be disabled but isn't") - } - - return nil -} - // assertMinerBlockHeightDelta ensures that tempMiner is 'delta' blocks ahead // of miner. 
func assertMinerBlockHeightDelta(t *harnessTest,
diff --git a/lntest/node.go b/lntest/node.go
index d4207ed4..71d2044e 100644
--- a/lntest/node.go
+++ b/lntest/node.go
@@ -341,6 +341,21 @@ func (cfg NodeConfig) genArgs() []string {
 	return args
 }
 
+// policyUpdateMap defines a type to store channel policy updates. It has the
+// format,
+// {
+//  "chanPoint1": {
+//       "advertisingNode1": [
+//              policy1, policy2, ...
+//       ],
+//       "advertisingNode2": [
+//              policy1, policy2, ...
+//       ]
+//  },
+//  "chanPoint2": ...
+// }
+type policyUpdateMap map[string]map[string][]*lnrpc.RoutingPolicy
+
 // HarnessNode represents an instance of lnd running within our test network
 // harness. Each HarnessNode instance also fully embeds an RPC client in
 // order to pragmatically drive the node.
@@ -376,6 +391,10 @@ type HarnessNode struct {
 	closedChans       map[wire.OutPoint]struct{}
 	closeChanWatchers map[wire.OutPoint][]chan struct{}
 
+	// policyUpdates stores the policies seen so far, keyed by channel
+	// outpoint and advertising node.
+	policyUpdates policyUpdateMap
+
 	quit chan struct{}
 	wg   sync.WaitGroup
@@ -451,6 +470,8 @@ func newNode(cfg NodeConfig) (*HarnessNode, error) {
 
 		closedChans:       make(map[wire.OutPoint]struct{}),
 		closeChanWatchers: make(map[wire.OutPoint][]chan struct{}),
+
+		policyUpdates: policyUpdateMap{},
 	}, nil
 }
@@ -1306,6 +1327,10 @@ const (
 	// watchCloseChannel specifies that this is a request to watch a close
 	// channel event.
 	watchCloseChannel
+
+	// watchPolicyUpdate specifies that this is a request to watch a policy
+	// update event.
+	watchPolicyUpdate
 )
 
 // closeChanWatchRequest is a request to the lightningNetworkWatcher to be
@@ -1317,6 +1342,10 @@ type chanWatchRequest struct {
 	chanWatchType chanWatchType
 
 	eventChan chan struct{}
+
+	advertisingNode    string
+	policy             *lnrpc.RoutingPolicy
+	includeUnannounced bool
 }
 
 // getChanPointFundingTxid returns the given channel point's funding txid in
@@ -1407,6 +1436,9 @@ func (hn *HarnessNode) lightningNetworkWatcher() {
 
 			case watchCloseChannel:
 				hn.handleCloseChannelWatchRequest(watchRequest)
+
+			case watchPolicyUpdate:
+				hn.handlePolicyUpdateWatchRequest(watchRequest)
 			}
 
 		case <-hn.quit:
@@ -1475,6 +1507,47 @@ func (hn *HarnessNode) WaitForNetworkChannelClose(ctx context.Context,
 	}
 }
 
+// WaitForChannelPolicyUpdate will block until a channel policy with the target
+// outpoint and advertisingNode is seen within the network.
+func (hn *HarnessNode) WaitForChannelPolicyUpdate(ctx context.Context,
+	advertisingNode string, policy *lnrpc.RoutingPolicy,
+	chanPoint *lnrpc.ChannelPoint, includeUnannounced bool) error {
+
+	eventChan := make(chan struct{})
+
+	op, err := MakeOutpoint(chanPoint)
+	if err != nil {
+		return fmt.Errorf("failed to create outpoint for %v "+
+			"got err: %v", chanPoint, err)
+	}
+
+	ticker := time.NewTicker(wait.PollInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		// Send a watch request on every tick.
+		case <-ticker.C:
+			hn.chanWatchRequests <- &chanWatchRequest{
+				chanPoint:          op,
+				eventChan:          eventChan,
+				chanWatchType:      watchPolicyUpdate,
+				policy:             policy,
+				advertisingNode:    advertisingNode,
+				includeUnannounced: includeUnannounced,
+			}
+
+		case <-eventChan:
+			return nil
+
+		case <-ctx.Done():
+			return fmt.Errorf("channel:%s policy not updated "+
+				"before timeout: [%s:%v] %s", op,
+				advertisingNode, policy, hn.String())
+		}
+	}
+}
+
 // WaitForBlockchainSync waits for the target node to be fully synchronized with
 // the blockchain. If the passed context object has a set timeout, it will
 // continually poll until the timeout has elapsed. In the case that the chain
@@ -1581,6 +1654,25 @@ func (hn *HarnessNode) handleChannelEdgeUpdates(
 		for _, eventChan := range hn.openChanWatchers[op] {
 			close(eventChan)
 		}
 		delete(hn.openChanWatchers, op)
+
+		// Check whether there's a routing policy update. If so, save
+		// it to the node state.
+		if newChan.RoutingPolicy == nil {
+			continue
+		}
+
+		// Append the policy to the slice.
+		node := newChan.AdvertisingNode
+		policies := hn.policyUpdates[op.String()]
+
+		// If the map[op] is nil, we need to initialize the map first.
+		if policies == nil {
+			policies = make(map[string][]*lnrpc.RoutingPolicy)
+		}
+		policies[node] = append(
+			policies[node], newChan.RoutingPolicy,
+		)
+		hn.policyUpdates[op.String()] = policies
 	}
 }
@@ -1754,3 +1846,110 @@ func (hn *HarnessNode) receiveTopologyClientStream(
 		}
 	}
 }
+
+// CheckChannelPolicy checks that the policy matches the expected one.
+func CheckChannelPolicy(policy, expectedPolicy *lnrpc.RoutingPolicy) error {
+	if policy.FeeBaseMsat != expectedPolicy.FeeBaseMsat {
+		return fmt.Errorf("expected base fee %v, got %v",
+			expectedPolicy.FeeBaseMsat, policy.FeeBaseMsat)
+	}
+	if policy.FeeRateMilliMsat != expectedPolicy.FeeRateMilliMsat {
+		return fmt.Errorf("expected fee rate %v, got %v",
+			expectedPolicy.FeeRateMilliMsat,
+			policy.FeeRateMilliMsat)
+	}
+	if policy.TimeLockDelta != expectedPolicy.TimeLockDelta {
+		return fmt.Errorf("expected time lock delta %v, got %v",
+			expectedPolicy.TimeLockDelta,
+			policy.TimeLockDelta)
+	}
+	if policy.MinHtlc != expectedPolicy.MinHtlc {
+		return fmt.Errorf("expected min htlc %v, got %v",
+			expectedPolicy.MinHtlc, policy.MinHtlc)
+	}
+	if policy.MaxHtlcMsat != expectedPolicy.MaxHtlcMsat {
+		return fmt.Errorf("expected max htlc %v, got %v",
+			expectedPolicy.MaxHtlcMsat, policy.MaxHtlcMsat)
+	}
+	if policy.Disabled != expectedPolicy.Disabled {
+		return fmt.Errorf("expected disabled %v, got %v",
+			expectedPolicy.Disabled, policy.Disabled)
+	}
+
+	return nil
+}
+
+// handlePolicyUpdateWatchRequest checks whether the expected policy can be
+// found either in the node's internal state or in its DescribeGraph response.
+// If found, it signals the request by closing the event channel. Otherwise it
+// does nothing and returns.
+func (hn *HarnessNode) handlePolicyUpdateWatchRequest(req *chanWatchRequest) {
+	op := req.chanPoint
+
+	// Get a list of known policies for this chanPoint+advertisingNode
+	// combination. Start searching in the node state first.
+	policies, ok := hn.policyUpdates[op.String()][req.advertisingNode]
+
+	if !ok {
+		// If it cannot be found in the node state, try searching it
+		// from the node's DescribeGraph.
+		policyMap := hn.getChannelPolicies(req.includeUnannounced)
+		policies, ok = policyMap[op.String()][req.advertisingNode]
+		if !ok {
+			return
+		}
+	}
+
+	// Check if there's a matched policy.
+	for _, policy := range policies {
+		if CheckChannelPolicy(policy, req.policy) == nil {
+			close(req.eventChan)
+			return
+		}
+	}
+}
+
+// getChannelPolicies queries the channel graph and formats the policies into
+// the format defined in type policyUpdateMap. 
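+// The include argument is forwarded to DescribeGraph as the request's
+// IncludeUnannounced field.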
+func (hn *HarnessNode) getChannelPolicies(include bool) policyUpdateMap { + + ctxt, cancel := context.WithTimeout( + context.Background(), DefaultTimeout, + ) + defer cancel() + + graph, err := hn.DescribeGraph(ctxt, &lnrpc.ChannelGraphRequest{ + IncludeUnannounced: include, + }) + if err != nil { + hn.PrintErr("DescribeGraph got err: %v", err) + return nil + } + + policyUpdates := policyUpdateMap{} + + for _, e := range graph.Edges { + + policies := policyUpdates[e.ChanPoint] + + // If the map[op] is nil, we need to initialize the map first. + if policies == nil { + policies = make(map[string][]*lnrpc.RoutingPolicy) + } + + if e.Node1Policy != nil { + policies[e.Node1Pub] = append( + policies[e.Node1Pub], e.Node1Policy, + ) + } + + if e.Node2Policy != nil { + policies[e.Node2Pub] = append( + policies[e.Node2Pub], e.Node2Policy, + ) + } + + policyUpdates[e.ChanPoint] = policies + } + + return policyUpdates +} From 06fa17513ca6460ea4401700cea5050133738013 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sun, 8 Aug 2021 04:50:45 +0800 Subject: [PATCH 08/14] itest: move tests by their category --- lntest/itest/assertions.go | 67 ---- lntest/itest/lnd_channel_graph_test.go | 67 ++++ lntest/itest/lnd_channel_policy_test.go | 377 ++++++++++++++++++-- lntest/itest/lnd_misc_test.go | 203 ----------- lntest/itest/lnd_multi-hop-payments_test.go | 45 +++ lntest/itest/lnd_routing_test.go | 139 -------- 6 files changed, 451 insertions(+), 447 deletions(-) diff --git a/lntest/itest/assertions.go b/lntest/itest/assertions.go index fe881f1d..4b540a09 100644 --- a/lntest/itest/assertions.go +++ b/lntest/itest/assertions.go @@ -4,7 +4,6 @@ import ( "context" "encoding/hex" "fmt" - "io" "math" "sync/atomic" "testing" @@ -94,72 +93,6 @@ func openChannelAndAssert(t *harnessTest, net *lntest.NetworkHarness, return fundingChanPoint } -// graphSubscription houses the proxied update and error chans for a node's -// graph subscriptions. -type graphSubscription struct { - updateChan chan *lnrpc.GraphTopologyUpdate - errChan chan error - quit chan struct{} -} - -// subscribeGraphNotifications subscribes to channel graph updates and launches -// a goroutine that forwards these to the returned channel. -func subscribeGraphNotifications(ctxb context.Context, t *harnessTest, - node *lntest.HarnessNode) graphSubscription { - - // We'll first start by establishing a notification client which will - // send us notifications upon detected changes in the channel graph. - req := &lnrpc.GraphTopologySubscription{} - ctx, cancelFunc := context.WithCancel(ctxb) - topologyClient, err := node.SubscribeChannelGraph(ctx, req) - require.NoError(t.t, err, "unable to create topology client") - - // We'll launch a goroutine that will be responsible for proxying all - // notifications recv'd from the client into the channel below. 
- errChan := make(chan error, 1) - quit := make(chan struct{}) - graphUpdates := make(chan *lnrpc.GraphTopologyUpdate, 20) - go func() { - for { - defer cancelFunc() - - select { - case <-quit: - return - default: - graphUpdate, err := topologyClient.Recv() - select { - case <-quit: - return - default: - } - - if err == io.EOF { - return - } else if err != nil { - select { - case errChan <- err: - case <-quit: - } - return - } - - select { - case graphUpdates <- graphUpdate: - case <-quit: - return - } - } - } - }() - - return graphSubscription{ - updateChan: graphUpdates, - errChan: errChan, - quit: quit, - } -} - func waitForGraphSync(t *harnessTest, node *lntest.HarnessNode) { t.t.Helper() diff --git a/lntest/itest/lnd_channel_graph_test.go b/lntest/itest/lnd_channel_graph_test.go index f357696b..774f8e2b 100644 --- a/lntest/itest/lnd_channel_graph_test.go +++ b/lntest/itest/lnd_channel_graph_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "testing" "time" @@ -749,3 +750,69 @@ func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) { // Close the channel between Bob and Dave. closeChannelAndAssert(t, net, net.Bob, chanPoint, false) } + +// graphSubscription houses the proxied update and error chans for a node's +// graph subscriptions. +type graphSubscription struct { + updateChan chan *lnrpc.GraphTopologyUpdate + errChan chan error + quit chan struct{} +} + +// subscribeGraphNotifications subscribes to channel graph updates and launches +// a goroutine that forwards these to the returned channel. +func subscribeGraphNotifications(ctxb context.Context, t *harnessTest, + node *lntest.HarnessNode) graphSubscription { + + // We'll first start by establishing a notification client which will + // send us notifications upon detected changes in the channel graph. + req := &lnrpc.GraphTopologySubscription{} + ctx, cancelFunc := context.WithCancel(ctxb) + topologyClient, err := node.SubscribeChannelGraph(ctx, req) + require.NoError(t.t, err, "unable to create topology client") + + // We'll launch a goroutine that will be responsible for proxying all + // notifications recv'd from the client into the channel below. 
+ errChan := make(chan error, 1) + quit := make(chan struct{}) + graphUpdates := make(chan *lnrpc.GraphTopologyUpdate, 20) + go func() { + for { + defer cancelFunc() + + select { + case <-quit: + return + default: + graphUpdate, err := topologyClient.Recv() + select { + case <-quit: + return + default: + } + + if err == io.EOF { + return + } else if err != nil { + select { + case errChan <- err: + case <-quit: + } + return + } + + select { + case graphUpdates <- graphUpdate: + case <-quit: + return + } + } + } + }() + + return graphSubscription{ + updateChan: graphUpdates, + errChan: errChan, + quit: quit, + } +} diff --git a/lntest/itest/lnd_channel_policy_test.go b/lntest/itest/lnd_channel_policy_test.go index 6906d54a..67c76f34 100644 --- a/lntest/itest/lnd_channel_policy_test.go +++ b/lntest/itest/lnd_channel_policy_test.go @@ -3,13 +3,16 @@ package itest import ( "context" "strings" + "time" + "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/lightningnetwork/lnd/chainreg" "github.com/lightningnetwork/lnd/funding" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/require" ) // testUpdateChannelPolicy tests that policy updates made to a channel @@ -509,47 +512,345 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { closeChannelAndAssert(t, net, net.Alice, chanPoint3, false) } -// updateChannelPolicy updates the channel policy of node to the -// given fees and timelock delta. This function blocks until -// listenerNode has received the policy update. -func updateChannelPolicy(t *harnessTest, node *lntest.HarnessNode, - chanPoint *lnrpc.ChannelPoint, baseFee int64, feeRate int64, - timeLockDelta uint32, maxHtlc uint64, listenerNode *lntest.HarnessNode) { - +// testSendUpdateDisableChannel ensures that a channel update with the disable +// flag set is sent once a channel has been either unilaterally or cooperatively +// closed. +func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() - expectedPolicy := &lnrpc.RoutingPolicy{ - FeeBaseMsat: baseFee, - FeeRateMilliMsat: feeRate, - TimeLockDelta: timeLockDelta, - MinHtlc: 1000, // default value - MaxHtlcMsat: maxHtlc, - } + const ( + chanAmt = 100000 + ) - updateFeeReq := &lnrpc.PolicyUpdateRequest{ - BaseFeeMsat: baseFee, - FeeRate: float64(feeRate) / testFeeBase, - TimeLockDelta: timeLockDelta, - Scope: &lnrpc.PolicyUpdateRequest_ChanPoint{ - ChanPoint: chanPoint, - }, - MaxHtlcMsat: maxHtlc, - } - - ctxt, _ := context.WithTimeout(ctxb, defaultTimeout) - if _, err := node.UpdateChannelPolicy(ctxt, updateFeeReq); err != nil { - t.Fatalf("unable to update chan policy: %v", err) - } - - // Wait for listener node to receive the channel update from node. - ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) - graphSub := subscribeGraphNotifications(ctxt, t, listenerNode) - defer close(graphSub.quit) - - waitForChannelUpdate( - t, graphSub, - []expectedChanUpdate{ - {node.PubKeyStr, expectedPolicy, chanPoint}, + // Open a channel between Alice and Bob and Alice and Carol. These will + // be closed later on in order to trigger channel update messages + // marking the channels as disabled. 
+ chanPointAliceBob := openChannelAndAssert( + t, net, net.Alice, net.Bob, + lntest.OpenChannelParams{ + Amt: chanAmt, }, ) + + carol := net.NewNode( + t.t, "Carol", []string{ + "--minbackoff=10s", + "--chan-enable-timeout=1.5s", + "--chan-disable-timeout=3s", + "--chan-status-sample-interval=.5s", + }) + defer shutdownAndAssert(net, t, carol) + + net.ConnectNodes(t.t, net.Alice, carol) + chanPointAliceCarol := openChannelAndAssert( + t, net, net.Alice, carol, + lntest.OpenChannelParams{ + Amt: chanAmt, + }, + ) + + // We create a new node Eve that has an inactive channel timeout of + // just 2 seconds (down from the default 20m). It will be used to test + // channel updates for channels going inactive. + eve := net.NewNode( + t.t, "Eve", []string{ + "--minbackoff=10s", + "--chan-enable-timeout=1.5s", + "--chan-disable-timeout=3s", + "--chan-status-sample-interval=.5s", + }) + defer shutdownAndAssert(net, t, eve) + + // Give Eve some coins. + net.SendCoins(t.t, btcutil.SatoshiPerBitcoin, eve) + + // Connect Eve to Carol and Bob, and open a channel to carol. + net.ConnectNodes(t.t, eve, carol) + net.ConnectNodes(t.t, eve, net.Bob) + + chanPointEveCarol := openChannelAndAssert( + t, net, eve, carol, + lntest.OpenChannelParams{ + Amt: chanAmt, + }, + ) + + // Launch a node for Dave which will connect to Bob in order to receive + // graph updates from. This will ensure that the channel updates are + // propagated throughout the network. + dave := net.NewNode(t.t, "Dave", nil) + defer shutdownAndAssert(net, t, dave) + + net.ConnectNodes(t.t, net.Bob, dave) + + daveSub := subscribeGraphNotifications(ctxb, t, dave) + defer close(daveSub.quit) + + // We should expect to see a channel update with the default routing + // policy, except that it should indicate the channel is disabled. + expectedPolicy := &lnrpc.RoutingPolicy{ + FeeBaseMsat: int64(chainreg.DefaultBitcoinBaseFeeMSat), + FeeRateMilliMsat: int64(chainreg.DefaultBitcoinFeeRate), + TimeLockDelta: chainreg.DefaultBitcoinTimeLockDelta, + MinHtlc: 1000, // default value + MaxHtlcMsat: calculateMaxHtlc(chanAmt), + Disabled: true, + } + + // Let Carol go offline. Since Eve has an inactive timeout of 2s, we + // expect her to send an update disabling the channel. + restartCarol, err := net.SuspendNode(carol) + if err != nil { + t.Fatalf("unable to suspend carol: %v", err) + } + waitForChannelUpdate( + t, daveSub, + []expectedChanUpdate{ + {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, + }, + ) + + // We restart Carol. Since the channel now becomes active again, Eve + // should send a ChannelUpdate setting the channel no longer disabled. + if err := restartCarol(); err != nil { + t.Fatalf("unable to restart carol: %v", err) + } + + expectedPolicy.Disabled = false + waitForChannelUpdate( + t, daveSub, + []expectedChanUpdate{ + {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, + }, + ) + + // Now we'll test a long disconnection. Disconnect Carol and Eve and + // ensure they both detect each other as disabled. Their min backoffs + // are high enough to not interfere with disabling logic. + if err := net.DisconnectNodes(carol, eve); err != nil { + t.Fatalf("unable to disconnect Carol from Eve: %v", err) + } + + // Wait for a disable from both Carol and Eve to come through. 
+ expectedPolicy.Disabled = true + waitForChannelUpdate( + t, daveSub, + []expectedChanUpdate{ + {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, + {carol.PubKeyStr, expectedPolicy, chanPointEveCarol}, + }, + ) + + // Reconnect Carol and Eve, this should cause them to reenable the + // channel from both ends after a short delay. + net.EnsureConnected(t.t, carol, eve) + + expectedPolicy.Disabled = false + waitForChannelUpdate( + t, daveSub, + []expectedChanUpdate{ + {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, + {carol.PubKeyStr, expectedPolicy, chanPointEveCarol}, + }, + ) + + // Now we'll test a short disconnection. Disconnect Carol and Eve, then + // reconnect them after one second so that their scheduled disables are + // aborted. One second is twice the status sample interval, so this + // should allow for the disconnect to be detected, but still leave time + // to cancel the announcement before the 3 second inactive timeout is + // hit. + if err := net.DisconnectNodes(carol, eve); err != nil { + t.Fatalf("unable to disconnect Carol from Eve: %v", err) + } + time.Sleep(time.Second) + net.EnsureConnected(t.t, eve, carol) + + // Since the disable should have been canceled by both Carol and Eve, we + // expect no channel updates to appear on the network. + assertNoChannelUpdates(t, daveSub, 4*time.Second) + + // Close Alice's channels with Bob and Carol cooperatively and + // unilaterally respectively. + _, _, err = net.CloseChannel(net.Alice, chanPointAliceBob, false) + if err != nil { + t.Fatalf("unable to close channel: %v", err) + } + + _, _, err = net.CloseChannel(net.Alice, chanPointAliceCarol, true) + if err != nil { + t.Fatalf("unable to close channel: %v", err) + } + + // Now that the channel close processes have been started, we should + // receive an update marking each as disabled. + expectedPolicy.Disabled = true + waitForChannelUpdate( + t, daveSub, + []expectedChanUpdate{ + {net.Alice.PubKeyStr, expectedPolicy, chanPointAliceBob}, + {net.Alice.PubKeyStr, expectedPolicy, chanPointAliceCarol}, + }, + ) + + // Finally, close the channels by mining the closing transactions. + mineBlocks(t, net, 1, 2) + + // Also do this check for Eve's channel with Carol. + _, _, err = net.CloseChannel(eve, chanPointEveCarol, false) + if err != nil { + t.Fatalf("unable to close channel: %v", err) + } + + waitForChannelUpdate( + t, daveSub, + []expectedChanUpdate{ + {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, + }, + ) + mineBlocks(t, net, 1, 1) + + // And finally, clean up the force closed channel by mining the + // sweeping transaction. + cleanupForceClose(t, net, net.Alice, chanPointAliceCarol) +} + +// testUpdateChannelPolicyForPrivateChannel tests when a private channel +// updates its channel edge policy, we will use the updated policy to send our +// payment. +// The topology is created as: Alice -> Bob -> Carol, where Alice -> Bob is +// public and Bob -> Carol is private. After an invoice is created by Carol, +// Bob will update the base fee via UpdateChannelPolicy, we will test that +// Alice will not fail the payment and send it using the updated channel +// policy. +func testUpdateChannelPolicyForPrivateChannel(net *lntest.NetworkHarness, + t *harnessTest) { + + ctxb := context.Background() + defer ctxb.Done() + + // We'll create the following topology first, + // Alice <--public:100k--> Bob <--private:100k--> Carol + const chanAmt = btcutil.Amount(100000) + + // Open a channel with 100k satoshis between Alice and Bob. 
+ chanPointAliceBob := openChannelAndAssert( + t, net, net.Alice, net.Bob, + lntest.OpenChannelParams{ + Amt: chanAmt, + }, + ) + defer closeChannelAndAssert(t, net, net.Alice, chanPointAliceBob, false) + + // Get Alice's funding point. + aliceChanTXID, err := lnrpc.GetChanPointFundingTxid(chanPointAliceBob) + require.NoError(t.t, err, "unable to get txid") + aliceFundPoint := wire.OutPoint{ + Hash: *aliceChanTXID, + Index: chanPointAliceBob.OutputIndex, + } + + // Create a new node Carol. + carol := net.NewNode(t.t, "Carol", nil) + defer shutdownAndAssert(net, t, carol) + + // Connect Carol to Bob. + net.ConnectNodes(t.t, carol, net.Bob) + + // Open a channel with 100k satoshis between Bob and Carol. + chanPointBobCarol := openChannelAndAssert( + t, net, net.Bob, carol, + lntest.OpenChannelParams{ + Amt: chanAmt, + Private: true, + }, + ) + defer closeChannelAndAssert(t, net, net.Bob, chanPointBobCarol, false) + + // Get Bob's funding point. + bobChanTXID, err := lnrpc.GetChanPointFundingTxid(chanPointBobCarol) + require.NoError(t.t, err, "unable to get txid") + bobFundPoint := wire.OutPoint{ + Hash: *bobChanTXID, + Index: chanPointBobCarol.OutputIndex, + } + + // We should have the following topology now, + // Alice <--public:100k--> Bob <--private:100k--> Carol + // + // Now we will create an invoice for Carol. + const paymentAmt = 20000 + invoice := &lnrpc.Invoice{ + Memo: "routing hints", + Value: paymentAmt, + Private: true, + } + ctxt, _ := context.WithTimeout(ctxb, defaultTimeout) + resp, err := carol.AddInvoice(ctxt, invoice) + require.NoError(t.t, err, "unable to create invoice for carol") + + // Bob now updates the channel edge policy for the private channel. + const ( + baseFeeMSat = 33000 + ) + timeLockDelta := uint32(chainreg.DefaultBitcoinTimeLockDelta) + updateFeeReq := &lnrpc.PolicyUpdateRequest{ + BaseFeeMsat: baseFeeMSat, + TimeLockDelta: timeLockDelta, + Scope: &lnrpc.PolicyUpdateRequest_ChanPoint{ + ChanPoint: chanPointBobCarol, + }, + } + ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) + _, err = net.Bob.UpdateChannelPolicy(ctxt, updateFeeReq) + require.NoError(t.t, err, "unable to update chan policy") + + // Alice pays the invoices. She will use the updated baseFeeMSat in the + // payment + ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) + payReqs := []string{resp.PaymentRequest} + require.NoError(t.t, + completePaymentRequests( + net.Alice, net.Alice.RouterClient, payReqs, true, + ), "unable to send payment", + ) + + // Check that Alice did make the payment with two HTLCs, one failed and + // one succeeded. + ctxt, _ = context.WithTimeout(ctxt, defaultTimeout) + paymentsResp, err := net.Alice.ListPayments( + ctxt, &lnrpc.ListPaymentsRequest{}, + ) + require.NoError(t.t, err, "failed to obtain payments for Alice") + require.Equal(t.t, 1, len(paymentsResp.Payments), "expected 1 payment") + + htlcs := paymentsResp.Payments[0].Htlcs + require.Equal(t.t, 2, len(htlcs), "expected to have 2 HTLCs") + require.Equal( + t.t, lnrpc.HTLCAttempt_FAILED, htlcs[0].Status, + "the first HTLC attempt should fail", + ) + require.Equal( + t.t, lnrpc.HTLCAttempt_SUCCEEDED, htlcs[1].Status, + "the second HTLC attempt should succeed", + ) + + // Carol should have received 20k satoshis from Bob. + assertAmountPaid(t, "Carol(remote) [<=private] Bob(local)", + carol, bobFundPoint, 0, paymentAmt) + + // Bob should have sent 20k satoshis to Carol. 
+	assertAmountPaid(t, "Bob(local) [private=>] Carol(remote)",
+		net.Bob, bobFundPoint, paymentAmt, 0)
+
+	// Calculate the amount in satoshis.
+	amtExpected := int64(paymentAmt + baseFeeMSat/1000)
+
+	// Bob should have received 20k satoshis + fee from Alice.
+	assertAmountPaid(t, "Bob(remote) <= Alice(local)",
+		net.Bob, aliceFundPoint, 0, amtExpected)
+
+	// Alice should have sent 20k satoshis + fee to Bob.
+	assertAmountPaid(t, "Alice(local) => Bob(remote)",
+		net.Alice, aliceFundPoint, amtExpected, 0)
 }
diff --git a/lntest/itest/lnd_misc_test.go b/lntest/itest/lnd_misc_test.go
index 9b9af7a4..3d2c877e 100644
--- a/lntest/itest/lnd_misc_test.go
+++ b/lntest/itest/lnd_misc_test.go
@@ -1323,209 +1323,6 @@ func testNodeSignVerify(net *lntest.NetworkHarness, t *harnessTest) {
 	closeChannelAndAssert(t, net, net.Alice, aliceBobCh, false)
 }
 
-// testSendUpdateDisableChannel ensures that a channel update with the disable
-// flag set is sent once a channel has been either unilaterally or cooperatively
-// closed.
-func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
-	ctxb := context.Background()
-
-	const (
-		chanAmt = 100000
-	)
-
-	// Open a channel between Alice and Bob and Alice and Carol. These will
-	// be closed later on in order to trigger channel update messages
-	// marking the channels as disabled.
-	chanPointAliceBob := openChannelAndAssert(
-		t, net, net.Alice, net.Bob,
-		lntest.OpenChannelParams{
-			Amt: chanAmt,
-		},
-	)
-
-	carol := net.NewNode(
-		t.t, "Carol", []string{
-			"--minbackoff=10s",
-			"--chan-enable-timeout=1.5s",
-			"--chan-disable-timeout=3s",
-			"--chan-status-sample-interval=.5s",
-		})
-	defer shutdownAndAssert(net, t, carol)
-
-	net.ConnectNodes(t.t, net.Alice, carol)
-	chanPointAliceCarol := openChannelAndAssert(
-		t, net, net.Alice, carol,
-		lntest.OpenChannelParams{
-			Amt: chanAmt,
-		},
-	)
-
-	// We create a new node Eve that has an inactive channel timeout of
-	// just 2 seconds (down from the default 20m). It will be used to test
-	// channel updates for channels going inactive.
-	eve := net.NewNode(
-		t.t, "Eve", []string{
-			"--minbackoff=10s",
-			"--chan-enable-timeout=1.5s",
-			"--chan-disable-timeout=3s",
-			"--chan-status-sample-interval=.5s",
-		})
-	defer shutdownAndAssert(net, t, eve)
-
-	// Give Eve some coins.
-	net.SendCoins(t.t, btcutil.SatoshiPerBitcoin, eve)
-
-	// Connect Eve to Carol and Bob, and open a channel to carol.
-	net.ConnectNodes(t.t, eve, carol)
-	net.ConnectNodes(t.t, eve, net.Bob)
-
-	chanPointEveCarol := openChannelAndAssert(
-		t, net, eve, carol,
-		lntest.OpenChannelParams{
-			Amt: chanAmt,
-		},
-	)
-
-	// Launch a node for Dave which will connect to Bob in order to receive
-	// graph updates from. This will ensure that the channel updates are
-	// propagated throughout the network.
-	dave := net.NewNode(t.t, "Dave", nil)
-	defer shutdownAndAssert(net, t, dave)
-
-	net.ConnectNodes(t.t, net.Bob, dave)
-
-	daveSub := subscribeGraphNotifications(ctxb, t, dave)
-	defer close(daveSub.quit)
-
-	// We should expect to see a channel update with the default routing
-	// policy, except that it should indicate the channel is disabled.
-	expectedPolicy := &lnrpc.RoutingPolicy{
-		FeeBaseMsat:      int64(chainreg.DefaultBitcoinBaseFeeMSat),
-		FeeRateMilliMsat: int64(chainreg.DefaultBitcoinFeeRate),
-		TimeLockDelta:    chainreg.DefaultBitcoinTimeLockDelta,
-		MinHtlc:          1000, // default value
-		MaxHtlcMsat:      calculateMaxHtlc(chanAmt),
-		Disabled:         true,
-	}
-
-	// Let Carol go offline.
Since Eve has an inactive timeout of 2s, we - // expect her to send an update disabling the channel. - restartCarol, err := net.SuspendNode(carol) - if err != nil { - t.Fatalf("unable to suspend carol: %v", err) - } - waitForChannelUpdate( - t, daveSub, - []expectedChanUpdate{ - {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, - }, - ) - - // We restart Carol. Since the channel now becomes active again, Eve - // should send a ChannelUpdate setting the channel no longer disabled. - if err := restartCarol(); err != nil { - t.Fatalf("unable to restart carol: %v", err) - } - - expectedPolicy.Disabled = false - waitForChannelUpdate( - t, daveSub, - []expectedChanUpdate{ - {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, - }, - ) - - // Now we'll test a long disconnection. Disconnect Carol and Eve and - // ensure they both detect each other as disabled. Their min backoffs - // are high enough to not interfere with disabling logic. - if err := net.DisconnectNodes(carol, eve); err != nil { - t.Fatalf("unable to disconnect Carol from Eve: %v", err) - } - - // Wait for a disable from both Carol and Eve to come through. - expectedPolicy.Disabled = true - waitForChannelUpdate( - t, daveSub, - []expectedChanUpdate{ - {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, - {carol.PubKeyStr, expectedPolicy, chanPointEveCarol}, - }, - ) - - // Reconnect Carol and Eve, this should cause them to reenable the - // channel from both ends after a short delay. - net.EnsureConnected(t.t, carol, eve) - - expectedPolicy.Disabled = false - waitForChannelUpdate( - t, daveSub, - []expectedChanUpdate{ - {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, - {carol.PubKeyStr, expectedPolicy, chanPointEveCarol}, - }, - ) - - // Now we'll test a short disconnection. Disconnect Carol and Eve, then - // reconnect them after one second so that their scheduled disables are - // aborted. One second is twice the status sample interval, so this - // should allow for the disconnect to be detected, but still leave time - // to cancel the announcement before the 3 second inactive timeout is - // hit. - if err := net.DisconnectNodes(carol, eve); err != nil { - t.Fatalf("unable to disconnect Carol from Eve: %v", err) - } - time.Sleep(time.Second) - net.EnsureConnected(t.t, eve, carol) - - // Since the disable should have been canceled by both Carol and Eve, we - // expect no channel updates to appear on the network. - assertNoChannelUpdates(t, daveSub, 4*time.Second) - - // Close Alice's channels with Bob and Carol cooperatively and - // unilaterally respectively. - _, _, err = net.CloseChannel(net.Alice, chanPointAliceBob, false) - if err != nil { - t.Fatalf("unable to close channel: %v", err) - } - - _, _, err = net.CloseChannel(net.Alice, chanPointAliceCarol, true) - if err != nil { - t.Fatalf("unable to close channel: %v", err) - } - - // Now that the channel close processes have been started, we should - // receive an update marking each as disabled. - expectedPolicy.Disabled = true - waitForChannelUpdate( - t, daveSub, - []expectedChanUpdate{ - {net.Alice.PubKeyStr, expectedPolicy, chanPointAliceBob}, - {net.Alice.PubKeyStr, expectedPolicy, chanPointAliceCarol}, - }, - ) - - // Finally, close the channels by mining the closing transactions. - mineBlocks(t, net, 1, 2) - - // Also do this check for Eve's channel with Carol. 
- _, _, err = net.CloseChannel(eve, chanPointEveCarol, false) - if err != nil { - t.Fatalf("unable to close channel: %v", err) - } - - waitForChannelUpdate( - t, daveSub, - []expectedChanUpdate{ - {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, - }, - ) - mineBlocks(t, net, 1, 1) - - // And finally, clean up the force closed channel by mining the - // sweeping transaction. - cleanupForceClose(t, net, net.Alice, chanPointAliceCarol) -} - // testAbandonChannel abandones a channel and asserts that it is no // longer open and not in one of the pending closure states. It also // verifies that the abandoned channel is reported as closed with close diff --git a/lntest/itest/lnd_multi-hop-payments_test.go b/lntest/itest/lnd_multi-hop-payments_test.go index 792d4fb1..734d5378 100644 --- a/lntest/itest/lnd_multi-hop-payments_test.go +++ b/lntest/itest/lnd_multi-hop-payments_test.go @@ -384,3 +384,48 @@ func assertEventAndType(t *harnessTest, eventType routerrpc.HtlcEvent_EventType, return event } + +// updateChannelPolicy updates the channel policy of node to the +// given fees and timelock delta. This function blocks until +// listenerNode has received the policy update. +func updateChannelPolicy(t *harnessTest, node *lntest.HarnessNode, + chanPoint *lnrpc.ChannelPoint, baseFee int64, feeRate int64, + timeLockDelta uint32, maxHtlc uint64, listenerNode *lntest.HarnessNode) { + + ctxb := context.Background() + + expectedPolicy := &lnrpc.RoutingPolicy{ + FeeBaseMsat: baseFee, + FeeRateMilliMsat: feeRate, + TimeLockDelta: timeLockDelta, + MinHtlc: 1000, // default value + MaxHtlcMsat: maxHtlc, + } + + updateFeeReq := &lnrpc.PolicyUpdateRequest{ + BaseFeeMsat: baseFee, + FeeRate: float64(feeRate) / testFeeBase, + TimeLockDelta: timeLockDelta, + Scope: &lnrpc.PolicyUpdateRequest_ChanPoint{ + ChanPoint: chanPoint, + }, + MaxHtlcMsat: maxHtlc, + } + + ctxt, _ := context.WithTimeout(ctxb, defaultTimeout) + if _, err := node.UpdateChannelPolicy(ctxt, updateFeeReq); err != nil { + t.Fatalf("unable to update chan policy: %v", err) + } + + // Wait for listener node to receive the channel update from node. + ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) + graphSub := subscribeGraphNotifications(ctxt, t, listenerNode) + defer close(graphSub.quit) + + waitForChannelUpdate( + t, graphSub, + []expectedChanUpdate{ + {node.PubKeyStr, expectedPolicy, chanPoint}, + }, + ) +} diff --git a/lntest/itest/lnd_routing_test.go b/lntest/itest/lnd_routing_test.go index e6935415..6f33d3de 100644 --- a/lntest/itest/lnd_routing_test.go +++ b/lntest/itest/lnd_routing_test.go @@ -1017,145 +1017,6 @@ func testPrivateChannels(net *lntest.NetworkHarness, t *harnessTest) { closeChannelAndAssert(t, net, carol, chanPointPrivate, false) } -// testUpdateChannelPolicyForPrivateChannel tests when a private channel -// updates its channel edge policy, we will use the updated policy to send our -// payment. -// The topology is created as: Alice -> Bob -> Carol, where Alice -> Bob is -// public and Bob -> Carol is private. After an invoice is created by Carol, -// Bob will update the base fee via UpdateChannelPolicy, we will test that -// Alice will not fail the payment and send it using the updated channel -// policy. 
-func testUpdateChannelPolicyForPrivateChannel(net *lntest.NetworkHarness, - t *harnessTest) { - - ctxb := context.Background() - defer ctxb.Done() - - // We'll create the following topology first, - // Alice <--public:100k--> Bob <--private:100k--> Carol - const chanAmt = btcutil.Amount(100000) - - // Open a channel with 100k satoshis between Alice and Bob. - chanPointAliceBob := openChannelAndAssert( - t, net, net.Alice, net.Bob, - lntest.OpenChannelParams{ - Amt: chanAmt, - }, - ) - defer closeChannelAndAssert(t, net, net.Alice, chanPointAliceBob, false) - - // Get Alice's funding point. - aliceChanTXID, err := lnrpc.GetChanPointFundingTxid(chanPointAliceBob) - require.NoError(t.t, err, "unable to get txid") - aliceFundPoint := wire.OutPoint{ - Hash: *aliceChanTXID, - Index: chanPointAliceBob.OutputIndex, - } - - // Create a new node Carol. - carol := net.NewNode(t.t, "Carol", nil) - defer shutdownAndAssert(net, t, carol) - - // Connect Carol to Bob. - net.ConnectNodes(t.t, carol, net.Bob) - - // Open a channel with 100k satoshis between Bob and Carol. - chanPointBobCarol := openChannelAndAssert( - t, net, net.Bob, carol, - lntest.OpenChannelParams{ - Amt: chanAmt, - Private: true, - }, - ) - defer closeChannelAndAssert(t, net, net.Bob, chanPointBobCarol, false) - - // Get Bob's funding point. - bobChanTXID, err := lnrpc.GetChanPointFundingTxid(chanPointBobCarol) - require.NoError(t.t, err, "unable to get txid") - bobFundPoint := wire.OutPoint{ - Hash: *bobChanTXID, - Index: chanPointBobCarol.OutputIndex, - } - - // We should have the following topology now, - // Alice <--public:100k--> Bob <--private:100k--> Carol - // - // Now we will create an invoice for Carol. - const paymentAmt = 20000 - invoice := &lnrpc.Invoice{ - Memo: "routing hints", - Value: paymentAmt, - Private: true, - } - ctxt, _ := context.WithTimeout(ctxb, defaultTimeout) - resp, err := carol.AddInvoice(ctxt, invoice) - require.NoError(t.t, err, "unable to create invoice for carol") - - // Bob now updates the channel edge policy for the private channel. - const ( - baseFeeMSat = 33000 - ) - timeLockDelta := uint32(chainreg.DefaultBitcoinTimeLockDelta) - updateFeeReq := &lnrpc.PolicyUpdateRequest{ - BaseFeeMsat: baseFeeMSat, - TimeLockDelta: timeLockDelta, - Scope: &lnrpc.PolicyUpdateRequest_ChanPoint{ - ChanPoint: chanPointBobCarol, - }, - } - ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) - _, err = net.Bob.UpdateChannelPolicy(ctxt, updateFeeReq) - require.NoError(t.t, err, "unable to update chan policy") - - // Alice pays the invoices. She will use the updated baseFeeMSat in the - // payment - payReqs := []string{resp.PaymentRequest} - require.NoError(t.t, - completePaymentRequests( - net.Alice, net.Alice.RouterClient, payReqs, true, - ), "unable to send payment", - ) - - // Check that Alice did make the payment with two HTLCs, one failed and - // one succeeded. 
- ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) - paymentsResp, err := net.Alice.ListPayments( - ctxt, &lnrpc.ListPaymentsRequest{}, - ) - require.NoError(t.t, err, "failed to obtain payments for Alice") - require.Equal(t.t, 1, len(paymentsResp.Payments), "expected 1 payment") - - htlcs := paymentsResp.Payments[0].Htlcs - require.Equal(t.t, 2, len(htlcs), "expected to have 2 HTLCs") - require.Equal( - t.t, lnrpc.HTLCAttempt_FAILED, htlcs[0].Status, - "the first HTLC attempt should fail", - ) - require.Equal( - t.t, lnrpc.HTLCAttempt_SUCCEEDED, htlcs[1].Status, - "the second HTLC attempt should succeed", - ) - - // Carol should have received 20k satoshis from Bob. - assertAmountPaid(t, "Carol(remote) [<=private] Bob(local)", - carol, bobFundPoint, 0, paymentAmt) - - // Bob should have sent 20k satoshis to Carol. - assertAmountPaid(t, "Bob(local) [private=>] Carol(remote)", - net.Bob, bobFundPoint, paymentAmt, 0) - - // Calcuate the amount in satoshis. - amtExpected := int64(paymentAmt + baseFeeMSat/1000) - - // Bob should have received 20k satoshis + fee from Alice. - assertAmountPaid(t, "Bob(remote) <= Alice(local)", - net.Bob, aliceFundPoint, 0, amtExpected) - - // Alice should have sent 20k satoshis + fee to Bob. - assertAmountPaid(t, "Alice(local) => Bob(remote)", - net.Alice, aliceFundPoint, amtExpected, 0) -} - // testInvoiceRoutingHints tests that the routing hints for an invoice are // created properly. func testInvoiceRoutingHints(net *lntest.NetworkHarness, t *harnessTest) { From cdec34c5f754de950a78bdab1e91ec1c7b1fe1f4 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sun, 8 Aug 2021 17:12:37 +0800 Subject: [PATCH 09/14] itest: remove the method waitForChannelUpdate This commit removes the method waitForChannelUpdate, and uses node.WaitForChannelPolicyUpdate instead. --- lntest/itest/assertions.go | 147 ++----------- lntest/itest/lnd_channel_graph_test.go | 86 +++----- lntest/itest/lnd_channel_policy_test.go | 227 +++++++++----------- lntest/itest/lnd_multi-hop-payments_test.go | 12 +- lntest/itest/lnd_routing_test.go | 12 +- 5 files changed, 156 insertions(+), 328 deletions(-) diff --git a/lntest/itest/assertions.go b/lntest/itest/assertions.go index 4b540a09..0702ba34 100644 --- a/lntest/itest/assertions.go +++ b/lntest/itest/assertions.go @@ -146,15 +146,6 @@ func closeChannelAndAssertType(t *harnessTest, )[0] expectDisable := !curPolicy.Disabled - // If the current channel policy is enabled, begin subscribing the graph - // updates before initiating the channel closure. - var graphSub *graphSubscription - if expectDisable { - sub := subscribeGraphNotifications(ctxt, t, node) - graphSub = &sub - defer close(graphSub.quit) - } - closeUpdates, _, err := net.CloseChannel(node, fundingChanPoint, force) require.NoError(t.t, err, "unable to close channel") @@ -162,11 +153,9 @@ func closeChannelAndAssertType(t *harnessTest, // received the disabled update. if expectDisable { curPolicy.Disabled = true - waitForChannelUpdate( - t, *graphSub, - []expectedChanUpdate{ - {node.PubKeyStr, curPolicy, fundingChanPoint}, - }, + assertChannelPolicyUpdate( + t.t, node, node.PubKeyStr, + curPolicy, fundingChanPoint, false, ) } @@ -565,13 +554,6 @@ func getChannelBalance(t *harnessTest, return resp } -// expectedChanUpdate houses params we expect a ChannelUpdate to advertise. -type expectedChanUpdate struct { - advertisingNode string - expectedPolicy *lnrpc.RoutingPolicy - chanPoint *lnrpc.ChannelPoint -} - // txStr returns the string representation of the channel's funding transaction. 
func txStr(chanPoint *lnrpc.ChannelPoint) string { fundingTxID, err := lnrpc.GetChanPointFundingTxid(chanPoint) @@ -585,109 +567,6 @@ func txStr(chanPoint *lnrpc.ChannelPoint) string { return cp.String() } -// waitForChannelUpdate waits for a node to receive the expected channel -// updates. -func waitForChannelUpdate(t *harnessTest, subscription graphSubscription, - expUpdates []expectedChanUpdate) { - - // Create an array indicating which expected channel updates we have - // received. - found := make([]bool, len(expUpdates)) -out: - for { - select { - case graphUpdate := <-subscription.updateChan: - for _, update := range graphUpdate.ChannelUpdates { - require.NotZerof( - t.t, len(expUpdates), - "received unexpected channel "+ - "update from %v for channel %v", - update.AdvertisingNode, - update.ChanId, - ) - - // For each expected update, check if it matches - // the update we just received. - for i, exp := range expUpdates { - fundingTxStr := txStr(update.ChanPoint) - if fundingTxStr != txStr(exp.chanPoint) { - continue - } - - if update.AdvertisingNode != - exp.advertisingNode { - continue - } - - err := lntest.CheckChannelPolicy( - update.RoutingPolicy, - exp.expectedPolicy, - ) - if err != nil { - continue - } - - // We got a policy update that matched - // the values and channel point of what - // we expected, mark it as found. - found[i] = true - - // If we have no more channel updates - // we are waiting for, break out of the - // loop. - rem := 0 - for _, f := range found { - if !f { - rem++ - } - } - - if rem == 0 { - break out - } - - // Since we found a match among the - // expected updates, break out of the - // inner loop. - break - } - } - case err := <-subscription.errChan: - t.Fatalf("unable to recv graph update: %v", err) - case <-time.After(defaultTimeout): - if len(expUpdates) == 0 { - return - } - t.Fatalf("did not receive channel update") - } - } -} - -// assertNoChannelUpdates ensures that no ChannelUpdates are sent via the -// graphSubscription. This method will block for the provided duration before -// returning to the caller if successful. -func assertNoChannelUpdates(t *harnessTest, subscription graphSubscription, - duration time.Duration) { - - timeout := time.After(duration) - for { - select { - case graphUpdate := <-subscription.updateChan: - require.Zero( - t.t, len(graphUpdate.ChannelUpdates), - "no channel updates were expected", - ) - - case err := <-subscription.errChan: - t.Fatalf("graph subscription failure: %v", err) - - case <-timeout: - // No updates received, success. - return - } - } -} - // getChannelPolicies queries the channel graph and retrieves the current edge // policies for the provided channel points. func getChannelPolicies(t *harnessTest, node *lntest.HarnessNode, @@ -745,7 +624,7 @@ func assertChannelPolicy(t *harnessTest, node *lntest.HarnessNode, for _, policy := range policies { err := lntest.CheckChannelPolicy(policy, expectedPolicy) if err != nil { - t.Fatalf(err.Error()) + t.Fatalf(fmt.Sprintf("%v: %s", err.Error(), node)) } } } @@ -1761,3 +1640,21 @@ func assertNumUTXOs(t *testing.T, node *lntest.HarnessNode, expectedUtxos int) { }, defaultTimeout) require.NoError(t, err, "wait for listunspent") } + +// assertChannelPolicyUpdate checks that the required policy update has +// happened on the given node. 
+func assertChannelPolicyUpdate(t *testing.T, node *lntest.HarnessNode, + advertisingNode string, policy *lnrpc.RoutingPolicy, + chanPoint *lnrpc.ChannelPoint, includeUnannounced bool) { + + ctxb := context.Background() + ctxt, cancel := context.WithTimeout(ctxb, lntest.DefaultTimeout) + defer cancel() + + require.NoError( + t, node.WaitForChannelPolicyUpdate( + ctxt, advertisingNode, policy, + chanPoint, includeUnannounced, + ), "error while waiting for channel update", + ) +} diff --git a/lntest/itest/lnd_channel_graph_test.go b/lntest/itest/lnd_channel_graph_test.go index 774f8e2b..c7040e02 100644 --- a/lntest/itest/lnd_channel_graph_test.go +++ b/lntest/itest/lnd_channel_graph_test.go @@ -86,8 +86,20 @@ func testUpdateChanStatus(net *lntest.NetworkHarness, t *harnessTest) { net.ConnectNodes(t.t, alice, carol) net.ConnectNodes(t.t, bob, carol) - carolSub := subscribeGraphNotifications(ctxb, t, carol) - defer close(carolSub.quit) + // assertChannelUpdate checks that the required policy update has + // happened on the given node. + assertChannelUpdate := func(node *lntest.HarnessNode, + policy *lnrpc.RoutingPolicy) { + + ctxt, cancel := context.WithTimeout(ctxb, defaultTimeout) + defer cancel() + + require.NoError( + t.t, carol.WaitForChannelPolicyUpdate( + ctxt, node.PubKeyStr, policy, chanPoint, false, + ), "error while waiting for channel update", + ) + } // sendReq sends an UpdateChanStatus request to the given node. sendReq := func(node *lntest.HarnessNode, chanPoint *lnrpc.ChannelPoint, @@ -169,23 +181,13 @@ func testUpdateChanStatus(net *lntest.NetworkHarness, t *harnessTest) { // update is propagated. sendReq(alice, chanPoint, routerrpc.ChanStatusAction_DISABLE) expectedPolicy.Disabled = true - waitForChannelUpdate( - t, carolSub, - []expectedChanUpdate{ - {alice.PubKeyStr, expectedPolicy, chanPoint}, - }, - ) + assertChannelUpdate(alice, expectedPolicy) // Re-enable the channel and ensure that a "Disabled = false" update // is propagated. sendReq(alice, chanPoint, routerrpc.ChanStatusAction_ENABLE) expectedPolicy.Disabled = false - waitForChannelUpdate( - t, carolSub, - []expectedChanUpdate{ - {alice.PubKeyStr, expectedPolicy, chanPoint}, - }, - ) + assertChannelUpdate(alice, expectedPolicy) // Manually enabling a channel should NOT prevent subsequent // disconnections from automatically disabling the channel again @@ -195,24 +197,14 @@ func testUpdateChanStatus(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to disconnect Alice from Bob: %v", err) } expectedPolicy.Disabled = true - waitForChannelUpdate( - t, carolSub, - []expectedChanUpdate{ - {alice.PubKeyStr, expectedPolicy, chanPoint}, - {bob.PubKeyStr, expectedPolicy, chanPoint}, - }, - ) + assertChannelUpdate(alice, expectedPolicy) + assertChannelUpdate(bob, expectedPolicy) // Reconnecting the nodes should propagate a "Disabled = false" update. net.EnsureConnected(t.t, alice, bob) expectedPolicy.Disabled = false - waitForChannelUpdate( - t, carolSub, - []expectedChanUpdate{ - {alice.PubKeyStr, expectedPolicy, chanPoint}, - {bob.PubKeyStr, expectedPolicy, chanPoint}, - }, - ) + assertChannelUpdate(alice, expectedPolicy) + assertChannelUpdate(bob, expectedPolicy) // Manually disabling the channel should prevent a subsequent // disconnect / reconnect from re-enabling the channel on @@ -223,12 +215,7 @@ func testUpdateChanStatus(net *lntest.NetworkHarness, t *harnessTest) { // Alice sends out the "Disabled = true" update in response to // the ChanStatusAction_DISABLE request. 
expectedPolicy.Disabled = true - waitForChannelUpdate( - t, carolSub, - []expectedChanUpdate{ - {alice.PubKeyStr, expectedPolicy, chanPoint}, - }, - ) + assertChannelUpdate(alice, expectedPolicy) if err := net.DisconnectNodes(alice, bob); err != nil { t.Fatalf("unable to disconnect Alice from Bob: %v", err) @@ -237,23 +224,13 @@ func testUpdateChanStatus(net *lntest.NetworkHarness, t *harnessTest) { // Bob sends a "Disabled = true" update upon detecting the // disconnect. expectedPolicy.Disabled = true - waitForChannelUpdate( - t, carolSub, - []expectedChanUpdate{ - {bob.PubKeyStr, expectedPolicy, chanPoint}, - }, - ) + assertChannelUpdate(bob, expectedPolicy) // Bob sends a "Disabled = false" update upon detecting the // reconnect. net.EnsureConnected(t.t, alice, bob) expectedPolicy.Disabled = false - waitForChannelUpdate( - t, carolSub, - []expectedChanUpdate{ - {bob.PubKeyStr, expectedPolicy, chanPoint}, - }, - ) + assertChannelUpdate(bob, expectedPolicy) // However, since we manually disabled the channel on Alice's end, // the policy on Alice's end should still be "Disabled = true". Again, @@ -267,26 +244,17 @@ func testUpdateChanStatus(net *lntest.NetworkHarness, t *harnessTest) { // Bob sends a "Disabled = true" update upon detecting the // disconnect. expectedPolicy.Disabled = true - waitForChannelUpdate( - t, carolSub, - []expectedChanUpdate{ - {bob.PubKeyStr, expectedPolicy, chanPoint}, - }, - ) + assertChannelUpdate(bob, expectedPolicy) // After restoring automatic channel state management on Alice's end, // BOTH Alice and Bob should set the channel state back to "enabled" // on reconnect. sendReq(alice, chanPoint, routerrpc.ChanStatusAction_AUTO) net.EnsureConnected(t.t, alice, bob) + expectedPolicy.Disabled = false - waitForChannelUpdate( - t, carolSub, - []expectedChanUpdate{ - {alice.PubKeyStr, expectedPolicy, chanPoint}, - {bob.PubKeyStr, expectedPolicy, chanPoint}, - }, - ) + assertChannelUpdate(alice, expectedPolicy) + assertChannelUpdate(bob, expectedPolicy) assertEdgeDisabled(alice, chanPoint, false) } diff --git a/lntest/itest/lnd_channel_policy_test.go b/lntest/itest/lnd_channel_policy_test.go index 67c76f34..3bdadf86 100644 --- a/lntest/itest/lnd_channel_policy_test.go +++ b/lntest/itest/lnd_channel_policy_test.go @@ -28,14 +28,6 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { ) defaultMaxHtlc := calculateMaxHtlc(funding.MaxBtcFundingAmount) - // Launch notification clients for all nodes, such that we can - // get notified when they discover new channels and updates in the - // graph. - aliceSub := subscribeGraphNotifications(ctxb, t, net.Alice) - defer close(aliceSub.quit) - bobSub := subscribeGraphNotifications(ctxb, t, net.Bob) - defer close(bobSub.quit) - chanAmt := funding.MaxBtcFundingAmount pushAmt := chanAmt / 2 @@ -47,12 +39,27 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { PushAmt: pushAmt, }, ) + defer closeChannelAndAssert(t, net, net.Alice, chanPoint, false) // We add all the nodes' update channels to a slice, such that we can // make sure they all receive the expected updates. - graphSubs := []graphSubscription{aliceSub, bobSub} nodes := []*lntest.HarnessNode{net.Alice, net.Bob} + // assertPolicyUpdate checks that a given policy update has been + // received by a list of given nodes. 
+ assertPolicyUpdate := func(nodes []*lntest.HarnessNode, + advertisingNode string, policy *lnrpc.RoutingPolicy, + chanPoint *lnrpc.ChannelPoint) { + + for _, node := range nodes { + assertChannelPolicyUpdate( + t.t, node, advertisingNode, + policy, chanPoint, false, + ) + } + + } + // Alice and Bob should see each other's ChannelUpdates, advertising the // default routing policies. expectedPolicy := &lnrpc.RoutingPolicy{ @@ -63,15 +70,10 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { MaxHtlcMsat: defaultMaxHtlc, } - for _, graphSub := range graphSubs { - waitForChannelUpdate( - t, graphSub, - []expectedChanUpdate{ - {net.Alice.PubKeyStr, expectedPolicy, chanPoint}, - {net.Bob.PubKeyStr, expectedPolicy, chanPoint}, - }, - ) - } + assertPolicyUpdate( + nodes, net.Alice.PubKeyStr, expectedPolicy, chanPoint, + ) + assertPolicyUpdate(nodes, net.Bob.PubKeyStr, expectedPolicy, chanPoint) // They should now know about the default policies. for _, node := range nodes { @@ -105,10 +107,6 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { // Clean up carol's node when the test finishes. defer shutdownAndAssert(net, t, carol) - carolSub := subscribeGraphNotifications(ctxb, t, carol) - defer close(carolSub.quit) - - graphSubs = append(graphSubs, carolSub) nodes = append(nodes, carol) // Send some coins to Carol that can be used for channel funding. @@ -129,6 +127,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { MinHtlc: customMinHtlc, }, ) + defer closeChannelAndAssert(t, net, net.Bob, chanPoint2, false) expectedPolicyBob := &lnrpc.RoutingPolicy{ FeeBaseMsat: defaultFeeBase, @@ -145,15 +144,12 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { MaxHtlcMsat: defaultMaxHtlc, } - for _, graphSub := range graphSubs { - waitForChannelUpdate( - t, graphSub, - []expectedChanUpdate{ - {net.Bob.PubKeyStr, expectedPolicyBob, chanPoint2}, - {carol.PubKeyStr, expectedPolicyCarol, chanPoint2}, - }, - ) - } + assertPolicyUpdate( + nodes, net.Bob.PubKeyStr, expectedPolicyBob, chanPoint2, + ) + assertPolicyUpdate( + nodes, carol.PubKeyStr, expectedPolicyCarol, chanPoint2, + ) // Check that all nodes now know about the updated policies. for _, node := range nodes { @@ -345,14 +341,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { } // Wait for all nodes to have seen the policy update done by Bob. - for _, graphSub := range graphSubs { - waitForChannelUpdate( - t, graphSub, - []expectedChanUpdate{ - {net.Bob.PubKeyStr, expectedPolicy, chanPoint}, - }, - ) - } + assertPolicyUpdate(nodes, net.Bob.PubKeyStr, expectedPolicy, chanPoint) // Check that all nodes now know about Bob's updated policy. for _, node := range nodes { @@ -396,6 +385,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { PushAmt: pushAmt, }, ) + defer closeChannelAndAssert(t, net, net.Alice, chanPoint3, false) ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) err = net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint3) @@ -436,15 +426,12 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { // Wait for all nodes to have seen the policy updates for both of // Alice's channels. 
-	for _, graphSub := range graphSubs {
-		waitForChannelUpdate(
-			t, graphSub,
-			[]expectedChanUpdate{
-				{net.Alice.PubKeyStr, expectedPolicy, chanPoint},
-				{net.Alice.PubKeyStr, expectedPolicy, chanPoint3},
-			},
-		)
-	}
+	assertPolicyUpdate(
+		nodes, net.Alice.PubKeyStr, expectedPolicy, chanPoint,
+	)
+	assertPolicyUpdate(
+		nodes, net.Alice.PubKeyStr, expectedPolicy, chanPoint3,
+	)
 
 	// And finally check that all nodes remembers the policy update they
 	// received.
@@ -469,47 +456,49 @@
 		ctxt, cancel := context.WithTimeout(ctxb, defaultTimeout)
 		defer cancel()
 		_, err = net.Alice.UpdateChannelPolicy(ctxt, req)
-		if err != nil {
-			t.Fatalf("unable to update alice's channel policy: %v", err)
-		}
+		require.NoError(t.t, err)
 
 		// Wait for all nodes to have seen the policy updates for both
 		// of Alice's channels. Carol will not see the last update as
 		// the limit has been reached.
-		for idx, graphSub := range graphSubs {
-			expUpdates := []expectedChanUpdate{
-				{net.Alice.PubKeyStr, expectedPolicy, chanPoint},
-				{net.Alice.PubKeyStr, expectedPolicy, chanPoint3},
-			}
-			// Carol was added last, which is why we check the last
-			// index.
-			if i == numUpdatesTilRateLimit-1 && idx == len(graphSubs)-1 {
-				expUpdates = nil
-			}
-			waitForChannelUpdate(t, graphSub, expUpdates)
-		}
+		assertPolicyUpdate(
+			[]*lntest.HarnessNode{net.Alice, net.Bob},
+			net.Alice.PubKeyStr, expectedPolicy, chanPoint,
+		)
+		assertPolicyUpdate(
+			[]*lntest.HarnessNode{net.Alice, net.Bob},
+			net.Alice.PubKeyStr, expectedPolicy, chanPoint3,
+		)
+		// Check that all nodes remember the policy update
+		// they received.
+		assertChannelPolicy(
+			t, net.Alice, net.Alice.PubKeyStr,
+			expectedPolicy, chanPoint, chanPoint3,
+		)
+		assertChannelPolicy(
+			t, net.Bob, net.Alice.PubKeyStr,
+			expectedPolicy, chanPoint, chanPoint3,
+		)
 
-		// And finally check that all nodes remembers the policy update
-		// they received. Since Carol didn't receive the last update,
-		// she still has Alice's old policy.
-		for idx, node := range nodes {
-			policy := expectedPolicy
-			// Carol was added last, which is why we check the last
-			// index.
-			if i == numUpdatesTilRateLimit-1 && idx == len(nodes)-1 {
-				policy = &prevAlicePolicy
-			}
-			assertChannelPolicy(
-				t, node, net.Alice.PubKeyStr, policy, chanPoint,
-				chanPoint3,
-			)
+		// Carol was added last, which is why we check the last index.
+		// Since Carol didn't receive the last update, she still has
+		// Alice's old policy.
+		if i == numUpdatesTilRateLimit-1 {
+			expectedPolicy = &prevAlicePolicy
 		}
+		assertPolicyUpdate(
+			[]*lntest.HarnessNode{carol},
+			net.Alice.PubKeyStr, expectedPolicy, chanPoint,
+		)
+		assertPolicyUpdate(
+			[]*lntest.HarnessNode{carol},
+			net.Alice.PubKeyStr, expectedPolicy, chanPoint3,
+		)
+		assertChannelPolicy(
+			t, carol, net.Alice.PubKeyStr,
+			expectedPolicy, chanPoint, chanPoint3,
+		)
 	}
-
-	// Close the channels.
- closeChannelAndAssert(t, net, net.Alice, chanPoint, false) - closeChannelAndAssert(t, net, net.Bob, chanPoint2, false) - closeChannelAndAssert(t, net, net.Alice, chanPoint3, false) } // testSendUpdateDisableChannel ensures that a channel update with the disable @@ -583,9 +572,6 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) { net.ConnectNodes(t.t, net.Bob, dave) - daveSub := subscribeGraphNotifications(ctxb, t, dave) - defer close(daveSub.quit) - // We should expect to see a channel update with the default routing // policy, except that it should indicate the channel is disabled. expectedPolicy := &lnrpc.RoutingPolicy{ @@ -597,18 +583,29 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) { Disabled: true, } + // assertPolicyUpdate checks that the required policy update has + // happened on the given node. + assertPolicyUpdate := func(node *lntest.HarnessNode, + policy *lnrpc.RoutingPolicy, chanPoint *lnrpc.ChannelPoint) { + + ctxt, cancel := context.WithTimeout(ctxb, defaultTimeout) + defer cancel() + + require.NoError( + t.t, dave.WaitForChannelPolicyUpdate( + ctxt, node.PubKeyStr, policy, chanPoint, false, + ), "error while waiting for channel update", + ) + } + // Let Carol go offline. Since Eve has an inactive timeout of 2s, we // expect her to send an update disabling the channel. restartCarol, err := net.SuspendNode(carol) if err != nil { t.Fatalf("unable to suspend carol: %v", err) } - waitForChannelUpdate( - t, daveSub, - []expectedChanUpdate{ - {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, - }, - ) + + assertPolicyUpdate(eve, expectedPolicy, chanPointEveCarol) // We restart Carol. Since the channel now becomes active again, Eve // should send a ChannelUpdate setting the channel no longer disabled. @@ -617,12 +614,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) { } expectedPolicy.Disabled = false - waitForChannelUpdate( - t, daveSub, - []expectedChanUpdate{ - {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, - }, - ) + assertPolicyUpdate(eve, expectedPolicy, chanPointEveCarol) // Now we'll test a long disconnection. Disconnect Carol and Eve and // ensure they both detect each other as disabled. Their min backoffs @@ -633,26 +625,16 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) { // Wait for a disable from both Carol and Eve to come through. expectedPolicy.Disabled = true - waitForChannelUpdate( - t, daveSub, - []expectedChanUpdate{ - {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, - {carol.PubKeyStr, expectedPolicy, chanPointEveCarol}, - }, - ) + assertPolicyUpdate(eve, expectedPolicy, chanPointEveCarol) + assertPolicyUpdate(carol, expectedPolicy, chanPointEveCarol) // Reconnect Carol and Eve, this should cause them to reenable the // channel from both ends after a short delay. net.EnsureConnected(t.t, carol, eve) expectedPolicy.Disabled = false - waitForChannelUpdate( - t, daveSub, - []expectedChanUpdate{ - {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, - {carol.PubKeyStr, expectedPolicy, chanPointEveCarol}, - }, - ) + assertPolicyUpdate(eve, expectedPolicy, chanPointEveCarol) + assertPolicyUpdate(carol, expectedPolicy, chanPointEveCarol) // Now we'll test a short disconnection. 
Disconnect Carol and Eve, then
 	// reconnect them after one second so that their scheduled disables are
@@ -667,8 +649,10 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
 	net.EnsureConnected(t.t, eve, carol)
 
 	// Since the disable should have been canceled by both Carol and Eve, we
-	// expect no channel updates to appear on the network.
-	assertNoChannelUpdates(t, daveSub, 4*time.Second)
+	// expect no channel updates to appear on the network, which means we
+	// expect the policies to stay unchanged (Disabled == false).
+	assertPolicyUpdate(eve, expectedPolicy, chanPointEveCarol)
+	assertPolicyUpdate(carol, expectedPolicy, chanPointEveCarol)
 
 	// Close Alice's channels with Bob and Carol cooperatively and
 	// unilaterally respectively.
@@ -685,13 +669,8 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
 	// Now that the channel close processes have been started, we should
 	// receive an update marking each as disabled.
 	expectedPolicy.Disabled = true
-	waitForChannelUpdate(
-		t, daveSub,
-		[]expectedChanUpdate{
-			{net.Alice.PubKeyStr, expectedPolicy, chanPointAliceBob},
-			{net.Alice.PubKeyStr, expectedPolicy, chanPointAliceCarol},
-		},
-	)
+	assertPolicyUpdate(net.Alice, expectedPolicy, chanPointAliceBob)
+	assertPolicyUpdate(net.Alice, expectedPolicy, chanPointAliceCarol)
 
 	// Finally, close the channels by mining the closing transactions.
 	mineBlocks(t, net, 1, 2)
@@ -702,12 +681,8 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
 		t.Fatalf("unable to close channel: %v", err)
 	}
 
-	waitForChannelUpdate(
-		t, daveSub,
-		[]expectedChanUpdate{
-			{eve.PubKeyStr, expectedPolicy, chanPointEveCarol},
-		},
-	)
+	assertPolicyUpdate(eve, expectedPolicy, chanPointEveCarol)
+
 	mineBlocks(t, net, 1, 1)
 
 	// And finally, clean up the force closed channel by mining the
diff --git a/lntest/itest/lnd_multi-hop-payments_test.go b/lntest/itest/lnd_multi-hop-payments_test.go
index 734d5378..d1b578dc 100644
--- a/lntest/itest/lnd_multi-hop-payments_test.go
+++ b/lntest/itest/lnd_multi-hop-payments_test.go
@@ -418,14 +418,8 @@ func updateChannelPolicy(t *harnessTest, node *lntest.HarnessNode,
 	}
 
 	// Wait for listener node to receive the channel update from node.
-	ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
-	graphSub := subscribeGraphNotifications(ctxt, t, listenerNode)
-	defer close(graphSub.quit)
-
-	waitForChannelUpdate(
-		t, graphSub,
-		[]expectedChanUpdate{
-			{node.PubKeyStr, expectedPolicy, chanPoint},
-		},
+	assertChannelPolicyUpdate(
+		t.t, listenerNode, node.PubKeyStr,
+		expectedPolicy, chanPoint, false,
 	)
 }
diff --git a/lntest/itest/lnd_routing_test.go b/lntest/itest/lnd_routing_test.go
index e6935415..54ea43c1 100644
--- a/lntest/itest/lnd_routing_test.go
+++ b/lntest/itest/lnd_routing_test.go
@@ -1814,15 +1814,9 @@ func testRouteFeeCutoff(net *lntest.NetworkHarness, t *harnessTest) {
 	}
 
 	// Wait for Alice to receive the channel update from Carol.
-	ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
-	aliceSub := subscribeGraphNotifications(ctxt, t, net.Alice)
-	defer close(aliceSub.quit)
-
-	waitForChannelUpdate(
-		t, aliceSub,
-		[]expectedChanUpdate{
-			{carol.PubKeyStr, expectedPolicy, chanPointCarolDave},
-		},
+	assertChannelPolicyUpdate(
+		t.t, net.Alice, carol.PubKeyStr,
+		expectedPolicy, chanPointCarolDave, false,
 	)
 
 	// We'll also need the channel IDs for Bob's channels in order to

From 7038d0e5c8403810939ab546c4b75f291eb8c774 Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Sun, 8 Aug 2021 16:07:23 +0800
Subject: [PATCH 10/14] itest: fix typo

---
 lntest/itest/lnd_revocation_test.go | 28 ++++++++++++++--------------
 1 file changed, 14 insertions(+), 14 deletions(-)

diff --git a/lntest/itest/lnd_revocation_test.go b/lntest/itest/lnd_revocation_test.go
index e7103efc..97d6c3eb 100644
--- a/lntest/itest/lnd_revocation_test.go
+++ b/lntest/itest/lnd_revocation_test.go
@@ -262,7 +262,7 @@ func testRevokedCloseRetribution(net *lntest.NetworkHarness, t *harnessTest) {
 }
 
 // testRevokedCloseRetributionZeroValueRemoteOutput tests that Dave is able
-// carry out retribution in the event that she fails in state where the remote
+// carry out retribution in the event that he fails in state where the remote
 // commitment output has zero-value.
 func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness,
 	t *harnessTest) {
@@ -290,7 +290,7 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness
 	)
 	defer shutdownAndAssert(net, t, dave)
 
-	// We must let Dave have an open channel before she can send a node
+	// We must let Dave have an open channel before he can send a node
 	// announcement, so we open a channel with Carol,
 	net.ConnectNodes(t.t, dave, carol)
 
@@ -337,7 +337,7 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness
 	}
 
 	// Grab Carol's current commitment height (update number), we'll later
-	// revert her to this state after additional updates to force him to
+	// revert her to this state after additional updates to force her to
 	// broadcast this soon to be revoked state.
 	carolStateNumPreCopy := carolChan.NumUpdates
 
@@ -348,8 +348,8 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness
 		t.Fatalf("unable to copy database files: %v", err)
 	}
 
-	// Finally, send payments from Dave to Carol, consuming Carol's remaining
-	// payment hashes.
+	// Finally, send payments from Dave to Carol, consuming Carol's
+	// remaining payment hashes.
 	err = completePaymentRequests(
 		dave, dave.RouterClient, carolPayReqs, false,
 	)
@@ -362,8 +362,8 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness
 		t.Fatalf("unable to get carol chan info: %v", err)
 	}
 
-	// Now we shutdown Carol, copying over the his temporary database state
-	// which has the *prior* channel state over his current most up to date
+	// Now we shutdown Carol, copying over her temporary database state
+	// which has the *prior* channel state over her current most up to date
 	// state. With this, we essentially force Carol to travel back in time
 	// within the channel's history.
if err = net.RestartNode(carol, func() error { @@ -372,7 +372,7 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness t.Fatalf("unable to restart node: %v", err) } - // Now query for Carol's channel state, it should show that he's at a + // Now query for Carol's channel state, it should show that she's at a // state number in the past, not the *latest* state. carolChan, err = getChanInfo(carol) if err != nil { @@ -383,8 +383,8 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness } // Now force Carol to execute a *force* channel closure by unilaterally - // broadcasting his current channel state. This is actually the - // commitment transaction of a prior *revoked* state, so he'll soon + // broadcasting her current channel state. This is actually the + // commitment transaction of a prior *revoked* state, so she'll soon // feel the wrath of Dave's retribution. var ( closeUpdates lnrpc.Lightning_CloseChannelClient @@ -421,8 +421,8 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness block := mineBlocks(t, net, 1, 1)[0] // Here, Dave receives a confirmation of Carol's breach transaction. - // We restart Dave to ensure that she is persisting her retribution - // state and continues exacting justice after her node restarts. + // We restart Dave to ensure that he is persisting his retribution + // state and continues exacting justice after his node restarts. if err := net.RestartNode(dave, nil); err != nil { t.Fatalf("unable to stop Dave's node: %v", err) } @@ -457,10 +457,10 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness } } - // We restart Dave here to ensure that he persists her retribution state + // We restart Dave here to ensure that he persists his retribution state // and successfully continues exacting retribution after restarting. At // this point, Dave has broadcast the justice transaction, but it hasn't - // been confirmed yet; when Dave restarts, she should start waiting for + // been confirmed yet; when Dave restarts, he should start waiting for // the justice transaction to confirm again. if err := net.RestartNode(dave, nil); err != nil { t.Fatalf("unable to restart Dave's node: %v", err) From 64f4e21ab46d490bef703bd862deb84c4f06930b Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sun, 8 Aug 2021 16:08:08 +0800 Subject: [PATCH 11/14] contractcourt+lnd: add debug log --- breacharbiter.go | 13 +++++++++++++ contractcourt/channel_arbitrator.go | 10 +++++----- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/breacharbiter.go b/breacharbiter.go index 692cb8f3..3045a1e7 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -176,6 +176,7 @@ func (b *breachArbiter) start() error { }, func() { breachRetInfos = make(map[wire.OutPoint]retributionInfo) }); err != nil { + brarLog.Errorf("Unable to create retribution info: %v", err) return err } @@ -190,6 +191,9 @@ func (b *breachArbiter) start() error { return err } + brarLog.Debugf("Found %v closing channels, %v retribution records", + len(closedChans), len(breachRetInfos)) + // Using the set of non-pending, closed channels, reconcile any // discrepancies between the channeldb and the retribution store by // removing any retribution information for which we have already @@ -199,6 +203,9 @@ func (b *breachArbiter) start() error { // TODO(halseth): no need continue on IsPending once closed channels // actually means close transaction is confirmed. 
 	for _, chanSummary := range closedChans {
+		brarLog.Debugf("Working on closed channel: %v, is_pending: %v",
+			chanSummary.ChanPoint, chanSummary.IsPending)
+
 		if chanSummary.IsPending {
 			continue
 		}
@@ -212,6 +219,9 @@ func (b *breachArbiter) start() error {
 			return err
 		}
 		delete(breachRetInfos, *chanPoint)
+
+		brarLog.Debugf("Skipped closed channel: %v",
+			chanSummary.ChanPoint)
 	}
 }
 
@@ -220,6 +230,9 @@ func (b *breachArbiter) start() error {
 	for chanPoint := range breachRetInfos {
 		retInfo := breachRetInfos[chanPoint]
 
+		brarLog.Debugf("Handling breach handoff on startup "+
+			"for ChannelPoint(%v)", chanPoint)
+
 		// Register for a notification when the breach transaction is
 		// confirmed on chain.
 		breachTXID := retInfo.commitHash
diff --git a/contractcourt/channel_arbitrator.go b/contractcourt/channel_arbitrator.go
index 93710f04..19d68a55 100644
--- a/contractcourt/channel_arbitrator.go
+++ b/contractcourt/channel_arbitrator.go
@@ -435,10 +435,10 @@ func (c *ChannelArbitrator) Start(state *chanArbStartState) error {
 		}
 	}
 
-	log.Debugf("Starting ChannelArbitrator(%v), htlc_set=%v",
+	log.Debugf("Starting ChannelArbitrator(%v), htlc_set=%v, state=%v",
 		c.cfg.ChanPoint, newLogClosure(func() string {
 			return spew.Sdump(c.activeHTLCs)
-		}),
+		}), state.currentState,
 	)
 
 	// Set our state from our starting state.
@@ -795,7 +795,7 @@ func (c *ChannelArbitrator) stateStep(
 	// default state. If this isn't a self initiated event (we're
 	// checking due to a chain update), then we'll exit now.
 	if len(chainActions) == 0 && trigger == chainTrigger {
-		log.Tracef("ChannelArbitrator(%v): no actions for "+
+		log.Debugf("ChannelArbitrator(%v): no actions for "+
 			"chain trigger, terminating", c.cfg.ChanPoint)
 
 		return StateDefault, closeTx, nil
@@ -1303,7 +1303,7 @@ func (c *ChannelArbitrator) advanceState(
 	// transition to is that same state that we started at.
 	for {
 		priorState = c.state
-		log.Tracef("ChannelArbitrator(%v): attempting state step with "+
+		log.Debugf("ChannelArbitrator(%v): attempting state step with "+
 			"trigger=%v from state=%v", c.cfg.ChanPoint,
 			trigger, priorState)
 
@@ -1324,7 +1324,7 @@ func (c *ChannelArbitrator) advanceState(
 		// our prior state back as the next state, then we'll
 		// terminate.
 		if nextState == priorState {
-			log.Tracef("ChannelArbitrator(%v): terminating at "+
+			log.Debugf("ChannelArbitrator(%v): terminating at "+
 				"state=%v", c.cfg.ChanPoint, nextState)
 			return nextState, forceCloseTx, nil
 		}

From e0e1bfb9356422590eebd76f9b61a2321f824f8a Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Sun, 8 Aug 2021 16:09:15 +0800
Subject: [PATCH 12/14] lnd: change start/stop order of subsystems

This commit adds part of the changes made in this PR:
https://github.com/lightningnetwork/lnd/pull/1783. The original PR is
quite outdated, so instead of rebasing it, the relevant changes are
taken out and put into this commit.
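To make the intent of the reordering concrete, below is a minimal,
self-contained sketch of the start-then-register-cleanup pattern that
server.Start relies on. The cleaner type here is an illustrative
stand-in (lnd's actual helper lives in server.go and may differ in
detail): every subsystem that starts successfully registers its stop
callback, and when a later subsystem fails to start, only the
already-started subsystems are torn down, in reverse start order, so
consumers stop before the subsystems they depend on.

    package main

    import "log"

    // cleaner accumulates stop callbacks in the order subsystems start.
    type cleaner []func() error

    // add registers the stop function of a subsystem that just started.
    func (c cleaner) add(stop func() error) cleaner {
        return append(c, stop)
    }

    // run stops the started subsystems in reverse start order.
    func (c cleaner) run() {
        for i := len(c) - 1; i >= 0; i-- {
            if err := c[i](); err != nil {
                log.Printf("cleanup %d failed: %v", i, err)
            }
        }
    }

    func main() {
        cleanup := cleaner{}

        // Dependencies start first and are therefore stopped last.
        log.Println("start sphinx")
        cleanup = cleanup.add(func() error {
            log.Println("stop sphinx")
            return nil
        })

        log.Println("start htlcSwitch")
        cleanup = cleanup.add(func() error {
            log.Println("stop htlcSwitch")
            return nil
        })

        // Simulate a later subsystem failing to start: only what was
        // started gets torn down, most recently started first.
        cleanup.run()
    }

Under this scheme, moving a subsystem earlier in Start also moves its
teardown later in the failure path, which is why this commit reorders
Start and Stop together.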
--- server.go | 112 ++++++++++++++++++++++++++++-------------------------- 1 file changed, 58 insertions(+), 54 deletions(-) diff --git a/server.go b/server.go index b8d71152..7355e79e 100644 --- a/server.go +++ b/server.go @@ -1497,18 +1497,6 @@ func (s *server) Start() error { cleanup := cleaner{} s.start.Do(func() { - if s.torController != nil { - if err := s.createNewHiddenService(); err != nil { - startErr = err - return - } - cleanup = cleanup.add(s.torController.Stop) - } - - if s.natTraversal != nil { - s.wg.Add(1) - go s.watchExternalIP() - } if s.hostAnn != nil { if err := s.hostAnn.Start(); err != nil { @@ -1574,12 +1562,6 @@ func (s *server) Start() error { } cleanup = cleanup.add(s.htlcNotifier.Stop) - if err := s.sphinx.Start(); err != nil { - startErr = err - return - } - cleanup = cleanup.add(s.sphinx.Stop) - if s.towerClient != nil { if err := s.towerClient.Start(); err != nil { startErr = err @@ -1595,12 +1577,6 @@ func (s *server) Start() error { cleanup = cleanup.add(s.anchorTowerClient.Stop) } - if err := s.htlcSwitch.Start(); err != nil { - startErr = err - return - } - cleanup = cleanup.add(s.htlcSwitch.Stop) - if err := s.sweeper.Start(); err != nil { startErr = err return @@ -1613,18 +1589,24 @@ func (s *server) Start() error { } cleanup = cleanup.add(s.utxoNursery.Stop) - if err := s.chainArb.Start(); err != nil { - startErr = err - return - } - cleanup = cleanup.add(s.chainArb.Stop) - if err := s.breachArbiter.Start(); err != nil { startErr = err return } cleanup = cleanup.add(s.breachArbiter.Stop) + if err := s.fundingMgr.Start(); err != nil { + startErr = err + return + } + cleanup = cleanup.add(s.fundingMgr.Stop) + + if err := s.chainArb.Start(); err != nil { + startErr = err + return + } + cleanup = cleanup.add(s.chainArb.Stop) + if err := s.authGossiper.Start(); err != nil { startErr = err return @@ -1637,18 +1619,24 @@ func (s *server) Start() error { } cleanup = cleanup.add(s.chanRouter.Stop) - if err := s.fundingMgr.Start(); err != nil { - startErr = err - return - } - cleanup = cleanup.add(s.fundingMgr.Stop) - if err := s.invoices.Start(); err != nil { startErr = err return } cleanup = cleanup.add(s.invoices.Stop) + if err := s.sphinx.Start(); err != nil { + startErr = err + return + } + cleanup = cleanup.add(s.sphinx.Stop) + + if err := s.htlcSwitch.Start(); err != nil { + startErr = err + return + } + cleanup = cleanup.add(s.htlcSwitch.Stop) + if err := s.chanStatusMgr.Start(); err != nil { startErr = err return @@ -1708,6 +1696,20 @@ func (s *server) Start() error { } cleanup = cleanup.add(s.chanSubSwapper.Stop) + if s.torController != nil { + if err := s.createNewHiddenService(); err != nil { + startErr = err + return + } + cleanup = cleanup.add(s.torController.Stop) + } + + if s.natTraversal != nil { + s.wg.Add(1) + go s.watchExternalIP() + } + + // Start connmgr last to prevent connections before init. s.connMgr.Start() cleanup = cleanup.add(func() error { s.connMgr.Stop() @@ -1824,32 +1826,38 @@ func (s *server) Stop() error { close(s.quit) + // Shutdown connMgr first to prevent conns during shutdown. + s.connMgr.Stop() + // Shutdown the wallet, funding manager, and the rpc server. 
s.chanStatusMgr.Stop() - if err := s.cc.ChainNotifier.Stop(); err != nil { - srvrLog.Warnf("Unable to stop ChainNotifier: %v", err) - } - if err := s.chanRouter.Stop(); err != nil { - srvrLog.Warnf("failed to stop chanRouter: %v", err) - } if err := s.htlcSwitch.Stop(); err != nil { srvrLog.Warnf("failed to stop htlcSwitch: %v", err) } if err := s.sphinx.Stop(); err != nil { srvrLog.Warnf("failed to stop sphinx: %v", err) } - if err := s.utxoNursery.Stop(); err != nil { - srvrLog.Warnf("failed to stop utxoNursery: %v", err) + if err := s.invoices.Stop(); err != nil { + srvrLog.Warnf("failed to stop invoices: %v", err) + } + if err := s.chanRouter.Stop(); err != nil { + srvrLog.Warnf("failed to stop chanRouter: %v", err) + } + if err := s.chainArb.Stop(); err != nil { + srvrLog.Warnf("failed to stop chainArb: %v", err) + } + if err := s.fundingMgr.Stop(); err != nil { + srvrLog.Warnf("failed to stop fundingMgr: %v", err) } if err := s.breachArbiter.Stop(); err != nil { srvrLog.Warnf("failed to stop breachArbiter: %v", err) } + if err := s.utxoNursery.Stop(); err != nil { + srvrLog.Warnf("failed to stop utxoNursery: %v", err) + } if err := s.authGossiper.Stop(); err != nil { srvrLog.Warnf("failed to stop authGossiper: %v", err) } - if err := s.chainArb.Stop(); err != nil { - srvrLog.Warnf("failed to stop chainArb: %v", err) - } if err := s.sweeper.Stop(); err != nil { srvrLog.Warnf("failed to stop sweeper: %v", err) } @@ -1862,16 +1870,12 @@ func (s *server) Stop() error { if err := s.htlcNotifier.Stop(); err != nil { srvrLog.Warnf("failed to stop htlcNotifier: %v", err) } - s.connMgr.Stop() - if err := s.invoices.Stop(); err != nil { - srvrLog.Warnf("failed to stop invoices: %v", err) - } - if err := s.fundingMgr.Stop(); err != nil { - srvrLog.Warnf("failed to stop fundingMgr: %v", err) - } if err := s.chanSubSwapper.Stop(); err != nil { srvrLog.Warnf("failed to stop chanSubSwapper: %v", err) } + if err := s.cc.ChainNotifier.Stop(); err != nil { + srvrLog.Warnf("Unable to stop ChainNotifier: %v", err) + } s.chanEventStore.Stop() s.missionControl.StopStoreTicker() From 66dae6ecf7e82213daa08f05506715daf02e31c0 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 25 Aug 2021 15:22:49 +0800 Subject: [PATCH 13/14] itest: put node.CloseChannel inside wait --- lntest/harness.go | 60 ++++++++++++++--------------- lntest/itest/lnd_revocation_test.go | 39 +++++-------------- 2 files changed, 37 insertions(+), 62 deletions(-) diff --git a/lntest/harness.go b/lntest/harness.go index d2648a5d..8e5fe878 100644 --- a/lntest/harness.go +++ b/lntest/harness.go @@ -1263,56 +1263,52 @@ func (n *NetworkHarness) CloseChannel(lnNode *HarnessNode, } } - closeReq := &lnrpc.CloseChannelRequest{ - ChannelPoint: cp, - Force: force, - } - closeRespStream, err := lnNode.CloseChannel(ctx, closeReq) - if err != nil { - return nil, nil, fmt.Errorf("unable to close channel: %v", err) - } + var ( + closeRespStream lnrpc.Lightning_CloseChannelClient + closeTxid *chainhash.Hash + ) - errChan := make(chan error) - fin := make(chan *chainhash.Hash) - go func() { - // Consume the "channel close" update in order to wait for the closing - // transaction to be broadcast, then wait for the closing tx to be seen - // within the network. 
+ err = wait.NoError(func() error { + closeReq := &lnrpc.CloseChannelRequest{ + ChannelPoint: cp, Force: force, + } + closeRespStream, err = lnNode.CloseChannel(ctx, closeReq) + if err != nil { + return fmt.Errorf("unable to close channel: %v", err) + } + + // Consume the "channel close" update in order to wait for the + // closing transaction to be broadcast, then wait for the + // closing tx to be seen within the network. closeResp, err := closeRespStream.Recv() if err != nil { - errChan <- fmt.Errorf("unable to recv() from close "+ + return fmt.Errorf("unable to recv() from close "+ "stream: %v", err) - return } pendingClose, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ClosePending) if !ok { - errChan <- fmt.Errorf("expected channel close update, "+ + return fmt.Errorf("expected channel close update, "+ "instead got %v", pendingClose) - return } - closeTxid, err := chainhash.NewHash(pendingClose.ClosePending.Txid) + closeTxid, err = chainhash.NewHash( + pendingClose.ClosePending.Txid, + ) if err != nil { - errChan <- fmt.Errorf("unable to decode closeTxid: "+ + return fmt.Errorf("unable to decode closeTxid: "+ "%v", err) - return } if err := n.waitForTxInMempool(ctx, *closeTxid); err != nil { - errChan <- fmt.Errorf("error while waiting for "+ + return fmt.Errorf("error while waiting for "+ "broadcast tx: %v", err) - return } - fin <- closeTxid - }() - - // Wait until either the deadline for the context expires, an error - // occurs, or the channel close update is received. - select { - case err := <-errChan: + return nil + }, ChannelCloseTimeout) + if err != nil { return nil, nil, err - case closeTxid := <-fin: - return closeRespStream, closeTxid, nil } + + return closeRespStream, closeTxid, nil } // WaitForChannelClose waits for a notification from the passed channel close diff --git a/lntest/itest/lnd_revocation_test.go b/lntest/itest/lnd_revocation_test.go index 97d6c3eb..d26356b8 100644 --- a/lntest/itest/lnd_revocation_test.go +++ b/lntest/itest/lnd_revocation_test.go @@ -17,6 +17,7 @@ import ( "github.com/lightningnetwork/lnd/lnrpc/wtclientrpc" "github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lntest/wait" + "github.com/stretchr/testify/require" ) // testRevokedCloseRetribution tests that Carol is able carry out @@ -159,22 +160,11 @@ func testRevokedCloseRetribution(net *lntest.NetworkHarness, t *harnessTest) { // broadcasting his current channel state. This is actually the // commitment transaction of a prior *revoked* state, so he'll soon // feel the wrath of Carol's retribution. - var closeUpdates lnrpc.Lightning_CloseChannelClient force := true - err = wait.Predicate(func() bool { - closeUpdates, _, err = net.CloseChannel( - net.Bob, chanPoint, force, - ) - if err != nil { - predErr = err - return false - } - - return true - }, defaultTimeout) - if err != nil { - t.Fatalf("unable to close channel: %v", predErr) - } + closeUpdates, _, err := net.CloseChannel( + net.Bob, chanPoint, force, + ) + require.NoError(t.t, err, "unable to close channel") // Wait for Bob's breach transaction to show up in the mempool to ensure // that Carol's node has started waiting for confirmations. @@ -386,22 +376,11 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness // broadcasting her current channel state. This is actually the // commitment transaction of a prior *revoked* state, so she'll soon // feel the wrath of Dave's retribution. 
-	var (
-		closeUpdates lnrpc.Lightning_CloseChannelClient
-		closeTxID    *chainhash.Hash
-		closeErr     error
-	)
-
 	force := true
-	err = wait.Predicate(func() bool {
-		closeUpdates, closeTxID, closeErr = net.CloseChannel(
-			carol, chanPoint, force,
-		)
-		return closeErr == nil
-	}, defaultTimeout)
-	if err != nil {
-		t.Fatalf("unable to close channel: %v", closeErr)
-	}
+	closeUpdates, closeTxID, closeErr := net.CloseChannel(
+		carol, chanPoint, force,
+	)
+	require.NoError(t.t, closeErr, "unable to close channel")
 
 	// Query the mempool for the breaching closing transaction, this should
 	// be broadcast by Carol when she force closes the channel above.

From 87ab4de149d3afe0b917a9b7c9fb9ea715bcf45a Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Sun, 8 Aug 2021 18:35:50 +0800
Subject: [PATCH 14/14] docs: add release note

---
 docs/release-notes/release-notes-0.14.0.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/docs/release-notes/release-notes-0.14.0.md b/docs/release-notes/release-notes-0.14.0.md
index 53c988b5..adbb3d36 100644
--- a/docs/release-notes/release-notes-0.14.0.md
+++ b/docs/release-notes/release-notes-0.14.0.md
@@ -239,6 +239,10 @@ you.
 * [Upgraded miekg/dns to improve the security posture](https://github.com/lightningnetwork/lnd/pull/5738)
 
+* [Fixed flakes caused by the graph topology subscription](https://github.com/lightningnetwork/lnd/pull/5611).
+
+* [The start/stop order of subsystems has been changed to promote better safety](https://github.com/lightningnetwork/lnd/pull/1783).
+
 ## Database
 
 * [Ensure single writer for legacy