mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-30 13:24:22 +01:00
apply create DDL operation with IF NOT EXISTS clause in order to make them idempotent
This commit is contained in:
@@ -1,5 +1,7 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use turso_parser::parser::Parser;
|
||||
|
||||
use crate::{
|
||||
database_tape::{run_stmt_once, DatabaseReplaySessionOpts},
|
||||
errors::Error,
|
||||
@@ -211,9 +213,43 @@ impl DatabaseReplayGenerator {
|
||||
after.last()
|
||||
)));
|
||||
};
|
||||
let mut parser = Parser::new(sql.as_str().as_bytes());
|
||||
let mut ast = parser
|
||||
.next()
|
||||
.ok_or_else(|| {
|
||||
Error::DatabaseTapeError(format!(
|
||||
"unexpected DDL query: {}",
|
||||
sql.as_str()
|
||||
))
|
||||
})?
|
||||
.map_err(|e| {
|
||||
Error::DatabaseTapeError(format!(
|
||||
"unexpected DDL query {}: {}",
|
||||
e,
|
||||
sql.as_str()
|
||||
))
|
||||
})?;
|
||||
let turso_parser::ast::Cmd::Stmt(stmt) = &mut ast else {
|
||||
return Err(Error::DatabaseTapeError(format!(
|
||||
"unexpected DDL query: {}",
|
||||
sql.as_str()
|
||||
)));
|
||||
};
|
||||
match stmt {
|
||||
turso_parser::ast::Stmt::CreateTable { if_not_exists, .. }
|
||||
| turso_parser::ast::Stmt::CreateIndex { if_not_exists, .. }
|
||||
| turso_parser::ast::Stmt::CreateTrigger { if_not_exists, .. }
|
||||
| turso_parser::ast::Stmt::CreateMaterializedView {
|
||||
if_not_exists, ..
|
||||
}
|
||||
| turso_parser::ast::Stmt::CreateView { if_not_exists, .. } => {
|
||||
*if_not_exists = true
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
let insert = ReplayInfo {
|
||||
change_type: DatabaseChangeType::Insert,
|
||||
query: sql.as_str().to_string(),
|
||||
query: ast.to_string(),
|
||||
pk_column_indices: None,
|
||||
column_names: Vec::new(),
|
||||
is_ddl_replay: true,
|
||||
|
||||
@@ -1387,4 +1387,91 @@ mod tests {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_database_tape_replay_ddl_changes_idempotent() {
|
||||
let temp_file1 = NamedTempFile::new().unwrap();
|
||||
let db_path1 = temp_file1.path().to_str().unwrap();
|
||||
let temp_file2 = NamedTempFile::new().unwrap();
|
||||
let db_path2 = temp_file2.path().to_str().unwrap();
|
||||
let temp_file3 = NamedTempFile::new().unwrap();
|
||||
let db_path3 = temp_file3.path().to_str().unwrap();
|
||||
|
||||
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
|
||||
|
||||
let db1 = turso_core::Database::open_file(io.clone(), db_path1, false, true).unwrap();
|
||||
let db1 = Arc::new(DatabaseTape::new(db1));
|
||||
|
||||
let db2 = turso_core::Database::open_file(io.clone(), db_path2, false, true).unwrap();
|
||||
let db2 = Arc::new(DatabaseTape::new(db2));
|
||||
|
||||
let db3 = turso_core::Database::open_file(io.clone(), db_path3, false, true).unwrap();
|
||||
let db3 = Arc::new(DatabaseTape::new(db3));
|
||||
|
||||
let mut gen = genawaiter::sync::Gen::new({
|
||||
|coro| async move {
|
||||
let coro: Coro<()> = coro.into();
|
||||
let conn1 = db1.connect(&coro).await.unwrap();
|
||||
conn1
|
||||
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y, z)")
|
||||
.unwrap();
|
||||
conn1.execute("CREATE INDEX t_idx ON t(y, z)").unwrap();
|
||||
|
||||
let conn2 = db2.connect(&coro).await.unwrap();
|
||||
conn2
|
||||
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y, z)")
|
||||
.unwrap();
|
||||
conn2.execute("CREATE INDEX t_idx ON t(y, z)").unwrap();
|
||||
|
||||
let conn3 = db3.connect(&coro).await.unwrap();
|
||||
{
|
||||
let opts = DatabaseReplaySessionOpts {
|
||||
use_implicit_rowid: false,
|
||||
};
|
||||
let mut session = db3.start_replay_session(&coro, opts).await.unwrap();
|
||||
|
||||
let opts = DatabaseChangesIteratorOpts {
|
||||
ignore_schema_changes: false,
|
||||
..Default::default()
|
||||
};
|
||||
let mut iterator = db1.iterate_changes(opts.clone()).unwrap();
|
||||
while let Some(operation) = iterator.next(&coro).await.unwrap() {
|
||||
tracing::info!("1. operation: {:?}", operation);
|
||||
session.replay(&coro, operation).await.unwrap();
|
||||
}
|
||||
|
||||
let mut iterator = db2.iterate_changes(opts.clone()).unwrap();
|
||||
while let Some(operation) = iterator.next(&coro).await.unwrap() {
|
||||
tracing::info!("2. operation: {:?}", operation);
|
||||
session.replay(&coro, operation).await.unwrap();
|
||||
}
|
||||
}
|
||||
let mut rows = Vec::new();
|
||||
let mut stmt = conn3.prepare("SELECT name FROM sqlite_master").unwrap();
|
||||
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
|
||||
rows.push(row.get_value(0).to_text().unwrap().to_string());
|
||||
}
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
"sqlite_sequence".to_string(),
|
||||
"turso_cdc".to_string(),
|
||||
"t".to_string(),
|
||||
"sqlite_autoindex_t_1".to_string(),
|
||||
"t_idx".to_string()
|
||||
]
|
||||
);
|
||||
crate::Result::Ok(())
|
||||
}
|
||||
});
|
||||
loop {
|
||||
match gen.resume_with(Ok(())) {
|
||||
genawaiter::GeneratorState::Yielded(..) => io.step().unwrap(),
|
||||
genawaiter::GeneratorState::Complete(result) => {
|
||||
result.unwrap();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user