core/state_machine: move state_machine to its own file

This commit is contained in:
Pere Diaz Bou
2025-08-01 12:48:31 +02:00
parent 27757ab4eb
commit 0f70e7101f
9 changed files with 114 additions and 105 deletions

View File

@@ -63,6 +63,12 @@ pub enum LimboError {
Busy,
#[error("Conflict: {0}")]
Conflict(String),
#[error("Transaction terminated")]
TxTerminated,
#[error("Write-write conflict")]
WriteWriteConflict,
#[error("No such transaction ID: {0}")]
NoSuchTransactionID(String),
}
#[macro_export]

View File

@@ -18,6 +18,7 @@ pub mod result;
mod schema;
#[cfg(feature = "series")]
mod series;
mod state_machine;
mod storage;
#[allow(dead_code)]
#[cfg(feature = "time")]

View File

@@ -1,6 +1,7 @@
use crate::mvcc::clock::LogicalClock;
use crate::mvcc::database::{MvStore, Result, Row, RowID};
use crate::mvcc::database::{MvStore, Row, RowID};
use crate::Pager;
use crate::Result;
use std::fmt::Debug;
use std::rc::Rc;
use std::sync::Arc;

View File

@@ -1,10 +1,14 @@
use crate::mvcc::clock::LogicalClock;
use crate::mvcc::errors::DatabaseError;
use crate::mvcc::persistent_storage::Storage;
use crate::state_machine::StateMachine;
use crate::state_machine::StateTransition;
use crate::state_machine::TransitionResult;
use crate::storage::btree::BTreeCursor;
use crate::storage::btree::BTreeKey;
use crate::types::IOResult;
use crate::types::ImmutableRecord;
use crate::LimboError;
use crate::Result;
use crate::{Connection, Pager};
use crossbeam_skiplist::{SkipMap, SkipSet};
use parking_lot::RwLock;
@@ -15,8 +19,6 @@ use std::rc::Rc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
pub type Result<T> = std::result::Result<T, DatabaseError>;
#[cfg(test)]
pub mod tests;
@@ -236,71 +238,6 @@ impl AtomicTransactionState {
}
}
pub enum TransitionResult {
Io,
Continue,
Done,
}
pub trait StateTransition {
type State;
type Context;
fn transition<'a>(&mut self, context: &Self::Context) -> Result<TransitionResult>;
fn finalize<'a>(&mut self, context: &Self::Context) -> Result<()>;
fn is_finalized(&self) -> bool;
}
pub struct StateMachine<State: StateTransition> {
state: State,
is_finalized: bool,
}
impl<State: StateTransition> StateMachine<State> {
fn new(state: State) -> Self {
Self {
state,
is_finalized: false,
}
}
}
impl<State: StateTransition> StateTransition for StateMachine<State> {
type State = State;
type Context = State::Context;
fn transition<'a>(&mut self, context: &Self::Context) -> Result<TransitionResult> {
loop {
if self.is_finalized {
unreachable!("StateMachine::transition: state machine is finalized");
}
match self.state.transition(context)? {
TransitionResult::Io => {
return Ok(TransitionResult::Io);
}
TransitionResult::Continue => {
continue;
}
TransitionResult::Done => {
assert!(self.state.is_finalized());
self.is_finalized = true;
return Ok(TransitionResult::Done);
}
}
}
}
fn finalize<'a>(&mut self, context: &Self::Context) -> Result<()> {
self.state.finalize(context)?;
self.is_finalized = true;
Ok(())
}
fn is_finalized(&self) -> bool {
self.is_finalized
}
}
#[derive(Debug)]
pub enum CommitState {
Initial,
@@ -381,10 +318,12 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
let tx = mvcc_store
.txs
.get(&self.tx_id)
.ok_or(DatabaseError::TxTerminated)?;
.ok_or(LimboError::TxTerminated)?;
let tx = tx.value().write();
match tx.state.load() {
TransactionState::Terminated => return Err(DatabaseError::TxTerminated),
TransactionState::Terminated => {
return Err(LimboError::TxTerminated);
}
_ => {
assert_eq!(tx.state, TransactionState::Active);
}
@@ -483,7 +422,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
match self.pager.begin_write_tx() {
Ok(crate::types::IOResult::Done(result)) => {
if let crate::result::LimboResult::Busy = result {
return Err(DatabaseError::Io(
return Err(LimboError::InternalError(
"Pager write transaction busy".to_string(),
));
}
@@ -495,7 +434,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
continue;
}
Err(e) => {
return Err(DatabaseError::Io(e.to_string()));
return Err(LimboError::InternalError(e.to_string()));
}
}
}
@@ -578,7 +517,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
&self.connection,
self.connection.wal_checkpoint_disabled.get(),
)
.map_err(|e| DatabaseError::Io(e.to_string()))
.map_err(|e| LimboError::InternalError(e.to_string()))
.unwrap();
if let crate::types::IOResult::Done(_) = result {
break;
@@ -689,10 +628,7 @@ impl StateTransition for WriteRowStateMachine {
let seek_key = SeekKey::TableRowId(self.row.id.row_id);
let cursor = self.cursor.as_mut().unwrap();
match cursor
.seek(seek_key, SeekOp::GE { eq_only: true })
.map_err(|e| DatabaseError::Io(e.to_string()))?
{
match cursor.seek(seek_key, SeekOp::GE { eq_only: true })? {
IOResult::Done(_) => {
self.state = WriteRowState::Insert;
Ok(TransitionResult::Continue)
@@ -709,7 +645,7 @@ impl StateTransition for WriteRowStateMachine {
match cursor
.insert(&key, true)
.map_err(|e| DatabaseError::Io(e.to_string()))?
.map_err(|e| LimboError::InternalError(e.to_string()))?
{
IOResult::Done(()) => {
tracing::trace!(
@@ -783,7 +719,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let tx = self
.txs
.get(&tx_id)
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
.ok_or(LimboError::NoSuchTransactionID(tx_id.to_string()))?;
let mut tx = tx.value().write();
assert_eq!(tx.state, TransactionState::Active);
let id = row.id;
@@ -856,7 +792,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let tx = self
.txs
.get(&tx_id)
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
.ok_or(LimboError::NoSuchTransactionID(tx_id.to_string()))?;
let tx = tx.value().read();
assert_eq!(tx.state, TransactionState::Active);
// A transaction cannot delete a version that it cannot see,
@@ -869,7 +805,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
drop(row_versions_opt);
drop(tx);
self.rollback_tx(tx_id, pager);
return Err(DatabaseError::WriteWriteConflict);
return Err(LimboError::WriteWriteConflict);
}
rv.end = Some(TxTimestampOrID::TxID(tx.tx_id));
@@ -879,7 +815,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let tx = self
.txs
.get(&tx_id)
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
.ok_or(LimboError::NoSuchTransactionID(tx_id.to_string()))?;
let mut tx = tx.value().write();
tx.insert_to_write_set(id);
return Ok(true);
@@ -1229,7 +1165,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
loop {
match cursor
.rewind()
.map_err(|e| DatabaseError::Io(e.to_string()))?
.map_err(|e| LimboError::InternalError(e.to_string()))?
{
IOResult::Done(()) => break,
IOResult::IO => {
@@ -1241,7 +1177,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
loop {
let rowid_result = cursor
.rowid()
.map_err(|e| DatabaseError::Io(e.to_string()))?;
.map_err(|e| LimboError::InternalError(e.to_string()))?;
let row_id = match rowid_result {
IOResult::Done(Some(row_id)) => row_id,
IOResult::Done(None) => break,
@@ -1252,7 +1188,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
};
match cursor
.record()
.map_err(|e| DatabaseError::Io(e.to_string()))?
.map_err(|e| LimboError::InternalError(e.to_string()))?
{
IOResult::Done(Some(record)) => {
let id = RowID { table_id, row_id };
@@ -1274,7 +1210,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
// Move to next record
match cursor
.next()
.map_err(|e| DatabaseError::Io(e.to_string()))?
.map_err(|e| LimboError::InternalError(e.to_string()))?
{
IOResult::Done(has_next) => {
if !has_next {

View File

@@ -1,13 +0,0 @@
use thiserror::Error;
#[derive(Error, Debug, PartialEq)]
pub enum DatabaseError {
#[error("no such transaction ID: `{0}`")]
NoSuchTransactionID(u64),
#[error("transaction aborted because of a write-write conflict")]
WriteWriteConflict,
#[error("transaction is terminated")]
TxTerminated,
#[error("I/O error: {0}")]
Io(String),
}

View File

@@ -34,7 +34,6 @@
pub mod clock;
pub mod cursor;
pub mod database;
pub mod errors;
pub mod persistent_storage;
pub use clock::LocalClock;

View File

@@ -1,7 +1,7 @@
use std::fmt::Debug;
use crate::mvcc::database::{LogRecord, Result};
use crate::mvcc::errors::DatabaseError;
use crate::mvcc::database::LogRecord;
use crate::{LimboError, Result};
#[derive(Debug)]
pub enum Storage {
@@ -24,7 +24,7 @@ impl Storage {
pub fn read_tx_log(&self) -> Result<Vec<LogRecord>> {
match self {
Self::Noop => Err(DatabaseError::Io(
Self::Noop => Err(LimboError::InternalError(
"cannot read from Noop storage".to_string(),
)),
}

79
core/state_machine.rs Normal file
View File

@@ -0,0 +1,79 @@
use crate::Result;
pub enum TransitionResult {
Io,
Continue,
Done,
}
/// A generic trait for state machines.
pub trait StateTransition {
type State;
type Context;
/// Transition the state machine to the next state.
///
/// Returns `TransitionResult::Io` if the state machine needs to perform an IO operation.
/// Returns `TransitionResult::Continue` if the state machine needs to continue.
/// Returns `TransitionResult::Done` if the state machine is done.
fn transition<'a>(&mut self, context: &Self::Context) -> Result<TransitionResult>;
/// Finalize the state machine.
///
/// This is called when the state machine is done.
fn finalize<'a>(&mut self, context: &Self::Context) -> Result<()>;
/// Check if the state machine is finalized.
fn is_finalized(&self) -> bool;
}
pub struct StateMachine<State: StateTransition> {
state: State,
is_finalized: bool,
}
/// A generic state machine that loops calling `transition` until it returns `TransitionResult::Done` or `TransitionResult::Io`.
impl<State: StateTransition> StateMachine<State> {
pub fn new(state: State) -> Self {
Self {
state,
is_finalized: false,
}
}
}
impl<State: StateTransition> StateTransition for StateMachine<State> {
type State = State;
type Context = State::Context;
fn transition<'a>(&mut self, context: &Self::Context) -> Result<TransitionResult> {
loop {
if self.is_finalized {
unreachable!("StateMachine::transition: state machine is finalized");
}
match self.state.transition(context)? {
TransitionResult::Io => {
return Ok(TransitionResult::Io);
}
TransitionResult::Continue => {
continue;
}
TransitionResult::Done => {
assert!(self.state.is_finalized());
self.is_finalized = true;
return Ok(TransitionResult::Done);
}
}
}
}
fn finalize<'a>(&mut self, context: &Self::Context) -> Result<()> {
self.state.finalize(context)?;
self.is_finalized = true;
Ok(())
}
fn is_finalized(&self) -> bool {
self.is_finalized
}
}

View File

@@ -27,7 +27,7 @@ pub mod sorter;
use crate::{
error::LimboError,
function::{AggFunc, FuncCtx},
mvcc::database::StateTransition,
state_machine::StateTransition,
storage::sqlite3_ondisk::SmallVec,
translate::plan::TableReferences,
types::{IOResult, RawSlice, TextRef},