mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-06 09:44:21 +01:00
Merge 'core/mvcc: queue mvcc txns on pager's end_tx' from Pere Diaz Bou
Flushing mvcc changes to disk requires serialization. To do so we simply introduce a lock for pager.end_tx, which will take ownership of flushing to WAL. Once this is finished we can simply release lock. When multiple tx writes happen concurrently in mvcc, max frame will be updated. This new max_frame makes is the point of view of the other transaction return busy because his current wal snapshot is outdated. Closes #3059
This commit is contained in:
@@ -7,14 +7,19 @@ use crate::state_machine::StateTransition;
|
||||
use crate::state_machine::TransitionResult;
|
||||
use crate::storage::btree::BTreeCursor;
|
||||
use crate::storage::btree::BTreeKey;
|
||||
use crate::storage::btree::CursorValidState;
|
||||
use crate::storage::sqlite3_ondisk::DatabaseHeader;
|
||||
use crate::storage::wal::TursoRwLock;
|
||||
use crate::types::IOResult;
|
||||
use crate::types::ImmutableRecord;
|
||||
use crate::Completion;
|
||||
use crate::IOExt;
|
||||
use crate::LimboError;
|
||||
use crate::Result;
|
||||
use crate::{Connection, Pager};
|
||||
use crossbeam_skiplist::{SkipMap, SkipSet};
|
||||
use parking_lot::RwLock;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::Debug;
|
||||
use std::marker::PhantomData;
|
||||
@@ -245,20 +250,43 @@ impl AtomicTransactionState {
|
||||
#[derive(Debug)]
|
||||
pub enum CommitState {
|
||||
Initial,
|
||||
BeginPagerTxn { end_ts: u64 },
|
||||
WriteRow { end_ts: u64, write_set_index: usize },
|
||||
WriteRowStateMachine { end_ts: u64, write_set_index: usize },
|
||||
DeleteRowStateMachine { end_ts: u64, write_set_index: usize },
|
||||
CommitPagerTxn { end_ts: u64 },
|
||||
Commit { end_ts: u64 },
|
||||
BeginPagerTxn {
|
||||
end_ts: u64,
|
||||
},
|
||||
WriteRow {
|
||||
end_ts: u64,
|
||||
write_set_index: usize,
|
||||
requires_seek: bool,
|
||||
},
|
||||
WriteRowStateMachine {
|
||||
end_ts: u64,
|
||||
write_set_index: usize,
|
||||
},
|
||||
DeleteRowStateMachine {
|
||||
end_ts: u64,
|
||||
write_set_index: usize,
|
||||
},
|
||||
CommitPagerTxn {
|
||||
end_ts: u64,
|
||||
},
|
||||
Commit {
|
||||
end_ts: u64,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum WriteRowState {
|
||||
Initial,
|
||||
CreateCursor,
|
||||
Seek,
|
||||
Insert,
|
||||
/// Move to the next record in order to leave the cursor in the next position, this is used for inserting multiple rows for optimizations.
|
||||
Next,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct CommitCoordinator {
|
||||
pager_commit_lock: Arc<TursoRwLock>,
|
||||
commits_waiting: Arc<AtomicU64>,
|
||||
}
|
||||
|
||||
pub struct CommitStateMachine<Clock: LogicalClock> {
|
||||
@@ -267,9 +295,13 @@ pub struct CommitStateMachine<Clock: LogicalClock> {
|
||||
pager: Rc<Pager>,
|
||||
tx_id: TxID,
|
||||
connection: Arc<Connection>,
|
||||
/// Write set sorted by table id and row id
|
||||
write_set: Vec<RowID>,
|
||||
write_row_state_machine: Option<StateMachine<WriteRowStateMachine>>,
|
||||
delete_row_state_machine: Option<StateMachine<DeleteRowStateMachine>>,
|
||||
commit_coordinator: Arc<CommitCoordinator>,
|
||||
cursors: HashMap<u64, Arc<RwLock<BTreeCursor>>>,
|
||||
header: Arc<RwLock<Option<DatabaseHeader>>>,
|
||||
_phantom: PhantomData<Clock>,
|
||||
}
|
||||
|
||||
@@ -285,16 +317,15 @@ impl<Clock: LogicalClock> Debug for CommitStateMachine<Clock> {
|
||||
pub struct WriteRowStateMachine {
|
||||
state: WriteRowState,
|
||||
is_finalized: bool,
|
||||
pager: Rc<Pager>,
|
||||
row: Row,
|
||||
record: Option<ImmutableRecord>,
|
||||
cursor: Option<BTreeCursor>,
|
||||
cursor: Arc<RwLock<BTreeCursor>>,
|
||||
requires_seek: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum DeleteRowState {
|
||||
Initial,
|
||||
CreateCursor,
|
||||
Seek,
|
||||
Delete,
|
||||
}
|
||||
@@ -302,14 +333,19 @@ pub enum DeleteRowState {
|
||||
pub struct DeleteRowStateMachine {
|
||||
state: DeleteRowState,
|
||||
is_finalized: bool,
|
||||
pager: Rc<Pager>,
|
||||
rowid: RowID,
|
||||
column_count: usize,
|
||||
cursor: Option<BTreeCursor>,
|
||||
cursor: Arc<RwLock<BTreeCursor>>,
|
||||
}
|
||||
|
||||
impl<Clock: LogicalClock> CommitStateMachine<Clock> {
|
||||
fn new(state: CommitState, pager: Rc<Pager>, tx_id: TxID, connection: Arc<Connection>) -> Self {
|
||||
fn new(
|
||||
state: CommitState,
|
||||
pager: Rc<Pager>,
|
||||
tx_id: TxID,
|
||||
connection: Arc<Connection>,
|
||||
commit_coordinator: Arc<CommitCoordinator>,
|
||||
header: Arc<RwLock<Option<DatabaseHeader>>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
state,
|
||||
is_finalized: false,
|
||||
@@ -319,20 +355,23 @@ impl<Clock: LogicalClock> CommitStateMachine<Clock> {
|
||||
write_set: Vec::new(),
|
||||
write_row_state_machine: None,
|
||||
delete_row_state_machine: None,
|
||||
commit_coordinator,
|
||||
cursors: HashMap::new(),
|
||||
header,
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl WriteRowStateMachine {
|
||||
fn new(pager: Rc<Pager>, row: Row) -> Self {
|
||||
fn new(row: Row, cursor: Arc<RwLock<BTreeCursor>>, requires_seek: bool) -> Self {
|
||||
Self {
|
||||
state: WriteRowState::Initial,
|
||||
is_finalized: false,
|
||||
pager,
|
||||
row,
|
||||
record: None,
|
||||
cursor: None,
|
||||
cursor,
|
||||
requires_seek,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -441,6 +480,8 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
|
||||
tracing::trace!("commit_tx(tx_id={})", self.tx_id);
|
||||
self.write_set
|
||||
.extend(tx.write_set.iter().map(|v| *v.value()));
|
||||
self.write_set
|
||||
.sort_by(|a, b| a.table_id.cmp(&b.table_id).then(a.row_id.cmp(&b.row_id)));
|
||||
self.state = CommitState::BeginPagerTxn { end_ts };
|
||||
Ok(TransitionResult::Continue)
|
||||
}
|
||||
@@ -453,21 +494,62 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
|
||||
|
||||
// If this is the exclusive transaction, we already acquired a write transaction
|
||||
// on the pager in begin_exclusive_tx() and don't need to acquire it.
|
||||
if !mvcc_store.is_exclusive_tx(&self.tx_id) {
|
||||
let result = self.pager.io.block(|| self.pager.begin_write_tx())?;
|
||||
if let LimboResult::Busy = result {
|
||||
return Err(LimboError::Busy);
|
||||
if mvcc_store.is_exclusive_tx(&self.tx_id) {
|
||||
self.state = CommitState::WriteRow {
|
||||
end_ts,
|
||||
write_set_index: 0,
|
||||
requires_seek: true,
|
||||
};
|
||||
return Ok(TransitionResult::Continue);
|
||||
}
|
||||
// Currently txns are queued without any heuristics whasoever. This is important because
|
||||
// we need to ensure writes to disk happen sequentially.
|
||||
// * We don't want txns to write to WAL in parallel.
|
||||
// * We don't want BTree modifications to happen in parallel.
|
||||
// If any of these were to happen, we would find ourselves in a bad corruption situation.
|
||||
|
||||
// NOTE: since we are blocking for `begin_write_tx` we do not care about re-entrancy right now.
|
||||
let locked = self.commit_coordinator.pager_commit_lock.write();
|
||||
if !locked {
|
||||
self.commit_coordinator
|
||||
.commits_waiting
|
||||
.fetch_add(1, Ordering::SeqCst);
|
||||
// FIXME: IOCompletions still needs a yield variant...
|
||||
return Ok(TransitionResult::Io(crate::types::IOCompletions::Single(
|
||||
Completion::new_dummy(),
|
||||
)));
|
||||
}
|
||||
{
|
||||
let mut wal = self.pager.wal.as_ref().unwrap().borrow_mut();
|
||||
// we need to update the max frame to the latest shared max frame in order to avoid snapshot staleness
|
||||
wal.update_max_frame();
|
||||
}
|
||||
// TODO: Force updated header?
|
||||
{
|
||||
if let Some(last_commited_header) = self.header.read().as_ref() {
|
||||
self.pager.io.block(|| {
|
||||
self.pager.with_header_mut(|header_in_pager| {
|
||||
header_in_pager.database_size = last_commited_header.database_size;
|
||||
// TODO: deal with other fields
|
||||
})
|
||||
})?;
|
||||
}
|
||||
}
|
||||
let result = self.pager.io.block(|| self.pager.begin_write_tx())?;
|
||||
if let crate::result::LimboResult::Busy = result {
|
||||
panic!("Pager write transaction busy, in mvcc this should never happen");
|
||||
}
|
||||
self.state = CommitState::WriteRow {
|
||||
end_ts,
|
||||
write_set_index: 0,
|
||||
requires_seek: true,
|
||||
};
|
||||
return Ok(TransitionResult::Continue);
|
||||
}
|
||||
CommitState::WriteRow {
|
||||
end_ts,
|
||||
write_set_index,
|
||||
requires_seek,
|
||||
} => {
|
||||
if write_set_index == self.write_set.len() {
|
||||
self.state = CommitState::CommitPagerTxn { end_ts };
|
||||
@@ -480,9 +562,26 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
|
||||
for row_version in row_versions.iter() {
|
||||
if let TxTimestampOrID::TxID(row_tx_id) = row_version.begin {
|
||||
if row_tx_id == self.tx_id {
|
||||
let state_machine = mvcc_store
|
||||
.write_row_to_pager(self.pager.clone(), &row_version.row)?;
|
||||
let cursor = if let Some(cursor) = self.cursors.get(&id.table_id) {
|
||||
cursor.clone()
|
||||
} else {
|
||||
let cursor = BTreeCursor::new_table(
|
||||
None, // Write directly to B-tree
|
||||
self.pager.clone(),
|
||||
id.table_id as usize,
|
||||
row_version.row.column_count,
|
||||
);
|
||||
let cursor = Arc::new(RwLock::new(cursor));
|
||||
self.cursors.insert(id.table_id, cursor.clone());
|
||||
cursor
|
||||
};
|
||||
let state_machine = mvcc_store.write_row_to_pager(
|
||||
&row_version.row,
|
||||
cursor,
|
||||
requires_seek,
|
||||
)?;
|
||||
self.write_row_state_machine = Some(state_machine);
|
||||
|
||||
self.state = CommitState::WriteRowStateMachine {
|
||||
end_ts,
|
||||
write_set_index,
|
||||
@@ -493,11 +592,21 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
|
||||
if let Some(TxTimestampOrID::TxID(row_tx_id)) = row_version.end {
|
||||
if row_tx_id == self.tx_id {
|
||||
let column_count = row_version.row.column_count;
|
||||
let state_machine = mvcc_store.delete_row_from_pager(
|
||||
self.pager.clone(),
|
||||
row_version.row.id,
|
||||
column_count,
|
||||
)?;
|
||||
let cursor = if let Some(cursor) = self.cursors.get(&id.table_id) {
|
||||
cursor.clone()
|
||||
} else {
|
||||
let cursor = BTreeCursor::new_table(
|
||||
None, // Write directly to B-tree
|
||||
self.pager.clone(),
|
||||
id.table_id as usize,
|
||||
column_count,
|
||||
);
|
||||
let cursor = Arc::new(RwLock::new(cursor));
|
||||
self.cursors.insert(id.table_id, cursor.clone());
|
||||
cursor
|
||||
};
|
||||
let state_machine =
|
||||
mvcc_store.delete_row_from_pager(row_version.row.id, cursor)?;
|
||||
self.delete_row_state_machine = Some(state_machine);
|
||||
self.state = CommitState::DeleteRowStateMachine {
|
||||
end_ts,
|
||||
@@ -522,9 +631,26 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
|
||||
return Ok(TransitionResult::Continue);
|
||||
}
|
||||
TransitionResult::Done(_) => {
|
||||
let requires_seek = {
|
||||
if let Some(next_id) = self.write_set.get(write_set_index + 1) {
|
||||
let current_id = &self.write_set[write_set_index];
|
||||
if current_id.table_id == next_id.table_id
|
||||
&& current_id.row_id + 1 == next_id.row_id
|
||||
{
|
||||
// simple optimizaiton for sequential inserts with inceasing by 1 ids
|
||||
// we should probably just check record in next row and see if it requires seek
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
};
|
||||
self.state = CommitState::WriteRow {
|
||||
end_ts,
|
||||
write_set_index: write_set_index + 1,
|
||||
requires_seek,
|
||||
};
|
||||
return Ok(TransitionResult::Continue);
|
||||
}
|
||||
@@ -544,6 +670,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
|
||||
self.state = CommitState::WriteRow {
|
||||
end_ts,
|
||||
write_set_index: write_set_index + 1,
|
||||
requires_seek: true,
|
||||
};
|
||||
return Ok(TransitionResult::Continue);
|
||||
}
|
||||
@@ -553,27 +680,32 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
|
||||
// Write committed data to pager for persistence
|
||||
// Flush dirty pages to WAL - this is critical for data persistence
|
||||
// Similar to what step_end_write_txn does for legacy transactions
|
||||
loop {
|
||||
let result = self
|
||||
.pager
|
||||
.end_tx(
|
||||
false, // rollback = false since we're committing
|
||||
&self.connection,
|
||||
)
|
||||
.map_err(|e| LimboError::InternalError(e.to_string()))
|
||||
.unwrap();
|
||||
match result {
|
||||
crate::types::IOResult::Done(_) => {
|
||||
break;
|
||||
}
|
||||
crate::types::IOResult::IO(io) => {
|
||||
io.wait(self.pager.io.as_ref())?;
|
||||
continue;
|
||||
}
|
||||
|
||||
let result = self
|
||||
.pager
|
||||
.end_tx(
|
||||
false, // rollback = false since we're committing
|
||||
&self.connection,
|
||||
)
|
||||
.map_err(|e| LimboError::InternalError(e.to_string()))
|
||||
.unwrap();
|
||||
match result {
|
||||
IOResult::Done(_) => {
|
||||
// FIXME: hack for now to keep database header updated for pager commit
|
||||
self.pager.io.block(|| {
|
||||
self.pager.with_header(|header| {
|
||||
self.header.write().replace(*header);
|
||||
})
|
||||
})?;
|
||||
self.commit_coordinator.pager_commit_lock.unlock();
|
||||
// TODO: here mark we are ready for a batch
|
||||
self.state = CommitState::Commit { end_ts };
|
||||
return Ok(TransitionResult::Continue);
|
||||
}
|
||||
IOResult::IO(io) => {
|
||||
return Ok(TransitionResult::Io(io));
|
||||
}
|
||||
}
|
||||
self.state = CommitState::Commit { end_ts };
|
||||
Ok(TransitionResult::Continue)
|
||||
}
|
||||
CommitState::Commit { end_ts } => {
|
||||
let mut log_record = LogRecord::new(end_ts);
|
||||
@@ -648,7 +780,6 @@ impl StateTransition for WriteRowStateMachine {
|
||||
|
||||
#[tracing::instrument(fields(state = ?self.state), skip(self, _context))]
|
||||
fn step(&mut self, _context: &Self::Context) -> Result<TransitionResult<Self::SMResult>> {
|
||||
use crate::storage::btree::BTreeCursor;
|
||||
use crate::types::{IOResult, SeekKey, SeekOp};
|
||||
|
||||
match self.state {
|
||||
@@ -658,62 +789,63 @@ impl StateTransition for WriteRowStateMachine {
|
||||
record.start_serialization(&self.row.data);
|
||||
self.record = Some(record);
|
||||
|
||||
self.state = WriteRowState::CreateCursor;
|
||||
Ok(TransitionResult::Continue)
|
||||
}
|
||||
WriteRowState::CreateCursor => {
|
||||
// Create the cursor
|
||||
let root_page = self.row.id.table_id as usize;
|
||||
let num_columns = self.row.column_count;
|
||||
|
||||
let cursor = BTreeCursor::new_table(
|
||||
None, // Write directly to B-tree
|
||||
self.pager.clone(),
|
||||
root_page,
|
||||
num_columns,
|
||||
);
|
||||
self.cursor = Some(cursor);
|
||||
|
||||
self.state = WriteRowState::Seek;
|
||||
if self.requires_seek {
|
||||
self.state = WriteRowState::Seek;
|
||||
} else {
|
||||
self.state = WriteRowState::Insert;
|
||||
}
|
||||
Ok(TransitionResult::Continue)
|
||||
}
|
||||
WriteRowState::Seek => {
|
||||
// Position the cursor by seeking to the row position
|
||||
let seek_key = SeekKey::TableRowId(self.row.id.row_id);
|
||||
let cursor = self.cursor.as_mut().unwrap();
|
||||
|
||||
match cursor.seek(seek_key, SeekOp::GE { eq_only: true })? {
|
||||
IOResult::Done(_) => {
|
||||
self.state = WriteRowState::Insert;
|
||||
Ok(TransitionResult::Continue)
|
||||
}
|
||||
match self
|
||||
.cursor
|
||||
.write()
|
||||
.seek(seek_key, SeekOp::GE { eq_only: true })?
|
||||
{
|
||||
IOResult::Done(_) => {}
|
||||
IOResult::IO(io) => {
|
||||
return Ok(TransitionResult::Io(io));
|
||||
}
|
||||
}
|
||||
assert_eq!(self.cursor.write().valid_state, CursorValidState::Valid);
|
||||
self.state = WriteRowState::Insert;
|
||||
Ok(TransitionResult::Continue)
|
||||
}
|
||||
WriteRowState::Insert => {
|
||||
// Insert the record into the B-tree
|
||||
let cursor = self.cursor.as_mut().unwrap();
|
||||
let key = BTreeKey::new_table_rowid(self.row.id.row_id, self.record.as_ref());
|
||||
|
||||
match cursor
|
||||
match self
|
||||
.cursor
|
||||
.write()
|
||||
.insert(&key)
|
||||
.map_err(|e| LimboError::InternalError(e.to_string()))?
|
||||
.map_err(|e: LimboError| LimboError::InternalError(e.to_string()))?
|
||||
{
|
||||
IOResult::Done(()) => {
|
||||
tracing::trace!(
|
||||
"write_row_to_pager(table_id={}, row_id={})",
|
||||
self.row.id.table_id,
|
||||
self.row.id.row_id
|
||||
);
|
||||
self.finalize(&())?;
|
||||
Ok(TransitionResult::Done(()))
|
||||
}
|
||||
IOResult::Done(()) => {}
|
||||
IOResult::IO(io) => {
|
||||
return Ok(TransitionResult::Io(io));
|
||||
}
|
||||
}
|
||||
self.state = WriteRowState::Next;
|
||||
Ok(TransitionResult::Continue)
|
||||
}
|
||||
WriteRowState::Next => {
|
||||
match self
|
||||
.cursor
|
||||
.write()
|
||||
.next()
|
||||
.map_err(|e: LimboError| LimboError::InternalError(e.to_string()))?
|
||||
{
|
||||
IOResult::Done(_) => {}
|
||||
IOResult::IO(io) => {
|
||||
return Ok(TransitionResult::Io(io));
|
||||
}
|
||||
}
|
||||
self.finalize(&())?;
|
||||
Ok(TransitionResult::Done(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -734,30 +866,21 @@ impl StateTransition for DeleteRowStateMachine {
|
||||
|
||||
#[tracing::instrument(fields(state = ?self.state), skip(self, _context))]
|
||||
fn step(&mut self, _context: &Self::Context) -> Result<TransitionResult<Self::SMResult>> {
|
||||
use crate::storage::btree::BTreeCursor;
|
||||
use crate::types::{IOResult, SeekKey, SeekOp};
|
||||
|
||||
match self.state {
|
||||
DeleteRowState::Initial => {
|
||||
self.state = DeleteRowState::CreateCursor;
|
||||
Ok(TransitionResult::Continue)
|
||||
}
|
||||
DeleteRowState::CreateCursor => {
|
||||
let root_page = self.rowid.table_id as usize;
|
||||
let num_columns = self.column_count;
|
||||
|
||||
let cursor =
|
||||
BTreeCursor::new_table(None, self.pager.clone(), root_page, num_columns);
|
||||
self.cursor = Some(cursor);
|
||||
|
||||
self.state = DeleteRowState::Seek;
|
||||
Ok(TransitionResult::Continue)
|
||||
}
|
||||
DeleteRowState::Seek => {
|
||||
let seek_key = SeekKey::TableRowId(self.rowid.row_id);
|
||||
let cursor = self.cursor.as_mut().unwrap();
|
||||
|
||||
match cursor.seek(seek_key, SeekOp::GE { eq_only: true })? {
|
||||
match self
|
||||
.cursor
|
||||
.write()
|
||||
.seek(seek_key, SeekOp::GE { eq_only: true })?
|
||||
{
|
||||
IOResult::Done(_) => {
|
||||
self.state = DeleteRowState::Delete;
|
||||
Ok(TransitionResult::Continue)
|
||||
@@ -769,25 +892,25 @@ impl StateTransition for DeleteRowStateMachine {
|
||||
}
|
||||
DeleteRowState::Delete => {
|
||||
// Insert the record into the B-tree
|
||||
let cursor = self.cursor.as_mut().unwrap();
|
||||
|
||||
match cursor
|
||||
match self
|
||||
.cursor
|
||||
.write()
|
||||
.delete()
|
||||
.map_err(|e| LimboError::InternalError(e.to_string()))?
|
||||
{
|
||||
IOResult::Done(()) => {
|
||||
tracing::trace!(
|
||||
"delete_row_from_pager(table_id={}, row_id={})",
|
||||
self.rowid.table_id,
|
||||
self.rowid.row_id
|
||||
);
|
||||
self.finalize(&())?;
|
||||
Ok(TransitionResult::Done(()))
|
||||
}
|
||||
IOResult::Done(()) => {}
|
||||
IOResult::IO(io) => {
|
||||
return Ok(TransitionResult::Io(io));
|
||||
}
|
||||
}
|
||||
tracing::trace!(
|
||||
"delete_row_from_pager(table_id={}, row_id={})",
|
||||
self.rowid.table_id,
|
||||
self.rowid.row_id
|
||||
);
|
||||
self.finalize(&())?;
|
||||
Ok(TransitionResult::Done(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -803,14 +926,12 @@ impl StateTransition for DeleteRowStateMachine {
|
||||
}
|
||||
|
||||
impl DeleteRowStateMachine {
|
||||
fn new(pager: Rc<Pager>, rowid: RowID, column_count: usize) -> Self {
|
||||
fn new(rowid: RowID, cursor: Arc<RwLock<BTreeCursor>>) -> Self {
|
||||
Self {
|
||||
state: DeleteRowState::Initial,
|
||||
is_finalized: false,
|
||||
pager,
|
||||
rowid,
|
||||
column_count,
|
||||
cursor: None,
|
||||
cursor,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -832,6 +953,8 @@ pub struct MvStore<Clock: LogicalClock> {
|
||||
/// every other MVCC transaction must wait for it to commit before they can commit. We have
|
||||
/// exclusive transactions to support single-writer semantics for compatibility with SQLite.
|
||||
exclusive_tx: RwLock<Option<TxID>>,
|
||||
commit_coordinator: Arc<CommitCoordinator>,
|
||||
header: Arc<RwLock<Option<DatabaseHeader>>>,
|
||||
}
|
||||
|
||||
impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
@@ -846,6 +969,11 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
storage,
|
||||
loaded_tables: RwLock::new(HashSet::new()),
|
||||
exclusive_tx: RwLock::new(None),
|
||||
commit_coordinator: Arc::new(CommitCoordinator {
|
||||
pager_commit_lock: Arc::new(TursoRwLock::new()),
|
||||
commits_waiting: Arc::new(AtomicU64::new(0)),
|
||||
}),
|
||||
header: Arc::new(RwLock::new(None)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1134,14 +1262,18 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
}
|
||||
LimboResult::Ok => {}
|
||||
}
|
||||
let locked = self.commit_coordinator.pager_commit_lock.write();
|
||||
if !locked {
|
||||
self.release_exclusive_tx(&tx_id);
|
||||
pager.end_read_tx()?;
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
// Try to acquire the pager write lock
|
||||
match return_if_io!(pager.begin_write_tx()) {
|
||||
LimboResult::Busy => {
|
||||
tracing::debug!("begin_exclusive_tx: tx_id={} failed with Busy", tx_id);
|
||||
// Failed to get pager lock - release our exclusive lock
|
||||
self.release_exclusive_tx(&tx_id);
|
||||
pager.end_read_tx()?;
|
||||
return Err(LimboError::Busy);
|
||||
panic!("begin_exclusive_tx: tx_id={tx_id} failed with Busy, this should never happen as we were able to lock mvcc exclusive write lock");
|
||||
}
|
||||
LimboResult::Ok => {
|
||||
let tx = Transaction::new(tx_id, begin_ts);
|
||||
@@ -1191,11 +1323,15 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
connection: &Arc<Connection>,
|
||||
) -> Result<StateMachine<CommitStateMachine<Clock>>> {
|
||||
tracing::trace!("commit_tx(tx_id={})", tx_id);
|
||||
let state_machine: StateMachine<CommitStateMachine<Clock>> = StateMachine::<
|
||||
CommitStateMachine<Clock>,
|
||||
>::new(
|
||||
CommitStateMachine::new(CommitState::Initial, pager, tx_id, connection.clone()),
|
||||
);
|
||||
let state_machine: StateMachine<CommitStateMachine<Clock>> =
|
||||
StateMachine::<CommitStateMachine<Clock>>::new(CommitStateMachine::new(
|
||||
CommitState::Initial,
|
||||
pager,
|
||||
tx_id,
|
||||
connection.clone(),
|
||||
self.commit_coordinator.clone(),
|
||||
self.header.clone(),
|
||||
));
|
||||
Ok(state_machine)
|
||||
}
|
||||
|
||||
@@ -1343,6 +1479,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
}
|
||||
|
||||
// Extracts the begin timestamp from a transaction
|
||||
#[inline]
|
||||
fn get_begin_timestamp(&self, ts_or_id: &TxTimestampOrID) -> u64 {
|
||||
match ts_or_id {
|
||||
TxTimestampOrID::Timestamp(ts) => *ts,
|
||||
@@ -1367,13 +1504,13 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
// another data structure, e.g. a BTreeSet. If it proves to be too quadratic empirically,
|
||||
// we can either switch to a tree-like structure, or at least use partition_point()
|
||||
// which performs a binary search for the insertion point.
|
||||
let position = versions
|
||||
.iter()
|
||||
.rposition(|v| {
|
||||
self.get_begin_timestamp(&v.begin) < self.get_begin_timestamp(&row_version.begin)
|
||||
})
|
||||
.map(|p| p + 1)
|
||||
.unwrap_or(0);
|
||||
let mut position = 0_usize;
|
||||
for (i, v) in versions.iter().rev().enumerate() {
|
||||
if self.get_begin_timestamp(&v.begin) < self.get_begin_timestamp(&row_version.begin) {
|
||||
position = i + 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if versions.len() - position > 3 {
|
||||
tracing::debug!(
|
||||
"Inserting a row version {} positions from the end",
|
||||
@@ -1385,13 +1522,15 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
|
||||
pub fn write_row_to_pager(
|
||||
&self,
|
||||
pager: Rc<Pager>,
|
||||
row: &Row,
|
||||
cursor: Arc<RwLock<BTreeCursor>>,
|
||||
requires_seek: bool,
|
||||
) -> Result<StateMachine<WriteRowStateMachine>> {
|
||||
let state_machine: StateMachine<WriteRowStateMachine> =
|
||||
StateMachine::<WriteRowStateMachine>::new(WriteRowStateMachine::new(
|
||||
pager,
|
||||
row.clone(),
|
||||
cursor,
|
||||
requires_seek,
|
||||
));
|
||||
|
||||
Ok(state_machine)
|
||||
@@ -1399,15 +1538,11 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
|
||||
pub fn delete_row_from_pager(
|
||||
&self,
|
||||
pager: Rc<Pager>,
|
||||
rowid: RowID,
|
||||
column_count: usize,
|
||||
cursor: Arc<RwLock<BTreeCursor>>,
|
||||
) -> Result<StateMachine<DeleteRowStateMachine>> {
|
||||
let state_machine: StateMachine<DeleteRowStateMachine> = StateMachine::<
|
||||
DeleteRowStateMachine,
|
||||
>::new(
|
||||
DeleteRowStateMachine::new(pager, rowid, column_count),
|
||||
);
|
||||
let state_machine: StateMachine<DeleteRowStateMachine> =
|
||||
StateMachine::<DeleteRowStateMachine>::new(DeleteRowStateMachine::new(rowid, cursor));
|
||||
|
||||
Ok(state_machine)
|
||||
}
|
||||
|
||||
@@ -1235,10 +1235,21 @@ fn test_commit_without_tx() {
|
||||
fn get_rows(conn: &Arc<Connection>, query: &str) -> Vec<Vec<Value>> {
|
||||
let mut stmt = conn.prepare(query).unwrap();
|
||||
let mut rows = Vec::new();
|
||||
while let StepResult::Row = stmt.step().unwrap() {
|
||||
let row = stmt.row().unwrap();
|
||||
let values = row.get_values().cloned().collect::<Vec<_>>();
|
||||
rows.push(values);
|
||||
loop {
|
||||
match stmt.step().unwrap() {
|
||||
StepResult::Row => {
|
||||
let row = stmt.row().unwrap();
|
||||
let values = row.get_values().cloned().collect::<Vec<_>>();
|
||||
rows.push(values);
|
||||
}
|
||||
StepResult::Done => break,
|
||||
StepResult::IO => {
|
||||
stmt.run_once().unwrap();
|
||||
}
|
||||
StepResult::Interrupt | StepResult::Busy => {
|
||||
panic!("unexpected step result");
|
||||
}
|
||||
}
|
||||
}
|
||||
rows
|
||||
}
|
||||
@@ -1252,17 +1263,21 @@ fn test_concurrent_writes() {
|
||||
current_statement: Option<Statement>,
|
||||
}
|
||||
let db = MvccTestDbNoConn::new_with_random_db();
|
||||
let mut connecitons = Vec::new();
|
||||
let mut connections = Vec::new();
|
||||
{
|
||||
let conn = db.connect();
|
||||
conn.execute("CREATE TABLE test (x)").unwrap();
|
||||
conn.close().unwrap();
|
||||
}
|
||||
for i in 0..2 {
|
||||
let num_connections = 20;
|
||||
let num_inserts_per_connection = 10000;
|
||||
for i in 0..num_connections {
|
||||
let conn = db.connect();
|
||||
let mut inserts = ((100 * i)..(100 * (i + 1))).collect::<Vec<_>>();
|
||||
let mut inserts = ((num_inserts_per_connection * i)
|
||||
..(num_inserts_per_connection * (i + 1)))
|
||||
.collect::<Vec<i64>>();
|
||||
inserts.reverse();
|
||||
connecitons.push(ConnectionState {
|
||||
connections.push(ConnectionState {
|
||||
conn,
|
||||
inserts,
|
||||
current_statement: None,
|
||||
@@ -1271,14 +1286,14 @@ fn test_concurrent_writes() {
|
||||
|
||||
loop {
|
||||
let mut all_finished = true;
|
||||
for conn in &mut connecitons {
|
||||
if !conn.inserts.is_empty() && conn.current_statement.is_none() {
|
||||
for conn in &mut connections {
|
||||
if !conn.inserts.is_empty() || conn.current_statement.is_some() {
|
||||
all_finished = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
for (conn_id, conn) in connecitons.iter_mut().enumerate() {
|
||||
println!("connection {conn_id} inserts: {:?}", conn.inserts);
|
||||
for (conn_id, conn) in connections.iter_mut().enumerate() {
|
||||
// println!("connection {conn_id} inserts: {:?}", conn.inserts);
|
||||
if conn.current_statement.is_none() && !conn.inserts.is_empty() {
|
||||
let write = conn.inserts.pop().unwrap();
|
||||
println!("inserting row {write} from connection {conn_id}");
|
||||
@@ -1291,6 +1306,7 @@ fn test_concurrent_writes() {
|
||||
if conn.current_statement.is_none() {
|
||||
continue;
|
||||
}
|
||||
println!("connection step {conn_id}");
|
||||
let stmt = conn.current_statement.as_mut().unwrap();
|
||||
match stmt.step().unwrap() {
|
||||
// These you be only possible cases in write concurrency.
|
||||
@@ -1298,11 +1314,17 @@ fn test_concurrent_writes() {
|
||||
// No interrupt because insert doesn't interrupt
|
||||
// No busy because insert in mvcc should be multi concurrent write
|
||||
StepResult::Done => {
|
||||
println!("connection {conn_id} done");
|
||||
conn.current_statement = None;
|
||||
}
|
||||
StepResult::IO => {
|
||||
// let's skip doing I/O here, we want to perform io only after all the statements are stepped
|
||||
}
|
||||
StepResult::Busy => {
|
||||
println!("connection {conn_id} busy");
|
||||
// stmt.reprepare().unwrap();
|
||||
unreachable!();
|
||||
}
|
||||
_ => {
|
||||
unreachable!()
|
||||
}
|
||||
@@ -1311,7 +1333,51 @@ fn test_concurrent_writes() {
|
||||
db.get_db().io.step().unwrap();
|
||||
|
||||
if all_finished {
|
||||
println!("all finished");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Now let's find out if we wrote everything we intended to write.
|
||||
let conn = db.connect();
|
||||
let rows = get_rows(&conn, "SELECT * FROM test ORDER BY x ASC");
|
||||
assert_eq!(
|
||||
rows.len() as i64,
|
||||
num_connections * num_inserts_per_connection
|
||||
);
|
||||
for (row_id, row) in rows.iter().enumerate() {
|
||||
assert_eq!(row[0].as_int().unwrap(), row_id as i64);
|
||||
}
|
||||
conn.close().unwrap();
|
||||
}
|
||||
|
||||
fn generate_batched_insert(num_inserts: usize) -> String {
|
||||
let mut inserts = String::from("INSERT INTO test (x) VALUES ");
|
||||
for i in 0..num_inserts {
|
||||
inserts.push_str(&format!("({i})"));
|
||||
if i < num_inserts - 1 {
|
||||
inserts.push(',');
|
||||
}
|
||||
}
|
||||
inserts.push(';');
|
||||
inserts
|
||||
}
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_batch_writes() {
|
||||
let mut start = 0;
|
||||
let mut end = 5000;
|
||||
while start < end {
|
||||
let i = ((end - start) / 2) + start;
|
||||
let db = MvccTestDbNoConn::new_with_random_db();
|
||||
let conn = db.connect();
|
||||
conn.execute("CREATE TABLE test (x)").unwrap();
|
||||
let inserts = generate_batched_insert(i);
|
||||
if conn.execute(inserts.clone()).is_err() {
|
||||
end = i;
|
||||
} else {
|
||||
start = i + 1;
|
||||
}
|
||||
}
|
||||
println!("start: {start} end: {end}");
|
||||
}
|
||||
|
||||
@@ -306,6 +306,11 @@ pub trait Wal: Debug {
|
||||
|
||||
fn set_io_context(&mut self, ctx: IOContext);
|
||||
|
||||
/// Update the max frame to the current shared max frame.
|
||||
/// Currently this is only used for MVCC as it takes care of write conflicts on its own.
|
||||
/// This should't be used with regular WAL mode.
|
||||
fn update_max_frame(&mut self);
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
fn as_any(&self) -> &dyn std::any::Any;
|
||||
}
|
||||
@@ -1596,6 +1601,11 @@ impl Wal for WalFile {
|
||||
fn set_io_context(&mut self, ctx: IOContext) {
|
||||
self.io_ctx.replace(ctx);
|
||||
}
|
||||
|
||||
fn update_max_frame(&mut self) {
|
||||
let new_max_frame = self.get_shared().max_frame.load(Ordering::Acquire);
|
||||
self.max_frame = new_max_frame;
|
||||
}
|
||||
}
|
||||
|
||||
impl WalFile {
|
||||
|
||||
@@ -602,7 +602,7 @@ impl Program {
|
||||
}
|
||||
if let Some(err) = io.get_error() {
|
||||
let err = err.into();
|
||||
handle_program_error(&pager, &self.connection, &err)?;
|
||||
handle_program_error(&pager, &self.connection, &err, mv_store.as_ref())?;
|
||||
return Err(err);
|
||||
}
|
||||
state.io_completions = None;
|
||||
@@ -645,7 +645,7 @@ impl Program {
|
||||
return Ok(StepResult::Busy);
|
||||
}
|
||||
Err(err) => {
|
||||
handle_program_error(&pager, &self.connection, &err)?;
|
||||
handle_program_error(&pager, &self.connection, &err, mv_store.as_ref())?;
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
@@ -771,12 +771,16 @@ impl Program {
|
||||
// Reset state for next use
|
||||
program_state.view_delta_state = ViewDeltaCommitState::NotStarted;
|
||||
|
||||
if self.connection.transaction_state.get() == TransactionState::None && mv_store.is_none() {
|
||||
if self.connection.transaction_state.get() == TransactionState::None {
|
||||
// No need to do any work here if not in tx. Current MVCC logic doesn't work with this assumption,
|
||||
// hence the mv_store.is_none() check.
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
if let Some(mv_store) = mv_store {
|
||||
if self.connection.is_nested_stmt.get() {
|
||||
// We don't want to commit on nested statements. Let parent handle it.
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
let conn = self.connection.clone();
|
||||
let auto_commit = conn.auto_commit.get();
|
||||
if auto_commit {
|
||||
@@ -1031,6 +1035,7 @@ pub fn handle_program_error(
|
||||
pager: &Rc<Pager>,
|
||||
connection: &Connection,
|
||||
err: &LimboError,
|
||||
mv_store: Option<&Arc<MvStore>>,
|
||||
) -> Result<()> {
|
||||
if connection.is_nested_stmt.get() {
|
||||
// Errors from nested statements are handled by the parent statement.
|
||||
@@ -1042,12 +1047,18 @@ pub fn handle_program_error(
|
||||
// Table locked errors, e.g. trying to checkpoint in an interactive transaction, do not cause a rollback.
|
||||
LimboError::TableLocked => {}
|
||||
_ => {
|
||||
pager
|
||||
.io
|
||||
.block(|| pager.end_tx(true, connection))
|
||||
.inspect_err(|e| {
|
||||
tracing::error!("end_tx failed: {e}");
|
||||
})?;
|
||||
if let Some(mv_store) = mv_store {
|
||||
if let Some(tx_id) = connection.mv_tx_id.get() {
|
||||
mv_store.rollback_tx(tx_id, pager.clone());
|
||||
}
|
||||
} else {
|
||||
pager
|
||||
.io
|
||||
.block(|| pager.end_tx(true, connection))
|
||||
.inspect_err(|e| {
|
||||
tracing::error!("end_tx failed: {e}");
|
||||
})?;
|
||||
}
|
||||
connection.transaction_state.replace(TransactionState::None);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user