make transactions truly concurrent with mvcc

This commit is contained in:
pedrocarlo
2025-09-23 16:17:14 -03:00
parent d070c1c184
commit f2d29ffaaf
6 changed files with 63 additions and 24 deletions

7
Cargo.lock generated
View File

@@ -321,6 +321,12 @@ dependencies = [
"serde",
]
[[package]]
name = "bitmaps"
version = "3.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d084b0137aaa901caf9f1e8b21daa6aa24d41cd806e111335541eff9683bd6"
[[package]]
name = "blake3"
version = "1.7.0"
@@ -2332,6 +2338,7 @@ name = "limbo_sim"
version = "0.2.0-pre.10"
dependencies = [
"anyhow",
"bitmaps",
"chrono",
"clap",
"dirs 6.0.0",

View File

@@ -100,6 +100,7 @@ regex = "1.11.1"
regex-syntax = { version = "0.8.5", default-features = false }
similar = { version = "2.7.0" }
similar-asserts = { version = "1.7.0" }
bitmaps = { version = "3.2.1", default-features = false }
[profile.dev.package.similar]
opt-level = 3

View File

@@ -47,3 +47,4 @@ indexmap = { workspace = true }
either = "1.15.0"
similar = { workspace = true }
similar-asserts = { workspace = true }
bitmaps = { workspace = true }

View File

@@ -57,25 +57,6 @@ impl InteractionPlan {
&self.plan
}
// TODO: this is just simplified logic so we can get something rolling with begin concurrent
// transactions in the simulator. Ideally when we generate the plan we will have begin and commits statements across interactions
pub fn push(&mut self, interactions: Interactions) {
if self.mvcc {
let conn_index = interactions.connection_index;
let begin = Interactions::new(
conn_index,
InteractionsType::Query(Query::Begin(Begin::Concurrent)),
);
let commit =
Interactions::new(conn_index, InteractionsType::Query(Query::Commit(Commit)));
self.plan.push(begin);
self.plan.push(interactions);
self.plan.push(commit);
} else {
self.plan.push(interactions);
}
}
/// Compute via diff computes a a plan from a given `.plan` file without the need to parse
/// sql. This is possible because there are two versions of the plan file, one that is human
/// readable and one that is serialized as JSON. Under watch mode, the users will be able to
@@ -234,16 +215,32 @@ impl InteractionPlan {
) -> Option<Vec<Interaction>> {
let num_interactions = env.opts.max_interactions as usize;
if self.len() < num_interactions {
tracing::debug!("Generating interaction {}/{}", self.len(), num_interactions);
let interactions = {
let conn_index = env.choose_conn(rng);
let conn_index = env.choose_conn(rng);
dbg!(self.mvcc, env.conn_in_transaction(conn_index));
let interactions = if self.mvcc && !env.conn_in_transaction(conn_index) {
let query = Query::Begin(Begin::Concurrent);
env.update_conn_last_interaction(conn_index, Some(&query));
Interactions::new(conn_index, InteractionsType::Query(query))
} else if self.mvcc
&& env.conn_in_transaction(conn_index)
&& env.has_conn_executed_query_after_transaction(conn_index)
&& rng.random_bool(0.4)
{
let query = Query::Commit(Commit);
env.update_conn_last_interaction(conn_index, Some(&query));
Interactions::new(conn_index, InteractionsType::Query(query))
} else {
env.update_conn_last_interaction(conn_index, None);
let conn_ctx = &env.connection_context(conn_index);
Interactions::arbitrary_from(rng, conn_ctx, (env, self.stats(), conn_index))
};
tracing::debug!("Generating interaction {}/{}", self.len(), num_interactions);
let out_interactions = interactions.interactions();
assert!(!out_interactions.is_empty());
self.push(interactions);
self.plan.push(interactions);
Some(out_interactions)
} else {
None

View File

@@ -28,7 +28,7 @@ pub struct Profile {
#[garde(skip)]
/// Experimental MVCC feature
pub experimental_mvcc: bool,
#[garde(range(min = 1))]
#[garde(range(min = 1, max = 64))]
pub max_connections: usize,
#[garde(dive)]
pub io: IOProfile,

View File

@@ -5,6 +5,7 @@ use std::panic::UnwindSafe;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use bitmaps::Bitmap;
use garde::Validate;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;
@@ -12,6 +13,7 @@ use sql_generation::generation::GenerationContext;
use sql_generation::model::table::Table;
use turso_core::Database;
use crate::model::Query;
use crate::profiles::Profile;
use crate::runner::SimIO;
use crate::runner::io::SimulatorIO;
@@ -151,6 +153,12 @@ pub(crate) struct SimulatorEnv {
/// If connection state is None, means we are not in a transaction
pub connection_tables: Vec<Option<Vec<Table>>>,
/// Bit map indicating whether a connection has executed a query that is not transaction related
///
/// E.g Select, Insert, Create
/// and not Begin, Commit, Rollback \
/// Has max size of 64 to accomodate 64 connections
connection_last_query: Bitmap<64>,
// Table data that is committed into the database or wal
pub committed_tables: Vec<Table>,
}
@@ -175,6 +183,7 @@ impl SimulatorEnv {
.collect(),
// TODO: not sure if connection_tables should be recreated instead
connection_tables: self.connection_tables.clone(),
connection_last_query: self.connection_last_query,
committed_tables: self.committed_tables.clone(),
}
}
@@ -393,6 +402,7 @@ impl SimulatorEnv {
profile: profile.clone(),
committed_tables: Vec::new(),
connection_tables: vec![None; profile.max_connections],
connection_last_query: Bitmap::new(),
}
}
@@ -435,6 +445,7 @@ impl SimulatorEnv {
t.clear();
}
});
self.connection_last_query = Bitmap::new();
}
// TODO: does not yet create the appropriate context to avoid WriteWriteConflitcs
@@ -462,6 +473,28 @@ impl SimulatorEnv {
}
}
pub fn conn_in_transaction(&self, conn_index: usize) -> bool {
self.connection_tables
.get(conn_index)
.is_some_and(|t| t.is_some())
}
pub fn has_conn_executed_query_after_transaction(&self, conn_index: usize) -> bool {
self.connection_last_query.get(conn_index)
}
pub fn update_conn_last_interaction(&mut self, conn_index: usize, query: Option<&Query>) {
// If the conn will execute a transaction statement then we set the bitmap to false
// to indicate we have not executed any queries yet after the transaction begun
let value = query.is_some_and(|query| {
matches!(
query,
Query::Begin(..) | Query::Commit(..) | Query::Rollback(..)
)
});
self.connection_last_query.set(conn_index, value);
}
pub fn get_conn_tables<'a>(&'a self, conn_index: usize) -> ShadowTables<'a> {
ShadowTables {
transaction_tables: self.connection_tables.get(conn_index).unwrap().as_ref(),