diff --git a/peer.go b/peer.go index fd7c0f89..c57c01fa 100644 --- a/peer.go +++ b/peer.go @@ -342,21 +342,6 @@ func (p *peer) Disconnect() { close(p.quit) - // If this connection was established persistently, then notify the - // connection manager that the peer has been disconnected. - if p.connReq != nil { - p.server.connMgr.Disconnect(p.connReq.ID()) - } - - // Launch a goroutine to clean up the remaining resources. - go func() { - // Tell the switch to unregister all links associated with this - // peer. Passing nil as the target link indicates that all - // links associated with this interface should be closed. - p.server.htlcSwitch.UnregisterLink(p.addr.IdentityKey, nil) - - p.server.donePeers <- p - }() } // String returns the string representation of this peer. diff --git a/server.go b/server.go index 0bce7ece..b60a258a 100644 --- a/server.go +++ b/server.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "crypto/rand" "crypto/sha256" "encoding/hex" @@ -52,6 +53,10 @@ type server struct { peersByID map[int32]*peer peersByPub map[string]*peer + persistentPeers map[string]struct{} + inboundPeers map[string]*peer + outboundPeers map[string]*peer + rpcServer *rpcServer chainNotifier chainntnfs.ChainNotifier @@ -137,10 +142,13 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, sphinx: sphinx.NewRouter(privKey, activeNetParams.Params), lightningID: sha256.Sum256(serializedPubKey), + persistentPeers: make(map[string]struct{}), persistentConnReqs: make(map[string][]*connmgr.ConnReq), - peersByID: make(map[int32]*peer), - peersByPub: make(map[string]*peer), + peersByID: make(map[int32]*peer), + peersByPub: make(map[string]*peer), + inboundPeers: make(map[string]*peer), + outboundPeers: make(map[string]*peer), newPeers: make(chan *peer, 10), donePeers: make(chan *peer, 10), @@ -625,6 +633,55 @@ func (s *server) findPeer(peerKey *btcec.PublicKey) (*peer, error) { return peer, nil } +// peerTerminationWatcher waits until a peer has been 
disconnected, and then +// cleans up all resources allocated to the peer, notifies relevant sub-systems +// of its demise, and finally handles re-connecting to the peer if it's +// persistent. +// +// NOTE: This MUST be launched as a goroutine. +func (s *server) peerTerminationWatcher(p *peer) { + p.WaitForDisconnect() + + srvrLog.Debugf("Peer %v has been disconnected", p) + + // Tell the switch to unregister all links associated with this peer. + // Passing nil as the target link indicates that all + // links associated with this interface should be closed. + p.server.htlcSwitch.UnregisterLink(p.addr.IdentityKey, nil) + + // Send the peer to be garbage collected by the server. + p.server.donePeers <- p + + // If this peer had an active persistent connection request, then we + // can remove this as we manually decide below if we should attempt to + // re-connect. + if p.connReq != nil { + s.connMgr.Remove(p.connReq.ID()) + } + + // Next, check to see if this is a persistent peer or not. + pubStr := string(p.addr.IdentityKey.SerializeCompressed()) + if _, ok := s.persistentPeers[pubStr]; ok { + srvrLog.Debugf("Attempting to re-establish persistent "+ + "connection to peer %v", p) + + // If so, then we'll attempt to re-establish a persistent + // connection to the peer. + // TODO(roasbeef): get latest port info? + connReq := &connmgr.ConnReq{ + Addr: p.addr, + Permanent: true, + } + + s.pendingConnMtx.Lock() + s.persistentConnReqs[pubStr] = append(s.persistentConnReqs[pubStr], + connReq) + s.pendingConnMtx.Unlock() + + go s.connMgr.Connect(connReq) + } +} + // peerConnected is a function that handles initialization a newly connected // peer by adding it to the server's global list of all active peers, and // starting all the goroutines the peer needs to function properly. 
@@ -641,15 +698,14 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound p, err := newPeer(conn, connReq, s, peerAddr, inbound) if err != nil { srvrLog.Errorf("unable to create peer %v", err) - if connReq != nil { - s.connMgr.Remove(connReq.ID()) - } return } // TODO(roasbeef): update IP address for link-node // * also mark last-seen, do it one single transaction? + // Attempt to start the peer, if we're unable to do so, then disconnect + // this peer. if err := p.Start(); err != nil { srvrLog.Errorf("unable to start peer: %v", err) p.Disconnect() @@ -659,6 +715,22 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound s.newPeers <- p } +// shouldDropLocalConnection determines if our local connection to a remote peer +// should be dropped in the case of concurrent connection establishment. In +// order to deterministically decide which connection should be dropped, we'll +// utilize the ordering of the local and remote public key. If we didn't use +// such a tie breaker, then we risk _both_ connections erroneously being +// dropped. +func shouldDropLocalConnection(local, remote *btcec.PublicKey) bool { + localPubBytes := local.SerializeCompressed() + remotePubPbytes := remote.SerializeCompressed() + + // The connection that comes from the node with a "smaller" pubkey should + // be kept. Therefore, if our pubkey is "greater" than theirs, we should + // drop our established connection. + return bytes.Compare(localPubBytes, remotePubPbytes) > 0 +} + // inboundPeerConnected initializes a new peer in response to a new inbound // connection. func (s *server) inboundPeerConnected(conn net.Conn) { @@ -667,30 +739,45 @@ func (s *server) inboundPeerConnected(conn net.Conn) { srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr()) + localPub := s.identityPriv.PubKey() nodePub := conn.(*brontide.Conn).RemotePub() - // If we already have an outbound connection to this peer, simply drop - // the connection. 
+ // Check to see if we should drop our connection, if not, then we'll + // close out this connection with the remote peer. This + // prevents us from having duplicate connections, or none. pubStr := string(nodePub.SerializeCompressed()) - if _, ok := s.peersByPub[pubStr]; ok { - srvrLog.Errorf("Received inbound connection from peer %x, but "+ - "already connected, dropping conn", - nodePub.SerializeCompressed()) - conn.Close() - return + if connectedPeer, ok := s.peersByPub[pubStr]; ok { + // If the connection we've already established should be kept, + // then we'll close out this connection s.t there's only a + // single connection between us. + if !shouldDropLocalConnection(localPub, nodePub) { + srvrLog.Warnf("Received inbound connection from "+ + "peer %x, but already connected, dropping conn", + nodePub.SerializeCompressed()) + conn.Close() + return + } + + // Otherwise, if we should drop the connection, then we'll + // disconnect our already connected peer, and also send the + // peer to the peer garbage collection goroutine. + srvrLog.Debugf("Disconnecting stale connection to %v", + connectedPeer) + connectedPeer.Disconnect() + s.donePeers <- connectedPeer } - // However, if we receive an incoming connection from a peer we're - // attempting to maintain a persistent connection with then we need to - // cancel the ongoing connection attempts to ensure that we don't end - // up with a duplicate connecting to the same peer. - s.pendingConnMtx.RLock() + // Next, check to see if we have any outstanding persistent connection + // requests to this peer. If so, then we'll remove all of these + // connection requests, and also delete the entry from the map. 
+ s.pendingConnMtx.Lock() if connReqs, ok := s.persistentConnReqs[pubStr]; ok { for _, connReq := range connReqs { s.connMgr.Remove(connReq.ID()) } + delete(s.persistentConnReqs, pubStr) } - s.pendingConnMtx.RUnlock() + s.pendingConnMtx.Unlock() go s.peerConnected(conn, nil, false) } @@ -701,22 +788,64 @@ func (s *server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) s.peersMtx.Lock() defer s.peersMtx.Unlock() - srvrLog.Infof("Established connection to: %v", conn.RemoteAddr()) - + localPub := s.identityPriv.PubKey() nodePub := conn.(*brontide.Conn).RemotePub() - - // If we already have an inbound connection from this peer, simply drop - // the connection. pubStr := string(nodePub.SerializeCompressed()) - if _, ok := s.peersByPub[pubStr]; ok { - srvrLog.Errorf("Established outbound connection to peer %x, but "+ - "already connected, dropping conn", - nodePub.SerializeCompressed()) - s.connMgr.Remove(connReq.ID()) + + // If we already have an outbound connection to this peer, then ignore + // this new connection. + if _, ok := s.outboundPeers[pubStr]; ok { + srvrLog.Debugf("Ignoring duplicate outbound connection") conn.Close() return } + if _, ok := s.persistentConnReqs[pubStr]; !ok && connReq != nil { + srvrLog.Debugf("Ignoring cancelled outbound connection") + conn.Close() + return + } + + srvrLog.Infof("Established connection to: %v", conn.RemoteAddr()) + + // As we've just established an outbound connection to this peer, we'll + // cancel all other persistent connection requests and eliminate the + // entry for this peer from the map. + s.pendingConnMtx.Lock() + if connReqs, ok := s.persistentConnReqs[pubStr]; ok { + for _, pConnReq := range connReqs { + if pConnReq.ID() != connReq.ID() { + s.connMgr.Remove(pConnReq.ID()) + } + } + delete(s.persistentConnReqs, pubStr) + } + s.pendingConnMtx.Unlock() + + // If we already have an inbound connection from this peer, then we'll + // check to see _which_ of our connections should be dropped. 
+ if connectedPeer, ok := s.peersByPub[pubStr]; ok { + // If our (this) connection should be dropped, then we'll do + // so, in order to ensure we don't have any duplicate + // connections. + if shouldDropLocalConnection(localPub, nodePub) { + srvrLog.Warnf("Established outbound connection to "+ + "peer %x, but already connected, dropping conn", + nodePub.SerializeCompressed()) + s.connMgr.Remove(connReq.ID()) + conn.Close() + return + } + + // Otherwise, _their_ connection should be dropped. So we'll + // disconnect the peer and send the now obsolete peer to the + // server for garbage collection. + srvrLog.Debugf("Disconnecting stale connection to %v", + connectedPeer) + connectedPeer.Disconnect() + s.donePeers <- connectedPeer + } + go s.peerConnected(conn, connReq, true) } @@ -729,7 +858,7 @@ func (s *server) addPeer(p *peer) { // Ignore new peers if we're shutting down. if atomic.LoadInt32(&s.shutdown) != 0 { - p.Stop() + p.Disconnect() return } @@ -738,10 +867,25 @@ func (s *server) addPeer(p *peer) { // TODO(roasbeef): pipe all requests through to the // queryHandler/peerManager s.peersMtx.Lock() + + pubStr := string(p.addr.IdentityKey.SerializeCompressed()) + s.peersByID[p.id] = p - s.peersByPub[string(p.addr.IdentityKey.SerializeCompressed())] = p + s.peersByPub[pubStr] = p + + if p.inbound { + s.inboundPeers[pubStr] = p + } else { + s.outboundPeers[pubStr] = p + } + s.peersMtx.Unlock() + // Launch a goroutine to watch for the termination of this peer so we + // can ensure all resources are properly cleaned up and if need be + // connections are re-established. + go s.peerTerminationWatcher(p) + // Once the peer has been added to our indexes, send a message to the // channel router so we can synchronize our view of the channel graph // with this new peer. @@ -762,17 +906,23 @@ func (s *server) removePeer(p *peer) { // As the peer is now finished, ensure that the TCP connection is // closed and all of its related goroutines have exited. 
- if err := p.Stop(); err != nil { - peerLog.Errorf("unable to stop peer: %v", err) - } + p.Disconnect() // Ignore deleting peers if we're shutting down. if atomic.LoadInt32(&s.shutdown) != 0 { return } + pubStr := string(p.addr.IdentityKey.SerializeCompressed()) + delete(s.peersByID, p.id) - delete(s.peersByPub, string(p.addr.IdentityKey.SerializeCompressed())) + delete(s.peersByPub, pubStr) + + if p.inbound { + delete(s.inboundPeers, pubStr) + } else { + delete(s.outboundPeers, pubStr) + } } // connectPeerMsg is a message requesting the server to open a connection to a