diff --git a/core/mvcc/Cargo.lock b/core/mvcc/Cargo.lock index 373c791b7..0f4712b90 100644 --- a/core/mvcc/Cargo.lock +++ b/core/mvcc/Cargo.lock @@ -167,6 +167,7 @@ version = "0.0.0" dependencies = [ "anyhow", "rustyline", + "thiserror", ] [[package]] diff --git a/core/mvcc/database/Cargo.toml b/core/mvcc/database/Cargo.toml index 9789f8733..7029f5419 100644 --- a/core/mvcc/database/Cargo.toml +++ b/core/mvcc/database/Cargo.toml @@ -6,3 +6,4 @@ edition = "2021" [dependencies] anyhow = "1.0.70" rustyline = "11.0.0" +thiserror = "1.0.40" diff --git a/core/mvcc/database/src/errors.rs b/core/mvcc/database/src/errors.rs new file mode 100644 index 000000000..95901137b --- /dev/null +++ b/core/mvcc/database/src/errors.rs @@ -0,0 +1,7 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum DatabaseError { + #[error("no such transaction ID: `{0}`")] + NoSuchTransactionID(u64), +} diff --git a/core/mvcc/database/src/lib.rs b/core/mvcc/database/src/lib.rs index 40c17cd91..550b91d22 100644 --- a/core/mvcc/database/src/lib.rs +++ b/core/mvcc/database/src/lib.rs @@ -31,11 +31,16 @@ //! * Optimistic reads and writes //! * Garbage collection +pub mod errors; + +use crate::errors::DatabaseError; use std::cell::RefCell; use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; +type Result = std::result::Result; + #[derive(Clone, Debug, PartialEq)] pub struct Row { pub id: u64, @@ -173,10 +178,12 @@ impl Database { /// * `tx_id` - the ID of the transaction in which to insert the new row. /// * `row` - the row object containing the values to be inserted. /// - pub fn insert(&self, tx_id: TxID, row: Row) { + pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { let inner = self.inner.lock().unwrap(); let mut txs = inner.txs.borrow_mut(); - let tx = txs.get_mut(&tx_id).unwrap(); + let tx = txs + .get_mut(&tx_id) + .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; let id = row.id; let row_version = RowVersion { begin: TxTimestampOrID::TxID(tx.tx_id), @@ -186,6 +193,7 @@ impl Database { let mut rows = inner.rows.borrow_mut(); rows.entry(id).or_insert_with(Vec::new).push(row_version); tx.insert_to_write_set(id); + Ok(()) } /// Updates a row in the database with new values. @@ -206,12 +214,12 @@ impl Database { /// # Returns /// /// Returns `true` if the row was successfully updated, and `false` otherwise. - pub fn update(&self, tx_id: TxID, row: Row) -> bool { - if !self.delete(tx_id, row.id) { - return false; + pub fn update(&self, tx_id: TxID, row: Row) -> Result { + if !self.delete(tx_id, row.id)? { + return Ok(false); } - self.insert(tx_id, row); - true + self.insert(tx_id, row)?; + Ok(true) } /// Deletes a row from the table with the given `id`. @@ -228,27 +236,29 @@ impl Database { /// /// Returns `true` if the row was successfully deleted, and `false` otherwise. /// - pub fn delete(&self, tx: TxID, id: u64) -> bool { + pub fn delete(&self, tx: TxID, id: u64) -> Result { let inner = self.inner.lock().unwrap(); let mut rows = inner.rows.borrow_mut(); let mut txs = inner.txs.borrow_mut(); match rows.get_mut(&id) { Some(row_versions) => match row_versions.last_mut() { Some(v) => { - let tx = txs.get(&tx).unwrap(); + let tx = txs.get(&tx).ok_or(DatabaseError::NoSuchTransactionID(tx))?; if is_version_visible(&txs, tx, v) { v.end = Some(TxTimestampOrID::TxID(tx.tx_id)); } else { - return false; + return Ok(false); } } None => unreachable!("no versions for row {}", id), }, - None => return false, + None => return Ok(false), } - let tx = txs.get_mut(&tx).unwrap(); + let tx = txs + .get_mut(&tx) + .ok_or(DatabaseError::NoSuchTransactionID(tx))?; tx.insert_to_write_set(id); - true + Ok(true) } /// Retrieves a row from the table with the given `id`. @@ -265,7 +275,7 @@ impl Database { /// /// Returns `Some(row)` with the row data if the row with the given `id` exists, /// and `None` otherwise. - pub fn read(&self, tx_id: TxID, id: u64) -> Option { + pub fn read(&self, tx_id: TxID, id: u64) -> Result> { let inner = self.inner.lock().unwrap(); let txs = inner.txs.borrow_mut(); let tx = txs.get(&tx_id).unwrap(); @@ -274,11 +284,11 @@ impl Database { for rv in row_versions.iter().rev() { if is_version_visible(&txs, tx, rv) { tx.insert_to_read_set(id); - return Some(rv.row.clone()); + return Ok(Some(rv.row.clone())); } } } - None + Ok(None) } /// Begins a new transaction in the database. @@ -416,13 +426,13 @@ mod tests { id: 1, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()); - let row = db.read(tx1, 1).unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db.read(tx1, 1).unwrap().unwrap(); assert_eq!(tx1_row, row); db.commit_tx(tx1); let tx2 = db.begin_tx(); - let row = db.read(tx2, 1).unwrap(); + let row = db.read(tx2, 1).unwrap().unwrap(); assert_eq!(tx1_row, row); } @@ -432,7 +442,7 @@ mod tests { let db = Database::new(clock); let tx = db.begin_tx(); let row = db.read(tx, 1); - assert!(row.is_none()); + assert!(row.unwrap().is_none()); } #[test] @@ -445,16 +455,16 @@ mod tests { id: 1, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()); - let row = db.read(tx1, 1).unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db.read(tx1, 1).unwrap().unwrap(); assert_eq!(tx1_row, row); - db.delete(tx1, 1); - let row = db.read(tx1, 1); + db.delete(tx1, 1).unwrap(); + let row = db.read(tx1, 1).unwrap(); assert!(row.is_none()); db.commit_tx(tx1); let tx2 = db.begin_tx(); - let row = db.read(tx2, 1); + let row = db.read(tx2, 1).unwrap(); assert!(row.is_none()); } @@ -463,7 +473,7 @@ mod tests { let clock = LocalClock::default(); let db = Database::new(clock); let tx = db.begin_tx(); - assert_eq!(false, db.delete(tx, 1)); + assert_eq!(false, db.delete(tx, 1).unwrap()); } #[test] @@ -475,20 +485,20 @@ mod tests { id: 1, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()); - let row = db.read(tx1, 1).unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db.read(tx1, 1).unwrap().unwrap(); assert_eq!(tx1_row, row); let tx1_updated_row = Row { id: 1, data: "World".to_string(), }; - db.update(tx1, tx1_updated_row.clone()); - let row = db.read(tx1, 1).unwrap(); + db.update(tx1, tx1_updated_row.clone()).unwrap(); + let row = db.read(tx1, 1).unwrap().unwrap(); assert_eq!(tx1_updated_row, row); db.commit_tx(tx1); let tx2 = db.begin_tx(); - let row = db.read(tx2, 1).unwrap(); + let row = db.read(tx2, 1).unwrap().unwrap(); db.commit_tx(tx2); assert_eq!(tx1_updated_row, row); } @@ -502,19 +512,19 @@ mod tests { id: 1, data: "Hello".to_string(), }; - db.insert(tx1.clone(), row1.clone()); - let row2 = db.read(tx1.clone(), 1).unwrap(); + db.insert(tx1.clone(), row1.clone()).unwrap(); + let row2 = db.read(tx1.clone(), 1).unwrap().unwrap(); assert_eq!(row1, row2); let row3 = Row { id: 1, data: "World".to_string(), }; - db.update(tx1.clone(), row3.clone()); - let row4 = db.read(tx1.clone(), 1).unwrap(); + db.update(tx1.clone(), row3.clone()).unwrap(); + let row4 = db.read(tx1.clone(), 1).unwrap().unwrap(); assert_eq!(row3, row4); db.rollback_tx(tx1); let tx2 = db.begin_tx(); - let row5 = db.read(tx2.clone(), 1); + let row5 = db.read(tx2.clone(), 1).unwrap(); assert_eq!(row5, None); } @@ -529,8 +539,8 @@ mod tests { id: 1, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()); - let row = db.read(tx1.clone(), 1).unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db.read(tx1.clone(), 1).unwrap().unwrap(); assert_eq!(tx1_row, row); // T2 attempts to delete row with ID 1, but fails because T1 has not committed. @@ -539,9 +549,9 @@ mod tests { id: 1, data: "World".to_string(), }; - assert_eq!(false, db.update(tx2, tx2_row.clone())); + assert_eq!(false, db.update(tx2, tx2_row.clone()).unwrap()); - let row = db.read(tx1, 1).unwrap(); + let row = db.read(tx1, 1).unwrap().unwrap(); assert_eq!(tx1_row, row); } @@ -556,11 +566,11 @@ mod tests { id: 1, data: "Hello".to_string(), }; - db.insert(tx1, row1.clone()); + db.insert(tx1, row1.clone()).unwrap(); // T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed. let tx2 = db.begin_tx(); - let row2 = db.read(tx2, 1); + let row2 = db.read(tx2, 1).unwrap(); assert_eq!(row2, None); } @@ -576,16 +586,16 @@ mod tests { id: 1, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()); + db.insert(tx1, tx1_row.clone()).unwrap(); db.commit_tx(tx1); // T2 deletes row with ID 1, but does not commit. let tx2 = db.begin_tx(); - assert_eq!(true, db.delete(tx2, 1)); + assert_eq!(true, db.delete(tx2, 1).unwrap()); // T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed. let tx3 = db.begin_tx(); - let row = db.read(tx3, 1).unwrap(); + let row = db.read(tx3, 1).unwrap().unwrap(); assert_eq!(tx1_row, row); } @@ -600,14 +610,14 @@ mod tests { id: 1, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()); - let row = db.read(tx1.clone(), 1).unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db.read(tx1.clone(), 1).unwrap().unwrap(); assert_eq!(tx1_row, row); db.commit_tx(tx1); // T2 reads the row with ID 1 within an active transaction. let tx2 = db.begin_tx(); - let row = db.read(tx2, 1).unwrap(); + let row = db.read(tx2, 1).unwrap().unwrap(); assert_eq!(tx1_row, row); // T3 updates the row and commits. @@ -616,11 +626,11 @@ mod tests { id: 1, data: "World".to_string(), }; - db.update(tx3, tx3_row.clone()); + db.update(tx3, tx3_row.clone()).unwrap(); db.commit_tx(tx3); // T2 still reads the same version of the row as before. - let row = db.read(tx2, 1).unwrap(); + let row = db.read(tx2, 1).unwrap().unwrap(); assert_eq!(tx1_row, row); } @@ -636,8 +646,8 @@ mod tests { id: 1, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()); - let row = db.read(tx1.clone(), 1).unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); + let row = db.read(tx1.clone(), 1).unwrap().unwrap(); assert_eq!(tx1_row, row); db.commit_tx(tx1); @@ -647,7 +657,7 @@ mod tests { id: 1, data: "World".to_string(), }; - db.update(tx2, tx2_row.clone()); + db.update(tx2, tx2_row.clone()).unwrap(); // T3 also attempts to update row ID 1 within an active transaction. let tx3 = db.begin_tx(); @@ -655,13 +665,13 @@ mod tests { id: 1, data: "Hello, world!".to_string(), }; - db.update(tx3, tx3_row.clone()); + db.update(tx3, tx3_row.clone()).unwrap(); db.commit_tx(tx2); db.commit_tx(tx3); // TODO: this should fail let tx4 = db.begin_tx(); - let row = db.read(tx4, 1).unwrap(); + let row = db.read(tx4, 1).unwrap().unwrap(); assert_eq!(tx2_row, row); } }