From ea6373b8ae6e514cbda275102e1f488b5ac68b3d Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Tue, 16 Sep 2025 10:44:21 +0300 Subject: [PATCH 1/5] Switch to BTreeMap for deterministic iteration --- tests/integration/fuzz_transaction/mod.rs | 38 +++++++++++------------ 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/tests/integration/fuzz_transaction/mod.rs b/tests/integration/fuzz_transaction/mod.rs index 24b9aaeac..74dad6571 100644 --- a/tests/integration/fuzz_transaction/mod.rs +++ b/tests/integration/fuzz_transaction/mod.rs @@ -1,14 +1,14 @@ use rand::seq::IndexedRandom; use rand::Rng; use rand_chacha::{rand_core::SeedableRng, ChaCha8Rng}; -use std::collections::HashMap; +use std::collections::BTreeMap; use turso::{Builder, Value}; // In-memory representation of the database state #[derive(Debug, Clone, PartialEq)] struct DbRow { id: i64, - other_columns: HashMap, + other_columns: BTreeMap, } impl std::fmt::Display for DbRow { @@ -33,9 +33,9 @@ impl std::fmt::Display for DbRow { #[derive(Debug, Clone)] struct TransactionState { // The schema this transaction can see (snapshot) - schema: HashMap, + schema: BTreeMap, // The rows this transaction can see (snapshot) - visible_rows: HashMap, + visible_rows: BTreeMap, // Pending changes in this transaction pending_changes: Vec, } @@ -55,23 +55,23 @@ struct TableSchema { #[derive(Debug)] struct ShadowDb { // Schema - schema: HashMap, + schema: BTreeMap, // Committed state (what's actually in the database) - committed_rows: HashMap, + committed_rows: BTreeMap, // Transaction states - transactions: HashMap>, + transactions: BTreeMap>, query_gen_options: QueryGenOptions, } impl ShadowDb { fn new( - initial_schema: HashMap, + initial_schema: BTreeMap, query_gen_options: QueryGenOptions, ) -> Self { Self { schema: initial_schema, - committed_rows: HashMap::new(), - transactions: HashMap::new(), + committed_rows: BTreeMap::new(), + transactions: BTreeMap::new(), query_gen_options, } } @@ -190,7 +190,7 @@ 
impl ShadowDb { &mut self, tx_id: usize, id: i64, - other_columns: HashMap, + other_columns: BTreeMap, ) -> Result<(), String> { if let Some(tx_state) = self.transactions.get_mut(&tx_id) { // Check if row exists in visible state @@ -217,7 +217,7 @@ impl ShadowDb { &mut self, tx_id: usize, id: i64, - other_columns: HashMap, + other_columns: BTreeMap, ) -> Result<(), String> { if let Some(tx_state) = self.transactions.get_mut(&tx_id) { // Check if row exists in visible state @@ -400,11 +400,11 @@ enum Operation { Rollback, Insert { id: i64, - other_columns: HashMap, + other_columns: BTreeMap, }, Update { id: i64, - other_columns: HashMap, + other_columns: BTreeMap, }, Delete { id: i64, @@ -600,7 +600,7 @@ async fn multiple_connections_fuzz(opts: FuzzOptions) { .unwrap(); // SHARED shadow database for all connections - let mut schema = HashMap::new(); + let mut schema = BTreeMap::new(); schema.insert( "test_table".to_string(), TableSchema { @@ -883,7 +883,7 @@ async fn multiple_connections_fuzz(opts: FuzzOptions) { let Value::Integer(id) = row.get_value(0).unwrap() else { panic!("Unexpected value for id: {:?}", row.get_value(0)); }; - let mut other_columns = HashMap::new(); + let mut other_columns = BTreeMap::new(); for i in 1..columns.len() { let column = columns.get(i).unwrap(); let value = row.get_value(i).unwrap(); @@ -1171,13 +1171,13 @@ fn generate_operation( fn generate_data_operation( rng: &mut ChaCha8Rng, visible_rows: &[DbRow], - schema: &HashMap, + schema: &BTreeMap, dml_gen_options: &DmlGenOptions, ) -> Operation { let table_schema = schema.get("test_table").unwrap(); let generate_insert_operation = |rng: &mut ChaCha8Rng| { let id = rng.random_range(1..i64::MAX); - let mut other_columns = HashMap::new(); + let mut other_columns = BTreeMap::new(); for column in table_schema.columns.iter() { if column.name == "id" { continue; @@ -1224,7 +1224,7 @@ fn generate_data_operation( } let id = visible_rows.choose(rng).unwrap().id; let col_name_to_update = 
columns_no_id.choose(rng).unwrap().name.clone(); - let mut other_columns = HashMap::new(); + let mut other_columns = BTreeMap::new(); other_columns.insert( col_name_to_update.clone(), match columns_no_id From 847e413c3458c9bc16a1b206042a04ef5e723402 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Tue, 16 Sep 2025 12:24:17 +0300 Subject: [PATCH 2/5] mvcc: assert that DeleteRowStateMachine must find the row it is deleting --- core/mvcc/database/mod.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 855ad8f8d..85d48b832 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -12,6 +12,7 @@ use crate::storage::sqlite3_ondisk::DatabaseHeader; use crate::storage::wal::TursoRwLock; use crate::types::IOResult; use crate::types::ImmutableRecord; +use crate::types::SeekResult; use crate::Completion; use crate::IOExt; use crate::LimboError; @@ -913,7 +914,13 @@ impl StateTransition for DeleteRowStateMachine { .write() .seek(seek_key, SeekOp::GE { eq_only: true })? { - IOResult::Done(_) => { + IOResult::Done(seek_res) => { + if seek_res == SeekResult::NotFound { + crate::bail_corrupt_error!( + "MVCC delete: rowid {} not found", + self.rowid.row_id + ); + } self.state = DeleteRowState::Delete; Ok(TransitionResult::Continue) } From 139ce39a00c9da50bf0424dc7abbf0dc0e2a13e3 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Tue, 16 Sep 2025 12:24:54 +0300 Subject: [PATCH 3/5] mvcc: fix logic bug in MvStore::insert_version_raw() In insert_version_raw(), we correctly iterate the versions backwards because we want to find the newest version that is still older than the one we are inserting. However, the order of `.enumerate()` and `.rev()` was wrong, so the insertion position was calculated based on the position in the _reversed_ iterator, not the original iterator. 
--- core/mvcc/database/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 85d48b832..670b1bef0 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1609,8 +1609,8 @@ impl MvStore { // we can either switch to a tree-like structure, or at least use partition_point() // which performs a binary search for the insertion point. let mut position = 0_usize; - for (i, v) in versions.iter().rev().enumerate() { - if self.get_begin_timestamp(&v.begin) < self.get_begin_timestamp(&row_version.begin) { + for (i, v) in versions.iter().enumerate().rev() { + if self.get_begin_timestamp(&v.begin) <= self.get_begin_timestamp(&row_version.begin) { position = i + 1; break; } From b4fba69fe26f9a16dc1fbf94619a0f7545c239c8 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Tue, 16 Sep 2025 12:26:57 +0300 Subject: [PATCH 4/5] mvcc: fix logic bug in CommitState::WriteRow iteration order We must iterate the row versions in reverse order because the versions are in order of oldest to newest, and we must commit the newest version applied by the active transaction. --- core/mvcc/database/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 670b1bef0..2bfaabd6c 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -588,8 +588,10 @@ impl StateTransition for CommitStateMachine { let id = &self.write_set[write_set_index]; if let Some(row_versions) = mvcc_store.rows.get(id) { let row_versions = row_versions.value().read(); - // Find rows that were written by this transaction - for row_version in row_versions.iter() { + // Find rows that were written by this transaction. + // Hekaton uses oldest-to-newest order for row versions, so we reverse iterate to find the newest one + // this transaction changed. 
+ for row_version in row_versions.iter().rev() { if let TxTimestampOrID::TxID(row_tx_id) = row_version.begin { if row_tx_id == self.tx_id { let cursor = if let Some(cursor) = self.cursors.get(&id.table_id) { From e0127685497e2d170c9514678040986c143f66c0 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Tue, 16 Sep 2025 12:30:50 +0300 Subject: [PATCH 5/5] mvcc: don't allow CONCURRENT transaction to overwrite others' changes We start a pager read transaction at the beginning of the MV transaction, because any reads we do from the database file and WAL must uphold snapshot isolation. However, we must end and immediately restart the read transaction before committing. This is because other transactions may have committed writes to the DB file or WAL, and our pager must read in those changes when applying our writes; otherwise we would overwrite the changes from the previous committed transactions. Note that this would be incredibly unsafe in the regular transaction model, but in MVCC we trust the MV-store to uphold the guarantee that no write-write conflicts happened. --- core/mvcc/database/mod.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 2bfaabd6c..7b35c21f3 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -563,6 +563,22 @@ impl StateTransition for CommitStateMachine { })?; } } + // We started a pager read transaction at the beginning of the MV transaction, because + // any reads we do from the database file and WAL must uphold snapshot isolation. + // However, now we must end and immediately restart the read transaction before committing. + // This is because other transactions may have committed writes to the DB file or WAL, + // and our pager must read in those changes when applying our writes; otherwise we would overwrite + // the changes from the previous committed transactions. 
+ // + // Note that this would be incredibly unsafe in the regular transaction model, but in MVCC we trust + // the MV-store to uphold the guarantee that no write-write conflicts happened. + self.pager.end_read_tx().expect("end_read_tx cannot fail"); + let result = self.pager.begin_read_tx()?; + if let crate::result::LimboResult::Busy = result { + // We cannot obtain a WAL read lock due to contention, so we must abort. + self.commit_coordinator.pager_commit_lock.unlock(); + return Err(LimboError::WriteWriteConflict); + } let result = self.pager.io.block(|| self.pager.begin_write_tx())?; if let crate::result::LimboResult::Busy = result { // There is a non-CONCURRENT transaction holding the write lock. We must abort.