diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index e85a98b0a..48a021ae8 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -422,7 +422,7 @@ impl DatabaseSyncEngine { #[cfg(test)] pub mod tests { - use std::sync::Arc; + use std::{collections::BTreeMap, sync::Arc}; use rand::RngCore; @@ -563,6 +563,65 @@ pub mod tests { }); } + #[test] + pub fn test_sync_single_db_many_pulls_big_payloads() { + deterministic_runtime(async || { + let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); + let dir = tempfile::TempDir::new().unwrap(); + let server_path = dir.path().join("server.db"); + let ctx = Arc::new(TestContext::new(seed_u64())); + let protocol = TestProtocolIo::new(ctx.clone(), &server_path) + .await + .unwrap(); + let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); + let local_path = dir.path().join("local.db"); + let opts = DatabaseSyncEngineOpts { + client_name: "id-1".to_string(), + wal_pull_batch_size: 1, + }; + runner.init(local_path, opts).await.unwrap(); + + protocol + .server + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)", ()) + .await + .unwrap(); + + runner.pull().await.unwrap(); + + // create connection in outer scope in order to prevent Database from being dropped in between of pull operations + let conn = runner.connect().await.unwrap(); + + let mut expected = BTreeMap::new(); + for attempt in 0..10 { + for _ in 0..5 { + let key = ctx.rng().await.next_u32(); + let length = ctx.rng().await.next_u32() % (10 * 4096); + protocol + .server + .execute("INSERT INTO t VALUES (?, randomblob(?))", (key, length)) + .await + .unwrap(); + expected.insert(key as i64, length as i64); + } + + tracing::info!("pull attempt={}", attempt); + runner.pull().await.unwrap(); + + let expected = expected + .iter() + .map(|(x, y)| vec![turso::Value::Integer(*x), turso::Value::Integer(*y)]) + .collect::>(); + assert_eq!( + query_rows(&conn, "SELECT x, length(y) FROM t") + .await + .unwrap(), + expected + ); + } + }); + } + #[test] pub fn test_sync_single_db_full_syncs() { deterministic_runtime(async || { diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index f8667c5ab..2837fc1a8 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -4,8 +4,8 @@ use turso_core::{types::Text, Buffer, Completion, LimboError, Value}; use crate::{ database_tape::{ - exec_stmt, run_stmt, DatabaseChangesIteratorMode, DatabaseChangesIteratorOpts, - DatabaseReplaySessionOpts, DatabaseTape, DatabaseWalSession, + exec_stmt, run_stmt_expect_one_row, DatabaseChangesIteratorMode, + DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape, DatabaseWalSession, }, errors::Error, protocol_io::{DataCompletion, DataPollResult, ProtocolIO}, @@ -263,8 +263,8 @@ pub async fn transfer_logical_changes( select_last_change_id_stmt .bind_at(1.try_into().unwrap(), Value::Text(Text::new(client_id))); - match run_stmt(coro, &mut select_last_change_id_stmt).await? { - Some(row) => row.get_value(0).as_int().ok_or_else(|| { + match run_stmt_expect_one_row(coro, &mut select_last_change_id_stmt).await? { + Some(row) => row[0].as_int().ok_or_else(|| { Error::DatabaseSyncEngineError("unexpected source pull_gen type".to_string()) })?, None => { @@ -284,12 +284,14 @@ pub async fn transfer_logical_changes( let mut select_last_change_id_stmt = target_conn.prepare(TURSO_SYNC_SELECT_LAST_CHANGE_ID)?; select_last_change_id_stmt.bind_at(1.try_into().unwrap(), Value::Text(Text::new(client_id))); - let mut last_change_id = match run_stmt(coro, &mut select_last_change_id_stmt).await? { + let mut last_change_id = match run_stmt_expect_one_row(coro, &mut select_last_change_id_stmt) + .await? + { Some(row) => { - let target_pull_gen = row.get_value(0).as_int().ok_or_else(|| { + let target_pull_gen = row[0].as_int().ok_or_else(|| { Error::DatabaseSyncEngineError("unexpected target pull_gen type".to_string()) })?; - let target_change_id = row.get_value(1).as_int().ok_or_else(|| { + let target_change_id = row[1].as_int().ok_or_else(|| { Error::DatabaseSyncEngineError("unexpected target change_id type".to_string()) })?; tracing::debug!( @@ -321,6 +323,9 @@ pub async fn transfer_logical_changes( let replay_opts = DatabaseReplaySessionOpts { use_implicit_rowid: false, }; + + let source_schema_cookie = source.connect_untracked()?.read_schema_version()?; + let mut session = target.start_replay_session(coro, replay_opts).await?; let iterate_opts = DatabaseChangesIteratorOpts { @@ -369,6 +374,13 @@ pub async fn transfer_logical_changes( set_last_change_id_stmt .bind_at(3.try_into().unwrap(), Value::Text(Text::new(client_id))); exec_stmt(coro, &mut set_last_change_id_stmt).await?; + + let session_schema_cookie = session.conn().read_schema_version()?; + if session_schema_cookie <= source_schema_cookie { + session + .conn() + .write_schema_version(source_schema_cookie + 1)?; + } } _ => {} } @@ -575,7 +587,7 @@ pub mod tests { use crate::{ database_sync_operations::{transfer_logical_changes, transfer_physical_changes}, - database_tape::{run_stmt, DatabaseTape, DatabaseTapeOpts}, + database_tape::{run_stmt_once, DatabaseTape, DatabaseTapeOpts}, wal_session::WalSession, Result, }; @@ -605,7 +617,7 @@ pub mod tests { let mut rows = Vec::new(); let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap(); - while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() { rows.push(row.get_values().cloned().collect::>()); } assert_eq!( @@ -622,7 +634,7 @@ pub mod tests { let mut rows = Vec::new(); let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap(); - while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() { rows.push(row.get_values().cloned().collect::>()); } assert_eq!( @@ -701,7 +713,7 @@ pub mod tests { let conn2 = db2.connect(&coro).await.unwrap(); let mut rows = Vec::new(); let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap(); - while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() { rows.push(row.get_values().cloned().collect::>()); } assert_eq!( @@ -716,7 +728,7 @@ pub mod tests { conn2.execute("INSERT INTO t VALUES (7, 8)")?; let mut rows = Vec::new(); let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap(); - while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() { rows.push(row.get_values().cloned().collect::>()); } assert_eq!( diff --git a/packages/turso-sync-engine/src/database_tape.rs b/packages/turso-sync-engine/src/database_tape.rs index 83255f47f..ec9eac976 100644 --- a/packages/turso-sync-engine/src/database_tape.rs +++ b/packages/turso-sync-engine/src/database_tape.rs @@ -35,7 +35,7 @@ pub struct DatabaseTapeOpts { pub cdc_mode: Option, } -pub(crate) async fn run_stmt<'a>( +pub(crate) async fn run_stmt_once<'a>( coro: &'_ Coro, stmt: &'a mut turso_core::Statement, ) -> Result> { @@ -60,6 +60,28 @@ pub(crate) async fn run_stmt<'a>( } } +pub(crate) async fn run_stmt_expect_one_row<'a>( + coro: &'_ Coro, + stmt: &'a mut turso_core::Statement, +) -> Result>> { + let Some(row) = run_stmt_once(coro, stmt).await? else { + return Ok(None); + }; + let values = row.get_values().cloned().collect(); + let None = run_stmt_once(coro, stmt).await? else { + return Err(Error::DatabaseTapeError("single row expected".to_string())); + }; + return Ok(Some(values)); +} + +pub(crate) async fn run_stmt_ignore_rows( + coro: &Coro, + stmt: &mut turso_core::Statement, +) -> Result<()> { + while let Some(_) = run_stmt_once(coro, stmt).await? {} + Ok(()) +} + pub(crate) async fn exec_stmt(coro: &Coro, stmt: &mut turso_core::Statement) -> Result<()> { loop { match stmt.step()? { @@ -109,7 +131,7 @@ impl DatabaseTape { let connection = self.inner.connect()?; tracing::debug!("set '{CDC_PRAGMA_NAME}' for new connection"); let mut stmt = connection.prepare(&self.pragma_query)?; - run_stmt(coro, &mut stmt).await?; + run_stmt_ignore_rows(coro, &mut stmt).await?; Ok(connection) } /// Builds an iterator which emits [DatabaseTapeOperation] by extracting data from CDC table @@ -168,7 +190,7 @@ impl DatabaseWalSession { let conn = wal_session.conn(); let frames_count = conn.wal_frame_count()?; let mut page_size_stmt = conn.prepare("PRAGMA page_size")?; - let Some(row) = run_stmt(coro, &mut page_size_stmt).await? else { + let Some(row) = run_stmt_expect_one_row(coro, &mut page_size_stmt).await? else { return Err(Error::DatabaseTapeError( "unable to get database page size".to_string(), )); @@ -178,18 +200,11 @@ impl DatabaseWalSession { "unexpected columns count for PRAGMA page_size query".to_string(), )); } - let turso_core::Value::Integer(page_size) = row.get_value(0) else { + let turso_core::Value::Integer(page_size) = row[0] else { return Err(Error::DatabaseTapeError( "unexpected column type for PRAGMA page_size query".to_string(), )); }; - let page_size = *page_size; - let None = run_stmt(coro, &mut page_size_stmt).await? else { - return Err(Error::DatabaseTapeError( - "page size pragma returned multiple rows".to_string(), - )); - }; - Ok(Self { page_size: page_size as usize, next_wal_frame_no: frames_count + 1, @@ -376,7 +391,7 @@ impl DatabaseChangesIterator { turso_core::Value::Integer(change_id_filter), ); - while let Some(row) = run_stmt(coro, &mut self.query_stmt).await? { + while let Some(row) = run_stmt_once(coro, &mut self.query_stmt).await? { let database_change: DatabaseChange = row.try_into()?; let tape_change = match self.mode { DatabaseChangesIteratorMode::Apply => database_change.into_apply()?, @@ -432,7 +447,7 @@ impl DatabaseReplaySession { DatabaseTapeOperation::RowChange(change) => { if !self.in_txn { tracing::trace!("replay: start txn for replaying changes"); - self.conn.execute("BEGIN")?; + self.conn.execute("BEGIN IMMEDIATE")?; self.in_txn = true; } tracing::trace!("replay: change={:?}", change); @@ -696,7 +711,7 @@ impl DatabaseReplaySession { "SELECT name FROM pragma_table_info('{table_name}')" ))?; let mut column_names = Vec::with_capacity(columns + 1); - while let Some(column) = run_stmt(coro, &mut table_info_stmt).await? { + while let Some(column) = run_stmt_once(coro, &mut table_info_stmt).await? { let turso_core::Value::Text(text) = column.get_value(0) else { return Err(Error::DatabaseTapeError( "unexpected column type for pragma_table_info query".to_string(), @@ -721,7 +736,7 @@ impl DatabaseReplaySession { ))?; let mut pk_predicates = Vec::with_capacity(1); let mut pk_column_indices = Vec::with_capacity(1); - while let Some(column) = run_stmt(coro, &mut pk_info_stmt).await? { + while let Some(column) = run_stmt_once(coro, &mut pk_info_stmt).await? { let turso_core::Value::Integer(column_id) = column.get_value(0) else { return Err(Error::DatabaseTapeError( "unexpected column type for pragma_table_info query".to_string(), @@ -764,7 +779,7 @@ impl DatabaseReplaySession { let mut pk_predicates = Vec::with_capacity(1); let mut pk_column_indices = Vec::with_capacity(1); let mut column_updates = Vec::with_capacity(1); - while let Some(column) = run_stmt(coro, &mut table_info_stmt).await? { + while let Some(column) = run_stmt_once(coro, &mut table_info_stmt).await? { let turso_core::Value::Integer(column_id) = column.get_value(0) else { return Err(Error::DatabaseTapeError( "unexpected column type for pragma_table_info query".to_string(), @@ -836,7 +851,7 @@ mod tests { use crate::{ database_tape::{ - run_stmt, DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape, + run_stmt_once, DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape, }, types::{DatabaseTapeOperation, DatabaseTapeRowChange, DatabaseTapeRowChangeType}, }; @@ -855,7 +870,7 @@ mod tests { let conn = db1.connect(&coro).await.unwrap(); let mut stmt = conn.prepare("SELECT * FROM turso_cdc").unwrap(); let mut rows = Vec::new(); - while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() { rows.push(row.get_values().cloned().collect::>()); } rows @@ -977,7 +992,7 @@ mod tests { } let mut stmt = conn2.prepare("SELECT rowid, x FROM t").unwrap(); let mut rows = Vec::new(); - while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() { rows.push(row.get_values().cloned().collect::>()); } rows @@ -1055,7 +1070,7 @@ mod tests { } let mut stmt = conn2.prepare("SELECT rowid, x FROM t").unwrap(); let mut rows = Vec::new(); - while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() { rows.push(row.get_values().cloned().collect::>()); } rows @@ -1124,7 +1139,7 @@ mod tests { } let mut stmt = conn2.prepare("SELECT rowid, x FROM t").unwrap(); let mut rows = Vec::new(); - while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() { rows.push(row.get_values().cloned().collect::>()); } rows @@ -1205,7 +1220,7 @@ mod tests { } let mut rows = Vec::new(); let mut stmt = conn3.prepare("SELECT rowid, x, y FROM t").unwrap(); - while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() { rows.push(row.get_values().cloned().collect::>()); } assert_eq!( @@ -1219,7 +1234,7 @@ mod tests { let mut rows = Vec::new(); let mut stmt = conn3.prepare("SELECT rowid, x, y FROM q").unwrap(); - while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() { rows.push(row.get_values().cloned().collect::>()); } assert_eq!( @@ -1236,7 +1251,7 @@ mod tests { "SELECT * FROM sqlite_schema WHERE name != 'turso_cdc' AND type = 'table'", ) .unwrap(); - while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() { rows.push(row.get_values().cloned().collect::>()); } assert_eq!( @@ -1319,7 +1334,7 @@ mod tests { let mut stmt = conn2 .prepare("SELECT * FROM sqlite_schema WHERE name IN ('t', 't_idx')") .unwrap(); - while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() { rows.push(row.get_values().cloned().collect::>()); } assert_eq!( @@ -1403,7 +1418,7 @@ mod tests { let mut stmt = conn2 .prepare("SELECT * FROM sqlite_schema WHERE name IN ('t')") .unwrap(); - while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() { rows.push(row.get_values().cloned().collect::>()); } assert_eq!( @@ -1501,7 +1516,7 @@ mod tests { } let mut rows = Vec::new(); let mut stmt = conn3.prepare("SELECT * FROM t").unwrap(); - while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() { rows.push(row.get_values().cloned().collect::>()); } assert_eq!(