From d77dd8400d948e1640e424105af18a334eb4aa04 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Thu, 16 Oct 2025 15:42:51 +0300 Subject: [PATCH 1/4] bindings/rust: rollback dangling tx on next access of DB, instead of panicing Closes #3748 Right now if any error happens during an interactive tx that causes the `Transaction` to drop, the program will panic. I don't know how good this solution is, but we can at least prevent a panic by storing whether the connection has a dangling transaction and roll it back automatically the next time the connection tries to do something. --- bindings/rust/src/lib.rs | 27 +++++++++++ bindings/rust/src/transaction.rs | 77 +++++++++++++++++++++++++++----- 2 files changed, 94 insertions(+), 10 deletions(-) diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index 5d87ad5f0..4f35728dc 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -48,6 +48,8 @@ pub use params::IntoParams; use std::fmt::Debug; use std::future::Future; use std::num::NonZero; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; use std::task::Poll; pub use turso_core::EncryptionOpts; @@ -222,6 +224,11 @@ impl Database { pub struct Connection { inner: Arc>>, transaction_behavior: TransactionBehavior, + /// Whether there is a dangling transaction after it was dropped without being finished. + /// We cannot rollback the transaction on Drop because drop is not async. + /// Instead, we roll back the dangling transaction whenever a new transaction is requested + /// or the connection queries/executes. + dangling_tx: AtomicBool, } impl Clone for Connection { @@ -229,6 +236,7 @@ impl Clone for Connection { Self { inner: Arc::clone(&self.inner), transaction_behavior: self.transaction_behavior, + dangling_tx: AtomicBool::new(self.dangling_tx.load(Ordering::SeqCst)), } } } @@ -242,17 +250,34 @@ impl Connection { let connection = Connection { inner: Arc::new(Mutex::new(conn)), transaction_behavior: TransactionBehavior::Deferred, + dangling_tx: AtomicBool::new(false), }; connection } + + fn has_dangling_tx(&self) -> bool { + self.dangling_tx.load(Ordering::SeqCst) + } + + async fn maybe_rollback_dangling_tx(&self) -> Result<()> { + if self.has_dangling_tx() { + let mut stmt = self.prepare("ROLLBACK").await?; + stmt.execute(()).await?; + self.dangling_tx.store(false, Ordering::SeqCst); + } + Ok(()) + } + /// Query the database with SQL. pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result { + self.maybe_rollback_dangling_tx().await?; let mut stmt = self.prepare(sql).await?; stmt.query(params).await } /// Execute SQL statement on the database. pub async fn execute(&self, sql: &str, params: impl IntoParams) -> Result { + self.maybe_rollback_dangling_tx().await?; let mut stmt = self.prepare(sql).await?; stmt.execute(params).await } @@ -337,6 +362,7 @@ impl Connection { /// Execute a batch of SQL statements on the database. pub async fn execute_batch(&self, sql: &str) -> Result<()> { + self.maybe_rollback_dangling_tx().await?; self.prepare_execute_batch(sql).await?; Ok(()) } @@ -358,6 +384,7 @@ impl Connection { } async fn prepare_execute_batch(&self, sql: impl AsRef) -> Result<()> { + self.maybe_rollback_dangling_tx().await?; let conn = self .inner .lock() diff --git a/bindings/rust/src/transaction.rs b/bindings/rust/src/transaction.rs index b68cc1fab..9780856a5 100644 --- a/bindings/rust/src/transaction.rs +++ b/bindings/rust/src/transaction.rs @@ -1,4 +1,4 @@ -use std::ops::Deref; +use std::{ops::Deref, sync::atomic::Ordering}; use crate::{Connection, Result}; @@ -63,7 +63,7 @@ pub enum DropBehavior { pub struct Transaction<'conn> { conn: &'conn Connection, drop_behavior: DropBehavior, - must_finish: bool, + in_progress: bool, } impl Transaction<'_> { @@ -99,7 +99,7 @@ impl Transaction<'_> { conn.execute(query, ()).await.map(move |_| Transaction { conn, drop_behavior: DropBehavior::Rollback, - must_finish: true, + in_progress: true, }) } @@ -126,7 +126,7 @@ impl Transaction<'_> { #[inline] async fn _commit(&mut self) -> Result<()> { - self.must_finish = false; + self.in_progress = false; self.conn.execute("COMMIT", ()).await?; Ok(()) } @@ -139,7 +139,7 @@ impl Transaction<'_> { #[inline] async fn _rollback(&mut self) -> Result<()> { - self.must_finish = false; + self.in_progress = false; self.conn.execute("ROLLBACK", ()).await?; Ok(()) } @@ -186,8 +186,10 @@ impl Deref for Transaction<'_> { impl Drop for Transaction<'_> { #[inline] fn drop(&mut self) { - if self.must_finish { - panic!("Transaction dropped without finish()") + if self.in_progress { + self.conn.dangling_tx.store(true, Ordering::SeqCst); + } else { + self.conn.dangling_tx.store(false, Ordering::SeqCst); } } } @@ -221,7 +223,8 @@ impl Connection { /// Will return `Err` if the call fails. #[inline] pub async fn transaction(&mut self) -> Result> { - Transaction::new(self, self.transaction_behavior).await + self.transaction_with_behavior(self.transaction_behavior) + .await } /// Begin a new transaction with a specified behavior. @@ -236,6 +239,7 @@ impl Connection { &mut self, behavior: TransactionBehavior, ) -> Result> { + self.maybe_rollback_dangling_tx().await?; Transaction::new(self, behavior).await } @@ -318,13 +322,66 @@ mod test { } #[tokio::test] - #[should_panic(expected = "Transaction dropped without finish()")] - async fn test_drop_panic() { + async fn test_drop_rollback_on_new_transaction() { let mut conn = checked_memory_handle().await.unwrap(); { let tx = conn.transaction().await.unwrap(); tx.execute("INSERT INTO foo VALUES(?)", &[1]).await.unwrap(); + // Drop without finish - should be rolled back when next transaction starts } + + // Start a new transaction - this should rollback the dangling one + let tx = conn.transaction().await.unwrap(); + tx.execute("INSERT INTO foo VALUES(?)", &[2]).await.unwrap(); + let result = tx + .prepare("SELECT SUM(x) FROM foo") + .await + .unwrap() + .query_row(()) + .await + .unwrap(); + + // The insert from the dropped transaction should have been rolled back + assert_eq!(2, result.get::(0).unwrap()); + tx.finish().await.unwrap(); + } + + #[tokio::test] + async fn test_drop_rollback_on_query() { + let mut conn = checked_memory_handle().await.unwrap(); + { + let tx = conn.transaction().await.unwrap(); + tx.execute("INSERT INTO foo VALUES(?)", &[1]).await.unwrap(); + // Drop without finish - should be rolled back when conn.query is called + } + + // Using conn.query should rollback the dangling transaction + let mut rows = conn.query("SELECT count(*) FROM foo", ()).await.unwrap(); + let result = rows.next().await.unwrap().unwrap(); + + // The insert from the dropped transaction should have been rolled back + assert_eq!(0, result.get::(0).unwrap()); + } + + #[tokio::test] + async fn test_drop_rollback_on_execute() { + let mut conn = checked_memory_handle().await.unwrap(); + { + let tx = conn.transaction().await.unwrap(); + tx.execute("INSERT INTO foo VALUES(?)", &[1]).await.unwrap(); + // Drop without finish - should be rolled back when conn.execute is called + } + + // Using conn.execute should rollback the dangling transaction + conn.execute("INSERT INTO foo VALUES(?)", &[2]) + .await + .unwrap(); + + let mut rows = conn.query("SELECT count(*) FROM foo", ()).await.unwrap(); + let result = rows.next().await.unwrap().unwrap(); + + // The insert from the dropped transaction should have been rolled back + assert_eq!(1, result.get::(0).unwrap()); } #[tokio::test] From 455f0fbc461a4fa222b9387da34482af26e8fd9f Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Thu, 16 Oct 2025 15:53:58 +0300 Subject: [PATCH 2/4] Set in_progress to false AFTER executing the statement --- bindings/rust/src/transaction.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bindings/rust/src/transaction.rs b/bindings/rust/src/transaction.rs index 9780856a5..c265a0de8 100644 --- a/bindings/rust/src/transaction.rs +++ b/bindings/rust/src/transaction.rs @@ -126,8 +126,8 @@ impl Transaction<'_> { #[inline] async fn _commit(&mut self) -> Result<()> { - self.in_progress = false; self.conn.execute("COMMIT", ()).await?; + self.in_progress = false; Ok(()) } @@ -139,8 +139,8 @@ impl Transaction<'_> { #[inline] async fn _rollback(&mut self) -> Result<()> { - self.in_progress = false; self.conn.execute("ROLLBACK", ()).await?; + self.in_progress = false; Ok(()) } From 392932206182f8ae8214e275bd7c1330b91b985b Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Thu, 16 Oct 2025 16:38:42 +0300 Subject: [PATCH 3/4] Propagate DropBehavior to dangling_tx so DropBehavior makes sense --- bindings/rust/src/lib.rs | 74 +++++++++++++++++++++++--------- bindings/rust/src/transaction.rs | 36 +++++++++++++--- 2 files changed, 84 insertions(+), 26 deletions(-) diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index 4f35728dc..491acb926 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -48,7 +48,7 @@ pub use params::IntoParams; use std::fmt::Debug; use std::future::Future; use std::num::NonZero; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicU8; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; use std::task::Poll; @@ -57,6 +57,7 @@ use turso_core::OpenFlags; // Re-exports rows pub use crate::rows::{Row, Rows}; +use crate::transaction::DropBehavior; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -220,15 +221,39 @@ impl Database { } } +/// Atomic wrapper for [DropBehavior] +struct AtomicDropBehavior { + inner: AtomicU8, +} + +impl AtomicDropBehavior { + fn new(behavior: DropBehavior) -> Self { + Self { + inner: AtomicU8::new(behavior.into()), + } + } + + fn load(&self, ordering: Ordering) -> DropBehavior { + self.inner.load(ordering).into() + } + + fn store(&self, behavior: DropBehavior, ordering: Ordering) { + self.inner.store(behavior.into(), ordering); + } +} + /// A database connection. pub struct Connection { inner: Arc>>, transaction_behavior: TransactionBehavior, - /// Whether there is a dangling transaction after it was dropped without being finished. - /// We cannot rollback the transaction on Drop because drop is not async. - /// Instead, we roll back the dangling transaction whenever a new transaction is requested + /// If there is a dangling transaction after it was dropped without being finished, + /// [Connection::dangling_tx] will be set to the [DropBehavior] of the dangling transaction, + /// and the corresponding action will be taken when a new transaction is requested /// or the connection queries/executes. - dangling_tx: AtomicBool, + /// We cannot do this eagerly on Drop because drop is not async. + /// + /// By default, the value is [DropBehavior::Ignore] which effectively does nothing. + dangling_tx: AtomicDropBehavior, } impl Clone for Connection { @@ -236,7 +261,7 @@ impl Clone for Connection { Self { inner: Arc::clone(&self.inner), transaction_behavior: self.transaction_behavior, - dangling_tx: AtomicBool::new(self.dangling_tx.load(Ordering::SeqCst)), + dangling_tx: AtomicDropBehavior::new(self.dangling_tx.load(Ordering::SeqCst)), } } } @@ -250,34 +275,43 @@ impl Connection { let connection = Connection { inner: Arc::new(Mutex::new(conn)), transaction_behavior: TransactionBehavior::Deferred, - dangling_tx: AtomicBool::new(false), + dangling_tx: AtomicDropBehavior::new(DropBehavior::Ignore), }; connection } - fn has_dangling_tx(&self) -> bool { - self.dangling_tx.load(Ordering::SeqCst) - } - - async fn maybe_rollback_dangling_tx(&self) -> Result<()> { - if self.has_dangling_tx() { - let mut stmt = self.prepare("ROLLBACK").await?; - stmt.execute(()).await?; - self.dangling_tx.store(false, Ordering::SeqCst); + async fn maybe_handle_dangling_tx(&self) -> Result<()> { + match self.dangling_tx.load(Ordering::SeqCst) { + DropBehavior::Rollback => { + let mut stmt = self.prepare("ROLLBACK").await?; + stmt.execute(()).await?; + self.dangling_tx + .store(DropBehavior::Ignore, Ordering::SeqCst); + } + DropBehavior::Commit => { + let mut stmt = self.prepare("COMMIT").await?; + stmt.execute(()).await?; + self.dangling_tx + .store(DropBehavior::Ignore, Ordering::SeqCst); + } + DropBehavior::Ignore => {} + DropBehavior::Panic => { + panic!("Transaction dropped unexpectedly."); + } } Ok(()) } /// Query the database with SQL. pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result { - self.maybe_rollback_dangling_tx().await?; + self.maybe_handle_dangling_tx().await?; let mut stmt = self.prepare(sql).await?; stmt.query(params).await } /// Execute SQL statement on the database. pub async fn execute(&self, sql: &str, params: impl IntoParams) -> Result { - self.maybe_rollback_dangling_tx().await?; + self.maybe_handle_dangling_tx().await?; let mut stmt = self.prepare(sql).await?; stmt.execute(params).await } @@ -362,7 +396,7 @@ impl Connection { /// Execute a batch of SQL statements on the database. pub async fn execute_batch(&self, sql: &str) -> Result<()> { - self.maybe_rollback_dangling_tx().await?; + self.maybe_handle_dangling_tx().await?; self.prepare_execute_batch(sql).await?; Ok(()) } @@ -384,7 +418,7 @@ impl Connection { } async fn prepare_execute_batch(&self, sql: impl AsRef) -> Result<()> { - self.maybe_rollback_dangling_tx().await?; + self.maybe_handle_dangling_tx().await?; let conn = self .inner .lock() diff --git a/bindings/rust/src/transaction.rs b/bindings/rust/src/transaction.rs index c265a0de8..75d0b69c5 100644 --- a/bindings/rust/src/transaction.rs +++ b/bindings/rust/src/transaction.rs @@ -36,6 +36,29 @@ pub enum DropBehavior { Panic, } +impl From for u8 { + fn from(behavior: DropBehavior) -> Self { + match behavior { + DropBehavior::Rollback => 0, + DropBehavior::Commit => 1, + DropBehavior::Ignore => 2, + DropBehavior::Panic => 3, + } + } +} + +impl From for DropBehavior { + fn from(value: u8) -> Self { + match value { + 0 => DropBehavior::Rollback, + 1 => DropBehavior::Commit, + 2 => DropBehavior::Ignore, + 3 => DropBehavior::Panic, + _ => panic!("Invalid drop behavior: {value}"), + } + } +} + /// Represents a transaction on a database connection. /// /// ## Note @@ -187,9 +210,13 @@ impl Drop for Transaction<'_> { #[inline] fn drop(&mut self) { if self.in_progress { - self.conn.dangling_tx.store(true, Ordering::SeqCst); + self.conn + .dangling_tx + .store(self.drop_behavior(), Ordering::SeqCst); } else { - self.conn.dangling_tx.store(false, Ordering::SeqCst); + self.conn + .dangling_tx + .store(DropBehavior::Ignore, Ordering::SeqCst); } } } @@ -239,7 +266,7 @@ impl Connection { &mut self, behavior: TransactionBehavior, ) -> Result> { - self.maybe_rollback_dangling_tx().await?; + self.maybe_handle_dangling_tx().await?; Transaction::new(self, behavior).await } @@ -390,14 +417,12 @@ mod test { { let tx = conn.transaction().await?; tx.execute("INSERT INTO foo VALUES(?)", &[1]).await?; - tx.finish().await?; // default: rollback } { let mut tx = conn.transaction().await?; tx.execute("INSERT INTO foo VALUES(?)", &[2]).await?; tx.set_drop_behavior(DropBehavior::Commit); - tx.finish().await?; } { let tx = conn.transaction().await?; @@ -408,7 +433,6 @@ mod test { .await?; assert_eq!(2, result.get::(0)?); - tx.finish().await?; } Ok(()) } From 7728f3ab58bb7c14063bbb7cf1b86fa64b8ff51f Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Thu, 16 Oct 2025 16:40:02 +0300 Subject: [PATCH 4/4] Update DropBehavior related documentation to reflect reality --- bindings/rust/src/transaction.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/bindings/rust/src/transaction.rs b/bindings/rust/src/transaction.rs index 75d0b69c5..5e0b214de 100644 --- a/bindings/rust/src/transaction.rs +++ b/bindings/rust/src/transaction.rs @@ -65,7 +65,7 @@ impl From for DropBehavior { /// /// Transactions will roll back by default. Use `commit` method to explicitly /// commit the transaction, or use `set_drop_behavior` to change what happens -/// when the transaction is dropped. +/// on the next access to the connection after the transaction is dropped. /// /// ## Example /// @@ -224,7 +224,8 @@ impl Drop for Transaction<'_> { impl Connection { /// Begin a new transaction with the default behavior (DEFERRED). /// - /// The transaction defaults to rolling back when it is dropped. If you + /// The transaction defaults to rolling back on the next access to the connection + /// if it is not finished when the transaction is dropped. If you /// want the transaction to commit, you must call /// [`commit`](Transaction::commit) or /// [`set_drop_behavior(DropBehavior::Commit)`](Transaction::set_drop_behavior).