WASM: Update boltz client dependency and use included wasm tokio-tungstenite (#769)

* Update boltz client dependency and use included wasm tokio-tungstenite

* Avoid unnecessary swapper init

* Update boltz client rev after merge
This commit is contained in:
Daniel Granhão
2025-03-15 10:29:35 +00:00
committed by GitHub
parent 97a8039b76
commit 7bd09aea5d
16 changed files with 1369 additions and 1069 deletions

509
cli/Cargo.lock generated

File diff suppressed because it is too large Load Diff

1425
lib/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -20,6 +20,7 @@ workspace = true
[dependencies]
anyhow = { workspace = true }
bip39 = "2.0.0"
boltz-client = { git = "https://github.com/SatoshiPortal/boltz-rust", rev = "f78e159fe72e1c357e7830bc08d2b9e42a65362c", features = ["electrum"] }
chrono = "0.4"
derivative = "2.2.0"
env_logger = "0.11"
@@ -64,11 +65,9 @@ electrum-client = { version = "0.21.0", default-features = false, features = [
"use-rustls-ring",
"proxy",
] }
boltz-client = { git = "https://github.com/SatoshiPortal/boltz-rust", rev = "12c9e546f15706b563ba7e49f2be7e8a5e7ada90" }
lwk_wollet = { git = "https://github.com/breez/lwk", branch = "breez-sdk-liquid-0.6.3" }
maybe-sync = { version = "0.1.1", features = ["sync"] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
tokio-tungstenite = { version = "0.21.0", features = ["native-tls-vendored"] }
tonic = { version = "0.12.3", features = ["tls", "tls-webpki-roots"] }
uuid = { version = "1.8.0", features = ["v4"] }

View File

@@ -3,7 +3,7 @@ use std::{str::FromStr, sync::Arc};
use anyhow::{anyhow, bail, Context, Result};
use boltz_client::{
boltz::{self},
swaps::boltz::{ChainSwapStates, CreateChainResponse, SwapUpdateTxDetails},
swaps::boltz::{ChainSwapStates, CreateChainResponse, TransactionInfo},
ElementsLockTime, Secp256k1, Serialize, ToHex,
};
use futures_util::TryFutureExt;
@@ -89,7 +89,7 @@ impl ChainSwapHandler {
}
/// Handles status updates from Boltz for Chain swaps
pub(crate) async fn on_new_status(&self, update: &boltz::Update) -> Result<()> {
pub(crate) async fn on_new_status(&self, update: &boltz::SwapStatus) -> Result<()> {
let id = &update.id;
let swap = self.fetch_chain_swap_by_id(id)?;
@@ -191,7 +191,11 @@ impl ChainSwapHandler {
Ok(())
}
async fn on_new_incoming_status(&self, swap: &ChainSwap, update: &boltz::Update) -> Result<()> {
async fn on_new_incoming_status(
&self,
swap: &ChainSwap,
update: &boltz::SwapStatus,
) -> Result<()> {
let id = update.id.clone();
let status = &update.status;
let swap_state = ChainSwapStates::from_str(status)
@@ -511,7 +515,11 @@ impl ChainSwapHandler {
}
}
async fn on_new_outgoing_status(&self, swap: &ChainSwap, update: &boltz::Update) -> Result<()> {
async fn on_new_outgoing_status(
&self,
swap: &ChainSwap,
update: &boltz::SwapStatus,
) -> Result<()> {
let id = update.id.clone();
let status = &update.status;
let swap_state = ChainSwapStates::from_str(status)
@@ -1239,7 +1247,7 @@ impl ChainSwapHandler {
async fn verify_server_lockup_tx(
&self,
chain_swap: &ChainSwap,
swap_update_tx: &SwapUpdateTxDetails,
swap_update_tx: &TransactionInfo,
verify_confirmation: bool,
) -> Result<()> {
match chain_swap.direction {
@@ -1265,7 +1273,7 @@ impl ChainSwapHandler {
async fn verify_incoming_server_lockup_tx(
&self,
chain_swap: &ChainSwap,
swap_update_tx: &SwapUpdateTxDetails,
swap_update_tx: &TransactionInfo,
verify_confirmation: bool,
) -> Result<()> {
let swap_script = chain_swap.get_claim_swap_script()?;
@@ -1275,14 +1283,13 @@ impl ChainSwapHandler {
let address = liquid_swap_script
.to_address(self.config.network.into())
.map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?;
let tx_hex = swap_update_tx
.hex
.as_ref()
.ok_or(anyhow!("Transaction info without hex"))?;
let tx = self
.liquid_chain_service
.verify_tx(
&address,
&swap_update_tx.id,
&swap_update_tx.hex,
verify_confirmation,
)
.verify_tx(&address, &swap_update_tx.id, tx_hex, verify_confirmation)
.await?;
// Verify RBF
let rbf_explicit = tx.input.iter().any(|tx_in| tx_in.sequence.is_rbf());
@@ -1328,7 +1335,7 @@ impl ChainSwapHandler {
async fn verify_outgoing_server_lockup_tx(
&self,
chain_swap: &ChainSwap,
swap_update_tx: &SwapUpdateTxDetails,
swap_update_tx: &TransactionInfo,
verify_confirmation: bool,
) -> Result<()> {
let swap_script = chain_swap.get_claim_swap_script()?;
@@ -1338,14 +1345,13 @@ impl ChainSwapHandler {
.as_bitcoin_script()?
.to_address(self.config.network.as_bitcoin_chain())
.map_err(|e| anyhow!("Failed to get swap script address {e:?}"))?;
let tx_hex = swap_update_tx
.hex
.as_ref()
.ok_or(anyhow!("Transaction info without hex"))?;
let tx = self
.bitcoin_chain_service
.verify_tx(
&address,
&swap_update_tx.id,
&swap_update_tx.hex,
verify_confirmation,
)
.verify_tx(&address, &swap_update_tx.id, tx_hex, verify_confirmation)
.await?;
// Verify RBF
let rbf_explicit = tx.input.iter().any(|input| input.sequence.is_rbf());

View File

@@ -2,7 +2,7 @@ use anyhow::{anyhow, Result};
use boltz_client::{
bitcoin::ScriptBuf,
boltz::{ChainPair, BOLTZ_MAINNET_URL_V2, BOLTZ_REGTEST, BOLTZ_TESTNET_URL_V2},
network::Chain,
network::{BitcoinChain, Chain, LiquidChain},
swaps::boltz::{
CreateChainResponse, CreateReverseResponse, CreateSubmarineResponse, Leaf, Side, SwapTree,
},
@@ -203,11 +203,11 @@ pub enum LiquidNetwork {
Regtest,
}
impl LiquidNetwork {
pub fn as_bitcoin_chain(&self) -> Chain {
pub fn as_bitcoin_chain(&self) -> BitcoinChain {
match self {
LiquidNetwork::Mainnet => Chain::Bitcoin,
LiquidNetwork::Testnet => Chain::BitcoinTestnet,
LiquidNetwork::Regtest => Chain::BitcoinRegtest,
LiquidNetwork::Mainnet => BitcoinChain::Bitcoin,
LiquidNetwork::Testnet => BitcoinChain::BitcoinTestnet,
LiquidNetwork::Regtest => BitcoinChain::BitcoinRegtest,
}
}
}
@@ -228,11 +228,17 @@ impl From<LiquidNetwork> for ElementsNetwork {
}
impl From<LiquidNetwork> for Chain {
fn from(value: LiquidNetwork) -> Self {
Chain::Liquid(value.into())
}
}
impl From<LiquidNetwork> for LiquidChain {
fn from(value: LiquidNetwork) -> Self {
match value {
LiquidNetwork::Mainnet => Chain::Liquid,
LiquidNetwork::Testnet => Chain::LiquidTestnet,
LiquidNetwork::Regtest => Chain::LiquidRegtest,
LiquidNetwork::Mainnet => LiquidChain::Liquid,
LiquidNetwork::Testnet => LiquidChain::LiquidTestnet,
LiquidNetwork::Regtest => LiquidChain::LiquidRegtest,
}
}
}

View File

@@ -67,7 +67,7 @@ impl ReceiveSwapHandler {
}
/// Handles status updates from Boltz for Receive swaps
pub(crate) async fn on_new_status(&self, update: &boltz::Update) -> Result<()> {
pub(crate) async fn on_new_status(&self, update: &boltz::SwapStatus) -> Result<()> {
let id = &update.id;
let status = &update.status;
let swap_state = RevSwapStates::from_str(status)
@@ -113,8 +113,11 @@ impl ReceiveSwapHandler {
}
// Looking for lockup script history to verify lockup was broadcasted
let tx_hex = transaction.hex.ok_or(anyhow!(
"Missing lockup transaction hex in swap status update"
))?;
let lockup_tx = match self
.verify_lockup_tx(&receive_swap, &transaction.id, &transaction.hex, false)
.verify_lockup_tx(&receive_swap, &transaction.id, &tx_hex, false)
.await
{
Ok(lockup_tx) => lockup_tx,
@@ -192,8 +195,11 @@ impl ReceiveSwapHandler {
}
// Looking for lockup script history to verify lockup was broadcasted and confirmed
let tx_hex = transaction.hex.ok_or(anyhow!(
"Missing lockup transaction hex in swap status update"
))?;
let lockup_tx = match self
.verify_lockup_tx(&receive_swap, &transaction.id, &transaction.hex, true)
.verify_lockup_tx(&receive_swap, &transaction.id, &tx_hex, true)
.await
{
Ok(lockup_tx) => lockup_tx,

View File

@@ -235,18 +235,19 @@ impl LiquidSdkBuilder {
let event_manager = Arc::new(EventManager::new());
let (shutdown_sender, shutdown_receiver) = watch::channel::<()>(());
let swapper: Arc<dyn Swapper> = match self.swapper.clone() {
Some(swapper) => swapper,
None => {
let proxy_url_fetcher = Arc::new(BoltzProxyFetcher::new(persister.clone()));
Arc::new(BoltzSwapper::new(self.config.clone(), proxy_url_fetcher))
}
};
let status_stream: Arc<dyn SwapperStatusStream> = match self.status_stream.clone() {
Some(status_stream) => status_stream,
None => Arc::from(swapper.create_status_stream()),
};
let (swapper, status_stream): (Arc<dyn Swapper>, Arc<dyn SwapperStatusStream>) =
match (self.swapper.clone(), self.status_stream.clone()) {
(Some(swapper), Some(status_stream)) => (swapper, status_stream),
(maybe_swapper, maybe_status_stream) => {
let proxy_url_fetcher = Arc::new(BoltzProxyFetcher::new(persister.clone()));
let boltz_swapper =
Arc::new(BoltzSwapper::new(self.config.clone(), proxy_url_fetcher)?);
(
maybe_swapper.unwrap_or(boltz_swapper.clone()),
maybe_status_stream.unwrap_or(boltz_swapper),
)
}
};
let recoverer = match self.recoverer.clone() {
Some(recoverer) => recoverer,
@@ -3829,7 +3830,7 @@ mod tests {
use anyhow::{anyhow, Result};
use boltz_client::{
boltz::{self, SwapUpdateTxDetails},
boltz::{self, TransactionInfo},
swaps::boltz::{ChainSwapStates, RevSwapStates, SubSwapStates},
};
use lwk_wollet::{elements::Txid, hashes::hex::DisplayHex};
@@ -3960,11 +3961,12 @@ mod tests {
$status_stream
.clone()
.send_mock_update(boltz::Update {
.send_mock_update(boltz::SwapStatus {
id: swap.id(),
status: $status.to_string(),
transaction: $transaction,
zero_conf_rejected: $zero_conf_rejected,
..Default::default()
})
.await
.unwrap();
@@ -4044,10 +4046,12 @@ mod tests {
persister,
status_stream,
status,
Some(SwapUpdateTxDetails {
Some(TransactionInfo {
id: mock_tx_id.to_string(),
hex: lwk_wollet::elements::encode::serialize(&mock_tx)
.to_lower_hex_string(),
hex: Some(
lwk_wollet::elements::encode::serialize(&mock_tx).to_lower_hex_string()
),
eta: None,
}),
None
);
@@ -4078,10 +4082,12 @@ mod tests {
persister,
status_stream,
status,
Some(SwapUpdateTxDetails {
Some(TransactionInfo {
id: mock_tx_id.to_string(),
hex: lwk_wollet::elements::encode::serialize(&mock_tx)
.to_lower_hex_string(),
hex: Some(
lwk_wollet::elements::encode::serialize(&mock_tx).to_lower_hex_string()
),
eta: None
}),
None
);
@@ -4295,9 +4301,10 @@ mod tests {
persister,
status_stream,
status,
Some(SwapUpdateTxDetails {
Some(TransactionInfo {
id: mock_user_lockup_tx_id.clone(),
hex: mock_user_lockup_tx_hex.clone(),
hex: Some(mock_user_lockup_tx_hex.clone()),
eta: None
}), // sets `update.transaction`
Some(true) // sets `update.zero_conf_rejected`
);
@@ -4323,9 +4330,10 @@ mod tests {
persister,
status_stream,
ChainSwapStates::TransactionServerMempool,
Some(SwapUpdateTxDetails {
Some(TransactionInfo {
id: mock_server_lockup_tx_id.clone(),
hex: mock_server_lockup_tx_hex.clone(),
hex: Some(mock_server_lockup_tx_hex.clone()),
eta: None,
}),
None
);
@@ -4351,9 +4359,10 @@ mod tests {
persister,
status_stream,
ChainSwapStates::TransactionServerConfirmed,
Some(SwapUpdateTxDetails {
Some(TransactionInfo {
id: mock_server_lockup_tx_id,
hex: mock_server_lockup_tx_hex,
hex: Some(mock_server_lockup_tx_hex),
eta: None,
}),
None
);

View File

@@ -72,7 +72,7 @@ impl SendSwapHandler {
}
/// Handles status updates from Boltz for Send swaps
pub(crate) async fn on_new_status(&self, update: &boltz::Update) -> Result<()> {
pub(crate) async fn on_new_status(&self, update: &boltz::SwapStatus) -> Result<()> {
let id = &update.id;
let status = &update.status;
let swap_state = SubSwapStates::from_str(status)

View File

@@ -29,10 +29,11 @@ impl<P: ProxyUrlFetcher> BoltzSwapper<P> {
BtcSwapTx::new_refund(
swap_script.as_bitcoin_script()?,
refund_address,
&self.bitcoin_electrum_config,
&self.bitcoin_electrum_client,
self.get_url().await?,
swap.id.clone(),
)
.await
}
Direction::Outgoing => {
return Err(SdkError::generic(format!(
@@ -97,11 +98,13 @@ impl<P: ProxyUrlFetcher> BoltzSwapper<P> {
false => None,
};
let signed_tx = refund_tx.sign_refund(
&refund_keypair,
Fee::Absolute(broadcast_fees_sat),
cooperative,
)?;
let signed_tx = refund_tx
.sign_refund(
&refund_keypair,
Fee::Absolute(broadcast_fees_sat),
cooperative,
)
.await?;
Ok(signed_tx)
}
@@ -115,20 +118,23 @@ impl<P: ProxyUrlFetcher> BoltzSwapper<P> {
let claim_tx_wrapper = BtcSwapTx::new_claim(
claim_swap_script,
claim_address,
&self.bitcoin_electrum_config,
&self.bitcoin_electrum_client,
self.get_url().await?,
swap.id.clone(),
)?;
)
.await?;
let (partial_sig, pub_nonce) = self.get_claim_partial_sig(swap).await?;
let signed_tx = claim_tx_wrapper.sign_claim(
&claim_keypair,
&Preimage::from_str(&swap.preimage)?,
Fee::Absolute(swap.claim_fees_sat),
self.get_cooperative_details(swap.id.clone(), Some(pub_nonce), Some(partial_sig))
.await?,
)?;
let signed_tx = claim_tx_wrapper
.sign_claim(
&claim_keypair,
&Preimage::from_str(&swap.preimage)?,
Fee::Absolute(swap.claim_fees_sat),
self.get_cooperative_details(swap.id.clone(), Some(pub_nonce), Some(partial_sig))
.await?,
)
.await?;
Ok(signed_tx)
}

View File

@@ -1,11 +1,8 @@
use std::str::FromStr;
use boltz_client::{
boltz::SwapTxKind,
elements::Transaction,
fees::Fee,
util::{liquid_genesis_hash, secrets::Preimage},
ElementsAddress as Address, LBtcSwapTx,
boltz::SwapTxKind, elements::Transaction, fees::Fee, network::LiquidClient,
util::secrets::Preimage, ElementsAddress as Address, LBtcSwapTx,
};
use log::info;
@@ -40,19 +37,22 @@ impl<P: ProxyUrlFetcher> BoltzSwapper<P> {
let claim_tx_wrapper = LBtcSwapTx::new_claim(
swap_script,
claim_address,
&self.liquid_electrum_config,
&self.liquid_electrum_client,
self.get_url().await?,
swap.id.clone(),
)?;
)
.await?;
let signed_tx = claim_tx_wrapper.sign_claim(
&swap.get_claim_keypair()?,
&Preimage::from_str(&swap.preimage)?,
Fee::Absolute(swap.claim_fees_sat),
self.get_cooperative_details(swap.id.clone(), None, None)
.await?,
true,
)?;
let signed_tx = claim_tx_wrapper
.sign_claim(
&swap.get_claim_keypair()?,
&Preimage::from_str(&swap.preimage)?,
Fee::Absolute(swap.claim_fees_sat),
self.get_cooperative_details(swap.id.clone(), None, None)
.await?,
true,
)
.await?;
Ok(signed_tx)
}
@@ -67,21 +67,24 @@ impl<P: ProxyUrlFetcher> BoltzSwapper<P> {
let claim_tx_wrapper = LBtcSwapTx::new_claim(
swap_script,
claim_address,
&self.liquid_electrum_config,
&self.liquid_electrum_client,
self.get_url().await?,
swap.id.clone(),
)?;
)
.await?;
let (partial_sig, pub_nonce) = self.get_claim_partial_sig(swap).await?;
let signed_tx = claim_tx_wrapper.sign_claim(
&claim_keypair,
&Preimage::from_str(&swap.preimage)?,
Fee::Absolute(swap.claim_fees_sat),
self.get_cooperative_details(swap.id.clone(), Some(pub_nonce), Some(partial_sig))
.await?,
true,
)?;
let signed_tx = claim_tx_wrapper
.sign_claim(
&claim_keypair,
&Preimage::from_str(&swap.preimage)?,
Fee::Absolute(swap.claim_fees_sat),
self.get_cooperative_details(swap.id.clone(), Some(pub_nonce), Some(partial_sig))
.await?,
true,
)
.await?;
Ok(signed_tx)
}
@@ -108,10 +111,11 @@ impl<P: ProxyUrlFetcher> BoltzSwapper<P> {
LBtcSwapTx::new_refund(
swap_script.as_liquid_script()?,
refund_address,
&self.liquid_electrum_config,
&self.liquid_electrum_client,
self.get_url().await?,
swap.id.clone(),
)
.await
}
},
Swap::Send(swap) => {
@@ -119,10 +123,11 @@ impl<P: ProxyUrlFetcher> BoltzSwapper<P> {
LBtcSwapTx::new_refund(
swap_script,
refund_address,
&self.liquid_electrum_config,
&self.liquid_electrum_client,
self.get_url().await?,
swap.id.clone(),
)
.await
}
Swap::Receive(swap) => {
return Err(SdkError::generic(format!(
@@ -165,7 +170,7 @@ impl<P: ProxyUrlFetcher> BoltzSwapper<P> {
let address = Address::from_str(refund_address)
.map_err(|err| SdkError::generic(format!("Could not parse address: {err:?}")))?;
let genesis_hash = liquid_genesis_hash(&self.liquid_electrum_config)?;
let genesis_hash = self.liquid_electrum_client.get_genesis_hash().await?;
let (funding_outpoint, funding_tx_out) =
*utxos
@@ -193,12 +198,14 @@ impl<P: ProxyUrlFetcher> BoltzSwapper<P> {
false => None,
};
let signed_tx = refund_tx.sign_refund(
&refund_keypair,
Fee::Absolute(broadcast_fees_sat),
cooperative,
true,
)?;
let signed_tx = refund_tx
.sign_refund(
&refund_keypair,
Fee::Absolute(broadcast_fees_sat),
cooperative,
true,
)
.await?;
Ok(signed_tx)
}
}

View File

@@ -8,19 +8,19 @@ use crate::{
use anyhow::Result;
use boltz_client::{
boltz::{
BoltzApiClientV2, ChainPair, Cooperative, CreateChainRequest, CreateChainResponse,
self, BoltzApiClientV2, ChainPair, Cooperative, CreateChainRequest, CreateChainResponse,
CreateReverseRequest, CreateReverseResponse, CreateSubmarineRequest,
CreateSubmarineResponse, ReversePair, SubmarineClaimTxResponse, SubmarinePair,
},
elements::secp256k1_zkp::{MusigPartialSignature, MusigPubNonce},
network::{electrum::ElectrumConfig, Chain},
network::{electrum::ElectrumBitcoinClient, electrum::ElectrumLiquidClient, Chain},
Amount,
};
use log::info;
use proxy::split_proxy_url;
use tokio::sync::broadcast;
use self::status_stream::BoltzStatusStream;
use super::{ProxyUrlFetcher, Swapper, SwapperStatusStream};
use super::{ProxyUrlFetcher, Swapper};
pub(crate) mod bitcoin;
pub(crate) mod liquid;
@@ -36,37 +36,44 @@ pub(crate) struct BoltzClient {
pub struct BoltzSwapper<P: ProxyUrlFetcher> {
config: Config,
client: OnceLock<BoltzClient>,
liquid_electrum_config: ElectrumConfig,
bitcoin_electrum_config: ElectrumConfig,
liquid_electrum_client: ElectrumLiquidClient,
bitcoin_electrum_client: ElectrumBitcoinClient,
proxy_url: Arc<P>,
subscription_notifier: broadcast::Sender<String>,
update_notifier: broadcast::Sender<boltz::SwapStatus>,
}
impl<P: ProxyUrlFetcher> BoltzSwapper<P> {
pub fn new(config: Config, proxy_url: Arc<P>) -> Self {
pub fn new(config: Config, proxy_url: Arc<P>) -> Result<Self, SdkError> {
let (tls, validate_domain) = match config.network {
LiquidNetwork::Mainnet | LiquidNetwork::Testnet => (true, true),
LiquidNetwork::Regtest => (false, false),
};
Self {
let (subscription_notifier, _) = broadcast::channel::<String>(30);
let (update_notifier, _) = broadcast::channel::<boltz::SwapStatus>(30);
Ok(Self {
proxy_url,
client: OnceLock::new(),
config: config.clone(),
liquid_electrum_config: ElectrumConfig::new(
liquid_electrum_client: ElectrumLiquidClient::new(
config.network.into(),
&config.liquid_electrum_url,
tls,
validate_domain,
100,
),
bitcoin_electrum_config: ElectrumConfig::new(
)?,
bitcoin_electrum_client: ElectrumBitcoinClient::new(
config.network.as_bitcoin_chain(),
&config.bitcoin_electrum_url,
tls,
validate_domain,
100,
),
}
)?,
subscription_notifier,
update_notifier,
})
}
async fn get_client(&self) -> Result<&BoltzClient> {
@@ -110,7 +117,8 @@ impl<P: ProxyUrlFetcher> BoltzSwapper<P> {
.get_client()
.await?
.inner
.get_chain_claim_tx_details(&swap.id)?;
.get_chain_claim_tx_details(&swap.id)
.await?;
match swap.direction {
Direction::Incoming => {
let refund_tx_wrapper = self
@@ -165,7 +173,7 @@ impl<P: ProxyUrlFetcher> Swapper for BoltzSwapper<P> {
referral_id: client.referral_id.clone(),
..req.clone()
};
Ok(client.inner.post_chain_req(modified_req)?)
Ok(client.inner.post_chain_req(modified_req).await?)
}
/// Create a new send swap
@@ -178,14 +186,14 @@ impl<P: ProxyUrlFetcher> Swapper for BoltzSwapper<P> {
referral_id: client.referral_id.clone(),
..req.clone()
};
Ok(client.inner.post_swap_req(&modified_req)?)
Ok(client.inner.post_swap_req(&modified_req).await?)
}
async fn get_chain_pair(
&self,
direction: Direction,
) -> Result<Option<ChainPair>, PaymentError> {
let pairs = self.get_client().await?.inner.get_chain_pairs()?;
let pairs = self.get_client().await?.inner.get_chain_pairs().await?;
let pair = match direction {
Direction::Incoming => pairs.get_btc_to_lbtc_pair(),
Direction::Outgoing => pairs.get_lbtc_to_btc_pair(),
@@ -196,7 +204,7 @@ impl<P: ProxyUrlFetcher> Swapper for BoltzSwapper<P> {
async fn get_chain_pairs(
&self,
) -> Result<(Option<ChainPair>, Option<ChainPair>), PaymentError> {
let pairs = self.get_client().await?.inner.get_chain_pairs()?;
let pairs = self.get_client().await?.inner.get_chain_pairs().await?;
let pair_outgoing = pairs.get_lbtc_to_btc_pair();
let pair_incoming = pairs.get_btc_to_lbtc_pair();
Ok((pair_outgoing, pair_incoming))
@@ -207,6 +215,7 @@ impl<P: ProxyUrlFetcher> Swapper for BoltzSwapper<P> {
.await?
.inner
.get_quote(swap_id)
.await
.map(|r| Amount::from_sat(r.amount))
.map_err(Into::into)
}
@@ -220,6 +229,7 @@ impl<P: ProxyUrlFetcher> Swapper for BoltzSwapper<P> {
.await?
.inner
.accept_quote(swap_id, server_lockup_sat)
.await
.map_err(Into::into)
}
@@ -229,7 +239,8 @@ impl<P: ProxyUrlFetcher> Swapper for BoltzSwapper<P> {
.get_client()
.await?
.inner
.get_submarine_pairs()?
.get_submarine_pairs()
.await?
.get_lbtc_to_btc_pair())
}
@@ -239,7 +250,8 @@ impl<P: ProxyUrlFetcher> Swapper for BoltzSwapper<P> {
.get_client()
.await?
.inner
.get_submarine_preimage(swap_id)?
.get_submarine_preimage(swap_id)
.await?
.preimage)
}
@@ -254,7 +266,8 @@ impl<P: ProxyUrlFetcher> Swapper for BoltzSwapper<P> {
.get_client()
.await?
.inner
.get_submarine_claim_tx_details(&swap.id)?;
.get_submarine_claim_tx_details(&swap.id)
.await?;
info!("Received claim tx details: {:?}", &claim_tx_response);
self.validate_send_swap_preimage(&swap.id, &swap.invoice, &claim_tx_response.preimage)?;
@@ -286,7 +299,8 @@ impl<P: ProxyUrlFetcher> Swapper for BoltzSwapper<P> {
self.get_client()
.await?
.inner
.post_submarine_claim_tx_details(&swap_id.to_string(), pub_nonce, partial_sig)?;
.post_submarine_claim_tx_details(&swap_id.to_string(), pub_nonce, partial_sig)
.await?;
info!("Successfully sent claim details for swap-in {swap_id}");
Ok(())
}
@@ -301,7 +315,7 @@ impl<P: ProxyUrlFetcher> Swapper for BoltzSwapper<P> {
referral_id: client.referral_id.clone(),
..req.clone()
};
Ok(client.inner.post_reverse_req(modified_req)?)
Ok(client.inner.post_reverse_req(modified_req).await?)
}
// Get a reverse pair information
@@ -310,7 +324,8 @@ impl<P: ProxyUrlFetcher> Swapper for BoltzSwapper<P> {
.get_client()
.await?
.inner
.get_reverse_pairs()?
.get_reverse_pairs()
.await?
.get_btc_to_lbtc_pair())
}
@@ -457,7 +472,8 @@ impl<P: ProxyUrlFetcher> Swapper for BoltzSwapper<P> {
.get_client()
.await?
.inner
.broadcast_tx(chain, &tx_hex.into())?;
.broadcast_tx(chain, &tx_hex.into())
.await?;
let err = format!("Unexpected response from Boltz server: {response}");
let tx_id = response
.as_object()
@@ -470,22 +486,13 @@ impl<P: ProxyUrlFetcher> Swapper for BoltzSwapper<P> {
Ok(tx_id)
}
fn create_status_stream(&self) -> Box<dyn SwapperStatusStream> {
Box::new(BoltzStatusStream::new(
self.config.clone(),
self.proxy_url.clone(),
))
}
async fn check_for_mrh(
&self,
invoice: &str,
) -> Result<Option<(String, boltz_client::bitcoin::Amount)>, PaymentError> {
async fn check_for_mrh(&self, invoice: &str) -> Result<Option<(String, Amount)>, PaymentError> {
boltz_client::swaps::magic_routing::check_for_mrh(
&self.get_client().await?.inner,
invoice,
self.config.network.into(),
)
.await
.map_err(Into::into)
}
@@ -498,7 +505,8 @@ impl<P: ProxyUrlFetcher> Swapper for BoltzSwapper<P> {
.get_client()
.await?
.inner
.get_bolt12_invoice(offer, amount_sat)?;
.get_bolt12_invoice(offer, amount_sat)
.await?;
info!("Received BOLT12 invoice response: {invoice_res:?}");
Ok(invoice_res.invoice)
}

View File

@@ -2,65 +2,31 @@ use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Result};
use boltz_client::swaps::boltz::{self, Subscription, SwapUpdate};
use futures_util::{SinkExt, StreamExt};
use crate::swapper::{
boltz::BoltzSwapper, ProxyUrlFetcher, SubscriptionHandler, SwapperStatusStream,
};
use anyhow::Result;
use boltz_client::boltz::{
self,
tokio_tungstenite_wasm::{Message, WebSocketStream},
WsRequest, WsResponse,
};
use futures_util::{stream::SplitSink, SinkExt, StreamExt};
use log::{debug, error, info, warn};
use tokio::net::TcpStream;
use tokio::sync::{broadcast, watch};
use tokio::time::MissedTickBehavior;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use url::Url;
use crate::model::Config;
use crate::swapper::{SubscriptionHandler, SwapperStatusStream};
use super::{split_proxy_url, ProxyUrlFetcher};
pub(crate) struct BoltzStatusStream {
config: Config,
proxy_url: Arc<dyn ProxyUrlFetcher>,
subscription_notifier: broadcast::Sender<String>,
update_notifier: broadcast::Sender<boltz::Update>,
}
impl BoltzStatusStream {
pub(crate) fn new(config: Config, proxy_url: Arc<dyn ProxyUrlFetcher>) -> Self {
let (subscription_notifier, _) = broadcast::channel::<String>(30);
let (update_notifier, _) = broadcast::channel::<boltz::Update>(30);
Self {
config,
proxy_url,
subscription_notifier,
update_notifier,
}
}
async fn connect(&self) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
let default_url = self.config.default_boltz_url().to_string();
let url = match self.proxy_url.fetch().await {
Ok(Some(url)) => split_proxy_url(url).0.unwrap_or(default_url),
_ => default_url,
};
let url = url.replace("http", "ws") + "/ws";
let (socket, _) = connect_async(Url::parse(&url)?)
.await
.map_err(|e| anyhow!("Failed to connect to websocket: {e:?}"))?;
Ok(socket)
}
impl<P: ProxyUrlFetcher> BoltzSwapper<P> {
async fn send_subscription(
&self,
swap_id: String,
ws_stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
sender: &mut SplitSink<WebSocketStream, Message>,
) {
info!("Subscribing to status updates for swap ID {swap_id}");
let subscription = Subscription::new(&swap_id);
let subscription = WsRequest::subscribe_swap_request(&swap_id);
match serde_json::to_string(&subscription) {
Ok(subscribe_json) => match ws_stream.send(Message::Text(subscribe_json)).await {
Ok(subscribe_json) => match sender.send(Message::Text(subscribe_json.into())).await {
Ok(_) => info!("Subscribed"),
Err(e) => error!("Failed to subscribe to {swap_id}: {e:?}"),
},
@@ -69,16 +35,7 @@ impl BoltzStatusStream {
}
}
impl SwapperStatusStream for BoltzStatusStream {
fn track_swap_id(&self, swap_id: &str) -> Result<()> {
let _ = self.subscription_notifier.send(swap_id.to_string());
Ok(())
}
fn subscribe_swap_updates(&self) -> broadcast::Receiver<boltz::Update> {
self.update_notifier.subscribe()
}
impl<P: ProxyUrlFetcher> SwapperStatusStream for BoltzSwapper<P> {
fn start(
self: Arc<Self>,
callback: Box<dyn SubscriptionHandler>,
@@ -87,11 +44,22 @@ impl SwapperStatusStream for BoltzStatusStream {
let keep_alive_ping_interval = Duration::from_secs(15);
let reconnect_delay = Duration::from_secs(2);
let swapper = Arc::clone(&self);
tokio::spawn(async move {
loop {
debug!("Start of ws stream loop");
match self.connect().await {
Ok(mut ws_stream) => {
let client = match swapper.get_client().await {
Ok(client) => client,
Err(e) => {
warn!("Failed to get swapper client: {e:?}");
tokio::time::sleep(reconnect_delay).await;
continue;
}
};
match client.inner.connect_ws().await {
Ok(ws_stream) => {
let (mut sender, mut receiver) = ws_stream.split();
let mut tracked_swap_ids: HashSet<String> = HashSet::new();
let mut subscription_stream = self.subscription_notifier.subscribe();
@@ -108,23 +76,29 @@ impl SwapperStatusStream for BoltzStatusStream {
},
_ = interval.tick() => {
match ws_stream.send(Message::Ping(vec![])).await {
Ok(_) => debug!("Sent keep-alive ping"),
Err(e) => warn!("Failed to send keep-alive ping: {e:?}"),
match serde_json::to_string(&WsRequest::Ping) {
Ok(ping_msg) => {
match sender.send(Message::Text(ping_msg.into())).await {
Ok(_) => debug!("Sent keep-alive ping"),
Err(e) => warn!("Failed to send keep-alive ping: {e:?}"),
}
},
Err(e) => error!("Failed to serialize ping message: {e:?}"),
}
},
swap_res = subscription_stream.recv() => match swap_res {
Ok(swap_id) => {
if !tracked_swap_ids.contains(&swap_id) {
self.send_subscription(swap_id.clone(), &mut ws_stream).await;
self.send_subscription(swap_id.clone(), &mut sender).await;
tracked_swap_ids.insert(swap_id.clone());
}
},
Err(e) => error!("Received error on subscription stream: {e:?}"),
},
maybe_next = ws_stream.next() => match maybe_next {
maybe_next = receiver.next() => match maybe_next {
Some(msg) => match msg {
Ok(Message::Close(_)) => {
warn!("Received close msg, exiting socket loop");
@@ -132,42 +106,36 @@ impl SwapperStatusStream for BoltzStatusStream {
break;
},
Ok(Message::Text(payload)) => {
let payload = payload.as_str();
info!("Received text msg: {payload:?}");
match serde_json::from_str::<SwapUpdate>(&payload) {
// Subscription confirmation
Ok(SwapUpdate::Subscription { .. }) => {}
match serde_json::from_str::<WsResponse>(payload) {
// Subscribing/unsubscribing confirmation
Ok(WsResponse::Subscribe { .. }) | Ok(WsResponse::Unsubscribe { .. }) => {}
// Status update(s)
Ok(SwapUpdate::Update {
args,
..
}) => {
for update in args {
Ok(WsResponse::Update(update)) => {
for update in update.args {
let _ = self.update_notifier.send(update);
}
}
// Error related to subscription, like "Unknown swap ID"
Ok(SwapUpdate::Error {
args,
..
}) => error!("Received a status update error: {args:?}"),
// A response to one of our pings
Ok(WsResponse::Pong) => debug!("Received pong"),
Err(e) => warn!("WS response is invalid SwapUpdate: {e:?}"),
// Either an invalid response, or an error related to subscription
Err(e) => error!("Failed to parse websocket response: {e:?} - response: {payload}"),
}
},
Ok(Message::Ping(_)) => debug!("Received ping"),
Ok(Message::Pong(_)) => debug!("Received pong"),
Ok(msg) => warn!("Unhandled msg: {msg:?}"),
Err(e) => {
error!("Received stream error: {e:?}");
let _ = ws_stream.close(None).await;
let _ = sender.close().await;
break;
}
},
None => {
warn!("Received nothing from the stream");
let _ = ws_stream.close(None).await;
let _ = sender.close().await;
tokio::time::sleep(reconnect_delay).await;
break;
},
@@ -176,11 +144,20 @@ impl SwapperStatusStream for BoltzStatusStream {
}
}
Err(e) => {
warn!("Error connecting to stream: {e}");
warn!("Error connecting to stream: {e:?}");
tokio::time::sleep(reconnect_delay).await;
}
}
}
});
}
fn track_swap_id(&self, swap_id: &str) -> Result<()> {
let _ = self.subscription_notifier.send(swap_id.to_string());
Ok(())
}
fn subscribe_swap_updates(&self) -> broadcast::Receiver<boltz::SwapStatus> {
self.update_notifier.subscribe()
}
}

View File

@@ -118,8 +118,6 @@ pub trait Swapper: Send + Sync {
/// Broadcasts a transaction and returns its id
async fn broadcast_tx(&self, chain: Chain, tx_hex: &str) -> Result<String, PaymentError>;
fn create_status_stream(&self) -> Box<dyn SwapperStatusStream>;
/// Look for a valid Magic Routing Hint. If found, validate it and extract the BIP21 info (amount, address).
async fn check_for_mrh(
&self,
@@ -139,8 +137,8 @@ pub trait SwapperStatusStream: Send + Sync {
callback: Box<dyn SubscriptionHandler>,
shutdown: watch::Receiver<()>,
);
fn track_swap_id(&self, swap_id: &str) -> anyhow::Result<()>;
fn subscribe_swap_updates(&self) -> broadcast::Receiver<boltz_client::boltz::Update>;
fn track_swap_id(&self, swap_id: &str) -> Result<()>;
fn subscribe_swap_updates(&self) -> broadcast::Receiver<boltz_client::boltz::SwapStatus>;
}
#[sdk_macros::async_trait]

View File

@@ -35,7 +35,7 @@ pub(crate) fn new_chain_swap_handler(persister: Arc<Persister>) -> Result<ChainS
let swapper = Arc::new(BoltzSwapper::new(
config.clone(),
Arc::new(MockProxyUrlFetcher::new()),
));
)?);
let liquid_chain_service = Arc::new(MockLiquidChainService::new());
let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new());

View File

@@ -9,17 +9,17 @@ use tokio::sync::{broadcast, watch};
use crate::swapper::{SubscriptionHandler, SwapperStatusStream};
pub(crate) struct MockStatusStream {
pub update_notifier: broadcast::Sender<boltz::Update>,
pub update_notifier: broadcast::Sender<boltz::SwapStatus>,
}
impl MockStatusStream {
pub(crate) fn new() -> Self {
let (update_notifier, _) = broadcast::channel::<boltz::Update>(30);
let (update_notifier, _) = broadcast::channel::<boltz::SwapStatus>(30);
Self { update_notifier }
}
pub(crate) async fn send_mock_update(self: Arc<Self>, update: boltz::Update) -> Result<()> {
pub(crate) async fn send_mock_update(self: Arc<Self>, update: boltz::SwapStatus) -> Result<()> {
tokio::spawn(async move {
self.update_notifier.send(update).unwrap();
})
@@ -40,7 +40,7 @@ impl SwapperStatusStream for MockStatusStream {
Ok(())
}
fn subscribe_swap_updates(&self) -> broadcast::Receiver<boltz::Update> {
fn subscribe_swap_updates(&self) -> broadcast::Receiver<boltz::SwapStatus> {
self.update_notifier.subscribe()
}
}

View File

@@ -23,8 +23,6 @@ use crate::{
utils,
};
use super::status_stream::MockStatusStream;
#[derive(Default)]
pub struct ZeroAmountSwapMockConfig {
pub user_lockup_sat: u64,
@@ -338,10 +336,6 @@ impl Swapper for MockSwapper {
Ok(tx.txid().to_string())
}
fn create_status_stream(&self) -> Box<dyn crate::swapper::SwapperStatusStream> {
Box::new(MockStatusStream::new())
}
async fn check_for_mrh(
&self,
_invoice: &str,