diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index 4355ec3..49a7e8a 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -503,7 +503,7 @@ impl LiquidSdk { if let Some(sync_service) = self.sync_service.clone() { sync_service.start(self.shutdown_receiver.clone()); } - self.track_new_blocks(); + self.start_track_new_blocks_task(); self.track_swap_updates(); self.track_realtime_sync_events(subscription_handler); @@ -568,81 +568,99 @@ impl LiquidSdk { }); } - fn track_new_blocks(self: &Arc) { + async fn track_new_blocks( + self: &Arc, + current_liquid_block: &mut u32, + current_bitcoin_block: &mut u32, + ) { + info!("Track new blocks iteration started"); + // Get the Liquid tip and process a new block + let t0 = Instant::now(); + let liquid_tip_res = self.liquid_chain_service.tip().await; + let duration_ms = Instant::now().duration_since(t0).as_millis(); + info!("Fetched liquid tip at ({duration_ms} ms)"); + + let is_new_liquid_block = match &liquid_tip_res { + Ok(height) => { + debug!("Got Liquid tip: {height}"); + let is_new_liquid_block = *height > *current_liquid_block; + *current_liquid_block = *height; + is_new_liquid_block + } + Err(e) => { + error!("Failed to fetch Liquid tip {e}"); + false + } + }; + // Get the Bitcoin tip and process a new block + let t0 = Instant::now(); + let bitcoin_tip_res = self.bitcoin_chain_service.tip().await; + let duration_ms = Instant::now().duration_since(t0).as_millis(); + info!("Fetched bitcoin tip at ({duration_ms} ms)"); + let is_new_bitcoin_block = match &bitcoin_tip_res { + Ok(height) => { + debug!("Got Bitcoin tip: {height}"); + let is_new_bitcoin_block = *height > *current_bitcoin_block; + *current_bitcoin_block = *height; + is_new_bitcoin_block + } + Err(e) => { + error!("Failed to fetch Bitcoin tip {e}"); + false + } + }; + + if let (Ok(liquid_tip), Ok(bitcoin_tip)) = (liquid_tip_res, bitcoin_tip_res) { + self.persister + .set_blockchain_info(&BlockchainInfo { + liquid_tip, + bitcoin_tip, + }) + .unwrap_or_else(|err| warn!("Could not update local tips: {err:?}")); + }; + + // Only partial sync when there are no new Liquid or Bitcoin blocks + let partial_sync = (is_new_liquid_block || is_new_bitcoin_block).not(); + _ = self.sync(partial_sync).await; + + // Update swap handlers + if is_new_liquid_block { + self.chain_swap_handler + .on_liquid_block(*current_liquid_block) + .await; + self.receive_swap_handler + .on_liquid_block(*current_liquid_block) + .await; + self.send_swap_handler + .on_liquid_block(*current_liquid_block) + .await; + } + if is_new_bitcoin_block { + self.chain_swap_handler + .on_bitcoin_block(*current_bitcoin_block) + .await; + self.receive_swap_handler + .on_bitcoin_block(*current_liquid_block) + .await; + self.send_swap_handler + .on_bitcoin_block(*current_bitcoin_block) + .await; + } + } + + fn start_track_new_blocks_task(self: &Arc) { let cloned = self.clone(); tokio::spawn(async move { let mut current_liquid_block: u32 = 0; let mut current_bitcoin_block: u32 = 0; let mut shutdown_receiver = cloned.shutdown_receiver.clone(); - let mut interval = tokio::time::interval(Duration::from_secs(10)); - #[cfg(not(all(target_family = "wasm", target_os = "unknown")))] - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + cloned + .track_new_blocks(&mut current_liquid_block, &mut current_bitcoin_block) + .await; loop { tokio::select! { - _ = interval.tick() => { - info!("Track blocks loop ticked"); - // Get the Liquid tip and process a new block - let t0 = Instant::now(); - let liquid_tip_res = cloned.liquid_chain_service.tip().await; - let duration_ms = Instant::now().duration_since(t0).as_millis(); - info!("Fetched liquid tip at ({duration_ms} ms)"); - - let is_new_liquid_block = match &liquid_tip_res { - Ok(height) => { - debug!("Got Liquid tip: {height}"); - let is_new_liquid_block = *height > current_liquid_block; - current_liquid_block = *height; - is_new_liquid_block - }, - Err(e) => { - error!("Failed to fetch Liquid tip {e}"); - false - } - }; - // Get the Bitcoin tip and process a new block - let t0 = Instant::now(); - let bitcoin_tip_res = cloned.bitcoin_chain_service.tip().await; - let duration_ms = Instant::now().duration_since(t0).as_millis(); - info!("Fetched bitcoin tip at ({duration_ms} ms)"); - let is_new_bitcoin_block = match &bitcoin_tip_res { - Ok(height) => { - debug!("Got Bitcoin tip: {height}"); - let is_new_bitcoin_block = *height > current_bitcoin_block; - current_bitcoin_block = *height; - is_new_bitcoin_block - }, - Err(e) => { - error!("Failed to fetch Bitcoin tip {e}"); - false - } - }; - - if let (Ok(liquid_tip), Ok(bitcoin_tip)) = (liquid_tip_res, bitcoin_tip_res) { - cloned.persister.set_blockchain_info(&BlockchainInfo { - liquid_tip, - bitcoin_tip - }) - .unwrap_or_else(|err| warn!("Could not update local tips: {err:?}")); - }; - - // Only partial sync when there are no new Liquid or Bitcoin blocks - let partial_sync = (is_new_liquid_block || is_new_bitcoin_block).not(); - _ = cloned.sync(partial_sync).await; - - // Update swap handlers - if is_new_liquid_block { - cloned.chain_swap_handler.on_liquid_block(current_liquid_block).await; - cloned.receive_swap_handler.on_liquid_block(current_liquid_block).await; - cloned.send_swap_handler.on_liquid_block(current_liquid_block).await; - } - if is_new_bitcoin_block { - cloned.chain_swap_handler.on_bitcoin_block(current_bitcoin_block).await; - cloned.receive_swap_handler.on_bitcoin_block(current_liquid_block).await; - cloned.send_swap_handler.on_bitcoin_block(current_bitcoin_block).await; - } - - #[cfg(all(target_family = "wasm", target_os = "unknown"))] - interval.reset(); + _ = tokio::time::sleep(Duration::from_secs(10)) => { + cloned.track_new_blocks(&mut current_liquid_block, &mut current_bitcoin_block).await; } _ = shutdown_receiver.changed() => { @@ -4029,6 +4047,7 @@ fn extract_description_from_metadata(request_data: &LnUrlPayRequestData) -> Opti #[cfg(test)] mod tests { use std::str::FromStr; + use std::time::Duration; use anyhow::{anyhow, Result}; use boltz_client::{ @@ -4713,4 +4732,31 @@ mod tests { Ok(()) } + + #[sdk_macros::async_test_all] + async fn test_background_tasks() -> Result<()> { + create_persister!(persister); + let swapper = Arc::new(MockSwapper::new()); + let status_stream = Arc::new(MockStatusStream::new()); + let liquid_chain_service = Arc::new(MockLiquidChainService::new()); + let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new()); + + let sdk = new_liquid_sdk_with_chain_services( + persister.clone(), + swapper.clone(), + status_stream.clone(), + liquid_chain_service.clone(), + bitcoin_chain_service.clone(), + None, + ) + .await?; + + sdk.start().await?; + + tokio::time::sleep(Duration::from_secs(3)).await; + + sdk.disconnect().await?; + + Ok(()) + } } diff --git a/lib/core/src/swapper/boltz/status_stream.rs b/lib/core/src/swapper/boltz/status_stream.rs index 12d1990..40f6b3e 100644 --- a/lib/core/src/swapper/boltz/status_stream.rs +++ b/lib/core/src/swapper/boltz/status_stream.rs @@ -65,10 +65,6 @@ impl SwapperStatusStream for BoltzSwapper

{ callback.subscribe_swaps().await; - let mut interval = tokio::time::interval(keep_alive_ping_interval); - #[cfg(not(all(target_family = "wasm", target_os = "unknown")))] - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - loop { tokio::select! { _ = shutdown.changed() => { @@ -76,7 +72,7 @@ impl SwapperStatusStream for BoltzSwapper

{ return; }, - _ = interval.tick() => { + _ = tokio::time::sleep(keep_alive_ping_interval) => { match serde_json::to_string(&WsRequest::Ping) { Ok(ping_msg) => { match sender.send(Message::Text(ping_msg.into())).await { diff --git a/lib/core/src/test_utils/sync.rs b/lib/core/src/test_utils/sync.rs index 518adde..f559fe2 100644 --- a/lib/core/src/test_utils/sync.rs +++ b/lib/core/src/test_utils/sync.rs @@ -42,7 +42,7 @@ impl MockSyncerClient { #[sdk_macros::async_trait] impl SyncerClient for MockSyncerClient { async fn connect(&self, _connect_url: String) -> Result<()> { - todo!() + Ok(()) } async fn push(&self, req: SetRecordRequest) -> Result { @@ -68,7 +68,7 @@ impl SyncerClient for MockSyncerClient { }); } - return Err(anyhow::anyhow!("No record was sent")); + Err(anyhow::anyhow!("No record was sent")) } async fn pull(&self, _req: ListChangesRequest) -> Result { @@ -79,11 +79,11 @@ impl SyncerClient for MockSyncerClient { } async fn listen(&self, _req: ListenChangesRequest) -> Result> { - todo!() + Err(anyhow::anyhow!("Not implemented")) } async fn disconnect(&self) -> Result<()> { - todo!() + Ok(()) } } diff --git a/lib/core/tests/regtest/bitcoin.rs b/lib/core/tests/regtest/bitcoin.rs index 4f3df22..4ec5d00 100644 --- a/lib/core/tests/regtest/bitcoin.rs +++ b/lib/core/tests/regtest/bitcoin.rs @@ -16,6 +16,11 @@ wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); async fn bitcoin() { let mut handle = SdkNodeHandle::init_node().await.unwrap(); + handle + .wait_for_event(|e| matches!(e, SdkEvent::Synced { .. }), TIMEOUT) + .await + .unwrap(); + // --------------RECEIVE-------------- let payer_amount_sat = 100_000; @@ -258,12 +263,6 @@ async fn bitcoin() { assert_eq!(payments.len(), 3); let payment = &payments[0]; assert_eq!(payment.status, PaymentState::Failed); - println!("Payment details: {:?}", payment.details); - println!("Lockup amount sat: {}", lockup_amount_sat); - println!( - "Prepare refund rbf response: {:?}", - prepare_refund_rbf_response - ); // The following fails because the payment's refund_tx_amount_sat is None. Related issue: https://github.com/breez/breez-sdk-liquid/issues/773 // TODO: uncomment once the issue is fixed /*assert!(matches!( diff --git a/lib/core/tests/regtest/bolt11.rs b/lib/core/tests/regtest/bolt11.rs index e99c69f..d85e2b9 100644 --- a/lib/core/tests/regtest/bolt11.rs +++ b/lib/core/tests/regtest/bolt11.rs @@ -17,6 +17,16 @@ wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); #[serial] async fn bolt11() { let mut handle_alice = SdkNodeHandle::init_node().await.unwrap(); + let mut handle_bob = SdkNodeHandle::init_node().await.unwrap(); + + handle_alice + .wait_for_event(|e| matches!(e, SdkEvent::Synced { .. }), TIMEOUT) + .await + .unwrap(); + handle_bob + .wait_for_event(|e| matches!(e, SdkEvent::Synced { .. }), TIMEOUT) + .await + .unwrap(); // -------------------RECEIVE SWAP------------------- let payer_amount_sat = 200_000; @@ -136,8 +146,6 @@ async fn bolt11() { assert!(matches!(payment.details, PaymentDetails::Lightning { .. })); // -------------------MRH------------------- - let mut handle_bob = SdkNodeHandle::init_node().await.unwrap(); - let receiver_amount_sat = 50_000; let (_, receive_response) = handle_bob diff --git a/lib/core/tests/regtest/liquid.rs b/lib/core/tests/regtest/liquid.rs index c56ed93..de94c2d 100644 --- a/lib/core/tests/regtest/liquid.rs +++ b/lib/core/tests/regtest/liquid.rs @@ -14,6 +14,11 @@ wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); async fn liquid() { let mut handle = SdkNodeHandle::init_node().await.unwrap(); + handle + .wait_for_event(|e| matches!(e, SdkEvent::Synced { .. }), TIMEOUT) + .await + .unwrap(); + // --------------RECEIVE-------------- let (prepare_response, receive_response) = handle diff --git a/lib/core/tests/regtest/mod.rs b/lib/core/tests/regtest/mod.rs index cf21182..9217f0a 100644 --- a/lib/core/tests/regtest/mod.rs +++ b/lib/core/tests/regtest/mod.rs @@ -19,7 +19,7 @@ use breez_sdk_liquid::{ }; use tokio::sync::mpsc::{self, Receiver, Sender}; -pub const TIMEOUT: Duration = Duration::from_secs(15); +pub const TIMEOUT: Duration = Duration::from_secs(30); struct ForwardingEventListener { sender: Sender,