From 3565e7978aae5e0527f147f140123625b2736fd0 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Sat, 13 Sep 2025 14:41:39 -0500 Subject: [PATCH] Add an index to the dbsp internal table And also change the schema of the main table. I have come to see the current key-value schema as inadequate for non-aggregate operators. Calculating Min/Max, for example, doesn't fit in this schema because we have to be able to track existing values and index them. Another alternative is to keep one table per operator type, but this quickly leads to an explosion of tables. --- core/incremental/compiler.rs | 283 +++++++++++--- core/incremental/operator.rs | 641 +++++++++++++++++++++++--------- core/incremental/persistence.rs | 226 ++++++++--- core/incremental/view.rs | 21 +- core/schema.rs | 48 ++- core/translate/view.rs | 45 ++- core/util.rs | 12 +- 7 files changed, 968 insertions(+), 308 deletions(-) diff --git a/core/incremental/compiler.rs b/core/incremental/compiler.rs index 9dc85ad79..b15dc547c 100644 --- a/core/incremental/compiler.rs +++ b/core/incremental/compiler.rs @@ -8,15 +8,15 @@ use crate::incremental::dbsp::{Delta, DeltaPair}; use crate::incremental::expr_compiler::CompiledExpression; use crate::incremental::operator::{ - EvalState, FilterOperator, FilterPredicate, IncrementalOperator, InputOperator, ProjectOperator, + create_dbsp_state_index, DbspStateCursors, EvalState, FilterOperator, FilterPredicate, + IncrementalOperator, InputOperator, ProjectOperator, }; -use crate::incremental::persistence::WriteRow; -use crate::storage::btree::BTreeCursor; +use crate::storage::btree::{BTreeCursor, BTreeKey}; // Note: logical module must be made pub(crate) in translate/mod.rs use crate::translate::logical::{ BinaryOperator, LogicalExpr, LogicalPlan, LogicalSchema, SchemaRef, }; -use crate::types::{IOResult, SeekKey, Value}; +use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult, Value}; use crate::Pager; use crate::{return_and_restore_if_io, return_if_io, 
LimboError, Result}; use std::collections::HashMap; @@ -24,8 +24,120 @@ use std::fmt::{self, Display, Formatter}; use std::rc::Rc; use std::sync::Arc; -// The state table is always a key-value store with 3 columns: key, state, and weight. -const OPERATOR_COLUMNS: usize = 3; +// The state table has 5 columns: operator_id, zset_id, element_id, value, weight +const OPERATOR_COLUMNS: usize = 5; + +/// State machine for writing rows to simple materialized views (table-only, no index) +#[derive(Debug, Default)] +pub enum WriteRowView { + #[default] + GetRecord, + Delete, + Insert { + final_weight: isize, + }, + Done, +} + +impl WriteRowView { + pub fn new() -> Self { + Self::default() + } + + /// Write a row with weight management for table-only storage. + /// + /// # Arguments + /// * `cursor` - BTree cursor for the storage + /// * `key` - The key to seek (TableRowId) + /// * `build_record` - Function that builds the record values to insert. + /// Takes the final_weight and returns the complete record values. 
+ /// * `weight` - The weight delta to apply + pub fn write_row( + &mut self, + cursor: &mut BTreeCursor, + key: SeekKey, + build_record: impl Fn(isize) -> Vec, + weight: isize, + ) -> Result> { + loop { + match self { + WriteRowView::GetRecord => { + let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); + if !matches!(res, SeekResult::Found) { + *self = WriteRowView::Insert { + final_weight: weight, + }; + } else { + let existing_record = return_if_io!(cursor.record()); + let r = existing_record.ok_or_else(|| { + LimboError::InternalError(format!( + "Found key {key:?} in storage but could not read record" + )) + })?; + let values = r.get_values(); + + // Weight is always the last value + let existing_weight = match values.last() { + Some(val) => match val.to_owned() { + Value::Integer(w) => w as isize, + _ => { + return Err(LimboError::InternalError(format!( + "Invalid weight value in storage for key {key:?}" + ))) + } + }, + None => { + return Err(LimboError::InternalError(format!( + "No weight value found in storage for key {key:?}" + ))) + } + }; + + let final_weight = existing_weight + weight; + if final_weight <= 0 { + *self = WriteRowView::Delete + } else { + *self = WriteRowView::Insert { final_weight } + } + } + } + WriteRowView::Delete => { + // Mark as Done before delete to avoid retry on I/O + *self = WriteRowView::Done; + return_if_io!(cursor.delete()); + } + WriteRowView::Insert { final_weight } => { + return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); + + // Extract the row ID from the key + let key_i64 = match key { + SeekKey::TableRowId(id) => id, + _ => { + return Err(LimboError::InternalError( + "Expected TableRowId for storage".to_string(), + )) + } + }; + + // Build the record values using the provided function + let record_values = build_record(*final_weight); + + // Create an ImmutableRecord from the values + let immutable_record = + ImmutableRecord::from_values(&record_values, 
record_values.len()); + let btree_key = BTreeKey::new_table_rowid(key_i64, Some(&immutable_record)); + + // Mark as Done before insert to avoid retry on I/O + *self = WriteRowView::Done; + return_if_io!(cursor.insert(&btree_key)); + } + WriteRowView::Done => { + return Ok(IOResult::Done(())); + } + } + } + } +} /// State machine for commit operations pub enum CommitState { @@ -36,8 +148,8 @@ pub enum CommitState { CommitOperators { /// Execute state for running the circuit execute_state: Box, - /// Persistent cursor for operator state btree (internal_state_root) - state_cursor: Box, + /// Persistent cursors for operator state (table and index) + state_cursors: Box, }, /// Updating the materialized view with the delta @@ -47,7 +159,7 @@ pub enum CommitState { /// Current index in delta.changes being processed current_index: usize, /// State for writing individual rows - write_row_state: WriteRow, + write_row_state: WriteRowView, /// Cursor for view data btree - created fresh for each row view_cursor: Box, }, @@ -60,7 +172,8 @@ impl std::fmt::Debug for CommitState { Self::CommitOperators { execute_state, .. 
} => f .debug_struct("CommitOperators") .field("execute_state", execute_state) - .field("has_state_cursor", &true) + .field("has_state_table_cursor", &true) + .field("has_state_index_cursor", &true) .finish(), Self::UpdateView { delta, @@ -221,25 +334,13 @@ impl std::fmt::Debug for DbspNode { impl DbspNode { fn process_node( &mut self, - pager: Rc, eval_state: &mut EvalState, - root_page: usize, commit_operators: bool, - state_cursor: Option<&mut Box>, + cursors: &mut DbspStateCursors, ) -> Result> { // Process delta using the executable operator let op = &mut self.executable; - // Use provided cursor or create a local one - let mut local_cursor; - let cursor = if let Some(cursor) = state_cursor { - cursor.as_mut() - } else { - // Create a local cursor if none was provided - local_cursor = BTreeCursor::new_table(None, pager.clone(), root_page, OPERATOR_COLUMNS); - &mut local_cursor - }; - let state = if commit_operators { // Clone the deltas from eval_state - don't extract them // in case we need to re-execute due to I/O @@ -247,12 +348,12 @@ impl DbspNode { EvalState::Init { deltas } => deltas.clone(), _ => panic!("commit can only be called when eval_state is in Init state"), }; - let result = return_if_io!(op.commit(deltas, cursor)); + let result = return_if_io!(op.commit(deltas, cursors)); // After successful commit, move state to Done *eval_state = EvalState::Done; result } else { - return_if_io!(op.eval(eval_state, cursor)) + return_if_io!(op.eval(eval_state, cursors)) }; Ok(IOResult::Done(state)) } @@ -275,14 +376,20 @@ pub struct DbspCircuit { /// Root page for the main materialized view data pub(super) main_data_root: usize, - /// Root page for internal DBSP state + /// Root page for internal DBSP state table pub(super) internal_state_root: usize, + /// Root page for the DBSP state table's primary key index + pub(super) internal_state_index_root: usize, } impl DbspCircuit { /// Create a new empty circuit with initial empty schema /// The actual output 
schema will be set when the root node is established - pub fn new(main_data_root: usize, internal_state_root: usize) -> Self { + pub fn new( + main_data_root: usize, + internal_state_root: usize, + internal_state_index_root: usize, + ) -> Self { // Start with an empty schema - will be updated when root is set let empty_schema = Arc::new(LogicalSchema::new(vec![])); Self { @@ -293,6 +400,7 @@ impl DbspCircuit { commit_state: CommitState::Init, main_data_root, internal_state_root, + internal_state_index_root, } } @@ -326,18 +434,18 @@ impl DbspCircuit { pub fn run_circuit( &mut self, - pager: Rc, execute_state: &mut ExecuteState, + pager: &Rc, + state_cursors: &mut DbspStateCursors, commit_operators: bool, - state_cursor: &mut Box, ) -> Result> { if let Some(root_id) = self.root { self.execute_node( root_id, - pager, + pager.clone(), execute_state, commit_operators, - Some(state_cursor), + state_cursors, ) } else { Err(LimboError::ParseError( @@ -358,7 +466,23 @@ impl DbspCircuit { execute_state: &mut ExecuteState, ) -> Result> { if let Some(root_id) = self.root { - self.execute_node(root_id, pager, execute_state, false, None) + // Create temporary cursors for execute (non-commit) operations + let table_cursor = BTreeCursor::new_table( + None, + pager.clone(), + self.internal_state_root, + OPERATOR_COLUMNS, + ); + let index_def = create_dbsp_state_index(self.internal_state_index_root); + let index_cursor = BTreeCursor::new_index( + None, + pager.clone(), + self.internal_state_index_root, + &index_def, + 3, + ); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); + self.execute_node(root_id, pager, execute_state, false, &mut cursors) } else { Err(LimboError::ParseError( "Circuit has no root node".to_string(), @@ -398,29 +522,42 @@ impl DbspCircuit { let mut state = std::mem::replace(&mut self.commit_state, CommitState::Init); match &mut state { CommitState::Init => { - // Create state cursor when entering CommitOperators state - let state_cursor = 
Box::new(BTreeCursor::new_table( + // Create state cursors when entering CommitOperators state + let state_table_cursor = BTreeCursor::new_table( None, pager.clone(), self.internal_state_root, OPERATOR_COLUMNS, + ); + let index_def = create_dbsp_state_index(self.internal_state_index_root); + let state_index_cursor = BTreeCursor::new_index( + None, + pager.clone(), + self.internal_state_index_root, + &index_def, + 3, // Index on first 3 columns + ); + + let state_cursors = Box::new(DbspStateCursors::new( + state_table_cursor, + state_index_cursor, )); self.commit_state = CommitState::CommitOperators { execute_state: Box::new(ExecuteState::Init { input_data: input_delta_set.clone(), }), - state_cursor, + state_cursors, }; } CommitState::CommitOperators { ref mut execute_state, - ref mut state_cursor, + ref mut state_cursors, } => { let delta = return_and_restore_if_io!( &mut self.commit_state, state, - self.run_circuit(pager.clone(), execute_state, true, state_cursor) + self.run_circuit(execute_state, &pager, state_cursors, true,) ); // Create view cursor when entering UpdateView state @@ -434,7 +571,7 @@ impl DbspCircuit { self.commit_state = CommitState::UpdateView { delta, current_index: 0, - write_row_state: WriteRow::new(), + write_row_state: WriteRowView::new(), view_cursor, }; } @@ -453,7 +590,7 @@ impl DbspCircuit { // If we're starting a new row (GetRecord state), we need a fresh cursor // due to btree cursor state machine limitations - if matches!(write_row_state, WriteRow::GetRecord) { + if matches!(write_row_state, WriteRowView::GetRecord) { *view_cursor = Box::new(BTreeCursor::new_table( None, pager.clone(), @@ -493,7 +630,7 @@ impl DbspCircuit { self.commit_state = CommitState::UpdateView { delta, current_index: *current_index + 1, - write_row_state: WriteRow::new(), + write_row_state: WriteRowView::new(), view_cursor, }; } @@ -509,7 +646,7 @@ impl DbspCircuit { pager: Rc, execute_state: &mut ExecuteState, commit_operators: bool, - state_cursor: 
Option<&mut Box>, + cursors: &mut DbspStateCursors, ) -> Result> { loop { match execute_state { @@ -577,12 +714,30 @@ impl DbspCircuit { // Get the (node_id, state) pair for the current index let (input_node_id, input_state) = &mut input_states[*current_index]; + // Create temporary cursors for the recursive call + let temp_table_cursor = BTreeCursor::new_table( + None, + pager.clone(), + self.internal_state_root, + OPERATOR_COLUMNS, + ); + let index_def = create_dbsp_state_index(self.internal_state_index_root); + let temp_index_cursor = BTreeCursor::new_index( + None, + pager.clone(), + self.internal_state_index_root, + &index_def, + 3, + ); + let mut temp_cursors = + DbspStateCursors::new(temp_table_cursor, temp_index_cursor); + let delta = return_if_io!(self.execute_node( *input_node_id, pager.clone(), input_state, commit_operators, - None // Input nodes don't need state cursor + &mut temp_cursors )); input_deltas.push(delta); *current_index += 1; @@ -595,13 +750,8 @@ impl DbspCircuit { .get_mut(&node_id) .ok_or_else(|| LimboError::ParseError("Node not found".to_string()))?; - let output_delta = return_if_io!(node.process_node( - pager.clone(), - eval_state, - self.internal_state_root, - commit_operators, - state_cursor, - )); + let output_delta = + return_if_io!(node.process_node(eval_state, commit_operators, cursors,)); return Ok(IOResult::Done(output_delta)); } } @@ -660,9 +810,17 @@ pub struct DbspCompiler { impl DbspCompiler { /// Create a new DBSP compiler - pub fn new(main_data_root: usize, internal_state_root: usize) -> Self { + pub fn new( + main_data_root: usize, + internal_state_root: usize, + internal_state_index_root: usize, + ) -> Self { Self { - circuit: DbspCircuit::new(main_data_root, internal_state_root), + circuit: DbspCircuit::new( + main_data_root, + internal_state_root, + internal_state_index_root, + ), } } @@ -1252,7 +1410,7 @@ mod tests { }}; } - fn setup_btree_for_circuit() -> (Rc, usize, usize) { + fn setup_btree_for_circuit() -> (Rc, 
usize, usize, usize) { let io: Arc = Arc::new(MemoryIO::new()); let db = Database::open_file(io.clone(), ":memory:", false, false).unwrap(); let conn = db.connect().unwrap(); @@ -1270,13 +1428,24 @@ mod tests { .block(|| pager.btree_create(&CreateBTreeFlags::new_table())) .unwrap() as usize; - (pager, main_root_page, dbsp_state_page) + let dbsp_state_index_page = pager + .io + .block(|| pager.btree_create(&CreateBTreeFlags::new_index())) + .unwrap() as usize; + + ( + pager, + main_root_page, + dbsp_state_page, + dbsp_state_index_page, + ) } // Macro to compile SQL to DBSP circuit macro_rules! compile_sql { ($sql:expr) => {{ - let (pager, main_root_page, dbsp_state_page) = setup_btree_for_circuit(); + let (pager, main_root_page, dbsp_state_page, dbsp_state_index_page) = + setup_btree_for_circuit(); let schema = test_schema!(); let mut parser = Parser::new($sql.as_bytes()); let cmd = parser @@ -1289,7 +1458,7 @@ mod tests { let mut builder = LogicalPlanBuilder::new(&schema); let logical_plan = builder.build_statement(&stmt).unwrap(); ( - DbspCompiler::new(main_root_page, dbsp_state_page) + DbspCompiler::new(main_root_page, dbsp_state_page, dbsp_state_index_page) .compile(&logical_plan) .unwrap(), pager, @@ -3162,10 +3331,10 @@ mod tests { #[test] fn test_circuit_rowid_update_consolidation() { - let (pager, p1, p2) = setup_btree_for_circuit(); + let (pager, p1, p2, p3) = setup_btree_for_circuit(); // Test that circuit properly consolidates state when rowid changes - let mut circuit = DbspCircuit::new(p1, p2); + let mut circuit = DbspCircuit::new(p1, p2, p3); // Create a simple filter node let schema = Arc::new(LogicalSchema::new(vec![ diff --git a/core/incremental/operator.rs b/core/incremental/operator.rs index 825290bef..1c735df7d 100644 --- a/core/incremental/operator.rs +++ b/core/incremental/operator.rs @@ -6,8 +6,9 @@ use crate::function::{AggFunc, Func}; use crate::incremental::dbsp::{Delta, DeltaPair, HashableRow}; use 
crate::incremental::expr_compiler::CompiledExpression; use crate::incremental::persistence::{ReadRecord, WriteRow}; +use crate::schema::{Index, IndexColumn}; use crate::storage::btree::BTreeCursor; -use crate::types::{IOResult, SeekKey, Text}; +use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult, Text}; use crate::{ return_and_restore_if_io, return_if_io, Connection, Database, Result, SymbolTable, Value, }; @@ -17,6 +18,77 @@ use std::sync::{Arc, Mutex}; use turso_macros::match_ignore_ascii_case; use turso_parser::ast::{As, Expr, Literal, Name, OneSelect, Operator, ResultColumn}; +/// Struct to hold both table and index cursors for DBSP state operations +pub struct DbspStateCursors { + /// Cursor for the DBSP state table + pub table_cursor: BTreeCursor, + /// Cursor for the DBSP state table's primary key index + pub index_cursor: BTreeCursor, +} + +impl DbspStateCursors { + /// Create a new DbspStateCursors with both table and index cursors + pub fn new(table_cursor: BTreeCursor, index_cursor: BTreeCursor) -> Self { + Self { + table_cursor, + index_cursor, + } + } +} + +/// Create an index definition for the DBSP state table +/// This defines the primary key index on (operator_id, zset_id, element_id) +pub fn create_dbsp_state_index(root_page: usize) -> Index { + Index { + name: "dbsp_state_pk".to_string(), + table_name: "dbsp_state".to_string(), + root_page, + columns: vec![ + IndexColumn { + name: "operator_id".to_string(), + order: turso_parser::ast::SortOrder::Asc, + collation: None, + pos_in_table: 0, + default: None, + }, + IndexColumn { + name: "zset_id".to_string(), + order: turso_parser::ast::SortOrder::Asc, + collation: None, + pos_in_table: 1, + default: None, + }, + IndexColumn { + name: "element_id".to_string(), + order: turso_parser::ast::SortOrder::Asc, + collation: None, + pos_in_table: 2, + default: None, + }, + ], + unique: true, + ephemeral: false, + has_rowid: true, + } +} + +/// Storage key types for different operator 
contexts +#[derive(Debug, Clone, Copy)] +pub enum StorageKeyType { + /// For aggregate operators - uses operator_id * 2 + Aggregate { operator_id: usize }, +} + +impl StorageKeyType { + /// Get the unique storage ID using the same formula as before + /// This ensures different operators get unique IDs + pub fn to_storage_id(self) -> u64 { + match self { + StorageKeyType::Aggregate { operator_id } => (operator_id as u64), + } + } +} + type ComputedStates = HashMap, AggregateState)>; // group_key_str -> (group_key, state) #[derive(Debug)] enum AggregateCommitState { @@ -44,12 +116,20 @@ pub enum EvalState { Init { deltas: DeltaPair, }, + FetchKey { + delta: Delta, // Keep original delta for merge operation + current_idx: usize, + groups_to_read: Vec<(String, Vec)>, // Changed to Vec for index-based access + existing_groups: HashMap, + old_values: HashMap>, + }, FetchData { delta: Delta, // Keep original delta for merge operation current_idx: usize, groups_to_read: Vec<(String, Vec)>, // Changed to Vec for index-based access existing_groups: HashMap, old_values: HashMap>, + rowid: Option, // Rowid found by FetchKey (None if not found) read_record_state: Box, }, Done, @@ -101,20 +181,19 @@ impl EvalState { let _ = std::mem::replace( self, - EvalState::FetchData { + EvalState::FetchKey { delta, current_idx: 0, groups_to_read: groups_to_read.into_iter().collect(), // Convert BTreeMap to Vec existing_groups: HashMap::new(), old_values: HashMap::new(), - read_record_state: Box::new(ReadRecord::new()), }, ); } fn process_delta( &mut self, operator: &mut AggregateOperator, - cursor: &mut BTreeCursor, + cursors: &mut DbspStateCursors, ) -> Result> { loop { match self { @@ -124,13 +203,12 @@ impl EvalState { EvalState::Init { .. } => { panic!("State machine not supposed to reach the init state! 
advance() should have been called"); } - EvalState::FetchData { + EvalState::FetchKey { delta, current_idx, groups_to_read, existing_groups, old_values, - read_record_state, } => { if *current_idx >= groups_to_read.len() { // All groups processed, compute final output @@ -140,31 +218,102 @@ impl EvalState { return Ok(IOResult::Done(result)); } else { // Get the current group to read - let (group_key_str, group_key) = &groups_to_read[*current_idx]; + let (group_key_str, _group_key) = &groups_to_read[*current_idx]; - let seek_key = operator.generate_storage_key(group_key_str); - let key = SeekKey::TableRowId(seek_key); + // Build the key for the index: (operator_id, zset_id, element_id) + let storage_key = StorageKeyType::Aggregate { + operator_id: operator.operator_id, + }; + let operator_storage_id = storage_key.to_storage_id() as i64; + let zset_id = operator.generate_group_rowid(group_key_str); + let element_id = 0i64; // Always 0 for aggregators + // Create index key values + let index_key_values = vec![ + Value::Integer(operator_storage_id), + Value::Integer(zset_id), + Value::Integer(element_id), + ]; + + // Create an immutable record for the index key + let index_record = + ImmutableRecord::from_values(&index_key_values, index_key_values.len()); + + // Seek in the index to find if this row exists + let seek_result = return_if_io!(cursors.index_cursor.seek( + SeekKey::IndexKey(&index_record), + SeekOp::GE { eq_only: true } + )); + + let rowid = if matches!(seek_result, SeekResult::Found) { + // Found in index, get the table rowid + // The btree code handles extracting the rowid from the index record for has_rowid indexes + let rowid = return_if_io!(cursors.index_cursor.rowid()); + rowid + } else { + // Not found in index, no existing state + None + }; + + // Always transition to FetchData + let taken_existing = std::mem::take(existing_groups); + let taken_old_values = std::mem::take(old_values); + let next_state = EvalState::FetchData { + delta: 
std::mem::take(delta), + current_idx: *current_idx, + groups_to_read: std::mem::take(groups_to_read), + existing_groups: taken_existing, + old_values: taken_old_values, + rowid, + read_record_state: Box::new(ReadRecord::new()), + }; + *self = next_state; + } + } + EvalState::FetchData { + delta, + current_idx, + groups_to_read, + existing_groups, + old_values, + rowid, + read_record_state, + } => { + // Get the current group to read + let (group_key_str, group_key) = &groups_to_read[*current_idx]; + + // Only try to read if we have a rowid + if let Some(rowid) = rowid { + let key = SeekKey::TableRowId(*rowid); let state = return_if_io!(read_record_state.read_record( key, &operator.aggregates, - cursor + &mut cursors.table_cursor )); - - // Anything that mutates state has to happen after return_if_io! - // Unfortunately there's no good way to enforce that without turning - // this into a hot mess of mem::takes. + // Process the fetched state if let Some(state) = state { let mut old_row = group_key.clone(); old_row.extend(state.to_values(&operator.aggregates)); old_values.insert(group_key_str.clone(), old_row); existing_groups.insert(group_key_str.clone(), state.clone()); } - - // All attributes mutated in place. 
- *current_idx += 1; - *read_record_state = Box::new(ReadRecord::new()); + } else { + // No rowid for this group, skipping read } + // If no rowid, there's no existing state for this group + + // Move to next group + let next_idx = *current_idx + 1; + let taken_existing = std::mem::take(existing_groups); + let taken_old_values = std::mem::take(old_values); + let next_state = EvalState::FetchKey { + delta: std::mem::take(delta), + current_idx: next_idx, + groups_to_read: std::mem::take(groups_to_read), + existing_groups: taken_existing, + old_values: taken_old_values, + }; + *self = next_state; } EvalState::Done => { return Ok(IOResult::Done((Delta::new(), HashMap::new()))); @@ -511,17 +660,25 @@ pub trait IncrementalOperator: Debug { /// /// # Arguments /// * `state` - The evaluation state (may be in progress from a previous I/O operation) - /// * `cursor` - Cursor for reading operator state from storage + /// * `cursors` - Cursors for reading operator state from storage (table and optional index) /// /// # Returns /// The output delta from the evaluation - fn eval(&mut self, state: &mut EvalState, cursor: &mut BTreeCursor) -> Result>; + fn eval( + &mut self, + state: &mut EvalState, + cursors: &mut DbspStateCursors, + ) -> Result>; /// Commit deltas to the operator's internal state and return the output /// This is called when a transaction commits, making changes permanent /// Returns the output delta (what downstream operators should see) - /// The cursor parameter is for operators that need to persist state - fn commit(&mut self, deltas: DeltaPair, cursor: &mut BTreeCursor) -> Result>; + /// The cursors parameter is for operators that need to persist state + fn commit( + &mut self, + deltas: DeltaPair, + cursors: &mut DbspStateCursors, + ) -> Result>; /// Set computation tracker fn set_tracker(&mut self, tracker: Arc>); @@ -548,7 +705,7 @@ impl IncrementalOperator for InputOperator { fn eval( &mut self, state: &mut EvalState, - _cursor: &mut BTreeCursor, + 
_cursors: &mut DbspStateCursors, ) -> Result> { match state { EvalState::Init { deltas } => { @@ -567,7 +724,11 @@ impl IncrementalOperator for InputOperator { } } - fn commit(&mut self, deltas: DeltaPair, _cursor: &mut BTreeCursor) -> Result> { + fn commit( + &mut self, + deltas: DeltaPair, + _cursors: &mut DbspStateCursors, + ) -> Result> { // Input operator only uses left delta, right must be empty assert!( deltas.right.is_empty(), @@ -697,7 +858,7 @@ impl IncrementalOperator for FilterOperator { fn eval( &mut self, state: &mut EvalState, - _cursor: &mut BTreeCursor, + _cursors: &mut DbspStateCursors, ) -> Result> { let delta = match state { EvalState::Init { deltas } => { @@ -733,7 +894,11 @@ impl IncrementalOperator for FilterOperator { Ok(IOResult::Done(output_delta)) } - fn commit(&mut self, deltas: DeltaPair, _cursor: &mut BTreeCursor) -> Result> { + fn commit( + &mut self, + deltas: DeltaPair, + _cursors: &mut DbspStateCursors, + ) -> Result> { // Filter operator only uses left delta, right must be empty assert!( deltas.right.is_empty(), @@ -1106,7 +1271,7 @@ impl IncrementalOperator for ProjectOperator { fn eval( &mut self, state: &mut EvalState, - _cursor: &mut BTreeCursor, + _cursors: &mut DbspStateCursors, ) -> Result> { let delta = match state { EvalState::Init { deltas } => { @@ -1138,7 +1303,11 @@ impl IncrementalOperator for ProjectOperator { Ok(IOResult::Done(output_delta)) } - fn commit(&mut self, deltas: DeltaPair, _cursor: &mut BTreeCursor) -> Result> { + fn commit( + &mut self, + deltas: DeltaPair, + _cursors: &mut DbspStateCursors, + ) -> Result> { // Project operator only uses left delta, right must be empty assert!( deltas.right.is_empty(), @@ -1466,7 +1635,7 @@ impl AggregateOperator { fn eval_internal( &mut self, state: &mut EvalState, - cursor: &mut BTreeCursor, + cursors: &mut DbspStateCursors, ) -> Result> { match state { EvalState::Uninitialized => { @@ -1493,7 +1662,7 @@ impl AggregateOperator { } state.advance(groups_to_read); } - 
EvalState::FetchData { .. } => { + EvalState::FetchKey { .. } | EvalState::FetchData { .. } => { // Already in progress, continue processing on process_delta below. } EvalState::Done => { @@ -1502,7 +1671,7 @@ impl AggregateOperator { } // Process the delta through the state machine - let result = return_if_io!(state.process_delta(self, cursor)); + let result = return_if_io!(state.process_delta(self, cursors)); Ok(IOResult::Done(result)) } @@ -1636,12 +1805,20 @@ impl AggregateOperator { } impl IncrementalOperator for AggregateOperator { - fn eval(&mut self, state: &mut EvalState, cursor: &mut BTreeCursor) -> Result> { - let (delta, _) = return_if_io!(self.eval_internal(state, cursor)); + fn eval( + &mut self, + state: &mut EvalState, + cursors: &mut DbspStateCursors, + ) -> Result> { + let (delta, _) = return_if_io!(self.eval_internal(state, cursors)); Ok(IOResult::Done(delta)) } - fn commit(&mut self, deltas: DeltaPair, cursor: &mut BTreeCursor) -> Result> { + fn commit( + &mut self, + deltas: DeltaPair, + cursors: &mut DbspStateCursors, + ) -> Result> { // Aggregate operator only uses left delta, right must be empty assert!( deltas.right.is_empty(), @@ -1666,7 +1843,7 @@ impl IncrementalOperator for AggregateOperator { let (output_delta, computed_states) = return_and_restore_if_io!( &mut self.commit_state, state, - self.eval_internal(eval_state, cursor) + self.eval_internal(eval_state, cursors) ); self.commit_state = AggregateCommitState::PersistDelta { delta: output_delta, @@ -1690,34 +1867,42 @@ impl IncrementalOperator for AggregateOperator { } else { let (group_key_str, (group_key, agg_state)) = states_vec[*current_idx]; - let seek_key = self.seek_key_from_str(group_key_str); + // Build the key components for the new table structure + let storage_key = StorageKeyType::Aggregate { + operator_id: self.operator_id, + }; + let operator_storage_id = storage_key.to_storage_id() as i64; + let zset_id = self.generate_group_rowid(group_key_str); + let element_id = 
0i64; // Determine weight: -1 to delete (cancels existing weight=1), 1 to insert/update let weight = if agg_state.count == 0 { -1 } else { 1 }; // Serialize the aggregate state with group key (even for deletion, we need a row) let state_blob = agg_state.to_blob(&self.aggregates, group_key); - let blob_row = HashableRow::new(0, vec![Value::Blob(state_blob)]); + let blob_value = Value::Blob(state_blob); - // Build the aggregate storage format: [key, blob, weight] - let seek_key_clone = seek_key.clone(); - let blob_value = blob_row.values[0].clone(); - let build_fn = move |final_weight: isize| -> Vec { - let key_i64 = match seek_key_clone.clone() { - SeekKey::TableRowId(id) => id, - _ => panic!("Expected TableRowId"), - }; - vec![ - Value::Integer(key_i64), - blob_value.clone(), // The blob with serialized state - Value::Integer(final_weight as i64), - ] - }; + // Build the aggregate storage format: [operator_id, zset_id, element_id, value, weight] + let operator_id_val = Value::Integer(operator_storage_id); + let zset_id_val = Value::Integer(zset_id); + let element_id_val = Value::Integer(element_id); + let blob_val = blob_value.clone(); + + // Create index key - the first 3 columns of our primary key + let index_key = vec![ + operator_id_val.clone(), + zset_id_val.clone(), + element_id_val.clone(), + ]; + + // Record values (without weight) + let record_values = + vec![operator_id_val, zset_id_val, element_id_val, blob_val]; return_and_restore_if_io!( &mut self.commit_state, state, - write_row.write_row(cursor, seek_key, build_fn, weight) + write_row.write_row(cursors, index_key, record_values, weight) ); let delta = std::mem::take(delta); @@ -1755,8 +1940,8 @@ mod tests { use crate::{Database, MemoryIO, IO}; use std::sync::{Arc, Mutex}; - /// Create a test pager for operator tests - fn create_test_pager() -> (std::rc::Rc, usize) { + /// Create a test pager for operator tests with both table and index + fn create_test_pager() -> (std::rc::Rc, usize, usize) { let io: 
Arc = Arc::new(MemoryIO::new()); let db = Database::open_file(io.clone(), ":memory:", false, false).unwrap(); let conn = db.connect().unwrap(); @@ -1766,14 +1951,21 @@ mod tests { // Allocate page 1 first (database header) let _ = pager.io.block(|| pager.allocate_page1()); - // Properly create a BTree for aggregate state using the pager API - let root_page_id = pager + // Create a BTree for the table + let table_root_page_id = pager .io .block(|| pager.btree_create(&CreateBTreeFlags::new_table())) - .expect("Failed to create BTree for aggregate state") + .expect("Failed to create BTree for aggregate state table") as usize; - (pager, root_page_id) + // Create a BTree for the index + let index_root_page_id = pager + .io + .block(|| pager.btree_create(&CreateBTreeFlags::new_index())) + .expect("Failed to create BTree for aggregate state index") + as usize; + + (pager, table_root_page_id, index_root_page_id) } /// Read the current state from the BTree (for testing) @@ -1781,23 +1973,23 @@ mod tests { fn get_current_state_from_btree( agg: &AggregateOperator, pager: &std::rc::Rc, - cursor: &mut BTreeCursor, + cursors: &mut DbspStateCursors, ) -> Delta { let mut result = Delta::new(); // Rewind to start of table - pager.io.block(|| cursor.rewind()).unwrap(); + pager.io.block(|| cursors.table_cursor.rewind()).unwrap(); loop { // Check if cursor is empty (no more rows) - if cursor.is_empty() { + if cursors.table_cursor.is_empty() { break; } // Get the record at this position let record = pager .io - .block(|| cursor.record()) + .block(|| cursors.table_cursor.record()) .unwrap() .unwrap() .to_owned(); @@ -1805,18 +1997,21 @@ mod tests { let values_ref = record.get_values(); let values: Vec = values_ref.into_iter().map(|x| x.to_owned()).collect(); - // Check if this record belongs to our operator - if let Some(Value::Integer(key)) = values.first() { - let operator_part = (key >> 32) as usize; + // Parse the 5-column structure: operator_id, zset_id, element_id, value, weight + 
if let Some(Value::Integer(op_id)) = values.first() { + let storage_key = StorageKeyType::Aggregate { + operator_id: agg.operator_id, + }; + let expected_op_id = storage_key.to_storage_id() as i64; // Skip if not our operator - if operator_part != agg.operator_id { - pager.io.block(|| cursor.next()).unwrap(); + if *op_id != expected_op_id { + pager.io.block(|| cursors.table_cursor.next()).unwrap(); continue; } - // Get the blob data - if let Some(Value::Blob(blob)) = values.get(1) { + // Get the blob data from column 3 (value column) + if let Some(Value::Blob(blob)) = values.get(3) { // Deserialize the state if let Some((state, group_key)) = AggregateState::from_blob(blob, &agg.aggregates) @@ -1836,7 +2031,7 @@ mod tests { } } - pager.io.block(|| cursor.next()).unwrap(); + pager.io.block(|| cursors.table_cursor.next()).unwrap(); } result.consolidate(); @@ -1871,8 +2066,14 @@ mod tests { // and an insertion (+1) of the new value. // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); // Create an aggregate operator for SUM(age) with no GROUP BY let mut agg = AggregateOperator::new( @@ -1912,11 +2113,11 @@ mod tests { // Initialize with initial data pager .io - .block(|| agg.commit((&initial_delta).into(), &mut cursor)) + .block(|| agg.commit((&initial_delta).into(), &mut cursors)) .unwrap(); // Verify initial 
state: SUM(age) = 25 + 30 + 35 = 90 - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state.changes.len(), 1, "Should have one aggregate row"); let (row, weight) = &state.changes[0]; assert_eq!(*weight, 1, "Aggregate row should have weight 1"); @@ -1936,7 +2137,7 @@ mod tests { // Process the incremental update let output_delta = pager .io - .block(|| agg.commit((&update_delta).into(), &mut cursor)) + .block(|| agg.commit((&update_delta).into(), &mut cursors)) .unwrap(); // CRITICAL: The output delta should contain TWO changes: @@ -1985,8 +2186,14 @@ mod tests { // Create an aggregate operator for SUM(score) GROUP BY team // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -2033,11 +2240,11 @@ mod tests { // Initialize with initial data pager .io - .block(|| agg.commit((&initial_delta).into(), &mut cursor)) + .block(|| agg.commit((&initial_delta).into(), &mut cursors)) .unwrap(); // Verify initial state: red team = 30, blue team = 15 - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state.changes.len(), 2, "Should have two groups"); // 
Find the red and blue team aggregates @@ -2079,7 +2286,7 @@ mod tests { // Process the incremental update let output_delta = pager .io - .block(|| agg.commit((&update_delta).into(), &mut cursor)) + .block(|| agg.commit((&update_delta).into(), &mut cursors)) .unwrap(); // Should have 2 changes: retraction of old red team sum, insertion of new red team sum @@ -2130,8 +2337,14 @@ mod tests { let tracker = Arc::new(Mutex::new(ComputationTracker::new())); // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); // Create COUNT(*) GROUP BY category let mut agg = AggregateOperator::new( @@ -2161,7 +2374,7 @@ mod tests { } pager .io - .block(|| agg.commit((&initial).into(), &mut cursor)) + .block(|| agg.commit((&initial).into(), &mut cursors)) .unwrap(); // Reset tracker for delta processing @@ -2180,13 +2393,13 @@ mod tests { pager .io - .block(|| agg.commit((&delta).into(), &mut cursor)) + .block(|| agg.commit((&delta).into(), &mut cursors)) .unwrap(); assert_eq!(tracker.lock().unwrap().aggregation_updates, 1); // Check the final state - cat_0 should now have count 11 - let final_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); let cat_0 = final_state .changes .iter() @@ -2205,8 +2418,14 @@ mod tests { // Create SUM(amount) GROUP 
BY product // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -2248,11 +2467,11 @@ mod tests { ); pager .io - .block(|| agg.commit((&initial).into(), &mut cursor)) + .block(|| agg.commit((&initial).into(), &mut cursors)) .unwrap(); // Check initial state: Widget=250, Gadget=200 - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); let widget_sum = state .changes .iter() @@ -2277,13 +2496,13 @@ mod tests { pager .io - .block(|| agg.commit((&delta).into(), &mut cursor)) + .block(|| agg.commit((&delta).into(), &mut cursors)) .unwrap(); assert_eq!(tracker.lock().unwrap().aggregation_updates, 1); // Check final state - Widget should now be 300 (250 + 50) - let final_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); let widget = final_state .changes .iter() @@ -2296,8 +2515,14 @@ mod tests { fn test_count_and_sum_together() { // Test the example from DBSP_ROADMAP: COUNT(*) and SUM(amount) GROUP BY user_id // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), 
root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -2329,13 +2554,13 @@ mod tests { ); pager .io - .block(|| agg.commit((&initial).into(), &mut cursor)) + .block(|| agg.commit((&initial).into(), &mut cursors)) .unwrap(); // Check initial state // User 1: count=2, sum=300 // User 2: count=1, sum=150 - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state.changes.len(), 2); let user1 = state @@ -2364,11 +2589,11 @@ mod tests { ); pager .io - .block(|| agg.commit((&delta).into(), &mut cursor)) + .block(|| agg.commit((&delta).into(), &mut cursors)) .unwrap(); // Check final state - user 1 should have updated count and sum - let final_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); let user1 = final_state .changes .iter() @@ -2382,8 +2607,14 @@ mod tests { fn test_avg_maintains_sum_and_count() { // Test AVG aggregation // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index 
definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -2424,13 +2655,13 @@ mod tests { ); pager .io - .block(|| agg.commit((&initial).into(), &mut cursor)) + .block(|| agg.commit((&initial).into(), &mut cursors)) .unwrap(); // Check initial averages // Category A: avg = (10 + 20) / 2 = 15 // Category B: avg = 30 / 1 = 30 - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); let cat_a = state .changes .iter() @@ -2459,11 +2690,11 @@ mod tests { ); pager .io - .block(|| agg.commit((&delta).into(), &mut cursor)) + .block(|| agg.commit((&delta).into(), &mut cursors)) .unwrap(); // Check final state - Category A avg should now be (10 + 20 + 30) / 3 = 20 - let final_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); let cat_a = final_state .changes .iter() @@ -2476,8 +2707,14 @@ mod tests { fn test_delete_updates_aggregates() { // Test that deletes (negative weights) properly update aggregates // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = 
+ BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -2513,11 +2750,11 @@ mod tests { ); pager .io - .block(|| agg.commit((&initial).into(), &mut cursor)) + .block(|| agg.commit((&initial).into(), &mut cursors)) .unwrap(); // Check initial state: count=2, sum=300 - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert!(!state.changes.is_empty()); let (row, _weight) = &state.changes[0]; assert_eq!(row.values[1], Value::Integer(2)); // count @@ -2536,11 +2773,11 @@ mod tests { pager .io - .block(|| agg.commit((&delta).into(), &mut cursor)) + .block(|| agg.commit((&delta).into(), &mut cursors)) .unwrap(); // Check final state - should update to count=1, sum=200 - let final_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); let cat_a = final_state .changes .iter() @@ -2557,8 +2794,14 @@ mod tests { let input_columns = vec!["category".to_string(), "value".to_string()]; // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // 
operator_id for testing @@ -2574,11 +2817,11 @@ mod tests { init_data.insert(3, vec![Value::Text("B".into()), Value::Integer(30)]); pager .io - .block(|| agg.commit((&init_data).into(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursors)) .unwrap(); // Check initial counts - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state.changes.len(), 2); // Find group A and B @@ -2602,14 +2845,14 @@ mod tests { let output = pager .io - .block(|| agg.commit((&delete_delta).into(), &mut cursor)) + .block(|| agg.commit((&delete_delta).into(), &mut cursors)) .unwrap(); // Should emit retraction for old count and insertion for new count assert_eq!(output.changes.len(), 2); // Check final state - let final_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); let group_a_final = final_state .changes .iter() @@ -2623,13 +2866,13 @@ mod tests { let output_b = pager .io - .block(|| agg.commit((&delete_all_b).into(), &mut cursor)) + .block(|| agg.commit((&delete_all_b).into(), &mut cursors)) .unwrap(); assert_eq!(output_b.changes.len(), 1); // Only retraction, no new row assert_eq!(output_b.changes[0].1, -1); // Retraction // Final state should not have group B - let final_state2 = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state2 = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(final_state2.changes.len(), 1); // Only group A remains assert_eq!(final_state2.changes[0].0.values[0], Value::Text("A".into())); } @@ -2641,8 +2884,14 @@ mod tests { let input_columns = vec!["category".to_string(), "value".to_string()]; // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, 
index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -2659,11 +2908,11 @@ mod tests { init_data.insert(4, vec![Value::Text("B".into()), Value::Integer(15)]); pager .io - .block(|| agg.commit((&init_data).into(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursors)) .unwrap(); // Check initial sums - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); let group_a = state .changes .iter() @@ -2684,11 +2933,11 @@ mod tests { pager .io - .block(|| agg.commit((&delete_delta).into(), &mut cursor)) + .block(|| agg.commit((&delete_delta).into(), &mut cursors)) .unwrap(); // Check updated sum - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); let group_a = state .changes .iter() @@ -2703,11 +2952,11 @@ mod tests { pager .io - .block(|| agg.commit((&delete_all_b).into(), &mut cursor)) + .block(|| agg.commit((&delete_all_b).into(), &mut cursors)) .unwrap(); // Group B should be gone - let final_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(final_state.changes.len(), 1); // Only group A remains assert_eq!(final_state.changes[0].0.values[0], Value::Text("A".into())); } @@ -2719,8 +2968,14 @@ mod tests { let input_columns = vec!["category".to_string(), 
"value".to_string()]; // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -2736,11 +2991,11 @@ mod tests { init_data.insert(3, vec![Value::Text("A".into()), Value::Integer(30)]); pager .io - .block(|| agg.commit((&init_data).into(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursors)) .unwrap(); // Check initial average - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state.changes.len(), 1); assert_eq!(state.changes[0].0.values[1], Value::Float(20.0)); // AVG = (10+20+30)/3 = 20 @@ -2750,11 +3005,11 @@ mod tests { pager .io - .block(|| agg.commit((&delete_delta).into(), &mut cursor)) + .block(|| agg.commit((&delete_delta).into(), &mut cursors)) .unwrap(); // Check updated average - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state.changes[0].0.values[1], Value::Float(20.0)); // AVG = (10+30)/2 = 20 (same!) 
// Delete another to change the average @@ -2763,10 +3018,10 @@ mod tests { pager .io - .block(|| agg.commit((&delete_another).into(), &mut cursor)) + .block(|| agg.commit((&delete_another).into(), &mut cursors)) .unwrap(); - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state.changes[0].0.values[1], Value::Float(10.0)); // AVG = 10/1 = 10 } @@ -2782,8 +3037,14 @@ mod tests { let input_columns = vec!["category".to_string(), "value".to_string()]; // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -2799,11 +3060,11 @@ mod tests { init_data.insert(3, vec![Value::Text("B".into()), Value::Integer(50)]); pager .io - .block(|| agg.commit((&init_data).into(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursors)) .unwrap(); // Check initial state - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); let group_a = state .changes .iter() @@ -2820,11 +3081,11 @@ mod tests { pager .io - .block(|| agg.commit((&delete_delta).into(), &mut cursor)) + .block(|| agg.commit((&delete_delta).into(), &mut cursors)) .unwrap(); // Check all aggregates 
updated correctly - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); let group_a = state .changes .iter() @@ -2841,10 +3102,10 @@ mod tests { pager .io - .block(|| agg.commit((&insert_delta).into(), &mut cursor)) + .block(|| agg.commit((&insert_delta).into(), &mut cursors)) .unwrap(); - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); let group_a = state .changes .iter() @@ -2862,8 +3123,14 @@ mod tests { // the operator should properly consolidate the state // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut filter = FilterOperator::new( FilterPredicate::GreaterThan { @@ -2878,7 +3145,7 @@ mod tests { init_data.insert(3, vec![Value::Integer(3), Value::Integer(3)]); let state = pager .io - .block(|| filter.commit((&init_data).into(), &mut cursor)) + .block(|| filter.commit((&init_data).into(), &mut cursors)) .unwrap(); // Check initial state @@ -2897,7 +3164,7 @@ mod tests { let output = pager .io - .block(|| filter.commit((&update_delta).into(), &mut cursor)) + .block(|| filter.commit((&update_delta).into(), &mut cursors)) .unwrap(); // The output delta should have both changes (both pass the filter b > 2) @@ -2918,8 
+3185,14 @@ mod tests { #[test] fn test_filter_eval_with_uncommitted() { // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut filter = FilterOperator::new( FilterPredicate::GreaterThan { @@ -2949,7 +3222,7 @@ mod tests { ); let state = pager .io - .block(|| filter.commit((&init_data).into(), &mut cursor)) + .block(|| filter.commit((&init_data).into(), &mut cursors)) .unwrap(); // Verify initial state (only Alice passes filter) @@ -2979,7 +3252,7 @@ mod tests { let mut eval_state = uncommitted.clone().into(); let result = pager .io - .block(|| filter.eval(&mut eval_state, &mut cursor)) + .block(|| filter.eval(&mut eval_state, &mut cursors)) .unwrap(); assert_eq!( result.changes.len(), @@ -2991,7 +3264,7 @@ mod tests { // Now commit the changes let state = pager .io - .block(|| filter.commit((&uncommitted).into(), &mut cursor)) + .block(|| filter.commit((&uncommitted).into(), &mut cursors)) .unwrap(); // State should now include Charlie (who passes filter) @@ -3006,8 +3279,14 @@ mod tests { fn test_aggregate_eval_with_uncommitted_preserves_state() { // This is the critical test - aggregations must not modify internal state during eval // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); 
+ let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -3051,11 +3330,11 @@ mod tests { ); pager .io - .block(|| agg.commit((&init_data).into(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursors)) .unwrap(); // Check initial state: A -> (count=2, sum=300), B -> (count=1, sum=150) - let initial_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let initial_state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(initial_state.changes.len(), 2); // Store initial state for comparison @@ -3090,7 +3369,7 @@ mod tests { let mut eval_state = uncommitted.clone().into(); let result = pager .io - .block(|| agg.eval(&mut eval_state, &mut cursor)) + .block(|| agg.eval(&mut eval_state, &mut cursors)) .unwrap(); // Result should contain updates for A and new group C @@ -3099,7 +3378,7 @@ mod tests { assert!(!result.changes.is_empty(), "Should have aggregate changes"); // CRITICAL: Verify internal state hasn't changed - let state_after_eval = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state_after_eval = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!( state_after_eval.changes.len(), 2, @@ -3125,11 +3404,11 @@ mod tests { // Now commit the changes pager .io - .block(|| agg.commit((&uncommitted).into(), &mut cursor)) + .block(|| agg.commit((&uncommitted).into(), &mut cursors)) .unwrap(); // State should now be updated - let final_state 
= get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(final_state.changes.len(), 3, "Should now have A, B, and C"); let a_final = final_state @@ -3170,8 +3449,14 @@ mod tests { // Test that calling eval multiple times with different uncommitted data // doesn't pollute the internal state // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -3189,11 +3474,11 @@ mod tests { init_data.insert(2, vec![Value::Integer(2), Value::Integer(200)]); pager .io - .block(|| agg.commit((&init_data).into(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursors)) .unwrap(); // Initial state: count=2, sum=300 - let initial_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let initial_state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(initial_state.changes.len(), 1); assert_eq!(initial_state.changes[0].0.values[0], Value::Integer(2)); assert_eq!(initial_state.changes[0].0.values[1], Value::Float(300.0)); @@ -3204,11 +3489,11 @@ mod tests { let mut eval_state1 = uncommitted1.clone().into(); let _ = pager .io - .block(|| agg.eval(&mut eval_state1, &mut cursor)) + .block(|| agg.eval(&mut eval_state1, &mut cursors)) 
.unwrap(); // State should be unchanged - let state1 = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state1 = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state1.changes[0].0.values[0], Value::Integer(2)); assert_eq!(state1.changes[0].0.values[1], Value::Float(300.0)); @@ -3219,11 +3504,11 @@ mod tests { let mut eval_state2 = uncommitted2.clone().into(); let _ = pager .io - .block(|| agg.eval(&mut eval_state2, &mut cursor)) + .block(|| agg.eval(&mut eval_state2, &mut cursors)) .unwrap(); // State should STILL be unchanged - let state2 = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state2 = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state2.changes[0].0.values[0], Value::Integer(2)); assert_eq!(state2.changes[0].0.values[1], Value::Float(300.0)); @@ -3233,11 +3518,11 @@ mod tests { let mut eval_state3 = uncommitted3.clone().into(); let _ = pager .io - .block(|| agg.eval(&mut eval_state3, &mut cursor)) + .block(|| agg.eval(&mut eval_state3, &mut cursors)) .unwrap(); // State should STILL be unchanged - let state3 = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state3 = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state3.changes[0].0.values[0], Value::Integer(2)); assert_eq!(state3.changes[0].0.values[1], Value::Float(300.0)); } @@ -3246,8 +3531,14 @@ mod tests { fn test_aggregate_eval_with_mixed_committed_and_uncommitted() { // Test eval with both committed delta and uncommitted changes // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = 
create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -3262,7 +3553,7 @@ mod tests { init_data.insert(2, vec![Value::Integer(2), Value::Text("Y".into())]); pager .io - .block(|| agg.commit((&init_data).into(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursors)) .unwrap(); // Create a committed delta (to be processed) @@ -3280,7 +3571,7 @@ mod tests { let mut eval_state = combined.clone().into(); let result = pager .io - .block(|| agg.eval(&mut eval_state, &mut cursor)) + .block(|| agg.eval(&mut eval_state, &mut cursors)) .unwrap(); // Result should reflect changes from both @@ -3334,17 +3625,17 @@ mod tests { assert_eq!(sorted_changes[4].1, 1); // insertion only (no retraction as it's new); // But internal state should be unchanged - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state.changes.len(), 2, "Should still have only X and Y"); // Now commit only the committed_delta pager .io - .block(|| agg.commit((&committed_delta).into(), &mut cursor)) + .block(|| agg.commit((&committed_delta).into(), &mut cursors)) .unwrap(); // State should now have X count=2, Y count=1 - let final_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); let x = final_state .changes .iter() diff --git a/core/incremental/persistence.rs b/core/incremental/persistence.rs index 381b406aa..0d3425404 100644 --- a/core/incremental/persistence.rs +++ b/core/incremental/persistence.rs @@ -1,7 +1,7 @@ -use crate::incremental::operator::{AggregateFunction, AggregateState}; +use 
crate::incremental::operator::{AggregateFunction, AggregateState, DbspStateCursors}; use crate::storage::btree::{BTreeCursor, BTreeKey}; -use crate::types::{IOResult, SeekKey, SeekOp, SeekResult}; -use crate::{return_if_io, Result, Value}; +use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult}; +use crate::{return_if_io, LimboError, Result, Value}; #[derive(Debug, Default)] pub enum ReadRecord { @@ -32,21 +32,22 @@ impl ReadRecord { } else { let record = return_if_io!(cursor.record()); let r = record.ok_or_else(|| { - crate::LimboError::InternalError(format!( + LimboError::InternalError(format!( "Found key {key:?} in aggregate storage but could not read record" )) })?; let values = r.get_values(); - let blob = values[1].to_owned(); + // The blob is in column 3: operator_id, zset_id, element_id, value, weight + let blob = values[3].to_owned(); let (state, _group_key) = match blob { Value::Blob(blob) => AggregateState::from_blob(&blob, aggregates) .ok_or_else(|| { - crate::LimboError::InternalError(format!( + LimboError::InternalError(format!( "Cannot deserialize aggregate state {blob:?}", )) }), - _ => Err(crate::LimboError::ParseError( + _ => Err(LimboError::ParseError( "Value in aggregator not blob".to_string(), )), }?; @@ -63,8 +64,22 @@ impl ReadRecord { pub enum WriteRow { #[default] GetRecord, - Delete, - Insert { + Delete { + rowid: i64, + }, + DeleteIndex, + ComputeNewRowId { + final_weight: isize, + }, + InsertNew { + rowid: i64, + final_weight: isize, + }, + InsertIndex { + rowid: i64, + }, + UpdateExisting { + rowid: i64, final_weight: isize, }, Done, @@ -75,97 +90,192 @@ impl WriteRow { Self::default() } - /// Write a row with weight management. + /// Write a row with weight management using index for lookups. /// /// # Arguments - /// * `cursor` - BTree cursor for the storage - /// * `key` - The key to seek (TableRowId) - /// * `build_record` - Function that builds the record values to insert. 
- /// Takes the final_weight and returns the complete record values. + /// * `cursors` - DBSP state cursors (table and index) + /// * `index_key` - The key to seek in the index + /// * `record_values` - The record values (without weight) to insert /// * `weight` - The weight delta to apply - pub fn write_row( + pub fn write_row( &mut self, - cursor: &mut BTreeCursor, - key: SeekKey, - build_record: F, + cursors: &mut DbspStateCursors, + index_key: Vec, + record_values: Vec, weight: isize, - ) -> Result> - where - F: Fn(isize) -> Vec, - { + ) -> Result> { loop { match self { WriteRow::GetRecord => { - let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); + // First, seek in the index to find if the row exists + let index_values = index_key.clone(); + let index_record = + ImmutableRecord::from_values(&index_values, index_values.len()); + + let res = return_if_io!(cursors.index_cursor.seek( + SeekKey::IndexKey(&index_record), + SeekOp::GE { eq_only: true } + )); + if !matches!(res, SeekResult::Found) { - *self = WriteRow::Insert { + // Row doesn't exist, we'll insert a new one + *self = WriteRow::ComputeNewRowId { final_weight: weight, }; } else { - let existing_record = return_if_io!(cursor.record()); + // Found in index, get the rowid it points to + let rowid = return_if_io!(cursors.index_cursor.rowid()); + let rowid = rowid.ok_or_else(|| { + LimboError::InternalError( + "Index cursor does not have a valid rowid".to_string(), + ) + })?; + + // Now seek in the table using the rowid + let table_res = return_if_io!(cursors + .table_cursor + .seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true })); + + if !matches!(table_res, SeekResult::Found) { + return Err(LimboError::InternalError( + "Index points to non-existent table row".to_string(), + )); + } + + let existing_record = return_if_io!(cursors.table_cursor.record()); let r = existing_record.ok_or_else(|| { - crate::LimboError::InternalError(format!( - "Found key {key:?} in storage 
but could not read record" - )) + LimboError::InternalError( + "Found rowid in table but could not read record".to_string(), + ) })?; let values = r.get_values(); - // Weight is always the last value - let existing_weight = match values.last() { + // Weight is always the last value (column 4 in our 5-column structure) + let existing_weight = match values.get(4) { Some(val) => match val.to_owned() { Value::Integer(w) => w as isize, _ => { - return Err(crate::LimboError::InternalError(format!( - "Invalid weight value in storage for key {key:?}" - ))) + return Err(LimboError::InternalError( + "Invalid weight value in storage".to_string(), + )) } }, None => { - return Err(crate::LimboError::InternalError(format!( - "No weight value found in storage for key {key:?}" - ))) + return Err(LimboError::InternalError( + "No weight value found in storage".to_string(), + )) } }; let final_weight = existing_weight + weight; if final_weight <= 0 { - *self = WriteRow::Delete + *self = WriteRow::Delete { rowid } } else { - *self = WriteRow::Insert { final_weight } + // Store the rowid for update + *self = WriteRow::UpdateExisting { + rowid, + final_weight, + } } } } - WriteRow::Delete => { + WriteRow::Delete { rowid } => { + // Seek to the row and delete it + return_if_io!(cursors + .table_cursor + .seek(SeekKey::TableRowId(*rowid), SeekOp::GE { eq_only: true })); + + // Transition to DeleteIndex to also delete the index entry + *self = WriteRow::DeleteIndex; + return_if_io!(cursors.table_cursor.delete()); + } + WriteRow::DeleteIndex => { // Mark as Done before delete to avoid retry on I/O *self = WriteRow::Done; - return_if_io!(cursor.delete()); + return_if_io!(cursors.index_cursor.delete()); } - WriteRow::Insert { final_weight } => { - return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); - - // Extract the row ID from the key - let key_i64 = match key { - SeekKey::TableRowId(id) => id, - _ => { - return Err(crate::LimboError::InternalError( - "Expected TableRowId 
for storage".to_string(), - )) + WriteRow::ComputeNewRowId { final_weight } => { + // Find the last rowid to compute the next one + return_if_io!(cursors.table_cursor.last()); + let rowid = if cursors.table_cursor.is_empty() { + 1 + } else { + match return_if_io!(cursors.table_cursor.rowid()) { + Some(id) => id + 1, + None => { + return Err(LimboError::InternalError( + "Table cursor has rows but no valid rowid".to_string(), + )) + } } }; - // Build the record values using the provided function - let record_values = build_record(*final_weight); + // Transition to InsertNew with the computed rowid + *self = WriteRow::InsertNew { + rowid, + final_weight: *final_weight, + }; + } + WriteRow::InsertNew { + rowid, + final_weight, + } => { + let rowid_val = *rowid; + let final_weight_val = *final_weight; + + // Seek to where we want to insert + // The insert will position the cursor correctly + return_if_io!(cursors.table_cursor.seek( + SeekKey::TableRowId(rowid_val), + SeekOp::GE { eq_only: false } + )); + + // Build the complete record with weight + // Use the function parameter record_values directly + let mut complete_record = record_values.clone(); + complete_record.push(Value::Integer(final_weight_val as i64)); // Create an ImmutableRecord from the values - let immutable_record = crate::types::ImmutableRecord::from_values( - &record_values, - record_values.len(), - ); - let btree_key = BTreeKey::new_table_rowid(key_i64, Some(&immutable_record)); + let immutable_record = + ImmutableRecord::from_values(&complete_record, complete_record.len()); + let btree_key = BTreeKey::new_table_rowid(rowid_val, Some(&immutable_record)); + + // Transition to InsertIndex state after table insertion + *self = WriteRow::InsertIndex { rowid: rowid_val }; + return_if_io!(cursors.table_cursor.insert(&btree_key)); + } + WriteRow::InsertIndex { rowid } => { + // For has_rowid indexes, we need to append the rowid to the index key + // Use the function parameter index_key directly + let mut 
index_values = index_key.clone(); + index_values.push(Value::Integer(*rowid)); + + // Create the index record with the rowid appended + let index_record = + ImmutableRecord::from_values(&index_values, index_values.len()); + let index_btree_key = BTreeKey::new_index_key(&index_record); + + // Mark as Done before index insert to avoid retry on I/O + *self = WriteRow::Done; + return_if_io!(cursors.index_cursor.insert(&index_btree_key)); + } + WriteRow::UpdateExisting { + rowid, + final_weight, + } => { + // Build the complete record with weight + let mut complete_record = record_values.clone(); + complete_record.push(Value::Integer(*final_weight as i64)); + + // Create an ImmutableRecord from the values + let immutable_record = + ImmutableRecord::from_values(&complete_record, complete_record.len()); + let btree_key = BTreeKey::new_table_rowid(*rowid, Some(&immutable_record)); // Mark as Done before insert to avoid retry on I/O *self = WriteRow::Done; - return_if_io!(cursor.insert(&btree_key)); + // BTree insert with existing key will replace the old value + return_if_io!(cursors.table_cursor.insert(&btree_key)); } WriteRow::Done => { return Ok(IOResult::Done(())); diff --git a/core/incremental/view.rs b/core/incremental/view.rs index bdcabd7c9..591e95e38 100644 --- a/core/incremental/view.rs +++ b/core/incremental/view.rs @@ -206,6 +206,7 @@ impl IncrementalView { schema: &Schema, main_data_root: usize, internal_state_root: usize, + internal_state_index_root: usize, ) -> Result { // Build the logical plan from the SELECT statement let mut builder = LogicalPlanBuilder::new(schema); @@ -214,7 +215,11 @@ impl IncrementalView { let logical_plan = builder.build_statement(&stmt)?; // Compile the logical plan to a DBSP circuit with the storage roots - let compiler = DbspCompiler::new(main_data_root, internal_state_root); + let compiler = DbspCompiler::new( + main_data_root, + internal_state_root, + internal_state_index_root, + ); let circuit = 
compiler.compile(&logical_plan)?; Ok(circuit) @@ -271,6 +276,7 @@ impl IncrementalView { schema: &Schema, main_data_root: usize, internal_state_root: usize, + internal_state_index_root: usize, ) -> Result { let mut parser = Parser::new(sql.as_bytes()); let cmd = parser.next_cmd()?; @@ -287,6 +293,7 @@ impl IncrementalView { schema, main_data_root, internal_state_root, + internal_state_index_root, ), _ => Err(LimboError::ParseError(format!( "View is not a CREATE MATERIALIZED VIEW statement: {sql}" @@ -300,6 +307,7 @@ impl IncrementalView { schema: &Schema, main_data_root: usize, internal_state_root: usize, + internal_state_index_root: usize, ) -> Result { let name = view_name.name.as_str().to_string(); @@ -327,6 +335,7 @@ impl IncrementalView { schema, main_data_root, internal_state_root, + internal_state_index_root, ) } @@ -340,13 +349,19 @@ impl IncrementalView { schema: &Schema, main_data_root: usize, internal_state_root: usize, + internal_state_index_root: usize, ) -> Result { // Create the tracker that will be shared by all operators let tracker = Arc::new(Mutex::new(ComputationTracker::new())); // Compile the SELECT statement into a DBSP circuit - let circuit = - Self::try_compile_circuit(&select_stmt, schema, main_data_root, internal_state_root)?; + let circuit = Self::try_compile_circuit( + &select_stmt, + schema, + main_data_root, + internal_state_root, + internal_state_index_root, + )?; Ok(Self { name, diff --git a/core/schema.rs b/core/schema.rs index b849586aa..cb2817d77 100644 --- a/core/schema.rs +++ b/core/schema.rs @@ -306,6 +306,8 @@ impl Schema { // Store DBSP state table root pages: view_name -> dbsp_state_root_page let mut dbsp_state_roots: HashMap = HashMap::new(); + // Store DBSP state table index root pages: view_name -> dbsp_state_index_root_page + let mut dbsp_state_index_roots: HashMap = HashMap::new(); // Store materialized view info (SQL and root page) for later creation let mut materialized_view_info: HashMap = HashMap::new(); @@ -357,6 
+359,7 @@ impl Schema { &mut from_sql_indexes, &mut automatic_indices, &mut dbsp_state_roots, + &mut dbsp_state_index_roots, &mut materialized_view_info, )?; drop(record_cursor); @@ -369,7 +372,11 @@ impl Schema { self.populate_indices(from_sql_indexes, automatic_indices)?; - self.populate_materialized_views(materialized_view_info, dbsp_state_roots)?; + self.populate_materialized_views( + materialized_view_info, + dbsp_state_roots, + dbsp_state_index_roots, + )?; Ok(()) } @@ -492,6 +499,7 @@ impl Schema { &mut self, materialized_view_info: std::collections::HashMap, dbsp_state_roots: std::collections::HashMap, + dbsp_state_index_roots: std::collections::HashMap, ) -> Result<()> { for (view_name, (sql, main_root)) in materialized_view_info { // Look up the DBSP state root for this view - must exist for materialized views @@ -501,9 +509,17 @@ impl Schema { )) })?; - // Create the IncrementalView with both root pages - let incremental_view = - IncrementalView::from_sql(&sql, self, main_root, *dbsp_state_root)?; + // Look up the DBSP state index root (may not exist for older schemas) + let dbsp_state_index_root = + dbsp_state_index_roots.get(&view_name).copied().unwrap_or(0); + // Create the IncrementalView with all root pages + let incremental_view = IncrementalView::from_sql( + &sql, + self, + main_root, + *dbsp_state_root, + dbsp_state_index_root, + )?; let referenced_tables = incremental_view.get_referenced_table_names(); // Create a BTreeTable for the materialized view @@ -539,6 +555,7 @@ impl Schema { from_sql_indexes: &mut Vec, automatic_indices: &mut std::collections::HashMap>, dbsp_state_roots: &mut std::collections::HashMap, + dbsp_state_index_roots: &mut std::collections::HashMap, materialized_view_info: &mut std::collections::HashMap, ) -> Result<()> { match ty { @@ -593,12 +610,23 @@ impl Schema { // index|sqlite_autoindex_foo_1|foo|3| let index_name = name.to_string(); let table_name = table_name.to_string(); - match automatic_indices.entry(table_name) { 
- std::collections::hash_map::Entry::Vacant(e) => { - e.insert(vec![(index_name, root_page as usize)]); - } - std::collections::hash_map::Entry::Occupied(mut e) => { - e.get_mut().push((index_name, root_page as usize)); + + // Check if this is an index for a DBSP state table + if table_name.starts_with(DBSP_TABLE_PREFIX) { + // Extract the view name from __turso_internal_dbsp_state_ + let view_name = table_name + .strip_prefix(DBSP_TABLE_PREFIX) + .unwrap() + .to_string(); + dbsp_state_index_roots.insert(view_name, root_page as usize); + } else { + match automatic_indices.entry(table_name) { + std::collections::hash_map::Entry::Vacant(e) => { + e.insert(vec![(index_name, root_page as usize)]); + } + std::collections::hash_map::Entry::Occupied(mut e) => { + e.get_mut().push((index_name, root_page as usize)); + } } } } diff --git a/core/translate/view.rs b/core/translate/view.rs index afcef3331..f89f29817 100644 --- a/core/translate/view.rs +++ b/core/translate/view.rs @@ -2,7 +2,7 @@ use crate::schema::{Schema, DBSP_TABLE_PREFIX}; use crate::storage::pager::CreateBTreeFlags; use crate::translate::emitter::Resolver; use crate::translate::schema::{emit_schema_entry, SchemaEntryType, SQLITE_TABLEID}; -use crate::util::normalize_ident; +use crate::util::{normalize_ident, PRIMARY_KEY_AUTOMATIC_INDEX_NAME_PREFIX}; use crate::vdbe::builder::{CursorType, ProgramBuilder}; use crate::vdbe::insn::{CmpInsFlags, Cookie, Insn, RegisterOrLiteral}; use crate::{Connection, Result, SymbolTable}; @@ -141,7 +141,20 @@ pub fn translate_create_materialized_view( // Add the DBSP state table to sqlite_master (required for materialized views) let dbsp_table_name = format!("{DBSP_TABLE_PREFIX}{normalized_view_name}"); - let dbsp_sql = format!("CREATE TABLE {dbsp_table_name} (key INTEGER PRIMARY KEY, state BLOB)"); + // The element_id column uses SQLite's dynamic typing system to store different value types: + // - For hash-based operators (joins, filters): stores INTEGER hash values or 
rowids + // - For future MIN/MAX operators: stores the actual values being compared (INTEGER, REAL, TEXT, BLOB) + // SQLite's type affinity and sorting rules ensure correct ordering within each operator's data + let dbsp_sql = format!( + "CREATE TABLE {dbsp_table_name} (\ + operator_id INTEGER NOT NULL, \ + zset_id INTEGER NOT NULL, \ + element_id NOT NULL, \ + value BLOB, \ + weight INTEGER NOT NULL, \ + PRIMARY KEY (operator_id, zset_id, element_id)\ + )" + ); emit_schema_entry( &mut program, @@ -155,11 +168,37 @@ pub fn translate_create_materialized_view( Some(dbsp_sql), )?; + // Create automatic primary key index for the DBSP table + // Since the table has PRIMARY KEY (operator_id, zset_id, element_id), we need an index + let dbsp_index_root_reg = program.alloc_register(); + program.emit_insn(Insn::CreateBtree { + db: 0, + root: dbsp_index_root_reg, + flags: CreateBTreeFlags::new_index(), + }); + + // Register the index in sqlite_schema + let dbsp_index_name = format!( + "{}{}_1", + PRIMARY_KEY_AUTOMATIC_INDEX_NAME_PREFIX, &dbsp_table_name + ); + emit_schema_entry( + &mut program, + &resolver, + sqlite_schema_cursor_id, + None, // cdc_table_cursor_id + SchemaEntryType::Index, + &dbsp_index_name, + &dbsp_table_name, + dbsp_index_root_reg, + None, // Automatic indexes don't store SQL + )?; + // Parse schema to load the new view and DBSP state table program.emit_insn(Insn::ParseSchema { db: sqlite_schema_cursor_id, where_clause: Some(format!( - "name = '{normalized_view_name}' OR name = '{dbsp_table_name}'" + "name = '{normalized_view_name}' OR name = '{dbsp_table_name}' OR name = '{dbsp_index_name}'" )), }); diff --git a/core/util.rs b/core/util.rs index 5ec33a7e2..1c4217a66 100644 --- a/core/util.rs +++ b/core/util.rs @@ -163,6 +163,9 @@ pub fn parse_schema_rows( // Store DBSP state table root pages: view_name -> dbsp_state_root_page let mut dbsp_state_roots: std::collections::HashMap = std::collections::HashMap::new(); + // Store DBSP state table index root 
pages: view_name -> dbsp_state_index_root_page + let mut dbsp_state_index_roots: std::collections::HashMap = + std::collections::HashMap::new(); // Store materialized view info (SQL and root page) for later creation let mut materialized_view_info: std::collections::HashMap = std::collections::HashMap::new(); @@ -185,8 +188,9 @@ pub fn parse_schema_rows( &mut from_sql_indexes, &mut automatic_indices, &mut dbsp_state_roots, + &mut dbsp_state_index_roots, &mut materialized_view_info, - )?; + )? } StepResult::IO => { // TODO: How do we ensure that the I/O we submitted to @@ -200,7 +204,11 @@ pub fn parse_schema_rows( } schema.populate_indices(from_sql_indexes, automatic_indices)?; - schema.populate_materialized_views(materialized_view_info, dbsp_state_roots)?; + schema.populate_materialized_views( + materialized_view_info, + dbsp_state_roots, + dbsp_state_index_roots, + )?; Ok(()) }