diff --git a/sync/engine/src/database_sync_engine.rs b/sync/engine/src/database_sync_engine.rs index dcdc843cf..7f795ba29 100644 --- a/sync/engine/src/database_sync_engine.rs +++ b/sync/engine/src/database_sync_engine.rs @@ -3,7 +3,7 @@ use std::{ sync::{Arc, Mutex}, }; -use turso_core::OpenFlags; +use turso_core::{DatabaseStorage, OpenFlags}; use crate::{ database_replay_generator::DatabaseReplayGenerator, @@ -13,6 +13,7 @@ use crate::{ reset_wal_file, update_last_change_id, wait_all_results, wal_apply_from_file, wal_pull_to_file, PAGE_SIZE, WAL_FRAME_HEADER, WAL_FRAME_SIZE, }, + database_sync_partial_storage::PartialDatabaseStorage, database_tape::{ DatabaseChangesIteratorMode, DatabaseChangesIteratorOpts, DatabaseReplaySession, DatabaseReplaySessionOpts, DatabaseTape, DatabaseTapeOpts, DatabaseWalSession, @@ -40,6 +41,7 @@ pub struct DatabaseSyncEngineOpts { pub protocol_version_hint: DatabaseSyncEngineProtocolVersion, pub bootstrap_if_empty: bool, pub reserved_bytes: usize, + pub partial: bool, } pub struct DatabaseSyncEngine { @@ -89,24 +91,35 @@ impl DatabaseSyncEngine

{ Some(meta) => meta, None if opts.bootstrap_if_empty => { let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4()); + let prefix = if opts.partial { + Some(128 * PAGE_SIZE) + } else { + None + }; let revision = bootstrap_db_file( coro, protocol.as_ref(), &io, main_db_path, opts.protocol_version_hint, + prefix, ) .await?; let meta = DatabaseMetadata { version: DATABASE_METADATA_VERSION.to_string(), client_unique_id, - synced_revision: Some(revision), + synced_revision: Some(revision.clone()), revert_since_wal_salt: None, revert_since_wal_watermark: 0, last_pushed_change_id_hint: 0, last_pushed_pull_gen_hint: 0, last_pull_unix_time: Some(io.now().secs), last_push_unix_time: None, + partial_bootstrap_server_revision: if opts.partial { + Some(revision.clone()) + } else { + None + }, }; tracing::info!("write meta after successful bootstrap: meta={meta:?}"); let completion = protocol.full_write(&meta_path, meta.dump()?)?; @@ -120,6 +133,11 @@ impl DatabaseSyncEngine

{ "deferred bootstrap is not supported for legacy protocol".to_string(), )); } + if opts.partial { + return Err(Error::DatabaseSyncEngineError( + "deferred bootstrap is not supported for partial sync".to_string(), + )); + } let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4()); let meta = DatabaseMetadata { version: DATABASE_METADATA_VERSION.to_string(), @@ -131,6 +149,7 @@ impl DatabaseSyncEngine

{ last_pushed_pull_gen_hint: 0, last_pull_unix_time: None, last_push_unix_time: None, + partial_bootstrap_server_revision: None, }; tracing::info!("write meta after successful bootstrap: meta={meta:?}"); let completion = protocol.full_write(&meta_path, meta.dump()?)?; @@ -156,7 +175,26 @@ impl DatabaseSyncEngine

{ } let db_file = io.open_file(main_db_path, turso_core::OpenFlags::Create, false)?; - let db_file = Arc::new(turso_core::storage::database::DatabaseFile::new(db_file)); + let mut db_file: Arc = + Arc::new(turso_core::storage::database::DatabaseFile::new(db_file)); + if opts.partial { + let Some(partial_bootstrap_server_revision) = &meta.partial_bootstrap_server_revision + else { + return Err(Error::DatabaseSyncEngineError( + "partial_bootstrap_server_revision must be set in the metadata".to_string(), + )); + }; + let DatabasePullRevision::V1 { revision } = &partial_bootstrap_server_revision else { + return Err(Error::DatabaseSyncEngineError( + "partial sync is supported only for V1 protocol".to_string(), + )); + }; + db_file = Arc::new(PartialDatabaseStorage::new( + db_file, + protocol.clone(), + revision.to_string(), + )); + } let main_db = turso_core::Database::open_with_flags( io.clone(), diff --git a/sync/engine/src/database_sync_operations.rs b/sync/engine/src/database_sync_operations.rs index 3cb069e98..e9c3831ef 100644 --- a/sync/engine/src/database_sync_operations.rs +++ b/sync/engine/src/database_sync_operations.rs @@ -2,6 +2,7 @@ use std::sync::{Arc, Mutex}; use bytes::BytesMut; use prost::Message; +use roaring::RoaringBitmap; use turso_core::{ types::{Text, WalFrameInfo}, Buffer, Completion, LimboError, OpenFlags, Value, @@ -298,6 +299,74 @@ pub async fn wal_pull_to_file_v1( }) } +/// Pull pages from remote +pub async fn pull_pages_v1( + coro: &Coro, + client: &C, + server_revision: &str, + pages: &[u32], +) -> Result> { + tracing::info!("pull_pages_v1: revision={server_revision}"); + let mut bytes = BytesMut::new(); + + let mut bitmap = RoaringBitmap::new(); + bitmap.extend(pages); + + let mut bitmap_bytes = Vec::with_capacity(bitmap.serialized_size()); + bitmap.serialize_into(&mut bitmap_bytes).map_err(|e| { + Error::DatabaseSyncEngineError(format!("unable to serialize pull page request: {e}")) + })?; + + let request = PullUpdatesReqProtoBody { + encoding: PageUpdatesEncodingReq::Raw as i32, + server_revision: server_revision.to_string(), + client_revision: String::new(), + long_poll_timeout_ms: 0, + server_pages: bitmap_bytes.into(), + client_pages: BytesMut::new().into(), + }; + let request = request.encode_to_vec(); + let completion = client.http( + "POST", + "/pull-updates", + Some(request), + &[ + ("content-type", "application/protobuf"), + ("accept-encoding", "application/protobuf"), + ], + )?; + let Some(header) = + wait_proto_message::(coro, &completion, &mut bytes).await? + else { + return Err(Error::DatabaseSyncEngineError( + "no header returned in the pull-updates protobuf call".to_string(), + )); + }; + tracing::info!("pull_pages_v1: got header={:?}", header); + + let mut pages = Vec::with_capacity(PAGE_SIZE * pages.len()); + + let mut page_data_opt = + wait_proto_message::(coro, &completion, &mut bytes).await?; + while let Some(page_data) = page_data_opt.take() { + let page_id = page_data.page_id; + tracing::info!("received page {}", page_id); + let page = decode_page(&header, page_data)?; + if page.len() != PAGE_SIZE { + return Err(Error::DatabaseSyncEngineError(format!( + "page has unexpected size: {} != {}", + page.len(), + PAGE_SIZE + ))); + } + pages.extend_from_slice(&page); + page_data_opt = wait_proto_message(coro, &completion, &mut bytes).await?; + tracing::info!("page_data_opt: {}", page_data_opt.is_some()); + } + + Ok(pages) +} + /// Pull updates from remote to the separate file pub async fn wal_pull_to_file_legacy( coro: &Coro, @@ -999,13 +1068,19 @@ pub async fn bootstrap_db_file( io: &Arc, main_db_path: &str, protocol: DatabaseSyncEngineProtocolVersion, + prefix: Option, ) -> Result { match protocol { DatabaseSyncEngineProtocolVersion::Legacy => { + if prefix.is_some() { + return Err(Error::DatabaseSyncEngineError( + "can't bootstrap prefix of database with legacy protocol".to_string(), + )); + } bootstrap_db_file_legacy(coro, client, io, main_db_path).await } DatabaseSyncEngineProtocolVersion::V1 => { - bootstrap_db_file_v1(coro, client, io, main_db_path).await + bootstrap_db_file_v1(coro, client, io, main_db_path, prefix).await } } } @@ -1015,17 +1090,37 @@ pub async fn bootstrap_db_file_v1( client: &C, io: &Arc, main_db_path: &str, + prefix: Option, ) -> Result { - let mut bytes = BytesMut::new(); + let mut bitmap = RoaringBitmap::new(); + if let Some(prefix) = prefix { + bitmap.insert_range(0..(prefix / PAGE_SIZE) as u32); + } + + let mut bitmap_bytes = Vec::with_capacity(bitmap.serialized_size()); + bitmap.serialize_into(&mut bitmap_bytes).map_err(|e| { + Error::DatabaseSyncEngineError(format!("unable to serialize bootstrap request: {e}")) + })?; + + let request = PullUpdatesReqProtoBody { + encoding: PageUpdatesEncodingReq::Raw as i32, + server_revision: String::new(), + client_revision: String::new(), + long_poll_timeout_ms: 0, + server_pages: bitmap_bytes.into(), + client_pages: BytesMut::new().into(), + }; + let request = request.encode_to_vec(); let completion = client.http( - "GET", + "POST", "/pull-updates", - None, + Some(request), &[ ("content-type", "application/protobuf"), ("accept-encoding", "application/protobuf"), ], )?; + let mut bytes = BytesMut::new(); let Some(header) = wait_proto_message::(coro, &completion, &mut bytes).await? else { @@ -1430,6 +1525,8 @@ mod tests { fn is_done(&self) -> crate::Result { Ok(self.data.borrow().is_empty()) } + + fn set_callback(&self, callback: Box ()>) {} } #[test] diff --git a/sync/engine/src/lib.rs b/sync/engine/src/lib.rs index 0546a15e7..6d8199651 100644 --- a/sync/engine/src/lib.rs +++ b/sync/engine/src/lib.rs @@ -1,6 +1,7 @@ pub mod database_replay_generator; pub mod database_sync_engine; pub mod database_sync_operations; +pub mod database_sync_partial_storage; pub mod database_tape; pub mod errors; pub mod io_operations; diff --git a/sync/engine/src/protocol_io.rs b/sync/engine/src/protocol_io.rs index 797a2aef7..19933b993 100644 --- a/sync/engine/src/protocol_io.rs +++ b/sync/engine/src/protocol_io.rs @@ -12,9 +12,10 @@ pub trait DataCompletion { fn status(&self) -> Result>; fn poll_data(&self) -> Result>; fn is_done(&self) -> Result; + fn set_callback(&self, callback: Box ()>); } -pub trait ProtocolIO { +pub trait ProtocolIO: Send + Sync + 'static { type DataCompletionBytes: DataCompletion; type DataCompletionTransform: DataCompletion; fn full_read(&self, path: &str) -> Result; @@ -30,4 +31,5 @@ pub trait ProtocolIO { body: Option>, headers: &[(&str, &str)], ) -> Result; + fn register(&self, callback: Box bool>); } diff --git a/sync/engine/src/types.rs b/sync/engine/src/types.rs index 4c08ee394..0ffc7a7b6 100644 --- a/sync/engine/src/types.rs +++ b/sync/engine/src/types.rs @@ -95,6 +95,7 @@ pub struct DatabaseMetadata { pub last_push_unix_time: Option, pub last_pushed_pull_gen_hint: i64, pub last_pushed_change_id_hint: i64, + pub partial_bootstrap_server_revision: Option, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]