Merge 'turso-sync: support checkpoint' from Nikita Sivukhin

The remote protocol can require the client to checkpoint its WAL.
This PR adds support for that requirement to the sync-engine implementation.

Reviewed-by: Preston Thorpe <preston@turso.tech>

Closes #2565
This commit is contained in:
Jussi Saurio
2025-08-13 08:49:31 +03:00
committed by GitHub
4 changed files with 243 additions and 70 deletions

View File

@@ -918,7 +918,11 @@ impl Wal for WalFile {
page: PageRef,
buffer_pool: Arc<BufferPool>,
) -> Result<Completion> {
tracing::debug!("read_frame({})", frame_id);
tracing::debug!(
"read_frame(page_idx = {}, frame_id = {})",
page.get().id,
frame_id
);
let offset = self.frame_offset(frame_id);
page.set_locked();
let frame = page.clone();
@@ -969,6 +973,9 @@ impl Wal for WalFile {
db_size: u64,
page: &[u8],
) -> Result<()> {
if self.get_max_frame_in_wal() == 0 {
self.ensure_header_if_needed()?;
}
tracing::debug!("write_raw_frame({})", frame_id);
if page.len() != self.page_size() as usize {
return Err(LimboError::InvalidArgument(format!(

View File

@@ -2,8 +2,8 @@ use std::sync::Arc;
use crate::{
database_sync_operations::{
db_bootstrap, reset_wal_file, transfer_logical_changes, transfer_physical_changes,
wait_full_body, wal_pull, wal_push, WalPullResult,
checkpoint_wal_file, db_bootstrap, reset_wal_file, transfer_logical_changes,
transfer_physical_changes, wait_full_body, wal_pull, wal_push, WalPullResult,
},
errors::Error,
io_operations::IoOperations,
@@ -107,58 +107,87 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
// reset Synced DB if it wasn't properly cleaned-up on previous "sync-method" attempt
self.reset_synced_if_dirty(coro).await?;
// update Synced DB with fresh changes from remote
self.pull_synced_from_remote(coro).await?;
loop {
// update Synced DB with fresh changes from remote
let pull_result = self.pull_synced_from_remote(coro).await?;
// we will "replay" Synced WAL to the Draft WAL later without pushing it to the remote
// so, we pass 'capture: true' as we need to preserve all changes for future push of WAL
let draft = self.io.open_tape(&self.draft_path, true)?;
let synced = self.io.open_tape(&self.synced_path, true)?;
{
// we will "replay" Synced WAL to the Draft WAL later without pushing it to the remote
// so, we pass 'capture: true' as we need to preserve all changes for future push of WAL
let draft = self.io.open_tape(&self.draft_path, true)?;
tracing::info!("opened draft");
let synced = self.io.open_tape(&self.synced_path, true)?;
tracing::info!("opened synced");
{
// we will start wal write session for Draft DB in order to hold write lock during transfer of changes
let mut draft_session = WalSession::new(draft.connect(coro).await?);
draft_session.begin()?;
// we will start wal write session for Draft DB in order to hold write lock during transfer of changes
let mut draft_session = WalSession::new(draft.connect(coro).await?);
draft_session.begin()?;
// mark Synced as dirty as we will start transfer of logical changes there and if we will fail in the middle - we will need to cleanup Synced db
self.synced_is_dirty = true;
// mark Synced as dirty as we will start transfer of logical changes there and if we will fail in the middle - we will need to cleanup Synced db
self.synced_is_dirty = true;
// transfer logical changes to the Synced DB in order to later execute physical "rebase" operation
let client_id = &self.meta().client_unique_id;
transfer_logical_changes(coro, &draft, &synced, client_id, true).await?;
// transfer logical changes to the Synced DB in order to later execute physical "rebase" operation
let client_id = &self.meta().client_unique_id;
transfer_logical_changes(coro, &draft, &synced, client_id, true).await?;
// now we are ready to do the rebase: let's transfer physical changes from Synced to Draft
let synced_wal_watermark = self.meta().synced_wal_match_watermark;
let synced_sync_watermark = self.meta().synced_frame_no.expect(
"synced_frame_no must be set as we call pull_synced_from_remote before that",
// now we are ready to do the rebase: let's transfer physical changes from Synced to Draft
let synced_wal_watermark = self.meta().synced_wal_match_watermark;
let synced_sync_watermark = self.meta().synced_frame_no.expect(
"synced_frame_no must be set as we call pull_synced_from_remote before that",
);
let draft_wal_watermark = self.meta().draft_wal_match_watermark;
let draft_sync_watermark = transfer_physical_changes(
coro,
&synced,
draft_session,
synced_wal_watermark,
synced_sync_watermark,
draft_wal_watermark,
)
.await?;
update_meta(
coro,
self.protocol.as_ref(),
&self.meta_path,
&mut self.meta,
|m| {
m.draft_wal_match_watermark = draft_sync_watermark;
m.synced_wal_match_watermark = synced_sync_watermark;
},
)
.await?;
}
// Synced DB is 100% dirty now - let's reset it
assert!(self.synced_is_dirty);
self.reset_synced_if_dirty(coro).await?;
let WalPullResult::NeedCheckpoint = pull_result else {
break;
};
tracing::debug!(
"ready to checkpoint synced db file at {:?}, generation={}",
self.synced_path,
self.meta().synced_generation
);
let draft_wal_watermark = self.meta().draft_wal_match_watermark;
let draft_sync_watermark = transfer_physical_changes(
coro,
&synced,
draft_session,
synced_wal_watermark,
synced_sync_watermark,
draft_wal_watermark,
)
.await?;
update_meta(
coro,
self.protocol.as_ref(),
&self.meta_path,
&mut self.meta,
|m| {
m.draft_wal_match_watermark = draft_sync_watermark;
m.synced_wal_match_watermark = synced_sync_watermark;
},
)
.await?;
{
let synced = self.io.open_tape(&self.synced_path, false)?;
checkpoint_wal_file(coro, &synced.connect_untracked()?).await?;
update_meta(
coro,
self.protocol.as_ref(),
&self.meta_path,
&mut self.meta,
|m| {
m.synced_generation += 1;
m.synced_frame_no = Some(0);
m.synced_wal_match_watermark = 0;
},
)
.await?;
}
}
// Synced DB is 100% dirty now - let's reset it
assert!(self.synced_is_dirty);
self.reset_synced_if_dirty(coro).await?;
Ok(())
}
@@ -251,7 +280,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
Ok(())
}
async fn pull_synced_from_remote(&mut self, coro: &Coro) -> Result<()> {
async fn pull_synced_from_remote(&mut self, coro: &Coro) -> Result<WalPullResult> {
tracing::debug!(
"pull_synced_from_remote: draft={:?}, synced={:?}",
self.draft_path,
@@ -286,16 +315,12 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
)
.await?
{
WalPullResult::Done => return Ok(()),
WalPullResult::Done => return Ok(WalPullResult::Done),
WalPullResult::NeedCheckpoint => return Ok(WalPullResult::NeedCheckpoint),
WalPullResult::PullMore => {
start_frame = end_frame;
continue;
}
WalPullResult::NeedCheckpoint => {
return Err(Error::DatabaseSyncEngineError(
"checkpoint is temporary not supported".to_string(),
));
}
}
}
}
@@ -579,6 +604,7 @@ pub mod tests {
client_name: "id-1".to_string(),
wal_pull_batch_size: 1,
};
runner.init(local_path, opts).await.unwrap();
protocol
@@ -622,6 +648,70 @@ pub mod tests {
});
}
/// Pulls a database whose server was checkpointed twice between writes and verifies
/// that every row written on the server is visible locally after a single `pull`.
#[test]
pub fn test_sync_single_db_checkpoint() {
    deterministic_runtime(async || {
        let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
        let tmp = tempfile::TempDir::new().unwrap();
        let server_db = tmp.path().join("server.db");
        let ctx = Arc::new(TestContext::new(seed_u64()));
        let protocol = TestProtocolIo::new(ctx.clone(), &server_db)
            .await
            .unwrap();
        let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone());
        let local_db = tmp.path().join("local.db");
        runner
            .init(
                local_db,
                DatabaseSyncEngineOpts {
                    client_name: "id-1".to_string(),
                    wal_pull_batch_size: 1,
                },
            )
            .await
            .unwrap();

        // Server-side script: each entry is a statement plus a flag telling whether the
        // server database is checkpointed right after the statement has run.
        let server_steps = [
            ("CREATE TABLE t(x INTEGER PRIMARY KEY, y)", false),
            ("INSERT INTO t VALUES (1, randomblob(4 * 4096))", true),
            ("INSERT INTO t VALUES (2, randomblob(5 * 4096))", true),
            ("INSERT INTO t VALUES (3, randomblob(6 * 4096))", false),
            ("INSERT INTO t VALUES (4, randomblob(7 * 4096))", false),
        ];
        for (sql, checkpoint_after) in server_steps {
            protocol.server.execute(sql, ()).await.unwrap();
            if checkpoint_after {
                protocol.server.checkpoint().await.unwrap();
            }
        }

        let conn = runner.connect().await.unwrap();
        runner.pull().await.unwrap();

        // Row `x` carries a blob of `(x + 3) * 4096` bytes, mirroring the inserts above.
        let expected: Vec<Vec<turso::Value>> = (1..=4i64)
            .map(|x| {
                vec![
                    turso::Value::Integer(x),
                    turso::Value::Integer((x + 3) * 4096),
                ]
            })
            .collect();
        assert_eq!(
            query_rows(&conn, "SELECT x, length(y) FROM t")
                .await
                .unwrap(),
            expected
        );
    });
}
#[test]
pub fn test_sync_single_db_full_syncs() {
deterministic_runtime(async || {

View File

@@ -30,6 +30,11 @@ pub enum WalPullResult {
NeedCheckpoint,
}
/// Outcome of pushing a batch of WAL frames to the remote.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WalPushResult {
    /// The remote answered with status `"ok"` (or there was nothing to push).
    /// `baton` is the baton reported by the remote, if any.
    Ok { baton: Option<String> },
    /// The remote answered with status `"checkpoint_needed"`.
    NeedCheckpoint,
}
/// Bootstrap multiple DB files from latest generation from remote
pub async fn db_bootstrap<C: ProtocolIO>(
coro: &Coro,
@@ -186,12 +191,12 @@ pub async fn wal_push<C: ProtocolIO>(
generation: u64,
start_frame: u64,
end_frame: u64,
) -> Result<Option<String>> {
) -> Result<WalPushResult> {
assert!(wal_session.in_txn());
tracing::debug!("wal_push: baton={baton:?}, generation={generation}, start_frame={start_frame}, end_frame={end_frame}");
if start_frame == end_frame {
return Ok(None);
return Ok(WalPushResult::Ok { baton: None });
}
let mut frames_data = Vec::with_capacity((end_frame - start_frame) as usize * WAL_FRAME_SIZE);
@@ -216,17 +221,21 @@ pub async fn wal_push<C: ProtocolIO>(
frames_data,
)
.await?;
if status.status == "conflict" {
return Err(Error::DatabaseSyncEngineConflict(format!(
if status.status == "ok" {
Ok(WalPushResult::Ok {
baton: status.baton,
})
} else if status.status == "checkpoint_needed" {
Ok(WalPushResult::NeedCheckpoint)
} else if status.status == "conflict" {
Err(Error::DatabaseSyncEngineConflict(format!(
"wal_push conflict: {status:?}"
)));
}
if status.status != "ok" {
return Err(Error::DatabaseSyncEngineError(format!(
)))
} else {
Err(Error::DatabaseSyncEngineError(format!(
"wal_push unexpected status: {status:?}"
)));
)))
}
Ok(status.baton)
}
const TURSO_SYNC_META_TABLE: &str =
@@ -334,11 +343,13 @@ pub async fn transfer_logical_changes(
ignore_schema_changes: false,
..Default::default()
};
let mut rows_changed = 0;
let mut changes = source.iterate_changes(iterate_opts)?;
let mut updated = false;
while let Some(operation) = changes.next(coro).await? {
match &operation {
DatabaseTapeOperation::RowChange(change) => {
rows_changed += 1;
assert!(
last_change_id.is_none() || last_change_id.unwrap() < change.change_id,
"change id must be strictly increasing: last_change_id={:?}, change.change_id={}",
@@ -387,6 +398,7 @@ pub async fn transfer_logical_changes(
session.replay(coro, operation).await?;
}
tracing::debug!("transfer_logical_changes: rows_changed={:?}", rows_changed);
Ok(())
}
@@ -450,6 +462,23 @@ pub async fn transfer_physical_changes(
Ok(target_sync_watermark)
}
/// Runs `PRAGMA wal_checkpoint(TRUNCATE)` on `conn` and drives the statement to completion.
///
/// Whenever the statement reports pending I/O, control is yielded back through `coro`
/// with `ProtocolCommand::IO` before the statement is stepped again. Rows produced by
/// the pragma are stepped over without being inspected.
///
/// # Errors
///
/// Propagates failures from preparing or stepping the statement and from yielding to
/// `coro`. Any step result other than `IO`, `Row` or `Done` is reported as
/// `Error::DatabaseSyncEngineError`.
pub async fn checkpoint_wal_file(coro: &Coro, conn: &Arc<turso_core::Connection>) -> Result<()> {
    let mut checkpoint_stmt = conn.prepare("PRAGMA wal_checkpoint(TRUNCATE)")?;
    loop {
        match checkpoint_stmt.step()? {
            // The statement is waiting on I/O: let the driver make progress, then retry.
            turso_core::StepResult::IO => coro.yield_(ProtocolCommand::IO).await?,
            turso_core::StepResult::Done => return Ok(()),
            // NOTE(review): the pragma's result row is skipped here; if it can signal an
            // incomplete checkpoint, that condition is not detected — confirm.
            turso_core::StepResult::Row => continue,
            r => {
                return Err(Error::DatabaseSyncEngineError(format!(
                    "unexpected checkpoint result: {r:?}"
                )))
            }
        }
    }
}
pub async fn reset_wal_file(
coro: &Coro,
wal: Arc<dyn turso_core::File>,

View File

@@ -1,4 +1,8 @@
use std::{collections::HashMap, path::Path, sync::Arc};
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
use tokio::sync::Mutex;
@@ -33,6 +37,7 @@ struct TestSyncServerState {
#[derive(Clone)]
pub struct TestSyncServer {
path: PathBuf,
ctx: Arc<TestContext>,
db: turso::Database,
state: Arc<Mutex<TestSyncServerState>>,
@@ -49,6 +54,7 @@ impl TestSyncServer {
},
);
Ok(Self {
path: path.to_path_buf(),
ctx,
db: turso::Builder::new_local(path.to_str().unwrap())
.build()
@@ -179,9 +185,25 @@ impl TestSyncServer {
let mut session = {
let mut state = self.state.lock().await;
if state.generation != generation_id {
return Err(Error::DatabaseSyncEngineError(
"generation id mismatch".to_string(),
));
let generation = state.generations.get(&state.generation).unwrap();
let max_frame_no = generation.frames.len();
let status = DbSyncStatus {
baton: None,
status: "checkpoint_needed".to_string(),
generation: state.generation,
max_frame_no: max_frame_no as u64,
};
let status = serde_json::to_vec(&status)?;
completion.set_status(200);
self.ctx.faulty_call("wal_push_status").await?;
completion.push_data(status);
self.ctx.faulty_call("wal_push_push").await?;
completion.set_done();
return Ok(());
}
let baton_str = baton.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let session = match state.sessions.get(&baton_str) {
@@ -263,6 +285,31 @@ impl TestSyncServer {
pub fn db(&self) -> turso::Database {
self.db.clone()
}
pub async fn checkpoint(&self) -> Result<()> {
tracing::debug!("checkpoint sync-server db");
let conn = self.db.connect()?;
let mut rows = conn.query("PRAGMA wal_checkpoint(TRUNCATE)", ()).await?;
let Some(_) = rows.next().await? else {
return Err(Error::DatabaseSyncEngineError(
"checkpoint must return single row".to_string(),
));
};
let mut state = self.state.lock().await;
let generation = state.generation + 1;
state.generation = generation;
state.generations.insert(
generation,
Generation {
snapshot: std::fs::read(&self.path).map_err(|e| {
Error::DatabaseSyncEngineError(format!(
"failed to create generation snapshot: {e}"
))
})?,
frames: Vec::new(),
},
);
Ok(())
}
pub async fn execute(&self, sql: &str, params: impl turso::IntoParams) -> Result<()> {
let conn = self.db.connect()?;
conn.execute(sql, params).await?;
@@ -278,8 +325,8 @@ impl TestSyncServer {
let wal_frame_count = conn.wal_frame_count()?;
tracing::debug!("conn frames count: {}", wal_frame_count);
for frame_no in last_frame..=wal_frame_count as usize {
conn.wal_get_frame(frame_no as u64, &mut frame)?;
tracing::debug!("push local frame {}", frame_no);
let frame_info = conn.wal_get_frame(frame_no as u64, &mut frame)?;
tracing::debug!("push local frame {}, info={:?}", frame_no, frame_info);
generation.frames.push(frame.to_vec());
}
Ok(())