Merge 'core/mvcc: support for MVCC' from Pere Diaz Bou

This PR tries to add simple support for delete, with limited testing for
now.
Moreover, there was an error with `forward`, which wasn't obvious
without delete, which didn't skip deleted rows.

Reviewed-by: Avinash Sajjanshetty (@avinassh)

Closes #2672
This commit is contained in:
Pekka Enberg
2025-08-20 16:51:31 +03:00
committed by GitHub
4 changed files with 225 additions and 22 deletions

View File

@@ -52,6 +52,11 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
Ok(())
}
pub fn delete(&mut self, rowid: RowID, pager: Rc<Pager>) -> Result<()> {
self.db.delete(self.tx_id, rowid, pager)?;
Ok(())
}
pub fn current_row_id(&mut self) -> Option<RowID> {
match self.current_pos {
CursorPosition::Loaded(id) => Some(id),

View File

@@ -246,6 +246,7 @@ pub enum CommitState {
BeginPagerTxn { end_ts: u64 },
WriteRow { end_ts: u64, write_set_index: usize },
WriteRowStateMachine { end_ts: u64, write_set_index: usize },
DeleteRowStateMachine { end_ts: u64, write_set_index: usize },
CommitPagerTxn { end_ts: u64 },
Commit { end_ts: u64 },
}
@@ -266,6 +267,7 @@ pub struct CommitStateMachine<Clock: LogicalClock> {
connection: Arc<Connection>,
write_set: Vec<RowID>,
write_row_state_machine: Option<StateMachine<WriteRowStateMachine>>,
delete_row_state_machine: Option<StateMachine<DeleteRowStateMachine>>,
_phantom: PhantomData<Clock>,
}
@@ -278,6 +280,23 @@ pub struct WriteRowStateMachine {
cursor: Option<BTreeCursor>,
}
#[derive(Debug)]
pub enum DeleteRowState {
Initial,
CreateCursor,
Seek,
Delete,
}
pub struct DeleteRowStateMachine {
state: DeleteRowState,
is_finalized: bool,
pager: Rc<Pager>,
rowid: RowID,
column_count: usize,
cursor: Option<BTreeCursor>,
}
impl<Clock: LogicalClock> CommitStateMachine<Clock> {
fn new(state: CommitState, pager: Rc<Pager>, tx_id: TxID, connection: Arc<Connection>) -> Self {
Self {
@@ -288,6 +307,7 @@ impl<Clock: LogicalClock> CommitStateMachine<Clock> {
connection,
write_set: Vec::new(),
write_row_state_machine: None,
delete_row_state_machine: None,
_phantom: PhantomData,
}
}
@@ -457,12 +477,16 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
break;
}
}
if let Some(TxTimestampOrID::Timestamp(row_tx_id)) = row_version.end {
if let Some(TxTimestampOrID::TxID(row_tx_id)) = row_version.end {
if row_tx_id == self.tx_id {
let state_machine = mvcc_store
.write_row_to_pager(self.pager.clone(), &row_version.row)?;
self.write_row_state_machine = Some(state_machine);
self.state = CommitState::WriteRowStateMachine {
let column_count = row_version.row.column_count;
let state_machine = mvcc_store.delete_row_from_pager(
self.pager.clone(),
row_version.row.id,
column_count,
)?;
self.delete_row_state_machine = Some(state_machine);
self.state = CommitState::DeleteRowStateMachine {
end_ts,
write_set_index,
};
@@ -473,6 +497,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
}
Ok(TransitionResult::Continue)
}
CommitState::WriteRowStateMachine {
end_ts,
write_set_index,
@@ -492,6 +517,25 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
}
}
}
CommitState::DeleteRowStateMachine {
end_ts,
write_set_index,
} => {
let delete_row_state_machine = self.delete_row_state_machine.as_mut().unwrap();
match delete_row_state_machine.step(&())? {
TransitionResult::Io(io) => return Ok(TransitionResult::Io(io)),
TransitionResult::Continue => {
return Ok(TransitionResult::Continue);
}
TransitionResult::Done(_) => {
self.state = CommitState::WriteRow {
end_ts,
write_set_index: write_set_index + 1,
};
return Ok(TransitionResult::Continue);
}
}
}
CommitState::CommitPagerTxn { end_ts } => {
// Write committed data to pager for persistence
// Flush dirty pages to WAL - this is critical for data persistence
@@ -667,6 +711,93 @@ impl StateTransition for WriteRowStateMachine {
}
}
impl StateTransition for DeleteRowStateMachine {
type Context = ();
type SMResult = ();
#[tracing::instrument(fields(state = ?self.state), skip(self, _context))]
fn step(&mut self, _context: &Self::Context) -> Result<TransitionResult<Self::SMResult>> {
use crate::storage::btree::BTreeCursor;
use crate::types::{IOResult, SeekKey, SeekOp};
match self.state {
DeleteRowState::Initial => {
self.state = DeleteRowState::CreateCursor;
Ok(TransitionResult::Continue)
}
DeleteRowState::CreateCursor => {
let root_page = self.rowid.table_id as usize;
let num_columns = self.column_count;
let cursor =
BTreeCursor::new_table(None, self.pager.clone(), root_page, num_columns);
self.cursor = Some(cursor);
self.state = DeleteRowState::Seek;
Ok(TransitionResult::Continue)
}
DeleteRowState::Seek => {
let seek_key = SeekKey::TableRowId(self.rowid.row_id);
let cursor = self.cursor.as_mut().unwrap();
match cursor.seek(seek_key, SeekOp::GE { eq_only: true })? {
IOResult::Done(_) => {
self.state = DeleteRowState::Delete;
Ok(TransitionResult::Continue)
}
IOResult::IO(io) => {
return Ok(TransitionResult::Io(io));
}
}
}
DeleteRowState::Delete => {
// Insert the record into the B-tree
let cursor = self.cursor.as_mut().unwrap();
match cursor
.delete()
.map_err(|e| LimboError::InternalError(e.to_string()))?
{
IOResult::Done(()) => {
tracing::trace!(
"delete_row_from_pager(table_id={}, row_id={})",
self.rowid.table_id,
self.rowid.row_id
);
self.finalize(&())?;
Ok(TransitionResult::Done(()))
}
IOResult::IO(io) => {
return Ok(TransitionResult::Io(io));
}
}
}
}
}
fn finalize(&mut self, _context: &Self::Context) -> Result<()> {
self.is_finalized = true;
Ok(())
}
fn is_finalized(&self) -> bool {
self.is_finalized
}
}
impl DeleteRowStateMachine {
fn new(pager: Rc<Pager>, rowid: RowID, column_count: usize) -> Self {
Self {
state: DeleteRowState::Initial,
is_finalized: false,
pager,
rowid,
column_count,
cursor: None,
}
}
}
/// A multi-version concurrency control database.
#[derive(Debug)]
pub struct MvStore<Clock: LogicalClock> {
@@ -911,10 +1042,18 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let tx = self.txs.get(&tx_id).unwrap();
let tx = tx.value().read();
let mut rows = self.rows.range(min_bound..max_bound);
rows.next().and_then(|row| {
// Find last valid version based on transaction.
self.find_last_visible_version(&tx, row)
})
loop {
// We are moving forward, so if a row was deleted we just need to skip it. Therefore, we need
// to loop either until we find a row that is not deleted or until we reach the end of the table.
let next_row = rows.next();
let row = next_row?;
// We found a row, let's check if it's visible to the transaction.
if let Some(visible_row) = self.find_last_visible_version(&tx, row) {
return Some(visible_row);
}
// If this row is not visible, continue to the next row
}
}
fn find_last_visible_version(
@@ -1167,6 +1306,21 @@ impl<Clock: LogicalClock> MvStore<Clock> {
Ok(state_machine)
}
pub fn delete_row_from_pager(
&self,
pager: Rc<Pager>,
rowid: RowID,
column_count: usize,
) -> Result<StateMachine<DeleteRowStateMachine>> {
let state_machine: StateMachine<DeleteRowStateMachine> = StateMachine::<
DeleteRowStateMachine,
>::new(
DeleteRowStateMachine::new(pager, rowid, column_count),
);
Ok(state_machine)
}
/// Try to scan for row ids in the table.
///
/// This function loads all row ids of a table if the rowids of table were not populated yet.
@@ -1356,18 +1510,21 @@ fn is_begin_visible(
fn is_end_visible(
txs: &SkipMap<TxID, RwLock<Transaction>>,
tx: &Transaction,
rv: &RowVersion,
current_tx: &Transaction,
row_version: &RowVersion,
) -> bool {
match rv.end {
Some(TxTimestampOrID::Timestamp(rv_end_ts)) => tx.begin_ts < rv_end_ts,
match row_version.end {
Some(TxTimestampOrID::Timestamp(rv_end_ts)) => current_tx.begin_ts < rv_end_ts,
Some(TxTimestampOrID::TxID(rv_end)) => {
let te = txs.get(&rv_end).unwrap();
let te = te.value().read();
let visible = match te.state.load() {
TransactionState::Active => tx.tx_id != te.tx_id,
let other_tx = txs.get(&rv_end).unwrap();
let other_tx = other_tx.value().read();
let visible = match other_tx.state.load() {
// V's sharp mind discovered an issue with the hekaton paper which basically states that a
// transaction can see a row version if the end is a TXId only if it isn't the same transaction.
// Source: https://avi.im/blag/2023/hekaton-paper-typo/
TransactionState::Active => current_tx.tx_id != other_tx.tx_id,
TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable!
TransactionState::Committed(committed_ts) => tx.begin_ts < committed_ts,
TransactionState::Committed(committed_ts) => current_tx.begin_ts < committed_ts,
TransactionState::Aborted => false,
TransactionState::Terminated => {
tracing::debug!("TODO: should reread rv's end field - it should have updated the timestamp in the row version by now");
@@ -1375,9 +1532,9 @@ fn is_end_visible(
}
};
tracing::trace!(
"is_end_visible: tx={tx}, te={te} rv = {:?}-{:?} visible = {visible}",
rv.begin,
rv.end
"is_end_visible: tx={current_tx}, te={other_tx} rv = {:?}-{:?} visible = {visible}",
row_version.begin,
row_version.end
);
visible
}

View File

@@ -1156,6 +1156,43 @@ fn test_connection_sees_other_connection_changes() {
}
}
#[test]
fn test_delete_with_conn() {
let db = MvccTestDbNoConn::new_with_random_db();
let conn0 = db.connect();
conn0.execute("CREATE TABLE test(t)").unwrap();
let mut inserts = vec![1, 2, 3, 4, 5, 6, 7];
for t in &inserts {
conn0
.execute(format!("INSERT INTO test(t) VALUES ({t})"))
.unwrap();
}
conn0.execute("DELETE FROM test WHERE t = 5").unwrap();
inserts.remove(4);
let mut stmt = conn0.prepare("SELECT * FROM test").unwrap();
let mut pos = 0;
loop {
let res = stmt.step().unwrap();
match res {
StepResult::Row => {
let row = stmt.row().unwrap();
let t = row.get_value(0).as_int().unwrap();
assert_eq!(t, inserts[pos]);
pos += 1;
}
StepResult::Done => break,
StepResult::IO => {
stmt.run_once().unwrap();
}
_ => panic!("Expected Row"),
}
}
}
fn get_record_value(row: &Row) -> ImmutableRecord {
let mut record = ImmutableRecord::new(1024);
record.start_serialization(&row.data);

View File

@@ -4421,7 +4421,11 @@ impl BTreeCursor {
/// 9. Finish -> Delete operation is done. Return CursorResult(Ok())
#[instrument(skip(self), level = Level::DEBUG)]
pub fn delete(&mut self) -> Result<IOResult<()>> {
assert!(self.mv_cursor.is_none());
if let Some(mv_cursor) = &self.mv_cursor {
let rowid = mv_cursor.borrow_mut().current_row_id().unwrap();
mv_cursor.borrow_mut().delete(rowid, self.pager.clone())?;
return Ok(IOResult::Done(()));
}
if let CursorState::None = &self.state {
self.state = CursorState::Delete(DeleteState::Start);