resolve conflict

TcMits
2025-09-02 18:46:41 +07:00
18 changed files with 640 additions and 83 deletions

Cargo.lock (generated) — 29 changed lines

@@ -1800,9 +1800,9 @@ dependencies = [
[[package]]
name = "indexmap"
version = "2.8.0"
version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3954d50fe15b02142bf25d3b8bdadb634ec3948f103d04ffe3031bc8fe9d7058"
checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661"
dependencies = [
"equivalent",
"hashbrown 0.15.2",
@@ -1822,7 +1822,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88"
dependencies = [
"ahash",
"indexmap 2.8.0",
"indexmap 2.10.0",
"is-terminal",
"itoa",
"log",
@@ -2192,10 +2192,12 @@ dependencies = [
"env_logger 0.10.2",
"garde",
"hex",
"indexmap 2.10.0",
"itertools 0.14.0",
"json5",
"log",
"notify",
"parking_lot",
"rand 0.9.2",
"rand_chacha 0.9.0",
"regex",
@@ -2265,9 +2267,9 @@ checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856"
[[package]]
name = "lock_api"
version = "0.4.12"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765"
dependencies = [
"autocfg",
"scopeguard",
@@ -2670,9 +2672,9 @@ dependencies = [
[[package]]
name = "parking_lot"
version = "0.12.3"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13"
dependencies = [
"lock_api",
"parking_lot_core",
@@ -2680,9 +2682,9 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.10"
version = "0.9.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5"
dependencies = [
"cfg-if",
"libc",
@@ -2772,7 +2774,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac26e981c03a6e53e0aee43c113e3202f5581d5360dae7bd2c70e800dd0451d"
dependencies = [
"base64",
"indexmap 2.8.0",
"indexmap 2.10.0",
"quick-xml 0.32.0",
"serde",
"time",
@@ -4062,7 +4064,7 @@ version = "0.8.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05ae329d1f08c4d17a59bed7ff5b5a769d062e64a62d34a3261b219e62cd5aae"
dependencies = [
"indexmap 2.8.0",
"indexmap 2.10.0",
"serde",
"serde_spanned",
"toml_datetime",
@@ -4084,7 +4086,7 @@ version = "0.22.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "310068873db2c5b3e7659d2cc35d21855dbafa50d1ce336397c666e3cb08137e"
dependencies = [
"indexmap 2.8.0",
"indexmap 2.10.0",
"serde",
"serde_spanned",
"toml_datetime",
@@ -4346,7 +4348,6 @@ dependencies = [
"strum_macros",
"thiserror 1.0.69",
"turso_macros",
"turso_sqlite3_parser",
]
[[package]]
@@ -4370,7 +4371,7 @@ dependencies = [
"cc",
"env_logger 0.11.7",
"fallible-iterator",
"indexmap 2.8.0",
"indexmap 2.10.0",
"log",
"memchr",
"miette",


@@ -69,6 +69,7 @@ rand = "0.9.2"
tracing = "0.1.41"
schemars = "1.0.4"
garde = "0.22"
parking_lot = "0.12.4"
[profile.release]
debug = "line-tables-only"


@@ -61,7 +61,7 @@ libm = "0.2"
turso_macros = { workspace = true }
miette = "7.6.0"
strum = { workspace = true }
parking_lot = "0.12.3"
parking_lot = { workspace = true }
crossbeam-skiplist = "0.1.3"
tracing = "0.1.41"
ryu = "1.0.19"


@@ -2487,8 +2487,15 @@ impl IOCompletions {
match self {
IOCompletions::Single(c) => io.wait_for_completion(c),
IOCompletions::Many(completions) => {
for c in completions {
io.wait_for_completion(c)?;
let mut completions = completions.into_iter();
while let Some(c) = completions.next() {
let res = io.wait_for_completion(c);
if res.is_err() {
for c in completions {
c.abort();
}
return res;
}
}
Ok(())
}
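
For context on this hunk: instead of returning on the first failed completion and leaving the rest dangling, the remaining completions are now explicitly aborted before the error is propagated. Below is a minimal standalone sketch of that pattern, using hypothetical stand-in types rather than the real turso_core Completion/IO API:

```rust
// Hypothetical stand-ins; only the control flow mirrors the hunk above.
struct Completion {
    id: usize,
}

impl Completion {
    fn abort(&self) {
        println!("aborted completion {}", self.id);
    }
}

fn wait_for_completion(c: &Completion) -> Result<(), String> {
    // Pretend completion 2 fails.
    if c.id == 2 { Err("injected fault".into()) } else { Ok(()) }
}

fn wait_for_all(completions: Vec<Completion>) -> Result<(), String> {
    let mut iter = completions.into_iter();
    while let Some(c) = iter.next() {
        if let Err(e) = wait_for_completion(&c) {
            // On the first failure, abort everything still pending
            // instead of waiting on it, then surface the error.
            for rest in iter {
                rest.abort();
            }
            return Err(e);
        }
    }
    Ok(())
}

fn main() {
    let completions: Vec<Completion> = (0..4).map(|id| Completion { id }).collect();
    assert!(wait_for_all(completions).is_err());
}
```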


@@ -26,7 +26,6 @@ turso_macros = { workspace = true }
[dev-dependencies]
fallible-iterator = "0.3"
criterion = { version = "0.5", features = ["html_reports" ] }
turso_sqlite3_parser = { workspace = true }
[target.'cfg(not(target_family = "windows"))'.dev-dependencies]
pprof = { version = "0.14.0", features = ["criterion", "flamegraph"] }


@@ -1,11 +1,6 @@
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use fallible_iterator::FallibleIterator;
use pprof::criterion::{Output, PProfProfiler};
use turso_parser::{lexer::Lexer, parser::Parser};
use turso_sqlite3_parser::lexer::{
sql::{Parser as OldParser, Tokenizer},
Scanner,
};
fn bench_parser(criterion: &mut Criterion) {
let queries = [
@@ -22,12 +17,6 @@ fn bench_parser(criterion: &mut Criterion) {
b.iter(|| Parser::new(black_box(qb)).next().unwrap());
});
group.bench_function(BenchmarkId::new("limbo_old_parser_query", ""), |b| {
b.iter(|| {
OldParser::new(black_box(qb)).next().unwrap().unwrap();
});
});
group.finish();
}
}
@@ -49,12 +38,6 @@ fn bench_parser_insert_batch(criterion: &mut Criterion) {
b.iter(|| Parser::new(black_box(qb)).next().unwrap());
});
group.bench_function(BenchmarkId::new("limbo_old_parser_insert_batch", ""), |b| {
b.iter(|| {
OldParser::new(black_box(qb)).next().unwrap().unwrap();
});
});
group.finish();
}
}
@@ -78,18 +61,6 @@ fn bench_lexer(criterion: &mut Criterion) {
});
});
group.bench_function(BenchmarkId::new("limbo_old_lexer_query", ""), |b| {
b.iter(|| {
let tokenizer = Tokenizer::new();
let mut scanner = Scanner::new(black_box(tokenizer));
loop {
if let (_, None, _) = scanner.scan(black_box(qb)).unwrap() {
break;
}
}
});
});
group.finish();
}
}


@@ -2,7 +2,6 @@
cargo publish -p turso_macros
cargo publish -p turso_ext
cargo publish -p turso_sqlite3_parser
cargo publish -p turso_parser
cargo publish -p turso_core
cargo publish -p turso


@@ -42,3 +42,5 @@ schemars = { workspace = true }
garde = { workspace = true, features = ["derive", "serde"] }
json5 = { version = "0.4.1" }
strum = { workspace = true }
parking_lot = { workspace = true }
indexmap = "2.10.0"


@@ -21,10 +21,7 @@ use crate::{
SimulatorEnv,
generation::Shadow,
model::Query,
runner::{
env::{SimConnection, SimulationType, SimulatorTables},
io::SimulatorIO,
},
runner::env::{SimConnection, SimulationType, SimulatorTables},
};
use super::property::{Property, remaining};
@@ -452,7 +449,7 @@ impl Shadow for Interaction {
}
}
impl Interaction {
pub(crate) fn execute_query(&self, conn: &mut Arc<Connection>, _io: &SimulatorIO) -> ResultSet {
pub(crate) fn execute_query(&self, conn: &mut Arc<Connection>) -> ResultSet {
if let Self::Query(query) = self {
let query_str = query.to_string();
let rows = conn.query(&query_str);
@@ -611,13 +608,7 @@ impl Interaction {
out.push(r);
}
StepResult::IO => {
let syncing = {
let files = env.io.files.borrow();
// TODO: currently assuming we only have 1 file that is syncing
files
.iter()
.any(|file| file.sync_completion.borrow().is_some())
};
let syncing = env.io.syncing();
if syncing {
reopen_database(env);
} else {
@@ -666,12 +657,7 @@ impl Interaction {
let mut current_prob = 0.05;
let mut incr = 0.001;
loop {
let syncing = {
let files = env.io.files.borrow();
files
.iter()
.any(|file| file.sync_completion.borrow().is_some())
};
let syncing = env.io.syncing();
let inject_fault = env.rng.random_bool(current_prob);
// TODO: avoid for now injecting faults when syncing
if inject_fault && !syncing {
@@ -722,7 +708,7 @@ fn reopen_database(env: &mut SimulatorEnv) {
// Clear all open files
// TODO: for correct reporting of faults we should get all the recorded numbers and transfer to the new file
env.io.files.borrow_mut().clear();
env.io.close_files();
// 2. Re-open database
match env.type_ {


@@ -130,6 +130,12 @@ pub struct SimulatorCLI {
default_value_t = false
)]
pub keep_files: bool,
#[clap(
long,
help = "Use memory IO for complex simulations",
default_value_t = false
)]
pub memory_io: bool,
#[clap(long, default_value_t = ProfileType::Default)]
/// Profile selector for Simulation run
pub profile: ProfileType,


@@ -12,7 +12,9 @@ use sql_generation::model::table::Table;
use turso_core::Database;
use crate::profiles::Profile;
use crate::runner::SimIO;
use crate::runner::io::SimulatorIO;
use crate::runner::memory::io::MemorySimIO;
use super::cli::SimulatorCLI;
@@ -63,13 +65,14 @@ pub(crate) struct SimulatorEnv {
pub(crate) opts: SimulatorOpts,
pub profile: Profile,
pub(crate) connections: Vec<SimConnection>,
pub(crate) io: Arc<SimulatorIO>,
pub(crate) io: Arc<dyn SimIO>,
pub(crate) db: Option<Arc<Database>>,
pub(crate) rng: ChaCha8Rng,
pub(crate) paths: Paths,
pub(crate) type_: SimulationType,
pub(crate) phase: SimulationPhase,
pub(crate) tables: SimulatorTables,
pub memory_io: bool,
}
impl UnwindSafe for SimulatorEnv {}
@@ -88,6 +91,7 @@ impl SimulatorEnv {
paths: self.paths.clone(),
type_: self.type_,
phase: self.phase,
memory_io: self.memory_io,
profile: self.profile.clone(),
}
}
@@ -99,16 +103,26 @@ impl SimulatorEnv {
let latency_prof = &self.profile.io.latency;
let io = Arc::new(
SimulatorIO::new(
let io: Arc<dyn SimIO> = if self.memory_io {
Arc::new(MemorySimIO::new(
self.opts.seed,
self.opts.page_size,
latency_prof.latency_probability,
latency_prof.min_tick,
latency_prof.max_tick,
))
} else {
Arc::new(
SimulatorIO::new(
self.opts.seed,
self.opts.page_size,
latency_prof.latency_probability,
latency_prof.min_tick,
latency_prof.max_tick,
)
.unwrap(),
)
.unwrap(),
);
};
// Remove existing database file
let db_path = self.get_db_path();
@@ -282,16 +296,26 @@ impl SimulatorEnv {
let latency_prof = &profile.io.latency;
let io = Arc::new(
SimulatorIO::new(
let io: Arc<dyn SimIO> = if cli_opts.memory_io {
Arc::new(MemorySimIO::new(
seed,
opts.page_size,
latency_prof.latency_probability,
latency_prof.min_tick,
latency_prof.max_tick,
))
} else {
Arc::new(
SimulatorIO::new(
seed,
opts.page_size,
latency_prof.latency_probability,
latency_prof.min_tick,
latency_prof.max_tick,
)
.unwrap(),
)
.unwrap(),
);
};
let db = match Database::open_file(
io.clone(),
@@ -319,6 +343,7 @@ impl SimulatorEnv {
db: Some(db),
type_: simulation_type,
phase: SimulationPhase::Test,
memory_io: cli_opts.memory_io,
profile: profile.clone(),
}
}
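
The env construction above swaps the concrete SimulatorIO field for a trait object, so the IO backend is chosen once from the --memory-io flag and the rest of the simulator only sees Arc<dyn SimIO>. A minimal sketch of that dispatch shape, using hypothetical stand-in types (DiskIO/MemIO) rather than the real simulator types:

```rust
use std::sync::Arc;

// Hypothetical, trimmed-down stand-in for the SimIO trait in this commit.
trait SimIO {
    fn print_stats(&self);
}

struct DiskIO;
struct MemIO;

impl SimIO for DiskIO {
    fn print_stats(&self) {
        println!("disk-backed simulator IO");
    }
}

impl SimIO for MemIO {
    fn print_stats(&self) {
        println!("in-memory simulator IO");
    }
}

// Same shape as the env construction above: pick the implementation once,
// then everything downstream is backend-agnostic.
fn make_io(memory_io: bool) -> Arc<dyn SimIO> {
    if memory_io {
        Arc::new(MemIO)
    } else {
        Arc::new(DiskIO)
    }
}

fn main() {
    make_io(true).print_stats();
    make_io(false).print_stats();
}
```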


@@ -191,7 +191,7 @@ pub(crate) fn execute_interaction(
SimConnection::Disconnected => unreachable!(),
};
tracing::debug!(?interaction);
let results = interaction.execute_query(conn, &env.io);
let results = interaction.execute_query(conn);
if results.is_err() {
tracing::error!(?results);
}


@@ -7,7 +7,7 @@ use rand::{RngCore, SeedableRng};
use rand_chacha::ChaCha8Rng;
use turso_core::{Clock, IO, Instant, OpenFlags, PlatformIO, Result};
use crate::runner::{clock::SimulatorClock, file::SimulatorFile};
use crate::runner::{SimIO, clock::SimulatorClock, file::SimulatorFile};
pub(crate) struct SimulatorIO {
pub(crate) inner: Box<dyn IO>,
@@ -48,15 +48,17 @@ impl SimulatorIO {
clock: Arc::new(clock),
})
}
}
pub(crate) fn inject_fault(&self, fault: bool) {
impl SimIO for SimulatorIO {
fn inject_fault(&self, fault: bool) {
self.fault.replace(fault);
for file in self.files.borrow().iter() {
file.inject_fault(fault);
}
}
pub(crate) fn print_stats(&self) {
fn print_stats(&self) {
for file in self.files.borrow().iter() {
tracing::info!(
"\n===========================\n\nPath: {}\n{}",
@@ -65,6 +67,18 @@ impl SimulatorIO {
);
}
}
fn syncing(&self) -> bool {
let files = self.files.borrow();
// TODO: currently assuming we only have 1 file that is syncing
files
.iter()
.any(|file| file.sync_completion.borrow().is_some())
}
fn close_files(&self) {
self.files.borrow_mut().clear()
}
}
impl Clock for SimulatorIO {


@@ -0,0 +1,253 @@
use std::{
cell::{Cell, RefCell},
sync::Arc,
};
use rand::{Rng as _, SeedableRng};
use rand_chacha::ChaCha8Rng;
use tracing::{Level, instrument};
use turso_core::{Completion, File, Result};
use crate::runner::{
clock::SimulatorClock,
memory::io::{CallbackQueue, Fd, Operation, OperationType},
};
/// Tracks IO calls and faults for each type of I/O operation
#[derive(Debug, Default)]
struct IOTracker {
pread_calls: usize,
pread_faults: usize,
pwrite_calls: usize,
pwrite_faults: usize,
pwritev_calls: usize,
pwritev_faults: usize,
sync_calls: usize,
sync_faults: usize,
truncate_calls: usize,
truncate_faults: usize,
}
impl IOTracker {
fn total_calls(&self) -> usize {
self.pread_calls
+ self.pwrite_calls
+ self.pwritev_calls
+ self.sync_calls
+ self.truncate_calls
}
}
pub struct MemorySimFile {
// TODO: maybe have a pending queue that is fast to append to,
// and then mem-swap the pending queue with the callback queue to minimize lock contention on the callback queue
pub callbacks: CallbackQueue,
pub fd: Arc<Fd>,
pub buffer: RefCell<Vec<u8>>,
// TODO: add fault map later here
pub closed: Cell<bool>,
io_tracker: RefCell<IOTracker>,
pub rng: RefCell<ChaCha8Rng>,
pub latency_probability: usize,
clock: Arc<SimulatorClock>,
fault: Cell<bool>,
}
unsafe impl Send for MemorySimFile {}
unsafe impl Sync for MemorySimFile {}
impl MemorySimFile {
pub fn new(
callbacks: CallbackQueue,
fd: Fd,
seed: u64,
latency_probability: usize,
clock: Arc<SimulatorClock>,
) -> Self {
Self {
callbacks,
fd: Arc::new(fd),
buffer: RefCell::new(Vec::new()),
closed: Cell::new(false),
io_tracker: RefCell::new(IOTracker::default()),
rng: RefCell::new(ChaCha8Rng::seed_from_u64(seed)),
latency_probability,
clock,
fault: Cell::new(false),
}
}
pub fn inject_fault(&self, fault: bool) {
self.fault.set(fault);
}
pub fn stats_table(&self) -> String {
let io_tracker = self.io_tracker.borrow();
let sum_calls = io_tracker.total_calls();
let stats_table = [
"op calls faults ".to_string(),
"--------- -------- --------".to_string(),
format!(
"pread {:8} {:8}",
io_tracker.pread_calls, io_tracker.pread_faults
),
format!(
"pwrite {:8} {:8}",
io_tracker.pwrite_calls, io_tracker.pwrite_faults
),
format!(
"pwritev {:8} {:8}",
io_tracker.pwritev_calls, io_tracker.pwritev_faults
),
format!(
"sync {:8} {:8}",
io_tracker.sync_calls, io_tracker.sync_faults
),
format!(
"truncate {:8} {:8}",
io_tracker.truncate_calls, io_tracker.truncate_faults
),
"--------- -------- --------".to_string(),
format!("total {sum_calls:8}"),
];
stats_table.join("\n")
}
#[instrument(skip_all, level = Level::TRACE)]
fn generate_latency(&self) -> Option<turso_core::Instant> {
let mut rng = self.rng.borrow_mut();
// Chance to introduce some latency
rng.random_bool(self.latency_probability as f64 / 100.0)
.then(|| {
let now = self.clock.now();
let sum = now + std::time::Duration::from_millis(rng.random_range(5..20));
sum.into()
})
}
fn insert_op(&self, op: OperationType) {
// FIXME: currently avoiding any fsync faults until we correctly define the expected behaviour in the simulator
let fault = self.fault.get() && !matches!(op, OperationType::Sync { .. });
if fault {
let mut io_tracker = self.io_tracker.borrow_mut();
match &op {
OperationType::Read { .. } => io_tracker.pread_faults += 1,
OperationType::Write { .. } => io_tracker.pwrite_faults += 1,
OperationType::WriteV { .. } => io_tracker.pwritev_faults += 1,
OperationType::Sync { .. } => io_tracker.sync_faults += 1,
OperationType::Truncate { .. } => io_tracker.truncate_faults += 1,
}
}
self.callbacks.lock().push(Operation {
time: self.generate_latency(),
op,
fault,
fd: self.fd.clone(),
});
}
pub fn write_buf(&self, buf: &[u8], offset: usize) -> usize {
let mut file_buf = self.buffer.borrow_mut();
let more_space = if file_buf.len() < offset {
(offset + buf.len()) - file_buf.len()
} else {
buf.len().saturating_sub(file_buf.len() - offset)
};
if more_space > 0 {
file_buf.reserve(more_space);
for _ in 0..more_space {
file_buf.push(0);
}
}
file_buf[offset..][0..buf.len()].copy_from_slice(buf);
buf.len()
}
}
impl File for MemorySimFile {
fn lock_file(&self, _exclusive: bool) -> Result<()> {
Ok(())
}
fn unlock_file(&self) -> Result<()> {
Ok(())
}
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
self.io_tracker.borrow_mut().pread_calls += 1;
let op = OperationType::Read {
completion: c.clone(),
offset: pos as usize,
};
self.insert_op(op);
Ok(c)
}
fn pwrite(
&self,
pos: u64,
buffer: Arc<turso_core::Buffer>,
c: Completion,
) -> Result<Completion> {
self.io_tracker.borrow_mut().pwrite_calls += 1;
let op = OperationType::Write {
buffer,
completion: c.clone(),
offset: pos as usize,
};
self.insert_op(op);
Ok(c)
}
fn pwritev(
&self,
pos: u64,
buffers: Vec<Arc<turso_core::Buffer>>,
c: Completion,
) -> Result<Completion> {
if buffers.len() == 1 {
return self.pwrite(pos, buffers[0].clone(), c);
}
self.io_tracker.borrow_mut().pwritev_calls += 1;
let op = OperationType::WriteV {
buffers,
completion: c.clone(),
offset: pos as usize,
};
self.insert_op(op);
Ok(c)
}
fn sync(&self, c: Completion) -> Result<Completion> {
self.io_tracker.borrow_mut().sync_calls += 1;
let op = OperationType::Sync {
completion: c.clone(),
};
self.insert_op(op);
Ok(c)
}
fn size(&self) -> Result<u64> {
// TODO: the size operation should also be scheduled, but this requires a change in how we
// use this function internally in Turso
Ok(self.buffer.borrow().len() as u64)
}
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
self.io_tracker.borrow_mut().truncate_calls += 1;
let op = OperationType::Truncate {
completion: c.clone(),
len: len as usize,
};
self.insert_op(op);
Ok(c)
}
}
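
The write_buf method above grows the backing buffer with zeroes so that a write at `offset` always fits, then copies the slice in. A small standalone sketch of the equivalent growth-and-copy logic, using a plain Vec<u8> instead of the RefCell held by MemorySimFile and Vec::resize instead of the reserve/push loop:

```rust
// Standalone, simplified version of the growth-and-copy logic.
fn write_buf(file_buf: &mut Vec<u8>, buf: &[u8], offset: usize) -> usize {
    if file_buf.len() < offset + buf.len() {
        // Zero-fill up to the end of the write so the slice below is in range.
        file_buf.resize(offset + buf.len(), 0);
    }
    file_buf[offset..offset + buf.len()].copy_from_slice(buf);
    buf.len()
}

fn main() {
    let mut data = vec![1, 2, 3];
    // Write past the current end: bytes 3..5 are zero-filled first.
    assert_eq!(write_buf(&mut data, &[9, 9], 5), 2);
    assert_eq!(data, vec![1, 2, 3, 0, 0, 9, 9]);
}
```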


@@ -0,0 +1,270 @@
use std::cell::{Cell, RefCell};
use std::sync::Arc;
use indexmap::IndexMap;
use parking_lot::Mutex;
use rand::{RngCore, SeedableRng};
use rand_chacha::ChaCha8Rng;
use turso_core::{Clock, Completion, IO, Instant, OpenFlags, Result};
use crate::runner::SimIO;
use crate::runner::clock::SimulatorClock;
use crate::runner::memory::file::MemorySimFile;
/// File descriptor
pub type Fd = String;
pub enum OperationType {
Read {
completion: Completion,
offset: usize,
},
Write {
buffer: Arc<turso_core::Buffer>,
completion: Completion,
offset: usize,
},
WriteV {
buffers: Vec<Arc<turso_core::Buffer>>,
completion: Completion,
offset: usize,
},
Sync {
completion: Completion,
},
Truncate {
completion: Completion,
len: usize,
},
}
impl OperationType {
fn get_completion(&self) -> &Completion {
match self {
OperationType::Read { completion, .. }
| OperationType::Write { completion, .. }
| OperationType::WriteV { completion, .. }
| OperationType::Sync { completion, .. }
| OperationType::Truncate { completion, .. } => completion,
}
}
}
pub struct Operation {
pub time: Option<turso_core::Instant>,
pub op: OperationType,
pub fault: bool,
pub fd: Arc<Fd>,
}
impl Operation {
fn do_operation(self, files: &IndexMap<Fd, Arc<MemorySimFile>>) {
let fd = self.fd;
match self.op {
OperationType::Read { completion, offset } => {
let file = files.get(fd.as_str()).unwrap();
let file_buf = file.buffer.borrow_mut();
let buffer = completion.as_read().buf.clone();
let buf_size = {
let buf = buffer.as_mut_slice();
// TODO: check for sector faults here
buf.copy_from_slice(&file_buf[offset..][0..buf.len()]);
buf.len() as i32
};
completion.complete(buf_size);
}
OperationType::Write {
buffer,
completion,
offset,
} => {
let file = files.get(fd.as_str()).unwrap();
let buf_size = file.write_buf(buffer.as_slice(), offset);
completion.complete(buf_size as i32);
}
OperationType::WriteV {
buffers,
completion,
offset,
} => {
if buffers.is_empty() {
return;
}
let file = files.get(fd.as_str()).unwrap();
let mut pos = offset;
let written = buffers.into_iter().fold(0, |written, buffer| {
let buf_size = file.write_buf(buffer.as_slice(), pos);
pos += buf_size;
written + buf_size
});
completion.complete(written as i32);
}
OperationType::Sync { completion, .. } => {
// There is nothing to sync for an in-memory file
completion.complete(0);
}
OperationType::Truncate { completion, len } => {
let file = files.get(fd.as_str()).unwrap();
let mut file_buf = file.buffer.borrow_mut();
file_buf.truncate(len);
completion.complete(0);
}
}
}
}
pub type CallbackQueue = Arc<Mutex<Vec<Operation>>>;
pub struct MemorySimIO {
callbacks: CallbackQueue,
timeouts: CallbackQueue,
pub files: RefCell<IndexMap<Fd, Arc<MemorySimFile>>>,
pub rng: RefCell<ChaCha8Rng>,
pub nr_run_once_faults: Cell<usize>,
pub page_size: usize,
seed: u64,
latency_probability: usize,
clock: Arc<SimulatorClock>,
}
unsafe impl Send for MemorySimIO {}
unsafe impl Sync for MemorySimIO {}
impl MemorySimIO {
pub fn new(
seed: u64,
page_size: usize,
latency_probability: usize,
min_tick: u64,
max_tick: u64,
) -> Self {
let files = RefCell::new(IndexMap::new());
let rng = RefCell::new(ChaCha8Rng::seed_from_u64(seed));
let nr_run_once_faults = Cell::new(0);
Self {
callbacks: Arc::new(Mutex::new(Vec::new())),
timeouts: Arc::new(Mutex::new(Vec::new())),
files,
rng,
nr_run_once_faults,
page_size,
seed,
latency_probability,
clock: Arc::new(SimulatorClock::new(
ChaCha8Rng::seed_from_u64(seed),
min_tick,
max_tick,
)),
}
}
}
impl SimIO for MemorySimIO {
fn inject_fault(&self, fault: bool) {
for file in self.files.borrow().values() {
file.inject_fault(fault);
}
if fault {
tracing::debug!("fault injected");
}
}
fn print_stats(&self) {
for (path, file) in self.files.borrow().iter() {
tracing::info!(
"\n===========================\n\nPath: {}\n{}",
path,
file.stats_table()
);
}
}
fn syncing(&self) -> bool {
let callbacks = self.callbacks.try_lock().unwrap();
callbacks
.iter()
.any(|operation| matches!(operation.op, OperationType::Sync { .. }))
}
fn close_files(&self) {
for file in self.files.borrow().values() {
file.closed.set(true);
}
}
}
impl Clock for MemorySimIO {
fn now(&self) -> Instant {
self.clock.now().into()
}
}
impl IO for MemorySimIO {
fn open_file(
&self,
path: &str,
_flags: OpenFlags, // TODO: ignoring open flags for now as we don't test read only mode in the simulator yet
_direct: bool,
) -> Result<Arc<dyn turso_core::File>> {
let mut files = self.files.borrow_mut();
let fd = path.to_string();
let file = if let Some(file) = files.get(path) {
file.closed.set(false);
file.clone()
} else {
let file = Arc::new(MemorySimFile::new(
self.callbacks.clone(),
fd.clone(),
self.seed,
self.latency_probability,
self.clock.clone(),
));
files.insert(fd, file.clone());
file
};
Ok(file)
}
fn run_once(&self) -> Result<()> {
let mut callbacks = self.callbacks.lock();
let mut timeouts = self.timeouts.lock();
tracing::trace!(
callbacks.len = callbacks.len(),
timeouts.len = timeouts.len()
);
let files = self.files.borrow_mut();
let now = self.now();
callbacks.append(&mut timeouts);
while let Some(callback) = callbacks.pop() {
let completion = callback.op.get_completion();
if completion.finished() {
continue;
}
if callback.time.is_none() || callback.time.is_some_and(|time| time < now) {
if callback.fault {
// Inject the fault by aborting the completion
completion.abort();
continue;
}
callback.do_operation(&files);
} else {
timeouts.push(callback);
}
}
Ok(())
}
fn generate_random_number(&self) -> i64 {
self.rng.borrow_mut().next_u64() as i64
}
fn remove_file(&self, path: &str) -> Result<()> {
self.files.borrow_mut().shift_remove(path);
Ok(())
}
}
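
run_once above drains the callback queue, executes operations whose optional latency deadline has passed (or aborts them when a fault was recorded), and pushes not-yet-due operations onto the timeout queue for a later tick. A minimal sketch of just the drain-and-requeue time handling, with hypothetical miniature types (no faults or completions):

```rust
// Hypothetical miniature of the deferred-operation handling in run_once above.
struct Op {
    due: Option<u64>, // simulated time at which the op becomes runnable
    name: &'static str,
}

fn run_once(now: u64, callbacks: &mut Vec<Op>, timeouts: &mut Vec<Op>) {
    // Fold previously deferred ops back into the main queue first.
    callbacks.append(timeouts);
    while let Some(op) = callbacks.pop() {
        match op.due {
            // No latency attached, or the deadline has passed: run it.
            None => println!("run {}", op.name),
            Some(t) if t < now => println!("run {}", op.name),
            // Still in the future: defer it to the next run_once call.
            _ => timeouts.push(op),
        }
    }
}

fn main() {
    let mut callbacks = vec![
        Op { due: None, name: "pwrite" },
        Op { due: Some(50), name: "delayed pread" },
    ];
    let mut timeouts = Vec::new();
    run_once(10, &mut callbacks, &mut timeouts); // runs pwrite, defers the pread
    run_once(60, &mut callbacks, &mut timeouts); // now runs the delayed pread
}
```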


@@ -0,0 +1,2 @@
pub mod file;
pub mod io;


@@ -8,6 +8,17 @@ pub mod execution;
#[allow(dead_code)]
pub mod file;
pub mod io;
pub mod memory;
pub mod watch;
pub const FAULT_ERROR_MSG: &str = "Injected Fault";
pub trait SimIO: turso_core::IO {
fn inject_fault(&self, fault: bool);
fn print_stats(&self);
fn syncing(&self) -> bool;
fn close_files(&self);
}


@@ -56,6 +56,12 @@ impl InteractionPlan {
// Remove all properties that do not use the failing tables
plan.plan.retain_mut(|interactions| {
let retain = if idx == failing_execution.interaction_index {
if let Interactions::Property(
Property::FsyncNoWait { tables, .. } | Property::FaultyQuery { tables, .. },
) = interactions
{
tables.retain(|table| depending_tables.contains(table));
}
true
} else {
let mut has_table = interactions
@@ -73,9 +79,13 @@ impl InteractionPlan {
| Property::DropSelect { queries, .. } => {
queries.clear();
}
Property::FsyncNoWait { tables, .. }
| Property::FaultyQuery { tables, .. } => {
tables.retain(|table| depending_tables.contains(table));
Property::FsyncNoWait { tables, query }
| Property::FaultyQuery { tables, query } => {
if !query.uses().iter().any(|t| depending_tables.contains(t)) {
tables.clear();
} else {
tables.retain(|table| depending_tables.contains(table));
}
}
Property::SelectLimit { .. }
| Property::SelectSelectOptimizer { .. }