Implement a DBSP module

We are not using the DBSP crate because it leans heavily on Tokio and
other dependencies that wouldn't make sense for us to consume.
Glauber Costa
2025-08-08 13:02:52 -05:00
parent e255fc9a81
commit d5b7533ff8
5 changed files with 2092 additions and 0 deletions

119 core/incremental/dbsp.rs Normal file

@@ -0,0 +1,119 @@
// Simplified DBSP integration for incremental view maintenance
// For now, we'll use a basic approach and can expand to full DBSP later
use std::collections::HashMap;

/// A simplified ZSet for incremental computation
/// Each element has a weight: positive for additions, negative for deletions
#[derive(Clone, Debug, Default)]
pub struct SimpleZSet<T> {
    data: HashMap<T, isize>,
}

impl<T: std::hash::Hash + Eq + Clone> SimpleZSet<T> {
    pub fn new() -> Self {
        Self {
            data: HashMap::new(),
        }
    }

    pub fn insert(&mut self, item: T, weight: isize) {
        let current = self.data.get(&item).copied().unwrap_or(0);
        let new_weight = current + weight;
        if new_weight == 0 {
            self.data.remove(&item);
        } else {
            self.data.insert(item, new_weight);
        }
    }

    pub fn iter(&self) -> impl Iterator<Item = (&T, isize)> {
        self.data.iter().map(|(k, &v)| (k, v))
    }

    /// Get all items with positive weights
    pub fn to_vec(&self) -> Vec<T> {
        self.data
            .iter()
            .filter(|(_, &weight)| weight > 0)
            .map(|(item, _)| item.clone())
            .collect()
    }

    pub fn merge(&mut self, other: &SimpleZSet<T>) {
        for (item, weight) in other.iter() {
            self.insert(item.clone(), weight);
        }
    }
}

/// A simplified stream for incremental computation
#[derive(Clone, Debug)]
pub struct SimpleStream<T> {
    current: SimpleZSet<T>,
}

impl<T: std::hash::Hash + Eq + Clone> SimpleStream<T> {
    pub fn from_zset(zset: SimpleZSet<T>) -> Self {
        Self { current: zset }
    }

    /// Apply a delta (change) to the stream
    pub fn apply_delta(&mut self, delta: &SimpleZSet<T>) {
        self.current.merge(delta);
    }

    /// Get the current state as a vector of items (only positive weights)
    pub fn to_vec(&self) -> Vec<T> {
        self.current.to_vec()
    }
}

// Type aliases for convenience
use super::hashable_row::HashableRow;

pub type RowKey = HashableRow;
pub type RowKeyZSet = SimpleZSet<RowKey>;
pub type RowKeyStream = SimpleStream<RowKey>;

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_zset_merge_with_weights() {
        let mut zset1 = SimpleZSet::new();
        zset1.insert(1, 1); // Row 1 with weight +1
        zset1.insert(2, 1); // Row 2 with weight +1

        let mut zset2 = SimpleZSet::new();
        zset2.insert(2, -1); // Row 2 with weight -1 (delete)
        zset2.insert(3, 1); // Row 3 with weight +1 (insert)

        zset1.merge(&zset2);

        // Row 1: weight 1 (unchanged)
        // Row 2: weight 0 (deleted)
        // Row 3: weight 1 (inserted)
        assert_eq!(zset1.iter().count(), 2); // Only rows 1 and 3
        assert!(zset1.iter().any(|(k, _)| *k == 1));
        assert!(zset1.iter().any(|(k, _)| *k == 3));
        assert!(!zset1.iter().any(|(k, _)| *k == 2)); // Row 2 removed
    }

    #[test]
    fn test_zset_represents_updates_as_delete_plus_insert() {
        let mut zset = SimpleZSet::new();
        // Initial state
        zset.insert(1, 1);

        // Update row 1: delete old + insert new
        zset.insert(1, -1); // Delete old version
        zset.insert(1, 1); // Insert new version

        // Weight should be 1 (not 2)
        let weight = zset.iter().find(|(k, _)| **k == 1).map(|(_, w)| w);
        assert_eq!(weight, Some(1));
    }
}
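A minimal usage sketch of the types above (not part of the commit; string keys stand in for real rows, and the module path is assumed to be crate::incremental::dbsp as declared in mod.rs below): a stream holds the materialized state, and each batch of changes arrives as a ZSet delta.

// Sketch only: SimpleZSet/SimpleStream from this commit, with string keys.
fn example() {
    use crate::incremental::dbsp::{SimpleStream, SimpleZSet};

    // Initial state: rows 1 and 2 are present with weight +1.
    let mut base = SimpleZSet::new();
    base.insert("row-1", 1);
    base.insert("row-2", 1);
    let mut stream = SimpleStream::from_zset(base);

    // A batch of changes arrives as a delta: row 2 leaves, row 3 enters.
    let mut delta = SimpleZSet::new();
    delta.insert("row-2", -1);
    delta.insert("row-3", 1);
    stream.apply_delta(&delta);

    // Only rows with positive weight remain in the view.
    assert_eq!(stream.to_vec().len(), 2); // row-1 and row-3
}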

80 core/incremental/hashable_row.rs Normal file

@@ -0,0 +1,80 @@
use crate::types::Value;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// The DBSP paper uses as a key the whole record, with both the row key and the values. This is a
// bit confusing for us in databases, because when you say "key", it is easy to understand that as
// being the row key.
//
// Empirically speaking, using row keys as the ZSet keys will waste a competent but not brilliant
// engineer between 82 and 88 hours, depending on how you count. Hours that are never coming back.
//
// One of the situations in which using row keys completely breaks is table updates. If the "key"
// is the row key, let's say "5", then an update is a delete + insert. Imagine a table that had k =
// 5, v = 5, and a view that filters v > 2.
//
// Now we will do an update that changes v => 1. If the "key" is 5, then inside the Delta set, we
// will have (5, weight = -1), (5, weight = +1), and the whole thing just disappears. The Delta
// set, therefore, has to contain ((5, 5), weight = -1), ((5, 1), weight = +1).
//
// It is theoretically possible to use the rowkey in the ZSet and then use a hash of key ->
// Vec(changes) in the Delta set. But deviating from the paper here is just asking for trouble, as
// I am sure it would break somewhere else.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HashableRow {
    pub rowid: i64,
    pub values: Vec<Value>,
    // Pre-computed hash: DBSP rows are immutable and frequently hashed during joins,
    // making caching worthwhile despite the memory overhead
    cached_hash: u64,
}

impl HashableRow {
    pub fn new(rowid: i64, values: Vec<Value>) -> Self {
        let cached_hash = Self::compute_hash(rowid, &values);
        Self {
            rowid,
            values,
            cached_hash,
        }
    }

    fn compute_hash(rowid: i64, values: &[Value]) -> u64 {
        let mut hasher = DefaultHasher::new();
        rowid.hash(&mut hasher);
        for value in values {
            match value {
                Value::Null => {
                    0u8.hash(&mut hasher);
                }
                Value::Integer(i) => {
                    1u8.hash(&mut hasher);
                    i.hash(&mut hasher);
                }
                Value::Float(f) => {
                    2u8.hash(&mut hasher);
                    f.to_bits().hash(&mut hasher);
                }
                Value::Text(s) => {
                    3u8.hash(&mut hasher);
                    s.value.hash(&mut hasher);
                    (s.subtype as u8).hash(&mut hasher);
                }
                Value::Blob(b) => {
                    4u8.hash(&mut hasher);
                    b.hash(&mut hasher);
                }
            }
        }
        hasher.finish()
    }
}

impl Hash for HashableRow {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.cached_hash.hash(state);
    }
}
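A sketch of the update scenario described in the comment above (illustration only, not part of the commit; assumes the Value::Integer variant shown in compute_hash): because the whole record is the key, the delete of the old version and the insert of the new one do not cancel.

// Sketch only: the (k = 5, v = 5) -> (k = 5, v = 1) update from the
// comment above, expressed with HashableRow keys in a delta ZSet.
fn example() {
    use crate::incremental::dbsp::RowKeyZSet;
    use crate::incremental::hashable_row::HashableRow;
    use crate::types::Value;

    let old_row = HashableRow::new(5, vec![Value::Integer(5)]);
    let new_row = HashableRow::new(5, vec![Value::Integer(1)]);

    let mut delta = RowKeyZSet::new();
    delta.insert(old_row, -1); // delete the old version
    delta.insert(new_row, 1); // insert the new version

    // Both entries survive because the values differ, so the keys differ.
    // Keyed by rowid alone, the two weights would have summed to zero and
    // the update would have vanished from the delta.
    assert_eq!(delta.iter().count(), 2);
}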

3 core/incremental/mod.rs Normal file

@@ -0,0 +1,3 @@
pub mod dbsp;
pub mod hashable_row;
pub mod operator;

1889 core/incremental/operator.rs Normal file

File diff suppressed because it is too large

1 core/lib.rs

@@ -6,6 +6,7 @@ mod ext;
mod fast_lock;
mod function;
mod functions;
mod incremental;
mod info;
mod io;
#[cfg(feature = "json")]