diff --git a/lib/core/Cargo.toml b/lib/core/Cargo.toml index 216744e..766cbc4 100644 --- a/lib/core/Cargo.toml +++ b/lib/core/Cargo.toml @@ -39,7 +39,7 @@ url = "2.5.0" futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] } async-trait = "0.1.80" hex = "0.4" -reqwest = { version = "=0.11.20", features = ["json"] } +reqwest = { version = "=0.11.20", features = ["json", "blocking"] } electrum-client = { version = "0.19.0" } [dev-dependencies] diff --git a/lib/core/src/chain/liquid.rs b/lib/core/src/chain/liquid.rs new file mode 100644 index 0000000..3bb041f --- /dev/null +++ b/lib/core/src/chain/liquid.rs @@ -0,0 +1,128 @@ +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use boltz_client::ToHex; +use log::info; +use lwk_wollet::{ + elements::{pset::serialize::Serialize, BlockHash, Script, Transaction, Txid}, + hashes::{sha256, Hash}, + BlockchainBackend, ElectrumClient, ElectrumUrl, History, +}; +use reqwest::Response; +use serde::Deserialize; +use std::str::FromStr; + +use crate::model::Config; + +const LIQUID_ESPLORA_URL: &str = "https://lq1.breez.technology/liquid/api"; + +#[async_trait] +pub trait LiquidChainService: Send + Sync { + /// Get the blockchain latest block + async fn tip(&mut self) -> Result; + + /// Broadcast a transaction + async fn broadcast(&self, tx: &Transaction, swap_id: Option<&str>) -> Result; + + /// Get a list of transactions + async fn get_transactions(&self, txids: &[Txid]) -> Result>; + + /// Get the transactions involved in a list of scripts including lowball + async fn get_script_history(&self, scripts: &Script) -> Result>; +} + +#[derive(Deserialize)] +struct EsploraTx { + txid: Txid, + status: Status, +} + +// TODO some of this fields may be Option in unconfirmed + +#[derive(Deserialize)] +struct Status { + block_height: Option, + block_hash: Option, +} + +pub(crate) struct HybridLiquidChainService { + electrum_client: ElectrumClient, +} + +impl HybridLiquidChainService { + pub(crate) fn new(config: Config) -> Result { + let electrum_client = + ElectrumClient::new(&ElectrumUrl::new(&config.liquid_electrum_url, true, true))?; + Ok(Self { electrum_client }) + } +} + +#[async_trait] +impl LiquidChainService for HybridLiquidChainService { + async fn tip(&mut self) -> Result { + Ok(self.electrum_client.tip()?.height) + } + + async fn broadcast(&self, tx: &Transaction, swap_id: Option<&str>) -> Result { + let tx_bytes = tx.serialize(); + info!("tx: {}", tx_bytes.to_hex()); + let client = reqwest::Client::new(); + let response = client + .post(format!("{LIQUID_ESPLORA_URL}/tx")) + .header("Swap-ID", swap_id.unwrap_or_default()) + .body(tx_bytes.to_hex()) + .send() + .await?; + let txid = Txid::from_str(&response.text().await?)?; + Ok(txid) + } + + async fn get_transactions(&self, txids: &[Txid]) -> Result> { + Ok(self.electrum_client.get_transactions(txids)?) + } + + async fn get_script_history(&self, script: &Script) -> Result> { + let script = lwk_wollet::elements::bitcoin::Script::from_bytes(script.as_bytes()); + let script_hash = sha256::Hash::hash(script.as_bytes()) + .to_byte_array() + .to_hex(); + let url = format!("{}/scripthash/{}/txs", LIQUID_ESPLORA_URL, script_hash); + // TODO must handle paging -> https://github.com/blockstream/esplora/blob/master/API.md#addresses + let response = get_with_retry(&url, 2).await?; + let json: Vec = response.json().await?; + + let history: Vec = json.into_iter().map(Into::into).collect(); + Ok(history) + } +} + +async fn get_with_retry(url: &str, retries: usize) -> Result { + let mut attempt = 0; + loop { + info!("liquid chain service get_with_retry for url {url}"); + let response = reqwest::get(url).await?; + attempt += 1; + // 429 Too many requests + // 503 Service Temporarily Unavailable + if response.status() == 429 || response.status() == 503 { + if attempt >= retries { + return Err(anyhow!("Too many retry".to_string())); + } + let secs = 1 << attempt; + + std::thread::sleep(std::time::Duration::from_secs(secs)); + } else { + return Ok(response); + } + } +} + +impl From for History { + fn from(value: EsploraTx) -> Self { + let status = value.status; + History { + txid: value.txid, + height: status.block_height.unwrap_or_default(), + block_hash: status.block_hash, + } + } +} diff --git a/lib/core/src/chain/mod.rs b/lib/core/src/chain/mod.rs index c95281b..9845a46 100644 --- a/lib/core/src/chain/mod.rs +++ b/lib/core/src/chain/mod.rs @@ -1,6 +1,2 @@ pub(crate) mod bitcoin; - -use lwk_wollet::{BlockchainBackend, ElectrumClient}; - -pub(crate) trait ChainService: Send + Sync + BlockchainBackend {} -impl ChainService for ElectrumClient {} +pub(crate) mod liquid; diff --git a/lib/core/src/chain_swap.rs b/lib/core/src/chain_swap.rs index 36ad7d4..50a454d 100644 --- a/lib/core/src/chain_swap.rs +++ b/lib/core/src/chain_swap.rs @@ -9,7 +9,7 @@ use lwk_wollet::ElectrumUrl; use tokio::sync::{broadcast, Mutex}; use crate::chain::bitcoin::{BitcoinChainService, ElectrumClient}; -use crate::chain::ChainService; +use crate::chain::liquid::LiquidChainService; use crate::model::PaymentState::{Complete, Created, Failed, Pending, TimedOut}; use crate::model::{ChainSwap, Config, Direction, PaymentTxData, PaymentType}; use crate::swapper::Swapper; @@ -20,7 +20,7 @@ pub(crate) struct ChainSwapStateHandler { onchain_wallet: Arc, persister: Arc, swapper: Arc, - liquid_chain_service: Arc>, + liquid_chain_service: Arc>, bitcoin_chain_service: Arc>, subscription_notifier: broadcast::Sender, } @@ -31,7 +31,7 @@ impl ChainSwapStateHandler { onchain_wallet: Arc, persister: Arc, swapper: Arc, - liquid_chain_service: Arc>, + liquid_chain_service: Arc>, ) -> Result { let (subscription_notifier, _) = broadcast::channel::(30); let bitcoin_chain_service = Arc::new(Mutex::new(ElectrumClient::new(&ElectrumUrl::new( @@ -331,7 +331,8 @@ impl ChainSwapStateHandler { .liquid_chain_service .lock() .await - .broadcast(&lockup_tx)? + .broadcast(&lockup_tx, Some(swap_id)) + .await? .to_string(); debug!( @@ -444,7 +445,7 @@ impl ChainSwapStateHandler { let current_height = match swap.direction { Direction::Incoming => self.bitcoin_chain_service.lock().await.tip()?.height as u32, - Direction::Outgoing => self.liquid_chain_service.lock().await.tip()?.height, + Direction::Outgoing => self.liquid_chain_service.lock().await.tip().await?, }; let output_address = self.onchain_wallet.next_unused_address().await?.to_string(); diff --git a/lib/core/src/receive_swap.rs b/lib/core/src/receive_swap.rs index 5927b5a..f642a1e 100644 --- a/lib/core/src/receive_swap.rs +++ b/lib/core/src/receive_swap.rs @@ -2,10 +2,16 @@ use std::{str::FromStr, sync::Arc}; use anyhow::{anyhow, Result}; use boltz_client::swaps::boltz::RevSwapStates; -use boltz_client::swaps::boltzv2; +use boltz_client::swaps::boltzv2::{self, SwapUpdateTxDetails}; +use boltz_client::ToHex; use log::{debug, error, info, warn}; -use tokio::sync::broadcast; +use lwk_wollet::elements::hex::FromHex; +use lwk_wollet::elements::Script; +use lwk_wollet::hashes::{sha256, Hash}; +use lwk_wollet::History; +use tokio::sync::{broadcast, Mutex}; +use crate::chain::liquid::LiquidChainService; use crate::model::PaymentState::{Complete, Created, Failed, Pending, TimedOut}; use crate::model::{Config, PaymentTxData, PaymentType, ReceiveSwap}; use crate::{ensure_sdk, utils}; @@ -26,6 +32,7 @@ pub(crate) struct ReceiveSwapStateHandler { persister: Arc, swapper: Arc, subscription_notifier: broadcast::Sender, + liquid_chain_service: Arc>, } impl ReceiveSwapStateHandler { @@ -34,6 +41,7 @@ impl ReceiveSwapStateHandler { onchain_wallet: Arc, persister: Arc, swapper: Arc, + liquid_chain_service: Arc>, ) -> Self { let (subscription_notifier, _) = broadcast::channel::(30); Self { @@ -42,6 +50,7 @@ impl ReceiveSwapStateHandler { persister, swapper, subscription_notifier, + liquid_chain_service, } } @@ -88,6 +97,15 @@ impl ReceiveSwapStateHandler { )); } + // looking for lockup script history to verify lockup was broadcasted + if let Err(e) = self.verify_lockup_tx(&receive_swap, &transaction).await { + return Err(anyhow!( + "swapper mempool reported lockup could not be verified. txid: {}, err: {}", + transaction.id, + e + )); + } + info!("swapper lockup was verified"); let lockup_tx = utils::deserialize_tx_hex(&transaction.hex)?; // If the amount is greater than the zero-conf limit @@ -142,6 +160,19 @@ impl ReceiveSwapStateHandler { // return Err(anyhow!("Tx state mismatch: Lockup transaction was marked as confirmed by the swapper, but isn't.")); // } + let Some(transaction) = update.transaction.clone() else { + return Err(anyhow!("Unexpected payload from Boltz status stream")); + }; + // looking for lockup script history to verify lockup was broadcasted + if let Err(e) = self.verify_lockup_tx(&receive_swap, &transaction).await { + return Err(anyhow!( + "swapper reported lockup could not be verified. txid: {}, err: {}", + transaction.id, + e + )); + } + info!("swapper lockup was verified, moving to claim"); + match receive_swap.claim_tx_id { Some(claim_tx_id) => { warn!("Claim tx for Receive Swap {id} was already broadcast: txid {claim_tx_id}") @@ -270,6 +301,78 @@ impl ReceiveSwapStateHandler { (_, Failed) => Ok(()), } } + + async fn verify_lockup_tx( + &self, + receive_swap: &ReceiveSwap, + swap_update_tx: &SwapUpdateTxDetails, + ) -> Result<()> { + // looking for lockup script history to verify lockup was broadcasted + let script_history = self.lockup_script_history(receive_swap).await?; + let lockup_tx_history = script_history + .iter() + .find(|h| h.txid.to_hex().eq(&swap_update_tx.id)); + + if lockup_tx_history.is_none() { + return Err(anyhow!( + "swapper lockup wasn't found, reported txid={} waiting for confirmation", + swap_update_tx.id, + )); + } + + info!("swapper lockup found, verifying transaction content..."); + + let lockup_tx = utils::deserialize_tx_hex(&swap_update_tx.hex)?; + if !lockup_tx + .txid() + .to_hex() + .eq(&lockup_tx_history.unwrap().txid.to_hex()) + { + return Err(anyhow!( + "swapper reported txid and transaction hex do not match: {} vs {}", + swap_update_tx.id, + lockup_tx.txid().to_hex() + )); + } + Ok(()) + } + + async fn lockup_script_history(&self, receive_swap: &ReceiveSwap) -> Result> { + let script = receive_swap.get_swap_script()?; + let address = + script + .to_address(self.config.network.into()) + .map_err(|e| PaymentError::Generic { + err: format!("failed to get swap script address {e:?}"), + })?; + let sc = Script::from_hex( + hex::encode(address.to_unconfidential().script_pubkey().as_bytes()).as_str(), + ) + .map_err(|e| PaymentError::Generic { + err: format!("failed to get swap script address {e:?}"), + })?; + + let script_hash = sha256::Hash::hash(sc.as_bytes()).to_byte_array().to_hex(); + info!("fetching script history for {}", script_hash); + let mut script_history = vec![]; + + let mut retries = 1; + while script_history.is_empty() && retries < 5 { + script_history = self + .liquid_chain_service + .lock() + .await + .get_script_history(&sc) + .await?; + info!( + "script history for {} got zero transactions, retrying in {} seconds...", + script_hash, retries + ); + std::thread::sleep(std::time::Duration::from_secs(retries)); + retries += 1; + } + Ok(script_history) + } } #[cfg(test)] diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index 7da0f74..0d19de3 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -13,7 +13,6 @@ use lwk_wollet::bitcoin::hex::DisplayHex; use lwk_wollet::hashes::{sha256, Hash}; use lwk_wollet::secp256k1::ThirtyTwoByteHash; use lwk_wollet::{elements::LockTime, ElementsNetwork}; -use lwk_wollet::{ElectrumClient, ElectrumUrl}; use sdk_common::bitcoin::secp256k1::Secp256k1; use sdk_common::bitcoin::util::bip32::ChildNumber; use tokio::sync::{watch, Mutex, RwLock}; @@ -21,7 +20,7 @@ use tokio::time::MissedTickBehavior; use tokio_stream::wrappers::BroadcastStream; use url::Url; -use crate::chain::ChainService; +use crate::chain::liquid::{HybridLiquidChainService, LiquidChainService}; use crate::chain_swap::ChainSwapStateHandler; use crate::error::LiquidSdkError; use crate::model::PaymentState::*; @@ -46,7 +45,7 @@ pub struct LiquidSdk { event_manager: Arc, status_stream: Arc, swapper: Arc, - chain_service: Arc>, + chain_service: Arc>, is_started: RwLock, shutdown_sender: watch::Sender<()>, shutdown_receiver: watch::Receiver<()>, @@ -76,11 +75,7 @@ impl LiquidSdk { let swapper = Arc::new(BoltzSwapper::new(config.clone())); let status_stream = Arc::::from(swapper.create_status_stream()); - let chain_service = Arc::new(Mutex::new(ElectrumClient::new(&ElectrumUrl::new( - &config.liquid_electrum_url, - true, - true, - ))?)); + let chain_service = Arc::new(Mutex::new(HybridLiquidChainService::new(config.clone())?)); let onchain_wallet = Arc::new(LiquidOnchainWallet::new(mnemonic, config.clone())?); @@ -97,6 +92,7 @@ impl LiquidSdk { onchain_wallet.clone(), persister.clone(), swapper.clone(), + chain_service.clone(), ); let chain_swap_state_handler = ChainSwapStateHandler::new( @@ -303,7 +299,7 @@ impl LiquidSdk { match chain_swap.direction { Direction::Outgoing => { let swap_script = chain_swap.get_lockup_swap_script()?.as_liquid_script()?; - let current_height = self.chain_service.lock().await.tip()?.height; + let current_height = self.chain_service.lock().await.tip().await?; let locktime_from_height = LockTime::from_height(current_height)?; info!("Checking Chain Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", chain_swap.id, swap_script.locktime); @@ -331,7 +327,7 @@ impl LiquidSdk { async fn check_send_swap_expiration(&self, send_swap: &SendSwap) -> Result<()> { if send_swap.lockup_tx_id.is_some() && send_swap.refund_tx_id.is_none() { let swap_script = send_swap.get_swap_script()?; - let current_height = self.chain_service.lock().await.tip()?.height; + let current_height = self.chain_service.lock().await.tip().await?; let locktime_from_height = LockTime::from_height(current_height)?; info!("Checking Send Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", send_swap.id, swap_script.locktime); @@ -569,7 +565,7 @@ impl LiquidSdk { async fn estimate_onchain_tx_fee(&self, amount_sat: u64, address: &str) -> Result { Ok(self .onchain_wallet - .build_tx(None, address, amount_sat) + .build_tx(Some(10.0), address, amount_sat) .await? .all_fees() .values() diff --git a/lib/core/src/send_swap.rs b/lib/core/src/send_swap.rs index 134d013..f1c7ae5 100644 --- a/lib/core/src/send_swap.rs +++ b/lib/core/src/send_swap.rs @@ -11,7 +11,7 @@ use lwk_wollet::elements::Transaction; use lwk_wollet::hashes::{sha256, Hash}; use tokio::sync::{broadcast, Mutex}; -use crate::chain::ChainService; +use crate::chain::liquid::LiquidChainService; use crate::model::PaymentState::{Complete, Created, Failed, Pending, TimedOut}; use crate::model::{Config, SendSwap}; use crate::swapper::Swapper; @@ -28,7 +28,7 @@ pub(crate) struct SendSwapStateHandler { onchain_wallet: Arc, persister: Arc, swapper: Arc, - chain_service: Arc>, + chain_service: Arc>, subscription_notifier: broadcast::Sender, } @@ -38,7 +38,7 @@ impl SendSwapStateHandler { onchain_wallet: Arc, persister: Arc, swapper: Arc, - chain_service: Arc>, + chain_service: Arc>, ) -> Self { let (subscription_notifier, _) = broadcast::channel::(30); Self { @@ -196,20 +196,22 @@ impl SendSwapStateHandler { let lockup_tx = self .onchain_wallet .build_tx( - None, + Some(10.0), &create_response.address, create_response.expected_amount, ) .await?; + info!("broadcasting lockup tx {}", lockup_tx.txid()); let lockup_tx_id = self .chain_service .lock() .await - .broadcast(&lockup_tx)? + .broadcast(&lockup_tx, Some(swap_id)) + .await? .to_string(); - debug!("Successfully broadcast lockup tx for Send Swap {swap_id}. Lockup tx id: {lockup_tx_id}"); + info!("Successfully broadcast lockup tx for Send Swap {swap_id}. Lockup tx id: {lockup_tx_id}"); Ok(lockup_tx) } @@ -285,10 +287,8 @@ impl SendSwapStateHandler { .chain_service .lock() .await - .get_scripts_history(&[&swap_script_pk])? - .into_iter() - .flatten() - .collect(); + .get_script_history(&swap_script_pk) + .await?; // We expect at most 2 txs: lockup and maybe the claim ensure_sdk!( @@ -311,6 +311,7 @@ impl SendSwapStateHandler { .lock() .await .get_transactions(&[claim_tx_id]) + .await .map_err(|e| anyhow!("Failed to fetch claim tx {claim_tx_id}: {e}"))? .first() .cloned() diff --git a/lib/core/src/test_utils.rs b/lib/core/src/test_utils.rs index 6909d83..579f968 100644 --- a/lib/core/src/test_utils.rs +++ b/lib/core/src/test_utils.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use crate::{ + chain::liquid::HybridLiquidChainService, chain_swap::ChainSwapStateHandler, model::{ ChainSwap, Config, Direction, LiquidNetwork, PaymentState, PaymentTxData, PaymentType, @@ -17,7 +18,6 @@ use crate::{ use anyhow::{anyhow, Result}; use bip39::rand::{self, distributions::Alphanumeric, Rng}; -use lwk_wollet::{ElectrumClient, ElectrumUrl}; use tempdir::TempDir; use tokio::sync::Mutex; @@ -30,11 +30,7 @@ pub(crate) fn new_send_swap_state_handler( let config = Config::testnet(); let onchain_wallet = Arc::new(new_onchain_wallet(&config)?); let swapper = Arc::new(BoltzSwapper::new(config.clone())); - let chain_service = Arc::new(Mutex::new(ElectrumClient::new(&ElectrumUrl::new( - &config.liquid_electrum_url, - true, - true, - ))?)); + let chain_service = Arc::new(Mutex::new(HybridLiquidChainService::new(config.clone())?)); Ok(SendSwapStateHandler::new( config, @@ -51,12 +47,14 @@ pub(crate) fn new_receive_swap_state_handler( let config = Config::testnet(); let onchain_wallet = Arc::new(new_onchain_wallet(&config)?); let swapper = Arc::new(BoltzSwapper::new(config.clone())); + let liquid_chain_service = Arc::new(Mutex::new(HybridLiquidChainService::new(config.clone())?)); Ok(ReceiveSwapStateHandler::new( config, onchain_wallet, persister, swapper, + liquid_chain_service, )) } @@ -66,11 +64,7 @@ pub(crate) fn new_chain_swap_state_handler( let config = Config::testnet(); let onchain_wallet = Arc::new(new_onchain_wallet(&config)?); let swapper = Arc::new(BoltzSwapper::new(config.clone())); - let liquid_chain_service = Arc::new(Mutex::new(ElectrumClient::new(&ElectrumUrl::new( - &config.liquid_electrum_url, - true, - true, - ))?)); + let liquid_chain_service = Arc::new(Mutex::new(HybridLiquidChainService::new(config.clone())?)); ChainSwapStateHandler::new( config,