From d67756bf43a247bb71d973eaec9a2e0f769a9df6 Mon Sep 17 00:00:00 2001 From: ok300 <106775972+ok300@users.noreply.github.com> Date: Thu, 9 May 2024 17:13:15 +0200 Subject: [PATCH] Migrate receive-payment to V2 API, use WS to get status --- cli/Cargo.lock | 2 +- lib/Cargo.lock | 2 +- lib/core/Cargo.toml | 7 +- lib/core/src/model.rs | 51 ++--- lib/core/src/persist/mod.rs | 144 ++------------ lib/core/src/persist/swap_in.rs | 84 ++++++++ lib/core/src/persist/swap_out.rs | 96 +++++++++ lib/core/src/sdk.rs | 324 ++++++++++++++++--------------- lib/core/src/utils.rs | 81 +++++++- 9 files changed, 478 insertions(+), 313 deletions(-) create mode 100644 lib/core/src/persist/swap_in.rs create mode 100644 lib/core/src/persist/swap_out.rs diff --git a/cli/Cargo.lock b/cli/Cargo.lock index f4f3cb5..7c227a5 100644 --- a/cli/Cargo.lock +++ b/cli/Cargo.lock @@ -355,7 +355,7 @@ dependencies = [ [[package]] name = "boltz-client" version = "0.1.3" -source = "git+https://github.com/hydra-yse/boltz-rust?rev=410d3a95e528fce36c02e8d414d5b647a31cc28f#410d3a95e528fce36c02e8d414d5b647a31cc28f" +source = "git+https://github.com/hydra-yse/boltz-rust?rev=50b93fb7eba043e12a71fd7ddb1e9604a946c21b#50b93fb7eba043e12a71fd7ddb1e9604a946c21b" dependencies = [ "bip39", "bitcoin 0.31.2", diff --git a/lib/Cargo.lock b/lib/Cargo.lock index 527c8d4..f30073c 100644 --- a/lib/Cargo.lock +++ b/lib/Cargo.lock @@ -478,7 +478,7 @@ dependencies = [ [[package]] name = "boltz-client" version = "0.1.3" -source = "git+https://github.com/hydra-yse/boltz-rust?rev=410d3a95e528fce36c02e8d414d5b647a31cc28f#410d3a95e528fce36c02e8d414d5b647a31cc28f" +source = "git+https://github.com/hydra-yse/boltz-rust?rev=50b93fb7eba043e12a71fd7ddb1e9604a946c21b#50b93fb7eba043e12a71fd7ddb1e9604a946c21b" dependencies = [ "bip39", "bitcoin 0.31.2", diff --git a/lib/core/Cargo.toml b/lib/core/Cargo.toml index 85f917c..e251f64 100644 --- a/lib/core/Cargo.toml +++ b/lib/core/Cargo.toml @@ -10,7 +10,8 @@ crate-type = ["lib", "cdylib", "staticlib"] [dependencies] anyhow = { workspace = true } bip39 = { version = "2.0.0", features = ["serde"] } -boltz-client = { git = "https://github.com/hydra-yse/boltz-rust", rev = "410d3a95e528fce36c02e8d414d5b647a31cc28f" } +#boltz-client = { git = "https://github.com/SatoshiPortal/boltz-rust", rev = "6f45fff8b87c7530c847eb05f018906c48785a6c" } +boltz-client = { git = "https://github.com/hydra-yse/boltz-rust", rev = "50b93fb7eba043e12a71fd7ddb1e9604a946c21b" } flutter_rust_bridge = { version = "=2.0.0-dev.33", features = ["chrono"], optional = true } log = "0.4.20" lwk_common = "0.3.0" @@ -19,12 +20,12 @@ lwk_wollet = "0.3.0" rusqlite = { version = "0.31", features = ["backup", "bundled"] } rusqlite_migration = "1.0" serde = { version = "1.0.197", features = ["derive"] } +serde_json = "1.0.116" thiserror = { workspace = true } +tungstenite = { version = "0.21.0", features = ["native-tls-vendored"] } openssl = { version = "0.10", features = ["vendored"] } # TODO Remove once fully migrated to v2 API elements = "0.24.1" -serde_json = "1.0.116" -tungstenite = { version = "0.21.0", features = ["native-tls-vendored"] } [dev-dependencies] tempdir = "0.3.7" diff --git a/lib/core/src/model.rs b/lib/core/src/model.rs index 3b02843..ea92596 100644 --- a/lib/core/src/model.rs +++ b/lib/core/src/model.rs @@ -128,32 +128,39 @@ pub struct RestoreRequest { pub backup_path: Option, } -#[derive(Debug)] +#[derive(Clone, Debug)] pub(crate) enum OngoingSwap { - Send { - id: String, - invoice: String, - payer_amount_sat: u64, - txid: Option, - }, - Receive { - id: String, - preimage: String, - redeem_script: String, - blinding_key: String, - invoice: String, - receiver_amount_sat: u64, - claim_fees_sat: u64, - }, + Send(OngoingSwapIn), + Receive(OngoingSwapOut), } impl OngoingSwap { - pub(crate) fn id(&self) -> &str { + pub(crate) fn id(&self) -> String { match &self { - OngoingSwap::Send { id, .. } | OngoingSwap::Receive { id, .. } => id, + OngoingSwap::Send(OngoingSwapIn { id, .. }) + | OngoingSwap::Receive(OngoingSwapOut { id, .. }) => id.clone(), } } } +#[derive(Clone, Debug)] +pub(crate) struct OngoingSwapIn { + pub(crate) id: String, + pub(crate) invoice: String, + pub(crate) payer_amount_sat: u64, + pub(crate) txid: Option, +} + +#[derive(Clone, Debug)] +pub(crate) struct OngoingSwapOut { + pub(crate) id: String, + pub(crate) preimage: String, + pub(crate) redeem_script: String, + pub(crate) blinding_key: String, + pub(crate) invoice: String, + pub(crate) receiver_amount_sat: u64, + pub(crate) claim_fees_sat: u64, +} + #[derive(Debug, Clone, PartialEq, Serialize)] pub enum PaymentType { Sent, @@ -177,11 +184,11 @@ pub struct Payment { impl From for Payment { fn from(swap: OngoingSwap) -> Self { match swap { - OngoingSwap::Send { + OngoingSwap::Send(OngoingSwapIn { invoice, payer_amount_sat, .. - } => { + }) => { let receiver_amount_sat = get_invoice_amount!(invoice); Payment { id: None, @@ -192,11 +199,11 @@ impl From for Payment { fees_sat: Some(payer_amount_sat - receiver_amount_sat), } } - OngoingSwap::Receive { + OngoingSwap::Receive(OngoingSwapOut { receiver_amount_sat, invoice, .. - } => { + }) => { let payer_amount_sat = get_invoice_amount!(invoice); Payment { id: None, diff --git a/lib/core/src/persist/mod.rs b/lib/core/src/persist/mod.rs index 7a223fd..797e111 100644 --- a/lib/core/src/persist/mod.rs +++ b/lib/core/src/persist/mod.rs @@ -1,5 +1,7 @@ mod backup; mod migrations; +mod swap_in; +mod swap_out; use std::{collections::HashMap, fs::create_dir_all, path::PathBuf, str::FromStr}; @@ -8,7 +10,7 @@ use migrations::current_migrations; use rusqlite::{params, Connection}; use rusqlite_migration::{Migrations, M}; -use crate::model::{Network, Network::*, OngoingSwap, PaymentData}; +use crate::model::{Network::*, *}; pub(crate) struct Persister { main_db_dir: PathBuf, @@ -47,70 +49,6 @@ impl Persister { Ok(()) } - pub fn insert_or_update_ongoing_swap(&self, swaps: &[OngoingSwap]) -> Result<()> { - let con = self.get_connection()?; - - for swap in swaps { - match swap { - OngoingSwap::Send { - id, - invoice, - payer_amount_sat, - txid, - } => { - let mut stmt = con.prepare( - " - INSERT OR REPLACE INTO ongoing_send_swaps ( - id, - invoice, - payer_amount_sat, - txid - ) - VALUES (?, ?, ?, ?) - ", - )?; - _ = stmt.execute((id, invoice, payer_amount_sat, txid))? - } - OngoingSwap::Receive { - id, - preimage, - redeem_script, - blinding_key, - invoice, - receiver_amount_sat, - claim_fees_sat, - } => { - let mut stmt = con.prepare( - " - INSERT OR REPLACE INTO ongoing_receive_swaps ( - id, - preimage, - redeem_script, - blinding_key, - invoice, - receiver_amount_sat, - claim_fees_sat - ) - VALUES (?, ?, ?, ?, ?, ?, ?) - ", - )?; - - _ = stmt.execute(( - id, - preimage, - redeem_script, - blinding_key, - invoice, - receiver_amount_sat, - claim_fees_sat, - ))? - } - } - } - - Ok(()) - } - pub fn resolve_ongoing_swap( &self, id: &str, @@ -136,75 +74,19 @@ impl Persister { Ok(()) } - pub fn list_ongoing_swaps(&self) -> Result> { + pub(crate) fn list_ongoing_swaps(&self) -> Result> { let con = self.get_connection()?; - let mut ongoing_swaps = self.list_ongoing_send(&con)?; - ongoing_swaps.append(&mut self.list_ongoing_receive(&con)?); - Ok(ongoing_swaps) - } - - fn list_ongoing_send(&self, con: &Connection) -> Result, rusqlite::Error> { - let mut stmt = con.prepare( - " - SELECT - id, - invoice, - payer_amount_sat, - txid, - created_at - FROM ongoing_send_swaps - ORDER BY created_at - ", - )?; - - let ongoing_send = stmt - .query_map(params![], |row| { - Ok(OngoingSwap::Send { - id: row.get(0)?, - invoice: row.get(1)?, - payer_amount_sat: row.get(2)?, - txid: row.get(3)?, - }) - })? - .map(|i| i.unwrap()) + let ongoing_swap_ins: Vec = self + .list_ongoing_send(&con, vec![])? + .into_iter() + .map(OngoingSwap::Send) .collect(); - - Ok(ongoing_send) - } - - fn list_ongoing_receive(&self, con: &Connection) -> Result, rusqlite::Error> { - let mut stmt = con.prepare( - " - SELECT - id, - preimage, - redeem_script, - blinding_key, - invoice, - receiver_amount_sat, - claim_fees_sat, - created_at - FROM ongoing_receive_swaps - ORDER BY created_at - ", - )?; - - let ongoing_receive = stmt - .query_map(params![], |row| { - Ok(OngoingSwap::Receive { - id: row.get(0)?, - preimage: row.get(1)?, - redeem_script: row.get(2)?, - blinding_key: row.get(3)?, - invoice: row.get(4)?, - receiver_amount_sat: row.get(5)?, - claim_fees_sat: row.get(6)?, - }) - })? - .map(|i| i.unwrap()) + let ongoing_swap_outs: Vec = self + .list_ongoing_receive(&con, vec![])? + .into_iter() + .map(OngoingSwap::Receive) .collect(); - - Ok(ongoing_receive) + Ok([ongoing_swap_ins, ongoing_swap_outs].concat()) } pub fn get_payment_data(&self) -> Result> { diff --git a/lib/core/src/persist/swap_in.rs b/lib/core/src/persist/swap_in.rs new file mode 100644 index 0000000..e4a5645 --- /dev/null +++ b/lib/core/src/persist/swap_in.rs @@ -0,0 +1,84 @@ +use crate::model::*; +use crate::persist::Persister; + +use anyhow::Result; +use rusqlite::{params, Connection, OptionalExtension, Row}; + +impl Persister { + pub(crate) fn insert_or_update_ongoing_swap_in(&self, swap_in: OngoingSwapIn) -> Result<()> { + let con = self.get_connection()?; + + let mut stmt = con.prepare( + " + INSERT OR REPLACE INTO ongoing_send_swaps ( + id, + invoice, + payer_amount_sat, + txid + ) + VALUES (?, ?, ?, ?)", + )?; + _ = stmt.execute(( + swap_in.id, + swap_in.invoice, + swap_in.payer_amount_sat, + swap_in.txid, + ))?; + + Ok(()) + } + + fn list_ongoing_swap_in_query(where_clauses: Vec<&str>) -> String { + let mut where_clause_str = String::new(); + if !where_clauses.is_empty() { + where_clause_str = String::from("WHERE "); + where_clause_str.push_str(where_clauses.join(" AND ").as_str()); + } + + format!( + " + SELECT + id, + invoice, + payer_amount_sat, + txid, + created_at + FROM ongoing_send_swaps + {where_clause_str} + ORDER BY created_at + " + ) + } + + pub(crate) fn fetch_ongoing_swap_in( + con: &Connection, + id: &str, + ) -> rusqlite::Result> { + let query = Self::list_ongoing_swap_in_query(vec!["id = ?1"]); + con.query_row(&query, [id], Self::sql_row_to_ongoing_swap_in) + .optional() + } + + fn sql_row_to_ongoing_swap_in(row: &Row) -> rusqlite::Result { + Ok(OngoingSwapIn { + id: row.get(0)?, + invoice: row.get(1)?, + payer_amount_sat: row.get(2)?, + txid: row.get(3)?, + }) + } + + pub(crate) fn list_ongoing_send( + &self, + con: &Connection, + where_clauses: Vec<&str>, + ) -> rusqlite::Result> { + let query = Self::list_ongoing_swap_in_query(where_clauses); + let ongoing_send = con + .prepare(&query)? + .query_map(params![], Self::sql_row_to_ongoing_swap_in)? + .map(|i| i.unwrap()) + .collect(); + Ok(ongoing_send) + } +} diff --git a/lib/core/src/persist/swap_out.rs b/lib/core/src/persist/swap_out.rs new file mode 100644 index 0000000..52df9b2 --- /dev/null +++ b/lib/core/src/persist/swap_out.rs @@ -0,0 +1,96 @@ +use crate::model::*; +use crate::persist::Persister; + +use anyhow::Result; +use rusqlite::{params, Connection, OptionalExtension, Row}; + +impl Persister { + pub(crate) fn insert_or_update_ongoing_swap_out(&self, swap_out: OngoingSwapOut) -> Result<()> { + let con = self.get_connection()?; + + let mut stmt = con.prepare( + " + INSERT OR REPLACE INTO ongoing_receive_swaps ( + id, + preimage, + redeem_script, + blinding_key, + invoice, + receiver_amount_sat, + claim_fees_sat + ) + VALUES (?, ?, ?, ?, ?, ?, ?)", + )?; + _ = stmt.execute(( + swap_out.id, + swap_out.preimage, + swap_out.redeem_script, + swap_out.blinding_key, + swap_out.invoice, + swap_out.receiver_amount_sat, + swap_out.claim_fees_sat, + ))?; + + Ok(()) + } + + fn list_ongoing_swap_out_query(where_clauses: Vec<&str>) -> String { + let mut where_clause_str = String::new(); + if !where_clauses.is_empty() { + where_clause_str = String::from("WHERE "); + where_clause_str.push_str(where_clauses.join(" AND ").as_str()); + } + + format!( + " + SELECT + id, + preimage, + redeem_script, + blinding_key, + invoice, + receiver_amount_sat, + claim_fees_sat, + created_at + FROM ongoing_receive_swaps + {where_clause_str} + ORDER BY created_at + " + ) + } + + pub(crate) fn fetch_ongoing_swap_out( + con: &Connection, + id: &str, + ) -> rusqlite::Result> { + let query = Self::list_ongoing_swap_out_query(vec!["id = ?1"]); + con.query_row(&query, [id], Self::sql_row_to_ongoing_swap_out) + .optional() + } + + fn sql_row_to_ongoing_swap_out(row: &Row) -> rusqlite::Result { + Ok(OngoingSwapOut { + id: row.get(0)?, + preimage: row.get(1)?, + redeem_script: row.get(2)?, + blinding_key: row.get(3)?, + invoice: row.get(4)?, + receiver_amount_sat: row.get(5)?, + claim_fees_sat: row.get(6)?, + }) + } + + pub(crate) fn list_ongoing_receive( + &self, + con: &Connection, + where_clauses: Vec<&str>, + ) -> rusqlite::Result> { + let query = Self::list_ongoing_swap_out_query(where_clauses); + let ongoing_receive = con + .prepare(&query)? + .query_map(params![], Self::sql_row_to_ongoing_swap_out)? + .map(|i| i.unwrap()) + .collect(); + Ok(ongoing_receive) + } +} diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index 4ca8107..b57be01 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -15,14 +15,12 @@ use boltz_client::{ BoltzApiClient, CreateSwapRequest, RevSwapStates, SubSwapStates, SwapStatusRequest, BOLTZ_MAINNET_URL, BOLTZ_TESTNET_URL, }, - boltzv2::{ - BoltzApiClientV2, CreateSubmarineRequest, ReversePair, SubmarinePair, Subscription, - BOLTZ_MAINNET_URL_V2, BOLTZ_TESTNET_URL_V2, - }, + boltzv2::*, liquid::{LBtcSwapScript, LBtcSwapTx}, + liquidv2::LBtcSwapTxV2, }, util::secrets::{LBtcReverseRecovery, LiquidSwapKey, Preimage, SwapKey}, - Bolt11Invoice, Keypair, + Amount, Bolt11Invoice, Keypair, LBtcSwapScriptV2, }; use elements::hashes::hex::DisplayHex; use log::{debug, error, info, warn}; @@ -35,8 +33,7 @@ use lwk_wollet::{ }; use crate::{ - ensure_sdk, error::PaymentError, get_invoice_amount, model::*, persist::Persister, - utils::get_swap_status_v2, + ensure_sdk, error::PaymentError, get_invoice_amount, model::*, persist::Persister, utils, }; /// Claim tx feerate, in sats per vbyte. @@ -117,60 +114,40 @@ impl LiquidSdk { Ok(descriptor_str.parse()?) } - fn try_resolve_pending_swap( - sdk: &Arc, - client: &BoltzApiClient, - swap: &OngoingSwap, + pub(crate) fn try_handle_reverse_swap_status( + &self, + swap_state: RevSwapStates, + id: &str, ) -> Result<()> { - match swap { - OngoingSwap::Receive { - id, - preimage, - redeem_script, - blinding_key, - invoice, - claim_fees_sat, - .. - } => { - let status = client - .swap_status(SwapStatusRequest { id: id.clone() }) - .map_err(|e| anyhow!("Failed to fetch swap status for ID {id}: {e:?}"))? - .status; + let con = self.persister.get_connection()?; + let ongoing_swap_out = Persister::fetch_ongoing_swap_out(&con, id)? + .ok_or(anyhow!("No ongoing swap out found for ID {id}"))?; - let swap_state = status.parse::().map_err(|_| { - anyhow!("Invalid reverse swap state received for swap {id}: {status}",) - })?; - - match swap_state { - RevSwapStates::SwapExpired - | RevSwapStates::InvoiceExpired - | RevSwapStates::TransactionFailed - | RevSwapStates::TransactionRefunded => { - warn!("Cannot claim swap {id}, unrecoverable state: {status}"); - sdk.persister - .resolve_ongoing_swap(id, None) - .map_err(|_| anyhow!("Could not resolve swap {id} in database"))?; - } - RevSwapStates::TransactionConfirmed => {} - _ => { - return Err(anyhow!("New swap state for reverse swap {id}: {status}")); - } - } - - match sdk.try_claim(preimage, redeem_script, blinding_key, *claim_fees_sat) { + match swap_state { + RevSwapStates::SwapExpired + | RevSwapStates::InvoiceExpired + | RevSwapStates::TransactionFailed + | RevSwapStates::TransactionRefunded => { + warn!("Cannot claim swap {id}, unrecoverable state: {swap_state:?}"); + self.persister + .resolve_ongoing_swap(id, None) + .map_err(|_| anyhow!("Could not resolve swap {id} in database"))?; + } + RevSwapStates::TransactionMempool | RevSwapStates::TransactionConfirmed => { + match self.try_claim_v2(&ongoing_swap_out) { Ok(txid) => { - let payer_amount_sat = get_invoice_amount!(invoice); - sdk.persister + let payer_amount_sat = get_invoice_amount!(ongoing_swap_out.invoice); + self.persister .resolve_ongoing_swap( id, Some((txid, PaymentData { payer_amount_sat })), ) - .map_err(|_| anyhow!("Could not resolve swap {id} in database"))?; + .map_err(|e| anyhow!("Could not resolve swap {id}: {e}"))?; } Err(err) => { if let PaymentError::AlreadyClaimed = err { warn!("Funds already claimed"); - sdk.persister + self.persister .resolve_ongoing_swap(id, None) .map_err(|_| anyhow!("Could not resolve swap {id} in database"))?; } @@ -178,40 +155,66 @@ impl LiquidSdk { } } } - OngoingSwap::Send { - id, invoice, txid, .. - } => { - let Some(txid) = txid.clone() else { - return Err(anyhow!("Transaction not broadcast yet for swap {id}")); - }; + RevSwapStates::Created | RevSwapStates::MinerFeePaid => { + // Too soon to try to claim + } + RevSwapStates::InvoiceSettled => { + // Reverse swap already completed at this point, from our perspective + } + } + Ok(()) + } + + pub(crate) fn try_handle_submarine_swap_status( + &self, + swap_state: SubSwapStates, + id: &str, + ) -> Result<()> { + let con = self.persister.get_connection()?; + let ongoing_swap_in = Persister::fetch_ongoing_swap_in(&con, id)? + .ok_or(anyhow!("No ongoing swap in found for ID {id}"))?; + + let Some(txid) = ongoing_swap_in.txid.clone() else { + return Err(anyhow!("Transaction not broadcast yet for swap {id}")); + }; + + match swap_state { + SubSwapStates::TransactionClaimed + | SubSwapStates::InvoiceFailedToPay + | SubSwapStates::SwapExpired => { + warn!("Cannot positively resolve swap {id}, unrecoverable state: {swap_state:?}"); + + let payer_amount_sat = get_invoice_amount!(ongoing_swap_in.invoice); + self.persister + .resolve_ongoing_swap(id, Some((txid, PaymentData { payer_amount_sat }))) + .map_err(|_| anyhow!("Could not resolve swap {id} in database")) + } + _ => Err(anyhow!("New state for submarine swap {id}: {swap_state:?}")), + } + } + + fn try_resolve_pending_swap(&self, swap: &OngoingSwap) -> Result<()> { + let client = self.boltz_client(); + let client_v2 = self.boltz_client_v2(); + + match swap { + OngoingSwap::Receive(ongoing_swap_out) => { + let swap_state = utils::get_rev_swap_status_v2(client_v2, &ongoing_swap_out.id)?; + self.try_handle_reverse_swap_status(swap_state, &ongoing_swap_out.id)?; + } + OngoingSwap::Send(ongoing_swap_in) => { + let id = &ongoing_swap_in.id; let status = client .swap_status(SwapStatusRequest { id: id.clone() }) .map_err(|e| anyhow!("Failed to fetch swap status for ID {id}: {e:?}"))? .status; - let state: SubSwapStates = status.parse().map_err(|_| { + let swap_state: SubSwapStates = status.parse().map_err(|_| { anyhow!("Invalid submarine swap state received for swap {id}: {status}") })?; - match state { - SubSwapStates::TransactionClaimed - | SubSwapStates::InvoiceFailedToPay - | SubSwapStates::SwapExpired => { - warn!("Cannot positively resolve swap {id}, unrecoverable state: {status}"); - - let payer_amount_sat = get_invoice_amount!(invoice); - sdk.persister - .resolve_ongoing_swap( - id, - Some((txid, PaymentData { payer_amount_sat })), - ) - .map_err(|_| anyhow!("Could not resolve swap {id} in database"))?; - } - _ => { - return Err(anyhow!("New swap state for submarine swap {id}: {status}")); - } - } + self.try_handle_submarine_swap_status(swap_state, &ongoing_swap_in.id)?; } }; @@ -220,14 +223,12 @@ impl LiquidSdk { fn track_pending_swaps(self: &Arc) -> Result<()> { let cloned = self.clone(); - let client = self.boltz_client(); - thread::spawn(move || loop { thread::sleep(Duration::from_secs(5)); match cloned.persister.list_ongoing_swaps() { Ok(ongoing_swaps) => { for swap in ongoing_swaps { - match LiquidSdk::try_resolve_pending_swap(&cloned, &client, &swap) { + match cloned.try_resolve_pending_swap(&swap) { Ok(_) => info!("Resolved pending swap {}", swap.id()), Err(err) => match swap { OngoingSwap::Send { .. } => error!("[Ongoing Send] {err}"), @@ -438,16 +439,16 @@ impl LiquidSdk { .map_err(|e| anyhow!("Failed to subscribe to websocket updates: {e:?}"))?; self.persister - .insert_or_update_ongoing_swap(&[OngoingSwap::Send { + .insert_or_update_ongoing_swap_in(OngoingSwapIn { id: create_response.id.clone(), invoice: req.invoice.clone(), payer_amount_sat: req.fees_sat + receiver_amount_sat, txid: None, - }])?; + })?; let result; loop { - let data = match get_swap_status_v2(&mut socket, &create_response.id) { + let data = match utils::get_swap_status_v2(&mut socket, &create_response.id) { Ok(data) => data, Err(_) => continue, }; @@ -511,12 +512,12 @@ impl LiquidSdk { }; self.persister - .insert_or_update_ongoing_swap(&[OngoingSwap::Send { + .insert_or_update_ongoing_swap_in(OngoingSwapIn { id: create_response.id.clone(), invoice: req.invoice.clone(), payer_amount_sat: req.fees_sat + receiver_amount_sat, txid: Some(txid.clone()), - }])?; + })?; result = Ok(SendPaymentResponse { txid }); break; @@ -543,19 +544,8 @@ impl LiquidSdk { result } - fn try_claim( - &self, - preimage: &str, - redeem_script: &str, - blinding_key: &str, - claim_fees_sat: u64, - ) -> Result { - let network_config = &self.network_config(); - let rev_swap_tx = LBtcSwapTx::new_claim( - LBtcSwapScript::reverse_from_str(redeem_script, blinding_key)?, - self.address()?.to_string(), - network_config, - )?; + fn try_claim_v2(&self, ongoing_swap_out: &OngoingSwapOut) -> Result { + debug!("Trying to claim reverse swap {}", ongoing_swap_out.id); let mnemonic = self .lwk_signer @@ -565,31 +555,40 @@ impl LiquidSdk { })?; let swap_key = SwapKey::from_reverse_account(&mnemonic.to_string(), "", self.network.into(), 0)?; - let lsk = LiquidSwapKey::try_from(swap_key)?; - let preimage = Preimage::from_str(preimage)?; + let our_keys = lsk.keypair; - let signed_tx = rev_swap_tx.sign_claim(&lsk.keypair, &preimage, claim_fees_sat)?; - let tx_hex = elements::encode::serialize(&signed_tx).to_lower_hex_string(); + let swap_response_v2: CreateReverseResponse = + serde_json::from_str(&ongoing_swap_out.redeem_script).unwrap(); + let swap_script = LBtcSwapScriptV2::reverse_from_swap_resp( + &swap_response_v2, + our_keys.public_key().into(), + )?; - let client = self.boltz_client_v2(); - let response = client.broadcast_tx(self.network.into(), &tx_hex)?; - let txid = response - .as_object() - .ok_or(PaymentError::Generic { - err: "Invalid data received from swapper".to_string(), - })? - .get("id") - .ok_or(PaymentError::Generic { - err: "Invalid data received from swapper".to_string(), - })? - .as_str() - .ok_or(PaymentError::Generic { - err: "Invalid data received from swapper".to_string(), - })? - .to_string(); + let claim_address = self.address()?.to_string(); + let claim_tx = LBtcSwapTxV2::new_claim( + swap_script.clone(), + claim_address, + &ElectrumConfig::default_liquid(), + )?; - Ok(txid) + let tx = claim_tx.sign_claim( + &our_keys, + &Preimage::from_str(&ongoing_swap_out.preimage)?, + Amount::from_sat(ongoing_swap_out.claim_fees_sat), + Some((&self.boltz_client_v2(), ongoing_swap_out.id.clone())), + )?; + + claim_tx.broadcast( + &tx, + &ElectrumConfig::default(self.network.into(), None)?, + None, + )?; + + info!("Succesfully broadcasted claim tx {}", tx.txid()); + debug!("Claim Tx {:?}", tx); + + Ok(tx.txid().to_string()) } #[allow(dead_code)] @@ -615,19 +614,26 @@ impl LiquidSdk { &self, req: &PrepareReceiveRequest, ) -> Result { - // let client = self.boltz_client_v2(); - // let lbtc_pair = Self::validate_reverse_pairs(&client, req.payer_amount_sat)?; - - let client = self.boltz_client(); - let lbtc_pair = client - .get_pairs()? - .get_lbtc_pair() + let reverse_pair = self + .boltz_client_v2() + .get_reverse_pairs()? + .get_btc_to_lbtc_pair() .ok_or(PaymentError::PairsNotFound)?; - let fees_sat = lbtc_pair.fees.reverse_total(req.payer_amount_sat); + let payer_amount_sat = req.payer_amount_sat; + let fees_sat = reverse_pair.fees.total(req.payer_amount_sat); + + ensure_sdk!(payer_amount_sat > fees_sat, PaymentError::AmountOutOfRange); + + reverse_pair + .limits + .within(payer_amount_sat) + .map_err(|_| PaymentError::AmountOutOfRange)?; + + debug!("Preparing reverse swap with: payer_amount_sat {payer_amount_sat} sat, fees_sat {fees_sat} sat"); Ok(PrepareReceiveResponse { - payer_amount_sat: req.payer_amount_sat, + payer_amount_sat, fees_sat, }) } @@ -636,21 +642,19 @@ impl LiquidSdk { &self, req: &PrepareReceiveResponse, ) -> Result { - // let client = self.boltz_client_v2(); - // let lbtc_pair = Self::validate_reverse_pairs(&client, req.payer_amount_sat)?; + let payer_amount_sat = req.payer_amount_sat; + let fees_sat = req.fees_sat; - let client = self.boltz_client(); - let lbtc_pair = client - .get_pairs()? - .get_lbtc_pair() + let reverse_pair = self + .boltz_client_v2() + .get_reverse_pairs()? + .get_btc_to_lbtc_pair() .ok_or(PaymentError::PairsNotFound)?; + let new_fees_sat = reverse_pair.fees.total(req.payer_amount_sat); + ensure_sdk!(fees_sat == new_fees_sat, PaymentError::InvalidOrExpiredFees); - ensure_sdk!( - req.fees_sat == lbtc_pair.fees.reverse_total(req.payer_amount_sat), - PaymentError::InvalidOrExpiredFees - ); + debug!("Creating reverse swap with: payer_amount_sat {payer_amount_sat} sat, fees_sat {fees_sat} sat"); - let client = self.boltz_client(); let mnemonic = self .lwk_signer .mnemonic() @@ -665,21 +669,33 @@ impl LiquidSdk { let preimage_str = preimage.to_string().ok_or(PaymentError::InvalidPreimage)?; let preimage_hash = preimage.sha256.to_string(); - debug!( - "Creating reverse swap with: payer_amount_sat {} sat, fees_total {} sat", - req.payer_amount_sat, req.fees_sat - ); - let swap_response = client.create_swap(CreateSwapRequest::new_lbtc_reverse_invoice_amt( - lbtc_pair.hash.clone(), - preimage_hash.clone(), - lsk.keypair.public_key().to_string(), - req.payer_amount_sat, - ))?; + let v2_req = CreateReverseRequest { + invoice_amount: req.payer_amount_sat as u32, // TODO update our model + from: "BTC".to_string(), + to: "L-BTC".to_string(), + preimage_hash: preimage.sha256, + claim_public_key: lsk.keypair.public_key().into(), + address: None, + address_signature: None, + referral_id: None, + }; + let swap_response_v2 = self.boltz_client_v2().post_reverse_req(v2_req)?; - let swap_id = swap_response.get_id(); - let invoice = swap_response.get_invoice()?; - let blinding_str = swap_response.get_blinding_key()?; - let redeem_script = swap_response.get_redeem_script()?; + // TODO Persisting this in the DB (reusing "redeem_script" field), as we need it later when claiming + let redeem_script = serde_json::to_string(&swap_response_v2).unwrap(); + + let swap_id = swap_response_v2.id; + let invoice = Bolt11Invoice::from_str(&swap_response_v2.invoice).map_err(|_| { + boltz_client::error::Error::Protocol( + "Boltz response does not contain an invoice.".to_string(), + ) + })?; + let blinding_str = + swap_response_v2 + .blinding_key + .ok_or(boltz_client::error::Error::Protocol( + "Boltz response does not contain a blinding key.".to_string(), + ))?; let payer_amount_sat = invoice .amount_milli_satoshis() .ok_or(PaymentError::InvalidInvoice)? @@ -692,15 +708,15 @@ impl LiquidSdk { }; self.persister - .insert_or_update_ongoing_swap(&[OngoingSwap::Receive { + .insert_or_update_ongoing_swap_out(OngoingSwapOut { id: swap_id.clone(), preimage: preimage_str, blinding_key: blinding_str, redeem_script, invoice: invoice.to_string(), receiver_amount_sat: payer_amount_sat - req.fees_sat, - claim_fees_sat: lbtc_pair.fees.reverse_claim_estimate(), - }]) + claim_fees_sat: reverse_pair.fees.claim_estimate(), + }) .map_err(|_| PaymentError::PersistError)?; Ok(ReceivePaymentResponse { diff --git a/lib/core/src/utils.rs b/lib/core/src/utils.rs index dc57137..851e380 100644 --- a/lib/core/src/utils.rs +++ b/lib/core/src/utils.rs @@ -1,7 +1,11 @@ use std::net::TcpStream; +use std::str::FromStr; use anyhow::{anyhow, ensure, Result}; -use boltz_client::swaps::boltzv2::SwapUpdate; +use boltz_client::swaps::{ + boltz::RevSwapStates, + boltzv2::{BoltzApiClientV2, Subscription, SwapUpdate}, +}; use log::{error, info}; use tungstenite::{stream::MaybeTlsStream, WebSocket}; @@ -65,3 +69,78 @@ pub(crate) fn get_swap_status_v2( } } } + +/// Fetch the reverse swap status using the websocket endpoint +pub(crate) fn get_rev_swap_status_v2( + client_v2: BoltzApiClientV2, + swap_id: &str, +) -> Result { + let mut socket = client_v2 + .connect_ws() + .map_err(|e| anyhow!("Failed to connect to websocket: {e:?}"))?; + + let sub_id = swap_id.to_string(); + let subscription = Subscription::new(&sub_id); + let subscribe_json = serde_json::to_string(&subscription) + .map_err(|e| anyhow!("Failed to serialize subscription msg: {e:?}"))?; + socket + .send(tungstenite::Message::Text(subscribe_json)) + .map_err(|e| anyhow!("Failed to subscribe to websocket updates: {e:?}"))?; + + loop { + let response: SwapUpdate = serde_json::from_str(&socket.read()?.to_string()) + .map_err(|e| anyhow!("WS response is invalid SwapUpdate: {e:?}"))?; + + match response { + SwapUpdate::Subscription { + event, + channel, + args, + } => { + ensure!(event == "subscribe", "Wrong WS reply event {event}"); + ensure!(channel == "swap.update", "Wrong WS reply channel {channel}"); + + let first_arg = args.first(); + let is_ok = matches!(first_arg.as_ref(), Some(&x) if x == &sub_id); + ensure!(is_ok, "Wrong WS reply subscription ID {first_arg:?}"); + + info!("Subscription successful for swap : {sub_id}"); + } + + SwapUpdate::Update { + event, + channel, + args, + } => { + ensure!(event == "update", "Wrong WS reply event {event}"); + ensure!(channel == "swap.update", "Wrong WS reply channel {channel}"); + + return match args.first() { + Some(update) if update.id == sub_id => { + info!("Got new reverse swap status: {}", update.status); + + RevSwapStates::from_str(&update.status).map_err(|_| { + anyhow!("Invalid state for rev swap {swap_id}: {}", update.status) + }) + } + Some(update) => Err(anyhow!("WS reply has wrong swap ID {update:?}")), + None => Err(anyhow!("WS reply contains no update")), + }; + } + + SwapUpdate::Error { + event, + channel, + args, + } => { + ensure!(event == "update", "Wrong WS reply event {event}"); + ensure!(channel == "swap.update", "Wrong WS reply channel {channel}"); + + for e in &args { + error!("Got error: {} for swap: {}", e.error, e.id); + } + return Err(anyhow!("Got SwapUpdate errors: {args:?}")); + } + } + } +}