mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-31 13:54:27 +01:00
core/mvcc: add btree_cursor under MVCC cursor
This commit is contained in:
@@ -490,15 +490,10 @@ impl DbspCircuit {
|
||||
) -> Result<IOResult<Delta>> {
|
||||
if let Some(root_id) = self.root {
|
||||
// Create temporary cursors for execute (non-commit) operations
|
||||
let table_cursor = BTreeCursor::new_table(
|
||||
None,
|
||||
pager.clone(),
|
||||
self.internal_state_root,
|
||||
OPERATOR_COLUMNS,
|
||||
);
|
||||
let table_cursor =
|
||||
BTreeCursor::new_table(pager.clone(), self.internal_state_root, OPERATOR_COLUMNS);
|
||||
let index_def = create_dbsp_state_index(self.internal_state_index_root);
|
||||
let index_cursor = BTreeCursor::new_index(
|
||||
None,
|
||||
pager.clone(),
|
||||
self.internal_state_index_root,
|
||||
&index_def,
|
||||
@@ -547,14 +542,12 @@ impl DbspCircuit {
|
||||
CommitState::Init => {
|
||||
// Create state cursors when entering CommitOperators state
|
||||
let state_table_cursor = BTreeCursor::new_table(
|
||||
None,
|
||||
pager.clone(),
|
||||
self.internal_state_root,
|
||||
OPERATOR_COLUMNS,
|
||||
);
|
||||
let index_def = create_dbsp_state_index(self.internal_state_index_root);
|
||||
let state_index_cursor = BTreeCursor::new_index(
|
||||
None,
|
||||
pager.clone(),
|
||||
self.internal_state_index_root,
|
||||
&index_def,
|
||||
@@ -585,7 +578,6 @@ impl DbspCircuit {
|
||||
|
||||
// Create view cursor when entering UpdateView state
|
||||
let view_cursor = Box::new(BTreeCursor::new_table(
|
||||
None,
|
||||
pager.clone(),
|
||||
main_data_root,
|
||||
num_columns,
|
||||
@@ -615,7 +607,6 @@ impl DbspCircuit {
|
||||
// due to btree cursor state machine limitations
|
||||
if matches!(write_row_state, WriteRowView::GetRecord) {
|
||||
*view_cursor = Box::new(BTreeCursor::new_table(
|
||||
None,
|
||||
pager.clone(),
|
||||
main_data_root,
|
||||
num_columns,
|
||||
@@ -643,7 +634,6 @@ impl DbspCircuit {
|
||||
let view_cursor = std::mem::replace(
|
||||
view_cursor,
|
||||
Box::new(BTreeCursor::new_table(
|
||||
None,
|
||||
pager.clone(),
|
||||
main_data_root,
|
||||
num_columns,
|
||||
@@ -739,14 +729,12 @@ impl DbspCircuit {
|
||||
|
||||
// Create temporary cursors for the recursive call
|
||||
let temp_table_cursor = BTreeCursor::new_table(
|
||||
None,
|
||||
pager.clone(),
|
||||
self.internal_state_root,
|
||||
OPERATOR_COLUMNS,
|
||||
);
|
||||
let index_def = create_dbsp_state_index(self.internal_state_index_root);
|
||||
let temp_index_cursor = BTreeCursor::new_index(
|
||||
None,
|
||||
pager.clone(),
|
||||
self.internal_state_index_root,
|
||||
&index_def,
|
||||
@@ -2774,8 +2762,7 @@ mod tests {
|
||||
let num_columns = circuit.output_schema.columns.len() + 1;
|
||||
|
||||
// Create a cursor to read the btree
|
||||
let mut btree_cursor =
|
||||
BTreeCursor::new_table(None, pager.clone(), main_data_root, num_columns);
|
||||
let mut btree_cursor = BTreeCursor::new_table(pager.clone(), main_data_root, num_columns);
|
||||
|
||||
// Rewind to the beginning
|
||||
pager.io.block(|| btree_cursor.rewind())?;
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::{
|
||||
view::{IncrementalView, ViewTransactionState},
|
||||
},
|
||||
return_if_io,
|
||||
storage::btree::{BTreeCursor, CursorTrait},
|
||||
storage::btree::CursorTrait,
|
||||
types::{IOResult, SeekKey, SeekOp, SeekResult, Value},
|
||||
LimboError, Pager, Result,
|
||||
};
|
||||
@@ -35,7 +35,7 @@ enum SeekState {
|
||||
/// and overlays transaction changes as needed.
|
||||
pub struct MaterializedViewCursor {
|
||||
// Core components
|
||||
btree_cursor: Box<BTreeCursor>,
|
||||
btree_cursor: Box<dyn CursorTrait>,
|
||||
view: Arc<Mutex<IncrementalView>>,
|
||||
pager: Arc<Pager>,
|
||||
|
||||
@@ -62,7 +62,7 @@ pub struct MaterializedViewCursor {
|
||||
|
||||
impl MaterializedViewCursor {
|
||||
pub fn new(
|
||||
btree_cursor: Box<BTreeCursor>,
|
||||
btree_cursor: Box<dyn CursorTrait>,
|
||||
view: Arc<Mutex<IncrementalView>>,
|
||||
pager: Arc<Pager>,
|
||||
tx_state: Arc<ViewTransactionState>,
|
||||
@@ -296,6 +296,7 @@ impl MaterializedViewCursor {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::storage::btree::BTreeCursor;
|
||||
use crate::util::IOExt;
|
||||
use crate::{Connection, Database, OpenFlags};
|
||||
use std::sync::Arc;
|
||||
@@ -359,12 +360,7 @@ mod tests {
|
||||
|
||||
// Create a btree cursor
|
||||
let pager = conn.get_pager();
|
||||
let btree_cursor = Box::new(BTreeCursor::new(
|
||||
None, // No MvCursor
|
||||
pager.clone(),
|
||||
root_page,
|
||||
num_columns,
|
||||
));
|
||||
let btree_cursor = Box::new(BTreeCursor::new(pager.clone(), root_page, num_columns));
|
||||
|
||||
// Get or create transaction state for this view
|
||||
let tx_state = conn.view_transaction_states.get_or_create("test_view");
|
||||
|
||||
@@ -393,12 +393,11 @@ mod tests {
|
||||
|
||||
// Create a persistent pager for the test
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
// Create index cursor with proper index definition for DBSP state table
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
// Index has 4 columns: operator_id, zset_id, element_id, rowid
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
// Create an aggregate operator for SUM(age) with no GROUP BY
|
||||
@@ -513,12 +512,11 @@ mod tests {
|
||||
// Create an aggregate operator for SUM(score) GROUP BY team
|
||||
// Create a persistent pager for the test
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
// Create index cursor with proper index definition for DBSP state table
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
// Index has 4 columns: operator_id, zset_id, element_id, rowid
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -664,12 +662,11 @@ mod tests {
|
||||
|
||||
// Create a persistent pager for the test
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
// Create index cursor with proper index definition for DBSP state table
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
// Index has 4 columns: operator_id, zset_id, element_id, rowid
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
// Create COUNT(*) GROUP BY category
|
||||
@@ -745,12 +742,11 @@ mod tests {
|
||||
// Create SUM(amount) GROUP BY product
|
||||
// Create a persistent pager for the test
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
// Create index cursor with proper index definition for DBSP state table
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
// Index has 4 columns: operator_id, zset_id, element_id, rowid
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -842,12 +838,11 @@ mod tests {
|
||||
// Test the example from DBSP_ROADMAP: COUNT(*) and SUM(amount) GROUP BY user_id
|
||||
// Create a persistent pager for the test
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
// Create index cursor with proper index definition for DBSP state table
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
// Index has 4 columns: operator_id, zset_id, element_id, rowid
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -934,12 +929,11 @@ mod tests {
|
||||
// Test AVG aggregation
|
||||
// Create a persistent pager for the test
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
// Create index cursor with proper index definition for DBSP state table
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
// Index has 4 columns: operator_id, zset_id, element_id, rowid
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -1034,12 +1028,11 @@ mod tests {
|
||||
// Test that deletes (negative weights) properly update aggregates
|
||||
// Create a persistent pager for the test
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
// Create index cursor with proper index definition for DBSP state table
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
// Index has 4 columns: operator_id, zset_id, element_id, rowid
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -1121,12 +1114,11 @@ mod tests {
|
||||
|
||||
// Create a persistent pager for the test
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
// Create index cursor with proper index definition for DBSP state table
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
// Index has 4 columns: operator_id, zset_id, element_id, rowid
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -1211,12 +1203,11 @@ mod tests {
|
||||
|
||||
// Create a persistent pager for the test
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
// Create index cursor with proper index definition for DBSP state table
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
// Index has 4 columns: operator_id, zset_id, element_id, rowid
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -1295,12 +1286,11 @@ mod tests {
|
||||
|
||||
// Create a persistent pager for the test
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
// Create index cursor with proper index definition for DBSP state table
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
// Index has 4 columns: operator_id, zset_id, element_id, rowid
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -1364,12 +1354,11 @@ mod tests {
|
||||
|
||||
// Create a persistent pager for the test
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
// Create index cursor with proper index definition for DBSP state table
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
// Index has 4 columns: operator_id, zset_id, element_id, rowid
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -1450,12 +1439,11 @@ mod tests {
|
||||
|
||||
// Create a persistent pager for the test
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
// Create index cursor with proper index definition for DBSP state table
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
// Index has 4 columns: operator_id, zset_id, element_id, rowid
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut filter = FilterOperator::new(FilterPredicate::GreaterThan {
|
||||
@@ -1509,12 +1497,11 @@ mod tests {
|
||||
fn test_filter_eval_with_uncommitted() {
|
||||
// Create a persistent pager for the test
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
// Create index cursor with proper index definition for DBSP state table
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
// Index has 4 columns: operator_id, zset_id, element_id, rowid
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut filter = FilterOperator::new(FilterPredicate::GreaterThan {
|
||||
@@ -1600,12 +1587,11 @@ mod tests {
|
||||
// This is the critical test - aggregations must not modify internal state during eval
|
||||
// Create a persistent pager for the test
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
// Create index cursor with proper index definition for DBSP state table
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
// Index has 4 columns: operator_id, zset_id, element_id, rowid
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -1770,12 +1756,11 @@ mod tests {
|
||||
// doesn't pollute the internal state
|
||||
// Create a persistent pager for the test
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
// Create index cursor with proper index definition for DBSP state table
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
// Index has 4 columns: operator_id, zset_id, element_id, rowid
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -1852,12 +1837,11 @@ mod tests {
|
||||
// Test eval with both committed delta and uncommitted changes
|
||||
// Create a persistent pager for the test
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
// Create index cursor with proper index definition for DBSP state table
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
// Index has 4 columns: operator_id, zset_id, element_id, rowid
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -1968,10 +1952,9 @@ mod tests {
|
||||
fn test_min_max_basic() {
|
||||
// Test basic MIN/MAX functionality
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -2036,10 +2019,9 @@ mod tests {
|
||||
fn test_min_max_deletion_updates_min() {
|
||||
// Test that deleting the MIN value updates to the next lowest
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -2126,10 +2108,9 @@ mod tests {
|
||||
fn test_min_max_deletion_updates_max() {
|
||||
// Test that deleting the MAX value updates to the next highest
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -2216,10 +2197,9 @@ mod tests {
|
||||
fn test_min_max_insertion_updates_min() {
|
||||
// Test that inserting a new MIN value updates the aggregate
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -2298,10 +2278,9 @@ mod tests {
|
||||
fn test_min_max_insertion_updates_max() {
|
||||
// Test that inserting a new MAX value updates the aggregate
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -2380,10 +2359,9 @@ mod tests {
|
||||
fn test_min_max_update_changes_min() {
|
||||
// Test that updating a row to become the new MIN updates the aggregate
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -2470,10 +2448,9 @@ mod tests {
|
||||
fn test_min_max_with_group_by() {
|
||||
// Test MIN/MAX with GROUP BY
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -2572,10 +2549,9 @@ mod tests {
|
||||
fn test_min_max_with_nulls() {
|
||||
// Test that NULL values are ignored in MIN/MAX
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -2648,10 +2624,9 @@ mod tests {
|
||||
fn test_min_max_integer_values() {
|
||||
// Test MIN/MAX with integer values instead of floats
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -2716,10 +2691,9 @@ mod tests {
|
||||
fn test_min_max_text_values() {
|
||||
// Test MIN/MAX with text values (alphabetical ordering)
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -2755,10 +2729,9 @@ mod tests {
|
||||
#[test]
|
||||
fn test_min_max_with_other_aggregates() {
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -2847,10 +2820,9 @@ mod tests {
|
||||
#[test]
|
||||
fn test_min_max_multiple_columns() {
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut agg = AggregateOperator::new(
|
||||
@@ -2926,10 +2898,9 @@ mod tests {
|
||||
fn test_join_operator_inner() {
|
||||
// Test INNER JOIN with incremental updates
|
||||
let (pager, table_page_id, index_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_page_id, 10);
|
||||
let index_def = create_dbsp_state_index(index_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_page_id, &index_def, 10);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
let mut join = JoinOperator::new(
|
||||
1, // operator_id
|
||||
@@ -3023,10 +2994,9 @@ mod tests {
|
||||
fn test_join_operator_with_deletions() {
|
||||
// Test INNER JOIN with deletions (negative weights)
|
||||
let (pager, table_page_id, index_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_page_id, 10);
|
||||
let index_def = create_dbsp_state_index(index_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_page_id, &index_def, 10);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut join = JoinOperator::new(
|
||||
@@ -3114,10 +3084,9 @@ mod tests {
|
||||
fn test_join_operator_one_to_many() {
|
||||
// Test one-to-many relationship: one customer with multiple orders
|
||||
let (pager, table_page_id, index_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_page_id, 10);
|
||||
let index_def = create_dbsp_state_index(index_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_page_id, &index_def, 10);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut join = JoinOperator::new(
|
||||
@@ -3251,10 +3220,9 @@ mod tests {
|
||||
fn test_join_operator_many_to_many() {
|
||||
// Test many-to-many: multiple rows with same key on both sides
|
||||
let (pager, table_page_id, index_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_page_id, 10);
|
||||
let index_def = create_dbsp_state_index(index_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_page_id, &index_def, 10);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut join = JoinOperator::new(
|
||||
@@ -3368,10 +3336,9 @@ mod tests {
|
||||
fn test_join_operator_update_in_one_to_many() {
|
||||
// Test updates in one-to-many scenarios
|
||||
let (pager, table_page_id, index_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_page_id, 10);
|
||||
let index_def = create_dbsp_state_index(index_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_page_id, &index_def, 10);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut join = JoinOperator::new(
|
||||
@@ -3491,10 +3458,9 @@ mod tests {
|
||||
fn test_join_operator_weight_accumulation_complex() {
|
||||
// Test complex weight accumulation with multiple identical rows
|
||||
let (pager, table_page_id, index_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_page_id, 10);
|
||||
let index_def = create_dbsp_state_index(index_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_page_id, &index_def, 10);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut join = JoinOperator::new(
|
||||
@@ -3627,9 +3593,9 @@ mod tests {
|
||||
let mut state = EvalState::Init { deltas: delta_pair };
|
||||
|
||||
let (pager, table_root, index_root) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root, 5);
|
||||
let index_def = create_dbsp_state_index(index_root);
|
||||
let index_cursor = BTreeCursor::new_index(None, pager.clone(), index_root, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let result = pager
|
||||
@@ -3687,10 +3653,10 @@ mod tests {
|
||||
#[test]
|
||||
fn test_merge_operator_basic() {
|
||||
let (_pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, _pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(_pager.clone(), table_root_page_id, 5);
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, _pager.clone(), index_root_page_id, &index_def, 4);
|
||||
BTreeCursor::new_index(_pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut merge_op = MergeOperator::new(
|
||||
@@ -3748,10 +3714,10 @@ mod tests {
|
||||
#[test]
|
||||
fn test_merge_operator_stateful_distinct() {
|
||||
let (_pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, _pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(_pager.clone(), table_root_page_id, 5);
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, _pager.clone(), index_root_page_id, &index_def, 4);
|
||||
BTreeCursor::new_index(_pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
// Test that UNION (distinct) properly deduplicates across multiple operations
|
||||
@@ -3822,10 +3788,10 @@ mod tests {
|
||||
#[test]
|
||||
fn test_merge_operator_single_sided_inputs_union_all() {
|
||||
let (_pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, _pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(_pager.clone(), table_root_page_id, 5);
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, _pager.clone(), index_root_page_id, &index_def, 4);
|
||||
BTreeCursor::new_index(_pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
// Test UNION ALL with inputs coming from only one side at a time
|
||||
@@ -3942,10 +3908,10 @@ mod tests {
|
||||
#[test]
|
||||
fn test_merge_operator_both_sides_empty() {
|
||||
let (_pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, _pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(_pager.clone(), table_root_page_id, 5);
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, _pager.clone(), index_root_page_id, &index_def, 4);
|
||||
BTreeCursor::new_index(_pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
// Test that both sides being empty works correctly
|
||||
@@ -4022,10 +3988,9 @@ mod tests {
|
||||
// Test that aggregate state serialization correctly preserves column indices
|
||||
// when multiple aggregates operate on different columns
|
||||
let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
|
||||
let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
|
||||
let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
|
||||
let index_def = create_dbsp_state_index(index_root_page_id);
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
// Create first operator with SUM(col1), MIN(col3) GROUP BY col0
|
||||
|
||||
@@ -26,6 +26,7 @@ pub struct MvccLazyCursor<Clock: LogicalClock> {
|
||||
tx_id: u64,
|
||||
/// Reusable immutable record, used to allow better allocation strategy.
|
||||
reusable_immutable_record: RefCell<Option<ImmutableRecord>>,
|
||||
_btree_cursor: Box<dyn CursorTrait>,
|
||||
}
|
||||
|
||||
impl<Clock: LogicalClock + 'static> MvccLazyCursor<Clock> {
|
||||
@@ -34,17 +35,21 @@ impl<Clock: LogicalClock + 'static> MvccLazyCursor<Clock> {
|
||||
tx_id: u64,
|
||||
root_page_or_table_id: i64,
|
||||
pager: Arc<Pager>,
|
||||
btree_cursor: Box<dyn CursorTrait>,
|
||||
) -> Result<MvccLazyCursor<Clock>> {
|
||||
assert!(
|
||||
(&*btree_cursor as &dyn Any).is::<BTreeCursor>(),
|
||||
"BTreeCursor expected for mvcc cursor"
|
||||
);
|
||||
let table_id = db.get_table_id_from_root_page(root_page_or_table_id);
|
||||
db.maybe_initialize_table(table_id, pager)?;
|
||||
let cursor = Self {
|
||||
Ok(Self {
|
||||
db,
|
||||
tx_id,
|
||||
current_pos: RefCell::new(CursorPosition::BeforeFirst),
|
||||
table_id,
|
||||
reusable_immutable_record: RefCell::new(None),
|
||||
};
|
||||
Ok(cursor)
|
||||
_btree_cursor: btree_cursor,
|
||||
}
|
||||
|
||||
pub fn current_row(&self) -> Result<Option<Row>> {
|
||||
@@ -375,8 +380,16 @@ impl<Clock: LogicalClock + 'static> CursorTrait for MvccLazyCursor<Clock> {
|
||||
fn get_skip_advance(&self) -> bool {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
fn get_mvcc_cursor(&self) -> Arc<parking_lot::RwLock<crate::MvCursor>> {
|
||||
todo!()
|
||||
impl<Clock: LogicalClock> Debug for MvccLazyCursor<Clock> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("MvccLazyCursor")
|
||||
.field("current_pos", &self.current_pos)
|
||||
.field("table_id", &self.table_id)
|
||||
.field("tx_id", &self.tx_id)
|
||||
.field("reusable_immutable_record", &self.reusable_immutable_record)
|
||||
.field("btree_cursor", &())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -390,7 +390,6 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
|
||||
cursor.clone()
|
||||
} else {
|
||||
let cursor = BTreeCursor::new_table(
|
||||
None,
|
||||
self.pager.clone(),
|
||||
known_root_page as i64,
|
||||
num_columns,
|
||||
@@ -465,12 +464,8 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
|
||||
let cursor = if let Some(cursor) = self.cursors.get(&root_page) {
|
||||
cursor.clone()
|
||||
} else {
|
||||
let cursor = BTreeCursor::new_table(
|
||||
None, // Write directly to B-tree
|
||||
self.pager.clone(),
|
||||
root_page as i64,
|
||||
num_columns,
|
||||
);
|
||||
let cursor =
|
||||
BTreeCursor::new_table(self.pager.clone(), root_page as i64, num_columns);
|
||||
let cursor = Arc::new(RwLock::new(cursor));
|
||||
self.cursors.insert(root_page, cursor.clone());
|
||||
cursor
|
||||
|
||||
@@ -1884,7 +1884,6 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
.value()
|
||||
.unwrap_or_else(|| panic!("Table ID does not have a root page: {table_id}"));
|
||||
let mut cursor = BTreeCursor::new_table(
|
||||
None, // No MVCC cursor for scanning
|
||||
pager.clone(),
|
||||
root_page as i64,
|
||||
1, // We'll adjust this as needed
|
||||
|
||||
@@ -830,6 +830,7 @@ fn test_lazy_scan_cursor_basic() {
|
||||
tx_id,
|
||||
table_id,
|
||||
db.conn.pager.read().clone(),
|
||||
Box::new(BTreeCursor::new(db.conn.pager.read().clone(), table_id, 1)),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -872,6 +873,7 @@ fn test_lazy_scan_cursor_with_gaps() {
|
||||
tx_id,
|
||||
table_id,
|
||||
db.conn.pager.read().clone(),
|
||||
Box::new(BTreeCursor::new(db.conn.pager.read().clone(), table_id, 1)),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -923,6 +925,7 @@ fn test_cursor_basic() {
|
||||
tx_id,
|
||||
table_id,
|
||||
db.conn.pager.read().clone(),
|
||||
Box::new(BTreeCursor::new(db.conn.pager.read().clone(), table_id, 1)),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -977,6 +980,7 @@ fn test_cursor_with_empty_table() {
|
||||
tx_id,
|
||||
table_id,
|
||||
db.conn.pager.read().clone(),
|
||||
Box::new(BTreeCursor::new(db.conn.pager.read().clone(), table_id, 1)),
|
||||
)
|
||||
.unwrap();
|
||||
assert!(cursor.is_empty());
|
||||
@@ -994,6 +998,7 @@ fn test_cursor_modification_during_scan() {
|
||||
tx_id,
|
||||
table_id,
|
||||
db.conn.pager.read().clone(),
|
||||
Box::new(BTreeCursor::new(db.conn.pager.read().clone(), table_id, 1)),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -414,7 +414,7 @@ impl Schema {
|
||||
mv_cursor.is_none(),
|
||||
"mvcc not yet supported for make_from_btree"
|
||||
);
|
||||
let mut cursor = BTreeCursor::new_table(mv_cursor, Arc::clone(&pager), 1, 10);
|
||||
let mut cursor = BTreeCursor::new_table(Arc::clone(&pager), 1, 10);
|
||||
|
||||
let mut from_sql_indexes = Vec::with_capacity(10);
|
||||
let mut automatic_indices: HashMap<String, Vec<(String, i64)>> = HashMap::with_capacity(10);
|
||||
|
||||
@@ -24,7 +24,7 @@ use crate::{
|
||||
RecordCursor, SeekResult,
|
||||
},
|
||||
util::IOExt,
|
||||
Completion, MvCursor, Page,
|
||||
Completion, Page,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
@@ -39,7 +39,6 @@ use super::{
|
||||
write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, MINIMUM_CELL_SIZE,
|
||||
},
|
||||
};
|
||||
use parking_lot::RwLock;
|
||||
use std::{
|
||||
any::Any,
|
||||
cell::{Cell, Ref, RefCell},
|
||||
@@ -556,15 +555,10 @@ pub trait CursorTrait: Any {
|
||||
fn record_cursor_mut(&self) -> std::cell::RefMut<'_, RecordCursor>;
|
||||
fn get_pager(&self) -> Arc<Pager>;
|
||||
fn get_skip_advance(&self) -> bool;
|
||||
|
||||
// FIXME: remove once we implement trait for mvcc
|
||||
fn get_mvcc_cursor(&self) -> Arc<RwLock<MvCursor>>;
|
||||
// --- end: BTreeCursor specific functions ----
|
||||
}
|
||||
|
||||
pub struct BTreeCursor {
|
||||
/// The multi-version cursor that is used to read and write to the database file.
|
||||
mv_cursor: Option<Arc<RwLock<MvCursor>>>,
|
||||
/// The pager that is used to read and write to the database file.
|
||||
pub pager: Arc<Pager>,
|
||||
/// Cached value of the usable space of a BTree page, since it is very expensive to call in a hot loop via pager.usable_space().
|
||||
@@ -667,12 +661,7 @@ impl BTreeNodeState {
|
||||
}
|
||||
|
||||
impl BTreeCursor {
|
||||
pub fn new(
|
||||
mv_cursor: Option<Arc<RwLock<MvCursor>>>,
|
||||
pager: Arc<Pager>,
|
||||
root_page: i64,
|
||||
num_columns: usize,
|
||||
) -> Self {
|
||||
pub fn new(pager: Arc<Pager>, root_page: i64, num_columns: usize) -> Self {
|
||||
let valid_state = if root_page == 1 && !pager.db_state.is_initialized() {
|
||||
CursorValidState::Invalid
|
||||
} else {
|
||||
@@ -680,7 +669,6 @@ impl BTreeCursor {
|
||||
};
|
||||
let usable_space = pager.usable_space();
|
||||
Self {
|
||||
mv_cursor,
|
||||
pager,
|
||||
root_page,
|
||||
usable_space_cached: usable_space,
|
||||
@@ -718,23 +706,12 @@ impl BTreeCursor {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_table(
|
||||
mv_cursor: Option<Arc<RwLock<MvCursor>>>,
|
||||
pager: Arc<Pager>,
|
||||
root_page: i64,
|
||||
num_columns: usize,
|
||||
) -> Self {
|
||||
Self::new(mv_cursor, pager, root_page, num_columns)
|
||||
pub fn new_table(pager: Arc<Pager>, root_page: i64, num_columns: usize) -> Self {
|
||||
Self::new(pager, root_page, num_columns)
|
||||
}
|
||||
|
||||
pub fn new_index(
|
||||
mv_cursor: Option<Arc<RwLock<MvCursor>>>,
|
||||
pager: Arc<Pager>,
|
||||
root_page: i64,
|
||||
index: &Index,
|
||||
num_columns: usize,
|
||||
) -> Self {
|
||||
let mut cursor = Self::new(mv_cursor, pager, root_page, num_columns);
|
||||
pub fn new_index(pager: Arc<Pager>, root_page: i64, index: &Index, num_columns: usize) -> Self {
|
||||
let mut cursor = Self::new(pager, root_page, num_columns);
|
||||
cursor.index_info = Some(IndexInfo::new_from_index(index));
|
||||
cursor
|
||||
}
|
||||
@@ -767,10 +744,6 @@ impl BTreeCursor {
|
||||
let state = self.is_empty_table_state.borrow().clone();
|
||||
match state {
|
||||
EmptyTableState::Start => {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mv_cursor = mv_cursor.read();
|
||||
return Ok(IOResult::Done(mv_cursor.is_empty()));
|
||||
}
|
||||
let (page, c) = self.pager.read_page(self.root_page)?;
|
||||
*self.is_empty_table_state.borrow_mut() = EmptyTableState::ReadPage { page };
|
||||
if let Some(c) = c {
|
||||
@@ -1296,19 +1269,7 @@ impl BTreeCursor {
|
||||
/// Used in forwards iteration, which is the default.
|
||||
#[instrument(skip(self), level = Level::DEBUG, name = "next")]
|
||||
pub fn get_next_record(&mut self) -> Result<IOResult<bool>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mut mv_cursor = mv_cursor.write();
|
||||
assert!(matches!(mv_cursor.next()?, IOResult::Done(_)));
|
||||
let IOResult::Done(rowid) = mv_cursor.rowid()? else {
|
||||
todo!()
|
||||
};
|
||||
match rowid {
|
||||
Some(_rowid) => {
|
||||
return Ok(IOResult::Done(true));
|
||||
}
|
||||
None => return Ok(IOResult::Done(false)),
|
||||
}
|
||||
} else if self.stack.current_page == -1 {
|
||||
if self.stack.current_page == -1 {
|
||||
// This can happen in nested left joins. See:
|
||||
// https://github.com/tursodatabase/turso/issues/2924
|
||||
return Ok(IOResult::Done(false));
|
||||
@@ -1883,10 +1844,6 @@ impl BTreeCursor {
|
||||
/// of iterating cells in order.
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn tablebtree_seek(&mut self, rowid: i64, seek_op: SeekOp) -> Result<IOResult<SeekResult>> {
|
||||
turso_assert!(
|
||||
self.mv_cursor.is_none(),
|
||||
"attempting to seek with MV cursor"
|
||||
);
|
||||
let iter_dir = seek_op.iteration_direction();
|
||||
|
||||
if matches!(
|
||||
@@ -2239,10 +2196,6 @@ impl BTreeCursor {
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn move_to(&mut self, key: SeekKey<'_>, cmp: SeekOp) -> Result<IOResult<()>> {
|
||||
turso_assert!(
|
||||
self.mv_cursor.is_none(),
|
||||
"attempting to move with MV cursor"
|
||||
);
|
||||
tracing::trace!(?key, ?cmp);
|
||||
// For a table with N rows, we can find any row by row id in O(log(N)) time by starting at the root page and following the B-tree pointers.
|
||||
// B-trees consist of interior pages and leaf pages. Interior pages contain pointers to other pages, while leaf pages contain the actual row data.
|
||||
@@ -4957,7 +4910,6 @@ impl CursorTrait for BTreeCursor {
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn last(&mut self) -> Result<IOResult<()>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
let cursor_has_record = return_if_io!(self.move_to_rightmost());
|
||||
self.has_record.replace(cursor_has_record);
|
||||
self.invalidate_record();
|
||||
@@ -4966,7 +4918,6 @@ impl CursorTrait for BTreeCursor {
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn prev(&mut self) -> Result<IOResult<bool>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
loop {
|
||||
match self.advance_state {
|
||||
AdvanceState::Start => {
|
||||
@@ -4985,16 +4936,6 @@ impl CursorTrait for BTreeCursor {
|
||||
|
||||
#[instrument(skip(self), level = Level::DEBUG)]
|
||||
fn rowid(&self) -> Result<IOResult<Option<i64>>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mv_cursor = mv_cursor.write();
|
||||
let IOResult::Done(rowid) = mv_cursor.rowid()? else {
|
||||
todo!();
|
||||
};
|
||||
let Some(rowid) = rowid else {
|
||||
return Ok(IOResult::Done(None));
|
||||
};
|
||||
return Ok(IOResult::Done(Some(rowid)));
|
||||
}
|
||||
if self.get_null_flag() {
|
||||
return Ok(IOResult::Done(None));
|
||||
}
|
||||
@@ -5017,10 +4958,6 @@ impl CursorTrait for BTreeCursor {
|
||||
|
||||
#[instrument(skip(self, key), level = Level::DEBUG)]
|
||||
fn seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result<IOResult<SeekResult>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mut mv_cursor = mv_cursor.write();
|
||||
return mv_cursor.seek(key, op);
|
||||
}
|
||||
self.skip_advance.set(false);
|
||||
// Empty trace to capture the span information
|
||||
tracing::trace!("");
|
||||
@@ -5038,7 +4975,7 @@ impl CursorTrait for BTreeCursor {
|
||||
|
||||
#[instrument(skip(self), level = Level::DEBUG)]
|
||||
fn record(&self) -> Result<IOResult<Option<Ref<'_, ImmutableRecord>>>> {
|
||||
if !self.has_record.get() && self.mv_cursor.is_none() {
|
||||
if !self.has_record.get() {
|
||||
return Ok(IOResult::Done(None));
|
||||
}
|
||||
let invalidated = self
|
||||
@@ -5052,25 +4989,6 @@ impl CursorTrait for BTreeCursor {
|
||||
.unwrap();
|
||||
return Ok(IOResult::Done(Some(record_ref)));
|
||||
}
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mv_cursor = mv_cursor.write();
|
||||
let Some(row) = mv_cursor.current_row()? else {
|
||||
return Ok(IOResult::Done(None));
|
||||
};
|
||||
self.get_immutable_record_or_create()
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.invalidate();
|
||||
self.get_immutable_record_or_create()
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.start_serialization(&row.data);
|
||||
self.record_cursor.borrow_mut().invalidate();
|
||||
let record_ref =
|
||||
Ref::filter_map(self.reusable_immutable_record.borrow(), |opt| opt.as_ref())
|
||||
.unwrap();
|
||||
return Ok(IOResult::Done(Some(record_ref)));
|
||||
}
|
||||
|
||||
let page = self.stack.top_ref();
|
||||
let contents = page.get_contents();
|
||||
@@ -5118,17 +5036,10 @@ impl CursorTrait for BTreeCursor {
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn insert(&mut self, key: &BTreeKey) -> Result<IOResult<()>> {
|
||||
tracing::debug!(valid_state = ?self.valid_state, cursor_state = ?self.state, is_write_in_progress = self.is_write_in_progress());
|
||||
match &self.mv_cursor {
|
||||
Some(mv_cursor) => {
|
||||
return_if_io!(mv_cursor.write().insert(key));
|
||||
}
|
||||
None => {
|
||||
return_if_io!(self.insert_into_page(key));
|
||||
if key.maybe_rowid().is_some() {
|
||||
self.has_record.replace(true);
|
||||
}
|
||||
}
|
||||
};
|
||||
return_if_io!(self.insert_into_page(key));
|
||||
if key.maybe_rowid().is_some() {
|
||||
self.has_record.replace(true);
|
||||
}
|
||||
Ok(IOResult::Done(()))
|
||||
}
|
||||
|
||||
@@ -5146,11 +5057,6 @@ impl CursorTrait for BTreeCursor {
|
||||
/// 9. SeekAfterBalancing -> adjust the cursor to a node that is closer to the deleted value. go to Finish
|
||||
/// 10. Finish -> Delete operation is done. Return CursorResult(Ok())
|
||||
fn delete(&mut self) -> Result<IOResult<()>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
return_if_io!(mv_cursor.write().delete());
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
|
||||
if let CursorState::None = &self.state {
|
||||
self.state = CursorState::Delete(DeleteState::Start);
|
||||
}
|
||||
@@ -5524,7 +5430,6 @@ impl CursorTrait for BTreeCursor {
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn exists(&mut self, key: &Value) -> Result<IOResult<bool>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
let int_key = match key {
|
||||
Value::Integer(i) => i,
|
||||
_ => unreachable!("btree tables are indexed by integers!"),
|
||||
@@ -5561,10 +5466,6 @@ impl CursorTrait for BTreeCursor {
|
||||
///
|
||||
/// Only supposed to be used in the context of a simple Count Select Statement
|
||||
fn count(&mut self) -> Result<IOResult<usize>> {
|
||||
if let Some(_mv_cursor) = &self.mv_cursor {
|
||||
todo!("Implement count for mvcc");
|
||||
}
|
||||
|
||||
let mut mem_page;
|
||||
let mut contents;
|
||||
|
||||
@@ -5684,14 +5585,9 @@ impl CursorTrait for BTreeCursor {
|
||||
match self.rewind_state {
|
||||
RewindState::Start => {
|
||||
self.rewind_state = RewindState::NextRecord;
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mut mv_cursor = mv_cursor.write();
|
||||
return_if_io!(mv_cursor.rewind());
|
||||
} else {
|
||||
let c = self.move_to_root()?;
|
||||
if let Some(c) = c {
|
||||
io_yield_one!(c);
|
||||
}
|
||||
let c = self.move_to_root()?;
|
||||
if let Some(c) = c {
|
||||
io_yield_one!(c);
|
||||
}
|
||||
}
|
||||
RewindState::NextRecord => {
|
||||
@@ -5744,7 +5640,6 @@ impl CursorTrait for BTreeCursor {
|
||||
}
|
||||
|
||||
fn seek_end(&mut self) -> Result<IOResult<()>> {
|
||||
assert!(self.mv_cursor.is_none()); // unsure about this -_-
|
||||
loop {
|
||||
match self.seek_end_state {
|
||||
SeekEndState::Start => {
|
||||
@@ -5780,16 +5675,11 @@ impl CursorTrait for BTreeCursor {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_mvcc_cursor(&self) -> Arc<RwLock<MvCursor>> {
|
||||
self.mv_cursor.as_ref().unwrap().clone()
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn seek_to_last(&mut self) -> Result<IOResult<()>> {
|
||||
loop {
|
||||
match self.seek_to_last_state {
|
||||
SeekToLastState::Start => {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
let has_record = return_if_io!(self.move_to_rightmost());
|
||||
self.invalidate_record();
|
||||
self.has_record.replace(has_record);
|
||||
@@ -8076,7 +7966,7 @@ mod tests {
|
||||
|
||||
fn validate_btree(pager: Arc<Pager>, page_idx: i64) -> (usize, bool) {
|
||||
let num_columns = 5;
|
||||
let cursor = BTreeCursor::new_table(None, pager.clone(), page_idx, num_columns);
|
||||
let cursor = BTreeCursor::new_table(pager.clone(), page_idx, num_columns);
|
||||
let (page, _c) = cursor.read_page(page_idx).unwrap();
|
||||
while page.is_locked() {
|
||||
pager.io.step().unwrap();
|
||||
@@ -8187,7 +8077,7 @@ mod tests {
|
||||
fn format_btree(pager: Arc<Pager>, page_idx: i64, depth: usize) -> String {
|
||||
let num_columns = 5;
|
||||
|
||||
let cursor = BTreeCursor::new_table(None, pager.clone(), page_idx, num_columns);
|
||||
let cursor = BTreeCursor::new_table(pager.clone(), page_idx, num_columns);
|
||||
let (page, _c) = cursor.read_page(page_idx).unwrap();
|
||||
while page.is_locked() {
|
||||
pager.io.step().unwrap();
|
||||
@@ -8269,7 +8159,7 @@ mod tests {
|
||||
let conn = db.connect().unwrap();
|
||||
let pager = conn.pager.read().clone();
|
||||
|
||||
let mut cursor = BTreeCursor::new(None, pager, 1, 5);
|
||||
let mut cursor = BTreeCursor::new(pager, 1, 5);
|
||||
let result = cursor.rewind()?;
|
||||
assert!(matches!(result, IOResult::Done(_)));
|
||||
let result = cursor.next()?;
|
||||
@@ -8299,7 +8189,7 @@ mod tests {
|
||||
let large_record = ImmutableRecord::from_registers(regs, regs.len());
|
||||
|
||||
// Create cursor for the table
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns);
|
||||
|
||||
let initial_pagecount = pager
|
||||
.io
|
||||
@@ -8451,7 +8341,7 @@ mod tests {
|
||||
let (pager, root_page, _, _) = empty_btree();
|
||||
let num_columns = 5;
|
||||
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns);
|
||||
for (key, size) in sequence.iter() {
|
||||
run_until_done(
|
||||
|| {
|
||||
@@ -8517,7 +8407,7 @@ mod tests {
|
||||
|
||||
for _ in 0..attempts {
|
||||
let (pager, root_page, _db, conn) = empty_btree();
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns);
|
||||
let mut keys = SortedVec::new();
|
||||
tracing::info!("seed: {seed}");
|
||||
for insert_id in 0..inserts {
|
||||
@@ -8659,13 +8549,8 @@ mod tests {
|
||||
has_rowid: false,
|
||||
};
|
||||
let num_columns = index_def.columns.len();
|
||||
let mut cursor = BTreeCursor::new_index(
|
||||
None,
|
||||
pager.clone(),
|
||||
index_root_page,
|
||||
&index_def,
|
||||
num_columns,
|
||||
);
|
||||
let mut cursor =
|
||||
BTreeCursor::new_index(pager.clone(), index_root_page, &index_def, num_columns);
|
||||
let mut keys = SortedVec::new();
|
||||
tracing::info!("seed: {seed}");
|
||||
for i in 0..inserts {
|
||||
@@ -8822,8 +8707,7 @@ mod tests {
|
||||
ephemeral: false,
|
||||
has_rowid: false,
|
||||
};
|
||||
let mut cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_root_page, &index_def, 1);
|
||||
let mut cursor = BTreeCursor::new_index(pager.clone(), index_root_page, &index_def, 1);
|
||||
|
||||
// Track expected keys that should be present in the tree
|
||||
let mut expected_keys = Vec::new();
|
||||
@@ -9214,7 +9098,7 @@ mod tests {
|
||||
let pager = setup_test_env(5);
|
||||
let num_columns = 5;
|
||||
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), 1, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), 1, num_columns);
|
||||
|
||||
let max_local = payload_overflow_threshold_max(PageType::TableLeaf, 4096);
|
||||
let usable_size = cursor.usable_space();
|
||||
@@ -9325,7 +9209,7 @@ mod tests {
|
||||
let pager = setup_test_env(5);
|
||||
let num_columns = 5;
|
||||
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), 1, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), 1, num_columns);
|
||||
|
||||
let small_payload = vec![b'A'; 10];
|
||||
|
||||
@@ -9374,7 +9258,7 @@ mod tests {
|
||||
let pager = setup_test_env(initial_size);
|
||||
let num_columns = 5;
|
||||
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), 2, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), 2, num_columns);
|
||||
|
||||
// Initialize page 2 as a root page (interior)
|
||||
let root_page = run_until_done(
|
||||
@@ -9467,7 +9351,7 @@ mod tests {
|
||||
let num_columns = 5;
|
||||
let record_count = 10;
|
||||
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns);
|
||||
|
||||
for rowid in 1..=record_count {
|
||||
insert_record(&mut cursor, &pager, rowid, Value::Integer(rowid))?;
|
||||
@@ -9492,7 +9376,7 @@ mod tests {
|
||||
let num_columns = 5;
|
||||
let record_count = 1000;
|
||||
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns);
|
||||
|
||||
for rowid in 1..=record_count {
|
||||
insert_record(&mut cursor, &pager, rowid, Value::Integer(rowid))?;
|
||||
@@ -9518,7 +9402,7 @@ mod tests {
|
||||
let num_columns = 5;
|
||||
let record_count = 1000;
|
||||
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns);
|
||||
|
||||
for rowid in 1..=record_count {
|
||||
insert_record(&mut cursor, &pager, rowid, Value::Integer(rowid))?;
|
||||
@@ -9558,8 +9442,8 @@ mod tests {
|
||||
let num_columns = 5;
|
||||
let record_count = 1000;
|
||||
|
||||
let mut cursor1 = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor2 = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor1 = BTreeCursor::new_table(pager.clone(), root_page, num_columns);
|
||||
let mut cursor2 = BTreeCursor::new_table(pager.clone(), root_page, num_columns);
|
||||
|
||||
// Use cursor1 to insert records
|
||||
for rowid in 1..=record_count {
|
||||
@@ -9592,7 +9476,7 @@ mod tests {
|
||||
let num_columns = 5;
|
||||
let record_count = 100;
|
||||
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns);
|
||||
|
||||
let initial_page_count = pager
|
||||
.io
|
||||
@@ -10228,7 +10112,7 @@ mod tests {
|
||||
let num_columns = 5;
|
||||
|
||||
for i in 0..10000 {
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns);
|
||||
tracing::info!("INSERT INTO t VALUES ({});", i,);
|
||||
let regs = &[Register::Value(Value::Integer(i))];
|
||||
let value = ImmutableRecord::from_registers(regs, regs.len());
|
||||
@@ -10256,7 +10140,7 @@ mod tests {
|
||||
format_btree(pager.clone(), root_page, 0)
|
||||
);
|
||||
for key in keys.iter() {
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns);
|
||||
let key = Value::Integer(*key);
|
||||
let exists = run_until_done(|| cursor.exists(&key), pager.deref()).unwrap();
|
||||
assert!(exists, "key not found {key}");
|
||||
@@ -10315,7 +10199,7 @@ mod tests {
|
||||
|
||||
// Insert 10,000 records in to the BTree.
|
||||
for i in 1..=10000 {
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns);
|
||||
let regs = &[Register::Value(Value::Text(Text::new("hello world")))];
|
||||
let value = ImmutableRecord::from_registers(regs, regs.len());
|
||||
|
||||
@@ -10342,7 +10226,7 @@ mod tests {
|
||||
|
||||
// Delete records with 500 <= key <= 3500
|
||||
for i in 500..=3500 {
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns);
|
||||
let seek_key = SeekKey::TableRowId(i);
|
||||
|
||||
let seek_result = run_until_done(
|
||||
@@ -10362,7 +10246,7 @@ mod tests {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns);
|
||||
let key = Value::Integer(i);
|
||||
let exists = run_until_done(|| cursor.exists(&key), pager.deref()).unwrap();
|
||||
assert!(exists, "Key {i} should exist but doesn't");
|
||||
@@ -10370,7 +10254,7 @@ mod tests {
|
||||
|
||||
// Verify the deleted records don't exist.
|
||||
for i in 500..=3500 {
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns);
|
||||
let key = Value::Integer(i);
|
||||
let exists = run_until_done(|| cursor.exists(&key), pager.deref()).unwrap();
|
||||
assert!(!exists, "Deleted key {i} still exists");
|
||||
@@ -10393,7 +10277,7 @@ mod tests {
|
||||
let num_columns = 5;
|
||||
|
||||
for (i, huge_text) in huge_texts.iter().enumerate().take(iterations) {
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns);
|
||||
tracing::info!("INSERT INTO t VALUES ({});", i,);
|
||||
let regs = &[Register::Value(Value::Text(Text {
|
||||
value: huge_text.as_bytes().to_vec(),
|
||||
@@ -10423,7 +10307,7 @@ mod tests {
|
||||
format_btree(pager.clone(), root_page, 0)
|
||||
);
|
||||
}
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor = BTreeCursor::new_table(pager.clone(), root_page, num_columns);
|
||||
let _c = cursor.move_to_root().unwrap();
|
||||
for i in 0..iterations {
|
||||
let has_next = run_until_done(|| cursor.next(), pager.deref()).unwrap();
|
||||
@@ -10441,7 +10325,7 @@ mod tests {
|
||||
pub fn test_read_write_payload_with_offset() {
|
||||
let (pager, root_page, _, _) = empty_btree();
|
||||
let num_columns = 5;
|
||||
let mut cursor = BTreeCursor::new(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor = BTreeCursor::new(pager.clone(), root_page, num_columns);
|
||||
let offset = 2; // blobs data starts at offset 2
|
||||
let initial_text = "hello world";
|
||||
let initial_blob = initial_text.as_bytes().to_vec();
|
||||
@@ -10518,7 +10402,7 @@ mod tests {
|
||||
pub fn test_read_write_payload_with_overflow_page() {
|
||||
let (pager, root_page, _, _) = empty_btree();
|
||||
let num_columns = 5;
|
||||
let mut cursor = BTreeCursor::new(None, pager.clone(), root_page, num_columns);
|
||||
let mut cursor = BTreeCursor::new(pager.clone(), root_page, num_columns);
|
||||
let mut large_blob = vec![b'A'; 40960 - 11]; // insert large blob. 40960 = 10 page long.
|
||||
let hello_world = b"hello world";
|
||||
large_blob.extend_from_slice(hello_world);
|
||||
|
||||
@@ -36,6 +36,7 @@ use crate::{
|
||||
translate::emitter::TransactionMode,
|
||||
};
|
||||
use crate::{get_cursor, CheckpointMode, Connection, MvCursor};
|
||||
use std::any::Any;
|
||||
use std::env::temp_dir;
|
||||
use std::ops::DerefMut;
|
||||
use std::{
|
||||
@@ -1044,16 +1045,9 @@ pub fn op_open_read(
|
||||
let pager = program.get_pager_from_database_index(db);
|
||||
|
||||
let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap();
|
||||
let mv_cursor = if let Some(tx_id) = program.connection.get_mv_tx_id() {
|
||||
let mv_store = mv_store.unwrap().clone();
|
||||
let mv_cursor = Arc::new(RwLock::new(
|
||||
MvCursor::new(mv_store, tx_id, *root_page, pager.clone()).unwrap(),
|
||||
));
|
||||
Some(mv_cursor)
|
||||
} else {
|
||||
if program.connection.get_mv_tx_id().is_none() {
|
||||
assert!(*root_page >= 0, "");
|
||||
None
|
||||
};
|
||||
}
|
||||
let cursors = &mut state.cursors;
|
||||
let num_columns = match cursor_type {
|
||||
CursorType::BTreeTable(table_rc) => table_rc.columns.len(),
|
||||
@@ -1062,16 +1056,33 @@ pub fn op_open_read(
|
||||
_ => unreachable!("This should not have happened"),
|
||||
};
|
||||
|
||||
let maybe_promote_to_mvcc_cursor =
|
||||
|btree_cursor: Box<dyn CursorTrait>| -> Result<Box<dyn CursorTrait>> {
|
||||
if let Some(tx_id) = program.connection.get_mv_tx_id() {
|
||||
let mv_store = mv_store.unwrap().clone();
|
||||
Ok(Box::new(MvCursor::new(
|
||||
mv_store,
|
||||
tx_id,
|
||||
*root_page,
|
||||
pager.clone(),
|
||||
btree_cursor,
|
||||
)?))
|
||||
} else {
|
||||
Ok(btree_cursor)
|
||||
}
|
||||
};
|
||||
|
||||
match cursor_type {
|
||||
CursorType::MaterializedView(_, view_mutex) => {
|
||||
// This is a materialized view with storage
|
||||
// Create btree cursor for reading the persistent data
|
||||
|
||||
let btree_cursor = Box::new(BTreeCursor::new_table(
|
||||
mv_cursor,
|
||||
pager.clone(),
|
||||
*root_page,
|
||||
num_columns,
|
||||
));
|
||||
let cursor = maybe_promote_to_mvcc_cursor(btree_cursor)?;
|
||||
|
||||
// Get the view name and look up or create its transaction state
|
||||
let view_name = view_mutex.lock().unwrap().name().to_string();
|
||||
@@ -1082,7 +1093,7 @@ pub fn op_open_read(
|
||||
|
||||
// Create materialized view cursor with this view's transaction state
|
||||
let mv_cursor = crate::incremental::cursor::MaterializedViewCursor::new(
|
||||
btree_cursor,
|
||||
cursor,
|
||||
view_mutex.clone(),
|
||||
pager.clone(),
|
||||
tx_state,
|
||||
@@ -1095,24 +1106,29 @@ pub fn op_open_read(
|
||||
}
|
||||
CursorType::BTreeTable(_) => {
|
||||
// Regular table
|
||||
let cursor = BTreeCursor::new_table(mv_cursor, pager.clone(), *root_page, num_columns);
|
||||
let btree_cursor = Box::new(BTreeCursor::new_table(
|
||||
pager.clone(),
|
||||
*root_page,
|
||||
num_columns,
|
||||
));
|
||||
let cursor = maybe_promote_to_mvcc_cursor(btree_cursor)?;
|
||||
cursors
|
||||
.get_mut(*cursor_id)
|
||||
.unwrap()
|
||||
.replace(Cursor::new_btree(Box::new(cursor)));
|
||||
.replace(Cursor::new_btree(cursor));
|
||||
}
|
||||
CursorType::BTreeIndex(index) => {
|
||||
let cursor = BTreeCursor::new_index(
|
||||
mv_cursor,
|
||||
let btree_cursor = Box::new(BTreeCursor::new_index(
|
||||
pager.clone(),
|
||||
*root_page,
|
||||
index.as_ref(),
|
||||
num_columns,
|
||||
);
|
||||
));
|
||||
let cursor = maybe_promote_to_mvcc_cursor(btree_cursor)?;
|
||||
cursors
|
||||
.get_mut(*cursor_id)
|
||||
.unwrap()
|
||||
.replace(Cursor::new_btree(Box::new(cursor)));
|
||||
.replace(Cursor::new_btree(cursor));
|
||||
}
|
||||
CursorType::Pseudo(_) => {
|
||||
panic!("OpenRead on pseudo cursor");
|
||||
@@ -6423,13 +6439,13 @@ pub fn op_new_rowid(
|
||||
},
|
||||
insn
|
||||
);
|
||||
|
||||
if let Some(mv_store) = mv_store {
|
||||
// With MVCC we can't simply find last rowid and get rowid + 1 as a result. To not have two conflicting rowids concurrently we need to call `get_next_rowid`
|
||||
// which will make sure we don't collide.
|
||||
let rowid = {
|
||||
let cursor = state.get_cursor(*cursor);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let mvcc_cursor = cursor.get_mvcc_cursor();
|
||||
let mut mvcc_cursor = mvcc_cursor.write();
|
||||
let cursor = cursor.as_btree_mut() as &mut dyn Any;
|
||||
let mvcc_cursor = cursor.downcast_mut::<MvCursor>().unwrap();
|
||||
mvcc_cursor.get_next_rowid()
|
||||
};
|
||||
state.registers[*rowid_reg] = Register::Value(Value::Integer(rowid));
|
||||
@@ -6706,16 +6722,10 @@ pub fn op_not_exists(
|
||||
},
|
||||
insn
|
||||
);
|
||||
let exists = if let Some(mv_store) = mv_store {
|
||||
let cursor = must_be_btree_cursor!(*cursor, program.cursor_ref, state, "NotExists");
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let mvcc_cursor = cursor.get_mvcc_cursor();
|
||||
false
|
||||
} else {
|
||||
let cursor = must_be_btree_cursor!(*cursor, program.cursor_ref, state, "NotExists");
|
||||
let cursor = cursor.as_btree_mut();
|
||||
return_if_io!(cursor.exists(state.registers[*rowid_reg].get_value()))
|
||||
};
|
||||
let cursor = must_be_btree_cursor!(*cursor, program.cursor_ref, state, "NotExists");
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let exists = return_if_io!(cursor.exists(state.registers[*rowid_reg].get_value()));
|
||||
|
||||
if exists {
|
||||
state.pc += 1;
|
||||
} else {
|
||||
@@ -6823,15 +6833,21 @@ pub fn op_open_write(
|
||||
CursorType::BTreeIndex(index) => Some(index),
|
||||
_ => None,
|
||||
};
|
||||
let mv_cursor = if let Some(tx_id) = program.connection.get_mv_tx_id() {
|
||||
let mv_store = mv_store.unwrap().clone();
|
||||
let mv_cursor = Arc::new(RwLock::new(
|
||||
MvCursor::new(mv_store.clone(), tx_id, root_page, pager.clone()).unwrap(),
|
||||
));
|
||||
Some(mv_cursor)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let maybe_promote_to_mvcc_cursor =
|
||||
|btree_cursor: Box<dyn CursorTrait>| -> Result<Box<dyn CursorTrait>> {
|
||||
if let Some(tx_id) = program.connection.get_mv_tx_id() {
|
||||
let mv_store = mv_store.unwrap().clone();
|
||||
Ok(Box::new(MvCursor::new(
|
||||
mv_store,
|
||||
tx_id,
|
||||
root_page,
|
||||
pager.clone(),
|
||||
btree_cursor,
|
||||
)?))
|
||||
} else {
|
||||
Ok(btree_cursor)
|
||||
}
|
||||
};
|
||||
if let Some(index) = maybe_index {
|
||||
let conn = program.connection.clone();
|
||||
let schema = conn.schema.read();
|
||||
@@ -6840,17 +6856,17 @@ pub fn op_open_write(
|
||||
.and_then(|table| table.btree());
|
||||
|
||||
let num_columns = index.columns.len();
|
||||
let cursor = BTreeCursor::new_index(
|
||||
mv_cursor,
|
||||
let btree_cursor = Box::new(BTreeCursor::new_index(
|
||||
pager.clone(),
|
||||
root_page,
|
||||
index.as_ref(),
|
||||
num_columns,
|
||||
);
|
||||
));
|
||||
let cursor = maybe_promote_to_mvcc_cursor(btree_cursor)?;
|
||||
cursors
|
||||
.get_mut(*cursor_id)
|
||||
.unwrap()
|
||||
.replace(Cursor::new_btree(Box::new(cursor)));
|
||||
.replace(Cursor::new_btree(cursor));
|
||||
} else {
|
||||
let num_columns = match cursor_type {
|
||||
CursorType::BTreeTable(table_rc) => table_rc.columns.len(),
|
||||
@@ -6860,11 +6876,16 @@ pub fn op_open_write(
|
||||
),
|
||||
};
|
||||
|
||||
let cursor = BTreeCursor::new_table(mv_cursor, pager.clone(), root_page, num_columns);
|
||||
let btree_cursor = Box::new(BTreeCursor::new_table(
|
||||
pager.clone(),
|
||||
root_page,
|
||||
num_columns,
|
||||
));
|
||||
let cursor = maybe_promote_to_mvcc_cursor(btree_cursor)?;
|
||||
cursors
|
||||
.get_mut(*cursor_id)
|
||||
.unwrap()
|
||||
.replace(Cursor::new_btree(Box::new(cursor)));
|
||||
.replace(Cursor::new_btree(cursor));
|
||||
}
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
@@ -6958,7 +6979,7 @@ pub fn op_destroy(
|
||||
OpDestroyState::CreateCursor => {
|
||||
// Destroy doesn't do anything meaningful with the table/index distinction so we can just use a
|
||||
// table btree cursor for both.
|
||||
let cursor = BTreeCursor::new(None, pager.clone(), *root, 0);
|
||||
let cursor = BTreeCursor::new(pager.clone(), *root, 0);
|
||||
state.op_destroy_state =
|
||||
OpDestroyState::DestroyBtree(Arc::new(RwLock::new(cursor)));
|
||||
}
|
||||
@@ -7623,9 +7644,9 @@ pub fn op_open_ephemeral(
|
||||
};
|
||||
|
||||
let cursor = if let CursorType::BTreeIndex(index) = cursor_type {
|
||||
BTreeCursor::new_index(None, pager.clone(), root_page, index, num_columns)
|
||||
BTreeCursor::new_index(pager.clone(), root_page, index, num_columns)
|
||||
} else {
|
||||
BTreeCursor::new_table(None, pager.clone(), root_page, num_columns)
|
||||
BTreeCursor::new_table(pager.clone(), root_page, num_columns)
|
||||
};
|
||||
state.op_open_ephemeral_state = OpOpenEphemeralState::Rewind {
|
||||
cursor: Box::new(cursor),
|
||||
@@ -7704,29 +7725,32 @@ pub fn op_open_dup(
|
||||
// a separate database file).
|
||||
let pager = original_cursor.get_pager();
|
||||
|
||||
let mv_cursor = if let Some(tx_id) = program.connection.get_mv_tx_id() {
|
||||
let mv_store = mv_store.unwrap().clone();
|
||||
let mv_cursor = Arc::new(RwLock::new(MvCursor::new(
|
||||
mv_store,
|
||||
tx_id,
|
||||
root_page,
|
||||
pager.clone(),
|
||||
)?));
|
||||
Some(mv_cursor)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let (_, cursor_type) = program.cursor_ref.get(*original_cursor_id).unwrap();
|
||||
match cursor_type {
|
||||
CursorType::BTreeTable(table) => {
|
||||
let cursor =
|
||||
BTreeCursor::new_table(mv_cursor, pager.clone(), root_page, table.columns.len());
|
||||
let cursor = Box::new(BTreeCursor::new_table(
|
||||
pager.clone(),
|
||||
root_page,
|
||||
table.columns.len(),
|
||||
));
|
||||
let cursor: Box<dyn CursorTrait> =
|
||||
if let Some(tx_id) = program.connection.get_mv_tx_id() {
|
||||
let mv_store = mv_store.unwrap().clone();
|
||||
Box::new(MvCursor::new(
|
||||
mv_store,
|
||||
tx_id,
|
||||
root_page,
|
||||
pager.clone(),
|
||||
cursor,
|
||||
)?)
|
||||
} else {
|
||||
cursor
|
||||
};
|
||||
let cursors = &mut state.cursors;
|
||||
cursors
|
||||
.get_mut(*new_cursor_id)
|
||||
.unwrap()
|
||||
.replace(Cursor::new_btree(Box::new(cursor)));
|
||||
.replace(Cursor::new_btree(cursor));
|
||||
}
|
||||
CursorType::BTreeIndex(table) => {
|
||||
// In principle, we could implement OpenDup for BTreeIndex,
|
||||
|
||||
Reference in New Issue
Block a user