Merge pull request #38 from penberg/tableid

mvcc: switch to (table_id, row_id) for row ids
This commit is contained in:
Pekka Enberg
2023-05-12 14:20:16 +03:00
committed by GitHub
6 changed files with 594 additions and 89 deletions

View File

@@ -25,13 +25,21 @@ MVCCDatabaseRef MVCCDatabaseOpen(const char *path);
void MVCCDatabaseClose(MVCCDatabaseRef db);
MVCCError MVCCDatabaseInsert(MVCCDatabaseRef db, uint64_t id, const void *value_ptr, uintptr_t value_len);
MVCCError MVCCDatabaseInsert(MVCCDatabaseRef db,
uint64_t table_id,
uint64_t row_id,
const void *value_ptr,
uintptr_t value_len);
MVCCError MVCCDatabaseRead(MVCCDatabaseRef db, uint64_t id, uint8_t **value_ptr, int64_t *value_len);
MVCCError MVCCDatabaseRead(MVCCDatabaseRef db,
uint64_t table_id,
uint64_t row_id,
uint8_t **value_ptr,
int64_t *value_len);
void MVCCFreeStr(void *ptr);
MVCCScanCursorRef MVCCScanCursorOpen(MVCCDatabaseRef db);
MVCCScanCursorRef MVCCScanCursorOpen(MVCCDatabaseRef db, uint64_t table_id);
void MVCCScanCursorClose(MVCCScanCursorRef cursor);

View File

