From 2627ad44de1cd23dc96a7e00d6bb17afbd1ab3f4 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Fri, 19 Sep 2025 05:18:44 -0500 Subject: [PATCH] support union statements in the DBSP circuit compiler --- core/incremental/compiler.rs | 125 +++++++++++- testing/materialized_views.test | 337 ++++++++++++++++++++++++++++++++ 2 files changed, 461 insertions(+), 1 deletion(-) diff --git a/core/incremental/compiler.rs b/core/incremental/compiler.rs index 8c8189261..cec950f35 100644 --- a/core/incremental/compiler.rs +++ b/core/incremental/compiler.rs @@ -298,6 +298,8 @@ pub enum DbspOperator { }, /// Input operator - source of data Input { name: String, schema: SchemaRef }, + /// Merge operator for combining streams (used in recursive CTEs and UNION) + Merge { schema: SchemaRef }, } /// Represents an expression in DBSP @@ -807,6 +809,13 @@ impl DbspCircuit { DbspOperator::Input { name, .. } => { writeln!(f, "{indent}Input[{node_id}]: {name}")?; } + DbspOperator::Merge { schema } => { + writeln!( + f, + "{indent}Merge[{node_id}]: UNION/Recursive (schema: {} columns)", + schema.columns.len() + )?; + } } for input_id in &node.inputs { @@ -1286,8 +1295,12 @@ impl DbspCompiler { ); Ok(node_id) } + LogicalPlan::Union(union) => { + // Handle UNION and UNION ALL + self.compile_union(union) + } _ => Err(LimboError::ParseError( - format!("Unsupported operator in DBSP compiler: only Filter, Projection, Join and Aggregate are supported, got: {:?}", + format!("Unsupported operator in DBSP compiler: only Filter, Projection, Join, Aggregate, and Union are supported, got: {:?}", match plan { LogicalPlan::Sort(_) => "Sort", LogicalPlan::Limit(_) => "Limit", @@ -1304,6 +1317,116 @@ impl DbspCompiler { } } + /// Extract a representative table name from a logical plan (for UNION ALL identification) + /// Returns a string that uniquely identifies the source of the data + fn extract_source_identifier(plan: &LogicalPlan) -> String { + match plan { + LogicalPlan::TableScan(scan) => { + // Direct table scan - use the table name + scan.table_name.clone() + } + LogicalPlan::Projection(proj) => { + // Pass through to input + Self::extract_source_identifier(&proj.input) + } + LogicalPlan::Filter(filter) => { + // Pass through to input + Self::extract_source_identifier(&filter.input) + } + LogicalPlan::Aggregate(agg) => { + // Aggregate of a table + format!("agg_{}", Self::extract_source_identifier(&agg.input)) + } + LogicalPlan::Sort(sort) => { + // Pass through to input + Self::extract_source_identifier(&sort.input) + } + LogicalPlan::Limit(limit) => { + // Pass through to input + Self::extract_source_identifier(&limit.input) + } + LogicalPlan::Join(join) => { + // Join of two sources - combine their identifiers + let left_id = Self::extract_source_identifier(&join.left); + let right_id = Self::extract_source_identifier(&join.right); + format!("join_{left_id}_{right_id}") + } + LogicalPlan::Union(union) => { + // Union of multiple sources + if union.inputs.is_empty() { + "union_empty".to_string() + } else { + let identifiers: Vec = union + .inputs + .iter() + .map(|input| Self::extract_source_identifier(input)) + .collect(); + format!("union_{}", identifiers.join("_")) + } + } + LogicalPlan::Distinct(distinct) => { + // Distinct of a source + format!( + "distinct_{}", + Self::extract_source_identifier(&distinct.input) + ) + } + LogicalPlan::WithCTE(with_cte) => { + // CTE body + Self::extract_source_identifier(&with_cte.body) + } + LogicalPlan::CTERef(cte_ref) => { + // CTE reference - use the CTE name + format!("cte_{}", cte_ref.name) + } + LogicalPlan::EmptyRelation(_) => "empty".to_string(), + LogicalPlan::Values(_) => "values".to_string(), + } + } + + /// Compile a UNION operator + fn compile_union(&mut self, union: &crate::translate::logical::Union) -> Result { + if union.inputs.len() != 2 { + return Err(LimboError::ParseError(format!( + "UNION requires exactly 2 inputs, got {}", + union.inputs.len() + ))); + } + + // Extract source identifiers from each input (for UNION ALL) + let left_source = Self::extract_source_identifier(&union.inputs[0]); + let right_source = Self::extract_source_identifier(&union.inputs[1]); + + // Compile left and right inputs + let left_id = self.compile_plan(&union.inputs[0])?; + let right_id = self.compile_plan(&union.inputs[1])?; + + use crate::incremental::merge_operator::{MergeOperator, UnionMode}; + + // Create a merge operator that handles the rowid transformation + let operator_id = self.circuit.next_id; + let mode = if union.all { + // For UNION ALL, pass the source identifiers + UnionMode::All { + left_table: left_source, + right_table: right_source, + } + } else { + UnionMode::Distinct + }; + let merge_operator = Box::new(MergeOperator::new(operator_id, mode)); + + let merge_id = self.circuit.add_node( + DbspOperator::Merge { + schema: union.schema.clone(), + }, + vec![left_id, right_id], + merge_operator, + ); + + Ok(merge_id) + } + /// Convert a logical expression to a DBSP expression fn compile_expr(expr: &LogicalExpr) -> Result { match expr { diff --git a/testing/materialized_views.test b/testing/materialized_views.test index 15229a48c..354f65d39 100755 --- a/testing/materialized_views.test +++ b/testing/materialized_views.test @@ -1091,3 +1091,340 @@ do_execsql_test_on_specific_db {:memory:} matview-join-complex-where { } {Charlie|10|100|1000 Alice|5|100|500 Charlie|6|75|450} + +# Test UNION queries in materialized views +do_execsql_test_on_specific_db {:memory:} matview-union-simple { + CREATE TABLE sales_online(id INTEGER, product TEXT, amount INTEGER); + CREATE TABLE sales_store(id INTEGER, product TEXT, amount INTEGER); + + INSERT INTO sales_online VALUES + (1, 'Laptop', 1200), + (2, 'Mouse', 25), + (3, 'Monitor', 400); + + INSERT INTO sales_store VALUES + (1, 'Keyboard', 75), + (2, 'Chair', 150), + (3, 'Desk', 350); + + -- Create a view that combines both sources + CREATE MATERIALIZED VIEW all_sales AS + SELECT product, amount FROM sales_online + UNION ALL + SELECT product, amount FROM sales_store; + + SELECT * FROM all_sales ORDER BY product; +} {Chair|150 +Desk|350 +Keyboard|75 +Laptop|1200 +Monitor|400 +Mouse|25} + +do_execsql_test_on_specific_db {:memory:} matview-union-with-where { + CREATE TABLE employees(id INTEGER, name TEXT, dept TEXT, salary INTEGER); + CREATE TABLE contractors(id INTEGER, name TEXT, dept TEXT, rate INTEGER); + + INSERT INTO employees VALUES + (1, 'Alice', 'Engineering', 90000), + (2, 'Bob', 'Sales', 60000), + (3, 'Charlie', 'Engineering', 85000); + + INSERT INTO contractors VALUES + (1, 'David', 'Engineering', 150), + (2, 'Eve', 'Marketing', 120), + (3, 'Frank', 'Engineering', 180); + + -- High-earning staff from both categories + CREATE MATERIALIZED VIEW high_earners AS + SELECT name, dept, salary as compensation FROM employees WHERE salary > 80000 + UNION ALL + SELECT name, dept, rate * 2000 as compensation FROM contractors WHERE rate > 140; + + SELECT * FROM high_earners ORDER BY name; +} {Alice|Engineering|90000 +Charlie|Engineering|85000 +David|Engineering|300000 +Frank|Engineering|360000} + +do_execsql_test_on_specific_db {:memory:} matview-union-same-table-different-filters { + CREATE TABLE orders(id INTEGER, customer_id INTEGER, product TEXT, amount INTEGER, status TEXT); + + INSERT INTO orders VALUES + (1, 1, 'Laptop', 1200, 'completed'), + (2, 2, 'Mouse', 25, 'pending'), + (3, 1, 'Monitor', 400, 'completed'), + (4, 3, 'Keyboard', 75, 'cancelled'), + (5, 2, 'Desk', 350, 'completed'), + (6, 3, 'Chair', 150, 'pending'); + + -- View showing priority orders: high-value OR pending status + CREATE MATERIALIZED VIEW priority_orders AS + SELECT id, customer_id, product, amount FROM orders WHERE amount > 300 + UNION ALL + SELECT id, customer_id, product, amount FROM orders WHERE status = 'pending'; + + SELECT * FROM priority_orders ORDER BY id; +} {1|1|Laptop|1200 +2|2|Mouse|25 +3|1|Monitor|400 +5|2|Desk|350 +6|3|Chair|150} + +do_execsql_test_on_specific_db {:memory:} matview-union-with-aggregation { + CREATE TABLE q1_sales(product TEXT, quantity INTEGER, revenue INTEGER); + CREATE TABLE q2_sales(product TEXT, quantity INTEGER, revenue INTEGER); + + INSERT INTO q1_sales VALUES + ('Laptop', 10, 12000), + ('Mouse', 50, 1250), + ('Monitor', 8, 3200); + + INSERT INTO q2_sales VALUES + ('Laptop', 15, 18000), + ('Mouse', 60, 1500), + ('Keyboard', 30, 2250); + + -- Combined quarterly summary + CREATE MATERIALIZED VIEW half_year_summary AS + SELECT 'Q1' as quarter, SUM(quantity) as total_units, SUM(revenue) as total_revenue + FROM q1_sales + UNION ALL + SELECT 'Q2' as quarter, SUM(quantity) as total_units, SUM(revenue) as total_revenue + FROM q2_sales; + + SELECT * FROM half_year_summary ORDER BY quarter; +} {Q1|68|16450 +Q2|105|21750} + +do_execsql_test_on_specific_db {:memory:} matview-union-with-join { + CREATE TABLE customers(id INTEGER PRIMARY KEY, name TEXT, type TEXT); + CREATE TABLE orders(id INTEGER PRIMARY KEY, customer_id INTEGER, amount INTEGER); + CREATE TABLE quotes(id INTEGER PRIMARY KEY, customer_id INTEGER, amount INTEGER); + + INSERT INTO customers VALUES + (1, 'Alice', 'premium'), + (2, 'Bob', 'regular'), + (3, 'Charlie', 'premium'); + + INSERT INTO orders VALUES + (1, 1, 1000), + (2, 2, 500), + (3, 3, 1500); + + INSERT INTO quotes VALUES + (1, 1, 800), + (2, 2, 300), + (3, 3, 2000); + + -- All premium customer transactions (orders and quotes) + CREATE MATERIALIZED VIEW premium_transactions AS + SELECT c.name, 'order' as type, o.amount + FROM customers c + JOIN orders o ON c.id = o.customer_id + WHERE c.type = 'premium' + UNION ALL + SELECT c.name, 'quote' as type, q.amount + FROM customers c + JOIN quotes q ON c.id = q.customer_id + WHERE c.type = 'premium'; + + SELECT * FROM premium_transactions ORDER BY name, type, amount; +} {Alice|order|1000 +Alice|quote|800 +Charlie|order|1500 +Charlie|quote|2000} + +do_execsql_test_on_specific_db {:memory:} matview-union-distinct { + CREATE TABLE active_users(id INTEGER, name TEXT, email TEXT); + CREATE TABLE inactive_users(id INTEGER, name TEXT, email TEXT); + + INSERT INTO active_users VALUES + (1, 'Alice', 'alice@example.com'), + (2, 'Bob', 'bob@example.com'), + (3, 'Charlie', 'charlie@example.com'); + + INSERT INTO inactive_users VALUES + (4, 'David', 'david@example.com'), + (2, 'Bob', 'bob@example.com'), -- Bob appears in both + (5, 'Eve', 'eve@example.com'); + + -- All unique users (using UNION to deduplicate) + CREATE MATERIALIZED VIEW all_users AS + SELECT id, name, email FROM active_users + UNION + SELECT id, name, email FROM inactive_users; + + SELECT * FROM all_users ORDER BY id; +} {1|Alice|alice@example.com +2|Bob|bob@example.com +3|Charlie|charlie@example.com +4|David|david@example.com +5|Eve|eve@example.com} + +do_execsql_test_on_specific_db {:memory:} matview-union-complex-multiple-branches { + CREATE TABLE products(id INTEGER, name TEXT, category TEXT, price INTEGER); + + INSERT INTO products VALUES + (1, 'Laptop', 'Electronics', 1200), + (2, 'Mouse', 'Electronics', 25), + (3, 'Desk', 'Furniture', 350), + (4, 'Chair', 'Furniture', 150), + (5, 'Monitor', 'Electronics', 400), + (6, 'Keyboard', 'Electronics', 75), + (7, 'Bookshelf', 'Furniture', 200), + (8, 'Tablet', 'Electronics', 600); + + -- Products of interest: expensive electronics, all furniture, or very cheap items + CREATE MATERIALIZED VIEW featured_products AS + SELECT name, category, price, 'PremiumElectronic' as tag + FROM products + WHERE category = 'Electronics' AND price > 500 + UNION ALL + SELECT name, category, price, 'Furniture' as tag + FROM products + WHERE category = 'Furniture' + UNION ALL + SELECT name, category, price, 'Budget' as tag + FROM products + WHERE price < 50; + + SELECT * FROM featured_products ORDER BY tag, name; +} {Mouse|Electronics|25|Budget +Bookshelf|Furniture|200|Furniture +Chair|Furniture|150|Furniture +Desk|Furniture|350|Furniture +Laptop|Electronics|1200|PremiumElectronic +Tablet|Electronics|600|PremiumElectronic} + +do_execsql_test_on_specific_db {:memory:} matview-union-maintenance-insert { + CREATE TABLE t1(id INTEGER, value INTEGER); + CREATE TABLE t2(id INTEGER, value INTEGER); + + INSERT INTO t1 VALUES (1, 100), (2, 200); + INSERT INTO t2 VALUES (3, 300), (4, 400); + + CREATE MATERIALIZED VIEW combined AS + SELECT id, value FROM t1 WHERE value > 150 + UNION ALL + SELECT id, value FROM t2 WHERE value > 350; + + SELECT * FROM combined ORDER BY id; + + -- Insert into t1 + INSERT INTO t1 VALUES (5, 500); + SELECT * FROM combined ORDER BY id; + + -- Insert into t2 + INSERT INTO t2 VALUES (6, 600); + SELECT * FROM combined ORDER BY id; +} {2|200 +4|400 +2|200 +4|400 +5|500 +2|200 +4|400 +5|500 +6|600} + +do_execsql_test_on_specific_db {:memory:} matview-union-maintenance-delete { + CREATE TABLE source1(id INTEGER PRIMARY KEY, data TEXT); + CREATE TABLE source2(id INTEGER PRIMARY KEY, data TEXT); + + INSERT INTO source1 VALUES (1, 'A'), (2, 'B'), (3, 'C'); + INSERT INTO source2 VALUES (4, 'D'), (5, 'E'), (6, 'F'); + + CREATE MATERIALIZED VIEW merged AS + SELECT id, data FROM source1 + UNION ALL + SELECT id, data FROM source2; + + SELECT COUNT(*) FROM merged; + + DELETE FROM source1 WHERE id = 2; + SELECT COUNT(*) FROM merged; + + DELETE FROM source2 WHERE id > 4; + SELECT COUNT(*) FROM merged; +} {6 +5 +3} + +do_execsql_test_on_specific_db {:memory:} matview-union-maintenance-update { + CREATE TABLE high_priority(id INTEGER PRIMARY KEY, task TEXT, priority INTEGER); + CREATE TABLE normal_priority(id INTEGER PRIMARY KEY, task TEXT, priority INTEGER); + + INSERT INTO high_priority VALUES (1, 'Task A', 10), (2, 'Task B', 9); + INSERT INTO normal_priority VALUES (3, 'Task C', 5), (4, 'Task D', 6); + + CREATE MATERIALIZED VIEW active_tasks AS + SELECT id, task, priority FROM high_priority WHERE priority >= 9 + UNION ALL + SELECT id, task, priority FROM normal_priority WHERE priority >= 5; + + SELECT COUNT(*) FROM active_tasks; + + -- Update drops a high priority task below threshold + UPDATE high_priority SET priority = 8 WHERE id = 2; + SELECT COUNT(*) FROM active_tasks; + + -- Update brings a normal task above threshold + UPDATE normal_priority SET priority = 3 WHERE id = 3; + SELECT COUNT(*) FROM active_tasks; +} {4 +3 +2} + +# Test UNION ALL with same table and different WHERE conditions +do_execsql_test_on_specific_db {:memory:} matview-union-all-same-table { + CREATE TABLE test(id INTEGER PRIMARY KEY, value INTEGER); + INSERT INTO test VALUES (1, 10), (2, 20); + + -- This UNION ALL should return both rows + CREATE MATERIALIZED VIEW union_view AS + SELECT id, value FROM test WHERE value < 15 + UNION ALL + SELECT id, value FROM test WHERE value > 15; + + -- Should return 2 rows: (1,10) and (2,20) + SELECT * FROM union_view ORDER BY id; +} {1|10 +2|20} + +# Test UNION ALL preserves all rows in count +do_execsql_test_on_specific_db {:memory:} matview-union-all-row-count { + CREATE TABLE data(id INTEGER PRIMARY KEY, num INTEGER); + INSERT INTO data VALUES (1, 5), (2, 15), (3, 25); + + CREATE MATERIALIZED VIEW split_view AS + SELECT id, num FROM data WHERE num <= 10 + UNION ALL + SELECT id, num FROM data WHERE num > 10; + + -- Should return count of 3 + SELECT COUNT(*) FROM split_view; +} {3} + +# Test UNION ALL with text columns and filtering +do_execsql_test_on_specific_db {:memory:} matview-union-all-text-filter { + CREATE TABLE items(id INTEGER PRIMARY KEY, category TEXT, price INTEGER); + INSERT INTO items VALUES + (1, 'cheap', 10), + (2, 'expensive', 100), + (3, 'cheap', 20), + (4, 'expensive', 200); + + CREATE MATERIALIZED VIEW price_categories AS + SELECT id, category, price FROM items WHERE category = 'cheap' + UNION ALL + SELECT id, category, price FROM items WHERE category = 'expensive'; + + -- Should return all 4 items + SELECT COUNT(*) FROM price_categories; + SELECT id FROM price_categories ORDER BY id; +} {4 +1 +2 +3 +4}