diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 855ad8f8d..7b35c21f3 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -12,6 +12,7 @@ use crate::storage::sqlite3_ondisk::DatabaseHeader; use crate::storage::wal::TursoRwLock; use crate::types::IOResult; use crate::types::ImmutableRecord; +use crate::types::SeekResult; use crate::Completion; use crate::IOExt; use crate::LimboError; @@ -562,6 +563,22 @@ impl StateTransition for CommitStateMachine { })?; } } + // We started a pager read transaction at the beginning of the MV transaction, because + // any reads we do from the database file and WAL must uphold snapshot isolation. + // However, now we must end and immediately restart the read transaction before committing. + // This is because other transactions may have committed writes to the DB file or WAL, + // and our pager must read in those changes when applying our writes; otherwise we would overwrite + // the changes from the previous committed transactions. + // + // Note that this would be incredibly unsafe in the regular transaction model, but in MVCC we trust + // the MV-store to uphold the guarantee that no write-write conflicts happened. + self.pager.end_read_tx().expect("end_read_tx cannot fail"); + let result = self.pager.begin_read_tx()?; + if let crate::result::LimboResult::Busy = result { + // We cannot obtain a WAL read lock due to contention, so we must abort. + self.commit_coordinator.pager_commit_lock.unlock(); + return Err(LimboError::WriteWriteConflict); + } let result = self.pager.io.block(|| self.pager.begin_write_tx())?; if let crate::result::LimboResult::Busy = result { // There is a non-CONCURRENT transaction holding the write lock. We must abort. @@ -587,8 +604,10 @@ impl StateTransition for CommitStateMachine { let id = &self.write_set[write_set_index]; if let Some(row_versions) = mvcc_store.rows.get(id) { let row_versions = row_versions.value().read(); - // Find rows that were written by this transaction - for row_version in row_versions.iter() { + // Find rows that were written by this transaction. + // Hekaton uses oldest-to-newest order for row versions, so we reverse iterate to find the newest one + // this transaction changed. + for row_version in row_versions.iter().rev() { if let TxTimestampOrID::TxID(row_tx_id) = row_version.begin { if row_tx_id == self.tx_id { let cursor = if let Some(cursor) = self.cursors.get(&id.table_id) { @@ -913,7 +932,13 @@ impl StateTransition for DeleteRowStateMachine { .write() .seek(seek_key, SeekOp::GE { eq_only: true })? { - IOResult::Done(_) => { + IOResult::Done(seek_res) => { + if seek_res == SeekResult::NotFound { + crate::bail_corrupt_error!( + "MVCC delete: rowid {} not found", + self.rowid.row_id + ); + } self.state = DeleteRowState::Delete; Ok(TransitionResult::Continue) } @@ -1602,8 +1627,8 @@ impl MvStore { // we can either switch to a tree-like structure, or at least use partition_point() // which performs a binary search for the insertion point. let mut position = 0_usize; - for (i, v) in versions.iter().rev().enumerate() { - if self.get_begin_timestamp(&v.begin) < self.get_begin_timestamp(&row_version.begin) { + for (i, v) in versions.iter().enumerate().rev() { + if self.get_begin_timestamp(&v.begin) <= self.get_begin_timestamp(&row_version.begin) { position = i + 1; break; } diff --git a/tests/integration/fuzz_transaction/mod.rs b/tests/integration/fuzz_transaction/mod.rs index 24b9aaeac..74dad6571 100644 --- a/tests/integration/fuzz_transaction/mod.rs +++ b/tests/integration/fuzz_transaction/mod.rs @@ -1,14 +1,14 @@ use rand::seq::IndexedRandom; use rand::Rng; use rand_chacha::{rand_core::SeedableRng, ChaCha8Rng}; -use std::collections::HashMap; +use std::collections::BTreeMap; use turso::{Builder, Value}; // In-memory representation of the database state #[derive(Debug, Clone, PartialEq)] struct DbRow { id: i64, - other_columns: HashMap, + other_columns: BTreeMap, } impl std::fmt::Display for DbRow { @@ -33,9 +33,9 @@ impl std::fmt::Display for DbRow { #[derive(Debug, Clone)] struct TransactionState { // The schema this transaction can see (snapshot) - schema: HashMap, + schema: BTreeMap, // The rows this transaction can see (snapshot) - visible_rows: HashMap, + visible_rows: BTreeMap, // Pending changes in this transaction pending_changes: Vec, } @@ -55,23 +55,23 @@ struct TableSchema { #[derive(Debug)] struct ShadowDb { // Schema - schema: HashMap, + schema: BTreeMap, // Committed state (what's actually in the database) - committed_rows: HashMap, + committed_rows: BTreeMap, // Transaction states - transactions: HashMap>, + transactions: BTreeMap>, query_gen_options: QueryGenOptions, } impl ShadowDb { fn new( - initial_schema: HashMap, + initial_schema: BTreeMap, query_gen_options: QueryGenOptions, ) -> Self { Self { schema: initial_schema, - committed_rows: HashMap::new(), - transactions: HashMap::new(), + committed_rows: BTreeMap::new(), + transactions: BTreeMap::new(), query_gen_options, } } @@ -190,7 +190,7 @@ impl ShadowDb { &mut self, tx_id: usize, id: i64, - other_columns: HashMap, + other_columns: BTreeMap, ) -> Result<(), String> { if let Some(tx_state) = self.transactions.get_mut(&tx_id) { // Check if row exists in visible state @@ -217,7 +217,7 @@ impl ShadowDb { &mut self, tx_id: usize, id: i64, - other_columns: HashMap, + other_columns: BTreeMap, ) -> Result<(), String> { if let Some(tx_state) = self.transactions.get_mut(&tx_id) { // Check if row exists in visible state @@ -400,11 +400,11 @@ enum Operation { Rollback, Insert { id: i64, - other_columns: HashMap, + other_columns: BTreeMap, }, Update { id: i64, - other_columns: HashMap, + other_columns: BTreeMap, }, Delete { id: i64, @@ -600,7 +600,7 @@ async fn multiple_connections_fuzz(opts: FuzzOptions) { .unwrap(); // SHARED shadow database for all connections - let mut schema = HashMap::new(); + let mut schema = BTreeMap::new(); schema.insert( "test_table".to_string(), TableSchema { @@ -883,7 +883,7 @@ async fn multiple_connections_fuzz(opts: FuzzOptions) { let Value::Integer(id) = row.get_value(0).unwrap() else { panic!("Unexpected value for id: {:?}", row.get_value(0)); }; - let mut other_columns = HashMap::new(); + let mut other_columns = BTreeMap::new(); for i in 1..columns.len() { let column = columns.get(i).unwrap(); let value = row.get_value(i).unwrap(); @@ -1171,13 +1171,13 @@ fn generate_operation( fn generate_data_operation( rng: &mut ChaCha8Rng, visible_rows: &[DbRow], - schema: &HashMap, + schema: &BTreeMap, dml_gen_options: &DmlGenOptions, ) -> Operation { let table_schema = schema.get("test_table").unwrap(); let generate_insert_operation = |rng: &mut ChaCha8Rng| { let id = rng.random_range(1..i64::MAX); - let mut other_columns = HashMap::new(); + let mut other_columns = BTreeMap::new(); for column in table_schema.columns.iter() { if column.name == "id" { continue; @@ -1224,7 +1224,7 @@ fn generate_data_operation( } let id = visible_rows.choose(rng).unwrap().id; let col_name_to_update = columns_no_id.choose(rng).unwrap().name.clone(); - let mut other_columns = HashMap::new(); + let mut other_columns = BTreeMap::new(); other_columns.insert( col_name_to_update.clone(), match columns_no_id