@@ -64,7 +64,8 @@ pub unsafe extern "C" fn MVCCDatabaseClose(db: MVCCDatabaseRef) {
#[no_mangle]
pub unsafe extern "C" fn MVCCDatabaseInsert(
db: MVCCDatabaseRef,
id: u64,
table_id: u64,
row_id: u64,
value_ptr: *const std::ffi::c_void,
value_len: usize,
) -> MVCCError {
@@ -79,6 +80,7 @@ pub unsafe extern "C" fn MVCCDatabaseInsert(
}
};
let (db, runtime) = (&db.db, &db.runtime);
let id = database::RowID { table_id, row_id };
let row = database::Row { id, data };
tracing::debug!("MVCCDatabaseInsert: {row:?}");
match runtime.block_on(async move {
@@ -100,7 +102,8 @@ pub unsafe extern "C" fn MVCCDatabaseInsert(
#[no_mangle]
pub unsafe extern "C" fn MVCCDatabaseRead(
db: MVCCDatabaseRef,
id: u64,
table_id: u64,
row_id: u64,
value_ptr: *mut *mut u8,
value_len: *mut i64,
) -> MVCCError {
@@ -109,6 +112,7 @@ pub unsafe extern "C" fn MVCCDatabaseRead(
match runtime.block_on(async move {
let tx = db.begin_tx().await;
let id = database::RowID { table_id, row_id };
let maybe_row = db.read(tx, id).await?;
match maybe_row {
Some(row) => {
@@ -148,13 +152,18 @@ pub unsafe extern "C" fn MVCCFreeStr(ptr: *mut std::ffi::c_void) {
}
#[no_mangle]
pub unsafe extern "C" fn MVCCScanCursorOpen(db: MVCCDatabaseRef) -> MVCCScanCursorRef {
pub unsafe extern "C" fn MVCCScanCursorOpen(
db: MVCCDatabaseRef,
table_id: u64,
) -> MVCCScanCursorRef {
tracing::debug!("MVCCScanCursorOpen()");
// Reference is transmuted to &'static in order to be able to pass the cursor back to C.
// The contract with C is to never use a cursor after MVCCDatabaseClose() has been called.
let database = unsafe { std::mem::transmute::<&DbContext, &'static DbContext>(db.get_ref()) };
let (database, runtime) = (&database.db, &database.runtime);
match runtime.block_on(async move { mvcc_rs::cursor::ScanCursor::new(database).await }) {
match runtime
.block_on(async move { mvcc_rs::cursor::ScanCursor::new(database, table_id).await })
{
Ok(cursor) => {
if cursor.is_empty() {
tracing::debug!("Cursor is empty");
@@ -254,5 +263,8 @@ pub unsafe extern "C" fn MVCCScanCursorNext(cursor: MVCCScanCursorRef) -> std::f
pub unsafe extern "C" fn MVCCScanCursorPosition(cursor: MVCCScanCursorRef) -> u64 {
let cursor_ctx = unsafe { &mut *cursor.ptr };
let cursor = &mut cursor_ctx.cursor;
cursor.current_row_id().unwrap_or(0)
cursor
.current_row_id()
.map(|row_id| row_id.row_id)
.unwrap_or(0)
}

View File

@@ -1,7 +1,7 @@
use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use mvcc_rs::clock::LocalClock;
use mvcc_rs::database::{Database, Row};
use mvcc_rs::database::{Database, Row, RowID};
use pprof::criterion::{Output, PProfProfiler};
fn bench_db() -> Database<
@@ -47,7 +47,15 @@ fn bench(c: &mut Criterion) {
group.bench_function("begin_tx-read-commit_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx().await;
db.read(tx_id, 1).await.unwrap();
db.read(
tx_id,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap();
db.commit_tx(tx_id).await
})
});
@@ -59,7 +67,10 @@ fn bench(c: &mut Criterion) {
db.update(
tx_id,
Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "World".to_string(),
},
)
@@ -74,14 +85,25 @@ fn bench(c: &mut Criterion) {
futures::executor::block_on(db.insert(
tx,
Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
},
))
.unwrap();
group.bench_function("read", |b| {
b.to_async(FuturesExecutor).iter(|| async {
db.read(tx, 1).await.unwrap();
db.read(
tx,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap();
})
});
@@ -90,7 +112,10 @@ fn bench(c: &mut Criterion) {
futures::executor::block_on(db.insert(
tx,
Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
},
))
@@ -100,7 +125,10 @@ fn bench(c: &mut Criterion) {
db.update(
tx,
Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "World".to_string(),
},
)

View File

@@ -1,5 +1,5 @@
use crate::clock::LogicalClock;
use crate::database::{Database, DatabaseInner, Result, Row};
use crate::database::{Database, DatabaseInner, Result, Row, RowID};
use crate::persistent_storage::Storage;
use crate::sync::AsyncMutex;
@@ -11,7 +11,7 @@ pub struct ScanCursor<
Mutex: AsyncMutex<Inner = DatabaseInner<Clock, StorageImpl>>,
> {
pub db: &'a Database<Clock, StorageImpl, Mutex>,
pub row_ids: Vec<u64>,
pub row_ids: Vec<RowID>,
pub index: usize,
tx_id: u64,
}
@@ -25,9 +25,10 @@ impl<
{
pub async fn new(
db: &'a Database<Clock, StorageImpl, Mutex>,
table_id: u64,
) -> Result<ScanCursor<'a, Clock, StorageImpl, Mutex>> {
let tx_id = db.begin_tx().await;
let row_ids = db.scan_row_ids().await?;
let row_ids = db.scan_row_ids_for_table(table_id).await?;
Ok(Self {
db,
tx_id,
@@ -36,7 +37,7 @@ impl<
})
}
pub fn current_row_id(&self) -> Option<u64> {
pub fn current_row_id(&self) -> Option<RowID> {
if self.index >= self.row_ids.len() {
return None;
}

View File

@@ -8,9 +8,16 @@ use std::sync::Arc;
pub type Result<T> = std::result::Result<T, DatabaseError>;
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash)]
pub struct RowID {
pub table_id: u64,
pub row_id: u64,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Row {
pub id: u64,
pub id: RowID,
pub data: String,
}
@@ -62,9 +69,9 @@ pub struct Transaction {
/// The transaction begin timestamp.
begin_ts: u64,
/// The transaction write set.
write_set: HashSet<u64>,
write_set: HashSet<RowID>,
/// The transaction read set.
read_set: RefCell<HashSet<u64>>,
read_set: RefCell<HashSet<RowID>>,
}
impl Transaction {
@@ -78,12 +85,12 @@ impl Transaction {
}
}
fn insert_to_read_set(&self, id: u64) {
fn insert_to_read_set(&self, id: RowID) {
let mut read_set = self.read_set.borrow_mut();
read_set.insert(id);
}
fn insert_to_write_set(&mut self, id: u64) {
fn insert_to_write_set(&mut self, id: RowID) {
self.write_set.insert(id);
}
}
@@ -201,7 +208,7 @@ impl<
///
/// Returns `true` if the row was successfully deleted, and `false` otherwise.
///
pub async fn delete(&self, tx_id: TxID, id: u64) -> Result<bool> {
pub async fn delete(&self, tx_id: TxID, id: RowID) -> Result<bool> {
let inner = self.inner.lock().await;
inner.delete(tx_id, id).await
}
@@ -220,16 +227,21 @@ impl<
///
/// Returns `Some(row)` with the row data if the row with the given `id` exists,
/// and `None` otherwise.
pub async fn read(&self, tx_id: TxID, id: u64) -> Result<Option<Row>> {
pub async fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
let inner = self.inner.lock().await;
inner.read(tx_id, id).await
}
pub async fn scan_row_ids(&self) -> Result<Vec<u64>> {
pub async fn scan_row_ids(&self) -> Result<Vec<RowID>> {
let inner = self.inner.lock().await;
inner.scan_row_ids()
}
pub async fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
let inner = self.inner.lock().await;
inner.scan_row_ids_for_table(table_id)
}
/// Begins a new transaction in the database.
///
/// This function starts a new transaction in the database and returns a `TxID` value
@@ -284,7 +296,7 @@ impl<
#[derive(Debug)]
pub struct DatabaseInner<Clock: LogicalClock, Storage: crate::persistent_storage::Storage> {
rows: RefCell<BTreeMap<u64, Vec<RowVersion>>>,
rows: RefCell<BTreeMap<RowID, Vec<RowVersion>>>,
txs: RefCell<HashMap<TxID, Transaction>>,
tx_timestamps: RefCell<BTreeMap<u64, usize>>,
tx_ids: AtomicU64,
@@ -314,7 +326,7 @@ impl<Clock: LogicalClock, Storage: crate::persistent_storage::Storage>
}
#[allow(clippy::await_holding_refcell_ref)]
async fn delete(&self, tx_id: TxID, id: u64) -> Result<bool> {
async fn delete(&self, tx_id: TxID, id: RowID) -> Result<bool> {
// NOTICE: They *are* dropped before an await point!!! But the await is conditional,
// so I think clippy is just confused.
let mut txs = self.txs.borrow_mut();
@@ -344,7 +356,7 @@ impl<Clock: LogicalClock, Storage: crate::persistent_storage::Storage>
Ok(false)
}
async fn read(&self, tx_id: TxID, id: u64) -> Result<Option<Row>> {
async fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
let txs = self.txs.borrow_mut();
let tx = txs.get(&tx_id).unwrap();
assert!(tx.state == TransactionState::Active);
@@ -360,11 +372,28 @@ impl<Clock: LogicalClock, Storage: crate::persistent_storage::Storage>
Ok(None)
}
fn scan_row_ids(&self) -> Result<Vec<u64>> {
fn scan_row_ids(&self) -> Result<Vec<RowID>> {
let rows = self.rows.borrow();
Ok(rows.keys().cloned().collect())
}
fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
let rows = self.rows.borrow();
Ok(rows
.range(
RowID {
table_id,
row_id: 0,
}..RowID {
table_id,
row_id: u64::MAX,
},
)
.map(|(k, _)| k)
.cloned()
.collect())
}
async fn begin_tx(&mut self) -> TxID {
let tx_id = self.get_tx_id();
let begin_ts = self.get_timestamp();
@@ -496,7 +525,7 @@ impl<Clock: LogicalClock, Storage: crate::persistent_storage::Storage>
None => true,
};
if !should_stay {
tracing::debug!("Dropping row version {} {:?}-{:?}", id, rv.begin, rv.end);
tracing::debug!("Dropping row version {:?} {:?}-{:?}", id, rv.begin, rv.end);
}
should_stay
});
@@ -613,16 +642,39 @@ mod tests {
let tx1 = db.begin_tx().await;
let tx1_row = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).await.unwrap();
let row = db.read(tx1, 1).await.unwrap().unwrap();
let row = db
.read(
tx1,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
db.commit_tx(tx1).await.unwrap();
let tx2 = db.begin_tx().await;
let row = db.read(tx2, 1).await.unwrap().unwrap();
let row = db
.read(
tx2,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
}
@@ -631,7 +683,15 @@ mod tests {
async fn test_read_nonexistent() {
let db = test_db();
let tx = db.begin_tx().await;
let row = db.read(tx, 1).await;
let row = db
.read(
tx,
RowID {
table_id: 1,
row_id: 1,
},
)
.await;
assert!(row.unwrap().is_none());
}
@@ -642,19 +702,58 @@ mod tests {
let tx1 = db.begin_tx().await;
let tx1_row = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).await.unwrap();
let row = db.read(tx1, 1).await.unwrap().unwrap();
let row = db
.read(
tx1,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
db.delete(tx1, 1).await.unwrap();
let row = db.read(tx1, 1).await.unwrap();
db.delete(
tx1,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap();
let row = db
.read(
tx1,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap();
assert!(row.is_none());
db.commit_tx(tx1).await.unwrap();
let tx2 = db.begin_tx().await;
let row = db.read(tx2, 1).await.unwrap();
let row = db
.read(
tx2,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap();
assert!(row.is_none());
}
@@ -663,7 +762,16 @@ mod tests {
async fn test_delete_nonexistent() {
let db = test_db();
let tx = db.begin_tx().await;
assert!(!db.delete(tx, 1).await.unwrap());
assert!(!db
.delete(
tx,
RowID {
table_id: 1,
row_id: 1
}
)
.await
.unwrap());
}
#[traced_test]
@@ -672,23 +780,59 @@ mod tests {
let db = test_db();
let tx1 = db.begin_tx().await;
let tx1_row = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).await.unwrap();
let row = db.read(tx1, 1).await.unwrap().unwrap();
let row = db
.read(
tx1,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
let tx1_updated_row = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "World".to_string(),
};
db.update(tx1, tx1_updated_row.clone()).await.unwrap();
let row = db.read(tx1, 1).await.unwrap().unwrap();
let row = db
.read(
tx1,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_updated_row, row);
db.commit_tx(tx1).await.unwrap();
let tx2 = db.begin_tx().await;
let row = db.read(tx2, 1).await.unwrap().unwrap();
let row = db
.read(
tx2,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
db.commit_tx(tx2).await.unwrap();
assert_eq!(tx1_updated_row, row);
db.drop_unused_row_versions().await;
@@ -700,22 +844,57 @@ mod tests {
let db = test_db();
let tx1 = db.begin_tx().await;
let row1 = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
};
db.insert(tx1, row1.clone()).await.unwrap();
let row2 = db.read(tx1, 1).await.unwrap().unwrap();
let row2 = db
.read(
tx1,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(row1, row2);
let row3 = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "World".to_string(),
};
db.update(tx1, row3.clone()).await.unwrap();
let row4 = db.read(tx1, 1).await.unwrap().unwrap();
let row4 = db
.read(
tx1,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(row3, row4);
db.rollback_tx(tx1).await;
let tx2 = db.begin_tx().await;
let row5 = db.read(tx2, 1).await.unwrap();
let row5 = db
.read(
tx2,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap();
assert_eq!(row5, None);
}
@@ -727,22 +906,48 @@ mod tests {
// T1 inserts a row with ID 1, but does not commit.
let tx1 = db.begin_tx().await;
let tx1_row = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).await.unwrap();
let row = db.read(tx1, 1).await.unwrap().unwrap();
let row = db
.read(
tx1,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
// T2 attempts to delete row with ID 1, but fails because T1 has not committed.
let tx2 = db.begin_tx().await;
let tx2_row = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "World".to_string(),
};
assert!(!db.update(tx2, tx2_row).await.unwrap());
let row = db.read(tx1, 1).await.unwrap().unwrap();
let row = db
.read(
tx1,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
}
@@ -754,14 +959,26 @@ mod tests {
// T1 inserts a row with ID 1, but does not commit.
let tx1 = db.begin_tx().await;
let row1 = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
};
db.insert(tx1, row1).await.unwrap();
// T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed.
let tx2 = db.begin_tx().await;
let row2 = db.read(tx2, 1).await.unwrap();
let row2 = db
.read(
tx2,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap();
assert_eq!(row2, None);
}
@@ -774,7 +991,10 @@ mod tests {
// T1 inserts a row with ID 1 and commits.
let tx1 = db.begin_tx().await;
let tx1_row = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).await.unwrap();
@@ -782,11 +1002,30 @@ mod tests {
// T2 deletes row with ID 1, but does not commit.
let tx2 = db.begin_tx().await;
assert!(db.delete(tx2, 1).await.unwrap());
assert!(db
.delete(
tx2,
RowID {
table_id: 1,
row_id: 1
}
)
.await
.unwrap());
// T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed.
let tx3 = db.begin_tx().await;
let row = db.read(tx3, 1).await.unwrap().unwrap();
let row = db
.read(
tx3,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
}
@@ -798,30 +1037,66 @@ mod tests {
// T1 inserts a row with ID 1 and commits.
let tx1 = db.begin_tx().await;
let tx1_row = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).await.unwrap();
let row = db.read(tx1, 1).await.unwrap().unwrap();
let row = db
.read(
tx1,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
db.commit_tx(tx1).await.unwrap();
// T2 reads the row with ID 1 within an active transaction.
let tx2 = db.begin_tx().await;
let row = db.read(tx2, 1).await.unwrap().unwrap();
let row = db
.read(
tx2,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
// T3 updates the row and commits.
let tx3 = db.begin_tx().await;
let tx3_row = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "World".to_string(),
};
db.update(tx3, tx3_row).await.unwrap();
db.commit_tx(tx3).await.unwrap();
// T2 still reads the same version of the row as before.
let row = db.read(tx2, 1).await.unwrap().unwrap();
let row = db
.read(
tx2,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
}
@@ -833,18 +1108,34 @@ mod tests {
// T1 inserts a row with ID 1 and commits.
let tx1 = db.begin_tx().await;
let tx1_row = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).await.unwrap();
let row = db.read(tx1, 1).await.unwrap().unwrap();
let row = db
.read(
tx1,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
db.commit_tx(tx1).await.unwrap();
// T2 attempts to update row ID 1 within an active transaction.
let tx2 = db.begin_tx().await;
let tx2_row = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "World".to_string(),
};
assert!(db.update(tx2, tx2_row.clone()).await.unwrap());
@@ -852,7 +1143,10 @@ mod tests {
// T3 also attempts to update row ID 1 within an active transaction.
let tx3 = db.begin_tx().await;
let tx3_row = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "Hello, world!".to_string(),
};
assert_eq!(
@@ -864,7 +1158,17 @@ mod tests {
assert_eq!(Err(DatabaseError::TxTerminated), db.commit_tx(tx3).await);
let tx4 = db.begin_tx().await;
let row = db.read(tx4, 1).await.unwrap().unwrap();
let row = db
.read(
tx4,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx2_row, row);
}
@@ -878,7 +1182,10 @@ mod tests {
// let's add $10 to my account since I like money
let tx1 = db.begin_tx().await;
let tx1_row = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "10".to_string(),
};
db.insert(tx1, tx1_row.clone()).await.unwrap();
@@ -887,16 +1194,39 @@ mod tests {
// but I like more money, so let me try adding $10 more
let tx2 = db.begin_tx().await;
let tx2_row = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "20".to_string(),
};
assert!(db.update(tx2, tx2_row.clone()).await.unwrap());
let row = db.read(tx2, 1).await.unwrap().unwrap();
let row = db
.read(
tx2,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(row, tx2_row);
// can I check how much money I have?
let tx3 = db.begin_tx().await;
let row = db.read(tx3, 1).await.unwrap().unwrap();
let row = db
.read(
tx3,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
}
@@ -910,18 +1240,39 @@ mod tests {
let tx2 = db.begin_tx().await;
let tx2_row = Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "10".to_string(),
};
db.insert(tx2, tx2_row.clone()).await.unwrap();
// transaction in progress, so tx1 shouldn't be able to see the value
let row = db.read(tx1, 1).await.unwrap();
let row = db
.read(
tx1,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap();
assert_eq!(row, None);
// lets commit the transaction and check if tx1 can see it
db.commit_tx(tx2).await.unwrap();
let row = db.read(tx1, 1).await.unwrap();
let row = db
.read(
tx1,
RowID {
table_id: 1,
row_id: 1,
},
)
.await
.unwrap();
assert_eq!(row, None);
}
@@ -947,7 +1298,10 @@ mod tests {
db.insert(
tx3,
Row {
id: 1,
id: RowID {
table_id: 1,
row_id: 1,
},
data: "testme".to_string(),
},
)
@@ -962,7 +1316,10 @@ mod tests {
db.insert(
tx4,
Row {
id: 2,
id: RowID {
table_id: 1,
row_id: 2,
},
data: "testme2".to_string(),
},
)
@@ -971,16 +1328,58 @@ mod tests {
db.insert(
tx4,
Row {
id: 3,
id: RowID {
table_id: 1,
row_id: 3,
},
data: "testme3".to_string(),
},
)
.await
.unwrap();
assert_eq!(db.read(tx4, 1).await.unwrap().unwrap().data, "testme");
assert_eq!(db.read(tx4, 2).await.unwrap().unwrap().data, "testme2");
assert_eq!(db.read(tx4, 3).await.unwrap().unwrap().data, "testme3");
assert_eq!(
db.read(
tx4,
RowID {
table_id: 1,
row_id: 1
}
)
.await
.unwrap()
.unwrap()
.data,
"testme"
);
assert_eq!(
db.read(
tx4,
RowID {
table_id: 1,
row_id: 2
}
)
.await
.unwrap()
.unwrap()
.data,
"testme2"
);
assert_eq!(
db.read(
tx4,
RowID {
table_id: 1,
row_id: 3
}
)
.await
.unwrap()
.unwrap()
.data,
"testme3"
);
db.commit_tx(tx4).await.unwrap();
let clock = LocalClock::new();
@@ -990,9 +1389,58 @@ mod tests {
println!("{:#?}", db);
let tx5 = db.begin_tx().await;
println!("{:#?}", db.read(tx5, 1).await);
assert_eq!(db.read(tx5, 1).await.unwrap().unwrap().data, "testme");
assert_eq!(db.read(tx5, 2).await.unwrap().unwrap().data, "testme2");
assert_eq!(db.read(tx5, 3).await.unwrap().unwrap().data, "testme3");
println!(
"{:#?}",
db.read(
tx5,
RowID {
table_id: 1,
row_id: 1
}
)
.await
);
assert_eq!(
db.read(
tx5,
RowID {
table_id: 1,
row_id: 1
}
)
.await
.unwrap()
.unwrap()
.data,
"testme"
);
assert_eq!(
db.read(
tx5,
RowID {
table_id: 1,
row_id: 2
}
)
.await
.unwrap()
.unwrap()
.data,
"testme2"
);
assert_eq!(
db.read(
tx5,
RowID {
table_id: 1,
row_id: 3
}
)
.await
.unwrap()
.unwrap()
.data,
"testme3"
);
}
}

View File

@@ -1,5 +1,5 @@
use mvcc_rs::clock::LocalClock;
use mvcc_rs::database::{Database, Row};
use mvcc_rs::database::{Database, Row, RowID};
use shuttle::sync::atomic::AtomicU64;
use shuttle::sync::Arc;
use shuttle::thread;
@@ -22,6 +22,10 @@ fn test_non_overlapping_concurrent_inserts() {
shuttle::future::block_on(async move {
let tx = db.begin_tx().await;
let id = ids.fetch_add(1, Ordering::SeqCst);
let id = RowID {
table_id: 1,
row_id: id,
};
let row = Row {
id,
data: "Hello".to_string(),
@@ -42,6 +46,10 @@ fn test_non_overlapping_concurrent_inserts() {
shuttle::future::block_on(async move {
let tx = db.begin_tx().await;
let id = ids.fetch_add(1, Ordering::SeqCst);
let id = RowID {
table_id: 1,
row_id: id,
};
let row = Row {
id,
data: "World".to_string(),