From ac26e1cbe5d0bce336b631940df7a8f2677174ad Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 14 Aug 2025 17:17:52 +0400 Subject: [PATCH] use DatabaseReplayGenerator in the DatabaseTape --- .../src/database_sync_engine.rs | 2 +- .../turso-sync-engine/src/database_tape.rs | 528 ++++++------------ packages/turso-sync-engine/src/lib.rs | 1 + 3 files changed, 158 insertions(+), 373 deletions(-) diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index 1b8d53161..31267db1c 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -15,7 +15,7 @@ use crate::{ Result, }; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DatabaseSyncEngineOpts { pub client_name: String, pub wal_pull_batch_size: u64, diff --git a/packages/turso-sync-engine/src/database_tape.rs b/packages/turso-sync-engine/src/database_tape.rs index bec747df8..310327a03 100644 --- a/packages/turso-sync-engine/src/database_tape.rs +++ b/packages/turso-sync-engine/src/database_tape.rs @@ -6,6 +6,7 @@ use std::{ use turso_core::{types::WalFrameInfo, StepResult}; use crate::{ + database_replay_generator::{DatabaseReplayGenerator, ReplayInfo}, database_sync_operations::WAL_FRAME_HEADER, errors::Error, types::{ @@ -170,12 +171,12 @@ impl DatabaseTape { let conn = self.connect(coro).await?; conn.execute("BEGIN IMMEDIATE")?; Ok(DatabaseReplaySession { - conn, + conn: conn.clone(), cached_delete_stmt: HashMap::new(), cached_insert_stmt: HashMap::new(), cached_update_stmt: HashMap::new(), in_txn: true, - opts, + generator: DatabaseReplayGenerator { conn, opts }, }) } } @@ -414,23 +415,31 @@ pub struct DatabaseReplaySessionOpts { pub use_implicit_rowid: bool, } -struct DeleteCachedStmt { +struct CachedStmt { stmt: turso_core::Statement, - pk_column_indices: Option>, // if None - use rowid instead -} - -struct UpdateCachedStmt { - stmt: turso_core::Statement, - pk_column_indices: Option>, // if None - use rowid instead + info: ReplayInfo, } pub struct DatabaseReplaySession { conn: Arc, - cached_delete_stmt: HashMap, - cached_insert_stmt: HashMap<(String, usize), turso_core::Statement>, - cached_update_stmt: HashMap<(String, Vec), UpdateCachedStmt>, + cached_delete_stmt: HashMap, + cached_insert_stmt: HashMap<(String, usize), CachedStmt>, + cached_update_stmt: HashMap<(String, Vec), CachedStmt>, in_txn: bool, - opts: DatabaseReplaySessionOpts, + generator: DatabaseReplayGenerator, +} + +async fn replay_stmt( + coro: &Coro, + cached: &mut CachedStmt, + values: Vec, +) -> Result<()> { + cached.stmt.reset(); + for (i, value) in values.into_iter().enumerate() { + cached.stmt.bind_at((i + 1).try_into().unwrap(), value); + } + exec_stmt(coro, &mut cached.stmt).await?; + Ok(()) } impl DatabaseReplaySession { @@ -452,107 +461,116 @@ impl DatabaseReplaySession { self.conn.execute("BEGIN IMMEDIATE")?; self.in_txn = true; } - tracing::trace!("replay: change={:?}", change); - let table_name = &change.table_name; + let table = &change.table_name; + let change_type = (&change.change).into(); - if table_name == SQLITE_SCHEMA_TABLE { - // sqlite_schema table: type, name, tbl_name, rootpage, sql - match change.change { - DatabaseTapeRowChangeType::Delete { before } => { - let before = parse_bin_record(before)?; - assert!(before.len() == 5); - let Some(turso_core::Value::Text(entity_type)) = before.first() else { - panic!( - "unexpected 'type' column of sqlite_schema table: {:?}", - before.first() - ); - }; - let Some(turso_core::Value::Text(entity_name)) = before.get(1) else { - panic!( - "unexpected 'name' column of sqlite_schema table: {:?}", - before.get(1) - ); - }; - self.conn.execute(format!( - "DROP {} {}", - entity_type.as_str(), - entity_name.as_str() - ))?; - } - DatabaseTapeRowChangeType::Insert { after } => { - let after = parse_bin_record(after)?; - assert!(after.len() == 5); - let Some(turso_core::Value::Text(sql)) = after.last() else { - return Err(Error::DatabaseTapeError(format!( - "unexpected 'sql' column of sqlite_schema table: {:?}", - after.last() - ))); - }; - self.conn.execute(sql.as_str())?; - } - DatabaseTapeRowChangeType::Update { updates, .. } => { - let Some(updates) = updates else { - return Err(Error::DatabaseTapeError( - "'updates' column of CDC table must be populated".to_string(), - )); - }; - let updates = parse_bin_record(updates)?; - assert!(updates.len() % 2 == 0); - assert!(updates.len() / 2 == 5); - let turso_core::Value::Text(ddl_stmt) = updates.last().unwrap() else { - panic!( - "unexpected 'sql' column of sqlite_schema table update record: {:?}", - updates.last() - ); - }; - self.conn.execute(ddl_stmt.as_str())?; - } + if table == SQLITE_SCHEMA_TABLE { + let replay_info = self.generator.replay_info(coro, &change).await?; + for replay in &replay_info { + self.conn.execute(replay.query.as_str())?; } } else { match change.change { DatabaseTapeRowChangeType::Delete { before } => { - let before = parse_bin_record(before)?; - self.replay_delete(coro, table_name, change.id, before) - .await? + let key = self.populate_delete_stmt(coro, table).await?; + tracing::trace!( + "ready to use prepared delete statement for replay: key={}", + key + ); + let cached = self.cached_delete_stmt.get_mut(key).unwrap(); + cached.stmt.reset(); + let values = self.generator.replay_values( + &cached.info, + change_type, + change.id, + before, + None, + ); + replay_stmt(coro, cached, values).await?; + } + DatabaseTapeRowChangeType::Insert { after } => { + let key = self.populate_insert_stmt(coro, table, after.len()).await?; + tracing::trace!( + "ready to use prepared insert statement for replay: key={:?}", + key + ); + let cached = self.cached_insert_stmt.get_mut(&key).unwrap(); + cached.stmt.reset(); + let values = self.generator.replay_values( + &cached.info, + change_type, + change.id, + after, + None, + ); + replay_stmt(coro, cached, values).await?; + } + DatabaseTapeRowChangeType::Update { + after, + updates: Some(updates), + .. + } => { + assert!(updates.len() % 2 == 0); + let columns_cnt = updates.len() / 2; + let mut columns = Vec::with_capacity(columns_cnt); + for value in updates.iter().take(columns_cnt) { + columns.push(match value { + turso_core::Value::Integer(x @ (1 | 0)) => *x > 0, + _ => panic!("unexpected 'changes' binary record first-half component: {value:?}") + }); + } + let key = self.populate_update_stmt(coro, table, &columns).await?; + tracing::trace!( + "ready to use prepared update statement for replay: key={:?}", + key + ); + let cached = self.cached_update_stmt.get_mut(&key).unwrap(); + cached.stmt.reset(); + let values = self.generator.replay_values( + &cached.info, + change_type, + change.id, + after, + Some(updates), + ); + replay_stmt(coro, cached, values).await?; } DatabaseTapeRowChangeType::Update { before, after, - updates, + updates: None, } => { - let after = parse_bin_record(after)?; - if let Some(updates) = updates { - let updates = parse_bin_record(updates)?; - assert!(updates.len() % 2 == 0); - let columns_cnt = updates.len() / 2; - let mut columns = Vec::with_capacity(columns_cnt); - let mut values = Vec::with_capacity(columns_cnt); - for (i, value) in updates.into_iter().enumerate() { - if i < columns_cnt { - columns.push(match value { - turso_core::Value::Integer(x @ (1 | 0)) => x > 0, - _ => panic!("unexpected 'changes' binary record first-half component: {value:?}") - }) - } else { - values.push(value); - } - } - self.replay_update( - coro, table_name, change.id, columns, after, values, - ) - .await?; - } else { - let before = parse_bin_record(before)?; - self.replay_delete(coro, table_name, change.id, before) - .await?; - self.replay_insert(coro, table_name, change.id, after) - .await?; - } - } - DatabaseTapeRowChangeType::Insert { after } => { - let values = parse_bin_record(after)?; - self.replay_insert(coro, table_name, change.id, values) - .await?; + let key = self.populate_delete_stmt(coro, table).await?; + tracing::trace!( + "ready to use prepared delete statement for replay of update: key={:?}", + key + ); + let cached = self.cached_delete_stmt.get_mut(key).unwrap(); + cached.stmt.reset(); + let values = self.generator.replay_values( + &cached.info, + change_type, + change.id, + before, + None, + ); + replay_stmt(coro, cached, values).await?; + + let key = self.populate_insert_stmt(coro, table, after.len()).await?; + tracing::trace!( + "ready to use prepared insert statement for replay of update: key={:?}", + key + ); + let cached = self.cached_insert_stmt.get_mut(&key).unwrap(); + cached.stmt.reset(); + let values = self.generator.replay_values( + &cached.info, + change_type, + change.id, + after, + None, + ); + replay_stmt(coro, cached, values).await?; } } } @@ -560,289 +578,55 @@ impl DatabaseReplaySession { } Ok(()) } - async fn replay_delete( - &mut self, - coro: &Coro, - table_name: &str, - id: i64, - mut values: Vec, - ) -> Result<()> { - let cached = self.cached_delete_stmt(coro, table_name).await?; - if let Some(pk_column_indices) = &cached.pk_column_indices { - for (i, pk_column) in pk_column_indices.iter().enumerate() { - let value = std::mem::replace(&mut values[*pk_column], turso_core::Value::Null); - cached.stmt.bind_at((i + 1).try_into().unwrap(), value); - } - } else { - let value = turso_core::Value::Integer(id); - cached.stmt.bind_at(1.try_into().unwrap(), value); + async fn populate_delete_stmt<'a>(&mut self, coro: &Coro, table: &'a str) -> Result<&'a str> { + if self.cached_delete_stmt.contains_key(table) { + return Ok(table); } - exec_stmt(coro, &mut cached.stmt).await?; - Ok(()) + tracing::trace!("prepare delete statement for replay: table={}", table); + let info = self.generator.delete_query(coro, table).await?; + let stmt = self.conn.prepare(&info.query)?; + self.cached_delete_stmt + .insert(table.to_string(), CachedStmt { stmt, info }); + Ok(table) } - async fn replay_insert( + async fn populate_insert_stmt( &mut self, coro: &Coro, - table_name: &str, - id: i64, - values: Vec, - ) -> Result<()> { - let columns = values.len(); - let use_implicit_rowid = self.opts.use_implicit_rowid; - let stmt = self.cached_insert_stmt(coro, table_name, columns).await?; - stmt.reset(); - - for (i, value) in values.into_iter().enumerate() { - stmt.bind_at((i + 1).try_into().unwrap(), value); - } - if use_implicit_rowid { - stmt.bind_at( - (columns + 1).try_into().unwrap(), - turso_core::Value::Integer(id), - ); - } - exec_stmt(coro, stmt).await?; - Ok(()) - } - async fn replay_update( - &mut self, - coro: &Coro, - table_name: &str, - id: i64, - columns: Vec, - mut full: Vec, - mut updates: Vec, - ) -> Result<()> { - let cached = self.cached_update_stmt(coro, table_name, &columns).await?; - let mut position: usize = 1; - for (i, updated) in columns.into_iter().enumerate() { - if !updated { - continue; - } - let value = std::mem::replace(&mut updates[i], turso_core::Value::Null); - cached.stmt.bind_at(position.try_into().unwrap(), value); - position += 1; - } - if let Some(pk_column_indices) = &cached.pk_column_indices { - for pk_column in pk_column_indices { - let value = std::mem::replace(&mut full[*pk_column], turso_core::Value::Null); - cached.stmt.bind_at(position.try_into().unwrap(), value); - position += 1 - } - } else { - let value = turso_core::Value::Integer(id); - cached.stmt.bind_at(position.try_into().unwrap(), value); - } - exec_stmt(coro, &mut cached.stmt).await?; - Ok(()) - } - async fn cached_delete_stmt( - &mut self, - coro: &Coro, - table_name: &str, - ) -> Result<&mut DeleteCachedStmt> { - if !self.cached_delete_stmt.contains_key(table_name) { - tracing::trace!("prepare delete statement for replay: table={}", table_name); - let stmt = self.delete_query(coro, table_name).await?; - self.cached_delete_stmt.insert(table_name.to_string(), stmt); - } - tracing::trace!( - "ready to use prepared delete statement for replay: table={}", - table_name - ); - let cached = self.cached_delete_stmt.get_mut(table_name).unwrap(); - cached.stmt.reset(); - Ok(cached) - } - async fn cached_insert_stmt( - &mut self, - coro: &Coro, - table_name: &str, + table: &str, columns: usize, - ) -> Result<&mut turso_core::Statement> { - let key = (table_name.to_string(), columns); - if !self.cached_insert_stmt.contains_key(&key) { - tracing::trace!( - "prepare insert statement for replay: table={}, columns={}", - table_name, - columns - ); - let stmt = self.insert_query(coro, table_name, columns).await?; - self.cached_insert_stmt.insert(key.clone(), stmt); + ) -> Result<(String, usize)> { + let key = (table.to_string(), columns); + if self.cached_insert_stmt.contains_key(&key) { + return Ok(key); } tracing::trace!( - "ready to use prepared insert statement for replay: table={}, columns={}", - table_name, + "prepare insert statement for replay: table={}, columns={}", + table, columns ); - let stmt = self.cached_insert_stmt.get_mut(&key).unwrap(); - stmt.reset(); - Ok(stmt) + let info = self.generator.insert_query(coro, table, columns).await?; + let stmt = self.conn.prepare(&info.query)?; + self.cached_insert_stmt + .insert(key.clone(), CachedStmt { stmt, info }); + Ok(key) } - async fn cached_update_stmt( + async fn populate_update_stmt( &mut self, coro: &Coro, - table_name: &str, + table: &str, columns: &[bool], - ) -> Result<&mut UpdateCachedStmt> { - let key = (table_name.to_string(), columns.to_owned()); - if !self.cached_update_stmt.contains_key(&key) { - tracing::trace!("prepare update statement for replay: table={}", table_name); - let stmt = self.update_query(coro, table_name, columns).await?; - self.cached_update_stmt.insert(key.clone(), stmt); + ) -> Result<(String, Vec)> { + let key = (table.to_string(), columns.to_owned()); + if self.cached_update_stmt.contains_key(&key) { + return Ok(key); } - tracing::trace!( - "ready to use prepared update statement for replay: table={}", - table_name - ); - let cached = self.cached_update_stmt.get_mut(&key).unwrap(); - cached.stmt.reset(); - Ok(cached) + tracing::trace!("prepare update statement for replay: table={}", table); + let info = self.generator.update_query(coro, table, columns).await?; + let stmt = self.conn.prepare(&info.query)?; + self.cached_update_stmt + .insert(key.clone(), CachedStmt { stmt, info }); + Ok(key) } - async fn insert_query( - &self, - coro: &Coro, - table_name: &str, - columns: usize, - ) -> Result { - let query = if !self.opts.use_implicit_rowid { - let placeholders = ["?"].repeat(columns).join(","); - format!("INSERT INTO {table_name} VALUES ({placeholders})") - } else { - let mut table_info_stmt = self.conn.prepare(format!( - "SELECT name FROM pragma_table_info('{table_name}')" - ))?; - let mut column_names = Vec::with_capacity(columns + 1); - while let Some(column) = run_stmt_once(coro, &mut table_info_stmt).await? { - let turso_core::Value::Text(text) = column.get_value(0) else { - return Err(Error::DatabaseTapeError( - "unexpected column type for pragma_table_info query".to_string(), - )); - }; - column_names.push(text.to_string()); - } - column_names.push("rowid".to_string()); - - let placeholders = ["?"].repeat(columns + 1).join(","); - let column_names = column_names.join(", "); - format!("INSERT INTO {table_name}({column_names}) VALUES ({placeholders})") - }; - Ok(self.conn.prepare(&query)?) - } - async fn delete_query(&self, coro: &Coro, table_name: &str) -> Result { - let (query, pk_column_indices) = if self.opts.use_implicit_rowid { - (format!("DELETE FROM {table_name} WHERE rowid = ?"), None) - } else { - let mut pk_info_stmt = self.conn.prepare(format!( - "SELECT cid, name FROM pragma_table_info('{table_name}') WHERE pk = 1" - ))?; - let mut pk_predicates = Vec::with_capacity(1); - let mut pk_column_indices = Vec::with_capacity(1); - while let Some(column) = run_stmt_once(coro, &mut pk_info_stmt).await? { - let turso_core::Value::Integer(column_id) = column.get_value(0) else { - return Err(Error::DatabaseTapeError( - "unexpected column type for pragma_table_info query".to_string(), - )); - }; - let turso_core::Value::Text(name) = column.get_value(1) else { - return Err(Error::DatabaseTapeError( - "unexpected column type for pragma_table_info query".to_string(), - )); - }; - pk_predicates.push(format!("{name} = ?")); - pk_column_indices.push(*column_id as usize); - } - - if pk_column_indices.is_empty() { - (format!("DELETE FROM {table_name} WHERE rowid = ?"), None) - } else { - let pk_predicates = pk_predicates.join(" AND "); - let query = format!("DELETE FROM {table_name} WHERE {pk_predicates}"); - (query, Some(pk_column_indices)) - } - }; - let use_implicit_rowid = self.opts.use_implicit_rowid; - tracing::trace!("delete_query: table_name={table_name}, query={query}, use_implicit_rowid={use_implicit_rowid}"); - let stmt = self.conn.prepare(&query)?; - Ok(DeleteCachedStmt { - stmt, - pk_column_indices, - }) - } - async fn update_query( - &self, - coro: &Coro, - table_name: &str, - columns: &[bool], - ) -> Result { - let mut table_info_stmt = self.conn.prepare(format!( - "SELECT cid, name, pk FROM pragma_table_info('{table_name}')" - ))?; - let mut pk_predicates = Vec::with_capacity(1); - let mut pk_column_indices = Vec::with_capacity(1); - let mut column_updates = Vec::with_capacity(1); - while let Some(column) = run_stmt_once(coro, &mut table_info_stmt).await? { - let turso_core::Value::Integer(column_id) = column.get_value(0) else { - return Err(Error::DatabaseTapeError( - "unexpected column type for pragma_table_info query".to_string(), - )); - }; - let turso_core::Value::Text(name) = column.get_value(1) else { - return Err(Error::DatabaseTapeError( - "unexpected column type for pragma_table_info query".to_string(), - )); - }; - let turso_core::Value::Integer(pk) = column.get_value(2) else { - return Err(Error::DatabaseTapeError( - "unexpected column type for pragma_table_info query".to_string(), - )); - }; - if *pk == 1 { - pk_predicates.push(format!("{name} = ?")); - pk_column_indices.push(*column_id as usize); - } - if columns[*column_id as usize] { - column_updates.push(format!("{name} = ?")); - } - } - - let (query, pk_column_indices) = if self.opts.use_implicit_rowid { - ( - format!( - "UPDATE {table_name} SET {} WHERE rowid = ?", - column_updates.join(", ") - ), - None, - ) - } else { - ( - format!( - "UPDATE {table_name} SET {} WHERE {}", - column_updates.join(", "), - pk_predicates.join(" AND ") - ), - Some(pk_column_indices), - ) - }; - let stmt = self.conn.prepare(&query)?; - let cached_stmt = UpdateCachedStmt { - stmt, - pk_column_indices, - }; - Ok(cached_stmt) - } -} - -fn parse_bin_record(bin_record: Vec) -> Result> { - let record = turso_core::types::ImmutableRecord::from_bin_record(bin_record); - let mut cursor = turso_core::types::RecordCursor::new(); - let columns = cursor.count(&record); - let mut values = Vec::with_capacity(columns); - for i in 0..columns { - let value = cursor.get_value(&record, i)?; - values.push(value.to_owned()); - } - Ok(values) } #[cfg(test)] diff --git a/packages/turso-sync-engine/src/lib.rs b/packages/turso-sync-engine/src/lib.rs index c67b2e363..c561a5e44 100644 --- a/packages/turso-sync-engine/src/lib.rs +++ b/packages/turso-sync-engine/src/lib.rs @@ -1,3 +1,4 @@ +pub mod database_replay_generator; pub mod database_sync_engine; pub mod database_sync_operations; pub mod database_tape;