From 66b56308701caac2840fd2bfad9070859379b8c9 Mon Sep 17 00:00:00 2001
From: Pere Diaz Bou
Date: Fri, 12 Sep 2025 13:47:45 +0000
Subject: [PATCH 1/4] vdbe/mvcc: rollback mvcc txn on vdbe error

---
 core/vdbe/mod.rs | 29 ++++++++++++++++++++---------
 1 file changed, 20 insertions(+), 9 deletions(-)

diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs
index 2ca090ff8..62fad1c38 100644
--- a/core/vdbe/mod.rs
+++ b/core/vdbe/mod.rs
@@ -602,7 +602,7 @@ impl Program {
 }
 if let Some(err) = io.get_error() {
 let err = err.into();
- handle_program_error(&pager, &self.connection, &err)?;
+ handle_program_error(&pager, &self.connection, &err, mv_store.as_ref())?;
 return Err(err);
 }
 state.io_completions = None;
@@ -645,7 +645,7 @@ impl Program {
 return Ok(StepResult::Busy);
 }
 Err(err) => {
- handle_program_error(&pager, &self.connection, &err)?;
+ handle_program_error(&pager, &self.connection, &err, mv_store.as_ref())?;
 return Err(err);
 }
 }
@@ -771,12 +771,16 @@ impl Program {
 // Reset state for next use
 program_state.view_delta_state = ViewDeltaCommitState::NotStarted;
- if self.connection.transaction_state.get() == TransactionState::None && mv_store.is_none() {
+ if self.connection.transaction_state.get() == TransactionState::None {
 // No need to do any work here if not in tx. Current MVCC logic doesn't work with this assumption,
 // hence the mv_store.is_none() check.
 return Ok(IOResult::Done(()));
 }
 if let Some(mv_store) = mv_store {
+ if self.connection.is_nested_stmt.get() {
+ // We don't want to commit on nested statements. Let parent handle it.
+ return Ok(IOResult::Done(()));
+ }
 let conn = self.connection.clone();
 let auto_commit = conn.auto_commit.get();
 if auto_commit {
@@ -1031,6 +1035,7 @@ pub fn handle_program_error(
 pager: &Rc<Pager>,
 connection: &Connection,
 err: &LimboError,
+ mv_store: Option<&Arc<MvStore>>,
 ) -> Result<()> {
 if connection.is_nested_stmt.get() {
 // Errors from nested statements are handled by the parent statement.
@@ -1042,12 +1047,18 @@ pub fn handle_program_error(
 // Table locked errors, e.g. trying to checkpoint in an interactive transaction, do not cause a rollback.
 LimboError::TableLocked => {}
 _ => {
- pager
- .io
- .block(|| pager.end_tx(true, connection))
- .inspect_err(|e| {
- tracing::error!("end_tx failed: {e}");
- })?;
+ if let Some(mv_store) = mv_store {
+ if let Some(tx_id) = connection.mv_tx_id.get() {
+ mv_store.rollback_tx(tx_id, pager.clone());
+ }
+ } else {
+ pager
+ .io
+ .block(|| pager.end_tx(true, connection))
+ .inspect_err(|e| {
+ tracing::error!("end_tx failed: {e}");
+ })?;
+ }
 connection.transaction_state.replace(TransactionState::None);
 }
 }

From 9b6d181be460f9eb111fc2c162641138334e45e6 Mon Sep 17 00:00:00 2001
From: Pere Diaz Bou
Date: Fri, 12 Sep 2025 13:49:14 +0000
Subject: [PATCH 2/4] wal: add hacky update max frame for mvcc use

When multiple write transactions run concurrently in mvcc, the shared max
frame gets updated. From the point of view of the other transaction, this
new max_frame would make it return Busy because its current wal snapshot is
outdated.
---
 core/storage/wal.rs | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/core/storage/wal.rs b/core/storage/wal.rs
index 28e55bfbd..dff796790 100644
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
@@ -306,6 +306,11 @@ pub trait Wal: Debug {
 fn set_io_context(&mut self, ctx: IOContext);

+ /// Update the max frame to the current shared max frame.
+ /// Currently this is only used for MVCC as it takes care of write conflicts on its own.
+ /// This shouldn't be used with regular WAL mode.
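+ /// A minimal usage sketch, mirroring the MVCC commit path added in a later patch
+ /// (illustrative only; it assumes the pager is in WAL mode so `pager.wal` is populated):
+ /// ```ignore
+ /// let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
+ /// // Adopt frames appended by other committers so this transaction's pager
+ /// // write txn starts from the latest shared snapshot instead of returning Busy.
+ /// wal.update_max_frame();
+ /// ```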
+ fn update_max_frame(&mut self); + #[cfg(debug_assertions)] fn as_any(&self) -> &dyn std::any::Any; } @@ -1596,6 +1601,11 @@ impl Wal for WalFile { fn set_io_context(&mut self, ctx: IOContext) { self.io_ctx.replace(ctx); } + + fn update_max_frame(&mut self) { + let new_max_frame = self.get_shared().max_frame.load(Ordering::Acquire); + self.max_frame = new_max_frame; + } } impl WalFile { From e87226548ccd595eb97ca21b63e9eb11207f0b35 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Fri, 12 Sep 2025 13:49:40 +0000 Subject: [PATCH 3/4] core/mvcc: fix concurrent tests mvcc --- core/mvcc/database/tests.rs | 90 ++++++++++++++++++++++++++++++++----- 1 file changed, 78 insertions(+), 12 deletions(-) diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index f5d7f7d08..a348d89d0 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -1235,10 +1235,21 @@ fn test_commit_without_tx() { fn get_rows(conn: &Arc, query: &str) -> Vec> { let mut stmt = conn.prepare(query).unwrap(); let mut rows = Vec::new(); - while let StepResult::Row = stmt.step().unwrap() { - let row = stmt.row().unwrap(); - let values = row.get_values().cloned().collect::>(); - rows.push(values); + loop { + match stmt.step().unwrap() { + StepResult::Row => { + let row = stmt.row().unwrap(); + let values = row.get_values().cloned().collect::>(); + rows.push(values); + } + StepResult::Done => break, + StepResult::IO => { + stmt.run_once().unwrap(); + } + StepResult::Interrupt | StepResult::Busy => { + panic!("unexpected step result"); + } + } } rows } @@ -1252,17 +1263,21 @@ fn test_concurrent_writes() { current_statement: Option, } let db = MvccTestDbNoConn::new_with_random_db(); - let mut connecitons = Vec::new(); + let mut connections = Vec::new(); { let conn = db.connect(); conn.execute("CREATE TABLE test (x)").unwrap(); conn.close().unwrap(); } - for i in 0..2 { + let num_connections = 20; + let num_inserts_per_connection = 10000; + for i in 0..num_connections { let conn = db.connect(); - let mut inserts = ((100 * i)..(100 * (i + 1))).collect::>(); + let mut inserts = ((num_inserts_per_connection * i) + ..(num_inserts_per_connection * (i + 1))) + .collect::>(); inserts.reverse(); - connecitons.push(ConnectionState { + connections.push(ConnectionState { conn, inserts, current_statement: None, @@ -1271,14 +1286,14 @@ fn test_concurrent_writes() { loop { let mut all_finished = true; - for conn in &mut connecitons { - if !conn.inserts.is_empty() && conn.current_statement.is_none() { + for conn in &mut connections { + if !conn.inserts.is_empty() || conn.current_statement.is_some() { all_finished = false; break; } } - for (conn_id, conn) in connecitons.iter_mut().enumerate() { - println!("connection {conn_id} inserts: {:?}", conn.inserts); + for (conn_id, conn) in connections.iter_mut().enumerate() { + // println!("connection {conn_id} inserts: {:?}", conn.inserts); if conn.current_statement.is_none() && !conn.inserts.is_empty() { let write = conn.inserts.pop().unwrap(); println!("inserting row {write} from connection {conn_id}"); @@ -1291,6 +1306,7 @@ fn test_concurrent_writes() { if conn.current_statement.is_none() { continue; } + println!("connection step {conn_id}"); let stmt = conn.current_statement.as_mut().unwrap(); match stmt.step().unwrap() { // These you be only possible cases in write concurrency. 
@@ -1298,11 +1314,17 @@ fn test_concurrent_writes() { // No interrupt because insert doesn't interrupt // No busy because insert in mvcc should be multi concurrent write StepResult::Done => { + println!("connection {conn_id} done"); conn.current_statement = None; } StepResult::IO => { // let's skip doing I/O here, we want to perform io only after all the statements are stepped } + StepResult::Busy => { + println!("connection {conn_id} busy"); + // stmt.reprepare().unwrap(); + unreachable!(); + } _ => { unreachable!() } @@ -1311,7 +1333,51 @@ fn test_concurrent_writes() { db.get_db().io.step().unwrap(); if all_finished { + println!("all finished"); break; } } + + // Now let's find out if we wrote everything we intended to write. + let conn = db.connect(); + let rows = get_rows(&conn, "SELECT * FROM test ORDER BY x ASC"); + assert_eq!( + rows.len() as i64, + num_connections * num_inserts_per_connection + ); + for (row_id, row) in rows.iter().enumerate() { + assert_eq!(row[0].as_int().unwrap(), row_id as i64); + } + conn.close().unwrap(); +} + +fn generate_batched_insert(num_inserts: usize) -> String { + let mut inserts = String::from("INSERT INTO test (x) VALUES "); + for i in 0..num_inserts { + inserts.push_str(&format!("({i})")); + if i < num_inserts - 1 { + inserts.push(','); + } + } + inserts.push(';'); + inserts +} +#[test] +#[ignore] +fn test_batch_writes() { + let mut start = 0; + let mut end = 5000; + while start < end { + let i = ((end - start) / 2) + start; + let db = MvccTestDbNoConn::new_with_random_db(); + let conn = db.connect(); + conn.execute("CREATE TABLE test (x)").unwrap(); + let inserts = generate_batched_insert(i); + if conn.execute(inserts.clone()).is_err() { + end = i; + } else { + start = i + 1; + } + } + println!("start: {start} end: {end}"); } From 39fb5913e07803375c224bf2f089bfe9671c80da Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Fri, 12 Sep 2025 14:00:02 +0000 Subject: [PATCH 4/4] core/mvcc: queue write txn commits in mvcc on pager end_tx Flushing mvcc changes to disk requires serialization. To do so we simply introduce a lock for pager.end_tx, which will take ownership of flushing to WAL. Once this is finished we can simply release lock. 
--- core/mvcc/database/mod.rs | 407 +++++++++++++++++++++++++------------- 1 file changed, 271 insertions(+), 136 deletions(-) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index dc67120f9..08548a94b 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -7,14 +7,19 @@ use crate::state_machine::StateTransition; use crate::state_machine::TransitionResult; use crate::storage::btree::BTreeCursor; use crate::storage::btree::BTreeKey; +use crate::storage::btree::CursorValidState; +use crate::storage::sqlite3_ondisk::DatabaseHeader; +use crate::storage::wal::TursoRwLock; use crate::types::IOResult; use crate::types::ImmutableRecord; +use crate::Completion; use crate::IOExt; use crate::LimboError; use crate::Result; use crate::{Connection, Pager}; use crossbeam_skiplist::{SkipMap, SkipSet}; use parking_lot::RwLock; +use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Debug; use std::marker::PhantomData; @@ -245,20 +250,43 @@ impl AtomicTransactionState { #[derive(Debug)] pub enum CommitState { Initial, - BeginPagerTxn { end_ts: u64 }, - WriteRow { end_ts: u64, write_set_index: usize }, - WriteRowStateMachine { end_ts: u64, write_set_index: usize }, - DeleteRowStateMachine { end_ts: u64, write_set_index: usize }, - CommitPagerTxn { end_ts: u64 }, - Commit { end_ts: u64 }, + BeginPagerTxn { + end_ts: u64, + }, + WriteRow { + end_ts: u64, + write_set_index: usize, + requires_seek: bool, + }, + WriteRowStateMachine { + end_ts: u64, + write_set_index: usize, + }, + DeleteRowStateMachine { + end_ts: u64, + write_set_index: usize, + }, + CommitPagerTxn { + end_ts: u64, + }, + Commit { + end_ts: u64, + }, } #[derive(Debug)] pub enum WriteRowState { Initial, - CreateCursor, Seek, Insert, + /// Move to the next record in order to leave the cursor in the next position, this is used for inserting multiple rows for optimizations. 
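+ /// Leaving the cursor on the following record lets the commit loop set
+ /// `requires_seek = false` when the next entry in the write set is `row_id + 1`
+ /// on the same table, so strictly sequential inserts can skip the extra seek.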
+ Next, +} + +#[derive(Debug)] +struct CommitCoordinator { + pager_commit_lock: Arc, + commits_waiting: Arc, } pub struct CommitStateMachine { @@ -267,9 +295,13 @@ pub struct CommitStateMachine { pager: Rc, tx_id: TxID, connection: Arc, + /// Write set sorted by table id and row id write_set: Vec, write_row_state_machine: Option>, delete_row_state_machine: Option>, + commit_coordinator: Arc, + cursors: HashMap>>, + header: Arc>>, _phantom: PhantomData, } @@ -285,16 +317,15 @@ impl Debug for CommitStateMachine { pub struct WriteRowStateMachine { state: WriteRowState, is_finalized: bool, - pager: Rc, row: Row, record: Option, - cursor: Option, + cursor: Arc>, + requires_seek: bool, } #[derive(Debug)] pub enum DeleteRowState { Initial, - CreateCursor, Seek, Delete, } @@ -302,14 +333,19 @@ pub enum DeleteRowState { pub struct DeleteRowStateMachine { state: DeleteRowState, is_finalized: bool, - pager: Rc, rowid: RowID, - column_count: usize, - cursor: Option, + cursor: Arc>, } impl CommitStateMachine { - fn new(state: CommitState, pager: Rc, tx_id: TxID, connection: Arc) -> Self { + fn new( + state: CommitState, + pager: Rc, + tx_id: TxID, + connection: Arc, + commit_coordinator: Arc, + header: Arc>>, + ) -> Self { Self { state, is_finalized: false, @@ -319,20 +355,23 @@ impl CommitStateMachine { write_set: Vec::new(), write_row_state_machine: None, delete_row_state_machine: None, + commit_coordinator, + cursors: HashMap::new(), + header, _phantom: PhantomData, } } } impl WriteRowStateMachine { - fn new(pager: Rc, row: Row) -> Self { + fn new(row: Row, cursor: Arc>, requires_seek: bool) -> Self { Self { state: WriteRowState::Initial, is_finalized: false, - pager, row, record: None, - cursor: None, + cursor, + requires_seek, } } } @@ -441,6 +480,8 @@ impl StateTransition for CommitStateMachine { tracing::trace!("commit_tx(tx_id={})", self.tx_id); self.write_set .extend(tx.write_set.iter().map(|v| *v.value())); + self.write_set + .sort_by(|a, b| a.table_id.cmp(&b.table_id).then(a.row_id.cmp(&b.row_id))); self.state = CommitState::BeginPagerTxn { end_ts }; Ok(TransitionResult::Continue) } @@ -453,21 +494,62 @@ impl StateTransition for CommitStateMachine { // If this is the exclusive transaction, we already acquired a write transaction // on the pager in begin_exclusive_tx() and don't need to acquire it. - if !mvcc_store.is_exclusive_tx(&self.tx_id) { - let result = self.pager.io.block(|| self.pager.begin_write_tx())?; - if let LimboResult::Busy = result { - return Err(LimboError::Busy); + if mvcc_store.is_exclusive_tx(&self.tx_id) { + self.state = CommitState::WriteRow { + end_ts, + write_set_index: 0, + requires_seek: true, + }; + return Ok(TransitionResult::Continue); + } + // Currently txns are queued without any heuristics whasoever. This is important because + // we need to ensure writes to disk happen sequentially. + // * We don't want txns to write to WAL in parallel. + // * We don't want BTree modifications to happen in parallel. + // If any of these were to happen, we would find ourselves in a bad corruption situation. + + // NOTE: since we are blocking for `begin_write_tx` we do not care about re-entrancy right now. + let locked = self.commit_coordinator.pager_commit_lock.write(); + if !locked { + self.commit_coordinator + .commits_waiting + .fetch_add(1, Ordering::SeqCst); + // FIXME: IOCompletions still needs a yield variant... 
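+ // Returning Io with a dummy completion re-enters this state machine while the
+ // state is still BeginPagerTxn, so the commit simply retries the lock on the
+ // next step; commits_waiting only tracks how often committers found the lock busy.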
+ return Ok(TransitionResult::Io(crate::types::IOCompletions::Single( + Completion::new_dummy(), + ))); + } + { + let mut wal = self.pager.wal.as_ref().unwrap().borrow_mut(); + // we need to update the max frame to the latest shared max frame in order to avoid snapshot staleness + wal.update_max_frame(); + } + // TODO: Force updated header? + { + if let Some(last_commited_header) = self.header.read().as_ref() { + self.pager.io.block(|| { + self.pager.with_header_mut(|header_in_pager| { + header_in_pager.database_size = last_commited_header.database_size; + // TODO: deal with other fields + }) + })?; } } + let result = self.pager.io.block(|| self.pager.begin_write_tx())?; + if let crate::result::LimboResult::Busy = result { + panic!("Pager write transaction busy, in mvcc this should never happen"); + } self.state = CommitState::WriteRow { end_ts, write_set_index: 0, + requires_seek: true, }; return Ok(TransitionResult::Continue); } CommitState::WriteRow { end_ts, write_set_index, + requires_seek, } => { if write_set_index == self.write_set.len() { self.state = CommitState::CommitPagerTxn { end_ts }; @@ -480,9 +562,26 @@ impl StateTransition for CommitStateMachine { for row_version in row_versions.iter() { if let TxTimestampOrID::TxID(row_tx_id) = row_version.begin { if row_tx_id == self.tx_id { - let state_machine = mvcc_store - .write_row_to_pager(self.pager.clone(), &row_version.row)?; + let cursor = if let Some(cursor) = self.cursors.get(&id.table_id) { + cursor.clone() + } else { + let cursor = BTreeCursor::new_table( + None, // Write directly to B-tree + self.pager.clone(), + id.table_id as usize, + row_version.row.column_count, + ); + let cursor = Arc::new(RwLock::new(cursor)); + self.cursors.insert(id.table_id, cursor.clone()); + cursor + }; + let state_machine = mvcc_store.write_row_to_pager( + &row_version.row, + cursor, + requires_seek, + )?; self.write_row_state_machine = Some(state_machine); + self.state = CommitState::WriteRowStateMachine { end_ts, write_set_index, @@ -493,11 +592,21 @@ impl StateTransition for CommitStateMachine { if let Some(TxTimestampOrID::TxID(row_tx_id)) = row_version.end { if row_tx_id == self.tx_id { let column_count = row_version.row.column_count; - let state_machine = mvcc_store.delete_row_from_pager( - self.pager.clone(), - row_version.row.id, - column_count, - )?; + let cursor = if let Some(cursor) = self.cursors.get(&id.table_id) { + cursor.clone() + } else { + let cursor = BTreeCursor::new_table( + None, // Write directly to B-tree + self.pager.clone(), + id.table_id as usize, + column_count, + ); + let cursor = Arc::new(RwLock::new(cursor)); + self.cursors.insert(id.table_id, cursor.clone()); + cursor + }; + let state_machine = + mvcc_store.delete_row_from_pager(row_version.row.id, cursor)?; self.delete_row_state_machine = Some(state_machine); self.state = CommitState::DeleteRowStateMachine { end_ts, @@ -522,9 +631,26 @@ impl StateTransition for CommitStateMachine { return Ok(TransitionResult::Continue); } TransitionResult::Done(_) => { + let requires_seek = { + if let Some(next_id) = self.write_set.get(write_set_index + 1) { + let current_id = &self.write_set[write_set_index]; + if current_id.table_id == next_id.table_id + && current_id.row_id + 1 == next_id.row_id + { + // simple optimizaiton for sequential inserts with inceasing by 1 ids + // we should probably just check record in next row and see if it requires seek + false + } else { + true + } + } else { + false + } + }; self.state = CommitState::WriteRow { end_ts, write_set_index: 
write_set_index + 1, + requires_seek, }; return Ok(TransitionResult::Continue); } @@ -544,6 +670,7 @@ impl StateTransition for CommitStateMachine { self.state = CommitState::WriteRow { end_ts, write_set_index: write_set_index + 1, + requires_seek: true, }; return Ok(TransitionResult::Continue); } @@ -553,27 +680,32 @@ impl StateTransition for CommitStateMachine { // Write committed data to pager for persistence // Flush dirty pages to WAL - this is critical for data persistence // Similar to what step_end_write_txn does for legacy transactions - loop { - let result = self - .pager - .end_tx( - false, // rollback = false since we're committing - &self.connection, - ) - .map_err(|e| LimboError::InternalError(e.to_string())) - .unwrap(); - match result { - crate::types::IOResult::Done(_) => { - break; - } - crate::types::IOResult::IO(io) => { - io.wait(self.pager.io.as_ref())?; - continue; - } + + let result = self + .pager + .end_tx( + false, // rollback = false since we're committing + &self.connection, + ) + .map_err(|e| LimboError::InternalError(e.to_string())) + .unwrap(); + match result { + IOResult::Done(_) => { + // FIXME: hack for now to keep database header updated for pager commit + self.pager.io.block(|| { + self.pager.with_header(|header| { + self.header.write().replace(*header); + }) + })?; + self.commit_coordinator.pager_commit_lock.unlock(); + // TODO: here mark we are ready for a batch + self.state = CommitState::Commit { end_ts }; + return Ok(TransitionResult::Continue); + } + IOResult::IO(io) => { + return Ok(TransitionResult::Io(io)); } } - self.state = CommitState::Commit { end_ts }; - Ok(TransitionResult::Continue) } CommitState::Commit { end_ts } => { let mut log_record = LogRecord::new(end_ts); @@ -648,7 +780,6 @@ impl StateTransition for WriteRowStateMachine { #[tracing::instrument(fields(state = ?self.state), skip(self, _context))] fn step(&mut self, _context: &Self::Context) -> Result> { - use crate::storage::btree::BTreeCursor; use crate::types::{IOResult, SeekKey, SeekOp}; match self.state { @@ -658,62 +789,63 @@ impl StateTransition for WriteRowStateMachine { record.start_serialization(&self.row.data); self.record = Some(record); - self.state = WriteRowState::CreateCursor; - Ok(TransitionResult::Continue) - } - WriteRowState::CreateCursor => { - // Create the cursor - let root_page = self.row.id.table_id as usize; - let num_columns = self.row.column_count; - - let cursor = BTreeCursor::new_table( - None, // Write directly to B-tree - self.pager.clone(), - root_page, - num_columns, - ); - self.cursor = Some(cursor); - - self.state = WriteRowState::Seek; + if self.requires_seek { + self.state = WriteRowState::Seek; + } else { + self.state = WriteRowState::Insert; + } Ok(TransitionResult::Continue) } WriteRowState::Seek => { // Position the cursor by seeking to the row position let seek_key = SeekKey::TableRowId(self.row.id.row_id); - let cursor = self.cursor.as_mut().unwrap(); - match cursor.seek(seek_key, SeekOp::GE { eq_only: true })? { - IOResult::Done(_) => { - self.state = WriteRowState::Insert; - Ok(TransitionResult::Continue) - } + match self + .cursor + .write() + .seek(seek_key, SeekOp::GE { eq_only: true })? 
+ { + IOResult::Done(_) => {} IOResult::IO(io) => { return Ok(TransitionResult::Io(io)); } } + assert_eq!(self.cursor.write().valid_state, CursorValidState::Valid); + self.state = WriteRowState::Insert; + Ok(TransitionResult::Continue) } WriteRowState::Insert => { // Insert the record into the B-tree - let cursor = self.cursor.as_mut().unwrap(); let key = BTreeKey::new_table_rowid(self.row.id.row_id, self.record.as_ref()); - match cursor + match self + .cursor + .write() .insert(&key) - .map_err(|e| LimboError::InternalError(e.to_string()))? + .map_err(|e: LimboError| LimboError::InternalError(e.to_string()))? { - IOResult::Done(()) => { - tracing::trace!( - "write_row_to_pager(table_id={}, row_id={})", - self.row.id.table_id, - self.row.id.row_id - ); - self.finalize(&())?; - Ok(TransitionResult::Done(())) - } + IOResult::Done(()) => {} IOResult::IO(io) => { return Ok(TransitionResult::Io(io)); } } + self.state = WriteRowState::Next; + Ok(TransitionResult::Continue) + } + WriteRowState::Next => { + match self + .cursor + .write() + .next() + .map_err(|e: LimboError| LimboError::InternalError(e.to_string()))? + { + IOResult::Done(_) => {} + IOResult::IO(io) => { + return Ok(TransitionResult::Io(io)); + } + } + self.finalize(&())?; + Ok(TransitionResult::Done(())) } } } @@ -734,30 +866,21 @@ impl StateTransition for DeleteRowStateMachine { #[tracing::instrument(fields(state = ?self.state), skip(self, _context))] fn step(&mut self, _context: &Self::Context) -> Result> { - use crate::storage::btree::BTreeCursor; use crate::types::{IOResult, SeekKey, SeekOp}; match self.state { DeleteRowState::Initial => { - self.state = DeleteRowState::CreateCursor; - Ok(TransitionResult::Continue) - } - DeleteRowState::CreateCursor => { - let root_page = self.rowid.table_id as usize; - let num_columns = self.column_count; - - let cursor = - BTreeCursor::new_table(None, self.pager.clone(), root_page, num_columns); - self.cursor = Some(cursor); - self.state = DeleteRowState::Seek; Ok(TransitionResult::Continue) } DeleteRowState::Seek => { let seek_key = SeekKey::TableRowId(self.rowid.row_id); - let cursor = self.cursor.as_mut().unwrap(); - match cursor.seek(seek_key, SeekOp::GE { eq_only: true })? { + match self + .cursor + .write() + .seek(seek_key, SeekOp::GE { eq_only: true })? + { IOResult::Done(_) => { self.state = DeleteRowState::Delete; Ok(TransitionResult::Continue) @@ -769,25 +892,25 @@ impl StateTransition for DeleteRowStateMachine { } DeleteRowState::Delete => { // Insert the record into the B-tree - let cursor = self.cursor.as_mut().unwrap(); - match cursor + match self + .cursor + .write() .delete() .map_err(|e| LimboError::InternalError(e.to_string()))? 
{ - IOResult::Done(()) => { - tracing::trace!( - "delete_row_from_pager(table_id={}, row_id={})", - self.rowid.table_id, - self.rowid.row_id - ); - self.finalize(&())?; - Ok(TransitionResult::Done(())) - } + IOResult::Done(()) => {} IOResult::IO(io) => { return Ok(TransitionResult::Io(io)); } } + tracing::trace!( + "delete_row_from_pager(table_id={}, row_id={})", + self.rowid.table_id, + self.rowid.row_id + ); + self.finalize(&())?; + Ok(TransitionResult::Done(())) } } } @@ -803,14 +926,12 @@ impl StateTransition for DeleteRowStateMachine { } impl DeleteRowStateMachine { - fn new(pager: Rc, rowid: RowID, column_count: usize) -> Self { + fn new(rowid: RowID, cursor: Arc>) -> Self { Self { state: DeleteRowState::Initial, is_finalized: false, - pager, rowid, - column_count, - cursor: None, + cursor, } } } @@ -832,6 +953,8 @@ pub struct MvStore { /// every other MVCC transaction must wait for it to commit before they can commit. We have /// exclusive transactions to support single-writer semantics for compatibility with SQLite. exclusive_tx: RwLock>, + commit_coordinator: Arc, + header: Arc>>, } impl MvStore { @@ -846,6 +969,11 @@ impl MvStore { storage, loaded_tables: RwLock::new(HashSet::new()), exclusive_tx: RwLock::new(None), + commit_coordinator: Arc::new(CommitCoordinator { + pager_commit_lock: Arc::new(TursoRwLock::new()), + commits_waiting: Arc::new(AtomicU64::new(0)), + }), + header: Arc::new(RwLock::new(None)), } } @@ -1134,14 +1262,18 @@ impl MvStore { } LimboResult::Ok => {} } + let locked = self.commit_coordinator.pager_commit_lock.write(); + if !locked { + self.release_exclusive_tx(&tx_id); + pager.end_read_tx()?; + return Err(LimboError::Busy); + } // Try to acquire the pager write lock match return_if_io!(pager.begin_write_tx()) { LimboResult::Busy => { tracing::debug!("begin_exclusive_tx: tx_id={} failed with Busy", tx_id); // Failed to get pager lock - release our exclusive lock - self.release_exclusive_tx(&tx_id); - pager.end_read_tx()?; - return Err(LimboError::Busy); + panic!("begin_exclusive_tx: tx_id={tx_id} failed with Busy, this should never happen as we were able to lock mvcc exclusive write lock"); } LimboResult::Ok => { let tx = Transaction::new(tx_id, begin_ts); @@ -1191,11 +1323,15 @@ impl MvStore { connection: &Arc, ) -> Result>> { tracing::trace!("commit_tx(tx_id={})", tx_id); - let state_machine: StateMachine> = StateMachine::< - CommitStateMachine, - >::new( - CommitStateMachine::new(CommitState::Initial, pager, tx_id, connection.clone()), - ); + let state_machine: StateMachine> = + StateMachine::>::new(CommitStateMachine::new( + CommitState::Initial, + pager, + tx_id, + connection.clone(), + self.commit_coordinator.clone(), + self.header.clone(), + )); Ok(state_machine) } @@ -1343,6 +1479,7 @@ impl MvStore { } // Extracts the begin timestamp from a transaction + #[inline] fn get_begin_timestamp(&self, ts_or_id: &TxTimestampOrID) -> u64 { match ts_or_id { TxTimestampOrID::Timestamp(ts) => *ts, @@ -1367,13 +1504,13 @@ impl MvStore { // another data structure, e.g. a BTreeSet. If it proves to be too quadratic empirically, // we can either switch to a tree-like structure, or at least use partition_point() // which performs a binary search for the insertion point. 
- let position = versions - .iter() - .rposition(|v| { - self.get_begin_timestamp(&v.begin) < self.get_begin_timestamp(&row_version.begin) - }) - .map(|p| p + 1) - .unwrap_or(0); + let mut position = 0_usize; + for (i, v) in versions.iter().rev().enumerate() { + if self.get_begin_timestamp(&v.begin) < self.get_begin_timestamp(&row_version.begin) { + position = i + 1; + break; + } + } if versions.len() - position > 3 { tracing::debug!( "Inserting a row version {} positions from the end", @@ -1385,13 +1522,15 @@ impl MvStore { pub fn write_row_to_pager( &self, - pager: Rc, row: &Row, + cursor: Arc>, + requires_seek: bool, ) -> Result> { let state_machine: StateMachine = StateMachine::::new(WriteRowStateMachine::new( - pager, row.clone(), + cursor, + requires_seek, )); Ok(state_machine) @@ -1399,15 +1538,11 @@ impl MvStore { pub fn delete_row_from_pager( &self, - pager: Rc, rowid: RowID, - column_count: usize, + cursor: Arc>, ) -> Result> { - let state_machine: StateMachine = StateMachine::< - DeleteRowStateMachine, - >::new( - DeleteRowStateMachine::new(pager, rowid, column_count), - ); + let state_machine: StateMachine = + StateMachine::::new(DeleteRowStateMachine::new(rowid, cursor)); Ok(state_machine) }
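
Condensed from the CommitState::WriteRow arm above, the call shape on the commit
path is now roughly the following (state-machine driving, the delete path and
error handling are omitted; `id`, `row_version`, `requires_seek` and `mvcc_store`
are the variables already in scope there):

    // One table cursor per table id, created lazily and shared for the whole commit.
    let cursor = Arc::new(RwLock::new(BTreeCursor::new_table(
        None, // write directly to the B-tree
        self.pager.clone(),
        id.table_id as usize,
        row_version.row.column_count,
    )));
    self.cursors.insert(id.table_id, cursor.clone());
    // requires_seek is false when the previous write landed on row_id - 1 of the
    // same table, so sequential inserts reuse the position left by WriteRowState::Next.
    let state_machine = mvcc_store.write_row_to_pager(&row_version.row, cursor, requires_seek)?;
    self.write_row_state_machine = Some(state_machine);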