From 02f40c05686751465132eec4ce1de4dd55e34e91 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Sun, 9 Apr 2023 08:55:06 +0300 Subject: [PATCH] Move MVCC to database.rs Let's keep lib.rs small and tidy. --- core/mvcc/database/src/database.rs | 642 ++++++++++++++++++++++++++++ core/mvcc/database/src/lib.rs | 644 +---------------------------- 2 files changed, 643 insertions(+), 643 deletions(-) create mode 100644 core/mvcc/database/src/database.rs diff --git a/core/mvcc/database/src/database.rs b/core/mvcc/database/src/database.rs new file mode 100644 index 000000000..cdebed823 --- /dev/null +++ b/core/mvcc/database/src/database.rs @@ -0,0 +1,642 @@ +use crate::errors::DatabaseError; +use std::cell::RefCell; +use std::collections::{HashMap, HashSet}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; + +type Result = std::result::Result; + +#[derive(Clone, Debug, PartialEq)] +pub struct Row { + pub id: u64, + pub data: String, +} + +/// A row version. +#[derive(Clone, Debug)] +struct RowVersion { + begin: TxTimestampOrID, + end: Option, + row: Row, +} + +/// A transaction timestamp or ID. +/// +/// Versions either track a timestamp or a transaction ID, depending on the +/// phase of the transaction. During the active phase, new versions track the +/// transaction ID in the `begin` and `end` fields. After a transaction commits, +/// versions switch to tracking timestamps. +#[derive(Clone, Debug, PartialEq)] +enum TxTimestampOrID { + Timestamp(u64), + TxID(u64), +} + +/// Transaction +#[derive(Debug, Clone)] +pub struct Transaction { + /// The state of the transaction. + state: TransactionState, + /// The transaction ID. + tx_id: u64, + /// The transaction begin timestamp. + begin_ts: u64, + /// The transaction write set. + write_set: HashSet, + /// The transaction read set. + read_set: RefCell>, +} + +impl Transaction { + fn new(tx_id: u64, begin_ts: u64) -> Transaction { + Transaction { + state: TransactionState::Active, + tx_id, + begin_ts, + write_set: HashSet::new(), + read_set: RefCell::new(HashSet::new()), + } + } + + fn insert_to_read_set(&self, id: u64) { + let mut read_set = self.read_set.borrow_mut(); + read_set.insert(id); + } + + fn insert_to_write_set(&mut self, id: u64) { + self.write_set.insert(id); + } +} + +/// Transaction state. +#[derive(Debug, Clone)] +enum TransactionState { + Active, + Preparing, + Committed, + Aborted, + Terminated, +} + +/// A database with MVCC. +#[derive(Debug)] +pub struct Database { + inner: Arc>>, +} + +type TxID = u64; + +/// Logical clock. +pub trait LogicalClock { + fn get_timestamp(&self) -> u64; +} + +/// A node-local clock backed by an atomic counter. +#[derive(Debug, Default)] +pub struct LocalClock { + ts_sequence: AtomicU64, +} + +impl LocalClock { + pub fn new() -> Self { + Self { + ts_sequence: AtomicU64::new(0), + } + } +} + +impl LogicalClock for LocalClock { + fn get_timestamp(&self) -> u64 { + self.ts_sequence.fetch_add(1, Ordering::SeqCst) + } +} + +#[derive(Debug)] +pub struct DatabaseInner { + rows: RefCell>>, + txs: RefCell>, + tx_ids: AtomicU64, + clock: Clock, +} + +impl Database { + /// Creates a new database. + pub fn new(clock: Clock) -> Self { + let inner = DatabaseInner { + rows: RefCell::new(HashMap::new()), + txs: RefCell::new(HashMap::new()), + tx_ids: AtomicU64::new(0), + clock, + }; + Self { + inner: Arc::new(Mutex::new(inner)), + } + } + + /// Inserts a new row into the database. + /// + /// This function inserts a new `row` into the database within the context + /// of the transaction `tx_id`. + /// + /// # Arguments + /// + /// * `tx_id` - the ID of the transaction in which to insert the new row. + /// * `row` - the row object containing the values to be inserted. + /// + pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { + let inner = self.inner.lock().unwrap(); + let mut txs = inner.txs.borrow_mut(); + let tx = txs + .get_mut(&tx_id) + .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; + let id = row.id; + let row_version = RowVersion { + begin: TxTimestampOrID::TxID(tx.tx_id), + end: None, + row, + }; + let mut rows = inner.rows.borrow_mut(); + rows.entry(id).or_insert_with(Vec::new).push(row_version); + tx.insert_to_write_set(id); + Ok(()) + } + + /// Updates a row in the database with new values. + /// + /// This function updates an existing row in the database within the + /// context of the transaction `tx_id`. The `row` argument identifies the + /// row to be updated as `id` and contains the new values to be inserted. + /// + /// If the row identified by the `id` does not exist, this function does + /// nothing and returns `false`. Otherwise, the function updates the row + /// with the new values and returns `true`. + /// + /// # Arguments + /// + /// * `tx_id` - the ID of the transaction in which to update the new row. + /// * `row` - the row object containing the values to be updated. + /// + /// # Returns + /// + /// Returns `true` if the row was successfully updated, and `false` otherwise. + pub fn update(&self, tx_id: TxID, row: Row) -> Result { + if !self.delete(tx_id, row.id)? { + return Ok(false); + } + self.insert(tx_id, row)?; + Ok(true) + } + + /// Deletes a row from the table with the given `id`. + /// + /// This function deletes an existing row `id` in the database within the + /// context of the transaction `tx_id`. + /// + /// # Arguments + /// + /// * `tx_id` - the ID of the transaction in which to delete the new row. + /// * `id` - the ID of the row to delete. + /// + /// # Returns + /// + /// Returns `true` if the row was successfully deleted, and `false` otherwise. + /// + pub fn delete(&self, tx: TxID, id: u64) -> Result { + let inner = self.inner.lock().unwrap(); + let mut rows = inner.rows.borrow_mut(); + let mut txs = inner.txs.borrow_mut(); + match rows.get_mut(&id) { + Some(row_versions) => match row_versions.last_mut() { + Some(v) => { + let tx = txs.get(&tx).ok_or(DatabaseError::NoSuchTransactionID(tx))?; + if is_version_visible(&txs, tx, v) { + v.end = Some(TxTimestampOrID::TxID(tx.tx_id)); + } else { + return Ok(false); + } + } + None => unreachable!("no versions for row {}", id), + }, + None => return Ok(false), + } + let tx = txs + .get_mut(&tx) + .ok_or(DatabaseError::NoSuchTransactionID(tx))?; + tx.insert_to_write_set(id); + Ok(true) + } + + /// Retrieves a row from the table with the given `id`. + /// + /// This operation is performed within the scope of the transaction identified + /// by `tx_id`. + /// + /// # Arguments + /// + /// * `tx_id` - The ID of the transaction to perform the read operation in. + /// * `id` - The ID of the row to retrieve. + /// + /// # Returns + /// + /// Returns `Some(row)` with the row data if the row with the given `id` exists, + /// and `None` otherwise. + pub fn read(&self, tx_id: TxID, id: u64) -> Result> { + let inner = self.inner.lock().unwrap(); + let txs = inner.txs.borrow_mut(); + let tx = txs.get(&tx_id).unwrap(); + let rows = inner.rows.borrow(); + if let Some(row_versions) = rows.get(&id) { + for rv in row_versions.iter().rev() { + if is_version_visible(&txs, tx, rv) { + tx.insert_to_read_set(id); + return Ok(Some(rv.row.clone())); + } + } + } + Ok(None) + } + + /// Begins a new transaction in the database. + /// + /// This function starts a new transaction in the database and returns a `TxID` value + /// that you can use to perform operations within the transaction. All changes made within the + /// transaction are isolated from other transactions until you commit the transaction. + pub fn begin_tx(&self) -> TxID { + let mut inner = self.inner.lock().unwrap(); + let tx_id = get_tx_id(&mut inner); + let begin_ts = get_timestamp(&mut inner); + let tx = Transaction::new(tx_id, begin_ts); + let mut txs = inner.txs.borrow_mut(); + txs.insert(tx_id, tx); + tx_id + } + + /// Commits a transaction with the specified transaction ID. + /// + /// This function commits the changes made within the specified transaction and finalizes the + /// transaction. Once a transaction has been committed, all changes made within the transaction + /// are visible to other transactions that access the same data. + /// + /// # Arguments + /// + /// * `tx_id` - The ID of the transaction to commit. + pub fn commit_tx(&self, tx_id: TxID) { + let mut inner = self.inner.lock().unwrap(); + let end_ts = get_timestamp(&mut inner); + let mut txs = inner.txs.borrow_mut(); + let mut tx = txs.get_mut(&tx_id).unwrap(); + let mut rows = inner.rows.borrow_mut(); + tx.state = TransactionState::Preparing; + for id in &tx.write_set { + if let Some(row_versions) = rows.get_mut(id) { + for row_version in row_versions.iter_mut() { + if let TxTimestampOrID::TxID(id) = row_version.begin { + if id == tx_id { + row_version.begin = TxTimestampOrID::Timestamp(tx.begin_ts); + } + } + if let Some(TxTimestampOrID::TxID(id)) = row_version.end { + if id == tx_id { + row_version.end = Some(TxTimestampOrID::Timestamp(end_ts)); + } + } + } + } + } + tx.state = TransactionState::Committed; + } + + /// Rolls back a transaction with the specified ID. + /// + /// This function rolls back a transaction with the specified `tx_id` by + /// discarding any changes made by the transaction. + /// + /// # Arguments + /// + /// * `tx_id` - The ID of the transaction to abort. + pub fn rollback_tx(&self, tx_id: TxID) { + let inner = self.inner.lock().unwrap(); + let mut txs = inner.txs.borrow_mut(); + let mut tx = txs.get_mut(&tx_id).unwrap(); + tx.state = TransactionState::Aborted; + let mut rows = inner.rows.borrow_mut(); + for id in &tx.write_set { + if let Some(row_versions) = rows.get_mut(id) { + row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id)); + if row_versions.is_empty() { + rows.remove(id); + } + } + } + tx.state = TransactionState::Terminated; + } +} + +fn is_version_visible(txs: &HashMap, tx: &Transaction, rv: &RowVersion) -> bool { + is_begin_visible(txs, tx, rv) && is_end_visible(txs, tx, rv) +} + +fn is_begin_visible(txs: &HashMap, tx: &Transaction, rv: &RowVersion) -> bool { + match rv.begin { + TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts, + TxTimestampOrID::TxID(rv_begin) => { + let tb = txs.get(&rv_begin).unwrap(); + match tb.state { + TransactionState::Active => tx.tx_id == tb.tx_id && rv.end.is_none(), + TransactionState::Preparing => todo!(), + TransactionState::Committed => todo!(), + TransactionState::Aborted => todo!(), + TransactionState::Terminated => todo!(), + } + } + } +} + +fn is_end_visible(txs: &HashMap, tx: &Transaction, rv: &RowVersion) -> bool { + match rv.end { + Some(TxTimestampOrID::Timestamp(rv_end_ts)) => tx.begin_ts < rv_end_ts, + Some(TxTimestampOrID::TxID(rv_end)) => { + let te = txs.get(&rv_end).unwrap(); + match te.state { + TransactionState::Active => tx.tx_id == te.tx_id && rv.end.is_none(), + TransactionState::Preparing => todo!(), + TransactionState::Committed => todo!(), + TransactionState::Aborted => todo!(), + TransactionState::Terminated => todo!(), + } + } + None => true, + } +} + +fn get_tx_id(inner: &mut DatabaseInner) -> u64 { + inner.tx_ids.fetch_add(1, Ordering::SeqCst) +} + +fn get_timestamp(inner: &mut DatabaseInner) -> u64 { + inner.clock.get_timestamp() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_insert_read() { + let clock = LocalClock::default(); + let db = Database::new(clock); + + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: 1, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db.read(tx1, 1).unwrap().unwrap(); + assert_eq!(tx1_row, row); + db.commit_tx(tx1); + + let tx2 = db.begin_tx(); + let row = db.read(tx2, 1).unwrap().unwrap(); + assert_eq!(tx1_row, row); + } + + #[test] + fn test_read_nonexistent() { + let clock = LocalClock::default(); + let db = Database::new(clock); + let tx = db.begin_tx(); + let row = db.read(tx, 1); + assert!(row.unwrap().is_none()); + } + + #[test] + fn test_delete() { + let clock = LocalClock::default(); + let db = Database::new(clock); + + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: 1, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db.read(tx1, 1).unwrap().unwrap(); + assert_eq!(tx1_row, row); + db.delete(tx1, 1).unwrap(); + let row = db.read(tx1, 1).unwrap(); + assert!(row.is_none()); + db.commit_tx(tx1); + + let tx2 = db.begin_tx(); + let row = db.read(tx2, 1).unwrap(); + assert!(row.is_none()); + } + + #[test] + fn test_delete_nonexistent() { + let clock = LocalClock::default(); + let db = Database::new(clock); + let tx = db.begin_tx(); + assert_eq!(false, db.delete(tx, 1).unwrap()); + } + + #[test] + fn test_commit() { + let clock = LocalClock::default(); + let db = Database::new(clock); + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: 1, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db.read(tx1, 1).unwrap().unwrap(); + assert_eq!(tx1_row, row); + let tx1_updated_row = Row { + id: 1, + data: "World".to_string(), + }; + db.update(tx1, tx1_updated_row.clone()).unwrap(); + let row = db.read(tx1, 1).unwrap().unwrap(); + assert_eq!(tx1_updated_row, row); + db.commit_tx(tx1); + + let tx2 = db.begin_tx(); + let row = db.read(tx2, 1).unwrap().unwrap(); + db.commit_tx(tx2); + assert_eq!(tx1_updated_row, row); + } + + #[test] + fn test_rollback() { + let clock = LocalClock::default(); + let db = Database::new(clock); + let tx1 = db.begin_tx(); + let row1 = Row { + id: 1, + data: "Hello".to_string(), + }; + db.insert(tx1.clone(), row1.clone()).unwrap(); + let row2 = db.read(tx1.clone(), 1).unwrap().unwrap(); + assert_eq!(row1, row2); + let row3 = Row { + id: 1, + data: "World".to_string(), + }; + db.update(tx1.clone(), row3.clone()).unwrap(); + let row4 = db.read(tx1.clone(), 1).unwrap().unwrap(); + assert_eq!(row3, row4); + db.rollback_tx(tx1); + let tx2 = db.begin_tx(); + let row5 = db.read(tx2.clone(), 1).unwrap(); + assert_eq!(row5, None); + } + + #[test] + fn test_dirty_write() { + let clock = LocalClock::default(); + let db = Database::new(clock); + + // T1 inserts a row with ID 1, but does not commit. + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: 1, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db.read(tx1.clone(), 1).unwrap().unwrap(); + assert_eq!(tx1_row, row); + + // T2 attempts to delete row with ID 1, but fails because T1 has not committed. + let tx2 = db.begin_tx(); + let tx2_row = Row { + id: 1, + data: "World".to_string(), + }; + assert_eq!(false, db.update(tx2, tx2_row.clone()).unwrap()); + + let row = db.read(tx1, 1).unwrap().unwrap(); + assert_eq!(tx1_row, row); + } + + #[test] + fn test_dirty_read() { + let clock = LocalClock::default(); + let db = Database::new(clock); + + // T1 inserts a row with ID 1, but does not commit. + let tx1 = db.begin_tx(); + let row1 = Row { + id: 1, + data: "Hello".to_string(), + }; + db.insert(tx1, row1.clone()).unwrap(); + + // T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed. + let tx2 = db.begin_tx(); + let row2 = db.read(tx2, 1).unwrap(); + assert_eq!(row2, None); + } + + #[ignore] + #[test] + fn test_dirty_read_deleted() { + let clock = LocalClock::default(); + let db = Database::new(clock); + + // T1 inserts a row with ID 1 and commits. + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: 1, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + db.commit_tx(tx1); + + // T2 deletes row with ID 1, but does not commit. + let tx2 = db.begin_tx(); + assert_eq!(true, db.delete(tx2, 1).unwrap()); + + // T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed. + let tx3 = db.begin_tx(); + let row = db.read(tx3, 1).unwrap().unwrap(); + assert_eq!(tx1_row, row); + } + + #[test] + fn test_fuzzy_read() { + let clock = LocalClock::default(); + let db = Database::new(clock); + + // T1 inserts a row with ID 1 and commits. + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: 1, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db.read(tx1.clone(), 1).unwrap().unwrap(); + assert_eq!(tx1_row, row); + db.commit_tx(tx1); + + // T2 reads the row with ID 1 within an active transaction. + let tx2 = db.begin_tx(); + let row = db.read(tx2, 1).unwrap().unwrap(); + assert_eq!(tx1_row, row); + + // T3 updates the row and commits. + let tx3 = db.begin_tx(); + let tx3_row = Row { + id: 1, + data: "World".to_string(), + }; + db.update(tx3, tx3_row.clone()).unwrap(); + db.commit_tx(tx3); + + // T2 still reads the same version of the row as before. + let row = db.read(tx2, 1).unwrap().unwrap(); + assert_eq!(tx1_row, row); + } + + #[ignore] + #[test] + fn test_lost_update() { + let clock = LocalClock::default(); + let db = Database::new(clock); + + // T1 inserts a row with ID 1 and commits. + let tx1 = db.begin_tx(); + let tx1_row = Row { + id: 1, + data: "Hello".to_string(), + }; + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db.read(tx1.clone(), 1).unwrap().unwrap(); + assert_eq!(tx1_row, row); + db.commit_tx(tx1); + + // T2 attempts to update row ID 1 within an active transaction. + let tx2 = db.begin_tx(); + let tx2_row = Row { + id: 1, + data: "World".to_string(), + }; + db.update(tx2, tx2_row.clone()).unwrap(); + + // T3 also attempts to update row ID 1 within an active transaction. + let tx3 = db.begin_tx(); + let tx3_row = Row { + id: 1, + data: "Hello, world!".to_string(), + }; + db.update(tx3, tx3_row.clone()).unwrap(); + + db.commit_tx(tx2); + db.commit_tx(tx3); // TODO: this should fail + + let tx4 = db.begin_tx(); + let row = db.read(tx4, 1).unwrap().unwrap(); + assert_eq!(tx2_row, row); + } +} \ No newline at end of file diff --git a/core/mvcc/database/src/lib.rs b/core/mvcc/database/src/lib.rs index 550b91d22..534e2ec26 100644 --- a/core/mvcc/database/src/lib.rs +++ b/core/mvcc/database/src/lib.rs @@ -32,646 +32,4 @@ //! * Garbage collection pub mod errors; - -use crate::errors::DatabaseError; -use std::cell::RefCell; -use std::collections::{HashMap, HashSet}; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::{Arc, Mutex}; - -type Result = std::result::Result; - -#[derive(Clone, Debug, PartialEq)] -pub struct Row { - pub id: u64, - pub data: String, -} - -/// A row version. -#[derive(Clone, Debug)] -struct RowVersion { - begin: TxTimestampOrID, - end: Option, - row: Row, -} - -/// A transaction timestamp or ID. -/// -/// Versions either track a timestamp or a transaction ID, depending on the -/// phase of the transaction. During the active phase, new versions track the -/// transaction ID in the `begin` and `end` fields. After a transaction commits, -/// versions switch to tracking timestamps. -#[derive(Clone, Debug, PartialEq)] -enum TxTimestampOrID { - Timestamp(u64), - TxID(u64), -} - -/// Transaction -#[derive(Debug, Clone)] -pub struct Transaction { - /// The state of the transaction. - state: TransactionState, - /// The transaction ID. - tx_id: u64, - /// The transaction begin timestamp. - begin_ts: u64, - /// The transaction write set. - write_set: HashSet, - /// The transaction read set. - read_set: RefCell>, -} - -impl Transaction { - fn new(tx_id: u64, begin_ts: u64) -> Transaction { - Transaction { - state: TransactionState::Active, - tx_id, - begin_ts, - write_set: HashSet::new(), - read_set: RefCell::new(HashSet::new()), - } - } - - fn insert_to_read_set(&self, id: u64) { - let mut read_set = self.read_set.borrow_mut(); - read_set.insert(id); - } - - fn insert_to_write_set(&mut self, id: u64) { - self.write_set.insert(id); - } -} - -/// Transaction state. -#[derive(Debug, Clone)] -enum TransactionState { - Active, - Preparing, - Committed, - Aborted, - Terminated, -} - -/// A database with MVCC. -#[derive(Debug)] -pub struct Database { - inner: Arc>>, -} - -type TxID = u64; - -/// Logical clock. -pub trait LogicalClock { - fn get_timestamp(&self) -> u64; -} - -/// A node-local clock backed by an atomic counter. -#[derive(Debug, Default)] -pub struct LocalClock { - ts_sequence: AtomicU64, -} - -impl LocalClock { - pub fn new() -> Self { - Self { - ts_sequence: AtomicU64::new(0), - } - } -} - -impl LogicalClock for LocalClock { - fn get_timestamp(&self) -> u64 { - self.ts_sequence.fetch_add(1, Ordering::SeqCst) - } -} - -#[derive(Debug)] -pub struct DatabaseInner { - rows: RefCell>>, - txs: RefCell>, - tx_ids: AtomicU64, - clock: Clock, -} - -impl Database { - /// Creates a new database. - pub fn new(clock: Clock) -> Self { - let inner = DatabaseInner { - rows: RefCell::new(HashMap::new()), - txs: RefCell::new(HashMap::new()), - tx_ids: AtomicU64::new(0), - clock, - }; - Self { - inner: Arc::new(Mutex::new(inner)), - } - } - - /// Inserts a new row into the database. - /// - /// This function inserts a new `row` into the database within the context - /// of the transaction `tx_id`. - /// - /// # Arguments - /// - /// * `tx_id` - the ID of the transaction in which to insert the new row. - /// * `row` - the row object containing the values to be inserted. - /// - pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { - let inner = self.inner.lock().unwrap(); - let mut txs = inner.txs.borrow_mut(); - let tx = txs - .get_mut(&tx_id) - .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; - let id = row.id; - let row_version = RowVersion { - begin: TxTimestampOrID::TxID(tx.tx_id), - end: None, - row, - }; - let mut rows = inner.rows.borrow_mut(); - rows.entry(id).or_insert_with(Vec::new).push(row_version); - tx.insert_to_write_set(id); - Ok(()) - } - - /// Updates a row in the database with new values. - /// - /// This function updates an existing row in the database within the - /// context of the transaction `tx_id`. The `row` argument identifies the - /// row to be updated as `id` and contains the new values to be inserted. - /// - /// If the row identified by the `id` does not exist, this function does - /// nothing and returns `false`. Otherwise, the function updates the row - /// with the new values and returns `true`. - /// - /// # Arguments - /// - /// * `tx_id` - the ID of the transaction in which to update the new row. - /// * `row` - the row object containing the values to be updated. - /// - /// # Returns - /// - /// Returns `true` if the row was successfully updated, and `false` otherwise. - pub fn update(&self, tx_id: TxID, row: Row) -> Result { - if !self.delete(tx_id, row.id)? { - return Ok(false); - } - self.insert(tx_id, row)?; - Ok(true) - } - - /// Deletes a row from the table with the given `id`. - /// - /// This function deletes an existing row `id` in the database within the - /// context of the transaction `tx_id`. - /// - /// # Arguments - /// - /// * `tx_id` - the ID of the transaction in which to delete the new row. - /// * `id` - the ID of the row to delete. - /// - /// # Returns - /// - /// Returns `true` if the row was successfully deleted, and `false` otherwise. - /// - pub fn delete(&self, tx: TxID, id: u64) -> Result { - let inner = self.inner.lock().unwrap(); - let mut rows = inner.rows.borrow_mut(); - let mut txs = inner.txs.borrow_mut(); - match rows.get_mut(&id) { - Some(row_versions) => match row_versions.last_mut() { - Some(v) => { - let tx = txs.get(&tx).ok_or(DatabaseError::NoSuchTransactionID(tx))?; - if is_version_visible(&txs, tx, v) { - v.end = Some(TxTimestampOrID::TxID(tx.tx_id)); - } else { - return Ok(false); - } - } - None => unreachable!("no versions for row {}", id), - }, - None => return Ok(false), - } - let tx = txs - .get_mut(&tx) - .ok_or(DatabaseError::NoSuchTransactionID(tx))?; - tx.insert_to_write_set(id); - Ok(true) - } - - /// Retrieves a row from the table with the given `id`. - /// - /// This operation is performed within the scope of the transaction identified - /// by `tx_id`. - /// - /// # Arguments - /// - /// * `tx_id` - The ID of the transaction to perform the read operation in. - /// * `id` - The ID of the row to retrieve. - /// - /// # Returns - /// - /// Returns `Some(row)` with the row data if the row with the given `id` exists, - /// and `None` otherwise. - pub fn read(&self, tx_id: TxID, id: u64) -> Result> { - let inner = self.inner.lock().unwrap(); - let txs = inner.txs.borrow_mut(); - let tx = txs.get(&tx_id).unwrap(); - let rows = inner.rows.borrow(); - if let Some(row_versions) = rows.get(&id) { - for rv in row_versions.iter().rev() { - if is_version_visible(&txs, tx, rv) { - tx.insert_to_read_set(id); - return Ok(Some(rv.row.clone())); - } - } - } - Ok(None) - } - - /// Begins a new transaction in the database. - /// - /// This function starts a new transaction in the database and returns a `TxID` value - /// that you can use to perform operations within the transaction. All changes made within the - /// transaction are isolated from other transactions until you commit the transaction. - pub fn begin_tx(&self) -> TxID { - let mut inner = self.inner.lock().unwrap(); - let tx_id = get_tx_id(&mut inner); - let begin_ts = get_timestamp(&mut inner); - let tx = Transaction::new(tx_id, begin_ts); - let mut txs = inner.txs.borrow_mut(); - txs.insert(tx_id, tx); - tx_id - } - - /// Commits a transaction with the specified transaction ID. - /// - /// This function commits the changes made within the specified transaction and finalizes the - /// transaction. Once a transaction has been committed, all changes made within the transaction - /// are visible to other transactions that access the same data. - /// - /// # Arguments - /// - /// * `tx_id` - The ID of the transaction to commit. - pub fn commit_tx(&self, tx_id: TxID) { - let mut inner = self.inner.lock().unwrap(); - let end_ts = get_timestamp(&mut inner); - let mut txs = inner.txs.borrow_mut(); - let mut tx = txs.get_mut(&tx_id).unwrap(); - let mut rows = inner.rows.borrow_mut(); - tx.state = TransactionState::Preparing; - for id in &tx.write_set { - if let Some(row_versions) = rows.get_mut(id) { - for row_version in row_versions.iter_mut() { - if let TxTimestampOrID::TxID(id) = row_version.begin { - if id == tx_id { - row_version.begin = TxTimestampOrID::Timestamp(tx.begin_ts); - } - } - if let Some(TxTimestampOrID::TxID(id)) = row_version.end { - if id == tx_id { - row_version.end = Some(TxTimestampOrID::Timestamp(end_ts)); - } - } - } - } - } - tx.state = TransactionState::Committed; - } - - /// Rolls back a transaction with the specified ID. - /// - /// This function rolls back a transaction with the specified `tx_id` by - /// discarding any changes made by the transaction. - /// - /// # Arguments - /// - /// * `tx_id` - The ID of the transaction to abort. - pub fn rollback_tx(&self, tx_id: TxID) { - let inner = self.inner.lock().unwrap(); - let mut txs = inner.txs.borrow_mut(); - let mut tx = txs.get_mut(&tx_id).unwrap(); - tx.state = TransactionState::Aborted; - let mut rows = inner.rows.borrow_mut(); - for id in &tx.write_set { - if let Some(row_versions) = rows.get_mut(id) { - row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id)); - if row_versions.is_empty() { - rows.remove(id); - } - } - } - tx.state = TransactionState::Terminated; - } -} - -fn is_version_visible(txs: &HashMap, tx: &Transaction, rv: &RowVersion) -> bool { - is_begin_visible(txs, tx, rv) && is_end_visible(txs, tx, rv) -} - -fn is_begin_visible(txs: &HashMap, tx: &Transaction, rv: &RowVersion) -> bool { - match rv.begin { - TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts, - TxTimestampOrID::TxID(rv_begin) => { - let tb = txs.get(&rv_begin).unwrap(); - match tb.state { - TransactionState::Active => tx.tx_id == tb.tx_id && rv.end.is_none(), - TransactionState::Preparing => todo!(), - TransactionState::Committed => todo!(), - TransactionState::Aborted => todo!(), - TransactionState::Terminated => todo!(), - } - } - } -} - -fn is_end_visible(txs: &HashMap, tx: &Transaction, rv: &RowVersion) -> bool { - match rv.end { - Some(TxTimestampOrID::Timestamp(rv_end_ts)) => tx.begin_ts < rv_end_ts, - Some(TxTimestampOrID::TxID(rv_end)) => { - let te = txs.get(&rv_end).unwrap(); - match te.state { - TransactionState::Active => tx.tx_id == te.tx_id && rv.end.is_none(), - TransactionState::Preparing => todo!(), - TransactionState::Committed => todo!(), - TransactionState::Aborted => todo!(), - TransactionState::Terminated => todo!(), - } - } - None => true, - } -} - -fn get_tx_id(inner: &mut DatabaseInner) -> u64 { - inner.tx_ids.fetch_add(1, Ordering::SeqCst) -} - -fn get_timestamp(inner: &mut DatabaseInner) -> u64 { - inner.clock.get_timestamp() -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_insert_read() { - let clock = LocalClock::default(); - let db = Database::new(clock); - - let tx1 = db.begin_tx(); - let tx1_row = Row { - id: 1, - data: "Hello".to_string(), - }; - db.insert(tx1, tx1_row.clone()).unwrap(); - let row = db.read(tx1, 1).unwrap().unwrap(); - assert_eq!(tx1_row, row); - db.commit_tx(tx1); - - let tx2 = db.begin_tx(); - let row = db.read(tx2, 1).unwrap().unwrap(); - assert_eq!(tx1_row, row); - } - - #[test] - fn test_read_nonexistent() { - let clock = LocalClock::default(); - let db = Database::new(clock); - let tx = db.begin_tx(); - let row = db.read(tx, 1); - assert!(row.unwrap().is_none()); - } - - #[test] - fn test_delete() { - let clock = LocalClock::default(); - let db = Database::new(clock); - - let tx1 = db.begin_tx(); - let tx1_row = Row { - id: 1, - data: "Hello".to_string(), - }; - db.insert(tx1, tx1_row.clone()).unwrap(); - let row = db.read(tx1, 1).unwrap().unwrap(); - assert_eq!(tx1_row, row); - db.delete(tx1, 1).unwrap(); - let row = db.read(tx1, 1).unwrap(); - assert!(row.is_none()); - db.commit_tx(tx1); - - let tx2 = db.begin_tx(); - let row = db.read(tx2, 1).unwrap(); - assert!(row.is_none()); - } - - #[test] - fn test_delete_nonexistent() { - let clock = LocalClock::default(); - let db = Database::new(clock); - let tx = db.begin_tx(); - assert_eq!(false, db.delete(tx, 1).unwrap()); - } - - #[test] - fn test_commit() { - let clock = LocalClock::default(); - let db = Database::new(clock); - let tx1 = db.begin_tx(); - let tx1_row = Row { - id: 1, - data: "Hello".to_string(), - }; - db.insert(tx1, tx1_row.clone()).unwrap(); - let row = db.read(tx1, 1).unwrap().unwrap(); - assert_eq!(tx1_row, row); - let tx1_updated_row = Row { - id: 1, - data: "World".to_string(), - }; - db.update(tx1, tx1_updated_row.clone()).unwrap(); - let row = db.read(tx1, 1).unwrap().unwrap(); - assert_eq!(tx1_updated_row, row); - db.commit_tx(tx1); - - let tx2 = db.begin_tx(); - let row = db.read(tx2, 1).unwrap().unwrap(); - db.commit_tx(tx2); - assert_eq!(tx1_updated_row, row); - } - - #[test] - fn test_rollback() { - let clock = LocalClock::default(); - let db = Database::new(clock); - let tx1 = db.begin_tx(); - let row1 = Row { - id: 1, - data: "Hello".to_string(), - }; - db.insert(tx1.clone(), row1.clone()).unwrap(); - let row2 = db.read(tx1.clone(), 1).unwrap().unwrap(); - assert_eq!(row1, row2); - let row3 = Row { - id: 1, - data: "World".to_string(), - }; - db.update(tx1.clone(), row3.clone()).unwrap(); - let row4 = db.read(tx1.clone(), 1).unwrap().unwrap(); - assert_eq!(row3, row4); - db.rollback_tx(tx1); - let tx2 = db.begin_tx(); - let row5 = db.read(tx2.clone(), 1).unwrap(); - assert_eq!(row5, None); - } - - #[test] - fn test_dirty_write() { - let clock = LocalClock::default(); - let db = Database::new(clock); - - // T1 inserts a row with ID 1, but does not commit. - let tx1 = db.begin_tx(); - let tx1_row = Row { - id: 1, - data: "Hello".to_string(), - }; - db.insert(tx1, tx1_row.clone()).unwrap(); - let row = db.read(tx1.clone(), 1).unwrap().unwrap(); - assert_eq!(tx1_row, row); - - // T2 attempts to delete row with ID 1, but fails because T1 has not committed. - let tx2 = db.begin_tx(); - let tx2_row = Row { - id: 1, - data: "World".to_string(), - }; - assert_eq!(false, db.update(tx2, tx2_row.clone()).unwrap()); - - let row = db.read(tx1, 1).unwrap().unwrap(); - assert_eq!(tx1_row, row); - } - - #[test] - fn test_dirty_read() { - let clock = LocalClock::default(); - let db = Database::new(clock); - - // T1 inserts a row with ID 1, but does not commit. - let tx1 = db.begin_tx(); - let row1 = Row { - id: 1, - data: "Hello".to_string(), - }; - db.insert(tx1, row1.clone()).unwrap(); - - // T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed. - let tx2 = db.begin_tx(); - let row2 = db.read(tx2, 1).unwrap(); - assert_eq!(row2, None); - } - - #[ignore] - #[test] - fn test_dirty_read_deleted() { - let clock = LocalClock::default(); - let db = Database::new(clock); - - // T1 inserts a row with ID 1 and commits. - let tx1 = db.begin_tx(); - let tx1_row = Row { - id: 1, - data: "Hello".to_string(), - }; - db.insert(tx1, tx1_row.clone()).unwrap(); - db.commit_tx(tx1); - - // T2 deletes row with ID 1, but does not commit. - let tx2 = db.begin_tx(); - assert_eq!(true, db.delete(tx2, 1).unwrap()); - - // T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed. - let tx3 = db.begin_tx(); - let row = db.read(tx3, 1).unwrap().unwrap(); - assert_eq!(tx1_row, row); - } - - #[test] - fn test_fuzzy_read() { - let clock = LocalClock::default(); - let db = Database::new(clock); - - // T1 inserts a row with ID 1 and commits. - let tx1 = db.begin_tx(); - let tx1_row = Row { - id: 1, - data: "Hello".to_string(), - }; - db.insert(tx1, tx1_row.clone()).unwrap(); - let row = db.read(tx1.clone(), 1).unwrap().unwrap(); - assert_eq!(tx1_row, row); - db.commit_tx(tx1); - - // T2 reads the row with ID 1 within an active transaction. - let tx2 = db.begin_tx(); - let row = db.read(tx2, 1).unwrap().unwrap(); - assert_eq!(tx1_row, row); - - // T3 updates the row and commits. - let tx3 = db.begin_tx(); - let tx3_row = Row { - id: 1, - data: "World".to_string(), - }; - db.update(tx3, tx3_row.clone()).unwrap(); - db.commit_tx(tx3); - - // T2 still reads the same version of the row as before. - let row = db.read(tx2, 1).unwrap().unwrap(); - assert_eq!(tx1_row, row); - } - - #[ignore] - #[test] - fn test_lost_update() { - let clock = LocalClock::default(); - let db = Database::new(clock); - - // T1 inserts a row with ID 1 and commits. - let tx1 = db.begin_tx(); - let tx1_row = Row { - id: 1, - data: "Hello".to_string(), - }; - db.insert(tx1, tx1_row.clone()).unwrap(); - let row = db.read(tx1.clone(), 1).unwrap().unwrap(); - assert_eq!(tx1_row, row); - db.commit_tx(tx1); - - // T2 attempts to update row ID 1 within an active transaction. - let tx2 = db.begin_tx(); - let tx2_row = Row { - id: 1, - data: "World".to_string(), - }; - db.update(tx2, tx2_row.clone()).unwrap(); - - // T3 also attempts to update row ID 1 within an active transaction. - let tx3 = db.begin_tx(); - let tx3_row = Row { - id: 1, - data: "Hello, world!".to_string(), - }; - db.update(tx3, tx3_row.clone()).unwrap(); - - db.commit_tx(tx2); - db.commit_tx(tx3); // TODO: this should fail - - let tx4 = db.begin_tx(); - let row = db.read(tx4, 1).unwrap().unwrap(); - assert_eq!(tx2_row, row); - } -} +pub mod database; \ No newline at end of file