From 4a1e06b2041a8ec365c3c4e020b68eb4509438c0 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 12 Feb 2019 12:40:25 +0100 Subject: [PATCH 1/3] autopilot/agent: distribute available funds among channels This commit fixes a regression in how we allocate funds to attempted channels. We would earlier stay within the channel size limits, but we wouldn't account for funds consumed by other channels being opened in parallel. We fix this by introducing a loop which greadily tries to distribute the funds among the channels to open, and reduces the number of channels to open in case not enough funds are available to satisfy the channel size limits. --- autopilot/agent.go | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/autopilot/agent.go b/autopilot/agent.go index 2ff5cf0b..2f257173 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -567,10 +567,15 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32, // As channel size we'll use the maximum channel size available. chanSize := a.cfg.Constraints.MaxChanSize() - if availableFunds-chanSize < 0 { + if availableFunds < chanSize { chanSize = availableFunds } + if chanSize < a.cfg.Constraints.MinChanSize() { + return fmt.Errorf("not enough funds available to open a " + + "single channel") + } + // Use the heuristic to calculate a score for each node in the // graph. scores, err := a.cfg.Heuristic.NodeScores( @@ -601,6 +606,17 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32, continue } + // Track the available funds we have left. + if availableFunds < chanSize { + chanSize = availableFunds + } + availableFunds -= chanSize + + // If we run out of funds, we can break early. + if chanSize < a.cfg.Constraints.MinChanSize() { + break + } + chanCandidates[nID] = &AttachmentDirective{ NodeID: nID, ChanAmt: chanSize, @@ -725,8 +741,7 @@ func (a *Agent) executeDirective(directive AttachmentDirective) { // fewer slots were available, and other successful attempts finished // first. a.pendingMtx.Lock() - if uint16(len(a.pendingOpens)) >= - a.cfg.Constraints.MaxPendingOpens() { + if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens() { // Since we've reached our max number of pending opens, we'll // disconnect this peer and exit. However, if we were // previously connected to them, then we'll make sure to @@ -799,6 +814,9 @@ func (a *Agent) executeDirective(directive AttachmentDirective) { // Since the channel open was successful and is currently pending, // we'll trigger the autopilot agent to query for more peers. + // TODO(halseth): this triggers a new loop before all the new channels + // are added to the pending channels map. Should add before executing + // directive in goroutine? a.OnChannelPendingOpen() } From b71f4632a62a05c75912565d10d91e68761b3b40 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 5 Mar 2019 13:58:06 +0100 Subject: [PATCH 2/3] autopilot/agent_test: define testCtx, setup, other helpers This commit defines a set of helper methods that are used by many of the existing tests. --- autopilot/agent_test.go | 1155 ++++++++++----------------------------- 1 file changed, 275 insertions(+), 880 deletions(-) diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index 2c0b2280..ff7a8be1 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -150,10 +150,20 @@ func (m *mockChanController) SpliceOut(chanPoint *wire.OutPoint, var _ ChannelController = (*mockChanController)(nil) -// TestAgentChannelOpenSignal tests that upon receipt of a chanOpenUpdate, then -// agent modifies its local state accordingly, and reconsults the heuristic. -func TestAgentChannelOpenSignal(t *testing.T) { - t.Parallel() +type testContext struct { + constraints *mockConstraints + heuristic *mockHeuristic + chanController ChannelController + graph testGraph + agent *Agent + walletBalance btcutil.Amount + + quit chan struct{} + sync.Mutex +} + +func setup(t *testing.T, initialChans []Channel) (*testContext, func()) { + t.Helper() // First, we'll create all the dependencies that we'll need in order to // create the autopilot agent. @@ -164,11 +174,13 @@ func TestAgentChannelOpenSignal(t *testing.T) { quit := make(chan struct{}) heuristic := &mockHeuristic{ + nodeScoresArgs: make(chan directiveArg), nodeScoresResps: make(chan map[NodeID]*NodeScore), quit: quit, } constraints := &mockConstraints{ moreChansResps: make(chan moreChansResp), + moreChanArgs: make(chan moreChanArg), quit: quit, } @@ -177,6 +189,19 @@ func TestAgentChannelOpenSignal(t *testing.T) { } memGraph, _, _ := newMemChanGraph() + // We'll keep track of the funds available to the agent, to make sure + // it correctly uses this value when querying the ChannelBudget. + var availableFunds btcutil.Amount = 10 * btcutil.SatoshiPerBitcoin + + ctx := &testContext{ + constraints: constraints, + heuristic: heuristic, + chanController: chanController, + graph: memGraph, + walletBalance: availableFunds, + quit: quit, + } + // With the dependencies we created, we can now create the initial // agent itself. testCfg := Config{ @@ -184,7 +209,9 @@ func TestAgentChannelOpenSignal(t *testing.T) { Heuristic: heuristic, ChanController: chanController, WalletBalance: func() (btcutil.Amount, error) { - return 0, nil + ctx.Lock() + defer ctx.Unlock() + return ctx.walletBalance, nil }, ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { return false, nil @@ -195,35 +222,80 @@ func TestAgentChannelOpenSignal(t *testing.T) { Graph: memGraph, Constraints: constraints, } - initialChans := []Channel{} + agent, err := New(testCfg, initialChans) if err != nil { t.Fatalf("unable to create agent: %v", err) } + ctx.agent = agent - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll star the + // With the autopilot agent and all its dependencies we'll start the // primary controller goroutine. if err := agent.Start(); err != nil { t.Fatalf("unable to start agent: %v", err) } - defer agent.Stop() - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) + cleanup := func() { + // We must close quit before agent.Stop(), to make sure + // ChannelBudget won't block preventing the agent from exiting. + close(quit) + agent.Stop() + } + + return ctx, cleanup +} + +// respondMoreChans consumes the moreChanArgs element and responds to the agent +// with the given moreChansResp. +func respondMoreChans(t *testing.T, testCtx *testContext, resp moreChansResp) { + t.Helper() + + // The agent should now query the heuristic. + select { + case <-testCtx.constraints.moreChanArgs: + case <-time.After(time.Second * 3): + t.Fatalf("heuristic wasn't queried in time") + } + + // We'll send the response. + select { + case testCtx.constraints.moreChansResps <- resp: + case <-time.After(time.Second * 10): + t.Fatalf("response wasn't sent in time") + } +} + +// respondMoreChans consumes the nodeScoresArgs element and responds to the +// agent with the given node scores. +func respondNodeScores(t *testing.T, testCtx *testContext, + resp map[NodeID]*NodeScore) { + t.Helper() + + // Send over an empty list of attachment directives, which should cause + // the agent to return to waiting on a new signal. + select { + case <-testCtx.heuristic.nodeScoresArgs: + case <-time.After(time.Second * 3): + t.Fatalf("node scores weren't queried in time") + } + select { + case testCtx.heuristic.nodeScoresResps <- resp: + case <-time.After(time.Second * 10): + t.Fatalf("node scores were not sent in time") + } +} + +// TestAgentChannelOpenSignal tests that upon receipt of a chanOpenUpdate, then +// agent modifies its local state accordingly, and reconsults the heuristic. +func TestAgentChannelOpenSignal(t *testing.T) { + t.Parallel() + + testCtx, cleanup := setup(t, nil) + defer cleanup() // We'll send an initial "no" response to advance the agent past its // initial check. - select { - case constraints.moreChansResps <- moreChansResp{0, 0}: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, moreChansResp{0, 0}) // Next we'll signal a new channel being opened by the backing LN node, // with a capacity of 1 BTC. @@ -231,21 +303,17 @@ func TestAgentChannelOpenSignal(t *testing.T) { ChanID: randChanID(), Capacity: btcutil.SatoshiPerBitcoin, } - agent.OnChannelOpen(newChan) + testCtx.agent.OnChannelOpen(newChan) // The agent should now query the heuristic in order to determine its // next action as it local state has now been modified. - select { - case constraints.moreChansResps <- moreChansResp{0, 0}: - // At this point, the local state of the agent should - // have also been updated to reflect that the LN node - // now has an additional channel with one BTC. - if _, ok := agent.chanState[newChan.ChanID]; !ok { - t.Fatalf("internal channel state wasn't updated") - } + respondMoreChans(t, testCtx, moreChansResp{0, 0}) - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") + // At this point, the local state of the agent should + // have also been updated to reflect that the LN node + // now has an additional channel with one BTC. + if _, ok := testCtx.agent.chanState[newChan.ChanID]; !ok { + t.Fatalf("internal channel state wasn't updated") } // There shouldn't be a call to the Select method as we've returned @@ -254,7 +322,7 @@ func TestAgentChannelOpenSignal(t *testing.T) { // If this send success, then Select was erroneously called and the // test should be failed. - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: + case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: t.Fatalf("Select was called but shouldn't have been") // This is the correct path as Select should've be called. @@ -290,79 +358,19 @@ var _ ChannelController = (*mockFailingChanController)(nil) func TestAgentChannelFailureSignal(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } + testCtx, cleanup := setup(t, nil) + defer cleanup() - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } + testCtx.chanController = &mockFailingChanController{} - chanController := &mockFailingChanController{} - memGraph, _, _ := newMemChanGraph() - node, err := memGraph.addRandNode() + node, err := testCtx.graph.addRandNode() if err != nil { t.Fatalf("unable to add node: %v", err) } - // With the dependencies we created, we can now create the initial - // agent itself. - testCfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - return 0, nil - }, - // TODO: move address check to agent. - ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { - return false, nil - }, - DisconnectPeer: func(*btcec.PublicKey) error { - return nil - }, - Graph: memGraph, - Constraints: constraints, - } - - initialChans := []Channel{} - agent, err := New(testCfg, initialChans) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll start the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) - // First ensure the agent will attempt to open a new channel. Return // that we need more channels, and have 5BTC to use. - select { - case constraints.moreChansResps <- moreChansResp{1, 5 * btcutil.SatoshiPerBitcoin}: - case <-time.After(time.Second * 10): - t.Fatal("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, moreChansResp{1, 5 * btcutil.SatoshiPerBitcoin}) // At this point, the agent should now be querying the heuristic to // request attachment directives, return a fake so the agent will @@ -372,28 +380,17 @@ func TestAgentChannelFailureSignal(t *testing.T) { Score: 0.5, } - select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ - NewNodeID(node): fakeDirective, - }: - case <-time.After(time.Second * 10): - t.Fatal("heuristic wasn't queried in time") - } + respondNodeScores( + t, testCtx, map[NodeID]*NodeScore{ + NewNodeID(node): fakeDirective, + }, + ) // At this point the agent will attempt to create a channel and fail. // Now ensure that the controller loop is re-executed. - select { - case constraints.moreChansResps <- moreChansResp{1, 5 * btcutil.SatoshiPerBitcoin}: - case <-time.After(time.Second * 10): - t.Fatal("heuristic wasn't queried in time") - } - - select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: - case <-time.After(time.Second * 10): - t.Fatal("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, moreChansResp{1, 5 * btcutil.SatoshiPerBitcoin}) + respondNodeScores(t, testCtx, map[NodeID]*NodeScore{}) } // TestAgentChannelCloseSignal ensures that once the agent receives an outside @@ -401,48 +398,6 @@ func TestAgentChannelFailureSignal(t *testing.T) { // will query the heuristic to make its next decision. func TestAgentChannelCloseSignal(t *testing.T) { t.Parallel() - - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } - - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - } - memGraph, _, _ := newMemChanGraph() - - // With the dependencies we created, we can now create the initial - // agent itself. - testCfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - return 0, nil - }, - ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { - return false, nil - }, - DisconnectPeer: func(*btcec.PublicKey) error { - return nil - }, - Graph: memGraph, - Constraints: constraints, - } - // We'll start the agent with two channels already being active. initialChans := []Channel{ { @@ -454,52 +409,27 @@ func TestAgentChannelCloseSignal(t *testing.T) { Capacity: btcutil.SatoshiPerBitcoin * 2, }, } - agent, err := New(testCfg, initialChans) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll star the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) + testCtx, cleanup := setup(t, initialChans) + defer cleanup() // We'll send an initial "no" response to advance the agent past its // initial check. - select { - case constraints.moreChansResps <- moreChansResp{0, 0}: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, moreChansResp{0, 0}) // Next, we'll close both channels which should force the agent to // re-query the heuristic. - agent.OnChannelClose(initialChans[0].ChanID, initialChans[1].ChanID) + testCtx.agent.OnChannelClose(initialChans[0].ChanID, initialChans[1].ChanID) // The agent should now query the heuristic in order to determine its // next action as it local state has now been modified. - select { - case constraints.moreChansResps <- moreChansResp{0, 0}: - // At this point, the local state of the agent should - // have also been updated to reflect that the LN node - // has no existing open channels. - if len(agent.chanState) != 0 { - t.Fatalf("internal channel state wasn't updated") - } + respondMoreChans(t, testCtx, moreChansResp{0, 0}) - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") + // At this point, the local state of the agent should + // have also been updated to reflect that the LN node + // has no existing open channels. + if len(testCtx.agent.chanState) != 0 { + t.Fatalf("internal channel state wasn't updated") } // There shouldn't be a call to the Select method as we've returned @@ -508,7 +438,7 @@ func TestAgentChannelCloseSignal(t *testing.T) { // If this send success, then Select was erroneously called and the // test should be failed. - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: + case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: t.Fatalf("Select was called but shouldn't have been") // This is the correct path as Select should've be called. @@ -522,105 +452,32 @@ func TestAgentChannelCloseSignal(t *testing.T) { func TestAgentBalanceUpdate(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } - - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - } - memGraph, _, _ := newMemChanGraph() - - // The wallet will start with 2 BTC available. - var walletBalanceMtx sync.Mutex - walletBalance := btcutil.Amount(btcutil.SatoshiPerBitcoin * 2) - - // With the dependencies we created, we can now create the initial - // agent itself. - testCfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - walletBalanceMtx.Lock() - defer walletBalanceMtx.Unlock() - return walletBalance, nil - }, - ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { - return false, nil - }, - DisconnectPeer: func(*btcec.PublicKey) error { - return nil - }, - Graph: memGraph, - Constraints: constraints, - } - initialChans := []Channel{} - agent, err := New(testCfg, initialChans) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll star the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) + testCtx, cleanup := setup(t, nil) + defer cleanup() // We'll send an initial "no" response to advance the agent past its // initial check. - select { - case constraints.moreChansResps <- moreChansResp{0, 0}: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, moreChansResp{0, 0}) // Next we'll send a new balance update signal to the agent, adding 5 // BTC to the amount of available funds. - walletBalanceMtx.Lock() - walletBalance += btcutil.SatoshiPerBitcoin * 5 - walletBalanceMtx.Unlock() + testCtx.Lock() + testCtx.walletBalance += btcutil.SatoshiPerBitcoin * 5 + testCtx.Unlock() - agent.OnBalanceChange() + testCtx.agent.OnBalanceChange() // The agent should now query the heuristic in order to determine its // next action as it local state has now been modified. - select { - case constraints.moreChansResps <- moreChansResp{0, 0}: - // At this point, the local state of the agent should - // have also been updated to reflect that the LN node - // now has an additional 5BTC available. - if agent.totalBalance != walletBalance { - t.Fatalf("expected %v wallet balance "+ - "instead have %v", agent.totalBalance, - walletBalance) - } + respondMoreChans(t, testCtx, moreChansResp{0, 0}) - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") + // At this point, the local state of the agent should + // have also been updated to reflect that the LN node + // now has an additional 5BTC available. + if testCtx.agent.totalBalance != testCtx.walletBalance { + t.Fatalf("expected %v wallet balance "+ + "instead have %v", testCtx.agent.totalBalance, + testCtx.walletBalance) } // There shouldn't be a call to the Select method as we've returned @@ -629,7 +486,7 @@ func TestAgentBalanceUpdate(t *testing.T) { // If this send success, then Select was erroneously called and the // test should be failed. - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: + case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: t.Fatalf("Select was called but shouldn't have been") // This is the correct path as Select should've be called. @@ -642,70 +499,8 @@ func TestAgentBalanceUpdate(t *testing.T) { func TestAgentImmediateAttach(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } - - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - } - memGraph, _, _ := newMemChanGraph() - - // The wallet will start with 10 BTC available. - const walletBalance = btcutil.SatoshiPerBitcoin * 10 - - // With the dependencies we created, we can now create the initial - // agent itself. - testCfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - return walletBalance, nil - }, - ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { - return false, nil - }, - DisconnectPeer: func(*btcec.PublicKey) error { - return nil - }, - Graph: memGraph, - Constraints: constraints, - } - initialChans := []Channel{} - agent, err := New(testCfg, initialChans) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll star the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) + testCtx, cleanup := setup(t, nil) + defer cleanup() const numChans = 5 @@ -713,7 +508,7 @@ func TestAgentImmediateAttach(t *testing.T) { directives := make(map[NodeID]*NodeScore) nodeKeys := make(map[NodeID]struct{}) for i := 0; i < numChans; i++ { - pub, err := memGraph.addRandNode() + pub, err := testCtx.graph.addRandNode() if err != nil { t.Fatalf("unable to generate key: %v", err) } @@ -727,32 +522,23 @@ func TestAgentImmediateAttach(t *testing.T) { // The very first thing the agent should do is query the NeedMoreChans // method on the passed heuristic. So we'll provide it with a response // that will kick off the main loop. - select { - - // We'll send over a response indicating that it should - // establish more channels, and give it a budget of 5 BTC to do - // so. - case constraints.moreChansResps <- moreChansResp{ - numMore: numChans, - amt: 5 * btcutil.SatoshiPerBitcoin, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, + moreChansResp{ + numMore: numChans, + amt: 5 * btcutil.SatoshiPerBitcoin, + }, + ) // At this point, the agent should now be querying the heuristic to // requests attachment directives. With our fake directives created, // we'll now send then to the agent as a return value for the Select // function. - select { - case heuristic.nodeScoresResps <- directives: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondNodeScores(t, testCtx, directives) // Finally, we should receive 5 calls to the OpenChannel method with // the exact same parameters that we specified within the attachment // directives. + chanController := testCtx.chanController.(*mockChanController) for i := 0; i < numChans; i++ { select { case openChan := <-chanController.openChanSignals: @@ -779,72 +565,12 @@ func TestAgentImmediateAttach(t *testing.T) { func TestAgentPrivateChannels(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } + testCtx, cleanup := setup(t, nil) + defer cleanup() // The chanController should be initialized such that all of its open // channel requests are for private channels. - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - private: true, - } - memGraph, _, _ := newMemChanGraph() - - // The wallet will start with 10 BTC available. - const walletBalance = btcutil.SatoshiPerBitcoin * 10 - - // With the dependencies we created, we can now create the initial - // agent itself. - cfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - return walletBalance, nil - }, - ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { - return false, nil - }, - DisconnectPeer: func(*btcec.PublicKey) error { - return nil - }, - Graph: memGraph, - Constraints: constraints, - } - agent, err := New(cfg, nil) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll star the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) + testCtx.chanController.(*mockChanController).private = true const numChans = 5 @@ -852,7 +578,7 @@ func TestAgentPrivateChannels(t *testing.T) { // agent's graph, and it can progress within its loop. directives := make(map[NodeID]*NodeScore) for i := 0; i < numChans; i++ { - pub, err := memGraph.addRandNode() + pub, err := testCtx.graph.addRandNode() if err != nil { t.Fatalf("unable to generate key: %v", err) } @@ -871,23 +597,17 @@ func TestAgentPrivateChannels(t *testing.T) { numMore: numChans, amt: 5 * btcutil.SatoshiPerBitcoin, } - select { - case constraints.moreChansResps <- resp: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, resp) + // At this point, the agent should now be querying the heuristic to // requests attachment directives. With our fake directives created, // we'll now send then to the agent as a return value for the Select // function. - select { - case heuristic.nodeScoresResps <- directives: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondNodeScores(t, testCtx, directives) // Finally, we should receive 5 calls to the OpenChannel method, each // specifying that it's for a private channel. + chanController := testCtx.chanController.(*mockChanController) for i := 0; i < numChans; i++ { select { case openChan := <-chanController.openChanSignals: @@ -906,77 +626,11 @@ func TestAgentPrivateChannels(t *testing.T) { func TestAgentPendingChannelState(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } - - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - } - memGraph, _, _ := newMemChanGraph() - - // The wallet will start with 6 BTC available. - var walletBalanceMtx sync.Mutex - walletBalance := btcutil.Amount(btcutil.SatoshiPerBitcoin * 6) - - // With the dependencies we created, we can now create the initial - // agent itself. - testCfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - walletBalanceMtx.Lock() - defer walletBalanceMtx.Unlock() - - return walletBalance, nil - }, - ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { - return false, nil - }, - DisconnectPeer: func(*btcec.PublicKey) error { - return nil - }, - Graph: memGraph, - Constraints: constraints, - } - initialChans := []Channel{} - agent, err := New(testCfg, initialChans) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll start the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) + testCtx, cleanup := setup(t, nil) + defer cleanup() // We'll only return a single directive for a pre-chosen node. - nodeKey, err := memGraph.addRandNode() + nodeKey, err := testCtx.graph.addRandNode() if err != nil { t.Fatalf("unable to generate key: %v", err) } @@ -990,31 +644,24 @@ func TestAgentPendingChannelState(t *testing.T) { // query, that it needs more channels and has 3 BTC available for // attachment. We'll send over a response indicating that it should // establish more channels, and give it a budget of 1 BTC to do so. - select { - case constraints.moreChansResps <- moreChansResp{ - numMore: 1, - amt: btcutil.SatoshiPerBitcoin, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, + moreChansResp{ + numMore: 1, + amt: btcutil.SatoshiPerBitcoin, + }, + ) - constraints.moreChanArgs = make(chan moreChanArg) - - select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ - nodeID: nodeDirective, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - - heuristic.nodeScoresArgs = make(chan directiveArg) + respondNodeScores(t, testCtx, + map[NodeID]*NodeScore{ + nodeID: nodeDirective, + }, + ) // A request to open the channel should've also been sent. + chanController := testCtx.chanController.(*mockChanController) select { case openChan := <-chanController.openChanSignals: - chanAmt := constraints.MaxChanSize() + chanAmt := testCtx.constraints.MaxChanSize() if openChan.amt != chanAmt { t.Fatalf("invalid chan amt: expected %v, got %v", chanAmt, openChan.amt) @@ -1031,11 +678,11 @@ func TestAgentPendingChannelState(t *testing.T) { // Now, in order to test that the pending state was properly updated, // we'll trigger a balance update in order to trigger a query to the // heuristic. - walletBalanceMtx.Lock() - walletBalance += 0.4 * btcutil.SatoshiPerBitcoin - walletBalanceMtx.Unlock() + testCtx.Lock() + testCtx.walletBalance += 0.4 * btcutil.SatoshiPerBitcoin + testCtx.Unlock() - agent.OnBalanceChange() + testCtx.agent.OnBalanceChange() // The heuristic should be queried, and the argument for the set of // channels passed in should include the pending channels that @@ -1044,8 +691,8 @@ func TestAgentPendingChannelState(t *testing.T) { // The request that we get should include a pending channel for the // one that we just created, otherwise the agent isn't properly // updating its internal state. - case req := <-constraints.moreChanArgs: - chanAmt := constraints.MaxChanSize() + case req := <-testCtx.constraints.moreChanArgs: + chanAmt := testCtx.constraints.MaxChanSize() if len(req.chans) != 1 { t.Fatalf("should include pending chan in current "+ "state, instead have %v chans", len(req.chans)) @@ -1065,7 +712,7 @@ func TestAgentPendingChannelState(t *testing.T) { // We'll send across a response indicating that it *does* need more // channels. select { - case constraints.moreChansResps <- moreChansResp{1, btcutil.SatoshiPerBitcoin}: + case testCtx.constraints.moreChansResps <- moreChansResp{1, btcutil.SatoshiPerBitcoin}: case <-time.After(time.Second * 10): t.Fatalf("need more chans wasn't queried in time") } @@ -1074,7 +721,7 @@ func TestAgentPendingChannelState(t *testing.T) { // Select method. The arguments passed should reflect the fact that the // node we have a pending channel to, should be ignored. select { - case req := <-heuristic.nodeScoresArgs: + case req := <-testCtx.heuristic.nodeScoresArgs: if len(req.chans) == 0 { t.Fatalf("expected to skip %v nodes, instead "+ "skipping %v", 1, len(req.chans)) @@ -1093,88 +740,25 @@ func TestAgentPendingChannelState(t *testing.T) { func TestAgentPendingOpenChannel(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } - - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - } - memGraph, _, _ := newMemChanGraph() - - // The wallet will start with 6 BTC available. - const walletBalance = btcutil.SatoshiPerBitcoin * 6 - - // With the dependencies we created, we can now create the initial - // agent itself. - cfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - return walletBalance, nil - }, - Graph: memGraph, - Constraints: constraints, - } - agent, err := New(cfg, nil) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll start the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) + testCtx, cleanup := setup(t, nil) + defer cleanup() // We'll send an initial "no" response to advance the agent past its // initial check. - select { - case constraints.moreChansResps <- moreChansResp{0, 0}: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, moreChansResp{0, 0}) // Next, we'll signal that a new channel has been opened, but it is // still pending. - agent.OnChannelPendingOpen() + testCtx.agent.OnChannelPendingOpen() // The agent should now query the heuristic in order to determine its // next action as its local state has now been modified. - select { - case constraints.moreChansResps <- moreChansResp{0, 0}: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, moreChansResp{0, 0}) // There shouldn't be a call to the Select method as we've returned // "false" for NeedMoreChans above. select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: + case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: t.Fatalf("Select was called but shouldn't have been") default: } @@ -1188,108 +772,43 @@ func TestAgentPendingOpenChannel(t *testing.T) { func TestAgentOnNodeUpdates(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } - - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - } - memGraph, _, _ := newMemChanGraph() - - // The wallet will start with 6 BTC available. - const walletBalance = btcutil.SatoshiPerBitcoin * 6 - - // With the dependencies we created, we can now create the initial - // agent itself. - cfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - return walletBalance, nil - }, - Graph: memGraph, - Constraints: constraints, - } - agent, err := New(cfg, nil) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll start the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) + testCtx, cleanup := setup(t, nil) + defer cleanup() // We'll send an initial "yes" response to advance the agent past its // initial check. This will cause it to try to get directives from an // empty graph. - select { - case constraints.moreChansResps <- moreChansResp{ - numMore: 2, - amt: walletBalance, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans( + t, testCtx, + moreChansResp{ + numMore: 2, + amt: testCtx.walletBalance, + }, + ) // Send over an empty list of attachment directives, which should cause // the agent to return to waiting on a new signal. - select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: - case <-time.After(time.Second * 10): - t.Fatalf("Select was not called but should have been") - } + respondNodeScores(t, testCtx, map[NodeID]*NodeScore{}) // Simulate more nodes being added to the graph by informing the agent // that we have node updates. - agent.OnNodeUpdates() + testCtx.agent.OnNodeUpdates() // In response, the agent should wake up and see if it needs more // channels. Since we haven't done anything, we will send the same // response as before since we are still trying to open channels. - select { - case constraints.moreChansResps <- moreChansResp{ - numMore: 2, - amt: walletBalance, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans( + t, testCtx, + moreChansResp{ + numMore: 2, + amt: testCtx.walletBalance, + }, + ) // Again the agent should pull in the next set of attachment directives. // It's not important that this list is also empty, so long as the node // updates signal is causing the agent to make this attempt. - select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: - case <-time.After(time.Second * 10): - t.Fatalf("Select was not called but should have been") - } + respondNodeScores(t, testCtx, map[NodeID]*NodeScore{}) } // TestAgentSkipPendingConns asserts that the agent will not try to make @@ -1301,89 +820,29 @@ func TestAgentOnNodeUpdates(t *testing.T) { func TestAgentSkipPendingConns(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - heuristic := &mockHeuristic{ - nodeScoresArgs: make(chan directiveArg), - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } - - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - } - memGraph, _, _ := newMemChanGraph() - - // The wallet will start with 6 BTC available. - const walletBalance = btcutil.SatoshiPerBitcoin * 6 + testCtx, cleanup := setup(t, nil) + defer cleanup() connect := make(chan chan error) + testCtx.agent.cfg.ConnectToPeer = func(*btcec.PublicKey, []net.Addr) (bool, error) { + errChan := make(chan error) - // With the dependencies we created, we can now create the initial - // agent itself. - testCfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - return walletBalance, nil - }, - ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { - errChan := make(chan error) + select { + case connect <- errChan: + case <-testCtx.quit: + return false, errors.New("quit") + } - select { - case connect <- errChan: - case <-quit: - return false, errors.New("quit") - } - - select { - case err := <-errChan: - return false, err - case <-quit: - return false, errors.New("quit") - } - }, - DisconnectPeer: func(*btcec.PublicKey) error { - return nil - }, - Graph: memGraph, - Constraints: constraints, + select { + case err := <-errChan: + return false, err + case <-testCtx.quit: + return false, errors.New("quit") + } } - initialChans := []Channel{} - agent, err := New(testCfg, initialChans) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll start the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() - - // We must defer the closing of quit after the defer agent.Stop(), to - // make sure ConnectToPeer won't block preventing the agent from - // exiting. - defer close(quit) // We'll only return a single directive for a pre-chosen node. - nodeKey, err := memGraph.addRandNode() + nodeKey, err := testCtx.graph.addRandNode() if err != nil { t.Fatalf("unable to generate key: %v", err) } @@ -1395,7 +854,7 @@ func TestAgentSkipPendingConns(t *testing.T) { // We'll also add a second node to the graph, to keep the first one // company. - nodeKey2, err := memGraph.addRandNode() + nodeKey2, err := testCtx.graph.addRandNode() if err != nil { t.Fatalf("unable to generate key: %v", err) } @@ -1404,18 +863,16 @@ func TestAgentSkipPendingConns(t *testing.T) { // We'll send an initial "yes" response to advance the agent past its // initial check. This will cause it to try to get directives from the // graph. - select { - case constraints.moreChansResps <- moreChansResp{ - numMore: 1, - amt: walletBalance, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, + moreChansResp{ + numMore: 1, + amt: testCtx.walletBalance, + }, + ) // Both nodes should be part of the arguments. select { - case req := <-heuristic.nodeScoresArgs: + case req := <-testCtx.heuristic.nodeScoresArgs: if len(req.nodes) != 2 { t.Fatalf("expected %v nodes, instead "+ "had %v", 2, len(req.nodes)) @@ -1433,7 +890,7 @@ func TestAgentSkipPendingConns(t *testing.T) { // Respond with a scored directive. We skip node2 for now, implicitly // giving it a zero-score. select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ + case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ NewNodeID(nodeKey): nodeDirective, }: case <-time.After(time.Second * 10): @@ -1449,22 +906,20 @@ func TestAgentSkipPendingConns(t *testing.T) { } // Signal the agent to go again, now that we've tried to connect. - agent.OnNodeUpdates() + testCtx.agent.OnNodeUpdates() // The heuristic again informs the agent that we need more channels. - select { - case constraints.moreChansResps <- moreChansResp{ - numMore: 1, - amt: walletBalance, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, + moreChansResp{ + numMore: 1, + amt: testCtx.walletBalance, + }, + ) // Since the node now has a pending connection, it should be skipped // and not part of the nodes attempting to be scored. select { - case req := <-heuristic.nodeScoresArgs: + case req := <-testCtx.heuristic.nodeScoresArgs: if len(req.nodes) != 1 { t.Fatalf("expected %v nodes, instead "+ "had %v", 1, len(req.nodes)) @@ -1478,7 +933,7 @@ func TestAgentSkipPendingConns(t *testing.T) { // Respond with an emtpty score set. select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: + case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{}: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } @@ -1501,20 +956,18 @@ func TestAgentSkipPendingConns(t *testing.T) { // The agent will now retry since the last connection attempt failed. // The heuristic again informs the agent that we need more channels. - select { - case constraints.moreChansResps <- moreChansResp{ - numMore: 1, - amt: walletBalance, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, + moreChansResp{ + numMore: 1, + amt: testCtx.walletBalance, + }, + ) // The node should now be marked as "failed", which should make it // being skipped during scoring. Again check that it won't be among the // score request. select { - case req := <-heuristic.nodeScoresArgs: + case req := <-testCtx.heuristic.nodeScoresArgs: if len(req.nodes) != 1 { t.Fatalf("expected %v nodes, instead "+ "had %v", 1, len(req.nodes)) @@ -1532,7 +985,7 @@ func TestAgentSkipPendingConns(t *testing.T) { Score: 0.5, } select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ + case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ nodeID2: nodeDirective2, }: case <-time.After(time.Second * 10): @@ -1552,86 +1005,30 @@ func TestAgentSkipPendingConns(t *testing.T) { func TestAgentQuitWhenPendingConns(t *testing.T) { t.Parallel() - // First, we'll create all the dependencies that we'll need in order to - // create the autopilot agent. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate key: %v", err) - } - - quit := make(chan struct{}) - defer close(quit) - - heuristic := &mockHeuristic{ - nodeScoresArgs: make(chan directiveArg), - nodeScoresResps: make(chan map[NodeID]*NodeScore), - quit: quit, - } - constraints := &mockConstraints{ - moreChansResps: make(chan moreChansResp), - quit: quit, - } - - chanController := &mockChanController{ - openChanSignals: make(chan openChanIntent), - } - memGraph, _, _ := newMemChanGraph() - - // The wallet will start with 6 BTC available. - const walletBalance = btcutil.SatoshiPerBitcoin * 6 + testCtx, cleanup := setup(t, nil) + defer cleanup() connect := make(chan chan error) - // With the dependencies we created, we can now create the initial - // agent itself. - testCfg := Config{ - Self: self, - Heuristic: heuristic, - ChanController: chanController, - WalletBalance: func() (btcutil.Amount, error) { - return walletBalance, nil - }, - ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { - errChan := make(chan error) + testCtx.agent.cfg.ConnectToPeer = func(*btcec.PublicKey, []net.Addr) (bool, error) { + errChan := make(chan error) - select { - case connect <- errChan: - case <-quit: - return false, errors.New("quit") - } + select { + case connect <- errChan: + case <-testCtx.quit: + return false, errors.New("quit") + } - select { - case err := <-errChan: - return false, err - case <-quit: - return false, errors.New("quit") - } - }, - DisconnectPeer: func(*btcec.PublicKey) error { - return nil - }, - Graph: memGraph, - Constraints: constraints, + select { + case err := <-errChan: + return false, err + case <-testCtx.quit: + return false, errors.New("quit") + } } - initialChans := []Channel{} - agent, err := New(testCfg, initialChans) - if err != nil { - t.Fatalf("unable to create agent: %v", err) - } - - // To ensure the heuristic doesn't block on quitting the agent, we'll - // use the agent's quit chan to signal when it should also stop. - heuristic.quit = agent.quit - - // With the autopilot agent and all its dependencies we'll start the - // primary controller goroutine. - if err := agent.Start(); err != nil { - t.Fatalf("unable to start agent: %v", err) - } - defer agent.Stop() // We'll only return a single directive for a pre-chosen node. - nodeKey, err := memGraph.addRandNode() + nodeKey, err := testCtx.graph.addRandNode() if err != nil { t.Fatalf("unable to generate key: %v", err) } @@ -1644,18 +1041,16 @@ func TestAgentQuitWhenPendingConns(t *testing.T) { // We'll send an initial "yes" response to advance the agent past its // initial check. This will cause it to try to get directives from the // graph. - select { - case constraints.moreChansResps <- moreChansResp{ - numMore: 1, - amt: walletBalance, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } + respondMoreChans(t, testCtx, + moreChansResp{ + numMore: 1, + amt: testCtx.walletBalance, + }, + ) // Check the args. select { - case req := <-heuristic.nodeScoresArgs: + case req := <-testCtx.heuristic.nodeScoresArgs: if len(req.nodes) != 1 { t.Fatalf("expected %v nodes, instead "+ "had %v", 1, len(req.nodes)) @@ -1669,7 +1064,7 @@ func TestAgentQuitWhenPendingConns(t *testing.T) { // Respond with a scored directive. select { - case heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ + case testCtx.heuristic.nodeScoresResps <- map[NodeID]*NodeScore{ NewNodeID(nodeKey): nodeDirective, }: case <-time.After(time.Second * 10): @@ -1687,7 +1082,7 @@ func TestAgentQuitWhenPendingConns(t *testing.T) { // pending connection. stopped := make(chan error) go func() { - stopped <- agent.Stop() + stopped <- testCtx.agent.Stop() }() select { From fb5b6ff425d20e40384972f8fcb4aad9bfc53477 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 5 Mar 2019 13:58:31 +0100 Subject: [PATCH 3/3] autopilot/agent_test: add TestAgentChannelSizeAllocation TestAgentChannelSizeAllocation tests that the autopilot agent opens channel of size that stays within the channel budget and size restrictions. --- autopilot/agent_test.go | 222 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 219 insertions(+), 3 deletions(-) diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index ff7a8be1..92b2b358 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -58,7 +58,7 @@ func (m *mockConstraints) MaxPendingOpens() uint16 { } func (m *mockConstraints) MinChanSize() btcutil.Amount { - return 0 + return 1e7 } func (m *mockConstraints) MaxChanSize() btcutil.Amount { return 1e8 @@ -85,13 +85,13 @@ func (m *mockHeuristic) Name() string { } func (m *mockHeuristic) NodeScores(g ChannelGraph, chans []Channel, - fundsAvailable btcutil.Amount, nodes map[NodeID]struct{}) ( + chanSize btcutil.Amount, nodes map[NodeID]struct{}) ( map[NodeID]*NodeScore, error) { if m.nodeScoresArgs != nil { directive := directiveArg{ graph: g, - amt: fundsAvailable, + amt: chanSize, chans: chans, nodes: nodes, } @@ -1094,3 +1094,219 @@ func TestAgentQuitWhenPendingConns(t *testing.T) { t.Fatalf("unable to stop agent") } } + +// respondWithScores checks that the moreChansRequest contains what we expect, +// and responds with the given node scores. +func respondWithScores(t *testing.T, testCtx *testContext, + channelBudget btcutil.Amount, existingChans, newChans int, + nodeScores map[NodeID]*NodeScore) { + + t.Helper() + + select { + case testCtx.constraints.moreChansResps <- moreChansResp{ + numMore: uint32(newChans), + amt: channelBudget, + }: + case <-time.After(time.Second * 3): + t.Fatalf("heuristic wasn't queried in time") + } + + // The agent should query for scores using the constraints returned + // above. We expect the agent to use the maximum channel size when + // opening channels. + chanSize := testCtx.constraints.MaxChanSize() + + select { + case req := <-testCtx.heuristic.nodeScoresArgs: + // All nodes in the graph should be potential channel + // candidates. + if len(req.nodes) != len(nodeScores) { + t.Fatalf("expected %v nodes, instead had %v", + len(nodeScores), len(req.nodes)) + } + + // 'existingChans' is already open. + if len(req.chans) != existingChans { + t.Fatalf("expected %d existing channel, got %v", + existingChans, len(req.chans)) + } + if req.amt != chanSize { + t.Fatalf("expected channel size of %v, got %v", + chanSize, req.amt) + } + + case <-time.After(time.Second * 3): + t.Fatalf("select wasn't queried in time") + } + + // Respond with the given scores. + select { + case testCtx.heuristic.nodeScoresResps <- nodeScores: + case <-time.After(time.Second * 3): + t.Fatalf("NodeScores wasn't queried in time") + } +} + +// checkChannelOpens asserts that the channel controller attempts open the +// number of channels we expect, and with the exact total allocation. +func checkChannelOpens(t *testing.T, testCtx *testContext, + allocation btcutil.Amount, numChans int) []NodeID { + + var nodes []NodeID + + // The agent should attempt to open channels, totaling what we expect. + var totalAllocation btcutil.Amount + chanController := testCtx.chanController.(*mockChanController) + for i := 0; i < numChans; i++ { + select { + case openChan := <-chanController.openChanSignals: + totalAllocation += openChan.amt + + testCtx.Lock() + testCtx.walletBalance -= openChan.amt + testCtx.Unlock() + + nodes = append(nodes, NewNodeID(openChan.target)) + + case <-time.After(time.Second * 3): + t.Fatalf("channel not opened in time") + } + } + + if totalAllocation != allocation { + t.Fatalf("expected agent to open channels totalling %v, "+ + "instead was %v", allocation, totalAllocation) + } + + // Finally, make sure the agent won't try opening more channels. + select { + case <-chanController.openChanSignals: + t.Fatalf("agent unexpectedly opened channel") + + case <-time.After(50 * time.Millisecond): + } + + return nodes +} + +// TestAgentChannelSizeAllocation tests that the autopilot agent opens channel +// of size that stays within the channel budget and size restrictions. +func TestAgentChannelSizeAllocation(t *testing.T) { + t.Parallel() + + // Total number of nodes in our mock graph. + const numNodes = 10 + + testCtx, cleanup := setup(t, nil) + defer cleanup() + + nodeScores := make(map[NodeID]*NodeScore) + for i := 0; i < numNodes; i++ { + nodeKey, err := testCtx.graph.addRandNode() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + nodeID := NewNodeID(nodeKey) + nodeScores[nodeID] = &NodeScore{ + NodeID: nodeID, + Score: 0.5, + } + } + + // The agent should now query the heuristic in order to determine its + // next action as it local state has now been modified. + select { + case arg := <-testCtx.constraints.moreChanArgs: + if len(arg.chans) != 0 { + t.Fatalf("expected agent to have no channels open, "+ + "had %v", len(arg.chans)) + } + if arg.balance != testCtx.walletBalance { + t.Fatalf("expectd agent to have %v balance, had %v", + testCtx.walletBalance, arg.balance) + } + case <-time.After(time.Second * 3): + t.Fatalf("heuristic wasn't queried in time") + } + + // We'll return a response telling the agent to open 5 channels, with a + // total channel budget of 5 BTC. + var channelBudget btcutil.Amount = 5 * btcutil.SatoshiPerBitcoin + numExistingChannels := 0 + numNewChannels := 5 + respondWithScores( + t, testCtx, channelBudget, numExistingChannels, + numNewChannels, nodeScores, + ) + + expectedAllocation := testCtx.constraints.MaxChanSize() * btcutil.Amount(numNewChannels) + nodes := checkChannelOpens( + t, testCtx, expectedAllocation, numNewChannels, + ) + + // Delete the selected nodes from our set of scores, to avoid scoring + // nodes we already have channels to. + for _, node := range nodes { + delete(nodeScores, node) + } + + // TODO(halseth): this loop is a hack to ensure all the attempted + // channels are accounted for. This happens because the agent will + // query the ChannelBudget before all the pending channels are added to + // the map. Fix by adding them to the pending channels map before + // executing directives in goroutines? + waitForNumChans := func(expChans int) { + t.Helper() + + Loop: + for { + select { + case arg := <-testCtx.constraints.moreChanArgs: + // As long as the number of existing channels + // is below our expected number of channels, + // we'll keep responding with "no more + // channels". + if len(arg.chans) != expChans { + select { + case testCtx.constraints.moreChansResps <- moreChansResp{0, 0}: + case <-time.After(time.Second * 3): + t.Fatalf("heuristic wasn't " + + "queried in time") + } + continue + } + + if arg.balance != testCtx.walletBalance { + t.Fatalf("expectd agent to have %v "+ + "balance, had %v", + testCtx.walletBalance, + arg.balance) + } + break Loop + + case <-time.After(time.Second * 3): + t.Fatalf("heuristic wasn't queried in time") + } + } + } + + // Wait for the agent to have 5 channels. + waitForNumChans(numNewChannels) + + // Set the channel budget to 1.5 BTC. + channelBudget = btcutil.SatoshiPerBitcoin * 3 / 2 + + // We'll return a response telling the agent to open 3 channels, with a + // total channel budget of 1.5 BTC. + numExistingChannels = 5 + numNewChannels = 3 + respondWithScores( + t, testCtx, channelBudget, numExistingChannels, + numNewChannels, nodeScores, + ) + + // To stay within the budget, we expect the autopilot to open 2 + // channels. + checkChannelOpens(t, testCtx, channelBudget, 2) +}