From 3dc1dca5a82d628c42d397550ce0a3a6bfa40685 Mon Sep 17 00:00:00 2001
From: Glauber Costa
Date: Thu, 25 Sep 2025 15:54:20 -0300
Subject: [PATCH] use 128-bit hashes for the zset_id

We have used i64 before because that is the size of an integer in
SQLite. However, I believe that for large enough databases, the chances
of a collision are just too high. The effect of a collision is the
database silently returning incorrect data in the materialized view. So
now that everything else is working, we should move to 128-bit hashes.
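For intuition, a rough birthday-bound estimate (an illustrative sketch,
not code in this patch; the helper name is made up):

    // Approximate birthday bound for n hashed keys and a b-bit hash:
    // P(collision) ~= n^2 / 2^(b+1).
    fn collision_probability(n: f64, bits: f64) -> f64 {
        (n * n) / 2f64.powf(bits + 1.0)
    }
    // collision_probability(1e9, 64.0)  ~= 2.7e-2  -- about 1 in 37 at a billion rows
    // collision_probability(1e9, 128.0) ~= 1.5e-21 -- negligible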
---
 Cargo.lock                             |   7 ++
 core/Cargo.toml                        |   2 +-
 core/incremental/aggregate_operator.rs | 107 ++++++++++-------
 core/incremental/dbsp.rs               | 157 +++++++++++++++++++------
 core/incremental/join_operator.rs      | 109 ++++++++++-------
 core/incremental/merge_operator.rs     |   2 +-
 core/incremental/operator.rs           |   3 -
 core/translate/view.rs                 |   4 +-
 8 files changed, 263 insertions(+), 128 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 175d182b6..29d269c46 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3696,6 +3696,12 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "sha1_smol"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
+
 [[package]]
 name = "sha2"
 version = "0.10.9"
@@ -4746,6 +4752,7 @@ checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d"
 dependencies = [
  "getrandom 0.3.2",
  "js-sys",
+ "sha1_smol",
  "wasm-bindgen",
 ]

diff --git a/core/Cargo.toml b/core/Cargo.toml
index 7add4af52..d5850d7f0 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -72,7 +72,7 @@ strum_macros = { workspace = true }
 bitflags = { workspace = true }
 serde = { workspace = true, optional = true, features = ["derive"] }
 paste = "1.0.15"
-uuid = { version = "1.11.0", features = ["v4", "v7"], optional = true }
+uuid = { version = "1.11.0", features = ["v4", "v5", "v7"], optional = true }
 tempfile = { workspace = true }
 pack1 = { version = "1.0.0", features = ["bytemuck"] }
 bytemuck = "1.23.1"
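The new "v5" feature is what pulls in sha1_smol above: UUID v5 is the
SHA-1-based, name-hashing variant. A minimal sketch of the property this
patch relies on (determinism across runs and machines), assuming only the
uuid crate:

    use uuid::Uuid;

    // UUID v5 = truncated SHA-1 over (namespace || name), with the version
    // and variant bits patched in; no RNG involved, so the result is stable.
    let a = Uuid::new_v5(&Uuid::NAMESPACE_DNS, b"dept|engineering");
    let b = Uuid::new_v5(&Uuid::NAMESPACE_DNS, b"dept|engineering");
    assert_eq!(a, b); // same input, same 128-bit value, every time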
diff --git a/core/incremental/aggregate_operator.rs b/core/incremental/aggregate_operator.rs
index df4d1a18a..868e55409 100644
--- a/core/incremental/aggregate_operator.rs
+++ b/core/incremental/aggregate_operator.rs
@@ -1,6 +1,7 @@
 // Aggregate operator for DBSP-style incremental computation
 
 use crate::function::{AggFunc, Func};
+use crate::incremental::dbsp::Hash128;
 use crate::incremental::dbsp::{Delta, DeltaPair, HashableRow};
 use crate::incremental::operator::{
     generate_storage_id, ComputationTracker, DbspStateCursors, EvalState, IncrementalOperator,
@@ -312,17 +313,17 @@ impl AggregateEvalState {
                 // Get the current group to read
                 let (group_key_str, _group_key) = &groups_to_read[*current_idx];
 
-                // Build the key for the index: (operator_id, zset_id, element_id)
+                // Build the key for the index: (operator_id, zset_hash, element_id)
                 // For regular aggregates, use column_index=0 and AGG_TYPE_REGULAR
                 let operator_storage_id =
                     generate_storage_id(operator.operator_id, 0, AGG_TYPE_REGULAR);
-                let zset_id = operator.generate_group_rowid(group_key_str);
+                let zset_hash = operator.generate_group_hash(group_key_str);
                 let element_id = 0i64; // Always 0 for aggregators
 
                 // Create index key values
                 let index_key_values = vec![
                     Value::Integer(operator_storage_id),
-                    Value::Integer(zset_id),
+                    zset_hash.to_value(),
                     Value::Integer(element_id),
                 ];
@@ -955,7 +956,7 @@ impl AggregateOperator {
         for (group_key_str, state) in existing_groups {
             let group_key = temp_keys.get(group_key_str).cloned().unwrap_or_default();
 
-            // Generate a unique rowid for this group
+            // Generate a synthetic rowid for this group
             let result_key = self.generate_group_rowid(group_key_str);
 
             if let Some(old_row_values) = old_values.get(group_key_str) {
@@ -1022,19 +1023,26 @@ impl AggregateOperator {
         self.tracker = Some(tracker);
     }
 
-    /// Generate a rowid for a group
-    /// For no GROUP BY: always returns 0
-    /// For GROUP BY: returns a hash of the group key string
-    pub fn generate_group_rowid(&self, group_key_str: &str) -> i64 {
+    /// Generate a hash for a group
+    /// For no GROUP BY: returns a zero hash
+    /// For GROUP BY: returns a 128-bit hash of the group key string
+    pub fn generate_group_hash(&self, group_key_str: &str) -> Hash128 {
         if self.group_by.is_empty() {
-            0
+            Hash128::new(0, 0)
         } else {
-            group_key_str
-                .bytes()
-                .fold(0i64, |acc, b| acc.wrapping_mul(31).wrapping_add(b as i64))
+            Hash128::hash_str(group_key_str)
         }
     }
 
+    /// Generate a rowid for a group (for output rows).
+    /// This is NOT the hash used for storage (that is generate_group_hash,
+    /// which returns the full 128 bits); it is a synthetic rowid used in place
+    /// of SQLite's rowid for aggregate output rows. We truncate the 128-bit
+    /// hash to 64 bits for SQLite rowid compatibility.
+    pub fn generate_group_rowid(&self, group_key_str: &str) -> i64 {
+        let hash = self.generate_group_hash(group_key_str);
+        hash.as_i64()
+    }
+
     /// Extract group key values from a row
     pub fn extract_group_key(&self, values: &[Value]) -> Vec<Value> {
         let mut key = Vec::new();
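To make the storage/output split above concrete, a sketch with made-up
example values (not part of the hunks):

    let h = Hash128::hash_str("dept|engineering");
    let storage_key = h.to_value(); // full 16-byte BLOB, used for DBSP state
    let rowid = h.as_i64();         // low 64 bits only, used for the output row
    assert_eq!(Hash128::new(0xAAAA, 0x1234).as_i64(), 0x1234); // high half dropped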
@@ -1140,7 +1148,7 @@ impl IncrementalOperator for AggregateOperator {
                     // For regular aggregates, use column_index=0 and AGG_TYPE_REGULAR
                     let operator_storage_id =
                         generate_storage_id(self.operator_id, 0, AGG_TYPE_REGULAR);
-                    let zset_id = self.generate_group_rowid(group_key_str);
+                    let zset_hash = self.generate_group_hash(group_key_str);
                     let element_id = 0i64;
 
                     // Determine weight: -1 to delete (cancels existing weight=1), 1 to insert/update
@@ -1150,22 +1158,22 @@ impl IncrementalOperator for AggregateOperator {
                     let state_blob = agg_state.to_blob(&self.aggregates, group_key);
                     let blob_value = Value::Blob(state_blob);
 
-                    // Build the aggregate storage format: [operator_id, zset_id, element_id, value, weight]
+                    // Build the aggregate storage format: [operator_id, zset_hash, element_id, value, weight]
                     let operator_id_val = Value::Integer(operator_storage_id);
-                    let zset_id_val = Value::Integer(zset_id);
+                    let zset_hash_val = zset_hash.to_value();
                     let element_id_val = Value::Integer(element_id);
                     let blob_val = blob_value.clone();
 
                     // Create index key - the first 3 columns of our primary key
                     let index_key = vec![
                         operator_id_val.clone(),
-                        zset_id_val.clone(),
+                        zset_hash_val.clone(),
                         element_id_val.clone(),
                     ];
 
                     // Record values (without weight)
                     let record_values =
-                        vec![operator_id_val, zset_id_val, element_id_val, blob_val];
+                        vec![operator_id_val, zset_hash_val, element_id_val, blob_val];
 
                     return_and_restore_if_io!(
                         &mut self.commit_state,
@@ -1201,7 +1209,7 @@ impl IncrementalOperator for AggregateOperator {
                     self.operator_id,
                     &self.column_min_max,
                     cursors,
-                    |group_key_str| self.generate_group_rowid(group_key_str)
+                    |group_key_str| self.generate_group_hash(group_key_str)
                 )
             );
@@ -1359,7 +1367,7 @@ impl RecomputeMinMax {
             // Create storage keys for index lookup
             let storage_id =
                 generate_storage_id(operator.operator_id, storage_index, AGG_TYPE_MINMAX);
-            let zset_id = operator.generate_group_rowid(&group_key);
+            let zset_hash = operator.generate_group_hash(&group_key);
 
             // Get the values for this group from min_max_deltas
             let group_values = min_max_deltas.get(&group_key).cloned().unwrap_or_default();
@@ -1373,7 +1381,7 @@ impl RecomputeMinMax {
                     group_key.clone(),
                     column_name,
                     storage_id,
-                    zset_id,
+                    zset_hash,
                     group_values,
                 ))
             } else {
@@ -1382,7 +1390,7 @@ impl RecomputeMinMax {
                     group_key.clone(),
                     column_name,
                     storage_id,
-                    zset_id,
+                    zset_hash,
                     group_values,
                 ))
             };
@@ -1454,7 +1462,7 @@ pub enum ScanState {
         /// Storage ID for the index seek
         storage_id: i64,
         /// ZSet ID for the group
-        zset_id: i64,
+        zset_hash: Hash128,
         /// Group values from MinMaxDeltas: (column_name, HashableRow) -> weight
         group_values: HashMap<(usize, HashableRow), isize>,
         /// Whether we're looking for MIN (true) or MAX (false)
@@ -1470,7 +1478,7 @@ pub enum ScanState {
         /// Storage ID for the index seek
         storage_id: i64,
         /// ZSet ID for the group
-        zset_id: i64,
+        zset_hash: Hash128,
         /// Group values from MinMaxDeltas: (column_name, HashableRow) -> weight
         group_values: HashMap<(usize, HashableRow), isize>,
         /// Whether we're looking for MIN (true) or MAX (false)
@@ -1488,7 +1496,7 @@ impl ScanState {
         group_key: String,
         column_name: usize,
         storage_id: i64,
-        zset_id: i64,
+        zset_hash: Hash128,
         group_values: HashMap<(usize, HashableRow), isize>,
     ) -> Self {
         Self::CheckCandidate {
@@ -1496,7 +1504,7 @@ impl ScanState {
             group_key,
             column_name,
             storage_id,
-            zset_id,
+            zset_hash,
             group_values,
             is_min: true,
         }
@@ -1510,7 +1518,7 @@ impl ScanState {
         index_record: &ImmutableRecord,
         seek_op: SeekOp,
         storage_id: i64,
-        zset_id: i64,
+        zset_hash: Hash128,
     ) -> Result>> {
         let seek_result = return_if_io!(cursors
             .index_cursor
             .seek(SeekKey::IndexKey(index_record), seek_op));
@@ -1533,15 +1541,26 @@ impl ScanState {
         let Some(rec_storage_id) = values.first() else {
             return Ok(IOResult::Done(None));
         };
-        let Some(rec_zset_id) = values.get(1) else {
+        let Some(rec_zset_hash) = values.get(1) else {
             return Ok(IOResult::Done(None));
         };
 
         // Check if we're still in the same group
-        if let (RefValue::Integer(rec_sid), RefValue::Integer(rec_zid)) =
-            (rec_storage_id, rec_zset_id)
-        {
-            if *rec_sid != storage_id || *rec_zid != zset_id {
+        if let RefValue::Integer(rec_sid) = rec_storage_id {
+            if *rec_sid != storage_id {
+                return Ok(IOResult::Done(None));
+            }
+        } else {
+            return Ok(IOResult::Done(None));
+        }
+
+        // Compare zset_hash as blob
+        if let RefValue::Blob(rec_zset_blob) = rec_zset_hash {
+            if let Some(rec_hash) = Hash128::from_blob(rec_zset_blob.to_slice()) {
+                if rec_hash != zset_hash {
+                    return Ok(IOResult::Done(None));
+                }
+            } else {
                 return Ok(IOResult::Done(None));
             }
         } else {
@@ -1557,7 +1576,7 @@ impl ScanState {
         group_key: String,
         column_name: usize,
         storage_id: i64,
-        zset_id: i64,
+        zset_hash: Hash128,
         group_values: HashMap<(usize, HashableRow), isize>,
     ) -> Self {
         Self::CheckCandidate {
@@ -1565,7 +1584,7 @@ impl ScanState {
             group_key,
             column_name,
             storage_id,
-            zset_id,
+            zset_hash,
             group_values,
             is_min: false,
         }
@@ -1582,7 +1601,7 @@ impl ScanState {
                 group_key,
                 column_name,
                 storage_id,
-                zset_id,
+                zset_hash,
                 group_values,
                 is_min,
             } => {
@@ -1602,7 +1621,7 @@ impl ScanState {
                     group_key: std::mem::take(group_key),
                     column_name: std::mem::take(column_name),
                     storage_id: *storage_id,
-                    zset_id: *zset_id,
+                    zset_hash: *zset_hash,
                     group_values: std::mem::take(group_values),
                     is_min: *is_min,
                 };
@@ -1664,7 +1683,7 @@ impl ScanState {
                 group_key,
                 column_name,
                 storage_id,
-                zset_id,
+                zset_hash,
                 group_values,
                 is_min,
             } => {
                 // Seek to the next value in the index
                 let index_key = vec![
                     Value::Integer(*storage_id),
                     zset_hash.to_value(),
                     current_candidate.clone(),
                 ];
                 let index_record = ImmutableRecord::from_values(&index_key, index_key.len());
@@ -1687,7 +1706,7 @@ impl ScanState {
                     &index_record,
                     seek_op,
                     *storage_id,
-                    *zset_id
+                    *zset_hash
                 ));
 
                 *self = ScanState::CheckCandidate {
@@ -1695,7 +1714,7 @@ impl ScanState {
                     group_key: std::mem::take(group_key),
                     column_name: std::mem::take(column_name),
                     storage_id: *storage_id,
-                    zset_id: *zset_id,
+                    zset_hash: *zset_hash,
                     group_values: std::mem::take(group_values),
                     is_min: *is_min,
                 };
@@ -1749,7 +1768,7 @@ impl MinMaxPersistState {
         operator_id: usize,
         column_min_max: &HashMap,
         cursors: &mut DbspStateCursors,
-        generate_group_rowid: impl Fn(&str) -> i64,
+        generate_group_hash: impl Fn(&str) -> Hash128,
     ) -> Result> {
         loop {
             match self {
@@ -1835,7 +1854,7 @@ impl MinMaxPersistState {
                     // Build the key components for MinMax storage using new encoding
                     let storage_id =
                         generate_storage_id(operator_id, column_index, AGG_TYPE_MINMAX);
-                    let zset_id = generate_group_rowid(group_key_str);
+                    let zset_hash = generate_group_hash(group_key_str);
 
                     // element_id is the actual value for Min/Max
                     let element_id_val = value.clone();
@@ -1843,15 +1862,15 @@ impl MinMaxPersistState {
                     // Create index key
                     let index_key = vec![
                         Value::Integer(storage_id),
-                        Value::Integer(zset_id),
+                        zset_hash.to_value(),
                         element_id_val.clone(),
                     ];
 
-                    // Record values (operator_id, zset_id, element_id, unused_placeholder)
+                    // Record values (operator_id, zset_hash, element_id, unused_placeholder)
                     // For MIN/MAX, the element_id IS the value, so we use NULL for the 4th column
                     let record_values = vec![
                         Value::Integer(storage_id),
-                        Value::Integer(zset_id),
+                        zset_hash.to_value(),
                         element_id_val.clone(),
                         Value::Null, // Placeholder - not used for MIN/MAX
                     ];
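Taken together, the aggregate state keys now look like this (a sketch;
operator_id 7, column index 2, the group string, and the value 42 are
made-up example inputs):

    // Regular aggregate state: element_id is a fixed integer 0.
    let storage_id = generate_storage_id(7, 0, AGG_TYPE_REGULAR);
    let zset_hash = Hash128::hash_str("dept|engineering");
    let regular_key = vec![
        Value::Integer(storage_id), // packed (operator, column 0, AGG_TYPE_REGULAR)
        zset_hash.to_value(),       // 16-byte BLOB instead of the old i64
        Value::Integer(0),          // always 0 for regular aggregates
    ];

    // MIN/MAX state: element_id is the aggregated value itself, so the
    // per-group values stay ordered in the index and can be scanned.
    let minmax_id = generate_storage_id(7, 2, AGG_TYPE_MINMAX);
    let minmax_key = vec![
        Value::Integer(minmax_id),
        zset_hash.to_value(),
        Value::Integer(42), // the candidate MIN/MAX value
    ];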
diff --git a/core/incremental/dbsp.rs b/core/incremental/dbsp.rs
index eeab315d3..a5708518d 100644
--- a/core/incremental/dbsp.rs
+++ b/core/incremental/dbsp.rs
@@ -2,10 +2,121 @@
 // For now, we'll use a basic approach and can expand to full DBSP later
 
 use crate::Value;
-use std::collections::hash_map::DefaultHasher;
 use std::collections::{BTreeMap, HashMap};
 use std::hash::{Hash, Hasher};
 
+/// A 128-bit hash value implemented as a UUID
+/// We use UUID because it's a standard 128-bit type we already depend on
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct Hash128 {
+    // Store as a UUID internally for an efficient 128-bit representation
+    uuid: uuid::Uuid,
+}
+
+impl Hash128 {
+    /// Create a new 128-bit hash from high and low 64-bit parts
+    pub fn new(high: u64, low: u64) -> Self {
+        // Convert two u64 values to UUID bytes (big-endian)
+        let mut bytes = [0u8; 16];
+        bytes[0..8].copy_from_slice(&high.to_be_bytes());
+        bytes[8..16].copy_from_slice(&low.to_be_bytes());
+        Self {
+            uuid: uuid::Uuid::from_bytes(bytes),
+        }
+    }
+
+    /// Get the low 64 bits as i64 (for when we need a rowid)
+    pub fn as_i64(&self) -> i64 {
+        let bytes = self.uuid.as_bytes();
+        let low = u64::from_be_bytes([
+            bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15],
+        ]);
+        low as i64
+    }
+
+    /// Compute a 128-bit hash of the given values.
+    /// We serialize the values to a string representation and use UUID v5
+    /// (SHA-1 based) to get a deterministic 128-bit hash.
+    pub fn hash_values(values: &[Value]) -> Self {
+        // Build a string representation of all values.
+        // Use a delimiter that won't appear in normal values.
+        let mut s = String::new();
+        for (i, value) in values.iter().enumerate() {
+            if i > 0 {
+                s.push('\x00'); // null byte as delimiter
+            }
+            // Add a type prefix to distinguish between types
+            match value {
+                Value::Null => s.push_str("N:"),
+                Value::Integer(n) => {
+                    s.push_str("I:");
+                    s.push_str(&n.to_string());
+                }
+                Value::Float(f) => {
+                    s.push_str("F:");
+                    // Use to_bits to ensure a consistent representation
+                    s.push_str(&f.to_bits().to_string());
+                }
+                Value::Text(t) => {
+                    s.push_str("T:");
+                    s.push_str(t.as_str());
+                }
+                Value::Blob(b) => {
+                    s.push_str("B:");
+                    s.push_str(&hex::encode(b));
+                }
+            }
+        }
+
+        Self::hash_str(&s)
+    }
+
+    /// Hash a string value to 128 bits using UUID v5
+    pub fn hash_str(s: &str) -> Self {
+        // Use UUID v5 with a fixed namespace to get deterministic 128-bit hashes.
+        // We use the DNS namespace as it's a standard choice.
+        let uuid = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_DNS, s.as_bytes());
+        Self { uuid }
+    }
+
+    /// Convert to a big-endian byte array for storage
+    pub fn to_blob(self) -> Vec<u8> {
+        self.uuid.as_bytes().to_vec()
+    }
+
+    /// Create from a big-endian byte array
+    pub fn from_blob(bytes: &[u8]) -> Option<Self> {
+        if bytes.len() != 16 {
+            return None;
+        }
+
+        let mut uuid_bytes = [0u8; 16];
+        uuid_bytes.copy_from_slice(bytes);
+        Some(Self {
+            uuid: uuid::Uuid::from_bytes(uuid_bytes),
+        })
+    }
+
+    /// Convert to a Value::Blob for storage
+    pub fn to_value(self) -> Value {
+        Value::Blob(self.to_blob())
+    }
+
+    /// Try to extract a Hash128 from a Value
+    pub fn from_value(value: &Value) -> Option<Self> {
+        match value {
+            Value::Blob(b) => Self::from_blob(b),
+            _ => None,
+        }
+    }
+}
+
+impl std::fmt::Display for Hash128 {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.uuid)
+    }
+}
+
 // The DBSP paper uses as a key the whole record, with both the row key and the values. This is a
 // bit confusing for us in databases, because when you say "key", it is easy to understand that as
 // being the row key.
@@ -30,7 +141,7 @@ pub struct HashableRow {
     pub values: Vec<Value>,
     // Pre-computed hash: DBSP rows are immutable and frequently hashed during joins,
     // making caching worthwhile despite the memory overhead
-    cached_hash: u64,
+    cached_hash: Hash128,
 }
 
 impl HashableRow {
@@ -43,47 +154,23 @@ impl HashableRow {
         }
     }
 
-    fn compute_hash(rowid: i64, values: &[Value]) -> u64 {
-        let mut hasher = DefaultHasher::new();
-
-        rowid.hash(&mut hasher);
-
-        for value in values {
-            match value {
-                Value::Null => {
-                    0u8.hash(&mut hasher);
-                }
-                Value::Integer(i) => {
-                    1u8.hash(&mut hasher);
-                    i.hash(&mut hasher);
-                }
-                Value::Float(f) => {
-                    2u8.hash(&mut hasher);
-                    f.to_bits().hash(&mut hasher);
-                }
-                Value::Text(s) => {
-                    3u8.hash(&mut hasher);
-                    s.value.hash(&mut hasher);
-                    (s.subtype as u8).hash(&mut hasher);
-                }
-                Value::Blob(b) => {
-                    4u8.hash(&mut hasher);
-                    b.hash(&mut hasher);
-                }
-            }
-        }
-
-        hasher.finish()
+    fn compute_hash(rowid: i64, values: &[Value]) -> Hash128 {
+        // Include the rowid in the hash by prepending it to the values
+        let mut all_values = Vec::with_capacity(values.len() + 1);
+        all_values.push(Value::Integer(rowid));
+        all_values.extend_from_slice(values);
+        Hash128::hash_values(&all_values)
    }
 
-    pub fn cached_hash(&self) -> u64 {
+    pub fn cached_hash(&self) -> Hash128 {
        self.cached_hash
    }
 }
 
 impl Hash for HashableRow {
     fn hash<H: Hasher>(&self, state: &mut H) {
-        self.cached_hash.hash(state);
+        // Hash the 128-bit value via its 16-byte blob representation
+        self.cached_hash.to_blob().hash(state);
     }
 }
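A quick property sketch of the new type (a hypothetical test mirroring
the API above, not part of the patch):

    #[test]
    fn hash128_round_trips_through_storage() {
        let h = Hash128::hash_str("group:42");
        assert_eq!(h, Hash128::hash_str("group:42")); // deterministic (UUID v5)

        let blob = h.to_blob(); // 16-byte big-endian representation
        assert_eq!(blob.len(), 16);
        assert_eq!(Hash128::from_blob(&blob), Some(h));
        assert_eq!(Hash128::from_value(&h.to_value()), Some(h));

        let _rowid: i64 = h.as_i64(); // low 64 bits, for rowid contexts only
    }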
diff --git a/core/incremental/join_operator.rs b/core/incremental/join_operator.rs
index d0b799a8d..f008a51c7 100644
--- a/core/incremental/join_operator.rs
+++ b/core/incremental/join_operator.rs
@@ -1,5 +1,6 @@
 #![allow(dead_code)]
 
+use crate::incremental::dbsp::Hash128;
 use crate::incremental::dbsp::{Delta, DeltaPair, HashableRow};
 use crate::incremental::operator::{
     generate_storage_id, ComputationTracker, DbspStateCursors, EvalState, IncrementalOperator,
@@ -22,23 +23,39 @@ pub enum JoinType {
 fn read_next_join_row(
     storage_id: i64,
     join_key: &HashableRow,
-    last_element_id: i64,
+    last_element_hash: Option<Hash128>,
     cursors: &mut DbspStateCursors,
-) -> Result<IOResult<Option<(i64, HashableRow, isize)>>> {
+) -> Result<IOResult<Option<(Hash128, HashableRow, isize)>>> {
     // Build the index key: (storage_id, zset_id, element_id)
     // zset_id is the hash of the join key
-    let zset_id = join_key.cached_hash() as i64;
+    let zset_hash = join_key.cached_hash();
 
-    let index_key_values = vec![
-        Value::Integer(storage_id),
-        Value::Integer(zset_id),
-        Value::Integer(last_element_id),
-    ];
+    // For iteration, use the last element hash if we have one, or NULL to start
+    let index_key_values = match last_element_hash {
+        Some(last_hash) => vec![
+            Value::Integer(storage_id),
+            zset_hash.to_value(),
+            last_hash.to_value(),
+        ],
+        None => vec![
+            Value::Integer(storage_id),
+            zset_hash.to_value(),
+            Value::Null, // Start iteration from the beginning
+        ],
+    };
 
     let index_record = ImmutableRecord::from_values(&index_key_values, index_key_values.len());
+
+    // Use GE (>=) for the initial seek with NULL, GT (>) for continuation
+    let seek_op = if last_element_hash.is_none() {
+        SeekOp::GE { eq_only: false }
+    } else {
+        SeekOp::GT
+    };
+
     let seek_result = return_if_io!(cursors
         .index_cursor
-        .seek(SeekKey::IndexKey(&index_record), SeekOp::GT));
+        .seek(SeekKey::IndexKey(&index_record), seek_op));
 
     if !matches!(seek_result, SeekResult::Found) {
         return Ok(IOResult::Done(None));
@@ -48,7 +65,7 @@ fn read_next_join_row(
     let current_record = return_if_io!(cursors.index_cursor.record());
 
     // Extract all needed values from the record before dropping it
-    let (found_storage_id, found_zset_id, element_id) = if let Some(rec) = current_record {
+    let (found_storage_id, found_zset_hash, element_hash) = if let Some(rec) = current_record {
         let values = rec.get_values();
 
         // Index has 4 values: storage_id, zset_id, element_id, rowid (appended by WriteRow)
@@ -57,17 +74,21 @@ fn read_next_join_row(
             Value::Integer(id) => *id,
             _ => return Ok(IOResult::Done(None)),
         };
-        let found_zset_id = match &values[1].to_owned() {
-            Value::Integer(id) => *id,
+        let found_zset_hash = match &values[1].to_owned() {
+            Value::Blob(blob) => Hash128::from_blob(blob).ok_or_else(|| {
+                crate::LimboError::InternalError("Invalid zset_hash blob".to_string())
+            })?,
             _ => return Ok(IOResult::Done(None)),
         };
-        let element_id = match &values[2].to_owned() {
-            Value::Integer(id) => *id,
+        let element_hash = match &values[2].to_owned() {
+            Value::Blob(blob) => Hash128::from_blob(blob).ok_or_else(|| {
+                crate::LimboError::InternalError("Invalid element_hash blob".to_string())
+            })?,
             _ => {
                 return Ok(IOResult::Done(None));
             }
         };
-        (found_storage_id, found_zset_id, element_id)
+        (found_storage_id, found_zset_hash, element_hash)
     } else {
         return Ok(IOResult::Done(None));
     }
@@ -77,7 +98,7 @@ fn read_next_join_row(
     // Now we can safely check if we're in the right range.
     // If we've moved to a different storage_id or zset_id, we're done.
-    if found_storage_id != storage_id || found_zset_id != zset_id {
+    if found_storage_id != storage_id || found_zset_hash != zset_hash {
         return Ok(IOResult::Done(None));
     }
@@ -109,7 +130,7 @@ fn read_next_join_row(
         _ => return Ok(IOResult::Done(None)),
     };
 
-    return Ok(IOResult::Done(Some((element_id, row, weight))));
+    return Ok(IOResult::Done(Some((element_hash, row, weight))));
 }
 }
 }
@@ -127,13 +148,13 @@ pub enum JoinEvalState {
         deltas: DeltaPair,
         output: Delta,
         current_idx: usize,
-        last_row_scanned: i64,
+        last_row_scanned: Option<Hash128>,
     },
     ProcessRightJoin {
         deltas: DeltaPair,
         output: Delta,
         current_idx: usize,
-        last_row_scanned: i64,
+        last_row_scanned: Option<Hash128>,
     },
     Done {
         output: Delta,
@@ -151,9 +172,9 @@ impl JoinEvalState {
         // Combine the rows
         let mut combined_values = left_row.values.clone();
         combined_values.extend(right_row.values.clone());
-        // Use hash of the combined values as rowid to ensure uniqueness
+        // Use the hash of the combined values as a synthetic rowid
         let temp_row = HashableRow::new(0, combined_values.clone());
-        let joined_rowid = temp_row.cached_hash() as i64;
+        let joined_rowid = temp_row.cached_hash().as_i64();
         let joined_row = HashableRow::new(joined_rowid, combined_values);
 
         // Add to output with combined weight
@@ -177,7 +198,7 @@ impl JoinEvalState {
                     deltas: std::mem::take(deltas),
                     output: std::mem::take(output),
                     current_idx: 0,
-                    last_row_scanned: i64::MIN,
+                    last_row_scanned: None,
                 };
             }
             JoinEvalState::ProcessLeftJoin {
@@ -191,7 +212,7 @@ impl JoinEvalState {
                         deltas: std::mem::take(deltas),
                         output: std::mem::take(output),
                         current_idx: 0,
-                        last_row_scanned: i64::MIN,
+                        last_row_scanned: None,
                     };
                 } else {
                     let (left_row, left_weight) = &deltas.left.changes[*current_idx];
@@ -209,7 +230,7 @@ impl JoinEvalState {
                         cursors
                     ));
                     match next_row {
-                        Some((element_id, right_row, right_weight)) => {
+                        Some((element_hash, right_row, right_weight)) => {
                             Self::combine_rows(
                                 left_row,
                                 (*left_weight) as i64,
@@ -222,7 +243,7 @@ impl JoinEvalState {
                                 deltas: std::mem::take(deltas),
                                 output: std::mem::take(output),
                                 current_idx: *current_idx,
-                                last_row_scanned: element_id,
+                                last_row_scanned: Some(element_hash),
                             };
                         }
                         None => {
@@ -231,7 +252,7 @@ impl JoinEvalState {
                                 deltas: std::mem::take(deltas),
                                 output: std::mem::take(output),
                                 current_idx: *current_idx + 1,
-                                last_row_scanned: i64::MIN,
+                                last_row_scanned: None,
                             };
                         }
                     }
@@ -263,7 +284,7 @@ impl JoinEvalState {
                        cursors
                    ));
                    match next_row {
-                        Some((element_id, left_row, left_weight)) => {
+                        Some((element_hash, left_row, left_weight)) => {
                            Self::combine_rows(
                                &left_row,
                                left_weight as i64,
@@ -276,7 +297,7 @@ impl JoinEvalState {
                                deltas: std::mem::take(deltas),
                                output: std::mem::take(output),
                                current_idx: *current_idx,
-                                last_row_scanned: element_id,
+                                last_row_scanned: Some(element_hash),
                            };
                        }
                        None => {
@@ -285,7 +306,7 @@ impl JoinEvalState {
                                deltas: std::mem::take(deltas),
                                output: std::mem::take(output),
                                current_idx: *current_idx + 1,
-                                last_row_scanned: i64::MIN,
+                                last_row_scanned: None,
                            };
                        }
                    }
@@ -376,7 +397,7 @@ impl JoinOperator {
             JoinType::Inner => {} // Inner join is supported
         }
 
-        Ok(Self {
+        let result = Self {
             operator_id,
             join_type,
             left_key_indices,
@@ -385,7 +406,8 @@ impl JoinOperator {
             right_columns,
             tracker: None,
             commit_state: JoinCommitState::Idle,
-        })
+        };
+        Ok(result)
     }
 
     /// Extract join key from row values using the specified indices
@@ -485,8 +507,9 @@ impl JoinOperator {
         // Create the joined row with a unique rowid.
         // Use hash of the combined values to ensure uniqueness
+        // Use the hash of the combined values as a synthetic rowid
         let temp_row = HashableRow::new(0, combined_values.clone());
-        let joined_rowid = temp_row.cached_hash() as i64;
+        let joined_rowid = temp_row.cached_hash().as_i64();
 
         let joined_row = HashableRow::new(joined_rowid, combined_values.clone());
@@ -617,20 +640,20 @@ impl IncrementalOperator for JoinOperator {
                     // The index key: (storage_id, zset_id, element_id)
                     // zset_id is the hash of the join key, element_id is the hash of the row
                     let storage_id = self.left_storage_id();
-                    let zset_id = join_key.cached_hash() as i64;
-                    let element_id = row.cached_hash() as i64;
+                    let zset_hash = join_key.cached_hash();
+                    let element_hash = row.cached_hash();
                     let index_key = vec![
                         Value::Integer(storage_id),
-                        Value::Integer(zset_id),
-                        Value::Integer(element_id),
+                        zset_hash.to_value(),
+                        element_hash.to_value(),
                     ];
 
                     // The record values: we'll store the serialized row as a blob
                     let row_blob = serialize_hashable_row(row);
                     let record_values = vec![
                         Value::Integer(self.left_storage_id()),
-                        Value::Integer(join_key.cached_hash() as i64),
-                        Value::Integer(row.cached_hash() as i64),
+                        zset_hash.to_value(),
+                        element_hash.to_value(),
                         Value::Blob(row_blob),
                     ];
@@ -665,18 +688,20 @@ impl IncrementalOperator for JoinOperator {
                     let join_key = self.extract_join_key(&row.values, &self.right_key_indices);
 
                     // The index key: (storage_id, zset_id, element_id)
+                    let zset_hash = join_key.cached_hash();
+                    let element_hash = row.cached_hash();
                     let index_key = vec![
                         Value::Integer(self.right_storage_id()),
-                        Value::Integer(join_key.cached_hash() as i64),
-                        Value::Integer(row.cached_hash() as i64),
+                        zset_hash.to_value(),
+                        element_hash.to_value(),
                     ];
 
                     // The record values: we'll store the serialized row as a blob
                     let row_blob = serialize_hashable_row(row);
                     let record_values = vec![
                         Value::Integer(self.right_storage_id()),
-                        Value::Integer(join_key.cached_hash() as i64),
-                        Value::Integer(row.cached_hash() as i64),
+                        zset_hash.to_value(),
+                        element_hash.to_value(),
                         Value::Blob(row_blob),
                     ];
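The None/GE-then-Some/GT switch in read_next_join_row is the load-bearing
change here: hashes have no natural i64::MIN starting point, so the first
probe seeks GE on (storage_id, zset_hash, NULL), relying on NULL sorting
before every BLOB in SQLite's type ordering to land on the first row of
the zset. A minimal sketch of the resume rule (types as in the hunks
above; the helper name is hypothetical):

    fn seek_op_for(last_element_hash: Option<Hash128>) -> SeekOp {
        match last_element_hash {
            None => SeekOp::GE { eq_only: false }, // first probe: start of the zset
            Some(_) => SeekOp::GT,                 // resume strictly after the last row
        }
    }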
diff --git a/core/incremental/merge_operator.rs b/core/incremental/merge_operator.rs
index c8547028f..2ce717c26 100644
--- a/core/incremental/merge_operator.rs
+++ b/core/incremental/merge_operator.rs
@@ -57,7 +57,7 @@ impl MergeOperator {
         for (row, weight) in delta.changes {
             // Hash only the values (not rowid) for deduplication
             let temp_row = HashableRow::new(0, row.values.clone());
-            let value_hash = temp_row.cached_hash();
+            let value_hash = temp_row.cached_hash().as_i64() as u64;
 
             // Check if we've seen this value before
             let assigned_rowid =
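One spot intentionally stays at 64 bits: merge deduplication feeds the
hash into a rowid, which must fit in an i64, so it truncates via as_i64.
A sketch of that reduced-width path (a hypothetical helper; types as in
the hunk above):

    fn dedup_key(row_values: Vec<Value>) -> u64 {
        let temp_row = HashableRow::new(0, row_values); // rowid pinned to 0: hash values only
        temp_row.cached_hash().as_i64() as u64          // truncate: rowids must fit in i64
    }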
diff --git a/core/incremental/operator.rs b/core/incremental/operator.rs
index 0dec705f9..afabd063f 100644
--- a/core/incremental/operator.rs
+++ b/core/incremental/operator.rs
@@ -342,7 +342,6 @@ mod tests {
         let group_key_str = AggregateOperator::group_key_to_string(&group_key);
         let rowid = agg.generate_group_rowid(&group_key_str);
-
         let output_row = HashableRow::new(rowid, output_values);
         result.changes.push((output_row, 1));
     }
@@ -2929,7 +2928,6 @@ mod tests {
         let index_cursor =
             BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10);
         let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
-
         let mut join = JoinOperator::new(
             1, // operator_id
             JoinType::Inner,
@@ -2945,7 +2943,6 @@ mod tests {
         left_delta.insert(1, vec![Value::Integer(1), Value::Float(100.0)]);
         left_delta.insert(2, vec![Value::Integer(2), Value::Float(200.0)]);
         left_delta.insert(3, vec![Value::Integer(3), Value::Float(300.0)]); // No match initially
-
         let mut right_delta = Delta::new();
         right_delta.insert(1, vec![Value::Integer(1), Value::Text("Alice".into())]);
         right_delta.insert(2, vec![Value::Integer(2), Value::Text("Bob".into())]);
diff --git a/core/translate/view.rs b/core/translate/view.rs
index 9ff8e6c89..7f1fb23b2 100644
--- a/core/translate/view.rs
+++ b/core/translate/view.rs
@@ -149,8 +149,8 @@ pub fn translate_create_materialized_view(
     let dbsp_sql = format!(
         "CREATE TABLE {dbsp_table_name} (\
          operator_id INTEGER NOT NULL, \
-         zset_id INTEGER NOT NULL, \
-         element_id NOT NULL, \
+         zset_id BLOB NOT NULL, \
+         element_id BLOB NOT NULL, \
          value BLOB, \
          weight INTEGER NOT NULL, \
          PRIMARY KEY (operator_id, zset_id, element_id)\