From 433b60555f1323e3ccb46ade6e5b8f522180acab Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 10 Sep 2025 19:43:46 +0300 Subject: [PATCH] Add BEGIN CONCURRENT support for MVCC mode Currently, when MVCC is enabled, every transaction mode supports concurrent reads and writes, which makes it hard to adopt for existing applications that use `BEGIN DEFERRED` or `BEGIN IMMEDIATE`. Therefore, add support for `BEGIN CONCURRENT` transactions when MVCC is enabled. The transaction mode allows multiple concurrent read/write transactions that don't block each other, with conflicts resolved at commit time. Furthermore, implement the correct semantics for `BEGIN DEFERRED` and `BEGIN IMMEDIATE` by taking advantage of the pager level write lock when transaction upgrades to write. This means that now concurrent MVCC transactions are serialized against the legacy ones when needed. The implementation includes: - Parser support for CONCURRENT keyword in BEGIN statements - New Concurrent variant in TransactionMode to distinguish from regular read/write transactions - MVCC store tracking of exclusive transactions to support IMMEDIATE and EXCLUSIVE modes alongside CONCURRENT - Proper transaction state management for all transaction types in MVCC This enables better concurrency for applications that can handle optimistic concurrency control, while still supporting traditional SQLite transaction semantics via IMMEDIATE and EXCLUSIVE modes. --- core/mvcc/database/mod.rs | 98 +++++++++++++++++-- core/translate/emitter.rs | 1 + core/translate/pragma.rs | 3 + core/translate/transaction.rs | 16 ++- core/vdbe/builder.rs | 6 +- core/vdbe/execute.rs | 86 +++++++++++++--- core/vdbe/explain.rs | 6 +- core/vdbe/insn.rs | 8 +- parser/src/ast.rs | 2 + parser/src/ast/fmt.rs | 1 + parser/src/lexer.rs | 2 + parser/src/parser.rs | 25 +++++ parser/src/token.rs | 21 ++-- tests/integration/common.rs | 18 ++++ .../query_processing/test_transactions.rs | 72 ++++++++++++++ 15 files changed, 325 insertions(+), 40 deletions(-) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 6727784e5..003b95900 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1,5 +1,7 @@ use crate::mvcc::clock::LogicalClock; use crate::mvcc::persistent_storage::Storage; +use crate::result::LimboResult; +use crate::return_if_io; use crate::state_machine::StateMachine; use crate::state_machine::StateTransition; use crate::state_machine::TransitionResult; @@ -448,12 +450,14 @@ impl StateTransition for CommitStateMachine { // 1. Wait for currently writer to finish before second txn starts. // 2. Choose a txn to write depending on some heuristics like amount of frames will be written. // 3. .. - // - let result = self.pager.io.block(|| self.pager.begin_write_tx())?; - if let crate::result::LimboResult::Busy = result { - return Err(LimboError::InternalError( - "Pager write transaction busy".to_string(), - )); + + // If this is the exclusive transaction, we already acquired a write transaction + // on the pager in begin_exclusive_tx() and don't need to acquire it. + if !mvcc_store.is_exclusive_tx(&self.tx_id) { + let result = self.pager.io.block(|| self.pager.begin_write_tx())?; + if let LimboResult::Busy = result { + return Err(LimboError::Busy); + } } self.state = CommitState::WriteRow { end_ts, @@ -613,6 +617,11 @@ impl StateTransition for CommitStateMachine { // FIXME: it actually just become a problem for today!!! // TODO: test that reproduces this failure, and then a fix mvcc_store.txs.remove(&self.tx_id); + + if mvcc_store.is_exclusive_tx(&self.tx_id) { + mvcc_store.release_exclusive_tx(&self.tx_id); + } + if !log_record.row_versions.is_empty() { mvcc_store.storage.log_tx(log_record)?; } @@ -816,6 +825,13 @@ pub struct MvStore { clock: Clock, storage: Storage, loaded_tables: RwLock>, + + /// The transaction ID of a transaction that has acquired an exclusive write lock, if any. + /// + /// An exclusive MVCC transaction is one that has a write lock on the pager, which means + /// every other MVCC transaction must wait for it to commit before they can commit. We have + /// exclusive transactions to support single-writer semantics for compatibility with SQLite. + exclusive_tx: RwLock>, } impl MvStore { @@ -829,6 +845,7 @@ impl MvStore { clock, storage, loaded_tables: RwLock::new(HashSet::new()), + exclusive_tx: RwLock::new(None), } } @@ -1102,6 +1119,47 @@ impl MvStore { } } + /// Begins an exclusive write transaction that prevents concurrent writes. + /// + /// This is used for IMMEDIATE and EXCLUSIVE transaction types where we need + /// to ensure exclusive write access as per SQLite semantics. + pub fn begin_exclusive_tx(&self, pager: Rc) -> Result> { + let tx_id = self.get_tx_id(); + let begin_ts = self.get_timestamp(); + + self.acquire_exclusive_tx(&tx_id)?; + + // Try to acquire the pager read lock + match pager.begin_read_tx()? { + LimboResult::Busy => { + self.release_exclusive_tx(&tx_id); + return Err(LimboError::Busy); + } + LimboResult::Ok => {} + } + // Try to acquire the pager write lock + match return_if_io!(pager.begin_write_tx()) { + LimboResult::Busy => { + tracing::debug!("begin_exclusive_tx: tx_id={} failed with Busy", tx_id); + // Failed to get pager lock - release our exclusive lock + self.release_exclusive_tx(&tx_id); + pager.end_read_tx()?; + return Err(LimboError::Busy); + } + LimboResult::Ok => { + let tx = Transaction::new(tx_id, begin_ts); + tracing::trace!( + "begin_exclusive_tx(tx_id={}) - exclusive write transaction", + tx_id + ); + tracing::debug!("begin_exclusive_tx: tx_id={} succeeded", tx_id); + self.txs.insert(tx_id, RwLock::new(tx)); + } + } + + Ok(IOResult::Done(tx_id)) + } + /// Begins a new transaction in the database. /// /// This function starts a new transaction in the database and returns a `TxID` value @@ -1161,6 +1219,10 @@ impl MvStore { let write_set: Vec = tx.write_set.iter().map(|v| *v.value()).collect(); drop(tx); + if self.is_exclusive_tx(&tx_id) { + self.release_exclusive_tx(&tx_id); + } + for ref id in write_set { if let Some(row_versions) = self.rows.get(id) { let mut row_versions = row_versions.value().write(); @@ -1180,6 +1242,30 @@ impl MvStore { self.txs.remove(&tx_id); } + /// Returns true if the given transaction is the exclusive transaction. + fn is_exclusive_tx(&self, tx_id: &TxID) -> bool { + self.exclusive_tx.read().as_ref() == Some(tx_id) + } + + /// Acquires the exclusive transaction lock to the given transaction ID. + fn acquire_exclusive_tx(&self, tx_id: &TxID) -> Result<()> { + let mut exclusive_tx = self.exclusive_tx.write(); + if exclusive_tx.is_some() { + // Another transaction already holds the exclusive lock + return Err(LimboError::Busy); + } + // Reserve the exclusive lock with our tx_id + *exclusive_tx = Some(*tx_id); + Ok(()) + } + + /// Release the exclusive transaction lock if held by the this transaction. + fn release_exclusive_tx(&self, tx_id: &TxID) { + let mut exclusive_tx = self.exclusive_tx.write(); + assert_eq!(exclusive_tx.as_ref(), Some(tx_id)); + *exclusive_tx = None; + } + /// Generates next unique transaction id pub fn get_tx_id(&self) -> u64 { self.tx_ids.fetch_add(1, Ordering::SeqCst) diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index bf0c008fe..cad89e0ef 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -191,6 +191,7 @@ pub enum TransactionMode { None, Read, Write, + Concurrent, } /// Main entry point for emitting bytecode for a SQL query diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 1189ec589..8ed31475e 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -77,6 +77,9 @@ pub fn translate_pragma( TransactionMode::Write => { program.begin_write_operation(); } + TransactionMode::Concurrent => { + program.begin_concurrent_operation(); + } } Ok(program) diff --git a/core/translate/transaction.rs b/core/translate/transaction.rs index 3f1efb606..86427fe19 100644 --- a/core/translate/transaction.rs +++ b/core/translate/transaction.rs @@ -1,5 +1,5 @@ use crate::schema::Schema; -use crate::translate::{ProgramBuilder, ProgramBuilderOpts}; +use crate::translate::{emitter::TransactionMode, ProgramBuilder, ProgramBuilderOpts}; use crate::vdbe::insn::Insn; use crate::Result; use turso_parser::ast::{Name, TransactionType}; @@ -26,7 +26,19 @@ pub fn translate_tx_begin( TransactionType::Immediate | TransactionType::Exclusive => { program.emit_insn(Insn::Transaction { db: 0, - write: true, + tx_mode: TransactionMode::Write, + schema_cookie: schema.schema_version, + }); + // TODO: Emit transaction instruction on temporary tables when we support them. + program.emit_insn(Insn::AutoCommit { + auto_commit: false, + rollback: false, + }); + } + TransactionType::Concurrent => { + program.emit_insn(Insn::Transaction { + db: 0, + tx_mode: TransactionMode::Concurrent, schema_cookie: schema.schema_version, }); // TODO: Emit transaction instruction on temporary tables when we support them. diff --git a/core/vdbe/builder.rs b/core/vdbe/builder.rs index ffa26c03d..79f0cc58e 100644 --- a/core/vdbe/builder.rs +++ b/core/vdbe/builder.rs @@ -774,6 +774,10 @@ impl ProgramBuilder { } } + pub fn begin_concurrent_operation(&mut self) { + self.txn_mode = TransactionMode::Concurrent; + } + /// Indicates the rollback behvaiour for the halt instruction in epilogue pub fn rollback(&mut self) { self.rollback = true; @@ -791,7 +795,7 @@ impl ProgramBuilder { if !matches!(self.txn_mode, TransactionMode::None) { self.emit_insn(Insn::Transaction { db: 0, - write: matches!(self.txn_mode, TransactionMode::Write), + tx_mode: self.txn_mode, schema_cookie: schema.schema_version, }); } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 83e164adc..429cd4ac7 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -30,6 +30,7 @@ use crate::{ }, printf::exec_printf, }, + translate::emitter::TransactionMode, }; use crate::{get_cursor, MvCursor}; use std::env::temp_dir; @@ -2094,13 +2095,14 @@ pub fn op_transaction( load_insn!( Transaction { db, - write, + tx_mode, schema_cookie, }, insn ); let conn = program.connection.clone(); - if *write && conn._db.open_flags.contains(OpenFlags::ReadOnly) { + let write = matches!(tx_mode, TransactionMode::Write); + if write && conn._db.open_flags.contains(OpenFlags::ReadOnly) { return Err(LimboError::ReadOnly); } @@ -2116,7 +2118,7 @@ pub fn op_transaction( // instead of ending the read tx, just update the state to pending. (TransactionState::PendingUpgrade, write) => { turso_assert!( - *write, + write, "pending upgrade should only be set for write transactions" ); ( @@ -2164,11 +2166,59 @@ pub fn op_transaction( // if header_schema_cookie != *schema_cookie { // return Err(LimboError::SchemaUpdated); // } - let tx_id = mv_store.begin_tx(pager.clone()); + let tx_id = match tx_mode { + TransactionMode::None | TransactionMode::Read | TransactionMode::Concurrent => { + mv_store.begin_tx(pager.clone()) + } + TransactionMode::Write => { + return_if_io!(mv_store.begin_exclusive_tx(pager.clone())) + } + }; conn.mv_transactions.borrow_mut().push(tx_id); program.connection.mv_tx_id.set(Some(tx_id)); + } else if updated + && matches!(new_transaction_state, TransactionState::Write { .. }) + && matches!(tx_mode, TransactionMode::Write) + { + // Handle upgrade from read to write transaction for MVCC + // Similar to non-MVCC path, we need to try upgrading to exclusive write transaction + turso_assert!( + !conn.is_nested_stmt.get(), + "nested stmt should not begin a new write transaction" + ); + match mv_store.begin_exclusive_tx(pager.clone()) { + Ok(IOResult::Done(tx_id)) => { + // Successfully upgraded to exclusive write transaction + // Remove the old read transaction and replace with write transaction + conn.mv_transactions.borrow_mut().push(tx_id); + program.connection.mv_tx_id.set(Some(tx_id)); + } + Err(LimboError::Busy) => { + // We failed to upgrade to write transaction so put the transaction into its original state. + // For MVCC, we don't need to end the transaction like in non-MVCC case, since MVCC transactions + // can be restarted automatically if they haven't performed any reads or writes yet. + // Just ensure the transaction state remains in its original state. + assert_eq!(conn.transaction_state.get(), current_state); + return Ok(InsnFunctionStepResult::Busy); + } + Ok(IOResult::IO(io)) => { + // set the transaction state to pending so we don't have to + // end the transaction. + program + .connection + .transaction_state + .replace(TransactionState::PendingUpgrade); + return Ok(InsnFunctionStepResult::IO(io)); + } + Err(e) => return Err(e), + } } } else { + if matches!(tx_mode, TransactionMode::Concurrent) { + return Err(LimboError::TxError( + "Concurrent transaction mode is only supported when MVCC is enabled".to_string(), + )); + } if updated && matches!(current_state, TransactionState::None) { turso_assert!( !conn.is_nested_stmt.get(), @@ -2275,19 +2325,25 @@ pub fn op_auto_commit( } else { conn.auto_commit.replace(*auto_commit); } - } else if !*auto_commit { - return Err(LimboError::TxError( - "cannot start a transaction within a transaction".to_string(), - )); - } else if *rollback { - return Err(LimboError::TxError( - "cannot rollback - no transaction is active".to_string(), - )); } else { - return Err(LimboError::TxError( - "cannot commit - no transaction is active".to_string(), - )); + let mvcc_tx_active = program.connection.mv_tx_id.get().is_some(); + if !mvcc_tx_active { + if !*auto_commit { + return Err(LimboError::TxError( + "cannot start a transaction within a transaction".to_string(), + )); + } else if *rollback { + return Err(LimboError::TxError( + "cannot rollback - no transaction is active".to_string(), + )); + } else { + return Err(LimboError::TxError( + "cannot commit - no transaction is active".to_string(), + )); + } + } } + program .commit_txn(pager.clone(), state, mv_store, *rollback) .map(Into::into) diff --git a/core/vdbe/explain.rs b/core/vdbe/explain.rs index 6c850aadd..b7d67d8d6 100644 --- a/core/vdbe/explain.rs +++ b/core/vdbe/explain.rs @@ -653,14 +653,14 @@ pub fn insn_to_str( 0, "".to_string(), ), - Insn::Transaction { db, write , schema_cookie} => ( + Insn::Transaction { db, tx_mode, schema_cookie} => ( "Transaction", *db as i32, - *write as i32, + *tx_mode as i32, *schema_cookie as i32, Value::build_text(""), 0, - format!("iDb={db} write={write}"), + format!("iDb={db} tx_mode={tx_mode:?}"), ), Insn::Goto { target_pc } => ( "Goto", diff --git a/core/vdbe/insn.rs b/core/vdbe/insn.rs index e2d4a07c6..ac0564c0c 100644 --- a/core/vdbe/insn.rs +++ b/core/vdbe/insn.rs @@ -7,7 +7,7 @@ use super::{execute, AggFunc, BranchOffset, CursorID, FuncCtx, InsnFunction, Pag use crate::{ schema::{Affinity, BTreeTable, Column, Index}, storage::{pager::CreateBTreeFlags, wal::CheckpointMode}, - translate::collate::CollationSeq, + translate::{collate::CollationSeq, emitter::TransactionMode}, Value, }; use turso_macros::Description; @@ -465,9 +465,9 @@ pub enum Insn { /// Start a transaction. Transaction { - db: usize, // p1 - write: bool, // p2 - schema_cookie: u32, // p3 + db: usize, // p1 + tx_mode: TransactionMode, // p2 + schema_cookie: u32, // p3 }, /// Set database auto-commit mode and potentially rollback. diff --git a/parser/src/ast.rs b/parser/src/ast.rs index 0d12d2d0b..c1ce16931 100644 --- a/parser/src/ast.rs +++ b/parser/src/ast.rs @@ -1530,6 +1530,8 @@ pub enum TransactionType { Immediate, /// `EXCLUSIVE` Exclusive, + /// `CONCURRENT`, + Concurrent, } /// Upsert clause diff --git a/parser/src/ast/fmt.rs b/parser/src/ast/fmt.rs index 4b774837e..80defd6d3 100644 --- a/parser/src/ast/fmt.rs +++ b/parser/src/ast/fmt.rs @@ -2204,6 +2204,7 @@ impl ToTokens for TransactionType { Self::Deferred => TK_DEFERRED, Self::Immediate => TK_IMMEDIATE, Self::Exclusive => TK_EXCLUSIVE, + Self::Concurrent => TK_CONCURRENT, }, None, ) diff --git a/parser/src/lexer.rs b/parser/src/lexer.rs index 5e9e809b8..aab03f36a 100644 --- a/parser/src/lexer.rs +++ b/parser/src/lexer.rs @@ -27,6 +27,7 @@ fn keyword_or_id_token(input: &[u8]) -> TokenType { b"COLLATE" => TokenType::TK_COLLATE, b"COLUMN" => TokenType::TK_COLUMNKW, b"COMMIT" => TokenType::TK_COMMIT, + b"CONCURRENT" => TokenType::TK_CONCURRENT, b"CONFLICT" => TokenType::TK_CONFLICT, b"CONSTRAINT" => TokenType::TK_CONSTRAINT, b"CREATE" => TokenType::TK_CREATE, @@ -1289,6 +1290,7 @@ mod tests { ("COLLATE", TokenType::TK_COLLATE), ("COLUMN", TokenType::TK_COLUMNKW), ("COMMIT", TokenType::TK_COMMIT), + ("CONCURRENT", TokenType::TK_CONCURRENT), ("CONFLICT", TokenType::TK_CONFLICT), ("CONSTRAINT", TokenType::TK_CONSTRAINT), ("CREATE", TokenType::TK_CREATE), diff --git a/parser/src/parser.rs b/parser/src/parser.rs index cfb10f28e..fa2230373 100644 --- a/parser/src/parser.rs +++ b/parser/src/parser.rs @@ -652,6 +652,10 @@ impl<'a> Parser<'a> { eat_assert!(self, TK_EXCLUSIVE); Some(TransactionType::Exclusive) } + TK_CONCURRENT => { + eat_assert!(self, TK_CONCURRENT); + Some(TransactionType::Concurrent) + } _ => None, }, }; @@ -4129,6 +4133,27 @@ mod tests { name: Some(Name::Quoted("'my_transaction'".to_string())), })], ), + ( + b"BEGIN CONCURRENT TRANSACTION".as_slice(), + vec![Cmd::Stmt(Stmt::Begin { + typ: Some(TransactionType::Concurrent), + name: None, + })], + ), + ( + b"BEGIN CONCURRENT TRANSACTION my_transaction".as_slice(), + vec![Cmd::Stmt(Stmt::Begin { + typ: Some(TransactionType::Concurrent), + name: Some(Name::Ident("my_transaction".to_string())), + })], + ), + ( + b"BEGIN CONCURRENT TRANSACTION 'my_transaction'".as_slice(), + vec![Cmd::Stmt(Stmt::Begin { + typ: Some(TransactionType::Concurrent), + name: Some(Name::Quoted("'my_transaction'".to_string())), + })], + ), ( ";;;BEGIN;BEGIN;;;;;;BEGIN".as_bytes(), vec![ diff --git a/parser/src/token.rs b/parser/src/token.rs index 34a7344e2..ed8f416c5 100644 --- a/parser/src/token.rs +++ b/parser/src/token.rs @@ -178,6 +178,7 @@ pub enum TokenType { TK_OVER = 166, TK_FILTER = 167, TK_ILLEGAL = 185, + TK_CONCURRENT = 186, } impl TokenType { @@ -229,6 +230,7 @@ impl TokenType { TokenType::TK_EXCEPT => Some("EXCEPT"), TokenType::TK_EXCLUDE => Some("EXCLUDE"), TokenType::TK_EXCLUSIVE => Some("EXCLUSIVE"), + TokenType::TK_CONCURRENT => Some("CONCURRENT"), TokenType::TK_EXISTS => Some("EXISTS"), TokenType::TK_EXPLAIN => Some("EXPLAIN"), TokenType::TK_FAIL => Some("FAIL"), @@ -359,6 +361,7 @@ impl Display for TokenType { TK_DEFERRED => "TK_DEFERRED", TK_IMMEDIATE => "TK_IMMEDIATE", TK_EXCLUSIVE => "TK_EXCLUSIVE", + TK_CONCURRENT => "TK_CONCURRENT", TK_COMMIT => "TK_COMMIT", TK_END => "TK_END", TK_ROLLBACK => "TK_ROLLBACK", @@ -531,15 +534,15 @@ impl TokenType { match self { TK_ABORT | TK_ACTION | TK_AFTER | TK_ANALYZE | TK_ASC | TK_ATTACH | TK_BEFORE | TK_BEGIN | TK_BY | TK_CASCADE | TK_CAST | TK_CONFLICT | TK_DATABASE | TK_DEFERRED - | TK_DESC | TK_DETACH | TK_DO | TK_EACH | TK_END | TK_EXCLUSIVE | TK_EXPLAIN - | TK_FAIL | TK_FOR | TK_IGNORE | TK_IMMEDIATE | TK_INITIALLY | TK_INSTEAD - | TK_LIKE_KW | TK_MATCH | TK_NO | TK_PLAN | TK_QUERY | TK_KEY | TK_OF | TK_OFFSET - | TK_PRAGMA | TK_RAISE | TK_RECURSIVE | TK_RELEASE | TK_REPLACE | TK_RESTRICT - | TK_ROW | TK_ROWS | TK_ROLLBACK | TK_SAVEPOINT | TK_TEMP | TK_TRIGGER | TK_VACUUM - | TK_VIEW | TK_VIRTUAL | TK_WITH | TK_NULLS | TK_FIRST | TK_LAST | TK_CURRENT - | TK_FOLLOWING | TK_PARTITION | TK_PRECEDING | TK_RANGE | TK_UNBOUNDED | TK_EXCLUDE - | TK_GROUPS | TK_OTHERS | TK_TIES | TK_ALWAYS | TK_MATERIALIZED | TK_REINDEX - | TK_RENAME | TK_CTIME_KW | TK_IF => TK_ID, + | TK_DESC | TK_DETACH | TK_DO | TK_EACH | TK_END | TK_EXCLUSIVE | TK_CONCURRENT + | TK_EXPLAIN | TK_FAIL | TK_FOR | TK_IGNORE | TK_IMMEDIATE | TK_INITIALLY + | TK_INSTEAD | TK_LIKE_KW | TK_MATCH | TK_NO | TK_PLAN | TK_QUERY | TK_KEY | TK_OF + | TK_OFFSET | TK_PRAGMA | TK_RAISE | TK_RECURSIVE | TK_RELEASE | TK_REPLACE + | TK_RESTRICT | TK_ROW | TK_ROWS | TK_ROLLBACK | TK_SAVEPOINT | TK_TEMP + | TK_TRIGGER | TK_VACUUM | TK_VIEW | TK_VIRTUAL | TK_WITH | TK_NULLS | TK_FIRST + | TK_LAST | TK_CURRENT | TK_FOLLOWING | TK_PARTITION | TK_PRECEDING | TK_RANGE + | TK_UNBOUNDED | TK_EXCLUDE | TK_GROUPS | TK_OTHERS | TK_TIES | TK_ALWAYS + | TK_MATERIALIZED | TK_REINDEX | TK_RENAME | TK_CTIME_KW | TK_IF => TK_ID, // | TK_COLUMNKW | TK_UNION | TK_EXCEPT | TK_INTERSECT | TK_GENERATED | TK_WITHOUT // see comments in `next_token` of parser _ => self, diff --git a/tests/integration/common.rs b/tests/integration/common.rs index 8a571b8ce..858932a09 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -35,6 +35,24 @@ impl TempDatabase { Self { path, io, db } } + pub fn new_with_opts(db_name: &str, opts: turso_core::DatabaseOpts) -> Self { + let mut path = TempDir::new().unwrap().keep(); + path.push(db_name); + let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); + let db = Database::open_file_with_flags( + io.clone(), + path.to_str().unwrap(), + turso_core::OpenFlags::default(), + opts, + ) + .unwrap(); + Self { + path: path.to_path_buf(), + io, + db, + } + } + pub fn new_with_existent(db_path: &Path, enable_indexes: bool) -> Self { Self::new_with_existent_with_flags( db_path, diff --git a/tests/integration/query_processing/test_transactions.rs b/tests/integration/query_processing/test_transactions.rs index 3a9779e87..e969eaef7 100644 --- a/tests/integration/query_processing/test_transactions.rs +++ b/tests/integration/query_processing/test_transactions.rs @@ -172,3 +172,75 @@ fn test_transaction_visibility() { } } } + +#[test] +fn test_mvcc_transactions_autocommit() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_transactions_autocommit.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn1 = tmp_db.connect_limbo(); + + // This should work - basic CREATE TABLE in MVCC autocommit mode + conn1 + .execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)") + .unwrap(); +} + +#[test] +fn test_mvcc_transactions_immediate() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_transactions_immediate.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn1 = tmp_db.connect_limbo(); + let conn2 = tmp_db.connect_limbo(); + + conn1 + .execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)") + .unwrap(); + + // Start an immediate transaction + conn1.execute("BEGIN IMMEDIATE").unwrap(); + + // Another immediate transaction fails with BUSY + let result = conn2.execute("BEGIN IMMEDIATE"); + assert!(matches!(result, Err(LimboError::Busy))); +} + +#[test] +fn test_mvcc_transactions_deferred() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_transactions_deferred.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn1 = tmp_db.connect_limbo(); + let conn2 = tmp_db.connect_limbo(); + + conn1 + .execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)") + .unwrap(); + + conn1.execute("BEGIN DEFERRED").unwrap(); + conn2.execute("BEGIN DEFERRED").unwrap(); + + conn1 + .execute("INSERT INTO test (id, value) VALUES (1, 'first')") + .unwrap(); + + let result = conn2.execute("INSERT INTO test (id, value) VALUES (2, 'second')"); + assert!(matches!(result, Err(LimboError::Busy))); + + conn1.execute("COMMIT").unwrap(); + + conn2 + .execute("INSERT INTO test (id, value) VALUES (2, 'second')") + .unwrap(); + conn2.execute("COMMIT").unwrap(); + + let mut stmt = conn1.query("SELECT COUNT(*) FROM test").unwrap().unwrap(); + if let StepResult::Row = stmt.step().unwrap() { + let row = stmt.row().unwrap(); + assert_eq!(*row.get::<&Value>(0).unwrap(), Value::Integer(2)); + } +}