diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index 13592ec6c..13c8eb53c 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -33,8 +33,10 @@ //! ``` pub mod params; +pub mod transaction; pub mod value; +use transaction::TransactionBehavior; pub use value::Value; pub use params::params_from_iter; @@ -131,6 +133,7 @@ impl Database { #[allow(clippy::arc_with_non_send_sync)] let connection = Connection { inner: Arc::new(Mutex::new(conn)), + transaction_behavior: TransactionBehavior::Deferred, }; Ok(connection) } @@ -139,12 +142,14 @@ impl Database { /// A database connection. pub struct Connection { inner: Arc>>, + transaction_behavior: TransactionBehavior, } impl Clone for Connection { fn clone(&self) -> Self { Self { inner: Arc::clone(&self.inner), + transaction_behavior: self.transaction_behavior, } } } @@ -216,6 +221,15 @@ impl Connection { conn.cacheflush()?; Ok(()) } + + pub fn is_autocommit(&self) -> Result { + let conn = self + .inner + .lock() + .map_err(|e| Error::MutexError(e.to_string()))?; + + Ok(conn.get_auto_commit()) + } } impl Debug for Connection { diff --git a/bindings/rust/src/transaction.rs b/bindings/rust/src/transaction.rs new file mode 100644 index 000000000..78925d7c7 --- /dev/null +++ b/bindings/rust/src/transaction.rs @@ -0,0 +1,442 @@ +use std::ops::Deref; + +use crate::{Connection, Result}; + +/// Options for transaction behavior. See [BEGIN +/// TRANSACTION](http://www.sqlite.org/lang_transaction.html) for details. +#[derive(Copy, Clone)] +#[non_exhaustive] +pub enum TransactionBehavior { + /// DEFERRED means that the transaction does not actually start until the + /// database is first accessed. + Deferred, + /// IMMEDIATE cause the database connection to start a new write + /// immediately, without waiting for a writes statement. + Immediate, + /// EXCLUSIVE prevents other database connections from reading the database + /// while the transaction is underway. + Exclusive, +} + +/// Options for how a Transaction should behave when it is dropped. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +#[non_exhaustive] +pub enum DropBehavior { + /// Roll back the changes. This is the default. + Rollback, + + /// Commit the changes. + Commit, + + /// Do not commit or roll back changes - this will leave the transaction or + /// savepoint open, so should be used with care. + Ignore, + + /// Panic. Used to enforce intentional behavior during development. + Panic, +} + +/// Represents a transaction on a database connection. +/// +/// ## Note +/// +/// Transactions will roll back by default. Use `commit` method to explicitly +/// commit the transaction, or use `set_drop_behavior` to change what happens +/// when the transaction is dropped. +/// +/// ## Example +/// +/// ```rust,no_run +/// # use turso::{Connection, Result}; +/// # fn do_queries_part_1(_conn: &Connection) -> Result<()> { Ok(()) } +/// # fn do_queries_part_2(_conn: &Connection) -> Result<()> { Ok(()) } +/// async fn perform_queries(conn: &mut Connection) -> Result<()> { +/// let tx = conn.transaction().await?; +/// +/// do_queries_part_1(&tx)?; // tx causes rollback if this fails +/// do_queries_part_2(&tx)?; // tx causes rollback if this fails +/// +/// tx.commit().await +/// } +/// ``` +#[derive(Debug)] +pub struct Transaction<'conn> { + conn: &'conn Connection, + drop_behavior: DropBehavior, + must_finish: bool, +} + +impl Transaction<'_> { + /// Begin a new transaction. Cannot be nested; + /// + /// Even though we don't mutate the connection, we take a `&mut Connection` + /// to prevent nested transactions on the same connection. For cases + /// where this is unacceptable, [`Transaction::new_unchecked`] is available. + #[inline] + pub async fn new( + conn: &mut Connection, + behavior: TransactionBehavior, + ) -> Result> { + Self::new_unchecked(conn, behavior).await + } + + /// Begin a new transaction, failing if a transaction is open. + /// + /// If a transaction is already open, this will return an error. Where + /// possible, [`Transaction::new`] should be preferred, as it provides a + /// compile-time guarantee that transactions are not nested. + #[inline] + pub async fn new_unchecked( + conn: &Connection, + behavior: TransactionBehavior, + ) -> Result> { + let query = match behavior { + TransactionBehavior::Deferred => "BEGIN DEFERRED", + TransactionBehavior::Immediate => "BEGIN IMMEDIATE", + TransactionBehavior::Exclusive => "BEGIN EXCLUSIVE", + }; + // TODO: Use execute_batch instead + conn.execute(query, ()).await.map(move |_| Transaction { + conn, + drop_behavior: DropBehavior::Rollback, + must_finish: true, + }) + } + + /// Get the current setting for what happens to the transaction when it is + /// dropped. + #[inline] + #[must_use] + pub fn drop_behavior(&self) -> DropBehavior { + self.drop_behavior + } + + /// Configure the transaction to perform the specified action when it is + /// dropped. + #[inline] + pub fn set_drop_behavior(&mut self, drop_behavior: DropBehavior) { + self.drop_behavior = drop_behavior; + } + + /// A convenience method which consumes and commits a transaction. + #[inline] + pub async fn commit(mut self) -> Result<()> { + self._commit().await + } + + #[inline] + async fn _commit(&mut self) -> Result<()> { + self.must_finish = false; + self.conn.execute("COMMIT", ()).await?; + Ok(()) + } + + /// A convenience method which consumes and rolls back a transaction. + #[inline] + pub async fn rollback(mut self) -> Result<()> { + self._rollback().await + } + + #[inline] + async fn _rollback(&mut self) -> Result<()> { + self.must_finish = false; + self.conn.execute("ROLLBACK", ()).await?; + Ok(()) + } + + /// Consumes the transaction, committing or rolling back according to the + /// current setting (see `drop_behavior`). + /// + /// Functionally equivalent to the `Drop` implementation, but allows + /// callers to see any errors that occur. + #[inline] + pub async fn finish(mut self) -> Result<()> { + self._finish().await + } + + #[inline] + async fn _finish(&mut self) -> Result<()> { + if self.conn.is_autocommit()? { + return Ok(()); + } + match self.drop_behavior() { + DropBehavior::Commit => { + if (self._commit().await).is_err() { + self._rollback().await + } else { + Ok(()) + } + } + DropBehavior::Rollback => self._rollback().await, + DropBehavior::Ignore => Ok(()), + DropBehavior::Panic => panic!("Transaction dropped unexpectedly."), + } + } +} + +impl Deref for Transaction<'_> { + type Target = Connection; + + #[inline] + fn deref(&self) -> &Connection { + self.conn + } +} + +impl Drop for Transaction<'_> { + #[inline] + fn drop(&mut self) { + if self.must_finish { + panic!("Transaction dropped without finish()") + } + } +} + +impl Connection { + /// Begin a new transaction with the default behavior (DEFERRED). + /// + /// The transaction defaults to rolling back when it is dropped. If you + /// want the transaction to commit, you must call + /// [`commit`](Transaction::commit) or + /// [`set_drop_behavior(DropBehavior::Commit)`](Transaction::set_drop_behavior). + /// + /// ## Example + /// + /// ```rust,no_run + /// # use turso::{Connection, Result}; + /// # fn do_queries_part_1(_conn: &Connection) -> Result<()> { Ok(()) } + /// # fn do_queries_part_2(_conn: &Connection) -> Result<()> { Ok(()) } + /// async fn perform_queries(conn: &mut Connection) -> Result<()> { + /// let tx = conn.transaction().await?; + /// + /// do_queries_part_1(&tx)?; // tx causes rollback if this fails + /// do_queries_part_2(&tx)?; // tx causes rollback if this fails + /// + /// tx.commit().await + /// } + /// ``` + /// + /// # Failure + /// + /// Will return `Err` if the call fails. + #[inline] + pub async fn transaction(&mut self) -> Result> { + Transaction::new(self, self.transaction_behavior).await + } + + /// Begin a new transaction with a specified behavior. + /// + /// See [`transaction`](Connection::transaction). + /// + /// # Failure + /// + /// Will return `Err` if the call fails. + #[inline] + pub async fn transaction_with_behavior( + &mut self, + behavior: TransactionBehavior, + ) -> Result> { + Transaction::new(self, behavior).await + } + + /// Begin a new transaction with the default behavior (DEFERRED). + /// + /// Attempt to open a nested transaction will result in a SQLite error. + /// `Connection::transaction` prevents this at compile time by taking `&mut + /// self`, but `Connection::unchecked_transaction()` may be used to defer + /// the checking until runtime. + /// + /// See [`Connection::transaction`] and [`Transaction::new_unchecked`] + /// (which can be used if the default transaction behavior is undesirable). + /// + /// ## Example + /// + /// ```rust,no_run + /// # use turso::{Connection, Result}; + /// # use std::rc::Rc; + /// # fn do_queries_part_1(_conn: &Connection) -> Result<()> { Ok(()) } + /// # fn do_queries_part_2(_conn: &Connection) -> Result<()> { Ok(()) } + /// async fn perform_queries(conn: Rc) -> Result<()> { + /// let tx = conn.unchecked_transaction().await?; + /// + /// do_queries_part_1(&tx)?; // tx causes rollback if this fails + /// do_queries_part_2(&tx)?; // tx causes rollback if this fails + /// + /// tx.commit().await + /// } + /// ``` + /// + /// # Failure + /// + /// Will return `Err` if the underlying SQLite call fails. The specific + /// error returned if transactions are nested is currently unspecified. + pub async fn unchecked_transaction(&self) -> Result> { + Transaction::new_unchecked(self, self.transaction_behavior).await + } + + /// Set the default transaction behavior for the connection. + /// + /// ## Note + /// + /// This will only apply to transactions initiated by [`transaction`](Connection::transaction) + /// or [`unchecked_transaction`](Connection::unchecked_transaction). + /// + /// ## Example + /// + /// ```rust,no_run + /// # use turso::{Connection, Result}; + /// # use turso::transaction::TransactionBehavior; + /// # fn do_queries_part_1(_conn: &Connection) -> Result<()> { Ok(()) } + /// # fn do_queries_part_2(_conn: &Connection) -> Result<()> { Ok(()) } + /// async fn perform_queries(conn: &mut Connection) -> Result<()> { + /// conn.set_transaction_behavior(TransactionBehavior::Immediate); + /// + /// let tx = conn.transaction().await?; + /// + /// do_queries_part_1(&tx)?; // tx causes rollback if this fails + /// do_queries_part_2(&tx)?; // tx causes rollback if this fails + /// + /// tx.commit().await + /// } + /// ``` + pub fn set_transaction_behavior(&mut self, behavior: TransactionBehavior) { + self.transaction_behavior = behavior; + } +} + +#[cfg(test)] +mod test { + use crate::{Builder, Connection, Error, Result}; + + use super::DropBehavior; + + async fn checked_memory_handle() -> Result { + let db = Builder::new_local(":memory:").build().await?; + let conn = db.connect()?; + conn.execute("CREATE TABLE foo (x INTEGER)", ()).await?; + Ok(conn) + } + + #[tokio::test] + #[should_panic(expected = "Transaction dropped without finish()")] + async fn test_drop_panic() { + let mut conn = checked_memory_handle().await.unwrap(); + { + let tx = conn.transaction().await.unwrap(); + tx.execute("INSERT INTO foo VALUES(?)", &[1]).await.unwrap(); + } + } + + #[tokio::test] + async fn test_drop() -> Result<()> { + let mut conn = checked_memory_handle().await?; + { + let tx = conn.transaction().await?; + tx.execute("INSERT INTO foo VALUES(?)", &[1]).await?; + tx.finish().await?; + // default: rollback + } + { + let mut tx = conn.transaction().await?; + tx.execute("INSERT INTO foo VALUES(?)", &[2]).await?; + tx.set_drop_behavior(DropBehavior::Commit); + tx.finish().await?; + } + { + let tx = conn.transaction().await?; + let mut result = tx.query("SELECT SUM(x) FROM foo", ()).await?; + assert_eq!( + 2, + *result + .next() + .await? + .unwrap() + .get_value(0)? + .as_integer() + .unwrap() + ); + tx.finish().await?; + } + Ok(()) + } + + fn assert_nested_tx_error(e: Error) { + if let Error::SqlExecutionFailure(e) = &e { + assert!(e.contains("transaction")); + } else { + panic!("Unexpected error type: {e:?}"); + } + } + + #[tokio::test] + async fn test_unchecked_nesting() -> Result<()> { + let conn = checked_memory_handle().await?; + + { + let tx = conn.unchecked_transaction().await?; + let e = tx.unchecked_transaction().await.unwrap_err(); + assert_nested_tx_error(e); + tx.finish().await?; + // default: rollback + } + { + let tx = conn.unchecked_transaction().await?; + tx.execute("INSERT INTO foo VALUES(?)", &[1]).await?; + // Ensure this doesn't interfere with ongoing transaction + let e = tx.unchecked_transaction().await.unwrap_err(); + assert_nested_tx_error(e); + + tx.execute("INSERT INTO foo VALUES(?)", &[1]).await?; + tx.commit().await?; + } + + let mut result = conn.query("SELECT SUM(x) FROM foo", ()).await?; + assert_eq!( + 2, + *result + .next() + .await? + .unwrap() + .get_value(0)? + .as_integer() + .unwrap() + ); + Ok(()) + } + + #[tokio::test] + async fn test_explicit_rollback_commit() -> Result<()> { + let mut conn = checked_memory_handle().await?; + { + let tx = conn.transaction().await?; + tx.execute("INSERT INTO foo VALUES(?)", &[1]).await?; + tx.rollback().await?; + + // This is a current Turso's limitation. + // Since we don't have support for savepoints yet, + // a rollback ends with a transaction so we need to immediately open a new one. + let tx = conn.transaction().await?; + tx.execute("INSERT INTO foo VALUES(?)", &[2]).await?; + tx.commit().await?; + } + { + let tx = conn.transaction().await?; + tx.execute("INSERT INTO foo VALUES(?)", &[4]).await?; + tx.commit().await?; + } + { + let mut result = conn.query("SELECT SUM(x) FROM foo", ()).await?; + assert_eq!( + 6, + *result + .next() + .await? + .unwrap() + .get_value(0)? + .as_integer() + .unwrap() + ); + } + Ok(()) + } +} diff --git a/cli/mcp_server.rs b/cli/mcp_server.rs index ca66051e1..5411ee468 100644 --- a/cli/mcp_server.rs +++ b/cli/mcp_server.rs @@ -463,7 +463,7 @@ impl TursoMcpServer { if columns.is_empty() { format!("Table '{table_name}' not found") } else { - format!("Table '{}' columns:\n{}", table_name, columns.join("\n")) + format!("Table '{table_name}' columns:\n{}", columns.join("\n")) } } Ok(None) => format!("Table '{table_name}' not found"),