Migrate receive-payment to V2 API, use WS to get status

This commit is contained in:
ok300
2024-05-09 17:13:15 +02:00
parent 18257dbc91
commit d67756bf43
9 changed files with 478 additions and 313 deletions

2
cli/Cargo.lock generated
View File

@@ -355,7 +355,7 @@ dependencies = [
[[package]]
name = "boltz-client"
version = "0.1.3"
source = "git+https://github.com/hydra-yse/boltz-rust?rev=410d3a95e528fce36c02e8d414d5b647a31cc28f#410d3a95e528fce36c02e8d414d5b647a31cc28f"
source = "git+https://github.com/hydra-yse/boltz-rust?rev=50b93fb7eba043e12a71fd7ddb1e9604a946c21b#50b93fb7eba043e12a71fd7ddb1e9604a946c21b"
dependencies = [
"bip39",
"bitcoin 0.31.2",

2
lib/Cargo.lock generated
View File

@@ -478,7 +478,7 @@ dependencies = [
[[package]]
name = "boltz-client"
version = "0.1.3"
source = "git+https://github.com/hydra-yse/boltz-rust?rev=410d3a95e528fce36c02e8d414d5b647a31cc28f#410d3a95e528fce36c02e8d414d5b647a31cc28f"
source = "git+https://github.com/hydra-yse/boltz-rust?rev=50b93fb7eba043e12a71fd7ddb1e9604a946c21b#50b93fb7eba043e12a71fd7ddb1e9604a946c21b"
dependencies = [
"bip39",
"bitcoin 0.31.2",

View File

@@ -10,7 +10,8 @@ crate-type = ["lib", "cdylib", "staticlib"]
[dependencies]
anyhow = { workspace = true }
bip39 = { version = "2.0.0", features = ["serde"] }
boltz-client = { git = "https://github.com/hydra-yse/boltz-rust", rev = "410d3a95e528fce36c02e8d414d5b647a31cc28f" }
#boltz-client = { git = "https://github.com/SatoshiPortal/boltz-rust", rev = "6f45fff8b87c7530c847eb05f018906c48785a6c" }
boltz-client = { git = "https://github.com/hydra-yse/boltz-rust", rev = "50b93fb7eba043e12a71fd7ddb1e9604a946c21b" }
flutter_rust_bridge = { version = "=2.0.0-dev.33", features = ["chrono"], optional = true }
log = "0.4.20"
lwk_common = "0.3.0"
@@ -19,12 +20,12 @@ lwk_wollet = "0.3.0"
rusqlite = { version = "0.31", features = ["backup", "bundled"] }
rusqlite_migration = "1.0"
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.116"
thiserror = { workspace = true }
tungstenite = { version = "0.21.0", features = ["native-tls-vendored"] }
openssl = { version = "0.10", features = ["vendored"] }
# TODO Remove once fully migrated to v2 API
elements = "0.24.1"
serde_json = "1.0.116"
tungstenite = { version = "0.21.0", features = ["native-tls-vendored"] }
[dev-dependencies]
tempdir = "0.3.7"

View File

@@ -128,32 +128,39 @@ pub struct RestoreRequest {
pub backup_path: Option<String>,
}
#[derive(Debug)]
#[derive(Clone, Debug)]
pub(crate) enum OngoingSwap {
Send {
id: String,
invoice: String,
payer_amount_sat: u64,
txid: Option<String>,
},
Receive {
id: String,
preimage: String,
redeem_script: String,
blinding_key: String,
invoice: String,
receiver_amount_sat: u64,
claim_fees_sat: u64,
},
Send(OngoingSwapIn),
Receive(OngoingSwapOut),
}
impl OngoingSwap {
pub(crate) fn id(&self) -> &str {
pub(crate) fn id(&self) -> String {
match &self {
OngoingSwap::Send { id, .. } | OngoingSwap::Receive { id, .. } => id,
OngoingSwap::Send(OngoingSwapIn { id, .. })
| OngoingSwap::Receive(OngoingSwapOut { id, .. }) => id.clone(),
}
}
}
#[derive(Clone, Debug)]
pub(crate) struct OngoingSwapIn {
pub(crate) id: String,
pub(crate) invoice: String,
pub(crate) payer_amount_sat: u64,
pub(crate) txid: Option<String>,
}
#[derive(Clone, Debug)]
pub(crate) struct OngoingSwapOut {
pub(crate) id: String,
pub(crate) preimage: String,
pub(crate) redeem_script: String,
pub(crate) blinding_key: String,
pub(crate) invoice: String,
pub(crate) receiver_amount_sat: u64,
pub(crate) claim_fees_sat: u64,
}
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum PaymentType {
Sent,
@@ -177,11 +184,11 @@ pub struct Payment {
impl From<OngoingSwap> for Payment {
fn from(swap: OngoingSwap) -> Self {
match swap {
OngoingSwap::Send {
OngoingSwap::Send(OngoingSwapIn {
invoice,
payer_amount_sat,
..
} => {
}) => {
let receiver_amount_sat = get_invoice_amount!(invoice);
Payment {
id: None,
@@ -192,11 +199,11 @@ impl From<OngoingSwap> for Payment {
fees_sat: Some(payer_amount_sat - receiver_amount_sat),
}
}
OngoingSwap::Receive {
OngoingSwap::Receive(OngoingSwapOut {
receiver_amount_sat,
invoice,
..
} => {
}) => {
let payer_amount_sat = get_invoice_amount!(invoice);
Payment {
id: None,

View File

@@ -1,5 +1,7 @@
mod backup;
mod migrations;
mod swap_in;
mod swap_out;
use std::{collections::HashMap, fs::create_dir_all, path::PathBuf, str::FromStr};
@@ -8,7 +10,7 @@ use migrations::current_migrations;
use rusqlite::{params, Connection};
use rusqlite_migration::{Migrations, M};
use crate::model::{Network, Network::*, OngoingSwap, PaymentData};
use crate::model::{Network::*, *};
pub(crate) struct Persister {
main_db_dir: PathBuf,
@@ -47,70 +49,6 @@ impl Persister {
Ok(())
}
pub fn insert_or_update_ongoing_swap(&self, swaps: &[OngoingSwap]) -> Result<()> {
let con = self.get_connection()?;
for swap in swaps {
match swap {
OngoingSwap::Send {
id,
invoice,
payer_amount_sat,
txid,
} => {
let mut stmt = con.prepare(
"
INSERT OR REPLACE INTO ongoing_send_swaps (
id,
invoice,
payer_amount_sat,
txid
)
VALUES (?, ?, ?, ?)
",
)?;
_ = stmt.execute((id, invoice, payer_amount_sat, txid))?
}
OngoingSwap::Receive {
id,
preimage,
redeem_script,
blinding_key,
invoice,
receiver_amount_sat,
claim_fees_sat,
} => {
let mut stmt = con.prepare(
"
INSERT OR REPLACE INTO ongoing_receive_swaps (
id,
preimage,
redeem_script,
blinding_key,
invoice,
receiver_amount_sat,
claim_fees_sat
)
VALUES (?, ?, ?, ?, ?, ?, ?)
",
)?;
_ = stmt.execute((
id,
preimage,
redeem_script,
blinding_key,
invoice,
receiver_amount_sat,
claim_fees_sat,
))?
}
}
}
Ok(())
}
pub fn resolve_ongoing_swap(
&self,
id: &str,
@@ -136,75 +74,19 @@ impl Persister {
Ok(())
}
pub fn list_ongoing_swaps(&self) -> Result<Vec<OngoingSwap>> {
pub(crate) fn list_ongoing_swaps(&self) -> Result<Vec<OngoingSwap>> {
let con = self.get_connection()?;
let mut ongoing_swaps = self.list_ongoing_send(&con)?;
ongoing_swaps.append(&mut self.list_ongoing_receive(&con)?);
Ok(ongoing_swaps)
}
fn list_ongoing_send(&self, con: &Connection) -> Result<Vec<OngoingSwap>, rusqlite::Error> {
let mut stmt = con.prepare(
"
SELECT
id,
invoice,
payer_amount_sat,
txid,
created_at
FROM ongoing_send_swaps
ORDER BY created_at
",
)?;
let ongoing_send = stmt
.query_map(params![], |row| {
Ok(OngoingSwap::Send {
id: row.get(0)?,
invoice: row.get(1)?,
payer_amount_sat: row.get(2)?,
txid: row.get(3)?,
})
})?
.map(|i| i.unwrap())
let ongoing_swap_ins: Vec<OngoingSwap> = self
.list_ongoing_send(&con, vec![])?
.into_iter()
.map(OngoingSwap::Send)
.collect();
Ok(ongoing_send)
}
fn list_ongoing_receive(&self, con: &Connection) -> Result<Vec<OngoingSwap>, rusqlite::Error> {
let mut stmt = con.prepare(
"
SELECT
id,
preimage,
redeem_script,
blinding_key,
invoice,
receiver_amount_sat,
claim_fees_sat,
created_at
FROM ongoing_receive_swaps
ORDER BY created_at
",
)?;
let ongoing_receive = stmt
.query_map(params![], |row| {
Ok(OngoingSwap::Receive {
id: row.get(0)?,
preimage: row.get(1)?,
redeem_script: row.get(2)?,
blinding_key: row.get(3)?,
invoice: row.get(4)?,
receiver_amount_sat: row.get(5)?,
claim_fees_sat: row.get(6)?,
})
})?
.map(|i| i.unwrap())
let ongoing_swap_outs: Vec<OngoingSwap> = self
.list_ongoing_receive(&con, vec![])?
.into_iter()
.map(OngoingSwap::Receive)
.collect();
Ok(ongoing_receive)
Ok([ongoing_swap_ins, ongoing_swap_outs].concat())
}
pub fn get_payment_data(&self) -> Result<HashMap<String, PaymentData>> {

View File

@@ -0,0 +1,84 @@
use crate::model::*;
use crate::persist::Persister;
use anyhow::Result;
use rusqlite::{params, Connection, OptionalExtension, Row};
impl Persister {
pub(crate) fn insert_or_update_ongoing_swap_in(&self, swap_in: OngoingSwapIn) -> Result<()> {
let con = self.get_connection()?;
let mut stmt = con.prepare(
"
INSERT OR REPLACE INTO ongoing_send_swaps (
id,
invoice,
payer_amount_sat,
txid
)
VALUES (?, ?, ?, ?)",
)?;
_ = stmt.execute((
swap_in.id,
swap_in.invoice,
swap_in.payer_amount_sat,
swap_in.txid,
))?;
Ok(())
}
fn list_ongoing_swap_in_query(where_clauses: Vec<&str>) -> String {
let mut where_clause_str = String::new();
if !where_clauses.is_empty() {
where_clause_str = String::from("WHERE ");
where_clause_str.push_str(where_clauses.join(" AND ").as_str());
}
format!(
"
SELECT
id,
invoice,
payer_amount_sat,
txid,
created_at
FROM ongoing_send_swaps
{where_clause_str}
ORDER BY created_at
"
)
}
pub(crate) fn fetch_ongoing_swap_in(
con: &Connection,
id: &str,
) -> rusqlite::Result<Option<OngoingSwapIn>> {
let query = Self::list_ongoing_swap_in_query(vec!["id = ?1"]);
con.query_row(&query, [id], Self::sql_row_to_ongoing_swap_in)
.optional()
}
fn sql_row_to_ongoing_swap_in(row: &Row) -> rusqlite::Result<OngoingSwapIn> {
Ok(OngoingSwapIn {
id: row.get(0)?,
invoice: row.get(1)?,
payer_amount_sat: row.get(2)?,
txid: row.get(3)?,
})
}
pub(crate) fn list_ongoing_send(
&self,
con: &Connection,
where_clauses: Vec<&str>,
) -> rusqlite::Result<Vec<OngoingSwapIn>> {
let query = Self::list_ongoing_swap_in_query(where_clauses);
let ongoing_send = con
.prepare(&query)?
.query_map(params![], Self::sql_row_to_ongoing_swap_in)?
.map(|i| i.unwrap())
.collect();
Ok(ongoing_send)
}
}

View File

@@ -0,0 +1,96 @@
use crate::model::*;
use crate::persist::Persister;
use anyhow::Result;
use rusqlite::{params, Connection, OptionalExtension, Row};
impl Persister {
pub(crate) fn insert_or_update_ongoing_swap_out(&self, swap_out: OngoingSwapOut) -> Result<()> {
let con = self.get_connection()?;
let mut stmt = con.prepare(
"
INSERT OR REPLACE INTO ongoing_receive_swaps (
id,
preimage,
redeem_script,
blinding_key,
invoice,
receiver_amount_sat,
claim_fees_sat
)
VALUES (?, ?, ?, ?, ?, ?, ?)",
)?;
_ = stmt.execute((
swap_out.id,
swap_out.preimage,
swap_out.redeem_script,
swap_out.blinding_key,
swap_out.invoice,
swap_out.receiver_amount_sat,
swap_out.claim_fees_sat,
))?;
Ok(())
}
fn list_ongoing_swap_out_query(where_clauses: Vec<&str>) -> String {
let mut where_clause_str = String::new();
if !where_clauses.is_empty() {
where_clause_str = String::from("WHERE ");
where_clause_str.push_str(where_clauses.join(" AND ").as_str());
}
format!(
"
SELECT
id,
preimage,
redeem_script,
blinding_key,
invoice,
receiver_amount_sat,
claim_fees_sat,
created_at
FROM ongoing_receive_swaps
{where_clause_str}
ORDER BY created_at
"
)
}
pub(crate) fn fetch_ongoing_swap_out(
con: &Connection,
id: &str,
) -> rusqlite::Result<Option<OngoingSwapOut>> {
let query = Self::list_ongoing_swap_out_query(vec!["id = ?1"]);
con.query_row(&query, [id], Self::sql_row_to_ongoing_swap_out)
.optional()
}
fn sql_row_to_ongoing_swap_out(row: &Row) -> rusqlite::Result<OngoingSwapOut> {
Ok(OngoingSwapOut {
id: row.get(0)?,
preimage: row.get(1)?,
redeem_script: row.get(2)?,
blinding_key: row.get(3)?,
invoice: row.get(4)?,
receiver_amount_sat: row.get(5)?,
claim_fees_sat: row.get(6)?,
})
}
pub(crate) fn list_ongoing_receive(
&self,
con: &Connection,
where_clauses: Vec<&str>,
) -> rusqlite::Result<Vec<OngoingSwapOut>> {
let query = Self::list_ongoing_swap_out_query(where_clauses);
let ongoing_receive = con
.prepare(&query)?
.query_map(params![], Self::sql_row_to_ongoing_swap_out)?
.map(|i| i.unwrap())
.collect();
Ok(ongoing_receive)
}
}

View File

@@ -15,14 +15,12 @@ use boltz_client::{
BoltzApiClient, CreateSwapRequest, RevSwapStates, SubSwapStates, SwapStatusRequest,
BOLTZ_MAINNET_URL, BOLTZ_TESTNET_URL,
},
boltzv2::{
BoltzApiClientV2, CreateSubmarineRequest, ReversePair, SubmarinePair, Subscription,
BOLTZ_MAINNET_URL_V2, BOLTZ_TESTNET_URL_V2,
},
boltzv2::*,
liquid::{LBtcSwapScript, LBtcSwapTx},
liquidv2::LBtcSwapTxV2,
},
util::secrets::{LBtcReverseRecovery, LiquidSwapKey, Preimage, SwapKey},
Bolt11Invoice, Keypair,
Amount, Bolt11Invoice, Keypair, LBtcSwapScriptV2,
};
use elements::hashes::hex::DisplayHex;
use log::{debug, error, info, warn};
@@ -35,8 +33,7 @@ use lwk_wollet::{
};
use crate::{
ensure_sdk, error::PaymentError, get_invoice_amount, model::*, persist::Persister,
utils::get_swap_status_v2,
ensure_sdk, error::PaymentError, get_invoice_amount, model::*, persist::Persister, utils,
};
/// Claim tx feerate, in sats per vbyte.
@@ -117,60 +114,40 @@ impl LiquidSdk {
Ok(descriptor_str.parse()?)
}
fn try_resolve_pending_swap(
sdk: &Arc<LiquidSdk>,
client: &BoltzApiClient,
swap: &OngoingSwap,
pub(crate) fn try_handle_reverse_swap_status(
&self,
swap_state: RevSwapStates,
id: &str,
) -> Result<()> {
match swap {
OngoingSwap::Receive {
id,
preimage,
redeem_script,
blinding_key,
invoice,
claim_fees_sat,
..
} => {
let status = client
.swap_status(SwapStatusRequest { id: id.clone() })
.map_err(|e| anyhow!("Failed to fetch swap status for ID {id}: {e:?}"))?
.status;
let con = self.persister.get_connection()?;
let ongoing_swap_out = Persister::fetch_ongoing_swap_out(&con, id)?
.ok_or(anyhow!("No ongoing swap out found for ID {id}"))?;
let swap_state = status.parse::<RevSwapStates>().map_err(|_| {
anyhow!("Invalid reverse swap state received for swap {id}: {status}",)
})?;
match swap_state {
RevSwapStates::SwapExpired
| RevSwapStates::InvoiceExpired
| RevSwapStates::TransactionFailed
| RevSwapStates::TransactionRefunded => {
warn!("Cannot claim swap {id}, unrecoverable state: {status}");
sdk.persister
.resolve_ongoing_swap(id, None)
.map_err(|_| anyhow!("Could not resolve swap {id} in database"))?;
}
RevSwapStates::TransactionConfirmed => {}
_ => {
return Err(anyhow!("New swap state for reverse swap {id}: {status}"));
}
}
match sdk.try_claim(preimage, redeem_script, blinding_key, *claim_fees_sat) {
match swap_state {
RevSwapStates::SwapExpired
| RevSwapStates::InvoiceExpired
| RevSwapStates::TransactionFailed
| RevSwapStates::TransactionRefunded => {
warn!("Cannot claim swap {id}, unrecoverable state: {swap_state:?}");
self.persister
.resolve_ongoing_swap(id, None)
.map_err(|_| anyhow!("Could not resolve swap {id} in database"))?;
}
RevSwapStates::TransactionMempool | RevSwapStates::TransactionConfirmed => {
match self.try_claim_v2(&ongoing_swap_out) {
Ok(txid) => {
let payer_amount_sat = get_invoice_amount!(invoice);
sdk.persister
let payer_amount_sat = get_invoice_amount!(ongoing_swap_out.invoice);
self.persister
.resolve_ongoing_swap(
id,
Some((txid, PaymentData { payer_amount_sat })),
)
.map_err(|_| anyhow!("Could not resolve swap {id} in database"))?;
.map_err(|e| anyhow!("Could not resolve swap {id}: {e}"))?;
}
Err(err) => {
if let PaymentError::AlreadyClaimed = err {
warn!("Funds already claimed");
sdk.persister
self.persister
.resolve_ongoing_swap(id, None)
.map_err(|_| anyhow!("Could not resolve swap {id} in database"))?;
}
@@ -178,40 +155,66 @@ impl LiquidSdk {
}
}
}
OngoingSwap::Send {
id, invoice, txid, ..
} => {
let Some(txid) = txid.clone() else {
return Err(anyhow!("Transaction not broadcast yet for swap {id}"));
};
RevSwapStates::Created | RevSwapStates::MinerFeePaid => {
// Too soon to try to claim
}
RevSwapStates::InvoiceSettled => {
// Reverse swap already completed at this point, from our perspective
}
}
Ok(())
}
pub(crate) fn try_handle_submarine_swap_status(
&self,
swap_state: SubSwapStates,
id: &str,
) -> Result<()> {
let con = self.persister.get_connection()?;
let ongoing_swap_in = Persister::fetch_ongoing_swap_in(&con, id)?
.ok_or(anyhow!("No ongoing swap in found for ID {id}"))?;
let Some(txid) = ongoing_swap_in.txid.clone() else {
return Err(anyhow!("Transaction not broadcast yet for swap {id}"));
};
match swap_state {
SubSwapStates::TransactionClaimed
| SubSwapStates::InvoiceFailedToPay
| SubSwapStates::SwapExpired => {
warn!("Cannot positively resolve swap {id}, unrecoverable state: {swap_state:?}");
let payer_amount_sat = get_invoice_amount!(ongoing_swap_in.invoice);
self.persister
.resolve_ongoing_swap(id, Some((txid, PaymentData { payer_amount_sat })))
.map_err(|_| anyhow!("Could not resolve swap {id} in database"))
}
_ => Err(anyhow!("New state for submarine swap {id}: {swap_state:?}")),
}
}
fn try_resolve_pending_swap(&self, swap: &OngoingSwap) -> Result<()> {
let client = self.boltz_client();
let client_v2 = self.boltz_client_v2();
match swap {
OngoingSwap::Receive(ongoing_swap_out) => {
let swap_state = utils::get_rev_swap_status_v2(client_v2, &ongoing_swap_out.id)?;
self.try_handle_reverse_swap_status(swap_state, &ongoing_swap_out.id)?;
}
OngoingSwap::Send(ongoing_swap_in) => {
let id = &ongoing_swap_in.id;
let status = client
.swap_status(SwapStatusRequest { id: id.clone() })
.map_err(|e| anyhow!("Failed to fetch swap status for ID {id}: {e:?}"))?
.status;
let state: SubSwapStates = status.parse().map_err(|_| {
let swap_state: SubSwapStates = status.parse().map_err(|_| {
anyhow!("Invalid submarine swap state received for swap {id}: {status}")
})?;
match state {
SubSwapStates::TransactionClaimed
| SubSwapStates::InvoiceFailedToPay
| SubSwapStates::SwapExpired => {
warn!("Cannot positively resolve swap {id}, unrecoverable state: {status}");
let payer_amount_sat = get_invoice_amount!(invoice);
sdk.persister
.resolve_ongoing_swap(
id,
Some((txid, PaymentData { payer_amount_sat })),
)
.map_err(|_| anyhow!("Could not resolve swap {id} in database"))?;
}
_ => {
return Err(anyhow!("New swap state for submarine swap {id}: {status}"));
}
}
self.try_handle_submarine_swap_status(swap_state, &ongoing_swap_in.id)?;
}
};
@@ -220,14 +223,12 @@ impl LiquidSdk {
fn track_pending_swaps(self: &Arc<LiquidSdk>) -> Result<()> {
let cloned = self.clone();
let client = self.boltz_client();
thread::spawn(move || loop {
thread::sleep(Duration::from_secs(5));
match cloned.persister.list_ongoing_swaps() {
Ok(ongoing_swaps) => {
for swap in ongoing_swaps {
match LiquidSdk::try_resolve_pending_swap(&cloned, &client, &swap) {
match cloned.try_resolve_pending_swap(&swap) {
Ok(_) => info!("Resolved pending swap {}", swap.id()),
Err(err) => match swap {
OngoingSwap::Send { .. } => error!("[Ongoing Send] {err}"),
@@ -438,16 +439,16 @@ impl LiquidSdk {
.map_err(|e| anyhow!("Failed to subscribe to websocket updates: {e:?}"))?;
self.persister
.insert_or_update_ongoing_swap(&[OngoingSwap::Send {
.insert_or_update_ongoing_swap_in(OngoingSwapIn {
id: create_response.id.clone(),
invoice: req.invoice.clone(),
payer_amount_sat: req.fees_sat + receiver_amount_sat,
txid: None,
}])?;
})?;
let result;
loop {
let data = match get_swap_status_v2(&mut socket, &create_response.id) {
let data = match utils::get_swap_status_v2(&mut socket, &create_response.id) {
Ok(data) => data,
Err(_) => continue,
};
@@ -511,12 +512,12 @@ impl LiquidSdk {
};
self.persister
.insert_or_update_ongoing_swap(&[OngoingSwap::Send {
.insert_or_update_ongoing_swap_in(OngoingSwapIn {
id: create_response.id.clone(),
invoice: req.invoice.clone(),
payer_amount_sat: req.fees_sat + receiver_amount_sat,
txid: Some(txid.clone()),
}])?;
})?;
result = Ok(SendPaymentResponse { txid });
break;
@@ -543,19 +544,8 @@ impl LiquidSdk {
result
}
fn try_claim(
&self,
preimage: &str,
redeem_script: &str,
blinding_key: &str,
claim_fees_sat: u64,
) -> Result<String, PaymentError> {
let network_config = &self.network_config();
let rev_swap_tx = LBtcSwapTx::new_claim(
LBtcSwapScript::reverse_from_str(redeem_script, blinding_key)?,
self.address()?.to_string(),
network_config,
)?;
fn try_claim_v2(&self, ongoing_swap_out: &OngoingSwapOut) -> Result<String, PaymentError> {
debug!("Trying to claim reverse swap {}", ongoing_swap_out.id);
let mnemonic = self
.lwk_signer
@@ -565,31 +555,40 @@ impl LiquidSdk {
})?;
let swap_key =
SwapKey::from_reverse_account(&mnemonic.to_string(), "", self.network.into(), 0)?;
let lsk = LiquidSwapKey::try_from(swap_key)?;
let preimage = Preimage::from_str(preimage)?;
let our_keys = lsk.keypair;
let signed_tx = rev_swap_tx.sign_claim(&lsk.keypair, &preimage, claim_fees_sat)?;
let tx_hex = elements::encode::serialize(&signed_tx).to_lower_hex_string();
let swap_response_v2: CreateReverseResponse =
serde_json::from_str(&ongoing_swap_out.redeem_script).unwrap();
let swap_script = LBtcSwapScriptV2::reverse_from_swap_resp(
&swap_response_v2,
our_keys.public_key().into(),
)?;
let client = self.boltz_client_v2();
let response = client.broadcast_tx(self.network.into(), &tx_hex)?;
let txid = response
.as_object()
.ok_or(PaymentError::Generic {
err: "Invalid data received from swapper".to_string(),
})?
.get("id")
.ok_or(PaymentError::Generic {
err: "Invalid data received from swapper".to_string(),
})?
.as_str()
.ok_or(PaymentError::Generic {
err: "Invalid data received from swapper".to_string(),
})?
.to_string();
let claim_address = self.address()?.to_string();
let claim_tx = LBtcSwapTxV2::new_claim(
swap_script.clone(),
claim_address,
&ElectrumConfig::default_liquid(),
)?;
Ok(txid)
let tx = claim_tx.sign_claim(
&our_keys,
&Preimage::from_str(&ongoing_swap_out.preimage)?,
Amount::from_sat(ongoing_swap_out.claim_fees_sat),
Some((&self.boltz_client_v2(), ongoing_swap_out.id.clone())),
)?;
claim_tx.broadcast(
&tx,
&ElectrumConfig::default(self.network.into(), None)?,
None,
)?;
info!("Succesfully broadcasted claim tx {}", tx.txid());
debug!("Claim Tx {:?}", tx);
Ok(tx.txid().to_string())
}
#[allow(dead_code)]
@@ -615,19 +614,26 @@ impl LiquidSdk {
&self,
req: &PrepareReceiveRequest,
) -> Result<PrepareReceiveResponse, PaymentError> {
// let client = self.boltz_client_v2();
// let lbtc_pair = Self::validate_reverse_pairs(&client, req.payer_amount_sat)?;
let client = self.boltz_client();
let lbtc_pair = client
.get_pairs()?
.get_lbtc_pair()
let reverse_pair = self
.boltz_client_v2()
.get_reverse_pairs()?
.get_btc_to_lbtc_pair()
.ok_or(PaymentError::PairsNotFound)?;
let fees_sat = lbtc_pair.fees.reverse_total(req.payer_amount_sat);
let payer_amount_sat = req.payer_amount_sat;
let fees_sat = reverse_pair.fees.total(req.payer_amount_sat);
ensure_sdk!(payer_amount_sat > fees_sat, PaymentError::AmountOutOfRange);
reverse_pair
.limits
.within(payer_amount_sat)
.map_err(|_| PaymentError::AmountOutOfRange)?;
debug!("Preparing reverse swap with: payer_amount_sat {payer_amount_sat} sat, fees_sat {fees_sat} sat");
Ok(PrepareReceiveResponse {
payer_amount_sat: req.payer_amount_sat,
payer_amount_sat,
fees_sat,
})
}
@@ -636,21 +642,19 @@ impl LiquidSdk {
&self,
req: &PrepareReceiveResponse,
) -> Result<ReceivePaymentResponse, PaymentError> {
// let client = self.boltz_client_v2();
// let lbtc_pair = Self::validate_reverse_pairs(&client, req.payer_amount_sat)?;
let payer_amount_sat = req.payer_amount_sat;
let fees_sat = req.fees_sat;
let client = self.boltz_client();
let lbtc_pair = client
.get_pairs()?
.get_lbtc_pair()
let reverse_pair = self
.boltz_client_v2()
.get_reverse_pairs()?
.get_btc_to_lbtc_pair()
.ok_or(PaymentError::PairsNotFound)?;
let new_fees_sat = reverse_pair.fees.total(req.payer_amount_sat);
ensure_sdk!(fees_sat == new_fees_sat, PaymentError::InvalidOrExpiredFees);
ensure_sdk!(
req.fees_sat == lbtc_pair.fees.reverse_total(req.payer_amount_sat),
PaymentError::InvalidOrExpiredFees
);
debug!("Creating reverse swap with: payer_amount_sat {payer_amount_sat} sat, fees_sat {fees_sat} sat");
let client = self.boltz_client();
let mnemonic = self
.lwk_signer
.mnemonic()
@@ -665,21 +669,33 @@ impl LiquidSdk {
let preimage_str = preimage.to_string().ok_or(PaymentError::InvalidPreimage)?;
let preimage_hash = preimage.sha256.to_string();
debug!(
"Creating reverse swap with: payer_amount_sat {} sat, fees_total {} sat",
req.payer_amount_sat, req.fees_sat
);
let swap_response = client.create_swap(CreateSwapRequest::new_lbtc_reverse_invoice_amt(
lbtc_pair.hash.clone(),
preimage_hash.clone(),
lsk.keypair.public_key().to_string(),
req.payer_amount_sat,
))?;
let v2_req = CreateReverseRequest {
invoice_amount: req.payer_amount_sat as u32, // TODO update our model
from: "BTC".to_string(),
to: "L-BTC".to_string(),
preimage_hash: preimage.sha256,
claim_public_key: lsk.keypair.public_key().into(),
address: None,
address_signature: None,
referral_id: None,
};
let swap_response_v2 = self.boltz_client_v2().post_reverse_req(v2_req)?;
let swap_id = swap_response.get_id();
let invoice = swap_response.get_invoice()?;
let blinding_str = swap_response.get_blinding_key()?;
let redeem_script = swap_response.get_redeem_script()?;
// TODO Persisting this in the DB (reusing "redeem_script" field), as we need it later when claiming
let redeem_script = serde_json::to_string(&swap_response_v2).unwrap();
let swap_id = swap_response_v2.id;
let invoice = Bolt11Invoice::from_str(&swap_response_v2.invoice).map_err(|_| {
boltz_client::error::Error::Protocol(
"Boltz response does not contain an invoice.".to_string(),
)
})?;
let blinding_str =
swap_response_v2
.blinding_key
.ok_or(boltz_client::error::Error::Protocol(
"Boltz response does not contain a blinding key.".to_string(),
))?;
let payer_amount_sat = invoice
.amount_milli_satoshis()
.ok_or(PaymentError::InvalidInvoice)?
@@ -692,15 +708,15 @@ impl LiquidSdk {
};
self.persister
.insert_or_update_ongoing_swap(&[OngoingSwap::Receive {
.insert_or_update_ongoing_swap_out(OngoingSwapOut {
id: swap_id.clone(),
preimage: preimage_str,
blinding_key: blinding_str,
redeem_script,
invoice: invoice.to_string(),
receiver_amount_sat: payer_amount_sat - req.fees_sat,
claim_fees_sat: lbtc_pair.fees.reverse_claim_estimate(),
}])
claim_fees_sat: reverse_pair.fees.claim_estimate(),
})
.map_err(|_| PaymentError::PersistError)?;
Ok(ReceivePaymentResponse {

View File

@@ -1,7 +1,11 @@
use std::net::TcpStream;
use std::str::FromStr;
use anyhow::{anyhow, ensure, Result};
use boltz_client::swaps::boltzv2::SwapUpdate;
use boltz_client::swaps::{
boltz::RevSwapStates,
boltzv2::{BoltzApiClientV2, Subscription, SwapUpdate},
};
use log::{error, info};
use tungstenite::{stream::MaybeTlsStream, WebSocket};
@@ -65,3 +69,78 @@ pub(crate) fn get_swap_status_v2(
}
}
}
/// Fetch the reverse swap status using the websocket endpoint
pub(crate) fn get_rev_swap_status_v2(
client_v2: BoltzApiClientV2,
swap_id: &str,
) -> Result<RevSwapStates> {
let mut socket = client_v2
.connect_ws()
.map_err(|e| anyhow!("Failed to connect to websocket: {e:?}"))?;
let sub_id = swap_id.to_string();
let subscription = Subscription::new(&sub_id);
let subscribe_json = serde_json::to_string(&subscription)
.map_err(|e| anyhow!("Failed to serialize subscription msg: {e:?}"))?;
socket
.send(tungstenite::Message::Text(subscribe_json))
.map_err(|e| anyhow!("Failed to subscribe to websocket updates: {e:?}"))?;
loop {
let response: SwapUpdate = serde_json::from_str(&socket.read()?.to_string())
.map_err(|e| anyhow!("WS response is invalid SwapUpdate: {e:?}"))?;
match response {
SwapUpdate::Subscription {
event,
channel,
args,
} => {
ensure!(event == "subscribe", "Wrong WS reply event {event}");
ensure!(channel == "swap.update", "Wrong WS reply channel {channel}");
let first_arg = args.first();
let is_ok = matches!(first_arg.as_ref(), Some(&x) if x == &sub_id);
ensure!(is_ok, "Wrong WS reply subscription ID {first_arg:?}");
info!("Subscription successful for swap : {sub_id}");
}
SwapUpdate::Update {
event,
channel,
args,
} => {
ensure!(event == "update", "Wrong WS reply event {event}");
ensure!(channel == "swap.update", "Wrong WS reply channel {channel}");
return match args.first() {
Some(update) if update.id == sub_id => {
info!("Got new reverse swap status: {}", update.status);
RevSwapStates::from_str(&update.status).map_err(|_| {
anyhow!("Invalid state for rev swap {swap_id}: {}", update.status)
})
}
Some(update) => Err(anyhow!("WS reply has wrong swap ID {update:?}")),
None => Err(anyhow!("WS reply contains no update")),
};
}
SwapUpdate::Error {
event,
channel,
args,
} => {
ensure!(event == "update", "Wrong WS reply event {event}");
ensure!(channel == "swap.update", "Wrong WS reply channel {channel}");
for e in &args {
error!("Got error: {} for swap: {}", e.error, e.id);
}
return Err(anyhow!("Got SwapUpdate errors: {args:?}"));
}
}
}
}