From 5ec2d96bc142b8eec0ddffddd666a39aa810768a Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 2 Oct 2025 15:57:25 +0400 Subject: [PATCH] support deferred sync in the engine --- sync/engine/src/database_sync_engine.rs | 61 +++++++++++++++++---- sync/engine/src/database_sync_operations.rs | 11 ++-- sync/engine/src/types.rs | 4 +- 3 files changed, 57 insertions(+), 19 deletions(-) diff --git a/sync/engine/src/database_sync_engine.rs b/sync/engine/src/database_sync_engine.rs index 0257ca629..0b37b52c1 100644 --- a/sync/engine/src/database_sync_engine.rs +++ b/sync/engine/src/database_sync_engine.rs @@ -38,6 +38,8 @@ pub struct DatabaseSyncEngineOpts { pub wal_pull_batch_size: u64, pub long_poll_timeout: Option, pub protocol_version_hint: DatabaseSyncEngineProtocolVersion, + pub bootstrap_if_empty: bool, + pub reserved_bytes: usize, } pub struct DatabaseSyncEngine { @@ -73,9 +75,6 @@ impl DatabaseSyncEngine

{ let meta_path = format!("{main_db_path}-info"); let changes_path = format!("{main_db_path}-changes"); - let db_file = io.open_file(main_db_path, turso_core::OpenFlags::Create, false)?; - let db_file = Arc::new(turso_core::storage::database::DatabaseFile::new(db_file)); - tracing::info!("init(path={}): opts={:?}", main_db_path, opts); let completion = protocol.full_read(&meta_path)?; @@ -88,7 +87,7 @@ impl DatabaseSyncEngine

{ let meta = match meta { Some(meta) => meta, - None => { + None if opts.bootstrap_if_empty => { let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4()); let revision = bootstrap_db_file( coro, @@ -106,7 +105,31 @@ impl DatabaseSyncEngine

{ revert_since_wal_watermark: 0, last_pushed_change_id_hint: 0, last_pushed_pull_gen_hint: 0, - last_pull_unix_time: io.now().secs, + last_pull_unix_time: Some(io.now().secs), + last_push_unix_time: None, + }; + tracing::info!("write meta after successful bootstrap: meta={meta:?}"); + let completion = protocol.full_write(&meta_path, meta.dump()?)?; + // todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated + wait_all_results(coro, &completion).await?; + meta + } + None => { + if opts.protocol_version_hint == DatabaseSyncEngineProtocolVersion::Legacy { + return Err(Error::DatabaseSyncEngineError( + "deferred bootstrap is not supported for legacy protocol".to_string(), + )); + } + let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4()); + let meta = DatabaseMetadata { + version: DATABASE_METADATA_VERSION.to_string(), + client_unique_id, + synced_revision: None, + revert_since_wal_salt: None, + revert_since_wal_watermark: 0, + last_pushed_change_id_hint: 0, + last_pushed_pull_gen_hint: 0, + last_pull_unix_time: None, last_push_unix_time: None, }; tracing::info!("write meta after successful bootstrap: meta={meta:?}"); @@ -127,11 +150,14 @@ impl DatabaseSyncEngine

{ tracing::info!("check if main db file exists"); let main_exists = io.try_open(main_db_path)?.is_some(); - if !main_exists { + if !main_exists && meta.synced_revision.is_some() { let error = "main DB file doesn't exists, but metadata is".to_string(); return Err(Error::DatabaseSyncEngineError(error)); } + let db_file = io.open_file(main_db_path, turso_core::OpenFlags::Create, false)?; + let db_file = Arc::new(turso_core::storage::database::DatabaseFile::new(db_file)); + let main_db = turso_core::Database::open_with_flags( io.clone(), main_db_path, @@ -140,6 +166,17 @@ impl DatabaseSyncEngine

{ turso_core::DatabaseOpts::new().with_indexes(true), None, )?; + + // DB wasn't bootstrapped but remote is encrypted - so we must properly set reserved bytes field in advance + if meta.synced_revision.is_none() && opts.reserved_bytes != 0 { + let conn = main_db.connect()?; + conn.set_reserved_bytes(opts.reserved_bytes as u8)?; + + // write transaction forces allocation of root DB page + conn.execute("BEGIN IMMEDIATE")?; + conn.execute("COMMIT")?; + } + let tape_opts = DatabaseTapeOpts { cdc_table: None, cdc_mode: Some("full".to_string()), @@ -162,11 +199,11 @@ impl DatabaseSyncEngine

{ client_unique_id: meta.client_unique_id.clone(), }; - let synced_revision = meta.synced_revision.as_ref().unwrap(); - if let DatabasePullRevision::Legacy { + let synced_revision = meta.synced_revision.as_ref(); + if let Some(DatabasePullRevision::Legacy { synced_frame_no: None, .. - } = synced_revision + }) = synced_revision { // sync WAL from the remote in case of bootstrap - all subsequent initializations will be fast db.pull_changes_from_remote(coro).await?; @@ -380,7 +417,7 @@ impl DatabaseSyncEngine

{ let file = acquire_slot(&self.changes_file)?; let now = self.io.now(); - let revision = self.meta().synced_revision.clone().unwrap(); + let revision = self.meta().synced_revision.clone(); let next_revision = wal_pull_to_file( coro, self.protocol.as_ref(), @@ -427,7 +464,7 @@ impl DatabaseSyncEngine

{ ) -> Result<()> { if remote_changes.file_slot.is_none() { self.update_meta(coro, |m| { - m.last_pull_unix_time = remote_changes.time.secs; + m.last_pull_unix_time = Some(remote_changes.time.secs); }) .await?; return Ok(()); @@ -450,7 +487,7 @@ impl DatabaseSyncEngine

{ m.revert_since_wal_watermark = revert_since_wal_watermark; m.synced_revision = Some(remote_changes.revision); m.last_pushed_change_id_hint = 0; - m.last_pull_unix_time = remote_changes.time.secs; + m.last_pull_unix_time = Some(remote_changes.time.secs); }) .await?; Ok(()) diff --git a/sync/engine/src/database_sync_operations.rs b/sync/engine/src/database_sync_operations.rs index 00d58360a..60f37f1ff 100644 --- a/sync/engine/src/database_sync_operations.rs +++ b/sync/engine/src/database_sync_operations.rs @@ -153,7 +153,7 @@ pub async fn wal_apply_from_file( coro.yield_(ProtocolCommand::IO).await?; } let info = WalFrameInfo::from_frame_header(buffer.as_slice()); - tracing::debug!("got frame: {:?}", info); + tracing::info!("got frame: {:?}", info); db_size = info.db_size; session.append_page(info.page_no, &buffer.as_slice()[WAL_FRAME_HEADER..])?; } @@ -165,7 +165,7 @@ pub async fn wal_pull_to_file( coro: &Coro, client: &C, frames_file: &Arc, - revision: &DatabasePullRevision, + revision: &Option, wal_pull_batch_size: u64, long_poll_timeout: Option, ) -> Result { @@ -181,10 +181,10 @@ pub async fn wal_pull_to_file( coro.yield_(ProtocolCommand::IO).await?; } match revision { - DatabasePullRevision::Legacy { + Some(DatabasePullRevision::Legacy { generation, synced_frame_no, - } => { + }) => { let start_frame = synced_frame_no.unwrap_or(0) + 1; wal_pull_to_file_legacy( coro, @@ -196,9 +196,10 @@ pub async fn wal_pull_to_file( ) .await } - DatabasePullRevision::V1 { revision } => { + Some(DatabasePullRevision::V1 { revision }) => { wal_pull_to_file_v1(coro, client, frames_file, revision, long_poll_timeout).await } + None => wal_pull_to_file_v1(coro, client, frames_file, "", long_poll_timeout).await, } } diff --git a/sync/engine/src/types.rs b/sync/engine/src/types.rs index 1b78e8cb1..4c08ee394 100644 --- a/sync/engine/src/types.rs +++ b/sync/engine/src/types.rs @@ -65,7 +65,7 @@ pub struct SyncEngineStats { pub cdc_operations: i64, pub main_wal_size: u64, pub 
revert_wal_size: u64, - pub last_pull_unix_time: i64, + pub last_pull_unix_time: Option, pub last_push_unix_time: Option, pub revision: Option, } @@ -90,7 +90,7 @@ pub struct DatabaseMetadata { pub revert_since_wal_salt: Option>, pub revert_since_wal_watermark: u64, /// Unix time of last successful pull - pub last_pull_unix_time: i64, + pub last_pull_unix_time: Option, /// Unix time of last successful push pub last_push_unix_time: Option, pub last_pushed_pull_gen_hint: i64,