From 282978fddfeb4e3e172c00dd61b93a23f76984e7 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Tue, 28 Jan 2025 20:29:59 +0200 Subject: [PATCH 1/7] remove lock for liquid chain servivce --- lib/core/src/chain/liquid.rs | 14 ++++-- lib/core/src/chain_swap.rs | 27 ++++------ lib/core/src/receive_swap.rs | 13 ++--- lib/core/src/recover/recoverer.rs | 24 ++++++--- lib/core/src/sdk.rs | 65 ++++++++++--------------- lib/core/src/send_swap.rs | 20 +++----- lib/core/src/test_utils/chain.rs | 18 ++++--- lib/core/src/test_utils/chain_swap.rs | 2 +- lib/core/src/test_utils/receive_swap.rs | 4 +- lib/core/src/test_utils/recover.rs | 2 +- lib/core/src/test_utils/sdk.rs | 4 +- lib/core/src/test_utils/send_swap.rs | 3 +- 12 files changed, 92 insertions(+), 104 deletions(-) diff --git a/lib/core/src/chain/liquid.rs b/lib/core/src/chain/liquid.rs index f2d1c78..2fa3823 100644 --- a/lib/core/src/chain/liquid.rs +++ b/lib/core/src/chain/liquid.rs @@ -1,3 +1,4 @@ +use std::sync::Mutex; use std::time::Duration; use anyhow::{anyhow, Result}; @@ -17,7 +18,7 @@ use crate::{model::Config, utils}; #[async_trait] pub trait LiquidChainService: Send + Sync { /// Get the blockchain latest block - async fn tip(&mut self) -> Result; + async fn tip(&self) -> Result; /// Broadcast a transaction async fn broadcast(&self, tx: &Transaction) -> Result; @@ -58,20 +59,25 @@ pub trait LiquidChainService: Send + Sync { pub(crate) struct HybridLiquidChainService { electrum_client: ElectrumClient, + tip_client: Mutex, } impl HybridLiquidChainService { pub(crate) fn new(config: Config) -> Result { let electrum_url = ElectrumUrl::new(&config.liquid_electrum_url, true, true)?; let electrum_client = ElectrumClient::new(&electrum_url)?; - Ok(Self { electrum_client }) + let tip_client = ElectrumClient::new(&electrum_url)?; + Ok(Self { + electrum_client, + tip_client: Mutex::new(tip_client), + }) } } #[async_trait] impl LiquidChainService for HybridLiquidChainService { - async fn tip(&mut self) -> Result { - Ok(self.electrum_client.tip()?.height) + async fn tip(&self) -> Result { + Ok(self.tip_client.lock().unwrap().tip()?.height) } async fn broadcast(&self, tx: &Transaction) -> Result { diff --git a/lib/core/src/chain_swap.rs b/lib/core/src/chain_swap.rs index 1e95b4e..0e8ee3a 100644 --- a/lib/core/src/chain_swap.rs +++ b/lib/core/src/chain_swap.rs @@ -41,7 +41,7 @@ pub(crate) struct ChainSwapHandler { onchain_wallet: Arc, persister: Arc, swapper: Arc, - liquid_chain_service: Arc>, + liquid_chain_service: Arc, bitcoin_chain_service: Arc>, subscription_notifier: broadcast::Sender, } @@ -70,7 +70,7 @@ impl ChainSwapHandler { onchain_wallet: Arc, persister: Arc, swapper: Arc, - liquid_chain_service: Arc>, + liquid_chain_service: Arc, bitcoin_chain_service: Arc>, ) -> Result { let (subscription_notifier, _) = broadcast::channel::(30); @@ -759,9 +759,7 @@ impl ChainSwapHandler { .await?; let lockup_tx_id = self - .liquid_chain_service - .lock() - .await + .liquid_chain_service .broadcast(&lockup_tx) .await? .to_string(); @@ -843,9 +841,8 @@ impl ChainSwapHandler { Ok(_) => { let broadcast_res = match claim_tx { // We attempt broadcasting via chain service, then fallback to Boltz - SdkTransaction::Liquid(tx) => { - let liquid_chain_service = self.liquid_chain_service.lock().await; - liquid_chain_service + SdkTransaction::Liquid(tx) => { + self.liquid_chain_service .broadcast(&tx) .await .map(|tx_id| tx_id.to_hex()) @@ -1043,13 +1040,13 @@ impl ChainSwapHandler { }); }; - let liquid_chain_service = self.liquid_chain_service.lock().await; + let script_pk = swap_script .to_address(self.config.network.into()) .map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))? .to_unconfidential() .script_pubkey(); - let utxos = liquid_chain_service.get_script_utxos(&script_pk).await?; + let utxos = self.liquid_chain_service.get_script_utxos(&script_pk).await?; let refund_address = self.onchain_wallet.next_unused_address().await?.to_string(); let SdkTransaction::Liquid(refund_tx) = self.swapper.create_refund_tx( @@ -1067,7 +1064,7 @@ impl ChainSwapHandler { ), }); }; - let refund_tx_id = liquid_chain_service + let refund_tx_id = self.liquid_chain_service .broadcast(&refund_tx) .await? .to_string(); @@ -1264,9 +1261,7 @@ impl ChainSwapHandler { .to_address(self.config.network.into()) .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?; let tx = self - .liquid_chain_service - .lock() - .await + .liquid_chain_service .verify_tx( &address, &swap_update_tx.id, @@ -1447,9 +1442,7 @@ impl ChainSwapHandler { .to_unconfidential(); let script = Script::from_hex(hex::encode(address.script_pubkey().as_bytes()).as_str()) .map_err(|e| anyhow!("Failed to get script from address {e:?}"))?; - self.liquid_chain_service - .lock() - .await + self.liquid_chain_service .get_script_history_with_retry(&script, 5) .await } diff --git a/lib/core/src/receive_swap.rs b/lib/core/src/receive_swap.rs index 46d0e27..4f41ace 100644 --- a/lib/core/src/receive_swap.rs +++ b/lib/core/src/receive_swap.rs @@ -9,7 +9,7 @@ use lwk_wollet::elements::secp256k1_zkp::Secp256k1; use lwk_wollet::elements::{Transaction, Txid}; use lwk_wollet::hashes::hex::DisplayHex; use lwk_wollet::secp256k1::SecretKey; -use tokio::sync::{broadcast, Mutex}; +use tokio::sync::broadcast; use crate::chain::liquid::LiquidChainService; use crate::model::{BlockListener, PaymentState::*}; @@ -32,7 +32,7 @@ pub(crate) struct ReceiveSwapHandler { persister: Arc, swapper: Arc, subscription_notifier: broadcast::Sender, - liquid_chain_service: Arc>, + liquid_chain_service: Arc, } #[async_trait] @@ -52,7 +52,7 @@ impl ReceiveSwapHandler { onchain_wallet: Arc, persister: Arc, swapper: Arc, - liquid_chain_service: Arc>, + liquid_chain_service: Arc, ) -> Self { let (subscription_notifier, _) = broadcast::channel::(30); Self { @@ -354,8 +354,7 @@ impl ReceiveSwapHandler { match self.persister.set_receive_swap_claim_tx_id(swap_id, &tx_id) { Ok(_) => { // We attempt broadcasting via chain service, then fallback to Boltz - let liquid_chain_service = self.liquid_chain_service.lock().await; - let broadcast_res = liquid_chain_service + let broadcast_res = self.liquid_chain_service .broadcast(&claim_tx) .await .map(|tx_id| tx_id.to_hex()) @@ -440,8 +439,6 @@ impl ReceiveSwapHandler { let swap_id = &receive_swap.id; let tx_hex = self .liquid_chain_service - .lock() - .await .get_transaction_hex(&Txid::from_str(&tx_id)?) .await? .ok_or(anyhow!("Lockup tx not found for Receive swap {swap_id}"))? @@ -516,8 +513,6 @@ impl ReceiveSwapHandler { .to_address(self.config.network.into()) .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?; self.liquid_chain_service - .lock() - .await .verify_tx(&address, tx_id, tx_hex, verify_confirmation) .await } diff --git a/lib/core/src/recover/recoverer.rs b/lib/core/src/recover/recoverer.rs index 4402418..c1a86c3 100644 --- a/lib/core/src/recover/recoverer.rs +++ b/lib/core/src/recover/recoverer.rs @@ -27,7 +27,7 @@ pub(crate) struct Recoverer { master_blinding_key: MasterBlindingKey, swapper: Arc, onchain_wallet: Arc, - liquid_chain_service: Arc>, + liquid_chain_service: Arc, bitcoin_chain_service: Arc>, } @@ -36,7 +36,7 @@ impl Recoverer { master_blinding_key: Vec, swapper: Arc, onchain_wallet: Arc, - liquid_chain_service: Arc>, + liquid_chain_service: Arc, bitcoin_chain_service: Arc>, ) -> Result { Ok(Self { @@ -79,8 +79,6 @@ impl Recoverer { let claim_tx_ids: Vec = failed_cooperative.values().cloned().collect(); let claim_txs = self .liquid_chain_service - .lock() - .await .get_transactions(claim_tx_ids.as_slice()) .await .map_err(|e| anyhow!("Failed to fetch claim txs from recovery: {e}"))?; @@ -166,6 +164,20 @@ impl Recoverer { /// - `tx_map`: all known onchain txs of this wallet at this time, essentially our own LWK cache. /// - `swaps`: immutable data of the swaps for which we want to recover onchain data. pub(crate) async fn recover_from_onchain(&self, swaps: &mut [Swap]) -> Result<()> { + //println!("swaps: {:?}", swaps.iter().map(|s|s.).collect::>()); + for swap in swaps.iter() { + match swap { + Swap::Send(send_swap) => { + println!("send_swap: {:?}", send_swap.id); + } + Swap::Receive(receive_swap) => { + println!("receive_swap: {:?}", receive_swap.id); + } + Swap::Chain(chain_swap) => { + println!("chain_swap: {:?}", chain_swap.id); + } + } + } let tx_map = TxMap::from_raw_tx_map(self.onchain_wallet.transactions_by_tx_id().await?); let swaps_list = swaps.to_vec().try_into()?; @@ -203,7 +215,7 @@ impl Recoverer { )?; let bitcoin_tip = self.bitcoin_chain_service.lock().await.tip()?; - let liquid_tip = self.liquid_chain_service.lock().await.tip().await?; + let liquid_tip = self.liquid_chain_service.tip().await?; for swap in swaps.iter_mut() { let swap_id = &swap.id(); @@ -355,8 +367,6 @@ impl Recoverer { let swap_lbtc_scripts = swaps_list.get_swap_lbtc_scripts(); let lbtc_script_histories = self .liquid_chain_service - .lock() - .await .get_scripts_history(&swap_lbtc_scripts.iter().collect::>()) .await?; let lbtc_swap_scripts_len = swap_lbtc_scripts.len(); diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index 5749d0b..52401c3 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -75,7 +75,7 @@ pub struct LiquidSdk { pub(crate) status_stream: Arc, pub(crate) swapper: Arc, pub(crate) recoverer: Arc, - pub(crate) liquid_chain_service: Arc>, + pub(crate) liquid_chain_service: Arc, pub(crate) bitcoin_chain_service: Arc>, pub(crate) fiat_api: Arc, pub(crate) is_started: RwLock, @@ -176,8 +176,7 @@ impl LiquidSdk { persister.init()?; persister.replace_asset_metadata(config.asset_metadata.clone())?; - let liquid_chain_service = - Arc::new(Mutex::new(HybridLiquidChainService::new(config.clone())?)); + let liquid_chain_service = Arc::new(HybridLiquidChainService::new(config.clone())?); let bitcoin_chain_service = Arc::new(Mutex::new(HybridBitcoinChainService::new(config.clone())?)); @@ -354,7 +353,7 @@ impl LiquidSdk { tokio::select! { _ = interval.tick() => { // Get the Liquid tip and process a new block - let liquid_tip_res = cloned.liquid_chain_service.lock().await.tip().await; + let liquid_tip_res = cloned.liquid_chain_service.tip().await; let is_new_liquid_block = match &liquid_tip_res { Ok(height) => { debug!("Got Liquid tip: {height}"); @@ -1348,8 +1347,7 @@ impl LiquidSdk { "Built onchain L-BTC tx with receiver_amount_sat = {receiver_amount_sat}, fees_sat = {fees_sat} and txid = {tx_id}" ); - let liquid_chain_service = self.liquid_chain_service.lock().await; - let tx_id = liquid_chain_service.broadcast(&tx).await?.to_string(); + let tx_id = self.liquid_chain_service.broadcast(&tx).await?.to_string(); // We insert a pseudo-tx in case LWK fails to pick up the new mempool tx for a while // This makes the tx known to the SDK (get_info, list_payments) instantly @@ -2571,7 +2569,7 @@ impl LiquidSdk { match partial_sync { false => { let bitcoin_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32; - let liquid_height = self.liquid_chain_service.lock().await.tip().await?; + let liquid_height = self.liquid_chain_service.tip().await?; let final_swap_states = [PaymentState::Complete, PaymentState::Failed]; let send_swaps = self @@ -3521,7 +3519,7 @@ mod tests { create_persister!(persister); let swapper = Arc::new(MockSwapper::default()); let status_stream = Arc::new(MockStatusStream::new()); - let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); + let liquid_chain_service = Arc::new(MockLiquidChainService::new()); let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); let sdk = Arc::new(new_liquid_sdk_with_chain_services( @@ -3569,15 +3567,12 @@ mod tests { let height = (serde_json::to_string(&status).unwrap() == serde_json::to_string(&RevSwapStates::TransactionConfirmed).unwrap()) as i32; - liquid_chain_service - .lock() - .await - .set_history(vec![MockHistory { - txid: mock_tx_id, - height, - block_hash: None, - block_timestamp: None, - }]); + liquid_chain_service.set_history(vec![MockHistory { + txid: mock_tx_id, + height, + block_hash: None, + block_timestamp: None, + }]); let persisted_swap = trigger_swap_update!( "receive", @@ -3606,15 +3601,12 @@ mod tests { let height = (serde_json::to_string(&status).unwrap() == serde_json::to_string(&RevSwapStates::TransactionConfirmed).unwrap()) as i32; - liquid_chain_service - .lock() - .await - .set_history(vec![MockHistory { - txid: mock_tx_id, - height, - block_hash: None, - block_timestamp: None, - }]); + liquid_chain_service.set_history(vec![MockHistory { + txid: mock_tx_id, + height, + block_hash: None, + block_timestamp: None, + }]); let persisted_swap = trigger_swap_update!( "receive", @@ -3699,7 +3691,7 @@ mod tests { create_persister!(persister); let swapper = Arc::new(MockSwapper::default()); let status_stream = Arc::new(MockStatusStream::new()); - let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); + let liquid_chain_service = Arc::new(MockLiquidChainService::new()); let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); let sdk = Arc::new(new_liquid_sdk_with_chain_services( @@ -3789,15 +3781,12 @@ mod tests { }]); } Direction::Outgoing => { - liquid_chain_service - .lock() - .await - .set_history(vec![MockHistory { - txid: Txid::from_str(user_lockup_tx_id).unwrap(), - height: 0, - block_hash: None, - block_timestamp: None, - }]); + liquid_chain_service.set_history(vec![MockHistory { + txid: Txid::from_str(user_lockup_tx_id).unwrap(), + height: 0, + block_hash: None, + block_timestamp: None, + }]); } } } @@ -3944,7 +3933,7 @@ mod tests { create_persister!(persister); let swapper = Arc::new(MockSwapper::new()); let status_stream = Arc::new(MockStatusStream::new()); - let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); + let liquid_chain_service = Arc::new(MockLiquidChainService::new()); let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); let sdk = Arc::new(new_liquid_sdk_with_chain_services( @@ -4008,7 +3997,7 @@ mod tests { create_persister!(persister); let swapper = Arc::new(MockSwapper::new()); let status_stream = Arc::new(MockStatusStream::new()); - let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); + let liquid_chain_service = Arc::new(MockLiquidChainService::new()); let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); let sdk = Arc::new(new_liquid_sdk_with_chain_services( diff --git a/lib/core/src/send_swap.rs b/lib/core/src/send_swap.rs index 9b44ebf..da68b66 100644 --- a/lib/core/src/send_swap.rs +++ b/lib/core/src/send_swap.rs @@ -10,7 +10,7 @@ use log::{debug, error, info, warn}; use lwk_wollet::elements::{LockTime, Transaction}; use lwk_wollet::hashes::{sha256, Hash}; use sdk_common::prelude::{AesSuccessActionDataResult, SuccessAction, SuccessActionProcessed}; -use tokio::sync::{broadcast, Mutex}; +use tokio::sync::broadcast; use crate::chain::liquid::LiquidChainService; use crate::model::{ @@ -34,7 +34,7 @@ pub(crate) struct SendSwapHandler { onchain_wallet: Arc, persister: Arc, swapper: Arc, - chain_service: Arc>, + chain_service: Arc, subscription_notifier: broadcast::Sender, } @@ -55,7 +55,7 @@ impl SendSwapHandler { onchain_wallet: Arc, persister: Arc, swapper: Arc, - chain_service: Arc>, + chain_service: Arc, ) -> Self { let (subscription_notifier, _) = broadcast::channel::(30); Self { @@ -208,7 +208,7 @@ impl SendSwapHandler { info!("Broadcasting lockup tx {lockup_tx_id} for Send swap {swap_id}",); - let broadcast_result = self.chain_service.lock().await.broadcast(&lockup_tx).await; + let broadcast_result = self.chain_service.broadcast(&lockup_tx).await; if let Err(err) = broadcast_result { debug!("Could not broadcast lockup tx for Send Swap {swap_id}: {err:?}"); @@ -384,8 +384,6 @@ impl SendSwapHandler { // Get tx history of the swap script (lockup address) let history: Vec<_> = self .chain_service - .lock() - .await .get_script_history(&swap_script_pk) .await?; @@ -405,8 +403,6 @@ impl SendSwapHandler { let claim_tx_id = claim_tx_entry.txid; let claim_tx = self .chain_service - .lock() - .await .get_transactions(&[claim_tx_id]) .await .map_err(|e| anyhow!("Failed to fetch claim txs {claim_tx_id:?}: {e}"))? @@ -445,13 +441,12 @@ impl SendSwapHandler { let swap_script = swap.get_swap_script()?; let refund_address = self.onchain_wallet.next_unused_address().await?.to_string(); - let liquid_chain_service = self.chain_service.lock().await; let script_pk = swap_script .to_address(self.config.network.into()) .map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))? .to_unconfidential() .script_pubkey(); - let utxos = liquid_chain_service.get_script_utxos(&script_pk).await?; + let utxos = self.chain_service.get_script_utxos(&script_pk).await?; let SdkTransaction::Liquid(refund_tx) = self.swapper.create_refund_tx( Swap::Send(swap.clone()), &refund_address, @@ -467,10 +462,7 @@ impl SendSwapHandler { ), }); }; - let refund_tx_id = liquid_chain_service - .broadcast(&refund_tx) - .await? - .to_string(); + let refund_tx_id = self.chain_service.broadcast(&refund_tx).await?.to_string(); info!( "Successfully broadcast refund for Send Swap {}, is_cooperative: {is_cooperative}", diff --git a/lib/core/src/test_utils/chain.rs b/lib/core/src/test_utils/chain.rs index bd75a8d..408d0c1 100644 --- a/lib/core/src/test_utils/chain.rs +++ b/lib/core/src/test_utils/chain.rs @@ -1,5 +1,7 @@ #![cfg(test)] +use std::sync::Mutex; + use anyhow::Result; use async_trait::async_trait; use boltz_client::{ @@ -47,7 +49,7 @@ impl From for lwk_wollet::History { #[derive(Default)] pub(crate) struct MockLiquidChainService { - history: Vec, + history: Mutex>, } impl MockLiquidChainService { @@ -55,15 +57,19 @@ impl MockLiquidChainService { MockLiquidChainService::default() } - pub(crate) fn set_history(&mut self, history: Vec) -> &mut Self { - self.history = history; + pub(crate) fn set_history(&self, history: Vec) -> &Self { + *self.history.lock().unwrap() = history; self } + + pub(crate) fn get_history(&self) -> Vec { + self.history.lock().unwrap().clone() + } } #[async_trait] impl LiquidChainService for MockLiquidChainService { - async fn tip(&mut self) -> Result { + async fn tip(&self) -> Result { Ok(0) } @@ -92,7 +98,7 @@ impl LiquidChainService for MockLiquidChainService { &self, _scripts: &ElementsScript, ) -> Result> { - Ok(self.history.clone().into_iter().map(Into::into).collect()) + Ok(self.get_history().into_iter().map(Into::into).collect()) } async fn get_script_history_with_retry( @@ -100,7 +106,7 @@ impl LiquidChainService for MockLiquidChainService { _script: &ElementsScript, _retries: u64, ) -> Result> { - Ok(self.history.clone().into_iter().map(Into::into).collect()) + Ok(self.get_history().into_iter().map(Into::into).collect()) } async fn get_scripts_history(&self, _scripts: &[&ElementsScript]) -> Result>> { diff --git a/lib/core/src/test_utils/chain_swap.rs b/lib/core/src/test_utils/chain_swap.rs index 3735b78..787bef0 100644 --- a/lib/core/src/test_utils/chain_swap.rs +++ b/lib/core/src/test_utils/chain_swap.rs @@ -34,7 +34,7 @@ pub(crate) fn new_chain_swap_handler(persister: Arc) -> Result> = Arc::new(Box::new(MockSigner::new()?)); let onchain_wallet = Arc::new(MockWallet::new(signer)?); let swapper = Arc::new(BoltzSwapper::new(config.clone(), None)); - let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); + let liquid_chain_service = Arc::new(MockLiquidChainService::new()); let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); ChainSwapHandler::new( diff --git a/lib/core/src/test_utils/receive_swap.rs b/lib/core/src/test_utils/receive_swap.rs index 4245132..becaa1e 100644 --- a/lib/core/src/test_utils/receive_swap.rs +++ b/lib/core/src/test_utils/receive_swap.rs @@ -3,8 +3,6 @@ use anyhow::Result; use std::sync::Arc; -use tokio::sync::Mutex; - use crate::{ model::{Config, Signer}, persist::Persister, @@ -22,7 +20,7 @@ pub(crate) fn new_receive_swap_handler(persister: Arc) -> Result> = Arc::new(Box::new(MockSigner::new()?)); let onchain_wallet = Arc::new(MockWallet::new(signer)?); let swapper = Arc::new(MockSwapper::default()); - let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); + let liquid_chain_service = Arc::new(MockLiquidChainService::new()); Ok(ReceiveSwapHandler::new( config, diff --git a/lib/core/src/test_utils/recover.rs b/lib/core/src/test_utils/recover.rs index b963f27..3a033b1 100644 --- a/lib/core/src/test_utils/recover.rs +++ b/lib/core/src/test_utils/recover.rs @@ -14,7 +14,7 @@ pub(crate) fn new_recoverer( swapper: Arc, onchain_wallet: Arc, ) -> Result { - let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); + let liquid_chain_service = Arc::new(MockLiquidChainService::new()); let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); Recoverer::new( diff --git a/lib/core/src/test_utils/sdk.rs b/lib/core/src/test_utils/sdk.rs index c8305e3..7ac8d76 100644 --- a/lib/core/src/test_utils/sdk.rs +++ b/lib/core/src/test_utils/sdk.rs @@ -31,7 +31,7 @@ pub(crate) fn new_liquid_sdk( swapper: Arc, status_stream: Arc, ) -> Result { - let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); + let liquid_chain_service = Arc::new(MockLiquidChainService::new()); let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); new_liquid_sdk_with_chain_services( @@ -48,7 +48,7 @@ pub(crate) fn new_liquid_sdk_with_chain_services( persister: Arc, swapper: Arc, status_stream: Arc, - liquid_chain_service: Arc>, + liquid_chain_service: Arc, bitcoin_chain_service: Arc>, onchain_fee_rate_leeway_sat_per_vbyte: Option, ) -> Result { diff --git a/lib/core/src/test_utils/send_swap.rs b/lib/core/src/test_utils/send_swap.rs index 8cc7adc..5141c63 100644 --- a/lib/core/src/test_utils/send_swap.rs +++ b/lib/core/src/test_utils/send_swap.rs @@ -8,7 +8,6 @@ use crate::{ send_swap::SendSwapHandler, }; use anyhow::Result; -use tokio::sync::Mutex; use super::{ chain::MockLiquidChainService, @@ -21,7 +20,7 @@ pub(crate) fn new_send_swap_handler(persister: Arc) -> Result> = Arc::new(Box::new(MockSigner::new()?)); let onchain_wallet = Arc::new(MockWallet::new(signer)?); let swapper = Arc::new(MockSwapper::default()); - let chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); + let chain_service = Arc::new(MockLiquidChainService::new()); Ok(SendSwapHandler::new( config, From 2cffea07b815bdd78d4a8a9eaddd3addc48a01a8 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Tue, 28 Jan 2025 21:01:19 +0200 Subject: [PATCH 2/7] Add logs --- lib/core/src/sdk.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index 52401c3..9a13083 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -352,8 +352,13 @@ impl LiquidSdk { loop { tokio::select! { _ = interval.tick() => { + info!("Track blocks loop ticked"); // Get the Liquid tip and process a new block + let t0 = Instant::now(); let liquid_tip_res = cloned.liquid_chain_service.tip().await; + let duration_ms = Instant::now().duration_since(t0).as_millis(); + info!("Fetched liquid tip at ({duration_ms} ms)"); + let is_new_liquid_block = match &liquid_tip_res { Ok(height) => { debug!("Got Liquid tip: {height}"); @@ -367,7 +372,10 @@ impl LiquidSdk { } }; // Get the Bitcoin tip and process a new block + let t0 = Instant::now(); let bitcoin_tip_res = cloned.bitcoin_chain_service.lock().await.tip().map(|tip| tip.height as u32); + let duration_ms = Instant::now().duration_since(t0).as_millis(); + info!("Fetched bitcoin tip at ({duration_ms} ms)"); let is_new_bitcoin_block = match &bitcoin_tip_res { Ok(height) => { debug!("Got Bitcoin tip: {height}"); From 9c68b9e8a331a7b11203fd15e04a427b15554766 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Tue, 28 Jan 2025 22:00:50 +0200 Subject: [PATCH 3/7] remove lock for bitcoin chain service --- lib/core/src/chain/bitcoin.rs | 16 +++--- lib/core/src/chain_swap.rs | 34 ++++------- lib/core/src/recover/recoverer.rs | 20 ++++--- lib/core/src/sdk.rs | 81 +++++++++++---------------- lib/core/src/test_utils/chain.rs | 48 ++++++++++------ lib/core/src/test_utils/chain_swap.rs | 4 +- lib/core/src/test_utils/recover.rs | 3 +- lib/core/src/test_utils/sdk.rs | 6 +- 8 files changed, 100 insertions(+), 112 deletions(-) diff --git a/lib/core/src/chain/bitcoin.rs b/lib/core/src/chain/bitcoin.rs index 5d11d92..4c68a2e 100644 --- a/lib/core/src/chain/bitcoin.rs +++ b/lib/core/src/chain/bitcoin.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{sync::Mutex, time::Duration}; use anyhow::{anyhow, Result}; use async_trait::async_trait; @@ -24,7 +24,7 @@ use crate::{ #[async_trait] pub trait BitcoinChainService: Send + Sync { /// Get the blockchain latest block - fn tip(&mut self) -> Result; + fn tip(&self) -> Result; /// Broadcast a transaction fn broadcast(&self, tx: &Transaction) -> Result; @@ -76,7 +76,7 @@ pub trait BitcoinChainService: Send + Sync { pub(crate) struct HybridBitcoinChainService { client: Client, - tip: HeaderNotification, + tip: Mutex, config: Config, } impl HybridBitcoinChainService { @@ -93,7 +93,7 @@ impl HybridBitcoinChainService { Ok(Self { client, - tip, + tip: Mutex::new(tip), config, }) } @@ -101,7 +101,7 @@ impl HybridBitcoinChainService { #[async_trait] impl BitcoinChainService for HybridBitcoinChainService { - fn tip(&mut self) -> Result { + fn tip(&self) -> Result { let mut maybe_popped_header = None; while let Some(header) = self.client.block_headers_pop_raw()? { maybe_popped_header = Some(header) @@ -110,7 +110,7 @@ impl BitcoinChainService for HybridBitcoinChainService { match maybe_popped_header { Some(popped_header) => { let tip: HeaderNotification = popped_header.try_into()?; - self.tip = tip; + *self.tip.lock().unwrap() = tip; } None => { // https://github.com/bitcoindevkit/rust-electrum-client/issues/124 @@ -119,12 +119,12 @@ impl BitcoinChainService for HybridBitcoinChainService { // successful retry will prevent us knowing about the reconnect. if let Ok(header) = self.client.block_headers_subscribe_raw() { let tip: HeaderNotification = header.try_into()?; - self.tip = tip; + *self.tip.lock().unwrap() = tip; } } } - Ok(self.tip.clone()) + Ok(self.tip.lock().unwrap().clone()) } fn broadcast(&self, tx: &Transaction) -> Result { diff --git a/lib/core/src/chain_swap.rs b/lib/core/src/chain_swap.rs index 0e8ee3a..db512aa 100644 --- a/lib/core/src/chain_swap.rs +++ b/lib/core/src/chain_swap.rs @@ -14,7 +14,7 @@ use lwk_wollet::{ hashes::hex::DisplayHex, History, }; -use tokio::sync::{broadcast, Mutex}; +use tokio::sync::broadcast; use crate::model::{BlockListener, ChainSwapUpdate, LIQUID_FEE_RATE_MSAT_PER_VBYTE}; use crate::{ @@ -42,7 +42,7 @@ pub(crate) struct ChainSwapHandler { persister: Arc, swapper: Arc, liquid_chain_service: Arc, - bitcoin_chain_service: Arc>, + bitcoin_chain_service: Arc, subscription_notifier: broadcast::Sender, } @@ -71,7 +71,7 @@ impl ChainSwapHandler { persister: Arc, swapper: Arc, liquid_chain_service: Arc, - bitcoin_chain_service: Arc>, + bitcoin_chain_service: Arc, ) -> Result { let (subscription_notifier, _) = broadcast::channel::(30); Ok(Self { @@ -454,9 +454,7 @@ impl ChainSwapHandler { let script_pubkey = swap.get_receive_lockup_swap_script_pubkey(self.config.network)?; let script_balance = self - .bitcoin_chain_service - .lock() - .await + .bitcoin_chain_service .script_get_balance_with_retry(script_pubkey.as_script(), 10) .await?; debug!("Found lockup balance {script_balance:?}"); @@ -854,9 +852,8 @@ impl ChainSwapHandler { self.swapper.broadcast_tx(self.config.network.into(), &claim_tx_hex) }) } - SdkTransaction::Bitcoin(tx) => { - let bitcoin_chain_service = self.bitcoin_chain_service.lock().await; - bitcoin_chain_service + SdkTransaction::Bitcoin(tx) => { + self.bitcoin_chain_service .broadcast(&tx) .map(|tx_id| tx_id.to_hex()) .map_err(|err| PaymentError::Generic { @@ -977,13 +974,12 @@ impl ChainSwapHandler { err: "Unexpected swap script type found".to_string(), }); }; - - let bitcoin_chain_service = self.bitcoin_chain_service.lock().await; + let script_pk = swap_script .to_address(self.config.network.as_bitcoin_chain()) .map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))? .script_pubkey(); - let utxos = bitcoin_chain_service.get_script_utxos(&script_pk).await?; + let utxos = self.bitcoin_chain_service.get_script_utxos(&script_pk).await?; let SdkTransaction::Bitcoin(refund_tx) = self.swapper.create_refund_tx( Swap::Chain(swap.clone()), @@ -997,7 +993,7 @@ impl ChainSwapHandler { err: format!("Unexpected refund tx type returned for incoming Chain swap {id}",), }); }; - let refund_tx_id = bitcoin_chain_service.broadcast(&refund_tx)?.to_string(); + let refund_tx_id = self.bitcoin_chain_service.broadcast(&refund_tx)?.to_string(); info!("Successfully broadcast refund for incoming Chain Swap {id}, is_cooperative: {is_cooperative}"); @@ -1203,9 +1199,7 @@ impl ChainSwapHandler { // Get full transaction let txs = self - .bitcoin_chain_service - .lock() - .await + .bitcoin_chain_service .get_transactions(&[first_tx_id])?; let user_lockup_tx = txs.first().ok_or(anyhow!( "No transactions found for user lockup script for swap {}", @@ -1324,9 +1318,7 @@ impl ChainSwapHandler { .to_address(self.config.network.as_bitcoin_chain()) .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?; let tx = self - .bitcoin_chain_service - .lock() - .await + .bitcoin_chain_service .verify_tx( &address, &swap_update_tx.id, @@ -1424,9 +1416,7 @@ impl ChainSwapHandler { .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?; let script_pubkey = address.script_pubkey(); let script = script_pubkey.as_script(); - self.bitcoin_chain_service - .lock() - .await + self.bitcoin_chain_service .get_script_history_with_retry(script, 5) .await } diff --git a/lib/core/src/recover/recoverer.rs b/lib/core/src/recover/recoverer.rs index c1a86c3..e0e3909 100644 --- a/lib/core/src/recover/recoverer.rs +++ b/lib/core/src/recover/recoverer.rs @@ -10,7 +10,6 @@ use lwk_wollet::elements_miniscript::slip77::MasterBlindingKey; use lwk_wollet::hashes::hex::{DisplayHex, FromHex}; use lwk_wollet::hashes::{sha256, Hash as _}; use lwk_wollet::WalletTx; -use tokio::sync::Mutex; use super::model::*; @@ -28,7 +27,7 @@ pub(crate) struct Recoverer { swapper: Arc, onchain_wallet: Arc, liquid_chain_service: Arc, - bitcoin_chain_service: Arc>, + bitcoin_chain_service: Arc, } impl Recoverer { @@ -37,7 +36,7 @@ impl Recoverer { swapper: Arc, onchain_wallet: Arc, liquid_chain_service: Arc, - bitcoin_chain_service: Arc>, + bitcoin_chain_service: Arc, ) -> Result { Ok(Self { master_blinding_key: MasterBlindingKey::from_hex( @@ -214,7 +213,7 @@ impl Recoverer { &swaps_list.receive_chain_swap_immutable_data_by_swap_id, )?; - let bitcoin_tip = self.bitcoin_chain_service.lock().await.tip()?; + let bitcoin_tip = self.bitcoin_chain_service.tip()?; let liquid_tip = self.liquid_chain_service.tip().await?; for swap in swaps.iter_mut() { @@ -381,13 +380,14 @@ impl Recoverer { .map(|(k, v)| (k, v.into_iter().map(HistoryTxId::from).collect())) .collect(); - let bitcoin_chain_service = self.bitcoin_chain_service.lock().await; let swap_btc_script_bufs = swaps_list.get_swap_btc_scripts(); let swap_btc_scripts = swap_btc_script_bufs .iter() .map(|x| x.as_script()) .collect::>(); - let btc_script_histories = bitcoin_chain_service.get_scripts_history(&swap_btc_scripts)?; + let btc_script_histories = self + .bitcoin_chain_service + .get_scripts_history(&swap_btc_scripts)?; let btx_script_tx_ids: Vec = btc_script_histories .iter() .flatten() @@ -408,8 +408,12 @@ impl Recoverer { .map(|(k, v)| (k, v.iter().map(HistoryTxId::from).collect())) .collect(); - let btc_script_txs = bitcoin_chain_service.get_transactions(&btx_script_tx_ids)?; - let btc_script_balances = bitcoin_chain_service.scripts_get_balance(&swap_btc_scripts)?; + let btc_script_txs = self + .bitcoin_chain_service + .get_transactions(&btx_script_tx_ids)?; + let btc_script_balances = self + .bitcoin_chain_service + .scripts_get_balance(&swap_btc_scripts)?; let btc_script_to_txs_map: HashMap> = swap_btc_script_bufs .clone() diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index 9a13083..baf89b2 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -26,7 +26,7 @@ use sdk_common::input_parser::InputType; use sdk_common::liquid::LiquidAddressData; use sdk_common::prelude::{FiatAPI, FiatCurrency, LnUrlPayError, LnUrlWithdrawError, Rate}; use signer::SdkSigner; -use tokio::sync::{watch, Mutex, RwLock}; +use tokio::sync::{watch, RwLock}; use tokio::time::MissedTickBehavior; use tokio_stream::wrappers::BroadcastStream; use x509_parser::parse_x509_certificate; @@ -76,7 +76,7 @@ pub struct LiquidSdk { pub(crate) swapper: Arc, pub(crate) recoverer: Arc, pub(crate) liquid_chain_service: Arc, - pub(crate) bitcoin_chain_service: Arc>, + pub(crate) bitcoin_chain_service: Arc, pub(crate) fiat_api: Arc, pub(crate) is_started: RwLock, pub(crate) shutdown_sender: watch::Sender<()>, @@ -177,8 +177,7 @@ impl LiquidSdk { persister.replace_asset_metadata(config.asset_metadata.clone())?; let liquid_chain_service = Arc::new(HybridLiquidChainService::new(config.clone())?); - let bitcoin_chain_service = - Arc::new(Mutex::new(HybridBitcoinChainService::new(config.clone())?)); + let bitcoin_chain_service = Arc::new(HybridBitcoinChainService::new(config.clone())?); let onchain_wallet = Arc::new(LiquidOnchainWallet::new( config.clone(), @@ -373,7 +372,7 @@ impl LiquidSdk { }; // Get the Bitcoin tip and process a new block let t0 = Instant::now(); - let bitcoin_tip_res = cloned.bitcoin_chain_service.lock().await.tip().map(|tip| tip.height as u32); + let bitcoin_tip_res = cloned.bitcoin_chain_service.tip().map(|tip| tip.height as u32); let duration_ms = Instant::now().duration_since(t0).as_millis(); info!("Fetched bitcoin tip at ({duration_ms} ms)"); let is_new_bitcoin_block = match &bitcoin_tip_res { @@ -2369,8 +2368,6 @@ impl LiquidSdk { .collect(); let scripts_balance = self .bitcoin_chain_service - .lock() - .await .scripts_get_balance(&lockup_scripts)?; let mut refundables = vec![]; @@ -2466,6 +2463,7 @@ impl LiquidSdk { /// (within last [CHAIN_SWAP_MONITORING_PERIOD_BITCOIN_BLOCKS] blocks = ~30 days), calling this /// is not necessary as it happens automatically in the background. pub async fn rescan_onchain_swaps(&self) -> SdkResult<()> { + let t0 = Instant::now(); let mut rescannable_swaps: Vec = self .persister .list_chain_swaps()? @@ -2475,6 +2473,7 @@ impl LiquidSdk { self.recoverer .recover_from_onchain(&mut rescannable_swaps) .await?; + let scanned_len = rescannable_swaps.len(); for swap in rescannable_swaps { let swap_id = &swap.id(); if let Swap::Chain(chain_swap) = swap { @@ -2483,6 +2482,11 @@ impl LiquidSdk { } } } + info!( + "Rescanned {} chain swaps in {} seconds", + scanned_len, + t0.elapsed().as_millis() + ); Ok(()) } @@ -2576,7 +2580,7 @@ impl LiquidSdk { .collect(); match partial_sync { false => { - let bitcoin_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32; + let bitcoin_height = self.bitcoin_chain_service.tip()?.height as u32; let liquid_height = self.liquid_chain_service.tip().await?; let final_swap_states = [PaymentState::Complete, PaymentState::Failed]; @@ -3288,12 +3292,7 @@ impl LiquidSdk { /// Get the recommended BTC fees based on the configured mempool.space instance. pub async fn recommended_fees(&self) -> Result { - Ok(self - .bitcoin_chain_service - .lock() - .await - .recommended_fees() - .await?) + Ok(self.bitcoin_chain_service.recommended_fees().await?) } /// Get the full default [Config] for specific [LiquidNetwork]. @@ -3379,7 +3378,6 @@ mod tests { swaps::boltz::{ChainSwapStates, RevSwapStates, SubSwapStates}, }; use lwk_wollet::{elements::Txid, hashes::hex::DisplayHex}; - use tokio::sync::Mutex; use crate::chain_swap::ESTIMATED_BTC_LOCKUP_TX_VSIZE; use crate::test_utils::chain_swap::{ @@ -3528,7 +3526,7 @@ mod tests { let swapper = Arc::new(MockSwapper::default()); let status_stream = Arc::new(MockStatusStream::new()); let liquid_chain_service = Arc::new(MockLiquidChainService::new()); - let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); + let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new()); let sdk = Arc::new(new_liquid_sdk_with_chain_services( persister.clone(), @@ -3700,7 +3698,7 @@ mod tests { let swapper = Arc::new(MockSwapper::default()); let status_stream = Arc::new(MockStatusStream::new()); let liquid_chain_service = Arc::new(MockLiquidChainService::new()); - let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); + let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new()); let sdk = Arc::new(new_liquid_sdk_with_chain_services( persister.clone(), @@ -3778,15 +3776,12 @@ mod tests { if let Some(user_lockup_tx_id) = user_lockup_tx_id { match direction { Direction::Incoming => { - bitcoin_chain_service - .lock() - .await - .set_history(vec![MockHistory { - txid: Txid::from_str(user_lockup_tx_id).unwrap(), - height: 0, - block_hash: None, - block_timestamp: None, - }]); + bitcoin_chain_service.set_history(vec![MockHistory { + txid: Txid::from_str(user_lockup_tx_id).unwrap(), + height: 0, + block_hash: None, + block_timestamp: None, + }]); } Direction::Outgoing => { liquid_chain_service.set_history(vec![MockHistory { @@ -3828,19 +3823,13 @@ mod tests { ChainSwapStates::TransactionConfirmed, ] { if direction == Direction::Incoming { - bitcoin_chain_service - .lock() - .await - .set_history(vec![MockHistory { - txid: Txid::from_str(&mock_user_lockup_tx_id).unwrap(), - height: 0, - block_hash: None, - block_timestamp: None, - }]); - bitcoin_chain_service - .lock() - .await - .set_transactions(&[&mock_user_lockup_tx_hex]); + bitcoin_chain_service.set_history(vec![MockHistory { + txid: Txid::from_str(&mock_user_lockup_tx_id).unwrap(), + height: 0, + block_hash: None, + block_timestamp: None, + }]); + bitcoin_chain_service.set_transactions(&[&mock_user_lockup_tx_hex]); } let persisted_swap = trigger_swap_update!( "chain", @@ -3942,7 +3931,7 @@ mod tests { let swapper = Arc::new(MockSwapper::new()); let status_stream = Arc::new(MockStatusStream::new()); let liquid_chain_service = Arc::new(MockLiquidChainService::new()); - let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); + let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new()); let sdk = Arc::new(new_liquid_sdk_with_chain_services( persister.clone(), @@ -3965,10 +3954,7 @@ mod tests { user_lockup_sat, onchain_fee_increase_sat: fee_increase, }); - bitcoin_chain_service - .lock() - .await - .set_script_balance_sat(user_lockup_sat); + bitcoin_chain_service.set_script_balance_sat(user_lockup_sat); let persisted_swap = trigger_swap_update!( "chain", NewSwapArgs::default() @@ -4006,7 +3992,7 @@ mod tests { let swapper = Arc::new(MockSwapper::new()); let status_stream = Arc::new(MockStatusStream::new()); let liquid_chain_service = Arc::new(MockLiquidChainService::new()); - let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); + let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new()); let sdk = Arc::new(new_liquid_sdk_with_chain_services( persister.clone(), @@ -4035,10 +4021,7 @@ mod tests { user_lockup_sat, onchain_fee_increase_sat: fee_increase, }); - bitcoin_chain_service - .lock() - .await - .set_script_balance_sat(user_lockup_sat); + bitcoin_chain_service.set_script_balance_sat(user_lockup_sat); let persisted_swap = trigger_swap_update!( "chain", NewSwapArgs::default() diff --git a/lib/core/src/test_utils/chain.rs b/lib/core/src/test_utils/chain.rs index 408d0c1..e892f30 100644 --- a/lib/core/src/test_utils/chain.rs +++ b/lib/core/src/test_utils/chain.rs @@ -132,42 +132,42 @@ impl LiquidChainService for MockLiquidChainService { } pub(crate) struct MockBitcoinChainService { - history: Vec, - txs: Vec, - script_balance_sat: u64, + history: Mutex>, + txs: Mutex>, + script_balance_sat: Mutex, } impl MockBitcoinChainService { pub(crate) fn new() -> Self { MockBitcoinChainService { - history: vec![], - txs: vec![], - script_balance_sat: 0, + history: Mutex::new(vec![]), + txs: Mutex::new(vec![]), + script_balance_sat: Mutex::new(0), } } - pub(crate) fn set_history(&mut self, history: Vec) -> &mut Self { - self.history = history; + pub(crate) fn set_history(&self, history: Vec) -> &Self { + *self.history.lock().unwrap() = history; self } - pub(crate) fn set_transactions(&mut self, txs: &[&str]) -> &mut Self { - self.txs = txs + pub(crate) fn set_transactions(&self, txs: &[&str]) -> &Self { + *self.txs.lock().unwrap() = txs .iter() .map(|tx_hex| deserialize(&Vec::::from_hex(tx_hex).unwrap()).unwrap()) .collect(); self } - pub(crate) fn set_script_balance_sat(&mut self, script_balance_sat: u64) -> &mut Self { - self.script_balance_sat = script_balance_sat; + pub(crate) fn set_script_balance_sat(&self, script_balance_sat: u64) -> &Self { + *self.script_balance_sat.lock().unwrap() = script_balance_sat; self } } #[async_trait] impl BitcoinChainService for MockBitcoinChainService { - fn tip(&mut self) -> Result { + fn tip(&self) -> Result { Ok(HeaderNotification { height: 0, header: genesis_block(lwk_wollet::bitcoin::Network::Testnet).header, @@ -185,11 +185,18 @@ impl BitcoinChainService for MockBitcoinChainService { &self, _txids: &[boltz_client::bitcoin::Txid], ) -> Result> { - Ok(self.txs.clone()) + Ok(self.txs.lock().unwrap().clone()) } fn get_script_history(&self, _script: &Script) -> Result> { - Ok(self.history.clone().into_iter().map(Into::into).collect()) + Ok(self + .history + .lock() + .unwrap() + .clone() + .into_iter() + .map(Into::into) + .collect()) } async fn get_script_history_with_retry( @@ -197,7 +204,14 @@ impl BitcoinChainService for MockBitcoinChainService { _script: &Script, _retries: u64, ) -> Result> { - Ok(self.history.clone().into_iter().map(Into::into).collect()) + Ok(self + .history + .lock() + .unwrap() + .clone() + .into_iter() + .map(Into::into) + .collect()) } fn get_scripts_history(&self, _scripts: &[&Script]) -> Result>> { @@ -233,7 +247,7 @@ impl BitcoinChainService for MockBitcoinChainService { _retries: u64, ) -> Result { Ok(GetBalanceRes { - confirmed: self.script_balance_sat, + confirmed: self.script_balance_sat.lock().unwrap().clone(), unconfirmed: 0, }) } diff --git a/lib/core/src/test_utils/chain_swap.rs b/lib/core/src/test_utils/chain_swap.rs index 787bef0..dc10d9a 100644 --- a/lib/core/src/test_utils/chain_swap.rs +++ b/lib/core/src/test_utils/chain_swap.rs @@ -6,8 +6,6 @@ use lazy_static::lazy_static; use sdk_common::bitcoin::{consensus::deserialize, Transaction}; use std::sync::Arc; -use tokio::sync::Mutex; - use crate::{ chain_swap::ChainSwapHandler, model::{ChainSwap, Config, Direction, PaymentState, Signer}, @@ -35,7 +33,7 @@ pub(crate) fn new_chain_swap_handler(persister: Arc) -> Result, ) -> Result { let liquid_chain_service = Arc::new(MockLiquidChainService::new()); - let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); + let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new()); Recoverer::new( signer.slip77_master_blinding_key()?, diff --git a/lib/core/src/test_utils/sdk.rs b/lib/core/src/test_utils/sdk.rs index 7ac8d76..4169e1f 100644 --- a/lib/core/src/test_utils/sdk.rs +++ b/lib/core/src/test_utils/sdk.rs @@ -4,7 +4,7 @@ use anyhow::{anyhow, Result}; use sdk_common::prelude::{BreezServer, STAGING_BREEZSERVER_URL}; use std::sync::Arc; -use tokio::sync::{watch, Mutex, RwLock}; +use tokio::sync::{watch, RwLock}; use crate::{ buy::BuyBitcoinService, @@ -32,7 +32,7 @@ pub(crate) fn new_liquid_sdk( status_stream: Arc, ) -> Result { let liquid_chain_service = Arc::new(MockLiquidChainService::new()); - let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); + let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new()); new_liquid_sdk_with_chain_services( persister, @@ -49,7 +49,7 @@ pub(crate) fn new_liquid_sdk_with_chain_services( swapper: Arc, status_stream: Arc, liquid_chain_service: Arc, - bitcoin_chain_service: Arc>, + bitcoin_chain_service: Arc, onchain_fee_rate_leeway_sat_per_vbyte: Option, ) -> Result { let mut config = Config::testnet(None); From ec6b4d29a8bd17dbb7932ee8854e13a6c181b950 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Tue, 28 Jan 2025 22:02:56 +0200 Subject: [PATCH 4/7] cargo fmt --- lib/core/src/chain_swap.rs | 39 +++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/lib/core/src/chain_swap.rs b/lib/core/src/chain_swap.rs index db512aa..f59c308 100644 --- a/lib/core/src/chain_swap.rs +++ b/lib/core/src/chain_swap.rs @@ -454,7 +454,7 @@ impl ChainSwapHandler { let script_pubkey = swap.get_receive_lockup_swap_script_pubkey(self.config.network)?; let script_balance = self - .bitcoin_chain_service + .bitcoin_chain_service .script_get_balance_with_retry(script_pubkey.as_script(), 10) .await?; debug!("Found lockup balance {script_balance:?}"); @@ -757,7 +757,7 @@ impl ChainSwapHandler { .await?; let lockup_tx_id = self - .liquid_chain_service + .liquid_chain_service .broadcast(&lockup_tx) .await? .to_string(); @@ -839,7 +839,7 @@ impl ChainSwapHandler { Ok(_) => { let broadcast_res = match claim_tx { // We attempt broadcasting via chain service, then fallback to Boltz - SdkTransaction::Liquid(tx) => { + SdkTransaction::Liquid(tx) => { self.liquid_chain_service .broadcast(&tx) .await @@ -852,7 +852,7 @@ impl ChainSwapHandler { self.swapper.broadcast_tx(self.config.network.into(), &claim_tx_hex) }) } - SdkTransaction::Bitcoin(tx) => { + SdkTransaction::Bitcoin(tx) => { self.bitcoin_chain_service .broadcast(&tx) .map(|tx_id| tx_id.to_hex()) @@ -974,12 +974,15 @@ impl ChainSwapHandler { err: "Unexpected swap script type found".to_string(), }); }; - + let script_pk = swap_script .to_address(self.config.network.as_bitcoin_chain()) .map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))? .script_pubkey(); - let utxos = self.bitcoin_chain_service.get_script_utxos(&script_pk).await?; + let utxos = self + .bitcoin_chain_service + .get_script_utxos(&script_pk) + .await?; let SdkTransaction::Bitcoin(refund_tx) = self.swapper.create_refund_tx( Swap::Chain(swap.clone()), @@ -993,7 +996,10 @@ impl ChainSwapHandler { err: format!("Unexpected refund tx type returned for incoming Chain swap {id}",), }); }; - let refund_tx_id = self.bitcoin_chain_service.broadcast(&refund_tx)?.to_string(); + let refund_tx_id = self + .bitcoin_chain_service + .broadcast(&refund_tx)? + .to_string(); info!("Successfully broadcast refund for incoming Chain Swap {id}, is_cooperative: {is_cooperative}"); @@ -1036,13 +1042,15 @@ impl ChainSwapHandler { }); }; - let script_pk = swap_script .to_address(self.config.network.into()) .map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))? .to_unconfidential() .script_pubkey(); - let utxos = self.liquid_chain_service.get_script_utxos(&script_pk).await?; + let utxos = self + .liquid_chain_service + .get_script_utxos(&script_pk) + .await?; let refund_address = self.onchain_wallet.next_unused_address().await?.to_string(); let SdkTransaction::Liquid(refund_tx) = self.swapper.create_refund_tx( @@ -1060,7 +1068,8 @@ impl ChainSwapHandler { ), }); }; - let refund_tx_id = self.liquid_chain_service + let refund_tx_id = self + .liquid_chain_service .broadcast(&refund_tx) .await? .to_string(); @@ -1199,7 +1208,7 @@ impl ChainSwapHandler { // Get full transaction let txs = self - .bitcoin_chain_service + .bitcoin_chain_service .get_transactions(&[first_tx_id])?; let user_lockup_tx = txs.first().ok_or(anyhow!( "No transactions found for user lockup script for swap {}", @@ -1255,7 +1264,7 @@ impl ChainSwapHandler { .to_address(self.config.network.into()) .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?; let tx = self - .liquid_chain_service + .liquid_chain_service .verify_tx( &address, &swap_update_tx.id, @@ -1318,7 +1327,7 @@ impl ChainSwapHandler { .to_address(self.config.network.as_bitcoin_chain()) .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?; let tx = self - .bitcoin_chain_service + .bitcoin_chain_service .verify_tx( &address, &swap_update_tx.id, @@ -1416,7 +1425,7 @@ impl ChainSwapHandler { .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?; let script_pubkey = address.script_pubkey(); let script = script_pubkey.as_script(); - self.bitcoin_chain_service + self.bitcoin_chain_service .get_script_history_with_retry(script, 5) .await } @@ -1432,7 +1441,7 @@ impl ChainSwapHandler { .to_unconfidential(); let script = Script::from_hex(hex::encode(address.script_pubkey().as_bytes()).as_str()) .map_err(|e| anyhow!("Failed to get script from address {e:?}"))?; - self.liquid_chain_service + self.liquid_chain_service .get_script_history_with_retry(&script, 5) .await } From 7ad4de93812af695b835e9ee958fe581fde67290 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Tue, 28 Jan 2025 22:33:05 +0200 Subject: [PATCH 5/7] Add logs --- lib/core/src/recover/recoverer.rs | 33 ++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/lib/core/src/recover/recoverer.rs b/lib/core/src/recover/recoverer.rs index e0e3909..80b9b40 100644 --- a/lib/core/src/recover/recoverer.rs +++ b/lib/core/src/recover/recoverer.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc}; use anyhow::{anyhow, ensure, Result}; use boltz_client::{ElementsAddress, ToHex as _}; use electrum_client::GetBalanceRes; -use log::{debug, error, warn}; +use log::{debug, error, info, warn}; use lwk_wollet::bitcoin::Witness; use lwk_wollet::elements::{secp256k1_zkp, AddressParams, Txid}; use lwk_wollet::elements_miniscript::slip77::MasterBlindingKey; @@ -364,10 +364,18 @@ impl Recoverer { /// For a given [SwapList], this fetches the script histories from the chain services async fn fetch_swaps_histories(&self, swaps_list: &SwapsList) -> Result { let swap_lbtc_scripts = swaps_list.get_swap_lbtc_scripts(); + + let t0 = std::time::Instant::now(); let lbtc_script_histories = self .liquid_chain_service .get_scripts_history(&swap_lbtc_scripts.iter().collect::>()) .await?; + info!( + "Recoverer executed liquid get_scripts_history for {} scripts in {} milliseconds", + swap_lbtc_scripts.len(), + t0.elapsed().as_millis() + ); + let lbtc_swap_scripts_len = swap_lbtc_scripts.len(); let lbtc_script_histories_len = lbtc_script_histories.len(); ensure!( @@ -385,9 +393,18 @@ impl Recoverer { .iter() .map(|x| x.as_script()) .collect::>(); + + let t0 = std::time::Instant::now(); let btc_script_histories = self .bitcoin_chain_service .get_scripts_history(&swap_btc_scripts)?; + + info!( + "Recoverer executed bitcoin get_scripts_history for {} scripts in {} milliseconds", + swap_btc_scripts.len(), + t0.elapsed().as_millis() + ); + let btx_script_tx_ids: Vec = btc_script_histories .iter() .flatten() @@ -408,12 +425,26 @@ impl Recoverer { .map(|(k, v)| (k, v.iter().map(HistoryTxId::from).collect())) .collect(); + let t0 = std::time::Instant::now(); let btc_script_txs = self .bitcoin_chain_service .get_transactions(&btx_script_tx_ids)?; + info!( + "Recoverer executed bitcoin get_transactions for {} transactions in {} milliseconds", + btx_script_tx_ids.len(), + t0.elapsed().as_millis() + ); + + let t0 = std::time::Instant::now(); let btc_script_balances = self .bitcoin_chain_service .scripts_get_balance(&swap_btc_scripts)?; + info!( + "Recoverer executed bitcoin scripts_get_balance for {} scripts in {} milliseconds", + swap_btc_scripts.len(), + t0.elapsed().as_millis() + ); + let btc_script_to_txs_map: HashMap> = swap_btc_script_bufs .clone() From fb37e7bcccad3df2513869a3fbb89c3132cc1a1f Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Wed, 29 Jan 2025 10:38:11 +0200 Subject: [PATCH 6/7] fix formatting --- lib/core/src/recover/recoverer.rs | 14 -------------- lib/core/src/sdk.rs | 2 +- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/lib/core/src/recover/recoverer.rs b/lib/core/src/recover/recoverer.rs index 80b9b40..fbe1f48 100644 --- a/lib/core/src/recover/recoverer.rs +++ b/lib/core/src/recover/recoverer.rs @@ -163,20 +163,6 @@ impl Recoverer { /// - `tx_map`: all known onchain txs of this wallet at this time, essentially our own LWK cache. /// - `swaps`: immutable data of the swaps for which we want to recover onchain data. pub(crate) async fn recover_from_onchain(&self, swaps: &mut [Swap]) -> Result<()> { - //println!("swaps: {:?}", swaps.iter().map(|s|s.).collect::>()); - for swap in swaps.iter() { - match swap { - Swap::Send(send_swap) => { - println!("send_swap: {:?}", send_swap.id); - } - Swap::Receive(receive_swap) => { - println!("receive_swap: {:?}", receive_swap.id); - } - Swap::Chain(chain_swap) => { - println!("chain_swap: {:?}", chain_swap.id); - } - } - } let tx_map = TxMap::from_raw_tx_map(self.onchain_wallet.transactions_by_tx_id().await?); let swaps_list = swaps.to_vec().try_into()?; diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index baf89b2..1b7655e 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -351,7 +351,7 @@ impl LiquidSdk { loop { tokio::select! { _ = interval.tick() => { - info!("Track blocks loop ticked"); + info!("Track blocks loop ticked"); // Get the Liquid tip and process a new block let t0 = Instant::now(); let liquid_tip_res = cloned.liquid_chain_service.tip().await; From 461f54be38f1897608deb5ee3a8a06578f6268d5 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Wed, 29 Jan 2025 12:07:39 +0200 Subject: [PATCH 7/7] cleanup tip endpoint --- lib/core/src/chain/bitcoin.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/lib/core/src/chain/bitcoin.rs b/lib/core/src/chain/bitcoin.rs index 4c68a2e..149771c 100644 --- a/lib/core/src/chain/bitcoin.rs +++ b/lib/core/src/chain/bitcoin.rs @@ -107,24 +107,27 @@ impl BitcoinChainService for HybridBitcoinChainService { maybe_popped_header = Some(header) } - match maybe_popped_header { - Some(popped_header) => { - let tip: HeaderNotification = popped_header.try_into()?; - *self.tip.lock().unwrap() = tip; - } + let new_tip = match maybe_popped_header { + Some(popped_header) => Some(popped_header.try_into()?), None => { // https://github.com/bitcoindevkit/rust-electrum-client/issues/124 // It might be that the client has reconnected and subscriptions don't persist // across connections. Calling `client.ping()` won't help here because the // successful retry will prevent us knowing about the reconnect. if let Ok(header) = self.client.block_headers_subscribe_raw() { - let tip: HeaderNotification = header.try_into()?; - *self.tip.lock().unwrap() = tip; + Some(header.try_into()?) + } else { + None } } + }; + + let mut tip = self.tip.lock().unwrap(); + if let Some(new_tip) = new_tip { + *tip = new_tip; } - Ok(self.tip.lock().unwrap().clone()) + Ok(tip.clone()) } fn broadcast(&self, tx: &Transaction) -> Result {