diff --git a/sync/engine/src/database_sync_engine.rs b/sync/engine/src/database_sync_engine.rs index 7176d7ede..e98acb182 100644 --- a/sync/engine/src/database_sync_engine.rs +++ b/sync/engine/src/database_sync_engine.rs @@ -1,5 +1,4 @@ use std::{ - cell::RefCell, collections::{HashMap, HashSet}, sync::{Arc, Mutex}, }; @@ -51,7 +50,7 @@ pub struct DatabaseSyncEngine { meta_path: String, changes_file: Arc>>>, opts: DatabaseSyncEngineOpts, - meta: RefCell, + meta: Mutex, client_unique_id: String, } @@ -147,7 +146,7 @@ impl DatabaseSyncEngine

{ tracing::info!("initialize database tape connection: path={}", main_db_path); let main_tape = DatabaseTape::new_with_opts(main_db, tape_opts); let changes_file = io.open_file(&changes_path, OpenFlags::Create, false)?; - let mut db = Self { + let db = Self { io, protocol, db_file, @@ -158,7 +157,7 @@ impl DatabaseSyncEngine

{ meta_path: format!("{main_db_path}-info"), changes_file: Arc::new(Mutex::new(Some(changes_file))), opts, - meta: RefCell::new(meta.clone()), + meta: Mutex::new(meta.clone()), client_unique_id: meta.client_unique_id.clone(), }; @@ -176,7 +175,7 @@ impl DatabaseSyncEngine

{ Ok(db) } - fn open_revert_db_conn(&mut self) -> Result> { + fn open_revert_db_conn(&self) -> Result> { let db = turso_core::Database::open_with_flags_bypass_registry( self.io.clone(), &self.main_db_path, @@ -191,10 +190,7 @@ impl DatabaseSyncEngine

{ Ok(conn) } - async fn checkpoint_passive( - &mut self, - coro: &Coro, - ) -> Result<(Option>, u64)> { + async fn checkpoint_passive(&self, coro: &Coro) -> Result<(Option>, u64)> { let watermark = self.meta().revert_since_wal_watermark; tracing::info!( "checkpoint(path={:?}): revert_since_wal_watermark={}", @@ -273,9 +269,13 @@ impl DatabaseSyncEngine

{ }) } - pub async fn checkpoint(&mut self, coro: &Coro) -> Result<()> { + pub async fn checkpoint(&self, coro: &Coro) -> Result<()> { let (main_wal_salt, watermark) = self.checkpoint_passive(coro).await?; + tracing::info!( + "checkpoint(path={:?}): passive checkpoint is done", + self.main_db_path + ); let main_conn = connect_untracked(&self.main_tape)?; let revert_conn = self.open_revert_db_conn()?; @@ -419,10 +419,17 @@ impl DatabaseSyncEngine

{ /// This method will **not** send local changed to the remote /// This method will block writes for the period of pull pub async fn apply_changes_from_remote( - &mut self, + &self, coro: &Coro, remote_changes: DbChangesStatus, ) -> Result<()> { + if remote_changes.file_slot.is_none() { + self.update_meta(coro, |m| { + m.last_pull_unix_time = remote_changes.time.secs; + }) + .await?; + return Ok(()); + } assert!(remote_changes.file_slot.is_some(), "file_slot must be set"); let changes_file = remote_changes.file_slot.as_ref().unwrap().value.clone(); let pull_result = self.apply_changes_internal(coro, &changes_file).await; @@ -447,7 +454,7 @@ impl DatabaseSyncEngine

{ Ok(()) } async fn apply_changes_internal( - &mut self, + &self, coro: &Coro, changes_file: &Arc, ) -> Result { @@ -652,7 +659,7 @@ impl DatabaseSyncEngine

{ /// Sync local changes to remote DB and bring new changes from remote to local /// This method will block writes for the period of sync - pub async fn sync(&mut self, coro: &Coro) -> Result<()> { + pub async fn sync(&self, coro: &Coro) -> Result<()> { // todo(sivukhin): this is bit suboptimal as both 'push' and 'pull' will call pull_synced_from_remote // but for now - keep it simple self.push_changes_to_remote(coro).await?; @@ -660,21 +667,14 @@ impl DatabaseSyncEngine

{ Ok(()) } - pub async fn pull_changes_from_remote(&mut self, coro: &Coro) -> Result<()> { + pub async fn pull_changes_from_remote(&self, coro: &Coro) -> Result<()> { let changes = self.wait_changes_from_remote(coro).await?; - if changes.file_slot.is_some() { - self.apply_changes_from_remote(coro, changes).await?; - } else { - self.update_meta(coro, |m| { - m.last_pull_unix_time = changes.time.secs; - }) - .await?; - } + self.apply_changes_from_remote(coro, changes).await?; Ok(()) } - fn meta(&self) -> std::cell::Ref<'_, DatabaseMetadata> { - self.meta.borrow() + fn meta(&self) -> std::sync::MutexGuard<'_, DatabaseMetadata> { + self.meta.lock().unwrap() } async fn update_meta( @@ -688,7 +688,7 @@ impl DatabaseSyncEngine

{ let completion = self.protocol.full_write(&self.meta_path, meta.dump()?)?; // todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated wait_all_results(coro, &completion).await?; - self.meta.replace(meta); + *self.meta.lock().unwrap() = meta; Ok(()) } }