Merge 'turso-sync: fix schema bug' from Nikita Sivukhin

This PR fixes few issues in turso-sync-engine implementation:
1. One step of `pull` implementation works like this:
   a. Start write WAL session
   b. Revert local changes in WAL
   c. Replay WAL frames from remote DB
   d. Replay WAL frames produced by local changes applied to the remote
DB copy (`synced`)
My initial thinking was that by executing step (d) we will get the same
schema as before (with same schema cookie) and everything will be fine.
With more deep thinking we can see that it's not fine - as after step
(d) tables created locally can change their root pages (if they were
created locally, for example) - and DB will have "broken" schema
2. Sync engine execute few SQL statements and do not run them to
completion - which basically created "orphaned" locks
In order to fix (1) I decided to introduce another `conn_raw_api`
extension which allows to read and write schema cookie directly in the
transaction. So, the process described above adds step (e) which set
schema cookie to the value strictly greater than the value before.
In order to fix (2) I just fixed all places where statement were dropped
before running to completion.
These fixes are merged together because I explored them by fixing one
new test: `test_sync_single_db_many_pulls_big_payloads`

Closes #2561
This commit is contained in:
Pekka Enberg
2025-08-12 18:24:46 +03:00
committed by GitHub
6 changed files with 183 additions and 41 deletions

View File

