From cfe0babd7838986d229f37f287a8d7f15b82cc90 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Thu, 30 Apr 2020 12:34:47 +0300 Subject: [PATCH 1/2] chainntnfs: use sync.Once to start notifiers. --- chainntnfs/bitcoindnotify/bitcoind.go | 63 ++++++++++++------------ chainntnfs/btcdnotify/btcd.go | 69 ++++++++++++++------------- chainntnfs/neutrinonotify/neutrino.go | 61 ++++++++++++----------- 3 files changed, 101 insertions(+), 92 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 5956a0fb..bb96a7ec 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -34,7 +34,7 @@ const ( type BitcoindNotifier struct { epochClientCounter uint64 // To be used atomically. - started int32 // To be used atomically. + start sync.Once stopped int32 // To be used atomically. chainConn *chain.BitcoindClient @@ -96,11 +96,41 @@ func New(chainConn *chain.BitcoindConn, chainParams *chaincfg.Params, // Start connects to the running bitcoind node over websockets, registers for // block notifications, and finally launches all related helper goroutines. func (b *BitcoindNotifier) Start() error { - // Already started? - if atomic.AddInt32(&b.started, 1) != 1 { + var startErr error + b.start.Do(func() { + startErr = b.startNotifier() + }) + return startErr +} + +// Stop shutsdown the BitcoindNotifier. +func (b *BitcoindNotifier) Stop() error { + // Already shutting down? + if atomic.AddInt32(&b.stopped, 1) != 1 { return nil } + // Shutdown the rpc client, this gracefully disconnects from bitcoind, + // and cleans up all related resources. + b.chainConn.Stop() + + close(b.quit) + b.wg.Wait() + + // Notify all pending clients of our shutdown by closing the related + // notification channels. + for _, epochClient := range b.blockEpochClients { + close(epochClient.cancelChan) + epochClient.wg.Wait() + + close(epochClient.epochChan) + } + b.txNotifier.TearDown() + + return nil +} + +func (b *BitcoindNotifier) startNotifier() error { // Connect to bitcoind, and register for notifications on connected, // and disconnected blocks. if err := b.chainConn.Start(); err != nil { @@ -131,33 +161,6 @@ func (b *BitcoindNotifier) Start() error { return nil } -// Stop shutsdown the BitcoindNotifier. -func (b *BitcoindNotifier) Stop() error { - // Already shutting down? - if atomic.AddInt32(&b.stopped, 1) != 1 { - return nil - } - - // Shutdown the rpc client, this gracefully disconnects from bitcoind, - // and cleans up all related resources. - b.chainConn.Stop() - - close(b.quit) - b.wg.Wait() - - // Notify all pending clients of our shutdown by closing the related - // notification channels. - for _, epochClient := range b.blockEpochClients { - close(epochClient.cancelChan) - epochClient.wg.Wait() - - close(epochClient.epochChan) - } - b.txNotifier.TearDown() - - return nil -} - // notificationDispatcher is the primary goroutine which handles client // notification registrations, as well as notification dispatches. func (b *BitcoindNotifier) notificationDispatcher() { diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 7b179209..1ce8c630 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -53,7 +53,7 @@ type txUpdate struct { type BtcdNotifier struct { epochClientCounter uint64 // To be used atomically. - started int32 // To be used atomically. + start sync.Once stopped int32 // To be used atomically. chainConn *rpcclient.Client @@ -134,11 +134,44 @@ func New(config *rpcclient.ConnConfig, chainParams *chaincfg.Params, // Start connects to the running btcd node over websockets, registers for block // notifications, and finally launches all related helper goroutines. func (b *BtcdNotifier) Start() error { - // Already started? - if atomic.AddInt32(&b.started, 1) != 1 { + var startErr error + b.start.Do(func() { + startErr = b.startNotifier() + }) + return startErr +} + +// Stop shutsdown the BtcdNotifier. +func (b *BtcdNotifier) Stop() error { + // Already shutting down? + if atomic.AddInt32(&b.stopped, 1) != 1 { return nil } + // Shutdown the rpc client, this gracefully disconnects from btcd, and + // cleans up all related resources. + b.chainConn.Shutdown() + + close(b.quit) + b.wg.Wait() + + b.chainUpdates.Stop() + b.txUpdates.Stop() + + // Notify all pending clients of our shutdown by closing the related + // notification channels. + for _, epochClient := range b.blockEpochClients { + close(epochClient.cancelChan) + epochClient.wg.Wait() + + close(epochClient.epochChan) + } + b.txNotifier.TearDown() + + return nil +} + +func (b *BtcdNotifier) startNotifier() error { // Start our concurrent queues before starting the chain connection, to // ensure onBlockConnected and onRedeemingTx callbacks won't be // blocked. @@ -182,36 +215,6 @@ func (b *BtcdNotifier) Start() error { return nil } -// Stop shutsdown the BtcdNotifier. -func (b *BtcdNotifier) Stop() error { - // Already shutting down? - if atomic.AddInt32(&b.stopped, 1) != 1 { - return nil - } - - // Shutdown the rpc client, this gracefully disconnects from btcd, and - // cleans up all related resources. - b.chainConn.Shutdown() - - close(b.quit) - b.wg.Wait() - - b.chainUpdates.Stop() - b.txUpdates.Stop() - - // Notify all pending clients of our shutdown by closing the related - // notification channels. - for _, epochClient := range b.blockEpochClients { - close(epochClient.cancelChan) - epochClient.wg.Wait() - - close(epochClient.epochChan) - } - b.txNotifier.TearDown() - - return nil -} - // onBlockConnected implements on OnBlockConnected callback for rpcclient. // Ingesting a block updates the wallet's internal utxo state based on the // outputs created and destroyed within each block. diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 771c4b86..be1e8f80 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -39,7 +39,7 @@ const ( type NeutrinoNotifier struct { epochClientCounter uint64 // To be used atomically. - started int32 // To be used atomically. + start sync.Once stopped int32 // To be used atomically. bestBlockMtx sync.RWMutex @@ -111,11 +111,40 @@ func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache, // Start contacts the running neutrino light client and kicks off an initial // empty rescan. func (n *NeutrinoNotifier) Start() error { - // Already started? - if atomic.AddInt32(&n.started, 1) != 1 { + var startErr error + n.start.Do(func() { + startErr = n.startNotifier() + }) + return startErr +} + +// Stop shuts down the NeutrinoNotifier. +func (n *NeutrinoNotifier) Stop() error { + // Already shutting down? + if atomic.AddInt32(&n.stopped, 1) != 1 { return nil } + close(n.quit) + n.wg.Wait() + + n.chainUpdates.Stop() + n.txUpdates.Stop() + + // Notify all pending clients of our shutdown by closing the related + // notification channels. + for _, epochClient := range n.blockEpochClients { + close(epochClient.cancelChan) + epochClient.wg.Wait() + + close(epochClient.epochChan) + } + n.txNotifier.TearDown() + + return nil +} + +func (n *NeutrinoNotifier) startNotifier() error { // Start our concurrent queues before starting the rescan, to ensure // onFilteredBlockConnected and onRelavantTx callbacks won't be // blocked. @@ -174,32 +203,6 @@ func (n *NeutrinoNotifier) Start() error { return nil } -// Stop shuts down the NeutrinoNotifier. -func (n *NeutrinoNotifier) Stop() error { - // Already shutting down? - if atomic.AddInt32(&n.stopped, 1) != 1 { - return nil - } - - close(n.quit) - n.wg.Wait() - - n.chainUpdates.Stop() - n.txUpdates.Stop() - - // Notify all pending clients of our shutdown by closing the related - // notification channels. - for _, epochClient := range n.blockEpochClients { - close(epochClient.cancelChan) - epochClient.wg.Wait() - - close(epochClient.epochChan) - } - n.txNotifier.TearDown() - - return nil -} - // filteredBlock represents a new block which has been connected to the main // chain. The slice of transactions will only be populated if the block // includes a transaction that confirmed one of our watched txids, or spends From ae2c37e043225c12bcc69790201d3d3dcd28db62 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Thu, 30 Apr 2020 12:54:33 +0300 Subject: [PATCH 2/2] Ensure chain notifier is started before accessed. The use case comes from the RPC layer that is ready before the chain notifier which is used in the sub server. --- chainntnfs/bitcoindnotify/bitcoind.go | 10 ++++++++++ chainntnfs/btcdnotify/btcd.go | 10 ++++++++++ chainntnfs/interface.go | 3 +++ chainntnfs/neutrinonotify/neutrino.go | 10 ++++++++++ contractcourt/chain_watcher_test.go | 4 ++++ discovery/gossiper_test.go | 4 ++++ fundingmanager_test.go | 4 ++++ htlcswitch/mock.go | 4 ++++ lnrpc/chainrpc/chainnotifier_server.go | 17 +++++++++++++++++ mock.go | 4 ++++ sweep/test_utils.go | 5 +++++ 11 files changed, 75 insertions(+) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index bb96a7ec..b39f78a4 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -35,6 +35,7 @@ type BitcoindNotifier struct { epochClientCounter uint64 // To be used atomically. start sync.Once + active int32 // To be used atomically. stopped int32 // To be used atomically. chainConn *chain.BitcoindClient @@ -130,6 +131,11 @@ func (b *BitcoindNotifier) Stop() error { return nil } +// Started returns true if this instance has been started, and false otherwise. +func (b *BitcoindNotifier) Started() bool { + return atomic.LoadInt32(&b.active) != 0 +} + func (b *BitcoindNotifier) startNotifier() error { // Connect to bitcoind, and register for notifications on connected, // and disconnected blocks. @@ -158,6 +164,10 @@ func (b *BitcoindNotifier) startNotifier() error { b.wg.Add(1) go b.notificationDispatcher() + // Set the active flag now that we've completed the full + // startup. + atomic.StoreInt32(&b.active, 1) + return nil } diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 1ce8c630..d23a3059 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -54,6 +54,7 @@ type BtcdNotifier struct { epochClientCounter uint64 // To be used atomically. start sync.Once + active int32 // To be used atomically. stopped int32 // To be used atomically. chainConn *rpcclient.Client @@ -141,6 +142,11 @@ func (b *BtcdNotifier) Start() error { return startErr } +// Started returns true if this instance has been started, and false otherwise. +func (b *BtcdNotifier) Started() bool { + return atomic.LoadInt32(&b.active) != 0 +} + // Stop shutsdown the BtcdNotifier. func (b *BtcdNotifier) Stop() error { // Already shutting down? @@ -212,6 +218,10 @@ func (b *BtcdNotifier) startNotifier() error { b.wg.Add(1) go b.notificationDispatcher() + // Set the active flag now that we've completed the full + // startup. + atomic.StoreInt32(&b.active, 1) + return nil } diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index ace86e77..c224181c 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -139,6 +139,9 @@ type ChainNotifier interface { // ready, and able to receive notification registrations from clients. Start() error + // Started returns true if this instance has been started, and false otherwise. + Started() bool + // Stops the concrete ChainNotifier. Once stopped, the ChainNotifier // should disallow any future requests from potential clients. // Additionally, all pending client notifications will be canceled diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index be1e8f80..567fa520 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -40,6 +40,7 @@ type NeutrinoNotifier struct { epochClientCounter uint64 // To be used atomically. start sync.Once + active int32 // To be used atomically. stopped int32 // To be used atomically. bestBlockMtx sync.RWMutex @@ -144,6 +145,11 @@ func (n *NeutrinoNotifier) Stop() error { return nil } +// Started returns true if this instance has been started, and false otherwise. +func (n *NeutrinoNotifier) Started() bool { + return atomic.LoadInt32(&n.active) != 0 +} + func (n *NeutrinoNotifier) startNotifier() error { // Start our concurrent queues before starting the rescan, to ensure // onFilteredBlockConnected and onRelavantTx callbacks won't be @@ -200,6 +206,10 @@ func (n *NeutrinoNotifier) startNotifier() error { n.wg.Add(1) go n.notificationDispatcher() + // Set the active flag now that we've completed the full + // startup. + atomic.StoreInt32(&n.active, 1) + return nil } diff --git a/contractcourt/chain_watcher_test.go b/contractcourt/chain_watcher_test.go index 6dc47f53..62c76871 100644 --- a/contractcourt/chain_watcher_test.go +++ b/contractcourt/chain_watcher_test.go @@ -43,6 +43,10 @@ func (m *mockNotifier) Start() error { return nil } +func (m *mockNotifier) Started() bool { + return true +} + func (m *mockNotifier) Stop() error { return nil } diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 75d3cef6..5281d4aa 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -453,6 +453,10 @@ func (m *mockNotifier) Start() error { return nil } +func (m *mockNotifier) Started() bool { + return true +} + func (m *mockNotifier) Stop() error { return nil } diff --git a/fundingmanager_test.go b/fundingmanager_test.go index b222bf94..0fa5831f 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -131,6 +131,10 @@ func (m *mockNotifier) Start() error { return nil } +func (m *mockNotifier) Started() bool { + return true +} + func (m *mockNotifier) Stop() error { return nil } diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index e9a2a1ef..0ce5adf6 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -931,6 +931,10 @@ func (m *mockNotifier) Start() error { return nil } +func (m *mockNotifier) Started() bool { + return true +} + func (m *mockNotifier) Stop() error { return nil } diff --git a/lnrpc/chainrpc/chainnotifier_server.go b/lnrpc/chainrpc/chainnotifier_server.go index 2d662c8e..261f7605 100644 --- a/lnrpc/chainrpc/chainnotifier_server.go +++ b/lnrpc/chainrpc/chainnotifier_server.go @@ -63,6 +63,11 @@ var ( // has been shut down. ErrChainNotifierServerShuttingDown = errors.New("chain notifier RPC " + "subserver shutting down") + + // ErrChainNotifierServerNotActive indicates that the chain notifier hasn't + // finished the startup process. + ErrChainNotifierServerNotActive = errors.New("chain notifier RPC is" + + "still in the process of starting") ) // fileExists reports whether the named file or directory exists. @@ -196,6 +201,10 @@ func (s *Server) RegisterWithRootServer(grpcServer *grpc.Server) error { func (s *Server) RegisterConfirmationsNtfn(in *ConfRequest, confStream ChainNotifier_RegisterConfirmationsNtfnServer) error { + if !s.cfg.ChainNotifier.Started() { + return ErrChainNotifierServerNotActive + } + // We'll start by reconstructing the RPC request into what the // underlying ChainNotifier expects. var txid chainhash.Hash @@ -292,6 +301,10 @@ func (s *Server) RegisterConfirmationsNtfn(in *ConfRequest, func (s *Server) RegisterSpendNtfn(in *SpendRequest, spendStream ChainNotifier_RegisterSpendNtfnServer) error { + if !s.cfg.ChainNotifier.Started() { + return ErrChainNotifierServerNotActive + } + // We'll start by reconstructing the RPC request into what the // underlying ChainNotifier expects. var op *wire.OutPoint @@ -399,6 +412,10 @@ func (s *Server) RegisterSpendNtfn(in *SpendRequest, func (s *Server) RegisterBlockEpochNtfn(in *BlockEpoch, epochStream ChainNotifier_RegisterBlockEpochNtfnServer) error { + if !s.cfg.ChainNotifier.Started() { + return ErrChainNotifierServerNotActive + } + // We'll start by reconstructing the RPC request into what the // underlying ChainNotifier expects. var hash chainhash.Hash diff --git a/mock.go b/mock.go index 0b71a6dc..34e466d9 100644 --- a/mock.go +++ b/mock.go @@ -112,6 +112,10 @@ func (m *mockNotfier) Start() error { return nil } +func (m *mockNotfier) Started() bool { + return true +} + func (m *mockNotfier) Stop() error { return nil } diff --git a/sweep/test_utils.go b/sweep/test_utils.go index 7c28710b..5b83d730 100644 --- a/sweep/test_utils.go +++ b/sweep/test_utils.go @@ -200,6 +200,11 @@ func (m *MockNotifier) Start() error { return nil } +// Started checks if started +func (m *MockNotifier) Started() bool { + return true +} + // Stop the notifier. func (m *MockNotifier) Stop() error { return nil