From 5f74b9df4b41bbd9e5d0e2fda21b3a5e8ab6ad5c Mon Sep 17 00:00:00 2001 From: ok300 <106775972+ok300@users.noreply.github.com> Date: Fri, 30 Aug 2024 17:18:25 +0000 Subject: [PATCH] Restore: Associate swap tx IDs from onchain data (#399) * Add extend_incomplete_failed_send_swaps() on first sync * Find lockup txs * Send Swaps: find refund txs * Simplify recover_send_swap_tx_ids, add recover_receive_swap_tx_ids * recover_receive_swap_tx_ids: batch tx lookups * Move onchain-restore methods to own module * Store restored data in own struct * Fix CI: bump pubspec.lock dependencies * LiquidChainService: add get_scripts_history_electrum() * restore_onchain: rely on batch call to fetch histories of all known swaps * Rename get_scripts_history_electrum * Rename restore_onchain.rs, flatten onchain inner module * Rename ImmutableDb to SwapsList * Simplify logic in restore.rs * restore.rs: Add chain swap support, simplify logic * restore.rs: add logging when script retrieval fails * restore.rs: remove unused field create_resp * restore.rs: rename SwapCompositeData to SwapHistory * restore.rs: allow unused fields in simulated immutable data * restore.rs: cargo fmt * Cargo fmt * Fix failing test * When fetching script history, also fetch if tx is confirmed or not * Recover send swaps: fetch claim tx IDs * Recover onchain data: persist reconstructed swaps * Simplify recover_from_onchain: store swap txs per swap ID * Receive swaps: do not treat lockup/claim txs as pair * Clarify meaning of partial swap state * Cargo clippy * Receive Chain Swap: distinguish BTC lockup from claim/refund tx * Send Chain Swap: distinguish BTC lockup/claim by vout, not by history order * get_partial_state: default to Created when state is unclear * Receive Chain Swaps: differentiate BTC refund from BTC claim txs * Send Swaps: clarify reason for defaulting to TimedOut on no lockup * Chain swaps: add docs for meaning of server, user txs * Recover Receive swaps: cover the case when only the lockup is available * HistoryTxId: store confirmation block height * Receive swaps: differentiate claim tx from swapper refund tx * recover_from_onchain: extract immutable DB (swaps list) as arg * Rename get_partial_state to derive_partial_state * Restore: remove validation steps * Restore chain swaps: treat as Complete only when claim is confirmed * Fix clippy warnings * Remove restore call from sync call --- lib/core/src/chain/bitcoin.rs | 12 + lib/core/src/chain/liquid.rs | 15 +- lib/core/src/lib.rs | 2 + lib/core/src/model.rs | 2 + lib/core/src/persist/receive.rs | 18 +- lib/core/src/persist/send.rs | 18 +- lib/core/src/restore.rs | 1056 ++++++++++++++++++++++++++++++ lib/core/src/sdk.rs | 14 +- lib/core/src/test_utils/chain.rs | 14 +- 9 files changed, 1130 insertions(+), 21 deletions(-) create mode 100644 lib/core/src/restore.rs diff --git a/lib/core/src/chain/bitcoin.rs b/lib/core/src/chain/bitcoin.rs index dc35a6e..b9ede9c 100644 --- a/lib/core/src/chain/bitcoin.rs +++ b/lib/core/src/chain/bitcoin.rs @@ -33,6 +33,9 @@ pub trait BitcoinChainService: Send + Sync { /// Get the transactions involved for a script fn get_script_history(&self, script: &Script) -> Result>; + /// Get the transactions involved in a list of scripts. + fn get_scripts_history(&self, scripts: &[&Script]) -> Result>>; + /// Get the transactions involved for a script async fn get_script_history_with_retry( &self, @@ -132,6 +135,15 @@ impl BitcoinChainService for HybridBitcoinChainService { .collect()) } + fn get_scripts_history(&self, scripts: &[&Script]) -> Result>> { + Ok(self + .client + .batch_script_get_history(scripts)? + .into_iter() + .map(|v| v.into_iter().map(Into::into).collect()) + .collect()) + } + async fn get_script_history_with_retry( &self, script: &Script, diff --git a/lib/core/src/chain/liquid.rs b/lib/core/src/chain/liquid.rs index 8da0305..2066b47 100644 --- a/lib/core/src/chain/liquid.rs +++ b/lib/core/src/chain/liquid.rs @@ -31,9 +31,16 @@ pub trait LiquidChainService: Send + Sync { /// Get a list of transactions async fn get_transactions(&self, txids: &[Txid]) -> Result>; - /// Get the transactions involved in a list of scripts including lowball + /// Get the transactions involved in a script, including lowball transactions. + /// + /// On mainnet, the data is fetched from Esplora. On testnet, it's fetched from Electrum. async fn get_script_history(&self, scripts: &Script) -> Result>; + /// Get the transactions involved in a list of scripts, including lowball transactions. + /// + /// The data is fetched in a single call from the Electrum endpoint. + async fn get_scripts_history(&self, scripts: &[&Script]) -> Result>>; + /// Get the transactions involved in a list of scripts including lowball async fn get_script_history_with_retry( &self, @@ -131,6 +138,12 @@ impl LiquidChainService for HybridLiquidChainService { } } + async fn get_scripts_history(&self, scripts: &[&Script]) -> Result>> { + self.electrum_client + .get_scripts_history(scripts) + .map_err(Into::into) + } + async fn get_script_history_with_retry( &self, script: &Script, diff --git a/lib/core/src/lib.rs b/lib/core/src/lib.rs index 6531f17..baac709 100644 --- a/lib/core/src/lib.rs +++ b/lib/core/src/lib.rs @@ -175,6 +175,8 @@ pub mod logger; pub mod model; pub mod persist; pub(crate) mod receive_swap; +#[allow(dead_code)] +mod restore; pub mod sdk; pub(crate) mod send_swap; pub(crate) mod swapper; diff --git a/lib/core/src/model.rs b/lib/core/src/model.rs index c481c63..f7a9167 100644 --- a/lib/core/src/model.rs +++ b/lib/core/src/model.rs @@ -519,6 +519,8 @@ impl FromSql for Direction { } /// A chain swap +/// +/// See #[derive(Clone, Debug)] pub(crate) struct ChainSwap { pub(crate) id: String, diff --git a/lib/core/src/persist/receive.rs b/lib/core/src/persist/receive.rs index 39ab473..42f8eee 100644 --- a/lib/core/src/persist/receive.rs +++ b/lib/core/src/persist/receive.rs @@ -119,11 +119,16 @@ impl Persister { }) } - pub(crate) fn list_receive_swaps( + pub(crate) fn list_receive_swaps(&self) -> Result> { + let con: Connection = self.get_connection()?; + self.list_receive_swaps_where(&con, vec![]) + } + + pub(crate) fn list_receive_swaps_where( &self, con: &Connection, where_clauses: Vec, - ) -> rusqlite::Result> { + ) -> Result> { let query = Self::list_receive_swaps_query(where_clauses); let ongoing_receive = con .prepare(&query)? @@ -133,16 +138,13 @@ impl Persister { Ok(ongoing_receive) } - pub(crate) fn list_ongoing_receive_swaps( - &self, - con: &Connection, - ) -> rusqlite::Result> { + pub(crate) fn list_ongoing_receive_swaps(&self, con: &Connection) -> Result> { let where_clause = vec![get_where_clause_state_in(&[ PaymentState::Created, PaymentState::Pending, ])]; - self.list_receive_swaps(con, where_clause) + self.list_receive_swaps_where(con, where_clause) } pub(crate) fn list_pending_receive_swaps(&self) -> Result> { @@ -288,7 +290,7 @@ mod tests { } let con = storage.get_connection()?; - let swaps = storage.list_receive_swaps(&con, vec![])?; + let swaps = storage.list_receive_swaps_where(&con, vec![])?; assert_eq!(swaps.len(), range.len()); // List ongoing receive swaps diff --git a/lib/core/src/persist/send.rs b/lib/core/src/persist/send.rs index 14bb99d..bbe274d 100644 --- a/lib/core/src/persist/send.rs +++ b/lib/core/src/persist/send.rs @@ -137,11 +137,16 @@ impl Persister { }) } - pub(crate) fn list_send_swaps( + pub(crate) fn list_send_swaps(&self) -> Result> { + let con: Connection = self.get_connection()?; + self.list_send_swaps_where(&con, vec![]) + } + + pub(crate) fn list_send_swaps_where( &self, con: &Connection, where_clauses: Vec, - ) -> rusqlite::Result> { + ) -> Result> { let query = Self::list_send_swaps_query(where_clauses); let ongoing_send = con .prepare(&query)? @@ -151,16 +156,13 @@ impl Persister { Ok(ongoing_send) } - pub(crate) fn list_ongoing_send_swaps( - &self, - con: &Connection, - ) -> rusqlite::Result> { + pub(crate) fn list_ongoing_send_swaps(&self, con: &Connection) -> Result> { let where_clause = vec![get_where_clause_state_in(&[ PaymentState::Created, PaymentState::Pending, ])]; - self.list_send_swaps(con, where_clause) + self.list_send_swaps_where(con, where_clause) } pub(crate) fn list_pending_send_swaps(&self) -> Result> { @@ -319,7 +321,7 @@ mod tests { } let con = storage.get_connection()?; - let swaps = storage.list_send_swaps(&con, vec![])?; + let swaps = storage.list_send_swaps_where(&con, vec![])?; assert_eq!(swaps.len(), range.len()); // List ongoing send swaps diff --git a/lib/core/src/restore.rs b/lib/core/src/restore.rs new file mode 100644 index 0000000..d3f5117 --- /dev/null +++ b/lib/core/src/restore.rs @@ -0,0 +1,1056 @@ +//! This module provides functionality for restoring the swap tx IDs from onchain data + +use std::collections::HashMap; + +use anyhow::{anyhow, Result}; +use log::{error, info}; +use lwk_wollet::elements::Txid; +use lwk_wollet::WalletTx; + +use crate::prelude::*; +use crate::restore::immutable::*; + +/// A map of all our known LWK onchain txs, indexed by tx ID. Essentially our own cache of the LWK txs. +pub(crate) struct TxMap { + outgoing_tx_map: HashMap, + incoming_tx_map: HashMap, +} +impl TxMap { + pub(crate) fn from_raw_tx_map(raw_tx_map: HashMap) -> Self { + let (outgoing_tx_map, incoming_tx_map): (HashMap, HashMap) = + raw_tx_map + .into_iter() + .partition(|(_, tx)| tx.balance.values().sum::() < 0); + + Self { + outgoing_tx_map, + incoming_tx_map, + } + } +} + +trait PartialSwapState { + /// Determine partial swap state, based on recovered chain data. + /// + /// This is a partial state, which means it may be incomplete because it's based on partial + /// information. Some swap states cannot be determined based only on chain data. + /// + /// For example, it cannot distinguish between [PaymentState::Created] and [PaymentState::TimedOut], + /// and in some cases, between [PaymentState::Created] and [PaymentState::Failed]. + fn derive_partial_state(&self) -> PaymentState; +} + +pub(crate) struct RecoveredOnchainDataSend { + lockup_tx_id: Option, + claim_tx_id: Option, + refund_tx_id: Option, +} +impl PartialSwapState for RecoveredOnchainDataSend { + fn derive_partial_state(&self) -> PaymentState { + match &self.lockup_tx_id { + Some(_) => match &self.claim_tx_id { + Some(_) => PaymentState::Complete, + None => match &self.refund_tx_id { + Some(refund_tx_id) => match refund_tx_id.confirmed() { + true => PaymentState::Failed, + false => PaymentState::RefundPending, + }, + None => PaymentState::Pending, + }, + }, + // For Send swaps, no lockup could mean both Created or TimedOut. + // However, we're in Created for a very short period of time in the originating instance, + // after which we expect Pending or TimedOut. Therefore here we default to TimedOut. + None => PaymentState::TimedOut, + } + } +} + +pub(crate) struct RecoveredOnchainDataReceive { + lockup_tx_id: Option, + claim_tx_id: Option, +} +impl PartialSwapState for RecoveredOnchainDataReceive { + fn derive_partial_state(&self) -> PaymentState { + match (&self.lockup_tx_id, &self.claim_tx_id) { + (Some(_), Some(claim_tx_id)) => match claim_tx_id.confirmed() { + true => PaymentState::Complete, + false => PaymentState::Pending, + }, + (Some(_), None) => PaymentState::Pending, + // TODO How to distinguish between Failed and Created (if in both cases, no lockup or claim tx present) + // See https://docs.boltz.exchange/v/api/lifecycle#reverse-submarine-swaps + _ => PaymentState::Created, + } + } +} + +pub(crate) struct RecoveredOnchainDataChainSend { + /// LBTC tx initiated by the SDK (the "user" as per Boltz), sending funds to the swap funding address. + lbtc_user_lockup_tx_id: Option, + /// LBTC tx initiated by the SDK to itself, in case the initial funds have to be refunded. + lbtc_refund_tx_id: Option, + /// BTC tx locking up funds by the swapper + btc_server_lockup_tx_id: Option, + /// BTC tx that claims to the final BTC destination address. The final step in a successful swap. + btc_claim_tx_id: Option, +} +impl PartialSwapState for RecoveredOnchainDataChainSend { + fn derive_partial_state(&self) -> PaymentState { + match &self.lbtc_user_lockup_tx_id { + Some(_) => match &self.btc_claim_tx_id { + Some(btc_claim_tx_id) => match btc_claim_tx_id.confirmed() { + true => PaymentState::Complete, + false => PaymentState::Pending, + }, + None => match &self.lbtc_refund_tx_id { + Some(tx) => match tx.confirmed() { + true => PaymentState::Failed, + false => PaymentState::RefundPending, + }, + None => PaymentState::Created, + }, + }, + // For Send swaps, no lockup could mean both Created or TimedOut. + // However, we're in Created for a very short period of time in the originating instance, + // after which we expect Pending or TimedOut. Therefore here we default to TimedOut. + None => PaymentState::TimedOut, + } + } +} + +pub(crate) struct RecoveredOnchainDataChainReceive { + /// LBTC tx locking up funds by the swapper + lbtc_server_lockup_tx_id: Option, + /// LBTC tx that claims to our wallet. The final step in a successful swap. + lbtc_server_claim_tx_id: Option, + /// BTC tx initiated by the payer (the "user" as per Boltz), sending funds to the swap funding address. + btc_user_lockup_tx_id: Option, + /// BTC tx initiated by the SDK to a user-chosen address, in case the initial funds have to be refunded. + btc_refund_tx_id: Option, +} +impl PartialSwapState for RecoveredOnchainDataChainReceive { + fn derive_partial_state(&self) -> PaymentState { + match &self.btc_user_lockup_tx_id { + Some(_) => match &self.lbtc_server_claim_tx_id { + Some(lbtc_server_claim_tx_id) => match lbtc_server_claim_tx_id.confirmed() { + true => PaymentState::Complete, + false => PaymentState::Pending, + }, + None => match &self.btc_refund_tx_id { + Some(tx) => match tx.confirmed() { + true => PaymentState::Failed, + false => PaymentState::RefundPending, + }, + None => PaymentState::Created, + }, + }, + None => PaymentState::Created, + } + } +} + +pub(crate) struct RecoveredOnchainData { + send: HashMap, + receive: HashMap, + chain_send: HashMap, + chain_receive: HashMap, +} + +impl LiquidSdk { + /// For each swap, recovers data from chain services. + /// + /// The returned data include txs and the partial swap state. See [PartialSwapState::derive_partial_state]. + /// + /// The caller is expected to merge this data with any other data available, then persist the + /// reconstructed swap. + /// + /// ### Arguments + /// + /// - `tx_map`: all known onchain txs of this wallet at this time, essentially our own LWK cache. + /// - `swaps`: immutable data of the swaps for which we want to recover onchain data. + pub(crate) async fn recover_from_onchain( + &self, + tx_map: TxMap, + swaps: SwapsList, + ) -> Result { + let histories = self.fetch_swaps_histories(&swaps).await?; + + let recovered_send_data = self + .recover_send_swap_tx_ids(&tx_map, histories.send) + .await?; + let recovered_receive_data = self + .recover_receive_swap_tx_ids(&tx_map, histories.receive) + .await?; + let recovered_chain_send_data = self + .recover_send_chain_swap_tx_ids( + &tx_map, + histories.send_chain, + &swaps.send_chain_swap_immutable_db_by_swap_id, + ) + .await?; + let recovered_chain_receive_data = self + .recover_receive_chain_swap_tx_ids( + &tx_map, + histories.receive_chain, + &swaps.receive_chain_swap_immutable_db_by_swap_id, + ) + .await?; + + Ok(RecoveredOnchainData { + send: recovered_send_data, + receive: recovered_receive_data, + chain_send: recovered_chain_send_data, + chain_receive: recovered_chain_receive_data, + }) + } + + /// Reconstruct Send Swap tx IDs from the onchain data and the immutable DB data + async fn recover_send_swap_tx_ids( + &self, + tx_map: &TxMap, + send_histories_by_swap_id: HashMap, + ) -> Result> { + let mut res: HashMap = HashMap::new(); + for (swap_id, history) in send_histories_by_swap_id { + // If a history tx is one of our outgoing txs, it's a lockup tx + let lockup_tx_id = history + .iter() + .find(|&tx| tx_map.outgoing_tx_map.contains_key::(&tx.txid)) + .cloned(); + if lockup_tx_id.is_none() { + error!("No lockup tx found when recovering data for Send Swap {swap_id}"); + } + + // If a history tx is one of our incoming txs, it's a refund tx + let refund_tx_id = history + .iter() + .find(|&tx| tx_map.incoming_tx_map.contains_key::(&tx.txid)) + .cloned(); + + // A history tx that is neither a known incoming or outgoing tx is a claim tx + let claim_tx_id = history + .iter() + .filter(|&tx| !tx_map.incoming_tx_map.contains_key::(&tx.txid)) + .find(|&tx| !tx_map.outgoing_tx_map.contains_key::(&tx.txid)) + .cloned(); + + res.insert( + swap_id, + RecoveredOnchainDataSend { + lockup_tx_id, + claim_tx_id, + refund_tx_id, + }, + ); + } + + Ok(res) + } + + /// Reconstruct Receive Swap tx IDs from the onchain data and the immutable DB data + async fn recover_receive_swap_tx_ids( + &self, + tx_map: &TxMap, + receive_histories_by_swap_id: HashMap, + ) -> Result> { + let mut res: HashMap = HashMap::new(); + for (swap_id, history) in receive_histories_by_swap_id { + let (lockup_tx_id, claim_tx_id) = match history.len() { + // Only lockup tx available + 1 => (Some(history[0].clone()), None), + + 2 => { + let first = history[0].clone(); + let second = history[1].clone(); + + if tx_map.incoming_tx_map.contains_key::(&first.txid) { + // If the first tx is a known incoming tx, it's the claim tx and the second is the lockup + (Some(second), Some(first)) + } else if tx_map.incoming_tx_map.contains_key::(&second.txid) { + // If the second tx is a known incoming tx, it's the claim tx and the first is the lockup + (Some(first), Some(second)) + } else { + // If none of the 2 txs is the claim tx, then the txs are lockup and swapper refund + // If so, we expect them to be confirmed at different heights + let first_conf_height = first.height; + let second_conf_height = second.height; + match (first.confirmed(), second.confirmed()) { + // If they're both confirmed, the one with the lowest confirmation height is the lockup + (true, true) => match first_conf_height < second_conf_height { + true => (Some(first), None), + false => (Some(second), None), + }, + + // If only one tx is confirmed, then that is the lockup + (true, false) => (Some(first), None), + (false, true) => (Some(second), None), + + // If neither is confirmed, this is an edge-case + (false, false) => { + error!("Found unconfirmed lockup and refund txs while recovering data for Receive Swap {swap_id}"); + (None, None) + } + } + } + } + n => { + error!("Script history with unexpected length {n} found while recovering data for Receive Swap {swap_id}"); + (None, None) + } + }; + + res.insert( + swap_id, + RecoveredOnchainDataReceive { + lockup_tx_id, + claim_tx_id, + }, + ); + } + + Ok(res) + } + + /// Reconstruct Chain Send Swap tx IDs from the onchain data and the immutable DB data + async fn recover_send_chain_swap_tx_ids( + &self, + tx_map: &TxMap, + chain_send_histories_by_swap_id: HashMap, + send_chain_swap_immutable_db_by_swap_id: &HashMap, + ) -> Result> { + let mut res: HashMap = HashMap::new(); + for (swap_id, history) in chain_send_histories_by_swap_id { + info!("[Recover Chain Send] Checking swap {swap_id}"); + + // If a history tx is one of our outgoing txs, it's a lockup tx + let lbtc_user_lockup_tx_id = history + .lbtc_lockup_script_history + .iter() + .find(|&tx| tx_map.outgoing_tx_map.contains_key::(&tx.txid)) + .cloned(); + if lbtc_user_lockup_tx_id.is_none() { + error!("No lockup tx found when recovering data for Chain Send Swap {swap_id}"); + } + + // If a history tx is one of our incoming txs, it's a refund tx + let lbtc_refund_tx_id = history + .lbtc_lockup_script_history + .iter() + .find(|&tx| tx_map.incoming_tx_map.contains_key::(&tx.txid)) + .cloned(); + + let (btc_server_lockup_tx_id, btc_claim_tx_id) = match history + .btc_claim_script_history + .len() + { + // Only lockup tx available + 1 => (Some(history.btc_claim_script_history[0].clone()), None), + + 2 => { + let first_tx = history.btc_claim_script_txs[0].clone(); + let first_tx_id = history.btc_claim_script_history[0].clone(); + let second_tx_id = history.btc_claim_script_history[1].clone(); + + let btc_lockup_script = send_chain_swap_immutable_db_by_swap_id + .get(&swap_id) + .map(|imm| imm.claim_script.clone()) + .ok_or_else(|| { + anyhow!("BTC claim script not found for Onchain Send Swap {swap_id}") + })?; + + // We check the full tx, to determine if this is the BTC lockup tx + let is_first_tx_lockup_tx = first_tx + .output + .iter() + .any(|out| matches!(&out.script_pubkey, x if x == &btc_lockup_script)); + + match is_first_tx_lockup_tx { + true => (Some(first_tx_id), Some(second_tx_id)), + false => (Some(second_tx_id), Some(first_tx_id)), + } + } + n => { + error!("BTC script history with unexpected length {n} found while recovering data for Chain Send Swap {swap_id}"); + (None, None) + } + }; + + res.insert( + swap_id, + RecoveredOnchainDataChainSend { + lbtc_user_lockup_tx_id, + lbtc_refund_tx_id, + btc_server_lockup_tx_id, + btc_claim_tx_id, + }, + ); + } + + Ok(res) + } + + /// Reconstruct Chain Receive Swap tx IDs from the onchain data and the immutable DB data + async fn recover_receive_chain_swap_tx_ids( + &self, + tx_map: &TxMap, + chain_receive_histories_by_swap_id: HashMap, + receive_chain_swap_immutable_db_by_swap_id: &HashMap, + ) -> Result> { + let mut res: HashMap = HashMap::new(); + for (swap_id, history) in chain_receive_histories_by_swap_id { + info!("[Recover Chain Receive] Checking swap {swap_id}"); + + let (lbtc_server_lockup_tx_id, lbtc_server_claim_tx_id) = match history + .lbtc_claim_script_history + .len() + { + // Only lockup tx available + 1 => (Some(history.lbtc_claim_script_history[0].clone()), None), + + 2 => { + let first = &history.lbtc_claim_script_history[0]; + let second = &history.lbtc_claim_script_history[1]; + + // If a history tx is a known incoming tx, it's the claim tx + let (lockup_tx_id, claim_tx_id) = + match tx_map.incoming_tx_map.contains_key::(&first.txid) { + true => (second, first), + false => (first, second), + }; + (Some(lockup_tx_id.clone()), Some(claim_tx_id.clone())) + } + n => { + error!("L-BTC script history with unexpected length {n} found while recovering data for Chain Receive Swap {swap_id}"); + (None, None) + } + }; + + // The btc_lockup_script_history can contain 3 kinds of txs, of which only 2 are expected: + // - 1) btc_user_lockup_tx_id (initial BTC funds sent by the sender) + // - 2A) btc_server_claim_tx_id (the swapper tx that claims the BTC funds, in Success case) + // - 2B) btc_refund_tx_id (refund tx we initiate, in Failure case) + // The exact type of the second is found in the next step. + let (btc_user_lockup_tx_id, btc_second_tx_id) = match history + .btc_lockup_script_history + .len() + { + // Only lockup tx available + 1 => (Some(history.btc_lockup_script_history[0].clone()), None), + + // Both txs available (lockup + claim, or lockup + refund) + // Any tx above the first two, we ignore, as that is address re-use which is not supported + n if n >= 2 => { + let first_tx = history.btc_lockup_script_txs[0].clone(); + let first_tx_id = history.btc_lockup_script_history[0].clone(); + let second_tx_id = history.btc_lockup_script_history[1].clone(); + + let btc_lockup_script = receive_chain_swap_immutable_db_by_swap_id + .get(&swap_id) + .map(|imm| imm.lockup_script.clone()) + .ok_or_else(|| { + anyhow!( + "BTC lockup script not found for Onchain Receive Swap {swap_id}" + ) + })?; + + // We check the full tx, to determine if this is the BTC lockup tx + let is_first_tx_lockup_tx = first_tx + .output + .iter() + .any(|out| matches!(&out.script_pubkey, x if x == &btc_lockup_script)); + + match is_first_tx_lockup_tx { + true => (Some(first_tx_id), Some(second_tx_id)), + false => (Some(second_tx_id), Some(first_tx_id)), + } + } + n => { + error!("BTC script history with unexpected length {n} found while recovering data for Chain Receive Swap {swap_id}"); + (None, None) + } + }; + + // The second BTC tx is only a refund in case we didn't claim. + // If we claimed, then the second BTC tx was an internal BTC server claim tx, which we're not tracking. + let btc_refund_tx_id = match lbtc_server_claim_tx_id.is_some() { + true => None, + false => btc_second_tx_id, + }; + + res.insert( + swap_id, + RecoveredOnchainDataChainReceive { + lbtc_server_lockup_tx_id, + lbtc_server_claim_tx_id, + btc_user_lockup_tx_id, + btc_refund_tx_id, + }, + ); + } + + Ok(res) + } +} + +/// Methods to simulate the immutable DB data available from real-time sync +// TODO Remove once real-time sync is integrated +pub(crate) mod immutable { + use std::collections::HashMap; + + use anyhow::{anyhow, ensure, Result}; + use boltz_client::{BtcSwapScript, LBtcSwapScript}; + use log::{error, info}; + use lwk_wollet::elements::Txid; + use lwk_wollet::History; + + use crate::prelude::*; + use crate::sdk::LiquidSdk; + + type BtcScript = lwk_wollet::bitcoin::ScriptBuf; + type LBtcScript = lwk_wollet::elements::Script; + + pub(crate) type SendSwapHistory = Vec; + pub(crate) type ReceiveSwapHistory = Vec; + + #[derive(Clone)] + pub(crate) struct HistoryTxId { + pub txid: Txid, + /// Confirmation height of txid + /// + /// -1 means unconfirmed with unconfirmed parents + /// 0 means unconfirmed with confirmed parents + pub height: i32, + } + impl HistoryTxId { + pub(crate) fn confirmed(&self) -> bool { + self.height > 0 + } + } + impl From for HistoryTxId { + fn from(value: History) -> Self { + Self::from(&value) + } + } + impl From<&History> for HistoryTxId { + fn from(value: &History) -> Self { + Self { + txid: value.txid, + height: value.height, + } + } + } + + #[derive(Clone)] + pub(crate) struct SendSwapImmutableData { + pub(crate) swap_id: String, + pub(crate) swap_script: LBtcSwapScript, + pub(crate) script: LBtcScript, + } + + #[derive(Clone)] + pub(crate) struct ReceiveSwapImmutableData { + pub(crate) swap_id: String, + pub(crate) swap_script: LBtcSwapScript, + pub(crate) script: LBtcScript, + } + + #[derive(Clone)] + pub(crate) struct SendChainSwapImmutableData { + swap_id: String, + lockup_swap_script: LBtcSwapScript, + lockup_script: LBtcScript, + claim_swap_script: BtcSwapScript, + pub(crate) claim_script: BtcScript, + } + + pub(crate) struct SendChainSwapHistory { + pub(crate) lbtc_lockup_script_history: Vec, + pub(crate) btc_claim_script_history: Vec, + pub(crate) btc_claim_script_txs: Vec, + } + + #[derive(Clone)] + pub(crate) struct ReceiveChainSwapImmutableData { + swap_id: String, + lockup_swap_script: BtcSwapScript, + pub(crate) lockup_script: BtcScript, + claim_swap_script: LBtcSwapScript, + claim_script: LBtcScript, + } + + pub(crate) struct ReceiveChainSwapHistory { + pub(crate) lbtc_claim_script_history: Vec, + pub(crate) btc_lockup_script_history: Vec, + pub(crate) btc_lockup_script_txs: Vec, + } + + /// Swap data received from the immutable DB + pub(crate) struct SwapsList { + pub(crate) send_swap_immutable_db_by_swap_id: HashMap, + pub(crate) receive_swap_immutable_db_by_swap_id_: HashMap, + pub(crate) send_chain_swap_immutable_db_by_swap_id: + HashMap, + pub(crate) receive_chain_swap_immutable_db_by_swap_id: + HashMap, + } + + impl SwapsList { + fn init( + send_swaps: Vec, + receive_swaps: Vec, + send_chain_swaps: Vec, + receive_chain_swaps: Vec, + ) -> Result { + let send_swap_immutable_db_by_swap_id: HashMap = + send_swaps + .iter() + .filter_map(|swap| match swap.get_swap_script() { + Ok(swap_script) => match &swap_script.funding_addrs { + Some(address) => Some(( + swap.id.clone(), + SendSwapImmutableData { + swap_id: swap.id.clone(), + swap_script: swap_script.clone(), + script: address.script_pubkey(), + }, + )), + None => { + error!("No funding address found for Send Swap {}", swap.id); + None + } + }, + Err(e) => { + error!("Failed to get swap script for Send Swap {}: {e}", swap.id); + None + } + }) + .collect(); + let send_swap_immutable_db_size = send_swap_immutable_db_by_swap_id.len(); + info!("Send Swap immutable DB: {send_swap_immutable_db_size} rows"); + + let receive_swap_immutable_db_by_swap_id_: HashMap = + receive_swaps + .iter() + .filter_map(|swap| { + let swap_id = &swap.id; + + let swap_script = swap + .get_swap_script() + .map_err(|e| { + error!("Failed to get swap script for Receive Swap {swap_id}: {e}") + }) + .ok()?; + + match &swap_script.funding_addrs { + Some(address) => Some(( + swap.id.clone(), + ReceiveSwapImmutableData { + swap_id: swap.id.clone(), + swap_script: swap_script.clone(), + script: address.script_pubkey(), + }, + )), + None => { + error!("No funding address found for Receive Swap {}", swap.id); + None + } + } + }) + .collect(); + let receive_swap_immutable_db_size = receive_swap_immutable_db_by_swap_id_.len(); + info!("Receive Swap immutable DB: {receive_swap_immutable_db_size} rows"); + + let send_chain_swap_immutable_db_by_swap_id: HashMap = + send_chain_swaps.iter().filter_map(|swap| { + let swap_id = &swap.id; + + let lockup_swap_script = swap.get_lockup_swap_script() + .map_err(|e| error!("Failed to get lockup swap script for swap {swap_id}: {e}")) + .map(|s| s.as_liquid_script().ok()) + .ok() + .flatten()?; + let claim_swap_script = swap.get_claim_swap_script() + .map_err(|e| error!("Failed to get claim swap script for swap {swap_id}: {e}")) + .map(|s| s.as_bitcoin_script().ok()).ok().flatten()?; + + let maybe_lockup_script = lockup_swap_script.clone().funding_addrs.map(|addr| addr.script_pubkey()); + let maybe_claim_script = claim_swap_script.clone().funding_addrs.map(|addr| addr.script_pubkey()); + + match (maybe_lockup_script, maybe_claim_script) { + (Some(lockup_script), Some(claim_script)) => { + Some((swap.id.clone(), SendChainSwapImmutableData { + swap_id: swap.id.clone(), + lockup_swap_script, + lockup_script, + claim_swap_script, + claim_script, + })) + } + (lockup_script, claim_script) => { + error!("Failed to get lockup or claim script for swap {swap_id}. Lockup script: {lockup_script:?}. Claim script: {claim_script:?}"); + None + } + } + }) + .collect(); + let send_chain_swap_immutable_db_size = send_chain_swap_immutable_db_by_swap_id.len(); + info!("Send Chain Swap immutable DB: {send_chain_swap_immutable_db_size} rows"); + + let receive_chain_swap_immutable_db_by_swap_id: HashMap = + receive_chain_swaps.iter().filter_map(|swap| { + let swap_id = &swap.id; + + let lockup_swap_script = swap.get_lockup_swap_script() + .map_err(|e| error!("Failed to get lockup swap script for swap {swap_id}: {e}")) + .map(|s| s.as_bitcoin_script().ok()).ok().flatten()?; + let claim_swap_script = swap.get_claim_swap_script() + .map_err(|e| error!("Failed to get claim swap script for swap {swap_id}: {e}")) + .map(|s| s.as_liquid_script().ok()).ok().flatten()?; + + let maybe_lockup_script = lockup_swap_script.clone().funding_addrs.map(|addr| addr.script_pubkey()); + let maybe_claim_script = claim_swap_script.clone().funding_addrs.map(|addr| addr.script_pubkey()); + + match (maybe_lockup_script, maybe_claim_script) { + (Some(lockup_script), Some(claim_script)) => { + Some((swap.id.clone(), ReceiveChainSwapImmutableData { + swap_id: swap.id.clone(), + lockup_swap_script, + lockup_script, + claim_swap_script, + claim_script, + })) + } + (lockup_script, claim_script) => { + error!("Failed to get lockup or claim script for swap {swap_id}. Lockup script: {lockup_script:?}. Claim script: {claim_script:?}"); + None + } + } + }) + .collect(); + let receive_chain_swap_immutable_db_size = + receive_chain_swap_immutable_db_by_swap_id.len(); + info!("Receive Chain Swap immutable DB: {receive_chain_swap_immutable_db_size} rows"); + + Ok(SwapsList { + send_swap_immutable_db_by_swap_id, + receive_swap_immutable_db_by_swap_id_, + send_chain_swap_immutable_db_by_swap_id, + receive_chain_swap_immutable_db_by_swap_id, + }) + } + + fn send_swaps_by_script(&self) -> HashMap { + self.send_swap_immutable_db_by_swap_id + .clone() + .into_values() + .map(|imm| (imm.script.clone(), imm)) + .collect() + } + + fn send_histories_by_swap_id( + &self, + lbtc_script_to_history_map: &HashMap>, + ) -> HashMap { + let send_swaps_by_script = self.send_swaps_by_script(); + + let mut data: HashMap = HashMap::new(); + lbtc_script_to_history_map + .iter() + .for_each(|(lbtc_script, lbtc_script_history)| { + if let Some(imm) = send_swaps_by_script.get(lbtc_script) { + data.insert(imm.swap_id.clone(), lbtc_script_history.clone()); + } + }); + data + } + + fn receive_swaps_by_script(&self) -> HashMap { + self.receive_swap_immutable_db_by_swap_id_ + .clone() + .into_values() + .map(|imm| (imm.script.clone(), imm)) + .collect() + } + + fn receive_histories_by_swap_id( + &self, + lbtc_script_to_history_map: &HashMap>, + ) -> HashMap { + let receive_swaps_by_script = self.receive_swaps_by_script(); + + let mut data: HashMap = HashMap::new(); + lbtc_script_to_history_map + .iter() + .for_each(|(lbtc_script, lbtc_script_history)| { + if let Some(imm) = receive_swaps_by_script.get(lbtc_script) { + data.insert(imm.swap_id.clone(), lbtc_script_history.clone()); + } + }); + data + } + + fn send_chain_swaps_by_lbtc_lockup_script( + &self, + ) -> HashMap { + self.send_chain_swap_immutable_db_by_swap_id + .clone() + .into_values() + .map(|imm| (imm.lockup_script.clone(), imm)) + .collect() + } + + fn send_chain_histories_by_swap_id( + &self, + lbtc_script_to_history_map: &HashMap>, + btc_script_to_history_map: &HashMap>, + btc_script_to_txs_map: &HashMap>, + ) -> HashMap { + let send_chain_swaps_by_lbtc_script = self.send_chain_swaps_by_lbtc_lockup_script(); + + let mut data: HashMap = HashMap::new(); + lbtc_script_to_history_map.iter().for_each( + |(lbtc_lockup_script, lbtc_script_history)| { + if let Some(imm) = send_chain_swaps_by_lbtc_script.get(lbtc_lockup_script) { + let btc_script_history = btc_script_to_history_map + .get(&imm.claim_script) + .cloned() + .unwrap_or_default(); + let btc_script_txs = btc_script_to_txs_map + .get(&imm.claim_script) + .cloned() + .unwrap_or_default(); + + data.insert( + imm.swap_id.clone(), + SendChainSwapHistory { + lbtc_lockup_script_history: lbtc_script_history.clone(), + btc_claim_script_history: btc_script_history, + btc_claim_script_txs: btc_script_txs, + }, + ); + } + }, + ); + data + } + + fn receive_chain_swaps_by_lbtc_claim_script( + &self, + ) -> HashMap { + self.receive_chain_swap_immutable_db_by_swap_id + .clone() + .into_values() + .map(|imm| (imm.claim_script.clone(), imm)) + .collect() + } + + fn receive_chain_histories_by_swap_id( + &self, + lbtc_script_to_history_map: &HashMap>, + btc_script_to_history_map: &HashMap>, + btc_script_to_txs_map: &HashMap>, + ) -> HashMap { + let receive_chain_swaps_by_lbtc_script = + self.receive_chain_swaps_by_lbtc_claim_script(); + + let mut data: HashMap = HashMap::new(); + lbtc_script_to_history_map + .iter() + .for_each(|(lbtc_script_pk, lbtc_script_history)| { + if let Some(imm) = receive_chain_swaps_by_lbtc_script.get(lbtc_script_pk) { + let btc_script_history = btc_script_to_history_map + .get(&imm.lockup_script) + .cloned() + .unwrap_or_default(); + let btc_script_txs = btc_script_to_txs_map + .get(&imm.lockup_script) + .cloned() + .unwrap_or_default(); + + data.insert( + imm.swap_id.clone(), + ReceiveChainSwapHistory { + lbtc_claim_script_history: lbtc_script_history.clone(), + btc_lockup_script_history: btc_script_history, + btc_lockup_script_txs: btc_script_txs, + }, + ); + } + }); + data + } + + fn get_all_swap_lbtc_scripts(&self) -> Vec { + let send_swap_scripts: Vec = self + .send_swap_immutable_db_by_swap_id + .clone() + .into_values() + .map(|imm| imm.script) + .collect(); + let receive_swap_scripts: Vec = self + .receive_swap_immutable_db_by_swap_id_ + .clone() + .into_values() + .map(|imm| imm.script) + .collect(); + let send_chain_swap_lbtc_lockup_scripts: Vec = self + .send_chain_swap_immutable_db_by_swap_id + .clone() + .into_values() + .map(|imm| imm.lockup_script) + .collect(); + let receive_chain_swap_lbtc_claim_scripts: Vec = self + .receive_chain_swap_immutable_db_by_swap_id + .clone() + .into_values() + .map(|imm| imm.claim_script) + .collect(); + + let mut swap_scripts = send_swap_scripts.clone(); + swap_scripts.extend(receive_swap_scripts.clone()); + swap_scripts.extend(send_chain_swap_lbtc_lockup_scripts.clone()); + swap_scripts.extend(receive_chain_swap_lbtc_claim_scripts.clone()); + swap_scripts + } + + fn get_all_swap_btc_scripts(&self) -> Vec { + let send_chain_swap_btc_claim_scripts: Vec = self + .send_chain_swap_immutable_db_by_swap_id + .clone() + .into_values() + .map(|imm| imm.claim_script) + .collect(); + let receive_chain_swap_btc_lockup_scripts: Vec = self + .receive_chain_swap_immutable_db_by_swap_id + .clone() + .into_values() + .map(|imm| imm.lockup_script) + .collect(); + + let mut swap_scripts = send_chain_swap_btc_claim_scripts.clone(); + swap_scripts.extend(receive_chain_swap_btc_lockup_scripts.clone()); + swap_scripts + } + } + + pub(crate) struct SwapsHistories { + pub(crate) send: HashMap, + pub(crate) receive: HashMap, + pub(crate) send_chain: HashMap, + pub(crate) receive_chain: HashMap, + } + + impl LiquidSdk { + pub(crate) async fn get_swaps_list(&self) -> Result { + let send_swaps = self.persister.list_send_swaps()?; + let receive_swaps = self.persister.list_receive_swaps()?; + let chain_swaps = self.persister.list_chain_swaps()?; + let (send_chain_swaps, receive_chain_swaps): (Vec, Vec) = + chain_swaps + .into_iter() + .partition(|swap| swap.direction == Direction::Outgoing); + + SwapsList::init( + send_swaps, + receive_swaps, + send_chain_swaps, + receive_chain_swaps, + ) + } + + /// For a given [SwapList], this fetches the script histories from the chain services + pub(crate) async fn fetch_swaps_histories( + &self, + swaps_list: &SwapsList, + ) -> Result { + let swap_lbtc_scripts = swaps_list.get_all_swap_lbtc_scripts(); + + let lbtc_script_histories = self + .liquid_chain_service + .lock() + .await + .get_scripts_history(&swap_lbtc_scripts.iter().collect::>()) + .await?; + let lbtc_swap_scripts_len = swap_lbtc_scripts.len(); + let lbtc_script_histories_len = lbtc_script_histories.len(); + ensure!( + lbtc_swap_scripts_len == lbtc_script_histories_len, + anyhow!("Got {lbtc_script_histories_len} L-BTC script histories, expected {lbtc_swap_scripts_len}") + ); + let lbtc_script_to_history_map: HashMap> = + swap_lbtc_scripts + .into_iter() + .zip(lbtc_script_histories.into_iter()) + .map(|(k, v)| (k, v.into_iter().map(HistoryTxId::from).collect())) + .collect(); + + let swap_btc_scripts = swaps_list.get_all_swap_btc_scripts(); + let btc_script_histories = self + .bitcoin_chain_service + .lock() + .await + .get_scripts_history( + &swap_btc_scripts + .iter() + .map(|x| x.as_script()) + .collect::>(), + )?; + let btx_script_tx_ids: Vec = btc_script_histories + .iter() + .flatten() + .map(|h| h.txid.to_raw_hash()) + .map(lwk_wollet::bitcoin::Txid::from_raw_hash) + .collect::>(); + + let btc_swap_scripts_len = swap_btc_scripts.len(); + let btc_script_histories_len = btc_script_histories.len(); + ensure!( + btc_swap_scripts_len == btc_script_histories_len, + anyhow!("Got {btc_script_histories_len} BTC script histories, expected {btc_swap_scripts_len}") + ); + let btc_script_to_history_map: HashMap> = swap_btc_scripts + .clone() + .into_iter() + .zip(btc_script_histories.iter()) + .map(|(k, v)| (k, v.iter().map(HistoryTxId::from).collect())) + .collect(); + + let btc_script_txs = self + .bitcoin_chain_service + .lock() + .await + .get_transactions(&btx_script_tx_ids)?; + let btc_script_to_txs_map: HashMap> = + swap_btc_scripts + .into_iter() + .zip(btc_script_histories.iter()) + .map(|(script, history)| { + let relevant_tx_ids: Vec = history.iter().map(|h| h.txid).collect(); + let relevant_txs: Vec = btc_script_txs + .iter() + .filter(|&tx| relevant_tx_ids.contains(&tx.txid().to_raw_hash().into())) + .cloned() + .collect(); + + (script, relevant_txs) + }) + .collect(); + + Ok(SwapsHistories { + send: swaps_list.send_histories_by_swap_id(&lbtc_script_to_history_map), + receive: swaps_list.receive_histories_by_swap_id(&lbtc_script_to_history_map), + send_chain: swaps_list.send_chain_histories_by_swap_id( + &lbtc_script_to_history_map, + &btc_script_to_history_map, + &btc_script_to_txs_map, + ), + receive_chain: swaps_list.receive_chain_histories_by_swap_id( + &lbtc_script_to_history_map, + &btc_script_to_history_map, + &btc_script_to_txs_map, + ), + }) + } + } +} diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index 98c5bd8..c220f82 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -14,10 +14,10 @@ use futures_util::stream::select_all; use futures_util::StreamExt; use log::{debug, error, info}; use lwk_wollet::bitcoin::hex::DisplayHex; -use lwk_wollet::elements::AssetId; +use lwk_wollet::elements::{AssetId, Txid}; use lwk_wollet::hashes::{sha256, Hash}; use lwk_wollet::secp256k1::ThirtyTwoByteHash; -use lwk_wollet::{elements, ElementsNetwork}; +use lwk_wollet::{elements, ElementsNetwork, WalletTx}; use sdk_common::bitcoin::secp256k1::Secp256k1; use sdk_common::bitcoin::util::bip32::ChildNumber; use sdk_common::liquid::LiquidAddressData; @@ -1871,7 +1871,15 @@ impl LiquidSdk { let pending_chain_swaps_by_refund_tx_id = self.persister.list_pending_chain_swaps_by_refund_tx_id()?; - for tx in self.onchain_wallet.transactions().await? { + let tx_map: HashMap = self + .onchain_wallet + .transactions() + .await? + .iter() + .map(|tx| (tx.txid, tx.clone())) + .collect(); + + for tx in tx_map.values() { let tx_id = tx.txid.to_string(); let is_tx_confirmed = tx.height.is_some(); let amount_sat = tx.balance.values().sum::(); diff --git a/lib/core/src/test_utils/chain.rs b/lib/core/src/test_utils/chain.rs index 6bcac2c..e25c326 100644 --- a/lib/core/src/test_utils/chain.rs +++ b/lib/core/src/test_utils/chain.rs @@ -2,8 +2,9 @@ use anyhow::Result; use async_trait::async_trait; +use boltz_client::elements::Script; use lwk_wollet::elements::{BlockHash, Txid}; -use lwk_wollet::{bitcoin::consensus::deserialize, elements::hex::FromHex}; +use lwk_wollet::{bitcoin::consensus::deserialize, elements::hex::FromHex, History}; use crate::{ chain::{bitcoin::BitcoinChainService, liquid::LiquidChainService}, @@ -82,6 +83,10 @@ impl LiquidChainService for MockLiquidChainService { Ok(self.history.clone().into_iter().map(Into::into).collect()) } + async fn get_scripts_history(&self, _scripts: &[&Script]) -> Result>> { + unimplemented!() + } + async fn verify_tx( &self, _address: &boltz_client::ElementsAddress, @@ -143,6 +148,13 @@ impl BitcoinChainService for MockBitcoinChainService { Ok(self.history.clone().into_iter().map(Into::into).collect()) } + fn get_scripts_history( + &self, + _scripts: &[&boltz_client::bitcoin::Script], + ) -> Result>> { + unimplemented!() + } + fn script_get_balance( &self, _script: &boltz_client::bitcoin::Script,