diff --git a/core/lib.rs b/core/lib.rs index 5c97c8a72..507316091 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -40,7 +40,6 @@ pub mod numeric; #[cfg(not(feature = "fuzz"))] mod numeric; -use crate::incremental::view::AllViewsTxState; use crate::storage::checksum::CHECKSUM_REQUIRED_RESERVED_BYTES; use crate::storage::encryption::CipherMode; use crate::translate::pragma::TURSO_CDC_DEFAULT_TABLE_NAME; @@ -50,6 +49,7 @@ use crate::types::{WalFrameInfo, WalState}; use crate::util::{OpenMode, OpenOptions}; use crate::vdbe::metrics::ConnectionMetrics; use crate::vtab::VirtualTable; +use crate::{incremental::view::AllViewsTxState, translate::emitter::TransactionMode}; use core::str; pub use error::{CompletionError, LimboError}; pub use io::clock::{Clock, Instant}; @@ -477,7 +477,7 @@ impl Database { closed: Cell::new(false), attached_databases: RefCell::new(DatabaseCatalog::new()), query_only: Cell::new(false), - mv_tx_id: Cell::new(None), + mv_tx: Cell::new(None), view_transaction_states: AllViewsTxState::new(), metrics: RefCell::new(ConnectionMetrics::new()), is_nested_stmt: Cell::new(false), @@ -961,7 +961,7 @@ pub struct Connection { /// Attached databases attached_databases: RefCell, query_only: Cell, - pub(crate) mv_tx_id: Cell>, + pub(crate) mv_tx: Cell>, /// Per-connection view transaction states for uncommitted changes. This represents /// one entry per view that was touched in the transaction. @@ -2145,8 +2145,8 @@ impl Statement { self.program.n_change.get() } - pub fn set_mv_tx_id(&mut self, mv_tx_id: Option) { - self.program.connection.mv_tx_id.set(mv_tx_id); + pub fn set_mv_tx(&mut self, mv_tx: Option<(u64, TransactionMode)>) { + self.program.connection.mv_tx.set(mv_tx); } pub fn interrupt(&mut self) { diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index cad89e0ef..8b1e5a176 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -185,7 +185,7 @@ pub enum OperationMode { DELETE, } -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] /// Sqlite always considers Read transactions implicit pub enum TransactionMode { None, diff --git a/core/util.rs b/core/util.rs index a40ea12d4..5ec33a7e2 100644 --- a/core/util.rs +++ b/core/util.rs @@ -1,6 +1,7 @@ #![allow(unused)] use crate::incremental::view::IncrementalView; use crate::numeric::StrToF64; +use crate::translate::emitter::TransactionMode; use crate::translate::expr::WalkControl; use crate::types::IOResult; use crate::{ @@ -150,10 +151,10 @@ pub fn parse_schema_rows( mut rows: Statement, schema: &mut Schema, syms: &SymbolTable, - mv_tx_id: Option, + mv_tx: Option<(u64, TransactionMode)>, mut existing_views: HashMap>>, ) -> Result<()> { - rows.set_mv_tx_id(mv_tx_id); + rows.set_mv_tx(mv_tx); // TODO: if we IO, this unparsed indexes is lost. Will probably need some state between // IO runs let mut from_sql_indexes = Vec::with_capacity(10); diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 5974fe9c6..608819423 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -941,8 +941,8 @@ pub fn op_open_read( let pager = program.get_pager_from_database_index(db); let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap(); - let mv_cursor = match program.connection.mv_tx_id.get() { - Some(tx_id) => { + let mv_cursor = match program.connection.mv_tx.get() { + Some((tx_id, _)) => { let table_id = *root_page as u64; let mv_store = mv_store.unwrap().clone(); let mv_cursor = Rc::new(RefCell::new( @@ -2156,7 +2156,7 @@ pub fn op_transaction( // In MVCC we don't have write exclusivity, therefore we just need to start a transaction if needed. // Programs can run Transaction twice, first with read flag and then with write flag. So a single txid is enough // for both. - if program.connection.mv_tx_id.get().is_none() { + if program.connection.mv_tx.get().is_none() { // We allocate the first page lazily in the first transaction. return_if_io!(pager.maybe_allocate_page1()); // TODO: when we fix MVCC enable schema cookie detection for reprepare statements @@ -2174,17 +2174,24 @@ pub fn op_transaction( return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), None)) } }; - program.connection.mv_tx_id.set(Some(tx_id)); - } else if updated - && matches!(new_transaction_state, TransactionState::Write { .. }) - && matches!(tx_mode, TransactionMode::Write) - { - let is_upgrade_from_read = matches!(current_state, TransactionState::Read); - let tx_id = program.connection.mv_tx_id.get().unwrap(); - if is_upgrade_from_read { - return_if_io!(mv_store.upgrade_to_exclusive_tx(pager.clone(), Some(tx_id))); + program.connection.mv_tx.set(Some((tx_id, *tx_mode))); + } else if updated { + // TODO: fix tx_mode in Insn::Transaction, now each statement overrides it even if there's already a CONCURRENT Tx in progress, for example + let mv_tx_mode = program.connection.mv_tx.get().unwrap().1; + let actual_tx_mode = if mv_tx_mode == TransactionMode::Concurrent { + TransactionMode::Concurrent } else { - return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id))); + TransactionMode::Write + }; + if matches!(new_transaction_state, TransactionState::Write { .. }) + && matches!(actual_tx_mode, TransactionMode::Write) + { + let (tx_id, mv_tx_mode) = program.connection.mv_tx.get().unwrap(); + if mv_tx_mode == TransactionMode::Read { + return_if_io!(mv_store.upgrade_to_exclusive_tx(pager.clone(), Some(tx_id))); + } else { + return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id))); + } } } } else { @@ -2300,7 +2307,7 @@ pub fn op_auto_commit( conn.auto_commit.replace(*auto_commit); } } else { - let mvcc_tx_active = program.connection.mv_tx_id.get().is_some(); + let mvcc_tx_active = program.connection.mv_tx.get().is_some(); if !mvcc_tx_active { if !*auto_commit { return Err(LimboError::TxError( @@ -6375,8 +6382,8 @@ pub fn op_open_write( CursorType::BTreeIndex(index) => Some(index), _ => None, }; - let mv_cursor = match program.connection.mv_tx_id.get() { - Some(tx_id) => { + let mv_cursor = match program.connection.mv_tx.get() { + Some((tx_id, _)) => { let table_id = root_page; let mv_store = mv_store.unwrap().clone(); let mv_cursor = Rc::new(RefCell::new( @@ -6650,7 +6657,7 @@ pub fn op_parse_schema( stmt, schema, &conn.syms.borrow(), - program.connection.mv_tx_id.get(), + program.connection.mv_tx.get(), existing_views, ) }) @@ -6665,7 +6672,7 @@ pub fn op_parse_schema( stmt, schema, &conn.syms.borrow(), - program.connection.mv_tx_id.get(), + program.connection.mv_tx.get(), existing_views, ) }) @@ -7121,8 +7128,8 @@ pub fn op_open_ephemeral( let root_page = return_if_io!(pager.btree_create(flag)); let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap(); - let mv_cursor = match program.connection.mv_tx_id.get() { - Some(tx_id) => { + let mv_cursor = match program.connection.mv_tx.get() { + Some((tx_id, _)) => { let table_id = root_page as u64; let mv_store = mv_store.unwrap().clone(); let mv_cursor = Rc::new(RefCell::new( diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 5d28d2e8a..b4df333dd 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -821,7 +821,7 @@ impl Program { if auto_commit { // FIXME: we don't want to commit stuff from other programs. if matches!(program_state.commit_state, CommitState::Ready) { - let Some(tx_id) = conn.mv_tx_id.get() else { + let Some((tx_id, _)) = conn.mv_tx.get() else { return Ok(IOResult::Done(())); }; let state_machine = mv_store.commit_tx(tx_id, pager.clone(), &conn).unwrap(); @@ -834,7 +834,7 @@ impl Program { match self.step_end_mvcc_txn(state_machine, mv_store)? { IOResult::Done(_) => { assert!(state_machine.is_finalized()); - conn.mv_tx_id.set(None); + conn.mv_tx.set(None); conn.transaction_state.replace(TransactionState::None); program_state.commit_state = CommitState::Ready; return Ok(IOResult::Done(())); @@ -1079,7 +1079,7 @@ pub fn handle_program_error( LimboError::Busy => {} _ => { if let Some(mv_store) = mv_store { - if let Some(tx_id) = connection.mv_tx_id.get() { + if let Some((tx_id, _)) = connection.mv_tx.get() { mv_store.rollback_tx(tx_id, pager.clone()); } } else {