diff --git a/lib/core/src/persist/chain.rs b/lib/core/src/persist/chain.rs index 9c702d1..50a2162 100644 --- a/lib/core/src/persist/chain.rs +++ b/lib/core/src/persist/chain.rs @@ -226,12 +226,6 @@ impl Persister { }) } - pub(crate) fn has_chain_swaps(&self) -> Result { - let con: Connection = self.get_connection()?; - let result = con.query_row("SELECT 1 FROM chain_swaps LIMIT 1", [], |_| Ok(())); - Ok(result.is_ok()) - } - pub(crate) fn list_chain_swaps(&self) -> Result> { let con: Connection = self.get_connection()?; self.list_chain_swaps_where(&con, vec![]) diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index d52f34a..a19a81a 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -599,37 +599,50 @@ impl LiquidSdk { false } }; - // Get the Bitcoin tip and process a new block (only if chain swaps are used) - let (maybe_bitcoin_tip, is_new_bitcoin_block) = if self - .persister - .has_chain_swaps() + + // Get the recoverable swaps list using the last known tips + let last_monitored_swaps = self + .get_monitored_swaps_list( + false, + true, + ChainTips { + liquid_tip: *current_liquid_block, + bitcoin_tip: Some(*current_bitcoin_block), + }, + ) + .await .unwrap_or_else(|e| { - error!("Failed to check if there are chain swaps: {e} - proceeding as if not"); - false - }) - .not() - { - info!("No known chain swaps, skipping bitcoin tip fetch"); - (None, false) - } else { - let t0 = Instant::now(); - let bitcoin_tip_res = self.bitcoin_chain_service.tip().await; - let duration_ms = Instant::now().duration_since(t0).as_millis(); - info!("Fetched bitcoin tip at ({duration_ms} ms)"); - let is_new_bitcoin_block = match &bitcoin_tip_res { - Ok(height) => { - debug!("Got Bitcoin tip: {height}"); - let is_new_bitcoin_block = *height > *current_bitcoin_block; - *current_bitcoin_block = *height; - is_new_bitcoin_block - } - Err(e) => { - error!("Failed to fetch Bitcoin tip: {e}"); - false - } + error!("Failed to get last monitored swaps list: {e}"); + Vec::new() + }); + let last_monitored_swaps_include_chain_swaps = last_monitored_swaps + .iter() + .any(|s| matches!(s, Swap::Chain(_))); + + // Get the Bitcoin tip and process a new block (only if chain swaps are being monitored) + let (maybe_bitcoin_tip, is_new_bitcoin_block) = + if last_monitored_swaps_include_chain_swaps.not() { + info!("No chain swaps being monitored, skipping bitcoin tip fetch"); + (None, false) + } else { + let t0 = Instant::now(); + let bitcoin_tip_res = self.bitcoin_chain_service.tip().await; + let duration_ms = Instant::now().duration_since(t0).as_millis(); + info!("Fetched bitcoin tip at ({duration_ms} ms)"); + let is_new_bitcoin_block = match &bitcoin_tip_res { + Ok(height) => { + debug!("Got Bitcoin tip: {height}"); + let is_new_bitcoin_block = *height > *current_bitcoin_block; + *current_bitcoin_block = *height; + is_new_bitcoin_block + } + Err(e) => { + error!("Failed to fetch Bitcoin tip: {e}"); + false + } + }; + (bitcoin_tip_res.ok(), is_new_bitcoin_block) }; - (bitcoin_tip_res.ok(), is_new_bitcoin_block) - }; if let Ok(liquid_tip) = liquid_tip_res { self.persister @@ -642,7 +655,22 @@ impl LiquidSdk { // Only partial sync when there are no new Liquid or Bitcoin blocks let partial_sync = (is_new_liquid_block || is_new_bitcoin_block).not(); - if let Err(e) = self.sync_inner(partial_sync, chain_tips).await { + // Get the recoverable swaps list again now using the new chain tips and now knowing wether this is a partial sync or not + let include_chain_swaps = + if last_monitored_swaps_include_chain_swaps && chain_tips.bitcoin_tip.is_none() { + error!("Skipping syncing of chain swaps because bitcoin tip is not available"); + false + } else { + true + }; + let recoverable_swaps = self + .get_monitored_swaps_list(partial_sync, include_chain_swaps, chain_tips) + .await + .unwrap_or_else(|e| { + error!("Failed to get monitored swaps list: {e}"); + Vec::new() + }); + if let Err(e) = self.sync_inner(recoverable_swaps, chain_tips).await { error!("Failed to sync while tracking new blocks: {e}"); } } @@ -675,8 +703,14 @@ impl LiquidSdk { fn start_track_new_blocks_task(self: &Arc) { let cloned = self.clone(); tokio::spawn(async move { - let mut current_liquid_block: u32 = 0; - let mut current_bitcoin_block: u32 = 0; + let last_blockchain_info = cloned + .get_info() + .await + .map(|i| i.blockchain_info) + .unwrap_or_default(); + + let mut current_liquid_block: u32 = last_blockchain_info.liquid_tip; + let mut current_bitcoin_block: u32 = last_blockchain_info.bitcoin_tip; let mut shutdown_receiver = cloned.shutdown_receiver.clone(); cloned .track_new_blocks(&mut current_liquid_block, &mut current_bitcoin_block) @@ -3571,9 +3605,15 @@ impl LiquidSdk { .await?) } + /// Returns a list of swaps that need to be monitored for recovery. + /// + /// If `partial_sync` is true, only receive swaps will be considered. + /// + /// If `include_chain_swaps` is true, the provided chain tips must include a Bitcoin tip. pub(crate) async fn get_monitored_swaps_list( &self, partial_sync: bool, + include_chain_swaps: bool, chain_tips: ChainTips, ) -> Result> { let receive_swaps = self @@ -3592,16 +3632,19 @@ impl LiquidSdk { .into_iter() .map(Into::into) .collect(); - let all_chain_swaps: Vec = self.persister.list_chain_swaps()?; - if all_chain_swaps.is_empty() { + + if include_chain_swaps.not() { return Ok([receive_swaps, send_swaps].concat()); } - // There are some chain swaps, so we need to filter them out based on the bitcoin tip, which may not be available - let Some(bitcoin_tip) = chain_tips.bitcoin_tip else { - warn!("Skipping tracking of chain swaps because bitcoin tip is not available"); - return Ok([receive_swaps, send_swaps].concat()); - }; - let chain_swaps: Vec = all_chain_swaps + + let bitcoin_tip = chain_tips + .bitcoin_tip + .ok_or_else(|| PaymentError::Generic { + err: "Bitcoin tip is not available".to_string(), + })?; + let chain_swaps: Vec = self + .persister + .list_chain_swaps()? .into_iter() .filter(|swap| match swap.direction { Direction::Incoming => { @@ -3626,12 +3669,9 @@ impl LiquidSdk { /// it inserts or updates a corresponding entry in our Payments table. async fn sync_payments_with_chain_data( &self, - partial_sync: bool, + mut recoverable_swaps: Vec, chain_tips: ChainTips, ) -> Result<()> { - let mut recoverable_swaps = self - .get_monitored_swaps_list(partial_sync, chain_tips) - .await?; let mut wallet_tx_map = self .recoverer .recover_from_onchain(&mut recoverable_swaps, Some(chain_tips)) @@ -4018,31 +4058,44 @@ impl LiquidSdk { /// Synchronizes the local state with the mempool and onchain data. pub async fn sync(&self, partial_sync: bool) -> SdkResult<()> { + let liquid_tip = self.liquid_chain_service.tip().await?; + + let last_bitcoin_tip = self.get_info().await?.blockchain_info.bitcoin_tip; + let last_chain_tips = ChainTips { + liquid_tip, + bitcoin_tip: Some(last_bitcoin_tip), + }; + let recoverable_swaps = self + .get_monitored_swaps_list(partial_sync, true, last_chain_tips) + .await?; + let chain_tips = ChainTips { - liquid_tip: self.liquid_chain_service.tip().await?, + liquid_tip, bitcoin_tip: { - if self - .persister - .has_chain_swaps() - .unwrap_or_else(|e| { - error!( - "Failed to check if there are chain swaps: {e} - proceeding as if not" - ); - false - }) + if recoverable_swaps + .iter() + .any(|s| matches!(s, Swap::Chain(_))) .not() { - info!("No known chain swaps, skipping bitcoin tip fetch"); + info!("No chain swaps being monitored, skipping bitcoin tip fetch"); None } else { self.bitcoin_chain_service.tip().await.ok() } }, }; - self.sync_inner(partial_sync, chain_tips).await + + // Here we run the sync on the recoverable swaps built using the last known bitcoin block + // This may result in swaps being processed, which wouldn't if we used the current bitcoin tip, + // but these would be swaps that seems a good idea to recover in any case. + self.sync_inner(recoverable_swaps, chain_tips).await } - async fn sync_inner(&self, partial_sync: bool, chain_tips: ChainTips) -> SdkResult<()> { + async fn sync_inner( + &self, + recoverable_swaps: Vec, + chain_tips: ChainTips, + ) -> SdkResult<()> { self.ensure_is_started().await?; let t0 = Instant::now(); @@ -4059,18 +4112,18 @@ impl LiquidSdk { match is_first_sync { true => { self.event_manager.pause_notifications(); - self.sync_payments_with_chain_data(partial_sync, chain_tips) + self.sync_payments_with_chain_data(recoverable_swaps, chain_tips) .await?; self.event_manager.resume_notifications(); self.persister.set_is_first_sync_complete(true)?; } false => { - self.sync_payments_with_chain_data(partial_sync, chain_tips) + self.sync_payments_with_chain_data(recoverable_swaps, chain_tips) .await?; } } let duration_ms = Instant::now().duration_since(t0).as_millis(); - info!("Synchronized (partial: {partial_sync}) with mempool and onchain data ({duration_ms} ms)"); + info!("Synchronized with mempool and onchain data ({duration_ms} ms)"); self.notify_event_listeners(SdkEvent::Synced).await; Ok(())