feat: add cooperative preimage recovery and optimize flow (#669)

This commit is contained in:
yse
2025-01-17 14:32:58 +01:00
committed by GitHub
parent bc779e3269
commit b185ff0e35
11 changed files with 147 additions and 76 deletions

2
lib/Cargo.lock generated
View File

@@ -776,7 +776,7 @@ dependencies = [
[[package]]
name = "boltz-client"
version = "0.1.3"
source = "git+https://github.com/SatoshiPortal/boltz-rust?rev=3bbc0ddb068df7f12a1b8b37cfbb353f4db36fe5#3bbc0ddb068df7f12a1b8b37cfbb353f4db36fe5"
source = "git+https://github.com/hydra-yse/boltz-rust?rev=5869f2444280890716b756adaeaef2942b8041a3#5869f2444280890716b756adaeaef2942b8041a3"
dependencies = [
"bip39",
"bitcoin 0.31.2",

View File

@@ -17,7 +17,7 @@ workspace = true
[dependencies]
anyhow = { workspace = true }
bip39 = "2.0.0"
boltz-client = { git = "https://github.com/SatoshiPortal/boltz-rust", rev = "3bbc0ddb068df7f12a1b8b37cfbb353f4db36fe5" }
boltz-client = { git = "https://github.com/hydra-yse/boltz-rust", rev = "5869f2444280890716b756adaeaef2942b8041a3" }
chrono = "0.4"
derivative = "2.2.0"
env_logger = "0.11"

View File

@@ -65,6 +65,7 @@ pub(crate) struct RecoveredOnchainDataSend {
pub(crate) lockup_tx_id: Option<HistoryTxId>,
pub(crate) claim_tx_id: Option<HistoryTxId>,
pub(crate) refund_tx_id: Option<HistoryTxId>,
pub(crate) preimage: Option<String>,
}
impl RecoveredOnchainDataSend {

View File

@@ -14,6 +14,7 @@ use tokio::sync::Mutex;
use super::model::*;
use crate::prelude::{Direction, Swap};
use crate::swapper::Swapper;
use crate::wallet::OnchainWallet;
use crate::{
chain::{bitcoin::BitcoinChainService, liquid::LiquidChainService},
@@ -23,6 +24,7 @@ use crate::{
pub(crate) struct Recoverer {
master_blinding_key: MasterBlindingKey,
swapper: Arc<dyn Swapper>,
onchain_wallet: Arc<dyn OnchainWallet>,
liquid_chain_service: Arc<Mutex<dyn LiquidChainService>>,
bitcoin_chain_service: Arc<Mutex<dyn BitcoinChainService>>,
@@ -31,6 +33,7 @@ pub(crate) struct Recoverer {
impl Recoverer {
pub(crate) fn new(
master_blinding_key: Vec<u8>,
swapper: Arc<dyn Swapper>,
onchain_wallet: Arc<dyn OnchainWallet>,
liquid_chain_service: Arc<Mutex<dyn LiquidChainService>>,
bitcoin_chain_service: Arc<Mutex<dyn BitcoinChainService>>,
@@ -39,18 +42,40 @@ impl Recoverer {
master_blinding_key: MasterBlindingKey::from_hex(
&master_blinding_key.to_lower_hex_string(),
)?,
swapper,
onchain_wallet,
liquid_chain_service,
bitcoin_chain_service,
})
}
async fn recover_preimages(
fn recover_cooperative_preimages(
&self,
claim_tx_ids_by_swap_id: HashMap<&String, Txid>,
) -> Result<HashMap<String, String>> {
let claim_tx_ids: Vec<Txid> = claim_tx_ids_by_swap_id.values().copied().collect();
recovered_send_data: &mut HashMap<String, &mut RecoveredOnchainDataSend>,
) -> HashMap<String, Txid> {
let mut failed = HashMap::new();
for (swap_id, recovered_data) in recovered_send_data {
let Some(claim_tx_id) = &recovered_data.claim_tx_id else {
continue;
};
match self.swapper.get_submarine_preimage(swap_id) {
Ok(preimage) => recovered_data.preimage = Some(preimage),
Err(err) => {
warn!("Could not recover Send swap {swap_id} preimage cooperatively: {err:?}");
failed.insert(swap_id.clone(), claim_tx_id.txid);
}
}
}
failed
}
async fn recover_non_cooperative_preimages(
&self,
recovered_send_data: &mut HashMap<String, &mut RecoveredOnchainDataSend>,
failed_cooperative: HashMap<String, Txid>,
) -> Result<()> {
let claim_tx_ids: Vec<Txid> = failed_cooperative.values().cloned().collect();
let claim_txs = self
.liquid_chain_service
.lock()
@@ -66,25 +91,40 @@ impl Recoverer {
anyhow!("Got {claim_txs_len} send claim transactions, expected {claim_tx_ids_len}")
);
let claim_txs_by_swap_id: HashMap<&String, lwk_wollet::elements::Transaction> =
claim_tx_ids_by_swap_id.into_keys().zip(claim_txs).collect();
let claim_txs_by_swap_id: HashMap<String, lwk_wollet::elements::Transaction> =
failed_cooperative.into_keys().zip(claim_txs).collect();
let mut preimages = HashMap::new();
for (swap_id, claim_tx) in claim_txs_by_swap_id {
match Self::get_send_swap_preimage_from_claim_tx(swap_id, &claim_tx) {
Ok(preimage) => {
preimages.insert(swap_id.to_string(), preimage);
}
let Some(recovered_data) = recovered_send_data.get_mut(&swap_id) else {
continue;
};
match Self::get_send_swap_preimage_from_claim_tx(&swap_id, &claim_tx) {
Ok(preimage) => recovered_data.preimage = Some(preimage),
Err(e) => {
debug!(
"Couldn't get swap preimage from claim tx {} for swap {swap_id}: {e} - \
could be a cooperative claim tx",
error!(
"Couldn't get non-cooperative swap preimage from claim tx {} for swap {swap_id}: {e}",
claim_tx.txid()
);
// Keep only claim tx for which there is a recovered or synced preimage
recovered_data.claim_tx_id = None;
}
}
}
Ok(preimages)
Ok(())
}
async fn recover_preimages(
&self,
mut recovered_send_data: HashMap<String, &mut RecoveredOnchainDataSend>,
) -> Result<()> {
// Recover the preimages by querying the swapper, only if there is a claim_tx_id
let failed_cooperative = self.recover_cooperative_preimages(&mut recovered_send_data);
// For those which failed, recover the preimages by querying onchain (non-cooperative case)
self.recover_non_cooperative_preimages(&mut recovered_send_data, failed_cooperative)
.await
}
pub(crate) fn get_send_swap_preimage_from_claim_tx(
@@ -132,44 +172,19 @@ impl Recoverer {
let histories = self.fetch_swaps_histories(&swaps_list).await?;
let mut recovered_send_data = self.recover_send_swap_tx_ids(&tx_map, histories.send)?;
let recovered_send_with_claim_tx = recovered_send_data
.iter()
.filter_map(|(swap_id, send_data)| {
send_data
.claim_tx_id
.clone()
.map(|claim_tx_id| (swap_id, claim_tx_id.txid))
})
.collect::<HashMap<&String, Txid>>();
let mut recovered_preimages = self.recover_preimages(recovered_send_with_claim_tx).await?;
// Keep only verified preimages
recovered_preimages.retain(|swap_id, preimage| {
if let Some(Swap::Send(send_swap)) = swaps.iter().find(|s| s.id() == *swap_id) {
match utils::verify_payment_hash(preimage, &send_swap.invoice) {
Ok(_) => true,
Err(e) => {
error!("Failed to verify recovered preimage for swap {swap_id}: {e}");
false
let recovered_send_data_without_preimage = recovered_send_data
.iter_mut()
.filter_map(|(swap_id, recovered_data)| {
if let Some(Swap::Send(send_swap)) = swaps.iter().find(|s| s.id() == *swap_id) {
if send_swap.preimage.is_none() {
return Some((swap_id.clone(), recovered_data));
}
}
} else {
false
}
});
// Keep only claim tx for which there is a recovered or synced preimage
for (swap_id, send_data) in recovered_send_data.iter_mut() {
if let Some(Swap::Send(send_swap)) = swaps.iter().find(|s| s.id() == *swap_id) {
if send_data.claim_tx_id.is_some()
&& !recovered_preimages.contains_key(swap_id)
&& send_swap.preimage.is_none()
{
error!(
"Seemingly found a claim tx but no preimage for swap {swap_id}. Ignoring claim tx."
);
send_data.claim_tx_id = None;
}
}
}
None
})
.collect::<HashMap<String, &mut RecoveredOnchainDataSend>>();
self.recover_preimages(recovered_send_data_without_preimage)
.await?;
let recovered_receive_data = self.recover_receive_swap_tx_ids(
&tx_map,
@@ -194,15 +209,10 @@ impl Recoverer {
let swap_id = &swap.id();
match swap {
Swap::Send(send_swap) => {
let Some(recovered_data) = recovered_send_data.get(swap_id) else {
let Some(recovered_data) = recovered_send_data.get_mut(swap_id) else {
log::warn!("Could not apply recovered data for Send swap {swap_id}: recovery data not found");
continue;
};
let timeout_block_height = send_swap.timeout_block_height as u32;
let is_expired = liquid_tip >= timeout_block_height;
if let Some(new_state) = recovered_data.derive_partial_state(is_expired) {
send_swap.state = new_state;
}
send_swap.lockup_tx_id = recovered_data
.lockup_tx_id
.clone()
@@ -211,8 +221,28 @@ impl Recoverer {
.refund_tx_id
.clone()
.map(|h| h.txid.to_string());
if let Some(preimage) = recovered_preimages.remove(swap_id) {
send_swap.preimage = Some(preimage);
match (&send_swap.preimage, &recovered_data.preimage) {
// Update the preimage only if we don't have one already (e.g. from
// real-time sync)
(Some(_), _) | (None, None) => {}
// Keep only verified preimages
(None, Some(recovered_preimage)) => {
match utils::verify_payment_hash(recovered_preimage, &send_swap.invoice)
{
Ok(_) => send_swap.preimage = Some(recovered_preimage.clone()),
Err(e) => {
error!("Failed to verify recovered preimage for swap {swap_id}: {e}");
recovered_data.claim_tx_id = None;
}
}
}
}
// Set the state only AFTER the preimage and claim_tx_id have been verified
let timeout_block_height = send_swap.timeout_block_height as u32;
let is_expired = liquid_tip >= timeout_block_height;
if let Some(new_state) = recovered_data.derive_partial_state(is_expired) {
send_swap.state = new_state;
}
}
Swap::Receive(receive_swap) => {
@@ -441,6 +471,7 @@ impl Recoverer {
lockup_tx_id,
claim_tx_id,
refund_tx_id,
preimage: None,
},
);
}

View File

@@ -195,8 +195,19 @@ impl LiquidSdk {
signer.clone(),
)?);
let event_manager = Arc::new(EventManager::new());
let (shutdown_sender, shutdown_receiver) = watch::channel::<()>(());
if let Some(swapper_proxy_url) = swapper_proxy_url {
persister.set_swapper_proxy_url(swapper_proxy_url)?;
}
let cached_swapper_proxy_url = persister.get_swapper_proxy_url()?;
let swapper = Arc::new(BoltzSwapper::new(config.clone(), cached_swapper_proxy_url));
let status_stream = Arc::<dyn SwapperStatusStream>::from(swapper.create_status_stream());
let recoverer = Arc::new(Recoverer::new(
signer.slip77_master_blinding_key()?,
swapper.clone(),
onchain_wallet.clone(),
liquid_chain_service.clone(),
bitcoin_chain_service.clone(),
@@ -212,16 +223,6 @@ impl LiquidSdk {
sync_trigger_rx,
));
let event_manager = Arc::new(EventManager::new());
let (shutdown_sender, shutdown_receiver) = watch::channel::<()>(());
if let Some(swapper_proxy_url) = swapper_proxy_url {
persister.set_swapper_proxy_url(swapper_proxy_url)?;
}
let cached_swapper_proxy_url = persister.get_swapper_proxy_url()?;
let swapper = Arc::new(BoltzSwapper::new(config.clone(), cached_swapper_proxy_url));
let status_stream = Arc::<dyn SwapperStatusStream>::from(swapper.create_status_stream());
let send_swap_handler = SendSwapHandler::new(
config.clone(),
onchain_wallet.clone(),

View File

@@ -202,6 +202,11 @@ impl Swapper for BoltzSwapper {
Ok(self.client.get_submarine_pairs()?.get_lbtc_to_btc_pair())
}
/// Get a submarine swap's preimage
fn get_submarine_preimage(&self, swap_id: &str) -> Result<String, PaymentError> {
Ok(self.client.get_submarine_preimage(swap_id)?.preimage)
}
/// Get claim tx details which includes the preimage as a proof of payment.
/// It is used to validate the preimage before claiming which is the reason why we need to separate
/// the claim into two steps.

View File

@@ -57,6 +57,9 @@ pub trait Swapper: Send + Sync {
/// Get a submarine pair information
fn get_submarine_pairs(&self) -> Result<Option<SubmarinePair>, PaymentError>;
/// Get a submarine swap's preimage
fn get_submarine_preimage(&self, swap_id: &str) -> Result<String, PaymentError>;
/// Get send swap claim tx details which includes the preimage as a proof of payment.
/// It is used to validate the preimage before claiming which is the reason why we need to separate
/// the claim into two steps.

View File

@@ -455,6 +455,7 @@ mod tests {
chain_swap::new_chain_swap,
persist::{create_persister, new_receive_swap, new_send_swap},
recover::new_recoverer,
swapper::MockSwapper,
sync::{
new_chain_sync_data, new_receive_sync_data, new_send_sync_data, new_sync_service,
},
@@ -468,8 +469,13 @@ mod tests {
async fn test_incoming_sync_create_and_update() -> Result<()> {
create_persister!(persister);
let signer: Arc<Box<dyn Signer>> = Arc::new(Box::new(MockSigner::new()?));
let swapper = Arc::new(MockSwapper::new());
let onchain_wallet = Arc::new(MockWallet::new(signer.clone())?);
let recoverer = Arc::new(new_recoverer(signer.clone(), onchain_wallet.clone())?);
let recoverer = Arc::new(new_recoverer(
signer.clone(),
swapper.clone(),
onchain_wallet.clone(),
)?);
let sync_data = vec![
SyncData::Receive(new_receive_sync_data()),
@@ -563,8 +569,13 @@ mod tests {
async fn test_outgoing_sync() -> Result<()> {
create_persister!(persister);
let signer: Arc<Box<dyn Signer>> = Arc::new(Box::new(MockSigner::new()?));
let swapper = Arc::new(MockSwapper::new());
let onchain_wallet = Arc::new(MockWallet::new(signer.clone())?);
let recoverer = Arc::new(new_recoverer(signer.clone(), onchain_wallet.clone())?);
let recoverer = Arc::new(new_recoverer(
signer.clone(),
swapper.clone(),
onchain_wallet.clone(),
)?);
let (_incoming_tx, outgoing_records, sync_service) =
new_sync_service(persister.clone(), recoverer, signer.clone())?;
@@ -676,8 +687,13 @@ mod tests {
async fn test_sync_clean() -> Result<()> {
create_persister!(persister);
let signer: Arc<Box<dyn Signer>> = Arc::new(Box::new(MockSigner::new()?));
let swapper = Arc::new(MockSwapper::new());
let onchain_wallet = Arc::new(MockWallet::new(signer.clone())?);
let recoverer = Arc::new(new_recoverer(signer.clone(), onchain_wallet.clone())?);
let recoverer = Arc::new(new_recoverer(
signer.clone(),
swapper.clone(),
onchain_wallet.clone(),
)?);
let (incoming_tx, _outgoing_records, sync_service) =
new_sync_service(persister.clone(), recoverer, signer.clone())?;
@@ -738,8 +754,13 @@ mod tests {
async fn test_last_derivation_index_update() -> Result<()> {
create_persister!(persister);
let signer: Arc<Box<dyn Signer>> = Arc::new(Box::new(MockSigner::new()?));
let swapper = Arc::new(MockSwapper::new());
let onchain_wallet = Arc::new(MockWallet::new(signer.clone())?);
let recoverer = Arc::new(new_recoverer(signer.clone(), onchain_wallet.clone())?);
let recoverer = Arc::new(new_recoverer(
signer.clone(),
swapper.clone(),
onchain_wallet.clone(),
)?);
let (incoming_tx, outgoing_records, sync_service) =
new_sync_service(persister.clone(), recoverer, signer.clone())?;

View File

@@ -3,12 +3,15 @@ use std::sync::Arc;
use anyhow::Result;
use tokio::sync::Mutex;
use crate::{model::Signer, recover::recoverer::Recoverer, wallet::OnchainWallet};
use crate::{
model::Signer, recover::recoverer::Recoverer, swapper::Swapper, wallet::OnchainWallet,
};
use super::chain::{MockBitcoinChainService, MockLiquidChainService};
pub(crate) fn new_recoverer(
signer: Arc<Box<dyn Signer>>,
swapper: Arc<dyn Swapper>,
onchain_wallet: Arc<dyn OnchainWallet>,
) -> Result<Recoverer> {
let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new()));
@@ -16,6 +19,7 @@ pub(crate) fn new_recoverer(
Recoverer::new(
signer.slip77_master_blinding_key()?,
swapper,
onchain_wallet,
liquid_chain_service,
bitcoin_chain_service,

View File

@@ -90,6 +90,7 @@ pub(crate) fn new_liquid_sdk_with_chain_services(
let recoverer = Arc::new(Recoverer::new(
signer.slip77_master_blinding_key()?,
swapper.clone(),
onchain_wallet.clone(),
liquid_chain_service.clone(),
bitcoin_chain_service.clone(),

View File

@@ -178,6 +178,10 @@ impl Swapper for MockSwapper {
Ok((test_pair.clone(), test_pair))
}
fn get_submarine_preimage(&self, _swap_id: &str) -> Result<String, PaymentError> {
Ok(Preimage::new().to_string().unwrap())
}
fn get_submarine_pairs(&self) -> Result<Option<SubmarinePair>, PaymentError> {
Ok(Some(SubmarinePair {
hash: generate_random_string(10),