Merge 'Fix 3 different MVCC bugs' from Jussi Saurio

Commit messages contain explanations of each change.
Closes #3129
Closes #3128

Reviewed-by: Pere Diaz Bou <pere-altea@homail.com>

Closes #3146
This commit is contained in:
Jussi Saurio
2025-09-16 15:19:38 +03:00
committed by GitHub
2 changed files with 49 additions and 24 deletions

View File

@@ -12,6 +12,7 @@ use crate::storage::sqlite3_ondisk::DatabaseHeader;
use crate::storage::wal::TursoRwLock;
use crate::types::IOResult;
use crate::types::ImmutableRecord;
use crate::types::SeekResult;
use crate::Completion;
use crate::IOExt;
use crate::LimboError;
@@ -562,6 +563,22 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
})?;
}
}
// We started a pager read transaction at the beginning of the MV transaction, because
// any reads we do from the database file and WAL must uphold snapshot isolation.
// However, now we must end and immediately restart the read transaction before committing.
// This is because other transactions may have committed writes to the DB file or WAL,
// and our pager must read in those changes when applying our writes; otherwise we would overwrite
// the changes from the previous committed transactions.
//
// Note that this would be incredibly unsafe in the regular transaction model, but in MVCC we trust
// the MV-store to uphold the guarantee that no write-write conflicts happened.
self.pager.end_read_tx().expect("end_read_tx cannot fail");
let result = self.pager.begin_read_tx()?;
if let crate::result::LimboResult::Busy = result {
// We cannot obtain a WAL read lock due to contention, so we must abort.
self.commit_coordinator.pager_commit_lock.unlock();
return Err(LimboError::WriteWriteConflict);
}
let result = self.pager.io.block(|| self.pager.begin_write_tx())?;
if let crate::result::LimboResult::Busy = result {
// There is a non-CONCURRENT transaction holding the write lock. We must abort.
@@ -587,8 +604,10 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
let id = &self.write_set[write_set_index];
if let Some(row_versions) = mvcc_store.rows.get(id) {
let row_versions = row_versions.value().read();
// Find rows that were written by this transaction
for row_version in row_versions.iter() {
// Find rows that were written by this transaction.
// Hekaton uses oldest-to-newest order for row versions, so we reverse iterate to find the newest one
// this transaction changed.
for row_version in row_versions.iter().rev() {
if let TxTimestampOrID::TxID(row_tx_id) = row_version.begin {
if row_tx_id == self.tx_id {
let cursor = if let Some(cursor) = self.cursors.get(&id.table_id) {
@@ -913,7 +932,13 @@ impl StateTransition for DeleteRowStateMachine {
.write()
.seek(seek_key, SeekOp::GE { eq_only: true })?
{
IOResult::Done(_) => {
IOResult::Done(seek_res) => {
if seek_res == SeekResult::NotFound {
crate::bail_corrupt_error!(
"MVCC delete: rowid {} not found",
self.rowid.row_id
);
}
self.state = DeleteRowState::Delete;
Ok(TransitionResult::Continue)
}
@@ -1602,8 +1627,8 @@ impl<Clock: LogicalClock> MvStore<Clock> {
// we can either switch to a tree-like structure, or at least use partition_point()
// which performs a binary search for the insertion point.
let mut position = 0_usize;
for (i, v) in versions.iter().rev().enumerate() {
if self.get_begin_timestamp(&v.begin) < self.get_begin_timestamp(&row_version.begin) {
for (i, v) in versions.iter().enumerate().rev() {
if self.get_begin_timestamp(&v.begin) <= self.get_begin_timestamp(&row_version.begin) {
position = i + 1;
break;
}

View File

@@ -1,14 +1,14 @@
use rand::seq::IndexedRandom;
use rand::Rng;
use rand_chacha::{rand_core::SeedableRng, ChaCha8Rng};
use std::collections::HashMap;
use std::collections::BTreeMap;
use turso::{Builder, Value};
// In-memory representation of the database state
#[derive(Debug, Clone, PartialEq)]
struct DbRow {
id: i64,
other_columns: HashMap<String, Value>,
other_columns: BTreeMap<String, Value>,
}
impl std::fmt::Display for DbRow {
@@ -33,9 +33,9 @@ impl std::fmt::Display for DbRow {
#[derive(Debug, Clone)]
struct TransactionState {
// The schema this transaction can see (snapshot)
schema: HashMap<String, TableSchema>,
schema: BTreeMap<String, TableSchema>,
// The rows this transaction can see (snapshot)
visible_rows: HashMap<i64, DbRow>,
visible_rows: BTreeMap<i64, DbRow>,
// Pending changes in this transaction
pending_changes: Vec<Operation>,
}
@@ -55,23 +55,23 @@ struct TableSchema {
#[derive(Debug)]
struct ShadowDb {
// Schema
schema: HashMap<String, TableSchema>,
schema: BTreeMap<String, TableSchema>,
// Committed state (what's actually in the database)
committed_rows: HashMap<i64, DbRow>,
committed_rows: BTreeMap<i64, DbRow>,
// Transaction states
transactions: HashMap<usize, Option<TransactionState>>,
transactions: BTreeMap<usize, Option<TransactionState>>,
query_gen_options: QueryGenOptions,
}
impl ShadowDb {
fn new(
initial_schema: HashMap<String, TableSchema>,
initial_schema: BTreeMap<String, TableSchema>,
query_gen_options: QueryGenOptions,
) -> Self {
Self {
schema: initial_schema,
committed_rows: HashMap::new(),
transactions: HashMap::new(),
committed_rows: BTreeMap::new(),
transactions: BTreeMap::new(),
query_gen_options,
}
}
@@ -190,7 +190,7 @@ impl ShadowDb {
&mut self,
tx_id: usize,
id: i64,
other_columns: HashMap<String, Value>,
other_columns: BTreeMap<String, Value>,
) -> Result<(), String> {
if let Some(tx_state) = self.transactions.get_mut(&tx_id) {
// Check if row exists in visible state
@@ -217,7 +217,7 @@ impl ShadowDb {
&mut self,
tx_id: usize,
id: i64,
other_columns: HashMap<String, Value>,
other_columns: BTreeMap<String, Value>,
) -> Result<(), String> {
if let Some(tx_state) = self.transactions.get_mut(&tx_id) {
// Check if row exists in visible state
@@ -400,11 +400,11 @@ enum Operation {
Rollback,
Insert {
id: i64,
other_columns: HashMap<String, Value>,
other_columns: BTreeMap<String, Value>,
},
Update {
id: i64,
other_columns: HashMap<String, Value>,
other_columns: BTreeMap<String, Value>,
},
Delete {
id: i64,
@@ -600,7 +600,7 @@ async fn multiple_connections_fuzz(opts: FuzzOptions) {
.unwrap();
// SHARED shadow database for all connections
let mut schema = HashMap::new();
let mut schema = BTreeMap::new();
schema.insert(
"test_table".to_string(),
TableSchema {
@@ -883,7 +883,7 @@ async fn multiple_connections_fuzz(opts: FuzzOptions) {
let Value::Integer(id) = row.get_value(0).unwrap() else {
panic!("Unexpected value for id: {:?}", row.get_value(0));
};
let mut other_columns = HashMap::new();
let mut other_columns = BTreeMap::new();
for i in 1..columns.len() {
let column = columns.get(i).unwrap();
let value = row.get_value(i).unwrap();
@@ -1171,13 +1171,13 @@ fn generate_operation(
fn generate_data_operation(
rng: &mut ChaCha8Rng,
visible_rows: &[DbRow],
schema: &HashMap<String, TableSchema>,
schema: &BTreeMap<String, TableSchema>,
dml_gen_options: &DmlGenOptions,
) -> Operation {
let table_schema = schema.get("test_table").unwrap();
let generate_insert_operation = |rng: &mut ChaCha8Rng| {
let id = rng.random_range(1..i64::MAX);
let mut other_columns = HashMap::new();
let mut other_columns = BTreeMap::new();
for column in table_schema.columns.iter() {
if column.name == "id" {
continue;
@@ -1224,7 +1224,7 @@ fn generate_data_operation(
}
let id = visible_rows.choose(rng).unwrap().id;
let col_name_to_update = columns_no_id.choose(rng).unwrap().name.clone();
let mut other_columns = HashMap::new();
let mut other_columns = BTreeMap::new();
other_columns.insert(
col_name_to_update.clone(),
match columns_no_id