Fix column fetch in joins

In comparisons for joins, we were assuming that the left column belonged
to the left join (and vice-versa). That is incorrect, because you can
write the comparison condition in any order.

Fixes #3368
This commit is contained in:
Glauber Costa
2025-09-27 06:53:47 -03:00
parent 2f38d2ef04
commit 78ee8b8627
2 changed files with 416 additions and 14 deletions

View File

@@ -851,6 +851,62 @@ impl DbspCompiler {
} }
} }
/// Resolve join condition columns to determine which side each column belongs to.
///
/// Returns (left_column, left_index, right_column, right_index) where:
/// - left_column/right_column are the Column references
/// - left_index/right_index are the column indices in their respective schemas
///
/// Handles cases where:
/// - Columns are in normal order (left table column = right table column)
/// - Columns are swapped (right table column = left table column)
/// - One or both columns have table qualifiers
/// - Column names exist in both tables but are disambiguated by qualifiers
fn resolve_join_columns(
first_col: &Column,
second_col: &Column,
left_schema: &LogicalSchema,
right_schema: &LogicalSchema,
) -> Result<(Column, usize, Column, usize)> {
// Check all four possibilities to handle ambiguous column names
let first_in_left = left_schema.find_column(&first_col.name, first_col.table.as_deref());
let first_in_right = right_schema.find_column(&first_col.name, first_col.table.as_deref());
let second_in_left = left_schema.find_column(&second_col.name, second_col.table.as_deref());
let second_in_right =
right_schema.find_column(&second_col.name, second_col.table.as_deref());
// Determine the correct pairing: one column must be from left, one from right
if first_in_left.is_some() && second_in_right.is_some() {
// first is from left, second is from right
let (left_idx, _) = first_in_left.unwrap();
let (right_idx, _) = second_in_right.unwrap();
Ok((first_col.clone(), left_idx, second_col.clone(), right_idx))
} else if first_in_right.is_some() && second_in_left.is_some() {
// first is from right, second is from left
let (left_idx, _) = second_in_left.unwrap();
let (right_idx, _) = first_in_right.unwrap();
Ok((second_col.clone(), left_idx, first_col.clone(), right_idx))
} else {
// Provide specific error messages for different failure cases
if first_in_left.is_none() && first_in_right.is_none() {
Err(LimboError::ParseError(format!(
"Join condition column '{}' not found in either input",
first_col.name
)))
} else if second_in_left.is_none() && second_in_right.is_none() {
Err(LimboError::ParseError(format!(
"Join condition column '{}' not found in either input",
second_col.name
)))
} else {
Err(LimboError::ParseError(format!(
"Join condition columns '{}' and '{}' must come from different input tables",
first_col.name, second_col.name
)))
}
}
}
/// Compile a logical plan to a DBSP circuit /// Compile a logical plan to a DBSP circuit
pub fn compile(mut self, plan: &LogicalPlan) -> Result<DbspCircuit> { pub fn compile(mut self, plan: &LogicalPlan) -> Result<DbspCircuit> {
let root_id = self.compile_plan(plan)?; let root_id = self.compile_plan(plan)?;
@@ -1240,24 +1296,17 @@ impl DbspCompiler {
for (left_expr, right_expr) in &join.on { for (left_expr, right_expr) in &join.on {
// Extract column indices from join expressions // Extract column indices from join expressions
// We expect simple column references in join conditions // We expect simple column references in join conditions
if let (LogicalExpr::Column(left_col), LogicalExpr::Column(right_col)) = (left_expr, right_expr) { if let (LogicalExpr::Column(first_col), LogicalExpr::Column(second_col)) = (left_expr, right_expr) {
// Find indices in respective schemas using qualified lookup let (actual_left_col, actual_left_idx, actual_right_col, actual_right_idx) =
let (left_idx, _) = left_schema.find_column(&left_col.name, left_col.table.as_deref()) Self::resolve_join_columns(first_col, second_col, left_schema, right_schema)?;
.ok_or_else(|| LimboError::ParseError(
format!("Join column '{}' not found in left input", left_col.name)
))?;
let (right_idx, _) = right_schema.find_column(&right_col.name, right_col.table.as_deref())
.ok_or_else(|| LimboError::ParseError(
format!("Join column '{}' not found in right input", right_col.name)
))?;
left_key_indices.push(left_idx); left_key_indices.push(actual_left_idx);
right_key_indices.push(right_idx); right_key_indices.push(actual_right_idx);
// Convert to DBSP expressions // Convert to DBSP expressions
dbsp_on_exprs.push(( dbsp_on_exprs.push((
DbspExpr::Column(left_col.name.clone()), DbspExpr::Column(actual_left_col.name.clone()),
DbspExpr::Column(right_col.name.clone()) DbspExpr::Column(actual_right_col.name.clone())
)); ));
} else { } else {
return Err(LimboError::ParseError( return Err(LimboError::ParseError(
@@ -5700,4 +5749,340 @@ mod tests {
.find(|(row, _)| row.values[0] == Value::Integer(2)); .find(|(row, _)| row.values[0] == Value::Integer(2));
assert!(bob.is_none(), "Bob should be filtered out"); assert!(bob.is_none(), "Bob should be filtered out");
} }
fn make_column_info(name: &str, ty: Type, table: &str) -> ColumnInfo {
ColumnInfo {
name: name.to_string(),
ty,
database: None,
table: Some(table.to_string()),
table_alias: None,
}
}
#[test]
fn test_resolve_join_columns_normal_order() {
// Normal case: left.id = right.id
let left_schema = LogicalSchema::new(vec![
ColumnInfo {
name: "id".to_string(),
ty: Type::Integer,
database: None,
table: Some("left".to_string()),
table_alias: None,
},
ColumnInfo {
name: "name".to_string(),
ty: Type::Text,
database: None,
table: Some("left".to_string()),
table_alias: None,
},
]);
let right_schema = LogicalSchema::new(vec![
ColumnInfo {
name: "id".to_string(),
ty: Type::Integer,
database: None,
table: Some("right".to_string()),
table_alias: None,
},
ColumnInfo {
name: "value".to_string(),
ty: Type::Integer,
database: None,
table: Some("right".to_string()),
table_alias: None,
},
]);
let left_col = Column {
name: "id".to_string(),
table: Some("left".to_string()),
};
let right_col = Column {
name: "id".to_string(),
table: Some("right".to_string()),
};
let result =
DbspCompiler::resolve_join_columns(&left_col, &right_col, &left_schema, &right_schema);
assert!(result.is_ok());
let (actual_left, left_idx, actual_right, right_idx) = result.unwrap();
assert_eq!(actual_left.name, "id");
assert_eq!(actual_left.table, Some("left".to_string()));
assert_eq!(left_idx, 0);
assert_eq!(actual_right.name, "id");
assert_eq!(actual_right.table, Some("right".to_string()));
assert_eq!(right_idx, 0);
}
#[test]
fn test_resolve_join_columns_swapped_order() {
// Swapped case: right.id = left.id
let left_schema = LogicalSchema::new(vec![
make_column_info("id", Type::Integer, "left"),
make_column_info("name", Type::Text, "left"),
]);
let right_schema = LogicalSchema::new(vec![
make_column_info("id", Type::Integer, "right"),
make_column_info("value", Type::Integer, "right"),
]);
let right_col = Column {
name: "id".to_string(),
table: Some("right".to_string()),
};
let left_col = Column {
name: "id".to_string(),
table: Some("left".to_string()),
};
let result =
DbspCompiler::resolve_join_columns(&right_col, &left_col, &left_schema, &right_schema);
assert!(result.is_ok());
let (actual_left, left_idx, actual_right, right_idx) = result.unwrap();
assert_eq!(actual_left.name, "id");
assert_eq!(actual_left.table, Some("left".to_string()));
assert_eq!(left_idx, 0);
assert_eq!(actual_right.name, "id");
assert_eq!(actual_right.table, Some("right".to_string()));
assert_eq!(right_idx, 0);
}
#[test]
fn test_resolve_join_columns_one_ambiguous_one_not() {
// Both tables have 'id', but only left has 'other_id'
let left_schema = LogicalSchema::new(vec![
make_column_info("id", Type::Integer, "left"),
make_column_info("other_id", Type::Integer, "left"),
]);
let right_schema = LogicalSchema::new(vec![
make_column_info("id", Type::Integer, "right"),
make_column_info("value", Type::Integer, "right"),
]);
// Unqualified 'id' with qualified 'left.other_id'
let id_col = Column {
name: "id".to_string(),
table: None,
};
let other_id_col = Column {
name: "other_id".to_string(),
table: Some("left".to_string()),
};
// id from right, other_id from left
let result =
DbspCompiler::resolve_join_columns(&id_col, &other_id_col, &left_schema, &right_schema);
assert!(result.is_ok());
let (actual_left, left_idx, actual_right, right_idx) = result.unwrap();
assert_eq!(actual_left.name, "other_id");
assert_eq!(left_idx, 1);
assert_eq!(actual_right.name, "id");
assert_eq!(right_idx, 0);
}
#[test]
fn test_resolve_join_columns_mixed_qualified() {
// One qualified, one unqualified, column exists on both sides
let left_schema = LogicalSchema::new(vec![
make_column_info("id", Type::Integer, "left"),
make_column_info("name", Type::Text, "left"),
]);
let right_schema = LogicalSchema::new(vec![
make_column_info("id", Type::Integer, "right"),
make_column_info("name", Type::Text, "right"),
]);
// Qualified left.id with unqualified name
let left_id = Column {
name: "id".to_string(),
table: Some("left".to_string()),
};
let name_unqualified = Column {
name: "name".to_string(),
table: None,
};
let result = DbspCompiler::resolve_join_columns(
&left_id,
&name_unqualified,
&left_schema,
&right_schema,
);
// left.id is explicitly from left, so unqualified 'name' must be resolved from right
assert!(result.is_ok());
let (actual_left, left_idx, actual_right, right_idx) = result.unwrap();
assert_eq!(actual_left.name, "id");
assert_eq!(left_idx, 0);
assert_eq!(actual_right.name, "name");
assert_eq!(right_idx, 1);
}
#[test]
fn test_resolve_join_columns_both_from_same_side() {
// Both columns from left table - should fail
let left_schema = LogicalSchema::new(vec![
make_column_info("id", Type::Integer, "left"),
make_column_info("other_id", Type::Integer, "left"),
]);
let right_schema =
LogicalSchema::new(vec![make_column_info("value", Type::Integer, "right")]);
let left_id = Column {
name: "id".to_string(),
table: Some("left".to_string()),
};
let left_other_id = Column {
name: "other_id".to_string(),
table: Some("left".to_string()),
};
let result = DbspCompiler::resolve_join_columns(
&left_id,
&left_other_id,
&left_schema,
&right_schema,
);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("must come from different input tables"));
}
#[test]
fn test_resolve_join_columns_nonexistent_column() {
// Column doesn't exist in either table
let left_schema = LogicalSchema::new(vec![make_column_info("id", Type::Integer, "left")]);
let right_schema =
LogicalSchema::new(vec![make_column_info("value", Type::Integer, "right")]);
let id_col = Column {
name: "id".to_string(),
table: None,
};
let nonexistent_col = Column {
name: "does_not_exist".to_string(),
table: None,
};
let result = DbspCompiler::resolve_join_columns(
&id_col,
&nonexistent_col,
&left_schema,
&right_schema,
);
assert!(result.is_err());
}
#[test]
fn test_resolve_join_columns_both_qualified() {
// Both columns qualified - should work normally
let left_schema = LogicalSchema::new(vec![
make_column_info("id", Type::Integer, "left"),
make_column_info("name", Type::Text, "left"),
]);
let right_schema = LogicalSchema::new(vec![
make_column_info("id", Type::Integer, "right"),
make_column_info("value", Type::Integer, "right"),
]);
let left_id = Column {
name: "id".to_string(),
table: Some("left".to_string()),
};
let right_id = Column {
name: "id".to_string(),
table: Some("right".to_string()),
};
let result =
DbspCompiler::resolve_join_columns(&left_id, &right_id, &left_schema, &right_schema);
assert!(result.is_ok());
let (actual_left, left_idx, actual_right, right_idx) = result.unwrap();
assert_eq!(actual_left.name, "id");
assert_eq!(left_idx, 0);
assert_eq!(actual_right.name, "id");
assert_eq!(right_idx, 0);
}
#[test]
fn test_resolve_join_columns_both_unqualified_same_name() {
// Both columns unqualified with same name existing in both tables - should succeed
// (first match wins based on order of checking)
let left_schema = LogicalSchema::new(vec![make_column_info("id", Type::Integer, "left")]);
let right_schema = LogicalSchema::new(vec![make_column_info("id", Type::Integer, "right")]);
let id_col1 = Column {
name: "id".to_string(),
table: None,
};
let id_col2 = Column {
name: "id".to_string(),
table: None,
};
let result =
DbspCompiler::resolve_join_columns(&id_col1, &id_col2, &left_schema, &right_schema);
// Should succeed - unqualified 'id' matches in both schemas
assert!(result.is_ok());
}
#[test]
fn test_resolve_join_columns_first_not_found() {
// First column doesn't exist anywhere
let left_schema = LogicalSchema::new(vec![make_column_info("id", Type::Integer, "left")]);
let right_schema =
LogicalSchema::new(vec![make_column_info("value", Type::Integer, "right")]);
let missing_col = Column {
name: "missing".to_string(),
table: None,
};
let value_col = Column {
name: "value".to_string(),
table: None,
};
let result = DbspCompiler::resolve_join_columns(
&missing_col,
&value_col,
&left_schema,
&right_schema,
);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("not found in either input"));
}
#[test]
fn test_resolve_join_columns_both_unqualified_different_names() {
// Both unqualified, each exists in only one table
let left_schema =
LogicalSchema::new(vec![make_column_info("left_id", Type::Integer, "left")]);
let right_schema =
LogicalSchema::new(vec![make_column_info("right_id", Type::Integer, "right")]);
let left_col = Column {
name: "left_id".to_string(),
table: None,
};
let right_col = Column {
name: "right_id".to_string(),
table: None,
};
let result =
DbspCompiler::resolve_join_columns(&left_col, &right_col, &left_schema, &right_schema);
assert!(result.is_ok());
let (actual_left, left_idx, actual_right, right_idx) = result.unwrap();
assert_eq!(actual_left.name, "left_id");
assert_eq!(left_idx, 0);
assert_eq!(actual_right.name, "right_id");
assert_eq!(right_idx, 0);
}
} }

View File

@@ -1595,3 +1595,20 @@ do_execsql_test_on_specific_db {:memory:} matview-in-incremental {
1|INFO|start 1|INFO|start
3|ERROR|fail 3|ERROR|fail
3|ERROR|fail} 3|ERROR|fail}
# Test join with swapped column order in ON clause
do_execsql_test_on_specific_db {:memory:} matview-join-swapped-columns {
CREATE TABLE employees(id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE departments(emp_id INTEGER PRIMARY KEY, dept_name TEXT);
INSERT INTO employees VALUES (1, 'Alice'), (2, 'Bob');
INSERT INTO departments VALUES (1, 'Engineering'), (2, 'Sales');
CREATE MATERIALIZED VIEW emp_dept AS
SELECT e.name, d.dept_name
FROM employees e
JOIN departments d ON d.emp_id = e.id;
SELECT * FROM emp_dept ORDER BY name;
} {Alice|Engineering
Bob|Sales}