From a25e3e76ebeeecb91cd4585cc95e73bd6fba0ba4 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 12 Nov 2025 13:19:31 +0400 Subject: [PATCH] wip --- sync/engine/src/database_sync_engine.rs | 38 ++--- sync/engine/src/database_sync_operations.rs | 50 ++++--- .../src/database_sync_partial_storage.rs | 140 ------------------ sync/engine/src/lib.rs | 5 +- sync/engine/src/server_proto.rs | 4 +- 5 files changed, 56 insertions(+), 181 deletions(-) delete mode 100644 sync/engine/src/database_sync_partial_storage.rs diff --git a/sync/engine/src/database_sync_engine.rs b/sync/engine/src/database_sync_engine.rs index 18601feae..6a09d153d 100644 --- a/sync/engine/src/database_sync_engine.rs +++ b/sync/engine/src/database_sync_engine.rs @@ -10,13 +10,13 @@ use turso_core::{DatabaseStorage, OpenFlags}; use crate::{ database_replay_generator::DatabaseReplayGenerator, + database_sync_lazy_storage::LazyDatabaseStorage, database_sync_operations::{ acquire_slot, apply_transformation, bootstrap_db_file, connect_untracked, count_local_changes, has_table, push_logical_changes, read_last_change_id, read_wal_salt, reset_wal_file, update_last_change_id, wait_all_results, wal_apply_from_file, wal_pull_to_file, ProtocolIoStats, PAGE_SIZE, WAL_FRAME_HEADER, WAL_FRAME_SIZE, }, - database_sync_partial_storage::PartialDatabaseStorage, database_tape::{ DatabaseChangesIteratorMode, DatabaseChangesIteratorOpts, DatabaseReplaySession, DatabaseReplaySessionOpts, DatabaseTape, DatabaseTapeOpts, DatabaseWalSession, @@ -34,6 +34,12 @@ use crate::{ Result, }; +#[derive(Clone, Debug)] +pub enum PartialBootstrapStrategy { + Prefix { length: usize }, + Query { query: String }, +} + #[derive(Clone, Debug)] pub struct DatabaseSyncEngineOpts { pub client_name: String, @@ -44,7 +50,7 @@ pub struct DatabaseSyncEngineOpts { pub protocol_version_hint: DatabaseSyncEngineProtocolVersion, pub bootstrap_if_empty: bool, pub reserved_bytes: usize, - pub partial: bool, + pub partial_bootstrap_strategy: Option, } pub struct DataStats { @@ -99,7 +105,7 @@ impl DatabaseSyncEngine

{ io: Arc, protocol: Arc

, main_db_path: &str, - opts: DatabaseSyncEngineOpts, + mut opts: DatabaseSyncEngineOpts, ) -> Result { let main_db_wal_path = format!("{main_db_path}-wal"); let revert_db_wal_path = format!("{main_db_path}-wal-revert"); @@ -116,23 +122,20 @@ impl DatabaseSyncEngine

{ Some(DatabaseMetadata::load(&data)?) }; let protocol = ProtocolIoStats::new(protocol); + let partial_bootstrap_strategy = opts.partial_bootstrap_strategy.take(); + let partial = partial_bootstrap_strategy.is_some(); let meta = match meta { Some(meta) => meta, None if opts.bootstrap_if_empty => { let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4()); - let prefix = if opts.partial { - Some(128 * PAGE_SIZE) - } else { - None - }; let revision = bootstrap_db_file( coro, &protocol, &io, main_db_path, opts.protocol_version_hint, - prefix, + partial_bootstrap_strategy, ) .await?; let meta = DatabaseMetadata { @@ -145,7 +148,7 @@ impl DatabaseSyncEngine

{ last_pushed_pull_gen_hint: 0, last_pull_unix_time: Some(io.now().secs), last_push_unix_time: None, - partial_bootstrap_server_revision: if opts.partial { + partial_bootstrap_server_revision: if partial { Some(revision.clone()) } else { None @@ -163,7 +166,7 @@ impl DatabaseSyncEngine

{ "deferred bootstrap is not supported for legacy protocol".to_string(), )); } - if opts.partial { + if partial { return Err(Error::DatabaseSyncEngineError( "deferred bootstrap is not supported for partial sync".to_string(), )); @@ -205,9 +208,7 @@ impl DatabaseSyncEngine

{ } let db_file = io.open_file(main_db_path, turso_core::OpenFlags::Create, false)?; - let mut db_file: Arc = - Arc::new(turso_core::storage::database::DatabaseFile::new(db_file)); - if opts.partial { + let db_file: Arc = if partial { let Some(partial_bootstrap_server_revision) = &meta.partial_bootstrap_server_revision else { return Err(Error::DatabaseSyncEngineError( @@ -219,12 +220,15 @@ impl DatabaseSyncEngine

{ "partial sync is supported only for V1 protocol".to_string(), )); }; - db_file = Arc::new(PartialDatabaseStorage::new( + Arc::new(LazyDatabaseStorage::new( db_file, + None, // todo(sivukhin): allocate dirty file for FS IO protocol.clone(), revision.to_string(), - )); - } + )) + } else { + Arc::new(turso_core::storage::database::DatabaseFile::new(db_file)) + }; let main_db = turso_core::Database::open_with_flags( io.clone(), diff --git a/sync/engine/src/database_sync_operations.rs b/sync/engine/src/database_sync_operations.rs index 67ac483b5..906bdb7da 100644 --- a/sync/engine/src/database_sync_operations.rs +++ b/sync/engine/src/database_sync_operations.rs @@ -13,7 +13,7 @@ use turso_core::{ use crate::{ database_replay_generator::DatabaseReplayGenerator, - database_sync_engine::{DataStats, DatabaseSyncEngineOpts}, + database_sync_engine::{DataStats, DatabaseSyncEngineOpts, PartialBootstrapStrategy}, database_tape::{ run_stmt_expect_one_row, run_stmt_ignore_rows, DatabaseChangesIteratorMode, DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape, DatabaseWalSession, @@ -256,7 +256,8 @@ pub async fn wal_pull_to_file_v1( server_revision: String::new(), client_revision: revision.to_string(), long_poll_timeout_ms: long_poll_timeout.map(|x| x.as_millis() as u32).unwrap_or(0), - server_pages: BytesMut::new().into(), + server_pages_selector: BytesMut::new().into(), + server_query_selector: String::new(), client_pages: BytesMut::new().into(), }; let request = request.encode_to_vec(); @@ -366,7 +367,8 @@ pub async fn pull_pages_v1( server_revision: server_revision.to_string(), client_revision: String::new(), long_poll_timeout_ms: 0, - server_pages: bitmap_bytes.into(), + server_pages_selector: bitmap_bytes.into(), + server_query_selector: String::new(), client_pages: BytesMut::new().into(), }; let request = request.encode_to_vec(); @@ -1122,11 +1124,11 @@ pub async fn bootstrap_db_file( io: &Arc, main_db_path: &str, protocol: DatabaseSyncEngineProtocolVersion, - prefix: Option, + partial_bootstrap_strategy: Option, ) -> Result { match protocol { DatabaseSyncEngineProtocolVersion::Legacy => { - if prefix.is_some() { + if partial_bootstrap_strategy.is_some() { return Err(Error::DatabaseSyncEngineError( "can't bootstrap prefix of database with legacy protocol".to_string(), )); @@ -1134,7 +1136,7 @@ pub async fn bootstrap_db_file( bootstrap_db_file_legacy(coro, client, io, main_db_path).await } DatabaseSyncEngineProtocolVersion::V1 => { - bootstrap_db_file_v1(coro, client, io, main_db_path, prefix).await + bootstrap_db_file_v1(coro, client, io, main_db_path, partial_bootstrap_strategy).await } } } @@ -1144,22 +1146,25 @@ pub async fn bootstrap_db_file_v1( client: &ProtocolIoStats, io: &Arc, main_db_path: &str, - prefix: Option, + bootstrap: Option, ) -> Result { - let bitmap_bytes = { - if let Some(prefix) = prefix { - let mut bitmap = RoaringBitmap::new(); - bitmap.insert_range(0..(prefix / PAGE_SIZE) as u32); - let mut bitmap_bytes = Vec::with_capacity(bitmap.serialized_size()); - bitmap.serialize_into(&mut bitmap_bytes).map_err(|e| { - Error::DatabaseSyncEngineError(format!( - "unable to serialize bootstrap request: {e}" - )) - })?; - bitmap_bytes - } else { - Vec::new() - } + let server_pages_selector = if let Some(PartialBootstrapStrategy::Prefix { length }) = + &bootstrap + { + let mut bitmap = RoaringBitmap::new(); + bitmap.insert_range(0..(*length / PAGE_SIZE) as u32); + let mut bitmap_bytes = Vec::with_capacity(bitmap.serialized_size()); + bitmap.serialize_into(&mut bitmap_bytes).map_err(|e| { + Error::DatabaseSyncEngineError(format!("unable to serialize bootstrap request: {e}")) + })?; + bitmap_bytes + } else { + Vec::new() + }; + let server_query_selector = if let Some(PartialBootstrapStrategy::Query { query }) = bootstrap { + query + } else { + String::new() }; let request = PullUpdatesReqProtoBody { @@ -1167,7 +1172,8 @@ pub async fn bootstrap_db_file_v1( server_revision: String::new(), client_revision: String::new(), long_poll_timeout_ms: 0, - server_pages: bitmap_bytes.into(), + server_pages_selector: server_pages_selector.into(), + server_query_selector: server_query_selector, client_pages: BytesMut::new().into(), }; let request = request.encode_to_vec(); diff --git a/sync/engine/src/database_sync_partial_storage.rs b/sync/engine/src/database_sync_partial_storage.rs deleted file mode 100644 index 096151b07..000000000 --- a/sync/engine/src/database_sync_partial_storage.rs +++ /dev/null @@ -1,140 +0,0 @@ -use std::sync::Arc; - -use turso_core::{Completion, CompletionError, DatabaseStorage}; - -use crate::{ - database_sync_operations::{pull_pages_v1, ProtocolIoStats, PAGE_SIZE}, - errors, - protocol_io::ProtocolIO, - types::Coro, -}; - -pub struct PartialDatabaseStorage { - base: Arc, - protocol: ProtocolIoStats

, - server_revision: String, -} - -impl PartialDatabaseStorage

{ - pub fn new( - base: Arc, - protocol: ProtocolIoStats

, - server_revision: String, - ) -> Self { - Self { - base, - protocol, - server_revision, - } - } -} - -impl DatabaseStorage for PartialDatabaseStorage

{ - fn read_header(&self, c: turso_core::Completion) -> turso_core::Result { - assert!( - !self.base.has_hole(0, PAGE_SIZE)?, - "first page must be filled" - ); - self.base.read_header(c) - } - - fn read_page( - &self, - page_idx: usize, - io_ctx: &turso_core::IOContext, - c: turso_core::Completion, - ) -> turso_core::Result { - assert!( - io_ctx.encryption_context().is_none(), - "encryption or checksum are not supported with partial sync" - ); - if !self.base.has_hole((page_idx - 1) * PAGE_SIZE, PAGE_SIZE)? { - return self.base.read_page(page_idx, io_ctx, c); - } - tracing::info!( - "PartialDatabaseStorage::read_page(page_idx={}): read page from the remote server", - page_idx - ); - let mut generator = genawaiter::sync::Gen::new({ - let protocol = self.protocol.clone(); - let server_revision = self.server_revision.clone(); - let base = self.base.clone(); - let io_ctx = io_ctx.clone(); - let c = c.clone(); - |coro| async move { - let coro = Coro::new((), coro); - let result = - pull_pages_v1(&coro, &protocol, &server_revision, &[(page_idx - 1) as u32]) - .await; - match result { - Ok(page) => { - let read = c.as_read(); - let buf = read.buf_arc(); - buf.as_mut_slice().copy_from_slice(&page); - let write = Completion::new_write(move |result| { - let Ok(_) = result else { - panic!("unexpected write error: {result:?}"); - }; - c.complete(page.len() as i32); - }); - let _ = base.write_page(page_idx, buf.clone(), &io_ctx, write)?; - Ok::<(), errors::Error>(()) - } - Err(err) => { - tracing::error!("faile to fetch path from remote server: {err}"); - c.error(CompletionError::IOError(std::io::ErrorKind::Other)); - Err(err) - } - } - } - }); - self.protocol - .add_work(Box::new(move || match generator.resume_with(Ok(())) { - genawaiter::GeneratorState::Yielded(_) => false, - genawaiter::GeneratorState::Complete(_) => true, - })); - Ok(c) - } - - fn write_page( - &self, - page_idx: usize, - buffer: std::sync::Arc, - io_ctx: &turso_core::IOContext, - c: turso_core::Completion, - ) -> turso_core::Result { - self.base.write_page(page_idx, buffer, io_ctx, c) - } - - fn write_pages( - &self, - first_page_idx: usize, - page_size: usize, - buffers: Vec>, - io_ctx: &turso_core::IOContext, - c: turso_core::Completion, - ) -> turso_core::Result { - self.base - .write_pages(first_page_idx, page_size, buffers, io_ctx, c) - } - - fn sync(&self, c: turso_core::Completion) -> turso_core::Result { - self.base.sync(c) - } - - fn size(&self) -> turso_core::Result { - self.base.size() - } - - fn truncate( - &self, - len: usize, - c: turso_core::Completion, - ) -> turso_core::Result { - self.base.truncate(len, c) - } - - fn has_hole(&self, _pos: usize, _len: usize) -> turso_core::Result { - Ok(false) - } -} diff --git a/sync/engine/src/lib.rs b/sync/engine/src/lib.rs index 6d8199651..1bf3368e4 100644 --- a/sync/engine/src/lib.rs +++ b/sync/engine/src/lib.rs @@ -1,7 +1,7 @@ pub mod database_replay_generator; pub mod database_sync_engine; +pub mod database_sync_lazy_storage; pub mod database_sync_operations; -pub mod database_sync_partial_storage; pub mod database_tape; pub mod errors; pub mod io_operations; @@ -10,6 +10,9 @@ pub mod server_proto; pub mod types; pub mod wal_session; +// #[cfg(unix)] +pub mod sparse_io; + pub type Result = std::result::Result; #[cfg(test)] diff --git a/sync/engine/src/server_proto.rs b/sync/engine/src/server_proto.rs index bca505d68..e74f9171d 100644 --- a/sync/engine/src/server_proto.rs +++ b/sync/engine/src/server_proto.rs @@ -23,7 +23,9 @@ pub struct PullUpdatesReqProtoBody { #[prost(uint32, tag = "4")] pub long_poll_timeout_ms: u32, #[prost(bytes, tag = "5")] - pub server_pages: Bytes, + pub server_pages_selector: Bytes, + #[prost(string, tag = "7")] + pub server_query_selector: String, #[prost(bytes, tag = "6")] pub client_pages: Bytes, }