mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-25 20:14:21 +01:00
The next step is to adapt the view code to use circuits instead of listing the operators manually.
2923 lines
102 KiB
Rust
2923 lines
102 KiB
Rust
//! DBSP Compiler: Converts Logical Plans to DBSP Circuits
|
||
//!
|
||
//! This module implements compilation from SQL logical plans to DBSP circuits.
|
||
//! The initial version supports filter, projection and aggregate operators over table scans.
|
||
//!
|
||
//! Based on the DBSP paper: "DBSP: Automatic Incremental View Maintenance for Rich Query Languages"
|
||
|
||
use crate::incremental::expr_compiler::CompiledExpression;
|
||
use crate::incremental::operator::{
|
||
Delta, FilterOperator, FilterPredicate, IncrementalOperator, ProjectOperator,
|
||
};
|
||
// Note: logical module must be made pub(crate) in translate/mod.rs
|
||
use crate::translate::logical::{BinaryOperator, LogicalExpr, LogicalPlan, SchemaRef};
|
||
use crate::types::Value;
|
||
use crate::{LimboError, Result};
|
||
use std::collections::HashMap;
|
||
use std::fmt::{self, Display, Formatter};
|
||
|
||
/// A set of deltas for multiple tables/operators
|
||
/// This provides a cleaner API for passing deltas through circuit execution
|
||
#[derive(Debug, Clone, Default)]
|
||
pub struct DeltaSet {
|
||
/// Deltas keyed by table/operator name
|
||
deltas: HashMap<String, Delta>,
|
||
}
|
||
|
||
impl DeltaSet {
|
||
/// Create a new empty delta set
|
||
pub fn new() -> Self {
|
||
Self {
|
||
deltas: HashMap::new(),
|
||
}
|
||
}
|
||
|
||
/// Create an empty delta set (more semantic for "no changes")
|
||
pub fn empty() -> Self {
|
||
Self {
|
||
deltas: HashMap::new(),
|
||
}
|
||
}
|
||
|
||
/// Add a delta for a table
|
||
pub fn insert(&mut self, table_name: String, delta: Delta) {
|
||
self.deltas.insert(table_name, delta);
|
||
}
|
||
|
||
/// Get delta for a table, returns empty delta if not found
|
||
pub fn get(&self, table_name: &str) -> Delta {
|
||
self.deltas
|
||
.get(table_name)
|
||
.cloned()
|
||
.unwrap_or_else(Delta::new)
|
||
}
|
||
}
|
||
|
||
/// Represents a DBSP operator in the compiled circuit.
///
/// This enum is the descriptive (metadata) side of a circuit node: it is what the
/// circuit's `Display` output and structural checks look at. The runtime behaviour
/// of a node comes from the optional executable operator stored alongside it in
/// `DbspNode::executable`.
#[derive(Debug, Clone, PartialEq)]
pub enum DbspOperator {
    /// Filter operator (σ) - filters records based on a predicate
    Filter { predicate: DbspExpr },
    /// Projection operator (π) - projects specific columns
    Projection {
        /// One entry per projected expression. Expressions that `DbspExpr` cannot
        /// represent (e.g. function calls) are recorded by the compiler as a
        /// `DbspExpr::Literal(Value::Null)` placeholder.
        exprs: Vec<DbspExpr>,
        /// Output schema of the projection.
        schema: SchemaRef,
    },
    /// Aggregate operator (γ) - performs grouping and aggregation
    Aggregate {
        /// GROUP BY keys; the compiler currently only emits `DbspExpr::Column` here.
        group_exprs: Vec<DbspExpr>,
        /// Aggregate functions computed for each group.
        aggr_exprs: Vec<crate::incremental::operator::AggregateFunction>,
        /// Output schema of the aggregation.
        schema: SchemaRef,
    },
    /// Input operator - source of data.
    ///
    /// `name` is the scanned table's name; it is also the key under which the
    /// circuit looks up this node's delta in the input maps it is given.
    Input { name: String, schema: SchemaRef },
}
|
||
|
||
/// Represents an expression in DBSP.
///
/// This is a deliberately small, inspectable expression tree used as circuit
/// metadata; it is not what the executable operators evaluate.
#[derive(Debug, Clone, PartialEq)]
pub enum DbspExpr {
    /// Column reference, by column name
    Column(String),
    /// Literal value
    Literal(Value),
    /// Binary expression `left op right`
    BinaryExpr {
        left: Box<DbspExpr>,
        op: BinaryOperator,
        right: Box<DbspExpr>,
    },
}
|
||
|
||
/// A node in the DBSP circuit DAG
|
||
pub struct DbspNode {
|
||
/// Unique identifier for this node
|
||
pub id: usize,
|
||
/// The operator metadata
|
||
pub operator: DbspOperator,
|
||
/// Input nodes (edges in the DAG)
|
||
pub inputs: Vec<usize>,
|
||
/// The actual executable operator (if applicable)
|
||
pub executable: Option<Box<dyn IncrementalOperator>>,
|
||
}
|
||
|
||
impl std::fmt::Debug for DbspNode {
|
||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||
f.debug_struct("DbspNode")
|
||
.field("id", &self.id)
|
||
.field("operator", &self.operator)
|
||
.field("inputs", &self.inputs)
|
||
.field("has_executable", &self.executable.is_some())
|
||
.finish()
|
||
}
|
||
}
|
||
|
||
/// Represents a complete DBSP circuit (DAG of operators).
///
/// Nodes are created through `add_node`, which hands out sequential IDs. `root`
/// is set by the compiler to the node whose output is the circuit's result.
#[derive(Debug)]
pub struct DbspCircuit {
    /// All nodes in the circuit, indexed by their ID
    pub(super) nodes: HashMap<usize, DbspNode>,
    /// Counter for generating unique node IDs (the ID the next added node receives)
    next_id: usize,
    /// Root node ID (the final output); `None` until a plan has been compiled
    pub(super) root: Option<usize>,
}
|
||
|
||
impl DbspCircuit {
|
||
/// Create a new empty circuit
|
||
pub fn new() -> Self {
|
||
Self {
|
||
nodes: HashMap::new(),
|
||
next_id: 0,
|
||
root: None,
|
||
}
|
||
}
|
||
|
||
/// Add a node to the circuit
|
||
fn add_node(
|
||
&mut self,
|
||
operator: DbspOperator,
|
||
inputs: Vec<usize>,
|
||
executable: Option<Box<dyn IncrementalOperator>>,
|
||
) -> usize {
|
||
let id = self.next_id;
|
||
self.next_id += 1;
|
||
|
||
let node = DbspNode {
|
||
id,
|
||
operator,
|
||
inputs,
|
||
executable,
|
||
};
|
||
|
||
self.nodes.insert(id, node);
|
||
id
|
||
}
|
||
|
||
/// Initialize the circuit with base data. Should be called once before processing deltas.
|
||
/// If the database is restarting with materialized views, this can be skipped.
|
||
pub fn initialize(&mut self, input_data: HashMap<String, Delta>) -> Result<Delta> {
|
||
if let Some(root_id) = self.root {
|
||
self.initialize_node(root_id, &input_data)
|
||
} else {
|
||
Err(LimboError::ParseError(
|
||
"Circuit has no root node".to_string(),
|
||
))
|
||
}
|
||
}
|
||
|
||
/// Initialize a specific node and its dependencies
|
||
fn initialize_node(
|
||
&mut self,
|
||
node_id: usize,
|
||
input_data: &HashMap<String, Delta>,
|
||
) -> Result<Delta> {
|
||
// Clone to avoid borrow checker issues
|
||
let inputs = self
|
||
.nodes
|
||
.get(&node_id)
|
||
.ok_or_else(|| LimboError::ParseError("Node not found".to_string()))?
|
||
.inputs
|
||
.clone();
|
||
|
||
// Initialize inputs first
|
||
let mut input_deltas = Vec::new();
|
||
for input_id in inputs {
|
||
let delta = self.initialize_node(input_id, input_data)?;
|
||
input_deltas.push(delta);
|
||
}
|
||
|
||
// Get mutable reference to node
|
||
let node = self
|
||
.nodes
|
||
.get_mut(&node_id)
|
||
.ok_or_else(|| LimboError::ParseError("Node not found".to_string()))?;
|
||
|
||
// Initialize based on operator type
|
||
let result = match &node.operator {
|
||
DbspOperator::Input { name, .. } => {
|
||
// Get data from input map
|
||
input_data.get(name).cloned().unwrap_or_else(Delta::new)
|
||
}
|
||
DbspOperator::Filter { .. }
|
||
| DbspOperator::Projection { .. }
|
||
| DbspOperator::Aggregate { .. } => {
|
||
// Initialize the executable operator
|
||
if let Some(ref mut op) = node.executable {
|
||
if !input_deltas.is_empty() {
|
||
let input_delta = input_deltas[0].clone();
|
||
op.initialize(input_delta);
|
||
op.get_current_state()
|
||
} else {
|
||
Delta::new()
|
||
}
|
||
} else {
|
||
// If no executable, pass through the input
|
||
if !input_deltas.is_empty() {
|
||
input_deltas[0].clone()
|
||
} else {
|
||
Delta::new()
|
||
}
|
||
}
|
||
}
|
||
};
|
||
|
||
Ok(result)
|
||
}
|
||
|
||
/// Execute the circuit with incremental input data (deltas).
|
||
/// Call initialize() first for initial data, then use execute() for updates.
|
||
///
|
||
/// # Arguments
|
||
/// * `input_data` - The committed deltas to process
|
||
/// * `uncommitted_data` - Uncommitted transaction deltas that should be visible
|
||
/// during this execution but not stored in operators.
|
||
/// Use DeltaSet::empty() for no uncommitted changes.
|
||
pub fn execute(
|
||
&self,
|
||
input_data: HashMap<String, Delta>,
|
||
uncommitted_data: DeltaSet,
|
||
) -> Result<Delta> {
|
||
if let Some(root_id) = self.root {
|
||
self.execute_node(root_id, &input_data, &uncommitted_data)
|
||
} else {
|
||
Err(LimboError::ParseError(
|
||
"Circuit has no root node".to_string(),
|
||
))
|
||
}
|
||
}
|
||
|
||
/// Commit deltas to the circuit, updating internal operator state.
|
||
/// This should be called after execute() when you want to make changes permanent.
|
||
///
|
||
/// # Arguments
|
||
/// * `input_data` - The deltas to commit (same as what was passed to execute)
|
||
pub fn commit(&mut self, input_data: HashMap<String, Delta>) -> Result<()> {
|
||
if let Some(root_id) = self.root {
|
||
self.commit_node(root_id, &input_data)?;
|
||
}
|
||
Ok(())
|
||
}
|
||
|
||
/// Commit a specific node in the circuit
|
||
fn commit_node(
|
||
&mut self,
|
||
node_id: usize,
|
||
input_data: &HashMap<String, Delta>,
|
||
) -> Result<Delta> {
|
||
// Clone to avoid borrow checker issues
|
||
let inputs = self
|
||
.nodes
|
||
.get(&node_id)
|
||
.ok_or_else(|| LimboError::ParseError("Node not found".to_string()))?
|
||
.inputs
|
||
.clone();
|
||
|
||
// Process inputs first
|
||
let mut input_deltas = Vec::new();
|
||
for input_id in inputs {
|
||
let delta = self.commit_node(input_id, input_data)?;
|
||
input_deltas.push(delta);
|
||
}
|
||
|
||
// Get mutable reference to node
|
||
let node = self
|
||
.nodes
|
||
.get_mut(&node_id)
|
||
.ok_or_else(|| LimboError::ParseError("Node not found".to_string()))?;
|
||
|
||
// Commit based on operator type
|
||
let result = match &node.operator {
|
||
DbspOperator::Input { name, .. } => {
|
||
// For input nodes, just return the committed delta
|
||
input_data.get(name).cloned().unwrap_or_else(Delta::new)
|
||
}
|
||
DbspOperator::Filter { .. }
|
||
| DbspOperator::Projection { .. }
|
||
| DbspOperator::Aggregate { .. } => {
|
||
// Commit the delta to the executable operator
|
||
if let Some(ref mut op) = node.executable {
|
||
if !input_deltas.is_empty() {
|
||
let input_delta = input_deltas[0].clone();
|
||
// Commit updates state and returns the output delta
|
||
op.commit(input_delta)
|
||
} else {
|
||
Delta::new()
|
||
}
|
||
} else {
|
||
// If no executable, pass through the input
|
||
if !input_deltas.is_empty() {
|
||
input_deltas[0].clone()
|
||
} else {
|
||
Delta::new()
|
||
}
|
||
}
|
||
}
|
||
};
|
||
Ok(result)
|
||
}
|
||
|
||
/// Execute a specific node in the circuit
|
||
fn execute_node(
|
||
&self,
|
||
node_id: usize,
|
||
input_data: &HashMap<String, Delta>,
|
||
uncommitted_data: &DeltaSet,
|
||
) -> Result<Delta> {
|
||
// Clone to avoid borrow checker issues
|
||
let inputs = self
|
||
.nodes
|
||
.get(&node_id)
|
||
.ok_or_else(|| LimboError::ParseError("Node not found".to_string()))?
|
||
.inputs
|
||
.clone();
|
||
|
||
// Process inputs first
|
||
let mut input_deltas = Vec::new();
|
||
for input_id in inputs {
|
||
let delta = self.execute_node(input_id, input_data, uncommitted_data)?;
|
||
input_deltas.push(delta);
|
||
}
|
||
|
||
// Get reference to node (read-only since we're using eval, not commit)
|
||
let node = self
|
||
.nodes
|
||
.get(&node_id)
|
||
.ok_or_else(|| LimboError::ParseError("Node not found".to_string()))?;
|
||
|
||
// Execute based on operator type
|
||
let result = match &node.operator {
|
||
DbspOperator::Input { name, .. } => {
|
||
// Get committed data from input map and merge with uncommitted if present
|
||
let committed = input_data.get(name).cloned().unwrap_or_else(Delta::new);
|
||
let uncommitted = uncommitted_data.get(name);
|
||
|
||
// If there's uncommitted data for this table, merge it with committed
|
||
if !uncommitted.is_empty() {
|
||
let mut combined = committed;
|
||
combined.merge(&uncommitted);
|
||
combined
|
||
} else {
|
||
committed
|
||
}
|
||
}
|
||
DbspOperator::Filter { .. }
|
||
| DbspOperator::Projection { .. }
|
||
| DbspOperator::Aggregate { .. } => {
|
||
// Process delta using the executable operator
|
||
if let Some(ref op) = node.executable {
|
||
if !input_deltas.is_empty() {
|
||
// Process the delta through the operator
|
||
let input_delta = input_deltas[0].clone();
|
||
|
||
// Use eval to compute result without modifying state
|
||
// The uncommitted data has already been merged into input_delta if needed
|
||
op.eval(input_delta, None)
|
||
} else {
|
||
Delta::new()
|
||
}
|
||
} else {
|
||
// If no executable, pass through the input
|
||
if !input_deltas.is_empty() {
|
||
input_deltas[0].clone()
|
||
} else {
|
||
Delta::new()
|
||
}
|
||
}
|
||
}
|
||
};
|
||
Ok(result)
|
||
}
|
||
}
|
||
|
||
impl Display for DbspCircuit {
|
||
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
|
||
writeln!(f, "DBSP Circuit:")?;
|
||
if let Some(root_id) = self.root {
|
||
self.fmt_node(f, root_id, 0)?;
|
||
}
|
||
Ok(())
|
||
}
|
||
}
|
||
|
||
impl DbspCircuit {
|
||
fn fmt_node(&self, f: &mut Formatter, node_id: usize, depth: usize) -> fmt::Result {
|
||
let indent = " ".repeat(depth);
|
||
if let Some(node) = self.nodes.get(&node_id) {
|
||
match &node.operator {
|
||
DbspOperator::Filter { predicate } => {
|
||
writeln!(f, "{indent}Filter[{node_id}]: {predicate:?}")?;
|
||
}
|
||
DbspOperator::Projection { exprs, .. } => {
|
||
writeln!(f, "{indent}Projection[{node_id}]: {exprs:?}")?;
|
||
}
|
||
DbspOperator::Aggregate {
|
||
group_exprs,
|
||
aggr_exprs,
|
||
..
|
||
} => {
|
||
writeln!(
|
||
f,
|
||
"{indent}Aggregate[{node_id}]: GROUP BY {group_exprs:?}, AGGR {aggr_exprs:?}"
|
||
)?;
|
||
}
|
||
DbspOperator::Input { name, .. } => {
|
||
writeln!(f, "{indent}Input[{node_id}]: {name}")?;
|
||
}
|
||
}
|
||
|
||
for input_id in &node.inputs {
|
||
self.fmt_node(f, *input_id, depth + 1)?;
|
||
}
|
||
}
|
||
Ok(())
|
||
}
|
||
}
|
||
|
||
/// Compiler from LogicalPlan to DBSP Circuit.
///
/// Single-use: `compile` consumes the compiler and returns the circuit it built.
pub struct DbspCompiler {
    /// Circuit being assembled; a node is added for each compiled plan node.
    circuit: DbspCircuit,
}
|
||
|
||
impl DbspCompiler {
|
||
/// Create a new DBSP compiler
|
||
pub fn new() -> Self {
|
||
Self {
|
||
circuit: DbspCircuit::new(),
|
||
}
|
||
}
|
||
|
||
/// Compile a logical plan to a DBSP circuit
|
||
pub fn compile(mut self, plan: &LogicalPlan) -> Result<DbspCircuit> {
|
||
let root_id = self.compile_plan(plan)?;
|
||
self.circuit.root = Some(root_id);
|
||
Ok(self.circuit)
|
||
}
|
||
|
||
/// Recursively compile a logical plan node
|
||
fn compile_plan(&mut self, plan: &LogicalPlan) -> Result<usize> {
|
||
match plan {
|
||
LogicalPlan::Projection(proj) => {
|
||
// Compile the input first
|
||
let input_id = self.compile_plan(&proj.input)?;
|
||
|
||
// Get input column names for the ProjectOperator
|
||
let input_schema = proj.input.schema();
|
||
let input_column_names: Vec<String> = input_schema.columns.iter()
|
||
.map(|(name, _)| name.clone())
|
||
.collect();
|
||
|
||
// Convert logical expressions to DBSP expressions
|
||
let dbsp_exprs = proj.exprs.iter()
|
||
.map(Self::compile_expr)
|
||
.collect::<Result<Vec<_>>>()?;
|
||
|
||
// Compile logical expressions to CompiledExpressions
|
||
let mut compiled_exprs = Vec::new();
|
||
let mut aliases = Vec::new();
|
||
for expr in &proj.exprs {
|
||
let (compiled, alias) = Self::compile_expression(expr, &input_column_names)?;
|
||
compiled_exprs.push(compiled);
|
||
aliases.push(alias);
|
||
}
|
||
|
||
// Get output column names from the projection schema
|
||
let output_column_names: Vec<String> = proj.schema.columns.iter()
|
||
.map(|(name, _)| name.clone())
|
||
.collect();
|
||
|
||
// Create the ProjectOperator
|
||
let executable: Option<Box<dyn IncrementalOperator>> =
|
||
ProjectOperator::from_compiled(compiled_exprs, aliases, input_column_names, output_column_names)
|
||
.ok()
|
||
.map(|op| Box::new(op) as Box<dyn IncrementalOperator>);
|
||
|
||
// Create projection node
|
||
let node_id = self.circuit.add_node(
|
||
DbspOperator::Projection {
|
||
exprs: dbsp_exprs,
|
||
schema: proj.schema.clone(),
|
||
},
|
||
vec![input_id],
|
||
executable,
|
||
);
|
||
Ok(node_id)
|
||
}
|
||
LogicalPlan::Filter(filter) => {
|
||
// Compile the input first
|
||
let input_id = self.compile_plan(&filter.input)?;
|
||
|
||
// Get column names from input schema
|
||
let input_schema = filter.input.schema();
|
||
let column_names: Vec<String> = input_schema.columns.iter()
|
||
.map(|(name, _)| name.clone())
|
||
.collect();
|
||
|
||
// Convert predicate to DBSP expression
|
||
let dbsp_predicate = Self::compile_expr(&filter.predicate)?;
|
||
|
||
// Convert to FilterPredicate
|
||
let filter_predicate = Self::compile_filter_predicate(&filter.predicate)?;
|
||
|
||
// Create executable operator
|
||
let executable: Box<dyn IncrementalOperator> =
|
||
Box::new(FilterOperator::new(filter_predicate, column_names));
|
||
|
||
// Create filter node
|
||
let node_id = self.circuit.add_node(
|
||
DbspOperator::Filter { predicate: dbsp_predicate },
|
||
vec![input_id],
|
||
Some(executable),
|
||
);
|
||
Ok(node_id)
|
||
}
|
||
LogicalPlan::Aggregate(agg) => {
|
||
// Compile the input first
|
||
let input_id = self.compile_plan(&agg.input)?;
|
||
|
||
// Get input column names
|
||
let input_schema = agg.input.schema();
|
||
let input_column_names: Vec<String> = input_schema.columns.iter()
|
||
.map(|(name, _)| name.clone())
|
||
.collect();
|
||
|
||
// Compile group by expressions to column names
|
||
let mut group_by_columns = Vec::new();
|
||
let mut dbsp_group_exprs = Vec::new();
|
||
for expr in &agg.group_expr {
|
||
// For now, only support simple column references in GROUP BY
|
||
if let LogicalExpr::Column(col) = expr {
|
||
group_by_columns.push(col.name.clone());
|
||
dbsp_group_exprs.push(DbspExpr::Column(col.name.clone()));
|
||
} else {
|
||
return Err(LimboError::ParseError(
|
||
"Only column references are supported in GROUP BY for incremental views".to_string()
|
||
));
|
||
}
|
||
}
|
||
|
||
// Compile aggregate expressions
|
||
let mut aggregate_functions = Vec::new();
|
||
for expr in &agg.aggr_expr {
|
||
if let LogicalExpr::AggregateFunction { fun, args, .. } = expr {
|
||
use crate::function::AggFunc;
|
||
use crate::incremental::operator::AggregateFunction;
|
||
|
||
let agg_fn = match fun {
|
||
AggFunc::Count | AggFunc::Count0 => {
|
||
AggregateFunction::Count
|
||
}
|
||
AggFunc::Sum => {
|
||
if args.is_empty() {
|
||
return Err(LimboError::ParseError("SUM requires an argument".to_string()));
|
||
}
|
||
// Extract column name from the argument
|
||
if let LogicalExpr::Column(col) = &args[0] {
|
||
AggregateFunction::Sum(col.name.clone())
|
||
} else {
|
||
return Err(LimboError::ParseError(
|
||
"Only column references are supported in aggregate functions for incremental views".to_string()
|
||
));
|
||
}
|
||
}
|
||
AggFunc::Avg => {
|
||
if args.is_empty() {
|
||
return Err(LimboError::ParseError("AVG requires an argument".to_string()));
|
||
}
|
||
if let LogicalExpr::Column(col) = &args[0] {
|
||
AggregateFunction::Avg(col.name.clone())
|
||
} else {
|
||
return Err(LimboError::ParseError(
|
||
"Only column references are supported in aggregate functions for incremental views".to_string()
|
||
));
|
||
}
|
||
}
|
||
// MIN and MAX are not supported in incremental views due to storage overhead.
|
||
// To correctly handle deletions, these operators would need to track all values
|
||
// in each group, resulting in O(n) storage overhead. This is prohibitive for
|
||
// large datasets. Alternative approaches like maintaining sorted indexes still
|
||
// require O(n) storage. Until a more efficient solution is found, MIN/MAX
|
||
// aggregations are not supported in materialized views.
|
||
AggFunc::Min => {
|
||
return Err(LimboError::ParseError(
|
||
"MIN aggregation is not supported in incremental materialized views due to O(n) storage overhead required for handling deletions".to_string()
|
||
));
|
||
}
|
||
AggFunc::Max => {
|
||
return Err(LimboError::ParseError(
|
||
"MAX aggregation is not supported in incremental materialized views due to O(n) storage overhead required for handling deletions".to_string()
|
||
));
|
||
}
|
||
_ => {
|
||
return Err(LimboError::ParseError(
|
||
format!("Unsupported aggregate function in DBSP compiler: {fun:?}")
|
||
));
|
||
}
|
||
};
|
||
aggregate_functions.push(agg_fn);
|
||
} else {
|
||
return Err(LimboError::ParseError(
|
||
"Expected aggregate function in aggregate expressions".to_string()
|
||
));
|
||
}
|
||
}
|
||
|
||
// Create the AggregateOperator
|
||
use crate::incremental::operator::AggregateOperator;
|
||
let executable: Option<Box<dyn IncrementalOperator>> = Some(
|
||
Box::new(AggregateOperator::new(
|
||
group_by_columns,
|
||
aggregate_functions.clone(),
|
||
input_column_names,
|
||
))
|
||
);
|
||
|
||
// Create aggregate node
|
||
let node_id = self.circuit.add_node(
|
||
DbspOperator::Aggregate {
|
||
group_exprs: dbsp_group_exprs,
|
||
aggr_exprs: aggregate_functions,
|
||
schema: agg.schema.clone(),
|
||
},
|
||
vec![input_id],
|
||
executable,
|
||
);
|
||
Ok(node_id)
|
||
}
|
||
LogicalPlan::TableScan(scan) => {
|
||
// Create input node (no executable needed for input)
|
||
let node_id = self.circuit.add_node(
|
||
DbspOperator::Input {
|
||
name: scan.table_name.clone(),
|
||
schema: scan.schema.clone(),
|
||
},
|
||
vec![],
|
||
None,
|
||
);
|
||
Ok(node_id)
|
||
}
|
||
_ => Err(LimboError::ParseError(
|
||
format!("Unsupported operator in DBSP compiler: only Filter, Projection and Aggregate are supported, got: {:?}",
|
||
match plan {
|
||
LogicalPlan::Sort(_) => "Sort",
|
||
LogicalPlan::Limit(_) => "Limit",
|
||
LogicalPlan::Union(_) => "Union",
|
||
LogicalPlan::Distinct(_) => "Distinct",
|
||
LogicalPlan::EmptyRelation(_) => "EmptyRelation",
|
||
LogicalPlan::Values(_) => "Values",
|
||
LogicalPlan::WithCTE(_) => "WithCTE",
|
||
LogicalPlan::CTERef(_) => "CTERef",
|
||
_ => "Unknown",
|
||
}
|
||
)
|
||
)),
|
||
}
|
||
}
|
||
|
||
/// Convert a logical expression to a DBSP expression
|
||
fn compile_expr(expr: &LogicalExpr) -> Result<DbspExpr> {
|
||
match expr {
|
||
LogicalExpr::Column(col) => Ok(DbspExpr::Column(col.name.clone())),
|
||
|
||
LogicalExpr::Literal(val) => Ok(DbspExpr::Literal(val.clone())),
|
||
|
||
LogicalExpr::BinaryExpr { left, op, right } => {
|
||
let left_expr = Self::compile_expr(left)?;
|
||
let right_expr = Self::compile_expr(right)?;
|
||
|
||
Ok(DbspExpr::BinaryExpr {
|
||
left: Box::new(left_expr),
|
||
op: *op,
|
||
right: Box::new(right_expr),
|
||
})
|
||
}
|
||
|
||
LogicalExpr::Alias { expr, .. } => {
|
||
// For aliases, compile the underlying expression
|
||
Self::compile_expr(expr)
|
||
}
|
||
|
||
// For complex expressions (functions, etc), we can't represent them as DbspExpr
|
||
// but that's OK - they'll be handled by the ProjectOperator's VDBE compilation
|
||
// For now, just use a placeholder
|
||
_ => {
|
||
// Use a literal null as placeholder - the actual execution will use the compiled VDBE
|
||
Ok(DbspExpr::Literal(Value::Null))
|
||
}
|
||
}
|
||
}
|
||
|
||
/// Compile a logical expression to a CompiledExpression and optional alias
|
||
fn compile_expression(
|
||
expr: &LogicalExpr,
|
||
input_column_names: &[String],
|
||
) -> Result<(CompiledExpression, Option<String>)> {
|
||
// Check for alias first
|
||
if let LogicalExpr::Alias { expr, alias } = expr {
|
||
// For aliases, compile the underlying expression and return with alias
|
||
let (compiled, _) = Self::compile_expression(expr, input_column_names)?;
|
||
return Ok((compiled, Some(alias.clone())));
|
||
}
|
||
|
||
// Convert LogicalExpr to AST Expr
|
||
let ast_expr = Self::logical_to_ast_expr(expr)?;
|
||
|
||
// For all expressions (simple or complex), use CompiledExpression::compile
|
||
// This handles both trivial cases and complex VDBE compilation
|
||
// We need to set up the necessary context
|
||
use crate::{Database, MemoryIO, SymbolTable};
|
||
use std::sync::Arc;
|
||
|
||
// Create an internal connection for expression compilation
|
||
let io = Arc::new(MemoryIO::new());
|
||
let db = Database::open_file(io, ":memory:", false, false)?;
|
||
let internal_conn = db.connect()?;
|
||
internal_conn.query_only.set(true);
|
||
internal_conn.auto_commit.set(false);
|
||
|
||
// Create temporary symbol table
|
||
let temp_syms = SymbolTable::new();
|
||
|
||
// Get a minimal schema for compilation (we don't need the full schema for expressions)
|
||
let schema = crate::schema::Schema::new(false);
|
||
|
||
// Compile the expression using the existing CompiledExpression::compile
|
||
let compiled = CompiledExpression::compile(
|
||
&ast_expr,
|
||
input_column_names,
|
||
&schema,
|
||
&temp_syms,
|
||
internal_conn,
|
||
)?;
|
||
|
||
Ok((compiled, None))
|
||
}
|
||
|
||
/// Convert LogicalExpr to AST Expr
|
||
fn logical_to_ast_expr(expr: &LogicalExpr) -> Result<turso_parser::ast::Expr> {
|
||
use turso_parser::ast;
|
||
|
||
match expr {
|
||
LogicalExpr::Column(col) => Ok(ast::Expr::Id(ast::Name::Ident(col.name.clone()))),
|
||
LogicalExpr::Literal(val) => {
|
||
let lit = match val {
|
||
Value::Integer(i) => ast::Literal::Numeric(i.to_string()),
|
||
Value::Float(f) => ast::Literal::Numeric(f.to_string()),
|
||
Value::Text(t) => ast::Literal::String(t.to_string()),
|
||
Value::Blob(b) => ast::Literal::Blob(format!("{b:?}")),
|
||
Value::Null => ast::Literal::Null,
|
||
};
|
||
Ok(ast::Expr::Literal(lit))
|
||
}
|
||
LogicalExpr::BinaryExpr { left, op, right } => {
|
||
let left_expr = Self::logical_to_ast_expr(left)?;
|
||
let right_expr = Self::logical_to_ast_expr(right)?;
|
||
Ok(ast::Expr::Binary(
|
||
Box::new(left_expr),
|
||
*op,
|
||
Box::new(right_expr),
|
||
))
|
||
}
|
||
LogicalExpr::ScalarFunction { fun, args } => {
|
||
let ast_args: Result<Vec<_>> = args.iter().map(Self::logical_to_ast_expr).collect();
|
||
let ast_args: Vec<Box<ast::Expr>> = ast_args?.into_iter().map(Box::new).collect();
|
||
Ok(ast::Expr::FunctionCall {
|
||
name: ast::Name::Ident(fun.clone()),
|
||
distinctness: None,
|
||
args: ast_args,
|
||
order_by: Vec::new(),
|
||
filter_over: ast::FunctionTail {
|
||
filter_clause: None,
|
||
over_clause: None,
|
||
},
|
||
})
|
||
}
|
||
LogicalExpr::Alias { expr, .. } => {
|
||
// For conversion to AST, ignore the alias and convert the inner expression
|
||
Self::logical_to_ast_expr(expr)
|
||
}
|
||
LogicalExpr::AggregateFunction {
|
||
fun,
|
||
args,
|
||
distinct,
|
||
} => {
|
||
// Convert aggregate function to AST
|
||
let ast_args: Result<Vec<_>> = args.iter().map(Self::logical_to_ast_expr).collect();
|
||
let ast_args: Vec<Box<ast::Expr>> = ast_args?.into_iter().map(Box::new).collect();
|
||
|
||
// Get the function name based on the aggregate type
|
||
let func_name = match fun {
|
||
crate::function::AggFunc::Count => "COUNT",
|
||
crate::function::AggFunc::Sum => "SUM",
|
||
crate::function::AggFunc::Avg => "AVG",
|
||
crate::function::AggFunc::Min => "MIN",
|
||
crate::function::AggFunc::Max => "MAX",
|
||
_ => {
|
||
return Err(LimboError::ParseError(format!(
|
||
"Unsupported aggregate function: {fun:?}"
|
||
)))
|
||
}
|
||
};
|
||
|
||
Ok(ast::Expr::FunctionCall {
|
||
name: ast::Name::Ident(func_name.to_string()),
|
||
distinctness: if *distinct {
|
||
Some(ast::Distinctness::Distinct)
|
||
} else {
|
||
None
|
||
},
|
||
args: ast_args,
|
||
order_by: Vec::new(),
|
||
filter_over: ast::FunctionTail {
|
||
filter_clause: None,
|
||
over_clause: None,
|
||
},
|
||
})
|
||
}
|
||
_ => Err(LimboError::ParseError(format!(
|
||
"Cannot convert LogicalExpr to AST Expr: {expr:?}"
|
||
))),
|
||
}
|
||
}
|
||
|
||
/// Compile a logical expression to a FilterPredicate for execution
|
||
fn compile_filter_predicate(expr: &LogicalExpr) -> Result<FilterPredicate> {
|
||
match expr {
|
||
LogicalExpr::BinaryExpr { left, op, right } => {
|
||
// Extract column name and value for simple predicates
|
||
if let (LogicalExpr::Column(col), LogicalExpr::Literal(val)) =
|
||
(left.as_ref(), right.as_ref())
|
||
{
|
||
match op {
|
||
BinaryOperator::Equals => Ok(FilterPredicate::Equals {
|
||
column: col.name.clone(),
|
||
value: val.clone(),
|
||
}),
|
||
BinaryOperator::NotEquals => Ok(FilterPredicate::NotEquals {
|
||
column: col.name.clone(),
|
||
value: val.clone(),
|
||
}),
|
||
BinaryOperator::Greater => Ok(FilterPredicate::GreaterThan {
|
||
column: col.name.clone(),
|
||
value: val.clone(),
|
||
}),
|
||
BinaryOperator::GreaterEquals => Ok(FilterPredicate::GreaterThanOrEqual {
|
||
column: col.name.clone(),
|
||
value: val.clone(),
|
||
}),
|
||
BinaryOperator::Less => Ok(FilterPredicate::LessThan {
|
||
column: col.name.clone(),
|
||
value: val.clone(),
|
||
}),
|
||
BinaryOperator::LessEquals => Ok(FilterPredicate::LessThanOrEqual {
|
||
column: col.name.clone(),
|
||
value: val.clone(),
|
||
}),
|
||
BinaryOperator::And => {
|
||
// Handle AND of two predicates
|
||
let left_pred = Self::compile_filter_predicate(left)?;
|
||
let right_pred = Self::compile_filter_predicate(right)?;
|
||
Ok(FilterPredicate::And(
|
||
Box::new(left_pred),
|
||
Box::new(right_pred),
|
||
))
|
||
}
|
||
BinaryOperator::Or => {
|
||
// Handle OR of two predicates
|
||
let left_pred = Self::compile_filter_predicate(left)?;
|
||
let right_pred = Self::compile_filter_predicate(right)?;
|
||
Ok(FilterPredicate::Or(
|
||
Box::new(left_pred),
|
||
Box::new(right_pred),
|
||
))
|
||
}
|
||
_ => Err(LimboError::ParseError(format!(
|
||
"Unsupported operator in filter: {op:?}"
|
||
))),
|
||
}
|
||
} else if matches!(op, BinaryOperator::And | BinaryOperator::Or) {
|
||
// Handle logical operators
|
||
let left_pred = Self::compile_filter_predicate(left)?;
|
||
let right_pred = Self::compile_filter_predicate(right)?;
|
||
match op {
|
||
BinaryOperator::And => Ok(FilterPredicate::And(
|
||
Box::new(left_pred),
|
||
Box::new(right_pred),
|
||
)),
|
||
BinaryOperator::Or => Ok(FilterPredicate::Or(
|
||
Box::new(left_pred),
|
||
Box::new(right_pred),
|
||
)),
|
||
_ => unreachable!(),
|
||
}
|
||
} else {
|
||
Err(LimboError::ParseError(
|
||
"Filter predicate must be column op value".to_string(),
|
||
))
|
||
}
|
||
}
|
||
_ => Err(LimboError::ParseError(format!(
|
||
"Unsupported filter expression: {expr:?}"
|
||
))),
|
||
}
|
||
}
|
||
}
|
||
|
||
#[cfg(test)]
|
||
mod tests {
|
||
use super::*;
|
||
use crate::incremental::operator::{Delta, FilterOperator, FilterPredicate};
|
||
use crate::schema::{BTreeTable, Column as SchemaColumn, Schema, Type};
|
||
use crate::translate::logical::LogicalPlanBuilder;
|
||
use crate::translate::logical::LogicalSchema;
|
||
use std::sync::Arc;
|
||
use turso_parser::ast;
|
||
use turso_parser::parser::Parser;
|
||
|
||
// Macro to create a test schema with a users table
|
||
macro_rules! test_schema {
|
||
() => {{
|
||
let mut schema = Schema::new(false);
|
||
let users_table = BTreeTable {
|
||
name: "users".to_string(),
|
||
root_page: 2,
|
||
primary_key_columns: vec![("id".to_string(), turso_parser::ast::SortOrder::Asc)],
|
||
columns: vec![
|
||
SchemaColumn {
|
||
name: Some("id".to_string()),
|
||
ty: Type::Integer,
|
||
ty_str: "INTEGER".to_string(),
|
||
primary_key: true,
|
||
is_rowid_alias: true,
|
||
notnull: true,
|
||
default: None,
|
||
unique: false,
|
||
collation: None,
|
||
hidden: false,
|
||
},
|
||
SchemaColumn {
|
||
name: Some("name".to_string()),
|
||
ty: Type::Text,
|
||
ty_str: "TEXT".to_string(),
|
||
primary_key: false,
|
||
is_rowid_alias: false,
|
||
notnull: false,
|
||
default: None,
|
||
unique: false,
|
||
collation: None,
|
||
hidden: false,
|
||
},
|
||
SchemaColumn {
|
||
name: Some("age".to_string()),
|
||
ty: Type::Integer,
|
||
ty_str: "INTEGER".to_string(),
|
||
primary_key: false,
|
||
is_rowid_alias: false,
|
||
notnull: false,
|
||
default: None,
|
||
unique: false,
|
||
collation: None,
|
||
hidden: false,
|
||
},
|
||
],
|
||
has_rowid: true,
|
||
is_strict: false,
|
||
unique_sets: None,
|
||
};
|
||
schema.add_btree_table(Arc::new(users_table));
|
||
schema
|
||
}};
|
||
}
|
||
|
||
// Macro to compile SQL to DBSP circuit
|
||
macro_rules! compile_sql {
|
||
($sql:expr) => {{
|
||
let schema = test_schema!();
|
||
let mut parser = Parser::new($sql.as_bytes());
|
||
let cmd = parser
|
||
.next()
|
||
.unwrap() // This returns Option<Result<Cmd, Error>>
|
||
.unwrap(); // This unwraps the Result
|
||
|
||
match cmd {
|
||
ast::Cmd::Stmt(stmt) => {
|
||
let mut builder = LogicalPlanBuilder::new(&schema);
|
||
let logical_plan = builder.build_statement(&stmt).unwrap();
|
||
DbspCompiler::new().compile(&logical_plan).unwrap()
|
||
}
|
||
_ => panic!("Only SQL statements are supported"),
|
||
}
|
||
}};
|
||
}
|
||
|
||
// Asserts that `$circuit` holds exactly `$depth` nodes and that its root is a
// `DbspOperator::$root_type`. Because `$depth` is compared with the total node count,
// the "depth" label only matches reality for linear chains of single-input nodes.
macro_rules! assert_circuit {
    ($circuit:expr, depth: $depth:expr, root: $root_type:ident) => {
        let node_count = $circuit.nodes.len();
        assert_eq!(node_count, $depth);
        let root_node = get_node_at_level(&$circuit, 0);
        assert!(matches!(root_node.operator, DbspOperator::$root_type { .. }));
    };
}
|
||
|
||
// Asserts what kind of operator sits `$level` steps below the root, where each step
// follows a node's first input. Supported forms:
// - `Input { name: "table" }`      — an input node whose name equals the given one
// - `Filter`                       — any filter node
// - `Projection { columns: [..] }` — a projection whose expressions, rendered as column
//   names (non-column expressions render as "expr"), equal the given list
macro_rules! assert_operator {
    ($circuit:expr, $level:expr, Input { name: $name:expr }) => {{
        let found = get_node_at_level(&$circuit, $level);
        if let DbspOperator::Input { name, .. } = &found.operator {
            assert_eq!(name, $name);
        } else {
            panic!("Expected Input operator at level {}", $level);
        }
    }};
    ($circuit:expr, $level:expr, Filter) => {{
        let found = get_node_at_level(&$circuit, $level);
        assert!(matches!(found.operator, DbspOperator::Filter { .. }));
    }};
    ($circuit:expr, $level:expr, Projection { columns: [$($col:expr),*] }) => {{
        let found = get_node_at_level(&$circuit, $level);
        if let DbspOperator::Projection { exprs, .. } = &found.operator {
            let expected_cols = vec![$($col),*];
            let mut rendered: Vec<String> = Vec::with_capacity(exprs.len());
            for expr in exprs.iter() {
                let label = if let DbspExpr::Column(name) = expr {
                    name.clone()
                } else {
                    "expr".to_string()
                };
                rendered.push(label);
            }
            assert_eq!(rendered, expected_cols);
        } else {
            panic!("Expected Projection operator at level {}", $level);
        }
    }};
}
|
||
|
||
// Asserts that the node `$level` steps below the root is a `Filter` whose predicate is
// exactly `<column> <op> <integer literal>`, e.g.
// `assert_filter_predicate!(circuit, 1, "age" > 18)`.
//
// Supported operators: `>`, `<`, `=`. Every public form forwards to the single internal
// `@binary` rule, so the shape checks (and their panic messages) are defined once
// instead of being copied per operator.
macro_rules! assert_filter_predicate {
    // Internal rule: `$op` is the name of an `ast::Operator` variant. It must stay first
    // so that a forwarded `@binary ...` invocation is matched here.
    (@binary $circuit:expr, $level:expr, $op:ident, $col:literal, $val:literal) => {{
        let node = get_node_at_level(&$circuit, $level);
        match &node.operator {
            DbspOperator::Filter { predicate } => match predicate {
                DbspExpr::BinaryExpr { left, op, right } => {
                    assert!(matches!(op, ast::Operator::$op));
                    assert!(matches!(&**left, DbspExpr::Column(name) if name == $col));
                    assert!(matches!(&**right, DbspExpr::Literal(Value::Integer($val))));
                }
                _ => panic!("Expected binary expression in filter"),
            },
            _ => panic!("Expected Filter operator at level {}", $level),
        }
    }};
    ($circuit:expr, $level:expr, $col:literal > $val:literal) => {
        assert_filter_predicate!(@binary $circuit, $level, Greater, $col, $val)
    };
    ($circuit:expr, $level:expr, $col:literal < $val:literal) => {
        assert_filter_predicate!(@binary $circuit, $level, Less, $col, $val)
    };
    ($circuit:expr, $level:expr, $col:literal = $val:literal) => {
        assert_filter_predicate!(@binary $circuit, $level, Equals, $col, $val)
    };
}
|
||
|
||
// Helper to get node at specific level from root
|
||
fn get_node_at_level(circuit: &DbspCircuit, level: usize) -> &DbspNode {
|
||
let mut current_id = circuit.root.expect("Circuit has no root");
|
||
for _ in 0..level {
|
||
let node = circuit.nodes.get(¤t_id).expect("Node not found");
|
||
if node.inputs.is_empty() {
|
||
panic!("No more levels available, requested level {level}");
|
||
}
|
||
current_id = node.inputs[0];
|
||
}
|
||
circuit.nodes.get(¤t_id).expect("Node not found")
|
||
}
|
||
|
||
// Helper to get the current accumulated state of the circuit (from the root operator)
|
||
// This returns the internal state including bookkeeping entries
|
||
fn get_current_state(circuit: &DbspCircuit) -> Result<Delta> {
|
||
if let Some(root_id) = circuit.root {
|
||
let node = circuit
|
||
.nodes
|
||
.get(&root_id)
|
||
.ok_or_else(|| LimboError::ParseError("Root node not found".to_string()))?;
|
||
|
||
if let Some(ref executable) = node.executable {
|
||
Ok(executable.get_current_state())
|
||
} else {
|
||
// Input nodes don't have executables but also don't have state
|
||
Ok(Delta::new())
|
||
}
|
||
} else {
|
||
Err(LimboError::ParseError(
|
||
"Circuit has no root node".to_string(),
|
||
))
|
||
}
|
||
}
|
||
|
||
// Helper to create a DeltaSet from a HashMap (for tests)
|
||
fn delta_set_from_map(map: HashMap<String, Delta>) -> DeltaSet {
|
||
let mut delta_set = DeltaSet::new();
|
||
for (key, value) in map {
|
||
delta_set.insert(key, value);
|
||
}
|
||
delta_set
|
||
}
|
||
|
||
#[test]
fn test_simple_projection() {
    let circuit = compile_sql!("SELECT name FROM users");

    // Expected shape, listed from the root downwards:
    //   level 0: Projection(name)
    //   level 1: Input(users)
    assert_circuit!(circuit, depth: 2, root: Projection);
    assert_operator!(circuit, 0, Projection { columns: ["name"] });
    assert_operator!(circuit, 1, Input { name: "users" });
}
|
||
|
||
#[test]
fn test_filter_with_projection() {
    let circuit = compile_sql!("SELECT name FROM users WHERE age > 18");

    // Expected shape, listed from the root downwards:
    //   level 0: Projection(name)
    //   level 1: Filter(age > 18)
    //   level 2: Input(users)
    assert_circuit!(circuit, depth: 3, root: Projection);
    assert_operator!(circuit, 0, Projection { columns: ["name"] });

    assert_operator!(circuit, 1, Filter);
    assert_filter_predicate!(circuit, 1, "age" > 18);

    assert_operator!(circuit, 2, Input { name: "users" });
}
|
||
|
||
#[test]
fn test_select_star() {
    let mut circuit = compile_sql!("SELECT * FROM users");

    // Seed two users; the rowid doubles as the `id` column.
    let mut users = Delta::new();
    for (id, name, age) in [(1, "Alice", 25), (2, "Bob", 17)] {
        users.insert(
            id,
            vec![Value::Integer(id), Value::Text(name.into()), Value::Integer(age)],
        );
    }

    let result = circuit
        .initialize(HashMap::from([("users".to_string(), users)]))
        .unwrap();

    // `SELECT *` keeps every row, each with all three columns (id, name, age), once.
    assert_eq!(result.changes.len(), 2);
    result.changes.iter().for_each(|(row, weight)| {
        assert_eq!(*weight, 1);
        assert_eq!(row.values.len(), 3);
    });
}
|
||
|
||
#[test]
fn test_execute_filter() {
    let mut circuit = compile_sql!("SELECT * FROM users WHERE age > 18");

    let mut users = Delta::new();
    for (id, name, age) in [(1, "Alice", 25), (2, "Bob", 17), (3, "Charlie", 30)] {
        users.insert(
            id,
            vec![Value::Integer(id), Value::Text(name.into()), Value::Integer(age)],
        );
    }

    let result = circuit
        .initialize(HashMap::from([("users".to_string(), users)]))
        .unwrap();

    // Bob (17) fails `age > 18`, so only Alice and Charlie remain.
    let row_count = result.changes.len();
    assert_eq!(
        row_count, 2,
        "Expected 2 rows after filtering, got {}",
        row_count
    );

    // Collect the `name` column (index 1) of every positively-weighted row.
    let mut names: Vec<String> = Vec::new();
    for (row, weight) in &result.changes {
        if *weight <= 0 || row.values.len() <= 1 {
            continue;
        }
        if let Value::Text(name) = &row.values[1] {
            names.push(name.to_string());
        }
    }

    assert!(
        names.iter().any(|n| n == "Alice"),
        "Alice should be in results"
    );
    assert!(
        names.iter().any(|n| n == "Charlie"),
        "Charlie should be in results"
    );
    assert!(
        !names.iter().any(|n| n == "Bob"),
        "Bob should not be in results"
    );
}
|
||
|
||
#[test]
fn test_simple_column_projection() {
    let mut circuit = compile_sql!("SELECT name, age FROM users");

    let mut users = Delta::new();
    for (id, name, age) in [(1, "Alice", 25), (2, "Bob", 17)] {
        users.insert(
            id,
            vec![Value::Integer(id), Value::Text(name.into()), Value::Integer(age)],
        );
    }

    let result = circuit
        .initialize(HashMap::from([("users".to_string(), users)]))
        .unwrap();

    // No filter: every row survives, but only the projected `name, age` pair remains.
    assert_eq!(result.changes.len(), 2);
    for (row, _) in &result.changes {
        let values = &row.values;
        assert_eq!(values.len(), 2);
        // Projection order: name (Text), then age (Integer).
        assert!(matches!(&values[0], Value::Text(_)));
        assert!(matches!(&values[1], Value::Integer(_)));
    }
}
|
||
|
||
#[test]
fn test_simple_aggregation() {
    // COUNT(*) grouped by an integer column.
    let mut circuit = compile_sql!("SELECT age, COUNT(*) FROM users GROUP BY age");

    // Two users aged 25, one aged 30.
    let mut users = Delta::new();
    for (id, name, age) in [(1, "Alice", 25), (2, "Bob", 25), (3, "Charlie", 30)] {
        users.insert(
            id,
            vec![Value::Integer(id), Value::Text(name.into()), Value::Integer(age)],
        );
    }

    let result = circuit
        .initialize(HashMap::from([("users".to_string(), users)]))
        .unwrap();

    // One output row per distinct age.
    assert_eq!(result.changes.len(), 2);

    let (mut found_25, mut found_30) = (false, false);
    for (row, weight) in &result.changes {
        assert_eq!(*weight, 1);
        assert_eq!(row.values.len(), 2); // (age, count)

        match (&row.values[0], &row.values[1]) {
            (Value::Integer(25), Value::Integer(count)) => {
                assert_eq!(*count, 2, "Age 25 should have count 2");
                found_25 = true;
            }
            (Value::Integer(30), Value::Integer(count)) => {
                assert_eq!(*count, 1, "Age 30 should have count 1");
                found_30 = true;
            }
            _ => {}
        }
    }

    assert!(found_25, "Should have group for age 25");
    assert!(found_30, "Should have group for age 30");
}
|
||
|
||
#[test]
fn test_sum_aggregation() {
    // SUM grouped by a text column.
    let mut circuit = compile_sql!("SELECT name, SUM(age) FROM users GROUP BY name");

    // Alice appears twice (25 + 30 = 55); Bob once (20).
    let mut input_delta = Delta::new();
    for (id, name, age) in [(1, "Alice", 25), (2, "Alice", 30), (3, "Bob", 20)] {
        input_delta.insert(
            id,
            vec![Value::Integer(id), Value::Text(name.into()), Value::Integer(age)],
        );
    }

    let result = circuit
        .initialize(HashMap::from([("users".to_string(), input_delta)]))
        .unwrap();

    // One output row per distinct name.
    assert_eq!(result.changes.len(), 2);

    // Previously a non-Float SUM (or an unexpected row shape) made the `if let` skip
    // every value assertion, so the test could pass without checking anything.
    // Accept either numeric representation and require both groups to be seen.
    let (mut found_alice, mut found_bob) = (false, false);
    for (row, weight) in &result.changes {
        assert_eq!(*weight, 1);
        assert_eq!(row.values.len(), 2); // (name, sum)

        let sum = match &row.values[1] {
            Value::Integer(sum) => *sum as f64,
            Value::Float(sum) => *sum,
            other => panic!("SUM should be Integer or Float, got {other:?}"),
        };

        match &row.values[0] {
            Value::Text(name) if name.as_str() == "Alice" => {
                assert_eq!(sum, 55.0, "Alice should have sum 55");
                found_alice = true;
            }
            Value::Text(name) if name.as_str() == "Bob" => {
                assert_eq!(sum, 20.0, "Bob should have sum 20");
                found_bob = true;
            }
            other => panic!("Unexpected group key {other:?}"),
        }
    }

    assert!(found_alice, "Should have group for Alice");
    assert!(found_bob, "Should have group for Bob");
}
|
||
|
||
#[test]
fn test_aggregation_without_group_by() {
    // With no GROUP BY, all aggregates collapse into a single output row.
    let mut circuit = compile_sql!("SELECT COUNT(*), SUM(age), AVG(age) FROM users");

    // Ages 25 + 30 + 20 = 75 over 3 rows -> AVG 25.
    let mut users = Delta::new();
    for (id, name, age) in [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 20)] {
        users.insert(
            id,
            vec![Value::Integer(id), Value::Text(name.into()), Value::Integer(age)],
        );
    }

