diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 13e28c437..6727784e5 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -271,6 +271,15 @@ pub struct CommitStateMachine { _phantom: PhantomData, } +impl Debug for CommitStateMachine { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CommitStateMachine") + .field("state", &self.state) + .field("is_finalized", &self.is_finalized) + .finish() + } +} + pub struct WriteRowStateMachine { state: WriteRowState, is_finalized: bool, diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 438e897ec..be3a29a5d 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -654,10 +654,10 @@ fn test_future_row() { use crate::mvcc::cursor::MvccLazyCursor; use crate::mvcc::database::{MvStore, Row, RowID}; use crate::types::Text; -use crate::MemoryIO; use crate::RefValue; use crate::Value; use crate::{Database, StepResult}; +use crate::{MemoryIO, Statement}; // Simple atomic clock implementation for testing @@ -1243,3 +1243,79 @@ fn get_rows(conn: &Arc, query: &str) -> Vec> { } rows } + +/// Steps INSERT statements from two connections in interleaved rounds; pending I/O is driven +/// once per round, after every connection has been stepped, until both insert queues are drained. +#[test] +#[ignore] +fn test_concurrent_writes() { + struct ConnectionState { + conn: Arc, + inserts: Vec, + current_statement: Option, + } + let db = MvccTestDbNoConn::new_with_random_db(); + let mut connections = Vec::new(); + { + let conn = db.connect(); + conn.execute("CREATE TABLE test (x)").unwrap(); + conn.close().unwrap(); + } + for i in 0..2 { + let conn = db.connect(); + let mut inserts = ((100 * i)..(100 * (i + 1))).collect::>(); + inserts.reverse(); + connections.push(ConnectionState { + conn, + inserts, + current_statement: None, + }); + } + + loop { + // Work remains while any connection has queued inserts or a statement still in flight. + let mut all_finished = true; + for conn in &connections { + if !conn.inserts.is_empty() || conn.current_statement.is_some() { + all_finished = false; + break; + } + } + for (conn_id, conn) in connections.iter_mut().enumerate() { + println!("connection {conn_id} 
inserts: {:?}", conn.inserts); + if conn.current_statement.is_none() && !conn.inserts.is_empty() { + let write = conn.inserts.pop().unwrap(); + println!("inserting row {write} from connection {conn_id}"); + conn.current_statement = Some( + conn.conn + .prepare(format!("INSERT INTO test (x) VALUES ({write})")) + .unwrap(), + ); + } + if conn.current_statement.is_none() { + continue; + } + let stmt = conn.current_statement.as_mut().unwrap(); + match stmt.step().unwrap() { + // These should be the only possible results under write concurrency: + // no rows because INSERT doesn't return any, + // no interrupt because INSERT doesn't interrupt, + // no busy because MVCC inserts should allow multiple concurrent writers. + StepResult::Done => { + conn.current_statement = None; + } + StepResult::IO => { + // Skip I/O here; it is performed only after all the statements have been stepped. + } + _ => { + unreachable!() + } + } + } + db.get_db().io.run_once().unwrap(); + + if all_finished { + break; + } + } +} diff --git a/core/state_machine.rs b/core/state_machine.rs index 0e3f30816..0d776df10 100644 --- a/core/state_machine.rs +++ b/core/state_machine.rs @@ -27,6 +27,7 @@ pub trait StateTransition { fn is_finalized(&self) -> bool; } +#[derive(Debug)] pub struct StateMachine { state: State, is_finalized: bool, diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 42e034320..7a989d0c5 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2164,7 +2164,7 @@ pub fn op_auto_commit( insn ); let conn = program.connection.clone(); - if state.commit_state == CommitState::Committing { + if matches!(state.commit_state, CommitState::Committing) { return program .commit_txn(pager.clone(), state, mv_store, *rollback) .map(Into::into); diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index f3a9425a8..1c844e08a 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -28,15 +28,18 @@ pub mod sorter; use crate::{ error::LimboError, function::{AggFunc, FuncCtx}, - 
state_machine::StateTransition, + mvcc::{database::CommitStateMachine, LocalClock}, + state_machine::{StateMachine, StateTransition, TransitionResult}, storage::sqlite3_ondisk::SmallVec, translate::{collate::CollationSeq, plan::TableReferences}, types::{IOCompletions, IOResult, RawSlice, TextRef}, - vdbe::execute::{ - OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, OpInsertState, - OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, OpSeekState, + vdbe::{ + execute::{ + OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, OpInsertState, + OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, OpSeekState, + }, + metrics::StatementMetrics, }, - vdbe::metrics::StatementMetrics, IOExt, RefValue, }; @@ -210,7 +213,8 @@ impl Bitfield { } } -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] /// The commit state of the program. /// There are two states: /// - Ready: The program is ready to run the next instruction, or has shut down after @@ -220,6 +224,9 @@ impl Bitfield { enum CommitState { Ready, Committing, + CommittingMvcc { + state_machine: StateMachine>, + }, } #[derive(Debug, Clone)] @@ -550,25 +557,35 @@ impl Program { if auto_commit { // FIXME: we don't want to commit stuff from other programs. 
let mut mv_transactions = conn.mv_transactions.borrow_mut(); - for tx_id in mv_transactions.iter() { - let mut state_machine = - mv_store.commit_tx(*tx_id, pager.clone(), &conn).unwrap(); - // TODO: sync IO hack - loop { - let res = state_machine.step(mv_store)?; - match res { - crate::state_machine::TransitionResult::Io(io) => { - io.wait(conn._db.io.as_ref())?; - } - crate::state_machine::TransitionResult::Continue => continue, - crate::state_machine::TransitionResult::Done(_) => break, - } + if matches!(program_state.commit_state, CommitState::Ready) { + assert!( + mv_transactions.len() <= 1, + "for now we only support one mv transaction in single connection, {mv_transactions:?}", + ); + if mv_transactions.is_empty() { + return Ok(IOResult::Done(())); + } + let tx_id = mv_transactions.first().unwrap(); + let state_machine = mv_store.commit_tx(*tx_id, pager.clone(), &conn).unwrap(); + program_state.commit_state = CommitState::CommittingMvcc { state_machine }; + } + let CommitState::CommittingMvcc { state_machine } = &mut program_state.commit_state + else { + panic!("invalid state for mvcc commit step") + }; + match self.step_end_mvcc_txn(state_machine, mv_store)? { + IOResult::Done(_) => { + assert!(state_machine.is_finalized()); + conn.mv_tx_id.set(None); + conn.transaction_state.replace(TransactionState::None); + program_state.commit_state = CommitState::Ready; + mv_transactions.clear(); + return Ok(IOResult::Done(())); + } + IOResult::IO(io) => { + return Ok(IOResult::IO(io)); } - assert!(state_machine.is_finalized()); } - conn.mv_tx_id.set(None); - conn.transaction_state.replace(TransactionState::None); - mv_transactions.clear(); } Ok(IOResult::Done(())) } else { @@ -579,7 +596,7 @@ impl Program { auto_commit, program_state.commit_state ); - if program_state.commit_state == CommitState::Committing { + if matches!(program_state.commit_state, CommitState::Committing) { let TransactionState::Write { .. 
} = connection.transaction_state.get() else { unreachable!("invalid state for write commit step") }; @@ -644,6 +661,28 @@ impl Program { Ok(IOResult::Done(())) } + /// Steps the MVCC commit state machine until it finishes or has to wait for I/O. + /// Returns `IOResult::IO` with the pending completions when the machine yields on I/O, + /// and `IOResult::Done(())` once it reports `Done`; step errors are propagated. + #[instrument(skip(self, state_machine, mv_store), level = Level::DEBUG)] + fn step_end_mvcc_txn( + &self, + state_machine: &mut StateMachine>, + mv_store: &Arc, + ) -> Result> { + loop { + match state_machine.step(mv_store)? { + TransitionResult::Continue => {} + TransitionResult::Io(iocompletions) => { + return Ok(IOResult::IO(iocompletions)); + } + TransitionResult::Done(_) => { + return Ok(IOResult::Done(())); + } + } + } + } + #[rustfmt::skip] pub fn explain(&self) -> String { let mut buff = String::with_capacity(1024);