From b3f1eed4292af1f65cff3cb490cdb1d2aa0b18c8 Mon Sep 17 00:00:00 2001 From: Ross Savage <551697+dangeross@users.noreply.github.com> Date: Wed, 30 Oct 2024 10:08:13 +0100 Subject: [PATCH] Prevent swap double claim (#542) * Prevent receive swap double claim * Prevent chain swap double claim --- lib/core/src/chain_swap.rs | 125 +++++++++++++++++++++----------- lib/core/src/model.rs | 10 +++ lib/core/src/persist/chain.rs | 44 +++++++++++ lib/core/src/persist/receive.rs | 44 +++++++++++ lib/core/src/receive_swap.rs | 101 ++++++++++++++++---------- 5 files changed, 241 insertions(+), 83 deletions(-) diff --git a/lib/core/src/chain_swap.rs b/lib/core/src/chain_swap.rs index d54da44..27981e7 100644 --- a/lib/core/src/chain_swap.rs +++ b/lib/core/src/chain_swap.rs @@ -298,7 +298,7 @@ impl ChainSwapHandler { .await?; if swap.accept_zero_conf { - self.claim(swap).await.map_err(|e| { + self.claim(id).await.map_err(|e| { error!("Could not cooperate Chain Swap {id} claim: {e}"); anyhow!("Could not post claim details. Err: {e:?}") })?; @@ -343,7 +343,7 @@ impl ChainSwapHandler { ); self.update_swap_info(id, Pending, Some(&transaction.id), None, None, None) .await?; - self.claim(swap).await.map_err(|e| { + self.claim(id).await.map_err(|e| { error!("Could not cooperate Chain Swap {id} claim: {e}"); anyhow!("Could not post claim details. Err: {e:?}") })?; @@ -481,7 +481,7 @@ impl ChainSwapHandler { .await?; if swap.accept_zero_conf { - self.claim(swap).await.map_err(|e| { + self.claim(id).await.map_err(|e| { error!("Could not cooperate Chain Swap {id} claim: {e}"); anyhow!("Could not post claim details. Err: {e:?}") })?; @@ -526,7 +526,7 @@ impl ChainSwapHandler { ); self.update_swap_info(id, Pending, Some(&transaction.id), None, None, None) .await?; - self.claim(swap).await.map_err(|e| { + self.claim(id).await.map_err(|e| { error!("Could not cooperate Chain Swap {id} claim: {e}"); anyhow!("Could not post claim details. Err: {e:?}") })?; @@ -690,59 +690,98 @@ impl ChainSwapHandler { Ok(()) } - async fn claim(&self, swap: &ChainSwap) -> Result<(), PaymentError> { + async fn claim(&self, swap_id: &str) -> Result<(), PaymentError> { + let swap = self + .persister + .fetch_chain_swap_by_id(swap_id)? + .ok_or(anyhow!("No Chain Swap found for ID {swap_id}"))?; ensure_sdk!(swap.claim_tx_id.is_none(), PaymentError::AlreadyClaimed); - let swap_id = &swap.id; - debug!("Initiating claim for Chain Swap {swap_id}"); - let claim_tx = self .swapper .create_claim_tx(Swap::Chain(swap.clone()), None)?; - let claim_tx_id = match claim_tx { - // We attempt broadcasting via chain service, then fallback to Boltz - SdkTransaction::Liquid(tx) => { - let liquid_chain_service = self.liquid_chain_service.lock().await; - let broadcast_response = liquid_chain_service.broadcast(&tx, Some(swap_id)).await; - match broadcast_response { - Ok(tx_id) => tx_id.to_hex(), + + // Set the swap claim_tx_id before broadcasting. + // If another claim_tx_id has been set in the meantime, don't broadcast the claim tx + let tx_id = claim_tx.txid(); + match self.persister.set_chain_swap_claim_tx_id(swap_id, &tx_id) { + Ok(_) => { + let broadcast_res = match claim_tx { + // We attempt broadcasting via chain service, then fallback to Boltz + SdkTransaction::Liquid(tx) => { + let liquid_chain_service = self.liquid_chain_service.lock().await; + liquid_chain_service + .broadcast(&tx, Some(&swap.id)) + .await + .map(|tx_id| tx_id.to_hex()) + .or_else(|err| { + debug!( + "Could not broadcast claim tx via chain service for Chain swap {swap_id}: {err:?}" + ); + let claim_tx_hex = tx.serialize().to_lower_hex_string(); + self.swapper.broadcast_tx(self.config.network.into(), &claim_tx_hex) + }) + } + SdkTransaction::Bitcoin(tx) => { + let bitcoin_chain_service = self.bitcoin_chain_service.lock().await; + bitcoin_chain_service + .broadcast(&tx) + .map(|tx_id| tx_id.to_hex()) + .map_err(|err| PaymentError::Generic { + err: err.to_string(), + }) + } + }; + + match broadcast_res { + Ok(claim_tx_id) => { + if swap.direction == Direction::Incoming { + // We insert a pseudo-claim-tx in case LWK fails to pick up the new mempool tx for a while + // This makes the tx known to the SDK (get_info, list_payments) instantly + self.persister.insert_or_update_payment( + PaymentTxData { + tx_id: claim_tx_id.clone(), + timestamp: Some(utils::now()), + amount_sat: swap.receiver_amount_sat, + fees_sat: 0, + payment_type: PaymentType::Receive, + is_confirmed: false, + }, + None, + None, + )?; + } + + info!("Successfully broadcast claim tx {claim_tx_id} for Chain Swap {swap_id}"); + self.update_swap_info( + &swap.id, + Pending, + None, + None, + Some(&claim_tx_id), + None, + ) + .await + } Err(err) => { + // Multiple attempts to broadcast have failed. Unset the swap claim_tx_id debug!( - "Could not broadcast claim tx via chain service for Chain swap {swap_id}: {err:?}" + "Could not broadcast claim tx via swapper for Chain swap {swap_id}: {err:?}" ); - let claim_tx_hex = tx.serialize().to_lower_hex_string(); - self.swapper - .broadcast_tx(self.config.network.into(), &claim_tx_hex)? + self.persister + .unset_chain_swap_claim_tx_id(swap_id, &tx_id)?; + Err(err) } } } - SdkTransaction::Bitcoin(tx) => { - let bitcoin_chain_service = self.bitcoin_chain_service.lock().await; - bitcoin_chain_service.broadcast(&tx)?.to_hex() + Err(err) => { + debug!( + "Failed to set claim_tx_id after creating tx for Chain swap {swap_id}: txid {tx_id}" + ); + Err(err) } - }; - - if swap.direction == Direction::Incoming { - // We insert a pseudo-claim-tx in case LWK fails to pick up the new mempool tx for a while - // This makes the tx known to the SDK (get_info, list_payments) instantly - self.persister.insert_or_update_payment( - PaymentTxData { - tx_id: claim_tx_id.clone(), - timestamp: Some(utils::now()), - amount_sat: swap.receiver_amount_sat, - fees_sat: 0, - payment_type: PaymentType::Receive, - is_confirmed: false, - }, - None, - None, - )?; } - - self.update_swap_info(&swap.id, Pending, None, None, Some(&claim_tx_id), None) - .await?; - Ok(()) } pub(crate) async fn prepare_refund( diff --git a/lib/core/src/model.rs b/lib/core/src/model.rs index a4a4acb..74f67d3 100644 --- a/lib/core/src/model.rs +++ b/lib/core/src/model.rs @@ -8,6 +8,7 @@ use boltz_client::{ swaps::boltz::{ CreateChainResponse, CreateReverseResponse, CreateSubmarineResponse, Leaf, Side, SwapTree, }, + ToHex, }; use boltz_client::{BtcSwapScript, Keypair, LBtcSwapScript}; use lwk_wollet::{bitcoin::bip32, ElementsNetwork}; @@ -1492,6 +1493,15 @@ pub enum Transaction { Bitcoin(boltz_client::bitcoin::Transaction), } +impl Transaction { + pub(crate) fn txid(&self) -> String { + match self { + Transaction::Liquid(tx) => tx.txid().to_hex(), + Transaction::Bitcoin(tx) => tx.txid().to_hex(), + } + } +} + #[derive(Debug, Clone)] pub enum Utxo { Liquid( diff --git a/lib/core/src/persist/chain.rs b/lib/core/src/persist/chain.rs index adbd580..cda87af 100644 --- a/lib/core/src/persist/chain.rs +++ b/lib/core/src/persist/chain.rs @@ -264,6 +264,50 @@ impl Persister { Ok(()) } + // Only set the Chain Swap claim_tx_id if not set, otherwise return an error + pub(crate) fn set_chain_swap_claim_tx_id( + &self, + swap_id: &str, + claim_tx_id: &str, + ) -> Result<(), PaymentError> { + let con = self.get_connection()?; + let row_count = con + .execute( + "UPDATE chain_swaps + SET claim_tx_id = :claim_tx_id + WHERE id = :id AND claim_tx_id IS NULL", + named_params! { + ":id": swap_id, + ":claim_tx_id": claim_tx_id, + }, + ) + .map_err(|_| PaymentError::PersistError)?; + match row_count { + 1 => Ok(()), + _ => Err(PaymentError::AlreadyClaimed), + } + } + + // Only unset the Chain Swap claim_tx_id if set with the same tx id + pub(crate) fn unset_chain_swap_claim_tx_id( + &self, + swap_id: &str, + claim_tx_id: &str, + ) -> Result<(), PaymentError> { + let con = self.get_connection()?; + con.execute( + "UPDATE chain_swaps + SET claim_tx_id = NULL + WHERE id = :id AND claim_tx_id = :claim_tx_id", + named_params! { + ":id": swap_id, + ":claim_tx_id": claim_tx_id, + }, + ) + .map_err(|_| PaymentError::PersistError)?; + Ok(()) + } + pub(crate) fn try_handle_chain_swap_update( &self, swap_id: &str, diff --git a/lib/core/src/persist/receive.rs b/lib/core/src/persist/receive.rs index c5d6412..2d268e4 100644 --- a/lib/core/src/persist/receive.rs +++ b/lib/core/src/persist/receive.rs @@ -182,6 +182,50 @@ impl Persister { Ok(res) } + // Only set the Receive Swap claim_tx_id if not set, otherwise return an error + pub(crate) fn set_receive_swap_claim_tx_id( + &self, + swap_id: &str, + claim_tx_id: &str, + ) -> Result<(), PaymentError> { + let con = self.get_connection()?; + let row_count = con + .execute( + "UPDATE receive_swaps + SET claim_tx_id = :claim_tx_id + WHERE id = :id AND claim_tx_id IS NULL", + named_params! { + ":id": swap_id, + ":claim_tx_id": claim_tx_id, + }, + ) + .map_err(|_| PaymentError::PersistError)?; + match row_count { + 1 => Ok(()), + _ => Err(PaymentError::AlreadyClaimed), + } + } + + // Only unset the Receive Swap claim_tx_id if set with the same tx id + pub(crate) fn unset_receive_swap_claim_tx_id( + &self, + swap_id: &str, + claim_tx_id: &str, + ) -> Result<(), PaymentError> { + let con = self.get_connection()?; + con.execute( + "UPDATE receive_swaps + SET claim_tx_id = NULL + WHERE id = :id AND claim_tx_id = :claim_tx_id", + named_params! { + ":id": swap_id, + ":claim_tx_id": claim_tx_id, + }, + ) + .map_err(|_| PaymentError::PersistError)?; + Ok(()) + } + pub(crate) fn try_handle_receive_swap_update( &self, swap_id: &str, diff --git a/lib/core/src/receive_swap.rs b/lib/core/src/receive_swap.rs index 7d65956..d80155f 100644 --- a/lib/core/src/receive_swap.rs +++ b/lib/core/src/receive_swap.rs @@ -146,7 +146,7 @@ impl ReceiveSwapHandler { debug!("[Receive Swap {id}] Lockup tx fees are within acceptable range ({tx_fees} > {lower_bound_estimated_fees} sat). Proceeding with claim."); - match self.claim(&receive_swap).await { + match self.claim(id).await { Ok(_) => {} Err(err) => match err { PaymentError::AlreadyClaimed => { @@ -183,7 +183,7 @@ impl ReceiveSwapHandler { None => { self.update_swap_info(&receive_swap.id, Pending, None, None) .await?; - match self.claim(&receive_swap).await { + match self.claim(id).await { Ok(_) => {} Err(err) => match err { PaymentError::AlreadyClaimed => { @@ -246,13 +246,14 @@ impl ReceiveSwapHandler { Ok(()) } - async fn claim(&self, swap: &ReceiveSwap) -> Result<(), PaymentError> { + async fn claim(&self, swap_id: &str) -> Result<(), PaymentError> { + let swap = self + .persister + .fetch_receive_swap_by_id(swap_id)? + .ok_or(anyhow!("No Receive Swap found for ID {swap_id}"))?; ensure_sdk!(swap.claim_tx_id.is_none(), PaymentError::AlreadyClaimed); - let swap_id = &swap.id; - info!("Initiating claim for Receive Swap {swap_id}"); - let claim_address = self.onchain_wallet.next_unused_address().await?.to_string(); let Transaction::Liquid(claim_tx) = self .swapper @@ -263,44 +264,64 @@ impl ReceiveSwapHandler { }); }; - // We attempt broadcasting via chain service, then fallback to Boltz - let liquid_chain_service = self.liquid_chain_service.lock().await; - let broadcast_response = liquid_chain_service - .broadcast(&claim_tx, Some(&swap.id)) - .await; - let claim_tx_id = match broadcast_response { - Ok(tx_id) => tx_id.to_hex(), + // Set the swap claim_tx_id before broadcasting. + // If another claim_tx_id has been set in the meantime, don't broadcast the claim tx + let tx_id = claim_tx.txid().to_hex(); + match self.persister.set_receive_swap_claim_tx_id(swap_id, &tx_id) { + Ok(_) => { + // We attempt broadcasting via chain service, then fallback to Boltz + let liquid_chain_service = self.liquid_chain_service.lock().await; + let broadcast_res = liquid_chain_service + .broadcast(&claim_tx, Some(&swap.id)) + .await + .map(|tx_id| tx_id.to_hex()) + .or_else(|err| { + debug!( + "Could not broadcast claim tx via chain service for Receive swap {swap_id}: {err:?}" + ); + let claim_tx_hex = claim_tx.serialize().to_lower_hex_string(); + self.swapper.broadcast_tx(self.config.network.into(), &claim_tx_hex) + }); + + match broadcast_res { + Ok(claim_tx_id) => { + // We insert a pseudo-claim-tx in case LWK fails to pick up the new mempool tx for a while + // This makes the tx known to the SDK (get_info, list_payments) instantly + self.persister.insert_or_update_payment( + PaymentTxData { + tx_id: claim_tx_id.clone(), + timestamp: Some(utils::now()), + amount_sat: swap.receiver_amount_sat, + fees_sat: 0, + payment_type: PaymentType::Receive, + is_confirmed: false, + }, + None, + None, + )?; + + info!("Successfully broadcast claim tx {claim_tx_id} for Receive Swap {swap_id}"); + self.update_swap_info(swap_id, Pending, Some(&claim_tx_id), None) + .await + } + Err(err) => { + // Multiple attempts to broadcast have failed. Unset the swap claim_tx_id + debug!( + "Could not broadcast claim tx via swapper for Receive swap {swap_id}: {err:?}" + ); + self.persister + .unset_receive_swap_claim_tx_id(swap_id, &tx_id)?; + Err(err) + } + } + } Err(err) => { debug!( - "Could not broadcast claim tx via chain service for Receive swap {swap_id}: {err:?}" + "Failed to set claim_tx_id after creating tx for Receive swap {swap_id}: txid {tx_id}" ); - let claim_tx_hex = claim_tx.serialize().to_lower_hex_string(); - self.swapper - .broadcast_tx(self.config.network.into(), &claim_tx_hex)? + Err(err) } - }; - - // We insert a pseudo-claim-tx in case LWK fails to pick up the new mempool tx for a while - // This makes the tx known to the SDK (get_info, list_payments) instantly - self.persister.insert_or_update_payment( - PaymentTxData { - tx_id: claim_tx_id.clone(), - timestamp: Some(utils::now()), - amount_sat: swap.receiver_amount_sat, - fees_sat: 0, - payment_type: PaymentType::Receive, - is_confirmed: false, - }, - None, - None, - )?; - - info!("Successfully broadcast claim tx {claim_tx_id} for Receive Swap {swap_id}"); - - self.update_swap_info(swap_id, Pending, Some(&claim_tx_id), None) - .await?; - - Ok(()) + } } fn validate_state_transition(