diff --git a/core/incremental/operator.rs b/core/incremental/operator.rs index 38e1689da..34e2e2f1b 100644 --- a/core/incremental/operator.rs +++ b/core/incremental/operator.rs @@ -575,7 +575,10 @@ impl IncrementalOperator for FilterOperator { } fn get_current_state(&self) -> Delta { - self.current_state.clone() + // Return a consolidated view of the current state + let mut consolidated = self.current_state.clone(); + consolidated.consolidate(); + consolidated } fn set_tracker(&mut self, tracker: Arc>) { @@ -918,7 +921,10 @@ impl IncrementalOperator for ProjectOperator { } fn get_current_state(&self) -> Delta { - self.current_state.clone() + // Return a consolidated view of the current state + let mut consolidated = self.current_state.clone(); + consolidated.consolidate(); + consolidated } fn set_tracker(&mut self, tracker: Arc>) { @@ -1584,7 +1590,10 @@ impl IncrementalOperator for AggregateOperator { } fn get_current_state(&self) -> Delta { - self.current_state.clone() + // Return a consolidated view of the current state + let mut consolidated = self.current_state.clone(); + consolidated.consolidate(); + consolidated } fn set_tracker(&mut self, tracker: Arc>) { @@ -2390,4 +2399,61 @@ mod tests { assert_eq!(change_row.values[1], Value::Integer(1)); // count: 2 - 1 assert_eq!(change_row.values[2], Value::Integer(200)); // sum: 300 - 100 } + + #[test] + fn test_filter_operator_rowid_update() { + // When a row's rowid changes (e.g., UPDATE t SET a=1 WHERE a=3 on INTEGER PRIMARY KEY), + // the operator should properly consolidate the state + + let mut filter = FilterOperator::new( + FilterPredicate::GreaterThan { + column: "b".to_string(), + value: Value::Integer(2), + }, + vec!["a".to_string(), "b".to_string()], + ); + + // Initialize with a row (rowid=3, values=[3, 3]) + let mut init_data = Delta::new(); + init_data.insert(3, vec![Value::Integer(3), Value::Integer(3)]); + filter.initialize(init_data); + + // Check initial state + let state = filter.get_current_state(); + assert_eq!(state.changes.len(), 1); + assert_eq!(state.changes[0].0.rowid, 3); + assert_eq!( + state.changes[0].0.values, + vec![Value::Integer(3), Value::Integer(3)] + ); + + // Simulate an UPDATE that changes rowid from 3 to 1 + // This is sent as: delete(3) + insert(1) + let mut update_delta = Delta::new(); + update_delta.delete(3, vec![Value::Integer(3), Value::Integer(3)]); + update_delta.insert(1, vec![Value::Integer(1), Value::Integer(3)]); + + let output = filter.process_delta(update_delta); + + // The output delta should have both changes (both pass the filter b > 2) + assert_eq!(output.changes.len(), 2); + assert_eq!(output.changes[0].1, -1); // delete weight + assert_eq!(output.changes[1].1, 1); // insert weight + + // The current state should be consolidated to only have rows with positive weight + let final_state = filter.get_current_state(); + + // After consolidation, we should have only one row with rowid=1 + assert_eq!( + final_state.changes.len(), + 1, + "State should be consolidated to have only one row" + ); + assert_eq!(final_state.changes[0].0.rowid, 1); + assert_eq!( + final_state.changes[0].0.values, + vec![Value::Integer(1), Value::Integer(3)] + ); + assert_eq!(final_state.changes[0].1, 1); // positive weight + } }