diff --git a/core/mvcc/.github/workflows/smoke_test.yml b/core/mvcc/.github/workflows/smoke_test.yml new file mode 100644 index 000000000..3a00d72b4 --- /dev/null +++ b/core/mvcc/.github/workflows/smoke_test.yml @@ -0,0 +1,25 @@ +name: Rust + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +env: + CARGO_TERM_COLOR: always + RUST_LOG: info,mvcc_rs=trace + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - name: Check + run: cargo check --all-targets --all-features + - name: Clippy + run: cargo clippy --all-targets --all-features -- -D warnings + - name: Run tests + run: cargo test --verbose diff --git a/core/mvcc/.gitignore b/core/mvcc/.gitignore new file mode 100644 index 000000000..1e7caa9ea --- /dev/null +++ b/core/mvcc/.gitignore @@ -0,0 +1,2 @@ +Cargo.lock +target/ diff --git a/core/mvcc/Cargo.toml b/core/mvcc/Cargo.toml new file mode 100644 index 000000000..7ebb0ebff --- /dev/null +++ b/core/mvcc/Cargo.toml @@ -0,0 +1,11 @@ +[workspace] +resolver = "2" +members = [ + "mvcc-rs", + "bindings/c", +] + +[profile.release] +codegen-units = 1 +panic = "abort" +strip = true diff --git a/core/mvcc/LICENSE.md b/core/mvcc/LICENSE.md new file mode 100644 index 000000000..0c99a0831 --- /dev/null +++ b/core/mvcc/LICENSE.md @@ -0,0 +1,20 @@ +MIT License + +Copyright 2023 Pekka Enberg + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. 
+ +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/core/mvcc/README.md b/core/mvcc/README.md new file mode 100644 index 000000000..7dcc797d9 --- /dev/null +++ b/core/mvcc/README.md @@ -0,0 +1,58 @@ +# Tihku + +Tihku is a _work-in-progress_, open-source implementation of the Hekaton multi-version concurrency control (MVCC) written in Rust. +The project aims to provide a foundational building block for implementing database management systems. + +One of the projects using Tihku is an experimental [libSQL branch with MVCC](https://github.com/penberg/libsql/tree/mvcc) that aims to implement `BEGIN CONCURRENT` with Tihku to improve SQLite write concurrency. + +## Features + +* Main memory architecture, rows are accessed via an index +* Optimistic multi-version concurrency control +* Rust and C APIs + +## Experimental Evaluation + +**Single-threaded micro-benchmarks** + +Operations | Throughput +-----------------------------------|------------ +`begin_tx`, `read`, and `commit` | 2.2M ops/second +`begin_tx`, `update`, and `commit` | 2.2M ops/second +`read` | 12.9M ops/second +`update` | 6.2M ops/second + +(The `cargo bench` was run on an AMD Ryzen 9 3900XT 2.2 GHz CPU.) 
+ +## Development + +Run tests: + +```console +cargo test +``` + +Test coverage report: + +```console +cargo tarpaulin -o html +``` + +Run benchmarks: + +```console +cargo bench +``` + +Run benchmarks and generate flamegraphs: + +```console +echo -1 | sudo tee /proc/sys/kernel/perf_event_paranoid +cargo bench --bench my_benchmark -- --profile-time=5 +``` + +## References + +Larson et al. [High-Performance Concurrency Control Mechanisms for Main-Memory Databases](https://vldb.org/pvldb/vol5/p298_per-akelarson_vldb2012.pdf). VLDB '11 + +Paper errata: The visibility check in Table 2 is wrong and causes uncommitted delete to become visible to transactions (fixed in [commit 6ca3773]( https://github.com/penberg/mvcc-rs/commit/6ca377320bb59b52ecc0430b9e5e422e8d61658d)). diff --git a/core/mvcc/bindings/c/Cargo.toml b/core/mvcc/bindings/c/Cargo.toml new file mode 100644 index 000000000..be23a9a1b --- /dev/null +++ b/core/mvcc/bindings/c/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "mvcc-c" +version = "0.0.0" +edition = "2021" + +[lib] +crate-type = ["cdylib", "staticlib"] +doc = false + +[build-dependencies] +cbindgen = "0.24.0" + +[dependencies] +base64 = "0.21.0" +mvcc-rs = { path = "../../mvcc-rs" } +tracing = "0.1.37" +tracing-subscriber = { version = "0" } + +[features] +default = [] +json_on_disk_storage = [] +s3_storage = [] diff --git a/core/mvcc/bindings/c/build.rs b/core/mvcc/bindings/c/build.rs new file mode 100644 index 000000000..f418d0a9a --- /dev/null +++ b/core/mvcc/bindings/c/build.rs @@ -0,0 +1,8 @@ +use std::path::Path; + +fn main() { + let header_file = Path::new("include").join("mvcc.h"); + cbindgen::generate(".") + .expect("Failed to generate C bindings") + .write_to_file(header_file); +} diff --git a/core/mvcc/bindings/c/cbindgen.toml b/core/mvcc/bindings/c/cbindgen.toml new file mode 100644 index 000000000..1b5ac2f31 --- /dev/null +++ b/core/mvcc/bindings/c/cbindgen.toml @@ -0,0 +1,7 @@ +language = "C" +cpp_compat = true +include_guard = "MVCC_H" 
+line_length = 120 +no_includes = true +style = "type" +sys_includes = ["stdint.h"] diff --git a/core/mvcc/bindings/c/include/mvcc.h b/core/mvcc/bindings/c/include/mvcc.h new file mode 100644 index 000000000..eead91b01 --- /dev/null +++ b/core/mvcc/bindings/c/include/mvcc.h @@ -0,0 +1,64 @@ +#ifndef MVCC_H +#define MVCC_H + +#include + +typedef enum { + MVCC_OK = 0, + MVCC_IO_ERROR_READ = 266, + MVCC_IO_ERROR_WRITE = 778, +} MVCCError; + +typedef struct DbContext DbContext; + +typedef struct ScanCursorContext ScanCursorContext; + +typedef const DbContext *MVCCDatabaseRef; + +typedef ScanCursorContext *MVCCScanCursorRef; + +#ifdef __cplusplus +extern "C" { +#endif // __cplusplus + +MVCCDatabaseRef MVCCDatabaseOpen(const char *path); + +void MVCCDatabaseClose(MVCCDatabaseRef db); + +uint64_t MVCCTransactionBegin(MVCCDatabaseRef db); + +MVCCError MVCCTransactionCommit(MVCCDatabaseRef db, uint64_t tx_id); + +MVCCError MVCCTransactionRollback(MVCCDatabaseRef db, uint64_t tx_id); + +MVCCError MVCCDatabaseInsert(MVCCDatabaseRef db, + uint64_t tx_id, + uint64_t table_id, + uint64_t row_id, + const void *value_ptr, + uintptr_t value_len); + +MVCCError MVCCDatabaseRead(MVCCDatabaseRef db, + uint64_t tx_id, + uint64_t table_id, + uint64_t row_id, + uint8_t **value_ptr, + int64_t *value_len); + +void MVCCFreeStr(void *ptr); + +MVCCScanCursorRef MVCCScanCursorOpen(MVCCDatabaseRef db, uint64_t tx_id, uint64_t table_id); + +void MVCCScanCursorClose(MVCCScanCursorRef cursor); + +MVCCError MVCCScanCursorRead(MVCCScanCursorRef cursor, uint8_t **value_ptr, int64_t *value_len); + +int MVCCScanCursorNext(MVCCScanCursorRef cursor); + +uint64_t MVCCScanCursorPosition(MVCCScanCursorRef cursor); + +#ifdef __cplusplus +} // extern "C" +#endif // __cplusplus + +#endif /* MVCC_H */ diff --git a/core/mvcc/bindings/c/src/errors.rs b/core/mvcc/bindings/c/src/errors.rs new file mode 100644 index 000000000..65a174b50 --- /dev/null +++ b/core/mvcc/bindings/c/src/errors.rs @@ -0,0 +1,6 @@ 
+#[repr(C)] +pub enum MVCCError { + MVCC_OK = 0, + MVCC_IO_ERROR_READ = 266, + MVCC_IO_ERROR_WRITE = 778, +} diff --git a/core/mvcc/bindings/c/src/lib.rs b/core/mvcc/bindings/c/src/lib.rs new file mode 100644 index 000000000..7d222a8d2 --- /dev/null +++ b/core/mvcc/bindings/c/src/lib.rs @@ -0,0 +1,298 @@ +#![allow(non_camel_case_types)] +#![allow(clippy::missing_safety_doc)] + +mod errors; +mod types; + +use errors::MVCCError; +use mvcc_rs::persistent_storage::{s3, Storage}; +use mvcc_rs::*; +use types::{DbContext, MVCCDatabaseRef, MVCCScanCursorRef, ScanCursorContext}; + +/// cbindgen:ignore +type Clock = clock::LocalClock; + +/// cbindgen:ignore +/// Note - We use String type in C bindings as Row type. Type is generic. +type Db = database::Database; + +/// cbindgen:ignore +/// Note - We use String type in C bindings as Row type. Type is generic. +type ScanCursor = cursor::ScanCursor<'static, Clock, String>; + +static INIT_RUST_LOG: std::sync::Once = std::sync::Once::new(); + +fn storage_for(main_db_path: &str) -> database::Result { + // TODO: let's accept an URL instead of main_db_path here, so we can + // pass custom S3 endpoints, options, etc. 
+ if cfg!(feature = "json_on_disk_storage") { + tracing::info!("JSONonDisk storage stored in {main_db_path}-mvcc"); + return Ok(Storage::new_json_on_disk(format!("{main_db_path}-mvcc"))); + } + if cfg!(feature = "s3_storage") { + tracing::info!("S3 storage for {main_db_path}"); + let options = s3::Options::with_create_bucket_if_not_exists(true); + return Storage::new_s3(options); + } + tracing::info!("No persistent storage for {main_db_path}"); + Ok(Storage::new_noop()) +} + +#[no_mangle] +pub unsafe extern "C" fn MVCCDatabaseOpen(path: *const std::ffi::c_char) -> MVCCDatabaseRef { + INIT_RUST_LOG.call_once(|| { + tracing_subscriber::fmt::init(); + }); + + tracing::debug!("MVCCDatabaseOpen"); + + let clock = clock::LocalClock::new(); + let main_db_path = unsafe { std::ffi::CStr::from_ptr(path) }; + let main_db_path = match main_db_path.to_str() { + Ok(path) => path, + Err(_) => { + tracing::error!("Invalid UTF-8 path"); + return MVCCDatabaseRef::null(); + } + }; + + tracing::debug!("mvccrs: opening persistent storage for {main_db_path}"); + let storage = match storage_for(main_db_path) { + Ok(storage) => storage, + Err(e) => { + tracing::error!("Failed to open persistent storage: {e}"); + return MVCCDatabaseRef::null(); + } + }; + let db = Db::new(clock, storage); + + db.recover().ok(); + + let db = Box::leak(Box::new(DbContext { db })); + MVCCDatabaseRef::from(db) +} + +#[no_mangle] +pub unsafe extern "C" fn MVCCDatabaseClose(db: MVCCDatabaseRef) { + tracing::debug!("MVCCDatabaseClose"); + if db.is_null() { + tracing::debug!("warning: `db` is null in MVCCDatabaseClose()"); + return; + } + let _ = unsafe { Box::from_raw(db.get_ref_mut()) }; +} + +#[no_mangle] +pub unsafe extern "C" fn MVCCTransactionBegin(db: MVCCDatabaseRef) -> u64 { + let db = db.get_ref(); + let tx_id = db.begin_tx(); + tracing::debug!("MVCCTransactionBegin: {tx_id}"); + tx_id +} + +#[no_mangle] +pub unsafe extern "C" fn MVCCTransactionCommit(db: MVCCDatabaseRef, tx_id: u64) -> MVCCError { + let 
db = db.get_ref(); + tracing::debug!("MVCCTransactionCommit: {tx_id}"); + match db.commit_tx(tx_id) { + Ok(()) => MVCCError::MVCC_OK, + Err(e) => { + tracing::error!("MVCCTransactionCommit: {e}"); + MVCCError::MVCC_IO_ERROR_WRITE + } + } +} + +#[no_mangle] +pub unsafe extern "C" fn MVCCTransactionRollback(db: MVCCDatabaseRef, tx_id: u64) -> MVCCError { + let db = db.get_ref(); + tracing::debug!("MVCCTransactionRollback: {tx_id}"); + db.rollback_tx(tx_id); + MVCCError::MVCC_OK +} + +#[no_mangle] +pub unsafe extern "C" fn MVCCDatabaseInsert( + db: MVCCDatabaseRef, + tx_id: u64, + table_id: u64, + row_id: u64, + value_ptr: *const std::ffi::c_void, + value_len: usize, +) -> MVCCError { + let db = db.get_ref(); + let value = std::slice::from_raw_parts(value_ptr as *const u8, value_len); + let data = match std::str::from_utf8(value) { + Ok(value) => value.to_string(), + Err(_) => { + tracing::info!("Invalid UTF-8, let's base64 this fellow"); + use base64::{engine::general_purpose, Engine as _}; + general_purpose::STANDARD.encode(value) + } + }; + let id = database::RowID { table_id, row_id }; + let row = database::Row { id, data }; + tracing::debug!("MVCCDatabaseInsert: {row:?}"); + match db.insert(tx_id, row) { + Ok(_) => { + tracing::debug!("MVCCDatabaseInsert: success"); + MVCCError::MVCC_OK + } + Err(e) => { + tracing::error!("MVCCDatabaseInsert: {e}"); + MVCCError::MVCC_IO_ERROR_WRITE + } + } +} + +#[no_mangle] +pub unsafe extern "C" fn MVCCDatabaseRead( + db: MVCCDatabaseRef, + tx_id: u64, + table_id: u64, + row_id: u64, + value_ptr: *mut *mut u8, + value_len: *mut i64, +) -> MVCCError { + let db = db.get_ref(); + + match { + let id = database::RowID { table_id, row_id }; + let maybe_row = db.read(tx_id, id); + match maybe_row { + Ok(Some(row)) => { + tracing::debug!("Found row {row:?}"); + let str_len = row.data.len() + 1; + let value = std::ffi::CString::new(row.data.as_bytes()).unwrap_or_default(); + unsafe { + *value_ptr = value.into_raw() as *mut u8; + 
*value_len = str_len as i64; + } + } + _ => unsafe { *value_len = -1 }, + }; + Ok::<(), mvcc_rs::errors::DatabaseError>(()) + } { + Ok(_) => { + tracing::debug!("MVCCDatabaseRead: success"); + MVCCError::MVCC_OK + } + Err(e) => { + tracing::error!("MVCCDatabaseRead: {e}"); + MVCCError::MVCC_IO_ERROR_READ + } + } +} + +#[no_mangle] +pub unsafe extern "C" fn MVCCFreeStr(ptr: *mut std::ffi::c_void) { + if ptr.is_null() { + return; + } + let _ = std::ffi::CString::from_raw(ptr as *mut std::ffi::c_char); +} + +#[no_mangle] +pub unsafe extern "C" fn MVCCScanCursorOpen( + db: MVCCDatabaseRef, + tx_id: u64, + table_id: u64, +) -> MVCCScanCursorRef { + tracing::debug!("MVCCScanCursorOpen()"); + // Reference is transmuted to &'static in order to be able to pass the cursor back to C. + // The contract with C is to never use a cursor after MVCCDatabaseClose() has been called. + let db = unsafe { std::mem::transmute::<&Db, &'static Db>(db.get_ref()) }; + match mvcc_rs::cursor::ScanCursor::new(db, tx_id, table_id) { + Ok(cursor) => { + if cursor.is_empty() { + tracing::debug!("Cursor is empty"); + return MVCCScanCursorRef { + ptr: std::ptr::null_mut(), + }; + } + tracing::debug!("Cursor open: {cursor:?}"); + MVCCScanCursorRef { + ptr: Box::into_raw(Box::new(ScanCursorContext { cursor })), + } + } + Err(e) => { + tracing::error!("MVCCScanCursorOpen: {e}"); + MVCCScanCursorRef { + ptr: std::ptr::null_mut(), + } + } + } +} + +#[no_mangle] +pub unsafe extern "C" fn MVCCScanCursorClose(cursor: MVCCScanCursorRef) { + tracing::debug!("MVCCScanCursorClose()"); + if cursor.ptr.is_null() { + tracing::debug!("warning: `cursor` is null in MVCCScanCursorClose()"); + return; + } + let cursor = unsafe { Box::from_raw(cursor.ptr) }.cursor; + cursor.close().ok(); +} + +#[no_mangle] +pub unsafe extern "C" fn MVCCScanCursorRead( + cursor: MVCCScanCursorRef, + value_ptr: *mut *mut u8, + value_len: *mut i64, +) -> MVCCError { + tracing::debug!("MVCCScanCursorRead()"); + if cursor.ptr.is_null() { + 
tracing::debug!("warning: `cursor` is null in MVCCScanCursorRead()"); + return MVCCError::MVCC_IO_ERROR_READ; + } + let cursor = cursor.get_ref(); + + match { + let maybe_row = cursor.current_row(); + match maybe_row { + Ok(Some(row)) => { + tracing::debug!("Found row {row:?}"); + let str_len = row.data.len() + 1; + let value = std::ffi::CString::new(row.data.as_bytes()).unwrap_or_default(); + unsafe { + *value_ptr = value.into_raw() as *mut u8; + *value_len = str_len as i64; + } + } + _ => unsafe { *value_len = -1 }, + }; + Ok::<(), mvcc_rs::errors::DatabaseError>(()) + } { + Ok(_) => { + tracing::debug!("MVCCDatabaseRead: success"); + MVCCError::MVCC_OK + } + Err(e) => { + tracing::error!("MVCCDatabaseRead: {e}"); + MVCCError::MVCC_IO_ERROR_READ + } + } +} + +#[no_mangle] +pub unsafe extern "C" fn MVCCScanCursorNext(cursor: MVCCScanCursorRef) -> std::ffi::c_int { + let cursor = cursor.get_ref_mut(); + tracing::debug!("MVCCScanCursorNext(): {}", cursor.index); + if cursor.forward() { + tracing::debug!("Forwarded to {}", cursor.index); + 1 + } else { + tracing::debug!("Forwarded to end"); + 0 + } +} + +#[no_mangle] +pub unsafe extern "C" fn MVCCScanCursorPosition(cursor: MVCCScanCursorRef) -> u64 { + let cursor = cursor.get_ref(); + cursor + .current_row_id() + .map(|row_id| row_id.row_id) + .unwrap_or(0) +} diff --git a/core/mvcc/bindings/c/src/types.rs b/core/mvcc/bindings/c/src/types.rs new file mode 100644 index 000000000..52c21951d --- /dev/null +++ b/core/mvcc/bindings/c/src/types.rs @@ -0,0 +1,79 @@ +use crate::Db; + +#[derive(Clone, Debug)] +#[repr(transparent)] +pub struct MVCCDatabaseRef { + ptr: *const DbContext, +} + +impl MVCCDatabaseRef { + pub fn null() -> MVCCDatabaseRef { + MVCCDatabaseRef { + ptr: std::ptr::null(), + } + } + + pub fn is_null(&self) -> bool { + self.ptr.is_null() + } + + pub fn get_ref(&self) -> &Db { + &unsafe { &*(self.ptr) }.db + } + + #[allow(clippy::mut_from_ref)] + pub fn get_ref_mut(&self) -> &mut Db { + let ptr_mut = 
self.ptr as *mut DbContext; + &mut unsafe { &mut (*ptr_mut) }.db + } +} + +#[allow(clippy::from_over_into)] +impl From<&DbContext> for MVCCDatabaseRef { + fn from(value: &DbContext) -> Self { + Self { ptr: value } + } +} + +#[allow(clippy::from_over_into)] +impl From<&mut DbContext> for MVCCDatabaseRef { + fn from(value: &mut DbContext) -> Self { + Self { ptr: value } + } +} + +pub struct DbContext { + pub(crate) db: Db, +} + +pub struct ScanCursorContext { + pub(crate) cursor: crate::ScanCursor, +} + +#[derive(Clone, Debug)] +#[repr(transparent)] +pub struct MVCCScanCursorRef { + pub ptr: *mut ScanCursorContext, +} + +impl MVCCScanCursorRef { + pub fn null() -> MVCCScanCursorRef { + MVCCScanCursorRef { + ptr: std::ptr::null_mut(), + } + } + + pub fn is_null(&self) -> bool { + self.ptr.is_null() + } + + pub fn get_ref(&self) -> &crate::ScanCursor { + &unsafe { &*(self.ptr) }.cursor + } + + #[allow(clippy::mut_from_ref)] + pub fn get_ref_mut(&self) -> &mut crate::ScanCursor { + let ptr_mut = self.ptr as *mut ScanCursorContext; + &mut unsafe { &mut (*ptr_mut) }.cursor + } +} diff --git a/core/mvcc/docs/DESIGN.md b/core/mvcc/docs/DESIGN.md new file mode 100644 index 000000000..37943d992 --- /dev/null +++ b/core/mvcc/docs/DESIGN.md @@ -0,0 +1,19 @@ +# Design + +## Persistent storage + +Persistent storage must implement the `Storage` trait that the MVCC module uses for transaction logging. + +Figure 1 shows an example of a write-ahead log across three transactions. +The first transaction T0 executes an `INSERT (id) VALUES (1)` statement, which results in a log record with `id` set to `1`, begin timestamp to 0 (which is the transaction ID) and end timestamp as infinity (meaning the row version is still visible). +The second transaction T1 executes another `INSERT` statement, which adds another log record to the transaction log with `id` set to `2`, begin timestamp to 1 and end timestamp as infinity, similar to what T0 did. 
+Finally, a third transaction T2 executes two statements: `DELETE WHERE id = 1` and `INSERT (id) VALUES (3)`. The first one results in a log record with `id` set to `1` and begin timestamp set to 0 (which is the transaction that created the entry). However, the end timestamp is now set to 2 (the current transaction), which means the entry is now deleted. +The second statement results in an entry in the transaction log similar to the `INSERT` statements in T0 and T1. + +![Transactions](figures/transactions.png) +

