Merge branch 'lock-free-liquid-chain-service'

* lock-free-liquid-chain-service:
  cleanup tip endpoint
  fix formatting
  Add logs
  cargo fmt
  remove lock for bitcoin chain service
  Add logs
  remove lock for liquid chain servivce
This commit is contained in:
Roei Erez
2025-01-29 12:28:50 +02:00
13 changed files with 225 additions and 212 deletions

View File

@@ -1,4 +1,4 @@
use std::time::Duration;
use std::{sync::Mutex, time::Duration};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
@@ -24,7 +24,7 @@ use crate::{
#[async_trait]
pub trait BitcoinChainService: Send + Sync {
/// Get the blockchain latest block
fn tip(&mut self) -> Result<HeaderNotification>;
fn tip(&self) -> Result<HeaderNotification>;
/// Broadcast a transaction
fn broadcast(&self, tx: &Transaction) -> Result<Txid>;
@@ -76,7 +76,7 @@ pub trait BitcoinChainService: Send + Sync {
pub(crate) struct HybridBitcoinChainService {
client: Client,
tip: HeaderNotification,
tip: Mutex<HeaderNotification>,
config: Config,
}
impl HybridBitcoinChainService {
@@ -93,7 +93,7 @@ impl HybridBitcoinChainService {
Ok(Self {
client,
tip,
tip: Mutex::new(tip),
config,
})
}
@@ -101,30 +101,33 @@ impl HybridBitcoinChainService {
#[async_trait]
impl BitcoinChainService for HybridBitcoinChainService {
fn tip(&mut self) -> Result<HeaderNotification> {
fn tip(&self) -> Result<HeaderNotification> {
let mut maybe_popped_header = None;
while let Some(header) = self.client.block_headers_pop_raw()? {
maybe_popped_header = Some(header)
}
match maybe_popped_header {
Some(popped_header) => {
let tip: HeaderNotification = popped_header.try_into()?;
self.tip = tip;
}
let new_tip = match maybe_popped_header {
Some(popped_header) => Some(popped_header.try_into()?),
None => {
// https://github.com/bitcoindevkit/rust-electrum-client/issues/124
// It might be that the client has reconnected and subscriptions don't persist
// across connections. Calling `client.ping()` won't help here because the
// successful retry will prevent us knowing about the reconnect.
if let Ok(header) = self.client.block_headers_subscribe_raw() {
let tip: HeaderNotification = header.try_into()?;
self.tip = tip;
Some(header.try_into()?)
} else {
None
}
}
};
let mut tip = self.tip.lock().unwrap();
if let Some(new_tip) = new_tip {
*tip = new_tip;
}
Ok(self.tip.clone())
Ok(tip.clone())
}
fn broadcast(&self, tx: &Transaction) -> Result<Txid> {

View File

@@ -1,3 +1,4 @@
use std::sync::Mutex;
use std::time::Duration;
use anyhow::{anyhow, Result};
@@ -17,7 +18,7 @@ use crate::{model::Config, utils};
#[async_trait]
pub trait LiquidChainService: Send + Sync {
/// Get the blockchain latest block
async fn tip(&mut self) -> Result<u32>;
async fn tip(&self) -> Result<u32>;
/// Broadcast a transaction
async fn broadcast(&self, tx: &Transaction) -> Result<Txid>;
@@ -58,20 +59,25 @@ pub trait LiquidChainService: Send + Sync {
pub(crate) struct HybridLiquidChainService {
electrum_client: ElectrumClient,
tip_client: Mutex<ElectrumClient>,
}
impl HybridLiquidChainService {
pub(crate) fn new(config: Config) -> Result<Self> {
let electrum_url = ElectrumUrl::new(&config.liquid_electrum_url, true, true)?;
let electrum_client = ElectrumClient::new(&electrum_url)?;
Ok(Self { electrum_client })
let tip_client = ElectrumClient::new(&electrum_url)?;
Ok(Self {
electrum_client,
tip_client: Mutex::new(tip_client),
})
}
}
#[async_trait]
impl LiquidChainService for HybridLiquidChainService {
async fn tip(&mut self) -> Result<u32> {
Ok(self.electrum_client.tip()?.height)
async fn tip(&self) -> Result<u32> {
Ok(self.tip_client.lock().unwrap().tip()?.height)
}
async fn broadcast(&self, tx: &Transaction) -> Result<Txid> {

View File

@@ -14,7 +14,7 @@ use lwk_wollet::{
hashes::hex::DisplayHex,
History,
};
use tokio::sync::{broadcast, Mutex};
use tokio::sync::broadcast;
use crate::model::{BlockListener, ChainSwapUpdate, LIQUID_FEE_RATE_MSAT_PER_VBYTE};
use crate::{
@@ -41,8 +41,8 @@ pub(crate) struct ChainSwapHandler {
onchain_wallet: Arc<dyn OnchainWallet>,
persister: Arc<Persister>,
swapper: Arc<dyn Swapper>,
liquid_chain_service: Arc<Mutex<dyn LiquidChainService>>,
bitcoin_chain_service: Arc<Mutex<dyn BitcoinChainService>>,
liquid_chain_service: Arc<dyn LiquidChainService>,
bitcoin_chain_service: Arc<dyn BitcoinChainService>,
subscription_notifier: broadcast::Sender<String>,
}
@@ -70,8 +70,8 @@ impl ChainSwapHandler {
onchain_wallet: Arc<dyn OnchainWallet>,
persister: Arc<Persister>,
swapper: Arc<dyn Swapper>,
liquid_chain_service: Arc<Mutex<dyn LiquidChainService>>,
bitcoin_chain_service: Arc<Mutex<dyn BitcoinChainService>>,
liquid_chain_service: Arc<dyn LiquidChainService>,
bitcoin_chain_service: Arc<dyn BitcoinChainService>,
) -> Result<Self> {
let (subscription_notifier, _) = broadcast::channel::<String>(30);
Ok(Self {
@@ -455,8 +455,6 @@ impl ChainSwapHandler {
let script_pubkey = swap.get_receive_lockup_swap_script_pubkey(self.config.network)?;
let script_balance = self
.bitcoin_chain_service
.lock()
.await
.script_get_balance_with_retry(script_pubkey.as_script(), 10)
.await?;
debug!("Found lockup balance {script_balance:?}");
@@ -760,8 +758,6 @@ impl ChainSwapHandler {
let lockup_tx_id = self
.liquid_chain_service
.lock()
.await
.broadcast(&lockup_tx)
.await?
.to_string();
@@ -844,8 +840,7 @@ impl ChainSwapHandler {
let broadcast_res = match claim_tx {
// We attempt broadcasting via chain service, then fallback to Boltz
SdkTransaction::Liquid(tx) => {
let liquid_chain_service = self.liquid_chain_service.lock().await;
liquid_chain_service
self.liquid_chain_service
.broadcast(&tx)
.await
.map(|tx_id| tx_id.to_hex())
@@ -858,8 +853,7 @@ impl ChainSwapHandler {
})
}
SdkTransaction::Bitcoin(tx) => {
let bitcoin_chain_service = self.bitcoin_chain_service.lock().await;
bitcoin_chain_service
self.bitcoin_chain_service
.broadcast(&tx)
.map(|tx_id| tx_id.to_hex())
.map_err(|err| PaymentError::Generic {
@@ -981,12 +975,14 @@ impl ChainSwapHandler {
});
};
let bitcoin_chain_service = self.bitcoin_chain_service.lock().await;
let script_pk = swap_script
.to_address(self.config.network.as_bitcoin_chain())
.map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))?
.script_pubkey();
let utxos = bitcoin_chain_service.get_script_utxos(&script_pk).await?;
let utxos = self
.bitcoin_chain_service
.get_script_utxos(&script_pk)
.await?;
let SdkTransaction::Bitcoin(refund_tx) = self.swapper.create_refund_tx(
Swap::Chain(swap.clone()),
@@ -1000,7 +996,10 @@ impl ChainSwapHandler {
err: format!("Unexpected refund tx type returned for incoming Chain swap {id}",),
});
};
let refund_tx_id = bitcoin_chain_service.broadcast(&refund_tx)?.to_string();
let refund_tx_id = self
.bitcoin_chain_service
.broadcast(&refund_tx)?
.to_string();
info!("Successfully broadcast refund for incoming Chain Swap {id}, is_cooperative: {is_cooperative}");
@@ -1043,13 +1042,15 @@ impl ChainSwapHandler {
});
};
let liquid_chain_service = self.liquid_chain_service.lock().await;
let script_pk = swap_script
.to_address(self.config.network.into())
.map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))?
.to_unconfidential()
.script_pubkey();
let utxos = liquid_chain_service.get_script_utxos(&script_pk).await?;
let utxos = self
.liquid_chain_service
.get_script_utxos(&script_pk)
.await?;
let refund_address = self.onchain_wallet.next_unused_address().await?.to_string();
let SdkTransaction::Liquid(refund_tx) = self.swapper.create_refund_tx(
@@ -1067,7 +1068,8 @@ impl ChainSwapHandler {
),
});
};
let refund_tx_id = liquid_chain_service
let refund_tx_id = self
.liquid_chain_service
.broadcast(&refund_tx)
.await?
.to_string();
@@ -1207,8 +1209,6 @@ impl ChainSwapHandler {
// Get full transaction
let txs = self
.bitcoin_chain_service
.lock()
.await
.get_transactions(&[first_tx_id])?;
let user_lockup_tx = txs.first().ok_or(anyhow!(
"No transactions found for user lockup script for swap {}",
@@ -1265,8 +1265,6 @@ impl ChainSwapHandler {
.map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?;
let tx = self
.liquid_chain_service
.lock()
.await
.verify_tx(
&address,
&swap_update_tx.id,
@@ -1330,8 +1328,6 @@ impl ChainSwapHandler {
.map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?;
let tx = self
.bitcoin_chain_service
.lock()
.await
.verify_tx(
&address,
&swap_update_tx.id,
@@ -1430,8 +1426,6 @@ impl ChainSwapHandler {
let script_pubkey = address.script_pubkey();
let script = script_pubkey.as_script();
self.bitcoin_chain_service
.lock()
.await
.get_script_history_with_retry(script, 5)
.await
}
@@ -1448,8 +1442,6 @@ impl ChainSwapHandler {
let script = Script::from_hex(hex::encode(address.script_pubkey().as_bytes()).as_str())
.map_err(|e| anyhow!("Failed to get script from address {e:?}"))?;
self.liquid_chain_service
.lock()
.await
.get_script_history_with_retry(&script, 5)
.await
}

View File

@@ -9,7 +9,7 @@ use lwk_wollet::elements::secp256k1_zkp::Secp256k1;
use lwk_wollet::elements::{Transaction, Txid};
use lwk_wollet::hashes::hex::DisplayHex;
use lwk_wollet::secp256k1::SecretKey;
use tokio::sync::{broadcast, Mutex};
use tokio::sync::broadcast;
use crate::chain::liquid::LiquidChainService;
use crate::model::{BlockListener, PaymentState::*};
@@ -32,7 +32,7 @@ pub(crate) struct ReceiveSwapHandler {
persister: Arc<Persister>,
swapper: Arc<dyn Swapper>,
subscription_notifier: broadcast::Sender<String>,
liquid_chain_service: Arc<Mutex<dyn LiquidChainService>>,
liquid_chain_service: Arc<dyn LiquidChainService>,
}
#[async_trait]
@@ -52,7 +52,7 @@ impl ReceiveSwapHandler {
onchain_wallet: Arc<dyn OnchainWallet>,
persister: Arc<Persister>,
swapper: Arc<dyn Swapper>,
liquid_chain_service: Arc<Mutex<dyn LiquidChainService>>,
liquid_chain_service: Arc<dyn LiquidChainService>,
) -> Self {
let (subscription_notifier, _) = broadcast::channel::<String>(30);
Self {
@@ -354,8 +354,7 @@ impl ReceiveSwapHandler {
match self.persister.set_receive_swap_claim_tx_id(swap_id, &tx_id) {
Ok(_) => {
// We attempt broadcasting via chain service, then fallback to Boltz
let liquid_chain_service = self.liquid_chain_service.lock().await;
let broadcast_res = liquid_chain_service
let broadcast_res = self.liquid_chain_service
.broadcast(&claim_tx)
.await
.map(|tx_id| tx_id.to_hex())
@@ -440,8 +439,6 @@ impl ReceiveSwapHandler {
let swap_id = &receive_swap.id;
let tx_hex = self
.liquid_chain_service
.lock()
.await
.get_transaction_hex(&Txid::from_str(&tx_id)?)
.await?
.ok_or(anyhow!("Lockup tx not found for Receive swap {swap_id}"))?
@@ -516,8 +513,6 @@ impl ReceiveSwapHandler {
.to_address(self.config.network.into())
.map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?;
self.liquid_chain_service
.lock()
.await
.verify_tx(&address, tx_id, tx_hex, verify_confirmation)
.await
}

View File

@@ -3,14 +3,13 @@ use std::{collections::HashMap, sync::Arc};
use anyhow::{anyhow, ensure, Result};
use boltz_client::{ElementsAddress, ToHex as _};
use electrum_client::GetBalanceRes;
use log::{debug, error, warn};
use log::{debug, error, info, warn};
use lwk_wollet::bitcoin::Witness;
use lwk_wollet::elements::{secp256k1_zkp, AddressParams, Txid};
use lwk_wollet::elements_miniscript::slip77::MasterBlindingKey;
use lwk_wollet::hashes::hex::{DisplayHex, FromHex};
use lwk_wollet::hashes::{sha256, Hash as _};
use lwk_wollet::WalletTx;
use tokio::sync::Mutex;
use super::model::*;
@@ -27,8 +26,8 @@ pub(crate) struct Recoverer {
master_blinding_key: MasterBlindingKey,
swapper: Arc<dyn Swapper>,
onchain_wallet: Arc<dyn OnchainWallet>,
liquid_chain_service: Arc<Mutex<dyn LiquidChainService>>,
bitcoin_chain_service: Arc<Mutex<dyn BitcoinChainService>>,
liquid_chain_service: Arc<dyn LiquidChainService>,
bitcoin_chain_service: Arc<dyn BitcoinChainService>,
}
impl Recoverer {
@@ -36,8 +35,8 @@ impl Recoverer {
master_blinding_key: Vec<u8>,
swapper: Arc<dyn Swapper>,
onchain_wallet: Arc<dyn OnchainWallet>,
liquid_chain_service: Arc<Mutex<dyn LiquidChainService>>,
bitcoin_chain_service: Arc<Mutex<dyn BitcoinChainService>>,
liquid_chain_service: Arc<dyn LiquidChainService>,
bitcoin_chain_service: Arc<dyn BitcoinChainService>,
) -> Result<Self> {
Ok(Self {
master_blinding_key: MasterBlindingKey::from_hex(
@@ -79,8 +78,6 @@ impl Recoverer {
let claim_tx_ids: Vec<Txid> = failed_cooperative.values().cloned().collect();
let claim_txs = self
.liquid_chain_service
.lock()
.await
.get_transactions(claim_tx_ids.as_slice())
.await
.map_err(|e| anyhow!("Failed to fetch claim txs from recovery: {e}"))?;
@@ -202,8 +199,8 @@ impl Recoverer {
&swaps_list.receive_chain_swap_immutable_data_by_swap_id,
)?;
let bitcoin_tip = self.bitcoin_chain_service.lock().await.tip()?;
let liquid_tip = self.liquid_chain_service.lock().await.tip().await?;
let bitcoin_tip = self.bitcoin_chain_service.tip()?;
let liquid_tip = self.liquid_chain_service.tip().await?;
for swap in swaps.iter_mut() {
let swap_id = &swap.id();
@@ -353,12 +350,18 @@ impl Recoverer {
/// For a given [SwapList], this fetches the script histories from the chain services
async fn fetch_swaps_histories(&self, swaps_list: &SwapsList) -> Result<SwapsHistories> {
let swap_lbtc_scripts = swaps_list.get_swap_lbtc_scripts();
let t0 = std::time::Instant::now();
let lbtc_script_histories = self
.liquid_chain_service
.lock()
.await
.get_scripts_history(&swap_lbtc_scripts.iter().collect::<Vec<&LBtcScript>>())
.await?;
info!(
"Recoverer executed liquid get_scripts_history for {} scripts in {} milliseconds",
swap_lbtc_scripts.len(),
t0.elapsed().as_millis()
);
let lbtc_swap_scripts_len = swap_lbtc_scripts.len();
let lbtc_script_histories_len = lbtc_script_histories.len();
ensure!(
@@ -371,13 +374,23 @@ impl Recoverer {
.map(|(k, v)| (k, v.into_iter().map(HistoryTxId::from).collect()))
.collect();
let bitcoin_chain_service = self.bitcoin_chain_service.lock().await;
let swap_btc_script_bufs = swaps_list.get_swap_btc_scripts();
let swap_btc_scripts = swap_btc_script_bufs
.iter()
.map(|x| x.as_script())
.collect::<Vec<&lwk_wollet::bitcoin::Script>>();
let btc_script_histories = bitcoin_chain_service.get_scripts_history(&swap_btc_scripts)?;
let t0 = std::time::Instant::now();
let btc_script_histories = self
.bitcoin_chain_service
.get_scripts_history(&swap_btc_scripts)?;
info!(
"Recoverer executed bitcoin get_scripts_history for {} scripts in {} milliseconds",
swap_btc_scripts.len(),
t0.elapsed().as_millis()
);
let btx_script_tx_ids: Vec<lwk_wollet::bitcoin::Txid> = btc_script_histories
.iter()
.flatten()
@@ -398,8 +411,26 @@ impl Recoverer {
.map(|(k, v)| (k, v.iter().map(HistoryTxId::from).collect()))
.collect();
let btc_script_txs = bitcoin_chain_service.get_transactions(&btx_script_tx_ids)?;
let btc_script_balances = bitcoin_chain_service.scripts_get_balance(&swap_btc_scripts)?;
let t0 = std::time::Instant::now();
let btc_script_txs = self
.bitcoin_chain_service
.get_transactions(&btx_script_tx_ids)?;
info!(
"Recoverer executed bitcoin get_transactions for {} transactions in {} milliseconds",
btx_script_tx_ids.len(),
t0.elapsed().as_millis()
);
let t0 = std::time::Instant::now();
let btc_script_balances = self
.bitcoin_chain_service
.scripts_get_balance(&swap_btc_scripts)?;
info!(
"Recoverer executed bitcoin scripts_get_balance for {} scripts in {} milliseconds",
swap_btc_scripts.len(),
t0.elapsed().as_millis()
);
let btc_script_to_txs_map: HashMap<BtcScript, Vec<boltz_client::bitcoin::Transaction>> =
swap_btc_script_bufs
.clone()

View File

@@ -26,7 +26,7 @@ use sdk_common::input_parser::InputType;
use sdk_common::liquid::LiquidAddressData;
use sdk_common::prelude::{FiatAPI, FiatCurrency, LnUrlPayError, LnUrlWithdrawError, Rate};
use signer::SdkSigner;
use tokio::sync::{watch, Mutex, RwLock};
use tokio::sync::{watch, RwLock};
use tokio::time::MissedTickBehavior;
use tokio_stream::wrappers::BroadcastStream;
use x509_parser::parse_x509_certificate;
@@ -75,8 +75,8 @@ pub struct LiquidSdk {
pub(crate) status_stream: Arc<dyn SwapperStatusStream>,
pub(crate) swapper: Arc<dyn Swapper>,
pub(crate) recoverer: Arc<Recoverer>,
pub(crate) liquid_chain_service: Arc<Mutex<dyn LiquidChainService>>,
pub(crate) bitcoin_chain_service: Arc<Mutex<dyn BitcoinChainService>>,
pub(crate) liquid_chain_service: Arc<dyn LiquidChainService>,
pub(crate) bitcoin_chain_service: Arc<dyn BitcoinChainService>,
pub(crate) fiat_api: Arc<dyn FiatAPI>,
pub(crate) is_started: RwLock<bool>,
pub(crate) shutdown_sender: watch::Sender<()>,
@@ -176,10 +176,8 @@ impl LiquidSdk {
persister.init()?;
persister.replace_asset_metadata(config.asset_metadata.clone())?;
let liquid_chain_service =
Arc::new(Mutex::new(HybridLiquidChainService::new(config.clone())?));
let bitcoin_chain_service =
Arc::new(Mutex::new(HybridBitcoinChainService::new(config.clone())?));
let liquid_chain_service = Arc::new(HybridLiquidChainService::new(config.clone())?);
let bitcoin_chain_service = Arc::new(HybridBitcoinChainService::new(config.clone())?);
let onchain_wallet = Arc::new(LiquidOnchainWallet::new(
config.clone(),
@@ -353,8 +351,13 @@ impl LiquidSdk {
loop {
tokio::select! {
_ = interval.tick() => {
info!("Track blocks loop ticked");
// Get the Liquid tip and process a new block
let liquid_tip_res = cloned.liquid_chain_service.lock().await.tip().await;
let t0 = Instant::now();
let liquid_tip_res = cloned.liquid_chain_service.tip().await;
let duration_ms = Instant::now().duration_since(t0).as_millis();
info!("Fetched liquid tip at ({duration_ms} ms)");
let is_new_liquid_block = match &liquid_tip_res {
Ok(height) => {
debug!("Got Liquid tip: {height}");
@@ -368,7 +371,10 @@ impl LiquidSdk {
}
};
// Get the Bitcoin tip and process a new block
let bitcoin_tip_res = cloned.bitcoin_chain_service.lock().await.tip().map(|tip| tip.height as u32);
let t0 = Instant::now();
let bitcoin_tip_res = cloned.bitcoin_chain_service.tip().map(|tip| tip.height as u32);
let duration_ms = Instant::now().duration_since(t0).as_millis();
info!("Fetched bitcoin tip at ({duration_ms} ms)");
let is_new_bitcoin_block = match &bitcoin_tip_res {
Ok(height) => {
debug!("Got Bitcoin tip: {height}");
@@ -1348,8 +1354,7 @@ impl LiquidSdk {
"Built onchain L-BTC tx with receiver_amount_sat = {receiver_amount_sat}, fees_sat = {fees_sat} and txid = {tx_id}"
);
let liquid_chain_service = self.liquid_chain_service.lock().await;
let tx_id = liquid_chain_service.broadcast(&tx).await?.to_string();
let tx_id = self.liquid_chain_service.broadcast(&tx).await?.to_string();
// We insert a pseudo-tx in case LWK fails to pick up the new mempool tx for a while
// This makes the tx known to the SDK (get_info, list_payments) instantly
@@ -2363,8 +2368,6 @@ impl LiquidSdk {
.collect();
let scripts_balance = self
.bitcoin_chain_service
.lock()
.await
.scripts_get_balance(&lockup_scripts)?;
let mut refundables = vec![];
@@ -2460,6 +2463,7 @@ impl LiquidSdk {
/// (within last [CHAIN_SWAP_MONITORING_PERIOD_BITCOIN_BLOCKS] blocks = ~30 days), calling this
/// is not necessary as it happens automatically in the background.
pub async fn rescan_onchain_swaps(&self) -> SdkResult<()> {
let t0 = Instant::now();
let mut rescannable_swaps: Vec<Swap> = self
.persister
.list_chain_swaps()?
@@ -2469,6 +2473,7 @@ impl LiquidSdk {
self.recoverer
.recover_from_onchain(&mut rescannable_swaps)
.await?;
let scanned_len = rescannable_swaps.len();
for swap in rescannable_swaps {
let swap_id = &swap.id();
if let Swap::Chain(chain_swap) = swap {
@@ -2477,6 +2482,11 @@ impl LiquidSdk {
}
}
}
info!(
"Rescanned {} chain swaps in {} seconds",
scanned_len,
t0.elapsed().as_millis()
);
Ok(())
}
@@ -2570,8 +2580,8 @@ impl LiquidSdk {
.collect();
match partial_sync {
false => {
let bitcoin_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32;
let liquid_height = self.liquid_chain_service.lock().await.tip().await?;
let bitcoin_height = self.bitcoin_chain_service.tip()?.height as u32;
let liquid_height = self.liquid_chain_service.tip().await?;
let final_swap_states = [PaymentState::Complete, PaymentState::Failed];
let send_swaps = self
@@ -3282,12 +3292,7 @@ impl LiquidSdk {
/// Get the recommended BTC fees based on the configured mempool.space instance.
pub async fn recommended_fees(&self) -> Result<RecommendedFees, SdkError> {
Ok(self
.bitcoin_chain_service
.lock()
.await
.recommended_fees()
.await?)
Ok(self.bitcoin_chain_service.recommended_fees().await?)
}
/// Get the full default [Config] for specific [LiquidNetwork].
@@ -3373,7 +3378,6 @@ mod tests {
swaps::boltz::{ChainSwapStates, RevSwapStates, SubSwapStates},
};
use lwk_wollet::{elements::Txid, hashes::hex::DisplayHex};
use tokio::sync::Mutex;
use crate::chain_swap::ESTIMATED_BTC_LOCKUP_TX_VSIZE;
use crate::test_utils::chain_swap::{
@@ -3521,8 +3525,8 @@ mod tests {
create_persister!(persister);
let swapper = Arc::new(MockSwapper::default());
let status_stream = Arc::new(MockStatusStream::new());
let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new()));
let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new()));
let liquid_chain_service = Arc::new(MockLiquidChainService::new());
let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new());
let sdk = Arc::new(new_liquid_sdk_with_chain_services(
persister.clone(),
@@ -3569,15 +3573,12 @@ mod tests {
let height = (serde_json::to_string(&status).unwrap()
== serde_json::to_string(&RevSwapStates::TransactionConfirmed).unwrap())
as i32;
liquid_chain_service
.lock()
.await
.set_history(vec![MockHistory {
txid: mock_tx_id,
height,
block_hash: None,
block_timestamp: None,
}]);
liquid_chain_service.set_history(vec![MockHistory {
txid: mock_tx_id,
height,
block_hash: None,
block_timestamp: None,
}]);
let persisted_swap = trigger_swap_update!(
"receive",
@@ -3606,15 +3607,12 @@ mod tests {
let height = (serde_json::to_string(&status).unwrap()
== serde_json::to_string(&RevSwapStates::TransactionConfirmed).unwrap())
as i32;
liquid_chain_service
.lock()
.await
.set_history(vec![MockHistory {
txid: mock_tx_id,
height,
block_hash: None,
block_timestamp: None,
}]);
liquid_chain_service.set_history(vec![MockHistory {
txid: mock_tx_id,
height,
block_hash: None,
block_timestamp: None,
}]);
let persisted_swap = trigger_swap_update!(
"receive",
@@ -3699,8 +3697,8 @@ mod tests {
create_persister!(persister);
let swapper = Arc::new(MockSwapper::default());
let status_stream = Arc::new(MockStatusStream::new());
let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new()));
let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new()));
let liquid_chain_service = Arc::new(MockLiquidChainService::new());
let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new());
let sdk = Arc::new(new_liquid_sdk_with_chain_services(
persister.clone(),
@@ -3778,26 +3776,20 @@ mod tests {
if let Some(user_lockup_tx_id) = user_lockup_tx_id {
match direction {
Direction::Incoming => {
bitcoin_chain_service
.lock()
.await
.set_history(vec![MockHistory {
txid: Txid::from_str(user_lockup_tx_id).unwrap(),
height: 0,
block_hash: None,
block_timestamp: None,
}]);
bitcoin_chain_service.set_history(vec![MockHistory {
txid: Txid::from_str(user_lockup_tx_id).unwrap(),
height: 0,
block_hash: None,
block_timestamp: None,
}]);
}
Direction::Outgoing => {
liquid_chain_service
.lock()
.await
.set_history(vec![MockHistory {
txid: Txid::from_str(user_lockup_tx_id).unwrap(),
height: 0,
block_hash: None,
block_timestamp: None,
}]);
liquid_chain_service.set_history(vec![MockHistory {
txid: Txid::from_str(user_lockup_tx_id).unwrap(),
height: 0,
block_hash: None,
block_timestamp: None,
}]);
}
}
}
@@ -3831,19 +3823,13 @@ mod tests {
ChainSwapStates::TransactionConfirmed,
] {
if direction == Direction::Incoming {
bitcoin_chain_service
.lock()
.await
.set_history(vec![MockHistory {
txid: Txid::from_str(&mock_user_lockup_tx_id).unwrap(),
height: 0,
block_hash: None,
block_timestamp: None,
}]);
bitcoin_chain_service
.lock()
.await
.set_transactions(&[&mock_user_lockup_tx_hex]);
bitcoin_chain_service.set_history(vec![MockHistory {
txid: Txid::from_str(&mock_user_lockup_tx_id).unwrap(),
height: 0,
block_hash: None,
block_timestamp: None,
}]);
bitcoin_chain_service.set_transactions(&[&mock_user_lockup_tx_hex]);
}
let persisted_swap = trigger_swap_update!(
"chain",
@@ -3944,8 +3930,8 @@ mod tests {
create_persister!(persister);
let swapper = Arc::new(MockSwapper::new());
let status_stream = Arc::new(MockStatusStream::new());
let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new()));
let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new()));
let liquid_chain_service = Arc::new(MockLiquidChainService::new());
let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new());
let sdk = Arc::new(new_liquid_sdk_with_chain_services(
persister.clone(),
@@ -3968,10 +3954,7 @@ mod tests {
user_lockup_sat,
onchain_fee_increase_sat: fee_increase,
});
bitcoin_chain_service
.lock()
.await
.set_script_balance_sat(user_lockup_sat);
bitcoin_chain_service.set_script_balance_sat(user_lockup_sat);
let persisted_swap = trigger_swap_update!(
"chain",
NewSwapArgs::default()
@@ -4008,8 +3991,8 @@ mod tests {
create_persister!(persister);
let swapper = Arc::new(MockSwapper::new());
let status_stream = Arc::new(MockStatusStream::new());
let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new()));
let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new()));
let liquid_chain_service = Arc::new(MockLiquidChainService::new());
let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new());
let sdk = Arc::new(new_liquid_sdk_with_chain_services(
persister.clone(),
@@ -4038,10 +4021,7 @@ mod tests {
user_lockup_sat,
onchain_fee_increase_sat: fee_increase,
});
bitcoin_chain_service
.lock()
.await
.set_script_balance_sat(user_lockup_sat);
bitcoin_chain_service.set_script_balance_sat(user_lockup_sat);
let persisted_swap = trigger_swap_update!(
"chain",
NewSwapArgs::default()

View File

@@ -10,7 +10,7 @@ use log::{debug, error, info, warn};
use lwk_wollet::elements::{LockTime, Transaction};
use lwk_wollet::hashes::{sha256, Hash};
use sdk_common::prelude::{AesSuccessActionDataResult, SuccessAction, SuccessActionProcessed};
use tokio::sync::{broadcast, Mutex};
use tokio::sync::broadcast;
use crate::chain::liquid::LiquidChainService;
use crate::model::{
@@ -34,7 +34,7 @@ pub(crate) struct SendSwapHandler {
onchain_wallet: Arc<dyn OnchainWallet>,
persister: Arc<Persister>,
swapper: Arc<dyn Swapper>,
chain_service: Arc<Mutex<dyn LiquidChainService>>,
chain_service: Arc<dyn LiquidChainService>,
subscription_notifier: broadcast::Sender<String>,
}
@@ -55,7 +55,7 @@ impl SendSwapHandler {
onchain_wallet: Arc<dyn OnchainWallet>,
persister: Arc<Persister>,
swapper: Arc<dyn Swapper>,
chain_service: Arc<Mutex<dyn LiquidChainService>>,
chain_service: Arc<dyn LiquidChainService>,
) -> Self {
let (subscription_notifier, _) = broadcast::channel::<String>(30);
Self {
@@ -208,7 +208,7 @@ impl SendSwapHandler {
info!("Broadcasting lockup tx {lockup_tx_id} for Send swap {swap_id}",);
let broadcast_result = self.chain_service.lock().await.broadcast(&lockup_tx).await;
let broadcast_result = self.chain_service.broadcast(&lockup_tx).await;
if let Err(err) = broadcast_result {
debug!("Could not broadcast lockup tx for Send Swap {swap_id}: {err:?}");
@@ -384,8 +384,6 @@ impl SendSwapHandler {
// Get tx history of the swap script (lockup address)
let history: Vec<_> = self
.chain_service
.lock()
.await
.get_script_history(&swap_script_pk)
.await?;
@@ -405,8 +403,6 @@ impl SendSwapHandler {
let claim_tx_id = claim_tx_entry.txid;
let claim_tx = self
.chain_service
.lock()
.await
.get_transactions(&[claim_tx_id])
.await
.map_err(|e| anyhow!("Failed to fetch claim txs {claim_tx_id:?}: {e}"))?
@@ -445,13 +441,12 @@ impl SendSwapHandler {
let swap_script = swap.get_swap_script()?;
let refund_address = self.onchain_wallet.next_unused_address().await?.to_string();
let liquid_chain_service = self.chain_service.lock().await;
let script_pk = swap_script
.to_address(self.config.network.into())
.map_err(|e| anyhow!("Could not retrieve address from swap script: {e:?}"))?
.to_unconfidential()
.script_pubkey();
let utxos = liquid_chain_service.get_script_utxos(&script_pk).await?;
let utxos = self.chain_service.get_script_utxos(&script_pk).await?;
let SdkTransaction::Liquid(refund_tx) = self.swapper.create_refund_tx(
Swap::Send(swap.clone()),
&refund_address,
@@ -467,10 +462,7 @@ impl SendSwapHandler {
),
});
};
let refund_tx_id = liquid_chain_service
.broadcast(&refund_tx)
.await?
.to_string();
let refund_tx_id = self.chain_service.broadcast(&refund_tx).await?.to_string();
info!(
"Successfully broadcast refund for Send Swap {}, is_cooperative: {is_cooperative}",

View File

@@ -1,5 +1,7 @@
#![cfg(test)]
use std::sync::Mutex;
use anyhow::Result;
use async_trait::async_trait;
use boltz_client::{
@@ -47,7 +49,7 @@ impl From<MockHistory> for lwk_wollet::History {
#[derive(Default)]
pub(crate) struct MockLiquidChainService {
history: Vec<MockHistory>,
history: Mutex<Vec<MockHistory>>,
}
impl MockLiquidChainService {
@@ -55,15 +57,19 @@ impl MockLiquidChainService {
MockLiquidChainService::default()
}
pub(crate) fn set_history(&mut self, history: Vec<MockHistory>) -> &mut Self {
self.history = history;
pub(crate) fn set_history(&self, history: Vec<MockHistory>) -> &Self {
*self.history.lock().unwrap() = history;
self
}
pub(crate) fn get_history(&self) -> Vec<MockHistory> {
self.history.lock().unwrap().clone()
}
}
#[async_trait]
impl LiquidChainService for MockLiquidChainService {
async fn tip(&mut self) -> Result<u32> {
async fn tip(&self) -> Result<u32> {
Ok(0)
}
@@ -92,7 +98,7 @@ impl LiquidChainService for MockLiquidChainService {
&self,
_scripts: &ElementsScript,
) -> Result<Vec<lwk_wollet::History>> {
Ok(self.history.clone().into_iter().map(Into::into).collect())
Ok(self.get_history().into_iter().map(Into::into).collect())
}
async fn get_script_history_with_retry(
@@ -100,7 +106,7 @@ impl LiquidChainService for MockLiquidChainService {
_script: &ElementsScript,
_retries: u64,
) -> Result<Vec<lwk_wollet::History>> {
Ok(self.history.clone().into_iter().map(Into::into).collect())
Ok(self.get_history().into_iter().map(Into::into).collect())
}
async fn get_scripts_history(&self, _scripts: &[&ElementsScript]) -> Result<Vec<Vec<History>>> {
@@ -126,42 +132,42 @@ impl LiquidChainService for MockLiquidChainService {
}
pub(crate) struct MockBitcoinChainService {
history: Vec<MockHistory>,
txs: Vec<boltz_client::bitcoin::Transaction>,
script_balance_sat: u64,
history: Mutex<Vec<MockHistory>>,
txs: Mutex<Vec<boltz_client::bitcoin::Transaction>>,
script_balance_sat: Mutex<u64>,
}
impl MockBitcoinChainService {
pub(crate) fn new() -> Self {
MockBitcoinChainService {
history: vec![],
txs: vec![],
script_balance_sat: 0,
history: Mutex::new(vec![]),
txs: Mutex::new(vec![]),
script_balance_sat: Mutex::new(0),
}
}
pub(crate) fn set_history(&mut self, history: Vec<MockHistory>) -> &mut Self {
self.history = history;
pub(crate) fn set_history(&self, history: Vec<MockHistory>) -> &Self {
*self.history.lock().unwrap() = history;
self
}
pub(crate) fn set_transactions(&mut self, txs: &[&str]) -> &mut Self {
self.txs = txs
pub(crate) fn set_transactions(&self, txs: &[&str]) -> &Self {
*self.txs.lock().unwrap() = txs
.iter()
.map(|tx_hex| deserialize(&Vec::<u8>::from_hex(tx_hex).unwrap()).unwrap())
.collect();
self
}
pub(crate) fn set_script_balance_sat(&mut self, script_balance_sat: u64) -> &mut Self {
self.script_balance_sat = script_balance_sat;
pub(crate) fn set_script_balance_sat(&self, script_balance_sat: u64) -> &Self {
*self.script_balance_sat.lock().unwrap() = script_balance_sat;
self
}
}
#[async_trait]
impl BitcoinChainService for MockBitcoinChainService {
fn tip(&mut self) -> Result<HeaderNotification> {
fn tip(&self) -> Result<HeaderNotification> {
Ok(HeaderNotification {
height: 0,
header: genesis_block(lwk_wollet::bitcoin::Network::Testnet).header,
@@ -179,11 +185,18 @@ impl BitcoinChainService for MockBitcoinChainService {
&self,
_txids: &[boltz_client::bitcoin::Txid],
) -> Result<Vec<boltz_client::bitcoin::Transaction>> {
Ok(self.txs.clone())
Ok(self.txs.lock().unwrap().clone())
}
fn get_script_history(&self, _script: &Script) -> Result<Vec<lwk_wollet::History>> {
Ok(self.history.clone().into_iter().map(Into::into).collect())
Ok(self
.history
.lock()
.unwrap()
.clone()
.into_iter()
.map(Into::into)
.collect())
}
async fn get_script_history_with_retry(
@@ -191,7 +204,14 @@ impl BitcoinChainService for MockBitcoinChainService {
_script: &Script,
_retries: u64,
) -> Result<Vec<lwk_wollet::History>> {
Ok(self.history.clone().into_iter().map(Into::into).collect())
Ok(self
.history
.lock()
.unwrap()
.clone()
.into_iter()
.map(Into::into)
.collect())
}
fn get_scripts_history(&self, _scripts: &[&Script]) -> Result<Vec<Vec<History>>> {
@@ -227,7 +247,7 @@ impl BitcoinChainService for MockBitcoinChainService {
_retries: u64,
) -> Result<electrum_client::GetBalanceRes> {
Ok(GetBalanceRes {
confirmed: self.script_balance_sat,
confirmed: self.script_balance_sat.lock().unwrap().clone(),
unconfirmed: 0,
})
}

View File

@@ -6,8 +6,6 @@ use lazy_static::lazy_static;
use sdk_common::bitcoin::{consensus::deserialize, Transaction};
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::{
chain_swap::ChainSwapHandler,
model::{ChainSwap, Config, Direction, PaymentState, Signer},
@@ -34,8 +32,8 @@ pub(crate) fn new_chain_swap_handler(persister: Arc<Persister>) -> Result<ChainS
let signer: Arc<Box<dyn Signer>> = Arc::new(Box::new(MockSigner::new()?));
let onchain_wallet = Arc::new(MockWallet::new(signer)?);
let swapper = Arc::new(BoltzSwapper::new(config.clone(), None));
let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new()));
let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new()));
let liquid_chain_service = Arc::new(MockLiquidChainService::new());
let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new());
ChainSwapHandler::new(
config,

View File

@@ -3,8 +3,6 @@
use anyhow::Result;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::{
model::{Config, Signer},
persist::Persister,
@@ -22,7 +20,7 @@ pub(crate) fn new_receive_swap_handler(persister: Arc<Persister>) -> Result<Rece
let signer: Arc<Box<dyn Signer>> = Arc::new(Box::new(MockSigner::new()?));
let onchain_wallet = Arc::new(MockWallet::new(signer)?);
let swapper = Arc::new(MockSwapper::default());
let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new()));
let liquid_chain_service = Arc::new(MockLiquidChainService::new());
Ok(ReceiveSwapHandler::new(
config,

View File

@@ -1,7 +1,6 @@
use std::sync::Arc;
use anyhow::Result;
use tokio::sync::Mutex;
use crate::{
model::Signer, recover::recoverer::Recoverer, swapper::Swapper, wallet::OnchainWallet,
@@ -14,8 +13,8 @@ pub(crate) fn new_recoverer(
swapper: Arc<dyn Swapper>,
onchain_wallet: Arc<dyn OnchainWallet>,
) -> Result<Recoverer> {
let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new()));
let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new()));
let liquid_chain_service = Arc::new(MockLiquidChainService::new());
let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new());
Recoverer::new(
signer.slip77_master_blinding_key()?,

View File

@@ -4,7 +4,7 @@ use anyhow::{anyhow, Result};
use sdk_common::prelude::{BreezServer, STAGING_BREEZSERVER_URL};
use std::sync::Arc;
use tokio::sync::{watch, Mutex, RwLock};
use tokio::sync::{watch, RwLock};
use crate::{
buy::BuyBitcoinService,
@@ -31,8 +31,8 @@ pub(crate) fn new_liquid_sdk(
swapper: Arc<MockSwapper>,
status_stream: Arc<MockStatusStream>,
) -> Result<LiquidSdk> {
let liquid_chain_service = Arc::new(Mutex::new(MockLiquidChainService::new()));
let bitcoin_chain_service = Arc::new(Mutex::new(MockBitcoinChainService::new()));
let liquid_chain_service = Arc::new(MockLiquidChainService::new());
let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new());
new_liquid_sdk_with_chain_services(
persister,
@@ -48,8 +48,8 @@ pub(crate) fn new_liquid_sdk_with_chain_services(
persister: Arc<Persister>,
swapper: Arc<MockSwapper>,
status_stream: Arc<MockStatusStream>,
liquid_chain_service: Arc<Mutex<MockLiquidChainService>>,
bitcoin_chain_service: Arc<Mutex<MockBitcoinChainService>>,
liquid_chain_service: Arc<MockLiquidChainService>,
bitcoin_chain_service: Arc<MockBitcoinChainService>,
onchain_fee_rate_leeway_sat_per_vbyte: Option<u32>,
) -> Result<LiquidSdk> {
let mut config = Config::testnet(None);

View File

@@ -8,7 +8,6 @@ use crate::{
send_swap::SendSwapHandler,
};
use anyhow::Result;
use tokio::sync::Mutex;
use super::{
chain::MockLiquidChainService,
@@ -21,7 +20,7 @@ pub(crate) fn new_send_swap_handler(persister: Arc<Persister>) -> Result<SendSwa
let signer: Arc<Box<dyn Signer>> = Arc::new(Box::new(MockSigner::new()?));
let onchain_wallet = Arc::new(MockWallet::new(signer)?);
let swapper = Arc::new(MockSwapper::default());
let chain_service = Arc::new(Mutex::new(MockLiquidChainService::new()));
let chain_service = Arc::new(MockLiquidChainService::new());
Ok(SendSwapHandler::new(
config,