Do not start or end transaction in nested statement

This commit is contained in:
Jussi Saurio
2025-08-19 13:03:14 +03:00
parent 97657a86b3
commit 7f1eac9560
3 changed files with 61 additions and 34 deletions

View File

@@ -424,6 +424,7 @@ impl Database {
query_only: Cell::new(false),
view_transaction_states: RefCell::new(HashMap::new()),
metrics: RefCell::new(ConnectionMetrics::new()),
is_nested_stmt: Cell::new(false),
});
let builtin_syms = self.builtin_syms.borrow();
// add built-in extensions symbols to the connection to prevent having to load each time
@@ -848,6 +849,9 @@ pub struct Connection {
view_transaction_states: RefCell<HashMap<String, ViewTransactionState>>,
/// Connection-level metrics aggregation
pub metrics: RefCell<ConnectionMetrics>,
/// Whether the connection is executing a statement initiated by another statement.
/// Generally this is only true for ParseSchema.
is_nested_stmt: Cell<bool>,
}
impl Connection {
@@ -2040,6 +2044,9 @@ impl Statement {
pub fn run_once(&self) -> Result<()> {
let res = self.pager.io.run_once();
if self.program.connection.is_nested_stmt.get() {
return res;
}
if res.is_err() {
let state = self.program.connection.transaction_state.get();
if let TransactionState::Write { .. } = state {

View File

@@ -962,6 +962,10 @@ impl Pager {
connection: &Connection,
wal_auto_checkpoint_disabled: bool,
) -> Result<IOResult<PagerCommitResult>> {
if connection.is_nested_stmt.get() {
// Parent statement will handle the transaction rollback.
return Ok(IOResult::Done(PagerCommitResult::Rollback));
}
tracing::trace!("end_tx(rollback={})", rollback);
let Some(wal) = self.wal.as_ref() else {
// TODO: Unsure what the semantics of "end_tx" is for in-memory databases, ephemeral tables and ephemeral indexes.

View File

@@ -2015,41 +2015,45 @@ pub fn op_transaction(
// 1. We try to upgrade current version
let current_state = conn.transaction_state.get();
let (new_transaction_state, updated) = match (current_state, write) {
// pending state means that we tried beginning a tx and the method returned IO.
// instead of ending the read tx, just update the state to pending.
(TransactionState::PendingUpgrade, write) => {
turso_assert!(
*write,
"pending upgrade should only be set for write transactions"
);
(
let (new_transaction_state, updated) = if conn.is_nested_stmt.get() {
(current_state, false)
} else {
match (current_state, write) {
// pending state means that we tried beginning a tx and the method returned IO.
// instead of ending the read tx, just update the state to pending.
(TransactionState::PendingUpgrade, write) => {
turso_assert!(
*write,
"pending upgrade should only be set for write transactions"
);
(
TransactionState::Write {
schema_did_change: false,
},
true,
)
}
(TransactionState::Write { schema_did_change }, true) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Write { schema_did_change }, false) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Read, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
)
),
(TransactionState::Read, false) => (TransactionState::Read, false),
(TransactionState::None, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::None, false) => (TransactionState::Read, true),
}
(TransactionState::Write { schema_did_change }, true) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Write { schema_did_change }, false) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Read, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::Read, false) => (TransactionState::Read, false),
(TransactionState::None, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::None, false) => (TransactionState::Read, true),
};
// 2. Start transaction if needed
@@ -2073,12 +2077,20 @@ pub fn op_transaction(
}
} else {
if updated && matches!(current_state, TransactionState::None) {
turso_assert!(
!conn.is_nested_stmt.get(),
"nested stmt should not begin a new read transaction"
);
if let LimboResult::Busy = pager.begin_read_tx()? {
return Ok(InsnFunctionStepResult::Busy);
}
}
if updated && matches!(new_transaction_state, TransactionState::Write { .. }) {
turso_assert!(
!conn.is_nested_stmt.get(),
"nested stmt should not begin a new write transaction"
);
match pager.begin_write_tx()? {
IOResult::Done(r) => {
if let LimboResult::Busy = r {
@@ -6425,12 +6437,13 @@ pub fn op_parse_schema(
let previous_auto_commit = conn.auto_commit.get();
conn.auto_commit.set(false);
if let Some(where_clause) = where_clause {
let maybe_nested_stmt_err = if let Some(where_clause) = where_clause {
let stmt = conn.prepare(format!("SELECT * FROM sqlite_schema WHERE {where_clause}"))?;
conn.with_schema_mut(|schema| {
// TODO: This function below is synchronous, make it async
let existing_views = schema.materialized_views.clone();
conn.is_nested_stmt.set(true);
parse_schema_rows(
stmt,
schema,
@@ -6438,13 +6451,14 @@ pub fn op_parse_schema(
state.mv_tx_id,
existing_views,
)
})?;
})
} else {
let stmt = conn.prepare("SELECT * FROM sqlite_schema")?;
conn.with_schema_mut(|schema| {
// TODO: This function below is synchronous, make it async
let existing_views = schema.materialized_views.clone();
conn.is_nested_stmt.set(true);
parse_schema_rows(
stmt,
schema,
@@ -6452,9 +6466,11 @@ pub fn op_parse_schema(
state.mv_tx_id,
existing_views,
)
})?;
}
})
};
conn.is_nested_stmt.set(false);
conn.auto_commit.set(previous_auto_commit);
maybe_nested_stmt_err?;
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}