lock free sync trigger

This commit is contained in:
Roei Erez
2025-02-11 17:58:43 +02:00
parent 2216e8859f
commit fb953c3386
9 changed files with 54 additions and 46 deletions

View File

@@ -159,7 +159,7 @@ impl Persister {
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
self.set_last_derivation_index_inner(&tx, index)?;
tx.commit()?;
self.trigger_sync()?;
self.trigger_sync();
Ok(())
}
@@ -183,7 +183,7 @@ impl Persister {
None => None,
};
tx.commit()?;
self.trigger_sync()?;
self.trigger_sync();
Ok(res)
}
}

View File

@@ -117,7 +117,7 @@ impl Persister {
true => {
self.commit_outgoing(&tx, &chain_swap.id, RecordType::Chain, updated_fields)?;
tx.commit()?;
self.trigger_sync()?;
self.trigger_sync();
}
false => {
tx.commit()?;
@@ -303,9 +303,7 @@ impl Persister {
Some(vec!["accept_zero_conf".to_string()]),
)?;
tx.commit()?;
self.trigger_sync().map_err(|err| PaymentError::Generic {
err: format!("Could not trigger manual sync: {err:?}"),
})?;
self.trigger_sync();
Ok(())
}
@@ -363,9 +361,7 @@ impl Persister {
Some(vec!["accepted_receiver_amount_sat".to_string()]),
)?;
tx.commit()?;
self.trigger_sync().map_err(|err| PaymentError::Generic {
err: format!("Could not trigger manual sync: {err:?}"),
})?;
self.trigger_sync();
Ok(())
}
@@ -394,9 +390,7 @@ impl Persister {
Some(vec!["auto_accepted_fees".to_string()]),
)?;
tx.commit()?;
self.trigger_sync().map_err(|err| PaymentError::Generic {
err: format!("Could not trigger manual sync: {err:?}"),
})?;
self.trigger_sync();
Ok(())
}

View File

