Prevent swap double claim (#542)

* Prevent receive swap double claim

* Prevent chain swap double claim
This commit is contained in:
Ross Savage
2024-10-30 10:08:13 +01:00
committed by GitHub
parent 59dfacc12d
commit b3f1eed429
5 changed files with 241 additions and 83 deletions

View File

@@ -298,7 +298,7 @@ impl ChainSwapHandler {
.await?;
if swap.accept_zero_conf {
self.claim(swap).await.map_err(|e| {
self.claim(id).await.map_err(|e| {
error!("Could not cooperate Chain Swap {id} claim: {e}");
anyhow!("Could not post claim details. Err: {e:?}")
})?;
@@ -343,7 +343,7 @@ impl ChainSwapHandler {
);
self.update_swap_info(id, Pending, Some(&transaction.id), None, None, None)
.await?;
self.claim(swap).await.map_err(|e| {
self.claim(id).await.map_err(|e| {
error!("Could not cooperate Chain Swap {id} claim: {e}");
anyhow!("Could not post claim details. Err: {e:?}")
})?;
@@ -481,7 +481,7 @@ impl ChainSwapHandler {
.await?;
if swap.accept_zero_conf {
self.claim(swap).await.map_err(|e| {
self.claim(id).await.map_err(|e| {
error!("Could not cooperate Chain Swap {id} claim: {e}");
anyhow!("Could not post claim details. Err: {e:?}")
})?;
@@ -526,7 +526,7 @@ impl ChainSwapHandler {
);
self.update_swap_info(id, Pending, Some(&transaction.id), None, None, None)
.await?;
self.claim(swap).await.map_err(|e| {
self.claim(id).await.map_err(|e| {
error!("Could not cooperate Chain Swap {id} claim: {e}");
anyhow!("Could not post claim details. Err: {e:?}")
})?;
@@ -690,59 +690,98 @@ impl ChainSwapHandler {
Ok(())
}
async fn claim(&self, swap: &ChainSwap) -> Result<(), PaymentError> {
async fn claim(&self, swap_id: &str) -> Result<(), PaymentError> {
let swap = self
.persister
.fetch_chain_swap_by_id(swap_id)?
.ok_or(anyhow!("No Chain Swap found for ID {swap_id}"))?;
ensure_sdk!(swap.claim_tx_id.is_none(), PaymentError::AlreadyClaimed);
let swap_id = &swap.id;
debug!("Initiating claim for Chain Swap {swap_id}");
let claim_tx = self
.swapper
.create_claim_tx(Swap::Chain(swap.clone()), None)?;
let claim_tx_id = match claim_tx {
// We attempt broadcasting via chain service, then fallback to Boltz
SdkTransaction::Liquid(tx) => {
let liquid_chain_service = self.liquid_chain_service.lock().await;
let broadcast_response = liquid_chain_service.broadcast(&tx, Some(swap_id)).await;
match broadcast_response {
Ok(tx_id) => tx_id.to_hex(),
// Set the swap claim_tx_id before broadcasting.
// If another claim_tx_id has been set in the meantime, don't broadcast the claim tx
let tx_id = claim_tx.txid();
match self.persister.set_chain_swap_claim_tx_id(swap_id, &tx_id) {
Ok(_) => {
let broadcast_res = match claim_tx {
// We attempt broadcasting via chain service, then fallback to Boltz
SdkTransaction::Liquid(tx) => {
let liquid_chain_service = self.liquid_chain_service.lock().await;
liquid_chain_service
.broadcast(&tx, Some(&swap.id))
.await
.map(|tx_id| tx_id.to_hex())
.or_else(|err| {
debug!(
"Could not broadcast claim tx via chain service for Chain swap {swap_id}: {err:?}"
);
let claim_tx_hex = tx.serialize().to_lower_hex_string();
self.swapper.broadcast_tx(self.config.network.into(), &claim_tx_hex)
})
}
SdkTransaction::Bitcoin(tx) => {
let bitcoin_chain_service = self.bitcoin_chain_service.lock().await;
bitcoin_chain_service
.broadcast(&tx)
.map(|tx_id| tx_id.to_hex())
.map_err(|err| PaymentError::Generic {
err: err.to_string(),
})
}
};
match broadcast_res {
Ok(claim_tx_id) => {
if swap.direction == Direction::Incoming {
// We insert a pseudo-claim-tx in case LWK fails to pick up the new mempool tx for a while
// This makes the tx known to the SDK (get_info, list_payments) instantly
self.persister.insert_or_update_payment(
PaymentTxData {
tx_id: claim_tx_id.clone(),
timestamp: Some(utils::now()),
amount_sat: swap.receiver_amount_sat,
fees_sat: 0,
payment_type: PaymentType::Receive,
is_confirmed: false,
},
None,
None,
)?;
}
info!("Successfully broadcast claim tx {claim_tx_id} for Chain Swap {swap_id}");
self.update_swap_info(
&swap.id,
Pending,
None,
None,
Some(&claim_tx_id),
None,
)
.await
}
Err(err) => {
// Multiple attempts to broadcast have failed. Unset the swap claim_tx_id
debug!(
"Could not broadcast claim tx via chain service for Chain swap {swap_id}: {err:?}"
"Could not broadcast claim tx via swapper for Chain swap {swap_id}: {err:?}"
);
let claim_tx_hex = tx.serialize().to_lower_hex_string();
self.swapper
.broadcast_tx(self.config.network.into(), &claim_tx_hex)?
self.persister
.unset_chain_swap_claim_tx_id(swap_id, &tx_id)?;
Err(err)
}
}
}
SdkTransaction::Bitcoin(tx) => {
let bitcoin_chain_service = self.bitcoin_chain_service.lock().await;
bitcoin_chain_service.broadcast(&tx)?.to_hex()
Err(err) => {
debug!(
"Failed to set claim_tx_id after creating tx for Chain swap {swap_id}: txid {tx_id}"
);
Err(err)
}
};
if swap.direction == Direction::Incoming {
// We insert a pseudo-claim-tx in case LWK fails to pick up the new mempool tx for a while
// This makes the tx known to the SDK (get_info, list_payments) instantly
self.persister.insert_or_update_payment(
PaymentTxData {
tx_id: claim_tx_id.clone(),
timestamp: Some(utils::now()),
amount_sat: swap.receiver_amount_sat,
fees_sat: 0,
payment_type: PaymentType::Receive,
is_confirmed: false,
},
None,
None,
)?;
}
self.update_swap_info(&swap.id, Pending, None, None, Some(&claim_tx_id), None)
.await?;
Ok(())
}
pub(crate) async fn prepare_refund(

View File

@@ -8,6 +8,7 @@ use boltz_client::{
swaps::boltz::{
CreateChainResponse, CreateReverseResponse, CreateSubmarineResponse, Leaf, Side, SwapTree,
},
ToHex,
};
use boltz_client::{BtcSwapScript, Keypair, LBtcSwapScript};
use lwk_wollet::{bitcoin::bip32, ElementsNetwork};
@@ -1492,6 +1493,15 @@ pub enum Transaction {
Bitcoin(boltz_client::bitcoin::Transaction),
}
impl Transaction {
pub(crate) fn txid(&self) -> String {
match self {
Transaction::Liquid(tx) => tx.txid().to_hex(),
Transaction::Bitcoin(tx) => tx.txid().to_hex(),
}
}
}
#[derive(Debug, Clone)]
pub enum Utxo {
Liquid(

View File

@@ -264,6 +264,50 @@ impl Persister {
Ok(())
}
// Only set the Chain Swap claim_tx_id if not set, otherwise return an error
pub(crate) fn set_chain_swap_claim_tx_id(
&self,
swap_id: &str,
claim_tx_id: &str,
) -> Result<(), PaymentError> {
let con = self.get_connection()?;
let row_count = con
.execute(
"UPDATE chain_swaps
SET claim_tx_id = :claim_tx_id
WHERE id = :id AND claim_tx_id IS NULL",
named_params! {
":id": swap_id,
":claim_tx_id": claim_tx_id,
},
)
.map_err(|_| PaymentError::PersistError)?;
match row_count {
1 => Ok(()),
_ => Err(PaymentError::AlreadyClaimed),
}
}
// Only unset the Chain Swap claim_tx_id if set with the same tx id
pub(crate) fn unset_chain_swap_claim_tx_id(
&self,
swap_id: &str,
claim_tx_id: &str,
) -> Result<(), PaymentError> {
let con = self.get_connection()?;
con.execute(
"UPDATE chain_swaps
SET claim_tx_id = NULL
WHERE id = :id AND claim_tx_id = :claim_tx_id",
named_params! {
":id": swap_id,
":claim_tx_id": claim_tx_id,
},
)
.map_err(|_| PaymentError::PersistError)?;
Ok(())
}
pub(crate) fn try_handle_chain_swap_update(
&self,
swap_id: &str,

View File

@@ -182,6 +182,50 @@ impl Persister {
Ok(res)
}
// Only set the Receive Swap claim_tx_id if not set, otherwise return an error
pub(crate) fn set_receive_swap_claim_tx_id(
&self,
swap_id: &str,
claim_tx_id: &str,
) -> Result<(), PaymentError> {
let con = self.get_connection()?;
let row_count = con
.execute(
"UPDATE receive_swaps
SET claim_tx_id = :claim_tx_id
WHERE id = :id AND claim_tx_id IS NULL",
named_params! {
":id": swap_id,
":claim_tx_id": claim_tx_id,
},
)
.map_err(|_| PaymentError::PersistError)?;
match row_count {
1 => Ok(()),
_ => Err(PaymentError::AlreadyClaimed),
}
}
// Only unset the Receive Swap claim_tx_id if set with the same tx id
pub(crate) fn unset_receive_swap_claim_tx_id(
&self,
swap_id: &str,
claim_tx_id: &str,
) -> Result<(), PaymentError> {
let con = self.get_connection()?;
con.execute(
"UPDATE receive_swaps
SET claim_tx_id = NULL
WHERE id = :id AND claim_tx_id = :claim_tx_id",
named_params! {
":id": swap_id,
":claim_tx_id": claim_tx_id,
},
)
.map_err(|_| PaymentError::PersistError)?;
Ok(())
}
pub(crate) fn try_handle_receive_swap_update(
&self,
swap_id: &str,

View File

@@ -146,7 +146,7 @@ impl ReceiveSwapHandler {
debug!("[Receive Swap {id}] Lockup tx fees are within acceptable range ({tx_fees} > {lower_bound_estimated_fees} sat). Proceeding with claim.");
match self.claim(&receive_swap).await {
match self.claim(id).await {
Ok(_) => {}
Err(err) => match err {
PaymentError::AlreadyClaimed => {
@@ -183,7 +183,7 @@ impl ReceiveSwapHandler {
None => {
self.update_swap_info(&receive_swap.id, Pending, None, None)
.await?;
match self.claim(&receive_swap).await {
match self.claim(id).await {
Ok(_) => {}
Err(err) => match err {
PaymentError::AlreadyClaimed => {
@@ -246,13 +246,14 @@ impl ReceiveSwapHandler {
Ok(())
}
async fn claim(&self, swap: &ReceiveSwap) -> Result<(), PaymentError> {
async fn claim(&self, swap_id: &str) -> Result<(), PaymentError> {
let swap = self
.persister
.fetch_receive_swap_by_id(swap_id)?
.ok_or(anyhow!("No Receive Swap found for ID {swap_id}"))?;
ensure_sdk!(swap.claim_tx_id.is_none(), PaymentError::AlreadyClaimed);
let swap_id = &swap.id;
info!("Initiating claim for Receive Swap {swap_id}");
let claim_address = self.onchain_wallet.next_unused_address().await?.to_string();
let Transaction::Liquid(claim_tx) = self
.swapper
@@ -263,44 +264,64 @@ impl ReceiveSwapHandler {
});
};
// We attempt broadcasting via chain service, then fallback to Boltz
let liquid_chain_service = self.liquid_chain_service.lock().await;
let broadcast_response = liquid_chain_service
.broadcast(&claim_tx, Some(&swap.id))
.await;
let claim_tx_id = match broadcast_response {
Ok(tx_id) => tx_id.to_hex(),
// Set the swap claim_tx_id before broadcasting.
// If another claim_tx_id has been set in the meantime, don't broadcast the claim tx
let tx_id = claim_tx.txid().to_hex();
match self.persister.set_receive_swap_claim_tx_id(swap_id, &tx_id) {
Ok(_) => {
// We attempt broadcasting via chain service, then fallback to Boltz
let liquid_chain_service = self.liquid_chain_service.lock().await;
let broadcast_res = liquid_chain_service
.broadcast(&claim_tx, Some(&swap.id))
.await
.map(|tx_id| tx_id.to_hex())
.or_else(|err| {
debug!(
"Could not broadcast claim tx via chain service for Receive swap {swap_id}: {err:?}"
);
let claim_tx_hex = claim_tx.serialize().to_lower_hex_string();
self.swapper.broadcast_tx(self.config.network.into(), &claim_tx_hex)
});
match broadcast_res {
Ok(claim_tx_id) => {
// We insert a pseudo-claim-tx in case LWK fails to pick up the new mempool tx for a while
// This makes the tx known to the SDK (get_info, list_payments) instantly
self.persister.insert_or_update_payment(
PaymentTxData {
tx_id: claim_tx_id.clone(),
timestamp: Some(utils::now()),
amount_sat: swap.receiver_amount_sat,
fees_sat: 0,
payment_type: PaymentType::Receive,
is_confirmed: false,
},
None,
None,
)?;
info!("Successfully broadcast claim tx {claim_tx_id} for Receive Swap {swap_id}");
self.update_swap_info(swap_id, Pending, Some(&claim_tx_id), None)
.await
}
Err(err) => {
// Multiple attempts to broadcast have failed. Unset the swap claim_tx_id
debug!(
"Could not broadcast claim tx via swapper for Receive swap {swap_id}: {err:?}"
);
self.persister
.unset_receive_swap_claim_tx_id(swap_id, &tx_id)?;
Err(err)
}
}
}
Err(err) => {
debug!(
"Could not broadcast claim tx via chain service for Receive swap {swap_id}: {err:?}"
"Failed to set claim_tx_id after creating tx for Receive swap {swap_id}: txid {tx_id}"
);
let claim_tx_hex = claim_tx.serialize().to_lower_hex_string();
self.swapper
.broadcast_tx(self.config.network.into(), &claim_tx_hex)?
Err(err)
}
};
// We insert a pseudo-claim-tx in case LWK fails to pick up the new mempool tx for a while
// This makes the tx known to the SDK (get_info, list_payments) instantly
self.persister.insert_or_update_payment(
PaymentTxData {
tx_id: claim_tx_id.clone(),
timestamp: Some(utils::now()),
amount_sat: swap.receiver_amount_sat,
fees_sat: 0,
payment_type: PaymentType::Receive,
is_confirmed: false,
},
None,
None,
)?;
info!("Successfully broadcast claim tx {claim_tx_id} for Receive Swap {swap_id}");
self.update_swap_info(swap_id, Pending, Some(&claim_tx_id), None)
.await?;
Ok(())
}
}
fn validate_state_transition(