add hooks to plug partial sync in the sync engine

This commit is contained in:
Nikita Sivukhin
2025-11-10 12:53:08 +04:00
parent dad7feffca
commit 34f1072071
5 changed files with 147 additions and 8 deletions

View File

@@ -3,7 +3,7 @@ use std::{
sync::{Arc, Mutex},
};
use turso_core::OpenFlags;
use turso_core::{DatabaseStorage, OpenFlags};
use crate::{
database_replay_generator::DatabaseReplayGenerator,
@@ -13,6 +13,7 @@ use crate::{
reset_wal_file, update_last_change_id, wait_all_results, wal_apply_from_file,
wal_pull_to_file, PAGE_SIZE, WAL_FRAME_HEADER, WAL_FRAME_SIZE,
},
database_sync_partial_storage::PartialDatabaseStorage,
database_tape::{
DatabaseChangesIteratorMode, DatabaseChangesIteratorOpts, DatabaseReplaySession,
DatabaseReplaySessionOpts, DatabaseTape, DatabaseTapeOpts, DatabaseWalSession,
@@ -40,6 +41,7 @@ pub struct DatabaseSyncEngineOpts {
pub protocol_version_hint: DatabaseSyncEngineProtocolVersion,
pub bootstrap_if_empty: bool,
pub reserved_bytes: usize,
pub partial: bool,
}
pub struct DatabaseSyncEngine<P: ProtocolIO> {
@@ -89,24 +91,35 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
Some(meta) => meta,
None if opts.bootstrap_if_empty => {
let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4());
let prefix = if opts.partial {
Some(128 * PAGE_SIZE)
} else {
None
};
let revision = bootstrap_db_file(
coro,
protocol.as_ref(),
&io,
main_db_path,
opts.protocol_version_hint,
prefix,
)
.await?;
let meta = DatabaseMetadata {
version: DATABASE_METADATA_VERSION.to_string(),
client_unique_id,
synced_revision: Some(revision),
synced_revision: Some(revision.clone()),
revert_since_wal_salt: None,
revert_since_wal_watermark: 0,
last_pushed_change_id_hint: 0,
last_pushed_pull_gen_hint: 0,
last_pull_unix_time: Some(io.now().secs),
last_push_unix_time: None,
partial_bootstrap_server_revision: if opts.partial {
Some(revision.clone())
} else {
None
},
};
tracing::info!("write meta after successful bootstrap: meta={meta:?}");
let completion = protocol.full_write(&meta_path, meta.dump()?)?;
@@ -120,6 +133,11 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
"deferred bootstrap is not supported for legacy protocol".to_string(),
));
}
if opts.partial {
return Err(Error::DatabaseSyncEngineError(
"deferred bootstrap is not supported for partial sync".to_string(),
));
}
let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4());
let meta = DatabaseMetadata {
version: DATABASE_METADATA_VERSION.to_string(),
@@ -131,6 +149,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
last_pushed_pull_gen_hint: 0,
last_pull_unix_time: None,
last_push_unix_time: None,
partial_bootstrap_server_revision: None,
};
tracing::info!("write meta after successful bootstrap: meta={meta:?}");
let completion = protocol.full_write(&meta_path, meta.dump()?)?;
@@ -156,7 +175,26 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
}
let db_file = io.open_file(main_db_path, turso_core::OpenFlags::Create, false)?;
let db_file = Arc::new(turso_core::storage::database::DatabaseFile::new(db_file));
let mut db_file: Arc<dyn DatabaseStorage> =
Arc::new(turso_core::storage::database::DatabaseFile::new(db_file));
if opts.partial {
let Some(partial_bootstrap_server_revision) = &meta.partial_bootstrap_server_revision
else {
return Err(Error::DatabaseSyncEngineError(
"partial_bootstrap_server_revision must be set in the metadata".to_string(),
));
};
let DatabasePullRevision::V1 { revision } = &partial_bootstrap_server_revision else {
return Err(Error::DatabaseSyncEngineError(
"partial sync is supported only for V1 protocol".to_string(),
));
};
db_file = Arc::new(PartialDatabaseStorage::new(
db_file,
protocol.clone(),
revision.to_string(),
));
}
let main_db = turso_core::Database::open_with_flags(
io.clone(),