Merge pull request #296 from breez/receive-swap-handler

Extract out receive swap handling logic
This commit is contained in:
Roei Erez
2024-06-06 15:38:45 +03:00
committed by GitHub
6 changed files with 259 additions and 164 deletions

13
cli/Cargo.lock generated
View File

@@ -440,6 +440,7 @@ dependencies = [
"serde_json",
"thiserror",
"tokio",
"tokio-stream",
"tokio-tungstenite",
"url",
]
@@ -2452,6 +2453,18 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "tokio-tungstenite"
version = "0.21.0"

13
lib/Cargo.lock generated
View File

@@ -545,6 +545,7 @@ dependencies = [
"tempdir",
"thiserror",
"tokio",
"tokio-stream",
"tokio-tungstenite",
"url",
"uuid",
@@ -2881,6 +2882,18 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "tokio-tungstenite"
version = "0.21.0"

View File

@@ -33,6 +33,7 @@ thiserror = { workspace = true }
tokio-tungstenite = { version = "0.21.0", features = ["native-tls-vendored"] }
openssl = { version = "0.10", features = ["vendored"] }
tokio = { version = "1", features = ["rt", "macros"] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
url = "2.5.0"
futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] }
async-trait = "0.1.80"

View File

@@ -7,6 +7,7 @@ pub mod frb;
pub mod logger;
pub mod model;
pub mod persist;
pub(crate) mod receive_swap;
pub mod sdk;
pub(crate) mod send_swap;
pub(crate) mod swapper;

View File

