From f7b7149187810f53c4242bf9e1a0d1f3ebca4e20 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Mon, 3 Jun 2024 13:48:26 +0300 Subject: [PATCH] Abstract swapper stream as part of swapper trait --- cli/Cargo.lock | 12 +++ lib/Cargo.lock | 12 +++ lib/core/Cargo.toml | 1 + lib/core/src/lib.rs | 1 - lib/core/src/sdk.rs | 54 +++++++++---- .../src/{ => swapper}/boltz_status_stream.rs | 79 +++++++++---------- lib/core/src/{swapper.rs => swapper/mod.rs} | 29 ++++++- 7 files changed, 127 insertions(+), 61 deletions(-) rename lib/core/src/{ => swapper}/boltz_status_stream.rs (86%) rename lib/core/src/{swapper.rs => swapper/mod.rs} (92%) diff --git a/cli/Cargo.lock b/cli/Cargo.lock index 39ef37f..a668182 100644 --- a/cli/Cargo.lock +++ b/cli/Cargo.lock @@ -183,6 +183,17 @@ dependencies = [ "backtrace", ] +[[package]] +name = "async-trait" +version = "0.1.80" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atomic" version = "0.5.3" @@ -408,6 +419,7 @@ name = "breez-liquid-sdk" version = "0.0.1" dependencies = [ "anyhow", + "async-trait", "bip39", "boltz-client", "chrono", diff --git a/lib/Cargo.lock b/lib/Cargo.lock index da855c7..9d88457 100644 --- a/lib/Cargo.lock +++ b/lib/Cargo.lock @@ -263,6 +263,17 @@ dependencies = [ "toml", ] +[[package]] +name = "async-trait" +version = "0.1.80" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.61", +] + [[package]] name = "atomic" version = "0.5.3" @@ -512,6 +523,7 @@ name = "breez-liquid-sdk" version = "0.0.1" dependencies = [ "anyhow", + "async-trait", "bip39", "boltz-client", "chrono", diff --git a/lib/core/Cargo.toml b/lib/core/Cargo.toml index dfdc90d..ca0d8cd 100644 --- a/lib/core/Cargo.toml +++ b/lib/core/Cargo.toml @@ -35,6 +35,7 @@ openssl = { version = "0.10", features = ["vendored"] } tokio = { version = "1", features = ["rt", "macros"] } url = "2.5.0" futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] } +async-trait = "0.1.80" # Pin these versions to fix iOS build issues security-framework = "=2.10.0" diff --git a/lib/core/src/lib.rs b/lib/core/src/lib.rs index 0a2a2fc..3067617 100644 --- a/lib/core/src/lib.rs +++ b/lib/core/src/lib.rs @@ -1,6 +1,5 @@ #[cfg(feature = "frb")] pub mod bindings; -pub(crate) mod boltz_status_stream; pub mod error; pub(crate) mod event; #[cfg(feature = "frb")] diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index cbd8911..0aaec64 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -6,7 +6,7 @@ use std::{ sync::Arc, time::{Duration, UNIX_EPOCH}, }; - +use async_trait::async_trait; use anyhow::{anyhow, Result}; use boltz_client::lightning_invoice::Bolt11InvoiceDescription; use boltz_client::swaps::boltzv2; @@ -33,9 +33,8 @@ use tokio::time::MissedTickBehavior; use crate::error::LiquidSdkError; use crate::model::PaymentState::*; -use crate::swapper::{BoltzSwapper, Swapper}; -use crate::{ - boltz_status_stream::BoltzStatusStream, +use crate::swapper::{BoltzSwapper, ReconnectHandler, Swapper, SwapperStatusStream}; +use crate::{ ensure_sdk, error::{LiquidSdkResult, PaymentError}, event::EventManager, @@ -59,7 +58,7 @@ pub struct LiquidSdk { lwk_signer: SwSigner, persister: Arc, event_manager: Arc, - status_stream: Arc, + status_stream: Arc, swapper: Arc, is_started: RwLock, shutdown_sender: watch::Sender<()>, @@ -99,19 +98,19 @@ impl LiquidSdk { let persister = Arc::new(Persister::new(&config.working_dir, config.network)?); persister.init()?; - let event_manager = Arc::new(EventManager::new()); - let status_stream = Arc::new(BoltzStatusStream::new(&config.boltz_url, persister.clone())); + let event_manager = Arc::new(EventManager::new()); let (shutdown_sender, shutdown_receiver) = watch::channel::<()>(()); let swapper = Arc::new(BoltzSwapper::new(config.clone())); + let status_stream = Arc::::from(swapper.create_status_stream()); let sdk = Arc::new(LiquidSdk { config, lwk_wollet: Arc::new(Mutex::new(lwk_wollet)), lwk_signer: opts.signer, - persister, + persister: persister.clone(), event_manager, - status_stream, + status_stream: status_stream.clone(), swapper, is_started: RwLock::new(false), shutdown_sender, @@ -159,12 +158,12 @@ impl LiquidSdk { } } }); - - self.status_stream - .clone() - .track_pending_swaps(self.shutdown_receiver.clone()) - .await; - + + let reconnect_handler = Box::new(SwapperReconnectHandler{ + persister: self.persister.clone(), + status_stream: self.status_stream.clone(), + }); + self.status_stream.clone().start(reconnect_handler, self.shutdown_receiver.clone()).await; self.track_swap_updates().await; self.track_refundable_swaps().await; @@ -1446,7 +1445,30 @@ impl LiquidSdk { /// An error is thrown if a global logger is already configured. pub fn init_logging(log_dir: &str, app_logger: Option>) -> Result<()> { crate::logger::init_logging(log_dir, app_logger) - } + } +} + +struct SwapperReconnectHandler { + persister: Arc, + status_stream: Arc, +} + +#[async_trait] +impl ReconnectHandler for SwapperReconnectHandler { + async fn on_stream_reconnect(&self) { + match self.persister.list_ongoing_swaps() { + Ok(initial_ongoing_swaps) => { + info!("On stream reconnection, got {} initial ongoing swaps", initial_ongoing_swaps.len()); + for ongoing_swap in initial_ongoing_swaps { + match self.status_stream.track_swap_id(&ongoing_swap.id()) { + Ok(_) => info!("Tracking ongoing swap: {}", ongoing_swap.id()), + Err(e) => error!("Failed to track ongoing swap: {e:?}"), + } + } + } + Err(e) => error!("Failed to list initial ongoing swaps: {e:?}"), + } + } } #[cfg(test)] diff --git a/lib/core/src/boltz_status_stream.rs b/lib/core/src/swapper/boltz_status_stream.rs similarity index 86% rename from lib/core/src/boltz_status_stream.rs rename to lib/core/src/swapper/boltz_status_stream.rs index 4999a0c..144281d 100644 --- a/lib/core/src/boltz_status_stream.rs +++ b/lib/core/src/swapper/boltz_status_stream.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, Result}; +use async_trait::async_trait; use boltz_client::swaps::boltzv2::{self, Subscription, SwapUpdate}; use futures_util::{SinkExt, StreamExt}; use log::{debug, error, info, warn}; @@ -12,39 +13,65 @@ use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; use url::Url; -use crate::persist::Persister; +use super::{ReconnectHandler, SwapperStatusStream}; pub(crate) struct BoltzStatusStream { url: String, - persister: Arc, subscription_notifier: broadcast::Sender, update_notifier: broadcast::Sender, } impl BoltzStatusStream { - pub(crate) fn new(url: &str, persister: Arc) -> Self { + pub(crate) fn new(url: &str) -> Self { let (subscription_notifier, _) = broadcast::channel::(30); let (update_notifier, _) = broadcast::channel::(30); Self { url: url.replace("http", "ws") + "/ws", - persister, subscription_notifier, update_notifier, } } - pub(crate) fn track_swap_id(&self, swap_id: &str) -> Result<()> { + async fn connect(&self) -> Result>> { + let (socket, _) = connect_async(Url::parse(&self.url)?) + .await + .map_err(|e| anyhow!("Failed to connect to websocket: {e:?}"))?; + Ok(socket) + } + + async fn send_subscription( + &self, + swap_id: String, + ws_stream: &mut WebSocketStream>, + ) { + info!("Subscribing to status updates for swap ID {swap_id}"); + + let subscription = Subscription::new(&swap_id); + match serde_json::to_string(&subscription) { + Ok(subscribe_json) => match ws_stream.send(Message::Text(subscribe_json)).await { + Ok(_) => info!("Subscribed"), + Err(e) => error!("Failed to subscribe to {swap_id}: {e:?}"), + }, + Err(e) => error!("Invalid subscription msg: {e:?}"), + } + } +} + +#[async_trait] +impl SwapperStatusStream for BoltzStatusStream { + fn track_swap_id(&self, swap_id: &str) -> Result<()> { let _ = self.subscription_notifier.send(swap_id.to_string()); Ok(()) } - pub(crate) fn subscribe_swap_updates(&self) -> broadcast::Receiver { + fn subscribe_swap_updates(&self) -> broadcast::Receiver { self.update_notifier.subscribe() } - pub(crate) async fn track_pending_swaps( - self: Arc, + async fn start( + self: Arc, + callback: Box, mut shutdown: watch::Receiver<()>, ) { let keep_alive_ping_interval = Duration::from_secs(15); @@ -55,17 +82,7 @@ impl BoltzStatusStream { debug!("Start of ws stream loop"); match self.connect().await { Ok(mut ws_stream) => { - // Initially subscribe to all ongoing swaps - match self.persister.list_ongoing_swaps() { - Ok(initial_ongoing_swaps) => { - info!("Got {} initial ongoing swaps", initial_ongoing_swaps.len()); - for ongoing_swap in initial_ongoing_swaps { - self.send_subscription(ongoing_swap.id(), &mut ws_stream) - .await; - } - } - Err(e) => error!("Failed to list initial ongoing swaps: {e:?}"), - } + callback.on_stream_reconnect().await; let mut interval = tokio::time::interval(keep_alive_ping_interval); interval.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -148,28 +165,4 @@ impl BoltzStatusStream { } }); } - - async fn connect(&self) -> Result>> { - let (socket, _) = connect_async(Url::parse(&self.url)?) - .await - .map_err(|e| anyhow!("Failed to connect to websocket: {e:?}"))?; - Ok(socket) - } - - async fn send_subscription( - &self, - swap_id: String, - ws_stream: &mut WebSocketStream>, - ) { - info!("Subscribing to status updates for swap ID {swap_id}"); - - let subscription = Subscription::new(&swap_id); - match serde_json::to_string(&subscription) { - Ok(subscribe_json) => match ws_stream.send(Message::Text(subscribe_json)).await { - Ok(_) => info!("Subscribed"), - Err(e) => error!("Failed to subscribe to {swap_id}: {e:?}"), - }, - Err(e) => error!("Invalid subscription msg: {e:?}"), - } - } } diff --git a/lib/core/src/swapper.rs b/lib/core/src/swapper/mod.rs similarity index 92% rename from lib/core/src/swapper.rs rename to lib/core/src/swapper/mod.rs index 2517362..e53b985 100644 --- a/lib/core/src/swapper.rs +++ b/lib/core/src/swapper/mod.rs @@ -1,9 +1,12 @@ +mod boltz_status_stream; use std::str::FromStr; +use std::sync::Arc; use anyhow::Result; +use async_trait::async_trait; use boltz_client::network::Chain; use boltz_client::swaps::boltzv2::{ - BoltzApiClientV2, ClaimTxResponse, CreateReverseRequest, CreateReverseResponse, + self, BoltzApiClientV2, ClaimTxResponse, CreateReverseRequest, CreateReverseResponse, CreateSubmarineRequest, CreateSubmarineResponse, ReversePair, SubmarinePair, }; @@ -11,14 +14,32 @@ use boltz_client::error::Error; use boltz_client::util::secrets::Preimage; use boltz_client::{Amount, Bolt11Invoice, LBtcSwapTxV2}; +use boltz_status_stream::BoltzStatusStream; use log::{debug, info}; use lwk_wollet::elements::LockTime; use serde_json::Value; +use tokio::sync::{broadcast, watch}; use crate::error::PaymentError; use crate::model::{Config, Network, ReceiveSwap, SendSwap}; use crate::utils; +#[async_trait] +pub trait ReconnectHandler: Send + Sync { + async fn on_stream_reconnect(&self); +} + +#[async_trait] +pub trait SwapperStatusStream: Send + Sync { + async fn start( + self: Arc, + callback: Box, + shutdown: watch::Receiver<()>, + ); + fn track_swap_id(&self, swap_id: &str) -> Result<()>; + fn subscribe_swap_updates(&self) -> broadcast::Receiver; +} + pub trait Swapper: Send + Sync { /// Create a new send swap fn create_send_swap( @@ -79,6 +100,8 @@ pub trait Swapper: Send + Sync { /// Chain broadcast #[allow(dead_code)] fn broadcast_tx(&self, chain: Chain, tx_hex: &str) -> Result; + + fn create_status_stream(&self) -> Box; } pub struct BoltzSwapper { @@ -310,4 +333,8 @@ impl Swapper for BoltzSwapper { fn broadcast_tx(&self, chain: Chain, tx_hex: &str) -> Result { Ok(self.client.broadcast_tx(chain, &tx_hex.into())?) } + + fn create_status_stream(&self) -> Box { + Box::new(BoltzStatusStream::new(&self.config.boltz_url)) + } }