mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-28 05:24:22 +01:00
Merge 'test/fuzz: add ALTER TABLE column ops to tx isolation fuzz test' from Jussi Saurio
## Beef Adds `AddColumn`, `DropColumn`, `RenameColumn` ## Details - Previously the test was hardcoded to assume there's always 2 named columns, so changed a bunch of things for this reason - Still assumes the primary key column is always `id` and is never renamed or dropped etc. Closes #2434
This commit is contained in:
@@ -5,28 +5,57 @@ use std::collections::HashMap;
|
||||
use turso::{Builder, Value};
|
||||
|
||||
// In-memory representation of the database state
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
struct DbRow {
|
||||
id: i64,
|
||||
text: String,
|
||||
other_columns: HashMap<String, Value>,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for DbRow {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "(id={}, text={})", self.id, self.text)
|
||||
if self.other_columns.is_empty() {
|
||||
write!(f, "(id={})", self.id)
|
||||
} else {
|
||||
write!(
|
||||
f,
|
||||
"(id={},{})",
|
||||
self.id,
|
||||
self.other_columns
|
||||
.iter()
|
||||
.map(|(k, v)| format!("{k}={v:?}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", "),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct TransactionState {
|
||||
// What this transaction can see (snapshot)
|
||||
// The schema this transaction can see (snapshot)
|
||||
schema: HashMap<String, TableSchema>,
|
||||
// The rows this transaction can see (snapshot)
|
||||
visible_rows: HashMap<i64, DbRow>,
|
||||
// Pending changes in this transaction
|
||||
pending_changes: Vec<Operation>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
struct Column {
|
||||
name: String,
|
||||
ty: String,
|
||||
primary_key: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
struct TableSchema {
|
||||
columns: Vec<Column>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ShadowDb {
|
||||
// Schema
|
||||
schema: HashMap<String, TableSchema>,
|
||||
// Committed state (what's actually in the database)
|
||||
committed_rows: HashMap<i64, DbRow>,
|
||||
// Transaction states
|
||||
@@ -34,8 +63,9 @@ struct ShadowDb {
|
||||
}
|
||||
|
||||
impl ShadowDb {
|
||||
fn new() -> Self {
|
||||
fn new(initial_schema: HashMap<String, TableSchema>) -> Self {
|
||||
Self {
|
||||
schema: initial_schema,
|
||||
committed_rows: HashMap::new(),
|
||||
transactions: HashMap::new(),
|
||||
}
|
||||
@@ -46,6 +76,7 @@ impl ShadowDb {
|
||||
tx_id,
|
||||
if immediate {
|
||||
Some(TransactionState {
|
||||
schema: self.schema.clone(),
|
||||
visible_rows: self.committed_rows.clone(),
|
||||
pending_changes: Vec::new(),
|
||||
})
|
||||
@@ -59,6 +90,7 @@ impl ShadowDb {
|
||||
if let Some(tx_state) = self.transactions.get_mut(&tx_id) {
|
||||
assert!(tx_state.is_none());
|
||||
tx_state.replace(TransactionState {
|
||||
schema: self.schema.clone(),
|
||||
visible_rows: self.committed_rows.clone(),
|
||||
pending_changes: Vec::new(),
|
||||
});
|
||||
@@ -74,16 +106,69 @@ impl ShadowDb {
|
||||
// Apply pending changes to committed state
|
||||
for op in tx_state.pending_changes {
|
||||
match op {
|
||||
Operation::Insert { id, text } => {
|
||||
self.committed_rows.insert(id, DbRow { id, text });
|
||||
Operation::Insert { id, other_columns } => {
|
||||
assert!(
|
||||
other_columns.len()
|
||||
== self.schema.get("test_table").unwrap().columns.len() - 1,
|
||||
"Inserted row has {} columns, expected {}",
|
||||
other_columns.len() + 1,
|
||||
self.schema.get("test_table").unwrap().columns.len()
|
||||
);
|
||||
self.committed_rows.insert(id, DbRow { id, other_columns });
|
||||
}
|
||||
Operation::Update { id, text } => {
|
||||
self.committed_rows.insert(id, DbRow { id, text });
|
||||
Operation::Update { id, other_columns } => {
|
||||
let mut row_to_update = self.committed_rows.get(&id).unwrap().clone();
|
||||
for (k, v) in other_columns {
|
||||
row_to_update.other_columns.insert(k, v);
|
||||
}
|
||||
self.committed_rows.insert(id, row_to_update);
|
||||
}
|
||||
Operation::Delete { id } => {
|
||||
self.committed_rows.remove(&id);
|
||||
}
|
||||
other => unreachable!("Unexpected operation: {other}"),
|
||||
Operation::AlterTable { op } => match op {
|
||||
AlterTableOp::AddColumn { name, ty } => {
|
||||
let table_columns =
|
||||
&mut self.schema.get_mut("test_table").unwrap().columns;
|
||||
table_columns.push(Column {
|
||||
name: name.clone(),
|
||||
ty: ty.clone(),
|
||||
primary_key: false,
|
||||
});
|
||||
for row in self.committed_rows.values_mut() {
|
||||
row.other_columns.insert(name.clone(), Value::Null);
|
||||
}
|
||||
}
|
||||
AlterTableOp::DropColumn { name } => {
|
||||
let table_columns =
|
||||
&mut self.schema.get_mut("test_table").unwrap().columns;
|
||||
table_columns.retain(|c| c.name != name);
|
||||
for row in self.committed_rows.values_mut() {
|
||||
row.other_columns.remove(&name);
|
||||
}
|
||||
}
|
||||
AlterTableOp::RenameColumn { old_name, new_name } => {
|
||||
let table_columns =
|
||||
&mut self.schema.get_mut("test_table").unwrap().columns;
|
||||
let col_type = table_columns
|
||||
.iter()
|
||||
.find(|c| c.name == old_name)
|
||||
.unwrap()
|
||||
.ty
|
||||
.clone();
|
||||
table_columns.retain(|c| c.name != old_name);
|
||||
table_columns.push(Column {
|
||||
name: new_name.clone(),
|
||||
ty: col_type,
|
||||
primary_key: false,
|
||||
});
|
||||
for row in self.committed_rows.values_mut() {
|
||||
let value = row.other_columns.remove(&old_name).unwrap();
|
||||
row.other_columns.insert(new_name.clone(), value);
|
||||
}
|
||||
}
|
||||
},
|
||||
_ => unreachable!("Unexpected operation: {op}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -93,7 +178,12 @@ impl ShadowDb {
|
||||
self.transactions.remove(&tx_id);
|
||||
}
|
||||
|
||||
fn insert(&mut self, tx_id: usize, id: i64, text: String) -> Result<(), String> {
|
||||
fn insert(
|
||||
&mut self,
|
||||
tx_id: usize,
|
||||
id: i64,
|
||||
other_columns: HashMap<String, Value>,
|
||||
) -> Result<(), String> {
|
||||
if let Some(tx_state) = self.transactions.get_mut(&tx_id) {
|
||||
// Check if row exists in visible state
|
||||
if tx_state.as_ref().unwrap().visible_rows.contains_key(&id) {
|
||||
@@ -101,13 +191,13 @@ impl ShadowDb {
|
||||
}
|
||||
let row = DbRow {
|
||||
id,
|
||||
text: text.clone(),
|
||||
other_columns: other_columns.clone(),
|
||||
};
|
||||
tx_state
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.pending_changes
|
||||
.push(Operation::Insert { id, text });
|
||||
.push(Operation::Insert { id, other_columns });
|
||||
tx_state.as_mut().unwrap().visible_rows.insert(id, row);
|
||||
Ok(())
|
||||
} else {
|
||||
@@ -115,22 +205,31 @@ impl ShadowDb {
|
||||
}
|
||||
}
|
||||
|
||||
fn update(&mut self, tx_id: usize, id: i64, text: String) -> Result<(), String> {
|
||||
fn update(
|
||||
&mut self,
|
||||
tx_id: usize,
|
||||
id: i64,
|
||||
other_columns: HashMap<String, Value>,
|
||||
) -> Result<(), String> {
|
||||
if let Some(tx_state) = self.transactions.get_mut(&tx_id) {
|
||||
// Check if row exists in visible state
|
||||
if !tx_state.as_ref().unwrap().visible_rows.contains_key(&id) {
|
||||
let visible_rows = &tx_state.as_ref().unwrap().visible_rows;
|
||||
if !visible_rows.contains_key(&id) {
|
||||
return Err("Row not found".to_string());
|
||||
}
|
||||
let row = DbRow {
|
||||
id,
|
||||
text: text.clone(),
|
||||
};
|
||||
let mut new_row = visible_rows.get(&id).unwrap().clone();
|
||||
for (k, v) in other_columns {
|
||||
new_row.other_columns.insert(k, v);
|
||||
}
|
||||
tx_state
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.pending_changes
|
||||
.push(Operation::Update { id, text });
|
||||
tx_state.as_mut().unwrap().visible_rows.insert(id, row);
|
||||
.push(Operation::Update {
|
||||
id,
|
||||
other_columns: new_row.other_columns.clone(),
|
||||
});
|
||||
tx_state.as_mut().unwrap().visible_rows.insert(id, new_row);
|
||||
Ok(())
|
||||
} else {
|
||||
Err("No active transaction".to_string())
|
||||
@@ -155,6 +254,78 @@ impl ShadowDb {
|
||||
}
|
||||
}
|
||||
|
||||
fn alter_table(&mut self, tx_id: usize, op: AlterTableOp) -> Result<(), String> {
|
||||
if let Some(tx_state) = self.transactions.get_mut(&tx_id) {
|
||||
let table_columns = &mut tx_state
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.schema
|
||||
.get_mut("test_table")
|
||||
.unwrap()
|
||||
.columns;
|
||||
match op {
|
||||
AlterTableOp::AddColumn { name, ty } => {
|
||||
table_columns.push(Column {
|
||||
name: name.clone(),
|
||||
ty: ty.clone(),
|
||||
primary_key: false,
|
||||
});
|
||||
let pending_changes = &mut tx_state.as_mut().unwrap().pending_changes;
|
||||
pending_changes.push(Operation::AlterTable {
|
||||
op: AlterTableOp::AddColumn {
|
||||
name: name.clone(),
|
||||
ty: ty.clone(),
|
||||
},
|
||||
});
|
||||
let visible_rows = &mut tx_state.as_mut().unwrap().visible_rows;
|
||||
for visible_row in visible_rows.values_mut() {
|
||||
visible_row.other_columns.insert(name.clone(), Value::Null);
|
||||
}
|
||||
}
|
||||
AlterTableOp::DropColumn { name } => {
|
||||
table_columns.retain(|c| c.name != name);
|
||||
let pending_changes = &mut tx_state.as_mut().unwrap().pending_changes;
|
||||
pending_changes.push(Operation::AlterTable {
|
||||
op: AlterTableOp::DropColumn { name: name.clone() },
|
||||
});
|
||||
let visible_rows = &mut tx_state.as_mut().unwrap().visible_rows;
|
||||
for visible_row in visible_rows.values_mut() {
|
||||
visible_row.other_columns.remove(&name);
|
||||
}
|
||||
}
|
||||
AlterTableOp::RenameColumn { old_name, new_name } => {
|
||||
let col_type = table_columns
|
||||
.iter()
|
||||
.find(|c| c.name == old_name)
|
||||
.unwrap()
|
||||
.ty
|
||||
.clone();
|
||||
table_columns.retain(|c| c.name != old_name);
|
||||
table_columns.push(Column {
|
||||
name: new_name.clone(),
|
||||
ty: col_type,
|
||||
primary_key: false,
|
||||
});
|
||||
let pending_changes = &mut tx_state.as_mut().unwrap().pending_changes;
|
||||
pending_changes.push(Operation::AlterTable {
|
||||
op: AlterTableOp::RenameColumn {
|
||||
old_name: old_name.clone(),
|
||||
new_name: new_name.clone(),
|
||||
},
|
||||
});
|
||||
let visible_rows = &mut tx_state.as_mut().unwrap().visible_rows;
|
||||
for visible_row in visible_rows.values_mut() {
|
||||
let value = visible_row.other_columns.remove(&old_name).unwrap();
|
||||
visible_row.other_columns.insert(new_name.clone(), value);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
Err("No active transaction".to_string())
|
||||
}
|
||||
}
|
||||
|
||||
fn get_visible_rows(&self, tx_id: Option<usize>) -> Vec<DbRow> {
|
||||
let Some(tx_id) = tx_id else {
|
||||
// No transaction - see committed state
|
||||
@@ -190,33 +361,94 @@ impl std::fmt::Display for CheckpointMode {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
enum AlterTableOp {
|
||||
AddColumn { name: String, ty: String },
|
||||
DropColumn { name: String },
|
||||
RenameColumn { old_name: String, new_name: String },
|
||||
}
|
||||
|
||||
impl std::fmt::Display for AlterTableOp {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
AlterTableOp::AddColumn { name, ty } => write!(f, "ADD COLUMN {name} {ty}"),
|
||||
AlterTableOp::DropColumn { name } => write!(f, "DROP COLUMN {name}"),
|
||||
AlterTableOp::RenameColumn { old_name, new_name } => {
|
||||
write!(f, "RENAME COLUMN {old_name} TO {new_name}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum Operation {
|
||||
Begin,
|
||||
Commit,
|
||||
Rollback,
|
||||
Insert { id: i64, text: String },
|
||||
Update { id: i64, text: String },
|
||||
Delete { id: i64 },
|
||||
Checkpoint { mode: CheckpointMode },
|
||||
Insert {
|
||||
id: i64,
|
||||
other_columns: HashMap<String, Value>,
|
||||
},
|
||||
Update {
|
||||
id: i64,
|
||||
other_columns: HashMap<String, Value>,
|
||||
},
|
||||
Delete {
|
||||
id: i64,
|
||||
},
|
||||
Checkpoint {
|
||||
mode: CheckpointMode,
|
||||
},
|
||||
AlterTable {
|
||||
op: AlterTableOp,
|
||||
},
|
||||
Select,
|
||||
}
|
||||
|
||||
fn value_to_sql(v: &Value) -> String {
|
||||
match v {
|
||||
Value::Integer(i) => i.to_string(),
|
||||
Value::Text(s) => format!("'{s}'"),
|
||||
Value::Null => "NULL".to_string(),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Operation {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Operation::Begin => write!(f, "BEGIN"),
|
||||
Operation::Commit => write!(f, "COMMIT"),
|
||||
Operation::Rollback => write!(f, "ROLLBACK"),
|
||||
Operation::Insert { id, text } => {
|
||||
write!(f, "INSERT INTO test_table (id, text) VALUES ({id}, {text})")
|
||||
Operation::Insert { id, other_columns } => {
|
||||
let col_names = other_columns.keys().cloned().collect::<Vec<_>>().join(", ");
|
||||
let col_values = other_columns
|
||||
.values()
|
||||
.map(value_to_sql)
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
if col_names.is_empty() {
|
||||
write!(f, "INSERT INTO test_table (id) VALUES ({id})")
|
||||
} else {
|
||||
write!(
|
||||
f,
|
||||
"INSERT INTO test_table (id, {col_names}) VALUES ({id}, {col_values})"
|
||||
)
|
||||
}
|
||||
}
|
||||
Operation::Update { id, text } => {
|
||||
write!(f, "UPDATE test_table SET text = {text} WHERE id = {id}")
|
||||
Operation::Update { id, other_columns } => {
|
||||
let update_set = other_columns
|
||||
.iter()
|
||||
.map(|(k, v)| format!("{k}={}", value_to_sql(v)))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
write!(f, "UPDATE test_table SET {update_set} WHERE id = {id}")
|
||||
}
|
||||
Operation::Delete { id } => write!(f, "DELETE FROM test_table WHERE id = {id}"),
|
||||
Operation::Select => write!(f, "SELECT * FROM test_table"),
|
||||
Operation::Checkpoint { mode } => write!(f, "PRAGMA wal_checkpoint({mode})"),
|
||||
Operation::AlterTable { op } => write!(f, "ALTER TABLE test_table {op}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -259,6 +491,7 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
|
||||
const NUM_CONNECTIONS: usize = 2;
|
||||
|
||||
for iteration in 0..NUM_ITERATIONS {
|
||||
println!("--- Seed {seed} Iteration {iteration} ---");
|
||||
// Create a fresh database for each iteration
|
||||
let tempfile = tempfile::NamedTempFile::new().unwrap();
|
||||
let db = Builder::new_local(tempfile.path().to_str().unwrap())
|
||||
@@ -268,7 +501,25 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
|
||||
.unwrap();
|
||||
|
||||
// SHARED shadow database for all connections
|
||||
let mut shared_shadow_db = ShadowDb::new();
|
||||
let mut schema = HashMap::new();
|
||||
schema.insert(
|
||||
"test_table".to_string(),
|
||||
TableSchema {
|
||||
columns: vec![
|
||||
Column {
|
||||
name: "id".to_string(),
|
||||
ty: "INTEGER".to_string(),
|
||||
primary_key: true,
|
||||
},
|
||||
Column {
|
||||
name: "text".to_string(),
|
||||
ty: "TEXT".to_string(),
|
||||
primary_key: false,
|
||||
},
|
||||
],
|
||||
},
|
||||
);
|
||||
let mut shared_shadow_db = ShadowDb::new(schema);
|
||||
let mut next_tx_id = 0;
|
||||
|
||||
// Create connections
|
||||
@@ -357,13 +608,20 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
|
||||
}
|
||||
}
|
||||
}
|
||||
Operation::Insert { id, text } => {
|
||||
let result = conn
|
||||
.execute(
|
||||
"INSERT INTO test_table (id, text) VALUES (?, ?)",
|
||||
vec![Value::Integer(id), Value::Text(text.clone())],
|
||||
)
|
||||
.await;
|
||||
Operation::Insert { id, other_columns } => {
|
||||
let col_names =
|
||||
other_columns.keys().cloned().collect::<Vec<_>>().join(", ");
|
||||
let col_values = other_columns
|
||||
.values()
|
||||
.map(value_to_sql)
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
let query = if col_names.is_empty() {
|
||||
format!("INSERT INTO test_table (id) VALUES ({id})")
|
||||
} else {
|
||||
format!("INSERT INTO test_table (id, {col_names}) VALUES ({id}, {col_values})")
|
||||
};
|
||||
let result = conn.execute(query.as_str(), ()).await;
|
||||
|
||||
// Check if real DB operation succeeded
|
||||
match result {
|
||||
@@ -371,12 +629,14 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
|
||||
// Success - update shadow DB
|
||||
if let Some(tx_id) = *current_tx_id {
|
||||
// In transaction - update transaction's view
|
||||
shared_shadow_db.insert(tx_id, id, text.clone()).unwrap();
|
||||
shared_shadow_db
|
||||
.insert(tx_id, id, other_columns.clone())
|
||||
.unwrap();
|
||||
} else {
|
||||
// Auto-commit - update shadow DB committed state
|
||||
shared_shadow_db.begin_transaction(next_tx_id, true);
|
||||
shared_shadow_db
|
||||
.insert(next_tx_id, id, text.clone())
|
||||
.insert(next_tx_id, id, other_columns.clone())
|
||||
.unwrap();
|
||||
shared_shadow_db.commit_transaction(next_tx_id);
|
||||
next_tx_id += 1;
|
||||
@@ -395,13 +655,14 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
|
||||
}
|
||||
}
|
||||
}
|
||||
Operation::Update { id, text } => {
|
||||
let result = conn
|
||||
.execute(
|
||||
"UPDATE test_table SET text = ? WHERE id = ?",
|
||||
vec![Value::Text(text.clone()), Value::Integer(id)],
|
||||
)
|
||||
.await;
|
||||
Operation::Update { id, other_columns } => {
|
||||
let col_set = other_columns
|
||||
.iter()
|
||||
.map(|(k, v)| format!("{k}={}", value_to_sql(v)))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
let query = format!("UPDATE test_table SET {col_set} WHERE id = {id}");
|
||||
let result = conn.execute(query.as_str(), ()).await;
|
||||
|
||||
// Check if real DB operation succeeded
|
||||
match result {
|
||||
@@ -409,12 +670,14 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
|
||||
// Success - update shadow DB
|
||||
if let Some(tx_id) = *current_tx_id {
|
||||
// In transaction - update transaction's view
|
||||
shared_shadow_db.update(tx_id, id, text.clone()).unwrap();
|
||||
shared_shadow_db
|
||||
.update(tx_id, id, other_columns.clone())
|
||||
.unwrap();
|
||||
} else {
|
||||
// Auto-commit - update shadow DB committed state
|
||||
shared_shadow_db.begin_transaction(next_tx_id, true);
|
||||
shared_shadow_db
|
||||
.update(next_tx_id, id, text.clone())
|
||||
.update(next_tx_id, id, other_columns.clone())
|
||||
.unwrap();
|
||||
shared_shadow_db.commit_transaction(next_tx_id);
|
||||
next_tx_id += 1;
|
||||
@@ -436,8 +699,8 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
|
||||
Operation::Delete { id } => {
|
||||
let result = conn
|
||||
.execute(
|
||||
"DELETE FROM test_table WHERE id = ?",
|
||||
vec![Value::Integer(id)],
|
||||
format!("DELETE FROM test_table WHERE id = {id}").as_str(),
|
||||
(),
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -470,17 +733,23 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
|
||||
}
|
||||
}
|
||||
Operation::Select => {
|
||||
let query_str = "SELECT id, text FROM test_table ORDER BY id";
|
||||
let mut rows = conn.query(query_str, ()).await.unwrap();
|
||||
let query_str = "SELECT * FROM test_table ORDER BY id";
|
||||
let mut stmt = conn.prepare(query_str).await.unwrap();
|
||||
let columns = stmt.columns();
|
||||
let mut rows = stmt.query(()).await.unwrap();
|
||||
|
||||
let mut real_rows = Vec::new();
|
||||
while let Some(row) = rows.next().await.unwrap() {
|
||||
let id = row.get_value(0).unwrap();
|
||||
let text = row.get_value(1).unwrap();
|
||||
|
||||
if let (Value::Integer(id), Value::Text(text)) = (id, text) {
|
||||
real_rows.push(DbRow { id, text });
|
||||
let Value::Integer(id) = row.get_value(0).unwrap() else {
|
||||
panic!("Unexpected value for id: {:?}", row.get_value(0));
|
||||
};
|
||||
let mut other_columns = HashMap::new();
|
||||
for i in 1..columns.len() {
|
||||
let column = columns.get(i).unwrap();
|
||||
let value = row.get_value(i).unwrap();
|
||||
other_columns.insert(column.name().to_string(), value);
|
||||
}
|
||||
real_rows.push(DbRow { id, other_columns });
|
||||
}
|
||||
real_rows.sort_by_key(|r| r.id);
|
||||
|
||||
@@ -510,6 +779,39 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
|
||||
);
|
||||
}
|
||||
}
|
||||
Operation::AlterTable { op } => {
|
||||
let query = format!("ALTER TABLE test_table {op}");
|
||||
let result = conn.execute(&query, ()).await;
|
||||
|
||||
match result {
|
||||
Ok(_) => {
|
||||
if let Some(tx_id) = *current_tx_id {
|
||||
// In transaction - update transaction's view
|
||||
shared_shadow_db.alter_table(tx_id, op).unwrap();
|
||||
} else {
|
||||
// Auto-commit - update shadow DB committed state
|
||||
shared_shadow_db.begin_transaction(next_tx_id, true);
|
||||
shared_shadow_db
|
||||
.alter_table(next_tx_id, op.clone())
|
||||
.unwrap();
|
||||
shared_shadow_db.commit_transaction(next_tx_id);
|
||||
next_tx_id += 1;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Connection {conn_id}(op={op_num}) FAILED: {e}");
|
||||
// Check if it's an acceptable error
|
||||
if e.to_string().contains("database is locked") {
|
||||
if let Some(tx_id) = *current_tx_id {
|
||||
shared_shadow_db.rollback_transaction(tx_id);
|
||||
*current_tx_id = None;
|
||||
}
|
||||
} else {
|
||||
panic!("Unexpected error during alter table: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Operation::Checkpoint { mode } => {
|
||||
let query = format!("PRAGMA wal_checkpoint({mode})");
|
||||
let mut rows = conn.query(&query, ()).await.unwrap();
|
||||
@@ -550,6 +852,15 @@ fn generate_operation(
|
||||
shadow_db: &mut ShadowDb,
|
||||
) -> (Operation, Vec<DbRow>) {
|
||||
let in_transaction = current_tx_id.is_some();
|
||||
let schema_clone = if let Some(tx_id) = current_tx_id {
|
||||
if let Some(Some(tx_state)) = shadow_db.transactions.get(&tx_id) {
|
||||
tx_state.schema.clone()
|
||||
} else {
|
||||
shadow_db.schema.clone()
|
||||
}
|
||||
} else {
|
||||
shadow_db.schema.clone()
|
||||
};
|
||||
let mut get_visible_rows = |accesses_db: bool| {
|
||||
if let Some(tx_id) = current_tx_id {
|
||||
let tx_state = shadow_db.transactions.get(&tx_id).unwrap();
|
||||
@@ -568,7 +879,10 @@ fn generate_operation(
|
||||
(Operation::Begin, get_visible_rows(false))
|
||||
} else {
|
||||
let visible_rows = get_visible_rows(true);
|
||||
(generate_data_operation(rng, &visible_rows), visible_rows)
|
||||
(
|
||||
generate_data_operation(rng, &visible_rows, &schema_clone),
|
||||
visible_rows,
|
||||
)
|
||||
}
|
||||
}
|
||||
10..=14 => {
|
||||
@@ -576,7 +890,10 @@ fn generate_operation(
|
||||
(Operation::Commit, get_visible_rows(false))
|
||||
} else {
|
||||
let visible_rows = get_visible_rows(true);
|
||||
(generate_data_operation(rng, &visible_rows), visible_rows)
|
||||
(
|
||||
generate_data_operation(rng, &visible_rows, &schema_clone),
|
||||
visible_rows,
|
||||
)
|
||||
}
|
||||
}
|
||||
15..=19 => {
|
||||
@@ -584,7 +901,10 @@ fn generate_operation(
|
||||
(Operation::Rollback, get_visible_rows(false))
|
||||
} else {
|
||||
let visible_rows = get_visible_rows(true);
|
||||
(generate_data_operation(rng, &visible_rows), visible_rows)
|
||||
(
|
||||
generate_data_operation(rng, &visible_rows, &schema_clone),
|
||||
visible_rows,
|
||||
)
|
||||
}
|
||||
}
|
||||
20..=22 => {
|
||||
@@ -596,53 +916,147 @@ fn generate_operation(
|
||||
};
|
||||
(Operation::Checkpoint { mode }, get_visible_rows(false))
|
||||
}
|
||||
// 80% chance for data operations
|
||||
23..=26 => {
|
||||
let op = match rng.random_range(0..6) {
|
||||
0..=2 => AlterTableOp::AddColumn {
|
||||
name: format!("col_{}", rng.random_range(1..i64::MAX)),
|
||||
ty: "TEXT".to_string(),
|
||||
},
|
||||
3..=4 => {
|
||||
let table_schema = schema_clone.get("test_table").unwrap();
|
||||
let columns_no_id = table_schema
|
||||
.columns
|
||||
.iter()
|
||||
.filter(|c| c.name != "id")
|
||||
.collect::<Vec<_>>();
|
||||
if columns_no_id.is_empty() {
|
||||
AlterTableOp::AddColumn {
|
||||
name: format!("col_{}", rng.random_range(1..i64::MAX)),
|
||||
ty: "TEXT".to_string(),
|
||||
}
|
||||
} else {
|
||||
let column = columns_no_id.choose(rng).unwrap();
|
||||
AlterTableOp::DropColumn {
|
||||
name: column.name.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
5 => {
|
||||
let columns_no_id = schema_clone
|
||||
.get("test_table")
|
||||
.unwrap()
|
||||
.columns
|
||||
.iter()
|
||||
.filter(|c| c.name != "id")
|
||||
.collect::<Vec<_>>();
|
||||
if columns_no_id.is_empty() {
|
||||
AlterTableOp::AddColumn {
|
||||
name: format!("col_{}", rng.random_range(1..i64::MAX)),
|
||||
ty: "TEXT".to_string(),
|
||||
}
|
||||
} else {
|
||||
let column = columns_no_id.choose(rng).unwrap();
|
||||
AlterTableOp::RenameColumn {
|
||||
old_name: column.name.clone(),
|
||||
new_name: format!("col_{}", rng.random_range(1..i64::MAX)),
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
};
|
||||
(Operation::AlterTable { op }, get_visible_rows(true))
|
||||
}
|
||||
_ => {
|
||||
let visible_rows = get_visible_rows(true);
|
||||
(generate_data_operation(rng, &visible_rows), visible_rows)
|
||||
(
|
||||
generate_data_operation(rng, &visible_rows, &schema_clone),
|
||||
visible_rows,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn generate_data_operation(rng: &mut ChaCha8Rng, visible_rows: &[DbRow]) -> Operation {
|
||||
match rng.random_range(0..4) {
|
||||
fn generate_data_operation(
|
||||
rng: &mut ChaCha8Rng,
|
||||
visible_rows: &[DbRow],
|
||||
schema: &HashMap<String, TableSchema>,
|
||||
) -> Operation {
|
||||
let table_schema = schema.get("test_table").unwrap();
|
||||
let op_num = rng.random_range(0..4);
|
||||
let mut generate_insert_operation = || {
|
||||
let id = rng.random_range(1..i64::MAX);
|
||||
let mut other_columns = HashMap::new();
|
||||
for column in table_schema.columns.iter() {
|
||||
if column.name == "id" {
|
||||
continue;
|
||||
}
|
||||
other_columns.insert(
|
||||
column.name.clone(),
|
||||
match column.ty.as_str() {
|
||||
"TEXT" => Value::Text(format!("text_{}", rng.random_range(1..i64::MAX))),
|
||||
"INTEGER" => Value::Integer(rng.random_range(1..i64::MAX)),
|
||||
"REAL" => Value::Real(rng.random_range(1..i64::MAX) as f64),
|
||||
_ => Value::Null,
|
||||
},
|
||||
);
|
||||
}
|
||||
Operation::Insert { id, other_columns }
|
||||
};
|
||||
match op_num {
|
||||
0 => {
|
||||
// Insert - generate a new ID that doesn't exist
|
||||
let id = if visible_rows.is_empty() {
|
||||
rng.random_range(1..1000)
|
||||
} else {
|
||||
let max_id = visible_rows.iter().map(|r| r.id).max().unwrap();
|
||||
rng.random_range(max_id + 1..max_id + 100)
|
||||
};
|
||||
let text = format!("text_{}", rng.random_range(1..1000));
|
||||
Operation::Insert { id, text }
|
||||
// Insert
|
||||
generate_insert_operation()
|
||||
}
|
||||
1 => {
|
||||
// Update - only if there are visible rows
|
||||
// Update
|
||||
if visible_rows.is_empty() {
|
||||
// No rows to update, try insert instead
|
||||
let id = rng.random_range(1..1000);
|
||||
let text = format!("text_{}", rng.random_range(1..1000));
|
||||
Operation::Insert { id, text }
|
||||
generate_insert_operation()
|
||||
} else {
|
||||
let columns_no_id = table_schema
|
||||
.columns
|
||||
.iter()
|
||||
.filter(|c| c.name != "id")
|
||||
.collect::<Vec<_>>();
|
||||
if columns_no_id.is_empty() {
|
||||
// No columns to update, try insert instead
|
||||
return generate_insert_operation();
|
||||
}
|
||||
let id = visible_rows.choose(rng).unwrap().id;
|
||||
let text = format!("updated_{}", rng.random_range(1..1000));
|
||||
Operation::Update { id, text }
|
||||
let col_name_to_update = columns_no_id.choose(rng).unwrap().name.clone();
|
||||
let mut other_columns = HashMap::new();
|
||||
other_columns.insert(
|
||||
col_name_to_update.clone(),
|
||||
match columns_no_id
|
||||
.iter()
|
||||
.find(|c| c.name == col_name_to_update)
|
||||
.unwrap()
|
||||
.ty
|
||||
.as_str()
|
||||
{
|
||||
"TEXT" => Value::Text(format!("updated_{}", rng.random_range(1..i64::MAX))),
|
||||
"INTEGER" => Value::Integer(rng.random_range(1..i64::MAX)),
|
||||
"REAL" => Value::Real(rng.random_range(1..i64::MAX) as f64),
|
||||
_ => Value::Null,
|
||||
},
|
||||
);
|
||||
Operation::Update { id, other_columns }
|
||||
}
|
||||
}
|
||||
2 => {
|
||||
// Delete - only if there are visible rows
|
||||
// Delete
|
||||
if visible_rows.is_empty() {
|
||||
// No rows to delete, try insert instead
|
||||
let id = rng.random_range(1..1000);
|
||||
let text = format!("text_{}", rng.random_range(1..1000));
|
||||
Operation::Insert { id, text }
|
||||
generate_insert_operation()
|
||||
} else {
|
||||
let id = visible_rows.choose(rng).unwrap().id;
|
||||
Operation::Delete { id }
|
||||
}
|
||||
}
|
||||
3 => Operation::Select,
|
||||
3 => {
|
||||
// Select
|
||||
Operation::Select
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user