diff --git a/core/incremental/dbsp.rs b/core/incremental/dbsp.rs
index 363ac1142..d4862b70a 100644
--- a/core/incremental/dbsp.rs
+++ b/core/incremental/dbsp.rs
@@ -75,6 +75,10 @@ impl HashableRow {
         hasher.finish()
     }
+
+    pub fn cached_hash(&self) -> u64 {
+        self.cached_hash
+    }
 }
 
 impl Hash for HashableRow {
@@ -168,7 +172,7 @@ impl Delta {
 }
 
 /// A pair of deltas for operators that process two inputs
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
 pub struct DeltaPair {
     pub left: Delta,
     pub right: Delta,
diff --git a/core/incremental/operator.rs b/core/incremental/operator.rs
index 7c402db93..78d2ca60d 100644
--- a/core/incremental/operator.rs
+++ b/core/incremental/operator.rs
@@ -119,14 +119,9 @@ enum AggregateCommitState {
     Invalid,
 }
 
-// eval() has uncommitted data, so it can't be a member attribute of the Operator.
-// The state has to be kept by the caller
+// Aggregate-specific eval states
 #[derive(Debug)]
-pub enum EvalState {
-    Uninitialized,
-    Init {
-        deltas: DeltaPair,
-    },
+pub enum AggregateEvalState {
     FetchKey {
         delta: Delta, // Keep original delta for merge operation
         current_idx: usize,
@@ -149,6 +144,299 @@ pub enum EvalState {
         old_values: HashMap<String, Vec<Value>>,
        recompute_state: Box<RecomputeMinMaxState>,
     },
+    Done {
+        output: (Delta, ComputedStates),
+    },
+}
+
+// Helper function to read the next row from the BTree for joins
+fn read_next_join_row(
+    storage_id: i64,
+    join_key: &HashableRow,
+    last_element_id: i64,
+    cursors: &mut DbspStateCursors,
+) -> Result<IOResult<Option<(i64, HashableRow, isize)>>> {
+    // Build the index key: (storage_id, zset_id, element_id)
+    // zset_id is the hash of the join key
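+    //
+    // A note on the scan protocol (illustrative, not load-bearing): each stored
+    // row lives under the composite key (storage_id, zset_id, element_id), so
+    // resuming a scan is just a strictly-greater seek on the last element_id.
+    // For example, with stored element_ids [10, 17, 42] under one
+    // (storage_id, zset_id) prefix, callers pass last_element_id = i64::MIN to
+    // get 10, then 10 to get 17, then 17 to get 42; a final call returns None
+    // because the GT seek leaves the prefix.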
+    let zset_id = join_key.cached_hash() as i64;
+
+    let index_key_values = vec![
+        Value::Integer(storage_id),
+        Value::Integer(zset_id),
+        Value::Integer(last_element_id),
+    ];
+
+    let index_record = ImmutableRecord::from_values(&index_key_values, index_key_values.len());
+    let seek_result = return_if_io!(cursors
+        .index_cursor
+        .seek(SeekKey::IndexKey(&index_record), SeekOp::GT));
+
+    if !matches!(seek_result, SeekResult::Found) {
+        return Ok(IOResult::Done(None));
+    }
+
+    // Check if we're still in the same (storage_id, zset_id) range
+    let current_record = return_if_io!(cursors.index_cursor.record());
+
+    // Extract all needed values from the record before dropping it
+    let (found_storage_id, found_zset_id, element_id) = if let Some(rec) = current_record {
+        let values = rec.get_values();
+
+        // Index has 4 values: storage_id, zset_id, element_id, rowid (appended by WriteRow)
+        if values.len() >= 3 {
+            let found_storage_id = match &values[0].to_owned() {
+                Value::Integer(id) => *id,
+                _ => return Ok(IOResult::Done(None)),
+            };
+            let found_zset_id = match &values[1].to_owned() {
+                Value::Integer(id) => *id,
+                _ => return Ok(IOResult::Done(None)),
+            };
+            let element_id = match &values[2].to_owned() {
+                Value::Integer(id) => *id,
+                _ => return Ok(IOResult::Done(None)),
+            };
+            (found_storage_id, found_zset_id, element_id)
+        } else {
+            return Ok(IOResult::Done(None));
+        }
+    } else {
+        return Ok(IOResult::Done(None));
+    };
+
+    // Now we can safely check if we're in the right range
+    // If we've moved to a different storage_id or zset_id, we're done
+    if found_storage_id != storage_id || found_zset_id != zset_id {
+        return Ok(IOResult::Done(None));
+    }
+
+    // Now get the actual row from the table using the rowid from the index
+    let rowid = return_if_io!(cursors.index_cursor.rowid());
+    if let Some(rowid) = rowid {
+        return_if_io!(cursors
+            .table_cursor
+            .seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true }));
+
+        let table_record = return_if_io!(cursors.table_cursor.record());
+        if let Some(rec) = table_record {
+            let table_values = rec.get_values();
+            // Table format: [storage_id, zset_id, element_id, value_blob, weight]
+            if table_values.len() >= 5 {
+                // Deserialize the row from the blob
+                let value_at_3 = table_values[3].to_owned();
+                let blob = match value_at_3 {
+                    Value::Blob(ref b) => b,
+                    _ => return Ok(IOResult::Done(None)),
+                };
+
+                // The blob contains the serialized HashableRow
+                // For now, let's deserialize it simply
+                let row = deserialize_hashable_row(blob)?;
+
+                let weight = match &table_values[4].to_owned() {
+                    Value::Integer(w) => *w as isize,
+                    _ => return Ok(IOResult::Done(None)),
+                };
+
+                return Ok(IOResult::Done(Some((element_id, row, weight))));
+            }
+        }
+    }
+    Ok(IOResult::Done(None))
+}
+
+// Join-specific eval states
+#[derive(Debug)]
+pub enum JoinEvalState {
+    ProcessDeltaJoin {
+        deltas: DeltaPair,
+        output: Delta,
+    },
+    ProcessLeftJoin {
+        deltas: DeltaPair,
+        output: Delta,
+        current_idx: usize,
+        last_row_scanned: i64,
+    },
+    ProcessRightJoin {
+        deltas: DeltaPair,
+        output: Delta,
+        current_idx: usize,
+        last_row_scanned: i64,
+    },
+    Done {
+        output: Delta,
+    },
+}
+
+impl JoinEvalState {
+    fn combine_rows(
+        left_row: &HashableRow,
+        left_weight: i64,
+        right_row: &HashableRow,
+        right_weight: i64,
+        output: &mut Delta,
+    ) {
+        // Combine the rows
+        let mut combined_values = left_row.values.clone();
+        combined_values.extend(right_row.values.clone());
+        // Use hash of the combined values as rowid to ensure uniqueness
+        let temp_row = HashableRow::new(0, combined_values.clone());
+        let joined_rowid = temp_row.cached_hash() as i64;
+        let joined_row = HashableRow::new(joined_rowid, combined_values);
+
+        // Add to output with combined weight
+        let combined_weight = left_weight * right_weight;
+        output.changes.push((joined_row, combined_weight as isize));
+    }
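+
+    // Weight bookkeeping sketch (illustrative): Z-set weights multiply across
+    // a join, so a left row with weight 2 (two duplicates) matching a right
+    // row with weight 3 yields the joined row with weight 6, and a retraction
+    // such as (-1) * 1 = -1 removes exactly one previously emitted join result.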
+
+    fn process_join_state(
+        &mut self,
+        cursors: &mut DbspStateCursors,
+        left_key_indices: &[usize],
+        right_key_indices: &[usize],
+        left_storage_id: i64,
+        right_storage_id: i64,
+    ) -> Result<IOResult<Delta>> {
+        loop {
+            match self {
+                JoinEvalState::ProcessDeltaJoin { deltas, output } => {
+                    // Move to ProcessLeftJoin
+                    *self = JoinEvalState::ProcessLeftJoin {
+                        deltas: std::mem::take(deltas),
+                        output: std::mem::take(output),
+                        current_idx: 0,
+                        last_row_scanned: i64::MIN,
+                    };
+                }
+                JoinEvalState::ProcessLeftJoin {
+                    deltas,
+                    output,
+                    current_idx,
+                    last_row_scanned,
+                } => {
+                    if *current_idx >= deltas.left.changes.len() {
+                        *self = JoinEvalState::ProcessRightJoin {
+                            deltas: std::mem::take(deltas),
+                            output: std::mem::take(output),
+                            current_idx: 0,
+                            last_row_scanned: i64::MIN,
+                        };
+                    } else {
+                        let (left_row, left_weight) = &deltas.left.changes[*current_idx];
+                        // Extract join key using provided indices
+                        let key_values: Vec<Value> = left_key_indices
+                            .iter()
+                            .map(|&idx| left_row.values.get(idx).cloned().unwrap_or(Value::Null))
+                            .collect();
+                        let left_key = HashableRow::new(0, key_values);
+
+                        let next_row = return_if_io!(read_next_join_row(
+                            right_storage_id,
+                            &left_key,
+                            *last_row_scanned,
+                            cursors
+                        ));
+                        match next_row {
+                            Some((element_id, right_row, right_weight)) => {
+                                Self::combine_rows(
+                                    left_row,
+                                    (*left_weight) as i64,
+                                    &right_row,
+                                    right_weight as i64,
+                                    output,
+                                );
+                                // Continue scanning with this left row
+                                *self = JoinEvalState::ProcessLeftJoin {
+                                    deltas: std::mem::take(deltas),
+                                    output: std::mem::take(output),
+                                    current_idx: *current_idx,
+                                    last_row_scanned: element_id,
+                                };
+                            }
+                            None => {
+                                // No more matches for this left row, move to next
+                                *self = JoinEvalState::ProcessLeftJoin {
+                                    deltas: std::mem::take(deltas),
+                                    output: std::mem::take(output),
+                                    current_idx: *current_idx + 1,
+                                    last_row_scanned: i64::MIN,
+                                };
+                            }
+                        }
+                    }
+                }
+                JoinEvalState::ProcessRightJoin {
+                    deltas,
+                    output,
+                    current_idx,
+                    last_row_scanned,
+                } => {
+                    if *current_idx >= deltas.right.changes.len() {
+                        *self = JoinEvalState::Done {
+                            output: std::mem::take(output),
+                        };
+                    } else {
+                        let (right_row, right_weight) = &deltas.right.changes[*current_idx];
+                        // Extract join key using provided indices
+                        let key_values: Vec<Value> = right_key_indices
+                            .iter()
+                            .map(|&idx| right_row.values.get(idx).cloned().unwrap_or(Value::Null))
+                            .collect();
+                        let right_key = HashableRow::new(0, key_values);
+
+                        let next_row = return_if_io!(read_next_join_row(
+                            left_storage_id,
+                            &right_key,
+                            *last_row_scanned,
+                            cursors
+                        ));
+                        match next_row {
+                            Some((element_id, left_row, left_weight)) => {
+                                Self::combine_rows(
+                                    &left_row,
+                                    left_weight as i64,
+                                    right_row,
+                                    (*right_weight) as i64,
+                                    output,
+                                );
+                                // Continue scanning with this right row
+                                *self = JoinEvalState::ProcessRightJoin {
+                                    deltas: std::mem::take(deltas),
+                                    output: std::mem::take(output),
+                                    current_idx: *current_idx,
+                                    last_row_scanned: element_id,
+                                };
+                            }
+                            None => {
+                                // No more matches for this right row, move to next
+                                *self = JoinEvalState::ProcessRightJoin {
+                                    deltas: std::mem::take(deltas),
+                                    output: std::mem::take(output),
+                                    current_idx: *current_idx + 1,
+                                    last_row_scanned: i64::MIN,
+                                };
+                            }
+                        }
+                    }
+                }
+                JoinEvalState::Done { output } => {
+                    return Ok(IOResult::Done(std::mem::take(output)));
+                }
+            }
+        }
+    }
+}
+
+// Generic eval state that delegates to operator-specific states
+#[derive(Debug)]
+pub enum EvalState {
+    Uninitialized,
+    Init { deltas: DeltaPair },
+    Aggregate(Box<AggregateEvalState>),
+    Join(Box<JoinEvalState>),
     Done,
 }
 
@@ -190,23 +478,26 @@ impl EvalState {
         }
     }
 
-    fn advance(&mut self, groups_to_read: BTreeMap<String, Vec<Value>>) {
+    fn advance_aggregate(&mut self, groups_to_read: BTreeMap<String, Vec<Value>>) {
         let delta = match self {
             EvalState::Init { deltas } => std::mem::take(&mut deltas.left),
-            _ => panic!("advance() can only be called when in Init state, current state: {self:?}"),
+            _ => panic!("advance_aggregate() can only be called when in Init state, current state: {self:?}"),
         };
 
         let _ = std::mem::replace(
             self,
-            EvalState::FetchKey {
+            EvalState::Aggregate(Box::new(AggregateEvalState::FetchKey {
                 delta,
                 current_idx: 0,
                 groups_to_read: groups_to_read.into_iter().collect(), // Convert BTreeMap to Vec
                 existing_groups: HashMap::new(),
                 old_values: HashMap::new(),
-            },
+            })),
         );
     }
+}
+
+impl AggregateEvalState {
     fn process_delta(
         &mut self,
         operator: &mut AggregateOperator,
@@ -214,13 +505,7 @@ impl EvalState {
     ) -> Result<IOResult<(Delta, ComputedStates)>> {
         loop {
             match self {
-                EvalState::Uninitialized => {
-                    panic!("Cannot process_delta with Uninitialized state");
-                }
-                EvalState::Init { .. } => {
-                    panic!("State machine not supposed to reach the init state! advance() should have been called");
-                }
-                EvalState::FetchKey {
+                AggregateEvalState::FetchKey {
                     delta,
                     current_idx,
                     groups_to_read,
@@ -238,7 +523,7 @@ impl EvalState {
                         operator,
                     ));
 
-                        *self = EvalState::RecomputeMinMax {
+                        *self = AggregateEvalState::RecomputeMinMax {
                             delta: std::mem::take(delta),
                             existing_groups: std::mem::take(existing_groups),
                             old_values: std::mem::take(old_values),
@@ -284,7 +569,7 @@ impl EvalState {
                     // Always transition to FetchData
                     let taken_existing = std::mem::take(existing_groups);
                     let taken_old_values = std::mem::take(old_values);
-                    let next_state = EvalState::FetchData {
+                    let next_state = AggregateEvalState::FetchData {
                         delta: std::mem::take(delta),
                         current_idx: *current_idx,
                         groups_to_read: std::mem::take(groups_to_read),
@@ -296,7 +581,7 @@ impl EvalState {
                         *self = next_state;
                     }
                 }
-                EvalState::FetchData {
+                AggregateEvalState::FetchData {
                     delta,
                     current_idx,
                     groups_to_read,
@@ -332,7 +617,7 @@ impl EvalState {
                     let next_idx = *current_idx + 1;
                     let taken_existing = std::mem::take(existing_groups);
                     let taken_old_values = std::mem::take(old_values);
-                    let next_state = EvalState::FetchKey {
+                    let next_state = AggregateEvalState::FetchKey {
                         delta: std::mem::take(delta),
                         current_idx: next_idx,
                         groups_to_read: std::mem::take(groups_to_read),
@@ -341,7 +626,7 @@ impl EvalState {
                     };
                     *self = next_state;
                 }
-                EvalState::RecomputeMinMax {
+                AggregateEvalState::RecomputeMinMax {
                     delta,
                     existing_groups,
                     old_values,
@@ -356,11 +641,12 @@ impl EvalState {
                     let (output_delta, computed_states) =
                         operator.merge_delta_with_existing(delta, existing_groups, old_values);
 
-                    *self = EvalState::Done;
-                    return Ok(IOResult::Done((output_delta, computed_states)));
+                    *self = AggregateEvalState::Done {
+                        output: (output_delta, computed_states),
+                    };
                 }
-                EvalState::Done => {
-                    return Ok(IOResult::Done((Delta::new(), HashMap::new())));
+                AggregateEvalState::Done { output } => {
+                    return Ok(IOResult::Done(output.clone()));
                 }
             }
         }
@@ -646,6 +932,8 @@ pub enum JoinType {
     Inner,
     Left,
     Right,
+    Full,
+    Cross,
 }
 
 #[derive(Debug, Clone, PartialEq)]
@@ -1877,21 +2165,27 @@ impl AggregateOperator {
                     let group_key_str = Self::group_key_to_string(&group_key);
                     groups_to_read.insert(group_key_str, group_key);
                 }
-                state.advance(groups_to_read);
+                state.advance_aggregate(groups_to_read);
             }
-            EvalState::FetchKey { .. }
-            | EvalState::FetchData { .. }
-            | EvalState::RecomputeMinMax { .. } => {
-                // Already in progress, continue processing on process_delta below.
-            }
+            EvalState::Aggregate(_agg_state) => {
+                // Already in progress, continue processing below.
+            }
             EvalState::Done => {
                 panic!("unreachable state! should have returned");
            }
+            EvalState::Join(_) => {
+                panic!("Join state should not appear in aggregate operator");
+            }
         }
 
-        // Process the delta through the state machine
-        let result = return_if_io!(state.process_delta(self, cursors));
-        Ok(IOResult::Done(result))
+        // Process the delta through the aggregate state machine
+        match state {
+            EvalState::Aggregate(agg_state) => {
+                let result = return_if_io!(agg_state.process_delta(self, cursors));
+                Ok(IOResult::Done(result))
+            }
+            _ => panic!("Invalid state for aggregate processing"),
+        }
     }
 
     fn merge_delta_with_existing(
@@ -2228,6 +2522,493 @@ impl IncrementalOperator for AggregateOperator {
     }
 }
 
+#[derive(Debug)]
+enum JoinCommitState {
+    Idle,
+    Eval {
+        eval_state: EvalState,
+    },
+    CommitLeftDelta {
+        deltas: DeltaPair,
+        output: Delta,
+        current_idx: usize,
+        write_row: WriteRow,
+    },
+    CommitRightDelta {
+        deltas: DeltaPair,
+        output: Delta,
+        current_idx: usize,
+        write_row: WriteRow,
+    },
+    Invalid,
+}
+
+/// Join operator - performs incremental join between two relations
+/// Implements the DBSP formula: δ(R ⋈ S) = (δR ⋈ S) ∪ (R ⋈ δS) ∪ (δR ⋈ δS)
+#[derive(Debug)]
+pub struct JoinOperator {
+    /// Unique operator ID for indexing in persistent storage
+    operator_id: usize,
+    /// Type of join to perform
+    join_type: JoinType,
+    /// Column indices for extracting join keys from left input
+    left_key_indices: Vec<usize>,
+    /// Column indices for extracting join keys from right input
+    right_key_indices: Vec<usize>,
+    /// Column names from left input
+    left_columns: Vec<String>,
+    /// Column names from right input
+    right_columns: Vec<String>,
+    /// Tracker for computation statistics
+    tracker: Option<Arc<Mutex<ComputationTracker>>>,
+
+    commit_state: JoinCommitState,
+}
+
+impl JoinOperator {
+    pub fn new(
+        operator_id: usize,
+        join_type: JoinType,
+        left_key_indices: Vec<usize>,
+        right_key_indices: Vec<usize>,
+        left_columns: Vec<String>,
+        right_columns: Vec<String>,
+    ) -> Result<Self> {
+        // Check for unsupported join types
+        match join_type {
+            JoinType::Left => {
+                return Err(crate::LimboError::ParseError(
+                    "LEFT OUTER JOIN is not yet supported in incremental views".to_string(),
+                ))
+            }
+            JoinType::Right => {
+                return Err(crate::LimboError::ParseError(
+                    "RIGHT OUTER JOIN is not yet supported in incremental views".to_string(),
+                ))
+            }
+            JoinType::Full => {
+                return Err(crate::LimboError::ParseError(
+                    "FULL OUTER JOIN is not yet supported in incremental views".to_string(),
+                ))
+            }
+            JoinType::Cross => {
+                return Err(crate::LimboError::ParseError(
+                    "CROSS JOIN is not yet supported in incremental views".to_string(),
+                ))
+            }
+            JoinType::Inner => {} // Inner join is supported
+        }
+
+        Ok(Self {
+            operator_id,
+            join_type,
+            left_key_indices,
+            right_key_indices,
+            left_columns,
+            right_columns,
+            tracker: None,
+            commit_state: JoinCommitState::Idle,
+        })
+    }
+
+    /// Extract join key from row values using the specified indices
+    fn extract_join_key(&self, values: &[Value], indices: &[usize]) -> HashableRow {
+        let key_values: Vec<Value> = indices
+            .iter()
+            .map(|&idx| values.get(idx).cloned().unwrap_or(Value::Null))
+            .collect();
+        // Use 0 as a dummy rowid for join keys. They don't come from a table,
+        // so they don't need a rowid. Their key will be the hash of the row values.
+        HashableRow::new(0, key_values)
+    }
+
+    /// Generate storage ID for left table
+    fn left_storage_id(&self) -> i64 {
+        // Use column_index=0 for left side
+        generate_storage_id(self.operator_id, 0, 0)
+    }
+
+    /// Generate storage ID for right table
+    fn right_storage_id(&self) -> i64 {
+        // Use column_index=1 for right side
+        generate_storage_id(self.operator_id, 1, 0)
+    }
+
+    /// SQL-compliant comparison for join keys
+    /// Returns true if keys match according to SQL semantics (NULL != NULL)
+    fn sql_keys_equal(left_key: &HashableRow, right_key: &HashableRow) -> bool {
+        if left_key.values.len() != right_key.values.len() {
+            return false;
+        }
+
+        for (left_val, right_val) in left_key.values.iter().zip(right_key.values.iter()) {
+            // In SQL, NULL never equals NULL
+            if matches!(left_val, Value::Null) || matches!(right_val, Value::Null) {
+                return false;
+            }
+
+            // For non-NULL values, use regular comparison
+            if left_val != right_val {
+                return false;
+            }
+        }
+
+        true
+    }
+
+    fn process_join_state(
+        &mut self,
+        state: &mut EvalState,
+        cursors: &mut DbspStateCursors,
+    ) -> Result<IOResult<Delta>> {
+        // Get the join state out of the enum
+        match state {
+            EvalState::Join(js) => js.process_join_state(
+                cursors,
+                &self.left_key_indices,
+                &self.right_key_indices,
+                self.left_storage_id(),
+                self.right_storage_id(),
+            ),
+            _ => panic!("process_join_state called with non-join state"),
+        }
+    }
+
+    fn eval_internal(
+        &mut self,
+        state: &mut EvalState,
+        cursors: &mut DbspStateCursors,
+    ) -> Result<IOResult<Delta>> {
+        loop {
+            let loop_state = std::mem::replace(state, EvalState::Uninitialized);
+            match loop_state {
+                EvalState::Uninitialized => {
+                    panic!("Cannot eval JoinOperator with Uninitialized state");
+                }
+                EvalState::Init { deltas } => {
+                    let mut output = Delta::new();
+
+                    // Component 3: δR ⋈ δS (left delta join right delta)
+                    for (left_row, left_weight) in &deltas.left.changes {
+                        let left_key =
+                            self.extract_join_key(&left_row.values, &self.left_key_indices);
+
+                        for (right_row, right_weight) in &deltas.right.changes {
+                            let right_key =
+                                self.extract_join_key(&right_row.values, &self.right_key_indices);
+
+                            if Self::sql_keys_equal(&left_key, &right_key) {
+                                if let Some(tracker) = &self.tracker {
+                                    tracker.lock().unwrap().record_join_lookup();
+                                }
+
+                                // Combine the rows
+                                let mut combined_values = left_row.values.clone();
+                                combined_values.extend(right_row.values.clone());
+
+                                // Create the joined row with a unique rowid
+                                // Use hash of the combined values to ensure uniqueness
+                                let temp_row = HashableRow::new(0, combined_values.clone());
+                                let joined_rowid = temp_row.cached_hash() as i64;
+                                let joined_row =
+                                    HashableRow::new(joined_rowid, combined_values.clone());
+
+                                // Add to output with combined weight
+                                let combined_weight = left_weight * right_weight;
+                                output.changes.push((joined_row, combined_weight));
+                            }
+                        }
+                    }
+
+                    *state = EvalState::Join(Box::new(JoinEvalState::ProcessDeltaJoin {
+                        deltas,
+                        output,
+                    }));
+                }
+                EvalState::Join(join_state) => {
+                    *state = EvalState::Join(join_state);
+                    let output = return_if_io!(self.process_join_state(state, cursors));
+                    return Ok(IOResult::Done(output));
+                }
+                EvalState::Done => {
+                    return Ok(IOResult::Done(Delta::new()));
+                }
+                EvalState::Aggregate(_) => {
+                    panic!("Aggregate state should not appear in join operator");
+                }
+            }
+        }
+    }
+}
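+
+// Blob layout sketch for the two helpers below (illustrative): a row with
+// rowid 1 and values [Integer(7), Null] serializes as
+//   [01 00 00 00 00 00 00 00]      rowid, little-endian i64
+//   [02 00 00 00]                  num_values, little-endian u32
+//   [01][07 00 00 00 00 00 00 00]  tag 1 = Integer, then the i64 payload
+//   [00]                           tag 0 = Null, no payload
+// Text adds a u32 length, the bytes, and one trailing subtype byte; Blob adds
+// a u32 length and the raw bytes.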
+// Helper to deserialize a HashableRow from a blob
+fn deserialize_hashable_row(blob: &[u8]) -> Result<HashableRow> {
+    // Simple deserialization - this needs to match how we serialize in commit
+    // Format: [rowid:8 bytes][num_values:4 bytes][values...]
+    if blob.len() < 12 {
+        return Err(crate::LimboError::InternalError(
+            "Invalid blob size".to_string(),
+        ));
+    }
+
+    let rowid = i64::from_le_bytes(blob[0..8].try_into().unwrap());
+    let num_values = u32::from_le_bytes(blob[8..12].try_into().unwrap()) as usize;
+
+    let mut values = Vec::new();
+    let mut offset = 12;
+
+    for _ in 0..num_values {
+        if offset >= blob.len() {
+            break;
+        }
+
+        let type_tag = blob[offset];
+        offset += 1;
+
+        match type_tag {
+            0 => values.push(Value::Null),
+            1 => {
+                if offset + 8 <= blob.len() {
+                    let i = i64::from_le_bytes(blob[offset..offset + 8].try_into().unwrap());
+                    values.push(Value::Integer(i));
+                    offset += 8;
+                }
+            }
+            2 => {
+                if offset + 8 <= blob.len() {
+                    let f = f64::from_le_bytes(blob[offset..offset + 8].try_into().unwrap());
+                    values.push(Value::Float(f));
+                    offset += 8;
+                }
+            }
+            3 => {
+                if offset + 4 <= blob.len() {
+                    let len =
+                        u32::from_le_bytes(blob[offset..offset + 4].try_into().unwrap()) as usize;
+                    offset += 4;
+                    if offset + len < blob.len() {
+                        let text_bytes = blob[offset..offset + len].to_vec();
+                        offset += len;
+                        let subtype = match blob[offset] {
+                            0 => crate::types::TextSubtype::Text,
+                            1 => crate::types::TextSubtype::Json,
+                            _ => crate::types::TextSubtype::Text,
+                        };
+                        offset += 1;
+                        values.push(Value::Text(crate::types::Text {
+                            value: text_bytes,
+                            subtype,
+                        }));
+                    }
+                }
+            }
+            4 => {
+                if offset + 4 <= blob.len() {
+                    let len =
+                        u32::from_le_bytes(blob[offset..offset + 4].try_into().unwrap()) as usize;
+                    offset += 4;
+                    if offset + len <= blob.len() {
+                        let blob_data = blob[offset..offset + len].to_vec();
+                        values.push(Value::Blob(blob_data));
+                        offset += len;
+                    }
+                }
+            }
+            _ => break, // Unknown type tag
+        }
+    }
+
+    Ok(HashableRow::new(rowid, values))
+}
+
+// Helper to serialize a HashableRow to a blob
+fn serialize_hashable_row(row: &HashableRow) -> Vec<u8> {
+    let mut blob = Vec::new();
+
+    // Write rowid
+    blob.extend_from_slice(&row.rowid.to_le_bytes());
+
+    // Write number of values
+    blob.extend_from_slice(&(row.values.len() as u32).to_le_bytes());
+
+    // Write each value directly with type tags (like AggregateState does)
+    for value in &row.values {
+        match value {
+            Value::Null => blob.push(0u8),
+            Value::Integer(i) => {
+                blob.push(1u8);
+                blob.extend_from_slice(&i.to_le_bytes());
+            }
+            Value::Float(f) => {
+                blob.push(2u8);
+                blob.extend_from_slice(&f.to_le_bytes());
+            }
+            Value::Text(s) => {
+                blob.push(3u8);
+                let bytes = &s.value;
+                blob.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
+                blob.extend_from_slice(bytes);
+                blob.push(s.subtype as u8);
+            }
+            Value::Blob(b) => {
+                blob.push(4u8);
+                blob.extend_from_slice(&(b.len() as u32).to_le_bytes());
+                blob.extend_from_slice(b);
+            }
+        }
+    }
+
+    blob
+}
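+
+// Commit flow sketch (illustrative): commit() drives
+// Idle -> Eval -> CommitLeftDelta -> CommitRightDelta -> Idle. Eval runs the
+// incremental join against the *pre-commit* persisted state, which is exactly
+// what δR ⋈ S and R ⋈ δS require; only afterwards are the left and right
+// deltas written back row by row, so the next commit sees the updated R and S.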
+impl IncrementalOperator for JoinOperator {
+    fn eval(
+        &mut self,
+        state: &mut EvalState,
+        cursors: &mut DbspStateCursors,
+    ) -> Result<IOResult<Delta>> {
+        let delta = return_if_io!(self.eval_internal(state, cursors));
+        Ok(IOResult::Done(delta))
+    }
+
+    fn commit(
+        &mut self,
+        deltas: DeltaPair,
+        cursors: &mut DbspStateCursors,
+    ) -> Result<IOResult<Delta>> {
+        loop {
+            let mut state = std::mem::replace(&mut self.commit_state, JoinCommitState::Invalid);
+            match &mut state {
+                JoinCommitState::Idle => {
+                    self.commit_state = JoinCommitState::Eval {
+                        eval_state: deltas.clone().into(),
+                    }
+                }
+                JoinCommitState::Eval { ref mut eval_state } => {
+                    let output = return_and_restore_if_io!(
+                        &mut self.commit_state,
+                        state,
+                        self.eval(eval_state, cursors)
+                    );
+                    self.commit_state = JoinCommitState::CommitLeftDelta {
+                        deltas: deltas.clone(),
+                        output,
+                        current_idx: 0,
+                        write_row: WriteRow::new(),
+                    };
+                }
+                JoinCommitState::CommitLeftDelta {
+                    deltas,
+                    output,
+                    current_idx,
+                    ref mut write_row,
+                } => {
+                    if *current_idx >= deltas.left.changes.len() {
+                        self.commit_state = JoinCommitState::CommitRightDelta {
+                            deltas: std::mem::take(deltas),
+                            output: std::mem::take(output),
+                            current_idx: 0,
+                            write_row: WriteRow::new(),
+                        };
+                        continue;
+                    }
+
+                    let (row, weight) = &deltas.left.changes[*current_idx];
+                    // Extract join key from the left row
+                    let join_key = self.extract_join_key(&row.values, &self.left_key_indices);
+
+                    // The index key: (storage_id, zset_id, element_id)
+                    // zset_id is the hash of the join key, element_id is hash of the row
+                    let storage_id = self.left_storage_id();
+                    let zset_id = join_key.cached_hash() as i64;
+                    let element_id = row.cached_hash() as i64;
+                    let index_key = vec![
+                        Value::Integer(storage_id),
+                        Value::Integer(zset_id),
+                        Value::Integer(element_id),
+                    ];
+
+                    // The record values: we'll store the serialized row as a blob
+                    let row_blob = serialize_hashable_row(row);
+                    let record_values = vec![
+                        Value::Integer(self.left_storage_id()),
+                        Value::Integer(join_key.cached_hash() as i64),
+                        Value::Integer(row.cached_hash() as i64),
+                        Value::Blob(row_blob),
+                    ];
+
+                    // Use return_and_restore_if_io to handle I/O properly
+                    return_and_restore_if_io!(
+                        &mut self.commit_state,
+                        state,
+                        write_row.write_row(cursors, index_key, record_values, *weight)
+                    );
+
+                    self.commit_state = JoinCommitState::CommitLeftDelta {
+                        deltas: deltas.clone(),
+                        output: output.clone(),
+                        current_idx: *current_idx + 1,
+                        write_row: WriteRow::new(),
+                    };
+                }
+                JoinCommitState::CommitRightDelta {
+                    deltas,
+                    output,
+                    current_idx,
+                    ref mut write_row,
+                } => {
+                    if *current_idx >= deltas.right.changes.len() {
+                        // Reset to Idle state for next commit
+                        self.commit_state = JoinCommitState::Idle;
+                        return Ok(IOResult::Done(output.clone()));
+                    }
+
+                    let (row, weight) = &deltas.right.changes[*current_idx];
+                    // Extract join key from the right row
+                    let join_key = self.extract_join_key(&row.values, &self.right_key_indices);
+
+                    // The index key: (storage_id, zset_id, element_id)
+                    let index_key = vec![
+                        Value::Integer(self.right_storage_id()),
+                        Value::Integer(join_key.cached_hash() as i64),
+                        Value::Integer(row.cached_hash() as i64),
+                    ];
+
+                    // The record values: we'll store the serialized row as a blob
+                    let row_blob = serialize_hashable_row(row);
+                    let record_values = vec![
+                        Value::Integer(self.right_storage_id()),
+                        Value::Integer(join_key.cached_hash() as i64),
+                        Value::Integer(row.cached_hash() as i64),
+                        Value::Blob(row_blob),
+                    ];
+
+                    // Use return_and_restore_if_io to handle I/O properly
+                    return_and_restore_if_io!(
+                        &mut self.commit_state,
+                        state,
+                        write_row.write_row(cursors, index_key, record_values, *weight)
+                    );
+
+                    self.commit_state = JoinCommitState::CommitRightDelta {
+                        deltas: std::mem::take(deltas),
+                        output: std::mem::take(output),
+                        current_idx: *current_idx + 1,
+                        write_row: WriteRow::new(),
+                    };
+                }
+                JoinCommitState::Invalid => {
+                    panic!("Invalid join commit state");
+                }
+            }
+        }
+    }
+
+    fn set_tracker(&mut self, tracker: Arc<Mutex<ComputationTracker>>) {
+        self.tracker = Some(tracker);
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -4897,4 +5678,765 @@ mod tests {
         assert_eq!(row_ins.values[1], Value::Integer(150)); // New MAX(col2)
         assert_eq!(row_ins.values[2], Value::Integer(500)); // MIN(col3) unchanged
     }
+
+    #[test]
+    fn test_join_operator_inner() {
+        // Test INNER JOIN with incremental updates
+        let (pager, table_page_id, index_page_id) = create_test_pager();
+        let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10);
+        let index_def = create_dbsp_state_index(index_page_id);
+        let index_cursor =
+            BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10);
+        let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
+
+        let mut join = JoinOperator::new(
+            1, // operator_id
+            JoinType::Inner,
+            vec![0], // Join on first column
+            vec![0],
+            vec!["customer_id".to_string(), "amount".to_string()],
+            vec!["id".to_string(), "name".to_string()],
+        )
+        .unwrap();
+
+        // FIRST COMMIT: Initialize with data
+        let mut left_delta = Delta::new();
+        left_delta.insert(1, vec![Value::Integer(1), Value::Float(100.0)]);
+        left_delta.insert(2, vec![Value::Integer(2), Value::Float(200.0)]);
+        left_delta.insert(3, vec![Value::Integer(3), Value::Float(300.0)]); // No match initially
+
+        let mut right_delta = Delta::new();
+        right_delta.insert(1, vec![Value::Integer(1), Value::Text("Alice".into())]);
+        right_delta.insert(2, vec![Value::Integer(2), Value::Text("Bob".into())]);
+        right_delta.insert(4, vec![Value::Integer(4), Value::Text("David".into())]); // No match initially
+
+        let delta_pair = DeltaPair::new(left_delta, right_delta);
+        let result = pager
+            .io
+            .block(|| join.commit(delta_pair.clone(), &mut cursors))
+            .unwrap();
+
+        // Should have 2 matches (customer 1 and 2)
+        assert_eq!(
+            result.changes.len(),
+            2,
+            "First commit should produce 2 matches"
+        );
+
+        let mut results: Vec<_> = result.changes.clone();
+        results.sort_by_key(|r| r.0.values[0].clone());
+
+        assert_eq!(results[0].0.values[0], Value::Integer(1));
+        assert_eq!(results[0].0.values[3], Value::Text("Alice".into()));
+        assert_eq!(results[1].0.values[0], Value::Integer(2));
+        assert_eq!(results[1].0.values[3], Value::Text("Bob".into()));
+
+        // SECOND COMMIT: Add incremental data that should join with persisted state
+        // Add a new left row that should match existing right row (customer 4)
+        let mut left_delta2 = Delta::new();
+        left_delta2.insert(5, vec![Value::Integer(4), Value::Float(400.0)]); // Should match David from persisted state
+
+        // Add a new right row that should match existing left row (customer 3)
+        let mut right_delta2 = Delta::new();
+        right_delta2.insert(6, vec![Value::Integer(3), Value::Text("Charlie".into())]); // Should match customer 3 from persisted state
+
+        let delta_pair2 = DeltaPair::new(left_delta2, right_delta2);
+        let result2 = pager
+            .io
+            .block(|| join.commit(delta_pair2.clone(), &mut cursors))
+            .unwrap();
+
+        // The second commit should produce:
+        // 1. New left (customer_id=4) joins with persisted right (id=4, David)
+        // 2. Persisted left (customer_id=3) joins with new right (id=3, Charlie)
+
+        assert_eq!(
+            result2.changes.len(),
+            2,
+            "Second commit should produce 2 new matches from incremental join. Got: {:?}",
+            result2.changes
+        );
+
+        // Verify the incremental results
+        let mut results2: Vec<_> = result2.changes.clone();
+        results2.sort_by_key(|r| r.0.values[0].clone());
+
+        // Check for customer 3 joined with Charlie (existing left + new right)
+        let charlie_match = results2
+            .iter()
+            .find(|(row, _)| row.values[0] == Value::Integer(3))
+            .expect("Should find customer 3 joined with new Charlie");
+        assert_eq!(charlie_match.0.values[2], Value::Integer(3));
+        assert_eq!(charlie_match.0.values[3], Value::Text("Charlie".into()));
+
+        // Check for customer 4 joined with David (new left + existing right)
+        let david_match = results2
+            .iter()
+            .find(|(row, _)| row.values[0] == Value::Integer(4))
+            .expect("Should find new customer 4 joined with existing David");
+        assert_eq!(david_match.0.values[0], Value::Integer(4));
+        assert_eq!(david_match.0.values[3], Value::Text("David".into()));
+    }
+
+    #[test]
+    fn test_join_operator_with_deletions() {
+        // Test INNER JOIN with deletions (negative weights)
+        let (pager, table_page_id, index_page_id) = create_test_pager();
+        let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10);
+        let index_def = create_dbsp_state_index(index_page_id);
+        let index_cursor =
+            BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10);
+        let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
+
+        let mut join = JoinOperator::new(
+            1, // operator_id
+            JoinType::Inner,
+            vec![0], // Join on first column
+            vec![0],
+            vec!["customer_id".to_string(), "amount".to_string()],
+            vec!["id".to_string(), "name".to_string()],
+        )
+        .unwrap();
+
+        // FIRST COMMIT: Add initial data
+        let mut left_delta = Delta::new();
+        left_delta.insert(1, vec![Value::Integer(1), Value::Float(100.0)]);
+        left_delta.insert(2, vec![Value::Integer(2), Value::Float(200.0)]);
+        left_delta.insert(3, vec![Value::Integer(3), Value::Float(300.0)]);
+
+        let mut right_delta = Delta::new();
+        right_delta.insert(1, vec![Value::Integer(1), Value::Text("Alice".into())]);
+        right_delta.insert(2, vec![Value::Integer(2), Value::Text("Bob".into())]);
+        right_delta.insert(3, vec![Value::Integer(3), Value::Text("Charlie".into())]);
+
+        let delta_pair = DeltaPair::new(left_delta, right_delta);
+
+        let result = pager
+            .io
+            .block(|| join.commit(delta_pair.clone(), &mut cursors))
+            .unwrap();
+
+        assert_eq!(result.changes.len(), 3, "Should have 3 initial joins");
+
+        // SECOND COMMIT: Delete customer 2 from left side
+        let mut left_delta2 = Delta::new();
+        left_delta2.delete(2, vec![Value::Integer(2), Value::Float(200.0)]);
+
+        let empty_right = Delta::new();
+        let delta_pair2 = DeltaPair::new(left_delta2, empty_right);
+
+        let result2 = pager
+            .io
+            .block(|| join.commit(delta_pair2.clone(), &mut cursors))
+            .unwrap();
+
+        // Should produce 1 deletion (retraction) of the join for customer 2
+        assert_eq!(
+            result2.changes.len(),
+            1,
+            "Should produce 1 retraction for deleted customer 2"
+        );
+        assert_eq!(
+            result2.changes[0].1, -1,
+            "Should have weight -1 for deletion"
+        );
+        assert_eq!(result2.changes[0].0.values[0], Value::Integer(2));
+        assert_eq!(result2.changes[0].0.values[3], Value::Text("Bob".into()));
+
+        // THIRD COMMIT: Delete customer 3 from right side
+        let empty_left = Delta::new();
+        let mut right_delta3 = Delta::new();
+        right_delta3.delete(3, vec![Value::Integer(3), Value::Text("Charlie".into())]);
+
+        let delta_pair3 = DeltaPair::new(empty_left, right_delta3);
+
+        let result3 = pager
+            .io
+            .block(|| join.commit(delta_pair3.clone(), &mut cursors))
+            .unwrap();
+
+        // Should produce 1 deletion (retraction) of the join for customer 3
+        assert_eq!(
+            result3.changes.len(),
+            1,
+            "Should produce 1 retraction for deleted customer 3"
+        );
+        assert_eq!(
+            result3.changes[0].1, -1,
+            "Should have weight -1 for deletion"
+        );
+        assert_eq!(result3.changes[0].0.values[0], Value::Integer(3));
+        assert_eq!(result3.changes[0].0.values[2], Value::Integer(3));
+    }
+
+    #[test]
+    fn test_join_operator_one_to_many() {
+        // Test one-to-many relationship: one customer with multiple orders
+        let (pager, table_page_id, index_page_id) = create_test_pager();
+        let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10);
+        let index_def = create_dbsp_state_index(index_page_id);
+        let index_cursor =
+            BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10);
+        let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
+
+        let mut join = JoinOperator::new(
+            1, // operator_id
+            JoinType::Inner,
+            vec![0], // Join on first column (customer_id for orders)
+            vec![0], // Join on first column (id for customers)
+            vec![
+                "customer_id".to_string(),
+                "order_id".to_string(),
+                "amount".to_string(),
+            ],
+            vec!["id".to_string(), "name".to_string()],
+        )
+        .unwrap();
+
+        // FIRST COMMIT: Add one customer
+        let left_delta = Delta::new(); // Empty orders initially
+        let mut right_delta = Delta::new();
+        right_delta.insert(1, vec![Value::Integer(100), Value::Text("Alice".into())]);
+
+        let delta_pair = DeltaPair::new(left_delta, right_delta);
+        let result = pager
+            .io
+            .block(|| join.commit(delta_pair.clone(), &mut cursors))
+            .unwrap();
+
+        // No joins yet (customer exists but no orders)
+        assert_eq!(
+            result.changes.len(),
+            0,
+            "Should have no joins with customer but no orders"
+        );
+
+        // SECOND COMMIT: Add multiple orders for the same customer
+        let mut left_delta2 = Delta::new();
+        left_delta2.insert(
+            1,
+            vec![
+                Value::Integer(100),
+                Value::Integer(1001),
+                Value::Float(50.0),
+            ],
+        ); // order 1001
+        left_delta2.insert(
+            2,
+            vec![
+                Value::Integer(100),
+                Value::Integer(1002),
+                Value::Float(75.0),
+            ],
+        ); // order 1002
+        left_delta2.insert(
+            3,
+            vec![
+                Value::Integer(100),
+                Value::Integer(1003),
+                Value::Float(100.0),
+            ],
+        ); // order 1003
+
+        let right_delta2 = Delta::new(); // No new customers
+
+        let delta_pair2 = DeltaPair::new(left_delta2, right_delta2);
+        let result2 = pager
+            .io
+            .block(|| join.commit(delta_pair2.clone(), &mut cursors))
+            .unwrap();
+
+        // Should produce 3 joins (3 orders × 1 customer)
+        assert_eq!(
+            result2.changes.len(),
+            3,
+            "Should produce 3 joins for 3 orders with same customer. Got: {:?}",
+            result2.changes
+        );
+
+        // Verify all three joins have the same customer but different orders
+        for (row, weight) in &result2.changes {
+            assert_eq!(*weight, 1, "Weight should be 1 for insertion");
+            assert_eq!(
+                row.values[0],
+                Value::Integer(100),
+                "Customer ID should be 100"
+            );
+            assert_eq!(
+                row.values[4],
+                Value::Text("Alice".into()),
+                "Customer name should be Alice"
+            );
+
+            // Check order IDs are different
+            let order_id = match &row.values[1] {
+                Value::Integer(id) => *id,
+                _ => panic!("Expected integer order ID"),
+            };
+            assert!(
+                (1001..=1003).contains(&order_id),
+                "Order ID {order_id} should be between 1001 and 1003"
+            );
+        }
+
+        // THIRD COMMIT: Delete one order
+        let mut left_delta3 = Delta::new();
+        left_delta3.delete(
+            2,
+            vec![
+                Value::Integer(100),
+                Value::Integer(1002),
+                Value::Float(75.0),
+            ],
+        );
+
+        let delta_pair3 = DeltaPair::new(left_delta3, Delta::new());
+        let result3 = pager
+            .io
+            .block(|| join.commit(delta_pair3.clone(), &mut cursors))
+            .unwrap();
+
+        // Should produce 1 retraction for the deleted order
+        assert_eq!(result3.changes.len(), 1, "Should produce 1 retraction");
+        assert_eq!(result3.changes[0].1, -1, "Should be a deletion");
+        assert_eq!(
+            result3.changes[0].0.values[1],
+            Value::Integer(1002),
+            "Should delete order 1002"
+        );
+    }
+
+    #[test]
+    fn test_join_operator_many_to_many() {
+        // Test many-to-many: multiple rows with same key on both sides
+        let (pager, table_page_id, index_page_id) = create_test_pager();
+        let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10);
+        let index_def = create_dbsp_state_index(index_page_id);
+        let index_cursor =
+            BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10);
+        let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
+
+        let mut join = JoinOperator::new(
+            1, // operator_id
+            JoinType::Inner,
+            vec![0], // Join on category_id
+            vec![0], // Join on id
+            vec![
+                "category_id".to_string(),
+                "product_name".to_string(),
+                "price".to_string(),
+            ],
+            vec!["id".to_string(), "category_name".to_string()],
+        )
+        .unwrap();
+
+        // FIRST COMMIT: Add multiple products in same category
+        let mut left_delta = Delta::new();
+        left_delta.insert(
+            1,
+            vec![
+                Value::Integer(10),
+                Value::Text("Laptop".into()),
+                Value::Float(1000.0),
+            ],
+        );
+        left_delta.insert(
+            2,
+            vec![
+                Value::Integer(10),
+                Value::Text("Mouse".into()),
+                Value::Float(50.0),
+            ],
+        );
+        left_delta.insert(
+            3,
+            vec![
+                Value::Integer(10),
+                Value::Text("Keyboard".into()),
+                Value::Float(100.0),
+            ],
+        );
+
+        // Add multiple categories with same ID (simulating denormalized data or versioning)
+        let mut right_delta = Delta::new();
+        right_delta.insert(
+            1,
+            vec![Value::Integer(10), Value::Text("Electronics".into())],
+        );
+        right_delta.insert(2, vec![Value::Integer(10), Value::Text("Computers".into())]); // Same category ID, different name
+
+        let delta_pair = DeltaPair::new(left_delta, right_delta);
+        let result = pager
+            .io
+            .block(|| join.commit(delta_pair.clone(), &mut cursors))
+            .unwrap();
+
+        // Should produce 3 products × 2 categories = 6 joins
+        assert_eq!(
+            result.changes.len(),
+            6,
+            "Should produce 6 joins (3 products × 2 category records). Got: {:?}",
+            result.changes
+        );
+
+        // Verify we have all combinations
+        let mut found_combinations = std::collections::HashSet::new();
+        for (row, weight) in &result.changes {
+            assert_eq!(*weight, 1);
+            let product = row.values[1].to_string();
+            let category = row.values[4].to_string();
+            found_combinations.insert((product, category));
+        }
+
+        assert_eq!(
+            found_combinations.len(),
+            6,
+            "Should have 6 unique combinations"
+        );
+
+        // SECOND COMMIT: Add one more product in the same category
+        let mut left_delta2 = Delta::new();
+        left_delta2.insert(
+            4,
+            vec![
+                Value::Integer(10),
+                Value::Text("Monitor".into()),
+                Value::Float(500.0),
+            ],
+        );
+
+        let delta_pair2 = DeltaPair::new(left_delta2, Delta::new());
+        let result2 = pager
+            .io
+            .block(|| join.commit(delta_pair2.clone(), &mut cursors))
+            .unwrap();
+
+        // New product should join with both existing category records
+        assert_eq!(
+            result2.changes.len(),
+            2,
+            "New product should join with 2 existing category records"
+        );
+
+        for (row, _) in &result2.changes {
+            assert_eq!(row.values[1], Value::Text("Monitor".into()));
+        }
+    }
+
+    #[test]
+    fn test_join_operator_update_in_one_to_many() {
+        // Test updates in one-to-many scenarios
+        let (pager, table_page_id, index_page_id) = create_test_pager();
+        let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10);
+        let index_def = create_dbsp_state_index(index_page_id);
+        let index_cursor =
+            BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10);
+        let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
+
+        let mut join = JoinOperator::new(
+            1, // operator_id
+            JoinType::Inner,
+            vec![0], // Join on customer_id
+            vec![0], // Join on id
+            vec![
+                "customer_id".to_string(),
+                "order_id".to_string(),
+                "amount".to_string(),
+            ],
+            vec!["id".to_string(), "name".to_string()],
+        )
+        .unwrap();
+
+        // FIRST COMMIT: Setup one customer with multiple orders
+        let mut left_delta = Delta::new();
+        left_delta.insert(
+            1,
+            vec![
+                Value::Integer(100),
+                Value::Integer(1001),
+                Value::Float(50.0),
+            ],
+        );
+        left_delta.insert(
+            2,
+            vec![
+                Value::Integer(100),
+                Value::Integer(1002),
+                Value::Float(75.0),
+            ],
+        );
+        left_delta.insert(
+            3,
+            vec![
+                Value::Integer(100),
+                Value::Integer(1003),
+                Value::Float(100.0),
+            ],
+        );
+
+        let mut right_delta = Delta::new();
+        right_delta.insert(1, vec![Value::Integer(100), Value::Text("Alice".into())]);
+
+        let delta_pair = DeltaPair::new(left_delta, right_delta);
+        let result = pager
+            .io
+            .block(|| join.commit(delta_pair.clone(), &mut cursors))
+            .unwrap();
+
+        assert_eq!(result.changes.len(), 3, "Should have 3 initial joins");
+
+        // SECOND COMMIT: Update the customer name (affects all 3 joins)
+        let mut right_delta2 = Delta::new();
+        // Delete old customer record
+        right_delta2.delete(1, vec![Value::Integer(100), Value::Text("Alice".into())]);
+        // Insert updated customer record
+        right_delta2.insert(
+            1,
+            vec![Value::Integer(100), Value::Text("Alice Smith".into())],
+        );
+
+        let delta_pair2 = DeltaPair::new(Delta::new(), right_delta2);
+        let result2 = pager
+            .io
+            .block(|| join.commit(delta_pair2.clone(), &mut cursors))
+            .unwrap();
+
+        // Should produce 3 deletions and 3 insertions (one for each order)
+        assert_eq!(result2.changes.len(), 6,
+            "Should produce 6 changes (3 deletions + 3 insertions) when updating customer with 3 orders");
+
+        let deletions: Vec<_> = result2.changes.iter().filter(|(_, w)| *w == -1).collect();
+        let insertions: Vec<_> = result2.changes.iter().filter(|(_, w)| *w == 1).collect();
+
+        assert_eq!(deletions.len(), 3, "Should have 3 deletions");
+        assert_eq!(insertions.len(), 3, "Should have 3 insertions");
+
+        // Check all deletions have old name
+        for (row, _) in &deletions {
+            assert_eq!(
+                row.values[4],
+                Value::Text("Alice".into()),
+                "Deletions should have old name"
+            );
+        }
+
+        // Check all insertions have new name
+        for (row, _) in &insertions {
+            assert_eq!(
+                row.values[4],
+                Value::Text("Alice Smith".into()),
+                "Insertions should have new name"
+            );
+        }
+
+        // Verify we still have all three order IDs in the insertions
+        let mut order_ids = std::collections::HashSet::new();
+        for (row, _) in &insertions {
+            if let Value::Integer(order_id) = &row.values[1] {
+                order_ids.insert(*order_id);
+            }
+        }
+        assert_eq!(
+            order_ids.len(),
+            3,
+            "Should still have all 3 order IDs after update"
+        );
+        assert!(order_ids.contains(&1001));
+        assert!(order_ids.contains(&1002));
+        assert!(order_ids.contains(&1003));
+    }
+
+    #[test]
+    fn test_join_operator_weight_accumulation_complex() {
+        // Test complex weight accumulation with multiple identical rows
+        let (pager, table_page_id, index_page_id) = create_test_pager();
+        let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_page_id, 10);
+        let index_def = create_dbsp_state_index(index_page_id);
+        let index_cursor =
+            BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10);
+        let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
+
+        let mut join = JoinOperator::new(
+            1, // operator_id
+            JoinType::Inner,
+            vec![0], // Join on first column
+            vec![0],
+            vec!["key".to_string(), "val_left".to_string()],
+            vec!["key".to_string(), "val_right".to_string()],
+        )
+        .unwrap();
+
+        // FIRST COMMIT: Add identical rows multiple times (simulating duplicates)
+        let mut left_delta = Delta::new();
+        // Same key-value pair inserted 3 times with different rowids
+        left_delta.insert(1, vec![Value::Integer(10), Value::Text("A".into())]);
+        left_delta.insert(2, vec![Value::Integer(10), Value::Text("A".into())]);
+        left_delta.insert(3, vec![Value::Integer(10), Value::Text("A".into())]);
+
+        let mut right_delta = Delta::new();
+        // Same key-value pair inserted 2 times
+        right_delta.insert(4, vec![Value::Integer(10), Value::Text("B".into())]);
+        right_delta.insert(5, vec![Value::Integer(10), Value::Text("B".into())]);
+
+        let delta_pair = DeltaPair::new(left_delta, right_delta);
+        let result = pager
+            .io
+            .block(|| join.commit(delta_pair.clone(), &mut cursors))
+            .unwrap();
+
+        // Should produce 3 × 2 = 6 join results (cartesian product)
+        assert_eq!(
+            result.changes.len(),
+            6,
+            "Should produce 6 joins (3 left rows × 2 right rows)"
+        );
+
+        // All should have weight 1
+        for (_, weight) in &result.changes {
+            assert_eq!(*weight, 1);
+        }
+
+        // SECOND COMMIT: Delete one instance from left
+        let mut left_delta2 = Delta::new();
+        left_delta2.delete(2, vec![Value::Integer(10), Value::Text("A".into())]);
+
+        let delta_pair2 = DeltaPair::new(left_delta2, Delta::new());
+        let result2 = pager
+            .io
+            .block(|| join.commit(delta_pair2.clone(), &mut cursors))
+            .unwrap();
+
+        // Should produce 2 retractions (1 deleted left row × 2 right rows)
+        assert_eq!(
+            result2.changes.len(),
+            2,
+            "Should produce 2 retractions when deleting 1 of 3 identical left rows"
+        );
+
+        for (_, weight) in &result2.changes {
+            assert_eq!(*weight, -1, "Should be retractions");
+        }
+    }
+
+    #[test]
+    fn test_join_produces_all_expected_results() {
+        // Test that a join produces ALL expected output rows
+        // This reproduces the issue where only 1 of 3 expected rows appears in the final result
+
+        // Create a join operator similar to: SELECT u.name, o.quantity FROM users u JOIN orders o ON u.id = o.user_id
+        let mut join = JoinOperator::new(
+            0,
+            JoinType::Inner,
+            vec![0], // Join on first column (id)
+            vec![0], // Join on first column (user_id)
+            vec!["id".to_string(), "name".to_string()],
+            vec![
+                "user_id".to_string(),
+                "product_id".to_string(),
+                "quantity".to_string(),
+            ],
+        )
+        .unwrap();
+
+        // Create test data matching the example that fails:
+        // users: (1, 'Alice'), (2, 'Bob')
+        // orders: (1, 5), (1, 3), (2, 7) -- user_id, quantity
+        let left_delta = Delta {
+            changes: vec![
+                (
+                    HashableRow::new(1, vec![Value::Integer(1), Value::Text(Text::from("Alice"))]),
+                    1,
+                ),
+                (
+                    HashableRow::new(2, vec![Value::Integer(2), Value::Text(Text::from("Bob"))]),
+                    1,
+                ),
+            ],
+        };
+
+        // Orders: Alice has 2 orders, Bob has 1
+        let right_delta = Delta {
+            changes: vec![
+                (
+                    HashableRow::new(
+                        1,
+                        vec![Value::Integer(1), Value::Integer(100), Value::Integer(5)],
+                    ),
+                    1,
+                ),
+                (
+                    HashableRow::new(
+                        2,
+                        vec![Value::Integer(1), Value::Integer(101), Value::Integer(3)],
+                    ),
+                    1,
+                ),
+                (
+                    HashableRow::new(
+                        3,
+                        vec![Value::Integer(2), Value::Integer(100), Value::Integer(7)],
+                    ),
+                    1,
+                ),
+            ],
+        };
+
+        // Evaluate the join
+        let delta_pair = DeltaPair::new(left_delta, right_delta);
+        let mut state = EvalState::Init { deltas: delta_pair };
+
+        let (pager, table_root, index_root) = create_test_pager();
+        let table_cursor = BTreeCursor::new_table(None, pager.clone(), table_root, 5);
+        let index_def = create_dbsp_state_index(index_root);
+        let index_cursor = BTreeCursor::new_index(None, pager.clone(), index_root, &index_def, 4);
+        let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
+
+        let result = pager
+            .io
+            .block(|| join.eval(&mut state, &mut cursors))
+            .unwrap();
+
+        // Should produce 3 results: Alice with 2 orders, Bob with 1 order
+        assert_eq!(
+            result.changes.len(),
+            3,
+            "Should produce 3 joined rows (Alice×2 + Bob×1)"
+        );
+
+        // Verify the actual content of the results
+        let mut expected_results = std::collections::HashSet::new();
+        // Expected: (Alice, 5), (Alice, 3), (Bob, 7)
+        expected_results.insert(("Alice".to_string(), 5));
+        expected_results.insert(("Alice".to_string(), 3));
+        expected_results.insert(("Bob".to_string(), 7));
+
+        let mut actual_results = std::collections::HashSet::new();
+        for (row, weight) in &result.changes {
+            assert_eq!(*weight, 1, "All results should have weight 1");
+
+            // Extract name (column 1 from left) and quantity (column 3 from right)
+            let name = match &row.values[1] {
+                Value::Text(t) => t.as_str().to_string(),
+                _ => panic!("Expected text value for name"),
+            };
+            let quantity = match &row.values[4] {
+                Value::Integer(q) => *q,
+                _ => panic!("Expected integer value for quantity"),
+            };
+
+            actual_results.insert((name, quantity));
+        }
+
+        assert_eq!(
+            expected_results, actual_results,
+            "Join should produce all expected results. Expected: {expected_results:?}, Got: {actual_results:?}",
+        );
+
+        // Also verify that rowids are unique (this is important for btree storage)
+        let mut seen_rowids = std::collections::HashSet::new();
+        for (row, _) in &result.changes {
+            let was_new = seen_rowids.insert(row.rowid);
+            assert!(was_new, "Duplicate rowid found: {}. This would cause rows to overwrite each other in btree storage!", row.rowid);
+        }
+    }
 }
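
For reference (not part of the patch): the identity this operator relies on, δ(R ⋈ S) = (δR ⋈ S) ∪ (R ⋈ δS) ∪ (δR ⋈ δS), can be sanity-checked in isolation. A minimal, self-contained sketch over plain vectors with Z-set weights; none of the names below come from the patch:

```rust
// Rows are (join key, payload, weight); weights multiply across a join.
type Row = (i64, &'static str, i64);

// Weighted nested-loop join used only for this check.
fn join(a: &[Row], b: &[Row]) -> Vec<(i64, String, i64)> {
    let mut out = Vec::new();
    for &(ka, pa, wa) in a {
        for &(kb, pb, wb) in b {
            if ka == kb {
                out.push((ka, format!("{pa}|{pb}"), wa * wb));
            }
        }
    }
    out
}

fn main() {
    let r = vec![(1, "r1", 1)]; // existing left state R
    let s = vec![(1, "s1", 1)]; // existing right state S
    let dr = vec![(1, "r2", 1)]; // left delta: one insertion
    let ds = vec![(1, "s2", -1)]; // right delta: one retraction

    // The three components the operator materializes separately.
    let mut delta = join(&dr, &s); // δR ⋈ S   (ProcessLeftJoin)
    delta.extend(join(&r, &ds)); //   R ⋈ δS   (ProcessRightJoin)
    delta.extend(join(&dr, &ds)); //  δR ⋈ δS  (computed in EvalState::Init)

    // By bilinearity, delta equals join(R+δR, S+δS) minus join(R, S),
    // weight for weight.
    for row in delta {
        println!("{row:?}");
    }
}
```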