Propagate DropBehavior to dangling_tx so DropBehavior makes sense

This commit is contained in:
Jussi Saurio
2025-10-16 16:38:42 +03:00
parent 455f0fbc46
commit 3929322061
2 changed files with 84 additions and 26 deletions

View File

@@ -48,7 +48,7 @@ pub use params::IntoParams;
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZero;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU8;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::task::Poll;
@@ -57,6 +57,7 @@ use turso_core::OpenFlags;
// Re-exports rows
pub use crate::rows::{Row, Rows};
use crate::transaction::DropBehavior;
#[derive(Debug, thiserror::Error)]
pub enum Error {
@@ -220,15 +221,39 @@ impl Database {
}
}
/// Atomic wrapper for [DropBehavior]
struct AtomicDropBehavior {
inner: AtomicU8,
}
impl AtomicDropBehavior {
fn new(behavior: DropBehavior) -> Self {
Self {
inner: AtomicU8::new(behavior.into()),
}
}
fn load(&self, ordering: Ordering) -> DropBehavior {
self.inner.load(ordering).into()
}
fn store(&self, behavior: DropBehavior, ordering: Ordering) {
self.inner.store(behavior.into(), ordering);
}
}
/// A database connection.
pub struct Connection {
inner: Arc<Mutex<Arc<turso_core::Connection>>>,
transaction_behavior: TransactionBehavior,
/// Whether there is a dangling transaction after it was dropped without being finished.
/// We cannot rollback the transaction on Drop because drop is not async.
/// Instead, we roll back the dangling transaction whenever a new transaction is requested
/// If there is a dangling transaction after it was dropped without being finished,
/// [Connection::dangling_tx] will be set to the [DropBehavior] of the dangling transaction,
/// and the corresponding action will be taken when a new transaction is requested
/// or the connection queries/executes.
dangling_tx: AtomicBool,
/// We cannot do this eagerly on Drop because drop is not async.
///
/// By default, the value is [DropBehavior::Ignore] which effectively does nothing.
dangling_tx: AtomicDropBehavior,
}
impl Clone for Connection {
@@ -236,7 +261,7 @@ impl Clone for Connection {
Self {
inner: Arc::clone(&self.inner),
transaction_behavior: self.transaction_behavior,
dangling_tx: AtomicBool::new(self.dangling_tx.load(Ordering::SeqCst)),
dangling_tx: AtomicDropBehavior::new(self.dangling_tx.load(Ordering::SeqCst)),
}
}
}
@@ -250,34 +275,43 @@ impl Connection {
let connection = Connection {
inner: Arc::new(Mutex::new(conn)),
transaction_behavior: TransactionBehavior::Deferred,
dangling_tx: AtomicBool::new(false),
dangling_tx: AtomicDropBehavior::new(DropBehavior::Ignore),
};
connection
}
fn has_dangling_tx(&self) -> bool {
self.dangling_tx.load(Ordering::SeqCst)
}
async fn maybe_rollback_dangling_tx(&self) -> Result<()> {
if self.has_dangling_tx() {
let mut stmt = self.prepare("ROLLBACK").await?;
stmt.execute(()).await?;
self.dangling_tx.store(false, Ordering::SeqCst);
async fn maybe_handle_dangling_tx(&self) -> Result<()> {
match self.dangling_tx.load(Ordering::SeqCst) {
DropBehavior::Rollback => {
let mut stmt = self.prepare("ROLLBACK").await?;
stmt.execute(()).await?;
self.dangling_tx
.store(DropBehavior::Ignore, Ordering::SeqCst);
}
DropBehavior::Commit => {
let mut stmt = self.prepare("COMMIT").await?;
stmt.execute(()).await?;
self.dangling_tx
.store(DropBehavior::Ignore, Ordering::SeqCst);
}
DropBehavior::Ignore => {}
DropBehavior::Panic => {
panic!("Transaction dropped unexpectedly.");
}
}
Ok(())
}
/// Query the database with SQL.
pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result<Rows> {
self.maybe_rollback_dangling_tx().await?;
self.maybe_handle_dangling_tx().await?;
let mut stmt = self.prepare(sql).await?;
stmt.query(params).await
}
/// Execute SQL statement on the database.
pub async fn execute(&self, sql: &str, params: impl IntoParams) -> Result<u64> {
self.maybe_rollback_dangling_tx().await?;
self.maybe_handle_dangling_tx().await?;
let mut stmt = self.prepare(sql).await?;
stmt.execute(params).await
}
@@ -362,7 +396,7 @@ impl Connection {
/// Execute a batch of SQL statements on the database.
pub async fn execute_batch(&self, sql: &str) -> Result<()> {
self.maybe_rollback_dangling_tx().await?;
self.maybe_handle_dangling_tx().await?;
self.prepare_execute_batch(sql).await?;
Ok(())
}
@@ -384,7 +418,7 @@ impl Connection {
}
async fn prepare_execute_batch(&self, sql: impl AsRef<str>) -> Result<()> {
self.maybe_rollback_dangling_tx().await?;
self.maybe_handle_dangling_tx().await?;
let conn = self
.inner
.lock()

View File

@@ -36,6 +36,29 @@ pub enum DropBehavior {
Panic,
}
impl From<DropBehavior> for u8 {
fn from(behavior: DropBehavior) -> Self {
match behavior {
DropBehavior::Rollback => 0,
DropBehavior::Commit => 1,
DropBehavior::Ignore => 2,
DropBehavior::Panic => 3,
}
}
}
impl From<u8> for DropBehavior {
fn from(value: u8) -> Self {
match value {
0 => DropBehavior::Rollback,
1 => DropBehavior::Commit,
2 => DropBehavior::Ignore,
3 => DropBehavior::Panic,
_ => panic!("Invalid drop behavior: {value}"),
}
}
}
/// Represents a transaction on a database connection.
///
/// ## Note
@@ -187,9 +210,13 @@ impl Drop for Transaction<'_> {
#[inline]
fn drop(&mut self) {
if self.in_progress {
self.conn.dangling_tx.store(true, Ordering::SeqCst);
self.conn
.dangling_tx
.store(self.drop_behavior(), Ordering::SeqCst);
} else {
self.conn.dangling_tx.store(false, Ordering::SeqCst);
self.conn
.dangling_tx
.store(DropBehavior::Ignore, Ordering::SeqCst);
}
}
}
@@ -239,7 +266,7 @@ impl Connection {
&mut self,
behavior: TransactionBehavior,
) -> Result<Transaction<'_>> {
self.maybe_rollback_dangling_tx().await?;
self.maybe_handle_dangling_tx().await?;
Transaction::new(self, behavior).await
}
@@ -390,14 +417,12 @@ mod test {
{
let tx = conn.transaction().await?;
tx.execute("INSERT INTO foo VALUES(?)", &[1]).await?;
tx.finish().await?;
// default: rollback
}
{
let mut tx = conn.transaction().await?;
tx.execute("INSERT INTO foo VALUES(?)", &[2]).await?;
tx.set_drop_behavior(DropBehavior::Commit);
tx.finish().await?;
}
{
let tx = conn.transaction().await?;
@@ -408,7 +433,6 @@ mod test {
.await?;
assert_eq!(2, result.get::<i32>(0)?);
tx.finish().await?;
}
Ok(())
}