fix sync engine

This commit is contained in:
Nikita Sivukhin
2025-08-12 17:32:42 +04:00
parent 1d8ed9aa55
commit d3a27ee311
3 changed files with 126 additions and 40 deletions

View File

@@ -422,7 +422,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
#[cfg(test)]
pub mod tests {
use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};
use rand::RngCore;
@@ -563,6 +563,65 @@ pub mod tests {
});
}
#[test]
pub fn test_sync_single_db_many_pulls_big_payloads() {
deterministic_runtime(async || {
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let dir = tempfile::TempDir::new().unwrap();
let server_path = dir.path().join("server.db");
let ctx = Arc::new(TestContext::new(seed_u64()));
let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
.await
.unwrap();
let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone());
let local_path = dir.path().join("local.db");
let opts = DatabaseSyncEngineOpts {
client_name: "id-1".to_string(),
wal_pull_batch_size: 1,
};
runner.init(local_path, opts).await.unwrap();
protocol
.server
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)", ())
.await
.unwrap();
runner.pull().await.unwrap();
// create connection in outer scope in order to prevent Database from being dropped in between of pull operations
let conn = runner.connect().await.unwrap();
let mut expected = BTreeMap::new();
for attempt in 0..10 {
for _ in 0..5 {
let key = ctx.rng().await.next_u32();
let length = ctx.rng().await.next_u32() % (10 * 4096);
protocol
.server
.execute("INSERT INTO t VALUES (?, randomblob(?))", (key, length))
.await
.unwrap();
expected.insert(key as i64, length as i64);
}
tracing::info!("pull attempt={}", attempt);
runner.pull().await.unwrap();
let expected = expected
.iter()
.map(|(x, y)| vec![turso::Value::Integer(*x), turso::Value::Integer(*y)])
.collect::<Vec<_>>();
assert_eq!(
query_rows(&conn, "SELECT x, length(y) FROM t")
.await
.unwrap(),
expected
);
}
});
}
#[test]
pub fn test_sync_single_db_full_syncs() {
deterministic_runtime(async || {

View File

@@ -4,8 +4,8 @@ use turso_core::{types::Text, Buffer, Completion, LimboError, Value};
use crate::{
database_tape::{
exec_stmt, run_stmt, DatabaseChangesIteratorMode, DatabaseChangesIteratorOpts,
DatabaseReplaySessionOpts, DatabaseTape, DatabaseWalSession,
exec_stmt, run_stmt_expect_one_row, DatabaseChangesIteratorMode,
DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape, DatabaseWalSession,
},
errors::Error,
protocol_io::{DataCompletion, DataPollResult, ProtocolIO},
@@ -263,8 +263,8 @@ pub async fn transfer_logical_changes(
select_last_change_id_stmt
.bind_at(1.try_into().unwrap(), Value::Text(Text::new(client_id)));
match run_stmt(coro, &mut select_last_change_id_stmt).await? {
Some(row) => row.get_value(0).as_int().ok_or_else(|| {
match run_stmt_expect_one_row(coro, &mut select_last_change_id_stmt).await? {
Some(row) => row[0].as_int().ok_or_else(|| {
Error::DatabaseSyncEngineError("unexpected source pull_gen type".to_string())
})?,
None => {
@@ -284,12 +284,14 @@ pub async fn transfer_logical_changes(
let mut select_last_change_id_stmt = target_conn.prepare(TURSO_SYNC_SELECT_LAST_CHANGE_ID)?;
select_last_change_id_stmt.bind_at(1.try_into().unwrap(), Value::Text(Text::new(client_id)));
let mut last_change_id = match run_stmt(coro, &mut select_last_change_id_stmt).await? {
let mut last_change_id = match run_stmt_expect_one_row(coro, &mut select_last_change_id_stmt)
.await?
{
Some(row) => {
let target_pull_gen = row.get_value(0).as_int().ok_or_else(|| {
let target_pull_gen = row[0].as_int().ok_or_else(|| {
Error::DatabaseSyncEngineError("unexpected target pull_gen type".to_string())
})?;
let target_change_id = row.get_value(1).as_int().ok_or_else(|| {
let target_change_id = row[1].as_int().ok_or_else(|| {
Error::DatabaseSyncEngineError("unexpected target change_id type".to_string())
})?;
tracing::debug!(
@@ -321,6 +323,9 @@ pub async fn transfer_logical_changes(
let replay_opts = DatabaseReplaySessionOpts {
use_implicit_rowid: false,
};
let source_schema_cookie = source.connect_untracked()?.read_schema_version()?;
let mut session = target.start_replay_session(coro, replay_opts).await?;
let iterate_opts = DatabaseChangesIteratorOpts {
@@ -369,6 +374,13 @@ pub async fn transfer_logical_changes(
set_last_change_id_stmt
.bind_at(3.try_into().unwrap(), Value::Text(Text::new(client_id)));
exec_stmt(coro, &mut set_last_change_id_stmt).await?;
let session_schema_cookie = session.conn().read_schema_version()?;
if session_schema_cookie <= source_schema_cookie {
session
.conn()
.write_schema_version(source_schema_cookie + 1)?;
}
}
_ => {}
}
@@ -575,7 +587,7 @@ pub mod tests {
use crate::{
database_sync_operations::{transfer_logical_changes, transfer_physical_changes},
database_tape::{run_stmt, DatabaseTape, DatabaseTapeOpts},
database_tape::{run_stmt_once, DatabaseTape, DatabaseTapeOpts},
wal_session::WalSession,
Result,
};
@@ -605,7 +617,7 @@ pub mod tests {
let mut rows = Vec::new();
let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
@@ -622,7 +634,7 @@ pub mod tests {
let mut rows = Vec::new();
let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
@@ -701,7 +713,7 @@ pub mod tests {
let conn2 = db2.connect(&coro).await.unwrap();
let mut rows = Vec::new();
let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
@@ -716,7 +728,7 @@ pub mod tests {
conn2.execute("INSERT INTO t VALUES (7, 8)")?;
let mut rows = Vec::new();
let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(

View File

@@ -35,7 +35,7 @@ pub struct DatabaseTapeOpts {
pub cdc_mode: Option<String>,
}
pub(crate) async fn run_stmt<'a>(
pub(crate) async fn run_stmt_once<'a>(
coro: &'_ Coro,
stmt: &'a mut turso_core::Statement,
) -> Result<Option<&'a turso_core::Row>> {
@@ -60,6 +60,28 @@ pub(crate) async fn run_stmt<'a>(
}
}
pub(crate) async fn run_stmt_expect_one_row<'a>(
coro: &'_ Coro,
stmt: &'a mut turso_core::Statement,
) -> Result<Option<Vec<turso_core::Value>>> {
let Some(row) = run_stmt_once(coro, stmt).await? else {
return Ok(None);
};
let values = row.get_values().cloned().collect();
let None = run_stmt_once(coro, stmt).await? else {
return Err(Error::DatabaseTapeError("single row expected".to_string()));
};
return Ok(Some(values));
}
pub(crate) async fn run_stmt_ignore_rows(
coro: &Coro,
stmt: &mut turso_core::Statement,
) -> Result<()> {
while let Some(_) = run_stmt_once(coro, stmt).await? {}
Ok(())
}
pub(crate) async fn exec_stmt(coro: &Coro, stmt: &mut turso_core::Statement) -> Result<()> {
loop {
match stmt.step()? {
@@ -109,7 +131,7 @@ impl DatabaseTape {
let connection = self.inner.connect()?;
tracing::debug!("set '{CDC_PRAGMA_NAME}' for new connection");
let mut stmt = connection.prepare(&self.pragma_query)?;
run_stmt(coro, &mut stmt).await?;
run_stmt_ignore_rows(coro, &mut stmt).await?;
Ok(connection)
}
/// Builds an iterator which emits [DatabaseTapeOperation] by extracting data from CDC table
@@ -168,7 +190,7 @@ impl DatabaseWalSession {
let conn = wal_session.conn();
let frames_count = conn.wal_frame_count()?;
let mut page_size_stmt = conn.prepare("PRAGMA page_size")?;
let Some(row) = run_stmt(coro, &mut page_size_stmt).await? else {
let Some(row) = run_stmt_expect_one_row(coro, &mut page_size_stmt).await? else {
return Err(Error::DatabaseTapeError(
"unable to get database page size".to_string(),
));
@@ -178,18 +200,11 @@ impl DatabaseWalSession {
"unexpected columns count for PRAGMA page_size query".to_string(),
));
}
let turso_core::Value::Integer(page_size) = row.get_value(0) else {
let turso_core::Value::Integer(page_size) = row[0] else {
return Err(Error::DatabaseTapeError(
"unexpected column type for PRAGMA page_size query".to_string(),
));
};
let page_size = *page_size;
let None = run_stmt(coro, &mut page_size_stmt).await? else {
return Err(Error::DatabaseTapeError(
"page size pragma returned multiple rows".to_string(),
));
};
Ok(Self {
page_size: page_size as usize,
next_wal_frame_no: frames_count + 1,
@@ -376,7 +391,7 @@ impl DatabaseChangesIterator {
turso_core::Value::Integer(change_id_filter),
);
while let Some(row) = run_stmt(coro, &mut self.query_stmt).await? {
while let Some(row) = run_stmt_once(coro, &mut self.query_stmt).await? {
let database_change: DatabaseChange = row.try_into()?;
let tape_change = match self.mode {
DatabaseChangesIteratorMode::Apply => database_change.into_apply()?,
@@ -432,7 +447,7 @@ impl DatabaseReplaySession {
DatabaseTapeOperation::RowChange(change) => {
if !self.in_txn {
tracing::trace!("replay: start txn for replaying changes");
self.conn.execute("BEGIN")?;
self.conn.execute("BEGIN IMMEDIATE")?;
self.in_txn = true;
}
tracing::trace!("replay: change={:?}", change);
@@ -696,7 +711,7 @@ impl DatabaseReplaySession {
"SELECT name FROM pragma_table_info('{table_name}')"
))?;
let mut column_names = Vec::with_capacity(columns + 1);
while let Some(column) = run_stmt(coro, &mut table_info_stmt).await? {
while let Some(column) = run_stmt_once(coro, &mut table_info_stmt).await? {
let turso_core::Value::Text(text) = column.get_value(0) else {
return Err(Error::DatabaseTapeError(
"unexpected column type for pragma_table_info query".to_string(),
@@ -721,7 +736,7 @@ impl DatabaseReplaySession {
))?;
let mut pk_predicates = Vec::with_capacity(1);
let mut pk_column_indices = Vec::with_capacity(1);
while let Some(column) = run_stmt(coro, &mut pk_info_stmt).await? {
while let Some(column) = run_stmt_once(coro, &mut pk_info_stmt).await? {
let turso_core::Value::Integer(column_id) = column.get_value(0) else {
return Err(Error::DatabaseTapeError(
"unexpected column type for pragma_table_info query".to_string(),
@@ -764,7 +779,7 @@ impl DatabaseReplaySession {
let mut pk_predicates = Vec::with_capacity(1);
let mut pk_column_indices = Vec::with_capacity(1);
let mut column_updates = Vec::with_capacity(1);
while let Some(column) = run_stmt(coro, &mut table_info_stmt).await? {
while let Some(column) = run_stmt_once(coro, &mut table_info_stmt).await? {
let turso_core::Value::Integer(column_id) = column.get_value(0) else {
return Err(Error::DatabaseTapeError(
"unexpected column type for pragma_table_info query".to_string(),
@@ -836,7 +851,7 @@ mod tests {
use crate::{
database_tape::{
run_stmt, DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape,
run_stmt_once, DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape,
},
types::{DatabaseTapeOperation, DatabaseTapeRowChange, DatabaseTapeRowChangeType},
};
@@ -855,7 +870,7 @@ mod tests {
let conn = db1.connect(&coro).await.unwrap();
let mut stmt = conn.prepare("SELECT * FROM turso_cdc").unwrap();
let mut rows = Vec::new();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
rows
@@ -977,7 +992,7 @@ mod tests {
}
let mut stmt = conn2.prepare("SELECT rowid, x FROM t").unwrap();
let mut rows = Vec::new();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
rows
@@ -1055,7 +1070,7 @@ mod tests {
}
let mut stmt = conn2.prepare("SELECT rowid, x FROM t").unwrap();
let mut rows = Vec::new();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
rows
@@ -1124,7 +1139,7 @@ mod tests {
}
let mut stmt = conn2.prepare("SELECT rowid, x FROM t").unwrap();
let mut rows = Vec::new();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
rows
@@ -1205,7 +1220,7 @@ mod tests {
}
let mut rows = Vec::new();
let mut stmt = conn3.prepare("SELECT rowid, x, y FROM t").unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
@@ -1219,7 +1234,7 @@ mod tests {
let mut rows = Vec::new();
let mut stmt = conn3.prepare("SELECT rowid, x, y FROM q").unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
@@ -1236,7 +1251,7 @@ mod tests {
"SELECT * FROM sqlite_schema WHERE name != 'turso_cdc' AND type = 'table'",
)
.unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
@@ -1319,7 +1334,7 @@ mod tests {
let mut stmt = conn2
.prepare("SELECT * FROM sqlite_schema WHERE name IN ('t', 't_idx')")
.unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
@@ -1403,7 +1418,7 @@ mod tests {
let mut stmt = conn2
.prepare("SELECT * FROM sqlite_schema WHERE name IN ('t')")
.unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
@@ -1501,7 +1516,7 @@ mod tests {
}
let mut rows = Vec::new();
let mut stmt = conn3.prepare("SELECT * FROM t").unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(