diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 6727784e5..003b95900 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1,5 +1,7 @@ use crate::mvcc::clock::LogicalClock; use crate::mvcc::persistent_storage::Storage; +use crate::result::LimboResult; +use crate::return_if_io; use crate::state_machine::StateMachine; use crate::state_machine::StateTransition; use crate::state_machine::TransitionResult; @@ -448,12 +450,14 @@ impl StateTransition for CommitStateMachine { // 1. Wait for currently writer to finish before second txn starts. // 2. Choose a txn to write depending on some heuristics like amount of frames will be written. // 3. .. - // - let result = self.pager.io.block(|| self.pager.begin_write_tx())?; - if let crate::result::LimboResult::Busy = result { - return Err(LimboError::InternalError( - "Pager write transaction busy".to_string(), - )); + + // If this is the exclusive transaction, we already acquired a write transaction + // on the pager in begin_exclusive_tx() and don't need to acquire it. + if !mvcc_store.is_exclusive_tx(&self.tx_id) { + let result = self.pager.io.block(|| self.pager.begin_write_tx())?; + if let LimboResult::Busy = result { + return Err(LimboError::Busy); + } } self.state = CommitState::WriteRow { end_ts, @@ -613,6 +617,11 @@ impl StateTransition for CommitStateMachine { // FIXME: it actually just become a problem for today!!! // TODO: test that reproduces this failure, and then a fix mvcc_store.txs.remove(&self.tx_id); + + if mvcc_store.is_exclusive_tx(&self.tx_id) { + mvcc_store.release_exclusive_tx(&self.tx_id); + } + if !log_record.row_versions.is_empty() { mvcc_store.storage.log_tx(log_record)?; } @@ -816,6 +825,13 @@ pub struct MvStore { clock: Clock, storage: Storage, loaded_tables: RwLock>, + + /// The transaction ID of a transaction that has acquired an exclusive write lock, if any. + /// + /// An exclusive MVCC transaction is one that has a write lock on the pager, which means + /// every other MVCC transaction must wait for it to commit before they can commit. We have + /// exclusive transactions to support single-writer semantics for compatibility with SQLite. + exclusive_tx: RwLock>, } impl MvStore { @@ -829,6 +845,7 @@ impl MvStore { clock, storage, loaded_tables: RwLock::new(HashSet::new()), + exclusive_tx: RwLock::new(None), } } @@ -1102,6 +1119,47 @@ impl MvStore { } } + /// Begins an exclusive write transaction that prevents concurrent writes. + /// + /// This is used for IMMEDIATE and EXCLUSIVE transaction types where we need + /// to ensure exclusive write access as per SQLite semantics. + pub fn begin_exclusive_tx(&self, pager: Rc) -> Result> { + let tx_id = self.get_tx_id(); + let begin_ts = self.get_timestamp(); + + self.acquire_exclusive_tx(&tx_id)?; + + // Try to acquire the pager read lock + match pager.begin_read_tx()? { + LimboResult::Busy => { + self.release_exclusive_tx(&tx_id); + return Err(LimboError::Busy); + } + LimboResult::Ok => {} + } + // Try to acquire the pager write lock + match return_if_io!(pager.begin_write_tx()) { + LimboResult::Busy => { + tracing::debug!("begin_exclusive_tx: tx_id={} failed with Busy", tx_id); + // Failed to get pager lock - release our exclusive lock + self.release_exclusive_tx(&tx_id); + pager.end_read_tx()?; + return Err(LimboError::Busy); + } + LimboResult::Ok => { + let tx = Transaction::new(tx_id, begin_ts); + tracing::trace!( + "begin_exclusive_tx(tx_id={}) - exclusive write transaction", + tx_id + ); + tracing::debug!("begin_exclusive_tx: tx_id={} succeeded", tx_id); + self.txs.insert(tx_id, RwLock::new(tx)); + } + } + + Ok(IOResult::Done(tx_id)) + } + /// Begins a new transaction in the database. /// /// This function starts a new transaction in the database and returns a `TxID` value @@ -1161,6 +1219,10 @@ impl MvStore { let write_set: Vec = tx.write_set.iter().map(|v| *v.value()).collect(); drop(tx); + if self.is_exclusive_tx(&tx_id) { + self.release_exclusive_tx(&tx_id); + } + for ref id in write_set { if let Some(row_versions) = self.rows.get(id) { let mut row_versions = row_versions.value().write(); @@ -1180,6 +1242,30 @@ impl MvStore { self.txs.remove(&tx_id); } + /// Returns true if the given transaction is the exclusive transaction. + fn is_exclusive_tx(&self, tx_id: &TxID) -> bool { + self.exclusive_tx.read().as_ref() == Some(tx_id) + } + + /// Acquires the exclusive transaction lock to the given transaction ID. + fn acquire_exclusive_tx(&self, tx_id: &TxID) -> Result<()> { + let mut exclusive_tx = self.exclusive_tx.write(); + if exclusive_tx.is_some() { + // Another transaction already holds the exclusive lock + return Err(LimboError::Busy); + } + // Reserve the exclusive lock with our tx_id + *exclusive_tx = Some(*tx_id); + Ok(()) + } + + /// Release the exclusive transaction lock if held by the this transaction. + fn release_exclusive_tx(&self, tx_id: &TxID) { + let mut exclusive_tx = self.exclusive_tx.write(); + assert_eq!(exclusive_tx.as_ref(), Some(tx_id)); + *exclusive_tx = None; + } + /// Generates next unique transaction id pub fn get_tx_id(&self) -> u64 { self.tx_ids.fetch_add(1, Ordering::SeqCst) diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index bf0c008fe..cad89e0ef 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -191,6 +191,7 @@ pub enum TransactionMode { None, Read, Write, + Concurrent, } /// Main entry point for emitting bytecode for a SQL query diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 1189ec589..8ed31475e 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -77,6 +77,9 @@ pub fn translate_pragma( TransactionMode::Write => { program.begin_write_operation(); } + TransactionMode::Concurrent => { + program.begin_concurrent_operation(); + } } Ok(program) diff --git a/core/translate/transaction.rs b/core/translate/transaction.rs index 3f1efb606..86427fe19 100644 --- a/core/translate/transaction.rs +++ b/core/translate/transaction.rs @@ -1,5 +1,5 @@ use crate::schema::Schema; -use crate::translate::{ProgramBuilder, ProgramBuilderOpts}; +use crate::translate::{emitter::TransactionMode, ProgramBuilder, ProgramBuilderOpts}; use crate::vdbe::insn::Insn; use crate::Result; use turso_parser::ast::{Name, TransactionType}; @@ -26,7 +26,19 @@ pub fn translate_tx_begin( TransactionType::Immediate | TransactionType::Exclusive => { program.emit_insn(Insn::Transaction { db: 0, - write: true, + tx_mode: TransactionMode::Write, + schema_cookie: schema.schema_version, + }); + // TODO: Emit transaction instruction on temporary tables when we support them. + program.emit_insn(Insn::AutoCommit { + auto_commit: false, + rollback: false, + }); + } + TransactionType::Concurrent => { + program.emit_insn(Insn::Transaction { + db: 0, + tx_mode: TransactionMode::Concurrent, schema_cookie: schema.schema_version, }); // TODO: Emit transaction instruction on temporary tables when we support them. diff --git a/core/vdbe/builder.rs b/core/vdbe/builder.rs index ffa26c03d..79f0cc58e 100644 --- a/core/vdbe/builder.rs +++ b/core/vdbe/builder.rs @@ -774,6 +774,10 @@ impl ProgramBuilder { } } + pub fn begin_concurrent_operation(&mut self) { + self.txn_mode = TransactionMode::Concurrent; + } + /// Indicates the rollback behvaiour for the halt instruction in epilogue pub fn rollback(&mut self) { self.rollback = true; @@ -791,7 +795,7 @@ impl ProgramBuilder { if !matches!(self.txn_mode, TransactionMode::None) { self.emit_insn(Insn::Transaction { db: 0, - write: matches!(self.txn_mode, TransactionMode::Write), + tx_mode: self.txn_mode, schema_cookie: schema.schema_version, }); } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 83e164adc..429cd4ac7 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -30,6 +30,7 @@ use crate::{ }, printf::exec_printf, }, + translate::emitter::TransactionMode, }; use crate::{get_cursor, MvCursor}; use std::env::temp_dir; @@ -2094,13 +2095,14 @@ pub fn op_transaction( load_insn!( Transaction { db, - write, + tx_mode, schema_cookie, }, insn ); let conn = program.connection.clone(); - if *write && conn._db.open_flags.contains(OpenFlags::ReadOnly) { + let write = matches!(tx_mode, TransactionMode::Write); + if write && conn._db.open_flags.contains(OpenFlags::ReadOnly) { return Err(LimboError::ReadOnly); } @@ -2116,7 +2118,7 @@ pub fn op_transaction( // instead of ending the read tx, just update the state to pending. (TransactionState::PendingUpgrade, write) => { turso_assert!( - *write, + write, "pending upgrade should only be set for write transactions" ); ( @@ -2164,11 +2166,59 @@ pub fn op_transaction( // if header_schema_cookie != *schema_cookie { // return Err(LimboError::SchemaUpdated); // } - let tx_id = mv_store.begin_tx(pager.clone()); + let tx_id = match tx_mode { + TransactionMode::None | TransactionMode::Read | TransactionMode::Concurrent => { + mv_store.begin_tx(pager.clone()) + } + TransactionMode::Write => { + return_if_io!(mv_store.begin_exclusive_tx(pager.clone())) + } + }; conn.mv_transactions.borrow_mut().push(tx_id); program.connection.mv_tx_id.set(Some(tx_id)); + } else if updated + && matches!(new_transaction_state, TransactionState::Write { .. }) + && matches!(tx_mode, TransactionMode::Write) + { + // Handle upgrade from read to write transaction for MVCC + // Similar to non-MVCC path, we need to try upgrading to exclusive write transaction + turso_assert!( + !conn.is_nested_stmt.get(), + "nested stmt should not begin a new write transaction" + ); + match mv_store.begin_exclusive_tx(pager.clone()) { + Ok(IOResult::Done(tx_id)) => { + // Successfully upgraded to exclusive write transaction + // Remove the old read transaction and replace with write transaction + conn.mv_transactions.borrow_mut().push(tx_id); + program.connection.mv_tx_id.set(Some(tx_id)); + } + Err(LimboError::Busy) => { + // We failed to upgrade to write transaction so put the transaction into its original state. + // For MVCC, we don't need to end the transaction like in non-MVCC case, since MVCC transactions + // can be restarted automatically if they haven't performed any reads or writes yet. + // Just ensure the transaction state remains in its original state. + assert_eq!(conn.transaction_state.get(), current_state); + return Ok(InsnFunctionStepResult::Busy); + } + Ok(IOResult::IO(io)) => { + // set the transaction state to pending so we don't have to + // end the transaction. + program + .connection + .transaction_state + .replace(TransactionState::PendingUpgrade); + return Ok(InsnFunctionStepResult::IO(io)); + } + Err(e) => return Err(e), + } } } else { + if matches!(tx_mode, TransactionMode::Concurrent) { + return Err(LimboError::TxError( + "Concurrent transaction mode is only supported when MVCC is enabled".to_string(), + )); + } if updated && matches!(current_state, TransactionState::None) { turso_assert!( !conn.is_nested_stmt.get(), @@ -2275,19 +2325,25 @@ pub fn op_auto_commit( } else { conn.auto_commit.replace(*auto_commit); } - } else if !*auto_commit { - return Err(LimboError::TxError( - "cannot start a transaction within a transaction".to_string(), - )); - } else if *rollback { - return Err(LimboError::TxError( - "cannot rollback - no transaction is active".to_string(), - )); } else { - return Err(LimboError::TxError( - "cannot commit - no transaction is active".to_string(), - )); + let mvcc_tx_active = program.connection.mv_tx_id.get().is_some(); + if !mvcc_tx_active { + if !*auto_commit { + return Err(LimboError::TxError( + "cannot start a transaction within a transaction".to_string(), + )); + } else if *rollback { + return Err(LimboError::TxError( + "cannot rollback - no transaction is active".to_string(), + )); + } else { + return Err(LimboError::TxError( + "cannot commit - no transaction is active".to_string(), + )); + } + } } + program .commit_txn(pager.clone(), state, mv_store, *rollback) .map(Into::into) diff --git a/core/vdbe/explain.rs b/core/vdbe/explain.rs index 6c850aadd..b7d67d8d6 100644 --- a/core/vdbe/explain.rs +++ b/core/vdbe/explain.rs @@ -653,14 +653,14 @@ pub fn insn_to_str( 0, "".to_string(), ), - Insn::Transaction { db, write , schema_cookie} => ( + Insn::Transaction { db, tx_mode, schema_cookie} => ( "Transaction", *db as i32, - *write as i32, + *tx_mode as i32, *schema_cookie as i32, Value::build_text(""), 0, - format!("iDb={db} write={write}"), + format!("iDb={db} tx_mode={tx_mode:?}"), ), Insn::Goto { target_pc } => ( "Goto", diff --git a/core/vdbe/insn.rs b/core/vdbe/insn.rs index e2d4a07c6..ac0564c0c 100644 --- a/core/vdbe/insn.rs +++ b/core/vdbe/insn.rs @@ -7,7 +7,7 @@ use super::{execute, AggFunc, BranchOffset, CursorID, FuncCtx, InsnFunction, Pag use crate::{ schema::{Affinity, BTreeTable, Column, Index}, storage::{pager::CreateBTreeFlags, wal::CheckpointMode}, - translate::collate::CollationSeq, + translate::{collate::CollationSeq, emitter::TransactionMode}, Value, }; use turso_macros::Description; @@ -465,9 +465,9 @@ pub enum Insn { /// Start a transaction. Transaction { - db: usize, // p1 - write: bool, // p2 - schema_cookie: u32, // p3 + db: usize, // p1 + tx_mode: TransactionMode, // p2 + schema_cookie: u32, // p3 }, /// Set database auto-commit mode and potentially rollback. diff --git a/parser/src/ast.rs b/parser/src/ast.rs index 0d12d2d0b..c1ce16931 100644 --- a/parser/src/ast.rs +++ b/parser/src/ast.rs @@ -1530,6 +1530,8 @@ pub enum TransactionType { Immediate, /// `EXCLUSIVE` Exclusive, + /// `CONCURRENT`, + Concurrent, } /// Upsert clause diff --git a/parser/src/ast/fmt.rs b/parser/src/ast/fmt.rs index 4b774837e..80defd6d3 100644 --- a/parser/src/ast/fmt.rs +++ b/parser/src/ast/fmt.rs @@ -2204,6 +2204,7 @@ impl ToTokens for TransactionType { Self::Deferred => TK_DEFERRED, Self::Immediate => TK_IMMEDIATE, Self::Exclusive => TK_EXCLUSIVE, + Self::Concurrent => TK_CONCURRENT, }, None, ) diff --git a/parser/src/lexer.rs b/parser/src/lexer.rs index 5e9e809b8..aab03f36a 100644 --- a/parser/src/lexer.rs +++ b/parser/src/lexer.rs @@ -27,6 +27,7 @@ fn keyword_or_id_token(input: &[u8]) -> TokenType { b"COLLATE" => TokenType::TK_COLLATE, b"COLUMN" => TokenType::TK_COLUMNKW, b"COMMIT" => TokenType::TK_COMMIT, + b"CONCURRENT" => TokenType::TK_CONCURRENT, b"CONFLICT" => TokenType::TK_CONFLICT, b"CONSTRAINT" => TokenType::TK_CONSTRAINT, b"CREATE" => TokenType::TK_CREATE, @@ -1289,6 +1290,7 @@ mod tests { ("COLLATE", TokenType::TK_COLLATE), ("COLUMN", TokenType::TK_COLUMNKW), ("COMMIT", TokenType::TK_COMMIT), + ("CONCURRENT", TokenType::TK_CONCURRENT), ("CONFLICT", TokenType::TK_CONFLICT), ("CONSTRAINT", TokenType::TK_CONSTRAINT), ("CREATE", TokenType::TK_CREATE), diff --git a/parser/src/parser.rs b/parser/src/parser.rs index cfb10f28e..fa2230373 100644 --- a/parser/src/parser.rs +++ b/parser/src/parser.rs @@ -652,6 +652,10 @@ impl<'a> Parser<'a> { eat_assert!(self, TK_EXCLUSIVE); Some(TransactionType::Exclusive) } + TK_CONCURRENT => { + eat_assert!(self, TK_CONCURRENT); + Some(TransactionType::Concurrent) + } _ => None, }, }; @@ -4129,6 +4133,27 @@ mod tests { name: Some(Name::Quoted("'my_transaction'".to_string())), })], ), + ( + b"BEGIN CONCURRENT TRANSACTION".as_slice(), + vec![Cmd::Stmt(Stmt::Begin { + typ: Some(TransactionType::Concurrent), + name: None, + })], + ), + ( + b"BEGIN CONCURRENT TRANSACTION my_transaction".as_slice(), + vec![Cmd::Stmt(Stmt::Begin { + typ: Some(TransactionType::Concurrent), + name: Some(Name::Ident("my_transaction".to_string())), + })], + ), + ( + b"BEGIN CONCURRENT TRANSACTION 'my_transaction'".as_slice(), + vec![Cmd::Stmt(Stmt::Begin { + typ: Some(TransactionType::Concurrent), + name: Some(Name::Quoted("'my_transaction'".to_string())), + })], + ), ( ";;;BEGIN;BEGIN;;;;;;BEGIN".as_bytes(), vec![ diff --git a/parser/src/token.rs b/parser/src/token.rs index 34a7344e2..ed8f416c5 100644 --- a/parser/src/token.rs +++ b/parser/src/token.rs @@ -178,6 +178,7 @@ pub enum TokenType { TK_OVER = 166, TK_FILTER = 167, TK_ILLEGAL = 185, + TK_CONCURRENT = 186, } impl TokenType { @@ -229,6 +230,7 @@ impl TokenType { TokenType::TK_EXCEPT => Some("EXCEPT"), TokenType::TK_EXCLUDE => Some("EXCLUDE"), TokenType::TK_EXCLUSIVE => Some("EXCLUSIVE"), + TokenType::TK_CONCURRENT => Some("CONCURRENT"), TokenType::TK_EXISTS => Some("EXISTS"), TokenType::TK_EXPLAIN => Some("EXPLAIN"), TokenType::TK_FAIL => Some("FAIL"), @@ -359,6 +361,7 @@ impl Display for TokenType { TK_DEFERRED => "TK_DEFERRED", TK_IMMEDIATE => "TK_IMMEDIATE", TK_EXCLUSIVE => "TK_EXCLUSIVE", + TK_CONCURRENT => "TK_CONCURRENT", TK_COMMIT => "TK_COMMIT", TK_END => "TK_END", TK_ROLLBACK => "TK_ROLLBACK", @@ -531,15 +534,15 @@ impl TokenType { match self { TK_ABORT | TK_ACTION | TK_AFTER | TK_ANALYZE | TK_ASC | TK_ATTACH | TK_BEFORE | TK_BEGIN | TK_BY | TK_CASCADE | TK_CAST | TK_CONFLICT | TK_DATABASE | TK_DEFERRED - | TK_DESC | TK_DETACH | TK_DO | TK_EACH | TK_END | TK_EXCLUSIVE | TK_EXPLAIN - | TK_FAIL | TK_FOR | TK_IGNORE | TK_IMMEDIATE | TK_INITIALLY | TK_INSTEAD - | TK_LIKE_KW | TK_MATCH | TK_NO | TK_PLAN | TK_QUERY | TK_KEY | TK_OF | TK_OFFSET - | TK_PRAGMA | TK_RAISE | TK_RECURSIVE | TK_RELEASE | TK_REPLACE | TK_RESTRICT - | TK_ROW | TK_ROWS | TK_ROLLBACK | TK_SAVEPOINT | TK_TEMP | TK_TRIGGER | TK_VACUUM - | TK_VIEW | TK_VIRTUAL | TK_WITH | TK_NULLS | TK_FIRST | TK_LAST | TK_CURRENT - | TK_FOLLOWING | TK_PARTITION | TK_PRECEDING | TK_RANGE | TK_UNBOUNDED | TK_EXCLUDE - | TK_GROUPS | TK_OTHERS | TK_TIES | TK_ALWAYS | TK_MATERIALIZED | TK_REINDEX - | TK_RENAME | TK_CTIME_KW | TK_IF => TK_ID, + | TK_DESC | TK_DETACH | TK_DO | TK_EACH | TK_END | TK_EXCLUSIVE | TK_CONCURRENT + | TK_EXPLAIN | TK_FAIL | TK_FOR | TK_IGNORE | TK_IMMEDIATE | TK_INITIALLY + | TK_INSTEAD | TK_LIKE_KW | TK_MATCH | TK_NO | TK_PLAN | TK_QUERY | TK_KEY | TK_OF + | TK_OFFSET | TK_PRAGMA | TK_RAISE | TK_RECURSIVE | TK_RELEASE | TK_REPLACE + | TK_RESTRICT | TK_ROW | TK_ROWS | TK_ROLLBACK | TK_SAVEPOINT | TK_TEMP + | TK_TRIGGER | TK_VACUUM | TK_VIEW | TK_VIRTUAL | TK_WITH | TK_NULLS | TK_FIRST + | TK_LAST | TK_CURRENT | TK_FOLLOWING | TK_PARTITION | TK_PRECEDING | TK_RANGE + | TK_UNBOUNDED | TK_EXCLUDE | TK_GROUPS | TK_OTHERS | TK_TIES | TK_ALWAYS + | TK_MATERIALIZED | TK_REINDEX | TK_RENAME | TK_CTIME_KW | TK_IF => TK_ID, // | TK_COLUMNKW | TK_UNION | TK_EXCEPT | TK_INTERSECT | TK_GENERATED | TK_WITHOUT // see comments in `next_token` of parser _ => self, diff --git a/tests/integration/common.rs b/tests/integration/common.rs index 8a571b8ce..858932a09 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -35,6 +35,24 @@ impl TempDatabase { Self { path, io, db } } + pub fn new_with_opts(db_name: &str, opts: turso_core::DatabaseOpts) -> Self { + let mut path = TempDir::new().unwrap().keep(); + path.push(db_name); + let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); + let db = Database::open_file_with_flags( + io.clone(), + path.to_str().unwrap(), + turso_core::OpenFlags::default(), + opts, + ) + .unwrap(); + Self { + path: path.to_path_buf(), + io, + db, + } + } + pub fn new_with_existent(db_path: &Path, enable_indexes: bool) -> Self { Self::new_with_existent_with_flags( db_path, diff --git a/tests/integration/query_processing/test_transactions.rs b/tests/integration/query_processing/test_transactions.rs index 3a9779e87..e969eaef7 100644 --- a/tests/integration/query_processing/test_transactions.rs +++ b/tests/integration/query_processing/test_transactions.rs @@ -172,3 +172,75 @@ fn test_transaction_visibility() { } } } + +#[test] +fn test_mvcc_transactions_autocommit() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_transactions_autocommit.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn1 = tmp_db.connect_limbo(); + + // This should work - basic CREATE TABLE in MVCC autocommit mode + conn1 + .execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)") + .unwrap(); +} + +#[test] +fn test_mvcc_transactions_immediate() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_transactions_immediate.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn1 = tmp_db.connect_limbo(); + let conn2 = tmp_db.connect_limbo(); + + conn1 + .execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)") + .unwrap(); + + // Start an immediate transaction + conn1.execute("BEGIN IMMEDIATE").unwrap(); + + // Another immediate transaction fails with BUSY + let result = conn2.execute("BEGIN IMMEDIATE"); + assert!(matches!(result, Err(LimboError::Busy))); +} + +#[test] +fn test_mvcc_transactions_deferred() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_transactions_deferred.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn1 = tmp_db.connect_limbo(); + let conn2 = tmp_db.connect_limbo(); + + conn1 + .execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)") + .unwrap(); + + conn1.execute("BEGIN DEFERRED").unwrap(); + conn2.execute("BEGIN DEFERRED").unwrap(); + + conn1 + .execute("INSERT INTO test (id, value) VALUES (1, 'first')") + .unwrap(); + + let result = conn2.execute("INSERT INTO test (id, value) VALUES (2, 'second')"); + assert!(matches!(result, Err(LimboError::Busy))); + + conn1.execute("COMMIT").unwrap(); + + conn2 + .execute("INSERT INTO test (id, value) VALUES (2, 'second')") + .unwrap(); + conn2.execute("COMMIT").unwrap(); + + let mut stmt = conn1.query("SELECT COUNT(*) FROM test").unwrap().unwrap(); + if let StepResult::Row = stmt.step().unwrap() { + let row = stmt.row().unwrap(); + assert_eq!(*row.get::<&Value>(0).unwrap(), Value::Integer(2)); + } +}