mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-10 10:44:22 +01:00
core/mvcc: add option to test with a random file and restart it
This commit is contained in:
@@ -1,13 +1,15 @@
|
||||
use tempfile::TempPath;
|
||||
|
||||
use super::*;
|
||||
use crate::mvcc::clock::LocalClock;
|
||||
|
||||
pub(crate) struct MvccTestDbNoConn {
|
||||
pub(crate) db: Arc<Database>,
|
||||
pub(crate) db: Option<Arc<Database>>,
|
||||
path: Option<String>,
|
||||
}
|
||||
pub(crate) struct MvccTestDb {
|
||||
pub(crate) mvcc_store: Arc<MvStore<LocalClock>>,
|
||||
|
||||
pub(crate) _db: Arc<Database>,
|
||||
pub(crate) db: Arc<Database>,
|
||||
pub(crate) conn: Arc<Connection>,
|
||||
}
|
||||
|
||||
@@ -19,7 +21,7 @@ impl MvccTestDb {
|
||||
let mvcc_store = db.mv_store.as_ref().unwrap().clone();
|
||||
Self {
|
||||
mvcc_store,
|
||||
_db: db,
|
||||
db,
|
||||
conn,
|
||||
}
|
||||
}
|
||||
@@ -29,7 +31,37 @@ impl MvccTestDbNoConn {
|
||||
pub fn new() -> Self {
|
||||
let io = Arc::new(MemoryIO::new());
|
||||
let db = Database::open_file(io.clone(), ":memory:", true, true).unwrap();
|
||||
Self { db }
|
||||
Self {
|
||||
db: Some(db),
|
||||
path: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Opens a database with a file
|
||||
pub fn new_with_random_db() -> Self {
|
||||
let temp_dir = tempfile::TempDir::new().unwrap();
|
||||
let path = temp_dir
|
||||
.path()
|
||||
.join(format!("test_{}", rand::random::<u64>()));
|
||||
let io = Arc::new(PlatformIO::new().unwrap());
|
||||
let db = Database::open_file(io.clone(), path.as_os_str().to_str().unwrap(), true, true)
|
||||
.unwrap();
|
||||
Self {
|
||||
db: Some(db),
|
||||
path: Some(path.to_str().unwrap().to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Restarts the database, make sure there is no connection to the database open before calling this!
|
||||
pub fn restart(&mut self) {
|
||||
let io = Arc::new(PlatformIO::new().unwrap());
|
||||
let db = Database::open_file(io.clone(), &self.path.unwrap(), true, true).unwrap();
|
||||
self.db.replace(db);
|
||||
}
|
||||
|
||||
/// Asumes there is a database open
|
||||
pub fn get_db(&self) -> Arc<Database> {
|
||||
self.db.as_ref().unwrap().clone()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -292,7 +324,7 @@ fn test_dirty_write() {
|
||||
.unwrap();
|
||||
assert_eq!(tx1_row, row);
|
||||
|
||||
let conn2 = db._db.connect().unwrap();
|
||||
let conn2 = db.db.connect().unwrap();
|
||||
// T2 attempts to delete row with ID 1, but fails because T1 has not committed.
|
||||
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
|
||||
let tx2_row = generate_simple_string_row(1, 1, "World");
|
||||
@@ -325,7 +357,7 @@ fn test_dirty_read() {
|
||||
db.mvcc_store.insert(tx1, row1).unwrap();
|
||||
|
||||
// T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed.
|
||||
let conn2 = db._db.connect().unwrap();
|
||||
let conn2 = db.db.connect().unwrap();
|
||||
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
|
||||
let row2 = db
|
||||
.mvcc_store
|
||||
@@ -351,7 +383,7 @@ fn test_dirty_read_deleted() {
|
||||
commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap();
|
||||
|
||||
// T2 deletes row with ID 1, but does not commit.
|
||||
let conn2 = db._db.connect().unwrap();
|
||||
let conn2 = db.db.connect().unwrap();
|
||||
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
|
||||
assert!(db
|
||||
.mvcc_store
|
||||
@@ -366,7 +398,7 @@ fn test_dirty_read_deleted() {
|
||||
.unwrap());
|
||||
|
||||
// T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed.
|
||||
let conn3 = db._db.connect().unwrap();
|
||||
let conn3 = db.db.connect().unwrap();
|
||||
let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone());
|
||||
let row = db
|
||||
.mvcc_store
|
||||
@@ -405,7 +437,7 @@ fn test_fuzzy_read() {
|
||||
commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap();
|
||||
|
||||
// T2 reads the row with ID 1 within an active transaction.
|
||||
let conn2 = db._db.connect().unwrap();
|
||||
let conn2 = db.db.connect().unwrap();
|
||||
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
|
||||
let row = db
|
||||
.mvcc_store
|
||||
@@ -421,7 +453,7 @@ fn test_fuzzy_read() {
|
||||
assert_eq!(tx1_row, row);
|
||||
|
||||
// T3 updates the row and commits.
|
||||
let conn3 = db._db.connect().unwrap();
|
||||
let conn3 = db.db.connect().unwrap();
|
||||
let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone());
|
||||
let tx3_row = generate_simple_string_row(1, 1, "Second");
|
||||
db.mvcc_store
|
||||
@@ -475,7 +507,7 @@ fn test_lost_update() {
|
||||
commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap();
|
||||
|
||||
// T2 attempts to update row ID 1 within an active transaction.
|
||||
let conn2 = db._db.connect().unwrap();
|
||||
let conn2 = db.db.connect().unwrap();
|
||||
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
|
||||
let tx2_row = generate_simple_string_row(1, 1, "World");
|
||||
assert!(db
|
||||
@@ -484,7 +516,7 @@ fn test_lost_update() {
|
||||
.unwrap());
|
||||
|
||||
// T3 also attempts to update row ID 1 within an active transaction.
|
||||
let conn3 = db._db.connect().unwrap();
|
||||
let conn3 = db.db.connect().unwrap();
|
||||
let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone());
|
||||
let tx3_row = generate_simple_string_row(1, 1, "Hello, world!");
|
||||
assert!(matches!(
|
||||
@@ -499,7 +531,7 @@ fn test_lost_update() {
|
||||
Err(LimboError::TxTerminated)
|
||||
));
|
||||
|
||||
let conn4 = db._db.connect().unwrap();
|
||||
let conn4 = db.db.connect().unwrap();
|
||||
let tx4 = db.mvcc_store.begin_tx(conn4.pager.borrow().clone());
|
||||
let row = db
|
||||
.mvcc_store
|
||||
@@ -528,7 +560,7 @@ fn test_committed_visibility() {
|
||||
commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap();
|
||||
|
||||
// but I like more money, so let me try adding $10 more
|
||||
let conn2 = db._db.connect().unwrap();
|
||||
let conn2 = db.db.connect().unwrap();
|
||||
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
|
||||
let tx2_row = generate_simple_string_row(1, 1, "20");
|
||||
assert!(db
|
||||
@@ -549,7 +581,7 @@ fn test_committed_visibility() {
|
||||
assert_eq!(row, tx2_row);
|
||||
|
||||
// can I check how much money I have?
|
||||
let conn3 = db._db.connect().unwrap();
|
||||
let conn3 = db.db.connect().unwrap();
|
||||
let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone());
|
||||
let row = db
|
||||
.mvcc_store
|
||||
@@ -572,7 +604,7 @@ fn test_future_row() {
|
||||
|
||||
let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
|
||||
|
||||
let conn2 = db._db.connect().unwrap();
|
||||
let conn2 = db.db.connect().unwrap();
|
||||
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
|
||||
let tx2_row = generate_simple_string_row(1, 1, "Hello");
|
||||
db.mvcc_store.insert(tx2, tx2_row).unwrap();
|
||||
@@ -608,10 +640,10 @@ fn test_future_row() {
|
||||
use crate::mvcc::cursor::MvccLazyCursor;
|
||||
use crate::mvcc::database::{MvStore, Row, RowID};
|
||||
use crate::types::Text;
|
||||
use crate::Database;
|
||||
use crate::MemoryIO;
|
||||
use crate::RefValue;
|
||||
use crate::Value;
|
||||
use crate::{Database, UnixIO};
|
||||
|
||||
// Simple atomic clock implementation for testing
|
||||
|
||||
@@ -682,7 +714,7 @@ pub(crate) fn commit_tx_no_conn(
|
||||
conn: &Arc<Connection>,
|
||||
) -> Result<(), LimboError> {
|
||||
let mut sm = db
|
||||
.db
|
||||
.get_db()
|
||||
.get_mv_store()
|
||||
.unwrap()
|
||||
.commit_tx(tx_id, conn.pager.borrow().clone(), conn)
|
||||
@@ -1028,3 +1060,9 @@ fn test_snapshot_isolation_tx_visible1() {
|
||||
Some(TxTimestampOrID::TxID(7))
|
||||
));
|
||||
}
|
||||
|
||||
fn setup_random_db() -> (MvccTestDb, u64) {
|
||||
let db = MvccTestDb::new();
|
||||
let tx_id = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
|
||||
(db, tx_id)
|
||||
}
|
||||
|
||||
@@ -62,8 +62,8 @@ mod tests {
|
||||
let th1 = {
|
||||
let db = db.clone();
|
||||
std::thread::spawn(move || {
|
||||
let conn = db.db.connect().unwrap();
|
||||
let mvcc_store = db.db.mv_store.as_ref().unwrap().clone();
|
||||
let conn = db.get_db().connect().unwrap();
|
||||
let mvcc_store = db.get_db().mv_store.as_ref().unwrap().clone();
|
||||
for _ in 0..iterations {
|
||||
let tx = mvcc_store.begin_tx(conn.pager.borrow().clone());
|
||||
let id = IDS.fetch_add(1, Ordering::SeqCst);
|
||||
@@ -83,8 +83,8 @@ mod tests {
|
||||
};
|
||||
let th2 = {
|
||||
std::thread::spawn(move || {
|
||||
let conn = db.db.connect().unwrap();
|
||||
let mvcc_store = db.db.mv_store.as_ref().unwrap().clone();
|
||||
let conn = db.get_db().connect().unwrap();
|
||||
let mvcc_store = db.get_db().mv_store.as_ref().unwrap().clone();
|
||||
for _ in 0..iterations {
|
||||
let tx = mvcc_store.begin_tx(conn.pager.borrow().clone());
|
||||
let id = IDS.fetch_add(1, Ordering::SeqCst);
|
||||
@@ -116,8 +116,8 @@ mod tests {
|
||||
let work = |prefix: &'static str| {
|
||||
let db = db.clone();
|
||||
std::thread::spawn(move || {
|
||||
let conn = db.db.connect().unwrap();
|
||||
let mvcc_store = db.db.mv_store.as_ref().unwrap().clone();
|
||||
let conn = db.get_db().connect().unwrap();
|
||||
let mvcc_store = db.get_db().mv_store.as_ref().unwrap().clone();
|
||||
let mut failed_upserts = 0;
|
||||
for i in 0..iterations {
|
||||
if i % 1000 == 0 {
|
||||
|
||||
Reference in New Issue
Block a user