From fbe7e685ceafc96bb8e7c39ee6f362eba31ee9c0 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Tue, 12 Aug 2025 15:38:02 -0300 Subject: [PATCH] adjust mvcc code to return completions in state machines --- core/benches/mvcc_benchmark.rs | 6 +++--- core/lib.rs | 2 +- core/mvcc/database/mod.rs | 35 +++++++++++----------------------- core/mvcc/database/tests.rs | 8 ++++++-- core/state_machine.rs | 8 ++++---- core/vdbe/mod.rs | 4 +++- 6 files changed, 28 insertions(+), 35 deletions(-) diff --git a/core/benches/mvcc_benchmark.rs b/core/benches/mvcc_benchmark.rs index 64bf7cc75..69754f416 100644 --- a/core/benches/mvcc_benchmark.rs +++ b/core/benches/mvcc_benchmark.rs @@ -53,7 +53,7 @@ fn bench(c: &mut Criterion) { loop { let res = sm.step(mv_store).unwrap(); match res { - TransitionResult::Io => {} + TransitionResult::Io(io) => io.wait(db._db.io.as_ref()).unwrap(), TransitionResult::Continue => continue, TransitionResult::Done(_) => break, } @@ -83,7 +83,7 @@ fn bench(c: &mut Criterion) { loop { let res = sm.step(mv_store).unwrap(); match res { - TransitionResult::Io => {} + TransitionResult::Io(io) => io.wait(db._db.io.as_ref()).unwrap(), TransitionResult::Continue => continue, TransitionResult::Done(_) => break, } @@ -120,7 +120,7 @@ fn bench(c: &mut Criterion) { loop { let res = sm.step(mv_store).unwrap(); match res { - TransitionResult::Io => {} + TransitionResult::Io(io) => io.wait(db._db.io.as_ref()).unwrap(), TransitionResult::Continue => continue, TransitionResult::Done(_) => break, } diff --git a/core/lib.rs b/core/lib.rs index b8324eb1e..b83936cc4 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -124,7 +124,7 @@ pub struct Database { schema: Mutex>, db_file: Arc, path: String, - io: Arc, + pub io: Arc, // Shared structures of a Database are the parts that are common to multiple threads that might // create DB connections. _shared_page_cache: Arc>, diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index d4e460b43..e78b9dcd0 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -7,6 +7,7 @@ use crate::storage::btree::BTreeCursor; use crate::storage::btree::BTreeKey; use crate::types::IOResult; use crate::types::ImmutableRecord; +use crate::IOExt; use crate::LimboError; use crate::Result; use crate::{Connection, Pager}; @@ -419,25 +420,11 @@ impl StateTransition for CommitStateMachine { // 2. Choose a txn to write depending on some heuristics like amount of frames will be written. // 3. .. // - loop { - match self.pager.begin_write_tx() { - Ok(crate::types::IOResult::Done(result)) => { - if let crate::result::LimboResult::Busy = result { - return Err(LimboError::InternalError( - "Pager write transaction busy".to_string(), - )); - } - break; - } - Ok(crate::types::IOResult::IO) => { - // FIXME: this is a hack to make the pager run the IO loop - self.pager.io.run_once().unwrap(); - continue; - } - Err(e) => { - return Err(LimboError::InternalError(e.to_string())); - } - } + let result = self.pager.io.block(|| self.pager.begin_write_tx())?; + if let crate::result::LimboResult::Busy = result { + return Err(LimboError::InternalError( + "Pager write transaction busy".to_string(), + )); } self.state = CommitState::WriteRow { end_ts, @@ -492,7 +479,7 @@ impl StateTransition for CommitStateMachine { } => { let write_row_state_machine = self.write_row_state_machine.as_mut().unwrap(); match write_row_state_machine.step(&())? { - TransitionResult::Io => return Ok(TransitionResult::Io), + TransitionResult::Io(io) => return Ok(TransitionResult::Io(io)), TransitionResult::Continue => { return Ok(TransitionResult::Continue); } @@ -633,8 +620,8 @@ impl StateTransition for WriteRowStateMachine { self.state = WriteRowState::Insert; Ok(TransitionResult::Continue) } - IOResult::IO => { - return Ok(TransitionResult::Io); + IOResult::IO(io) => { + return Ok(TransitionResult::Io(io)); } } } @@ -656,8 +643,8 @@ impl StateTransition for WriteRowStateMachine { self.finalize(&())?; Ok(TransitionResult::Done(())) } - IOResult::IO => { - return Ok(TransitionResult::Io); + IOResult::IO(io) => { + return Ok(TransitionResult::Io(io)); } } } diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 4f6d4a10b..2ae781891 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -718,7 +718,9 @@ pub(crate) fn commit_tx( loop { let res = sm.step(&mv_store)?; match res { - crate::state_machine::TransitionResult::Io => {} + crate::state_machine::TransitionResult::Io(io) => { + io.wait(conn._db.io.as_ref())?; + } crate::state_machine::TransitionResult::Continue => continue, crate::state_machine::TransitionResult::Done(_) => break, } @@ -740,7 +742,9 @@ pub(crate) fn commit_tx_no_conn( loop { let res = sm.step(&mv_store)?; match res { - crate::state_machine::TransitionResult::Io => {} + crate::state_machine::TransitionResult::Io(io) => { + io.wait(conn._db.io.as_ref())?; + } crate::state_machine::TransitionResult::Continue => continue, crate::state_machine::TransitionResult::Done(_) => break, } diff --git a/core/state_machine.rs b/core/state_machine.rs index 0ee710ecd..0e3f30816 100644 --- a/core/state_machine.rs +++ b/core/state_machine.rs @@ -1,7 +1,7 @@ -use crate::Result; +use crate::{types::IOCompletions, Result}; pub enum TransitionResult { - Io, + Io(IOCompletions), Continue, Done(Result), } @@ -52,8 +52,8 @@ impl StateTransition for StateMachine { unreachable!("StateMachine::transition: state machine is finalized"); } match self.state.step(context)? { - TransitionResult::Io => { - return Ok(TransitionResult::Io); + TransitionResult::Io(io) => { + return Ok(TransitionResult::Io(io)); } TransitionResult::Continue => { continue; diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index cb411380e..9232887d3 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -511,7 +511,9 @@ impl Program { loop { let res = state_machine.step(mv_store)?; match res { - crate::state_machine::TransitionResult::Io => {} + crate::state_machine::TransitionResult::Io(io) => { + io.wait(conn._db.io.as_ref())?; + } crate::state_machine::TransitionResult::Continue => continue, crate::state_machine::TransitionResult::Done(_) => break, }