From 3ec57ad93eea85d77d80655ae271cbede8d47a4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Granh=C3=A3o?= <32176319+danielgranhao@users.noreply.github.com> Date: Mon, 17 Feb 2025 13:23:32 +0000 Subject: [PATCH] Add tx propagation grace period to swap recovery (#733) * Add tx propagation grace period to swap recovery * Add missing tx reset checks and improve logs --- lib/core/src/chain_swap.rs | 24 ++++----- lib/core/src/model.rs | 49 +++++++++++++----- lib/core/src/persist/chain.rs | 10 ++-- lib/core/src/persist/migrations.rs | 23 +++++++++ lib/core/src/persist/receive.rs | 10 ++-- lib/core/src/persist/send.rs | 14 +++-- lib/core/src/persist/sync.rs | 9 ---- lib/core/src/receive_swap.rs | 10 +--- lib/core/src/recover/recoverer.rs | 73 +++++++++++++++++++++++++-- lib/core/src/sdk.rs | 10 ++-- lib/core/src/sync/model/data.rs | 6 +-- lib/core/src/test_utils/chain_swap.rs | 6 +-- lib/core/src/test_utils/persist.rs | 4 +- 13 files changed, 180 insertions(+), 68 deletions(-) diff --git a/lib/core/src/chain_swap.rs b/lib/core/src/chain_swap.rs index a95798e..9f04867 100644 --- a/lib/core/src/chain_swap.rs +++ b/lib/core/src/chain_swap.rs @@ -94,21 +94,19 @@ impl ChainSwapHandler { let id = &update.id; let swap = self.fetch_chain_swap_by_id(id)?; - if let Some(sync_state) = self.persister.get_sync_state_by_data_id(&swap.id)? { - if !sync_state.is_local { - let status = &update.status; - let swap_state = ChainSwapStates::from_str(status) - .map_err(|_| anyhow!("Invalid ChainSwapState for Chain Swap {id}: {status}"))?; + if !swap.metadata.is_local { + let status = &update.status; + let swap_state = ChainSwapStates::from_str(status) + .map_err(|_| anyhow!("Invalid ChainSwapState for Chain Swap {id}: {status}"))?; - match swap_state { - // If the swap is not local (pulled from real-time sync) we do not claim twice - ChainSwapStates::TransactionServerMempool - | ChainSwapStates::TransactionServerConfirmed => { - log::debug!("Received {swap_state:?} for non-local Chain swap {id} from status stream, skipping update."); - return Ok(()); - } - _ => {} + match swap_state { + // If the swap is not local (pulled from real-time sync) we do not claim twice + ChainSwapStates::TransactionServerMempool + | ChainSwapStates::TransactionServerConfirmed => { + log::debug!("Received {swap_state:?} for non-local Chain swap {id} from status stream, skipping update."); + return Ok(()); } + _ => {} } } diff --git a/lib/core/src/model.rs b/lib/core/src/model.rs index 00308ed..9faa89c 100644 --- a/lib/core/src/model.rs +++ b/lib/core/src/model.rs @@ -809,24 +809,41 @@ impl Swap { pub(crate) fn version(&self) -> u64 { match self { - Swap::Chain(ChainSwap { version, .. }) - | Swap::Send(SendSwap { version, .. }) - | Swap::Receive(ReceiveSwap { version, .. }) => *version, + Swap::Chain(ChainSwap { metadata, .. }) + | Swap::Send(SendSwap { metadata, .. }) + | Swap::Receive(ReceiveSwap { metadata, .. }) => metadata.version, } } + pub(crate) fn set_version(&mut self, version: u64) { match self { Swap::Chain(chain_swap) => { - chain_swap.version = version; + chain_swap.metadata.version = version; } Swap::Send(send_swap) => { - send_swap.version = version; + send_swap.metadata.version = version; } Swap::Receive(receive_swap) => { - receive_swap.version = version; + receive_swap.metadata.version = version; } } } + + pub(crate) fn is_local(&self) -> bool { + match self { + Swap::Chain(ChainSwap { metadata, .. }) + | Swap::Send(SendSwap { metadata, .. }) + | Swap::Receive(ReceiveSwap { metadata, .. }) => metadata.is_local, + } + } + + pub(crate) fn last_updated_at(&self) -> u32 { + match self { + Swap::Chain(ChainSwap { metadata, .. }) + | Swap::Send(SendSwap { metadata, .. }) + | Swap::Receive(ReceiveSwap { metadata, .. }) => metadata.last_updated_at, + } + } } impl From for Swap { fn from(swap: ChainSwap) -> Self { @@ -888,6 +905,14 @@ impl FromSql for Direction { } } +#[derive(Clone, Debug, Default)] +pub(crate) struct SwapMetadata { + /// Version used for optimistic concurrency control within local db + pub(crate) version: u64, + pub(crate) last_updated_at: u32, + pub(crate) is_local: bool, +} + /// A chain swap /// /// See @@ -930,9 +955,9 @@ pub(crate) struct ChainSwap { pub(crate) claim_private_key: String, pub(crate) refund_private_key: String, pub(crate) auto_accepted_fees: bool, - /// Version used for optimistic concurrency control within local db + /// Swap metadata that is only valid when reading one from the local database #[derivative(PartialEq = "ignore")] - pub(crate) version: u64, + pub(crate) metadata: SwapMetadata, } impl ChainSwap { pub(crate) fn get_claim_keypair(&self) -> SdkResult { @@ -1084,9 +1109,9 @@ pub(crate) struct SendSwap { pub(crate) timeout_block_height: u64, pub(crate) state: PaymentState, pub(crate) refund_private_key: String, - /// Version used for optimistic concurrency control within local db + /// Swap metadata that is only valid when reading one from the local database #[derivative(PartialEq = "ignore")] - pub(crate) version: u64, + pub(crate) metadata: SwapMetadata, } impl SendSwap { pub(crate) fn get_refund_keypair(&self) -> Result { @@ -1182,9 +1207,9 @@ pub(crate) struct ReceiveSwap { pub(crate) created_at: u32, pub(crate) timeout_block_height: u32, pub(crate) state: PaymentState, - /// Version used for optimistic concurrency control within local db + /// Swap metadata that is only valid when reading one from the local database #[derivative(PartialEq = "ignore")] - pub(crate) version: u64, + pub(crate) metadata: SwapMetadata, } impl ReceiveSwap { pub(crate) fn get_claim_keypair(&self) -> Result { diff --git a/lib/core/src/persist/chain.rs b/lib/core/src/persist/chain.rs index c0a9329..82d3fc4 100644 --- a/lib/core/src/persist/chain.rs +++ b/lib/core/src/persist/chain.rs @@ -89,7 +89,7 @@ impl Persister { ":state": &chain_swap.state, ":actual_payer_amount_sat": &chain_swap.actual_payer_amount_sat, ":accepted_receiver_amount_sat": &chain_swap.accepted_receiver_amount_sat, - ":version": &chain_swap.version, + ":version": &chain_swap.metadata.version, }, )?; ensure_sdk!( @@ -158,8 +158,8 @@ impl Persister { accepted_receiver_amount_sat, auto_accepted_fees, version, + last_updated_at, - -- Used for filtering sync_state.is_local FROM chain_swaps LEFT JOIN sync_state ON chain_swaps.id = sync_state.data_id @@ -214,7 +214,11 @@ impl Persister { actual_payer_amount_sat: row.get(21)?, accepted_receiver_amount_sat: row.get(22)?, auto_accepted_fees: row.get(23)?, - version: row.get(24)?, + metadata: SwapMetadata { + version: row.get(24)?, + last_updated_at: row.get(25)?, + is_local: row.get::>(26)?.unwrap_or(true), + }, }) } diff --git a/lib/core/src/persist/migrations.rs b/lib/core/src/persist/migrations.rs index 34d8f7e..a49da3f 100644 --- a/lib/core/src/persist/migrations.rs +++ b/lib/core/src/persist/migrations.rs @@ -292,5 +292,28 @@ pub(crate) fn current_migrations(network: LiquidNetwork) -> Vec<&'static str> { ", insert_default_asset_metadata, "ALTER TABLE payment_details ADD COLUMN bip353_address TEXT;", + " + ALTER TABLE receive_swaps ADD COLUMN last_updated_at INTEGER NOT NULL DEFAULT 0; + ALTER TABLE send_swaps ADD COLUMN last_updated_at INTEGER NOT NULL DEFAULT 0; + ALTER TABLE chain_swaps ADD COLUMN last_updated_at INTEGER NOT NULL DEFAULT 0; + CREATE TRIGGER IF NOT EXISTS update_receive_swaps_last_updated_at + AFTER UPDATE ON receive_swaps + BEGIN + UPDATE receive_swaps SET last_updated_at = (strftime('%s', 'now')) + WHERE id = NEW.id; + END; + CREATE TRIGGER IF NOT EXISTS update_send_swaps_last_updated_at + AFTER UPDATE ON send_swaps + BEGIN + UPDATE send_swaps SET last_updated_at = (strftime('%s', 'now')) + WHERE id = NEW.id; + END; + CREATE TRIGGER IF NOT EXISTS update_chain_swaps_last_updated_at + AFTER UPDATE ON chain_swaps + BEGIN + UPDATE chain_swaps SET last_updated_at = (strftime('%s', 'now')) + WHERE id = NEW.id; + END; + ", ] } diff --git a/lib/core/src/persist/receive.rs b/lib/core/src/persist/receive.rs index f56696f..5c92554 100644 --- a/lib/core/src/persist/receive.rs +++ b/lib/core/src/persist/receive.rs @@ -85,7 +85,7 @@ impl Persister { ":payer_amount_sat": &receive_swap.payer_amount_sat, ":receiver_amount_sat": &receive_swap.receiver_amount_sat, ":state": &receive_swap.state, - ":version": &receive_swap.version, + ":version": &receive_swap.metadata.version, }, )?; ensure_sdk!( @@ -153,8 +153,8 @@ impl Persister { rs.state, rs.pair_fees_json, rs.version, + rs.last_updated_at, - -- Used for filtering sync_state.is_local FROM receive_swaps AS rs LEFT JOIN sync_state ON rs.id = sync_state.data_id @@ -204,7 +204,11 @@ impl Persister { created_at: row.get(16)?, state: row.get(17)?, pair_fees_json: row.get(18)?, - version: row.get(19)?, + metadata: SwapMetadata { + version: row.get(19)?, + last_updated_at: row.get(20)?, + is_local: row.get::>(21)?.unwrap_or(true), + }, }) } diff --git a/lib/core/src/persist/send.rs b/lib/core/src/persist/send.rs index 69f8761..ca1c380 100644 --- a/lib/core/src/persist/send.rs +++ b/lib/core/src/persist/send.rs @@ -76,7 +76,7 @@ impl Persister { ":lockup_tx_id": &send_swap.lockup_tx_id, ":refund_tx_id": &send_swap.refund_tx_id, ":state": &send_swap.state, - ":version": &send_swap.version, + ":version": &send_swap.metadata.version, }, )?; ensure_sdk!( @@ -183,8 +183,12 @@ impl Persister { created_at, state, pair_fees_json, - version + version, + last_updated_at, + + sync_state.is_local FROM send_swaps AS ss + LEFT JOIN sync_state ON ss.id = sync_state.data_id {where_clause_str} ORDER BY created_at " @@ -226,7 +230,11 @@ impl Persister { created_at: row.get(14)?, state: row.get(15)?, pair_fees_json: row.get(16)?, - version: row.get(17)?, + metadata: SwapMetadata { + version: row.get(17)?, + last_updated_at: row.get(18)?, + is_local: row.get::>(19)?.unwrap_or(true), + }, }) } diff --git a/lib/core/src/persist/sync.rs b/lib/core/src/persist/sync.rs index d998561..c3c71f1 100644 --- a/lib/core/src/persist/sync.rs +++ b/lib/core/src/persist/sync.rs @@ -52,15 +52,6 @@ impl Persister { Ok(sync_state) } - pub(crate) fn get_sync_state_by_data_id(&self, data_id: &str) -> Result> { - let con = self.get_connection()?; - let query = Self::select_sync_state_query(vec!["data_id = ?1".to_string()]); - let sync_state = con - .query_row(&query, [data_id], Self::sql_row_to_sync_state) - .optional()?; - Ok(sync_state) - } - fn set_sync_state_stmt(con: &Connection) -> rusqlite::Result { con.prepare( " diff --git a/lib/core/src/receive_swap.rs b/lib/core/src/receive_swap.rs index 29a5729..5ba7320 100644 --- a/lib/core/src/receive_swap.rs +++ b/lib/core/src/receive_swap.rs @@ -78,12 +78,6 @@ impl ReceiveSwapHandler { let receive_swap = self.fetch_receive_swap_by_id(id)?; info!("Handling Receive Swap transition to {swap_state:?} for swap {id}"); - // Get if the swap is local from the sync state. This allows us to verify - // the update but avoid claiming if not local. - let is_local_swap = self - .persister - .get_sync_state_by_data_id(&receive_swap.id)? - .map_or(true, |sync_state| sync_state.is_local); match swap_state { RevSwapStates::SwapExpired @@ -188,7 +182,7 @@ impl ReceiveSwapHandler { debug!("[Receive Swap {id}] Lockup tx fees are within acceptable range ({tx_fees} > {lower_bound_estimated_fees} sat). Proceeding with claim."); - if is_local_swap { + if receive_swap.metadata.is_local { // Only claim a local swap if let Err(err) = self.claim(id).await { match err { @@ -250,7 +244,7 @@ impl ReceiveSwapHandler { None => { self.update_swap_info(&receive_swap.id, Pending, None, None, None, None)?; - if is_local_swap { + if receive_swap.metadata.is_local { // Only claim a local swap if let Err(err) = self.claim(id).await { match err { diff --git a/lib/core/src/recover/recoverer.rs b/lib/core/src/recover/recoverer.rs index 26d638d..8e31271 100644 --- a/lib/core/src/recover/recoverer.rs +++ b/lib/core/src/recover/recoverer.rs @@ -14,6 +14,7 @@ use lwk_wollet::WalletTx; use super::model::*; use crate::prelude::{Direction, Swap}; +use crate::sdk::NETWORK_PROPAGATION_GRACE_PERIOD; use crate::swapper::Swapper; use crate::wallet::OnchainWallet; use crate::{ @@ -170,6 +171,8 @@ impl Recoverer { &self, swaps: &mut [Swap], ) -> Result> { + let recovery_started_at = utils::now(); + let raw_tx_map = self.onchain_wallet.transactions_by_tx_id().await?; let tx_map = TxMap::from_raw_tx_map(raw_tx_map.clone()); @@ -212,12 +215,31 @@ impl Recoverer { for swap in swaps.iter_mut() { let swap_id = &swap.id(); + + let is_local_within_grace_period = swap.is_local() + && recovery_started_at.saturating_sub(swap.last_updated_at()) + < NETWORK_PROPAGATION_GRACE_PERIOD.as_secs() as u32; + match swap { Swap::Send(send_swap) => { let Some(recovered_data) = recovered_send_data.get_mut(swap_id) else { - log::warn!("Could not apply recovered data for Send swap {swap_id}: recovery data not found"); + warn!("Could not apply recovered data for Send swap {swap_id}: recovery data not found"); continue; }; + + let lockup_is_cleared = + send_swap.lockup_tx_id.is_some() && recovered_data.lockup_tx_id.is_none(); + let refund_is_cleared = + send_swap.refund_tx_id.is_some() && recovered_data.refund_tx_id.is_none(); + if is_local_within_grace_period && (lockup_is_cleared || refund_is_cleared) { + warn!( + "Local send swap {swap_id} was updated recently - skipping recovery \ + as it would clear a tx that may have been broadcasted by us. Lockup clear: \ + {lockup_is_cleared} - Refund clear: {refund_is_cleared}" + ); + continue; + } + send_swap.lockup_tx_id = recovered_data .lockup_tx_id .clone() @@ -252,9 +274,20 @@ impl Recoverer { } Swap::Receive(receive_swap) => { let Some(recovered_data) = recovered_receive_data.get(swap_id) else { - log::warn!("Could not apply recovered data for Receive swap {swap_id}: recovery data not found"); + warn!("Could not apply recovered data for Receive swap {swap_id}: recovery data not found"); continue; }; + + let claim_is_cleared = + receive_swap.claim_tx_id.is_some() && recovered_data.claim_tx_id.is_none(); + if is_local_within_grace_period && claim_is_cleared { + warn!( + "Local receive swap {swap_id} was updated recently - skipping recovery \ + as it would clear a tx that may have been broadcasted by us (claim)" + ); + continue; + } + let timeout_block_height = receive_swap.timeout_block_height; let is_expired = liquid_tip >= timeout_block_height; if let Some(new_state) = recovered_data.derive_partial_state(is_expired) { @@ -280,9 +313,23 @@ impl Recoverer { Swap::Chain(chain_swap) => match chain_swap.direction { Direction::Incoming => { let Some(recovered_data) = recovered_chain_receive_data.get(swap_id) else { - log::warn!("Could not apply recovered data for incoming Chain swap {swap_id}: recovery data not found"); + warn!("Could not apply recovered data for incoming Chain swap {swap_id}: recovery data not found"); continue; }; + + let claim_is_cleared = chain_swap.claim_tx_id.is_some() + && recovered_data.lbtc_claim_tx_id.is_none(); + let refund_is_cleared = chain_swap.refund_tx_id.is_some() + && recovered_data.btc_refund_tx_id.is_none(); + if is_local_within_grace_period && (claim_is_cleared || refund_is_cleared) { + warn!( + "Local incoming chain swap {swap_id} was updated recently - skipping recovery \ + as it would clear a tx that may have been broadcasted by us. Claim clear: \ + {claim_is_cleared} - Refund clear: {refund_is_cleared}" + ); + continue; + } + if recovered_data.btc_user_lockup_amount_sat > 0 { chain_swap.actual_payer_amount_sat = Some(recovered_data.btc_user_lockup_amount_sat); @@ -324,9 +371,27 @@ impl Recoverer { } Direction::Outgoing => { let Some(recovered_data) = recovered_chain_send_data.get(swap_id) else { - log::warn!("Could not apply recovered data for outgoing Chain swap {swap_id}: recovery data not found"); + warn!("Could not apply recovered data for outgoing Chain swap {swap_id}: recovery data not found"); continue; }; + + let lockup_is_cleared = chain_swap.user_lockup_tx_id.is_some() + && recovered_data.lbtc_user_lockup_tx_id.is_none(); + let refund_is_cleared = chain_swap.refund_tx_id.is_some() + && recovered_data.lbtc_refund_tx_id.is_none(); + let claim_is_cleared = chain_swap.claim_tx_id.is_some() + && recovered_data.btc_claim_tx_id.is_none(); + if is_local_within_grace_period + && (lockup_is_cleared || refund_is_cleared || claim_is_cleared) + { + warn!( + "Local outgoing chain swap {swap_id} was updated recently - skipping recovery \ + as it would clear a tx that may have been broadcasted by us. Lockup clear: \ + {lockup_is_cleared} - Refund clear: {refund_is_cleared} - Claim clear: {claim_is_cleared}" + ); + continue; + } + let is_expired = liquid_tip >= chain_swap.timeout_block_height; if let Some(new_state) = recovered_data.derive_partial_state(is_expired) { chain_swap.state = new_state; diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index 97ff7a9..14b7a5e 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -67,7 +67,7 @@ pub const DEFAULT_EXTERNAL_INPUT_PARSERS: &[(&str, &str, &str)] = &[( "https://cryptoqr.net/.well-known/lnurlp/", )]; -const NETWORK_PROPAGATION_GRACE_PERIOD: Duration = Duration::from_secs(60 * 3); +pub(crate) const NETWORK_PROPAGATION_GRACE_PERIOD: Duration = Duration::from_secs(30); pub struct LiquidSdk { pub(crate) config: Config, @@ -1582,7 +1582,7 @@ impl LiquidSdk { created_at: utils::now(), state: PaymentState::Created, refund_private_key: keypair.display_secret().to_string(), - version: 0, + metadata: Default::default(), }; self.persister.insert_or_update_send_swap(&swap)?; swap @@ -1885,7 +1885,7 @@ impl LiquidSdk { created_at: utils::now(), state: PaymentState::Created, auto_accepted_fees: false, - version: 0, + metadata: Default::default(), }; self.persister.insert_or_update_chain_swap(&swap)?; self.status_stream.track_swap_id(&swap_id)?; @@ -2304,7 +2304,7 @@ impl LiquidSdk { mrh_tx_id: None, created_at: utils::now(), state: PaymentState::Created, - version: 0, + metadata: Default::default(), }) .map_err(|_| PaymentError::PersistError)?; self.status_stream.track_swap_id(&swap_id)?; @@ -2410,7 +2410,7 @@ impl LiquidSdk { created_at: utils::now(), state: PaymentState::Created, auto_accepted_fees: false, - version: 0, + metadata: Default::default(), }; self.persister.insert_or_update_chain_swap(&swap)?; self.status_stream.track_swap_id(&swap.id)?; diff --git a/lib/core/src/sync/model/data.rs b/lib/core/src/sync/model/data.rs index 31e2615..da79e84 100644 --- a/lib/core/src/sync/model/data.rs +++ b/lib/core/src/sync/model/data.rs @@ -120,7 +120,7 @@ impl From for ChainSwap { claim_tx_id: None, refund_tx_id: None, auto_accepted_fees: val.auto_accepted_fees, - version: 0, + metadata: Default::default(), } } } @@ -209,7 +209,7 @@ impl From for SendSwap { state: PaymentState::Created, lockup_tx_id: None, refund_tx_id: None, - version: 0, + metadata: Default::default(), } } } @@ -293,7 +293,7 @@ impl From for ReceiveSwap { claim_tx_id: None, lockup_tx_id: None, mrh_tx_id: None, - version: 0, + metadata: Default::default(), } } } diff --git a/lib/core/src/test_utils/chain_swap.rs b/lib/core/src/test_utils/chain_swap.rs index a1e4a4b..c90423d 100644 --- a/lib/core/src/test_utils/chain_swap.rs +++ b/lib/core/src/test_utils/chain_swap.rs @@ -148,7 +148,7 @@ pub(crate) fn new_chain_swap( }"# .to_string(), auto_accepted_fees: false, - version: 0 + metadata: Default::default(), }; } match direction { @@ -233,7 +233,7 @@ pub(crate) fn new_chain_swap( } }"#.to_string(), auto_accepted_fees: false, - version: 0, + metadata: Default::default(), }, Direction::Outgoing => ChainSwap { id: generate_random_string(4), @@ -316,7 +316,7 @@ pub(crate) fn new_chain_swap( } }"#.to_string(), auto_accepted_fees: false, - version: 0 + metadata: Default::default(), } } } diff --git a/lib/core/src/test_utils/persist.rs b/lib/core/src/test_utils/persist.rs index c6e8741..112162b 100644 --- a/lib/core/src/test_utils/persist.rs +++ b/lib/core/src/test_utils/persist.rs @@ -89,7 +89,7 @@ pub(crate) fn new_send_swap( created_at: utils::now(), state: payment_state.unwrap_or(PaymentState::Created), refund_private_key: "945affeef55f12227f1d4a3f80a17062a05b229ddc5a01591eb5ddf882df92e3".to_string(), - version: 0, + metadata: Default::default(), } } @@ -117,7 +117,7 @@ pub(crate) fn new_receive_swap( mrh_tx_id: None, created_at: utils::now(), state: payment_state.unwrap_or(PaymentState::Created), - version: 0, + metadata: Default::default(), } }