diff --git a/core/incremental/compiler.rs b/core/incremental/compiler.rs index 4a1cf7453..08cb07827 100644 --- a/core/incremental/compiler.rs +++ b/core/incremental/compiler.rs @@ -851,6 +851,62 @@ impl DbspCompiler { } } + /// Resolve join condition columns to determine which side each column belongs to. + /// + /// Returns (left_column, left_index, right_column, right_index) where: + /// - left_column/right_column are the Column references + /// - left_index/right_index are the column indices in their respective schemas + /// + /// Handles cases where: + /// - Columns are in normal order (left table column = right table column) + /// - Columns are swapped (right table column = left table column) + /// - One or both columns have table qualifiers + /// - Column names exist in both tables but are disambiguated by qualifiers + fn resolve_join_columns( + first_col: &Column, + second_col: &Column, + left_schema: &LogicalSchema, + right_schema: &LogicalSchema, + ) -> Result<(Column, usize, Column, usize)> { + // Check all four possibilities to handle ambiguous column names + let first_in_left = left_schema.find_column(&first_col.name, first_col.table.as_deref()); + let first_in_right = right_schema.find_column(&first_col.name, first_col.table.as_deref()); + let second_in_left = left_schema.find_column(&second_col.name, second_col.table.as_deref()); + let second_in_right = + right_schema.find_column(&second_col.name, second_col.table.as_deref()); + + // Determine the correct pairing: one column must be from left, one from right + if first_in_left.is_some() && second_in_right.is_some() { + // first is from left, second is from right + let (left_idx, _) = first_in_left.unwrap(); + let (right_idx, _) = second_in_right.unwrap(); + Ok((first_col.clone(), left_idx, second_col.clone(), right_idx)) + } else if first_in_right.is_some() && second_in_left.is_some() { + // first is from right, second is from left + let (left_idx, _) = second_in_left.unwrap(); + let (right_idx, _) = first_in_right.unwrap(); + Ok((second_col.clone(), left_idx, first_col.clone(), right_idx)) + } else { + // Provide specific error messages for different failure cases + if first_in_left.is_none() && first_in_right.is_none() { + Err(LimboError::ParseError(format!( + "Join condition column '{}' not found in either input", + first_col.name + ))) + } else if second_in_left.is_none() && second_in_right.is_none() { + Err(LimboError::ParseError(format!( + "Join condition column '{}' not found in either input", + second_col.name + ))) + } else { + Err(LimboError::ParseError(format!( + "Join condition columns '{}' and '{}' must come from different input tables", + first_col.name, second_col.name + ))) + } + } + } + /// Compile a logical plan to a DBSP circuit pub fn compile(mut self, plan: &LogicalPlan) -> Result { let root_id = self.compile_plan(plan)?; @@ -1240,24 +1296,17 @@ impl DbspCompiler { for (left_expr, right_expr) in &join.on { // Extract column indices from join expressions // We expect simple column references in join conditions - if let (LogicalExpr::Column(left_col), LogicalExpr::Column(right_col)) = (left_expr, right_expr) { - // Find indices in respective schemas using qualified lookup - let (left_idx, _) = left_schema.find_column(&left_col.name, left_col.table.as_deref()) - .ok_or_else(|| LimboError::ParseError( - format!("Join column '{}' not found in left input", left_col.name) - ))?; - let (right_idx, _) = right_schema.find_column(&right_col.name, right_col.table.as_deref()) - .ok_or_else(|| LimboError::ParseError( - format!("Join column '{}' not found in right input", right_col.name) - ))?; + if let (LogicalExpr::Column(first_col), LogicalExpr::Column(second_col)) = (left_expr, right_expr) { + let (actual_left_col, actual_left_idx, actual_right_col, actual_right_idx) = + Self::resolve_join_columns(first_col, second_col, left_schema, right_schema)?; - left_key_indices.push(left_idx); - right_key_indices.push(right_idx); + left_key_indices.push(actual_left_idx); + right_key_indices.push(actual_right_idx); // Convert to DBSP expressions dbsp_on_exprs.push(( - DbspExpr::Column(left_col.name.clone()), - DbspExpr::Column(right_col.name.clone()) + DbspExpr::Column(actual_left_col.name.clone()), + DbspExpr::Column(actual_right_col.name.clone()) )); } else { return Err(LimboError::ParseError( @@ -5700,4 +5749,340 @@ mod tests { .find(|(row, _)| row.values[0] == Value::Integer(2)); assert!(bob.is_none(), "Bob should be filtered out"); } + + fn make_column_info(name: &str, ty: Type, table: &str) -> ColumnInfo { + ColumnInfo { + name: name.to_string(), + ty, + database: None, + table: Some(table.to_string()), + table_alias: None, + } + } + + #[test] + fn test_resolve_join_columns_normal_order() { + // Normal case: left.id = right.id + let left_schema = LogicalSchema::new(vec![ + ColumnInfo { + name: "id".to_string(), + ty: Type::Integer, + database: None, + table: Some("left".to_string()), + table_alias: None, + }, + ColumnInfo { + name: "name".to_string(), + ty: Type::Text, + database: None, + table: Some("left".to_string()), + table_alias: None, + }, + ]); + let right_schema = LogicalSchema::new(vec![ + ColumnInfo { + name: "id".to_string(), + ty: Type::Integer, + database: None, + table: Some("right".to_string()), + table_alias: None, + }, + ColumnInfo { + name: "value".to_string(), + ty: Type::Integer, + database: None, + table: Some("right".to_string()), + table_alias: None, + }, + ]); + + let left_col = Column { + name: "id".to_string(), + table: Some("left".to_string()), + }; + let right_col = Column { + name: "id".to_string(), + table: Some("right".to_string()), + }; + + let result = + DbspCompiler::resolve_join_columns(&left_col, &right_col, &left_schema, &right_schema); + assert!(result.is_ok()); + let (actual_left, left_idx, actual_right, right_idx) = result.unwrap(); + assert_eq!(actual_left.name, "id"); + assert_eq!(actual_left.table, Some("left".to_string())); + assert_eq!(left_idx, 0); + assert_eq!(actual_right.name, "id"); + assert_eq!(actual_right.table, Some("right".to_string())); + assert_eq!(right_idx, 0); + } + + #[test] + fn test_resolve_join_columns_swapped_order() { + // Swapped case: right.id = left.id + let left_schema = LogicalSchema::new(vec![ + make_column_info("id", Type::Integer, "left"), + make_column_info("name", Type::Text, "left"), + ]); + let right_schema = LogicalSchema::new(vec![ + make_column_info("id", Type::Integer, "right"), + make_column_info("value", Type::Integer, "right"), + ]); + + let right_col = Column { + name: "id".to_string(), + table: Some("right".to_string()), + }; + let left_col = Column { + name: "id".to_string(), + table: Some("left".to_string()), + }; + + let result = + DbspCompiler::resolve_join_columns(&right_col, &left_col, &left_schema, &right_schema); + assert!(result.is_ok()); + let (actual_left, left_idx, actual_right, right_idx) = result.unwrap(); + assert_eq!(actual_left.name, "id"); + assert_eq!(actual_left.table, Some("left".to_string())); + assert_eq!(left_idx, 0); + assert_eq!(actual_right.name, "id"); + assert_eq!(actual_right.table, Some("right".to_string())); + assert_eq!(right_idx, 0); + } + + #[test] + fn test_resolve_join_columns_one_ambiguous_one_not() { + // Both tables have 'id', but only left has 'other_id' + let left_schema = LogicalSchema::new(vec![ + make_column_info("id", Type::Integer, "left"), + make_column_info("other_id", Type::Integer, "left"), + ]); + let right_schema = LogicalSchema::new(vec![ + make_column_info("id", Type::Integer, "right"), + make_column_info("value", Type::Integer, "right"), + ]); + + // Unqualified 'id' with qualified 'left.other_id' + let id_col = Column { + name: "id".to_string(), + table: None, + }; + let other_id_col = Column { + name: "other_id".to_string(), + table: Some("left".to_string()), + }; + + // id from right, other_id from left + let result = + DbspCompiler::resolve_join_columns(&id_col, &other_id_col, &left_schema, &right_schema); + assert!(result.is_ok()); + let (actual_left, left_idx, actual_right, right_idx) = result.unwrap(); + assert_eq!(actual_left.name, "other_id"); + assert_eq!(left_idx, 1); + assert_eq!(actual_right.name, "id"); + assert_eq!(right_idx, 0); + } + + #[test] + fn test_resolve_join_columns_mixed_qualified() { + // One qualified, one unqualified, column exists on both sides + let left_schema = LogicalSchema::new(vec![ + make_column_info("id", Type::Integer, "left"), + make_column_info("name", Type::Text, "left"), + ]); + let right_schema = LogicalSchema::new(vec![ + make_column_info("id", Type::Integer, "right"), + make_column_info("name", Type::Text, "right"), + ]); + + // Qualified left.id with unqualified name + let left_id = Column { + name: "id".to_string(), + table: Some("left".to_string()), + }; + let name_unqualified = Column { + name: "name".to_string(), + table: None, + }; + + let result = DbspCompiler::resolve_join_columns( + &left_id, + &name_unqualified, + &left_schema, + &right_schema, + ); + // left.id is explicitly from left, so unqualified 'name' must be resolved from right + assert!(result.is_ok()); + let (actual_left, left_idx, actual_right, right_idx) = result.unwrap(); + assert_eq!(actual_left.name, "id"); + assert_eq!(left_idx, 0); + assert_eq!(actual_right.name, "name"); + assert_eq!(right_idx, 1); + } + + #[test] + fn test_resolve_join_columns_both_from_same_side() { + // Both columns from left table - should fail + let left_schema = LogicalSchema::new(vec![ + make_column_info("id", Type::Integer, "left"), + make_column_info("other_id", Type::Integer, "left"), + ]); + let right_schema = + LogicalSchema::new(vec![make_column_info("value", Type::Integer, "right")]); + + let left_id = Column { + name: "id".to_string(), + table: Some("left".to_string()), + }; + let left_other_id = Column { + name: "other_id".to_string(), + table: Some("left".to_string()), + }; + + let result = DbspCompiler::resolve_join_columns( + &left_id, + &left_other_id, + &left_schema, + &right_schema, + ); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("must come from different input tables")); + } + + #[test] + fn test_resolve_join_columns_nonexistent_column() { + // Column doesn't exist in either table + let left_schema = LogicalSchema::new(vec![make_column_info("id", Type::Integer, "left")]); + let right_schema = + LogicalSchema::new(vec![make_column_info("value", Type::Integer, "right")]); + + let id_col = Column { + name: "id".to_string(), + table: None, + }; + let nonexistent_col = Column { + name: "does_not_exist".to_string(), + table: None, + }; + + let result = DbspCompiler::resolve_join_columns( + &id_col, + &nonexistent_col, + &left_schema, + &right_schema, + ); + assert!(result.is_err()); + } + + #[test] + fn test_resolve_join_columns_both_qualified() { + // Both columns qualified - should work normally + let left_schema = LogicalSchema::new(vec![ + make_column_info("id", Type::Integer, "left"), + make_column_info("name", Type::Text, "left"), + ]); + let right_schema = LogicalSchema::new(vec![ + make_column_info("id", Type::Integer, "right"), + make_column_info("value", Type::Integer, "right"), + ]); + + let left_id = Column { + name: "id".to_string(), + table: Some("left".to_string()), + }; + let right_id = Column { + name: "id".to_string(), + table: Some("right".to_string()), + }; + + let result = + DbspCompiler::resolve_join_columns(&left_id, &right_id, &left_schema, &right_schema); + assert!(result.is_ok()); + let (actual_left, left_idx, actual_right, right_idx) = result.unwrap(); + assert_eq!(actual_left.name, "id"); + assert_eq!(left_idx, 0); + assert_eq!(actual_right.name, "id"); + assert_eq!(right_idx, 0); + } + + #[test] + fn test_resolve_join_columns_both_unqualified_same_name() { + // Both columns unqualified with same name existing in both tables - should succeed + // (first match wins based on order of checking) + let left_schema = LogicalSchema::new(vec![make_column_info("id", Type::Integer, "left")]); + let right_schema = LogicalSchema::new(vec![make_column_info("id", Type::Integer, "right")]); + + let id_col1 = Column { + name: "id".to_string(), + table: None, + }; + let id_col2 = Column { + name: "id".to_string(), + table: None, + }; + + let result = + DbspCompiler::resolve_join_columns(&id_col1, &id_col2, &left_schema, &right_schema); + // Should succeed - unqualified 'id' matches in both schemas + assert!(result.is_ok()); + } + + #[test] + fn test_resolve_join_columns_first_not_found() { + // First column doesn't exist anywhere + let left_schema = LogicalSchema::new(vec![make_column_info("id", Type::Integer, "left")]); + let right_schema = + LogicalSchema::new(vec![make_column_info("value", Type::Integer, "right")]); + + let missing_col = Column { + name: "missing".to_string(), + table: None, + }; + let value_col = Column { + name: "value".to_string(), + table: None, + }; + + let result = DbspCompiler::resolve_join_columns( + &missing_col, + &value_col, + &left_schema, + &right_schema, + ); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("not found in either input")); + } + + #[test] + fn test_resolve_join_columns_both_unqualified_different_names() { + // Both unqualified, each exists in only one table + let left_schema = + LogicalSchema::new(vec![make_column_info("left_id", Type::Integer, "left")]); + let right_schema = + LogicalSchema::new(vec![make_column_info("right_id", Type::Integer, "right")]); + + let left_col = Column { + name: "left_id".to_string(), + table: None, + }; + let right_col = Column { + name: "right_id".to_string(), + table: None, + }; + + let result = + DbspCompiler::resolve_join_columns(&left_col, &right_col, &left_schema, &right_schema); + assert!(result.is_ok()); + let (actual_left, left_idx, actual_right, right_idx) = result.unwrap(); + assert_eq!(actual_left.name, "left_id"); + assert_eq!(left_idx, 0); + assert_eq!(actual_right.name, "right_id"); + assert_eq!(right_idx, 0); + } } diff --git a/testing/materialized_views.test b/testing/materialized_views.test index 1755aede2..911e50ef8 100755 --- a/testing/materialized_views.test +++ b/testing/materialized_views.test @@ -1595,3 +1595,20 @@ do_execsql_test_on_specific_db {:memory:} matview-in-incremental { 1|INFO|start 3|ERROR|fail 3|ERROR|fail} + +# Test join with swapped column order in ON clause +do_execsql_test_on_specific_db {:memory:} matview-join-swapped-columns { + CREATE TABLE employees(id INTEGER PRIMARY KEY, name TEXT); + CREATE TABLE departments(emp_id INTEGER PRIMARY KEY, dept_name TEXT); + + INSERT INTO employees VALUES (1, 'Alice'), (2, 'Bob'); + INSERT INTO departments VALUES (1, 'Engineering'), (2, 'Sales'); + + CREATE MATERIALIZED VIEW emp_dept AS + SELECT e.name, d.dept_name + FROM employees e + JOIN departments d ON d.emp_id = e.id; + + SELECT * FROM emp_dept ORDER BY name; +} {Alice|Engineering +Bob|Sales}