@@ -11,7 +11,6 @@ pub(crate) mod sync;
use std::collections::{HashMap, HashSet};
use std::ops::Not;
use std::sync::RwLock;
use std::{fs::create_dir_all, path::PathBuf, str::FromStr};
use crate::lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription};
@@ -28,14 +27,14 @@ use rusqlite::{
};
use rusqlite_migration::{Migrations, M};
use sdk_common::bitcoin::hashes::hex::ToHex;
use tokio::sync::mpsc::Sender;
use tokio::sync::broadcast::{self, Sender};
const DEFAULT_DB_FILENAME: &str = "storage.sql";
pub(crate) struct Persister {
main_db_dir: PathBuf,
network: LiquidNetwork,
pub(crate) sync_trigger: RwLock<Option<Sender<()>>>,
pub(crate) sync_trigger: Option<Sender<()>>,
}
/// Builds a WHERE clause that checks if `state` is any of the given arguments
@@ -60,19 +59,20 @@ fn where_clauses_to_string(where_clauses: Vec<String>) -> String {
}
impl Persister {
pub fn new(
working_dir: &str,
network: LiquidNetwork,
sync_trigger: Option<Sender<()>>,
) -> Result<Self> {
pub fn new(working_dir: &str, network: LiquidNetwork, sync_enabled: bool) -> Result<Self> {
let main_db_dir = PathBuf::from_str(working_dir)?;
if !main_db_dir.exists() {
create_dir_all(&main_db_dir)?;
}
let mut sync_trigger = None;
if sync_enabled {
let (events_notifier, _) = broadcast::channel::<()>(100);
sync_trigger = Some(events_notifier);
}
Ok(Persister {
main_db_dir,
network,
sync_trigger: RwLock::new(sync_trigger),
sync_trigger,
})
}
@@ -273,7 +273,7 @@ impl Persister {
tx.commit()?;
if trigger_sync {
self.trigger_sync()?;
self.trigger_sync();
}
Ok(())
@@ -340,7 +340,7 @@ impl Persister {
None,
)?;
tx.commit()?;
self.trigger_sync()?;
self.trigger_sync();
Ok(())
}

View File

@@ -117,7 +117,7 @@ impl Persister {
true => {
self.commit_outgoing(&tx, &receive_swap.id, RecordType::Receive, updated_fields)?;
tx.commit()?;
self.trigger_sync()?;
self.trigger_sync();
}
false => {
tx.commit()?;

View File

@@ -104,7 +104,7 @@ impl Persister {
true => {
self.commit_outgoing(&tx, &send_swap.id, RecordType::Send, updated_fields)?;
tx.commit()?;
self.trigger_sync()?;
self.trigger_sync();
}
false => {
tx.commit()?;
@@ -306,9 +306,7 @@ impl Persister {
self.commit_outgoing(&tx, swap_id, RecordType::Send, updated_fields)?;
tx.commit()?;
self.trigger_sync().map_err(|err| PaymentError::Generic {
err: format!("Could not trigger manual sync: {err:?}"),
})?;
self.trigger_sync();
Ok(())
}

View File

@@ -4,6 +4,7 @@ use anyhow::Result;
use rusqlite::{
named_params, Connection, OptionalExtension, Row, Statement, Transaction, TransactionBehavior,
};
use tokio::sync::broadcast;
use super::{cache::KEY_LAST_DERIVATION_INDEX, PaymentTxDetails, Persister, Swap};
use crate::{
@@ -246,6 +247,9 @@ impl Persister {
record_type: RecordType,
updated_fields: Option<Vec<String>>,
) -> Result<()> {
if self.sync_trigger.is_none() {
return Ok(());
}
let record_id = Record::get_id_from_record_type(record_type, data_id);
let updated_fields = updated_fields
.map(|fields| {
@@ -487,12 +491,16 @@ impl Persister {
Ok(())
}
pub(crate) fn trigger_sync(&self) -> Result<()> {
if let Ok(lock) = self.sync_trigger.try_read() {
if let Some(trigger) = lock.clone() {
trigger.try_send(())?;
}
pub(crate) fn subscribe_sync_trigger(&self) -> Result<broadcast::Receiver<()>> {
match self.sync_trigger {
Some(ref sender) => Ok(sender.subscribe()),
None => Err(anyhow::anyhow!("Sync is not enabled")),
}
}
pub(crate) fn trigger_sync(&self) {
if let Some(sender) = self.sync_trigger.clone() {
_ = sender.send(());
}
Ok(())
}
}

View File

@@ -172,7 +172,12 @@ impl LiquidSdk {
&fingerprint_hex,
)?;
let persister = Arc::new(Persister::new(&working_dir, config.network, None)?);
let sync_enabled = config
.sync_service_url
.clone()
.map(|_| true)
.unwrap_or(false);
let persister = Arc::new(Persister::new(&working_dir, config.network, sync_enabled)?);
persister.init()?;
persister.replace_asset_metadata(config.asset_metadata.clone())?;

View File

@@ -5,7 +5,6 @@ use std::time::Duration;
use anyhow::{anyhow, Result};
use futures_util::TryFutureExt;
use log::{info, trace, warn};
use tokio::sync::mpsc::Receiver;
use tokio::sync::watch;
use tokio::time::sleep;
use tokio_stream::StreamExt as _;
@@ -39,13 +38,6 @@ pub(crate) struct SyncService {
}
impl SyncService {
fn set_sync_trigger(persister: Arc<Persister>) -> Receiver<()> {
let (sync_trigger_tx, sync_trigger_rx) = tokio::sync::mpsc::channel::<()>(30);
let mut persister_trigger = persister.sync_trigger.write().unwrap();
*persister_trigger = Some(sync_trigger_tx);
sync_trigger_rx
}
pub(crate) fn new(
remote_url: String,
persister: Arc<Persister>,
@@ -97,7 +89,10 @@ impl SyncService {
log::warn!("realtime-sync: Could not check for remote change: {err:?}");
return;
}
let mut local_sync_trigger = Self::set_sync_trigger(self.persister.clone());
let Ok(mut local_sync_trigger) = self.persister.subscribe_sync_trigger() else {
log::warn!("realtime-sync: Could not subscribe to local sync trigger");
return;
};
log::debug!("realtime-sync: Starting real-time sync event loop");
loop {
@@ -115,10 +110,18 @@ impl SyncService {
continue;
}
};
loop {
log::info!("realtime-sync: before tokio_select");
tokio::select! {
Some(_) = local_sync_trigger.recv() => self.run_event_loop().await,
local_event = local_sync_trigger.recv() => {
match local_event {
Ok(_) => self.run_event_loop().await,
Err(err) => {
log::warn!("realtime-sync: local trigger received error, probably lagging behind {err:?}");
}
}
}
Some(msg) = remote_sync_trigger.next() => match msg {
Ok(_) => self.run_event_loop().await,
Err(err) => {

View File

@@ -130,7 +130,7 @@ macro_rules! create_persister {
.to_str()
.ok_or(anyhow::anyhow!("Could not create temporary directory"))?,
crate::model::LiquidNetwork::Testnet,
None,
true,
)?);
$name.init()?;
};