bindings/rust: rollback dangling tx on next access of DB, instead of panicing

Closes #3748

Right now if any error happens during an interactive tx that causes the
`Transaction` to drop, the program will panic.

I don't know how good this solution is, but we can at least prevent a panic
by storing whether the connection has a dangling transaction and roll it back
automatically the next time the connection tries to do something.
This commit is contained in:
Jussi Saurio
2025-10-16 15:42:51 +03:00
parent e9c0fdcb4b
commit d77dd8400d
2 changed files with 94 additions and 10 deletions

View File

@@ -48,6 +48,8 @@ pub use params::IntoParams;
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZero;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::task::Poll;
pub use turso_core::EncryptionOpts;
@@ -222,6 +224,11 @@ impl Database {
pub struct Connection {
inner: Arc<Mutex<Arc<turso_core::Connection>>>,
transaction_behavior: TransactionBehavior,
/// Whether there is a dangling transaction after it was dropped without being finished.
/// We cannot rollback the transaction on Drop because drop is not async.
/// Instead, we roll back the dangling transaction whenever a new transaction is requested
/// or the connection queries/executes.
dangling_tx: AtomicBool,
}
impl Clone for Connection {
@@ -229,6 +236,7 @@ impl Clone for Connection {
Self {
inner: Arc::clone(&self.inner),
transaction_behavior: self.transaction_behavior,
dangling_tx: AtomicBool::new(self.dangling_tx.load(Ordering::SeqCst)),
}
}
}
@@ -242,17 +250,34 @@ impl Connection {
let connection = Connection {
inner: Arc::new(Mutex::new(conn)),
transaction_behavior: TransactionBehavior::Deferred,
dangling_tx: AtomicBool::new(false),
};
connection
}
fn has_dangling_tx(&self) -> bool {
self.dangling_tx.load(Ordering::SeqCst)
}
async fn maybe_rollback_dangling_tx(&self) -> Result<()> {
if self.has_dangling_tx() {
let mut stmt = self.prepare("ROLLBACK").await?;
stmt.execute(()).await?;
self.dangling_tx.store(false, Ordering::SeqCst);
}
Ok(())
}
/// Query the database with SQL.
pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result<Rows> {
self.maybe_rollback_dangling_tx().await?;
let mut stmt = self.prepare(sql).await?;
stmt.query(params).await
}
/// Execute SQL statement on the database.
pub async fn execute(&self, sql: &str, params: impl IntoParams) -> Result<u64> {
self.maybe_rollback_dangling_tx().await?;
let mut stmt = self.prepare(sql).await?;
stmt.execute(params).await
}
@@ -337,6 +362,7 @@ impl Connection {
/// Execute a batch of SQL statements on the database.
pub async fn execute_batch(&self, sql: &str) -> Result<()> {
self.maybe_rollback_dangling_tx().await?;
self.prepare_execute_batch(sql).await?;
Ok(())
}
@@ -358,6 +384,7 @@ impl Connection {
}
async fn prepare_execute_batch(&self, sql: impl AsRef<str>) -> Result<()> {
self.maybe_rollback_dangling_tx().await?;
let conn = self
.inner
.lock()

View File

@@ -1,4 +1,4 @@
use std::ops::Deref;
use std::{ops::Deref, sync::atomic::Ordering};
use crate::{Connection, Result};
@@ -63,7 +63,7 @@ pub enum DropBehavior {
pub struct Transaction<'conn> {
conn: &'conn Connection,
drop_behavior: DropBehavior,
must_finish: bool,
in_progress: bool,
}
impl Transaction<'_> {
@@ -99,7 +99,7 @@ impl Transaction<'_> {
conn.execute(query, ()).await.map(move |_| Transaction {
conn,
drop_behavior: DropBehavior::Rollback,
must_finish: true,
in_progress: true,
})
}
@@ -126,7 +126,7 @@ impl Transaction<'_> {
#[inline]
async fn _commit(&mut self) -> Result<()> {
self.must_finish = false;
self.in_progress = false;
self.conn.execute("COMMIT", ()).await?;
Ok(())
}
@@ -139,7 +139,7 @@ impl Transaction<'_> {
#[inline]
async fn _rollback(&mut self) -> Result<()> {
self.must_finish = false;
self.in_progress = false;
self.conn.execute("ROLLBACK", ()).await?;
Ok(())
}
@@ -186,8 +186,10 @@ impl Deref for Transaction<'_> {
impl Drop for Transaction<'_> {
#[inline]
fn drop(&mut self) {
if self.must_finish {
panic!("Transaction dropped without finish()")
if self.in_progress {
self.conn.dangling_tx.store(true, Ordering::SeqCst);
} else {
self.conn.dangling_tx.store(false, Ordering::SeqCst);
}
}
}
@@ -221,7 +223,8 @@ impl Connection {
/// Will return `Err` if the call fails.
#[inline]
pub async fn transaction(&mut self) -> Result<Transaction<'_>> {
Transaction::new(self, self.transaction_behavior).await
self.transaction_with_behavior(self.transaction_behavior)
.await
}
/// Begin a new transaction with a specified behavior.
@@ -236,6 +239,7 @@ impl Connection {
&mut self,
behavior: TransactionBehavior,
) -> Result<Transaction<'_>> {
self.maybe_rollback_dangling_tx().await?;
Transaction::new(self, behavior).await
}
@@ -318,13 +322,66 @@ mod test {
}
#[tokio::test]
#[should_panic(expected = "Transaction dropped without finish()")]
async fn test_drop_panic() {
async fn test_drop_rollback_on_new_transaction() {
let mut conn = checked_memory_handle().await.unwrap();
{
let tx = conn.transaction().await.unwrap();
tx.execute("INSERT INTO foo VALUES(?)", &[1]).await.unwrap();
// Drop without finish - should be rolled back when next transaction starts
}
// Start a new transaction - this should rollback the dangling one
let tx = conn.transaction().await.unwrap();
tx.execute("INSERT INTO foo VALUES(?)", &[2]).await.unwrap();
let result = tx
.prepare("SELECT SUM(x) FROM foo")
.await
.unwrap()
.query_row(())
.await
.unwrap();
// The insert from the dropped transaction should have been rolled back
assert_eq!(2, result.get::<i32>(0).unwrap());
tx.finish().await.unwrap();
}
#[tokio::test]
async fn test_drop_rollback_on_query() {
let mut conn = checked_memory_handle().await.unwrap();
{
let tx = conn.transaction().await.unwrap();
tx.execute("INSERT INTO foo VALUES(?)", &[1]).await.unwrap();
// Drop without finish - should be rolled back when conn.query is called
}
// Using conn.query should rollback the dangling transaction
let mut rows = conn.query("SELECT count(*) FROM foo", ()).await.unwrap();
let result = rows.next().await.unwrap().unwrap();
// The insert from the dropped transaction should have been rolled back
assert_eq!(0, result.get::<i32>(0).unwrap());
}
#[tokio::test]
async fn test_drop_rollback_on_execute() {
let mut conn = checked_memory_handle().await.unwrap();
{
let tx = conn.transaction().await.unwrap();
tx.execute("INSERT INTO foo VALUES(?)", &[1]).await.unwrap();
// Drop without finish - should be rolled back when conn.execute is called
}
// Using conn.execute should rollback the dangling transaction
conn.execute("INSERT INTO foo VALUES(?)", &[2])
.await
.unwrap();
let mut rows = conn.query("SELECT count(*) FROM foo", ()).await.unwrap();
let result = rows.next().await.unwrap().unwrap();
// The insert from the dropped transaction should have been rolled back
assert_eq!(1, result.get::<i32>(0).unwrap());
}
#[tokio::test]