From 906bbdd1c41885d3b98559aa6c02e05eb2f3f41f Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 23 Oct 2025 14:02:34 +0400 Subject: [PATCH] support deep nestedness --- core/lib.rs | 20 ++++++++++++-------- core/storage/pager.rs | 6 +++--- core/vdbe/execute.rs | 13 ++++++------- core/vdbe/mod.rs | 10 +++++----- 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 4e5eeea8d..04775c48d 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -580,7 +580,7 @@ impl Database { mv_tx: RwLock::new(None), view_transaction_states: AllViewsTxState::new(), metrics: RwLock::new(ConnectionMetrics::new()), - is_nested_stmt: AtomicBool::new(false), + nestedness: AtomicI32::new(0), encryption_key: RwLock::new(None), encryption_cipher_mode: AtomicCipherMode::new(CipherMode::None), sync_mode: AtomicSyncMode::new(SyncMode::Full), @@ -1096,7 +1096,7 @@ pub struct Connection { pub metrics: RwLock, /// Whether the connection is executing a statement initiated by another statement. /// Generally this is only true for ParseSchema. - is_nested_stmt: AtomicBool, + nestedness: AtomicI32, encryption_key: RwLock>, encryption_cipher_mode: AtomicCipherMode, sync_mode: AtomicSyncMode, @@ -1128,6 +1128,15 @@ impl Drop for Connection { } impl Connection { + pub fn is_nested_stmt(&self) -> bool { + self.nestedness.load(Ordering::SeqCst) > 0 + } + pub fn start_nested(&self) { + self.nestedness.fetch_add(1, Ordering::SeqCst); + } + pub fn end_nested(&self) { + self.nestedness.fetch_add(-1, Ordering::SeqCst); + } pub fn prepare(self: &Arc, sql: impl AsRef) -> Result { if self.is_mvcc_bootstrap_connection() { // Never use MV store for bootstrapping - we read state directly from sqlite_schema in the DB file. @@ -2681,12 +2690,7 @@ impl Statement { pub fn run_once(&self) -> Result<()> { let res = self.pager.io.step(); - if self - .program - .connection - .is_nested_stmt - .load(Ordering::SeqCst) - { + if self.program.connection.is_nested_stmt() { return res; } if res.is_err() { diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 75d4a47db..28257c1b1 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1443,8 +1443,8 @@ impl Pager { #[instrument(skip_all, level = Level::DEBUG)] pub fn commit_tx(&self, connection: &Connection) -> Result> { - if connection.is_nested_stmt.load(Ordering::SeqCst) { - // Parent statement will handle the transaction rollback. + if connection.is_nested_stmt() { + // Parent statement will handle the transaction commit. return Ok(IOResult::Done(PagerCommitResult::Rollback)); } let Some(wal) = self.wal.as_ref() else { @@ -1473,7 +1473,7 @@ impl Pager { #[instrument(skip_all, level = Level::DEBUG)] pub fn rollback_tx(&self, connection: &Connection) { - if connection.is_nested_stmt.load(Ordering::SeqCst) { + if connection.is_nested_stmt() { // Parent statement will handle the transaction rollback. return; } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index ce1c8a7d2..9f8d4fdf5 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2274,8 +2274,7 @@ pub fn op_transaction_inner( // 1. We try to upgrade current version let current_state = conn.get_tx_state(); - let (new_transaction_state, updated) = if conn.is_nested_stmt.load(Ordering::SeqCst) - { + let (new_transaction_state, updated) = if conn.is_nested_stmt() { (current_state, false) } else { match (current_state, write) { @@ -2365,7 +2364,7 @@ pub fn op_transaction_inner( } if updated && matches!(current_state, TransactionState::None) { turso_assert!( - !conn.is_nested_stmt.load(Ordering::SeqCst), + !conn.is_nested_stmt(), "nested stmt should not begin a new read transaction" ); pager.begin_read_tx()?; @@ -2374,7 +2373,7 @@ pub fn op_transaction_inner( if updated && matches!(new_transaction_state, TransactionState::Write { .. }) { turso_assert!( - !conn.is_nested_stmt.load(Ordering::SeqCst), + !conn.is_nested_stmt(), "nested stmt should not begin a new write transaction" ); let begin_w_tx_res = pager.begin_write_tx(); @@ -7303,7 +7302,7 @@ pub fn op_parse_schema( conn.with_schema_mut(|schema| { // TODO: This function below is synchronous, make it async let existing_views = schema.incremental_views.clone(); - conn.is_nested_stmt.store(true, Ordering::SeqCst); + conn.start_nested(); parse_schema_rows( stmt, schema, @@ -7318,7 +7317,7 @@ pub fn op_parse_schema( conn.with_schema_mut(|schema| { // TODO: This function below is synchronous, make it async let existing_views = schema.incremental_views.clone(); - conn.is_nested_stmt.store(true, Ordering::SeqCst); + conn.start_nested(); parse_schema_rows( stmt, schema, @@ -7328,7 +7327,7 @@ pub fn op_parse_schema( ) }) }; - conn.is_nested_stmt.store(false, Ordering::SeqCst); + conn.end_nested(); conn.auto_commit .store(previous_auto_commit, Ordering::SeqCst); maybe_nested_stmt_err?; diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 8eec50ba3..0ee6d3e68 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -917,11 +917,11 @@ impl Program { // hence the mv_store.is_none() check. return Ok(IOResult::Done(())); } + if self.connection.is_nested_stmt() { + // We don't want to commit on nested statements. Let parent handle it. + return Ok(IOResult::Done(())); + } if let Some(mv_store) = mv_store { - if self.connection.is_nested_stmt.load(Ordering::SeqCst) { - // We don't want to commit on nested statements. Let parent handle it. - return Ok(IOResult::Done(())); - } let conn = self.connection.clone(); let auto_commit = conn.auto_commit.load(Ordering::SeqCst); if auto_commit { @@ -1050,7 +1050,7 @@ impl Program { cleanup: &mut TxnCleanup, ) { // Errors from nested statements are handled by the parent statement. - if !self.connection.is_nested_stmt.load(Ordering::SeqCst) { + if !self.connection.is_nested_stmt() { match err { // Transaction errors, e.g. trying to start a nested transaction, do not cause a rollback. Some(LimboError::TxError(_)) => {}