mirror of
https://github.com/aljazceru/breez-sdk-liquid.git
synced 2026-01-03 06:14:19 +01:00
fix: double-lockup when payment is TimedOut (#541)
This commit is contained in:
@@ -239,6 +239,49 @@ impl Persister {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn set_send_swap_lockup_tx_id(
|
||||
&self,
|
||||
swap_id: &str,
|
||||
lockup_tx_id: &str,
|
||||
) -> Result<(), PaymentError> {
|
||||
let con = self.get_connection()?;
|
||||
|
||||
let row_count = con
|
||||
.execute(
|
||||
"UPDATE send_swaps
|
||||
SET lockup_tx_id = :lockup_tx_id
|
||||
WHERE id = :id AND lockup_tx_id IS NULL",
|
||||
named_params! {
|
||||
":id": swap_id,
|
||||
":lockup_tx_id": lockup_tx_id,
|
||||
},
|
||||
)
|
||||
.map_err(|_| PaymentError::PersistError)?;
|
||||
match row_count {
|
||||
1 => Ok(()),
|
||||
_ => Err(PaymentError::PaymentInProgress),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn unset_send_swap_lockup_tx_id(
|
||||
&self,
|
||||
swap_id: &str,
|
||||
lockup_tx_id: &str,
|
||||
) -> Result<(), PaymentError> {
|
||||
let con = self.get_connection()?;
|
||||
con.execute(
|
||||
"UPDATE send_swaps
|
||||
SET lockup_tx_id = NULL
|
||||
WHERE id = :id AND lockup_tx_id = :lockup_tx_id",
|
||||
named_params! {
|
||||
":id": swap_id,
|
||||
":lockup_tx_id": lockup_tx_id,
|
||||
},
|
||||
)
|
||||
.map_err(|_| PaymentError::PersistError)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
|
||||
@@ -1003,14 +1003,20 @@ impl LiquidSdk {
|
||||
|
||||
let swap = match self.persister.fetch_send_swap_by_invoice(invoice)? {
|
||||
Some(swap) => match swap.state {
|
||||
Created => swap,
|
||||
TimedOut => {
|
||||
self.send_swap_handler
|
||||
.update_swap_info(&swap.id, PaymentState::Created, None, None, None)
|
||||
.await?;
|
||||
swap
|
||||
}
|
||||
Pending => return Err(PaymentError::PaymentInProgress),
|
||||
Complete => return Err(PaymentError::AlreadyPaid),
|
||||
RefundPending | Failed => {
|
||||
RefundPending | Refundable | Failed => {
|
||||
return Err(PaymentError::invalid_invoice(
|
||||
"Payment has already failed. Please try with another invoice",
|
||||
))
|
||||
}
|
||||
_ => swap,
|
||||
},
|
||||
None => {
|
||||
let keypair = utils::generate_keypair();
|
||||
@@ -1069,8 +1075,12 @@ impl LiquidSdk {
|
||||
};
|
||||
self.status_stream.track_swap_id(&swap.id)?;
|
||||
|
||||
let accept_zero_conf = swap.get_boltz_create_response()?.accept_zero_conf;
|
||||
self.wait_for_payment(Swap::Send(swap), accept_zero_conf)
|
||||
let create_response = swap.get_boltz_create_response()?;
|
||||
self.send_swap_handler
|
||||
.try_lockup(&swap, &create_response)
|
||||
.await?;
|
||||
|
||||
self.wait_for_payment(Swap::Send(swap), create_response.accept_zero_conf)
|
||||
.await
|
||||
.map(|payment| SendPaymentResponse { payment })
|
||||
}
|
||||
@@ -1308,8 +1318,9 @@ impl LiquidSdk {
|
||||
webhook,
|
||||
})?;
|
||||
|
||||
let swap_id = &create_response.id;
|
||||
let create_response_json = ChainSwap::from_boltz_struct_to_json(&create_response, swap_id)?;
|
||||
let create_response_json =
|
||||
ChainSwap::from_boltz_struct_to_json(&create_response, &create_response.id)?;
|
||||
let swap_id = create_response.id;
|
||||
|
||||
let accept_zero_conf = server_lockup_amount_sat <= pair.limits.maximal_zero_conf;
|
||||
let payer_amount_sat = req.prepare_response.total_fees_sat + receiver_amount_sat;
|
||||
@@ -1337,7 +1348,7 @@ impl LiquidSdk {
|
||||
state: PaymentState::Created,
|
||||
};
|
||||
self.persister.insert_chain_swap(&swap)?;
|
||||
self.status_stream.track_swap_id(&swap.id)?;
|
||||
self.status_stream.track_swap_id(&swap_id)?;
|
||||
|
||||
self.wait_for_payment(Swap::Chain(swap), accept_zero_conf)
|
||||
.await
|
||||
@@ -2701,20 +2712,6 @@ mod tests {
|
||||
assert_eq!(persisted_swap.state, PaymentState::Failed);
|
||||
}
|
||||
|
||||
// Verify that `InvoiceSet` correctly sets the state to `Pending` and
|
||||
// assigns the `lockup_tx_id` to the payment
|
||||
let persisted_swap = trigger_swap_update!(
|
||||
"send",
|
||||
NewSwapArgs::default(),
|
||||
persister,
|
||||
status_stream,
|
||||
SubSwapStates::InvoiceSet,
|
||||
None,
|
||||
None
|
||||
);
|
||||
assert_eq!(persisted_swap.state, PaymentState::Pending);
|
||||
assert!(persisted_swap.lockup_tx_id.is_some());
|
||||
|
||||
// Verify that `TransactionClaimPending` correctly sets the state to `Complete`
|
||||
// and stores the preimage
|
||||
let persisted_swap = trigger_swap_update!(
|
||||
|
||||
@@ -15,13 +15,13 @@ use tokio::sync::{broadcast, Mutex};
|
||||
|
||||
use crate::chain::liquid::LiquidChainService;
|
||||
use crate::model::{Config, PaymentState::*, SendSwap};
|
||||
use crate::prelude::Swap;
|
||||
use crate::prelude::{PaymentTxData, PaymentType, Swap};
|
||||
use crate::swapper::Swapper;
|
||||
use crate::wallet::OnchainWallet;
|
||||
use crate::{ensure_sdk, utils};
|
||||
use crate::{
|
||||
error::PaymentError,
|
||||
model::{PaymentState, PaymentTxData, PaymentType, Transaction as SdkTransaction},
|
||||
model::{PaymentState, Transaction as SdkTransaction},
|
||||
persist::Persister,
|
||||
};
|
||||
|
||||
@@ -71,40 +71,9 @@ impl SendSwapHandler {
|
||||
|
||||
// See https://docs.boltz.exchange/v/api/lifecycle#normal-submarine-swaps
|
||||
match SubSwapStates::from_str(swap_state) {
|
||||
// Boltz has locked the HTLC, we proceed with locking up the funds
|
||||
// Boltz has locked the HTLC
|
||||
Ok(SubSwapStates::InvoiceSet) => {
|
||||
match (swap.state, swap.lockup_tx_id.clone()) {
|
||||
(PaymentState::Created, None) | (PaymentState::TimedOut, None) => {
|
||||
let create_response = swap.get_boltz_create_response()?;
|
||||
let lockup_tx = self.lockup_funds(id, &create_response).await?;
|
||||
let lockup_tx_id = lockup_tx.txid().to_string();
|
||||
let lockup_tx_fees_sat: u64 = lockup_tx.all_fees().values().sum();
|
||||
|
||||
// We insert a pseudo-lockup-tx in case LWK fails to pick up the new mempool tx for a while
|
||||
// This makes the tx known to the SDK (get_info, list_payments) instantly
|
||||
self.persister.insert_or_update_payment(
|
||||
PaymentTxData {
|
||||
tx_id: lockup_tx_id.clone(),
|
||||
timestamp: Some(utils::now()),
|
||||
amount_sat: swap.payer_amount_sat,
|
||||
fees_sat: lockup_tx_fees_sat,
|
||||
payment_type: PaymentType::Send,
|
||||
is_confirmed: false,
|
||||
},
|
||||
None,
|
||||
None,
|
||||
)?;
|
||||
|
||||
self.update_swap_info(id, Pending, None, Some(&lockup_tx_id), None)
|
||||
.await?;
|
||||
}
|
||||
(_, Some(lockup_tx_id)) => {
|
||||
warn!("Lockup tx for Send Swap {id} was already broadcast: txid {lockup_tx_id}")
|
||||
}
|
||||
(state, _) => {
|
||||
debug!("Send Swap {id} is in an invalid state for {swap_state}: {state:?}")
|
||||
}
|
||||
}
|
||||
warn!("Received `invoice.set` state for Send Swap {id}");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -203,11 +172,17 @@ impl SendSwapHandler {
|
||||
}
|
||||
}
|
||||
|
||||
async fn lockup_funds(
|
||||
pub(crate) async fn try_lockup(
|
||||
&self,
|
||||
swap_id: &str,
|
||||
swap: &SendSwap,
|
||||
create_response: &CreateSubmarineResponse,
|
||||
) -> Result<Transaction, PaymentError> {
|
||||
if swap.lockup_tx_id.is_some() {
|
||||
debug!("Lockup tx was already broadcast for Send Swap {}", swap.id);
|
||||
return Err(PaymentError::PaymentInProgress);
|
||||
}
|
||||
|
||||
let swap_id = &swap.id;
|
||||
debug!(
|
||||
"Initiated Send Swap: send {} sats to liquid address {}",
|
||||
create_response.expected_amount, create_response.address
|
||||
@@ -223,17 +198,48 @@ impl SendSwapHandler {
|
||||
create_response.expected_amount,
|
||||
)
|
||||
.await?;
|
||||
let lockup_tx_id = lockup_tx.txid().to_string();
|
||||
|
||||
info!("broadcasting lockup tx {}", lockup_tx.txid());
|
||||
let lockup_tx_id = self
|
||||
self.persister
|
||||
.set_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?;
|
||||
|
||||
info!("Broadcasting lockup tx {lockup_tx_id} for Send swap {swap_id}",);
|
||||
|
||||
let broadcast_result = self
|
||||
.chain_service
|
||||
.lock()
|
||||
.await
|
||||
.broadcast(&lockup_tx, Some(swap_id))
|
||||
.await?
|
||||
.to_string();
|
||||
.await;
|
||||
|
||||
if let Err(err) = broadcast_result {
|
||||
debug!("Could not broadcast lockup tx for Send Swap {swap_id}: {err:?}");
|
||||
self.persister
|
||||
.unset_send_swap_lockup_tx_id(swap_id, &lockup_tx_id)?;
|
||||
return Err(err.into());
|
||||
}
|
||||
|
||||
info!("Successfully broadcast lockup tx for Send Swap {swap_id}. Lockup tx id: {lockup_tx_id}");
|
||||
|
||||
// We insert a pseudo-lockup-tx in case LWK fails to pick up the new mempool tx for a while
|
||||
// This makes the tx known to the SDK (get_info, list_payments) instantly
|
||||
let lockup_tx_fees_sat: u64 = lockup_tx.all_fees().values().sum();
|
||||
self.persister.insert_or_update_payment(
|
||||
PaymentTxData {
|
||||
tx_id: lockup_tx_id.clone(),
|
||||
timestamp: Some(utils::now()),
|
||||
amount_sat: swap.payer_amount_sat,
|
||||
fees_sat: lockup_tx_fees_sat,
|
||||
payment_type: PaymentType::Send,
|
||||
is_confirmed: false,
|
||||
},
|
||||
None,
|
||||
None,
|
||||
)?;
|
||||
|
||||
self.update_swap_info(swap_id, Pending, None, Some(&lockup_tx_id), None)
|
||||
.await?;
|
||||
|
||||
Ok(lockup_tx)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user