store tx_mode in conn.mv_tx

otherwise op_transaction works completely wrong because each separate
insert statement overrides the tx_mode to Write
This commit is contained in:
Jussi Saurio
2025-09-14 21:58:44 +03:00
parent 7fe25a1d0e
commit 396091044e
5 changed files with 39 additions and 31 deletions

View File

@@ -40,7 +40,6 @@ pub mod numeric;
#[cfg(not(feature = "fuzz"))]
mod numeric;
use crate::incremental::view::AllViewsTxState;
use crate::storage::checksum::CHECKSUM_REQUIRED_RESERVED_BYTES;
use crate::storage::encryption::CipherMode;
use crate::translate::pragma::TURSO_CDC_DEFAULT_TABLE_NAME;
@@ -50,6 +49,7 @@ use crate::types::{WalFrameInfo, WalState};
use crate::util::{OpenMode, OpenOptions};
use crate::vdbe::metrics::ConnectionMetrics;
use crate::vtab::VirtualTable;
use crate::{incremental::view::AllViewsTxState, translate::emitter::TransactionMode};
use core::str;
pub use error::{CompletionError, LimboError};
pub use io::clock::{Clock, Instant};
@@ -477,7 +477,7 @@ impl Database {
closed: Cell::new(false),
attached_databases: RefCell::new(DatabaseCatalog::new()),
query_only: Cell::new(false),
mv_tx_id: Cell::new(None),
mv_tx: Cell::new(None),
view_transaction_states: AllViewsTxState::new(),
metrics: RefCell::new(ConnectionMetrics::new()),
is_nested_stmt: Cell::new(false),
@@ -961,7 +961,7 @@ pub struct Connection {
/// Attached databases
attached_databases: RefCell<DatabaseCatalog>,
query_only: Cell<bool>,
pub(crate) mv_tx_id: Cell<Option<crate::mvcc::database::TxID>>,
pub(crate) mv_tx: Cell<Option<(crate::mvcc::database::TxID, TransactionMode)>>,
/// Per-connection view transaction states for uncommitted changes. This represents
/// one entry per view that was touched in the transaction.
@@ -2145,8 +2145,8 @@ impl Statement {
self.program.n_change.get()
}
pub fn set_mv_tx_id(&mut self, mv_tx_id: Option<u64>) {
self.program.connection.mv_tx_id.set(mv_tx_id);
pub fn set_mv_tx(&mut self, mv_tx: Option<(u64, TransactionMode)>) {
self.program.connection.mv_tx.set(mv_tx);
}
pub fn interrupt(&mut self) {

View File

@@ -185,7 +185,7 @@ pub enum OperationMode {
DELETE,
}
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
/// Sqlite always considers Read transactions implicit
pub enum TransactionMode {
None,

View File

@@ -1,6 +1,7 @@
#![allow(unused)]
use crate::incremental::view::IncrementalView;
use crate::numeric::StrToF64;
use crate::translate::emitter::TransactionMode;
use crate::translate::expr::WalkControl;
use crate::types::IOResult;
use crate::{
@@ -150,10 +151,10 @@ pub fn parse_schema_rows(
mut rows: Statement,
schema: &mut Schema,
syms: &SymbolTable,
mv_tx_id: Option<u64>,
mv_tx: Option<(u64, TransactionMode)>,
mut existing_views: HashMap<String, Arc<Mutex<IncrementalView>>>,
) -> Result<()> {
rows.set_mv_tx_id(mv_tx_id);
rows.set_mv_tx(mv_tx);
// TODO: if we IO, this unparsed indexes is lost. Will probably need some state between
// IO runs
let mut from_sql_indexes = Vec::with_capacity(10);

View File

@@ -941,8 +941,8 @@ pub fn op_open_read(
let pager = program.get_pager_from_database_index(db);
let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap();
let mv_cursor = match program.connection.mv_tx_id.get() {
Some(tx_id) => {
let mv_cursor = match program.connection.mv_tx.get() {
Some((tx_id, _)) => {
let table_id = *root_page as u64;
let mv_store = mv_store.unwrap().clone();
let mv_cursor = Rc::new(RefCell::new(
@@ -2156,7 +2156,7 @@ pub fn op_transaction(
// In MVCC we don't have write exclusivity, therefore we just need to start a transaction if needed.
// Programs can run Transaction twice, first with read flag and then with write flag. So a single txid is enough
// for both.
if program.connection.mv_tx_id.get().is_none() {
if program.connection.mv_tx.get().is_none() {
// We allocate the first page lazily in the first transaction.
return_if_io!(pager.maybe_allocate_page1());
// TODO: when we fix MVCC enable schema cookie detection for reprepare statements
@@ -2174,17 +2174,24 @@ pub fn op_transaction(
return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), None))
}
};
program.connection.mv_tx_id.set(Some(tx_id));
} else if updated
&& matches!(new_transaction_state, TransactionState::Write { .. })
&& matches!(tx_mode, TransactionMode::Write)
{
let is_upgrade_from_read = matches!(current_state, TransactionState::Read);
let tx_id = program.connection.mv_tx_id.get().unwrap();
if is_upgrade_from_read {
return_if_io!(mv_store.upgrade_to_exclusive_tx(pager.clone(), Some(tx_id)));
program.connection.mv_tx.set(Some((tx_id, *tx_mode)));
} else if updated {
// TODO: fix tx_mode in Insn::Transaction, now each statement overrides it even if there's already a CONCURRENT Tx in progress, for example
let mv_tx_mode = program.connection.mv_tx.get().unwrap().1;
let actual_tx_mode = if mv_tx_mode == TransactionMode::Concurrent {
TransactionMode::Concurrent
} else {
return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id)));
TransactionMode::Write
};
if matches!(new_transaction_state, TransactionState::Write { .. })
&& matches!(actual_tx_mode, TransactionMode::Write)
{
let (tx_id, mv_tx_mode) = program.connection.mv_tx.get().unwrap();
if mv_tx_mode == TransactionMode::Read {
return_if_io!(mv_store.upgrade_to_exclusive_tx(pager.clone(), Some(tx_id)));
} else {
return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id)));
}
}
}
} else {
@@ -2300,7 +2307,7 @@ pub fn op_auto_commit(
conn.auto_commit.replace(*auto_commit);
}
} else {
let mvcc_tx_active = program.connection.mv_tx_id.get().is_some();
let mvcc_tx_active = program.connection.mv_tx.get().is_some();
if !mvcc_tx_active {
if !*auto_commit {
return Err(LimboError::TxError(
@@ -6375,8 +6382,8 @@ pub fn op_open_write(
CursorType::BTreeIndex(index) => Some(index),
_ => None,
};
let mv_cursor = match program.connection.mv_tx_id.get() {
Some(tx_id) => {
let mv_cursor = match program.connection.mv_tx.get() {
Some((tx_id, _)) => {
let table_id = root_page;
let mv_store = mv_store.unwrap().clone();
let mv_cursor = Rc::new(RefCell::new(
@@ -6650,7 +6657,7 @@ pub fn op_parse_schema(
stmt,
schema,
&conn.syms.borrow(),
program.connection.mv_tx_id.get(),
program.connection.mv_tx.get(),
existing_views,
)
})
@@ -6665,7 +6672,7 @@ pub fn op_parse_schema(
stmt,
schema,
&conn.syms.borrow(),
program.connection.mv_tx_id.get(),
program.connection.mv_tx.get(),
existing_views,
)
})
@@ -7121,8 +7128,8 @@ pub fn op_open_ephemeral(
let root_page = return_if_io!(pager.btree_create(flag));
let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap();
let mv_cursor = match program.connection.mv_tx_id.get() {
Some(tx_id) => {
let mv_cursor = match program.connection.mv_tx.get() {
Some((tx_id, _)) => {
let table_id = root_page as u64;
let mv_store = mv_store.unwrap().clone();
let mv_cursor = Rc::new(RefCell::new(

View File

@@ -821,7 +821,7 @@ impl Program {
if auto_commit {
// FIXME: we don't want to commit stuff from other programs.
if matches!(program_state.commit_state, CommitState::Ready) {
let Some(tx_id) = conn.mv_tx_id.get() else {
let Some((tx_id, _)) = conn.mv_tx.get() else {
return Ok(IOResult::Done(()));
};
let state_machine = mv_store.commit_tx(tx_id, pager.clone(), &conn).unwrap();
@@ -834,7 +834,7 @@ impl Program {
match self.step_end_mvcc_txn(state_machine, mv_store)? {
IOResult::Done(_) => {
assert!(state_machine.is_finalized());
conn.mv_tx_id.set(None);
conn.mv_tx.set(None);
conn.transaction_state.replace(TransactionState::None);
program_state.commit_state = CommitState::Ready;
return Ok(IOResult::Done(()));
@@ -1079,7 +1079,7 @@ pub fn handle_program_error(
LimboError::Busy => {}
_ => {
if let Some(mv_store) = mv_store {
if let Some(tx_id) = connection.mv_tx_id.get() {
if let Some((tx_id, _)) = connection.mv_tx.get() {
mv_store.rollback_tx(tx_id, pager.clone());
}
} else {