From b4ac38cd25b032701f2d643444266b033da0d04d Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 30 Jul 2025 12:52:45 +0200 Subject: [PATCH] core/mvcc: persist writes on mvcc commit On Mvcc `commit_txn` we need to persist changes to database, for this case we re-use pager's semantics of transactions: 1. If there are no conflicts, we start `pager.begin_write_txn` 2. `pager.end_txn`: We flush changes to WAL 3. We finish Mvcc transaction by marking rows with new timestamp. --- core/mvcc/database/mod.rs | 139 ++++++++++++++++++++++++++++++++++++-- core/storage/btree.rs | 8 ++- core/types.rs | 6 ++ core/vdbe/mod.rs | 3 +- 4 files changed, 148 insertions(+), 8 deletions(-) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index b3f71a871..47faae986 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1,9 +1,13 @@ use crate::mvcc::clock::LogicalClock; use crate::mvcc::errors::DatabaseError; use crate::mvcc::persistent_storage::Storage; +use crate::storage::btree::BTreeKey; +use crate::types::ImmutableRecord; +use crate::{Connection, Pager}; use crossbeam_skiplist::{SkipMap, SkipSet}; use parking_lot::RwLock; use std::fmt::Debug; +use std::rc::Rc; use std::sync::atomic::{AtomicU64, Ordering}; pub type Result = std::result::Result; @@ -13,6 +17,7 @@ mod tests; #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct RowID { + /// The table ID. Analogous to table's root page number. pub table_id: u64, pub row_id: i64, } @@ -28,11 +33,16 @@ impl RowID { pub struct Row { pub id: RowID, pub data: Vec, + pub column_count: usize, } impl Row { - pub fn new(id: RowID, data: Vec) -> Self { - Self { id, data } + pub fn new(id: RowID, data: Vec, column_count: usize) -> Self { + Self { + id, + data, + column_count, + } } } @@ -412,7 +422,7 @@ impl MvStore { /// Gets all row ids in the database for a given table. pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { tracing::trace!("scan_row_ids_for_table(table_id={})", table_id); - Ok(self + let rows: Vec = self .rows .range( RowID { @@ -424,7 +434,8 @@ impl MvStore { }, ) .map(|entry| *entry.key()) - .collect()) + .collect(); + Ok(rows) } pub fn get_row_id_range( @@ -502,7 +513,12 @@ impl MvStore { /// # Arguments /// /// * `tx_id` - The ID of the transaction to commit. - pub fn commit_tx(&self, tx_id: TxID) -> Result<()> { + pub fn commit_tx( + &self, + tx_id: TxID, + pager: Rc, + connection: &Arc, + ) -> Result<()> { let end_ts = self.get_timestamp(); // NOTICE: the first shadowed tx keeps the entry alive in the map // for the duration of this whole function, which is important for correctness! @@ -595,7 +611,63 @@ impl MvStore { let write_set: Vec = tx.write_set.iter().map(|v| *v.value()).collect(); drop(tx); // Postprocessing: inserting row versions and logging the transaction to persistent storage. - // TODO: we should probably save to persistent storage first, and only then update the in-memory structures. + + // FIXME: how do we deal with multiple concurrent writes? + // WAL requires a txn to be written sequentially. Either we: + // 1. Wait for currently writer to finish before second txn starts. + // 2. Choose a txn to write depending on some heuristics like amount of frames will be written. + // 3. .. + // + if let crate::types::IOResult::Done(result) = pager + .begin_write_tx() + .map_err(|e| DatabaseError::Io(e.to_string())) + .unwrap() + { + if let crate::result::LimboResult::Busy = result { + return Err(DatabaseError::Io( + "Pager write transaction busy".to_string(), + )); + } + } + // 1. Write rows to btree for persistence + for ref id in &write_set { + if let Some(row_versions) = self.rows.get(id) { + let row_versions = row_versions.value().read().unwrap(); + // Find rows that were written by this transaction + for row_version in row_versions.iter() { + if let TxTimestampOrID::TxID(row_tx_id) = row_version.begin { + if row_tx_id == tx_id { + self.write_row_to_pager(pager.clone(), &row_version.row)?; + break; + } + } + if let Some(TxTimestampOrID::Timestamp(row_tx_id)) = row_version.end { + if row_tx_id == tx_id { + self.write_row_to_pager(pager.clone(), &row_version.row)?; + break; + } + } + } + } + } + // Write committed data to pager for persistence + // Flush dirty pages to WAL - this is critical for data persistence + // Similar to what step_end_write_txn does for legacy transactions + loop { + let result = pager + .end_tx( + false, // rollback = false since we're committing + false, // schema_did_change = false for now (could be improved) + connection, + connection.wal_checkpoint_disabled.get(), + ) + .map_err(|e| DatabaseError::Io(e.to_string())) + .unwrap(); + if let crate::types::IOResult::Done(result) = result { + break; + } + } + // 2. Commit rows to log let mut log_record = LogRecord::new(end_ts); for ref id in write_set { if let Some(row_versions) = self.rows.get(id) { @@ -627,6 +699,7 @@ impl MvStore { } } tracing::trace!("updated(tx_id={})", tx_id); + // We have now updated all the versions with a reference to the // transaction ID to a timestamp and can, therefore, remove the // transaction. Please note that when we move to lockless, the @@ -798,6 +871,60 @@ impl MvStore { } versions.insert(position, row_version); } + + fn write_row_to_pager(&self, pager: Rc, row: &Row) -> Result<()> { + use crate::storage::btree::BTreeCursor; + use crate::types::{IOResult, SeekKey, SeekOp}; + + // The row.data is already a properly serialized SQLite record payload + // Create an ImmutableRecord and copy the data + let mut record = ImmutableRecord::new(row.data.len()); + record.start_serialization(&row.data); + + // Create a BTreeKey for the row + let key = BTreeKey::new_table_rowid(row.id.row_id, Some(&record)); + + // Get the column count from the row + let root_page = row.id.table_id as usize; + let num_columns = row.column_count; + + let mut cursor = BTreeCursor::new_table( + None, // Write directly to B-tree + pager, + root_page, + num_columns, + ); + + // Position the cursor first by seeking to the row position + let seek_key = SeekKey::TableRowId(row.id.row_id); + match cursor + .seek(seek_key, SeekOp::GE { eq_only: true }) + .map_err(|e| DatabaseError::Io(e.to_string()))? + { + IOResult::Done(_) => {} + IOResult::IO => { + panic!("IOResult::IO not supported in write_row_to_pager seek"); + } + } + + // Insert the record into the B-tree + match cursor + .insert(&key, true) + .map_err(|e| DatabaseError::Io(e.to_string()))? + { + IOResult::Done(()) => {} + IOResult::IO => { + panic!("IOResult::IO not supported in write_row_to_pager insert"); + } + } + + tracing::trace!( + "write_row_to_pager(table_id={}, row_id={})", + row.id.table_id, + row.id.row_id + ); + Ok(()) + } } /// A write-write conflict happens when transaction T_current attempts to update a diff --git a/core/storage/btree.rs b/core/storage/btree.rs index c664a863a..db64e578c 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -4433,7 +4433,13 @@ impl BTreeCursor { Some(rowid) => { let row_id = crate::mvcc::database::RowID::new(self.table_id() as u64, rowid); let record_buf = key.get_record().unwrap().get_payload().to_vec(); - let row = crate::mvcc::database::Row::new(row_id, record_buf); + let num_columns = match key { + BTreeKey::IndexKey(record) => record.column_count(), + BTreeKey::TableRowId((rowid, record)) => { + record.as_ref().unwrap().column_count() + } + }; + let row = crate::mvcc::database::Row::new(row_id, record_buf, num_columns); mv_cursor.borrow_mut().insert(row).unwrap(); } None => todo!("Support mvcc inserts with index btrees"), diff --git a/core/types.rs b/core/types.rs index 2adfd30de..b3f118fb6 100644 --- a/core/types.rs +++ b/core/types.rs @@ -1156,6 +1156,12 @@ impl ImmutableRecord { Err(_) => None, } } + + pub fn column_count(&self) -> usize { + let mut cursor = RecordCursor::new(); + cursor.parse_full_header(self).unwrap(); + cursor.offsets.len() + } } /// A cursor for lazily parsing SQLite record format data. diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 1e67932d3..eb2a24bb1 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -439,9 +439,10 @@ impl Program { let conn = self.connection.clone(); let auto_commit = conn.auto_commit.get(); if auto_commit { + // FIXME: we don't want to commit stuff from other programs. let mut mv_transactions = conn.mv_transactions.borrow_mut(); for tx_id in mv_transactions.iter() { - mv_store.commit_tx(*tx_id).unwrap(); + mv_store.commit_tx(*tx_id, pager.clone(), &conn).unwrap(); } mv_transactions.clear(); }