From b2e1d6988cc355df0c33af0f2e66b7a0a7788fa8 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 30 Mar 2018 15:16:13 -0700 Subject: [PATCH 01/11] server: don't immediately add bootstrap peer to set of persistent connections --- server.go | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/server.go b/server.go index 9634040d..92ca6a42 100644 --- a/server.go +++ b/server.go @@ -690,16 +690,6 @@ func (s *server) peerBootstrapper(numTargetPeers uint32, return } - // Add bootstrapped peer as persistent to maintain - // connectivity even if we have no open channels. - targetPub := string(conn.RemotePub().SerializeCompressed()) - s.mu.Lock() - s.persistentPeers[targetPub] = struct{}{} - if _, ok := s.persistentPeersBackoff[targetPub]; !ok { - s.persistentPeersBackoff[targetPub] = defaultBackoff - } - s.mu.Unlock() - s.OutboundPeerConnected(nil, conn) }(addr) } @@ -805,16 +795,6 @@ func (s *server) peerBootstrapper(numTargetPeers uint32, return } - // Add bootstrapped peer as persistent to maintain - // connectivity even if we have no open channels. - targetPub := string(conn.RemotePub().SerializeCompressed()) - s.mu.Lock() - s.persistentPeers[targetPub] = struct{}{} - if _, ok := s.persistentPeersBackoff[targetPub]; !ok { - s.persistentPeersBackoff[targetPub] = defaultBackoff - } - s.mu.Unlock() - s.OutboundPeerConnected(nil, conn) }(addr) } From 71b8195ad016475710af43d22f8f6170c4f2a5b8 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 30 Mar 2018 15:57:15 -0700 Subject: [PATCH 02/11] funding: remove old TODO --- fundingmanager.go | 1 - 1 file changed, 1 deletion(-) diff --git a/fundingmanager.go b/fundingmanager.go index 87db5a95..2fc2e108 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -942,7 +942,6 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { return } - // TODO(roasbeef): error if funding flow already ongoing fndgLog.Infof("Recv'd fundingRequest(amt=%v, push=%v, delay=%v, "+ "pendingId=%x) from peer(%x)", amt, msg.PushAmount, msg.CsvDelay, msg.PendingChannelID, From d34c37f3dd45b38a678165495ea19aa474cab55c Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 30 Mar 2018 15:59:05 -0700 Subject: [PATCH 03/11] server: when constructing peer addr's attempt to locate proper port, fallback to default port In this commit, we fix an existing bug within the codebase: if a peer connected to us inbound, then we'd attempt to use the assigned port when re-establishing a connection to them. We fix this issue in this commit by adding a new method to look up any advertisements for the peer, and use the specified port that matches our connection attempt. If we can't find a proper advertisement, then we'll simply use the default peer port. --- server.go | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 52 insertions(+), 3 deletions(-) diff --git a/server.go b/server.go index 92ca6a42..53ae5e55 100644 --- a/server.go +++ b/server.go @@ -1285,7 +1285,6 @@ func (s *server) peerTerminationWatcher(p *peer) { // Otherwise, we'll launch a new connection request in order to // attempt to maintain a persistent connection with this peer. 
- // TODO(roasbeef): look up latest info for peer in database connReq := &connmgr.ConnReq{ Addr: p.addr, Permanent: true, @@ -1337,9 +1336,19 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound bool) { brontideConn := conn.(*brontide.Conn) + addr := conn.RemoteAddr() + pubKey := brontideConn.RemotePub() + + // We'll ensure that we locate the proper port to use within the peer's + // address for reconnecting purposes. + if tcpAddr, ok := addr.(*net.TCPAddr); ok { + targetPort := s.fetchNodeAdvertisedPort(pubKey, tcpAddr) + tcpAddr.Port = targetPort + } + peerAddr := &lnwire.NetAddress{ - IdentityKey: brontideConn.RemotePub(), - Address: conn.RemoteAddr(), + IdentityKey: pubKey, + Address: addr, ChainNet: activeNetParams.Net, } @@ -1908,3 +1917,43 @@ func computeNextBackoff(currBackoff time.Duration) time.Duration { // that the backoff can tweaked by 1/20 in either direction. return nextBackoff + (time.Duration(wiggle.Uint64()) - margin/2) } + +// fetchNodeAdvertisedPort attempts to fetch the advertised port of the target +// node. If a port isn't found, then the default port will be used. +func (s *server) fetchNodeAdvertisedPort(pub *btcec.PublicKey, + targetAddr *net.TCPAddr) int { + + // If the target port is already the default peer port, then we'll + // return that. + if targetAddr.Port == defaultPeerPort { + return defaultPeerPort + } + + node, err := s.chanDB.ChannelGraph().FetchLightningNode(pub) + + // If the node wasn't found, then we'll just return the current default + // port. + if err != nil { + return defaultPeerPort + } + + // Otherwise, we'll attempt to find a matching advertised IP, and will + // then use the port for that. + for _, addr := range node.Addresses { + // We'll only examine an address if it's a TCP address. + tcpAddr, ok := addr.(*net.TCPAddr) + if !ok { + continue + } + + // If this is the matching IP, then we'll return the port that + // it has been advertised with. + if tcpAddr.IP.Equal(targetAddr.IP) { + return tcpAddr.Port + } + } + + // If we couldn't find a matching IP, then we'll just return the + // default port. + return defaultPeerPort +} From 956d20ebdcea83528065a2fa7b77d572176faf9a Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 30 Mar 2018 18:42:27 -0700 Subject: [PATCH 04/11] funding+lnd: ensure we reconnect to new channel peers In this commit, we fix a minor bug in the prior versions of lnd. Before this commit, if we received a new inbound connection for channel creation, the channel was created, and then the peer disconnected, we wouldn't automatically reconnect. In this commit we fix this issue by overloading the WatchNewChannel method to also accept the peer's ID so we can add it to the set of persistent connections. --- fundingmanager.go | 14 +++++++------- lnd.go | 15 ++++++++++++++- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 2fc2e108..0171ebb1 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -299,8 +299,9 @@ type fundingConfig struct { // WatchNewChannel is to be called once a new channel enters the final // funding stage: waiting for on-chain confirmation. This method sends // the channel to the ChainArbitrator so it can watch for any on-chain - // events related to the channel. - WatchNewChannel func(*channeldb.OpenChannel) error + // events related to the channel. We also provide the address of the + // node we're establishing a channel with for reconnection purposes. 
+ WatchNewChannel func(*channeldb.OpenChannel, *lnwire.NetAddress) error // ReportShortChanID allows the funding manager to report the newly // discovered short channel ID of a formerly pending channel to outside @@ -952,9 +953,6 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // reservation attempt may be rejected. Note that since we're on the // responding side of a single funder workflow, we don't commit any // funds to the channel ourselves. - // - // TODO(roasbeef): assuming this was an inbound connection, replace - // port with default advertised port chainHash := chainhash.Hash(msg.ChainHash) reservation, err := f.cfg.Wallet.InitChannelReservation( amt, 0, msg.PushAmount, @@ -1355,7 +1353,8 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // Now that we've sent over our final signature for this channel, we'll // send it to the ChainArbitrator so it can watch for any on-chain // actions during this final confirmation stage. - if err := f.cfg.WatchNewChannel(completeChan); err != nil { + peerAddr := resCtx.peerAddress + if err := f.cfg.WatchNewChannel(completeChan, peerAddr); err != nil { fndgLog.Errorf("Unable to send new ChannelPoint(%v) for "+ "arbitration: %v", fundingOut, err) } @@ -1503,7 +1502,8 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { // we'll send the to be active channel to the ChainArbitrator so it can // watch for any on-chin actions before the channel has fully // confirmed. - if err := f.cfg.WatchNewChannel(completeChan); err != nil { + peerAddr := resCtx.peerAddress + if err := f.cfg.WatchNewChannel(completeChan, peerAddr); err != nil { fndgLog.Errorf("Unable to send new ChannelPoint(%v) for "+ "arbitration: %v", fundingPoint, err) } diff --git a/lnd.go b/lnd.go index 11c77bdd..c911315b 100644 --- a/lnd.go +++ b/lnd.go @@ -430,7 +430,20 @@ func lndMain() error { } return delay }, - WatchNewChannel: server.chainArb.WatchNewChannel, + WatchNewChannel: func(channel *channeldb.OpenChannel, + addr *lnwire.NetAddress) error { + + // First, we'll mark this new peer as a persistent + // re-connection purposes. + server.mu.Lock() + pubStr := string(addr.IdentityKey.SerializeCompressed()) + server.persistentPeers[pubStr] = struct{}{} + server.mu.Unlock() + + // With that taken care of, we'll send this channel to + // the chain arb so it can react to on-chain events. + return server.chainArb.WatchNewChannel(channel) + }, ReportShortChanID: func(chanPoint wire.OutPoint, sid lnwire.ShortChannelID) error { From 4480052cd6340fecb8459862a490bc0e9ea08cf3 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 30 Mar 2018 19:11:04 -0700 Subject: [PATCH 05/11] server: ensure a default backoff is always used --- server.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/server.go b/server.go index 53ae5e55..f4d8808a 100644 --- a/server.go +++ b/server.go @@ -1292,20 +1292,28 @@ func (s *server) peerTerminationWatcher(p *peer) { s.persistentConnReqs[pubStr] = append( s.persistentConnReqs[pubStr], connReq) - // Compute the subsequent backoff duration. - currBackoff := s.persistentPeersBackoff[pubStr] - nextBackoff := computeNextBackoff(currBackoff) - s.persistentPeersBackoff[pubStr] = nextBackoff + // Now, determine the appropriate backoff to use for the retry. + backoff, ok := s.persistentPeersBackoff[pubStr] + if !ok { + // If an existing backoff was unknown, use the default. 
+ backoff = defaultBackoff + } else { + // Otherwise, use a previous backoff to compute the + // subsequent randomized exponential backoff duration. + backoff = computeNextBackoff(backoff) + } + + s.persistentPeersBackoff[pubStr] = backoff // We choose not to wait group this go routine since the Connect // call can stall for arbitrarily long if we shutdown while an // outbound connection attempt is being made. go func() { srvrLog.Debugf("Scheduling connection re-establishment to "+ - "persistent peer %v in %s", p, nextBackoff) + "persistent peer %v in %s", p, backoff) select { - case <-time.After(nextBackoff): + case <-time.After(backoff): case <-s.quit: return } From 2490b9b6e3b98351b8a1e35f9ae8ba64e14d3ba3 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 30 Mar 2018 16:19:52 -0700 Subject: [PATCH 06/11] server: cleanup persistent connection retries This commits changes the behavior of our connection reestablishment, and resolves some minor issues that could lead to uncancelled requests or an infinite connection loop. - Will not attempt to Remove connection requests with an ID of 0. This can happen for reconnect attempts that get scheduled, but have not started at the time the server cancels the connection requests. - Adds a per-peer cancellation channel, that is closed upon a successful inbound or outbound connection. The goroutine spwaned to handle the reconnect by the peerTerminationWatch now selects on this channel, and skips reconnecting if it is closed before the backoff matures. - Properly computes the backoff when no entry in persistentPeersBackoff is found. Previously, a value of 0 would be returned, cause all subsequent backoff attempts to use a backoff of 0. - Cancels a peers retries and remove connections immediately after receiving an inbound connection, to mimic the structure of OutboundPeerConnected. - Cancels all persistent connection requests after calling DisconnectPeers. - Allow additional connection attempts to peers, even if there already exists a pending connection attempt. --- server.go | 143 +++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 104 insertions(+), 39 deletions(-) diff --git a/server.go b/server.go index f4d8808a..326363d0 100644 --- a/server.go +++ b/server.go @@ -51,7 +51,7 @@ var ( // maximumBackoff is the largest backoff we will permit when // reattempting connections to persistent peers. - maximumBackoff = time.Minute + maximumBackoff = time.Hour ) // server is the main server of the Lightning Network Daemon. The server houses @@ -85,6 +85,7 @@ type server struct { persistentPeers map[string]struct{} persistentPeersBackoff map[string]time.Duration persistentConnReqs map[string][]*connmgr.ConnReq + persistentRetryCancels map[string]chan struct{} // ignorePeerTermination tracks peers for which the server has initiated // a disconnect. Adding a peer to this map causes the peer termination @@ -179,6 +180,7 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl, persistentPeers: make(map[string]struct{}), persistentPeersBackoff: make(map[string]time.Duration), persistentConnReqs: make(map[string][]*connmgr.ConnReq), + persistentRetryCancels: make(map[string]chan struct{}), ignorePeerTermination: make(map[*peer]struct{}), peersByPub: make(map[string]*peer), @@ -1292,19 +1294,18 @@ func (s *server) peerTerminationWatcher(p *peer) { s.persistentConnReqs[pubStr] = append( s.persistentConnReqs[pubStr], connReq) - // Now, determine the appropriate backoff to use for the retry. 
- backoff, ok := s.persistentPeersBackoff[pubStr] - if !ok { - // If an existing backoff was unknown, use the default. - backoff = defaultBackoff - } else { - // Otherwise, use a previous backoff to compute the - // subsequent randomized exponential backoff duration. - backoff = computeNextBackoff(backoff) - } - + // Record the computed backoff in the backoff map. + backoff := s.nextPeerBackoff(pubStr) s.persistentPeersBackoff[pubStr] = backoff + // Initialize a retry canceller for this peer if one does not + // exist. + cancelChan, ok := s.persistentRetryCancels[pubStr] + if !ok { + cancelChan = make(chan struct{}) + s.persistentRetryCancels[pubStr] = cancelChan + } + // We choose not to wait group this go routine since the Connect // call can stall for arbitrarily long if we shutdown while an // outbound connection attempt is being made. @@ -1314,6 +1315,8 @@ func (s *server) peerTerminationWatcher(p *peer) { select { case <-time.After(backoff): + case <-cancelChan: + return case <-s.quit: return } @@ -1326,6 +1329,22 @@ func (s *server) peerTerminationWatcher(p *peer) { } } +// nextPeerBackoff computes the next backoff duration for a peer's pubkey using +// exponential backoff. If no previous backoff was known, the default is +// returned. +func (s *server) nextPeerBackoff(pubStr string) time.Duration { + // Now, determine the appropriate backoff to use for the retry. + backoff, ok := s.persistentPeersBackoff[pubStr] + if !ok { + // If an existing backoff was unknown, use the default. + return defaultBackoff + } + + // Otherwise, use a previous backoff to compute the + // subsequent randomized exponential backoff duration. + return computeNextBackoff(backoff) +} + // shouldRequestGraphSync returns true if the servers deems it necessary that // we sync channel graph state with the remote peer. This method is used to // avoid _always_ syncing channel graph state with each peer that connects. @@ -1434,8 +1453,6 @@ func (s *server) InboundPeerConnected(conn net.Conn) { srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr()) - localPub := s.identityPriv.PubKey() - // Check to see if we already have a connection with this peer. If so, // we may need to drop our existing connection. This prevents us from // having duplicate connections to the same peer. We forgo adding a @@ -1452,6 +1469,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) { // connection we've already established should be kept, then // we'll close out this connection s.t there's only a single // connection between us. + localPub := s.identityPriv.PubKey() if !shouldDropLocalConnection(localPub, nodePub) { srvrLog.Warnf("Received inbound connection from "+ "peer %x, but already connected, dropping conn", @@ -1472,15 +1490,9 @@ func (s *server) InboundPeerConnected(conn net.Conn) { s.ignorePeerTermination[connectedPeer] = struct{}{} } - // Next, check to see if we have any outstanding persistent connection - // requests to this peer. If so, then we'll remove all of these - // connection requests, and also delete the entry from the map. - if connReqs, ok := s.persistentConnReqs[pubStr]; ok { - for _, connReq := range connReqs { - s.connMgr.Remove(connReq.ID()) - } - delete(s.persistentConnReqs, pubStr) - } + // Lastly, cancel all pending requests. The incoming connection will not + // have an associated connection request. 
+ s.cancelConnReqs(pubStr, nil) s.peerConnected(conn, nil, false) } @@ -1495,7 +1507,6 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) return } - localPub := s.identityPriv.PubKey() nodePub := conn.(*brontide.Conn).RemotePub() pubStr := string(nodePub.SerializeCompressed()) @@ -1506,29 +1517,31 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) // this new connection. if _, ok := s.outboundPeers[pubStr]; ok { srvrLog.Debugf("Ignoring duplicate outbound connection") + if connReq != nil { + s.connMgr.Remove(connReq.ID()) + } conn.Close() return } if _, ok := s.persistentConnReqs[pubStr]; !ok && connReq != nil { srvrLog.Debugf("Ignoring cancelled outbound connection") + s.connMgr.Remove(connReq.ID()) conn.Close() return } srvrLog.Infof("Established connection to: %v", conn.RemoteAddr()) - // As we've just established an outbound connection to this peer, we'll - // cancel all other persistent connection requests and eliminate the - // entry for this peer from the map. - if connReqs, ok := s.persistentConnReqs[pubStr]; ok { - for _, pConnReq := range connReqs { - if connReq != nil && - pConnReq.ID() != connReq.ID() { - - s.connMgr.Remove(pConnReq.ID()) - } - } - delete(s.persistentConnReqs, pubStr) + if connReq != nil { + // A successful connection was returned by the connmgr. + // Immediately cancel all pending requests, excluding the + // outbound connection we just established. + ignore := connReq.ID() + s.cancelConnReqs(pubStr, &ignore) + } else { + // This was a successful connection made by some other + // subsystem. Remove all requests being managed by the connmgr. + s.cancelConnReqs(pubStr, nil) } // If we already have a connection with this peer, decide whether or not @@ -1546,6 +1559,7 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) // If our (this) connection should be dropped, then we'll do // so, in order to ensure we don't have any duplicate // connections. + localPub := s.identityPriv.PubKey() if shouldDropLocalConnection(localPub, nodePub) { srvrLog.Warnf("Established outbound connection to "+ "peer %x, but already connected, dropping conn", @@ -1573,6 +1587,55 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) s.peerConnected(conn, connReq, true) } +// UnassignedConnID is the default connection ID that a request can have before +// it actually is submitted to the connmgr. +// TODO(conner): move into connmgr package, or better, add connmgr method for +// generating atomic IDs +const UnassignedConnID uint64 = 0 + +// cancelConnReqs stops all persistent connection requests for a given pubkey. +// Any attempts initiated by the peerTerminationWatcher are canceled first. +// Afterwards, each connection request removed from the connmgr. The caller can +// optionally specify a connection ID to ignore, which prevents us from +// canceling a successful request. All persistent connreqs for the provided +// pubkey are discarded after the operationjw. +func (s *server) cancelConnReqs(pubStr string, skip *uint64) { + // First, cancel any lingering persistent retry attempts, which will + // prevent retries for any with backoffs that are still maturing. + if cancelChan, ok := s.persistentRetryCancels[pubStr]; ok { + close(cancelChan) + delete(s.persistentRetryCancels, pubStr) + } + + // Next, check to see if we have any outstanding persistent connection + // requests to this peer. 
If so, then we'll remove all of these + // connection requests, and also delete the entry from the map. + connReqs, ok := s.persistentConnReqs[pubStr] + if !ok { + return + } + + for _, connReq := range connReqs { + // Atomically capture the current request identifier. + connID := connReq.ID() + + // Skip any zero IDs, this indicates the request has not + // yet been schedule. + if connID == UnassignedConnID { + continue + } + + // Skip a particular connection ID if instructed. + if skip != nil && connID == *skip { + continue + } + + s.connMgr.Remove(connID) + } + + delete(s.persistentConnReqs, pubStr) +} + // addPeer adds the passed peer to the server's global state of all active // peers. func (s *server) addPeer(p *peer) { @@ -1710,9 +1773,9 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error { // If there's already a pending connection request for this pubkey, // then we ignore this request to ensure we don't create a redundant // connection. - if _, ok := s.persistentConnReqs[targetPub]; ok { - s.mu.Unlock() - return fmt.Errorf("connection attempt to %v is pending", addr) + if reqs, ok := s.persistentConnReqs[targetPub]; ok { + srvrLog.Warnf("Already have %d persistent connection "+ + "requests for %v, connecting anyway.", len(reqs), addr) } // If there's not already a pending or active connection to this node, @@ -1777,6 +1840,8 @@ func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error { srvrLog.Infof("Disconnecting from %v", peer) + //s.cancelConnReqs(pubStr, nil) + // If this peer was formerly a persistent connection, then we'll remove // them from this map so we don't attempt to re-connect after we // disconnect. From 0d8f4f4be45648f8b7e3919ab66583b0bbedcfd9 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 30 Mar 2018 22:31:37 -0700 Subject: [PATCH 07/11] lntest/harness: harden ConnectEnsure Alters the behavior of ConnectEnsure to initiate a connection attempt in both directions. Additionally, the wait predicate only returns true after cross checking both peer lists. --- lntest/harness.go | 86 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 60 insertions(+), 26 deletions(-) diff --git a/lntest/harness.go b/lntest/harness.go index 074a0ca0..a582894d 100644 --- a/lntest/harness.go +++ b/lntest/harness.go @@ -1,6 +1,7 @@ package lntest import ( + "errors" "fmt" "io/ioutil" "strings" @@ -271,53 +272,86 @@ func (n *NetworkHarness) NewNode(extraArgs []string) (*HarnessNode, error) { // been made, the method will block until the two nodes appear in each other's // peers list, or until the 15s timeout expires. func (n *NetworkHarness) EnsureConnected(ctx context.Context, a, b *HarnessNode) error { - bobInfo, err := b.GetInfo(ctx, &lnrpc.GetInfoRequest{}) - if err != nil { - return err + // errConnectionRequested is used to signal that a connection was + // requested successfully, which is distinct from already being + // connected to the peer. + errConnectionRequested := errors.New("connection request in progress") + + tryConnect := func(a, b *HarnessNode) error { + ctxt, _ := context.WithTimeout(ctx, 15*time.Second) + bInfo, err := b.GetInfo(ctxt, &lnrpc.GetInfoRequest{}) + if err != nil { + return err + } + + req := &lnrpc.ConnectPeerRequest{ + Addr: &lnrpc.LightningAddress{ + Pubkey: bInfo.IdentityPubkey, + Host: b.cfg.P2PAddr(), + }, + } + + ctxt, _ = context.WithTimeout(ctx, 15*time.Second) + _, err = a.ConnectPeer(ctxt, req) + switch { + + // Request was successful, wait for both to display the + // connection. 
+ case err == nil: + return errConnectionRequested + + // If the two are already connected, we return early with no + // error. + case strings.Contains(err.Error(), "already connected to peer"): + return nil + + default: + return err + } } - req := &lnrpc.ConnectPeerRequest{ - Addr: &lnrpc.LightningAddress{ - Pubkey: bobInfo.IdentityPubkey, - Host: b.cfg.P2PAddr(), - }, - } - - _, err = a.ConnectPeer(ctx, req) + aErr := tryConnect(a, b) + bErr := tryConnect(b, a) switch { - - // Request was successful, wait for both to display the connection. - case err == nil: - - // If we already have pending connection, we will wait until bob appears - // in alice's peer list. - case strings.Contains(err.Error(), "connection attempt to ") && - strings.Contains(err.Error(), " is pending"): - - // If the two are already connected, we return early with no error. - case strings.Contains(err.Error(), "already connected to peer"): + case aErr == nil && bErr == nil: + // If both reported already being connected to each other, we + // can exit early. return nil + case aErr != errConnectionRequested: + // Return any critical errors returned by either alice. + return aErr + + case bErr != errConnectionRequested: + // Return any critical errors returned by either bob. + return bErr + default: - return err + // Otherwise one or both requested a connection, so we wait for + // the peers lists to reflect the connection. } - err = WaitPredicate(func() bool { + findSelfInPeerList := func(a, b *HarnessNode) bool { // If node B is seen in the ListPeers response from node A, // then we can exit early as the connection has been fully // established. - resp, err := a.ListPeers(ctx, &lnrpc.ListPeersRequest{}) + ctxt, _ := context.WithTimeout(ctx, 15*time.Second) + resp, err := b.ListPeers(ctxt, &lnrpc.ListPeersRequest{}) if err != nil { return false } for _, peer := range resp.Peers { - if peer.PubKey == b.PubKeyStr { + if peer.PubKey == a.PubKeyStr { return true } } return false + } + + err := WaitPredicate(func() bool { + return findSelfInPeerList(a, b) && findSelfInPeerList(b, a) }, time.Second*15) if err != nil { return fmt.Errorf("peers not connected within 15 seconds") From 0449c0d50bd3f207f879efc923a08317aee88c12 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 30 Mar 2018 19:39:11 -0700 Subject: [PATCH 08/11] lntest/node: advertise external IP in itests --- lntest/node.go | 1 + 1 file changed, 1 insertion(+) diff --git a/lntest/node.go b/lntest/node.go index 0fd0b97d..0b377b63 100644 --- a/lntest/node.go +++ b/lntest/node.go @@ -147,6 +147,7 @@ func (cfg nodeConfig) genArgs() []string { args = append(args, fmt.Sprintf("--rpclisten=%v", cfg.RPCAddr())) args = append(args, fmt.Sprintf("--restlisten=%v", cfg.RESTAddr())) args = append(args, fmt.Sprintf("--listen=%v", cfg.P2PAddr())) + args = append(args, fmt.Sprintf("--externalip=%v", cfg.P2PAddr())) args = append(args, fmt.Sprintf("--logdir=%v", cfg.LogDir)) args = append(args, fmt.Sprintf("--datadir=%v", cfg.DataDir)) args = append(args, fmt.Sprintf("--tlscertpath=%v", cfg.TLSCertPath)) From 517135dd9a0746aee56236945d3604f802bce40c Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 30 Mar 2018 19:59:58 -0700 Subject: [PATCH 09/11] lnd_test: use EnsureConnected in itests hot spots Uses EnsureConnected when reconnecting nodes in: - switch offline delivery persistence - graph topology notifications - channel funding persistence --- lnd_test.go | 49 +++++++++++++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 22 deletions(-) diff 
--git a/lnd_test.go b/lnd_test.go index 36a2fcb4..bdbac159 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -1147,24 +1147,8 @@ func testChannelFundingPersistence(net *lntest.NetworkHarness, t *harnessTest) { // The following block ensures that after both nodes have restarted, // they have reconnected before the execution of the next test. - peersTimeout := time.After(15 * time.Second) - checkPeersTick := time.NewTicker(100 * time.Millisecond) - defer checkPeersTick.Stop() -peersPoll: - for { - select { - case <-peersTimeout: - t.Fatalf("peers unable to reconnect after restart") - case <-checkPeersTick.C: - peers, err := carol.ListPeers(ctxb, - &lnrpc.ListPeersRequest{}) - if err != nil { - t.Fatalf("ListPeers error: %v\n", err) - } - if len(peers.Peers) > 0 { - break peersPoll - } - } + if err := net.EnsureConnected(ctxb, net.Alice, carol); err != nil { + t.Fatalf("peers unable to reconnect after restart: %v", err) } // Next, mine enough blocks s.t the channel will open with a single @@ -1251,6 +1235,11 @@ func testChannelBalance(net *lntest.NetworkHarness, t *harnessTest) { } } + // Before beginning, make sure alice and bob are connected. + if err := net.EnsureConnected(ctx, net.Alice, net.Bob); err != nil { + t.Fatalf("unable to connect alice and bob: %v", err) + } + chanPoint := openChannelAndAssert(ctx, t, net, net.Alice, net.Bob, amount, 0) @@ -4610,7 +4599,7 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) // and Carol. Note that we will also receive a node announcement from // Bob, since a node will update its node announcement after a new // channel is opened. - if err := net.ConnectNodes(ctxb, net.Alice, net.Bob); err != nil { + if err := net.EnsureConnected(ctxb, net.Alice, net.Bob); err != nil { t.Fatalf("unable to connect alice to bob: %v", err) } @@ -7172,7 +7161,7 @@ func testSwitchOfflineDelivery(net *lntest.NetworkHarness, t *harnessTest) { // Now that the settles have reached Dave, reconnect him with Alice, // allowing the settles to return to the sender. ctxt, _ = context.WithTimeout(ctxb, timeout) - if err := net.ConnectNodes(ctxt, dave, net.Alice); err != nil { + if err := net.EnsureConnected(ctxt, dave, net.Alice); err != nil { t.Fatalf("unable to reconnect alice to dave: %v", err) } @@ -7481,8 +7470,16 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness t.Fatalf("unable to reconnect alice to dave: %v", err) } - // After Dave reconnects, the settles should be propagated all the way - // back to the sender. All nodes should report no active htlcs. + // Force Dave and Alice to reconnect before waiting for the htlcs to + // clear. + ctxt, _ = context.WithTimeout(ctxb, timeout) + err = net.EnsureConnected(ctxt, dave, net.Alice) + if err != nil { + t.Fatalf("unable to reconnect dave and carol: %v", err) + } + + // After reconnection succeeds, the settles should be propagated all the + // way back to the sender. All nodes should report no active htlcs. err = lntest.WaitPredicate(func() bool { return assertNumActiveHtlcs(nodes, 0) }, time.Second*15) @@ -7528,6 +7525,14 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness payReqs = []string{resp.PaymentRequest} + // Before completing the final payment request, ensure that the + // connection between Dave and Carol has been healed. 
+ ctxt, _ = context.WithTimeout(ctxb, timeout) + err = net.EnsureConnected(ctxt, dave, carol) + if err != nil { + t.Fatalf("unable to reconnect dave and carol: %v", err) + } + // Using Carol as the source, pay to the 5 invoices from Bob created // above. ctxt, _ = context.WithTimeout(ctxb, timeout) From 3eb53467e08b402cdab8e97a270ff8571d774989 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 30 Mar 2018 23:41:59 -0700 Subject: [PATCH 10/11] fundingmanager_test: init WatchNewChannel --- fundingmanager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 012b7d32..27d9271b 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -300,7 +300,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, return uint16(lnwallet.MaxHTLCNumber / 2) }, ArbiterChan: arbiterChan, - WatchNewChannel: func(*channeldb.OpenChannel) error { + WatchNewChannel: func(*channeldb.OpenChannel, *lnwire.NetAddress) error { return nil }, ReportShortChanID: func(wire.OutPoint, lnwire.ShortChannelID) error { From 7db7c9c493532201686ab083780c4a8476236b5f Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Sat, 31 Mar 2018 00:45:35 -0700 Subject: [PATCH 11/11] lnd: correct docs in WatchNewChannel --- lnd.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lnd.go b/lnd.go index c911315b..9a0f767f 100644 --- a/lnd.go +++ b/lnd.go @@ -433,8 +433,8 @@ func lndMain() error { WatchNewChannel: func(channel *channeldb.OpenChannel, addr *lnwire.NetAddress) error { - // First, we'll mark this new peer as a persistent - // re-connection purposes. + // First, we'll mark this new peer as a persistent peer + // for re-connection purposes. server.mu.Lock() pubStr := string(addr.IdentityKey.SerializeCompressed()) server.persistentPeers[pubStr] = struct{}{}
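
Editor's note: the series relies on a randomized exponential backoff for persistent-peer reconnects (computeNextBackoff / nextPeerBackoff in server.go), but only the tail of that helper appears in the hunks above. The following is a minimal, standalone sketch of the idea — double the previous delay, cap it, add a small jitter — and not lnd's actual implementation; the constant values are assumptions (only maximumBackoff = time.Hour comes from patch 06, defaultBackoff here is a placeholder).

package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	// Placeholder values for illustration; patch 06 raises lnd's
	// maximumBackoff to one hour, defaultBackoff is assumed here.
	defaultBackoff = 5 * time.Second
	maximumBackoff = time.Hour
)

// nextBackoff sketches a randomized exponential backoff: double the previous
// delay, cap it at maximumBackoff, then add up to ~1/20th of jitter so peers
// that disconnected at the same time don't all retry in lockstep.
func nextBackoff(curr time.Duration) time.Duration {
	if curr == 0 {
		return defaultBackoff
	}

	next := 2 * curr
	if next > maximumBackoff {
		next = maximumBackoff
	}

	// Jitter drawn from [0, margin], re-centered around zero so the
	// expected value stays at the capped exponential delay.
	margin := next / 20
	wiggle := time.Duration(rand.Int63n(int64(margin) + 1))

	return next + wiggle - margin/2
}

func main() {
	// Print a few successive backoffs to show the growth and cap.
	b := defaultBackoff
	for i := 0; i < 5; i++ {
		fmt.Println(b)
		b = nextBackoff(b)
	}
}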