diff --git a/core/incremental/compiler.rs b/core/incremental/compiler.rs index 9dc85ad79..b15dc547c 100644 --- a/core/incremental/compiler.rs +++ b/core/incremental/compiler.rs @@ -8,15 +8,15 @@ use crate::incremental::dbsp::{Delta, DeltaPair}; use crate::incremental::expr_compiler::CompiledExpression; use crate::incremental::operator::{ - EvalState, FilterOperator, FilterPredicate, IncrementalOperator, InputOperator, ProjectOperator, + create_dbsp_state_index, DbspStateCursors, EvalState, FilterOperator, FilterPredicate, + IncrementalOperator, InputOperator, ProjectOperator, }; -use crate::incremental::persistence::WriteRow; -use crate::storage::btree::BTreeCursor; +use crate::storage::btree::{BTreeCursor, BTreeKey}; // Note: logical module must be made pub(crate) in translate/mod.rs use crate::translate::logical::{ BinaryOperator, LogicalExpr, LogicalPlan, LogicalSchema, SchemaRef, }; -use crate::types::{IOResult, SeekKey, Value}; +use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult, Value}; use crate::Pager; use crate::{return_and_restore_if_io, return_if_io, LimboError, Result}; use std::collections::HashMap; @@ -24,8 +24,120 @@ use std::fmt::{self, Display, Formatter}; use std::rc::Rc; use std::sync::Arc; -// The state table is always a key-value store with 3 columns: key, state, and weight. -const OPERATOR_COLUMNS: usize = 3; +// The state table has 5 columns: operator_id, zset_id, element_id, value, weight +const OPERATOR_COLUMNS: usize = 5; + +/// State machine for writing rows to simple materialized views (table-only, no index) +#[derive(Debug, Default)] +pub enum WriteRowView { + #[default] + GetRecord, + Delete, + Insert { + final_weight: isize, + }, + Done, +} + +impl WriteRowView { + pub fn new() -> Self { + Self::default() + } + + /// Write a row with weight management for table-only storage. + /// + /// # Arguments + /// * `cursor` - BTree cursor for the storage + /// * `key` - The key to seek (TableRowId) + /// * `build_record` - Function that builds the record values to insert. + /// Takes the final_weight and returns the complete record values. + /// * `weight` - The weight delta to apply + pub fn write_row( + &mut self, + cursor: &mut BTreeCursor, + key: SeekKey, + build_record: impl Fn(isize) -> Vec, + weight: isize, + ) -> Result> { + loop { + match self { + WriteRowView::GetRecord => { + let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); + if !matches!(res, SeekResult::Found) { + *self = WriteRowView::Insert { + final_weight: weight, + }; + } else { + let existing_record = return_if_io!(cursor.record()); + let r = existing_record.ok_or_else(|| { + LimboError::InternalError(format!( + "Found key {key:?} in storage but could not read record" + )) + })?; + let values = r.get_values(); + + // Weight is always the last value + let existing_weight = match values.last() { + Some(val) => match val.to_owned() { + Value::Integer(w) => w as isize, + _ => { + return Err(LimboError::InternalError(format!( + "Invalid weight value in storage for key {key:?}" + ))) + } + }, + None => { + return Err(LimboError::InternalError(format!( + "No weight value found in storage for key {key:?}" + ))) + } + }; + + let final_weight = existing_weight + weight; + if final_weight <= 0 { + *self = WriteRowView::Delete + } else { + *self = WriteRowView::Insert { final_weight } + } + } + } + WriteRowView::Delete => { + // Mark as Done before delete to avoid retry on I/O + *self = WriteRowView::Done; + return_if_io!(cursor.delete()); + } + WriteRowView::Insert { final_weight } => { + return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); + + // Extract the row ID from the key + let key_i64 = match key { + SeekKey::TableRowId(id) => id, + _ => { + return Err(LimboError::InternalError( + "Expected TableRowId for storage".to_string(), + )) + } + }; + + // Build the record values using the provided function + let record_values = build_record(*final_weight); + + // Create an ImmutableRecord from the values + let immutable_record = + ImmutableRecord::from_values(&record_values, record_values.len()); + let btree_key = BTreeKey::new_table_rowid(key_i64, Some(&immutable_record)); + + // Mark as Done before insert to avoid retry on I/O + *self = WriteRowView::Done; + return_if_io!(cursor.insert(&btree_key)); + } + WriteRowView::Done => { + return Ok(IOResult::Done(())); + } + } + } + } +} /// State machine for commit operations pub enum CommitState { @@ -36,8 +148,8 @@ pub enum CommitState { CommitOperators { /// Execute state for running the circuit execute_state: Box, - /// Persistent cursor for operator state btree (internal_state_root) - state_cursor: Box, + /// Persistent cursors for operator state (table and index) + state_cursors: Box, }, /// Updating the materialized view with the delta @@ -47,7 +159,7 @@ pub enum CommitState { /// Current index in delta.changes being processed current_index: usize, /// State for writing individual rows - write_row_state: WriteRow, + write_row_state: WriteRowView, /// Cursor for view data btree - created fresh for each row view_cursor: Box, }, @@ -60,7 +172,8 @@ impl std::fmt::Debug for CommitState { Self::CommitOperators { execute_state, .. } => f .debug_struct("CommitOperators") .field("execute_state", execute_state) - .field("has_state_cursor", &true) + .field("has_state_table_cursor", &true) + .field("has_state_index_cursor", &true) .finish(), Self::UpdateView { delta, @@ -221,25 +334,13 @@ impl std::fmt::Debug for DbspNode { impl DbspNode { fn process_node( &mut self, - pager: Rc, eval_state: &mut EvalState, - root_page: usize, commit_operators: bool, - state_cursor: Option<&mut Box>, + cursors: &mut DbspStateCursors, ) -> Result> { // Process delta using the executable operator let op = &mut self.executable; - // Use provided cursor or create a local one - let mut local_cursor; - let cursor = if let Some(cursor) = state_cursor { - cursor.as_mut() - } else { - // Create a local cursor if none was provided - local_cursor = BTreeCursor::new_table(None, pager.clone(), root_page, OPERATOR_COLUMNS); - &mut local_cursor - }; - let state = if commit_operators { // Clone the deltas from eval_state - don't extract them // in case we need to re-execute due to I/O @@ -247,12 +348,12 @@ impl DbspNode { EvalState::Init { deltas } => deltas.clone(), _ => panic!("commit can only be called when eval_state is in Init state"), }; - let result = return_if_io!(op.commit(deltas, cursor)); + let result = return_if_io!(op.commit(deltas, cursors)); // After successful commit, move state to Done *eval_state = EvalState::Done; result } else { - return_if_io!(op.eval(eval_state, cursor)) + return_if_io!(op.eval(eval_state, cursors)) }; Ok(IOResult::Done(state)) } @@ -275,14 +376,20 @@ pub struct DbspCircuit { /// Root page for the main materialized view data pub(super) main_data_root: usize, - /// Root page for internal DBSP state + /// Root page for internal DBSP state table pub(super) internal_state_root: usize, + /// Root page for the DBSP state table's primary key index + pub(super) internal_state_index_root: usize, } impl DbspCircuit { /// Create a new empty circuit with initial empty schema /// The actual output schema will be set when the root node is established - pub fn new(main_data_root: usize, internal_state_root: usize) -> Self { + pub fn new( + main_data_root: usize, + internal_state_root: usize, + internal_state_index_root: usize, + ) -> Self { // Start with an empty schema - will be updated when root is set let empty_schema = Arc::new(LogicalSchema::new(vec![])); Self { @@ -293,6 +400,7 @@ impl DbspCircuit { commit_state: CommitState::Init, main_data_root, internal_state_root, + internal_state_index_root, } } @@ -326,18 +434,18 @@ impl DbspCircuit { pub fn run_circuit( &mut self, - pager: Rc, execute_state: &mut ExecuteState, + pager: &Rc, + state_cursors: &mut DbspStateCursors, commit_operators: bool, - state_cursor: &mut Box, ) -> Result> { if let Some(root_id) = self.root { self.execute_node( root_id, - pager, + pager.clone(), execute_state, commit_operators, - Some(state_cursor), + state_cursors, ) } else { Err(LimboError::ParseError( @@ -358,7 +466,23 @@ impl DbspCircuit { execute_state: &mut ExecuteState, ) -> Result> { if let Some(root_id) = self.root { - self.execute_node(root_id, pager, execute_state, false, None) + // Create temporary cursors for execute (non-commit) operations + let table_cursor = BTreeCursor::new_table( + None, + pager.clone(), + self.internal_state_root, + OPERATOR_COLUMNS, + ); + let index_def = create_dbsp_state_index(self.internal_state_index_root); + let index_cursor = BTreeCursor::new_index( + None, + pager.clone(), + self.internal_state_index_root, + &index_def, + 3, + ); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); + self.execute_node(root_id, pager, execute_state, false, &mut cursors) } else { Err(LimboError::ParseError( "Circuit has no root node".to_string(), @@ -398,29 +522,42 @@ impl DbspCircuit { let mut state = std::mem::replace(&mut self.commit_state, CommitState::Init); match &mut state { CommitState::Init => { - // Create state cursor when entering CommitOperators state - let state_cursor = Box::new(BTreeCursor::new_table( + // Create state cursors when entering CommitOperators state + let state_table_cursor = BTreeCursor::new_table( None, pager.clone(), self.internal_state_root, OPERATOR_COLUMNS, + ); + let index_def = create_dbsp_state_index(self.internal_state_index_root); + let state_index_cursor = BTreeCursor::new_index( + None, + pager.clone(), + self.internal_state_index_root, + &index_def, + 3, // Index on first 3 columns + ); + + let state_cursors = Box::new(DbspStateCursors::new( + state_table_cursor, + state_index_cursor, )); self.commit_state = CommitState::CommitOperators { execute_state: Box::new(ExecuteState::Init { input_data: input_delta_set.clone(), }), - state_cursor, + state_cursors, }; } CommitState::CommitOperators { ref mut execute_state, - ref mut state_cursor, + ref mut state_cursors, } => { let delta = return_and_restore_if_io!( &mut self.commit_state, state, - self.run_circuit(pager.clone(), execute_state, true, state_cursor) + self.run_circuit(execute_state, &pager, state_cursors, true,) ); // Create view cursor when entering UpdateView state @@ -434,7 +571,7 @@ impl DbspCircuit { self.commit_state = CommitState::UpdateView { delta, current_index: 0, - write_row_state: WriteRow::new(), + write_row_state: WriteRowView::new(), view_cursor, }; } @@ -453,7 +590,7 @@ impl DbspCircuit { // If we're starting a new row (GetRecord state), we need a fresh cursor // due to btree cursor state machine limitations - if matches!(write_row_state, WriteRow::GetRecord) { + if matches!(write_row_state, WriteRowView::GetRecord) { *view_cursor = Box::new(BTreeCursor::new_table( None, pager.clone(), @@ -493,7 +630,7 @@ impl DbspCircuit { self.commit_state = CommitState::UpdateView { delta, current_index: *current_index + 1, - write_row_state: WriteRow::new(), + write_row_state: WriteRowView::new(), view_cursor, }; } @@ -509,7 +646,7 @@ impl DbspCircuit { pager: Rc, execute_state: &mut ExecuteState, commit_operators: bool, - state_cursor: Option<&mut Box>, + cursors: &mut DbspStateCursors, ) -> Result> { loop { match execute_state { @@ -577,12 +714,30 @@ impl DbspCircuit { // Get the (node_id, state) pair for the current index let (input_node_id, input_state) = &mut input_states[*current_index]; + // Create temporary cursors for the recursive call + let temp_table_cursor = BTreeCursor::new_table( + None, + pager.clone(), + self.internal_state_root, + OPERATOR_COLUMNS, + ); + let index_def = create_dbsp_state_index(self.internal_state_index_root); + let temp_index_cursor = BTreeCursor::new_index( + None, + pager.clone(), + self.internal_state_index_root, + &index_def, + 3, + ); + let mut temp_cursors = + DbspStateCursors::new(temp_table_cursor, temp_index_cursor); + let delta = return_if_io!(self.execute_node( *input_node_id, pager.clone(), input_state, commit_operators, - None // Input nodes don't need state cursor + &mut temp_cursors )); input_deltas.push(delta); *current_index += 1; @@ -595,13 +750,8 @@ impl DbspCircuit { .get_mut(&node_id) .ok_or_else(|| LimboError::ParseError("Node not found".to_string()))?; - let output_delta = return_if_io!(node.process_node( - pager.clone(), - eval_state, - self.internal_state_root, - commit_operators, - state_cursor, - )); + let output_delta = + return_if_io!(node.process_node(eval_state, commit_operators, cursors,)); return Ok(IOResult::Done(output_delta)); } } @@ -660,9 +810,17 @@ pub struct DbspCompiler { impl DbspCompiler { /// Create a new DBSP compiler - pub fn new(main_data_root: usize, internal_state_root: usize) -> Self { + pub fn new( + main_data_root: usize, + internal_state_root: usize, + internal_state_index_root: usize, + ) -> Self { Self { - circuit: DbspCircuit::new(main_data_root, internal_state_root), + circuit: DbspCircuit::new( + main_data_root, + internal_state_root, + internal_state_index_root, + ), } } @@ -1252,7 +1410,7 @@ mod tests { }}; } - fn setup_btree_for_circuit() -> (Rc, usize, usize) { + fn setup_btree_for_circuit() -> (Rc, usize, usize, usize) { let io: Arc = Arc::new(MemoryIO::new()); let db = Database::open_file(io.clone(), ":memory:", false, false).unwrap(); let conn = db.connect().unwrap(); @@ -1270,13 +1428,24 @@ mod tests { .block(|| pager.btree_create(&CreateBTreeFlags::new_table())) .unwrap() as usize; - (pager, main_root_page, dbsp_state_page) + let dbsp_state_index_page = pager + .io + .block(|| pager.btree_create(&CreateBTreeFlags::new_index())) + .unwrap() as usize; + + ( + pager, + main_root_page, + dbsp_state_page, + dbsp_state_index_page, + ) } // Macro to compile SQL to DBSP circuit macro_rules! compile_sql { ($sql:expr) => {{ - let (pager, main_root_page, dbsp_state_page) = setup_btree_for_circuit(); + let (pager, main_root_page, dbsp_state_page, dbsp_state_index_page) = + setup_btree_for_circuit(); let schema = test_schema!(); let mut parser = Parser::new($sql.as_bytes()); let cmd = parser @@ -1289,7 +1458,7 @@ mod tests { let mut builder = LogicalPlanBuilder::new(&schema); let logical_plan = builder.build_statement(&stmt).unwrap(); ( - DbspCompiler::new(main_root_page, dbsp_state_page) + DbspCompiler::new(main_root_page, dbsp_state_page, dbsp_state_index_page) .compile(&logical_plan) .unwrap(), pager, @@ -3162,10 +3331,10 @@ mod tests { #[test] fn test_circuit_rowid_update_consolidation() { - let (pager, p1, p2) = setup_btree_for_circuit(); + let (pager, p1, p2, p3) = setup_btree_for_circuit(); // Test that circuit properly consolidates state when rowid changes - let mut circuit = DbspCircuit::new(p1, p2); + let mut circuit = DbspCircuit::new(p1, p2, p3); // Create a simple filter node let schema = Arc::new(LogicalSchema::new(vec![ diff --git a/core/incremental/operator.rs b/core/incremental/operator.rs index 825290bef..1c735df7d 100644 --- a/core/incremental/operator.rs +++ b/core/incremental/operator.rs @@ -6,8 +6,9 @@ use crate::function::{AggFunc, Func}; use crate::incremental::dbsp::{Delta, DeltaPair, HashableRow}; use crate::incremental::expr_compiler::CompiledExpression; use crate::incremental::persistence::{ReadRecord, WriteRow}; +use crate::schema::{Index, IndexColumn}; use crate::storage::btree::BTreeCursor; -use crate::types::{IOResult, SeekKey, Text}; +use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult, Text}; use crate::{ return_and_restore_if_io, return_if_io, Connection, Database, Result, SymbolTable, Value, }; @@ -17,6 +18,77 @@ use std::sync::{Arc, Mutex}; use turso_macros::match_ignore_ascii_case; use turso_parser::ast::{As, Expr, Literal, Name, OneSelect, Operator, ResultColumn}; +/// Struct to hold both table and index cursors for DBSP state operations +pub struct DbspStateCursors { + /// Cursor for the DBSP state table + pub table_cursor: BTreeCursor, + /// Cursor for the DBSP state table's primary key index + pub index_cursor: BTreeCursor, +} + +impl DbspStateCursors { + /// Create a new DbspStateCursors with both table and index cursors + pub fn new(table_cursor: BTreeCursor, index_cursor: BTreeCursor) -> Self { + Self { + table_cursor, + index_cursor, + } + } +} + +/// Create an index definition for the DBSP state table +/// This defines the primary key index on (operator_id, zset_id, element_id) +pub fn create_dbsp_state_index(root_page: usize) -> Index { + Index { + name: "dbsp_state_pk".to_string(), + table_name: "dbsp_state".to_string(), + root_page, + columns: vec![ + IndexColumn { + name: "operator_id".to_string(), + order: turso_parser::ast::SortOrder::Asc, + collation: None, + pos_in_table: 0, + default: None, + }, + IndexColumn { + name: "zset_id".to_string(), + order: turso_parser::ast::SortOrder::Asc, + collation: None, + pos_in_table: 1, + default: None, + }, + IndexColumn { + name: "element_id".to_string(), + order: turso_parser::ast::SortOrder::Asc, + collation: None, + pos_in_table: 2, + default: None, + }, + ], + unique: true, + ephemeral: false, + has_rowid: true, + } +} + +/// Storage key types for different operator contexts +#[derive(Debug, Clone, Copy)] +pub enum StorageKeyType { + /// For aggregate operators - uses operator_id * 2 + Aggregate { operator_id: usize }, +} + +impl StorageKeyType { + /// Get the unique storage ID using the same formula as before + /// This ensures different operators get unique IDs + pub fn to_storage_id(self) -> u64 { + match self { + StorageKeyType::Aggregate { operator_id } => (operator_id as u64), + } + } +} + type ComputedStates = HashMap, AggregateState)>; // group_key_str -> (group_key, state) #[derive(Debug)] enum AggregateCommitState { @@ -44,12 +116,20 @@ pub enum EvalState { Init { deltas: DeltaPair, }, + FetchKey { + delta: Delta, // Keep original delta for merge operation + current_idx: usize, + groups_to_read: Vec<(String, Vec)>, // Changed to Vec for index-based access + existing_groups: HashMap, + old_values: HashMap>, + }, FetchData { delta: Delta, // Keep original delta for merge operation current_idx: usize, groups_to_read: Vec<(String, Vec)>, // Changed to Vec for index-based access existing_groups: HashMap, old_values: HashMap>, + rowid: Option, // Rowid found by FetchKey (None if not found) read_record_state: Box, }, Done, @@ -101,20 +181,19 @@ impl EvalState { let _ = std::mem::replace( self, - EvalState::FetchData { + EvalState::FetchKey { delta, current_idx: 0, groups_to_read: groups_to_read.into_iter().collect(), // Convert BTreeMap to Vec existing_groups: HashMap::new(), old_values: HashMap::new(), - read_record_state: Box::new(ReadRecord::new()), }, ); } fn process_delta( &mut self, operator: &mut AggregateOperator, - cursor: &mut BTreeCursor, + cursors: &mut DbspStateCursors, ) -> Result> { loop { match self { @@ -124,13 +203,12 @@ impl EvalState { EvalState::Init { .. } => { panic!("State machine not supposed to reach the init state! advance() should have been called"); } - EvalState::FetchData { + EvalState::FetchKey { delta, current_idx, groups_to_read, existing_groups, old_values, - read_record_state, } => { if *current_idx >= groups_to_read.len() { // All groups processed, compute final output @@ -140,31 +218,102 @@ impl EvalState { return Ok(IOResult::Done(result)); } else { // Get the current group to read - let (group_key_str, group_key) = &groups_to_read[*current_idx]; + let (group_key_str, _group_key) = &groups_to_read[*current_idx]; - let seek_key = operator.generate_storage_key(group_key_str); - let key = SeekKey::TableRowId(seek_key); + // Build the key for the index: (operator_id, zset_id, element_id) + let storage_key = StorageKeyType::Aggregate { + operator_id: operator.operator_id, + }; + let operator_storage_id = storage_key.to_storage_id() as i64; + let zset_id = operator.generate_group_rowid(group_key_str); + let element_id = 0i64; // Always 0 for aggregators + // Create index key values + let index_key_values = vec![ + Value::Integer(operator_storage_id), + Value::Integer(zset_id), + Value::Integer(element_id), + ]; + + // Create an immutable record for the index key + let index_record = + ImmutableRecord::from_values(&index_key_values, index_key_values.len()); + + // Seek in the index to find if this row exists + let seek_result = return_if_io!(cursors.index_cursor.seek( + SeekKey::IndexKey(&index_record), + SeekOp::GE { eq_only: true } + )); + + let rowid = if matches!(seek_result, SeekResult::Found) { + // Found in index, get the table rowid + // The btree code handles extracting the rowid from the index record for has_rowid indexes + let rowid = return_if_io!(cursors.index_cursor.rowid()); + rowid + } else { + // Not found in index, no existing state + None + }; + + // Always transition to FetchData + let taken_existing = std::mem::take(existing_groups); + let taken_old_values = std::mem::take(old_values); + let next_state = EvalState::FetchData { + delta: std::mem::take(delta), + current_idx: *current_idx, + groups_to_read: std::mem::take(groups_to_read), + existing_groups: taken_existing, + old_values: taken_old_values, + rowid, + read_record_state: Box::new(ReadRecord::new()), + }; + *self = next_state; + } + } + EvalState::FetchData { + delta, + current_idx, + groups_to_read, + existing_groups, + old_values, + rowid, + read_record_state, + } => { + // Get the current group to read + let (group_key_str, group_key) = &groups_to_read[*current_idx]; + + // Only try to read if we have a rowid + if let Some(rowid) = rowid { + let key = SeekKey::TableRowId(*rowid); let state = return_if_io!(read_record_state.read_record( key, &operator.aggregates, - cursor + &mut cursors.table_cursor )); - - // Anything that mutates state has to happen after return_if_io! - // Unfortunately there's no good way to enforce that without turning - // this into a hot mess of mem::takes. + // Process the fetched state if let Some(state) = state { let mut old_row = group_key.clone(); old_row.extend(state.to_values(&operator.aggregates)); old_values.insert(group_key_str.clone(), old_row); existing_groups.insert(group_key_str.clone(), state.clone()); } - - // All attributes mutated in place. - *current_idx += 1; - *read_record_state = Box::new(ReadRecord::new()); + } else { + // No rowid for this group, skipping read } + // If no rowid, there's no existing state for this group + + // Move to next group + let next_idx = *current_idx + 1; + let taken_existing = std::mem::take(existing_groups); + let taken_old_values = std::mem::take(old_values); + let next_state = EvalState::FetchKey { + delta: std::mem::take(delta), + current_idx: next_idx, + groups_to_read: std::mem::take(groups_to_read), + existing_groups: taken_existing, + old_values: taken_old_values, + }; + *self = next_state; } EvalState::Done => { return Ok(IOResult::Done((Delta::new(), HashMap::new()))); @@ -511,17 +660,25 @@ pub trait IncrementalOperator: Debug { /// /// # Arguments /// * `state` - The evaluation state (may be in progress from a previous I/O operation) - /// * `cursor` - Cursor for reading operator state from storage + /// * `cursors` - Cursors for reading operator state from storage (table and optional index) /// /// # Returns /// The output delta from the evaluation - fn eval(&mut self, state: &mut EvalState, cursor: &mut BTreeCursor) -> Result>; + fn eval( + &mut self, + state: &mut EvalState, + cursors: &mut DbspStateCursors, + ) -> Result>; /// Commit deltas to the operator's internal state and return the output /// This is called when a transaction commits, making changes permanent /// Returns the output delta (what downstream operators should see) - /// The cursor parameter is for operators that need to persist state - fn commit(&mut self, deltas: DeltaPair, cursor: &mut BTreeCursor) -> Result>; + /// The cursors parameter is for operators that need to persist state + fn commit( + &mut self, + deltas: DeltaPair, + cursors: &mut DbspStateCursors, + ) -> Result>; /// Set computation tracker fn set_tracker(&mut self, tracker: Arc>); @@ -548,7 +705,7 @@ impl IncrementalOperator for InputOperator { fn eval( &mut self, state: &mut EvalState, - _cursor: &mut BTreeCursor, + _cursors: &mut DbspStateCursors, ) -> Result> { match state { EvalState::Init { deltas } => { @@ -567,7 +724,11 @@ impl IncrementalOperator for InputOperator { } } - fn commit(&mut self, deltas: DeltaPair, _cursor: &mut BTreeCursor) -> Result> { + fn commit( + &mut self, + deltas: DeltaPair, + _cursors: &mut DbspStateCursors, + ) -> Result> { // Input operator only uses left delta, right must be empty assert!( deltas.right.is_empty(), @@ -697,7 +858,7 @@ impl IncrementalOperator for FilterOperator { fn eval( &mut self, state: &mut EvalState, - _cursor: &mut BTreeCursor, + _cursors: &mut DbspStateCursors, ) -> Result> { let delta = match state { EvalState::Init { deltas } => { @@ -733,7 +894,11 @@ impl IncrementalOperator for FilterOperator { Ok(IOResult::Done(output_delta)) } - fn commit(&mut self, deltas: DeltaPair, _cursor: &mut BTreeCursor) -> Result> { + fn commit( + &mut self, + deltas: DeltaPair, + _cursors: &mut DbspStateCursors, + ) -> Result> { // Filter operator only uses left delta, right must be empty assert!( deltas.right.is_empty(), @@ -1106,7 +1271,7 @@ impl IncrementalOperator for ProjectOperator { fn eval( &mut self, state: &mut EvalState, - _cursor: &mut BTreeCursor, + _cursors: &mut DbspStateCursors, ) -> Result> { let delta = match state { EvalState::Init { deltas } => { @@ -1138,7 +1303,11 @@ impl IncrementalOperator for ProjectOperator { Ok(IOResult::Done(output_delta)) } - fn commit(&mut self, deltas: DeltaPair, _cursor: &mut BTreeCursor) -> Result> { + fn commit( + &mut self, + deltas: DeltaPair, + _cursors: &mut DbspStateCursors, + ) -> Result> { // Project operator only uses left delta, right must be empty assert!( deltas.right.is_empty(), @@ -1466,7 +1635,7 @@ impl AggregateOperator { fn eval_internal( &mut self, state: &mut EvalState, - cursor: &mut BTreeCursor, + cursors: &mut DbspStateCursors, ) -> Result> { match state { EvalState::Uninitialized => { @@ -1493,7 +1662,7 @@ impl AggregateOperator { } state.advance(groups_to_read); } - EvalState::FetchData { .. } => { + EvalState::FetchKey { .. } | EvalState::FetchData { .. } => { // Already in progress, continue processing on process_delta below. } EvalState::Done => { @@ -1502,7 +1671,7 @@ impl AggregateOperator { } // Process the delta through the state machine - let result = return_if_io!(state.process_delta(self, cursor)); + let result = return_if_io!(state.process_delta(self, cursors)); Ok(IOResult::Done(result)) } @@ -1636,12 +1805,20 @@ impl AggregateOperator { } impl IncrementalOperator for AggregateOperator { - fn eval(&mut self, state: &mut EvalState, cursor: &mut BTreeCursor) -> Result> { - let (delta, _) = return_if_io!(self.eval_internal(state, cursor)); + fn eval( + &mut self, + state: &mut EvalState, + cursors: &mut DbspStateCursors, + ) -> Result> { + let (delta, _) = return_if_io!(self.eval_internal(state, cursors)); Ok(IOResult::Done(delta)) } - fn commit(&mut self, deltas: DeltaPair, cursor: &mut BTreeCursor) -> Result> { + fn commit( + &mut self, + deltas: DeltaPair, + cursors: &mut DbspStateCursors, + ) -> Result> { // Aggregate operator only uses left delta, right must be empty assert!( deltas.right.is_empty(), @@ -1666,7 +1843,7 @@ impl IncrementalOperator for AggregateOperator { let (output_delta, computed_states) = return_and_restore_if_io!( &mut self.commit_state, state, - self.eval_internal(eval_state, cursor) + self.eval_internal(eval_state, cursors) ); self.commit_state = AggregateCommitState::PersistDelta { delta: output_delta, @@ -1690,34 +1867,42 @@ impl IncrementalOperator for AggregateOperator { } else { let (group_key_str, (group_key, agg_state)) = states_vec[*current_idx]; - let seek_key = self.seek_key_from_str(group_key_str); + // Build the key components for the new table structure + let storage_key = StorageKeyType::Aggregate { + operator_id: self.operator_id, + }; + let operator_storage_id = storage_key.to_storage_id() as i64; + let zset_id = self.generate_group_rowid(group_key_str); + let element_id = 0i64; // Determine weight: -1 to delete (cancels existing weight=1), 1 to insert/update let weight = if agg_state.count == 0 { -1 } else { 1 }; // Serialize the aggregate state with group key (even for deletion, we need a row) let state_blob = agg_state.to_blob(&self.aggregates, group_key); - let blob_row = HashableRow::new(0, vec![Value::Blob(state_blob)]); + let blob_value = Value::Blob(state_blob); - // Build the aggregate storage format: [key, blob, weight] - let seek_key_clone = seek_key.clone(); - let blob_value = blob_row.values[0].clone(); - let build_fn = move |final_weight: isize| -> Vec { - let key_i64 = match seek_key_clone.clone() { - SeekKey::TableRowId(id) => id, - _ => panic!("Expected TableRowId"), - }; - vec![ - Value::Integer(key_i64), - blob_value.clone(), // The blob with serialized state - Value::Integer(final_weight as i64), - ] - }; + // Build the aggregate storage format: [operator_id, zset_id, element_id, value, weight] + let operator_id_val = Value::Integer(operator_storage_id); + let zset_id_val = Value::Integer(zset_id); + let element_id_val = Value::Integer(element_id); + let blob_val = blob_value.clone(); + + // Create index key - the first 3 columns of our primary key + let index_key = vec![ + operator_id_val.clone(), + zset_id_val.clone(), + element_id_val.clone(), + ]; + + // Record values (without weight) + let record_values = + vec![operator_id_val, zset_id_val, element_id_val, blob_val]; return_and_restore_if_io!( &mut self.commit_state, state, - write_row.write_row(cursor, seek_key, build_fn, weight) + write_row.write_row(cursors, index_key, record_values, weight) ); let delta = std::mem::take(delta); @@ -1755,8 +1940,8 @@ mod tests { use crate::{Database, MemoryIO, IO}; use std::sync::{Arc, Mutex}; - /// Create a test pager for operator tests - fn create_test_pager() -> (std::rc::Rc, usize) { + /// Create a test pager for operator tests with both table and index + fn create_test_pager() -> (std::rc::Rc, usize, usize) { let io: Arc = Arc::new(MemoryIO::new()); let db = Database::open_file(io.clone(), ":memory:", false, false).unwrap(); let conn = db.connect().unwrap(); @@ -1766,14 +1951,21 @@ mod tests { // Allocate page 1 first (database header) let _ = pager.io.block(|| pager.allocate_page1()); - // Properly create a BTree for aggregate state using the pager API - let root_page_id = pager + // Create a BTree for the table + let table_root_page_id = pager .io .block(|| pager.btree_create(&CreateBTreeFlags::new_table())) - .expect("Failed to create BTree for aggregate state") + .expect("Failed to create BTree for aggregate state table") as usize; - (pager, root_page_id) + // Create a BTree for the index + let index_root_page_id = pager + .io + .block(|| pager.btree_create(&CreateBTreeFlags::new_index())) + .expect("Failed to create BTree for aggregate state index") + as usize; + + (pager, table_root_page_id, index_root_page_id) } /// Read the current state from the BTree (for testing) @@ -1781,23 +1973,23 @@ mod tests { fn get_current_state_from_btree( agg: &AggregateOperator, pager: &std::rc::Rc, - cursor: &mut BTreeCursor, + cursors: &mut DbspStateCursors, ) -> Delta { let mut result = Delta::new(); // Rewind to start of table - pager.io.block(|| cursor.rewind()).unwrap(); + pager.io.block(|| cursors.table_cursor.rewind()).unwrap(); loop { // Check if cursor is empty (no more rows) - if cursor.is_empty() { + if cursors.table_cursor.is_empty() { break; } // Get the record at this position let record = pager .io - .block(|| cursor.record()) + .block(|| cursors.table_cursor.record()) .unwrap() .unwrap() .to_owned(); @@ -1805,18 +1997,21 @@ mod tests { let values_ref = record.get_values(); let values: Vec = values_ref.into_iter().map(|x| x.to_owned()).collect(); - // Check if this record belongs to our operator - if let Some(Value::Integer(key)) = values.first() { - let operator_part = (key >> 32) as usize; + // Parse the 5-column structure: operator_id, zset_id, element_id, value, weight + if let Some(Value::Integer(op_id)) = values.first() { + let storage_key = StorageKeyType::Aggregate { + operator_id: agg.operator_id, + }; + let expected_op_id = storage_key.to_storage_id() as i64; // Skip if not our operator - if operator_part != agg.operator_id { - pager.io.block(|| cursor.next()).unwrap(); + if *op_id != expected_op_id { + pager.io.block(|| cursors.table_cursor.next()).unwrap(); continue; } - // Get the blob data - if let Some(Value::Blob(blob)) = values.get(1) { + // Get the blob data from column 3 (value column) + if let Some(Value::Blob(blob)) = values.get(3) { // Deserialize the state if let Some((state, group_key)) = AggregateState::from_blob(blob, &agg.aggregates) @@ -1836,7 +2031,7 @@ mod tests { } } - pager.io.block(|| cursor.next()).unwrap(); + pager.io.block(|| cursors.table_cursor.next()).unwrap(); } result.consolidate(); @@ -1871,8 +2066,14 @@ mod tests { // and an insertion (+1) of the new value. // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); // Create an aggregate operator for SUM(age) with no GROUP BY let mut agg = AggregateOperator::new( @@ -1912,11 +2113,11 @@ mod tests { // Initialize with initial data pager .io - .block(|| agg.commit((&initial_delta).into(), &mut cursor)) + .block(|| agg.commit((&initial_delta).into(), &mut cursors)) .unwrap(); // Verify initial state: SUM(age) = 25 + 30 + 35 = 90 - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state.changes.len(), 1, "Should have one aggregate row"); let (row, weight) = &state.changes[0]; assert_eq!(*weight, 1, "Aggregate row should have weight 1"); @@ -1936,7 +2137,7 @@ mod tests { // Process the incremental update let output_delta = pager .io - .block(|| agg.commit((&update_delta).into(), &mut cursor)) + .block(|| agg.commit((&update_delta).into(), &mut cursors)) .unwrap(); // CRITICAL: The output delta should contain TWO changes: @@ -1985,8 +2186,14 @@ mod tests { // Create an aggregate operator for SUM(score) GROUP BY team // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -2033,11 +2240,11 @@ mod tests { // Initialize with initial data pager .io - .block(|| agg.commit((&initial_delta).into(), &mut cursor)) + .block(|| agg.commit((&initial_delta).into(), &mut cursors)) .unwrap(); // Verify initial state: red team = 30, blue team = 15 - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state.changes.len(), 2, "Should have two groups"); // Find the red and blue team aggregates @@ -2079,7 +2286,7 @@ mod tests { // Process the incremental update let output_delta = pager .io - .block(|| agg.commit((&update_delta).into(), &mut cursor)) + .block(|| agg.commit((&update_delta).into(), &mut cursors)) .unwrap(); // Should have 2 changes: retraction of old red team sum, insertion of new red team sum @@ -2130,8 +2337,14 @@ mod tests { let tracker = Arc::new(Mutex::new(ComputationTracker::new())); // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); // Create COUNT(*) GROUP BY category let mut agg = AggregateOperator::new( @@ -2161,7 +2374,7 @@ mod tests { } pager .io - .block(|| agg.commit((&initial).into(), &mut cursor)) + .block(|| agg.commit((&initial).into(), &mut cursors)) .unwrap(); // Reset tracker for delta processing @@ -2180,13 +2393,13 @@ mod tests { pager .io - .block(|| agg.commit((&delta).into(), &mut cursor)) + .block(|| agg.commit((&delta).into(), &mut cursors)) .unwrap(); assert_eq!(tracker.lock().unwrap().aggregation_updates, 1); // Check the final state - cat_0 should now have count 11 - let final_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); let cat_0 = final_state .changes .iter() @@ -2205,8 +2418,14 @@ mod tests { // Create SUM(amount) GROUP BY product // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -2248,11 +2467,11 @@ mod tests { ); pager .io - .block(|| agg.commit((&initial).into(), &mut cursor)) + .block(|| agg.commit((&initial).into(), &mut cursors)) .unwrap(); // Check initial state: Widget=250, Gadget=200 - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); let widget_sum = state .changes .iter() @@ -2277,13 +2496,13 @@ mod tests { pager .io - .block(|| agg.commit((&delta).into(), &mut cursor)) + .block(|| agg.commit((&delta).into(), &mut cursors)) .unwrap(); assert_eq!(tracker.lock().unwrap().aggregation_updates, 1); // Check final state - Widget should now be 300 (250 + 50) - let final_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); let widget = final_state .changes .iter() @@ -2296,8 +2515,14 @@ mod tests { fn test_count_and_sum_together() { // Test the example from DBSP_ROADMAP: COUNT(*) and SUM(amount) GROUP BY user_id // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -2329,13 +2554,13 @@ mod tests { ); pager .io - .block(|| agg.commit((&initial).into(), &mut cursor)) + .block(|| agg.commit((&initial).into(), &mut cursors)) .unwrap(); // Check initial state // User 1: count=2, sum=300 // User 2: count=1, sum=150 - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state.changes.len(), 2); let user1 = state @@ -2364,11 +2589,11 @@ mod tests { ); pager .io - .block(|| agg.commit((&delta).into(), &mut cursor)) + .block(|| agg.commit((&delta).into(), &mut cursors)) .unwrap(); // Check final state - user 1 should have updated count and sum - let final_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); let user1 = final_state .changes .iter() @@ -2382,8 +2607,14 @@ mod tests { fn test_avg_maintains_sum_and_count() { // Test AVG aggregation // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -2424,13 +2655,13 @@ mod tests { ); pager .io - .block(|| agg.commit((&initial).into(), &mut cursor)) + .block(|| agg.commit((&initial).into(), &mut cursors)) .unwrap(); // Check initial averages // Category A: avg = (10 + 20) / 2 = 15 // Category B: avg = 30 / 1 = 30 - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); let cat_a = state .changes .iter() @@ -2459,11 +2690,11 @@ mod tests { ); pager .io - .block(|| agg.commit((&delta).into(), &mut cursor)) + .block(|| agg.commit((&delta).into(), &mut cursors)) .unwrap(); // Check final state - Category A avg should now be (10 + 20 + 30) / 3 = 20 - let final_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); let cat_a = final_state .changes .iter() @@ -2476,8 +2707,14 @@ mod tests { fn test_delete_updates_aggregates() { // Test that deletes (negative weights) properly update aggregates // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -2513,11 +2750,11 @@ mod tests { ); pager .io - .block(|| agg.commit((&initial).into(), &mut cursor)) + .block(|| agg.commit((&initial).into(), &mut cursors)) .unwrap(); // Check initial state: count=2, sum=300 - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert!(!state.changes.is_empty()); let (row, _weight) = &state.changes[0]; assert_eq!(row.values[1], Value::Integer(2)); // count @@ -2536,11 +2773,11 @@ mod tests { pager .io - .block(|| agg.commit((&delta).into(), &mut cursor)) + .block(|| agg.commit((&delta).into(), &mut cursors)) .unwrap(); // Check final state - should update to count=1, sum=200 - let final_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); let cat_a = final_state .changes .iter() @@ -2557,8 +2794,14 @@ mod tests { let input_columns = vec!["category".to_string(), "value".to_string()]; // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -2574,11 +2817,11 @@ mod tests { init_data.insert(3, vec![Value::Text("B".into()), Value::Integer(30)]); pager .io - .block(|| agg.commit((&init_data).into(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursors)) .unwrap(); // Check initial counts - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state.changes.len(), 2); // Find group A and B @@ -2602,14 +2845,14 @@ mod tests { let output = pager .io - .block(|| agg.commit((&delete_delta).into(), &mut cursor)) + .block(|| agg.commit((&delete_delta).into(), &mut cursors)) .unwrap(); // Should emit retraction for old count and insertion for new count assert_eq!(output.changes.len(), 2); // Check final state - let final_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); let group_a_final = final_state .changes .iter() @@ -2623,13 +2866,13 @@ mod tests { let output_b = pager .io - .block(|| agg.commit((&delete_all_b).into(), &mut cursor)) + .block(|| agg.commit((&delete_all_b).into(), &mut cursors)) .unwrap(); assert_eq!(output_b.changes.len(), 1); // Only retraction, no new row assert_eq!(output_b.changes[0].1, -1); // Retraction // Final state should not have group B - let final_state2 = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state2 = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(final_state2.changes.len(), 1); // Only group A remains assert_eq!(final_state2.changes[0].0.values[0], Value::Text("A".into())); } @@ -2641,8 +2884,14 @@ mod tests { let input_columns = vec!["category".to_string(), "value".to_string()]; // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -2659,11 +2908,11 @@ mod tests { init_data.insert(4, vec![Value::Text("B".into()), Value::Integer(15)]); pager .io - .block(|| agg.commit((&init_data).into(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursors)) .unwrap(); // Check initial sums - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); let group_a = state .changes .iter() @@ -2684,11 +2933,11 @@ mod tests { pager .io - .block(|| agg.commit((&delete_delta).into(), &mut cursor)) + .block(|| agg.commit((&delete_delta).into(), &mut cursors)) .unwrap(); // Check updated sum - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); let group_a = state .changes .iter() @@ -2703,11 +2952,11 @@ mod tests { pager .io - .block(|| agg.commit((&delete_all_b).into(), &mut cursor)) + .block(|| agg.commit((&delete_all_b).into(), &mut cursors)) .unwrap(); // Group B should be gone - let final_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(final_state.changes.len(), 1); // Only group A remains assert_eq!(final_state.changes[0].0.values[0], Value::Text("A".into())); } @@ -2719,8 +2968,14 @@ mod tests { let input_columns = vec!["category".to_string(), "value".to_string()]; // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -2736,11 +2991,11 @@ mod tests { init_data.insert(3, vec![Value::Text("A".into()), Value::Integer(30)]); pager .io - .block(|| agg.commit((&init_data).into(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursors)) .unwrap(); // Check initial average - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state.changes.len(), 1); assert_eq!(state.changes[0].0.values[1], Value::Float(20.0)); // AVG = (10+20+30)/3 = 20 @@ -2750,11 +3005,11 @@ mod tests { pager .io - .block(|| agg.commit((&delete_delta).into(), &mut cursor)) + .block(|| agg.commit((&delete_delta).into(), &mut cursors)) .unwrap(); // Check updated average - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state.changes[0].0.values[1], Value::Float(20.0)); // AVG = (10+30)/2 = 20 (same!) // Delete another to change the average @@ -2763,10 +3018,10 @@ mod tests { pager .io - .block(|| agg.commit((&delete_another).into(), &mut cursor)) + .block(|| agg.commit((&delete_another).into(), &mut cursors)) .unwrap(); - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state.changes[0].0.values[1], Value::Float(10.0)); // AVG = 10/1 = 10 } @@ -2782,8 +3037,14 @@ mod tests { let input_columns = vec!["category".to_string(), "value".to_string()]; // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -2799,11 +3060,11 @@ mod tests { init_data.insert(3, vec![Value::Text("B".into()), Value::Integer(50)]); pager .io - .block(|| agg.commit((&init_data).into(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursors)) .unwrap(); // Check initial state - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); let group_a = state .changes .iter() @@ -2820,11 +3081,11 @@ mod tests { pager .io - .block(|| agg.commit((&delete_delta).into(), &mut cursor)) + .block(|| agg.commit((&delete_delta).into(), &mut cursors)) .unwrap(); // Check all aggregates updated correctly - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); let group_a = state .changes .iter() @@ -2841,10 +3102,10 @@ mod tests { pager .io - .block(|| agg.commit((&insert_delta).into(), &mut cursor)) + .block(|| agg.commit((&insert_delta).into(), &mut cursors)) .unwrap(); - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); let group_a = state .changes .iter() @@ -2862,8 +3123,14 @@ mod tests { // the operator should properly consolidate the state // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut filter = FilterOperator::new( FilterPredicate::GreaterThan { @@ -2878,7 +3145,7 @@ mod tests { init_data.insert(3, vec![Value::Integer(3), Value::Integer(3)]); let state = pager .io - .block(|| filter.commit((&init_data).into(), &mut cursor)) + .block(|| filter.commit((&init_data).into(), &mut cursors)) .unwrap(); // Check initial state @@ -2897,7 +3164,7 @@ mod tests { let output = pager .io - .block(|| filter.commit((&update_delta).into(), &mut cursor)) + .block(|| filter.commit((&update_delta).into(), &mut cursors)) .unwrap(); // The output delta should have both changes (both pass the filter b > 2) @@ -2918,8 +3185,14 @@ mod tests { #[test] fn test_filter_eval_with_uncommitted() { // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut filter = FilterOperator::new( FilterPredicate::GreaterThan { @@ -2949,7 +3222,7 @@ mod tests { ); let state = pager .io - .block(|| filter.commit((&init_data).into(), &mut cursor)) + .block(|| filter.commit((&init_data).into(), &mut cursors)) .unwrap(); // Verify initial state (only Alice passes filter) @@ -2979,7 +3252,7 @@ mod tests { let mut eval_state = uncommitted.clone().into(); let result = pager .io - .block(|| filter.eval(&mut eval_state, &mut cursor)) + .block(|| filter.eval(&mut eval_state, &mut cursors)) .unwrap(); assert_eq!( result.changes.len(), @@ -2991,7 +3264,7 @@ mod tests { // Now commit the changes let state = pager .io - .block(|| filter.commit((&uncommitted).into(), &mut cursor)) + .block(|| filter.commit((&uncommitted).into(), &mut cursors)) .unwrap(); // State should now include Charlie (who passes filter) @@ -3006,8 +3279,14 @@ mod tests { fn test_aggregate_eval_with_uncommitted_preserves_state() { // This is the critical test - aggregations must not modify internal state during eval // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -3051,11 +3330,11 @@ mod tests { ); pager .io - .block(|| agg.commit((&init_data).into(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursors)) .unwrap(); // Check initial state: A -> (count=2, sum=300), B -> (count=1, sum=150) - let initial_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let initial_state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(initial_state.changes.len(), 2); // Store initial state for comparison @@ -3090,7 +3369,7 @@ mod tests { let mut eval_state = uncommitted.clone().into(); let result = pager .io - .block(|| agg.eval(&mut eval_state, &mut cursor)) + .block(|| agg.eval(&mut eval_state, &mut cursors)) .unwrap(); // Result should contain updates for A and new group C @@ -3099,7 +3378,7 @@ mod tests { assert!(!result.changes.is_empty(), "Should have aggregate changes"); // CRITICAL: Verify internal state hasn't changed - let state_after_eval = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state_after_eval = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!( state_after_eval.changes.len(), 2, @@ -3125,11 +3404,11 @@ mod tests { // Now commit the changes pager .io - .block(|| agg.commit((&uncommitted).into(), &mut cursor)) + .block(|| agg.commit((&uncommitted).into(), &mut cursors)) .unwrap(); // State should now be updated - let final_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(final_state.changes.len(), 3, "Should now have A, B, and C"); let a_final = final_state @@ -3170,8 +3449,14 @@ mod tests { // Test that calling eval multiple times with different uncommitted data // doesn't pollute the internal state // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -3189,11 +3474,11 @@ mod tests { init_data.insert(2, vec![Value::Integer(2), Value::Integer(200)]); pager .io - .block(|| agg.commit((&init_data).into(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursors)) .unwrap(); // Initial state: count=2, sum=300 - let initial_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let initial_state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(initial_state.changes.len(), 1); assert_eq!(initial_state.changes[0].0.values[0], Value::Integer(2)); assert_eq!(initial_state.changes[0].0.values[1], Value::Float(300.0)); @@ -3204,11 +3489,11 @@ mod tests { let mut eval_state1 = uncommitted1.clone().into(); let _ = pager .io - .block(|| agg.eval(&mut eval_state1, &mut cursor)) + .block(|| agg.eval(&mut eval_state1, &mut cursors)) .unwrap(); // State should be unchanged - let state1 = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state1 = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state1.changes[0].0.values[0], Value::Integer(2)); assert_eq!(state1.changes[0].0.values[1], Value::Float(300.0)); @@ -3219,11 +3504,11 @@ mod tests { let mut eval_state2 = uncommitted2.clone().into(); let _ = pager .io - .block(|| agg.eval(&mut eval_state2, &mut cursor)) + .block(|| agg.eval(&mut eval_state2, &mut cursors)) .unwrap(); // State should STILL be unchanged - let state2 = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state2 = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state2.changes[0].0.values[0], Value::Integer(2)); assert_eq!(state2.changes[0].0.values[1], Value::Float(300.0)); @@ -3233,11 +3518,11 @@ mod tests { let mut eval_state3 = uncommitted3.clone().into(); let _ = pager .io - .block(|| agg.eval(&mut eval_state3, &mut cursor)) + .block(|| agg.eval(&mut eval_state3, &mut cursors)) .unwrap(); // State should STILL be unchanged - let state3 = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state3 = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state3.changes[0].0.values[0], Value::Integer(2)); assert_eq!(state3.changes[0].0.values[1], Value::Float(300.0)); } @@ -3246,8 +3531,14 @@ mod tests { fn test_aggregate_eval_with_mixed_committed_and_uncommitted() { // Test eval with both committed delta and uncommitted changes // Create a persistent pager for the test - let (pager, root_page_id) = create_test_pager(); - let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page_id, 10); + let (pager, table_root_page_id, index_root_page_id) = create_test_pager(); + let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5); + // Create index cursor with proper index definition for DBSP state table + let index_def = create_dbsp_state_index(index_root_page_id); + // Index has 4 columns: operator_id, zset_id, element_id, rowid + let index_cursor = + BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4); + let mut cursors = DbspStateCursors::new(table_cursor, index_cursor); let mut agg = AggregateOperator::new( 1, // operator_id for testing @@ -3262,7 +3553,7 @@ mod tests { init_data.insert(2, vec![Value::Integer(2), Value::Text("Y".into())]); pager .io - .block(|| agg.commit((&init_data).into(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursors)) .unwrap(); // Create a committed delta (to be processed) @@ -3280,7 +3571,7 @@ mod tests { let mut eval_state = combined.clone().into(); let result = pager .io - .block(|| agg.eval(&mut eval_state, &mut cursor)) + .block(|| agg.eval(&mut eval_state, &mut cursors)) .unwrap(); // Result should reflect changes from both @@ -3334,17 +3625,17 @@ mod tests { assert_eq!(sorted_changes[4].1, 1); // insertion only (no retraction as it's new); // But internal state should be unchanged - let state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let state = get_current_state_from_btree(&agg, &pager, &mut cursors); assert_eq!(state.changes.len(), 2, "Should still have only X and Y"); // Now commit only the committed_delta pager .io - .block(|| agg.commit((&committed_delta).into(), &mut cursor)) + .block(|| agg.commit((&committed_delta).into(), &mut cursors)) .unwrap(); // State should now have X count=2, Y count=1 - let final_state = get_current_state_from_btree(&agg, &pager, &mut cursor); + let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); let x = final_state .changes .iter() diff --git a/core/incremental/persistence.rs b/core/incremental/persistence.rs index 381b406aa..0d3425404 100644 --- a/core/incremental/persistence.rs +++ b/core/incremental/persistence.rs @@ -1,7 +1,7 @@ -use crate::incremental::operator::{AggregateFunction, AggregateState}; +use crate::incremental::operator::{AggregateFunction, AggregateState, DbspStateCursors}; use crate::storage::btree::{BTreeCursor, BTreeKey}; -use crate::types::{IOResult, SeekKey, SeekOp, SeekResult}; -use crate::{return_if_io, Result, Value}; +use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult}; +use crate::{return_if_io, LimboError, Result, Value}; #[derive(Debug, Default)] pub enum ReadRecord { @@ -32,21 +32,22 @@ impl ReadRecord { } else { let record = return_if_io!(cursor.record()); let r = record.ok_or_else(|| { - crate::LimboError::InternalError(format!( + LimboError::InternalError(format!( "Found key {key:?} in aggregate storage but could not read record" )) })?; let values = r.get_values(); - let blob = values[1].to_owned(); + // The blob is in column 3: operator_id, zset_id, element_id, value, weight + let blob = values[3].to_owned(); let (state, _group_key) = match blob { Value::Blob(blob) => AggregateState::from_blob(&blob, aggregates) .ok_or_else(|| { - crate::LimboError::InternalError(format!( + LimboError::InternalError(format!( "Cannot deserialize aggregate state {blob:?}", )) }), - _ => Err(crate::LimboError::ParseError( + _ => Err(LimboError::ParseError( "Value in aggregator not blob".to_string(), )), }?; @@ -63,8 +64,22 @@ impl ReadRecord { pub enum WriteRow { #[default] GetRecord, - Delete, - Insert { + Delete { + rowid: i64, + }, + DeleteIndex, + ComputeNewRowId { + final_weight: isize, + }, + InsertNew { + rowid: i64, + final_weight: isize, + }, + InsertIndex { + rowid: i64, + }, + UpdateExisting { + rowid: i64, final_weight: isize, }, Done, @@ -75,97 +90,192 @@ impl WriteRow { Self::default() } - /// Write a row with weight management. + /// Write a row with weight management using index for lookups. /// /// # Arguments - /// * `cursor` - BTree cursor for the storage - /// * `key` - The key to seek (TableRowId) - /// * `build_record` - Function that builds the record values to insert. - /// Takes the final_weight and returns the complete record values. + /// * `cursors` - DBSP state cursors (table and index) + /// * `index_key` - The key to seek in the index + /// * `record_values` - The record values (without weight) to insert /// * `weight` - The weight delta to apply - pub fn write_row( + pub fn write_row( &mut self, - cursor: &mut BTreeCursor, - key: SeekKey, - build_record: F, + cursors: &mut DbspStateCursors, + index_key: Vec, + record_values: Vec, weight: isize, - ) -> Result> - where - F: Fn(isize) -> Vec, - { + ) -> Result> { loop { match self { WriteRow::GetRecord => { - let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); + // First, seek in the index to find if the row exists + let index_values = index_key.clone(); + let index_record = + ImmutableRecord::from_values(&index_values, index_values.len()); + + let res = return_if_io!(cursors.index_cursor.seek( + SeekKey::IndexKey(&index_record), + SeekOp::GE { eq_only: true } + )); + if !matches!(res, SeekResult::Found) { - *self = WriteRow::Insert { + // Row doesn't exist, we'll insert a new one + *self = WriteRow::ComputeNewRowId { final_weight: weight, }; } else { - let existing_record = return_if_io!(cursor.record()); + // Found in index, get the rowid it points to + let rowid = return_if_io!(cursors.index_cursor.rowid()); + let rowid = rowid.ok_or_else(|| { + LimboError::InternalError( + "Index cursor does not have a valid rowid".to_string(), + ) + })?; + + // Now seek in the table using the rowid + let table_res = return_if_io!(cursors + .table_cursor + .seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true })); + + if !matches!(table_res, SeekResult::Found) { + return Err(LimboError::InternalError( + "Index points to non-existent table row".to_string(), + )); + } + + let existing_record = return_if_io!(cursors.table_cursor.record()); let r = existing_record.ok_or_else(|| { - crate::LimboError::InternalError(format!( - "Found key {key:?} in storage but could not read record" - )) + LimboError::InternalError( + "Found rowid in table but could not read record".to_string(), + ) })?; let values = r.get_values(); - // Weight is always the last value - let existing_weight = match values.last() { + // Weight is always the last value (column 4 in our 5-column structure) + let existing_weight = match values.get(4) { Some(val) => match val.to_owned() { Value::Integer(w) => w as isize, _ => { - return Err(crate::LimboError::InternalError(format!( - "Invalid weight value in storage for key {key:?}" - ))) + return Err(LimboError::InternalError( + "Invalid weight value in storage".to_string(), + )) } }, None => { - return Err(crate::LimboError::InternalError(format!( - "No weight value found in storage for key {key:?}" - ))) + return Err(LimboError::InternalError( + "No weight value found in storage".to_string(), + )) } }; let final_weight = existing_weight + weight; if final_weight <= 0 { - *self = WriteRow::Delete + *self = WriteRow::Delete { rowid } } else { - *self = WriteRow::Insert { final_weight } + // Store the rowid for update + *self = WriteRow::UpdateExisting { + rowid, + final_weight, + } } } } - WriteRow::Delete => { + WriteRow::Delete { rowid } => { + // Seek to the row and delete it + return_if_io!(cursors + .table_cursor + .seek(SeekKey::TableRowId(*rowid), SeekOp::GE { eq_only: true })); + + // Transition to DeleteIndex to also delete the index entry + *self = WriteRow::DeleteIndex; + return_if_io!(cursors.table_cursor.delete()); + } + WriteRow::DeleteIndex => { // Mark as Done before delete to avoid retry on I/O *self = WriteRow::Done; - return_if_io!(cursor.delete()); + return_if_io!(cursors.index_cursor.delete()); } - WriteRow::Insert { final_weight } => { - return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); - - // Extract the row ID from the key - let key_i64 = match key { - SeekKey::TableRowId(id) => id, - _ => { - return Err(crate::LimboError::InternalError( - "Expected TableRowId for storage".to_string(), - )) + WriteRow::ComputeNewRowId { final_weight } => { + // Find the last rowid to compute the next one + return_if_io!(cursors.table_cursor.last()); + let rowid = if cursors.table_cursor.is_empty() { + 1 + } else { + match return_if_io!(cursors.table_cursor.rowid()) { + Some(id) => id + 1, + None => { + return Err(LimboError::InternalError( + "Table cursor has rows but no valid rowid".to_string(), + )) + } } }; - // Build the record values using the provided function - let record_values = build_record(*final_weight); + // Transition to InsertNew with the computed rowid + *self = WriteRow::InsertNew { + rowid, + final_weight: *final_weight, + }; + } + WriteRow::InsertNew { + rowid, + final_weight, + } => { + let rowid_val = *rowid; + let final_weight_val = *final_weight; + + // Seek to where we want to insert + // The insert will position the cursor correctly + return_if_io!(cursors.table_cursor.seek( + SeekKey::TableRowId(rowid_val), + SeekOp::GE { eq_only: false } + )); + + // Build the complete record with weight + // Use the function parameter record_values directly + let mut complete_record = record_values.clone(); + complete_record.push(Value::Integer(final_weight_val as i64)); // Create an ImmutableRecord from the values - let immutable_record = crate::types::ImmutableRecord::from_values( - &record_values, - record_values.len(), - ); - let btree_key = BTreeKey::new_table_rowid(key_i64, Some(&immutable_record)); + let immutable_record = + ImmutableRecord::from_values(&complete_record, complete_record.len()); + let btree_key = BTreeKey::new_table_rowid(rowid_val, Some(&immutable_record)); + + // Transition to InsertIndex state after table insertion + *self = WriteRow::InsertIndex { rowid: rowid_val }; + return_if_io!(cursors.table_cursor.insert(&btree_key)); + } + WriteRow::InsertIndex { rowid } => { + // For has_rowid indexes, we need to append the rowid to the index key + // Use the function parameter index_key directly + let mut index_values = index_key.clone(); + index_values.push(Value::Integer(*rowid)); + + // Create the index record with the rowid appended + let index_record = + ImmutableRecord::from_values(&index_values, index_values.len()); + let index_btree_key = BTreeKey::new_index_key(&index_record); + + // Mark as Done before index insert to avoid retry on I/O + *self = WriteRow::Done; + return_if_io!(cursors.index_cursor.insert(&index_btree_key)); + } + WriteRow::UpdateExisting { + rowid, + final_weight, + } => { + // Build the complete record with weight + let mut complete_record = record_values.clone(); + complete_record.push(Value::Integer(*final_weight as i64)); + + // Create an ImmutableRecord from the values + let immutable_record = + ImmutableRecord::from_values(&complete_record, complete_record.len()); + let btree_key = BTreeKey::new_table_rowid(*rowid, Some(&immutable_record)); // Mark as Done before insert to avoid retry on I/O *self = WriteRow::Done; - return_if_io!(cursor.insert(&btree_key)); + // BTree insert with existing key will replace the old value + return_if_io!(cursors.table_cursor.insert(&btree_key)); } WriteRow::Done => { return Ok(IOResult::Done(())); diff --git a/core/incremental/view.rs b/core/incremental/view.rs index bdcabd7c9..591e95e38 100644 --- a/core/incremental/view.rs +++ b/core/incremental/view.rs @@ -206,6 +206,7 @@ impl IncrementalView { schema: &Schema, main_data_root: usize, internal_state_root: usize, + internal_state_index_root: usize, ) -> Result { // Build the logical plan from the SELECT statement let mut builder = LogicalPlanBuilder::new(schema); @@ -214,7 +215,11 @@ impl IncrementalView { let logical_plan = builder.build_statement(&stmt)?; // Compile the logical plan to a DBSP circuit with the storage roots - let compiler = DbspCompiler::new(main_data_root, internal_state_root); + let compiler = DbspCompiler::new( + main_data_root, + internal_state_root, + internal_state_index_root, + ); let circuit = compiler.compile(&logical_plan)?; Ok(circuit) @@ -271,6 +276,7 @@ impl IncrementalView { schema: &Schema, main_data_root: usize, internal_state_root: usize, + internal_state_index_root: usize, ) -> Result { let mut parser = Parser::new(sql.as_bytes()); let cmd = parser.next_cmd()?; @@ -287,6 +293,7 @@ impl IncrementalView { schema, main_data_root, internal_state_root, + internal_state_index_root, ), _ => Err(LimboError::ParseError(format!( "View is not a CREATE MATERIALIZED VIEW statement: {sql}" @@ -300,6 +307,7 @@ impl IncrementalView { schema: &Schema, main_data_root: usize, internal_state_root: usize, + internal_state_index_root: usize, ) -> Result { let name = view_name.name.as_str().to_string(); @@ -327,6 +335,7 @@ impl IncrementalView { schema, main_data_root, internal_state_root, + internal_state_index_root, ) } @@ -340,13 +349,19 @@ impl IncrementalView { schema: &Schema, main_data_root: usize, internal_state_root: usize, + internal_state_index_root: usize, ) -> Result { // Create the tracker that will be shared by all operators let tracker = Arc::new(Mutex::new(ComputationTracker::new())); // Compile the SELECT statement into a DBSP circuit - let circuit = - Self::try_compile_circuit(&select_stmt, schema, main_data_root, internal_state_root)?; + let circuit = Self::try_compile_circuit( + &select_stmt, + schema, + main_data_root, + internal_state_root, + internal_state_index_root, + )?; Ok(Self { name, diff --git a/core/schema.rs b/core/schema.rs index b849586aa..cb2817d77 100644 --- a/core/schema.rs +++ b/core/schema.rs @@ -306,6 +306,8 @@ impl Schema { // Store DBSP state table root pages: view_name -> dbsp_state_root_page let mut dbsp_state_roots: HashMap = HashMap::new(); + // Store DBSP state table index root pages: view_name -> dbsp_state_index_root_page + let mut dbsp_state_index_roots: HashMap = HashMap::new(); // Store materialized view info (SQL and root page) for later creation let mut materialized_view_info: HashMap = HashMap::new(); @@ -357,6 +359,7 @@ impl Schema { &mut from_sql_indexes, &mut automatic_indices, &mut dbsp_state_roots, + &mut dbsp_state_index_roots, &mut materialized_view_info, )?; drop(record_cursor); @@ -369,7 +372,11 @@ impl Schema { self.populate_indices(from_sql_indexes, automatic_indices)?; - self.populate_materialized_views(materialized_view_info, dbsp_state_roots)?; + self.populate_materialized_views( + materialized_view_info, + dbsp_state_roots, + dbsp_state_index_roots, + )?; Ok(()) } @@ -492,6 +499,7 @@ impl Schema { &mut self, materialized_view_info: std::collections::HashMap, dbsp_state_roots: std::collections::HashMap, + dbsp_state_index_roots: std::collections::HashMap, ) -> Result<()> { for (view_name, (sql, main_root)) in materialized_view_info { // Look up the DBSP state root for this view - must exist for materialized views @@ -501,9 +509,17 @@ impl Schema { )) })?; - // Create the IncrementalView with both root pages - let incremental_view = - IncrementalView::from_sql(&sql, self, main_root, *dbsp_state_root)?; + // Look up the DBSP state index root (may not exist for older schemas) + let dbsp_state_index_root = + dbsp_state_index_roots.get(&view_name).copied().unwrap_or(0); + // Create the IncrementalView with all root pages + let incremental_view = IncrementalView::from_sql( + &sql, + self, + main_root, + *dbsp_state_root, + dbsp_state_index_root, + )?; let referenced_tables = incremental_view.get_referenced_table_names(); // Create a BTreeTable for the materialized view @@ -539,6 +555,7 @@ impl Schema { from_sql_indexes: &mut Vec, automatic_indices: &mut std::collections::HashMap>, dbsp_state_roots: &mut std::collections::HashMap, + dbsp_state_index_roots: &mut std::collections::HashMap, materialized_view_info: &mut std::collections::HashMap, ) -> Result<()> { match ty { @@ -593,12 +610,23 @@ impl Schema { // index|sqlite_autoindex_foo_1|foo|3| let index_name = name.to_string(); let table_name = table_name.to_string(); - match automatic_indices.entry(table_name) { - std::collections::hash_map::Entry::Vacant(e) => { - e.insert(vec![(index_name, root_page as usize)]); - } - std::collections::hash_map::Entry::Occupied(mut e) => { - e.get_mut().push((index_name, root_page as usize)); + + // Check if this is an index for a DBSP state table + if table_name.starts_with(DBSP_TABLE_PREFIX) { + // Extract the view name from __turso_internal_dbsp_state_ + let view_name = table_name + .strip_prefix(DBSP_TABLE_PREFIX) + .unwrap() + .to_string(); + dbsp_state_index_roots.insert(view_name, root_page as usize); + } else { + match automatic_indices.entry(table_name) { + std::collections::hash_map::Entry::Vacant(e) => { + e.insert(vec![(index_name, root_page as usize)]); + } + std::collections::hash_map::Entry::Occupied(mut e) => { + e.get_mut().push((index_name, root_page as usize)); + } } } } diff --git a/core/translate/view.rs b/core/translate/view.rs index afcef3331..f89f29817 100644 --- a/core/translate/view.rs +++ b/core/translate/view.rs @@ -2,7 +2,7 @@ use crate::schema::{Schema, DBSP_TABLE_PREFIX}; use crate::storage::pager::CreateBTreeFlags; use crate::translate::emitter::Resolver; use crate::translate::schema::{emit_schema_entry, SchemaEntryType, SQLITE_TABLEID}; -use crate::util::normalize_ident; +use crate::util::{normalize_ident, PRIMARY_KEY_AUTOMATIC_INDEX_NAME_PREFIX}; use crate::vdbe::builder::{CursorType, ProgramBuilder}; use crate::vdbe::insn::{CmpInsFlags, Cookie, Insn, RegisterOrLiteral}; use crate::{Connection, Result, SymbolTable}; @@ -141,7 +141,20 @@ pub fn translate_create_materialized_view( // Add the DBSP state table to sqlite_master (required for materialized views) let dbsp_table_name = format!("{DBSP_TABLE_PREFIX}{normalized_view_name}"); - let dbsp_sql = format!("CREATE TABLE {dbsp_table_name} (key INTEGER PRIMARY KEY, state BLOB)"); + // The element_id column uses SQLite's dynamic typing system to store different value types: + // - For hash-based operators (joins, filters): stores INTEGER hash values or rowids + // - For future MIN/MAX operators: stores the actual values being compared (INTEGER, REAL, TEXT, BLOB) + // SQLite's type affinity and sorting rules ensure correct ordering within each operator's data + let dbsp_sql = format!( + "CREATE TABLE {dbsp_table_name} (\ + operator_id INTEGER NOT NULL, \ + zset_id INTEGER NOT NULL, \ + element_id NOT NULL, \ + value BLOB, \ + weight INTEGER NOT NULL, \ + PRIMARY KEY (operator_id, zset_id, element_id)\ + )" + ); emit_schema_entry( &mut program, @@ -155,11 +168,37 @@ pub fn translate_create_materialized_view( Some(dbsp_sql), )?; + // Create automatic primary key index for the DBSP table + // Since the table has PRIMARY KEY (operator_id, zset_id, element_id), we need an index + let dbsp_index_root_reg = program.alloc_register(); + program.emit_insn(Insn::CreateBtree { + db: 0, + root: dbsp_index_root_reg, + flags: CreateBTreeFlags::new_index(), + }); + + // Register the index in sqlite_schema + let dbsp_index_name = format!( + "{}{}_1", + PRIMARY_KEY_AUTOMATIC_INDEX_NAME_PREFIX, &dbsp_table_name + ); + emit_schema_entry( + &mut program, + &resolver, + sqlite_schema_cursor_id, + None, // cdc_table_cursor_id + SchemaEntryType::Index, + &dbsp_index_name, + &dbsp_table_name, + dbsp_index_root_reg, + None, // Automatic indexes don't store SQL + )?; + // Parse schema to load the new view and DBSP state table program.emit_insn(Insn::ParseSchema { db: sqlite_schema_cursor_id, where_clause: Some(format!( - "name = '{normalized_view_name}' OR name = '{dbsp_table_name}'" + "name = '{normalized_view_name}' OR name = '{dbsp_table_name}' OR name = '{dbsp_index_name}'" )), }); diff --git a/core/util.rs b/core/util.rs index 5ec33a7e2..1c4217a66 100644 --- a/core/util.rs +++ b/core/util.rs @@ -163,6 +163,9 @@ pub fn parse_schema_rows( // Store DBSP state table root pages: view_name -> dbsp_state_root_page let mut dbsp_state_roots: std::collections::HashMap = std::collections::HashMap::new(); + // Store DBSP state table index root pages: view_name -> dbsp_state_index_root_page + let mut dbsp_state_index_roots: std::collections::HashMap = + std::collections::HashMap::new(); // Store materialized view info (SQL and root page) for later creation let mut materialized_view_info: std::collections::HashMap = std::collections::HashMap::new(); @@ -185,8 +188,9 @@ pub fn parse_schema_rows( &mut from_sql_indexes, &mut automatic_indices, &mut dbsp_state_roots, + &mut dbsp_state_index_roots, &mut materialized_view_info, - )?; + )? } StepResult::IO => { // TODO: How do we ensure that the I/O we submitted to @@ -200,7 +204,11 @@ pub fn parse_schema_rows( } schema.populate_indices(from_sql_indexes, automatic_indices)?; - schema.populate_materialized_views(materialized_view_info, dbsp_state_roots)?; + schema.populate_materialized_views( + materialized_view_info, + dbsp_state_roots, + dbsp_state_index_roots, + )?; Ok(()) }