From 5e773d286ea3952f9e9ef7a308b920647738bc16 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 11 Aug 2025 16:50:59 +0400 Subject: [PATCH 1/3] support UPDATEs and schema changes in database tape --- .../turso-sync-engine/src/database_tape.rs | 630 +++++++++++++++++- packages/turso-sync-engine/src/types.rs | 27 +- 2 files changed, 630 insertions(+), 27 deletions(-) diff --git a/packages/turso-sync-engine/src/database_tape.rs b/packages/turso-sync-engine/src/database_tape.rs index a2bbddaca..448ad48de 100644 --- a/packages/turso-sync-engine/src/database_tape.rs +++ b/packages/turso-sync-engine/src/database_tape.rs @@ -149,6 +149,7 @@ impl DatabaseTape { conn: self.connect(coro).await?, cached_delete_stmt: HashMap::new(), cached_insert_stmt: HashMap::new(), + cached_update_stmt: HashMap::new(), in_txn: false, opts, }) @@ -283,7 +284,7 @@ impl DatabaseWalSession { } } -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub enum DatabaseChangesIteratorMode { Apply, Revert, @@ -313,7 +314,7 @@ impl DatabaseChangesIteratorMode { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DatabaseChangesIteratorOpts { pub first_change_id: Option, pub batch_size: usize, @@ -341,6 +342,7 @@ pub struct DatabaseChangesIterator { ignore_schema_changes: bool, } +const SQLITE_SCHEMA_TABLE: &str = "sqlite_schema"; impl DatabaseChangesIterator { pub async fn next(&mut self, coro: &Coro) -> Result> { if self.batch.is_empty() { @@ -359,7 +361,7 @@ impl DatabaseChangesIterator { None }; if let Some(DatabaseTapeOperation::RowChange(change)) = &next { - if self.ignore_schema_changes && change.table_name == "sqlite_schema" { + if self.ignore_schema_changes && change.table_name == SQLITE_SCHEMA_TABLE { continue; } } @@ -400,10 +402,16 @@ struct DeleteCachedStmt { pk_column_indices: Option>, // if None - use rowid instead } +struct UpdateCachedStmt { + stmt: turso_core::Statement, + pk_column_indices: Option>, // if None - use rowid instead +} + pub struct DatabaseReplaySession { conn: Arc, cached_delete_stmt: HashMap, cached_insert_stmt: HashMap<(String, usize), turso_core::Statement>, + cached_update_stmt: HashMap<(String, Vec), UpdateCachedStmt>, in_txn: bool, opts: DatabaseReplaySessionOpts, } @@ -429,24 +437,106 @@ impl DatabaseReplaySession { } tracing::trace!("replay: change={:?}", change); let table_name = &change.table_name; - match change.change { - DatabaseTapeRowChangeType::Delete { before } => { - let before = parse_bin_record(before)?; - self.replay_delete(coro, table_name, change.id, before) - .await? + + if table_name == SQLITE_SCHEMA_TABLE { + // sqlite_schema table: type, name, tbl_name, rootpage, sql + match change.change { + DatabaseTapeRowChangeType::Delete { before } => { + let before = parse_bin_record(before)?; + assert!(before.len() == 5); + let Some(turso_core::Value::Text(entity_type)) = before.get(0) else { + panic!( + "unexpected 'type' column of sqlite_schema table: {:?}", + before.get(0) + ); + }; + let Some(turso_core::Value::Text(entity_name)) = before.get(1) else { + panic!( + "unexpected 'name' column of sqlite_schema table: {:?}", + before.get(1) + ); + }; + self.conn.execute(format!( + "DROP {} {}", + entity_type.as_str(), + entity_name.as_str() + ))?; + } + DatabaseTapeRowChangeType::Insert { after } => { + let after = parse_bin_record(after)?; + assert!(after.len() == 5); + let Some(turso_core::Value::Text(sql)) = after.last() else { + return Err(Error::DatabaseTapeError(format!( + "unexpected 'sql' column of sqlite_schema table: {:?}", + after.last() + ))); + }; + self.conn.execute(sql.as_str())?; + } + DatabaseTapeRowChangeType::Update { updates, .. } => { + let Some(updates) = updates else { + return Err(Error::DatabaseTapeError(format!( + "'updates' column of CDC table must be populated" + ))); + }; + let updates = parse_bin_record(updates)?; + assert!(updates.len() % 2 == 0); + assert!(updates.len() / 2 == 5); + let turso_core::Value::Text(ddl_stmt) = updates.last().unwrap() else { + panic!( + "unexpected 'sql' column of sqlite_schema table update record: {:?}", + updates.last() + ); + }; + self.conn.execute(ddl_stmt.as_str())?; + } } - DatabaseTapeRowChangeType::Update { before, after } => { - let before = parse_bin_record(before)?; - self.replay_delete(coro, table_name, change.id, before) - .await?; - let after = parse_bin_record(after)?; - self.replay_insert(coro, table_name, change.id, after) - .await?; - } - DatabaseTapeRowChangeType::Insert { after } => { - let values = parse_bin_record(after)?; - self.replay_insert(coro, table_name, change.id, values) - .await?; + } else { + match change.change { + DatabaseTapeRowChangeType::Delete { before } => { + let before = parse_bin_record(before)?; + self.replay_delete(coro, table_name, change.id, before) + .await? + } + DatabaseTapeRowChangeType::Update { + before, + after, + updates, + } => { + let after = parse_bin_record(after)?; + if let Some(updates) = updates { + let updates = parse_bin_record(updates)?; + assert!(updates.len() % 2 == 0); + let columns_cnt = updates.len() / 2; + let mut columns = Vec::with_capacity(columns_cnt); + let mut values = Vec::with_capacity(columns_cnt); + for (i, value) in updates.into_iter().enumerate() { + if i < columns_cnt { + columns.push(match value { + turso_core::Value::Integer(x @ (1 | 0)) => x > 0, + _ => panic!("unexpected 'changes' binary record first-half component: {:?}", value) + }) + } else { + values.push(value); + } + } + self.replay_update( + coro, table_name, change.id, columns, after, values, + ) + .await?; + } else { + let before = parse_bin_record(before)?; + self.replay_delete(coro, table_name, change.id, before) + .await?; + self.replay_insert(coro, table_name, change.id, after) + .await?; + } + } + DatabaseTapeRowChangeType::Insert { after } => { + let values = parse_bin_record(after)?; + self.replay_insert(coro, table_name, change.id, values) + .await?; + } } } } @@ -497,6 +587,38 @@ impl DatabaseReplaySession { exec_stmt(coro, stmt).await?; Ok(()) } + async fn replay_update( + &mut self, + coro: &Coro, + table_name: &str, + id: i64, + columns: Vec, + mut full: Vec, + mut updates: Vec, + ) -> Result<()> { + let cached = self.cached_update_stmt(coro, table_name, &columns).await?; + let mut position: usize = 1; + for (i, updated) in columns.into_iter().enumerate() { + if !updated { + continue; + } + let value = std::mem::replace(&mut updates[i], turso_core::Value::Null); + cached.stmt.bind_at(position.try_into().unwrap(), value); + position += 1; + } + if let Some(pk_column_indices) = &cached.pk_column_indices { + for pk_column in pk_column_indices { + let value = std::mem::replace(&mut full[*pk_column], turso_core::Value::Null); + cached.stmt.bind_at(position.try_into().unwrap(), value); + position += 1 + } + } else { + let value = turso_core::Value::Integer(id); + cached.stmt.bind_at(position.try_into().unwrap(), value); + } + exec_stmt(coro, &mut cached.stmt).await?; + Ok(()) + } async fn cached_delete_stmt( &mut self, coro: &Coro, @@ -540,7 +662,26 @@ impl DatabaseReplaySession { stmt.reset(); Ok(stmt) } - + async fn cached_update_stmt( + &mut self, + coro: &Coro, + table_name: &str, + columns: &Vec, + ) -> Result<&mut UpdateCachedStmt> { + let key = (table_name.to_string(), columns.clone()); + if !self.cached_update_stmt.contains_key(&key) { + tracing::trace!("prepare update statement for replay: table={}", table_name); + let stmt = self.update_query(coro, table_name, &columns).await?; + self.cached_update_stmt.insert(key.clone(), stmt); + } + tracing::trace!( + "ready to use prepared update statement for replay: table={}", + table_name + ); + let cached = self.cached_update_stmt.get_mut(&key).unwrap(); + cached.stmt.reset(); + Ok(cached) + } async fn insert_query( &self, coro: &Coro, @@ -571,7 +712,6 @@ impl DatabaseReplaySession { }; Ok(self.conn.prepare(&query)?) } - async fn delete_query(&self, coro: &Coro, table_name: &str) -> Result { let (query, pk_column_indices) = if self.opts.use_implicit_rowid { (format!("DELETE FROM {table_name} WHERE rowid = ?"), None) @@ -612,6 +752,68 @@ impl DatabaseReplaySession { pk_column_indices, }) } + async fn update_query( + &self, + coro: &Coro, + table_name: &str, + columns: &[bool], + ) -> Result { + let mut table_info_stmt = self.conn.prepare(format!( + "SELECT cid, name, pk FROM pragma_table_info('{table_name}')" + ))?; + let mut pk_predicates = Vec::with_capacity(1); + let mut pk_column_indices = Vec::with_capacity(1); + let mut column_updates = Vec::with_capacity(1); + while let Some(column) = run_stmt(coro, &mut table_info_stmt).await? { + let turso_core::Value::Integer(column_id) = column.get_value(0) else { + return Err(Error::DatabaseTapeError( + "unexpected column type for pragma_table_info query".to_string(), + )); + }; + let turso_core::Value::Text(name) = column.get_value(1) else { + return Err(Error::DatabaseTapeError( + "unexpected column type for pragma_table_info query".to_string(), + )); + }; + let turso_core::Value::Integer(pk) = column.get_value(2) else { + return Err(Error::DatabaseTapeError( + "unexpected column type for pragma_table_info query".to_string(), + )); + }; + if *pk == 1 { + pk_predicates.push(format!("{name} = ?")); + pk_column_indices.push(*column_id as usize); + } + if columns[*column_id as usize] { + column_updates.push(format!("{name} = ?")); + } + } + + let (query, pk_column_indices) = if self.opts.use_implicit_rowid { + ( + format!( + "UPDATE {table_name} SET {} WHERE rowid = ?", + column_updates.join(", ") + ), + None, + ) + } else { + ( + format!( + "UPDATE {table_name} SET {} WHERE {}", + column_updates.join(", "), + pk_predicates.join(" AND ") + ), + Some(pk_column_indices), + ) + }; + let stmt = self.conn.prepare(&query)?; + let cached_stmt = UpdateCachedStmt { + stmt, + pk_column_indices, + }; + Ok(cached_stmt) + } } fn parse_bin_record(bin_record: Vec) -> Result> { @@ -633,7 +835,9 @@ mod tests { use tempfile::NamedTempFile; use crate::{ - database_tape::{run_stmt, DatabaseReplaySessionOpts, DatabaseTape}, + database_tape::{ + run_stmt, DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape, + }, types::{DatabaseTapeOperation, DatabaseTapeRowChange, DatabaseTapeRowChangeType}, }; @@ -941,4 +1145,384 @@ mod tests { ]] ); } + + #[test] + pub fn test_database_tape_replay_schema_changes() { + let temp_file1 = NamedTempFile::new().unwrap(); + let db_path1 = temp_file1.path().to_str().unwrap(); + let temp_file2 = NamedTempFile::new().unwrap(); + let db_path2 = temp_file2.path().to_str().unwrap(); + let temp_file3 = NamedTempFile::new().unwrap(); + let db_path3 = temp_file3.path().to_str().unwrap(); + + let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); + + let db1 = turso_core::Database::open_file(io.clone(), db_path1, false, true).unwrap(); + let db1 = Arc::new(DatabaseTape::new(db1)); + + let db2 = turso_core::Database::open_file(io.clone(), db_path2, false, true).unwrap(); + let db2 = Arc::new(DatabaseTape::new(db2)); + + let db3 = turso_core::Database::open_file(io.clone(), db_path3, false, true).unwrap(); + let db3 = Arc::new(DatabaseTape::new(db3)); + + let mut gen = genawaiter::sync::Gen::new({ + |coro| async move { + let conn1 = db1.connect(&coro).await.unwrap(); + conn1 + .execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)") + .unwrap(); + conn1 + .execute("INSERT INTO t(x, y) VALUES ('a', 10)") + .unwrap(); + let conn2 = db2.connect(&coro).await.unwrap(); + conn2 + .execute("CREATE TABLE q(x TEXT PRIMARY KEY, y)") + .unwrap(); + conn2 + .execute("INSERT INTO q(x, y) VALUES ('b', 20)") + .unwrap(); + + let conn3 = db3.connect(&coro).await.unwrap(); + { + let opts = DatabaseReplaySessionOpts { + use_implicit_rowid: false, + }; + let mut session = db3.start_replay_session(&coro, opts).await.unwrap(); + + let opts = DatabaseChangesIteratorOpts { + ignore_schema_changes: false, + ..Default::default() + }; + let mut iterator = db1.iterate_changes(opts.clone()).unwrap(); + while let Some(operation) = iterator.next(&coro).await.unwrap() { + session.replay(&coro, operation).await.unwrap(); + } + let mut iterator = db2.iterate_changes(opts.clone()).unwrap(); + while let Some(operation) = iterator.next(&coro).await.unwrap() { + session.replay(&coro, operation).await.unwrap(); + } + } + let mut rows = Vec::new(); + let mut stmt = conn3.prepare("SELECT rowid, x, y FROM t").unwrap(); + while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + rows.push(row.get_values().cloned().collect::>()); + } + assert_eq!( + rows, + vec![vec![ + turso_core::Value::Integer(1), + turso_core::Value::Text(turso_core::types::Text::new("a")), + turso_core::Value::Integer(10), + ]] + ); + + let mut rows = Vec::new(); + let mut stmt = conn3.prepare("SELECT rowid, x, y FROM q").unwrap(); + while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + rows.push(row.get_values().cloned().collect::>()); + } + assert_eq!( + rows, + vec![vec![ + turso_core::Value::Integer(1), + turso_core::Value::Text(turso_core::types::Text::new("b")), + turso_core::Value::Integer(20), + ]] + ); + let mut rows = Vec::new(); + let mut stmt = conn3 + .prepare( + "SELECT * FROM sqlite_schema WHERE name != 'turso_cdc' AND type = 'table'", + ) + .unwrap(); + while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + rows.push(row.get_values().cloned().collect::>()); + } + assert_eq!( + rows, + vec![ + vec![ + turso_core::Value::Text(turso_core::types::Text::new("table")), + turso_core::Value::Text(turso_core::types::Text::new("t")), + turso_core::Value::Text(turso_core::types::Text::new("t")), + turso_core::Value::Integer(3), + turso_core::Value::Text(turso_core::types::Text::new( + "CREATE TABLE t (x TEXT PRIMARY KEY, y)" + )), + ], + vec![ + turso_core::Value::Text(turso_core::types::Text::new("table")), + turso_core::Value::Text(turso_core::types::Text::new("q")), + turso_core::Value::Text(turso_core::types::Text::new("q")), + turso_core::Value::Integer(5), + turso_core::Value::Text(turso_core::types::Text::new( + "CREATE TABLE q (x TEXT PRIMARY KEY, y)" + )), + ] + ] + ); + crate::Result::Ok(()) + } + }); + loop { + match gen.resume_with(Ok(())) { + genawaiter::GeneratorState::Yielded(..) => io.run_once().unwrap(), + genawaiter::GeneratorState::Complete(result) => { + result.unwrap(); + break; + } + } + } + } + + #[test] + pub fn test_database_tape_replay_create_index() { + let temp_file1 = NamedTempFile::new().unwrap(); + let db_path1 = temp_file1.path().to_str().unwrap(); + let temp_file2 = NamedTempFile::new().unwrap(); + let db_path2 = temp_file2.path().to_str().unwrap(); + + let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); + + let db1 = turso_core::Database::open_file(io.clone(), db_path1, false, true).unwrap(); + let db1 = Arc::new(DatabaseTape::new(db1)); + + let db2 = turso_core::Database::open_file(io.clone(), db_path2, false, true).unwrap(); + let db2 = Arc::new(DatabaseTape::new(db2)); + + let mut gen = genawaiter::sync::Gen::new({ + |coro| async move { + let conn1 = db1.connect(&coro).await.unwrap(); + conn1 + .execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)") + .unwrap(); + conn1.execute("CREATE INDEX t_idx ON t(y)").unwrap(); + + let conn2 = db2.connect(&coro).await.unwrap(); + { + let opts = DatabaseReplaySessionOpts { + use_implicit_rowid: false, + }; + let mut session = db2.start_replay_session(&coro, opts).await.unwrap(); + + let opts = DatabaseChangesIteratorOpts { + ignore_schema_changes: false, + ..Default::default() + }; + let mut iterator = db1.iterate_changes(opts.clone()).unwrap(); + while let Some(operation) = iterator.next(&coro).await.unwrap() { + session.replay(&coro, operation).await.unwrap(); + } + } + let mut rows = Vec::new(); + let mut stmt = conn2 + .prepare("SELECT * FROM sqlite_schema WHERE name IN ('t', 't_idx')") + .unwrap(); + while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + rows.push(row.get_values().cloned().collect::>()); + } + assert_eq!( + rows, + vec![ + vec![ + turso_core::Value::Text(turso_core::types::Text::new("table")), + turso_core::Value::Text(turso_core::types::Text::new("t")), + turso_core::Value::Text(turso_core::types::Text::new("t")), + turso_core::Value::Integer(3), + turso_core::Value::Text(turso_core::types::Text::new( + "CREATE TABLE t (x TEXT PRIMARY KEY, y)" + )), + ], + vec![ + turso_core::Value::Text(turso_core::types::Text::new("index")), + turso_core::Value::Text(turso_core::types::Text::new("t_idx")), + turso_core::Value::Text(turso_core::types::Text::new("t")), + turso_core::Value::Integer(5), + turso_core::Value::Text(turso_core::types::Text::new( + "CREATE INDEX t_idx ON t (y)" + )), + ] + ] + ); + crate::Result::Ok(()) + } + }); + loop { + match gen.resume_with(Ok(())) { + genawaiter::GeneratorState::Yielded(..) => io.run_once().unwrap(), + genawaiter::GeneratorState::Complete(result) => { + result.unwrap(); + break; + } + } + } + } + + #[test] + pub fn test_database_tape_replay_alter_table() { + let temp_file1 = NamedTempFile::new().unwrap(); + let db_path1 = temp_file1.path().to_str().unwrap(); + let temp_file2 = NamedTempFile::new().unwrap(); + let db_path2 = temp_file2.path().to_str().unwrap(); + + let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); + + let db1 = turso_core::Database::open_file(io.clone(), db_path1, false, true).unwrap(); + let db1 = Arc::new(DatabaseTape::new(db1)); + + let db2 = turso_core::Database::open_file(io.clone(), db_path2, false, true).unwrap(); + let db2 = Arc::new(DatabaseTape::new(db2)); + + let mut gen = genawaiter::sync::Gen::new({ + |coro| async move { + let conn1 = db1.connect(&coro).await.unwrap(); + conn1 + .execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)") + .unwrap(); + conn1.execute("ALTER TABLE t ADD COLUMN z").unwrap(); + conn1.execute("ALTER TABLE t DROP COLUMN y").unwrap(); + + let conn2 = db2.connect(&coro).await.unwrap(); + { + let opts = DatabaseReplaySessionOpts { + use_implicit_rowid: false, + }; + let mut session = db2.start_replay_session(&coro, opts).await.unwrap(); + + let opts = DatabaseChangesIteratorOpts { + ignore_schema_changes: false, + ..Default::default() + }; + let mut iterator = db1.iterate_changes(opts.clone()).unwrap(); + while let Some(operation) = iterator.next(&coro).await.unwrap() { + session.replay(&coro, operation).await.unwrap(); + } + } + let mut rows = Vec::new(); + let mut stmt = conn2 + .prepare("SELECT * FROM sqlite_schema WHERE name IN ('t')") + .unwrap(); + while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + rows.push(row.get_values().cloned().collect::>()); + } + assert_eq!( + rows, + vec![vec![ + turso_core::Value::Text(turso_core::types::Text::new("table")), + turso_core::Value::Text(turso_core::types::Text::new("t")), + turso_core::Value::Text(turso_core::types::Text::new("t")), + turso_core::Value::Integer(3), + turso_core::Value::Text(turso_core::types::Text::new( + "CREATE TABLE t (x TEXT PRIMARY KEY, z)" + )), + ]] + ); + crate::Result::Ok(()) + } + }); + loop { + match gen.resume_with(Ok(())) { + genawaiter::GeneratorState::Yielded(..) => io.run_once().unwrap(), + genawaiter::GeneratorState::Complete(result) => { + result.unwrap(); + break; + } + } + } + } + + #[test] + pub fn test_database_tape_replay_non_overlapping_updates() { + let temp_file1 = NamedTempFile::new().unwrap(); + let db_path1 = temp_file1.path().to_str().unwrap(); + let temp_file2 = NamedTempFile::new().unwrap(); + let db_path2 = temp_file2.path().to_str().unwrap(); + let temp_file3 = NamedTempFile::new().unwrap(); + let db_path3 = temp_file3.path().to_str().unwrap(); + + let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); + + let db1 = turso_core::Database::open_file(io.clone(), db_path1, false, true).unwrap(); + let db1 = Arc::new(DatabaseTape::new(db1)); + + let db2 = turso_core::Database::open_file(io.clone(), db_path2, false, true).unwrap(); + let db2 = Arc::new(DatabaseTape::new(db2)); + + let db3 = turso_core::Database::open_file(io.clone(), db_path3, false, true).unwrap(); + let db3 = Arc::new(DatabaseTape::new(db3)); + + let mut gen = genawaiter::sync::Gen::new({ + |coro| async move { + let conn1 = db1.connect(&coro).await.unwrap(); + conn1 + .execute("CREATE TABLE t(x TEXT PRIMARY KEY, y, z)") + .unwrap(); + conn1 + .execute("INSERT INTO t VALUES ('turso', 1, 2)") + .unwrap(); + conn1 + .execute("UPDATE t SET y = 10 WHERE x = 'turso'") + .unwrap(); + + let conn2 = db2.connect_untracked().unwrap(); + conn2 + .execute("CREATE TABLE t(x TEXT PRIMARY KEY, y, z)") + .unwrap(); + conn2 + .execute("INSERT INTO t VALUES ('turso', 1, 2)") + .unwrap(); + + let conn2 = db2.connect(&coro).await.unwrap(); + conn2 + .execute("UPDATE t SET z = 20 WHERE x = 'turso'") + .unwrap(); + + let conn3 = db3.connect(&coro).await.unwrap(); + { + let opts = DatabaseReplaySessionOpts { + use_implicit_rowid: false, + }; + let mut session = db3.start_replay_session(&coro, opts).await.unwrap(); + + let opts = DatabaseChangesIteratorOpts { + ignore_schema_changes: false, + ..Default::default() + }; + let mut iterator = db1.iterate_changes(opts.clone()).unwrap(); + while let Some(operation) = iterator.next(&coro).await.unwrap() { + session.replay(&coro, operation).await.unwrap(); + } + + let mut iterator = db2.iterate_changes(opts.clone()).unwrap(); + while let Some(operation) = iterator.next(&coro).await.unwrap() { + session.replay(&coro, operation).await.unwrap(); + } + } + let mut rows = Vec::new(); + let mut stmt = conn3.prepare("SELECT * FROM t").unwrap(); + while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() { + rows.push(row.get_values().cloned().collect::>()); + } + assert_eq!( + rows, + vec![vec![ + turso_core::Value::Text(turso_core::types::Text::new("turso")), + turso_core::Value::Integer(10), + turso_core::Value::Integer(20), + ]] + ); + crate::Result::Ok(()) + } + }); + loop { + match gen.resume_with(Ok(())) { + genawaiter::GeneratorState::Yielded(..) => io.run_once().unwrap(), + genawaiter::GeneratorState::Complete(result) => { + result.unwrap(); + break; + } + } + } + } } diff --git a/packages/turso-sync-engine/src/types.rs b/packages/turso-sync-engine/src/types.rs index af7cad9ce..c4fd297d1 100644 --- a/packages/turso-sync-engine/src/types.rs +++ b/packages/turso-sync-engine/src/types.rs @@ -66,6 +66,8 @@ pub struct DatabaseChange { pub before: Option>, /// Binary record of the row after the change, if CDC pragma set to either 'after' or 'full' pub after: Option>, + /// Binary record from "updates" column, if CDC pragma set to 'full' + pub updates: Option>, } impl DatabaseChange { @@ -86,6 +88,7 @@ impl DatabaseChange { after: self.after.ok_or_else(|| { Error::DatabaseTapeError("cdc_mode must be set to 'full'".to_string()) })?, + updates: self.updates, }, DatabaseChangeType::Insert => DatabaseTapeRowChangeType::Insert { after: self.after.ok_or_else(|| { @@ -122,6 +125,7 @@ impl DatabaseChange { "cdc_mode must be set to either 'full' or 'before'".to_string(), ) })?, + updates: None, }, DatabaseChangeType::Insert => DatabaseTapeRowChangeType::Delete { before: self.after.ok_or_else(|| { @@ -166,6 +170,7 @@ impl TryFrom<&turso_core::Row> for DatabaseChange { let id = get_core_value_i64(row, 4)?; let before = get_core_value_blob_or_null(row, 5)?; let after = get_core_value_blob_or_null(row, 6)?; + let updates = get_core_value_blob_or_null(row, 7)?; let change_type = match change_type { -1 => DatabaseChangeType::Delete, @@ -185,14 +190,23 @@ impl TryFrom<&turso_core::Row> for DatabaseChange { id, before, after, + updates, }) } } pub enum DatabaseTapeRowChangeType { - Delete { before: Vec }, - Update { before: Vec, after: Vec }, - Insert { after: Vec }, + Delete { + before: Vec, + }, + Update { + before: Vec, + after: Vec, + updates: Option>, + }, + Insert { + after: Vec, + }, } /// [DatabaseTapeOperation] extends [DatabaseTapeRowChange] by adding information about transaction boundary @@ -222,10 +236,15 @@ impl std::fmt::Debug for DatabaseTapeRowChangeType { .debug_struct("Delete") .field("before.len()", &before.len()) .finish(), - Self::Update { before, after } => f + Self::Update { + before, + after, + updates, + } => f .debug_struct("Update") .field("before.len()", &before.len()) .field("after.len()", &after.len()) + .field("updates.len()", &updates.as_ref().map(|x| x.len())) .finish(), Self::Insert { after } => f .debug_struct("Insert") From 372d23b7ccf760400c3ffa0e1a84cd9bb75fc343 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 11 Aug 2025 16:52:18 +0400 Subject: [PATCH 2/3] handle schema changes in sync engine --- packages/turso-sync-engine/src/database_sync_operations.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index 0e8d145a6..f8667c5ab 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -326,6 +326,7 @@ pub async fn transfer_logical_changes( let iterate_opts = DatabaseChangesIteratorOpts { first_change_id: last_change_id.map(|x| x + 1), mode: DatabaseChangesIteratorMode::Apply, + ignore_schema_changes: false, ..Default::default() }; let mut changes = source.iterate_changes(iterate_opts)?; @@ -599,7 +600,6 @@ pub mod tests { conn1.execute("INSERT INTO t VALUES (1, 2), (3, 4), (5, 6)")?; let conn2 = db2.connect(&coro).await?; - conn2.execute("CREATE TABLE t(x, y)")?; transfer_logical_changes(&coro, &db1, &db2, "id-1", false).await?; From aa5de7b8bd8159ecc517591d99e55c6e997a85c7 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 11 Aug 2025 17:02:36 +0400 Subject: [PATCH 3/3] fix clippy --- .../turso-sync-engine/src/database_tape.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/packages/turso-sync-engine/src/database_tape.rs b/packages/turso-sync-engine/src/database_tape.rs index 448ad48de..83255f47f 100644 --- a/packages/turso-sync-engine/src/database_tape.rs +++ b/packages/turso-sync-engine/src/database_tape.rs @@ -444,10 +444,10 @@ impl DatabaseReplaySession { DatabaseTapeRowChangeType::Delete { before } => { let before = parse_bin_record(before)?; assert!(before.len() == 5); - let Some(turso_core::Value::Text(entity_type)) = before.get(0) else { + let Some(turso_core::Value::Text(entity_type)) = before.first() else { panic!( "unexpected 'type' column of sqlite_schema table: {:?}", - before.get(0) + before.first() ); }; let Some(turso_core::Value::Text(entity_name)) = before.get(1) else { @@ -475,9 +475,9 @@ impl DatabaseReplaySession { } DatabaseTapeRowChangeType::Update { updates, .. } => { let Some(updates) = updates else { - return Err(Error::DatabaseTapeError(format!( - "'updates' column of CDC table must be populated" - ))); + return Err(Error::DatabaseTapeError( + "'updates' column of CDC table must be populated".to_string(), + )); }; let updates = parse_bin_record(updates)?; assert!(updates.len() % 2 == 0); @@ -514,7 +514,7 @@ impl DatabaseReplaySession { if i < columns_cnt { columns.push(match value { turso_core::Value::Integer(x @ (1 | 0)) => x > 0, - _ => panic!("unexpected 'changes' binary record first-half component: {:?}", value) + _ => panic!("unexpected 'changes' binary record first-half component: {value:?}") }) } else { values.push(value); @@ -666,12 +666,12 @@ impl DatabaseReplaySession { &mut self, coro: &Coro, table_name: &str, - columns: &Vec, + columns: &[bool], ) -> Result<&mut UpdateCachedStmt> { - let key = (table_name.to_string(), columns.clone()); + let key = (table_name.to_string(), columns.to_owned()); if !self.cached_update_stmt.contains_key(&key) { tracing::trace!("prepare update statement for replay: table={}", table_name); - let stmt = self.update_query(coro, table_name, &columns).await?; + let stmt = self.update_query(coro, table_name, columns).await?; self.cached_update_stmt.insert(key.clone(), stmt); } tracing::trace!(