mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-04 17:04:18 +01:00
Merge "Hekaton MVCC implementation" from Pekka and others
This imports the full history of the following Git repository into `core/mvcc` directory as-is: https://github.com/penberg/tihku/tree/main
This commit is contained in:
25
core/mvcc/.github/workflows/smoke_test.yml
vendored
Normal file
25
core/mvcc/.github/workflows/smoke_test.yml
vendored
Normal file
@@ -0,0 +1,25 @@
|
||||
name: Rust
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ "main" ]
|
||||
pull_request:
|
||||
branches: [ "main" ]
|
||||
|
||||
env:
|
||||
CARGO_TERM_COLOR: always
|
||||
RUST_LOG: info,mvcc_rs=trace
|
||||
|
||||
jobs:
|
||||
build:
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Check
|
||||
run: cargo check --all-targets --all-features
|
||||
- name: Clippy
|
||||
run: cargo clippy --all-targets --all-features -- -D warnings
|
||||
- name: Run tests
|
||||
run: cargo test --verbose
|
||||
2
core/mvcc/.gitignore
vendored
Normal file
2
core/mvcc/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
Cargo.lock
|
||||
target/
|
||||
11
core/mvcc/Cargo.toml
Normal file
11
core/mvcc/Cargo.toml
Normal file
@@ -0,0 +1,11 @@
|
||||
[workspace]
|
||||
resolver = "2"
|
||||
members = [
|
||||
"mvcc-rs",
|
||||
"bindings/c",
|
||||
]
|
||||
|
||||
[profile.release]
|
||||
codegen-units = 1
|
||||
panic = "abort"
|
||||
strip = true
|
||||
20
core/mvcc/LICENSE.md
Normal file
20
core/mvcc/LICENSE.md
Normal file
@@ -0,0 +1,20 @@
|
||||
MIT License
|
||||
|
||||
Copyright 2023 Pekka Enberg
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
this software and associated documentation files (the "Software"), to deal in
|
||||
the Software without restriction, including without limitation the rights to
|
||||
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
58
core/mvcc/README.md
Normal file
58
core/mvcc/README.md
Normal file
@@ -0,0 +1,58 @@
|
||||
# Tihku
|
||||
|
||||
Tihku is a _work-in-progress_, open-source implementation of the Hekaton multi-version concurrency control (MVCC) written in Rust.
|
||||
The project aims to provide a foundational building block for implementing database management systems.
|
||||
|
||||
One of the projects using Tihku is an experimental [libSQL branch with MVCC](https://github.com/penberg/libsql/tree/mvcc) that aims to implement `BEGIN CONCURRENT` with Tihku to improve SQLite write concurrency.
|
||||
|
||||
## Features
|
||||
|
||||
* Main memory architecture, rows are accessed via an index
|
||||
* Optimistic multi-version concurrency control
|
||||
* Rust and C APIs
|
||||
|
||||
## Experimental Evaluation
|
||||
|
||||
**Single-threaded micro-benchmarks**
|
||||
|
||||
Operations | Throughput
|
||||
-----------------------------------|------------
|
||||
`begin_tx`, `read`, and `commit` | 2.2M ops/second
|
||||
`begin_tx`, `update`, and `commit` | 2.2M ops/second
|
||||
`read` | 12.9M ops/second
|
||||
`update` | 6.2M ops/second
|
||||
|
||||
(The `cargo bench` was run on an AMD Ryzen 9 3900XT 2.2 GHz CPU.)
|
||||
|
||||
## Development
|
||||
|
||||
Run tests:
|
||||
|
||||
```console
|
||||
cargo test
|
||||
```
|
||||
|
||||
Test coverage report:
|
||||
|
||||
```console
|
||||
cargo tarpaulin -o html
|
||||
```
|
||||
|
||||
Run benchmarks:
|
||||
|
||||
```console
|
||||
cargo bench
|
||||
```
|
||||
|
||||
Run benchmarks and generate flamegraphs:
|
||||
|
||||
```console
|
||||
echo -1 | sudo tee /proc/sys/kernel/perf_event_paranoid
|
||||
cargo bench --bench my_benchmark -- --profile-time=5
|
||||
```
|
||||
|
||||
## References
|
||||
|
||||
Larson et al. [High-Performance Concurrency Control Mechanisms for Main-Memory Databases](https://vldb.org/pvldb/vol5/p298_per-akelarson_vldb2012.pdf). VLDB '11
|
||||
|
||||
Paper errata: The visibility check in Table 2 is wrong and causes uncommitted delete to become visible to transactions (fixed in [commit 6ca3773](https://github.com/penberg/mvcc-rs/commit/6ca377320bb59b52ecc0430b9e5e422e8d61658d)).
|
||||
22
core/mvcc/bindings/c/Cargo.toml
Normal file
22
core/mvcc/bindings/c/Cargo.toml
Normal file
@@ -0,0 +1,22 @@
|
||||
[package]
|
||||
name = "mvcc-c"
|
||||
version = "0.0.0"
|
||||
edition = "2021"
|
||||
|
||||
[lib]
|
||||
crate-type = ["cdylib", "staticlib"]
|
||||
doc = false
|
||||
|
||||
[build-dependencies]
|
||||
cbindgen = "0.24.0"
|
||||
|
||||
[dependencies]
|
||||
base64 = "0.21.0"
|
||||
mvcc-rs = { path = "../../mvcc-rs" }
|
||||
tracing = "0.1.37"
|
||||
tracing-subscriber = { version = "0" }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
json_on_disk_storage = []
|
||||
s3_storage = []
|
||||
8
core/mvcc/bindings/c/build.rs
Normal file
8
core/mvcc/bindings/c/build.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
use std::path::Path;
|
||||
|
||||
fn main() {
|
||||
let header_file = Path::new("include").join("mvcc.h");
|
||||
cbindgen::generate(".")
|
||||
.expect("Failed to generate C bindings")
|
||||
.write_to_file(header_file);
|
||||
}
|
||||
7
core/mvcc/bindings/c/cbindgen.toml
Normal file
7
core/mvcc/bindings/c/cbindgen.toml
Normal file
@@ -0,0 +1,7 @@
|
||||
language = "C"
|
||||
cpp_compat = true
|
||||
include_guard = "MVCC_H"
|
||||
line_length = 120
|
||||
no_includes = true
|
||||
style = "type"
|
||||
sys_includes = ["stdint.h"]
|
||||
64
core/mvcc/bindings/c/include/mvcc.h
Normal file
64
core/mvcc/bindings/c/include/mvcc.h
Normal file
@@ -0,0 +1,64 @@
|
||||
#ifndef MVCC_H
|
||||
#define MVCC_H
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
typedef enum {
|
||||
MVCC_OK = 0,
|
||||
MVCC_IO_ERROR_READ = 266,
|
||||
MVCC_IO_ERROR_WRITE = 778,
|
||||
} MVCCError;
|
||||
|
||||
typedef struct DbContext DbContext;
|
||||
|
||||
typedef struct ScanCursorContext ScanCursorContext;
|
||||
|
||||
typedef const DbContext *MVCCDatabaseRef;
|
||||
|
||||
typedef ScanCursorContext *MVCCScanCursorRef;
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif // __cplusplus
|
||||
|
||||
MVCCDatabaseRef MVCCDatabaseOpen(const char *path);
|
||||
|
||||
void MVCCDatabaseClose(MVCCDatabaseRef db);
|
||||
|
||||
uint64_t MVCCTransactionBegin(MVCCDatabaseRef db);
|
||||
|
||||
MVCCError MVCCTransactionCommit(MVCCDatabaseRef db, uint64_t tx_id);
|
||||
|
||||
MVCCError MVCCTransactionRollback(MVCCDatabaseRef db, uint64_t tx_id);
|
||||
|
||||
MVCCError MVCCDatabaseInsert(MVCCDatabaseRef db,
|
||||
uint64_t tx_id,
|
||||
uint64_t table_id,
|
||||
uint64_t row_id,
|
||||
const void *value_ptr,
|
||||
uintptr_t value_len);
|
||||
|
||||
MVCCError MVCCDatabaseRead(MVCCDatabaseRef db,
|
||||
uint64_t tx_id,
|
||||
uint64_t table_id,
|
||||
uint64_t row_id,
|
||||
uint8_t **value_ptr,
|
||||
int64_t *value_len);
|
||||
|
||||
void MVCCFreeStr(void *ptr);
|
||||
|
||||
MVCCScanCursorRef MVCCScanCursorOpen(MVCCDatabaseRef db, uint64_t tx_id, uint64_t table_id);
|
||||
|
||||
void MVCCScanCursorClose(MVCCScanCursorRef cursor);
|
||||
|
||||
MVCCError MVCCScanCursorRead(MVCCScanCursorRef cursor, uint8_t **value_ptr, int64_t *value_len);
|
||||
|
||||
int MVCCScanCursorNext(MVCCScanCursorRef cursor);
|
||||
|
||||
uint64_t MVCCScanCursorPosition(MVCCScanCursorRef cursor);
|
||||
|
||||
#ifdef __cplusplus
|
||||
} // extern "C"
|
||||
#endif // __cplusplus
|
||||
|
||||
#endif /* MVCC_H */
|
||||
6
core/mvcc/bindings/c/src/errors.rs
Normal file
6
core/mvcc/bindings/c/src/errors.rs
Normal file
@@ -0,0 +1,6 @@
|
||||
/// Status codes returned across the C API boundary.
///
/// NOTE(review): the numeric values look like SQLite's extended I/O error
/// codes (`SQLITE_IOERR_READ` / `SQLITE_IOERR_WRITE`) — confirm before
/// relying on that correspondence.
#[repr(C)]
pub enum MVCCError {
    /// The operation completed successfully.
    MVCC_OK = 0,
    /// A read operation failed.
    MVCC_IO_ERROR_READ = 266,
    /// A write operation (for example an insert or a commit) failed.
    MVCC_IO_ERROR_WRITE = 778,
}
|
||||
298
core/mvcc/bindings/c/src/lib.rs
Normal file
298
core/mvcc/bindings/c/src/lib.rs
Normal file
@@ -0,0 +1,298 @@
|
||||
#![allow(non_camel_case_types)]
|
||||
#![allow(clippy::missing_safety_doc)]
|
||||
|
||||
mod errors;
|
||||
mod types;
|
||||
|
||||
use errors::MVCCError;
|
||||
use mvcc_rs::persistent_storage::{s3, Storage};
|
||||
use mvcc_rs::*;
|
||||
use types::{DbContext, MVCCDatabaseRef, MVCCScanCursorRef, ScanCursorContext};
|
||||
|
||||
/// cbindgen:ignore
|
||||
type Clock = clock::LocalClock;
|
||||
|
||||
/// cbindgen:ignore
|
||||
/// Note - We use String type in C bindings as Row type. Type is generic.
|
||||
type Db = database::Database<Clock, String>;
|
||||
|
||||
/// cbindgen:ignore
|
||||
/// Note - We use String type in C bindings as Row type. Type is generic.
|
||||
type ScanCursor = cursor::ScanCursor<'static, Clock, String>;
|
||||
|
||||
static INIT_RUST_LOG: std::sync::Once = std::sync::Once::new();
|
||||
|
||||
fn storage_for(main_db_path: &str) -> database::Result<Storage> {
|
||||
// TODO: let's accept an URL instead of main_db_path here, so we can
|
||||
// pass custom S3 endpoints, options, etc.
|
||||
if cfg!(feature = "json_on_disk_storage") {
|
||||
tracing::info!("JSONonDisk storage stored in {main_db_path}-mvcc");
|
||||
return Ok(Storage::new_json_on_disk(format!("{main_db_path}-mvcc")));
|
||||
}
|
||||
if cfg!(feature = "s3_storage") {
|
||||
tracing::info!("S3 storage for {main_db_path}");
|
||||
let options = s3::Options::with_create_bucket_if_not_exists(true);
|
||||
return Storage::new_s3(options);
|
||||
}
|
||||
tracing::info!("No persistent storage for {main_db_path}");
|
||||
Ok(Storage::new_noop())
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn MVCCDatabaseOpen(path: *const std::ffi::c_char) -> MVCCDatabaseRef {
|
||||
INIT_RUST_LOG.call_once(|| {
|
||||
tracing_subscriber::fmt::init();
|
||||
});
|
||||
|
||||
tracing::debug!("MVCCDatabaseOpen");
|
||||
|
||||
let clock = clock::LocalClock::new();
|
||||
let main_db_path = unsafe { std::ffi::CStr::from_ptr(path) };
|
||||
let main_db_path = match main_db_path.to_str() {
|
||||
Ok(path) => path,
|
||||
Err(_) => {
|
||||
tracing::error!("Invalid UTF-8 path");
|
||||
return MVCCDatabaseRef::null();
|
||||
}
|
||||
};
|
||||
|
||||
tracing::debug!("mvccrs: opening persistent storage for {main_db_path}");
|
||||
let storage = match storage_for(main_db_path) {
|
||||
Ok(storage) => storage,
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to open persistent storage: {e}");
|
||||
return MVCCDatabaseRef::null();
|
||||
}
|
||||
};
|
||||
let db = Db::new(clock, storage);
|
||||
|
||||
db.recover().ok();
|
||||
|
||||
let db = Box::leak(Box::new(DbContext { db }));
|
||||
MVCCDatabaseRef::from(db)
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn MVCCDatabaseClose(db: MVCCDatabaseRef) {
|
||||
tracing::debug!("MVCCDatabaseClose");
|
||||
if db.is_null() {
|
||||
tracing::debug!("warning: `db` is null in MVCCDatabaseClose()");
|
||||
return;
|
||||
}
|
||||
let _ = unsafe { Box::from_raw(db.get_ref_mut()) };
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn MVCCTransactionBegin(db: MVCCDatabaseRef) -> u64 {
|
||||
let db = db.get_ref();
|
||||
let tx_id = db.begin_tx();
|
||||
tracing::debug!("MVCCTransactionBegin: {tx_id}");
|
||||
tx_id
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn MVCCTransactionCommit(db: MVCCDatabaseRef, tx_id: u64) -> MVCCError {
|
||||
let db = db.get_ref();
|
||||
tracing::debug!("MVCCTransactionCommit: {tx_id}");
|
||||
match db.commit_tx(tx_id) {
|
||||
Ok(()) => MVCCError::MVCC_OK,
|
||||
Err(e) => {
|
||||
tracing::error!("MVCCTransactionCommit: {e}");
|
||||
MVCCError::MVCC_IO_ERROR_WRITE
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn MVCCTransactionRollback(db: MVCCDatabaseRef, tx_id: u64) -> MVCCError {
|
||||
let db = db.get_ref();
|
||||
tracing::debug!("MVCCTransactionRollback: {tx_id}");
|
||||
db.rollback_tx(tx_id);
|
||||
MVCCError::MVCC_OK
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn MVCCDatabaseInsert(
|
||||
db: MVCCDatabaseRef,
|
||||
tx_id: u64,
|
||||
table_id: u64,
|
||||
row_id: u64,
|
||||
value_ptr: *const std::ffi::c_void,
|
||||
value_len: usize,
|
||||
) -> MVCCError {
|
||||
let db = db.get_ref();
|
||||
let value = std::slice::from_raw_parts(value_ptr as *const u8, value_len);
|
||||
let data = match std::str::from_utf8(value) {
|
||||
Ok(value) => value.to_string(),
|
||||
Err(_) => {
|
||||
tracing::info!("Invalid UTF-8, let's base64 this fellow");
|
||||
use base64::{engine::general_purpose, Engine as _};
|
||||
general_purpose::STANDARD.encode(value)
|
||||
}
|
||||
};
|
||||
let id = database::RowID { table_id, row_id };
|
||||
let row = database::Row { id, data };
|
||||
tracing::debug!("MVCCDatabaseInsert: {row:?}");
|
||||
match db.insert(tx_id, row) {
|
||||
Ok(_) => {
|
||||
tracing::debug!("MVCCDatabaseInsert: success");
|
||||
MVCCError::MVCC_OK
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("MVCCDatabaseInsert: {e}");
|
||||
MVCCError::MVCC_IO_ERROR_WRITE
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn MVCCDatabaseRead(
|
||||
db: MVCCDatabaseRef,
|
||||
tx_id: u64,
|
||||
table_id: u64,
|
||||
row_id: u64,
|
||||
value_ptr: *mut *mut u8,
|
||||
value_len: *mut i64,
|
||||
) -> MVCCError {
|
||||
let db = db.get_ref();
|
||||
|
||||
match {
|
||||
let id = database::RowID { table_id, row_id };
|
||||
let maybe_row = db.read(tx_id, id);
|
||||
match maybe_row {
|
||||
Ok(Some(row)) => {
|
||||
tracing::debug!("Found row {row:?}");
|
||||
let str_len = row.data.len() + 1;
|
||||
let value = std::ffi::CString::new(row.data.as_bytes()).unwrap_or_default();
|
||||
unsafe {
|
||||
*value_ptr = value.into_raw() as *mut u8;
|
||||
*value_len = str_len as i64;
|
||||
}
|
||||
}
|
||||
_ => unsafe { *value_len = -1 },
|
||||
};
|
||||
Ok::<(), mvcc_rs::errors::DatabaseError>(())
|
||||
} {
|
||||
Ok(_) => {
|
||||
tracing::debug!("MVCCDatabaseRead: success");
|
||||
MVCCError::MVCC_OK
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("MVCCDatabaseRead: {e}");
|
||||
MVCCError::MVCC_IO_ERROR_READ
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Releases a string buffer by reconstructing the `CString` behind `ptr` and
/// dropping it. A null `ptr` is ignored.
///
/// # Safety
///
/// `ptr` must be null or a pointer produced by `CString::into_raw` that has
/// not been freed yet.
#[no_mangle]
pub unsafe extern "C" fn MVCCFreeStr(ptr: *mut std::ffi::c_void) {
    if !ptr.is_null() {
        drop(std::ffi::CString::from_raw(ptr.cast::<std::ffi::c_char>()));
    }
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn MVCCScanCursorOpen(
|
||||
db: MVCCDatabaseRef,
|
||||
tx_id: u64,
|
||||
table_id: u64,
|
||||
) -> MVCCScanCursorRef {
|
||||
tracing::debug!("MVCCScanCursorOpen()");
|
||||
// Reference is transmuted to &'static in order to be able to pass the cursor back to C.
|
||||
// The contract with C is to never use a cursor after MVCCDatabaseClose() has been called.
|
||||
let db = unsafe { std::mem::transmute::<&Db, &'static Db>(db.get_ref()) };
|
||||
match mvcc_rs::cursor::ScanCursor::new(db, tx_id, table_id) {
|
||||
Ok(cursor) => {
|
||||
if cursor.is_empty() {
|
||||
tracing::debug!("Cursor is empty");
|
||||
return MVCCScanCursorRef {
|
||||
ptr: std::ptr::null_mut(),
|
||||
};
|
||||
}
|
||||
tracing::debug!("Cursor open: {cursor:?}");
|
||||
MVCCScanCursorRef {
|
||||
ptr: Box::into_raw(Box::new(ScanCursorContext { cursor })),
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("MVCCScanCursorOpen: {e}");
|
||||
MVCCScanCursorRef {
|
||||
ptr: std::ptr::null_mut(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn MVCCScanCursorClose(cursor: MVCCScanCursorRef) {
|
||||
tracing::debug!("MVCCScanCursorClose()");
|
||||
if cursor.ptr.is_null() {
|
||||
tracing::debug!("warning: `cursor` is null in MVCCScanCursorClose()");
|
||||
return;
|
||||
}
|
||||
let cursor = unsafe { Box::from_raw(cursor.ptr) }.cursor;
|
||||
cursor.close().ok();
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn MVCCScanCursorRead(
|
||||
cursor: MVCCScanCursorRef,
|
||||
value_ptr: *mut *mut u8,
|
||||
value_len: *mut i64,
|
||||
) -> MVCCError {
|
||||
tracing::debug!("MVCCScanCursorRead()");
|
||||
if cursor.ptr.is_null() {
|
||||
tracing::debug!("warning: `cursor` is null in MVCCScanCursorRead()");
|
||||
return MVCCError::MVCC_IO_ERROR_READ;
|
||||
}
|
||||
let cursor = cursor.get_ref();
|
||||
|
||||
match {
|
||||
let maybe_row = cursor.current_row();
|
||||
match maybe_row {
|
||||
Ok(Some(row)) => {
|
||||
tracing::debug!("Found row {row:?}");
|
||||
let str_len = row.data.len() + 1;
|
||||
let value = std::ffi::CString::new(row.data.as_bytes()).unwrap_or_default();
|
||||
unsafe {
|
||||
*value_ptr = value.into_raw() as *mut u8;
|
||||
*value_len = str_len as i64;
|
||||
}
|
||||
}
|
||||
_ => unsafe { *value_len = -1 },
|
||||
};
|
||||
Ok::<(), mvcc_rs::errors::DatabaseError>(())
|
||||
} {
|
||||
Ok(_) => {
|
||||
tracing::debug!("MVCCDatabaseRead: success");
|
||||
MVCCError::MVCC_OK
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("MVCCDatabaseRead: {e}");
|
||||
MVCCError::MVCC_IO_ERROR_READ
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn MVCCScanCursorNext(cursor: MVCCScanCursorRef) -> std::ffi::c_int {
|
||||
let cursor = cursor.get_ref_mut();
|
||||
tracing::debug!("MVCCScanCursorNext(): {}", cursor.index);
|
||||
if cursor.forward() {
|
||||
tracing::debug!("Forwarded to {}", cursor.index);
|
||||
1
|
||||
} else {
|
||||
tracing::debug!("Forwarded to end");
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn MVCCScanCursorPosition(cursor: MVCCScanCursorRef) -> u64 {
|
||||
let cursor = cursor.get_ref();
|
||||
cursor
|
||||
.current_row_id()
|
||||
.map(|row_id| row_id.row_id)
|
||||
.unwrap_or(0)
|
||||
}
|
||||
79
core/mvcc/bindings/c/src/types.rs
Normal file
79
core/mvcc/bindings/c/src/types.rs
Normal file
@@ -0,0 +1,79 @@
|
||||
use crate::Db;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[repr(transparent)]
|
||||
pub struct MVCCDatabaseRef {
|
||||
ptr: *const DbContext,
|
||||
}
|
||||
|
||||
impl MVCCDatabaseRef {
|
||||
pub fn null() -> MVCCDatabaseRef {
|
||||
MVCCDatabaseRef {
|
||||
ptr: std::ptr::null(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_null(&self) -> bool {
|
||||
self.ptr.is_null()
|
||||
}
|
||||
|
||||
pub fn get_ref(&self) -> &Db {
|
||||
&unsafe { &*(self.ptr) }.db
|
||||
}
|
||||
|
||||
#[allow(clippy::mut_from_ref)]
|
||||
pub fn get_ref_mut(&self) -> &mut Db {
|
||||
let ptr_mut = self.ptr as *mut DbContext;
|
||||
&mut unsafe { &mut (*ptr_mut) }.db
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::from_over_into)]
|
||||
impl From<&DbContext> for MVCCDatabaseRef {
|
||||
fn from(value: &DbContext) -> Self {
|
||||
Self { ptr: value }
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::from_over_into)]
|
||||
impl From<&mut DbContext> for MVCCDatabaseRef {
|
||||
fn from(value: &mut DbContext) -> Self {
|
||||
Self { ptr: value }
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DbContext {
|
||||
pub(crate) db: Db,
|
||||
}
|
||||
|
||||
pub struct ScanCursorContext {
|
||||
pub(crate) cursor: crate::ScanCursor,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[repr(transparent)]
|
||||
pub struct MVCCScanCursorRef {
|
||||
pub ptr: *mut ScanCursorContext,
|
||||
}
|
||||
|
||||
impl MVCCScanCursorRef {
|
||||
pub fn null() -> MVCCScanCursorRef {
|
||||
MVCCScanCursorRef {
|
||||
ptr: std::ptr::null_mut(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_null(&self) -> bool {
|
||||
self.ptr.is_null()
|
||||
}
|
||||
|
||||
pub fn get_ref(&self) -> &crate::ScanCursor {
|
||||
&unsafe { &*(self.ptr) }.cursor
|
||||
}
|
||||
|
||||
#[allow(clippy::mut_from_ref)]
|
||||
pub fn get_ref_mut(&self) -> &mut crate::ScanCursor {
|
||||
let ptr_mut = self.ptr as *mut ScanCursorContext;
|
||||
&mut unsafe { &mut (*ptr_mut) }.cursor
|
||||
}
|
||||
}
|
||||
19
core/mvcc/docs/DESIGN.md
Normal file
19
core/mvcc/docs/DESIGN.md
Normal file
@@ -0,0 +1,19 @@
|
||||
# Design
|
||||
|
||||
## Persistent storage
|
||||
|
||||
Persistent storage must implement the `Storage` trait that the MVCC module uses for transaction logging.
|
||||
|
||||
Figure 1 shows an example of write-ahead log across three transactions.
|
||||
The first transaction T0 executes an `INSERT (id) VALUES (1)` statement, which results in a log record with `id` set to `1`, begin timestamp to 0 (which is the transaction ID) and end timestamp as infinity (meaning the row version is still visible).
|
||||
The second transaction T1 executes another `INSERT` statement, which adds another log record to the transaction log with `id` set to `2`, begin timestamp to 1 and end timestamp as infinity, similar to what T0 did.
|
||||
Finally, a third transaction T2 executes two statements: `DELETE WHERE id = 1` and `INSERT (id) VALUES (3)`. The first one results in a log record with `id` set to `1` and begin timestamp set to 0 (which is the transaction that created the entry). However, the end timestamp is now set to 2 (the current transaction), which means the entry is now deleted.
|
||||
The second statement results in an entry in the transaction log similar to the `INSERT` statements in T0 and T1.
|
||||
|
||||

|
||||
<p align="center">
|
||||
Figure 1. Transaction log of three transactions.
|
||||
</p>
|
||||
|
||||
When MVCC bootstraps or recovers, it simply redoes the transaction log.
|
||||
If the transaction log grows big, we can checkpoint it by dropping all entries that are no longer visible after the latest transaction and creating a snapshot.
|
||||
656
core/mvcc/docs/figures/transactions.excalidraw
Normal file
656
core/mvcc/docs/figures/transactions.excalidraw
Normal file
@@ -0,0 +1,656 @@
|
||||
{
|
||||
"type": "excalidraw",
|
||||
"version": 2,
|
||||
"source": "https://excalidraw.com",
|
||||
"elements": [
|
||||
{
|
||||
"id": "tFvpBUMWe3qPFUTQVV14X",
|
||||
"type": "text",
|
||||
"x": 233.14035848761839,
|
||||
"y": 205.73272444200816,
|
||||
"width": 278.57781982421875,
|
||||
"height": 25,
|
||||
"angle": 0,
|
||||
"strokeColor": "#087f5b",
|
||||
"backgroundColor": "#82c91e",
|
||||
"fillStyle": "hachure",
|
||||
"strokeWidth": 1,
|
||||
"strokeStyle": "solid",
|
||||
"roughness": 1,
|
||||
"opacity": 100,
|
||||
"groupIds": [],
|
||||
"roundness": null,
|
||||
"seed": 94988319,
|
||||
"version": 510,
|
||||
"versionNonce": 1210831775,
|
||||
"isDeleted": false,
|
||||
"boundElements": null,
|
||||
"updated": 1683370319070,
|
||||
"link": null,
|
||||
"locked": false,
|
||||
"text": "<tx=0, id=1, begin=0, end=∞>",
|
||||
"fontSize": 20,
|
||||
"fontFamily": 1,
|
||||
"textAlign": "left",
|
||||
"verticalAlign": "top",
|
||||
"baseline": 18,
|
||||
"containerId": null,
|
||||
"originalText": "<tx=0, id=1, begin=0, end=∞>",
|
||||
"lineHeight": 1.25
|
||||
},
|
||||
{
|
||||
"type": "text",
|
||||
"version": 515,
|
||||
"versionNonce": 1881893969,
|
||||
"isDeleted": false,
|
||||
"id": "7i88n1PIb89NxUbVQmTTi",
|
||||
"fillStyle": "hachure",
|
||||
"strokeWidth": 1,
|
||||
"strokeStyle": "solid",
|
||||
"roughness": 1,
|
||||
"opacity": 100,
|
||||
"angle": 0,
|
||||
"x": 938.4614491858606,
|
||||
"y": 311.23272444200813,
|
||||
"strokeColor": "#0b7285",
|
||||
"backgroundColor": "#82c91e",
|
||||
"width": 279.0400085449219,
|
||||
"height": 25,
|
||||
"seed": 1123646321,
|
||||
"groupIds": [],
|
||||
"roundness": null,
|
||||
"boundElements": [],
|
||||
"updated": 1683370316909,
|
||||
"link": null,
|
||||
"locked": false,
|
||||
"fontSize": 20,
|
||||
"fontFamily": 1,
|
||||
"text": "<tx=2, id=1, begin=0, end=2>",
|
||||
"textAlign": "left",
|
||||
"verticalAlign": "top",
|
||||
"containerId": null,
|
||||
"originalText": "<tx=2, id=1, begin=0, end=2>",
|
||||
"lineHeight": 1.25,
|
||||
"baseline": 18
|
||||
},
|
||||
{
|
||||
"type": "text",
|
||||
"version": 556,
|
||||
"versionNonce": 153125934,
|
||||
"isDeleted": false,
|
||||
"id": "Yh8XLtKqXUUYmcmG4SEXn",
|
||||
"fillStyle": "hachure",
|
||||
"strokeWidth": 1,
|
||||
"strokeStyle": "solid",
|
||||
"roughness": 1,
|
||||
"opacity": 100,
|
||||
"angle": 0,
|
||||
"x": 581.1603475012903,
|
||||
"y": 256.23272444200813,
|
||||
"strokeColor": "#e67700",
|
||||
"backgroundColor": "#82c91e",
|
||||
"width": 270.71783447265625,
|
||||
"height": 25,
|
||||
"seed": 1685524017,
|
||||
"groupIds": [],
|
||||
"roundness": null,
|
||||
"boundElements": [],
|
||||
"updated": 1683371076075,
|
||||
"link": null,
|
||||
"locked": false,
|
||||
"fontSize": 20,
|
||||
"fontFamily": 1,
|
||||
"text": "<tx=1, id=2, begin=1, end=∞>",
|
||||
"textAlign": "left",
|
||||
"verticalAlign": "top",
|
||||
"containerId": null,
|
||||
"originalText": "<tx=1, id=2, begin=1, end=∞>",
|
||||
"lineHeight": 1.25,
|
||||
"baseline": 18
|
||||
},
|
||||
{
|
||||
"id": "8l0CCJzCAtOLt_2GRcNpa",
|
||||
"type": "text",
|
||||
"x": 256.1403584876185,
|
||||
"y": 409.73272444200813,
|
||||
"width": 234.41998291015625,
|
||||
"height": 75,
|
||||
"angle": 0,
|
||||
"strokeColor": "#087f5b",
|
||||
"backgroundColor": "#82c91e",
|
||||
"fillStyle": "hachure",
|
||||
"strokeWidth": 1,
|
||||
"strokeStyle": "solid",
|
||||
"roughness": 1,
|
||||
"opacity": 100,
|
||||
"groupIds": [],
|
||||
"roundness": null,
|
||||
"seed": 583129809,
|
||||
"version": 570,
|
||||
"versionNonce": 561756721,
|
||||
"isDeleted": false,
|
||||
"boundElements": null,
|
||||
"updated": 1683370316909,
|
||||
"link": null,
|
||||
"locked": false,
|
||||
"text": "BEGIN\nINSERT (id) VALUEs (1)\nCOMMIT",
|
||||
"fontSize": 20,
|
||||
"fontFamily": 1,
|
||||
"textAlign": "left",
|
||||
"verticalAlign": "top",
|
||||
"baseline": 68,
|
||||
"containerId": null,
|
||||
"originalText": "BEGIN\nINSERT (id) VALUEs (1)\nCOMMIT",
|
||||
"lineHeight": 1.25
|
||||
},
|
||||
{
|
||||
"type": "text",
|
||||
"version": 628,
|
||||
"versionNonce": 282656095,
|
||||
"isDeleted": false,
|
||||
"id": "3m7VluAP5tair6-60b_sp",
|
||||
"fillStyle": "hachure",
|
||||
"strokeWidth": 1,
|
||||
"strokeStyle": "solid",
|
||||
"roughness": 1,
|
||||
"opacity": 100,
|
||||
"angle": 0,
|
||||
"x": 962.0903554358606,
|
||||
"y": 416.23272444200813,
|
||||
"strokeColor": "#0b7285",
|
||||
"backgroundColor": "#82c91e",
|
||||
"width": 243.91998291015625,
|
||||
"height": 100,
|
||||
"seed": 479705617,
|
||||
"groupIds": [],
|
||||
"roundness": null,
|
||||
"boundElements": [],
|
||||
"updated": 1683370316909,
|
||||
"link": null,
|
||||
"locked": false,
|
||||
"fontSize": 20,
|
||||
"fontFamily": 1,
|
||||
"text": "BEGIN\nDELETE WHERE id =1\nINSERT (id) VALUES (3)\nCOMMIT",
|
||||
"textAlign": "left",
|
||||
"verticalAlign": "top",
|
||||
"containerId": null,
|
||||
"originalText": "BEGIN\nDELETE WHERE id =1\nINSERT (id) VALUES (3)\nCOMMIT",
|
||||
"lineHeight": 1.25,
|
||||
"baseline": 93
|
||||
},
|
||||
{
|
||||
"type": "text",
|
||||
"version": 574,
|
||||
"versionNonce": 1128746001,
|
||||
"isDeleted": false,
|
||||
"id": "Z-Mh1kti2oC6sIMnuGluo",
|
||||
"fillStyle": "hachure",
|
||||
"strokeWidth": 1,
|
||||
"strokeStyle": "solid",
|
||||
"roughness": 1,
|
||||
"opacity": 100,
|
||||
"angle": 0,
|
||||
"x": 613.0903554358607,
|
||||
"y": 417.23272444200813,
|
||||
"strokeColor": "#e67700",
|
||||
"backgroundColor": "#82c91e",
|
||||
"width": 243.239990234375,
|
||||
"height": 75,
|
||||
"seed": 580440625,
|
||||
"groupIds": [],
|
||||
"roundness": null,
|
||||
"boundElements": [],
|
||||
"updated": 1683370316909,
|
||||
"link": null,
|
||||
"locked": false,
|
||||
"fontSize": 20,
|
||||
"fontFamily": 1,
|
||||
"text": "BEGIN\nINSERT (id) VALUEs (2)\nCOMMIT",
|
||||
"textAlign": "left",
|
||||
"verticalAlign": "top",
|
||||
"containerId": null,
|
||||
"originalText": "BEGIN\nINSERT (id) VALUEs (2)\nCOMMIT",
|
||||
"lineHeight": 1.25,
|
||||
"baseline": 68
|
||||
},
|
||||
{
|
||||
"type": "line",
|
||||
"version": 1502,
|
||||
"versionNonce": 1835608607,
|
||||
"isDeleted": false,
|
||||
"id": "VuJNZCgz1Y0WEWwug7pGk",
|
||||
"fillStyle": "hachure",
|
||||
"strokeWidth": 1,
|
||||
"strokeStyle": "solid",
|
||||
"roughness": 0,
|
||||
"opacity": 100,
|
||||
"angle": 0,
|
||||
"x": 226.3083636621349,
|
||||
"y": 173.11701218356845,
|
||||
"strokeColor": "#000000",
|
||||
"backgroundColor": "transparent",
|
||||
"width": 20.336010349032712,
|
||||
"height": 203.23377930246647,
|
||||
"seed": 1879839231,
|
||||
"groupIds": [],
|
||||
"roundness": null,
|
||||
"boundElements": [],
|
||||
"updated": 1683370316909,
|
||||
"link": null,
|
||||
"locked": false,
|
||||
"startBinding": null,
|
||||
"endBinding": null,
|
||||
"lastCommittedPoint": null,
|
||||
"startArrowhead": null,
|
||||
"endArrowhead": null,
|
||||
"points": [
|
||||
[
|
||||
0,
|
||||
0
|
||||
],
|
||||
[
|
||||
-20.264781987976257,
|
||||
-0.0011773927935071482
|
||||
],
|
||||
[
|
||||
-20.336010349032712,
|
||||
203.23260190967298
|
||||
],
|
||||
[
|
||||
-0.07239358683375485,
|
||||
203.135377672515
|
||||
]
|
||||
]
|
||||
},
|
||||
{
|
||||
"type": "line",
|
||||
"version": 1755,
|
||||
"versionNonce": 1487752017,
|
||||
"isDeleted": false,
|
||||
"id": "GpZg3Rw4Hszxzxf38Q4Hn",
|
||||
"fillStyle": "hachure",
|
||||
"strokeWidth": 1,
|
||||
"strokeStyle": "solid",
|
||||
"roughness": 0,
|
||||
"opacity": 100,
|
||||
"angle": 3.141592653589793,
|
||||
"x": 539.3083636621348,
|
||||
"y": 178.11701218356845,
|
||||
"strokeColor": "#000000",
|
||||
"backgroundColor": "transparent",
|
||||
"width": 20.336010349032712,
|
||||
"height": 203.23377930246647,
|
||||
"seed": 470135121,
|
||||
"groupIds": [],
|
||||
"roundness": null,
|
||||
"boundElements": [],
|
||||
"updated": 1683370316909,
|
||||
"link": null,
|
||||
"locked": false,
|
||||
"startBinding": null,
|
||||
"endBinding": null,
|
||||
"lastCommittedPoint": null,
|
||||
"startArrowhead": null,
|
||||
"endArrowhead": null,
|
||||
"points": [
|
||||
[
|
||||
0,
|
||||
0
|
||||
],
|
||||
[
|
||||
-20.264781987976257,
|
||||
-0.0011773927935071482
|
||||
],
|
||||
[
|
||||
-20.336010349032712,
|
||||
203.23260190967298
|
||||
],
|
||||
[
|
||||
-0.07239358683375485,
|
||||
203.135377672515
|
||||
]
|
||||
]
|
||||
},
|
||||
{
|
||||
"type": "text",
|
||||
"version": 528,
|
||||
"versionNonce": 1276939839,
|
||||
"isDeleted": false,
|
||||
"id": "AGEyNvBxBm2cwm1WRW8n8",
|
||||
"fillStyle": "hachure",
|
||||
"strokeWidth": 1,
|
||||
"strokeStyle": "solid",
|
||||
"roughness": 1,
|
||||
"opacity": 100,
|
||||
"angle": 0,
|
||||
"x": 576.6403584876185,
|
||||
"y": 210.23272444200816,
|
||||
"strokeColor": "#087f5b",
|
||||
"backgroundColor": "#82c91e",
|
||||
"width": 278.57781982421875,
|
||||
"height": 25,
|
||||
"seed": 877528401,
|
||||
"groupIds": [],
|
||||
"roundness": null,
|
||||
"boundElements": [],
|
||||
"updated": 1683370316909,
|
||||
"link": null,
|
||||
"locked": false,
|
||||
"fontSize": 20,
|
||||
"fontFamily": 1,
|
||||
"text": "<tx=0, id=1, begin=0, end=∞>",
|
||||
"textAlign": "left",
|
||||
"verticalAlign": "top",
|
||||
"containerId": null,
|
||||
"originalText": "<tx=0, id=1, begin=0, end=∞>",
|
||||
"lineHeight": 1.25,
|
||||
"baseline": 18
|
||||
},
|
||||
{
|
||||
"type": "line",
|
||||
"version": 1557,
|
||||
"versionNonce": 773679889,
|
||||
"isDeleted": false,
|
||||
"id": "Q8E0gAcLvq6VXqMDZhLdA",
|
||||
"fillStyle": "hachure",
|
||||
"strokeWidth": 1,
|
||||
"strokeStyle": "solid",
|
||||
"roughness": 0,
|
||||
"opacity": 100,
|
||||
"angle": 0,
|
||||
"x": 581.8083636621351,
|
||||
"y": 177.61701218356845,
|
||||
"strokeColor": "#000000",
|
||||
"backgroundColor": "transparent",
|
||||
"width": 20.336010349032712,
|
||||
"height": 203.23377930246647,
|
||||
"seed": 153279217,
|
||||
"groupIds": [],
|
||||
"roundness": null,
|
||||
"boundElements": [],
|
||||
"updated": 1683370316909,
|
||||
"link": null,
|
||||
"locked": false,
|
||||
"startBinding": null,
|
||||
"endBinding": null,
|
||||
"lastCommittedPoint": null,
|
||||
"startArrowhead": null,
|
||||
"endArrowhead": null,
|
||||
"points": [
|
||||
[
|
||||
0,
|
||||
0
|
||||
],
|
||||
[
|
||||
-20.264781987976257,
|
||||
-0.0011773927935071482
|
||||
],
|
||||
[
|
||||
-20.336010349032712,
|
||||
203.23260190967298
|
||||
],
|
||||
[
|
||||
-0.07239358683375485,
|
||||
203.135377672515
|
||||
]
|
||||
]
|
||||
},
|
||||
{
|
||||
"type": "line",
|
||||
"version": 1810,
|
||||
"versionNonce": 1561283199,
|
||||
"isDeleted": false,
|
||||
"id": "uhh3ZkPO6bwwf0-AI8syI",
|
||||
"fillStyle": "hachure",
|
||||
"strokeWidth": 1,
|
||||
"strokeStyle": "solid",
|
||||
"roughness": 0,
|
||||
"opacity": 100,
|
||||
"angle": 3.141592653589793,
|
||||
"x": 894.8083636621349,
|
||||
"y": 182.61701218356845,
|
||||
"strokeColor": "#000000",
|
||||
"backgroundColor": "transparent",
|
||||
"width": 20.336010349032712,
|
||||
"height": 203.23377930246647,
|
||||
"seed": 315380945,
|
||||
"groupIds": [],
|
||||
"roundness": null,
|
||||
"boundElements": [],
|
||||
"updated": 1683370316909,
|
||||
"link": null,
|
||||
"locked": false,
|
||||
"startBinding": null,
|
||||
"endBinding": null,
|
||||
"lastCommittedPoint": null,
|
||||
"startArrowhead": null,
|
||||
"endArrowhead": null,
|
||||
"points": [
|
||||
[
|
||||
0,
|
||||
0
|
||||
],
|
||||
[
|
||||
-20.264781987976257,
|
||||
-0.0011773927935071482
|
||||
],
|
||||
[
|
||||
-20.336010349032712,
|
||||
203.23260190967298
|
||||
],
|
||||
[
|
||||
-0.07239358683375485,
|
||||
203.135377672515
|
||||
]
|
||||
]
|
||||
},
|
||||
{
|
||||
"type": "text",
|
||||
"version": 575,
|
||||
"versionNonce": 910156017,
|
||||
"isDeleted": false,
|
||||
"id": "jI5YKyaOdGYYKiBWZmCMs",
|
||||
"fillStyle": "hachure",
|
||||
"strokeWidth": 1,
|
||||
"strokeStyle": "solid",
|
||||
"roughness": 1,
|
||||
"opacity": 100,
|
||||
"angle": 0,
|
||||
"x": 929.6403584876182,
|
||||
"y": 215.23272444200813,
|
||||
"strokeColor": "#087f5b",
|
||||
"backgroundColor": "#82c91e",
|
||||
"width": 278.57781982421875,
|
||||
"height": 25,
|
||||
"seed": 121503167,
|
||||
"groupIds": [],
|
||||
"roundness": null,
|
||||
"boundElements": [],
|
||||
"updated": 1683370316909,
|
||||
"link": null,
|
||||
"locked": false,
|
||||
"fontSize": 20,
|
||||
"fontFamily": 1,
|
||||
"text": "<tx=0, id=1, begin=0, end=∞>",
|
||||
"textAlign": "left",
|
||||
"verticalAlign": "top",
|
||||
"containerId": null,
|
||||
"originalText": "<tx=0, id=1, begin=0, end=∞>",
|
||||
"lineHeight": 1.25,
|
||||
"baseline": 18
|
||||
},
|
||||
{
|
||||
"type": "line",
|
||||
"version": 1604,
|
||||
"versionNonce": 19920575,
|
||||
"isDeleted": false,
|
||||
"id": "QqIk7VTnRWYq499wkttvv",
|
||||
"fillStyle": "hachure",
|
||||
"strokeWidth": 1,
|
||||
"strokeStyle": "solid",
|
||||
"roughness": 0,
|
||||
"opacity": 100,
|
||||
"angle": 0,
|
||||
"x": 934.8083636621348,
|
||||
"y": 182.61701218356842,
|
||||
"strokeColor": "#000000",
|
||||
"backgroundColor": "transparent",
|
||||
"width": 20.336010349032712,
|
||||
"height": 203.23377930246647,
|
||||
"seed": 2012037663,
|
||||
"groupIds": [],
|
||||
"roundness": null,
|
||||
"boundElements": [],
|
||||
"updated": 1683370316909,
|
||||
"link": null,
|
||||
"locked": false,
|
||||
"startBinding": null,
|
||||
"endBinding": null,
|
||||
"lastCommittedPoint": null,
|
||||
"startArrowhead": null,
|
||||
"endArrowhead": null,
|
||||
"points": [
|
||||
[
|
||||
0,
|
||||
0
|
||||
],
|
||||
[
|
||||
-20.264781987976257,
|
||||
-0.0011773927935071482
|
||||
],
|
||||
[
|
||||
-20.336010349032712,
|
||||
203.23260190967298
|
||||
],
|
||||
[
|
||||
-0.07239358683375485,
|
||||
203.135377672515
|
||||
]
|
||||
]
|
||||
},
|
||||
{
|
||||
"type": "line",
|
||||
"version": 1857,
|
||||
"versionNonce": 1660885169,
|
||||
"isDeleted": false,
|
||||
"id": "gk89VsYpnf9Jby9KEUBd3",
|
||||
"fillStyle": "hachure",
|
||||
"strokeWidth": 1,
|
||||
"strokeStyle": "solid",
|
||||
"roughness": 0,
|
||||
"opacity": 100,
|
||||
"angle": 3.141592653589793,
|
||||
"x": 1247.808363662135,
|
||||
"y": 187.61701218356842,
|
||||
"strokeColor": "#000000",
|
||||
"backgroundColor": "transparent",
|
||||
"width": 20.336010349032712,
|
||||
"height": 203.23377930246647,
|
||||
"seed": 509453887,
|
||||
"groupIds": [],
|
||||
"roundness": null,
|
||||
"boundElements": [],
|
||||
"updated": 1683370316909,
|
||||
"link": null,
|
||||
"locked": false,
|
||||
"startBinding": null,
|
||||
"endBinding": null,
|
||||
"lastCommittedPoint": null,
|
||||
"startArrowhead": null,
|
||||
"endArrowhead": null,
|
||||
"points": [
|
||||
[
|
||||
0,
|
||||
0
|
||||
],
|
||||
[
|
||||
-20.264781987976257,
|
||||
-0.0011773927935071482
|
||||
],
|
||||
[
|
||||
-20.336010349032712,
|
||||
203.23260190967298
|
||||
],
|
||||
[
|
||||
-0.07239358683375485,
|
||||
203.135377672515
|
||||
]
|
||||
]
|
||||
},
|
||||
{
|
||||
"type": "text",
|
||||
"version": 620,
|
||||
"versionNonce": 1588681010,
|
||||
"isDeleted": false,
|
||||
"id": "a1c-iZI0SafCiy0u4xieZ",
|
||||
"fillStyle": "hachure",
|
||||
"strokeWidth": 1,
|
||||
"strokeStyle": "solid",
|
||||
"roughness": 1,
|
||||
"opacity": 100,
|
||||
"angle": 0,
|
||||
"x": 934.3714375891809,
|
||||
"y": 261.23272444200813,
|
||||
"strokeColor": "#e67700",
|
||||
"backgroundColor": "#82c91e",
|
||||
"width": 270.71783447265625,
|
||||
"height": 25,
|
||||
"seed": 1742829553,
|
||||
"groupIds": [],
|
||||
"roundness": null,
|
||||
"boundElements": [],
|
||||
"updated": 1683371080181,
|
||||
"link": null,
|
||||
"locked": false,
|
||||
"fontSize": 20,
|
||||
"fontFamily": 1,
|
||||
"text": "<tx=1, id=2, begin=1, end=∞>",
|
||||
"textAlign": "left",
|
||||
"verticalAlign": "top",
|
||||
"containerId": null,
|
||||
"originalText": "<tx=1, id=2, begin=1, end=∞>",
|
||||
"lineHeight": 1.25,
|
||||
"baseline": 18
|
||||
},
|
||||
{
|
||||
"type": "text",
|
||||
"version": 564,
|
||||
"versionNonce": 1968863633,
|
||||
"isDeleted": false,
|
||||
"id": "hdhhgp5nA06o5EcSgHQE8",
|
||||
"fillStyle": "hachure",
|
||||
"strokeWidth": 1,
|
||||
"strokeStyle": "solid",
|
||||
"roughness": 1,
|
||||
"opacity": 100,
|
||||
"angle": 0,
|
||||
"x": 937.6203542151575,
|
||||
"y": 354.23272444200813,
|
||||
"strokeColor": "#0b7285",
|
||||
"backgroundColor": "#82c91e",
|
||||
"width": 287.73785400390625,
|
||||
"height": 25,
|
||||
"seed": 309558367,
|
||||
"groupIds": [],
|
||||
"roundness": null,
|
||||
"boundElements": [],
|
||||
"updated": 1683370363648,
|
||||
"link": null,
|
||||
"locked": false,
|
||||
"fontSize": 20,
|
||||
"fontFamily": 1,
|
||||
"text": "<tx=2, id=3, begin=2, end=∞>",
|
||||
"textAlign": "left",
|
||||
"verticalAlign": "top",
|
||||
"containerId": null,
|
||||
"originalText": "<tx=2, id=3, begin=2, end=∞>",
|
||||
"lineHeight": 1.25,
|
||||
"baseline": 18
|
||||
}
|
||||
],
|
||||
"appState": {
|
||||
"gridSize": null,
|
||||
"viewBackgroundColor": "#ffffff"
|
||||
},
|
||||
"files": {}
|
||||
}
|
||||
BIN
core/mvcc/docs/figures/transactions.png
Normal file
BIN
core/mvcc/docs/figures/transactions.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 304 KiB |
33
core/mvcc/mvcc-rs/Cargo.toml
Normal file
33
core/mvcc/mvcc-rs/Cargo.toml
Normal file
@@ -0,0 +1,33 @@
|
||||
[package]
|
||||
name = "mvcc-rs"
|
||||
version = "0.0.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.70"
|
||||
thiserror = "1.0.40"
|
||||
tracing = "0.1.37"
|
||||
serde = { version = "1.0.160", features = ["derive"] }
|
||||
serde_json = "1.0.96"
|
||||
tracing-subscriber = { version = "0", optional = true }
|
||||
base64 = "0.21.0"
|
||||
aws-sdk-s3 = "0.27.0"
|
||||
aws-config = "0.55.2"
|
||||
parking_lot = "0.12.1"
|
||||
futures = "0.3.28"
|
||||
crossbeam-skiplist = "0.1.1"
|
||||
tracing-test = "0"
|
||||
|
||||
[dev-dependencies]
|
||||
criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] }
|
||||
pprof = { version = "0.11.1", features = ["criterion", "flamegraph"] }
|
||||
tracing-subscriber = "0"
|
||||
mvcc-rs = { path = "." }
|
||||
|
||||
[[bench]]
|
||||
name = "my_benchmark"
|
||||
harness = false
|
||||
|
||||
[features]
|
||||
default = []
|
||||
c_bindings = ["dep:tracing-subscriber"]
|
||||
136
core/mvcc/mvcc-rs/benches/my_benchmark.rs
Normal file
136
core/mvcc/mvcc-rs/benches/my_benchmark.rs
Normal file
@@ -0,0 +1,136 @@
|
||||
use criterion::async_executor::FuturesExecutor;
|
||||
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
|
||||
use mvcc_rs::clock::LocalClock;
|
||||
use mvcc_rs::database::{Database, Row, RowID};
|
||||
use pprof::criterion::{Output, PProfProfiler};
|
||||
|
||||
fn bench_db() -> Database<LocalClock, String> {
|
||||
let clock = LocalClock::default();
|
||||
let storage = mvcc_rs::persistent_storage::Storage::new_noop();
|
||||
Database::new(clock, storage)
|
||||
}
|
||||
|
||||
fn bench(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("mvcc-ops-throughput");
|
||||
group.throughput(Throughput::Elements(1));
|
||||
|
||||
let db = bench_db();
|
||||
group.bench_function("begin_tx", |b| {
|
||||
b.to_async(FuturesExecutor).iter(|| async {
|
||||
db.begin_tx();
|
||||
})
|
||||
});
|
||||
|
||||
let db = bench_db();
|
||||
group.bench_function("begin_tx + rollback_tx", |b| {
|
||||
b.to_async(FuturesExecutor).iter(|| async {
|
||||
let tx_id = db.begin_tx();
|
||||
db.rollback_tx(tx_id)
|
||||
})
|
||||
});
|
||||
|
||||
let db = bench_db();
|
||||
group.bench_function("begin_tx + commit_tx", |b| {
|
||||
b.to_async(FuturesExecutor).iter(|| async {
|
||||
let tx_id = db.begin_tx();
|
||||
db.commit_tx(tx_id)
|
||||
})
|
||||
});
|
||||
|
||||
let db = bench_db();
|
||||
group.bench_function("begin_tx-read-commit_tx", |b| {
|
||||
b.to_async(FuturesExecutor).iter(|| async {
|
||||
let tx_id = db.begin_tx();
|
||||
db.read(
|
||||
tx_id,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
db.commit_tx(tx_id)
|
||||
})
|
||||
});
|
||||
|
||||
let db = bench_db();
|
||||
group.bench_function("begin_tx-update-commit_tx", |b| {
|
||||
b.to_async(FuturesExecutor).iter(|| async {
|
||||
let tx_id = db.begin_tx();
|
||||
db.update(
|
||||
tx_id,
|
||||
Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "World".to_string(),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
db.commit_tx(tx_id)
|
||||
})
|
||||
});
|
||||
|
||||
let db = bench_db();
|
||||
let tx = db.begin_tx();
|
||||
db.insert(
|
||||
tx,
|
||||
Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "Hello".to_string(),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
group.bench_function("read", |b| {
|
||||
b.to_async(FuturesExecutor).iter(|| async {
|
||||
db.read(
|
||||
tx,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
})
|
||||
});
|
||||
|
||||
let db = bench_db();
|
||||
let tx = db.begin_tx();
|
||||
db.insert(
|
||||
tx,
|
||||
Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "Hello".to_string(),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
group.bench_function("update", |b| {
|
||||
b.to_async(FuturesExecutor).iter(|| async {
|
||||
db.update(
|
||||
tx,
|
||||
Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "World".to_string(),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
criterion_group! {
|
||||
name = benches;
|
||||
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
|
||||
targets = bench
|
||||
}
|
||||
criterion_main!(benches);
|
||||
31
core/mvcc/mvcc-rs/src/clock.rs
Normal file
31
core/mvcc/mvcc-rs/src/clock.rs
Normal file
@@ -0,0 +1,31 @@
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
/// A logical clock: a source of `u64` timestamps.
pub trait LogicalClock {
    /// Returns a timestamp from the clock.
    fn get_timestamp(&self) -> u64;

    /// Resets the clock to `ts`.
    fn reset(&self, ts: u64);
}
|
||||
|
||||
/// A clock local to this node, implemented as a single atomic counter.
#[derive(Debug, Default)]
pub struct LocalClock {
    /// The counter that timestamps are drawn from; the default value is 0.
    ts_sequence: AtomicU64,
}
|
||||
|
||||
impl LocalClock {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
ts_sequence: AtomicU64::new(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LogicalClock for LocalClock {
    /// Returns the current counter value and advances the counter by one, so
    /// successive calls yield consecutive timestamps.
    fn get_timestamp(&self) -> u64 {
        self.ts_sequence.fetch_add(1, Ordering::SeqCst)
    }

    /// Overwrites the counter; the next `get_timestamp` call returns `ts`.
    fn reset(&self, ts: u64) {
        self.ts_sequence.store(ts, Ordering::SeqCst);
    }
}
|
||||
58
core/mvcc/mvcc-rs/src/cursor.rs
Normal file
58
core/mvcc/mvcc-rs/src/cursor.rs
Normal file
@@ -0,0 +1,58 @@
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::clock::LogicalClock;
|
||||
use crate::database::{Database, Result, Row, RowID};
|
||||
use std::fmt::Debug;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ScanCursor<'a, Clock: LogicalClock, T: Sync + Send + Clone + Serialize + DeserializeOwned + Debug> {
|
||||
pub db: &'a Database<Clock, T>,
|
||||
pub row_ids: Vec<RowID>,
|
||||
pub index: usize,
|
||||
tx_id: u64,
|
||||
}
|
||||
|
||||
impl<'a, Clock: LogicalClock, T: Sync + Send + Clone + Serialize + DeserializeOwned + Debug + 'static> ScanCursor<'a, Clock, T> {
|
||||
pub fn new(
|
||||
db: &'a Database<Clock, T>,
|
||||
tx_id: u64,
|
||||
table_id: u64,
|
||||
) -> Result<ScanCursor<'a, Clock, T>> {
|
||||
let row_ids = db.scan_row_ids_for_table(table_id)?;
|
||||
Ok(Self {
|
||||
db,
|
||||
tx_id,
|
||||
row_ids,
|
||||
index: 0,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn current_row_id(&self) -> Option<RowID> {
|
||||
if self.index >= self.row_ids.len() {
|
||||
return None;
|
||||
}
|
||||
Some(self.row_ids[self.index])
|
||||
}
|
||||
|
||||
pub fn current_row(&self) -> Result<Option<Row<T>>> {
|
||||
if self.index >= self.row_ids.len() {
|
||||
return Ok(None);
|
||||
}
|
||||
let id = self.row_ids[self.index];
|
||||
self.db.read(self.tx_id, id)
|
||||
}
|
||||
|
||||
pub fn close(self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn forward(&mut self) -> bool {
|
||||
self.index += 1;
|
||||
self.index < self.row_ids.len()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.index >= self.row_ids.len()
|
||||
}
|
||||
}
|
||||
863
core/mvcc/mvcc-rs/src/database/mod.rs
Normal file
863
core/mvcc/mvcc-rs/src/database/mod.rs
Normal file
@@ -0,0 +1,863 @@
|
||||
use crate::clock::LogicalClock;
|
||||
use crate::errors::DatabaseError;
|
||||
use crate::persistent_storage::Storage;
|
||||
use crossbeam_skiplist::{SkipMap, SkipSet};
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::Debug;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::RwLock;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, DatabaseError>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash)]
|
||||
pub struct RowID {
|
||||
pub table_id: u64,
|
||||
pub row_id: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, PartialOrd, Serialize, Deserialize)]
|
||||
|
||||
pub struct Row<T> {
|
||||
pub id: RowID,
|
||||
pub data: T,
|
||||
}
|
||||
|
||||
/// A row version.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct RowVersion<T> {
|
||||
begin: TxTimestampOrID,
|
||||
end: Option<TxTimestampOrID>,
|
||||
row: Row<T>,
|
||||
}
|
||||
|
||||
pub type TxID = u64;
|
||||
|
||||
/// A log record contains all the versions inserted and deleted by a transaction.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct LogRecord<T> {
|
||||
pub(crate) tx_timestamp: TxID,
|
||||
row_versions: Vec<RowVersion<T>>,
|
||||
}
|
||||
|
||||
impl<T> LogRecord<T> {
|
||||
fn new(tx_timestamp: TxID) -> Self {
|
||||
Self {
|
||||
tx_timestamp,
|
||||
row_versions: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A transaction timestamp or ID.
|
||||
///
|
||||
/// Versions either track a timestamp or a transaction ID, depending on the
|
||||
/// phase of the transaction. During the active phase, new versions track the
|
||||
/// transaction ID in the `begin` and `end` fields. After a transaction commits,
|
||||
/// versions switch to tracking timestamps.
|
||||
#[derive(Clone, Debug, PartialEq, PartialOrd, Serialize, Deserialize)]
|
||||
enum TxTimestampOrID {
|
||||
Timestamp(u64),
|
||||
TxID(TxID),
|
||||
}
|
||||
|
||||
/// Transaction
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Transaction {
|
||||
/// The state of the transaction.
|
||||
state: AtomicTransactionState,
|
||||
/// The transaction ID.
|
||||
tx_id: u64,
|
||||
/// The transaction begin timestamp.
|
||||
begin_ts: u64,
|
||||
/// The transaction write set.
|
||||
#[serde(with = "skipset_rowid")]
|
||||
write_set: SkipSet<RowID>,
|
||||
/// The transaction read set.
|
||||
#[serde(with = "skipset_rowid")]
|
||||
read_set: SkipSet<RowID>,
|
||||
}
|
||||
|
||||
mod skipset_rowid {
|
||||
use super::*;
|
||||
use serde::{de, ser, ser::SerializeSeq};
|
||||
|
||||
struct SkipSetDeserializer;
|
||||
|
||||
impl<'de> serde::de::Visitor<'de> for SkipSetDeserializer {
|
||||
type Value = SkipSet<RowID>;
|
||||
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
formatter.write_str("SkipSet<RowID> key value sequence.")
|
||||
}
|
||||
|
||||
fn visit_seq<A>(self, mut seq: A) -> std::result::Result<Self::Value, A::Error>
|
||||
where
|
||||
A: serde::de::SeqAccess<'de>,
|
||||
{
|
||||
let new_skipset = SkipSet::new();
|
||||
while let Some(elem) = seq.next_element()? {
|
||||
new_skipset.insert(elem);
|
||||
}
|
||||
|
||||
Ok(new_skipset)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serialize<S: ser::Serializer>(
|
||||
value: &SkipSet<RowID>,
|
||||
ser: S,
|
||||
) -> std::result::Result<S::Ok, S::Error> {
|
||||
let mut set = ser.serialize_seq(Some(value.len()))?;
|
||||
for v in value {
|
||||
set.serialize_element(v.value())?;
|
||||
}
|
||||
set.end()
|
||||
}
|
||||
|
||||
pub fn deserialize<'de, D: de::Deserializer<'de>>(
|
||||
de: D,
|
||||
) -> std::result::Result<SkipSet<RowID>, D::Error> {
|
||||
de.deserialize_seq(SkipSetDeserializer)
|
||||
}
|
||||
}
|
||||
|
||||
impl Transaction {
|
||||
fn new(tx_id: u64, begin_ts: u64) -> Transaction {
|
||||
Transaction {
|
||||
state: TransactionState::Active.into(),
|
||||
tx_id,
|
||||
begin_ts,
|
||||
write_set: SkipSet::new(),
|
||||
read_set: SkipSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn insert_to_read_set(&self, id: RowID) {
|
||||
self.read_set.insert(id);
|
||||
}
|
||||
|
||||
fn insert_to_write_set(&mut self, id: RowID) {
|
||||
self.write_set.insert(id);
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Transaction {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
|
||||
write!(
|
||||
f,
|
||||
"{{ state: {}, id: {}, begin_ts: {}, write_set: {:?}, read_set: {:?}",
|
||||
self.state.load(),
|
||||
self.tx_id,
|
||||
self.begin_ts,
|
||||
// FIXME: I'm sorry, we obviously shouldn't be cloning here.
|
||||
self.write_set
|
||||
.iter()
|
||||
.map(|v| *v.value())
|
||||
.collect::<Vec<RowID>>(),
|
||||
self.read_set
|
||||
.iter()
|
||||
.map(|v| *v.value())
|
||||
.collect::<Vec<RowID>>()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Transaction state.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
enum TransactionState {
|
||||
Active,
|
||||
Preparing,
|
||||
Aborted,
|
||||
Terminated,
|
||||
Committed(u64),
|
||||
}
|
||||
|
||||
impl TransactionState {
|
||||
pub fn encode(&self) -> u64 {
|
||||
match self {
|
||||
TransactionState::Active => 0,
|
||||
TransactionState::Preparing => 1,
|
||||
TransactionState::Aborted => 2,
|
||||
TransactionState::Terminated => 3,
|
||||
TransactionState::Committed(ts) => {
|
||||
// We only support 2*62 - 1 timestamps, because the extra bit
|
||||
// is used to encode the type.
|
||||
assert!(ts & 0x8000_0000_0000_0000 == 0);
|
||||
0x8000_0000_0000_0000 | ts
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn decode(v: u64) -> Self {
|
||||
match v {
|
||||
0 => TransactionState::Active,
|
||||
1 => TransactionState::Preparing,
|
||||
2 => TransactionState::Aborted,
|
||||
3 => TransactionState::Terminated,
|
||||
v if v & 0x8000_0000_0000_0000 != 0 => {
|
||||
TransactionState::Committed(v & 0x7fff_ffff_ffff_ffff)
|
||||
}
|
||||
_ => panic!("Invalid transaction state"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Transaction state encoded into a single 64-bit atomic.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub(crate) struct AtomicTransactionState {
|
||||
pub(crate) state: AtomicU64,
|
||||
}
|
||||
|
||||
impl From<TransactionState> for AtomicTransactionState {
|
||||
fn from(state: TransactionState) -> Self {
|
||||
Self {
|
||||
state: AtomicU64::new(state.encode()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<AtomicTransactionState> for TransactionState {
|
||||
fn from(state: AtomicTransactionState) -> Self {
|
||||
let encoded = state.state.load(Ordering::Acquire);
|
||||
TransactionState::decode(encoded)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::cmp::PartialEq<TransactionState> for AtomicTransactionState {
    /// Compares the currently stored state with `other`.
    fn eq(&self, other: &TransactionState) -> bool {
        self.load() == *other
    }
}
|
||||
|
||||
impl std::fmt::Display for TransactionState {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
|
||||
match self {
|
||||
TransactionState::Active => write!(f, "Active"),
|
||||
TransactionState::Preparing => write!(f, "Preparing"),
|
||||
TransactionState::Committed(ts) => write!(f, "Committed({ts})"),
|
||||
TransactionState::Aborted => write!(f, "Aborted"),
|
||||
TransactionState::Terminated => write!(f, "Terminated"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AtomicTransactionState {
|
||||
fn store(&self, state: TransactionState) {
|
||||
self.state.store(state.encode(), Ordering::Release);
|
||||
}
|
||||
|
||||
fn load(&self) -> TransactionState {
|
||||
TransactionState::decode(self.state.load(Ordering::Acquire))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Database<
|
||||
Clock: LogicalClock,
|
||||
T: Sync + Send + Clone + Serialize + Debug + DeserializeOwned,
|
||||
> {
|
||||
rows: SkipMap<RowID, RwLock<Vec<RowVersion<T>>>>,
|
||||
txs: SkipMap<TxID, RwLock<Transaction>>,
|
||||
tx_ids: AtomicU64,
|
||||
clock: Clock,
|
||||
storage: Storage,
|
||||
}
|
||||
|
||||
impl<Clock: LogicalClock, T: Sync + Send + Clone + Serialize + Debug + DeserializeOwned + 'static>
|
||||
Database<Clock, T>
|
||||
{
|
||||
/// Creates a new database.
|
||||
pub fn new(clock: Clock, storage: Storage) -> Self {
|
||||
Self {
|
||||
rows: SkipMap::new(),
|
||||
txs: SkipMap::new(),
|
||||
tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes
|
||||
clock,
|
||||
storage,
|
||||
}
|
||||
}
|
||||
|
||||
// Extracts the begin timestamp from a transaction
|
||||
fn get_begin_timestamp(&self, ts_or_id: &TxTimestampOrID) -> u64 {
|
||||
match ts_or_id {
|
||||
TxTimestampOrID::Timestamp(ts) => *ts,
|
||||
TxTimestampOrID::TxID(tx_id) => {
|
||||
self.txs
|
||||
.get(tx_id)
|
||||
.unwrap()
|
||||
.value()
|
||||
.read()
|
||||
.unwrap()
|
||||
.begin_ts
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Inserts a new row version into the database, while making sure that
|
||||
/// the row version is inserted in the correct order.
|
||||
fn insert_version(&self, id: RowID, row_version: RowVersion<T>) {
|
||||
let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new()));
|
||||
let mut versions = versions.value().write().unwrap();
|
||||
self.insert_version_raw(&mut versions, row_version)
|
||||
}
|
||||
|
||||
/// Inserts a new row version into the internal data structure for versions,
|
||||
/// while making sure that the row version is inserted in the correct order.
|
||||
fn insert_version_raw(&self, versions: &mut Vec<RowVersion<T>>, row_version: RowVersion<T>) {
|
||||
// NOTICE: this is an insert a'la insertion sort, with pessimistic linear complexity.
|
||||
// However, we expect the number of versions to be nearly sorted, so we deem it worthy
|
||||
// to search linearly for the insertion point instead of paying the price of using
|
||||
// another data structure, e.g. a BTreeSet. If it proves to be too quadratic empirically,
|
||||
// we can either switch to a tree-like structure, or at least use partition_point()
|
||||
// which performs a binary search for the insertion point.
|
||||
let position = versions
|
||||
.iter()
|
||||
.rposition(|v| {
|
||||
self.get_begin_timestamp(&v.begin) < self.get_begin_timestamp(&row_version.begin)
|
||||
})
|
||||
.map(|p| p + 1)
|
||||
.unwrap_or(0);
|
||||
if versions.len() - position > 3 {
|
||||
tracing::debug!(
|
||||
"Inserting a row version {} positions from the end",
|
||||
versions.len() - position
|
||||
);
|
||||
}
|
||||
versions.insert(position, row_version);
|
||||
}
|
||||
|
||||
/// Inserts a new row into the database.
|
||||
///
|
||||
/// This function inserts a new `row` into the database within the context
|
||||
/// of the transaction `tx_id`.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `tx_id` - the ID of the transaction in which to insert the new row.
|
||||
/// * `row` - the row object containing the values to be inserted.
|
||||
///
|
||||
pub fn insert(&self, tx_id: TxID, row: Row<T>) -> Result<()> {
|
||||
let tx = self
|
||||
.txs
|
||||
.get(&tx_id)
|
||||
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
|
||||
let mut tx = tx.value().write().unwrap();
|
||||
assert_eq!(tx.state, TransactionState::Active);
|
||||
let id = row.id;
|
||||
let row_version = RowVersion {
|
||||
begin: TxTimestampOrID::TxID(tx.tx_id),
|
||||
end: None,
|
||||
row,
|
||||
};
|
||||
tx.insert_to_write_set(id);
|
||||
drop(tx);
|
||||
self.insert_version(id, row_version);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Updates a row in the database with new values.
|
||||
///
|
||||
/// This function updates an existing row in the database within the
|
||||
/// context of the transaction `tx_id`. The `row` argument identifies the
|
||||
/// row to be updated as `id` and contains the new values to be inserted.
|
||||
///
|
||||
/// If the row identified by the `id` does not exist, this function does
|
||||
/// nothing and returns `false`. Otherwise, the function updates the row
|
||||
/// with the new values and returns `true`.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `tx_id` - the ID of the transaction in which to update the new row.
|
||||
/// * `row` - the row object containing the values to be updated.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// Returns `true` if the row was successfully updated, and `false` otherwise.
|
||||
pub fn update(&self, tx_id: TxID, row: Row<T>) -> Result<bool> {
|
||||
if !self.delete(tx_id, row.id)? {
|
||||
return Ok(false);
|
||||
}
|
||||
self.insert(tx_id, row)?;
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Inserts a row in the database with new values, previously deleting
|
||||
/// any old data if it existed. Bails on a delete error, e.g. write-write conflict.
|
||||
pub fn upsert(&self, tx_id: TxID, row: Row<T>) -> Result<()> {
|
||||
self.delete(tx_id, row.id)?;
|
||||
self.insert(tx_id, row)
|
||||
}
|
||||
|
||||
/// Deletes a row from the table with the given `id`.
|
||||
///
|
||||
/// This function deletes an existing row `id` in the database within the
|
||||
/// context of the transaction `tx_id`.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `tx_id` - the ID of the transaction in which to delete the new row.
|
||||
/// * `id` - the ID of the row to delete.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// Returns `true` if the row was successfully deleted, and `false` otherwise.
|
||||
///
|
||||
pub fn delete(&self, tx_id: TxID, id: RowID) -> Result<bool> {
|
||||
let row_versions_opt = self.rows.get(&id);
|
||||
if let Some(ref row_versions) = row_versions_opt {
|
||||
let mut row_versions = row_versions.value().write().unwrap();
|
||||
for rv in row_versions.iter_mut().rev() {
|
||||
let tx = self
|
||||
.txs
|
||||
.get(&tx_id)
|
||||
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
|
||||
let tx = tx.value().read().unwrap();
|
||||
assert_eq!(tx.state, TransactionState::Active);
|
||||
if is_write_write_conflict(&self.txs, &tx, rv) {
|
||||
drop(row_versions);
|
||||
drop(row_versions_opt);
|
||||
drop(tx);
|
||||
self.rollback_tx(tx_id);
|
||||
return Err(DatabaseError::WriteWriteConflict);
|
||||
}
|
||||
if is_version_visible(&self.txs, &tx, rv) {
|
||||
rv.end = Some(TxTimestampOrID::TxID(tx.tx_id));
|
||||
drop(row_versions);
|
||||
drop(row_versions_opt);
|
||||
drop(tx);
|
||||
let tx = self
|
||||
.txs
|
||||
.get(&tx_id)
|
||||
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
|
||||
let mut tx = tx.value().write().unwrap();
|
||||
tx.insert_to_write_set(id);
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
/// Retrieves a row from the table with the given `id`.
|
||||
///
|
||||
/// This operation is performed within the scope of the transaction identified
|
||||
/// by `tx_id`.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `tx_id` - The ID of the transaction to perform the read operation in.
|
||||
/// * `id` - The ID of the row to retrieve.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// Returns `Some(row)` with the row data if the row with the given `id` exists,
|
||||
/// and `None` otherwise.
|
||||
pub fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row<T>>> {
|
||||
let tx = self.txs.get(&tx_id).unwrap();
|
||||
let tx = tx.value().read().unwrap();
|
||||
assert_eq!(tx.state, TransactionState::Active);
|
||||
if let Some(row_versions) = self.rows.get(&id) {
|
||||
let row_versions = row_versions.value().read().unwrap();
|
||||
for rv in row_versions.iter().rev() {
|
||||
if is_version_visible(&self.txs, &tx, rv) {
|
||||
tx.insert_to_read_set(id);
|
||||
return Ok(Some(rv.row.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Gets all row ids in the database.
|
||||
pub fn scan_row_ids(&self) -> Result<Vec<RowID>> {
|
||||
let keys = self.rows.iter().map(|entry| *entry.key());
|
||||
Ok(keys.collect())
|
||||
}
|
||||
|
||||
/// Gets all row ids in the database for a given table.
|
||||
pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
|
||||
Ok(self
|
||||
.rows
|
||||
.range(
|
||||
RowID {
|
||||
table_id,
|
||||
row_id: 0,
|
||||
}..RowID {
|
||||
table_id,
|
||||
row_id: u64::MAX,
|
||||
},
|
||||
)
|
||||
.map(|entry| *entry.key())
|
||||
.collect())
|
||||
}
|
||||
|
||||
/// Begins a new transaction in the database.
|
||||
///
|
||||
/// This function starts a new transaction in the database and returns a `TxID` value
|
||||
/// that you can use to perform operations within the transaction. All changes made within the
|
||||
/// transaction are isolated from other transactions until you commit the transaction.
|
||||
pub fn begin_tx(&self) -> TxID {
|
||||
let tx_id = self.get_tx_id();
|
||||
let begin_ts = self.get_timestamp();
|
||||
let tx = Transaction::new(tx_id, begin_ts);
|
||||
tracing::trace!("BEGIN {tx}");
|
||||
self.txs.insert(tx_id, RwLock::new(tx));
|
||||
tx_id
|
||||
}
|
||||
|
||||
/// Commits a transaction with the specified transaction ID.
|
||||
///
|
||||
/// This function commits the changes made within the specified transaction and finalizes the
|
||||
/// transaction. Once a transaction has been committed, all changes made within the transaction
|
||||
/// are visible to other transactions that access the same data.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `tx_id` - The ID of the transaction to commit.
|
||||
pub fn commit_tx(&self, tx_id: TxID) -> Result<()> {
|
||||
let end_ts = self.get_timestamp();
|
||||
// NOTICE: the first shadowed tx keeps the entry alive in the map
|
||||
// for the duration of this whole function, which is important for correctness!
|
||||
let tx = self.txs.get(&tx_id).ok_or(DatabaseError::TxTerminated)?;
|
||||
let tx = tx.value().write().unwrap();
|
||||
match tx.state.load() {
|
||||
TransactionState::Terminated => return Err(DatabaseError::TxTerminated),
|
||||
_ => {
|
||||
assert_eq!(tx.state, TransactionState::Active);
|
||||
}
|
||||
}
|
||||
tx.state.store(TransactionState::Preparing);
|
||||
tracing::trace!("PREPARE {tx}");
|
||||
|
||||
/* TODO: The code we have here is sufficient for snapshot isolation.
|
||||
** In order to implement serializability, we need the following steps:
|
||||
**
|
||||
** 1. Validate if all read versions are still visible by inspecting the read_set
|
||||
** 2. Validate if there are no phantoms by walking the scans from scan_set (which we don't even have yet)
|
||||
** - a phantom is a version that became visible in the middle of our transaction,
|
||||
** but wasn't taken into account during one of the scans from the scan_set
|
||||
** 3. Wait for commit dependencies, which we don't even track yet...
|
||||
** Excerpt from what's a commit dependency and how it's tracked in the original paper:
|
||||
** """
|
||||
A transaction T1 has a commit dependency on another transaction
|
||||
T2, if T1 is allowed to commit only if T2 commits. If T2 aborts,
|
||||
T1 must also abort, so cascading aborts are possible. T1 acquires a
|
||||
commit dependency either by speculatively reading or speculatively ignoring a version,
|
||||
instead of waiting for T2 to commit.
|
||||
We implement commit dependencies by a register-and-report
|
||||
approach: T1 registers its dependency with T2 and T2 informs T1
|
||||
when it has committed or aborted. Each transaction T contains a
|
||||
counter, CommitDepCounter, that counts how many unresolved
|
||||
commit dependencies it still has. A transaction cannot commit
|
||||
until this counter is zero. In addition, T has a Boolean variable
|
||||
AbortNow that other transactions can set to tell T to abort. Each
|
||||
transaction T also has a set, CommitDepSet, that stores transaction IDs
|
||||
of the transactions that depend on T.
|
||||
To take a commit dependency on a transaction T2, T1 increments
|
||||
its CommitDepCounter and adds its transaction ID to T2’s CommitDepSet.
|
||||
When T2 has committed, it locates each transaction in
|
||||
its CommitDepSet and decrements their CommitDepCounter. If
|
||||
T2 aborted, it tells the dependent transactions to also abort by
|
||||
setting their AbortNow flags. If a dependent transaction is not
|
||||
found, this means that it has already aborted.
|
||||
Note that a transaction with commit dependencies may not have to
|
||||
wait at all - the dependencies may have been resolved before it is
|
||||
ready to commit. Commit dependencies consolidate all waits into
|
||||
a single wait and postpone the wait to just before commit.
|
||||
Some transactions may have to wait before commit.
|
||||
Waiting raises a concern of deadlocks.
|
||||
However, deadlocks cannot occur because an older transaction never
|
||||
waits on a younger transaction. In
|
||||
a wait-for graph the direction of edges would always be from a
|
||||
younger transaction (higher end timestamp) to an older transaction
|
||||
(lower end timestamp) so cycles are impossible.
|
||||
"""
|
||||
** If you're wondering when a speculative read happens, here you go:
|
||||
** Case 1: speculative read of TB:
|
||||
"""
|
||||
If transaction TB is in the Preparing state, it has acquired an end
|
||||
timestamp TS which will be V’s begin timestamp if TB commits.
|
||||
A safe approach in this situation would be to have transaction T
|
||||
wait until transaction TB commits. However, we want to avoid all
|
||||
blocking during normal processing so instead we continue with
|
||||
the visibility test and, if the test returns true, allow T to
|
||||
speculatively read V. Transaction T acquires a commit dependency on
|
||||
TB, restricting the serialization order of the two transactions. That
|
||||
is, T is allowed to commit only if TB commits.
|
||||
"""
|
||||
** Case 2: speculative ignore of TE:
|
||||
"""
|
||||
If TE’s state is Preparing, it has an end timestamp TS that will become
|
||||
the end timestamp of V if TE does commit. If TS is greater than the read
|
||||
time RT, it is obvious that V will be visible if TE commits. If TE
|
||||
aborts, V will still be visible, because any transaction that updates
|
||||
V after TE has aborted will obtain an end timestamp greater than
|
||||
TS. If TS is less than RT, we have a more complicated situation:
|
||||
if TE commits, V will not be visible to T but if TE aborts, it will
|
||||
be visible. We could handle this by forcing T to wait until TE
|
||||
commits or aborts but we want to avoid all blocking during normal processing.
|
||||
Instead we allow T to speculatively ignore V and
|
||||
proceed with its processing. Transaction T acquires a commit
|
||||
dependency (see Section 2.7) on TE, that is, T is allowed to commit
|
||||
only if TE commits.
|
||||
"""
|
||||
*/
|
||||
tx.state.store(TransactionState::Committed(end_ts));
|
||||
tracing::trace!("COMMIT {tx}");
|
||||
let tx_begin_ts = tx.begin_ts;
|
||||
let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
|
||||
drop(tx);
|
||||
// Postprocessing: inserting row versions and logging the transaction to persistent storage.
|
||||
// TODO: we should probably save to persistent storage first, and only then update the in-memory structures.
|
||||
let mut log_record: LogRecord<T> = LogRecord::new(end_ts);
|
||||
for ref id in write_set {
|
||||
if let Some(row_versions) = self.rows.get(id) {
|
||||
let mut row_versions = row_versions.value().write().unwrap();
|
||||
for row_version in row_versions.iter_mut() {
|
||||
if let TxTimestampOrID::TxID(id) = row_version.begin {
|
||||
if id == tx_id {
|
||||
row_version.begin = TxTimestampOrID::Timestamp(tx_begin_ts);
|
||||
self.insert_version_raw(
|
||||
&mut log_record.row_versions,
|
||||
row_version.clone(),
|
||||
); // FIXME: optimize cloning out
|
||||
}
|
||||
}
|
||||
if let Some(TxTimestampOrID::TxID(id)) = row_version.end {
|
||||
if id == tx_id {
|
||||
row_version.end = Some(TxTimestampOrID::Timestamp(end_ts));
|
||||
self.insert_version_raw(
|
||||
&mut log_record.row_versions,
|
||||
row_version.clone(),
|
||||
); // FIXME: optimize cloning out
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::trace!("UPDATED TX{tx_id}");
|
||||
// We have now updated all the versions with a reference to the
|
||||
// transaction ID to a timestamp and can, therefore, remove the
|
||||
// transaction. Please note that when we move to lockless, the
|
||||
// invariant doesn't necessarily hold anymore because another thread
|
||||
// might have speculatively read a version that we want to remove.
|
||||
// But that's a problem for another day.
|
||||
// FIXME: it actually just become a problem for today!!!
|
||||
// TODO: test that reproduces this failure, and then a fix
|
||||
self.txs.remove(&tx_id);
|
||||
if !log_record.row_versions.is_empty() {
|
||||
self.storage.log_tx(log_record)?;
|
||||
}
|
||||
tracing::trace!("LOGGED {tx_id}");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Rolls back a transaction with the specified ID.
|
||||
///
|
||||
/// This function rolls back a transaction with the specified `tx_id` by
|
||||
/// discarding any changes made by the transaction.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `tx_id` - The ID of the transaction to abort.
|
||||
pub fn rollback_tx(&self, tx_id: TxID) {
|
||||
let tx_unlocked = self.txs.get(&tx_id).unwrap();
|
||||
let tx = tx_unlocked.value().write().unwrap();
|
||||
assert_eq!(tx.state, TransactionState::Active);
|
||||
tx.state.store(TransactionState::Aborted);
|
||||
tracing::trace!("ABORT {tx}");
|
||||
let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
|
||||
drop(tx);
|
||||
|
||||
for ref id in write_set {
|
||||
if let Some(row_versions) = self.rows.get(id) {
|
||||
let mut row_versions = row_versions.value().write().unwrap();
|
||||
row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id));
|
||||
if row_versions.is_empty() {
|
||||
self.rows.remove(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let tx = tx_unlocked.value().write().unwrap();
|
||||
tx.state.store(TransactionState::Terminated);
|
||||
tracing::trace!("TERMINATE {tx}");
|
||||
// FIXME: verify that we can already remove the transaction here!
|
||||
// Maybe it's fine for snapshot isolation, but too early for serializable?
|
||||
self.txs.remove(&tx_id);
|
||||
}
|
||||
|
||||
/// Generates next unique transaction id
|
||||
pub fn get_tx_id(&self) -> u64 {
|
||||
self.tx_ids.fetch_add(1, Ordering::SeqCst)
|
||||
}
|
||||
|
||||
/// Gets current timestamp
|
||||
pub fn get_timestamp(&self) -> u64 {
|
||||
self.clock.get_timestamp()
|
||||
}
|
||||
|
||||
/// Removes unused row versions with very loose heuristics,
|
||||
/// which sometimes leaves versions intact for too long.
|
||||
/// Returns the number of removed versions.
|
||||
pub fn drop_unused_row_versions(&self) -> usize {
|
||||
tracing::trace!(
|
||||
"Dropping unused row versions. Database stats: transactions: {}; rows: {}",
|
||||
self.txs.len(),
|
||||
self.rows.len()
|
||||
);
|
||||
let mut dropped = 0;
|
||||
let mut to_remove = Vec::new();
|
||||
for entry in self.rows.iter() {
|
||||
let mut row_versions = entry.value().write().unwrap();
|
||||
row_versions.retain(|rv| {
|
||||
// FIXME: should take rv.begin into account as well
|
||||
let should_stay = match rv.end {
|
||||
Some(TxTimestampOrID::Timestamp(version_end_ts)) => {
|
||||
// a transaction started before this row version ended, ergo row version is needed
|
||||
// NOTICE: O(row_versions x transactions), but also lock-free, so sounds acceptable
|
||||
self.txs.iter().any(|tx| {
|
||||
let tx = tx.value().read().unwrap();
|
||||
// FIXME: verify!
|
||||
match tx.state.load() {
|
||||
TransactionState::Active | TransactionState::Preparing => {
|
||||
version_end_ts > tx.begin_ts
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
})
|
||||
}
|
||||
// Let's skip potentially complex logic if the transafction is still
|
||||
// active/tracked. We will drop the row version when the transaction
|
||||
// gets garbage-collected itself, it will always happen eventually.
|
||||
Some(TxTimestampOrID::TxID(tx_id)) => !self.txs.contains_key(&tx_id),
|
||||
// this row version is current, ergo visible
|
||||
None => true,
|
||||
};
|
||||
if !should_stay {
|
||||
dropped += 1;
|
||||
tracing::trace!(
|
||||
"Dropping row version {:?} {:?}-{:?}",
|
||||
entry.key(),
|
||||
rv.begin,
|
||||
rv.end
|
||||
);
|
||||
}
|
||||
should_stay
|
||||
});
|
||||
if row_versions.is_empty() {
|
||||
to_remove.push(*entry.key());
|
||||
}
|
||||
}
|
||||
for id in to_remove {
|
||||
self.rows.remove(&id);
|
||||
}
|
||||
dropped
|
||||
}
|
||||
|
||||
pub fn recover(&self) -> Result<()> {
|
||||
let tx_log = self.storage.read_tx_log()?;
|
||||
for record in tx_log {
|
||||
tracing::debug!("RECOVERING {:?}", record);
|
||||
for version in record.row_versions {
|
||||
self.insert_version(version.row.id, version);
|
||||
}
|
||||
self.clock.reset(record.tx_timestamp);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A write-write conflict happens when transaction T_m attempts to update a
|
||||
/// row version that is currently being updated by an active transaction T_n.
|
||||
pub(crate) fn is_write_write_conflict<T>(
|
||||
txs: &SkipMap<TxID, RwLock<Transaction>>,
|
||||
tx: &Transaction,
|
||||
rv: &RowVersion<T>,
|
||||
) -> bool {
|
||||
match rv.end {
|
||||
Some(TxTimestampOrID::TxID(rv_end)) => {
|
||||
let te = txs.get(&rv_end).unwrap();
|
||||
let te = te.value().read().unwrap();
|
||||
match te.state.load() {
|
||||
TransactionState::Active | TransactionState::Preparing => tx.tx_id != te.tx_id,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
Some(TxTimestampOrID::Timestamp(_)) => false,
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn is_version_visible<T>(
|
||||
txs: &SkipMap<TxID, RwLock<Transaction>>,
|
||||
tx: &Transaction,
|
||||
rv: &RowVersion<T>,
|
||||
) -> bool {
|
||||
is_begin_visible(txs, tx, rv) && is_end_visible(txs, tx, rv)
|
||||
}
|
||||
|
||||
fn is_begin_visible<T>(
|
||||
txs: &SkipMap<TxID, RwLock<Transaction>>,
|
||||
tx: &Transaction,
|
||||
rv: &RowVersion<T>,
|
||||
) -> bool {
|
||||
match rv.begin {
|
||||
TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts,
|
||||
TxTimestampOrID::TxID(rv_begin) => {
|
||||
let tb = txs.get(&rv_begin).unwrap();
|
||||
let tb = tb.value().read().unwrap();
|
||||
let visible = match tb.state.load() {
|
||||
TransactionState::Active => tx.tx_id == tb.tx_id && rv.end.is_none(),
|
||||
TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable!
|
||||
TransactionState::Committed(committed_ts) => tx.begin_ts >= committed_ts,
|
||||
TransactionState::Aborted => false,
|
||||
TransactionState::Terminated => {
|
||||
tracing::debug!("TODO: should reread rv's end field - it should have updated the timestamp in the row version by now");
|
||||
false
|
||||
}
|
||||
};
|
||||
tracing::trace!(
|
||||
"is_begin_visible: tx={tx}, tb={tb} rv = {:?}-{:?} visible = {visible}",
|
||||
rv.begin,
|
||||
rv.end
|
||||
);
|
||||
visible
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn is_end_visible<T>(
|
||||
txs: &SkipMap<TxID, RwLock<Transaction>>,
|
||||
tx: &Transaction,
|
||||
rv: &RowVersion<T>,
|
||||
) -> bool {
|
||||
match rv.end {
|
||||
Some(TxTimestampOrID::Timestamp(rv_end_ts)) => tx.begin_ts < rv_end_ts,
|
||||
Some(TxTimestampOrID::TxID(rv_end)) => {
|
||||
let te = txs.get(&rv_end).unwrap();
|
||||
let te = te.value().read().unwrap();
|
||||
let visible = match te.state.load() {
|
||||
TransactionState::Active => tx.tx_id != te.tx_id,
|
||||
TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable!
|
||||
TransactionState::Committed(committed_ts) => tx.begin_ts < committed_ts,
|
||||
TransactionState::Aborted => false,
|
||||
TransactionState::Terminated => {
|
||||
tracing::debug!("TODO: should reread rv's end field - it should have updated the timestamp in the row version by now");
|
||||
false
|
||||
}
|
||||
};
|
||||
tracing::trace!(
|
||||
"is_end_visible: tx={tx}, te={te} rv = {:?}-{:?} visible = {visible}",
|
||||
rv.begin,
|
||||
rv.end
|
||||
);
|
||||
visible
|
||||
}
|
||||
None => true,
|
||||
}
|
||||
}
|
||||
933
core/mvcc/mvcc-rs/src/database/tests.rs
Normal file
933
core/mvcc/mvcc-rs/src/database/tests.rs
Normal file
@@ -0,0 +1,933 @@
|
||||
use super::*;
|
||||
use crate::clock::LocalClock;
|
||||
use tracing_test::traced_test;
|
||||
|
||||
fn test_db() -> Database<LocalClock, String> {
|
||||
let clock = LocalClock::new();
|
||||
let storage = crate::persistent_storage::Storage::new_noop();
|
||||
Database::new(clock, storage)
|
||||
}
|
||||
|
||||
#[traced_test]
|
||||
#[test]
|
||||
fn test_insert_read() {
|
||||
let db = test_db();
|
||||
|
||||
let tx1 = db.begin_tx();
|
||||
let tx1_row = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "Hello".to_string(),
|
||||
};
|
||||
db.insert(tx1, tx1_row.clone()).unwrap();
|
||||
let row = db
|
||||
.read(
|
||||
tx1,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(tx1_row, row);
|
||||
db.commit_tx(tx1).unwrap();
|
||||
|
||||
let tx2 = db.begin_tx();
|
||||
let row = db
|
||||
.read(
|
||||
tx2,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(tx1_row, row);
|
||||
}
|
||||
|
||||
#[traced_test]
|
||||
#[test]
|
||||
fn test_read_nonexistent() {
|
||||
let db = test_db();
|
||||
let tx = db.begin_tx();
|
||||
let row = db.read(
|
||||
tx,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
);
|
||||
assert!(row.unwrap().is_none());
|
||||
}
|
||||
|
||||
#[traced_test]
|
||||
#[test]
|
||||
fn test_delete() {
|
||||
let db = test_db();
|
||||
|
||||
let tx1 = db.begin_tx();
|
||||
let tx1_row = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "Hello".to_string(),
|
||||
};
|
||||
db.insert(tx1, tx1_row.clone()).unwrap();
|
||||
let row = db
|
||||
.read(
|
||||
tx1,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(tx1_row, row);
|
||||
db.delete(
|
||||
tx1,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
let row = db
|
||||
.read(
|
||||
tx1,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
assert!(row.is_none());
|
||||
db.commit_tx(tx1).unwrap();
|
||||
|
||||
let tx2 = db.begin_tx();
|
||||
let row = db
|
||||
.read(
|
||||
tx2,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
assert!(row.is_none());
|
||||
}
|
||||
|
||||
#[traced_test]
|
||||
#[test]
|
||||
fn test_delete_nonexistent() {
|
||||
let db = test_db();
|
||||
let tx = db.begin_tx();
|
||||
assert!(!db
|
||||
.delete(
|
||||
tx,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1
|
||||
}
|
||||
)
|
||||
.unwrap());
|
||||
}
|
||||
|
||||
#[traced_test]
|
||||
#[test]
|
||||
fn test_commit() {
|
||||
let db = test_db();
|
||||
let tx1 = db.begin_tx();
|
||||
let tx1_row = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "Hello".to_string(),
|
||||
};
|
||||
db.insert(tx1, tx1_row.clone()).unwrap();
|
||||
let row = db
|
||||
.read(
|
||||
tx1,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(tx1_row, row);
|
||||
let tx1_updated_row = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "World".to_string(),
|
||||
};
|
||||
db.update(tx1, tx1_updated_row.clone()).unwrap();
|
||||
let row = db
|
||||
.read(
|
||||
tx1,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(tx1_updated_row, row);
|
||||
db.commit_tx(tx1).unwrap();
|
||||
|
||||
let tx2 = db.begin_tx();
|
||||
let row = db
|
||||
.read(
|
||||
tx2,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
db.commit_tx(tx2).unwrap();
|
||||
assert_eq!(tx1_updated_row, row);
|
||||
db.drop_unused_row_versions();
|
||||
}
|
||||
|
||||
#[traced_test]
|
||||
#[test]
|
||||
fn test_rollback() {
|
||||
let db = test_db();
|
||||
let tx1 = db.begin_tx();
|
||||
let row1 = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "Hello".to_string(),
|
||||
};
|
||||
db.insert(tx1, row1.clone()).unwrap();
|
||||
let row2 = db
|
||||
.read(
|
||||
tx1,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(row1, row2);
|
||||
let row3 = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "World".to_string(),
|
||||
};
|
||||
db.update(tx1, row3.clone()).unwrap();
|
||||
let row4 = db
|
||||
.read(
|
||||
tx1,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(row3, row4);
|
||||
db.rollback_tx(tx1);
|
||||
let tx2 = db.begin_tx();
|
||||
let row5 = db
|
||||
.read(
|
||||
tx2,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(row5, None);
|
||||
}
|
||||
|
||||
#[traced_test]
|
||||
#[test]
|
||||
fn test_dirty_write() {
|
||||
let db = test_db();
|
||||
|
||||
// T1 inserts a row with ID 1, but does not commit.
|
||||
let tx1 = db.begin_tx();
|
||||
let tx1_row = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "Hello".to_string(),
|
||||
};
|
||||
db.insert(tx1, tx1_row.clone()).unwrap();
|
||||
let row = db
|
||||
.read(
|
||||
tx1,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(tx1_row, row);
|
||||
|
||||
// T2 attempts to delete row with ID 1, but fails because T1 has not committed.
|
||||
let tx2 = db.begin_tx();
|
||||
let tx2_row = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "World".to_string(),
|
||||
};
|
||||
assert!(!db.update(tx2, tx2_row).unwrap());
|
||||
|
||||
let row = db
|
||||
.read(
|
||||
tx1,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(tx1_row, row);
|
||||
}
|
||||
|
||||
#[traced_test]
|
||||
#[test]
|
||||
fn test_dirty_read() {
|
||||
let db = test_db();
|
||||
|
||||
// T1 inserts a row with ID 1, but does not commit.
|
||||
let tx1 = db.begin_tx();
|
||||
let row1 = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "Hello".to_string(),
|
||||
};
|
||||
db.insert(tx1, row1).unwrap();
|
||||
|
||||
// T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed.
|
||||
let tx2 = db.begin_tx();
|
||||
let row2 = db
|
||||
.read(
|
||||
tx2,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(row2, None);
|
||||
}
|
||||
|
||||
#[traced_test]
|
||||
#[test]
|
||||
fn test_dirty_read_deleted() {
|
||||
let db = test_db();
|
||||
|
||||
// T1 inserts a row with ID 1 and commits.
|
||||
let tx1 = db.begin_tx();
|
||||
let tx1_row = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "Hello".to_string(),
|
||||
};
|
||||
db.insert(tx1, tx1_row.clone()).unwrap();
|
||||
db.commit_tx(tx1).unwrap();
|
||||
|
||||
// T2 deletes row with ID 1, but does not commit.
|
||||
let tx2 = db.begin_tx();
|
||||
assert!(db
|
||||
.delete(
|
||||
tx2,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1
|
||||
}
|
||||
)
|
||||
.unwrap());
|
||||
|
||||
// T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed.
|
||||
let tx3 = db.begin_tx();
|
||||
let row = db
|
||||
.read(
|
||||
tx3,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(tx1_row, row);
|
||||
}
|
||||
|
||||
#[traced_test]
|
||||
#[test]
|
||||
fn test_fuzzy_read() {
|
||||
let db = test_db();
|
||||
|
||||
// T1 inserts a row with ID 1 and commits.
|
||||
let tx1 = db.begin_tx();
|
||||
let tx1_row = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "Hello".to_string(),
|
||||
};
|
||||
db.insert(tx1, tx1_row.clone()).unwrap();
|
||||
let row = db
|
||||
.read(
|
||||
tx1,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(tx1_row, row);
|
||||
db.commit_tx(tx1).unwrap();
|
||||
|
||||
// T2 reads the row with ID 1 within an active transaction.
|
||||
let tx2 = db.begin_tx();
|
||||
let row = db
|
||||
.read(
|
||||
tx2,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(tx1_row, row);
|
||||
|
||||
// T3 updates the row and commits.
|
||||
let tx3 = db.begin_tx();
|
||||
let tx3_row = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "World".to_string(),
|
||||
};
|
||||
db.update(tx3, tx3_row).unwrap();
|
||||
db.commit_tx(tx3).unwrap();
|
||||
|
||||
// T2 still reads the same version of the row as before.
|
||||
let row = db
|
||||
.read(
|
||||
tx2,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(tx1_row, row);
|
||||
}
|
||||
|
||||
#[traced_test]
|
||||
#[test]
|
||||
fn test_lost_update() {
|
||||
let db = test_db();
|
||||
|
||||
// T1 inserts a row with ID 1 and commits.
|
||||
let tx1 = db.begin_tx();
|
||||
let tx1_row = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "Hello".to_string(),
|
||||
};
|
||||
db.insert(tx1, tx1_row.clone()).unwrap();
|
||||
let row = db
|
||||
.read(
|
||||
tx1,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(tx1_row, row);
|
||||
db.commit_tx(tx1).unwrap();
|
||||
|
||||
// T2 attempts to update row ID 1 within an active transaction.
|
||||
let tx2 = db.begin_tx();
|
||||
let tx2_row = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "World".to_string(),
|
||||
};
|
||||
assert!(db.update(tx2, tx2_row.clone()).unwrap());
|
||||
|
||||
// T3 also attempts to update row ID 1 within an active transaction.
|
||||
let tx3 = db.begin_tx();
|
||||
let tx3_row = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "Hello, world!".to_string(),
|
||||
};
|
||||
assert_eq!(
|
||||
Err(DatabaseError::WriteWriteConflict),
|
||||
db.update(tx3, tx3_row)
|
||||
);
|
||||
|
||||
db.commit_tx(tx2).unwrap();
|
||||
assert_eq!(Err(DatabaseError::TxTerminated), db.commit_tx(tx3));
|
||||
|
||||
let tx4 = db.begin_tx();
|
||||
let row = db
|
||||
.read(
|
||||
tx4,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(tx2_row, row);
|
||||
}
|
||||
|
||||
// Test for the visibility to check if a new transaction can see old committed values.
|
||||
// This test checks for the typo present in the paper, explained in https://github.com/penberg/mvcc-rs/issues/15
|
||||
#[traced_test]
|
||||
#[test]
|
||||
fn test_committed_visibility() {
|
||||
let db = test_db();
|
||||
|
||||
// let's add $10 to my account since I like money
|
||||
let tx1 = db.begin_tx();
|
||||
let tx1_row = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "10".to_string(),
|
||||
};
|
||||
db.insert(tx1, tx1_row.clone()).unwrap();
|
||||
db.commit_tx(tx1).unwrap();
|
||||
|
||||
// but I like more money, so let me try adding $10 more
|
||||
let tx2 = db.begin_tx();
|
||||
let tx2_row = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "20".to_string(),
|
||||
};
|
||||
assert!(db.update(tx2, tx2_row.clone()).unwrap());
|
||||
let row = db
|
||||
.read(
|
||||
tx2,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(row, tx2_row);
|
||||
|
||||
// can I check how much money I have?
|
||||
let tx3 = db.begin_tx();
|
||||
let row = db
|
||||
.read(
|
||||
tx3,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(tx1_row, row);
|
||||
}
|
||||
|
||||
// Test to check if a older transaction can see (un)committed future rows
|
||||
#[traced_test]
|
||||
#[test]
|
||||
fn test_future_row() {
|
||||
let db = test_db();
|
||||
|
||||
let tx1 = db.begin_tx();
|
||||
|
||||
let tx2 = db.begin_tx();
|
||||
let tx2_row = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "10".to_string(),
|
||||
};
|
||||
db.insert(tx2, tx2_row).unwrap();
|
||||
|
||||
// transaction in progress, so tx1 shouldn't be able to see the value
|
||||
let row = db
|
||||
.read(
|
||||
tx1,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(row, None);
|
||||
|
||||
// lets commit the transaction and check if tx1 can see it
|
||||
db.commit_tx(tx2).unwrap();
|
||||
let row = db
|
||||
.read(
|
||||
tx1,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(row, None);
|
||||
}
|
||||
|
||||
#[traced_test]
|
||||
#[test]
|
||||
fn test_storage1() {
|
||||
let clock = LocalClock::new();
|
||||
let mut path = std::env::temp_dir();
|
||||
path.push(format!(
|
||||
"mvcc-rs-storage-test-{}",
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_nanos(),
|
||||
));
|
||||
let storage = crate::persistent_storage::Storage::new_json_on_disk(path.clone());
|
||||
let db = Database::new(clock, storage);
|
||||
|
||||
let tx1 = db.begin_tx();
|
||||
let tx2 = db.begin_tx();
|
||||
let tx3 = db.begin_tx();
|
||||
|
||||
db.insert(
|
||||
tx3,
|
||||
Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "testme".to_string(),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
db.commit_tx(tx1).unwrap();
|
||||
db.rollback_tx(tx2);
|
||||
db.commit_tx(tx3).unwrap();
|
||||
|
||||
let tx4 = db.begin_tx();
|
||||
db.insert(
|
||||
tx4,
|
||||
Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 2,
|
||||
},
|
||||
data: "testme2".to_string(),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
db.insert(
|
||||
tx4,
|
||||
Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 3,
|
||||
},
|
||||
data: "testme3".to_string(),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
db.read(
|
||||
tx4,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1
|
||||
}
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.data,
|
||||
"testme"
|
||||
);
|
||||
assert_eq!(
|
||||
db.read(
|
||||
tx4,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 2
|
||||
}
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.data,
|
||||
"testme2"
|
||||
);
|
||||
assert_eq!(
|
||||
db.read(
|
||||
tx4,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 3
|
||||
}
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.data,
|
||||
"testme3"
|
||||
);
|
||||
db.commit_tx(tx4).unwrap();
|
||||
|
||||
let clock = LocalClock::new();
|
||||
let storage = crate::persistent_storage::Storage::new_json_on_disk(path);
|
||||
let db: Database<LocalClock, String> = Database::new(clock, storage);
|
||||
db.recover().unwrap();
|
||||
println!("{:#?}", db);
|
||||
|
||||
let tx5 = db.begin_tx();
|
||||
println!(
|
||||
"{:#?}",
|
||||
db.read(
|
||||
tx5,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1
|
||||
}
|
||||
)
|
||||
);
|
||||
assert_eq!(
|
||||
db.read(
|
||||
tx5,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 1
|
||||
}
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.data,
|
||||
"testme"
|
||||
);
|
||||
assert_eq!(
|
||||
db.read(
|
||||
tx5,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 2
|
||||
}
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.data,
|
||||
"testme2"
|
||||
);
|
||||
assert_eq!(
|
||||
db.read(
|
||||
tx5,
|
||||
RowID {
|
||||
table_id: 1,
|
||||
row_id: 3
|
||||
}
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.data,
|
||||
"testme3"
|
||||
);
|
||||
}
|
||||
|
||||
/* States described in the Hekaton paper *for serializability*:
|
||||
|
||||
Table 1: Case analysis of action to take when version V’s
|
||||
Begin field contains the ID of transaction TB
|
||||
------------------------------------------------------------------------------------------------------
|
||||
TB’s state | TB’s end timestamp | Action to take when transaction T checks visibility of version V.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Active | Not set | V is visible only if TB=T and V’s end timestamp equals infinity.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Preparing | TS | V’s begin timestamp will be TS but V is not yet committed. Use TS
| as V’s begin time when testing visibility. If the test is true,
| allow T to speculatively read V.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Committed | TS | V’s begin timestamp will be TS and V is committed. Use TS as V’s
|
||||
| begin time to test visibility.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Aborted | Irrelevant | Ignore V; it’s a garbage version.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Terminated | Irrelevant | Reread V’s Begin field. TB has terminated so it must have finalized
|
||||
or not found | | the timestamp.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
|
||||
Table 2: Case analysis of action to take when V's End field
|
||||
contains a transaction ID TE.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
TE’s state | TE’s end timestamp | Action to take when transaction T checks visibility of a version V
|
||||
| | as of read time RT.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Active | Not set | V is visible only if TE is not T.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Preparing | TS | V’s end timestamp will be TS provided that TE commits. If TS > RT,
|
||||
| V is visible to T. If TS < RT, T speculatively ignores V.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Committed | TS | V’s end timestamp will be TS and V is committed. Use TS as V’s end
|
||||
| timestamp when testing visibility.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Aborted | Irrelevant | V is visible.
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Terminated | Irrelevant | Reread V’s End field. TE has terminated so it must have finalized
|
||||
or not found | | the timestamp.
|
||||
*/
|
||||
|
||||
fn new_tx(tx_id: TxID, begin_ts: u64, state: TransactionState) -> RwLock<Transaction> {
|
||||
let state = state.into();
|
||||
RwLock::new(Transaction {
|
||||
state,
|
||||
tx_id,
|
||||
begin_ts,
|
||||
write_set: SkipSet::new(),
|
||||
read_set: SkipSet::new(),
|
||||
})
|
||||
}
|
||||
|
||||
#[traced_test]
|
||||
#[test]
|
||||
fn test_snapshot_isolation_tx_visible1() {
|
||||
let txs: SkipMap<TxID, RwLock<Transaction>> = SkipMap::from_iter([
|
||||
(1, new_tx(1, 1, TransactionState::Committed(2))),
|
||||
(2, new_tx(2, 2, TransactionState::Committed(5))),
|
||||
(3, new_tx(3, 3, TransactionState::Aborted)),
|
||||
(5, new_tx(5, 5, TransactionState::Preparing)),
|
||||
(6, new_tx(6, 6, TransactionState::Committed(10))),
|
||||
(7, new_tx(7, 7, TransactionState::Active)),
|
||||
]);
|
||||
|
||||
let current_tx = new_tx(4, 4, TransactionState::Preparing);
|
||||
let current_tx = current_tx.read().unwrap();
|
||||
|
||||
let rv_visible = |begin: TxTimestampOrID, end: Option<TxTimestampOrID>| {
|
||||
let row_version = RowVersion {
|
||||
begin,
|
||||
end,
|
||||
row: Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "testme".to_string(),
|
||||
},
|
||||
};
|
||||
tracing::debug!("Testing visibility of {row_version:?}");
|
||||
is_version_visible(&txs, ¤t_tx, &row_version)
|
||||
};
|
||||
|
||||
// begin visible: transaction committed with ts < current_tx.begin_ts
|
||||
// end visible: inf
|
||||
assert!(rv_visible(TxTimestampOrID::TxID(1), None));
|
||||
|
||||
// begin invisible: transaction committed with ts > current_tx.begin_ts
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(2), None));
|
||||
|
||||
// begin invisible: transaction aborted
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(3), None));
|
||||
|
||||
// begin visible: timestamp < current_tx.begin_ts
|
||||
// end invisible: transaction committed with ts > current_tx.begin_ts
|
||||
assert!(!rv_visible(
|
||||
TxTimestampOrID::Timestamp(0),
|
||||
Some(TxTimestampOrID::TxID(1))
|
||||
));
|
||||
|
||||
// begin visible: timestamp < current_tx.begin_ts
|
||||
// end visible: transaction committed with ts < current_tx.begin_ts
|
||||
assert!(rv_visible(
|
||||
TxTimestampOrID::Timestamp(0),
|
||||
Some(TxTimestampOrID::TxID(2))
|
||||
));
|
||||
|
||||
// begin visible: timestamp < current_tx.begin_ts
|
||||
// end invisible: transaction aborted
|
||||
assert!(!rv_visible(
|
||||
TxTimestampOrID::Timestamp(0),
|
||||
Some(TxTimestampOrID::TxID(3))
|
||||
));
|
||||
|
||||
// begin invisible: transaction preparing
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(5), None));
|
||||
|
||||
// begin invisible: transaction committed with ts > current_tx.begin_ts
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(6), None));
|
||||
|
||||
// begin invisible: transaction active
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(7), None));
|
||||
|
||||
// begin invisible: transaction committed with ts > current_tx.begin_ts
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(6), None));
|
||||
|
||||
// begin invisible: transaction active
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(7), None));
|
||||
|
||||
// begin visible: timestamp < current_tx.begin_ts
|
||||
// end invisible: transaction preparing
|
||||
assert!(!rv_visible(
|
||||
TxTimestampOrID::Timestamp(0),
|
||||
Some(TxTimestampOrID::TxID(5))
|
||||
));
|
||||
|
||||
// begin invisible: timestamp > current_tx.begin_ts
|
||||
assert!(!rv_visible(
|
||||
TxTimestampOrID::Timestamp(6),
|
||||
Some(TxTimestampOrID::TxID(6))
|
||||
));
|
||||
|
||||
// begin visible: timestamp < current_tx.begin_ts
|
||||
// end visible: some active transaction will eventually overwrite this version,
|
||||
// but that hasn't happened
|
||||
// (this is the https://avi.im/blag/2023/hekaton-paper-typo/ case, I believe!)
|
||||
assert!(rv_visible(
|
||||
TxTimestampOrID::Timestamp(0),
|
||||
Some(TxTimestampOrID::TxID(7))
|
||||
));
|
||||
}
|
||||
13
core/mvcc/mvcc-rs/src/errors.rs
Normal file
13
core/mvcc/mvcc-rs/src/errors.rs
Normal file
@@ -0,0 +1,13 @@
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug, PartialEq)]
|
||||
pub enum DatabaseError {
|
||||
#[error("no such transaction ID: `{0}`")]
|
||||
NoSuchTransactionID(u64),
|
||||
#[error("transaction aborted because of a write-write conflict")]
|
||||
WriteWriteConflict,
|
||||
#[error("transaction is terminated")]
|
||||
TxTerminated,
|
||||
#[error("I/O error: {0}")]
|
||||
Io(String),
|
||||
}
|
||||
38
core/mvcc/mvcc-rs/src/lib.rs
Normal file
38
core/mvcc/mvcc-rs/src/lib.rs
Normal file
@@ -0,0 +1,38 @@
|
||||
//! Multiversion concurrency control (MVCC) for Rust.
|
||||
//!
|
||||
//! This module implements the main memory MVCC method outlined in the paper
|
||||
//! "High-Performance Concurrency Control Mechanisms for Main-Memory Databases"
|
||||
//! by Per-Åke Larson et al (VLDB, 2011).
|
||||
//!
|
||||
//! ## Data anomalies
|
||||
//!
|
||||
//! * A *dirty write* occurs when transaction T_m updates a value that is written by
|
||||
//! transaction T_n but not yet committed. The MVCC algorithm prevents dirty
|
||||
//! writes by validating that a row version is visible to transaction T_m before
|
||||
//! allowing an update to it.
|
||||
//!
|
||||
//! * A *dirty read* occurs when transaction T_m reads a value that was written by
|
||||
//! transaction T_n but not yet committed. The MVCC algorithm prevents dirty
|
||||
//! reads by validating that a row version is visible to transaction T_m.
|
||||
//!
|
||||
//! * A *fuzzy read* (non-repeatable read) occurs when transaction T_m reads a
|
||||
//! different value in the course of the transaction because another
|
||||
//! transaction T_n has updated the value.
|
||||
//!
|
||||
//! * A *lost update* occurs when transactions T_m and T_n both attempt to update
|
||||
//! the same value, resulting in one of the updates being lost. The MVCC algorithm
|
||||
//! prevents lost updates by detecting the write-write conflict and letting the
|
||||
//! first-writer win by aborting the later transaction.
|
||||
//!
|
||||
//! TODO: phantom reads, cursor lost updates, read skew, write skew.
|
||||
//!
|
||||
//! ## TODO
|
||||
//!
|
||||
//! * Optimistic reads and writes
|
||||
//! * Garbage collection
|
||||
|
||||
pub mod clock;
|
||||
pub mod cursor;
|
||||
pub mod database;
|
||||
pub mod errors;
|
||||
pub mod persistent_storage;
|
||||
82
core/mvcc/mvcc-rs/src/persistent_storage/mod.rs
Normal file
82
core/mvcc/mvcc-rs/src/persistent_storage/mod.rs
Normal file
@@ -0,0 +1,82 @@
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use std::fmt::Debug;
|
||||
|
||||
use crate::database::{LogRecord, Result};
|
||||
use crate::errors::DatabaseError;
|
||||
|
||||
pub mod s3;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Storage {
|
||||
Noop,
|
||||
JsonOnDisk(std::path::PathBuf),
|
||||
S3(s3::Replicator),
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
pub fn new_noop() -> Self {
|
||||
Self::Noop
|
||||
}
|
||||
|
||||
pub fn new_json_on_disk(path: impl Into<std::path::PathBuf>) -> Self {
|
||||
let path = path.into();
|
||||
Self::JsonOnDisk(path)
|
||||
}
|
||||
|
||||
pub fn new_s3(options: s3::Options) -> Result<Self> {
|
||||
let replicator = futures::executor::block_on(s3::Replicator::new(options))?;
|
||||
Ok(Self::S3(replicator))
|
||||
}
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
pub fn log_tx<T: Serialize>(&self, m: LogRecord<T>) -> Result<()> {
|
||||
match self {
|
||||
Self::JsonOnDisk(path) => {
|
||||
use std::io::Write;
|
||||
let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?;
|
||||
let mut file = std::fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(path)
|
||||
.map_err(|e| DatabaseError::Io(e.to_string()))?;
|
||||
file.write_all(&t)
|
||||
.map_err(|e| DatabaseError::Io(e.to_string()))?;
|
||||
file.write_all(b"\n")
|
||||
.map_err(|e| DatabaseError::Io(e.to_string()))?;
|
||||
}
|
||||
Self::S3(replicator) => {
|
||||
futures::executor::block_on(replicator.replicate_tx(m))?;
|
||||
}
|
||||
Self::Noop => (),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn read_tx_log<T: DeserializeOwned + Debug>(&self) -> Result<Vec<LogRecord<T>>> {
|
||||
match self {
|
||||
Self::JsonOnDisk(path) => {
|
||||
use std::io::BufRead;
|
||||
let file = std::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(path)
|
||||
.map_err(|e| DatabaseError::Io(e.to_string()))?;
|
||||
|
||||
let mut records: Vec<LogRecord<T>> = Vec::new();
|
||||
let mut lines = std::io::BufReader::new(file).lines();
|
||||
while let Some(Ok(line)) = lines.next() {
|
||||
records.push(
|
||||
serde_json::from_str(&line)
|
||||
.map_err(|e| DatabaseError::Io(e.to_string()))?,
|
||||
)
|
||||
}
|
||||
Ok(records)
|
||||
}
|
||||
Self::S3(replicator) => futures::executor::block_on(replicator.read_tx_log()),
|
||||
Self::Noop => Err(crate::errors::DatabaseError::Io(
|
||||
"cannot read from Noop storage".to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
139
core/mvcc/mvcc-rs/src/persistent_storage/s3.rs
Normal file
139
core/mvcc/mvcc-rs/src/persistent_storage/s3.rs
Normal file
@@ -0,0 +1,139 @@
|
||||
use crate::database::{LogRecord, Result};
|
||||
use crate::errors::DatabaseError;
|
||||
use aws_sdk_s3::Client;
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use std::fmt::Debug;
|
||||
|
||||
/// Configuration for the S3 replicator.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build
/// it with a struct literal; use [`Options::default`] or
/// [`Options::with_create_bucket_if_not_exists`] instead.
#[derive(Clone, Copy, Debug, Default)]
#[non_exhaustive]
pub struct Options {
    /// When `true`, a missing bucket is created instead of being reported as
    /// an error. Defaults to `false`.
    pub create_bucket_if_not_exists: bool,
}

impl Options {
    /// Builds options with the given bucket-creation policy.
    pub fn with_create_bucket_if_not_exists(create_bucket_if_not_exists: bool) -> Self {
        Self {
            create_bucket_if_not_exists,
        }
    }
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Replicator {
|
||||
pub client: Client,
|
||||
pub bucket: String,
|
||||
pub prefix: String,
|
||||
}
|
||||
|
||||
impl Replicator {
|
||||
pub async fn new(options: Options) -> Result<Self> {
|
||||
let mut loader = aws_config::from_env();
|
||||
if let Ok(endpoint) = std::env::var("MVCCRS_ENDPOINT") {
|
||||
loader = loader.endpoint_url(endpoint);
|
||||
}
|
||||
let sdk_config = loader.load().await;
|
||||
let config = aws_sdk_s3::config::Builder::from(&sdk_config)
|
||||
.force_path_style(true)
|
||||
.build();
|
||||
let bucket = std::env::var("MVCCRS_BUCKET").unwrap_or_else(|_| "mvccrs".to_string());
|
||||
let prefix = std::env::var("MVCCRS_PREFIX").unwrap_or_else(|_| "tx".to_string());
|
||||
let client = Client::from_conf(config);
|
||||
|
||||
match client.head_bucket().bucket(&bucket).send().await {
|
||||
Ok(_) => tracing::info!("Bucket {bucket} exists and is accessible"),
|
||||
Err(aws_sdk_s3::error::SdkError::ServiceError(err)) if err.err().is_not_found() => {
|
||||
if options.create_bucket_if_not_exists {
|
||||
tracing::info!("Bucket {bucket} not found, recreating");
|
||||
client
|
||||
.create_bucket()
|
||||
.bucket(&bucket)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| DatabaseError::Io(e.to_string()))?;
|
||||
} else {
|
||||
tracing::error!("Bucket {bucket} does not exist");
|
||||
return Err(DatabaseError::Io(err.err().to_string()));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Bucket checking error: {e}");
|
||||
return Err(DatabaseError::Io(e.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
client,
|
||||
bucket,
|
||||
prefix,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn replicate_tx<T: Serialize>(&self, record: LogRecord<T>) -> Result<()> {
|
||||
let key = format!("{}-{:020}", self.prefix, record.tx_timestamp);
|
||||
tracing::trace!("Replicating {key}");
|
||||
let body = serde_json::to_vec(&record).map_err(|e| DatabaseError::Io(e.to_string()))?;
|
||||
let resp = self
|
||||
.client
|
||||
.put_object()
|
||||
.bucket(&self.bucket)
|
||||
.key(&key)
|
||||
.body(body.into())
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| DatabaseError::Io(e.to_string()))?;
|
||||
tracing::trace!("Replicator response: {:?}", resp);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn read_tx_log<T: DeserializeOwned + Debug>(&self) -> Result<Vec<LogRecord<T>>> {
|
||||
let mut records: Vec<LogRecord<T>> = Vec::new();
|
||||
// Read all objects from the bucket, one log record is stored in one object
|
||||
let mut next_token = None;
|
||||
loop {
|
||||
let mut req = self
|
||||
.client
|
||||
.list_objects_v2()
|
||||
.bucket(&self.bucket)
|
||||
.prefix(&self.prefix);
|
||||
if let Some(next_token) = next_token {
|
||||
req = req.continuation_token(next_token);
|
||||
}
|
||||
let resp = req
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| DatabaseError::Io(e.to_string()))?;
|
||||
tracing::trace!("List objects response: {:?}", resp);
|
||||
if let Some(contents) = resp.contents {
|
||||
// read the record from s3 based on the object metadata (`contents`)
|
||||
// and store it in the `records` vector
|
||||
for object in contents {
|
||||
let key = object.key.unwrap();
|
||||
let resp = self
|
||||
.client
|
||||
.get_object()
|
||||
.bucket(&self.bucket)
|
||||
.key(&key)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| DatabaseError::Io(e.to_string()))?;
|
||||
tracing::trace!("Get object response: {:?}", resp);
|
||||
let body = resp
|
||||
.body
|
||||
.collect()
|
||||
.await
|
||||
.map_err(|e| DatabaseError::Io(e.to_string()))?;
|
||||
let record: LogRecord<T> = serde_json::from_slice(&body.into_bytes())
|
||||
.map_err(|e| DatabaseError::Io(e.to_string()))?;
|
||||
records.push(record);
|
||||
}
|
||||
}
|
||||
if resp.next_continuation_token.is_none() {
|
||||
break;
|
||||
}
|
||||
next_token = resp.next_continuation_token;
|
||||
}
|
||||
tracing::trace!("Records: {records:?}");
|
||||
Ok(records)
|
||||
}
|
||||
}
|
||||
124
core/mvcc/mvcc-rs/tests/concurrency_test.rs
Normal file
124
core/mvcc/mvcc-rs/tests/concurrency_test.rs
Normal file
@@ -0,0 +1,124 @@
|
||||
use mvcc_rs::clock::LocalClock;
|
||||
use mvcc_rs::database::{Database, Row, RowID};
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::{Arc, Once};
|
||||
|
||||
/// Process-wide source of row IDs; starts at 1 and is bumped atomically so
/// that concurrent threads never receive the same ID.
static IDS: AtomicU64 = AtomicU64::new(1);

/// Guards the one-time installation of the tracing subscriber, which every
/// test in this file requests.
static START: Once = Once::new();
|
||||
|
||||
/// Two threads insert into the database concurrently using non-overlapping
/// row IDs (drawn from the shared `IDS` counter); each thread must read back
/// exactly the row it just committed.
#[test]
fn test_non_overlapping_concurrent_inserts() {
    START.call_once(|| {
        tracing_subscriber::fmt::init();
    });
    let clock = LocalClock::default();
    let storage = mvcc_rs::persistent_storage::Storage::new_noop();
    let db = Arc::new(Database::new(clock, storage));
    let iterations = 100000;

    // Both writers run the same loop and differ only in the payload they
    // store, so the thread body is defined once.
    let spawn_writer = |payload: &'static str| {
        let db = db.clone();
        std::thread::spawn(move || {
            for _ in 0..iterations {
                // Insert a row under a fresh ID and commit it.
                let tx = db.begin_tx();
                let id = IDS.fetch_add(1, Ordering::SeqCst);
                let id = RowID {
                    table_id: 1,
                    row_id: id,
                };
                let row = Row {
                    id,
                    data: payload.to_string(),
                };
                db.insert(tx, row.clone()).unwrap();
                db.commit_tx(tx).unwrap();

                // Read it back in a separate transaction.
                let tx = db.begin_tx();
                let committed_row = db.read(tx, id).unwrap();
                db.commit_tx(tx).unwrap();
                assert_eq!(committed_row, Some(row));
            }
        })
    };

    let th1 = spawn_writer("Hello");
    let th2 = spawn_writer("World");
    th1.join().unwrap();
    th2.join().unwrap();
}
|
||||
|
||||
/// Four threads upsert into the database concurrently using overlapping row
/// IDs (`i % 16`). Whenever an upsert succeeds, the same transaction must read
/// back the row it just wrote.
#[test]
fn test_overlapping_concurrent_inserts_read_your_writes() {
    START.call_once(tracing_subscriber::fmt::init);

    let db = Arc::new(Database::new(
        LocalClock::default(),
        mvcc_rs::persistent_storage::Storage::new_noop(),
    ));
    let iterations = 100000;

    let work = |prefix: &'static str| {
        let db = db.clone();
        std::thread::spawn(move || {
            let mut failed_upserts = 0;
            for i in 0..iterations {
                // Periodic progress report.
                if i % 1000 == 0 {
                    tracing::debug!("{prefix}: {i}");
                }
                // Periodic garbage collection of row versions.
                if i % 10000 == 0 {
                    let dropped = db.drop_unused_row_versions();
                    tracing::debug!("garbage collected {dropped} versions");
                }

                let tx = db.begin_tx();
                let row_id = RowID {
                    table_id: 1,
                    row_id: i % 16,
                };
                let expected = Row {
                    id: row_id,
                    data: format!("{prefix} @{tx}"),
                };

                // A failed upsert is counted and the iteration is abandoned.
                if let Err(e) = db.upsert(tx, expected.clone()) {
                    tracing::trace!("upsert failed: {e}");
                    failed_upserts += 1;
                    continue;
                }

                // Read-your-writes: the uncommitted row must be visible to its
                // own transaction.
                let seen = db.read(tx, row_id).unwrap();
                db.commit_tx(tx).unwrap();
                assert_eq!(seen, Some(expected));
            }
            tracing::info!(
                "{prefix}'s failed upserts: {failed_upserts}/{iterations} {:.2}%",
                (failed_upserts * 100) as f64 / iterations as f64
            );
        })
    };

    // Start every worker before waiting on any of them.
    let workers = vec![work("A"), work("B"), work("C"), work("D")];
    for worker in workers {
        worker.join().unwrap();
    }
}
|
||||
Reference in New Issue
Block a user