Swap name consolidation (#218)

* Rename swap_in.rs, swap_out.rs

* Rename SwapIn, SwapOut structs SendSwap, ReceiveSwap

* Rename .*swap_in.* variables and methods to .*send_swap.*

* Rename .*swap_out.* variables and methods to .*receive_swap.*

* Rename Submarine Swap to Send Swap, Reverse Swap to Receive Swap

* Rename rev_swap_id to swap_id
This commit is contained in:
ok300
2024-05-23 08:14:27 +00:00
parent 6214d3d3b2
commit fcee44b1c2
6 changed files with 164 additions and 156 deletions

View File

@@ -19,17 +19,17 @@ use tungstenite::{Message, WebSocket};
use crate::model::*;
use crate::sdk::LiquidSdk;
static SWAP_IN_IDS: OnceLock<Arc<Mutex<HashSet<String>>>> = OnceLock::new();
static SWAP_OUT_IDS: OnceLock<Arc<Mutex<HashSet<String>>>> = OnceLock::new();
static SEND_SWAP_IDS: OnceLock<Arc<Mutex<HashSet<String>>>> = OnceLock::new();
static RECEIVE_SWAP_IDS: OnceLock<Arc<Mutex<HashSet<String>>>> = OnceLock::new();
fn swap_in_ids() -> &'static Arc<Mutex<HashSet<String>>> {
let swap_in_ids = Default::default();
SWAP_IN_IDS.get_or_init(|| swap_in_ids)
fn send_swap_ids() -> &'static Arc<Mutex<HashSet<String>>> {
let send_swap_ids = Default::default();
SEND_SWAP_IDS.get_or_init(|| send_swap_ids)
}
fn swap_out_ids() -> &'static Arc<Mutex<HashSet<String>>> {
let swap_out_ids = Default::default();
SWAP_OUT_IDS.get_or_init(|| swap_out_ids)
fn receive_swap_ids() -> &'static Arc<Mutex<HashSet<String>>> {
let receive_swap_ids = Default::default();
RECEIVE_SWAP_IDS.get_or_init(|| receive_swap_ids)
}
/// Set underlying TCP stream to nonblocking mode.
@@ -48,15 +48,15 @@ pub(super) struct BoltzStatusStream {}
impl BoltzStatusStream {
pub(super) fn mark_swap_as_tracked(id: &str, swap_type: SwapType) {
match swap_type {
SwapType::Submarine => swap_in_ids().lock().unwrap().insert(id.to_string()),
SwapType::ReverseSubmarine => swap_out_ids().lock().unwrap().insert(id.to_string()),
SwapType::Submarine => send_swap_ids().lock().unwrap().insert(id.to_string()),
SwapType::ReverseSubmarine => receive_swap_ids().lock().unwrap().insert(id.to_string()),
};
}
pub(super) fn unmark_swap_as_tracked(id: &str, swap_type: SwapType) {
match swap_type {
SwapType::Submarine => swap_in_ids().lock().unwrap().remove(id),
SwapType::ReverseSubmarine => swap_out_ids().lock().unwrap().remove(id),
SwapType::Submarine => send_swap_ids().lock().unwrap().remove(id),
SwapType::ReverseSubmarine => receive_swap_ids().lock().unwrap().remove(id),
};
}
@@ -140,30 +140,26 @@ impl BoltzStatusStream {
}) => {
for boltz_client::swaps::boltzv2::Update { id, status } in args
{
if Self::is_tracked_swap_in(&id) {
// Known OngoingSwapIn / Send swap
if Self::is_tracked_send_swap(&id) {
match SubSwapStates::from_str(&status) {
Ok(new_state) => {
let res = sdk.try_handle_submarine_swap_status(
let res = sdk.try_handle_send_swap_boltz_status(
new_state,
&id,
);
info!("OngoingSwapIn / send try_handle_submarine_swap_status res: {res:?}");
info!("Handled new Send Swap status from Boltz, result: {res:?}");
}
Err(_) => error!("Received invalid SubSwapState for swap {id}: {status}")
Err(_) => error!("Received invalid SubSwapState for Send Swap {id}: {status}")
}
} else if Self::is_tracked_swap_out(&id) {
// Known OngoingSwapOut / receive swap
} else if Self::is_tracked_receive_swap(&id) {
match RevSwapStates::from_str(&status) {
Ok(new_state) => {
let res = sdk.try_handle_reverse_swap_status(
let res = sdk.try_handle_receive_swap_boltz_status(
new_state, &id,
);
info!("OngoingSwapOut / receive try_handle_reverse_swap_status res: {res:?}");
info!("Handled new Receive Swap status from Boltz, result: {res:?}");
}
Err(_) => error!("Received invalid RevSwapState for swap {id}: {status}"),
Err(_) => error!("Received invalid RevSwapState for Receive Swap {id}: {status}"),
}
} else {
warn!("Received a status update for swap {id}, which is not tracked as ongoing")
@@ -203,8 +199,8 @@ impl BoltzStatusStream {
info!("Re-connected to WS stream");
// Clear monitored swaps, so on re-connect we re-subscribe to them
swap_in_ids().lock().unwrap().clear();
swap_out_ids().lock().unwrap().clear();
send_swap_ids().lock().unwrap().clear();
receive_swap_ids().lock().unwrap().clear();
}
Err(e) => warn!("Failed to re-connected to WS stream: {e:}"),
};
@@ -221,19 +217,19 @@ impl BoltzStatusStream {
Ok(())
}
fn is_tracked_swap_in(id: &str) -> bool {
swap_in_ids().lock().unwrap().contains(id)
fn is_tracked_send_swap(id: &str) -> bool {
send_swap_ids().lock().unwrap().contains(id)
}
fn is_tracked_swap_out(id: &str) -> bool {
swap_out_ids().lock().unwrap().contains(id)
fn is_tracked_receive_swap(id: &str) -> bool {
receive_swap_ids().lock().unwrap().contains(id)
}
fn maybe_subscribe_fn(swap: &Swap, socket: &mut WebSocket<MaybeTlsStream<TcpStream>>) {
let id = swap.id();
let is_ongoing_swap_already_tracked = match swap {
Swap::Send(_) => Self::is_tracked_swap_in(&id),
Swap::Receive(_) => Self::is_tracked_swap_out(&id),
Swap::Send(_) => Self::is_tracked_send_swap(&id),
Swap::Receive(_) => Self::is_tracked_receive_swap(&id),
};
if !is_ongoing_swap_already_tracked {

View File

@@ -141,13 +141,13 @@ pub struct RestoreRequest {
#[derive(Clone, Debug)]
pub(crate) enum Swap {
Send(SwapIn),
Receive(SwapOut),
Send(SendSwap),
Receive(ReceiveSwap),
}
impl Swap {
pub(crate) fn id(&self) -> String {
match &self {
Swap::Send(SwapIn { id, .. }) | Swap::Receive(SwapOut { id, .. }) => id.clone(),
Swap::Send(SendSwap { id, .. }) | Swap::Receive(ReceiveSwap { id, .. }) => id.clone(),
}
}
@@ -159,14 +159,14 @@ impl Swap {
}
}
/// A submarine swap, used for swap-in (Send)
/// A submarine swap, used for Send
#[derive(Clone, Debug)]
pub(crate) struct SwapIn {
pub(crate) struct SendSwap {
pub(crate) id: String,
pub(crate) invoice: String,
pub(crate) payer_amount_sat: u64,
pub(crate) receiver_amount_sat: u64,
/// JSON representation of [crate::persist::swap_in::InternalCreateSubmarineResponse]
/// JSON representation of [crate::persist::send::InternalCreateSubmarineResponse]
pub(crate) create_response_json: String,
/// Persisted only when the lockup tx is successfully broadcast
pub(crate) lockup_tx_id: Option<String>,
@@ -175,11 +175,11 @@ pub(crate) struct SwapIn {
pub(crate) created_at: u32,
pub(crate) state: PaymentState,
}
impl SwapIn {
impl SendSwap {
pub(crate) fn get_boltz_create_response(
&self,
) -> Result<CreateSubmarineResponse, PaymentError> {
let internal_create_response: crate::persist::swap_in::InternalCreateSubmarineResponse =
let internal_create_response: crate::persist::send::InternalCreateSubmarineResponse =
serde_json::from_str(&self.create_response_json).map_err(|e| {
PaymentError::Generic {
err: format!("Failed to deserialize InternalCreateSubmarineResponse: {e:?}"),
@@ -206,7 +206,7 @@ impl SwapIn {
expected_swap_id: &str,
) -> Result<String, PaymentError> {
let internal_create_response =
crate::persist::swap_in::InternalCreateSubmarineResponse::try_convert_from_boltz(
crate::persist::send::InternalCreateSubmarineResponse::try_convert_from_boltz(
create_response,
expected_swap_id,
)?;
@@ -222,12 +222,12 @@ impl SwapIn {
}
}
/// A reverse swap, used for swap-out (Receive)
/// A reverse swap, used for Receive
#[derive(Clone, Debug)]
pub(crate) struct SwapOut {
pub(crate) struct ReceiveSwap {
pub(crate) id: String,
pub(crate) preimage: String,
/// JSON representation of [crate::persist::swap_out::InternalCreateReverseResponse]
/// JSON representation of [crate::persist::receive::InternalCreateReverseResponse]
pub(crate) create_response_json: String,
pub(crate) invoice: String,
/// The amount of the invoice
@@ -239,9 +239,9 @@ pub(crate) struct SwapOut {
pub(crate) created_at: u32,
pub(crate) state: PaymentState,
}
impl SwapOut {
impl ReceiveSwap {
pub(crate) fn get_boltz_create_response(&self) -> Result<CreateReverseResponse, PaymentError> {
let internal_create_response: crate::persist::swap_out::InternalCreateReverseResponse =
let internal_create_response: crate::persist::receive::InternalCreateReverseResponse =
serde_json::from_str(&self.create_response_json).map_err(|e| {
PaymentError::Generic {
err: format!("Failed to deserialize InternalCreateReverseResponse: {e:?}"),
@@ -269,7 +269,7 @@ impl SwapOut {
expected_invoice: &str,
) -> Result<String, PaymentError> {
let internal_create_response =
crate::persist::swap_out::InternalCreateReverseResponse::try_convert_from_boltz(
crate::persist::receive::InternalCreateReverseResponse::try_convert_from_boltz(
create_response,
expected_swap_id,
expected_invoice,

View File

@@ -1,7 +1,7 @@
mod backup;
mod migrations;
pub(crate) mod swap_in;
pub(crate) mod swap_out;
pub(crate) mod receive;
pub(crate) mod send;
use std::{collections::HashMap, fs::create_dir_all, path::PathBuf, str::FromStr};
@@ -79,17 +79,17 @@ impl Persister {
pub(crate) fn list_ongoing_swaps(&self) -> Result<Vec<Swap>> {
let con = self.get_connection()?;
let ongoing_swap_ins: Vec<Swap> = self
let ongoing_send_swaps: Vec<Swap> = self
.list_ongoing_send_swaps(&con)?
.into_iter()
.map(Swap::Send)
.collect();
let ongoing_swap_outs: Vec<Swap> = self
let ongoing_receive_swaps: Vec<Swap> = self
.list_ongoing_receive_swaps(&con)?
.into_iter()
.map(Swap::Receive)
.collect();
Ok([ongoing_swap_ins, ongoing_swap_outs].concat())
Ok([ongoing_send_swaps, ongoing_receive_swaps].concat())
}
pub fn get_payments(&self) -> Result<HashMap<String, Payment>> {

View File

@@ -11,7 +11,7 @@ use rusqlite::{named_params, params, Connection, OptionalExtension, Row};
use serde::{Deserialize, Serialize};
impl Persister {
pub(crate) fn insert_swap_out(&self, swap_out: SwapOut) -> Result<()> {
pub(crate) fn insert_receive_swap(&self, receive_swap: ReceiveSwap) -> Result<()> {
let con = self.get_connection()?;
let mut stmt = con.prepare(
@@ -31,22 +31,22 @@ impl Persister {
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)?;
_ = stmt.execute((
swap_out.id,
swap_out.preimage,
swap_out.create_response_json,
swap_out.invoice,
swap_out.payer_amount_sat,
swap_out.receiver_amount_sat,
swap_out.created_at,
swap_out.claim_fees_sat,
swap_out.claim_tx_id,
swap_out.state,
receive_swap.id,
receive_swap.preimage,
receive_swap.create_response_json,
receive_swap.invoice,
receive_swap.payer_amount_sat,
receive_swap.receiver_amount_sat,
receive_swap.created_at,
receive_swap.claim_fees_sat,
receive_swap.claim_tx_id,
receive_swap.state,
))?;
Ok(())
}
fn list_swap_out_query(where_clauses: Vec<String>) -> String {
fn list_receive_swaps_query(where_clauses: Vec<String>) -> String {
let mut where_clause_str = String::new();
if !where_clauses.is_empty() {
where_clause_str = String::from("WHERE ");
@@ -73,14 +73,17 @@ impl Persister {
)
}
pub(crate) fn fetch_swap_out(con: &Connection, id: &str) -> rusqlite::Result<Option<SwapOut>> {
let query = Self::list_swap_out_query(vec!["id = ?1".to_string()]);
con.query_row(&query, [id], Self::sql_row_to_swap_out)
pub(crate) fn fetch_receive_swap(
con: &Connection,
id: &str,
) -> rusqlite::Result<Option<ReceiveSwap>> {
let query = Self::list_receive_swaps_query(vec!["id = ?1".to_string()]);
con.query_row(&query, [id], Self::sql_row_to_receive_swap)
.optional()
}
fn sql_row_to_swap_out(row: &Row) -> rusqlite::Result<SwapOut> {
Ok(SwapOut {
fn sql_row_to_receive_swap(row: &Row) -> rusqlite::Result<ReceiveSwap> {
Ok(ReceiveSwap {
id: row.get(0)?,
preimage: row.get(1)?,
create_response_json: row.get(2)?,
@@ -98,11 +101,11 @@ impl Persister {
&self,
con: &Connection,
where_clauses: Vec<String>,
) -> rusqlite::Result<Vec<SwapOut>> {
let query = Self::list_swap_out_query(where_clauses);
) -> rusqlite::Result<Vec<ReceiveSwap>> {
let query = Self::list_receive_swaps_query(where_clauses);
let ongoing_receive = con
.prepare(&query)?
.query_map(params![], Self::sql_row_to_swap_out)?
.query_map(params![], Self::sql_row_to_receive_swap)?
.map(|i| i.unwrap())
.collect();
Ok(ongoing_receive)
@@ -111,7 +114,7 @@ impl Persister {
pub(crate) fn list_ongoing_receive_swaps(
&self,
con: &Connection,
) -> rusqlite::Result<Vec<SwapOut>> {
) -> rusqlite::Result<Vec<ReceiveSwap>> {
let mut where_clause: Vec<String> = Vec::new();
where_clause.push(format!(
"state in ({})",
@@ -128,29 +131,32 @@ impl Persister {
pub(crate) fn list_pending_receive_swaps(
&self,
con: &Connection,
) -> rusqlite::Result<Vec<SwapOut>> {
let query = Self::list_swap_out_query(vec!["state = ?1".to_string()]);
) -> rusqlite::Result<Vec<ReceiveSwap>> {
let query = Self::list_receive_swaps_query(vec!["state = ?1".to_string()]);
let res = con
.prepare(&query)?
.query_map(params![PaymentState::Pending], Self::sql_row_to_swap_out)?
.query_map(
params![PaymentState::Pending],
Self::sql_row_to_receive_swap,
)?
.map(|i| i.unwrap())
.collect();
Ok(res)
}
/// Pending swap outs, indexed by claim_tx_id
/// Pending Receive Swaps, indexed by claim_tx_id
pub(crate) fn list_pending_receive_swaps_by_claim_tx_id(
&self,
con: &Connection,
) -> rusqlite::Result<HashMap<String, SwapOut>> {
) -> rusqlite::Result<HashMap<String, ReceiveSwap>> {
let res = self
.list_pending_receive_swaps(con)?
.iter()
.filter_map(|pending_swap_out| {
pending_swap_out
.filter_map(|pending_receive_swap| {
pending_receive_swap
.claim_tx_id
.as_ref()
.map(|claim_tx_id| (claim_tx_id.clone(), pending_swap_out.clone()))
.map(|claim_tx_id| (claim_tx_id.clone(), pending_receive_swap.clone()))
})
.collect();
Ok(res)

View File

@@ -11,7 +11,7 @@ use crate::model::*;
use crate::persist::Persister;
impl Persister {
pub(crate) fn insert_swap_in(&self, swap_in: SwapIn) -> Result<()> {
pub(crate) fn insert_send_swap(&self, send_swap: SendSwap) -> Result<()> {
let con = self.get_connection()?;
let mut stmt = con.prepare(
@@ -30,21 +30,21 @@ impl Persister {
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
)?;
_ = stmt.execute((
swap_in.id,
swap_in.invoice,
swap_in.payer_amount_sat,
swap_in.receiver_amount_sat,
swap_in.create_response_json,
swap_in.lockup_tx_id,
swap_in.refund_tx_id,
swap_in.created_at,
swap_in.state,
send_swap.id,
send_swap.invoice,
send_swap.payer_amount_sat,
send_swap.receiver_amount_sat,
send_swap.create_response_json,
send_swap.lockup_tx_id,
send_swap.refund_tx_id,
send_swap.created_at,
send_swap.state,
))?;
Ok(())
}
fn list_swap_in_query(where_clauses: Vec<String>) -> String {
fn list_send_swaps_query(where_clauses: Vec<String>) -> String {
let mut where_clause_str = String::new();
if !where_clauses.is_empty() {
where_clause_str = String::from("WHERE ");
@@ -70,14 +70,17 @@ impl Persister {
)
}
pub(crate) fn fetch_swap_in(con: &Connection, id: &str) -> rusqlite::Result<Option<SwapIn>> {
let query = Self::list_swap_in_query(vec!["id = ?1".to_string()]);
con.query_row(&query, [id], Self::sql_row_to_swap_in)
pub(crate) fn fetch_send_swap(
con: &Connection,
id: &str,
) -> rusqlite::Result<Option<SendSwap>> {
let query = Self::list_send_swaps_query(vec!["id = ?1".to_string()]);
con.query_row(&query, [id], Self::sql_row_to_send_swap)
.optional()
}
fn sql_row_to_swap_in(row: &Row) -> rusqlite::Result<SwapIn> {
Ok(SwapIn {
fn sql_row_to_send_swap(row: &Row) -> rusqlite::Result<SendSwap> {
Ok(SendSwap {
id: row.get(0)?,
invoice: row.get(1)?,
payer_amount_sat: row.get(2)?,
@@ -94,11 +97,11 @@ impl Persister {
&self,
con: &Connection,
where_clauses: Vec<String>,
) -> rusqlite::Result<Vec<SwapIn>> {
let query = Self::list_swap_in_query(where_clauses);
) -> rusqlite::Result<Vec<SendSwap>> {
let query = Self::list_send_swaps_query(where_clauses);
let ongoing_send = con
.prepare(&query)?
.query_map(params![], Self::sql_row_to_swap_in)?
.query_map(params![], Self::sql_row_to_send_swap)?
.map(|i| i.unwrap())
.collect();
Ok(ongoing_send)
@@ -107,7 +110,7 @@ impl Persister {
pub(crate) fn list_ongoing_send_swaps(
&self,
con: &Connection,
) -> rusqlite::Result<Vec<SwapIn>> {
) -> rusqlite::Result<Vec<SendSwap>> {
let mut where_clause: Vec<String> = Vec::new();
where_clause.push(format!(
"state in ({})",
@@ -124,29 +127,29 @@ impl Persister {
pub(crate) fn list_pending_send_swaps(
&self,
con: &Connection,
) -> rusqlite::Result<Vec<SwapIn>> {
let query = Self::list_swap_in_query(vec!["state = ?1".to_string()]);
) -> rusqlite::Result<Vec<SendSwap>> {
let query = Self::list_send_swaps_query(vec!["state = ?1".to_string()]);
let res = con
.prepare(&query)?
.query_map(params![PaymentState::Pending], Self::sql_row_to_swap_in)?
.query_map(params![PaymentState::Pending], Self::sql_row_to_send_swap)?
.map(|i| i.unwrap())
.collect();
Ok(res)
}
/// Pending swap ins, indexed by refund tx id
/// Pending Send swaps, indexed by refund tx id
pub(crate) fn list_pending_send_swaps_by_refund_tx_id(
&self,
con: &Connection,
) -> rusqlite::Result<HashMap<String, SwapIn>> {
let res: HashMap<String, SwapIn> = self
) -> rusqlite::Result<HashMap<String, SendSwap>> {
let res: HashMap<String, SendSwap> = self
.list_pending_send_swaps(con)?
.iter()
.filter_map(|pending_swap_in| {
pending_swap_in
.filter_map(|pending_send_swap| {
pending_send_swap
.refund_tx_id
.as_ref()
.map(|refund_tx_id| (refund_tx_id.clone(), pending_swap_in.clone()))
.map(|refund_tx_id| (refund_tx_id.clone(), pending_send_swap.clone()))
})
.collect();
Ok(res)

View File

@@ -176,10 +176,10 @@ impl LiquidSdk {
);
let con = self.persister.get_connection()?;
let swap = Persister::fetch_swap_out(&con, swap_id)
let swap = Persister::fetch_receive_swap(&con, swap_id)
.map_err(|_| PaymentError::PersistError)?
.ok_or(PaymentError::Generic {
err: format!("Swap Out not found {swap_id}"),
err: format!("Receive Swap not found {swap_id}"),
})?;
Self::validate_state_transition(swap.state, to_state)?;
@@ -198,10 +198,10 @@ impl LiquidSdk {
info!("Transitioning Send swap {swap_id} to {to_state:?} (lockup_tx_id = {lockup_tx_id:?}, refund_tx_id = {refund_tx_id:?})");
let con = self.persister.get_connection()?;
let swap = Persister::fetch_swap_in(&con, swap_id)
let swap = Persister::fetch_send_swap(&con, swap_id)
.map_err(|_| PaymentError::PersistError)?
.ok_or(PaymentError::Generic {
err: format!("Swap In not found {swap_id}"),
err: format!("Send Swap not found {swap_id}"),
})?;
Self::validate_state_transition(swap.state, to_state)?;
@@ -215,18 +215,18 @@ impl LiquidSdk {
}
/// Handles status updates from Boltz for Receive swaps
pub(crate) fn try_handle_reverse_swap_status(
pub(crate) fn try_handle_receive_swap_boltz_status(
&self,
swap_state: RevSwapStates,
id: &str,
) -> Result<()> {
self.sync()?;
info!("Handling reverse swap transition to {swap_state:?} for swap {id}");
info!("Handling Receive Swap transition to {swap_state:?} for swap {id}");
let con = self.persister.get_connection()?;
let swap_out = Persister::fetch_swap_out(&con, id)?
.ok_or(anyhow!("No ongoing swap out found for ID {id}"))?;
let receive_swap = Persister::fetch_receive_swap(&con, id)?
.ok_or(anyhow!("No ongoing Receive Swap found for ID {id}"))?;
match swap_state {
RevSwapStates::SwapExpired
@@ -242,15 +242,15 @@ impl LiquidSdk {
RevSwapStates::TransactionMempool
// The lockup tx is confirmed => try to claim
| RevSwapStates::TransactionConfirmed => {
match swap_out.claim_tx_id {
match receive_swap.claim_tx_id {
Some(claim_tx_id) => {
warn!("Claim tx for reverse swap {id} was already broadcast: txid {claim_tx_id}")
warn!("Claim tx for Receive Swap {id} was already broadcast: txid {claim_tx_id}")
}
None => match self.try_claim(&swap_out) {
None => match self.try_claim(&receive_swap) {
Ok(()) => {}
Err(err) => match err {
PaymentError::AlreadyClaimed => warn!("Funds already claimed for reverse swap {id}"),
_ => error!("Claim reverse swap {id} failed: {err}")
PaymentError::AlreadyClaimed => warn!("Funds already claimed for Receive Swap {id}"),
_ => error!("Claim for Receive Swap {id} failed: {err}")
}
},
}
@@ -267,27 +267,27 @@ impl LiquidSdk {
}
/// Handles status updates from Boltz for Send swaps
pub(crate) fn try_handle_submarine_swap_status(
pub(crate) fn try_handle_send_swap_boltz_status(
&self,
swap_state: SubSwapStates,
id: &str,
) -> Result<()> {
self.sync()?;
info!("Handling submarine swap transition to {swap_state:?} for swap {id}");
info!("Handling Send Swap transition to {swap_state:?} for swap {id}");
let con = self.persister.get_connection()?;
let ongoing_swap_in = Persister::fetch_swap_in(&con, id)?
.ok_or(anyhow!("No ongoing swap in found for ID {id}"))?;
let ongoing_send_swap = Persister::fetch_send_swap(&con, id)?
.ok_or(anyhow!("No ongoing Send Swap found for ID {id}"))?;
let create_response: CreateSubmarineResponse =
ongoing_swap_in.get_boltz_create_response()?;
ongoing_send_swap.get_boltz_create_response()?;
let receiver_amount_sat = get_invoice_amount!(ongoing_swap_in.invoice);
let receiver_amount_sat = get_invoice_amount!(ongoing_send_swap.invoice);
let keypair = self.get_submarine_keys(0)?;
match swap_state {
SubSwapStates::TransactionClaimPending => {
let lockup_tx_id = ongoing_swap_in.lockup_tx_id.ok_or(anyhow!(
let lockup_tx_id = ongoing_send_swap.lockup_tx_id.ok_or(anyhow!(
"Swap-in {id} is pending but no lockup txid is present"
))?;
@@ -300,7 +300,7 @@ impl LiquidSdk {
self.post_submarine_claim_details(
id,
&swap_script,
&ongoing_swap_in.invoice,
&ongoing_send_swap.invoice,
&keypair,
)
.map_err(|e| anyhow!("Could not post claim details. Err: {e:?}"))?;
@@ -310,7 +310,7 @@ impl LiquidSdk {
self.persister.insert_or_update_payment(PaymentTxData {
tx_id: lockup_tx_id,
timestamp: None,
amount_sat: ongoing_swap_in.payer_amount_sat,
amount_sat: ongoing_send_swap.payer_amount_sat,
payment_type: PaymentType::Send,
is_confirmed: false,
})?;
@@ -343,7 +343,7 @@ impl LiquidSdk {
Ok(())
}
_ => Err(anyhow!("New state for submarine swap {id}: {swap_state:?}")),
_ => Err(anyhow!("New state for Send Swap {id}: {swap_state:?}")),
}
}
@@ -657,7 +657,7 @@ impl LiquidSdk {
&create_response,
keypair.public_key().into(),
)?;
let create_response_json = SwapIn::from_boltz_struct_to_json(&create_response, swap_id)?;
let create_response_json = SendSwap::from_boltz_struct_to_json(&create_response, swap_id)?;
debug!("Opening WS connection for swap {swap_id}");
let mut socket = client.connect_ws()?;
@@ -673,7 +673,7 @@ impl LiquidSdk {
// We mark the pending send as already tracked to avoid it being handled by the status stream
BoltzStatusStream::mark_swap_as_tracked(swap_id, SwapType::Submarine);
self.persister.insert_swap_in(SwapIn {
self.persister.insert_send_swap(SendSwap {
id: swap_id.clone(),
invoice: req.invoice.clone(),
payer_amount_sat: req.fees_sat + receiver_amount_sat,
@@ -711,7 +711,7 @@ impl LiquidSdk {
// Check that we have not persisted the swap already
let con = self.persister.get_connection()?;
if let Some(ongoing_swap) = Persister::fetch_swap_in(&con, swap_id)
if let Some(ongoing_swap) = Persister::fetch_send_swap(&con, swap_id)
.map_err(|_| PaymentError::PersistError)?
{
if ongoing_swap.lockup_tx_id.is_some() {
@@ -726,7 +726,7 @@ impl LiquidSdk {
// Boltz has detected the lockup in the mempool, we can speed up
// the claim by doing so cooperatively
SubSwapStates::TransactionClaimPending => {
// TODO Consolidate status handling: merge with and reuse try_handle_submarine_swap_status
// TODO Consolidate status handling: merge with and reuse try_handle_send_swap_boltz_status
self.post_submarine_claim_details(
swap_id,
@@ -772,19 +772,19 @@ impl LiquidSdk {
result
}
fn try_claim(&self, ongoing_swap_out: &SwapOut) -> Result<(), PaymentError> {
fn try_claim(&self, ongoing_receive_swap: &ReceiveSwap) -> Result<(), PaymentError> {
ensure_sdk!(
ongoing_swap_out.claim_tx_id.is_none(),
ongoing_receive_swap.claim_tx_id.is_none(),
PaymentError::AlreadyClaimed
);
let rev_swap_id = &ongoing_swap_out.id;
debug!("Trying to claim reverse swap {rev_swap_id}",);
let swap_id = &ongoing_receive_swap.id;
debug!("Trying to claim Receive Swap {swap_id}",);
let lsk = self.get_liquid_swap_key()?;
let our_keys = lsk.keypair;
let create_response = ongoing_swap_out.get_boltz_create_response()?;
let create_response = ongoing_receive_swap.get_boltz_create_response()?;
let swap_script = LBtcSwapScriptV2::reverse_from_swap_resp(
&create_response,
our_keys.public_key().into(),
@@ -796,15 +796,15 @@ impl LiquidSdk {
claim_address,
&self.network_config(),
self.boltz_url_v2().into(),
ongoing_swap_out.id.clone(),
ongoing_receive_swap.id.clone(),
)?;
let claim_tx = claim_tx_wrapper.sign_claim(
&our_keys,
&Preimage::from_str(&ongoing_swap_out.preimage)?,
Amount::from_sat(ongoing_swap_out.claim_fees_sat),
&Preimage::from_str(&ongoing_receive_swap.preimage)?,
Amount::from_sat(ongoing_receive_swap.claim_fees_sat),
// Enable cooperative claim (Some) or not (None)
Some((&self.boltz_client_v2(), rev_swap_id.clone())),
Some((&self.boltz_client_v2(), swap_id.clone())),
// None
)?;
@@ -813,17 +813,17 @@ impl LiquidSdk {
&self.network_config(),
Some((&self.boltz_client_v2(), self.network.into())),
)?;
info!("Successfully broadcast claim tx {claim_tx_id} for rev swap {rev_swap_id}");
info!("Successfully broadcast claim tx {claim_tx_id} for Receive Swap {swap_id}");
debug!("Claim Tx {:?}", claim_tx);
self.try_handle_receive_swap_update(rev_swap_id, Pending, Some(&claim_tx_id))?;
self.try_handle_receive_swap_update(swap_id, Pending, Some(&claim_tx_id))?;
// We insert a pseudo-claim-tx in case LWK fails to pick up the new mempool tx for a while
// This makes the tx known to the SDK (get_info, list_payments) instantly
self.persister.insert_or_update_payment(PaymentTxData {
tx_id: claim_tx_id,
timestamp: None,
amount_sat: ongoing_swap_out.receiver_amount_sat,
amount_sat: ongoing_receive_swap.receiver_amount_sat,
payment_type: PaymentType::Receive,
is_confirmed: false,
})?;
@@ -851,7 +851,7 @@ impl LiquidSdk {
.within(payer_amount_sat)
.map_err(|_| PaymentError::AmountOutOfRange)?;
debug!("Preparing reverse swap with: payer_amount_sat {payer_amount_sat} sat, fees_sat {fees_sat} sat");
debug!("Preparing Receive Swap with: payer_amount_sat {payer_amount_sat} sat, fees_sat {fees_sat} sat");
Ok(PrepareReceiveResponse {
payer_amount_sat,
@@ -874,7 +874,7 @@ impl LiquidSdk {
let new_fees_sat = reverse_pair.fees.total(req.payer_amount_sat);
ensure_sdk!(fees_sat == new_fees_sat, PaymentError::InvalidOrExpiredFees);
debug!("Creating reverse swap with: payer_amount_sat {payer_amount_sat} sat, fees_sat {fees_sat} sat");
debug!("Creating Receive Swap with: payer_amount_sat {payer_amount_sat} sat, fees_sat {fees_sat} sat");
let lsk = self.get_liquid_swap_key()?;
@@ -908,10 +908,13 @@ impl LiquidSdk {
return Err(PaymentError::InvalidInvoice);
};
let create_response_json =
SwapOut::from_boltz_struct_to_json(&create_response, &swap_id, &invoice.to_string())?;
let create_response_json = ReceiveSwap::from_boltz_struct_to_json(
&create_response,
&swap_id,
&invoice.to_string(),
)?;
self.persister
.insert_swap_out(SwapOut {
.insert_receive_swap(ReceiveSwap {
id: swap_id.clone(),
preimage: preimage_str,
create_response_json,
@@ -941,10 +944,10 @@ impl LiquidSdk {
}
let con = self.persister.get_connection()?;
let pending_receive_swaps_by_claim_tx_id: HashMap<String, SwapOut> = self
let pending_receive_swaps_by_claim_tx_id: HashMap<String, ReceiveSwap> = self
.persister
.list_pending_receive_swaps_by_claim_tx_id(&con)?;
let pending_send_swaps_by_refund_tx_id: HashMap<String, SwapIn> = self
let pending_send_swaps_by_refund_tx_id: HashMap<String, SendSwap> = self
.persister
.list_pending_send_swaps_by_refund_tx_id(&con)?;