diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 2010f626a..f87faf9d2 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -410,7 +410,7 @@ pub struct Pager { commit_info: RefCell, checkpoint_state: RefCell, - syncing: Rc>, + syncing: Rc>, auto_vacuum_mode: RefCell, /// 0 -> Database is empty, /// 1 -> Database is being initialized, @@ -521,7 +521,7 @@ impl Pager { commit_info: RefCell::new(CommitInfo { state: CommitState::Start, }), - syncing: Rc::new(RefCell::new(false)), + syncing: Rc::new(Cell::new(false)), checkpoint_state: RefCell::new(CheckpointState::Checkpoint), buffer_pool, auto_vacuum_mode: RefCell::new(AutoVacuumMode::None), @@ -1234,6 +1234,7 @@ impl Pager { io_yield_one!(c); } CommitState::AfterSyncWal => { + turso_assert!(!wal.borrow().is_syncing(), "wal should have synced"); if wal_auto_checkpoint_disabled || !wal.borrow().should_checkpoint() { self.commit_info.borrow_mut().state = CommitState::Start; break PagerCommitResult::WalWritten; @@ -1250,7 +1251,7 @@ impl Pager { io_yield_one!(c); } CommitState::AfterSyncDbFile => { - turso_assert!(!*self.syncing.borrow(), "should have finished syncing"); + turso_assert!(!self.syncing.get(), "should have finished syncing"); self.commit_info.borrow_mut().state = CommitState::Start; break PagerCommitResult::Checkpointed(checkpoint_result); } @@ -1342,7 +1343,7 @@ impl Pager { io_yield_one!(c); } CheckpointState::CheckpointDone { res } => { - turso_assert!(!*self.syncing.borrow(), "syncing should be done"); + turso_assert!(!self.syncing.get(), "syncing should be done"); self.checkpoint_state.replace(CheckpointState::Checkpoint); return Ok(IOResult::Done(res)); } @@ -1929,6 +1930,16 @@ impl Pager { state: CommitState::Start, }); self.allocate_page_state.replace(AllocatePageState::Start); + self.free_page_state.replace(FreePageState::Start); + #[cfg(not(feature = "omit_autovacuum"))] + { + self.ptrmap_get_state.replace(PtrMapGetState::Start); + self.ptrmap_put_state.replace(PtrMapPutState::Start); + self.btree_create_vacuum_full_state + .replace(BtreeCreateVacuumFullState::Start); + } + + self.header_ref_state.replace(HeaderRefState::Start); } pub fn with_header(&self, f: impl Fn(&DatabaseHeader) -> T) -> Result> { diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 733773836..d792da47d 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -63,7 +63,7 @@ use crate::storage::pager::Pager; use crate::storage::wal::{PendingFlush, READMARK_NOT_USED}; use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype}; use crate::{bail_corrupt_error, turso_assert, File, Result, WalFileShared}; -use std::cell::{RefCell, UnsafeCell}; +use std::cell::{Cell, UnsafeCell}; use std::collections::{BTreeMap, HashMap}; use std::mem::MaybeUninit; use std::pin::Pin; @@ -1055,12 +1055,12 @@ pub fn write_pages_vectored( #[instrument(skip_all, level = Level::DEBUG)] pub fn begin_sync( db_file: Arc, - syncing: Rc>, + syncing: Rc>, ) -> Result { - assert!(!*syncing.borrow()); - *syncing.borrow_mut() = true; + assert!(!syncing.get()); + syncing.set(true); let completion = Completion::new_sync(move |_| { - *syncing.borrow_mut() = false; + syncing.set(false); }); #[allow(clippy::arc_with_non_send_sync)] db_file.sync(completion) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 2d480f0d6..ea532fb0d 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -7,7 +7,7 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use strum::EnumString; use tracing::{instrument, Level}; -use std::fmt::Formatter; +use std::fmt::{Debug, Formatter}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::{cell::Cell, fmt, rc::Rc, sync::Arc}; @@ -213,7 +213,7 @@ impl TursoRwLock { } /// Write-ahead log (WAL). -pub trait Wal { +pub trait Wal: Debug { /// Begin a read transaction. fn begin_read_tx(&mut self) -> Result<(LimboResult, bool)>; @@ -277,6 +277,7 @@ pub trait Wal { mode: CheckpointMode, ) -> Result>; fn sync(&mut self) -> Result; + fn is_syncing(&self) -> bool; fn get_max_frame_in_wal(&self) -> u64; fn get_max_frame(&self) -> u64; fn get_min_frame(&self) -> u64; @@ -1122,11 +1123,16 @@ impl Wal for WalFile { syncing.set(false); }); let shared = self.get_shared(); - let c = shared.file.sync(completion)?; self.syncing.set(true); + let c = shared.file.sync(completion)?; Ok(c) } + // Currently used for assertion purposes + fn is_syncing(&self) -> bool { + self.syncing.get() + } + fn get_max_frame_in_wal(&self) -> u64 { self.get_shared().max_frame.load(Ordering::Acquire) } diff --git a/simulator/generation/plan.rs b/simulator/generation/plan.rs index 908e9f654..eae39d4d1 100644 --- a/simulator/generation/plan.rs +++ b/simulator/generation/plan.rs @@ -123,11 +123,12 @@ impl Shadow for Interactions { let mut is_error = false; for interaction in property.interactions() { match interaction { - Interaction::Query(query) - | Interaction::FsyncQuery(query) - | Interaction::FaultyQuery(query) => { - is_error = is_error || query.shadow(tables).is_err(); + Interaction::Query(query) | Interaction::FsyncQuery(query) => { + if query.shadow(tables).is_err() { + is_error = true; + } } + Interaction::FaultyQuery(..) => {} Interaction::Assertion(_) => {} Interaction::Assumption(_) => {} Interaction::Fault(_) => {} @@ -653,13 +654,13 @@ impl Interaction { loop { let syncing = { let files = env.io.files.borrow(); - // TODO: currently assuming we only have 1 file that is syncing files .iter() .any(|file| file.sync_completion.borrow().is_some()) }; let inject_fault = env.rng.gen_bool(current_prob); - if inject_fault || syncing { + // TODO: avoid for now injecting faults when syncing + if inject_fault && !syncing { env.io.inject_fault(true); } diff --git a/simulator/generation/property.rs b/simulator/generation/property.rs index b06a92e04..6a0237509 100644 --- a/simulator/generation/property.rs +++ b/simulator/generation/property.rs @@ -80,7 +80,6 @@ pub(crate) enum Property { /// ASSERT TableHasExpectedContent { table: String, - expected_content: Vec>, }, /// Double Create Failure is a property in which creating /// the same table twice leads to an error. @@ -224,13 +223,9 @@ impl Property { /// and `interaction` cannot be serialized directly. pub(crate) fn interactions(&self) -> Vec { match self { - Property::TableHasExpectedContent { - table, - expected_content, - } => { + Property::TableHasExpectedContent { table } => { let table = table.to_string(); let table_name = table.clone(); - let expected_content = expected_content.clone(); let assumption = Interaction::Assumption(Assertion { name: format!("table {} exists", table.clone()), func: Box::new(move |_: &Vec, env: &mut SimulatorEnv| { @@ -249,20 +244,25 @@ impl Property { let assertion = Interaction::Assertion(Assertion { name: format!("table {} should have the expected content", table.clone()), - func: Box::new(move |stack: &Vec, _| { + func: Box::new(move |stack: &Vec, env| { let rows = stack.last().unwrap(); let Ok(rows) = rows else { return Ok(Err(format!("expected rows but got error: {rows:?}"))); }; - if rows.len() != expected_content.len() { + let sim_table = env + .tables + .iter() + .find(|t| t.name == table) + .expect("table should be in enviroment"); + if rows.len() != sim_table.rows.len() { return Ok(Err(format!( "expected {} rows but got {} for table {}", - expected_content.len(), + sim_table.rows.len(), rows.len(), table.clone() ))); } - for expected_row in expected_content.iter() { + for expected_row in sim_table.rows.iter() { if !rows.contains(expected_row) { return Ok(Err(format!( "expected row {:?} not found in table {}", @@ -1188,11 +1188,8 @@ fn property_read_your_updates_back(rng: &mut R, env: &SimulatorEnv fn property_table_has_expected_content(rng: &mut R, env: &SimulatorEnv) -> Property { // Get a random table let table = pick(&env.tables, rng); - // Generate rows to insert - let rows = table.rows.clone(); Property::TableHasExpectedContent { table: table.name.clone(), - expected_content: rows, } } @@ -1509,7 +1506,7 @@ impl ArbitraryFrom<(&SimulatorEnv, &InteractionStats)> for Property { Box::new(|rng: &mut R| property_fsync_no_wait(rng, env, &remaining_)), ), ( - if env.opts.enable_faulty_query { + if !env.opts.disable_faulty_query { 20.0 } else { 0.0 diff --git a/simulator/runner/cli.rs b/simulator/runner/cli.rs index b67b6fef3..1a79a121f 100644 --- a/simulator/runner/cli.rs +++ b/simulator/runner/cli.rs @@ -103,8 +103,8 @@ pub struct SimulatorCLI { pub disable_union_all_preserves_cardinality: bool, #[clap(long, help = "disable FsyncNoWait Property", default_value_t = true)] pub disable_fsync_no_wait: bool, - #[clap(long, help = "enable FaultyQuery Property", default_value_t = false)] - pub enable_faulty_query: bool, + #[clap(long, help = "disable FaultyQuery Property", default_value_t = false)] + pub disable_faulty_query: bool, #[clap(long, help = "disable Reopen-Database fault", default_value_t = false)] pub disable_reopen_database: bool, #[clap( diff --git a/simulator/runner/env.rs b/simulator/runner/env.rs index c061cfc52..f5787bc57 100644 --- a/simulator/runner/env.rs +++ b/simulator/runner/env.rs @@ -241,7 +241,7 @@ impl SimulatorEnv { disable_union_all_preserves_cardinality: cli_opts .disable_union_all_preserves_cardinality, disable_fsync_no_wait: cli_opts.disable_fsync_no_wait, - enable_faulty_query: cli_opts.enable_faulty_query, + disable_faulty_query: cli_opts.disable_faulty_query, page_size: 4096, // TODO: randomize this too max_interactions: rng.gen_range(cli_opts.minimum_tests..=cli_opts.maximum_tests), max_time_simulation: cli_opts.maximum_time, @@ -414,7 +414,7 @@ pub(crate) struct SimulatorOpts { pub(crate) disable_where_true_false_null: bool, pub(crate) disable_union_all_preserves_cardinality: bool, pub(crate) disable_fsync_no_wait: bool, - pub(crate) enable_faulty_query: bool, + pub(crate) disable_faulty_query: bool, pub(crate) disable_reopen_database: bool, pub(crate) max_interactions: usize, diff --git a/simulator/runner/execution.rs b/simulator/runner/execution.rs index 206423e4e..9cbac3826 100644 --- a/simulator/runner/execution.rs +++ b/simulator/runner/execution.rs @@ -192,7 +192,9 @@ pub(crate) fn execute_interaction( }; tracing::debug!(?interaction); let results = interaction.execute_query(conn, &env.io); - tracing::debug!(?results); + if results.is_err() { + tracing::error!(?results); + } stack.push(results); limbo_integrity_check(conn)?; } @@ -204,7 +206,9 @@ pub(crate) fn execute_interaction( }; let results = interaction.execute_fsync_query(conn.clone(), env); - tracing::debug!(?results); + if results.is_err() { + tracing::error!(?results); + } stack.push(results); let query_interaction = Interaction::Query(query.clone()); @@ -235,7 +239,9 @@ pub(crate) fn execute_interaction( }; let results = interaction.execute_faulty_query(&conn, env); - tracing::debug!(?results); + if results.is_err() { + tracing::error!(?results); + } stack.push(results); // Reset fault injection env.io.inject_fault(false); diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index 076e2ed9e..c8c5ff4fa 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -11,6 +11,7 @@ use turso_core::{File, Result}; use crate::{model::FAULT_ERROR_MSG, runner::clock::SimulatorClock}; pub(crate) struct SimulatorFile { + pub path: String, pub(crate) inner: Arc, pub(crate) fault: Cell, diff --git a/simulator/runner/io.rs b/simulator/runner/io.rs index 29973f889..2e0de95dc 100644 --- a/simulator/runner/io.rs +++ b/simulator/runner/io.rs @@ -65,7 +65,11 @@ impl SimulatorIO { pub(crate) fn print_stats(&self) { tracing::info!("run_once faults: {}", self.nr_run_once_faults.get()); for file in self.files.borrow().iter() { - tracing::info!("\n===========================\n{}", file.stats_table()); + tracing::info!( + "\n===========================\n\nPath: {}\n{}", + file.path, + file.stats_table() + ); } } } @@ -85,6 +89,7 @@ impl IO for SimulatorIO { ) -> Result> { let inner = self.inner.open_file(path, flags, false)?; let file = Arc::new(SimulatorFile { + path: path.to_string(), inner, fault: Cell::new(false), nr_pread_faults: Cell::new(0), diff --git a/simulator/shrink/plan.rs b/simulator/shrink/plan.rs index 29935aa0d..f08ccbb5a 100644 --- a/simulator/shrink/plan.rs +++ b/simulator/shrink/plan.rs @@ -73,14 +73,16 @@ impl InteractionPlan { | Property::DropSelect { queries, .. } => { queries.clear(); } + Property::FsyncNoWait { tables, .. } + | Property::FaultyQuery { tables, .. } => { + tables.retain(|table| depending_tables.contains(table)); + } Property::SelectLimit { .. } | Property::SelectSelectOptimizer { .. } | Property::WhereTrueFalseNull { .. } | Property::UNIONAllPreservesCardinality { .. } - | Property::FsyncNoWait { .. } | Property::ReadYourUpdatesBack { .. } - | Property::TableHasExpectedContent { .. } - | Property::FaultyQuery { .. } => {} + | Property::TableHasExpectedContent { .. } => {} } } // Check again after query clear if the interactions still uses the failing table