From 97657a86b3baa0c7b6d2908d66d024fc6ba82529 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Tue, 19 Aug 2025 12:33:17 +0300 Subject: [PATCH 1/2] Do not assume error message content in FaultyQuery --- simulator/generation/plan.rs | 3 +++ simulator/generation/property.rs | 11 ++++------- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/simulator/generation/plan.rs b/simulator/generation/plan.rs index eae39d4d1..c27becad5 100644 --- a/simulator/generation/plan.rs +++ b/simulator/generation/plan.rs @@ -645,6 +645,9 @@ impl Interaction { &query_str[0..query_str.len().min(4096)], err ); + if let Some(turso_core::LimboError::ParseError(e)) = err { + panic!("Unexpected parse error: {e}"); + } return Err(err.unwrap()); } let mut rows = rows.unwrap().unwrap(); diff --git a/simulator/generation/property.rs b/simulator/generation/property.rs index 6a0237509..4725aa384 100644 --- a/simulator/generation/property.rs +++ b/simulator/generation/property.rs @@ -16,7 +16,6 @@ use crate::{ Create, Delete, Drop, Insert, Query, Select, }, table::SimValue, - FAULT_ERROR_MSG, }, runner::env::SimulatorEnv, }; @@ -752,12 +751,10 @@ impl Property { Ok(Ok(())) } Err(err) => { - let msg = format!("{err}"); - if msg.contains(FAULT_ERROR_MSG) { - Ok(Ok(())) - } else { - Err(LimboError::InternalError(msg)) - } + // We cannot make any assumptions about the error content; all we are about is, if the statement errored, + // we don't shadow the results into the simulator env, i.e. we assume whatever the statement did was rolled back. + tracing::error!("Fault injection produced error: {err}"); + Ok(Ok(())) } } }), From 7f1eac9560dd9cd378864f080e97b827ab2c7f16 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Tue, 19 Aug 2025 13:03:14 +0300 Subject: [PATCH 2/2] Do not start or end transaction in nested statement --- core/lib.rs | 7 ++++ core/storage/pager.rs | 4 +++ core/vdbe/execute.rs | 84 +++++++++++++++++++++++++------------------ 3 files changed, 61 insertions(+), 34 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index f6899a081..607c1ffae 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -424,6 +424,7 @@ impl Database { query_only: Cell::new(false), view_transaction_states: RefCell::new(HashMap::new()), metrics: RefCell::new(ConnectionMetrics::new()), + is_nested_stmt: Cell::new(false), }); let builtin_syms = self.builtin_syms.borrow(); // add built-in extensions symbols to the connection to prevent having to load each time @@ -848,6 +849,9 @@ pub struct Connection { view_transaction_states: RefCell>, /// Connection-level metrics aggregation pub metrics: RefCell, + /// Whether the connection is executing a statement initiated by another statement. + /// Generally this is only true for ParseSchema. + is_nested_stmt: Cell, } impl Connection { @@ -2040,6 +2044,9 @@ impl Statement { pub fn run_once(&self) -> Result<()> { let res = self.pager.io.run_once(); + if self.program.connection.is_nested_stmt.get() { + return res; + } if res.is_err() { let state = self.program.connection.transaction_state.get(); if let TransactionState::Write { .. } = state { diff --git a/core/storage/pager.rs b/core/storage/pager.rs index f87faf9d2..4410b238c 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -962,6 +962,10 @@ impl Pager { connection: &Connection, wal_auto_checkpoint_disabled: bool, ) -> Result> { + if connection.is_nested_stmt.get() { + // Parent statement will handle the transaction rollback. + return Ok(IOResult::Done(PagerCommitResult::Rollback)); + } tracing::trace!("end_tx(rollback={})", rollback); let Some(wal) = self.wal.as_ref() else { // TODO: Unsure what the semantics of "end_tx" is for in-memory databases, ephemeral tables and ephemeral indexes. diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 3dc2b47cd..14bc0c7c3 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2015,41 +2015,45 @@ pub fn op_transaction( // 1. We try to upgrade current version let current_state = conn.transaction_state.get(); - let (new_transaction_state, updated) = match (current_state, write) { - // pending state means that we tried beginning a tx and the method returned IO. - // instead of ending the read tx, just update the state to pending. - (TransactionState::PendingUpgrade, write) => { - turso_assert!( - *write, - "pending upgrade should only be set for write transactions" - ); - ( + let (new_transaction_state, updated) = if conn.is_nested_stmt.get() { + (current_state, false) + } else { + match (current_state, write) { + // pending state means that we tried beginning a tx and the method returned IO. + // instead of ending the read tx, just update the state to pending. + (TransactionState::PendingUpgrade, write) => { + turso_assert!( + *write, + "pending upgrade should only be set for write transactions" + ); + ( + TransactionState::Write { + schema_did_change: false, + }, + true, + ) + } + (TransactionState::Write { schema_did_change }, true) => { + (TransactionState::Write { schema_did_change }, false) + } + (TransactionState::Write { schema_did_change }, false) => { + (TransactionState::Write { schema_did_change }, false) + } + (TransactionState::Read, true) => ( TransactionState::Write { schema_did_change: false, }, true, - ) + ), + (TransactionState::Read, false) => (TransactionState::Read, false), + (TransactionState::None, true) => ( + TransactionState::Write { + schema_did_change: false, + }, + true, + ), + (TransactionState::None, false) => (TransactionState::Read, true), } - (TransactionState::Write { schema_did_change }, true) => { - (TransactionState::Write { schema_did_change }, false) - } - (TransactionState::Write { schema_did_change }, false) => { - (TransactionState::Write { schema_did_change }, false) - } - (TransactionState::Read, true) => ( - TransactionState::Write { - schema_did_change: false, - }, - true, - ), - (TransactionState::Read, false) => (TransactionState::Read, false), - (TransactionState::None, true) => ( - TransactionState::Write { - schema_did_change: false, - }, - true, - ), - (TransactionState::None, false) => (TransactionState::Read, true), }; // 2. Start transaction if needed @@ -2073,12 +2077,20 @@ pub fn op_transaction( } } else { if updated && matches!(current_state, TransactionState::None) { + turso_assert!( + !conn.is_nested_stmt.get(), + "nested stmt should not begin a new read transaction" + ); if let LimboResult::Busy = pager.begin_read_tx()? { return Ok(InsnFunctionStepResult::Busy); } } if updated && matches!(new_transaction_state, TransactionState::Write { .. }) { + turso_assert!( + !conn.is_nested_stmt.get(), + "nested stmt should not begin a new write transaction" + ); match pager.begin_write_tx()? { IOResult::Done(r) => { if let LimboResult::Busy = r { @@ -6425,12 +6437,13 @@ pub fn op_parse_schema( let previous_auto_commit = conn.auto_commit.get(); conn.auto_commit.set(false); - if let Some(where_clause) = where_clause { + let maybe_nested_stmt_err = if let Some(where_clause) = where_clause { let stmt = conn.prepare(format!("SELECT * FROM sqlite_schema WHERE {where_clause}"))?; conn.with_schema_mut(|schema| { // TODO: This function below is synchronous, make it async let existing_views = schema.materialized_views.clone(); + conn.is_nested_stmt.set(true); parse_schema_rows( stmt, schema, @@ -6438,13 +6451,14 @@ pub fn op_parse_schema( state.mv_tx_id, existing_views, ) - })?; + }) } else { let stmt = conn.prepare("SELECT * FROM sqlite_schema")?; conn.with_schema_mut(|schema| { // TODO: This function below is synchronous, make it async let existing_views = schema.materialized_views.clone(); + conn.is_nested_stmt.set(true); parse_schema_rows( stmt, schema, @@ -6452,9 +6466,11 @@ pub fn op_parse_schema( state.mv_tx_id, existing_views, ) - })?; - } + }) + }; + conn.is_nested_stmt.set(false); conn.auto_commit.set(previous_auto_commit); + maybe_nested_stmt_err?; state.pc += 1; Ok(InsnFunctionStepResult::Step) }