Implement get_sync_context

This commit is contained in:
Daniel Granhão
2025-07-16 11:41:09 +01:00
parent bdd639c861
commit c0cba7f557
2 changed files with 161 additions and 119 deletions

View File

@@ -2500,6 +2500,20 @@ impl From<electrum_client::GetBalanceRes> for BtcScriptBalance {
}
}
pub(crate) struct GetSyncContextRequest {
pub partial_sync: Option<bool>,
pub last_liquid_tip: u32,
pub last_bitcoin_tip: u32,
}
pub(crate) struct SyncContext {
pub maybe_liquid_tip: Option<u32>,
pub maybe_bitcoin_tip: Option<u32>,
pub recoverable_swaps: Vec<Swap>,
pub is_new_liquid_block: bool,
pub is_new_bitcoin_block: bool,
}
#[macro_export]
macro_rules! get_updated_fields {
($($var:ident),* $(,)?) => {{

View File

@@ -581,102 +581,47 @@ impl LiquidSdk {
current_bitcoin_block: &mut u32,
) {
info!("Track new blocks iteration started");
// Get the Liquid tip and process a new block
let t0 = Instant::now();
let liquid_tip_res = self.liquid_chain_service.tip().await;
let duration_ms = Instant::now().duration_since(t0).as_millis();
info!("Fetched liquid tip at ({duration_ms} ms)");
let is_new_liquid_block = match &liquid_tip_res {
Ok(height) => {
debug!("Got Liquid tip: {height}");
let is_new_liquid_block = *height > *current_liquid_block;
*current_liquid_block = *height;
is_new_liquid_block
}
Err(e) => {
error!("Failed to fetch Liquid tip: {e}");
false
}
let Ok(sync_context) = self
.get_sync_context(GetSyncContextRequest {
partial_sync: None,
last_liquid_tip: *current_liquid_block,
last_bitcoin_tip: *current_bitcoin_block,
})
.await
else {
error!("Failed to get sync context");
return;
};
// Get the recoverable swaps list using the last known tips
let last_monitored_swaps = self
.get_monitored_swaps_list(
false,
true,
ChainTips {
liquid_tip: *current_liquid_block,
bitcoin_tip: Some(*current_bitcoin_block),
},
)
.await
.unwrap_or_else(|e| {
error!("Failed to get last monitored swaps list: {e}");
Vec::new()
});
let last_monitored_swaps_include_chain_swaps = last_monitored_swaps
.iter()
.any(|s| matches!(s, Swap::Chain(_)));
*current_liquid_block = sync_context
.maybe_liquid_tip
.unwrap_or(*current_liquid_block);
*current_bitcoin_block = sync_context
.maybe_bitcoin_tip
.unwrap_or(*current_bitcoin_block);
// Get the Bitcoin tip and process a new block (only if chain swaps are being monitored)
let (maybe_bitcoin_tip, is_new_bitcoin_block) =
if last_monitored_swaps_include_chain_swaps.not() {
info!("No chain swaps being monitored, skipping bitcoin tip fetch");
(None, false)
} else {
let t0 = Instant::now();
let bitcoin_tip_res = self.bitcoin_chain_service.tip().await;
let duration_ms = Instant::now().duration_since(t0).as_millis();
info!("Fetched bitcoin tip at ({duration_ms} ms)");
let is_new_bitcoin_block = match &bitcoin_tip_res {
Ok(height) => {
debug!("Got Bitcoin tip: {height}");
let is_new_bitcoin_block = *height > *current_bitcoin_block;
*current_bitcoin_block = *height;
is_new_bitcoin_block
}
Err(e) => {
error!("Failed to fetch Bitcoin tip: {e}");
false
}
};
(bitcoin_tip_res.ok(), is_new_bitcoin_block)
};
if let Ok(liquid_tip) = liquid_tip_res {
if let Some(liquid_tip) = sync_context.maybe_liquid_tip {
self.persister
.update_blockchain_info(liquid_tip, maybe_bitcoin_tip)
.update_blockchain_info(liquid_tip, sync_context.maybe_bitcoin_tip)
.unwrap_or_else(|err| warn!("Could not update local tips: {err:?}"));
let chain_tips = ChainTips {
liquid_tip,
bitcoin_tip: maybe_bitcoin_tip,
};
// Only partial sync when there are no new Liquid or Bitcoin blocks
let partial_sync = (is_new_liquid_block || is_new_bitcoin_block).not();
// Get the recoverable swaps list again now using the new chain tips and now knowing wether this is a partial sync or not
let include_chain_swaps =
if last_monitored_swaps_include_chain_swaps && chain_tips.bitcoin_tip.is_none() {
error!("Skipping syncing of chain swaps because bitcoin tip is not available");
false
} else {
true
};
let recoverable_swaps = self
.get_monitored_swaps_list(partial_sync, include_chain_swaps, chain_tips)
if let Err(e) = self
.sync_inner(
sync_context.recoverable_swaps,
ChainTips {
liquid_tip,
bitcoin_tip: sync_context.maybe_bitcoin_tip,
},
)
.await
.unwrap_or_else(|e| {
error!("Failed to get monitored swaps list: {e}");
Vec::new()
});
if let Err(e) = self.sync_inner(recoverable_swaps, chain_tips).await {
{
error!("Failed to sync while tracking new blocks: {e}");
}
}
// Update swap handlers
if is_new_liquid_block {
if sync_context.is_new_liquid_block {
self.chain_swap_handler
.on_liquid_block(*current_liquid_block)
.await;
@@ -687,7 +632,7 @@ impl LiquidSdk {
.on_liquid_block(*current_liquid_block)
.await;
}
if is_new_bitcoin_block {
if sync_context.is_new_bitcoin_block {
self.chain_swap_handler
.on_bitcoin_block(*current_bitcoin_block)
.await;
@@ -3609,11 +3554,10 @@ impl LiquidSdk {
///
/// If `partial_sync` is true, only receive swaps will be considered.
///
/// If `include_chain_swaps` is true, the provided chain tips must include a Bitcoin tip.
/// If no Bitcoin tip is provided, chain swaps will not be considered.
pub(crate) async fn get_monitored_swaps_list(
&self,
partial_sync: bool,
include_chain_swaps: bool,
chain_tips: ChainTips,
) -> Result<Vec<Swap>> {
let receive_swaps = self
@@ -3633,15 +3577,10 @@ impl LiquidSdk {
.map(Into::into)
.collect();
if include_chain_swaps.not() {
let Some(bitcoin_tip) = chain_tips.bitcoin_tip else {
return Ok([receive_swaps, send_swaps].concat());
}
};
let bitcoin_tip = chain_tips
.bitcoin_tip
.ok_or_else(|| PaymentError::Generic {
err: "Bitcoin tip is not available".to_string(),
})?;
let chain_swaps: Vec<Swap> = self
.persister
.list_chain_swaps()?
@@ -4058,37 +3997,126 @@ impl LiquidSdk {
/// Synchronizes the local state with the mempool and onchain data.
pub async fn sync(&self, partial_sync: bool) -> SdkResult<()> {
let liquid_tip = self.liquid_chain_service.tip().await?;
let last_bitcoin_tip = self.get_info().await?.blockchain_info.bitcoin_tip;
let last_chain_tips = ChainTips {
liquid_tip,
bitcoin_tip: Some(last_bitcoin_tip),
};
let recoverable_swaps = self
.get_monitored_swaps_list(partial_sync, true, last_chain_tips)
let blockchain_info = self.get_info().await?.blockchain_info;
let sync_context = self
.get_sync_context(GetSyncContextRequest {
partial_sync: Some(partial_sync),
last_liquid_tip: blockchain_info.liquid_tip,
last_bitcoin_tip: blockchain_info.bitcoin_tip,
})
.await?;
let chain_tips = ChainTips {
liquid_tip,
bitcoin_tip: {
if recoverable_swaps
.iter()
.any(|s| matches!(s, Swap::Chain(_)))
.not()
{
info!("No chain swaps being monitored, skipping bitcoin tip fetch");
None
} else {
self.bitcoin_chain_service.tip().await.ok()
}
self.sync_inner(
sync_context.recoverable_swaps,
ChainTips {
liquid_tip: sync_context.maybe_liquid_tip.ok_or(SdkError::Generic {
err: "Liquid tip not available".to_string(),
})?,
bitcoin_tip: sync_context.maybe_bitcoin_tip,
},
)
.await
}
/// Computes the sync context.
///
/// # Arguments
/// * `partial_sync` - if not provided, this will infer it based on the last known tips.
/// * `last_liquid_tip` - the last known liquid tip
/// * `last_bitcoin_tip` - the last known bitcoin tip
///
/// # Returns
/// * `maybe_liquid_tip` - the current liquid tip, or `None` if the liquid tip could not be fetched
/// * `maybe_bitcoin_tip` - the current bitcoin tip, or `None` if the bitcoin tip could not be fetched
/// * `recoverable_swaps` - the recoverable swaps, which are built using the last known bitcoin tip. If
/// the bitcoin tip could not be fetched, this won't include chain swaps. If the liquid tip could not be fetched,
/// this will be an empty vector.
/// * `is_new_liquid_block` - true if the liquid tip is new
/// * `is_new_bitcoin_block` - true if the bitcoin tip is new
async fn get_sync_context(&self, req: GetSyncContextRequest) -> SdkResult<SyncContext> {
// Get the liquid tip
let t0 = Instant::now();
let liquid_tip = match self.liquid_chain_service.tip().await {
Ok(tip) => Some(tip),
Err(e) => {
error!("Failed to fetch liquid tip: {e}");
None
}
};
let duration_ms = Instant::now().duration_since(t0).as_millis();
if liquid_tip.is_some() {
info!("Fetched liquid tip in ({duration_ms} ms)");
}
// Get the recoverable swaps assuming full sync if partial sync is not provided
let mut recoverable_swaps = self
.get_monitored_swaps_list(
req.partial_sync.unwrap_or(false),
ChainTips {
liquid_tip: liquid_tip.unwrap_or(req.last_liquid_tip),
bitcoin_tip: Some(req.last_bitcoin_tip),
},
)
.await?;
let bitcoin_tip = if recoverable_swaps
.iter()
.any(|s| matches!(s, Swap::Chain(_)))
.not()
{
info!("No chain swaps being monitored, skipping bitcoin tip fetch");
None
} else {
// Get the bitcoin tip
let t0 = Instant::now();
let bitcoin_tip = match self.bitcoin_chain_service.tip().await {
Ok(tip) => Some(tip),
Err(e) => {
error!("Failed to fetch bitcoin tip: {e}");
None
}
};
let duration_ms = Instant::now().duration_since(t0).as_millis();
if bitcoin_tip.is_some() {
info!("Fetched bitcoin tip in ({duration_ms} ms)");
} else {
recoverable_swaps.retain(|s| !matches!(s, Swap::Chain(_)));
}
bitcoin_tip
};
// Here we run the sync on the recoverable swaps built using the last known bitcoin block
// This may result in swaps being processed, which wouldn't if we used the current bitcoin tip,
// but these would be swaps that seems a good idea to recover in any case.
self.sync_inner(recoverable_swaps, chain_tips).await
let is_new_liquid_block = liquid_tip.is_some_and(|lt| lt > req.last_liquid_tip);
let is_new_bitcoin_block = bitcoin_tip.is_some_and(|bt| bt > req.last_bitcoin_tip);
// Update the recoverable swaps if we previously didn't know if this is a partial sync or not
// No liquid tip means there's no point in returning recoverable swaps
if let Some(liquid_tip) = liquid_tip {
if req.partial_sync.is_none() {
let partial_sync = (is_new_liquid_block || is_new_bitcoin_block).not();
if partial_sync {
recoverable_swaps = self
.get_monitored_swaps_list(
true,
ChainTips {
liquid_tip,
bitcoin_tip: None,
},
)
.await?;
}
}
} else {
recoverable_swaps = Vec::new();
}
Ok(SyncContext {
maybe_liquid_tip: liquid_tip,
maybe_bitcoin_tip: bitcoin_tip,
recoverable_swaps,
is_new_liquid_block,
is_new_bitcoin_block,
})
}
async fn sync_inner(