From ffaf8580e00aee4167096dd08371f022b4226de9 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 20 Aug 2025 12:22:31 +0200 Subject: [PATCH 1/3] mvcc/core: simple interactive transaction tests for mvcc --- core/mvcc/database/tests.rs | 45 +++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 138cb08f2..0a38d575f 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -1161,3 +1161,48 @@ fn get_record_value(row: &Row) -> ImmutableRecord { record.start_serialization(&row.data); record } + +#[test] +fn test_interactive_transaction() { + let db = MvccTestDbNoConn::new_with_random_db(); + let conn = db.connect(); + + // do some transaction + conn.execute("BEGIN").unwrap(); + conn.execute("CREATE TABLE test (x)").unwrap(); + conn.execute("INSERT INTO test (x) VALUES (1)").unwrap(); + conn.execute("INSERT INTO test (x) VALUES (2)").unwrap(); + conn.execute("COMMIT").unwrap(); + + // expect other transaction to see the changes + let rows = get_rows(&conn, "SELECT * FROM test"); + assert_eq!(rows, vec![vec![Value::Integer(1)], vec![Value::Integer(2)]]); +} + +#[test] +fn test_commit_without_tx() { + let db = MvccTestDbNoConn::new_with_random_db(); + let conn = db.connect(); + // do not start interactive transaction + conn.execute("CREATE TABLE test (x)").unwrap(); + conn.execute("INSERT INTO test (x) VALUES (1)").unwrap(); + + // expect error on trying to commit a non-existent interactive transaction + let err = conn.execute("COMMIT").unwrap_err(); + if let LimboError::TxError(e) = err { + assert_eq!(e.to_string(), "cannot commit - no transaction is active"); + } else { + panic!("Expected TxError"); + } +} + +fn get_rows(conn: &Arc, query: &str) -> Vec> { + let mut stmt = conn.prepare(query).unwrap(); + let mut rows = Vec::new(); + while let StepResult::Row = stmt.step().unwrap() { + let row = stmt.row().unwrap(); + let values = row.get_values().map(|v| v.clone()).collect::>(); + rows.push(values); + } + rows +} From 9e3b7b0c980e32b50c3cae0427b8a75f2378660a Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 20 Aug 2025 12:23:28 +0200 Subject: [PATCH 2/3] core/mvcc: store txid in conn and reset transaction state on commit --- core/lib.rs | 5 ++++- core/mvcc/database/mod.rs | 1 + core/vdbe/execute.rs | 14 +++++++------- core/vdbe/mod.rs | 4 ++-- 4 files changed, 14 insertions(+), 10 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 5dd947673..5cb9dcf61 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -421,6 +421,7 @@ impl Database { closed: Cell::new(false), attached_databases: RefCell::new(DatabaseCatalog::new()), query_only: Cell::new(false), + mv_tx_id: Cell::new(None), view_transaction_states: RefCell::new(HashMap::new()), }); let builtin_syms = self.builtin_syms.borrow(); @@ -821,6 +822,7 @@ pub struct Connection { database_schemas: RefCell>>, /// Whether to automatically commit transaction auto_commit: Cell, + /// Transactions that are in progress. mv_transactions: RefCell>, transaction_state: Cell, last_insert_rowid: Cell, @@ -840,6 +842,7 @@ pub struct Connection { /// Attached databases attached_databases: RefCell, query_only: Cell, + pub(crate) mv_tx_id: Cell>, /// Per-connection view transaction states for uncommitted changes. This represents /// one entry per view that was touched in the transaction. @@ -1948,7 +1951,7 @@ impl Statement { } pub fn set_mv_tx_id(&mut self, mv_tx_id: Option) { - self.state.mv_tx_id = mv_tx_id; + self.program.connection.mv_tx_id.set(mv_tx_id); } pub fn interrupt(&mut self) { diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 881601210..666444e47 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -988,6 +988,7 @@ impl MvStore { pager: Rc, connection: &Arc, ) -> Result>> { + tracing::trace!("commit_tx(tx_id={})", tx_id); let state_machine: StateMachine> = StateMachine::< CommitStateMachine, >::new( diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 7d932ef04..fce6f59e4 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -937,7 +937,7 @@ pub fn op_open_read( let pager = program.get_pager_from_database_index(db); let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap(); - let mv_cursor = match state.mv_tx_id { + let mv_cursor = match program.connection.mv_tx_id.get() { Some(tx_id) => { let table_id = *root_page as u64; let mv_store = mv_store.unwrap().clone(); @@ -2025,7 +2025,7 @@ pub fn op_transaction( // In MVCC we don't have write exclusivity, therefore we just need to start a transaction if needed. // Programs can run Transaction twice, first with read flag and then with write flag. So a single txid is enough // for both. - if state.mv_tx_id.is_none() { + if program.connection.mv_tx_id.get().is_none() { // We allocate the first page lazily in the first transaction. return_if_io!(pager.maybe_allocate_page1()); // TODO: when we fix MVCC enable schema cookie detection for reprepare statements @@ -2037,7 +2037,7 @@ pub fn op_transaction( // } let tx_id = mv_store.begin_tx(pager.clone()); conn.mv_transactions.borrow_mut().push(tx_id); - state.mv_tx_id = Some(tx_id); + program.connection.mv_tx_id.set(Some(tx_id)); } } else { if updated && matches!(current_state, TransactionState::None) { @@ -6113,7 +6113,7 @@ pub fn op_open_write( CursorType::BTreeIndex(index) => Some(index), _ => None, }; - let mv_cursor = match state.mv_tx_id { + let mv_cursor = match program.connection.mv_tx_id.get() { Some(tx_id) => { let table_id = root_page; let mv_store = mv_store.unwrap().clone(); @@ -6383,7 +6383,7 @@ pub fn op_parse_schema( stmt, schema, &conn.syms.borrow(), - state.mv_tx_id, + program.connection.mv_tx_id.get(), existing_views, ) })?; @@ -6397,7 +6397,7 @@ pub fn op_parse_schema( stmt, schema, &conn.syms.borrow(), - state.mv_tx_id, + program.connection.mv_tx_id.get(), existing_views, ) })?; @@ -6774,7 +6774,7 @@ pub fn op_open_ephemeral( let root_page = return_if_io!(pager.btree_create(flag)); let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap(); - let mv_cursor = match state.mv_tx_id { + let mv_cursor = match program.connection.mv_tx_id.get() { Some(tx_id) => { let table_id = root_page as u64; let mv_store = mv_store.unwrap().clone(); diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 4ac1455e3..4f84627f8 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -248,7 +248,6 @@ pub struct ProgramState { /// Indicate whether an [Insn::Once] instruction at a given program counter position has already been executed, well, once. once: SmallVec, regex_cache: RegexCache, - pub(crate) mv_tx_id: Option, interrupted: bool, pub parameters: HashMap, Value>, commit_state: CommitState, @@ -285,7 +284,6 @@ impl ProgramState { ended_coroutine: Bitfield::new(), once: SmallVec::::new(), regex_cache: RegexCache::new(), - mv_tx_id: None, interrupted: false, parameters: HashMap::new(), commit_state: CommitState::Ready, @@ -520,6 +518,8 @@ impl Program { } assert!(state_machine.is_finalized()); } + conn.mv_tx_id.set(None); + conn.transaction_state.replace(TransactionState::None); mv_transactions.clear(); } Ok(IOResult::Done(())) From 636a3e76e643111fedf66fa6bc6dd0dca281df55 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 20 Aug 2025 12:34:11 +0200 Subject: [PATCH 3/3] clippy mvcc tests --- core/mvcc/database/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 0a38d575f..27e321c7c 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -1201,7 +1201,7 @@ fn get_rows(conn: &Arc, query: &str) -> Vec> { let mut rows = Vec::new(); while let StepResult::Row = stmt.step().unwrap() { let row = stmt.row().unwrap(); - let values = row.get_values().map(|v| v.clone()).collect::>(); + let values = row.get_values().cloned().collect::>(); rows.push(values); } rows