Use lowball node for broadcast and verify swapper lockup tx

This commit is contained in:
Roei Erez
2024-06-20 23:53:16 +03:00
parent bcb4743260
commit 1a454ece19
8 changed files with 264 additions and 45 deletions

View File

@@ -39,7 +39,7 @@ url = "2.5.0"
futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] }
async-trait = "0.1.80"
hex = "0.4"
reqwest = { version = "=0.11.20", features = ["json"] }
reqwest = { version = "=0.11.20", features = ["json", "blocking"] }
electrum-client = { version = "0.19.0" }
[dev-dependencies]

View File

@@ -0,0 +1,128 @@
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use boltz_client::ToHex;
use log::info;
use lwk_wollet::{
elements::{pset::serialize::Serialize, BlockHash, Script, Transaction, Txid},
hashes::{sha256, Hash},
BlockchainBackend, ElectrumClient, ElectrumUrl, History,
};
use reqwest::Response;
use serde::Deserialize;
use std::str::FromStr;
use crate::model::Config;
const LIQUID_ESPLORA_URL: &str = "https://lq1.breez.technology/liquid/api";
#[async_trait]
pub trait LiquidChainService: Send + Sync {
/// Get the blockchain latest block
async fn tip(&mut self) -> Result<u32>;
/// Broadcast a transaction
async fn broadcast(&self, tx: &Transaction, swap_id: Option<&str>) -> Result<Txid>;
/// Get a list of transactions
async fn get_transactions(&self, txids: &[Txid]) -> Result<Vec<Transaction>>;
/// Get the transactions involved in a list of scripts including lowball
async fn get_script_history(&self, scripts: &Script) -> Result<Vec<History>>;
}
#[derive(Deserialize)]
struct EsploraTx {
txid: Txid,
status: Status,
}
// TODO some of this fields may be Option in unconfirmed
#[derive(Deserialize)]
struct Status {
block_height: Option<i32>,
block_hash: Option<BlockHash>,
}
pub(crate) struct HybridLiquidChainService {
electrum_client: ElectrumClient,
}
impl HybridLiquidChainService {
pub(crate) fn new(config: Config) -> Result<Self> {
let electrum_client =
ElectrumClient::new(&ElectrumUrl::new(&config.liquid_electrum_url, true, true))?;
Ok(Self { electrum_client })
}
}
#[async_trait]
impl LiquidChainService for HybridLiquidChainService {
async fn tip(&mut self) -> Result<u32> {
Ok(self.electrum_client.tip()?.height)
}
async fn broadcast(&self, tx: &Transaction, swap_id: Option<&str>) -> Result<Txid> {
let tx_bytes = tx.serialize();
info!("tx: {}", tx_bytes.to_hex());
let client = reqwest::Client::new();
let response = client
.post(format!("{LIQUID_ESPLORA_URL}/tx"))
.header("Swap-ID", swap_id.unwrap_or_default())
.body(tx_bytes.to_hex())
.send()
.await?;
let txid = Txid::from_str(&response.text().await?)?;
Ok(txid)
}
async fn get_transactions(&self, txids: &[Txid]) -> Result<Vec<Transaction>> {
Ok(self.electrum_client.get_transactions(txids)?)
}
async fn get_script_history(&self, script: &Script) -> Result<Vec<History>> {
let script = lwk_wollet::elements::bitcoin::Script::from_bytes(script.as_bytes());
let script_hash = sha256::Hash::hash(script.as_bytes())
.to_byte_array()
.to_hex();
let url = format!("{}/scripthash/{}/txs", LIQUID_ESPLORA_URL, script_hash);
// TODO must handle paging -> https://github.com/blockstream/esplora/blob/master/API.md#addresses
let response = get_with_retry(&url, 2).await?;
let json: Vec<EsploraTx> = response.json().await?;
let history: Vec<History> = json.into_iter().map(Into::into).collect();
Ok(history)
}
}
async fn get_with_retry(url: &str, retries: usize) -> Result<Response> {
let mut attempt = 0;
loop {
info!("liquid chain service get_with_retry for url {url}");
let response = reqwest::get(url).await?;
attempt += 1;
// 429 Too many requests
// 503 Service Temporarily Unavailable
if response.status() == 429 || response.status() == 503 {
if attempt >= retries {
return Err(anyhow!("Too many retry".to_string()));
}
let secs = 1 << attempt;
std::thread::sleep(std::time::Duration::from_secs(secs));
} else {
return Ok(response);
}
}
}
impl From<EsploraTx> for History {
fn from(value: EsploraTx) -> Self {
let status = value.status;
History {
txid: value.txid,
height: status.block_height.unwrap_or_default(),
block_hash: status.block_hash,
}
}
}

