From 0f70e7101fc9f2b2dcd9fd786c5bef18655d6e4b Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Fri, 1 Aug 2025 12:48:31 +0200 Subject: [PATCH] core/state_machine: move state_machine to its own file --- core/error.rs | 6 ++ core/lib.rs | 1 + core/mvcc/cursor.rs | 3 +- core/mvcc/database/mod.rs | 108 ++++++---------------------- core/mvcc/errors.rs | 13 ---- core/mvcc/mod.rs | 1 - core/mvcc/persistent_storage/mod.rs | 6 +- core/state_machine.rs | 79 ++++++++++++++++++++ core/vdbe/mod.rs | 2 +- 9 files changed, 114 insertions(+), 105 deletions(-) delete mode 100644 core/mvcc/errors.rs create mode 100644 core/state_machine.rs diff --git a/core/error.rs b/core/error.rs index 5e5ac89bf..97c6d563b 100644 --- a/core/error.rs +++ b/core/error.rs @@ -63,6 +63,12 @@ pub enum LimboError { Busy, #[error("Conflict: {0}")] Conflict(String), + #[error("Transaction terminated")] + TxTerminated, + #[error("Write-write conflict")] + WriteWriteConflict, + #[error("No such transaction ID: {0}")] + NoSuchTransactionID(String), } #[macro_export] diff --git a/core/lib.rs b/core/lib.rs index 229edabb3..41fe11f36 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -18,6 +18,7 @@ pub mod result; mod schema; #[cfg(feature = "series")] mod series; +mod state_machine; mod storage; #[allow(dead_code)] #[cfg(feature = "time")] diff --git a/core/mvcc/cursor.rs b/core/mvcc/cursor.rs index 3aa3bd490..b6965a4c4 100644 --- a/core/mvcc/cursor.rs +++ b/core/mvcc/cursor.rs @@ -1,6 +1,7 @@ use crate::mvcc::clock::LogicalClock; -use crate::mvcc::database::{MvStore, Result, Row, RowID}; +use crate::mvcc::database::{MvStore, Row, RowID}; use crate::Pager; +use crate::Result; use std::fmt::Debug; use std::rc::Rc; use std::sync::Arc; diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 596825eac..8e29e1740 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1,10 +1,14 @@ use crate::mvcc::clock::LogicalClock; -use crate::mvcc::errors::DatabaseError; use crate::mvcc::persistent_storage::Storage; +use crate::state_machine::StateMachine; +use crate::state_machine::StateTransition; +use crate::state_machine::TransitionResult; use crate::storage::btree::BTreeCursor; use crate::storage::btree::BTreeKey; use crate::types::IOResult; use crate::types::ImmutableRecord; +use crate::LimboError; +use crate::Result; use crate::{Connection, Pager}; use crossbeam_skiplist::{SkipMap, SkipSet}; use parking_lot::RwLock; @@ -15,8 +19,6 @@ use std::rc::Rc; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -pub type Result = std::result::Result; - #[cfg(test)] pub mod tests; @@ -236,71 +238,6 @@ impl AtomicTransactionState { } } -pub enum TransitionResult { - Io, - Continue, - Done, -} - -pub trait StateTransition { - type State; - type Context; - - fn transition<'a>(&mut self, context: &Self::Context) -> Result; - fn finalize<'a>(&mut self, context: &Self::Context) -> Result<()>; - fn is_finalized(&self) -> bool; -} - -pub struct StateMachine { - state: State, - is_finalized: bool, -} - -impl StateMachine { - fn new(state: State) -> Self { - Self { - state, - is_finalized: false, - } - } -} - -impl StateTransition for StateMachine { - type State = State; - type Context = State::Context; - - fn transition<'a>(&mut self, context: &Self::Context) -> Result { - loop { - if self.is_finalized { - unreachable!("StateMachine::transition: state machine is finalized"); - } - match self.state.transition(context)? { - TransitionResult::Io => { - return Ok(TransitionResult::Io); - } - TransitionResult::Continue => { - continue; - } - TransitionResult::Done => { - assert!(self.state.is_finalized()); - self.is_finalized = true; - return Ok(TransitionResult::Done); - } - } - } - } - - fn finalize<'a>(&mut self, context: &Self::Context) -> Result<()> { - self.state.finalize(context)?; - self.is_finalized = true; - Ok(()) - } - - fn is_finalized(&self) -> bool { - self.is_finalized - } -} - #[derive(Debug)] pub enum CommitState { Initial, @@ -381,10 +318,12 @@ impl StateTransition for CommitStateMachine { let tx = mvcc_store .txs .get(&self.tx_id) - .ok_or(DatabaseError::TxTerminated)?; + .ok_or(LimboError::TxTerminated)?; let tx = tx.value().write(); match tx.state.load() { - TransactionState::Terminated => return Err(DatabaseError::TxTerminated), + TransactionState::Terminated => { + return Err(LimboError::TxTerminated); + } _ => { assert_eq!(tx.state, TransactionState::Active); } @@ -483,7 +422,7 @@ impl StateTransition for CommitStateMachine { match self.pager.begin_write_tx() { Ok(crate::types::IOResult::Done(result)) => { if let crate::result::LimboResult::Busy = result { - return Err(DatabaseError::Io( + return Err(LimboError::InternalError( "Pager write transaction busy".to_string(), )); } @@ -495,7 +434,7 @@ impl StateTransition for CommitStateMachine { continue; } Err(e) => { - return Err(DatabaseError::Io(e.to_string())); + return Err(LimboError::InternalError(e.to_string())); } } } @@ -578,7 +517,7 @@ impl StateTransition for CommitStateMachine { &self.connection, self.connection.wal_checkpoint_disabled.get(), ) - .map_err(|e| DatabaseError::Io(e.to_string())) + .map_err(|e| LimboError::InternalError(e.to_string())) .unwrap(); if let crate::types::IOResult::Done(_) = result { break; @@ -689,10 +628,7 @@ impl StateTransition for WriteRowStateMachine { let seek_key = SeekKey::TableRowId(self.row.id.row_id); let cursor = self.cursor.as_mut().unwrap(); - match cursor - .seek(seek_key, SeekOp::GE { eq_only: true }) - .map_err(|e| DatabaseError::Io(e.to_string()))? - { + match cursor.seek(seek_key, SeekOp::GE { eq_only: true })? { IOResult::Done(_) => { self.state = WriteRowState::Insert; Ok(TransitionResult::Continue) @@ -709,7 +645,7 @@ impl StateTransition for WriteRowStateMachine { match cursor .insert(&key, true) - .map_err(|e| DatabaseError::Io(e.to_string()))? + .map_err(|e| LimboError::InternalError(e.to_string()))? { IOResult::Done(()) => { tracing::trace!( @@ -783,7 +719,7 @@ impl MvStore { let tx = self .txs .get(&tx_id) - .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; + .ok_or(LimboError::NoSuchTransactionID(tx_id.to_string()))?; let mut tx = tx.value().write(); assert_eq!(tx.state, TransactionState::Active); let id = row.id; @@ -856,7 +792,7 @@ impl MvStore { let tx = self .txs .get(&tx_id) - .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; + .ok_or(LimboError::NoSuchTransactionID(tx_id.to_string()))?; let tx = tx.value().read(); assert_eq!(tx.state, TransactionState::Active); // A transaction cannot delete a version that it cannot see, @@ -869,7 +805,7 @@ impl MvStore { drop(row_versions_opt); drop(tx); self.rollback_tx(tx_id, pager); - return Err(DatabaseError::WriteWriteConflict); + return Err(LimboError::WriteWriteConflict); } rv.end = Some(TxTimestampOrID::TxID(tx.tx_id)); @@ -879,7 +815,7 @@ impl MvStore { let tx = self .txs .get(&tx_id) - .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; + .ok_or(LimboError::NoSuchTransactionID(tx_id.to_string()))?; let mut tx = tx.value().write(); tx.insert_to_write_set(id); return Ok(true); @@ -1229,7 +1165,7 @@ impl MvStore { loop { match cursor .rewind() - .map_err(|e| DatabaseError::Io(e.to_string()))? + .map_err(|e| LimboError::InternalError(e.to_string()))? { IOResult::Done(()) => break, IOResult::IO => { @@ -1241,7 +1177,7 @@ impl MvStore { loop { let rowid_result = cursor .rowid() - .map_err(|e| DatabaseError::Io(e.to_string()))?; + .map_err(|e| LimboError::InternalError(e.to_string()))?; let row_id = match rowid_result { IOResult::Done(Some(row_id)) => row_id, IOResult::Done(None) => break, @@ -1252,7 +1188,7 @@ impl MvStore { }; match cursor .record() - .map_err(|e| DatabaseError::Io(e.to_string()))? + .map_err(|e| LimboError::InternalError(e.to_string()))? { IOResult::Done(Some(record)) => { let id = RowID { table_id, row_id }; @@ -1274,7 +1210,7 @@ impl MvStore { // Move to next record match cursor .next() - .map_err(|e| DatabaseError::Io(e.to_string()))? + .map_err(|e| LimboError::InternalError(e.to_string()))? { IOResult::Done(has_next) => { if !has_next { diff --git a/core/mvcc/errors.rs b/core/mvcc/errors.rs deleted file mode 100644 index 6cdad8ca3..000000000 --- a/core/mvcc/errors.rs +++ /dev/null @@ -1,13 +0,0 @@ -use thiserror::Error; - -#[derive(Error, Debug, PartialEq)] -pub enum DatabaseError { - #[error("no such transaction ID: `{0}`")] - NoSuchTransactionID(u64), - #[error("transaction aborted because of a write-write conflict")] - WriteWriteConflict, - #[error("transaction is terminated")] - TxTerminated, - #[error("I/O error: {0}")] - Io(String), -} diff --git a/core/mvcc/mod.rs b/core/mvcc/mod.rs index da15f0244..b45a281e6 100644 --- a/core/mvcc/mod.rs +++ b/core/mvcc/mod.rs @@ -34,7 +34,6 @@ pub mod clock; pub mod cursor; pub mod database; -pub mod errors; pub mod persistent_storage; pub use clock::LocalClock; diff --git a/core/mvcc/persistent_storage/mod.rs b/core/mvcc/persistent_storage/mod.rs index 4b9b06407..3dbd891a0 100644 --- a/core/mvcc/persistent_storage/mod.rs +++ b/core/mvcc/persistent_storage/mod.rs @@ -1,7 +1,7 @@ use std::fmt::Debug; -use crate::mvcc::database::{LogRecord, Result}; -use crate::mvcc::errors::DatabaseError; +use crate::mvcc::database::LogRecord; +use crate::{LimboError, Result}; #[derive(Debug)] pub enum Storage { @@ -24,7 +24,7 @@ impl Storage { pub fn read_tx_log(&self) -> Result> { match self { - Self::Noop => Err(DatabaseError::Io( + Self::Noop => Err(LimboError::InternalError( "cannot read from Noop storage".to_string(), )), } diff --git a/core/state_machine.rs b/core/state_machine.rs new file mode 100644 index 000000000..1a748a3f0 --- /dev/null +++ b/core/state_machine.rs @@ -0,0 +1,79 @@ +use crate::Result; + +pub enum TransitionResult { + Io, + Continue, + Done, +} + +/// A generic trait for state machines. +pub trait StateTransition { + type State; + type Context; + + /// Transition the state machine to the next state. + /// + /// Returns `TransitionResult::Io` if the state machine needs to perform an IO operation. + /// Returns `TransitionResult::Continue` if the state machine needs to continue. + /// Returns `TransitionResult::Done` if the state machine is done. + fn transition<'a>(&mut self, context: &Self::Context) -> Result; + + /// Finalize the state machine. + /// + /// This is called when the state machine is done. + fn finalize<'a>(&mut self, context: &Self::Context) -> Result<()>; + + /// Check if the state machine is finalized. + fn is_finalized(&self) -> bool; +} + +pub struct StateMachine { + state: State, + is_finalized: bool, +} + +/// A generic state machine that loops calling `transition` until it returns `TransitionResult::Done` or `TransitionResult::Io`. +impl StateMachine { + pub fn new(state: State) -> Self { + Self { + state, + is_finalized: false, + } + } +} + +impl StateTransition for StateMachine { + type State = State; + type Context = State::Context; + + fn transition<'a>(&mut self, context: &Self::Context) -> Result { + loop { + if self.is_finalized { + unreachable!("StateMachine::transition: state machine is finalized"); + } + match self.state.transition(context)? { + TransitionResult::Io => { + return Ok(TransitionResult::Io); + } + TransitionResult::Continue => { + continue; + } + TransitionResult::Done => { + assert!(self.state.is_finalized()); + self.is_finalized = true; + return Ok(TransitionResult::Done); + } + } + } + } + + fn finalize<'a>(&mut self, context: &Self::Context) -> Result<()> { + self.state.finalize(context)?; + self.is_finalized = true; + Ok(()) + } + + fn is_finalized(&self) -> bool { + self.is_finalized + } +} diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 9e3ebbb76..a9e17ba44 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -27,7 +27,7 @@ pub mod sorter; use crate::{ error::LimboError, function::{AggFunc, FuncCtx}, - mvcc::database::StateTransition, + state_machine::StateTransition, storage::sqlite3_ondisk::SmallVec, translate::plan::TableReferences, types::{IOResult, RawSlice, TextRef},