From 91aeadd94063234a1a5ab3e8f83be8c054e22cfe Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 2 Oct 2025 15:03:25 +0400 Subject: [PATCH] apply create DDL operation with IF NOT EXISTS clause in order to make them idempotent --- Cargo.lock | 1 + sync/engine/Cargo.toml | 1 + sync/engine/src/database_replay_generator.rs | 38 ++++++++- sync/engine/src/database_tape.rs | 87 ++++++++++++++++++++ 4 files changed, 126 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 8af968be4..ba12322bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4589,6 +4589,7 @@ dependencies = [ "tracing-subscriber", "turso", "turso_core", + "turso_parser", "uuid", ] diff --git a/sync/engine/Cargo.toml b/sync/engine/Cargo.toml index 89b20b406..e99564c17 100644 --- a/sync/engine/Cargo.toml +++ b/sync/engine/Cargo.toml @@ -8,6 +8,7 @@ repository.workspace = true [dependencies] turso_core = { workspace = true, features = ["conn_raw_api"] } +turso_parser = { workspace = true } thiserror = "2.0.12" tracing = "0.1.41" serde_json.workspace = true diff --git a/sync/engine/src/database_replay_generator.rs b/sync/engine/src/database_replay_generator.rs index 02532d825..616136305 100644 --- a/sync/engine/src/database_replay_generator.rs +++ b/sync/engine/src/database_replay_generator.rs @@ -1,5 +1,7 @@ use std::{collections::HashMap, sync::Arc}; +use turso_parser::parser::Parser; + use crate::{ database_tape::{run_stmt_once, DatabaseReplaySessionOpts}, errors::Error, @@ -211,9 +213,43 @@ impl DatabaseReplayGenerator { after.last() ))); }; + let mut parser = Parser::new(sql.as_str().as_bytes()); + let mut ast = parser + .next() + .ok_or_else(|| { + Error::DatabaseTapeError(format!( + "unexpected DDL query: {}", + sql.as_str() + )) + })? 
+ .map_err(|e| { + Error::DatabaseTapeError(format!( + "unexpected DDL query {}: {}", + sql.as_str(), + e + )) + })?; + let turso_parser::ast::Cmd::Stmt(stmt) = &mut ast else { + return Err(Error::DatabaseTapeError(format!( + "unexpected DDL query: {}", + sql.as_str() + ))); + }; + match stmt { + turso_parser::ast::Stmt::CreateTable { if_not_exists, .. } + | turso_parser::ast::Stmt::CreateIndex { if_not_exists, .. } + | turso_parser::ast::Stmt::CreateTrigger { if_not_exists, .. } + | turso_parser::ast::Stmt::CreateMaterializedView { + if_not_exists, .. + } + | turso_parser::ast::Stmt::CreateView { if_not_exists, .. } => { + *if_not_exists = true + } + _ => {} + } let insert = ReplayInfo { change_type: DatabaseChangeType::Insert, - query: sql.as_str().to_string(), + query: ast.to_string(), pk_column_indices: None, column_names: Vec::new(), is_ddl_replay: true, diff --git a/sync/engine/src/database_tape.rs b/sync/engine/src/database_tape.rs index bce8acc79..6f6727475 100644 --- a/sync/engine/src/database_tape.rs +++ b/sync/engine/src/database_tape.rs @@ -1387,4 +1387,91 @@ mod tests { } } } + + #[test] + pub fn test_database_tape_replay_ddl_changes_idempotent() { + let temp_file1 = NamedTempFile::new().unwrap(); + let db_path1 = temp_file1.path().to_str().unwrap(); + let temp_file2 = NamedTempFile::new().unwrap(); + let db_path2 = temp_file2.path().to_str().unwrap(); + let temp_file3 = NamedTempFile::new().unwrap(); + let db_path3 = temp_file3.path().to_str().unwrap(); + + let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap()); + + let db1 = turso_core::Database::open_file(io.clone(), db_path1, false, true).unwrap(); + let db1 = Arc::new(DatabaseTape::new(db1)); + + let db2 = turso_core::Database::open_file(io.clone(), db_path2, false, true).unwrap(); + let db2 = Arc::new(DatabaseTape::new(db2)); + + let db3 = turso_core::Database::open_file(io.clone(), db_path3, false, true).unwrap(); + let db3 = Arc::new(DatabaseTape::new(db3)); + + let mut gen = 
genawaiter::sync::Gen::new({ + |coro| async move { + let coro: Coro<()> = coro.into(); + let conn1 = db1.connect(&coro).await.unwrap(); + conn1 + .execute("CREATE TABLE t(x TEXT PRIMARY KEY, y, z)") + .unwrap(); + conn1.execute("CREATE INDEX t_idx ON t(y, z)").unwrap(); + + let conn2 = db2.connect(&coro).await.unwrap(); + conn2 + .execute("CREATE TABLE t(x TEXT PRIMARY KEY, y, z)") + .unwrap(); + conn2.execute("CREATE INDEX t_idx ON t(y, z)").unwrap(); + + let conn3 = db3.connect(&coro).await.unwrap(); + { + let opts = DatabaseReplaySessionOpts { + use_implicit_rowid: false, + }; + let mut session = db3.start_replay_session(&coro, opts).await.unwrap(); + + let opts = DatabaseChangesIteratorOpts { + ignore_schema_changes: false, + ..Default::default() + }; + let mut iterator = db1.iterate_changes(opts.clone()).unwrap(); + while let Some(operation) = iterator.next(&coro).await.unwrap() { + tracing::info!("1. operation: {:?}", operation); + session.replay(&coro, operation).await.unwrap(); + } + + let mut iterator = db2.iterate_changes(opts.clone()).unwrap(); + while let Some(operation) = iterator.next(&coro).await.unwrap() { + tracing::info!("2. operation: {:?}", operation); + session.replay(&coro, operation).await.unwrap(); + } + } + let mut rows = Vec::new(); + let mut stmt = conn3.prepare("SELECT name FROM sqlite_master").unwrap(); + while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() { + rows.push(row.get_value(0).to_text().unwrap().to_string()); + } + assert_eq!( + rows, + vec![ + "sqlite_sequence".to_string(), + "turso_cdc".to_string(), + "t".to_string(), + "sqlite_autoindex_t_1".to_string(), + "t_idx".to_string() + ] + ); + crate::Result::Ok(()) + } + }); + loop { + match gen.resume_with(Ok(())) { + genawaiter::GeneratorState::Yielded(..) => io.step().unwrap(), + genawaiter::GeneratorState::Complete(result) => { + result.unwrap(); + break; + } + } + } + } }