Merge pull request #760 from breez/subscribe-synced-swaps

Subscribe incoming synced records to swapper updates
This commit is contained in:
Roei Erez
2025-02-28 14:00:20 +02:00
committed by GitHub
9 changed files with 137 additions and 48 deletions

View File

@@ -89,7 +89,7 @@ impl BindingLiquidSdk {
}
pub fn get_info(&self) -> Result<GetInfoResponse, SdkError> {
rt().block_on(self.sdk.get_info()).map_err(Into::into)
rt().block_on(self.sdk.get_info())
}
pub fn sign_message(&self, req: SignMessageRequest) -> SdkResult<SignMessageResponse> {
@@ -189,11 +189,10 @@ impl BindingLiquidSdk {
req: PrepareLnUrlPayRequest,
) -> Result<PrepareLnUrlPayResponse, LnUrlPayError> {
rt().block_on(self.sdk.prepare_lnurl_pay(req))
.map_err(Into::into)
}
pub fn lnurl_pay(&self, req: model::LnUrlPayRequest) -> Result<LnUrlPayResult, LnUrlPayError> {
rt().block_on(self.sdk.lnurl_pay(req)).map_err(Into::into)
rt().block_on(self.sdk.lnurl_pay(req))
}
pub fn lnurl_withdraw(
@@ -201,7 +200,6 @@ impl BindingLiquidSdk {
req: LnUrlWithdrawRequest,
) -> Result<LnUrlWithdrawResult, LnUrlWithdrawError> {
rt().block_on(self.sdk.lnurl_withdraw(req))
.map_err(Into::into)
}
pub fn lnurl_auth(
@@ -244,7 +242,7 @@ impl BindingLiquidSdk {
}
pub fn sync(&self) -> SdkResult<()> {
rt().block_on(self.sdk.sync(false)).map_err(Into::into)
rt().block_on(self.sdk.sync(false))
}
pub fn recommended_fees(&self) -> SdkResult<RecommendedFees> {

View File

@@ -260,7 +260,7 @@ impl ChainSwapHandler {
..Default::default()
})?;
if swap.accept_zero_conf {
if swap.accept_zero_conf && swap.metadata.is_local {
self.claim(&id).await.map_err(|e| {
error!("Could not cooperate Chain Swap {id} claim: {e}");
anyhow!("Could not post claim details. Err: {e:?}")
@@ -304,10 +304,12 @@ impl ChainSwapHandler {
match verify_res {
Ok(_) => {
info!("Server lockup transaction was verified for incoming Chain Swap {}", swap.id);
self.claim(&id).await.map_err(|e| {
error!("Could not cooperate Chain Swap {id} claim: {e}");
anyhow!("Could not post claim details. Err: {e:?}")
})?;
if swap.metadata.is_local {
self.claim(&id).await.map_err(|e| {
error!("Could not cooperate Chain Swap {id} claim: {e}");
anyhow!("Could not post claim details. Err: {e:?}")
})?;
}
}
Err(e) => {
warn!("Server lockup transaction for incoming Chain Swap {} could not be verified. txid: {}, err: {}", swap.id, transaction.id, e);
@@ -613,7 +615,7 @@ impl ChainSwapHandler {
..Default::default()
})?;
if swap.accept_zero_conf {
if swap.accept_zero_conf && swap.metadata.is_local {
self.claim(&id).await.map_err(|e| {
error!("Could not cooperate Chain Swap {id} claim: {e}");
anyhow!("Could not post claim details. Err: {e:?}")
@@ -663,10 +665,12 @@ impl ChainSwapHandler {
server_lockup_tx_id: Some(transaction.id),
..Default::default()
})?;
self.claim(&id).await.map_err(|e| {
error!("Could not cooperate Chain Swap {id} claim: {e}");
anyhow!("Could not post claim details. Err: {e:?}")
})?;
if swap.metadata.is_local {
self.claim(&id).await.map_err(|e| {
error!("Could not cooperate Chain Swap {id} claim: {e}");
anyhow!("Could not post claim details. Err: {e:?}")
})?;
}
}
Some(claim_tx_id) => {
warn!("Claim tx for Chain Swap {id} was already broadcast: txid {claim_tx_id}")

View File

@@ -41,7 +41,10 @@ use crate::model::PaymentState::*;
use crate::model::Signer;
use crate::receive_swap::ReceiveSwapHandler;
use crate::send_swap::SendSwapHandler;
use crate::swapper::{boltz::BoltzSwapper, Swapper, SwapperReconnectHandler, SwapperStatusStream};
use crate::swapper::SubscriptionHandler;
use crate::swapper::{
boltz::BoltzSwapper, Swapper, SwapperStatusStream, SwapperSubscriptionHandler,
};
use crate::wallet::{LiquidOnchainWallet, OnchainWallet};
use crate::{
error::{PaymentError, SdkResult},
@@ -308,18 +311,19 @@ impl LiquidSdk {
///
/// Internal method. Should only be used as part of [LiquidSdk::start].
async fn start_background_tasks(self: &Arc<LiquidSdk>) -> SdkResult<()> {
let reconnect_handler = Box::new(SwapperReconnectHandler::new(
let subscription_handler = Box::new(SwapperSubscriptionHandler::new(
self.persister.clone(),
self.status_stream.clone(),
));
self.status_stream
.clone()
.start(reconnect_handler, self.shutdown_receiver.clone());
.start(subscription_handler.clone(), self.shutdown_receiver.clone());
if let Some(sync_service) = self.sync_service.clone() {
sync_service.start(self.shutdown_receiver.clone());
}
self.track_new_blocks();
self.track_swap_updates();
self.track_realtime_sync_events(subscription_handler);
Ok(())
}
@@ -342,6 +346,44 @@ impl LiquidSdk {
Ok(())
}
fn track_realtime_sync_events(
self: &Arc<LiquidSdk>,
subscription_handler: Box<dyn SubscriptionHandler>,
) {
let cloned = self.clone();
let Some(sync_service) = cloned.sync_service.clone() else {
return;
};
let mut shutdown_receiver = cloned.shutdown_receiver.clone();
tokio::spawn(async move {
let mut sync_events_receiver = sync_service.subscribe_events();
loop {
tokio::select! {
event = sync_events_receiver.recv() => {
if let Ok(e) = event {
match e {
sync::Event::SyncedCompleted{data} => {
info!(
"Received sync event: pulled {} records, pushed {} records",
data.pulled_records_count, data.pushed_records_count
);
if data.pulled_records_count > 0 {
subscription_handler.subscribe_swaps().await;
}
}
}
}
}
_ = shutdown_receiver.changed() => {
info!("Received shutdown signal, exiting real-time sync loop");
return;
}
}
}
});
}
fn track_new_blocks(self: &Arc<LiquidSdk>) {
let cloned = self.clone();
tokio::spawn(async move {

View File

@@ -92,10 +92,12 @@ impl SendSwapHandler {
// Boltz has detected the lockup in the mempool, we can speed up
// the claim by doing so cooperatively
SubSwapStates::TransactionClaimPending => {
self.cooperate_claim(&swap).await.map_err(|e| {
error!("Could not cooperate Send Swap {id} claim: {e}");
anyhow!("Could not post claim details. Err: {e:?}")
})?;
if swap.metadata.is_local {
self.cooperate_claim(&swap).await.map_err(|e| {
error!("Could not cooperate Send Swap {id} claim: {e}");
anyhow!("Could not post claim details. Err: {e:?}")
})?;
}
Ok(())
}

View File

@@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
@@ -13,7 +14,7 @@ use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use url::Url;
use crate::model::Config;
use crate::swapper::{ReconnectHandler, SwapperStatusStream};
use crate::swapper::{SubscriptionHandler, SwapperStatusStream};
use super::{split_proxy_url, ProxyUrlFetcher};
@@ -80,7 +81,7 @@ impl SwapperStatusStream for BoltzStatusStream {
fn start(
self: Arc<Self>,
callback: Box<dyn ReconnectHandler>,
callback: Box<dyn SubscriptionHandler>,
mut shutdown: watch::Receiver<()>,
) {
let keep_alive_ping_interval = Duration::from_secs(15);
@@ -91,9 +92,10 @@ impl SwapperStatusStream for BoltzStatusStream {
debug!("Start of ws stream loop");
match self.connect().await {
Ok(mut ws_stream) => {
let mut tracked_swap_ids: HashSet<String> = HashSet::new();
let mut subscription_stream = self.subscription_notifier.subscribe();
callback.on_stream_reconnect().await;
callback.subscribe_swaps().await;
let mut interval = tokio::time::interval(keep_alive_ping_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
@@ -113,7 +115,12 @@ impl SwapperStatusStream for BoltzStatusStream {
},
swap_res = subscription_stream.recv() => match swap_res {
Ok(swap_id) => self.send_subscription(swap_id, &mut ws_stream).await,
Ok(swap_id) => {
if !tracked_swap_ids.contains(&swap_id) {
self.send_subscription(swap_id.clone(), &mut ws_stream).await;
tracked_swap_ids.insert(swap_id.clone());
}
},
Err(e) => error!("Received error on subscription stream: {e:?}"),
},

View File

@@ -18,10 +18,10 @@ use crate::{
prelude::{Direction, SendSwap, Swap, Utxo},
};
pub(crate) use reconnect_handler::*;
pub(crate) use subscription_handler::*;
pub(crate) mod boltz;
pub(crate) mod reconnect_handler;
pub(crate) mod subscription_handler;
#[async_trait]
pub trait Swapper: Send + Sync {
@@ -135,7 +135,11 @@ pub trait Swapper: Send + Sync {
}
pub trait SwapperStatusStream: Send + Sync {
fn start(self: Arc<Self>, callback: Box<dyn ReconnectHandler>, shutdown: watch::Receiver<()>);
fn start(
self: Arc<Self>,
callback: Box<dyn SubscriptionHandler>,
shutdown: watch::Receiver<()>,
);
fn track_swap_id(&self, swap_id: &str) -> anyhow::Result<()>;
fn subscribe_swap_updates(&self) -> broadcast::Receiver<boltz_client::boltz::Update>;
}

View File

@@ -8,16 +8,17 @@ use crate::persist::Persister;
use super::SwapperStatusStream;
#[async_trait]
pub trait ReconnectHandler: Send + Sync {
async fn on_stream_reconnect(&self);
pub trait SubscriptionHandler: Send + Sync {
async fn subscribe_swaps(&self);
}
pub(crate) struct SwapperReconnectHandler {
#[derive(Clone)]
pub(crate) struct SwapperSubscriptionHandler {
persister: Arc<Persister>,
status_stream: Arc<dyn SwapperStatusStream>,
}
impl SwapperReconnectHandler {
impl SwapperSubscriptionHandler {
pub(crate) fn new(
persister: Arc<Persister>,
status_stream: Arc<dyn SwapperStatusStream>,
@@ -30,8 +31,8 @@ impl SwapperReconnectHandler {
}
#[async_trait]
impl ReconnectHandler for SwapperReconnectHandler {
async fn on_stream_reconnect(&self) {
impl SubscriptionHandler for SwapperSubscriptionHandler {
async fn subscribe_swaps(&self) {
match self.persister.list_ongoing_swaps() {
Ok(initial_ongoing_swaps) => {
info!(

View File

@@ -3,9 +3,8 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Result};
use futures_util::TryFutureExt;
use log::{info, trace, warn};
use tokio::sync::watch;
use tokio::sync::{broadcast, watch};
use tokio::time::sleep;
use tokio_stream::StreamExt as _;
use tonic::Streaming;
@@ -29,6 +28,17 @@ use crate::{
pub(crate) mod client;
pub(crate) mod model;
#[derive(Clone, Debug)]
pub(crate) enum Event {
SyncedCompleted { data: SyncCompletedData },
}
#[derive(Clone, Debug)]
pub(crate) struct SyncCompletedData {
pub(crate) pulled_records_count: u32,
pub(crate) pushed_records_count: u32,
}
pub(crate) struct SyncService {
remote_url: String,
client_id: String,
@@ -36,6 +46,7 @@ pub(crate) struct SyncService {
recoverer: Arc<Recoverer>,
signer: Arc<Box<dyn Signer>>,
client: Box<dyn SyncerClient>,
subscription_notifier: broadcast::Sender<Event>,
}
impl SyncService {
@@ -47,7 +58,7 @@ impl SyncService {
client: Box<dyn SyncerClient>,
) -> Self {
let client_id = uuid::Uuid::new_v4().to_string();
let (subscription_notifier, _) = broadcast::channel::<Event>(30);
Self {
client_id,
remote_url,
@@ -55,9 +66,14 @@ impl SyncService {
recoverer,
signer,
client,
subscription_notifier,
}
}
pub(crate) fn subscribe_events(&self) -> broadcast::Receiver<Event> {
self.subscription_notifier.subscribe()
}
fn check_remote_change(&self) -> Result<()> {
match self
.persister
@@ -72,8 +88,22 @@ impl SyncService {
async fn run_event_loop(&self) {
info!("realtime-sync: Running sync event loop");
if let Err(err) = self.pull().and_then(|_| self.push()).await {
log::debug!("Could not run sync event loop: {err:?}");
let Ok(pulled_records_count) = self.pull().await else {
warn!("realtime-sync: Could not pull records");
return;
};
let Ok(pushed_records_count) = self.push().await else {
warn!("realtime-sync: Could not push records");
return;
};
if let Err(e) = self.subscription_notifier.send(Event::SyncedCompleted {
data: SyncCompletedData {
pulled_records_count,
pushed_records_count,
},
}) {
warn!("realtime-sync: Could not send sync completed event {:?}", e);
}
}
@@ -358,7 +388,7 @@ impl SyncService {
Ok(succeded.into_iter().zip(swaps.into_iter()).collect())
}
pub(crate) async fn pull(&self) -> Result<()> {
pub(crate) async fn pull(&self) -> Result<u32> {
// Step 1: Fetch and save incoming records from remote, then update local tip
self.fetch_and_save_records().await?;
@@ -412,10 +442,10 @@ impl SyncService {
// Step 7: Clear succeded records
if !succeded.is_empty() {
self.persister.remove_incoming_records(succeded)?;
self.persister.remove_incoming_records(succeded.clone())?;
}
Ok(())
Ok(succeded.len() as u32)
}
async fn handle_push(
@@ -476,7 +506,7 @@ impl SyncService {
Ok(())
}
async fn push(&self) -> Result<()> {
async fn push(&self) -> Result<u32> {
let outgoing_changes = self.persister.get_sync_outgoing_changes()?;
let mut succeded = vec![];
@@ -495,10 +525,11 @@ impl SyncService {
}
if !succeded.is_empty() {
self.persister.remove_sync_outgoing_changes(succeded)?;
self.persister
.remove_sync_outgoing_changes(succeded.clone())?;
}
Ok(())
Ok(succeded.len() as u32)
}
}

View File

@@ -6,7 +6,7 @@ use std::sync::Arc;
use tokio::sync::{broadcast, watch};
use crate::swapper::{ReconnectHandler, SwapperStatusStream};
use crate::swapper::{SubscriptionHandler, SwapperStatusStream};
pub(crate) struct MockStatusStream {
pub update_notifier: broadcast::Sender<boltz::Update>,
@@ -31,7 +31,7 @@ impl MockStatusStream {
impl SwapperStatusStream for MockStatusStream {
fn start(
self: Arc<Self>,
_callback: Box<dyn ReconnectHandler>,
_callback: Box<dyn SubscriptionHandler>,
_shutdown: watch::Receiver<()>,
) {
}