From 61ea9c558242ba89f082be7f4fa1b2cbe7342524 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Thu, 27 Feb 2025 16:02:42 +0200 Subject: [PATCH 1/5] subscribe incoming synced records to swapper updates --- lib/core/src/sdk.rs | 48 +++++++++++++++++-- lib/core/src/swapper/boltz/status_stream.rs | 15 ++++-- lib/core/src/swapper/mod.rs | 6 ++- lib/core/src/swapper/reconnect_handler.rs | 13 ++--- lib/core/src/sync/mod.rs | 53 ++++++++++++++++----- lib/core/src/test_utils/status_stream.rs | 4 +- 6 files changed, 112 insertions(+), 27 deletions(-) diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index 5c776ed..4f54217 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -41,7 +41,10 @@ use crate::model::PaymentState::*; use crate::model::Signer; use crate::receive_swap::ReceiveSwapHandler; use crate::send_swap::SendSwapHandler; -use crate::swapper::{boltz::BoltzSwapper, Swapper, SwapperReconnectHandler, SwapperStatusStream}; +use crate::swapper::SubscriptionHandler; +use crate::swapper::{ + boltz::BoltzSwapper, Swapper, SwapperStatusStream, SwapperSubscriptionHandler, +}; use crate::wallet::{LiquidOnchainWallet, OnchainWallet}; use crate::{ error::{PaymentError, SdkResult}, @@ -308,18 +311,19 @@ impl LiquidSdk { /// /// Internal method. Should only be used as part of [LiquidSdk::start]. async fn start_background_tasks(self: &Arc) -> SdkResult<()> { - let reconnect_handler = Box::new(SwapperReconnectHandler::new( + let subscription_handler = Box::new(SwapperSubscriptionHandler::new( self.persister.clone(), self.status_stream.clone(), )); self.status_stream .clone() - .start(reconnect_handler, self.shutdown_receiver.clone()); + .start(subscription_handler.clone(), self.shutdown_receiver.clone()); if let Some(sync_service) = self.sync_service.clone() { sync_service.start(self.shutdown_receiver.clone()); } self.track_new_blocks(); self.track_swap_updates(); + self.track_realtime_sync_events(subscription_handler); Ok(()) } @@ -342,6 +346,44 @@ impl LiquidSdk { Ok(()) } + fn track_realtime_sync_events( + self: &Arc, + subscription_handler: Box, + ) { + let cloned = self.clone(); + let Some(sync_service) = cloned.sync_service.clone() else { + return; + }; + let mut shutdown_receiver = cloned.shutdown_receiver.clone(); + + tokio::spawn(async move { + let mut sync_events_receiver = sync_service.subscribe_events(); + loop { + tokio::select! { + event = sync_events_receiver.recv() => { + if let Ok(e) = event { + match e { + sync::Event::SyncedCompleted{data} => { + info!( + "Received sync event: pulled {} records, pushed {} records", + data.pulled_records_count, data.pushed_records_count + ); + if data.pulled_records_count > 0 { + subscription_handler.subscribe_swaps().await; + } + } + } + } + } + _ = shutdown_receiver.changed() => { + info!("Received shutdown signal, exiting real-time sync loop"); + return; + } + } + } + }); + } + fn track_new_blocks(self: &Arc) { let cloned = self.clone(); tokio::spawn(async move { diff --git a/lib/core/src/swapper/boltz/status_stream.rs b/lib/core/src/swapper/boltz/status_stream.rs index f077a2a..02ee65f 100644 --- a/lib/core/src/swapper/boltz/status_stream.rs +++ b/lib/core/src/swapper/boltz/status_stream.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; @@ -13,7 +14,7 @@ use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; use url::Url; use crate::model::Config; -use crate::swapper::{ReconnectHandler, SwapperStatusStream}; +use crate::swapper::{SubscriptionHandler, SwapperStatusStream}; use super::{split_proxy_url, ProxyUrlFetcher}; @@ -80,7 +81,7 @@ impl SwapperStatusStream for BoltzStatusStream { fn start( self: Arc, - callback: Box, + callback: Box, mut shutdown: watch::Receiver<()>, ) { let keep_alive_ping_interval = Duration::from_secs(15); @@ -91,9 +92,10 @@ impl SwapperStatusStream for BoltzStatusStream { debug!("Start of ws stream loop"); match self.connect().await { Ok(mut ws_stream) => { + let mut tracked_swap_ids: HashSet = HashSet::new(); let mut subscription_stream = self.subscription_notifier.subscribe(); - callback.on_stream_reconnect().await; + callback.subscribe_swaps().await; let mut interval = tokio::time::interval(keep_alive_ping_interval); interval.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -113,7 +115,12 @@ impl SwapperStatusStream for BoltzStatusStream { }, swap_res = subscription_stream.recv() => match swap_res { - Ok(swap_id) => self.send_subscription(swap_id, &mut ws_stream).await, + Ok(swap_id) => { + if !tracked_swap_ids.contains(&swap_id) { + self.send_subscription(swap_id.clone(), &mut ws_stream).await; + tracked_swap_ids.insert(swap_id.clone()); + } + }, Err(e) => error!("Received error on subscription stream: {e:?}"), }, diff --git a/lib/core/src/swapper/mod.rs b/lib/core/src/swapper/mod.rs index 23a7723..b443aa5 100644 --- a/lib/core/src/swapper/mod.rs +++ b/lib/core/src/swapper/mod.rs @@ -135,7 +135,11 @@ pub trait Swapper: Send + Sync { } pub trait SwapperStatusStream: Send + Sync { - fn start(self: Arc, callback: Box, shutdown: watch::Receiver<()>); + fn start( + self: Arc, + callback: Box, + shutdown: watch::Receiver<()>, + ); fn track_swap_id(&self, swap_id: &str) -> anyhow::Result<()>; fn subscribe_swap_updates(&self) -> broadcast::Receiver; } diff --git a/lib/core/src/swapper/reconnect_handler.rs b/lib/core/src/swapper/reconnect_handler.rs index 4d02eaa..4cf2054 100644 --- a/lib/core/src/swapper/reconnect_handler.rs +++ b/lib/core/src/swapper/reconnect_handler.rs @@ -8,16 +8,17 @@ use crate::persist::Persister; use super::SwapperStatusStream; #[async_trait] -pub trait ReconnectHandler: Send + Sync { - async fn on_stream_reconnect(&self); +pub trait SubscriptionHandler: Send + Sync { + async fn subscribe_swaps(&self); } -pub(crate) struct SwapperReconnectHandler { +#[derive(Clone)] +pub(crate) struct SwapperSubscriptionHandler { persister: Arc, status_stream: Arc, } -impl SwapperReconnectHandler { +impl SwapperSubscriptionHandler { pub(crate) fn new( persister: Arc, status_stream: Arc, @@ -30,8 +31,8 @@ impl SwapperReconnectHandler { } #[async_trait] -impl ReconnectHandler for SwapperReconnectHandler { - async fn on_stream_reconnect(&self) { +impl SubscriptionHandler for SwapperSubscriptionHandler { + async fn subscribe_swaps(&self) { match self.persister.list_ongoing_swaps() { Ok(initial_ongoing_swaps) => { info!( diff --git a/lib/core/src/sync/mod.rs b/lib/core/src/sync/mod.rs index d9fbaee..9dd7b0f 100644 --- a/lib/core/src/sync/mod.rs +++ b/lib/core/src/sync/mod.rs @@ -3,9 +3,8 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, Result}; -use futures_util::TryFutureExt; use log::{info, trace, warn}; -use tokio::sync::watch; +use tokio::sync::{broadcast, watch}; use tokio::time::sleep; use tokio_stream::StreamExt as _; use tonic::Streaming; @@ -29,6 +28,17 @@ use crate::{ pub(crate) mod client; pub(crate) mod model; +#[derive(Clone, Debug)] +pub(crate) enum Event { + SyncedCompleted { data: SyncCompletedData }, +} + +#[derive(Clone, Debug)] +pub(crate) struct SyncCompletedData { + pub(crate) pulled_records_count: u32, + pub(crate) pushed_records_count: u32, +} + pub(crate) struct SyncService { remote_url: String, client_id: String, @@ -36,6 +46,7 @@ pub(crate) struct SyncService { recoverer: Arc, signer: Arc>, client: Box, + subscription_notifier: broadcast::Sender, } impl SyncService { @@ -47,7 +58,7 @@ impl SyncService { client: Box, ) -> Self { let client_id = uuid::Uuid::new_v4().to_string(); - + let (subscription_notifier, _) = broadcast::channel::(30); Self { client_id, remote_url, @@ -55,9 +66,14 @@ impl SyncService { recoverer, signer, client, + subscription_notifier, } } + pub(crate) fn subscribe_events(&self) -> broadcast::Receiver { + self.subscription_notifier.subscribe() + } + fn check_remote_change(&self) -> Result<()> { match self .persister @@ -72,8 +88,22 @@ impl SyncService { async fn run_event_loop(&self) { info!("realtime-sync: Running sync event loop"); - if let Err(err) = self.pull().and_then(|_| self.push()).await { - log::debug!("Could not run sync event loop: {err:?}"); + let Ok(pulled_records_count) = self.pull().await else { + warn!("realtime-sync: Could not pull records"); + return; + }; + let Ok(pushed_records_count) = self.push().await else { + warn!("realtime-sync: Could not push records"); + return; + }; + + if let Err(e) = self.subscription_notifier.send(Event::SyncedCompleted { + data: SyncCompletedData { + pulled_records_count, + pushed_records_count, + }, + }) { + warn!("realtime-sync: Could not send sync completed event {:?}", e); } } @@ -358,7 +388,7 @@ impl SyncService { Ok(succeded.into_iter().zip(swaps.into_iter()).collect()) } - pub(crate) async fn pull(&self) -> Result<()> { + pub(crate) async fn pull(&self) -> Result { // Step 1: Fetch and save incoming records from remote, then update local tip self.fetch_and_save_records().await?; @@ -412,10 +442,10 @@ impl SyncService { // Step 7: Clear succeded records if !succeded.is_empty() { - self.persister.remove_incoming_records(succeded)?; + self.persister.remove_incoming_records(succeded.clone())?; } - Ok(()) + Ok(succeded.len() as u32) } async fn handle_push( @@ -476,7 +506,7 @@ impl SyncService { Ok(()) } - async fn push(&self) -> Result<()> { + async fn push(&self) -> Result { let outgoing_changes = self.persister.get_sync_outgoing_changes()?; let mut succeded = vec![]; @@ -495,10 +525,11 @@ impl SyncService { } if !succeded.is_empty() { - self.persister.remove_sync_outgoing_changes(succeded)?; + self.persister + .remove_sync_outgoing_changes(succeded.clone())?; } - Ok(()) + Ok(succeded.len() as u32) } } diff --git a/lib/core/src/test_utils/status_stream.rs b/lib/core/src/test_utils/status_stream.rs index cfea086..5a9095a 100644 --- a/lib/core/src/test_utils/status_stream.rs +++ b/lib/core/src/test_utils/status_stream.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use tokio::sync::{broadcast, watch}; -use crate::swapper::{ReconnectHandler, SwapperStatusStream}; +use crate::swapper::{SubscriptionHandler, SwapperStatusStream}; pub(crate) struct MockStatusStream { pub update_notifier: broadcast::Sender, @@ -31,7 +31,7 @@ impl MockStatusStream { impl SwapperStatusStream for MockStatusStream { fn start( self: Arc, - _callback: Box, + _callback: Box, _shutdown: watch::Receiver<()>, ) { } From f8252fca6850e8cc1b53da21087d9034858fdcfa Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Thu, 27 Feb 2025 22:39:42 +0200 Subject: [PATCH 2/5] Only claim if local swap --- lib/core/src/send_swap.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/core/src/send_swap.rs b/lib/core/src/send_swap.rs index dcf1124..1b7ad96 100644 --- a/lib/core/src/send_swap.rs +++ b/lib/core/src/send_swap.rs @@ -92,10 +92,12 @@ impl SendSwapHandler { // Boltz has detected the lockup in the mempool, we can speed up // the claim by doing so cooperatively SubSwapStates::TransactionClaimPending => { - self.cooperate_claim(&swap).await.map_err(|e| { - error!("Could not cooperate Send Swap {id} claim: {e}"); - anyhow!("Could not post claim details. Err: {e:?}") - })?; + if swap.metadata.is_local { + self.cooperate_claim(&swap).await.map_err(|e| { + error!("Could not cooperate Send Swap {id} claim: {e}"); + anyhow!("Could not post claim details. Err: {e:?}") + })?; + } Ok(()) } From 33e5ac5a16615a43635c2b54e97bad5ae238b6ba Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Thu, 27 Feb 2025 22:56:32 +0200 Subject: [PATCH 3/5] fix clippy --- lib/bindings/src/lib.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/bindings/src/lib.rs b/lib/bindings/src/lib.rs index 6845f8e..e34fc5b 100644 --- a/lib/bindings/src/lib.rs +++ b/lib/bindings/src/lib.rs @@ -89,7 +89,7 @@ impl BindingLiquidSdk { } pub fn get_info(&self) -> Result { - rt().block_on(self.sdk.get_info()).map_err(Into::into) + rt().block_on(self.sdk.get_info()) } pub fn sign_message(&self, req: SignMessageRequest) -> SdkResult { @@ -189,11 +189,10 @@ impl BindingLiquidSdk { req: PrepareLnUrlPayRequest, ) -> Result { rt().block_on(self.sdk.prepare_lnurl_pay(req)) - .map_err(Into::into) } pub fn lnurl_pay(&self, req: model::LnUrlPayRequest) -> Result { - rt().block_on(self.sdk.lnurl_pay(req)).map_err(Into::into) + rt().block_on(self.sdk.lnurl_pay(req)) } pub fn lnurl_withdraw( @@ -201,7 +200,6 @@ impl BindingLiquidSdk { req: LnUrlWithdrawRequest, ) -> Result { rt().block_on(self.sdk.lnurl_withdraw(req)) - .map_err(Into::into) } pub fn lnurl_auth( @@ -244,7 +242,7 @@ impl BindingLiquidSdk { } pub fn sync(&self) -> SdkResult<()> { - rt().block_on(self.sdk.sync(false)).map_err(Into::into) + rt().block_on(self.sdk.sync(false)) } pub fn recommended_fees(&self) -> SdkResult { From 2a2514cbe9ef99e1f799d4f85ec5b98081e35cc0 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Fri, 28 Feb 2025 11:17:18 +0200 Subject: [PATCH 4/5] limit claim for local swaps --- lib/core/src/chain_swap.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/lib/core/src/chain_swap.rs b/lib/core/src/chain_swap.rs index 7d16a75..0b35ede 100644 --- a/lib/core/src/chain_swap.rs +++ b/lib/core/src/chain_swap.rs @@ -260,7 +260,7 @@ impl ChainSwapHandler { ..Default::default() })?; - if swap.accept_zero_conf { + if swap.accept_zero_conf && swap.metadata.is_local { self.claim(&id).await.map_err(|e| { error!("Could not cooperate Chain Swap {id} claim: {e}"); anyhow!("Could not post claim details. Err: {e:?}") @@ -304,10 +304,12 @@ impl ChainSwapHandler { match verify_res { Ok(_) => { info!("Server lockup transaction was verified for incoming Chain Swap {}", swap.id); - self.claim(&id).await.map_err(|e| { - error!("Could not cooperate Chain Swap {id} claim: {e}"); - anyhow!("Could not post claim details. Err: {e:?}") - })?; + if swap.metadata.is_local { + self.claim(&id).await.map_err(|e| { + error!("Could not cooperate Chain Swap {id} claim: {e}"); + anyhow!("Could not post claim details. Err: {e:?}") + })?; + } } Err(e) => { warn!("Server lockup transaction for incoming Chain Swap {} could not be verified. txid: {}, err: {}", swap.id, transaction.id, e); @@ -613,7 +615,7 @@ impl ChainSwapHandler { ..Default::default() })?; - if swap.accept_zero_conf { + if swap.accept_zero_conf && swap.metadata.is_local { self.claim(&id).await.map_err(|e| { error!("Could not cooperate Chain Swap {id} claim: {e}"); anyhow!("Could not post claim details. Err: {e:?}") @@ -663,10 +665,12 @@ impl ChainSwapHandler { server_lockup_tx_id: Some(transaction.id), ..Default::default() })?; - self.claim(&id).await.map_err(|e| { - error!("Could not cooperate Chain Swap {id} claim: {e}"); - anyhow!("Could not post claim details. Err: {e:?}") - })?; + if swap.metadata.is_local { + self.claim(&id).await.map_err(|e| { + error!("Could not cooperate Chain Swap {id} claim: {e}"); + anyhow!("Could not post claim details. Err: {e:?}") + })?; + } } Some(claim_tx_id) => { warn!("Claim tx for Chain Swap {id} was already broadcast: txid {claim_tx_id}") From 3081c0bc0e61a87193468da6a36a2c9ef445999c Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Fri, 28 Feb 2025 13:59:29 +0200 Subject: [PATCH 5/5] rename reconnect_handler to subscription_handler --- lib/core/src/swapper/mod.rs | 4 ++-- .../swapper/{reconnect_handler.rs => subscription_handler.rs} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename lib/core/src/swapper/{reconnect_handler.rs => subscription_handler.rs} (100%) diff --git a/lib/core/src/swapper/mod.rs b/lib/core/src/swapper/mod.rs index b443aa5..46c3d6d 100644 --- a/lib/core/src/swapper/mod.rs +++ b/lib/core/src/swapper/mod.rs @@ -18,10 +18,10 @@ use crate::{ prelude::{Direction, SendSwap, Swap, Utxo}, }; -pub(crate) use reconnect_handler::*; +pub(crate) use subscription_handler::*; pub(crate) mod boltz; -pub(crate) mod reconnect_handler; +pub(crate) mod subscription_handler; #[async_trait] pub trait Swapper: Send + Sync { diff --git a/lib/core/src/swapper/reconnect_handler.rs b/lib/core/src/swapper/subscription_handler.rs similarity index 100% rename from lib/core/src/swapper/reconnect_handler.rs rename to lib/core/src/swapper/subscription_handler.rs