Switch to concurrent SkipMap for row versions (#49)

Let's switch to concurrent SkipMap as the first small step towards
lockless index...
This commit is contained in:
Pekka Enberg
2023-05-17 16:31:16 +03:00
committed by GitHub
parent 3b9e235d09
commit 51f33919d3
2 changed files with 40 additions and 21 deletions

View File

@@ -15,6 +15,7 @@ aws-sdk-s3 = "0.27.0"
aws-config = "0.55.2"
parking_lot = "0.12.1"
futures = "0.3.28"
crossbeam-skiplist = "0.1.1"
[dev-dependencies]
criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] }

View File

@@ -2,11 +2,12 @@ use crate::clock::LogicalClock;
use crate::errors::DatabaseError;
use crate::persistent_storage::Storage;
use parking_lot::Mutex;
use crossbeam_skiplist::SkipMap;
use serde::{Deserialize, Serialize};
use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
pub type Result<T> = std::result::Result<T, DatabaseError>;
@@ -134,7 +135,7 @@ impl<Clock: LogicalClock> Database<Clock> {
/// Creates a new database.
pub fn new(clock: Clock, storage: Storage) -> Self {
let inner = DatabaseInner {
rows: RefCell::new(BTreeMap::new()),
rows: RefCell::new(SkipMap::new()),
txs: RefCell::new(HashMap::new()),
tx_timestamps: RefCell::new(BTreeMap::new()),
tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes
@@ -289,7 +290,7 @@ impl<Clock: LogicalClock> Database<Clock> {
#[derive(Debug)]
pub struct DatabaseInner<Clock: LogicalClock> {
rows: RefCell<BTreeMap<RowID, Vec<RowVersion>>>,
rows: RefCell<SkipMap<RowID, RwLock<Vec<RowVersion>>>>,
txs: RefCell<HashMap<TxID, Transaction>>,
tx_timestamps: RefCell<BTreeMap<u64, usize>>,
tx_ids: AtomicU64,
@@ -310,8 +311,10 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
end: None,
row,
};
let mut rows = self.rows.borrow_mut();
rows.entry(id).or_insert_with(Vec::new).push(row_version);
let rows = self.rows.borrow_mut();
let versions = rows.get_or_insert_with(id, || RwLock::new(Vec::new()));
let mut versions = versions.value().write().unwrap();
versions.push(row_version);
tx.insert_to_write_set(id);
Ok(())
}
@@ -321,8 +324,10 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
// NOTICE: They *are* dropped before an await point!!! But the await is conditional,
// so I think clippy is just confused.
let mut txs = self.txs.borrow_mut();
let mut rows = self.rows.borrow_mut();
if let Some(row_versions) = rows.get_mut(&id) {
let rows = self.rows.borrow_mut();
let row_versions_opt = rows.get(&id);
if let Some(ref row_versions) = row_versions_opt {
let mut row_versions = row_versions.value().write().unwrap();
for rv in row_versions.iter_mut().rev() {
let tx = txs
.get(&tx_id)
@@ -330,6 +335,8 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
assert!(tx.state == TransactionState::Active);
if is_write_write_conflict(&txs, tx, rv) {
drop(txs);
drop(row_versions);
drop(row_versions_opt);
drop(rows);
self.rollback_tx(tx_id);
return Err(DatabaseError::WriteWriteConflict);
@@ -353,6 +360,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
assert!(tx.state == TransactionState::Active);
let rows = self.rows.borrow();
if let Some(row_versions) = rows.get(&id) {
let row_versions = row_versions.value().read().unwrap();
for rv in row_versions.iter().rev() {
if is_version_visible(&txs, tx, rv) {
tx.insert_to_read_set(id);
@@ -365,11 +373,12 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
fn scan_row_ids(&self) -> Result<Vec<RowID>> {
let rows = self.rows.borrow();
Ok(rows.keys().cloned().collect())
let keys = rows.iter().map(|entry| *entry.key());
Ok(keys.collect())
}
fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
let rows = self.rows.borrow();
let rows = &self.rows.borrow();
Ok(rows
.range(
RowID {
@@ -380,8 +389,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
row_id: u64::MAX,
},
)
.map(|(k, _)| k)
.cloned()
.map(|entry| *entry.key())
.collect())
}
@@ -407,12 +415,13 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
assert!(tx.state == TransactionState::Active);
}
}
let mut rows = self.rows.borrow_mut();
let rows = self.rows.borrow_mut();
tx.state = TransactionState::Preparing;
tracing::trace!("PREPARE {tx}");
let mut log_record: LogRecord = LogRecord::new(end_ts);
for id in &tx.write_set {
if let Some(row_versions) = rows.get_mut(id) {
if let Some(row_versions) = rows.get(id) {
let mut row_versions = row_versions.value().write().unwrap();
for row_version in row_versions.iter_mut() {
if let TxTimestampOrID::TxID(id) = row_version.begin {
if id == tx_id {
@@ -459,9 +468,10 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
assert!(tx.state == TransactionState::Active);
tx.state = TransactionState::Aborted;
tracing::trace!("ABORT {tx}");
let mut rows = self.rows.borrow_mut();
let rows = self.rows.borrow_mut();
for id in &tx.write_set {
if let Some(row_versions) = rows.get_mut(id) {
if let Some(row_versions) = rows.get(id) {
let mut row_versions = row_versions.value().write().unwrap();
row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id));
if row_versions.is_empty() {
rows.remove(id);
@@ -493,9 +503,10 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
fn drop_unused_row_versions(&self) {
let txs = self.txs.borrow();
let tx_timestamps = self.tx_timestamps.borrow();
let mut rows = self.rows.borrow_mut();
let rows = self.rows.borrow_mut();
let mut to_remove = Vec::new();
for (id, row_versions) in rows.iter_mut() {
for entry in rows.iter() {
let mut row_versions = entry.value().write().unwrap();
row_versions.retain(|rv| {
let should_stay = match rv.end {
Some(TxTimestampOrID::Timestamp(version_end_ts)) => {
@@ -515,12 +526,17 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
None => true,
};
if !should_stay {
tracing::debug!("Dropping row version {:?} {:?}-{:?}", id, rv.begin, rv.end);
tracing::debug!(
"Dropping row version {:?} {:?}-{:?}",
entry.key(),
rv.begin,
rv.end
);
}
should_stay
});
if row_versions.is_empty() {
to_remove.push(*id);
to_remove.push(*entry.key());
}
}
for id in to_remove {
@@ -533,8 +549,10 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
for record in tx_log {
tracing::debug!("RECOVERING {:?}", record);
for version in record.row_versions {
let mut rows = self.rows.borrow_mut();
let row_versions = rows.entry(version.row.id).or_insert_with(Vec::new);
let rows = self.rows.borrow_mut();
let row_versions =
rows.get_or_insert_with(version.row.id, || RwLock::new(Vec::new()));
let mut row_versions = row_versions.value().write().unwrap();
row_versions.push(version);
}
self.clock.reset(record.tx_timestamp);