mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-25 12:04:21 +01:00
Merge 'turso-sync: support updates and schema changes' from Nikita Sivukhin
Add support for schema changes and granular updates in the `DatabaseTape` and turso-sync-engine Now, schema changes made locally will be replicated to the remote too. Also, `UPDATE`s made locally will touch only changed columns (before we did `DELETE` + `INSERT` which can overwrite non-conflicting changes from another device to the same row). Note, that schema changes replication for now can be pretty dangerous, as we can't extract proper schema at some moment in time from turso_cdc and always use latest schema columns. This means that it's better to avoid `ALTER TABLE ...` to be executed locally, but basic DDL like `CREATE TABLE / CREATE INDEX / DROP TABLE / DROP INDEX` will work fine (as columns only appear/disappear for schema in this case). Closes #2540
This commit is contained in:
@@ -326,6 +326,7 @@ pub async fn transfer_logical_changes(
|
||||
let iterate_opts = DatabaseChangesIteratorOpts {
|
||||
first_change_id: last_change_id.map(|x| x + 1),
|
||||
mode: DatabaseChangesIteratorMode::Apply,
|
||||
ignore_schema_changes: false,
|
||||
..Default::default()
|
||||
};
|
||||
let mut changes = source.iterate_changes(iterate_opts)?;
|
||||
@@ -599,7 +600,6 @@ pub mod tests {
|
||||
conn1.execute("INSERT INTO t VALUES (1, 2), (3, 4), (5, 6)")?;
|
||||
|
||||
let conn2 = db2.connect(&coro).await?;
|
||||
conn2.execute("CREATE TABLE t(x, y)")?;
|
||||
|
||||
transfer_logical_changes(&coro, &db1, &db2, "id-1", false).await?;
|
||||
|
||||
|
||||
@@ -149,6 +149,7 @@ impl DatabaseTape {
|
||||
conn: self.connect(coro).await?,
|
||||
cached_delete_stmt: HashMap::new(),
|
||||
cached_insert_stmt: HashMap::new(),
|
||||
cached_update_stmt: HashMap::new(),
|
||||
in_txn: false,
|
||||
opts,
|
||||
})
|
||||
@@ -283,7 +284,7 @@ impl DatabaseWalSession {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum DatabaseChangesIteratorMode {
|
||||
Apply,
|
||||
Revert,
|
||||
@@ -313,7 +314,7 @@ impl DatabaseChangesIteratorMode {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DatabaseChangesIteratorOpts {
|
||||
pub first_change_id: Option<i64>,
|
||||
pub batch_size: usize,
|
||||
@@ -341,6 +342,7 @@ pub struct DatabaseChangesIterator {
|
||||
ignore_schema_changes: bool,
|
||||
}
|
||||
|
||||
const SQLITE_SCHEMA_TABLE: &str = "sqlite_schema";
|
||||
impl DatabaseChangesIterator {
|
||||
pub async fn next(&mut self, coro: &Coro) -> Result<Option<DatabaseTapeOperation>> {
|
||||
if self.batch.is_empty() {
|
||||
@@ -359,7 +361,7 @@ impl DatabaseChangesIterator {
|
||||
None
|
||||
};
|
||||
if let Some(DatabaseTapeOperation::RowChange(change)) = &next {
|
||||
if self.ignore_schema_changes && change.table_name == "sqlite_schema" {
|
||||
if self.ignore_schema_changes && change.table_name == SQLITE_SCHEMA_TABLE {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -400,10 +402,16 @@ struct DeleteCachedStmt {
|
||||
pk_column_indices: Option<Vec<usize>>, // if None - use rowid instead
|
||||
}
|
||||
|
||||
struct UpdateCachedStmt {
|
||||
stmt: turso_core::Statement,
|
||||
pk_column_indices: Option<Vec<usize>>, // if None - use rowid instead
|
||||
}
|
||||
|
||||
pub struct DatabaseReplaySession {
|
||||
conn: Arc<turso_core::Connection>,
|
||||
cached_delete_stmt: HashMap<String, DeleteCachedStmt>,
|
||||
cached_insert_stmt: HashMap<(String, usize), turso_core::Statement>,
|
||||
cached_update_stmt: HashMap<(String, Vec<bool>), UpdateCachedStmt>,
|
||||
in_txn: bool,
|
||||
opts: DatabaseReplaySessionOpts,
|
||||
}
|
||||
@@ -429,24 +437,106 @@ impl DatabaseReplaySession {
|
||||
}
|
||||
tracing::trace!("replay: change={:?}", change);
|
||||
let table_name = &change.table_name;
|
||||
match change.change {
|
||||
DatabaseTapeRowChangeType::Delete { before } => {
|
||||
let before = parse_bin_record(before)?;
|
||||
self.replay_delete(coro, table_name, change.id, before)
|
||||
.await?
|
||||
|
||||
if table_name == SQLITE_SCHEMA_TABLE {
|
||||
// sqlite_schema table: type, name, tbl_name, rootpage, sql
|
||||
match change.change {
|
||||
DatabaseTapeRowChangeType::Delete { before } => {
|
||||
let before = parse_bin_record(before)?;
|
||||
assert!(before.len() == 5);
|
||||
let Some(turso_core::Value::Text(entity_type)) = before.first() else {
|
||||
panic!(
|
||||
"unexpected 'type' column of sqlite_schema table: {:?}",
|
||||
before.first()
|
||||
);
|
||||
};
|
||||
let Some(turso_core::Value::Text(entity_name)) = before.get(1) else {
|
||||
panic!(
|
||||
"unexpected 'name' column of sqlite_schema table: {:?}",
|
||||
before.get(1)
|
||||
);
|
||||
};
|
||||
self.conn.execute(format!(
|
||||
"DROP {} {}",
|
||||
entity_type.as_str(),
|
||||
entity_name.as_str()
|
||||
))?;
|
||||
}
|
||||
DatabaseTapeRowChangeType::Insert { after } => {
|
||||
let after = parse_bin_record(after)?;
|
||||
assert!(after.len() == 5);
|
||||
let Some(turso_core::Value::Text(sql)) = after.last() else {
|
||||
return Err(Error::DatabaseTapeError(format!(
|
||||
"unexpected 'sql' column of sqlite_schema table: {:?}",
|
||||
after.last()
|
||||
)));
|
||||
};
|
||||
self.conn.execute(sql.as_str())?;
|
||||
}
|
||||
DatabaseTapeRowChangeType::Update { updates, .. } => {
|
||||
let Some(updates) = updates else {
|
||||
return Err(Error::DatabaseTapeError(
|
||||
"'updates' column of CDC table must be populated".to_string(),
|
||||
));
|
||||
};
|
||||
let updates = parse_bin_record(updates)?;
|
||||
assert!(updates.len() % 2 == 0);
|
||||
assert!(updates.len() / 2 == 5);
|
||||
let turso_core::Value::Text(ddl_stmt) = updates.last().unwrap() else {
|
||||
panic!(
|
||||
"unexpected 'sql' column of sqlite_schema table update record: {:?}",
|
||||
updates.last()
|
||||
);
|
||||
};
|
||||
self.conn.execute(ddl_stmt.as_str())?;
|
||||
}
|
||||
}
|
||||
DatabaseTapeRowChangeType::Update { before, after } => {
|
||||
let before = parse_bin_record(before)?;
|
||||
self.replay_delete(coro, table_name, change.id, before)
|
||||
.await?;
|
||||
let after = parse_bin_record(after)?;
|
||||
self.replay_insert(coro, table_name, change.id, after)
|
||||
.await?;
|
||||
}
|
||||
DatabaseTapeRowChangeType::Insert { after } => {
|
||||
let values = parse_bin_record(after)?;
|
||||
self.replay_insert(coro, table_name, change.id, values)
|
||||
.await?;
|
||||
} else {
|
||||
match change.change {
|
||||
DatabaseTapeRowChangeType::Delete { before } => {
|
||||
let before = parse_bin_record(before)?;
|
||||
self.replay_delete(coro, table_name, change.id, before)
|
||||
.await?
|
||||
}
|
||||
DatabaseTapeRowChangeType::Update {
|
||||
before,
|
||||
after,
|
||||
updates,
|
||||
} => {
|
||||
let after = parse_bin_record(after)?;
|
||||
if let Some(updates) = updates {
|
||||
let updates = parse_bin_record(updates)?;
|
||||
assert!(updates.len() % 2 == 0);
|
||||
let columns_cnt = updates.len() / 2;
|
||||
let mut columns = Vec::with_capacity(columns_cnt);
|
||||
let mut values = Vec::with_capacity(columns_cnt);
|
||||
for (i, value) in updates.into_iter().enumerate() {
|
||||
if i < columns_cnt {
|
||||
columns.push(match value {
|
||||
turso_core::Value::Integer(x @ (1 | 0)) => x > 0,
|
||||
_ => panic!("unexpected 'changes' binary record first-half component: {value:?}")
|
||||
})
|
||||
} else {
|
||||
values.push(value);
|
||||
}
|
||||
}
|
||||
self.replay_update(
|
||||
coro, table_name, change.id, columns, after, values,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
let before = parse_bin_record(before)?;
|
||||
self.replay_delete(coro, table_name, change.id, before)
|
||||
.await?;
|
||||
self.replay_insert(coro, table_name, change.id, after)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
DatabaseTapeRowChangeType::Insert { after } => {
|
||||
let values = parse_bin_record(after)?;
|
||||
self.replay_insert(coro, table_name, change.id, values)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -497,6 +587,38 @@ impl DatabaseReplaySession {
|
||||
exec_stmt(coro, stmt).await?;
|
||||
Ok(())
|
||||
}
|
||||
async fn replay_update(
|
||||
&mut self,
|
||||
coro: &Coro,
|
||||
table_name: &str,
|
||||
id: i64,
|
||||
columns: Vec<bool>,
|
||||
mut full: Vec<turso_core::Value>,
|
||||
mut updates: Vec<turso_core::Value>,
|
||||
) -> Result<()> {
|
||||
let cached = self.cached_update_stmt(coro, table_name, &columns).await?;
|
||||
let mut position: usize = 1;
|
||||
for (i, updated) in columns.into_iter().enumerate() {
|
||||
if !updated {
|
||||
continue;
|
||||
}
|
||||
let value = std::mem::replace(&mut updates[i], turso_core::Value::Null);
|
||||
cached.stmt.bind_at(position.try_into().unwrap(), value);
|
||||
position += 1;
|
||||
}
|
||||
if let Some(pk_column_indices) = &cached.pk_column_indices {
|
||||
for pk_column in pk_column_indices {
|
||||
let value = std::mem::replace(&mut full[*pk_column], turso_core::Value::Null);
|
||||
cached.stmt.bind_at(position.try_into().unwrap(), value);
|
||||
position += 1
|
||||
}
|
||||
} else {
|
||||
let value = turso_core::Value::Integer(id);
|
||||
cached.stmt.bind_at(position.try_into().unwrap(), value);
|
||||
}
|
||||
exec_stmt(coro, &mut cached.stmt).await?;
|
||||
Ok(())
|
||||
}
|
||||
async fn cached_delete_stmt(
|
||||
&mut self,
|
||||
coro: &Coro,
|
||||
@@ -540,7 +662,26 @@ impl DatabaseReplaySession {
|
||||
stmt.reset();
|
||||
Ok(stmt)
|
||||
}
|
||||
|
||||
async fn cached_update_stmt(
|
||||
&mut self,
|
||||
coro: &Coro,
|
||||
table_name: &str,
|
||||
columns: &[bool],
|
||||
) -> Result<&mut UpdateCachedStmt> {
|
||||
let key = (table_name.to_string(), columns.to_owned());
|
||||
if !self.cached_update_stmt.contains_key(&key) {
|
||||
tracing::trace!("prepare update statement for replay: table={}", table_name);
|
||||
let stmt = self.update_query(coro, table_name, columns).await?;
|
||||
self.cached_update_stmt.insert(key.clone(), stmt);
|
||||
}
|
||||
tracing::trace!(
|
||||
"ready to use prepared update statement for replay: table={}",
|
||||
table_name
|
||||
);
|
||||
let cached = self.cached_update_stmt.get_mut(&key).unwrap();
|
||||
cached.stmt.reset();
|
||||
Ok(cached)
|
||||
}
|
||||
async fn insert_query(
|
||||
&self,
|
||||
coro: &Coro,
|
||||
@@ -571,7 +712,6 @@ impl DatabaseReplaySession {
|
||||
};
|
||||
Ok(self.conn.prepare(&query)?)
|
||||
}
|
||||
|
||||
async fn delete_query(&self, coro: &Coro, table_name: &str) -> Result<DeleteCachedStmt> {
|
||||
let (query, pk_column_indices) = if self.opts.use_implicit_rowid {
|
||||
(format!("DELETE FROM {table_name} WHERE rowid = ?"), None)
|
||||
@@ -612,6 +752,68 @@ impl DatabaseReplaySession {
|
||||
pk_column_indices,
|
||||
})
|
||||
}
|
||||
async fn update_query(
|
||||
&self,
|
||||
coro: &Coro,
|
||||
table_name: &str,
|
||||
columns: &[bool],
|
||||
) -> Result<UpdateCachedStmt> {
|
||||
let mut table_info_stmt = self.conn.prepare(format!(
|
||||
"SELECT cid, name, pk FROM pragma_table_info('{table_name}')"
|
||||
))?;
|
||||
let mut pk_predicates = Vec::with_capacity(1);
|
||||
let mut pk_column_indices = Vec::with_capacity(1);
|
||||
let mut column_updates = Vec::with_capacity(1);
|
||||
while let Some(column) = run_stmt(coro, &mut table_info_stmt).await? {
|
||||
let turso_core::Value::Integer(column_id) = column.get_value(0) else {
|
||||
return Err(Error::DatabaseTapeError(
|
||||
"unexpected column type for pragma_table_info query".to_string(),
|
||||
));
|
||||
};
|
||||
let turso_core::Value::Text(name) = column.get_value(1) else {
|
||||
return Err(Error::DatabaseTapeError(
|
||||
"unexpected column type for pragma_table_info query".to_string(),
|
||||
));
|
||||
};
|
||||
let turso_core::Value::Integer(pk) = column.get_value(2) else {
|
||||
return Err(Error::DatabaseTapeError(
|
||||
"unexpected column type for pragma_table_info query".to_string(),
|
||||
));
|
||||
};
|
||||
if *pk == 1 {
|
||||
pk_predicates.push(format!("{name} = ?"));
|
||||
pk_column_indices.push(*column_id as usize);
|
||||
}
|
||||
if columns[*column_id as usize] {
|
||||
column_updates.push(format!("{name} = ?"));
|
||||
}
|
||||
}
|
||||
|
||||
let (query, pk_column_indices) = if self.opts.use_implicit_rowid {
|
||||
(
|
||||
format!(
|
||||
"UPDATE {table_name} SET {} WHERE rowid = ?",
|
||||
column_updates.join(", ")
|
||||
),
|
||||
None,
|
||||
)
|
||||
} else {
|
||||
(
|
||||
format!(
|
||||
"UPDATE {table_name} SET {} WHERE {}",
|
||||
column_updates.join(", "),
|
||||
pk_predicates.join(" AND ")
|
||||
),
|
||||
Some(pk_column_indices),
|
||||
)
|
||||
};
|
||||
let stmt = self.conn.prepare(&query)?;
|
||||
let cached_stmt = UpdateCachedStmt {
|
||||
stmt,
|
||||
pk_column_indices,
|
||||
};
|
||||
Ok(cached_stmt)
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_bin_record(bin_record: Vec<u8>) -> Result<Vec<turso_core::Value>> {
|
||||
@@ -633,7 +835,9 @@ mod tests {
|
||||
use tempfile::NamedTempFile;
|
||||
|
||||
use crate::{
|
||||
database_tape::{run_stmt, DatabaseReplaySessionOpts, DatabaseTape},
|
||||
database_tape::{
|
||||
run_stmt, DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape,
|
||||
},
|
||||
types::{DatabaseTapeOperation, DatabaseTapeRowChange, DatabaseTapeRowChangeType},
|
||||
};
|
||||
|
||||
@@ -941,4 +1145,384 @@ mod tests {
|
||||
]]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_database_tape_replay_schema_changes() {
|
||||
let temp_file1 = NamedTempFile::new().unwrap();
|
||||
let db_path1 = temp_file1.path().to_str().unwrap();
|
||||
let temp_file2 = NamedTempFile::new().unwrap();
|
||||
let db_path2 = temp_file2.path().to_str().unwrap();
|
||||
let temp_file3 = NamedTempFile::new().unwrap();
|
||||
let db_path3 = temp_file3.path().to_str().unwrap();
|
||||
|
||||
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
|
||||
|
||||
let db1 = turso_core::Database::open_file(io.clone(), db_path1, false, true).unwrap();
|
||||
let db1 = Arc::new(DatabaseTape::new(db1));
|
||||
|
||||
let db2 = turso_core::Database::open_file(io.clone(), db_path2, false, true).unwrap();
|
||||
let db2 = Arc::new(DatabaseTape::new(db2));
|
||||
|
||||
let db3 = turso_core::Database::open_file(io.clone(), db_path3, false, true).unwrap();
|
||||
let db3 = Arc::new(DatabaseTape::new(db3));
|
||||
|
||||
let mut gen = genawaiter::sync::Gen::new({
|
||||
|coro| async move {
|
||||
let conn1 = db1.connect(&coro).await.unwrap();
|
||||
conn1
|
||||
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)")
|
||||
.unwrap();
|
||||
conn1
|
||||
.execute("INSERT INTO t(x, y) VALUES ('a', 10)")
|
||||
.unwrap();
|
||||
let conn2 = db2.connect(&coro).await.unwrap();
|
||||
conn2
|
||||
.execute("CREATE TABLE q(x TEXT PRIMARY KEY, y)")
|
||||
.unwrap();
|
||||
conn2
|
||||
.execute("INSERT INTO q(x, y) VALUES ('b', 20)")
|
||||
.unwrap();
|
||||
|
||||
let conn3 = db3.connect(&coro).await.unwrap();
|
||||
{
|
||||
let opts = DatabaseReplaySessionOpts {
|
||||
use_implicit_rowid: false,
|
||||
};
|
||||
let mut session = db3.start_replay_session(&coro, opts).await.unwrap();
|
||||
|
||||
let opts = DatabaseChangesIteratorOpts {
|
||||
ignore_schema_changes: false,
|
||||
..Default::default()
|
||||
};
|
||||
let mut iterator = db1.iterate_changes(opts.clone()).unwrap();
|
||||
while let Some(operation) = iterator.next(&coro).await.unwrap() {
|
||||
session.replay(&coro, operation).await.unwrap();
|
||||
}
|
||||
let mut iterator = db2.iterate_changes(opts.clone()).unwrap();
|
||||
while let Some(operation) = iterator.next(&coro).await.unwrap() {
|
||||
session.replay(&coro, operation).await.unwrap();
|
||||
}
|
||||
}
|
||||
let mut rows = Vec::new();
|
||||
let mut stmt = conn3.prepare("SELECT rowid, x, y FROM t").unwrap();
|
||||
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
|
||||
rows.push(row.get_values().cloned().collect::<Vec<_>>());
|
||||
}
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![vec![
|
||||
turso_core::Value::Integer(1),
|
||||
turso_core::Value::Text(turso_core::types::Text::new("a")),
|
||||
turso_core::Value::Integer(10),
|
||||
]]
|
||||
);
|
||||
|
||||
let mut rows = Vec::new();
|
||||
let mut stmt = conn3.prepare("SELECT rowid, x, y FROM q").unwrap();
|
||||
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
|
||||
rows.push(row.get_values().cloned().collect::<Vec<_>>());
|
||||
}
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![vec![
|
||||
turso_core::Value::Integer(1),
|
||||
turso_core::Value::Text(turso_core::types::Text::new("b")),
|
||||
turso_core::Value::Integer(20),
|
||||
]]
|
||||
);
|
||||
let mut rows = Vec::new();
|
||||
let mut stmt = conn3
|
||||
.prepare(
|
||||
"SELECT * FROM sqlite_schema WHERE name != 'turso_cdc' AND type = 'table'",
|
||||
)
|
||||
.unwrap();
|
||||
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
|
||||
rows.push(row.get_values().cloned().collect::<Vec<_>>());
|
||||
}
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![
|
||||
turso_core::Value::Text(turso_core::types::Text::new("table")),
|
||||
turso_core::Value::Text(turso_core::types::Text::new("t")),
|
||||
turso_core::Value::Text(turso_core::types::Text::new("t")),
|
||||
turso_core::Value::Integer(3),
|
||||
turso_core::Value::Text(turso_core::types::Text::new(
|
||||
"CREATE TABLE t (x TEXT PRIMARY KEY, y)"
|
||||
)),
|
||||
],
|
||||
vec![
|
||||
turso_core::Value::Text(turso_core::types::Text::new("table")),
|
||||
turso_core::Value::Text(turso_core::types::Text::new("q")),
|
||||
turso_core::Value::Text(turso_core::types::Text::new("q")),
|
||||
turso_core::Value::Integer(5),
|
||||
turso_core::Value::Text(turso_core::types::Text::new(
|
||||
"CREATE TABLE q (x TEXT PRIMARY KEY, y)"
|
||||
)),
|
||||
]
|
||||
]
|
||||
);
|
||||
crate::Result::Ok(())
|
||||
}
|
||||
});
|
||||
loop {
|
||||
match gen.resume_with(Ok(())) {
|
||||
genawaiter::GeneratorState::Yielded(..) => io.run_once().unwrap(),
|
||||
genawaiter::GeneratorState::Complete(result) => {
|
||||
result.unwrap();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_database_tape_replay_create_index() {
|
||||
let temp_file1 = NamedTempFile::new().unwrap();
|
||||
let db_path1 = temp_file1.path().to_str().unwrap();
|
||||
let temp_file2 = NamedTempFile::new().unwrap();
|
||||
let db_path2 = temp_file2.path().to_str().unwrap();
|
||||
|
||||
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
|
||||
|
||||
let db1 = turso_core::Database::open_file(io.clone(), db_path1, false, true).unwrap();
|
||||
let db1 = Arc::new(DatabaseTape::new(db1));
|
||||
|
||||
let db2 = turso_core::Database::open_file(io.clone(), db_path2, false, true).unwrap();
|
||||
let db2 = Arc::new(DatabaseTape::new(db2));
|
||||
|
||||
let mut gen = genawaiter::sync::Gen::new({
|
||||
|coro| async move {
|
||||
let conn1 = db1.connect(&coro).await.unwrap();
|
||||
conn1
|
||||
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)")
|
||||
.unwrap();
|
||||
conn1.execute("CREATE INDEX t_idx ON t(y)").unwrap();
|
||||
|
||||
let conn2 = db2.connect(&coro).await.unwrap();
|
||||
{
|
||||
let opts = DatabaseReplaySessionOpts {
|
||||
use_implicit_rowid: false,
|
||||
};
|
||||
let mut session = db2.start_replay_session(&coro, opts).await.unwrap();
|
||||
|
||||
let opts = DatabaseChangesIteratorOpts {
|
||||
ignore_schema_changes: false,
|
||||
..Default::default()
|
||||
};
|
||||
let mut iterator = db1.iterate_changes(opts.clone()).unwrap();
|
||||
while let Some(operation) = iterator.next(&coro).await.unwrap() {
|
||||
session.replay(&coro, operation).await.unwrap();
|
||||
}
|
||||
}
|
||||
let mut rows = Vec::new();
|
||||
let mut stmt = conn2
|
||||
.prepare("SELECT * FROM sqlite_schema WHERE name IN ('t', 't_idx')")
|
||||
.unwrap();
|
||||
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
|
||||
rows.push(row.get_values().cloned().collect::<Vec<_>>());
|
||||
}
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![
|
||||
turso_core::Value::Text(turso_core::types::Text::new("table")),
|
||||
turso_core::Value::Text(turso_core::types::Text::new("t")),
|
||||
turso_core::Value::Text(turso_core::types::Text::new("t")),
|
||||
turso_core::Value::Integer(3),
|
||||
turso_core::Value::Text(turso_core::types::Text::new(
|
||||
"CREATE TABLE t (x TEXT PRIMARY KEY, y)"
|
||||
)),
|
||||
],
|
||||
vec![
|
||||
turso_core::Value::Text(turso_core::types::Text::new("index")),
|
||||
turso_core::Value::Text(turso_core::types::Text::new("t_idx")),
|
||||
turso_core::Value::Text(turso_core::types::Text::new("t")),
|
||||
turso_core::Value::Integer(5),
|
||||
turso_core::Value::Text(turso_core::types::Text::new(
|
||||
"CREATE INDEX t_idx ON t (y)"
|
||||
)),
|
||||
]
|
||||
]
|
||||
);
|
||||
crate::Result::Ok(())
|
||||
}
|
||||
});
|
||||
loop {
|
||||
match gen.resume_with(Ok(())) {
|
||||
genawaiter::GeneratorState::Yielded(..) => io.run_once().unwrap(),
|
||||
genawaiter::GeneratorState::Complete(result) => {
|
||||
result.unwrap();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_database_tape_replay_alter_table() {
|
||||
let temp_file1 = NamedTempFile::new().unwrap();
|
||||
let db_path1 = temp_file1.path().to_str().unwrap();
|
||||
let temp_file2 = NamedTempFile::new().unwrap();
|
||||
let db_path2 = temp_file2.path().to_str().unwrap();
|
||||
|
||||
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
|
||||
|
||||
let db1 = turso_core::Database::open_file(io.clone(), db_path1, false, true).unwrap();
|
||||
let db1 = Arc::new(DatabaseTape::new(db1));
|
||||
|
||||
let db2 = turso_core::Database::open_file(io.clone(), db_path2, false, true).unwrap();
|
||||
let db2 = Arc::new(DatabaseTape::new(db2));
|
||||
|
||||
let mut gen = genawaiter::sync::Gen::new({
|
||||
|coro| async move {
|
||||
let conn1 = db1.connect(&coro).await.unwrap();
|
||||
conn1
|
||||
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)")
|
||||
.unwrap();
|
||||
conn1.execute("ALTER TABLE t ADD COLUMN z").unwrap();
|
||||
conn1.execute("ALTER TABLE t DROP COLUMN y").unwrap();
|
||||
|
||||
let conn2 = db2.connect(&coro).await.unwrap();
|
||||
{
|
||||
let opts = DatabaseReplaySessionOpts {
|
||||
use_implicit_rowid: false,
|
||||
};
|
||||
let mut session = db2.start_replay_session(&coro, opts).await.unwrap();
|
||||
|
||||
let opts = DatabaseChangesIteratorOpts {
|
||||
ignore_schema_changes: false,
|
||||
..Default::default()
|
||||
};
|
||||
let mut iterator = db1.iterate_changes(opts.clone()).unwrap();
|
||||
while let Some(operation) = iterator.next(&coro).await.unwrap() {
|
||||
session.replay(&coro, operation).await.unwrap();
|
||||
}
|
||||
}
|
||||
let mut rows = Vec::new();
|
||||
let mut stmt = conn2
|
||||
.prepare("SELECT * FROM sqlite_schema WHERE name IN ('t')")
|
||||
.unwrap();
|
||||
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
|
||||
rows.push(row.get_values().cloned().collect::<Vec<_>>());
|
||||
}
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![vec![
|
||||
turso_core::Value::Text(turso_core::types::Text::new("table")),
|
||||
turso_core::Value::Text(turso_core::types::Text::new("t")),
|
||||
turso_core::Value::Text(turso_core::types::Text::new("t")),
|
||||
turso_core::Value::Integer(3),
|
||||
turso_core::Value::Text(turso_core::types::Text::new(
|
||||
"CREATE TABLE t (x TEXT PRIMARY KEY, z)"
|
||||
)),
|
||||
]]
|
||||
);
|
||||
crate::Result::Ok(())
|
||||
}
|
||||
});
|
||||
loop {
|
||||
match gen.resume_with(Ok(())) {
|
||||
genawaiter::GeneratorState::Yielded(..) => io.run_once().unwrap(),
|
||||
genawaiter::GeneratorState::Complete(result) => {
|
||||
result.unwrap();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_database_tape_replay_non_overlapping_updates() {
|
||||
let temp_file1 = NamedTempFile::new().unwrap();
|
||||
let db_path1 = temp_file1.path().to_str().unwrap();
|
||||
let temp_file2 = NamedTempFile::new().unwrap();
|
||||
let db_path2 = temp_file2.path().to_str().unwrap();
|
||||
let temp_file3 = NamedTempFile::new().unwrap();
|
||||
let db_path3 = temp_file3.path().to_str().unwrap();
|
||||
|
||||
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
|
||||
|
||||
let db1 = turso_core::Database::open_file(io.clone(), db_path1, false, true).unwrap();
|
||||
let db1 = Arc::new(DatabaseTape::new(db1));
|
||||
|
||||
let db2 = turso_core::Database::open_file(io.clone(), db_path2, false, true).unwrap();
|
||||
let db2 = Arc::new(DatabaseTape::new(db2));
|
||||
|
||||
let db3 = turso_core::Database::open_file(io.clone(), db_path3, false, true).unwrap();
|
||||
let db3 = Arc::new(DatabaseTape::new(db3));
|
||||
|
||||
let mut gen = genawaiter::sync::Gen::new({
|
||||
|coro| async move {
|
||||
let conn1 = db1.connect(&coro).await.unwrap();
|
||||
conn1
|
||||
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y, z)")
|
||||
.unwrap();
|
||||
conn1
|
||||
.execute("INSERT INTO t VALUES ('turso', 1, 2)")
|
||||
.unwrap();
|
||||
conn1
|
||||
.execute("UPDATE t SET y = 10 WHERE x = 'turso'")
|
||||
.unwrap();
|
||||
|
||||
let conn2 = db2.connect_untracked().unwrap();
|
||||
conn2
|
||||
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y, z)")
|
||||
.unwrap();
|
||||
conn2
|
||||
.execute("INSERT INTO t VALUES ('turso', 1, 2)")
|
||||
.unwrap();
|
||||
|
||||
let conn2 = db2.connect(&coro).await.unwrap();
|
||||
conn2
|
||||
.execute("UPDATE t SET z = 20 WHERE x = 'turso'")
|
||||
.unwrap();
|
||||
|
||||
let conn3 = db3.connect(&coro).await.unwrap();
|
||||
{
|
||||
let opts = DatabaseReplaySessionOpts {
|
||||
use_implicit_rowid: false,
|
||||
};
|
||||
let mut session = db3.start_replay_session(&coro, opts).await.unwrap();
|
||||
|
||||
let opts = DatabaseChangesIteratorOpts {
|
||||
ignore_schema_changes: false,
|
||||
..Default::default()
|
||||
};
|
||||
let mut iterator = db1.iterate_changes(opts.clone()).unwrap();
|
||||
while let Some(operation) = iterator.next(&coro).await.unwrap() {
|
||||
session.replay(&coro, operation).await.unwrap();
|
||||
}
|
||||
|
||||
let mut iterator = db2.iterate_changes(opts.clone()).unwrap();
|
||||
while let Some(operation) = iterator.next(&coro).await.unwrap() {
|
||||
session.replay(&coro, operation).await.unwrap();
|
||||
}
|
||||
}
|
||||
let mut rows = Vec::new();
|
||||
let mut stmt = conn3.prepare("SELECT * FROM t").unwrap();
|
||||
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
|
||||
rows.push(row.get_values().cloned().collect::<Vec<_>>());
|
||||
}
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![vec![
|
||||
turso_core::Value::Text(turso_core::types::Text::new("turso")),
|
||||
turso_core::Value::Integer(10),
|
||||
turso_core::Value::Integer(20),
|
||||
]]
|
||||
);
|
||||
crate::Result::Ok(())
|
||||
}
|
||||
});
|
||||
loop {
|
||||
match gen.resume_with(Ok(())) {
|
||||
genawaiter::GeneratorState::Yielded(..) => io.run_once().unwrap(),
|
||||
genawaiter::GeneratorState::Complete(result) => {
|
||||
result.unwrap();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,6 +66,8 @@ pub struct DatabaseChange {
|
||||
pub before: Option<Vec<u8>>,
|
||||
/// Binary record of the row after the change, if CDC pragma set to either 'after' or 'full'
|
||||
pub after: Option<Vec<u8>>,
|
||||
/// Binary record from "updates" column, if CDC pragma set to 'full'
|
||||
pub updates: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl DatabaseChange {
|
||||
@@ -86,6 +88,7 @@ impl DatabaseChange {
|
||||
after: self.after.ok_or_else(|| {
|
||||
Error::DatabaseTapeError("cdc_mode must be set to 'full'".to_string())
|
||||
})?,
|
||||
updates: self.updates,
|
||||
},
|
||||
DatabaseChangeType::Insert => DatabaseTapeRowChangeType::Insert {
|
||||
after: self.after.ok_or_else(|| {
|
||||
@@ -122,6 +125,7 @@ impl DatabaseChange {
|
||||
"cdc_mode must be set to either 'full' or 'before'".to_string(),
|
||||
)
|
||||
})?,
|
||||
updates: None,
|
||||
},
|
||||
DatabaseChangeType::Insert => DatabaseTapeRowChangeType::Delete {
|
||||
before: self.after.ok_or_else(|| {
|
||||
@@ -166,6 +170,7 @@ impl TryFrom<&turso_core::Row> for DatabaseChange {
|
||||
let id = get_core_value_i64(row, 4)?;
|
||||
let before = get_core_value_blob_or_null(row, 5)?;
|
||||
let after = get_core_value_blob_or_null(row, 6)?;
|
||||
let updates = get_core_value_blob_or_null(row, 7)?;
|
||||
|
||||
let change_type = match change_type {
|
||||
-1 => DatabaseChangeType::Delete,
|
||||
@@ -185,14 +190,23 @@ impl TryFrom<&turso_core::Row> for DatabaseChange {
|
||||
id,
|
||||
before,
|
||||
after,
|
||||
updates,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub enum DatabaseTapeRowChangeType {
|
||||
Delete { before: Vec<u8> },
|
||||
Update { before: Vec<u8>, after: Vec<u8> },
|
||||
Insert { after: Vec<u8> },
|
||||
Delete {
|
||||
before: Vec<u8>,
|
||||
},
|
||||
Update {
|
||||
before: Vec<u8>,
|
||||
after: Vec<u8>,
|
||||
updates: Option<Vec<u8>>,
|
||||
},
|
||||
Insert {
|
||||
after: Vec<u8>,
|
||||
},
|
||||
}
|
||||
|
||||
/// [DatabaseTapeOperation] extends [DatabaseTapeRowChange] by adding information about transaction boundary
|
||||
@@ -222,10 +236,15 @@ impl std::fmt::Debug for DatabaseTapeRowChangeType {
|
||||
.debug_struct("Delete")
|
||||
.field("before.len()", &before.len())
|
||||
.finish(),
|
||||
Self::Update { before, after } => f
|
||||
Self::Update {
|
||||
before,
|
||||
after,
|
||||
updates,
|
||||
} => f
|
||||
.debug_struct("Update")
|
||||
.field("before.len()", &before.len())
|
||||
.field("after.len()", &after.len())
|
||||
.field("updates.len()", &updates.as_ref().map(|x| x.len()))
|
||||
.finish(),
|
||||
Self::Insert { after } => f
|
||||
.debug_struct("Insert")
|
||||
|
||||
Reference in New Issue
Block a user