From 2e8bbb3e4aaa21981021a1aebf01386433f207fc Mon Sep 17 00:00:00 2001 From: yse Date: Fri, 10 May 2024 18:19:49 +0200 Subject: [PATCH] fix: change send_payment resolve --- lib/core/src/boltz_status_stream.rs | 11 +- lib/core/src/sdk.rs | 236 ++++++++++++++-------------- lib/core/src/utils.rs | 156 +++++++++--------- 3 files changed, 200 insertions(+), 203 deletions(-) diff --git a/lib/core/src/boltz_status_stream.rs b/lib/core/src/boltz_status_stream.rs index a21478f..7446826 100644 --- a/lib/core/src/boltz_status_stream.rs +++ b/lib/core/src/boltz_status_stream.rs @@ -4,9 +4,8 @@ use std::net::TcpStream; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::thread; -use std::time::Duration; -use anyhow::{anyhow, ensure, Result}; +use anyhow::{anyhow, Result}; use boltz_client::swaps::{ boltz::{RevSwapStates, SubSwapStates}, boltzv2::{Subscription, SwapUpdate}, @@ -38,7 +37,7 @@ impl BoltzStatusStream { tungstenite::stream::MaybeTlsStream::NativeTls(s) => { s.get_mut().set_nonblocking(true)? } - _ => Err(anyhow!("Unsupported stream type"))? + _ => Err(anyhow!("Unsupported stream type"))?, }; thread::spawn(move || loop { @@ -118,8 +117,8 @@ impl BoltzStatusStream { // Status update boltz_client::swaps::boltzv2::SwapUpdate::Update { - event, - channel, + event: _, + channel: _, args, } => { let update = args.first().unwrap().clone(); // TODO @@ -169,7 +168,7 @@ impl BoltzStatusStream { // Calling socket.read() on a non-blocking stream when there is nothing // to read results in an WouldBlock error. In this case, we do nothing // and continue the loop. - ErrorKind::WouldBlock => {}, + ErrorKind::WouldBlock => {} _ => { error!("Received stream IO error : {io_err:?}"); break; diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index eea5be2..0215b82 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -11,10 +11,7 @@ use anyhow::{anyhow, Result}; use boltz_client::{ network::electrum::ElectrumConfig, swaps::{ - boltz::{ - BoltzApiClient, CreateSwapRequest, RevSwapStates, SubSwapStates, SwapStatusRequest, - BOLTZ_MAINNET_URL, BOLTZ_TESTNET_URL, - }, + boltz::{RevSwapStates, SubSwapStates}, boltzv2::*, liquid::{LBtcSwapScript, LBtcSwapTx}, liquidv2::LBtcSwapTxV2, @@ -23,7 +20,7 @@ use boltz_client::{ Amount, Bolt11Invoice, Keypair, LBtcSwapScriptV2, }; use elements::hashes::hex::DisplayHex; -use log::{debug, error, info, warn}; +use log::{debug, info, warn}; use lwk_common::{singlesig_desc, Signer, Singlesig}; use lwk_signer::{AnySigner, SwSigner}; use lwk_wollet::{ @@ -137,28 +134,25 @@ impl LiquidSdk { } // We may be offline, or claiming failued due to other reasons until the swap reached these states // If an ongoing reverse swap is in any of these states, we should be able to claim - RevSwapStates::TransactionMempool | RevSwapStates::TransactionConfirmed | RevSwapStates::InvoiceSettled => { - match self.try_claim_v2(&ongoing_swap_out) { - Ok(txid) => { - let payer_amount_sat = get_invoice_amount!(ongoing_swap_out.invoice); - self.persister - .resolve_ongoing_swap( - id, - Some((txid, PaymentData { payer_amount_sat })), - ) - .map_err(|e| anyhow!("Could not resolve swap {id}: {e}"))?; - } - Err(err) => { - if let PaymentError::AlreadyClaimed = err { - warn!("Funds already claimed"); - self.persister - .resolve_ongoing_swap(id, None) - .map_err(|_| anyhow!("Could not resolve swap {id} in database"))?; - } - warn!("Could not claim swap {id} yet. Err: {err}"); - } + RevSwapStates::TransactionMempool + | RevSwapStates::TransactionConfirmed + | RevSwapStates::InvoiceSettled => match self.try_claim_v2(&ongoing_swap_out) { + Ok(txid) => { + let payer_amount_sat = get_invoice_amount!(ongoing_swap_out.invoice); + self.persister + .resolve_ongoing_swap(id, Some((txid, PaymentData { payer_amount_sat }))) + .map_err(|e| anyhow!("Could not resolve swap {id}: {e}"))?; } - } + Err(err) => { + if let PaymentError::AlreadyClaimed = err { + warn!("Funds already claimed"); + self.persister + .resolve_ongoing_swap(id, None) + .map_err(|_| anyhow!("Could not resolve swap {id} in database"))?; + } + warn!("Could not claim swap {id} yet. Err: {err}"); + } + }, RevSwapStates::Created | RevSwapStates::MinerFeePaid => { // Too soon to try to claim } @@ -196,59 +190,59 @@ impl LiquidSdk { } // TODO Not needed anymore with the event stream - fn try_resolve_pending_swap(&self, swap: &OngoingSwap) -> Result<()> { - let client = self.boltz_client(); - let client_v2 = self.boltz_client_v2(); - - match swap { - OngoingSwap::Receive(ongoing_swap_out) => { - let swap_state = utils::get_rev_swap_status_v2(client_v2, &ongoing_swap_out.id)?; - self.try_handle_reverse_swap_status(swap_state, &ongoing_swap_out.id)?; - } - OngoingSwap::Send(ongoing_swap_in) => { - let id = &ongoing_swap_in.id; - let status = client - .swap_status(SwapStatusRequest { id: id.clone() }) - .map_err(|e| anyhow!("Failed to fetch swap status for ID {id}: {e:?}"))? - .status; - - let swap_state: SubSwapStates = status.parse().map_err(|_| { - anyhow!("Invalid submarine swap state received for swap {id}: {status}") - })?; - - self.try_handle_submarine_swap_status(swap_state, &ongoing_swap_in.id)?; - } - }; - - Ok(()) - } + // fn try_resolve_pending_swap(&self, swap: &OngoingSwap) -> Result<()> { + // let client = self.boltz_client(); + // let client_v2 = self.boltz_client_v2(); + // + // match swap { + // OngoingSwap::Receive(ongoing_swap_out) => { + // let swap_state = utils::get_rev_swap_status_v2(client_v2, &ongoing_swap_out.id)?; + // self.try_handle_reverse_swap_status(swap_state, &ongoing_swap_out.id)?; + // } + // OngoingSwap::Send(ongoing_swap_in) => { + // let id = &ongoing_swap_in.id; + // let status = client + // .swap_status(SwapStatusRequest { id: id.clone() }) + // .map_err(|e| anyhow!("Failed to fetch swap status for ID {id}: {e:?}"))? + // .status; + // + // let swap_state: SubSwapStates = status.parse().map_err(|_| { + // anyhow!("Invalid submarine swap state received for swap {id}: {status}") + // })?; + // + // self.try_handle_submarine_swap_status(swap_state, &ongoing_swap_in.id)?; + // } + // }; + // + // Ok(()) + // } // TODO Not needed anymore with the event stream - fn track_pending_swaps(self: &Arc) -> Result<()> { - let cloned = self.clone(); - thread::spawn(move || loop { - thread::sleep(Duration::from_secs(5)); - match cloned.persister.list_ongoing_swaps() { - Ok(ongoing_swaps) => { - for swap in ongoing_swaps { - match cloned.try_resolve_pending_swap(&swap) { - Ok(_) => info!("Resolved pending swap {}", swap.id()), - Err(err) => match swap { - OngoingSwap::Send { .. } => error!("[Ongoing Send] {err}"), - OngoingSwap::Receive { .. } => error!("[Ongoing Receive] {err}"), - }, - } - } - } - Err(e) => { - error!("Could not read ongoing swaps from database: {e}"); - continue; - } - } - }); - - Ok(()) - } + // fn track_pending_swaps(self: &Arc) -> Result<()> { + // let cloned = self.clone(); + // thread::spawn(move || loop { + // thread::sleep(Duration::from_secs(5)); + // match cloned.persister.list_ongoing_swaps() { + // Ok(ongoing_swaps) => { + // for swap in ongoing_swaps { + // match cloned.try_resolve_pending_swap(&swap) { + // Ok(_) => info!("Resolved pending swap {}", swap.id()), + // Err(err) => match swap { + // OngoingSwap::Send { .. } => error!("[Ongoing Send] {err}"), + // OngoingSwap::Receive { .. } => error!("[Ongoing Receive] {err}"), + // }, + // } + // } + // } + // Err(e) => { + // error!("Could not read ongoing swaps from database: {e}"); + // continue; + // } + // } + // }); + // + // Ok(()) + // } pub(crate) fn list_ongoing_swaps(&self) -> Result> { self.persister.list_ongoing_swaps() @@ -286,14 +280,14 @@ impl LiquidSdk { self.lwk_signer.clone() } - fn boltz_client(&self) -> BoltzApiClient { - let base_url = match self.network { - Network::LiquidTestnet => BOLTZ_TESTNET_URL, - Network::Liquid => BOLTZ_MAINNET_URL, - }; - - BoltzApiClient::new(base_url) - } + // fn boltz_client(&self) -> BoltzApiClient { + // let base_url = match self.network { + // Network::LiquidTestnet => BOLTZ_TESTNET_URL, + // Network::Liquid => BOLTZ_MAINNET_URL, + // }; + // + // BoltzApiClient::new(base_url) + // } pub(crate) fn boltz_client_v2(&self) -> BoltzApiClientV2 { let base_url = match self.network { @@ -454,10 +448,10 @@ impl LiquidSdk { })?; let result; + let mut txid = String::new(); loop { - let data = match utils::get_swap_status_v2(&mut socket, &create_response.id) { - Ok(data) => data, - Err(_) => continue, + let Ok(data) = utils::get_swap_status_v2(&mut socket, &create_response.id) else { + continue; }; let state = data @@ -466,34 +460,21 @@ impl LiquidSdk { err: "Invalid state received from swapper".to_string(), })?; + // See https://docs.boltz.exchange/v/api/lifecycle#normal-submarine-swaps match state { - SubSwapStates::TransactionMempool | SubSwapStates::TransactionConfirmed => { - // Send detected by Boltz, waiting for invoice - // to be settled - } + // Boltz has locked the HTLC, we proceed with locking up the funds SubSwapStates::InvoiceSet => { debug!( - "Send {} sats to BTC address {}", + "Initiated swap-in: send {} sats to liquid address {}", create_response.expected_amount, create_response.address ); - // let absolute_fees = self - // .build_tx( - // None, - // &create_response.address, - // create_response.expected_amount, - // )? - // .all_fees() - // .values() - // .sum::(); - // let fee_rate = - // req.fees_sat as f32 / absolute_fees as f32 * LIQUID_CLAIM_TX_FEERATE; let tx = self.build_tx( None, &create_response.address, create_response.expected_amount, )?; - let txid = match self.network { + txid = match self.network { Network::Liquid => { let tx_hex = elements::encode::serialize(&tx).to_lower_hex_string(); let response = client.broadcast_tx(self.network.into(), &tx_hex)?; @@ -518,26 +499,45 @@ impl LiquidSdk { } }; - self.persister - .insert_or_update_ongoing_swap_in(OngoingSwapIn { - id: create_response.id.clone(), - invoice: req.invoice.clone(), - payer_amount_sat: req.fees_sat + receiver_amount_sat, - txid: Some(txid.clone()), - })?; + debug!( + "Successfully broadcast lockup transaction for swap {}. Txid: {}", + &create_response.id, &txid + ); + self.persister.resolve_ongoing_swap( + &create_response.id, + Some(( + txid.clone(), + PaymentData { + payer_amount_sat: receiver_amount_sat + req.fees_sat, + }, + )), + )?; + + debug!( + "Successfully resolved ongoing swap-in {}", + &create_response.id + ); + } + // Boltz has detected our lockup + SubSwapStates::TransactionMempool | SubSwapStates::TransactionConfirmed => {} + // Boltz has broadcast the claim to the mempool, resolve with success + SubSwapStates::TransactionClaimed => { result = Ok(SendPaymentResponse { txid }); break; } - SubSwapStates::TransactionClaimed - | SubSwapStates::InvoiceFailedToPay - | SubSwapStates::SwapExpired => { + // Either: + // 1. Boltz failed to pay + // 2. The swap has expired (>24h) + // 3. TODO: Lockup failed (we sent too little funds) + // We initiate a cooperative refund, and then fallback to a regular one + SubSwapStates::InvoiceFailedToPay | SubSwapStates::SwapExpired => { result = Err(PaymentError::Generic { err: format!("Payment state is unrecoverable: {}", state.to_string()), }); break; } - _ => info!( + _ => debug!( "New state for swap {}: {}", create_response.id, state.to_string() @@ -591,7 +591,9 @@ impl LiquidSdk { match self.network { Network::Liquid => { let tx_hex = elements::encode::serialize(&tx).to_lower_hex_string(); - let response = self.boltz_client_v2().broadcast_tx(self.network.into(), &tx_hex)?; + let response = self + .boltz_client_v2() + .broadcast_tx(self.network.into(), &tx_hex)?; info!("Claim broadcast response: {response:?}"); } Network::LiquidTestnet => { diff --git a/lib/core/src/utils.rs b/lib/core/src/utils.rs index 851e380..41a923f 100644 --- a/lib/core/src/utils.rs +++ b/lib/core/src/utils.rs @@ -1,11 +1,7 @@ use std::net::TcpStream; -use std::str::FromStr; use anyhow::{anyhow, ensure, Result}; -use boltz_client::swaps::{ - boltz::RevSwapStates, - boltzv2::{BoltzApiClientV2, Subscription, SwapUpdate}, -}; +use boltz_client::swaps::boltzv2::SwapUpdate; use log::{error, info}; use tungstenite::{stream::MaybeTlsStream, WebSocket}; @@ -28,7 +24,7 @@ pub(crate) fn get_swap_status_v2( ensure!(channel == "swap.update", "Wrong WS reply channel {channel}"); let first_arg = args.first(); - let is_ok = matches!(first_arg.as_ref(), Some(&x) if x == &swap_id); + let is_ok = matches!(first_arg.as_ref(), Some(&x) if x == swap_id); ensure!(is_ok, "Wrong WS reply subscription ID {first_arg:?}"); info!("Subscription successful for swap : {swap_id}"); @@ -70,77 +66,77 @@ pub(crate) fn get_swap_status_v2( } } -/// Fetch the reverse swap status using the websocket endpoint -pub(crate) fn get_rev_swap_status_v2( - client_v2: BoltzApiClientV2, - swap_id: &str, -) -> Result { - let mut socket = client_v2 - .connect_ws() - .map_err(|e| anyhow!("Failed to connect to websocket: {e:?}"))?; - - let sub_id = swap_id.to_string(); - let subscription = Subscription::new(&sub_id); - let subscribe_json = serde_json::to_string(&subscription) - .map_err(|e| anyhow!("Failed to serialize subscription msg: {e:?}"))?; - socket - .send(tungstenite::Message::Text(subscribe_json)) - .map_err(|e| anyhow!("Failed to subscribe to websocket updates: {e:?}"))?; - - loop { - let response: SwapUpdate = serde_json::from_str(&socket.read()?.to_string()) - .map_err(|e| anyhow!("WS response is invalid SwapUpdate: {e:?}"))?; - - match response { - SwapUpdate::Subscription { - event, - channel, - args, - } => { - ensure!(event == "subscribe", "Wrong WS reply event {event}"); - ensure!(channel == "swap.update", "Wrong WS reply channel {channel}"); - - let first_arg = args.first(); - let is_ok = matches!(first_arg.as_ref(), Some(&x) if x == &sub_id); - ensure!(is_ok, "Wrong WS reply subscription ID {first_arg:?}"); - - info!("Subscription successful for swap : {sub_id}"); - } - - SwapUpdate::Update { - event, - channel, - args, - } => { - ensure!(event == "update", "Wrong WS reply event {event}"); - ensure!(channel == "swap.update", "Wrong WS reply channel {channel}"); - - return match args.first() { - Some(update) if update.id == sub_id => { - info!("Got new reverse swap status: {}", update.status); - - RevSwapStates::from_str(&update.status).map_err(|_| { - anyhow!("Invalid state for rev swap {swap_id}: {}", update.status) - }) - } - Some(update) => Err(anyhow!("WS reply has wrong swap ID {update:?}")), - None => Err(anyhow!("WS reply contains no update")), - }; - } - - SwapUpdate::Error { - event, - channel, - args, - } => { - ensure!(event == "update", "Wrong WS reply event {event}"); - ensure!(channel == "swap.update", "Wrong WS reply channel {channel}"); - - for e in &args { - error!("Got error: {} for swap: {}", e.error, e.id); - } - return Err(anyhow!("Got SwapUpdate errors: {args:?}")); - } - } - } -} +// Fetch the reverse swap status using the websocket endpoint +// pub(crate) fn get_rev_swap_status_v2( +// client_v2: BoltzApiClientV2, +// swap_id: &str, +// ) -> Result { +// let mut socket = client_v2 +// .connect_ws() +// .map_err(|e| anyhow!("Failed to connect to websocket: {e:?}"))?; +// +// let sub_id = swap_id.to_string(); +// let subscription = Subscription::new(&sub_id); +// let subscribe_json = serde_json::to_string(&subscription) +// .map_err(|e| anyhow!("Failed to serialize subscription msg: {e:?}"))?; +// socket +// .send(tungstenite::Message::Text(subscribe_json)) +// .map_err(|e| anyhow!("Failed to subscribe to websocket updates: {e:?}"))?; +// +// loop { +// let response: SwapUpdate = serde_json::from_str(&socket.read()?.to_string()) +// .map_err(|e| anyhow!("WS response is invalid SwapUpdate: {e:?}"))?; +// +// match response { +// SwapUpdate::Subscription { +// event, +// channel, +// args, +// } => { +// ensure!(event == "subscribe", "Wrong WS reply event {event}"); +// ensure!(channel == "swap.update", "Wrong WS reply channel {channel}"); +// +// let first_arg = args.first(); +// let is_ok = matches!(first_arg.as_ref(), Some(&x) if x == &sub_id); +// ensure!(is_ok, "Wrong WS reply subscription ID {first_arg:?}"); +// +// info!("Subscription successful for swap : {sub_id}"); +// } +// +// SwapUpdate::Update { +// event, +// channel, +// args, +// } => { +// ensure!(event == "update", "Wrong WS reply event {event}"); +// ensure!(channel == "swap.update", "Wrong WS reply channel {channel}"); +// +// return match args.first() { +// Some(update) if update.id == sub_id => { +// info!("Got new reverse swap status: {}", update.status); +// +// RevSwapStates::from_str(&update.status).map_err(|_| { +// anyhow!("Invalid state for rev swap {swap_id}: {}", update.status) +// }) +// } +// Some(update) => Err(anyhow!("WS reply has wrong swap ID {update:?}")), +// None => Err(anyhow!("WS reply contains no update")), +// }; +// } +// +// SwapUpdate::Error { +// event, +// channel, +// args, +// } => { +// ensure!(event == "update", "Wrong WS reply event {event}"); +// ensure!(channel == "swap.update", "Wrong WS reply channel {channel}"); +// +// for e in &args { +// error!("Got error: {} for swap: {}", e.error, e.id); +// } +// return Err(anyhow!("Got SwapUpdate errors: {args:?}")); +// } +// } +// } +// }