From e6008e532a977472ae0b5b989ac282c1d7be39d9 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Mon, 8 Sep 2025 09:09:13 -0500 Subject: [PATCH] Add a second delta to the EvalState, Commit We will assert that the second one is always empty for the existing operators - as they should be! But joins will need both. --- core/incremental/compiler.rs | 34 +++---- core/incremental/dbsp.rs | 34 +++++++ core/incremental/operator.rs | 176 +++++++++++++++++++++++------------ 3 files changed, 165 insertions(+), 79 deletions(-) diff --git a/core/incremental/compiler.rs b/core/incremental/compiler.rs index e15fc6dd2..135d03b6a 100644 --- a/core/incremental/compiler.rs +++ b/core/incremental/compiler.rs @@ -5,7 +5,7 @@ //! //! Based on the DBSP paper: "DBSP: Automatic Incremental View Maintenance for Rich Query Languages" -use crate::incremental::dbsp::Delta; +use crate::incremental::dbsp::{Delta, DeltaPair}; use crate::incremental::expr_compiler::CompiledExpression; use crate::incremental::operator::{ EvalState, FilterOperator, FilterPredicate, IncrementalOperator, InputOperator, ProjectOperator, @@ -231,13 +231,13 @@ impl DbspNode { }; let state = if commit_operators { - // Clone the delta from eval_state - don't extract it + // Clone the deltas from eval_state - don't extract them // in case we need to re-execute due to I/O - let delta = match eval_state { - EvalState::Init { delta } => delta.clone(), + let deltas = match eval_state { + EvalState::Init { deltas } => deltas.clone(), _ => panic!("commit can only be called when eval_state is in Init state"), }; - let result = return_if_io!(op.commit(delta, cursor)); + let result = return_if_io!(op.commit(deltas, cursor)); // After successful commit, move state to Done *eval_state = EvalState::Done; result @@ -518,7 +518,9 @@ impl DbspCircuit { // Input nodes get their delta directly from input_data let delta = input_data.get(name); *execute_state = ExecuteState::ProcessingNode { - eval_state: Box::new(EvalState::Init { delta }), + eval_state: Box::new(EvalState::Init { + deltas: delta.into(), + }), }; } _ => { @@ -552,22 +554,14 @@ impl DbspCircuit { input_deltas, } => { if *current_index >= input_states.len() { - // All inputs processed, check we have exactly one delta - // (Input nodes never reach here since they go straight to ProcessingNode) - let delta = if input_deltas.is_empty() { - return Err(LimboError::InternalError( - "execute() cannot be called without a Delta".to_string(), - )); - } else if input_deltas.len() > 1 { - return Err(LimboError::InternalError( - format!("Until joins are supported, only one delta is expected. Got {} deltas", input_deltas.len()), - )); - } else { - input_deltas[0].clone() - }; + // All inputs processed + let left_delta = input_deltas.first().cloned().unwrap_or_else(Delta::new); + let right_delta = input_deltas.get(1).cloned().unwrap_or_else(Delta::new); *execute_state = ExecuteState::ProcessingNode { - eval_state: Box::new(EvalState::Init { delta }), + eval_state: Box::new(EvalState::Init { + deltas: DeltaPair::new(left_delta, right_delta), + }), }; } else { // Get the (node_id, state) pair for the current index diff --git a/core/incremental/dbsp.rs b/core/incremental/dbsp.rs index 935a79146..363ac1142 100644 --- a/core/incremental/dbsp.rs +++ b/core/incremental/dbsp.rs @@ -167,6 +167,40 @@ impl Delta { } } +/// A pair of deltas for operators that process two inputs +#[derive(Debug, Clone)] +pub struct DeltaPair { + pub left: Delta, + pub right: Delta, +} + +impl DeltaPair { + /// Create a new delta pair + pub fn new(left: Delta, right: Delta) -> Self { + Self { left, right } + } +} + +impl From for DeltaPair { + /// Convert a single delta into a delta pair with empty right delta + fn from(delta: Delta) -> Self { + Self { + left: delta, + right: Delta::new(), + } + } +} + +impl From<&Delta> for DeltaPair { + /// Convert a delta reference into a delta pair with empty right delta + fn from(delta: &Delta) -> Self { + Self { + left: delta.clone(), + right: Delta::new(), + } + } +} + /// A simplified ZSet for incremental computation /// Each element has a weight: positive for additions, negative for deletions #[derive(Clone, Debug, Default)] diff --git a/core/incremental/operator.rs b/core/incremental/operator.rs index 09a212fdd..825290bef 100644 --- a/core/incremental/operator.rs +++ b/core/incremental/operator.rs @@ -3,7 +3,7 @@ // Based on Feldera DBSP design but adapted for Turso's architecture use crate::function::{AggFunc, Func}; -use crate::incremental::dbsp::{Delta, HashableRow}; +use crate::incremental::dbsp::{Delta, DeltaPair, HashableRow}; use crate::incremental::expr_compiler::CompiledExpression; use crate::incremental::persistence::{ReadRecord, WriteRow}; use crate::storage::btree::BTreeCursor; @@ -42,7 +42,7 @@ enum AggregateCommitState { pub enum EvalState { Uninitialized, Init { - delta: Delta, + deltas: DeltaPair, }, FetchData { delta: Delta, // Keep original delta for merge operation @@ -57,25 +57,35 @@ pub enum EvalState { impl From for EvalState { fn from(delta: Delta) -> Self { - EvalState::Init { delta } + EvalState::Init { + deltas: delta.into(), + } + } +} + +impl From for EvalState { + fn from(deltas: DeltaPair) -> Self { + EvalState::Init { deltas } } } impl EvalState { fn from_delta(delta: Delta) -> Self { - Self::Init { delta } + Self::Init { + deltas: delta.into(), + } } fn delta_ref(&self) -> &Delta { match self { - EvalState::Init { delta } => delta, + EvalState::Init { deltas } => &deltas.left, _ => panic!("delta_ref() can only be called when in Init state",), } } pub fn extract_delta(&mut self) -> Delta { match self { - EvalState::Init { delta } => { - let extracted = std::mem::take(delta); + EvalState::Init { deltas } => { + let extracted = std::mem::take(&mut deltas.left); *self = EvalState::Uninitialized; extracted } @@ -85,7 +95,7 @@ impl EvalState { fn advance(&mut self, groups_to_read: BTreeMap>) { let delta = match self { - EvalState::Init { delta } => std::mem::take(delta), + EvalState::Init { deltas } => std::mem::take(&mut deltas.left), _ => panic!("advance() can only be called when in Init state, current state: {self:?}"), }; @@ -507,11 +517,11 @@ pub trait IncrementalOperator: Debug { /// The output delta from the evaluation fn eval(&mut self, state: &mut EvalState, cursor: &mut BTreeCursor) -> Result>; - /// Commit a delta to the operator's internal state and return the output + /// Commit deltas to the operator's internal state and return the output /// This is called when a transaction commits, making changes permanent /// Returns the output delta (what downstream operators should see) /// The cursor parameter is for operators that need to persist state - fn commit(&mut self, delta: Delta, cursor: &mut BTreeCursor) -> Result>; + fn commit(&mut self, deltas: DeltaPair, cursor: &mut BTreeCursor) -> Result>; /// Set computation tracker fn set_tracker(&mut self, tracker: Arc>); @@ -541,8 +551,13 @@ impl IncrementalOperator for InputOperator { _cursor: &mut BTreeCursor, ) -> Result> { match state { - EvalState::Init { delta } => { - let output = std::mem::take(delta); + EvalState::Init { deltas } => { + // Input operators only use left_delta, right_delta must be empty + assert!( + deltas.right.is_empty(), + "InputOperator expects right_delta to be empty" + ); + let output = std::mem::take(&mut deltas.left); *state = EvalState::Done; Ok(IOResult::Done(output)) } @@ -552,9 +567,14 @@ impl IncrementalOperator for InputOperator { } } - fn commit(&mut self, delta: Delta, _cursor: &mut BTreeCursor) -> Result> { + fn commit(&mut self, deltas: DeltaPair, _cursor: &mut BTreeCursor) -> Result> { + // Input operator only uses left delta, right must be empty + assert!( + deltas.right.is_empty(), + "InputOperator expects right delta to be empty in commit" + ); // Input operator passes through the delta unchanged during commit - Ok(IOResult::Done(delta)) + Ok(IOResult::Done(deltas.left)) } fn set_tracker(&mut self, _tracker: Arc>) { @@ -680,7 +700,14 @@ impl IncrementalOperator for FilterOperator { _cursor: &mut BTreeCursor, ) -> Result> { let delta = match state { - EvalState::Init { delta } => std::mem::take(delta), + EvalState::Init { deltas } => { + // Filter operators only use left_delta, right_delta must be empty + assert!( + deltas.right.is_empty(), + "FilterOperator expects right_delta to be empty" + ); + std::mem::take(&mut deltas.left) + } _ => unreachable!( "FilterOperator doesn't execute the state machine. Should be in Init state" ), @@ -706,12 +733,18 @@ impl IncrementalOperator for FilterOperator { Ok(IOResult::Done(output_delta)) } - fn commit(&mut self, delta: Delta, _cursor: &mut BTreeCursor) -> Result> { + fn commit(&mut self, deltas: DeltaPair, _cursor: &mut BTreeCursor) -> Result> { + // Filter operator only uses left delta, right must be empty + assert!( + deltas.right.is_empty(), + "FilterOperator expects right delta to be empty in commit" + ); + let mut output_delta = Delta::new(); // Commit the delta to our internal state // Only pass through and track rows that satisfy the filter predicate - for (row, weight) in delta.changes { + for (row, weight) in deltas.left.changes { if let Some(tracker) = &self.tracker { tracker.lock().unwrap().record_filter(); } @@ -1076,7 +1109,14 @@ impl IncrementalOperator for ProjectOperator { _cursor: &mut BTreeCursor, ) -> Result> { let delta = match state { - EvalState::Init { delta } => std::mem::take(delta), + EvalState::Init { deltas } => { + // Project operators only use left_delta, right_delta must be empty + assert!( + deltas.right.is_empty(), + "ProjectOperator expects right_delta to be empty" + ); + std::mem::take(&mut deltas.left) + } _ => unreachable!( "ProjectOperator doesn't execute the state machine. Should be in Init state" ), @@ -1098,11 +1138,17 @@ impl IncrementalOperator for ProjectOperator { Ok(IOResult::Done(output_delta)) } - fn commit(&mut self, delta: Delta, _cursor: &mut BTreeCursor) -> Result> { + fn commit(&mut self, deltas: DeltaPair, _cursor: &mut BTreeCursor) -> Result> { + // Project operator only uses left delta, right must be empty + assert!( + deltas.right.is_empty(), + "ProjectOperator expects right delta to be empty in commit" + ); + let mut output_delta = Delta::new(); // Commit the delta to our internal state and build output - for (row, weight) in &delta.changes { + for (row, weight) in &deltas.left.changes { if let Some(tracker) = &self.tracker { tracker.lock().unwrap().record_project(); } @@ -1426,14 +1472,20 @@ impl AggregateOperator { EvalState::Uninitialized => { panic!("Cannot eval AggregateOperator with Uninitialized state"); } - EvalState::Init { delta } => { - if delta.changes.is_empty() { + EvalState::Init { deltas } => { + // Aggregate operators only use left_delta, right_delta must be empty + assert!( + deltas.right.is_empty(), + "AggregateOperator expects right_delta to be empty" + ); + + if deltas.left.changes.is_empty() { *state = EvalState::Done; return Ok(IOResult::Done((Delta::new(), HashMap::new()))); } let mut groups_to_read = BTreeMap::new(); - for (row, _weight) in &delta.changes { + for (row, _weight) in &deltas.left.changes { // Extract group key using cloned fields let group_key = self.extract_group_key(&row.values); let group_key_str = Self::group_key_to_string(&group_key); @@ -1589,7 +1641,13 @@ impl IncrementalOperator for AggregateOperator { Ok(IOResult::Done(delta)) } - fn commit(&mut self, delta: Delta, cursor: &mut BTreeCursor) -> Result> { + fn commit(&mut self, deltas: DeltaPair, cursor: &mut BTreeCursor) -> Result> { + // Aggregate operator only uses left delta, right must be empty + assert!( + deltas.right.is_empty(), + "AggregateOperator expects right delta to be empty in commit" + ); + let delta = deltas.left; loop { // Note: because we std::mem::replace here (without it, the borrow checker goes nuts, // because we call self.eval_interval, which requires a mutable borrow), we have to @@ -1854,7 +1912,7 @@ mod tests { // Initialize with initial data pager .io - .block(|| agg.commit(initial_delta.clone(), &mut cursor)) + .block(|| agg.commit((&initial_delta).into(), &mut cursor)) .unwrap(); // Verify initial state: SUM(age) = 25 + 30 + 35 = 90 @@ -1878,7 +1936,7 @@ mod tests { // Process the incremental update let output_delta = pager .io - .block(|| agg.commit(update_delta.clone(), &mut cursor)) + .block(|| agg.commit((&update_delta).into(), &mut cursor)) .unwrap(); // CRITICAL: The output delta should contain TWO changes: @@ -1975,7 +2033,7 @@ mod tests { // Initialize with initial data pager .io - .block(|| agg.commit(initial_delta.clone(), &mut cursor)) + .block(|| agg.commit((&initial_delta).into(), &mut cursor)) .unwrap(); // Verify initial state: red team = 30, blue team = 15 @@ -2021,7 +2079,7 @@ mod tests { // Process the incremental update let output_delta = pager .io - .block(|| agg.commit(update_delta.clone(), &mut cursor)) + .block(|| agg.commit((&update_delta).into(), &mut cursor)) .unwrap(); // Should have 2 changes: retraction of old red team sum, insertion of new red team sum @@ -2103,7 +2161,7 @@ mod tests { } pager .io - .block(|| agg.commit(initial.clone(), &mut cursor)) + .block(|| agg.commit((&initial).into(), &mut cursor)) .unwrap(); // Reset tracker for delta processing @@ -2122,7 +2180,7 @@ mod tests { pager .io - .block(|| agg.commit(delta.clone(), &mut cursor)) + .block(|| agg.commit((&delta).into(), &mut cursor)) .unwrap(); assert_eq!(tracker.lock().unwrap().aggregation_updates, 1); @@ -2190,7 +2248,7 @@ mod tests { ); pager .io - .block(|| agg.commit(initial.clone(), &mut cursor)) + .block(|| agg.commit((&initial).into(), &mut cursor)) .unwrap(); // Check initial state: Widget=250, Gadget=200 @@ -2219,7 +2277,7 @@ mod tests { pager .io - .block(|| agg.commit(delta.clone(), &mut cursor)) + .block(|| agg.commit((&delta).into(), &mut cursor)) .unwrap(); assert_eq!(tracker.lock().unwrap().aggregation_updates, 1); @@ -2271,7 +2329,7 @@ mod tests { ); pager .io - .block(|| agg.commit(initial.clone(), &mut cursor)) + .block(|| agg.commit((&initial).into(), &mut cursor)) .unwrap(); // Check initial state @@ -2306,7 +2364,7 @@ mod tests { ); pager .io - .block(|| agg.commit(delta.clone(), &mut cursor)) + .block(|| agg.commit((&delta).into(), &mut cursor)) .unwrap(); // Check final state - user 1 should have updated count and sum @@ -2366,7 +2424,7 @@ mod tests { ); pager .io - .block(|| agg.commit(initial.clone(), &mut cursor)) + .block(|| agg.commit((&initial).into(), &mut cursor)) .unwrap(); // Check initial averages @@ -2401,7 +2459,7 @@ mod tests { ); pager .io - .block(|| agg.commit(delta.clone(), &mut cursor)) + .block(|| agg.commit((&delta).into(), &mut cursor)) .unwrap(); // Check final state - Category A avg should now be (10 + 20 + 30) / 3 = 20 @@ -2455,7 +2513,7 @@ mod tests { ); pager .io - .block(|| agg.commit(initial.clone(), &mut cursor)) + .block(|| agg.commit((&initial).into(), &mut cursor)) .unwrap(); // Check initial state: count=2, sum=300 @@ -2478,7 +2536,7 @@ mod tests { pager .io - .block(|| agg.commit(delta.clone(), &mut cursor)) + .block(|| agg.commit((&delta).into(), &mut cursor)) .unwrap(); // Check final state - should update to count=1, sum=200 @@ -2516,7 +2574,7 @@ mod tests { init_data.insert(3, vec![Value::Text("B".into()), Value::Integer(30)]); pager .io - .block(|| agg.commit(init_data.clone(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursor)) .unwrap(); // Check initial counts @@ -2544,7 +2602,7 @@ mod tests { let output = pager .io - .block(|| agg.commit(delete_delta.clone(), &mut cursor)) + .block(|| agg.commit((&delete_delta).into(), &mut cursor)) .unwrap(); // Should emit retraction for old count and insertion for new count @@ -2565,7 +2623,7 @@ mod tests { let output_b = pager .io - .block(|| agg.commit(delete_all_b.clone(), &mut cursor)) + .block(|| agg.commit((&delete_all_b).into(), &mut cursor)) .unwrap(); assert_eq!(output_b.changes.len(), 1); // Only retraction, no new row assert_eq!(output_b.changes[0].1, -1); // Retraction @@ -2601,7 +2659,7 @@ mod tests { init_data.insert(4, vec![Value::Text("B".into()), Value::Integer(15)]); pager .io - .block(|| agg.commit(init_data.clone(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursor)) .unwrap(); // Check initial sums @@ -2626,7 +2684,7 @@ mod tests { pager .io - .block(|| agg.commit(delete_delta.clone(), &mut cursor)) + .block(|| agg.commit((&delete_delta).into(), &mut cursor)) .unwrap(); // Check updated sum @@ -2645,7 +2703,7 @@ mod tests { pager .io - .block(|| agg.commit(delete_all_b.clone(), &mut cursor)) + .block(|| agg.commit((&delete_all_b).into(), &mut cursor)) .unwrap(); // Group B should be gone @@ -2678,7 +2736,7 @@ mod tests { init_data.insert(3, vec![Value::Text("A".into()), Value::Integer(30)]); pager .io - .block(|| agg.commit(init_data.clone(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursor)) .unwrap(); // Check initial average @@ -2692,7 +2750,7 @@ mod tests { pager .io - .block(|| agg.commit(delete_delta.clone(), &mut cursor)) + .block(|| agg.commit((&delete_delta).into(), &mut cursor)) .unwrap(); // Check updated average @@ -2705,7 +2763,7 @@ mod tests { pager .io - .block(|| agg.commit(delete_another.clone(), &mut cursor)) + .block(|| agg.commit((&delete_another).into(), &mut cursor)) .unwrap(); let state = get_current_state_from_btree(&agg, &pager, &mut cursor); @@ -2741,7 +2799,7 @@ mod tests { init_data.insert(3, vec![Value::Text("B".into()), Value::Integer(50)]); pager .io - .block(|| agg.commit(init_data.clone(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursor)) .unwrap(); // Check initial state @@ -2762,7 +2820,7 @@ mod tests { pager .io - .block(|| agg.commit(delete_delta.clone(), &mut cursor)) + .block(|| agg.commit((&delete_delta).into(), &mut cursor)) .unwrap(); // Check all aggregates updated correctly @@ -2783,7 +2841,7 @@ mod tests { pager .io - .block(|| agg.commit(insert_delta.clone(), &mut cursor)) + .block(|| agg.commit((&insert_delta).into(), &mut cursor)) .unwrap(); let state = get_current_state_from_btree(&agg, &pager, &mut cursor); @@ -2820,7 +2878,7 @@ mod tests { init_data.insert(3, vec![Value::Integer(3), Value::Integer(3)]); let state = pager .io - .block(|| filter.commit(init_data.clone(), &mut cursor)) + .block(|| filter.commit((&init_data).into(), &mut cursor)) .unwrap(); // Check initial state @@ -2839,7 +2897,7 @@ mod tests { let output = pager .io - .block(|| filter.commit(update_delta.clone(), &mut cursor)) + .block(|| filter.commit((&update_delta).into(), &mut cursor)) .unwrap(); // The output delta should have both changes (both pass the filter b > 2) @@ -2891,7 +2949,7 @@ mod tests { ); let state = pager .io - .block(|| filter.commit(init_data.clone(), &mut cursor)) + .block(|| filter.commit((&init_data).into(), &mut cursor)) .unwrap(); // Verify initial state (only Alice passes filter) @@ -2933,7 +2991,7 @@ mod tests { // Now commit the changes let state = pager .io - .block(|| filter.commit(uncommitted.clone(), &mut cursor)) + .block(|| filter.commit((&uncommitted).into(), &mut cursor)) .unwrap(); // State should now include Charlie (who passes filter) @@ -2993,7 +3051,7 @@ mod tests { ); pager .io - .block(|| agg.commit(init_data.clone(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursor)) .unwrap(); // Check initial state: A -> (count=2, sum=300), B -> (count=1, sum=150) @@ -3067,7 +3125,7 @@ mod tests { // Now commit the changes pager .io - .block(|| agg.commit(uncommitted.clone(), &mut cursor)) + .block(|| agg.commit((&uncommitted).into(), &mut cursor)) .unwrap(); // State should now be updated @@ -3131,7 +3189,7 @@ mod tests { init_data.insert(2, vec![Value::Integer(2), Value::Integer(200)]); pager .io - .block(|| agg.commit(init_data.clone(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursor)) .unwrap(); // Initial state: count=2, sum=300 @@ -3204,7 +3262,7 @@ mod tests { init_data.insert(2, vec![Value::Integer(2), Value::Text("Y".into())]); pager .io - .block(|| agg.commit(init_data.clone(), &mut cursor)) + .block(|| agg.commit((&init_data).into(), &mut cursor)) .unwrap(); // Create a committed delta (to be processed) @@ -3282,7 +3340,7 @@ mod tests { // Now commit only the committed_delta pager .io - .block(|| agg.commit(committed_delta.clone(), &mut cursor)) + .block(|| agg.commit((&committed_delta).into(), &mut cursor)) .unwrap(); // State should now have X count=2, Y count=1