diff --git a/core/benches/mvcc_benchmark.rs b/core/benches/mvcc_benchmark.rs
index 547a473b4..ecce8c06d 100644
--- a/core/benches/mvcc_benchmark.rs
+++ b/core/benches/mvcc_benchmark.rs
@@ -47,9 +47,7 @@ fn bench(c: &mut Criterion) {
             let conn = &db.conn;
             let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone()).unwrap();
             let mv_store = &db.mvcc_store;
-            let mut sm = mv_store
-                .commit_tx(tx_id, conn.get_pager().clone(), conn)
-                .unwrap();
+            let mut sm = mv_store.commit_tx(tx_id, conn).unwrap();
             // TODO: sync IO hack
             loop {
                 let res = sm.step(mv_store).unwrap();
@@ -76,9 +74,7 @@ fn bench(c: &mut Criterion) {
             )
             .unwrap();
             let mv_store = &db.mvcc_store;
-            let mut sm = mv_store
-                .commit_tx(tx_id, conn.get_pager().clone(), conn)
-                .unwrap();
+            let mut sm = mv_store.commit_tx(tx_id, conn).unwrap();
             // TODO: sync IO hack
             loop {
                 let res = sm.step(mv_store).unwrap();
@@ -111,9 +107,7 @@ fn bench(c: &mut Criterion) {
             )
             .unwrap();
             let mv_store = &db.mvcc_store;
-            let mut sm = mv_store
-                .commit_tx(tx_id, conn.get_pager().clone(), conn)
-                .unwrap();
+            let mut sm = mv_store.commit_tx(tx_id, conn).unwrap();
             // TODO: sync IO hack
             loop {
                 let res = sm.step(mv_store).unwrap();
diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs
index 95b36beb9..8dc08463d 100644
--- a/core/mvcc/database/mod.rs
+++ b/core/mvcc/database/mod.rs
@@ -19,7 +19,6 @@ use crate::Result;
 use crate::{Connection, Pager};
 use crossbeam_skiplist::{SkipMap, SkipSet};
 use parking_lot::RwLock;
-use std::collections::HashMap;
 use std::collections::HashSet;
 use std::fmt::Debug;
 use std::marker::PhantomData;
@@ -262,41 +261,11 @@ impl AtomicTransactionState {
 #[derive(Debug)]
 pub enum CommitState {
     Initial,
-    BeginPagerTxn {
-        end_ts: u64,
-    },
-    WriteRow {
-        end_ts: u64,
-        write_set_index: usize,
-        requires_seek: bool,
-    },
-    WriteRowStateMachine {
-        end_ts: u64,
-        write_set_index: usize,
-    },
-    DeleteRowStateMachine {
-        end_ts: u64,
-        write_set_index: usize,
-    },
-    CommitPagerTxn {
-        end_ts: u64,
-    },
-    Commit {
-        end_ts: u64,
-    },
-    BeginCommitLogicalLog {
-        end_ts: u64,
-        log_record: LogRecord,
-    },
-    EndCommitLogicalLog {
-        end_ts: u64,
-    },
-    SyncLogicalLog {
-        end_ts: u64,
-    },
-    CommitEnd {
-        end_ts: u64,
-    },
+    Commit { end_ts: u64 },
+    BeginCommitLogicalLog { end_ts: u64, log_record: LogRecord },
+    EndCommitLogicalLog { end_ts: u64 },
+    SyncLogicalLog { end_ts: u64 },
+    CommitEnd { end_ts: u64 },
 }
 
 #[derive(Debug)]
@@ -311,21 +280,16 @@ pub enum WriteRowState {
 #[derive(Debug)]
 struct CommitCoordinator {
     pager_commit_lock: Arc<TursoRwLock>,
-    commits_waiting: Arc<AtomicU64>,
 }
 
 pub struct CommitStateMachine<Clock: LogicalClock> {
     state: CommitState,
     is_finalized: bool,
-    pager: Arc<Pager>,
     tx_id: TxID,
     connection: Arc<Connection>,
     /// Write set sorted by table id and row id
     write_set: Vec<RowID>,
-    write_row_state_machine: Option<StateMachine<WriteRowStateMachine>>,
-    delete_row_state_machine: Option<StateMachine<DeleteRowStateMachine>>,
    commit_coordinator: Arc<CommitCoordinator>,
-    cursors: HashMap<u64, Arc<RwLock<BTreeCursor>>>,
     header: Arc<RwLock<Option<DatabaseHeader>>>,
     _phantom: PhantomData<Clock>,
 }
 #[derive(Debug)]
@@ -365,7 +329,6 @@ pub struct DeleteRowStateMachine {
 impl<Clock: LogicalClock> CommitStateMachine<Clock> {
     fn new(
         state: CommitState,
-        pager: Arc<Pager>,
         tx_id: TxID,
         connection: Arc<Connection>,
         commit_coordinator: Arc<CommitCoordinator>,
@@ -374,46 +337,14 @@ impl<Clock: LogicalClock> CommitStateMachine<Clock> {
         header: Arc<RwLock<Option<DatabaseHeader>>>,
     ) -> Self {
         Self {
            state,
            is_finalized: false,
-            pager,
            tx_id,
            connection,
            write_set: Vec::new(),
-            write_row_state_machine: None,
-            delete_row_state_machine: None,
            commit_coordinator,
-            cursors: HashMap::new(),
            header,
            _phantom: PhantomData,
        }
    }
-
-    /// We need to update pager's header to account for changes made by other transactions.
-    fn update_pager_header(&self, mvcc_store: &MvStore<Clock>) -> Result<()> {
-        let header = self.header.read();
-        let last_commited_header = header.as_ref().expect("Header not found");
-        self.pager.io.block(|| self.pager.maybe_allocate_page1())?;
-        let _ = self.pager.io.block(|| {
-            self.pager.with_header_mut(|header_in_pager| {
-                let header_in_transaction = mvcc_store.get_transaction_database_header(&self.tx_id);
-                tracing::debug!("update header here {}", header_in_transaction.schema_cookie);
-                // database_size should only be updated in each commit so it should be safe to assume correct database_size is in last_commited_header
-                header_in_pager.database_size = last_commited_header.database_size;
-                if header_in_transaction.schema_cookie < last_commited_header.schema_cookie {
-                    tracing::error!("txn's schema cookie went back in time, aborting");
-                    return Err(LimboError::SchemaUpdated);
-                }
-
-                assert!(
-                    header_in_transaction.schema_cookie >= last_commited_header.schema_cookie,
-                    "txn's schema cookie went back in time"
-                );
-                header_in_pager.schema_cookie = header_in_transaction.schema_cookie;
-                // TODO: deal with other fields
-                Ok(())
-            })
-        })?;
-        Ok(())
-    }
 }
 
 impl WriteRowStateMachine {
@@ -547,239 +478,6 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
                 self.state = CommitState::Commit { end_ts };
                 Ok(TransitionResult::Continue)
             }
-            CommitState::BeginPagerTxn { end_ts } => {
-                // FIXME: how do we deal with multiple concurrent writes?
-                // WAL requires a txn to be written sequentially. Either we:
-                // 1. Wait for currently writer to finish before second txn starts.
-                // 2. Choose a txn to write depending on some heuristics like amount of frames will be written.
-                // 3. ..
-
-                // If this is the exclusive transaction, we already acquired a write transaction
-                // on the pager in begin_exclusive_tx() and don't need to acquire it.
-                if mvcc_store.is_exclusive_tx(&self.tx_id) {
-                    self.update_pager_header(mvcc_store)?;
-                    self.state = CommitState::WriteRow {
-                        end_ts: *end_ts,
-                        write_set_index: 0,
-                        requires_seek: true,
-                    };
-                    return Ok(TransitionResult::Continue);
-                } else if mvcc_store.has_exclusive_tx() {
-                    // There is an exclusive transaction holding the write lock. We must abort.
-                    return Err(LimboError::WriteWriteConflict);
-                }
-                // Currently txns are queued without any heuristics whasoever. This is important because
-                // we need to ensure writes to disk happen sequentially.
-                // * We don't want txns to write to WAL in parallel.
-                // * We don't want BTree modifications to happen in parallel.
-                // If any of these were to happen, we would find ourselves in a bad corruption situation.
-
-                // NOTE: since we are blocking for `begin_write_tx` we do not care about re-entrancy right now.
-                let locked = self.commit_coordinator.pager_commit_lock.write();
-                if !locked {
-                    self.commit_coordinator
-                        .commits_waiting
-                        .fetch_add(1, Ordering::SeqCst);
-                    // FIXME: IOCompletions still needs a yield variant...
-                    return Ok(TransitionResult::Io(crate::types::IOCompletions::Single(
-                        Completion::new_dummy(),
-                    )));
-                }
-
-                self.update_pager_header(mvcc_store)?;
-
-                {
-                    let mut wal = self.pager.wal.as_ref().unwrap().borrow_mut();
-                    // we need to update the max frame to the latest shared max frame in order to avoid snapshot staleness
-                    wal.update_max_frame();
-                }
-
-                // We started a pager read transaction at the beginning of the MV transaction, because
-                // any reads we do from the database file and WAL must uphold snapshot isolation.
-                // However, now we must end and immediately restart the read transaction before committing.
-                // This is because other transactions may have committed writes to the DB file or WAL,
-                // and our pager must read in those changes when applying our writes; otherwise we would overwrite
-                // the changes from the previous committed transactions.
-                //
-                // Note that this would be incredibly unsafe in the regular transaction model, but in MVCC we trust
-                // the MV-store to uphold the guarantee that no write-write conflicts happened.
-                self.pager.end_read_tx().expect("end_read_tx cannot fail");
-                let result = self.pager.begin_read_tx();
-                if let Err(LimboError::Busy) = result {
-                    // We cannot obtain a WAL read lock due to contention, so we must abort.
-                    self.commit_coordinator.pager_commit_lock.unlock();
-                    return Err(LimboError::WriteWriteConflict);
-                }
-                result?;
-                let result = self.pager.io.block(|| self.pager.begin_write_tx());
-                if let Err(LimboError::Busy) = result {
-                    // There is a non-CONCURRENT transaction holding the write lock. We must abort.
-                    self.commit_coordinator.pager_commit_lock.unlock();
-                    return Err(LimboError::WriteWriteConflict);
-                }
-                result?;
-                self.state = CommitState::WriteRow {
-                    end_ts: *end_ts,
-                    write_set_index: 0,
-                    requires_seek: true,
-                };
-                return Ok(TransitionResult::Continue);
-            }
-            CommitState::WriteRow {
-                end_ts,
-                write_set_index,
-                requires_seek,
-            } => {
-                if *write_set_index == self.write_set.len() {
-                    self.state = CommitState::CommitPagerTxn { end_ts: *end_ts };
-                    return Ok(TransitionResult::Continue);
-                }
-                let id = &self.write_set[*write_set_index];
-                if let Some(row_versions) = mvcc_store.rows.get(id) {
-                    let row_versions = row_versions.value().read();
-                    // Find rows that were written by this transaction.
-                    // Hekaton uses oldest-to-newest order for row versions, so we reverse iterate to find the newest one
-                    // this transaction changed.
-                    for row_version in row_versions.iter().rev() {
-                        if let TxTimestampOrID::TxID(row_tx_id) = row_version.begin {
-                            if row_tx_id == self.tx_id {
-                                let cursor = if let Some(cursor) = self.cursors.get(&id.table_id) {
-                                    cursor.clone()
-                                } else {
-                                    let cursor = BTreeCursor::new_table(
-                                        None, // Write directly to B-tree
-                                        self.pager.clone(),
-                                        id.table_id as usize,
-                                        row_version.row.column_count,
-                                    );
-                                    let cursor = Arc::new(RwLock::new(cursor));
-                                    self.cursors.insert(id.table_id, cursor.clone());
-                                    cursor
-                                };
-                                let state_machine = mvcc_store.write_row_to_pager(
-                                    &row_version.row,
-                                    cursor,
-                                    *requires_seek,
-                                )?;
-                                self.write_row_state_machine = Some(state_machine);
-
-                                self.state = CommitState::WriteRowStateMachine {
-                                    end_ts: *end_ts,
-                                    write_set_index: *write_set_index,
-                                };
-                                break;
-                            }
-                        }
-                        if let Some(TxTimestampOrID::TxID(row_tx_id)) = row_version.end {
-                            if row_tx_id == self.tx_id {
-                                let column_count = row_version.row.column_count;
-                                let cursor = if let Some(cursor) = self.cursors.get(&id.table_id) {
-                                    cursor.clone()
-                                } else {
-                                    let cursor = BTreeCursor::new_table(
-                                        None, // Write directly to B-tree
-                                        self.pager.clone(),
-                                        id.table_id as usize,
-                                        column_count,
-                                    );
-                                    let cursor = Arc::new(RwLock::new(cursor));
-                                    self.cursors.insert(id.table_id, cursor.clone());
-                                    cursor
-                                };
-                                let state_machine =
-                                    mvcc_store.delete_row_from_pager(row_version.row.id, cursor)?;
-                                self.delete_row_state_machine = Some(state_machine);
-                                self.state = CommitState::DeleteRowStateMachine {
-                                    end_ts: *end_ts,
-                                    write_set_index: *write_set_index,
-                                };
-                                break;
-                            }
-                        }
-                    }
-                }
-                Ok(TransitionResult::Continue)
-            }
-
-            CommitState::WriteRowStateMachine {
-                end_ts,
-                write_set_index,
-            } => {
-                let write_row_state_machine = self.write_row_state_machine.as_mut().unwrap();
-                match write_row_state_machine.step(&())? {
-                    IOResult::IO(io) => return Ok(TransitionResult::Io(io)),
-                    IOResult::Done(_) => {
-                        let requires_seek = {
-                            if let Some(next_id) = self.write_set.get(*write_set_index + 1) {
-                                let current_id = &self.write_set[*write_set_index];
-                                if current_id.table_id == next_id.table_id
-                                    && current_id.row_id + 1 == next_id.row_id
-                                {
-                                    // simple optimizaiton for sequential inserts with inceasing by 1 ids
-                                    // we should probably just check record in next row and see if it requires seek
-                                    false
-                                } else {
-                                    true
-                                }
-                            } else {
-                                false
-                            }
-                        };
-                        self.state = CommitState::WriteRow {
-                            end_ts: *end_ts,
-                            write_set_index: *write_set_index + 1,
-                            requires_seek,
-                        };
-                        return Ok(TransitionResult::Continue);
-                    }
-                }
-            }
-            CommitState::DeleteRowStateMachine {
-                end_ts,
-                write_set_index,
-            } => {
-                let delete_row_state_machine = self.delete_row_state_machine.as_mut().unwrap();
-                match delete_row_state_machine.step(&())? {
-                    IOResult::IO(io) => return Ok(TransitionResult::Io(io)),
-                    IOResult::Done(_) => {
-                        self.state = CommitState::WriteRow {
-                            end_ts: *end_ts,
-                            write_set_index: *write_set_index + 1,
-                            requires_seek: true,
-                        };
-                        return Ok(TransitionResult::Continue);
-                    }
-                }
-            }
-            CommitState::CommitPagerTxn { end_ts } => {
-                // Write committed data to pager for persistence
-                // Flush dirty pages to WAL - this is critical for data persistence
-                // Similar to what step_end_write_txn does for legacy transactions
-
-                let result = self
-                    .pager
-                    .end_tx(
-                        false, // rollback = false since we're committing
-                        &self.connection,
-                    )
-                    .map_err(|e| LimboError::InternalError(e.to_string()))
-                    .unwrap();
-                match result {
-                    IOResult::Done(_) => {
-                        // FIXME: hack for now to keep database header updated for pager commit
-                        let tx = mvcc_store.txs.get(&self.tx_id).unwrap();
-                        let tx_unlocked = tx.value();
-                        self.header.write().replace(*tx_unlocked.header.read());
-                        self.commit_coordinator.pager_commit_lock.unlock();
-                        // TODO: here mark we are ready for a batch
-                        self.state = CommitState::Commit { end_ts: *end_ts };
-                        return Ok(TransitionResult::Continue);
-                    }
-                    IOResult::IO(io) => {
-                        return Ok(TransitionResult::Io(io));
-                    }
-                }
-            }
             CommitState::Commit { end_ts } => {
                 let mut log_record = LogRecord::new(*end_ts);
                 if !mvcc_store.is_exclusive_tx(&self.tx_id) && mvcc_store.has_exclusive_tx() {
@@ -1126,7 +824,6 @@ impl<Clock: LogicalClock> MvStore<Clock> {
             exclusive_tx: RwLock::new(None),
             commit_coordinator: Arc::new(CommitCoordinator {
                 pager_commit_lock: Arc::new(TursoRwLock::new()),
-                commits_waiting: Arc::new(AtomicU64::new(0)),
             }),
             global_header: Arc::new(RwLock::new(None)),
             blocking_checkpoint_lock: Arc::new(TursoRwLock::new()),
@@ -1553,13 +1250,11 @@ impl<Clock: LogicalClock> MvStore<Clock> {
     pub fn commit_tx(
         &self,
         tx_id: TxID,
-        pager: Arc<Pager>,
         connection: &Arc<Connection>,
     ) -> Result<StateMachine<CommitStateMachine<Clock>>> {
         let state_machine: StateMachine<CommitStateMachine<Clock>> =
             StateMachine::<CommitStateMachine<Clock>>::new(CommitStateMachine::new(
                 CommitState::Initial,
-                pager,
                 tx_id,
                 connection.clone(),
                 self.commit_coordinator.clone(),
diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs
index ff35ba9ab..1f29160e4 100644
--- a/core/mvcc/database/tests.rs
+++ b/core/mvcc/database/tests.rs
@@ -760,9 +760,7 @@ pub(crate) fn commit_tx(
     conn: &Arc<Connection>,
     tx_id: u64,
 ) -> Result<()> {
-    let mut sm = mv_store
-        .commit_tx(tx_id, conn.pager.read().clone(), conn)
-        .unwrap();
+    let mut sm = mv_store.commit_tx(tx_id, conn).unwrap();
     // TODO: sync IO hack
     loop {
         let res = sm.step(&mv_store)?;
@@ -783,9 +781,7 @@ pub(crate) fn commit_tx_no_conn(
     conn: &Arc<Connection>,
 ) -> Result<(), LimboError> {
     let mv_store = db.get_mvcc_store();
-    let mut sm = mv_store
-        .commit_tx(tx_id, conn.pager.read().clone(), conn)
-        .unwrap();
+    let mut sm = mv_store.commit_tx(tx_id, conn).unwrap();
     // TODO: sync IO hack
     loop {
         let res = sm.step(&mv_store)?;
diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs
index f70028f47..422737140 100644
--- a/core/vdbe/mod.rs
+++ b/core/vdbe/mod.rs
@@ -837,7 +837,7 @@ impl Program {
                 let Some((tx_id, _)) = conn.mv_tx.get() else {
                     return Ok(IOResult::Done(()));
                 };
-                let state_machine = mv_store.commit_tx(tx_id, pager.clone(), &conn).unwrap();
+                let state_machine = mv_store.commit_tx(tx_id, &conn).unwrap();
                 program_state.commit_state = CommitState::CommitingMvcc { state_machine };
             }
             let CommitState::CommitingMvcc { state_machine } = &mut program_state.commit_state