diff --git a/cli/Cargo.lock b/cli/Cargo.lock index a668182..fb166bc 100644 --- a/cli/Cargo.lock +++ b/cli/Cargo.lock @@ -440,6 +440,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", + "tokio-stream", "tokio-tungstenite", "url", ] @@ -2452,6 +2453,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-tungstenite" version = "0.21.0" diff --git a/lib/Cargo.lock b/lib/Cargo.lock index 9d88457..68fe9c8 100644 --- a/lib/Cargo.lock +++ b/lib/Cargo.lock @@ -545,6 +545,7 @@ dependencies = [ "tempdir", "thiserror", "tokio", + "tokio-stream", "tokio-tungstenite", "url", "uuid", @@ -2881,6 +2882,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-tungstenite" version = "0.21.0" diff --git a/lib/core/Cargo.toml b/lib/core/Cargo.toml index ca0d8cd..2fae22e 100644 --- a/lib/core/Cargo.toml +++ b/lib/core/Cargo.toml @@ -33,6 +33,7 @@ thiserror = { workspace = true } tokio-tungstenite = { version = "0.21.0", features = ["native-tls-vendored"] } openssl = { version = "0.10", features = ["vendored"] } tokio = { version = "1", features = ["rt", "macros"] } +tokio-stream = { version = "0.1.14", features = ["sync"] } url = "2.5.0" futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] } async-trait = "0.1.80" diff --git a/lib/core/src/lib.rs b/lib/core/src/lib.rs index 2d8f61f..3ab8fb2 100644 --- a/lib/core/src/lib.rs +++ b/lib/core/src/lib.rs @@ -7,6 +7,7 @@ pub mod frb; pub mod logger; pub mod model; pub mod persist; +pub(crate) mod receive_swap; pub mod sdk; pub(crate) mod send_swap; pub(crate) mod swapper; diff --git a/lib/core/src/receive_swap.rs b/lib/core/src/receive_swap.rs new file mode 100644 index 0000000..14d7806 --- /dev/null +++ b/lib/core/src/receive_swap.rs @@ -0,0 +1,176 @@ +use crate::ensure_sdk; +use crate::model::PaymentState::{Complete, Created, Failed, Pending, TimedOut}; +use crate::model::{PaymentTxData, PaymentType, ReceiveSwap}; +use crate::{ + error::PaymentError, model::PaymentState, persist::Persister, swapper::Swapper, + wallet::OnchainWallet, +}; +use anyhow::{anyhow, Result}; +use boltz_client::swaps::boltz::RevSwapStates; +use log::{debug, error, info, warn}; +use std::{str::FromStr, sync::Arc}; +use tokio::sync::broadcast; + +pub(crate) struct ReceiveSwapStateHandler { + onchain_wallet: Arc, + persister: Arc, + swapper: Arc, + subscription_notifier: broadcast::Sender, +} + +impl ReceiveSwapStateHandler { + pub(crate) fn new( + onchain_wallet: Arc, + persister: Arc, + swapper: Arc, + ) -> Self { + let (subscription_notifier, _) = broadcast::channel::(30); + Self { + onchain_wallet, + persister, + swapper, + subscription_notifier, + } + } + + pub(crate) fn subscribe_payment_updates(&self) -> broadcast::Receiver { + self.subscription_notifier.subscribe() + } + + /// Handles status updates from Boltz for Receive swaps + pub(crate) async fn on_new_status(&self, swap_state: &str, id: &str) -> Result<()> { + let receive_swap = self + .persister + .fetch_receive_swap(id)? + .ok_or(anyhow!("No ongoing Receive Swap found for ID {id}"))?; + + info!("Handling Receive Swap transition to {swap_state:?} for swap {id}"); + + match RevSwapStates::from_str(swap_state) { + Ok(RevSwapStates::SwapExpired + | RevSwapStates::InvoiceExpired + | RevSwapStates::TransactionFailed + | RevSwapStates::TransactionRefunded) => { + error!("Swap {id} entered into an unrecoverable state: {swap_state:?}"); + self.update_swap_info(id, Failed, None).await?; + Ok(()) + } + + // The lockup tx is in the mempool and we accept 0-conf => try to claim + // TODO Add 0-conf preconditions check: https://github.com/breez/breez-liquid-sdk/issues/187 + Ok(RevSwapStates::TransactionMempool + // The lockup tx is confirmed => try to claim + | RevSwapStates::TransactionConfirmed) => { + match receive_swap.claim_tx_id { + Some(claim_tx_id) => { + warn!("Claim tx for Receive Swap {id} was already broadcast: txid {claim_tx_id}") + } + None => { + self.update_swap_info(&receive_swap.id, Pending, None) + .await?; + match self.claim(&receive_swap).await { + Ok(_) => {} + Err(err) => match err { + PaymentError::AlreadyClaimed => warn!("Funds already claimed for Receive Swap {id}"), + _ => error!("Claim for Receive Swap {id} failed: {err}") + } + } + } + } + Ok(()) + } + + Ok(_) => { + debug!("Unhandled state for Receive Swap {id}: {swap_state}"); + Ok(()) + }, + + _ => Err(anyhow!("Invalid RevSwapState for Receive Swap {id}: {swap_state}")), + } + } + + /// Transitions a Receive swap to a new state + pub(crate) async fn update_swap_info( + &self, + swap_id: &str, + to_state: PaymentState, + claim_tx_id: Option<&str>, + ) -> Result<(), PaymentError> { + info!( + "Transitioning Receive swap {swap_id} to {to_state:?} (claim_tx_id = {claim_tx_id:?})" + ); + + let swap = self + .persister + .fetch_receive_swap(swap_id) + .map_err(|_| PaymentError::PersistError)? + .ok_or(PaymentError::Generic { + err: format!("Receive Swap not found {swap_id}"), + })?; + let payment_id = claim_tx_id.map(|c| c.to_string()).or(swap.claim_tx_id); + + Self::validate_state_transition(swap.state, to_state)?; + self.persister + .try_handle_receive_swap_update(swap_id, to_state, claim_tx_id)?; + + if let Some(payment_id) = payment_id { + let _ = self.subscription_notifier.send(payment_id); + } + Ok(()) + } + + async fn claim(&self, ongoing_receive_swap: &ReceiveSwap) -> Result<(), PaymentError> { + ensure_sdk!( + ongoing_receive_swap.claim_tx_id.is_none(), + PaymentError::AlreadyClaimed + ); + let swap_id = &ongoing_receive_swap.id; + let claim_address = self.onchain_wallet.next_unused_address().await?.to_string(); + let claim_tx_id = self + .swapper + .claim_receive_swap(ongoing_receive_swap, claim_address)?; + + // We insert a pseudo-claim-tx in case LWK fails to pick up the new mempool tx for a while + // This makes the tx known to the SDK (get_info, list_payments) instantly + self.persister.insert_or_update_payment(PaymentTxData { + tx_id: claim_tx_id.clone(), + timestamp: None, + amount_sat: ongoing_receive_swap.receiver_amount_sat, + payment_type: PaymentType::Receive, + is_confirmed: false, + })?; + + self.update_swap_info(swap_id, Pending, Some(&claim_tx_id)) + .await?; + + Ok(()) + } + + fn validate_state_transition( + from_state: PaymentState, + to_state: PaymentState, + ) -> Result<(), PaymentError> { + match (from_state, to_state) { + (_, Created) => Err(PaymentError::Generic { + err: "Cannot transition to Created state".to_string(), + }), + + (Created | Pending, Pending) => Ok(()), + (Complete | Failed | TimedOut, Pending) => Err(PaymentError::Generic { + err: format!("Cannot transition from {from_state:?} to Pending state"), + }), + + (Created | Pending, Complete) => Ok(()), + (Complete | Failed | TimedOut, Complete) => Err(PaymentError::Generic { + err: format!("Cannot transition from {from_state:?} to Complete state"), + }), + + (Created, TimedOut) => Ok(()), + (_, TimedOut) => Err(PaymentError::Generic { + err: format!("Cannot transition from {from_state:?} to TimedOut state"), + }), + + (_, Failed) => Ok(()), + } + } +} diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index 3684151..08c7f5a 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -1,13 +1,11 @@ -use anyhow::{anyhow, Result}; +use anyhow::Result; use async_trait::async_trait; use boltz_client::lightning_invoice::Bolt11InvoiceDescription; use boltz_client::swaps::boltzv2; use boltz_client::ToHex; -use boltz_client::{ - swaps::{boltz::RevSwapStates, boltzv2::*}, - util::secrets::Preimage, - Amount, Bolt11Invoice, -}; +use boltz_client::{swaps::boltzv2::*, util::secrets::Preimage, Amount, Bolt11Invoice}; +use futures_util::stream::select_all; +use futures_util::StreamExt; use log::{debug, error, info, warn}; use lwk_wollet::{elements::LockTime, ElementsNetwork}; use std::time::Instant; @@ -20,9 +18,11 @@ use std::{ }; use tokio::sync::{watch, RwLock}; use tokio::time::MissedTickBehavior; +use tokio_stream::wrappers::BroadcastStream; use crate::error::LiquidSdkError; use crate::model::PaymentState::*; +use crate::receive_swap::ReceiveSwapStateHandler; use crate::send_swap::SendSwapStateHandler; use crate::swapper::{BoltzSwapper, ReconnectHandler, Swapper, SwapperStatusStream}; use crate::wallet::{LiquidOnchainWallet, OnchainWallet}; @@ -52,6 +52,8 @@ pub struct LiquidSdk { is_started: RwLock, shutdown_sender: watch::Sender<()>, shutdown_receiver: watch::Receiver<()>, + send_swap_state_handler: SendSwapStateHandler, + receive_swap_state_handler: ReceiveSwapStateHandler, } impl LiquidSdk { @@ -75,9 +77,22 @@ impl LiquidSdk { let swapper = Arc::new(BoltzSwapper::new(config.clone())); let status_stream = Arc::::from(swapper.create_status_stream()); + let onchain_wallet = Arc::new(LiquidOnchainWallet::new(mnemonic, config.clone())?); + let send_swap_state_handler = SendSwapStateHandler::new( + config.clone(), + onchain_wallet.clone(), + persister.clone(), + swapper.clone(), + ); + let receive_swap_state_handler = ReceiveSwapStateHandler::new( + onchain_wallet.clone(), + persister.clone(), + swapper.clone(), + ); + let sdk = Arc::new(LiquidSdk { config: config.clone(), - onchain_wallet: Arc::new(LiquidOnchainWallet::new(mnemonic, config)?), + onchain_wallet, persister: persister.clone(), event_manager, status_stream: status_stream.clone(), @@ -85,8 +100,9 @@ impl LiquidSdk { is_started: RwLock::new(false), shutdown_sender, shutdown_receiver, + send_swap_state_handler, + receive_swap_state_handler, }); - Ok(sdk) } @@ -168,23 +184,26 @@ impl LiquidSdk { tokio::spawn(async move { let mut shutdown_receiver = cloned.shutdown_receiver.clone(); let mut updates_stream = cloned.status_stream.subscribe_swap_updates(); - let send_swap_state_handler = SendSwapStateHandler::new( - cloned.config.clone(), - cloned.onchain_wallet.clone(), - cloned.persister.clone(), - cloned.swapper.clone(), - ); - let mut swap_state_changes = send_swap_state_handler.subscribe_payment_updates(); + let swaps_streams = vec![ + cloned.send_swap_state_handler.subscribe_payment_updates(), + cloned + .receive_swap_state_handler + .subscribe_payment_updates(), + ]; + let mut combined_swap_streams = + select_all(swaps_streams.into_iter().map(BroadcastStream::new)); loop { tokio::select! { - payment_id = swap_state_changes.recv() => { - match payment_id { - Ok(payment_id) => { - if let Err(e) = cloned.emit_payment_updated(Some(payment_id)).await { - error!("Failed to emit payment update: {e:?}"); + payment_id = combined_swap_streams.next() => { + if let Some(payment_id) = payment_id { + match payment_id { + Ok(payment_id) => { + if let Err(e) = cloned.emit_payment_updated(Some(payment_id)).await { + error!("Failed to emit payment update: {e:?}"); + } } - } - Err(e) => error!("Failed to receive send swap state change: {e:?}") + Err(e) => error!("Failed to receive swap state change: {e:?}") + } } } update = updates_stream.recv() => match update { @@ -192,7 +211,7 @@ impl LiquidSdk { let _ = cloned.sync().await; match cloned.persister.fetch_send_swap_by_id(&id) { Ok(Some(_)) => { - match send_swap_state_handler.on_new_status(&status, &id).await { + match cloned.send_swap_state_handler.on_new_status(&status, &id).await { Ok(_) => info!("Succesfully handled Send Swap {id} update"), Err(e) => error!("Failed to handle Send Swap {id} update: {e}") } @@ -200,7 +219,7 @@ impl LiquidSdk { _ => { match cloned.persister.fetch_receive_swap(&id) { Ok(Some(_)) => { - match cloned.try_handle_receive_swap_boltz_status(&status, &id).await { + match cloned.receive_swap_state_handler.on_new_status(&status, &id).await { Ok(_) => info!("Succesfully handled Receive Swap {id} update"), Err(e) => error!("Failed to handle Receive Swap {id} update: {e}") } @@ -294,61 +313,6 @@ impl LiquidSdk { Ok(()) } - fn validate_state_transition( - from_state: PaymentState, - to_state: PaymentState, - ) -> Result<(), PaymentError> { - match (from_state, to_state) { - (_, Created) => Err(PaymentError::Generic { - err: "Cannot transition to Created state".to_string(), - }), - - (Created | Pending, Pending) => Ok(()), - (Complete | Failed | TimedOut, Pending) => Err(PaymentError::Generic { - err: format!("Cannot transition from {from_state:?} to Pending state"), - }), - - (Created | Pending, Complete) => Ok(()), - (Complete | Failed | TimedOut, Complete) => Err(PaymentError::Generic { - err: format!("Cannot transition from {from_state:?} to Complete state"), - }), - - (Created, TimedOut) => Ok(()), - (_, TimedOut) => Err(PaymentError::Generic { - err: format!("Cannot transition from {from_state:?} to TimedOut state"), - }), - - (_, Failed) => Ok(()), - } - } - - /// Transitions a Receive swap to a new state - pub(crate) async fn try_handle_receive_swap_update( - &self, - swap_id: &str, - to_state: PaymentState, - claim_tx_id: Option<&str>, - ) -> Result<(), PaymentError> { - info!( - "Transitioning Receive swap {swap_id} to {to_state:?} (claim_tx_id = {claim_tx_id:?})" - ); - - let swap = self - .persister - .fetch_receive_swap(swap_id) - .map_err(|_| PaymentError::PersistError)? - .ok_or(PaymentError::Generic { - err: format!("Receive Swap not found {swap_id}"), - })?; - let payment_id = claim_tx_id.map(|c| c.to_string()).or(swap.claim_tx_id); - - Self::validate_state_transition(swap.state, to_state)?; - self.persister - .try_handle_receive_swap_update(swap_id, to_state, claim_tx_id)?; - - Ok(self.emit_payment_updated(payment_id).await?) - } - async fn emit_payment_updated(&self, payment_id: Option) -> Result<()> { if let Some(id) = payment_id { match self.persister.get_payment(id.clone())? { @@ -442,62 +406,6 @@ impl LiquidSdk { Ok(()) } - /// Handles status updates from Boltz for Receive swaps - pub(crate) async fn try_handle_receive_swap_boltz_status( - &self, - swap_state: &str, - id: &str, - ) -> Result<()> { - let receive_swap = self - .persister - .fetch_receive_swap(id)? - .ok_or(anyhow!("No ongoing Receive Swap found for ID {id}"))?; - - info!("Handling Receive Swap transition to {swap_state:?} for swap {id}"); - - match RevSwapStates::from_str(swap_state) { - Ok(RevSwapStates::SwapExpired - | RevSwapStates::InvoiceExpired - | RevSwapStates::TransactionFailed - | RevSwapStates::TransactionRefunded) => { - error!("Swap {id} entered into an unrecoverable state: {swap_state:?}"); - self.try_handle_receive_swap_update(id, Failed, None).await?; - Ok(()) - } - - // The lockup tx is in the mempool and we accept 0-conf => try to claim - // TODO Add 0-conf preconditions check: https://github.com/breez/breez-liquid-sdk/issues/187 - Ok(RevSwapStates::TransactionMempool - // The lockup tx is confirmed => try to claim - | RevSwapStates::TransactionConfirmed) => { - match receive_swap.claim_tx_id { - Some(claim_tx_id) => { - warn!("Claim tx for Receive Swap {id} was already broadcast: txid {claim_tx_id}") - } - None => { - self.try_handle_receive_swap_update(&receive_swap.id, Pending, None) - .await?; - match self.try_claim(&receive_swap).await { - Ok(_) => {} - Err(err) => match err { - PaymentError::AlreadyClaimed => warn!("Funds already claimed for Receive Swap {id}"), - _ => error!("Claim for Receive Swap {id} failed: {err}") - } - } - } - } - Ok(()) - } - - Ok(_) => { - debug!("Unhandled state for Receive Swap {id}: {swap_state}"); - Ok(()) - }, - - _ => Err(anyhow!("Invalid RevSwapState for Receive Swap {id}: {swap_state}")), - } - } - pub async fn get_info(&self, req: GetInfoRequest) -> Result { self.ensure_is_started().await?; debug!( @@ -801,33 +709,6 @@ impl LiquidSdk { } } - async fn try_claim(&self, ongoing_receive_swap: &ReceiveSwap) -> Result<(), PaymentError> { - ensure_sdk!( - ongoing_receive_swap.claim_tx_id.is_none(), - PaymentError::AlreadyClaimed - ); - let swap_id = &ongoing_receive_swap.id; - let claim_address = self.onchain_wallet.next_unused_address().await?.to_string(); - let claim_tx_id = self - .swapper - .claim_receive_swap(ongoing_receive_swap, claim_address)?; - - // We insert a pseudo-claim-tx in case LWK fails to pick up the new mempool tx for a while - // This makes the tx known to the SDK (get_info, list_payments) instantly - self.persister.insert_or_update_payment(PaymentTxData { - tx_id: claim_tx_id.clone(), - timestamp: None, - amount_sat: ongoing_receive_swap.receiver_amount_sat, - payment_type: PaymentType::Receive, - is_confirmed: false, - })?; - - self.try_handle_receive_swap_update(swap_id, Pending, Some(&claim_tx_id)) - .await?; - - Ok(()) - } - pub async fn prepare_receive_payment( &self, req: &PrepareReceiveRequest, @@ -952,7 +833,9 @@ impl LiquidSdk { self.persister.clone(), self.swapper.clone(), ); - for tx in self.onchain_wallet.transactions().await? { + let txs = self.onchain_wallet.transactions().await?; + let num = txs.len(); + for tx in txs { let tx_id = tx.txid.to_string(); let is_tx_confirmed = tx.height.is_some(); let amount_sat = tx.balance.values().sum::(); @@ -960,7 +843,8 @@ impl LiquidSdk { // Transition the swaps whose state depends on this tx being confirmed if is_tx_confirmed { if let Some(swap) = pending_receive_swaps_by_claim_tx_id.get(&tx_id) { - self.try_handle_receive_swap_update(&swap.id, Complete, None) + self.receive_swap_state_handler + .update_swap_info(&swap.id, Complete, None) .await?; } if let Some(swap) = pending_send_swaps_by_refund_tx_id.get(&tx_id) { @@ -968,8 +852,15 @@ impl LiquidSdk { .update_swap_info(&swap.id, Failed, None, None, None) .await?; } + } else { + info!("*******Processing unconfirmed tx: {tx_id}"); } + print!( + "*******Got tx num: {num}: {tx_id} height: {:?} \n", + tx.height + ); + self.persister.insert_or_update_payment(PaymentTxData { tx_id, timestamp: tx.timestamp,