diff --git a/Cargo.lock b/Cargo.lock index dc49a85f0..cb6dda8d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -554,6 +554,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-skiplist" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -1570,6 +1580,7 @@ dependencies = [ "cfg_block", "chrono", "criterion", + "crossbeam-skiplist", "fallible-iterator 0.3.0", "getrandom 0.2.15", "hex", @@ -1607,6 +1618,7 @@ dependencies = [ "strum", "tempfile", "thiserror 1.0.69", + "tracing", ] [[package]] @@ -3043,14 +3055,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "tracing-core" version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" +dependencies = [ + "once_cell", +] [[package]] name = "typenum" diff --git a/core/Cargo.toml b/core/Cargo.toml index f2d43a156..f2bba5f59 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -70,6 +70,8 @@ limbo_time = { path = "../extensions/time", optional = true, features = ["static miette = "7.4.0" strum = "0.26" parking_lot = "0.12.3" +tracing = "0.1.41" +crossbeam-skiplist = "0.1.3" [build-dependencies] chrono = "0.4.38" @@ -92,3 +94,7 @@ tempfile = "3.8.0" [[bench]] name = "benchmark" harness = false + +[[bench]] +name = "mvcc_benchmark" +harness = false diff --git a/core/benches/mvcc_benchmark.rs b/core/benches/mvcc_benchmark.rs new file mode 100644 index 000000000..899c8b82d --- /dev/null +++ b/core/benches/mvcc_benchmark.rs @@ -0,0 +1,129 @@ +use criterion::async_executor::FuturesExecutor; +use criterion::{criterion_group, criterion_main, Criterion, Throughput}; +use limbo_core::mvcc::clock::LocalClock; +use limbo_core::mvcc::database::{Database, Row, RowID}; +use pprof::criterion::{Output, PProfProfiler}; + +fn bench_db() -> Database { + let clock = LocalClock::default(); + let storage = limbo_core::mvcc::persistent_storage::Storage::new_noop(); + Database::new(clock, storage) +} + +fn bench(c: &mut Criterion) { + let mut group = c.benchmark_group("mvcc-ops-throughput"); + group.throughput(Throughput::Elements(1)); + + let db = bench_db(); + group.bench_function("begin_tx + rollback_tx", |b| { + b.to_async(FuturesExecutor).iter(|| async { + let tx_id = db.begin_tx(); + db.rollback_tx(tx_id) + }) + }); + + let db = bench_db(); + group.bench_function("begin_tx + commit_tx", |b| { + b.to_async(FuturesExecutor).iter(|| async { + let tx_id = db.begin_tx(); + db.commit_tx(tx_id) + }) + }); + + let db = bench_db(); + group.bench_function("begin_tx-read-commit_tx", |b| { + b.to_async(FuturesExecutor).iter(|| async { + let tx_id = db.begin_tx(); + db.read( + tx_id, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + db.commit_tx(tx_id) + }) + }); + + let db = bench_db(); + group.bench_function("begin_tx-update-commit_tx", |b| { + b.to_async(FuturesExecutor).iter(|| async { + let tx_id = db.begin_tx(); + db.update( + tx_id, + Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "World".to_string(), + }, + ) + .unwrap(); + db.commit_tx(tx_id) + }) + }); + + let db = bench_db(); + let tx = db.begin_tx(); + db.insert( + tx, + Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }, + ) + .unwrap(); + group.bench_function("read", |b| { + b.to_async(FuturesExecutor).iter(|| async { + db.read( + tx, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + }) + }); + + let db = bench_db(); + let tx = db.begin_tx(); + db.insert( + tx, + Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }, + ) + .unwrap(); + group.bench_function("update", |b| { + b.to_async(FuturesExecutor).iter(|| async { + db.update( + tx, + Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "World".to_string(), + }, + ) + .unwrap(); + }) + }); +} + +criterion_group! { + name = benches; + config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = bench +} +criterion_main!(benches); diff --git a/core/lib.rs b/core/lib.rs index f1c78f9f2..c2e0f22c6 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -5,6 +5,7 @@ mod info; mod io; #[cfg(feature = "json")] mod json; +pub mod mvcc; mod parameters; mod pseudo; mod result; diff --git a/core/mvcc/clock.rs b/core/mvcc/clock.rs new file mode 100644 index 000000000..7bab1fe5d --- /dev/null +++ b/core/mvcc/clock.rs @@ -0,0 +1,31 @@ +use std::sync::atomic::{AtomicU64, Ordering}; + +/// Logical clock. +pub trait LogicalClock { + fn get_timestamp(&self) -> u64; + fn reset(&self, ts: u64); +} + +/// A node-local clock backed by an atomic counter. +#[derive(Debug, Default)] +pub struct LocalClock { + ts_sequence: AtomicU64, +} + +impl LocalClock { + pub fn new() -> Self { + Self { + ts_sequence: AtomicU64::new(0), + } + } +} + +impl LogicalClock for LocalClock { + fn get_timestamp(&self) -> u64 { + self.ts_sequence.fetch_add(1, Ordering::SeqCst) + } + + fn reset(&self, ts: u64) { + self.ts_sequence.store(ts, Ordering::SeqCst); + } +} diff --git a/core/mvcc/cursor.rs b/core/mvcc/cursor.rs new file mode 100644 index 000000000..4d120e214 --- /dev/null +++ b/core/mvcc/cursor.rs @@ -0,0 +1,67 @@ +use serde::de::DeserializeOwned; +use serde::Serialize; + +use crate::mvcc::clock::LogicalClock; +use crate::mvcc::database::{Database, Result, Row, RowID}; +use std::fmt::Debug; + +#[derive(Debug)] +pub struct ScanCursor< + 'a, + Clock: LogicalClock, + T: Sync + Send + Clone + Serialize + DeserializeOwned + Debug, +> { + pub db: &'a Database, + pub row_ids: Vec, + pub index: usize, + tx_id: u64, +} + +impl< + 'a, + Clock: LogicalClock, + T: Sync + Send + Clone + Serialize + DeserializeOwned + Debug + 'static, + > ScanCursor<'a, Clock, T> +{ + pub fn new( + db: &'a Database, + tx_id: u64, + table_id: u64, + ) -> Result> { + let row_ids = db.scan_row_ids_for_table(table_id)?; + Ok(Self { + db, + tx_id, + row_ids, + index: 0, + }) + } + + pub fn current_row_id(&self) -> Option { + if self.index >= self.row_ids.len() { + return None; + } + Some(self.row_ids[self.index]) + } + + pub fn current_row(&self) -> Result>> { + if self.index >= self.row_ids.len() { + return Ok(None); + } + let id = self.row_ids[self.index]; + self.db.read(self.tx_id, id) + } + + pub fn close(self) -> Result<()> { + Ok(()) + } + + pub fn forward(&mut self) -> bool { + self.index += 1; + self.index < self.row_ids.len() + } + + pub fn is_empty(&self) -> bool { + self.index >= self.row_ids.len() + } +} diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs new file mode 100644 index 000000000..6fa8420f3 --- /dev/null +++ b/core/mvcc/database/mod.rs @@ -0,0 +1,810 @@ +use crate::mvcc::clock::LogicalClock; +use crate::mvcc::errors::DatabaseError; +use crate::mvcc::persistent_storage::Storage; +use crossbeam_skiplist::{SkipMap, SkipSet}; +use std::fmt::Debug; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::RwLock; + +pub type Result = std::result::Result; + +#[cfg(test)] +mod tests; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct RowID { + pub table_id: u64, + pub row_id: u64, +} + +#[derive(Clone, Debug, PartialEq, PartialOrd)] + +pub struct Row { + pub id: RowID, + pub data: T, +} + +/// A row version. +#[derive(Clone, Debug, PartialEq)] +pub struct RowVersion { + begin: TxTimestampOrID, + end: Option, + row: Row, +} + +pub type TxID = u64; + +/// A log record contains all the versions inserted and deleted by a transaction. +#[derive(Clone, Debug)] +pub struct LogRecord { + pub(crate) tx_timestamp: TxID, + row_versions: Vec>, +} + +impl LogRecord { + fn new(tx_timestamp: TxID) -> Self { + Self { + tx_timestamp, + row_versions: Vec::new(), + } + } +} + +/// A transaction timestamp or ID. +/// +/// Versions either track a timestamp or a transaction ID, depending on the +/// phase of the transaction. During the active phase, new versions track the +/// transaction ID in the `begin` and `end` fields. After a transaction commits, +/// versions switch to tracking timestamps. +#[derive(Clone, Debug, PartialEq, PartialOrd)] +enum TxTimestampOrID { + Timestamp(u64), + TxID(TxID), +} + +/// Transaction +#[derive(Debug)] +pub struct Transaction { + /// The state of the transaction. + state: AtomicTransactionState, + /// The transaction ID. + tx_id: u64, + /// The transaction begin timestamp. + begin_ts: u64, + /// The transaction write set. + write_set: SkipSet, + /// The transaction read set. + read_set: SkipSet, +} + +impl Transaction { + fn new(tx_id: u64, begin_ts: u64) -> Transaction { + Transaction { + state: TransactionState::Active.into(), + tx_id, + begin_ts, + write_set: SkipSet::new(), + read_set: SkipSet::new(), + } + } + + fn insert_to_read_set(&self, id: RowID) { + self.read_set.insert(id); + } + + fn insert_to_write_set(&mut self, id: RowID) { + self.write_set.insert(id); + } +} + +impl std::fmt::Display for Transaction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { + write!( + f, + "{{ state: {}, id: {}, begin_ts: {}, write_set: {:?}, read_set: {:?}", + self.state.load(), + self.tx_id, + self.begin_ts, + // FIXME: I'm sorry, we obviously shouldn't be cloning here. + self.write_set + .iter() + .map(|v| *v.value()) + .collect::>(), + self.read_set + .iter() + .map(|v| *v.value()) + .collect::>() + ) + } +} + +/// Transaction state. +#[derive(Debug, Clone, PartialEq)] +enum TransactionState { + Active, + Preparing, + Aborted, + Terminated, + Committed(u64), +} + +impl TransactionState { + pub fn encode(&self) -> u64 { + match self { + TransactionState::Active => 0, + TransactionState::Preparing => 1, + TransactionState::Aborted => 2, + TransactionState::Terminated => 3, + TransactionState::Committed(ts) => { + // We only support 2*62 - 1 timestamps, because the extra bit + // is used to encode the type. + assert!(ts & 0x8000_0000_0000_0000 == 0); + 0x8000_0000_0000_0000 | ts + } + } + } + + pub fn decode(v: u64) -> Self { + match v { + 0 => TransactionState::Active, + 1 => TransactionState::Preparing, + 2 => TransactionState::Aborted, + 3 => TransactionState::Terminated, + v if v & 0x8000_0000_0000_0000 != 0 => { + TransactionState::Committed(v & 0x7fff_ffff_ffff_ffff) + } + _ => panic!("Invalid transaction state"), + } + } +} + +// Transaction state encoded into a single 64-bit atomic. +#[derive(Debug)] +pub(crate) struct AtomicTransactionState { + pub(crate) state: AtomicU64, +} + +impl From for AtomicTransactionState { + fn from(state: TransactionState) -> Self { + Self { + state: AtomicU64::new(state.encode()), + } + } +} + +impl From for TransactionState { + fn from(state: AtomicTransactionState) -> Self { + let encoded = state.state.load(Ordering::Acquire); + TransactionState::decode(encoded) + } +} + +impl std::cmp::PartialEq for AtomicTransactionState { + fn eq(&self, other: &TransactionState) -> bool { + &self.load() == other + } +} + +impl std::fmt::Display for TransactionState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { + match self { + TransactionState::Active => write!(f, "Active"), + TransactionState::Preparing => write!(f, "Preparing"), + TransactionState::Committed(ts) => write!(f, "Committed({ts})"), + TransactionState::Aborted => write!(f, "Aborted"), + TransactionState::Terminated => write!(f, "Terminated"), + } + } +} + +impl AtomicTransactionState { + fn store(&self, state: TransactionState) { + self.state.store(state.encode(), Ordering::Release); + } + + fn load(&self) -> TransactionState { + TransactionState::decode(self.state.load(Ordering::Acquire)) + } +} + +#[derive(Debug)] +pub struct Database { + rows: SkipMap>>>, + txs: SkipMap>, + tx_ids: AtomicU64, + clock: Clock, + storage: Storage, +} + +impl Database { + /// Creates a new database. + pub fn new(clock: Clock, storage: Storage) -> Self { + Self { + rows: SkipMap::new(), + txs: SkipMap::new(), + tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes + clock, + storage, + } + } + + // Extracts the begin timestamp from a transaction + fn get_begin_timestamp(&self, ts_or_id: &TxTimestampOrID) -> u64 { + match ts_or_id { + TxTimestampOrID::Timestamp(ts) => *ts, + TxTimestampOrID::TxID(tx_id) => { + self.txs + .get(tx_id) + .unwrap() + .value() + .read() + .unwrap() + .begin_ts + } + } + } + + /// Inserts a new row version into the database, while making sure that + /// the row version is inserted in the correct order. + fn insert_version(&self, id: RowID, row_version: RowVersion) { + let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new())); + let mut versions = versions.value().write().unwrap(); + self.insert_version_raw(&mut versions, row_version) + } + + /// Inserts a new row version into the internal data structure for versions, + /// while making sure that the row version is inserted in the correct order. + fn insert_version_raw(&self, versions: &mut Vec>, row_version: RowVersion) { + // NOTICE: this is an insert a'la insertion sort, with pessimistic linear complexity. + // However, we expect the number of versions to be nearly sorted, so we deem it worthy + // to search linearly for the insertion point instead of paying the price of using + // another data structure, e.g. a BTreeSet. If it proves to be too quadratic empirically, + // we can either switch to a tree-like structure, or at least use partition_point() + // which performs a binary search for the insertion point. + let position = versions + .iter() + .rposition(|v| { + self.get_begin_timestamp(&v.begin) < self.get_begin_timestamp(&row_version.begin) + }) + .map(|p| p + 1) + .unwrap_or(0); + if versions.len() - position > 3 { + tracing::debug!( + "Inserting a row version {} positions from the end", + versions.len() - position + ); + } + versions.insert(position, row_version); + } + + /// Inserts a new row into the database. + /// + /// This function inserts a new `row` into the database within the context + /// of the transaction `tx_id`. + /// + /// # Arguments + /// + /// * `tx_id` - the ID of the transaction in which to insert the new row. + /// * `row` - the row object containing the values to be inserted. + /// + pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { + let tx = self + .txs + .get(&tx_id) + .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; + let mut tx = tx.value().write().unwrap(); + assert_eq!(tx.state, TransactionState::Active); + let id = row.id; + let row_version = RowVersion { + begin: TxTimestampOrID::TxID(tx.tx_id), + end: None, + row, + }; + tx.insert_to_write_set(id); + drop(tx); + self.insert_version(id, row_version); + Ok(()) + } + + /// Updates a row in the database with new values. + /// + /// This function updates an existing row in the database within the + /// context of the transaction `tx_id`. The `row` argument identifies the + /// row to be updated as `id` and contains the new values to be inserted. + /// + /// If the row identified by the `id` does not exist, this function does + /// nothing and returns `false`. Otherwise, the function updates the row + /// with the new values and returns `true`. + /// + /// # Arguments + /// + /// * `tx_id` - the ID of the transaction in which to update the new row. + /// * `row` - the row object containing the values to be updated. + /// + /// # Returns + /// + /// Returns `true` if the row was successfully updated, and `false` otherwise. + pub fn update(&self, tx_id: TxID, row: Row) -> Result { + if !self.delete(tx_id, row.id)? { + return Ok(false); + } + self.insert(tx_id, row)?; + Ok(true) + } + + /// Inserts a row in the database with new values, previously deleting + /// any old data if it existed. Bails on a delete error, e.g. write-write conflict. + pub fn upsert(&self, tx_id: TxID, row: Row) -> Result<()> { + self.delete(tx_id, row.id)?; + self.insert(tx_id, row) + } + + /// Deletes a row from the table with the given `id`. + /// + /// This function deletes an existing row `id` in the database within the + /// context of the transaction `tx_id`. + /// + /// # Arguments + /// + /// * `tx_id` - the ID of the transaction in which to delete the new row. + /// * `id` - the ID of the row to delete. + /// + /// # Returns + /// + /// Returns `true` if the row was successfully deleted, and `false` otherwise. + /// + pub fn delete(&self, tx_id: TxID, id: RowID) -> Result { + let row_versions_opt = self.rows.get(&id); + if let Some(ref row_versions) = row_versions_opt { + let mut row_versions = row_versions.value().write().unwrap(); + for rv in row_versions.iter_mut().rev() { + let tx = self + .txs + .get(&tx_id) + .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; + let tx = tx.value().read().unwrap(); + assert_eq!(tx.state, TransactionState::Active); + if is_write_write_conflict(&self.txs, &tx, rv) { + drop(row_versions); + drop(row_versions_opt); + drop(tx); + self.rollback_tx(tx_id); + return Err(DatabaseError::WriteWriteConflict); + } + if is_version_visible(&self.txs, &tx, rv) { + rv.end = Some(TxTimestampOrID::TxID(tx.tx_id)); + drop(row_versions); + drop(row_versions_opt); + drop(tx); + let tx = self + .txs + .get(&tx_id) + .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; + let mut tx = tx.value().write().unwrap(); + tx.insert_to_write_set(id); + return Ok(true); + } + } + } + Ok(false) + } + + /// Retrieves a row from the table with the given `id`. + /// + /// This operation is performed within the scope of the transaction identified + /// by `tx_id`. + /// + /// # Arguments + /// + /// * `tx_id` - The ID of the transaction to perform the read operation in. + /// * `id` - The ID of the row to retrieve. + /// + /// # Returns + /// + /// Returns `Some(row)` with the row data if the row with the given `id` exists, + /// and `None` otherwise. + pub fn read(&self, tx_id: TxID, id: RowID) -> Result>> { + let tx = self.txs.get(&tx_id).unwrap(); + let tx = tx.value().read().unwrap(); + assert_eq!(tx.state, TransactionState::Active); + if let Some(row_versions) = self.rows.get(&id) { + let row_versions = row_versions.value().read().unwrap(); + for rv in row_versions.iter().rev() { + if is_version_visible(&self.txs, &tx, rv) { + tx.insert_to_read_set(id); + return Ok(Some(rv.row.clone())); + } + } + } + Ok(None) + } + + /// Gets all row ids in the database. + pub fn scan_row_ids(&self) -> Result> { + let keys = self.rows.iter().map(|entry| *entry.key()); + Ok(keys.collect()) + } + + /// Gets all row ids in the database for a given table. + pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { + Ok(self + .rows + .range( + RowID { + table_id, + row_id: 0, + }..RowID { + table_id, + row_id: u64::MAX, + }, + ) + .map(|entry| *entry.key()) + .collect()) + } + + /// Begins a new transaction in the database. + /// + /// This function starts a new transaction in the database and returns a `TxID` value + /// that you can use to perform operations within the transaction. All changes made within the + /// transaction are isolated from other transactions until you commit the transaction. + pub fn begin_tx(&self) -> TxID { + let tx_id = self.get_tx_id(); + let begin_ts = self.get_timestamp(); + let tx = Transaction::new(tx_id, begin_ts); + tracing::trace!("BEGIN {tx}"); + self.txs.insert(tx_id, RwLock::new(tx)); + tx_id + } + + /// Commits a transaction with the specified transaction ID. + /// + /// This function commits the changes made within the specified transaction and finalizes the + /// transaction. Once a transaction has been committed, all changes made within the transaction + /// are visible to other transactions that access the same data. + /// + /// # Arguments + /// + /// * `tx_id` - The ID of the transaction to commit. + pub fn commit_tx(&self, tx_id: TxID) -> Result<()> { + let end_ts = self.get_timestamp(); + // NOTICE: the first shadowed tx keeps the entry alive in the map + // for the duration of this whole function, which is important for correctness! + let tx = self.txs.get(&tx_id).ok_or(DatabaseError::TxTerminated)?; + let tx = tx.value().write().unwrap(); + match tx.state.load() { + TransactionState::Terminated => return Err(DatabaseError::TxTerminated), + _ => { + assert_eq!(tx.state, TransactionState::Active); + } + } + tx.state.store(TransactionState::Preparing); + tracing::trace!("PREPARE {tx}"); + + /* TODO: The code we have here is sufficient for snapshot isolation. + ** In order to implement serializability, we need the following steps: + ** + ** 1. Validate if all read versions are still visible by inspecting the read_set + ** 2. Validate if there are no phantoms by walking the scans from scan_set (which we don't even have yet) + ** - a phantom is a version that became visible in the middle of our transaction, + ** but wasn't taken into account during one of the scans from the scan_set + ** 3. Wait for commit dependencies, which we don't even track yet... + ** Excerpt from what's a commit dependency and how it's tracked in the original paper: + ** """ + A transaction T1 has a commit dependency on another transaction + T2, if T1 is allowed to commit only if T2 commits. If T2 aborts, + T1 must also abort, so cascading aborts are possible. T1 acquires a + commit dependency either by speculatively reading or speculatively ignoring a version, + instead of waiting for T2 to commit. + We implement commit dependencies by a register-and-report + approach: T1 registers its dependency with T2 and T2 informs T1 + when it has committed or aborted. Each transaction T contains a + counter, CommitDepCounter, that counts how many unresolved + commit dependencies it still has. A transaction cannot commit + until this counter is zero. In addition, T has a Boolean variable + AbortNow that other transactions can set to tell T to abort. Each + transaction T also has a set, CommitDepSet, that stores transaction IDs + of the transactions that depend on T. + To take a commit dependency on a transaction T2, T1 increments + its CommitDepCounter and adds its transaction ID to T2’s CommitDepSet. + When T2 has committed, it locates each transaction in + its CommitDepSet and decrements their CommitDepCounter. If + T2 aborted, it tells the dependent transactions to also abort by + setting their AbortNow flags. If a dependent transaction is not + found, this means that it has already aborted. + Note that a transaction with commit dependencies may not have to + wait at all - the dependencies may have been resolved before it is + ready to commit. Commit dependencies consolidate all waits into + a single wait and postpone the wait to just before commit. + Some transactions may have to wait before commit. + Waiting raises a concern of deadlocks. + However, deadlocks cannot occur because an older transaction never + waits on a younger transaction. In + a wait-for graph the direction of edges would always be from a + younger transaction (higher end timestamp) to an older transaction + (lower end timestamp) so cycles are impossible. + """ + ** If you're wondering when a speculative read happens, here you go: + ** Case 1: speculative read of TB: + """ + If transaction TB is in the Preparing state, it has acquired an end + timestamp TS which will be V’s begin timestamp if TB commits. + A safe approach in this situation would be to have transaction T + wait until transaction TB commits. However, we want to avoid all + blocking during normal processing so instead we continue with + the visibility test and, if the test returns true, allow T to + speculatively read V. Transaction T acquires a commit dependency on + TB, restricting the serialization order of the two transactions. That + is, T is allowed to commit only if TB commits. + """ + ** Case 2: speculative ignore of TE: + """ + If TE’s state is Preparing, it has an end timestamp TS that will become + the end timestamp of V if TE does commit. If TS is greater than the read + time RT, it is obvious that V will be visible if TE commits. If TE + aborts, V will still be visible, because any transaction that updates + V after TE has aborted will obtain an end timestamp greater than + TS. If TS is less than RT, we have a more complicated situation: + if TE commits, V will not be visible to T but if TE aborts, it will + be visible. We could handle this by forcing T to wait until TE + commits or aborts but we want to avoid all blocking during normal processing. + Instead we allow T to speculatively ignore V and + proceed with its processing. Transaction T acquires a commit + dependency (see Section 2.7) on TE, that is, T is allowed to commit + only if TE commits. + """ + */ + tx.state.store(TransactionState::Committed(end_ts)); + tracing::trace!("COMMIT {tx}"); + let tx_begin_ts = tx.begin_ts; + let write_set: Vec = tx.write_set.iter().map(|v| *v.value()).collect(); + drop(tx); + // Postprocessing: inserting row versions and logging the transaction to persistent storage. + // TODO: we should probably save to persistent storage first, and only then update the in-memory structures. + let mut log_record: LogRecord = LogRecord::new(end_ts); + for ref id in write_set { + if let Some(row_versions) = self.rows.get(id) { + let mut row_versions = row_versions.value().write().unwrap(); + for row_version in row_versions.iter_mut() { + if let TxTimestampOrID::TxID(id) = row_version.begin { + if id == tx_id { + row_version.begin = TxTimestampOrID::Timestamp(tx_begin_ts); + self.insert_version_raw( + &mut log_record.row_versions, + row_version.clone(), + ); // FIXME: optimize cloning out + } + } + if let Some(TxTimestampOrID::TxID(id)) = row_version.end { + if id == tx_id { + row_version.end = Some(TxTimestampOrID::Timestamp(end_ts)); + self.insert_version_raw( + &mut log_record.row_versions, + row_version.clone(), + ); // FIXME: optimize cloning out + } + } + } + } + } + tracing::trace!("UPDATED TX{tx_id}"); + // We have now updated all the versions with a reference to the + // transaction ID to a timestamp and can, therefore, remove the + // transaction. Please note that when we move to lockless, the + // invariant doesn't necessarily hold anymore because another thread + // might have speculatively read a version that we want to remove. + // But that's a problem for another day. + // FIXME: it actually just become a problem for today!!! + // TODO: test that reproduces this failure, and then a fix + self.txs.remove(&tx_id); + if !log_record.row_versions.is_empty() { + self.storage.log_tx(log_record)?; + } + tracing::trace!("LOGGED {tx_id}"); + Ok(()) + } + + /// Rolls back a transaction with the specified ID. + /// + /// This function rolls back a transaction with the specified `tx_id` by + /// discarding any changes made by the transaction. + /// + /// # Arguments + /// + /// * `tx_id` - The ID of the transaction to abort. + pub fn rollback_tx(&self, tx_id: TxID) { + let tx_unlocked = self.txs.get(&tx_id).unwrap(); + let tx = tx_unlocked.value().write().unwrap(); + assert_eq!(tx.state, TransactionState::Active); + tx.state.store(TransactionState::Aborted); + tracing::trace!("ABORT {tx}"); + let write_set: Vec = tx.write_set.iter().map(|v| *v.value()).collect(); + drop(tx); + + for ref id in write_set { + if let Some(row_versions) = self.rows.get(id) { + let mut row_versions = row_versions.value().write().unwrap(); + row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id)); + if row_versions.is_empty() { + self.rows.remove(id); + } + } + } + + let tx = tx_unlocked.value().read().unwrap(); + tx.state.store(TransactionState::Terminated); + tracing::trace!("TERMINATE {tx}"); + // FIXME: verify that we can already remove the transaction here! + // Maybe it's fine for snapshot isolation, but too early for serializable? + self.txs.remove(&tx_id); + } + + /// Generates next unique transaction id + pub fn get_tx_id(&self) -> u64 { + self.tx_ids.fetch_add(1, Ordering::SeqCst) + } + + /// Gets current timestamp + pub fn get_timestamp(&self) -> u64 { + self.clock.get_timestamp() + } + + /// Removes unused row versions with very loose heuristics, + /// which sometimes leaves versions intact for too long. + /// Returns the number of removed versions. + pub fn drop_unused_row_versions(&self) -> usize { + tracing::trace!( + "Dropping unused row versions. Database stats: transactions: {}; rows: {}", + self.txs.len(), + self.rows.len() + ); + let mut dropped = 0; + let mut to_remove = Vec::new(); + for entry in self.rows.iter() { + let mut row_versions = entry.value().write().unwrap(); + row_versions.retain(|rv| { + // FIXME: should take rv.begin into account as well + let should_stay = match rv.end { + Some(TxTimestampOrID::Timestamp(version_end_ts)) => { + // a transaction started before this row version ended, ergo row version is needed + // NOTICE: O(row_versions x transactions), but also lock-free, so sounds acceptable + self.txs.iter().any(|tx| { + let tx = tx.value().read().unwrap(); + // FIXME: verify! + match tx.state.load() { + TransactionState::Active | TransactionState::Preparing => { + version_end_ts > tx.begin_ts + } + _ => false, + } + }) + } + // Let's skip potentially complex logic if the transafction is still + // active/tracked. We will drop the row version when the transaction + // gets garbage-collected itself, it will always happen eventually. + Some(TxTimestampOrID::TxID(tx_id)) => !self.txs.contains_key(&tx_id), + // this row version is current, ergo visible + None => true, + }; + if !should_stay { + dropped += 1; + tracing::trace!( + "Dropping row version {:?} {:?}-{:?}", + entry.key(), + rv.begin, + rv.end + ); + } + should_stay + }); + if row_versions.is_empty() { + to_remove.push(*entry.key()); + } + } + for id in to_remove { + self.rows.remove(&id); + } + dropped + } + + pub fn recover(&self) -> Result<()> { + let tx_log = self.storage.read_tx_log()?; + for record in tx_log { + tracing::debug!("RECOVERING {:?}", record); + for version in record.row_versions { + self.insert_version(version.row.id, version); + } + self.clock.reset(record.tx_timestamp); + } + Ok(()) + } +} + +/// A write-write conflict happens when transaction T_m attempts to update a +/// row version that is currently being updated by an active transaction T_n. +pub(crate) fn is_write_write_conflict( + txs: &SkipMap>, + tx: &Transaction, + rv: &RowVersion, +) -> bool { + match rv.end { + Some(TxTimestampOrID::TxID(rv_end)) => { + let te = txs.get(&rv_end).unwrap(); + let te = te.value().read().unwrap(); + match te.state.load() { + TransactionState::Active | TransactionState::Preparing => tx.tx_id != te.tx_id, + _ => false, + } + } + Some(TxTimestampOrID::Timestamp(_)) => false, + None => false, + } +} + +pub(crate) fn is_version_visible( + txs: &SkipMap>, + tx: &Transaction, + rv: &RowVersion, +) -> bool { + is_begin_visible(txs, tx, rv) && is_end_visible(txs, tx, rv) +} + +fn is_begin_visible( + txs: &SkipMap>, + tx: &Transaction, + rv: &RowVersion, +) -> bool { + match rv.begin { + TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts, + TxTimestampOrID::TxID(rv_begin) => { + let tb = txs.get(&rv_begin).unwrap(); + let tb = tb.value().read().unwrap(); + let visible = match tb.state.load() { + TransactionState::Active => tx.tx_id == tb.tx_id && rv.end.is_none(), + TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable! + TransactionState::Committed(committed_ts) => tx.begin_ts >= committed_ts, + TransactionState::Aborted => false, + TransactionState::Terminated => { + tracing::debug!("TODO: should reread rv's end field - it should have updated the timestamp in the row version by now"); + false + } + }; + tracing::trace!( + "is_begin_visible: tx={tx}, tb={tb} rv = {:?}-{:?} visible = {visible}", + rv.begin, + rv.end + ); + visible + } + } +} + +fn is_end_visible( + txs: &SkipMap>, + tx: &Transaction, + rv: &RowVersion, +) -> bool { + match rv.end { + Some(TxTimestampOrID::Timestamp(rv_end_ts)) => tx.begin_ts < rv_end_ts, + Some(TxTimestampOrID::TxID(rv_end)) => { + let te = txs.get(&rv_end).unwrap(); + let te = te.value().read().unwrap(); + let visible = match te.state.load() { + TransactionState::Active => tx.tx_id != te.tx_id, + TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable! + TransactionState::Committed(committed_ts) => tx.begin_ts < committed_ts, + TransactionState::Aborted => false, + TransactionState::Terminated => { + tracing::debug!("TODO: should reread rv's end field - it should have updated the timestamp in the row version by now"); + false + } + }; + tracing::trace!( + "is_end_visible: tx={tx}, te={te} rv = {:?}-{:?} visible = {visible}", + rv.begin, + rv.end + ); + visible + } + None => true, + } +} diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs new file mode 100644 index 000000000..741ada4cb --- /dev/null +++ b/core/mvcc/database/tests.rs @@ -0,0 +1,760 @@ +use super::*; +use crate::mvcc::clock::LocalClock; + +fn test_db() -> Database { + let clock = LocalClock::new(); + let storage = crate::mvcc::persistent_storage::Storage::new_noop(); + Database::new(clock, storage) +} + +#[test] +fn test_insert_read() { + let db = test_db(); + + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); + db.commit_tx(tx1).unwrap(); + + let tx2 = db.begin_tx(); + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); +} + +#[test] +fn test_read_nonexistent() { + let db = test_db(); + let tx = db.begin_tx(); + let row = db.read( + tx, + RowID { + table_id: 1, + row_id: 1, + }, + ); + assert!(row.unwrap().is_none()); +} + +#[test] +fn test_delete() { + let db = test_db(); + + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); + db.delete( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + assert!(row.is_none()); + db.commit_tx(tx1).unwrap(); + + let tx2 = db.begin_tx(); + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + assert!(row.is_none()); +} + +#[test] +fn test_delete_nonexistent() { + let db = test_db(); + let tx = db.begin_tx(); + assert!(!db + .delete( + tx, + RowID { + table_id: 1, + row_id: 1 + } + ) + .unwrap()); +} + +#[test] +fn test_commit() { + let db = test_db(); + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); + let tx1_updated_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "World".to_string(), + }; + db.update(tx1, tx1_updated_row.clone()).unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_updated_row, row); + db.commit_tx(tx1).unwrap(); + + let tx2 = db.begin_tx(); + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + db.commit_tx(tx2).unwrap(); + assert_eq!(tx1_updated_row, row); + db.drop_unused_row_versions(); +} + +#[test] +fn test_rollback() { + let db = test_db(); + let tx1 = db.begin_tx(); + let row1 = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, row1.clone()).unwrap(); + let row2 = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(row1, row2); + let row3 = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "World".to_string(), + }; + db.update(tx1, row3.clone()).unwrap(); + let row4 = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(row3, row4); + db.rollback_tx(tx1); + let tx2 = db.begin_tx(); + let row5 = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + assert_eq!(row5, None); +} + +#[test] +fn test_dirty_write() { + let db = test_db(); + + // T1 inserts a row with ID 1, but does not commit. + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); + + // T2 attempts to delete row with ID 1, but fails because T1 has not committed. + let tx2 = db.begin_tx(); + let tx2_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "World".to_string(), + }; + assert!(!db.update(tx2, tx2_row).unwrap()); + + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); +} + +#[test] +fn test_dirty_read() { + let db = test_db(); + + // T1 inserts a row with ID 1, but does not commit. + let tx1 = db.begin_tx(); + let row1 = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, row1).unwrap(); + + // T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed. + let tx2 = db.begin_tx(); + let row2 = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + assert_eq!(row2, None); +} + +#[test] +fn test_dirty_read_deleted() { + let db = test_db(); + + // T1 inserts a row with ID 1 and commits. + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + db.commit_tx(tx1).unwrap(); + + // T2 deletes row with ID 1, but does not commit. + let tx2 = db.begin_tx(); + assert!(db + .delete( + tx2, + RowID { + table_id: 1, + row_id: 1 + } + ) + .unwrap()); + + // T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed. + let tx3 = db.begin_tx(); + let row = db + .read( + tx3, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); +} + +#[test] +fn test_fuzzy_read() { + let db = test_db(); + + // T1 inserts a row with ID 1 and commits. + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); + db.commit_tx(tx1).unwrap(); + + // T2 reads the row with ID 1 within an active transaction. + let tx2 = db.begin_tx(); + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); + + // T3 updates the row and commits. + let tx3 = db.begin_tx(); + let tx3_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "World".to_string(), + }; + db.update(tx3, tx3_row).unwrap(); + db.commit_tx(tx3).unwrap(); + + // T2 still reads the same version of the row as before. + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); +} + +#[test] +fn test_lost_update() { + let db = test_db(); + + // T1 inserts a row with ID 1 and commits. + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); + db.commit_tx(tx1).unwrap(); + + // T2 attempts to update row ID 1 within an active transaction. + let tx2 = db.begin_tx(); + let tx2_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "World".to_string(), + }; + assert!(db.update(tx2, tx2_row.clone()).unwrap()); + + // T3 also attempts to update row ID 1 within an active transaction. + let tx3 = db.begin_tx(); + let tx3_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "Hello, world!".to_string(), + }; + assert_eq!( + Err(DatabaseError::WriteWriteConflict), + db.update(tx3, tx3_row) + ); + + db.commit_tx(tx2).unwrap(); + assert_eq!(Err(DatabaseError::TxTerminated), db.commit_tx(tx3)); + + let tx4 = db.begin_tx(); + let row = db + .read( + tx4, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx2_row, row); +} + +// Test for the visibility to check if a new transaction can see old committed values. +// This test checks for the typo present in the paper, explained in https://github.com/penberg/mvcc-rs/issues/15 +#[test] +fn test_committed_visibility() { + let db = test_db(); + + // let's add $10 to my account since I like money + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "10".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + db.commit_tx(tx1).unwrap(); + + // but I like more money, so let me try adding $10 more + let tx2 = db.begin_tx(); + let tx2_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "20".to_string(), + }; + assert!(db.update(tx2, tx2_row.clone()).unwrap()); + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(row, tx2_row); + + // can I check how much money I have? + let tx3 = db.begin_tx(); + let row = db + .read( + tx3, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(tx1_row, row); +} + +// Test to check if a older transaction can see (un)committed future rows +#[test] +fn test_future_row() { + let db = test_db(); + + let tx1 = db.begin_tx(); + + let tx2 = db.begin_tx(); + let tx2_row = Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "10".to_string(), + }; + db.insert(tx2, tx2_row).unwrap(); + + // transaction in progress, so tx1 shouldn't be able to see the value + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + assert_eq!(row, None); + + // lets commit the transaction and check if tx1 can see it + db.commit_tx(tx2).unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + assert_eq!(row, None); +} + +/* States described in the Hekaton paper *for serializability*: + +Table 1: Case analysis of action to take when version V’s +Begin field contains the ID of transaction TB +------------------------------------------------------------------------------------------------------ +TB’s state | TB’s end timestamp | Action to take when transaction T checks visibility of version V. +------------------------------------------------------------------------------------------------------ +Active | Not set | V is visible only if TB=T and V’s end timestamp equals infinity. +------------------------------------------------------------------------------------------------------ +Preparing | TS | V’s begin timestamp will be TS ut V is not yet committed. Use TS + | as V’s begin time when testing visibility. If the test is true, + | allow T to speculatively read V. Committed TS V’s begin timestamp + | will be TS and V is committed. Use TS as V’s begin time to test + | visibility. +------------------------------------------------------------------------------------------------------ +Committed | TS | V’s begin timestamp will be TS and V is committed. Use TS as V’s + | begin time to test visibility. +------------------------------------------------------------------------------------------------------ +Aborted | Irrelevant | Ignore V; it’s a garbage version. +------------------------------------------------------------------------------------------------------ +Terminated | Irrelevant | Reread V’s Begin field. TB has terminated so it must have finalized +or not found | | the timestamp. +------------------------------------------------------------------------------------------------------ + +Table 2: Case analysis of action to take when V's End field +contains a transaction ID TE. +------------------------------------------------------------------------------------------------------ +TE’s state | TE’s end timestamp | Action to take when transaction T checks visibility of a version V + | | as of read time RT. +------------------------------------------------------------------------------------------------------ +Active | Not set | V is visible only if TE is not T. +------------------------------------------------------------------------------------------------------ +Preparing | TS | V’s end timestamp will be TS provided that TE commits. If TS > RT, + | V is visible to T. If TS < RT, T speculatively ignores V. +------------------------------------------------------------------------------------------------------ +Committed | TS | V’s end timestamp will be TS and V is committed. Use TS as V’s end + | timestamp when testing visibility. +------------------------------------------------------------------------------------------------------ +Aborted | Irrelevant | V is visible. +------------------------------------------------------------------------------------------------------ +Terminated | Irrelevant | Reread V’s End field. TE has terminated so it must have finalized +or not found | | the timestamp. +*/ + +fn new_tx(tx_id: TxID, begin_ts: u64, state: TransactionState) -> RwLock { + let state = state.into(); + RwLock::new(Transaction { + state, + tx_id, + begin_ts, + write_set: SkipSet::new(), + read_set: SkipSet::new(), + }) +} + +#[test] +fn test_snapshot_isolation_tx_visible1() { + let txs: SkipMap> = SkipMap::from_iter([ + (1, new_tx(1, 1, TransactionState::Committed(2))), + (2, new_tx(2, 2, TransactionState::Committed(5))), + (3, new_tx(3, 3, TransactionState::Aborted)), + (5, new_tx(5, 5, TransactionState::Preparing)), + (6, new_tx(6, 6, TransactionState::Committed(10))), + (7, new_tx(7, 7, TransactionState::Active)), + ]); + + let current_tx = new_tx(4, 4, TransactionState::Preparing); + let current_tx = current_tx.read().unwrap(); + + let rv_visible = |begin: TxTimestampOrID, end: Option| { + let row_version = RowVersion { + begin, + end, + row: Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "testme".to_string(), + }, + }; + tracing::debug!("Testing visibility of {row_version:?}"); + is_version_visible(&txs, ¤t_tx, &row_version) + }; + + // begin visible: transaction committed with ts < current_tx.begin_ts + // end visible: inf + assert!(rv_visible(TxTimestampOrID::TxID(1), None)); + + // begin invisible: transaction committed with ts > current_tx.begin_ts + assert!(!rv_visible(TxTimestampOrID::TxID(2), None)); + + // begin invisible: transaction aborted + assert!(!rv_visible(TxTimestampOrID::TxID(3), None)); + + // begin visible: timestamp < current_tx.begin_ts + // end invisible: transaction committed with ts > current_tx.begin_ts + assert!(!rv_visible( + TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::TxID(1)) + )); + + // begin visible: timestamp < current_tx.begin_ts + // end visible: transaction committed with ts < current_tx.begin_ts + assert!(rv_visible( + TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::TxID(2)) + )); + + // begin visible: timestamp < current_tx.begin_ts + // end invisible: transaction aborted + assert!(!rv_visible( + TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::TxID(3)) + )); + + // begin invisible: transaction preparing + assert!(!rv_visible(TxTimestampOrID::TxID(5), None)); + + // begin invisible: transaction committed with ts > current_tx.begin_ts + assert!(!rv_visible(TxTimestampOrID::TxID(6), None)); + + // begin invisible: transaction active + assert!(!rv_visible(TxTimestampOrID::TxID(7), None)); + + // begin invisible: transaction committed with ts > current_tx.begin_ts + assert!(!rv_visible(TxTimestampOrID::TxID(6), None)); + + // begin invisible: transaction active + assert!(!rv_visible(TxTimestampOrID::TxID(7), None)); + + // begin visible: timestamp < current_tx.begin_ts + // end invisible: transaction preparing + assert!(!rv_visible( + TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::TxID(5)) + )); + + // begin invisible: timestamp > current_tx.begin_ts + assert!(!rv_visible( + TxTimestampOrID::Timestamp(6), + Some(TxTimestampOrID::TxID(6)) + )); + + // begin visible: timestamp < current_tx.begin_ts + // end visible: some active transaction will eventually overwrite this version, + // but that hasn't happened + // (this is the https://avi.im/blag/2023/hekaton-paper-typo/ case, I believe!) + assert!(rv_visible( + TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::TxID(7)) + )); +} diff --git a/core/mvcc/errors.rs b/core/mvcc/errors.rs new file mode 100644 index 000000000..6cdad8ca3 --- /dev/null +++ b/core/mvcc/errors.rs @@ -0,0 +1,13 @@ +use thiserror::Error; + +#[derive(Error, Debug, PartialEq)] +pub enum DatabaseError { + #[error("no such transaction ID: `{0}`")] + NoSuchTransactionID(u64), + #[error("transaction aborted because of a write-write conflict")] + WriteWriteConflict, + #[error("transaction is terminated")] + TxTerminated, + #[error("I/O error: {0}")] + Io(String), +} diff --git a/core/mvcc/mod.rs b/core/mvcc/mod.rs new file mode 100644 index 000000000..57e3c717e --- /dev/null +++ b/core/mvcc/mod.rs @@ -0,0 +1,158 @@ +//! Multiversion concurrency control (MVCC) for Rust. +//! +//! This module implements the main memory MVCC method outlined in the paper +//! "High-Performance Concurrency Control Mechanisms for Main-Memory Databases" +//! by Per-Åke Larson et al (VLDB, 2011). +//! +//! ## Data anomalies +//! +//! * A *dirty write* occurs when transaction T_m updates a value that is written by +//! transaction T_n but not yet committed. The MVCC algorithm prevents dirty +//! writes by validating that a row version is visible to transaction T_m before +//! allowing update to it. +//! +//! * A *dirty read* occurs when transaction T_m reads a value that was written by +//! transaction T_n but not yet committed. The MVCC algorithm prevents dirty +//! reads by validating that a row version is visible to transaction T_m. +//! +//! * A *fuzzy read* (non-repeatable read) occurs when transaction T_m reads a +//! different value in the course of the transaction because another +//! transaction T_n has updated the value. +//! +//! * A *lost update* occurs when transactions T_m and T_n both attempt to update +//! the same value, resulting in one of the updates being lost. The MVCC algorithm +//! prevents lost updates by detecting the write-write conflict and letting the +//! first-writer win by aborting the later transaction. +//! +//! TODO: phantom reads, cursor lost updates, read skew, write skew. +//! +//! ## TODO +//! +//! * Optimistic reads and writes +//! * Garbage collection + +pub mod clock; +pub mod cursor; +pub mod database; +pub mod errors; +pub mod persistent_storage; + +#[cfg(test)] +mod tests { + use crate::mvcc::clock::LocalClock; + use crate::mvcc::database::{Database, Row, RowID}; + use std::sync::atomic::AtomicU64; + use std::sync::atomic::Ordering; + use std::sync::Arc; + + static IDS: AtomicU64 = AtomicU64::new(1); + + #[test] + fn test_non_overlapping_concurrent_inserts() { + // Two threads insert to the database concurrently using non-overlapping + // row IDs. + let clock = LocalClock::default(); + let storage = crate::mvcc::persistent_storage::Storage::new_noop(); + let db = Arc::new(Database::new(clock, storage)); + let iterations = 100000; + + let th1 = { + let db = db.clone(); + std::thread::spawn(move || { + for _ in 0..iterations { + let tx = db.begin_tx(); + let id = IDS.fetch_add(1, Ordering::SeqCst); + let id = RowID { + table_id: 1, + row_id: id, + }; + let row = Row { + id, + data: "Hello".to_string(), + }; + db.insert(tx, row.clone()).unwrap(); + db.commit_tx(tx).unwrap(); + let tx = db.begin_tx(); + let committed_row = db.read(tx, id).unwrap(); + db.commit_tx(tx).unwrap(); + assert_eq!(committed_row, Some(row)); + } + }) + }; + let th2 = { + std::thread::spawn(move || { + for _ in 0..iterations { + let tx = db.begin_tx(); + let id = IDS.fetch_add(1, Ordering::SeqCst); + let id = RowID { + table_id: 1, + row_id: id, + }; + let row = Row { + id, + data: "World".to_string(), + }; + db.insert(tx, row.clone()).unwrap(); + db.commit_tx(tx).unwrap(); + let tx = db.begin_tx(); + let committed_row = db.read(tx, id).unwrap(); + db.commit_tx(tx).unwrap(); + assert_eq!(committed_row, Some(row)); + } + }) + }; + th1.join().unwrap(); + th2.join().unwrap(); + } + + #[test] + fn test_overlapping_concurrent_inserts_read_your_writes() { + let clock = LocalClock::default(); + let storage = crate::mvcc::persistent_storage::Storage::new_noop(); + let db = Arc::new(Database::new(clock, storage)); + let iterations = 100000; + + let work = |prefix: &'static str| { + let db = db.clone(); + std::thread::spawn(move || { + let mut failed_upserts = 0; + for i in 0..iterations { + if i % 1000 == 0 { + tracing::debug!("{prefix}: {i}"); + } + if i % 10000 == 0 { + let dropped = db.drop_unused_row_versions(); + tracing::debug!("garbage collected {dropped} versions"); + } + let tx = db.begin_tx(); + let id = i % 16; + let id = RowID { + table_id: 1, + row_id: id, + }; + let row = Row { + id, + data: format!("{prefix} @{tx}"), + }; + if let Err(e) = db.upsert(tx, row.clone()) { + tracing::trace!("upsert failed: {e}"); + failed_upserts += 1; + continue; + } + let committed_row = db.read(tx, id).unwrap(); + db.commit_tx(tx).unwrap(); + assert_eq!(committed_row, Some(row)); + } + tracing::info!( + "{prefix}'s failed upserts: {failed_upserts}/{iterations} {:.2}%", + (failed_upserts * 100) as f64 / iterations as f64 + ); + }) + }; + + let threads = vec![work("A"), work("B"), work("C"), work("D")]; + for th in threads { + th.join().unwrap(); + } + } +} diff --git a/core/mvcc/persistent_storage/mod.rs b/core/mvcc/persistent_storage/mod.rs new file mode 100644 index 000000000..3f9ff2171 --- /dev/null +++ b/core/mvcc/persistent_storage/mod.rs @@ -0,0 +1,32 @@ +use std::fmt::Debug; + +use crate::mvcc::database::{LogRecord, Result}; +use crate::mvcc::errors::DatabaseError; + +#[derive(Debug)] +pub enum Storage { + Noop, +} + +impl Storage { + pub fn new_noop() -> Self { + Self::Noop + } +} + +impl Storage { + pub fn log_tx(&self, _m: LogRecord) -> Result<()> { + match self { + Self::Noop => (), + } + Ok(()) + } + + pub fn read_tx_log(&self) -> Result>> { + match self { + Self::Noop => Err(DatabaseError::Io( + "cannot read from Noop storage".to_string(), + )), + } + } +} diff --git a/docs/internals/mvcc/DESIGN.md b/docs/internals/mvcc/DESIGN.md new file mode 100644 index 000000000..37943d992 --- /dev/null +++ b/docs/internals/mvcc/DESIGN.md @@ -0,0 +1,19 @@ +# Design + +## Persistent storage + +Persistent storage must implement the `Storage` trait that the MVCC module uses for transaction logging. + +Figure 1 shows an example of write-ahead log across three transactions. +The first transaction T0 executes a `INSERT (id) VALUES (1)` statement, which results in a log record with `id` set to `1`, begin timestamp to 0 (which is the transaction ID) and end timestamp as infinity (meaning the row version is still visible). +The second transaction T1 executes another `INSERT` statement, which adds another log record to the transaction log with `id` set to `2`, begin timesstamp to 1 and end timestamp as infinity, similar to what T0 did. +Finally, a third transaction T2 executes two statements: `DELETE WHERE id = 1` and `INSERT (id) VALUES (3)`. The first one results in a log record with `id` set to `1` and begin timestamp set to 0 (which is the transaction that created the entry). However, the end timestamp is now set to 2 (the current transaction), which means the entry is now deleted. +The second statement results in an entry in the transaction log similar to the `INSERT` statements in T0 and T1. + +![Transactions](figures/transactions.png) +

+Figure 1. Transaction log of three transactions. +

+ +When MVCC bootstraps or recovers, it simply redos the transaction log. +If the transaction log grows big, we can checkpoint it it by dropping all entries that are no longer visible after the the latest transaction and create a snapshot. diff --git a/docs/internals/mvcc/figures/transactions.excalidraw b/docs/internals/mvcc/figures/transactions.excalidraw new file mode 100644 index 000000000..cee1947f9 --- /dev/null +++ b/docs/internals/mvcc/figures/transactions.excalidraw @@ -0,0 +1,656 @@ +{ + "type": "excalidraw", + "version": 2, + "source": "https://excalidraw.com", + "elements": [ + { + "id": "tFvpBUMWe3qPFUTQVV14X", + "type": "text", + "x": 233.14035848761839, + "y": 205.73272444200816, + "width": 278.57781982421875, + "height": 25, + "angle": 0, + "strokeColor": "#087f5b", + "backgroundColor": "#82c91e", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "groupIds": [], + "roundness": null, + "seed": 94988319, + "version": 510, + "versionNonce": 1210831775, + "isDeleted": false, + "boundElements": null, + "updated": 1683370319070, + "link": null, + "locked": false, + "text": "", + "fontSize": 20, + "fontFamily": 1, + "textAlign": "left", + "verticalAlign": "top", + "baseline": 18, + "containerId": null, + "originalText": "", + "lineHeight": 1.25 + }, + { + "type": "text", + "version": 515, + "versionNonce": 1881893969, + "isDeleted": false, + "id": "7i88n1PIb89NxUbVQmTTi", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "angle": 0, + "x": 938.4614491858606, + "y": 311.23272444200813, + "strokeColor": "#0b7285", + "backgroundColor": "#82c91e", + "width": 279.0400085449219, + "height": 25, + "seed": 1123646321, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 1, + "text": "", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "", + "lineHeight": 1.25, + "baseline": 18 + }, + { + "type": "text", + "version": 556, + "versionNonce": 153125934, + "isDeleted": false, + "id": "Yh8XLtKqXUUYmcmG4SEXn", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "angle": 0, + "x": 581.1603475012903, + "y": 256.23272444200813, + "strokeColor": "#e67700", + "backgroundColor": "#82c91e", + "width": 270.71783447265625, + "height": 25, + "seed": 1685524017, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683371076075, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 1, + "text": "", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "", + "lineHeight": 1.25, + "baseline": 18 + }, + { + "id": "8l0CCJzCAtOLt_2GRcNpa", + "type": "text", + "x": 256.1403584876185, + "y": 409.73272444200813, + "width": 234.41998291015625, + "height": 75, + "angle": 0, + "strokeColor": "#087f5b", + "backgroundColor": "#82c91e", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "groupIds": [], + "roundness": null, + "seed": 583129809, + "version": 570, + "versionNonce": 561756721, + "isDeleted": false, + "boundElements": null, + "updated": 1683370316909, + "link": null, + "locked": false, + "text": "BEGIN\nINSERT (id) VALUEs (1)\nCOMMIT", + "fontSize": 20, + "fontFamily": 1, + "textAlign": "left", + "verticalAlign": "top", + "baseline": 68, + "containerId": null, + "originalText": "BEGIN\nINSERT (id) VALUEs (1)\nCOMMIT", + "lineHeight": 1.25 + }, + { + "type": "text", + "version": 628, + "versionNonce": 282656095, + "isDeleted": false, + "id": "3m7VluAP5tair6-60b_sp", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "angle": 0, + "x": 962.0903554358606, + "y": 416.23272444200813, + "strokeColor": "#0b7285", + "backgroundColor": "#82c91e", + "width": 243.91998291015625, + "height": 100, + "seed": 479705617, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 1, + "text": "BEGIN\nDELETE WHERE id =1\nINSERT (id) VALUES (3)\nCOMMIT", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "BEGIN\nDELETE WHERE id =1\nINSERT (id) VALUES (3)\nCOMMIT", + "lineHeight": 1.25, + "baseline": 93 + }, + { + "type": "text", + "version": 574, + "versionNonce": 1128746001, + "isDeleted": false, + "id": "Z-Mh1kti2oC6sIMnuGluo", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "angle": 0, + "x": 613.0903554358607, + "y": 417.23272444200813, + "strokeColor": "#e67700", + "backgroundColor": "#82c91e", + "width": 243.239990234375, + "height": 75, + "seed": 580440625, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 1, + "text": "BEGIN\nINSERT (id) VALUEs (2)\nCOMMIT", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "BEGIN\nINSERT (id) VALUEs (2)\nCOMMIT", + "lineHeight": 1.25, + "baseline": 68 + }, + { + "type": "line", + "version": 1502, + "versionNonce": 1835608607, + "isDeleted": false, + "id": "VuJNZCgz1Y0WEWwug7pGk", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 226.3083636621349, + "y": 173.11701218356845, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 20.336010349032712, + "height": 203.23377930246647, + "seed": 1879839231, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "startBinding": null, + "endBinding": null, + "lastCommittedPoint": null, + "startArrowhead": null, + "endArrowhead": null, + "points": [ + [ + 0, + 0 + ], + [ + -20.264781987976257, + -0.0011773927935071482 + ], + [ + -20.336010349032712, + 203.23260190967298 + ], + [ + -0.07239358683375485, + 203.135377672515 + ] + ] + }, + { + "type": "line", + "version": 1755, + "versionNonce": 1487752017, + "isDeleted": false, + "id": "GpZg3Rw4Hszxzxf38Q4Hn", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 3.141592653589793, + "x": 539.3083636621348, + "y": 178.11701218356845, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 20.336010349032712, + "height": 203.23377930246647, + "seed": 470135121, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "startBinding": null, + "endBinding": null, + "lastCommittedPoint": null, + "startArrowhead": null, + "endArrowhead": null, + "points": [ + [ + 0, + 0 + ], + [ + -20.264781987976257, + -0.0011773927935071482 + ], + [ + -20.336010349032712, + 203.23260190967298 + ], + [ + -0.07239358683375485, + 203.135377672515 + ] + ] + }, + { + "type": "text", + "version": 528, + "versionNonce": 1276939839, + "isDeleted": false, + "id": "AGEyNvBxBm2cwm1WRW8n8", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "angle": 0, + "x": 576.6403584876185, + "y": 210.23272444200816, + "strokeColor": "#087f5b", + "backgroundColor": "#82c91e", + "width": 278.57781982421875, + "height": 25, + "seed": 877528401, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 1, + "text": "", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "", + "lineHeight": 1.25, + "baseline": 18 + }, + { + "type": "line", + "version": 1557, + "versionNonce": 773679889, + "isDeleted": false, + "id": "Q8E0gAcLvq6VXqMDZhLdA", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 581.8083636621351, + "y": 177.61701218356845, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 20.336010349032712, + "height": 203.23377930246647, + "seed": 153279217, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "startBinding": null, + "endBinding": null, + "lastCommittedPoint": null, + "startArrowhead": null, + "endArrowhead": null, + "points": [ + [ + 0, + 0 + ], + [ + -20.264781987976257, + -0.0011773927935071482 + ], + [ + -20.336010349032712, + 203.23260190967298 + ], + [ + -0.07239358683375485, + 203.135377672515 + ] + ] + }, + { + "type": "line", + "version": 1810, + "versionNonce": 1561283199, + "isDeleted": false, + "id": "uhh3ZkPO6bwwf0-AI8syI", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 3.141592653589793, + "x": 894.8083636621349, + "y": 182.61701218356845, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 20.336010349032712, + "height": 203.23377930246647, + "seed": 315380945, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "startBinding": null, + "endBinding": null, + "lastCommittedPoint": null, + "startArrowhead": null, + "endArrowhead": null, + "points": [ + [ + 0, + 0 + ], + [ + -20.264781987976257, + -0.0011773927935071482 + ], + [ + -20.336010349032712, + 203.23260190967298 + ], + [ + -0.07239358683375485, + 203.135377672515 + ] + ] + }, + { + "type": "text", + "version": 575, + "versionNonce": 910156017, + "isDeleted": false, + "id": "jI5YKyaOdGYYKiBWZmCMs", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "angle": 0, + "x": 929.6403584876182, + "y": 215.23272444200813, + "strokeColor": "#087f5b", + "backgroundColor": "#82c91e", + "width": 278.57781982421875, + "height": 25, + "seed": 121503167, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 1, + "text": "", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "", + "lineHeight": 1.25, + "baseline": 18 + }, + { + "type": "line", + "version": 1604, + "versionNonce": 19920575, + "isDeleted": false, + "id": "QqIk7VTnRWYq499wkttvv", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 934.8083636621348, + "y": 182.61701218356842, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 20.336010349032712, + "height": 203.23377930246647, + "seed": 2012037663, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "startBinding": null, + "endBinding": null, + "lastCommittedPoint": null, + "startArrowhead": null, + "endArrowhead": null, + "points": [ + [ + 0, + 0 + ], + [ + -20.264781987976257, + -0.0011773927935071482 + ], + [ + -20.336010349032712, + 203.23260190967298 + ], + [ + -0.07239358683375485, + 203.135377672515 + ] + ] + }, + { + "type": "line", + "version": 1857, + "versionNonce": 1660885169, + "isDeleted": false, + "id": "gk89VsYpnf9Jby9KEUBd3", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 3.141592653589793, + "x": 1247.808363662135, + "y": 187.61701218356842, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 20.336010349032712, + "height": 203.23377930246647, + "seed": 509453887, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370316909, + "link": null, + "locked": false, + "startBinding": null, + "endBinding": null, + "lastCommittedPoint": null, + "startArrowhead": null, + "endArrowhead": null, + "points": [ + [ + 0, + 0 + ], + [ + -20.264781987976257, + -0.0011773927935071482 + ], + [ + -20.336010349032712, + 203.23260190967298 + ], + [ + -0.07239358683375485, + 203.135377672515 + ] + ] + }, + { + "type": "text", + "version": 620, + "versionNonce": 1588681010, + "isDeleted": false, + "id": "a1c-iZI0SafCiy0u4xieZ", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "angle": 0, + "x": 934.3714375891809, + "y": 261.23272444200813, + "strokeColor": "#e67700", + "backgroundColor": "#82c91e", + "width": 270.71783447265625, + "height": 25, + "seed": 1742829553, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683371080181, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 1, + "text": "", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "", + "lineHeight": 1.25, + "baseline": 18 + }, + { + "type": "text", + "version": 564, + "versionNonce": 1968863633, + "isDeleted": false, + "id": "hdhhgp5nA06o5EcSgHQE8", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 1, + "opacity": 100, + "angle": 0, + "x": 937.6203542151575, + "y": 354.23272444200813, + "strokeColor": "#0b7285", + "backgroundColor": "#82c91e", + "width": 287.73785400390625, + "height": 25, + "seed": 309558367, + "groupIds": [], + "roundness": null, + "boundElements": [], + "updated": 1683370363648, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 1, + "text": "", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "", + "lineHeight": 1.25, + "baseline": 18 + } + ], + "appState": { + "gridSize": null, + "viewBackgroundColor": "#ffffff" + }, + "files": {} +} \ No newline at end of file diff --git a/docs/internals/mvcc/figures/transactions.png b/docs/internals/mvcc/figures/transactions.png new file mode 100644 index 000000000..3b8fe59bc Binary files /dev/null and b/docs/internals/mvcc/figures/transactions.png differ