+Figure 1. Transaction log of three transactions. +

+ +When MVCC bootstraps or recovers, it simply redoes the transaction log. +If the transaction log grows big, we can checkpoint it by dropping all entries that are no longer visible after the latest transaction and creating a snapshot. diff --git a/core/mvcc/docs/figures/transactions.excalidraw b/core/mvcc/docs/figures/transactions.excalidraw new file mode 100644 index 000000000..cee1947f9 --- /dev/null +++ b/core/mvcc/docs/figures/transactions.excalidraw @@ -0,0 +1,656 @@ +{ + "type": "excalidraw", + "version": 2, + "source": "https://excalidraw.com", + "elements": [ + { + "id": "tFvpBUMWe3qPFUTQVV14X", + "type": "text", + "x": 233.14035848761839, + "y": 205.73272444200816, + "width": 278.57781982421875, + "height": 25, + "angle": 0, + "strokeColor": "#087f5b", + "backgroundColor": "#82c91e", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "groupIds": [], + "roundness": null, + "seed": 94988319, + "version": 510, + "versionNonce": 1210831775, + "isDeleted": false, + "boundElements": null, + "updated": 1683370319070, + "link": null, + "locked": false, + "text": "", + "fontSize": 20, + "fontFamily": 1, + "textAlign": "left", + "verticalAlign": "top", + "baseline": 18, + "containerId": null, + "originalText": "", + "lineHeight": 1.25 + }, + { + "type": "text", + "version": 515, + "versionNonce": 1881893969, + "isDeleted": false, + "id": "7i88n1PIb89NxUbVQmTTi", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "angle": 0, + "x": 938.4614491858606, + "y": 311.23272444200813, + "strokeColor": "#0b7285", + "backgroundColor": "#82c91e", + "width": 279.0400085449219, + "height": 25, + "seed": 1123646321, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 1, + "text": "", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, 
+ "originalText": "", + "lineHeight": 1.25, + "baseline": 18 + }, + { + "type": "text", + "version": 556, + "versionNonce": 153125934, + "isDeleted": false, + "id": "Yh8XLtKqXUUYmcmG4SEXn", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "angle": 0, + "x": 581.1603475012903, + "y": 256.23272444200813, + "strokeColor": "#e67700", + "backgroundColor": "#82c91e", + "width": 270.71783447265625, + "height": 25, + "seed": 1685524017, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683371076075, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 1, + "text": "", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "", + "lineHeight": 1.25, + "baseline": 18 + }, + { + "id": "8l0CCJzCAtOLt_2GRcNpa", + "type": "text", + "x": 256.1403584876185, + "y": 409.73272444200813, + "width": 234.41998291015625, + "height": 75, + "angle": 0, + "strokeColor": "#087f5b", + "backgroundColor": "#82c91e", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "groupIds": [], + "roundness": null, + "seed": 583129809, + "version": 570, + "versionNonce": 561756721, + "isDeleted": false, + "boundElements": null, + "updated": 1683370316909, + "link": null, + "locked": false, + "text": "BEGIN\nINSERT (id) VALUEs (1)\nCOMMIT", + "fontSize": 20, + "fontFamily": 1, + "textAlign": "left", + "verticalAlign": "top", + "baseline": 68, + "containerId": null, + "originalText": "BEGIN\nINSERT (id) VALUEs (1)\nCOMMIT", + "lineHeight": 1.25 + }, + { + "type": "text", + "version": 628, + "versionNonce": 282656095, + "isDeleted": false, + "id": "3m7VluAP5tair6-60b_sp", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "angle": 0, + "x": 962.0903554358606, + "y": 416.23272444200813, + "strokeColor": "#0b7285", + "backgroundColor": "#82c91e", + "width": 
243.91998291015625, + "height": 100, + "seed": 479705617, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 1, + "text": "BEGIN\nDELETE WHERE id =1\nINSERT (id) VALUES (3)\nCOMMIT", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "BEGIN\nDELETE WHERE id =1\nINSERT (id) VALUES (3)\nCOMMIT", + "lineHeight": 1.25, + "baseline": 93 + }, + { + "type": "text", + "version": 574, + "versionNonce": 1128746001, + "isDeleted": false, + "id": "Z-Mh1kti2oC6sIMnuGluo", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "angle": 0, + "x": 613.0903554358607, + "y": 417.23272444200813, + "strokeColor": "#e67700", + "backgroundColor": "#82c91e", + "width": 243.239990234375, + "height": 75, + "seed": 580440625, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 1, + "text": "BEGIN\nINSERT (id) VALUEs (2)\nCOMMIT", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "BEGIN\nINSERT (id) VALUEs (2)\nCOMMIT", + "lineHeight": 1.25, + "baseline": 68 + }, + { + "type": "line", + "version": 1502, + "versionNonce": 1835608607, + "isDeleted": false, + "id": "VuJNZCgz1Y0WEWwug7pGk", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 226.3083636621349, + "y": 173.11701218356845, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 20.336010349032712, + "height": 203.23377930246647, + "seed": 1879839231, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "startBinding": null, + "endBinding": null, + "lastCommittedPoint": null, + "startArrowhead": null, + "endArrowhead": 
null, + "points": [ + [ + 0, + 0 + ], + [ + -20.264781987976257, + -0.0011773927935071482 + ], + [ + -20.336010349032712, + 203.23260190967298 + ], + [ + -0.07239358683375485, + 203.135377672515 + ] + ] + }, + { + "type": "line", + "version": 1755, + "versionNonce": 1487752017, + "isDeleted": false, + "id": "GpZg3Rw4Hszxzxf38Q4Hn", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 3.141592653589793, + "x": 539.3083636621348, + "y": 178.11701218356845, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 20.336010349032712, + "height": 203.23377930246647, + "seed": 470135121, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "startBinding": null, + "endBinding": null, + "lastCommittedPoint": null, + "startArrowhead": null, + "endArrowhead": null, + "points": [ + [ + 0, + 0 + ], + [ + -20.264781987976257, + -0.0011773927935071482 + ], + [ + -20.336010349032712, + 203.23260190967298 + ], + [ + -0.07239358683375485, + 203.135377672515 + ] + ] + }, + { + "type": "text", + "version": 528, + "versionNonce": 1276939839, + "isDeleted": false, + "id": "AGEyNvBxBm2cwm1WRW8n8", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "angle": 0, + "x": 576.6403584876185, + "y": 210.23272444200816, + "strokeColor": "#087f5b", + "backgroundColor": "#82c91e", + "width": 278.57781982421875, + "height": 25, + "seed": 877528401, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 1, + "text": "", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "", + "lineHeight": 1.25, + "baseline": 18 + }, + { + "type": "line", + "version": 1557, + "versionNonce": 773679889, + "isDeleted": false, + "id": "Q8E0gAcLvq6VXqMDZhLdA", + 
"fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 581.8083636621351, + "y": 177.61701218356845, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 20.336010349032712, + "height": 203.23377930246647, + "seed": 153279217, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "startBinding": null, + "endBinding": null, + "lastCommittedPoint": null, + "startArrowhead": null, + "endArrowhead": null, + "points": [ + [ + 0, + 0 + ], + [ + -20.264781987976257, + -0.0011773927935071482 + ], + [ + -20.336010349032712, + 203.23260190967298 + ], + [ + -0.07239358683375485, + 203.135377672515 + ] + ] + }, + { + "type": "line", + "version": 1810, + "versionNonce": 1561283199, + "isDeleted": false, + "id": "uhh3ZkPO6bwwf0-AI8syI", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 3.141592653589793, + "x": 894.8083636621349, + "y": 182.61701218356845, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 20.336010349032712, + "height": 203.23377930246647, + "seed": 315380945, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "startBinding": null, + "endBinding": null, + "lastCommittedPoint": null, + "startArrowhead": null, + "endArrowhead": null, + "points": [ + [ + 0, + 0 + ], + [ + -20.264781987976257, + -0.0011773927935071482 + ], + [ + -20.336010349032712, + 203.23260190967298 + ], + [ + -0.07239358683375485, + 203.135377672515 + ] + ] + }, + { + "type": "text", + "version": 575, + "versionNonce": 910156017, + "isDeleted": false, + "id": "jI5YKyaOdGYYKiBWZmCMs", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "angle": 0, + "x": 929.6403584876182, + "y": 215.23272444200813, + 
"strokeColor": "#087f5b", + "backgroundColor": "#82c91e", + "width": 278.57781982421875, + "height": 25, + "seed": 121503167, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 1, + "text": "", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "", + "lineHeight": 1.25, + "baseline": 18 + }, + { + "type": "line", + "version": 1604, + "versionNonce": 19920575, + "isDeleted": false, + "id": "QqIk7VTnRWYq499wkttvv", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 934.8083636621348, + "y": 182.61701218356842, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 20.336010349032712, + "height": 203.23377930246647, + "seed": 2012037663, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "startBinding": null, + "endBinding": null, + "lastCommittedPoint": null, + "startArrowhead": null, + "endArrowhead": null, + "points": [ + [ + 0, + 0 + ], + [ + -20.264781987976257, + -0.0011773927935071482 + ], + [ + -20.336010349032712, + 203.23260190967298 + ], + [ + -0.07239358683375485, + 203.135377672515 + ] + ] + }, + { + "type": "line", + "version": 1857, + "versionNonce": 1660885169, + "isDeleted": false, + "id": "gk89VsYpnf9Jby9KEUBd3", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 3.141592653589793, + "x": 1247.808363662135, + "y": 187.61701218356842, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 20.336010349032712, + "height": 203.23377930246647, + "seed": 509453887, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "startBinding": null, + "endBinding": null, + 
"lastCommittedPoint": null, + "startArrowhead": null, + "endArrowhead": null, + "points": [ + [ + 0, + 0 + ], + [ + -20.264781987976257, + -0.0011773927935071482 + ], + [ + -20.336010349032712, + 203.23260190967298 + ], + [ + -0.07239358683375485, + 203.135377672515 + ] + ] + }, + { + "type": "text", + "version": 620, + "versionNonce": 1588681010, + "isDeleted": false, + "id": "a1c-iZI0SafCiy0u4xieZ", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "angle": 0, + "x": 934.3714375891809, + "y": 261.23272444200813, + "strokeColor": "#e67700", + "backgroundColor": "#82c91e", + "width": 270.71783447265625, + "height": 25, + "seed": 1742829553, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683371080181, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 1, + "text": "", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "", + "lineHeight": 1.25, + "baseline": 18 + }, + { + "type": "text", + "version": 564, + "versionNonce": 1968863633, + "isDeleted": false, + "id": "hdhhgp5nA06o5EcSgHQE8", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "angle": 0, + "x": 937.6203542151575, + "y": 354.23272444200813, + "strokeColor": "#0b7285", + "backgroundColor": "#82c91e", + "width": 287.73785400390625, + "height": 25, + "seed": 309558367, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370363648, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 1, + "text": "", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "", + "lineHeight": 1.25, + "baseline": 18 + } + ], + "appState": { + "gridSize": null, + "viewBackgroundColor": "#ffffff" + }, + "files": {} +} \ No newline at end of file diff --git a/core/mvcc/docs/figures/transactions.png b/core/mvcc/docs/figures/transactions.png new file mode 
100644 index 000000000..3b8fe59bc Binary files /dev/null and b/core/mvcc/docs/figures/transactions.png differ diff --git a/core/mvcc/mvcc-rs/Cargo.toml b/core/mvcc/mvcc-rs/Cargo.toml new file mode 100644 index 000000000..27f030a73 --- /dev/null +++ b/core/mvcc/mvcc-rs/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "mvcc-rs" +version = "0.0.0" +edition = "2021" + +[dependencies] +anyhow = "1.0.70" +thiserror = "1.0.40" +tracing = "0.1.37" +serde = { version = "1.0.160", features = ["derive"] } +serde_json = "1.0.96" +tracing-subscriber = { version = "0", optional = true } +base64 = "0.21.0" +aws-sdk-s3 = "0.27.0" +aws-config = "0.55.2" +parking_lot = "0.12.1" +futures = "0.3.28" +crossbeam-skiplist = "0.1.1" +tracing-test = "0" + +[dev-dependencies] +criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] } +pprof = { version = "0.11.1", features = ["criterion", "flamegraph"] } +tracing-subscriber = "0" +mvcc-rs = { path = "." } + +[[bench]] +name = "my_benchmark" +harness = false + +[features] +default = [] +c_bindings = ["dep:tracing-subscriber"] diff --git a/core/mvcc/mvcc-rs/benches/my_benchmark.rs b/core/mvcc/mvcc-rs/benches/my_benchmark.rs new file mode 100644 index 000000000..9cb998ca6 --- /dev/null +++ b/core/mvcc/mvcc-rs/benches/my_benchmark.rs @@ -0,0 +1,136 @@ +use criterion::async_executor::FuturesExecutor; +use criterion::{criterion_group, criterion_main, Criterion, Throughput}; +use mvcc_rs::clock::LocalClock; +use mvcc_rs::database::{Database, Row, RowID}; +use pprof::criterion::{Output, PProfProfiler}; + +fn bench_db() -> Database { + let clock = LocalClock::default(); + let storage = mvcc_rs::persistent_storage::Storage::new_noop(); + Database::new(clock, storage) +} + +fn bench(c: &mut Criterion) { + let mut group = c.benchmark_group("mvcc-ops-throughput"); + group.throughput(Throughput::Elements(1)); + + let db = bench_db(); + group.bench_function("begin_tx", |b| { + b.to_async(FuturesExecutor).iter(|| async { + 
db.begin_tx(); + }) + }); + + let db = bench_db(); + group.bench_function("begin_tx + rollback_tx", |b| { + b.to_async(FuturesExecutor).iter(|| async { + let tx_id = db.begin_tx(); + db.rollback_tx(tx_id) + }) + }); + + let db = bench_db(); + group.bench_function("begin_tx + commit_tx", |b| { + b.to_async(FuturesExecutor).iter(|| async { + let tx_id = db.begin_tx(); + db.commit_tx(tx_id) + }) + }); + + let db = bench_db(); + group.bench_function("begin_tx-read-commit_tx", |b| { + b.to_async(FuturesExecutor).iter(|| async { + let tx_id = db.begin_tx(); + db.read( + tx_id, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + db.commit_tx(tx_id) + }) + }); + + let db = bench_db(); + group.bench_function("begin_tx-update-commit_tx", |b| { + b.to_async(FuturesExecutor).iter(|| async { + let tx_id = db.begin_tx(); + db.update( + tx_id, + Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "World".to_string(), + }, + ) + .unwrap(); + db.commit_tx(tx_id) + }) + }); + + let db = bench_db(); + let tx = db.begin_tx(); + db.insert( + tx, + Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }, + ) + .unwrap(); + group.bench_function("read", |b| { + b.to_async(FuturesExecutor).iter(|| async { + db.read( + tx, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + }) + }); + + let db = bench_db(); + let tx = db.begin_tx(); + db.insert( + tx, + Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }, + ) + .unwrap(); + group.bench_function("update", |b| { + b.to_async(FuturesExecutor).iter(|| async { + db.update( + tx, + Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "World".to_string(), + }, + ) + .unwrap(); + }) + }); +} + +criterion_group! 
{ + name = benches; + config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = bench +} +criterion_main!(benches); diff --git a/core/mvcc/mvcc-rs/src/clock.rs b/core/mvcc/mvcc-rs/src/clock.rs new file mode 100644 index 000000000..7bab1fe5d --- /dev/null +++ b/core/mvcc/mvcc-rs/src/clock.rs @@ -0,0 +1,31 @@ +use std::sync::atomic::{AtomicU64, Ordering}; + +/// Logical clock. +pub trait LogicalClock { + fn get_timestamp(&self) -> u64; + fn reset(&self, ts: u64); +} + +/// A node-local clock backed by an atomic counter. +#[derive(Debug, Default)] +pub struct LocalClock { + ts_sequence: AtomicU64, +} + +impl LocalClock { + pub fn new() -> Self { + Self { + ts_sequence: AtomicU64::new(0), + } + } +} + +impl LogicalClock for LocalClock { + fn get_timestamp(&self) -> u64 { + self.ts_sequence.fetch_add(1, Ordering::SeqCst) + } + + fn reset(&self, ts: u64) { + self.ts_sequence.store(ts, Ordering::SeqCst); + } +} diff --git a/core/mvcc/mvcc-rs/src/cursor.rs b/core/mvcc/mvcc-rs/src/cursor.rs new file mode 100644 index 000000000..93ec3e2bd --- /dev/null +++ b/core/mvcc/mvcc-rs/src/cursor.rs @@ -0,0 +1,58 @@ +use serde::de::DeserializeOwned; +use serde::Serialize; + +use crate::clock::LogicalClock; +use crate::database::{Database, Result, Row, RowID}; +use std::fmt::Debug; + +#[derive(Debug)] +pub struct ScanCursor<'a, Clock: LogicalClock, T: Sync + Send + Clone + Serialize + DeserializeOwned + Debug> { + pub db: &'a Database, + pub row_ids: Vec, + pub index: usize, + tx_id: u64, +} + +impl<'a, Clock: LogicalClock, T: Sync + Send + Clone + Serialize + DeserializeOwned + Debug + 'static> ScanCursor<'a, Clock, T> { + pub fn new( + db: &'a Database, + tx_id: u64, + table_id: u64, + ) -> Result> { + let row_ids = db.scan_row_ids_for_table(table_id)?; + Ok(Self { + db, + tx_id, + row_ids, + index: 0, + }) + } + + pub fn current_row_id(&self) -> Option { + if self.index >= self.row_ids.len() { + return None; + } + 
Some(self.row_ids[self.index]) + } + + pub fn current_row(&self) -> Result>> { + if self.index >= self.row_ids.len() { + return Ok(None); + } + let id = self.row_ids[self.index]; + self.db.read(self.tx_id, id) + } + + pub fn close(self) -> Result<()> { + Ok(()) + } + + pub fn forward(&mut self) -> bool { + self.index += 1; + self.index < self.row_ids.len() + } + + pub fn is_empty(&self) -> bool { + self.index >= self.row_ids.len() + } +} diff --git a/core/mvcc/mvcc-rs/src/database/mod.rs b/core/mvcc/mvcc-rs/src/database/mod.rs new file mode 100644 index 000000000..d434722ea --- /dev/null +++ b/core/mvcc/mvcc-rs/src/database/mod.rs @@ -0,0 +1,863 @@ +use crate::clock::LogicalClock; +use crate::errors::DatabaseError; +use crate::persistent_storage::Storage; +use crossbeam_skiplist::{SkipMap, SkipSet}; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::RwLock; + +pub type Result = std::result::Result; + +#[cfg(test)] +mod tests; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash)] +pub struct RowID { + pub table_id: u64, + pub row_id: u64, +} + +#[derive(Clone, Debug, PartialEq, PartialOrd, Serialize, Deserialize)] + +pub struct Row { + pub id: RowID, + pub data: T, +} + +/// A row version. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct RowVersion { + begin: TxTimestampOrID, + end: Option, + row: Row, +} + +pub type TxID = u64; + +/// A log record contains all the versions inserted and deleted by a transaction. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct LogRecord { + pub(crate) tx_timestamp: TxID, + row_versions: Vec>, +} + +impl LogRecord { + fn new(tx_timestamp: TxID) -> Self { + Self { + tx_timestamp, + row_versions: Vec::new(), + } + } +} + +/// A transaction timestamp or ID. 
+/// +/// Versions either track a timestamp or a transaction ID, depending on the +/// phase of the transaction. During the active phase, new versions track the +/// transaction ID in the `begin` and `end` fields. After a transaction commits, +/// versions switch to tracking timestamps. +#[derive(Clone, Debug, PartialEq, PartialOrd, Serialize, Deserialize)] +enum TxTimestampOrID { + Timestamp(u64), + TxID(TxID), +} + +/// Transaction +#[derive(Debug, Serialize, Deserialize)] +pub struct Transaction { + /// The state of the transaction. + state: AtomicTransactionState, + /// The transaction ID. + tx_id: u64, + /// The transaction begin timestamp. + begin_ts: u64, + /// The transaction write set. + #[serde(with = "skipset_rowid")] + write_set: SkipSet, + /// The transaction read set. + #[serde(with = "skipset_rowid")] + read_set: SkipSet, +} + +mod skipset_rowid { + use super::*; + use serde::{de, ser, ser::SerializeSeq}; + + struct SkipSetDeserializer; + + impl<'de> serde::de::Visitor<'de> for SkipSetDeserializer { + type Value = SkipSet; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("SkipSet key value sequence.") + } + + fn visit_seq(self, mut seq: A) -> std::result::Result + where + A: serde::de::SeqAccess<'de>, + { + let new_skipset = SkipSet::new(); + while let Some(elem) = seq.next_element()? 
{ + new_skipset.insert(elem); + } + + Ok(new_skipset) + } + } + + pub fn serialize( + value: &SkipSet, + ser: S, + ) -> std::result::Result { + let mut set = ser.serialize_seq(Some(value.len()))?; + for v in value { + set.serialize_element(v.value())?; + } + set.end() + } + + pub fn deserialize<'de, D: de::Deserializer<'de>>( + de: D, + ) -> std::result::Result, D::Error> { + de.deserialize_seq(SkipSetDeserializer) + } +} + +impl Transaction { + fn new(tx_id: u64, begin_ts: u64) -> Transaction { + Transaction { + state: TransactionState::Active.into(), + tx_id, + begin_ts, + write_set: SkipSet::new(), + read_set: SkipSet::new(), + } + } + + fn insert_to_read_set(&self, id: RowID) { + self.read_set.insert(id); + } + + fn insert_to_write_set(&mut self, id: RowID) { + self.write_set.insert(id); + } +} + +impl std::fmt::Display for Transaction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { + write!( + f, + "{{ state: {}, id: {}, begin_ts: {}, write_set: {:?}, read_set: {:?}", + self.state.load(), + self.tx_id, + self.begin_ts, + // FIXME: I'm sorry, we obviously shouldn't be cloning here. + self.write_set + .iter() + .map(|v| *v.value()) + .collect::>(), + self.read_set + .iter() + .map(|v| *v.value()) + .collect::>() + ) + } +} + +/// Transaction state. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +enum TransactionState { + Active, + Preparing, + Aborted, + Terminated, + Committed(u64), +} + +impl TransactionState { + pub fn encode(&self) -> u64 { + match self { + TransactionState::Active => 0, + TransactionState::Preparing => 1, + TransactionState::Aborted => 2, + TransactionState::Terminated => 3, + TransactionState::Committed(ts) => { + // We only support 2*62 - 1 timestamps, because the extra bit + // is used to encode the type. 
+ assert!(ts & 0x8000_0000_0000_0000 == 0); + 0x8000_0000_0000_0000 | ts + } + } + } + + pub fn decode(v: u64) -> Self { + match v { + 0 => TransactionState::Active, + 1 => TransactionState::Preparing, + 2 => TransactionState::Aborted, + 3 => TransactionState::Terminated, + v if v & 0x8000_0000_0000_0000 != 0 => { + TransactionState::Committed(v & 0x7fff_ffff_ffff_ffff) + } + _ => panic!("Invalid transaction state"), + } + } +} + +// Transaction state encoded into a single 64-bit atomic. +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct AtomicTransactionState { + pub(crate) state: AtomicU64, +} + +impl From for AtomicTransactionState { + fn from(state: TransactionState) -> Self { + Self { + state: AtomicU64::new(state.encode()), + } + } +} + +impl From for TransactionState { + fn from(state: AtomicTransactionState) -> Self { + let encoded = state.state.load(Ordering::Acquire); + TransactionState::decode(encoded) + } +} + +impl std::cmp::PartialEq for AtomicTransactionState { + fn eq(&self, other: &TransactionState) -> bool { + &self.load() == other + } +} + +impl std::fmt::Display for TransactionState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { + match self { + TransactionState::Active => write!(f, "Active"), + TransactionState::Preparing => write!(f, "Preparing"), + TransactionState::Committed(ts) => write!(f, "Committed({ts})"), + TransactionState::Aborted => write!(f, "Aborted"), + TransactionState::Terminated => write!(f, "Terminated"), + } + } +} + +impl AtomicTransactionState { + fn store(&self, state: TransactionState) { + self.state.store(state.encode(), Ordering::Release); + } + + fn load(&self) -> TransactionState { + TransactionState::decode(self.state.load(Ordering::Acquire)) + } +} + +#[derive(Debug)] +pub struct Database< + Clock: LogicalClock, + T: Sync + Send + Clone + Serialize + Debug + DeserializeOwned, +> { + rows: SkipMap>>>, + txs: SkipMap>, + tx_ids: AtomicU64, + clock: Clock, + 
storage: Storage, +} + +impl + Database +{ + /// Creates a new database. + pub fn new(clock: Clock, storage: Storage) -> Self { + Self { + rows: SkipMap::new(), + txs: SkipMap::new(), + tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes + clock, + storage, + } + } + + // Extracts the begin timestamp from a transaction + fn get_begin_timestamp(&self, ts_or_id: &TxTimestampOrID) -> u64 { + match ts_or_id { + TxTimestampOrID::Timestamp(ts) => *ts, + TxTimestampOrID::TxID(tx_id) => { + self.txs + .get(tx_id) + .unwrap() + .value() + .read() + .unwrap() + .begin_ts + } + } + } + + /// Inserts a new row version into the database, while making sure that + /// the row version is inserted in the correct order. + fn insert_version(&self, id: RowID, row_version: RowVersion) { + let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new())); + let mut versions = versions.value().write().unwrap(); + self.insert_version_raw(&mut versions, row_version) + } + + /// Inserts a new row version into the internal data structure for versions, + /// while making sure that the row version is inserted in the correct order. + fn insert_version_raw(&self, versions: &mut Vec>, row_version: RowVersion) { + // NOTICE: this is an insert a'la insertion sort, with pessimistic linear complexity. + // However, we expect the number of versions to be nearly sorted, so we deem it worthy + // to search linearly for the insertion point instead of paying the price of using + // another data structure, e.g. a BTreeSet. If it proves to be too quadratic empirically, + // we can either switch to a tree-like structure, or at least use partition_point() + // which performs a binary search for the insertion point. 
+ let position = versions + .iter() + .rposition(|v| { + self.get_begin_timestamp(&v.begin) < self.get_begin_timestamp(&row_version.begin) + }) + .map(|p| p + 1) + .unwrap_or(0); + if versions.len() - position > 3 { + tracing::debug!( + "Inserting a row version {} positions from the end", + versions.len() - position + ); + } + versions.insert(position, row_version); + } + + /// Inserts a new row into the database. + /// + /// This function inserts a new `row` into the database within the context + /// of the transaction `tx_id`. + /// + /// # Arguments + /// + /// * `tx_id` - the ID of the transaction in which to insert the new row. + /// * `row` - the row object containing the values to be inserted. + /// + pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { + let tx = self + .txs + .get(&tx_id) + .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; + let mut tx = tx.value().write().unwrap(); + assert_eq!(tx.state, TransactionState::Active); + let id = row.id; + let row_version = RowVersion { + begin: TxTimestampOrID::TxID(tx.tx_id), + end: None, + row, + }; + tx.insert_to_write_set(id); + drop(tx); + self.insert_version(id, row_version); + Ok(()) + } + + /// Updates a row in the database with new values. + /// + /// This function updates an existing row in the database within the + /// context of the transaction `tx_id`. The `row` argument identifies the + /// row to be updated as `id` and contains the new values to be inserted. + /// + /// If the row identified by the `id` does not exist, this function does + /// nothing and returns `false`. Otherwise, the function updates the row + /// with the new values and returns `true`. + /// + /// # Arguments + /// + /// * `tx_id` - the ID of the transaction in which to update the new row. + /// * `row` - the row object containing the values to be updated. + /// + /// # Returns + /// + /// Returns `true` if the row was successfully updated, and `false` otherwise. 
+ pub fn update(&self, tx_id: TxID, row: Row) -> Result { + if !self.delete(tx_id, row.id)? { + return Ok(false); + } + self.insert(tx_id, row)?; + Ok(true) + } + + /// Inserts a row in the database with new values, previously deleting + /// any old data if it existed. Bails on a delete error, e.g. write-write conflict. + pub fn upsert(&self, tx_id: TxID, row: Row) -> Result<()> { + self.delete(tx_id, row.id)?; + self.insert(tx_id, row) + } + + /// Deletes a row from the table with the given `id`. + /// + /// This function deletes an existing row `id` in the database within the + /// context of the transaction `tx_id`. + /// + /// # Arguments + /// + /// * `tx_id` - the ID of the transaction in which to delete the new row. + /// * `id` - the ID of the row to delete. + /// + /// # Returns + /// + /// Returns `true` if the row was successfully deleted, and `false` otherwise. + /// + pub fn delete(&self, tx_id: TxID, id: RowID) -> Result { + let row_versions_opt = self.rows.get(&id); + if let Some(ref row_versions) = row_versions_opt { + let mut row_versions = row_versions.value().write().unwrap(); + for rv in row_versions.iter_mut().rev() { + let tx = self + .txs + .get(&tx_id) + .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; + let tx = tx.value().read().unwrap(); + assert_eq!(tx.state, TransactionState::Active); + if is_write_write_conflict(&self.txs, &tx, rv) { + drop(row_versions); + drop(row_versions_opt); + drop(tx); + self.rollback_tx(tx_id); + return Err(DatabaseError::WriteWriteConflict); + } + if is_version_visible(&self.txs, &tx, rv) { + rv.end = Some(TxTimestampOrID::TxID(tx.tx_id)); + drop(row_versions); + drop(row_versions_opt); + drop(tx); + let tx = self + .txs + .get(&tx_id) + .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; + let mut tx = tx.value().write().unwrap(); + tx.insert_to_write_set(id); + return Ok(true); + } + } + } + Ok(false) + } + + /// Retrieves a row from the table with the given `id`. 
+ /// + /// This operation is performed within the scope of the transaction identified + /// by `tx_id`. + /// + /// # Arguments + /// + /// * `tx_id` - The ID of the transaction to perform the read operation in. + /// * `id` - The ID of the row to retrieve. + /// + /// # Returns + /// + /// Returns `Some(row)` with the row data if the row with the given `id` exists, + /// and `None` otherwise. + pub fn read(&self, tx_id: TxID, id: RowID) -> Result>> { + let tx = self.txs.get(&tx_id).unwrap(); + let tx = tx.value().read().unwrap(); + assert_eq!(tx.state, TransactionState::Active); + if let Some(row_versions) = self.rows.get(&id) { + let row_versions = row_versions.value().read().unwrap(); + for rv in row_versions.iter().rev() { + if is_version_visible(&self.txs, &tx, rv) { + tx.insert_to_read_set(id); + return Ok(Some(rv.row.clone())); + } + } + } + Ok(None) + } + + /// Gets all row ids in the database. + pub fn scan_row_ids(&self) -> Result> { + let keys = self.rows.iter().map(|entry| *entry.key()); + Ok(keys.collect()) + } + + /// Gets all row ids in the database for a given table. + pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { + Ok(self + .rows + .range( + RowID { + table_id, + row_id: 0, + }..RowID { + table_id, + row_id: u64::MAX, + }, + ) + .map(|entry| *entry.key()) + .collect()) + } + + /// Begins a new transaction in the database. + /// + /// This function starts a new transaction in the database and returns a `TxID` value + /// that you can use to perform operations within the transaction. All changes made within the + /// transaction are isolated from other transactions until you commit the transaction. + pub fn begin_tx(&self) -> TxID { + let tx_id = self.get_tx_id(); + let begin_ts = self.get_timestamp(); + let tx = Transaction::new(tx_id, begin_ts); + tracing::trace!("BEGIN {tx}"); + self.txs.insert(tx_id, RwLock::new(tx)); + tx_id + } + + /// Commits a transaction with the specified transaction ID. 
+ /// + /// This function commits the changes made within the specified transaction and finalizes the + /// transaction. Once a transaction has been committed, all changes made within the transaction + /// are visible to other transactions that access the same data. + /// + /// # Arguments + /// + /// * `tx_id` - The ID of the transaction to commit. + pub fn commit_tx(&self, tx_id: TxID) -> Result<()> { + let end_ts = self.get_timestamp(); + // NOTICE: the first shadowed tx keeps the entry alive in the map + // for the duration of this whole function, which is important for correctness! + let tx = self.txs.get(&tx_id).ok_or(DatabaseError::TxTerminated)?; + let tx = tx.value().write().unwrap(); + match tx.state.load() { + TransactionState::Terminated => return Err(DatabaseError::TxTerminated), + _ => { + assert_eq!(tx.state, TransactionState::Active); + } + } + tx.state.store(TransactionState::Preparing); + tracing::trace!("PREPARE {tx}"); + + /* TODO: The code we have here is sufficient for snapshot isolation. + ** In order to implement serializability, we need the following steps: + ** + ** 1. Validate if all read versions are still visible by inspecting the read_set + ** 2. Validate if there are no phantoms by walking the scans from scan_set (which we don't even have yet) + ** - a phantom is a version that became visible in the middle of our transaction, + ** but wasn't taken into account during one of the scans from the scan_set + ** 3. Wait for commit dependencies, which we don't even track yet... + ** Excerpt from what's a commit dependency and how it's tracked in the original paper: + ** """ + A transaction T1 has a commit dependency on another transaction + T2, if T1 is allowed to commit only if T2 commits. If T2 aborts, + T1 must also abort, so cascading aborts are possible. T1 acquires a + commit dependency either by speculatively reading or speculatively ignoring a version, + instead of waiting for T2 to commit. 
+ We implement commit dependencies by a register-and-report + approach: T1 registers its dependency with T2 and T2 informs T1 + when it has committed or aborted. Each transaction T contains a + counter, CommitDepCounter, that counts how many unresolved + commit dependencies it still has. A transaction cannot commit + until this counter is zero. In addition, T has a Boolean variable + AbortNow that other transactions can set to tell T to abort. Each + transaction T also has a set, CommitDepSet, that stores transaction IDs + of the transactions that depend on T. + To take a commit dependency on a transaction T2, T1 increments + its CommitDepCounter and adds its transaction ID to T2’s CommitDepSet. + When T2 has committed, it locates each transaction in + its CommitDepSet and decrements their CommitDepCounter. If + T2 aborted, it tells the dependent transactions to also abort by + setting their AbortNow flags. If a dependent transaction is not + found, this means that it has already aborted. + Note that a transaction with commit dependencies may not have to + wait at all - the dependencies may have been resolved before it is + ready to commit. Commit dependencies consolidate all waits into + a single wait and postpone the wait to just before commit. + Some transactions may have to wait before commit. + Waiting raises a concern of deadlocks. + However, deadlocks cannot occur because an older transaction never + waits on a younger transaction. In + a wait-for graph the direction of edges would always be from a + younger transaction (higher end timestamp) to an older transaction + (lower end timestamp) so cycles are impossible. + """ + ** If you're wondering when a speculative read happens, here you go: + ** Case 1: speculative read of TB: + """ + If transaction TB is in the Preparing state, it has acquired an end + timestamp TS which will be V’s begin timestamp if TB commits. 
+ A safe approach in this situation would be to have transaction T + wait until transaction TB commits. However, we want to avoid all + blocking during normal processing so instead we continue with + the visibility test and, if the test returns true, allow T to + speculatively read V. Transaction T acquires a commit dependency on + TB, restricting the serialization order of the two transactions. That + is, T is allowed to commit only if TB commits. + """ + ** Case 2: speculative ignore of TE: + """ + If TE’s state is Preparing, it has an end timestamp TS that will become + the end timestamp of V if TE does commit. If TS is greater than the read + time RT, it is obvious that V will be visible if TE commits. If TE + aborts, V will still be visible, because any transaction that updates + V after TE has aborted will obtain an end timestamp greater than + TS. If TS is less than RT, we have a more complicated situation: + if TE commits, V will not be visible to T but if TE aborts, it will + be visible. We could handle this by forcing T to wait until TE + commits or aborts but we want to avoid all blocking during normal processing. + Instead we allow T to speculatively ignore V and + proceed with its processing. Transaction T acquires a commit + dependency (see Section 2.7) on TE, that is, T is allowed to commit + only if TE commits. + """ + */ + tx.state.store(TransactionState::Committed(end_ts)); + tracing::trace!("COMMIT {tx}"); + let tx_begin_ts = tx.begin_ts; + let write_set: Vec = tx.write_set.iter().map(|v| *v.value()).collect(); + drop(tx); + // Postprocessing: inserting row versions and logging the transaction to persistent storage. + // TODO: we should probably save to persistent storage first, and only then update the in-memory structures. 
+ let mut log_record: LogRecord = LogRecord::new(end_ts); + for ref id in write_set { + if let Some(row_versions) = self.rows.get(id) { + let mut row_versions = row_versions.value().write().unwrap(); + for row_version in row_versions.iter_mut() { + if let TxTimestampOrID::TxID(id) = row_version.begin { + if id == tx_id { + row_version.begin = TxTimestampOrID::Timestamp(tx_begin_ts); + self.insert_version_raw( + &mut log_record.row_versions, + row_version.clone(), + ); // FIXME: optimize cloning out + } + } + if let Some(TxTimestampOrID::TxID(id)) = row_version.end { + if id == tx_id { + row_version.end = Some(TxTimestampOrID::Timestamp(end_ts)); + self.insert_version_raw( + &mut log_record.row_versions, + row_version.clone(), + ); // FIXME: optimize cloning out + } + } + } + } + } + tracing::trace!("UPDATED TX{tx_id}"); + // We have now updated all the versions with a reference to the + // transaction ID to a timestamp and can, therefore, remove the + // transaction. Please note that when we move to lockless, the + // invariant doesn't necessarily hold anymore because another thread + // might have speculatively read a version that we want to remove. + // But that's a problem for another day. + // FIXME: it actually just become a problem for today!!! + // TODO: test that reproduces this failure, and then a fix + self.txs.remove(&tx_id); + if !log_record.row_versions.is_empty() { + self.storage.log_tx(log_record)?; + } + tracing::trace!("LOGGED {tx_id}"); + Ok(()) + } + + /// Rolls back a transaction with the specified ID. + /// + /// This function rolls back a transaction with the specified `tx_id` by + /// discarding any changes made by the transaction. + /// + /// # Arguments + /// + /// * `tx_id` - The ID of the transaction to abort. 
+ pub fn rollback_tx(&self, tx_id: TxID) { + let tx_unlocked = self.txs.get(&tx_id).unwrap(); + let tx = tx_unlocked.value().write().unwrap(); + assert_eq!(tx.state, TransactionState::Active); + tx.state.store(TransactionState::Aborted); + tracing::trace!("ABORT {tx}"); + let write_set: Vec = tx.write_set.iter().map(|v| *v.value()).collect(); + drop(tx); + + for ref id in write_set { + if let Some(row_versions) = self.rows.get(id) { + let mut row_versions = row_versions.value().write().unwrap(); + row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id)); + if row_versions.is_empty() { + self.rows.remove(id); + } + } + } + + let tx = tx_unlocked.value().write().unwrap(); + tx.state.store(TransactionState::Terminated); + tracing::trace!("TERMINATE {tx}"); + // FIXME: verify that we can already remove the transaction here! + // Maybe it's fine for snapshot isolation, but too early for serializable? + self.txs.remove(&tx_id); + } + + /// Generates next unique transaction id + pub fn get_tx_id(&self) -> u64 { + self.tx_ids.fetch_add(1, Ordering::SeqCst) + } + + /// Gets current timestamp + pub fn get_timestamp(&self) -> u64 { + self.clock.get_timestamp() + } + + /// Removes unused row versions with very loose heuristics, + /// which sometimes leaves versions intact for too long. + /// Returns the number of removed versions. + pub fn drop_unused_row_versions(&self) -> usize { + tracing::trace!( + "Dropping unused row versions. 
Database stats: transactions: {}; rows: {}", + self.txs.len(), + self.rows.len() + ); + let mut dropped = 0; + let mut to_remove = Vec::new(); + for entry in self.rows.iter() { + let mut row_versions = entry.value().write().unwrap(); + row_versions.retain(|rv| { + // FIXME: should take rv.begin into account as well + let should_stay = match rv.end { + Some(TxTimestampOrID::Timestamp(version_end_ts)) => { + // a transaction started before this row version ended, ergo row version is needed + // NOTICE: O(row_versions x transactions), but also lock-free, so sounds acceptable + self.txs.iter().any(|tx| { + let tx = tx.value().read().unwrap(); + // FIXME: verify! + match tx.state.load() { + TransactionState::Active | TransactionState::Preparing => { + version_end_ts > tx.begin_ts + } + _ => false, + } + }) + } + // Let's skip potentially complex logic if the transafction is still + // active/tracked. We will drop the row version when the transaction + // gets garbage-collected itself, it will always happen eventually. + Some(TxTimestampOrID::TxID(tx_id)) => !self.txs.contains_key(&tx_id), + // this row version is current, ergo visible + None => true, + }; + if !should_stay { + dropped += 1; + tracing::trace!( + "Dropping row version {:?} {:?}-{:?}", + entry.key(), + rv.begin, + rv.end + ); + } + should_stay + }); + if row_versions.is_empty() { + to_remove.push(*entry.key()); + } + } + for id in to_remove { + self.rows.remove(&id); + } + dropped + } + + pub fn recover(&self) -> Result<()> { + let tx_log = self.storage.read_tx_log()?; + for record in tx_log { + tracing::debug!("RECOVERING {:?}", record); + for version in record.row_versions { + self.insert_version(version.row.id, version); + } + self.clock.reset(record.tx_timestamp); + } + Ok(()) + } +} + +/// A write-write conflict happens when transaction T_m attempts to update a +/// row version that is currently being updated by an active transaction T_n. 
+pub(crate) fn is_write_write_conflict( + txs: &SkipMap>, + tx: &Transaction, + rv: &RowVersion, +) -> bool { + match rv.end { + Some(TxTimestampOrID::TxID(rv_end)) => { + let te = txs.get(&rv_end).unwrap(); + let te = te.value().read().unwrap(); + match te.state.load() { + TransactionState::Active | TransactionState::Preparing => tx.tx_id != te.tx_id, + _ => false, + } + } + Some(TxTimestampOrID::Timestamp(_)) => false, + None => false, + } +} + +pub(crate) fn is_version_visible( + txs: &SkipMap>, + tx: &Transaction, + rv: &RowVersion, +) -> bool { + is_begin_visible(txs, tx, rv) && is_end_visible(txs, tx, rv) +} + +fn is_begin_visible( + txs: &SkipMap>, + tx: &Transaction, + rv: &RowVersion, +) -> bool { + match rv.begin { + TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts, + TxTimestampOrID::TxID(rv_begin) => { + let tb = txs.get(&rv_begin).unwrap(); + let tb = tb.value().read().unwrap(); + let visible = match tb.state.load() { + TransactionState::Active => tx.tx_id == tb.tx_id && rv.end.is_none(), + TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable! 
+ TransactionState::Committed(committed_ts) => tx.begin_ts >= committed_ts, + TransactionState::Aborted => false, + TransactionState::Terminated => { + tracing::debug!("TODO: should reread rv's end field - it should have updated the timestamp in the row version by now"); + false + } + }; + tracing::trace!( + "is_begin_visible: tx={tx}, tb={tb} rv = {:?}-{:?} visible = {visible}", + rv.begin, + rv.end + ); + visible + } + } +} + +fn is_end_visible( + txs: &SkipMap>, + tx: &Transaction, + rv: &RowVersion, +) -> bool { + match rv.end { + Some(TxTimestampOrID::Timestamp(rv_end_ts)) => tx.begin_ts < rv_end_ts, + Some(TxTimestampOrID::TxID(rv_end)) => { + let te = txs.get(&rv_end).unwrap(); + let te = te.value().read().unwrap(); + let visible = match te.state.load() { + TransactionState::Active => tx.tx_id != te.tx_id, + TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable! + TransactionState::Committed(committed_ts) => tx.begin_ts < committed_ts, + TransactionState::Aborted => false, + TransactionState::Terminated => { + tracing::debug!("TODO: should reread rv's end field - it should have updated the timestamp in the row version by now"); + false + } + }; + tracing::trace!( + "is_end_visible: tx={tx}, te={te} rv = {:?}-{:?} visible = {visible}", + rv.begin, + rv.end + ); + visible + } + None => true, + } +} diff --git a/core/mvcc/mvcc-rs/src/database/tests.rs b/core/mvcc/mvcc-rs/src/database/tests.rs new file mode 100644 index 000000000..225c34a0e --- /dev/null +++ b/core/mvcc/mvcc-rs/src/database/tests.rs @@ -0,0 +1,933 @@ +use super::*; +use crate::clock::LocalClock; +use tracing_test::traced_test; + +fn test_db() -> Database { + let clock = LocalClock::new(); + let storage = crate::persistent_storage::Storage::new_noop(); + Database::new(clock, storage) +} + +#[traced_test] +#[test] +fn test_insert_read() { + let db = test_db(); + + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: RowID { + table_id: 1, 
+ row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); + db.commit_tx(tx1).unwrap(); + + let tx2 = db.begin_tx(); + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); +} + +#[traced_test] +#[test] +fn test_read_nonexistent() { + let db = test_db(); + let tx = db.begin_tx(); + let row = db.read( + tx, + RowID { + table_id: 1, + row_id: 1, + }, + ); + assert!(row.unwrap().is_none()); +} + +#[traced_test] +#[test] +fn test_delete() { + let db = test_db(); + + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); + db.delete( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + assert!(row.is_none()); + db.commit_tx(tx1).unwrap(); + + let tx2 = db.begin_tx(); + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + assert!(row.is_none()); +} + +#[traced_test] +#[test] +fn test_delete_nonexistent() { + let db = test_db(); + let tx = db.begin_tx(); + assert!(!db + .delete( + tx, + RowID { + table_id: 1, + row_id: 1 + } + ) + .unwrap()); +} + +#[traced_test] +#[test] +fn test_commit() { + let db = test_db(); + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); + let tx1_updated_row = Row { + id: 
RowID { + table_id: 1, + row_id: 1, + }, + data: "World".to_string(), + }; + db.update(tx1, tx1_updated_row.clone()).unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_updated_row, row); + db.commit_tx(tx1).unwrap(); + + let tx2 = db.begin_tx(); + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + db.commit_tx(tx2).unwrap(); + assert_eq!(tx1_updated_row, row); + db.drop_unused_row_versions(); +} + +#[traced_test] +#[test] +fn test_rollback() { + let db = test_db(); + let tx1 = db.begin_tx(); + let row1 = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, row1.clone()).unwrap(); + let row2 = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(row1, row2); + let row3 = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "World".to_string(), + }; + db.update(tx1, row3.clone()).unwrap(); + let row4 = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(row3, row4); + db.rollback_tx(tx1); + let tx2 = db.begin_tx(); + let row5 = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + assert_eq!(row5, None); +} + +#[traced_test] +#[test] +fn test_dirty_write() { + let db = test_db(); + + // T1 inserts a row with ID 1, but does not commit. + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); + + // T2 attempts to update row with ID 1, but fails because T1 has not committed. 
+ let tx2 = db.begin_tx(); + let tx2_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "World".to_string(), + }; + assert!(!db.update(tx2, tx2_row).unwrap()); + + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); +} + +#[traced_test] +#[test] +fn test_dirty_read() { + let db = test_db(); + + // T1 inserts a row with ID 1, but does not commit. + let tx1 = db.begin_tx(); + let row1 = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, row1).unwrap(); + + // T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed. + let tx2 = db.begin_tx(); + let row2 = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + assert_eq!(row2, None); +} + +#[traced_test] +#[test] +fn test_dirty_read_deleted() { + let db = test_db(); + + // T1 inserts a row with ID 1 and commits. + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + db.commit_tx(tx1).unwrap(); + + // T2 deletes row with ID 1, but does not commit. + let tx2 = db.begin_tx(); + assert!(db + .delete( + tx2, + RowID { + table_id: 1, + row_id: 1 + } + ) + .unwrap()); + + // T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed. + let tx3 = db.begin_tx(); + let row = db + .read( + tx3, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); +} + +#[traced_test] +#[test] +fn test_fuzzy_read() { + let db = test_db(); + + // T1 inserts a row with ID 1 and commits. 
+ let tx1 = db.begin_tx(); + let tx1_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); + db.commit_tx(tx1).unwrap(); + + // T2 reads the row with ID 1 within an active transaction. + let tx2 = db.begin_tx(); + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); + + // T3 updates the row and commits. + let tx3 = db.begin_tx(); + let tx3_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "World".to_string(), + }; + db.update(tx3, tx3_row).unwrap(); + db.commit_tx(tx3).unwrap(); + + // T2 still reads the same version of the row as before. + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); +} + +#[traced_test] +#[test] +fn test_lost_update() { + let db = test_db(); + + // T1 inserts a row with ID 1 and commits. + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); + db.commit_tx(tx1).unwrap(); + + // T2 attempts to update row ID 1 within an active transaction. + let tx2 = db.begin_tx(); + let tx2_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "World".to_string(), + }; + assert!(db.update(tx2, tx2_row.clone()).unwrap()); + + // T3 also attempts to update row ID 1 within an active transaction. 
+ let tx3 = db.begin_tx(); + let tx3_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello, world!".to_string(), + }; + assert_eq!( + Err(DatabaseError::WriteWriteConflict), + db.update(tx3, tx3_row) + ); + + db.commit_tx(tx2).unwrap(); + assert_eq!(Err(DatabaseError::TxTerminated), db.commit_tx(tx3)); + + let tx4 = db.begin_tx(); + let row = db + .read( + tx4, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx2_row, row); +} + +// Test for the visibility to check if a new transaction can see old committed values. +// This test checks for the typo present in the paper, explained in https://github.com/penberg/mvcc-rs/issues/15 +#[traced_test] +#[test] +fn test_committed_visibility() { + let db = test_db(); + + // let's add $10 to my account since I like money + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "10".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + db.commit_tx(tx1).unwrap(); + + // but I like more money, so let me try adding $10 more + let tx2 = db.begin_tx(); + let tx2_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "20".to_string(), + }; + assert!(db.update(tx2, tx2_row.clone()).unwrap()); + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(row, tx2_row); + + // can I check how much money I have? 
+ let tx3 = db.begin_tx(); + let row = db + .read( + tx3, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); +} + +// Test to check if a older transaction can see (un)committed future rows +#[traced_test] +#[test] +fn test_future_row() { + let db = test_db(); + + let tx1 = db.begin_tx(); + + let tx2 = db.begin_tx(); + let tx2_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "10".to_string(), + }; + db.insert(tx2, tx2_row).unwrap(); + + // transaction in progress, so tx1 shouldn't be able to see the value + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + assert_eq!(row, None); + + // lets commit the transaction and check if tx1 can see it + db.commit_tx(tx2).unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + assert_eq!(row, None); +} + +#[traced_test] +#[test] +fn test_storage1() { + let clock = LocalClock::new(); + let mut path = std::env::temp_dir(); + path.push(format!( + "mvcc-rs-storage-test-{}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(), + )); + let storage = crate::persistent_storage::Storage::new_json_on_disk(path.clone()); + let db = Database::new(clock, storage); + + let tx1 = db.begin_tx(); + let tx2 = db.begin_tx(); + let tx3 = db.begin_tx(); + + db.insert( + tx3, + Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "testme".to_string(), + }, + ) + .unwrap(); + + db.commit_tx(tx1).unwrap(); + db.rollback_tx(tx2); + db.commit_tx(tx3).unwrap(); + + let tx4 = db.begin_tx(); + db.insert( + tx4, + Row { + id: RowID { + table_id: 1, + row_id: 2, + }, + data: "testme2".to_string(), + }, + ) + .unwrap(); + db.insert( + tx4, + Row { + id: RowID { + table_id: 1, + row_id: 3, + }, + data: "testme3".to_string(), + }, + ) + .unwrap(); + + assert_eq!( + db.read( + tx4, + RowID { + table_id: 1, + row_id: 1 + } + ) + .unwrap() + .unwrap() + 
.data, + "testme" + ); + assert_eq!( + db.read( + tx4, + RowID { + table_id: 1, + row_id: 2 + } + ) + .unwrap() + .unwrap() + .data, + "testme2" + ); + assert_eq!( + db.read( + tx4, + RowID { + table_id: 1, + row_id: 3 + } + ) + .unwrap() + .unwrap() + .data, + "testme3" + ); + db.commit_tx(tx4).unwrap(); + + let clock = LocalClock::new(); + let storage = crate::persistent_storage::Storage::new_json_on_disk(path); + let db: Database = Database::new(clock, storage); + db.recover().unwrap(); + println!("{:#?}", db); + + let tx5 = db.begin_tx(); + println!( + "{:#?}", + db.read( + tx5, + RowID { + table_id: 1, + row_id: 1 + } + ) + ); + assert_eq!( + db.read( + tx5, + RowID { + table_id: 1, + row_id: 1 + } + ) + .unwrap() + .unwrap() + .data, + "testme" + ); + assert_eq!( + db.read( + tx5, + RowID { + table_id: 1, + row_id: 2 + } + ) + .unwrap() + .unwrap() + .data, + "testme2" + ); + assert_eq!( + db.read( + tx5, + RowID { + table_id: 1, + row_id: 3 + } + ) + .unwrap() + .unwrap() + .data, + "testme3" + ); +} + +/* States described in the Hekaton paper *for serializability*: + +Table 1: Case analysis of action to take when version V’s +Begin field contains the ID of transaction TB +------------------------------------------------------------------------------------------------------ +TB’s state | TB’s end timestamp | Action to take when transaction T checks visibility of version V. +------------------------------------------------------------------------------------------------------ +Active | Not set | V is visible only if TB=T and V’s end timestamp equals infinity. +------------------------------------------------------------------------------------------------------ +Preparing | TS | V’s begin timestamp will be TS but V is not yet committed. Use TS + | as V’s begin time when testing visibility. If the test is true, + | allow T to speculatively read V. Committed TS V’s begin timestamp + | will be TS and V is committed. 
Use TS as V’s begin time to test + | visibility. +------------------------------------------------------------------------------------------------------ +Committed | TS | V’s begin timestamp will be TS and V is committed. Use TS as V’s + | begin time to test visibility. +------------------------------------------------------------------------------------------------------ +Aborted | Irrelevant | Ignore V; it’s a garbage version. +------------------------------------------------------------------------------------------------------ +Terminated | Irrelevant | Reread V’s Begin field. TB has terminated so it must have finalized +or not found | | the timestamp. +------------------------------------------------------------------------------------------------------ + +Table 2: Case analysis of action to take when V's End field +contains a transaction ID TE. +------------------------------------------------------------------------------------------------------ +TE’s state | TE’s end timestamp | Action to take when transaction T checks visibility of a version V + | | as of read time RT. +------------------------------------------------------------------------------------------------------ +Active | Not set | V is visible only if TE is not T. +------------------------------------------------------------------------------------------------------ +Preparing | TS | V’s end timestamp will be TS provided that TE commits. If TS > RT, + | V is visible to T. If TS < RT, T speculatively ignores V. +------------------------------------------------------------------------------------------------------ +Committed | TS | V’s end timestamp will be TS and V is committed. Use TS as V’s end + | timestamp when testing visibility. +------------------------------------------------------------------------------------------------------ +Aborted | Irrelevant | V is visible. 
+------------------------------------------------------------------------------------------------------ +Terminated | Irrelevant | Reread V’s End field. TE has terminated so it must have finalized +or not found | | the timestamp. +*/ + +fn new_tx(tx_id: TxID, begin_ts: u64, state: TransactionState) -> RwLock { + let state = state.into(); + RwLock::new(Transaction { + state, + tx_id, + begin_ts, + write_set: SkipSet::new(), + read_set: SkipSet::new(), + }) +} + +#[traced_test] +#[test] +fn test_snapshot_isolation_tx_visible1() { + let txs: SkipMap> = SkipMap::from_iter([ + (1, new_tx(1, 1, TransactionState::Committed(2))), + (2, new_tx(2, 2, TransactionState::Committed(5))), + (3, new_tx(3, 3, TransactionState::Aborted)), + (5, new_tx(5, 5, TransactionState::Preparing)), + (6, new_tx(6, 6, TransactionState::Committed(10))), + (7, new_tx(7, 7, TransactionState::Active)), + ]); + + let current_tx = new_tx(4, 4, TransactionState::Preparing); + let current_tx = current_tx.read().unwrap(); + + let rv_visible = |begin: TxTimestampOrID, end: Option| { + let row_version = RowVersion { + begin, + end, + row: Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "testme".to_string(), + }, + }; + tracing::debug!("Testing visibility of {row_version:?}"); + is_version_visible(&txs, &current_tx, &row_version) + }; + + // begin visible: transaction committed with ts < current_tx.begin_ts + // end visible: inf + assert!(rv_visible(TxTimestampOrID::TxID(1), None)); + + // begin invisible: transaction committed with ts > current_tx.begin_ts + assert!(!rv_visible(TxTimestampOrID::TxID(2), None)); + + // begin invisible: transaction aborted + assert!(!rv_visible(TxTimestampOrID::TxID(3), None)); + + // begin visible: timestamp < current_tx.begin_ts + // end invisible: transaction committed with ts < current_tx.begin_ts + assert!(!rv_visible( + TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::TxID(1)) + )); + + // begin visible: timestamp < current_tx.begin_ts + // end visible: 
transaction committed with ts > current_tx.begin_ts + assert!(rv_visible( + TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::TxID(2)) + )); + + // begin visible: timestamp < current_tx.begin_ts + // end invisible: transaction aborted + assert!(!rv_visible( + TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::TxID(3)) + )); + + // begin invisible: transaction preparing + assert!(!rv_visible(TxTimestampOrID::TxID(5), None)); + + // begin invisible: transaction committed with ts > current_tx.begin_ts + assert!(!rv_visible(TxTimestampOrID::TxID(6), None)); + + // begin invisible: transaction active + assert!(!rv_visible(TxTimestampOrID::TxID(7), None)); + + // begin invisible: transaction committed with ts > current_tx.begin_ts + assert!(!rv_visible(TxTimestampOrID::TxID(6), None)); + + // begin invisible: transaction active + assert!(!rv_visible(TxTimestampOrID::TxID(7), None)); + + // begin visible: timestamp < current_tx.begin_ts + // end invisible: transaction preparing + assert!(!rv_visible( + TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::TxID(5)) + )); + + // begin invisible: timestamp > current_tx.begin_ts + assert!(!rv_visible( + TxTimestampOrID::Timestamp(6), + Some(TxTimestampOrID::TxID(6)) + )); + + // begin visible: timestamp < current_tx.begin_ts + // end visible: some active transaction will eventually overwrite this version, + // but that hasn't happened + // (this is the https://avi.im/blag/2023/hekaton-paper-typo/ case, I believe!) 
+ assert!(rv_visible( + TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::TxID(7)) + )); +} diff --git a/core/mvcc/mvcc-rs/src/errors.rs b/core/mvcc/mvcc-rs/src/errors.rs new file mode 100644 index 000000000..6cdad8ca3 --- /dev/null +++ b/core/mvcc/mvcc-rs/src/errors.rs @@ -0,0 +1,13 @@ +use thiserror::Error; + +#[derive(Error, Debug, PartialEq)] +pub enum DatabaseError { + #[error("no such transaction ID: `{0}`")] + NoSuchTransactionID(u64), + #[error("transaction aborted because of a write-write conflict")] + WriteWriteConflict, + #[error("transaction is terminated")] + TxTerminated, + #[error("I/O error: {0}")] + Io(String), +} diff --git a/core/mvcc/mvcc-rs/src/lib.rs b/core/mvcc/mvcc-rs/src/lib.rs new file mode 100644 index 000000000..00eaee336 --- /dev/null +++ b/core/mvcc/mvcc-rs/src/lib.rs @@ -0,0 +1,38 @@ +//! Multiversion concurrency control (MVCC) for Rust. +//! +//! This module implements the main memory MVCC method outlined in the paper +//! "High-Performance Concurrency Control Mechanisms for Main-Memory Databases" +//! by Per-Åke Larson et al (VLDB, 2011). +//! +//! ## Data anomalies +//! +//! * A *dirty write* occurs when transaction T_m updates a value that is written by +//! transaction T_n but not yet committed. The MVCC algorithm prevents dirty +//! writes by validating that a row version is visible to transaction T_m before +//! allowing update to it. +//! +//! * A *dirty read* occurs when transaction T_m reads a value that was written by +//! transaction T_n but not yet committed. The MVCC algorithm prevents dirty +//! reads by validating that a row version is visible to transaction T_m. +//! +//! * A *fuzzy read* (non-repeatable read) occurs when transaction T_m reads a +//! different value in the course of the transaction because another +//! transaction T_n has updated the value. +//! +//! * A *lost update* occurs when transactions T_m and T_n both attempt to update +//! the same value, resulting in one of the updates being lost. 
The MVCC algorithm +//! prevents lost updates by detecting the write-write conflict and letting the +//! first-writer win by aborting the later transaction. +//! +//! TODO: phantom reads, cursor lost updates, read skew, write skew. +//! +//! ## TODO +//! +//! * Optimistic reads and writes +//! * Garbage collection + +pub mod clock; +pub mod cursor; +pub mod database; +pub mod errors; +pub mod persistent_storage; diff --git a/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs b/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs new file mode 100644 index 000000000..0cac45259 --- /dev/null +++ b/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs @@ -0,0 +1,82 @@ +use serde::Serialize; +use serde::de::DeserializeOwned; +use std::fmt::Debug; + +use crate::database::{LogRecord, Result}; +use crate::errors::DatabaseError; + +pub mod s3; + +#[derive(Debug)] +pub enum Storage { + Noop, + JsonOnDisk(std::path::PathBuf), + S3(s3::Replicator), +} + +impl Storage { + pub fn new_noop() -> Self { + Self::Noop + } + + pub fn new_json_on_disk(path: impl Into) -> Self { + let path = path.into(); + Self::JsonOnDisk(path) + } + + pub fn new_s3(options: s3::Options) -> Result { + let replicator = futures::executor::block_on(s3::Replicator::new(options))?; + Ok(Self::S3(replicator)) + } +} + +impl Storage { + pub fn log_tx(&self, m: LogRecord) -> Result<()> { + match self { + Self::JsonOnDisk(path) => { + use std::io::Write; + let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?; + let mut file = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(path) + .map_err(|e| DatabaseError::Io(e.to_string()))?; + file.write_all(&t) + .map_err(|e| DatabaseError::Io(e.to_string()))?; + file.write_all(b"\n") + .map_err(|e| DatabaseError::Io(e.to_string()))?; + } + Self::S3(replicator) => { + futures::executor::block_on(replicator.replicate_tx(m))?; + } + Self::Noop => (), + } + Ok(()) + } + + pub fn read_tx_log(&self) -> Result>> { + match self { + 
Self::JsonOnDisk(path) => { + use std::io::BufRead; + let file = std::fs::OpenOptions::new() + .read(true) + .open(path) + .map_err(|e| DatabaseError::Io(e.to_string()))?; + + let mut records: Vec> = Vec::new(); + let mut lines = std::io::BufReader::new(file).lines(); + while let Some(Ok(line)) = lines.next() { + records.push( + serde_json::from_str(&line) + .map_err(|e| DatabaseError::Io(e.to_string()))?, + ) + } + Ok(records) + } + Self::S3(replicator) => futures::executor::block_on(replicator.read_tx_log()), + Self::Noop => Err(crate::errors::DatabaseError::Io( + "cannot read from Noop storage".to_string(), + )), + } + } +} diff --git a/core/mvcc/mvcc-rs/src/persistent_storage/s3.rs b/core/mvcc/mvcc-rs/src/persistent_storage/s3.rs new file mode 100644 index 000000000..cda65fd5e --- /dev/null +++ b/core/mvcc/mvcc-rs/src/persistent_storage/s3.rs @@ -0,0 +1,139 @@ +use crate::database::{LogRecord, Result}; +use crate::errors::DatabaseError; +use aws_sdk_s3::Client; +use serde::Serialize; +use serde::de::DeserializeOwned; +use std::fmt::Debug; + +#[derive(Clone, Copy, Debug)] +#[non_exhaustive] +pub struct Options { + pub create_bucket_if_not_exists: bool, +} + +impl Options { + pub fn with_create_bucket_if_not_exists(create_bucket_if_not_exists: bool) -> Self { + Self { + create_bucket_if_not_exists, + } + } +} + +#[derive(Debug)] +pub struct Replicator { + pub client: Client, + pub bucket: String, + pub prefix: String, +} + +impl Replicator { + pub async fn new(options: Options) -> Result { + let mut loader = aws_config::from_env(); + if let Ok(endpoint) = std::env::var("MVCCRS_ENDPOINT") { + loader = loader.endpoint_url(endpoint); + } + let sdk_config = loader.load().await; + let config = aws_sdk_s3::config::Builder::from(&sdk_config) + .force_path_style(true) + .build(); + let bucket = std::env::var("MVCCRS_BUCKET").unwrap_or_else(|_| "mvccrs".to_string()); + let prefix = std::env::var("MVCCRS_PREFIX").unwrap_or_else(|_| "tx".to_string()); + let client = 
Client::from_conf(config); + + match client.head_bucket().bucket(&bucket).send().await { + Ok(_) => tracing::info!("Bucket {bucket} exists and is accessible"), + Err(aws_sdk_s3::error::SdkError::ServiceError(err)) if err.err().is_not_found() => { + if options.create_bucket_if_not_exists { + tracing::info!("Bucket {bucket} not found, recreating"); + client + .create_bucket() + .bucket(&bucket) + .send() + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + } else { + tracing::error!("Bucket {bucket} does not exist"); + return Err(DatabaseError::Io(err.err().to_string())); + } + } + Err(e) => { + tracing::error!("Bucket checking error: {e}"); + return Err(DatabaseError::Io(e.to_string())); + } + } + + Ok(Self { + client, + bucket, + prefix, + }) + } + + pub async fn replicate_tx(&self, record: LogRecord) -> Result<()> { + let key = format!("{}-{:020}", self.prefix, record.tx_timestamp); + tracing::trace!("Replicating {key}"); + let body = serde_json::to_vec(&record).map_err(|e| DatabaseError::Io(e.to_string()))?; + let resp = self + .client + .put_object() + .bucket(&self.bucket) + .key(&key) + .body(body.into()) + .send() + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + tracing::trace!("Replicator response: {:?}", resp); + Ok(()) + } + + pub async fn read_tx_log(&self) -> Result>> { + let mut records: Vec> = Vec::new(); + // Read all objects from the bucket, one log record is stored in one object + let mut next_token = None; + loop { + let mut req = self + .client + .list_objects_v2() + .bucket(&self.bucket) + .prefix(&self.prefix); + if let Some(next_token) = next_token { + req = req.continuation_token(next_token); + } + let resp = req + .send() + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + tracing::trace!("List objects response: {:?}", resp); + if let Some(contents) = resp.contents { + // read the record from s3 based on the object metadata (`contents`) + // and store it in the `records` vector + for object in contents { + let 
key = object.key.unwrap(); + let resp = self + .client + .get_object() + .bucket(&self.bucket) + .key(&key) + .send() + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + tracing::trace!("Get object response: {:?}", resp); + let body = resp + .body + .collect() + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + let record: LogRecord = serde_json::from_slice(&body.into_bytes()) + .map_err(|e| DatabaseError::Io(e.to_string()))?; + records.push(record); + } + } + if resp.next_continuation_token.is_none() { + break; + } + next_token = resp.next_continuation_token; + } + tracing::trace!("Records: {records:?}"); + Ok(records) + } +} diff --git a/core/mvcc/mvcc-rs/tests/concurrency_test.rs b/core/mvcc/mvcc-rs/tests/concurrency_test.rs new file mode 100644 index 000000000..f7d77893e --- /dev/null +++ b/core/mvcc/mvcc-rs/tests/concurrency_test.rs @@ -0,0 +1,124 @@ +use mvcc_rs::clock::LocalClock; +use mvcc_rs::database::{Database, Row, RowID}; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::sync::{Arc, Once}; + +static IDS: AtomicU64 = AtomicU64::new(1); + +static START: Once = Once::new(); + +#[test] +fn test_non_overlapping_concurrent_inserts() { + START.call_once(|| { + tracing_subscriber::fmt::init(); + }); + // Two threads insert to the database concurrently using non-overlapping + // row IDs. 
+ let clock = LocalClock::default(); + let storage = mvcc_rs::persistent_storage::Storage::new_noop(); + let db = Arc::new(Database::new(clock, storage)); + let iterations = 100000; + + let th1 = { + let db = db.clone(); + std::thread::spawn(move || { + for _ in 0..iterations { + let tx = db.begin_tx(); + let id = IDS.fetch_add(1, Ordering::SeqCst); + let id = RowID { + table_id: 1, + row_id: id, + }; + let row = Row { + id, + data: "Hello".to_string(), + }; + db.insert(tx, row.clone()).unwrap(); + db.commit_tx(tx).unwrap(); + let tx = db.begin_tx(); + let committed_row = db.read(tx, id).unwrap(); + db.commit_tx(tx).unwrap(); + assert_eq!(committed_row, Some(row)); + } + }) + }; + let th2 = { + std::thread::spawn(move || { + for _ in 0..iterations { + let tx = db.begin_tx(); + let id = IDS.fetch_add(1, Ordering::SeqCst); + let id = RowID { + table_id: 1, + row_id: id, + }; + let row = Row { + id, + data: "World".to_string(), + }; + db.insert(tx, row.clone()).unwrap(); + db.commit_tx(tx).unwrap(); + let tx = db.begin_tx(); + let committed_row = db.read(tx, id).unwrap(); + db.commit_tx(tx).unwrap(); + assert_eq!(committed_row, Some(row)); + } + }) + }; + th1.join().unwrap(); + th2.join().unwrap(); +} + +#[test] +fn test_overlapping_concurrent_inserts_read_your_writes() { + START.call_once(|| { + tracing_subscriber::fmt::init(); + }); // Two threads insert to the database concurrently using overlapping row IDs. 
+ let clock = LocalClock::default(); + let storage = mvcc_rs::persistent_storage::Storage::new_noop(); + let db = Arc::new(Database::new(clock, storage)); + let iterations = 100000; + + let work = |prefix: &'static str| { + let db = db.clone(); + std::thread::spawn(move || { + let mut failed_upserts = 0; + for i in 0..iterations { + if i % 1000 == 0 { + tracing::debug!("{prefix}: {i}"); + } + if i % 10000 == 0 { + let dropped = db.drop_unused_row_versions(); + tracing::debug!("garbage collected {dropped} versions"); + } + let tx = db.begin_tx(); + let id = i % 16; + let id = RowID { + table_id: 1, + row_id: id, + }; + let row = Row { + id, + data: format!("{prefix} @{tx}"), + }; + if let Err(e) = db.upsert(tx, row.clone()) { + tracing::trace!("upsert failed: {e}"); + failed_upserts += 1; + continue; + } + let committed_row = db.read(tx, id).unwrap(); + db.commit_tx(tx).unwrap(); + assert_eq!(committed_row, Some(row)); + } + tracing::info!( + "{prefix}'s failed upserts: {failed_upserts}/{iterations} {:.2}%", + (failed_upserts * 100) as f64 / iterations as f64 + ); + }) + }; + + let threads = vec![work("A"), work("B"), work("C"), work("D")]; + for th in threads { + th.join().unwrap(); + } +}