View File

@@ -1,6 +1,2 @@
pub(crate) mod bitcoin;
use lwk_wollet::{BlockchainBackend, ElectrumClient};
pub(crate) trait ChainService: Send + Sync + BlockchainBackend {}
impl ChainService for ElectrumClient {}
pub(crate) mod liquid;

View File

@@ -9,7 +9,7 @@ use lwk_wollet::ElectrumUrl;
use tokio::sync::{broadcast, Mutex};
use crate::chain::bitcoin::{BitcoinChainService, ElectrumClient};
use crate::chain::ChainService;
use crate::chain::liquid::LiquidChainService;
use crate::model::PaymentState::{Complete, Created, Failed, Pending, TimedOut};
use crate::model::{ChainSwap, Config, Direction, PaymentTxData, PaymentType};
use crate::swapper::Swapper;
@@ -20,7 +20,7 @@ pub(crate) struct ChainSwapStateHandler {
onchain_wallet: Arc<dyn OnchainWallet>,
persister: Arc<Persister>,
swapper: Arc<dyn Swapper>,
liquid_chain_service: Arc<Mutex<dyn ChainService>>,
liquid_chain_service: Arc<Mutex<dyn LiquidChainService>>,
bitcoin_chain_service: Arc<Mutex<dyn BitcoinChainService>>,
subscription_notifier: broadcast::Sender<String>,
}
@@ -31,7 +31,7 @@ impl ChainSwapStateHandler {
onchain_wallet: Arc<dyn OnchainWallet>,
persister: Arc<Persister>,
swapper: Arc<dyn Swapper>,
liquid_chain_service: Arc<Mutex<dyn ChainService>>,
liquid_chain_service: Arc<Mutex<dyn LiquidChainService>>,
) -> Result<Self> {
let (subscription_notifier, _) = broadcast::channel::<String>(30);
let bitcoin_chain_service = Arc::new(Mutex::new(ElectrumClient::new(&ElectrumUrl::new(
@@ -331,7 +331,8 @@ impl ChainSwapStateHandler {
.liquid_chain_service
.lock()
.await
.broadcast(&lockup_tx)?
.broadcast(&lockup_tx, Some(swap_id))
.await?
.to_string();
debug!(
@@ -444,7 +445,7 @@ impl ChainSwapStateHandler {
let current_height = match swap.direction {
Direction::Incoming => self.bitcoin_chain_service.lock().await.tip()?.height as u32,
Direction::Outgoing => self.liquid_chain_service.lock().await.tip()?.height,
Direction::Outgoing => self.liquid_chain_service.lock().await.tip().await?,
};
let output_address = self.onchain_wallet.next_unused_address().await?.to_string();

View File

@@ -2,10 +2,16 @@ use std::{str::FromStr, sync::Arc};
use anyhow::{anyhow, Result};
use boltz_client::swaps::boltz::RevSwapStates;
use boltz_client::swaps::boltzv2;
use boltz_client::swaps::boltzv2::{self, SwapUpdateTxDetails};
use boltz_client::ToHex;
use log::{debug, error, info, warn};
use tokio::sync::broadcast;
use lwk_wollet::elements::hex::FromHex;
use lwk_wollet::elements::Script;
use lwk_wollet::hashes::{sha256, Hash};
use lwk_wollet::History;
use tokio::sync::{broadcast, Mutex};
use crate::chain::liquid::LiquidChainService;
use crate::model::PaymentState::{Complete, Created, Failed, Pending, TimedOut};
use crate::model::{Config, PaymentTxData, PaymentType, ReceiveSwap};
use crate::{ensure_sdk, utils};
@@ -26,6 +32,7 @@ pub(crate) struct ReceiveSwapStateHandler {
persister: Arc<Persister>,
swapper: Arc<dyn Swapper>,
subscription_notifier: broadcast::Sender<String>,
liquid_chain_service: Arc<Mutex<dyn LiquidChainService>>,
}
impl ReceiveSwapStateHandler {
@@ -34,6 +41,7 @@ impl ReceiveSwapStateHandler {
onchain_wallet: Arc<dyn OnchainWallet>,
persister: Arc<Persister>,
swapper: Arc<dyn Swapper>,
liquid_chain_service: Arc<Mutex<dyn LiquidChainService>>,
) -> Self {
let (subscription_notifier, _) = broadcast::channel::<String>(30);
Self {
@@ -42,6 +50,7 @@ impl ReceiveSwapStateHandler {
persister,
swapper,
subscription_notifier,
liquid_chain_service,
}
}
@@ -88,6 +97,15 @@ impl ReceiveSwapStateHandler {
));
}
// looking for lockup script history to verify lockup was broadcasted
if let Err(e) = self.verify_lockup_tx(&receive_swap, &transaction).await {
return Err(anyhow!(
"swapper mempool reported lockup could not be verified. txid: {}, err: {}",
transaction.id,
e
));
}
info!("swapper lockup was verified");
let lockup_tx = utils::deserialize_tx_hex(&transaction.hex)?;
// If the amount is greater than the zero-conf limit
@@ -142,6 +160,19 @@ impl ReceiveSwapStateHandler {
// return Err(anyhow!("Tx state mismatch: Lockup transaction was marked as confirmed by the swapper, but isn't."));
// }
let Some(transaction) = update.transaction.clone() else {
return Err(anyhow!("Unexpected payload from Boltz status stream"));
};
// looking for lockup script history to verify lockup was broadcasted
if let Err(e) = self.verify_lockup_tx(&receive_swap, &transaction).await {
return Err(anyhow!(
"swapper reported lockup could not be verified. txid: {}, err: {}",
transaction.id,
e
));
}
info!("swapper lockup was verified, moving to claim");
match receive_swap.claim_tx_id {
Some(claim_tx_id) => {
warn!("Claim tx for Receive Swap {id} was already broadcast: txid {claim_tx_id}")
@@ -270,6 +301,78 @@ impl ReceiveSwapStateHandler {
(_, Failed) => Ok(()),
}
}
async fn verify_lockup_tx(
&self,
receive_swap: &ReceiveSwap,
swap_update_tx: &SwapUpdateTxDetails,
) -> Result<()> {
// looking for lockup script history to verify lockup was broadcasted
let script_history = self.lockup_script_history(receive_swap).await?;
let lockup_tx_history = script_history
.iter()
.find(|h| h.txid.to_hex().eq(&swap_update_tx.id));
if lockup_tx_history.is_none() {
return Err(anyhow!(
"swapper lockup wasn't found, reported txid={} waiting for confirmation",
swap_update_tx.id,
));
}
info!("swapper lockup found, verifying transaction content...");
let lockup_tx = utils::deserialize_tx_hex(&swap_update_tx.hex)?;
if !lockup_tx
.txid()
.to_hex()
.eq(&lockup_tx_history.unwrap().txid.to_hex())
{
return Err(anyhow!(
"swapper reported txid and transaction hex do not match: {} vs {}",
swap_update_tx.id,
lockup_tx.txid().to_hex()
));
}
Ok(())
}
async fn lockup_script_history(&self, receive_swap: &ReceiveSwap) -> Result<Vec<History>> {
let script = receive_swap.get_swap_script()?;
let address =
script
.to_address(self.config.network.into())
.map_err(|e| PaymentError::Generic {
err: format!("failed to get swap script address {e:?}"),
})?;
let sc = Script::from_hex(
hex::encode(address.to_unconfidential().script_pubkey().as_bytes()).as_str(),
)
.map_err(|e| PaymentError::Generic {
err: format!("failed to get swap script address {e:?}"),
})?;
let script_hash = sha256::Hash::hash(sc.as_bytes()).to_byte_array().to_hex();
info!("fetching script history for {}", script_hash);
let mut script_history = vec![];
let mut retries = 1;
while script_history.is_empty() && retries < 5 {
script_history = self
.liquid_chain_service
.lock()
.await
.get_script_history(&sc)
.await?;
info!(
"script history for {} got zero transactions, retrying in {} seconds...",
script_hash, retries
);
std::thread::sleep(std::time::Duration::from_secs(retries));
retries += 1;
}
Ok(script_history)
}
}
#[cfg(test)]

View File

@@ -13,7 +13,6 @@ use lwk_wollet::bitcoin::hex::DisplayHex;
use lwk_wollet::hashes::{sha256, Hash};
use lwk_wollet::secp256k1::ThirtyTwoByteHash;
use lwk_wollet::{elements::LockTime, ElementsNetwork};
use lwk_wollet::{ElectrumClient, ElectrumUrl};
use sdk_common::bitcoin::secp256k1::Secp256k1;
use sdk_common::bitcoin::util::bip32::ChildNumber;
use tokio::sync::{watch, Mutex, RwLock};
@@ -21,7 +20,7 @@ use tokio::time::MissedTickBehavior;
use tokio_stream::wrappers::BroadcastStream;
use url::Url;
use crate::chain::ChainService;
use crate::chain::liquid::{HybridLiquidChainService, LiquidChainService};
use crate::chain_swap::ChainSwapStateHandler;
use crate::error::LiquidSdkError;
use crate::model::PaymentState::*;
@@ -46,7 +45,7 @@ pub struct LiquidSdk {
event_manager: Arc<EventManager>,
status_stream: Arc<dyn SwapperStatusStream>,
swapper: Arc<dyn Swapper>,
chain_service: Arc<Mutex<dyn ChainService>>,
chain_service: Arc<Mutex<dyn LiquidChainService>>,
is_started: RwLock<bool>,
shutdown_sender: watch::Sender<()>,
shutdown_receiver: watch::Receiver<()>,
@@ -76,11 +75,7 @@ impl LiquidSdk {
let swapper = Arc::new(BoltzSwapper::new(config.clone()));
let status_stream = Arc::<dyn SwapperStatusStream>::from(swapper.create_status_stream());
let chain_service = Arc::new(Mutex::new(ElectrumClient::new(&ElectrumUrl::new(
&config.liquid_electrum_url,
true,
true,
))?));
let chain_service = Arc::new(Mutex::new(HybridLiquidChainService::new(config.clone())?));
let onchain_wallet = Arc::new(LiquidOnchainWallet::new(mnemonic, config.clone())?);
@@ -97,6 +92,7 @@ impl LiquidSdk {
onchain_wallet.clone(),
persister.clone(),
swapper.clone(),
chain_service.clone(),
);
let chain_swap_state_handler = ChainSwapStateHandler::new(
@@ -303,7 +299,7 @@ impl LiquidSdk {
match chain_swap.direction {
Direction::Outgoing => {
let swap_script = chain_swap.get_lockup_swap_script()?.as_liquid_script()?;
let current_height = self.chain_service.lock().await.tip()?.height;
let current_height = self.chain_service.lock().await.tip().await?;
let locktime_from_height = LockTime::from_height(current_height)?;
info!("Checking Chain Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", chain_swap.id, swap_script.locktime);
@@ -331,7 +327,7 @@ impl LiquidSdk {
async fn check_send_swap_expiration(&self, send_swap: &SendSwap) -> Result<()> {
if send_swap.lockup_tx_id.is_some() && send_swap.refund_tx_id.is_none() {
let swap_script = send_swap.get_swap_script()?;
let current_height = self.chain_service.lock().await.tip()?.height;
let current_height = self.chain_service.lock().await.tip().await?;
let locktime_from_height = LockTime::from_height(current_height)?;
info!("Checking Send Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", send_swap.id, swap_script.locktime);
@@ -569,7 +565,7 @@ impl LiquidSdk {
async fn estimate_onchain_tx_fee(&self, amount_sat: u64, address: &str) -> Result<u64> {
Ok(self
.onchain_wallet
.build_tx(None, address, amount_sat)
.build_tx(Some(10.0), address, amount_sat)
.await?
.all_fees()
.values()

View File

@@ -11,7 +11,7 @@ use lwk_wollet::elements::Transaction;
use lwk_wollet::hashes::{sha256, Hash};
use tokio::sync::{broadcast, Mutex};
use crate::chain::ChainService;
use crate::chain::liquid::LiquidChainService;
use crate::model::PaymentState::{Complete, Created, Failed, Pending, TimedOut};
use crate::model::{Config, SendSwap};
use crate::swapper::Swapper;
@@ -28,7 +28,7 @@ pub(crate) struct SendSwapStateHandler {
onchain_wallet: Arc<dyn OnchainWallet>,
persister: Arc<Persister>,
swapper: Arc<dyn Swapper>,
chain_service: Arc<Mutex<dyn ChainService>>,
chain_service: Arc<Mutex<dyn LiquidChainService>>,
subscription_notifier: broadcast::Sender<String>,
}
@@ -38,7 +38,7 @@ impl SendSwapStateHandler {
onchain_wallet: Arc<dyn OnchainWallet>,
persister: Arc<Persister>,
swapper: Arc<dyn Swapper>,
chain_service: Arc<Mutex<dyn ChainService>>,
chain_service: Arc<Mutex<dyn LiquidChainService>>,
) -> Self {
let (subscription_notifier, _) = broadcast::channel::<String>(30);
Self {
@@ -196,20 +196,22 @@ impl SendSwapStateHandler {
let lockup_tx = self
.onchain_wallet
.build_tx(
None,
Some(10.0),
&create_response.address,
create_response.expected_amount,
)
.await?;
info!("broadcasting lockup tx {}", lockup_tx.txid());
let lockup_tx_id = self
.chain_service
.lock()
.await
.broadcast(&lockup_tx)?
.broadcast(&lockup_tx, Some(swap_id))
.await?
.to_string();
debug!("Successfully broadcast lockup tx for Send Swap {swap_id}. Lockup tx id: {lockup_tx_id}");
info!("Successfully broadcast lockup tx for Send Swap {swap_id}. Lockup tx id: {lockup_tx_id}");
Ok(lockup_tx)
}
@@ -285,10 +287,8 @@ impl SendSwapStateHandler {
.chain_service
.lock()
.await
.get_scripts_history(&[&swap_script_pk])?
.into_iter()
.flatten()
.collect();
.get_script_history(&swap_script_pk)
.await?;
// We expect at most 2 txs: lockup and maybe the claim
ensure_sdk!(
@@ -311,6 +311,7 @@ impl SendSwapStateHandler {
.lock()
.await
.get_transactions(&[claim_tx_id])
.await
.map_err(|e| anyhow!("Failed to fetch claim tx {claim_tx_id}: {e}"))?
.first()
.cloned()

View File

@@ -2,6 +2,7 @@
use std::sync::Arc;
use crate::{
chain::liquid::HybridLiquidChainService,
chain_swap::ChainSwapStateHandler,
model::{
ChainSwap, Config, Direction, LiquidNetwork, PaymentState, PaymentTxData, PaymentType,
@@ -17,7 +18,6 @@ use crate::{
use anyhow::{anyhow, Result};
use bip39::rand::{self, distributions::Alphanumeric, Rng};
use lwk_wollet::{ElectrumClient, ElectrumUrl};
use tempdir::TempDir;
use tokio::sync::Mutex;
@@ -30,11 +30,7 @@ pub(crate) fn new_send_swap_state_handler(
let config = Config::testnet();
let onchain_wallet = Arc::new(new_onchain_wallet(&config)?);
let swapper = Arc::new(BoltzSwapper::new(config.clone()));
let chain_service = Arc::new(Mutex::new(ElectrumClient::new(&ElectrumUrl::new(
&config.liquid_electrum_url,
true,
true,
))?));
let chain_service = Arc::new(Mutex::new(HybridLiquidChainService::new(config.clone())?));
Ok(SendSwapStateHandler::new(
config,
@@ -51,12 +47,14 @@ pub(crate) fn new_receive_swap_state_handler(
let config = Config::testnet();
let onchain_wallet = Arc::new(new_onchain_wallet(&config)?);
let swapper = Arc::new(BoltzSwapper::new(config.clone()));
let liquid_chain_service = Arc::new(Mutex::new(HybridLiquidChainService::new(config.clone())?));
Ok(ReceiveSwapStateHandler::new(
config,
onchain_wallet,
persister,
swapper,
liquid_chain_service,
))
}
@@ -66,11 +64,7 @@ pub(crate) fn new_chain_swap_state_handler(
let config = Config::testnet();
let onchain_wallet = Arc::new(new_onchain_wallet(&config)?);
let swapper = Arc::new(BoltzSwapper::new(config.clone()));
let liquid_chain_service = Arc::new(Mutex::new(ElectrumClient::new(&ElectrumUrl::new(
&config.liquid_electrum_url,
true,
true,
))?));
let liquid_chain_service = Arc::new(Mutex::new(HybridLiquidChainService::new(config.clone())?));
ChainSwapStateHandler::new(
config,