mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-02 23:04:23 +01:00
adjust views to use circuits
This commit is contained in:
@@ -1,13 +1,12 @@
|
||||
use super::compiler::{DbspCircuit, DbspCompiler, DeltaSet};
|
||||
use super::dbsp::{RowKeyStream, RowKeyZSet};
|
||||
use super::operator::{
|
||||
AggregateFunction, AggregateOperator, ComputationTracker, Delta, FilterOperator,
|
||||
FilterPredicate, IncrementalOperator, ProjectOperator,
|
||||
};
|
||||
use super::operator::{ComputationTracker, Delta, FilterPredicate};
|
||||
use crate::schema::{BTreeTable, Column, Schema};
|
||||
use crate::translate::logical::LogicalPlanBuilder;
|
||||
use crate::types::{IOCompletions, IOResult, Value};
|
||||
use crate::util::{extract_column_name_from_expr, extract_view_columns};
|
||||
use crate::util::extract_view_columns;
|
||||
use crate::{io_yield_one, Completion, LimboError, Result, Statement};
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::fmt;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use turso_parser::ast;
|
||||
@@ -60,8 +59,7 @@ pub struct ViewTransactionState {
|
||||
/// for large aggregations, because then we don't have to re-compute when opening the database
|
||||
/// again.
|
||||
///
|
||||
/// Right now we are supporting the simplest views by keeping the operators in the view and
|
||||
/// applying them in a sane order. But the general solution would turn this into a DBSP circuit.
|
||||
/// Uses DBSP circuits for incremental computation.
|
||||
#[derive(Debug)]
|
||||
pub struct IncrementalView {
|
||||
// Stream of row keys for this view
|
||||
@@ -75,12 +73,11 @@ pub struct IncrementalView {
|
||||
// The SELECT statement that defines how to transform input data
|
||||
pub select_stmt: ast::Select,
|
||||
|
||||
// Internal filter operator for predicate evaluation
|
||||
filter_operator: Option<FilterOperator>,
|
||||
// Internal project operator for value transformation
|
||||
project_operator: Option<ProjectOperator>,
|
||||
// Internal aggregate operator for GROUP BY and aggregations
|
||||
aggregate_operator: Option<AggregateOperator>,
|
||||
// DBSP circuit that encapsulates the computation
|
||||
circuit: DbspCircuit,
|
||||
// Track whether circuit has been initialized with data
|
||||
circuit_initialized: bool,
|
||||
|
||||
// Tables referenced by this view (extracted from FROM clause and JOINs)
|
||||
base_table: Arc<BTreeTable>,
|
||||
// The view's output columns with their types
|
||||
@@ -108,6 +105,25 @@ impl IncrementalView {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Try to compile the SELECT statement into a DBSP circuit
|
||||
fn try_compile_circuit(
|
||||
select: &ast::Select,
|
||||
schema: &Schema,
|
||||
_base_table: &Arc<BTreeTable>,
|
||||
) -> Result<DbspCircuit> {
|
||||
// Build the logical plan from the SELECT statement
|
||||
let mut builder = LogicalPlanBuilder::new(schema);
|
||||
// Convert Select to a Stmt for the builder
|
||||
let stmt = ast::Stmt::Select(select.clone());
|
||||
let logical_plan = builder.build_statement(&stmt)?;
|
||||
|
||||
// Compile the logical plan to a DBSP circuit
|
||||
let compiler = DbspCompiler::new();
|
||||
let circuit = compiler.compile(&logical_plan)?;
|
||||
|
||||
Ok(circuit)
|
||||
}
|
||||
|
||||
/// Get an iterator over column names, using enumerated naming for unnamed columns
|
||||
pub fn column_names(&self) -> impl Iterator<Item = String> + '_ {
|
||||
self.columns.iter().enumerate().map(|(i, col)| {
|
||||
@@ -136,14 +152,6 @@ impl IncrementalView {
|
||||
false
|
||||
}
|
||||
|
||||
/// Apply filter operator to check if values pass the view's WHERE clause
|
||||
fn apply_filter(&self, values: &[Value]) -> bool {
|
||||
if let Some(ref filter_op) = self.filter_operator {
|
||||
filter_op.evaluate_predicate(values)
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
pub fn from_sql(sql: &str, schema: &Schema) -> Result<Self> {
|
||||
let mut parser = Parser::new(sql.as_bytes());
|
||||
let cmd = parser.next_cmd()?;
|
||||
@@ -173,10 +181,6 @@ impl IncrementalView {
|
||||
// Extract output columns using the shared function
|
||||
let view_columns = extract_view_columns(&select, schema);
|
||||
|
||||
// Extract GROUP BY columns and aggregate functions
|
||||
let (group_by_columns, aggregate_functions, _old_output_names) =
|
||||
Self::extract_aggregation_info(&select);
|
||||
|
||||
let (join_tables, join_condition) = Self::extract_join_info(&select);
|
||||
if join_tables.is_some() || join_condition.is_some() {
|
||||
return Err(LimboError::ParseError(
|
||||
@@ -199,105 +203,43 @@ impl IncrementalView {
|
||||
));
|
||||
};
|
||||
|
||||
let base_table_column_names = base_table
|
||||
.columns
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, col)| col.name.clone().unwrap_or_else(|| format!("column_{i}")))
|
||||
.collect();
|
||||
|
||||
Self::new(
|
||||
name,
|
||||
Vec::new(), // Empty initial data
|
||||
where_predicate,
|
||||
select.clone(),
|
||||
base_table,
|
||||
base_table_column_names,
|
||||
view_columns,
|
||||
group_by_columns,
|
||||
aggregate_functions,
|
||||
schema,
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
name: String,
|
||||
initial_data: Vec<(i64, Vec<Value>)>,
|
||||
where_predicate: FilterPredicate,
|
||||
select_stmt: ast::Select,
|
||||
base_table: Arc<BTreeTable>,
|
||||
base_table_column_names: Vec<String>,
|
||||
columns: Vec<Column>,
|
||||
group_by_columns: Vec<String>,
|
||||
aggregate_functions: Vec<AggregateFunction>,
|
||||
schema: &Schema,
|
||||
) -> Result<Self> {
|
||||
let mut records = BTreeMap::new();
|
||||
|
||||
for (row_key, values) in initial_data {
|
||||
records.insert(row_key, values);
|
||||
}
|
||||
|
||||
// Create initial stream with row keys
|
||||
let mut zset = RowKeyZSet::new();
|
||||
for (row_key, values) in &records {
|
||||
use crate::incremental::hashable_row::HashableRow;
|
||||
let row = HashableRow::new(*row_key, values.clone());
|
||||
zset.insert(row, 1);
|
||||
}
|
||||
let records = BTreeMap::new();
|
||||
|
||||
// Create the tracker that will be shared by all operators
|
||||
let tracker = Arc::new(Mutex::new(ComputationTracker::new()));
|
||||
|
||||
// Create filter operator if we have a predicate
|
||||
let filter_operator = if !matches!(where_predicate, FilterPredicate::None) {
|
||||
let mut filter_op =
|
||||
FilterOperator::new(where_predicate.clone(), base_table_column_names.clone());
|
||||
filter_op.set_tracker(tracker.clone());
|
||||
Some(filter_op)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// Compile the SELECT statement into a DBSP circuit
|
||||
let circuit = Self::try_compile_circuit(&select_stmt, schema, &base_table)?;
|
||||
|
||||
// Check if this is an aggregated view
|
||||
let is_aggregated = !group_by_columns.is_empty() || !aggregate_functions.is_empty();
|
||||
|
||||
// Create aggregate operator if needed
|
||||
let aggregate_operator = if is_aggregated {
|
||||
let mut agg_op = AggregateOperator::new(
|
||||
group_by_columns,
|
||||
aggregate_functions,
|
||||
base_table_column_names.clone(),
|
||||
);
|
||||
agg_op.set_tracker(tracker.clone());
|
||||
Some(agg_op)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Only create project operator for non-aggregated views
|
||||
let project_operator = if !is_aggregated {
|
||||
let mut proj_op = ProjectOperator::from_select(
|
||||
&select_stmt,
|
||||
base_table_column_names.clone(),
|
||||
schema,
|
||||
)?;
|
||||
proj_op.set_tracker(tracker.clone());
|
||||
Some(proj_op)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// Circuit will be initialized when we first call merge_delta
|
||||
let circuit_initialized = false;
|
||||
|
||||
Ok(Self {
|
||||
stream: RowKeyStream::from_zset(zset),
|
||||
stream: RowKeyStream::from_zset(RowKeyZSet::new()),
|
||||
name,
|
||||
records,
|
||||
where_predicate,
|
||||
select_stmt,
|
||||
filter_operator,
|
||||
project_operator,
|
||||
aggregate_operator,
|
||||
circuit,
|
||||
circuit_initialized,
|
||||
base_table,
|
||||
columns,
|
||||
populate_state: PopulateState::Start,
|
||||
@@ -338,46 +280,28 @@ impl IncrementalView {
|
||||
// Get the base table from referenced tables
|
||||
let table = &self.base_table;
|
||||
|
||||
// Build column list for SELECT clause
|
||||
let select_columns = if let Some(ref project_op) = self.project_operator {
|
||||
// Get the columns used by the projection operator
|
||||
let mut columns = Vec::new();
|
||||
for col in project_op.columns() {
|
||||
// Check if it's a simple column reference
|
||||
if let turso_parser::ast::Expr::Id(name) = &col.expr {
|
||||
columns.push(name.as_str().to_string());
|
||||
} else {
|
||||
// For expressions, we need all columns (for now)
|
||||
columns.clear();
|
||||
columns.push("*".to_string());
|
||||
break;
|
||||
}
|
||||
}
|
||||
if columns.is_empty() || columns.contains(&"*".to_string()) {
|
||||
"*".to_string()
|
||||
} else {
|
||||
// Add the columns and always include rowid
|
||||
columns.join(", ").to_string()
|
||||
}
|
||||
} else {
|
||||
// No projection, use all columns
|
||||
// Check if the table has a rowid alias (INTEGER PRIMARY KEY column)
|
||||
let has_rowid_alias = table.columns.iter().any(|col| col.is_rowid_alias);
|
||||
|
||||
// For now, select all columns since we don't have the static operators
|
||||
// The circuit will handle filtering and projection
|
||||
// If there's a rowid alias, we don't need to select rowid separately
|
||||
let select_clause = if has_rowid_alias {
|
||||
"*".to_string()
|
||||
} else {
|
||||
"*, rowid".to_string()
|
||||
};
|
||||
|
||||
// Build WHERE clause from filter operator
|
||||
let where_clause = if let Some(ref filter_op) = self.filter_operator {
|
||||
self.build_where_clause(filter_op.predicate())?
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
// Build WHERE clause from the where_predicate
|
||||
let where_clause = self.build_where_clause(&self.where_predicate)?;
|
||||
|
||||
// Construct the final query
|
||||
let query = if where_clause.is_empty() {
|
||||
format!("SELECT {}, rowid FROM {}", select_columns, table.name)
|
||||
format!("SELECT {} FROM {}", select_clause, table.name)
|
||||
} else {
|
||||
format!(
|
||||
"SELECT {}, rowid FROM {} WHERE {}",
|
||||
select_columns, table.name, where_clause
|
||||
"SELECT {} FROM {} WHERE {}",
|
||||
select_clause, table.name, where_clause
|
||||
)
|
||||
};
|
||||
Ok(query)
|
||||
@@ -494,20 +418,40 @@ impl IncrementalView {
|
||||
let all_values: Vec<crate::types::Value> =
|
||||
row.get_values().cloned().collect();
|
||||
|
||||
// The last value should be the rowid
|
||||
let rowid = match all_values.last() {
|
||||
Some(crate::types::Value::Integer(id)) => *id,
|
||||
_ => {
|
||||
// This shouldn't happen - rowid must be an integer
|
||||
*rows_processed += 1;
|
||||
batch_count += 1;
|
||||
continue;
|
||||
}
|
||||
// Determine how to extract the rowid
|
||||
// If there's a rowid alias (INTEGER PRIMARY KEY), the rowid is one of the columns
|
||||
// Otherwise, it's the last value we explicitly selected
|
||||
let (rowid, values) = if let Some((idx, _)) =
|
||||
self.base_table.get_rowid_alias_column()
|
||||
{
|
||||
// The rowid is the value at the rowid alias column index
|
||||
let rowid = match all_values.get(idx) {
|
||||
Some(crate::types::Value::Integer(id)) => *id,
|
||||
_ => {
|
||||
// This shouldn't happen - rowid alias must be an integer
|
||||
*rows_processed += 1;
|
||||
batch_count += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
// All values are table columns (no separate rowid was selected)
|
||||
(rowid, all_values)
|
||||
} else {
|
||||
// The last value is the explicitly selected rowid
|
||||
let rowid = match all_values.last() {
|
||||
Some(crate::types::Value::Integer(id)) => *id,
|
||||
_ => {
|
||||
// This shouldn't happen - rowid must be an integer
|
||||
*rows_processed += 1;
|
||||
batch_count += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
// Get all values except the rowid
|
||||
let values = all_values[..all_values.len() - 1].to_vec();
|
||||
(rowid, values)
|
||||
};
|
||||
|
||||
// Get all values except the rowid
|
||||
let values = all_values[..all_values.len() - 1].to_vec();
|
||||
|
||||
// Add to batch delta - let merge_delta handle filtering and aggregation
|
||||
batch_delta.insert(rowid, values);
|
||||
|
||||
@@ -542,120 +486,6 @@ impl IncrementalView {
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract GROUP BY columns and aggregate functions from SELECT statement
|
||||
fn extract_aggregation_info(
|
||||
select: &ast::Select,
|
||||
) -> (Vec<String>, Vec<AggregateFunction>, Vec<String>) {
|
||||
use turso_parser::ast::*;
|
||||
|
||||
let mut group_by_columns = Vec::new();
|
||||
let mut aggregate_functions = Vec::new();
|
||||
let mut output_column_names = Vec::new();
|
||||
|
||||
if let OneSelect::Select {
|
||||
ref group_by,
|
||||
ref columns,
|
||||
..
|
||||
} = select.body.select
|
||||
{
|
||||
// Extract GROUP BY columns
|
||||
if let Some(group_by) = group_by {
|
||||
for expr in &group_by.exprs {
|
||||
if let Some(col_name) = extract_column_name_from_expr(expr) {
|
||||
group_by_columns.push(col_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Extract aggregate functions and column names/aliases from SELECT list
|
||||
for result_col in columns {
|
||||
match result_col {
|
||||
ResultColumn::Expr(expr, alias) => {
|
||||
// Extract aggregate functions
|
||||
let mut found_aggregates = Vec::new();
|
||||
Self::extract_aggregates_from_expr(expr, &mut found_aggregates);
|
||||
|
||||
// Determine the output column name
|
||||
let col_name = if let Some(As::As(alias_name)) = alias {
|
||||
// Use the provided alias
|
||||
alias_name.as_str().to_string()
|
||||
} else if !found_aggregates.is_empty() {
|
||||
// Use the default name from the aggregate function
|
||||
found_aggregates[0].default_output_name()
|
||||
} else if let Some(name) = extract_column_name_from_expr(expr) {
|
||||
// Use the column name
|
||||
name
|
||||
} else {
|
||||
// Fallback to a generic name
|
||||
format!("column{}", output_column_names.len() + 1)
|
||||
};
|
||||
|
||||
output_column_names.push(col_name);
|
||||
aggregate_functions.extend(found_aggregates);
|
||||
}
|
||||
ResultColumn::Star => {
|
||||
// For SELECT *, we'd need to know the base table columns
|
||||
// This is handled elsewhere
|
||||
}
|
||||
ResultColumn::TableStar(_) => {
|
||||
// Similar to Star, but for a specific table
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(group_by_columns, aggregate_functions, output_column_names)
|
||||
}
|
||||
|
||||
/// Recursively extract aggregate functions from an expression
|
||||
fn extract_aggregates_from_expr(
|
||||
expr: &ast::Expr,
|
||||
aggregate_functions: &mut Vec<AggregateFunction>,
|
||||
) {
|
||||
use crate::function::Func;
|
||||
use turso_parser::ast::*;
|
||||
|
||||
match expr {
|
||||
// Handle COUNT(*) and similar aggregate functions with *
|
||||
Expr::FunctionCallStar { name, .. } => {
|
||||
// FunctionCallStar is typically COUNT(*), which has 0 args
|
||||
if let Ok(func) = Func::resolve_function(name.as_str(), 0) {
|
||||
// Use the centralized mapping from operator.rs
|
||||
// For COUNT(*), we pass None as the input column
|
||||
if let Some(agg_func) = AggregateFunction::from_sql_function(&func, None) {
|
||||
aggregate_functions.push(agg_func);
|
||||
}
|
||||
}
|
||||
}
|
||||
Expr::FunctionCall { name, args, .. } => {
|
||||
// Regular function calls with arguments
|
||||
let arg_count = args.len();
|
||||
|
||||
if let Ok(func) = Func::resolve_function(name.as_str(), arg_count) {
|
||||
// Extract the input column if there's an argument
|
||||
let input_column = if arg_count > 0 {
|
||||
args.first().and_then(extract_column_name_from_expr)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Use the centralized mapping from operator.rs
|
||||
if let Some(agg_func) =
|
||||
AggregateFunction::from_sql_function(&func, input_column)
|
||||
{
|
||||
aggregate_functions.push(agg_func);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Recursively check binary expressions, etc.
|
||||
Expr::Binary(left, _, right) => {
|
||||
Self::extract_aggregates_from_expr(left, aggregate_functions);
|
||||
Self::extract_aggregates_from_expr(right, aggregate_functions);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract JOIN information from SELECT statement
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub fn extract_join_info(
|
||||
@@ -743,50 +573,36 @@ impl IncrementalView {
|
||||
|
||||
/// Get current data merged with transaction state
|
||||
pub fn current_data(&self, tx_state: Option<&ViewTransactionState>) -> Vec<(i64, Vec<Value>)> {
|
||||
// Start with committed records
|
||||
|
||||
if let Some(tx_state) = tx_state {
|
||||
// processed_delta = input delta for now. Need to apply operations
|
||||
let processed_delta = &tx_state.delta;
|
||||
// Use circuit to process uncommitted changes
|
||||
let mut uncommitted = DeltaSet::new();
|
||||
uncommitted.insert(self.base_table.name.clone(), tx_state.delta.clone());
|
||||
|
||||
// For non-aggregation views, merge the processed delta with committed records
|
||||
let mut result_map: BTreeMap<i64, Vec<Value>> = self.records.clone();
|
||||
|
||||
for (row, weight) in &processed_delta.changes {
|
||||
if *weight > 0 && self.apply_filter(&row.values) {
|
||||
result_map.insert(row.rowid, row.values.clone());
|
||||
} else if *weight < 0 {
|
||||
result_map.remove(&row.rowid);
|
||||
// Execute with uncommitted changes (won't affect circuit state)
|
||||
match self.circuit.execute(HashMap::new(), uncommitted) {
|
||||
Ok(processed_delta) => {
|
||||
// Merge processed delta with committed records
|
||||
let mut result_map: BTreeMap<i64, Vec<Value>> = self.records.clone();
|
||||
for (row, weight) in &processed_delta.changes {
|
||||
if *weight > 0 {
|
||||
result_map.insert(row.rowid, row.values.clone());
|
||||
} else if *weight < 0 {
|
||||
result_map.remove(&row.rowid);
|
||||
}
|
||||
}
|
||||
result_map.into_iter().collect()
|
||||
}
|
||||
Err(e) => {
|
||||
// Return error or panic - no fallback
|
||||
panic!("Failed to execute circuit with uncommitted data: {e:?}");
|
||||
}
|
||||
}
|
||||
|
||||
result_map.into_iter().collect()
|
||||
} else {
|
||||
// No transaction state: return committed records
|
||||
self.records.clone().into_iter().collect()
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply filter operator to a delta if present and commit the changes
|
||||
fn apply_filter_to_delta(&mut self, delta: Delta) -> Delta {
|
||||
if let Some(ref mut filter_op) = self.filter_operator {
|
||||
// Commit updates state and returns output
|
||||
filter_op.commit(delta)
|
||||
} else {
|
||||
delta
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply aggregation operator to a delta if this is an aggregated view and commit the changes
|
||||
fn apply_aggregation_to_delta(&mut self, delta: Delta) -> Delta {
|
||||
if let Some(ref mut agg_op) = self.aggregate_operator {
|
||||
// Commit updates state and returns output
|
||||
agg_op.commit(delta)
|
||||
} else {
|
||||
delta
|
||||
}
|
||||
}
|
||||
|
||||
/// Merge a delta of changes into the view's current state
|
||||
pub fn merge_delta(&mut self, delta: &Delta) {
|
||||
// Early return if delta is empty
|
||||
@@ -794,16 +610,33 @@ impl IncrementalView {
|
||||
return;
|
||||
}
|
||||
|
||||
// Apply operators in pipeline
|
||||
let mut current_delta = delta.clone();
|
||||
current_delta = self.apply_filter_to_delta(current_delta);
|
||||
// Use the circuit to process the delta
|
||||
let mut input_data = HashMap::new();
|
||||
input_data.insert(self.base_table.name.clone(), delta.clone());
|
||||
|
||||
// Apply projection operator if present (for non-aggregated views)
|
||||
if let Some(ref mut project_op) = self.project_operator {
|
||||
current_delta = project_op.commit(current_delta);
|
||||
// If circuit hasn't been initialized yet, initialize it first
|
||||
// This happens during populate_from_table
|
||||
if !self.circuit_initialized {
|
||||
// Initialize the circuit with empty state
|
||||
self.circuit
|
||||
.initialize(HashMap::new())
|
||||
.expect("Failed to initialize circuit");
|
||||
self.circuit_initialized = true;
|
||||
}
|
||||
|
||||
current_delta = self.apply_aggregation_to_delta(current_delta);
|
||||
// Execute the circuit to process the delta
|
||||
let current_delta = match self.circuit.execute(input_data.clone(), DeltaSet::empty()) {
|
||||
Ok(output) => {
|
||||
// Commit the changes to the circuit's internal state
|
||||
self.circuit
|
||||
.commit(input_data)
|
||||
.expect("Failed to commit to circuit");
|
||||
output
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("Failed to execute circuit: {e:?}");
|
||||
}
|
||||
};
|
||||
|
||||
// Update records and stream with the processed delta
|
||||
let mut zset_delta = RowKeyZSet::new();
|
||||
@@ -821,441 +654,3 @@ impl IncrementalView {
|
||||
self.stream.apply_delta(&zset_delta);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::incremental::operator::{Delta, IncrementalOperator};
|
||||
use crate::schema::{BTreeTable, Column, Schema, Type};
|
||||
use crate::types::Value;
|
||||
use std::sync::Arc;
|
||||
fn create_test_schema() -> Schema {
|
||||
let mut schema = Schema::new(false);
|
||||
let table = BTreeTable {
|
||||
root_page: 1,
|
||||
name: "t".to_string(),
|
||||
columns: vec![
|
||||
Column {
|
||||
name: Some("a".to_string()),
|
||||
ty: Type::Integer,
|
||||
ty_str: "INTEGER".to_string(),
|
||||
primary_key: false,
|
||||
is_rowid_alias: false,
|
||||
notnull: false,
|
||||
default: None,
|
||||
unique: false,
|
||||
collation: None,
|
||||
hidden: false,
|
||||
},
|
||||
Column {
|
||||
name: Some("b".to_string()),
|
||||
ty: Type::Integer,
|
||||
ty_str: "INTEGER".to_string(),
|
||||
primary_key: false,
|
||||
is_rowid_alias: false,
|
||||
notnull: false,
|
||||
default: None,
|
||||
unique: false,
|
||||
collation: None,
|
||||
hidden: false,
|
||||
},
|
||||
Column {
|
||||
name: Some("c".to_string()),
|
||||
ty: Type::Integer,
|
||||
ty_str: "INTEGER".to_string(),
|
||||
primary_key: false,
|
||||
is_rowid_alias: false,
|
||||
notnull: false,
|
||||
default: None,
|
||||
unique: false,
|
||||
collation: None,
|
||||
hidden: false,
|
||||
},
|
||||
],
|
||||
primary_key_columns: vec![],
|
||||
has_rowid: true,
|
||||
is_strict: false,
|
||||
unique_sets: None,
|
||||
};
|
||||
schema.add_btree_table(Arc::new(table));
|
||||
schema
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_projection_simple_columns() {
|
||||
let schema = create_test_schema();
|
||||
let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, b FROM t";
|
||||
|
||||
let view = IncrementalView::from_sql(sql, &schema).unwrap();
|
||||
|
||||
assert!(view.project_operator.is_some());
|
||||
let project_op = view.project_operator.as_ref().unwrap();
|
||||
|
||||
let mut delta = Delta::new();
|
||||
delta.insert(
|
||||
1,
|
||||
vec![Value::Integer(10), Value::Integer(20), Value::Integer(30)],
|
||||
);
|
||||
|
||||
let mut temp_project = project_op.clone();
|
||||
temp_project.initialize(delta);
|
||||
let result = temp_project.get_current_state();
|
||||
|
||||
let (output, _weight) = result.changes.first().unwrap();
|
||||
assert_eq!(output.values, vec![Value::Integer(10), Value::Integer(20)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_projection_arithmetic_expression() {
|
||||
let schema = create_test_schema();
|
||||
let sql = "CREATE MATERIALIZED VIEW v AS SELECT a * 2 as doubled FROM t";
|
||||
|
||||
let view = IncrementalView::from_sql(sql, &schema).unwrap();
|
||||
|
||||
assert!(view.project_operator.is_some());
|
||||
let project_op = view.project_operator.as_ref().unwrap();
|
||||
|
||||
let mut delta = Delta::new();
|
||||
delta.insert(
|
||||
1,
|
||||
vec![Value::Integer(4), Value::Integer(2), Value::Integer(0)],
|
||||
);
|
||||
|
||||
let mut temp_project = project_op.clone();
|
||||
temp_project.initialize(delta);
|
||||
let result = temp_project.get_current_state();
|
||||
|
||||
let (output, _weight) = result.changes.first().unwrap();
|
||||
assert_eq!(output.values, vec![Value::Integer(8)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_projection_multiple_expressions() {
|
||||
let schema = create_test_schema();
|
||||
let sql = "CREATE MATERIALIZED VIEW v AS SELECT a + b as sum, a - b as diff, c FROM t";
|
||||
|
||||
let view = IncrementalView::from_sql(sql, &schema).unwrap();
|
||||
|
||||
assert!(view.project_operator.is_some());
|
||||
let project_op = view.project_operator.as_ref().unwrap();
|
||||
|
||||
let mut delta = Delta::new();
|
||||
delta.insert(
|
||||
1,
|
||||
vec![Value::Integer(10), Value::Integer(3), Value::Integer(7)],
|
||||
);
|
||||
|
||||
let mut temp_project = project_op.clone();
|
||||
temp_project.initialize(delta);
|
||||
let result = temp_project.get_current_state();
|
||||
|
||||
let (output, _weight) = result.changes.first().unwrap();
|
||||
assert_eq!(
|
||||
output.values,
|
||||
vec![Value::Integer(13), Value::Integer(7), Value::Integer(7),]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_projection_function_call() {
|
||||
let schema = create_test_schema();
|
||||
let sql = "CREATE MATERIALIZED VIEW v AS SELECT abs(a - 300) as abs_diff, b FROM t";
|
||||
|
||||
let view = IncrementalView::from_sql(sql, &schema).unwrap();
|
||||
|
||||
assert!(view.project_operator.is_some());
|
||||
let project_op = view.project_operator.as_ref().unwrap();
|
||||
|
||||
let mut delta = Delta::new();
|
||||
delta.insert(
|
||||
1,
|
||||
vec![Value::Integer(255), Value::Integer(20), Value::Integer(30)],
|
||||
);
|
||||
|
||||
let mut temp_project = project_op.clone();
|
||||
temp_project.initialize(delta);
|
||||
let result = temp_project.get_current_state();
|
||||
|
||||
let (output, _weight) = result.changes.first().unwrap();
|
||||
// abs(255 - 300) = abs(-45) = 45
|
||||
assert_eq!(output.values, vec![Value::Integer(45), Value::Integer(20),]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_projection_mixed_columns_and_expressions() {
|
||||
let schema = create_test_schema();
|
||||
let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, b * 2 as doubled, c, a + b + c as total FROM t";
|
||||
|
||||
let view = IncrementalView::from_sql(sql, &schema).unwrap();
|
||||
|
||||
assert!(view.project_operator.is_some());
|
||||
let project_op = view.project_operator.as_ref().unwrap();
|
||||
|
||||
let mut delta = Delta::new();
|
||||
delta.insert(
|
||||
1,
|
||||
vec![Value::Integer(1), Value::Integer(5), Value::Integer(3)],
|
||||
);
|
||||
|
||||
let mut temp_project = project_op.clone();
|
||||
temp_project.initialize(delta);
|
||||
let result = temp_project.get_current_state();
|
||||
|
||||
let (output, _weight) = result.changes.first().unwrap();
|
||||
assert_eq!(
|
||||
output.values,
|
||||
vec![
|
||||
Value::Integer(1),
|
||||
Value::Integer(10),
|
||||
Value::Integer(3),
|
||||
Value::Integer(9),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_projection_complex_expression() {
|
||||
let schema = create_test_schema();
|
||||
let sql = "CREATE MATERIALIZED VIEW v AS SELECT (a * 2) + (b * 3) as weighted, c / 2 as half FROM t";
|
||||
|
||||
let view = IncrementalView::from_sql(sql, &schema).unwrap();
|
||||
|
||||
assert!(view.project_operator.is_some());
|
||||
let project_op = view.project_operator.as_ref().unwrap();
|
||||
|
||||
let mut delta = Delta::new();
|
||||
delta.insert(
|
||||
1,
|
||||
vec![Value::Integer(5), Value::Integer(2), Value::Integer(10)],
|
||||
);
|
||||
|
||||
let mut temp_project = project_op.clone();
|
||||
temp_project.initialize(delta);
|
||||
let result = temp_project.get_current_state();
|
||||
|
||||
let (output, _weight) = result.changes.first().unwrap();
|
||||
assert_eq!(output.values, vec![Value::Integer(16), Value::Integer(5),]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_projection_with_where_clause() {
|
||||
let schema = create_test_schema();
|
||||
let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, a * 2 as doubled FROM t WHERE b > 2";
|
||||
|
||||
let view = IncrementalView::from_sql(sql, &schema).unwrap();
|
||||
|
||||
assert!(view.project_operator.is_some());
|
||||
assert!(view.filter_operator.is_some());
|
||||
|
||||
let project_op = view.project_operator.as_ref().unwrap();
|
||||
|
||||
let mut delta = Delta::new();
|
||||
delta.insert(
|
||||
1,
|
||||
vec![Value::Integer(4), Value::Integer(3), Value::Integer(0)],
|
||||
);
|
||||
|
||||
let mut temp_project = project_op.clone();
|
||||
temp_project.initialize(delta);
|
||||
let result = temp_project.get_current_state();
|
||||
|
||||
let (output, _weight) = result.changes.first().unwrap();
|
||||
assert_eq!(output.values, vec![Value::Integer(4), Value::Integer(8),]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_projection_more_output_columns_than_input() {
|
||||
let schema = create_test_schema();
|
||||
let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, b, a * 2 as doubled_a, b * 3 as tripled_b, a + b as sum, hex(c) as hex_c FROM t";
|
||||
|
||||
let view = IncrementalView::from_sql(sql, &schema).unwrap();
|
||||
|
||||
assert!(view.project_operator.is_some());
|
||||
let project_op = view.project_operator.as_ref().unwrap();
|
||||
|
||||
let mut delta = Delta::new();
|
||||
delta.insert(
|
||||
1,
|
||||
vec![Value::Integer(5), Value::Integer(2), Value::Integer(15)],
|
||||
);
|
||||
|
||||
let mut temp_project = project_op.clone();
|
||||
temp_project.initialize(delta);
|
||||
let result = temp_project.get_current_state();
|
||||
|
||||
let (output, _weight) = result.changes.first().unwrap();
|
||||
// 3 input columns -> 6 output columns
|
||||
assert_eq!(
|
||||
output.values,
|
||||
vec![
|
||||
Value::Integer(5), // a
|
||||
Value::Integer(2), // b
|
||||
Value::Integer(10), // a * 2
|
||||
Value::Integer(6), // b * 3
|
||||
Value::Integer(7), // a + b
|
||||
Value::Text("3135".into()), // hex(15) - SQLite converts to string "15" then hex encodes
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_aggregation_count_with_group_by() {
|
||||
let schema = create_test_schema();
|
||||
let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, COUNT(*) FROM t GROUP BY a";
|
||||
|
||||
let mut view = IncrementalView::from_sql(sql, &schema).unwrap();
|
||||
|
||||
// Verify the view has an aggregate operator
|
||||
assert!(view.aggregate_operator.is_some());
|
||||
|
||||
// Insert some test data
|
||||
let mut delta = Delta::new();
|
||||
delta.insert(
|
||||
1,
|
||||
vec![Value::Integer(1), Value::Integer(10), Value::Integer(100)],
|
||||
);
|
||||
delta.insert(
|
||||
2,
|
||||
vec![Value::Integer(2), Value::Integer(20), Value::Integer(200)],
|
||||
);
|
||||
delta.insert(
|
||||
3,
|
||||
vec![Value::Integer(1), Value::Integer(30), Value::Integer(300)],
|
||||
);
|
||||
|
||||
// Process the delta
|
||||
view.merge_delta(&delta);
|
||||
|
||||
// Verify we only processed the 3 rows we inserted
|
||||
assert_eq!(view.tracker.lock().unwrap().aggregation_updates, 3);
|
||||
|
||||
// Check the aggregated results
|
||||
let results = view.current_data(None);
|
||||
|
||||
// Should have 2 groups: a=1 with count=2, a=2 with count=1
|
||||
assert_eq!(results.len(), 2);
|
||||
|
||||
// Find the group with a=1
|
||||
let group1 = results
|
||||
.iter()
|
||||
.find(|(_, vals)| vals[0] == Value::Integer(1))
|
||||
.unwrap();
|
||||
assert_eq!(group1.1[0], Value::Integer(1)); // a=1
|
||||
assert_eq!(group1.1[1], Value::Integer(2)); // COUNT(*)=2
|
||||
|
||||
// Find the group with a=2
|
||||
let group2 = results
|
||||
.iter()
|
||||
.find(|(_, vals)| vals[0] == Value::Integer(2))
|
||||
.unwrap();
|
||||
assert_eq!(group2.1[0], Value::Integer(2)); // a=2
|
||||
assert_eq!(group2.1[1], Value::Integer(1)); // COUNT(*)=1
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_aggregation_sum_with_filter() {
|
||||
let schema = create_test_schema();
|
||||
let sql = "CREATE MATERIALIZED VIEW v AS SELECT SUM(b) FROM t WHERE a > 1";
|
||||
|
||||
let mut view = IncrementalView::from_sql(sql, &schema).unwrap();
|
||||
|
||||
assert!(view.aggregate_operator.is_some());
|
||||
assert!(view.filter_operator.is_some());
|
||||
|
||||
let mut delta = Delta::new();
|
||||
delta.insert(
|
||||
1,
|
||||
vec![Value::Integer(1), Value::Integer(10), Value::Integer(100)],
|
||||
);
|
||||
delta.insert(
|
||||
2,
|
||||
vec![Value::Integer(2), Value::Integer(20), Value::Integer(200)],
|
||||
);
|
||||
delta.insert(
|
||||
3,
|
||||
vec![Value::Integer(3), Value::Integer(30), Value::Integer(300)],
|
||||
);
|
||||
|
||||
view.merge_delta(&delta);
|
||||
|
||||
// Should filter all 3 rows
|
||||
assert_eq!(view.tracker.lock().unwrap().filter_evaluations, 3);
|
||||
// But only aggregate the 2 that passed the filter (a > 1)
|
||||
assert_eq!(view.tracker.lock().unwrap().aggregation_updates, 2);
|
||||
|
||||
let results = view.current_data(None);
|
||||
|
||||
// Should have 1 row with sum of b where a > 1
|
||||
assert_eq!(results.len(), 1);
|
||||
assert_eq!(results[0].1[0], Value::Integer(50)); // SUM(b) = 20 + 30
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_aggregation_incremental_updates() {
|
||||
let schema = create_test_schema();
|
||||
let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, COUNT(*), SUM(b) FROM t GROUP BY a";
|
||||
|
||||
let mut view = IncrementalView::from_sql(sql, &schema).unwrap();
|
||||
|
||||
// Initial insert
|
||||
let mut delta1 = Delta::new();
|
||||
delta1.insert(
|
||||
1,
|
||||
vec![Value::Integer(1), Value::Integer(10), Value::Integer(100)],
|
||||
);
|
||||
delta1.insert(
|
||||
2,
|
||||
vec![Value::Integer(1), Value::Integer(20), Value::Integer(200)],
|
||||
);
|
||||
|
||||
view.merge_delta(&delta1);
|
||||
|
||||
// Verify we processed exactly 2 rows for the first batch
|
||||
assert_eq!(view.tracker.lock().unwrap().aggregation_updates, 2);
|
||||
|
||||
// Check initial state
|
||||
let results1 = view.current_data(None);
|
||||
assert_eq!(results1.len(), 1);
|
||||
assert_eq!(results1[0].1[1], Value::Integer(2)); // COUNT(*)=2
|
||||
assert_eq!(results1[0].1[2], Value::Integer(30)); // SUM(b)=30
|
||||
|
||||
// Reset counter to track second batch separately
|
||||
view.tracker.lock().unwrap().aggregation_updates = 0;
|
||||
|
||||
// Add more data
|
||||
let mut delta2 = Delta::new();
|
||||
delta2.insert(
|
||||
3,
|
||||
vec![Value::Integer(1), Value::Integer(5), Value::Integer(300)],
|
||||
);
|
||||
delta2.insert(
|
||||
4,
|
||||
vec![Value::Integer(2), Value::Integer(15), Value::Integer(400)],
|
||||
);
|
||||
|
||||
view.merge_delta(&delta2);
|
||||
|
||||
// Should only process the 2 new rows, not recompute everything
|
||||
assert_eq!(view.tracker.lock().unwrap().aggregation_updates, 2);
|
||||
|
||||
// Check updated state
|
||||
let results2 = view.current_data(None);
|
||||
assert_eq!(results2.len(), 2);
|
||||
|
||||
// Group a=1
|
||||
let group1 = results2
|
||||
.iter()
|
||||
.find(|(_, vals)| vals[0] == Value::Integer(1))
|
||||
.unwrap();
|
||||
assert_eq!(group1.1[1], Value::Integer(3)); // COUNT(*)=3
|
||||
assert_eq!(group1.1[2], Value::Integer(35)); // SUM(b)=35
|
||||
|
||||
// Group a=2
|
||||
let group2 = results2
|
||||
.iter()
|
||||
.find(|(_, vals)| vals[0] == Value::Integer(2))
|
||||
.unwrap();
|
||||
assert_eq!(group2.1[1], Value::Integer(1)); // COUNT(*)=1
|
||||
assert_eq!(group2.1[2], Value::Integer(15)); // SUM(b)=15
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user