From d77dd8400d948e1640e424105af18a334eb4aa04 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Thu, 16 Oct 2025 15:42:51 +0300 Subject: [PATCH] bindings/rust: rollback dangling tx on next access of DB, instead of panicing Closes #3748 Right now if any error happens during an interactive tx that causes the `Transaction` to drop, the program will panic. I don't know how good this solution is, but we can at least prevent a panic by storing whether the connection has a dangling transaction and roll it back automatically the next time the connection tries to do something. --- bindings/rust/src/lib.rs | 27 +++++++++++ bindings/rust/src/transaction.rs | 77 +++++++++++++++++++++++++++----- 2 files changed, 94 insertions(+), 10 deletions(-) diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index 5d87ad5f0..4f35728dc 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -48,6 +48,8 @@ pub use params::IntoParams; use std::fmt::Debug; use std::future::Future; use std::num::NonZero; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; use std::task::Poll; pub use turso_core::EncryptionOpts; @@ -222,6 +224,11 @@ impl Database { pub struct Connection { inner: Arc>>, transaction_behavior: TransactionBehavior, + /// Whether there is a dangling transaction after it was dropped without being finished. + /// We cannot rollback the transaction on Drop because drop is not async. + /// Instead, we roll back the dangling transaction whenever a new transaction is requested + /// or the connection queries/executes. + dangling_tx: AtomicBool, } impl Clone for Connection { @@ -229,6 +236,7 @@ impl Clone for Connection { Self { inner: Arc::clone(&self.inner), transaction_behavior: self.transaction_behavior, + dangling_tx: AtomicBool::new(self.dangling_tx.load(Ordering::SeqCst)), } } } @@ -242,17 +250,34 @@ impl Connection { let connection = Connection { inner: Arc::new(Mutex::new(conn)), transaction_behavior: TransactionBehavior::Deferred, + dangling_tx: AtomicBool::new(false), }; connection } + + fn has_dangling_tx(&self) -> bool { + self.dangling_tx.load(Ordering::SeqCst) + } + + async fn maybe_rollback_dangling_tx(&self) -> Result<()> { + if self.has_dangling_tx() { + let mut stmt = self.prepare("ROLLBACK").await?; + stmt.execute(()).await?; + self.dangling_tx.store(false, Ordering::SeqCst); + } + Ok(()) + } + /// Query the database with SQL. pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result { + self.maybe_rollback_dangling_tx().await?; let mut stmt = self.prepare(sql).await?; stmt.query(params).await } /// Execute SQL statement on the database. pub async fn execute(&self, sql: &str, params: impl IntoParams) -> Result { + self.maybe_rollback_dangling_tx().await?; let mut stmt = self.prepare(sql).await?; stmt.execute(params).await } @@ -337,6 +362,7 @@ impl Connection { /// Execute a batch of SQL statements on the database. pub async fn execute_batch(&self, sql: &str) -> Result<()> { + self.maybe_rollback_dangling_tx().await?; self.prepare_execute_batch(sql).await?; Ok(()) } @@ -358,6 +384,7 @@ impl Connection { } async fn prepare_execute_batch(&self, sql: impl AsRef) -> Result<()> { + self.maybe_rollback_dangling_tx().await?; let conn = self .inner .lock() diff --git a/bindings/rust/src/transaction.rs b/bindings/rust/src/transaction.rs index b68cc1fab..9780856a5 100644 --- a/bindings/rust/src/transaction.rs +++ b/bindings/rust/src/transaction.rs @@ -1,4 +1,4 @@ -use std::ops::Deref; +use std::{ops::Deref, sync::atomic::Ordering}; use crate::{Connection, Result}; @@ -63,7 +63,7 @@ pub enum DropBehavior { pub struct Transaction<'conn> { conn: &'conn Connection, drop_behavior: DropBehavior, - must_finish: bool, + in_progress: bool, } impl Transaction<'_> { @@ -99,7 +99,7 @@ impl Transaction<'_> { conn.execute(query, ()).await.map(move |_| Transaction { conn, drop_behavior: DropBehavior::Rollback, - must_finish: true, + in_progress: true, }) } @@ -126,7 +126,7 @@ impl Transaction<'_> { #[inline] async fn _commit(&mut self) -> Result<()> { - self.must_finish = false; + self.in_progress = false; self.conn.execute("COMMIT", ()).await?; Ok(()) } @@ -139,7 +139,7 @@ impl Transaction<'_> { #[inline] async fn _rollback(&mut self) -> Result<()> { - self.must_finish = false; + self.in_progress = false; self.conn.execute("ROLLBACK", ()).await?; Ok(()) } @@ -186,8 +186,10 @@ impl Deref for Transaction<'_> { impl Drop for Transaction<'_> { #[inline] fn drop(&mut self) { - if self.must_finish { - panic!("Transaction dropped without finish()") + if self.in_progress { + self.conn.dangling_tx.store(true, Ordering::SeqCst); + } else { + self.conn.dangling_tx.store(false, Ordering::SeqCst); } } } @@ -221,7 +223,8 @@ impl Connection { /// Will return `Err` if the call fails. #[inline] pub async fn transaction(&mut self) -> Result> { - Transaction::new(self, self.transaction_behavior).await + self.transaction_with_behavior(self.transaction_behavior) + .await } /// Begin a new transaction with a specified behavior. @@ -236,6 +239,7 @@ impl Connection { &mut self, behavior: TransactionBehavior, ) -> Result> { + self.maybe_rollback_dangling_tx().await?; Transaction::new(self, behavior).await } @@ -318,13 +322,66 @@ mod test { } #[tokio::test] - #[should_panic(expected = "Transaction dropped without finish()")] - async fn test_drop_panic() { + async fn test_drop_rollback_on_new_transaction() { let mut conn = checked_memory_handle().await.unwrap(); { let tx = conn.transaction().await.unwrap(); tx.execute("INSERT INTO foo VALUES(?)", &[1]).await.unwrap(); + // Drop without finish - should be rolled back when next transaction starts } + + // Start a new transaction - this should rollback the dangling one + let tx = conn.transaction().await.unwrap(); + tx.execute("INSERT INTO foo VALUES(?)", &[2]).await.unwrap(); + let result = tx + .prepare("SELECT SUM(x) FROM foo") + .await + .unwrap() + .query_row(()) + .await + .unwrap(); + + // The insert from the dropped transaction should have been rolled back + assert_eq!(2, result.get::(0).unwrap()); + tx.finish().await.unwrap(); + } + + #[tokio::test] + async fn test_drop_rollback_on_query() { + let mut conn = checked_memory_handle().await.unwrap(); + { + let tx = conn.transaction().await.unwrap(); + tx.execute("INSERT INTO foo VALUES(?)", &[1]).await.unwrap(); + // Drop without finish - should be rolled back when conn.query is called + } + + // Using conn.query should rollback the dangling transaction + let mut rows = conn.query("SELECT count(*) FROM foo", ()).await.unwrap(); + let result = rows.next().await.unwrap().unwrap(); + + // The insert from the dropped transaction should have been rolled back + assert_eq!(0, result.get::(0).unwrap()); + } + + #[tokio::test] + async fn test_drop_rollback_on_execute() { + let mut conn = checked_memory_handle().await.unwrap(); + { + let tx = conn.transaction().await.unwrap(); + tx.execute("INSERT INTO foo VALUES(?)", &[1]).await.unwrap(); + // Drop without finish - should be rolled back when conn.execute is called + } + + // Using conn.execute should rollback the dangling transaction + conn.execute("INSERT INTO foo VALUES(?)", &[2]) + .await + .unwrap(); + + let mut rows = conn.query("SELECT count(*) FROM foo", ()).await.unwrap(); + let result = rows.next().await.unwrap().unwrap(); + + // The insert from the dropped transaction should have been rolled back + assert_eq!(1, result.get::(0).unwrap()); } #[tokio::test]