From 565c2a698af7a99fa39b7235a63ca23cff651efe Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 27 Aug 2025 10:38:11 -0500 Subject: [PATCH] adjust views to use circuits --- core/incremental/view.rs | 861 ++++++--------------------------------- 1 file changed, 128 insertions(+), 733 deletions(-) diff --git a/core/incremental/view.rs b/core/incremental/view.rs index 7033fe83c..bcae0afae 100644 --- a/core/incremental/view.rs +++ b/core/incremental/view.rs @@ -1,13 +1,12 @@ +use super::compiler::{DbspCircuit, DbspCompiler, DeltaSet}; use super::dbsp::{RowKeyStream, RowKeyZSet}; -use super::operator::{ - AggregateFunction, AggregateOperator, ComputationTracker, Delta, FilterOperator, - FilterPredicate, IncrementalOperator, ProjectOperator, -}; +use super::operator::{ComputationTracker, Delta, FilterPredicate}; use crate::schema::{BTreeTable, Column, Schema}; +use crate::translate::logical::LogicalPlanBuilder; use crate::types::{IOCompletions, IOResult, Value}; -use crate::util::{extract_column_name_from_expr, extract_view_columns}; +use crate::util::extract_view_columns; use crate::{io_yield_one, Completion, LimboError, Result, Statement}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::fmt; use std::sync::{Arc, Mutex}; use turso_parser::ast; @@ -60,8 +59,7 @@ pub struct ViewTransactionState { /// for large aggregations, because then we don't have to re-compute when opening the database /// again. /// -/// Right now we are supporting the simplest views by keeping the operators in the view and -/// applying them in a sane order. But the general solution would turn this into a DBSP circuit. +/// Uses DBSP circuits for incremental computation. #[derive(Debug)] pub struct IncrementalView { // Stream of row keys for this view @@ -75,12 +73,11 @@ pub struct IncrementalView { // The SELECT statement that defines how to transform input data pub select_stmt: ast::Select, - // Internal filter operator for predicate evaluation - filter_operator: Option, - // Internal project operator for value transformation - project_operator: Option, - // Internal aggregate operator for GROUP BY and aggregations - aggregate_operator: Option, + // DBSP circuit that encapsulates the computation + circuit: DbspCircuit, + // Track whether circuit has been initialized with data + circuit_initialized: bool, + // Tables referenced by this view (extracted from FROM clause and JOINs) base_table: Arc, // The view's output columns with their types @@ -108,6 +105,25 @@ impl IncrementalView { Ok(()) } + /// Try to compile the SELECT statement into a DBSP circuit + fn try_compile_circuit( + select: &ast::Select, + schema: &Schema, + _base_table: &Arc, + ) -> Result { + // Build the logical plan from the SELECT statement + let mut builder = LogicalPlanBuilder::new(schema); + // Convert Select to a Stmt for the builder + let stmt = ast::Stmt::Select(select.clone()); + let logical_plan = builder.build_statement(&stmt)?; + + // Compile the logical plan to a DBSP circuit + let compiler = DbspCompiler::new(); + let circuit = compiler.compile(&logical_plan)?; + + Ok(circuit) + } + /// Get an iterator over column names, using enumerated naming for unnamed columns pub fn column_names(&self) -> impl Iterator + '_ { self.columns.iter().enumerate().map(|(i, col)| { @@ -136,14 +152,6 @@ impl IncrementalView { false } - /// Apply filter operator to check if values pass the view's WHERE clause - fn apply_filter(&self, values: &[Value]) -> bool { - if let Some(ref filter_op) = self.filter_operator { - filter_op.evaluate_predicate(values) - } else { - true - } - } pub fn from_sql(sql: &str, schema: &Schema) -> Result { let mut parser = Parser::new(sql.as_bytes()); let cmd = parser.next_cmd()?; @@ -173,10 +181,6 @@ impl IncrementalView { // Extract output columns using the shared function let view_columns = extract_view_columns(&select, schema); - // Extract GROUP BY columns and aggregate functions - let (group_by_columns, aggregate_functions, _old_output_names) = - Self::extract_aggregation_info(&select); - let (join_tables, join_condition) = Self::extract_join_info(&select); if join_tables.is_some() || join_condition.is_some() { return Err(LimboError::ParseError( @@ -199,105 +203,43 @@ impl IncrementalView { )); }; - let base_table_column_names = base_table - .columns - .iter() - .enumerate() - .map(|(i, col)| col.name.clone().unwrap_or_else(|| format!("column_{i}"))) - .collect(); - Self::new( name, - Vec::new(), // Empty initial data where_predicate, select.clone(), base_table, - base_table_column_names, view_columns, - group_by_columns, - aggregate_functions, schema, ) } - #[allow(clippy::too_many_arguments)] pub fn new( name: String, - initial_data: Vec<(i64, Vec)>, where_predicate: FilterPredicate, select_stmt: ast::Select, base_table: Arc, - base_table_column_names: Vec, columns: Vec, - group_by_columns: Vec, - aggregate_functions: Vec, schema: &Schema, ) -> Result { - let mut records = BTreeMap::new(); - - for (row_key, values) in initial_data { - records.insert(row_key, values); - } - - // Create initial stream with row keys - let mut zset = RowKeyZSet::new(); - for (row_key, values) in &records { - use crate::incremental::hashable_row::HashableRow; - let row = HashableRow::new(*row_key, values.clone()); - zset.insert(row, 1); - } + let records = BTreeMap::new(); // Create the tracker that will be shared by all operators let tracker = Arc::new(Mutex::new(ComputationTracker::new())); - // Create filter operator if we have a predicate - let filter_operator = if !matches!(where_predicate, FilterPredicate::None) { - let mut filter_op = - FilterOperator::new(where_predicate.clone(), base_table_column_names.clone()); - filter_op.set_tracker(tracker.clone()); - Some(filter_op) - } else { - None - }; + // Compile the SELECT statement into a DBSP circuit + let circuit = Self::try_compile_circuit(&select_stmt, schema, &base_table)?; - // Check if this is an aggregated view - let is_aggregated = !group_by_columns.is_empty() || !aggregate_functions.is_empty(); - - // Create aggregate operator if needed - let aggregate_operator = if is_aggregated { - let mut agg_op = AggregateOperator::new( - group_by_columns, - aggregate_functions, - base_table_column_names.clone(), - ); - agg_op.set_tracker(tracker.clone()); - Some(agg_op) - } else { - None - }; - - // Only create project operator for non-aggregated views - let project_operator = if !is_aggregated { - let mut proj_op = ProjectOperator::from_select( - &select_stmt, - base_table_column_names.clone(), - schema, - )?; - proj_op.set_tracker(tracker.clone()); - Some(proj_op) - } else { - None - }; + // Circuit will be initialized when we first call merge_delta + let circuit_initialized = false; Ok(Self { - stream: RowKeyStream::from_zset(zset), + stream: RowKeyStream::from_zset(RowKeyZSet::new()), name, records, where_predicate, select_stmt, - filter_operator, - project_operator, - aggregate_operator, + circuit, + circuit_initialized, base_table, columns, populate_state: PopulateState::Start, @@ -338,46 +280,28 @@ impl IncrementalView { // Get the base table from referenced tables let table = &self.base_table; - // Build column list for SELECT clause - let select_columns = if let Some(ref project_op) = self.project_operator { - // Get the columns used by the projection operator - let mut columns = Vec::new(); - for col in project_op.columns() { - // Check if it's a simple column reference - if let turso_parser::ast::Expr::Id(name) = &col.expr { - columns.push(name.as_str().to_string()); - } else { - // For expressions, we need all columns (for now) - columns.clear(); - columns.push("*".to_string()); - break; - } - } - if columns.is_empty() || columns.contains(&"*".to_string()) { - "*".to_string() - } else { - // Add the columns and always include rowid - columns.join(", ").to_string() - } - } else { - // No projection, use all columns + // Check if the table has a rowid alias (INTEGER PRIMARY KEY column) + let has_rowid_alias = table.columns.iter().any(|col| col.is_rowid_alias); + + // For now, select all columns since we don't have the static operators + // The circuit will handle filtering and projection + // If there's a rowid alias, we don't need to select rowid separately + let select_clause = if has_rowid_alias { "*".to_string() + } else { + "*, rowid".to_string() }; - // Build WHERE clause from filter operator - let where_clause = if let Some(ref filter_op) = self.filter_operator { - self.build_where_clause(filter_op.predicate())? - } else { - String::new() - }; + // Build WHERE clause from the where_predicate + let where_clause = self.build_where_clause(&self.where_predicate)?; // Construct the final query let query = if where_clause.is_empty() { - format!("SELECT {}, rowid FROM {}", select_columns, table.name) + format!("SELECT {} FROM {}", select_clause, table.name) } else { format!( - "SELECT {}, rowid FROM {} WHERE {}", - select_columns, table.name, where_clause + "SELECT {} FROM {} WHERE {}", + select_clause, table.name, where_clause ) }; Ok(query) @@ -494,20 +418,40 @@ impl IncrementalView { let all_values: Vec = row.get_values().cloned().collect(); - // The last value should be the rowid - let rowid = match all_values.last() { - Some(crate::types::Value::Integer(id)) => *id, - _ => { - // This shouldn't happen - rowid must be an integer - *rows_processed += 1; - batch_count += 1; - continue; - } + // Determine how to extract the rowid + // If there's a rowid alias (INTEGER PRIMARY KEY), the rowid is one of the columns + // Otherwise, it's the last value we explicitly selected + let (rowid, values) = if let Some((idx, _)) = + self.base_table.get_rowid_alias_column() + { + // The rowid is the value at the rowid alias column index + let rowid = match all_values.get(idx) { + Some(crate::types::Value::Integer(id)) => *id, + _ => { + // This shouldn't happen - rowid alias must be an integer + *rows_processed += 1; + batch_count += 1; + continue; + } + }; + // All values are table columns (no separate rowid was selected) + (rowid, all_values) + } else { + // The last value is the explicitly selected rowid + let rowid = match all_values.last() { + Some(crate::types::Value::Integer(id)) => *id, + _ => { + // This shouldn't happen - rowid must be an integer + *rows_processed += 1; + batch_count += 1; + continue; + } + }; + // Get all values except the rowid + let values = all_values[..all_values.len() - 1].to_vec(); + (rowid, values) }; - // Get all values except the rowid - let values = all_values[..all_values.len() - 1].to_vec(); - // Add to batch delta - let merge_delta handle filtering and aggregation batch_delta.insert(rowid, values); @@ -542,120 +486,6 @@ impl IncrementalView { } } - /// Extract GROUP BY columns and aggregate functions from SELECT statement - fn extract_aggregation_info( - select: &ast::Select, - ) -> (Vec, Vec, Vec) { - use turso_parser::ast::*; - - let mut group_by_columns = Vec::new(); - let mut aggregate_functions = Vec::new(); - let mut output_column_names = Vec::new(); - - if let OneSelect::Select { - ref group_by, - ref columns, - .. - } = select.body.select - { - // Extract GROUP BY columns - if let Some(group_by) = group_by { - for expr in &group_by.exprs { - if let Some(col_name) = extract_column_name_from_expr(expr) { - group_by_columns.push(col_name); - } - } - } - - // Extract aggregate functions and column names/aliases from SELECT list - for result_col in columns { - match result_col { - ResultColumn::Expr(expr, alias) => { - // Extract aggregate functions - let mut found_aggregates = Vec::new(); - Self::extract_aggregates_from_expr(expr, &mut found_aggregates); - - // Determine the output column name - let col_name = if let Some(As::As(alias_name)) = alias { - // Use the provided alias - alias_name.as_str().to_string() - } else if !found_aggregates.is_empty() { - // Use the default name from the aggregate function - found_aggregates[0].default_output_name() - } else if let Some(name) = extract_column_name_from_expr(expr) { - // Use the column name - name - } else { - // Fallback to a generic name - format!("column{}", output_column_names.len() + 1) - }; - - output_column_names.push(col_name); - aggregate_functions.extend(found_aggregates); - } - ResultColumn::Star => { - // For SELECT *, we'd need to know the base table columns - // This is handled elsewhere - } - ResultColumn::TableStar(_) => { - // Similar to Star, but for a specific table - } - } - } - } - - (group_by_columns, aggregate_functions, output_column_names) - } - - /// Recursively extract aggregate functions from an expression - fn extract_aggregates_from_expr( - expr: &ast::Expr, - aggregate_functions: &mut Vec, - ) { - use crate::function::Func; - use turso_parser::ast::*; - - match expr { - // Handle COUNT(*) and similar aggregate functions with * - Expr::FunctionCallStar { name, .. } => { - // FunctionCallStar is typically COUNT(*), which has 0 args - if let Ok(func) = Func::resolve_function(name.as_str(), 0) { - // Use the centralized mapping from operator.rs - // For COUNT(*), we pass None as the input column - if let Some(agg_func) = AggregateFunction::from_sql_function(&func, None) { - aggregate_functions.push(agg_func); - } - } - } - Expr::FunctionCall { name, args, .. } => { - // Regular function calls with arguments - let arg_count = args.len(); - - if let Ok(func) = Func::resolve_function(name.as_str(), arg_count) { - // Extract the input column if there's an argument - let input_column = if arg_count > 0 { - args.first().and_then(extract_column_name_from_expr) - } else { - None - }; - - // Use the centralized mapping from operator.rs - if let Some(agg_func) = - AggregateFunction::from_sql_function(&func, input_column) - { - aggregate_functions.push(agg_func); - } - } - } - // Recursively check binary expressions, etc. - Expr::Binary(left, _, right) => { - Self::extract_aggregates_from_expr(left, aggregate_functions); - Self::extract_aggregates_from_expr(right, aggregate_functions); - } - _ => {} - } - } - /// Extract JOIN information from SELECT statement #[allow(clippy::type_complexity)] pub fn extract_join_info( @@ -743,50 +573,36 @@ impl IncrementalView { /// Get current data merged with transaction state pub fn current_data(&self, tx_state: Option<&ViewTransactionState>) -> Vec<(i64, Vec)> { - // Start with committed records - if let Some(tx_state) = tx_state { - // processed_delta = input delta for now. Need to apply operations - let processed_delta = &tx_state.delta; + // Use circuit to process uncommitted changes + let mut uncommitted = DeltaSet::new(); + uncommitted.insert(self.base_table.name.clone(), tx_state.delta.clone()); - // For non-aggregation views, merge the processed delta with committed records - let mut result_map: BTreeMap> = self.records.clone(); - - for (row, weight) in &processed_delta.changes { - if *weight > 0 && self.apply_filter(&row.values) { - result_map.insert(row.rowid, row.values.clone()); - } else if *weight < 0 { - result_map.remove(&row.rowid); + // Execute with uncommitted changes (won't affect circuit state) + match self.circuit.execute(HashMap::new(), uncommitted) { + Ok(processed_delta) => { + // Merge processed delta with committed records + let mut result_map: BTreeMap> = self.records.clone(); + for (row, weight) in &processed_delta.changes { + if *weight > 0 { + result_map.insert(row.rowid, row.values.clone()); + } else if *weight < 0 { + result_map.remove(&row.rowid); + } + } + result_map.into_iter().collect() + } + Err(e) => { + // Return error or panic - no fallback + panic!("Failed to execute circuit with uncommitted data: {e:?}"); } } - - result_map.into_iter().collect() } else { // No transaction state: return committed records self.records.clone().into_iter().collect() } } - /// Apply filter operator to a delta if present and commit the changes - fn apply_filter_to_delta(&mut self, delta: Delta) -> Delta { - if let Some(ref mut filter_op) = self.filter_operator { - // Commit updates state and returns output - filter_op.commit(delta) - } else { - delta - } - } - - /// Apply aggregation operator to a delta if this is an aggregated view and commit the changes - fn apply_aggregation_to_delta(&mut self, delta: Delta) -> Delta { - if let Some(ref mut agg_op) = self.aggregate_operator { - // Commit updates state and returns output - agg_op.commit(delta) - } else { - delta - } - } - /// Merge a delta of changes into the view's current state pub fn merge_delta(&mut self, delta: &Delta) { // Early return if delta is empty @@ -794,16 +610,33 @@ impl IncrementalView { return; } - // Apply operators in pipeline - let mut current_delta = delta.clone(); - current_delta = self.apply_filter_to_delta(current_delta); + // Use the circuit to process the delta + let mut input_data = HashMap::new(); + input_data.insert(self.base_table.name.clone(), delta.clone()); - // Apply projection operator if present (for non-aggregated views) - if let Some(ref mut project_op) = self.project_operator { - current_delta = project_op.commit(current_delta); + // If circuit hasn't been initialized yet, initialize it first + // This happens during populate_from_table + if !self.circuit_initialized { + // Initialize the circuit with empty state + self.circuit + .initialize(HashMap::new()) + .expect("Failed to initialize circuit"); + self.circuit_initialized = true; } - current_delta = self.apply_aggregation_to_delta(current_delta); + // Execute the circuit to process the delta + let current_delta = match self.circuit.execute(input_data.clone(), DeltaSet::empty()) { + Ok(output) => { + // Commit the changes to the circuit's internal state + self.circuit + .commit(input_data) + .expect("Failed to commit to circuit"); + output + } + Err(e) => { + panic!("Failed to execute circuit: {e:?}"); + } + }; // Update records and stream with the processed delta let mut zset_delta = RowKeyZSet::new(); @@ -821,441 +654,3 @@ impl IncrementalView { self.stream.apply_delta(&zset_delta); } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::incremental::operator::{Delta, IncrementalOperator}; - use crate::schema::{BTreeTable, Column, Schema, Type}; - use crate::types::Value; - use std::sync::Arc; - fn create_test_schema() -> Schema { - let mut schema = Schema::new(false); - let table = BTreeTable { - root_page: 1, - name: "t".to_string(), - columns: vec![ - Column { - name: Some("a".to_string()), - ty: Type::Integer, - ty_str: "INTEGER".to_string(), - primary_key: false, - is_rowid_alias: false, - notnull: false, - default: None, - unique: false, - collation: None, - hidden: false, - }, - Column { - name: Some("b".to_string()), - ty: Type::Integer, - ty_str: "INTEGER".to_string(), - primary_key: false, - is_rowid_alias: false, - notnull: false, - default: None, - unique: false, - collation: None, - hidden: false, - }, - Column { - name: Some("c".to_string()), - ty: Type::Integer, - ty_str: "INTEGER".to_string(), - primary_key: false, - is_rowid_alias: false, - notnull: false, - default: None, - unique: false, - collation: None, - hidden: false, - }, - ], - primary_key_columns: vec![], - has_rowid: true, - is_strict: false, - unique_sets: None, - }; - schema.add_btree_table(Arc::new(table)); - schema - } - - #[test] - fn test_projection_simple_columns() { - let schema = create_test_schema(); - let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, b FROM t"; - - let view = IncrementalView::from_sql(sql, &schema).unwrap(); - - assert!(view.project_operator.is_some()); - let project_op = view.project_operator.as_ref().unwrap(); - - let mut delta = Delta::new(); - delta.insert( - 1, - vec![Value::Integer(10), Value::Integer(20), Value::Integer(30)], - ); - - let mut temp_project = project_op.clone(); - temp_project.initialize(delta); - let result = temp_project.get_current_state(); - - let (output, _weight) = result.changes.first().unwrap(); - assert_eq!(output.values, vec![Value::Integer(10), Value::Integer(20)]); - } - - #[test] - fn test_projection_arithmetic_expression() { - let schema = create_test_schema(); - let sql = "CREATE MATERIALIZED VIEW v AS SELECT a * 2 as doubled FROM t"; - - let view = IncrementalView::from_sql(sql, &schema).unwrap(); - - assert!(view.project_operator.is_some()); - let project_op = view.project_operator.as_ref().unwrap(); - - let mut delta = Delta::new(); - delta.insert( - 1, - vec![Value::Integer(4), Value::Integer(2), Value::Integer(0)], - ); - - let mut temp_project = project_op.clone(); - temp_project.initialize(delta); - let result = temp_project.get_current_state(); - - let (output, _weight) = result.changes.first().unwrap(); - assert_eq!(output.values, vec![Value::Integer(8)]); - } - - #[test] - fn test_projection_multiple_expressions() { - let schema = create_test_schema(); - let sql = "CREATE MATERIALIZED VIEW v AS SELECT a + b as sum, a - b as diff, c FROM t"; - - let view = IncrementalView::from_sql(sql, &schema).unwrap(); - - assert!(view.project_operator.is_some()); - let project_op = view.project_operator.as_ref().unwrap(); - - let mut delta = Delta::new(); - delta.insert( - 1, - vec![Value::Integer(10), Value::Integer(3), Value::Integer(7)], - ); - - let mut temp_project = project_op.clone(); - temp_project.initialize(delta); - let result = temp_project.get_current_state(); - - let (output, _weight) = result.changes.first().unwrap(); - assert_eq!( - output.values, - vec![Value::Integer(13), Value::Integer(7), Value::Integer(7),] - ); - } - - #[test] - fn test_projection_function_call() { - let schema = create_test_schema(); - let sql = "CREATE MATERIALIZED VIEW v AS SELECT abs(a - 300) as abs_diff, b FROM t"; - - let view = IncrementalView::from_sql(sql, &schema).unwrap(); - - assert!(view.project_operator.is_some()); - let project_op = view.project_operator.as_ref().unwrap(); - - let mut delta = Delta::new(); - delta.insert( - 1, - vec![Value::Integer(255), Value::Integer(20), Value::Integer(30)], - ); - - let mut temp_project = project_op.clone(); - temp_project.initialize(delta); - let result = temp_project.get_current_state(); - - let (output, _weight) = result.changes.first().unwrap(); - // abs(255 - 300) = abs(-45) = 45 - assert_eq!(output.values, vec![Value::Integer(45), Value::Integer(20),]); - } - - #[test] - fn test_projection_mixed_columns_and_expressions() { - let schema = create_test_schema(); - let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, b * 2 as doubled, c, a + b + c as total FROM t"; - - let view = IncrementalView::from_sql(sql, &schema).unwrap(); - - assert!(view.project_operator.is_some()); - let project_op = view.project_operator.as_ref().unwrap(); - - let mut delta = Delta::new(); - delta.insert( - 1, - vec![Value::Integer(1), Value::Integer(5), Value::Integer(3)], - ); - - let mut temp_project = project_op.clone(); - temp_project.initialize(delta); - let result = temp_project.get_current_state(); - - let (output, _weight) = result.changes.first().unwrap(); - assert_eq!( - output.values, - vec![ - Value::Integer(1), - Value::Integer(10), - Value::Integer(3), - Value::Integer(9), - ] - ); - } - - #[test] - fn test_projection_complex_expression() { - let schema = create_test_schema(); - let sql = "CREATE MATERIALIZED VIEW v AS SELECT (a * 2) + (b * 3) as weighted, c / 2 as half FROM t"; - - let view = IncrementalView::from_sql(sql, &schema).unwrap(); - - assert!(view.project_operator.is_some()); - let project_op = view.project_operator.as_ref().unwrap(); - - let mut delta = Delta::new(); - delta.insert( - 1, - vec![Value::Integer(5), Value::Integer(2), Value::Integer(10)], - ); - - let mut temp_project = project_op.clone(); - temp_project.initialize(delta); - let result = temp_project.get_current_state(); - - let (output, _weight) = result.changes.first().unwrap(); - assert_eq!(output.values, vec![Value::Integer(16), Value::Integer(5),]); - } - - #[test] - fn test_projection_with_where_clause() { - let schema = create_test_schema(); - let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, a * 2 as doubled FROM t WHERE b > 2"; - - let view = IncrementalView::from_sql(sql, &schema).unwrap(); - - assert!(view.project_operator.is_some()); - assert!(view.filter_operator.is_some()); - - let project_op = view.project_operator.as_ref().unwrap(); - - let mut delta = Delta::new(); - delta.insert( - 1, - vec![Value::Integer(4), Value::Integer(3), Value::Integer(0)], - ); - - let mut temp_project = project_op.clone(); - temp_project.initialize(delta); - let result = temp_project.get_current_state(); - - let (output, _weight) = result.changes.first().unwrap(); - assert_eq!(output.values, vec![Value::Integer(4), Value::Integer(8),]); - } - - #[test] - fn test_projection_more_output_columns_than_input() { - let schema = create_test_schema(); - let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, b, a * 2 as doubled_a, b * 3 as tripled_b, a + b as sum, hex(c) as hex_c FROM t"; - - let view = IncrementalView::from_sql(sql, &schema).unwrap(); - - assert!(view.project_operator.is_some()); - let project_op = view.project_operator.as_ref().unwrap(); - - let mut delta = Delta::new(); - delta.insert( - 1, - vec![Value::Integer(5), Value::Integer(2), Value::Integer(15)], - ); - - let mut temp_project = project_op.clone(); - temp_project.initialize(delta); - let result = temp_project.get_current_state(); - - let (output, _weight) = result.changes.first().unwrap(); - // 3 input columns -> 6 output columns - assert_eq!( - output.values, - vec![ - Value::Integer(5), // a - Value::Integer(2), // b - Value::Integer(10), // a * 2 - Value::Integer(6), // b * 3 - Value::Integer(7), // a + b - Value::Text("3135".into()), // hex(15) - SQLite converts to string "15" then hex encodes - ] - ); - } - - #[test] - fn test_aggregation_count_with_group_by() { - let schema = create_test_schema(); - let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, COUNT(*) FROM t GROUP BY a"; - - let mut view = IncrementalView::from_sql(sql, &schema).unwrap(); - - // Verify the view has an aggregate operator - assert!(view.aggregate_operator.is_some()); - - // Insert some test data - let mut delta = Delta::new(); - delta.insert( - 1, - vec![Value::Integer(1), Value::Integer(10), Value::Integer(100)], - ); - delta.insert( - 2, - vec![Value::Integer(2), Value::Integer(20), Value::Integer(200)], - ); - delta.insert( - 3, - vec![Value::Integer(1), Value::Integer(30), Value::Integer(300)], - ); - - // Process the delta - view.merge_delta(&delta); - - // Verify we only processed the 3 rows we inserted - assert_eq!(view.tracker.lock().unwrap().aggregation_updates, 3); - - // Check the aggregated results - let results = view.current_data(None); - - // Should have 2 groups: a=1 with count=2, a=2 with count=1 - assert_eq!(results.len(), 2); - - // Find the group with a=1 - let group1 = results - .iter() - .find(|(_, vals)| vals[0] == Value::Integer(1)) - .unwrap(); - assert_eq!(group1.1[0], Value::Integer(1)); // a=1 - assert_eq!(group1.1[1], Value::Integer(2)); // COUNT(*)=2 - - // Find the group with a=2 - let group2 = results - .iter() - .find(|(_, vals)| vals[0] == Value::Integer(2)) - .unwrap(); - assert_eq!(group2.1[0], Value::Integer(2)); // a=2 - assert_eq!(group2.1[1], Value::Integer(1)); // COUNT(*)=1 - } - - #[test] - fn test_aggregation_sum_with_filter() { - let schema = create_test_schema(); - let sql = "CREATE MATERIALIZED VIEW v AS SELECT SUM(b) FROM t WHERE a > 1"; - - let mut view = IncrementalView::from_sql(sql, &schema).unwrap(); - - assert!(view.aggregate_operator.is_some()); - assert!(view.filter_operator.is_some()); - - let mut delta = Delta::new(); - delta.insert( - 1, - vec![Value::Integer(1), Value::Integer(10), Value::Integer(100)], - ); - delta.insert( - 2, - vec![Value::Integer(2), Value::Integer(20), Value::Integer(200)], - ); - delta.insert( - 3, - vec![Value::Integer(3), Value::Integer(30), Value::Integer(300)], - ); - - view.merge_delta(&delta); - - // Should filter all 3 rows - assert_eq!(view.tracker.lock().unwrap().filter_evaluations, 3); - // But only aggregate the 2 that passed the filter (a > 1) - assert_eq!(view.tracker.lock().unwrap().aggregation_updates, 2); - - let results = view.current_data(None); - - // Should have 1 row with sum of b where a > 1 - assert_eq!(results.len(), 1); - assert_eq!(results[0].1[0], Value::Integer(50)); // SUM(b) = 20 + 30 - } - - #[test] - fn test_aggregation_incremental_updates() { - let schema = create_test_schema(); - let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, COUNT(*), SUM(b) FROM t GROUP BY a"; - - let mut view = IncrementalView::from_sql(sql, &schema).unwrap(); - - // Initial insert - let mut delta1 = Delta::new(); - delta1.insert( - 1, - vec![Value::Integer(1), Value::Integer(10), Value::Integer(100)], - ); - delta1.insert( - 2, - vec![Value::Integer(1), Value::Integer(20), Value::Integer(200)], - ); - - view.merge_delta(&delta1); - - // Verify we processed exactly 2 rows for the first batch - assert_eq!(view.tracker.lock().unwrap().aggregation_updates, 2); - - // Check initial state - let results1 = view.current_data(None); - assert_eq!(results1.len(), 1); - assert_eq!(results1[0].1[1], Value::Integer(2)); // COUNT(*)=2 - assert_eq!(results1[0].1[2], Value::Integer(30)); // SUM(b)=30 - - // Reset counter to track second batch separately - view.tracker.lock().unwrap().aggregation_updates = 0; - - // Add more data - let mut delta2 = Delta::new(); - delta2.insert( - 3, - vec![Value::Integer(1), Value::Integer(5), Value::Integer(300)], - ); - delta2.insert( - 4, - vec![Value::Integer(2), Value::Integer(15), Value::Integer(400)], - ); - - view.merge_delta(&delta2); - - // Should only process the 2 new rows, not recompute everything - assert_eq!(view.tracker.lock().unwrap().aggregation_updates, 2); - - // Check updated state - let results2 = view.current_data(None); - assert_eq!(results2.len(), 2); - - // Group a=1 - let group1 = results2 - .iter() - .find(|(_, vals)| vals[0] == Value::Integer(1)) - .unwrap(); - assert_eq!(group1.1[1], Value::Integer(3)); // COUNT(*)=3 - assert_eq!(group1.1[2], Value::Integer(35)); // SUM(b)=35 - - // Group a=2 - let group2 = results2 - .iter() - .find(|(_, vals)| vals[0] == Value::Integer(2)) - .unwrap(); - assert_eq!(group2.1[1], Value::Integer(1)); // COUNT(*)=1 - assert_eq!(group2.1[2], Value::Integer(15)); // SUM(b)=15 - } -}