From 874047276e44a939ef46acc4b708f993abed2c5e Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Mon, 8 Sep 2025 20:23:31 -0700 Subject: [PATCH] views: pass a DeltaSet for merge_delta A DeltaSet is a collection of Deltas, one per table. We'll need that for joins. The populate step for now will still generate a single set. That will be our next step to fix. --- core/incremental/compiler.rs | 10 ++++++ core/incremental/cursor.rs | 16 ++++----- core/incremental/view.rs | 66 +++++++++++++++++++++--------------- core/vdbe/execute.rs | 6 ++-- core/vdbe/mod.rs | 12 +++++-- 5 files changed, 67 insertions(+), 43 deletions(-) diff --git a/core/incremental/compiler.rs b/core/incremental/compiler.rs index 135d03b6a..abf1f6de3 100644 --- a/core/incremental/compiler.rs +++ b/core/incremental/compiler.rs @@ -148,6 +148,16 @@ impl DeltaSet { .cloned() .unwrap_or_else(Delta::new) } + + /// Convert DeltaSet into the underlying HashMap + pub fn into_map(self) -> HashMap { + self.deltas + } + + /// Check if all deltas in the set are empty + pub fn is_empty(&self) -> bool { + self.deltas.values().all(|d| d.is_empty()) + } } /// Represents a DBSP operator in the compiled circuit diff --git a/core/incremental/cursor.rs b/core/incremental/cursor.rs index df4f78d9d..fd97241a4 100644 --- a/core/incremental/cursor.rs +++ b/core/incremental/cursor.rs @@ -91,14 +91,12 @@ impl MaterializedViewCursor { // Get the view and the current transaction state let mut view_guard = self.view.lock().unwrap(); - let tx_delta = self.tx_state.get_delta(); + let table_deltas = self.tx_state.get_table_deltas(); - // Process the delta through the circuit to get materialized changes + // Process the deltas through the circuit to get materialized changes let mut uncommitted = DeltaSet::new(); - // Get the first table name from the view's referenced tables - let table_names = view_guard.get_referenced_table_names(); - if !table_names.is_empty() { - uncommitted.insert(table_names[0].clone(), tx_delta); + for (table_name, delta) in table_deltas { + uncommitted.insert(table_name, delta); } let processed_delta = return_if_io!(view_guard.execute_with_uncommitted( @@ -396,9 +394,9 @@ mod tests { ) { for (rowid, values, weight) in changes { if weight > 0 { - tx_state.insert(rowid, values); + tx_state.insert("test_table", rowid, values); } else if weight < 0 { - tx_state.delete(rowid, values); + tx_state.delete("test_table", rowid, values); } } } @@ -669,7 +667,7 @@ mod tests { assert_eq!(result, SeekResult::NotFound); // Add row 2 to uncommitted - tx_state.insert(2, vec![Value::Integer(2), Value::Integer(20)]); + tx_state.insert("test_table", 2, vec![Value::Integer(2), Value::Integer(20)]); // Now seek for row 2 finds it let result = pager diff --git a/core/incremental/view.rs b/core/incremental/view.rs index b2d0df606..a8c2ef8a2 100644 --- a/core/incremental/view.rs +++ b/core/incremental/view.rs @@ -55,47 +55,52 @@ impl fmt::Debug for PopulateState { /// Per-connection transaction state for incremental views #[derive(Debug, Clone, Default)] pub struct ViewTransactionState { - // Per-connection delta for uncommitted changes (contains both weights and values) + // Per-table deltas for uncommitted changes + // Maps table_name -> Delta for that table // Using RefCell for interior mutability - delta: RefCell, + table_deltas: RefCell>, } impl ViewTransactionState { /// Create a new transaction state pub fn new() -> Self { Self { - delta: RefCell::new(Delta::new()), + table_deltas: RefCell::new(HashMap::new()), } } - /// Insert a row into the delta - pub fn insert(&self, key: i64, values: Vec) { - self.delta.borrow_mut().insert(key, values); + /// Insert a row into the delta for a specific table + pub fn insert(&self, table_name: &str, key: i64, values: Vec) { + let mut deltas = self.table_deltas.borrow_mut(); + let delta = deltas.entry(table_name.to_string()).or_default(); + delta.insert(key, values); } - /// Delete a row from the delta - pub fn delete(&self, key: i64, values: Vec) { - self.delta.borrow_mut().delete(key, values); + /// Delete a row from the delta for a specific table + pub fn delete(&self, table_name: &str, key: i64, values: Vec) { + let mut deltas = self.table_deltas.borrow_mut(); + let delta = deltas.entry(table_name.to_string()).or_default(); + delta.delete(key, values); } /// Clear all changes in the delta pub fn clear(&self) { - self.delta.borrow_mut().changes.clear(); + self.table_deltas.borrow_mut().clear(); } - /// Get a clone of the current delta - pub fn get_delta(&self) -> Delta { - self.delta.borrow().clone() + /// Get deltas organized by table + pub fn get_table_deltas(&self) -> HashMap { + self.table_deltas.borrow().clone() } /// Check if the delta is empty pub fn is_empty(&self) -> bool { - self.delta.borrow().is_empty() + self.table_deltas.borrow().values().all(|d| d.is_empty()) } /// Returns how many elements exist in the delta. pub fn len(&self) -> usize { - self.delta.borrow().len() + self.table_deltas.borrow().values().map(|d| d.len()).sum() } } @@ -604,8 +609,13 @@ impl IncrementalView { let mut single_row_delta = Delta::new(); single_row_delta.insert(rowid, values.clone()); + // Create a DeltaSet with this delta for the first table (for now) + let mut delta_set = DeltaSet::new(); + // TODO: When we support JOINs, determine which table this row came from + delta_set.insert(self.referenced_tables[0].name.clone(), single_row_delta); + // Process the pending row with the pager - match self.merge_delta(&single_row_delta, pager.clone())? { + match self.merge_delta(delta_set, pager.clone())? { IOResult::Done(_) => { // Row processed successfully, continue to next row rows_processed += 1; @@ -671,8 +681,13 @@ impl IncrementalView { let mut single_row_delta = Delta::new(); single_row_delta.insert(rowid, values.clone()); + // Create a DeltaSet with this delta for the first table (for now) + let mut delta_set = DeltaSet::new(); + // TODO: When we support JOINs, determine which table this row came from + delta_set.insert(self.referenced_tables[0].name.clone(), single_row_delta); + // Process this single row through merge_delta with the pager - match self.merge_delta(&single_row_delta, pager.clone())? { + match self.merge_delta(delta_set, pager.clone())? { IOResult::Done(_) => { // Row processed successfully, continue to next row rows_processed += 1; @@ -801,24 +816,19 @@ impl IncrementalView { None } - /// Merge a delta of changes into the view's current state + /// Merge a delta set of changes into the view's current state pub fn merge_delta( &mut self, - delta: &Delta, + delta_set: DeltaSet, pager: std::rc::Rc, ) -> crate::Result> { - // Early return if delta is empty - if delta.is_empty() { + // Early return if all deltas are empty + if delta_set.is_empty() { return Ok(IOResult::Done(())); } - // Use the circuit to process the delta and write to btree - let mut input_data = HashMap::new(); - // For now, assume the delta applies to the first table - // TODO: This needs to be improved to handle deltas for multiple tables - if !self.referenced_tables.is_empty() { - input_data.insert(self.referenced_tables[0].name.clone(), delta.clone()); - } + // Use the circuit to process the deltas and write to btree + let input_data = delta_set.into_map(); // The circuit now handles all btree I/O internally with the provided pager let _delta = return_if_io!(self.circuit.commit(input_data, pager)); diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 11257211a..f7577c4a4 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -5525,7 +5525,7 @@ pub fn op_insert( .connection .view_transaction_states .get_or_create(view_name); - tx_state.delete(key, values.clone()); + tx_state.delete(table_name, key, values.clone()); } } for view_name in dependent_views.iter() { @@ -5534,7 +5534,7 @@ pub fn op_insert( .view_transaction_states .get_or_create(view_name); - tx_state.insert(key, values.clone()); + tx_state.insert(table_name, key, values.clone()); } break; @@ -5668,7 +5668,7 @@ pub fn op_delete( .connection .view_transaction_states .get_or_create(&view_name); - tx_state.delete(key, values.clone()); + tx_state.delete(table_name, key, values.clone()); } } break; diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index c5e4e91a8..92d9a0f5b 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -603,19 +603,25 @@ impl Program { let view_name = &views[*current_index]; - let delta = self + let table_deltas = self .connection .view_transaction_states .get(view_name) .unwrap() - .get_delta(); + .get_table_deltas(); let schema = self.connection.schema.borrow(); if let Some(view_mutex) = schema.get_materialized_view(view_name) { let mut view = view_mutex.lock().unwrap(); + // Create a DeltaSet from the per-table deltas + let mut delta_set = crate::incremental::compiler::DeltaSet::new(); + for (table_name, delta) in table_deltas { + delta_set.insert(table_name, delta); + } + // Handle I/O from merge_delta - pass pager, circuit will create its own cursor - match view.merge_delta(&delta, pager.clone())? { + match view.merge_delta(delta_set, pager.clone())? { IOResult::Done(_) => { // Move to next view state.view_delta_state = ViewDeltaCommitState::Processing {