From 39fb5913e07803375c224bf2f089bfe9671c80da Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Fri, 12 Sep 2025 14:00:02 +0000 Subject: [PATCH] core/mvcc: queue write txn commits in mvcc on pager end_tx Flushing mvcc changes to disk requires serialization. To do so we simply introduce a lock for pager.end_tx, which will take ownership of flushing to WAL. Once this is finished we can simply release lock. --- core/mvcc/database/mod.rs | 407 +++++++++++++++++++++++++------------- 1 file changed, 271 insertions(+), 136 deletions(-) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index dc67120f9..08548a94b 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -7,14 +7,19 @@ use crate::state_machine::StateTransition; use crate::state_machine::TransitionResult; use crate::storage::btree::BTreeCursor; use crate::storage::btree::BTreeKey; +use crate::storage::btree::CursorValidState; +use crate::storage::sqlite3_ondisk::DatabaseHeader; +use crate::storage::wal::TursoRwLock; use crate::types::IOResult; use crate::types::ImmutableRecord; +use crate::Completion; use crate::IOExt; use crate::LimboError; use crate::Result; use crate::{Connection, Pager}; use crossbeam_skiplist::{SkipMap, SkipSet}; use parking_lot::RwLock; +use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Debug; use std::marker::PhantomData; @@ -245,20 +250,43 @@ impl AtomicTransactionState { #[derive(Debug)] pub enum CommitState { Initial, - BeginPagerTxn { end_ts: u64 }, - WriteRow { end_ts: u64, write_set_index: usize }, - WriteRowStateMachine { end_ts: u64, write_set_index: usize }, - DeleteRowStateMachine { end_ts: u64, write_set_index: usize }, - CommitPagerTxn { end_ts: u64 }, - Commit { end_ts: u64 }, + BeginPagerTxn { + end_ts: u64, + }, + WriteRow { + end_ts: u64, + write_set_index: usize, + requires_seek: bool, + }, + WriteRowStateMachine { + end_ts: u64, + write_set_index: usize, + }, + DeleteRowStateMachine { + end_ts: u64, + 
write_set_index: usize, + }, + CommitPagerTxn { + end_ts: u64, + }, + Commit { + end_ts: u64, + }, } #[derive(Debug)] pub enum WriteRowState { Initial, - CreateCursor, Seek, Insert, + /// Move to the next record in order to leave the cursor in the next position, this is used for inserting multiple rows for optimizations. + Next, +} + +#[derive(Debug)] +struct CommitCoordinator { + pager_commit_lock: Arc, + commits_waiting: Arc, } pub struct CommitStateMachine { @@ -267,9 +295,13 @@ pub struct CommitStateMachine { pager: Rc, tx_id: TxID, connection: Arc, + /// Write set sorted by table id and row id write_set: Vec, write_row_state_machine: Option>, delete_row_state_machine: Option>, + commit_coordinator: Arc, + cursors: HashMap>>, + header: Arc>>, _phantom: PhantomData, } @@ -285,16 +317,15 @@ impl Debug for CommitStateMachine { pub struct WriteRowStateMachine { state: WriteRowState, is_finalized: bool, - pager: Rc, row: Row, record: Option, - cursor: Option, + cursor: Arc>, + requires_seek: bool, } #[derive(Debug)] pub enum DeleteRowState { Initial, - CreateCursor, Seek, Delete, } @@ -302,14 +333,19 @@ pub enum DeleteRowState { pub struct DeleteRowStateMachine { state: DeleteRowState, is_finalized: bool, - pager: Rc, rowid: RowID, - column_count: usize, - cursor: Option, + cursor: Arc>, } impl CommitStateMachine { - fn new(state: CommitState, pager: Rc, tx_id: TxID, connection: Arc) -> Self { + fn new( + state: CommitState, + pager: Rc, + tx_id: TxID, + connection: Arc, + commit_coordinator: Arc, + header: Arc>>, + ) -> Self { Self { state, is_finalized: false, @@ -319,20 +355,23 @@ impl CommitStateMachine { write_set: Vec::new(), write_row_state_machine: None, delete_row_state_machine: None, + commit_coordinator, + cursors: HashMap::new(), + header, _phantom: PhantomData, } } } impl WriteRowStateMachine { - fn new(pager: Rc, row: Row) -> Self { + fn new(row: Row, cursor: Arc>, requires_seek: bool) -> Self { Self { state: WriteRowState::Initial, is_finalized: 
false, - pager, row, record: None, - cursor: None, + cursor, + requires_seek, } } } @@ -441,6 +480,8 @@ impl StateTransition for CommitStateMachine { tracing::trace!("commit_tx(tx_id={})", self.tx_id); self.write_set .extend(tx.write_set.iter().map(|v| *v.value())); + self.write_set + .sort_by(|a, b| a.table_id.cmp(&b.table_id).then(a.row_id.cmp(&b.row_id))); self.state = CommitState::BeginPagerTxn { end_ts }; Ok(TransitionResult::Continue) } @@ -453,21 +494,62 @@ impl StateTransition for CommitStateMachine { // If this is the exclusive transaction, we already acquired a write transaction // on the pager in begin_exclusive_tx() and don't need to acquire it. - if !mvcc_store.is_exclusive_tx(&self.tx_id) { - let result = self.pager.io.block(|| self.pager.begin_write_tx())?; - if let LimboResult::Busy = result { - return Err(LimboError::Busy); + if mvcc_store.is_exclusive_tx(&self.tx_id) { + self.state = CommitState::WriteRow { + end_ts, + write_set_index: 0, + requires_seek: true, + }; + return Ok(TransitionResult::Continue); + } + // Currently txns are queued without any heuristics whatsoever. This is important because + // we need to ensure writes to disk happen sequentially. + // * We don't want txns to write to WAL in parallel. + // * We don't want BTree modifications to happen in parallel. + // If any of these were to happen, we would find ourselves in a bad corruption situation. + + // NOTE: since we are blocking for `begin_write_tx` we do not care about re-entrancy right now. + let locked = self.commit_coordinator.pager_commit_lock.write(); + if !locked { + self.commit_coordinator + .commits_waiting + .fetch_add(1, Ordering::SeqCst); + // FIXME: IOCompletions still needs a yield variant...
+ return Ok(TransitionResult::Io(crate::types::IOCompletions::Single( + Completion::new_dummy(), + ))); + } + { + let mut wal = self.pager.wal.as_ref().unwrap().borrow_mut(); + // we need to update the max frame to the latest shared max frame in order to avoid snapshot staleness + wal.update_max_frame(); + } + // TODO: Force updated header? + { + if let Some(last_commited_header) = self.header.read().as_ref() { + self.pager.io.block(|| { + self.pager.with_header_mut(|header_in_pager| { + header_in_pager.database_size = last_commited_header.database_size; + // TODO: deal with other fields + }) + })?; } } + let result = self.pager.io.block(|| self.pager.begin_write_tx())?; + if let crate::result::LimboResult::Busy = result { + panic!("Pager write transaction busy, in mvcc this should never happen"); + } self.state = CommitState::WriteRow { end_ts, write_set_index: 0, + requires_seek: true, }; return Ok(TransitionResult::Continue); } CommitState::WriteRow { end_ts, write_set_index, + requires_seek, } => { if write_set_index == self.write_set.len() { self.state = CommitState::CommitPagerTxn { end_ts }; @@ -480,9 +562,26 @@ impl StateTransition for CommitStateMachine { for row_version in row_versions.iter() { if let TxTimestampOrID::TxID(row_tx_id) = row_version.begin { if row_tx_id == self.tx_id { - let state_machine = mvcc_store - .write_row_to_pager(self.pager.clone(), &row_version.row)?; + let cursor = if let Some(cursor) = self.cursors.get(&id.table_id) { + cursor.clone() + } else { + let cursor = BTreeCursor::new_table( + None, // Write directly to B-tree + self.pager.clone(), + id.table_id as usize, + row_version.row.column_count, + ); + let cursor = Arc::new(RwLock::new(cursor)); + self.cursors.insert(id.table_id, cursor.clone()); + cursor + }; + let state_machine = mvcc_store.write_row_to_pager( + &row_version.row, + cursor, + requires_seek, + )?; self.write_row_state_machine = Some(state_machine); + self.state = CommitState::WriteRowStateMachine { end_ts, 
write_set_index, @@ -493,11 +592,21 @@ impl StateTransition for CommitStateMachine { if let Some(TxTimestampOrID::TxID(row_tx_id)) = row_version.end { if row_tx_id == self.tx_id { let column_count = row_version.row.column_count; - let state_machine = mvcc_store.delete_row_from_pager( - self.pager.clone(), - row_version.row.id, - column_count, - )?; + let cursor = if let Some(cursor) = self.cursors.get(&id.table_id) { + cursor.clone() + } else { + let cursor = BTreeCursor::new_table( + None, // Write directly to B-tree + self.pager.clone(), + id.table_id as usize, + column_count, + ); + let cursor = Arc::new(RwLock::new(cursor)); + self.cursors.insert(id.table_id, cursor.clone()); + cursor + }; + let state_machine = + mvcc_store.delete_row_from_pager(row_version.row.id, cursor)?; self.delete_row_state_machine = Some(state_machine); self.state = CommitState::DeleteRowStateMachine { end_ts, @@ -522,9 +631,26 @@ impl StateTransition for CommitStateMachine { return Ok(TransitionResult::Continue); } TransitionResult::Done(_) => { + let requires_seek = { + if let Some(next_id) = self.write_set.get(write_set_index + 1) { + let current_id = &self.write_set[write_set_index]; + if current_id.table_id == next_id.table_id + && current_id.row_id + 1 == next_id.row_id + { + // simple optimization for sequential inserts with ids increasing by 1 + // we should probably just check record in next row and see if it requires seek + false + } else { + true + } + } else { + false + } + }; self.state = CommitState::WriteRow { end_ts, write_set_index: write_set_index + 1, + requires_seek, }; return Ok(TransitionResult::Continue); } @@ -544,6 +670,7 @@ impl StateTransition for CommitStateMachine { self.state = CommitState::WriteRow { end_ts, write_set_index: write_set_index + 1, + requires_seek: true, }; return Ok(TransitionResult::Continue); } @@ -553,27 +680,32 @@ impl StateTransition for CommitStateMachine { // Write committed data to pager for persistence // Flush dirty pages to WAL -
this is critical for data persistence // Similar to what step_end_write_txn does for legacy transactions - loop { - let result = self - .pager - .end_tx( - false, // rollback = false since we're committing - &self.connection, - ) - .map_err(|e| LimboError::InternalError(e.to_string())) - .unwrap(); - match result { - crate::types::IOResult::Done(_) => { - break; - } - crate::types::IOResult::IO(io) => { - io.wait(self.pager.io.as_ref())?; - continue; - } + + let result = self + .pager + .end_tx( + false, // rollback = false since we're committing + &self.connection, + ) + .map_err(|e| LimboError::InternalError(e.to_string())) + .unwrap(); + match result { + IOResult::Done(_) => { + // FIXME: hack for now to keep database header updated for pager commit + self.pager.io.block(|| { + self.pager.with_header(|header| { + self.header.write().replace(*header); + }) + })?; + self.commit_coordinator.pager_commit_lock.unlock(); + // TODO: here mark we are ready for a batch + self.state = CommitState::Commit { end_ts }; + return Ok(TransitionResult::Continue); + } + IOResult::IO(io) => { + return Ok(TransitionResult::Io(io)); } } - self.state = CommitState::Commit { end_ts }; - Ok(TransitionResult::Continue) } CommitState::Commit { end_ts } => { let mut log_record = LogRecord::new(end_ts); @@ -648,7 +780,6 @@ impl StateTransition for WriteRowStateMachine { #[tracing::instrument(fields(state = ?self.state), skip(self, _context))] fn step(&mut self, _context: &Self::Context) -> Result> { - use crate::storage::btree::BTreeCursor; use crate::types::{IOResult, SeekKey, SeekOp}; match self.state { @@ -658,62 +789,63 @@ impl StateTransition for WriteRowStateMachine { record.start_serialization(&self.row.data); self.record = Some(record); - self.state = WriteRowState::CreateCursor; - Ok(TransitionResult::Continue) - } - WriteRowState::CreateCursor => { - // Create the cursor - let root_page = self.row.id.table_id as usize; - let num_columns = self.row.column_count; - - let cursor = 
BTreeCursor::new_table( - None, // Write directly to B-tree - self.pager.clone(), - root_page, - num_columns, - ); - self.cursor = Some(cursor); - - self.state = WriteRowState::Seek; + if self.requires_seek { + self.state = WriteRowState::Seek; + } else { + self.state = WriteRowState::Insert; + } Ok(TransitionResult::Continue) } WriteRowState::Seek => { // Position the cursor by seeking to the row position let seek_key = SeekKey::TableRowId(self.row.id.row_id); - let cursor = self.cursor.as_mut().unwrap(); - match cursor.seek(seek_key, SeekOp::GE { eq_only: true })? { - IOResult::Done(_) => { - self.state = WriteRowState::Insert; - Ok(TransitionResult::Continue) - } + match self + .cursor + .write() + .seek(seek_key, SeekOp::GE { eq_only: true })? + { + IOResult::Done(_) => {} IOResult::IO(io) => { return Ok(TransitionResult::Io(io)); } } + assert_eq!(self.cursor.write().valid_state, CursorValidState::Valid); + self.state = WriteRowState::Insert; + Ok(TransitionResult::Continue) } WriteRowState::Insert => { // Insert the record into the B-tree - let cursor = self.cursor.as_mut().unwrap(); let key = BTreeKey::new_table_rowid(self.row.id.row_id, self.record.as_ref()); - match cursor + match self + .cursor + .write() .insert(&key) - .map_err(|e| LimboError::InternalError(e.to_string()))? + .map_err(|e: LimboError| LimboError::InternalError(e.to_string()))? { - IOResult::Done(()) => { - tracing::trace!( - "write_row_to_pager(table_id={}, row_id={})", - self.row.id.table_id, - self.row.id.row_id - ); - self.finalize(&())?; - Ok(TransitionResult::Done(())) - } + IOResult::Done(()) => {} IOResult::IO(io) => { return Ok(TransitionResult::Io(io)); } } + self.state = WriteRowState::Next; + Ok(TransitionResult::Continue) + } + WriteRowState::Next => { + match self + .cursor + .write() + .next() + .map_err(|e: LimboError| LimboError::InternalError(e.to_string()))? 
+ { + IOResult::Done(_) => {} + IOResult::IO(io) => { + return Ok(TransitionResult::Io(io)); + } + } + self.finalize(&())?; + Ok(TransitionResult::Done(())) } } } @@ -734,30 +866,21 @@ impl StateTransition for DeleteRowStateMachine { #[tracing::instrument(fields(state = ?self.state), skip(self, _context))] fn step(&mut self, _context: &Self::Context) -> Result> { - use crate::storage::btree::BTreeCursor; use crate::types::{IOResult, SeekKey, SeekOp}; match self.state { DeleteRowState::Initial => { - self.state = DeleteRowState::CreateCursor; - Ok(TransitionResult::Continue) - } - DeleteRowState::CreateCursor => { - let root_page = self.rowid.table_id as usize; - let num_columns = self.column_count; - - let cursor = - BTreeCursor::new_table(None, self.pager.clone(), root_page, num_columns); - self.cursor = Some(cursor); - self.state = DeleteRowState::Seek; Ok(TransitionResult::Continue) } DeleteRowState::Seek => { let seek_key = SeekKey::TableRowId(self.rowid.row_id); - let cursor = self.cursor.as_mut().unwrap(); - match cursor.seek(seek_key, SeekOp::GE { eq_only: true })? { + match self + .cursor + .write() + .seek(seek_key, SeekOp::GE { eq_only: true })? + { IOResult::Done(_) => { self.state = DeleteRowState::Delete; Ok(TransitionResult::Continue) @@ -769,25 +892,25 @@ impl StateTransition for DeleteRowStateMachine { } DeleteRowState::Delete => { // Insert the record into the B-tree - let cursor = self.cursor.as_mut().unwrap(); - match cursor + match self + .cursor + .write() .delete() .map_err(|e| LimboError::InternalError(e.to_string()))? 
{ - IOResult::Done(()) => { - tracing::trace!( - "delete_row_from_pager(table_id={}, row_id={})", - self.rowid.table_id, - self.rowid.row_id - ); - self.finalize(&())?; - Ok(TransitionResult::Done(())) - } + IOResult::Done(()) => {} IOResult::IO(io) => { return Ok(TransitionResult::Io(io)); } } + tracing::trace!( + "delete_row_from_pager(table_id={}, row_id={})", + self.rowid.table_id, + self.rowid.row_id + ); + self.finalize(&())?; + Ok(TransitionResult::Done(())) } } } @@ -803,14 +926,12 @@ impl StateTransition for DeleteRowStateMachine { } impl DeleteRowStateMachine { - fn new(pager: Rc, rowid: RowID, column_count: usize) -> Self { + fn new(rowid: RowID, cursor: Arc>) -> Self { Self { state: DeleteRowState::Initial, is_finalized: false, - pager, rowid, - column_count, - cursor: None, + cursor, } } } @@ -832,6 +953,8 @@ pub struct MvStore { /// every other MVCC transaction must wait for it to commit before they can commit. We have /// exclusive transactions to support single-writer semantics for compatibility with SQLite. 
exclusive_tx: RwLock>, + commit_coordinator: Arc, + header: Arc>>, } impl MvStore { @@ -846,6 +969,11 @@ impl MvStore { storage, loaded_tables: RwLock::new(HashSet::new()), exclusive_tx: RwLock::new(None), + commit_coordinator: Arc::new(CommitCoordinator { + pager_commit_lock: Arc::new(TursoRwLock::new()), + commits_waiting: Arc::new(AtomicU64::new(0)), + }), + header: Arc::new(RwLock::new(None)), } } @@ -1134,14 +1262,18 @@ impl MvStore { } LimboResult::Ok => {} } + let locked = self.commit_coordinator.pager_commit_lock.write(); + if !locked { + self.release_exclusive_tx(&tx_id); + pager.end_read_tx()?; + return Err(LimboError::Busy); + } // Try to acquire the pager write lock match return_if_io!(pager.begin_write_tx()) { LimboResult::Busy => { tracing::debug!("begin_exclusive_tx: tx_id={} failed with Busy", tx_id); // Failed to get pager lock - release our exclusive lock - self.release_exclusive_tx(&tx_id); - pager.end_read_tx()?; - return Err(LimboError::Busy); + panic!("begin_exclusive_tx: tx_id={tx_id} failed with Busy, this should never happen as we were able to lock mvcc exclusive write lock"); } LimboResult::Ok => { let tx = Transaction::new(tx_id, begin_ts); @@ -1191,11 +1323,15 @@ impl MvStore { connection: &Arc, ) -> Result>> { tracing::trace!("commit_tx(tx_id={})", tx_id); - let state_machine: StateMachine> = StateMachine::< - CommitStateMachine, - >::new( - CommitStateMachine::new(CommitState::Initial, pager, tx_id, connection.clone()), - ); + let state_machine: StateMachine> = + StateMachine::>::new(CommitStateMachine::new( + CommitState::Initial, + pager, + tx_id, + connection.clone(), + self.commit_coordinator.clone(), + self.header.clone(), + )); Ok(state_machine) } @@ -1343,6 +1479,7 @@ impl MvStore { } // Extracts the begin timestamp from a transaction + #[inline] fn get_begin_timestamp(&self, ts_or_id: &TxTimestampOrID) -> u64 { match ts_or_id { TxTimestampOrID::Timestamp(ts) => *ts, @@ -1367,13 +1504,13 @@ impl MvStore { // another data 
structure, e.g. a BTreeSet. If it proves to be too quadratic empirically, // we can either switch to a tree-like structure, or at least use partition_point() // which performs a binary search for the insertion point. - let position = versions - .iter() - .rposition(|v| { - self.get_begin_timestamp(&v.begin) < self.get_begin_timestamp(&row_version.begin) - }) - .map(|p| p + 1) - .unwrap_or(0); + let mut position = 0_usize; + for (i, v) in versions.iter().rev().enumerate() { + if self.get_begin_timestamp(&v.begin) < self.get_begin_timestamp(&row_version.begin) { + position = versions.len() - i; // `i` is an offset from the end (rev + enumerate); convert it to the front-based insertion index that `rposition(..) + 1` used to yield + break; + } + } if versions.len() - position > 3 { tracing::debug!( "Inserting a row version {} positions from the end", @@ -1385,13 +1522,15 @@ impl MvStore { pub fn write_row_to_pager( &self, - pager: Rc, row: &Row, + cursor: Arc>, + requires_seek: bool, ) -> Result> { let state_machine: StateMachine = StateMachine::::new(WriteRowStateMachine::new( - pager, row.clone(), + cursor, + requires_seek, )); Ok(state_machine) @@ -1399,15 +1538,11 @@ impl MvStore { pub fn delete_row_from_pager( &self, - pager: Rc, rowid: RowID, - column_count: usize, + cursor: Arc>, ) -> Result> { - let state_machine: StateMachine = StateMachine::< - DeleteRowStateMachine, - >::new( - DeleteRowStateMachine::new(pager, rowid, column_count), - ); + let state_machine: StateMachine = + StateMachine::::new(DeleteRowStateMachine::new(rowid, cursor)); Ok(state_machine) }