From 51f33919d353f28a9c19da1a3d55ae135614c174 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 17 May 2023 16:31:16 +0300 Subject: [PATCH] Switch to concurrent SkipMap for row versions (#49) Let's switch to concurrent SkipMap as the first small step towards lockless index... --- core/mvcc/mvcc-rs/Cargo.toml | 1 + core/mvcc/mvcc-rs/src/database.rs | 60 ++++++++++++++++++++----------- 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/core/mvcc/mvcc-rs/Cargo.toml b/core/mvcc/mvcc-rs/Cargo.toml index f2087c651..21c83167d 100644 --- a/core/mvcc/mvcc-rs/Cargo.toml +++ b/core/mvcc/mvcc-rs/Cargo.toml @@ -15,6 +15,7 @@ aws-sdk-s3 = "0.27.0" aws-config = "0.55.2" parking_lot = "0.12.1" futures = "0.3.28" +crossbeam-skiplist = "0.1.1" [dev-dependencies] criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] } diff --git a/core/mvcc/mvcc-rs/src/database.rs b/core/mvcc/mvcc-rs/src/database.rs index c69da6c2e..a60ac7663 100644 --- a/core/mvcc/mvcc-rs/src/database.rs +++ b/core/mvcc/mvcc-rs/src/database.rs @@ -2,11 +2,12 @@ use crate::clock::LogicalClock; use crate::errors::DatabaseError; use crate::persistent_storage::Storage; use parking_lot::Mutex; +use crossbeam_skiplist::SkipMap; use serde::{Deserialize, Serialize}; use std::cell::RefCell; use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; pub type Result = std::result::Result; @@ -134,7 +135,7 @@ impl Database { /// Creates a new database. pub fn new(clock: Clock, storage: Storage) -> Self { let inner = DatabaseInner { - rows: RefCell::new(BTreeMap::new()), + rows: RefCell::new(SkipMap::new()), txs: RefCell::new(HashMap::new()), tx_timestamps: RefCell::new(BTreeMap::new()), tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes @@ -289,7 +290,7 @@ impl Database { #[derive(Debug)] pub struct DatabaseInner { - rows: RefCell>>, + rows: RefCell>>>, txs: RefCell>, tx_timestamps: RefCell>, tx_ids: AtomicU64, @@ -310,8 +311,10 @@ impl DatabaseInner { end: None, row, }; - let mut rows = self.rows.borrow_mut(); - rows.entry(id).or_insert_with(Vec::new).push(row_version); + let rows = self.rows.borrow_mut(); + let versions = rows.get_or_insert_with(id, || RwLock::new(Vec::new())); + let mut versions = versions.value().write().unwrap(); + versions.push(row_version); tx.insert_to_write_set(id); Ok(()) } @@ -321,8 +324,10 @@ impl DatabaseInner { // NOTICE: They *are* dropped before an await point!!! But the await is conditional, // so I think clippy is just confused. let mut txs = self.txs.borrow_mut(); - let mut rows = self.rows.borrow_mut(); - if let Some(row_versions) = rows.get_mut(&id) { + let rows = self.rows.borrow_mut(); + let row_versions_opt = rows.get(&id); + if let Some(ref row_versions) = row_versions_opt { + let mut row_versions = row_versions.value().write().unwrap(); for rv in row_versions.iter_mut().rev() { let tx = txs .get(&tx_id) @@ -330,6 +335,8 @@ impl DatabaseInner { assert!(tx.state == TransactionState::Active); if is_write_write_conflict(&txs, tx, rv) { drop(txs); + drop(row_versions); + drop(row_versions_opt); drop(rows); self.rollback_tx(tx_id); return Err(DatabaseError::WriteWriteConflict); @@ -353,6 +360,7 @@ impl DatabaseInner { assert!(tx.state == TransactionState::Active); let rows = self.rows.borrow(); if let Some(row_versions) = rows.get(&id) { + let row_versions = row_versions.value().read().unwrap(); for rv in row_versions.iter().rev() { if is_version_visible(&txs, tx, rv) { tx.insert_to_read_set(id); @@ -365,11 +373,12 @@ impl DatabaseInner { fn scan_row_ids(&self) -> Result> { let rows = self.rows.borrow(); - Ok(rows.keys().cloned().collect()) + let keys = rows.iter().map(|entry| *entry.key()); + Ok(keys.collect()) } fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { - let rows = self.rows.borrow(); + let rows = &self.rows.borrow(); Ok(rows .range( RowID { @@ -380,8 +389,7 @@ impl DatabaseInner { row_id: u64::MAX, }, ) - .map(|(k, _)| k) - .cloned() + .map(|entry| *entry.key()) .collect()) } @@ -407,12 +415,13 @@ impl DatabaseInner { assert!(tx.state == TransactionState::Active); } } - let mut rows = self.rows.borrow_mut(); + let rows = self.rows.borrow_mut(); tx.state = TransactionState::Preparing; tracing::trace!("PREPARE {tx}"); let mut log_record: LogRecord = LogRecord::new(end_ts); for id in &tx.write_set { - if let Some(row_versions) = rows.get_mut(id) { + if let Some(row_versions) = rows.get(id) { + let mut row_versions = row_versions.value().write().unwrap(); for row_version in row_versions.iter_mut() { if let TxTimestampOrID::TxID(id) = row_version.begin { if id == tx_id { @@ -459,9 +468,10 @@ impl DatabaseInner { assert!(tx.state == TransactionState::Active); tx.state = TransactionState::Aborted; tracing::trace!("ABORT {tx}"); - let mut rows = self.rows.borrow_mut(); + let rows = self.rows.borrow_mut(); for id in &tx.write_set { - if let Some(row_versions) = rows.get_mut(id) { + if let Some(row_versions) = rows.get(id) { + let mut row_versions = row_versions.value().write().unwrap(); row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id)); if row_versions.is_empty() { rows.remove(id); @@ -493,9 +503,10 @@ impl DatabaseInner { fn drop_unused_row_versions(&self) { let txs = self.txs.borrow(); let tx_timestamps = self.tx_timestamps.borrow(); - let mut rows = self.rows.borrow_mut(); + let rows = self.rows.borrow_mut(); let mut to_remove = Vec::new(); - for (id, row_versions) in rows.iter_mut() { + for entry in rows.iter() { + let mut row_versions = entry.value().write().unwrap(); row_versions.retain(|rv| { let should_stay = match rv.end { Some(TxTimestampOrID::Timestamp(version_end_ts)) => { @@ -515,12 +526,17 @@ impl DatabaseInner { None => true, }; if !should_stay { - tracing::debug!("Dropping row version {:?} {:?}-{:?}", id, rv.begin, rv.end); + tracing::debug!( + "Dropping row version {:?} {:?}-{:?}", + entry.key(), + rv.begin, + rv.end + ); } should_stay }); if row_versions.is_empty() { - to_remove.push(*id); + to_remove.push(*entry.key()); } } for id in to_remove { @@ -533,8 +549,10 @@ impl DatabaseInner { for record in tx_log { tracing::debug!("RECOVERING {:?}", record); for version in record.row_versions { - let mut rows = self.rows.borrow_mut(); - let row_versions = rows.entry(version.row.id).or_insert_with(Vec::new); + let rows = self.rows.borrow_mut(); + let row_versions = + rows.get_or_insert_with(version.row.id, || RwLock::new(Vec::new())); + let mut row_versions = row_versions.value().write().unwrap(); row_versions.push(version); } self.clock.reset(record.tx_timestamp);