diff --git a/lib/core/src/chain/bitcoin.rs b/lib/core/src/chain/bitcoin.rs index 5d11d92..149771c 100644 --- a/lib/core/src/chain/bitcoin.rs +++ b/lib/core/src/chain/bitcoin.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{sync::Mutex, time::Duration}; use anyhow::{anyhow, Result}; use async_trait::async_trait; @@ -24,7 +24,7 @@ use crate::{ #[async_trait] pub trait BitcoinChainService: Send + Sync { /// Get the blockchain latest block - fn tip(&mut self) -> Result; + fn tip(&self) -> Result; /// Broadcast a transaction fn broadcast(&self, tx: &Transaction) -> Result; @@ -76,7 +76,7 @@ pub trait BitcoinChainService: Send + Sync { pub(crate) struct HybridBitcoinChainService { client: Client, - tip: HeaderNotification, + tip: Mutex, config: Config, } impl HybridBitcoinChainService { @@ -93,7 +93,7 @@ impl HybridBitcoinChainService { Ok(Self { client, - tip, + tip: Mutex::new(tip), config, }) } @@ -101,30 +101,33 @@ impl HybridBitcoinChainService { #[async_trait] impl BitcoinChainService for HybridBitcoinChainService { - fn tip(&mut self) -> Result { + fn tip(&self) -> Result { let mut maybe_popped_header = None; while let Some(header) = self.client.block_headers_pop_raw()? { maybe_popped_header = Some(header) } - match maybe_popped_header { - Some(popped_header) => { - let tip: HeaderNotification = popped_header.try_into()?; - self.tip = tip; - } + let new_tip = match maybe_popped_header { + Some(popped_header) => Some(popped_header.try_into()?), None => { // https://github.com/bitcoindevkit/rust-electrum-client/issues/124 // It might be that the client has reconnected and subscriptions don't persist // across connections. Calling `client.ping()` won't help here because the // successful retry will prevent us knowing about the reconnect. if let Ok(header) = self.client.block_headers_subscribe_raw() { - let tip: HeaderNotification = header.try_into()?; - self.tip = tip; + Some(header.try_into()?) + } else { + None } } + }; + + let mut tip = self.tip.lock().unwrap(); + if let Some(new_tip) = new_tip { + *tip = new_tip; } - Ok(self.tip.clone()) + Ok(tip.clone()) } fn broadcast(&self, tx: &Transaction) -> Result { diff --git a/lib/core/src/chain/liquid.rs b/lib/core/src/chain/liquid.rs index f2d1c78..2fa3823 100644 --- a/lib/core/src/chain/liquid.rs +++ b/lib/core/src/chain/liquid.rs @@ -1,3 +1,4 @@ +use std::sync::Mutex; use std::time::Duration; use anyhow::{anyhow, Result}; @@ -17,7 +18,7 @@ use crate::{model::Config, utils}; #[async_trait] pub trait LiquidChainService: Send + Sync { /// Get the blockchain latest block - async fn tip(&mut self) -> Result; + async fn tip(&self) -> Result; /// Broadcast a transaction async fn broadcast(&self, tx: &Transaction) -> Result; @@ -58,20 +59,25 @@ pub trait LiquidChainService: Send + Sync { pub(crate) struct HybridLiquidChainService { electrum_client: ElectrumClient, + tip_client: Mutex, } impl HybridLiquidChainService { pub(crate) fn new(config: Config) -> Result { let electrum_url = ElectrumUrl::new(&config.liquid_electrum_url, true, true)?; let electrum_client = ElectrumClient::new(&electrum_url)?; - Ok(Self { electrum_client }) + let tip_client = ElectrumClient::new(&electrum_url)?; + Ok(Self { + electrum_client, + tip_client: Mutex::new(tip_client), + }) } } #[async_trait] impl LiquidChainService for HybridLiquidChainService { - async fn tip(&mut self) -> Result { - Ok(self.electrum_client.tip()?.height) + async fn tip(&self) -> Result { + Ok(self.tip_client.lock().unwrap().tip()?.height) } async fn broadcast(&self, tx: &Transaction) -> Result { diff --git a/lib/core/src/chain_swap.rs b/lib/core/src/chain_swap.rs index 1e95b4e..f59c308 100644 --- a/lib/core/src/chain_swap.rs +++ b/lib/core/src/chain_swap.rs @@ -14,7 +14,7 @@ use lwk_wollet::{ hashes::hex::DisplayHex, History, }; -use tokio::sync::{broadcast, Mutex}; +use tokio::sync::broadcast; use crate::model::{BlockListener, ChainSwapUpdate, LIQUID_FEE_RATE_MSAT_PER_VBYTE}; use crate::{ @@ -41,8 +41,8 @@ pub(crate) struct ChainSwapHandler { onchain_wallet: Arc, persister: Arc, swapper: Arc, - liquid_chain_service: Arc>, - bitcoin_chain_service: Arc>, + liquid_chain_service: Arc, + bitcoin_chain_service: Arc, subscription_notifier: broadcast::Sender, } @@ -70,8 +70,8 @@ impl ChainSwapHandler { onchain_wallet: Arc, persister: Arc, swapper: Arc, - liquid_chain_service: Arc>, - bitcoin_chain_service: Arc>, + liquid_chain_service: Arc, + bitcoin_chain_service: Arc, ) -> Result { let (subscription_notifier, _) = broadcast::channel::(30); Ok(Self { @@ -455,8 +455,6 @@ impl ChainSwapHandler { let script_pubkey = swap.get_receive_lockup_swap_script_pubkey(self.config.network)?; let script_balance = self .bitcoin_chain_service - .lock() - .await .script_get_balance_with_retry(script_pubkey.as_script(), 10) .await?; debug!("Found lockup balance {script_balance:?}"); @@ -760,8 +758,6 @@ impl ChainSwapHandler { let lockup_tx_id = self .liquid_chain_service - .lock() - .await .broadcast(&lockup_tx) .await? .to_string(); @@ -844,8 +840,7 @@ impl ChainSwapHandler { let broadcast_res = match claim_tx { // We attempt broadcasting via chain service, then fallback to Boltz SdkTransaction::Liquid(tx) => { - let liquid_chain_service = self.liquid_chain_service.lock().await; - liquid_chain_service + self.liquid_chain_service .broadcast(&tx) .await .map(|tx_id| tx_id.to_hex()) @@ -858,8 +853,7 @@ impl ChainSwapHandler { }) } SdkTransaction::Bitcoin(tx) => { - let bitcoin_chain_service = self.bitcoin_chain_service.lock().await; - bitcoin_chain_service + self.bitcoin_chain_service .broadcast(&tx) .map(|tx_id| tx_id.to_hex()) .map_err(|err| PaymentError::Generic { @@ -981,12 +975,14 @@ impl ChainSwapHandler { }); }; - let bitcoin_chain_service = self.bitcoin_chain_service.lock().await; let script_pk = swap_script .to_address(self.config.network.as_bitcoin_chain()) .map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))? .script_pubkey(); - let utxos = bitcoin_chain_service.get_script_utxos(&script_pk).await?; + let utxos = self + .bitcoin_chain_service + .get_script_utxos(&script_pk) + .await?; let SdkTransaction::Bitcoin(refund_tx) = self.swapper.create_refund_tx( Swap::Chain(swap.clone()), @@ -1000,7 +996,10 @@ impl ChainSwapHandler { err: format!("Unexpected refund tx type returned for incoming Chain swap {id}",), }); }; - let refund_tx_id = bitcoin_chain_service.broadcast(&refund_tx)?.to_string(); + let refund_tx_id = self + .bitcoin_chain_service + .broadcast(&refund_tx)? + .to_string(); info!("Successfully broadcast refund for incoming Chain Swap {id}, is_cooperative: {is_cooperative}"); @@ -1043,13 +1042,15 @@ impl ChainSwapHandler { }); }; - let liquid_chain_service = self.liquid_chain_service.lock().await; let script_pk = swap_script .to_address(self.config.network.into()) .map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))? .to_unconfidential() .script_pubkey(); - let utxos = liquid_chain_service.get_script_utxos(&script_pk).await?; + let utxos = self + .liquid_chain_service + .get_script_utxos(&script_pk) + .await?; let refund_address = self.onchain_wallet.next_unused_address().await?.to_string(); let SdkTransaction::Liquid(refund_tx) = self.swapper.create_refund_tx( @@ -1067,7 +1068,8 @@ impl ChainSwapHandler { ), }); }; - let refund_tx_id = liquid_chain_service + let refund_tx_id = self + .liquid_chain_service .broadcast(&refund_tx) .await? .to_string(); @@ -1207,8 +1209,6 @@ impl ChainSwapHandler { // Get full transaction let txs = self .bitcoin_chain_service - .lock() - .await .get_transactions(&[first_tx_id])?; let user_lockup_tx = txs.first().ok_or(anyhow!( "No transactions found for user lockup script for swap {}", @@ -1265,8 +1265,6 @@ impl ChainSwapHandler { .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?; let tx = self .liquid_chain_service - .lock() - .await .verify_tx( &address, &swap_update_tx.id, @@ -1330,8 +1328,6 @@ impl ChainSwapHandler { .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?; let tx = self .bitcoin_chain_service - .lock() - .await .verify_tx( &address, &swap_update_tx.id, @@ -1430,8 +1426,6 @@ impl ChainSwapHandler { let script_pubkey = address.script_pubkey(); let script = script_pubkey.as_script(); self.bitcoin_chain_service - .lock() - .await .get_script_history_with_retry(script, 5) .await } @@ -1448,8 +1442,6 @@ impl ChainSwapHandler { let script = Script::from_hex(hex::encode(address.script_pubkey().as_bytes()).as_str()) .map_err(|e| anyhow!("Failed to get script from address {e:?}"))?; self.liquid_chain_service - .lock() - .await .get_script_history_with_retry(&script, 5) .await } diff --git a/lib/core/src/receive_swap.rs b/lib/core/src/receive_swap.rs index 46d0e27..4f41ace 100644 --- a/lib/core/src/receive_swap.rs +++ b/lib/core/src/receive_swap.rs @@ -9,7 +9,7 @@ use lwk_wollet::elements::secp256k1_zkp::Secp256k1; use lwk_wollet::elements::{Transaction, Txid}; use lwk_wollet::hashes::hex::DisplayHex; use lwk_wollet::secp256k1::SecretKey; -use tokio::sync::{broadcast, Mutex}; +use tokio::sync::broadcast; use crate::chain::liquid::LiquidChainService; use crate::model::{BlockListener, PaymentState::*}; @@ -32,7 +32,7 @@ pub(crate) struct ReceiveSwapHandler { persister: Arc, swapper: Arc, subscription_notifier: broadcast::Sender, - liquid_chain_service: Arc>, + liquid_chain_service: Arc, } #[async_trait] @@ -52,7 +52,7 @@ impl ReceiveSwapHandler { onchain_wallet: Arc, persister: Arc, swapper: Arc, - liquid_chain_service: Arc>, + liquid_chain_service: Arc, ) -> Self { let (subscription_notifier, _) = broadcast::channel::(30); Self { @@ -354,8 +354,7 @@ impl ReceiveSwapHandler { match self.persister.set_receive_swap_claim_tx_id(swap_id, &tx_id) { Ok(_) => { // We attempt broadcasting via chain service, then fallback to Boltz - let liquid_chain_service = self.liquid_chain_service.lock().await; - let broadcast_res = liquid_chain_service + let broadcast_res = self.liquid_chain_service .broadcast(&claim_tx) .await .map(|tx_id| tx_id.to_hex()) @@ -440,8 +439,6 @@ impl ReceiveSwapHandler { let swap_id = &receive_swap.id; let tx_hex = self .liquid_chain_service - .lock() - .await .get_transaction_hex(&Txid::from_str(&tx_id)?) .await? .ok_or(anyhow!("Lockup tx not found for Receive swap {swap_id}"))? @@ -516,8 +513,6 @@ impl ReceiveSwapHandler { .to_address(self.config.network.into()) .map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?; self.liquid_chain_service - .lock() - .await .verify_tx(&address, tx_id, tx_hex, verify_confirmation) .await } diff --git a/lib/core/src/recover/recoverer.rs b/lib/core/src/recover/recoverer.rs index 4402418..fbe1f48 100644 --- a/lib/core/src/recover/recoverer.rs +++ b/lib/core/src/recover/recoverer.rs @@ -3,14 +3,13 @@ use std::{collections::HashMap, sync::Arc}; use anyhow::{anyhow, ensure, Result}; use boltz_client::{ElementsAddress, ToHex as _}; use electrum_client::GetBalanceRes; -use log::{debug, error, warn}; +use log::{debug, error, info, warn}; use lwk_wollet::bitcoin::Witness; use lwk_wollet::elements::{secp256k1_zkp, AddressParams, Txid}; use lwk_wollet::elements_miniscript::slip77::MasterBlindingKey; use lwk_wollet::hashes::hex::{DisplayHex, FromHex}; use lwk_wollet::hashes::{sha256, Hash as _}; use lwk_wollet::WalletTx; -use tokio::sync::Mutex; use super::model::*; @@ -27,8 +26,8 @@ pub(crate) struct Recoverer { master_blinding_key: MasterBlindingKey, swapper: Arc, onchain_wallet: Arc, - liquid_chain_service: Arc>, - bitcoin_chain_service: Arc>, + liquid_chain_service: Arc, + bitcoin_chain_service: Arc, } impl Recoverer { @@ -36,8 +35,8 @@ impl Recoverer { master_blinding_key: Vec, swapper: Arc, onchain_wallet: Arc, - liquid_chain_service: Arc>, - bitcoin_chain_service: Arc>, + liquid_chain_service: Arc, + bitcoin_chain_service: Arc, ) -> Result { Ok(Self { master_blinding_key: MasterBlindingKey::from_hex( @@ -79,8 +78,6 @@ impl Recoverer { let claim_tx_ids: Vec = failed_cooperative.values().cloned().collect(); let claim_txs = self .liquid_chain_service - .lock() - .await .get_transactions(claim_tx_ids.as_slice()) .await .map_err(|e| anyhow!("Failed to fetch claim txs from recovery: {e}"))?; @@ -202,8 +199,8 @@ impl Recoverer { &swaps_list.receive_chain_swap_immutable_data_by_swap_id, )?; - let bitcoin_tip = self.bitcoin_chain_service.lock().await.tip()?; - let liquid_tip = self.liquid_chain_service.lock().await.tip().await?; + let bitcoin_tip = self.bitcoin_chain_service.tip()?; + let liquid_tip = self.liquid_chain_service.tip().await?; for swap in swaps.iter_mut() { let swap_id = &swap.id(); @@ -353,12 +350,18 @@ impl Recoverer { /// For a given [SwapList], this fetches the script histories from the chain services async fn fetch_swaps_histories(&self, swaps_list: &SwapsList) -> Result { let swap_lbtc_scripts = swaps_list.get_swap_lbtc_scripts(); + + let t0 = std::time::Instant::now(); let lbtc_script_histories = self .liquid_chain_service - .lock() - .await .get_scripts_history(&swap_lbtc_scripts.iter().collect::>()) .await?; + info!( + "Recoverer executed liquid get_scripts_history for {} scripts in {} milliseconds", + swap_lbtc_scripts.len(), + t0.elapsed().as_millis() + ); + let lbtc_swap_scripts_len = swap_lbtc_scripts.len(); let lbtc_script_histories_len = lbtc_script_histories.len(); ensure!( @@ -371,13 +374,23 @@ impl Recoverer { .map(|(k, v)| (k, v.into_iter().map(HistoryTxId::from).collect())) .collect(); - let bitcoin_chain_service = self.bitcoin_chain_service.lock().await; let swap_btc_script_bufs = swaps_list.get_swap_btc_scripts(); let swap_btc_scripts = swap_btc_script_bufs .iter() .map(|x| x.as_script()) .collect::>(); - let btc_script_histories = bitcoin_chain_service.get_scripts_history(&swap_btc_scripts)?; + + let t0 = std::time::Instant::now(); + let btc_script_histories = self + .bitcoin_chain_service + .get_scripts_history(&swap_btc_scripts)?; + + info!( + "Recoverer executed bitcoin get_scripts_history for {} scripts in {} milliseconds", + swap_btc_scripts.len(), + t0.elapsed().as_millis() + ); + let btx_script_tx_ids: Vec = btc_script_histories .iter() .flatten() @@ -398,8 +411,26 @@ impl Recoverer { .map(|(k, v)| (k, v.iter().map(HistoryTxId::from).collect())) .collect(); - let btc_script_txs = bitcoin_chain_service.get_transactions(&btx_script_tx_ids)?; - let btc_script_balances = bitcoin_chain_service.scripts_get_balance(&swap_btc_scripts)?; + let t0 = std::time::Instant::now(); + let btc_script_txs = self + .bitcoin_chain_service + .get_transactions(&btx_script_tx_ids)?; + info!( + "Recoverer executed bitcoin get_transactions for {} transactions in {} milliseconds", + btx_script_tx_ids.len(), + t0.elapsed().as_millis() + ); + + let t0 = std::time::Instant::now(); + let btc_script_balances = self + .bitcoin_chain_service + .scripts_get_balance(&swap_btc_scripts)?; + info!( + "Recoverer executed bitcoin scripts_get_balance for {} scripts in {} milliseconds", + swap_btc_scripts.len(), + t0.elapsed().as_millis() + ); + let btc_script_to_txs_map: HashMap> = swap_btc_script_bufs .clone() diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index 5749d0b..1b7655e 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -26,7 +26,7 @@ use sdk_common::input_parser::InputType; use sdk_common::liquid::LiquidAddressData; use sdk_common::prelude::{FiatAPI, FiatCurrency, LnUrlPayError, LnUrlWithdrawError, Rate}; use signer::SdkSigner; -use tokio::sync::{watch, Mutex, RwLock}; +use tokio::sync::{watch, RwLock}; use tokio::time::MissedTickBehavior; use tokio_stream::wrappers::BroadcastStream; use x509_parser::parse_x509_certificate; @@ -75,8 +75,8 @@ pub struct LiquidSdk { pub(crate) status_stream: Arc, pub(crate) swapper: Arc, pub(crate) recoverer: Arc, - pub(crate) liquid_chain_service: Arc>, - pub(crate) bitcoin_chain_service: Arc>, + pub(crate) liquid_chain_service: Arc, + pub(crate) bitcoin_chain_service: Arc, pub(crate) fiat_api: Arc, pub(crate) is_started: RwLock, pub(crate) shutdown_sender: watch::Sender<()>, @@ -176,10 +176,8 @@ impl LiquidSdk { persister.init()?; persister.replace_asset_metadata(config.asset_metadata.clone())?; - let liquid_chain_service = - Arc::new(Mutex::new(HybridLiquidChainService::new(config.clone())?)); - let bitcoin_chain_service = - Arc::new(Mutex::new(HybridBitcoinChainService::new(config.clone())?)); + let liquid_chain_service = Arc::new(HybridLiquidChainService::new(config.clone())?); + let bitcoin_chain_service = Arc::new(HybridBitcoinChainService::new(config.clone())?); let onchain_wallet = Arc::new(LiquidOnchainWallet::new( config.clone(), @@ -353,8 +351,13 @@ impl LiquidSdk { loop { tokio::select! { _ = interval.tick() => { + info!("Track blocks loop ticked"); // Get the Liquid tip and process a new block - let liquid_tip_res = cloned.liquid_chain_service.lock().await.tip().await; + let t0 = Instant::now(); + let liquid_tip_res = cloned.liquid_chain_service.tip().await; + let duration_ms = Instant::now().duration_since(t0).as_millis(); + info!("Fetched liquid tip at ({duration_ms} ms)"); + let is_new_liquid_block = match &liquid_tip_res { Ok(height) => { debug!("Got Liquid tip: {height}"); @@ -368,7 +371,10 @@ impl LiquidSdk { } }; // Get the Bitcoin tip and process a new block - let bitcoin_tip_res = cloned.bitcoin_chain_service.lock().await.tip().map(|tip| tip.height as u32); + let t0 = Instant::now(); + let bitcoin_tip_res = cloned.bitcoin_chain_service.tip().map(|tip| tip.height as u32); + let duration_ms = Instant::now().duration_since(t0).as_millis(); + info!("Fetched bitcoin tip at ({duration_ms} ms)"); let is_new_bitcoin_block = match &bitcoin_tip_res { Ok(height) => { debug!("Got Bitcoin tip: {height}"); @@ -1348,8 +1354,7 @@ impl LiquidSdk { "Built onchain L-BTC tx with receiver_amount_sat = {receiver_amount_sat}, fees_sat = {fees_sat} and txid = {tx_id}" ); - let liquid_chain_service = self.liquid_chain_service.lock().await; - let tx_id = liquid_chain_service.broadcast(&tx).await?.to_string(); + let tx_id = self.liquid_chain_service.broadcast(&tx).await?.to_string(); // We insert a pseudo-tx in case LWK fails to pick up the new mempool tx for a while // This makes the tx known to the SDK (get_info, list_payments) instantly @@ -2363,8 +2368,6 @@ impl LiquidSdk { .collect(); let scripts_balance = self .bitcoin_chain_service - .lock() - .await .scripts_get_balance(&lockup_scripts)?; let mut refundables = vec![]; @@ -2460,6 +2463,7 @@ impl LiquidSdk { /// (within last [CHAIN_SWAP_MONITORING_PERIOD_BITCOIN_BLOCKS] blocks = ~30 days), calling this /// is not necessary as it happens automatically in the background. pub async fn rescan_onchain_swaps(&self) -> SdkResult<()> { + let t0 = Instant::now(); let mut rescannable_swaps: Vec = self .persister .list_chain_swaps()? @@ -2469,6 +2473,7 @@ impl LiquidSdk { self.recoverer .recover_from_onchain(&mut rescannable_swaps) .await?; + let scanned_len = rescannable_swaps.len(); for swap in rescannable_swaps { let swap_id = &swap.id(); if let Swap::Chain(chain_swap) = swap { @@ -2477,6 +2482,11 @@ impl LiquidSdk { } } } + info!( + "Rescanned {} chain swaps in {} seconds", + scanned_len, + t0.elapsed().as_millis() + ); Ok(()) } @@ -2570,8 +2580,8 @@ impl LiquidSdk { .collect(); match partial_sync { false => { - let bitcoin_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32; - let liquid_height = self.liquid_chain_service.lock().await.tip().await?; + let bitcoin_height = self.bitcoin_chain_service.tip()?.height as u32; + let liquid_height = self.liquid_chain_service.tip().await?; let final_swap_states = [PaymentState::Complete, PaymentState::Failed]; let send_swaps = self @@ -3282,12 +3292,7 @@ impl LiquidSdk { /// Get the recommended BTC fees based on the configured mempool.space instance. pub async fn recommended_fees(&self) -> Result { - Ok(self - .bitcoin_chain_service - .lock() - .await - .recommended_fees() - .await?) + Ok(self.bitcoin_chain_service.recommended_fees().await?) } /// Get the full default [Config] for specific [LiquidNetwork]. @@ -3373,7 +3378,6 @@ mod tests { swaps::boltz::{ChainSwapStates, RevSwapStates, SubSwapStates}, }; use lwk_wollet::{elements::Txid, hashes::hex::DisplayHex}; - use tokio::sync::Mutex; use crate::chain_swap::ESTIMATED_BTC_LOCKUP_TX_VSIZE; use crate::test_utils::chain_swap::{ @@ -3521,8 +3525,8 @@ mod tests { create_persister!(persister); let swapper = Arc::new(MockSwapper::default()); let status_stream = Arc::new(MockStatusStream::new()); - let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); - let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); + let liquid_chain_service = Arc::new(MockLiquidChainService::new()); + let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new()); let sdk = Arc::new(new_liquid_sdk_with_chain_services( persister.clone(), @@ -3569,15 +3573,12 @@ mod tests { let height = (serde_json::to_string(&status).unwrap() == serde_json::to_string(&RevSwapStates::TransactionConfirmed).unwrap()) as i32; - liquid_chain_service - .lock() - .await - .set_history(vec![MockHistory { - txid: mock_tx_id, - height, - block_hash: None, - block_timestamp: None, - }]); + liquid_chain_service.set_history(vec![MockHistory { + txid: mock_tx_id, + height, + block_hash: None, + block_timestamp: None, + }]); let persisted_swap = trigger_swap_update!( "receive", @@ -3606,15 +3607,12 @@ mod tests { let height = (serde_json::to_string(&status).unwrap() == serde_json::to_string(&RevSwapStates::TransactionConfirmed).unwrap()) as i32; - liquid_chain_service - .lock() - .await - .set_history(vec![MockHistory { - txid: mock_tx_id, - height, - block_hash: None, - block_timestamp: None, - }]); + liquid_chain_service.set_history(vec![MockHistory { + txid: mock_tx_id, + height, + block_hash: None, + block_timestamp: None, + }]); let persisted_swap = trigger_swap_update!( "receive", @@ -3699,8 +3697,8 @@ mod tests { create_persister!(persister); let swapper = Arc::new(MockSwapper::default()); let status_stream = Arc::new(MockStatusStream::new()); - let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); - let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); + let liquid_chain_service = Arc::new(MockLiquidChainService::new()); + let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new()); let sdk = Arc::new(new_liquid_sdk_with_chain_services( persister.clone(), @@ -3778,26 +3776,20 @@ mod tests { if let Some(user_lockup_tx_id) = user_lockup_tx_id { match direction { Direction::Incoming => { - bitcoin_chain_service - .lock() - .await - .set_history(vec![MockHistory { - txid: Txid::from_str(user_lockup_tx_id).unwrap(), - height: 0, - block_hash: None, - block_timestamp: None, - }]); + bitcoin_chain_service.set_history(vec![MockHistory { + txid: Txid::from_str(user_lockup_tx_id).unwrap(), + height: 0, + block_hash: None, + block_timestamp: None, + }]); } Direction::Outgoing => { - liquid_chain_service - .lock() - .await - .set_history(vec![MockHistory { - txid: Txid::from_str(user_lockup_tx_id).unwrap(), - height: 0, - block_hash: None, - block_timestamp: None, - }]); + liquid_chain_service.set_history(vec![MockHistory { + txid: Txid::from_str(user_lockup_tx_id).unwrap(), + height: 0, + block_hash: None, + block_timestamp: None, + }]); } } } @@ -3831,19 +3823,13 @@ mod tests { ChainSwapStates::TransactionConfirmed, ] { if direction == Direction::Incoming { - bitcoin_chain_service - .lock() - .await - .set_history(vec![MockHistory { - txid: Txid::from_str(&mock_user_lockup_tx_id).unwrap(), - height: 0, - block_hash: None, - block_timestamp: None, - }]); - bitcoin_chain_service - .lock() - .await - .set_transactions(&[&mock_user_lockup_tx_hex]); + bitcoin_chain_service.set_history(vec![MockHistory { + txid: Txid::from_str(&mock_user_lockup_tx_id).unwrap(), + height: 0, + block_hash: None, + block_timestamp: None, + }]); + bitcoin_chain_service.set_transactions(&[&mock_user_lockup_tx_hex]); } let persisted_swap = trigger_swap_update!( "chain", @@ -3944,8 +3930,8 @@ mod tests { create_persister!(persister); let swapper = Arc::new(MockSwapper::new()); let status_stream = Arc::new(MockStatusStream::new()); - let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); - let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); + let liquid_chain_service = Arc::new(MockLiquidChainService::new()); + let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new()); let sdk = Arc::new(new_liquid_sdk_with_chain_services( persister.clone(), @@ -3968,10 +3954,7 @@ mod tests { user_lockup_sat, onchain_fee_increase_sat: fee_increase, }); - bitcoin_chain_service - .lock() - .await - .set_script_balance_sat(user_lockup_sat); + bitcoin_chain_service.set_script_balance_sat(user_lockup_sat); let persisted_swap = trigger_swap_update!( "chain", NewSwapArgs::default() @@ -4008,8 +3991,8 @@ mod tests { create_persister!(persister); let swapper = Arc::new(MockSwapper::new()); let status_stream = Arc::new(MockStatusStream::new()); - let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); - let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); + let liquid_chain_service = Arc::new(MockLiquidChainService::new()); + let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new()); let sdk = Arc::new(new_liquid_sdk_with_chain_services( persister.clone(), @@ -4038,10 +4021,7 @@ mod tests { user_lockup_sat, onchain_fee_increase_sat: fee_increase, }); - bitcoin_chain_service - .lock() - .await - .set_script_balance_sat(user_lockup_sat); + bitcoin_chain_service.set_script_balance_sat(user_lockup_sat); let persisted_swap = trigger_swap_update!( "chain", NewSwapArgs::default() diff --git a/lib/core/src/send_swap.rs b/lib/core/src/send_swap.rs index 59ba1b4..16a623b 100644 --- a/lib/core/src/send_swap.rs +++ b/lib/core/src/send_swap.rs @@ -10,7 +10,7 @@ use log::{debug, error, info, warn}; use lwk_wollet::elements::{LockTime, Transaction}; use lwk_wollet::hashes::{sha256, Hash}; use sdk_common::prelude::{AesSuccessActionDataResult, SuccessAction, SuccessActionProcessed}; -use tokio::sync::{broadcast, Mutex}; +use tokio::sync::broadcast; use crate::chain::liquid::LiquidChainService; use crate::model::{ @@ -34,7 +34,7 @@ pub(crate) struct SendSwapHandler { onchain_wallet: Arc, persister: Arc, swapper: Arc, - chain_service: Arc>, + chain_service: Arc, subscription_notifier: broadcast::Sender, } @@ -55,7 +55,7 @@ impl SendSwapHandler { onchain_wallet: Arc, persister: Arc, swapper: Arc, - chain_service: Arc>, + chain_service: Arc, ) -> Self { let (subscription_notifier, _) = broadcast::channel::(30); Self { @@ -208,7 +208,7 @@ impl SendSwapHandler { info!("Broadcasting lockup tx {lockup_tx_id} for Send swap {swap_id}",); - let broadcast_result = self.chain_service.lock().await.broadcast(&lockup_tx).await; + let broadcast_result = self.chain_service.broadcast(&lockup_tx).await; if let Err(err) = broadcast_result { debug!("Could not broadcast lockup tx for Send Swap {swap_id}: {err:?}"); @@ -384,8 +384,6 @@ impl SendSwapHandler { // Get tx history of the swap script (lockup address) let history: Vec<_> = self .chain_service - .lock() - .await .get_script_history(&swap_script_pk) .await?; @@ -405,8 +403,6 @@ impl SendSwapHandler { let claim_tx_id = claim_tx_entry.txid; let claim_tx = self .chain_service - .lock() - .await .get_transactions(&[claim_tx_id]) .await .map_err(|e| anyhow!("Failed to fetch claim txs {claim_tx_id:?}: {e}"))? @@ -445,13 +441,12 @@ impl SendSwapHandler { let swap_script = swap.get_swap_script()?; let refund_address = self.onchain_wallet.next_unused_address().await?.to_string(); - let liquid_chain_service = self.chain_service.lock().await; let script_pk = swap_script .to_address(self.config.network.into()) .map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))? .to_unconfidential() .script_pubkey(); - let utxos = liquid_chain_service.get_script_utxos(&script_pk).await?; + let utxos = self.chain_service.get_script_utxos(&script_pk).await?; let SdkTransaction::Liquid(refund_tx) = self.swapper.create_refund_tx( Swap::Send(swap.clone()), &refund_address, @@ -467,10 +462,7 @@ impl SendSwapHandler { ), }); }; - let refund_tx_id = liquid_chain_service - .broadcast(&refund_tx) - .await? - .to_string(); + let refund_tx_id = self.chain_service.broadcast(&refund_tx).await?.to_string(); info!( "Successfully broadcast refund for Send Swap {}, is_cooperative: {is_cooperative}", diff --git a/lib/core/src/test_utils/chain.rs b/lib/core/src/test_utils/chain.rs index bd75a8d..e892f30 100644 --- a/lib/core/src/test_utils/chain.rs +++ b/lib/core/src/test_utils/chain.rs @@ -1,5 +1,7 @@ #![cfg(test)] +use std::sync::Mutex; + use anyhow::Result; use async_trait::async_trait; use boltz_client::{ @@ -47,7 +49,7 @@ impl From for lwk_wollet::History { #[derive(Default)] pub(crate) struct MockLiquidChainService { - history: Vec, + history: Mutex>, } impl MockLiquidChainService { @@ -55,15 +57,19 @@ impl MockLiquidChainService { MockLiquidChainService::default() } - pub(crate) fn set_history(&mut self, history: Vec) -> &mut Self { - self.history = history; + pub(crate) fn set_history(&self, history: Vec) -> &Self { + *self.history.lock().unwrap() = history; self } + + pub(crate) fn get_history(&self) -> Vec { + self.history.lock().unwrap().clone() + } } #[async_trait] impl LiquidChainService for MockLiquidChainService { - async fn tip(&mut self) -> Result { + async fn tip(&self) -> Result { Ok(0) } @@ -92,7 +98,7 @@ impl LiquidChainService for MockLiquidChainService { &self, _scripts: &ElementsScript, ) -> Result> { - Ok(self.history.clone().into_iter().map(Into::into).collect()) + Ok(self.get_history().into_iter().map(Into::into).collect()) } async fn get_script_history_with_retry( @@ -100,7 +106,7 @@ impl LiquidChainService for MockLiquidChainService { _script: &ElementsScript, _retries: u64, ) -> Result> { - Ok(self.history.clone().into_iter().map(Into::into).collect()) + Ok(self.get_history().into_iter().map(Into::into).collect()) } async fn get_scripts_history(&self, _scripts: &[&ElementsScript]) -> Result>> { @@ -126,42 +132,42 @@ impl LiquidChainService for MockLiquidChainService { } pub(crate) struct MockBitcoinChainService { - history: Vec, - txs: Vec, - script_balance_sat: u64, + history: Mutex>, + txs: Mutex>, + script_balance_sat: Mutex, } impl MockBitcoinChainService { pub(crate) fn new() -> Self { MockBitcoinChainService { - history: vec![], - txs: vec![], - script_balance_sat: 0, + history: Mutex::new(vec![]), + txs: Mutex::new(vec![]), + script_balance_sat: Mutex::new(0), } } - pub(crate) fn set_history(&mut self, history: Vec) -> &mut Self { - self.history = history; + pub(crate) fn set_history(&self, history: Vec) -> &Self { + *self.history.lock().unwrap() = history; self } - pub(crate) fn set_transactions(&mut self, txs: &[&str]) -> &mut Self { - self.txs = txs + pub(crate) fn set_transactions(&self, txs: &[&str]) -> &Self { + *self.txs.lock().unwrap() = txs .iter() .map(|tx_hex| deserialize(&Vec::::from_hex(tx_hex).unwrap()).unwrap()) .collect(); self } - pub(crate) fn set_script_balance_sat(&mut self, script_balance_sat: u64) -> &mut Self { - self.script_balance_sat = script_balance_sat; + pub(crate) fn set_script_balance_sat(&self, script_balance_sat: u64) -> &Self { + *self.script_balance_sat.lock().unwrap() = script_balance_sat; self } } #[async_trait] impl BitcoinChainService for MockBitcoinChainService { - fn tip(&mut self) -> Result { + fn tip(&self) -> Result { Ok(HeaderNotification { height: 0, header: genesis_block(lwk_wollet::bitcoin::Network::Testnet).header, @@ -179,11 +185,18 @@ impl BitcoinChainService for MockBitcoinChainService { &self, _txids: &[boltz_client::bitcoin::Txid], ) -> Result> { - Ok(self.txs.clone()) + Ok(self.txs.lock().unwrap().clone()) } fn get_script_history(&self, _script: &Script) -> Result> { - Ok(self.history.clone().into_iter().map(Into::into).collect()) + Ok(self + .history + .lock() + .unwrap() + .clone() + .into_iter() + .map(Into::into) + .collect()) } async fn get_script_history_with_retry( @@ -191,7 +204,14 @@ impl BitcoinChainService for MockBitcoinChainService { _script: &Script, _retries: u64, ) -> Result> { - Ok(self.history.clone().into_iter().map(Into::into).collect()) + Ok(self + .history + .lock() + .unwrap() + .clone() + .into_iter() + .map(Into::into) + .collect()) } fn get_scripts_history(&self, _scripts: &[&Script]) -> Result>> { @@ -227,7 +247,7 @@ impl BitcoinChainService for MockBitcoinChainService { _retries: u64, ) -> Result { Ok(GetBalanceRes { - confirmed: self.script_balance_sat, + confirmed: self.script_balance_sat.lock().unwrap().clone(), unconfirmed: 0, }) } diff --git a/lib/core/src/test_utils/chain_swap.rs b/lib/core/src/test_utils/chain_swap.rs index 3735b78..dc10d9a 100644 --- a/lib/core/src/test_utils/chain_swap.rs +++ b/lib/core/src/test_utils/chain_swap.rs @@ -6,8 +6,6 @@ use lazy_static::lazy_static; use sdk_common::bitcoin::{consensus::deserialize, Transaction}; use std::sync::Arc; -use tokio::sync::Mutex; - use crate::{ chain_swap::ChainSwapHandler, model::{ChainSwap, Config, Direction, PaymentState, Signer}, @@ -34,8 +32,8 @@ pub(crate) fn new_chain_swap_handler(persister: Arc) -> Result> = Arc::new(Box::new(MockSigner::new()?)); let onchain_wallet = Arc::new(MockWallet::new(signer)?); let swapper = Arc::new(BoltzSwapper::new(config.clone(), None)); - let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); - let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); + let liquid_chain_service = Arc::new(MockLiquidChainService::new()); + let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new()); ChainSwapHandler::new( config, diff --git a/lib/core/src/test_utils/receive_swap.rs b/lib/core/src/test_utils/receive_swap.rs index 4245132..becaa1e 100644 --- a/lib/core/src/test_utils/receive_swap.rs +++ b/lib/core/src/test_utils/receive_swap.rs @@ -3,8 +3,6 @@ use anyhow::Result; use std::sync::Arc; -use tokio::sync::Mutex; - use crate::{ model::{Config, Signer}, persist::Persister, @@ -22,7 +20,7 @@ pub(crate) fn new_receive_swap_handler(persister: Arc) -> Result> = Arc::new(Box::new(MockSigner::new()?)); let onchain_wallet = Arc::new(MockWallet::new(signer)?); let swapper = Arc::new(MockSwapper::default()); - let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); + let liquid_chain_service = Arc::new(MockLiquidChainService::new()); Ok(ReceiveSwapHandler::new( config, diff --git a/lib/core/src/test_utils/recover.rs b/lib/core/src/test_utils/recover.rs index b963f27..05e0e40 100644 --- a/lib/core/src/test_utils/recover.rs +++ b/lib/core/src/test_utils/recover.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use anyhow::Result; -use tokio::sync::Mutex; use crate::{ model::Signer, recover::recoverer::Recoverer, swapper::Swapper, wallet::OnchainWallet, @@ -14,8 +13,8 @@ pub(crate) fn new_recoverer( swapper: Arc, onchain_wallet: Arc, ) -> Result { - let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); - let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); + let liquid_chain_service = Arc::new(MockLiquidChainService::new()); + let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new()); Recoverer::new( signer.slip77_master_blinding_key()?, diff --git a/lib/core/src/test_utils/sdk.rs b/lib/core/src/test_utils/sdk.rs index c8305e3..4169e1f 100644 --- a/lib/core/src/test_utils/sdk.rs +++ b/lib/core/src/test_utils/sdk.rs @@ -4,7 +4,7 @@ use anyhow::{anyhow, Result}; use sdk_common::prelude::{BreezServer, STAGING_BREEZSERVER_URL}; use std::sync::Arc; -use tokio::sync::{watch, Mutex, RwLock}; +use tokio::sync::{watch, RwLock}; use crate::{ buy::BuyBitcoinService, @@ -31,8 +31,8 @@ pub(crate) fn new_liquid_sdk( swapper: Arc, status_stream: Arc, ) -> Result { - let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); - let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new())); + let liquid_chain_service = Arc::new(MockLiquidChainService::new()); + let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new()); new_liquid_sdk_with_chain_services( persister, @@ -48,8 +48,8 @@ pub(crate) fn new_liquid_sdk_with_chain_services( persister: Arc, swapper: Arc, status_stream: Arc, - liquid_chain_service: Arc>, - bitcoin_chain_service: Arc>, + liquid_chain_service: Arc, + bitcoin_chain_service: Arc, onchain_fee_rate_leeway_sat_per_vbyte: Option, ) -> Result { let mut config = Config::testnet(None); diff --git a/lib/core/src/test_utils/send_swap.rs b/lib/core/src/test_utils/send_swap.rs index 8cc7adc..5141c63 100644 --- a/lib/core/src/test_utils/send_swap.rs +++ b/lib/core/src/test_utils/send_swap.rs @@ -8,7 +8,6 @@ use crate::{ send_swap::SendSwapHandler, }; use anyhow::Result; -use tokio::sync::Mutex; use super::{ chain::MockLiquidChainService, @@ -21,7 +20,7 @@ pub(crate) fn new_send_swap_handler(persister: Arc) -> Result> = Arc::new(Box::new(MockSigner::new()?)); let onchain_wallet = Arc::new(MockWallet::new(signer)?); let swapper = Arc::new(MockSwapper::default()); - let chain_service = Arc::new(Mutex::new(MockLiquidChainService::new())); + let chain_service = Arc::new(MockLiquidChainService::new()); Ok(SendSwapHandler::new( config,