diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 5f3b8927..1dad460a 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -35,6 +35,11 @@ var ( // ErrGossiperShuttingDown is an error that is returned if the gossiper // is in the process of being shut down. ErrGossiperShuttingDown = errors.New("gossiper is shutting down") + + // ErrGossipSyncerNotFound signals that we were unable to find an active + // gossip syncer corresponding to a gossip query message received from + // the remote peer. + ErrGossipSyncerNotFound = errors.New("gossip syncer not found") ) // networkMsg couples a routing related wire message with the peer that @@ -919,45 +924,7 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) ( return syncer, nil } - // A known gossip syncer doesn't exist, so we may have to create one - // from scratch. To do so, we'll query for a reference directly to the - // active peer. - syncPeer, err := d.cfg.FindPeer(pub) - if err != nil { - log.Debugf("unable to find gossip peer %v: %v", - pub.SerializeCompressed(), err) - return nil, err - } - - // Finally, we'll obtain the exclusive mutex, then check again if a - // gossiper was added after we dropped the read mutex. - d.syncerMtx.Lock() - syncer, ok = d.peerSyncers[target] - if ok { - d.syncerMtx.Unlock() - return syncer, nil - } - - // At this point, a syncer doesn't yet exist, so we'll create a new one - // for the peer and return it to the caller. - encoding := lnwire.EncodingSortedPlain - syncer = newGossiperSyncer(gossipSyncerCfg{ - chainHash: d.cfg.ChainHash, - syncChanUpdates: true, - channelSeries: d.cfg.ChanSeries, - encodingType: encoding, - chunkSize: encodingTypeToChunkSize[encoding], - sendToPeer: func(msgs ...lnwire.Message) error { - return syncPeer.SendMessage(false, msgs...) - }, - }) - copy(syncer.peerPub[:], pub.SerializeCompressed()) - d.peerSyncers[target] = syncer - syncer.Start() - - d.syncerMtx.Unlock() - - return syncer, nil + return nil, ErrGossipSyncerNotFound } // networkHandler is the primary goroutine that drives this service. The roles @@ -1041,6 +1008,9 @@ func (d *AuthenticatedGossiper) networkHandler() { announcement.source, ) if err != nil { + log.Warnf("Unable to find gossip "+ + "syncer for peer=%x: %v", + announcement.peer.PubKey(), err) continue } @@ -1066,6 +1036,9 @@ func (d *AuthenticatedGossiper) networkHandler() { announcement.source, ) if err != nil { + log.Warnf("Unable to find gossip "+ + "syncer for peer=%x: %v", + announcement.source, err) continue } diff --git a/peer.go b/peer.go index c1afa0ed..de8e54b8 100644 --- a/peer.go +++ b/peer.go @@ -3,6 +3,7 @@ package main import ( "bytes" "container/list" + "errors" "fmt" "net" "sync" @@ -15,7 +16,6 @@ import ( "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/davecgh/go-spew/spew" - "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/brontide" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" @@ -32,7 +32,7 @@ var ( numNodes int32 // ErrPeerExiting signals that the peer received a disconnect request. - ErrPeerExiting = errors.Errorf("peer exiting") + ErrPeerExiting = fmt.Errorf("peer exiting") ) const ( @@ -113,7 +113,7 @@ type peer struct { pubKeyBytes [33]byte // startTime is the time this peer connection was successfully - // established. It will be zero for peers that did not successfuly + // established. It will be zero for peers that did not successfully // Start(). startTime time.Time @@ -239,7 +239,7 @@ func (p *peer) Start() error { return nil } - peerLog.Tracef("peer %v starting", p) + peerLog.Tracef("Peer %v starting", p) // Exchange local and global features, the init message should be very // first between two nodes. @@ -320,6 +320,43 @@ func (p *peer) Start() error { return nil } +// initGossipSync initializes either a gossip syncer or an initial routing +// dump, depending on the negotiated synchronization method. +func (p *peer) initGossipSync() { + switch { + + // If the remote peer knows of the new gossip queries feature, then + // we'll create a new gossipSyncer in the AuthenticatedGossiper for it. + case p.remoteLocalFeatures.HasFeature(lnwire.GossipQueriesOptional): + srvrLog.Infof("Negotiated chan series queries with %x", + p.pubKeyBytes[:]) + + // We'll only request channel updates from the remote peer if + // its enabled in the config, or we're already getting updates + // from enough peers. + // + // TODO(roasbeef): craft s.t. we only get updates from a few + // peers + recvUpdates := !cfg.NoChanUpdates + + // Register the this peer's for gossip syncer with the gossiper. + // This is blocks synchronously to ensure the gossip syncer is + // registered with the gossiper before attempting to read + // messages from the remote peer. + p.server.authGossiper.InitSyncState(p, recvUpdates) + + // If the remote peer has the initial sync feature bit set, then we'll + // being the synchronization protocol to exchange authenticated channel + // graph edges/vertexes, but only if they don't know of the new gossip + // queries. + case p.remoteLocalFeatures.HasFeature(lnwire.InitialRoutingSync): + srvrLog.Infof("Requesting full table sync with %x", + p.pubKeyBytes[:]) + + go p.server.authGossiper.SynchronizeNode(p) + } +} + // QuitSignal is a method that should return a channel which will be sent upon // or closed once the backing peer exits. This allows callers using the // interface to cancel any processing in the event the backing implementation @@ -569,7 +606,7 @@ func (p *peer) Disconnect(reason error) { return } - peerLog.Debugf("Disconnecting %s, reason: %v", p, reason) + peerLog.Infof("Disconnecting %s, reason: %v", p, reason) // Ensure that the TCP connection is properly closed before continuing. p.conn.Close() @@ -877,6 +914,14 @@ func (p *peer) readHandler() { p.Disconnect(err) }) + // Initialize our negotiated gossip sync method before reading + // messages off the wire. When using gossip queries, this ensures + // a gossip syncer is active by the time query messages arrive. + // + // TODO(conner): have peer store gossip syncer directly and bypass + // gossiper? + p.initGossipSync() + discStream := newDiscMsgStream(p) discStream.Start() defer discStream.Stop() @@ -1316,7 +1361,8 @@ out: } if err != nil { - exitErr = errors.Errorf("unable to write message: %v", err) + exitErr = fmt.Errorf("unable to write "+ + "message: %v", err) break out } @@ -2048,17 +2094,15 @@ func (p *peer) handleInitMsg(msg *lnwire.Init) error { unknownLocalFeatures := p.remoteLocalFeatures.UnknownRequiredFeatures() if len(unknownLocalFeatures) > 0 { - err := errors.Errorf("Peer set unknown local feature bits: %v", + err := fmt.Errorf("Peer set unknown local feature bits: %v", unknownLocalFeatures) - peerLog.Error(err) return err } unknownGlobalFeatures := p.remoteGlobalFeatures.UnknownRequiredFeatures() if len(unknownGlobalFeatures) > 0 { - err := errors.Errorf("Peer set unknown global feature bits: %v", + err := fmt.Errorf("Peer set unknown global feature bits: %v", unknownGlobalFeatures) - peerLog.Error(err) return err } diff --git a/server.go b/server.go index 598cd7c3..c0ee4f4f 100644 --- a/server.go +++ b/server.go @@ -2356,33 +2356,6 @@ func (s *server) peerInitializer(p *peer) { // was successful, and to begin watching the peer's wait group. close(ready) - switch { - // If the remote peer knows of the new gossip queries feature, then - // we'll create a new gossipSyncer in the AuthenticatedGossiper for it. - case p.remoteLocalFeatures.HasFeature(lnwire.GossipQueriesOptional): - srvrLog.Infof("Negotiated chan series queries with %x", - p.pubKeyBytes[:]) - - // We'll only request channel updates from the remote peer if - // its enabled in the config, or we're already getting updates - // from enough peers. - // - // TODO(roasbeef): craft s.t. we only get updates from a few - // peers - recvUpdates := !cfg.NoChanUpdates - go s.authGossiper.InitSyncState(p, recvUpdates) - - // If the remote peer has the initial sync feature bit set, then we'll - // being the synchronization protocol to exchange authenticated channel - // graph edges/vertexes, but only if they don't know of the new gossip - // queries. - case p.remoteLocalFeatures.HasFeature(lnwire.InitialRoutingSync): - srvrLog.Infof("Requesting full table sync with %x", - p.pubKeyBytes[:]) - - go s.authGossiper.SynchronizeNode(p) - } - pubStr := string(p.addr.IdentityKey.SerializeCompressed()) s.mu.Lock()