database: drop the mutex

Without a critical section, we naturally hit a few unimplemented
paths when handling concurrent transactions, which is great news!
Visiting previously impossible paths already proves that lock-free
is able to handle concurrency > 1.
Now, the easy part - fixing all the unimplemented paths and making
the Hekaton implementation 100% foolproof.
This commit is contained in:
Piotr Sarna
2023-06-06 11:21:10 +02:00
parent 57b2e031fb
commit 47eb149214
2 changed files with 78 additions and 206 deletions

View File

@@ -2,12 +2,9 @@ use crate::clock::LogicalClock;
use crate::errors::DatabaseError;
use crate::persistent_storage::Storage;
use crossbeam_skiplist::{SkipMap, SkipSet};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::RwLock;
pub type Result<T> = std::result::Result<T, DatabaseError>;
@@ -175,29 +172,28 @@ enum TransactionState {
Aborted,
Terminated,
}
/// A database with MVCC.
#[derive(Debug)]
pub struct Database<Clock: LogicalClock> {
inner: Arc<Mutex<DatabaseInner<Clock>>>,
rows: SkipMap<RowID, RwLock<Vec<RowVersion>>>,
txs: SkipMap<TxID, RwLock<Transaction>>,
tx_ids: AtomicU64,
clock: Clock,
storage: Storage,
}
impl<Clock: LogicalClock> Database<Clock> {
/// Creates a new database.
pub fn new(clock: Clock, storage: Storage) -> Self {
let inner = DatabaseInner {
Self {
rows: SkipMap::new(),
txs: SkipMap::new(),
tx_timestamps: RefCell::new(BTreeMap::new()),
tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes
clock,
storage,
};
Self {
inner: Arc::new(Mutex::new(inner)),
}
}
/// Inserts a new row into the database.
///
/// This function inserts a new `row` into the database within the context
@@ -209,8 +205,23 @@ impl<Clock: LogicalClock> Database<Clock> {
/// * `row` - the row object containing the values to be inserted.
///
pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
let inner = self.inner.lock();
inner.insert(tx_id, row)
let tx = self
.txs
.get(&tx_id)
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
let mut tx = tx.value().write().unwrap();
assert!(tx.state == TransactionState::Active);
let id = row.id;
let row_version = RowVersion {
begin: TxTimestampOrID::TxID(tx.tx_id),
end: None,
row,
};
let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new()));
let mut versions = versions.value().write().unwrap();
versions.push(row_version);
tx.insert_to_write_set(id);
Ok(())
}
/// Updates a row in the database with new values.
@@ -254,123 +265,6 @@ impl<Clock: LogicalClock> Database<Clock> {
/// Returns `true` if the row was successfully deleted, and `false` otherwise.
///
pub fn delete(&self, tx_id: TxID, id: RowID) -> Result<bool> {
let inner = self.inner.lock();
inner.delete(tx_id, id)
}
/// Retrieves a row from the table with the given `id`.
///
/// This operation is performed within the scope of the transaction identified
/// by `tx_id`.
///
/// # Arguments
///
/// * `tx_id` - The ID of the transaction to perform the read operation in.
/// * `id` - The ID of the row to retrieve.
///
/// # Returns
///
/// Returns `Some(row)` with the row data if the row with the given `id` exists,
/// and `None` otherwise.
pub fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
let inner = self.inner.lock();
inner.read(tx_id, id)
}
pub fn scan_row_ids(&self) -> Result<Vec<RowID>> {
let inner = self.inner.lock();
inner.scan_row_ids()
}
pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
let inner = self.inner.lock();
inner.scan_row_ids_for_table(table_id)
}
/// Begins a new transaction in the database.
///
/// This function starts a new transaction in the database and returns a `TxID` value
/// that you can use to perform operations within the transaction. All changes made within the
/// transaction are isolated from other transactions until you commit the transaction.
pub fn begin_tx(&self) -> TxID {
let mut inner = self.inner.lock();
inner.begin_tx()
}
/// Commits a transaction with the specified transaction ID.
///
/// This function commits the changes made within the specified transaction and finalizes the
/// transaction. Once a transaction has been committed, all changes made within the transaction
/// are visible to other transactions that access the same data.
///
/// # Arguments
///
/// * `tx_id` - The ID of the transaction to commit.
pub fn commit_tx(&self, tx_id: TxID) -> Result<()> {
let mut inner = self.inner.lock();
inner.commit_tx(tx_id)
}
/// Rolls back a transaction with the specified ID.
///
/// This function rolls back a transaction with the specified `tx_id` by
/// discarding any changes made by the transaction.
///
/// # Arguments
///
/// * `tx_id` - The ID of the transaction to abort.
pub fn rollback_tx(&self, tx_id: TxID) {
let inner = self.inner.lock();
inner.rollback_tx(tx_id);
}
/// Drops all unused row versions from the database.
///
/// A version is considered unused if it is not visible to any active transaction
/// and it is not the most recent version of the row.
pub fn drop_unused_row_versions(&self) {
let inner = self.inner.lock();
inner.drop_unused_row_versions();
}
pub fn recover(&self) -> Result<()> {
let inner = self.inner.lock();
inner.recover()
}
}
#[derive(Debug)]
pub struct DatabaseInner<Clock: LogicalClock> {
rows: SkipMap<RowID, RwLock<Vec<RowVersion>>>,
txs: SkipMap<TxID, RwLock<Transaction>>,
tx_timestamps: RefCell<BTreeMap<u64, usize>>,
tx_ids: AtomicU64,
clock: Clock,
storage: Storage,
}
impl<Clock: LogicalClock> DatabaseInner<Clock> {
fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
let tx = self
.txs
.get(&tx_id)
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
let mut tx = tx.value().write().unwrap();
assert!(tx.state == TransactionState::Active);
let id = row.id;
let row_version = RowVersion {
begin: TxTimestampOrID::TxID(tx.tx_id),
end: None,
row,
};
let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new()));
let mut versions = versions.value().write().unwrap();
versions.push(row_version);
tx.insert_to_write_set(id);
Ok(())
}
fn delete(&self, tx_id: TxID, id: RowID) -> Result<bool> {
let row_versions_opt = self.rows.get(&id);
if let Some(ref row_versions) = row_versions_opt {
let mut row_versions = row_versions.value().write().unwrap();
@@ -404,7 +298,21 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
Ok(false)
}
fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
/// Retrieves a row from the table with the given `id`.
///
/// This operation is performed within the scope of the transaction identified
/// by `tx_id`.
///
/// # Arguments
///
/// * `tx_id` - The ID of the transaction to perform the read operation in.
/// * `id` - The ID of the row to retrieve.
///
/// # Returns
///
/// Returns `Some(row)` with the row data if the row with the given `id` exists,
/// and `None` otherwise.
pub fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
let tx = self.txs.get(&tx_id).unwrap();
let tx = tx.value().read().unwrap();
assert!(tx.state == TransactionState::Active);
@@ -420,12 +328,14 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
Ok(None)
}
fn scan_row_ids(&self) -> Result<Vec<RowID>> {
/// Gets all row ids in the database.
pub fn scan_row_ids(&self) -> Result<Vec<RowID>> {
let keys = self.rows.iter().map(|entry| *entry.key());
Ok(keys.collect())
}
fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
/// Gets all row ids in the database for a given table.
pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
Ok(self
.rows
.range(
@@ -441,18 +351,30 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
.collect())
}
fn begin_tx(&mut self) -> TxID {
/// Begins a new transaction in the database.
///
/// This function starts a new transaction in the database and returns a `TxID` value
/// that you can use to perform operations within the transaction. All changes made within the
/// transaction are isolated from other transactions until you commit the transaction.
pub fn begin_tx(&self) -> TxID {
let tx_id = self.get_tx_id();
let begin_ts = self.get_timestamp();
let tx = Transaction::new(tx_id, begin_ts);
tracing::trace!("BEGIN {tx}");
let mut tx_timestamps = self.tx_timestamps.borrow_mut();
self.txs.insert(tx_id, RwLock::new(tx));
*tx_timestamps.entry(begin_ts).or_insert(0) += 1;
tx_id
}
fn commit_tx(&mut self, tx_id: TxID) -> Result<()> {
/// Commits a transaction with the specified transaction ID.
///
/// This function commits the changes made within the specified transaction and finalizes the
/// transaction. Once a transaction has been committed, all changes made within the transaction
/// are visible to other transactions that access the same data.
///
/// # Arguments
///
/// * `tx_id` - The ID of the transaction to commit.
pub fn commit_tx(&self, tx_id: TxID) -> Result<()> {
let end_ts = self.get_timestamp();
let tx = self.txs.get(&tx_id).unwrap();
let mut tx = tx.value().write().unwrap();
@@ -487,19 +409,6 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
}
tx.state = TransactionState::Committed;
tracing::trace!("COMMIT {tx}");
// We have now updated all the versions with a reference to the
// transaction ID to a timestamp and can, therefore, remove the
// transaction. Please note that when we move to lockless, the
// invariant doesn't necessarily hold anymore because another thread
// might have speculatively read a version that we want to remove.
// But that's a problem for another day.
let mut tx_timestamps = self.tx_timestamps.borrow_mut();
if let Some(timestamp_entry) = tx_timestamps.get_mut(&tx.begin_ts) {
*timestamp_entry -= 1;
if timestamp_entry == &0 {
tx_timestamps.remove(&tx.begin_ts);
}
}
self.txs.remove(&tx_id);
if !log_record.row_versions.is_empty() {
self.storage.log_tx(log_record)?;
@@ -507,7 +416,15 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
Ok(())
}
fn rollback_tx(&self, tx_id: TxID) {
/// Rolls back a transaction with the specified ID.
///
/// This function rolls back a transaction with the specified `tx_id` by
/// discarding any changes made by the transaction.
///
/// # Arguments
///
/// * `tx_id` - The ID of the transaction to abort.
pub fn rollback_tx(&self, tx_id: TxID) {
let tx = self.txs.get(&tx_id).unwrap();
let mut tx = tx.value().write().unwrap();
assert!(tx.state == TransactionState::Active);
@@ -527,64 +444,19 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
tracing::trace!("TERMINATE {tx}");
}
fn get_tx_id(&mut self) -> u64 {
/// Generates next unique transaction id
pub fn get_tx_id(&self) -> u64 {
self.tx_ids.fetch_add(1, Ordering::SeqCst)
}
fn get_timestamp(&mut self) -> u64 {
/// Gets current timestamp
pub fn get_timestamp(&self) -> u64 {
self.clock.get_timestamp()
}
/// Drops all rows that are not visible to any transaction.
/// The logic is as follows. If a row version has an end marker
/// which denotes a transaction that is not active, then we can
/// drop the row version -- it is not visible to any transaction.
/// If a row version has an end marker that denotes a timestamp T_END,
/// then we can drop the row version only if all active transactions
/// have a begin timestamp that is greater than timestamp T_END.
/// FIXME: this function is a full scan over all rows and row versions.
/// We can do better by keeping an index of row versions ordered
/// by their end timestamps.
fn drop_unused_row_versions(&self) {
let tx_timestamps = self.tx_timestamps.borrow();
let mut to_remove = Vec::new();
for entry in self.rows.iter() {
let mut row_versions = entry.value().write().unwrap();
row_versions.retain(|rv| {
let should_stay = match rv.end {
Some(TxTimestampOrID::Timestamp(version_end_ts)) => {
match tx_timestamps.first_key_value() {
// a transaction started before this row version ended,
// ergo row version is needed
Some((begin_ts, _)) => version_end_ts >= *begin_ts,
// no transaction => row version is not needed
None => false,
}
}
// Let's skip potentially complex logic if the transaction is still
// active/tracked. We will drop the row version when the transaction
// gets garbage-collected itself, it will always happen eventually.
Some(TxTimestampOrID::TxID(tx_id)) => !self.txs.contains_key(&tx_id),
// this row version is current, ergo visible
None => true,
};
if !should_stay {
tracing::debug!(
"Dropping row version {:?} {:?}-{:?}",
entry.key(),
rv.begin,
rv.end
);
}
should_stay
});
if row_versions.is_empty() {
to_remove.push(*entry.key());
}
}
for id in to_remove {
self.rows.remove(&id);
}
/// FIXME: implement in a lock-free manner
pub fn drop_unused_row_versions(&self) {
tracing::error!("Unused rows are not dropped at the moment. Will do!");
}
pub fn recover(&self) -> Result<()> {

View File

@@ -27,7 +27,7 @@ impl Storage {
}
impl Storage {
pub fn log_tx(&mut self, m: LogRecord) -> Result<()> {
pub fn log_tx(&self, m: LogRecord) -> Result<()> {
match self {
Self::JsonOnDisk(path) => {
use std::io::Write;