adjust mvcc code to return completions in state machines

This commit is contained in:
pedrocarlo
2025-08-12 15:38:02 -03:00
committed by Jussi Saurio
parent c1975cdfa1
commit fbe7e685ce
6 changed files with 28 additions and 35 deletions

View File

@@ -53,7 +53,7 @@ fn bench(c: &mut Criterion) {
loop {
let res = sm.step(mv_store).unwrap();
match res {
TransitionResult::Io => {}
TransitionResult::Io(io) => io.wait(db._db.io.as_ref()).unwrap(),
TransitionResult::Continue => continue,
TransitionResult::Done(_) => break,
}
@@ -83,7 +83,7 @@ fn bench(c: &mut Criterion) {
loop {
let res = sm.step(mv_store).unwrap();
match res {
TransitionResult::Io => {}
TransitionResult::Io(io) => io.wait(db._db.io.as_ref()).unwrap(),
TransitionResult::Continue => continue,
TransitionResult::Done(_) => break,
}
@@ -120,7 +120,7 @@ fn bench(c: &mut Criterion) {
loop {
let res = sm.step(mv_store).unwrap();
match res {
TransitionResult::Io => {}
TransitionResult::Io(io) => io.wait(db._db.io.as_ref()).unwrap(),
TransitionResult::Continue => continue,
TransitionResult::Done(_) => break,
}

View File

@@ -124,7 +124,7 @@ pub struct Database {
schema: Mutex<Arc<Schema>>,
db_file: Arc<dyn DatabaseStorage>,
path: String,
io: Arc<dyn IO>,
pub io: Arc<dyn IO>,
// Shared structures of a Database are the parts that are common to multiple threads that might
// create DB connections.
_shared_page_cache: Arc<RwLock<DumbLruPageCache>>,

View File

@@ -7,6 +7,7 @@ use crate::storage::btree::BTreeCursor;
use crate::storage::btree::BTreeKey;
use crate::types::IOResult;
use crate::types::ImmutableRecord;
use crate::IOExt;
use crate::LimboError;
use crate::Result;
use crate::{Connection, Pager};
@@ -419,25 +420,11 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
// 2. Choose a txn to write depending on some heuristics like amount of frames will be written.
// 3. ..
//
loop {
match self.pager.begin_write_tx() {
Ok(crate::types::IOResult::Done(result)) => {
if let crate::result::LimboResult::Busy = result {
return Err(LimboError::InternalError(
"Pager write transaction busy".to_string(),
));
}
break;
}
Ok(crate::types::IOResult::IO) => {
// FIXME: this is a hack to make the pager run the IO loop
self.pager.io.run_once().unwrap();
continue;
}
Err(e) => {
return Err(LimboError::InternalError(e.to_string()));
}
}
let result = self.pager.io.block(|| self.pager.begin_write_tx())?;
if let crate::result::LimboResult::Busy = result {
return Err(LimboError::InternalError(
"Pager write transaction busy".to_string(),
));
}
self.state = CommitState::WriteRow {
end_ts,
@@ -492,7 +479,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
} => {
let write_row_state_machine = self.write_row_state_machine.as_mut().unwrap();
match write_row_state_machine.step(&())? {
TransitionResult::Io => return Ok(TransitionResult::Io),
TransitionResult::Io(io) => return Ok(TransitionResult::Io(io)),
TransitionResult::Continue => {
return Ok(TransitionResult::Continue);
}
@@ -633,8 +620,8 @@ impl StateTransition for WriteRowStateMachine {
self.state = WriteRowState::Insert;
Ok(TransitionResult::Continue)
}
IOResult::IO => {
return Ok(TransitionResult::Io);
IOResult::IO(io) => {
return Ok(TransitionResult::Io(io));
}
}
}
@@ -656,8 +643,8 @@ impl StateTransition for WriteRowStateMachine {
self.finalize(&())?;
Ok(TransitionResult::Done(()))
}
IOResult::IO => {
return Ok(TransitionResult::Io);
IOResult::IO(io) => {
return Ok(TransitionResult::Io(io));
}
}
}

View File

@@ -718,7 +718,9 @@ pub(crate) fn commit_tx(
loop {
let res = sm.step(&mv_store)?;
match res {
crate::state_machine::TransitionResult::Io => {}
crate::state_machine::TransitionResult::Io(io) => {
io.wait(conn._db.io.as_ref())?;
}
crate::state_machine::TransitionResult::Continue => continue,
crate::state_machine::TransitionResult::Done(_) => break,
}
@@ -740,7 +742,9 @@ pub(crate) fn commit_tx_no_conn(
loop {
let res = sm.step(&mv_store)?;
match res {
crate::state_machine::TransitionResult::Io => {}
crate::state_machine::TransitionResult::Io(io) => {
io.wait(conn._db.io.as_ref())?;
}
crate::state_machine::TransitionResult::Continue => continue,
crate::state_machine::TransitionResult::Done(_) => break,
}

View File

@@ -1,7 +1,7 @@
use crate::Result;
use crate::{types::IOCompletions, Result};
pub enum TransitionResult<Result> {
Io,
Io(IOCompletions),
Continue,
Done(Result),
}
@@ -52,8 +52,8 @@ impl<State: StateTransition> StateTransition for StateMachine<State> {
unreachable!("StateMachine::transition: state machine is finalized");
}
match self.state.step(context)? {
TransitionResult::Io => {
return Ok(TransitionResult::Io);
TransitionResult::Io(io) => {
return Ok(TransitionResult::Io(io));
}
TransitionResult::Continue => {
continue;

View File

@@ -511,7 +511,9 @@ impl Program {
loop {
let res = state_machine.step(mv_store)?;
match res {
crate::state_machine::TransitionResult::Io => {}
crate::state_machine::TransitionResult::Io(io) => {
io.wait(conn._db.io.as_ref())?;
}
crate::state_machine::TransitionResult::Continue => continue,
crate::state_machine::TransitionResult::Done(_) => break,
}