From ea04e9033a2c91b0c749ffaccfc85c153d52541e Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 21 Oct 2025 13:38:00 +0200 Subject: [PATCH] core/mvcc: add btree_cursor under MVCC cursor --- core/incremental/compiler.rs | 19 +- core/incremental/cursor.rs | 14 +- core/incremental/operator.rs | 195 +++++++---------- core/mvcc/cursor.rs | 23 +- .../mvcc/database/checkpoint_state_machine.rs | 9 +- core/mvcc/database/mod.rs | 1 - core/mvcc/database/tests.rs | 5 + core/schema.rs | 2 +- core/storage/btree.rs | 202 ++++-------------- core/vdbe/execute.rs | 154 +++++++------ 10 files changed, 246 insertions(+), 378 deletions(-) diff --git a/core/incremental/compiler.rs b/core/incremental/compiler.rs index 98cc29896..a319bb9ad 100644 --- a/core/incremental/compiler.rs +++ b/core/incremental/compiler.rs @@ -490,15 +490,10 @@ impl DbspCircuit { ) -> Result> { if let Some(root_id) = self.root { // Create temporary cursors for execute (non-commit) operations - let table_cursor = BTreeCursor::new_table( - None, - pager.clone(), - self.internal_state_root, - OPERATOR_COLUMNS, - ); + let table_cursor = + BTreeCursor::new_table(pager.clone(), self.internal_state_root, OPERATOR_COLUMNS); let index_def = create_dbsp_state_index(self.internal_state_index_root); let index_cursor = BTreeCursor::new_index( - None, pager.clone(), self.internal_state_index_root, &index_def, @@ -547,14 +542,12 @@ impl DbspCircuit { CommitState::Init => { // Create state cursors when entering CommitOperators state let state_table_cursor = BTreeCursor::new_table( - None, pager.clone(), self.internal_state_root, OPERATOR_COLUMNS, ); let index_def = create_dbsp_state_index(self.internal_state_index_root); let state_index_cursor = BTreeCursor::new_index( - None, pager.clone(), self.internal_state_index_root, &index_def, @@ -585,7 +578,6 @@ impl DbspCircuit { // Create view cursor when entering UpdateView state let view_cursor = Box::new(BTreeCursor::new_table( - None, pager.clone(), main_data_root, num_columns, @@ -615,7 +607,6 @@ impl DbspCircuit { // due to btree cursor state machine limitations if matches!(write_row_state, WriteRowView::GetRecord) { *view_cursor = Box::new(BTreeCursor::new_table( - None, pager.clone(), main_data_root, num_columns, @@ -643,7 +634,6 @@ impl DbspCircuit { let view_cursor = std::mem::replace( view_cursor, Box::new(BTreeCursor::new_table( - None, pager.clone(), main_data_root, num_columns, @@ -739,14 +729,12 @@ impl DbspCircuit { // Create temporary cursors for the recursive call let temp_table_cursor = BTreeCursor::new_table( - None, pager.clone(), self.internal_state_root, OPERATOR_COLUMNS, ); let index_def = create_dbsp_state_index(self.internal_state_index_root); let temp_index_cursor = BTreeCursor::new_index( - None, pager.clone(), self.internal_state_index_root, &index_def, @@ -2774,8 +2762,7 @@ mod tests { let num_columns = circuit.output_schema.columns.len() + 1; // Create a cursor to read the btree - let mut btree_cursor = - BTreeCursor::new_table(None, pager.clone(), main_data_root, num_columns); + let mut btree_cursor = BTreeCursor::new_table(pager.clone(), main_data_root, num_columns); // Rewind to the beginning pager.io.block(|| btree_cursor.rewind())?; diff --git a/core/incremental/cursor.rs b/core/incremental/cursor.rs index a33734c72..12e3c49c6 100644 --- a/core/incremental/cursor.rs +++ b/core/incremental/cursor.rs @@ -5,7 +5,7 @@ use crate::{ view::{IncrementalView, ViewTransactionState}, }, return_if_io, - storage::btree::{BTreeCursor, CursorTrait}, + storage::btree::CursorTrait, types::{IOResult, SeekKey, SeekOp, SeekResult, Value}, LimboError, Pager, Result, }; @@ -35,7 +35,7 @@ enum SeekState { /// and overlays transaction changes as needed. pub struct MaterializedViewCursor { // Core components - btree_cursor: Box, + btree_cursor: Box, view: Arc>, pager: Arc, @@ -62,7 +62,7 @@ pub struct MaterializedViewCursor { impl MaterializedViewCursor { pub fn new( - btree_cursor: Box, + btree_cursor: Box, view: Arc>, pager: Arc, tx_state: Arc, @@ -296,6 +296,7 @@ impl MaterializedViewCursor { #[cfg(test)] mod tests { use super::*; + use crate::storage::btree::BTreeCursor; use crate::util::IOExt; use crate::{Connection, Database, OpenFlags}; use std::sync::Arc; @@ -359,12 +360,7 @@ mod tests { // Create a btree cursor let pager = conn.get_pager(); - let btree_cursor = Box::new(BTreeCursor::new( - None, // No MvCursor - pager.clone(), - root_page, - num_columns, - )); + let btree_cursor = Box::new(BTreeCursor::new(pager.clone(), root_page, num_columns)); // Get or create transaction state for this view let tx_state = conn.view_transaction_states.get_or_create("test_view"); diff --git a/core/incremental/operator.rs b/core/incremental/operator.rs index 70ab72d74..613497e8d 100644 --- a/core/incremental/operator.rs +++ b/core/incremental/operator.rs @@ -393,12 +393,11 @@ mod tests { // Create a persistent pager for the test let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); // Create index cursor with proper index definition for DBSP state table let index_def = create_dbsp_state_index(index_root_page_id); // Index has 4 columns: operator_id, zset_id, element_id, rowid - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); // Create an aggregate operator for SUM(age) with no GROUP BY @@ -513,12 +512,11 @@ mod tests { // Create an aggregate operator for SUM(score) GROUP BY team // Create a persistent pager for the test let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); // Create index cursor with proper index definition for DBSP state table let index_def = create_dbsp_state_index(index_root_page_id); // Index has 4 columns: operator_id, zset_id, element_id, rowid - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -664,12 +662,11 @@ mod tests { // Create a persistent pager for the test let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); // Create index cursor with proper index definition for DBSP state table let index_def = create_dbsp_state_index(index_root_page_id); // Index has 4 columns: operator_id, zset_id, element_id, rowid - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); // Create COUNT(*) GROUP BY category @@ -745,12 +742,11 @@ mod tests { // Create SUM(amount) GROUP BY product // Create a persistent pager for the test let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); // Create index cursor with proper index definition for DBSP state table let index_def = create_dbsp_state_index(index_root_page_id); // Index has 4 columns: operator_id, zset_id, element_id, rowid - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -842,12 +838,11 @@ mod tests { // Test the example from DBSP_ROADMAP: COUNT(*) and SUM(amount) GROUP BY user_id // Create a persistent pager for the test let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); // Create index cursor with proper index definition for DBSP state table let index_def = create_dbsp_state_index(index_root_page_id); // Index has 4 columns: operator_id, zset_id, element_id, rowid - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -934,12 +929,11 @@ mod tests { // Test AVG aggregation // Create a persistent pager for the test let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); // Create index cursor with proper index definition for DBSP state table let index_def = create_dbsp_state_index(index_root_page_id); // Index has 4 columns: operator_id, zset_id, element_id, rowid - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -1034,12 +1028,11 @@ mod tests { // Test that deletes (negative weights) properly update aggregates // Create a persistent pager for the test let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); // Create index cursor with proper index definition for DBSP state table let index_def = create_dbsp_state_index(index_root_page_id); // Index has 4 columns: operator_id, zset_id, element_id, rowid - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -1121,12 +1114,11 @@ mod tests { // Create a persistent pager for the test let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); // Create index cursor with proper index definition for DBSP state table let index_def = create_dbsp_state_index(index_root_page_id); // Index has 4 columns: operator_id, zset_id, element_id, rowid - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -1211,12 +1203,11 @@ mod tests { // Create a persistent pager for the test let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); // Create index cursor with proper index definition for DBSP state table let index_def = create_dbsp_state_index(index_root_page_id); // Index has 4 columns: operator_id, zset_id, element_id, rowid - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -1295,12 +1286,11 @@ mod tests { // Create a persistent pager for the test let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); // Create index cursor with proper index definition for DBSP state table let index_def = create_dbsp_state_index(index_root_page_id); // Index has 4 columns: operator_id, zset_id, element_id, rowid - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -1364,12 +1354,11 @@ mod tests { // Create a persistent pager for the test let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); // Create index cursor with proper index definition for DBSP state table let index_def = create_dbsp_state_index(index_root_page_id); // Index has 4 columns: operator_id, zset_id, element_id, rowid - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -1450,12 +1439,11 @@ mod tests { // Create a persistent pager for the test let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); // Create index cursor with proper index definition for DBSP state table let index_def = create_dbsp_state_index(index_root_page_id); // Index has 4 columns: operator_id, zset_id, element_id, rowid - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut filter = FilterOperator::new(FilterPredicate::GreaterThan { @@ -1509,12 +1497,11 @@ mod tests { fn test_filter_eval_with_uncommitted() { // Create a persistent pager for the test let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); // Create index cursor with proper index definition for DBSP state table let index_def = create_dbsp_state_index(index_root_page_id); // Index has 4 columns: operator_id, zset_id, element_id, rowid - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut filter = FilterOperator::new(FilterPredicate::GreaterThan { @@ -1600,12 +1587,11 @@ mod tests { // This is the critical test - aggregations must not modify internal state during eval // Create a persistent pager for the test let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); // Create index cursor with proper index definition for DBSP state table let index_def = create_dbsp_state_index(index_root_page_id); // Index has 4 columns: operator_id, zset_id, element_id, rowid - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -1770,12 +1756,11 @@ mod tests { // doesn't pollute the internal state // Create a persistent pager for the test let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); // Create index cursor with proper index definition for DBSP state table let index_def = create_dbsp_state_index(index_root_page_id); // Index has 4 columns: operator_id, zset_id, element_id, rowid - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -1852,12 +1837,11 @@ mod tests { // Test eval with both committed delta and uncommitted changes // Create a persistent pager for the test let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); // Create index cursor with proper index definition for DBSP state table let index_def = create_dbsp_state_index(index_root_page_id); // Index has 4 columns: operator_id, zset_id, element_id, rowid - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -1968,10 +1952,9 @@ mod tests { fn test_min_max_basic() { // Test basic MIN/MAX functionality let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); let index_def = create_dbsp_state_index(index_root_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -2036,10 +2019,9 @@ mod tests { fn test_min_max_deletion_updates_min() { // Test that deleting the MIN value updates to the next lowest let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); let index_def = create_dbsp_state_index(index_root_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -2126,10 +2108,9 @@ mod tests { fn test_min_max_deletion_updates_max() { // Test that deleting the MAX value updates to the next highest let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); let index_def = create_dbsp_state_index(index_root_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -2216,10 +2197,9 @@ mod tests { fn test_min_max_insertion_updates_min() { // Test that inserting a new MIN value updates the aggregate let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); let index_def = create_dbsp_state_index(index_root_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -2298,10 +2278,9 @@ mod tests { fn test_min_max_insertion_updates_max() { // Test that inserting a new MAX value updates the aggregate let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); let index_def = create_dbsp_state_index(index_root_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -2380,10 +2359,9 @@ mod tests { fn test_min_max_update_changes_min() { // Test that updating a row to become the new MIN updates the aggregate let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); let index_def = create_dbsp_state_index(index_root_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -2470,10 +2448,9 @@ mod tests { fn test_min_max_with_group_by() { // Test MIN/MAX with GROUP BY let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); let index_def = create_dbsp_state_index(index_root_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -2572,10 +2549,9 @@ mod tests { fn test_min_max_with_nulls() { // Test that NULL values are ignored in MIN/MAX let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); let index_def = create_dbsp_state_index(index_root_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -2648,10 +2624,9 @@ mod tests { fn test_min_max_integer_values() { // Test MIN/MAX with integer values instead of floats let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); let index_def = create_dbsp_state_index(index_root_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -2716,10 +2691,9 @@ mod tests { fn test_min_max_text_values() { // Test MIN/MAX with text values (alphabetical ordering) let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); let index_def = create_dbsp_state_index(index_root_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -2755,10 +2729,9 @@ mod tests { #[test] fn test_min_max_with_other_aggregates() { let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); let index_def = create_dbsp_state_index(index_root_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -2847,10 +2820,9 @@ mod tests { #[test] fn test_min_max_multiple_columns() { let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); let index_def = create_dbsp_state_index(index_root_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( @@ -2926,10 +2898,9 @@ mod tests { fn test_join_operator_inner() { // Test INNER JOIN with incremental updates let (pager, table_page_id, index_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_page_id, 10); let index_def = create_dbsp_state_index(index_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_page_id, &index_def, 10); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut join = JoinOperator::new( 1, // operator_id @@ -3023,10 +2994,9 @@ mod tests { fn test_join_operator_with_deletions() { // Test INNER JOIN with deletions (negative weights) let (pager, table_page_id, index_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_page_id, 10); let index_def = create_dbsp_state_index(index_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_page_id, &index_def, 10); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut join = JoinOperator::new( @@ -3114,10 +3084,9 @@ mod tests { fn test_join_operator_one_to_many() { // Test one-to-many relationship: one customer with multiple orders let (pager, table_page_id, index_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_page_id, 10); let index_def = create_dbsp_state_index(index_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_page_id, &index_def, 10); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut join = JoinOperator::new( @@ -3251,10 +3220,9 @@ mod tests { fn test_join_operator_many_to_many() { // Test many-to-many: multiple rows with same key on both sides let (pager, table_page_id, index_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_page_id, 10); let index_def = create_dbsp_state_index(index_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_page_id, &index_def, 10); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut join = JoinOperator::new( @@ -3368,10 +3336,9 @@ mod tests { fn test_join_operator_update_in_one_to_many() { // Test updates in one-to-many scenarios let (pager, table_page_id, index_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_page_id, 10); let index_def = create_dbsp_state_index(index_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_page_id, &index_def, 10); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut join = JoinOperator::new( @@ -3491,10 +3458,9 @@ mod tests { fn test_join_operator_weight_accumulation_complex() { // Test complex weight accumulation with multiple identical rows let (pager, table_page_id, index_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_page_id, 10); let index_def = create_dbsp_state_index(index_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_page_id, &index_def, 10); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut join = JoinOperator::new( @@ -3627,9 +3593,9 @@ mod tests { let mut state = EvalState::Init { deltas: delta_pair }; let (pager, table_root, index_root) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root, 5); let index_def = create_dbsp_state_index(index_root); - let index_cursor = BTreeCursor::new_index(None, pager.clone(), index_root, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let result = pager @@ -3687,10 +3653,10 @@ mod tests { #[test] fn test_merge_operator_basic() { let (_pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, _pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(_pager.clone(), table_root_page_id, 5); let index_def = create_dbsp_state_index(index_root_page_id); let index_cursor = - BTreeCursor::new_index(None, _pager.clone(), index_root_page_id, &index_def, 4); + BTreeCursor::new_index(_pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut merge_op = MergeOperator::new( @@ -3748,10 +3714,10 @@ mod tests { #[test] fn test_merge_operator_stateful_distinct() { let (_pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, _pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(_pager.clone(), table_root_page_id, 5); let index_def = create_dbsp_state_index(index_root_page_id); let index_cursor = - BTreeCursor::new_index(None, _pager.clone(), index_root_page_id, &index_def, 4); + BTreeCursor::new_index(_pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); // Test that UNION (distinct) properly deduplicates across multiple operations @@ -3822,10 +3788,10 @@ mod tests { #[test] fn test_merge_operator_single_sided_inputs_union_all() { let (_pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, _pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(_pager.clone(), table_root_page_id, 5); let index_def = create_dbsp_state_index(index_root_page_id); let index_cursor = - BTreeCursor::new_index(None, _pager.clone(), index_root_page_id, &index_def, 4); + BTreeCursor::new_index(_pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); // Test UNION ALL with inputs coming from only one side at a time @@ -3942,10 +3908,10 @@ mod tests { #[test] fn test_merge_operator_both_sides_empty() { let (_pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, _pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(_pager.clone(), table_root_page_id, 5); let index_def = create_dbsp_state_index(index_root_page_id); let index_cursor = - BTreeCursor::new_index(None, _pager.clone(), index_root_page_id, &index_def, 4); + BTreeCursor::new_index(_pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); // Test that both sides being empty works correctly @@ -4022,10 +3988,9 @@ mod tests { // Test that aggregate state serialization correctly preserves column indices // when multiple aggregates operate on different columns let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); - let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5); let index_def = create_dbsp_state_index(index_root_page_id); - let index_cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4); let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); // Create first operator with SUM(col1), MIN(col3) GROUP BY col0 diff --git a/core/mvcc/cursor.rs b/core/mvcc/cursor.rs index f177181df..20a012cc3 100644 --- a/core/mvcc/cursor.rs +++ b/core/mvcc/cursor.rs @@ -26,6 +26,7 @@ pub struct MvccLazyCursor { tx_id: u64, /// Reusable immutable record, used to allow better allocation strategy. reusable_immutable_record: RefCell>, + _btree_cursor: Box, } impl MvccLazyCursor { @@ -34,17 +35,21 @@ impl MvccLazyCursor { tx_id: u64, root_page_or_table_id: i64, pager: Arc, + btree_cursor: Box, ) -> Result> { + assert!( + (&*btree_cursor as &dyn Any).is::(), + "BTreeCursor expected for mvcc cursor" + ); let table_id = db.get_table_id_from_root_page(root_page_or_table_id); db.maybe_initialize_table(table_id, pager)?; - let cursor = Self { + Ok(Self { db, tx_id, current_pos: RefCell::new(CursorPosition::BeforeFirst), table_id, reusable_immutable_record: RefCell::new(None), - }; - Ok(cursor) + _btree_cursor: btree_cursor, } pub fn current_row(&self) -> Result> { @@ -375,8 +380,16 @@ impl CursorTrait for MvccLazyCursor { fn get_skip_advance(&self) -> bool { todo!() } +} - fn get_mvcc_cursor(&self) -> Arc> { - todo!() +impl Debug for MvccLazyCursor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MvccLazyCursor") + .field("current_pos", &self.current_pos) + .field("table_id", &self.table_id) + .field("tx_id", &self.tx_id) + .field("reusable_immutable_record", &self.reusable_immutable_record) + .field("btree_cursor", &()) + .finish() } } diff --git a/core/mvcc/database/checkpoint_state_machine.rs b/core/mvcc/database/checkpoint_state_machine.rs index 778431adc..8c3e109e3 100644 --- a/core/mvcc/database/checkpoint_state_machine.rs +++ b/core/mvcc/database/checkpoint_state_machine.rs @@ -390,7 +390,6 @@ impl CheckpointStateMachine { cursor.clone() } else { let cursor = BTreeCursor::new_table( - None, self.pager.clone(), known_root_page as i64, num_columns, @@ -465,12 +464,8 @@ impl CheckpointStateMachine { let cursor = if let Some(cursor) = self.cursors.get(&root_page) { cursor.clone() } else { - let cursor = BTreeCursor::new_table( - None, // Write directly to B-tree - self.pager.clone(), - root_page as i64, - num_columns, - ); + let cursor = + BTreeCursor::new_table(self.pager.clone(), root_page as i64, num_columns); let cursor = Arc::new(RwLock::new(cursor)); self.cursors.insert(root_page, cursor.clone()); cursor diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index f1852ebfd..9675ead3c 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1884,7 +1884,6 @@ impl MvStore { .value() .unwrap_or_else(|| panic!("Table ID does not have a root page: {table_id}")); let mut cursor = BTreeCursor::new_table( - None, // No MVCC cursor for scanning pager.clone(), root_page as i64, 1, // We'll adjust this as needed diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 9198c5825..206636f02 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -830,6 +830,7 @@ fn test_lazy_scan_cursor_basic() { tx_id, table_id, db.conn.pager.read().clone(), + Box::new(BTreeCursor::new(db.conn.pager.read().clone(), table_id, 1)), ) .unwrap(); @@ -872,6 +873,7 @@ fn test_lazy_scan_cursor_with_gaps() { tx_id, table_id, db.conn.pager.read().clone(), + Box::new(BTreeCursor::new(db.conn.pager.read().clone(), table_id, 1)), ) .unwrap(); @@ -923,6 +925,7 @@ fn test_cursor_basic() { tx_id, table_id, db.conn.pager.read().clone(), + Box::new(BTreeCursor::new(db.conn.pager.read().clone(), table_id, 1)), ) .unwrap(); @@ -977,6 +980,7 @@ fn test_cursor_with_empty_table() { tx_id, table_id, db.conn.pager.read().clone(), + Box::new(BTreeCursor::new(db.conn.pager.read().clone(), table_id, 1)), ) .unwrap(); assert!(cursor.is_empty()); @@ -994,6 +998,7 @@ fn test_cursor_modification_during_scan() { tx_id, table_id, db.conn.pager.read().clone(), + Box::new(BTreeCursor::new(db.conn.pager.read().clone(), table_id, 1)), ) .unwrap(); diff --git a/core/schema.rs b/core/schema.rs index ce30d3950..67e6d2061 100644 --- a/core/schema.rs +++ b/core/schema.rs @@ -414,7 +414,7 @@ impl Schema { mv_cursor.is_none(), "mvcc not yet supported for make_from_btree" ); - let mut cursor = BTreeCursor::new_table(mv_cursor, Arc::clone(&pager), 1, 10); + let mut cursor = BTreeCursor::new_table(Arc::clone(&pager), 1, 10); let mut from_sql_indexes = Vec::with_capacity(10); let mut automatic_indices: HashMap> = HashMap::with_capacity(10); diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 65659c4b3..6d615f611 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -24,7 +24,7 @@ use crate::{ RecordCursor, SeekResult, }, util::IOExt, - Completion, MvCursor, Page, + Completion, Page, }; use crate::{ @@ -39,7 +39,6 @@ use super::{ write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, MINIMUM_CELL_SIZE, }, }; -use parking_lot::RwLock; use std::{ any::Any, cell::{Cell, Ref, RefCell}, @@ -556,15 +555,10 @@ pub trait CursorTrait: Any { fn record_cursor_mut(&self) -> std::cell::RefMut<'_, RecordCursor>; fn get_pager(&self) -> Arc; fn get_skip_advance(&self) -> bool; - - // FIXME: remove once we implement trait for mvcc - fn get_mvcc_cursor(&self) -> Arc>; // --- end: BTreeCursor specific functions ---- } pub struct BTreeCursor { - /// The multi-version cursor that is used to read and write to the database file. - mv_cursor: Option>>, /// The pager that is used to read and write to the database file. pub pager: Arc, /// Cached value of the usable space of a BTree page, since it is very expensive to call in a hot loop via pager.usable_space(). @@ -667,12 +661,7 @@ impl BTreeNodeState { } impl BTreeCursor { - pub fn new( - mv_cursor: Option>>, - pager: Arc, - root_page: i64, - num_columns: usize, - ) -> Self { + pub fn new(pager: Arc, root_page: i64, num_columns: usize) -> Self { let valid_state = if root_page == 1 && !pager.db_state.is_initialized() { CursorValidState::Invalid } else { @@ -680,7 +669,6 @@ impl BTreeCursor { }; let usable_space = pager.usable_space(); Self { - mv_cursor, pager, root_page, usable_space_cached: usable_space, @@ -718,23 +706,12 @@ impl BTreeCursor { } } - pub fn new_table( - mv_cursor: Option>>, - pager: Arc, - root_page: i64, - num_columns: usize, - ) -> Self { - Self::new(mv_cursor, pager, root_page, num_columns) + pub fn new_table(pager: Arc, root_page: i64, num_columns: usize) -> Self { + Self::new(pager, root_page, num_columns) } - pub fn new_index( - mv_cursor: Option>>, - pager: Arc, - root_page: i64, - index: &Index, - num_columns: usize, - ) -> Self { - let mut cursor = Self::new(mv_cursor, pager, root_page, num_columns); + pub fn new_index(pager: Arc, root_page: i64, index: &Index, num_columns: usize) -> Self { + let mut cursor = Self::new(pager, root_page, num_columns); cursor.index_info = Some(IndexInfo::new_from_index(index)); cursor } @@ -767,10 +744,6 @@ impl BTreeCursor { let state = self.is_empty_table_state.borrow().clone(); match state { EmptyTableState::Start => { - if let Some(mv_cursor) = &self.mv_cursor { - let mv_cursor = mv_cursor.read(); - return Ok(IOResult::Done(mv_cursor.is_empty())); - } let (page, c) = self.pager.read_page(self.root_page)?; *self.is_empty_table_state.borrow_mut() = EmptyTableState::ReadPage { page }; if let Some(c) = c { @@ -1296,19 +1269,7 @@ impl BTreeCursor { /// Used in forwards iteration, which is the default. #[instrument(skip(self), level = Level::DEBUG, name = "next")] pub fn get_next_record(&mut self) -> Result> { - if let Some(mv_cursor) = &self.mv_cursor { - let mut mv_cursor = mv_cursor.write(); - assert!(matches!(mv_cursor.next()?, IOResult::Done(_))); - let IOResult::Done(rowid) = mv_cursor.rowid()? else { - todo!() - }; - match rowid { - Some(_rowid) => { - return Ok(IOResult::Done(true)); - } - None => return Ok(IOResult::Done(false)), - } - } else if self.stack.current_page == -1 { + if self.stack.current_page == -1 { // This can happen in nested left joins. See: // https://github.com/tursodatabase/turso/issues/2924 return Ok(IOResult::Done(false)); @@ -1883,10 +1844,6 @@ impl BTreeCursor { /// of iterating cells in order. #[instrument(skip_all, level = Level::DEBUG)] fn tablebtree_seek(&mut self, rowid: i64, seek_op: SeekOp) -> Result> { - turso_assert!( - self.mv_cursor.is_none(), - "attempting to seek with MV cursor" - ); let iter_dir = seek_op.iteration_direction(); if matches!( @@ -2239,10 +2196,6 @@ impl BTreeCursor { #[instrument(skip_all, level = Level::DEBUG)] pub fn move_to(&mut self, key: SeekKey<'_>, cmp: SeekOp) -> Result> { - turso_assert!( - self.mv_cursor.is_none(), - "attempting to move with MV cursor" - ); tracing::trace!(?key, ?cmp); // For a table with N rows, we can find any row by row id in O(log(N)) time by starting at the root page and following the B-tree pointers. // B-trees consist of interior pages and leaf pages. Interior pages contain pointers to other pages, while leaf pages contain the actual row data. @@ -4957,7 +4910,6 @@ impl CursorTrait for BTreeCursor { #[instrument(skip_all, level = Level::DEBUG)] fn last(&mut self) -> Result> { - assert!(self.mv_cursor.is_none()); let cursor_has_record = return_if_io!(self.move_to_rightmost()); self.has_record.replace(cursor_has_record); self.invalidate_record(); @@ -4966,7 +4918,6 @@ impl CursorTrait for BTreeCursor { #[instrument(skip_all, level = Level::DEBUG)] fn prev(&mut self) -> Result> { - assert!(self.mv_cursor.is_none()); loop { match self.advance_state { AdvanceState::Start => { @@ -4985,16 +4936,6 @@ impl CursorTrait for BTreeCursor { #[instrument(skip(self), level = Level::DEBUG)] fn rowid(&self) -> Result>> { - if let Some(mv_cursor) = &self.mv_cursor { - let mv_cursor = mv_cursor.write(); - let IOResult::Done(rowid) = mv_cursor.rowid()? else { - todo!(); - }; - let Some(rowid) = rowid else { - return Ok(IOResult::Done(None)); - }; - return Ok(IOResult::Done(Some(rowid))); - } if self.get_null_flag() { return Ok(IOResult::Done(None)); } @@ -5017,10 +4958,6 @@ impl CursorTrait for BTreeCursor { #[instrument(skip(self, key), level = Level::DEBUG)] fn seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result> { - if let Some(mv_cursor) = &self.mv_cursor { - let mut mv_cursor = mv_cursor.write(); - return mv_cursor.seek(key, op); - } self.skip_advance.set(false); // Empty trace to capture the span information tracing::trace!(""); @@ -5038,7 +4975,7 @@ impl CursorTrait for BTreeCursor { #[instrument(skip(self), level = Level::DEBUG)] fn record(&self) -> Result>>> { - if !self.has_record.get() && self.mv_cursor.is_none() { + if !self.has_record.get() { return Ok(IOResult::Done(None)); } let invalidated = self @@ -5052,25 +4989,6 @@ impl CursorTrait for BTreeCursor { .unwrap(); return Ok(IOResult::Done(Some(record_ref))); } - if let Some(mv_cursor) = &self.mv_cursor { - let mv_cursor = mv_cursor.write(); - let Some(row) = mv_cursor.current_row()? else { - return Ok(IOResult::Done(None)); - }; - self.get_immutable_record_or_create() - .as_mut() - .unwrap() - .invalidate(); - self.get_immutable_record_or_create() - .as_mut() - .unwrap() - .start_serialization(&row.data); - self.record_cursor.borrow_mut().invalidate(); - let record_ref = - Ref::filter_map(self.reusable_immutable_record.borrow(), |opt| opt.as_ref()) - .unwrap(); - return Ok(IOResult::Done(Some(record_ref))); - } let page = self.stack.top_ref(); let contents = page.get_contents(); @@ -5118,17 +5036,10 @@ impl CursorTrait for BTreeCursor { #[instrument(skip_all, level = Level::DEBUG)] fn insert(&mut self, key: &BTreeKey) -> Result> { tracing::debug!(valid_state = ?self.valid_state, cursor_state = ?self.state, is_write_in_progress = self.is_write_in_progress()); - match &self.mv_cursor { - Some(mv_cursor) => { - return_if_io!(mv_cursor.write().insert(key)); - } - None => { - return_if_io!(self.insert_into_page(key)); - if key.maybe_rowid().is_some() { - self.has_record.replace(true); - } - } - }; + return_if_io!(self.insert_into_page(key)); + if key.maybe_rowid().is_some() { + self.has_record.replace(true); + } Ok(IOResult::Done(())) } @@ -5146,11 +5057,6 @@ impl CursorTrait for BTreeCursor { /// 9. SeekAfterBalancing -> adjust the cursor to a node that is closer to the deleted value. go to Finish /// 10. Finish -> Delete operation is done. Return CursorResult(Ok()) fn delete(&mut self) -> Result> { - if let Some(mv_cursor) = &self.mv_cursor { - return_if_io!(mv_cursor.write().delete()); - return Ok(IOResult::Done(())); - } - if let CursorState::None = &self.state { self.state = CursorState::Delete(DeleteState::Start); } @@ -5524,7 +5430,6 @@ impl CursorTrait for BTreeCursor { #[instrument(skip_all, level = Level::DEBUG)] fn exists(&mut self, key: &Value) -> Result> { - assert!(self.mv_cursor.is_none()); let int_key = match key { Value::Integer(i) => i, _ => unreachable!("btree tables are indexed by integers!"), @@ -5561,10 +5466,6 @@ impl CursorTrait for BTreeCursor { /// /// Only supposed to be used in the context of a simple Count Select Statement fn count(&mut self) -> Result> { - if let Some(_mv_cursor) = &self.mv_cursor { - todo!("Implement count for mvcc"); - } - let mut mem_page; let mut contents; @@ -5684,14 +5585,9 @@ impl CursorTrait for BTreeCursor { match self.rewind_state { RewindState::Start => { self.rewind_state = RewindState::NextRecord; - if let Some(mv_cursor) = &self.mv_cursor { - let mut mv_cursor = mv_cursor.write(); - return_if_io!(mv_cursor.rewind()); - } else { - let c = self.move_to_root()?; - if let Some(c) = c { - io_yield_one!(c); - } + let c = self.move_to_root()?; + if let Some(c) = c { + io_yield_one!(c); } } RewindState::NextRecord => { @@ -5744,7 +5640,6 @@ impl CursorTrait for BTreeCursor { } fn seek_end(&mut self) -> Result> { - assert!(self.mv_cursor.is_none()); // unsure about this -_- loop { match self.seek_end_state { SeekEndState::Start => { @@ -5780,16 +5675,11 @@ impl CursorTrait for BTreeCursor { } } - fn get_mvcc_cursor(&self) -> Arc> { - self.mv_cursor.as_ref().unwrap().clone() - } - #[instrument(skip_all, level = Level::DEBUG)] fn seek_to_last(&mut self) -> Result> { loop { match self.seek_to_last_state { SeekToLastState::Start => { - assert!(self.mv_cursor.is_none()); let has_record = return_if_io!(self.move_to_rightmost()); self.invalidate_record(); self.has_record.replace(has_record); @@ -8076,7 +7966,7 @@ mod tests { fn validate_btree(pager: Arc, page_idx: i64) -> (usize, bool) { let num_columns = 5; - let cursor = BTreeCursor::new_table(None, pager.clone(), page_idx, num_columns); + let cursor = BTreeCursor::new_table(pager.clone(), page_idx, num_columns); let (page, _c) = cursor.read_page(page_idx).unwrap(); while page.is_locked() { pager.io.step().unwrap(); @@ -8187,7 +8077,7 @@ mod tests { fn format_btree(pager: Arc, page_idx: i64, depth: usize) -> String { let num_columns = 5; - let cursor = BTreeCursor::new_table(None, pager.clone(), page_idx, num_columns); + let cursor = BTreeCursor::new_table(pager.clone(), page_idx, num_columns); let (page, _c) = cursor.read_page(page_idx).unwrap(); while page.is_locked() { pager.io.step().unwrap(); @@ -8269,7 +8159,7 @@ mod tests { let conn = db.connect().unwrap(); let pager = conn.pager.read().clone(); - let mut cursor = BTreeCursor::new(None, pager, 1, 5); + let mut cursor = BTreeCursor::new(pager, 1, 5); let result = cursor.rewind()?; assert!(matches!(result, IOResult::Done(_))); let result = cursor.next()?; @@ -8299,7 +8189,7 @@ mod tests { let large_record = ImmutableRecord::from_registers(regs, regs.len()); // Create cursor for the table - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns); let initial_pagecount = pager .io @@ -8451,7 +8341,7 @@ mod tests { let (pager, root_page, _, _) = empty_btree(); let num_columns = 5; - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns); for (key, size) in sequence.iter() { run_until_done( || { @@ -8517,7 +8407,7 @@ mod tests { for _ in 0..attempts { let (pager, root_page, _db, conn) = empty_btree(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns); let mut keys = SortedVec::new(); tracing::info!("seed: {seed}"); for insert_id in 0..inserts { @@ -8659,13 +8549,8 @@ mod tests { has_rowid: false, }; let num_columns = index_def.columns.len(); - let mut cursor = BTreeCursor::new_index( - None, - pager.clone(), - index_root_page, - &index_def, - num_columns, - ); + let mut cursor = + BTreeCursor::new_index(pager.clone(), index_root_page, &index_def, num_columns); let mut keys = SortedVec::new(); tracing::info!("seed: {seed}"); for i in 0..inserts { @@ -8822,8 +8707,7 @@ mod tests { ephemeral: false, has_rowid: false, }; - let mut cursor = - BTreeCursor::new_index(None, pager.clone(), index_root_page, &index_def, 1); + let mut cursor = BTreeCursor::new_index(pager.clone(), index_root_page, &index_def, 1); // Track expected keys that should be present in the tree let mut expected_keys = Vec::new(); @@ -9214,7 +9098,7 @@ mod tests { let pager = setup_test_env(5); let num_columns = 5; - let mut cursor = BTreeCursor::new_table(None, pager.clone(), 1, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), 1, num_columns); let max_local = payload_overflow_threshold_max(PageType::TableLeaf, 4096); let usable_size = cursor.usable_space(); @@ -9325,7 +9209,7 @@ mod tests { let pager = setup_test_env(5); let num_columns = 5; - let mut cursor = BTreeCursor::new_table(None, pager.clone(), 1, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), 1, num_columns); let small_payload = vec![b'A'; 10]; @@ -9374,7 +9258,7 @@ mod tests { let pager = setup_test_env(initial_size); let num_columns = 5; - let mut cursor = BTreeCursor::new_table(None, pager.clone(), 2, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), 2, num_columns); // Initialize page 2 as a root page (interior) let root_page = run_until_done( @@ -9467,7 +9351,7 @@ mod tests { let num_columns = 5; let record_count = 10; - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns); for rowid in 1..=record_count { insert_record(&mut cursor, &pager, rowid, Value::Integer(rowid))?; @@ -9492,7 +9376,7 @@ mod tests { let num_columns = 5; let record_count = 1000; - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns); for rowid in 1..=record_count { insert_record(&mut cursor, &pager, rowid, Value::Integer(rowid))?; @@ -9518,7 +9402,7 @@ mod tests { let num_columns = 5; let record_count = 1000; - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns); for rowid in 1..=record_count { insert_record(&mut cursor, &pager, rowid, Value::Integer(rowid))?; @@ -9558,8 +9442,8 @@ mod tests { let num_columns = 5; let record_count = 1000; - let mut cursor1 = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns); - let mut cursor2 = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns); + let mut cursor1 = BTreeCursor::new_table(pager.clone(), root_page, num_columns); + let mut cursor2 = BTreeCursor::new_table(pager.clone(), root_page, num_columns); // Use cursor1 to insert records for rowid in 1..=record_count { @@ -9592,7 +9476,7 @@ mod tests { let num_columns = 5; let record_count = 100; - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns); let initial_page_count = pager .io @@ -10228,7 +10112,7 @@ mod tests { let num_columns = 5; for i in 0..10000 { - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns); tracing::info!("INSERT INTO t VALUES ({});", i,); let regs = &[Register::Value(Value::Integer(i))]; let value = ImmutableRecord::from_registers(regs, regs.len()); @@ -10256,7 +10140,7 @@ mod tests { format_btree(pager.clone(), root_page, 0) ); for key in keys.iter() { - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns); let key = Value::Integer(*key); let exists = run_until_done(|| cursor.exists(&key), pager.deref()).unwrap(); assert!(exists, "key not found {key}"); @@ -10315,7 +10199,7 @@ mod tests { // Insert 10,000 records in to the BTree. for i in 1..=10000 { - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns); let regs = &[Register::Value(Value::Text(Text::new("hello world")))]; let value = ImmutableRecord::from_registers(regs, regs.len()); @@ -10342,7 +10226,7 @@ mod tests { // Delete records with 500 <= key <= 3500 for i in 500..=3500 { - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns); let seek_key = SeekKey::TableRowId(i); let seek_result = run_until_done( @@ -10362,7 +10246,7 @@ mod tests { continue; } - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns); let key = Value::Integer(i); let exists = run_until_done(|| cursor.exists(&key), pager.deref()).unwrap(); assert!(exists, "Key {i} should exist but doesn't"); @@ -10370,7 +10254,7 @@ mod tests { // Verify the deleted records don't exist. for i in 500..=3500 { - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns); let key = Value::Integer(i); let exists = run_until_done(|| cursor.exists(&key), pager.deref()).unwrap(); assert!(!exists, "Deleted key {i} still exists"); @@ -10393,7 +10277,7 @@ mod tests { let num_columns = 5; for (i, huge_text) in huge_texts.iter().enumerate().take(iterations) { - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns); tracing::info!("INSERT INTO t VALUES ({});", i,); let regs = &[Register::Value(Value::Text(Text { value: huge_text.as_bytes().to_vec(), @@ -10423,7 +10307,7 @@ mod tests { format_btree(pager.clone(), root_page, 0) ); } - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns); + let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns); let _c = cursor.move_to_root().unwrap(); for i in 0..iterations { let has_next = run_until_done(|| cursor.next(), pager.deref()).unwrap(); @@ -10441,7 +10325,7 @@ mod tests { pub fn test_read_write_payload_with_offset() { let (pager, root_page, _, _) = empty_btree(); let num_columns = 5; - let mut cursor = BTreeCursor::new(None, pager.clone(), root_page, num_columns); + let mut cursor = BTreeCursor::new(pager.clone(), root_page, num_columns); let offset = 2; // blobs data starts at offset 2 let initial_text = "hello world"; let initial_blob = initial_text.as_bytes().to_vec(); @@ -10518,7 +10402,7 @@ mod tests { pub fn test_read_write_payload_with_overflow_page() { let (pager, root_page, _, _) = empty_btree(); let num_columns = 5; - let mut cursor = BTreeCursor::new(None, pager.clone(), root_page, num_columns); + let mut cursor = BTreeCursor::new(pager.clone(), root_page, num_columns); let mut large_blob = vec![b'A'; 40960 - 11]; // insert large blob. 40960 = 10 page long. let hello_world = b"hello world"; large_blob.extend_from_slice(hello_world); diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 0d7daeacd..00ce85fec 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -36,6 +36,7 @@ use crate::{ translate::emitter::TransactionMode, }; use crate::{get_cursor, CheckpointMode, Connection, MvCursor}; +use std::any::Any; use std::env::temp_dir; use std::ops::DerefMut; use std::{ @@ -1044,16 +1045,9 @@ pub fn op_open_read( let pager = program.get_pager_from_database_index(db); let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap(); - let mv_cursor = if let Some(tx_id) = program.connection.get_mv_tx_id() { - let mv_store = mv_store.unwrap().clone(); - let mv_cursor = Arc::new(RwLock::new( - MvCursor::new(mv_store, tx_id, *root_page, pager.clone()).unwrap(), - )); - Some(mv_cursor) - } else { + if program.connection.get_mv_tx_id().is_none() { assert!(*root_page >= 0, ""); - None - }; + } let cursors = &mut state.cursors; let num_columns = match cursor_type { CursorType::BTreeTable(table_rc) => table_rc.columns.len(), @@ -1062,16 +1056,33 @@ pub fn op_open_read( _ => unreachable!("This should not have happened"), }; + let maybe_promote_to_mvcc_cursor = + |btree_cursor: Box| -> Result> { + if let Some(tx_id) = program.connection.get_mv_tx_id() { + let mv_store = mv_store.unwrap().clone(); + Ok(Box::new(MvCursor::new( + mv_store, + tx_id, + *root_page, + pager.clone(), + btree_cursor, + )?)) + } else { + Ok(btree_cursor) + } + }; + match cursor_type { CursorType::MaterializedView(_, view_mutex) => { // This is a materialized view with storage // Create btree cursor for reading the persistent data + let btree_cursor = Box::new(BTreeCursor::new_table( - mv_cursor, pager.clone(), *root_page, num_columns, )); + let cursor = maybe_promote_to_mvcc_cursor(btree_cursor)?; // Get the view name and look up or create its transaction state let view_name = view_mutex.lock().unwrap().name().to_string(); @@ -1082,7 +1093,7 @@ pub fn op_open_read( // Create materialized view cursor with this view's transaction state let mv_cursor = crate::incremental::cursor::MaterializedViewCursor::new( - btree_cursor, + cursor, view_mutex.clone(), pager.clone(), tx_state, @@ -1095,24 +1106,29 @@ pub fn op_open_read( } CursorType::BTreeTable(_) => { // Regular table - let cursor = BTreeCursor::new_table(mv_cursor, pager.clone(), *root_page, num_columns); + let btree_cursor = Box::new(BTreeCursor::new_table( + pager.clone(), + *root_page, + num_columns, + )); + let cursor = maybe_promote_to_mvcc_cursor(btree_cursor)?; cursors .get_mut(*cursor_id) .unwrap() - .replace(Cursor::new_btree(Box::new(cursor))); + .replace(Cursor::new_btree(cursor)); } CursorType::BTreeIndex(index) => { - let cursor = BTreeCursor::new_index( - mv_cursor, + let btree_cursor = Box::new(BTreeCursor::new_index( pager.clone(), *root_page, index.as_ref(), num_columns, - ); + )); + let cursor = maybe_promote_to_mvcc_cursor(btree_cursor)?; cursors .get_mut(*cursor_id) .unwrap() - .replace(Cursor::new_btree(Box::new(cursor))); + .replace(Cursor::new_btree(cursor)); } CursorType::Pseudo(_) => { panic!("OpenRead on pseudo cursor"); @@ -6423,13 +6439,13 @@ pub fn op_new_rowid( }, insn ); - if let Some(mv_store) = mv_store { + // With MVCC we can't simply find last rowid and get rowid + 1 as a result. To not have two conflicting rowids concurrently we need to call `get_next_rowid` + // which will make sure we don't collide. let rowid = { let cursor = state.get_cursor(*cursor); - let cursor = cursor.as_btree_mut(); - let mvcc_cursor = cursor.get_mvcc_cursor(); - let mut mvcc_cursor = mvcc_cursor.write(); + let cursor = cursor.as_btree_mut() as &mut dyn Any; + let mvcc_cursor = cursor.downcast_mut::().unwrap(); mvcc_cursor.get_next_rowid() }; state.registers[*rowid_reg] = Register::Value(Value::Integer(rowid)); @@ -6706,16 +6722,10 @@ pub fn op_not_exists( }, insn ); - let exists = if let Some(mv_store) = mv_store { - let cursor = must_be_btree_cursor!(*cursor, program.cursor_ref, state, "NotExists"); - let cursor = cursor.as_btree_mut(); - let mvcc_cursor = cursor.get_mvcc_cursor(); - false - } else { - let cursor = must_be_btree_cursor!(*cursor, program.cursor_ref, state, "NotExists"); - let cursor = cursor.as_btree_mut(); - return_if_io!(cursor.exists(state.registers[*rowid_reg].get_value())) - }; + let cursor = must_be_btree_cursor!(*cursor, program.cursor_ref, state, "NotExists"); + let cursor = cursor.as_btree_mut(); + let exists = return_if_io!(cursor.exists(state.registers[*rowid_reg].get_value())); + if exists { state.pc += 1; } else { @@ -6823,15 +6833,21 @@ pub fn op_open_write( CursorType::BTreeIndex(index) => Some(index), _ => None, }; - let mv_cursor = if let Some(tx_id) = program.connection.get_mv_tx_id() { - let mv_store = mv_store.unwrap().clone(); - let mv_cursor = Arc::new(RwLock::new( - MvCursor::new(mv_store.clone(), tx_id, root_page, pager.clone()).unwrap(), - )); - Some(mv_cursor) - } else { - None - }; + let maybe_promote_to_mvcc_cursor = + |btree_cursor: Box| -> Result> { + if let Some(tx_id) = program.connection.get_mv_tx_id() { + let mv_store = mv_store.unwrap().clone(); + Ok(Box::new(MvCursor::new( + mv_store, + tx_id, + root_page, + pager.clone(), + btree_cursor, + )?)) + } else { + Ok(btree_cursor) + } + }; if let Some(index) = maybe_index { let conn = program.connection.clone(); let schema = conn.schema.read(); @@ -6840,17 +6856,17 @@ pub fn op_open_write( .and_then(|table| table.btree()); let num_columns = index.columns.len(); - let cursor = BTreeCursor::new_index( - mv_cursor, + let btree_cursor = Box::new(BTreeCursor::new_index( pager.clone(), root_page, index.as_ref(), num_columns, - ); + )); + let cursor = maybe_promote_to_mvcc_cursor(btree_cursor)?; cursors .get_mut(*cursor_id) .unwrap() - .replace(Cursor::new_btree(Box::new(cursor))); + .replace(Cursor::new_btree(cursor)); } else { let num_columns = match cursor_type { CursorType::BTreeTable(table_rc) => table_rc.columns.len(), @@ -6860,11 +6876,16 @@ pub fn op_open_write( ), }; - let cursor = BTreeCursor::new_table(mv_cursor, pager.clone(), root_page, num_columns); + let btree_cursor = Box::new(BTreeCursor::new_table( + pager.clone(), + root_page, + num_columns, + )); + let cursor = maybe_promote_to_mvcc_cursor(btree_cursor)?; cursors .get_mut(*cursor_id) .unwrap() - .replace(Cursor::new_btree(Box::new(cursor))); + .replace(Cursor::new_btree(cursor)); } state.pc += 1; Ok(InsnFunctionStepResult::Step) @@ -6958,7 +6979,7 @@ pub fn op_destroy( OpDestroyState::CreateCursor => { // Destroy doesn't do anything meaningful with the table/index distinction so we can just use a // table btree cursor for both. - let cursor = BTreeCursor::new(None, pager.clone(), *root, 0); + let cursor = BTreeCursor::new(pager.clone(), *root, 0); state.op_destroy_state = OpDestroyState::DestroyBtree(Arc::new(RwLock::new(cursor))); } @@ -7623,9 +7644,9 @@ pub fn op_open_ephemeral( }; let cursor = if let CursorType::BTreeIndex(index) = cursor_type { - BTreeCursor::new_index(None, pager.clone(), root_page, index, num_columns) + BTreeCursor::new_index(pager.clone(), root_page, index, num_columns) } else { - BTreeCursor::new_table(None, pager.clone(), root_page, num_columns) + BTreeCursor::new_table(pager.clone(), root_page, num_columns) }; state.op_open_ephemeral_state = OpOpenEphemeralState::Rewind { cursor: Box::new(cursor), @@ -7704,29 +7725,32 @@ pub fn op_open_dup( // a separate database file). let pager = original_cursor.get_pager(); - let mv_cursor = if let Some(tx_id) = program.connection.get_mv_tx_id() { - let mv_store = mv_store.unwrap().clone(); - let mv_cursor = Arc::new(RwLock::new(MvCursor::new( - mv_store, - tx_id, - root_page, - pager.clone(), - )?)); - Some(mv_cursor) - } else { - None - }; - let (_, cursor_type) = program.cursor_ref.get(*original_cursor_id).unwrap(); match cursor_type { CursorType::BTreeTable(table) => { - let cursor = - BTreeCursor::new_table(mv_cursor, pager.clone(), root_page, table.columns.len()); + let cursor = Box::new(BTreeCursor::new_table( + pager.clone(), + root_page, + table.columns.len(), + )); + let cursor: Box = + if let Some(tx_id) = program.connection.get_mv_tx_id() { + let mv_store = mv_store.unwrap().clone(); + Box::new(MvCursor::new( + mv_store, + tx_id, + root_page, + pager.clone(), + cursor, + )?) + } else { + cursor + }; let cursors = &mut state.cursors; cursors .get_mut(*new_cursor_id) .unwrap() - .replace(Cursor::new_btree(Box::new(cursor))); + .replace(Cursor::new_btree(cursor)); } CursorType::BTreeIndex(table) => { // In principle, we could implement OpenDup for BTreeIndex,