diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index 0aaec64..9f18380 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -1,13 +1,5 @@ -use std::time::Instant; -use std::{ - fs, - path::PathBuf, - str::FromStr, - sync::Arc, - time::{Duration, UNIX_EPOCH}, -}; -use async_trait::async_trait; use anyhow::{anyhow, Result}; +use async_trait::async_trait; use boltz_client::lightning_invoice::Bolt11InvoiceDescription; use boltz_client::swaps::boltzv2; use boltz_client::ToHex; @@ -28,13 +20,21 @@ use lwk_wollet::{ elements::{Address, LockTime, Transaction}, BlockchainBackend, ElementsNetwork, FsPersister, Wollet as LwkWollet, WolletDescriptor, }; +use std::time::Instant; +use std::{ + fs, + path::PathBuf, + str::FromStr, + sync::Arc, + time::{Duration, UNIX_EPOCH}, +}; use tokio::sync::{watch, Mutex, RwLock}; use tokio::time::MissedTickBehavior; use crate::error::LiquidSdkError; use crate::model::PaymentState::*; use crate::swapper::{BoltzSwapper, ReconnectHandler, Swapper, SwapperStatusStream}; -use crate::{ +use crate::{ ensure_sdk, error::{LiquidSdkResult, PaymentError}, event::EventManager, @@ -58,7 +58,7 @@ pub struct LiquidSdk { lwk_signer: SwSigner, persister: Arc, event_manager: Arc, - status_stream: Arc, + status_stream: Arc, swapper: Arc, is_started: RwLock, shutdown_sender: watch::Sender<()>, @@ -98,7 +98,7 @@ impl LiquidSdk { let persister = Arc::new(Persister::new(&config.working_dir, config.network)?); persister.init()?; - let event_manager = Arc::new(EventManager::new()); + let event_manager = Arc::new(EventManager::new()); let (shutdown_sender, shutdown_receiver) = watch::channel::<()>(()); let swapper = Arc::new(BoltzSwapper::new(config.clone())); @@ -110,7 +110,7 @@ impl LiquidSdk { lwk_signer: opts.signer, persister: persister.clone(), event_manager, - status_stream: status_stream.clone(), + status_stream: status_stream.clone(), swapper, is_started: RwLock::new(false), shutdown_sender, @@ -158,12 +158,15 @@ impl LiquidSdk { } } }); - - let reconnect_handler = Box::new(SwapperReconnectHandler{ + + let reconnect_handler = Box::new(SwapperReconnectHandler { persister: self.persister.clone(), - status_stream: self.status_stream.clone(), + status_stream: self.status_stream.clone(), }); - self.status_stream.clone().start(reconnect_handler, self.shutdown_receiver.clone()).await; + self.status_stream + .clone() + .start(reconnect_handler, self.shutdown_receiver.clone()) + .await; self.track_swap_updates().await; self.track_refundable_swaps().await; @@ -1445,30 +1448,33 @@ impl LiquidSdk { /// An error is thrown if a global logger is already configured. pub fn init_logging(log_dir: &str, app_logger: Option>) -> Result<()> { crate::logger::init_logging(log_dir, app_logger) - } + } } struct SwapperReconnectHandler { - persister: Arc, - status_stream: Arc, + persister: Arc, + status_stream: Arc, } #[async_trait] impl ReconnectHandler for SwapperReconnectHandler { - async fn on_stream_reconnect(&self) { - match self.persister.list_ongoing_swaps() { - Ok(initial_ongoing_swaps) => { - info!("On stream reconnection, got {} initial ongoing swaps", initial_ongoing_swaps.len()); - for ongoing_swap in initial_ongoing_swaps { - match self.status_stream.track_swap_id(&ongoing_swap.id()) { - Ok(_) => info!("Tracking ongoing swap: {}", ongoing_swap.id()), - Err(e) => error!("Failed to track ongoing swap: {e:?}"), - } - } - } - Err(e) => error!("Failed to list initial ongoing swaps: {e:?}"), - } - } + async fn on_stream_reconnect(&self) { + match self.persister.list_ongoing_swaps() { + Ok(initial_ongoing_swaps) => { + info!( + "On stream reconnection, got {} initial ongoing swaps", + initial_ongoing_swaps.len() + ); + for ongoing_swap in initial_ongoing_swaps { + match self.status_stream.track_swap_id(&ongoing_swap.id()) { + Ok(_) => info!("Tracking ongoing swap: {}", ongoing_swap.id()), + Err(e) => error!("Failed to track ongoing swap: {e:?}"), + } + } + } + Err(e) => error!("Failed to list initial ongoing swaps: {e:?}"), + } + } } #[cfg(test)]