diff --git a/core/lib.rs b/core/lib.rs index da49b2b80..e4874e755 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -423,6 +423,7 @@ impl Database { closed: Cell::new(false), attached_databases: RefCell::new(DatabaseCatalog::new()), query_only: Cell::new(false), + mv_tx_id: Cell::new(None), view_transaction_states: RefCell::new(HashMap::new()), metrics: RefCell::new(ConnectionMetrics::new()), is_nested_stmt: Cell::new(false), @@ -826,6 +827,7 @@ pub struct Connection { database_schemas: RefCell>>, /// Whether to automatically commit transaction auto_commit: Cell, + /// Transactions that are in progress. mv_transactions: RefCell>, transaction_state: Cell, last_insert_rowid: Cell, @@ -845,6 +847,7 @@ pub struct Connection { /// Attached databases attached_databases: RefCell, query_only: Cell, + pub(crate) mv_tx_id: Cell>, /// Per-connection view transaction states for uncommitted changes. This represents /// one entry per view that was touched in the transaction. @@ -1966,7 +1969,7 @@ impl Statement { } pub fn set_mv_tx_id(&mut self, mv_tx_id: Option) { - self.state.mv_tx_id = mv_tx_id; + self.program.connection.mv_tx_id.set(mv_tx_id); } pub fn interrupt(&mut self) { diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 8d950d221..bb1a0608e 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1127,6 +1127,7 @@ impl MvStore { pager: Rc, connection: &Arc, ) -> Result>> { + tracing::trace!("commit_tx(tx_id={})", tx_id); let state_machine: StateMachine> = StateMachine::< CommitStateMachine, >::new( diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 9972c7aca..438e897ec 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -1198,3 +1198,48 @@ fn get_record_value(row: &Row) -> ImmutableRecord { record.start_serialization(&row.data); record } + +#[test] +fn test_interactive_transaction() { + let db = MvccTestDbNoConn::new_with_random_db(); + let conn = db.connect(); + + // do some transaction + conn.execute("BEGIN").unwrap(); + conn.execute("CREATE TABLE test (x)").unwrap(); + conn.execute("INSERT INTO test (x) VALUES (1)").unwrap(); + conn.execute("INSERT INTO test (x) VALUES (2)").unwrap(); + conn.execute("COMMIT").unwrap(); + + // expect other transaction to see the changes + let rows = get_rows(&conn, "SELECT * FROM test"); + assert_eq!(rows, vec![vec![Value::Integer(1)], vec![Value::Integer(2)]]); +} + +#[test] +fn test_commit_without_tx() { + let db = MvccTestDbNoConn::new_with_random_db(); + let conn = db.connect(); + // do not start interactive transaction + conn.execute("CREATE TABLE test (x)").unwrap(); + conn.execute("INSERT INTO test (x) VALUES (1)").unwrap(); + + // expect error on trying to commit a non-existent interactive transaction + let err = conn.execute("COMMIT").unwrap_err(); + if let LimboError::TxError(e) = err { + assert_eq!(e.to_string(), "cannot commit - no transaction is active"); + } else { + panic!("Expected TxError"); + } +} + +fn get_rows(conn: &Arc, query: &str) -> Vec> { + let mut stmt = conn.prepare(query).unwrap(); + let mut rows = Vec::new(); + while let StepResult::Row = stmt.step().unwrap() { + let row = stmt.row().unwrap(); + let values = row.get_values().cloned().collect::>(); + rows.push(values); + } + rows +} diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index aa96ec520..b6ecf885d 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -937,7 +937,7 @@ pub fn op_open_read( let pager = program.get_pager_from_database_index(db); let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap(); - let mv_cursor = match state.mv_tx_id { + let mv_cursor = match program.connection.mv_tx_id.get() { Some(tx_id) => { let table_id = *root_page as u64; let mv_store = mv_store.unwrap().clone(); @@ -2061,7 +2061,7 @@ pub fn op_transaction( // In MVCC we don't have write exclusivity, therefore we just need to start a transaction if needed. // Programs can run Transaction twice, first with read flag and then with write flag. So a single txid is enough // for both. - if state.mv_tx_id.is_none() { + if program.connection.mv_tx_id.get().is_none() { // We allocate the first page lazily in the first transaction. return_if_io!(pager.maybe_allocate_page1()); // TODO: when we fix MVCC enable schema cookie detection for reprepare statements @@ -2073,7 +2073,7 @@ pub fn op_transaction( // } let tx_id = mv_store.begin_tx(pager.clone()); conn.mv_transactions.borrow_mut().push(tx_id); - state.mv_tx_id = Some(tx_id); + program.connection.mv_tx_id.set(Some(tx_id)); } } else { if updated && matches!(current_state, TransactionState::None) { @@ -6195,7 +6195,7 @@ pub fn op_open_write( CursorType::BTreeIndex(index) => Some(index), _ => None, }; - let mv_cursor = match state.mv_tx_id { + let mv_cursor = match program.connection.mv_tx_id.get() { Some(tx_id) => { let table_id = root_page; let mv_store = mv_store.unwrap().clone(); @@ -6466,7 +6466,7 @@ pub fn op_parse_schema( stmt, schema, &conn.syms.borrow(), - state.mv_tx_id, + program.connection.mv_tx_id.get(), existing_views, ) }) @@ -6481,7 +6481,7 @@ pub fn op_parse_schema( stmt, schema, &conn.syms.borrow(), - state.mv_tx_id, + program.connection.mv_tx_id.get(), existing_views, ) }) @@ -6860,7 +6860,7 @@ pub fn op_open_ephemeral( let root_page = return_if_io!(pager.btree_create(flag)); let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap(); - let mv_cursor = match state.mv_tx_id { + let mv_cursor = match program.connection.mv_tx_id.get() { Some(tx_id) => { let table_id = root_page as u64; let mv_store = mv_store.unwrap().clone(); diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 09a083456..b99c17cd5 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -250,7 +250,6 @@ pub struct ProgramState { /// Indicate whether an [Insn::Once] instruction at a given program counter position has already been executed, well, once. once: SmallVec, regex_cache: RegexCache, - pub(crate) mv_tx_id: Option, interrupted: bool, pub parameters: HashMap, Value>, commit_state: CommitState, @@ -289,7 +288,6 @@ impl ProgramState { ended_coroutine: Bitfield::new(), once: SmallVec::::new(), regex_cache: RegexCache::new(), - mv_tx_id: None, interrupted: false, parameters: HashMap::new(), commit_state: CommitState::Ready, @@ -546,6 +544,8 @@ impl Program { } assert!(state_machine.is_finalized()); } + conn.mv_tx_id.set(None); + conn.transaction_state.replace(TransactionState::None); mv_transactions.clear(); } Ok(IOResult::Done(()))