diff --git a/Cargo.lock b/Cargo.lock index 19098e920..08483d9af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -356,7 +356,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "531a9155a481e2ee699d4f98f43c0ca4ff8ee1bfd55c31e9e98fb29d2b176fe0" dependencies = [ "memchr", - "regex-automata 0.4.9", + "regex-automata", "serde", ] @@ -2194,7 +2194,7 @@ dependencies = [ "rand 0.9.2", "rand_chacha 0.9.0", "regex", - "regex-syntax 0.8.5", + "regex-syntax", "rusqlite", "schemars 1.0.4", "serde", @@ -2276,11 +2276,11 @@ checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" [[package]] name = "matchers" -version = "0.1.0" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" dependencies = [ - "regex-automata 0.1.10", + "regex-automata", ] [[package]] @@ -2512,16 +2512,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d" -[[package]] -name = "nu-ansi-term" -version = "0.46.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" -dependencies = [ - "overload", - "winapi", -] - [[package]] name = "nu-ansi-term" version = "0.50.1" @@ -2633,12 +2623,6 @@ dependencies = [ "log", ] -[[package]] -name = "overload" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" - [[package]] name = "owo-colors" version = "4.2.0" @@ -3229,17 +3213,8 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.9", - "regex-syntax 0.8.5", -] - -[[package]] -name = "regex-automata" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" -dependencies = [ - "regex-syntax 0.6.29", + "regex-automata", + "regex-syntax", ] [[package]] @@ -3250,15 +3225,9 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.5", + "regex-syntax", ] -[[package]] -name = "regex-syntax" -version = "0.6.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" - [[package]] name = "regex-syntax" version = "0.8.5" @@ -3817,7 +3786,7 @@ dependencies = [ "once_cell", "onig", "plist", - "regex-syntax 0.8.5", + "regex-syntax", "serde", "serde_derive", "serde_json", @@ -4142,14 +4111,14 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.19" +version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" dependencies = [ "matchers", - "nu-ansi-term 0.46.0", + "nu-ansi-term", "once_cell", - "regex", + "regex-automata", "sharded-slab", "smallvec", "thread_local", @@ -4196,7 +4165,7 @@ dependencies = [ "limbo_completion", "miette", "mimalloc", - "nu-ansi-term 0.50.1", + "nu-ansi-term", "rustyline", "schemars 0.8.22", "serde", @@ -4249,7 +4218,7 @@ dependencies = [ "rand 0.8.5", "rand_chacha 0.9.0", "regex", - "regex-syntax 0.8.5", + "regex-syntax", "rstest", "rusqlite", "rustix 1.0.7", @@ -5016,6 +4985,7 @@ dependencies = [ "clap", "futures", "tokio", + "tracing-subscriber", "turso", ] diff --git a/core/benches/mvcc_benchmark.rs b/core/benches/mvcc_benchmark.rs index c69392208..de8d4bdff 100644 --- a/core/benches/mvcc_benchmark.rs +++ b/core/benches/mvcc_benchmark.rs @@ -35,7 +35,7 @@ fn bench(c: &mut Criterion) { let db = bench_db(); b.to_async(FuturesExecutor).iter(|| async { let conn = db.conn.clone(); - let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone()); + let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone()).unwrap(); db.mvcc_store .rollback_tx(tx_id, conn.get_pager().clone(), &conn) .unwrap(); @@ -46,7 +46,7 @@ fn bench(c: &mut Criterion) { group.bench_function("begin_tx + commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { let conn = &db.conn; - let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone()); + let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone()).unwrap(); let mv_store = &db.mvcc_store; let mut sm = mv_store .commit_tx(tx_id, conn.get_pager().clone(), conn) @@ -67,7 +67,7 @@ fn bench(c: &mut Criterion) { group.bench_function("begin_tx-read-commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { let conn = &db.conn; - let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone()); + let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone()).unwrap(); db.mvcc_store .read( tx_id, @@ -99,7 +99,7 @@ fn bench(c: &mut Criterion) { group.bench_function("begin_tx-update-commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { let conn = &db.conn; - let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone()); + let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone()).unwrap(); db.mvcc_store .update( tx_id, @@ -130,7 +130,7 @@ fn bench(c: &mut Criterion) { }); let db = bench_db(); - let tx_id = db.mvcc_store.begin_tx(db.conn.get_pager().clone()); + let tx_id = db.mvcc_store.begin_tx(db.conn.get_pager().clone()).unwrap(); db.mvcc_store .insert( tx_id, @@ -159,7 +159,7 @@ fn bench(c: &mut Criterion) { }); let db = bench_db(); - let tx_id = db.mvcc_store.begin_tx(db.conn.get_pager().clone()); + let tx_id = db.mvcc_store.begin_tx(db.conn.get_pager().clone()).unwrap(); db.mvcc_store .insert( tx_id, diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 71fb4f4b9..20a0d9979 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1365,7 +1365,7 @@ impl MvStore { /// This function starts a new transaction in the database and returns a `TxID` value /// that you can use to perform operations within the transaction. All changes made within the /// transaction are isolated from other transactions until you commit the transaction. - pub fn begin_tx(&self, pager: Rc) -> TxID { + pub fn begin_tx(&self, pager: Rc) -> Result { let tx_id = self.get_tx_id(); let begin_ts = self.get_timestamp(); let tx = Transaction::new(tx_id, begin_ts); @@ -1374,8 +1374,8 @@ impl MvStore { // TODO: we need to tie a pager's read transaction to a transaction ID, so that future refactors to read // pages from WAL/DB read from a consistent state to maintiain snapshot isolation. - pager.begin_read_tx().unwrap(); - tx_id + pager.begin_read_tx()?; + Ok(tx_id) } /// Commits a transaction with the specified transaction ID. diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 9ffe565e3..9ff6f2416 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -95,7 +95,10 @@ pub(crate) fn generate_simple_string_row(table_id: u64, id: i64, data: &str) -> fn test_insert_read() { let db = MvccTestDb::new(); - let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx1 = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let tx1_row = generate_simple_string_row(1, 1, "Hello"); db.mvcc_store.insert(tx1, tx1_row.clone()).unwrap(); let row = db @@ -112,7 +115,10 @@ fn test_insert_read() { assert_eq!(tx1_row, row); commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap(); - let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx2 = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let row = db .mvcc_store .read( @@ -130,7 +136,10 @@ fn test_insert_read() { #[test] fn test_read_nonexistent() { let db = MvccTestDb::new(); - let tx = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let row = db.mvcc_store.read( tx, RowID { @@ -145,7 +154,10 @@ fn test_read_nonexistent() { fn test_delete() { let db = MvccTestDb::new(); - let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx1 = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let tx1_row = generate_simple_string_row(1, 1, "Hello"); db.mvcc_store.insert(tx1, tx1_row.clone()).unwrap(); let row = db @@ -182,7 +194,10 @@ fn test_delete() { assert!(row.is_none()); commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap(); - let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx2 = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let row = db .mvcc_store .read( @@ -199,7 +214,10 @@ fn test_delete() { #[test] fn test_delete_nonexistent() { let db = MvccTestDb::new(); - let tx = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); assert!(!db .mvcc_store .delete( @@ -215,7 +233,10 @@ fn test_delete_nonexistent() { #[test] fn test_commit() { let db = MvccTestDb::new(); - let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx1 = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let tx1_row = generate_simple_string_row(1, 1, "Hello"); db.mvcc_store.insert(tx1, tx1_row.clone()).unwrap(); let row = db @@ -246,7 +267,10 @@ fn test_commit() { assert_eq!(tx1_updated_row, row); commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap(); - let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx2 = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let row = db .mvcc_store .read( @@ -266,7 +290,10 @@ fn test_commit() { #[test] fn test_rollback() { let db = MvccTestDb::new(); - let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx1 = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let row1 = generate_simple_string_row(1, 1, "Hello"); db.mvcc_store.insert(tx1, row1.clone()).unwrap(); let row2 = db @@ -298,7 +325,10 @@ fn test_rollback() { db.mvcc_store .rollback_tx(tx1, db.conn.pager.borrow().clone(), &db.conn) .unwrap(); - let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx2 = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let row5 = db .mvcc_store .read( @@ -317,7 +347,10 @@ fn test_dirty_write() { let db = MvccTestDb::new(); // T1 inserts a row with ID 1, but does not commit. - let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx1 = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let tx1_row = generate_simple_string_row(1, 1, "Hello"); db.mvcc_store.insert(tx1, tx1_row.clone()).unwrap(); let row = db @@ -335,7 +368,10 @@ fn test_dirty_write() { let conn2 = db.db.connect().unwrap(); // T2 attempts to delete row with ID 1, but fails because T1 has not committed. - let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone()); + let tx2 = db + .mvcc_store + .begin_tx(conn2.pager.borrow().clone()) + .unwrap(); let tx2_row = generate_simple_string_row(1, 1, "World"); assert!(!db.mvcc_store.update(tx2, tx2_row).unwrap()); @@ -358,13 +394,19 @@ fn test_dirty_read() { let db = MvccTestDb::new(); // T1 inserts a row with ID 1, but does not commit. - let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx1 = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let row1 = generate_simple_string_row(1, 1, "Hello"); db.mvcc_store.insert(tx1, row1).unwrap(); // T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed. let conn2 = db.db.connect().unwrap(); - let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone()); + let tx2 = db + .mvcc_store + .begin_tx(conn2.pager.borrow().clone()) + .unwrap(); let row2 = db .mvcc_store .read( @@ -383,14 +425,20 @@ fn test_dirty_read_deleted() { let db = MvccTestDb::new(); // T1 inserts a row with ID 1 and commits. - let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx1 = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let tx1_row = generate_simple_string_row(1, 1, "Hello"); db.mvcc_store.insert(tx1, tx1_row.clone()).unwrap(); commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap(); // T2 deletes row with ID 1, but does not commit. let conn2 = db.db.connect().unwrap(); - let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone()); + let tx2 = db + .mvcc_store + .begin_tx(conn2.pager.borrow().clone()) + .unwrap(); assert!(db .mvcc_store .delete( @@ -404,7 +452,10 @@ fn test_dirty_read_deleted() { // T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed. let conn3 = db.db.connect().unwrap(); - let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone()); + let tx3 = db + .mvcc_store + .begin_tx(conn3.pager.borrow().clone()) + .unwrap(); let row = db .mvcc_store .read( @@ -424,7 +475,10 @@ fn test_fuzzy_read() { let db = MvccTestDb::new(); // T1 inserts a row with ID 1 and commits. - let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx1 = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let tx1_row = generate_simple_string_row(1, 1, "First"); db.mvcc_store.insert(tx1, tx1_row.clone()).unwrap(); let row = db @@ -443,7 +497,10 @@ fn test_fuzzy_read() { // T2 reads the row with ID 1 within an active transaction. let conn2 = db.db.connect().unwrap(); - let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone()); + let tx2 = db + .mvcc_store + .begin_tx(conn2.pager.borrow().clone()) + .unwrap(); let row = db .mvcc_store .read( @@ -459,7 +516,10 @@ fn test_fuzzy_read() { // T3 updates the row and commits. let conn3 = db.db.connect().unwrap(); - let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone()); + let tx3 = db + .mvcc_store + .begin_tx(conn3.pager.borrow().clone()) + .unwrap(); let tx3_row = generate_simple_string_row(1, 1, "Second"); db.mvcc_store.update(tx3, tx3_row).unwrap(); commit_tx(db.mvcc_store.clone(), &conn3, tx3).unwrap(); @@ -490,7 +550,10 @@ fn test_lost_update() { let db = MvccTestDb::new(); // T1 inserts a row with ID 1 and commits. - let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx1 = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let tx1_row = generate_simple_string_row(1, 1, "Hello"); db.mvcc_store.insert(tx1, tx1_row.clone()).unwrap(); let row = db @@ -509,13 +572,19 @@ fn test_lost_update() { // T2 attempts to update row ID 1 within an active transaction. let conn2 = db.db.connect().unwrap(); - let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone()); + let tx2 = db + .mvcc_store + .begin_tx(conn2.pager.borrow().clone()) + .unwrap(); let tx2_row = generate_simple_string_row(1, 1, "World"); assert!(db.mvcc_store.update(tx2, tx2_row.clone()).unwrap()); // T3 also attempts to update row ID 1 within an active transaction. let conn3 = db.db.connect().unwrap(); - let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone()); + let tx3 = db + .mvcc_store + .begin_tx(conn3.pager.borrow().clone()) + .unwrap(); let tx3_row = generate_simple_string_row(1, 1, "Hello, world!"); assert!(matches!( db.mvcc_store.update(tx3, tx3_row), @@ -533,7 +602,10 @@ fn test_lost_update() { )); let conn4 = db.db.connect().unwrap(); - let tx4 = db.mvcc_store.begin_tx(conn4.pager.borrow().clone()); + let tx4 = db + .mvcc_store + .begin_tx(conn4.pager.borrow().clone()) + .unwrap(); let row = db .mvcc_store .read( @@ -555,14 +627,20 @@ fn test_committed_visibility() { let db = MvccTestDb::new(); // let's add $10 to my account since I like money - let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx1 = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let tx1_row = generate_simple_string_row(1, 1, "10"); db.mvcc_store.insert(tx1, tx1_row.clone()).unwrap(); commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap(); // but I like more money, so let me try adding $10 more let conn2 = db.db.connect().unwrap(); - let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone()); + let tx2 = db + .mvcc_store + .begin_tx(conn2.pager.borrow().clone()) + .unwrap(); let tx2_row = generate_simple_string_row(1, 1, "20"); assert!(db.mvcc_store.update(tx2, tx2_row.clone()).unwrap()); let row = db @@ -580,7 +658,10 @@ fn test_committed_visibility() { // can I check how much money I have? let conn3 = db.db.connect().unwrap(); - let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone()); + let tx3 = db + .mvcc_store + .begin_tx(conn3.pager.borrow().clone()) + .unwrap(); let row = db .mvcc_store .read( @@ -600,10 +681,16 @@ fn test_committed_visibility() { fn test_future_row() { let db = MvccTestDb::new(); - let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx1 = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let conn2 = db.db.connect().unwrap(); - let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone()); + let tx2 = db + .mvcc_store + .begin_tx(conn2.pager.borrow().clone()) + .unwrap(); let tx2_row = generate_simple_string_row(1, 1, "Hello"); db.mvcc_store.insert(tx2, tx2_row).unwrap(); @@ -647,7 +734,10 @@ use crate::{MemoryIO, Statement}; fn setup_test_db() -> (MvccTestDb, u64) { let db = MvccTestDb::new(); - let tx_id = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx_id = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let table_id = 1; let test_rows = [ @@ -667,13 +757,19 @@ fn setup_test_db() -> (MvccTestDb, u64) { commit_tx(db.mvcc_store.clone(), &db.conn, tx_id).unwrap(); - let tx_id = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx_id = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); (db, tx_id) } fn setup_lazy_db(initial_keys: &[i64]) -> (MvccTestDb, u64) { let db = MvccTestDb::new(); - let tx_id = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx_id = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let table_id = 1; for i in initial_keys { @@ -686,7 +782,10 @@ fn setup_lazy_db(initial_keys: &[i64]) -> (MvccTestDb, u64) { commit_tx(db.mvcc_store.clone(), &db.conn, tx_id).unwrap(); - let tx_id = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx_id = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); (db, tx_id) } @@ -850,10 +949,13 @@ fn test_cursor_with_empty_table() { { // FIXME: force page 1 initialization let pager = db.conn.pager.borrow().clone(); - let tx_id = db.mvcc_store.begin_tx(pager.clone()); + let tx_id = db.mvcc_store.begin_tx(pager.clone()).unwrap(); commit_tx(db.mvcc_store.clone(), &db.conn, tx_id).unwrap(); } - let tx_id = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); + let tx_id = db + .mvcc_store + .begin_tx(db.conn.pager.borrow().clone()) + .unwrap(); let table_id = 1; // Empty table // Test LazyScanCursor with empty table @@ -1076,7 +1178,7 @@ fn test_restart() { { let conn = db.connect(); let mvcc_store = db.get_mvcc_store(); - let tx_id = mvcc_store.begin_tx(conn.pager.borrow().clone()); + let tx_id = mvcc_store.begin_tx(conn.pager.borrow().clone()).unwrap(); let row = generate_simple_string_row(1, 1, "foo"); mvcc_store.insert(tx_id, row).unwrap(); @@ -1088,13 +1190,13 @@ fn test_restart() { { let conn = db.connect(); let mvcc_store = db.get_mvcc_store(); - let tx_id = mvcc_store.begin_tx(conn.pager.borrow().clone()); + let tx_id = mvcc_store.begin_tx(conn.pager.borrow().clone()).unwrap(); let row = generate_simple_string_row(1, 2, "bar"); mvcc_store.insert(tx_id, row).unwrap(); commit_tx(mvcc_store.clone(), &conn, tx_id).unwrap(); - let tx_id = mvcc_store.begin_tx(conn.pager.borrow().clone()); + let tx_id = mvcc_store.begin_tx(conn.pager.borrow().clone()).unwrap(); let row = mvcc_store.read(tx_id, RowID::new(1, 2)).unwrap().unwrap(); let record = get_record_value(&row); match record.get_value(0).unwrap() { diff --git a/core/mvcc/mod.rs b/core/mvcc/mod.rs index 310e71c1e..26a01cfbb 100644 --- a/core/mvcc/mod.rs +++ b/core/mvcc/mod.rs @@ -65,7 +65,7 @@ mod tests { let conn = db.get_db().connect().unwrap(); let mvcc_store = db.get_db().mv_store.as_ref().unwrap().clone(); for _ in 0..iterations { - let tx = mvcc_store.begin_tx(conn.pager.borrow().clone()); + let tx = mvcc_store.begin_tx(conn.pager.borrow().clone()).unwrap(); let id = IDS.fetch_add(1, Ordering::SeqCst); let id = RowID { table_id: 1, @@ -74,7 +74,7 @@ mod tests { let row = generate_simple_string_row(1, id.row_id, "Hello"); mvcc_store.insert(tx, row.clone()).unwrap(); commit_tx_no_conn(&db, tx, &conn).unwrap(); - let tx = mvcc_store.begin_tx(conn.pager.borrow().clone()); + let tx = mvcc_store.begin_tx(conn.pager.borrow().clone()).unwrap(); let committed_row = mvcc_store.read(tx, id).unwrap(); commit_tx_no_conn(&db, tx, &conn).unwrap(); assert_eq!(committed_row, Some(row)); @@ -86,7 +86,7 @@ mod tests { let conn = db.get_db().connect().unwrap(); let mvcc_store = db.get_db().mv_store.as_ref().unwrap().clone(); for _ in 0..iterations { - let tx = mvcc_store.begin_tx(conn.pager.borrow().clone()); + let tx = mvcc_store.begin_tx(conn.pager.borrow().clone()).unwrap(); let id = IDS.fetch_add(1, Ordering::SeqCst); let id = RowID { table_id: 1, @@ -95,7 +95,7 @@ mod tests { let row = generate_simple_string_row(1, id.row_id, "World"); mvcc_store.insert(tx, row.clone()).unwrap(); commit_tx_no_conn(&db, tx, &conn).unwrap(); - let tx = mvcc_store.begin_tx(conn.pager.borrow().clone()); + let tx = mvcc_store.begin_tx(conn.pager.borrow().clone()).unwrap(); let committed_row = mvcc_store.read(tx, id).unwrap(); commit_tx_no_conn(&db, tx, &conn).unwrap(); assert_eq!(committed_row, Some(row)); @@ -127,7 +127,7 @@ mod tests { let dropped = mvcc_store.drop_unused_row_versions(); tracing::debug!("garbage collected {dropped} versions"); } - let tx = mvcc_store.begin_tx(conn.pager.borrow().clone()); + let tx = mvcc_store.begin_tx(conn.pager.borrow().clone()).unwrap(); let id = i % 16; let id = RowID { table_id: 1, diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 637216caa..988245b28 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1099,14 +1099,27 @@ impl Wal for WalFile { let epoch = shared_file.read().epoch.load(Ordering::Acquire); frame.set_wal_tag(frame_id, epoch); }); - let shared = self.get_shared(); - assert!( - shared.enabled.load(Ordering::Relaxed), - "WAL must be enabled" - ); - let file = shared.file.as_ref().unwrap(); + let file = { + let shared = self.get_shared(); + assert!( + shared.enabled.load(Ordering::Relaxed), + "WAL must be enabled" + ); + // important not to hold shared lock beyond this point to avoid deadlock scenario where: + // thread 1: takes readlock here, passes reference to shared.file to begin_read_wal_frame + // thread 2: tries to acquire write lock elsewhere + // thread 1: tries to re-acquire read lock in the completion (see 'complete' above) + // + // this causes a deadlock due to the locking policy in parking_lot: + // from https://docs.rs/parking_lot/latest/parking_lot/type.RwLock.html: + // "This lock uses a task-fair locking policy which avoids both reader and writer starvation. + // This means that readers trying to acquire the lock will block even if the lock is unlocked + // when there are writers waiting to acquire the lock. + // Because of this, attempts to recursively acquire a read lock within a single thread may result in a deadlock." + shared.file.as_ref().unwrap().clone() + }; begin_read_wal_frame( - file, + &file, offset + WAL_FRAME_HEADER_SIZE as u64, buffer_pool, complete, diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 53fc9ee76..a6d3e5a58 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2168,7 +2168,7 @@ pub fn op_transaction( // } let tx_id = match tx_mode { TransactionMode::None | TransactionMode::Read | TransactionMode::Concurrent => { - mv_store.begin_tx(pager.clone()) + mv_store.begin_tx(pager.clone())? } TransactionMode::Write => { return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), None)) diff --git a/perf/throughput/turso/Cargo.toml b/perf/throughput/turso/Cargo.toml index 57de85fac..fb7523378 100644 --- a/perf/throughput/turso/Cargo.toml +++ b/perf/throughput/turso/Cargo.toml @@ -11,4 +11,5 @@ path = "src/main.rs" turso = { workspace = true } clap = { version = "4.0", features = ["derive"] } tokio = { workspace = true, default-features = true, features = ["full"] } -futures = "0.3" \ No newline at end of file +futures = "0.3" +tracing-subscriber = "0.3.20" diff --git a/perf/throughput/turso/src/main.rs b/perf/throughput/turso/src/main.rs index 22d5ce20d..61bd35ed0 100644 --- a/perf/throughput/turso/src/main.rs +++ b/perf/throughput/turso/src/main.rs @@ -44,6 +44,7 @@ struct Args { #[tokio::main] async fn main() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); let args = Args::parse(); println!(