support UPDATEs and schema changes in database tape

This commit is contained in:
Nikita Sivukhin
2025-08-11 16:50:59 +04:00
parent 8749a66217
commit 5e773d286e
2 changed files with 630 additions and 27 deletions

View File

@@ -149,6 +149,7 @@ impl DatabaseTape {
conn: self.connect(coro).await?,
cached_delete_stmt: HashMap::new(),
cached_insert_stmt: HashMap::new(),
cached_update_stmt: HashMap::new(),
in_txn: false,
opts,
})
@@ -283,7 +284,7 @@ impl DatabaseWalSession {
}
}
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub enum DatabaseChangesIteratorMode {
Apply,
Revert,
@@ -313,7 +314,7 @@ impl DatabaseChangesIteratorMode {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DatabaseChangesIteratorOpts {
pub first_change_id: Option<i64>,
pub batch_size: usize,
@@ -341,6 +342,7 @@ pub struct DatabaseChangesIterator {
ignore_schema_changes: bool,
}
const SQLITE_SCHEMA_TABLE: &str = "sqlite_schema";
impl DatabaseChangesIterator {
pub async fn next(&mut self, coro: &Coro) -> Result<Option<DatabaseTapeOperation>> {
if self.batch.is_empty() {
@@ -359,7 +361,7 @@ impl DatabaseChangesIterator {
None
};
if let Some(DatabaseTapeOperation::RowChange(change)) = &next {
if self.ignore_schema_changes && change.table_name == "sqlite_schema" {
if self.ignore_schema_changes && change.table_name == SQLITE_SCHEMA_TABLE {
continue;
}
}
@@ -400,10 +402,16 @@ struct DeleteCachedStmt {
pk_column_indices: Option<Vec<usize>>, // if None - use rowid instead
}
struct UpdateCachedStmt {
stmt: turso_core::Statement,
pk_column_indices: Option<Vec<usize>>, // if None - use rowid instead
}
pub struct DatabaseReplaySession {
conn: Arc<turso_core::Connection>,
cached_delete_stmt: HashMap<String, DeleteCachedStmt>,
cached_insert_stmt: HashMap<(String, usize), turso_core::Statement>,
cached_update_stmt: HashMap<(String, Vec<bool>), UpdateCachedStmt>,
in_txn: bool,
opts: DatabaseReplaySessionOpts,
}
@@ -429,24 +437,106 @@ impl DatabaseReplaySession {
}
tracing::trace!("replay: change={:?}", change);
let table_name = &change.table_name;
match change.change {
DatabaseTapeRowChangeType::Delete { before } => {
let before = parse_bin_record(before)?;
self.replay_delete(coro, table_name, change.id, before)
.await?
if table_name == SQLITE_SCHEMA_TABLE {
// sqlite_schema table: type, name, tbl_name, rootpage, sql
match change.change {
DatabaseTapeRowChangeType::Delete { before } => {
let before = parse_bin_record(before)?;
assert!(before.len() == 5);
let Some(turso_core::Value::Text(entity_type)) = before.get(0) else {
panic!(
"unexpected 'type' column of sqlite_schema table: {:?}",
before.get(0)
);
};
let Some(turso_core::Value::Text(entity_name)) = before.get(1) else {
panic!(
"unexpected 'name' column of sqlite_schema table: {:?}",
before.get(1)
);
};
self.conn.execute(format!(
"DROP {} {}",
entity_type.as_str(),
entity_name.as_str()
))?;
}
DatabaseTapeRowChangeType::Insert { after } => {
let after = parse_bin_record(after)?;
assert!(after.len() == 5);
let Some(turso_core::Value::Text(sql)) = after.last() else {
return Err(Error::DatabaseTapeError(format!(
"unexpected 'sql' column of sqlite_schema table: {:?}",
after.last()
)));
};
self.conn.execute(sql.as_str())?;
}
DatabaseTapeRowChangeType::Update { updates, .. } => {
let Some(updates) = updates else {
return Err(Error::DatabaseTapeError(format!(
"'updates' column of CDC table must be populated"
)));
};
let updates = parse_bin_record(updates)?;
assert!(updates.len() % 2 == 0);
assert!(updates.len() / 2 == 5);
let turso_core::Value::Text(ddl_stmt) = updates.last().unwrap() else {
panic!(
"unexpected 'sql' column of sqlite_schema table update record: {:?}",
updates.last()
);
};
self.conn.execute(ddl_stmt.as_str())?;
}
}
DatabaseTapeRowChangeType::Update { before, after } => {
let before = parse_bin_record(before)?;
self.replay_delete(coro, table_name, change.id, before)
.await?;
let after = parse_bin_record(after)?;
self.replay_insert(coro, table_name, change.id, after)
.await?;
}
DatabaseTapeRowChangeType::Insert { after } => {
let values = parse_bin_record(after)?;
self.replay_insert(coro, table_name, change.id, values)
.await?;
} else {
match change.change {
DatabaseTapeRowChangeType::Delete { before } => {
let before = parse_bin_record(before)?;
self.replay_delete(coro, table_name, change.id, before)
.await?
}
DatabaseTapeRowChangeType::Update {
before,
after,
updates,
} => {
let after = parse_bin_record(after)?;
if let Some(updates) = updates {
let updates = parse_bin_record(updates)?;
assert!(updates.len() % 2 == 0);
let columns_cnt = updates.len() / 2;
let mut columns = Vec::with_capacity(columns_cnt);
let mut values = Vec::with_capacity(columns_cnt);
for (i, value) in updates.into_iter().enumerate() {
if i < columns_cnt {
columns.push(match value {
turso_core::Value::Integer(x @ (1 | 0)) => x > 0,
_ => panic!("unexpected 'changes' binary record first-half component: {:?}", value)
})
} else {
values.push(value);
}
}
self.replay_update(
coro, table_name, change.id, columns, after, values,
)
.await?;
} else {
let before = parse_bin_record(before)?;
self.replay_delete(coro, table_name, change.id, before)
.await?;
self.replay_insert(coro, table_name, change.id, after)
.await?;
}
}
DatabaseTapeRowChangeType::Insert { after } => {
let values = parse_bin_record(after)?;
self.replay_insert(coro, table_name, change.id, values)
.await?;
}
}
}
}
@@ -497,6 +587,38 @@ impl DatabaseReplaySession {
exec_stmt(coro, stmt).await?;
Ok(())
}
async fn replay_update(
&mut self,
coro: &Coro,
table_name: &str,
id: i64,
columns: Vec<bool>,
mut full: Vec<turso_core::Value>,
mut updates: Vec<turso_core::Value>,
) -> Result<()> {
let cached = self.cached_update_stmt(coro, table_name, &columns).await?;
let mut position: usize = 1;
for (i, updated) in columns.into_iter().enumerate() {
if !updated {
continue;
}
let value = std::mem::replace(&mut updates[i], turso_core::Value::Null);
cached.stmt.bind_at(position.try_into().unwrap(), value);
position += 1;
}
if let Some(pk_column_indices) = &cached.pk_column_indices {
for pk_column in pk_column_indices {
let value = std::mem::replace(&mut full[*pk_column], turso_core::Value::Null);
cached.stmt.bind_at(position.try_into().unwrap(), value);
position += 1
}
} else {
let value = turso_core::Value::Integer(id);
cached.stmt.bind_at(position.try_into().unwrap(), value);
}
exec_stmt(coro, &mut cached.stmt).await?;
Ok(())
}
async fn cached_delete_stmt(
&mut self,
coro: &Coro,
@@ -540,7 +662,26 @@ impl DatabaseReplaySession {
stmt.reset();
Ok(stmt)
}
async fn cached_update_stmt(
&mut self,
coro: &Coro,
table_name: &str,
columns: &Vec<bool>,
) -> Result<&mut UpdateCachedStmt> {
let key = (table_name.to_string(), columns.clone());
if !self.cached_update_stmt.contains_key(&key) {
tracing::trace!("prepare update statement for replay: table={}", table_name);
let stmt = self.update_query(coro, table_name, &columns).await?;
self.cached_update_stmt.insert(key.clone(), stmt);
}
tracing::trace!(
"ready to use prepared update statement for replay: table={}",
table_name
);
let cached = self.cached_update_stmt.get_mut(&key).unwrap();
cached.stmt.reset();
Ok(cached)
}
async fn insert_query(
&self,
coro: &Coro,
@@ -571,7 +712,6 @@ impl DatabaseReplaySession {
};
Ok(self.conn.prepare(&query)?)
}
async fn delete_query(&self, coro: &Coro, table_name: &str) -> Result<DeleteCachedStmt> {
let (query, pk_column_indices) = if self.opts.use_implicit_rowid {
(format!("DELETE FROM {table_name} WHERE rowid = ?"), None)
@@ -612,6 +752,68 @@ impl DatabaseReplaySession {
pk_column_indices,
})
}
async fn update_query(
&self,
coro: &Coro,
table_name: &str,
columns: &[bool],
) -> Result<UpdateCachedStmt> {
let mut table_info_stmt = self.conn.prepare(format!(
"SELECT cid, name, pk FROM pragma_table_info('{table_name}')"
))?;
let mut pk_predicates = Vec::with_capacity(1);
let mut pk_column_indices = Vec::with_capacity(1);
let mut column_updates = Vec::with_capacity(1);
while let Some(column) = run_stmt(coro, &mut table_info_stmt).await? {
let turso_core::Value::Integer(column_id) = column.get_value(0) else {
return Err(Error::DatabaseTapeError(
"unexpected column type for pragma_table_info query".to_string(),
));
};
let turso_core::Value::Text(name) = column.get_value(1) else {
return Err(Error::DatabaseTapeError(
"unexpected column type for pragma_table_info query".to_string(),
));
};
let turso_core::Value::Integer(pk) = column.get_value(2) else {
return Err(Error::DatabaseTapeError(
"unexpected column type for pragma_table_info query".to_string(),
));
};
if *pk == 1 {
pk_predicates.push(format!("{name} = ?"));
pk_column_indices.push(*column_id as usize);
}
if columns[*column_id as usize] {
column_updates.push(format!("{name} = ?"));
}
}
let (query, pk_column_indices) = if self.opts.use_implicit_rowid {
(
format!(
"UPDATE {table_name} SET {} WHERE rowid = ?",
column_updates.join(", ")
),
None,
)
} else {
(
format!(
"UPDATE {table_name} SET {} WHERE {}",
column_updates.join(", "),
pk_predicates.join(" AND ")
),
Some(pk_column_indices),
)
};
let stmt = self.conn.prepare(&query)?;
let cached_stmt = UpdateCachedStmt {
stmt,
pk_column_indices,
};
Ok(cached_stmt)
}
}
fn parse_bin_record(bin_record: Vec<u8>) -> Result<Vec<turso_core::Value>> {
@@ -633,7 +835,9 @@ mod tests {
use tempfile::NamedTempFile;
use crate::{
database_tape::{run_stmt, DatabaseReplaySessionOpts, DatabaseTape},
database_tape::{
run_stmt, DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape,
},
types::{DatabaseTapeOperation, DatabaseTapeRowChange, DatabaseTapeRowChangeType},
};
@@ -941,4 +1145,384 @@ mod tests {
]]
);
}
#[test]
pub fn test_database_tape_replay_schema_changes() {
let temp_file1 = NamedTempFile::new().unwrap();
let db_path1 = temp_file1.path().to_str().unwrap();
let temp_file2 = NamedTempFile::new().unwrap();
let db_path2 = temp_file2.path().to_str().unwrap();
let temp_file3 = NamedTempFile::new().unwrap();
let db_path3 = temp_file3.path().to_str().unwrap();
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let db1 = turso_core::Database::open_file(io.clone(), db_path1, false, true).unwrap();
let db1 = Arc::new(DatabaseTape::new(db1));
let db2 = turso_core::Database::open_file(io.clone(), db_path2, false, true).unwrap();
let db2 = Arc::new(DatabaseTape::new(db2));
let db3 = turso_core::Database::open_file(io.clone(), db_path3, false, true).unwrap();
let db3 = Arc::new(DatabaseTape::new(db3));
let mut gen = genawaiter::sync::Gen::new({
|coro| async move {
let conn1 = db1.connect(&coro).await.unwrap();
conn1
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)")
.unwrap();
conn1
.execute("INSERT INTO t(x, y) VALUES ('a', 10)")
.unwrap();
let conn2 = db2.connect(&coro).await.unwrap();
conn2
.execute("CREATE TABLE q(x TEXT PRIMARY KEY, y)")
.unwrap();
conn2
.execute("INSERT INTO q(x, y) VALUES ('b', 20)")
.unwrap();
let conn3 = db3.connect(&coro).await.unwrap();
{
let opts = DatabaseReplaySessionOpts {
use_implicit_rowid: false,
};
let mut session = db3.start_replay_session(&coro, opts).await.unwrap();
let opts = DatabaseChangesIteratorOpts {
ignore_schema_changes: false,
..Default::default()
};
let mut iterator = db1.iterate_changes(opts.clone()).unwrap();
while let Some(operation) = iterator.next(&coro).await.unwrap() {
session.replay(&coro, operation).await.unwrap();
}
let mut iterator = db2.iterate_changes(opts.clone()).unwrap();
while let Some(operation) = iterator.next(&coro).await.unwrap() {
session.replay(&coro, operation).await.unwrap();
}
}
let mut rows = Vec::new();
let mut stmt = conn3.prepare("SELECT rowid, x, y FROM t").unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
rows,
vec![vec![
turso_core::Value::Integer(1),
turso_core::Value::Text(turso_core::types::Text::new("a")),
turso_core::Value::Integer(10),
]]
);
let mut rows = Vec::new();
let mut stmt = conn3.prepare("SELECT rowid, x, y FROM q").unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
rows,
vec![vec![
turso_core::Value::Integer(1),
turso_core::Value::Text(turso_core::types::Text::new("b")),
turso_core::Value::Integer(20),
]]
);
let mut rows = Vec::new();
let mut stmt = conn3
.prepare(
"SELECT * FROM sqlite_schema WHERE name != 'turso_cdc' AND type = 'table'",
)
.unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
rows,
vec![
vec![
turso_core::Value::Text(turso_core::types::Text::new("table")),
turso_core::Value::Text(turso_core::types::Text::new("t")),
turso_core::Value::Text(turso_core::types::Text::new("t")),
turso_core::Value::Integer(3),
turso_core::Value::Text(turso_core::types::Text::new(
"CREATE TABLE t (x TEXT PRIMARY KEY, y)"
)),
],
vec![
turso_core::Value::Text(turso_core::types::Text::new("table")),
turso_core::Value::Text(turso_core::types::Text::new("q")),
turso_core::Value::Text(turso_core::types::Text::new("q")),
turso_core::Value::Integer(5),
turso_core::Value::Text(turso_core::types::Text::new(
"CREATE TABLE q (x TEXT PRIMARY KEY, y)"
)),
]
]
);
crate::Result::Ok(())
}
});
loop {
match gen.resume_with(Ok(())) {
genawaiter::GeneratorState::Yielded(..) => io.run_once().unwrap(),
genawaiter::GeneratorState::Complete(result) => {
result.unwrap();
break;
}
}
}
}
#[test]
pub fn test_database_tape_replay_create_index() {
let temp_file1 = NamedTempFile::new().unwrap();
let db_path1 = temp_file1.path().to_str().unwrap();
let temp_file2 = NamedTempFile::new().unwrap();
let db_path2 = temp_file2.path().to_str().unwrap();
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let db1 = turso_core::Database::open_file(io.clone(), db_path1, false, true).unwrap();
let db1 = Arc::new(DatabaseTape::new(db1));
let db2 = turso_core::Database::open_file(io.clone(), db_path2, false, true).unwrap();
let db2 = Arc::new(DatabaseTape::new(db2));
let mut gen = genawaiter::sync::Gen::new({
|coro| async move {
let conn1 = db1.connect(&coro).await.unwrap();
conn1
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)")
.unwrap();
conn1.execute("CREATE INDEX t_idx ON t(y)").unwrap();
let conn2 = db2.connect(&coro).await.unwrap();
{
let opts = DatabaseReplaySessionOpts {
use_implicit_rowid: false,
};
let mut session = db2.start_replay_session(&coro, opts).await.unwrap();
let opts = DatabaseChangesIteratorOpts {
ignore_schema_changes: false,
..Default::default()
};
let mut iterator = db1.iterate_changes(opts.clone()).unwrap();
while let Some(operation) = iterator.next(&coro).await.unwrap() {
session.replay(&coro, operation).await.unwrap();
}
}
let mut rows = Vec::new();
let mut stmt = conn2
.prepare("SELECT * FROM sqlite_schema WHERE name IN ('t', 't_idx')")
.unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
rows,
vec![
vec![
turso_core::Value::Text(turso_core::types::Text::new("table")),
turso_core::Value::Text(turso_core::types::Text::new("t")),
turso_core::Value::Text(turso_core::types::Text::new("t")),
turso_core::Value::Integer(3),
turso_core::Value::Text(turso_core::types::Text::new(
"CREATE TABLE t (x TEXT PRIMARY KEY, y)"
)),
],
vec![
turso_core::Value::Text(turso_core::types::Text::new("index")),
turso_core::Value::Text(turso_core::types::Text::new("t_idx")),
turso_core::Value::Text(turso_core::types::Text::new("t")),
turso_core::Value::Integer(5),
turso_core::Value::Text(turso_core::types::Text::new(
"CREATE INDEX t_idx ON t (y)"
)),
]
]
);
crate::Result::Ok(())
}
});
loop {
match gen.resume_with(Ok(())) {
genawaiter::GeneratorState::Yielded(..) => io.run_once().unwrap(),
genawaiter::GeneratorState::Complete(result) => {
result.unwrap();
break;
}
}
}
}
#[test]
pub fn test_database_tape_replay_alter_table() {
let temp_file1 = NamedTempFile::new().unwrap();
let db_path1 = temp_file1.path().to_str().unwrap();
let temp_file2 = NamedTempFile::new().unwrap();
let db_path2 = temp_file2.path().to_str().unwrap();
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let db1 = turso_core::Database::open_file(io.clone(), db_path1, false, true).unwrap();
let db1 = Arc::new(DatabaseTape::new(db1));
let db2 = turso_core::Database::open_file(io.clone(), db_path2, false, true).unwrap();
let db2 = Arc::new(DatabaseTape::new(db2));
let mut gen = genawaiter::sync::Gen::new({
|coro| async move {
let conn1 = db1.connect(&coro).await.unwrap();
conn1
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)")
.unwrap();
conn1.execute("ALTER TABLE t ADD COLUMN z").unwrap();
conn1.execute("ALTER TABLE t DROP COLUMN y").unwrap();
let conn2 = db2.connect(&coro).await.unwrap();
{
let opts = DatabaseReplaySessionOpts {
use_implicit_rowid: false,
};
let mut session = db2.start_replay_session(&coro, opts).await.unwrap();
let opts = DatabaseChangesIteratorOpts {
ignore_schema_changes: false,
..Default::default()
};
let mut iterator = db1.iterate_changes(opts.clone()).unwrap();
while let Some(operation) = iterator.next(&coro).await.unwrap() {
session.replay(&coro, operation).await.unwrap();
}
}
let mut rows = Vec::new();
let mut stmt = conn2
.prepare("SELECT * FROM sqlite_schema WHERE name IN ('t')")
.unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
rows,
vec![vec![
turso_core::Value::Text(turso_core::types::Text::new("table")),
turso_core::Value::Text(turso_core::types::Text::new("t")),
turso_core::Value::Text(turso_core::types::Text::new("t")),
turso_core::Value::Integer(3),
turso_core::Value::Text(turso_core::types::Text::new(
"CREATE TABLE t (x TEXT PRIMARY KEY, z)"
)),
]]
);
crate::Result::Ok(())
}
});
loop {
match gen.resume_with(Ok(())) {
genawaiter::GeneratorState::Yielded(..) => io.run_once().unwrap(),
genawaiter::GeneratorState::Complete(result) => {
result.unwrap();
break;
}
}
}
}
#[test]
pub fn test_database_tape_replay_non_overlapping_updates() {
let temp_file1 = NamedTempFile::new().unwrap();
let db_path1 = temp_file1.path().to_str().unwrap();
let temp_file2 = NamedTempFile::new().unwrap();
let db_path2 = temp_file2.path().to_str().unwrap();
let temp_file3 = NamedTempFile::new().unwrap();
let db_path3 = temp_file3.path().to_str().unwrap();
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let db1 = turso_core::Database::open_file(io.clone(), db_path1, false, true).unwrap();
let db1 = Arc::new(DatabaseTape::new(db1));
let db2 = turso_core::Database::open_file(io.clone(), db_path2, false, true).unwrap();
let db2 = Arc::new(DatabaseTape::new(db2));
let db3 = turso_core::Database::open_file(io.clone(), db_path3, false, true).unwrap();
let db3 = Arc::new(DatabaseTape::new(db3));
let mut gen = genawaiter::sync::Gen::new({
|coro| async move {
let conn1 = db1.connect(&coro).await.unwrap();
conn1
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y, z)")
.unwrap();
conn1
.execute("INSERT INTO t VALUES ('turso', 1, 2)")
.unwrap();
conn1
.execute("UPDATE t SET y = 10 WHERE x = 'turso'")
.unwrap();
let conn2 = db2.connect_untracked().unwrap();
conn2
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y, z)")
.unwrap();
conn2
.execute("INSERT INTO t VALUES ('turso', 1, 2)")
.unwrap();
let conn2 = db2.connect(&coro).await.unwrap();
conn2
.execute("UPDATE t SET z = 20 WHERE x = 'turso'")
.unwrap();
let conn3 = db3.connect(&coro).await.unwrap();
{
let opts = DatabaseReplaySessionOpts {
use_implicit_rowid: false,
};
let mut session = db3.start_replay_session(&coro, opts).await.unwrap();
let opts = DatabaseChangesIteratorOpts {
ignore_schema_changes: false,
..Default::default()
};
let mut iterator = db1.iterate_changes(opts.clone()).unwrap();
while let Some(operation) = iterator.next(&coro).await.unwrap() {
session.replay(&coro, operation).await.unwrap();
}
let mut iterator = db2.iterate_changes(opts.clone()).unwrap();
while let Some(operation) = iterator.next(&coro).await.unwrap() {
session.replay(&coro, operation).await.unwrap();
}
}
let mut rows = Vec::new();
let mut stmt = conn3.prepare("SELECT * FROM t").unwrap();
while let Some(row) = run_stmt(&coro, &mut stmt).await.unwrap() {
rows.push(row.get_values().cloned().collect::<Vec<_>>());
}
assert_eq!(
rows,
vec![vec![
turso_core::Value::Text(turso_core::types::Text::new("turso")),
turso_core::Value::Integer(10),
turso_core::Value::Integer(20),
]]
);
crate::Result::Ok(())
}
});
loop {
match gen.resume_with(Ok(())) {
genawaiter::GeneratorState::Yielded(..) => io.run_once().unwrap(),
genawaiter::GeneratorState::Complete(result) => {
result.unwrap();
break;
}
}
}
}
}

