From fb953c3386f9d155698b5cba4024a065481bc4fa Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Tue, 11 Feb 2025 17:58:43 +0200 Subject: [PATCH] lock free sync trigger --- lib/core/src/persist/cache.rs | 4 ++-- lib/core/src/persist/chain.rs | 14 ++++---------- lib/core/src/persist/mod.rs | 22 +++++++++++----------- lib/core/src/persist/receive.rs | 2 +- lib/core/src/persist/send.rs | 6 ++---- lib/core/src/persist/sync.rs | 20 ++++++++++++++------ lib/core/src/sdk.rs | 7 ++++++- lib/core/src/sync/mod.rs | 23 +++++++++++++---------- lib/core/src/test_utils/persist.rs | 2 +- 9 files changed, 54 insertions(+), 46 deletions(-) diff --git a/lib/core/src/persist/cache.rs b/lib/core/src/persist/cache.rs index 68978db..f7bd85c 100644 --- a/lib/core/src/persist/cache.rs +++ b/lib/core/src/persist/cache.rs @@ -159,7 +159,7 @@ impl Persister { let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?; self.set_last_derivation_index_inner(&tx, index)?; tx.commit()?; - self.trigger_sync()?; + self.trigger_sync(); Ok(()) } @@ -183,7 +183,7 @@ impl Persister { None => None, }; tx.commit()?; - self.trigger_sync()?; + self.trigger_sync(); Ok(res) } } diff --git a/lib/core/src/persist/chain.rs b/lib/core/src/persist/chain.rs index ccb278e..cb4be69 100644 --- a/lib/core/src/persist/chain.rs +++ b/lib/core/src/persist/chain.rs @@ -117,7 +117,7 @@ impl Persister { true => { self.commit_outgoing(&tx, &chain_swap.id, RecordType::Chain, updated_fields)?; tx.commit()?; - self.trigger_sync()?; + self.trigger_sync(); } false => { tx.commit()?; @@ -303,9 +303,7 @@ impl Persister { Some(vec!["accept_zero_conf".to_string()]), )?; tx.commit()?; - self.trigger_sync().map_err(|err| PaymentError::Generic { - err: format!("Could not trigger manual sync: {err:?}"), - })?; + self.trigger_sync(); Ok(()) } @@ -363,9 +361,7 @@ impl Persister { Some(vec!["accepted_receiver_amount_sat".to_string()]), )?; tx.commit()?; - self.trigger_sync().map_err(|err| PaymentError::Generic { - err: format!("Could not trigger manual sync: {err:?}"), - })?; + self.trigger_sync(); Ok(()) } @@ -394,9 +390,7 @@ impl Persister { Some(vec!["auto_accepted_fees".to_string()]), )?; tx.commit()?; - self.trigger_sync().map_err(|err| PaymentError::Generic { - err: format!("Could not trigger manual sync: {err:?}"), - })?; + self.trigger_sync(); Ok(()) } diff --git a/lib/core/src/persist/mod.rs b/lib/core/src/persist/mod.rs index 1be4ae3..237e682 100644 --- a/lib/core/src/persist/mod.rs +++ b/lib/core/src/persist/mod.rs @@ -11,7 +11,6 @@ pub(crate) mod sync; use std::collections::{HashMap, HashSet}; use std::ops::Not; -use std::sync::RwLock; use std::{fs::create_dir_all, path::PathBuf, str::FromStr}; use crate::lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription}; @@ -28,14 +27,14 @@ use rusqlite::{ }; use rusqlite_migration::{Migrations, M}; use sdk_common::bitcoin::hashes::hex::ToHex; -use tokio::sync::mpsc::Sender; +use tokio::sync::broadcast::{self, Sender}; const DEFAULT_DB_FILENAME: &str = "storage.sql"; pub(crate) struct Persister { main_db_dir: PathBuf, network: LiquidNetwork, - pub(crate) sync_trigger: RwLock>>, + pub(crate) sync_trigger: Option>, } /// Builds a WHERE clause that checks if `state` is any of the given arguments @@ -60,19 +59,20 @@ fn where_clauses_to_string(where_clauses: Vec) -> String { } impl Persister { - pub fn new( - working_dir: &str, - network: LiquidNetwork, - sync_trigger: Option>, - ) -> Result { + pub fn new(working_dir: &str, network: LiquidNetwork, sync_enabled: bool) -> Result { let main_db_dir = PathBuf::from_str(working_dir)?; if !main_db_dir.exists() { create_dir_all(&main_db_dir)?; } + let mut sync_trigger = None; + if sync_enabled { + let (events_notifier, _) = broadcast::channel::<()>(100); + sync_trigger = Some(events_notifier); + } Ok(Persister { main_db_dir, network, - sync_trigger: RwLock::new(sync_trigger), + sync_trigger, }) } @@ -273,7 +273,7 @@ impl Persister { tx.commit()?; if trigger_sync { - self.trigger_sync()?; + self.trigger_sync(); } Ok(()) @@ -340,7 +340,7 @@ impl Persister { None, )?; tx.commit()?; - self.trigger_sync()?; + self.trigger_sync(); Ok(()) } diff --git a/lib/core/src/persist/receive.rs b/lib/core/src/persist/receive.rs index b378962..f56696f 100644 --- a/lib/core/src/persist/receive.rs +++ b/lib/core/src/persist/receive.rs @@ -117,7 +117,7 @@ impl Persister { true => { self.commit_outgoing(&tx, &receive_swap.id, RecordType::Receive, updated_fields)?; tx.commit()?; - self.trigger_sync()?; + self.trigger_sync(); } false => { tx.commit()?; diff --git a/lib/core/src/persist/send.rs b/lib/core/src/persist/send.rs index 7830c80..c8a683f 100644 --- a/lib/core/src/persist/send.rs +++ b/lib/core/src/persist/send.rs @@ -104,7 +104,7 @@ impl Persister { true => { self.commit_outgoing(&tx, &send_swap.id, RecordType::Send, updated_fields)?; tx.commit()?; - self.trigger_sync()?; + self.trigger_sync(); } false => { tx.commit()?; @@ -306,9 +306,7 @@ impl Persister { self.commit_outgoing(&tx, swap_id, RecordType::Send, updated_fields)?; tx.commit()?; - self.trigger_sync().map_err(|err| PaymentError::Generic { - err: format!("Could not trigger manual sync: {err:?}"), - })?; + self.trigger_sync(); Ok(()) } diff --git a/lib/core/src/persist/sync.rs b/lib/core/src/persist/sync.rs index 49e7e26..2e3111c 100644 --- a/lib/core/src/persist/sync.rs +++ b/lib/core/src/persist/sync.rs @@ -4,6 +4,7 @@ use anyhow::Result; use rusqlite::{ named_params, Connection, OptionalExtension, Row, Statement, Transaction, TransactionBehavior, }; +use tokio::sync::broadcast; use super::{cache::KEY_LAST_DERIVATION_INDEX, PaymentTxDetails, Persister, Swap}; use crate::{ @@ -246,6 +247,9 @@ impl Persister { record_type: RecordType, updated_fields: Option>, ) -> Result<()> { + if self.sync_trigger.is_none() { + return Ok(()); + } let record_id = Record::get_id_from_record_type(record_type, data_id); let updated_fields = updated_fields .map(|fields| { @@ -487,12 +491,16 @@ impl Persister { Ok(()) } - pub(crate) fn trigger_sync(&self) -> Result<()> { - if let Ok(lock) = self.sync_trigger.try_read() { - if let Some(trigger) = lock.clone() { - trigger.try_send(())?; - } + pub(crate) fn subscribe_sync_trigger(&self) -> Result> { + match self.sync_trigger { + Some(ref sender) => Ok(sender.subscribe()), + None => Err(anyhow::anyhow!("Sync is not enabled")), + } + } + + pub(crate) fn trigger_sync(&self) { + if let Some(sender) = self.sync_trigger.clone() { + _ = sender.send(()); } - Ok(()) } } diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index d9f7f1e..f19e893 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -172,7 +172,12 @@ impl LiquidSdk { &fingerprint_hex, )?; - let persister = Arc::new(Persister::new(&working_dir, config.network, None)?); + let sync_enabled = config + .sync_service_url + .clone() + .map(|_| true) + .unwrap_or(false); + let persister = Arc::new(Persister::new(&working_dir, config.network, sync_enabled)?); persister.init()?; persister.replace_asset_metadata(config.asset_metadata.clone())?; diff --git a/lib/core/src/sync/mod.rs b/lib/core/src/sync/mod.rs index 65a59a0..7326489 100644 --- a/lib/core/src/sync/mod.rs +++ b/lib/core/src/sync/mod.rs @@ -5,7 +5,6 @@ use std::time::Duration; use anyhow::{anyhow, Result}; use futures_util::TryFutureExt; use log::{info, trace, warn}; -use tokio::sync::mpsc::Receiver; use tokio::sync::watch; use tokio::time::sleep; use tokio_stream::StreamExt as _; @@ -39,13 +38,6 @@ pub(crate) struct SyncService { } impl SyncService { - fn set_sync_trigger(persister: Arc) -> Receiver<()> { - let (sync_trigger_tx, sync_trigger_rx) = tokio::sync::mpsc::channel::<()>(30); - let mut persister_trigger = persister.sync_trigger.write().unwrap(); - *persister_trigger = Some(sync_trigger_tx); - sync_trigger_rx - } - pub(crate) fn new( remote_url: String, persister: Arc, @@ -97,7 +89,10 @@ impl SyncService { log::warn!("realtime-sync: Could not check for remote change: {err:?}"); return; } - let mut local_sync_trigger = Self::set_sync_trigger(self.persister.clone()); + let Ok(mut local_sync_trigger) = self.persister.subscribe_sync_trigger() else { + log::warn!("realtime-sync: Could not subscribe to local sync trigger"); + return; + }; log::debug!("realtime-sync: Starting real-time sync event loop"); loop { @@ -115,10 +110,18 @@ impl SyncService { continue; } }; + loop { log::info!("realtime-sync: before tokio_select"); tokio::select! { - Some(_) = local_sync_trigger.recv() => self.run_event_loop().await, + local_event = local_sync_trigger.recv() => { + match local_event { + Ok(_) => self.run_event_loop().await, + Err(err) => { + log::warn!("realtime-sync: local trigger received error, probably lagging behind {err:?}"); + } + } + } Some(msg) = remote_sync_trigger.next() => match msg { Ok(_) => self.run_event_loop().await, Err(err) => { diff --git a/lib/core/src/test_utils/persist.rs b/lib/core/src/test_utils/persist.rs index 7d795e5..c6e8741 100644 --- a/lib/core/src/test_utils/persist.rs +++ b/lib/core/src/test_utils/persist.rs @@ -130,7 +130,7 @@ macro_rules! create_persister { .to_str() .ok_or(anyhow::anyhow!("Could not create temporary directory"))?, crate::model::LiquidNetwork::Testnet, - None, + true, )?); $name.init()?; };