core/mvcc: persist writes on mvcc commit

On MVCC `commit_tx` we need to persist changes to the database; for this we re-use the pager's transaction semantics (a short sketch follows the list):
1. If there are no conflicts, we start a pager write transaction with `pager.begin_write_tx`.
2. `pager.end_tx`: we flush the changes to the WAL.
3. We finish the MVCC transaction by marking its rows with the new commit timestamp.
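For illustration, a minimal, self-contained sketch of that flow. The `Pager`, `Row`, and method signatures below are stand-ins rather than the crate's real API; only the ordering of the steps mirrors the change in this commit.

// Sketch only: stand-in types, not the crate's real Pager/MvStore API.
struct Pager;
struct Row;

impl Pager {
    // 1. Open a write transaction (the real call is `begin_write_tx` and may
    //    report Busy, which this commit surfaces as an error).
    fn begin_write_tx(&self) -> Result<(), String> {
        Ok(())
    }
    // Persist one MVCC row into the B-tree (stand-in for `write_row_to_pager`).
    fn write_row(&self, _row: &Row) -> Result<(), String> {
        Ok(())
    }
    // 2. Flush the dirty pages to the WAL (stand-in for `end_tx`).
    fn end_tx(&self) -> Result<(), String> {
        Ok(())
    }
}

fn commit_tx(pager: &Pager, write_set: &[Row], end_ts: u64) -> Result<(), String> {
    pager.begin_write_tx()?;
    for row in write_set {
        pager.write_row(row)?;
    }
    pager.end_tx()?;
    // 3. Mark the committed row versions with `end_ts` so other transactions
    //    can see them (elided here; done inside MvStore in the real code).
    let _ = end_ts;
    Ok(())
}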
Author: Pere Diaz Bou
Date: 2025-07-30 12:52:45 +02:00
parent 2233bb41c3
commit b4ac38cd25
4 changed files with 148 additions and 8 deletions

View File

@@ -1,9 +1,13 @@
use crate::mvcc::clock::LogicalClock;
use crate::mvcc::errors::DatabaseError;
use crate::mvcc::persistent_storage::Storage;
use crate::storage::btree::BTreeKey;
use crate::types::ImmutableRecord;
use crate::{Connection, Pager};
use crossbeam_skiplist::{SkipMap, SkipSet};
use parking_lot::RwLock;
use std::fmt::Debug;
use std::rc::Rc;
use std::sync::atomic::{AtomicU64, Ordering};
pub type Result<T> = std::result::Result<T, DatabaseError>;
@@ -13,6 +17,7 @@ mod tests;
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RowID {
/// The table ID. Analogous to table's root page number.
pub table_id: u64,
pub row_id: i64,
}
@@ -28,11 +33,16 @@ impl RowID {
pub struct Row {
pub id: RowID,
pub data: Vec<u8>,
pub column_count: usize,
}
impl Row {
- pub fn new(id: RowID, data: Vec<u8>) -> Self {
- Self { id, data }
+ pub fn new(id: RowID, data: Vec<u8>, column_count: usize) -> Self {
+ Self {
+ id,
+ data,
+ column_count,
+ }
}
}
@@ -412,7 +422,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
/// Gets all row ids in the database for a given table.
pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
tracing::trace!("scan_row_ids_for_table(table_id={})", table_id);
- Ok(self
+ let rows: Vec<RowID> = self
.rows
.range(
RowID {
@@ -424,7 +434,8 @@ impl<Clock: LogicalClock> MvStore<Clock> {
},
)
.map(|entry| *entry.key())
- .collect())
+ .collect();
+ Ok(rows)
}
pub fn get_row_id_range(
@@ -502,7 +513,12 @@ impl<Clock: LogicalClock> MvStore<Clock> {
/// # Arguments
///
/// * `tx_id` - The ID of the transaction to commit.
- pub fn commit_tx(&self, tx_id: TxID) -> Result<()> {
+ pub fn commit_tx(
+ &self,
+ tx_id: TxID,
+ pager: Rc<Pager>,
+ connection: &Arc<Connection>,
+ ) -> Result<()> {
let end_ts = self.get_timestamp();
// NOTICE: the first shadowed tx keeps the entry alive in the map
// for the duration of this whole function, which is important for correctness!
@@ -595,7 +611,63 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
drop(tx);
// Postprocessing: inserting row versions and logging the transaction to persistent storage.
// TODO: we should probably save to persistent storage first, and only then update the in-memory structures.
// FIXME: how do we deal with multiple concurrent writes?
// The WAL requires txns to be written sequentially. Either we:
// 1. Wait for the current writer to finish before the second txn starts.
// 2. Choose a txn to write based on some heuristic, like the number of frames that will be written.
// 3. ..
//
if let crate::types::IOResult::Done(result) = pager
.begin_write_tx()
.map_err(|e| DatabaseError::Io(e.to_string()))
.unwrap()
{
if let crate::result::LimboResult::Busy = result {
return Err(DatabaseError::Io(
"Pager write transaction busy".to_string(),
));
}
}
// 1. Write rows to btree for persistence
for ref id in &write_set {
if let Some(row_versions) = self.rows.get(id) {
let row_versions = row_versions.value().read().unwrap();
// Find rows that were written by this transaction
for row_version in row_versions.iter() {
if let TxTimestampOrID::TxID(row_tx_id) = row_version.begin {
if row_tx_id == tx_id {
self.write_row_to_pager(pager.clone(), &row_version.row)?;
break;
}
}
if let Some(TxTimestampOrID::Timestamp(row_tx_id)) = row_version.end {
if row_tx_id == tx_id {
self.write_row_to_pager(pager.clone(), &row_version.row)?;
break;
}
}
}
}
}
// Write committed data to pager for persistence
// Flush dirty pages to WAL - this is critical for data persistence
// Similar to what step_end_write_txn does for legacy transactions
loop {
let result = pager
.end_tx(
false, // rollback = false since we're committing
false, // schema_did_change = false for now (could be improved)
connection,
connection.wal_checkpoint_disabled.get(),
)
.map_err(|e| DatabaseError::Io(e.to_string()))
.unwrap();
if let crate::types::IOResult::Done(result) = result {
break;
}
}
// 2. Commit rows to log
let mut log_record = LogRecord::new(end_ts);
for ref id in write_set {
if let Some(row_versions) = self.rows.get(id) {
@@ -627,6 +699,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
}
}
tracing::trace!("updated(tx_id={})", tx_id);
// We have now updated all the versions with a reference to the
// transaction ID to a timestamp and can, therefore, remove the
// transaction. Please note that when we move to lockless, the
@@ -798,6 +871,60 @@ impl<Clock: LogicalClock> MvStore<Clock> {
}
versions.insert(position, row_version);
}
fn write_row_to_pager(&self, pager: Rc<Pager>, row: &Row) -> Result<()> {
use crate::storage::btree::BTreeCursor;
use crate::types::{IOResult, SeekKey, SeekOp};
// The row.data is already a properly serialized SQLite record payload
// Create an ImmutableRecord and copy the data
let mut record = ImmutableRecord::new(row.data.len());
record.start_serialization(&row.data);
// Create a BTreeKey for the row
let key = BTreeKey::new_table_rowid(row.id.row_id, Some(&record));
// Get the column count from the row
let root_page = row.id.table_id as usize;
let num_columns = row.column_count;
let mut cursor = BTreeCursor::new_table(
None, // Write directly to B-tree
pager,
root_page,
num_columns,
);
// Position the cursor first by seeking to the row position
let seek_key = SeekKey::TableRowId(row.id.row_id);
match cursor
.seek(seek_key, SeekOp::GE { eq_only: true })
.map_err(|e| DatabaseError::Io(e.to_string()))?
{
IOResult::Done(_) => {}
IOResult::IO => {
panic!("IOResult::IO not supported in write_row_to_pager seek");
}
}
// Insert the record into the B-tree
match cursor
.insert(&key, true)
.map_err(|e| DatabaseError::Io(e.to_string()))?
{
IOResult::Done(()) => {}
IOResult::IO => {
panic!("IOResult::IO not supported in write_row_to_pager insert");
}
}
tracing::trace!(
"write_row_to_pager(table_id={}, row_id={})",
row.id.table_id,
row.id.row_id
);
Ok(())
}
}
/// A write-write conflict happens when transaction T_current attempts to update a

View File

@@ -4433,7 +4433,13 @@ impl BTreeCursor {
Some(rowid) => {
let row_id = crate::mvcc::database::RowID::new(self.table_id() as u64, rowid);
let record_buf = key.get_record().unwrap().get_payload().to_vec();
- let row = crate::mvcc::database::Row::new(row_id, record_buf);
+ let num_columns = match key {
+ BTreeKey::IndexKey(record) => record.column_count(),
+ BTreeKey::TableRowId((rowid, record)) => {
+ record.as_ref().unwrap().column_count()
+ }
+ };
+ let row = crate::mvcc::database::Row::new(row_id, record_buf, num_columns);
mv_cursor.borrow_mut().insert(row).unwrap();
}
None => todo!("Support mvcc inserts with index btrees"),

View File

@@ -1156,6 +1156,12 @@ impl ImmutableRecord {
Err(_) => None,
}
}
pub fn column_count(&self) -> usize {
let mut cursor = RecordCursor::new();
cursor.parse_full_header(self).unwrap();
cursor.offsets.len()
}
}
/// A cursor for lazily parsing SQLite record format data.

View File

@@ -439,9 +439,10 @@ impl Program {
let conn = self.connection.clone();
let auto_commit = conn.auto_commit.get();
if auto_commit {
// FIXME: we don't want to commit stuff from other programs.
let mut mv_transactions = conn.mv_transactions.borrow_mut();
for tx_id in mv_transactions.iter() {
- mv_store.commit_tx(*tx_id).unwrap();
+ mv_store.commit_tx(*tx_id, pager.clone(), &conn).unwrap();
}
mv_transactions.clear();
}