Merge pull request #5611 from yyforyongyu/itest-flake-chan-open

itest: fix test flakes from open channel not found and tx not found in mempool
This commit is contained in:
Oliver Gugger
2021-09-17 09:51:17 +02:00
committed by GitHub
20 changed files with 1239 additions and 1120 deletions

View File

@@ -176,6 +176,7 @@ func (b *breachArbiter) start() error {
}, func() {
breachRetInfos = make(map[wire.OutPoint]retributionInfo)
}); err != nil {
brarLog.Errorf("Unable to create retribution info: %v", err)
return err
}
@@ -190,6 +191,9 @@ func (b *breachArbiter) start() error {
return err
}
brarLog.Debugf("Found %v closing channels, %v retribution records",
len(closedChans), len(breachRetInfos))
// Using the set of non-pending, closed channels, reconcile any
// discrepancies between the channeldb and the retribution store by
// removing any retribution information for which we have already
@@ -199,6 +203,9 @@ func (b *breachArbiter) start() error {
// TODO(halseth): no need continue on IsPending once closed channels
// actually means close transaction is confirmed.
for _, chanSummary := range closedChans {
brarLog.Debugf("Working on close channel: %v, is_pending: %v",
chanSummary.ChanPoint, chanSummary.IsPending)
if chanSummary.IsPending {
continue
}
@@ -212,6 +219,9 @@ func (b *breachArbiter) start() error {
return err
}
delete(breachRetInfos, *chanPoint)
brarLog.Debugf("Skipped closed channel: %v",
chanSummary.ChanPoint)
}
}
@@ -220,6 +230,9 @@ func (b *breachArbiter) start() error {
for chanPoint := range breachRetInfos {
retInfo := breachRetInfos[chanPoint]
brarLog.Debugf("Handling breach handoff on startup "+
"for ChannelPoint(%v)", chanPoint)
// Register for a notification when the breach transaction is
// confirmed on chain.
breachTXID := retInfo.commitHash

View File

@@ -435,10 +435,10 @@ func (c *ChannelArbitrator) Start(state *chanArbStartState) error {
}
}
log.Debugf("Starting ChannelArbitrator(%v), htlc_set=%v",
log.Debugf("Starting ChannelArbitrator(%v), htlc_set=%v, state=%v",
c.cfg.ChanPoint, newLogClosure(func() string {
return spew.Sdump(c.activeHTLCs)
}),
}), state.currentState,
)
// Set our state from our starting state.
@@ -795,7 +795,7 @@ func (c *ChannelArbitrator) stateStep(
// default state. If this isn't a self initiated event (we're
// checking due to a chain update), then we'll exit now.
if len(chainActions) == 0 && trigger == chainTrigger {
log.Tracef("ChannelArbitrator(%v): no actions for "+
log.Debugf("ChannelArbitrator(%v): no actions for "+
"chain trigger, terminating", c.cfg.ChanPoint)
return StateDefault, closeTx, nil
@@ -1303,7 +1303,7 @@ func (c *ChannelArbitrator) advanceState(
// transition to is that same state that we started at.
for {
priorState = c.state
log.Tracef("ChannelArbitrator(%v): attempting state step with "+
log.Debugf("ChannelArbitrator(%v): attempting state step with "+
"trigger=%v from state=%v", c.cfg.ChanPoint, trigger,
priorState)
@@ -1324,7 +1324,7 @@ func (c *ChannelArbitrator) advanceState(
// our prior state back as the next state, then we'll
// terminate.
if nextState == priorState {
log.Tracef("ChannelArbitrator(%v): terminating at "+
log.Debugf("ChannelArbitrator(%v): terminating at "+
"state=%v", c.cfg.ChanPoint, nextState)
return nextState, forceCloseTx, nil
}

View File

@@ -239,6 +239,10 @@ you.
* [Upgraded miekg/dns to improve the security posture](https://github.com/lightningnetwork/lnd/pull/5738)
* [Fixed flakes caused by graph topology subcription](https://github.com/lightningnetwork/lnd/pull/5611).
* [Order of the start/stop on subsystems are changed to promote better safety](https://github.com/lightningnetwork/lnd/pull/1783).
## Database
* [Ensure single writer for legacy

View File

@@ -1263,56 +1263,52 @@ func (n *NetworkHarness) CloseChannel(lnNode *HarnessNode,
}
}
closeReq := &lnrpc.CloseChannelRequest{
ChannelPoint: cp,
Force: force,
}
closeRespStream, err := lnNode.CloseChannel(ctx, closeReq)
if err != nil {
return nil, nil, fmt.Errorf("unable to close channel: %v", err)
}
var (
closeRespStream lnrpc.Lightning_CloseChannelClient
closeTxid *chainhash.Hash
)
errChan := make(chan error)
fin := make(chan *chainhash.Hash)
go func() {
// Consume the "channel close" update in order to wait for the closing
// transaction to be broadcast, then wait for the closing tx to be seen
// within the network.
err = wait.NoError(func() error {
closeReq := &lnrpc.CloseChannelRequest{
ChannelPoint: cp, Force: force,
}
closeRespStream, err = lnNode.CloseChannel(ctx, closeReq)
if err != nil {
return fmt.Errorf("unable to close channel: %v", err)
}
// Consume the "channel close" update in order to wait for the
// closing transaction to be broadcast, then wait for the
// closing tx to be seen within the network.
closeResp, err := closeRespStream.Recv()
if err != nil {
errChan <- fmt.Errorf("unable to recv() from close "+
return fmt.Errorf("unable to recv() from close "+
"stream: %v", err)
return
}
pendingClose, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ClosePending)
if !ok {
errChan <- fmt.Errorf("expected channel close update, "+
return fmt.Errorf("expected channel close update, "+
"instead got %v", pendingClose)
return
}
closeTxid, err := chainhash.NewHash(pendingClose.ClosePending.Txid)
closeTxid, err = chainhash.NewHash(
pendingClose.ClosePending.Txid,
)
if err != nil {
errChan <- fmt.Errorf("unable to decode closeTxid: "+
return fmt.Errorf("unable to decode closeTxid: "+
"%v", err)
return
}
if err := n.waitForTxInMempool(ctx, *closeTxid); err != nil {
errChan <- fmt.Errorf("error while waiting for "+
return fmt.Errorf("error while waiting for "+
"broadcast tx: %v", err)
return
}
fin <- closeTxid
}()
// Wait until either the deadline for the context expires, an error
// occurs, or the channel close update is received.
select {
case err := <-errChan:
return nil
}, ChannelCloseTimeout)
if err != nil {
return nil, nil, err
case closeTxid := <-fin:
return closeRespStream, closeTxid, nil
}
return closeRespStream, closeTxid, nil
}
// WaitForChannelClose waits for a notification from the passed channel close

View File

@@ -4,7 +4,6 @@ import (
"context"
"encoding/hex"
"fmt"
"io"
"math"
"sync/atomic"
"testing"
@@ -26,14 +25,6 @@ import (
"google.golang.org/protobuf/proto"
)
// AddToNodeLog adds a line to the log file and asserts there's no error.
func AddToNodeLog(t *testing.T,
node *lntest.HarnessNode, logLine string) {
err := node.AddToLog(logLine)
require.NoError(t, err, "unable to add to log")
}
// openChannelStream blocks until an OpenChannel request for a channel funding
// by alice succeeds. If it does, a stream client is returned to receive events
// about the opening channel.
@@ -102,72 +93,6 @@ func openChannelAndAssert(t *harnessTest, net *lntest.NetworkHarness,
return fundingChanPoint
}
// graphSubscription houses the proxied update and error chans for a node's
// graph subscriptions.
type graphSubscription struct {
updateChan chan *lnrpc.GraphTopologyUpdate
errChan chan error
quit chan struct{}
}
// subscribeGraphNotifications subscribes to channel graph updates and launches
// a goroutine that forwards these to the returned channel.
func subscribeGraphNotifications(ctxb context.Context, t *harnessTest,
node *lntest.HarnessNode) graphSubscription {
// We'll first start by establishing a notification client which will
// send us notifications upon detected changes in the channel graph.
req := &lnrpc.GraphTopologySubscription{}
ctx, cancelFunc := context.WithCancel(ctxb)
topologyClient, err := node.SubscribeChannelGraph(ctx, req)
require.NoError(t.t, err, "unable to create topology client")
// We'll launch a goroutine that will be responsible for proxying all
// notifications recv'd from the client into the channel below.
errChan := make(chan error, 1)
quit := make(chan struct{})
graphUpdates := make(chan *lnrpc.GraphTopologyUpdate, 20)
go func() {
for {
defer cancelFunc()
select {
case <-quit:
return
default:
graphUpdate, err := topologyClient.Recv()
select {
case <-quit:
return
default:
}
if err == io.EOF {
return
} else if err != nil {
select {
case errChan <- err:
case <-quit:
}
return
}
select {
case graphUpdates <- graphUpdate:
case <-quit:
return
}
}
}
}()
return graphSubscription{
updateChan: graphUpdates,
errChan: errChan,
quit: quit,
}
}
func waitForGraphSync(t *harnessTest, node *lntest.HarnessNode) {
t.t.Helper()
@@ -221,15 +146,6 @@ func closeChannelAndAssertType(t *harnessTest,
)[0]
expectDisable := !curPolicy.Disabled
// If the current channel policy is enabled, begin subscribing the graph
// updates before initiating the channel closure.
var graphSub *graphSubscription
if expectDisable {
sub := subscribeGraphNotifications(ctxt, t, node)
graphSub = &sub
defer close(graphSub.quit)
}
closeUpdates, _, err := net.CloseChannel(node, fundingChanPoint, force)
require.NoError(t.t, err, "unable to close channel")
@@ -237,11 +153,9 @@ func closeChannelAndAssertType(t *harnessTest,
// received the disabled update.
if expectDisable {
curPolicy.Disabled = true
waitForChannelUpdate(
t, *graphSub,
[]expectedChanUpdate{
{node.PubKeyStr, curPolicy, fundingChanPoint},
},
assertChannelPolicyUpdate(
t.t, node, node.PubKeyStr,
curPolicy, fundingChanPoint, false,
)
}
@@ -640,13 +554,6 @@ func getChannelBalance(t *harnessTest,
return resp
}
// expectedChanUpdate houses params we expect a ChannelUpdate to advertise.
type expectedChanUpdate struct {
advertisingNode string
expectedPolicy *lnrpc.RoutingPolicy
chanPoint *lnrpc.ChannelPoint
}
// txStr returns the string representation of the channel's funding transaction.
func txStr(chanPoint *lnrpc.ChannelPoint) string {
fundingTxID, err := lnrpc.GetChanPointFundingTxid(chanPoint)
@@ -660,109 +567,6 @@ func txStr(chanPoint *lnrpc.ChannelPoint) string {
return cp.String()
}
// waitForChannelUpdate waits for a node to receive the expected channel
// updates.
func waitForChannelUpdate(t *harnessTest, subscription graphSubscription,
expUpdates []expectedChanUpdate) {
// Create an array indicating which expected channel updates we have
// received.
found := make([]bool, len(expUpdates))
out:
for {
select {
case graphUpdate := <-subscription.updateChan:
for _, update := range graphUpdate.ChannelUpdates {
require.NotZerof(
t.t, len(expUpdates),
"received unexpected channel "+
"update from %v for channel %v",
update.AdvertisingNode,
update.ChanId,
)
// For each expected update, check if it matches
// the update we just received.
for i, exp := range expUpdates {
fundingTxStr := txStr(update.ChanPoint)
if fundingTxStr != txStr(exp.chanPoint) {
continue
}
if update.AdvertisingNode !=
exp.advertisingNode {
continue
}
err := checkChannelPolicy(
update.RoutingPolicy,
exp.expectedPolicy,
)
if err != nil {
continue
}
// We got a policy update that matched
// the values and channel point of what
// we expected, mark it as found.
found[i] = true
// If we have no more channel updates
// we are waiting for, break out of the
// loop.
rem := 0
for _, f := range found {
if !f {
rem++
}
}
if rem == 0 {
break out
}
// Since we found a match among the
// expected updates, break out of the
// inner loop.
break
}
}
case err := <-subscription.errChan:
t.Fatalf("unable to recv graph update: %v", err)
case <-time.After(defaultTimeout):
if len(expUpdates) == 0 {
return
}
t.Fatalf("did not receive channel update")
}
}
}
// assertNoChannelUpdates ensures that no ChannelUpdates are sent via the
// graphSubscription. This method will block for the provided duration before
// returning to the caller if successful.
func assertNoChannelUpdates(t *harnessTest, subscription graphSubscription,
duration time.Duration) {
timeout := time.After(duration)
for {
select {
case graphUpdate := <-subscription.updateChan:
require.Zero(
t.t, len(graphUpdate.ChannelUpdates),
"no channel updates were expected",
)
case err := <-subscription.errChan:
t.Fatalf("graph subscription failure: %v", err)
case <-timeout:
// No updates received, success.
return
}
}
}
// getChannelPolicies queries the channel graph and retrieves the current edge
// policies for the provided channel points.
func getChannelPolicies(t *harnessTest, node *lntest.HarnessNode,
@@ -818,44 +622,13 @@ func assertChannelPolicy(t *harnessTest, node *lntest.HarnessNode,
policies := getChannelPolicies(t, node, advertisingNode, chanPoints...)
for _, policy := range policies {
err := checkChannelPolicy(policy, expectedPolicy)
err := lntest.CheckChannelPolicy(policy, expectedPolicy)
if err != nil {
t.Fatalf(err.Error())
t.Fatalf(fmt.Sprintf("%v: %s", err.Error(), node))
}
}
}
// checkChannelPolicy checks that the policy matches the expected one.
func checkChannelPolicy(policy, expectedPolicy *lnrpc.RoutingPolicy) error {
if policy.FeeBaseMsat != expectedPolicy.FeeBaseMsat {
return fmt.Errorf("expected base fee %v, got %v",
expectedPolicy.FeeBaseMsat, policy.FeeBaseMsat)
}
if policy.FeeRateMilliMsat != expectedPolicy.FeeRateMilliMsat {
return fmt.Errorf("expected fee rate %v, got %v",
expectedPolicy.FeeRateMilliMsat,
policy.FeeRateMilliMsat)
}
if policy.TimeLockDelta != expectedPolicy.TimeLockDelta {
return fmt.Errorf("expected time lock delta %v, got %v",
expectedPolicy.TimeLockDelta,
policy.TimeLockDelta)
}
if policy.MinHtlc != expectedPolicy.MinHtlc {
return fmt.Errorf("expected min htlc %v, got %v",
expectedPolicy.MinHtlc, policy.MinHtlc)
}
if policy.MaxHtlcMsat != expectedPolicy.MaxHtlcMsat {
return fmt.Errorf("expected max htlc %v, got %v",
expectedPolicy.MaxHtlcMsat, policy.MaxHtlcMsat)
}
if policy.Disabled != expectedPolicy.Disabled {
return errors.New("edge should be disabled but isn't")
}
return nil
}
// assertMinerBlockHeightDelta ensures that tempMiner is 'delta' blocks ahead
// of miner.
func assertMinerBlockHeightDelta(t *harnessTest,
@@ -1867,3 +1640,21 @@ func assertNumUTXOs(t *testing.T, node *lntest.HarnessNode, expectedUtxos int) {
}, defaultTimeout)
require.NoError(t, err, "wait for listunspent")
}
// assertChannelPolicyUpdate checks that the required policy update has
// happened on the given node.
func assertChannelPolicyUpdate(t *testing.T, node *lntest.HarnessNode,
advertisingNode string, policy *lnrpc.RoutingPolicy,
chanPoint *lnrpc.ChannelPoint, includeUnannounced bool) {
ctxb := context.Background()
ctxt, cancel := context.WithTimeout(ctxb, lntest.DefaultTimeout)
defer cancel()
require.NoError(
t, node.WaitForChannelPolicyUpdate(
ctxt, advertisingNode, policy,
chanPoint, includeUnannounced,
), "error while waiting for channel update",
)
}

View File

@@ -1346,14 +1346,11 @@ func copyPorts(oldNode *lntest.HarnessNode) lntest.NodeOption {
}
}
func rpcPointToWirePoint(t *harnessTest, chanPoint *lnrpc.ChannelPoint) wire.OutPoint {
txid, err := lnrpc.GetChanPointFundingTxid(chanPoint)
if err != nil {
t.Fatalf("unable to get txid: %v", err)
}
func rpcPointToWirePoint(t *harnessTest,
chanPoint *lnrpc.ChannelPoint) wire.OutPoint {
return wire.OutPoint{
Hash: *txid,
Index: chanPoint.OutputIndex,
}
op, err := lntest.MakeOutpoint(chanPoint)
require.NoError(t.t, err, "unable to get txid")
return op
}

View File

@@ -251,11 +251,6 @@ func testChannelForceClosure(net *lntest.NetworkHarness, t *harnessTest) {
for _, channelType := range commitTypes {
testName := fmt.Sprintf("committype=%v", channelType)
logLine := fmt.Sprintf(
"---- channel force close subtest %s ----\n",
testName,
)
AddToNodeLog(t.t, net.Alice, logLine)
channelType := channelType
success := t.t.Run(testName, func(t *testing.T) {

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"testing"
"time"
@@ -85,8 +86,20 @@ func testUpdateChanStatus(net *lntest.NetworkHarness, t *harnessTest) {
net.ConnectNodes(t.t, alice, carol)
net.ConnectNodes(t.t, bob, carol)
carolSub := subscribeGraphNotifications(ctxb, t, carol)
defer close(carolSub.quit)
// assertChannelUpdate checks that the required policy update has
// happened on the given node.
assertChannelUpdate := func(node *lntest.HarnessNode,
policy *lnrpc.RoutingPolicy) {
ctxt, cancel := context.WithTimeout(ctxb, defaultTimeout)
defer cancel()
require.NoError(
t.t, carol.WaitForChannelPolicyUpdate(
ctxt, node.PubKeyStr, policy, chanPoint, false,
), "error while waiting for channel update",
)
}
// sendReq sends an UpdateChanStatus request to the given node.
sendReq := func(node *lntest.HarnessNode, chanPoint *lnrpc.ChannelPoint,
@@ -168,23 +181,13 @@ func testUpdateChanStatus(net *lntest.NetworkHarness, t *harnessTest) {
// update is propagated.
sendReq(alice, chanPoint, routerrpc.ChanStatusAction_DISABLE)
expectedPolicy.Disabled = true
waitForChannelUpdate(
t, carolSub,
[]expectedChanUpdate{
{alice.PubKeyStr, expectedPolicy, chanPoint},
},
)
assertChannelUpdate(alice, expectedPolicy)
// Re-enable the channel and ensure that a "Disabled = false" update
// is propagated.
sendReq(alice, chanPoint, routerrpc.ChanStatusAction_ENABLE)
expectedPolicy.Disabled = false
waitForChannelUpdate(
t, carolSub,
[]expectedChanUpdate{
{alice.PubKeyStr, expectedPolicy, chanPoint},
},
)
assertChannelUpdate(alice, expectedPolicy)
// Manually enabling a channel should NOT prevent subsequent
// disconnections from automatically disabling the channel again
@@ -194,24 +197,14 @@ func testUpdateChanStatus(net *lntest.NetworkHarness, t *harnessTest) {
t.Fatalf("unable to disconnect Alice from Bob: %v", err)
}
expectedPolicy.Disabled = true
waitForChannelUpdate(
t, carolSub,
[]expectedChanUpdate{
{alice.PubKeyStr, expectedPolicy, chanPoint},
{bob.PubKeyStr, expectedPolicy, chanPoint},
},
)
assertChannelUpdate(alice, expectedPolicy)
assertChannelUpdate(bob, expectedPolicy)
// Reconnecting the nodes should propagate a "Disabled = false" update.
net.EnsureConnected(t.t, alice, bob)
expectedPolicy.Disabled = false
waitForChannelUpdate(
t, carolSub,
[]expectedChanUpdate{
{alice.PubKeyStr, expectedPolicy, chanPoint},
{bob.PubKeyStr, expectedPolicy, chanPoint},
},
)
assertChannelUpdate(alice, expectedPolicy)
assertChannelUpdate(bob, expectedPolicy)
// Manually disabling the channel should prevent a subsequent
// disconnect / reconnect from re-enabling the channel on
@@ -222,12 +215,7 @@ func testUpdateChanStatus(net *lntest.NetworkHarness, t *harnessTest) {
// Alice sends out the "Disabled = true" update in response to
// the ChanStatusAction_DISABLE request.
expectedPolicy.Disabled = true
waitForChannelUpdate(
t, carolSub,
[]expectedChanUpdate{
{alice.PubKeyStr, expectedPolicy, chanPoint},
},
)
assertChannelUpdate(alice, expectedPolicy)
if err := net.DisconnectNodes(alice, bob); err != nil {
t.Fatalf("unable to disconnect Alice from Bob: %v", err)
@@ -236,23 +224,13 @@ func testUpdateChanStatus(net *lntest.NetworkHarness, t *harnessTest) {
// Bob sends a "Disabled = true" update upon detecting the
// disconnect.
expectedPolicy.Disabled = true
waitForChannelUpdate(
t, carolSub,
[]expectedChanUpdate{
{bob.PubKeyStr, expectedPolicy, chanPoint},
},
)
assertChannelUpdate(bob, expectedPolicy)
// Bob sends a "Disabled = false" update upon detecting the
// reconnect.
net.EnsureConnected(t.t, alice, bob)
expectedPolicy.Disabled = false
waitForChannelUpdate(
t, carolSub,
[]expectedChanUpdate{
{bob.PubKeyStr, expectedPolicy, chanPoint},
},
)
assertChannelUpdate(bob, expectedPolicy)
// However, since we manually disabled the channel on Alice's end,
// the policy on Alice's end should still be "Disabled = true". Again,
@@ -266,26 +244,17 @@ func testUpdateChanStatus(net *lntest.NetworkHarness, t *harnessTest) {
// Bob sends a "Disabled = true" update upon detecting the
// disconnect.
expectedPolicy.Disabled = true
waitForChannelUpdate(
t, carolSub,
[]expectedChanUpdate{
{bob.PubKeyStr, expectedPolicy, chanPoint},
},
)
assertChannelUpdate(bob, expectedPolicy)
// After restoring automatic channel state management on Alice's end,
// BOTH Alice and Bob should set the channel state back to "enabled"
// on reconnect.
sendReq(alice, chanPoint, routerrpc.ChanStatusAction_AUTO)
net.EnsureConnected(t.t, alice, bob)
expectedPolicy.Disabled = false
waitForChannelUpdate(
t, carolSub,
[]expectedChanUpdate{
{alice.PubKeyStr, expectedPolicy, chanPoint},
{bob.PubKeyStr, expectedPolicy, chanPoint},
},
)
assertChannelUpdate(alice, expectedPolicy)
assertChannelUpdate(bob, expectedPolicy)
assertEdgeDisabled(alice, chanPoint, false)
}
@@ -749,3 +718,69 @@ func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) {
// Close the channel between Bob and Dave.
closeChannelAndAssert(t, net, net.Bob, chanPoint, false)
}
// graphSubscription houses the proxied update and error chans for a node's
// graph subscriptions.
type graphSubscription struct {
updateChan chan *lnrpc.GraphTopologyUpdate
errChan chan error
quit chan struct{}
}
// subscribeGraphNotifications subscribes to channel graph updates and launches
// a goroutine that forwards these to the returned channel.
func subscribeGraphNotifications(ctxb context.Context, t *harnessTest,
node *lntest.HarnessNode) graphSubscription {
// We'll first start by establishing a notification client which will
// send us notifications upon detected changes in the channel graph.
req := &lnrpc.GraphTopologySubscription{}
ctx, cancelFunc := context.WithCancel(ctxb)
topologyClient, err := node.SubscribeChannelGraph(ctx, req)
require.NoError(t.t, err, "unable to create topology client")
// We'll launch a goroutine that will be responsible for proxying all
// notifications recv'd from the client into the channel below.
errChan := make(chan error, 1)
quit := make(chan struct{})
graphUpdates := make(chan *lnrpc.GraphTopologyUpdate, 20)
go func() {
for {
defer cancelFunc()
select {
case <-quit:
return
default:
graphUpdate, err := topologyClient.Recv()
select {
case <-quit:
return
default:
}
if err == io.EOF {
return
} else if err != nil {
select {
case errChan <- err:
case <-quit:
}
return
}
select {
case graphUpdates <- graphUpdate:
case <-quit:
return
}
}
}
}()
return graphSubscription{
updateChan: graphUpdates,
errChan: errChan,
quit: quit,
}
}

View File

@@ -3,13 +3,16 @@ package itest
import (
"context"
"strings"
"time"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/chainreg"
"github.com/lightningnetwork/lnd/funding"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/stretchr/testify/require"
)
// testUpdateChannelPolicy tests that policy updates made to a channel
@@ -25,14 +28,6 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
)
defaultMaxHtlc := calculateMaxHtlc(funding.MaxBtcFundingAmount)
// Launch notification clients for all nodes, such that we can
// get notified when they discover new channels and updates in the
// graph.
aliceSub := subscribeGraphNotifications(ctxb, t, net.Alice)
defer close(aliceSub.quit)
bobSub := subscribeGraphNotifications(ctxb, t, net.Bob)
defer close(bobSub.quit)
chanAmt := funding.MaxBtcFundingAmount
pushAmt := chanAmt / 2
@@ -44,12 +39,27 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
PushAmt: pushAmt,
},
)
defer closeChannelAndAssert(t, net, net.Alice, chanPoint, false)
// We add all the nodes' update channels to a slice, such that we can
// make sure they all receive the expected updates.
graphSubs := []graphSubscription{aliceSub, bobSub}
nodes := []*lntest.HarnessNode{net.Alice, net.Bob}
// assertPolicyUpdate checks that a given policy update has been
// received by a list of given nodes.
assertPolicyUpdate := func(nodes []*lntest.HarnessNode,
advertisingNode string, policy *lnrpc.RoutingPolicy,
chanPoint *lnrpc.ChannelPoint) {
for _, node := range nodes {
assertChannelPolicyUpdate(
t.t, node, advertisingNode,
policy, chanPoint, false,
)
}
}
// Alice and Bob should see each other's ChannelUpdates, advertising the
// default routing policies.
expectedPolicy := &lnrpc.RoutingPolicy{
@@ -60,15 +70,10 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
MaxHtlcMsat: defaultMaxHtlc,
}
for _, graphSub := range graphSubs {
waitForChannelUpdate(
t, graphSub,
[]expectedChanUpdate{
{net.Alice.PubKeyStr, expectedPolicy, chanPoint},
{net.Bob.PubKeyStr, expectedPolicy, chanPoint},
},
)
}
assertPolicyUpdate(
nodes, net.Alice.PubKeyStr, expectedPolicy, chanPoint,
)
assertPolicyUpdate(nodes, net.Bob.PubKeyStr, expectedPolicy, chanPoint)
// They should now know about the default policies.
for _, node := range nodes {
@@ -102,10 +107,6 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
// Clean up carol's node when the test finishes.
defer shutdownAndAssert(net, t, carol)
carolSub := subscribeGraphNotifications(ctxb, t, carol)
defer close(carolSub.quit)
graphSubs = append(graphSubs, carolSub)
nodes = append(nodes, carol)
// Send some coins to Carol that can be used for channel funding.
@@ -126,6 +127,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
MinHtlc: customMinHtlc,
},
)
defer closeChannelAndAssert(t, net, net.Bob, chanPoint2, false)
expectedPolicyBob := &lnrpc.RoutingPolicy{
FeeBaseMsat: defaultFeeBase,
@@ -142,15 +144,12 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
MaxHtlcMsat: defaultMaxHtlc,
}
for _, graphSub := range graphSubs {
waitForChannelUpdate(
t, graphSub,
[]expectedChanUpdate{
{net.Bob.PubKeyStr, expectedPolicyBob, chanPoint2},
{carol.PubKeyStr, expectedPolicyCarol, chanPoint2},
},
)
}
assertPolicyUpdate(
nodes, net.Bob.PubKeyStr, expectedPolicyBob, chanPoint2,
)
assertPolicyUpdate(
nodes, carol.PubKeyStr, expectedPolicyCarol, chanPoint2,
)
// Check that all nodes now know about the updated policies.
for _, node := range nodes {
@@ -342,14 +341,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
}
// Wait for all nodes to have seen the policy update done by Bob.
for _, graphSub := range graphSubs {
waitForChannelUpdate(
t, graphSub,
[]expectedChanUpdate{
{net.Bob.PubKeyStr, expectedPolicy, chanPoint},
},
)
}
assertPolicyUpdate(nodes, net.Bob.PubKeyStr, expectedPolicy, chanPoint)
// Check that all nodes now know about Bob's updated policy.
for _, node := range nodes {
@@ -393,6 +385,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
PushAmt: pushAmt,
},
)
defer closeChannelAndAssert(t, net, net.Alice, chanPoint3, false)
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
err = net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint3)
@@ -433,15 +426,12 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
// Wait for all nodes to have seen the policy updates for both of
// Alice's channels.
for _, graphSub := range graphSubs {
waitForChannelUpdate(
t, graphSub,
[]expectedChanUpdate{
{net.Alice.PubKeyStr, expectedPolicy, chanPoint},
{net.Alice.PubKeyStr, expectedPolicy, chanPoint3},
},
)
}
assertPolicyUpdate(
nodes, net.Alice.PubKeyStr, expectedPolicy, chanPoint,
)
assertPolicyUpdate(
nodes, net.Alice.PubKeyStr, expectedPolicy, chanPoint3,
)
// And finally check that all nodes remembers the policy update they
// received.
@@ -466,90 +456,376 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
ctxt, cancel := context.WithTimeout(ctxb, defaultTimeout)
defer cancel()
_, err = net.Alice.UpdateChannelPolicy(ctxt, req)
if err != nil {
t.Fatalf("unable to update alice's channel policy: %v", err)
}
require.NoError(t.t, err)
// Wait for all nodes to have seen the policy updates for both
// of Alice's channels. Carol will not see the last update as
// the limit has been reached.
for idx, graphSub := range graphSubs {
expUpdates := []expectedChanUpdate{
{net.Alice.PubKeyStr, expectedPolicy, chanPoint},
{net.Alice.PubKeyStr, expectedPolicy, chanPoint3},
}
// Carol was added last, which is why we check the last
// index.
if i == numUpdatesTilRateLimit-1 && idx == len(graphSubs)-1 {
expUpdates = nil
}
waitForChannelUpdate(t, graphSub, expUpdates)
}
assertPolicyUpdate(
[]*lntest.HarnessNode{net.Alice, net.Bob},
net.Alice.PubKeyStr, expectedPolicy, chanPoint,
)
assertPolicyUpdate(
[]*lntest.HarnessNode{net.Alice, net.Bob},
net.Alice.PubKeyStr, expectedPolicy, chanPoint3,
)
// Check that all nodes remembers the policy update
// they received.
assertChannelPolicy(
t, net.Alice, net.Alice.PubKeyStr,
expectedPolicy, chanPoint, chanPoint3,
)
assertChannelPolicy(
t, net.Bob, net.Alice.PubKeyStr,
expectedPolicy, chanPoint, chanPoint3,
)
// And finally check that all nodes remembers the policy update
// they received. Since Carol didn't receive the last update,
// she still has Alice's old policy.
for idx, node := range nodes {
policy := expectedPolicy
// Carol was added last, which is why we check the last
// index.
if i == numUpdatesTilRateLimit-1 && idx == len(nodes)-1 {
policy = &prevAlicePolicy
}
assertChannelPolicy(
t, node, net.Alice.PubKeyStr, policy, chanPoint,
chanPoint3,
)
// Carol was added last, which is why we check the last index.
// Since Carol didn't receive the last update, she still has
// Alice's old policy.
if i == numUpdatesTilRateLimit-1 {
expectedPolicy = &prevAlicePolicy
}
assertPolicyUpdate(
[]*lntest.HarnessNode{carol},
net.Alice.PubKeyStr, expectedPolicy, chanPoint,
)
assertPolicyUpdate(
[]*lntest.HarnessNode{carol},
net.Alice.PubKeyStr, expectedPolicy, chanPoint3,
)
assertChannelPolicy(
t, carol, net.Alice.PubKeyStr,
expectedPolicy, chanPoint, chanPoint3,
)
}
// Close the channels.
closeChannelAndAssert(t, net, net.Alice, chanPoint, false)
closeChannelAndAssert(t, net, net.Bob, chanPoint2, false)
closeChannelAndAssert(t, net, net.Alice, chanPoint3, false)
}
// updateChannelPolicy updates the channel policy of node to the
// given fees and timelock delta. This function blocks until
// listenerNode has received the policy update.
func updateChannelPolicy(t *harnessTest, node *lntest.HarnessNode,
chanPoint *lnrpc.ChannelPoint, baseFee int64, feeRate int64,
timeLockDelta uint32, maxHtlc uint64, listenerNode *lntest.HarnessNode) {
// testSendUpdateDisableChannel ensures that a channel update with the disable
// flag set is sent once a channel has been either unilaterally or cooperatively
// closed.
func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
ctxb := context.Background()
expectedPolicy := &lnrpc.RoutingPolicy{
FeeBaseMsat: baseFee,
FeeRateMilliMsat: feeRate,
TimeLockDelta: timeLockDelta,
MinHtlc: 1000, // default value
MaxHtlcMsat: maxHtlc,
}
const (
chanAmt = 100000
)
updateFeeReq := &lnrpc.PolicyUpdateRequest{
BaseFeeMsat: baseFee,
FeeRate: float64(feeRate) / testFeeBase,
TimeLockDelta: timeLockDelta,
Scope: &lnrpc.PolicyUpdateRequest_ChanPoint{
ChanPoint: chanPoint,
},
MaxHtlcMsat: maxHtlc,
}
ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
if _, err := node.UpdateChannelPolicy(ctxt, updateFeeReq); err != nil {
t.Fatalf("unable to update chan policy: %v", err)
}
// Wait for listener node to receive the channel update from node.
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
graphSub := subscribeGraphNotifications(ctxt, t, listenerNode)
defer close(graphSub.quit)
waitForChannelUpdate(
t, graphSub,
[]expectedChanUpdate{
{node.PubKeyStr, expectedPolicy, chanPoint},
// Open a channel between Alice and Bob and Alice and Carol. These will
// be closed later on in order to trigger channel update messages
// marking the channels as disabled.
chanPointAliceBob := openChannelAndAssert(
t, net, net.Alice, net.Bob,
lntest.OpenChannelParams{
Amt: chanAmt,
},
)
carol := net.NewNode(
t.t, "Carol", []string{
"--minbackoff=10s",
"--chan-enable-timeout=1.5s",
"--chan-disable-timeout=3s",
"--chan-status-sample-interval=.5s",
})
defer shutdownAndAssert(net, t, carol)
net.ConnectNodes(t.t, net.Alice, carol)
chanPointAliceCarol := openChannelAndAssert(
t, net, net.Alice, carol,
lntest.OpenChannelParams{
Amt: chanAmt,
},
)
// We create a new node Eve that has an inactive channel timeout of
// just 2 seconds (down from the default 20m). It will be used to test
// channel updates for channels going inactive.
eve := net.NewNode(
t.t, "Eve", []string{
"--minbackoff=10s",
"--chan-enable-timeout=1.5s",
"--chan-disable-timeout=3s",
"--chan-status-sample-interval=.5s",
})
defer shutdownAndAssert(net, t, eve)
// Give Eve some coins.
net.SendCoins(t.t, btcutil.SatoshiPerBitcoin, eve)
// Connect Eve to Carol and Bob, and open a channel to carol.
net.ConnectNodes(t.t, eve, carol)
net.ConnectNodes(t.t, eve, net.Bob)
chanPointEveCarol := openChannelAndAssert(
t, net, eve, carol,
lntest.OpenChannelParams{
Amt: chanAmt,
},
)
// Launch a node for Dave which will connect to Bob in order to receive
// graph updates from. This will ensure that the channel updates are
// propagated throughout the network.
dave := net.NewNode(t.t, "Dave", nil)
defer shutdownAndAssert(net, t, dave)
net.ConnectNodes(t.t, net.Bob, dave)
// We should expect to see a channel update with the default routing
// policy, except that it should indicate the channel is disabled.
expectedPolicy := &lnrpc.RoutingPolicy{
FeeBaseMsat: int64(chainreg.DefaultBitcoinBaseFeeMSat),
FeeRateMilliMsat: int64(chainreg.DefaultBitcoinFeeRate),
TimeLockDelta: chainreg.DefaultBitcoinTimeLockDelta,
MinHtlc: 1000, // default value
MaxHtlcMsat: calculateMaxHtlc(chanAmt),
Disabled: true,
}
// assertPolicyUpdate checks that the required policy update has
// happened on the given node.
assertPolicyUpdate := func(node *lntest.HarnessNode,
policy *lnrpc.RoutingPolicy, chanPoint *lnrpc.ChannelPoint) {
ctxt, cancel := context.WithTimeout(ctxb, defaultTimeout)
defer cancel()
require.NoError(
t.t, dave.WaitForChannelPolicyUpdate(
ctxt, node.PubKeyStr, policy, chanPoint, false,
), "error while waiting for channel update",
)
}
// Let Carol go offline. Since Eve has an inactive timeout of 2s, we
// expect her to send an update disabling the channel.
restartCarol, err := net.SuspendNode(carol)
if err != nil {
t.Fatalf("unable to suspend carol: %v", err)
}
assertPolicyUpdate(eve, expectedPolicy, chanPointEveCarol)
// We restart Carol. Since the channel now becomes active again, Eve
// should send a ChannelUpdate setting the channel no longer disabled.
if err := restartCarol(); err != nil {
t.Fatalf("unable to restart carol: %v", err)
}
expectedPolicy.Disabled = false
assertPolicyUpdate(eve, expectedPolicy, chanPointEveCarol)
// Now we'll test a long disconnection. Disconnect Carol and Eve and
// ensure they both detect each other as disabled. Their min backoffs
// are high enough to not interfere with disabling logic.
if err := net.DisconnectNodes(carol, eve); err != nil {
t.Fatalf("unable to disconnect Carol from Eve: %v", err)
}
// Wait for a disable from both Carol and Eve to come through.
expectedPolicy.Disabled = true
assertPolicyUpdate(eve, expectedPolicy, chanPointEveCarol)
assertPolicyUpdate(carol, expectedPolicy, chanPointEveCarol)
// Reconnect Carol and Eve, this should cause them to reenable the
// channel from both ends after a short delay.
net.EnsureConnected(t.t, carol, eve)
expectedPolicy.Disabled = false
assertPolicyUpdate(eve, expectedPolicy, chanPointEveCarol)
assertPolicyUpdate(carol, expectedPolicy, chanPointEveCarol)
// Now we'll test a short disconnection. Disconnect Carol and Eve, then
// reconnect them after one second so that their scheduled disables are
// aborted. One second is twice the status sample interval, so this
// should allow for the disconnect to be detected, but still leave time
// to cancel the announcement before the 3 second inactive timeout is
// hit.
if err := net.DisconnectNodes(carol, eve); err != nil {
t.Fatalf("unable to disconnect Carol from Eve: %v", err)
}
time.Sleep(time.Second)
net.EnsureConnected(t.t, eve, carol)
// Since the disable should have been canceled by both Carol and Eve, we
// expect no channel updates to appear on the network, which means we
// expect the polices stay unchanged(Disable == false).
assertPolicyUpdate(eve, expectedPolicy, chanPointEveCarol)
assertPolicyUpdate(carol, expectedPolicy, chanPointEveCarol)
// Close Alice's channels with Bob and Carol cooperatively and
// unilaterally respectively.
_, _, err = net.CloseChannel(net.Alice, chanPointAliceBob, false)
if err != nil {
t.Fatalf("unable to close channel: %v", err)
}
_, _, err = net.CloseChannel(net.Alice, chanPointAliceCarol, true)
if err != nil {
t.Fatalf("unable to close channel: %v", err)
}
// Now that the channel close processes have been started, we should
// receive an update marking each as disabled.
expectedPolicy.Disabled = true
assertPolicyUpdate(net.Alice, expectedPolicy, chanPointAliceBob)
assertPolicyUpdate(net.Alice, expectedPolicy, chanPointAliceCarol)
// Finally, close the channels by mining the closing transactions.
mineBlocks(t, net, 1, 2)
// Also do this check for Eve's channel with Carol.
_, _, err = net.CloseChannel(eve, chanPointEveCarol, false)
if err != nil {
t.Fatalf("unable to close channel: %v", err)
}
assertPolicyUpdate(eve, expectedPolicy, chanPointEveCarol)
mineBlocks(t, net, 1, 1)
// And finally, clean up the force closed channel by mining the
// sweeping transaction.
cleanupForceClose(t, net, net.Alice, chanPointAliceCarol)
}
// testUpdateChannelPolicyForPrivateChannel tests when a private channel
// updates its channel edge policy, we will use the updated policy to send our
// payment.
// The topology is created as: Alice -> Bob -> Carol, where Alice -> Bob is
// public and Bob -> Carol is private. After an invoice is created by Carol,
// Bob will update the base fee via UpdateChannelPolicy, we will test that
// Alice will not fail the payment and send it using the updated channel
// policy.
func testUpdateChannelPolicyForPrivateChannel(net *lntest.NetworkHarness,
t *harnessTest) {
ctxb := context.Background()
defer ctxb.Done()
// We'll create the following topology first,
// Alice <--public:100k--> Bob <--private:100k--> Carol
const chanAmt = btcutil.Amount(100000)
// Open a channel with 100k satoshis between Alice and Bob.
chanPointAliceBob := openChannelAndAssert(
t, net, net.Alice, net.Bob,
lntest.OpenChannelParams{
Amt: chanAmt,
},
)
defer closeChannelAndAssert(t, net, net.Alice, chanPointAliceBob, false)
// Get Alice's funding point.
aliceChanTXID, err := lnrpc.GetChanPointFundingTxid(chanPointAliceBob)
require.NoError(t.t, err, "unable to get txid")
aliceFundPoint := wire.OutPoint{
Hash: *aliceChanTXID,
Index: chanPointAliceBob.OutputIndex,
}
// Create a new node Carol.
carol := net.NewNode(t.t, "Carol", nil)
defer shutdownAndAssert(net, t, carol)
// Connect Carol to Bob.
net.ConnectNodes(t.t, carol, net.Bob)
// Open a channel with 100k satoshis between Bob and Carol.
chanPointBobCarol := openChannelAndAssert(
t, net, net.Bob, carol,
lntest.OpenChannelParams{
Amt: chanAmt,
Private: true,
},
)
defer closeChannelAndAssert(t, net, net.Bob, chanPointBobCarol, false)
// Get Bob's funding point.
bobChanTXID, err := lnrpc.GetChanPointFundingTxid(chanPointBobCarol)
require.NoError(t.t, err, "unable to get txid")
bobFundPoint := wire.OutPoint{
Hash: *bobChanTXID,
Index: chanPointBobCarol.OutputIndex,
}
// We should have the following topology now,
// Alice <--public:100k--> Bob <--private:100k--> Carol
//
// Now we will create an invoice for Carol.
const paymentAmt = 20000
invoice := &lnrpc.Invoice{
Memo: "routing hints",
Value: paymentAmt,
Private: true,
}
ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
resp, err := carol.AddInvoice(ctxt, invoice)
require.NoError(t.t, err, "unable to create invoice for carol")
// Bob now updates the channel edge policy for the private channel.
const (
baseFeeMSat = 33000
)
timeLockDelta := uint32(chainreg.DefaultBitcoinTimeLockDelta)
updateFeeReq := &lnrpc.PolicyUpdateRequest{
BaseFeeMsat: baseFeeMSat,
TimeLockDelta: timeLockDelta,
Scope: &lnrpc.PolicyUpdateRequest_ChanPoint{
ChanPoint: chanPointBobCarol,
},
}
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
_, err = net.Bob.UpdateChannelPolicy(ctxt, updateFeeReq)
require.NoError(t.t, err, "unable to update chan policy")
// Alice pays the invoices. She will use the updated baseFeeMSat in the
// payment
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
payReqs := []string{resp.PaymentRequest}
require.NoError(t.t,
completePaymentRequests(
net.Alice, net.Alice.RouterClient, payReqs, true,
), "unable to send payment",
)
// Check that Alice did make the payment with two HTLCs, one failed and
// one succeeded.
ctxt, _ = context.WithTimeout(ctxt, defaultTimeout)
paymentsResp, err := net.Alice.ListPayments(
ctxt, &lnrpc.ListPaymentsRequest{},
)
require.NoError(t.t, err, "failed to obtain payments for Alice")
require.Equal(t.t, 1, len(paymentsResp.Payments), "expected 1 payment")
htlcs := paymentsResp.Payments[0].Htlcs
require.Equal(t.t, 2, len(htlcs), "expected to have 2 HTLCs")
require.Equal(
t.t, lnrpc.HTLCAttempt_FAILED, htlcs[0].Status,
"the first HTLC attempt should fail",
)
require.Equal(
t.t, lnrpc.HTLCAttempt_SUCCEEDED, htlcs[1].Status,
"the second HTLC attempt should succeed",
)
// Carol should have received 20k satoshis from Bob.
assertAmountPaid(t, "Carol(remote) [<=private] Bob(local)",
carol, bobFundPoint, 0, paymentAmt)
// Bob should have sent 20k satoshis to Carol.
assertAmountPaid(t, "Bob(local) [private=>] Carol(remote)",
net.Bob, bobFundPoint, paymentAmt, 0)
// Calcuate the amount in satoshis.
amtExpected := int64(paymentAmt + baseFeeMSat/1000)
// Bob should have received 20k satoshis + fee from Alice.
assertAmountPaid(t, "Bob(remote) <= Alice(local)",
net.Bob, aliceFundPoint, 0, amtExpected)
// Alice should have sent 20k satoshis + fee to Bob.
assertAmountPaid(t, "Alice(local) => Bob(remote)",
net.Alice, aliceFundPoint, amtExpected, 0)
}

View File

@@ -139,7 +139,7 @@ test:
"---- basic channel funding subtest %s ----\n",
testName,
)
AddToNodeLog(t.t, net.Alice, logLine)
net.Alice.AddToLog(logLine)
success := t.t.Run(testName, func(t *testing.T) {
testFunding(cc, dc)

View File

@@ -1323,209 +1323,6 @@ func testNodeSignVerify(net *lntest.NetworkHarness, t *harnessTest) {
closeChannelAndAssert(t, net, net.Alice, aliceBobCh, false)
}
// testSendUpdateDisableChannel ensures that a channel update with the disable
// flag set is sent once a channel has been either unilaterally or cooperatively
// closed.
func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
ctxb := context.Background()
const (
chanAmt = 100000
)
// Open a channel between Alice and Bob and Alice and Carol. These will
// be closed later on in order to trigger channel update messages
// marking the channels as disabled.
chanPointAliceBob := openChannelAndAssert(
t, net, net.Alice, net.Bob,
lntest.OpenChannelParams{
Amt: chanAmt,
},
)
carol := net.NewNode(
t.t, "Carol", []string{
"--minbackoff=10s",
"--chan-enable-timeout=1.5s",
"--chan-disable-timeout=3s",
"--chan-status-sample-interval=.5s",
})
defer shutdownAndAssert(net, t, carol)
net.ConnectNodes(t.t, net.Alice, carol)
chanPointAliceCarol := openChannelAndAssert(
t, net, net.Alice, carol,
lntest.OpenChannelParams{
Amt: chanAmt,
},
)
// We create a new node Eve that has an inactive channel timeout of
// just 2 seconds (down from the default 20m). It will be used to test
// channel updates for channels going inactive.
eve := net.NewNode(
t.t, "Eve", []string{
"--minbackoff=10s",
"--chan-enable-timeout=1.5s",
"--chan-disable-timeout=3s",
"--chan-status-sample-interval=.5s",
})
defer shutdownAndAssert(net, t, eve)
// Give Eve some coins.
net.SendCoins(t.t, btcutil.SatoshiPerBitcoin, eve)
// Connect Eve to Carol and Bob, and open a channel to carol.
net.ConnectNodes(t.t, eve, carol)
net.ConnectNodes(t.t, eve, net.Bob)
chanPointEveCarol := openChannelAndAssert(
t, net, eve, carol,
lntest.OpenChannelParams{
Amt: chanAmt,
},
)
// Launch a node for Dave which will connect to Bob in order to receive
// graph updates from. This will ensure that the channel updates are
// propagated throughout the network.
dave := net.NewNode(t.t, "Dave", nil)
defer shutdownAndAssert(net, t, dave)
net.ConnectNodes(t.t, net.Bob, dave)
daveSub := subscribeGraphNotifications(ctxb, t, dave)
defer close(daveSub.quit)
// We should expect to see a channel update with the default routing
// policy, except that it should indicate the channel is disabled.
expectedPolicy := &lnrpc.RoutingPolicy{
FeeBaseMsat: int64(chainreg.DefaultBitcoinBaseFeeMSat),
FeeRateMilliMsat: int64(chainreg.DefaultBitcoinFeeRate),
TimeLockDelta: chainreg.DefaultBitcoinTimeLockDelta,
MinHtlc: 1000, // default value
MaxHtlcMsat: calculateMaxHtlc(chanAmt),
Disabled: true,
}
// Let Carol go offline. Since Eve has an inactive timeout of 2s, we
// expect her to send an update disabling the channel.
restartCarol, err := net.SuspendNode(carol)
if err != nil {
t.Fatalf("unable to suspend carol: %v", err)
}
waitForChannelUpdate(
t, daveSub,
[]expectedChanUpdate{
{eve.PubKeyStr, expectedPolicy, chanPointEveCarol},
},
)
// We restart Carol. Since the channel now becomes active again, Eve
// should send a ChannelUpdate setting the channel no longer disabled.
if err := restartCarol(); err != nil {
t.Fatalf("unable to restart carol: %v", err)
}
expectedPolicy.Disabled = false
waitForChannelUpdate(
t, daveSub,
[]expectedChanUpdate{
{eve.PubKeyStr, expectedPolicy, chanPointEveCarol},
},
)
// Now we'll test a long disconnection. Disconnect Carol and Eve and
// ensure they both detect each other as disabled. Their min backoffs
// are high enough to not interfere with disabling logic.
if err := net.DisconnectNodes(carol, eve); err != nil {
t.Fatalf("unable to disconnect Carol from Eve: %v", err)
}
// Wait for a disable from both Carol and Eve to come through.
expectedPolicy.Disabled = true
waitForChannelUpdate(
t, daveSub,
[]expectedChanUpdate{
{eve.PubKeyStr, expectedPolicy, chanPointEveCarol},
{carol.PubKeyStr, expectedPolicy, chanPointEveCarol},
},
)
// Reconnect Carol and Eve, this should cause them to reenable the
// channel from both ends after a short delay.
net.EnsureConnected(t.t, carol, eve)
expectedPolicy.Disabled = false
waitForChannelUpdate(
t, daveSub,
[]expectedChanUpdate{
{eve.PubKeyStr, expectedPolicy, chanPointEveCarol},
{carol.PubKeyStr, expectedPolicy, chanPointEveCarol},
},
)
// Now we'll test a short disconnection. Disconnect Carol and Eve, then
// reconnect them after one second so that their scheduled disables are
// aborted. One second is twice the status sample interval, so this
// should allow for the disconnect to be detected, but still leave time
// to cancel the announcement before the 3 second inactive timeout is
// hit.
if err := net.DisconnectNodes(carol, eve); err != nil {
t.Fatalf("unable to disconnect Carol from Eve: %v", err)
}
time.Sleep(time.Second)
net.EnsureConnected(t.t, eve, carol)
// Since the disable should have been canceled by both Carol and Eve, we
// expect no channel updates to appear on the network.
assertNoChannelUpdates(t, daveSub, 4*time.Second)
// Close Alice's channels with Bob and Carol cooperatively and
// unilaterally respectively.
_, _, err = net.CloseChannel(net.Alice, chanPointAliceBob, false)
if err != nil {
t.Fatalf("unable to close channel: %v", err)
}
_, _, err = net.CloseChannel(net.Alice, chanPointAliceCarol, true)
if err != nil {
t.Fatalf("unable to close channel: %v", err)
}
// Now that the channel close processes have been started, we should
// receive an update marking each as disabled.
expectedPolicy.Disabled = true
waitForChannelUpdate(
t, daveSub,
[]expectedChanUpdate{
{net.Alice.PubKeyStr, expectedPolicy, chanPointAliceBob},
{net.Alice.PubKeyStr, expectedPolicy, chanPointAliceCarol},
},
)
// Finally, close the channels by mining the closing transactions.
mineBlocks(t, net, 1, 2)
// Also do this check for Eve's channel with Carol.
_, _, err = net.CloseChannel(eve, chanPointEveCarol, false)
if err != nil {
t.Fatalf("unable to close channel: %v", err)
}
waitForChannelUpdate(
t, daveSub,
[]expectedChanUpdate{
{eve.PubKeyStr, expectedPolicy, chanPointEveCarol},
},
)
mineBlocks(t, net, 1, 1)
// And finally, clean up the force closed channel by mining the
// sweeping transaction.
cleanupForceClose(t, net, net.Alice, chanPointAliceCarol)
}
// testAbandonChannel abandones a channel and asserts that it is no
// longer open and not in one of the pending closure states. It also
// verifies that the abandoned channel is reported as closed with close

View File

@@ -384,3 +384,42 @@ func assertEventAndType(t *harnessTest, eventType routerrpc.HtlcEvent_EventType,
return event
}
// updateChannelPolicy updates the channel policy of node to the
// given fees and timelock delta. This function blocks until
// listenerNode has received the policy update.
func updateChannelPolicy(t *harnessTest, node *lntest.HarnessNode,
chanPoint *lnrpc.ChannelPoint, baseFee int64, feeRate int64,
timeLockDelta uint32, maxHtlc uint64, listenerNode *lntest.HarnessNode) {
ctxb := context.Background()
expectedPolicy := &lnrpc.RoutingPolicy{
FeeBaseMsat: baseFee,
FeeRateMilliMsat: feeRate,
TimeLockDelta: timeLockDelta,
MinHtlc: 1000, // default value
MaxHtlcMsat: maxHtlc,
}
updateFeeReq := &lnrpc.PolicyUpdateRequest{
BaseFeeMsat: baseFee,
FeeRate: float64(feeRate) / testFeeBase,
TimeLockDelta: timeLockDelta,
Scope: &lnrpc.PolicyUpdateRequest_ChanPoint{
ChanPoint: chanPoint,
},
MaxHtlcMsat: maxHtlc,
}
ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
if _, err := node.UpdateChannelPolicy(ctxt, updateFeeReq); err != nil {
t.Fatalf("unable to update chan policy: %v", err)
}
// Wait for listener node to receive the channel update from node.
assertChannelPolicyUpdate(
t.t, listenerNode, node.PubKeyStr,
expectedPolicy, chanPoint, false,
)
}

View File

@@ -97,7 +97,7 @@ func testMultiHopHtlcClaims(net *lntest.NetworkHarness, t *harnessTest) {
"%s/%s ----\n",
testName, subTest.name,
)
AddToNodeLog(t, net.Alice, logLine)
net.Alice.AddToLog(logLine)
success := ht.t.Run(subTest.name, func(t *testing.T) {
ht := newHarnessTest(t, net)

View File

@@ -17,6 +17,7 @@ import (
"github.com/lightningnetwork/lnd/lnrpc/wtclientrpc"
"github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/stretchr/testify/require"
)
// testRevokedCloseRetribution tests that Carol is able carry out
@@ -159,22 +160,11 @@ func testRevokedCloseRetribution(net *lntest.NetworkHarness, t *harnessTest) {
// broadcasting his current channel state. This is actually the
// commitment transaction of a prior *revoked* state, so he'll soon
// feel the wrath of Carol's retribution.
var closeUpdates lnrpc.Lightning_CloseChannelClient
force := true
err = wait.Predicate(func() bool {
closeUpdates, _, err = net.CloseChannel(
net.Bob, chanPoint, force,
)
if err != nil {
predErr = err
return false
}
return true
}, defaultTimeout)
if err != nil {
t.Fatalf("unable to close channel: %v", predErr)
}
closeUpdates, _, err := net.CloseChannel(
net.Bob, chanPoint, force,
)
require.NoError(t.t, err, "unable to close channel")
// Wait for Bob's breach transaction to show up in the mempool to ensure
// that Carol's node has started waiting for confirmations.
@@ -262,7 +252,7 @@ func testRevokedCloseRetribution(net *lntest.NetworkHarness, t *harnessTest) {
}
// testRevokedCloseRetributionZeroValueRemoteOutput tests that Dave is able
// carry out retribution in the event that she fails in state where the remote
// carry out retribution in the event that he fails in state where the remote
// commitment output has zero-value.
func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness,
t *harnessTest) {
@@ -290,7 +280,7 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness
)
defer shutdownAndAssert(net, t, dave)
// We must let Dave have an open channel before she can send a node
// We must let Dave have an open channel before he can send a node
// announcement, so we open a channel with Carol,
net.ConnectNodes(t.t, dave, carol)
@@ -337,7 +327,7 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness
}
// Grab Carol's current commitment height (update number), we'll later
// revert her to this state after additional updates to force him to
// revert her to this state after additional updates to force her to
// broadcast this soon to be revoked state.
carolStateNumPreCopy := carolChan.NumUpdates
@@ -348,8 +338,8 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness
t.Fatalf("unable to copy database files: %v", err)
}
// Finally, send payments from Dave to Carol, consuming Carol's remaining
// payment hashes.
// Finally, send payments from Dave to Carol, consuming Carol's
// remaining payment hashes.
err = completePaymentRequests(
dave, dave.RouterClient, carolPayReqs, false,
)
@@ -362,8 +352,8 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness
t.Fatalf("unable to get carol chan info: %v", err)
}
// Now we shutdown Carol, copying over the his temporary database state
// which has the *prior* channel state over his current most up to date
// Now we shutdown Carol, copying over the her temporary database state
// which has the *prior* channel state over her current most up to date
// state. With this, we essentially force Carol to travel back in time
// within the channel's history.
if err = net.RestartNode(carol, func() error {
@@ -372,7 +362,7 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness
t.Fatalf("unable to restart node: %v", err)
}
// Now query for Carol's channel state, it should show that he's at a
// Now query for Carol's channel state, it should show that she's at a
// state number in the past, not the *latest* state.
carolChan, err = getChanInfo(carol)
if err != nil {
@@ -383,25 +373,14 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness
}
// Now force Carol to execute a *force* channel closure by unilaterally
// broadcasting his current channel state. This is actually the
// commitment transaction of a prior *revoked* state, so he'll soon
// broadcasting her current channel state. This is actually the
// commitment transaction of a prior *revoked* state, so she'll soon
// feel the wrath of Dave's retribution.
var (
closeUpdates lnrpc.Lightning_CloseChannelClient
closeTxID *chainhash.Hash
closeErr error
)
force := true
err = wait.Predicate(func() bool {
closeUpdates, closeTxID, closeErr = net.CloseChannel(
carol, chanPoint, force,
)
return closeErr == nil
}, defaultTimeout)
if err != nil {
t.Fatalf("unable to close channel: %v", closeErr)
}
closeUpdates, closeTxID, closeErr := net.CloseChannel(
carol, chanPoint, force,
)
require.NoError(t.t, closeErr, "unable to close channel")
// Query the mempool for the breaching closing transaction, this should
// be broadcast by Carol when she force closes the channel above.
@@ -421,8 +400,8 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness
block := mineBlocks(t, net, 1, 1)[0]
// Here, Dave receives a confirmation of Carol's breach transaction.
// We restart Dave to ensure that she is persisting her retribution
// state and continues exacting justice after her node restarts.
// We restart Dave to ensure that he is persisting his retribution
// state and continues exacting justice after his node restarts.
if err := net.RestartNode(dave, nil); err != nil {
t.Fatalf("unable to stop Dave's node: %v", err)
}
@@ -457,10 +436,10 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness
}
}
// We restart Dave here to ensure that he persists her retribution state
// We restart Dave here to ensure that he persists his retribution state
// and successfully continues exacting retribution after restarting. At
// this point, Dave has broadcast the justice transaction, but it hasn't
// been confirmed yet; when Dave restarts, she should start waiting for
// been confirmed yet; when Dave restarts, he should start waiting for
// the justice transaction to confirm again.
if err := net.RestartNode(dave, nil); err != nil {
t.Fatalf("unable to restart Dave's node: %v", err)

View File

@@ -1017,145 +1017,6 @@ func testPrivateChannels(net *lntest.NetworkHarness, t *harnessTest) {
closeChannelAndAssert(t, net, carol, chanPointPrivate, false)
}
// testUpdateChannelPolicyForPrivateChannel tests when a private channel
// updates its channel edge policy, we will use the updated policy to send our
// payment.
// The topology is created as: Alice -> Bob -> Carol, where Alice -> Bob is
// public and Bob -> Carol is private. After an invoice is created by Carol,
// Bob will update the base fee via UpdateChannelPolicy, we will test that
// Alice will not fail the payment and send it using the updated channel
// policy.
func testUpdateChannelPolicyForPrivateChannel(net *lntest.NetworkHarness,
t *harnessTest) {
ctxb := context.Background()
defer ctxb.Done()
// We'll create the following topology first,
// Alice <--public:100k--> Bob <--private:100k--> Carol
const chanAmt = btcutil.Amount(100000)
// Open a channel with 100k satoshis between Alice and Bob.
chanPointAliceBob := openChannelAndAssert(
t, net, net.Alice, net.Bob,
lntest.OpenChannelParams{
Amt: chanAmt,
},
)
defer closeChannelAndAssert(t, net, net.Alice, chanPointAliceBob, false)
// Get Alice's funding point.
aliceChanTXID, err := lnrpc.GetChanPointFundingTxid(chanPointAliceBob)
require.NoError(t.t, err, "unable to get txid")
aliceFundPoint := wire.OutPoint{
Hash: *aliceChanTXID,
Index: chanPointAliceBob.OutputIndex,
}
// Create a new node Carol.
carol := net.NewNode(t.t, "Carol", nil)
defer shutdownAndAssert(net, t, carol)
// Connect Carol to Bob.
net.ConnectNodes(t.t, carol, net.Bob)
// Open a channel with 100k satoshis between Bob and Carol.
chanPointBobCarol := openChannelAndAssert(
t, net, net.Bob, carol,
lntest.OpenChannelParams{
Amt: chanAmt,
Private: true,
},
)
defer closeChannelAndAssert(t, net, net.Bob, chanPointBobCarol, false)
// Get Bob's funding point.
bobChanTXID, err := lnrpc.GetChanPointFundingTxid(chanPointBobCarol)
require.NoError(t.t, err, "unable to get txid")
bobFundPoint := wire.OutPoint{
Hash: *bobChanTXID,
Index: chanPointBobCarol.OutputIndex,
}
// We should have the following topology now,
// Alice <--public:100k--> Bob <--private:100k--> Carol
//
// Now we will create an invoice for Carol.
const paymentAmt = 20000
invoice := &lnrpc.Invoice{
Memo: "routing hints",
Value: paymentAmt,
Private: true,
}
ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
resp, err := carol.AddInvoice(ctxt, invoice)
require.NoError(t.t, err, "unable to create invoice for carol")
// Bob now updates the channel edge policy for the private channel.
const (
baseFeeMSat = 33000
)
timeLockDelta := uint32(chainreg.DefaultBitcoinTimeLockDelta)
updateFeeReq := &lnrpc.PolicyUpdateRequest{
BaseFeeMsat: baseFeeMSat,
TimeLockDelta: timeLockDelta,
Scope: &lnrpc.PolicyUpdateRequest_ChanPoint{
ChanPoint: chanPointBobCarol,
},
}
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
_, err = net.Bob.UpdateChannelPolicy(ctxt, updateFeeReq)
require.NoError(t.t, err, "unable to update chan policy")
// Alice pays the invoices. She will use the updated baseFeeMSat in the
// payment
payReqs := []string{resp.PaymentRequest}
require.NoError(t.t,
completePaymentRequests(
net.Alice, net.Alice.RouterClient, payReqs, true,
), "unable to send payment",
)
// Check that Alice did make the payment with two HTLCs, one failed and
// one succeeded.
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
paymentsResp, err := net.Alice.ListPayments(
ctxt, &lnrpc.ListPaymentsRequest{},
)
require.NoError(t.t, err, "failed to obtain payments for Alice")
require.Equal(t.t, 1, len(paymentsResp.Payments), "expected 1 payment")
htlcs := paymentsResp.Payments[0].Htlcs
require.Equal(t.t, 2, len(htlcs), "expected to have 2 HTLCs")
require.Equal(
t.t, lnrpc.HTLCAttempt_FAILED, htlcs[0].Status,
"the first HTLC attempt should fail",
)
require.Equal(
t.t, lnrpc.HTLCAttempt_SUCCEEDED, htlcs[1].Status,
"the second HTLC attempt should succeed",
)
// Carol should have received 20k satoshis from Bob.
assertAmountPaid(t, "Carol(remote) [<=private] Bob(local)",
carol, bobFundPoint, 0, paymentAmt)
// Bob should have sent 20k satoshis to Carol.
assertAmountPaid(t, "Bob(local) [private=>] Carol(remote)",
net.Bob, bobFundPoint, paymentAmt, 0)
// Calcuate the amount in satoshis.
amtExpected := int64(paymentAmt + baseFeeMSat/1000)
// Bob should have received 20k satoshis + fee from Alice.
assertAmountPaid(t, "Bob(remote) <= Alice(local)",
net.Bob, aliceFundPoint, 0, amtExpected)
// Alice should have sent 20k satoshis + fee to Bob.
assertAmountPaid(t, "Alice(local) => Bob(remote)",
net.Alice, aliceFundPoint, amtExpected, 0)
}
// testInvoiceRoutingHints tests that the routing hints for an invoice are
// created properly.
func testInvoiceRoutingHints(net *lntest.NetworkHarness, t *harnessTest) {
@@ -1953,15 +1814,9 @@ func testRouteFeeCutoff(net *lntest.NetworkHarness, t *harnessTest) {
}
// Wait for Alice to receive the channel update from Carol.
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
aliceSub := subscribeGraphNotifications(ctxt, t, net.Alice)
defer close(aliceSub.quit)
waitForChannelUpdate(
t, aliceSub,
[]expectedChanUpdate{
{carol.PubKeyStr, expectedPolicy, chanPointCarolDave},
},
assertChannelPolicyUpdate(
t.t, net.Alice, carol.PubKeyStr,
expectedPolicy, chanPointCarolDave, false,
)
// We'll also need the channel IDs for Bob's channels in order to

View File

@@ -232,8 +232,8 @@ func TestLightningNetworkDaemon(t *testing.T) {
testCase.name,
)
AddToNodeLog(t, lndHarness.Alice, logLine)
AddToNodeLog(t, lndHarness.Bob, logLine)
lndHarness.Alice.AddToLog(logLine)
lndHarness.Bob.AddToLog(logLine)
// Start every test with the default static fee estimate.
lndHarness.SetFeeEstimate(12500)

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"flag"
"fmt"
"io"
@@ -340,6 +341,21 @@ func (cfg NodeConfig) genArgs() []string {
return args
}
// policyUpdateMap defines a type to store channel policy updates. It has the
// format,
// {
// "chanPoint1": {
// "advertisingNode1": [
// policy1, policy2, ...
// ],
// "advertisingNode2": [
// policy1, policy2, ...
// ]
// },
// "chanPoint2": ...
// }
type policyUpdateMap map[string]map[string][]*lnrpc.RoutingPolicy
// HarnessNode represents an instance of lnd running within our test network
// harness. Each HarnessNode instance also fully embeds an RPC client in
// order to pragmatically drive the node.
@@ -369,11 +385,15 @@ type HarnessNode struct {
// edges seen for that channel within the network. When this number
// reaches 2, then it means that both edge advertisements has propagated
// through the network.
openChans map[wire.OutPoint]int
openClients map[wire.OutPoint][]chan struct{}
openChans map[wire.OutPoint]int
openChanWatchers map[wire.OutPoint][]chan struct{}
closedChans map[wire.OutPoint]struct{}
closeClients map[wire.OutPoint][]chan struct{}
closedChans map[wire.OutPoint]struct{}
closeChanWatchers map[wire.OutPoint][]chan struct{}
// policyUpdates stores a slice of seen polices by each advertising
// node and the outpoint.
policyUpdates policyUpdateMap
quit chan struct{}
wg sync.WaitGroup
@@ -446,10 +466,12 @@ func newNode(cfg NodeConfig) (*HarnessNode, error) {
NodeID: nodeNum,
chanWatchRequests: make(chan *chanWatchRequest),
openChans: make(map[wire.OutPoint]int),
openClients: make(map[wire.OutPoint][]chan struct{}),
openChanWatchers: make(map[wire.OutPoint][]chan struct{}),
closedChans: make(map[wire.OutPoint]struct{}),
closeClients: make(map[wire.OutPoint][]chan struct{}),
closedChans: make(map[wire.OutPoint]struct{}),
closeChanWatchers: make(map[wire.OutPoint][]chan struct{}),
policyUpdates: policyUpdateMap{},
}, nil
}
@@ -519,6 +541,62 @@ func NewMiner(baseLogDir, logFilename string, netParams *chaincfg.Params,
return miner, cleanUp, nil
}
// String gives the internal state of the node which is useful for debugging.
func (hn *HarnessNode) String() string {
type nodeCfg struct {
LogFilenamePrefix string
ExtraArgs []string
HasSeed bool
P2PPort int
RPCPort int
RESTPort int
ProfilePort int
AcceptKeySend bool
AcceptAMP bool
FeeURL string
}
nodeState := struct {
NodeID int
Name string
PubKey string
OpenChans map[string]int
ClosedChans map[string]struct{}
NodeCfg nodeCfg
}{
NodeID: hn.NodeID,
Name: hn.Cfg.Name,
PubKey: hn.PubKeyStr,
OpenChans: make(map[string]int),
ClosedChans: make(map[string]struct{}),
NodeCfg: nodeCfg{
LogFilenamePrefix: hn.Cfg.LogFilenamePrefix,
ExtraArgs: hn.Cfg.ExtraArgs,
HasSeed: hn.Cfg.HasSeed,
P2PPort: hn.Cfg.P2PPort,
RPCPort: hn.Cfg.RPCPort,
RESTPort: hn.Cfg.RESTPort,
AcceptKeySend: hn.Cfg.AcceptKeySend,
AcceptAMP: hn.Cfg.AcceptAMP,
FeeURL: hn.Cfg.FeeURL,
},
}
for outpoint, count := range hn.openChans {
nodeState.OpenChans[outpoint.String()] = count
}
for outpoint, count := range hn.closedChans {
nodeState.ClosedChans[outpoint.String()] = count
}
bytes, err := json.MarshalIndent(nodeState, "", "\t")
if err != nil {
return fmt.Sprintf("\n encode node state with err: %v", err)
}
return fmt.Sprintf("\nnode state: %s", bytes)
}
// DBPath returns the filepath to the channeldb database file for this node.
func (hn *HarnessNode) DBPath() string {
return hn.Cfg.DBPath()
@@ -998,50 +1076,12 @@ func (hn *HarnessNode) initLightningClient(conn *grpc.ClientConn) error {
return err
}
// Due to a race condition between the ChannelRouter starting and us
// making the subscription request, it's possible for our graph
// subscription to fail. To ensure we don't start listening for updates
// until then, we'll create a dummy subscription to ensure we can do so
// successfully before proceeding. We use a dummy subscription in order
// to not consume an update from the real one.
err := wait.NoError(func() error {
req := &lnrpc.GraphTopologySubscription{}
ctx, cancelFunc := context.WithCancel(context.Background())
topologyClient, err := hn.SubscribeChannelGraph(ctx, req)
if err != nil {
return err
}
// We'll wait to receive an error back within a one second
// timeout. This is needed since creating the client's stream is
// independent of the graph subscription being created. The
// stream is closed from the server's side upon an error.
errChan := make(chan error, 1)
go func() {
if _, err := topologyClient.Recv(); err != nil {
errChan <- err
}
}()
select {
case err = <-errChan:
case <-time.After(time.Second):
}
cancelFunc()
return err
}, DefaultTimeout)
if err != nil {
return err
}
// Launch the watcher that will hook into graph related topology change
// from the PoV of this node.
hn.wg.Add(1)
subscribed := make(chan error)
go hn.lightningNetworkWatcher(subscribed)
go hn.lightningNetworkWatcher()
return <-subscribed
return nil
}
// FetchNodeInfo queries an unlocked node to retrieve its public key.
@@ -1066,15 +1106,16 @@ func (hn *HarnessNode) FetchNodeInfo() error {
// AddToLog adds a line of choice to the node's logfile. This is useful
// to interleave test output with output from the node.
func (hn *HarnessNode) AddToLog(line string) error {
func (hn *HarnessNode) AddToLog(format string, a ...interface{}) {
// If this node was not set up with a log file, just return early.
if hn.logFile == nil {
return nil
return
}
if _, err := hn.logFile.WriteString(line); err != nil {
return err
desc := fmt.Sprintf("itest: %s\n", fmt.Sprintf(format, a...))
if _, err := hn.logFile.WriteString(desc); err != nil {
hn.PrintErr("write to log err: %v", err)
}
return nil
}
// writePidFile writes the process ID of the running lnd process to a .pid file.
@@ -1276,15 +1317,35 @@ func (hn *HarnessNode) kill() error {
return hn.cmd.Process.Kill()
}
type chanWatchType uint8
const (
// watchOpenChannel specifies that this is a request to watch an open
// channel event.
watchOpenChannel chanWatchType = iota
// watchCloseChannel specifies that this is a request to watch a close
// channel event.
watchCloseChannel
// watchPolicyUpdate specifies that this is a request to watch a policy
// update event.
watchPolicyUpdate
)
// closeChanWatchRequest is a request to the lightningNetworkWatcher to be
// notified once it's detected within the test Lightning Network, that a
// channel has either been added or closed.
type chanWatchRequest struct {
chanPoint wire.OutPoint
chanOpen bool
chanWatchType chanWatchType
eventChan chan struct{}
advertisingNode string
policy *lnrpc.RoutingPolicy
includeUnannounced bool
}
// getChanPointFundingTxid returns the given channel point's funding txid in
@@ -1336,40 +1397,19 @@ func checkChanPointInGraph(ctx context.Context,
// closed or opened within the network. In order to dispatch these
// notifications, the GraphTopologySubscription client exposed as part of the
// gRPC interface is used.
func (hn *HarnessNode) lightningNetworkWatcher(subscribed chan error) {
func (hn *HarnessNode) lightningNetworkWatcher() {
defer hn.wg.Done()
graphUpdates := make(chan *lnrpc.GraphTopologyUpdate)
// Start a goroutine to receive graph updates.
hn.wg.Add(1)
go func() {
defer hn.wg.Done()
req := &lnrpc.GraphTopologySubscription{}
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
topologyClient, err := hn.SubscribeChannelGraph(ctx, req)
err := hn.receiveTopologyClientStream(graphUpdates)
if err != nil {
msg := fmt.Sprintf("%s(%d): unable to create topology "+
"client: %v (%s)", hn.Name(), hn.NodeID, err,
time.Now().String())
subscribed <- fmt.Errorf(msg)
return
}
close(subscribed)
for {
update, err := topologyClient.Recv()
if err == io.EOF {
return
} else if err != nil {
return
}
select {
case graphUpdates <- update:
case <-hn.quit:
return
}
hn.PrintErr("receive topology client stream "+
"got err:%v", err)
}
}()
@@ -1380,108 +1420,27 @@ func (hn *HarnessNode) lightningNetworkWatcher(subscribed chan error) {
// the current set of registered clients to see if we can
// dispatch any requests.
case graphUpdate := <-graphUpdates:
// For each new channel, we'll increment the number of
// edges seen by one.
for _, newChan := range graphUpdate.ChannelUpdates {
txidHash, _ := getChanPointFundingTxid(newChan.ChanPoint)
txid, _ := chainhash.NewHash(txidHash)
op := wire.OutPoint{
Hash: *txid,
Index: newChan.ChanPoint.OutputIndex,
}
hn.openChans[op]++
// For this new channel, if the number of edges
// seen is less than two, then the channel
// hasn't been fully announced yet.
if numEdges := hn.openChans[op]; numEdges < 2 {
continue
}
// Otherwise, we'll notify all the registered
// clients and remove the dispatched clients.
for _, eventChan := range hn.openClients[op] {
close(eventChan)
}
delete(hn.openClients, op)
}
// For each channel closed, we'll mark that we've
// detected a channel closure while lnd was pruning the
// channel graph.
for _, closedChan := range graphUpdate.ClosedChans {
txidHash, _ := getChanPointFundingTxid(closedChan.ChanPoint)
txid, _ := chainhash.NewHash(txidHash)
op := wire.OutPoint{
Hash: *txid,
Index: closedChan.ChanPoint.OutputIndex,
}
hn.closedChans[op] = struct{}{}
// As the channel has been closed, we'll notify
// all register clients.
for _, eventChan := range hn.closeClients[op] {
close(eventChan)
}
delete(hn.closeClients, op)
}
hn.handleChannelEdgeUpdates(graphUpdate.ChannelUpdates)
hn.handleClosedChannelUpdate(graphUpdate.ClosedChans)
// TODO(yy): handle node updates too
// A new watch request, has just arrived. We'll either be able
// to dispatch immediately, or need to add the client for
// processing later.
case watchRequest := <-hn.chanWatchRequests:
targetChan := watchRequest.chanPoint
switch watchRequest.chanWatchType {
case watchOpenChannel:
// TODO(roasbeef): add update type also, checks
// for multiple of 2
hn.handleOpenChannelWatchRequest(watchRequest)
// TODO(roasbeef): add update type also, checks for
// multiple of 2
if watchRequest.chanOpen {
// If this is an open request, then it can be
// dispatched if the number of edges seen for
// the channel is at least two.
if numEdges := hn.openChans[targetChan]; numEdges >= 2 {
close(watchRequest.eventChan)
continue
}
case watchCloseChannel:
hn.handleCloseChannelWatchRequest(watchRequest)
// Before we add the channel to our set of open
// clients, we'll check to see if the channel
// is already in the channel graph of the
// target node. This lets us handle the case
// where a node has already seen a channel
// before a notification has been requested,
// causing us to miss it.
chanFound := checkChanPointInGraph(
context.Background(), hn, targetChan,
)
if chanFound {
close(watchRequest.eventChan)
continue
}
// Otherwise, we'll add this to the list of
// watch open clients for this out point.
hn.openClients[targetChan] = append(
hn.openClients[targetChan],
watchRequest.eventChan,
)
continue
case watchPolicyUpdate:
hn.handlePolicyUpdateWatchRequest(watchRequest)
}
// If this is a close request, then it can be
// immediately dispatched if we've already seen a
// channel closure for this channel.
if _, ok := hn.closedChans[targetChan]; ok {
close(watchRequest.eventChan)
continue
}
// Otherwise, we'll add this to the list of close watch
// clients for this out point.
hn.closeClients[targetChan] = append(
hn.closeClients[targetChan],
watchRequest.eventChan,
)
case <-hn.quit:
return
}
@@ -1493,33 +1452,28 @@ func (hn *HarnessNode) lightningNetworkWatcher(subscribed chan error) {
// considered "fully advertised" once both of its directional edges has been
// advertised within the test Lightning Network.
func (hn *HarnessNode) WaitForNetworkChannelOpen(ctx context.Context,
op *lnrpc.ChannelPoint) error {
chanPoint *lnrpc.ChannelPoint) error {
eventChan := make(chan struct{})
txidHash, err := getChanPointFundingTxid(op)
op, err := MakeOutpoint(chanPoint)
if err != nil {
return err
}
txid, err := chainhash.NewHash(txidHash)
if err != nil {
return err
return fmt.Errorf("failed to create outpoint for %v "+
"got err: %v", chanPoint, err)
}
hn.chanWatchRequests <- &chanWatchRequest{
chanPoint: wire.OutPoint{
Hash: *txid,
Index: op.OutputIndex,
},
eventChan: eventChan,
chanOpen: true,
chanPoint: op,
eventChan: eventChan,
chanWatchType: watchOpenChannel,
}
select {
case <-eventChan:
return nil
case <-ctx.Done():
return fmt.Errorf("channel not opened before timeout")
return fmt.Errorf("channel:%s not opened before timeout: %s",
op, hn)
}
}
@@ -1528,33 +1482,69 @@ func (hn *HarnessNode) WaitForNetworkChannelOpen(ctx context.Context,
// closed once a transaction spending the funding outpoint is seen within a
// confirmed block.
func (hn *HarnessNode) WaitForNetworkChannelClose(ctx context.Context,
op *lnrpc.ChannelPoint) error {
chanPoint *lnrpc.ChannelPoint) error {
eventChan := make(chan struct{})
txidHash, err := getChanPointFundingTxid(op)
op, err := MakeOutpoint(chanPoint)
if err != nil {
return err
}
txid, err := chainhash.NewHash(txidHash)
if err != nil {
return err
return fmt.Errorf("failed to create outpoint for %v "+
"got err: %v", chanPoint, err)
}
hn.chanWatchRequests <- &chanWatchRequest{
chanPoint: wire.OutPoint{
Hash: *txid,
Index: op.OutputIndex,
},
eventChan: eventChan,
chanOpen: false,
chanPoint: op,
eventChan: eventChan,
chanWatchType: watchCloseChannel,
}
select {
case <-eventChan:
return nil
case <-ctx.Done():
return fmt.Errorf("channel not closed before timeout")
return fmt.Errorf("channel:%s not closed before timeout: "+
"%s", op, hn)
}
}
// WaitForChannelPolicyUpdate will block until a channel policy with the target
// outpoint and advertisingNode is seen within the network.
func (hn *HarnessNode) WaitForChannelPolicyUpdate(ctx context.Context,
advertisingNode string, policy *lnrpc.RoutingPolicy,
chanPoint *lnrpc.ChannelPoint, includeUnannounced bool) error {
eventChan := make(chan struct{})
op, err := MakeOutpoint(chanPoint)
if err != nil {
return fmt.Errorf("failed to create outpoint for %v"+
"got err: %v", chanPoint, err)
}
ticker := time.NewTicker(wait.PollInterval)
defer ticker.Stop()
for {
select {
// Send a watch request every second.
case <-ticker.C:
hn.chanWatchRequests <- &chanWatchRequest{
chanPoint: op,
eventChan: eventChan,
chanWatchType: watchPolicyUpdate,
policy: policy,
advertisingNode: advertisingNode,
includeUnannounced: includeUnannounced,
}
case <-eventChan:
return nil
case <-ctx.Done():
return fmt.Errorf("channel:%s policy not updated "+
"before timeout: [%s:%v] %s", op,
advertisingNode, policy, hn.String())
}
}
}
@@ -1616,3 +1606,350 @@ func (hn *HarnessNode) WaitForBalance(expectedBalance btcutil.Amount, confirmed
return nil
}
// PrintErr prints an error to the console.
func (hn *HarnessNode) PrintErr(format string, a ...interface{}) {
fmt.Printf("itest error from [node:%s]: %s\n",
hn.Cfg.Name, fmt.Sprintf(format, a...))
}
// MakeOutpoint returns the outpoint of the channel's funding transaction.
func MakeOutpoint(chanPoint *lnrpc.ChannelPoint) (wire.OutPoint, error) {
fundingTxID, err := lnrpc.GetChanPointFundingTxid(chanPoint)
if err != nil {
return wire.OutPoint{}, err
}
return wire.OutPoint{
Hash: *fundingTxID,
Index: chanPoint.OutputIndex,
}, nil
}
// handleChannelEdgeUpdates takes a series of channel edge updates, extracts
// the outpoints, and saves them to harness node's internal state.
func (hn *HarnessNode) handleChannelEdgeUpdates(
updates []*lnrpc.ChannelEdgeUpdate) {
// For each new channel, we'll increment the number of
// edges seen by one.
for _, newChan := range updates {
op, err := MakeOutpoint(newChan.ChanPoint)
if err != nil {
hn.PrintErr("failed to create outpoint for %v "+
"got err: %v", newChan.ChanPoint, err)
return
}
hn.openChans[op]++
// For this new channel, if the number of edges seen is less
// than two, then the channel hasn't been fully announced yet.
if numEdges := hn.openChans[op]; numEdges < 2 {
return
}
// Otherwise, we'll notify all the registered watchers and
// remove the dispatched watchers.
for _, eventChan := range hn.openChanWatchers[op] {
close(eventChan)
}
delete(hn.openChanWatchers, op)
// Check whether there's a routing policy update. If so, save
// it to the node state.
if newChan.RoutingPolicy == nil {
continue
}
// Append the policy to the slice.
node := newChan.AdvertisingNode
policies := hn.policyUpdates[op.String()]
// If the map[op] is nil, we need to initialize the map first.
if policies == nil {
policies = make(map[string][]*lnrpc.RoutingPolicy)
}
policies[node] = append(
policies[node], newChan.RoutingPolicy,
)
hn.policyUpdates[op.String()] = policies
}
}
// handleOpenChannelWatchRequest processes a watch open channel request by
// checking the number of the edges seen for a given channel point. If the
// number is no less than 2 then the channel is considered open. Otherwise, we
// will attempt to find it in its channel graph. If neither can be found, the
// request is added to a watch request list than will be handled by
// handleChannelEdgeUpdates.
func (hn *HarnessNode) handleOpenChannelWatchRequest(req *chanWatchRequest) {
targetChan := req.chanPoint
// If this is an open request, then it can be dispatched if the number
// of edges seen for the channel is at least two.
if numEdges := hn.openChans[targetChan]; numEdges >= 2 {
close(req.eventChan)
return
}
// Before we add the channel to our set of open clients, we'll check to
// see if the channel is already in the channel graph of the target
// node. This lets us handle the case where a node has already seen a
// channel before a notification has been requested, causing us to miss
// it.
chanFound := checkChanPointInGraph(context.Background(), hn, targetChan)
if chanFound {
close(req.eventChan)
return
}
// Otherwise, we'll add this to the list of open channel watchers for
// this out point.
hn.openChanWatchers[targetChan] = append(
hn.openChanWatchers[targetChan],
req.eventChan,
)
}
// handleClosedChannelUpdate takes a series of closed channel updates, extracts
// the outpoints, saves them to harness node's internal state, and notifies all
// registered clients.
func (hn *HarnessNode) handleClosedChannelUpdate(
updates []*lnrpc.ClosedChannelUpdate) {
// For each channel closed, we'll mark that we've detected a channel
// closure while lnd was pruning the channel graph.
for _, closedChan := range updates {
op, err := MakeOutpoint(closedChan.ChanPoint)
if err != nil {
hn.PrintErr("failed to create outpoint for %v "+
"got err: %v", closedChan.ChanPoint, err)
return
}
hn.closedChans[op] = struct{}{}
// As the channel has been closed, we'll notify all register
// watchers.
for _, eventChan := range hn.closeChanWatchers[op] {
close(eventChan)
}
delete(hn.closeChanWatchers, op)
}
}
// handleCloseChannelWatchRequest processes a watch close channel request by
// checking whether the given channel point can be found in the node's internal
// state. If not, the request is added to a watch request list than will be
// handled by handleCloseChannelWatchRequest.
func (hn *HarnessNode) handleCloseChannelWatchRequest(req *chanWatchRequest) {
targetChan := req.chanPoint
// If this is a close request, then it can be immediately dispatched if
// we've already seen a channel closure for this channel.
if _, ok := hn.closedChans[targetChan]; ok {
close(req.eventChan)
return
}
// Otherwise, we'll add this to the list of close channel watchers for
// this out point.
hn.closeChanWatchers[targetChan] = append(
hn.closeChanWatchers[targetChan],
req.eventChan,
)
}
type topologyClient lnrpc.Lightning_SubscribeChannelGraphClient
// newTopologyClient creates a topology client.
func (hn *HarnessNode) newTopologyClient(
ctx context.Context) (topologyClient, error) {
req := &lnrpc.GraphTopologySubscription{}
client, err := hn.SubscribeChannelGraph(ctx, req)
if err != nil {
return nil, fmt.Errorf("%s(%d): unable to create topology "+
"client: %v (%s)", hn.Name(), hn.NodeID, err,
time.Now().String())
}
return client, nil
}
// receiveTopologyClientStream initializes a topologyClient to subscribe
// topology update events. Due to a race condition between the ChannelRouter
// starting and us making the subscription request, it's possible for our graph
// subscription to fail. In that case, we will retry the subscription until it
// succeeds or fail after 10 seconds.
//
// NOTE: must be run as a goroutine.
func (hn *HarnessNode) receiveTopologyClientStream(
receiver chan *lnrpc.GraphTopologyUpdate) error {
ctxb := context.Background()
// Create a topology client to receive graph updates.
client, err := hn.newTopologyClient(ctxb)
if err != nil {
return fmt.Errorf("create topologyClient failed: %v", err)
}
// We use the context to time out when retrying graph subscription.
ctxt, cancel := context.WithTimeout(ctxb, DefaultTimeout)
defer cancel()
for {
update, err := client.Recv()
switch {
case err == nil:
// Good case. We will send the update to the receiver.
case strings.Contains(err.Error(), "router not started"):
// If the router hasn't been started, we will retry
// every 200 ms until it has been started or fail
// after the ctxt is timed out.
select {
case <-ctxt.Done():
return fmt.Errorf("graph subscription: " +
"router not started before timeout")
case <-time.After(wait.PollInterval):
case <-hn.quit:
return nil
}
// Re-create the topology client.
client, err = hn.newTopologyClient(ctxb)
if err != nil {
return fmt.Errorf("create topologyClient "+
"failed: %v", err)
}
continue
case strings.Contains(err.Error(), "EOF"):
// End of subscription stream. Do nothing and quit.
return nil
default:
// An expected error is returned, return and leave it
// to be handled by the caller.
return fmt.Errorf("graph subscription err: %v", err)
}
// Send the update or quit.
select {
case receiver <- update:
case <-hn.quit:
return nil
}
}
}
// CheckChannelPolicy checks that the policy matches the expected one.
func CheckChannelPolicy(policy, expectedPolicy *lnrpc.RoutingPolicy) error {
if policy.FeeBaseMsat != expectedPolicy.FeeBaseMsat {
return fmt.Errorf("expected base fee %v, got %v",
expectedPolicy.FeeBaseMsat, policy.FeeBaseMsat)
}
if policy.FeeRateMilliMsat != expectedPolicy.FeeRateMilliMsat {
return fmt.Errorf("expected fee rate %v, got %v",
expectedPolicy.FeeRateMilliMsat,
policy.FeeRateMilliMsat)
}
if policy.TimeLockDelta != expectedPolicy.TimeLockDelta {
return fmt.Errorf("expected time lock delta %v, got %v",
expectedPolicy.TimeLockDelta,
policy.TimeLockDelta)
}
if policy.MinHtlc != expectedPolicy.MinHtlc {
return fmt.Errorf("expected min htlc %v, got %v",
expectedPolicy.MinHtlc, policy.MinHtlc)
}
if policy.MaxHtlcMsat != expectedPolicy.MaxHtlcMsat {
return fmt.Errorf("expected max htlc %v, got %v",
expectedPolicy.MaxHtlcMsat, policy.MaxHtlcMsat)
}
if policy.Disabled != expectedPolicy.Disabled {
return errors.New("edge should be disabled but isn't")
}
return nil
}
// handlePolicyUpdateWatchRequest checks that if the expected policy can be
// found either in the node's interval state or describe graph response. If
// found, it will signal the request by closing the event channel. Otherwise it
// does nothing but returns nil.
func (hn *HarnessNode) handlePolicyUpdateWatchRequest(req *chanWatchRequest) {
op := req.chanPoint
// Get a list of known policies for this chanPoint+advertisingNode
// combination. Start searching in the node state first.
policies, ok := hn.policyUpdates[op.String()][req.advertisingNode]
if !ok {
// If it cannot be found in the node state, try searching it
// from the node's DescribeGraph.
policyMap := hn.getChannelPolicies(req.includeUnannounced)
policies, ok = policyMap[op.String()][req.advertisingNode]
if !ok {
return
}
}
// Check if there's a matched policy.
for _, policy := range policies {
if CheckChannelPolicy(policy, req.policy) == nil {
close(req.eventChan)
return
}
}
}
// getChannelPolicies queries the channel graph and formats the policies into
// the format defined in type policyUpdateMap.
func (hn *HarnessNode) getChannelPolicies(include bool) policyUpdateMap {
ctxt, cancel := context.WithTimeout(
context.Background(), DefaultTimeout,
)
defer cancel()
graph, err := hn.DescribeGraph(ctxt, &lnrpc.ChannelGraphRequest{
IncludeUnannounced: include,
})
if err != nil {
hn.PrintErr("DescribeGraph got err: %v", err)
return nil
}
policyUpdates := policyUpdateMap{}
for _, e := range graph.Edges {
policies := policyUpdates[e.ChanPoint]
// If the map[op] is nil, we need to initialize the map first.
if policies == nil {
policies = make(map[string][]*lnrpc.RoutingPolicy)
}
if e.Node1Policy != nil {
policies[e.Node1Pub] = append(
policies[e.Node1Pub], e.Node1Policy,
)
}
if e.Node2Policy != nil {
policies[e.Node2Pub] = append(
policies[e.Node2Pub], e.Node2Policy,
)
}
policyUpdates[e.ChanPoint] = policies
}
return policyUpdates
}

View File

@@ -5,17 +5,18 @@ import (
"time"
)
// PollInterval is a constant specifying a 200 ms interval.
const PollInterval = 200 * time.Millisecond
// Predicate is a helper test function that will wait for a timeout period of
// time until the passed predicate returns true. This function is helpful as
// timing doesn't always line up well when running integration tests with
// several running lnd nodes. This function gives callers a way to assert that
// some property is upheld within a particular time frame.
func Predicate(pred func() bool, timeout time.Duration) error {
const pollInterval = 200 * time.Millisecond
exitTimer := time.After(timeout)
for {
<-time.After(pollInterval)
<-time.After(PollInterval)
select {
case <-exitTimer:

View File

@@ -125,7 +125,7 @@ func (r *ChannelRouter) notifyTopologyChange(topologyDiff *TopologyChange) {
return
}
log.Tracef("Sending topology notification to %v clients %v",
log.Debugf("Sending topology notification to %v clients %v",
numClients,
newLogClosure(func() string {
return spew.Sdump(topologyDiff)

112
server.go
View File

@@ -1497,18 +1497,6 @@ func (s *server) Start() error {
cleanup := cleaner{}
s.start.Do(func() {
if s.torController != nil {
if err := s.createNewHiddenService(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.torController.Stop)
}
if s.natTraversal != nil {
s.wg.Add(1)
go s.watchExternalIP()
}
if s.hostAnn != nil {
if err := s.hostAnn.Start(); err != nil {
@@ -1574,12 +1562,6 @@ func (s *server) Start() error {
}
cleanup = cleanup.add(s.htlcNotifier.Stop)
if err := s.sphinx.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.sphinx.Stop)
if s.towerClient != nil {
if err := s.towerClient.Start(); err != nil {
startErr = err
@@ -1595,12 +1577,6 @@ func (s *server) Start() error {
cleanup = cleanup.add(s.anchorTowerClient.Stop)
}
if err := s.htlcSwitch.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.htlcSwitch.Stop)
if err := s.sweeper.Start(); err != nil {
startErr = err
return
@@ -1613,18 +1589,24 @@ func (s *server) Start() error {
}
cleanup = cleanup.add(s.utxoNursery.Stop)
if err := s.chainArb.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.chainArb.Stop)
if err := s.breachArbiter.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.breachArbiter.Stop)
if err := s.fundingMgr.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.fundingMgr.Stop)
if err := s.chainArb.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.chainArb.Stop)
if err := s.authGossiper.Start(); err != nil {
startErr = err
return
@@ -1637,18 +1619,24 @@ func (s *server) Start() error {
}
cleanup = cleanup.add(s.chanRouter.Stop)
if err := s.fundingMgr.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.fundingMgr.Stop)
if err := s.invoices.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.invoices.Stop)
if err := s.sphinx.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.sphinx.Stop)
if err := s.htlcSwitch.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.htlcSwitch.Stop)
if err := s.chanStatusMgr.Start(); err != nil {
startErr = err
return
@@ -1708,6 +1696,20 @@ func (s *server) Start() error {
}
cleanup = cleanup.add(s.chanSubSwapper.Stop)
if s.torController != nil {
if err := s.createNewHiddenService(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.torController.Stop)
}
if s.natTraversal != nil {
s.wg.Add(1)
go s.watchExternalIP()
}
// Start connmgr last to prevent connections before init.
s.connMgr.Start()
cleanup = cleanup.add(func() error {
s.connMgr.Stop()
@@ -1824,32 +1826,38 @@ func (s *server) Stop() error {
close(s.quit)
// Shutdown connMgr first to prevent conns during shutdown.
s.connMgr.Stop()
// Shutdown the wallet, funding manager, and the rpc server.
s.chanStatusMgr.Stop()
if err := s.cc.ChainNotifier.Stop(); err != nil {
srvrLog.Warnf("Unable to stop ChainNotifier: %v", err)
}
if err := s.chanRouter.Stop(); err != nil {
srvrLog.Warnf("failed to stop chanRouter: %v", err)
}
if err := s.htlcSwitch.Stop(); err != nil {
srvrLog.Warnf("failed to stop htlcSwitch: %v", err)
}
if err := s.sphinx.Stop(); err != nil {
srvrLog.Warnf("failed to stop sphinx: %v", err)
}
if err := s.utxoNursery.Stop(); err != nil {
srvrLog.Warnf("failed to stop utxoNursery: %v", err)
if err := s.invoices.Stop(); err != nil {
srvrLog.Warnf("failed to stop invoices: %v", err)
}
if err := s.chanRouter.Stop(); err != nil {
srvrLog.Warnf("failed to stop chanRouter: %v", err)
}
if err := s.chainArb.Stop(); err != nil {
srvrLog.Warnf("failed to stop chainArb: %v", err)
}
if err := s.fundingMgr.Stop(); err != nil {
srvrLog.Warnf("failed to stop fundingMgr: %v", err)
}
if err := s.breachArbiter.Stop(); err != nil {
srvrLog.Warnf("failed to stop breachArbiter: %v", err)
}
if err := s.utxoNursery.Stop(); err != nil {
srvrLog.Warnf("failed to stop utxoNursery: %v", err)
}
if err := s.authGossiper.Stop(); err != nil {
srvrLog.Warnf("failed to stop authGossiper: %v", err)
}
if err := s.chainArb.Stop(); err != nil {
srvrLog.Warnf("failed to stop chainArb: %v", err)
}
if err := s.sweeper.Stop(); err != nil {
srvrLog.Warnf("failed to stop sweeper: %v", err)
}
@@ -1862,16 +1870,12 @@ func (s *server) Stop() error {
if err := s.htlcNotifier.Stop(); err != nil {
srvrLog.Warnf("failed to stop htlcNotifier: %v", err)
}
s.connMgr.Stop()
if err := s.invoices.Stop(); err != nil {
srvrLog.Warnf("failed to stop invoices: %v", err)
}
if err := s.fundingMgr.Stop(); err != nil {
srvrLog.Warnf("failed to stop fundingMgr: %v", err)
}
if err := s.chanSubSwapper.Stop(); err != nil {
srvrLog.Warnf("failed to stop chanSubSwapper: %v", err)
}
if err := s.cc.ChainNotifier.Stop(); err != nil {
srvrLog.Warnf("Unable to stop ChainNotifier: %v", err)
}
s.chanEventStore.Stop()
s.missionControl.StopStoreTicker()