move hashable_row to dbsp.rs

There will be a new type for joins, so it makes less sense to have
a separate file just for it. dbsp.rs is good.
Glauber Costa
2025-09-08 07:29:38 -05:00
parent 1fd345f382
commit 6541a43670
5 changed files with 102 additions and 107 deletions

View File

@@ -1,8 +1,7 @@
 use crate::{
     incremental::{
         compiler::{DeltaSet, ExecuteState},
-        dbsp::{Delta, RowKeyZSet},
-        hashable_row::HashableRow,
+        dbsp::{Delta, HashableRow, RowKeyZSet},
         view::{IncrementalView, ViewTransactionState},
     },
     return_if_io,

View File

@@ -1,9 +1,107 @@
 // Simplified DBSP integration for incremental view maintenance
 // For now, we'll use a basic approach and can expand to full DBSP later
-use super::hashable_row::HashableRow;
 use crate::Value;
+use std::collections::hash_map::DefaultHasher;
 use std::collections::{BTreeMap, HashMap};
+use std::hash::{Hash, Hasher};
 
+// The DBSP paper uses as a key the whole record, with both the row key and the values. This is a
+// bit confusing for us in databases, because when you say "key", it is easy to understand that as
+// being the row key.
+//
+// Empirically speaking, using row keys as the ZSet keys will waste a competent but not brilliant
+// engineer between 82 and 88 hours, depending on how you count. Hours that are never coming back.
+//
+// One of the situations in which using row keys completely breaks is table updates. If the "key"
+// is the row key, let's say "5", then an update is a delete + insert. Imagine a table that had k =
+// 5, v = 5, and a view that filters v > 2.
+//
+// Now we will do an update that changes v => 1. If the "key" is 5, then inside the Delta set, we
+// will have (5, weight = -1), (5, weight = +1), and the whole thing just disappears. The Delta
+// set, therefore, has to contain ((5, 5), weight = -1), ((5, 1), weight = +1).
+//
+// It is theoretically possible to use the row key in the ZSet and then use a hash of key ->
+// Vec(changes) in the Delta set. But deviating from the paper here is just asking for trouble, as
+// I am sure it would break somewhere else.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct HashableRow {
+    pub rowid: i64,
+    pub values: Vec<Value>,
+    // Pre-computed hash: DBSP rows are immutable and frequently hashed during joins,
+    // making caching worthwhile despite the memory overhead
+    cached_hash: u64,
+}
+
+impl HashableRow {
+    pub fn new(rowid: i64, values: Vec<Value>) -> Self {
+        let cached_hash = Self::compute_hash(rowid, &values);
+        Self {
+            rowid,
+            values,
+            cached_hash,
+        }
+    }
+
+    fn compute_hash(rowid: i64, values: &[Value]) -> u64 {
+        let mut hasher = DefaultHasher::new();
+        rowid.hash(&mut hasher);
+        for value in values {
+            match value {
+                Value::Null => {
+                    0u8.hash(&mut hasher);
+                }
+                Value::Integer(i) => {
+                    1u8.hash(&mut hasher);
+                    i.hash(&mut hasher);
+                }
+                Value::Float(f) => {
+                    2u8.hash(&mut hasher);
+                    f.to_bits().hash(&mut hasher);
+                }
+                Value::Text(s) => {
+                    3u8.hash(&mut hasher);
+                    s.value.hash(&mut hasher);
+                    (s.subtype as u8).hash(&mut hasher);
+                }
+                Value::Blob(b) => {
+                    4u8.hash(&mut hasher);
+                    b.hash(&mut hasher);
+                }
+            }
+        }
+        hasher.finish()
+    }
+}
+
+impl Hash for HashableRow {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.cached_hash.hash(state);
+    }
+}
+
+impl PartialOrd for HashableRow {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for HashableRow {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        // First compare by rowid, then by values if rowids are equal
+        // This ensures Ord is consistent with Eq (which compares all fields)
+        match self.rowid.cmp(&other.rowid) {
+            std::cmp::Ordering::Equal => {
+                // If rowids are equal, compare values to maintain consistency with Eq
+                self.values.cmp(&other.values)
+            }
+            other => other,
+        }
+    }
+}
+
 type DeltaEntry = (HashableRow, isize);
 
 /// A delta represents ordered changes to data
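The comment above is the crux of the design being moved. A minimal, self-contained sketch of the cancellation it describes, using plain std maps and hypothetical names (by_rowid, by_row) rather than the crate's Delta/ZSet types: keyed by rowid alone, the update's -1 and +1 weights land on the same key and annihilate; keyed by the full record, both changes survive.

use std::collections::HashMap;

fn main() {
    // UPDATE t SET v = 1 WHERE k = 5 arrives as delete (5, 5) + insert (5, 1).
    // Keyed by rowid alone, both weights land on key 5 and cancel:
    let mut by_rowid: HashMap<i64, isize> = HashMap::new();
    *by_rowid.entry(5).or_insert(0) -= 1; // delete the old row
    *by_rowid.entry(5).or_insert(0) += 1; // insert the new row
    by_rowid.retain(|_, w| *w != 0);
    assert!(by_rowid.is_empty()); // the update vanished entirely

    // Keyed by the full (rowid, value) record, as the DBSP paper prescribes,
    // both sides of the update are preserved:
    let mut by_row: HashMap<(i64, i64), isize> = HashMap::new();
    *by_row.entry((5, 5)).or_insert(0) -= 1; // old row leaves the view
    *by_row.entry((5, 1)).or_insert(0) += 1; // new row arrives (v = 1 fails v > 2)
    by_row.retain(|_, w| *w != 0);
    assert_eq!(by_row.len(), 2); // the view can drop (5, 5) and skip (5, 1)
}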

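The Ord tie-break on values matters for the same reason in ordered containers (the BTreeMap-backed Z-sets). A standalone sketch with a simplified stand-in type, not the real HashableRow, showing what a rowid-only ordering would break; the derived Ord here compares rowid first, then values, matching the hand-written impl above.

use std::collections::BTreeSet;

// Stand-in for HashableRow: fields compare in declaration order,
// so Ord stays consistent with the derived Eq.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct Row {
    rowid: i64,
    values: Vec<i64>,
}

fn main() {
    let mut set = BTreeSet::new();
    set.insert(Row { rowid: 5, values: vec![5] }); // old version of the row
    set.insert(Row { rowid: 5, values: vec![1] }); // new version of the row
    // Under a rowid-only Ord, the second insert would compare Equal to the
    // first and be silently discarded even though Eq says the rows differ;
    // the values tie-break keeps both, consistent with Eq.
    assert_eq!(set.len(), 2);
}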
View File

@@ -1,100 +0,0 @@
-use crate::types::Value;
-use std::collections::hash_map::DefaultHasher;
-use std::hash::{Hash, Hasher};
-
-// The DBSP paper uses as a key the whole record, with both the row key and the values. This is a
-// bit confusing for us in databases, because when you say "key", it is easy to understand that as
-// being the row key.
-//
-// Empirically speaking, using row keys as the ZSet keys will waste a competent but not brilliant
-// engineer between 82 and 88 hours, depending on how you count. Hours that are never coming back.
-//
-// One of the situations in which using row keys completely breaks is table updates. If the "key"
-// is the row key, let's say "5", then an update is a delete + insert. Imagine a table that had k =
-// 5, v = 5, and a view that filters v > 2.
-//
-// Now we will do an update that changes v => 1. If the "key" is 5, then inside the Delta set, we
-// will have (5, weight = -1), (5, weight = +1), and the whole thing just disappears. The Delta
-// set, therefore, has to contain ((5, 5), weight = -1), ((5, 1), weight = +1).
-//
-// It is theoretically possible to use the row key in the ZSet and then use a hash of key ->
-// Vec(changes) in the Delta set. But deviating from the paper here is just asking for trouble, as
-// I am sure it would break somewhere else.
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct HashableRow {
-    pub rowid: i64,
-    pub values: Vec<Value>,
-    // Pre-computed hash: DBSP rows are immutable and frequently hashed during joins,
-    // making caching worthwhile despite the memory overhead
-    cached_hash: u64,
-}
-
-impl HashableRow {
-    pub fn new(rowid: i64, values: Vec<Value>) -> Self {
-        let cached_hash = Self::compute_hash(rowid, &values);
-        Self {
-            rowid,
-            values,
-            cached_hash,
-        }
-    }
-
-    fn compute_hash(rowid: i64, values: &[Value]) -> u64 {
-        let mut hasher = DefaultHasher::new();
-        rowid.hash(&mut hasher);
-        for value in values {
-            match value {
-                Value::Null => {
-                    0u8.hash(&mut hasher);
-                }
-                Value::Integer(i) => {
-                    1u8.hash(&mut hasher);
-                    i.hash(&mut hasher);
-                }
-                Value::Float(f) => {
-                    2u8.hash(&mut hasher);
-                    f.to_bits().hash(&mut hasher);
-                }
-                Value::Text(s) => {
-                    3u8.hash(&mut hasher);
-                    s.value.hash(&mut hasher);
-                    (s.subtype as u8).hash(&mut hasher);
-                }
-                Value::Blob(b) => {
-                    4u8.hash(&mut hasher);
-                    b.hash(&mut hasher);
-                }
-            }
-        }
-        hasher.finish()
-    }
-}
-
-impl Hash for HashableRow {
-    fn hash<H: Hasher>(&self, state: &mut H) {
-        self.cached_hash.hash(state);
-    }
-}
-
-impl PartialOrd for HashableRow {
-    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl Ord for HashableRow {
-    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
-        // First compare by rowid, then by values if rowids are equal
-        // This ensures Ord is consistent with Eq (which compares all fields)
-        match self.rowid.cmp(&other.rowid) {
-            std::cmp::Ordering::Equal => {
-                // If rowids are equal, compare values to maintain consistency with Eq
-                self.values.cmp(&other.values)
-            }
-            other => other,
-        }
-    }
-}

View File

@@ -2,7 +2,6 @@ pub mod compiler;
 pub mod cursor;
 pub mod dbsp;
 pub mod expr_compiler;
-pub mod hashable_row;
 pub mod operator;
 pub mod persistence;
 pub mod view;

View File

@@ -3,9 +3,8 @@
 // Based on Feldera DBSP design but adapted for Turso's architecture
 use crate::function::{AggFunc, Func};
-use crate::incremental::dbsp::Delta;
+use crate::incremental::dbsp::{Delta, HashableRow};
 use crate::incremental::expr_compiler::CompiledExpression;
-use crate::incremental::hashable_row::HashableRow;
 use crate::incremental::persistence::{ReadRecord, WriteRow};
 use crate::storage::btree::BTreeCursor;
 use crate::types::{IOResult, SeekKey, Text};
@@ -210,7 +209,7 @@ impl ComputationTracker {
 }
 
 #[cfg(test)]
-mod hashable_row_tests {
+mod dbsp_types_tests {
     use super::*;
 
     #[test]