diff --git a/cli/Cargo.lock b/cli/Cargo.lock index daadbd2..f4f3cb5 100644 --- a/cli/Cargo.lock +++ b/cli/Cargo.lock @@ -405,7 +405,9 @@ dependencies = [ "rusqlite", "rusqlite_migration", "serde", + "serde_json", "thiserror", + "tungstenite", ] [[package]] diff --git a/lib/Cargo.lock b/lib/Cargo.lock index b8d1ab6..527c8d4 100644 --- a/lib/Cargo.lock +++ b/lib/Cargo.lock @@ -512,8 +512,10 @@ dependencies = [ "rusqlite", "rusqlite_migration", "serde", + "serde_json", "tempdir", "thiserror", + "tungstenite", "uuid", ] diff --git a/lib/core/Cargo.toml b/lib/core/Cargo.toml index f91e7ac..85f917c 100644 --- a/lib/core/Cargo.toml +++ b/lib/core/Cargo.toml @@ -23,6 +23,8 @@ thiserror = { workspace = true } openssl = { version = "0.10", features = ["vendored"] } # TODO Remove once fully migrated to v2 API elements = "0.24.1" +serde_json = "1.0.116" +tungstenite = { version = "0.21.0", features = ["native-tls-vendored"] } [dev-dependencies] tempdir = "0.3.7" diff --git a/lib/core/src/lib.rs b/lib/core/src/lib.rs index bf40ec2..cd70d01 100644 --- a/lib/core/src/lib.rs +++ b/lib/core/src/lib.rs @@ -6,3 +6,4 @@ pub mod frb; pub mod model; pub mod persist; pub mod sdk; +pub(crate) mod utils; diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index b0fd071..4ca8107 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -16,8 +16,8 @@ use boltz_client::{ BOLTZ_MAINNET_URL, BOLTZ_TESTNET_URL, }, boltzv2::{ - BoltzApiClientV2, ReversePair, SubmarinePair, BOLTZ_MAINNET_URL_V2, - BOLTZ_TESTNET_URL_V2, + BoltzApiClientV2, CreateSubmarineRequest, ReversePair, SubmarinePair, Subscription, + BOLTZ_MAINNET_URL_V2, BOLTZ_TESTNET_URL_V2, }, liquid::{LBtcSwapScript, LBtcSwapTx}, }, @@ -34,15 +34,15 @@ use lwk_wollet::{ ElementsNetwork, FsPersister, Wollet as LwkWollet, WolletDescriptor, }; -use crate::{ensure_sdk, error::PaymentError, get_invoice_amount, model::*, persist::Persister}; +use crate::{ + ensure_sdk, error::PaymentError, get_invoice_amount, model::*, persist::Persister, + utils::get_swap_status_v2, +}; /// Claim tx feerate, in sats per vbyte. /// Since the Liquid blocks are consistently empty for now, we hardcode the minimum feerate. pub const LIQUID_CLAIM_TX_FEERATE: f32 = 0.1; -// TODO: Remove in favor of V2 API (necessary as V1 claim_estimate is not working) -pub const LIQUID_MIN_CLAIM_ABSOLUTE_FEES: u64 = 134; - pub const DEFAULT_DATA_DIR: &str = ".data"; pub struct LiquidSdk { @@ -137,9 +137,9 @@ impl LiquidSdk { .map_err(|e| anyhow!("Failed to fetch swap status for ID {id}: {e:?}"))? .status; - let swap_state = status - .parse::() - .map_err(|_| anyhow!("Invalid reverse swap state received for swap {id}: {status}",))?; + let swap_state = status.parse::().map_err(|_| { + anyhow!("Invalid reverse swap state received for swap {id}: {status}",) + })?; match swap_state { RevSwapStates::SwapExpired @@ -190,9 +190,9 @@ impl LiquidSdk { .map_err(|e| anyhow!("Failed to fetch swap status for ID {id}: {e:?}"))? .status; - let state: SubSwapStates = status - .parse() - .map_err(|_| anyhow!("Invalid submarine swap state received for swap {id}: {status}"))?; + let state: SubSwapStates = status.parse().map_err(|_| { + anyhow!("Invalid submarine swap state received for swap {id}: {status}") + })?; match state { SubSwapStates::TransactionClaimed @@ -368,20 +368,12 @@ impl LiquidSdk { .ok_or(PaymentError::AmountOutOfRange)? / 1000; - // let client = self.boltz_client_v2(); - // let lbtc_pair = Self::validate_submarine_pairs(&client, receiver_amount_sat)?; - - let client = self.boltz_client(); - let lbtc_pair = client - .get_pairs()? - .get_lbtc_pair() - .ok_or(PaymentError::PairsNotFound)?; + let client = self.boltz_client_v2(); + let lbtc_pair = Self::validate_submarine_pairs(&client, receiver_amount_sat)?; Ok(PrepareSendResponse { invoice: req.invoice.clone(), - fees_sat: lbtc_pair.fees.submarine_boltz(receiver_amount_sat) - + lbtc_pair.fees.submarine_lockup_estimate() - + LIQUID_MIN_CLAIM_ABSOLUTE_FEES, + fees_sat: lbtc_pair.fees.total(receiver_amount_sat), }) } @@ -395,61 +387,160 @@ impl LiquidSdk { .ok_or(PaymentError::AmountOutOfRange)? / 1000; - // let client = self.boltz_client_v2(); - // let lbtc_pair = Self::validate_submarine_pairs(&client, receiver_amount_sat)?; - - let client = self.boltz_client(); - let lbtc_pair = client - .get_pairs()? - .get_lbtc_pair() - .ok_or(PaymentError::PairsNotFound)?; - - // let new_fees = lbtc_pair.fees.total(receiver_amount_sat); - let new_fees = lbtc_pair.fees.submarine_boltz(receiver_amount_sat) - + lbtc_pair.fees.submarine_lockup_estimate() - + LIQUID_MIN_CLAIM_ABSOLUTE_FEES; - - ensure_sdk!(req.fees_sat == new_fees, PaymentError::InvalidOrExpiredFees); + let client = self.boltz_client_v2(); + let lbtc_pair = Self::validate_submarine_pairs(&client, receiver_amount_sat)?; ensure_sdk!( - receiver_amount_sat + req.fees_sat <= self.total_balance_sat(true)?, - PaymentError::InsufficientFunds + req.fees_sat == lbtc_pair.fees.total(receiver_amount_sat), + PaymentError::InvalidOrExpiredFees ); - let swap_response = client.create_swap(CreateSwapRequest::new_lbtc_submarine( - &lbtc_pair.hash, - &req.invoice.to_string(), - // TODO Add refund - "", - ))?; + let lwk_wollet = self.lwk_wollet.lock().unwrap(); + // let our_pubkey = self + // .address()? + // .to_unconfidential() + // .blinding_pubkey + // .ok_or(PaymentError::Generic { + // err: "Could not retrieve wallet pubkey".to_string(), + // })? + // .into(); + let refund_public_key = lwk_wollet + .address(None)? + .address() + .blinding_pubkey + .ok_or(PaymentError::Generic { + err: "Could not generate refund pubkey".to_string(), + })? + .into(); - let id = swap_response.get_id(); - let funding_address = swap_response.get_funding_address()?; - let funding_amount_sat = swap_response.get_funding_amount()?; + // Unlock lwk wallet so it can be used to build the tx + std::mem::drop(lwk_wollet); - // let absolute_fees = self - // .build_tx(None, &funding_address, funding_amount_sat)? - // .all_fees() - // .values() - // .sum::(); - // let fee_rate = req.fees_sat as f32 * LIQUID_CLAIM_TX_FEERATE / absolute_fees as f32; - // - // let tx = self.build_tx(Some(fee_rate), &funding_address, funding_amount_sat)?; - let tx = self.build_tx(None, &funding_address, funding_amount_sat)?; + let create_response = client.post_swap_req(&CreateSubmarineRequest { + from: "L-BTC".to_string(), + to: "BTC".to_string(), + invoice: req.invoice.to_string(), + // TODO: Add refund flow + refund_public_key, + // TODO: Add referral id + referral_id: None, + })?; - let electrum_client = ElectrumClient::new(&self.electrum_url)?; - let txid = electrum_client.broadcast(&tx)?.to_string(); + // let swap_script = LBtcSwapScriptV2::submarine_from_swap_resp(&create_response, our_pubkey)?; + debug!("Opening WS connection for swap {}", create_response.id); + + let mut socket = client.connect_ws().unwrap(); + let subscription = Subscription::new(&create_response.id); + let subscribe_json = serde_json::to_string(&subscription) + .map_err(|e| anyhow!("Failed to serialize subscription msg: {e:?}"))?; + socket + .send(tungstenite::Message::Text(subscribe_json)) + .map_err(|e| anyhow!("Failed to subscribe to websocket updates: {e:?}"))?; self.persister .insert_or_update_ongoing_swap(&[OngoingSwap::Send { - id, + id: create_response.id.clone(), invoice: req.invoice.clone(), - payer_amount_sat: receiver_amount_sat + req.fees_sat, - txid: Some(txid.clone()), - }]) - .map_err(|_| PaymentError::PersistError)?; + payer_amount_sat: req.fees_sat + receiver_amount_sat, + txid: None, + }])?; - Ok(SendPaymentResponse { txid }) + let result; + loop { + let data = match get_swap_status_v2(&mut socket, &create_response.id) { + Ok(data) => data, + Err(_) => continue, + }; + + let state = data + .parse::() + .map_err(|_| PaymentError::Generic { + err: "Invalid state received from swapper".to_string(), + })?; + + match state { + SubSwapStates::TransactionMempool | SubSwapStates::TransactionConfirmed => { + // Send detected by Boltz, waiting for invoice + // to be settled + } + SubSwapStates::InvoiceSet => { + debug!( + "Send {} sats to BTC address {}", + create_response.expected_amount, create_response.address + ); + // let absolute_fees = self + // .build_tx( + // None, + // &create_response.address, + // create_response.expected_amount, + // )? + // .all_fees() + // .values() + // .sum::(); + // let fee_rate = + // req.fees_sat as f32 / absolute_fees as f32 * LIQUID_CLAIM_TX_FEERATE; + let tx = self.build_tx( + None, + &create_response.address, + create_response.expected_amount, + )?; + + let txid = match self.network { + Network::Liquid => { + let tx_hex = elements::encode::serialize(&tx).to_lower_hex_string(); + let response = client.broadcast_tx(self.network.into(), &tx_hex)?; + response + .as_object() + .ok_or(PaymentError::Generic { + err: "Invalid data received from swapper".to_string(), + })? + .get("id") + .ok_or(PaymentError::Generic { + err: "Invalid data received from swapper".to_string(), + })? + .as_str() + .ok_or(PaymentError::Generic { + err: "Invalid data received from swapper".to_string(), + })? + .to_string() + } + Network::LiquidTestnet => { + let electrum_client = ElectrumClient::new(&self.electrum_url)?; + electrum_client.broadcast(&tx)?.to_string() + } + }; + + self.persister + .insert_or_update_ongoing_swap(&[OngoingSwap::Send { + id: create_response.id.clone(), + invoice: req.invoice.clone(), + payer_amount_sat: req.fees_sat + receiver_amount_sat, + txid: Some(txid.clone()), + }])?; + + result = Ok(SendPaymentResponse { txid }); + break; + } + SubSwapStates::TransactionClaimed + | SubSwapStates::InvoiceFailedToPay + | SubSwapStates::SwapExpired => { + result = Err(PaymentError::Generic { + err: format!("Payment state is unrecoverable: {}", state.to_string()), + }); + break; + } + _ => info!( + "New state for swap {}: {}", + create_response.id, + state.to_string() + ), + }; + + thread::sleep(Duration::from_millis(500)); + } + + socket.close(None).unwrap(); + result } fn try_claim( diff --git a/lib/core/src/utils.rs b/lib/core/src/utils.rs new file mode 100644 index 0000000..dc57137 --- /dev/null +++ b/lib/core/src/utils.rs @@ -0,0 +1,67 @@ +use std::net::TcpStream; + +use anyhow::{anyhow, ensure, Result}; +use boltz_client::swaps::boltzv2::SwapUpdate; +use log::{error, info}; +use tungstenite::{stream::MaybeTlsStream, WebSocket}; + +/// Fetch the swap status using the websocket endpoint +pub(crate) fn get_swap_status_v2( + socket: &mut WebSocket>, + swap_id: &str, +) -> Result { + loop { + let response: SwapUpdate = serde_json::from_str(&socket.read()?.to_string()) + .map_err(|e| anyhow!("WS response is invalid SwapUpdate: {e:?}"))?; + + match response { + SwapUpdate::Subscription { + event, + channel, + args, + } => { + ensure!(event == "subscribe", "Wrong WS reply event {event}"); + ensure!(channel == "swap.update", "Wrong WS reply channel {channel}"); + + let first_arg = args.first(); + let is_ok = matches!(first_arg.as_ref(), Some(&x) if x == &swap_id); + ensure!(is_ok, "Wrong WS reply subscription ID {first_arg:?}"); + + info!("Subscription successful for swap : {swap_id}"); + } + + SwapUpdate::Update { + event, + channel, + args, + } => { + ensure!(event == "update", "Wrong WS reply event {event}"); + ensure!(channel == "swap.update", "Wrong WS reply channel {channel}"); + + return match args.first() { + Some(update) if update.id == swap_id => { + info!("Got new reverse swap status: {}", update.status); + + Ok(update.status.clone()) + } + Some(update) => Err(anyhow!("WS reply has wrong swap ID {update:?}")), + None => Err(anyhow!("WS reply contains no update")), + }; + } + + SwapUpdate::Error { + event, + channel, + args, + } => { + ensure!(event == "update", "Wrong WS reply event {event}"); + ensure!(channel == "swap.update", "Wrong WS reply channel {channel}"); + + for e in &args { + error!("Got error: {} for swap: {}", e.error, e.id); + } + return Err(anyhow!("Got SwapUpdate errors: {args:?}")); + } + } + } +}