views: pass a DeltaSet for merge_delta

A DeltaSet is a collection of Deltas, one per table.
We'll need that for joins. The populate step for now will still generate
a single set. That will be our next step to fix.
This commit is contained in:
Glauber Costa
2025-09-08 20:23:31 -07:00
parent 841de334b7
commit 874047276e
5 changed files with 67 additions and 43 deletions

View File

@@ -148,6 +148,16 @@ impl DeltaSet {
.cloned()
.unwrap_or_else(Delta::new)
}
/// Convert DeltaSet into the underlying HashMap
pub fn into_map(self) -> HashMap<String, Delta> {
self.deltas
}
/// Check if all deltas in the set are empty
pub fn is_empty(&self) -> bool {
self.deltas.values().all(|d| d.is_empty())
}
}
/// Represents a DBSP operator in the compiled circuit

View File

@@ -91,14 +91,12 @@ impl MaterializedViewCursor {
// Get the view and the current transaction state
let mut view_guard = self.view.lock().unwrap();
let tx_delta = self.tx_state.get_delta();
let table_deltas = self.tx_state.get_table_deltas();
// Process the delta through the circuit to get materialized changes
// Process the deltas through the circuit to get materialized changes
let mut uncommitted = DeltaSet::new();
// Get the first table name from the view's referenced tables
let table_names = view_guard.get_referenced_table_names();
if !table_names.is_empty() {
uncommitted.insert(table_names[0].clone(), tx_delta);
for (table_name, delta) in table_deltas {
uncommitted.insert(table_name, delta);
}
let processed_delta = return_if_io!(view_guard.execute_with_uncommitted(
@@ -396,9 +394,9 @@ mod tests {
) {
for (rowid, values, weight) in changes {
if weight > 0 {
tx_state.insert(rowid, values);
tx_state.insert("test_table", rowid, values);
} else if weight < 0 {
tx_state.delete(rowid, values);
tx_state.delete("test_table", rowid, values);
}
}
}
@@ -669,7 +667,7 @@ mod tests {
assert_eq!(result, SeekResult::NotFound);
// Add row 2 to uncommitted
tx_state.insert(2, vec![Value::Integer(2), Value::Integer(20)]);
tx_state.insert("test_table", 2, vec![Value::Integer(2), Value::Integer(20)]);
// Now seek for row 2 finds it
let result = pager

View File

@@ -55,47 +55,52 @@ impl fmt::Debug for PopulateState {
/// Per-connection transaction state for incremental views
#[derive(Debug, Clone, Default)]
pub struct ViewTransactionState {
// Per-connection delta for uncommitted changes (contains both weights and values)
// Per-table deltas for uncommitted changes
// Maps table_name -> Delta for that table
// Using RefCell for interior mutability
delta: RefCell<Delta>,
table_deltas: RefCell<HashMap<String, Delta>>,
}
impl ViewTransactionState {
/// Create a new transaction state
pub fn new() -> Self {
Self {
delta: RefCell::new(Delta::new()),
table_deltas: RefCell::new(HashMap::new()),
}
}
/// Insert a row into the delta
pub fn insert(&self, key: i64, values: Vec<Value>) {
self.delta.borrow_mut().insert(key, values);
/// Insert a row into the delta for a specific table
pub fn insert(&self, table_name: &str, key: i64, values: Vec<Value>) {
let mut deltas = self.table_deltas.borrow_mut();
let delta = deltas.entry(table_name.to_string()).or_default();
delta.insert(key, values);
}
/// Delete a row from the delta
pub fn delete(&self, key: i64, values: Vec<Value>) {
self.delta.borrow_mut().delete(key, values);
/// Delete a row from the delta for a specific table
pub fn delete(&self, table_name: &str, key: i64, values: Vec<Value>) {
let mut deltas = self.table_deltas.borrow_mut();
let delta = deltas.entry(table_name.to_string()).or_default();
delta.delete(key, values);
}
/// Clear all changes in the delta
pub fn clear(&self) {
self.delta.borrow_mut().changes.clear();
self.table_deltas.borrow_mut().clear();
}
/// Get a clone of the current delta
pub fn get_delta(&self) -> Delta {
self.delta.borrow().clone()
/// Get deltas organized by table
pub fn get_table_deltas(&self) -> HashMap<String, Delta> {
self.table_deltas.borrow().clone()
}
/// Check if the delta is empty
pub fn is_empty(&self) -> bool {
self.delta.borrow().is_empty()
self.table_deltas.borrow().values().all(|d| d.is_empty())
}
/// Returns how many elements exist in the delta.
pub fn len(&self) -> usize {
self.delta.borrow().len()
self.table_deltas.borrow().values().map(|d| d.len()).sum()
}
}
@@ -604,8 +609,13 @@ impl IncrementalView {
let mut single_row_delta = Delta::new();
single_row_delta.insert(rowid, values.clone());
// Create a DeltaSet with this delta for the first table (for now)
let mut delta_set = DeltaSet::new();
// TODO: When we support JOINs, determine which table this row came from
delta_set.insert(self.referenced_tables[0].name.clone(), single_row_delta);
// Process the pending row with the pager
match self.merge_delta(&single_row_delta, pager.clone())? {
match self.merge_delta(delta_set, pager.clone())? {
IOResult::Done(_) => {
// Row processed successfully, continue to next row
rows_processed += 1;
@@ -671,8 +681,13 @@ impl IncrementalView {
let mut single_row_delta = Delta::new();
single_row_delta.insert(rowid, values.clone());
// Create a DeltaSet with this delta for the first table (for now)
let mut delta_set = DeltaSet::new();
// TODO: When we support JOINs, determine which table this row came from
delta_set.insert(self.referenced_tables[0].name.clone(), single_row_delta);
// Process this single row through merge_delta with the pager
match self.merge_delta(&single_row_delta, pager.clone())? {
match self.merge_delta(delta_set, pager.clone())? {
IOResult::Done(_) => {
// Row processed successfully, continue to next row
rows_processed += 1;
@@ -801,24 +816,19 @@ impl IncrementalView {
None
}
/// Merge a delta of changes into the view's current state
/// Merge a delta set of changes into the view's current state
pub fn merge_delta(
&mut self,
delta: &Delta,
delta_set: DeltaSet,
pager: std::rc::Rc<crate::Pager>,
) -> crate::Result<IOResult<()>> {
// Early return if delta is empty
if delta.is_empty() {
// Early return if all deltas are empty
if delta_set.is_empty() {
return Ok(IOResult::Done(()));
}
// Use the circuit to process the delta and write to btree
let mut input_data = HashMap::new();
// For now, assume the delta applies to the first table
// TODO: This needs to be improved to handle deltas for multiple tables
if !self.referenced_tables.is_empty() {
input_data.insert(self.referenced_tables[0].name.clone(), delta.clone());
}
// Use the circuit to process the deltas and write to btree
let input_data = delta_set.into_map();
// The circuit now handles all btree I/O internally with the provided pager
let _delta = return_if_io!(self.circuit.commit(input_data, pager));

View File

@@ -5525,7 +5525,7 @@ pub fn op_insert(
.connection
.view_transaction_states
.get_or_create(view_name);
tx_state.delete(key, values.clone());
tx_state.delete(table_name, key, values.clone());
}
}
for view_name in dependent_views.iter() {
@@ -5534,7 +5534,7 @@ pub fn op_insert(
.view_transaction_states
.get_or_create(view_name);
tx_state.insert(key, values.clone());
tx_state.insert(table_name, key, values.clone());
}
break;
@@ -5668,7 +5668,7 @@ pub fn op_delete(
.connection
.view_transaction_states
.get_or_create(&view_name);
tx_state.delete(key, values.clone());
tx_state.delete(table_name, key, values.clone());
}
}
break;

View File

@@ -603,19 +603,25 @@ impl Program {
let view_name = &views[*current_index];
let delta = self
let table_deltas = self
.connection
.view_transaction_states
.get(view_name)
.unwrap()
.get_delta();
.get_table_deltas();
let schema = self.connection.schema.borrow();
if let Some(view_mutex) = schema.get_materialized_view(view_name) {
let mut view = view_mutex.lock().unwrap();
// Create a DeltaSet from the per-table deltas
let mut delta_set = crate::incremental::compiler::DeltaSet::new();
for (table_name, delta) in table_deltas {
delta_set.insert(table_name, delta);
}
// Handle I/O from merge_delta - pass pager, circuit will create its own cursor
match view.merge_delta(&delta, pager.clone())? {
match view.merge_delta(delta_set, pager.clone())? {
IOResult::Done(_) => {
// Move to next view
state.view_delta_state = ViewDeltaCommitState::Processing {