core/mvcc: fix tests again

Had to create a separate connection for every different txn.
This commit is contained in:
Pere Diaz Bou
2025-08-01 10:44:19 +02:00
parent 5ad7d10790
commit c807b035c5
7 changed files with 195 additions and 109 deletions

View File

@@ -1,13 +1,29 @@
use std::sync::Arc;
use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use pprof::criterion::{Output, PProfProfiler};
use turso_core::mvcc::clock::LocalClock;
use turso_core::mvcc::database::{MvStore, Row, RowID};
use turso_core::types::{ImmutableRecord, Text};
use turso_core::{Connection, Database, MemoryIO, Value};
fn bench_db() -> MvStore<LocalClock> {
let clock = LocalClock::default();
let storage = turso_core::mvcc::persistent_storage::Storage::new_noop();
MvStore::new(clock, storage)
// Shared fixture for the MVCC benchmarks: one in-memory database plus the
// handles each benchmark needs, so every bench case gets its own connection.
struct BenchDb {
// Keeps the database alive for the duration of the benchmark.
db: Arc<Database>,
// Connection owned by this bench case; supplies the pager for txn calls.
conn: Arc<Connection>,
// Direct handle to the MVCC store backing `db`.
mvcc_store: Arc<MvStore<LocalClock>>,
}
// Builds a fresh in-memory database with MVCC enabled and returns it together
// with a connection and the MVCC store handle. Panics (unwrap) on setup
// failure, which is acceptable in benchmark scaffolding.
fn bench_db() -> BenchDb {
let io = Arc::new(MemoryIO::new());
// NOTE(review): the two `true` flags presumably enable WAL/MVCC — confirm
// against `Database::open_file`'s signature.
let db = Database::open_file(io.clone(), ":memory:", true, true).unwrap();
let conn = db.connect().unwrap();
// `get_mv_store` returns Some because the database was opened MVCC-enabled.
let mvcc_store = db.get_mv_store().unwrap().clone();
BenchDb {
db,
conn,
mvcc_store,
}
}
fn bench(c: &mut Criterion) {
@@ -16,107 +32,129 @@ fn bench(c: &mut Criterion) {
let db = bench_db();
group.bench_function("begin_tx + rollback_tx", |b| {
let db = bench_db();
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx();
db.rollback_tx(tx_id)
let conn = db.conn.clone();
let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone());
db.mvcc_store.rollback_tx(tx_id)
})
});
let db = bench_db();
group.bench_function("begin_tx + commit_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx();
db.commit_tx(tx_id)
let conn = &db.conn;
let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone());
db.mvcc_store
.commit_tx(tx_id, conn.get_pager().clone(), &conn)
})
});
let db = bench_db();
group.bench_function("begin_tx-read-commit_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx();
db.read(
tx_id,
RowID {
table_id: 1,
row_id: 1,
},
)
.unwrap();
db.commit_tx(tx_id)
let conn = &db.conn;
let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone());
db.mvcc_store
.read(
tx_id,
RowID {
table_id: 1,
row_id: 1,
},
)
.unwrap();
db.mvcc_store
.commit_tx(tx_id, conn.get_pager().clone(), &conn)
})
});
let db = bench_db();
let record = ImmutableRecord::from_values(&vec![Value::Text(Text::new("World"))], 1);
let record_data = record.as_blob();
group.bench_function("begin_tx-update-commit_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx();
db.update(
tx_id,
Row {
id: RowID {
table_id: 1,
row_id: 1,
let conn = &db.conn;
let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone());
db.mvcc_store
.update(
tx_id,
Row {
id: RowID {
table_id: 1,
row_id: 1,
},
data: record_data.clone(),
column_count: 1,
},
data: "World".to_string().into_bytes(),
},
)
.unwrap();
db.commit_tx(tx_id)
)
.unwrap();
db.mvcc_store
.commit_tx(tx_id, conn.get_pager().clone(), &conn)
.unwrap();
})
});
let db = bench_db();
let tx = db.begin_tx();
db.insert(
tx,
Row {
id: RowID {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string().into_bytes(),
},
)
.unwrap();
group.bench_function("read", |b| {
b.to_async(FuturesExecutor).iter(|| async {
db.read(
tx,
RowID {
let tx_id = db.mvcc_store.begin_tx(db.conn.get_pager().clone());
db.mvcc_store
.insert(
tx_id,
Row {
id: RowID {
table_id: 1,
row_id: 1,
},
)
.unwrap();
data: record_data.clone(),
column_count: 1,
},
)
.unwrap();
group.bench_function("read", |b| {
b.to_async(FuturesExecutor).iter(|| async {
db.mvcc_store
.read(
tx_id,
RowID {
table_id: 1,
row_id: 1,
},
)
.unwrap();
})
});
let db = bench_db();
let tx = db.begin_tx();
db.insert(
tx,
Row {
id: RowID {
table_id: 1,
row_id: 1,
let tx_id = db.mvcc_store.begin_tx(db.conn.get_pager().clone());
let conn = &db.conn;
db.mvcc_store
.insert(
tx_id,
Row {
id: RowID {
table_id: 1,
row_id: 1,
},
data: record_data.clone(),
column_count: 1,
},
data: "Hello".to_string().into_bytes(),
},
)
.unwrap();
)
.unwrap();
group.bench_function("update", |b| {
b.to_async(FuturesExecutor).iter(|| async {
db.update(
tx,
Row {
id: RowID {
table_id: 1,
row_id: 1,
db.mvcc_store
.update(
tx_id,
Row {
id: RowID {
table_id: 1,
row_id: 1,
},
data: record_data.clone(),
column_count: 1,
},
data: "World".to_string().into_bytes(),
},
)
.unwrap();
)
.unwrap();
})
});
}

View File

@@ -528,6 +528,10 @@ impl Database {
}
Ok(())
}
/// Returns a shared handle to this database's MVCC store, or `None`
/// when the database was opened without MVCC support.
pub fn get_mv_store(&self) -> Option<&Arc<MvStore>> {
    match self.mv_store {
        Some(ref store) => Some(store),
        None => None,
    }
}
}
fn get_schema_version(conn: &Arc<Connection>) -> Result<u32> {
@@ -1700,6 +1704,10 @@ impl Connection {
databases.sort_by_key(|&(seq, _, _)| seq);
databases
}
// Returns a cheap (`Rc`) clone of this connection's current pager.
// NOTE(review): assumes `self.pager` is a `RefCell<Rc<Pager>>`, so `.clone()`
// only bumps the refcount — confirm against the field declaration.
pub fn get_pager(&self) -> Rc<Pager> {
self.pager.borrow().clone()
}
}
pub struct Statement {

View File

@@ -313,9 +313,9 @@ impl<Clock: LogicalClock> MvStore<Clock> {
/// # Returns
///
/// Returns `true` if the row was successfully updated, and `false` otherwise.
pub fn update(&self, tx_id: TxID, row: Row) -> Result<bool> {
pub fn update(&self, tx_id: TxID, row: Row, pager: Rc<Pager>) -> Result<bool> {
tracing::trace!("update(tx_id={}, row.id={:?})", tx_id, row.id);
if !self.delete(tx_id, row.id)? {
if !self.delete(tx_id, row.id, pager)? {
return Ok(false);
}
self.insert(tx_id, row)?;
@@ -324,9 +324,9 @@ impl<Clock: LogicalClock> MvStore<Clock> {
/// Inserts a row in the database with new values, previously deleting
/// any old data if it existed. Bails on a delete error, e.g. write-write conflict.
pub fn upsert(&self, tx_id: TxID, row: Row) -> Result<()> {
pub fn upsert(&self, tx_id: TxID, row: Row, pager: Rc<Pager>) -> Result<()> {
tracing::trace!("upsert(tx_id={}, row.id={:?})", tx_id, row.id);
self.delete(tx_id, row.id)?;
self.delete(tx_id, row.id, pager)?;
self.insert(tx_id, row)
}
@@ -344,7 +344,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
///
/// Returns `true` if the row was successfully deleted, and `false` otherwise.
///
pub fn delete(&self, tx_id: TxID, id: RowID) -> Result<bool> {
pub fn delete(&self, tx_id: TxID, id: RowID, pager: Rc<Pager>) -> Result<bool> {
tracing::trace!("delete(tx_id={}, id={:?})", tx_id, id);
let row_versions_opt = self.rows.get(&id);
if let Some(ref row_versions) = row_versions_opt {
@@ -365,7 +365,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
drop(row_versions);
drop(row_versions_opt);
drop(tx);
self.rollback_tx(tx_id);
self.rollback_tx(tx_id, pager);
return Err(DatabaseError::WriteWriteConflict);
}
@@ -725,7 +725,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
/// # Arguments
///
/// * `tx_id` - The ID of the transaction to abort.
pub fn rollback_tx(&self, tx_id: TxID) {
pub fn rollback_tx(&self, tx_id: TxID, pager: Rc<Pager>) {
let tx_unlocked = self.txs.get(&tx_id).unwrap();
let tx = tx_unlocked.value().write();
assert_eq!(tx.state, TransactionState::Active);
@@ -747,6 +747,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let tx = tx_unlocked.value().read();
tx.state.store(TransactionState::Terminated);
tracing::trace!("terminate(tx_id={})", tx_id);
pager.end_read_tx().unwrap();
// FIXME: verify that we can already remove the transaction here!
// Maybe it's fine for snapshot isolation, but too early for serializable?
self.txs.remove(&tx_id);

View File

@@ -123,6 +123,7 @@ fn test_delete() {
table_id: 1,
row_id: 1,
},
db.conn.pager.borrow().clone(),
)
.unwrap();
let row = db
@@ -165,7 +166,8 @@ fn test_delete_nonexistent() {
RowID {
table_id: 1,
row_id: 1
}
},
db.conn.pager.borrow().clone(),
)
.unwrap());
}
@@ -189,7 +191,9 @@ fn test_commit() {
.unwrap();
assert_eq!(tx1_row, row);
let tx1_updated_row = generate_simple_string_row(1, 1, "World");
db.mvcc_store.update(tx1, tx1_updated_row.clone()).unwrap();
db.mvcc_store
.update(tx1, tx1_updated_row.clone(), db.conn.pager.borrow().clone())
.unwrap();
let row = db
.mvcc_store
.read(
@@ -244,7 +248,9 @@ fn test_rollback() {
.unwrap();
assert_eq!(row1, row2);
let row3 = generate_simple_string_row(1, 1, "World");
db.mvcc_store.update(tx1, row3.clone()).unwrap();
db.mvcc_store
.update(tx1, row3.clone(), db.conn.pager.borrow().clone())
.unwrap();
let row4 = db
.mvcc_store
.read(
@@ -257,7 +263,8 @@ fn test_rollback() {
.unwrap()
.unwrap();
assert_eq!(row3, row4);
db.mvcc_store.rollback_tx(tx1);
db.mvcc_store
.rollback_tx(tx1, db.conn.pager.borrow().clone());
let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let row5 = db
.mvcc_store
@@ -293,10 +300,14 @@ fn test_dirty_write() {
.unwrap();
assert_eq!(tx1_row, row);
let conn2 = db._db.connect().unwrap();
// T2 attempts to delete row with ID 1, but fails because T1 has not committed.
let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
let tx2_row = generate_simple_string_row(1, 1, "World");
assert!(!db.mvcc_store.update(tx2, tx2_row).unwrap());
assert!(!db
.mvcc_store
.update(tx2, tx2_row, conn2.pager.borrow().clone())
.unwrap());
let row = db
.mvcc_store
@@ -322,7 +333,8 @@ fn test_dirty_read() {
db.mvcc_store.insert(tx1, row1).unwrap();
// T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed.
let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let conn2 = db._db.connect().unwrap();
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
let row2 = db
.mvcc_store
.read(
@@ -349,7 +361,8 @@ fn test_dirty_read_deleted() {
.unwrap();
// T2 deletes row with ID 1, but does not commit.
let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let conn2 = db._db.connect().unwrap();
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
assert!(db
.mvcc_store
.delete(
@@ -357,12 +370,14 @@ fn test_dirty_read_deleted() {
RowID {
table_id: 1,
row_id: 1
}
},
conn2.pager.borrow().clone(),
)
.unwrap());
// T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed.
let tx3 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let conn3 = db._db.connect().unwrap();
let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone());
let row = db
.mvcc_store
.read(
@@ -402,7 +417,8 @@ fn test_fuzzy_read() {
.unwrap();
// T2 reads the row with ID 1 within an active transaction.
let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let conn2 = db._db.connect().unwrap();
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
let row = db
.mvcc_store
.read(
@@ -417,11 +433,14 @@ fn test_fuzzy_read() {
assert_eq!(tx1_row, row);
// T3 updates the row and commits.
let tx3 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let conn3 = db._db.connect().unwrap();
let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone());
let tx3_row = generate_simple_string_row(1, 1, "Second");
db.mvcc_store.update(tx3, tx3_row).unwrap();
db.mvcc_store
.commit_tx(tx3, db.conn.pager.borrow().clone(), &db.conn)
.update(tx3, tx3_row, conn3.pager.borrow().clone())
.unwrap();
db.mvcc_store
.commit_tx(tx3, conn3.pager.borrow().clone(), &db.conn)
.unwrap();
// T2 still reads the same version of the row as before.
@@ -441,7 +460,9 @@ fn test_fuzzy_read() {
// T2 tries to update the row, but fails because T3 has already committed an update to the row,
// so T2 trying to write would violate snapshot isolation if it succeeded.
let tx2_newrow = generate_simple_string_row(1, 1, "Third");
let update_result = db.mvcc_store.update(tx2, tx2_newrow);
let update_result = db
.mvcc_store
.update(tx2, tx2_newrow, conn2.pager.borrow().clone());
assert_eq!(Err(DatabaseError::WriteWriteConflict), update_result);
}
@@ -470,28 +491,35 @@ fn test_lost_update() {
.unwrap();
// T2 attempts to update row ID 1 within an active transaction.
let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let conn2 = db._db.connect().unwrap();
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
let tx2_row = generate_simple_string_row(1, 1, "World");
assert!(db.mvcc_store.update(tx2, tx2_row.clone()).unwrap());
assert!(db
.mvcc_store
.update(tx2, tx2_row.clone(), conn2.pager.borrow().clone())
.unwrap());
// T3 also attempts to update row ID 1 within an active transaction.
let tx3 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let conn3 = db._db.connect().unwrap();
let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone());
let tx3_row = generate_simple_string_row(1, 1, "Hello, world!");
assert_eq!(
Err(DatabaseError::WriteWriteConflict),
db.mvcc_store.update(tx3, tx3_row)
db.mvcc_store
.update(tx3, tx3_row, conn3.pager.borrow().clone())
);
db.mvcc_store
.commit_tx(tx2, db.conn.pager.borrow().clone(), &db.conn)
.commit_tx(tx2, conn2.pager.borrow().clone(), &db.conn)
.unwrap();
assert_eq!(
Err(DatabaseError::TxTerminated),
db.mvcc_store
.commit_tx(tx3, db.conn.pager.borrow().clone(), &db.conn)
.commit_tx(tx3, conn3.pager.borrow().clone(), &db.conn)
);
let tx4 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let conn4 = db._db.connect().unwrap();
let tx4 = db.mvcc_store.begin_tx(conn4.pager.borrow().clone());
let row = db
.mvcc_store
.read(
@@ -521,9 +549,13 @@ fn test_committed_visibility() {
.unwrap();
// but I like more money, so let me try adding $10 more
let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let conn2 = db._db.connect().unwrap();
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
let tx2_row = generate_simple_string_row(1, 1, "20");
assert!(db.mvcc_store.update(tx2, tx2_row.clone()).unwrap());
assert!(db
.mvcc_store
.update(tx2, tx2_row.clone(), conn2.pager.borrow().clone())
.unwrap());
let row = db
.mvcc_store
.read(
@@ -538,7 +570,8 @@ fn test_committed_visibility() {
assert_eq!(row, tx2_row);
// can I check how much money I have?
let tx3 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let conn3 = db._db.connect().unwrap();
let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone());
let row = db
.mvcc_store
.read(
@@ -560,7 +593,8 @@ fn test_future_row() {
let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let conn2 = db._db.connect().unwrap();
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
let tx2_row = generate_simple_string_row(1, 1, "Hello");
db.mvcc_store.insert(tx2, tx2_row).unwrap();
@@ -579,7 +613,7 @@ fn test_future_row() {
// lets commit the transaction and check if tx1 can see it
db.mvcc_store
.commit_tx(tx2, db.conn.pager.borrow().clone(), &db.conn)
.commit_tx(tx2, conn2.pager.borrow().clone(), &db.conn)
.unwrap();
let row = db
.mvcc_store

View File

@@ -141,7 +141,8 @@ mod tests {
row_id: id,
};
let row = generate_simple_string_row(1, id.row_id, &format!("{prefix} @{tx}"));
if let Err(e) = mvcc_store.upsert(tx, row.clone()) {
if let Err(e) = mvcc_store.upsert(tx, row.clone(), conn.pager.borrow().clone())
{
tracing::trace!("upsert failed: {e}");
failed_upserts += 1;
continue;

View File

@@ -4435,7 +4435,7 @@ impl BTreeCursor {
let record_buf = key.get_record().unwrap().get_payload().to_vec();
let num_columns = match key {
BTreeKey::IndexKey(record) => record.column_count(),
BTreeKey::TableRowId((rowid, record)) => {
BTreeKey::TableRowId((_, record)) => {
record.as_ref().unwrap().column_count()
}
};

View File

@@ -2455,10 +2455,14 @@ pub mod test {
fn check_read_lock_slot(conn: &Arc<Connection>, expected_slot: usize) -> bool {
let pager = conn.pager.borrow();
let wal = pager.wal.as_ref().unwrap().borrow();
let wal_any = wal.as_any();
if let Some(wal_file) = wal_any.downcast_ref::<WalFile>() {
return wal_file.max_frame_read_lock_index.get() == expected_slot;
#[cfg(debug_assertions)]
{
let wal_any = wal.as_any();
if let Some(wal_file) = wal_any.downcast_ref::<WalFile>() {
return wal_file.max_frame_read_lock_index.get() == expected_slot;
}
}
false
}