move the filter operator to accept indexes instead of names

We already did the same for the AggregateOperator: with joins, the
same column name can appear in many tables, so a name alone is
ambiguous. Passing schema information to the operator is also a
layering violation (the operator may be operating on the result of a
previous node, and at that point there is no more "schema"). Therefore
we pass indexes into the column set the operator has.

The FilterOperator has a complication: we are using it to generate the
SQL for the populate statement, and that needs column names. However,
we should *not* be using the FilterOperator for that; it is a relic
from the time when we had operator information directly inside the
IncrementalView.

To make the FilterOperator index-based, we rework the populate code.
For joins, we'll need to populate many tables anyway, so we take the
time to do that work here.
This commit is contained in:
Glauber Costa
2025-09-18 10:14:40 -05:00
parent cb7c04ffad
commit e80dd8e5e1
4 changed files with 1121 additions and 331 deletions

View File

@@ -895,21 +895,18 @@ impl DbspCompiler {
// Compile the input first
let input_id = self.compile_plan(&filter.input)?;
// Get column names from input schema
// Get input schema for column resolution
let input_schema = filter.input.schema();
let column_names: Vec<String> = input_schema.columns.iter()
.map(|col| col.name.clone())
.collect();
// Convert predicate to DBSP expression
let dbsp_predicate = Self::compile_expr(&filter.predicate)?;
// Convert to FilterPredicate
let filter_predicate = Self::compile_filter_predicate(&filter.predicate)?;
let filter_predicate = Self::compile_filter_predicate(&filter.predicate, input_schema)?;
// Create executable operator
let executable: Box<dyn IncrementalOperator> =
Box::new(FilterOperator::new(filter_predicate, column_names));
Box::new(FilterOperator::new(filter_predicate));
// Create filter node
let node_id = self.circuit.add_node(
@@ -1372,42 +1369,57 @@ impl DbspCompiler {
}
/// Compile a logical expression to a FilterPredicate for execution
fn compile_filter_predicate(expr: &LogicalExpr) -> Result<FilterPredicate> {
fn compile_filter_predicate(
expr: &LogicalExpr,
schema: &LogicalSchema,
) -> Result<FilterPredicate> {
match expr {
LogicalExpr::BinaryExpr { left, op, right } => {
// Extract column name and value for simple predicates
if let (LogicalExpr::Column(col), LogicalExpr::Literal(val)) =
(left.as_ref(), right.as_ref())
{
// Resolve column name to index using the schema
let column_idx = schema
.columns
.iter()
.position(|c| c.name == col.name)
.ok_or_else(|| {
crate::LimboError::ParseError(format!(
"Column '{}' not found in schema for filter",
col.name
))
})?;
match op {
BinaryOperator::Equals => Ok(FilterPredicate::Equals {
column: col.name.clone(),
column_idx,
value: val.clone(),
}),
BinaryOperator::NotEquals => Ok(FilterPredicate::NotEquals {
column: col.name.clone(),
column_idx,
value: val.clone(),
}),
BinaryOperator::Greater => Ok(FilterPredicate::GreaterThan {
column: col.name.clone(),
column_idx,
value: val.clone(),
}),
BinaryOperator::GreaterEquals => Ok(FilterPredicate::GreaterThanOrEqual {
column: col.name.clone(),
column_idx,
value: val.clone(),
}),
BinaryOperator::Less => Ok(FilterPredicate::LessThan {
column: col.name.clone(),
column_idx,
value: val.clone(),
}),
BinaryOperator::LessEquals => Ok(FilterPredicate::LessThanOrEqual {
column: col.name.clone(),
column_idx,
value: val.clone(),
}),
BinaryOperator::And => {
// Handle AND of two predicates
let left_pred = Self::compile_filter_predicate(left)?;
let right_pred = Self::compile_filter_predicate(right)?;
let left_pred = Self::compile_filter_predicate(left, schema)?;
let right_pred = Self::compile_filter_predicate(right, schema)?;
Ok(FilterPredicate::And(
Box::new(left_pred),
Box::new(right_pred),
@@ -1415,8 +1427,8 @@ impl DbspCompiler {
}
BinaryOperator::Or => {
// Handle OR of two predicates
let left_pred = Self::compile_filter_predicate(left)?;
let right_pred = Self::compile_filter_predicate(right)?;
let left_pred = Self::compile_filter_predicate(left, schema)?;
let right_pred = Self::compile_filter_predicate(right, schema)?;
Ok(FilterPredicate::Or(
Box::new(left_pred),
Box::new(right_pred),
@@ -1428,8 +1440,8 @@ impl DbspCompiler {
}
} else if matches!(op, BinaryOperator::And | BinaryOperator::Or) {
// Handle logical operators
let left_pred = Self::compile_filter_predicate(left)?;
let right_pred = Self::compile_filter_predicate(right)?;
let left_pred = Self::compile_filter_predicate(left, schema)?;
let right_pred = Self::compile_filter_predicate(right, schema)?;
match op {
BinaryOperator::And => Ok(FilterPredicate::And(
Box::new(left_pred),
@@ -3777,13 +3789,10 @@ mod tests {
Box::new(InputOperator::new("test".to_string())),
);
let filter_op = FilterOperator::new(
FilterPredicate::GreaterThan {
column: "value".to_string(),
value: Value::Integer(10),
},
vec!["id".to_string(), "value".to_string()],
);
let filter_op = FilterOperator::new(FilterPredicate::GreaterThan {
column_idx: 1, // "value" is at index 1
value: Value::Integer(10),
});
// Create the filter predicate using DbspExpr
let predicate = DbspExpr::BinaryExpr {
@@ -4587,18 +4596,18 @@ mod tests {
fn test_filter_with_qualified_columns_in_join() {
// Test that filters correctly handle qualified column names in joins
// when multiple tables have columns with the SAME names.
// Both users and sales tables have an 'id' column which can be ambiguous.
// Both users and customers tables have 'id' and 'name' columns which can be ambiguous.
let (mut circuit, pager) = compile_sql!(
"SELECT users.id, users.name, sales.id, sales.amount
"SELECT users.id, users.name, customers.id, customers.name
FROM users
JOIN sales ON users.id = sales.customer_id
WHERE users.id > 1 AND sales.id < 100"
JOIN customers ON users.id = customers.id
WHERE users.id > 1 AND customers.id < 100"
);
// Create test data
let mut users_delta = Delta::new();
let mut sales_delta = Delta::new();
let mut customers_delta = Delta::new();
// Users data: (id, name, age)
users_delta.insert(
@@ -4626,48 +4635,60 @@ mod tests {
],
); // id = 3
// Sales data: (id, customer_id, amount)
sales_delta.insert(
50,
vec![Value::Integer(50), Value::Integer(1), Value::Integer(100)],
); // sales.id = 50, customer_id = 1
sales_delta.insert(
99,
vec![Value::Integer(99), Value::Integer(2), Value::Integer(200)],
); // sales.id = 99, customer_id = 2
sales_delta.insert(
150,
vec![Value::Integer(150), Value::Integer(3), Value::Integer(300)],
); // sales.id = 150, customer_id = 3
// Customers data: (id, name, email)
customers_delta.insert(
1,
vec![
Value::Integer(1),
Value::Text("Customer Alice".into()),
Value::Text("alice@example.com".into()),
],
); // id = 1
customers_delta.insert(
2,
vec![
Value::Integer(2),
Value::Text("Customer Bob".into()),
Value::Text("bob@example.com".into()),
],
); // id = 2
customers_delta.insert(
3,
vec![
Value::Integer(3),
Value::Text("Customer Charlie".into()),
Value::Text("charlie@example.com".into()),
],
); // id = 3
let mut inputs = HashMap::new();
inputs.insert("users".to_string(), users_delta);
inputs.insert("sales".to_string(), sales_delta);
inputs.insert("customers".to_string(), customers_delta);
let result = test_execute(&mut circuit, inputs.clone(), pager.clone()).unwrap();
// Should only get row with Bob (users.id=2, sales.id=99):
// - users.id=2 (> 1) AND sales.id=99 (< 100) ✓
// Should get rows where users.id > 1 AND customers.id < 100
// - users.id=2 (> 1) AND customers.id=2 (< 100) ✓
// - users.id=3 (> 1) AND customers.id=3 (< 100) ✓
// Alice excluded: users.id=1 (NOT > 1)
// Charlie excluded: sales.id=150 (NOT < 100)
assert_eq!(result.len(), 1, "Should have 1 filtered result");
assert_eq!(result.len(), 2, "Should have 2 filtered results");
let (row, weight) = &result.changes[0];
assert_eq!(*weight, 1);
assert_eq!(row.values.len(), 4, "Should have 4 columns");
// Verify the filter correctly used qualified columns
// Verify the filter correctly used qualified columns for Bob
assert_eq!(row.values[0], Value::Integer(2), "users.id should be 2");
assert_eq!(
row.values[1],
Value::Text("Bob".into()),
"users.name should be Bob"
);
assert_eq!(row.values[2], Value::Integer(99), "sales.id should be 99");
assert_eq!(row.values[2], Value::Integer(2), "customers.id should be 2");
assert_eq!(
row.values[3],
Value::Integer(200),
"sales.amount should be 200"
Value::Text("Customer Bob".into()),
"customers.name should be Customer Bob"
);
}
}

View File

@@ -6,26 +6,25 @@ use crate::incremental::dbsp::{Delta, DeltaPair};
use crate::incremental::operator::{
ComputationTracker, DbspStateCursors, EvalState, IncrementalOperator,
};
use crate::types::{IOResult, Text};
use crate::types::IOResult;
use crate::{Result, Value};
use std::sync::{Arc, Mutex};
use turso_parser::ast::{Expr, Literal, OneSelect, Operator};
/// Filter predicate for filtering rows
#[derive(Debug, Clone)]
pub enum FilterPredicate {
/// Column = value
Equals { column: String, value: Value },
/// Column != value
NotEquals { column: String, value: Value },
/// Column > value
GreaterThan { column: String, value: Value },
/// Column >= value
GreaterThanOrEqual { column: String, value: Value },
/// Column < value
LessThan { column: String, value: Value },
/// Column <= value
LessThanOrEqual { column: String, value: Value },
/// Column = value (using column index)
Equals { column_idx: usize, value: Value },
/// Column != value (using column index)
NotEquals { column_idx: usize, value: Value },
/// Column > value (using column index)
GreaterThan { column_idx: usize, value: Value },
/// Column >= value (using column index)
GreaterThanOrEqual { column_idx: usize, value: Value },
/// Column < value (using column index)
LessThan { column_idx: usize, value: Value },
/// Column <= value (using column index)
LessThanOrEqual { column_idx: usize, value: Value },
/// Logical AND of two predicates
And(Box<FilterPredicate>, Box<FilterPredicate>),
/// Logical OR of two predicates
@@ -34,122 +33,17 @@ pub enum FilterPredicate {
None,
}
impl FilterPredicate {
/// Parse a SQL AST expression into a FilterPredicate
/// This centralizes all SQL-to-predicate parsing logic
pub fn from_sql_expr(expr: &turso_parser::ast::Expr) -> crate::Result<Self> {
let Expr::Binary(lhs, op, rhs) = expr else {
return Err(crate::LimboError::ParseError(
"Unsupported WHERE clause for incremental views: not a binary expression"
.to_string(),
));
};
// Handle AND/OR logical operators
match op {
Operator::And => {
let left = Self::from_sql_expr(lhs)?;
let right = Self::from_sql_expr(rhs)?;
return Ok(FilterPredicate::And(Box::new(left), Box::new(right)));
}
Operator::Or => {
let left = Self::from_sql_expr(lhs)?;
let right = Self::from_sql_expr(rhs)?;
return Ok(FilterPredicate::Or(Box::new(left), Box::new(right)));
}
_ => {}
}
// Handle comparison operators
let Expr::Id(column_name) = &**lhs else {
return Err(crate::LimboError::ParseError(
"Unsupported WHERE clause for incremental views: left-hand-side is not a column reference".to_string(),
));
};
let column = column_name.as_str().to_string();
// Parse the right-hand side value
let value = match &**rhs {
Expr::Literal(Literal::String(s)) => {
// Strip quotes from string literals
let cleaned = s.trim_matches('\'').trim_matches('"');
Value::Text(Text::new(cleaned))
}
Expr::Literal(Literal::Numeric(n)) => {
// Try to parse as integer first, then float
if let Ok(i) = n.parse::<i64>() {
Value::Integer(i)
} else if let Ok(f) = n.parse::<f64>() {
Value::Float(f)
} else {
return Err(crate::LimboError::ParseError(
"Unsupported WHERE clause for incremental views: right-hand-side is not a numeric literal".to_string(),
));
}
}
Expr::Literal(Literal::Null) => Value::Null,
Expr::Literal(Literal::Blob(_)) => {
// Blob comparison not yet supported
return Err(crate::LimboError::ParseError(
"Unsupported WHERE clause for incremental views: comparison with blob literals is not supported".to_string(),
));
}
other => {
// Complex expressions not yet supported
return Err(crate::LimboError::ParseError(
format!("Unsupported WHERE clause for incremental views: comparison with {other:?} is not supported"),
));
}
};
// Create the appropriate predicate based on operator
match op {
Operator::Equals => Ok(FilterPredicate::Equals { column, value }),
Operator::NotEquals => Ok(FilterPredicate::NotEquals { column, value }),
Operator::Greater => Ok(FilterPredicate::GreaterThan { column, value }),
Operator::GreaterEquals => Ok(FilterPredicate::GreaterThanOrEqual { column, value }),
Operator::Less => Ok(FilterPredicate::LessThan { column, value }),
Operator::LessEquals => Ok(FilterPredicate::LessThanOrEqual { column, value }),
other => Err(crate::LimboError::ParseError(
format!("Unsupported WHERE clause for incremental views: comparison operator {other:?} is not supported"),
)),
}
}
/// Parse a WHERE clause from a SELECT statement
pub fn from_select(select: &turso_parser::ast::Select) -> crate::Result<Self> {
if let OneSelect::Select {
ref where_clause, ..
} = select.body.select
{
if let Some(where_clause) = where_clause {
Self::from_sql_expr(where_clause)
} else {
Ok(FilterPredicate::None)
}
} else {
Err(crate::LimboError::ParseError(
"Unsupported WHERE clause for incremental views: not a single SELECT statement"
.to_string(),
))
}
}
}
/// Filter operator - filters rows based on predicate
#[derive(Debug)]
pub struct FilterOperator {
predicate: FilterPredicate,
column_names: Vec<String>,
tracker: Option<Arc<Mutex<ComputationTracker>>>,
}
impl FilterOperator {
pub fn new(predicate: FilterPredicate, column_names: Vec<String>) -> Self {
pub fn new(predicate: FilterPredicate) -> Self {
Self {
predicate,
column_names,
tracker: None,
}
}
@@ -162,86 +56,72 @@ impl FilterOperator {
pub fn evaluate_predicate(&self, values: &[Value]) -> bool {
match &self.predicate {
FilterPredicate::None => true,
FilterPredicate::Equals { column, value } => {
if let Some(idx) = self.column_names.iter().position(|c| c == column) {
if let Some(v) = values.get(idx) {
return v == value;
FilterPredicate::Equals { column_idx, value } => {
if let Some(v) = values.get(*column_idx) {
return v == value;
}
false
}
FilterPredicate::NotEquals { column_idx, value } => {
if let Some(v) = values.get(*column_idx) {
return v != value;
}
false
}
FilterPredicate::GreaterThan { column_idx, value } => {
if let Some(v) = values.get(*column_idx) {
// Compare based on value types
match (v, value) {
(Value::Integer(a), Value::Integer(b)) => return a > b,
(Value::Float(a), Value::Float(b)) => return a > b,
(Value::Text(a), Value::Text(b)) => return a.as_str() > b.as_str(),
_ => {}
}
}
false
}
FilterPredicate::NotEquals { column, value } => {
if let Some(idx) = self.column_names.iter().position(|c| c == column) {
if let Some(v) = values.get(idx) {
return v != value;
FilterPredicate::GreaterThanOrEqual { column_idx, value } => {
if let Some(v) = values.get(*column_idx) {
match (v, value) {
(Value::Integer(a), Value::Integer(b)) => return a >= b,
(Value::Float(a), Value::Float(b)) => return a >= b,
(Value::Text(a), Value::Text(b)) => return a.as_str() >= b.as_str(),
_ => {}
}
}
false
}
FilterPredicate::GreaterThan { column, value } => {
if let Some(idx) = self.column_names.iter().position(|c| c == column) {
if let Some(v) = values.get(idx) {
// Compare based on value types
match (v, value) {
(Value::Integer(a), Value::Integer(b)) => return a > b,
(Value::Float(a), Value::Float(b)) => return a > b,
(Value::Text(a), Value::Text(b)) => return a.as_str() > b.as_str(),
_ => {}
}
FilterPredicate::LessThan { column_idx, value } => {
if let Some(v) = values.get(*column_idx) {
match (v, value) {
(Value::Integer(a), Value::Integer(b)) => return a < b,
(Value::Float(a), Value::Float(b)) => return a < b,
(Value::Text(a), Value::Text(b)) => return a.as_str() < b.as_str(),
_ => {}
}
}
false
}
FilterPredicate::GreaterThanOrEqual { column, value } => {
if let Some(idx) = self.column_names.iter().position(|c| c == column) {
if let Some(v) = values.get(idx) {
match (v, value) {
(Value::Integer(a), Value::Integer(b)) => return a >= b,
(Value::Float(a), Value::Float(b)) => return a >= b,
(Value::Text(a), Value::Text(b)) => return a.as_str() >= b.as_str(),
_ => {}
}
}
}
false
}
FilterPredicate::LessThan { column, value } => {
if let Some(idx) = self.column_names.iter().position(|c| c == column) {
if let Some(v) = values.get(idx) {
match (v, value) {
(Value::Integer(a), Value::Integer(b)) => return a < b,
(Value::Float(a), Value::Float(b)) => return a < b,
(Value::Text(a), Value::Text(b)) => return a.as_str() < b.as_str(),
_ => {}
}
}
}
false
}
FilterPredicate::LessThanOrEqual { column, value } => {
if let Some(idx) = self.column_names.iter().position(|c| c == column) {
if let Some(v) = values.get(idx) {
match (v, value) {
(Value::Integer(a), Value::Integer(b)) => return a <= b,
(Value::Float(a), Value::Float(b)) => return a <= b,
(Value::Text(a), Value::Text(b)) => return a.as_str() <= b.as_str(),
_ => {}
}
FilterPredicate::LessThanOrEqual { column_idx, value } => {
if let Some(v) = values.get(*column_idx) {
match (v, value) {
(Value::Integer(a), Value::Integer(b)) => return a <= b,
(Value::Float(a), Value::Float(b)) => return a <= b,
(Value::Text(a), Value::Text(b)) => return a.as_str() <= b.as_str(),
_ => {}
}
}
false
}
FilterPredicate::And(left, right) => {
// Temporarily create sub-filters to evaluate
let left_filter = FilterOperator::new((**left).clone(), self.column_names.clone());
let right_filter =
FilterOperator::new((**right).clone(), self.column_names.clone());
let left_filter = FilterOperator::new((**left).clone());
let right_filter = FilterOperator::new((**right).clone());
left_filter.evaluate_predicate(values) && right_filter.evaluate_predicate(values)
}
FilterPredicate::Or(left, right) => {
let left_filter = FilterOperator::new((**left).clone(), self.column_names.clone());
let right_filter =
FilterOperator::new((**right).clone(), self.column_names.clone());
let left_filter = FilterOperator::new((**left).clone());
let right_filter = FilterOperator::new((**right).clone());
left_filter.evaluate_predicate(values) || right_filter.evaluate_predicate(values)
}
}

View File

@@ -1450,13 +1450,10 @@ mod tests {
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
let mut filter = FilterOperator::new(
FilterPredicate::GreaterThan {
column: "b".to_string(),
value: Value::Integer(2),
},
vec!["a".to_string(), "b".to_string()],
);
let mut filter = FilterOperator::new(FilterPredicate::GreaterThan {
column_idx: 1, // "b" is at index 1
value: Value::Integer(2),
});
// Initialize with a row (rowid=3, values=[3, 3])
let mut init_data = Delta::new();
@@ -1512,13 +1509,10 @@ mod tests {
BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
let mut filter = FilterOperator::new(
FilterPredicate::GreaterThan {
column: "age".to_string(),
value: Value::Integer(25),
},
vec!["id".to_string(), "name".to_string(), "age".to_string()],
);
let mut filter = FilterOperator::new(FilterPredicate::GreaterThan {
column_idx: 2, // "age" is at index 2
value: Value::Integer(25),
});
// Initialize with some data
let mut init_data = Delta::new();

File diff suppressed because it is too large Load Diff