mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-20 16:44:19 +01:00
@@ -2,12 +2,9 @@ use crate::clock::LogicalClock;
|
||||
use crate::errors::DatabaseError;
|
||||
use crate::persistent_storage::Storage;
|
||||
use crossbeam_skiplist::{SkipMap, SkipSet};
|
||||
use parking_lot::Mutex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::cell::RefCell;
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::RwLock;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, DatabaseError>;
|
||||
|
||||
@@ -150,7 +147,8 @@ impl std::fmt::Display for Transaction {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
|
||||
write!(
|
||||
f,
|
||||
"{{ id: {}, begin_ts: {}, write_set: {:?}, read_set: {:?}",
|
||||
"{{ state: {}, id: {}, begin_ts: {}, write_set: {:?}, read_set: {:?}",
|
||||
self.state,
|
||||
self.tx_id,
|
||||
self.begin_ts,
|
||||
// FIXME: I'm sorry, we obviously shouldn't be cloning here.
|
||||
@@ -171,30 +169,41 @@ impl std::fmt::Display for Transaction {
|
||||
enum TransactionState {
|
||||
Active,
|
||||
Preparing,
|
||||
Committed,
|
||||
Committed(u64),
|
||||
Aborted,
|
||||
Terminated,
|
||||
}
|
||||
|
||||
/// A database with MVCC.
|
||||
impl std::fmt::Display for TransactionState {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
|
||||
match self {
|
||||
TransactionState::Active => write!(f, "Active"),
|
||||
TransactionState::Preparing => write!(f, "Preparing"),
|
||||
TransactionState::Committed(ts) => write!(f, "Committed({ts})"),
|
||||
TransactionState::Aborted => write!(f, "Aborted"),
|
||||
TransactionState::Terminated => write!(f, "Terminated"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Database<Clock: LogicalClock> {
|
||||
inner: Arc<Mutex<DatabaseInner<Clock>>>,
|
||||
rows: SkipMap<RowID, RwLock<Vec<RowVersion>>>,
|
||||
txs: SkipMap<TxID, RwLock<Transaction>>,
|
||||
tx_ids: AtomicU64,
|
||||
clock: Clock,
|
||||
storage: Storage,
|
||||
}
|
||||
|
||||
impl<Clock: LogicalClock> Database<Clock> {
|
||||
/// Creates a new database.
|
||||
pub fn new(clock: Clock, storage: Storage) -> Self {
|
||||
let inner = DatabaseInner {
|
||||
Self {
|
||||
rows: SkipMap::new(),
|
||||
txs: SkipMap::new(),
|
||||
tx_timestamps: RefCell::new(BTreeMap::new()),
|
||||
tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes
|
||||
clock,
|
||||
storage,
|
||||
};
|
||||
Self {
|
||||
inner: Arc::new(Mutex::new(inner)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -209,8 +218,23 @@ impl<Clock: LogicalClock> Database<Clock> {
|
||||
/// * `row` - the row object containing the values to be inserted.
|
||||
///
|
||||
pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
|
||||
let inner = self.inner.lock();
|
||||
inner.insert(tx_id, row)
|
||||
let tx = self
|
||||
.txs
|
||||
.get(&tx_id)
|
||||
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
|
||||
let mut tx = tx.value().write().unwrap();
|
||||
assert!(tx.state == TransactionState::Active);
|
||||
let id = row.id;
|
||||
let row_version = RowVersion {
|
||||
begin: TxTimestampOrID::TxID(tx.tx_id),
|
||||
end: None,
|
||||
row,
|
||||
};
|
||||
let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new()));
|
||||
let mut versions = versions.value().write().unwrap();
|
||||
versions.push(row_version);
|
||||
tx.insert_to_write_set(id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Updates a row in the database with new values.
|
||||
@@ -254,123 +278,6 @@ impl<Clock: LogicalClock> Database<Clock> {
|
||||
/// Returns `true` if the row was successfully deleted, and `false` otherwise.
|
||||
///
|
||||
pub fn delete(&self, tx_id: TxID, id: RowID) -> Result<bool> {
|
||||
let inner = self.inner.lock();
|
||||
inner.delete(tx_id, id)
|
||||
}
|
||||
|
||||
/// Retrieves a row from the table with the given `id`.
|
||||
///
|
||||
/// This operation is performed within the scope of the transaction identified
|
||||
/// by `tx_id`.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `tx_id` - The ID of the transaction to perform the read operation in.
|
||||
/// * `id` - The ID of the row to retrieve.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// Returns `Some(row)` with the row data if the row with the given `id` exists,
|
||||
/// and `None` otherwise.
|
||||
pub fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
|
||||
let inner = self.inner.lock();
|
||||
inner.read(tx_id, id)
|
||||
}
|
||||
|
||||
pub fn scan_row_ids(&self) -> Result<Vec<RowID>> {
|
||||
let inner = self.inner.lock();
|
||||
inner.scan_row_ids()
|
||||
}
|
||||
|
||||
pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
|
||||
let inner = self.inner.lock();
|
||||
inner.scan_row_ids_for_table(table_id)
|
||||
}
|
||||
|
||||
/// Begins a new transaction in the database.
|
||||
///
|
||||
/// This function starts a new transaction in the database and returns a `TxID` value
|
||||
/// that you can use to perform operations within the transaction. All changes made within the
|
||||
/// transaction are isolated from other transactions until you commit the transaction.
|
||||
pub fn begin_tx(&self) -> TxID {
|
||||
let mut inner = self.inner.lock();
|
||||
inner.begin_tx()
|
||||
}
|
||||
|
||||
/// Commits a transaction with the specified transaction ID.
|
||||
///
|
||||
/// This function commits the changes made within the specified transaction and finalizes the
|
||||
/// transaction. Once a transaction has been committed, all changes made within the transaction
|
||||
/// are visible to other transactions that access the same data.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `tx_id` - The ID of the transaction to commit.
|
||||
pub fn commit_tx(&self, tx_id: TxID) -> Result<()> {
|
||||
let mut inner = self.inner.lock();
|
||||
inner.commit_tx(tx_id)
|
||||
}
|
||||
|
||||
/// Rolls back a transaction with the specified ID.
|
||||
///
|
||||
/// This function rolls back a transaction with the specified `tx_id` by
|
||||
/// discarding any changes made by the transaction.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `tx_id` - The ID of the transaction to abort.
|
||||
pub fn rollback_tx(&self, tx_id: TxID) {
|
||||
let inner = self.inner.lock();
|
||||
inner.rollback_tx(tx_id);
|
||||
}
|
||||
|
||||
/// Drops all unused row versions from the database.
|
||||
///
|
||||
/// A version is considered unused if it is not visible to any active transaction
|
||||
/// and it is not the most recent version of the row.
|
||||
pub fn drop_unused_row_versions(&self) {
|
||||
let inner = self.inner.lock();
|
||||
inner.drop_unused_row_versions();
|
||||
}
|
||||
|
||||
pub fn recover(&self) -> Result<()> {
|
||||
let inner = self.inner.lock();
|
||||
inner.recover()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DatabaseInner<Clock: LogicalClock> {
|
||||
rows: SkipMap<RowID, RwLock<Vec<RowVersion>>>,
|
||||
txs: SkipMap<TxID, RwLock<Transaction>>,
|
||||
tx_timestamps: RefCell<BTreeMap<u64, usize>>,
|
||||
tx_ids: AtomicU64,
|
||||
clock: Clock,
|
||||
storage: Storage,
|
||||
}
|
||||
|
||||
impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
|
||||
let tx = self
|
||||
.txs
|
||||
.get(&tx_id)
|
||||
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
|
||||
let mut tx = tx.value().write().unwrap();
|
||||
assert!(tx.state == TransactionState::Active);
|
||||
let id = row.id;
|
||||
let row_version = RowVersion {
|
||||
begin: TxTimestampOrID::TxID(tx.tx_id),
|
||||
end: None,
|
||||
row,
|
||||
};
|
||||
let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new()));
|
||||
let mut versions = versions.value().write().unwrap();
|
||||
versions.push(row_version);
|
||||
tx.insert_to_write_set(id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn delete(&self, tx_id: TxID, id: RowID) -> Result<bool> {
|
||||
let row_versions_opt = self.rows.get(&id);
|
||||
if let Some(ref row_versions) = row_versions_opt {
|
||||
let mut row_versions = row_versions.value().write().unwrap();
|
||||
@@ -404,7 +311,21 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
|
||||
/// Retrieves a row from the table with the given `id`.
|
||||
///
|
||||
/// This operation is performed within the scope of the transaction identified
|
||||
/// by `tx_id`.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `tx_id` - The ID of the transaction to perform the read operation in.
|
||||
/// * `id` - The ID of the row to retrieve.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// Returns `Some(row)` with the row data if the row with the given `id` exists,
|
||||
/// and `None` otherwise.
|
||||
pub fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
|
||||
let tx = self.txs.get(&tx_id).unwrap();
|
||||
let tx = tx.value().read().unwrap();
|
||||
assert!(tx.state == TransactionState::Active);
|
||||
@@ -420,12 +341,14 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn scan_row_ids(&self) -> Result<Vec<RowID>> {
|
||||
/// Gets all row ids in the database.
|
||||
pub fn scan_row_ids(&self) -> Result<Vec<RowID>> {
|
||||
let keys = self.rows.iter().map(|entry| *entry.key());
|
||||
Ok(keys.collect())
|
||||
}
|
||||
|
||||
fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
|
||||
/// Gets all row ids in the database for a given table.
|
||||
pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
|
||||
Ok(self
|
||||
.rows
|
||||
.range(
|
||||
@@ -441,18 +364,30 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
.collect())
|
||||
}
|
||||
|
||||
fn begin_tx(&mut self) -> TxID {
|
||||
/// Begins a new transaction in the database.
|
||||
///
|
||||
/// This function starts a new transaction in the database and returns a `TxID` value
|
||||
/// that you can use to perform operations within the transaction. All changes made within the
|
||||
/// transaction are isolated from other transactions until you commit the transaction.
|
||||
pub fn begin_tx(&self) -> TxID {
|
||||
let tx_id = self.get_tx_id();
|
||||
let begin_ts = self.get_timestamp();
|
||||
let tx = Transaction::new(tx_id, begin_ts);
|
||||
tracing::trace!("BEGIN {tx}");
|
||||
let mut tx_timestamps = self.tx_timestamps.borrow_mut();
|
||||
self.txs.insert(tx_id, RwLock::new(tx));
|
||||
*tx_timestamps.entry(begin_ts).or_insert(0) += 1;
|
||||
tx_id
|
||||
}
|
||||
|
||||
fn commit_tx(&mut self, tx_id: TxID) -> Result<()> {
|
||||
/// Commits a transaction with the specified transaction ID.
|
||||
///
|
||||
/// This function commits the changes made within the specified transaction and finalizes the
|
||||
/// transaction. Once a transaction has been committed, all changes made within the transaction
|
||||
/// are visible to other transactions that access the same data.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `tx_id` - The ID of the transaction to commit.
|
||||
pub fn commit_tx(&self, tx_id: TxID) -> Result<()> {
|
||||
let end_ts = self.get_timestamp();
|
||||
let tx = self.txs.get(&tx_id).unwrap();
|
||||
let mut tx = tx.value().write().unwrap();
|
||||
@@ -464,6 +399,84 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
}
|
||||
tx.state = TransactionState::Preparing;
|
||||
tracing::trace!("PREPARE {tx}");
|
||||
|
||||
/* TODO: The code we have here is sufficient for snapshot isolation.
|
||||
** In order to implement serializability, we need the following steps:
|
||||
**
|
||||
** 1. Validate if all read versions are still visible by inspecting the read_set
|
||||
** 2. Validate if there are no phantoms by walking the scans from scan_set (which we don't even have yet)
|
||||
** - a phantom is a version that became visible in the middle of our transaction,
|
||||
** but wasn't taken into account during one of the scans from the scan_set
|
||||
** 3. Wait for commit dependencies, which we don't even track yet...
|
||||
** Excerpt from what's a commit dependency and how it's tracked in the original paper:
|
||||
** """
|
||||
A transaction T1 has a commit dependency on another transaction
|
||||
T2, if T1 is allowed to commit only if T2 commits. If T2 aborts,
|
||||
T1 must also abort, so cascading aborts are possible. T1 acquires a
|
||||
commit dependency either by speculatively reading or speculatively ignoring a version,
|
||||
instead of waiting for T2 to commit.
|
||||
We implement commit dependencies by a register-and-report
|
||||
approach: T1 registers its dependency with T2 and T2 informs T1
|
||||
when it has committed or aborted. Each transaction T contains a
|
||||
counter, CommitDepCounter, that counts how many unresolved
|
||||
commit dependencies it still has. A transaction cannot commit
|
||||
until this counter is zero. In addition, T has a Boolean variable
|
||||
AbortNow that other transactions can set to tell T to abort. Each
|
||||
transaction T also has a set, CommitDepSet, that stores transaction IDs
|
||||
of the transactions that depend on T.
|
||||
To take a commit dependency on a transaction T2, T1 increments
|
||||
its CommitDepCounter and adds its transaction ID to T2’s CommitDepSet.
|
||||
When T2 has committed, it locates each transaction in
|
||||
its CommitDepSet and decrements their CommitDepCounter. If
|
||||
T2 aborted, it tells the dependent transactions to also abort by
|
||||
setting their AbortNow flags. If a dependent transaction is not
|
||||
found, this means that it has already aborted.
|
||||
Note that a transaction with commit dependencies may not have to
|
||||
wait at all - the dependencies may have been resolved before it is
|
||||
ready to commit. Commit dependencies consolidate all waits into
|
||||
a single wait and postpone the wait to just before commit.
|
||||
Some transactions may have to wait before commit.
|
||||
Waiting raises a concern of deadlocks.
|
||||
However, deadlocks cannot occur because an older transaction never
|
||||
waits on a younger transaction. In
|
||||
a wait-for graph the direction of edges would always be from a
|
||||
younger transaction (higher end timestamp) to an older transaction
|
||||
(lower end timestamp) so cycles are impossible.
|
||||
"""
|
||||
** If you're wondering when a speculative read happens, here you go:
|
||||
** Case 1: speculative read of TB:
|
||||
"""
|
||||
If transaction TB is in the Preparing state, it has acquired an end
|
||||
timestamp TS which will be V’s begin timestamp if TB commits.
|
||||
A safe approach in this situation would be to have transaction T
|
||||
wait until transaction TB commits. However, we want to avoid all
|
||||
blocking during normal processing so instead we continue with
|
||||
the visibility test and, if the test returns true, allow T to
|
||||
speculatively read V. Transaction T acquires a commit dependency on
|
||||
TB, restricting the serialization order of the two transactions. That
|
||||
is, T is allowed to commit only if TB commits.
|
||||
"""
|
||||
** Case 2: speculative ignore of TE:
|
||||
"""
|
||||
If TE’s state is Preparing, it has an end timestamp TS that will become
|
||||
the end timestamp of V if TE does commit. If TS is greater than the read
|
||||
time RT, it is obvious that V will be visible if TE commits. If TE
|
||||
aborts, V will still be visible, because any transaction that updates
|
||||
V after TE has aborted will obtain an end timestamp greater than
|
||||
TS. If TS is less than RT, we have a more complicated situation:
|
||||
if TE commits, V will not be visible to T but if TE aborts, it will
|
||||
be visible. We could handle this by forcing T to wait until TE
|
||||
commits or aborts but we want to avoid all blocking during normal processing.
|
||||
Instead we allow T to speculatively ignore V and
|
||||
proceed with its processing. Transaction T acquires a commit
|
||||
dependency (see Section 2.7) on TE, that is, T is allowed to commit
|
||||
only if TE commits.
|
||||
"""
|
||||
*/
|
||||
tx.state = TransactionState::Committed(end_ts);
|
||||
tracing::trace!("COMMIT {tx}");
|
||||
// Postprocessing: inserting row versions and logging the transaction to persistent storage.
|
||||
// TODO: we should probably save to persistent storage first, and only then update the in-memory structures.
|
||||
let mut log_record: LogRecord = LogRecord::new(end_ts);
|
||||
for id in &tx.write_set {
|
||||
let id = id.value();
|
||||
@@ -485,21 +498,14 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
}
|
||||
}
|
||||
}
|
||||
tx.state = TransactionState::Committed;
|
||||
tracing::trace!("COMMIT {tx}");
|
||||
// We have now updated all the versions with a reference to the
|
||||
// transaction ID to a timestamp and can, therefore, remove the
|
||||
// transaction. Please note that when we move to lockless, the
|
||||
// invariant doesn't necessarily hold anymore because another thread
|
||||
// might have speculatively read a version that we want to remove.
|
||||
// But that's a problem for another day.
|
||||
let mut tx_timestamps = self.tx_timestamps.borrow_mut();
|
||||
if let Some(timestamp_entry) = tx_timestamps.get_mut(&tx.begin_ts) {
|
||||
*timestamp_entry -= 1;
|
||||
if timestamp_entry == &0 {
|
||||
tx_timestamps.remove(&tx.begin_ts);
|
||||
}
|
||||
}
|
||||
// FIXME: it actually just became a problem for today!!!
|
||||
// TODO: test that reproduces this failure, and then a fix
|
||||
self.txs.remove(&tx_id);
|
||||
if !log_record.row_versions.is_empty() {
|
||||
self.storage.log_tx(log_record)?;
|
||||
@@ -507,7 +513,15 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn rollback_tx(&self, tx_id: TxID) {
|
||||
/// Rolls back a transaction with the specified ID.
|
||||
///
|
||||
/// This function rolls back a transaction with the specified `tx_id` by
|
||||
/// discarding any changes made by the transaction.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `tx_id` - The ID of the transaction to abort.
|
||||
pub fn rollback_tx(&self, tx_id: TxID) {
|
||||
let tx = self.txs.get(&tx_id).unwrap();
|
||||
let mut tx = tx.value().write().unwrap();
|
||||
assert!(tx.state == TransactionState::Active);
|
||||
@@ -527,39 +541,30 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
tracing::trace!("TERMINATE {tx}");
|
||||
}
|
||||
|
||||
fn get_tx_id(&mut self) -> u64 {
|
||||
/// Generates next unique transaction id
|
||||
pub fn get_tx_id(&self) -> u64 {
|
||||
self.tx_ids.fetch_add(1, Ordering::SeqCst)
|
||||
}
|
||||
|
||||
fn get_timestamp(&mut self) -> u64 {
|
||||
/// Gets current timestamp
|
||||
pub fn get_timestamp(&self) -> u64 {
|
||||
self.clock.get_timestamp()
|
||||
}
|
||||
|
||||
/// Drops all rows that are not visible to any transaction.
|
||||
/// The logic is as follows. If a row version has an end marker
|
||||
/// which denotes a transaction that is not active, then we can
|
||||
/// drop the row version -- it is not visible to any transaction.
|
||||
/// If a row version has an end marker that denotes a timestamp T_END,
|
||||
/// then we can drop the row version only if all active transactions
|
||||
/// have a begin timestamp that is greater than timestamp T_END.
|
||||
/// FIXME: this function is a full scan over all rows and row versions.
|
||||
/// We can do better by keeping an index of row versions ordered
|
||||
/// by their end timestamps.
|
||||
fn drop_unused_row_versions(&self) {
|
||||
let tx_timestamps = self.tx_timestamps.borrow();
|
||||
/// FIXME: implement in a lock-free manner
|
||||
pub fn drop_unused_row_versions(&self) {
|
||||
let mut to_remove = Vec::new();
|
||||
for entry in self.rows.iter() {
|
||||
let mut row_versions = entry.value().write().unwrap();
|
||||
row_versions.retain(|rv| {
|
||||
let should_stay = match rv.end {
|
||||
Some(TxTimestampOrID::Timestamp(version_end_ts)) => {
|
||||
match tx_timestamps.first_key_value() {
|
||||
// a transaction started before this row version ended,
|
||||
// ergo row version is needed
|
||||
Some((begin_ts, _)) => version_end_ts >= *begin_ts,
|
||||
// no transaction => row version is not needed
|
||||
None => false,
|
||||
}
|
||||
// a transaction started before this row version ended,
|
||||
// ergo row version is needed
|
||||
// NOTICE: O(row_versions x transactions), but also lock-free, so sounds acceptable
|
||||
self.txs
|
||||
.iter()
|
||||
.any(|tx| version_end_ts >= tx.value().read().unwrap().begin_ts)
|
||||
}
|
||||
// Let's skip potentially complex logic if the transaction is still
|
||||
// active/tracked. We will drop the row version when the transaction
|
||||
@@ -606,7 +611,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
|
||||
/// A write-write conflict happens when transaction T_m attempts to update a
|
||||
/// row version that is currently being updated by an active transaction T_n.
|
||||
fn is_write_write_conflict(
|
||||
pub(crate) fn is_write_write_conflict(
|
||||
txs: &SkipMap<TxID, RwLock<Transaction>>,
|
||||
tx: &Transaction,
|
||||
rv: &RowVersion,
|
||||
@@ -618,7 +623,7 @@ fn is_write_write_conflict(
|
||||
match te.state {
|
||||
TransactionState::Active => tx.tx_id != te.tx_id,
|
||||
TransactionState::Preparing => todo!(),
|
||||
TransactionState::Committed => todo!(),
|
||||
TransactionState::Committed(_end_ts) => todo!(),
|
||||
TransactionState::Aborted => todo!(),
|
||||
TransactionState::Terminated => todo!(),
|
||||
}
|
||||
@@ -628,7 +633,7 @@ fn is_write_write_conflict(
|
||||
}
|
||||
}
|
||||
|
||||
fn is_version_visible(
|
||||
pub(crate) fn is_version_visible(
|
||||
txs: &SkipMap<TxID, RwLock<Transaction>>,
|
||||
tx: &Transaction,
|
||||
rv: &RowVersion,
|
||||
@@ -646,13 +651,22 @@ fn is_begin_visible(
|
||||
TxTimestampOrID::TxID(rv_begin) => {
|
||||
let tb = txs.get(&rv_begin).unwrap();
|
||||
let tb = tb.value().read().unwrap();
|
||||
match tb.state {
|
||||
let visible = match tb.state {
|
||||
TransactionState::Active => tx.tx_id == tb.tx_id && rv.end.is_none(),
|
||||
TransactionState::Preparing => todo!(),
|
||||
TransactionState::Committed => todo!(),
|
||||
TransactionState::Aborted => todo!(),
|
||||
TransactionState::Terminated => todo!(),
|
||||
}
|
||||
TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable!
|
||||
TransactionState::Committed(committed_ts) => tx.begin_ts >= committed_ts,
|
||||
TransactionState::Aborted => false,
|
||||
TransactionState::Terminated => {
|
||||
tracing::debug!("TODO: should reread rv's end field - it should have updated the timestamp in the row version by now");
|
||||
false
|
||||
}
|
||||
};
|
||||
tracing::trace!(
|
||||
"is_begin_visible: tx={tx}, tb={tb} rv = {:?}-{:?} visible = {visible}",
|
||||
rv.begin,
|
||||
rv.end
|
||||
);
|
||||
visible
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -667,13 +681,22 @@ fn is_end_visible(
|
||||
Some(TxTimestampOrID::TxID(rv_end)) => {
|
||||
let te = txs.get(&rv_end).unwrap();
|
||||
let te = te.value().read().unwrap();
|
||||
match te.state {
|
||||
let visible = match te.state {
|
||||
TransactionState::Active => tx.tx_id != te.tx_id,
|
||||
TransactionState::Preparing => todo!(),
|
||||
TransactionState::Committed => todo!(),
|
||||
TransactionState::Aborted => todo!(),
|
||||
TransactionState::Terminated => todo!(),
|
||||
}
|
||||
TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable!
|
||||
TransactionState::Committed(committed_ts) => tx.begin_ts < committed_ts,
|
||||
TransactionState::Aborted => false,
|
||||
TransactionState::Terminated => {
|
||||
tracing::debug!("TODO: should reread rv's end field - it should have updated the timestamp in the row version by now");
|
||||
false
|
||||
}
|
||||
};
|
||||
tracing::trace!(
|
||||
"is_end_visible: tx={tx}, te={te} rv = {:?}-{:?} visible = {visible}",
|
||||
rv.begin,
|
||||
rv.end
|
||||
);
|
||||
visible
|
||||
}
|
||||
None => true,
|
||||
}
|
||||
|
||||
@@ -337,7 +337,6 @@ fn test_dirty_read() {
|
||||
assert_eq!(row2, None);
|
||||
}
|
||||
|
||||
#[ignore]
|
||||
#[traced_test]
|
||||
#[test]
|
||||
fn test_dirty_read_deleted() {
|
||||
@@ -777,3 +776,157 @@ fn test_storage1() {
|
||||
"testme3"
|
||||
);
|
||||
}
|
||||
|
||||
/* States described in the Hekaton paper *for serializability*:
|
||||
|
||||
Table 1: Case analysis of action to take when version V’s
|
||||
Begin field contains the ID of transaction TB
|
||||
------------------------------------------------------------------------------------------------------
|
||||
TB’s state | TB’s end timestamp | Action to take when transaction T checks visibility of version V.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Active | Not set | V is visible only if TB=T and V’s end timestamp equals infinity.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Preparing | TS | V’s begin timestamp will be TS but V is not yet committed. Use TS
|
||||
| as V’s begin time when testing visibility. If the test is true,
|
||||
| allow T to speculatively read V. Committed TS V’s begin timestamp
|
||||
| will be TS and V is committed. Use TS as V’s begin time to test
|
||||
| visibility.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Committed | TS | V’s begin timestamp will be TS and V is committed. Use TS as V’s
|
||||
| begin time to test visibility.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Aborted | Irrelevant | Ignore V; it’s a garbage version.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Terminated | Irrelevant | Reread V’s Begin field. TB has terminated so it must have finalized
|
||||
or not found | | the timestamp.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
|
||||
Table 2: Case analysis of action to take when V's End field
|
||||
contains a transaction ID TE.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
TE’s state | TE’s end timestamp | Action to take when transaction T checks visibility of a version V
|
||||
| | as of read time RT.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Active | Not set | V is visible only if TE is not T.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Preparing | TS | V’s end timestamp will be TS provided that TE commits. If TS > RT,
|
||||
| V is visible to T. If TS < RT, T speculatively ignores V.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Committed | TS | V’s end timestamp will be TS and V is committed. Use TS as V’s end
|
||||
| timestamp when testing visibility.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Aborted | Irrelevant | V is visible.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Terminated | Irrelevant | Reread V’s End field. TE has terminated so it must have finalized
|
||||
or not found | | the timestamp.
|
||||
*/
|
||||
|
||||
fn new_tx(tx_id: TxID, begin_ts: u64, state: TransactionState) -> RwLock<Transaction> {
|
||||
RwLock::new(Transaction {
|
||||
state,
|
||||
tx_id,
|
||||
begin_ts,
|
||||
write_set: SkipSet::new(),
|
||||
read_set: SkipSet::new(),
|
||||
})
|
||||
}
|
||||
|
||||
#[traced_test]
|
||||
#[test]
|
||||
fn test_snapshot_isolation_tx_visible1() {
|
||||
let txs: SkipMap<TxID, RwLock<Transaction>> = SkipMap::from_iter([
|
||||
(1, new_tx(1, 1, TransactionState::Committed(2))),
|
||||
(2, new_tx(2, 2, TransactionState::Committed(5))),
|
||||
(3, new_tx(3, 3, TransactionState::Aborted)),
|
||||
(5, new_tx(5, 5, TransactionState::Preparing)),
|
||||
(6, new_tx(6, 6, TransactionState::Committed(10))),
|
||||
(7, new_tx(7, 7, TransactionState::Active)),
|
||||
]);
|
||||
|
||||
let current_tx = new_tx(4, 4, TransactionState::Preparing);
|
||||
let current_tx = current_tx.read().unwrap();
|
||||
|
||||
let rv_visible = |begin: TxTimestampOrID, end: Option<TxTimestampOrID>| {
|
||||
let row_version = RowVersion {
|
||||
begin,
|
||||
end,
|
||||
row: Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "testme".to_string(),
|
||||
},
|
||||
};
|
||||
tracing::debug!("Testing visibility of {row_version:?}");
|
||||
is_version_visible(&txs, &current_tx, &row_version)
|
||||
};
|
||||
|
||||
// begin visible: transaction committed with ts < current_tx.begin_ts
|
||||
// end visible: inf
|
||||
assert!(rv_visible(TxTimestampOrID::TxID(1), None));
|
||||
|
||||
// begin invisible: transaction committed with ts > current_tx.begin_ts
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(2), None));
|
||||
|
||||
// begin invisible: transaction aborted
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(3), None));
|
||||
|
||||
// begin visible: timestamp < current_tx.begin_ts
|
||||
// end invisible: transaction committed with ts < current_tx.begin_ts
|
||||
assert!(!rv_visible(
|
||||
TxTimestampOrID::Timestamp(0),
|
||||
Some(TxTimestampOrID::TxID(1))
|
||||
));
|
||||
|
||||
// begin visible: timestamp < current_tx.begin_ts
|
||||
// end visible: transaction committed with ts > current_tx.begin_ts
|
||||
assert!(rv_visible(
|
||||
TxTimestampOrID::Timestamp(0),
|
||||
Some(TxTimestampOrID::TxID(2))
|
||||
));
|
||||
|
||||
// begin visible: timestamp < current_tx.begin_ts
|
||||
// end invisible: transaction aborted
|
||||
assert!(!rv_visible(
|
||||
TxTimestampOrID::Timestamp(0),
|
||||
Some(TxTimestampOrID::TxID(3))
|
||||
));
|
||||
|
||||
// begin invisible: transaction preparing
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(5), None));
|
||||
|
||||
// begin invisible: transaction committed with ts > current_tx.begin_ts
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(6), None));
|
||||
|
||||
// begin invisible: transaction active
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(7), None));
|
||||
|
||||
// begin invisible: transaction committed with ts > current_tx.begin_ts
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(6), None));
|
||||
|
||||
// begin invisible: transaction active
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(7), None));
|
||||
|
||||
// begin visible: timestamp < current_tx.begin_ts
|
||||
// end invisible: transaction preparing
|
||||
assert!(!rv_visible(
|
||||
TxTimestampOrID::Timestamp(0),
|
||||
Some(TxTimestampOrID::TxID(5))
|
||||
));
|
||||
|
||||
// begin invisible: timestamp > current_tx.begin_ts
|
||||
assert!(!rv_visible(
|
||||
TxTimestampOrID::Timestamp(6),
|
||||
Some(TxTimestampOrID::TxID(6))
|
||||
));
|
||||
|
||||
// begin visible: timestamp < current_tx.begin_ts
|
||||
// end visible: some active transaction will eventually overwrite this version,
|
||||
// but that hasn't happened
|
||||
// (this is the https://avi.im/blag/2023/hekaton-paper-typo/ case, I believe!)
|
||||
assert!(rv_visible(
|
||||
TxTimestampOrID::Timestamp(0),
|
||||
Some(TxTimestampOrID::TxID(7))
|
||||
));
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ impl Storage {
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
pub fn log_tx(&mut self, m: LogRecord) -> Result<()> {
|
||||
pub fn log_tx(&self, m: LogRecord) -> Result<()> {
|
||||
match self {
|
||||
Self::JsonOnDisk(path) => {
|
||||
use std::io::Write;
|
||||
|
||||
Reference in New Issue
Block a user