mirror of
https://github.com/aljazceru/breez-sdk-liquid.git
synced 2026-01-31 11:54:24 +01:00
Skip bitcoin tip fetching if there are no monitored chain swaps
This commit is contained in:
@@ -226,12 +226,6 @@ impl Persister {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn has_chain_swaps(&self) -> Result<bool> {
|
||||
let con: Connection = self.get_connection()?;
|
||||
let result = con.query_row("SELECT 1 FROM chain_swaps LIMIT 1", [], |_| Ok(()));
|
||||
Ok(result.is_ok())
|
||||
}
|
||||
|
||||
pub(crate) fn list_chain_swaps(&self) -> Result<Vec<ChainSwap>> {
|
||||
let con: Connection = self.get_connection()?;
|
||||
self.list_chain_swaps_where(&con, vec![])
|
||||
|
||||
@@ -599,37 +599,50 @@ impl LiquidSdk {
|
||||
false
|
||||
}
|
||||
};
|
||||
// Get the Bitcoin tip and process a new block (only if chain swaps are used)
|
||||
let (maybe_bitcoin_tip, is_new_bitcoin_block) = if self
|
||||
.persister
|
||||
.has_chain_swaps()
|
||||
|
||||
// Get the recoverable swaps list using the last known tips
|
||||
let last_monitored_swaps = self
|
||||
.get_monitored_swaps_list(
|
||||
false,
|
||||
true,
|
||||
ChainTips {
|
||||
liquid_tip: *current_liquid_block,
|
||||
bitcoin_tip: Some(*current_bitcoin_block),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
error!("Failed to check if there are chain swaps: {e} - proceeding as if not");
|
||||
false
|
||||
})
|
||||
.not()
|
||||
{
|
||||
info!("No known chain swaps, skipping bitcoin tip fetch");
|
||||
(None, false)
|
||||
} else {
|
||||
let t0 = Instant::now();
|
||||
let bitcoin_tip_res = self.bitcoin_chain_service.tip().await;
|
||||
let duration_ms = Instant::now().duration_since(t0).as_millis();
|
||||
info!("Fetched bitcoin tip at ({duration_ms} ms)");
|
||||
let is_new_bitcoin_block = match &bitcoin_tip_res {
|
||||
Ok(height) => {
|
||||
debug!("Got Bitcoin tip: {height}");
|
||||
let is_new_bitcoin_block = *height > *current_bitcoin_block;
|
||||
*current_bitcoin_block = *height;
|
||||
is_new_bitcoin_block
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to fetch Bitcoin tip: {e}");
|
||||
false
|
||||
}
|
||||
error!("Failed to get last monitored swaps list: {e}");
|
||||
Vec::new()
|
||||
});
|
||||
let last_monitored_swaps_include_chain_swaps = last_monitored_swaps
|
||||
.iter()
|
||||
.any(|s| matches!(s, Swap::Chain(_)));
|
||||
|
||||
// Get the Bitcoin tip and process a new block (only if chain swaps are being monitored)
|
||||
let (maybe_bitcoin_tip, is_new_bitcoin_block) =
|
||||
if last_monitored_swaps_include_chain_swaps.not() {
|
||||
info!("No chain swaps being monitored, skipping bitcoin tip fetch");
|
||||
(None, false)
|
||||
} else {
|
||||
let t0 = Instant::now();
|
||||
let bitcoin_tip_res = self.bitcoin_chain_service.tip().await;
|
||||
let duration_ms = Instant::now().duration_since(t0).as_millis();
|
||||
info!("Fetched bitcoin tip at ({duration_ms} ms)");
|
||||
let is_new_bitcoin_block = match &bitcoin_tip_res {
|
||||
Ok(height) => {
|
||||
debug!("Got Bitcoin tip: {height}");
|
||||
let is_new_bitcoin_block = *height > *current_bitcoin_block;
|
||||
*current_bitcoin_block = *height;
|
||||
is_new_bitcoin_block
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to fetch Bitcoin tip: {e}");
|
||||
false
|
||||
}
|
||||
};
|
||||
(bitcoin_tip_res.ok(), is_new_bitcoin_block)
|
||||
};
|
||||
(bitcoin_tip_res.ok(), is_new_bitcoin_block)
|
||||
};
|
||||
|
||||
if let Ok(liquid_tip) = liquid_tip_res {
|
||||
self.persister
|
||||
@@ -642,7 +655,22 @@ impl LiquidSdk {
|
||||
|
||||
// Only partial sync when there are no new Liquid or Bitcoin blocks
|
||||
let partial_sync = (is_new_liquid_block || is_new_bitcoin_block).not();
|
||||
if let Err(e) = self.sync_inner(partial_sync, chain_tips).await {
|
||||
// Get the recoverable swaps list again now using the new chain tips and now knowing wether this is a partial sync or not
|
||||
let include_chain_swaps =
|
||||
if last_monitored_swaps_include_chain_swaps && chain_tips.bitcoin_tip.is_none() {
|
||||
error!("Skipping syncing of chain swaps because bitcoin tip is not available");
|
||||
false
|
||||
} else {
|
||||
true
|
||||
};
|
||||
let recoverable_swaps = self
|
||||
.get_monitored_swaps_list(partial_sync, include_chain_swaps, chain_tips)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
error!("Failed to get monitored swaps list: {e}");
|
||||
Vec::new()
|
||||
});
|
||||
if let Err(e) = self.sync_inner(recoverable_swaps, chain_tips).await {
|
||||
error!("Failed to sync while tracking new blocks: {e}");
|
||||
}
|
||||
}
|
||||
@@ -675,8 +703,14 @@ impl LiquidSdk {
|
||||
fn start_track_new_blocks_task(self: &Arc<LiquidSdk>) {
|
||||
let cloned = self.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut current_liquid_block: u32 = 0;
|
||||
let mut current_bitcoin_block: u32 = 0;
|
||||
let last_blockchain_info = cloned
|
||||
.get_info()
|
||||
.await
|
||||
.map(|i| i.blockchain_info)
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut current_liquid_block: u32 = last_blockchain_info.liquid_tip;
|
||||
let mut current_bitcoin_block: u32 = last_blockchain_info.bitcoin_tip;
|
||||
let mut shutdown_receiver = cloned.shutdown_receiver.clone();
|
||||
cloned
|
||||
.track_new_blocks(&mut current_liquid_block, &mut current_bitcoin_block)
|
||||
@@ -3571,9 +3605,15 @@ impl LiquidSdk {
|
||||
.await?)
|
||||
}
|
||||
|
||||
/// Returns a list of swaps that need to be monitored for recovery.
|
||||
///
|
||||
/// If `partial_sync` is true, only receive swaps will be considered.
|
||||
///
|
||||
/// If `include_chain_swaps` is true, the provided chain tips must include a Bitcoin tip.
|
||||
pub(crate) async fn get_monitored_swaps_list(
|
||||
&self,
|
||||
partial_sync: bool,
|
||||
include_chain_swaps: bool,
|
||||
chain_tips: ChainTips,
|
||||
) -> Result<Vec<Swap>> {
|
||||
let receive_swaps = self
|
||||
@@ -3592,16 +3632,19 @@ impl LiquidSdk {
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect();
|
||||
let all_chain_swaps: Vec<ChainSwap> = self.persister.list_chain_swaps()?;
|
||||
if all_chain_swaps.is_empty() {
|
||||
|
||||
if include_chain_swaps.not() {
|
||||
return Ok([receive_swaps, send_swaps].concat());
|
||||
}
|
||||
// There are some chain swaps, so we need to filter them out based on the bitcoin tip, which may not be available
|
||||
let Some(bitcoin_tip) = chain_tips.bitcoin_tip else {
|
||||
warn!("Skipping tracking of chain swaps because bitcoin tip is not available");
|
||||
return Ok([receive_swaps, send_swaps].concat());
|
||||
};
|
||||
let chain_swaps: Vec<Swap> = all_chain_swaps
|
||||
|
||||
let bitcoin_tip = chain_tips
|
||||
.bitcoin_tip
|
||||
.ok_or_else(|| PaymentError::Generic {
|
||||
err: "Bitcoin tip is not available".to_string(),
|
||||
})?;
|
||||
let chain_swaps: Vec<Swap> = self
|
||||
.persister
|
||||
.list_chain_swaps()?
|
||||
.into_iter()
|
||||
.filter(|swap| match swap.direction {
|
||||
Direction::Incoming => {
|
||||
@@ -3626,12 +3669,9 @@ impl LiquidSdk {
|
||||
/// it inserts or updates a corresponding entry in our Payments table.
|
||||
async fn sync_payments_with_chain_data(
|
||||
&self,
|
||||
partial_sync: bool,
|
||||
mut recoverable_swaps: Vec<Swap>,
|
||||
chain_tips: ChainTips,
|
||||
) -> Result<()> {
|
||||
let mut recoverable_swaps = self
|
||||
.get_monitored_swaps_list(partial_sync, chain_tips)
|
||||
.await?;
|
||||
let mut wallet_tx_map = self
|
||||
.recoverer
|
||||
.recover_from_onchain(&mut recoverable_swaps, Some(chain_tips))
|
||||
@@ -4018,31 +4058,44 @@ impl LiquidSdk {
|
||||
|
||||
/// Synchronizes the local state with the mempool and onchain data.
|
||||
pub async fn sync(&self, partial_sync: bool) -> SdkResult<()> {
|
||||
let liquid_tip = self.liquid_chain_service.tip().await?;
|
||||
|
||||
let last_bitcoin_tip = self.get_info().await?.blockchain_info.bitcoin_tip;
|
||||
let last_chain_tips = ChainTips {
|
||||
liquid_tip,
|
||||
bitcoin_tip: Some(last_bitcoin_tip),
|
||||
};
|
||||
let recoverable_swaps = self
|
||||
.get_monitored_swaps_list(partial_sync, true, last_chain_tips)
|
||||
.await?;
|
||||
|
||||
let chain_tips = ChainTips {
|
||||
liquid_tip: self.liquid_chain_service.tip().await?,
|
||||
liquid_tip,
|
||||
bitcoin_tip: {
|
||||
if self
|
||||
.persister
|
||||
.has_chain_swaps()
|
||||
.unwrap_or_else(|e| {
|
||||
error!(
|
||||
"Failed to check if there are chain swaps: {e} - proceeding as if not"
|
||||
);
|
||||
false
|
||||
})
|
||||
if recoverable_swaps
|
||||
.iter()
|
||||
.any(|s| matches!(s, Swap::Chain(_)))
|
||||
.not()
|
||||
{
|
||||
info!("No known chain swaps, skipping bitcoin tip fetch");
|
||||
info!("No chain swaps being monitored, skipping bitcoin tip fetch");
|
||||
None
|
||||
} else {
|
||||
self.bitcoin_chain_service.tip().await.ok()
|
||||
}
|
||||
},
|
||||
};
|
||||
self.sync_inner(partial_sync, chain_tips).await
|
||||
|
||||
// Here we run the sync on the recoverable swaps built using the last known bitcoin block
|
||||
// This may result in swaps being processed, which wouldn't if we used the current bitcoin tip,
|
||||
// but these would be swaps that seems a good idea to recover in any case.
|
||||
self.sync_inner(recoverable_swaps, chain_tips).await
|
||||
}
|
||||
|
||||
async fn sync_inner(&self, partial_sync: bool, chain_tips: ChainTips) -> SdkResult<()> {
|
||||
async fn sync_inner(
|
||||
&self,
|
||||
recoverable_swaps: Vec<Swap>,
|
||||
chain_tips: ChainTips,
|
||||
) -> SdkResult<()> {
|
||||
self.ensure_is_started().await?;
|
||||
|
||||
let t0 = Instant::now();
|
||||
@@ -4059,18 +4112,18 @@ impl LiquidSdk {
|
||||
match is_first_sync {
|
||||
true => {
|
||||
self.event_manager.pause_notifications();
|
||||
self.sync_payments_with_chain_data(partial_sync, chain_tips)
|
||||
self.sync_payments_with_chain_data(recoverable_swaps, chain_tips)
|
||||
.await?;
|
||||
self.event_manager.resume_notifications();
|
||||
self.persister.set_is_first_sync_complete(true)?;
|
||||
}
|
||||
false => {
|
||||
self.sync_payments_with_chain_data(partial_sync, chain_tips)
|
||||
self.sync_payments_with_chain_data(recoverable_swaps, chain_tips)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
let duration_ms = Instant::now().duration_since(t0).as_millis();
|
||||
info!("Synchronized (partial: {partial_sync}) with mempool and onchain data ({duration_ms} ms)");
|
||||
info!("Synchronized with mempool and onchain data ({duration_ms} ms)");
|
||||
|
||||
self.notify_event_listeners(SdkEvent::Synced).await;
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user