core/mvcc: queue write txn commits in mvcc on pager end_tx

Flushing mvcc changes to disk requires serialization. To do so we simply
introduce a lock for pager.end_tx, which will take ownership of flushing
to WAL. Once this is finished we can simply release lock.
This commit is contained in:
Pere Diaz Bou
2025-09-12 14:00:02 +00:00
parent e87226548c
commit 39fb5913e0

View File

@@ -7,14 +7,19 @@ use crate::state_machine::StateTransition;
use crate::state_machine::TransitionResult;
use crate::storage::btree::BTreeCursor;
use crate::storage::btree::BTreeKey;
use crate::storage::btree::CursorValidState;
use crate::storage::sqlite3_ondisk::DatabaseHeader;
use crate::storage::wal::TursoRwLock;
use crate::types::IOResult;
use crate::types::ImmutableRecord;
use crate::Completion;
use crate::IOExt;
use crate::LimboError;
use crate::Result;
use crate::{Connection, Pager};
use crossbeam_skiplist::{SkipMap, SkipSet};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::marker::PhantomData;
@@ -245,20 +250,43 @@ impl AtomicTransactionState {
#[derive(Debug)]
pub enum CommitState {
Initial,
BeginPagerTxn { end_ts: u64 },
WriteRow { end_ts: u64, write_set_index: usize },
WriteRowStateMachine { end_ts: u64, write_set_index: usize },
DeleteRowStateMachine { end_ts: u64, write_set_index: usize },
CommitPagerTxn { end_ts: u64 },
Commit { end_ts: u64 },
BeginPagerTxn {
end_ts: u64,
},
WriteRow {
end_ts: u64,
write_set_index: usize,
requires_seek: bool,
},
WriteRowStateMachine {
end_ts: u64,
write_set_index: usize,
},
DeleteRowStateMachine {
end_ts: u64,
write_set_index: usize,
},
CommitPagerTxn {
end_ts: u64,
},
Commit {
end_ts: u64,
},
}
#[derive(Debug)]
pub enum WriteRowState {
Initial,
CreateCursor,
Seek,
Insert,
/// Move to the next record in order to leave the cursor in the next position, this is used for inserting multiple rows for optimizations.
Next,
}
#[derive(Debug)]
struct CommitCoordinator {
pager_commit_lock: Arc<TursoRwLock>,
commits_waiting: Arc<AtomicU64>,
}
pub struct CommitStateMachine<Clock: LogicalClock> {
@@ -267,9 +295,13 @@ pub struct CommitStateMachine<Clock: LogicalClock> {
pager: Rc<Pager>,
tx_id: TxID,
connection: Arc<Connection>,
/// Write set sorted by table id and row id
write_set: Vec<RowID>,
write_row_state_machine: Option<StateMachine<WriteRowStateMachine>>,
delete_row_state_machine: Option<StateMachine<DeleteRowStateMachine>>,
commit_coordinator: Arc<CommitCoordinator>,
cursors: HashMap<u64, Arc<RwLock<BTreeCursor>>>,
header: Arc<RwLock<Option<DatabaseHeader>>>,
_phantom: PhantomData<Clock>,
}
@@ -285,16 +317,15 @@ impl<Clock: LogicalClock> Debug for CommitStateMachine<Clock> {
pub struct WriteRowStateMachine {
state: WriteRowState,
is_finalized: bool,
pager: Rc<Pager>,
row: Row,
record: Option<ImmutableRecord>,
cursor: Option<BTreeCursor>,
cursor: Arc<RwLock<BTreeCursor>>,
requires_seek: bool,
}
#[derive(Debug)]
pub enum DeleteRowState {
Initial,
CreateCursor,
Seek,
Delete,
}
@@ -302,14 +333,19 @@ pub enum DeleteRowState {
pub struct DeleteRowStateMachine {
state: DeleteRowState,
is_finalized: bool,
pager: Rc<Pager>,
rowid: RowID,
column_count: usize,
cursor: Option<BTreeCursor>,
cursor: Arc<RwLock<BTreeCursor>>,
}
impl<Clock: LogicalClock> CommitStateMachine<Clock> {
fn new(state: CommitState, pager: Rc<Pager>, tx_id: TxID, connection: Arc<Connection>) -> Self {
fn new(
state: CommitState,
pager: Rc<Pager>,
tx_id: TxID,
connection: Arc<Connection>,
commit_coordinator: Arc<CommitCoordinator>,
header: Arc<RwLock<Option<DatabaseHeader>>>,
) -> Self {
Self {
state,
is_finalized: false,
@@ -319,20 +355,23 @@ impl<Clock: LogicalClock> CommitStateMachine<Clock> {
write_set: Vec::new(),
write_row_state_machine: None,
delete_row_state_machine: None,
commit_coordinator,
cursors: HashMap::new(),
header,
_phantom: PhantomData,
}
}
}
impl WriteRowStateMachine {
fn new(pager: Rc<Pager>, row: Row) -> Self {
fn new(row: Row, cursor: Arc<RwLock<BTreeCursor>>, requires_seek: bool) -> Self {
Self {
state: WriteRowState::Initial,
is_finalized: false,
pager,
row,
record: None,
cursor: None,
cursor,
requires_seek,
}
}
}
@@ -441,6 +480,8 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
tracing::trace!("commit_tx(tx_id={})", self.tx_id);
self.write_set
.extend(tx.write_set.iter().map(|v| *v.value()));
self.write_set
.sort_by(|a, b| a.table_id.cmp(&b.table_id).then(a.row_id.cmp(&b.row_id)));
self.state = CommitState::BeginPagerTxn { end_ts };
Ok(TransitionResult::Continue)
}
@@ -453,21 +494,62 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
// If this is the exclusive transaction, we already acquired a write transaction
// on the pager in begin_exclusive_tx() and don't need to acquire it.
if !mvcc_store.is_exclusive_tx(&self.tx_id) {
let result = self.pager.io.block(|| self.pager.begin_write_tx())?;
if let LimboResult::Busy = result {
return Err(LimboError::Busy);
if mvcc_store.is_exclusive_tx(&self.tx_id) {
self.state = CommitState::WriteRow {
end_ts,
write_set_index: 0,
requires_seek: true,
};
return Ok(TransitionResult::Continue);
}
// Currently txns are queued without any heuristics whasoever. This is important because
// we need to ensure writes to disk happen sequentially.
// * We don't want txns to write to WAL in parallel.
// * We don't want BTree modifications to happen in parallel.
// If any of these were to happen, we would find ourselves in a bad corruption situation.
// NOTE: since we are blocking for `begin_write_tx` we do not care about re-entrancy right now.
let locked = self.commit_coordinator.pager_commit_lock.write();
if !locked {
self.commit_coordinator
.commits_waiting
.fetch_add(1, Ordering::SeqCst);
// FIXME: IOCompletions still needs a yield variant...
return Ok(TransitionResult::Io(crate::types::IOCompletions::Single(
Completion::new_dummy(),
)));
}
{
let mut wal = self.pager.wal.as_ref().unwrap().borrow_mut();
// we need to update the max frame to the latest shared max frame in order to avoid snapshot staleness
wal.update_max_frame();
}
// TODO: Force updated header?
{
if let Some(last_commited_header) = self.header.read().as_ref() {
self.pager.io.block(|| {
self.pager.with_header_mut(|header_in_pager| {
header_in_pager.database_size = last_commited_header.database_size;
// TODO: deal with other fields
})
})?;
}
}
let result = self.pager.io.block(|| self.pager.begin_write_tx())?;
if let crate::result::LimboResult::Busy = result {
panic!("Pager write transaction busy, in mvcc this should never happen");
}
self.state = CommitState::WriteRow {
end_ts,
write_set_index: 0,
requires_seek: true,
};
return Ok(TransitionResult::Continue);
}
CommitState::WriteRow {
end_ts,
write_set_index,
requires_seek,
} => {
if write_set_index == self.write_set.len() {
self.state = CommitState::CommitPagerTxn { end_ts };
@@ -480,9 +562,26 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
for row_version in row_versions.iter() {
if let TxTimestampOrID::TxID(row_tx_id) = row_version.begin {
if row_tx_id == self.tx_id {
let state_machine = mvcc_store
.write_row_to_pager(self.pager.clone(), &row_version.row)?;
let cursor = if let Some(cursor) = self.cursors.get(&id.table_id) {
cursor.clone()
} else {
let cursor = BTreeCursor::new_table(
None, // Write directly to B-tree
self.pager.clone(),
id.table_id as usize,
row_version.row.column_count,
);
let cursor = Arc::new(RwLock::new(cursor));
self.cursors.insert(id.table_id, cursor.clone());
cursor
};
let state_machine = mvcc_store.write_row_to_pager(
&row_version.row,
cursor,
requires_seek,
)?;
self.write_row_state_machine = Some(state_machine);
self.state = CommitState::WriteRowStateMachine {
end_ts,
write_set_index,
@@ -493,11 +592,21 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
if let Some(TxTimestampOrID::TxID(row_tx_id)) = row_version.end {
if row_tx_id == self.tx_id {
let column_count = row_version.row.column_count;
let state_machine = mvcc_store.delete_row_from_pager(
self.pager.clone(),
row_version.row.id,
column_count,
)?;
let cursor = if let Some(cursor) = self.cursors.get(&id.table_id) {
cursor.clone()
} else {
let cursor = BTreeCursor::new_table(
None, // Write directly to B-tree
self.pager.clone(),
id.table_id as usize,
column_count,
);
let cursor = Arc::new(RwLock::new(cursor));
self.cursors.insert(id.table_id, cursor.clone());
cursor
};
let state_machine =
mvcc_store.delete_row_from_pager(row_version.row.id, cursor)?;
self.delete_row_state_machine = Some(state_machine);
self.state = CommitState::DeleteRowStateMachine {
end_ts,
@@ -522,9 +631,26 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
return Ok(TransitionResult::Continue);
}
TransitionResult::Done(_) => {
let requires_seek = {
if let Some(next_id) = self.write_set.get(write_set_index + 1) {
let current_id = &self.write_set[write_set_index];
if current_id.table_id == next_id.table_id
&& current_id.row_id + 1 == next_id.row_id
{
// simple optimizaiton for sequential inserts with inceasing by 1 ids
// we should probably just check record in next row and see if it requires seek
false
} else {
true
}
} else {
false
}
};
self.state = CommitState::WriteRow {
end_ts,
write_set_index: write_set_index + 1,
requires_seek,
};
return Ok(TransitionResult::Continue);
}
@@ -544,6 +670,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
self.state = CommitState::WriteRow {
end_ts,
write_set_index: write_set_index + 1,
requires_seek: true,
};
return Ok(TransitionResult::Continue);
}
@@ -553,27 +680,32 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
// Write committed data to pager for persistence
// Flush dirty pages to WAL - this is critical for data persistence
// Similar to what step_end_write_txn does for legacy transactions
loop {
let result = self
.pager
.end_tx(
false, // rollback = false since we're committing
&self.connection,
)
.map_err(|e| LimboError::InternalError(e.to_string()))
.unwrap();
match result {
crate::types::IOResult::Done(_) => {
break;
}
crate::types::IOResult::IO(io) => {
io.wait(self.pager.io.as_ref())?;
continue;
}
let result = self
.pager
.end_tx(
false, // rollback = false since we're committing
&self.connection,
)
.map_err(|e| LimboError::InternalError(e.to_string()))
.unwrap();
match result {
IOResult::Done(_) => {
// FIXME: hack for now to keep database header updated for pager commit
self.pager.io.block(|| {
self.pager.with_header(|header| {
self.header.write().replace(*header);
})
})?;
self.commit_coordinator.pager_commit_lock.unlock();
// TODO: here mark we are ready for a batch
self.state = CommitState::Commit { end_ts };
return Ok(TransitionResult::Continue);
}
IOResult::IO(io) => {
return Ok(TransitionResult::Io(io));
}
}
self.state = CommitState::Commit { end_ts };
Ok(TransitionResult::Continue)
}
CommitState::Commit { end_ts } => {
let mut log_record = LogRecord::new(end_ts);
@@ -648,7 +780,6 @@ impl StateTransition for WriteRowStateMachine {
#[tracing::instrument(fields(state = ?self.state), skip(self, _context))]
fn step(&mut self, _context: &Self::Context) -> Result<TransitionResult<Self::SMResult>> {
use crate::storage::btree::BTreeCursor;
use crate::types::{IOResult, SeekKey, SeekOp};
match self.state {
@@ -658,62 +789,63 @@ impl StateTransition for WriteRowStateMachine {
record.start_serialization(&self.row.data);
self.record = Some(record);
self.state = WriteRowState::CreateCursor;
Ok(TransitionResult::Continue)
}
WriteRowState::CreateCursor => {
// Create the cursor
let root_page = self.row.id.table_id as usize;
let num_columns = self.row.column_count;
let cursor = BTreeCursor::new_table(
None, // Write directly to B-tree
self.pager.clone(),
root_page,
num_columns,
);
self.cursor = Some(cursor);
self.state = WriteRowState::Seek;
if self.requires_seek {
self.state = WriteRowState::Seek;
} else {
self.state = WriteRowState::Insert;
}
Ok(TransitionResult::Continue)
}
WriteRowState::Seek => {
// Position the cursor by seeking to the row position
let seek_key = SeekKey::TableRowId(self.row.id.row_id);
let cursor = self.cursor.as_mut().unwrap();
match cursor.seek(seek_key, SeekOp::GE { eq_only: true })? {
IOResult::Done(_) => {
self.state = WriteRowState::Insert;
Ok(TransitionResult::Continue)
}
match self
.cursor
.write()
.seek(seek_key, SeekOp::GE { eq_only: true })?
{
IOResult::Done(_) => {}
IOResult::IO(io) => {
return Ok(TransitionResult::Io(io));
}
}
assert_eq!(self.cursor.write().valid_state, CursorValidState::Valid);
self.state = WriteRowState::Insert;
Ok(TransitionResult::Continue)
}
WriteRowState::Insert => {
// Insert the record into the B-tree
let cursor = self.cursor.as_mut().unwrap();
let key = BTreeKey::new_table_rowid(self.row.id.row_id, self.record.as_ref());
match cursor
match self
.cursor
.write()
.insert(&key)
.map_err(|e| LimboError::InternalError(e.to_string()))?
.map_err(|e: LimboError| LimboError::InternalError(e.to_string()))?
{
IOResult::Done(()) => {
tracing::trace!(
"write_row_to_pager(table_id={}, row_id={})",
self.row.id.table_id,
self.row.id.row_id
);
self.finalize(&())?;
Ok(TransitionResult::Done(()))
}
IOResult::Done(()) => {}
IOResult::IO(io) => {
return Ok(TransitionResult::Io(io));
}
}
self.state = WriteRowState::Next;
Ok(TransitionResult::Continue)
}
WriteRowState::Next => {
match self
.cursor
.write()
.next()
.map_err(|e: LimboError| LimboError::InternalError(e.to_string()))?
{
IOResult::Done(_) => {}
IOResult::IO(io) => {
return Ok(TransitionResult::Io(io));
}
}
self.finalize(&())?;
Ok(TransitionResult::Done(()))
}
}
}
@@ -734,30 +866,21 @@ impl StateTransition for DeleteRowStateMachine {
#[tracing::instrument(fields(state = ?self.state), skip(self, _context))]
fn step(&mut self, _context: &Self::Context) -> Result<TransitionResult<Self::SMResult>> {
use crate::storage::btree::BTreeCursor;
use crate::types::{IOResult, SeekKey, SeekOp};
match self.state {
DeleteRowState::Initial => {
self.state = DeleteRowState::CreateCursor;
Ok(TransitionResult::Continue)
}
DeleteRowState::CreateCursor => {
let root_page = self.rowid.table_id as usize;
let num_columns = self.column_count;
let cursor =
BTreeCursor::new_table(None, self.pager.clone(), root_page, num_columns);
self.cursor = Some(cursor);
self.state = DeleteRowState::Seek;
Ok(TransitionResult::Continue)
}
DeleteRowState::Seek => {
let seek_key = SeekKey::TableRowId(self.rowid.row_id);
let cursor = self.cursor.as_mut().unwrap();
match cursor.seek(seek_key, SeekOp::GE { eq_only: true })? {
match self
.cursor
.write()
.seek(seek_key, SeekOp::GE { eq_only: true })?
{
IOResult::Done(_) => {
self.state = DeleteRowState::Delete;
Ok(TransitionResult::Continue)
@@ -769,25 +892,25 @@ impl StateTransition for DeleteRowStateMachine {
}
DeleteRowState::Delete => {
// Insert the record into the B-tree
let cursor = self.cursor.as_mut().unwrap();
match cursor
match self
.cursor
.write()
.delete()
.map_err(|e| LimboError::InternalError(e.to_string()))?
{
IOResult::Done(()) => {
tracing::trace!(
"delete_row_from_pager(table_id={}, row_id={})",
self.rowid.table_id,
self.rowid.row_id
);
self.finalize(&())?;
Ok(TransitionResult::Done(()))
}
IOResult::Done(()) => {}
IOResult::IO(io) => {
return Ok(TransitionResult::Io(io));
}
}
tracing::trace!(
"delete_row_from_pager(table_id={}, row_id={})",
self.rowid.table_id,
self.rowid.row_id
);
self.finalize(&())?;
Ok(TransitionResult::Done(()))
}
}
}
@@ -803,14 +926,12 @@ impl StateTransition for DeleteRowStateMachine {
}
impl DeleteRowStateMachine {
fn new(pager: Rc<Pager>, rowid: RowID, column_count: usize) -> Self {
fn new(rowid: RowID, cursor: Arc<RwLock<BTreeCursor>>) -> Self {
Self {
state: DeleteRowState::Initial,
is_finalized: false,
pager,
rowid,
column_count,
cursor: None,
cursor,
}
}
}
@@ -832,6 +953,8 @@ pub struct MvStore<Clock: LogicalClock> {
/// every other MVCC transaction must wait for it to commit before they can commit. We have
/// exclusive transactions to support single-writer semantics for compatibility with SQLite.
exclusive_tx: RwLock<Option<TxID>>,
commit_coordinator: Arc<CommitCoordinator>,
header: Arc<RwLock<Option<DatabaseHeader>>>,
}
impl<Clock: LogicalClock> MvStore<Clock> {
@@ -846,6 +969,11 @@ impl<Clock: LogicalClock> MvStore<Clock> {
storage,
loaded_tables: RwLock::new(HashSet::new()),
exclusive_tx: RwLock::new(None),
commit_coordinator: Arc::new(CommitCoordinator {
pager_commit_lock: Arc::new(TursoRwLock::new()),
commits_waiting: Arc::new(AtomicU64::new(0)),
}),
header: Arc::new(RwLock::new(None)),
}
}
@@ -1134,14 +1262,18 @@ impl<Clock: LogicalClock> MvStore<Clock> {
}
LimboResult::Ok => {}
}
let locked = self.commit_coordinator.pager_commit_lock.write();
if !locked {
self.release_exclusive_tx(&tx_id);
pager.end_read_tx()?;
return Err(LimboError::Busy);
}
// Try to acquire the pager write lock
match return_if_io!(pager.begin_write_tx()) {
LimboResult::Busy => {
tracing::debug!("begin_exclusive_tx: tx_id={} failed with Busy", tx_id);
// Failed to get pager lock - release our exclusive lock
self.release_exclusive_tx(&tx_id);
pager.end_read_tx()?;
return Err(LimboError::Busy);
panic!("begin_exclusive_tx: tx_id={tx_id} failed with Busy, this should never happen as we were able to lock mvcc exclusive write lock");
}
LimboResult::Ok => {
let tx = Transaction::new(tx_id, begin_ts);
@@ -1191,11 +1323,15 @@ impl<Clock: LogicalClock> MvStore<Clock> {
connection: &Arc<Connection>,
) -> Result<StateMachine<CommitStateMachine<Clock>>> {
tracing::trace!("commit_tx(tx_id={})", tx_id);
let state_machine: StateMachine<CommitStateMachine<Clock>> = StateMachine::<
CommitStateMachine<Clock>,
>::new(
CommitStateMachine::new(CommitState::Initial, pager, tx_id, connection.clone()),
);
let state_machine: StateMachine<CommitStateMachine<Clock>> =
StateMachine::<CommitStateMachine<Clock>>::new(CommitStateMachine::new(
CommitState::Initial,
pager,
tx_id,
connection.clone(),
self.commit_coordinator.clone(),
self.header.clone(),
));
Ok(state_machine)
}
@@ -1343,6 +1479,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
}
// Extracts the begin timestamp from a transaction
#[inline]
fn get_begin_timestamp(&self, ts_or_id: &TxTimestampOrID) -> u64 {
match ts_or_id {
TxTimestampOrID::Timestamp(ts) => *ts,
@@ -1367,13 +1504,13 @@ impl<Clock: LogicalClock> MvStore<Clock> {
// another data structure, e.g. a BTreeSet. If it proves to be too quadratic empirically,
// we can either switch to a tree-like structure, or at least use partition_point()
// which performs a binary search for the insertion point.
let position = versions
.iter()
.rposition(|v| {
self.get_begin_timestamp(&v.begin) < self.get_begin_timestamp(&row_version.begin)
})
.map(|p| p + 1)
.unwrap_or(0);
let mut position = 0_usize;
for (i, v) in versions.iter().rev().enumerate() {
if self.get_begin_timestamp(&v.begin) < self.get_begin_timestamp(&row_version.begin) {
position = i + 1;
break;
}
}
if versions.len() - position > 3 {
tracing::debug!(
"Inserting a row version {} positions from the end",
@@ -1385,13 +1522,15 @@ impl<Clock: LogicalClock> MvStore<Clock> {
pub fn write_row_to_pager(
&self,
pager: Rc<Pager>,
row: &Row,
cursor: Arc<RwLock<BTreeCursor>>,
requires_seek: bool,
) -> Result<StateMachine<WriteRowStateMachine>> {
let state_machine: StateMachine<WriteRowStateMachine> =
StateMachine::<WriteRowStateMachine>::new(WriteRowStateMachine::new(
pager,
row.clone(),
cursor,
requires_seek,
));
Ok(state_machine)
@@ -1399,15 +1538,11 @@ impl<Clock: LogicalClock> MvStore<Clock> {
pub fn delete_row_from_pager(
&self,
pager: Rc<Pager>,
rowid: RowID,
column_count: usize,
cursor: Arc<RwLock<BTreeCursor>>,
) -> Result<StateMachine<DeleteRowStateMachine>> {
let state_machine: StateMachine<DeleteRowStateMachine> = StateMachine::<
DeleteRowStateMachine,
>::new(
DeleteRowStateMachine::new(pager, rowid, column_count),
);
let state_machine: StateMachine<DeleteRowStateMachine> =
StateMachine::<DeleteRowStateMachine>::new(DeleteRowStateMachine::new(rowid, cursor));
Ok(state_machine)
}