From 392932206182f8ae8214e275bd7c1330b91b985b Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Thu, 16 Oct 2025 16:38:42 +0300 Subject: [PATCH] Propagate DropBehavior to dangling_tx so DropBehavior makes sense --- bindings/rust/src/lib.rs | 74 +++++++++++++++++++++++--------- bindings/rust/src/transaction.rs | 36 +++++++++++++--- 2 files changed, 84 insertions(+), 26 deletions(-) diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index 4f35728dc..491acb926 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -48,7 +48,7 @@ pub use params::IntoParams; use std::fmt::Debug; use std::future::Future; use std::num::NonZero; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicU8; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; use std::task::Poll; @@ -57,6 +57,7 @@ use turso_core::OpenFlags; // Re-exports rows pub use crate::rows::{Row, Rows}; +use crate::transaction::DropBehavior; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -220,15 +221,39 @@ impl Database { } } +/// Atomic wrapper for [DropBehavior] +struct AtomicDropBehavior { + inner: AtomicU8, +} + +impl AtomicDropBehavior { + fn new(behavior: DropBehavior) -> Self { + Self { + inner: AtomicU8::new(behavior.into()), + } + } + + fn load(&self, ordering: Ordering) -> DropBehavior { + self.inner.load(ordering).into() + } + + fn store(&self, behavior: DropBehavior, ordering: Ordering) { + self.inner.store(behavior.into(), ordering); + } +} + /// A database connection. pub struct Connection { inner: Arc>>, transaction_behavior: TransactionBehavior, - /// Whether there is a dangling transaction after it was dropped without being finished. - /// We cannot rollback the transaction on Drop because drop is not async. - /// Instead, we roll back the dangling transaction whenever a new transaction is requested + /// If there is a dangling transaction after it was dropped without being finished, + /// [Connection::dangling_tx] will be set to the [DropBehavior] of the dangling transaction, + /// and the corresponding action will be taken when a new transaction is requested /// or the connection queries/executes. - dangling_tx: AtomicBool, + /// We cannot do this eagerly on Drop because drop is not async. + /// + /// By default, the value is [DropBehavior::Ignore] which effectively does nothing. + dangling_tx: AtomicDropBehavior, } impl Clone for Connection { @@ -236,7 +261,7 @@ impl Clone for Connection { Self { inner: Arc::clone(&self.inner), transaction_behavior: self.transaction_behavior, - dangling_tx: AtomicBool::new(self.dangling_tx.load(Ordering::SeqCst)), + dangling_tx: AtomicDropBehavior::new(self.dangling_tx.load(Ordering::SeqCst)), } } } @@ -250,34 +275,43 @@ impl Connection { let connection = Connection { inner: Arc::new(Mutex::new(conn)), transaction_behavior: TransactionBehavior::Deferred, - dangling_tx: AtomicBool::new(false), + dangling_tx: AtomicDropBehavior::new(DropBehavior::Ignore), }; connection } - fn has_dangling_tx(&self) -> bool { - self.dangling_tx.load(Ordering::SeqCst) - } - - async fn maybe_rollback_dangling_tx(&self) -> Result<()> { - if self.has_dangling_tx() { - let mut stmt = self.prepare("ROLLBACK").await?; - stmt.execute(()).await?; - self.dangling_tx.store(false, Ordering::SeqCst); + async fn maybe_handle_dangling_tx(&self) -> Result<()> { + match self.dangling_tx.load(Ordering::SeqCst) { + DropBehavior::Rollback => { + let mut stmt = self.prepare("ROLLBACK").await?; + stmt.execute(()).await?; + self.dangling_tx + .store(DropBehavior::Ignore, Ordering::SeqCst); + } + DropBehavior::Commit => { + let mut stmt = self.prepare("COMMIT").await?; + stmt.execute(()).await?; + self.dangling_tx + .store(DropBehavior::Ignore, Ordering::SeqCst); + } + DropBehavior::Ignore => {} + DropBehavior::Panic => { + panic!("Transaction dropped unexpectedly."); + } } Ok(()) } /// Query the database with SQL. pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result { - self.maybe_rollback_dangling_tx().await?; + self.maybe_handle_dangling_tx().await?; let mut stmt = self.prepare(sql).await?; stmt.query(params).await } /// Execute SQL statement on the database. pub async fn execute(&self, sql: &str, params: impl IntoParams) -> Result { - self.maybe_rollback_dangling_tx().await?; + self.maybe_handle_dangling_tx().await?; let mut stmt = self.prepare(sql).await?; stmt.execute(params).await } @@ -362,7 +396,7 @@ impl Connection { /// Execute a batch of SQL statements on the database. pub async fn execute_batch(&self, sql: &str) -> Result<()> { - self.maybe_rollback_dangling_tx().await?; + self.maybe_handle_dangling_tx().await?; self.prepare_execute_batch(sql).await?; Ok(()) } @@ -384,7 +418,7 @@ impl Connection { } async fn prepare_execute_batch(&self, sql: impl AsRef) -> Result<()> { - self.maybe_rollback_dangling_tx().await?; + self.maybe_handle_dangling_tx().await?; let conn = self .inner .lock() diff --git a/bindings/rust/src/transaction.rs b/bindings/rust/src/transaction.rs index c265a0de8..75d0b69c5 100644 --- a/bindings/rust/src/transaction.rs +++ b/bindings/rust/src/transaction.rs @@ -36,6 +36,29 @@ pub enum DropBehavior { Panic, } +impl From for u8 { + fn from(behavior: DropBehavior) -> Self { + match behavior { + DropBehavior::Rollback => 0, + DropBehavior::Commit => 1, + DropBehavior::Ignore => 2, + DropBehavior::Panic => 3, + } + } +} + +impl From for DropBehavior { + fn from(value: u8) -> Self { + match value { + 0 => DropBehavior::Rollback, + 1 => DropBehavior::Commit, + 2 => DropBehavior::Ignore, + 3 => DropBehavior::Panic, + _ => panic!("Invalid drop behavior: {value}"), + } + } +} + /// Represents a transaction on a database connection. /// /// ## Note @@ -187,9 +210,13 @@ impl Drop for Transaction<'_> { #[inline] fn drop(&mut self) { if self.in_progress { - self.conn.dangling_tx.store(true, Ordering::SeqCst); + self.conn + .dangling_tx + .store(self.drop_behavior(), Ordering::SeqCst); } else { - self.conn.dangling_tx.store(false, Ordering::SeqCst); + self.conn + .dangling_tx + .store(DropBehavior::Ignore, Ordering::SeqCst); } } } @@ -239,7 +266,7 @@ impl Connection { &mut self, behavior: TransactionBehavior, ) -> Result> { - self.maybe_rollback_dangling_tx().await?; + self.maybe_handle_dangling_tx().await?; Transaction::new(self, behavior).await } @@ -390,14 +417,12 @@ mod test { { let tx = conn.transaction().await?; tx.execute("INSERT INTO foo VALUES(?)", &[1]).await?; - tx.finish().await?; // default: rollback } { let mut tx = conn.transaction().await?; tx.execute("INSERT INTO foo VALUES(?)", &[2]).await?; tx.set_drop_behavior(DropBehavior::Commit); - tx.finish().await?; } { let tx = conn.transaction().await?; @@ -408,7 +433,6 @@ mod test { .await?; assert_eq!(2, result.get::(0)?); - tx.finish().await?; } Ok(()) }