Add a second delta to the EvalState, Commit

We will assert that the second one is always empty for the existing
operators - as they should be!

But joins will need both.
This commit is contained in:
Glauber Costa
2025-09-08 09:09:13 -05:00
parent 6541a43670
commit e6008e532a
3 changed files with 165 additions and 79 deletions

View File

@@ -5,7 +5,7 @@
//!
//! Based on the DBSP paper: "DBSP: Automatic Incremental View Maintenance for Rich Query Languages"
use crate::incremental::dbsp::Delta;
use crate::incremental::dbsp::{Delta, DeltaPair};
use crate::incremental::expr_compiler::CompiledExpression;
use crate::incremental::operator::{
EvalState, FilterOperator, FilterPredicate, IncrementalOperator, InputOperator, ProjectOperator,
@@ -231,13 +231,13 @@ impl DbspNode {
};
let state = if commit_operators {
// Clone the delta from eval_state - don't extract it
// Clone the deltas from eval_state - don't extract them
// in case we need to re-execute due to I/O
let delta = match eval_state {
EvalState::Init { delta } => delta.clone(),
let deltas = match eval_state {
EvalState::Init { deltas } => deltas.clone(),
_ => panic!("commit can only be called when eval_state is in Init state"),
};
let result = return_if_io!(op.commit(delta, cursor));
let result = return_if_io!(op.commit(deltas, cursor));
// After successful commit, move state to Done
*eval_state = EvalState::Done;
result
@@ -518,7 +518,9 @@ impl DbspCircuit {
// Input nodes get their delta directly from input_data
let delta = input_data.get(name);
*execute_state = ExecuteState::ProcessingNode {
eval_state: Box::new(EvalState::Init { delta }),
eval_state: Box::new(EvalState::Init {
deltas: delta.into(),
}),
};
}
_ => {
@@ -552,22 +554,14 @@ impl DbspCircuit {
input_deltas,
} => {
if *current_index >= input_states.len() {
// All inputs processed, check we have exactly one delta
// (Input nodes never reach here since they go straight to ProcessingNode)
let delta = if input_deltas.is_empty() {
return Err(LimboError::InternalError(
"execute() cannot be called without a Delta".to_string(),
));
} else if input_deltas.len() > 1 {
return Err(LimboError::InternalError(
format!("Until joins are supported, only one delta is expected. Got {} deltas", input_deltas.len()),
));
} else {
input_deltas[0].clone()
};
// All inputs processed
let left_delta = input_deltas.first().cloned().unwrap_or_else(Delta::new);
let right_delta = input_deltas.get(1).cloned().unwrap_or_else(Delta::new);
*execute_state = ExecuteState::ProcessingNode {
eval_state: Box::new(EvalState::Init { delta }),
eval_state: Box::new(EvalState::Init {
deltas: DeltaPair::new(left_delta, right_delta),
}),
};
} else {
// Get the (node_id, state) pair for the current index

View File

@@ -167,6 +167,40 @@ impl Delta {
}
}
/// A pair of deltas for operators that process two inputs
#[derive(Debug, Clone)]
pub struct DeltaPair {
pub left: Delta,
pub right: Delta,
}
impl DeltaPair {
/// Create a new delta pair
pub fn new(left: Delta, right: Delta) -> Self {
Self { left, right }
}
}
impl From<Delta> for DeltaPair {
/// Convert a single delta into a delta pair with empty right delta
fn from(delta: Delta) -> Self {
Self {
left: delta,
right: Delta::new(),
}
}
}
impl From<&Delta> for DeltaPair {
/// Convert a delta reference into a delta pair with empty right delta
fn from(delta: &Delta) -> Self {
Self {
left: delta.clone(),
right: Delta::new(),
}
}
}
/// A simplified ZSet for incremental computation
/// Each element has a weight: positive for additions, negative for deletions
#[derive(Clone, Debug, Default)]

View File

@@ -3,7 +3,7 @@
// Based on Feldera DBSP design but adapted for Turso's architecture
use crate::function::{AggFunc, Func};
use crate::incremental::dbsp::{Delta, HashableRow};
use crate::incremental::dbsp::{Delta, DeltaPair, HashableRow};
use crate::incremental::expr_compiler::CompiledExpression;
use crate::incremental::persistence::{ReadRecord, WriteRow};
use crate::storage::btree::BTreeCursor;
@@ -42,7 +42,7 @@ enum AggregateCommitState {
pub enum EvalState {
Uninitialized,
Init {
delta: Delta,
deltas: DeltaPair,
},
FetchData {
delta: Delta, // Keep original delta for merge operation
@@ -57,25 +57,35 @@ pub enum EvalState {
impl From<Delta> for EvalState {
fn from(delta: Delta) -> Self {
EvalState::Init { delta }
EvalState::Init {
deltas: delta.into(),
}
}
}
impl From<DeltaPair> for EvalState {
fn from(deltas: DeltaPair) -> Self {
EvalState::Init { deltas }
}
}
impl EvalState {
fn from_delta(delta: Delta) -> Self {
Self::Init { delta }
Self::Init {
deltas: delta.into(),
}
}
fn delta_ref(&self) -> &Delta {
match self {
EvalState::Init { delta } => delta,
EvalState::Init { deltas } => &deltas.left,
_ => panic!("delta_ref() can only be called when in Init state",),
}
}
pub fn extract_delta(&mut self) -> Delta {
match self {
EvalState::Init { delta } => {
let extracted = std::mem::take(delta);
EvalState::Init { deltas } => {
let extracted = std::mem::take(&mut deltas.left);
*self = EvalState::Uninitialized;
extracted
}
@@ -85,7 +95,7 @@ impl EvalState {
fn advance(&mut self, groups_to_read: BTreeMap<String, Vec<Value>>) {
let delta = match self {
EvalState::Init { delta } => std::mem::take(delta),
EvalState::Init { deltas } => std::mem::take(&mut deltas.left),
_ => panic!("advance() can only be called when in Init state, current state: {self:?}"),
};
@@ -507,11 +517,11 @@ pub trait IncrementalOperator: Debug {
/// The output delta from the evaluation
fn eval(&mut self, state: &mut EvalState, cursor: &mut BTreeCursor) -> Result<IOResult<Delta>>;
/// Commit a delta to the operator's internal state and return the output
/// Commit deltas to the operator's internal state and return the output
/// This is called when a transaction commits, making changes permanent
/// Returns the output delta (what downstream operators should see)
/// The cursor parameter is for operators that need to persist state
fn commit(&mut self, delta: Delta, cursor: &mut BTreeCursor) -> Result<IOResult<Delta>>;
fn commit(&mut self, deltas: DeltaPair, cursor: &mut BTreeCursor) -> Result<IOResult<Delta>>;
/// Set computation tracker
fn set_tracker(&mut self, tracker: Arc<Mutex<ComputationTracker>>);
@@ -541,8 +551,13 @@ impl IncrementalOperator for InputOperator {
_cursor: &mut BTreeCursor,
) -> Result<IOResult<Delta>> {
match state {
EvalState::Init { delta } => {
let output = std::mem::take(delta);
EvalState::Init { deltas } => {
// Input operators only use left_delta, right_delta must be empty
assert!(
deltas.right.is_empty(),
"InputOperator expects right_delta to be empty"
);
let output = std::mem::take(&mut deltas.left);
*state = EvalState::Done;
Ok(IOResult::Done(output))
}
@@ -552,9 +567,14 @@ impl IncrementalOperator for InputOperator {
}
}
fn commit(&mut self, delta: Delta, _cursor: &mut BTreeCursor) -> Result<IOResult<Delta>> {
fn commit(&mut self, deltas: DeltaPair, _cursor: &mut BTreeCursor) -> Result<IOResult<Delta>> {
// Input operator only uses left delta, right must be empty
assert!(
deltas.right.is_empty(),
"InputOperator expects right delta to be empty in commit"
);
// Input operator passes through the delta unchanged during commit
Ok(IOResult::Done(delta))
Ok(IOResult::Done(deltas.left))
}
fn set_tracker(&mut self, _tracker: Arc<Mutex<ComputationTracker>>) {
@@ -680,7 +700,14 @@ impl IncrementalOperator for FilterOperator {
_cursor: &mut BTreeCursor,
) -> Result<IOResult<Delta>> {
let delta = match state {
EvalState::Init { delta } => std::mem::take(delta),
EvalState::Init { deltas } => {
// Filter operators only use left_delta, right_delta must be empty
assert!(
deltas.right.is_empty(),
"FilterOperator expects right_delta to be empty"
);
std::mem::take(&mut deltas.left)
}
_ => unreachable!(
"FilterOperator doesn't execute the state machine. Should be in Init state"
),
@@ -706,12 +733,18 @@ impl IncrementalOperator for FilterOperator {
Ok(IOResult::Done(output_delta))
}
fn commit(&mut self, delta: Delta, _cursor: &mut BTreeCursor) -> Result<IOResult<Delta>> {
fn commit(&mut self, deltas: DeltaPair, _cursor: &mut BTreeCursor) -> Result<IOResult<Delta>> {
// Filter operator only uses left delta, right must be empty
assert!(
deltas.right.is_empty(),
"FilterOperator expects right delta to be empty in commit"
);
let mut output_delta = Delta::new();
// Commit the delta to our internal state
// Only pass through and track rows that satisfy the filter predicate
for (row, weight) in delta.changes {
for (row, weight) in deltas.left.changes {
if let Some(tracker) = &self.tracker {
tracker.lock().unwrap().record_filter();
}
@@ -1076,7 +1109,14 @@ impl IncrementalOperator for ProjectOperator {
_cursor: &mut BTreeCursor,
) -> Result<IOResult<Delta>> {
let delta = match state {
EvalState::Init { delta } => std::mem::take(delta),
EvalState::Init { deltas } => {
// Project operators only use left_delta, right_delta must be empty
assert!(
deltas.right.is_empty(),
"ProjectOperator expects right_delta to be empty"
);
std::mem::take(&mut deltas.left)
}
_ => unreachable!(
"ProjectOperator doesn't execute the state machine. Should be in Init state"
),
@@ -1098,11 +1138,17 @@ impl IncrementalOperator for ProjectOperator {
Ok(IOResult::Done(output_delta))
}
fn commit(&mut self, delta: Delta, _cursor: &mut BTreeCursor) -> Result<IOResult<Delta>> {
fn commit(&mut self, deltas: DeltaPair, _cursor: &mut BTreeCursor) -> Result<IOResult<Delta>> {
// Project operator only uses left delta, right must be empty
assert!(
deltas.right.is_empty(),
"ProjectOperator expects right delta to be empty in commit"
);
let mut output_delta = Delta::new();
// Commit the delta to our internal state and build output
for (row, weight) in &delta.changes {
for (row, weight) in &deltas.left.changes {
if let Some(tracker) = &self.tracker {
tracker.lock().unwrap().record_project();
}
@@ -1426,14 +1472,20 @@ impl AggregateOperator {
EvalState::Uninitialized => {
panic!("Cannot eval AggregateOperator with Uninitialized state");
}
EvalState::Init { delta } => {
if delta.changes.is_empty() {
EvalState::Init { deltas } => {
// Aggregate operators only use left_delta, right_delta must be empty
assert!(
deltas.right.is_empty(),
"AggregateOperator expects right_delta to be empty"
);
if deltas.left.changes.is_empty() {
*state = EvalState::Done;
return Ok(IOResult::Done((Delta::new(), HashMap::new())));
}
let mut groups_to_read = BTreeMap::new();
for (row, _weight) in &delta.changes {
for (row, _weight) in &deltas.left.changes {
// Extract group key using cloned fields
let group_key = self.extract_group_key(&row.values);
let group_key_str = Self::group_key_to_string(&group_key);
@@ -1589,7 +1641,13 @@ impl IncrementalOperator for AggregateOperator {
Ok(IOResult::Done(delta))
}
fn commit(&mut self, delta: Delta, cursor: &mut BTreeCursor) -> Result<IOResult<Delta>> {
fn commit(&mut self, deltas: DeltaPair, cursor: &mut BTreeCursor) -> Result<IOResult<Delta>> {
// Aggregate operator only uses left delta, right must be empty
assert!(
deltas.right.is_empty(),
"AggregateOperator expects right delta to be empty in commit"
);
let delta = deltas.left;
loop {
// Note: because we std::mem::replace here (without it, the borrow checker goes nuts,
// because we call self.eval_interval, which requires a mutable borrow), we have to
@@ -1854,7 +1912,7 @@ mod tests {
// Initialize with initial data
pager
.io
.block(|| agg.commit(initial_delta.clone(), &mut cursor))
.block(|| agg.commit((&initial_delta).into(), &mut cursor))
.unwrap();
// Verify initial state: SUM(age) = 25 + 30 + 35 = 90
@@ -1878,7 +1936,7 @@ mod tests {
// Process the incremental update
let output_delta = pager
.io
.block(|| agg.commit(update_delta.clone(), &mut cursor))
.block(|| agg.commit((&update_delta).into(), &mut cursor))
.unwrap();
// CRITICAL: The output delta should contain TWO changes:
@@ -1975,7 +2033,7 @@ mod tests {
// Initialize with initial data
pager
.io
.block(|| agg.commit(initial_delta.clone(), &mut cursor))
.block(|| agg.commit((&initial_delta).into(), &mut cursor))
.unwrap();
// Verify initial state: red team = 30, blue team = 15
@@ -2021,7 +2079,7 @@ mod tests {
// Process the incremental update
let output_delta = pager
.io
.block(|| agg.commit(update_delta.clone(), &mut cursor))
.block(|| agg.commit((&update_delta).into(), &mut cursor))
.unwrap();
// Should have 2 changes: retraction of old red team sum, insertion of new red team sum
@@ -2103,7 +2161,7 @@ mod tests {
}
pager
.io
.block(|| agg.commit(initial.clone(), &mut cursor))
.block(|| agg.commit((&initial).into(), &mut cursor))
.unwrap();
// Reset tracker for delta processing
@@ -2122,7 +2180,7 @@ mod tests {
pager
.io
.block(|| agg.commit(delta.clone(), &mut cursor))
.block(|| agg.commit((&delta).into(), &mut cursor))
.unwrap();
assert_eq!(tracker.lock().unwrap().aggregation_updates, 1);
@@ -2190,7 +2248,7 @@ mod tests {
);
pager
.io
.block(|| agg.commit(initial.clone(), &mut cursor))
.block(|| agg.commit((&initial).into(), &mut cursor))
.unwrap();
// Check initial state: Widget=250, Gadget=200
@@ -2219,7 +2277,7 @@ mod tests {
pager
.io
.block(|| agg.commit(delta.clone(), &mut cursor))
.block(|| agg.commit((&delta).into(), &mut cursor))
.unwrap();
assert_eq!(tracker.lock().unwrap().aggregation_updates, 1);
@@ -2271,7 +2329,7 @@ mod tests {
);
pager
.io
.block(|| agg.commit(initial.clone(), &mut cursor))
.block(|| agg.commit((&initial).into(), &mut cursor))
.unwrap();
// Check initial state
@@ -2306,7 +2364,7 @@ mod tests {
);
pager
.io
.block(|| agg.commit(delta.clone(), &mut cursor))
.block(|| agg.commit((&delta).into(), &mut cursor))
.unwrap();
// Check final state - user 1 should have updated count and sum
@@ -2366,7 +2424,7 @@ mod tests {
);
pager
.io
.block(|| agg.commit(initial.clone(), &mut cursor))
.block(|| agg.commit((&initial).into(), &mut cursor))
.unwrap();
// Check initial averages
@@ -2401,7 +2459,7 @@ mod tests {
);
pager
.io
.block(|| agg.commit(delta.clone(), &mut cursor))
.block(|| agg.commit((&delta).into(), &mut cursor))
.unwrap();
// Check final state - Category A avg should now be (10 + 20 + 30) / 3 = 20
@@ -2455,7 +2513,7 @@ mod tests {
);
pager
.io
.block(|| agg.commit(initial.clone(), &mut cursor))
.block(|| agg.commit((&initial).into(), &mut cursor))
.unwrap();
// Check initial state: count=2, sum=300
@@ -2478,7 +2536,7 @@ mod tests {
pager
.io
.block(|| agg.commit(delta.clone(), &mut cursor))
.block(|| agg.commit((&delta).into(), &mut cursor))
.unwrap();
// Check final state - should update to count=1, sum=200
@@ -2516,7 +2574,7 @@ mod tests {
init_data.insert(3, vec![Value::Text("B".into()), Value::Integer(30)]);
pager
.io
.block(|| agg.commit(init_data.clone(), &mut cursor))
.block(|| agg.commit((&init_data).into(), &mut cursor))
.unwrap();
// Check initial counts
@@ -2544,7 +2602,7 @@ mod tests {
let output = pager
.io
.block(|| agg.commit(delete_delta.clone(), &mut cursor))
.block(|| agg.commit((&delete_delta).into(), &mut cursor))
.unwrap();
// Should emit retraction for old count and insertion for new count
@@ -2565,7 +2623,7 @@ mod tests {
let output_b = pager
.io
.block(|| agg.commit(delete_all_b.clone(), &mut cursor))
.block(|| agg.commit((&delete_all_b).into(), &mut cursor))
.unwrap();
assert_eq!(output_b.changes.len(), 1); // Only retraction, no new row
assert_eq!(output_b.changes[0].1, -1); // Retraction
@@ -2601,7 +2659,7 @@ mod tests {
init_data.insert(4, vec![Value::Text("B".into()), Value::Integer(15)]);
pager
.io
.block(|| agg.commit(init_data.clone(), &mut cursor))
.block(|| agg.commit((&init_data).into(), &mut cursor))
.unwrap();
// Check initial sums
@@ -2626,7 +2684,7 @@ mod tests {
pager
.io
.block(|| agg.commit(delete_delta.clone(), &mut cursor))
.block(|| agg.commit((&delete_delta).into(), &mut cursor))
.unwrap();
// Check updated sum
@@ -2645,7 +2703,7 @@ mod tests {
pager
.io
.block(|| agg.commit(delete_all_b.clone(), &mut cursor))
.block(|| agg.commit((&delete_all_b).into(), &mut cursor))
.unwrap();
// Group B should be gone
@@ -2678,7 +2736,7 @@ mod tests {
init_data.insert(3, vec![Value::Text("A".into()), Value::Integer(30)]);
pager
.io
.block(|| agg.commit(init_data.clone(), &mut cursor))
.block(|| agg.commit((&init_data).into(), &mut cursor))
.unwrap();
// Check initial average
@@ -2692,7 +2750,7 @@ mod tests {
pager
.io
.block(|| agg.commit(delete_delta.clone(), &mut cursor))
.block(|| agg.commit((&delete_delta).into(), &mut cursor))
.unwrap();
// Check updated average
@@ -2705,7 +2763,7 @@ mod tests {
pager
.io
.block(|| agg.commit(delete_another.clone(), &mut cursor))
.block(|| agg.commit((&delete_another).into(), &mut cursor))
.unwrap();
let state = get_current_state_from_btree(&agg, &pager, &mut cursor);
@@ -2741,7 +2799,7 @@ mod tests {
init_data.insert(3, vec![Value::Text("B".into()), Value::Integer(50)]);
pager
.io
.block(|| agg.commit(init_data.clone(), &mut cursor))
.block(|| agg.commit((&init_data).into(), &mut cursor))
.unwrap();
// Check initial state
@@ -2762,7 +2820,7 @@ mod tests {
pager
.io
.block(|| agg.commit(delete_delta.clone(), &mut cursor))
.block(|| agg.commit((&delete_delta).into(), &mut cursor))
.unwrap();
// Check all aggregates updated correctly
@@ -2783,7 +2841,7 @@ mod tests {
pager
.io
.block(|| agg.commit(insert_delta.clone(), &mut cursor))
.block(|| agg.commit((&insert_delta).into(), &mut cursor))
.unwrap();
let state = get_current_state_from_btree(&agg, &pager, &mut cursor);
@@ -2820,7 +2878,7 @@ mod tests {
init_data.insert(3, vec![Value::Integer(3), Value::Integer(3)]);
let state = pager
.io
.block(|| filter.commit(init_data.clone(), &mut cursor))
.block(|| filter.commit((&init_data).into(), &mut cursor))
.unwrap();
// Check initial state
@@ -2839,7 +2897,7 @@ mod tests {
let output = pager
.io
.block(|| filter.commit(update_delta.clone(), &mut cursor))
.block(|| filter.commit((&update_delta).into(), &mut cursor))
.unwrap();
// The output delta should have both changes (both pass the filter b > 2)
@@ -2891,7 +2949,7 @@ mod tests {
);
let state = pager
.io
.block(|| filter.commit(init_data.clone(), &mut cursor))
.block(|| filter.commit((&init_data).into(), &mut cursor))
.unwrap();
// Verify initial state (only Alice passes filter)
@@ -2933,7 +2991,7 @@ mod tests {
// Now commit the changes
let state = pager
.io
.block(|| filter.commit(uncommitted.clone(), &mut cursor))
.block(|| filter.commit((&uncommitted).into(), &mut cursor))
.unwrap();
// State should now include Charlie (who passes filter)
@@ -2993,7 +3051,7 @@ mod tests {
);
pager
.io
.block(|| agg.commit(init_data.clone(), &mut cursor))
.block(|| agg.commit((&init_data).into(), &mut cursor))
.unwrap();
// Check initial state: A -> (count=2, sum=300), B -> (count=1, sum=150)
@@ -3067,7 +3125,7 @@ mod tests {
// Now commit the changes
pager
.io
.block(|| agg.commit(uncommitted.clone(), &mut cursor))
.block(|| agg.commit((&uncommitted).into(), &mut cursor))
.unwrap();
// State should now be updated
@@ -3131,7 +3189,7 @@ mod tests {
init_data.insert(2, vec![Value::Integer(2), Value::Integer(200)]);
pager
.io
.block(|| agg.commit(init_data.clone(), &mut cursor))
.block(|| agg.commit((&init_data).into(), &mut cursor))
.unwrap();
// Initial state: count=2, sum=300
@@ -3204,7 +3262,7 @@ mod tests {
init_data.insert(2, vec![Value::Integer(2), Value::Text("Y".into())]);
pager
.io
.block(|| agg.commit(init_data.clone(), &mut cursor))
.block(|| agg.commit((&init_data).into(), &mut cursor))
.unwrap();
// Create a committed delta (to be processed)
@@ -3282,7 +3340,7 @@ mod tests {
// Now commit only the committed_delta
pager
.io
.block(|| agg.commit(committed_delta.clone(), &mut cursor))
.block(|| agg.commit((&committed_delta).into(), &mut cursor))
.unwrap();
// State should now have X count=2, Y count=1