diff --git a/core/incremental/view.rs b/core/incremental/view.rs index 6576ab078..741849d35 100644 --- a/core/incremental/view.rs +++ b/core/incremental/view.rs @@ -552,13 +552,15 @@ impl IncrementalView { stmt, rows_processed, } => { - // Process rows in batches to allow for IO interruption + // Collect rows into a delta batch + let mut batch_delta = Delta::new(); let mut batch_count = 0; loop { if batch_count >= BATCH_SIZE { + // Process this batch through the standard pipeline + self.merge_delta(&batch_delta); // Yield control after processing a batch - // The statement maintains its position, so we'll resume from here return Ok(IOResult::IO); } @@ -586,38 +588,15 @@ impl IncrementalView { // Get all values except the rowid let values = all_values[..all_values.len() - 1].to_vec(); - // Apply filter if we have one - // Pure DBSP would ingest the entire stream and then apply filter operators. - // However, for initial population, we adopt a hybrid approach where we filter at - // the query result level for efficiency. This avoids reading millions of rows just - // to filter them down to a few. We only do this optimization for filters, not for - // other operators like projections or aggregations. - // TODO: We should further optimize by pushing the filter into the SQL WHERE clause. - - // Check filter first (we need to do this before accessing self mutably) - let passes_filter = - if let Some(ref filter_op) = self.filter_operator { - filter_op.evaluate_predicate(&values) - } else { - true - }; - - if passes_filter { - // Store the row with its original rowid - self.records.insert(rowid, values.clone()); - - // Update the ZSet stream with weight +1 - let mut delta = RowKeyZSet::new(); - use crate::incremental::hashable_row::HashableRow; - let row = HashableRow::new(rowid, values); - delta.insert(row, 1); - self.stream.apply_delta(&delta); - } + // Add to batch delta - let merge_delta handle filtering and aggregation + batch_delta.insert(rowid, values); *rows_processed += 1; batch_count += 1; } crate::vdbe::StepResult::Done => { + // Process any remaining rows in the batch + self.merge_delta(&batch_delta); // All rows processed, move to Done state self.populate_state = PopulateState::Done; return Ok(IOResult::Done(())); @@ -626,8 +605,9 @@ impl IncrementalView { return Err(LimboError::Busy); } crate::vdbe::StepResult::IO => { + // Process current batch before yielding + self.merge_delta(&batch_delta); // The Statement needs to wait for IO - // When we return here, the Statement maintains its position return Ok(IOResult::IO); } }