fix: change send_payment resolve

This commit is contained in:
yse
2024-05-10 18:19:49 +02:00
parent 5476ddad54
commit 2e8bbb3e4a
3 changed files with 200 additions and 203 deletions

View File

@@ -4,9 +4,8 @@ use std::net::TcpStream;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use anyhow::{anyhow, ensure, Result};
use anyhow::{anyhow, Result};
use boltz_client::swaps::{
boltz::{RevSwapStates, SubSwapStates},
boltzv2::{Subscription, SwapUpdate},
@@ -38,7 +37,7 @@ impl BoltzStatusStream {
tungstenite::stream::MaybeTlsStream::NativeTls(s) => {
s.get_mut().set_nonblocking(true)?
}
_ => Err(anyhow!("Unsupported stream type"))?
_ => Err(anyhow!("Unsupported stream type"))?,
};
thread::spawn(move || loop {
@@ -118,8 +117,8 @@ impl BoltzStatusStream {
// Status update
boltz_client::swaps::boltzv2::SwapUpdate::Update {
event,
channel,
event: _,
channel: _,
args,
} => {
let update = args.first().unwrap().clone(); // TODO
@@ -169,7 +168,7 @@ impl BoltzStatusStream {
// Calling socket.read() on a non-blocking stream when there is nothing
// to read results in an WouldBlock error. In this case, we do nothing
// and continue the loop.
ErrorKind::WouldBlock => {},
ErrorKind::WouldBlock => {}
_ => {
error!("Received stream IO error : {io_err:?}");
break;

View File

@@ -11,10 +11,7 @@ use anyhow::{anyhow, Result};
use boltz_client::{
network::electrum::ElectrumConfig,
swaps::{
boltz::{
BoltzApiClient, CreateSwapRequest, RevSwapStates, SubSwapStates, SwapStatusRequest,
BOLTZ_MAINNET_URL, BOLTZ_TESTNET_URL,
},
boltz::{RevSwapStates, SubSwapStates},
boltzv2::*,
liquid::{LBtcSwapScript, LBtcSwapTx},
liquidv2::LBtcSwapTxV2,
@@ -23,7 +20,7 @@ use boltz_client::{
Amount, Bolt11Invoice, Keypair, LBtcSwapScriptV2,
};
use elements::hashes::hex::DisplayHex;
use log::{debug, error, info, warn};
use log::{debug, info, warn};
use lwk_common::{singlesig_desc, Signer, Singlesig};
use lwk_signer::{AnySigner, SwSigner};
use lwk_wollet::{
@@ -137,28 +134,25 @@ impl LiquidSdk {
}
// We may be offline, or claiming failued due to other reasons until the swap reached these states
// If an ongoing reverse swap is in any of these states, we should be able to claim
RevSwapStates::TransactionMempool | RevSwapStates::TransactionConfirmed | RevSwapStates::InvoiceSettled => {
match self.try_claim_v2(&ongoing_swap_out) {
Ok(txid) => {
let payer_amount_sat = get_invoice_amount!(ongoing_swap_out.invoice);
self.persister
.resolve_ongoing_swap(
id,
Some((txid, PaymentData { payer_amount_sat })),
)
.map_err(|e| anyhow!("Could not resolve swap {id}: {e}"))?;
}
Err(err) => {
if let PaymentError::AlreadyClaimed = err {
warn!("Funds already claimed");
self.persister
.resolve_ongoing_swap(id, None)
.map_err(|_| anyhow!("Could not resolve swap {id} in database"))?;
}
warn!("Could not claim swap {id} yet. Err: {err}");
}
RevSwapStates::TransactionMempool
| RevSwapStates::TransactionConfirmed
| RevSwapStates::InvoiceSettled => match self.try_claim_v2(&ongoing_swap_out) {
Ok(txid) => {
let payer_amount_sat = get_invoice_amount!(ongoing_swap_out.invoice);
self.persister
.resolve_ongoing_swap(id, Some((txid, PaymentData { payer_amount_sat })))
.map_err(|e| anyhow!("Could not resolve swap {id}: {e}"))?;
}
}
Err(err) => {
if let PaymentError::AlreadyClaimed = err {
warn!("Funds already claimed");
self.persister
.resolve_ongoing_swap(id, None)
.map_err(|_| anyhow!("Could not resolve swap {id} in database"))?;
}
warn!("Could not claim swap {id} yet. Err: {err}");
}
},
RevSwapStates::Created | RevSwapStates::MinerFeePaid => {
// Too soon to try to claim
}
@@ -196,59 +190,59 @@ impl LiquidSdk {
}
// TODO Not needed anymore with the event stream
fn try_resolve_pending_swap(&self, swap: &OngoingSwap) -> Result<()> {
let client = self.boltz_client();
let client_v2 = self.boltz_client_v2();
match swap {
OngoingSwap::Receive(ongoing_swap_out) => {
let swap_state = utils::get_rev_swap_status_v2(client_v2, &ongoing_swap_out.id)?;
self.try_handle_reverse_swap_status(swap_state, &ongoing_swap_out.id)?;
}
OngoingSwap::Send(ongoing_swap_in) => {
let id = &ongoing_swap_in.id;
let status = client
.swap_status(SwapStatusRequest { id: id.clone() })
.map_err(|e| anyhow!("Failed to fetch swap status for ID {id}: {e:?}"))?
.status;
let swap_state: SubSwapStates = status.parse().map_err(|_| {
anyhow!("Invalid submarine swap state received for swap {id}: {status}")
})?;
self.try_handle_submarine_swap_status(swap_state, &ongoing_swap_in.id)?;
}
};
Ok(())
}
// fn try_resolve_pending_swap(&self, swap: &OngoingSwap) -> Result<()> {
// let client = self.boltz_client();
// let client_v2 = self.boltz_client_v2();
//
// match swap {
// OngoingSwap::Receive(ongoing_swap_out) => {
// let swap_state = utils::get_rev_swap_status_v2(client_v2, &ongoing_swap_out.id)?;
// self.try_handle_reverse_swap_status(swap_state, &ongoing_swap_out.id)?;
// }
// OngoingSwap::Send(ongoing_swap_in) => {
// let id = &ongoing_swap_in.id;
// let status = client
// .swap_status(SwapStatusRequest { id: id.clone() })
// .map_err(|e| anyhow!("Failed to fetch swap status for ID {id}: {e:?}"))?
// .status;
//
// let swap_state: SubSwapStates = status.parse().map_err(|_| {
// anyhow!("Invalid submarine swap state received for swap {id}: {status}")
// })?;
//
// self.try_handle_submarine_swap_status(swap_state, &ongoing_swap_in.id)?;
// }
// };
//
// Ok(())
// }
// TODO Not needed anymore with the event stream
fn track_pending_swaps(self: &Arc<LiquidSdk>) -> Result<()> {
let cloned = self.clone();
thread::spawn(move || loop {
thread::sleep(Duration::from_secs(5));
match cloned.persister.list_ongoing_swaps() {
Ok(ongoing_swaps) => {
for swap in ongoing_swaps {
match cloned.try_resolve_pending_swap(&swap) {
Ok(_) => info!("Resolved pending swap {}", swap.id()),
Err(err) => match swap {
OngoingSwap::Send { .. } => error!("[Ongoing Send] {err}"),
OngoingSwap::Receive { .. } => error!("[Ongoing Receive] {err}"),
},
}
}
}
Err(e) => {
error!("Could not read ongoing swaps from database: {e}");
continue;
}
}
});
Ok(())
}
// fn track_pending_swaps(self: &Arc<LiquidSdk>) -> Result<()> {
// let cloned = self.clone();
// thread::spawn(move || loop {
// thread::sleep(Duration::from_secs(5));
// match cloned.persister.list_ongoing_swaps() {
// Ok(ongoing_swaps) => {
// for swap in ongoing_swaps {
// match cloned.try_resolve_pending_swap(&swap) {
// Ok(_) => info!("Resolved pending swap {}", swap.id()),
// Err(err) => match swap {
// OngoingSwap::Send { .. } => error!("[Ongoing Send] {err}"),
// OngoingSwap::Receive { .. } => error!("[Ongoing Receive] {err}"),
// },
// }
// }
// }
// Err(e) => {
// error!("Could not read ongoing swaps from database: {e}");
// continue;
// }
// }
// });
//
// Ok(())
// }
pub(crate) fn list_ongoing_swaps(&self) -> Result<Vec<OngoingSwap>> {
self.persister.list_ongoing_swaps()
@@ -286,14 +280,14 @@ impl LiquidSdk {
self.lwk_signer.clone()
}
fn boltz_client(&self) -> BoltzApiClient {
let base_url = match self.network {
Network::LiquidTestnet => BOLTZ_TESTNET_URL,
Network::Liquid => BOLTZ_MAINNET_URL,
};
BoltzApiClient::new(base_url)
}
// fn boltz_client(&self) -> BoltzApiClient {
// let base_url = match self.network {
// Network::LiquidTestnet => BOLTZ_TESTNET_URL,
// Network::Liquid => BOLTZ_MAINNET_URL,
// };
//
// BoltzApiClient::new(base_url)
// }
pub(crate) fn boltz_client_v2(&self) -> BoltzApiClientV2 {
let base_url = match self.network {
@@ -454,10 +448,10 @@ impl LiquidSdk {
})?;
let result;
let mut txid = String::new();
loop {
let data = match utils::get_swap_status_v2(&mut socket, &create_response.id) {
Ok(data) => data,
Err(_) => continue,
let Ok(data) = utils::get_swap_status_v2(&mut socket, &create_response.id) else {
continue;
};
let state = data
@@ -466,34 +460,21 @@ impl LiquidSdk {
err: "Invalid state received from swapper".to_string(),
})?;
// See https://docs.boltz.exchange/v/api/lifecycle#normal-submarine-swaps
match state {
SubSwapStates::TransactionMempool | SubSwapStates::TransactionConfirmed => {
// Send detected by Boltz, waiting for invoice
// to be settled
}
// Boltz has locked the HTLC, we proceed with locking up the funds
SubSwapStates::InvoiceSet => {
debug!(
"Send {} sats to BTC address {}",
"Initiated swap-in: send {} sats to liquid address {}",
create_response.expected_amount, create_response.address
);
// let absolute_fees = self
// .build_tx(
// None,
// &create_response.address,
// create_response.expected_amount,
// )?
// .all_fees()
// .values()
// .sum::<u64>();
// let fee_rate =
// req.fees_sat as f32 / absolute_fees as f32 * LIQUID_CLAIM_TX_FEERATE;
let tx = self.build_tx(
None,
&create_response.address,
create_response.expected_amount,
)?;
let txid = match self.network {
txid = match self.network {
Network::Liquid => {
let tx_hex = elements::encode::serialize(&tx).to_lower_hex_string();
let response = client.broadcast_tx(self.network.into(), &tx_hex)?;
@@ -518,26 +499,45 @@ impl LiquidSdk {
}
};
self.persister
.insert_or_update_ongoing_swap_in(OngoingSwapIn {
id: create_response.id.clone(),
invoice: req.invoice.clone(),
payer_amount_sat: req.fees_sat + receiver_amount_sat,
txid: Some(txid.clone()),
})?;
debug!(
"Successfully broadcast lockup transaction for swap {}. Txid: {}",
&create_response.id, &txid
);
self.persister.resolve_ongoing_swap(
&create_response.id,
Some((
txid.clone(),
PaymentData {
payer_amount_sat: receiver_amount_sat + req.fees_sat,
},
)),
)?;
debug!(
"Successfully resolved ongoing swap-in {}",
&create_response.id
);
}
// Boltz has detected our lockup
SubSwapStates::TransactionMempool | SubSwapStates::TransactionConfirmed => {}
// Boltz has broadcast the claim to the mempool, resolve with success
SubSwapStates::TransactionClaimed => {
result = Ok(SendPaymentResponse { txid });
break;
}
SubSwapStates::TransactionClaimed
| SubSwapStates::InvoiceFailedToPay
| SubSwapStates::SwapExpired => {
// Either:
// 1. Boltz failed to pay
// 2. The swap has expired (>24h)
// 3. TODO: Lockup failed (we sent too little funds)
// We initiate a cooperative refund, and then fallback to a regular one
SubSwapStates::InvoiceFailedToPay | SubSwapStates::SwapExpired => {
result = Err(PaymentError::Generic {
err: format!("Payment state is unrecoverable: {}", state.to_string()),
});
break;
}
_ => info!(
_ => debug!(
"New state for swap {}: {}",
create_response.id,
state.to_string()
@@ -591,7 +591,9 @@ impl LiquidSdk {
match self.network {
Network::Liquid => {
let tx_hex = elements::encode::serialize(&tx).to_lower_hex_string();
let response = self.boltz_client_v2().broadcast_tx(self.network.into(), &tx_hex)?;
let response = self
.boltz_client_v2()
.broadcast_tx(self.network.into(), &tx_hex)?;
info!("Claim broadcast response: {response:?}");
}
Network::LiquidTestnet => {

View File

@@ -1,11 +1,7 @@
use std::net::TcpStream;
use std::str::FromStr;
use anyhow::{anyhow, ensure, Result};
use boltz_client::swaps::{
boltz::RevSwapStates,
boltzv2::{BoltzApiClientV2, Subscription, SwapUpdate},
};
use boltz_client::swaps::boltzv2::SwapUpdate;
use log::{error, info};
use tungstenite::{stream::MaybeTlsStream, WebSocket};
@@ -28,7 +24,7 @@ pub(crate) fn get_swap_status_v2(
ensure!(channel == "swap.update", "Wrong WS reply channel {channel}");
let first_arg = args.first();
let is_ok = matches!(first_arg.as_ref(), Some(&x) if x == &swap_id);
let is_ok = matches!(first_arg.as_ref(), Some(&x) if x == swap_id);
ensure!(is_ok, "Wrong WS reply subscription ID {first_arg:?}");
info!("Subscription successful for swap : {swap_id}");
@@ -70,77 +66,77 @@ pub(crate) fn get_swap_status_v2(
}
}
/// Fetch the reverse swap status using the websocket endpoint
pub(crate) fn get_rev_swap_status_v2(
client_v2: BoltzApiClientV2,
swap_id: &str,
) -> Result<RevSwapStates> {
let mut socket = client_v2
.connect_ws()
.map_err(|e| anyhow!("Failed to connect to websocket: {e:?}"))?;
let sub_id = swap_id.to_string();
let subscription = Subscription::new(&sub_id);
let subscribe_json = serde_json::to_string(&subscription)
.map_err(|e| anyhow!("Failed to serialize subscription msg: {e:?}"))?;
socket
.send(tungstenite::Message::Text(subscribe_json))
.map_err(|e| anyhow!("Failed to subscribe to websocket updates: {e:?}"))?;
loop {
let response: SwapUpdate = serde_json::from_str(&socket.read()?.to_string())
.map_err(|e| anyhow!("WS response is invalid SwapUpdate: {e:?}"))?;
match response {
SwapUpdate::Subscription {
event,
channel,
args,
} => {
ensure!(event == "subscribe", "Wrong WS reply event {event}");
ensure!(channel == "swap.update", "Wrong WS reply channel {channel}");
let first_arg = args.first();
let is_ok = matches!(first_arg.as_ref(), Some(&x) if x == &sub_id);
ensure!(is_ok, "Wrong WS reply subscription ID {first_arg:?}");
info!("Subscription successful for swap : {sub_id}");
}
SwapUpdate::Update {
event,
channel,
args,
} => {
ensure!(event == "update", "Wrong WS reply event {event}");
ensure!(channel == "swap.update", "Wrong WS reply channel {channel}");
return match args.first() {
Some(update) if update.id == sub_id => {
info!("Got new reverse swap status: {}", update.status);
RevSwapStates::from_str(&update.status).map_err(|_| {
anyhow!("Invalid state for rev swap {swap_id}: {}", update.status)
})
}
Some(update) => Err(anyhow!("WS reply has wrong swap ID {update:?}")),
None => Err(anyhow!("WS reply contains no update")),
};
}
SwapUpdate::Error {
event,
channel,
args,
} => {
ensure!(event == "update", "Wrong WS reply event {event}");
ensure!(channel == "swap.update", "Wrong WS reply channel {channel}");
for e in &args {
error!("Got error: {} for swap: {}", e.error, e.id);
}
return Err(anyhow!("Got SwapUpdate errors: {args:?}"));
}
}
}
}
// Fetch the reverse swap status using the websocket endpoint
// pub(crate) fn get_rev_swap_status_v2(
// client_v2: BoltzApiClientV2,
// swap_id: &str,
// ) -> Result<RevSwapStates> {
// let mut socket = client_v2
// .connect_ws()
// .map_err(|e| anyhow!("Failed to connect to websocket: {e:?}"))?;
//
// let sub_id = swap_id.to_string();
// let subscription = Subscription::new(&sub_id);
// let subscribe_json = serde_json::to_string(&subscription)
// .map_err(|e| anyhow!("Failed to serialize subscription msg: {e:?}"))?;
// socket
// .send(tungstenite::Message::Text(subscribe_json))
// .map_err(|e| anyhow!("Failed to subscribe to websocket updates: {e:?}"))?;
//
// loop {
// let response: SwapUpdate = serde_json::from_str(&socket.read()?.to_string())
// .map_err(|e| anyhow!("WS response is invalid SwapUpdate: {e:?}"))?;
//
// match response {
// SwapUpdate::Subscription {
// event,
// channel,
// args,
// } => {
// ensure!(event == "subscribe", "Wrong WS reply event {event}");
// ensure!(channel == "swap.update", "Wrong WS reply channel {channel}");
//
// let first_arg = args.first();
// let is_ok = matches!(first_arg.as_ref(), Some(&x) if x == &sub_id);
// ensure!(is_ok, "Wrong WS reply subscription ID {first_arg:?}");
//
// info!("Subscription successful for swap : {sub_id}");
// }
//
// SwapUpdate::Update {
// event,
// channel,
// args,
// } => {
// ensure!(event == "update", "Wrong WS reply event {event}");
// ensure!(channel == "swap.update", "Wrong WS reply channel {channel}");
//
// return match args.first() {
// Some(update) if update.id == sub_id => {
// info!("Got new reverse swap status: {}", update.status);
//
// RevSwapStates::from_str(&update.status).map_err(|_| {
// anyhow!("Invalid state for rev swap {swap_id}: {}", update.status)
// })
// }
// Some(update) => Err(anyhow!("WS reply has wrong swap ID {update:?}")),
// None => Err(anyhow!("WS reply contains no update")),
// };
// }
//
// SwapUpdate::Error {
// event,
// channel,
// args,
// } => {
// ensure!(event == "update", "Wrong WS reply event {event}");
// ensure!(channel == "swap.update", "Wrong WS reply channel {channel}");
//
// for e in &args {
// error!("Got error: {} for swap: {}", e.error, e.id);
// }
// return Err(anyhow!("Got SwapUpdate errors: {args:?}"));
// }
// }
// }
// }