@@ -1218,6 +1218,55 @@ impl Connection {
Ok(())
}
/// Read schema version at current transaction
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
pub fn read_schema_version(&self) -> Result<u32> {
loop {
let pager = self.pager.borrow();
match pager.with_header(|header| header.schema_cookie)? {
IOResult::Done(cookie) => return Ok(cookie.get()),
IOResult::IO => {
self.run_once()?;
continue;
}
};
}
}
/// Update schema version to the new value within opened write transaction
///
/// New version of the schema must be strictly greater than previous one - otherwise method will panic
/// Write transaction must be opened in advance - otherwise method will panic
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
pub fn write_schema_version(self: &Arc<Connection>, version: u32) -> Result<()> {
let TransactionState::Write { .. } = self.transaction_state.get() else {
return Err(LimboError::InternalError(
"write_schema_version must be called from within Write transaction".to_string(),
));
};
loop {
let pager = self.pager.borrow();
match pager.with_header_mut(|header| {
turso_assert!(
header.schema_cookie.get() < version,
"cookie can't go back in time"
);
self.transaction_state.replace(TransactionState::Write {
schema_did_change: true,
});
self.with_schema_mut(|schema| schema.schema_version = version);
header.schema_cookie = version.into();
})? {
IOResult::Done(()) => break,
IOResult::IO => {
self.run_once()?;
continue;
}
};
}
Ok(())
}
/// Try to read page with given ID with fixed WAL watermark position
/// This method return false if page is not found (so, this is probably new page created after watermark position which wasn't checkpointed to the DB file yet)
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
@@ -1917,11 +1966,12 @@ impl Statement {
let mut res = self
.program
.step(&mut self.state, self.mv_store.clone(), self.pager.clone());
for _ in 0..MAX_SCHEMA_RETRY {
for attempt in 0..MAX_SCHEMA_RETRY {
// Only reprepare if we still need to update schema
if !matches!(res, Err(LimboError::SchemaUpdated)) {
break;
}
tracing::debug!("reprepare: attempt={}", attempt);
self.reprepare()?;
res = self
.program

View File

@@ -849,6 +849,7 @@ impl Wal for WalFile {
}
// Snapshot is stale, give up and let caller retry from scratch
tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch");
shared.write_lock.unlock();
Ok(LimboResult::Busy)
}

View File

@@ -2062,6 +2062,11 @@ pub fn op_transaction(
match res {
Ok(header_schema_cookie) => {
if header_schema_cookie != *schema_cookie {
tracing::info!(
"schema changed, force reprepare: {} != {}",
header_schema_cookie,
*schema_cookie
);
return Err(LimboError::SchemaUpdated);
}
}

View File

@@ -422,7 +422,7 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
#[cfg(test)]
pub mod tests {
use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};
use rand::RngCore;
@@ -563,6 +563,65 @@ pub mod tests {
});
}
#[test]
pub fn test_sync_single_db_many_pulls_big_payloads() {
deterministic_runtime(async || {
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let dir = tempfile::TempDir::new().unwrap();
let server_path = dir.path().join("server.db");
let ctx = Arc::new(TestContext::new(seed_u64()));
let protocol = TestProtocolIo::new(ctx.clone(), &server_path)
.await
.unwrap();
let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone());
let local_path = dir.path().join("local.db");
let opts = DatabaseSyncEngineOpts {
client_name: "id-1".to_string(),
wal_pull_batch_size: 1,
};
runner.init(local_path, opts).await.unwrap();
protocol
.server
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)", ())
.await
.unwrap();
runner.pull().await.unwrap();
// create connection in outer scope in order to prevent Database from being dropped in between of pull operations
let conn = runner.connect().await.unwrap();
let mut expected = BTreeMap::new();
for attempt in 0..10 {
for _ in 0..5 {
let key = ctx.rng().await.next_u32();
let length = ctx.rng().await.next_u32() % (10 * 4096);
protocol
.server
.execute("INSERT INTO t VALUES (?, randomblob(?))", (key, length))
.await
.unwrap();
expected.insert(key as i64, length as i64);
}
tracing::info!("pull attempt={}", attempt);
runner.pull().await.unwrap();
let expected = expected
.iter()
.map(|(x, y)| vec![turso::Value::Integer(*x), turso::Value::Integer(*y)])
.collect::<Vec<_>>();
assert_eq!(
query_rows(&conn, "SELECT x, length(y) FROM t")
.await
.unwrap(),
expected
);
}
});
}
#[test]
pub fn test_sync_single_db_full_syncs() {
deterministic_runtime(async || {

View File

@@ -4,8 +4,8 @@ use turso_core::{types::Text, Buffer, Completion, LimboError, Value};
use crate::{
database_tape::{
exec_stmt, run_stmt, DatabaseChangesIteratorMode, DatabaseChangesIteratorOpts,
DatabaseReplaySessionOpts, DatabaseTape, DatabaseWalSession,
exec_stmt, run_stmt_expect_one_row, DatabaseChangesIteratorMode,
DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape, DatabaseWalSession,
},
errors::Error,
protocol_io::{DataCompletion, DataPollResult, ProtocolIO},
@@ -263,8 +263,8 @@ pub async fn transfer_logical_changes(
select_last_change_id_stmt
.bind_at(1.try_into().unwrap(), Value::Text(Text::new(client_id)));
match run_stmt(coro, &mut select_last_change_id_stmt).await? {
Some(row) => row.get_value(0).as_int().ok_or_else(|| {
match run_stmt_expect_one_row(coro, &mut select_last_change_id_stmt).await? {
Some(row) => row[0].as_int().ok_or_else(|| {
Error::DatabaseSyncEngineError("unexpected source pull_gen type".to_string())
})?,
None => {
@@ -284,12 +284,14 @@ pub async fn transfer_logical_changes(
let mut select_last_change_id_stmt = target_conn.prepare(TURSO_SYNC_SELECT_LAST_CHANGE_ID)?;
select_last_change_id_stmt.bind_at(1.try_into().unwrap(), Value::Text(Text::new(client_id)));
let mut last_change_id = match run_stmt(coro, &mut select_last_change_id_stmt).await? {
let mut last_change_id = match run_stmt_expect_one_row(coro, &mut select_last_change_id_stmt)
.await?
{
Some(row) => {
let target_pull_gen = row.get_value(0).as_int().ok_or_else(|| {
let target_pull_gen = row[0].as_int().ok_or_else(|| {
Error::DatabaseSyncEngineError("unexpected target pull_gen type".to_string())
})?;
let target_change_id = row.get_value(1).as_int().ok_or_else(|| {
let target_change_id = row[1].as_int().ok_or_else(|| {
Error::DatabaseSyncEngineError("unexpected target change_id type".to_string())
})?;
tracing::debug!(
@@ -321,6 +323,9 @@ pub async fn transfer_logical_changes(
let replay_opts = DatabaseReplaySessionOpts {
use_implicit_rowid: false,
};
let source_schema_cookie = source.connect_untracked()?.read_schema_version()?;
let mut session = target.start_replay_session(coro, replay_opts).await?;
let iterate_opts = DatabaseChangesIteratorOpts {
@@ -369,6 +374,13 @@ pub async fn transfer_logical_changes(
set_last_change_id_stmt
.bind_at(3.try_into().unwrap(), Value::Text(Text::new(client_id)));
exec_stmt(coro, &mut set_last_change_id_stmt).await?;
let session_schema_cookie = session.conn().read_schema_version()?;
if session_schema_cookie <= source_schema_cookie {
session
.conn()
.write_schema_version(source_schema_cookie + 1)?;
}
}
_ => {}
}
@@ -575,7 +587,7 @@ pub mod tests {
use crate::{
database_sync_operations::{transfer_logical_changes, transfer_physical_changes},
database_tape::{run_stmt, DatabaseTape, DatabaseTapeOpts},
database_tape::{run_stmt_once, DatabaseTape, DatabaseTapeOpts},
wal_session::WalSession,
Result,
};
@@ -605,7 +617,7 @@ pub mod tests {
let mut rows = Vec::new();
let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
@@ -622,7 +634,7 @@ pub mod tests {
let mut rows = Vec::new();
let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
@@ -701,7 +713,7 @@ pub mod tests {
let conn2 = db2.connect(&coro).await.unwrap();
let mut rows = Vec::new();
let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
@@ -716,7 +728,7 @@ pub mod tests {
conn2.execute("INSERT INTO t VALUES (7, 8)")?;
let mut rows = Vec::new();
let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(

View File

@@ -35,7 +35,7 @@ pub struct DatabaseTapeOpts {
pub cdc_mode: Option<String>,
}
pub(crate) async fn run_stmt<'a>(
pub(crate) async fn run_stmt_once<'a>(
coro: &'_ Coro,
stmt: &'a mut turso_core::Statement,
) -> Result<Option<&'a turso_core::Row>> {
@@ -60,6 +60,28 @@ pub(crate) async fn run_stmt<'a>(
}
}
pub(crate) async fn run_stmt_expect_one_row(
coro: &Coro,
stmt: &mut turso_core::Statement,
) -> Result<Option<Vec<turso_core::Value>>> {
let Some(row) = run_stmt_once(coro, stmt).await? else {
return Ok(None);
};
let values = row.get_values().cloned().collect();
let None = run_stmt_once(coro, stmt).await? else {
return Err(Error::DatabaseTapeError("single row expected".to_string()));
};
Ok(Some(values))
}
pub(crate) async fn run_stmt_ignore_rows(
coro: &Coro,
stmt: &mut turso_core::Statement,
) -> Result<()> {
while run_stmt_once(coro, stmt).await?.is_some() {}
Ok(())
}
pub(crate) async fn exec_stmt(coro: &Coro, stmt: &mut turso_core::Statement) -> Result<()> {
loop {
match stmt.step()? {
@@ -109,7 +131,7 @@ impl DatabaseTape {
let connection = self.inner.connect()?;
tracing::debug!("set '{CDC_PRAGMA_NAME}' for new connection");
let mut stmt = connection.prepare(&self.pragma_query)?;
run_stmt(coro, &mut stmt).await?;
run_stmt_ignore_rows(coro, &mut stmt).await?;
Ok(connection)
}
/// Builds an iterator which emits [DatabaseTapeOperation] by extracting data from CDC table
@@ -168,7 +190,7 @@ impl DatabaseWalSession {
let conn = wal_session.conn();
let frames_count = conn.wal_frame_count()?;
let mut page_size_stmt = conn.prepare("PRAGMA page_size")?;
let Some(row) = run_stmt(coro, &mut page_size_stmt).await? else {
let Some(row) = run_stmt_expect_one_row(coro, &mut page_size_stmt).await? else {
return Err(Error::DatabaseTapeError(
"unable to get database page size".to_string(),
));
@@ -178,18 +200,11 @@ impl DatabaseWalSession {
"unexpected columns count for PRAGMA page_size query".to_string(),
));
}
let turso_core::Value::Integer(page_size) = row.get_value(0) else {
let turso_core::Value::Integer(page_size) = row[0] else {
return Err(Error::DatabaseTapeError(
"unexpected column type for PRAGMA page_size query".to_string(),
));
};
let page_size = *page_size;
let None = run_stmt(coro, &mut page_size_stmt).await? else {
return Err(Error::DatabaseTapeError(
"page size pragma returned multiple rows".to_string(),
));
};
Ok(Self {
page_size: page_size as usize,
next_wal_frame_no: frames_count + 1,
@@ -376,7 +391,7 @@ impl DatabaseChangesIterator {
turso_core::Value::Integer(change_id_filter),
);
while let Some(row) = run_stmt(coro, &mut self.query_stmt).await? {
while let Some(row) = run_stmt_once(coro, &mut self.query_stmt).await? {
let database_change: DatabaseChange = row.try_into()?;
let tape_change = match self.mode {
DatabaseChangesIteratorMode::Apply => database_change.into_apply()?,
@@ -432,7 +447,7 @@ impl DatabaseReplaySession {
DatabaseTapeOperation::RowChange(change) => {
if !self.in_txn {
tracing::trace!("replay: start txn for replaying changes");
self.conn.execute("BEGIN")?;
self.conn.execute("BEGIN IMMEDIATE")?;
self.in_txn = true;
}
tracing::trace!("replay: change={:?}", change);
@@ -696,7 +711,7 @@ impl DatabaseReplaySession {
"SELECT name FROM pragma_table_info('{table_name}')"
))?;
let mut column_names = Vec::with_capacity(columns + 1);
while let Some(column) = run_stmt(coro, &mut table_info_stmt).await? {
while let Some(column) = run_stmt_once(coro, &mut table_info_stmt).await? {
let turso_core::Value::Text(text) = column.get_value(0) else {
return Err(Error::DatabaseTapeError(
"unexpected column type for pragma_table_info query".to_string(),
@@ -721,7 +736,7 @@ impl DatabaseReplaySession {
))?;
let mut pk_predicates = Vec::with_capacity(1);
let mut pk_column_indices = Vec::with_capacity(1);
while let Some(column) = run_stmt(coro, &mut pk_info_stmt).await? {
while let Some(column) = run_stmt_once(coro, &mut pk_info_stmt).await? {
let turso_core::Value::Integer(column_id) = column.get_value(0) else {
return Err(Error::DatabaseTapeError(
"unexpected column type for pragma_table_info query".to_string(),
@@ -764,7 +779,7 @@ impl DatabaseReplaySession {
let mut pk_predicates = Vec::with_capacity(1);
let mut pk_column_indices = Vec::with_capacity(1);
let mut column_updates = Vec::with_capacity(1);
while let Some(column) = run_stmt(coro, &mut table_info_stmt).await? {
while let Some(column) = run_stmt_once(coro, &mut table_info_stmt).await? {
let turso_core::Value::Integer(column_id) = column.get_value(0) else {
return Err(Error::DatabaseTapeError(
"unexpected column type for pragma_table_info query".to_string(),
@@ -836,7 +851,7 @@ mod tests {
use crate::{
database_tape::{
run_stmt, DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape,
run_stmt_once, DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape,
},
types::{DatabaseTapeOperation, DatabaseTapeRowChange, DatabaseTapeRowChangeType},
};
@@ -855,7 +870,7 @@ mod tests {
let conn = db1.connect(&coro).await.unwrap();
let mut stmt = conn.prepare("SELECT * FROM turso_cdc").unwrap();
let mut rows = Vec::new();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
rows
@@ -977,7 +992,7 @@ mod tests {
}
let mut stmt = conn2.prepare("SELECT rowid, x FROM t").unwrap();
let mut rows = Vec::new();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
rows
@@ -1055,7 +1070,7 @@ mod tests {
}
let mut stmt = conn2.prepare("SELECT rowid, x FROM t").unwrap();
let mut rows = Vec::new();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
rows
@@ -1124,7 +1139,7 @@ mod tests {
}
let mut stmt = conn2.prepare("SELECT rowid, x FROM t").unwrap();
let mut rows = Vec::new();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
rows
@@ -1205,7 +1220,7 @@ mod tests {
}
let mut rows = Vec::new();
let mut stmt = conn3.prepare("SELECT rowid, x, y FROM t").unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
@@ -1219,7 +1234,7 @@ mod tests {
let mut rows = Vec::new();
let mut stmt = conn3.prepare("SELECT rowid, x, y FROM q").unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
@@ -1236,7 +1251,7 @@ mod tests {
"SELECT * FROM sqlite_schema WHERE name != 'turso_cdc' AND type = 'table'",
)
.unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
@@ -1319,7 +1334,7 @@ mod tests {
let mut stmt = conn2
.prepare("SELECT * FROM sqlite_schema WHERE name IN ('t', 't_idx')")
.unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
@@ -1403,7 +1418,7 @@ mod tests {
let mut stmt = conn2
.prepare("SELECT * FROM sqlite_schema WHERE name IN ('t')")
.unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
@@ -1501,7 +1516,7 @@ mod tests {
}
let mut rows = Vec::new();
let mut stmt = conn3.prepare("SELECT * FROM t").unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(