Merge 'Do not begin or end transactions in nested statement' from Jussi Saurio

Closes #2657
Closes #2659
Closes #2660
Closes #2661
1. In simulator, do not assume every error that happens after fault
injection has the literal error message `"Injected fault"`. If an error
happened, all we need to do is not shadow the query into the in-memory
simulator environment -- in other words, we assume whatever the
statement did failed and was rolled back.
2. Do not begin or end transactions inside a nested statement. Nested
statements generally only occur when `ParseSchema` is invoked, which
runs the equivalent of `SELECT * FROM sqlite_schema`. This does not need
a new transaction nor does it need to end the transaction on error,
since the parent statement will handle it.

Reviewed-by: Pere Diaz Bou <pere-altea@homail.com>

Closes #2666
This commit is contained in:
Jussi Saurio
2025-08-19 14:25:33 +03:00
committed by GitHub
5 changed files with 68 additions and 41 deletions

View File

@@ -424,6 +424,7 @@ impl Database {
query_only: Cell::new(false),
view_transaction_states: RefCell::new(HashMap::new()),
metrics: RefCell::new(ConnectionMetrics::new()),
is_nested_stmt: Cell::new(false),
});
let builtin_syms = self.builtin_syms.borrow();
// add built-in extensions symbols to the connection to prevent having to load each time
@@ -848,6 +849,9 @@ pub struct Connection {
view_transaction_states: RefCell<HashMap<String, ViewTransactionState>>,
/// Connection-level metrics aggregation
pub metrics: RefCell<ConnectionMetrics>,
/// Whether the connection is executing a statement initiated by another statement.
/// Generally this is only true for ParseSchema.
is_nested_stmt: Cell<bool>,
}
impl Connection {
@@ -2040,6 +2044,9 @@ impl Statement {
pub fn run_once(&self) -> Result<()> {
let res = self.pager.io.run_once();
if self.program.connection.is_nested_stmt.get() {
return res;
}
if res.is_err() {
let state = self.program.connection.transaction_state.get();
if let TransactionState::Write { .. } = state {

View File

@@ -962,6 +962,10 @@ impl Pager {
connection: &Connection,
wal_auto_checkpoint_disabled: bool,
) -> Result<IOResult<PagerCommitResult>> {
if connection.is_nested_stmt.get() {
// Parent statement will handle the transaction rollback.
return Ok(IOResult::Done(PagerCommitResult::Rollback));
}
tracing::trace!("end_tx(rollback={})", rollback);
let Some(wal) = self.wal.as_ref() else {
// TODO: Unsure what the semantics of "end_tx" is for in-memory databases, ephemeral tables and ephemeral indexes.

View File

@@ -2015,41 +2015,45 @@ pub fn op_transaction(
// 1. We try to upgrade current version
let current_state = conn.transaction_state.get();
let (new_transaction_state, updated) = match (current_state, write) {
// pending state means that we tried beginning a tx and the method returned IO.
// instead of ending the read tx, just update the state to pending.
(TransactionState::PendingUpgrade, write) => {
turso_assert!(
*write,
"pending upgrade should only be set for write transactions"
);
(
let (new_transaction_state, updated) = if conn.is_nested_stmt.get() {
(current_state, false)
} else {
match (current_state, write) {
// pending state means that we tried beginning a tx and the method returned IO.
// instead of ending the read tx, just update the state to pending.
(TransactionState::PendingUpgrade, write) => {
turso_assert!(
*write,
"pending upgrade should only be set for write transactions"
);
(
TransactionState::Write {
schema_did_change: false,
},
true,
)
}
(TransactionState::Write { schema_did_change }, true) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Write { schema_did_change }, false) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Read, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
)
),
(TransactionState::Read, false) => (TransactionState::Read, false),
(TransactionState::None, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::None, false) => (TransactionState::Read, true),
}
(TransactionState::Write { schema_did_change }, true) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Write { schema_did_change }, false) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Read, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::Read, false) => (TransactionState::Read, false),
(TransactionState::None, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::None, false) => (TransactionState::Read, true),
};
// 2. Start transaction if needed
@@ -2073,12 +2077,20 @@ pub fn op_transaction(
}
} else {
if updated && matches!(current_state, TransactionState::None) {
turso_assert!(
!conn.is_nested_stmt.get(),
"nested stmt should not begin a new read transaction"
);
if let LimboResult::Busy = pager.begin_read_tx()? {
return Ok(InsnFunctionStepResult::Busy);
}
}
if updated && matches!(new_transaction_state, TransactionState::Write { .. }) {
turso_assert!(
!conn.is_nested_stmt.get(),
"nested stmt should not begin a new write transaction"
);
match pager.begin_write_tx()? {
IOResult::Done(r) => {
if let LimboResult::Busy = r {
@@ -6425,12 +6437,13 @@ pub fn op_parse_schema(
let previous_auto_commit = conn.auto_commit.get();
conn.auto_commit.set(false);
if let Some(where_clause) = where_clause {
let maybe_nested_stmt_err = if let Some(where_clause) = where_clause {
let stmt = conn.prepare(format!("SELECT * FROM sqlite_schema WHERE {where_clause}"))?;
conn.with_schema_mut(|schema| {
// TODO: This function below is synchronous, make it async
let existing_views = schema.materialized_views.clone();
conn.is_nested_stmt.set(true);
parse_schema_rows(
stmt,
schema,
@@ -6438,13 +6451,14 @@ pub fn op_parse_schema(
state.mv_tx_id,
existing_views,
)
})?;
})
} else {
let stmt = conn.prepare("SELECT * FROM sqlite_schema")?;
conn.with_schema_mut(|schema| {
// TODO: This function below is synchronous, make it async
let existing_views = schema.materialized_views.clone();
conn.is_nested_stmt.set(true);
parse_schema_rows(
stmt,
schema,
@@ -6452,9 +6466,11 @@ pub fn op_parse_schema(
state.mv_tx_id,
existing_views,
)
})?;
}
})
};
conn.is_nested_stmt.set(false);
conn.auto_commit.set(previous_auto_commit);
maybe_nested_stmt_err?;
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}

View File

@@ -645,6 +645,9 @@ impl Interaction {
&query_str[0..query_str.len().min(4096)],
err
);
if let Some(turso_core::LimboError::ParseError(e)) = err {
panic!("Unexpected parse error: {e}");
}
return Err(err.unwrap());
}
let mut rows = rows.unwrap().unwrap();

View File

@@ -16,7 +16,6 @@ use crate::{
Create, Delete, Drop, Insert, Query, Select,
},
table::SimValue,
FAULT_ERROR_MSG,
},
runner::env::SimulatorEnv,
};
@@ -752,12 +751,10 @@ impl Property {
Ok(Ok(()))
}
Err(err) => {
let msg = format!("{err}");
if msg.contains(FAULT_ERROR_MSG) {
Ok(Ok(()))
} else {
Err(LimboError::InternalError(msg))
}
// We cannot make any assumptions about the error content; all we are about is, if the statement errored,
// we don't shadow the results into the simulator env, i.e. we assume whatever the statement did was rolled back.
tracing::error!("Fault injection produced error: {err}");
Ok(Ok(()))
}
}
}),