support union statements in the DBSP circuit compiler

This commit is contained in:
Glauber Costa
2025-09-19 05:18:44 -05:00
parent b419db489a
commit 2627ad44de
2 changed files with 461 additions and 1 deletions

View File

@@ -298,6 +298,8 @@ pub enum DbspOperator {
},
/// Input operator - source of data
Input { name: String, schema: SchemaRef },
/// Merge operator for combining streams (used in recursive CTEs and UNION)
Merge { schema: SchemaRef },
}
/// Represents an expression in DBSP
@@ -807,6 +809,13 @@ impl DbspCircuit {
DbspOperator::Input { name, .. } => {
writeln!(f, "{indent}Input[{node_id}]: {name}")?;
}
DbspOperator::Merge { schema } => {
writeln!(
f,
"{indent}Merge[{node_id}]: UNION/Recursive (schema: {} columns)",
schema.columns.len()
)?;
}
}
for input_id in &node.inputs {
@@ -1286,8 +1295,12 @@ impl DbspCompiler {
);
Ok(node_id)
}
LogicalPlan::Union(union) => {
// Handle UNION and UNION ALL
self.compile_union(union)
}
_ => Err(LimboError::ParseError(
format!("Unsupported operator in DBSP compiler: only Filter, Projection, Join and Aggregate are supported, got: {:?}",
format!("Unsupported operator in DBSP compiler: only Filter, Projection, Join, Aggregate, and Union are supported, got: {:?}",
match plan {
LogicalPlan::Sort(_) => "Sort",
LogicalPlan::Limit(_) => "Limit",
@@ -1304,6 +1317,116 @@ impl DbspCompiler {
}
}
/// Extract a representative table name from a logical plan (for UNION ALL identification)
/// Returns a string that uniquely identifies the source of the data
fn extract_source_identifier(plan: &LogicalPlan) -> String {
match plan {
LogicalPlan::TableScan(scan) => {
// Direct table scan - use the table name
scan.table_name.clone()
}
LogicalPlan::Projection(proj) => {
// Pass through to input
Self::extract_source_identifier(&proj.input)
}
LogicalPlan::Filter(filter) => {
// Pass through to input
Self::extract_source_identifier(&filter.input)
}
LogicalPlan::Aggregate(agg) => {
// Aggregate of a table
format!("agg_{}", Self::extract_source_identifier(&agg.input))
}
LogicalPlan::Sort(sort) => {
// Pass through to input
Self::extract_source_identifier(&sort.input)
}
LogicalPlan::Limit(limit) => {
// Pass through to input
Self::extract_source_identifier(&limit.input)
}
LogicalPlan::Join(join) => {
// Join of two sources - combine their identifiers
let left_id = Self::extract_source_identifier(&join.left);
let right_id = Self::extract_source_identifier(&join.right);
format!("join_{left_id}_{right_id}")
}
LogicalPlan::Union(union) => {
// Union of multiple sources
if union.inputs.is_empty() {
"union_empty".to_string()
} else {
let identifiers: Vec<String> = union
.inputs
.iter()
.map(|input| Self::extract_source_identifier(input))
.collect();
format!("union_{}", identifiers.join("_"))
}
}
LogicalPlan::Distinct(distinct) => {
// Distinct of a source
format!(
"distinct_{}",
Self::extract_source_identifier(&distinct.input)
)
}
LogicalPlan::WithCTE(with_cte) => {
// CTE body
Self::extract_source_identifier(&with_cte.body)
}
LogicalPlan::CTERef(cte_ref) => {
// CTE reference - use the CTE name
format!("cte_{}", cte_ref.name)
}
LogicalPlan::EmptyRelation(_) => "empty".to_string(),
LogicalPlan::Values(_) => "values".to_string(),
}
}
/// Compile a UNION operator
fn compile_union(&mut self, union: &crate::translate::logical::Union) -> Result<usize> {
if union.inputs.len() != 2 {
return Err(LimboError::ParseError(format!(
"UNION requires exactly 2 inputs, got {}",
union.inputs.len()
)));
}
// Extract source identifiers from each input (for UNION ALL)
let left_source = Self::extract_source_identifier(&union.inputs[0]);
let right_source = Self::extract_source_identifier(&union.inputs[1]);
// Compile left and right inputs
let left_id = self.compile_plan(&union.inputs[0])?;
let right_id = self.compile_plan(&union.inputs[1])?;
use crate::incremental::merge_operator::{MergeOperator, UnionMode};
// Create a merge operator that handles the rowid transformation
let operator_id = self.circuit.next_id;
let mode = if union.all {
// For UNION ALL, pass the source identifiers
UnionMode::All {
left_table: left_source,
right_table: right_source,
}
} else {
UnionMode::Distinct
};
let merge_operator = Box::new(MergeOperator::new(operator_id, mode));
let merge_id = self.circuit.add_node(
DbspOperator::Merge {
schema: union.schema.clone(),
},
vec![left_id, right_id],
merge_operator,
);
Ok(merge_id)
}
/// Convert a logical expression to a DBSP expression
fn compile_expr(expr: &LogicalExpr) -> Result<DbspExpr> {
match expr {

View File

@@ -1091,3 +1091,340 @@ do_execsql_test_on_specific_db {:memory:} matview-join-complex-where {
} {Charlie|10|100|1000
Alice|5|100|500
Charlie|6|75|450}
# Test UNION queries in materialized views
do_execsql_test_on_specific_db {:memory:} matview-union-simple {
CREATE TABLE sales_online(id INTEGER, product TEXT, amount INTEGER);
CREATE TABLE sales_store(id INTEGER, product TEXT, amount INTEGER);
INSERT INTO sales_online VALUES
(1, 'Laptop', 1200),
(2, 'Mouse', 25),
(3, 'Monitor', 400);
INSERT INTO sales_store VALUES
(1, 'Keyboard', 75),
(2, 'Chair', 150),
(3, 'Desk', 350);
-- Create a view that combines both sources
CREATE MATERIALIZED VIEW all_sales AS
SELECT product, amount FROM sales_online
UNION ALL
SELECT product, amount FROM sales_store;
SELECT * FROM all_sales ORDER BY product;
} {Chair|150
Desk|350
Keyboard|75
Laptop|1200
Monitor|400
Mouse|25}
do_execsql_test_on_specific_db {:memory:} matview-union-with-where {
CREATE TABLE employees(id INTEGER, name TEXT, dept TEXT, salary INTEGER);
CREATE TABLE contractors(id INTEGER, name TEXT, dept TEXT, rate INTEGER);
INSERT INTO employees VALUES
(1, 'Alice', 'Engineering', 90000),
(2, 'Bob', 'Sales', 60000),
(3, 'Charlie', 'Engineering', 85000);
INSERT INTO contractors VALUES
(1, 'David', 'Engineering', 150),
(2, 'Eve', 'Marketing', 120),
(3, 'Frank', 'Engineering', 180);
-- High-earning staff from both categories
CREATE MATERIALIZED VIEW high_earners AS
SELECT name, dept, salary as compensation FROM employees WHERE salary > 80000
UNION ALL
SELECT name, dept, rate * 2000 as compensation FROM contractors WHERE rate > 140;
SELECT * FROM high_earners ORDER BY name;
} {Alice|Engineering|90000
Charlie|Engineering|85000
David|Engineering|300000
Frank|Engineering|360000}
do_execsql_test_on_specific_db {:memory:} matview-union-same-table-different-filters {
CREATE TABLE orders(id INTEGER, customer_id INTEGER, product TEXT, amount INTEGER, status TEXT);
INSERT INTO orders VALUES
(1, 1, 'Laptop', 1200, 'completed'),
(2, 2, 'Mouse', 25, 'pending'),
(3, 1, 'Monitor', 400, 'completed'),
(4, 3, 'Keyboard', 75, 'cancelled'),
(5, 2, 'Desk', 350, 'completed'),
(6, 3, 'Chair', 150, 'pending');
-- View showing priority orders: high-value OR pending status
CREATE MATERIALIZED VIEW priority_orders AS
SELECT id, customer_id, product, amount FROM orders WHERE amount > 300
UNION ALL
SELECT id, customer_id, product, amount FROM orders WHERE status = 'pending';
SELECT * FROM priority_orders ORDER BY id;
} {1|1|Laptop|1200
2|2|Mouse|25
3|1|Monitor|400
5|2|Desk|350
6|3|Chair|150}
do_execsql_test_on_specific_db {:memory:} matview-union-with-aggregation {
CREATE TABLE q1_sales(product TEXT, quantity INTEGER, revenue INTEGER);
CREATE TABLE q2_sales(product TEXT, quantity INTEGER, revenue INTEGER);
INSERT INTO q1_sales VALUES
('Laptop', 10, 12000),
('Mouse', 50, 1250),
('Monitor', 8, 3200);
INSERT INTO q2_sales VALUES
('Laptop', 15, 18000),
('Mouse', 60, 1500),
('Keyboard', 30, 2250);
-- Combined quarterly summary
CREATE MATERIALIZED VIEW half_year_summary AS
SELECT 'Q1' as quarter, SUM(quantity) as total_units, SUM(revenue) as total_revenue
FROM q1_sales
UNION ALL
SELECT 'Q2' as quarter, SUM(quantity) as total_units, SUM(revenue) as total_revenue
FROM q2_sales;
SELECT * FROM half_year_summary ORDER BY quarter;
} {Q1|68|16450
Q2|105|21750}
do_execsql_test_on_specific_db {:memory:} matview-union-with-join {
CREATE TABLE customers(id INTEGER PRIMARY KEY, name TEXT, type TEXT);
CREATE TABLE orders(id INTEGER PRIMARY KEY, customer_id INTEGER, amount INTEGER);
CREATE TABLE quotes(id INTEGER PRIMARY KEY, customer_id INTEGER, amount INTEGER);
INSERT INTO customers VALUES
(1, 'Alice', 'premium'),
(2, 'Bob', 'regular'),
(3, 'Charlie', 'premium');
INSERT INTO orders VALUES
(1, 1, 1000),
(2, 2, 500),
(3, 3, 1500);
INSERT INTO quotes VALUES
(1, 1, 800),
(2, 2, 300),
(3, 3, 2000);
-- All premium customer transactions (orders and quotes)
CREATE MATERIALIZED VIEW premium_transactions AS
SELECT c.name, 'order' as type, o.amount
FROM customers c
JOIN orders o ON c.id = o.customer_id
WHERE c.type = 'premium'
UNION ALL
SELECT c.name, 'quote' as type, q.amount
FROM customers c
JOIN quotes q ON c.id = q.customer_id
WHERE c.type = 'premium';
SELECT * FROM premium_transactions ORDER BY name, type, amount;
} {Alice|order|1000
Alice|quote|800
Charlie|order|1500
Charlie|quote|2000}
do_execsql_test_on_specific_db {:memory:} matview-union-distinct {
CREATE TABLE active_users(id INTEGER, name TEXT, email TEXT);
CREATE TABLE inactive_users(id INTEGER, name TEXT, email TEXT);
INSERT INTO active_users VALUES
(1, 'Alice', 'alice@example.com'),
(2, 'Bob', 'bob@example.com'),
(3, 'Charlie', 'charlie@example.com');
INSERT INTO inactive_users VALUES
(4, 'David', 'david@example.com'),
(2, 'Bob', 'bob@example.com'), -- Bob appears in both
(5, 'Eve', 'eve@example.com');
-- All unique users (using UNION to deduplicate)
CREATE MATERIALIZED VIEW all_users AS
SELECT id, name, email FROM active_users
UNION
SELECT id, name, email FROM inactive_users;
SELECT * FROM all_users ORDER BY id;
} {1|Alice|alice@example.com
2|Bob|bob@example.com
3|Charlie|charlie@example.com
4|David|david@example.com
5|Eve|eve@example.com}
do_execsql_test_on_specific_db {:memory:} matview-union-complex-multiple-branches {
CREATE TABLE products(id INTEGER, name TEXT, category TEXT, price INTEGER);
INSERT INTO products VALUES
(1, 'Laptop', 'Electronics', 1200),
(2, 'Mouse', 'Electronics', 25),
(3, 'Desk', 'Furniture', 350),
(4, 'Chair', 'Furniture', 150),
(5, 'Monitor', 'Electronics', 400),
(6, 'Keyboard', 'Electronics', 75),
(7, 'Bookshelf', 'Furniture', 200),
(8, 'Tablet', 'Electronics', 600);
-- Products of interest: expensive electronics, all furniture, or very cheap items
CREATE MATERIALIZED VIEW featured_products AS
SELECT name, category, price, 'PremiumElectronic' as tag
FROM products
WHERE category = 'Electronics' AND price > 500
UNION ALL
SELECT name, category, price, 'Furniture' as tag
FROM products
WHERE category = 'Furniture'
UNION ALL
SELECT name, category, price, 'Budget' as tag
FROM products
WHERE price < 50;
SELECT * FROM featured_products ORDER BY tag, name;
} {Mouse|Electronics|25|Budget
Bookshelf|Furniture|200|Furniture
Chair|Furniture|150|Furniture
Desk|Furniture|350|Furniture
Laptop|Electronics|1200|PremiumElectronic
Tablet|Electronics|600|PremiumElectronic}
do_execsql_test_on_specific_db {:memory:} matview-union-maintenance-insert {
CREATE TABLE t1(id INTEGER, value INTEGER);
CREATE TABLE t2(id INTEGER, value INTEGER);
INSERT INTO t1 VALUES (1, 100), (2, 200);
INSERT INTO t2 VALUES (3, 300), (4, 400);
CREATE MATERIALIZED VIEW combined AS
SELECT id, value FROM t1 WHERE value > 150
UNION ALL
SELECT id, value FROM t2 WHERE value > 350;
SELECT * FROM combined ORDER BY id;
-- Insert into t1
INSERT INTO t1 VALUES (5, 500);
SELECT * FROM combined ORDER BY id;
-- Insert into t2
INSERT INTO t2 VALUES (6, 600);
SELECT * FROM combined ORDER BY id;
} {2|200
4|400
2|200
4|400
5|500
2|200
4|400
5|500
6|600}
do_execsql_test_on_specific_db {:memory:} matview-union-maintenance-delete {
CREATE TABLE source1(id INTEGER PRIMARY KEY, data TEXT);
CREATE TABLE source2(id INTEGER PRIMARY KEY, data TEXT);
INSERT INTO source1 VALUES (1, 'A'), (2, 'B'), (3, 'C');
INSERT INTO source2 VALUES (4, 'D'), (5, 'E'), (6, 'F');
CREATE MATERIALIZED VIEW merged AS
SELECT id, data FROM source1
UNION ALL
SELECT id, data FROM source2;
SELECT COUNT(*) FROM merged;
DELETE FROM source1 WHERE id = 2;
SELECT COUNT(*) FROM merged;
DELETE FROM source2 WHERE id > 4;
SELECT COUNT(*) FROM merged;
} {6
5
3}
do_execsql_test_on_specific_db {:memory:} matview-union-maintenance-update {
CREATE TABLE high_priority(id INTEGER PRIMARY KEY, task TEXT, priority INTEGER);
CREATE TABLE normal_priority(id INTEGER PRIMARY KEY, task TEXT, priority INTEGER);
INSERT INTO high_priority VALUES (1, 'Task A', 10), (2, 'Task B', 9);
INSERT INTO normal_priority VALUES (3, 'Task C', 5), (4, 'Task D', 6);
CREATE MATERIALIZED VIEW active_tasks AS
SELECT id, task, priority FROM high_priority WHERE priority >= 9
UNION ALL
SELECT id, task, priority FROM normal_priority WHERE priority >= 5;
SELECT COUNT(*) FROM active_tasks;
-- Update drops a high priority task below threshold
UPDATE high_priority SET priority = 8 WHERE id = 2;
SELECT COUNT(*) FROM active_tasks;
-- Update brings a normal task above threshold
UPDATE normal_priority SET priority = 3 WHERE id = 3;
SELECT COUNT(*) FROM active_tasks;
} {4
3
2}
# Test UNION ALL with same table and different WHERE conditions
do_execsql_test_on_specific_db {:memory:} matview-union-all-same-table {
CREATE TABLE test(id INTEGER PRIMARY KEY, value INTEGER);
INSERT INTO test VALUES (1, 10), (2, 20);
-- This UNION ALL should return both rows
CREATE MATERIALIZED VIEW union_view AS
SELECT id, value FROM test WHERE value < 15
UNION ALL
SELECT id, value FROM test WHERE value > 15;
-- Should return 2 rows: (1,10) and (2,20)
SELECT * FROM union_view ORDER BY id;
} {1|10
2|20}
# Test UNION ALL preserves all rows in count
do_execsql_test_on_specific_db {:memory:} matview-union-all-row-count {
CREATE TABLE data(id INTEGER PRIMARY KEY, num INTEGER);
INSERT INTO data VALUES (1, 5), (2, 15), (3, 25);
CREATE MATERIALIZED VIEW split_view AS
SELECT id, num FROM data WHERE num <= 10
UNION ALL
SELECT id, num FROM data WHERE num > 10;
-- Should return count of 3
SELECT COUNT(*) FROM split_view;
} {3}
# Test UNION ALL with text columns and filtering
do_execsql_test_on_specific_db {:memory:} matview-union-all-text-filter {
CREATE TABLE items(id INTEGER PRIMARY KEY, category TEXT, price INTEGER);
INSERT INTO items VALUES
(1, 'cheap', 10),
(2, 'expensive', 100),
(3, 'cheap', 20),
(4, 'expensive', 200);
CREATE MATERIALIZED VIEW price_categories AS
SELECT id, category, price FROM items WHERE category = 'cheap'
UNION ALL
SELECT id, category, price FROM items WHERE category = 'expensive';
-- Should return all 4 items
SELECT COUNT(*) FROM price_categories;
SELECT id FROM price_categories ORDER BY id;
} {4
1
2
3
4}