diff --git a/core/mvcc/cursor.rs b/core/mvcc/cursor.rs index 30a2fb785..d9454cd7e 100644 --- a/core/mvcc/cursor.rs +++ b/core/mvcc/cursor.rs @@ -52,6 +52,11 @@ impl MvccLazyCursor { Ok(()) } + pub fn delete(&mut self, rowid: RowID, pager: Rc) -> Result<()> { + self.db.delete(self.tx_id, rowid, pager)?; + Ok(()) + } + pub fn current_row_id(&mut self) -> Option { match self.current_pos { CursorPosition::Loaded(id) => Some(id), diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 195ac9333..8d950d221 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -246,6 +246,7 @@ pub enum CommitState { BeginPagerTxn { end_ts: u64 }, WriteRow { end_ts: u64, write_set_index: usize }, WriteRowStateMachine { end_ts: u64, write_set_index: usize }, + DeleteRowStateMachine { end_ts: u64, write_set_index: usize }, CommitPagerTxn { end_ts: u64 }, Commit { end_ts: u64 }, } @@ -266,6 +267,7 @@ pub struct CommitStateMachine { connection: Arc, write_set: Vec, write_row_state_machine: Option>, + delete_row_state_machine: Option>, _phantom: PhantomData, } @@ -278,6 +280,23 @@ pub struct WriteRowStateMachine { cursor: Option, } +#[derive(Debug)] +pub enum DeleteRowState { + Initial, + CreateCursor, + Seek, + Delete, +} + +pub struct DeleteRowStateMachine { + state: DeleteRowState, + is_finalized: bool, + pager: Rc, + rowid: RowID, + column_count: usize, + cursor: Option, +} + impl CommitStateMachine { fn new(state: CommitState, pager: Rc, tx_id: TxID, connection: Arc) -> Self { Self { @@ -288,6 +307,7 @@ impl CommitStateMachine { connection, write_set: Vec::new(), write_row_state_machine: None, + delete_row_state_machine: None, _phantom: PhantomData, } } @@ -457,12 +477,16 @@ impl StateTransition for CommitStateMachine { break; } } - if let Some(TxTimestampOrID::Timestamp(row_tx_id)) = row_version.end { + if let Some(TxTimestampOrID::TxID(row_tx_id)) = row_version.end { if row_tx_id == self.tx_id { - let state_machine = mvcc_store - 
.write_row_to_pager(self.pager.clone(), &row_version.row)?; - self.write_row_state_machine = Some(state_machine); - self.state = CommitState::WriteRowStateMachine { + let column_count = row_version.row.column_count; + let state_machine = mvcc_store.delete_row_from_pager( + self.pager.clone(), + row_version.row.id, + column_count, + )?; + self.delete_row_state_machine = Some(state_machine); + self.state = CommitState::DeleteRowStateMachine { end_ts, write_set_index, }; @@ -473,6 +497,7 @@ impl StateTransition for CommitStateMachine { } Ok(TransitionResult::Continue) } + CommitState::WriteRowStateMachine { end_ts, write_set_index, @@ -492,6 +517,25 @@ impl StateTransition for CommitStateMachine { } } } + CommitState::DeleteRowStateMachine { + end_ts, + write_set_index, + } => { + let delete_row_state_machine = self.delete_row_state_machine.as_mut().unwrap(); + match delete_row_state_machine.step(&())? { + TransitionResult::Io(io) => return Ok(TransitionResult::Io(io)), + TransitionResult::Continue => { + return Ok(TransitionResult::Continue); + } + TransitionResult::Done(_) => { + self.state = CommitState::WriteRow { + end_ts, + write_set_index: write_set_index + 1, + }; + return Ok(TransitionResult::Continue); + } + } + } CommitState::CommitPagerTxn { end_ts } => { // Write committed data to pager for persistence // Flush dirty pages to WAL - this is critical for data persistence @@ -667,6 +711,93 @@ impl StateTransition for WriteRowStateMachine { } } +impl StateTransition for DeleteRowStateMachine { + type Context = (); + type SMResult = (); + + #[tracing::instrument(fields(state = ?self.state), skip(self, _context))] + fn step(&mut self, _context: &Self::Context) -> Result> { + use crate::storage::btree::BTreeCursor; + use crate::types::{IOResult, SeekKey, SeekOp}; + + match self.state { + DeleteRowState::Initial => { + self.state = DeleteRowState::CreateCursor; + Ok(TransitionResult::Continue) + } + DeleteRowState::CreateCursor => { + let root_page = 
self.rowid.table_id as usize; + let num_columns = self.column_count; + + let cursor = + BTreeCursor::new_table(None, self.pager.clone(), root_page, num_columns); + self.cursor = Some(cursor); + + self.state = DeleteRowState::Seek; + Ok(TransitionResult::Continue) + } + DeleteRowState::Seek => { + let seek_key = SeekKey::TableRowId(self.rowid.row_id); + let cursor = self.cursor.as_mut().unwrap(); + + match cursor.seek(seek_key, SeekOp::GE { eq_only: true })? { + IOResult::Done(_) => { + self.state = DeleteRowState::Delete; + Ok(TransitionResult::Continue) + } + IOResult::IO(io) => { + return Ok(TransitionResult::Io(io)); + } + } + } + DeleteRowState::Delete => { + // Delete the record from the B-tree + let cursor = self.cursor.as_mut().unwrap(); + + match cursor + .delete() + .map_err(|e| LimboError::InternalError(e.to_string()))? + { + IOResult::Done(()) => { + tracing::trace!( + "delete_row_from_pager(table_id={}, row_id={})", + self.rowid.table_id, + self.rowid.row_id + ); + self.finalize(&())?; + Ok(TransitionResult::Done(())) + } + IOResult::IO(io) => { + return Ok(TransitionResult::Io(io)); + } + } + } + } + } + + fn finalize(&mut self, _context: &Self::Context) -> Result<()> { + self.is_finalized = true; + Ok(()) + } + + fn is_finalized(&self) -> bool { + self.is_finalized + } +} + +impl DeleteRowStateMachine { + fn new(pager: Rc, rowid: RowID, column_count: usize) -> Self { + Self { + state: DeleteRowState::Initial, + is_finalized: false, + pager, + rowid, + column_count, + cursor: None, + } + } +} + /// A multi-version concurrency control database. #[derive(Debug)] pub struct MvStore { @@ -911,10 +1042,18 @@ impl MvStore { let tx = self.txs.get(&tx_id).unwrap(); let tx = tx.value().read(); let mut rows = self.rows.range(min_bound..max_bound); - rows.next().and_then(|row| { - // Find last valid version based on transaction. - self.find_last_visible_version(&tx, row) - }) + loop { + // We are moving forward, so if a row was deleted we just need to skip it. 
Therefore, we need + // to loop either until we find a row that is not deleted or until we reach the end of the table. + let next_row = rows.next(); + let row = next_row?; + + // We found a row, let's check if it's visible to the transaction. + if let Some(visible_row) = self.find_last_visible_version(&tx, row) { + return Some(visible_row); + } + // If this row is not visible, continue to the next row + } } fn find_last_visible_version( @@ -1167,6 +1306,21 @@ impl MvStore { Ok(state_machine) } + pub fn delete_row_from_pager( + &self, + pager: Rc, + rowid: RowID, + column_count: usize, + ) -> Result> { + let state_machine: StateMachine = StateMachine::< + DeleteRowStateMachine, + >::new( + DeleteRowStateMachine::new(pager, rowid, column_count), + ); + + Ok(state_machine) + } + /// Try to scan for row ids in the table. /// /// This function loads all row ids of a table if the rowids of table were not populated yet. @@ -1356,18 +1510,21 @@ fn is_begin_visible( fn is_end_visible( txs: &SkipMap>, - tx: &Transaction, - rv: &RowVersion, + current_tx: &Transaction, + row_version: &RowVersion, ) -> bool { - match rv.end { - Some(TxTimestampOrID::Timestamp(rv_end_ts)) => tx.begin_ts < rv_end_ts, + match row_version.end { + Some(TxTimestampOrID::Timestamp(rv_end_ts)) => current_tx.begin_ts < rv_end_ts, Some(TxTimestampOrID::TxID(rv_end)) => { - let te = txs.get(&rv_end).unwrap(); - let te = te.value().read(); - let visible = match te.state.load() { - TransactionState::Active => tx.tx_id != te.tx_id, + let other_tx = txs.get(&rv_end).unwrap(); + let other_tx = other_tx.value().read(); + let visible = match other_tx.state.load() { + // V's sharp mind discovered an issue with the hekaton paper which basically states that a + // transaction can see a row version if the end is a TXId only if it isn't the same transaction. 
+ // Source: https://avi.im/blag/2023/hekaton-paper-typo/ + TransactionState::Active => current_tx.tx_id != other_tx.tx_id, TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable! - TransactionState::Committed(committed_ts) => tx.begin_ts < committed_ts, + TransactionState::Committed(committed_ts) => current_tx.begin_ts < committed_ts, TransactionState::Aborted => false, TransactionState::Terminated => { tracing::debug!("TODO: should reread rv's end field - it should have updated the timestamp in the row version by now"); @@ -1375,9 +1532,9 @@ fn is_end_visible( } }; tracing::trace!( - "is_end_visible: tx={tx}, te={te} rv = {:?}-{:?} visible = {visible}", - rv.begin, - rv.end + "is_end_visible: tx={current_tx}, te={other_tx} rv = {:?}-{:?} visible = {visible}", + row_version.begin, + row_version.end ); visible } diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 138cb08f2..9972c7aca 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -1156,6 +1156,43 @@ fn test_connection_sees_other_connection_changes() { } } +#[test] +fn test_delete_with_conn() { + let db = MvccTestDbNoConn::new_with_random_db(); + let conn0 = db.connect(); + conn0.execute("CREATE TABLE test(t)").unwrap(); + + let mut inserts = vec![1, 2, 3, 4, 5, 6, 7]; + + for t in &inserts { + conn0 + .execute(format!("INSERT INTO test(t) VALUES ({t})")) + .unwrap(); + } + + conn0.execute("DELETE FROM test WHERE t = 5").unwrap(); + inserts.remove(4); + + let mut stmt = conn0.prepare("SELECT * FROM test").unwrap(); + let mut pos = 0; + loop { + let res = stmt.step().unwrap(); + match res { + StepResult::Row => { + let row = stmt.row().unwrap(); + let t = row.get_value(0).as_int().unwrap(); + assert_eq!(t, inserts[pos]); + pos += 1; + } + StepResult::Done => break, + StepResult::IO => { + stmt.run_once().unwrap(); + } + _ => panic!("Expected Row"), + } + } +} + fn get_record_value(row: &Row) -> 
ImmutableRecord { let mut record = ImmutableRecord::new(1024); record.start_serialization(&row.data); diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 2ee4e0b3e..0a3d67cd8 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -4421,7 +4421,11 @@ impl BTreeCursor { /// 9. Finish -> Delete operation is done. Return CursorResult(Ok()) #[instrument(skip(self), level = Level::DEBUG)] pub fn delete(&mut self) -> Result> { - assert!(self.mv_cursor.is_none()); + if let Some(mv_cursor) = &self.mv_cursor { + let rowid = mv_cursor.borrow_mut().current_row_id().unwrap(); + mv_cursor.borrow_mut().delete(rowid, self.pager.clone())?; + return Ok(IOResult::Done(())); + } if let CursorState::None = &self.state { self.state = CursorState::Delete(DeleteState::Start);