Replace unwrap() with NoSuchTransactionID error

This commit is contained in:
Pekka Enberg
2023-04-09 08:43:00 +03:00
parent df0cadc02e
commit 8f30c20215
4 changed files with 74 additions and 55 deletions

1
core/mvcc/Cargo.lock generated
View File

@@ -167,6 +167,7 @@ version = "0.0.0"
dependencies = [
"anyhow",
"rustyline",
"thiserror",
]
[[package]]

View File

@@ -6,3 +6,4 @@ edition = "2021"
[dependencies]
anyhow = "1.0.70"
rustyline = "11.0.0"
thiserror = "1.0.40"

View File

@@ -0,0 +1,7 @@
use thiserror::Error;
#[derive(Error, Debug)]
pub enum DatabaseError {
#[error("no such transaction ID: `{0}`")]
NoSuchTransactionID(u64),
}

View File

@@ -31,11 +31,16 @@
//! * Optimistic reads and writes
//! * Garbage collection
pub mod errors;
use crate::errors::DatabaseError;
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
type Result<T> = std::result::Result<T, DatabaseError>;
#[derive(Clone, Debug, PartialEq)]
pub struct Row {
pub id: u64,
@@ -173,10 +178,12 @@ impl<Clock: LogicalClock> Database<Clock> {
/// * `tx_id` - the ID of the transaction in which to insert the new row.
/// * `row` - the row object containing the values to be inserted.
///
pub fn insert(&self, tx_id: TxID, row: Row) {
pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
let inner = self.inner.lock().unwrap();
let mut txs = inner.txs.borrow_mut();
let tx = txs.get_mut(&tx_id).unwrap();
let tx = txs
.get_mut(&tx_id)
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
let id = row.id;
let row_version = RowVersion {
begin: TxTimestampOrID::TxID(tx.tx_id),
@@ -186,6 +193,7 @@ impl<Clock: LogicalClock> Database<Clock> {
let mut rows = inner.rows.borrow_mut();
rows.entry(id).or_insert_with(Vec::new).push(row_version);
tx.insert_to_write_set(id);
Ok(())
}
/// Updates a row in the database with new values.
@@ -206,12 +214,12 @@ impl<Clock: LogicalClock> Database<Clock> {
/// # Returns
///
/// Returns `true` if the row was successfully updated, and `false` otherwise.
pub fn update(&self, tx_id: TxID, row: Row) -> bool {
if !self.delete(tx_id, row.id) {
return false;
pub fn update(&self, tx_id: TxID, row: Row) -> Result<bool> {
if !self.delete(tx_id, row.id)? {
return Ok(false);
}
self.insert(tx_id, row);
true
self.insert(tx_id, row)?;
Ok(true)
}
/// Deletes a row from the table with the given `id`.
@@ -228,27 +236,29 @@ impl<Clock: LogicalClock> Database<Clock> {
///
/// Returns `true` if the row was successfully deleted, and `false` otherwise.
///
pub fn delete(&self, tx: TxID, id: u64) -> bool {
pub fn delete(&self, tx: TxID, id: u64) -> Result<bool> {
let inner = self.inner.lock().unwrap();
let mut rows = inner.rows.borrow_mut();
let mut txs = inner.txs.borrow_mut();
match rows.get_mut(&id) {
Some(row_versions) => match row_versions.last_mut() {
Some(v) => {
let tx = txs.get(&tx).unwrap();
let tx = txs.get(&tx).ok_or(DatabaseError::NoSuchTransactionID(tx))?;
if is_version_visible(&txs, tx, v) {
v.end = Some(TxTimestampOrID::TxID(tx.tx_id));
} else {
return false;
return Ok(false);
}
}
None => unreachable!("no versions for row {}", id),
},
None => return false,
None => return Ok(false),
}
let tx = txs.get_mut(&tx).unwrap();
let tx = txs
.get_mut(&tx)
.ok_or(DatabaseError::NoSuchTransactionID(tx))?;
tx.insert_to_write_set(id);
true
Ok(true)
}
/// Retrieves a row from the table with the given `id`.
@@ -265,7 +275,7 @@ impl<Clock: LogicalClock> Database<Clock> {
///
/// Returns `Some(row)` with the row data if the row with the given `id` exists,
/// and `None` otherwise.
pub fn read(&self, tx_id: TxID, id: u64) -> Option<Row> {
pub fn read(&self, tx_id: TxID, id: u64) -> Result<Option<Row>> {
let inner = self.inner.lock().unwrap();
let txs = inner.txs.borrow_mut();
let tx = txs.get(&tx_id).unwrap();
@@ -274,11 +284,11 @@ impl<Clock: LogicalClock> Database<Clock> {
for rv in row_versions.iter().rev() {
if is_version_visible(&txs, tx, rv) {
tx.insert_to_read_set(id);
return Some(rv.row.clone());
return Ok(Some(rv.row.clone()));
}
}
}
None
Ok(None)
}
/// Begins a new transaction in the database.
@@ -416,13 +426,13 @@ mod tests {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone());
let row = db.read(tx1, 1).unwrap();
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db.read(tx1, 1).unwrap().unwrap();
assert_eq!(tx1_row, row);
db.commit_tx(tx1);
let tx2 = db.begin_tx();
let row = db.read(tx2, 1).unwrap();
let row = db.read(tx2, 1).unwrap().unwrap();
assert_eq!(tx1_row, row);
}
@@ -432,7 +442,7 @@ mod tests {
let db = Database::new(clock);
let tx = db.begin_tx();
let row = db.read(tx, 1);
assert!(row.is_none());
assert!(row.unwrap().is_none());
}
#[test]
@@ -445,16 +455,16 @@ mod tests {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone());
let row = db.read(tx1, 1).unwrap();
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db.read(tx1, 1).unwrap().unwrap();
assert_eq!(tx1_row, row);
db.delete(tx1, 1);
let row = db.read(tx1, 1);
db.delete(tx1, 1).unwrap();
let row = db.read(tx1, 1).unwrap();
assert!(row.is_none());
db.commit_tx(tx1);
let tx2 = db.begin_tx();
let row = db.read(tx2, 1);
let row = db.read(tx2, 1).unwrap();
assert!(row.is_none());
}
@@ -463,7 +473,7 @@ mod tests {
let clock = LocalClock::default();
let db = Database::new(clock);
let tx = db.begin_tx();
assert_eq!(false, db.delete(tx, 1));
assert_eq!(false, db.delete(tx, 1).unwrap());
}
#[test]
@@ -475,20 +485,20 @@ mod tests {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone());
let row = db.read(tx1, 1).unwrap();
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db.read(tx1, 1).unwrap().unwrap();
assert_eq!(tx1_row, row);
let tx1_updated_row = Row {
id: 1,
data: "World".to_string(),
};
db.update(tx1, tx1_updated_row.clone());
let row = db.read(tx1, 1).unwrap();
db.update(tx1, tx1_updated_row.clone()).unwrap();
let row = db.read(tx1, 1).unwrap().unwrap();
assert_eq!(tx1_updated_row, row);
db.commit_tx(tx1);
let tx2 = db.begin_tx();
let row = db.read(tx2, 1).unwrap();
let row = db.read(tx2, 1).unwrap().unwrap();
db.commit_tx(tx2);
assert_eq!(tx1_updated_row, row);
}
@@ -502,19 +512,19 @@ mod tests {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1.clone(), row1.clone());
let row2 = db.read(tx1.clone(), 1).unwrap();
db.insert(tx1.clone(), row1.clone()).unwrap();
let row2 = db.read(tx1.clone(), 1).unwrap().unwrap();
assert_eq!(row1, row2);
let row3 = Row {
id: 1,
data: "World".to_string(),
};
db.update(tx1.clone(), row3.clone());
let row4 = db.read(tx1.clone(), 1).unwrap();
db.update(tx1.clone(), row3.clone()).unwrap();
let row4 = db.read(tx1.clone(), 1).unwrap().unwrap();
assert_eq!(row3, row4);
db.rollback_tx(tx1);
let tx2 = db.begin_tx();
let row5 = db.read(tx2.clone(), 1);
let row5 = db.read(tx2.clone(), 1).unwrap();
assert_eq!(row5, None);
}
@@ -529,8 +539,8 @@ mod tests {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone());
let row = db.read(tx1.clone(), 1).unwrap();
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db.read(tx1.clone(), 1).unwrap().unwrap();
assert_eq!(tx1_row, row);
// T2 attempts to delete row with ID 1, but fails because T1 has not committed.
@@ -539,9 +549,9 @@ mod tests {
id: 1,
data: "World".to_string(),
};
assert_eq!(false, db.update(tx2, tx2_row.clone()));
assert_eq!(false, db.update(tx2, tx2_row.clone()).unwrap());
let row = db.read(tx1, 1).unwrap();
let row = db.read(tx1, 1).unwrap().unwrap();
assert_eq!(tx1_row, row);
}
@@ -556,11 +566,11 @@ mod tests {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1, row1.clone());
db.insert(tx1, row1.clone()).unwrap();
// T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed.
let tx2 = db.begin_tx();
let row2 = db.read(tx2, 1);
let row2 = db.read(tx2, 1).unwrap();
assert_eq!(row2, None);
}
@@ -576,16 +586,16 @@ mod tests {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone());
db.insert(tx1, tx1_row.clone()).unwrap();
db.commit_tx(tx1);
// T2 deletes row with ID 1, but does not commit.
let tx2 = db.begin_tx();
assert_eq!(true, db.delete(tx2, 1));
assert_eq!(true, db.delete(tx2, 1).unwrap());
// T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed.
let tx3 = db.begin_tx();
let row = db.read(tx3, 1).unwrap();
let row = db.read(tx3, 1).unwrap().unwrap();
assert_eq!(tx1_row, row);
}
@@ -600,14 +610,14 @@ mod tests {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone());
let row = db.read(tx1.clone(), 1).unwrap();
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db.read(tx1.clone(), 1).unwrap().unwrap();
assert_eq!(tx1_row, row);
db.commit_tx(tx1);
// T2 reads the row with ID 1 within an active transaction.
let tx2 = db.begin_tx();
let row = db.read(tx2, 1).unwrap();
let row = db.read(tx2, 1).unwrap().unwrap();
assert_eq!(tx1_row, row);
// T3 updates the row and commits.
@@ -616,11 +626,11 @@ mod tests {
id: 1,
data: "World".to_string(),
};
db.update(tx3, tx3_row.clone());
db.update(tx3, tx3_row.clone()).unwrap();
db.commit_tx(tx3);
// T2 still reads the same version of the row as before.
let row = db.read(tx2, 1).unwrap();
let row = db.read(tx2, 1).unwrap().unwrap();
assert_eq!(tx1_row, row);
}
@@ -636,8 +646,8 @@ mod tests {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone());
let row = db.read(tx1.clone(), 1).unwrap();
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db.read(tx1.clone(), 1).unwrap().unwrap();
assert_eq!(tx1_row, row);
db.commit_tx(tx1);
@@ -647,7 +657,7 @@ mod tests {
id: 1,
data: "World".to_string(),
};
db.update(tx2, tx2_row.clone());
db.update(tx2, tx2_row.clone()).unwrap();
// T3 also attempts to update row ID 1 within an active transaction.
let tx3 = db.begin_tx();
@@ -655,13 +665,13 @@ mod tests {
id: 1,
data: "Hello, world!".to_string(),
};
db.update(tx3, tx3_row.clone());
db.update(tx3, tx3_row.clone()).unwrap();
db.commit_tx(tx2);
db.commit_tx(tx3); // TODO: this should fail
let tx4 = db.begin_tx();
let row = db.read(tx4, 1).unwrap();
let row = db.read(tx4, 1).unwrap().unwrap();
assert_eq!(tx2_row, row);
}
}