fix: Interval not compatible with node.js (#880)

* fix: Interval not compatible with node.js

* Address review and fix E2E tests
This commit is contained in:
Daniel Granhão
2025-04-15 09:19:15 +01:00
committed by GitHub
parent 0f1932ede2
commit 1f71244c45
7 changed files with 141 additions and 87 deletions

View File

@@ -503,7 +503,7 @@ impl LiquidSdk {
if let Some(sync_service) = self.sync_service.clone() {
sync_service.start(self.shutdown_receiver.clone());
}
self.track_new_blocks();
self.start_track_new_blocks_task();
self.track_swap_updates();
self.track_realtime_sync_events(subscription_handler);
@@ -568,81 +568,99 @@ impl LiquidSdk {
});
}
fn track_new_blocks(self: &Arc<LiquidSdk>) {
async fn track_new_blocks(
self: &Arc<LiquidSdk>,
current_liquid_block: &mut u32,
current_bitcoin_block: &mut u32,
) {
info!("Track new blocks iteration started");
// Get the Liquid tip and process a new block
let t0 = Instant::now();
let liquid_tip_res = self.liquid_chain_service.tip().await;
let duration_ms = Instant::now().duration_since(t0).as_millis();
info!("Fetched liquid tip at ({duration_ms} ms)");
let is_new_liquid_block = match &liquid_tip_res {
Ok(height) => {
debug!("Got Liquid tip: {height}");
let is_new_liquid_block = *height > *current_liquid_block;
*current_liquid_block = *height;
is_new_liquid_block
}
Err(e) => {
error!("Failed to fetch Liquid tip {e}");
false
}
};
// Get the Bitcoin tip and process a new block
let t0 = Instant::now();
let bitcoin_tip_res = self.bitcoin_chain_service.tip().await;
let duration_ms = Instant::now().duration_since(t0).as_millis();
info!("Fetched bitcoin tip at ({duration_ms} ms)");
let is_new_bitcoin_block = match &bitcoin_tip_res {
Ok(height) => {
debug!("Got Bitcoin tip: {height}");
let is_new_bitcoin_block = *height > *current_bitcoin_block;
*current_bitcoin_block = *height;
is_new_bitcoin_block
}
Err(e) => {
error!("Failed to fetch Bitcoin tip {e}");
false
}
};
if let (Ok(liquid_tip), Ok(bitcoin_tip)) = (liquid_tip_res, bitcoin_tip_res) {
self.persister
.set_blockchain_info(&BlockchainInfo {
liquid_tip,
bitcoin_tip,
})
.unwrap_or_else(|err| warn!("Could not update local tips: {err:?}"));
};
// Only partial sync when there are no new Liquid or Bitcoin blocks
let partial_sync = (is_new_liquid_block || is_new_bitcoin_block).not();
_ = self.sync(partial_sync).await;
// Update swap handlers
if is_new_liquid_block {
self.chain_swap_handler
.on_liquid_block(*current_liquid_block)
.await;
self.receive_swap_handler
.on_liquid_block(*current_liquid_block)
.await;
self.send_swap_handler
.on_liquid_block(*current_liquid_block)
.await;
}
if is_new_bitcoin_block {
self.chain_swap_handler
.on_bitcoin_block(*current_bitcoin_block)
.await;
self.receive_swap_handler
.on_bitcoin_block(*current_liquid_block)
.await;
self.send_swap_handler
.on_bitcoin_block(*current_bitcoin_block)
.await;
}
}
fn start_track_new_blocks_task(self: &Arc<LiquidSdk>) {
let cloned = self.clone();
tokio::spawn(async move {
let mut current_liquid_block: u32 = 0;
let mut current_bitcoin_block: u32 = 0;
let mut shutdown_receiver = cloned.shutdown_receiver.clone();
let mut interval = tokio::time::interval(Duration::from_secs(10));
#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
cloned
.track_new_blocks(&mut current_liquid_block, &mut current_bitcoin_block)
.await;
loop {
tokio::select! {
_ = interval.tick() => {
info!("Track blocks loop ticked");
// Get the Liquid tip and process a new block
let t0 = Instant::now();
let liquid_tip_res = cloned.liquid_chain_service.tip().await;
let duration_ms = Instant::now().duration_since(t0).as_millis();
info!("Fetched liquid tip at ({duration_ms} ms)");
let is_new_liquid_block = match &liquid_tip_res {
Ok(height) => {
debug!("Got Liquid tip: {height}");
let is_new_liquid_block = *height > current_liquid_block;
current_liquid_block = *height;
is_new_liquid_block
},
Err(e) => {
error!("Failed to fetch Liquid tip {e}");
false
}
};
// Get the Bitcoin tip and process a new block
let t0 = Instant::now();
let bitcoin_tip_res = cloned.bitcoin_chain_service.tip().await;
let duration_ms = Instant::now().duration_since(t0).as_millis();
info!("Fetched bitcoin tip at ({duration_ms} ms)");
let is_new_bitcoin_block = match &bitcoin_tip_res {
Ok(height) => {
debug!("Got Bitcoin tip: {height}");
let is_new_bitcoin_block = *height > current_bitcoin_block;
current_bitcoin_block = *height;
is_new_bitcoin_block
},
Err(e) => {
error!("Failed to fetch Bitcoin tip {e}");
false
}
};
if let (Ok(liquid_tip), Ok(bitcoin_tip)) = (liquid_tip_res, bitcoin_tip_res) {
cloned.persister.set_blockchain_info(&BlockchainInfo {
liquid_tip,
bitcoin_tip
})
.unwrap_or_else(|err| warn!("Could not update local tips: {err:?}"));
};
// Only partial sync when there are no new Liquid or Bitcoin blocks
let partial_sync = (is_new_liquid_block || is_new_bitcoin_block).not();
_ = cloned.sync(partial_sync).await;
// Update swap handlers
if is_new_liquid_block {
cloned.chain_swap_handler.on_liquid_block(current_liquid_block).await;
cloned.receive_swap_handler.on_liquid_block(current_liquid_block).await;
cloned.send_swap_handler.on_liquid_block(current_liquid_block).await;
}
if is_new_bitcoin_block {
cloned.chain_swap_handler.on_bitcoin_block(current_bitcoin_block).await;
cloned.receive_swap_handler.on_bitcoin_block(current_liquid_block).await;
cloned.send_swap_handler.on_bitcoin_block(current_bitcoin_block).await;
}
#[cfg(all(target_family = "wasm", target_os = "unknown"))]
interval.reset();
_ = tokio::time::sleep(Duration::from_secs(10)) => {
cloned.track_new_blocks(&mut current_liquid_block, &mut current_bitcoin_block).await;
}
_ = shutdown_receiver.changed() => {
@@ -4029,6 +4047,7 @@ fn extract_description_from_metadata(request_data: &LnUrlPayRequestData) -> Opti
#[cfg(test)]
mod tests {
use std::str::FromStr;
use std::time::Duration;
use anyhow::{anyhow, Result};
use boltz_client::{
@@ -4713,4 +4732,31 @@ mod tests {
Ok(())
}
#[sdk_macros::async_test_all]
async fn test_background_tasks() -> Result<()> {
create_persister!(persister);
let swapper = Arc::new(MockSwapper::new());
let status_stream = Arc::new(MockStatusStream::new());
let liquid_chain_service = Arc::new(MockLiquidChainService::new());
let bitcoin_chain_service = Arc::new(MockBitcoinChainService::new());
let sdk = new_liquid_sdk_with_chain_services(
persister.clone(),
swapper.clone(),
status_stream.clone(),
liquid_chain_service.clone(),
bitcoin_chain_service.clone(),
None,
)
.await?;
sdk.start().await?;
tokio::time::sleep(Duration::from_secs(3)).await;
sdk.disconnect().await?;
Ok(())
}
}

View File

@@ -65,10 +65,6 @@ impl<P: ProxyUrlFetcher> SwapperStatusStream for BoltzSwapper<P> {
callback.subscribe_swaps().await;
let mut interval = tokio::time::interval(keep_alive_ping_interval);
#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = shutdown.changed() => {
@@ -76,7 +72,7 @@ impl<P: ProxyUrlFetcher> SwapperStatusStream for BoltzSwapper<P> {
return;
},
_ = interval.tick() => {
_ = tokio::time::sleep(keep_alive_ping_interval) => {
match serde_json::to_string(&WsRequest::Ping) {
Ok(ping_msg) => {
match sender.send(Message::Text(ping_msg.into())).await {

View File

@@ -42,7 +42,7 @@ impl MockSyncerClient {
#[sdk_macros::async_trait]
impl SyncerClient for MockSyncerClient {
async fn connect(&self, _connect_url: String) -> Result<()> {
todo!()
Ok(())
}
async fn push(&self, req: SetRecordRequest) -> Result<SetRecordReply> {
@@ -68,7 +68,7 @@ impl SyncerClient for MockSyncerClient {
});
}
return Err(anyhow::anyhow!("No record was sent"));
Err(anyhow::anyhow!("No record was sent"))
}
async fn pull(&self, _req: ListChangesRequest) -> Result<ListChangesReply> {
@@ -79,11 +79,11 @@ impl SyncerClient for MockSyncerClient {
}
async fn listen(&self, _req: ListenChangesRequest) -> Result<Streaming<Notification>> {
todo!()
Err(anyhow::anyhow!("Not implemented"))
}
async fn disconnect(&self) -> Result<()> {
todo!()
Ok(())
}
}

View File

@@ -16,6 +16,11 @@ wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
async fn bitcoin() {
let mut handle = SdkNodeHandle::init_node().await.unwrap();
handle
.wait_for_event(|e| matches!(e, SdkEvent::Synced { .. }), TIMEOUT)
.await
.unwrap();
// --------------RECEIVE--------------
let payer_amount_sat = 100_000;
@@ -258,12 +263,6 @@ async fn bitcoin() {
assert_eq!(payments.len(), 3);
let payment = &payments[0];
assert_eq!(payment.status, PaymentState::Failed);
println!("Payment details: {:?}", payment.details);
println!("Lockup amount sat: {}", lockup_amount_sat);
println!(
"Prepare refund rbf response: {:?}",
prepare_refund_rbf_response
);
// The following fails because the payment's refund_tx_amount_sat is None. Related issue: https://github.com/breez/breez-sdk-liquid/issues/773
// TODO: uncomment once the issue is fixed
/*assert!(matches!(

View File

@@ -17,6 +17,16 @@ wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
#[serial]
async fn bolt11() {
let mut handle_alice = SdkNodeHandle::init_node().await.unwrap();
let mut handle_bob = SdkNodeHandle::init_node().await.unwrap();
handle_alice
.wait_for_event(|e| matches!(e, SdkEvent::Synced { .. }), TIMEOUT)
.await
.unwrap();
handle_bob
.wait_for_event(|e| matches!(e, SdkEvent::Synced { .. }), TIMEOUT)
.await
.unwrap();
// -------------------RECEIVE SWAP-------------------
let payer_amount_sat = 200_000;
@@ -136,8 +146,6 @@ async fn bolt11() {
assert!(matches!(payment.details, PaymentDetails::Lightning { .. }));
// -------------------MRH-------------------
let mut handle_bob = SdkNodeHandle::init_node().await.unwrap();
let receiver_amount_sat = 50_000;
let (_, receive_response) = handle_bob

View File

@@ -14,6 +14,11 @@ wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
async fn liquid() {
let mut handle = SdkNodeHandle::init_node().await.unwrap();
handle
.wait_for_event(|e| matches!(e, SdkEvent::Synced { .. }), TIMEOUT)
.await
.unwrap();
// --------------RECEIVE--------------
let (prepare_response, receive_response) = handle

View File

@@ -19,7 +19,7 @@ use breez_sdk_liquid::{
};
use tokio::sync::mpsc::{self, Receiver, Sender};
pub const TIMEOUT: Duration = Duration::from_secs(15);
pub const TIMEOUT: Duration = Duration::from_secs(30);
struct ForwardingEventListener {
sender: Sender<SdkEvent>,