diff --git a/peer.go b/peer.go
index 74ff4c74..6ce9302d 100644
--- a/peer.go
+++ b/peer.go
@@ -43,7 +43,7 @@ const (
 	idleTimeout = 5 * time.Minute

 	// writeMessageTimeout is the timeout used when writing a message to peer.
-	writeMessageTimeout = 10 * time.Second
+	writeMessageTimeout = 50 * time.Second

 	// outgoingQueueLen is the buffer size of the channel which houses
 	// messages to be sent across the wire, requested by objects outside
@@ -567,8 +567,18 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,

 // WaitForDisconnect waits until the peer has disconnected. A peer may be
 // disconnected if the local or remote side terminating the connection, or an
-// irrecoverable protocol error has been encountered.
-func (p *peer) WaitForDisconnect() {
+// irrecoverable protocol error has been encountered. This method will only
+// begin watching the peer's waitgroup after the ready channel or the peer's
+// quit channel is signaled. The ready channel should only be signaled if a
+// call to Start returns no error. Otherwise, if the peer fails to start,
+// calling Disconnect will signal the quit channel and the method will not
+// block, since no goroutines were spawned.
+func (p *peer) WaitForDisconnect(ready chan struct{}) {
+	select {
+	case <-ready:
+	case <-p.quit:
+	}
+
 	p.wg.Wait()
 }

diff --git a/server.go b/server.go
index 2729fa76..bf770092 100644
--- a/server.go
+++ b/server.go
@@ -1917,152 +1917,6 @@ func (s *server) findPeerByPubStr(pubStr string) (*peer, error) {
 	return peer, nil
 }

-// peerTerminationWatcher waits until a peer has been disconnected unexpectedly,
-// and then cleans up all resources allocated to the peer, notifies relevant
-// sub-systems of its demise, and finally handles re-connecting to the peer if
-// it's persistent. If the server intentionally disconnects a peer, it should
-// have a corresponding entry in the ignorePeerTermination map which will cause
-// the cleanup routine to exit early.
-//
-// NOTE: This MUST be launched as a goroutine.
-func (s *server) peerTerminationWatcher(p *peer) {
-	defer s.wg.Done()
-
-	p.WaitForDisconnect()
-
-	srvrLog.Debugf("Peer %v has been disconnected", p)
-
-	// If the server is exiting then we can bail out early ourselves as all
-	// the other sub-systems will already be shutting down.
-	if s.Stopped() {
-		return
-	}
-
-	// Next, we'll cancel all pending funding reservations with this node.
-	// If we tried to initiate any funding flows that haven't yet finished,
-	// then we need to unlock those committed outputs so they're still
-	// available for use.
-	s.fundingMgr.CancelPeerReservations(p.PubKey())
-
-	pubKey := p.addr.IdentityKey
-
-	// We'll also inform the gossiper that this peer is no longer active,
-	// so we don't need to maintain sync state for it any longer.
-	s.authGossiper.PruneSyncState(pubKey)
-
-	// Tell the switch to remove all links associated with this peer.
-	// Passing nil as the target link indicates that all links associated
-	// with this interface should be closed.
-	//
-	// TODO(roasbeef): instead add a PurgeInterfaceLinks function?
-	links, err := p.server.htlcSwitch.GetLinksByInterface(p.pubKeyBytes)
-	if err != nil {
-		srvrLog.Errorf("unable to get channel links: %v", err)
-	}
-
-	for _, link := range links {
-		p.server.htlcSwitch.RemoveLink(link.ChanID())
-	}
-
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	// If the server has already removed this peer, we can short circuit the
-	// peer termination watcher and skip cleanup.
-	if _, ok := s.ignorePeerTermination[p]; ok {
-		delete(s.ignorePeerTermination, p)
-
-		pubKey := p.PubKey()
-		pubStr := string(pubKey[:])
-
-		// If a connection callback is present, we'll go ahead and
-		// execute it now that previous peer has fully disconnected. If
-		// the callback is not present, this likely implies the peer was
-		// purposefully disconnected via RPC, and that no reconnect
-		// should be attempted.
-		connCallback, ok := s.scheduledPeerConnection[pubStr]
-		if ok {
-			delete(s.scheduledPeerConnection, pubStr)
-			connCallback()
-		}
-		return
-	}
-
-	// First, cleanup any remaining state the server has regarding the peer
-	// in question.
-	s.removePeer(p)
-
-	// Next, check to see if this is a persistent peer or not.
-	pubStr := string(pubKey.SerializeCompressed())
-	_, ok := s.persistentPeers[pubStr]
-	if ok {
-		// We'll only need to re-launch a connection request if one
-		// isn't already currently pending.
-		if _, ok := s.persistentConnReqs[pubStr]; ok {
-			return
-		}
-
-		// We'll ensure that we locate an advertised address to use
-		// within the peer's address for reconnection purposes.
-		//
-		// TODO(roasbeef): use them all?
-		if p.inbound {
-			advertisedAddr, err := s.fetchNodeAdvertisedAddr(
-				pubKey,
-			)
-			if err != nil {
-				srvrLog.Errorf("Unable to retrieve advertised "+
-					"address for node %x: %v",
-					pubKey.SerializeCompressed(), err)
-			} else {
-				p.addr.Address = advertisedAddr
-			}
-		}
-
-		// Otherwise, we'll launch a new connection request in order to
-		// attempt to maintain a persistent connection with this peer.
-		connReq := &connmgr.ConnReq{
-			Addr:      p.addr,
-			Permanent: true,
-		}
-		s.persistentConnReqs[pubStr] = append(
-			s.persistentConnReqs[pubStr], connReq)
-
-		// Record the computed backoff in the backoff map.
-		backoff := s.nextPeerBackoff(pubStr, p.StartTime())
-		s.persistentPeersBackoff[pubStr] = backoff
-
-		// Initialize a retry canceller for this peer if one does not
-		// exist.
-		cancelChan, ok := s.persistentRetryCancels[pubStr]
-		if !ok {
-			cancelChan = make(chan struct{})
-			s.persistentRetryCancels[pubStr] = cancelChan
-		}
-
-		// We choose not to wait group this go routine since the Connect
-		// call can stall for arbitrarily long if we shutdown while an
-		// outbound connection attempt is being made.
-		go func() {
-			srvrLog.Debugf("Scheduling connection re-establishment to "+
-				"persistent peer %v in %s", p, backoff)
-
-			select {
-			case <-time.After(backoff):
-			case <-cancelChan:
-				return
-			case <-s.quit:
-				return
-			}
-
-			srvrLog.Debugf("Attempting to re-establish persistent "+
-				"connection to peer %v", p)
-
-			s.connMgr.Connect(connReq)
-		}()
-	}
-}
-
 // nextPeerBackoff computes the next backoff duration for a peer's pubkey using
 // exponential backoff. If no previous backoff was known, the default is
 // returned.
@@ -2110,64 +1964,6 @@ func (s *server) shouldRequestGraphSync() bool {
 	return len(s.peersByPub) <= 2
 }

-// peerConnected is a function that handles initialization a newly connected
-// peer by adding it to the server's global list of all active peers, and
-// starting all the goroutines the peer needs to function properly. The inbound
-// boolean should be true if the peer initiated the connection to us.
-func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
-	inbound bool) {
-
-	brontideConn := conn.(*brontide.Conn)
-	addr := conn.RemoteAddr()
-	pubKey := brontideConn.RemotePub()
-
-	srvrLog.Infof("finalizing connection to %x, inbound=%v",
-		pubKey.SerializeCompressed(), inbound)
-
-	peerAddr := &lnwire.NetAddress{
-		IdentityKey: pubKey,
-		Address:     addr,
-		ChainNet:    activeNetParams.Net,
-	}
-
-	// With the brontide connection established, we'll now craft the local
-	// feature vector to advertise to the remote node.
-	localFeatures := lnwire.NewRawFeatureVector()
-
-	// We'll signal that we understand the data loss protection feature,
-	// and also that we support the new gossip query features.
-	localFeatures.Set(lnwire.DataLossProtectOptional)
-	localFeatures.Set(lnwire.GossipQueriesOptional)
-
-	// We'll only request a full channel graph sync if we detect that that
-	// we aren't fully synced yet.
-	if s.shouldRequestGraphSync() {
-		// TODO(roasbeef): only do so if gossiper doesn't have active
-		// peers?
-		localFeatures.Set(lnwire.InitialRoutingSync)
-	}
-
-	// Now that we've established a connection, create a peer, and it to
-	// the set of currently active peers.
-	p, err := newPeer(conn, connReq, s, peerAddr, inbound, localFeatures)
-	if err != nil {
-		srvrLog.Errorf("unable to create peer %v", err)
-		return
-	}
-
-	// TODO(roasbeef): update IP address for link-node
-	// * also mark last-seen, do it one single transaction?
-
-	// Attempt to start the peer, if we're unable to do so, then disconnect
-	// this peer.
-	if err := p.Start(); err != nil {
-		p.Disconnect(errors.Errorf("unable to start peer: %v", err))
-		return
-	}
-
-	s.addPeer(p)
-}
-
 // shouldDropConnection determines if our local connection to a remote peer
 // should be dropped in the case of concurrent connection establishment. In
 // order to deterministically decide which connection should be dropped, we'll
@@ -2426,6 +2222,63 @@ func (s *server) cancelConnReqs(pubStr string, skip *uint64) {
 	delete(s.persistentConnReqs, pubStr)
 }

+// peerConnected is a function that handles the initialization of a newly
+// connected peer by adding it to the server's global list of all active peers,
+// and starting all the goroutines the peer needs to function properly. The
+// inbound boolean should be true if the peer initiated the connection to us.
+func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
+	inbound bool) {
+
+	brontideConn := conn.(*brontide.Conn)
+	addr := conn.RemoteAddr()
+	pubKey := brontideConn.RemotePub()
+
+	srvrLog.Infof("Finalizing connection to %x, inbound=%v",
+		pubKey.SerializeCompressed(), inbound)
+
+	peerAddr := &lnwire.NetAddress{
+		IdentityKey: pubKey,
+		Address:     addr,
+		ChainNet:    activeNetParams.Net,
+	}
+
+	// With the brontide connection established, we'll now craft the local
+	// feature vector to advertise to the remote node.
+	localFeatures := lnwire.NewRawFeatureVector()
+
+	// We'll signal that we understand the data loss protection feature,
+	// and also that we support the new gossip query features.
+	localFeatures.Set(lnwire.DataLossProtectOptional)
+	localFeatures.Set(lnwire.GossipQueriesOptional)
+
+	// We'll only request a full channel graph sync if we detect that we
+	// aren't fully synced yet.
+	if s.shouldRequestGraphSync() {
+		// TODO(roasbeef): only do so if gossiper doesn't have active
+		// peers?
+		localFeatures.Set(lnwire.InitialRoutingSync)
+	}
+
+	// Now that we've established a connection, create a peer, and add it
+	// to the set of currently active peers.
+	p, err := newPeer(conn, connReq, s, peerAddr, inbound, localFeatures)
+	if err != nil {
+		srvrLog.Errorf("unable to create peer %v", err)
+		return
+	}
+
+	// TODO(roasbeef): update IP address for link-node
+	// * also mark last-seen, do it one single transaction?
+
+	s.addPeer(p)
+
+	// Dispatch a goroutine to asynchronously start the peer. This process
+	// includes sending and receiving Init messages, which would be a DOS
+	// vector if we held the server's mutex throughout the procedure.
+	s.wg.Add(1)
+	go s.peerInitializer(p)
+}
+
 // addPeer adds the passed peer to the server's global state of all active
 // peers.
 func (s *server) addPeer(p *peer) {
@@ -2453,15 +2306,49 @@ func (s *server) addPeer(p *peer) {
 	} else {
 		s.outboundPeers[pubStr] = p
 	}
+}

-	// Launch a goroutine to watch for the unexpected termination of this
-	// peer, which will ensure all resources are properly cleaned up, and
-	// re-establish persistent connections when necessary. The peer
-	// termination watcher will be short circuited if the peer is ever
-	// added to the ignorePeerTermination map, indicating that the server
-	// has already handled the removal of this peer.
+// peerInitializer asynchronously starts a newly connected peer after it has
+// been added to the server's peer map. This method sets up a
+// peerTerminationWatcher for the given peer, and ensures that it executes even
+// if the peer failed to start. In the event of a successful connection, this
+// method reads the negotiated, local feature-bits and spawns the appropriate
+// graph synchronization method. Any registered clients of NotifyWhenOnline will
+// be signaled of the new peer once the method returns.
+//
+// NOTE: This MUST be launched as a goroutine.
+func (s *server) peerInitializer(p *peer) {
+	defer s.wg.Done()
+
+	// Avoid initializing peers while the server is exiting.
+	if s.Stopped() {
+		return
+	}
+
+	// Create a channel that will be used to signal a successful start of
+	// the link. This prevents the peer termination watcher from beginning
+	// its duty too early.
+	ready := make(chan struct{})
+
+	// Before starting the peer, launch a goroutine to watch for the
+	// unexpected termination of this peer, which will ensure all resources
+	// are properly cleaned up, and re-establish persistent connections when
+	// necessary. The peer termination watcher will be short circuited if
+	// the peer is ever added to the ignorePeerTermination map, indicating
+	// that the server has already handled the removal of this peer.
 	s.wg.Add(1)
-	go s.peerTerminationWatcher(p)
+	go s.peerTerminationWatcher(p, ready)
+
+	// Start the peer! If an error occurs, we Disconnect the peer, which
+	// will unblock the peerTerminationWatcher.
+	if err := p.Start(); err != nil {
+		p.Disconnect(errors.Errorf("unable to start peer: %v", err))
+		return
+	}
+
+	// Otherwise, signal to the peerTerminationWatcher that the peer startup
+	// was successful, and to begin watching the peer's wait group.
+	close(ready)

 	switch {
 	// If the remote peer knows of the new gossip queries feature, then
@@ -2490,6 +2377,11 @@ func (s *server) addPeer(p *peer) {
 		go s.authGossiper.SynchronizeNode(p)
 	}

+	pubStr := string(p.addr.IdentityKey.SerializeCompressed())
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
 	// Check if there are listeners waiting for this peer to come online.
 	for _, peerChan := range s.peerConnectedListeners[pubStr] {
 		select {
@@ -2501,6 +2393,155 @@ func (s *server) addPeer(p *peer) {
 	delete(s.peerConnectedListeners, pubStr)
 }

+// peerTerminationWatcher waits until a peer has been disconnected unexpectedly,
+// and then cleans up all resources allocated to the peer, notifies relevant
+// sub-systems of its demise, and finally handles re-connecting to the peer if
+// it's persistent. If the server intentionally disconnects a peer, it should
+// have a corresponding entry in the ignorePeerTermination map which will cause
+// the cleanup routine to exit early. The passed `ready` chan is used to
+// synchronize when WaitForDisconnect should begin watching the peer's
+// waitgroup. The ready chan should only be signaled if the peer starts
+// successfully; otherwise, the peer should be disconnected instead.
+//
+// NOTE: This MUST be launched as a goroutine.
+func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
+	defer s.wg.Done()
+
+	p.WaitForDisconnect(ready)
+
+	srvrLog.Debugf("Peer %v has been disconnected", p)
+
+	// If the server is exiting then we can bail out early ourselves as all
+	// the other sub-systems will already be shutting down.
+	if s.Stopped() {
+		return
+	}
+
+	// Next, we'll cancel all pending funding reservations with this node.
+	// If we tried to initiate any funding flows that haven't yet finished,
+	// then we need to unlock those committed outputs so they're still
+	// available for use.
+	s.fundingMgr.CancelPeerReservations(p.PubKey())
+
+	pubKey := p.addr.IdentityKey
+
+	// We'll also inform the gossiper that this peer is no longer active,
+	// so we don't need to maintain sync state for it any longer.
+	s.authGossiper.PruneSyncState(pubKey)
+
+	// Tell the switch to remove all links associated with this peer.
+	// Passing nil as the target link indicates that all links associated
+	// with this interface should be closed.
+	//
+	// TODO(roasbeef): instead add a PurgeInterfaceLinks function?
+	links, err := p.server.htlcSwitch.GetLinksByInterface(p.pubKeyBytes)
+	if err != nil {
+		srvrLog.Errorf("unable to get channel links: %v", err)
+	}
+
+	for _, link := range links {
+		p.server.htlcSwitch.RemoveLink(link.ChanID())
+	}
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// If the server has already removed this peer, we can short circuit the
+	// peer termination watcher and skip cleanup.
+	if _, ok := s.ignorePeerTermination[p]; ok {
+		delete(s.ignorePeerTermination, p)
+
+		pubKey := p.PubKey()
+		pubStr := string(pubKey[:])
+
+		// If a connection callback is present, we'll go ahead and
+		// execute it now that previous peer has fully disconnected. If
+		// the callback is not present, this likely implies the peer was
+		// purposefully disconnected via RPC, and that no reconnect
+		// should be attempted.
+		connCallback, ok := s.scheduledPeerConnection[pubStr]
+		if ok {
+			delete(s.scheduledPeerConnection, pubStr)
+			connCallback()
+		}
+		return
+	}
+
+	// First, cleanup any remaining state the server has regarding the peer
+	// in question.
+	s.removePeer(p)
+
+	// Next, check to see if this is a persistent peer or not.
+	pubStr := string(pubKey.SerializeCompressed())
+	_, ok := s.persistentPeers[pubStr]
+	if ok {
+		// We'll only need to re-launch a connection request if one
+		// isn't already currently pending.
+		if _, ok := s.persistentConnReqs[pubStr]; ok {
+			return
+		}
+
+		// We'll ensure that we locate an advertised address to use
+		// within the peer's address for reconnection purposes.
+		//
+		// TODO(roasbeef): use them all?
+		if p.inbound {
+			advertisedAddr, err := s.fetchNodeAdvertisedAddr(
+				pubKey,
+			)
+			if err != nil {
+				srvrLog.Errorf("Unable to retrieve advertised "+
+					"address for node %x: %v",
+					pubKey.SerializeCompressed(), err)
+			} else {
+				p.addr.Address = advertisedAddr
+			}
+		}
+
+		// Otherwise, we'll launch a new connection request in order to
+		// attempt to maintain a persistent connection with this peer.
+		connReq := &connmgr.ConnReq{
+			Addr:      p.addr,
+			Permanent: true,
+		}
+		s.persistentConnReqs[pubStr] = append(
+			s.persistentConnReqs[pubStr], connReq)
+
+		// Record the computed backoff in the backoff map.
+		backoff := s.nextPeerBackoff(pubStr, p.StartTime())
+		s.persistentPeersBackoff[pubStr] = backoff
+
+		// Initialize a retry canceller for this peer if one does not
+		// exist.
+		cancelChan, ok := s.persistentRetryCancels[pubStr]
+		if !ok {
+			cancelChan = make(chan struct{})
+			s.persistentRetryCancels[pubStr] = cancelChan
+		}
+
+		// We choose not to wait group this go routine since the Connect
+		// call can stall for arbitrarily long if we shutdown while an
+		// outbound connection attempt is being made.
+		go func() {
+			srvrLog.Debugf("Scheduling connection re-establishment to "+
+				"persistent peer %v in %s", p, backoff)
+
+			select {
+			case <-time.After(backoff):
+			case <-cancelChan:
+				return
+			case <-s.quit:
+				return
+			}
+
+			srvrLog.Debugf("Attempting to re-establish persistent "+
+				"connection to peer %v", p)
+
+			s.connMgr.Connect(connReq)
+		}()
+	}
+}
+
 // removePeer removes the passed peer from the server's state of all active
 // peers.
 func (s *server) removePeer(p *peer) {
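Note on the writeMessageTimeout bump: the constant bounds how long a single message write to a peer may take before the write deadline fires. The sketch below only illustrates that general deadline pattern on a plain net.Conn; writeWithTimeout and the net.Pipe usage are illustrative and are not lnd's actual writeMessage implementation.

package main

import (
	"fmt"
	"net"
	"time"
)

// writeWithTimeout bounds a single message write with a write deadline. This
// only illustrates the general pattern; it is not lnd's writeMessage.
func writeWithTimeout(conn net.Conn, msg []byte, timeout time.Duration) error {
	if err := conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
		return err
	}
	_, err := conn.Write(msg)
	return err
}

func main() {
	// net.Pipe is an unbuffered in-memory connection; with nothing reading
	// the remote end, the write times out instead of blocking forever.
	local, _ := net.Pipe()
	err := writeWithTimeout(local, []byte("init"), 50*time.Millisecond)
	fmt.Println("write result:", err)
}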
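Sketch of the start/ready handshake introduced by peerInitializer and the new WaitForDisconnect signature. The types here (fakePeer, initialize) are simplified stand-ins rather than lnd's peer and server structs; the point is that the watcher blocks on ready or quit before touching the wait group, so a failed Start, which only signals quit, can never leave the watcher waiting on goroutines that were never spawned.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// fakePeer is a stand-in for the real peer struct, modeling only the pieces
// that matter for the ready/quit handshake.
type fakePeer struct {
	quit chan struct{}
	wg   sync.WaitGroup
	fail bool
}

// Start spawns the peer's goroutines on success, or returns an error without
// spawning anything, which is the case the handshake protects against.
func (p *fakePeer) Start() error {
	if p.fail {
		return errors.New("init handshake failed")
	}
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		// Read/write pumps would run here.
	}()
	return nil
}

// Disconnect signals the quit channel, unblocking WaitForDisconnect even when
// Start never succeeded.
func (p *fakePeer) Disconnect() {
	close(p.quit)
}

// WaitForDisconnect mirrors the new signature: wait for ready or quit before
// watching the wait group.
func (p *fakePeer) WaitForDisconnect(ready chan struct{}) {
	select {
	case <-ready:
	case <-p.quit:
	}
	p.wg.Wait()
}

// initialize mirrors peerInitializer: the watcher launches before Start, and
// ready is only closed once Start succeeds.
func initialize(p *fakePeer, watcherDone chan struct{}) {
	ready := make(chan struct{})
	go func() {
		p.WaitForDisconnect(ready)
		close(watcherDone)
	}()

	if err := p.Start(); err != nil {
		p.Disconnect()
		return
	}
	close(ready)
}

func main() {
	p := &fakePeer{quit: make(chan struct{}), fail: true}
	watcherDone := make(chan struct{})
	initialize(p, watcherDone)

	// The watcher exits promptly: quit was signaled and no goroutines were
	// ever added to the wait group.
	<-watcherDone
	fmt.Println("watcher exited after failed start")
}

Launching the watcher before Start also means cleanup runs exactly once regardless of where startup fails.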
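The reconnection logic in peerTerminationWatcher is moved rather than changed, but its cancellation structure is easy to miss. Below is a condensed, hypothetical model (scheduleReconnect, connect, cancel, and quit are illustrative names, not lnd APIs): the backoff timer, the per-peer retry canceller, and server shutdown race in a single select, and only the timer path attempts a connection.

package main

import (
	"fmt"
	"time"
)

// scheduleReconnect is a condensed model of the un-waitgrouped goroutine in
// peerTerminationWatcher: sleep out the backoff, but exit immediately if the
// retry is cancelled or the server is shutting down.
func scheduleReconnect(backoff time.Duration, cancel, quit <-chan struct{},
	connect func()) {

	go func() {
		select {
		case <-time.After(backoff):
		case <-cancel:
			return
		case <-quit:
			return
		}
		connect()
	}()
}

func main() {
	cancel := make(chan struct{})
	quit := make(chan struct{})

	// The timer path is the only one that reaches connect.
	done := make(chan struct{})
	scheduleReconnect(10*time.Millisecond, cancel, quit, func() {
		fmt.Println("re-establishing persistent connection")
		close(done)
	})
	<-done

	// Closing the retry canceller abandons a pending attempt before the
	// backoff elapses; connect is never called.
	close(cancel)
	scheduleReconnect(time.Hour, cancel, quit, func() {
		fmt.Println("never printed")
	})
	time.Sleep(20 * time.Millisecond)
}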