mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-10 19:54:24 +01:00
use DatabaseReplayGenerator in the DatabaseTape
This commit is contained in:
@@ -15,7 +15,7 @@ use crate::{
|
||||
Result,
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DatabaseSyncEngineOpts {
|
||||
pub client_name: String,
|
||||
pub wal_pull_batch_size: u64,
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::{
|
||||
use turso_core::{types::WalFrameInfo, StepResult};
|
||||
|
||||
use crate::{
|
||||
database_replay_generator::{DatabaseReplayGenerator, ReplayInfo},
|
||||
database_sync_operations::WAL_FRAME_HEADER,
|
||||
errors::Error,
|
||||
types::{
|
||||
@@ -170,12 +171,12 @@ impl DatabaseTape {
|
||||
let conn = self.connect(coro).await?;
|
||||
conn.execute("BEGIN IMMEDIATE")?;
|
||||
Ok(DatabaseReplaySession {
|
||||
conn,
|
||||
conn: conn.clone(),
|
||||
cached_delete_stmt: HashMap::new(),
|
||||
cached_insert_stmt: HashMap::new(),
|
||||
cached_update_stmt: HashMap::new(),
|
||||
in_txn: true,
|
||||
opts,
|
||||
generator: DatabaseReplayGenerator { conn, opts },
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -414,23 +415,31 @@ pub struct DatabaseReplaySessionOpts {
|
||||
pub use_implicit_rowid: bool,
|
||||
}
|
||||
|
||||
struct DeleteCachedStmt {
|
||||
struct CachedStmt {
|
||||
stmt: turso_core::Statement,
|
||||
pk_column_indices: Option<Vec<usize>>, // if None - use rowid instead
|
||||
}
|
||||
|
||||
struct UpdateCachedStmt {
|
||||
stmt: turso_core::Statement,
|
||||
pk_column_indices: Option<Vec<usize>>, // if None - use rowid instead
|
||||
info: ReplayInfo,
|
||||
}
|
||||
|
||||
pub struct DatabaseReplaySession {
|
||||
conn: Arc<turso_core::Connection>,
|
||||
cached_delete_stmt: HashMap<String, DeleteCachedStmt>,
|
||||
cached_insert_stmt: HashMap<(String, usize), turso_core::Statement>,
|
||||
cached_update_stmt: HashMap<(String, Vec<bool>), UpdateCachedStmt>,
|
||||
cached_delete_stmt: HashMap<String, CachedStmt>,
|
||||
cached_insert_stmt: HashMap<(String, usize), CachedStmt>,
|
||||
cached_update_stmt: HashMap<(String, Vec<bool>), CachedStmt>,
|
||||
in_txn: bool,
|
||||
opts: DatabaseReplaySessionOpts,
|
||||
generator: DatabaseReplayGenerator,
|
||||
}
|
||||
|
||||
async fn replay_stmt(
|
||||
coro: &Coro,
|
||||
cached: &mut CachedStmt,
|
||||
values: Vec<turso_core::Value>,
|
||||
) -> Result<()> {
|
||||
cached.stmt.reset();
|
||||
for (i, value) in values.into_iter().enumerate() {
|
||||
cached.stmt.bind_at((i + 1).try_into().unwrap(), value);
|
||||
}
|
||||
exec_stmt(coro, &mut cached.stmt).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl DatabaseReplaySession {
|
||||
@@ -452,107 +461,116 @@ impl DatabaseReplaySession {
|
||||
self.conn.execute("BEGIN IMMEDIATE")?;
|
||||
self.in_txn = true;
|
||||
}
|
||||
tracing::trace!("replay: change={:?}", change);
|
||||
let table_name = &change.table_name;
|
||||
let table = &change.table_name;
|
||||
let change_type = (&change.change).into();
|
||||
|
||||
if table_name == SQLITE_SCHEMA_TABLE {
|
||||
// sqlite_schema table: type, name, tbl_name, rootpage, sql
|
||||
match change.change {
|
||||
DatabaseTapeRowChangeType::Delete { before } => {
|
||||
let before = parse_bin_record(before)?;
|
||||
assert!(before.len() == 5);
|
||||
let Some(turso_core::Value::Text(entity_type)) = before.first() else {
|
||||
panic!(
|
||||
"unexpected 'type' column of sqlite_schema table: {:?}",
|
||||
before.first()
|
||||
);
|
||||
};
|
||||
let Some(turso_core::Value::Text(entity_name)) = before.get(1) else {
|
||||
panic!(
|
||||
"unexpected 'name' column of sqlite_schema table: {:?}",
|
||||
before.get(1)
|
||||
);
|
||||
};
|
||||
self.conn.execute(format!(
|
||||
"DROP {} {}",
|
||||
entity_type.as_str(),
|
||||
entity_name.as_str()
|
||||
))?;
|
||||
}
|
||||
DatabaseTapeRowChangeType::Insert { after } => {
|
||||
let after = parse_bin_record(after)?;
|
||||
assert!(after.len() == 5);
|
||||
let Some(turso_core::Value::Text(sql)) = after.last() else {
|
||||
return Err(Error::DatabaseTapeError(format!(
|
||||
"unexpected 'sql' column of sqlite_schema table: {:?}",
|
||||
after.last()
|
||||
)));
|
||||
};
|
||||
self.conn.execute(sql.as_str())?;
|
||||
}
|
||||
DatabaseTapeRowChangeType::Update { updates, .. } => {
|
||||
let Some(updates) = updates else {
|
||||
return Err(Error::DatabaseTapeError(
|
||||
"'updates' column of CDC table must be populated".to_string(),
|
||||
));
|
||||
};
|
||||
let updates = parse_bin_record(updates)?;
|
||||
assert!(updates.len() % 2 == 0);
|
||||
assert!(updates.len() / 2 == 5);
|
||||
let turso_core::Value::Text(ddl_stmt) = updates.last().unwrap() else {
|
||||
panic!(
|
||||
"unexpected 'sql' column of sqlite_schema table update record: {:?}",
|
||||
updates.last()
|
||||
);
|
||||
};
|
||||
self.conn.execute(ddl_stmt.as_str())?;
|
||||
}
|
||||
if table == SQLITE_SCHEMA_TABLE {
|
||||
let replay_info = self.generator.replay_info(coro, &change).await?;
|
||||
for replay in &replay_info {
|
||||
self.conn.execute(replay.query.as_str())?;
|
||||
}
|
||||
} else {
|
||||
match change.change {
|
||||
DatabaseTapeRowChangeType::Delete { before } => {
|
||||
let before = parse_bin_record(before)?;
|
||||
self.replay_delete(coro, table_name, change.id, before)
|
||||
.await?
|
||||
let key = self.populate_delete_stmt(coro, table).await?;
|
||||
tracing::trace!(
|
||||
"ready to use prepared delete statement for replay: key={}",
|
||||
key
|
||||
);
|
||||
let cached = self.cached_delete_stmt.get_mut(key).unwrap();
|
||||
cached.stmt.reset();
|
||||
let values = self.generator.replay_values(
|
||||
&cached.info,
|
||||
change_type,
|
||||
change.id,
|
||||
before,
|
||||
None,
|
||||
);
|
||||
replay_stmt(coro, cached, values).await?;
|
||||
}
|
||||
DatabaseTapeRowChangeType::Insert { after } => {
|
||||
let key = self.populate_insert_stmt(coro, table, after.len()).await?;
|
||||
tracing::trace!(
|
||||
"ready to use prepared insert statement for replay: key={:?}",
|
||||
key
|
||||
);
|
||||
let cached = self.cached_insert_stmt.get_mut(&key).unwrap();
|
||||
cached.stmt.reset();
|
||||
let values = self.generator.replay_values(
|
||||
&cached.info,
|
||||
change_type,
|
||||
change.id,
|
||||
after,
|
||||
None,
|
||||
);
|
||||
replay_stmt(coro, cached, values).await?;
|
||||
}
|
||||
DatabaseTapeRowChangeType::Update {
|
||||
after,
|
||||
updates: Some(updates),
|
||||
..
|
||||
} => {
|
||||
assert!(updates.len() % 2 == 0);
|
||||
let columns_cnt = updates.len() / 2;
|
||||
let mut columns = Vec::with_capacity(columns_cnt);
|
||||
for value in updates.iter().take(columns_cnt) {
|
||||
columns.push(match value {
|
||||
turso_core::Value::Integer(x @ (1 | 0)) => *x > 0,
|
||||
_ => panic!("unexpected 'changes' binary record first-half component: {value:?}")
|
||||
});
|
||||
}
|
||||
let key = self.populate_update_stmt(coro, table, &columns).await?;
|
||||
tracing::trace!(
|
||||
"ready to use prepared update statement for replay: key={:?}",
|
||||
key
|
||||
);
|
||||
let cached = self.cached_update_stmt.get_mut(&key).unwrap();
|
||||
cached.stmt.reset();
|
||||
let values = self.generator.replay_values(
|
||||
&cached.info,
|
||||
change_type,
|
||||
change.id,
|
||||
after,
|
||||
Some(updates),
|
||||
);
|
||||
replay_stmt(coro, cached, values).await?;
|
||||
}
|
||||
DatabaseTapeRowChangeType::Update {
|
||||
before,
|
||||
after,
|
||||
updates,
|
||||
updates: None,
|
||||
} => {
|
||||
let after = parse_bin_record(after)?;
|
||||
if let Some(updates) = updates {
|
||||
let updates = parse_bin_record(updates)?;
|
||||
assert!(updates.len() % 2 == 0);
|
||||
let columns_cnt = updates.len() / 2;
|
||||
let mut columns = Vec::with_capacity(columns_cnt);
|
||||
let mut values = Vec::with_capacity(columns_cnt);
|
||||
for (i, value) in updates.into_iter().enumerate() {
|
||||
if i < columns_cnt {
|
||||
columns.push(match value {
|
||||
turso_core::Value::Integer(x @ (1 | 0)) => x > 0,
|
||||
_ => panic!("unexpected 'changes' binary record first-half component: {value:?}")
|
||||
})
|
||||
} else {
|
||||
values.push(value);
|
||||
}
|
||||
}
|
||||
self.replay_update(
|
||||
coro, table_name, change.id, columns, after, values,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
let before = parse_bin_record(before)?;
|
||||
self.replay_delete(coro, table_name, change.id, before)
|
||||
.await?;
|
||||
self.replay_insert(coro, table_name, change.id, after)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
DatabaseTapeRowChangeType::Insert { after } => {
|
||||
let values = parse_bin_record(after)?;
|
||||
self.replay_insert(coro, table_name, change.id, values)
|
||||
.await?;
|
||||
let key = self.populate_delete_stmt(coro, table).await?;
|
||||
tracing::trace!(
|
||||
"ready to use prepared delete statement for replay of update: key={:?}",
|
||||
key
|
||||
);
|
||||
let cached = self.cached_delete_stmt.get_mut(key).unwrap();
|
||||
cached.stmt.reset();
|
||||
let values = self.generator.replay_values(
|
||||
&cached.info,
|
||||
change_type,
|
||||
change.id,
|
||||
before,
|
||||
None,
|
||||
);
|
||||
replay_stmt(coro, cached, values).await?;
|
||||
|
||||
let key = self.populate_insert_stmt(coro, table, after.len()).await?;
|
||||
tracing::trace!(
|
||||
"ready to use prepared insert statement for replay of update: key={:?}",
|
||||
key
|
||||
);
|
||||
let cached = self.cached_insert_stmt.get_mut(&key).unwrap();
|
||||
cached.stmt.reset();
|
||||
let values = self.generator.replay_values(
|
||||
&cached.info,
|
||||
change_type,
|
||||
change.id,
|
||||
after,
|
||||
None,
|
||||
);
|
||||
replay_stmt(coro, cached, values).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -560,289 +578,55 @@ impl DatabaseReplaySession {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn replay_delete(
|
||||
&mut self,
|
||||
coro: &Coro,
|
||||
table_name: &str,
|
||||
id: i64,
|
||||
mut values: Vec<turso_core::Value>,
|
||||
) -> Result<()> {
|
||||
let cached = self.cached_delete_stmt(coro, table_name).await?;
|
||||
if let Some(pk_column_indices) = &cached.pk_column_indices {
|
||||
for (i, pk_column) in pk_column_indices.iter().enumerate() {
|
||||
let value = std::mem::replace(&mut values[*pk_column], turso_core::Value::Null);
|
||||
cached.stmt.bind_at((i + 1).try_into().unwrap(), value);
|
||||
}
|
||||
} else {
|
||||
let value = turso_core::Value::Integer(id);
|
||||
cached.stmt.bind_at(1.try_into().unwrap(), value);
|
||||
async fn populate_delete_stmt<'a>(&mut self, coro: &Coro, table: &'a str) -> Result<&'a str> {
|
||||
if self.cached_delete_stmt.contains_key(table) {
|
||||
return Ok(table);
|
||||
}
|
||||
exec_stmt(coro, &mut cached.stmt).await?;
|
||||
Ok(())
|
||||
tracing::trace!("prepare delete statement for replay: table={}", table);
|
||||
let info = self.generator.delete_query(coro, table).await?;
|
||||
let stmt = self.conn.prepare(&info.query)?;
|
||||
self.cached_delete_stmt
|
||||
.insert(table.to_string(), CachedStmt { stmt, info });
|
||||
Ok(table)
|
||||
}
|
||||
async fn replay_insert(
|
||||
async fn populate_insert_stmt(
|
||||
&mut self,
|
||||
coro: &Coro,
|
||||
table_name: &str,
|
||||
id: i64,
|
||||
values: Vec<turso_core::Value>,
|
||||
) -> Result<()> {
|
||||
let columns = values.len();
|
||||
let use_implicit_rowid = self.opts.use_implicit_rowid;
|
||||
let stmt = self.cached_insert_stmt(coro, table_name, columns).await?;
|
||||
stmt.reset();
|
||||
|
||||
for (i, value) in values.into_iter().enumerate() {
|
||||
stmt.bind_at((i + 1).try_into().unwrap(), value);
|
||||
}
|
||||
if use_implicit_rowid {
|
||||
stmt.bind_at(
|
||||
(columns + 1).try_into().unwrap(),
|
||||
turso_core::Value::Integer(id),
|
||||
);
|
||||
}
|
||||
exec_stmt(coro, stmt).await?;
|
||||
Ok(())
|
||||
}
|
||||
async fn replay_update(
|
||||
&mut self,
|
||||
coro: &Coro,
|
||||
table_name: &str,
|
||||
id: i64,
|
||||
columns: Vec<bool>,
|
||||
mut full: Vec<turso_core::Value>,
|
||||
mut updates: Vec<turso_core::Value>,
|
||||
) -> Result<()> {
|
||||
let cached = self.cached_update_stmt(coro, table_name, &columns).await?;
|
||||
let mut position: usize = 1;
|
||||
for (i, updated) in columns.into_iter().enumerate() {
|
||||
if !updated {
|
||||
continue;
|
||||
}
|
||||
let value = std::mem::replace(&mut updates[i], turso_core::Value::Null);
|
||||
cached.stmt.bind_at(position.try_into().unwrap(), value);
|
||||
position += 1;
|
||||
}
|
||||
if let Some(pk_column_indices) = &cached.pk_column_indices {
|
||||
for pk_column in pk_column_indices {
|
||||
let value = std::mem::replace(&mut full[*pk_column], turso_core::Value::Null);
|
||||
cached.stmt.bind_at(position.try_into().unwrap(), value);
|
||||
position += 1
|
||||
}
|
||||
} else {
|
||||
let value = turso_core::Value::Integer(id);
|
||||
cached.stmt.bind_at(position.try_into().unwrap(), value);
|
||||
}
|
||||
exec_stmt(coro, &mut cached.stmt).await?;
|
||||
Ok(())
|
||||
}
|
||||
async fn cached_delete_stmt(
|
||||
&mut self,
|
||||
coro: &Coro,
|
||||
table_name: &str,
|
||||
) -> Result<&mut DeleteCachedStmt> {
|
||||
if !self.cached_delete_stmt.contains_key(table_name) {
|
||||
tracing::trace!("prepare delete statement for replay: table={}", table_name);
|
||||
let stmt = self.delete_query(coro, table_name).await?;
|
||||
self.cached_delete_stmt.insert(table_name.to_string(), stmt);
|
||||
}
|
||||
tracing::trace!(
|
||||
"ready to use prepared delete statement for replay: table={}",
|
||||
table_name
|
||||
);
|
||||
let cached = self.cached_delete_stmt.get_mut(table_name).unwrap();
|
||||
cached.stmt.reset();
|
||||
Ok(cached)
|
||||
}
|
||||
async fn cached_insert_stmt(
|
||||
&mut self,
|
||||
coro: &Coro,
|
||||
table_name: &str,
|
||||
table: &str,
|
||||
columns: usize,
|
||||
) -> Result<&mut turso_core::Statement> {
|
||||
let key = (table_name.to_string(), columns);
|
||||
if !self.cached_insert_stmt.contains_key(&key) {
|
||||
tracing::trace!(
|
||||
"prepare insert statement for replay: table={}, columns={}",
|
||||
table_name,
|
||||
columns
|
||||
);
|
||||
let stmt = self.insert_query(coro, table_name, columns).await?;
|
||||
self.cached_insert_stmt.insert(key.clone(), stmt);
|
||||
) -> Result<(String, usize)> {
|
||||
let key = (table.to_string(), columns);
|
||||
if self.cached_insert_stmt.contains_key(&key) {
|
||||
return Ok(key);
|
||||
}
|
||||
tracing::trace!(
|
||||
"ready to use prepared insert statement for replay: table={}, columns={}",
|
||||
table_name,
|
||||
"prepare insert statement for replay: table={}, columns={}",
|
||||
table,
|
||||
columns
|
||||
);
|
||||
let stmt = self.cached_insert_stmt.get_mut(&key).unwrap();
|
||||
stmt.reset();
|
||||
Ok(stmt)
|
||||
let info = self.generator.insert_query(coro, table, columns).await?;
|
||||
let stmt = self.conn.prepare(&info.query)?;
|
||||
self.cached_insert_stmt
|
||||
.insert(key.clone(), CachedStmt { stmt, info });
|
||||
Ok(key)
|
||||
}
|
||||
async fn cached_update_stmt(
|
||||
async fn populate_update_stmt(
|
||||
&mut self,
|
||||
coro: &Coro,
|
||||
table_name: &str,
|
||||
table: &str,
|
||||
columns: &[bool],
|
||||
) -> Result<&mut UpdateCachedStmt> {
|
||||
let key = (table_name.to_string(), columns.to_owned());
|
||||
if !self.cached_update_stmt.contains_key(&key) {
|
||||
tracing::trace!("prepare update statement for replay: table={}", table_name);
|
||||
let stmt = self.update_query(coro, table_name, columns).await?;
|
||||
self.cached_update_stmt.insert(key.clone(), stmt);
|
||||
) -> Result<(String, Vec<bool>)> {
|
||||
let key = (table.to_string(), columns.to_owned());
|
||||
if self.cached_update_stmt.contains_key(&key) {
|
||||
return Ok(key);
|
||||
}
|
||||
tracing::trace!(
|
||||
"ready to use prepared update statement for replay: table={}",
|
||||
table_name
|
||||
);
|
||||
let cached = self.cached_update_stmt.get_mut(&key).unwrap();
|
||||
cached.stmt.reset();
|
||||
Ok(cached)
|
||||
tracing::trace!("prepare update statement for replay: table={}", table);
|
||||
let info = self.generator.update_query(coro, table, columns).await?;
|
||||
let stmt = self.conn.prepare(&info.query)?;
|
||||
self.cached_update_stmt
|
||||
.insert(key.clone(), CachedStmt { stmt, info });
|
||||
Ok(key)
|
||||
}
|
||||
async fn insert_query(
|
||||
&self,
|
||||
coro: &Coro,
|
||||
table_name: &str,
|
||||
columns: usize,
|
||||
) -> Result<turso_core::Statement> {
|
||||
let query = if !self.opts.use_implicit_rowid {
|
||||
let placeholders = ["?"].repeat(columns).join(",");
|
||||
format!("INSERT INTO {table_name} VALUES ({placeholders})")
|
||||
} else {
|
||||
let mut table_info_stmt = self.conn.prepare(format!(
|
||||
"SELECT name FROM pragma_table_info('{table_name}')"
|
||||
))?;
|
||||
let mut column_names = Vec::with_capacity(columns + 1);
|
||||
while let Some(column) = run_stmt_once(coro, &mut table_info_stmt).await? {
|
||||
let turso_core::Value::Text(text) = column.get_value(0) else {
|
||||
return Err(Error::DatabaseTapeError(
|
||||
"unexpected column type for pragma_table_info query".to_string(),
|
||||
));
|
||||
};
|
||||
column_names.push(text.to_string());
|
||||
}
|
||||
column_names.push("rowid".to_string());
|
||||
|
||||
let placeholders = ["?"].repeat(columns + 1).join(",");
|
||||
let column_names = column_names.join(", ");
|
||||
format!("INSERT INTO {table_name}({column_names}) VALUES ({placeholders})")
|
||||
};
|
||||
Ok(self.conn.prepare(&query)?)
|
||||
}
|
||||
async fn delete_query(&self, coro: &Coro, table_name: &str) -> Result<DeleteCachedStmt> {
|
||||
let (query, pk_column_indices) = if self.opts.use_implicit_rowid {
|
||||
(format!("DELETE FROM {table_name} WHERE rowid = ?"), None)
|
||||
} else {
|
||||
let mut pk_info_stmt = self.conn.prepare(format!(
|
||||
"SELECT cid, name FROM pragma_table_info('{table_name}') WHERE pk = 1"
|
||||
))?;
|
||||
let mut pk_predicates = Vec::with_capacity(1);
|
||||
let mut pk_column_indices = Vec::with_capacity(1);
|
||||
while let Some(column) = run_stmt_once(coro, &mut pk_info_stmt).await? {
|
||||
let turso_core::Value::Integer(column_id) = column.get_value(0) else {
|
||||
return Err(Error::DatabaseTapeError(
|
||||
"unexpected column type for pragma_table_info query".to_string(),
|
||||
));
|
||||
};
|
||||
let turso_core::Value::Text(name) = column.get_value(1) else {
|
||||
return Err(Error::DatabaseTapeError(
|
||||
"unexpected column type for pragma_table_info query".to_string(),
|
||||
));
|
||||
};
|
||||
pk_predicates.push(format!("{name} = ?"));
|
||||
pk_column_indices.push(*column_id as usize);
|
||||
}
|
||||
|
||||
if pk_column_indices.is_empty() {
|
||||
(format!("DELETE FROM {table_name} WHERE rowid = ?"), None)
|
||||
} else {
|
||||
let pk_predicates = pk_predicates.join(" AND ");
|
||||
let query = format!("DELETE FROM {table_name} WHERE {pk_predicates}");
|
||||
(query, Some(pk_column_indices))
|
||||
}
|
||||
};
|
||||
let use_implicit_rowid = self.opts.use_implicit_rowid;
|
||||
tracing::trace!("delete_query: table_name={table_name}, query={query}, use_implicit_rowid={use_implicit_rowid}");
|
||||
let stmt = self.conn.prepare(&query)?;
|
||||
Ok(DeleteCachedStmt {
|
||||
stmt,
|
||||
pk_column_indices,
|
||||
})
|
||||
}
|
||||
async fn update_query(
|
||||
&self,
|
||||
coro: &Coro,
|
||||
table_name: &str,
|
||||
columns: &[bool],
|
||||
) -> Result<UpdateCachedStmt> {
|
||||
let mut table_info_stmt = self.conn.prepare(format!(
|
||||
"SELECT cid, name, pk FROM pragma_table_info('{table_name}')"
|
||||
))?;
|
||||
let mut pk_predicates = Vec::with_capacity(1);
|
||||
let mut pk_column_indices = Vec::with_capacity(1);
|
||||
let mut column_updates = Vec::with_capacity(1);
|
||||
while let Some(column) = run_stmt_once(coro, &mut table_info_stmt).await? {
|
||||
let turso_core::Value::Integer(column_id) = column.get_value(0) else {
|
||||
return Err(Error::DatabaseTapeError(
|
||||
"unexpected column type for pragma_table_info query".to_string(),
|
||||
));
|
||||
};
|
||||
let turso_core::Value::Text(name) = column.get_value(1) else {
|
||||
return Err(Error::DatabaseTapeError(
|
||||
"unexpected column type for pragma_table_info query".to_string(),
|
||||
));
|
||||
};
|
||||
let turso_core::Value::Integer(pk) = column.get_value(2) else {
|
||||
return Err(Error::DatabaseTapeError(
|
||||
"unexpected column type for pragma_table_info query".to_string(),
|
||||
));
|
||||
};
|
||||
if *pk == 1 {
|
||||
pk_predicates.push(format!("{name} = ?"));
|
||||
pk_column_indices.push(*column_id as usize);
|
||||
}
|
||||
if columns[*column_id as usize] {
|
||||
column_updates.push(format!("{name} = ?"));
|
||||
}
|
||||
}
|
||||
|
||||
let (query, pk_column_indices) = if self.opts.use_implicit_rowid {
|
||||
(
|
||||
format!(
|
||||
"UPDATE {table_name} SET {} WHERE rowid = ?",
|
||||
column_updates.join(", ")
|
||||
),
|
||||
None,
|
||||
)
|
||||
} else {
|
||||
(
|
||||
format!(
|
||||
"UPDATE {table_name} SET {} WHERE {}",
|
||||
column_updates.join(", "),
|
||||
pk_predicates.join(" AND ")
|
||||
),
|
||||
Some(pk_column_indices),
|
||||
)
|
||||
};
|
||||
let stmt = self.conn.prepare(&query)?;
|
||||
let cached_stmt = UpdateCachedStmt {
|
||||
stmt,
|
||||
pk_column_indices,
|
||||
};
|
||||
Ok(cached_stmt)
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_bin_record(bin_record: Vec<u8>) -> Result<Vec<turso_core::Value>> {
|
||||
let record = turso_core::types::ImmutableRecord::from_bin_record(bin_record);
|
||||
let mut cursor = turso_core::types::RecordCursor::new();
|
||||
let columns = cursor.count(&record);
|
||||
let mut values = Vec::with_capacity(columns);
|
||||
for i in 0..columns {
|
||||
let value = cursor.get_value(&record, i)?;
|
||||
values.push(value.to_owned());
|
||||
}
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
pub mod database_replay_generator;
|
||||
pub mod database_sync_engine;
|
||||
pub mod database_sync_operations;
|
||||
pub mod database_tape;
|
||||
|
||||
Reference in New Issue
Block a user