From 71a81f59a99bbba5093c47e50fcbb10bff0eff1e Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 9 Aug 2018 00:05:27 -0700 Subject: [PATCH] chainntnfs: allow clients to pass in best block Clients can optionally pass their best block known into RegisterBlockEpochNtfn. This enables the notifiers to catch up clients on blocks they may have missed. --- chainntnfs/bitcoindnotify/bitcoind.go | 13 +++++++++++-- chainntnfs/btcdnotify/btcd.go | 14 +++++++++++--- chainntnfs/interface.go | 7 ++++++- chainntnfs/neutrinonotify/neutrino.go | 17 +++++++++++++---- 4 files changed, 41 insertions(+), 10 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index b3e5f7c7..cf367217 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -805,6 +805,10 @@ type blockEpochRegistration struct { epochQueue *chainntnfs.ConcurrentQueue + bestBlock *chainntnfs.BlockEpoch + + errorChan chan error + cancelChan chan struct{} wg sync.WaitGroup @@ -818,13 +822,18 @@ type epochCancel struct { // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the // caller to receive notifications, of each new block connected to the main -// chain. -func (b *BitcoindNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +// chain. Clients have the option of passing in their best known block, which +// the notifier uses to check if they are behind on blocks and catch them up. +func (b *BitcoindNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { + reg := &blockEpochRegistration{ epochQueue: chainntnfs.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&b.epochClientCounter, 1), + bestBlock: bestBlock, + errorChan: make(chan error, 1), } reg.epochQueue.Start() diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 2fb2f26c..5f45cf0f 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -16,7 +16,6 @@ import ( ) const ( - // notifierType uniquely identifies this concrete implementation of the // ChainNotifier interface. notifierType = "btcd" @@ -857,6 +856,10 @@ type blockEpochRegistration struct { epochQueue *chainntnfs.ConcurrentQueue + bestBlock *chainntnfs.BlockEpoch + + errorChan chan error + cancelChan chan struct{} wg sync.WaitGroup @@ -870,13 +873,18 @@ type epochCancel struct { // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the // caller to receive notifications, of each new block connected to the main -// chain. -func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +// chain. Clients have the option of passing in their best known block, which +// the notifier uses to check if they are behind on blocks and catch them up. +func (b *BtcdNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { + reg := &blockEpochRegistration{ epochQueue: chainntnfs.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&b.epochClientCounter, 1), + bestBlock: bestBlock, + errorChan: make(chan error, 1), } reg.epochQueue.Start() diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index ff90cc0a..8f09c640 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -59,7 +59,12 @@ type ChainNotifier interface { // new block connected to the tip of the main chain. The returned // BlockEpochEvent struct contains a channel which will be sent upon // for each new block discovered. - RegisterBlockEpochNtfn() (*BlockEpochEvent, error) + // + // Clients have the option of passing in their best known block. + // If they specify a block, the ChainNotifier checks whether the client + // is behind on blocks. If they are, the ChainNotifier sends a backlog + // of block notifications for the missed blocks. + RegisterBlockEpochNtfn(*BlockEpoch) (*BlockEpochEvent, error) // Start the ChainNotifier. Once started, the implementation should be // ready, and able to receive notification registrations from clients. diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index f612ef1c..badda36b 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -20,7 +20,6 @@ import ( ) const ( - // notifierType uniquely identifies this concrete implementation of the // ChainNotifier interface. notifierType = "neutrino" @@ -781,6 +780,10 @@ type blockEpochRegistration struct { cancelChan chan struct{} + bestBlock *chainntnfs.BlockEpoch + + errorChan chan error + wg sync.WaitGroup } @@ -790,14 +793,20 @@ type epochCancel struct { epochID uint64 } -// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the caller -// to receive notifications, of each new block connected to the main chain. -func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { +// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the +// caller to receive notifications, of each new block connected to the main +// chain. Clients have the option of passing in their best known block, which +// the notifier uses to check if they are behind on blocks and catch them up. +func (n *NeutrinoNotifier) RegisterBlockEpochNtfn( + bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { + reg := &blockEpochRegistration{ epochQueue: chainntnfs.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&n.epochClientCounter, 1), + bestBlock: bestBlock, + errorChan: make(chan error, 1), } reg.epochQueue.Start()