    let result = circuit
        .initialize(HashMap::from([("users".to_string(), users)]))
        .unwrap();

    assert_eq!(
        result.changes.len(),
        1,
        "Should have exactly one result row"
    );

    let (row, weight) = result.changes.first().unwrap();
    assert_eq!(*weight, 1);
    assert_eq!(row.values.len(), 3); // (count, sum, avg)

    // COUNT(*) must be an Integer.
    match &row.values[0] {
        Value::Integer(count) => assert_eq!(*count, 3, "COUNT(*) should be 3"),
        other => panic!("COUNT should be Integer, got {:?}", other),
    }

    // SUM may be represented as either Integer or Float.
    match &row.values[1] {
        Value::Integer(sum) => assert_eq!(*sum, 75, "SUM(age) should be 75"),
        Value::Float(sum) => assert_eq!(*sum, 75.0, "SUM(age) should be 75.0"),
        other => panic!("SUM should be Integer or Float, got {other:?}"),
    }

    // AVG must be a Float.
    match &row.values[2] {
        Value::Float(avg) => assert_eq!(*avg, 25.0, "AVG(age) should be 25.0"),
        other => panic!("AVG should be Float, got {:?}", other),
    }
}
|
||
|
||
#[test]
fn test_expression_projection_execution() {
    // A non-column projection expression (hex()) evaluated through VDBE compilation.
    let mut circuit = compile_sql!("SELECT hex(id) FROM users");

    let mut users = Delta::new();
    for (rowid, id, name, age) in [(1, 1, "Alice", 25), (2, 255, "Bob", 17)] {
        users.insert(
            rowid,
            vec![Value::Integer(id), Value::Text(name.into()), Value::Integer(age)],
        );
    }

    let result = circuit
        .initialize(HashMap::from([("users".to_string(), users)]))
        .unwrap();

    assert_eq!(result.changes.len(), 2);

    // Output keyed by rowid, not by `id`: rowid 2 carries id 255.
    let hex_by_rowid: HashMap<i64, String> = result
        .changes
        .iter()
        .map(|(row, _)| match &row.values[0] {
            Value::Text(text) => (row.rowid, text.to_string()),
            _ => panic!("Expected Text value for hex() result"),
        })
        .collect();

    // hex() of an integer hexes its decimal text form, byte by byte.
    assert_eq!(
        hex_by_rowid.get(&1).unwrap(),
        "31",
        "hex(1) should return '31' (hex of ASCII '1')"
    );
    assert_eq!(
        hex_by_rowid.get(&2).unwrap(),
        "323535",
        "hex(255) should return '323535' (hex of ASCII '2', '5', '5')"
    );
}
|
||
|
||
// TODO: Reported as failing on the incremental update step. Initial execution is
// correct, but the incremental execution was observed to produce 3 changes instead of
// 2, with wrong values. NOTE(review): this test is not marked `#[ignore]`; confirm
// whether the report is still current.
//
// Checks that an aggregate sandwiched between two projections
// (projection -> aggregation -> projection) behaves correctly on initialization and
// on a later incremental insert.
#[test]
fn test_projection_aggregation_projection_pattern() {
    let mut circuit = compile_sql!("SELECT HEX(SUM(age + 2)) FROM users");

    let mut initial = Delta::new();
    for (id, name, age) in [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)] {
        initial.insert(
            id,
            vec![
                Value::Integer(id),
                Value::Text(name.to_string().into()),
                Value::Integer(age),
            ],
        );
    }

    let result = circuit
        .initialize(HashMap::from([("users".to_string(), initial)]))
        .unwrap();

    // SUM(age + 2) = 27 + 32 + 37 = 96. HEX() hexes the text "96" -> "3936".
    assert_eq!(result.changes.len(), 1);
    let (row, _weight) = &result.changes[0];
    assert_eq!(row.values.len(), 1);
    assert_eq!(
        row.values[0],
        Value::Text("3936".to_string().into()),
        "HEX(SUM(age + 2)) should return '3936' for sum of 96"
    );

    // Incremental step: David (40) adds 42, so the sum becomes 138 -> HEX("138") = "313338".
    let mut addition = Delta::new();
    addition.insert(
        4,
        vec![
            Value::Integer(4),
            Value::Text("David".to_string().into()),
            Value::Integer(40),
        ],
    );

    let result = circuit
        .execute(
            HashMap::from([("users".to_string(), addition)]),
            DeltaSet::empty(),
        )
        .unwrap();

    // Expect a retraction of the old aggregate followed by the new one.
    assert_eq!(result.changes.len(), 2);

    let (retracted, retracted_weight) = &result.changes[0];
    assert_eq!(*retracted_weight, -1);
    assert_eq!(retracted.values[0], Value::Text("3936".to_string().into()));

    let (added, added_weight) = &result.changes[1];
    assert_eq!(*added_weight, 1);
    assert_eq!(
        added.values[0],
        Value::Text("313338".to_string().into()),
        "HEX(SUM(age + 2)) should return '313338' for sum of 138"
    );
}
|
||
|
||
#[test]
fn test_nested_projection_with_groupby() {
    // projection -> grouped aggregation -> projection, keyed by name.
    let mut circuit = compile_sql!("SELECT name, HEX(SUM(age * 2)) FROM users GROUP BY name");

    let mut users = Delta::new();
    for (id, name, age) in [(1, "Alice", 25), (2, "Bob", 30), (3, "Alice", 35)] {
        users.insert(
            id,
            vec![
                Value::Integer(id),
                Value::Text(name.to_string().into()),
                Value::Integer(age),
            ],
        );
    }

    let result = circuit
        .initialize(HashMap::from([("users".to_string(), users)]))
        .unwrap();

    // Alice: 50 + 70 = 120 -> HEX("120") = "313230".
    // Bob:   60            -> HEX("60")  = "3630".
    assert_eq!(result.changes.len(), 2);

    let hex_by_name: HashMap<String, String> = result
        .changes
        .iter()
        .map(|(row, _weight)| {
            let name = if let Value::Text(t) = &row.values[0] {
                t.to_string()
            } else {
                panic!("Expected text for name")
            };
            let hex_sum = if let Value::Text(t) = &row.values[1] {
                t.to_string()
            } else {
                panic!("Expected text for hex value")
            };
            (name, hex_sum)
        })
        .collect();

    assert_eq!(
        hex_by_name.get("Alice").unwrap(),
        "313230",
        "Alice's HEX(SUM(age * 2)) should be '313230' (120)"
    );
    assert_eq!(
        hex_by_name.get("Bob").unwrap(),
        "3630",
        "Bob's HEX(SUM(age * 2)) should be '3630' (60)"
    );
}
|
||
|
||
#[test]
fn test_transaction_context() {
    // Uncommitted changes, passed as the second `execute` argument, must appear in the
    // output delta without being folded into the operator's stored state.
    let mut circuit = compile_sql!("SELECT * FROM users WHERE age > 18");

    let mut seed = Delta::new();
    for (id, name, age) in [(1, "Alice", 25), (2, "Bob", 17)] {
        seed.insert(
            id,
            vec![Value::Integer(id), Value::Text(name.into()), Value::Integer(age)],
        );
    }
    circuit
        .initialize(HashMap::from([("users".to_string(), seed)]))
        .unwrap();

    // Only Alice passes `age > 18`.
    let state = get_current_state(&circuit).unwrap();
    assert_eq!(state.changes.len(), 1);
    assert_eq!(state.changes[0].0.values[1], Value::Text("Alice".into()));

    // Transaction-local inserts: Charlie (30) passes the filter, David (15) does not.
    let mut pending = Delta::new();
    for (id, name, age) in [(3, "Charlie", 30), (4, "David", 15)] {
        pending.insert(
            id,
            vec![Value::Integer(id), Value::Text(name.into()), Value::Integer(age)],
        );
    }
    let tx_result = circuit
        .execute(
            HashMap::new(),
            delta_set_from_map(HashMap::from([("users".to_string(), pending)])),
        )
        .unwrap();

    assert_eq!(tx_result.changes.len(), 1, "Should see Charlie added");
    assert_eq!(
        tx_result.changes[0].0.values[1],
        Value::Text("Charlie".into())
    );

    // Commit Charlie for real, this time without any uncommitted context.
    let mut committed = Delta::new();
    committed.insert(
        3,
        vec![
            Value::Integer(3),
            Value::Text("Charlie".into()),
            Value::Integer(30),
        ],
    );
    let commit_data = HashMap::from([("users".to_string(), committed)]);

    let commit_result = circuit
        .execute(commit_data.clone(), DeltaSet::empty())
        .unwrap();
    assert_eq!(commit_result.changes.len(), 1, "Should see Charlie added");
    assert_eq!(
        commit_result.changes[0].0.values[1],
        Value::Text("Charlie".into())
    );

    circuit.commit(commit_data).unwrap();

    // With nothing new, a further execution yields an empty delta.
    let empty_result = circuit.execute(HashMap::new(), DeltaSet::empty()).unwrap();
    assert_eq!(empty_result.changes.len(), 0, "No changes when no new data");
}
|
||
|
||
#[test]
fn test_uncommitted_delete() {
    // An uncommitted delete shows up in the output delta, but the operator's stored
    // state only changes once the delete is actually committed.
    let mut circuit = compile_sql!("SELECT * FROM users WHERE age > 18");

    let mut seed = Delta::new();
    for (id, name, age) in [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 20)] {
        seed.insert(
            id,
            vec![Value::Integer(id), Value::Text(name.into()), Value::Integer(age)],
        );
    }
    circuit
        .initialize(HashMap::from([("users".to_string(), seed)]))
        .unwrap();

    // All three users are older than 18.
    let state = get_current_state(&circuit).unwrap();
    assert_eq!(state.changes.len(), 3);

    // Bob's stored row; deleted first transactionally, then for real.
    let bob = || {
        vec![
            Value::Integer(2),
            Value::Text("Bob".into()),
            Value::Integer(30),
        ]
    };

    let mut pending = Delta::new();
    pending.delete(2, bob());
    let tx_result = circuit
        .execute(
            HashMap::new(),
            delta_set_from_map(HashMap::from([("users".to_string(), pending)])),
        )
        .unwrap();

    // Bob passed the filter, so his uncommitted delete is visible in the output...
    assert_eq!(
        tx_result.changes.len(),
        1,
        "Should see the uncommitted delete"
    );

    // ...while the operator still remembers all three users.
    let state_after = get_current_state(&circuit).unwrap();
    assert_eq!(
        state_after.changes.len(),
        3,
        "Internal state should still have all 3 users"
    );

    // Now execute and commit the delete for real.
    let mut committed = Delta::new();
    committed.delete(2, bob());
    let commit_data = HashMap::from([("users".to_string(), committed)]);

    let commit_result = circuit
        .execute(commit_data.clone(), DeltaSet::empty())
        .unwrap();
    circuit.commit(commit_data).unwrap();

    assert_eq!(commit_result.changes.len(), 1, "Should see Bob deleted");
    assert_eq!(
        commit_result.changes[0].1, -1,
        "Delete should have weight -1"
    );
    assert_eq!(
        commit_result.changes[0].0.values[1],
        Value::Text("Bob".into())
    );

    // After the commit, only Alice and Charlie remain in the operator's state.
    let final_state = get_current_state(&circuit).unwrap();
    assert_eq!(
        final_state.changes.len(),
        2,
        "After commit, should have Alice and Charlie"
    );

    let mut names: Vec<String> = Vec::new();
    for (row, _) in &final_state.changes {
        match &row.values[1] {
            Value::Text(name) => names.push(name.to_string()),
            _ => panic!("Expected text value"),
        }
    }
    assert!(names.contains(&"Alice".to_string()));
    assert!(names.contains(&"Charlie".to_string()));
    assert!(!names.contains(&"Bob".to_string()));
}
|
||
|
||
#[test]
fn test_uncommitted_update() {
    // An update is modelled as delete(old row) + insert(new row). Bob moving from 17 to
    // 19 should bring him into the `age > 18` view, first transactionally and then after
    // the commit.
    let mut circuit = compile_sql!("SELECT * FROM users WHERE age > 18");

    // Bob (17) starts outside the view.
    let mut seed = Delta::new();
    for (id, name, age) in [(1, "Alice", 25), (2, "Bob", 17)] {
        seed.insert(
            id,
            vec![Value::Integer(id), Value::Text(name.into()), Value::Integer(age)],
        );
    }
    circuit
        .initialize(HashMap::from([("users".to_string(), seed)]))
        .unwrap();

    // Builds the delete(Bob, 17) + insert(Bob, 19) pair.
    let bob_turns_19 = || {
        let mut change = Delta::new();
        change.delete(
            2,
            vec![
                Value::Integer(2),
                Value::Text("Bob".into()),
                Value::Integer(17),
            ],
        );
        change.insert(
            2,
            vec![
                Value::Integer(2),
                Value::Text("Bob".into()),
                Value::Integer(19),
            ],
        );
        change
    };

    let mut final_result = circuit
        .execute(
            HashMap::new(),
            delta_set_from_map(HashMap::from([("users".to_string(), bob_turns_19())])),
        )
        .unwrap();
    // Consolidate so the delete/insert pair collapses to its net effect.
    final_result.consolidate();

    assert_eq!(final_result.changes.len(), 1, "Bob should now be in view");
    assert_eq!(
        final_result.changes[0].0.values[1],
        Value::Text("Bob".into())
    );
    assert_eq!(final_result.changes[0].0.values[2], Value::Integer(19));

    // Commit the same update for real.
    circuit
        .commit(HashMap::from([("users".to_string(), bob_turns_19())]))
        .unwrap();

    // The operator's state now contains both Alice and Bob.
    let mut consolidated_state = get_current_state(&circuit).unwrap();
    consolidated_state.consolidate();
    assert_eq!(
        consolidated_state.changes.len(),
        2,
        "Should have Alice and Bob"
    );

    let mut names: Vec<String> = Vec::new();
    for (row, _) in &consolidated_state.changes {
        match &row.values[1] {
            Value::Text(name) => names.push(name.as_str().to_string()),
            _ => panic!("Expected text value"),
        }
    }
    assert!(names.contains(&"Alice".to_string()));
    assert!(names.contains(&"Bob".to_string()));
}
|
||
|
||
#[test]
fn test_uncommitted_filtered_delete() {
    // Deleting a row that the filter already excludes must not produce any output change.
    let mut circuit = compile_sql!("SELECT * FROM users WHERE age > 18");

    // Bob (15) never enters the view.
    let mut seed = Delta::new();
    for (id, name, age) in [(1, "Alice", 25), (2, "Bob", 15)] {
        seed.insert(
            id,
            vec![Value::Integer(id), Value::Text(name.into()), Value::Integer(age)],
        );
    }
    circuit
        .initialize(HashMap::from([("users".to_string(), seed)]))
        .unwrap();

    let mut pending = Delta::new();
    pending.delete(
        2,
        vec![
            Value::Integer(2),
            Value::Text("Bob".into()),
            Value::Integer(15),
        ],
    );
    let tx_result = circuit
        .execute(
            HashMap::new(),
            delta_set_from_map(HashMap::from([("users".to_string(), pending)])),
        )
        .unwrap();

    // Bob was never part of the view, so removing him is invisible downstream.
    assert_eq!(
        tx_result.changes.len(),
        0,
        "Deleting filtered row produces no changes"
    );

    // The operator still holds only Alice.
    let state = get_current_state(&circuit).unwrap();
    assert_eq!(state.changes.len(), 1, "View still has only Alice");
    assert_eq!(state.changes[0].0.values[1], Value::Text("Alice".into()));
}
|
||
|
||
#[test]
fn test_uncommitted_mixed_operations() {
    // Test multiple uncommitted operations applied together in one transaction:
    // a delete, an update (delete + insert), a passing insert and a filtered insert.
    let mut circuit = compile_sql!("SELECT * FROM users WHERE age > 18");

    // Builds a `users` row in (id, name, age) column order.
    let user_row = |id: i64, name: &'static str, age: i64| {
        vec![
            Value::Integer(id),
            Value::Text(name.into()),
            Value::Integer(age),
        ]
    };

    // Initialize with Alice (25) and Bob (30); both pass the filter.
    let mut init_data = HashMap::new();
    let mut delta = Delta::new();
    delta.insert(1, user_row(1, "Alice", 25));
    delta.insert(2, user_row(2, "Bob", 30));
    init_data.insert("users".to_string(), delta);

    circuit.initialize(init_data).unwrap();

    // Verify initial state
    let state = get_current_state(&circuit).unwrap();
    assert_eq!(state.changes.len(), 2);

    // The same row changes are used first as the uncommitted transaction
    // delta and later as the committed delta:
    // - Delete Alice
    // - Update Bob's age to 35 (delete old row + insert new row)
    // - Insert Charlie (age 40)
    // - Insert David (age 16, filtered out by `age > 18`)
    let build_changes = || {
        let mut changes = Delta::new();
        changes.delete(1, user_row(1, "Alice", 25));
        changes.delete(2, user_row(2, "Bob", 30));
        changes.insert(2, user_row(2, "Bob", 35));
        changes.insert(3, user_row(3, "Charlie", 40));
        changes.insert(4, user_row(4, "David", 16));
        changes
    };

    let mut uncommitted = HashMap::new();
    uncommitted.insert("users".to_string(), build_changes());

    // Execute with uncommitted changes (the map is not needed afterwards).
    let tx_result = circuit
        .execute(HashMap::new(), delta_set_from_map(uncommitted))
        .unwrap();

    // Result should show: delete Alice, delete old Bob, insert new Bob and
    // insert Charlie. David fails the filter and must not appear.
    assert_eq!(
        tx_result.changes.len(),
        4,
        "Should see all uncommitted mixed operations"
    );
    assert!(
        !tx_result.changes.iter().any(|(row, _)| row.rowid == 4),
        "David (age 16) should be filtered out of the transaction result"
    );

    // Verify operator's internal state is unchanged
    let state_after = get_current_state(&circuit).unwrap();
    assert_eq!(state_after.changes.len(), 2, "Still has Alice and Bob");

    // Commit all changes
    let mut commit_data = HashMap::new();
    commit_data.insert("users".to_string(), build_changes());

    let commit_result = circuit
        .execute(commit_data.clone(), DeltaSet::empty())
        .unwrap();

    // Should see: Alice deleted, Bob deleted, Bob inserted, Charlie inserted
    // (David filtered out)
    assert_eq!(commit_result.changes.len(), 4, "Should see 4 changes");

    // Actually commit the changes to update operator state
    circuit.commit(commit_data).unwrap();

    // After all commits, execute with no changes should return empty delta
    let empty_result = circuit.execute(HashMap::new(), DeltaSet::empty()).unwrap();
    assert_eq!(empty_result.changes.len(), 0, "No changes when no new data");
}
|
||
|
||
#[test]
fn test_uncommitted_aggregation() {
    // Test that aggregations work correctly with uncommitted changes
    // This tests the specific scenario where a transaction adds new data
    // and we need to see correct aggregation results within the transaction

    // Create a sales table schema for testing
    let mut schema = Schema::new(false);
    let sales_table = BTreeTable {
        name: "sales".to_string(),
        root_page: 2,
        primary_key_columns: vec![],
        columns: vec![
            SchemaColumn {
                name: Some("product_id".to_string()),
                ty: Type::Integer,
                ty_str: "INTEGER".to_string(),
                primary_key: false,
                is_rowid_alias: false,
                notnull: false,
                default: None,
                unique: false,
                collation: None,
                hidden: false,
            },
            SchemaColumn {
                name: Some("amount".to_string()),
                ty: Type::Integer,
                ty_str: "INTEGER".to_string(),
                primary_key: false,
                is_rowid_alias: false,
                notnull: false,
                default: None,
                unique: false,
                collation: None,
                hidden: false,
            },
        ],
        has_rowid: true,
        is_strict: false,
        unique_sets: None,
    };
    schema.add_btree_table(Arc::new(sales_table));

    // Parse and compile the aggregation query
    let sql = "SELECT product_id, SUM(amount) as total, COUNT(*) as cnt FROM sales GROUP BY product_id";
    let mut parser = Parser::new(sql.as_bytes());
    let cmd = parser.next().unwrap().unwrap();

    let mut circuit = match cmd {
        ast::Cmd::Stmt(stmt) => {
            let mut builder = LogicalPlanBuilder::new(&schema);
            let logical_plan = builder.build_statement(&stmt).unwrap();
            DbspCompiler::new().compile(&logical_plan).unwrap()
        }
        _ => panic!("Expected SQL statement"),
    };

    // Decodes one view row `[product_id, SUM(amount), COUNT(*)]` into
    // `(product_id, (total, count))`. SUM might surface as Integer or Float
    // (a Float is truncated to i64); COUNT must be Integer. Shared by every
    // state check below so they all decode and report failures identically.
    let decode_row = |values: &[Value]| -> (i64, (i64, i64)) {
        let product_id = match &values[0] {
            Value::Integer(id) => *id,
            other => panic!("Product ID should be Integer, got {:?}", other),
        };
        let total = match &values[1] {
            Value::Integer(t) => *t,
            Value::Float(t) => *t as i64,
            other => panic!("Total should be numeric, got {:?}", other),
        };
        let count = match &values[2] {
            Value::Integer(c) => *c,
            other => panic!("Count should be Integer, got {:?}", other),
        };
        (product_id, (total, count))
    };

    // Initialize with base data: (1, 100), (1, 200), (2, 150), (2, 250)
    let mut init_data = HashMap::new();
    let mut delta = Delta::new();
    delta.insert(1, vec![Value::Integer(1), Value::Integer(100)]);
    delta.insert(2, vec![Value::Integer(1), Value::Integer(200)]);
    delta.insert(3, vec![Value::Integer(2), Value::Integer(150)]);
    delta.insert(4, vec![Value::Integer(2), Value::Integer(250)]);
    init_data.insert("sales".to_string(), delta);

    circuit.initialize(init_data).unwrap();

    // Verify initial state: product 1 total=300, product 2 total=400
    let state = get_current_state(&circuit).unwrap();
    assert_eq!(state.changes.len(), 2, "Should have 2 product groups");

    let initial_results: HashMap<i64, (i64, i64)> = state
        .changes
        .iter()
        .map(|(row, _)| decode_row(&row.values))
        .collect();

    assert_eq!(
        initial_results.get(&1),
        Some(&(300, 2)),
        "Product 1 should have total=300, count=2"
    );
    assert_eq!(
        initial_results.get(&2),
        Some(&(400, 2)),
        "Product 2 should have total=400, count=2"
    );

    // The transaction's rows: INSERT (1, 50), (3, 300). Built once as the
    // uncommitted delta and again as the committed delta.
    let new_sales = || {
        let mut sales = Delta::new();
        sales.insert(5, vec![Value::Integer(1), Value::Integer(50)]); // Add to product 1
        sales.insert(6, vec![Value::Integer(3), Value::Integer(300)]); // New product 3
        sales
    };

    let mut uncommitted = HashMap::new();
    uncommitted.insert("sales".to_string(), new_sales());

    // Execute with uncommitted data - simulating a read within transaction
    let tx_result = circuit
        .execute(HashMap::new(), delta_set_from_map(uncommitted))
        .unwrap();

    // Result should show the aggregate changes from uncommitted data
    // Product 1: retraction of (300, 2) and insertion of (350, 3)
    // Product 3: insertion of (300, 1) - new product
    assert_eq!(
        tx_result.changes.len(),
        3,
        "Should see aggregate changes from uncommitted data"
    );

    // IMPORTANT: Verify operator's internal state is unchanged
    let state_after = get_current_state(&circuit).unwrap();
    assert_eq!(
        state_after.changes.len(),
        2,
        "Internal state should still have 2 groups"
    );

    // Verify the internal state still has original values
    let state_results: HashMap<i64, (i64, i64)> = state_after
        .changes
        .iter()
        .map(|(row, _)| decode_row(&row.values))
        .collect();

    assert_eq!(state_results.get(&1), Some(&(300, 2)), "Product 1 unchanged");
    assert_eq!(state_results.get(&2), Some(&(400, 2)), "Product 2 unchanged");
    assert!(
        !state_results.contains_key(&3),
        "Product 3 should not be in committed state"
    );

    // Now actually commit the changes
    let mut commit_data = HashMap::new();
    commit_data.insert("sales".to_string(), new_sales());

    let commit_result = circuit
        .execute(commit_data.clone(), DeltaSet::empty())
        .unwrap();

    // Should see changes for product 1 (updated) and product 3 (new)
    assert_eq!(
        commit_result.changes.len(),
        3,
        "Should see 3 changes (delete old product 1, insert new product 1, insert product 3)"
    );

    // Actually commit the changes to update operator state
    circuit.commit(commit_data).unwrap();

    // After commit, verify final state
    let final_state = get_current_state(&circuit).unwrap();
    assert_eq!(
        final_state.changes.len(),
        3,
        "Should have 3 product groups after commit"
    );

    let final_results: HashMap<i64, (i64, i64)> = final_state
        .changes
        .iter()
        .map(|(row, _)| decode_row(&row.values))
        .collect();

    assert_eq!(
        final_results.get(&1),
        Some(&(350, 3)),
        "Product 1 should have total=350, count=3"
    );
    assert_eq!(
        final_results.get(&2),
        Some(&(400, 2)),
        "Product 2 should have total=400, count=2"
    );
    assert_eq!(
        final_results.get(&3),
        Some(&(300, 1)),
        "Product 3 should have total=300, count=1"
    );
}
|
||
|
||
#[test]
fn test_uncommitted_data_visible_in_transaction() {
    // Test that uncommitted INSERTs are visible within the same transaction
    // This simulates: BEGIN; INSERT ...; SELECT * FROM view; COMMIT;
    let mut circuit = compile_sql!("SELECT * FROM users WHERE age > 18");

    // Initialize with some data - need to match the schema (id, name, age)
    let mut init_data = HashMap::new();
    let mut delta = Delta::new();
    delta.insert(
        1,
        vec![
            Value::Integer(1),
            Value::Text("Alice".into()),
            Value::Integer(25),
        ],
    );
    delta.insert(
        2,
        vec![
            Value::Integer(2),
            Value::Text("Bob".into()),
            Value::Integer(30),
        ],
    );
    init_data.insert("users".to_string(), delta);

    // `init_data` is not used again, so it is moved rather than cloned.
    circuit.initialize(init_data).unwrap();

    // Verify initial state
    let state = get_current_state(&circuit).unwrap();
    assert_eq!(
        state.len(),
        2,
        "Should have 2 users initially (both pass age > 18 filter)"
    );

    // Simulate a transaction: INSERT new users that pass the filter - match schema (id, name, age)
    let mut uncommitted = HashMap::new();
    let mut tx_delta = Delta::new();
    tx_delta.insert(
        3,
        vec![
            Value::Integer(3),
            Value::Text("Charlie".into()),
            Value::Integer(35),
        ],
    );
    tx_delta.insert(
        4,
        vec![
            Value::Integer(4),
            Value::Text("David".into()),
            Value::Integer(20),
        ],
    );
    uncommitted.insert("users".to_string(), tx_delta);

    // Execute with uncommitted data - this should return the uncommitted changes
    // that passed through the filter (age > 18). The map is moved, not cloned,
    // because it is not needed afterwards.
    let tx_result = circuit
        .execute(HashMap::new(), delta_set_from_map(uncommitted))
        .unwrap();

    // IMPORTANT: tx_result should contain the filtered uncommitted changes!
    // Both Charlie (35) and David (20) should pass the age > 18 filter
    assert_eq!(
        tx_result.len(),
        2,
        "Should see 2 uncommitted rows that pass filter"
    );

    // Verify the uncommitted results contain the expected rows
    let has_charlie = tx_result.changes.iter().any(|(row, _)| row.rowid == 3);
    assert!(
        has_charlie,
        "Should find Charlie (rowid=3) in uncommitted results"
    );

    let has_david = tx_result.changes.iter().any(|(row, _)| row.rowid == 4);
    assert!(
        has_david,
        "Should find David (rowid=4) in uncommitted results"
    );

    // CRITICAL: Verify the operator state wasn't modified by uncommitted execution
    let state_after_uncommitted = get_current_state(&circuit).unwrap();
    assert_eq!(
        state_after_uncommitted.len(),
        2,
        "State should STILL be 2 after uncommitted execution - only Alice and Bob"
    );

    // The state should not contain Charlie or David
    let has_charlie_in_state = state_after_uncommitted
        .changes
        .iter()
        .any(|(row, _)| row.rowid == 3);
    let has_david_in_state = state_after_uncommitted
        .changes
        .iter()
        .any(|(row, _)| row.rowid == 4);
    assert!(
        !has_charlie_in_state,
        "Charlie should NOT be in operator state (uncommitted)"
    );
    assert!(
        !has_david_in_state,
        "David should NOT be in operator state (uncommitted)"
    );
}
|
||
|
||
#[test]
fn test_uncommitted_aggregation_with_rollback() {
    // Test that rollback properly discards uncommitted aggregation changes
    // Similar to test_uncommitted_aggregation but explicitly tests rollback semantics

    // Create a simple aggregation circuit
    let mut circuit = compile_sql!("SELECT age, COUNT(*) as cnt FROM users GROUP BY age");

    // Builds a `users` row in (id, name, age) column order.
    let user_row = |id: i64, name: &'static str, age: i64| {
        vec![
            Value::Integer(id),
            Value::Text(name.into()),
            Value::Integer(age),
        ]
    };

    // Decodes a view row `[age, cnt]` into `(age, count)`.
    let decode_counts = |values: &[Value]| -> (i64, i64) {
        if let (Value::Integer(age), Value::Integer(count)) = (&values[0], &values[1]) {
            (*age, *count)
        } else {
            panic!("Unexpected value types");
        }
    };

    // Initialize with some data: ages 25 (Alice, Charlie) and 30 (Bob, David)
    let mut init_data = HashMap::new();
    let mut delta = Delta::new();
    delta.insert(1, user_row(1, "Alice", 25));
    delta.insert(2, user_row(2, "Bob", 30));
    delta.insert(3, user_row(3, "Charlie", 25));
    delta.insert(4, user_row(4, "David", 30));
    init_data.insert("users".to_string(), delta);

    circuit.initialize(init_data).unwrap();

    // Verify initial state: age 25 count=2, age 30 count=2
    let state = get_current_state(&circuit).unwrap();
    assert_eq!(state.changes.len(), 2);

    let initial_counts: HashMap<i64, i64> = state
        .changes
        .iter()
        .map(|(row, _)| decode_counts(&row.values))
        .collect();

    assert_eq!(initial_counts.get(&25), Some(&2));
    assert_eq!(initial_counts.get(&30), Some(&2));

    // Create uncommitted changes that would affect aggregations
    let mut uncommitted_delta = Delta::new();
    // Add more people aged 25
    uncommitted_delta.insert(5, user_row(5, "Eve", 25));
    uncommitted_delta.insert(6, user_row(6, "Frank", 25));
    // Add person aged 35 (new group)
    uncommitted_delta.insert(7, user_row(7, "Grace", 35));
    // Delete Bob (age 30)
    uncommitted_delta.delete(2, user_row(2, "Bob", 30));

    let mut uncommitted = HashMap::new();
    uncommitted.insert("users".to_string(), uncommitted_delta);

    // Execute with uncommitted changes (the map is not needed afterwards)
    let tx_result = circuit
        .execute(HashMap::new(), delta_set_from_map(uncommitted))
        .unwrap();

    // Expected aggregate changes relative to the committed state:
    // - Age 25: count 2 -> 4 (Eve and Frank added)
    // - Age 30: count 2 -> 1 (Bob deleted)
    // - Age 35: new group with count 1 (Grace added)
    assert!(
        !tx_result.changes.is_empty(),
        "Should see aggregate changes from uncommitted data"
    );

    // True when the transaction result inserts (positive weight) the row
    // `[age, count]`.
    let emits_group = |age: i64, count: i64| {
        tx_result.changes.iter().any(|(row, weight)| {
            *weight > 0
                && row.values[0] == Value::Integer(age)
                && row.values[1] == Value::Integer(count)
        })
    };
    assert!(emits_group(25, 4), "Age 25 should be emitted with count 4");
    assert!(emits_group(30, 1), "Age 30 should be emitted with count 1");
    assert!(emits_group(35, 1), "Age 35 should be emitted with count 1");

    // Verify internal state is unchanged (simulating rollback by not committing)
    let state_after_rollback = get_current_state(&circuit).unwrap();
    assert_eq!(
        state_after_rollback.changes.len(),
        2,
        "Should still have 2 age groups"
    );

    let rollback_counts: HashMap<i64, i64> = state_after_rollback
        .changes
        .iter()
        .map(|(row, _)| decode_counts(&row.values))
        .collect();

    // Verify counts are unchanged after rollback
    assert_eq!(
        rollback_counts.get(&25),
        Some(&2),
        "Age 25 count unchanged"
    );
    assert_eq!(
        rollback_counts.get(&30),
        Some(&2),
        "Age 30 count unchanged"
    );
    assert!(
        !rollback_counts.contains_key(&35),
        "Age 35 should not exist"
    );
}
|
||
|
||
#[test]
fn test_circuit_rowid_update_consolidation() {
    // Moving a row to a new rowid (delete at the old rowid + insert at the new
    // one) must leave exactly one live row in the circuit's state.
    let mut circuit = DbspCircuit::new();

    // Two integer columns: `id` and `value`.
    let test_schema = Arc::new(LogicalSchema::new(vec![
        ("id".to_string(), Type::Integer),
        ("value".to_string(), Type::Integer),
    ]));

    // Source node for the `test` table; input nodes carry no executable.
    let source_node = circuit.add_node(
        DbspOperator::Input {
            name: "test".to_string(),
            schema: test_schema,
        },
        vec![],
        None,
    );

    // `value > 10`, expressed twice: once as the plan-level predicate stored
    // on the node, once as the operator that evaluates it at runtime.
    let plan_predicate = DbspExpr::BinaryExpr {
        left: Box::new(DbspExpr::Column("value".to_string())),
        op: ast::Operator::Greater,
        right: Box::new(DbspExpr::Literal(Value::Integer(10))),
    };
    let runtime_filter = FilterOperator::new(
        FilterPredicate::GreaterThan {
            column: "value".to_string(),
            value: Value::Integer(10),
        },
        vec!["id".to_string(), "value".to_string()],
    );

    let filter_node = circuit.add_node(
        DbspOperator::Filter {
            predicate: plan_predicate,
        },
        vec![source_node],
        Some(Box::new(runtime_filter)),
    );
    circuit.root = Some(filter_node);

    // Seed a single row at rowid 5 whose value (20) passes the filter.
    let mut seed = Delta::new();
    seed.insert(5, vec![Value::Integer(5), Value::Integer(20)]);
    circuit
        .initialize(HashMap::from([("test".to_string(), seed)]))
        .unwrap();

    let before = get_current_state(&circuit).unwrap();
    assert_eq!(before.changes.len(), 1);
    assert_eq!(before.changes[0].0.rowid, 5);

    // Relocate the row from rowid 5 to rowid 3.
    let mut relocation = Delta::new();
    relocation.delete(5, vec![Value::Integer(5), Value::Integer(20)]);
    relocation.insert(3, vec![Value::Integer(3), Value::Integer(20)]);
    let relocation_data = HashMap::from([("test".to_string(), relocation)]);

    circuit
        .execute(relocation_data.clone(), DeltaSet::empty())
        .unwrap();

    // Persist the relocation into the operator state.
    circuit.commit(relocation_data).unwrap();

    let after = get_current_state(&circuit).unwrap();
    assert_eq!(
        after.changes.len(),
        1,
        "Circuit should consolidate to single row"
    );
    let (row, weight) = &after.changes[0];
    assert_eq!(row.rowid, 3);
    assert_eq!(row.values, vec![Value::Integer(3), Value::Integer(20)]);
    assert_eq!(*weight, 1);
}
|
||
}
|