Add tx propagation grace period to swap recovery (#733)

* Add tx propagation grace period to swap recovery

* Add missing tx reset checks and improve logs
This commit is contained in:
Daniel Granhão
2025-02-17 13:23:32 +00:00
committed by GitHub
parent c686e44dfe
commit 3ec57ad93e
13 changed files with 180 additions and 68 deletions

View File

@@ -94,21 +94,19 @@ impl ChainSwapHandler {
let id = &update.id;
let swap = self.fetch_chain_swap_by_id(id)?;
if let Some(sync_state) = self.persister.get_sync_state_by_data_id(&swap.id)? {
if !sync_state.is_local {
let status = &update.status;
let swap_state = ChainSwapStates::from_str(status)
.map_err(|_| anyhow!("Invalid ChainSwapState for Chain Swap {id}: {status}"))?;
if !swap.metadata.is_local {
let status = &update.status;
let swap_state = ChainSwapStates::from_str(status)
.map_err(|_| anyhow!("Invalid ChainSwapState for Chain Swap {id}: {status}"))?;
match swap_state {
// If the swap is not local (pulled from real-time sync) we do not claim twice
ChainSwapStates::TransactionServerMempool
| ChainSwapStates::TransactionServerConfirmed => {
log::debug!("Received {swap_state:?} for non-local Chain swap {id} from status stream, skipping update.");
return Ok(());
}
_ => {}
match swap_state {
// If the swap is not local (pulled from real-time sync) we do not claim twice
ChainSwapStates::TransactionServerMempool
| ChainSwapStates::TransactionServerConfirmed => {
log::debug!("Received {swap_state:?} for non-local Chain swap {id} from status stream, skipping update.");
return Ok(());
}
_ => {}
}
}

View File

@@ -809,24 +809,41 @@ impl Swap {
pub(crate) fn version(&self) -> u64 {
match self {
Swap::Chain(ChainSwap { version, .. })
| Swap::Send(SendSwap { version, .. })
| Swap::Receive(ReceiveSwap { version, .. }) => *version,
Swap::Chain(ChainSwap { metadata, .. })
| Swap::Send(SendSwap { metadata, .. })
| Swap::Receive(ReceiveSwap { metadata, .. }) => metadata.version,
}
}
pub(crate) fn set_version(&mut self, version: u64) {
match self {
Swap::Chain(chain_swap) => {
chain_swap.version = version;
chain_swap.metadata.version = version;
}
Swap::Send(send_swap) => {
send_swap.version = version;
send_swap.metadata.version = version;
}
Swap::Receive(receive_swap) => {
receive_swap.version = version;
receive_swap.metadata.version = version;
}
}
}
pub(crate) fn is_local(&self) -> bool {
match self {
Swap::Chain(ChainSwap { metadata, .. })
| Swap::Send(SendSwap { metadata, .. })
| Swap::Receive(ReceiveSwap { metadata, .. }) => metadata.is_local,
}
}
pub(crate) fn last_updated_at(&self) -> u32 {
match self {
Swap::Chain(ChainSwap { metadata, .. })
| Swap::Send(SendSwap { metadata, .. })
| Swap::Receive(ReceiveSwap { metadata, .. }) => metadata.last_updated_at,
}
}
}
impl From<ChainSwap> for Swap {
fn from(swap: ChainSwap) -> Self {
@@ -888,6 +905,14 @@ impl FromSql for Direction {
}
}
#[derive(Clone, Debug, Default)]
pub(crate) struct SwapMetadata {
/// Version used for optimistic concurrency control within local db
pub(crate) version: u64,
pub(crate) last_updated_at: u32,
pub(crate) is_local: bool,
}
/// A chain swap
///
/// See <https://docs.boltz.exchange/v/api/lifecycle#chain-swaps>
@@ -930,9 +955,9 @@ pub(crate) struct ChainSwap {
pub(crate) claim_private_key: String,
pub(crate) refund_private_key: String,
pub(crate) auto_accepted_fees: bool,
/// Version used for optimistic concurrency control within local db
/// Swap metadata that is only valid when reading one from the local database
#[derivative(PartialEq = "ignore")]
pub(crate) version: u64,
pub(crate) metadata: SwapMetadata,
}
impl ChainSwap {
pub(crate) fn get_claim_keypair(&self) -> SdkResult<Keypair> {
@@ -1084,9 +1109,9 @@ pub(crate) struct SendSwap {
pub(crate) timeout_block_height: u64,
pub(crate) state: PaymentState,
pub(crate) refund_private_key: String,
/// Version used for optimistic concurrency control within local db
/// Swap metadata that is only valid when reading one from the local database
#[derivative(PartialEq = "ignore")]
pub(crate) version: u64,
pub(crate) metadata: SwapMetadata,
}
impl SendSwap {
pub(crate) fn get_refund_keypair(&self) -> Result<Keypair, SdkError> {
@@ -1182,9 +1207,9 @@ pub(crate) struct ReceiveSwap {
pub(crate) created_at: u32,
pub(crate) timeout_block_height: u32,
pub(crate) state: PaymentState,
/// Version used for optimistic concurrency control within local db
/// Swap metadata that is only valid when reading one from the local database
#[derivative(PartialEq = "ignore")]
pub(crate) version: u64,
pub(crate) metadata: SwapMetadata,
}
impl ReceiveSwap {
pub(crate) fn get_claim_keypair(&self) -> Result<Keypair, PaymentError> {

View File

@@ -89,7 +89,7 @@ impl Persister {
":state": &chain_swap.state,
":actual_payer_amount_sat": &chain_swap.actual_payer_amount_sat,
":accepted_receiver_amount_sat": &chain_swap.accepted_receiver_amount_sat,
":version": &chain_swap.version,
":version": &chain_swap.metadata.version,
},
)?;
ensure_sdk!(
@@ -158,8 +158,8 @@ impl Persister {
accepted_receiver_amount_sat,
auto_accepted_fees,
version,
last_updated_at,
-- Used for filtering
sync_state.is_local
FROM chain_swaps
LEFT JOIN sync_state ON chain_swaps.id = sync_state.data_id
@@ -214,7 +214,11 @@ impl Persister {
actual_payer_amount_sat: row.get(21)?,
accepted_receiver_amount_sat: row.get(22)?,
auto_accepted_fees: row.get(23)?,
version: row.get(24)?,
metadata: SwapMetadata {
version: row.get(24)?,
last_updated_at: row.get(25)?,
is_local: row.get::<usize, Option<bool>>(26)?.unwrap_or(true),
},
})
}

View File

@@ -292,5 +292,28 @@ pub(crate) fn current_migrations(network: LiquidNetwork) -> Vec<&'static str> {
",
insert_default_asset_metadata,
"ALTER TABLE payment_details ADD COLUMN bip353_address TEXT;",
"
ALTER TABLE receive_swaps ADD COLUMN last_updated_at INTEGER NOT NULL DEFAULT 0;
ALTER TABLE send_swaps ADD COLUMN last_updated_at INTEGER NOT NULL DEFAULT 0;
ALTER TABLE chain_swaps ADD COLUMN last_updated_at INTEGER NOT NULL DEFAULT 0;
CREATE TRIGGER IF NOT EXISTS update_receive_swaps_last_updated_at
AFTER UPDATE ON receive_swaps
BEGIN
UPDATE receive_swaps SET last_updated_at = (strftime('%s', 'now'))
WHERE id = NEW.id;
END;
CREATE TRIGGER IF NOT EXISTS update_send_swaps_last_updated_at
AFTER UPDATE ON send_swaps
BEGIN
UPDATE send_swaps SET last_updated_at = (strftime('%s', 'now'))
WHERE id = NEW.id;
END;
CREATE TRIGGER IF NOT EXISTS update_chain_swaps_last_updated_at
AFTER UPDATE ON chain_swaps
BEGIN
UPDATE chain_swaps SET last_updated_at = (strftime('%s', 'now'))
WHERE id = NEW.id;
END;
",
]
}

View File

@@ -85,7 +85,7 @@ impl Persister {
":payer_amount_sat": &receive_swap.payer_amount_sat,
":receiver_amount_sat": &receive_swap.receiver_amount_sat,
":state": &receive_swap.state,
":version": &receive_swap.version,
":version": &receive_swap.metadata.version,
},
)?;
ensure_sdk!(
@@ -153,8 +153,8 @@ impl Persister {
rs.state,
rs.pair_fees_json,
rs.version,
rs.last_updated_at,
-- Used for filtering
sync_state.is_local
FROM receive_swaps AS rs
LEFT JOIN sync_state ON rs.id = sync_state.data_id
@@ -204,7 +204,11 @@ impl Persister {
created_at: row.get(16)?,
state: row.get(17)?,
pair_fees_json: row.get(18)?,
version: row.get(19)?,
metadata: SwapMetadata {
version: row.get(19)?,
last_updated_at: row.get(20)?,
is_local: row.get::<usize, Option<bool>>(21)?.unwrap_or(true),
},
})
}

View File

@@ -76,7 +76,7 @@ impl Persister {
":lockup_tx_id": &send_swap.lockup_tx_id,
":refund_tx_id": &send_swap.refund_tx_id,
":state": &send_swap.state,
":version": &send_swap.version,
":version": &send_swap.metadata.version,
},
)?;
ensure_sdk!(
@@ -183,8 +183,12 @@ impl Persister {
created_at,
state,
pair_fees_json,
version
version,
last_updated_at,
sync_state.is_local
FROM send_swaps AS ss
LEFT JOIN sync_state ON ss.id = sync_state.data_id
{where_clause_str}
ORDER BY created_at
"
@@ -226,7 +230,11 @@ impl Persister {
created_at: row.get(14)?,
state: row.get(15)?,
pair_fees_json: row.get(16)?,
version: row.get(17)?,
metadata: SwapMetadata {
version: row.get(17)?,
last_updated_at: row.get(18)?,
is_local: row.get::<usize, Option<bool>>(19)?.unwrap_or(true),
},
})
}

View File

@@ -52,15 +52,6 @@ impl Persister {
Ok(sync_state)
}
pub(crate) fn get_sync_state_by_data_id(&self, data_id: &str) -> Result<Option<SyncState>> {
let con = self.get_connection()?;
let query = Self::select_sync_state_query(vec!["data_id = ?1".to_string()]);
let sync_state = con
.query_row(&query, [data_id], Self::sql_row_to_sync_state)
.optional()?;
Ok(sync_state)
}
fn set_sync_state_stmt(con: &Connection) -> rusqlite::Result<Statement> {
con.prepare(
"

View File

@@ -78,12 +78,6 @@ impl ReceiveSwapHandler {
let receive_swap = self.fetch_receive_swap_by_id(id)?;
info!("Handling Receive Swap transition to {swap_state:?} for swap {id}");
// Get if the swap is local from the sync state. This allows us to verify
// the update but avoid claiming if not local.
let is_local_swap = self
.persister
.get_sync_state_by_data_id(&receive_swap.id)?
.map_or(true, |sync_state| sync_state.is_local);
match swap_state {
RevSwapStates::SwapExpired
@@ -188,7 +182,7 @@ impl ReceiveSwapHandler {
debug!("[Receive Swap {id}] Lockup tx fees are within acceptable range ({tx_fees} > {lower_bound_estimated_fees} sat). Proceeding with claim.");
if is_local_swap {
if receive_swap.metadata.is_local {
// Only claim a local swap
if let Err(err) = self.claim(id).await {
match err {
@@ -250,7 +244,7 @@ impl ReceiveSwapHandler {
None => {
self.update_swap_info(&receive_swap.id, Pending, None, None, None, None)?;
if is_local_swap {
if receive_swap.metadata.is_local {
// Only claim a local swap
if let Err(err) = self.claim(id).await {
match err {

View File

@@ -14,6 +14,7 @@ use lwk_wollet::WalletTx;
use super::model::*;
use crate::prelude::{Direction, Swap};
use crate::sdk::NETWORK_PROPAGATION_GRACE_PERIOD;
use crate::swapper::Swapper;
use crate::wallet::OnchainWallet;
use crate::{
@@ -170,6 +171,8 @@ impl Recoverer {
&self,
swaps: &mut [Swap],
) -> Result<HashMap<Txid, WalletTx>> {
let recovery_started_at = utils::now();
let raw_tx_map = self.onchain_wallet.transactions_by_tx_id().await?;
let tx_map = TxMap::from_raw_tx_map(raw_tx_map.clone());
@@ -212,12 +215,31 @@ impl Recoverer {
for swap in swaps.iter_mut() {
let swap_id = &swap.id();
let is_local_within_grace_period = swap.is_local()
&& recovery_started_at.saturating_sub(swap.last_updated_at())
< NETWORK_PROPAGATION_GRACE_PERIOD.as_secs() as u32;
match swap {
Swap::Send(send_swap) => {
let Some(recovered_data) = recovered_send_data.get_mut(swap_id) else {
log::warn!("Could not apply recovered data for Send swap {swap_id}: recovery data not found");
warn!("Could not apply recovered data for Send swap {swap_id}: recovery data not found");
continue;
};
let lockup_is_cleared =
send_swap.lockup_tx_id.is_some() && recovered_data.lockup_tx_id.is_none();
let refund_is_cleared =
send_swap.refund_tx_id.is_some() && recovered_data.refund_tx_id.is_none();
if is_local_within_grace_period && (lockup_is_cleared || refund_is_cleared) {
warn!(
"Local send swap {swap_id} was updated recently - skipping recovery \
as it would clear a tx that may have been broadcasted by us. Lockup clear: \
{lockup_is_cleared} - Refund clear: {refund_is_cleared}"
);
continue;
}
send_swap.lockup_tx_id = recovered_data
.lockup_tx_id
.clone()
@@ -252,9 +274,20 @@ impl Recoverer {
}
Swap::Receive(receive_swap) => {
let Some(recovered_data) = recovered_receive_data.get(swap_id) else {
log::warn!("Could not apply recovered data for Receive swap {swap_id}: recovery data not found");
warn!("Could not apply recovered data for Receive swap {swap_id}: recovery data not found");
continue;
};
let claim_is_cleared =
receive_swap.claim_tx_id.is_some() && recovered_data.claim_tx_id.is_none();
if is_local_within_grace_period && claim_is_cleared {
warn!(
"Local receive swap {swap_id} was updated recently - skipping recovery \
as it would clear a tx that may have been broadcasted by us (claim)"
);
continue;
}
let timeout_block_height = receive_swap.timeout_block_height;
let is_expired = liquid_tip >= timeout_block_height;
if let Some(new_state) = recovered_data.derive_partial_state(is_expired) {
@@ -280,9 +313,23 @@ impl Recoverer {
Swap::Chain(chain_swap) => match chain_swap.direction {
Direction::Incoming => {
let Some(recovered_data) = recovered_chain_receive_data.get(swap_id) else {
log::warn!("Could not apply recovered data for incoming Chain swap {swap_id}: recovery data not found");
warn!("Could not apply recovered data for incoming Chain swap {swap_id}: recovery data not found");
continue;
};
let claim_is_cleared = chain_swap.claim_tx_id.is_some()
&& recovered_data.lbtc_claim_tx_id.is_none();
let refund_is_cleared = chain_swap.refund_tx_id.is_some()
&& recovered_data.btc_refund_tx_id.is_none();
if is_local_within_grace_period && (claim_is_cleared || refund_is_cleared) {
warn!(
"Local incoming chain swap {swap_id} was updated recently - skipping recovery \
as it would clear a tx that may have been broadcasted by us. Claim clear: \
{claim_is_cleared} - Refund clear: {refund_is_cleared}"
);
continue;
}
if recovered_data.btc_user_lockup_amount_sat > 0 {
chain_swap.actual_payer_amount_sat =
Some(recovered_data.btc_user_lockup_amount_sat);
@@ -324,9 +371,27 @@ impl Recoverer {
}
Direction::Outgoing => {
let Some(recovered_data) = recovered_chain_send_data.get(swap_id) else {
log::warn!("Could not apply recovered data for outgoing Chain swap {swap_id}: recovery data not found");
warn!("Could not apply recovered data for outgoing Chain swap {swap_id}: recovery data not found");
continue;
};
let lockup_is_cleared = chain_swap.user_lockup_tx_id.is_some()
&& recovered_data.lbtc_user_lockup_tx_id.is_none();
let refund_is_cleared = chain_swap.refund_tx_id.is_some()
&& recovered_data.lbtc_refund_tx_id.is_none();
let claim_is_cleared = chain_swap.claim_tx_id.is_some()
&& recovered_data.btc_claim_tx_id.is_none();
if is_local_within_grace_period
&& (lockup_is_cleared || refund_is_cleared || claim_is_cleared)
{
warn!(
"Local outgoing chain swap {swap_id} was updated recently - skipping recovery \
as it would clear a tx that may have been broadcasted by us. Lockup clear: \
{lockup_is_cleared} - Refund clear: {refund_is_cleared} - Claim clear: {claim_is_cleared}"
);
continue;
}
let is_expired = liquid_tip >= chain_swap.timeout_block_height;
if let Some(new_state) = recovered_data.derive_partial_state(is_expired) {
chain_swap.state = new_state;

View File

@@ -67,7 +67,7 @@ pub const DEFAULT_EXTERNAL_INPUT_PARSERS: &[(&str, &str, &str)] = &[(
"https://cryptoqr.net/.well-known/lnurlp/<input>",
)];
const NETWORK_PROPAGATION_GRACE_PERIOD: Duration = Duration::from_secs(60 * 3);
pub(crate) const NETWORK_PROPAGATION_GRACE_PERIOD: Duration = Duration::from_secs(30);
pub struct LiquidSdk {
pub(crate) config: Config,
@@ -1582,7 +1582,7 @@ impl LiquidSdk {
created_at: utils::now(),
state: PaymentState::Created,
refund_private_key: keypair.display_secret().to_string(),
version: 0,
metadata: Default::default(),
};
self.persister.insert_or_update_send_swap(&swap)?;
swap
@@ -1885,7 +1885,7 @@ impl LiquidSdk {
created_at: utils::now(),
state: PaymentState::Created,
auto_accepted_fees: false,
version: 0,
metadata: Default::default(),
};
self.persister.insert_or_update_chain_swap(&swap)?;
self.status_stream.track_swap_id(&swap_id)?;
@@ -2304,7 +2304,7 @@ impl LiquidSdk {
mrh_tx_id: None,
created_at: utils::now(),
state: PaymentState::Created,
version: 0,
metadata: Default::default(),
})
.map_err(|_| PaymentError::PersistError)?;
self.status_stream.track_swap_id(&swap_id)?;
@@ -2410,7 +2410,7 @@ impl LiquidSdk {
created_at: utils::now(),
state: PaymentState::Created,
auto_accepted_fees: false,
version: 0,
metadata: Default::default(),
};
self.persister.insert_or_update_chain_swap(&swap)?;
self.status_stream.track_swap_id(&swap.id)?;

View File

@@ -120,7 +120,7 @@ impl From<ChainSyncData> for ChainSwap {
claim_tx_id: None,
refund_tx_id: None,
auto_accepted_fees: val.auto_accepted_fees,
version: 0,
metadata: Default::default(),
}
}
}
@@ -209,7 +209,7 @@ impl From<SendSyncData> for SendSwap {
state: PaymentState::Created,
lockup_tx_id: None,
refund_tx_id: None,
version: 0,
metadata: Default::default(),
}
}
}
@@ -293,7 +293,7 @@ impl From<ReceiveSyncData> for ReceiveSwap {
claim_tx_id: None,
lockup_tx_id: None,
mrh_tx_id: None,
version: 0,
metadata: Default::default(),
}
}
}

View File

@@ -148,7 +148,7 @@ pub(crate) fn new_chain_swap(
}"#
.to_string(),
auto_accepted_fees: false,
version: 0
metadata: Default::default(),
};
}
match direction {
@@ -233,7 +233,7 @@ pub(crate) fn new_chain_swap(
}
}"#.to_string(),
auto_accepted_fees: false,
version: 0,
metadata: Default::default(),
},
Direction::Outgoing => ChainSwap {
id: generate_random_string(4),
@@ -316,7 +316,7 @@ pub(crate) fn new_chain_swap(
}
}"#.to_string(),
auto_accepted_fees: false,
version: 0
metadata: Default::default(),
}
}
}

View File

@@ -89,7 +89,7 @@ pub(crate) fn new_send_swap(
created_at: utils::now(),
state: payment_state.unwrap_or(PaymentState::Created),
refund_private_key: "945affeef55f12227f1d4a3f80a17062a05b229ddc5a01591eb5ddf882df92e3".to_string(),
version: 0,
metadata: Default::default(),
}
}
@@ -117,7 +117,7 @@ pub(crate) fn new_receive_swap(
mrh_tx_id: None,
created_at: utils::now(),
state: payment_state.unwrap_or(PaymentState::Created),
version: 0,
metadata: Default::default(),
}
}