diff --git a/discovery/mock_test.go b/discovery/mock_test.go index 9e945193..84ba8636 100644 --- a/discovery/mock_test.go +++ b/discovery/mock_test.go @@ -1,6 +1,7 @@ package discovery import ( + "errors" "net" "sync" @@ -30,6 +31,7 @@ func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error { select { case p.sentMsgs <- msg: case <-p.quit: + return errors.New("peer disconnected") } } diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index f95a784a..06022518 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -190,6 +190,15 @@ func (m *SyncManager) syncerHandler() { // attempt an initial historical sync when a new peer connects. attemptInitialHistoricalSync = true + // initialHistoricalSyncCompleted serves as a barrier when + // initializing new active GossipSyncers. If false, the initial + // historical sync has not completed, so we'll defer + // initializing any active GossipSyncers. If true, then we can + // transition the GossipSyncer immediately. We set up this + // barrier to ensure we have most of the graph before attempting + // to accept new updates at tip. + initialHistoricalSyncCompleted = false + // initialHistoricalSyncer is the syncer we are currently // performing an initial historical sync with. initialHistoricalSyncer *GossipSyncer @@ -221,10 +230,18 @@ func (m *SyncManager) syncerHandler() { // If we've exceeded our total number of active syncers, // we'll initialize this GossipSyncer as passive. case len(m.activeSyncers) >= m.cfg.NumActiveSyncers: + fallthrough + + // Otherwise, it should be initialized as active. If the + // initial historical sync has yet to complete, then + // we'll declare it as passive and attempt to transition + // it when the initial historical sync completes. + case !initialHistoricalSyncCompleted: s.setSyncType(PassiveSync) m.inactiveSyncers[s.cfg.peerPub] = s - // Otherwise, it should be initialized as active. 
+ // The initial historical sync has completed, so we can + // immediately start the GossipSyncer as active. default: s.setSyncType(ActiveSync) m.activeSyncers[s.cfg.peerPub] = s @@ -310,6 +327,32 @@ func (m *SyncManager) syncerHandler() { case <-initialHistoricalSyncSignal: initialHistoricalSyncer = nil initialHistoricalSyncSignal = nil + initialHistoricalSyncCompleted = true + + log.Debug("Initial historical sync completed") + + // With the initial historical sync complete, we can + // begin receiving new graph updates at tip. We'll + // determine whether we can have any more active + // GossipSyncers. If we do, we'll randomly select some + // that are currently passive to transition. + m.syncersMu.Lock() + numActiveLeft := m.cfg.NumActiveSyncers - len(m.activeSyncers) + if numActiveLeft <= 0 { + m.syncersMu.Unlock() + continue + } + + log.Debugf("Attempting to transition %v passive "+ + "GossipSyncers to active", numActiveLeft) + + for i := 0; i < numActiveLeft; i++ { + chooseRandomSyncer( + m.inactiveSyncers, m.transitionPassiveSyncer, + ) + } + + m.syncersMu.Unlock() // Our RotateTicker has ticked, so we'll attempt to rotate a // single active syncer with a passive one. diff --git a/discovery/sync_manager_test.go b/discovery/sync_manager_test.go index 3c7fd6c7..b61253de 100644 --- a/discovery/sync_manager_test.go +++ b/discovery/sync_manager_test.go @@ -60,10 +60,10 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) { // The first syncer registered always attempts a historical // sync. 
- assertActiveGossipTimestampRange(t, peer) if i == 0 { assertTransitionToChansSynced(t, s, peer) } + assertActiveGossipTimestampRange(t, peer) assertSyncerStatus(t, s, chansSynced, ActiveSync) } @@ -90,8 +90,8 @@ func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) { historicalSyncPeer := randPeer(t, syncMgr.quit) syncMgr.InitSyncState(historicalSyncPeer) historicalSyncer := assertSyncerExistence(t, syncMgr, historicalSyncPeer) - assertActiveGossipTimestampRange(t, historicalSyncPeer) assertTransitionToChansSynced(t, historicalSyncer, historicalSyncPeer) + assertActiveGossipTimestampRange(t, historicalSyncPeer) assertSyncerStatus(t, historicalSyncer, chansSynced, ActiveSync) // Then, we'll create the second active syncer, which is the one we'll @@ -142,8 +142,8 @@ func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) { activeSyncPeer := randPeer(t, syncMgr.quit) syncMgr.InitSyncState(activeSyncPeer) activeSyncer := assertSyncerExistence(t, syncMgr, activeSyncPeer) - assertActiveGossipTimestampRange(t, activeSyncPeer) assertTransitionToChansSynced(t, activeSyncer, activeSyncPeer) + assertActiveGossipTimestampRange(t, activeSyncPeer) assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync) // We'll send a tick to force a rotation. Since there aren't any @@ -254,6 +254,66 @@ func TestSyncManagerForceHistoricalSync(t *testing.T) { }) } +// TestSyncManagerWaitUntilInitialHistoricalSync ensures that no GossipSyncers +// are initialized as ActiveSync until the initial historical sync has been +// completed. Once it has, the pending GossipSyncers should be transitioned to +// ActiveSync. +func TestSyncManagerWaitUntilInitialHistoricalSync(t *testing.T) { + t.Parallel() + + const numActiveSyncers = 2 + + // We'll start by creating our test sync manager which will hold up to + // 2 active syncers. + syncMgr := newTestSyncManager(numActiveSyncers) + syncMgr.Start() + defer syncMgr.Stop() + + // We'll go ahead and create our syncers. 
+ peers := make([]*mockPeer, 0, numActiveSyncers) + syncers := make([]*GossipSyncer, 0, numActiveSyncers) + for i := 0; i < numActiveSyncers; i++ { + peer := randPeer(t, syncMgr.quit) + peers = append(peers, peer) + + syncMgr.InitSyncState(peer) + s := assertSyncerExistence(t, syncMgr, peer) + syncers = append(syncers, s) + + // The first one always attempts a historical sync. We won't + // transition it to chansSynced to ensure the remaining syncers + // aren't started as active. + if i == 0 { + assertSyncerStatus(t, s, syncingChans, PassiveSync) + continue + } + + // The rest should remain in a passive and chansSynced state, + // and they should be queued to transition to active once the + // initial historical sync is completed. + assertNoMsgSent(t, peer) + assertSyncerStatus(t, s, chansSynced, PassiveSync) + } + + // To ensure we don't transition any pending active syncers that have + // previously disconnected, we'll disconnect the last one. + stalePeer := peers[numActiveSyncers-1] + syncMgr.PruneSyncState(stalePeer.PubKey()) + + // Then, we'll complete the initial historical sync by transitioning the + // historical syncer to its final chansSynced state. This should trigger + // all of the pending active syncers to transition, except for the one + // we disconnected. + assertTransitionToChansSynced(t, syncers[0], peers[0]) + for i, s := range syncers { + if i == numActiveSyncers-1 { + assertNoMsgSent(t, peers[i]) + continue + } + assertPassiveSyncerTransition(t, s, peers[i]) + } +} + // assertNoMsgSent is a helper function that ensures a peer hasn't sent any // messages. func assertNoMsgSent(t *testing.T, peer *mockPeer) { @@ -294,7 +354,7 @@ func assertActiveGossipTimestampRange(t *testing.T, peer *mockPeer) { var msgSent lnwire.Message select { case msgSent = <-peer.sentMsgs: - case <-time.After(time.Second): + case <-time.After(2 * time.Second): t.Fatalf("expected peer %x to send lnwire.GossipTimestampRange "+ "message", peer.PubKey()) }