diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 494ca92b..f0d4da2e 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -217,6 +217,14 @@ type filteredBlock struct { connect bool } +// rescanFilterUpdate represents a request that will be sent to the +// notificaionRegistry in order to prevent race conditions between the filter +// update and new block notifications. +type rescanFilterUpdate struct { + updateOptions []neutrino.UpdateOption + errChan chan error +} + // onFilteredBlockConnected is a callback which is executed each a new block is // connected to the end of the main chain. func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32, @@ -283,9 +291,8 @@ out: case registerMsg := <-n.notificationRegistry: switch msg := registerMsg.(type) { case *chainntnfs.HistoricalConfDispatch: - // Look up whether the transaction is already - // included in the active chain. We'll do this - // in a goroutine to prevent blocking + // We'll start a historical rescan chain of the + // chain asynchronously to prevent blocking // potentially long rescans. n.wg.Add(1) go func() { @@ -299,18 +306,6 @@ out: chainntnfs.Log.Error(err) } - // We'll map the script into an address - // type so we can instruct neutrino to - // match if the transaction containing - // the script is found in a block. - params := n.p2pNode.ChainParams() - _, addrs, _, err := txscript.ExtractPkScriptAddrs( - msg.PkScript, ¶ms, - ) - if err != nil { - chainntnfs.Log.Error(err) - } - // If the historical dispatch finished // without error, we will invoke // UpdateConfDetails even if none were @@ -324,25 +319,6 @@ out: if err != nil { chainntnfs.Log.Error(err) } - - if confDetails != nil { - return - } - - // If we can't fully dispatch - // confirmation, then we'll update our - // filter so we can be notified of its - // future initial confirmation. - rescanUpdate := []neutrino.UpdateOption{ - neutrino.AddAddrs(addrs...), - neutrino.Rewind(msg.EndHeight), - neutrino.DisableDisconnectedNtfns(true), - } - err = n.chainView.Update(rescanUpdate...) - if err != nil { - chainntnfs.Log.Errorf("Unable to update rescan: %v", - err) - } }() case *blockEpochRegistration: @@ -367,6 +343,14 @@ out: } } msg.errorChan <- nil + + case *rescanFilterUpdate: + err := n.chainView.Update(msg.updateOptions...) + if err != nil { + chainntnfs.Log.Errorf("Unable to "+ + "update rescan filter: %v", err) + } + msg.errChan <- err } case item := <-n.chainUpdates.ChanOut(): @@ -658,9 +642,47 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, return ntfn.Event, nil } - // Ensure that neutrino is caught up to the height hint before we - // attempt to fetch the UTXO from the chain. If we're behind, then we - // may miss a notification dispatch. + // To determine whether this outpoint has been spent on-chain, we'll + // update our filter to watch for the transaction at tip and we'll also + // dispatch a historical rescan to determine if it has been spent in the + // past. + // + // We'll update our filter first to ensure we can immediately detect the + // spend at tip. To do so, we'll map the script into an address + // type so we can instruct neutrino to match if the transaction + // containing the script is found in a block. + inputToWatch := neutrino.InputWithScript{ + OutPoint: *outpoint, + PkScript: pkScript, + } + errChan := make(chan error, 1) + select { + case n.notificationRegistry <- &rescanFilterUpdate{ + updateOptions: []neutrino.UpdateOption{ + neutrino.AddInputs(inputToWatch), + neutrino.Rewind(historicalDispatch.EndHeight), + neutrino.DisableDisconnectedNtfns(true), + }, + errChan: errChan, + }: + case <-n.quit: + return nil, ErrChainNotifierShuttingDown + } + + select { + case err = <-errChan: + case <-n.quit: + return nil, ErrChainNotifierShuttingDown + } + if err != nil { + return nil, fmt.Errorf("unable to update filter: %v", err) + } + + // With the filter updated, we'll dispatch our historical rescan to + // ensure we detect the spend if it happened in the past. We'll ensure + // that neutrino is caught up to the starting height before we attempt + // to fetch the UTXO from the chain. If we're behind, then we may miss a + // notification dispatch. for { n.heightMtx.RLock() currentHeight := n.bestHeight @@ -673,12 +695,6 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, time.Sleep(time.Millisecond * 200) } - // Before sending off the notification request, we'll attempt to see if - // this output is still spent or not at this point in the chain. - inputToWatch := neutrino.InputWithScript{ - OutPoint: *outpoint, - PkScript: pkScript, - } spendReport, err := n.p2pNode.GetUtxo( neutrino.WatchInputs(inputToWatch), neutrino.StartBlock(&waddrmgr.BlockStamp{ @@ -694,60 +710,28 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // If a spend report was returned, and the transaction is present, then // this means that the output is already spent. + var spendDetails *chainntnfs.SpendDetail if spendReport != nil && spendReport.SpendingTx != nil { - // As a result, we'll launch a goroutine to immediately - // dispatch the notification with a normal response. spendingTxHash := spendReport.SpendingTx.TxHash() - spendDetails := &chainntnfs.SpendDetail{ + spendDetails = &chainntnfs.SpendDetail{ SpentOutPoint: outpoint, SpenderTxHash: &spendingTxHash, SpendingTx: spendReport.SpendingTx, SpenderInputIndex: spendReport.SpendingInputIndex, SpendingHeight: int32(spendReport.SpendingTxHeight), } - - err := n.txNotifier.UpdateSpendDetails(*outpoint, spendDetails) - if err != nil { - return nil, err - } - - return ntfn.Event, nil } - // If the output is still unspent, then we'll mark our historical rescan - // as complete and update our rescan's filter to watch for the spend of - // the outpoint in question. - if err := n.txNotifier.UpdateSpendDetails(*outpoint, nil); err != nil { - return nil, err - } - - rescanUpdate := []neutrino.UpdateOption{ - neutrino.AddInputs(inputToWatch), - neutrino.Rewind(historicalDispatch.EndHeight), - neutrino.DisableDisconnectedNtfns(true), - } - - if err := n.chainView.Update(rescanUpdate...); err != nil { - return nil, err - } - - select { - case n.notificationRegistry <- ntfn: - case <-n.quit: - return nil, ErrChainNotifierShuttingDown - } - - // Finally, we'll add a spent hint with the current height to the cache - // in order to better keep track of when this outpoint is spent. - err = n.spendHintCache.CommitSpendHint(currentHeight, *outpoint) + // Finally, no matter whether the rescan found a spend in the past or + // not, we'll mark our historical rescan as complete to ensure the + // outpoint's spend hint gets updated upon connected/disconnected + // blocks. + err = n.txNotifier.UpdateSpendDetails(*outpoint, spendDetails) if err != nil { - // The error is not fatal, so we should not return an error to - // the caller. - chainntnfs.Log.Errorf("Unable to update spend hint to %d for "+ - "%v: %v", currentHeight, outpoint, err) + return nil, err } - return spendEvent, nil + return ntfn.Event, nil } // RegisterConfirmationsNtfn registers a notification with NeutrinoNotifier @@ -784,12 +768,55 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, return ntfn.Event, nil } + // To determine whether this transaction has confirmed on-chain, we'll + // update our filter to watch for the transaction at tip and we'll also + // dispatch a historical rescan to determine if it has confirmed in the + // past. + // + // We'll update our filter first to ensure we can immediately detect the + // confirmation at tip. To do so, we'll map the script into an address + // type so we can instruct neutrino to match if the transaction + // containing the script is found in a block. + params := n.p2pNode.ChainParams() + _, addrs, _, err := txscript.ExtractPkScriptAddrs(pkScript, ¶ms) + if err != nil { + return nil, fmt.Errorf("unable to extract script: %v", err) + } + + // We'll send the filter update request to the notifier's main event + // handler and wait for its response. + errChan := make(chan error, 1) select { - case n.notificationRegistry <- dispatch: - return ntfn.Event, nil + case n.notificationRegistry <- &rescanFilterUpdate{ + updateOptions: []neutrino.UpdateOption{ + neutrino.AddAddrs(addrs...), + neutrino.Rewind(dispatch.EndHeight), + neutrino.DisableDisconnectedNtfns(true), + }, + errChan: errChan, + }: case <-n.quit: return nil, ErrChainNotifierShuttingDown } + + select { + case err = <-errChan: + case <-n.quit: + return nil, ErrChainNotifierShuttingDown + } + if err != nil { + return nil, fmt.Errorf("unable to update filter: %v", err) + } + + // Finally, with the filter updates, we can dispatch the historical + // rescan to ensure we can detect if the event happened in the past. + select { + case n.notificationRegistry <- dispatch: + case <-n.quit: + return nil, ErrChainNotifierShuttingDown + } + + return ntfn.Event, nil } // blockEpochRegistration represents a client's intent to receive a