diff --git a/autopilot/agent.go b/autopilot/agent.go index 8cd93be6..e91a9fc6 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -301,22 +301,26 @@ func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) { } // mergeNodeMaps merges the Agent's set of nodes that it already has active -// channels open to, with the set of nodes that are pending new channels. This -// ensures that the Agent doesn't attempt to open any "duplicate" channels to -// the same node. -func mergeNodeMaps(a map[NodeID]struct{}, b map[NodeID]struct{}, - c map[NodeID]Channel) map[NodeID]struct{} { +// channels open to, with the other sets of nodes that should be removed from +// consideration during heuristic selection. This ensures that the Agent doesn't +// attempt to open any "duplicate" channels to the same node. +func mergeNodeMaps(c map[NodeID]Channel, + skips ...map[NodeID]struct{}) map[NodeID]struct{} { - res := make(map[NodeID]struct{}, len(a)+len(b)+len(c)) - for nodeID := range a { - res[nodeID] = struct{}{} - } - for nodeID := range b { - res[nodeID] = struct{}{} + numNodes := len(c) + for _, skip := range skips { + numNodes += len(skip) } + + res := make(map[NodeID]struct{}, len(c)+numNodes) for nodeID := range c { res[nodeID] = struct{}{} } + for _, skip := range skips { + for nodeID := range skip { + res[nodeID] = struct{}{} + } + } return res } @@ -360,6 +364,11 @@ func (a *Agent) controller() { // channels with, but didn't succeed. failedNodes := make(map[NodeID]struct{}) + // pendingConns tracks the nodes that we are attempting to make + // connections to. This prevents us from making duplicate connection + // requests to the same node. + pendingConns := make(map[NodeID]struct{}) + // pendingOpens tracks the channels that we've requested to be // initiated, but haven't yet been confirmed as being fully opened. // This state is required as otherwise, we may go over our allotted @@ -481,7 +490,9 @@ func (a *Agent) controller() { // duplicate edges. 
connectedNodes := a.chanState.ConnectedNodes() pendingMtx.Lock() - nodesToSkip := mergeNodeMaps(connectedNodes, failedNodes, pendingOpens) + nodesToSkip := mergeNodeMaps(pendingOpens, + pendingConns, connectedNodes, failedNodes, + ) pendingMtx.Unlock() // If we reach this point, then according to our heuristic we @@ -507,32 +518,40 @@ func (a *Agent) controller() { log.Infof("Attempting to execute channel attachment "+ "directives: %v", spew.Sdump(chanCandidates)) + // Before proceeding, check to see if we have any slots + // available to open channels. If there are any, we will attempt + // to dispatch the retrieved directives since we can't be + // certain which ones may actually succeed. If too many + // connections succeed, the surplus ones will be ignored and made + // available to future heuristic selections. + pendingMtx.Lock() + if uint16(len(pendingOpens)) >= a.cfg.MaxPendingOpens { + pendingMtx.Unlock() + log.Debugf("Reached cap of %v pending "+ + "channel opens, will retry "+ + "after success/failure", + a.cfg.MaxPendingOpens) + continue + } + // For each recommended attachment directive, we'll launch a // new goroutine to attempt to carry out the directive. If any // of these succeed, then we'll receive a new state update, // taking us back to the top of our controller loop. - pendingMtx.Lock() for _, chanCandidate := range chanCandidates { - // Before we proceed, we'll check to see if this - // attempt would take us past the total number of - // allowed pending opens. If so, then we'll skip this - // round and wait for an attempt to either fail or - // succeed. - if uint16(len(pendingOpens))+1 > - a.cfg.MaxPendingOpens { - - log.Debugf("Reached cap of %v pending "+ - "channel opens, will retry "+ - "after success/failure", - a.cfg.MaxPendingOpens) + // Skip candidates which we are already trying + // to establish a connection with. 
+ nodeID := chanCandidate.NodeID + if _, ok := pendingConns[nodeID]; ok { continue } + pendingConns[nodeID] = struct{}{} go func(directive AttachmentDirective) { // We'll start out by attempting to connect to // the peer in order to begin the funding // workflow. - pub := directive.PeerKey + pub := directive.NodeKey alreadyConnected, err := a.cfg.ConnectToPeer( pub, directive.Addrs, ) @@ -548,6 +567,7 @@ func (a *Agent) controller() { // again. nodeID := NewNodeID(pub) pendingMtx.Lock() + delete(pendingConns, nodeID) failedNodes[nodeID] = struct{}{} pendingMtx.Unlock() @@ -558,24 +578,31 @@ func (a *Agent) controller() { return } - // If we were succesful, we'll track this peer - // in our set of pending opens. We do this here - // to ensure we don't stall on selecting new - // peers if the connection attempt happens to - // take too long. + // The connection was successful, though before + // progressing we must check that we have not + // already met our quota for max pending open + // channels. This can happen if multiple + // directives were spawned but fewer slots were + // available, and other successful attempts + // finished first. pendingMtx.Lock() - if uint16(len(pendingOpens))+1 > + if uint16(len(pendingOpens)) >= a.cfg.MaxPendingOpens { - - pendingMtx.Unlock() - - // Since we've reached our max number - // of pending opens, we'll disconnect - // this peer and exit. However, if we - // were previously connected to them, - // then we'll make sure to maintain the + // Since we've reached our max number of + // pending opens, we'll disconnect this + // peer and exit. However, if we were + // previously connected to them, then + // we'll make sure to maintain the // connection alive. if alreadyConnected { + // Since we succeeded in + // connecting, we won't add this + // peer to the failed nodes map, + // but we will remove it from + // pendingConns so that it can + // be retried in the future. 
+ delete(pendingConns, nodeID) + pendingMtx.Unlock() return } @@ -589,10 +616,23 @@ func (a *Agent) controller() { pub.SerializeCompressed(), err) } + + // Now that we have disconnected, we can + // remove this node from our pending + // conns map, permitting subsequent + // connection attempts. + delete(pendingConns, nodeID) + pendingMtx.Unlock() return } - nodeID := NewNodeID(directive.PeerKey) + // If we were successful, we'll track this peer + // in our set of pending opens. We do this here + // to ensure we don't stall on selecting new + // peers if the connection attempt happens to + // take too long. + nodeID := directive.NodeID + delete(pendingConns, nodeID) pendingOpens[nodeID] = Channel{ Capacity: directive.ChanAmt, Node: nodeID, diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index 113aad42..16099eeb 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -3,6 +3,7 @@ package autopilot import ( "bytes" "errors" + "fmt" "net" "sync" "testing" @@ -186,22 +187,13 @@ func TestAgentChannelOpenSignal(t *testing.T) { } defer agent.Stop() - var wg sync.WaitGroup - // We'll send an initial "no" response to advance the agent past its // initial check. - wg.Add(1) - go func() { - select { - case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: - wg.Done() - return - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - - wg.Wait() + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // Next we'll signal a new channel being opened by the backing LN node, // with a capacity of 1 BTC. @@ -211,34 +203,20 @@ func TestAgentChannelOpenSignal(t *testing.T) { } agent.OnChannelOpen(newChan) - wg = sync.WaitGroup{} - // The agent should now query the heuristic in order to determine its // next action as it local state has now been modified. 
- wg.Add(1) - go func() { - select { - case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: - // At this point, the local state of the agent should - // have also been updated to reflect that the LN node - // now has an additional channel with one BTC. - if _, ok := agent.chanState[newChan.ChanID]; !ok { - t.Fatalf("internal channel state wasn't updated") - } - - // With all of our assertions passed, we'll signal the - // main test goroutine to continue the test. - wg.Done() - return - - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + // At this point, the local state of the agent should + // have also been updated to reflect that the LN node + // now has an additional channel with one BTC. + if _, ok := agent.chanState[newChan.ChanID]; !ok { + t.Fatalf("internal channel state wasn't updated") } - }() - // We'll wait here for either the agent to query the heuristic to be - // queried, or for the timeout above to tick. - wg.Wait() + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // There shouldn't be a call to the Select method as we've returned // "false" for NeedMoreChans above. @@ -343,7 +321,8 @@ func TestAgentChannelFailureSignal(t *testing.T) { // request attachment directives, return a fake so the agent will // attempt to open a channel. var fakeDirective = AttachmentDirective{ - PeerKey: self, + NodeKey: self, + NodeID: NewNodeID(self), ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ &net.TCPAddr{ @@ -441,55 +420,32 @@ func TestAgentChannelCloseSignal(t *testing.T) { } defer agent.Stop() - var wg sync.WaitGroup - // We'll send an initial "no" response to advance the agent past its // initial check. 
- wg.Add(1) - go func() { - select { - case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: - wg.Done() - return - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - - wg.Wait() + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // Next, we'll close both channels which should force the agent to // re-query the heuristic. agent.OnChannelClose(initialChans[0].ChanID, initialChans[1].ChanID) - wg = sync.WaitGroup{} - // The agent should now query the heuristic in order to determine its // next action as it local state has now been modified. - wg.Add(1) - go func() { - select { - case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: - // At this point, the local state of the agent should - // have also been updated to reflect that the LN node - // has no existing open channels. - if len(agent.chanState) != 0 { - t.Fatalf("internal channel state wasn't updated") - } - - // With all of our assertions passed, we'll signal the - // main test goroutine to continue the test. - wg.Done() - return - - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + // At this point, the local state of the agent should + // have also been updated to reflect that the LN node + // has no existing open channels. + if len(agent.chanState) != 0 { + t.Fatalf("internal channel state wasn't updated") } - }() - // We'll wait here for either the agent to query the heuristic to be - // queried, or for the timeout above to tick. - wg.Wait() + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // There shouldn't be a call to the Select method as we've returned // "false" for NeedMoreChans above. 
@@ -567,22 +523,13 @@ func TestAgentBalanceUpdate(t *testing.T) { } defer agent.Stop() - var wg sync.WaitGroup - // We'll send an initial "no" response to advance the agent past its // initial check. - wg.Add(1) - go func() { - select { - case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: - wg.Done() - return - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - - wg.Wait() + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // Next we'll send a new balance update signal to the agent, adding 5 // BTC to the amount of available funds. @@ -592,36 +539,22 @@ func TestAgentBalanceUpdate(t *testing.T) { agent.OnBalanceChange() - wg = sync.WaitGroup{} - // The agent should now query the heuristic in order to determine its // next action as it local state has now been modified. - wg.Add(1) - go func() { - select { - case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: - // At this point, the local state of the agent should - // have also been updated to reflect that the LN node - // now has an additional 5BTC available. - if agent.totalBalance != walletBalance { - t.Fatalf("expected %v wallet balance "+ - "instead have %v", agent.totalBalance, - walletBalance) - } - - // With all of our assertions passed, we'll signal the - // main test goroutine to continue the test. - wg.Done() - return - - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + // At this point, the local state of the agent should + // have also been updated to reflect that the LN node + // now has an additional 5BTC available. 
+ if agent.totalBalance != walletBalance { + t.Fatalf("expected %v wallet balance "+ + "instead have %v", agent.totalBalance, + walletBalance) } - }() - // We'll wait here for either the agent to query the heuristic to be - // queried, or for the timeout above to tick. - wg.Wait() + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // There shouldn't be a call to the Select method as we've returned // "false" for NeedMoreChans above. @@ -695,39 +628,39 @@ func TestAgentImmediateAttach(t *testing.T) { } defer agent.Stop() - var wg sync.WaitGroup - const numChans = 5 // The very first thing the agent should do is query the NeedMoreChans // method on the passed heuristic. So we'll provide it with a response // that will kick off the main loop. - wg.Add(1) - go func() { - select { + select { - // We'll send over a response indicating that it should - // establish more channels, and give it a budget of 5 BTC to do - // so. - case heuristic.moreChansResps <- moreChansResp{true, numChans, 5 * btcutil.SatoshiPerBitcoin}: - wg.Done() - return - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - - // We'll wait here for the agent to query the heuristic. If ti doesn't - // do so within 10 seconds, then the test will fail out. - wg.Wait() + // We'll send over a response indicating that it should + // establish more channels, and give it a budget of 5 BTC to do + // so. + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: numChans, + amt: 5 * btcutil.SatoshiPerBitcoin, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // At this point, the agent should now be querying the heuristic to // requests attachment directives. We'll generate 5 mock directives so // it can progress within its loop. 
directives := make([]AttachmentDirective, numChans) + nodeKeys := make(map[NodeID]struct{}) for i := 0; i < numChans; i++ { + pub, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + nodeID := NewNodeID(pub) directives[i] = AttachmentDirective{ - PeerKey: self, + NodeKey: pub, + NodeID: nodeID, ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ &net.TCPAddr{ @@ -735,26 +668,16 @@ func TestAgentImmediateAttach(t *testing.T) { }, }, } + nodeKeys[nodeID] = struct{}{} } - wg = sync.WaitGroup{} - // With our fake directives created, we'll now send then to the agent // as a return value for the Select function. - wg.Add(1) - go func() { - select { - case heuristic.directiveResps <- directives: - wg.Done() - return - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - - // We'll wait here for either the agent to query the heuristic to be - // queried, or for the timeout above to tick. - wg.Wait() + select { + case heuristic.directiveResps <- directives: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // Finally, we should receive 5 calls to the OpenChannel method with // the exact same parameters that we specified within the attachment @@ -766,11 +689,13 @@ func TestAgentImmediateAttach(t *testing.T) { t.Fatalf("invalid chan amt: expected %v, got %v", btcutil.SatoshiPerBitcoin, openChan.amt) } - if !openChan.target.IsEqual(self) { - t.Fatalf("unexpected key: expected %x, got %x", - self.SerializeCompressed(), - openChan.target.SerializeCompressed()) + nodeID := NewNodeID(openChan.target) + _, ok := nodeKeys[nodeID] + if !ok { + t.Fatalf("unexpected key: %v, not found", + nodeID) } + delete(nodeKeys, nodeID) case <-time.After(time.Second * 10): t.Fatalf("channel not opened in time") } @@ -838,42 +763,35 @@ func TestAgentPrivateChannels(t *testing.T) { defer agent.Stop() const numChans = 5 - var wg sync.WaitGroup // The very first thing the agent 
should do is query the NeedMoreChans // method on the passed heuristic. So we'll provide it with a response - // that will kick off the main loop. - wg.Add(1) - go func() { - defer wg.Done() - - // We'll send over a response indicating that it should - // establish more channels, and give it a budget of 5 BTC to do - // so. - resp := moreChansResp{ - needMore: true, - numMore: numChans, - amt: 5 * btcutil.SatoshiPerBitcoin, - } - select { - case heuristic.moreChansResps <- resp: - return - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - - // We'll wait here for the agent to query the heuristic. If it doesn't - // do so within 10 seconds, then the test will fail out. - wg.Wait() + // that will kick off the main loop. We'll send over a response + // indicating that it should establish more channels, and give it a + // budget of 5 BTC to do so. + resp := moreChansResp{ + needMore: true, + numMore: numChans, + amt: 5 * btcutil.SatoshiPerBitcoin, + } + select { + case heuristic.moreChansResps <- resp: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // At this point, the agent should now be querying the heuristic to // requests attachment directives. We'll generate 5 mock directives so // it can progress within its loop. directives := make([]AttachmentDirective, numChans) for i := 0; i < numChans; i++ { + pub, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } directives[i] = AttachmentDirective{ - PeerKey: self, + NodeKey: pub, + NodeID: NewNodeID(pub), ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ &net.TCPAddr{ @@ -885,21 +803,11 @@ func TestAgentPrivateChannels(t *testing.T) { // With our fake directives created, we'll now send then to the agent // as a return value for the Select function. 
- wg.Add(1) - go func() { - defer wg.Done() - - select { - case heuristic.directiveResps <- directives: - return - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - - // We'll wait here for either the agent to query the heuristic to be - // queried, or for the timeout above to tick. - wg.Wait() + select { + case heuristic.directiveResps <- directives: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // Finally, we should receive 5 calls to the OpenChannel method, each // specifying that it's for a private channel. @@ -978,29 +886,19 @@ func TestAgentPendingChannelState(t *testing.T) { } defer agent.Stop() - var wg sync.WaitGroup - // Once again, we'll start by telling the agent as part of its first // query, that it needs more channels and has 3 BTC available for - // attachment. - wg.Add(1) - go func() { - select { - - // We'll send over a response indicating that it should - // establish more channels, and give it a budget of 1 BTC to do - // so. - case heuristic.moreChansResps <- moreChansResp{true, 1, btcutil.SatoshiPerBitcoin}: - wg.Done() - return - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - - // We'll wait for the first query to be consumed. If this doesn't - // happen then the above goroutine will timeout, and fail the test. - wg.Wait() + // attachment. We'll send over a response indicating that it should + // establish more channels, and give it a budget of 1 BTC to do so. 
+ select { + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: 1, + amt: btcutil.SatoshiPerBitcoin, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } heuristic.moreChanArgs = make(chan moreChanArg) @@ -1013,7 +911,8 @@ func TestAgentPendingChannelState(t *testing.T) { } nodeID := NewNodeID(nodeKey) nodeDirective := AttachmentDirective{ - PeerKey: nodeKey, + NodeKey: nodeKey, + NodeID: nodeID, ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ &net.TCPAddr{ @@ -1072,7 +971,7 @@ func TestAgentPendingChannelState(t *testing.T) { } if req.chans[0].Node != nodeID { t.Fatalf("wrong node ID: expected %x, got %x", - req.chans[0].Node[:], nodeID) + nodeID, req.chans[0].Node[:]) } case <-time.After(time.Second * 10): t.Fatalf("need more chans wasn't queried in time") @@ -1157,16 +1056,11 @@ func TestAgentPendingOpenChannel(t *testing.T) { // We'll send an initial "no" response to advance the agent past its // initial check. - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - select { - case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // Next, we'll signal that a new channel has been opened, but it is // still pending. @@ -1174,19 +1068,11 @@ func TestAgentPendingOpenChannel(t *testing.T) { // The agent should now query the heuristic in order to determine its // next action as its local state has now been modified. 
- wg.Add(1) - go func() { - defer wg.Done() - select { - case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - - // We'll wait here for either the agent to query the heuristic to be - // queried, or for the timeout above to tick. - wg.Wait() + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // There shouldn't be a call to the Select method as we've returned // "false" for NeedMoreChans above. @@ -1254,21 +1140,15 @@ func TestAgentOnNodeUpdates(t *testing.T) { // We'll send an initial "yes" response to advance the agent past its // initial check. This will cause it to try to get directives from an // empty graph. - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - select { - case heuristic.moreChansResps <- moreChansResp{ - needMore: true, - numMore: 2, - amt: walletBalance, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - wg.Wait() + select { + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: 2, + amt: walletBalance, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // Send over an empty list of attachment directives, which should cause // the agent to return to waiting on a new signal. @@ -1285,21 +1165,15 @@ func TestAgentOnNodeUpdates(t *testing.T) { // In response, the agent should wake up and see if it needs more // channels. Since we haven't done anything, we will send the same // response as before since we are still trying to open channels. 
- var wg2 sync.WaitGroup - wg2.Add(1) - go func() { - defer wg2.Done() - select { - case heuristic.moreChansResps <- moreChansResp{ - needMore: true, - numMore: 2, - amt: walletBalance, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - wg2.Wait() + select { + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: 2, + amt: walletBalance, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // Again the agent should pull in the next set of attachment directives. // It's not important that this list is also empty, so long as the node @@ -1310,3 +1184,180 @@ func TestAgentOnNodeUpdates(t *testing.T) { t.Fatalf("Select was not called but should have been") } } + +// TestAgentSkipPendingConns asserts that the agent will not try to make +// duplicate connection requests to the same node, even if the attachment +// heuristic instructs the agent to do so. It also asserts that the agent +// stops tracking the pending connection once it finishes. Note that in +// practice, a failed connection would be inserted into the skip map passed to +// the attachment heuristic, though this does not assert that case. +func TestAgentSkipPendingConns(t *testing.T) { + t.Parallel() + + // First, we'll create all the dependencies that we'll need in order to + // create the autopilot agent. + self, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + heuristic := &mockHeuristic{ + moreChansResps: make(chan moreChansResp), + directiveResps: make(chan []AttachmentDirective), + } + chanController := &mockChanController{ + openChanSignals: make(chan openChanIntent), + } + memGraph, _, _ := newMemChanGraph() + + // The wallet will start with 6 BTC available. + const walletBalance = btcutil.SatoshiPerBitcoin * 6 + + connect := make(chan chan error) + + // With the dependencies we created, we can now create the initial + // agent itself. 
+ testCfg := Config{ + Self: self, + Heuristic: heuristic, + ChanController: chanController, + WalletBalance: func() (btcutil.Amount, error) { + return walletBalance, nil + }, + ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { + errChan := make(chan error) + connect <- errChan + err := <-errChan + return false, err + }, + DisconnectPeer: func(*btcec.PublicKey) error { + return nil + }, + Graph: memGraph, + MaxPendingOpens: 10, + } + initialChans := []Channel{} + agent, err := New(testCfg, initialChans) + if err != nil { + t.Fatalf("unable to create agent: %v", err) + } + + // To ensure the heuristic doesn't block on quitting the agent, we'll + // use the agent's quit chan to signal when it should also stop. + heuristic.quit = agent.quit + + // With the autopilot agent and all its dependencies we'll start the + // primary controller goroutine. + if err := agent.Start(); err != nil { + t.Fatalf("unable to start agent: %v", err) + } + defer agent.Stop() + + // We'll send an initial "yes" response to advance the agent past its + // initial check. This will cause it to try to get directives from the + // graph. + select { + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: 1, + amt: walletBalance, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + + // Next, the agent should deliver a query to the Select method of the + // heuristic. We'll only return a single directive for a pre-chosen + // node. 
+ nodeKey, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + nodeDirective := AttachmentDirective{ + NodeKey: nodeKey, + NodeID: NewNodeID(nodeKey), + ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin, + Addrs: []net.Addr{ + &net.TCPAddr{ + IP: bytes.Repeat([]byte("a"), 16), + }, + }, + } + select { + case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + + var errChan chan error + select { + case errChan = <-connect: + case <-time.After(time.Second * 10): + t.Fatalf("agent did not attempt connection") + } + + // Signal the agent to go again, now that we've tried to connect. + agent.OnNodeUpdates() + + // The heuristic again informs the agent that we need more channels. + select { + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: 1, + amt: walletBalance, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + + // Send a directive for the same node, which already has a pending conn. + select { + case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + + // This time, the agent should skip trying to connect to the node with a + // pending connection. + select { + case <-connect: + t.Fatalf("agent should not have attempted connection") + case <-time.After(time.Second * 3): + } + + // Now, timeout the original request, which should still be waiting for + // a response. + select { + case errChan <- fmt.Errorf("connection timeout"): + case <-time.After(time.Second * 10): + t.Fatalf("agent did not receive connection timeout") + } + + // Signal the agent to try again, now that there are no pending conns. + agent.OnNodeUpdates() + + // The heuristic again informs the agent that we need more channels. 
+ select { + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: 1, + amt: walletBalance, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + + // Send a directive for the same node, which no longer has a pending conn. + select { + case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + + // This time, the agent should try the connection since the peer has + // been removed from the pending map. + select { + case <-connect: + case <-time.After(time.Second * 10): + t.Fatalf("agent have attempted connection") + } +} diff --git a/autopilot/interface.go b/autopilot/interface.go index 7f632d89..efa99298 100644 --- a/autopilot/interface.go +++ b/autopilot/interface.go @@ -85,10 +85,13 @@ type ChannelGraph interface { // AttachmentHeuristic. It details to which node a channel should be created // to, and also the parameters which should be used in the channel creation. type AttachmentDirective struct { - // PeerKey is the target node for this attachment directive. It can be + // NodeKey is the target node for this attachment directive. It can be // identified by its public key, and therefore can be used along with // a ChannelOpener implementation to execute the directive. - PeerKey *btcec.PublicKey + NodeKey *btcec.PublicKey + + // NodeID is the serialized compressed pubkey of the target node. + NodeID NodeID // ChanAmt is the size of the channel that should be opened, expressed // in satoshis. diff --git a/autopilot/prefattach.go b/autopilot/prefattach.go index fd1913ed..22362b7a 100644 --- a/autopilot/prefattach.go +++ b/autopilot/prefattach.go @@ -245,11 +245,12 @@ func (p *ConstrainedPrefAttachment) Select(self *btcec.PublicKey, g ChannelGraph } directives = append(directives, AttachmentDirective{ // TODO(roasbeef): need curve? 
- PeerKey: &btcec.PublicKey{ + NodeKey: &btcec.PublicKey{ X: pub.X, Y: pub.Y, }, - Addrs: selectedNode.Addrs(), + NodeID: NewNodeID(pub), + Addrs: selectedNode.Addrs(), }) // With the node selected, we'll add it to the set of visited diff --git a/autopilot/prefattach_test.go b/autopilot/prefattach_test.go index b05bdd52..d3440277 100644 --- a/autopilot/prefattach_test.go +++ b/autopilot/prefattach_test.go @@ -349,11 +349,11 @@ func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) { edge2Pub := edge2.Peer.PubKey() switch { - case bytes.Equal(directive.PeerKey.SerializeCompressed(), edge1Pub[:]): - case bytes.Equal(directive.PeerKey.SerializeCompressed(), edge2Pub[:]): + case bytes.Equal(directive.NodeKey.SerializeCompressed(), edge1Pub[:]): + case bytes.Equal(directive.NodeKey.SerializeCompressed(), edge2Pub[:]): default: t1.Fatalf("attached to unknown node: %x", - directive.PeerKey.SerializeCompressed()) + directive.NodeKey.SerializeCompressed()) } // As the number of funds available exceed the @@ -634,8 +634,8 @@ func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) { // We'll simulate a channel update by adding the nodes // we just establish channel with the to set of nodes // to be skipped. - skipNodes[NewNodeID(directives[0].PeerKey)] = struct{}{} - skipNodes[NewNodeID(directives[1].PeerKey)] = struct{}{} + skipNodes[NewNodeID(directives[0].NodeKey)] = struct{}{} + skipNodes[NewNodeID(directives[1].NodeKey)] = struct{}{} // If we attempt to make a call to the Select function, // without providing any new information, then we