diff --git a/core/storage/wal.rs b/core/storage/wal.rs index a14236fd6..154f1f3f3 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -918,7 +918,11 @@ impl Wal for WalFile { page: PageRef, buffer_pool: Arc, ) -> Result { - tracing::debug!("read_frame({})", frame_id); + tracing::debug!( + "read_frame(page_idx = {}, frame_id = {})", + page.get().id, + frame_id + ); let offset = self.frame_offset(frame_id); page.set_locked(); let frame = page.clone(); @@ -969,6 +973,9 @@ impl Wal for WalFile { db_size: u64, page: &[u8], ) -> Result<()> { + if self.get_max_frame_in_wal() == 0 { + self.ensure_header_if_needed()?; + } tracing::debug!("write_raw_frame({})", frame_id); if page.len() != self.page_size() as usize { return Err(LimboError::InvalidArgument(format!( diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index 48a021ae8..dafdfbc9b 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use crate::{ database_sync_operations::{ - db_bootstrap, reset_wal_file, transfer_logical_changes, transfer_physical_changes, - wait_full_body, wal_pull, wal_push, WalPullResult, + checkpoint_wal_file, db_bootstrap, reset_wal_file, transfer_logical_changes, + transfer_physical_changes, wait_full_body, wal_pull, wal_push, WalPullResult, }, errors::Error, io_operations::IoOperations, @@ -107,58 +107,87 @@ impl DatabaseSyncEngine { // reset Synced DB if it wasn't properly cleaned-up on previous "sync-method" attempt self.reset_synced_if_dirty(coro).await?; - // update Synced DB with fresh changes from remote - self.pull_synced_from_remote(coro).await?; + loop { + // update Synced DB with fresh changes from remote + let pull_result = self.pull_synced_from_remote(coro).await?; - // we will "replay" Synced WAL to the Draft WAL later without pushing it to the remote - // so, we pass 'capture: true' as we need to preserve all changes for future push of WAL - let draft = self.io.open_tape(&self.draft_path, true)?; - let synced = self.io.open_tape(&self.synced_path, true)?; + { + // we will "replay" Synced WAL to the Draft WAL later without pushing it to the remote + // so, we pass 'capture: true' as we need to preserve all changes for future push of WAL + let draft = self.io.open_tape(&self.draft_path, true)?; + tracing::info!("opened draft"); + let synced = self.io.open_tape(&self.synced_path, true)?; + tracing::info!("opened synced"); - { - // we will start wal write session for Draft DB in order to hold write lock during transfer of changes - let mut draft_session = WalSession::new(draft.connect(coro).await?); - draft_session.begin()?; + // we will start wal write session for Draft DB in order to hold write lock during transfer of changes + let mut draft_session = WalSession::new(draft.connect(coro).await?); + draft_session.begin()?; - // mark Synced as dirty as we will start transfer of logical changes there and if we will fail in the middle - we will need to cleanup Synced db - self.synced_is_dirty = true; + // mark Synced as dirty as we will start transfer of logical changes there and if we will fail in the middle - we will need to cleanup Synced db + self.synced_is_dirty = true; - // transfer logical changes to the Synced DB in order to later execute physical "rebase" operation - let client_id = &self.meta().client_unique_id; - transfer_logical_changes(coro, &draft, &synced, client_id, true).await?; + // transfer logical changes to the Synced DB in order to later execute physical "rebase" operation + let client_id = &self.meta().client_unique_id; + transfer_logical_changes(coro, &draft, &synced, client_id, true).await?; - // now we are ready to do the rebase: let's transfer physical changes from Synced to Draft - let synced_wal_watermark = self.meta().synced_wal_match_watermark; - let synced_sync_watermark = self.meta().synced_frame_no.expect( - "synced_frame_no must be set as we call pull_synced_from_remote before that", + // now we are ready to do the rebase: let's transfer physical changes from Synced to Draft + let synced_wal_watermark = self.meta().synced_wal_match_watermark; + let synced_sync_watermark = self.meta().synced_frame_no.expect( + "synced_frame_no must be set as we call pull_synced_from_remote before that", + ); + let draft_wal_watermark = self.meta().draft_wal_match_watermark; + let draft_sync_watermark = transfer_physical_changes( + coro, + &synced, + draft_session, + synced_wal_watermark, + synced_sync_watermark, + draft_wal_watermark, + ) + .await?; + update_meta( + coro, + self.protocol.as_ref(), + &self.meta_path, + &mut self.meta, + |m| { + m.draft_wal_match_watermark = draft_sync_watermark; + m.synced_wal_match_watermark = synced_sync_watermark; + }, + ) + .await?; + } + + // Synced DB is 100% dirty now - let's reset it + assert!(self.synced_is_dirty); + self.reset_synced_if_dirty(coro).await?; + + let WalPullResult::NeedCheckpoint = pull_result else { + break; + }; + tracing::debug!( + "ready to checkpoint synced db file at {:?}, generation={}", + self.synced_path, + self.meta().synced_generation ); - let draft_wal_watermark = self.meta().draft_wal_match_watermark; - let draft_sync_watermark = transfer_physical_changes( - coro, - &synced, - draft_session, - synced_wal_watermark, - synced_sync_watermark, - draft_wal_watermark, - ) - .await?; - update_meta( - coro, - self.protocol.as_ref(), - &self.meta_path, - &mut self.meta, - |m| { - m.draft_wal_match_watermark = draft_sync_watermark; - m.synced_wal_match_watermark = synced_sync_watermark; - }, - ) - .await?; + { + let synced = self.io.open_tape(&self.synced_path, false)?; + checkpoint_wal_file(coro, &synced.connect_untracked()?).await?; + update_meta( + coro, + self.protocol.as_ref(), + &self.meta_path, + &mut self.meta, + |m| { + m.synced_generation += 1; + m.synced_frame_no = Some(0); + m.synced_wal_match_watermark = 0; + }, + ) + .await?; + } } - // Synced DB is 100% dirty now - let's reset it - assert!(self.synced_is_dirty); - self.reset_synced_if_dirty(coro).await?; - Ok(()) } @@ -251,7 +280,7 @@ impl DatabaseSyncEngine { Ok(()) } - async fn pull_synced_from_remote(&mut self, coro: &Coro) -> Result<()> { + async fn pull_synced_from_remote(&mut self, coro: &Coro) -> Result { tracing::debug!( "pull_synced_from_remote: draft={:?}, synced={:?}", self.draft_path, @@ -286,16 +315,12 @@ impl DatabaseSyncEngine { ) .await? { - WalPullResult::Done => return Ok(()), + WalPullResult::Done => return Ok(WalPullResult::Done), + WalPullResult::NeedCheckpoint => return Ok(WalPullResult::NeedCheckpoint), WalPullResult::PullMore => { start_frame = end_frame; continue; } - WalPullResult::NeedCheckpoint => { - return Err(Error::DatabaseSyncEngineError( - "checkpoint is temporary not supported".to_string(), - )); - } } } } @@ -579,6 +604,7 @@ pub mod tests { client_name: "id-1".to_string(), wal_pull_batch_size: 1, }; + runner.init(local_path, opts).await.unwrap(); protocol @@ -622,6 +648,70 @@ pub mod tests { }); } + #[test] + pub fn test_sync_single_db_checkpoint() { + deterministic_runtime(async || { + let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); + let dir = tempfile::TempDir::new().unwrap(); + let server_path = dir.path().join("server.db"); + let ctx = Arc::new(TestContext::new(seed_u64())); + let protocol = TestProtocolIo::new(ctx.clone(), &server_path) + .await + .unwrap(); + let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); + let local_path = dir.path().join("local.db"); + let opts = DatabaseSyncEngineOpts { + client_name: "id-1".to_string(), + wal_pull_batch_size: 1, + }; + runner.init(local_path, opts).await.unwrap(); + + protocol + .server + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)", ()) + .await + .unwrap(); + protocol + .server + .execute("INSERT INTO t VALUES (1, randomblob(4 * 4096))", ()) + .await + .unwrap(); + protocol.server.checkpoint().await.unwrap(); + protocol + .server + .execute("INSERT INTO t VALUES (2, randomblob(5 * 4096))", ()) + .await + .unwrap(); + protocol.server.checkpoint().await.unwrap(); + protocol + .server + .execute("INSERT INTO t VALUES (3, randomblob(6 * 4096))", ()) + .await + .unwrap(); + protocol + .server + .execute("INSERT INTO t VALUES (4, randomblob(7 * 4096))", ()) + .await + .unwrap(); + + let conn = runner.connect().await.unwrap(); + + runner.pull().await.unwrap(); + + assert_eq!( + query_rows(&conn, "SELECT x, length(y) FROM t") + .await + .unwrap(), + vec![ + vec![turso::Value::Integer(1), turso::Value::Integer(4 * 4096)], + vec![turso::Value::Integer(2), turso::Value::Integer(5 * 4096)], + vec![turso::Value::Integer(3), turso::Value::Integer(6 * 4096)], + vec![turso::Value::Integer(4), turso::Value::Integer(7 * 4096)], + ] + ); + }); + } + #[test] pub fn test_sync_single_db_full_syncs() { deterministic_runtime(async || { diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index 2837fc1a8..3de1a4ace 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -30,6 +30,11 @@ pub enum WalPullResult { NeedCheckpoint, } +pub enum WalPushResult { + Ok { baton: Option }, + NeedCheckpoint, +} + /// Bootstrap multiple DB files from latest generation from remote pub async fn db_bootstrap( coro: &Coro, @@ -186,12 +191,12 @@ pub async fn wal_push( generation: u64, start_frame: u64, end_frame: u64, -) -> Result> { +) -> Result { assert!(wal_session.in_txn()); tracing::debug!("wal_push: baton={baton:?}, generation={generation}, start_frame={start_frame}, end_frame={end_frame}"); if start_frame == end_frame { - return Ok(None); + return Ok(WalPushResult::Ok { baton: None }); } let mut frames_data = Vec::with_capacity((end_frame - start_frame) as usize * WAL_FRAME_SIZE); @@ -216,17 +221,21 @@ pub async fn wal_push( frames_data, ) .await?; - if status.status == "conflict" { - return Err(Error::DatabaseSyncEngineConflict(format!( + if status.status == "ok" { + Ok(WalPushResult::Ok { + baton: status.baton, + }) + } else if status.status == "checkpoint_needed" { + Ok(WalPushResult::NeedCheckpoint) + } else if status.status == "conflict" { + Err(Error::DatabaseSyncEngineConflict(format!( "wal_push conflict: {status:?}" - ))); - } - if status.status != "ok" { - return Err(Error::DatabaseSyncEngineError(format!( + ))) + } else { + Err(Error::DatabaseSyncEngineError(format!( "wal_push unexpected status: {status:?}" - ))); + ))) } - Ok(status.baton) } const TURSO_SYNC_META_TABLE: &str = @@ -334,11 +343,13 @@ pub async fn transfer_logical_changes( ignore_schema_changes: false, ..Default::default() }; + let mut rows_changed = 0; let mut changes = source.iterate_changes(iterate_opts)?; let mut updated = false; while let Some(operation) = changes.next(coro).await? { match &operation { DatabaseTapeOperation::RowChange(change) => { + rows_changed += 1; assert!( last_change_id.is_none() || last_change_id.unwrap() < change.change_id, "change id must be strictly increasing: last_change_id={:?}, change.change_id={}", @@ -387,6 +398,7 @@ pub async fn transfer_logical_changes( session.replay(coro, operation).await?; } + tracing::debug!("transfer_logical_changes: rows_changed={:?}", rows_changed); Ok(()) } @@ -450,6 +462,23 @@ pub async fn transfer_physical_changes( Ok(target_sync_watermark) } +pub async fn checkpoint_wal_file(coro: &Coro, conn: &Arc) -> Result<()> { + let mut checkpoint_stmt = conn.prepare("PRAGMA wal_checkpoint(TRUNCATE)")?; + loop { + match checkpoint_stmt.step()? { + turso_core::StepResult::IO => coro.yield_(ProtocolCommand::IO).await?, + turso_core::StepResult::Done => break, + turso_core::StepResult::Row => continue, + r => { + return Err(Error::DatabaseSyncEngineError(format!( + "unexepcted checkpoint result: {r:?}" + ))) + } + } + } + Ok(()) +} + pub async fn reset_wal_file( coro: &Coro, wal: Arc, diff --git a/packages/turso-sync-engine/src/test_sync_server.rs b/packages/turso-sync-engine/src/test_sync_server.rs index ccfbc1eb6..9ad902df5 100644 --- a/packages/turso-sync-engine/src/test_sync_server.rs +++ b/packages/turso-sync-engine/src/test_sync_server.rs @@ -1,4 +1,8 @@ -use std::{collections::HashMap, path::Path, sync::Arc}; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::Arc, +}; use tokio::sync::Mutex; @@ -33,6 +37,7 @@ struct TestSyncServerState { #[derive(Clone)] pub struct TestSyncServer { + path: PathBuf, ctx: Arc, db: turso::Database, state: Arc>, @@ -49,6 +54,7 @@ impl TestSyncServer { }, ); Ok(Self { + path: path.to_path_buf(), ctx, db: turso::Builder::new_local(path.to_str().unwrap()) .build() @@ -179,9 +185,25 @@ impl TestSyncServer { let mut session = { let mut state = self.state.lock().await; if state.generation != generation_id { - return Err(Error::DatabaseSyncEngineError( - "generation id mismatch".to_string(), - )); + let generation = state.generations.get(&state.generation).unwrap(); + let max_frame_no = generation.frames.len(); + let status = DbSyncStatus { + baton: None, + status: "checkpoint_needed".to_string(), + generation: state.generation, + max_frame_no: max_frame_no as u64, + }; + + let status = serde_json::to_vec(&status)?; + + completion.set_status(200); + self.ctx.faulty_call("wal_push_status").await?; + + completion.push_data(status); + self.ctx.faulty_call("wal_push_push").await?; + + completion.set_done(); + return Ok(()); } let baton_str = baton.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); let session = match state.sessions.get(&baton_str) { @@ -263,6 +285,31 @@ impl TestSyncServer { pub fn db(&self) -> turso::Database { self.db.clone() } + pub async fn checkpoint(&self) -> Result<()> { + tracing::debug!("checkpoint sync-server db"); + let conn = self.db.connect()?; + let mut rows = conn.query("PRAGMA wal_checkpoint(TRUNCATE)", ()).await?; + let Some(_) = rows.next().await? else { + return Err(Error::DatabaseSyncEngineError( + "checkpoint must return single row".to_string(), + )); + }; + let mut state = self.state.lock().await; + let generation = state.generation + 1; + state.generation = generation; + state.generations.insert( + generation, + Generation { + snapshot: std::fs::read(&self.path).map_err(|e| { + Error::DatabaseSyncEngineError(format!( + "failed to create generation snapshot: {e}" + )) + })?, + frames: Vec::new(), + }, + ); + Ok(()) + } pub async fn execute(&self, sql: &str, params: impl turso::IntoParams) -> Result<()> { let conn = self.db.connect()?; conn.execute(sql, params).await?; @@ -278,8 +325,8 @@ impl TestSyncServer { let wal_frame_count = conn.wal_frame_count()?; tracing::debug!("conn frames count: {}", wal_frame_count); for frame_no in last_frame..=wal_frame_count as usize { - conn.wal_get_frame(frame_no as u64, &mut frame)?; - tracing::debug!("push local frame {}", frame_no); + let frame_info = conn.wal_get_frame(frame_no as u64, &mut frame)?; + tracing::debug!("push local frame {}, info={:?}", frame_no, frame_info); generation.frames.push(frame.to_vec()); } Ok(())