@@ -0,0 +1,176 @@
use crate::ensure_sdk;
use crate::model::PaymentState::{Complete, Created, Failed, Pending, TimedOut};
use crate::model::{PaymentTxData, PaymentType, ReceiveSwap};
use crate::{
error::PaymentError, model::PaymentState, persist::Persister, swapper::Swapper,
wallet::OnchainWallet,
};
use anyhow::{anyhow, Result};
use boltz_client::swaps::boltz::RevSwapStates;
use log::{debug, error, info, warn};
use std::{str::FromStr, sync::Arc};
use tokio::sync::broadcast;
pub(crate) struct ReceiveSwapStateHandler {
onchain_wallet: Arc<dyn OnchainWallet>,
persister: Arc<Persister>,
swapper: Arc<dyn Swapper>,
subscription_notifier: broadcast::Sender<String>,
}
impl ReceiveSwapStateHandler {
pub(crate) fn new(
onchain_wallet: Arc<dyn OnchainWallet>,
persister: Arc<Persister>,
swapper: Arc<dyn Swapper>,
) -> Self {
let (subscription_notifier, _) = broadcast::channel::<String>(30);
Self {
onchain_wallet,
persister,
swapper,
subscription_notifier,
}
}
pub(crate) fn subscribe_payment_updates(&self) -> broadcast::Receiver<String> {
self.subscription_notifier.subscribe()
}
/// Handles status updates from Boltz for Receive swaps
pub(crate) async fn on_new_status(&self, swap_state: &str, id: &str) -> Result<()> {
let receive_swap = self
.persister
.fetch_receive_swap(id)?
.ok_or(anyhow!("No ongoing Receive Swap found for ID {id}"))?;
info!("Handling Receive Swap transition to {swap_state:?} for swap {id}");
match RevSwapStates::from_str(swap_state) {
Ok(RevSwapStates::SwapExpired
| RevSwapStates::InvoiceExpired
| RevSwapStates::TransactionFailed
| RevSwapStates::TransactionRefunded) => {
error!("Swap {id} entered into an unrecoverable state: {swap_state:?}");
self.update_swap_info(id, Failed, None).await?;
Ok(())
}
// The lockup tx is in the mempool and we accept 0-conf => try to claim
// TODO Add 0-conf preconditions check: https://github.com/breez/breez-liquid-sdk/issues/187
Ok(RevSwapStates::TransactionMempool
// The lockup tx is confirmed => try to claim
| RevSwapStates::TransactionConfirmed) => {
match receive_swap.claim_tx_id {
Some(claim_tx_id) => {
warn!("Claim tx for Receive Swap {id} was already broadcast: txid {claim_tx_id}")
}
None => {
self.update_swap_info(&receive_swap.id, Pending, None)
.await?;
match self.claim(&receive_swap).await {
Ok(_) => {}
Err(err) => match err {
PaymentError::AlreadyClaimed => warn!("Funds already claimed for Receive Swap {id}"),
_ => error!("Claim for Receive Swap {id} failed: {err}")
}
}
}
}
Ok(())
}
Ok(_) => {
debug!("Unhandled state for Receive Swap {id}: {swap_state}");
Ok(())
},
_ => Err(anyhow!("Invalid RevSwapState for Receive Swap {id}: {swap_state}")),
}
}
/// Transitions a Receive swap to a new state
pub(crate) async fn update_swap_info(
&self,
swap_id: &str,
to_state: PaymentState,
claim_tx_id: Option<&str>,
) -> Result<(), PaymentError> {
info!(
"Transitioning Receive swap {swap_id} to {to_state:?} (claim_tx_id = {claim_tx_id:?})"
);
let swap = self
.persister
.fetch_receive_swap(swap_id)
.map_err(|_| PaymentError::PersistError)?
.ok_or(PaymentError::Generic {
err: format!("Receive Swap not found {swap_id}"),
})?;
let payment_id = claim_tx_id.map(|c| c.to_string()).or(swap.claim_tx_id);
Self::validate_state_transition(swap.state, to_state)?;
self.persister
.try_handle_receive_swap_update(swap_id, to_state, claim_tx_id)?;
if let Some(payment_id) = payment_id {
let _ = self.subscription_notifier.send(payment_id);
}
Ok(())
}
async fn claim(&self, ongoing_receive_swap: &ReceiveSwap) -> Result<(), PaymentError> {
ensure_sdk!(
ongoing_receive_swap.claim_tx_id.is_none(),
PaymentError::AlreadyClaimed
);
let swap_id = &ongoing_receive_swap.id;
let claim_address = self.onchain_wallet.next_unused_address().await?.to_string();
let claim_tx_id = self
.swapper
.claim_receive_swap(ongoing_receive_swap, claim_address)?;
// We insert a pseudo-claim-tx in case LWK fails to pick up the new mempool tx for a while
// This makes the tx known to the SDK (get_info, list_payments) instantly
self.persister.insert_or_update_payment(PaymentTxData {
tx_id: claim_tx_id.clone(),
timestamp: None,
amount_sat: ongoing_receive_swap.receiver_amount_sat,
payment_type: PaymentType::Receive,
is_confirmed: false,
})?;
self.update_swap_info(swap_id, Pending, Some(&claim_tx_id))
.await?;
Ok(())
}
fn validate_state_transition(
from_state: PaymentState,
to_state: PaymentState,
) -> Result<(), PaymentError> {
match (from_state, to_state) {
(_, Created) => Err(PaymentError::Generic {
err: "Cannot transition to Created state".to_string(),
}),
(Created | Pending, Pending) => Ok(()),
(Complete | Failed | TimedOut, Pending) => Err(PaymentError::Generic {
err: format!("Cannot transition from {from_state:?} to Pending state"),
}),
(Created | Pending, Complete) => Ok(()),
(Complete | Failed | TimedOut, Complete) => Err(PaymentError::Generic {
err: format!("Cannot transition from {from_state:?} to Complete state"),
}),
(Created, TimedOut) => Ok(()),
(_, TimedOut) => Err(PaymentError::Generic {
err: format!("Cannot transition from {from_state:?} to TimedOut state"),
}),
(_, Failed) => Ok(()),
}
}
}

View File

