diff --git a/core/incremental/cursor.rs b/core/incremental/cursor.rs index 1e0e91af8..7bd5d116a 100644 --- a/core/incremental/cursor.rs +++ b/core/incremental/cursor.rs @@ -1,8 +1,7 @@ use crate::{ incremental::{ compiler::{DeltaSet, ExecuteState}, - dbsp::{Delta, RowKeyZSet}, - hashable_row::HashableRow, + dbsp::{Delta, HashableRow, RowKeyZSet}, view::{IncrementalView, ViewTransactionState}, }, return_if_io, diff --git a/core/incremental/dbsp.rs b/core/incremental/dbsp.rs index 607fd562b..935a79146 100644 --- a/core/incremental/dbsp.rs +++ b/core/incremental/dbsp.rs @@ -1,9 +1,107 @@ // Simplified DBSP integration for incremental view maintenance // For now, we'll use a basic approach and can expand to full DBSP later -use super::hashable_row::HashableRow; use crate::Value; +use std::collections::hash_map::DefaultHasher; use std::collections::{BTreeMap, HashMap}; +use std::hash::{Hash, Hasher}; + +// The DBSP paper uses as a key the whole record, with both the row key and the values. This is a +// bit confuses for us in databases, because when you say "key", it is easy to understand that as +// being the row key. +// +// Empirically speaking, using row keys as the ZSet keys will waste a competent but not brilliant +// engineer around 82 and 88 hours, depending on how you count. Hours that are never coming back. +// +// One of the situations in which using row keys completely breaks are table updates. If the "key" +// is the row key, let's say "5", then an update is a delete + insert. Imagine a table that had k = +// 5, v = 5, and a view that filters v > 2. +// +// Now we will do an update that changes v => 1. If the "key" is 5, then inside the Delta set, we +// will have (5, weight = -1), (5, weight = +1), and the whole thing just disappears. The Delta +// set, therefore, has to contain ((5, 5), weight = -1), ((5, 1), weight = +1). +// +// It is theoretically possible to use the rowkey in the ZSet and then use a hash of key -> +// Vec(changes) in the Delta set. 
But deviating from the paper here is just asking for trouble, as +// I am sure it would break somewhere else. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct HashableRow { + pub rowid: i64, + pub values: Vec<Value>, + // Pre-computed hash: DBSP rows are immutable and frequently hashed during joins, + // making caching worthwhile despite the memory overhead + cached_hash: u64, +} + +impl HashableRow { + pub fn new(rowid: i64, values: Vec<Value>) -> Self { + let cached_hash = Self::compute_hash(rowid, &values); + Self { + rowid, + values, + cached_hash, + } + } + + fn compute_hash(rowid: i64, values: &[Value]) -> u64 { + let mut hasher = DefaultHasher::new(); + + rowid.hash(&mut hasher); + + for value in values { + match value { + Value::Null => { + 0u8.hash(&mut hasher); + } + Value::Integer(i) => { + 1u8.hash(&mut hasher); + i.hash(&mut hasher); + } + Value::Float(f) => { + 2u8.hash(&mut hasher); + f.to_bits().hash(&mut hasher); + } + Value::Text(s) => { + 3u8.hash(&mut hasher); + s.value.hash(&mut hasher); + (s.subtype as u8).hash(&mut hasher); + } + Value::Blob(b) => { + 4u8.hash(&mut hasher); + b.hash(&mut hasher); + } + } + } + + hasher.finish() + } +} + +impl Hash for HashableRow { + fn hash<H: Hasher>(&self, state: &mut H) { + self.cached_hash.hash(state); + } +} + +impl PartialOrd for HashableRow { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for HashableRow { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // First compare by rowid, then by values if rowids are equal + // This ensures Ord is consistent with Eq (which compares all fields) + match self.rowid.cmp(&other.rowid) { + std::cmp::Ordering::Equal => { + // If rowids are equal, compare values to maintain consistency with Eq + self.values.cmp(&other.values) + } + other => other, + } + } +} type DeltaEntry = (HashableRow, isize); /// A delta represents ordered changes to data diff --git a/core/incremental/hashable_row.rs b/core/incremental/hashable_row.rs deleted file 
mode 100644 index 799f88e87..000000000 --- a/core/incremental/hashable_row.rs +++ /dev/null @@ -1,100 +0,0 @@ -use crate::types::Value; -use std::collections::hash_map::DefaultHasher; -use std::hash::{Hash, Hasher}; - -// The DBSP paper uses as a key the whole record, with both the row key and the values. This is a -// bit confuses for us in databases, because when you say "key", it is easy to understand that as -// being the row key. -// -// Empirically speaking, using row keys as the ZSet keys will waste a competent but not brilliant -// engineer around 82 and 88 hours, depending on how you count. Hours that are never coming back. -// -// One of the situations in which using row keys completely breaks are table updates. If the "key" -// is the row key, let's say "5", then an update is a delete + insert. Imagine a table that had k = -// 5, v = 5, and a view that filters v > 2. -// -// Now we will do an update that changes v => 1. If the "key" is 5, then inside the Delta set, we -// will have (5, weight = -1), (5, weight = +1), and the whole thing just disappears. The Delta -// set, therefore, has to contain ((5, 5), weight = -1), ((5, 1), weight = +1). -// -// It is theoretically possible to use the rowkey in the ZSet and then use a hash of key -> -// Vec(changes) in the Delta set. But deviating from the paper here is just asking for trouble, as -// I am sure it would break somewhere else. 
-#[derive(Debug, Clone, PartialEq, Eq)] -pub struct HashableRow { - pub rowid: i64, - pub values: Vec<Value>, - // Pre-computed hash: DBSP rows are immutable and frequently hashed during joins, - // making caching worthwhile despite the memory overhead - cached_hash: u64, -} - -impl HashableRow { - pub fn new(rowid: i64, values: Vec<Value>) -> Self { - let cached_hash = Self::compute_hash(rowid, &values); - Self { - rowid, - values, - cached_hash, - } - } - - fn compute_hash(rowid: i64, values: &[Value]) -> u64 { - let mut hasher = DefaultHasher::new(); - - rowid.hash(&mut hasher); - - for value in values { - match value { - Value::Null => { - 0u8.hash(&mut hasher); - } - Value::Integer(i) => { - 1u8.hash(&mut hasher); - i.hash(&mut hasher); - } - Value::Float(f) => { - 2u8.hash(&mut hasher); - f.to_bits().hash(&mut hasher); - } - Value::Text(s) => { - 3u8.hash(&mut hasher); - s.value.hash(&mut hasher); - (s.subtype as u8).hash(&mut hasher); - } - Value::Blob(b) => { - 4u8.hash(&mut hasher); - b.hash(&mut hasher); - } - } - } - - hasher.finish() - } -} - -impl Hash for HashableRow { - fn hash<H: Hasher>(&self, state: &mut H) { - self.cached_hash.hash(state); - } -} - -impl PartialOrd for HashableRow { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for HashableRow { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - // First compare by rowid, then by values if rowids are equal - // This ensures Ord is consistent with Eq (which compares all fields) - match self.rowid.cmp(&other.rowid) { - std::cmp::Ordering::Equal => { - // If rowids are equal, compare values to maintain consistency with Eq - self.values.cmp(&other.values) - } - other => other, - } - } -} diff --git a/core/incremental/mod.rs b/core/incremental/mod.rs index 285c70978..2c69e050b 100644 --- a/core/incremental/mod.rs +++ b/core/incremental/mod.rs @@ -2,7 +2,6 @@ pub mod compiler; pub mod cursor; pub mod dbsp; pub mod expr_compiler; -pub mod hashable_row; pub mod operator; pub 
mod persistence; pub mod view; diff --git a/core/incremental/operator.rs b/core/incremental/operator.rs index 8d4490826..09a212fdd 100644 --- a/core/incremental/operator.rs +++ b/core/incremental/operator.rs @@ -3,9 +3,8 @@ // Based on Feldera DBSP design but adapted for Turso's architecture use crate::function::{AggFunc, Func}; -use crate::incremental::dbsp::Delta; +use crate::incremental::dbsp::{Delta, HashableRow}; use crate::incremental::expr_compiler::CompiledExpression; -use crate::incremental::hashable_row::HashableRow; use crate::incremental::persistence::{ReadRecord, WriteRow}; use crate::storage::btree::BTreeCursor; use crate::types::{IOResult, SeekKey, Text}; @@ -210,7 +209,7 @@ impl ComputationTracker { } #[cfg(test)] -mod hashable_row_tests { +mod dbsp_types_tests { use super::*; #[test]