View File

@@ -66,6 +66,8 @@ pub struct DatabaseChange {
pub before: Option<Vec<u8>>,
/// Binary record of the row after the change, if CDC pragma set to either 'after' or 'full'
pub after: Option<Vec<u8>>,
/// Binary record from "updates" column, if CDC pragma set to 'full'
pub updates: Option<Vec<u8>>,
}
impl DatabaseChange {
@@ -86,6 +88,7 @@ impl DatabaseChange {
after: self.after.ok_or_else(|| {
Error::DatabaseTapeError("cdc_mode must be set to 'full'".to_string())
})?,
updates: self.updates,
},
DatabaseChangeType::Insert => DatabaseTapeRowChangeType::Insert {
after: self.after.ok_or_else(|| {
@@ -122,6 +125,7 @@ impl DatabaseChange {
"cdc_mode must be set to either 'full' or 'before'".to_string(),
)
})?,
updates: None,
},
DatabaseChangeType::Insert => DatabaseTapeRowChangeType::Delete {
before: self.after.ok_or_else(|| {
@@ -166,6 +170,7 @@ impl TryFrom<&turso_core::Row> for DatabaseChange {
let id = get_core_value_i64(row, 4)?;
let before = get_core_value_blob_or_null(row, 5)?;
let after = get_core_value_blob_or_null(row, 6)?;
let updates = get_core_value_blob_or_null(row, 7)?;
let change_type = match change_type {
-1 => DatabaseChangeType::Delete,
@@ -185,14 +190,23 @@ impl TryFrom<&turso_core::Row> for DatabaseChange {
id,
before,
after,
updates,
})
}
}
pub enum DatabaseTapeRowChangeType {
Delete { before: Vec<u8> },
Update { before: Vec<u8>, after: Vec<u8> },
Insert { after: Vec<u8> },
Delete {
before: Vec<u8>,
},
Update {
before: Vec<u8>,
after: Vec<u8>,
updates: Option<Vec<u8>>,
},
Insert {
after: Vec<u8>,
},
}
/// [DatabaseTapeOperation] extends [DatabaseTapeRowChange] by adding information about transaction boundary
@@ -222,10 +236,15 @@ impl std::fmt::Debug for DatabaseTapeRowChangeType {
.debug_struct("Delete")
.field("before.len()", &before.len())
.finish(),
Self::Update { before, after } => f
Self::Update {
before,
after,
updates,
} => f
.debug_struct("Update")
.field("before.len()", &before.len())
.field("after.len()", &after.len())
.field("updates.len()", &updates.as_ref().map(|x| x.len()))
.finish(),
Self::Insert { after } => f
.debug_struct("Insert")