diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 911d97e9f..881601210 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -506,8 +506,14 @@ impl StateTransition for CommitStateMachine { ) .map_err(|e| LimboError::InternalError(e.to_string())) .unwrap(); - if let crate::types::IOResult::Done(_) = result { - break; + match result { + crate::types::IOResult::Done(_) => { + break; + } + crate::types::IOResult::IO(io) => { + io.wait(self.pager.io.as_ref())?; + continue; + } } } self.state = CommitState::Commit { end_ts }; @@ -981,16 +987,12 @@ impl MvStore { tx_id: TxID, pager: Rc, connection: &Arc, - schema_did_change: bool, ) -> Result>> { - let state_machine: StateMachine> = - StateMachine::>::new(CommitStateMachine::new( - CommitState::Initial, - pager, - tx_id, - connection.clone(), - schema_did_change, - )); + let state_machine: StateMachine> = StateMachine::< + CommitStateMachine, + >::new( + CommitStateMachine::new(CommitState::Initial, pager, tx_id, connection.clone()), + ); Ok(state_machine) } diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 2ae781891..138cb08f2 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -654,10 +654,10 @@ fn test_future_row() { use crate::mvcc::cursor::MvccLazyCursor; use crate::mvcc::database::{MvStore, Row, RowID}; use crate::types::Text; -use crate::Database; use crate::MemoryIO; use crate::RefValue; use crate::Value; +use crate::{Database, StepResult}; // Simple atomic clock implementation for testing @@ -1124,6 +1124,38 @@ fn test_restart() { } } +#[test] +fn test_connection_sees_other_connection_changes() { + let db = MvccTestDbNoConn::new_with_random_db(); + let conn0 = db.connect(); + conn0 + .execute("CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY, text TEXT)") + .unwrap(); + let conn1 = db.connect(); + conn1 + .execute("CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY, text TEXT)") + .unwrap(); + conn0 + .execute("INSERT INTO test_table (id, text) VALUES (965, 'text_877')") + .unwrap(); + let mut stmt = conn1.query("SELECT * FROM test_table").unwrap().unwrap(); + loop { + let res = stmt.step().unwrap(); + match res { + StepResult::Row => { + let row = stmt.row().unwrap(); + let text = row.get_value(1).to_text().unwrap(); + assert_eq!(text, "text_877"); + } + StepResult::Done => break, + StepResult::IO => { + stmt.run_once().unwrap(); + } + _ => panic!("Expected Row"), + } + } +} + fn get_record_value(row: &Row) -> ImmutableRecord { let mut record = ImmutableRecord::new(1024); record.start_serialization(&row.data); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index e256d41de..2010f626a 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -971,6 +971,7 @@ impl Pager { TransactionState::Write { schema_did_change } => (true, schema_did_change), _ => (false, false), }; + tracing::trace!("end_tx(schema_did_change={})", schema_did_change); if rollback { if is_write { wal.borrow().end_write_tx(); diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 89ac0a2d6..7d932ef04 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -1981,7 +1981,50 @@ pub fn op_transaction( let pager = program.get_pager_from_database_index(db); + // 1. We try to upgrade current version + let current_state = conn.transaction_state.get(); + let (new_transaction_state, updated) = match (current_state, write) { + // pending state means that we tried beginning a tx and the method returned IO. + // instead of ending the read tx, just update the state to pending. + (TransactionState::PendingUpgrade, write) => { + turso_assert!( + *write, + "pending upgrade should only be set for write transactions" + ); + ( + TransactionState::Write { + schema_did_change: false, + }, + true, + ) + } + (TransactionState::Write { schema_did_change }, true) => { + (TransactionState::Write { schema_did_change }, false) + } + (TransactionState::Write { schema_did_change }, false) => { + (TransactionState::Write { schema_did_change }, false) + } + (TransactionState::Read, true) => ( + TransactionState::Write { + schema_did_change: false, + }, + true, + ), + (TransactionState::Read, false) => (TransactionState::Read, false), + (TransactionState::None, true) => ( + TransactionState::Write { + schema_did_change: false, + }, + true, + ), + (TransactionState::None, false) => (TransactionState::Read, true), + }; + + // 2. Start transaction if needed if let Some(mv_store) = &mv_store { + // In MVCC we don't have write exclusivity, therefore we just need to start a transaction if needed. + // Programs can run Transaction twice, first with read flag and then with write flag. So a single txid is enough + // for both. if state.mv_tx_id.is_none() { // We allocate the first page lazily in the first transaction. return_if_io!(pager.maybe_allocate_page1()); @@ -1997,43 +2040,6 @@ pub fn op_transaction( state.mv_tx_id = Some(tx_id); } } else { - let current_state = conn.transaction_state.get(); - let (new_transaction_state, updated) = match (current_state, write) { - // pending state means that we tried beginning a tx and the method returned IO. - // instead of ending the read tx, just update the state to pending. - (TransactionState::PendingUpgrade, write) => { - turso_assert!( - *write, - "pending upgrade should only be set for write transactions" - ); - ( - TransactionState::Write { - schema_did_change: false, - }, - true, - ) - } - (TransactionState::Write { schema_did_change }, true) => { - (TransactionState::Write { schema_did_change }, false) - } - (TransactionState::Write { schema_did_change }, false) => { - (TransactionState::Write { schema_did_change }, false) - } - (TransactionState::Read, true) => ( - TransactionState::Write { - schema_did_change: false, - }, - true, - ), - (TransactionState::Read, false) => (TransactionState::Read, false), - (TransactionState::None, true) => ( - TransactionState::Write { - schema_did_change: false, - }, - true, - ), - (TransactionState::None, false) => (TransactionState::Read, true), - }; if updated && matches!(current_state, TransactionState::None) { if let LimboResult::Busy = pager.begin_read_tx()? { return Ok(InsnFunctionStepResult::Busy); @@ -2061,36 +2067,38 @@ pub fn op_transaction( } } } + } - // Transaction state should be updated before checking for Schema cookie so that the tx is ended properly on error - if updated { - conn.transaction_state.replace(new_transaction_state); - } + // 3. Transaction state should be updated before checking for Schema cookie so that the tx is ended properly on error + if updated { + conn.transaction_state.replace(new_transaction_state); + } - // Check whether schema has changed if we are actually going to access the database. - if !matches!(new_transaction_state, TransactionState::None) { - let res = pager - .io - .block(|| pager.with_header(|header| header.schema_cookie.get())); - match res { - Ok(header_schema_cookie) => { - if header_schema_cookie != *schema_cookie { - tracing::info!( - "schema changed, force reprepare: {} != {}", - header_schema_cookie, - *schema_cookie - ); - return Err(LimboError::SchemaUpdated); - } - } - // This means we are starting a read transaction and page 1 is not allocated yet, so we just continue execution - Err(LimboError::Page1NotAlloc) => {} - Err(err) => { - return Err(err); - } + // 4. Check whether schema has changed if we are actually going to access the database. + // Can only read header if page 1 has been allocated already + // begin_write_tx that happens, but not begin_read_tx + // TODO: this is a hack to make the pager run the IO loop + let res = pager + .io + .block(|| pager.with_header(|header| header.schema_cookie.get())); + match res { + Ok(header_schema_cookie) => { + if header_schema_cookie != *schema_cookie { + tracing::info!( + "schema changed, force reprepare: {} != {}", + header_schema_cookie, + *schema_cookie + ); + return Err(LimboError::SchemaUpdated); } } + // This means we are starting a read_tx and we do not have a page 1 yet, so we just continue execution + Err(LimboError::Page1NotAlloc) => {} + Err(err) => { + return Err(err); + } } + state.pc += 1; Ok(InsnFunctionStepResult::Step) } @@ -6476,16 +6484,14 @@ pub fn op_set_cookie( header.incremental_vacuum_enabled = (*value as u32).into() } Cookie::SchemaVersion => { - if mv_store.is_none() { - // we update transaction state to indicate that the schema has changed - match program.connection.transaction_state.get() { - TransactionState::Write { schema_did_change } => { - program.connection.transaction_state.set(TransactionState::Write { schema_did_change: true }); - }, - TransactionState::Read => unreachable!("invalid transaction state for SetCookie: TransactionState::Read, should be write"), - TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"), - TransactionState::PendingUpgrade => unreachable!("invalid transaction state for SetCookie: TransactionState::PendingUpgrade, should be write"), - } + // we update transaction state to indicate that the schema has changed + match program.connection.transaction_state.get() { + TransactionState::Write { schema_did_change } => { + program.connection.transaction_state.set(TransactionState::Write { schema_did_change: true }); + }, + TransactionState::Read => unreachable!("invalid transaction state for SetCookie: TransactionState::Read, should be write"), + TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"), + TransactionState::PendingUpgrade => unreachable!("invalid transaction state for SetCookie: TransactionState::PendingUpgrade, should be write"), } program .connection