mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-25 19:04:26 +01:00
Merge 'FaultyQuery enabled by default' from Pedro Muniz
FaultyQuery was disabled. We were not simulating faults. Reviewed-by: Avinash Sajjanshetty (@avinassh) Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com> Closes #2594
This commit is contained in:
@@ -410,7 +410,7 @@ pub struct Pager {
|
||||
|
||||
commit_info: RefCell<CommitInfo>,
|
||||
checkpoint_state: RefCell<CheckpointState>,
|
||||
syncing: Rc<RefCell<bool>>,
|
||||
syncing: Rc<Cell<bool>>,
|
||||
auto_vacuum_mode: RefCell<AutoVacuumMode>,
|
||||
/// 0 -> Database is empty,
|
||||
/// 1 -> Database is being initialized,
|
||||
@@ -521,7 +521,7 @@ impl Pager {
|
||||
commit_info: RefCell::new(CommitInfo {
|
||||
state: CommitState::Start,
|
||||
}),
|
||||
syncing: Rc::new(RefCell::new(false)),
|
||||
syncing: Rc::new(Cell::new(false)),
|
||||
checkpoint_state: RefCell::new(CheckpointState::Checkpoint),
|
||||
buffer_pool,
|
||||
auto_vacuum_mode: RefCell::new(AutoVacuumMode::None),
|
||||
@@ -1234,6 +1234,7 @@ impl Pager {
|
||||
io_yield_one!(c);
|
||||
}
|
||||
CommitState::AfterSyncWal => {
|
||||
turso_assert!(!wal.borrow().is_syncing(), "wal should have synced");
|
||||
if wal_auto_checkpoint_disabled || !wal.borrow().should_checkpoint() {
|
||||
self.commit_info.borrow_mut().state = CommitState::Start;
|
||||
break PagerCommitResult::WalWritten;
|
||||
@@ -1250,7 +1251,7 @@ impl Pager {
|
||||
io_yield_one!(c);
|
||||
}
|
||||
CommitState::AfterSyncDbFile => {
|
||||
turso_assert!(!*self.syncing.borrow(), "should have finished syncing");
|
||||
turso_assert!(!self.syncing.get(), "should have finished syncing");
|
||||
self.commit_info.borrow_mut().state = CommitState::Start;
|
||||
break PagerCommitResult::Checkpointed(checkpoint_result);
|
||||
}
|
||||
@@ -1342,7 +1343,7 @@ impl Pager {
|
||||
io_yield_one!(c);
|
||||
}
|
||||
CheckpointState::CheckpointDone { res } => {
|
||||
turso_assert!(!*self.syncing.borrow(), "syncing should be done");
|
||||
turso_assert!(!self.syncing.get(), "syncing should be done");
|
||||
self.checkpoint_state.replace(CheckpointState::Checkpoint);
|
||||
return Ok(IOResult::Done(res));
|
||||
}
|
||||
@@ -1929,6 +1930,16 @@ impl Pager {
|
||||
state: CommitState::Start,
|
||||
});
|
||||
self.allocate_page_state.replace(AllocatePageState::Start);
|
||||
self.free_page_state.replace(FreePageState::Start);
|
||||
#[cfg(not(feature = "omit_autovacuum"))]
|
||||
{
|
||||
self.ptrmap_get_state.replace(PtrMapGetState::Start);
|
||||
self.ptrmap_put_state.replace(PtrMapPutState::Start);
|
||||
self.btree_create_vacuum_full_state
|
||||
.replace(BtreeCreateVacuumFullState::Start);
|
||||
}
|
||||
|
||||
self.header_ref_state.replace(HeaderRefState::Start);
|
||||
}
|
||||
|
||||
pub fn with_header<T>(&self, f: impl Fn(&DatabaseHeader) -> T) -> Result<IOResult<T>> {
|
||||
|
||||
@@ -63,7 +63,7 @@ use crate::storage::pager::Pager;
|
||||
use crate::storage::wal::{PendingFlush, READMARK_NOT_USED};
|
||||
use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype};
|
||||
use crate::{bail_corrupt_error, turso_assert, File, Result, WalFileShared};
|
||||
use std::cell::{RefCell, UnsafeCell};
|
||||
use std::cell::{Cell, UnsafeCell};
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::mem::MaybeUninit;
|
||||
use std::pin::Pin;
|
||||
@@ -1055,12 +1055,12 @@ pub fn write_pages_vectored(
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn begin_sync(
|
||||
db_file: Arc<dyn DatabaseStorage>,
|
||||
syncing: Rc<RefCell<bool>>,
|
||||
syncing: Rc<Cell<bool>>,
|
||||
) -> Result<Completion> {
|
||||
assert!(!*syncing.borrow());
|
||||
*syncing.borrow_mut() = true;
|
||||
assert!(!syncing.get());
|
||||
syncing.set(true);
|
||||
let completion = Completion::new_sync(move |_| {
|
||||
*syncing.borrow_mut() = false;
|
||||
syncing.set(false);
|
||||
});
|
||||
#[allow(clippy::arc_with_non_send_sync)]
|
||||
db_file.sync(completion)
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use strum::EnumString;
|
||||
use tracing::{instrument, Level};
|
||||
|
||||
use std::fmt::Formatter;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||
use std::{cell::Cell, fmt, rc::Rc, sync::Arc};
|
||||
|
||||
@@ -213,7 +213,7 @@ impl TursoRwLock {
|
||||
}
|
||||
|
||||
/// Write-ahead log (WAL).
|
||||
pub trait Wal {
|
||||
pub trait Wal: Debug {
|
||||
/// Begin a read transaction.
|
||||
fn begin_read_tx(&mut self) -> Result<(LimboResult, bool)>;
|
||||
|
||||
@@ -277,6 +277,7 @@ pub trait Wal {
|
||||
mode: CheckpointMode,
|
||||
) -> Result<IOResult<CheckpointResult>>;
|
||||
fn sync(&mut self) -> Result<Completion>;
|
||||
fn is_syncing(&self) -> bool;
|
||||
fn get_max_frame_in_wal(&self) -> u64;
|
||||
fn get_max_frame(&self) -> u64;
|
||||
fn get_min_frame(&self) -> u64;
|
||||
@@ -1122,11 +1123,16 @@ impl Wal for WalFile {
|
||||
syncing.set(false);
|
||||
});
|
||||
let shared = self.get_shared();
|
||||
let c = shared.file.sync(completion)?;
|
||||
self.syncing.set(true);
|
||||
let c = shared.file.sync(completion)?;
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
// Currently used for assertion purposes
|
||||
fn is_syncing(&self) -> bool {
|
||||
self.syncing.get()
|
||||
}
|
||||
|
||||
fn get_max_frame_in_wal(&self) -> u64 {
|
||||
self.get_shared().max_frame.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
@@ -123,11 +123,12 @@ impl Shadow for Interactions {
|
||||
let mut is_error = false;
|
||||
for interaction in property.interactions() {
|
||||
match interaction {
|
||||
Interaction::Query(query)
|
||||
| Interaction::FsyncQuery(query)
|
||||
| Interaction::FaultyQuery(query) => {
|
||||
is_error = is_error || query.shadow(tables).is_err();
|
||||
Interaction::Query(query) | Interaction::FsyncQuery(query) => {
|
||||
if query.shadow(tables).is_err() {
|
||||
is_error = true;
|
||||
}
|
||||
}
|
||||
Interaction::FaultyQuery(..) => {}
|
||||
Interaction::Assertion(_) => {}
|
||||
Interaction::Assumption(_) => {}
|
||||
Interaction::Fault(_) => {}
|
||||
@@ -653,13 +654,13 @@ impl Interaction {
|
||||
loop {
|
||||
let syncing = {
|
||||
let files = env.io.files.borrow();
|
||||
// TODO: currently assuming we only have 1 file that is syncing
|
||||
files
|
||||
.iter()
|
||||
.any(|file| file.sync_completion.borrow().is_some())
|
||||
};
|
||||
let inject_fault = env.rng.gen_bool(current_prob);
|
||||
if inject_fault || syncing {
|
||||
// TODO: avoid for now injecting faults when syncing
|
||||
if inject_fault && !syncing {
|
||||
env.io.inject_fault(true);
|
||||
}
|
||||
|
||||
|
||||
@@ -80,7 +80,6 @@ pub(crate) enum Property {
|
||||
/// ASSERT <expected_content>
|
||||
TableHasExpectedContent {
|
||||
table: String,
|
||||
expected_content: Vec<Vec<SimValue>>,
|
||||
},
|
||||
/// Double Create Failure is a property in which creating
|
||||
/// the same table twice leads to an error.
|
||||
@@ -224,13 +223,9 @@ impl Property {
|
||||
/// and `interaction` cannot be serialized directly.
|
||||
pub(crate) fn interactions(&self) -> Vec<Interaction> {
|
||||
match self {
|
||||
Property::TableHasExpectedContent {
|
||||
table,
|
||||
expected_content,
|
||||
} => {
|
||||
Property::TableHasExpectedContent { table } => {
|
||||
let table = table.to_string();
|
||||
let table_name = table.clone();
|
||||
let expected_content = expected_content.clone();
|
||||
let assumption = Interaction::Assumption(Assertion {
|
||||
name: format!("table {} exists", table.clone()),
|
||||
func: Box::new(move |_: &Vec<ResultSet>, env: &mut SimulatorEnv| {
|
||||
@@ -249,20 +244,25 @@ impl Property {
|
||||
|
||||
let assertion = Interaction::Assertion(Assertion {
|
||||
name: format!("table {} should have the expected content", table.clone()),
|
||||
func: Box::new(move |stack: &Vec<ResultSet>, _| {
|
||||
func: Box::new(move |stack: &Vec<ResultSet>, env| {
|
||||
let rows = stack.last().unwrap();
|
||||
let Ok(rows) = rows else {
|
||||
return Ok(Err(format!("expected rows but got error: {rows:?}")));
|
||||
};
|
||||
if rows.len() != expected_content.len() {
|
||||
let sim_table = env
|
||||
.tables
|
||||
.iter()
|
||||
.find(|t| t.name == table)
|
||||
.expect("table should be in enviroment");
|
||||
if rows.len() != sim_table.rows.len() {
|
||||
return Ok(Err(format!(
|
||||
"expected {} rows but got {} for table {}",
|
||||
expected_content.len(),
|
||||
sim_table.rows.len(),
|
||||
rows.len(),
|
||||
table.clone()
|
||||
)));
|
||||
}
|
||||
for expected_row in expected_content.iter() {
|
||||
for expected_row in sim_table.rows.iter() {
|
||||
if !rows.contains(expected_row) {
|
||||
return Ok(Err(format!(
|
||||
"expected row {:?} not found in table {}",
|
||||
@@ -1188,11 +1188,8 @@ fn property_read_your_updates_back<R: rand::Rng>(rng: &mut R, env: &SimulatorEnv
|
||||
fn property_table_has_expected_content<R: rand::Rng>(rng: &mut R, env: &SimulatorEnv) -> Property {
|
||||
// Get a random table
|
||||
let table = pick(&env.tables, rng);
|
||||
// Generate rows to insert
|
||||
let rows = table.rows.clone();
|
||||
Property::TableHasExpectedContent {
|
||||
table: table.name.clone(),
|
||||
expected_content: rows,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1509,7 +1506,7 @@ impl ArbitraryFrom<(&SimulatorEnv, &InteractionStats)> for Property {
|
||||
Box::new(|rng: &mut R| property_fsync_no_wait(rng, env, &remaining_)),
|
||||
),
|
||||
(
|
||||
if env.opts.enable_faulty_query {
|
||||
if !env.opts.disable_faulty_query {
|
||||
20.0
|
||||
} else {
|
||||
0.0
|
||||
|
||||
@@ -103,8 +103,8 @@ pub struct SimulatorCLI {
|
||||
pub disable_union_all_preserves_cardinality: bool,
|
||||
#[clap(long, help = "disable FsyncNoWait Property", default_value_t = true)]
|
||||
pub disable_fsync_no_wait: bool,
|
||||
#[clap(long, help = "enable FaultyQuery Property", default_value_t = false)]
|
||||
pub enable_faulty_query: bool,
|
||||
#[clap(long, help = "disable FaultyQuery Property", default_value_t = false)]
|
||||
pub disable_faulty_query: bool,
|
||||
#[clap(long, help = "disable Reopen-Database fault", default_value_t = false)]
|
||||
pub disable_reopen_database: bool,
|
||||
#[clap(
|
||||
|
||||
@@ -241,7 +241,7 @@ impl SimulatorEnv {
|
||||
disable_union_all_preserves_cardinality: cli_opts
|
||||
.disable_union_all_preserves_cardinality,
|
||||
disable_fsync_no_wait: cli_opts.disable_fsync_no_wait,
|
||||
enable_faulty_query: cli_opts.enable_faulty_query,
|
||||
disable_faulty_query: cli_opts.disable_faulty_query,
|
||||
page_size: 4096, // TODO: randomize this too
|
||||
max_interactions: rng.gen_range(cli_opts.minimum_tests..=cli_opts.maximum_tests),
|
||||
max_time_simulation: cli_opts.maximum_time,
|
||||
@@ -414,7 +414,7 @@ pub(crate) struct SimulatorOpts {
|
||||
pub(crate) disable_where_true_false_null: bool,
|
||||
pub(crate) disable_union_all_preserves_cardinality: bool,
|
||||
pub(crate) disable_fsync_no_wait: bool,
|
||||
pub(crate) enable_faulty_query: bool,
|
||||
pub(crate) disable_faulty_query: bool,
|
||||
pub(crate) disable_reopen_database: bool,
|
||||
|
||||
pub(crate) max_interactions: usize,
|
||||
|
||||
@@ -192,7 +192,9 @@ pub(crate) fn execute_interaction(
|
||||
};
|
||||
tracing::debug!(?interaction);
|
||||
let results = interaction.execute_query(conn, &env.io);
|
||||
tracing::debug!(?results);
|
||||
if results.is_err() {
|
||||
tracing::error!(?results);
|
||||
}
|
||||
stack.push(results);
|
||||
limbo_integrity_check(conn)?;
|
||||
}
|
||||
@@ -204,7 +206,9 @@ pub(crate) fn execute_interaction(
|
||||
};
|
||||
|
||||
let results = interaction.execute_fsync_query(conn.clone(), env);
|
||||
tracing::debug!(?results);
|
||||
if results.is_err() {
|
||||
tracing::error!(?results);
|
||||
}
|
||||
stack.push(results);
|
||||
|
||||
let query_interaction = Interaction::Query(query.clone());
|
||||
@@ -235,7 +239,9 @@ pub(crate) fn execute_interaction(
|
||||
};
|
||||
|
||||
let results = interaction.execute_faulty_query(&conn, env);
|
||||
tracing::debug!(?results);
|
||||
if results.is_err() {
|
||||
tracing::error!(?results);
|
||||
}
|
||||
stack.push(results);
|
||||
// Reset fault injection
|
||||
env.io.inject_fault(false);
|
||||
|
||||
@@ -11,6 +11,7 @@ use turso_core::{File, Result};
|
||||
|
||||
use crate::{model::FAULT_ERROR_MSG, runner::clock::SimulatorClock};
|
||||
pub(crate) struct SimulatorFile {
|
||||
pub path: String,
|
||||
pub(crate) inner: Arc<dyn File>,
|
||||
pub(crate) fault: Cell<bool>,
|
||||
|
||||
|
||||
@@ -65,7 +65,11 @@ impl SimulatorIO {
|
||||
pub(crate) fn print_stats(&self) {
|
||||
tracing::info!("run_once faults: {}", self.nr_run_once_faults.get());
|
||||
for file in self.files.borrow().iter() {
|
||||
tracing::info!("\n===========================\n{}", file.stats_table());
|
||||
tracing::info!(
|
||||
"\n===========================\n\nPath: {}\n{}",
|
||||
file.path,
|
||||
file.stats_table()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -85,6 +89,7 @@ impl IO for SimulatorIO {
|
||||
) -> Result<Arc<dyn turso_core::File>> {
|
||||
let inner = self.inner.open_file(path, flags, false)?;
|
||||
let file = Arc::new(SimulatorFile {
|
||||
path: path.to_string(),
|
||||
inner,
|
||||
fault: Cell::new(false),
|
||||
nr_pread_faults: Cell::new(0),
|
||||
|
||||
@@ -73,14 +73,16 @@ impl InteractionPlan {
|
||||
| Property::DropSelect { queries, .. } => {
|
||||
queries.clear();
|
||||
}
|
||||
Property::FsyncNoWait { tables, .. }
|
||||
| Property::FaultyQuery { tables, .. } => {
|
||||
tables.retain(|table| depending_tables.contains(table));
|
||||
}
|
||||
Property::SelectLimit { .. }
|
||||
| Property::SelectSelectOptimizer { .. }
|
||||
| Property::WhereTrueFalseNull { .. }
|
||||
| Property::UNIONAllPreservesCardinality { .. }
|
||||
| Property::FsyncNoWait { .. }
|
||||
| Property::ReadYourUpdatesBack { .. }
|
||||
| Property::TableHasExpectedContent { .. }
|
||||
| Property::FaultyQuery { .. } => {}
|
||||
| Property::TableHasExpectedContent { .. } => {}
|
||||
}
|
||||
}
|
||||
// Check again after query clear if the interactions still uses the failing table
|
||||
|
||||
Reference in New Issue
Block a user