mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-27 11:54:30 +01:00
Merge 'mvcc: simplify StateMachine' from Jussi Saurio
`TransitionResult::Continue` is an internal implementation detail that tells an invocation of `StateMachine::step()` to continue looping, but it is of no use to upstream callers. For this reason, just return an IOResult from StateMachine::step() which simplifies the result handling. Reviewed-by: Pere Diaz Bou <pere-altea@homail.com> Closes #3248
This commit is contained in:
@@ -5,8 +5,7 @@ use criterion::{criterion_group, criterion_main, Criterion, Throughput};
|
||||
use pprof::criterion::{Output, PProfProfiler};
|
||||
use turso_core::mvcc::clock::LocalClock;
|
||||
use turso_core::mvcc::database::{MvStore, Row, RowID};
|
||||
use turso_core::state_machine::{StateTransition, TransitionResult};
|
||||
use turso_core::types::{ImmutableRecord, Text};
|
||||
use turso_core::types::{IOResult, ImmutableRecord, Text};
|
||||
use turso_core::{Connection, Database, MemoryIO, Value};
|
||||
|
||||
struct BenchDb {
|
||||
@@ -55,9 +54,8 @@ fn bench(c: &mut Criterion) {
|
||||
loop {
|
||||
let res = sm.step(mv_store).unwrap();
|
||||
match res {
|
||||
TransitionResult::Io(io) => io.wait(db._db.io.as_ref()).unwrap(),
|
||||
TransitionResult::Continue => continue,
|
||||
TransitionResult::Done(_) => break,
|
||||
IOResult::IO(io) => io.wait(db._db.io.as_ref()).unwrap(),
|
||||
IOResult::Done(_) => break,
|
||||
}
|
||||
}
|
||||
})
|
||||
@@ -85,9 +83,8 @@ fn bench(c: &mut Criterion) {
|
||||
loop {
|
||||
let res = sm.step(mv_store).unwrap();
|
||||
match res {
|
||||
TransitionResult::Io(io) => io.wait(db._db.io.as_ref()).unwrap(),
|
||||
TransitionResult::Continue => continue,
|
||||
TransitionResult::Done(_) => break,
|
||||
IOResult::IO(io) => io.wait(db._db.io.as_ref()).unwrap(),
|
||||
IOResult::Done(_) => break,
|
||||
}
|
||||
}
|
||||
})
|
||||
@@ -121,9 +118,8 @@ fn bench(c: &mut Criterion) {
|
||||
loop {
|
||||
let res = sm.step(mv_store).unwrap();
|
||||
match res {
|
||||
TransitionResult::Io(io) => io.wait(db._db.io.as_ref()).unwrap(),
|
||||
TransitionResult::Continue => continue,
|
||||
TransitionResult::Done(_) => break,
|
||||
IOResult::IO(io) => io.wait(db._db.io.as_ref()).unwrap(),
|
||||
IOResult::Done(_) => break,
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
@@ -708,11 +708,8 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
|
||||
} => {
|
||||
let write_row_state_machine = self.write_row_state_machine.as_mut().unwrap();
|
||||
match write_row_state_machine.step(&())? {
|
||||
TransitionResult::Io(io) => return Ok(TransitionResult::Io(io)),
|
||||
TransitionResult::Continue => {
|
||||
return Ok(TransitionResult::Continue);
|
||||
}
|
||||
TransitionResult::Done(_) => {
|
||||
IOResult::IO(io) => return Ok(TransitionResult::Io(io)),
|
||||
IOResult::Done(_) => {
|
||||
let requires_seek = {
|
||||
if let Some(next_id) = self.write_set.get(*write_set_index + 1) {
|
||||
let current_id = &self.write_set[*write_set_index];
|
||||
@@ -744,11 +741,8 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
|
||||
} => {
|
||||
let delete_row_state_machine = self.delete_row_state_machine.as_mut().unwrap();
|
||||
match delete_row_state_machine.step(&())? {
|
||||
TransitionResult::Io(io) => return Ok(TransitionResult::Io(io)),
|
||||
TransitionResult::Continue => {
|
||||
return Ok(TransitionResult::Continue);
|
||||
}
|
||||
TransitionResult::Done(_) => {
|
||||
IOResult::IO(io) => return Ok(TransitionResult::Io(io)),
|
||||
IOResult::Done(_) => {
|
||||
self.state = CommitState::WriteRow {
|
||||
end_ts: *end_ts,
|
||||
write_set_index: *write_set_index + 1,
|
||||
|
||||
@@ -803,11 +803,10 @@ pub(crate) fn commit_tx(
|
||||
loop {
|
||||
let res = sm.step(&mv_store)?;
|
||||
match res {
|
||||
crate::state_machine::TransitionResult::Io(io) => {
|
||||
IOResult::IO(io) => {
|
||||
io.wait(conn.db.io.as_ref())?;
|
||||
}
|
||||
crate::state_machine::TransitionResult::Continue => continue,
|
||||
crate::state_machine::TransitionResult::Done(_) => break,
|
||||
IOResult::Done(_) => break,
|
||||
}
|
||||
}
|
||||
assert!(sm.is_finalized());
|
||||
@@ -827,11 +826,10 @@ pub(crate) fn commit_tx_no_conn(
|
||||
loop {
|
||||
let res = sm.step(&mv_store)?;
|
||||
match res {
|
||||
crate::state_machine::TransitionResult::Io(io) => {
|
||||
IOResult::IO(io) => {
|
||||
io.wait(conn.db.io.as_ref())?;
|
||||
}
|
||||
crate::state_machine::TransitionResult::Continue => continue,
|
||||
crate::state_machine::TransitionResult::Done(_) => break,
|
||||
IOResult::Done(_) => break,
|
||||
}
|
||||
}
|
||||
assert!(sm.is_finalized());
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
use crate::{types::IOCompletions, Result};
|
||||
use crate::{
|
||||
types::{IOCompletions, IOResult},
|
||||
Result,
|
||||
};
|
||||
|
||||
pub enum TransitionResult<Result> {
|
||||
Io(IOCompletions),
|
||||
@@ -41,20 +44,15 @@ impl<State: StateTransition> StateMachine<State> {
|
||||
is_finalized: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<State: StateTransition> StateTransition for StateMachine<State> {
|
||||
type Context = State::Context;
|
||||
type SMResult = State::SMResult;
|
||||
|
||||
fn step<'a>(&mut self, context: &Self::Context) -> Result<TransitionResult<Self::SMResult>> {
|
||||
pub fn step(&mut self, context: &State::Context) -> Result<IOResult<State::SMResult>> {
|
||||
loop {
|
||||
if self.is_finalized {
|
||||
unreachable!("StateMachine::transition: state machine is finalized");
|
||||
}
|
||||
match self.state.step(context)? {
|
||||
TransitionResult::Io(io) => {
|
||||
return Ok(TransitionResult::Io(io));
|
||||
return Ok(IOResult::IO(io));
|
||||
}
|
||||
TransitionResult::Continue => {
|
||||
continue;
|
||||
@@ -62,19 +60,19 @@ impl<State: StateTransition> StateTransition for StateMachine<State> {
|
||||
TransitionResult::Done(result) => {
|
||||
assert!(self.state.is_finalized());
|
||||
self.is_finalized = true;
|
||||
return Ok(TransitionResult::Done(result));
|
||||
return Ok(IOResult::Done(result));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn finalize(&mut self, context: &Self::Context) -> Result<()> {
|
||||
pub fn finalize(&mut self, context: &State::Context) -> Result<()> {
|
||||
self.state.finalize(context)?;
|
||||
self.is_finalized = true;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn is_finalized(&self) -> bool {
|
||||
pub fn is_finalized(&self) -> bool {
|
||||
self.is_finalized
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,7 +92,6 @@ impl Default for ChecksumContext {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::CompletionError;
|
||||
|
||||
fn get_random_page() -> [u8; CHECKSUM_PAGE_SIZE] {
|
||||
let mut page = [0u8; CHECKSUM_PAGE_SIZE];
|
||||
|
||||
@@ -29,7 +29,7 @@ use crate::{
|
||||
error::LimboError,
|
||||
function::{AggFunc, FuncCtx},
|
||||
mvcc::{database::CommitStateMachine, LocalClock},
|
||||
state_machine::{StateMachine, StateTransition, TransitionResult},
|
||||
state_machine::StateMachine,
|
||||
storage::sqlite3_ondisk::SmallVec,
|
||||
translate::{collate::CollationSeq, plan::TableReferences},
|
||||
types::{IOCompletions, IOResult, RawSlice, TextRef},
|
||||
@@ -932,17 +932,7 @@ impl Program {
|
||||
commit_state: &mut StateMachine<CommitStateMachine<LocalClock>>,
|
||||
mv_store: &Arc<MvStore>,
|
||||
) -> Result<IOResult<()>> {
|
||||
loop {
|
||||
match commit_state.step(mv_store)? {
|
||||
TransitionResult::Continue => {}
|
||||
TransitionResult::Io(iocompletions) => {
|
||||
return Ok(IOResult::IO(iocompletions));
|
||||
}
|
||||
TransitionResult::Done(_) => {
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
}
|
||||
}
|
||||
commit_state.step(mv_store)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user