@@ -1,13 +1,11 @@
use anyhow::{anyhow, Result};
use anyhow::Result;
use async_trait::async_trait;
use boltz_client::lightning_invoice::Bolt11InvoiceDescription;
use boltz_client::swaps::boltzv2;
use boltz_client::ToHex;
use boltz_client::{
swaps::{boltz::RevSwapStates, boltzv2::*},
util::secrets::Preimage,
Amount, Bolt11Invoice,
};
use boltz_client::{swaps::boltzv2::*, util::secrets::Preimage, Amount, Bolt11Invoice};
use futures_util::stream::select_all;
use futures_util::StreamExt;
use log::{debug, error, info, warn};
use lwk_wollet::{elements::LockTime, ElementsNetwork};
use std::time::Instant;
@@ -20,9 +18,11 @@ use std::{
};
use tokio::sync::{watch, RwLock};
use tokio::time::MissedTickBehavior;
use tokio_stream::wrappers::BroadcastStream;
use crate::error::LiquidSdkError;
use crate::model::PaymentState::*;
use crate::receive_swap::ReceiveSwapStateHandler;
use crate::send_swap::SendSwapStateHandler;
use crate::swapper::{BoltzSwapper, ReconnectHandler, Swapper, SwapperStatusStream};
use crate::wallet::{LiquidOnchainWallet, OnchainWallet};
@@ -52,6 +52,8 @@ pub struct LiquidSdk {
is_started: RwLock<bool>,
shutdown_sender: watch::Sender<()>,
shutdown_receiver: watch::Receiver<()>,
send_swap_state_handler: SendSwapStateHandler,
receive_swap_state_handler: ReceiveSwapStateHandler,
}
impl LiquidSdk {
@@ -75,9 +77,22 @@ impl LiquidSdk {
let swapper = Arc::new(BoltzSwapper::new(config.clone()));
let status_stream = Arc::<dyn SwapperStatusStream>::from(swapper.create_status_stream());
let onchain_wallet = Arc::new(LiquidOnchainWallet::new(mnemonic, config.clone())?);
let send_swap_state_handler = SendSwapStateHandler::new(
config.clone(),
onchain_wallet.clone(),
persister.clone(),
swapper.clone(),
);
let receive_swap_state_handler = ReceiveSwapStateHandler::new(
onchain_wallet.clone(),
persister.clone(),
swapper.clone(),
);
let sdk = Arc::new(LiquidSdk {
config: config.clone(),
onchain_wallet: Arc::new(LiquidOnchainWallet::new(mnemonic, config)?),
onchain_wallet,
persister: persister.clone(),
event_manager,
status_stream: status_stream.clone(),
@@ -85,8 +100,9 @@ impl LiquidSdk {
is_started: RwLock::new(false),
shutdown_sender,
shutdown_receiver,
send_swap_state_handler,
receive_swap_state_handler,
});
Ok(sdk)
}
@@ -168,23 +184,26 @@ impl LiquidSdk {
tokio::spawn(async move {
let mut shutdown_receiver = cloned.shutdown_receiver.clone();
let mut updates_stream = cloned.status_stream.subscribe_swap_updates();
let send_swap_state_handler = SendSwapStateHandler::new(
cloned.config.clone(),
cloned.onchain_wallet.clone(),
cloned.persister.clone(),
cloned.swapper.clone(),
);
let mut swap_state_changes = send_swap_state_handler.subscribe_payment_updates();
let swaps_streams = vec![
cloned.send_swap_state_handler.subscribe_payment_updates(),
cloned
.receive_swap_state_handler
.subscribe_payment_updates(),
];
let mut combined_swap_streams =
select_all(swaps_streams.into_iter().map(BroadcastStream::new));
loop {
tokio::select! {
payment_id = swap_state_changes.recv() => {
match payment_id {
Ok(payment_id) => {
if let Err(e) = cloned.emit_payment_updated(Some(payment_id)).await {
error!("Failed to emit payment update: {e:?}");
payment_id = combined_swap_streams.next() => {
if let Some(payment_id) = payment_id {
match payment_id {
Ok(payment_id) => {
if let Err(e) = cloned.emit_payment_updated(Some(payment_id)).await {
error!("Failed to emit payment update: {e:?}");
}
}
}
Err(e) => error!("Failed to receive send swap state change: {e:?}")
Err(e) => error!("Failed to receive swap state change: {e:?}")
}
}
}
update = updates_stream.recv() => match update {
@@ -192,7 +211,7 @@ impl LiquidSdk {
let _ = cloned.sync().await;
match cloned.persister.fetch_send_swap_by_id(&id) {
Ok(Some(_)) => {
match send_swap_state_handler.on_new_status(&status, &id).await {
match cloned.send_swap_state_handler.on_new_status(&status, &id).await {
Ok(_) => info!("Succesfully handled Send Swap {id} update"),
Err(e) => error!("Failed to handle Send Swap {id} update: {e}")
}
@@ -200,7 +219,7 @@ impl LiquidSdk {
_ => {
match cloned.persister.fetch_receive_swap(&id) {
Ok(Some(_)) => {
match cloned.try_handle_receive_swap_boltz_status(&status, &id).await {
match cloned.receive_swap_state_handler.on_new_status(&status, &id).await {
Ok(_) => info!("Succesfully handled Receive Swap {id} update"),
Err(e) => error!("Failed to handle Receive Swap {id} update: {e}")
}
@@ -294,61 +313,6 @@ impl LiquidSdk {
Ok(())
}
fn validate_state_transition(
from_state: PaymentState,
to_state: PaymentState,
) -> Result<(), PaymentError> {
match (from_state, to_state) {
(_, Created) => Err(PaymentError::Generic {
err: "Cannot transition to Created state".to_string(),
}),
(Created | Pending, Pending) => Ok(()),
(Complete | Failed | TimedOut, Pending) => Err(PaymentError::Generic {
err: format!("Cannot transition from {from_state:?} to Pending state"),
}),
(Created | Pending, Complete) => Ok(()),
(Complete | Failed | TimedOut, Complete) => Err(PaymentError::Generic {
err: format!("Cannot transition from {from_state:?} to Complete state"),
}),
(Created, TimedOut) => Ok(()),
(_, TimedOut) => Err(PaymentError::Generic {
err: format!("Cannot transition from {from_state:?} to TimedOut state"),
}),
(_, Failed) => Ok(()),
}
}
/// Transitions a Receive swap to a new state
pub(crate) async fn try_handle_receive_swap_update(
&self,
swap_id: &str,
to_state: PaymentState,
claim_tx_id: Option<&str>,
) -> Result<(), PaymentError> {
info!(
"Transitioning Receive swap {swap_id} to {to_state:?} (claim_tx_id = {claim_tx_id:?})"
);
let swap = self
.persister
.fetch_receive_swap(swap_id)
.map_err(|_| PaymentError::PersistError)?
.ok_or(PaymentError::Generic {
err: format!("Receive Swap not found {swap_id}"),
})?;
let payment_id = claim_tx_id.map(|c| c.to_string()).or(swap.claim_tx_id);
Self::validate_state_transition(swap.state, to_state)?;
self.persister
.try_handle_receive_swap_update(swap_id, to_state, claim_tx_id)?;
Ok(self.emit_payment_updated(payment_id).await?)
}
async fn emit_payment_updated(&self, payment_id: Option<String>) -> Result<()> {
if let Some(id) = payment_id {
match self.persister.get_payment(id.clone())? {
@@ -442,62 +406,6 @@ impl LiquidSdk {
Ok(())
}
/// Handles status updates from Boltz for Receive swaps
pub(crate) async fn try_handle_receive_swap_boltz_status(
&self,
swap_state: &str,
id: &str,
) -> Result<()> {
let receive_swap = self
.persister
.fetch_receive_swap(id)?
.ok_or(anyhow!("No ongoing Receive Swap found for ID {id}"))?;
info!("Handling Receive Swap transition to {swap_state:?} for swap {id}");
match RevSwapStates::from_str(swap_state) {
Ok(RevSwapStates::SwapExpired
| RevSwapStates::InvoiceExpired
| RevSwapStates::TransactionFailed
| RevSwapStates::TransactionRefunded) => {
error!("Swap {id} entered into an unrecoverable state: {swap_state:?}");
self.try_handle_receive_swap_update(id, Failed, None).await?;
Ok(())
}
// The lockup tx is in the mempool and we accept 0-conf => try to claim
// TODO Add 0-conf preconditions check: https://github.com/breez/breez-liquid-sdk/issues/187
Ok(RevSwapStates::TransactionMempool
// The lockup tx is confirmed => try to claim
| RevSwapStates::TransactionConfirmed) => {
match receive_swap.claim_tx_id {
Some(claim_tx_id) => {
warn!("Claim tx for Receive Swap {id} was already broadcast: txid {claim_tx_id}")
}
None => {
self.try_handle_receive_swap_update(&receive_swap.id, Pending, None)
.await?;
match self.try_claim(&receive_swap).await {
Ok(_) => {}
Err(err) => match err {
PaymentError::AlreadyClaimed => warn!("Funds already claimed for Receive Swap {id}"),
_ => error!("Claim for Receive Swap {id} failed: {err}")
}
}
}
}
Ok(())
}
Ok(_) => {
debug!("Unhandled state for Receive Swap {id}: {swap_state}");
Ok(())
},
_ => Err(anyhow!("Invalid RevSwapState for Receive Swap {id}: {swap_state}")),
}
}
pub async fn get_info(&self, req: GetInfoRequest) -> Result<GetInfoResponse> {
self.ensure_is_started().await?;
debug!(
@@ -801,33 +709,6 @@ impl LiquidSdk {
}
}
async fn try_claim(&self, ongoing_receive_swap: &ReceiveSwap) -> Result<(), PaymentError> {
ensure_sdk!(
ongoing_receive_swap.claim_tx_id.is_none(),
PaymentError::AlreadyClaimed
);
let swap_id = &ongoing_receive_swap.id;
let claim_address = self.onchain_wallet.next_unused_address().await?.to_string();
let claim_tx_id = self
.swapper
.claim_receive_swap(ongoing_receive_swap, claim_address)?;
// We insert a pseudo-claim-tx in case LWK fails to pick up the new mempool tx for a while
// This makes the tx known to the SDK (get_info, list_payments) instantly
self.persister.insert_or_update_payment(PaymentTxData {
tx_id: claim_tx_id.clone(),
timestamp: None,
amount_sat: ongoing_receive_swap.receiver_amount_sat,
payment_type: PaymentType::Receive,
is_confirmed: false,
})?;
self.try_handle_receive_swap_update(swap_id, Pending, Some(&claim_tx_id))
.await?;
Ok(())
}
pub async fn prepare_receive_payment(
&self,
req: &PrepareReceiveRequest,
@@ -952,7 +833,9 @@ impl LiquidSdk {
self.persister.clone(),
self.swapper.clone(),
);
for tx in self.onchain_wallet.transactions().await? {
let txs = self.onchain_wallet.transactions().await?;
let num = txs.len();
for tx in txs {
let tx_id = tx.txid.to_string();
let is_tx_confirmed = tx.height.is_some();
let amount_sat = tx.balance.values().sum::<i64>();
@@ -960,7 +843,8 @@ impl LiquidSdk {
// Transition the swaps whose state depends on this tx being confirmed
if is_tx_confirmed {
if let Some(swap) = pending_receive_swaps_by_claim_tx_id.get(&tx_id) {
self.try_handle_receive_swap_update(&swap.id, Complete, None)
self.receive_swap_state_handler
.update_swap_info(&swap.id, Complete, None)
.await?;
}
if let Some(swap) = pending_send_swaps_by_refund_tx_id.get(&tx_id) {
@@ -968,8 +852,15 @@ impl LiquidSdk {
.update_swap_info(&swap.id, Failed, None, None, None)
.await?;
}
} else {
info!("*******Processing unconfirmed tx: {tx_id}");
}
print!(
"*******Got tx num: {num}: {tx_id} height: {:?} \n",
tx.height
);
self.persister.insert_or_update_payment(PaymentTxData {
tx_id,
timestamp: tx.timestamp,