This commit is contained in:
alpaylan
2025-07-11 01:33:50 -04:00
163 changed files with 19438 additions and 2320 deletions

View File

@@ -8,8 +8,6 @@ use std::{
use serde::{Deserialize, Serialize};
use tracing;
use turso_core::{Connection, Result, StepResult, IO};
use crate::{
@@ -258,20 +256,26 @@ pub(crate) struct InteractionStats {
pub(crate) create_count: usize,
pub(crate) create_index_count: usize,
pub(crate) drop_count: usize,
pub(crate) begin_count: usize,
pub(crate) commit_count: usize,
pub(crate) rollback_count: usize,
}
impl Display for InteractionStats {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Read: {}, Write: {}, Delete: {}, Update: {}, Create: {}, CreateIndex: {}, Drop: {}",
"Read: {}, Write: {}, Delete: {}, Update: {}, Create: {}, CreateIndex: {}, Drop: {}, Begin: {}, Commit: {}, Rollback: {}",
self.read_count,
self.write_count,
self.delete_count,
self.update_count,
self.create_count,
self.create_index_count,
self.drop_count
self.drop_count,
self.begin_count,
self.commit_count,
self.rollback_count,
)
}
}
@@ -301,7 +305,7 @@ impl Display for Interaction {
}
}
type AssertionFunc = dyn Fn(&Vec<ResultSet>, &SimulatorEnv) -> Result<bool>;
type AssertionFunc = dyn Fn(&Vec<ResultSet>, &mut SimulatorEnv) -> Result<bool>;
enum AssertionAST {
Pick(),
@@ -349,6 +353,9 @@ impl InteractionPlan {
create_count: 0,
create_index_count: 0,
drop_count: 0,
begin_count: 0,
commit_count: 0,
rollback_count: 0,
};
fn query_stat(q: &Query, stats: &mut InteractionStats) {
@@ -360,9 +367,11 @@ impl InteractionPlan {
Query::Drop(_) => stats.drop_count += 1,
Query::Update(_) => stats.update_count += 1,
Query::CreateIndex(_) => stats.create_index_count += 1,
Query::Begin(_) => stats.begin_count += 1,
Query::Commit(_) => stats.commit_count += 1,
Query::Rollback(_) => stats.rollback_count += 1,
}
}
for interactions in &self.plan {
match interactions {
Interactions::Property(property) => {
@@ -458,7 +467,7 @@ impl Interaction {
out.push(r);
}
StepResult::IO => {
io.run_once().unwrap();
rows.run_once().unwrap();
}
StepResult::Interrupt => {}
StepResult::Done => {
@@ -477,7 +486,7 @@ impl Interaction {
pub(crate) fn execute_assertion(
&self,
stack: &Vec<ResultSet>,
env: &SimulatorEnv,
env: &mut SimulatorEnv,
) -> Result<()> {
match self {
Self::Assertion(assertion) => {
@@ -502,7 +511,7 @@ impl Interaction {
pub(crate) fn execute_assumption(
&self,
stack: &Vec<ResultSet>,
env: &SimulatorEnv,
env: &mut SimulatorEnv,
) -> Result<()> {
match self {
Self::Assumption(assumption) => {
@@ -682,6 +691,7 @@ fn reopen_database(env: &mut SimulatorEnv) {
env.connections.clear();
// Clear all open files
// TODO: for correct reporting of faults we should get all the recorded numbers and transfer to the new file
env.io.files.borrow_mut().clear();
// 2. Re-open database

View File

@@ -10,10 +10,13 @@ use crate::{
CompoundOperator, CompoundSelect, Distinctness, ResultColumn, SelectBody,
SelectInner,
},
select::{Distinctness, ResultColumn},
transaction::{Begin, Commit, Rollback},
update::Update,
Create, Delete, Drop, Insert, Query, Select,
},
table::SimValue,
FAULT_ERROR_MSG,
},
runner::env::SimulatorEnv,
};
@@ -52,6 +55,8 @@ pub(crate) enum Property {
queries: Vec<Query>,
/// The select query
select: Select,
/// Interactive query information if any
interactive: Option<InteractiveQueryInfo>,
},
/// Double Create Failure is a property in which creating
/// the same table twice leads to an error.
@@ -167,6 +172,12 @@ pub(crate) enum Property {
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InteractiveQueryInfo {
start_with_immediate: bool,
end_with_commit: bool,
}
impl Property {
pub(crate) fn name(&self) -> &str {
match self {
@@ -192,6 +203,7 @@ impl Property {
row_index,
queries,
select,
interactive,
} => {
let (table, values) = if let Insert::Values { table, values } = insert {
(table, values)
@@ -214,7 +226,7 @@ impl Property {
message: format!("table {} exists", insert.table()),
func: Box::new({
let table_name = table.clone();
move |_: &Vec<ResultSet>, env: &SimulatorEnv| {
move |_: &Vec<ResultSet>, env: &mut SimulatorEnv| {
Ok(env.tables.iter().any(|t| t.name == table_name))
}
}),
@@ -222,14 +234,26 @@ impl Property {
let assertion = Interaction::Assertion(Assertion {
message: format!(
"row [{:?}] not found in table {}",
"row [{:?}] not found in table {}, interactive={} commit={}, rollback={}",
row.iter().map(|v| v.to_string()).collect::<Vec<String>>(),
insert.table(),
interactive.is_some(),
interactive
.as_ref()
.map(|i| i.end_with_commit)
.unwrap_or(false),
interactive
.as_ref()
.map(|i| !i.end_with_commit)
.unwrap_or(false),
),
func: Box::new(move |stack: &Vec<ResultSet>, _: &SimulatorEnv| {
func: Box::new(move |stack: &Vec<ResultSet>, _| {
let rows = stack.last().unwrap();
match rows {
Ok(rows) => Ok(rows.iter().any(|r| r == &row)),
Ok(rows) => {
let found = rows.iter().any(|r| r == &row);
Ok(found)
}
Err(err) => Err(LimboError::InternalError(err.to_string())),
}
}),
@@ -250,7 +274,7 @@ impl Property {
let assumption = Interaction::Assumption(Assertion {
message: "Double-Create-Failure should not be called on an existing table"
.to_string(),
func: Box::new(move |_: &Vec<ResultSet>, env: &SimulatorEnv| {
func: Box::new(move |_: &Vec<ResultSet>, env: &mut SimulatorEnv| {
Ok(!env.tables.iter().any(|t| t.name == table_name))
}),
});
@@ -308,7 +332,7 @@ impl Property {
let assertion = Interaction::Assertion(Assertion {
message: "select query should respect the limit clause".to_string(),
func: Box::new(move |stack: &Vec<ResultSet>, _: &SimulatorEnv| {
func: Box::new(move |stack: &Vec<ResultSet>, _| {
let last = stack.last().unwrap();
match last {
Ok(rows) => Ok(limit >= rows.len()),
@@ -332,7 +356,7 @@ impl Property {
message: format!("table {table} exists"),
func: Box::new({
let table = table.clone();
move |_: &Vec<ResultSet>, env: &SimulatorEnv| {
move |_: &Vec<ResultSet>, env: &mut SimulatorEnv| {
Ok(env.tables.iter().any(|t| t.name == table))
}
}),
@@ -377,7 +401,7 @@ impl Property {
message: format!("table {table} exists"),
func: Box::new({
let table = table.clone();
move |_: &Vec<ResultSet>, env: &SimulatorEnv| {
move |_, env: &mut SimulatorEnv| {
Ok(env.tables.iter().any(|t| t.name == table))
}
}),
@@ -419,7 +443,7 @@ impl Property {
message: format!("table {table} exists"),
func: Box::new({
let table = table.clone();
move |_: &Vec<ResultSet>, env: &SimulatorEnv| {
move |_: &Vec<ResultSet>, env: &mut SimulatorEnv| {
Ok(env.tables.iter().any(|t| t.name == table))
}
}),
@@ -439,7 +463,7 @@ impl Property {
let assertion = Interaction::Assertion(Assertion {
message: "select queries should return the same amount of results".to_string(),
func: Box::new(move |stack: &Vec<ResultSet>, _: &SimulatorEnv| {
func: Box::new(move |stack: &Vec<ResultSet>, _| {
let select_star = stack.last().unwrap();
let select_predicate = stack.get(stack.len() - 2).unwrap();
match (select_predicate, select_star) {
@@ -487,7 +511,35 @@ impl Property {
}
Property::FaultyQuery { query, tables } => {
let checks = assert_all_table_values(tables);
let first = std::iter::once(Interaction::FaultyQuery(query.clone()));
let query_clone = query.clone();
let assumption = Assertion {
// A fault may not occur as we first signal we want a fault injected,
// then when IO is called the fault triggers. It may happen that a fault is injected
// but no IO happens right after it
message: "fault occured".to_string(),
func: Box::new(move |stack, env| {
let last = stack.last().unwrap();
match last {
Ok(_) => {
query_clone.shadow(env);
Ok(true)
}
Err(err) => {
let msg = format!("{}", err);
if msg.contains(FAULT_ERROR_MSG) {
Ok(true)
} else {
Err(LimboError::InternalError(msg))
}
}
}
}),
};
let first = [
Interaction::FaultyQuery(query.clone()),
Interaction::Assumption(assumption),
]
.into_iter();
Vec::from_iter(first.chain(checks))
}
Property::WhereTrueFalseNull { select, predicate } => {
@@ -673,9 +725,11 @@ fn assert_all_table_values(tables: &[String]) -> impl Iterator<Item = Interactio
),
func: Box::new({
let table = table.clone();
move |stack: &Vec<ResultSet>, env: &SimulatorEnv| {
move |stack: &Vec<ResultSet>, env: &mut SimulatorEnv| {
let table = env.tables.iter().find(|t| t.name == table).ok_or_else(|| {
LimboError::InternalError(format!("table {table} should exist"))
LimboError::InternalError(format!(
"table {table} should exist in simulator env",
))
})?;
let last = stack.last().unwrap();
match last {
@@ -760,12 +814,26 @@ fn property_insert_values_select<R: rand::Rng>(
values: rows,
};
// Choose if we want queries to be executed in an interactive transaction
let interactive = if rng.gen_bool(0.5) {
Some(InteractiveQueryInfo {
start_with_immediate: rng.gen_bool(0.5),
end_with_commit: rng.gen_bool(0.5),
})
} else {
None
};
// Create random queries respecting the constraints
let mut queries = Vec::new();
// - [x] There will be no errors in the middle interactions. (this constraint is impossible to check, so this is just best effort)
// - [x] The inserted row will not be deleted.
// - [x] The inserted row will not be updated.
// - [ ] The table `t` will not be renamed, dropped, or altered. (todo: add this constraint once ALTER or DROP is implemented)
if let Some(ref interactive) = interactive {
queries.push(Query::Begin(Begin {
immediate: interactive.start_with_immediate,
}));
}
for _ in 0..rng.gen_range(0..3) {
let query = Query::arbitrary_from(rng, (env, remaining));
match &query {
@@ -799,6 +867,13 @@ fn property_insert_values_select<R: rand::Rng>(
}
queries.push(query);
}
if let Some(ref interactive) = interactive {
queries.push(if interactive.end_with_commit {
Query::Commit(Commit)
} else {
Query::Rollback(Rollback)
});
}
// Select the row
let select_query = Select::simple(
@@ -811,6 +886,7 @@ fn property_insert_values_select<R: rand::Rng>(
row_index,
queries,
select: select_query,
interactive,
}
}

View File

@@ -1,2 +1,4 @@
pub mod query;
pub mod table;
pub(crate) const FAULT_ERROR_MSG: &str = "Injected fault";

View File

@@ -13,6 +13,11 @@ use update::Update;
use crate::{
generation::Shadow,
model::table::{SimValue, Table},
model::{
query::transaction::{Begin, Commit, Rollback},
table::SimValue,
},
runner::env::SimulatorEnv,
};
pub mod create;
@@ -22,6 +27,7 @@ pub mod drop;
pub mod insert;
pub mod predicate;
pub mod select;
pub mod transaction;
pub mod update;
// This type represents the potential queries on the database.
@@ -34,6 +40,9 @@ pub(crate) enum Query {
Update(Update),
Drop(Drop),
CreateIndex(CreateIndex),
Begin(Begin),
Commit(Commit),
Rollback(Rollback),
}
impl Query {
@@ -49,6 +58,7 @@ impl Query {
Query::CreateIndex(CreateIndex { table_name, .. }) => {
HashSet::from_iter([table_name.clone()])
}
Query::Begin(_) | Query::Commit(_) | Query::Rollback(_) => HashSet::new(),
}
}
pub(crate) fn uses(&self) -> Vec<String> {
@@ -61,6 +71,7 @@ impl Query {
| Query::Update(Update { table, .. })
| Query::Drop(Drop { table, .. }) => vec![table.clone()],
Query::CreateIndex(CreateIndex { table_name, .. }) => vec![table_name.clone()],
Query::Begin(..) | Query::Commit(..) | Query::Rollback(..) => vec![],
}
}
}
@@ -77,6 +88,9 @@ impl Shadow for Query {
Query::Update(update) => update.shadow(env),
Query::Drop(drop) => drop.shadow(env),
Query::CreateIndex(create_index) => Ok(create_index.shadow(env)),
Query::Begin(begin) => begin.shadow(env),
Query::Commit(commit) => commit.shadow(env),
Query::Rollback(rollback) => rollback.shadow(env),
}
}
}
@@ -91,6 +105,9 @@ impl Display for Query {
Self::Update(update) => write!(f, "{update}"),
Self::Drop(drop) => write!(f, "{drop}"),
Self::CreateIndex(create_index) => write!(f, "{create_index}"),
Self::Begin(begin) => write!(f, "{begin}"),
Self::Commit(commit) => write!(f, "{commit}"),
Self::Rollback(rollback) => write!(f, "{rollback}"),
}
}
}

View File

@@ -0,0 +1,57 @@
use std::fmt::Display;
use serde::{Deserialize, Serialize};
use crate::{model::table::SimValue, runner::env::SimulatorEnv};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct Begin {
pub(crate) immediate: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct Commit;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct Rollback;
impl Begin {
pub(crate) fn shadow(&self, env: &mut SimulatorEnv) -> Vec<Vec<SimValue>> {
env.tables_snapshot = Some(env.tables.clone());
vec![]
}
}
impl Commit {
pub(crate) fn shadow(&self, env: &mut SimulatorEnv) -> Vec<Vec<SimValue>> {
env.tables_snapshot = None;
vec![]
}
}
impl Rollback {
pub(crate) fn shadow(&self, env: &mut SimulatorEnv) -> Vec<Vec<SimValue>> {
if let Some(tables) = env.tables_snapshot.take() {
env.tables = tables;
}
vec![]
}
}
impl Display for Begin {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "BEGIN {}", if self.immediate { "IMMEDIATE" } else { "" })
}
}
impl Display for Commit {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "COMMIT")
}
}
impl Display for Rollback {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ROLLBACK")
}
}

View File

@@ -58,7 +58,7 @@ pub struct SimulatorCLI {
pub disable_delete: bool,
#[clap(long, help = "disable CREATE Statement", default_value_t = false)]
pub disable_create: bool,
#[clap(long, help = "disable CREATE INDEX Statement", default_value_t = false)]
#[clap(long, help = "disable CREATE INDEX Statement", default_value_t = true)]
pub disable_create_index: bool,
#[clap(long, help = "disable DROP Statement", default_value_t = false)]
pub disable_drop: bool,
@@ -100,7 +100,7 @@ pub struct SimulatorCLI {
pub disable_union_all_preserves_cardinality: bool,
#[clap(long, help = "disable FsyncNoWait Property", default_value_t = true)]
pub disable_fsync_no_wait: bool,
#[clap(long, help = "disable FaultyQuery Property", default_value_t = true)]
#[clap(long, help = "disable FaultyQuery Property", default_value_t = false)]
pub disable_faulty_query: bool,
#[clap(long, help = "disable Reopen-Database fault", default_value_t = false)]
pub disable_reopen_database: bool,
@@ -110,6 +110,10 @@ pub struct SimulatorCLI {
default_value_t = 0
)]
pub latency_probability: usize,
#[clap(long, help = "Enable experimental MVCC feature")]
pub experimental_mvcc: bool,
#[clap(long, help = "Enable experimental indexing feature")]
pub experimental_indexes: bool,
}
#[derive(Parser, Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd, Eq, Ord)]

View File

@@ -111,6 +111,18 @@ fn execute_query_rusqlite(
connection.execute(create_index.to_string().as_str(), ())?;
Ok(vec![])
}
Query::Begin(begin) => {
connection.execute(begin.to_string().as_str(), ())?;
Ok(vec![])
}
Query::Commit(commit) => {
connection.execute(commit.to_string().as_str(), ())?;
Ok(vec![])
}
Query::Rollback(rollback) => {
connection.execute(rollback.to_string().as_str(), ())?;
Ok(vec![])
}
}
}

View File

@@ -37,6 +37,7 @@ pub(crate) struct SimulatorEnv {
pub(crate) paths: Paths,
pub(crate) type_: SimulationType,
pub(crate) phase: SimulationPhase,
pub tables_snapshot: Option<Vec<Table>>,
}
impl UnwindSafe for SimulatorEnv {}
@@ -55,6 +56,7 @@ impl SimulatorEnv {
paths: self.paths.clone(),
type_: self.type_,
phase: self.phase,
tables_snapshot: None,
}
}
@@ -207,6 +209,8 @@ impl SimulatorEnv {
max_time_simulation: cli_opts.maximum_time,
disable_reopen_database: cli_opts.disable_reopen_database,
latency_probability: cli_opts.latency_probability,
experimental_mvcc: cli_opts.experimental_mvcc,
experimental_indexes: cli_opts.experimental_indexes,
};
let io =
@@ -224,7 +228,12 @@ impl SimulatorEnv {
std::fs::remove_file(&wal_path).unwrap();
}
let db = match Database::open_file(io.clone(), db_path.to_str().unwrap(), false, true) {
let db = match Database::open_file(
io.clone(),
db_path.to_str().unwrap(),
opts.experimental_mvcc,
opts.experimental_indexes,
) {
Ok(db) => db,
Err(e) => {
panic!("error opening simulator test file {db_path:?}: {e:?}");
@@ -245,6 +254,7 @@ impl SimulatorEnv {
db,
type_: simulation_type,
phase: SimulationPhase::Test,
tables_snapshot: None,
}
}
@@ -362,6 +372,8 @@ pub(crate) struct SimulatorOpts {
pub(crate) page_size: usize,
pub(crate) max_time_simulation: usize,
pub(crate) latency_probability: usize,
pub(crate) experimental_mvcc: bool,
pub(crate) experimental_indexes: bool,
}
#[derive(Debug, Clone)]

View File

@@ -7,6 +7,8 @@ use rand::Rng as _;
use rand_chacha::ChaCha8Rng;
use tracing::{instrument, Level};
use turso_core::{CompletionType, File, Result};
use crate::model::FAULT_ERROR_MSG;
pub(crate) struct SimulatorFile {
pub(crate) inner: Arc<dyn File>,
pub(crate) fault: Cell<bool>,
@@ -88,7 +90,7 @@ impl File for SimulatorFile {
fn lock_file(&self, exclusive: bool) -> Result<()> {
if self.fault.get() {
return Err(turso_core::LimboError::InternalError(
"Injected fault".into(),
FAULT_ERROR_MSG.into(),
));
}
self.inner.lock_file(exclusive)
@@ -97,7 +99,7 @@ impl File for SimulatorFile {
fn unlock_file(&self) -> Result<()> {
if self.fault.get() {
return Err(turso_core::LimboError::InternalError(
"Injected fault".into(),
FAULT_ERROR_MSG.into(),
));
}
self.inner.unlock_file()
@@ -113,7 +115,7 @@ impl File for SimulatorFile {
tracing::debug!("pread fault");
self.nr_pread_faults.set(self.nr_pread_faults.get() + 1);
return Err(turso_core::LimboError::InternalError(
"Injected fault".into(),
FAULT_ERROR_MSG.into(),
));
}
if let Some(latency) = self.generate_latency_duration() {
@@ -148,7 +150,7 @@ impl File for SimulatorFile {
tracing::debug!("pwrite fault");
self.nr_pwrite_faults.set(self.nr_pwrite_faults.get() + 1);
return Err(turso_core::LimboError::InternalError(
"Injected fault".into(),
FAULT_ERROR_MSG.into(),
));
}
if let Some(latency) = self.generate_latency_duration() {
@@ -178,7 +180,7 @@ impl File for SimulatorFile {
tracing::debug!("sync fault");
self.nr_sync_faults.set(self.nr_sync_faults.get() + 1);
return Err(turso_core::LimboError::InternalError(
"Injected fault".into(),
FAULT_ERROR_MSG.into(),
));
}
if let Some(latency) = self.generate_latency_duration() {

View File

@@ -7,7 +7,7 @@ use rand::{RngCore, SeedableRng};
use rand_chacha::ChaCha8Rng;
use turso_core::{Clock, Instant, MemoryIO, OpenFlags, PlatformIO, Result, IO};
use crate::runner::file::SimulatorFile;
use crate::{model::FAULT_ERROR_MSG, runner::file::SimulatorFile};
pub(crate) struct SimulatorIO {
pub(crate) inner: Box<dyn IO>,
@@ -104,7 +104,7 @@ impl IO for SimulatorIO {
self.nr_run_once_faults
.replace(self.nr_run_once_faults.get() + 1);
return Err(turso_core::LimboError::InternalError(
"Injected fault".into(),
FAULT_ERROR_MSG.into(),
));
}
self.inner.run_once()?;

View File

@@ -62,6 +62,7 @@ impl InteractionPlan {
.uses()
.iter()
.any(|t| depending_tables.contains(t));
if has_table {
// Remove the extensional parts of the properties
if let Interactions::Property(p) = interactions {
@@ -86,13 +87,15 @@ impl InteractionPlan {
.iter()
.any(|t| depending_tables.contains(t));
}
has_table
&& !matches!(
interactions,
Interactions::Query(Query::Select(_))
| Interactions::Property(Property::SelectLimit { .. })
| Interactions::Property(Property::SelectSelectOptimizer { .. })
)
let is_fault = matches!(interactions, Interactions::Fault(..));
is_fault
|| (has_table
&& !matches!(
interactions,
Interactions::Query(Query::Select(_))
| Interactions::Property(Property::SelectLimit { .. })
| Interactions::Property(Property::SelectSelectOptimizer { .. })
))
};
idx += 1;
retain