Merge 'core/mvcc: make commit_txn return on I/O' from Pere Diaz Bou

Until now, `commit_txn` in MVCC worked around I/O by blocking on it
synchronously. With this change it returns to the caller whenever I/O is
pending. Together with the new test for concurrent writers, this now
surfaces `busy` errors, which is expected because commits are not queued
yet; commit queueing will come in my next PR.

Closes #2895
This commit is contained in:
Pekka Enberg
2025-09-04 21:24:10 +03:00
committed by GitHub
5 changed files with 145 additions and 26 deletions

View File

@@ -271,6 +271,15 @@ pub struct CommitStateMachine<Clock: LogicalClock> {
_phantom: PhantomData<Clock>,
}
/// Manual `Debug` implementation for [`CommitStateMachine`].
///
/// Only the `state` and `is_finalized` fields are printed; every other field
/// of the struct is left out of the output.
impl<Clock: LogicalClock> Debug for CommitStateMachine<Clock> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("CommitStateMachine");
        out.field("state", &self.state);
        out.field("is_finalized", &self.is_finalized);
        out.finish()
    }
}
pub struct WriteRowStateMachine {
state: WriteRowState,
is_finalized: bool,

View File

@@ -654,10 +654,10 @@ fn test_future_row() {
use crate::mvcc::cursor::MvccLazyCursor;
use crate::mvcc::database::{MvStore, Row, RowID};
use crate::types::Text;
use crate::MemoryIO;
use crate::RefValue;
use crate::Value;
use crate::{Database, StepResult};
use crate::{MemoryIO, Statement};
// Simple atomic clock implementation for testing
@@ -1243,3 +1243,76 @@ fn get_rows(conn: &Arc<Connection>, query: &str) -> Vec<Vec<Value>> {
}
rows
}
/// Interleaves `INSERT` statements issued from two connections against the
/// same table.
///
/// Each connection owns a queue of 100 values (0..100 and 100..200) and at
/// most one in-flight statement. Every pass of the main loop steps each
/// connection once and only then drives I/O, so statements that yielded
/// `StepResult::IO` on different connections overlap.
///
/// NOTE(review): the test is `#[ignore]`d; presumably concurrent commits can
/// currently report busy, which this test treats as unreachable — confirm
/// before enabling it.
#[test]
#[ignore]
fn test_concurrent_writes() {
    /// Per-connection bookkeeping for the interleaved writers.
    struct ConnectionState {
        conn: Arc<Connection>,
        /// Values still to be inserted; consumed from the back.
        inserts: Vec<i64>,
        /// Statement currently being stepped, if any.
        current_statement: Option<Statement>,
    }

    let db = MvccTestDbNoConn::new_with_random_db();

    // Create the schema on a throwaway connection before the writers start.
    {
        let conn = db.connect();
        conn.execute("CREATE TABLE test (x)").unwrap();
        conn.close().unwrap();
    }

    let mut connections = Vec::new();
    for i in 0..2 {
        let conn = db.connect();
        // Stored in descending order so that `pop()` hands the values out in
        // ascending order.
        let inserts = ((100 * i)..(100 * (i + 1))).rev().collect::<Vec<_>>();
        connections.push(ConnectionState {
            conn,
            inserts,
            current_statement: None,
        });
    }

    loop {
        for (conn_id, conn) in connections.iter_mut().enumerate() {
            println!("connection {conn_id} inserts: {:?}", conn.inserts);
            if conn.current_statement.is_none() && !conn.inserts.is_empty() {
                let write = conn.inserts.pop().unwrap();
                println!("inserting row {write} from connection {conn_id}");
                conn.current_statement = Some(
                    conn.conn
                        .prepare(format!("INSERT INTO test (x) VALUES ({write})"))
                        .unwrap(),
                );
            }
            let Some(stmt) = conn.current_statement.as_mut() else {
                // Nothing left to do for this connection.
                continue;
            };
            match stmt.step().unwrap() {
                // These should be the only possible cases under write concurrency:
                // - no rows, because INSERT does not return any
                // - no interrupt, because INSERT does not interrupt
                // - no busy, because MVCC inserts should allow multiple concurrent writers
                StepResult::Done => {
                    conn.current_statement = None;
                }
                StepResult::IO => {
                    // Skip doing I/O here; it is performed only after all the
                    // statements have been stepped.
                }
                _ => {
                    unreachable!()
                }
            }
        }

        db.get_db().io.run_once().unwrap();

        // Finished only when every queue is drained AND no statement is still
        // in flight. Looking at the queues alone would end the loop while
        // statements that yielded on I/O are still unfinished.
        let all_finished = connections
            .iter()
            .all(|conn| conn.inserts.is_empty() && conn.current_statement.is_none());
        if all_finished {
            break;
        }
    }
}

View File

@@ -27,6 +27,7 @@ pub trait StateTransition {
fn is_finalized(&self) -> bool;
}
#[derive(Debug)]
pub struct StateMachine<State: StateTransition> {
state: State,
is_finalized: bool,

View File

@@ -2164,7 +2164,7 @@ pub fn op_auto_commit(
insn
);
let conn = program.connection.clone();
if state.commit_state == CommitState::Committing {
if matches!(state.commit_state, CommitState::Committing) {
return program
.commit_txn(pager.clone(), state, mv_store, *rollback)
.map(Into::into);

View File

@@ -28,15 +28,18 @@ pub mod sorter;
use crate::{
error::LimboError,
function::{AggFunc, FuncCtx},
state_machine::StateTransition,
mvcc::{database::CommitStateMachine, LocalClock},
state_machine::{StateMachine, StateTransition, TransitionResult},
storage::sqlite3_ondisk::SmallVec,
translate::{collate::CollationSeq, plan::TableReferences},
types::{IOCompletions, IOResult, RawSlice, TextRef},
vdbe::execute::{
OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, OpInsertState,
OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, OpSeekState,
vdbe::{
execute::{
OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, OpInsertState,
OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, OpSeekState,
},
metrics::StatementMetrics,
},
vdbe::metrics::StatementMetrics,
IOExt, RefValue,
};
@@ -210,7 +213,8 @@ impl<const N: usize> Bitfield<N> {
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
/// The commit state of the program.
/// There are two states:
/// - Ready: The program is ready to run the next instruction, or has shut down after
@@ -220,6 +224,9 @@ impl<const N: usize> Bitfield<N> {
enum CommitState {
/// No commit is in progress. The MVCC commit path starts a new commit only
/// from this state and returns to it once the commit has finished.
Ready,
/// A commit is in progress on the path that requires the connection to be in
/// `TransactionState::Write`; `op_auto_commit` re-enters `commit_txn` while
/// in this state.
Committing,
/// An MVCC commit is in progress. Holds the commit state machine obtained
/// from `mv_store.commit_tx` so stepping can resume after pending I/O.
/// NOTE(review): the variant name is spelled "Commiting" (single `t`); it is
/// kept as-is because other code refers to it by this name.
CommitingMvcc {
state_machine: StateMachine<CommitStateMachine<LocalClock>>,
},
}
#[derive(Debug, Clone)]
@@ -550,25 +557,35 @@ impl Program {
if auto_commit {
// FIXME: we don't want to commit stuff from other programs.
let mut mv_transactions = conn.mv_transactions.borrow_mut();
for tx_id in mv_transactions.iter() {
let mut state_machine =
mv_store.commit_tx(*tx_id, pager.clone(), &conn).unwrap();
// TODO: sync IO hack
loop {
let res = state_machine.step(mv_store)?;
match res {
crate::state_machine::TransitionResult::Io(io) => {
io.wait(conn._db.io.as_ref())?;
}
crate::state_machine::TransitionResult::Continue => continue,
crate::state_machine::TransitionResult::Done(_) => break,
}
if matches!(program_state.commit_state, CommitState::Ready) {
assert!(
mv_transactions.len() <= 1,
"for now we only support one mv transaction in single connection, {mv_transactions:?}",
);
if mv_transactions.is_empty() {
return Ok(IOResult::Done(()));
}
let tx_id = mv_transactions.first().unwrap();
let state_machine = mv_store.commit_tx(*tx_id, pager.clone(), &conn).unwrap();
program_state.commit_state = CommitState::CommitingMvcc { state_machine };
}
let CommitState::CommitingMvcc { state_machine } = &mut program_state.commit_state
else {
panic!("invalid state for mvcc commit step")
};
match self.step_end_mvcc_txn(state_machine, mv_store)? {
IOResult::Done(_) => {
assert!(state_machine.is_finalized());
conn.mv_tx_id.set(None);
conn.transaction_state.replace(TransactionState::None);
program_state.commit_state = CommitState::Ready;
mv_transactions.clear();
return Ok(IOResult::Done(()));
}
IOResult::IO(io) => {
return Ok(IOResult::IO(io));
}
assert!(state_machine.is_finalized());
}
conn.mv_tx_id.set(None);
conn.transaction_state.replace(TransactionState::None);
mv_transactions.clear();
}
Ok(IOResult::Done(()))
} else {
@@ -579,7 +596,7 @@ impl Program {
auto_commit,
program_state.commit_state
);
if program_state.commit_state == CommitState::Committing {
if matches!(program_state.commit_state, CommitState::Committing) {
let TransactionState::Write { .. } = connection.transaction_state.get() else {
unreachable!("invalid state for write commit step")
};
@@ -644,6 +661,25 @@ impl Program {
Ok(IOResult::Done(()))
}
/// Drives the MVCC commit state machine until it either finishes or needs I/O.
///
/// Steps `commit_state` repeatedly while it reports `Continue`. Returns
/// `IOResult::IO` carrying the pending completions as soon as the machine
/// yields on I/O, and `IOResult::Done(())` once it reports `Done`. Any error
/// returned by a step is propagated to the caller.
#[instrument(skip(self, commit_state, mv_store), level = Level::DEBUG)]
fn step_end_mvcc_txn(
    &self,
    commit_state: &mut StateMachine<CommitStateMachine<LocalClock>>,
    mv_store: &Arc<MvStore>,
) -> Result<IOResult<()>> {
    loop {
        let transition = commit_state.step(mv_store)?;
        match transition {
            TransitionResult::Io(pending) => break Ok(IOResult::IO(pending)),
            TransitionResult::Done(_) => break Ok(IOResult::Done(())),
            // More work can be done right away; step again.
            TransitionResult::Continue => continue,
        }
    }
}
#[rustfmt::skip]
pub fn explain(&self) -> String {
let mut buff = String::with_capacity(1024);