unify populate

populate now has its own code path to apply changes to the view. It was
okay until now because all we do is filter. But now that we are also
applying aggregations, we'll end up with two disjoint code paths.

A better approach is to just apply the results of our select to the
delta set, and apply it.
This commit is contained in:
Glauber Costa
2025-08-11 14:35:45 -05:00
parent 27c22a64b3
commit 333c5c435b

View File

@@ -552,13 +552,15 @@ impl IncrementalView {
stmt,
rows_processed,
} => {
// Process rows in batches to allow for IO interruption
// Collect rows into a delta batch
let mut batch_delta = Delta::new();
let mut batch_count = 0;
loop {
if batch_count >= BATCH_SIZE {
// Process this batch through the standard pipeline
self.merge_delta(&batch_delta);
// Yield control after processing a batch
// The statement maintains its position, so we'll resume from here
return Ok(IOResult::IO);
}
@@ -586,38 +588,15 @@ impl IncrementalView {
// Get all values except the rowid
let values = all_values[..all_values.len() - 1].to_vec();
// Apply filter if we have one
// Pure DBSP would ingest the entire stream and then apply filter operators.
// However, for initial population, we adopt a hybrid approach where we filter at
// the query result level for efficiency. This avoids reading millions of rows just
// to filter them down to a few. We only do this optimization for filters, not for
// other operators like projections or aggregations.
// TODO: We should further optimize by pushing the filter into the SQL WHERE clause.
// Check filter first (we need to do this before accessing self mutably)
let passes_filter =
if let Some(ref filter_op) = self.filter_operator {
filter_op.evaluate_predicate(&values)
} else {
true
};
if passes_filter {
// Store the row with its original rowid
self.records.insert(rowid, values.clone());
// Update the ZSet stream with weight +1
let mut delta = RowKeyZSet::new();
use crate::incremental::hashable_row::HashableRow;
let row = HashableRow::new(rowid, values);
delta.insert(row, 1);
self.stream.apply_delta(&delta);
}
// Add to batch delta - let merge_delta handle filtering and aggregation
batch_delta.insert(rowid, values);
*rows_processed += 1;
batch_count += 1;
}
crate::vdbe::StepResult::Done => {
// Process any remaining rows in the batch
self.merge_delta(&batch_delta);
// All rows processed, move to Done state
self.populate_state = PopulateState::Done;
return Ok(IOResult::Done(()));
@@ -626,8 +605,9 @@ impl IncrementalView {
return Err(LimboError::Busy);
}
crate::vdbe::StepResult::IO => {
// Process current batch before yielding
self.merge_delta(&batch_delta);
// The Statement needs to wait for IO
// When we return here, the Statement maintains its position
return Ok(IOResult::IO);
}
}