From f26e36b6ded3f60f9e6a3a681a764ac0095174aa Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 2 Sep 2025 15:02:38 +0200 Subject: [PATCH 1/4] core/mvcc: test write concurrency fix --- core/mvcc/database/tests.rs | 75 ++++++++++++++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 438e897ec..e91dfd0cb 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -654,10 +654,10 @@ fn test_future_row() { use crate::mvcc::cursor::MvccLazyCursor; use crate::mvcc::database::{MvStore, Row, RowID}; use crate::types::Text; -use crate::MemoryIO; use crate::RefValue; use crate::Value; use crate::{Database, StepResult}; +use crate::{MemoryIO, Statement}; // Simple atomic clock implementation for testing @@ -1243,3 +1243,76 @@ fn get_rows(conn: &Arc, query: &str) -> Vec> { } rows } + +#[test] +#[ignore] +fn test_concurrent_writes() { + struct ConnectionState { + conn: Arc, + inserts: Vec, + current_statement: Option, + } + let db = MvccTestDbNoConn::new_with_random_db(); + let mut connecitons = Vec::new(); + { + let conn = db.connect(); + conn.execute("CREATE TABLE test (x)").unwrap(); + conn.close().unwrap(); + } + for i in 0..2 { + let conn = db.connect(); + let mut inserts = ((100 * i)..(100 * (i + 1))).collect::>(); + inserts.reverse(); + connecitons.push(ConnectionState { + conn, + inserts, + current_statement: None, + }); + } + + loop { + let mut all_finished = true; + for conn in &mut connecitons { + if !conn.inserts.is_empty() && conn.current_statement.is_none() { + all_finished = false; + break; + } + } + for (conn_id, conn) in connecitons.iter_mut().enumerate() { + println!("connection {conn_id} inserts: {:?}", conn.inserts); + if conn.current_statement.is_none() && !conn.inserts.is_empty() { + let write = conn.inserts.pop().unwrap(); + println!("inserting row {write} from connection {conn_id}"); + conn.current_statement = Some( + conn.conn + 
.prepare(&format!("INSERT INTO test (x) VALUES ({write})")) + .unwrap(), + ); + } + if conn.current_statement.is_none() { + continue; + } + let stmt = conn.current_statement.as_mut().unwrap(); + match stmt.step().unwrap() { + // These should be the only possible cases in write concurrency. + // No rows because insert doesn't return + // No interrupt because insert doesn't interrupt + // No busy because insert in mvcc should be multi concurrent write + StepResult::Done => { + conn.current_statement = None; + } + StepResult::IO => { + // let's skip doing I/O here, we want to perform io only after all the statements are stepped + } + _ => { + unreachable!() + } + } + } + db.get_db().io.run_once().unwrap(); + + if all_finished { + break; + } + } +} From 13c505109a3a1b46e9e93a1071e23b765bdc93c5 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 2 Sep 2025 17:06:49 +0200 Subject: [PATCH 2/4] core/mvcc: make commit_txn return on I/O --- core/mvcc/database/mod.rs | 9 +++++ core/state_machine.rs | 1 + core/vdbe/execute.rs | 2 +- core/vdbe/mod.rs | 82 ++++++++++++++++++++++++++------------- 4 files changed, 67 insertions(+), 27 deletions(-) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 13e28c437..6727784e5 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -271,6 +271,15 @@ pub struct CommitStateMachine { _phantom: PhantomData, } +impl Debug for CommitStateMachine { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CommitStateMachine") + .field("state", &self.state) + .field("is_finalized", &self.is_finalized) + .finish() + } +} + pub struct WriteRowStateMachine { state: WriteRowState, is_finalized: bool, diff --git a/core/state_machine.rs b/core/state_machine.rs index 0e3f30816..0d776df10 100644 --- a/core/state_machine.rs +++ b/core/state_machine.rs @@ -27,6 +27,7 @@ pub trait StateTransition { fn is_finalized(&self) -> bool; } +#[derive(Debug)] pub struct StateMachine { state: State, 
is_finalized: bool, diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index f2f51da86..7b67b77a7 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2165,7 +2165,7 @@ pub fn op_auto_commit( insn ); let conn = program.connection.clone(); - if state.commit_state == CommitState::Committing { + if matches!(state.commit_state, CommitState::Committing) { return program .commit_txn(pager.clone(), state, mv_store, *rollback) .map(Into::into); diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index f3a9425a8..20ab70584 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -28,16 +28,19 @@ pub mod sorter; use crate::{ error::LimboError, function::{AggFunc, FuncCtx}, - state_machine::StateTransition, + mvcc::{database::CommitStateMachine, LocalClock}, + state_machine::{StateMachine, StateTransition, TransitionResult}, storage::sqlite3_ondisk::SmallVec, translate::{collate::CollationSeq, plan::TableReferences}, types::{IOCompletions, IOResult, RawSlice, TextRef}, - vdbe::execute::{ - OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, OpInsertState, - OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, OpSeekState, + vdbe::{ + execute::{ + OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, OpInsertState, + OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, OpSeekState, + }, + metrics::StatementMetrics, }, - vdbe::metrics::StatementMetrics, - IOExt, RefValue, + IOExt, RefValue, }; use crate::{ @@ -210,7 +213,7 @@ impl Bitfield { } } -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug)] /// The commit state of the program. 
/// There are two states: /// - Ready: The program is ready to run the next instruction, or has shut down after @@ -220,6 +223,9 @@ impl Bitfield { enum CommitState { Ready, Committing, + CommitingMvcc { + state_machine: StateMachine>, + }, } #[derive(Debug, Clone)] @@ -550,25 +556,30 @@ impl Program { if auto_commit { // FIXME: we don't want to commit stuff from other programs. let mut mv_transactions = conn.mv_transactions.borrow_mut(); - for tx_id in mv_transactions.iter() { - let mut state_machine = - mv_store.commit_tx(*tx_id, pager.clone(), &conn).unwrap(); - // TODO: sync IO hack - loop { - let res = state_machine.step(mv_store)?; - match res { - crate::state_machine::TransitionResult::Io(io) => { - io.wait(conn._db.io.as_ref())?; - } - crate::state_machine::TransitionResult::Continue => continue, - crate::state_machine::TransitionResult::Done(_) => break, - } - } - assert!(state_machine.is_finalized()); + if matches!(program_state.commit_state, CommitState::Ready) { + assert!( + mv_transactions.len() == 1, + "for now we only support one mv transaction in single connection" + ); + let tx_id = mv_transactions.first().unwrap(); + let state_machine = mv_store.commit_tx(*tx_id, pager.clone(), &conn).unwrap(); + program_state.commit_state = CommitState::CommitingMvcc { state_machine }; + } + let CommitState::CommitingMvcc { state_machine } = + &mut program_state.commit_state else { + panic!("invalid state for mvcc commit step") + }; + match self.step_end_mvcc_txn(state_machine, mv_store)? 
{ + IOResult::Done(_) => { + assert!(state_machine.is_finalized()); + conn.mv_tx_id.set(None); + conn.transaction_state.replace(TransactionState::None); + mv_transactions.clear(); + } + IOResult::IO(io) => { + return Ok(IOResult::IO(io)); + } } - conn.mv_tx_id.set(None); - conn.transaction_state.replace(TransactionState::None); - mv_transactions.clear(); } Ok(IOResult::Done(())) } else { @@ -579,7 +590,7 @@ impl Program { auto_commit, program_state.commit_state ); - if program_state.commit_state == CommitState::Committing { + if matches!(program_state.commit_state, CommitState::Committing) { let TransactionState::Write { .. } = connection.transaction_state.get() else { unreachable!("invalid state for write commit step") }; @@ -644,6 +655,25 @@ impl Program { Ok(IOResult::Done(())) } + #[instrument(skip(self, commit_state, mv_store), level = Level::DEBUG)] + fn step_end_mvcc_txn( + &self, + commit_state: &mut StateMachine>, + mv_store: &Arc, + ) -> Result> { + loop { + match commit_state.step(mv_store)? 
{ + TransitionResult::Continue => {} + TransitionResult::Io(iocompletions) => { + return Ok(IOResult::IO(iocompletions)); + } + TransitionResult::Done(_) => { + return Ok(IOResult::Done(())); + } + } + } + } + #[rustfmt::skip] pub fn explain(&self) -> String { let mut buff = String::with_capacity(1024); From b8f83e1fc03dcff7e26e190ec8cdb35173679d8a Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 2 Sep 2025 18:12:18 +0200 Subject: [PATCH 3/4] clippy and fmt stuff because if not pekka will tweet --- core/mvcc/database/tests.rs | 2 +- core/vdbe/mod.rs | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index e91dfd0cb..be3a29a5d 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -1285,7 +1285,7 @@ fn test_concurrent_writes() { println!("inserting row {write} from connection {conn_id}"); conn.current_statement = Some( conn.conn - .prepare(&format!("INSERT INTO test (x) VALUES ({write})")) + .prepare(format!("INSERT INTO test (x) VALUES ({write})")) .unwrap(), ); } diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 20ab70584..39d526564 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -40,7 +40,7 @@ use crate::{ }, metrics::StatementMetrics, }, - IOExt, RefValue, + IOExt, RefValue, }; use crate::{ @@ -214,6 +214,7 @@ impl Bitfield { } #[derive(Debug)] +#[allow(clippy::large_enum_variant)] /// The commit state of the program. 
/// There are two states: /// - Ready: The program is ready to run the next instruction, or has shut down after @@ -565,10 +566,10 @@ impl Program { let state_machine = mv_store.commit_tx(*tx_id, pager.clone(), &conn).unwrap(); program_state.commit_state = CommitState::CommitingMvcc { state_machine }; } - let CommitState::CommitingMvcc { state_machine } = - &mut program_state.commit_state else { - panic!("invalid state for mvcc commit step") - }; + let CommitState::CommitingMvcc { state_machine } = &mut program_state.commit_state + else { + panic!("invalid state for mvcc commit step") + }; match self.step_end_mvcc_txn(state_machine, mv_store)? { IOResult::Done(_) => { assert!(state_machine.is_finalized()); From 8db5cead07e6339bfe7300d4a26f356bfed7356d Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 3 Sep 2025 14:11:28 +0200 Subject: [PATCH 4/4] core/mvcc: only commit if there is a txn --- core/vdbe/mod.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 39d526564..1c844e08a 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -559,9 +559,12 @@ impl Program { let mut mv_transactions = conn.mv_transactions.borrow_mut(); if matches!(program_state.commit_state, CommitState::Ready) { assert!( - mv_transactions.len() == 1, - "for now we only support one mv transaction in single connection" + mv_transactions.len() <= 1, + "for now we only support one mv transaction in single connection, {mv_transactions:?}", ); + if mv_transactions.is_empty() { + return Ok(IOResult::Done(())); + } let tx_id = mv_transactions.first().unwrap(); let state_machine = mv_store.commit_tx(*tx_id, pager.clone(), &conn).unwrap(); program_state.commit_state = CommitState::CommitingMvcc { state_machine }; @@ -575,7 +578,9 @@ impl Program { assert!(state_machine.is_finalized()); conn.mv_tx_id.set(None); conn.transaction_state.replace(TransactionState::None); + program_state.commit_state = CommitState::Ready; 
mv_transactions.clear(); + return Ok(IOResult::Done(())); } IOResult::IO(io) => { return Ok(IOResult::IO(io));