core/mvcc: commit_tx state machine
@@ -10,6 +10,7 @@ use crossbeam_skiplist::{SkipMap, SkipSet};
use parking_lot::RwLock;
use std::collections::HashSet;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

@@ -235,6 +236,349 @@ impl AtomicTransactionState {
    }
}

pub enum TransitionResult {
    Io,
    Continue,
    Done,
}

pub trait StateTransition {
    type State;
    type Context;

    fn transition<'a>(&mut self, context: &Self::Context) -> Result<TransitionResult>;
    fn finalize<'a>(&mut self, context: &Self::Context) -> Result<()>;
    fn is_finalized(&self) -> bool;
}

pub struct StateMachine<State: StateTransition> {
    state: State,
    is_finalized: bool,
}

impl<State: StateTransition> StateMachine<State> {
    fn new(state: State) -> Self {
        Self {
            state,
            is_finalized: false,
        }
    }
}

impl<State: StateTransition> StateTransition for StateMachine<State> {
    type State = State;
    type Context = State::Context;

    fn transition<'a>(&mut self, context: &Self::Context) -> Result<TransitionResult> {
        loop {
            if self.is_finalized {
                unreachable!("StateMachine::transition: state machine is finalized");
            }
            match self.state.transition(context)? {
                TransitionResult::Io => {
                    return Ok(TransitionResult::Io);
                }
                TransitionResult::Continue => {
                    continue;
                }
                TransitionResult::Done => {
                    assert!(self.state.is_finalized());
                    self.is_finalized = true;
                    return Ok(TransitionResult::Done);
                }
            }
        }
    }

    fn finalize<'a>(&mut self, context: &Self::Context) -> Result<()> {
        self.state.finalize(context)?;
        self.is_finalized = true;
        Ok(())
    }

    fn is_finalized(&self) -> bool {
        self.is_finalized
    }
}

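The following is a minimal, self-contained sketch of how the StateTransition/StateMachine pair above is meant to be driven. It is not part of this commit: the trait is re-declared in simplified form (no State associated type, no lifetime parameters), and the CountDown state, its () context, and the String error type are illustrative stand-ins. The loop in main mirrors what StateMachine::transition does: keep stepping on Continue, hand Io back to the caller, stop once Done is reported with the state finalized.

// Simplified re-declaration of the trait plus an illustrative CountDown state.
type Result<T> = std::result::Result<T, String>;

pub enum TransitionResult {
    Io,
    Continue,
    Done,
}

pub trait StateTransition {
    type Context;
    fn transition(&mut self, context: &Self::Context) -> Result<TransitionResult>;
    fn finalize(&mut self, context: &Self::Context) -> Result<()>;
    fn is_finalized(&self) -> bool;
}

// A trivial state that finishes after a few steps; a stand-in for CommitStateMachine.
struct CountDown {
    remaining: u32,
    finalized: bool,
}

impl StateTransition for CountDown {
    type Context = ();

    fn transition(&mut self, context: &Self::Context) -> Result<TransitionResult> {
        if self.remaining == 0 {
            self.finalize(context)?;
            return Ok(TransitionResult::Done);
        }
        self.remaining -= 1;
        Ok(TransitionResult::Continue)
    }

    fn finalize(&mut self, _context: &Self::Context) -> Result<()> {
        self.finalized = true;
        Ok(())
    }

    fn is_finalized(&self) -> bool {
        self.finalized
    }
}

fn main() -> Result<()> {
    // Same control flow as StateMachine::transition: loop on Continue,
    // surface Io to the caller (who runs the IO loop), stop on Done.
    let mut state = CountDown { remaining: 3, finalized: false };
    loop {
        match state.transition(&())? {
            TransitionResult::Continue => continue,
            TransitionResult::Io => { /* the caller would drive pager IO here */ }
            TransitionResult::Done => break,
        }
    }
    assert!(state.is_finalized());
    Ok(())
}
// (end of sketch)
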
#[derive(Debug)]
pub enum CommitState {
    Initial,
    BeginPagerTxn { end_ts: u64 },
    WriteRows { end_ts: u64 },
    CommitPagerTxn { end_ts: u64 },
    Commit { end_ts: u64 },
}

struct CommitStateMachine<Clock: LogicalClock> {
    state: CommitState,
    is_finalized: bool,
    pager: Rc<Pager>,
    tx_id: TxID,
    connection: Arc<Connection>,
    write_set: Vec<RowID>,
    _phantom: PhantomData<Clock>,
}

impl<Clock: LogicalClock> CommitStateMachine<Clock> {
    fn new(state: CommitState, pager: Rc<Pager>, tx_id: TxID, connection: Arc<Connection>) -> Self {
        Self {
            state,
            is_finalized: false,
            pager,
            tx_id,
            connection,
            write_set: Vec::new(),
            _phantom: PhantomData,
        }
    }
}

impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
    type State = CommitStateMachine<Clock>;
    type Context = MvStore<Clock>;

    #[tracing::instrument(fields(state = ?self.state), skip(self, mvcc_store))]
    fn transition<'a>(&mut self, mvcc_store: &Self::Context) -> Result<TransitionResult> {
        match self.state {
            CommitState::Initial => {
                let end_ts = mvcc_store.get_timestamp();
                // NOTICE: the first shadowed tx keeps the entry alive in the map
                // for the duration of this whole function, which is important for correctness!
                let tx = mvcc_store
                    .txs
                    .get(&self.tx_id)
                    .ok_or(DatabaseError::TxTerminated)?;
                let tx = tx.value().write();
                match tx.state.load() {
                    TransactionState::Terminated => return Err(DatabaseError::TxTerminated),
                    _ => {
                        assert_eq!(tx.state, TransactionState::Active);
                    }
                }
                tx.state.store(TransactionState::Preparing);
                tracing::trace!("prepare_tx(tx_id={})", self.tx_id);

                /* TODO: The code we have here is sufficient for snapshot isolation.
                ** In order to implement serializability, we need the following steps:
                **
                ** 1. Validate if all read versions are still visible by inspecting the read_set
                ** 2. Validate if there are no phantoms by walking the scans from scan_set (which we don't even have yet)
                **    - a phantom is a version that became visible in the middle of our transaction,
                **      but wasn't taken into account during one of the scans from the scan_set
                ** 3. Wait for commit dependencies, which we don't even track yet...
                **    Excerpt from what's a commit dependency and how it's tracked in the original paper:
                ** """
                A transaction T1 has a commit dependency on another transaction
                T2, if T1 is allowed to commit only if T2 commits. If T2 aborts,
                T1 must also abort, so cascading aborts are possible. T1 acquires a
                commit dependency either by speculatively reading or speculatively ignoring a version,
                instead of waiting for T2 to commit.
                We implement commit dependencies by a register-and-report
                approach: T1 registers its dependency with T2 and T2 informs T1
                when it has committed or aborted. Each transaction T contains a
                counter, CommitDepCounter, that counts how many unresolved
                commit dependencies it still has. A transaction cannot commit
                until this counter is zero. In addition, T has a Boolean variable
                AbortNow that other transactions can set to tell T to abort. Each
                transaction T also has a set, CommitDepSet, that stores transaction IDs
                of the transactions that depend on T.
                To take a commit dependency on a transaction T2, T1 increments
                its CommitDepCounter and adds its transaction ID to T2’s CommitDepSet.
                When T2 has committed, it locates each transaction in
                its CommitDepSet and decrements their CommitDepCounter. If
                T2 aborted, it tells the dependent transactions to also abort by
                setting their AbortNow flags. If a dependent transaction is not
                found, this means that it has already aborted.
                Note that a transaction with commit dependencies may not have to
                wait at all - the dependencies may have been resolved before it is
                ready to commit. Commit dependencies consolidate all waits into
                a single wait and postpone the wait to just before commit.
                Some transactions may have to wait before commit.
                Waiting raises a concern of deadlocks.
                However, deadlocks cannot occur because an older transaction never
                waits on a younger transaction. In
                a wait-for graph the direction of edges would always be from a
                younger transaction (higher end timestamp) to an older transaction
                (lower end timestamp) so cycles are impossible.
                """
                ** If you're wondering when a speculative read happens, here you go:
                ** Case 1: speculative read of TB:
                """
                If transaction TB is in the Preparing state, it has acquired an end
                timestamp TS which will be V’s begin timestamp if TB commits.
                A safe approach in this situation would be to have transaction T
                wait until transaction TB commits. However, we want to avoid all
                blocking during normal processing so instead we continue with
                the visibility test and, if the test returns true, allow T to
                speculatively read V. Transaction T acquires a commit dependency on
                TB, restricting the serialization order of the two transactions. That
                is, T is allowed to commit only if TB commits.
                """
                ** Case 2: speculative ignore of TE:
                """
                If TE’s state is Preparing, it has an end timestamp TS that will become
                the end timestamp of V if TE does commit. If TS is greater than the read
                time RT, it is obvious that V will be visible if TE commits. If TE
                aborts, V will still be visible, because any transaction that updates
                V after TE has aborted will obtain an end timestamp greater than
                TS. If TS is less than RT, we have a more complicated situation:
                if TE commits, V will not be visible to T but if TE aborts, it will
                be visible. We could handle this by forcing T to wait until TE
                commits or aborts but we want to avoid all blocking during normal processing.
                Instead we allow T to speculatively ignore V and
                proceed with its processing. Transaction T acquires a commit
                dependency (see Section 2.7) on TE, that is, T is allowed to commit
                only if TE commits.
                """
                */
                tx.state.store(TransactionState::Committed(end_ts));
                tracing::trace!("commit_tx(tx_id={})", self.tx_id);
                self.write_set
                    .extend(tx.write_set.iter().map(|v| *v.value()));
                self.state = CommitState::BeginPagerTxn { end_ts };
                Ok(TransitionResult::Continue)
            }
            CommitState::BeginPagerTxn { end_ts } => {
                // FIXME: how do we deal with multiple concurrent writes?
                // WAL requires a txn to be written sequentially. Either we:
                // 1. Wait for currently writer to finish before second txn starts.
                // 2. Choose a txn to write depending on some heuristics like amount of frames will be written.
                // 3. ..
                //
                loop {
                    match self.pager.begin_write_tx() {
                        Ok(crate::types::IOResult::Done(result)) => {
                            if let crate::result::LimboResult::Busy = result {
                                return Err(DatabaseError::Io(
                                    "Pager write transaction busy".to_string(),
                                ));
                            }
                            break;
                        }
                        Ok(crate::types::IOResult::IO) => {
                            // FIXME: this is a hack to make the pager run the IO loop
                            self.pager.io.run_once().unwrap();
                            continue;
                        }
                        Err(e) => {
                            return Err(DatabaseError::Io(e.to_string()));
                        }
                    }
                }
                self.state = CommitState::WriteRows { end_ts };
                return Ok(TransitionResult::Continue);
            }
            CommitState::WriteRows { end_ts } => {
                for id in &self.write_set {
                    if let Some(row_versions) = mvcc_store.rows.get(id) {
                        let row_versions = row_versions.value().read();
                        // Find rows that were written by this transaction
                        for row_version in row_versions.iter() {
                            if let TxTimestampOrID::TxID(row_tx_id) = row_version.begin {
                                if row_tx_id == self.tx_id {
                                    mvcc_store
                                        .write_row_to_pager(self.pager.clone(), &row_version.row)?;
                                    break;
                                }
                            }
                            if let Some(TxTimestampOrID::Timestamp(row_tx_id)) = row_version.end {
                                if row_tx_id == self.tx_id {
                                    mvcc_store
                                        .write_row_to_pager(self.pager.clone(), &row_version.row)?;
                                    break;
                                }
                            }
                        }
                    }
                }
                self.state = CommitState::CommitPagerTxn { end_ts };
                Ok(TransitionResult::Continue)
            }
            CommitState::CommitPagerTxn { end_ts } => {
                // Write committed data to pager for persistence
                // Flush dirty pages to WAL - this is critical for data persistence
                // Similar to what step_end_write_txn does for legacy transactions
                loop {
                    let result = self
                        .pager
                        .end_tx(
                            false, // rollback = false since we're committing
                            false, // schema_did_change = false for now (could be improved)
                            &self.connection,
                            self.connection.wal_checkpoint_disabled.get(),
                        )
                        .map_err(|e| DatabaseError::Io(e.to_string()))
                        .unwrap();
                    if let crate::types::IOResult::Done(_) = result {
                        break;
                    }
                }
                self.state = CommitState::Commit { end_ts };
                Ok(TransitionResult::Continue)
            }
            CommitState::Commit { end_ts } => {
                let mut log_record = LogRecord::new(end_ts);
                for ref id in &self.write_set {
                    if let Some(row_versions) = mvcc_store.rows.get(id) {
                        let mut row_versions = row_versions.value().write();
                        for row_version in row_versions.iter_mut() {
                            if let TxTimestampOrID::TxID(id) = row_version.begin {
                                if id == self.tx_id {
                                    // New version is valid STARTING FROM committing transaction's end timestamp
                                    // See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf
                                    row_version.begin = TxTimestampOrID::Timestamp(end_ts);
                                    mvcc_store.insert_version_raw(
                                        &mut log_record.row_versions,
                                        row_version.clone(),
                                    ); // FIXME: optimize cloning out
                                }
                            }
                            if let Some(TxTimestampOrID::TxID(id)) = row_version.end {
                                if id == self.tx_id {
                                    // Old version is valid UNTIL committing transaction's end timestamp
                                    // See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf
                                    row_version.end = Some(TxTimestampOrID::Timestamp(end_ts));
                                    mvcc_store.insert_version_raw(
                                        &mut log_record.row_versions,
                                        row_version.clone(),
                                    ); // FIXME: optimize cloning out
                                }
                            }
                        }
                    }
                }
                tracing::trace!("updated(tx_id={})", self.tx_id);

                // We have now updated all the versions with a reference to the
                // transaction ID to a timestamp and can, therefore, remove the
                // transaction. Please note that when we move to lockless, the
                // invariant doesn't necessarily hold anymore because another thread
                // might have speculatively read a version that we want to remove.
                // But that's a problem for another day.
                // FIXME: it actually just become a problem for today!!!
                // TODO: test that reproduces this failure, and then a fix
                mvcc_store.txs.remove(&self.tx_id);
                if !log_record.row_versions.is_empty() {
                    mvcc_store.storage.log_tx(log_record)?;
                }
                tracing::trace!("logged(tx_id={})", self.tx_id);
                self.finalize(mvcc_store)?;
                Ok(TransitionResult::Done)
            }
        }
    }

    fn finalize<'a>(&mut self, _context: &Self::Context) -> Result<()> {
        self.is_finalized = true;
        Ok(())
    }

    fn is_finalized(&self) -> bool {
        self.is_finalized
    }
}

/// A multi-version concurrency control database.
#[derive(Debug)]
pub struct MvStore<Clock: LogicalClock> {
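The serializability TODO in CommitState::Initial quotes the Hekaton paper's register-and-report commit dependencies (CommitDepCounter, AbortNow, CommitDepSet). None of that bookkeeping exists in this commit; purely as an illustration of the quoted scheme, the per-transaction state and the register step might look like the sketch below, where every name and type is hypothetical.

use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Mutex;

// Hypothetical per-transaction bookkeeping for the register-and-report scheme
// quoted from the Hekaton paper; not part of this diff.
struct CommitDeps {
    commit_dep_counter: AtomicU64,       // unresolved dependencies this tx still waits on
    abort_now: AtomicBool,               // set by an aborting dependency: cascade the abort
    commit_dep_set: Mutex<HashSet<u64>>, // tx ids that registered a dependency on this tx
}

impl CommitDeps {
    fn new() -> Self {
        Self {
            commit_dep_counter: AtomicU64::new(0),
            abort_now: AtomicBool::new(false),
            commit_dep_set: Mutex::new(HashSet::new()),
        }
    }

    // T1 takes a commit dependency on T2: bump T1's counter, add T1 to T2's CommitDepSet.
    fn register(t1: &CommitDeps, t1_id: u64, t2: &CommitDeps) {
        t1.commit_dep_counter.fetch_add(1, Ordering::SeqCst);
        t2.commit_dep_set.lock().unwrap().insert(t1_id);
    }

    // A transaction may commit only when no dependency told it to abort
    // and its counter has dropped back to zero.
    fn can_commit(&self) -> bool {
        !self.abort_now.load(Ordering::SeqCst)
            && self.commit_dep_counter.load(Ordering::SeqCst) == 0
    }
}

fn main() {
    let (t1, t2) = (CommitDeps::new(), CommitDeps::new());
    CommitDeps::register(&t1, 1, &t2);
    assert!(!t1.can_commit()); // T1 must wait for T2
    // When T2 commits, it decrements the counters of everything in its CommitDepSet:
    t1.commit_dep_counter.fetch_sub(1, Ordering::SeqCst);
    assert!(t1.can_commit());
}
// (end of sketch)
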
@@ -510,210 +854,13 @@ impl<Clock: LogicalClock> MvStore<Clock> {
        pager: Rc<Pager>,
        connection: &Arc<Connection>,
    ) -> Result<()> {
        let end_ts = self.get_timestamp();
        // NOTICE: the first shadowed tx keeps the entry alive in the map
        // for the duration of this whole function, which is important for correctness!
        let tx = self.txs.get(&tx_id).ok_or(DatabaseError::TxTerminated)?;
        let tx = tx.value().write();
        match tx.state.load() {
            TransactionState::Terminated => return Err(DatabaseError::TxTerminated),
            _ => {
                assert_eq!(tx.state, TransactionState::Active);
            }
        }
        tx.state.store(TransactionState::Preparing);
        tracing::trace!("prepare_tx(tx_id={})", tx_id);

        /* TODO: The code we have here is sufficient for snapshot isolation.
        ** In order to implement serializability, we need the following steps:
        **
        ** 1. Validate if all read versions are still visible by inspecting the read_set
        ** 2. Validate if there are no phantoms by walking the scans from scan_set (which we don't even have yet)
        **    - a phantom is a version that became visible in the middle of our transaction,
        **      but wasn't taken into account during one of the scans from the scan_set
        ** 3. Wait for commit dependencies, which we don't even track yet...
        **    Excerpt from what's a commit dependency and how it's tracked in the original paper:
        ** """
        A transaction T1 has a commit dependency on another transaction
        T2, if T1 is allowed to commit only if T2 commits. If T2 aborts,
        T1 must also abort, so cascading aborts are possible. T1 acquires a
        commit dependency either by speculatively reading or speculatively ignoring a version,
        instead of waiting for T2 to commit.
        We implement commit dependencies by a register-and-report
        approach: T1 registers its dependency with T2 and T2 informs T1
        when it has committed or aborted. Each transaction T contains a
        counter, CommitDepCounter, that counts how many unresolved
        commit dependencies it still has. A transaction cannot commit
        until this counter is zero. In addition, T has a Boolean variable
        AbortNow that other transactions can set to tell T to abort. Each
        transaction T also has a set, CommitDepSet, that stores transaction IDs
        of the transactions that depend on T.
        To take a commit dependency on a transaction T2, T1 increments
        its CommitDepCounter and adds its transaction ID to T2’s CommitDepSet.
        When T2 has committed, it locates each transaction in
        its CommitDepSet and decrements their CommitDepCounter. If
        T2 aborted, it tells the dependent transactions to also abort by
        setting their AbortNow flags. If a dependent transaction is not
        found, this means that it has already aborted.
        Note that a transaction with commit dependencies may not have to
        wait at all - the dependencies may have been resolved before it is
        ready to commit. Commit dependencies consolidate all waits into
        a single wait and postpone the wait to just before commit.
        Some transactions may have to wait before commit.
        Waiting raises a concern of deadlocks.
        However, deadlocks cannot occur because an older transaction never
        waits on a younger transaction. In
        a wait-for graph the direction of edges would always be from a
        younger transaction (higher end timestamp) to an older transaction
        (lower end timestamp) so cycles are impossible.
        """
        ** If you're wondering when a speculative read happens, here you go:
        ** Case 1: speculative read of TB:
        """
        If transaction TB is in the Preparing state, it has acquired an end
        timestamp TS which will be V’s begin timestamp if TB commits.
        A safe approach in this situation would be to have transaction T
        wait until transaction TB commits. However, we want to avoid all
        blocking during normal processing so instead we continue with
        the visibility test and, if the test returns true, allow T to
        speculatively read V. Transaction T acquires a commit dependency on
        TB, restricting the serialization order of the two transactions. That
        is, T is allowed to commit only if TB commits.
        """
        ** Case 2: speculative ignore of TE:
        """
        If TE’s state is Preparing, it has an end timestamp TS that will become
        the end timestamp of V if TE does commit. If TS is greater than the read
        time RT, it is obvious that V will be visible if TE commits. If TE
        aborts, V will still be visible, because any transaction that updates
        V after TE has aborted will obtain an end timestamp greater than
        TS. If TS is less than RT, we have a more complicated situation:
        if TE commits, V will not be visible to T but if TE aborts, it will
        be visible. We could handle this by forcing T to wait until TE
        commits or aborts but we want to avoid all blocking during normal processing.
        Instead we allow T to speculatively ignore V and
        proceed with its processing. Transaction T acquires a commit
        dependency (see Section 2.7) on TE, that is, T is allowed to commit
        only if TE commits.
        """
        */
        tx.state.store(TransactionState::Committed(end_ts));
        tracing::trace!("commit_tx(tx_id={})", tx_id);
        let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
        drop(tx);
        // Postprocessing: inserting row versions and logging the transaction to persistent storage.

        // FIXME: how do we deal with multiple concurrent writes?
        // WAL requires a txn to be written sequentially. Either we:
        // 1. Wait for currently writer to finish before second txn starts.
        // 2. Choose a txn to write depending on some heuristics like amount of frames will be written.
        // 3. ..
        //
        loop {
            match pager.begin_write_tx() {
                Ok(crate::types::IOResult::Done(result)) => {
                    if let crate::result::LimboResult::Busy = result {
                        return Err(DatabaseError::Io(
                            "Pager write transaction busy".to_string(),
                        ));
                    }
                    break;
                }
                Ok(crate::types::IOResult::IO) => {
                    // FIXME: this is a hack to make the pager run the IO loop
                    pager.io.run_once().unwrap();
                    continue;
                }
                Err(e) => {
                    return Err(DatabaseError::Io(e.to_string()));
                }
            }
        }

        // 1. Write rows to btree for persistence
        for id in &write_set {
            if let Some(row_versions) = self.rows.get(id) {
                let row_versions = row_versions.value().read();
                // Find rows that were written by this transaction
                for row_version in row_versions.iter() {
                    if let TxTimestampOrID::TxID(row_tx_id) = row_version.begin {
                        if row_tx_id == tx_id {
                            self.write_row_to_pager(pager.clone(), &row_version.row)?;
                            break;
                        }
                    }
                    if let Some(TxTimestampOrID::Timestamp(row_tx_id)) = row_version.end {
                        if row_tx_id == tx_id {
                            self.write_row_to_pager(pager.clone(), &row_version.row)?;
                            break;
                        }
                    }
                }
            }
        }
        // Write committed data to pager for persistence
        // Flush dirty pages to WAL - this is critical for data persistence
        // Similar to what step_end_write_txn does for legacy transactions
        loop {
            let result = pager
                .end_tx(
                    false, // rollback = false since we're committing
                    false, // schema_did_change = false for now (could be improved)
                    connection,
                    connection.wal_checkpoint_disabled.get(),
                )
                .map_err(|e| DatabaseError::Io(e.to_string()))
                .unwrap();
            if let crate::types::IOResult::Done(_) = result {
                break;
            }
        }
        // 2. Commit rows to log
        let mut log_record = LogRecord::new(end_ts);
        for ref id in write_set {
            if let Some(row_versions) = self.rows.get(id) {
                let mut row_versions = row_versions.value().write();
                for row_version in row_versions.iter_mut() {
                    if let TxTimestampOrID::TxID(id) = row_version.begin {
                        if id == tx_id {
                            // New version is valid STARTING FROM committing transaction's end timestamp
                            // See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf
                            row_version.begin = TxTimestampOrID::Timestamp(end_ts);
                            self.insert_version_raw(
                                &mut log_record.row_versions,
                                row_version.clone(),
                            ); // FIXME: optimize cloning out
                        }
                    }
                    if let Some(TxTimestampOrID::TxID(id)) = row_version.end {
                        if id == tx_id {
                            // Old version is valid UNTIL committing transaction's end timestamp
                            // See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf
                            row_version.end = Some(TxTimestampOrID::Timestamp(end_ts));
                            self.insert_version_raw(
                                &mut log_record.row_versions,
                                row_version.clone(),
                            ); // FIXME: optimize cloning out
                        }
                    }
                }
            }
        }
        tracing::trace!("updated(tx_id={})", tx_id);

        // We have now updated all the versions with a reference to the
        // transaction ID to a timestamp and can, therefore, remove the
        // transaction. Please note that when we move to lockless, the
        // invariant doesn't necessarily hold anymore because another thread
        // might have speculatively read a version that we want to remove.
        // But that's a problem for another day.
        // FIXME: it actually just become a problem for today!!!
        // TODO: test that reproduces this failure, and then a fix
        self.txs.remove(&tx_id);
        if !log_record.row_versions.is_empty() {
            self.storage.log_tx(log_record)?;
        }
        tracing::trace!("logged(tx_id={})", tx_id);
        let mut state_machine: StateMachine<CommitStateMachine<Clock>> = StateMachine::<
            CommitStateMachine<Clock>,
        >::new(
            CommitStateMachine::new(CommitState::Initial, pager, tx_id, connection.clone()),
        );
        state_machine.transition(self)?;
        assert!(state_machine.is_finalized());
        Ok(())
    }

@@ -851,7 +998,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {

    /// Inserts a new row version into the internal data structure for versions,
    /// while making sure that the row version is inserted in the correct order.
    fn insert_version_raw(&self, versions: &mut Vec<RowVersion>, row_version: RowVersion) {
    pub fn insert_version_raw(&self, versions: &mut Vec<RowVersion>, row_version: RowVersion) {
        // NOTICE: this is an insert a'la insertion sort, with pessimistic linear complexity.
        // However, we expect the number of versions to be nearly sorted, so we deem it worthy
        // to search linearly for the insertion point instead of paying the price of using
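The NOTICE comment above describes insert_version_raw as an insertion-sort-style insert into a nearly sorted Vec. The standalone sketch below illustrates one way to realize that idea by scanning from the back for the insertion point; the u64 begin_ts key and the simplified RowVersion shape are stand-ins, not the crate's real ordering or types.

// Illustrative only: simplified RowVersion with a u64 ordering key.
#[derive(Debug, Clone)]
struct RowVersion {
    begin_ts: u64,
}

// Insertion-sort-style insert: linear scan from the back for the insertion
// point, which is cheap when the Vec is already nearly sorted.
fn insert_version_sorted(versions: &mut Vec<RowVersion>, row_version: RowVersion) {
    let mut position = versions.len();
    while position > 0 && versions[position - 1].begin_ts > row_version.begin_ts {
        position -= 1;
    }
    versions.insert(position, row_version);
}

fn main() {
    let mut versions = vec![RowVersion { begin_ts: 1 }, RowVersion { begin_ts: 3 }];
    insert_version_sorted(&mut versions, RowVersion { begin_ts: 2 });
    assert_eq!(versions[1].begin_ts, 2);
    println!("{versions:?}");
}
// (end of sketch)
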
@@ -874,7 +1021,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
        versions.insert(position, row_version);
    }

    fn write_row_to_pager(&self, pager: Rc<Pager>, row: &Row) -> Result<()> {
    pub fn write_row_to_pager(&self, pager: Rc<Pager>, row: &Row) -> Result<()> {
        use crate::storage::btree::BTreeCursor;
        use crate::types::{IOResult, SeekKey, SeekOp};