disable auto-checkpoint for opened connections in the sync engine

This commit is contained in:
Nikita Sivukhin
2025-08-13 12:49:40 +04:00
parent 6e0ad0a483
commit 0a12b2d74a
2 changed files with 24 additions and 11 deletions

View File

@@ -2,8 +2,9 @@ use std::sync::Arc;
use crate::{
database_sync_operations::{
checkpoint_wal_file, db_bootstrap, reset_wal_file, transfer_logical_changes,
transfer_physical_changes, wait_full_body, wal_pull, wal_push, WalPullResult,
checkpoint_wal_file, connect, connect_untracked, db_bootstrap, reset_wal_file,
transfer_logical_changes, transfer_physical_changes, wait_full_body, wal_pull, wal_push,
WalPullResult,
},
database_tape::DatabaseTape,
errors::Error,
@@ -95,7 +96,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
/// Create database connection and appropriately configure it before use
pub async fn connect(&self, coro: &Coro) -> Result<Arc<turso_core::Connection>> {
self.draft_tape.connect(coro).await
connect(coro, &self.draft_tape).await
}
/// Sync all new changes from remote DB and apply them locally
@@ -122,7 +123,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
tracing::info!("opened synced");
// we will start wal write session for Draft DB in order to hold write lock during transfer of changes
let mut draft_session = WalSession::new(self.draft_tape.connect(coro).await?);
let mut draft_session = WalSession::new(connect(coro, &self.draft_tape).await?);
draft_session.begin()?;
// mark Synced as dirty as we will start transfer of logical changes there and if we will fail in the middle - we will need to cleanup Synced db
@@ -174,7 +175,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
);
{
let synced = self.io.open_tape(&self.synced_path, false)?;
checkpoint_wal_file(coro, &synced.connect_untracked()?).await?;
checkpoint_wal_file(coro, &connect_untracked(&synced)?).await?;
update_meta(
coro,
self.protocol.as_ref(),
@@ -288,7 +289,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
self.synced_path,
);
let synced = self.io.open_tape(&self.synced_path, false)?;
let synced_conn = synced.connect(coro).await?;
let synced_conn = connect(coro, &synced).await?;
let mut wal = WalSession::new(synced_conn);
let generation = self.meta().synced_generation;
@@ -334,7 +335,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
self.meta().client_unique_id
);
let synced = self.io.open_tape(&self.synced_path, false)?;
let synced_conn = synced.connect(coro).await?;
let synced_conn = connect(coro, &synced).await?;
let mut wal = WalSession::new(synced_conn);
wal.begin()?;

View File

@@ -35,6 +35,18 @@ pub enum WalPushResult {
NeedCheckpoint,
}
pub async fn connect(coro: &Coro, tape: &DatabaseTape) -> Result<Arc<turso_core::Connection>> {
let conn = tape.connect(coro).await?;
conn.wal_disable_checkpoint();
Ok(conn)
}
pub fn connect_untracked(tape: &DatabaseTape) -> Result<Arc<turso_core::Connection>> {
let conn = tape.connect_untracked()?;
conn.wal_disable_checkpoint();
Ok(conn)
}
/// Bootstrap multiple DB files from latest generation from remote
pub async fn db_bootstrap<C: ProtocolIO>(
coro: &Coro,
@@ -257,8 +269,8 @@ pub async fn transfer_logical_changes(
bump_pull_gen: bool,
) -> Result<()> {
tracing::debug!("transfer_logical_changes: client_id={client_id}");
let source_conn = source.connect_untracked()?;
let target_conn = target.connect_untracked()?;
let source_conn = connect_untracked(source)?;
let target_conn = connect_untracked(target)?;
// fetch last_change_id from the target DB in order to guarantee atomic replay of changes and avoid conflicts in case of failure
let source_pull_gen = 'source_pull_gen: {
@@ -333,7 +345,7 @@ pub async fn transfer_logical_changes(
use_implicit_rowid: false,
};
let source_schema_cookie = source.connect_untracked()?.read_schema_version()?;
let source_schema_cookie = connect_untracked(source)?.read_schema_version()?;
let mut session = target.start_replay_session(coro, replay_opts).await?;
@@ -414,7 +426,7 @@ pub async fn transfer_physical_changes(
) -> Result<u64> {
tracing::debug!("transfer_physical_changes: source_wal_match_watermark={source_wal_match_watermark}, source_sync_watermark={source_sync_watermark}, target_wal_match_watermark={target_wal_match_watermark}");
let source_conn = source.connect(coro).await?;
let source_conn = connect(coro, source).await?;
let mut source_session = WalSession::new(source_conn.clone());
source_session.begin()?;