From 333c5c435b0246d635ee07ffc50fdfbd68a52066 Mon Sep 17 00:00:00 2001
From: Glauber Costa
Date: Mon, 11 Aug 2025 14:35:45 -0500
Subject: [PATCH] unify populate

populate currently has its own code path for applying changes to the
view. That was okay until now, because all we do is filter. But now
that we are also applying aggregations, we would end up with two
disjoint code paths.

A better approach is to collect the results of our select into a delta
set and apply it through the same pipeline as every other change.
---
 core/incremental/view.rs | 40 ++++++++++------------------------------
 1 file changed, 10 insertions(+), 30 deletions(-)

diff --git a/core/incremental/view.rs b/core/incremental/view.rs
index 6576ab078..741849d35 100644
--- a/core/incremental/view.rs
+++ b/core/incremental/view.rs
@@ -552,13 +552,15 @@ impl IncrementalView {
                     stmt,
                     rows_processed,
                 } => {
-                    // Process rows in batches to allow for IO interruption
+                    // Collect rows into a delta batch
+                    let mut batch_delta = Delta::new();
                     let mut batch_count = 0;
                     loop {
                         if batch_count >= BATCH_SIZE {
+                            // Process this batch through the standard pipeline
+                            self.merge_delta(&batch_delta);
                             // Yield control after processing a batch
-                            // The statement maintains its position, so we'll resume from here
                             return Ok(IOResult::IO);
                         }
 
@@ -586,38 +588,15 @@ impl IncrementalView {
                                 // Get all values except the rowid
                                 let values = all_values[..all_values.len() - 1].to_vec();
 
-                                // Apply filter if we have one
-                                // Pure DBSP would ingest the entire stream and then apply filter operators.
-                                // However, for initial population, we adopt a hybrid approach where we filter at
-                                // the query result level for efficiency. This avoids reading millions of rows just
-                                // to filter them down to a few. We only do this optimization for filters, not for
-                                // other operators like projections or aggregations.
-                                // TODO: We should further optimize by pushing the filter into the SQL WHERE clause.
-
-                                // Check filter first (we need to do this before accessing self mutably)
-                                let passes_filter =
-                                    if let Some(ref filter_op) = self.filter_operator {
-                                        filter_op.evaluate_predicate(&values)
-                                    } else {
-                                        true
-                                    };
-
-                                if passes_filter {
-                                    // Store the row with its original rowid
-                                    self.records.insert(rowid, values.clone());
-
-                                    // Update the ZSet stream with weight +1
-                                    let mut delta = RowKeyZSet::new();
-                                    use crate::incremental::hashable_row::HashableRow;
-                                    let row = HashableRow::new(rowid, values);
-                                    delta.insert(row, 1);
-                                    self.stream.apply_delta(&delta);
-                                }
+                                // Add to batch delta - let merge_delta handle filtering and aggregation
+                                batch_delta.insert(rowid, values);
 
                                 *rows_processed += 1;
                                 batch_count += 1;
                             }
                             crate::vdbe::StepResult::Done => {
+                                // Process any remaining rows in the batch
+                                self.merge_delta(&batch_delta);
                                 // All rows processed, move to Done state
                                 self.populate_state = PopulateState::Done;
                                 return Ok(IOResult::Done(()));
@@ -626,8 +605,9 @@ impl IncrementalView {
                                 return Err(LimboError::Busy);
                             }
                             crate::vdbe::StepResult::IO => {
+                                // Process current batch before yielding
+                                self.merge_delta(&batch_delta);
                                 // The Statement needs to wait for IO
-                                // When we return here, the Statement maintains its position
                                 return Ok(IOResult::IO);
                             }
                         }
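
To make the unified flow concrete, below is a minimal, self-contained
Rust sketch of the pattern the patch adopts: accumulate scanned rows
into a batch delta, then push the batch through the same merge path
that owns filtering (and, after this change, aggregation). The `Delta`,
`View`, and `merge_delta` names here are simplified stand-ins invented
for illustration only; the real types in core/incremental/view.rs carry
DBSP Z-set weights and drive real operators.

    // Illustrative sketch only: simplified stand-ins, not the real
    // types from core/incremental/view.rs.
    use std::collections::BTreeMap;

    #[derive(Default)]
    struct Delta {
        // (rowid, values, weight); weight +1 inserts, -1 deletes.
        changes: Vec<(i64, Vec<i64>, i64)>,
    }

    impl Delta {
        fn new() -> Self {
            Self::default()
        }
        fn insert(&mut self, rowid: i64, values: Vec<i64>) {
            self.changes.push((rowid, values, 1));
        }
    }

    struct View {
        records: BTreeMap<i64, Vec<i64>>,
    }

    impl View {
        // One pipeline for both initial population and incremental
        // updates: the filter (and, in the real code, aggregation)
        // lives here, so populate needs no private copy of that logic.
        fn merge_delta(&mut self, delta: &Delta) {
            for (rowid, values, weight) in &delta.changes {
                if values[0] % 2 != 0 {
                    continue; // stand-in for the view's filter predicate
                }
                if *weight > 0 {
                    self.records.insert(*rowid, values.clone());
                } else {
                    self.records.remove(rowid);
                }
            }
        }
    }

    fn main() {
        let mut view = View { records: BTreeMap::new() };

        // Population now just accumulates scanned rows into a batch...
        let mut batch_delta = Delta::new();
        for rowid in 0..5 {
            batch_delta.insert(rowid, vec![rowid * 3]);
        }
        // ...and applies it through the same path as any other change.
        view.merge_delta(&batch_delta);

        // Rows 0, 2, 4 carry even values (0, 6, 12) and pass the filter.
        assert_eq!(view.records.len(), 3);
        println!("{} rows materialized", view.records.len());
    }

Note that batching also preserves the IO-yield behavior of the patch
itself: every yield point first flushes the current batch via
merge_delta, so no scanned row is lost when the statement resumes.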