From 6e2bd364ee05c48bb979e65fa9ea8b284762cbd0 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Tue, 26 Aug 2025 12:11:31 -0500 Subject: [PATCH] fix issue with rowids and deletions The operator itself should handle deletions and updates that change the rowid by consolidating its state. Our current materialized views track state themselves, so we don't see this problem now. But it becomes apparent once we switch the views to use circuits. --- core/incremental/operator.rs | 72 ++++++++++++++++++++++++++++++++++-- 1 file changed, 69 insertions(+), 3 deletions(-) diff --git a/core/incremental/operator.rs b/core/incremental/operator.rs index 38e1689da..34e2e2f1b 100644 --- a/core/incremental/operator.rs +++ b/core/incremental/operator.rs @@ -575,7 +575,10 @@ impl IncrementalOperator for FilterOperator { } fn get_current_state(&self) -> Delta { - self.current_state.clone() + // Return a consolidated view of the current state + let mut consolidated = self.current_state.clone(); + consolidated.consolidate(); + consolidated } fn set_tracker(&mut self, tracker: Arc>) { @@ -918,7 +921,10 @@ impl IncrementalOperator for ProjectOperator { } fn get_current_state(&self) -> Delta { - self.current_state.clone() + // Return a consolidated view of the current state + let mut consolidated = self.current_state.clone(); + consolidated.consolidate(); + consolidated } fn set_tracker(&mut self, tracker: Arc>) { @@ -1584,7 +1590,10 @@ impl IncrementalOperator for AggregateOperator { } fn get_current_state(&self) -> Delta { - self.current_state.clone() + // Return a consolidated view of the current state + let mut consolidated = self.current_state.clone(); + consolidated.consolidate(); + consolidated } fn set_tracker(&mut self, tracker: Arc>) { @@ -2390,4 +2399,61 @@ mod tests { assert_eq!(change_row.values[1], Value::Integer(1)); // count: 2 - 1 assert_eq!(change_row.values[2], Value::Integer(200)); // sum: 300 - 100 } + + #[test] + fn test_filter_operator_rowid_update() { + // When a row's rowid changes (e.g., UPDATE t SET a=1 WHERE a=3 on INTEGER PRIMARY KEY), + // the operator should properly consolidate the state + + let mut filter = FilterOperator::new( + FilterPredicate::GreaterThan { + column: "b".to_string(), + value: Value::Integer(2), + }, + vec!["a".to_string(), "b".to_string()], + ); + + // Initialize with a row (rowid=3, values=[3, 3]) + let mut init_data = Delta::new(); + init_data.insert(3, vec![Value::Integer(3), Value::Integer(3)]); + filter.initialize(init_data); + + // Check initial state + let state = filter.get_current_state(); + assert_eq!(state.changes.len(), 1); + assert_eq!(state.changes[0].0.rowid, 3); + assert_eq!( + state.changes[0].0.values, + vec![Value::Integer(3), Value::Integer(3)] + ); + + // Simulate an UPDATE that changes rowid from 3 to 1 + // This is sent as: delete(3) + insert(1) + let mut update_delta = Delta::new(); + update_delta.delete(3, vec![Value::Integer(3), Value::Integer(3)]); + update_delta.insert(1, vec![Value::Integer(1), Value::Integer(3)]); + + let output = filter.process_delta(update_delta); + + // The output delta should have both changes (both pass the filter b > 2) + assert_eq!(output.changes.len(), 2); + assert_eq!(output.changes[0].1, -1); // delete weight + assert_eq!(output.changes[1].1, 1); // insert weight + + // The current state should be consolidated to only have rows with positive weight + let final_state = filter.get_current_state(); + + // After consolidation, we should have only one row with rowid=1 + assert_eq!( + final_state.changes.len(), + 1, + "State should be consolidated to have only one row" + ); + assert_eq!(final_state.changes[0].0.rowid, 1); + assert_eq!( + final_state.changes[0].0.values, + vec![Value::Integer(1), Value::Integer(3)] + ); + assert_eq!(final_state.changes[0].1, 1); // positive weight + } }