From 0a12b2d74a7dee84ff6ade23f1d4109f1faef145 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 13 Aug 2025 12:49:40 +0400 Subject: [PATCH] disable auto-checkpoint for opened connections in the sync engine --- .../src/database_sync_engine.rs | 15 +++++++------- .../src/database_sync_operations.rs | 20 +++++++++++++++---- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index e9ddd038b..4621b61d5 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -2,8 +2,9 @@ use std::sync::Arc; use crate::{ database_sync_operations::{ - checkpoint_wal_file, db_bootstrap, reset_wal_file, transfer_logical_changes, - transfer_physical_changes, wait_full_body, wal_pull, wal_push, WalPullResult, + checkpoint_wal_file, connect, connect_untracked, db_bootstrap, reset_wal_file, + transfer_logical_changes, transfer_physical_changes, wait_full_body, wal_pull, wal_push, + WalPullResult, }, database_tape::DatabaseTape, errors::Error, @@ -95,7 +96,7 @@ impl DatabaseSyncEngine { /// Create database connection and appropriately configure it before use pub async fn connect(&self, coro: &Coro) -> Result> { - self.draft_tape.connect(coro).await + connect(coro, &self.draft_tape).await } /// Sync all new changes from remote DB and apply them locally @@ -122,7 +123,7 @@ impl DatabaseSyncEngine { tracing::info!("opened synced"); // we will start wal write session for Draft DB in order to hold write lock during transfer of changes - let mut draft_session = WalSession::new(self.draft_tape.connect(coro).await?); + let mut draft_session = WalSession::new(connect(coro, &self.draft_tape).await?); draft_session.begin()?; // mark Synced as dirty as we will start transfer of logical changes there and if we will fail in the middle - we will need to cleanup Synced db @@ -174,7 +175,7 @@ impl DatabaseSyncEngine { ); { let synced = self.io.open_tape(&self.synced_path, false)?; - checkpoint_wal_file(coro, &synced.connect_untracked()?).await?; + checkpoint_wal_file(coro, &connect_untracked(&synced)?).await?; update_meta( coro, self.protocol.as_ref(), @@ -288,7 +289,7 @@ impl DatabaseSyncEngine { self.synced_path, ); let synced = self.io.open_tape(&self.synced_path, false)?; - let synced_conn = synced.connect(coro).await?; + let synced_conn = connect(coro, &synced).await?; let mut wal = WalSession::new(synced_conn); let generation = self.meta().synced_generation; @@ -334,7 +335,7 @@ impl DatabaseSyncEngine { self.meta().client_unique_id ); let synced = self.io.open_tape(&self.synced_path, false)?; - let synced_conn = synced.connect(coro).await?; + let synced_conn = connect(coro, &synced).await?; let mut wal = WalSession::new(synced_conn); wal.begin()?; diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index 3de1a4ace..581da3d49 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -35,6 +35,18 @@ pub enum WalPushResult { NeedCheckpoint, } +pub async fn connect(coro: &Coro, tape: &DatabaseTape) -> Result> { + let conn = tape.connect(coro).await?; + conn.wal_disable_checkpoint(); + Ok(conn) +} + +pub fn connect_untracked(tape: &DatabaseTape) -> Result> { + let conn = tape.connect_untracked()?; + conn.wal_disable_checkpoint(); + Ok(conn) +} + /// Bootstrap multiple DB files from latest generation from remote pub async fn db_bootstrap( coro: &Coro, @@ -257,8 +269,8 @@ pub async fn transfer_logical_changes( bump_pull_gen: bool, ) -> Result<()> { tracing::debug!("transfer_logical_changes: client_id={client_id}"); - let source_conn = source.connect_untracked()?; - let target_conn = target.connect_untracked()?; + let source_conn = connect_untracked(source)?; + let target_conn = connect_untracked(target)?; // fetch last_change_id from the target DB in order to guarantee atomic replay of changes and avoid conflicts in case of failure let source_pull_gen = 'source_pull_gen: { @@ -333,7 +345,7 @@ pub async fn transfer_logical_changes( use_implicit_rowid: false, }; - let source_schema_cookie = source.connect_untracked()?.read_schema_version()?; + let source_schema_cookie = connect_untracked(source)?.read_schema_version()?; let mut session = target.start_replay_session(coro, replay_opts).await?; @@ -414,7 +426,7 @@ pub async fn transfer_physical_changes( ) -> Result { tracing::debug!("transfer_physical_changes: source_wal_match_watermark={source_wal_match_watermark}, source_sync_watermark={source_sync_watermark}, target_wal_match_watermark={target_wal_match_watermark}"); - let source_conn = source.connect(coro).await?; + let source_conn = connect(coro, source).await?; let mut source_session = WalSession::new(source_conn.clone()); source_session.begin()?;