support defered sync in the engine

This commit is contained in:
Nikita Sivukhin
2025-10-02 15:57:25 +04:00
parent 7016d9456e
commit 5ec2d96bc1
3 changed files with 57 additions and 19 deletions

View File

@@ -38,6 +38,8 @@ pub struct DatabaseSyncEngineOpts {
pub wal_pull_batch_size: u64,
pub long_poll_timeout: Option<std::time::Duration>,
pub protocol_version_hint: DatabaseSyncEngineProtocolVersion,
pub bootstrap_if_empty: bool,
pub reserved_bytes: usize,
}
pub struct DatabaseSyncEngine<P: ProtocolIO> {
@@ -73,9 +75,6 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
let meta_path = format!("{main_db_path}-info");
let changes_path = format!("{main_db_path}-changes");
let db_file = io.open_file(main_db_path, turso_core::OpenFlags::Create, false)?;
let db_file = Arc::new(turso_core::storage::database::DatabaseFile::new(db_file));
tracing::info!("init(path={}): opts={:?}", main_db_path, opts);
let completion = protocol.full_read(&meta_path)?;
@@ -88,7 +87,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
let meta = match meta {
Some(meta) => meta,
None => {
None if opts.bootstrap_if_empty => {
let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4());
let revision = bootstrap_db_file(
coro,
@@ -106,7 +105,31 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
revert_since_wal_watermark: 0,
last_pushed_change_id_hint: 0,
last_pushed_pull_gen_hint: 0,
last_pull_unix_time: io.now().secs,
last_pull_unix_time: Some(io.now().secs),
last_push_unix_time: None,
};
tracing::info!("write meta after successful bootstrap: meta={meta:?}");
let completion = protocol.full_write(&meta_path, meta.dump()?)?;
// todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated
wait_all_results(coro, &completion).await?;
meta
}
None => {
if opts.protocol_version_hint == DatabaseSyncEngineProtocolVersion::Legacy {
return Err(Error::DatabaseSyncEngineError(
"deferred bootstrap is not supported for legacy protocol".to_string(),
));
}
let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4());
let meta = DatabaseMetadata {
version: DATABASE_METADATA_VERSION.to_string(),
client_unique_id,
synced_revision: None,
revert_since_wal_salt: None,
revert_since_wal_watermark: 0,
last_pushed_change_id_hint: 0,
last_pushed_pull_gen_hint: 0,
last_pull_unix_time: None,
last_push_unix_time: None,
};
tracing::info!("write meta after successful bootstrap: meta={meta:?}");
@@ -127,11 +150,14 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
tracing::info!("check if main db file exists");
let main_exists = io.try_open(main_db_path)?.is_some();
if !main_exists {
if !main_exists && meta.synced_revision.is_some() {
let error = "main DB file doesn't exists, but metadata is".to_string();
return Err(Error::DatabaseSyncEngineError(error));
}
let db_file = io.open_file(main_db_path, turso_core::OpenFlags::Create, false)?;
let db_file = Arc::new(turso_core::storage::database::DatabaseFile::new(db_file));
let main_db = turso_core::Database::open_with_flags(
io.clone(),
main_db_path,
@@ -140,6 +166,17 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
turso_core::DatabaseOpts::new().with_indexes(true),
None,
)?;
// DB wasn't bootstrapped but remote is encrypted - so we must properly set reserved bytes field in advance
if meta.synced_revision.is_none() && opts.reserved_bytes != 0 {
let conn = main_db.connect()?;
conn.set_reserved_bytes(opts.reserved_bytes as u8)?;
// write transaction forces allocation of root DB page
conn.execute("BEGIN IMMEDIATE")?;
conn.execute("COMMIT")?;
}
let tape_opts = DatabaseTapeOpts {
cdc_table: None,
cdc_mode: Some("full".to_string()),
@@ -162,11 +199,11 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
client_unique_id: meta.client_unique_id.clone(),
};
let synced_revision = meta.synced_revision.as_ref().unwrap();
if let DatabasePullRevision::Legacy {
let synced_revision = meta.synced_revision.as_ref();
if let Some(DatabasePullRevision::Legacy {
synced_frame_no: None,
..
} = synced_revision
}) = synced_revision
{
// sync WAL from the remote in case of bootstrap - all subsequent initializations will be fast
db.pull_changes_from_remote(coro).await?;
@@ -380,7 +417,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
let file = acquire_slot(&self.changes_file)?;
let now = self.io.now();
let revision = self.meta().synced_revision.clone().unwrap();
let revision = self.meta().synced_revision.clone();
let next_revision = wal_pull_to_file(
coro,
self.protocol.as_ref(),
@@ -427,7 +464,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
) -> Result<()> {
if remote_changes.file_slot.is_none() {
self.update_meta(coro, |m| {
m.last_pull_unix_time = remote_changes.time.secs;
m.last_pull_unix_time = Some(remote_changes.time.secs);
})
.await?;
return Ok(());
@@ -450,7 +487,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
m.revert_since_wal_watermark = revert_since_wal_watermark;
m.synced_revision = Some(remote_changes.revision);
m.last_pushed_change_id_hint = 0;
m.last_pull_unix_time = remote_changes.time.secs;
m.last_pull_unix_time = Some(remote_changes.time.secs);
})
.await?;
Ok(())

View File

@@ -153,7 +153,7 @@ pub async fn wal_apply_from_file<Ctx>(
coro.yield_(ProtocolCommand::IO).await?;
}
let info = WalFrameInfo::from_frame_header(buffer.as_slice());
tracing::debug!("got frame: {:?}", info);
tracing::info!("got frame: {:?}", info);
db_size = info.db_size;
session.append_page(info.page_no, &buffer.as_slice()[WAL_FRAME_HEADER..])?;
}
@@ -165,7 +165,7 @@ pub async fn wal_pull_to_file<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
frames_file: &Arc<dyn turso_core::File>,
revision: &DatabasePullRevision,
revision: &Option<DatabasePullRevision>,
wal_pull_batch_size: u64,
long_poll_timeout: Option<std::time::Duration>,
) -> Result<DatabasePullRevision> {
@@ -181,10 +181,10 @@ pub async fn wal_pull_to_file<C: ProtocolIO, Ctx>(
coro.yield_(ProtocolCommand::IO).await?;
}
match revision {
DatabasePullRevision::Legacy {
Some(DatabasePullRevision::Legacy {
generation,
synced_frame_no,
} => {
}) => {
let start_frame = synced_frame_no.unwrap_or(0) + 1;
wal_pull_to_file_legacy(
coro,
@@ -196,9 +196,10 @@ pub async fn wal_pull_to_file<C: ProtocolIO, Ctx>(
)
.await
}
DatabasePullRevision::V1 { revision } => {
Some(DatabasePullRevision::V1 { revision }) => {
wal_pull_to_file_v1(coro, client, frames_file, revision, long_poll_timeout).await
}
None => wal_pull_to_file_v1(coro, client, frames_file, "", long_poll_timeout).await,
}
}

View File

@@ -65,7 +65,7 @@ pub struct SyncEngineStats {
pub cdc_operations: i64,
pub main_wal_size: u64,
pub revert_wal_size: u64,
pub last_pull_unix_time: i64,
pub last_pull_unix_time: Option<i64>,
pub last_push_unix_time: Option<i64>,
pub revision: Option<String>,
}
@@ -90,7 +90,7 @@ pub struct DatabaseMetadata {
pub revert_since_wal_salt: Option<Vec<u32>>,
pub revert_since_wal_watermark: u64,
/// Unix time of last successful pull
pub last_pull_unix_time: i64,
pub last_pull_unix_time: Option<i64>,
/// Unix time of last successful push
pub last_push_unix_time: Option<i64>,
pub last_pushed_pull_gen_hint: i64,