diff --git a/core/benches/mvcc_benchmark.rs b/core/benches/mvcc_benchmark.rs index 15faffbac..f12875202 100644 --- a/core/benches/mvcc_benchmark.rs +++ b/core/benches/mvcc_benchmark.rs @@ -1,13 +1,29 @@ +use std::sync::Arc; + use criterion::async_executor::FuturesExecutor; use criterion::{criterion_group, criterion_main, Criterion, Throughput}; use pprof::criterion::{Output, PProfProfiler}; use turso_core::mvcc::clock::LocalClock; use turso_core::mvcc::database::{MvStore, Row, RowID}; +use turso_core::types::{ImmutableRecord, Text}; +use turso_core::{Connection, Database, MemoryIO, Value}; -fn bench_db() -> MvStore { - let clock = LocalClock::default(); - let storage = turso_core::mvcc::persistent_storage::Storage::new_noop(); - MvStore::new(clock, storage) +struct BenchDb { + db: Arc, + conn: Arc, + mvcc_store: Arc>, +} + +fn bench_db() -> BenchDb { + let io = Arc::new(MemoryIO::new()); + let db = Database::open_file(io.clone(), ":memory:", true, true).unwrap(); + let conn = db.connect().unwrap(); + let mvcc_store = db.get_mv_store().unwrap().clone(); + BenchDb { + db, + conn, + mvcc_store, + } } fn bench(c: &mut Criterion) { @@ -16,107 +32,129 @@ fn bench(c: &mut Criterion) { let db = bench_db(); group.bench_function("begin_tx + rollback_tx", |b| { + let db = bench_db(); b.to_async(FuturesExecutor).iter(|| async { - let tx_id = db.begin_tx(); - db.rollback_tx(tx_id) + let conn = db.conn.clone(); + let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone()); + db.mvcc_store.rollback_tx(tx_id) }) }); let db = bench_db(); group.bench_function("begin_tx + commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { - let tx_id = db.begin_tx(); - db.commit_tx(tx_id) + let conn = &db.conn; + let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone()); + db.mvcc_store + .commit_tx(tx_id, conn.get_pager().clone(), &conn) }) }); let db = bench_db(); group.bench_function("begin_tx-read-commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { - let tx_id = db.begin_tx(); - db.read( - tx_id, - RowID { - table_id: 1, - row_id: 1, - }, - ) - .unwrap(); - db.commit_tx(tx_id) + let conn = &db.conn; + let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone()); + db.mvcc_store + .read( + tx_id, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); + db.mvcc_store + .commit_tx(tx_id, conn.get_pager().clone(), &conn) }) }); let db = bench_db(); + let record = ImmutableRecord::from_values(&vec![Value::Text(Text::new("World"))], 1); + let record_data = record.as_blob(); group.bench_function("begin_tx-update-commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { - let tx_id = db.begin_tx(); - db.update( - tx_id, - Row { - id: RowID { - table_id: 1, - row_id: 1, + let conn = &db.conn; + let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone()); + db.mvcc_store + .update( + tx_id, + Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: record_data.clone(), + column_count: 1, }, - data: "World".to_string().into_bytes(), - }, - ) - .unwrap(); - db.commit_tx(tx_id) + ) + .unwrap(); + db.mvcc_store + .commit_tx(tx_id, conn.get_pager().clone(), &conn) + .unwrap(); }) }); let db = bench_db(); - let tx = db.begin_tx(); - db.insert( - tx, - Row { - id: RowID { - table_id: 1, - row_id: 1, - }, - data: "Hello".to_string().into_bytes(), - }, - ) - .unwrap(); - group.bench_function("read", |b| { - b.to_async(FuturesExecutor).iter(|| async { - db.read( - tx, - RowID { + let tx_id = db.mvcc_store.begin_tx(db.conn.get_pager().clone()); + db.mvcc_store + .insert( + tx_id, + Row { + id: RowID { table_id: 1, row_id: 1, }, - ) - .unwrap(); + data: record_data.clone(), + column_count: 1, + }, + ) + .unwrap(); + group.bench_function("read", |b| { + b.to_async(FuturesExecutor).iter(|| async { + db.mvcc_store + .read( + tx_id, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .unwrap(); }) }); let db = bench_db(); - let tx = db.begin_tx(); - db.insert( - tx, - Row { - id: RowID { - table_id: 1, - row_id: 1, + let tx_id = db.mvcc_store.begin_tx(db.conn.get_pager().clone()); + let conn = &db.conn; + db.mvcc_store + .insert( + tx_id, + Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: record_data.clone(), + column_count: 1, }, - data: "Hello".to_string().into_bytes(), - }, - ) - .unwrap(); + ) + .unwrap(); group.bench_function("update", |b| { b.to_async(FuturesExecutor).iter(|| async { - db.update( - tx, - Row { - id: RowID { - table_id: 1, - row_id: 1, + db.mvcc_store + .update( + tx_id, + Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: record_data.clone(), + column_count: 1, }, - data: "World".to_string().into_bytes(), - }, - ) - .unwrap(); + ) + .unwrap(); }) }); } diff --git a/core/lib.rs b/core/lib.rs index cb3ed3dd9..229edabb3 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -528,6 +528,10 @@ impl Database { } Ok(()) } + + pub fn get_mv_store(&self) -> Option<&Arc> { + self.mv_store.as_ref() + } } fn get_schema_version(conn: &Arc) -> Result { @@ -1700,6 +1704,10 @@ impl Connection { databases.sort_by_key(|&(seq, _, _)| seq); databases } + + pub fn get_pager(&self) -> Rc { + self.pager.borrow().clone() + } } pub struct Statement { diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 3d205e92d..fedce8802 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -313,9 +313,9 @@ impl MvStore { /// # Returns /// /// Returns `true` if the row was successfully updated, and `false` otherwise. - pub fn update(&self, tx_id: TxID, row: Row) -> Result { + pub fn update(&self, tx_id: TxID, row: Row, pager: Rc) -> Result { tracing::trace!("update(tx_id={}, row.id={:?})", tx_id, row.id); - if !self.delete(tx_id, row.id)? { + if !self.delete(tx_id, row.id, pager)? { return Ok(false); } self.insert(tx_id, row)?; @@ -324,9 +324,9 @@ impl MvStore { /// Inserts a row in the database with new values, previously deleting /// any old data if it existed. Bails on a delete error, e.g. write-write conflict. - pub fn upsert(&self, tx_id: TxID, row: Row) -> Result<()> { + pub fn upsert(&self, tx_id: TxID, row: Row, pager: Rc) -> Result<()> { tracing::trace!("upsert(tx_id={}, row.id={:?})", tx_id, row.id); - self.delete(tx_id, row.id)?; + self.delete(tx_id, row.id, pager)?; self.insert(tx_id, row) } @@ -344,7 +344,7 @@ impl MvStore { /// /// Returns `true` if the row was successfully deleted, and `false` otherwise. /// - pub fn delete(&self, tx_id: TxID, id: RowID) -> Result { + pub fn delete(&self, tx_id: TxID, id: RowID, pager: Rc) -> Result { tracing::trace!("delete(tx_id={}, id={:?})", tx_id, id); let row_versions_opt = self.rows.get(&id); if let Some(ref row_versions) = row_versions_opt { @@ -365,7 +365,7 @@ impl MvStore { drop(row_versions); drop(row_versions_opt); drop(tx); - self.rollback_tx(tx_id); + self.rollback_tx(tx_id, pager); return Err(DatabaseError::WriteWriteConflict); } @@ -725,7 +725,7 @@ impl MvStore { /// # Arguments /// /// * `tx_id` - The ID of the transaction to abort. - pub fn rollback_tx(&self, tx_id: TxID) { + pub fn rollback_tx(&self, tx_id: TxID, pager: Rc) { let tx_unlocked = self.txs.get(&tx_id).unwrap(); let tx = tx_unlocked.value().write(); assert_eq!(tx.state, TransactionState::Active); @@ -747,6 +747,7 @@ impl MvStore { let tx = tx_unlocked.value().read(); tx.state.store(TransactionState::Terminated); tracing::trace!("terminate(tx_id={})", tx_id); + pager.end_read_tx().unwrap(); // FIXME: verify that we can already remove the transaction here! // Maybe it's fine for snapshot isolation, but too early for serializable? self.txs.remove(&tx_id); diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 72486a550..6af7a5e6a 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -123,6 +123,7 @@ fn test_delete() { table_id: 1, row_id: 1, }, + db.conn.pager.borrow().clone(), ) .unwrap(); let row = db @@ -165,7 +166,8 @@ fn test_delete_nonexistent() { RowID { table_id: 1, row_id: 1 - } + }, + db.conn.pager.borrow().clone(), ) .unwrap()); } @@ -189,7 +191,9 @@ fn test_commit() { .unwrap(); assert_eq!(tx1_row, row); let tx1_updated_row = generate_simple_string_row(1, 1, "World"); - db.mvcc_store.update(tx1, tx1_updated_row.clone()).unwrap(); + db.mvcc_store + .update(tx1, tx1_updated_row.clone(), db.conn.pager.borrow().clone()) + .unwrap(); let row = db .mvcc_store .read( @@ -244,7 +248,9 @@ fn test_rollback() { .unwrap(); assert_eq!(row1, row2); let row3 = generate_simple_string_row(1, 1, "World"); - db.mvcc_store.update(tx1, row3.clone()).unwrap(); + db.mvcc_store + .update(tx1, row3.clone(), db.conn.pager.borrow().clone()) + .unwrap(); let row4 = db .mvcc_store .read( @@ -257,7 +263,8 @@ fn test_rollback() { .unwrap() .unwrap(); assert_eq!(row3, row4); - db.mvcc_store.rollback_tx(tx1); + db.mvcc_store + .rollback_tx(tx1, db.conn.pager.borrow().clone()); let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); let row5 = db .mvcc_store @@ -293,10 +300,14 @@ fn test_dirty_write() { .unwrap(); assert_eq!(tx1_row, row); + let conn2 = db._db.connect().unwrap(); // T2 attempts to delete row with ID 1, but fails because T1 has not committed. - let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone()); let tx2_row = generate_simple_string_row(1, 1, "World"); - assert!(!db.mvcc_store.update(tx2, tx2_row).unwrap()); + assert!(!db + .mvcc_store + .update(tx2, tx2_row, conn2.pager.borrow().clone()) + .unwrap()); let row = db .mvcc_store @@ -322,7 +333,8 @@ fn test_dirty_read() { db.mvcc_store.insert(tx1, row1).unwrap(); // T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed. - let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let conn2 = db._db.connect().unwrap(); + let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone()); let row2 = db .mvcc_store .read( @@ -349,7 +361,8 @@ fn test_dirty_read_deleted() { .unwrap(); // T2 deletes row with ID 1, but does not commit. - let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let conn2 = db._db.connect().unwrap(); + let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone()); assert!(db .mvcc_store .delete( @@ -357,12 +370,14 @@ fn test_dirty_read_deleted() { RowID { table_id: 1, row_id: 1 - } + }, + conn2.pager.borrow().clone(), ) .unwrap()); // T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed. - let tx3 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let conn3 = db._db.connect().unwrap(); + let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone()); let row = db .mvcc_store .read( @@ -402,7 +417,8 @@ fn test_fuzzy_read() { .unwrap(); // T2 reads the row with ID 1 within an active transaction. - let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let conn2 = db._db.connect().unwrap(); + let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone()); let row = db .mvcc_store .read( @@ -417,11 +433,14 @@ fn test_fuzzy_read() { assert_eq!(tx1_row, row); // T3 updates the row and commits. - let tx3 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let conn3 = db._db.connect().unwrap(); + let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone()); let tx3_row = generate_simple_string_row(1, 1, "Second"); - db.mvcc_store.update(tx3, tx3_row).unwrap(); db.mvcc_store - .commit_tx(tx3, db.conn.pager.borrow().clone(), &db.conn) + .update(tx3, tx3_row, conn3.pager.borrow().clone()) + .unwrap(); + db.mvcc_store + .commit_tx(tx3, conn3.pager.borrow().clone(), &db.conn) .unwrap(); // T2 still reads the same version of the row as before. @@ -441,7 +460,9 @@ fn test_fuzzy_read() { // T2 tries to update the row, but fails because T3 has already committed an update to the row, // so T2 trying to write would violate snapshot isolation if it succeeded. let tx2_newrow = generate_simple_string_row(1, 1, "Third"); - let update_result = db.mvcc_store.update(tx2, tx2_newrow); + let update_result = db + .mvcc_store + .update(tx2, tx2_newrow, conn2.pager.borrow().clone()); assert_eq!(Err(DatabaseError::WriteWriteConflict), update_result); } @@ -470,28 +491,35 @@ fn test_lost_update() { .unwrap(); // T2 attempts to update row ID 1 within an active transaction. - let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let conn2 = db._db.connect().unwrap(); + let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone()); let tx2_row = generate_simple_string_row(1, 1, "World"); - assert!(db.mvcc_store.update(tx2, tx2_row.clone()).unwrap()); + assert!(db + .mvcc_store + .update(tx2, tx2_row.clone(), conn2.pager.borrow().clone()) + .unwrap()); // T3 also attempts to update row ID 1 within an active transaction. - let tx3 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let conn3 = db._db.connect().unwrap(); + let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone()); let tx3_row = generate_simple_string_row(1, 1, "Hello, world!"); assert_eq!( Err(DatabaseError::WriteWriteConflict), - db.mvcc_store.update(tx3, tx3_row) + db.mvcc_store + .update(tx3, tx3_row, conn3.pager.borrow().clone()) ); db.mvcc_store - .commit_tx(tx2, db.conn.pager.borrow().clone(), &db.conn) + .commit_tx(tx2, conn2.pager.borrow().clone(), &db.conn) .unwrap(); assert_eq!( Err(DatabaseError::TxTerminated), db.mvcc_store - .commit_tx(tx3, db.conn.pager.borrow().clone(), &db.conn) + .commit_tx(tx3, conn3.pager.borrow().clone(), &db.conn) ); - let tx4 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let conn4 = db._db.connect().unwrap(); + let tx4 = db.mvcc_store.begin_tx(conn4.pager.borrow().clone()); let row = db .mvcc_store .read( @@ -521,9 +549,13 @@ fn test_committed_visibility() { .unwrap(); // but I like more money, so let me try adding $10 more - let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let conn2 = db._db.connect().unwrap(); + let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone()); let tx2_row = generate_simple_string_row(1, 1, "20"); - assert!(db.mvcc_store.update(tx2, tx2_row.clone()).unwrap()); + assert!(db + .mvcc_store + .update(tx2, tx2_row.clone(), conn2.pager.borrow().clone()) + .unwrap()); let row = db .mvcc_store .read( @@ -538,7 +570,8 @@ fn test_committed_visibility() { assert_eq!(row, tx2_row); // can I check how much money I have? - let tx3 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let conn3 = db._db.connect().unwrap(); + let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone()); let row = db .mvcc_store .read( @@ -560,7 +593,8 @@ fn test_future_row() { let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); - let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let conn2 = db._db.connect().unwrap(); + let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone()); let tx2_row = generate_simple_string_row(1, 1, "Hello"); db.mvcc_store.insert(tx2, tx2_row).unwrap(); @@ -579,7 +613,7 @@ fn test_future_row() { // lets commit the transaction and check if tx1 can see it db.mvcc_store - .commit_tx(tx2, db.conn.pager.borrow().clone(), &db.conn) + .commit_tx(tx2, conn2.pager.borrow().clone(), &db.conn) .unwrap(); let row = db .mvcc_store diff --git a/core/mvcc/mod.rs b/core/mvcc/mod.rs index 73a29d8fb..c5ad30d9d 100644 --- a/core/mvcc/mod.rs +++ b/core/mvcc/mod.rs @@ -141,7 +141,8 @@ mod tests { row_id: id, }; let row = generate_simple_string_row(1, id.row_id, &format!("{prefix} @{tx}")); - if let Err(e) = mvcc_store.upsert(tx, row.clone()) { + if let Err(e) = mvcc_store.upsert(tx, row.clone(), conn.pager.borrow().clone()) + { tracing::trace!("upsert failed: {e}"); failed_upserts += 1; continue; diff --git a/core/storage/btree.rs b/core/storage/btree.rs index ea101bd99..cf23e6f99 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -4435,7 +4435,7 @@ impl BTreeCursor { let record_buf = key.get_record().unwrap().get_payload().to_vec(); let num_columns = match key { BTreeKey::IndexKey(record) => record.column_count(), - BTreeKey::TableRowId((rowid, record)) => { + BTreeKey::TableRowId((_, record)) => { record.as_ref().unwrap().column_count() } }; diff --git a/core/storage/wal.rs b/core/storage/wal.rs index b9be07d1f..bb7c31c4a 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -2455,10 +2455,14 @@ pub mod test { fn check_read_lock_slot(conn: &Arc, expected_slot: usize) -> bool { let pager = conn.pager.borrow(); let wal = pager.wal.as_ref().unwrap().borrow(); - let wal_any = wal.as_any(); - if let Some(wal_file) = wal_any.downcast_ref::() { - return wal_file.max_frame_read_lock_index.get() == expected_slot; + #[cfg(debug_assertions)] + { + let wal_any = wal.as_any(); + if let Some(wal_file) = wal_any.downcast_ref::() { + return wal_file.max_frame_read_lock_index.get() == expected_slot; + } } + false }