Merge 'Simulator: Concurrent transactions' from Pedro Muniz

Depends on #3272.
First big step towards: #1851
- Add ignore error flag to `Interaction` to ignore parse errors when
needed, and still properly report other errors from intermediate
queries.
- adjusted shrinking to accommodate transaction statements from
different connections and properly remove extensional queries from some
properties
- MVCC: generates `Begin Concurrent` and `Commit` statements that are
interleaved to test snapshot isolation between connection transactions.
- MVCC: if the next interactions are going to contain a DDL statement,
we first commit all transaction and execute the DDL statements serially

Closes #3278
This commit is contained in:
Jussi Saurio
2025-10-01 12:53:32 +03:00
committed by GitHub
10 changed files with 515 additions and 272 deletions

7
Cargo.lock generated
View File

@@ -321,6 +321,12 @@ dependencies = [
"serde",
]
[[package]]
name = "bitmaps"
version = "3.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d084b0137aaa901caf9f1e8b21daa6aa24d41cd806e111335541eff9683bd6"
[[package]]
name = "blake3"
version = "1.7.0"
@@ -2331,6 +2337,7 @@ name = "limbo_sim"
version = "0.2.0-pre.11"
dependencies = [
"anyhow",
"bitmaps",
"chrono",
"clap",
"dirs 6.0.0",

View File

@@ -98,6 +98,7 @@ regex = "1.11.1"
regex-syntax = { version = "0.8.5", default-features = false }
similar = { version = "2.7.0" }
similar-asserts = { version = "1.7.0" }
bitmaps = { version = "3.2.1", default-features = false }
[profile.dev.package.similar]
opt-level = 3

View File

@@ -47,3 +47,4 @@ indexmap = { workspace = true }
either = "1.15.0"
similar = { workspace = true }
similar-asserts = { workspace = true }
bitmaps = { workspace = true }

View File

@@ -36,8 +36,10 @@ pub(crate) type ResultSet = Result<Vec<Vec<SimValue>>>;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct InteractionPlan {
pub plan: Vec<Interactions>,
plan: Vec<Interactions>,
pub mvcc: bool,
// Len should not count transactions statements, just so we can generate more meaningful interactions per run
len: usize,
}
impl InteractionPlan {
@@ -45,11 +47,16 @@ impl InteractionPlan {
Self {
plan: Vec::new(),
mvcc,
len: 0,
}
}
pub fn new_with(plan: Vec<Interactions>, mvcc: bool) -> Self {
Self { plan, mvcc }
let len = plan
.iter()
.filter(|interaction| !interaction.is_transaction())
.count();
Self { plan, mvcc, len }
}
#[inline]
@@ -57,23 +64,17 @@ impl InteractionPlan {
&self.plan
}
// TODO: this is just simplified logic so we can get something rolling with begin concurrent
// transactions in the simulator. Ideally when we generate the plan we will have begin and commits statements across interactions
/// Length of interactions that are not transaction statements
#[inline]
pub fn len(&self) -> usize {
self.len
}
pub fn push(&mut self, interactions: Interactions) {
if self.mvcc {
let conn_index = interactions.connection_index;
let begin = Interactions::new(
conn_index,
InteractionsType::Query(Query::Begin(Begin::Concurrent)),
);
let commit =
Interactions::new(conn_index, InteractionsType::Query(Query::Commit(Commit)));
self.plan.push(begin);
self.plan.push(interactions);
self.plan.push(commit);
} else {
self.plan.push(interactions);
if !interactions.is_transaction() {
self.len += 1;
}
self.plan.push(interactions);
}
/// Compute via diff computes a a plan from a given `.plan` file without the need to parse
@@ -218,7 +219,7 @@ impl InteractionPlan {
let create_query = Create::arbitrary(&mut env.rng.clone(), &env.connection_context(0));
// initial query starts at 0th connection
plan.plan.push(Interactions::new(
plan.push(Interactions::new(
0,
InteractionsType::Query(Query::Create(create_query)),
));
@@ -234,19 +235,69 @@ impl InteractionPlan {
) -> Option<Vec<Interaction>> {
let num_interactions = env.opts.max_interactions as usize;
if self.len() < num_interactions {
tracing::debug!("Generating interaction {}/{}", self.len(), num_interactions);
let interactions = {
let conn_index = env.choose_conn(rng);
let conn_index = env.choose_conn(rng);
let interactions = if self.mvcc && !env.conn_in_transaction(conn_index) {
let query = Query::Begin(Begin::Concurrent);
Interactions::new(conn_index, InteractionsType::Query(query))
} else if self.mvcc
&& env.conn_in_transaction(conn_index)
&& env.has_conn_executed_query_after_transaction(conn_index)
&& rng.random_bool(0.4)
{
let query = Query::Commit(Commit);
Interactions::new(conn_index, InteractionsType::Query(query))
} else {
let conn_ctx = &env.connection_context(conn_index);
Interactions::arbitrary_from(rng, conn_ctx, (env, self.stats(), conn_index))
};
let out_interactions = interactions.interactions();
tracing::debug!("Generating interaction {}/{}", self.len(), num_interactions);
let mut out_interactions = interactions.interactions();
assert!(!out_interactions.is_empty());
let out_interactions = if self.mvcc
&& out_interactions
.iter()
.any(|interaction| interaction.is_ddl())
{
// DDL statements must be serial, so commit all connections and then execute the DDL
let mut commit_interactions = (0..env.connections.len())
.filter(|&idx| env.conn_in_transaction(idx))
.map(|idx| {
let query = Query::Commit(Commit);
let interaction = Interactions::new(idx, InteractionsType::Query(query));
let out_interactions = interaction.interactions();
self.push(interaction);
out_interactions
})
.fold(
Vec::with_capacity(env.connections.len()),
|mut accum, mut curr| {
accum.append(&mut curr);
accum
},
);
commit_interactions.append(&mut out_interactions);
commit_interactions
} else {
out_interactions
};
self.push(interactions);
Some(out_interactions)
} else {
None
// after we generated all interactions if some connection is still in a transaction, commit
(0..env.connections.len())
.find(|idx| env.conn_in_transaction(*idx))
.map(|conn_index| {
let query = Query::Commit(Commit);
let interaction = Interactions::new(conn_index, InteractionsType::Query(query));
let out_interactions = interaction.interactions();
self.push(interaction);
out_interactions
})
}
}
@@ -271,7 +322,7 @@ impl InteractionPlan {
}
impl Deref for InteractionPlan {
type Target = [Interactions];
type Target = Vec<Interactions>;
fn deref(&self) -> &Self::Target {
&self.plan
@@ -294,11 +345,32 @@ impl IntoIterator for InteractionPlan {
}
}
impl<'a> IntoIterator for &'a InteractionPlan {
type Item = &'a Interactions;
type IntoIter = <&'a Vec<Interactions> as IntoIterator>::IntoIter;
fn into_iter(self) -> Self::IntoIter {
self.plan.iter()
}
}
impl<'a> IntoIterator for &'a mut InteractionPlan {
type Item = &'a mut Interactions;
type IntoIter = <&'a mut Vec<Interactions> as IntoIterator>::IntoIter;
fn into_iter(self) -> Self::IntoIter {
self.plan.iter_mut()
}
}
pub trait InteractionPlanIterator {
fn next(&mut self, env: &mut SimulatorEnv) -> Option<Interaction>;
}
impl<T: InteractionPlanIterator> InteractionPlanIterator for &mut T {
#[inline]
fn next(&mut self, env: &mut SimulatorEnv) -> Option<Interaction> {
T::next(self, env)
}
@@ -339,6 +411,7 @@ impl<I> InteractionPlanIterator for PlanIterator<I>
where
I: Iterator<Item = Interaction>,
{
#[inline]
fn next(&mut self, _env: &mut SimulatorEnv) -> Option<Interaction> {
self.iter.next()
}
@@ -367,6 +440,13 @@ impl Interactions {
interactions,
}
}
pub fn get_extensional_queries(&mut self) -> Option<&mut Vec<Query>> {
match &mut self.interactions {
InteractionsType::Property(property) => property.get_extensional_queries(),
InteractionsType::Query(..) | InteractionsType::Fault(..) => None,
}
}
}
impl Deref for Interactions {
@@ -390,26 +470,11 @@ pub enum InteractionsType {
Fault(Fault),
}
impl Shadow for Interactions {
type Result = ();
fn shadow(&self, tables: &mut ShadowTablesMut) {
match &self.interactions {
InteractionsType::Property(property) => {
let initial_tables = tables.clone();
for interaction in property.interactions(self.connection_index) {
let res = interaction.shadow(tables);
if res.is_err() {
// If any interaction fails, we reset the tables to the initial state
**tables = initial_tables.clone();
break;
}
}
}
InteractionsType::Query(query) => {
let _ = query.shadow(tables);
}
InteractionsType::Fault(_) => {}
impl InteractionsType {
pub fn is_transaction(&self) -> bool {
match self {
InteractionsType::Query(query) => query.is_transaction(),
_ => false,
}
}
}
@@ -489,7 +554,7 @@ impl Display for InteractionPlan {
writeln!(f, "-- FAULT '{fault}'")?;
}
InteractionsType::Query(query) => {
writeln!(f, "{query};")?;
writeln!(f, "{query}; -- {}", interactions.connection_index)?;
}
}
}
@@ -575,7 +640,7 @@ impl Assertion {
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub(crate) enum Fault {
pub enum Fault {
Disconnect,
ReopenDatabase,
}
@@ -593,6 +658,7 @@ impl Display for Fault {
pub struct Interaction {
pub connection_index: usize,
pub interaction: InteractionType,
pub ignore_error: bool,
}
impl Deref for Interaction {
@@ -614,6 +680,15 @@ impl Interaction {
Self {
connection_index,
interaction,
ignore_error: false,
}
}
pub fn new_ignore_error(connection_index: usize, interaction: InteractionType) -> Self {
Self {
connection_index,
interaction,
ignore_error: true,
}
}
}
@@ -633,7 +708,7 @@ pub enum InteractionType {
// FIXME: add the connection index here later
impl Display for Interaction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.interaction)
write!(f, "{}; -- {}", self.interaction, self.connection_index)
}
}
@@ -641,13 +716,13 @@ impl Display for InteractionType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Query(query) => write!(f, "{query}"),
Self::Assumption(assumption) => write!(f, "-- ASSUME {};", assumption.name),
Self::Assumption(assumption) => write!(f, "-- ASSUME {}", assumption.name),
Self::Assertion(assertion) => {
write!(f, "-- ASSERT {};", assertion.name)
}
Self::Fault(fault) => write!(f, "-- FAULT '{fault}';"),
Self::Fault(fault) => write!(f, "-- FAULT '{fault}'"),
Self::FsyncQuery(query) => {
writeln!(f, "-- FSYNC QUERY;")?;
writeln!(f, "-- FSYNC QUERY")?;
writeln!(f, "{query};")?;
write!(f, "{query};")
}
@@ -660,19 +735,31 @@ impl Shadow for InteractionType {
type Result = anyhow::Result<Vec<Vec<SimValue>>>;
fn shadow(&self, env: &mut ShadowTablesMut) -> Self::Result {
match self {
Self::Query(query) => query.shadow(env),
Self::FsyncQuery(query) => {
let mut first = query.shadow(env)?;
first.extend(query.shadow(env)?);
Ok(first)
}
Self::Assumption(_) | Self::Assertion(_) | Self::Fault(_) | Self::FaultyQuery(_) => {
Ok(vec![])
Self::Query(query) => {
if !query.is_transaction() {
env.add_query(query);
}
query.shadow(env)
}
Self::Assumption(_)
| Self::Assertion(_)
| Self::Fault(_)
| Self::FaultyQuery(_)
| Self::FsyncQuery(_) => Ok(vec![]),
}
}
}
impl InteractionType {
pub fn is_ddl(&self) -> bool {
match self {
InteractionType::Query(query)
| InteractionType::FsyncQuery(query)
| InteractionType::FaultyQuery(query) => query.is_ddl(),
_ => false,
}
}
pub(crate) fn execute_query(&self, conn: &mut Arc<Connection>) -> ResultSet {
if let Self::Query(query) = self {
let query_str = query.to_string();
@@ -779,13 +866,15 @@ impl InteractionType {
match fault {
Fault::Disconnect => {
if env.connections[conn_index].is_connected() {
if env.conn_in_transaction(conn_index) {
env.rollback_conn(conn_index);
}
env.connections[conn_index].disconnect();
} else {
return Err(turso_core::LimboError::InternalError(
"connection already disconnected".into(),
));
}
env.connections[conn_index] = SimConnection::Disconnected;
}
Fault::ReopenDatabase => {
reopen_database(env);

View File

@@ -27,7 +27,7 @@ use super::plan::{Assertion, Interaction, InteractionStats, ResultSet};
/// Properties are representations of executable specifications
/// about the database behavior.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum Property {
pub enum Property {
/// Insert-Select is a property in which the inserted row
/// must be in the resulting rows of a select query that has a
/// where clause that matches the inserted row.
@@ -190,6 +190,10 @@ pub(crate) enum Property {
query: Query,
tables: Vec<String>,
},
/// Property used to subsititute a property with its queries only
Queries {
queries: Vec<Query>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -213,8 +217,27 @@ impl Property {
Property::FsyncNoWait { .. } => "FsyncNoWait",
Property::FaultyQuery { .. } => "FaultyQuery",
Property::UNIONAllPreservesCardinality { .. } => "UNION-All-Preserves-Cardinality",
Property::Queries { .. } => "Queries",
}
}
pub fn get_extensional_queries(&mut self) -> Option<&mut Vec<Query>> {
match self {
Property::InsertValuesSelect { queries, .. }
| Property::DoubleCreateFailure { queries, .. }
| Property::DeleteSelect { queries, .. }
| Property::DropSelect { queries, .. }
| Property::Queries { queries } => Some(queries),
Property::FsyncNoWait { .. } | Property::FaultyQuery { .. } => None,
Property::SelectLimit { .. }
| Property::SelectSelectOptimizer { .. }
| Property::WhereTrueFalseNull { .. }
| Property::UNIONAllPreservesCardinality { .. }
| Property::ReadYourUpdatesBack { .. }
| Property::TableHasExpectedContent { .. } => None,
}
}
/// interactions construct a list of interactions, which is an executable representation of the property.
/// the requirement of property -> vec<interaction> conversion emerges from the need to serialize the property,
/// and `interaction` cannot be serialized directly.
@@ -447,12 +470,14 @@ impl Property {
let table_name = create.table.name.clone();
let assertion = InteractionType::Assertion(Assertion::new("creating two tables with the name should result in a failure for the second query"
.to_string(), move |stack: &Vec<ResultSet>, _| {
.to_string(), move |stack: &Vec<ResultSet>, env| {
let last = stack.last().unwrap();
match last {
Ok(success) => Ok(Err(format!("expected table creation to fail but it succeeded: {success:?}"))),
Err(e) => {
if e.to_string().to_lowercase().contains(&format!("table {table_name} already exists")) {
// On error we rollback the transaction if there is any active here
env.rollback_conn(connection_index);
Ok(Ok(()))
} else {
Ok(Err(format!("expected table already exists error, got: {e}")))
@@ -470,7 +495,7 @@ impl Property {
.into_iter()
.map(|q| Interaction::new(connection_index, InteractionType::Query(q))),
);
interactions.push(Interaction::new(connection_index, cq2));
interactions.push(Interaction::new_ignore_error(connection_index, cq2));
interactions.push(Interaction::new(connection_index, assertion));
interactions
@@ -804,6 +829,9 @@ impl Property {
// We cannot make any assumptions about the error content; all we are about is, if the statement errored,
// we don't shadow the results into the simulator env, i.e. we assume whatever the statement did was rolled back.
tracing::error!("Fault injection produced error: {err}");
// On error we rollback the transaction if there is any active here
env.rollback_conn(connection_index);
Ok(Ok(()))
}
}
@@ -1023,6 +1051,11 @@ impl Property {
)),
].into_iter().map(|i| Interaction::new(connection_index, i)).collect()
}
Property::Queries { queries } => queries
.clone()
.into_iter()
.map(|query| Interaction::new(connection_index, InteractionType::Query(query)))
.collect(),
}
}
}
@@ -1159,6 +1192,7 @@ fn property_insert_values_select<R: rand::Rng>(
rng: &mut R,
remaining: &Remaining,
ctx: &impl GenerationContext,
mvcc: bool,
) -> Property {
// Get a random table
let table = pick(ctx.tables(), rng);
@@ -1178,7 +1212,7 @@ fn property_insert_values_select<R: rand::Rng>(
};
// Choose if we want queries to be executed in an interactive transaction
let interactive = if rng.random_bool(0.5) {
let interactive = if !mvcc && rng.random_bool(0.5) {
Some(InteractiveQueryInfo {
start_with_immediate: rng.random_bool(0.5),
end_with_commit: rng.random_bool(0.5),
@@ -1536,7 +1570,14 @@ impl ArbitraryFrom<(&SimulatorEnv, &InteractionStats)> for Property {
} else {
0
},
Box::new(|rng: &mut R| property_insert_values_select(rng, &remaining_, conn_ctx)),
Box::new(|rng: &mut R| {
property_insert_values_select(
rng,
&remaining_,
conn_ctx,
env.profile.experimental_mvcc,
)
}),
),
(
remaining_.select.max(1),

View File

@@ -61,6 +61,22 @@ impl Query {
Query::Begin(..) | Query::Commit(..) | Query::Rollback(..) => vec![],
}
}
#[inline]
pub fn is_transaction(&self) -> bool {
matches!(
self,
Self::Begin(..) | Self::Commit(..) | Self::Rollback(..)
)
}
#[inline]
pub fn is_ddl(&self) -> bool {
matches!(
self,
Self::Create(..) | Self::CreateIndex(..) | Self::Drop(..)
)
}
}
impl Display for Query {

View File

@@ -28,7 +28,7 @@ pub struct Profile {
#[garde(skip)]
/// Experimental MVCC feature
pub experimental_mvcc: bool,
#[garde(range(min = 1))]
#[garde(range(min = 1, max = 64))]
pub max_connections: usize,
#[garde(dive)]
pub io: IOProfile,

View File

@@ -5,13 +5,17 @@ use std::panic::UnwindSafe;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use bitmaps::Bitmap;
use garde::Validate;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;
use sql_generation::generation::GenerationContext;
use sql_generation::model::query::transaction::Rollback;
use sql_generation::model::table::Table;
use turso_core::Database;
use crate::generation::Shadow;
use crate::model::Query;
use crate::profiles::Profile;
use crate::runner::SimIO;
use crate::runner::io::SimulatorIO;
@@ -32,21 +36,37 @@ pub(crate) enum SimulationPhase {
Shrink,
}
#[derive(Debug, Clone)]
pub struct TransactionTables {
current_tables: Vec<Table>,
pending_changes: Vec<Query>,
}
impl TransactionTables {
pub fn new(tables: Vec<Table>) -> Self {
Self {
current_tables: tables,
pending_changes: Vec::new(),
}
}
}
#[derive(Debug)]
pub struct ShadowTables<'a> {
commited_tables: &'a Vec<Table>,
transaction_tables: Option<&'a Vec<Table>>,
transaction_tables: Option<&'a TransactionTables>,
}
#[derive(Debug)]
pub struct ShadowTablesMut<'a> {
commited_tables: &'a mut Vec<Table>,
transaction_tables: &'a mut Option<Vec<Table>>,
transaction_tables: &'a mut Option<TransactionTables>,
}
impl<'a> ShadowTables<'a> {
fn tables(&self) -> &'a Vec<Table> {
self.transaction_tables.map_or(self.commited_tables, |v| v)
self.transaction_tables
.map_or(self.commited_tables, |v| &v.current_tables)
}
}
@@ -65,30 +85,48 @@ where
fn tables(&'a self) -> &'a Vec<Table> {
self.transaction_tables
.as_ref()
.map(|t| &t.current_tables)
.unwrap_or(self.commited_tables)
}
fn tables_mut(&'b mut self) -> &'b mut Vec<Table> {
self.transaction_tables
.as_mut()
.map(|t| &mut t.current_tables)
.unwrap_or(self.commited_tables)
}
pub fn create_snapshot(&mut self) {
*self.transaction_tables = Some(self.commited_tables.clone());
*self.transaction_tables = Some(TransactionTables::new(self.commited_tables.clone()));
}
pub fn apply_snapshot(&mut self) {
// TODO: as we do not have concurrent tranasactions yet in the simulator
// there is no conflict we are ignoring conflict problems right now
if let Some(transation_tables) = self.transaction_tables.take() {
*self.commited_tables = transation_tables
let mut shadow_table = ShadowTablesMut {
commited_tables: self.commited_tables,
transaction_tables: &mut None,
};
for query in transation_tables.pending_changes {
// TODO: maybe panic on shadow error here
let _ = query.shadow(&mut shadow_table);
}
}
}
pub fn delete_snapshot(&mut self) {
*self.transaction_tables = None;
}
/// Append non transaction queries to the shadow tables
pub fn add_query(&mut self, query: &Query) {
assert!(!query.is_transaction());
if let Some(transaction_tables) = self.transaction_tables {
transaction_tables.pending_changes.push(query.clone());
}
}
}
impl<'a> Deref for ShadowTablesMut<'a> {
@@ -105,36 +143,6 @@ impl<'a> DerefMut for ShadowTablesMut<'a> {
}
}
#[derive(Debug, Clone)]
pub(crate) struct SimulatorTables {
pub(crate) tables: Vec<Table>,
pub(crate) snapshot: Option<Vec<Table>>,
}
impl SimulatorTables {
pub(crate) fn new() -> Self {
Self {
tables: Vec::new(),
snapshot: None,
}
}
pub(crate) fn clear(&mut self) {
self.tables.clear();
self.snapshot = None;
}
pub(crate) fn push(&mut self, table: Table) {
self.tables.push(table);
}
}
impl Deref for SimulatorTables {
type Target = Vec<Table>;
fn deref(&self) -> &Self::Target {
&self.tables
}
}
pub(crate) struct SimulatorEnv {
pub(crate) opts: SimulatorOpts,
pub profile: Profile,
@@ -150,7 +158,13 @@ pub(crate) struct SimulatorEnv {
pub memory_io: bool,
/// If connection state is None, means we are not in a transaction
pub connection_tables: Vec<Option<Vec<Table>>>,
pub connection_tables: Vec<Option<TransactionTables>>,
/// Bit map indicating whether a connection has executed a query that is not transaction related
///
/// E.g Select, Insert, Create
/// and not Begin, Commit, Rollback \
/// Has max size of 64 to accomodate 64 connections
connection_last_query: Bitmap<64>,
// Table data that is committed into the database or wal
pub committed_tables: Vec<Table>,
}
@@ -175,6 +189,7 @@ impl SimulatorEnv {
.collect(),
// TODO: not sure if connection_tables should be recreated instead
connection_tables: self.connection_tables.clone(),
connection_last_query: self.connection_last_query,
committed_tables: self.committed_tables.clone(),
}
}
@@ -393,6 +408,7 @@ impl SimulatorEnv {
profile: profile.clone(),
committed_tables: Vec::new(),
connection_tables: vec![None; profile.max_connections],
connection_last_query: Bitmap::new(),
}
}
@@ -430,11 +446,8 @@ impl SimulatorEnv {
/// Clears the commited tables and the connection tables
pub fn clear_tables(&mut self) {
self.committed_tables.clear();
self.connection_tables.iter_mut().for_each(|t| {
if let Some(t) = t {
t.clear();
}
});
self.connection_tables.iter_mut().for_each(|t| *t = None);
self.connection_last_query = Bitmap::new();
}
// TODO: does not yet create the appropriate context to avoid WriteWriteConflitcs
@@ -462,14 +475,41 @@ impl SimulatorEnv {
}
}
pub fn get_conn_tables<'a>(&'a self, conn_index: usize) -> ShadowTables<'a> {
pub fn conn_in_transaction(&self, conn_index: usize) -> bool {
self.connection_tables
.get(conn_index)
.is_some_and(|t| t.is_some())
}
pub fn has_conn_executed_query_after_transaction(&self, conn_index: usize) -> bool {
self.connection_last_query.get(conn_index)
}
pub fn update_conn_last_interaction(&mut self, conn_index: usize, query: Option<&Query>) {
// If the conn will execute a transaction statement then we set the bitmap to false
// to indicate we have not executed any queries yet after the transaction begun
let value = query.is_some_and(|query| {
matches!(
query,
Query::Begin(..) | Query::Commit(..) | Query::Rollback(..)
)
});
self.connection_last_query.set(conn_index, value);
}
pub fn rollback_conn(&mut self, conn_index: usize) {
Rollback.shadow(&mut self.get_conn_tables_mut(conn_index));
self.update_conn_last_interaction(conn_index, Some(&Query::Rollback(Rollback)));
}
pub fn get_conn_tables(&self, conn_index: usize) -> ShadowTables<'_> {
ShadowTables {
transaction_tables: self.connection_tables.get(conn_index).unwrap().as_ref(),
commited_tables: &self.committed_tables,
}
}
pub fn get_conn_tables_mut<'a>(&'a mut self, conn_index: usize) -> ShadowTablesMut<'a> {
pub fn get_conn_tables_mut(&mut self, conn_index: usize) -> ShadowTablesMut<'_> {
ShadowTablesMut {
transaction_tables: self.connection_tables.get_mut(conn_index).unwrap(),
commited_tables: &mut self.committed_tables,

View File

@@ -160,7 +160,7 @@ pub fn execute_interaction(
}
}
#[instrument(skip(env, interaction, stack), fields(seed = %env.opts.seed, interaction = %interaction))]
#[instrument(skip(env, interaction, stack), fields(conn_index = interaction.connection_index, interaction = %interaction))]
pub fn execute_interaction_turso(
env: &mut SimulatorEnv,
interaction: &Interaction,
@@ -173,23 +173,29 @@ pub fn execute_interaction_turso(
// Leave this empty info! here to print the span of the execution
tracing::info!("");
match &interaction.interaction {
InteractionType::Query(_) => {
InteractionType::Query(query) => {
tracing::debug!(?interaction);
let results = interaction.execute_query(conn);
if results.is_err() {
tracing::error!(?results);
let results = interaction
.execute_query(conn)
.inspect_err(|err| tracing::error!(?err));
if let Err(err) = &results
&& !interaction.ignore_error
{
return Err(err.clone());
}
stack.push(results);
// TODO: skip integrity check with mvcc
if !env.profile.experimental_mvcc {
limbo_integrity_check(conn)?;
}
env.update_conn_last_interaction(interaction.connection_index, Some(query));
}
InteractionType::FsyncQuery(query) => {
let results = interaction.execute_fsync_query(conn.clone(), env);
if results.is_err() {
tracing::error!(?results);
}
let results = interaction
.execute_fsync_query(conn.clone(), env)
.inspect_err(|err| tracing::error!(?err));
stack.push(results);
let query_interaction = Interaction::new(
@@ -217,10 +223,10 @@ pub fn execute_interaction_turso(
}
InteractionType::FaultyQuery(_) => {
let conn = conn.clone();
let results = interaction.execute_faulty_query(&conn, env);
if results.is_err() {
tracing::error!(?results);
}
let results = interaction
.execute_faulty_query(&conn, env)
.inspect_err(|err| tracing::error!(?err));
stack.push(results);
// Reset fault injection
env.io.inject_fault(false);
@@ -296,8 +302,14 @@ fn execute_interaction_rusqlite(
let results = execute_query_rusqlite(conn, query).map_err(|e| {
turso_core::LimboError::InternalError(format!("error executing query: {e}"))
});
if let Err(err) = &results
&& !interaction.ignore_error
{
return Err(err.clone());
}
tracing::debug!("{:?}", results);
stack.push(results);
env.update_conn_last_interaction(interaction.connection_index, Some(query));
}
InteractionType::FsyncQuery(..) => {
unimplemented!("cannot implement fsync query in rusqlite, as we do not control IO");
@@ -332,10 +344,6 @@ fn execute_query_rusqlite(
query: &Query,
) -> rusqlite::Result<Vec<Vec<SimValue>>> {
match query {
Query::Create(create) => {
connection.execute(create.to_string().as_str(), ())?;
Ok(vec![])
}
Query::Select(select) => {
let mut stmt = connection.prepare(select.to_string().as_str())?;
let columns = stmt.column_count();
@@ -360,36 +368,8 @@ fn execute_query_rusqlite(
}
Ok(result)
}
Query::Insert(insert) => {
connection.execute(insert.to_string().as_str(), ())?;
Ok(vec![])
}
Query::Delete(delete) => {
connection.execute(delete.to_string().as_str(), ())?;
Ok(vec![])
}
Query::Drop(drop) => {
connection.execute(drop.to_string().as_str(), ())?;
Ok(vec![])
}
Query::Update(update) => {
connection.execute(update.to_string().as_str(), ())?;
Ok(vec![])
}
Query::CreateIndex(create_index) => {
connection.execute(create_index.to_string().as_str(), ())?;
Ok(vec![])
}
Query::Begin(begin) => {
connection.execute(begin.to_string().as_str(), ())?;
Ok(vec![])
}
Query::Commit(commit) => {
connection.execute(commit.to_string().as_str(), ())?;
Ok(vec![])
}
Query::Rollback(rollback) => {
connection.execute(rollback.to_string().as_str(), ())?;
_ => {
connection.execute(query.to_string().as_str(), ())?;
Ok(vec![])
}
}

View File

@@ -1,3 +1,5 @@
use indexmap::IndexSet;
use crate::{
SandboxedResult, SimulatorEnv,
generation::{
@@ -8,7 +10,21 @@ use crate::{
run_simulation,
runner::execution::Execution,
};
use std::sync::{Arc, Mutex};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
fn retain_relevant_queries(
extensional_queries: &mut Vec<Query>,
depending_tables: &IndexSet<String>,
) {
extensional_queries.retain(|query| {
query.is_transaction()
|| (!matches!(query, Query::Select(..))
&& query.uses().iter().any(|t| depending_tables.contains(t)))
});
}
impl InteractionPlan {
/// Create a smaller interaction plan by deleting a property
@@ -22,7 +38,7 @@ impl InteractionPlan {
let secondary_interactions_index = all_interactions[failing_execution.interaction_index].0;
// Index of the parent property where the interaction originated from
let failing_property = &self.plan[secondary_interactions_index];
let failing_property = &self[secondary_interactions_index];
let mut depending_tables = failing_property.dependencies();
{
@@ -57,120 +73,11 @@ impl InteractionPlan {
let before = self.len();
// Remove all properties after the failing one
plan.plan.truncate(secondary_interactions_index + 1);
plan.truncate(secondary_interactions_index + 1);
// means we errored in some fault on transaction statement so just maintain the statements from before the failing one
if !depending_tables.is_empty() {
let mut idx = 0;
// Remove all properties that do not use the failing tables
plan.plan.retain_mut(|interactions| {
let retain = if idx == secondary_interactions_index {
if let InteractionsType::Property(
Property::FsyncNoWait { tables, .. } | Property::FaultyQuery { tables, .. },
) = &mut interactions.interactions
{
tables.retain(|table| depending_tables.contains(table));
}
true
} else if matches!(
interactions.interactions,
InteractionsType::Query(Query::Begin(..))
| InteractionsType::Query(Query::Commit(..))
| InteractionsType::Query(Query::Rollback(..))
) {
true
} else {
let mut has_table = interactions
.uses()
.iter()
.any(|t| depending_tables.contains(t));
if has_table {
// Remove the extensional parts of the properties
if let InteractionsType::Property(p) = &mut interactions.interactions {
match p {
Property::InsertValuesSelect { queries, .. }
| Property::DoubleCreateFailure { queries, .. }
| Property::DeleteSelect { queries, .. }
| Property::DropSelect { queries, .. } => {
queries.clear();
}
Property::FsyncNoWait { tables, query }
| Property::FaultyQuery { tables, query } => {
if !query.uses().iter().any(|t| depending_tables.contains(t)) {
tables.clear();
} else {
tables.retain(|table| depending_tables.contains(table));
}
}
Property::SelectLimit { .. }
| Property::SelectSelectOptimizer { .. }
| Property::WhereTrueFalseNull { .. }
| Property::UNIONAllPreservesCardinality { .. }
| Property::ReadYourUpdatesBack { .. }
| Property::TableHasExpectedContent { .. } => {}
}
}
// Check again after query clear if the interactions still uses the failing table
has_table = interactions
.uses()
.iter()
.any(|t| depending_tables.contains(t));
}
let is_fault = matches!(interactions.interactions, InteractionsType::Fault(..));
is_fault
|| (has_table
&& !matches!(
interactions.interactions,
InteractionsType::Query(Query::Select(_))
| InteractionsType::Property(Property::SelectLimit { .. })
| InteractionsType::Property(
Property::SelectSelectOptimizer { .. }
)
))
};
idx += 1;
retain
});
// Comprise of idxs of Begin interactions
let mut begin_idx = Vec::new();
// Comprise of idxs of the intereactions Commit and Rollback
let mut end_tx_idx = Vec::new();
for (idx, interactions) in plan.plan.iter().enumerate() {
match &interactions.interactions {
InteractionsType::Query(Query::Begin(..)) => {
begin_idx.push(idx);
}
InteractionsType::Query(Query::Commit(..))
| InteractionsType::Query(Query::Rollback(..)) => {
let last_begin = begin_idx.last().unwrap() + 1;
if last_begin == idx {
end_tx_idx.push(idx);
}
}
_ => {}
}
}
// remove interactions if its just a Begin Commit/Rollback with no queries in the middle
let mut range_transactions = end_tx_idx.into_iter().peekable();
let mut idx = 0;
plan.plan.retain_mut(|_| {
let mut retain = true;
if let Some(txn_interaction_idx) = range_transactions.peek().copied() {
if txn_interaction_idx == idx {
range_transactions.next();
}
if txn_interaction_idx == idx || txn_interaction_idx.saturating_sub(1) == idx {
retain = false;
}
}
idx += 1;
retain
});
plan.remove_properties(&depending_tables, secondary_interactions_index);
}
let after = plan.len();
@@ -183,6 +90,166 @@ impl InteractionPlan {
plan
}
/// Remove all properties that do not use the failing tables
fn remove_properties(
&mut self,
depending_tables: &IndexSet<String>,
failing_interaction_index: usize,
) {
let mut idx = 0;
// Remove all properties that do not use the failing tables
self.retain_mut(|interactions| {
let retain = if idx == failing_interaction_index {
if let InteractionsType::Property(
Property::FsyncNoWait { tables, .. } | Property::FaultyQuery { tables, .. },
) = &mut interactions.interactions
{
tables.retain(|table| depending_tables.contains(table));
}
true
} else {
let mut has_table = interactions
.uses()
.iter()
.any(|t| depending_tables.contains(t));
if has_table {
// will contain extensional queries that reference the depending tables
let mut extensional_queries = Vec::new();
// Remove the extensional parts of the properties
if let InteractionsType::Property(p) = &mut interactions.interactions {
match p {
Property::InsertValuesSelect { queries, .. }
| Property::DoubleCreateFailure { queries, .. }
| Property::DeleteSelect { queries, .. }
| Property::DropSelect { queries, .. }
| Property::Queries { queries } => {
extensional_queries.append(queries);
}
Property::FsyncNoWait { tables, query }
| Property::FaultyQuery { tables, query } => {
if !query.uses().iter().any(|t| depending_tables.contains(t)) {
tables.clear();
} else {
tables.retain(|table| depending_tables.contains(table));
}
}
Property::SelectLimit { .. }
| Property::SelectSelectOptimizer { .. }
| Property::WhereTrueFalseNull { .. }
| Property::UNIONAllPreservesCardinality { .. }
| Property::ReadYourUpdatesBack { .. }
| Property::TableHasExpectedContent { .. } => {}
}
}
// Check again after query clear if the interactions still uses the failing table
has_table = interactions
.uses()
.iter()
.any(|t| depending_tables.contains(t));
// means the queries in the original property are present in the depending tables regardless of the extensional queries
if has_table {
if let Some(queries) = interactions.get_extensional_queries() {
retain_relevant_queries(&mut extensional_queries, depending_tables);
queries.append(&mut extensional_queries);
}
} else {
// original property without extensional queries does not reference the tables so convert the property to
// `Property::Queries` if `extensional_queries` is not empty
retain_relevant_queries(&mut extensional_queries, depending_tables);
if !extensional_queries.is_empty() {
has_table = true;
*interactions = Interactions::new(
interactions.connection_index,
InteractionsType::Property(Property::Queries {
queries: extensional_queries,
}),
);
}
}
}
let is_fault = matches!(interactions.interactions, InteractionsType::Fault(..));
let is_transaction = matches!(
interactions.interactions,
InteractionsType::Query(Query::Begin(..))
| InteractionsType::Query(Query::Commit(..))
| InteractionsType::Query(Query::Rollback(..))
);
is_fault
|| is_transaction
|| (has_table
&& !matches!(
interactions.interactions,
InteractionsType::Query(Query::Select(_))
| InteractionsType::Property(Property::SelectLimit { .. })
| InteractionsType::Property(
Property::SelectSelectOptimizer { .. }
)
))
};
idx += 1;
retain
});
// Comprises of idxs of Begin interactions
let mut begin_idx: HashMap<usize, Vec<usize>> = HashMap::new();
// Comprises of idxs of Commit and Rollback intereactions
let mut end_tx_idx: HashMap<usize, Vec<usize>> = HashMap::new();
for (idx, interactions) in self.iter().enumerate() {
match &interactions.interactions {
InteractionsType::Query(Query::Begin(..)) => {
begin_idx
.entry(interactions.connection_index)
.or_insert_with(|| vec![idx]);
}
InteractionsType::Query(Query::Commit(..))
| InteractionsType::Query(Query::Rollback(..)) => {
let last_begin = begin_idx
.get(&interactions.connection_index)
.and_then(|list| list.last())
.unwrap()
+ 1;
if last_begin == idx {
end_tx_idx
.entry(interactions.connection_index)
.or_insert_with(|| vec![idx]);
}
}
_ => {}
}
}
// remove interactions if its just a Begin Commit/Rollback with no queries in the middle
let mut range_transactions = end_tx_idx
.into_iter()
.map(|(conn_index, list)| (conn_index, list.into_iter().peekable()))
.collect::<HashMap<_, _>>();
let mut idx = 0;
self.retain_mut(|interactions| {
let mut retain = true;
let iter = range_transactions.get_mut(&interactions.connection_index);
if let Some(iter) = iter {
if let Some(txn_interaction_idx) = iter.peek().copied() {
if txn_interaction_idx == idx {
iter.next();
}
if txn_interaction_idx == idx || txn_interaction_idx.saturating_sub(1) == idx {
retain = false;
}
}
}
idx += 1;
retain
});
}
/// Create a smaller interaction plan by deleting a property
pub(crate) fn brute_shrink_interaction_plan(
&self,
@@ -235,16 +302,17 @@ impl InteractionPlan {
let before = self.len();
plan.plan.truncate(secondary_interactions_index + 1);
plan.truncate(secondary_interactions_index + 1);
// phase 1: shrink extensions
for interaction in &mut plan.plan {
for interaction in &mut plan {
if let InteractionsType::Property(property) = &mut interaction.interactions {
match property {
Property::InsertValuesSelect { queries, .. }
| Property::DoubleCreateFailure { queries, .. }
| Property::DeleteSelect { queries, .. }
| Property::DropSelect { queries, .. } => {
| Property::DropSelect { queries, .. }
| Property::Queries { queries } => {
let mut temp_plan = InteractionPlan::new_with(
queries
.iter()
@@ -321,7 +389,7 @@ impl InteractionPlan {
}
let mut test_plan = plan.clone();
test_plan.plan.remove(i);
test_plan.remove(i);
if Self::test_shrunk_plan(&test_plan, failing_execution, old_result, env.clone()) {
plan = test_plan;