mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 17:05:36 +01:00
Merge 'Improve DBSP view serialization' from Glauber Costa
Improve serialization for DBSP views. The serialization code was written organically, without much forward thinking about stability as we evolved the table and operator format. Now that this is done, we are at a point where we can actually make it suck less and take a considerable step towards making this production ready. We also add a simple version check (in the table name, because that is much easier than reading contents in parse_schema_row) to prevent views from being used if we had to do anything to evolve the format of the circuit (including the operators). Closes #3351
This commit is contained in:
7
Cargo.lock
generated
7
Cargo.lock
generated
@@ -3696,6 +3696,12 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha1_smol"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
|
||||
|
||||
[[package]]
|
||||
name = "sha2"
|
||||
version = "0.10.9"
|
||||
@@ -4746,6 +4752,7 @@ checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d"
|
||||
dependencies = [
|
||||
"getrandom 0.3.2",
|
||||
"js-sys",
|
||||
"sha1_smol",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
|
||||
@@ -72,7 +72,7 @@ strum_macros = { workspace = true }
|
||||
bitflags = { workspace = true }
|
||||
serde = { workspace = true, optional = true, features = ["derive"] }
|
||||
paste = "1.0.15"
|
||||
uuid = { version = "1.11.0", features = ["v4", "v7"], optional = true }
|
||||
uuid = { version = "1.11.0", features = ["v4", "v5", "v7"], optional = true }
|
||||
tempfile = { workspace = true }
|
||||
pack1 = { version = "1.0.0", features = ["bytemuck"] }
|
||||
bytemuck = "1.23.1"
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
// Aggregate operator for DBSP-style incremental computation
|
||||
|
||||
use crate::function::{AggFunc, Func};
|
||||
use crate::incremental::dbsp::Hash128;
|
||||
use crate::incremental::dbsp::{Delta, DeltaPair, HashableRow};
|
||||
use crate::incremental::operator::{
|
||||
generate_storage_id, ComputationTracker, DbspStateCursors, EvalState, IncrementalOperator,
|
||||
@@ -16,6 +17,13 @@ use std::sync::{Arc, Mutex};
|
||||
pub const AGG_TYPE_REGULAR: u8 = 0b00; // COUNT/SUM/AVG
|
||||
pub const AGG_TYPE_MINMAX: u8 = 0b01; // MIN/MAX (BTree ordering gives both)
|
||||
|
||||
// Serialization type codes for aggregate functions
|
||||
const AGG_FUNC_COUNT: i64 = 0;
|
||||
const AGG_FUNC_SUM: i64 = 1;
|
||||
const AGG_FUNC_AVG: i64 = 2;
|
||||
const AGG_FUNC_MIN: i64 = 3;
|
||||
const AGG_FUNC_MAX: i64 = 4;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum AggregateFunction {
|
||||
Count,
|
||||
@@ -44,6 +52,104 @@ impl AggregateFunction {
|
||||
self.to_string()
|
||||
}
|
||||
|
||||
/// Serialize this aggregate function to a Value
|
||||
/// Returns a vector of values: [type_code, optional_column_index]
|
||||
pub fn to_values(&self) -> Vec<Value> {
|
||||
match self {
|
||||
AggregateFunction::Count => vec![Value::Integer(AGG_FUNC_COUNT)],
|
||||
AggregateFunction::Sum(idx) => {
|
||||
vec![Value::Integer(AGG_FUNC_SUM), Value::Integer(*idx as i64)]
|
||||
}
|
||||
AggregateFunction::Avg(idx) => {
|
||||
vec![Value::Integer(AGG_FUNC_AVG), Value::Integer(*idx as i64)]
|
||||
}
|
||||
AggregateFunction::Min(idx) => {
|
||||
vec![Value::Integer(AGG_FUNC_MIN), Value::Integer(*idx as i64)]
|
||||
}
|
||||
AggregateFunction::Max(idx) => {
|
||||
vec![Value::Integer(AGG_FUNC_MAX), Value::Integer(*idx as i64)]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Deserialize an aggregate function from values
|
||||
/// Consumes values from the cursor and returns the aggregate function
|
||||
pub fn from_values(values: &[Value], cursor: &mut usize) -> Result<Self> {
|
||||
let type_code = values
|
||||
.get(*cursor)
|
||||
.ok_or_else(|| LimboError::InternalError("Missing aggregate type code".into()))?;
|
||||
|
||||
let agg_fn = match type_code {
|
||||
Value::Integer(AGG_FUNC_COUNT) => {
|
||||
*cursor += 1;
|
||||
AggregateFunction::Count
|
||||
}
|
||||
Value::Integer(AGG_FUNC_SUM) => {
|
||||
*cursor += 1;
|
||||
let idx = values
|
||||
.get(*cursor)
|
||||
.ok_or_else(|| LimboError::InternalError("Missing SUM column index".into()))?;
|
||||
if let Value::Integer(idx) = idx {
|
||||
*cursor += 1;
|
||||
AggregateFunction::Sum(*idx as usize)
|
||||
} else {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Expected Integer for SUM column index, got {idx:?}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
Value::Integer(AGG_FUNC_AVG) => {
|
||||
*cursor += 1;
|
||||
let idx = values
|
||||
.get(*cursor)
|
||||
.ok_or_else(|| LimboError::InternalError("Missing AVG column index".into()))?;
|
||||
if let Value::Integer(idx) = idx {
|
||||
*cursor += 1;
|
||||
AggregateFunction::Avg(*idx as usize)
|
||||
} else {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Expected Integer for AVG column index, got {idx:?}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
Value::Integer(AGG_FUNC_MIN) => {
|
||||
*cursor += 1;
|
||||
let idx = values
|
||||
.get(*cursor)
|
||||
.ok_or_else(|| LimboError::InternalError("Missing MIN column index".into()))?;
|
||||
if let Value::Integer(idx) = idx {
|
||||
*cursor += 1;
|
||||
AggregateFunction::Min(*idx as usize)
|
||||
} else {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Expected Integer for MIN column index, got {idx:?}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
Value::Integer(AGG_FUNC_MAX) => {
|
||||
*cursor += 1;
|
||||
let idx = values
|
||||
.get(*cursor)
|
||||
.ok_or_else(|| LimboError::InternalError("Missing MAX column index".into()))?;
|
||||
if let Value::Integer(idx) = idx {
|
||||
*cursor += 1;
|
||||
AggregateFunction::Max(*idx as usize)
|
||||
} else {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Expected Integer for MAX column index, got {idx:?}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Unknown aggregate type code: {type_code:?}"
|
||||
)))
|
||||
}
|
||||
};
|
||||
|
||||
Ok(agg_fn)
|
||||
}
|
||||
|
||||
/// Create an AggregateFunction from a SQL function and its arguments
|
||||
/// Returns None if the function is not a supported aggregate
|
||||
pub fn from_sql_function(
|
||||
@@ -77,42 +183,6 @@ pub struct AggColumnInfo {
|
||||
pub has_max: bool,
|
||||
}
|
||||
|
||||
/// Serialize a Value using SQLite's serial type format
|
||||
/// This is used for MIN/MAX values that need to be stored in a compact, sortable format
|
||||
pub fn serialize_value(value: &Value, blob: &mut Vec<u8>) {
|
||||
let serial_type = crate::types::SerialType::from(value);
|
||||
let serial_type_u64: u64 = serial_type.into();
|
||||
crate::storage::sqlite3_ondisk::write_varint_to_vec(serial_type_u64, blob);
|
||||
value.serialize_serial(blob);
|
||||
}
|
||||
|
||||
/// Deserialize a Value using SQLite's serial type format
|
||||
/// Returns the deserialized value and the number of bytes consumed
|
||||
pub fn deserialize_value(blob: &[u8]) -> Option<(Value, usize)> {
|
||||
let mut cursor = 0;
|
||||
|
||||
// Read the serial type
|
||||
let (serial_type, varint_size) = crate::storage::sqlite3_ondisk::read_varint(blob).ok()?;
|
||||
cursor += varint_size;
|
||||
|
||||
let serial_type_obj = crate::types::SerialType::try_from(serial_type).ok()?;
|
||||
let expected_size = serial_type_obj.size();
|
||||
|
||||
// Read the value
|
||||
let (value, actual_size) =
|
||||
crate::storage::sqlite3_ondisk::read_value(&blob[cursor..], serial_type_obj).ok()?;
|
||||
|
||||
// Verify that the actual size matches what we expected from the serial type
|
||||
if actual_size != expected_size {
|
||||
return None; // Data corruption - size mismatch
|
||||
}
|
||||
|
||||
cursor += actual_size;
|
||||
|
||||
// Convert RefValue to Value
|
||||
Some((value.to_owned(), cursor))
|
||||
}
|
||||
|
||||
// group_key_str -> (group_key, state)
|
||||
type ComputedStates = HashMap<String, (Vec<Value>, AggregateState)>;
|
||||
// group_key_str -> (column_index, value_as_hashable_row) -> accumulated_weight
|
||||
@@ -198,9 +268,9 @@ pub struct AggregateState {
|
||||
// For COUNT: just the count
|
||||
pub count: i64,
|
||||
// For SUM: column_index -> sum value
|
||||
sums: HashMap<usize, f64>,
|
||||
pub sums: HashMap<usize, f64>,
|
||||
// For AVG: column_index -> (sum, count) for computing average
|
||||
avgs: HashMap<usize, (f64, i64)>,
|
||||
pub avgs: HashMap<usize, (f64, i64)>,
|
||||
// For MIN: column_index -> minimum value
|
||||
pub mins: HashMap<usize, Value>,
|
||||
// For MAX: column_index -> maximum value
|
||||
@@ -243,17 +313,17 @@ impl AggregateEvalState {
|
||||
// Get the current group to read
|
||||
let (group_key_str, _group_key) = &groups_to_read[*current_idx];
|
||||
|
||||
// Build the key for the index: (operator_id, zset_id, element_id)
|
||||
// Build the key for the index: (operator_id, zset_hash, element_id)
|
||||
// For regular aggregates, use column_index=0 and AGG_TYPE_REGULAR
|
||||
let operator_storage_id =
|
||||
generate_storage_id(operator.operator_id, 0, AGG_TYPE_REGULAR);
|
||||
let zset_id = operator.generate_group_rowid(group_key_str);
|
||||
let zset_hash = operator.generate_group_hash(group_key_str);
|
||||
let element_id = 0i64; // Always 0 for aggregators
|
||||
|
||||
// Create index key values
|
||||
let index_key_values = vec![
|
||||
Value::Integer(operator_storage_id),
|
||||
Value::Integer(zset_id),
|
||||
zset_hash.to_value(),
|
||||
Value::Integer(element_id),
|
||||
];
|
||||
|
||||
@@ -306,11 +376,9 @@ impl AggregateEvalState {
|
||||
// Only try to read if we have a rowid
|
||||
if let Some(rowid) = rowid {
|
||||
let key = SeekKey::TableRowId(*rowid);
|
||||
let state = return_if_io!(read_record_state.read_record(
|
||||
key,
|
||||
&operator.aggregates,
|
||||
&mut cursors.table_cursor
|
||||
));
|
||||
let state = return_if_io!(
|
||||
read_record_state.read_record(key, &mut cursors.table_cursor)
|
||||
);
|
||||
// Process the fetched state
|
||||
if let Some(state) = state {
|
||||
let mut old_row = group_key.clone();
|
||||
@@ -368,196 +436,249 @@ impl AggregateState {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
// Serialize the aggregate state to a binary blob including group key values
|
||||
// The reason we serialize it like this, instead of just writing the actual values, is that
|
||||
// The same table may have different aggregators in the circuit. They will all have different
|
||||
// columns.
|
||||
fn to_blob(&self, aggregates: &[AggregateFunction], group_key: &[Value]) -> Vec<u8> {
|
||||
let mut blob = Vec::new();
|
||||
/// Convert the aggregate state to a vector of Values for unified serialization
|
||||
/// Format: [count, num_aggregates, (agg_metadata, agg_state)...]
|
||||
/// Each aggregate includes its type and column index for proper deserialization
|
||||
pub fn to_value_vector(&self, aggregates: &[AggregateFunction]) -> Vec<Value> {
|
||||
let mut values = Vec::new();
|
||||
|
||||
// Write version byte for future compatibility
|
||||
blob.push(1u8);
|
||||
// Include count first
|
||||
values.push(Value::Integer(self.count));
|
||||
|
||||
// Write number of group key values
|
||||
blob.extend_from_slice(&(group_key.len() as u32).to_le_bytes());
|
||||
// Store number of aggregates
|
||||
values.push(Value::Integer(aggregates.len() as i64));
|
||||
|
||||
// Write each group key value
|
||||
for value in group_key {
|
||||
// Write value type tag
|
||||
match value {
|
||||
Value::Null => blob.push(0u8),
|
||||
Value::Integer(i) => {
|
||||
blob.push(1u8);
|
||||
blob.extend_from_slice(&i.to_le_bytes());
|
||||
}
|
||||
Value::Float(f) => {
|
||||
blob.push(2u8);
|
||||
blob.extend_from_slice(&f.to_le_bytes());
|
||||
}
|
||||
Value::Text(s) => {
|
||||
blob.push(3u8);
|
||||
let text_str = s.as_str();
|
||||
let bytes = text_str.as_bytes();
|
||||
blob.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
|
||||
blob.extend_from_slice(bytes);
|
||||
}
|
||||
Value::Blob(b) => {
|
||||
blob.push(4u8);
|
||||
blob.extend_from_slice(&(b.len() as u32).to_le_bytes());
|
||||
blob.extend_from_slice(b);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write count as 8 bytes (little-endian)
|
||||
blob.extend_from_slice(&self.count.to_le_bytes());
|
||||
|
||||
// Write each aggregate's state
|
||||
// Add each aggregate's metadata and state
|
||||
for agg in aggregates {
|
||||
// First, add the aggregate function metadata (type and column index)
|
||||
values.extend(agg.to_values());
|
||||
|
||||
// Then add the state for this aggregate
|
||||
match agg {
|
||||
AggregateFunction::Sum(col_name) => {
|
||||
let sum = self.sums.get(col_name).copied().unwrap_or(0.0);
|
||||
blob.extend_from_slice(&sum.to_le_bytes());
|
||||
}
|
||||
AggregateFunction::Avg(col_name) => {
|
||||
let (sum, count) = self.avgs.get(col_name).copied().unwrap_or((0.0, 0));
|
||||
blob.extend_from_slice(&sum.to_le_bytes());
|
||||
blob.extend_from_slice(&count.to_le_bytes());
|
||||
}
|
||||
AggregateFunction::Count => {
|
||||
// Count is already written above
|
||||
// Count state is already stored at the beginning
|
||||
}
|
||||
AggregateFunction::Min(col_name) => {
|
||||
// Write whether we have a MIN value (1 byte)
|
||||
if let Some(min_val) = self.mins.get(col_name) {
|
||||
blob.push(1u8); // Has value
|
||||
serialize_value(min_val, &mut blob);
|
||||
AggregateFunction::Sum(col_idx) => {
|
||||
let sum = self.sums.get(col_idx).copied().unwrap_or(0.0);
|
||||
values.push(Value::Float(sum));
|
||||
}
|
||||
AggregateFunction::Avg(col_idx) => {
|
||||
let (sum, count) = self.avgs.get(col_idx).copied().unwrap_or((0.0, 0));
|
||||
values.push(Value::Float(sum));
|
||||
values.push(Value::Integer(count));
|
||||
}
|
||||
AggregateFunction::Min(col_idx) => {
|
||||
if let Some(min_val) = self.mins.get(col_idx) {
|
||||
values.push(Value::Integer(1)); // Has value
|
||||
values.push(min_val.clone());
|
||||
} else {
|
||||
blob.push(0u8); // No value
|
||||
values.push(Value::Integer(0)); // No value
|
||||
}
|
||||
}
|
||||
AggregateFunction::Max(col_name) => {
|
||||
// Write whether we have a MAX value (1 byte)
|
||||
if let Some(max_val) = self.maxs.get(col_name) {
|
||||
blob.push(1u8); // Has value
|
||||
serialize_value(max_val, &mut blob);
|
||||
AggregateFunction::Max(col_idx) => {
|
||||
if let Some(max_val) = self.maxs.get(col_idx) {
|
||||
values.push(Value::Integer(1)); // Has value
|
||||
values.push(max_val.clone());
|
||||
} else {
|
||||
blob.push(0u8); // No value
|
||||
values.push(Value::Integer(0)); // No value
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
blob
|
||||
values
|
||||
}
|
||||
|
||||
/// Deserialize aggregate state from a binary blob
|
||||
/// Returns the aggregate state and the group key values
|
||||
pub fn from_blob(blob: &[u8], aggregates: &[AggregateFunction]) -> Option<(Self, Vec<Value>)> {
|
||||
/// Reconstruct aggregate state from a vector of Values
|
||||
pub fn from_value_vector(values: &[Value]) -> Result<Self> {
|
||||
let mut cursor = 0;
|
||||
|
||||
// Check version byte
|
||||
if blob.get(cursor) != Some(&1u8) {
|
||||
return None;
|
||||
}
|
||||
cursor += 1;
|
||||
|
||||
// Read number of group key values
|
||||
let num_group_keys =
|
||||
u32::from_le_bytes(blob.get(cursor..cursor + 4)?.try_into().ok()?) as usize;
|
||||
cursor += 4;
|
||||
|
||||
// Read group key values
|
||||
let mut group_key = Vec::new();
|
||||
for _ in 0..num_group_keys {
|
||||
let value_type = *blob.get(cursor)?;
|
||||
cursor += 1;
|
||||
|
||||
let value = match value_type {
|
||||
0 => Value::Null,
|
||||
1 => {
|
||||
let i = i64::from_le_bytes(blob.get(cursor..cursor + 8)?.try_into().ok()?);
|
||||
cursor += 8;
|
||||
Value::Integer(i)
|
||||
}
|
||||
2 => {
|
||||
let f = f64::from_le_bytes(blob.get(cursor..cursor + 8)?.try_into().ok()?);
|
||||
cursor += 8;
|
||||
Value::Float(f)
|
||||
}
|
||||
3 => {
|
||||
let len =
|
||||
u32::from_le_bytes(blob.get(cursor..cursor + 4)?.try_into().ok()?) as usize;
|
||||
cursor += 4;
|
||||
let bytes = blob.get(cursor..cursor + len)?;
|
||||
cursor += len;
|
||||
let text_str = std::str::from_utf8(bytes).ok()?;
|
||||
Value::Text(text_str.to_string().into())
|
||||
}
|
||||
4 => {
|
||||
let len =
|
||||
u32::from_le_bytes(blob.get(cursor..cursor + 4)?.try_into().ok()?) as usize;
|
||||
cursor += 4;
|
||||
let bytes = blob.get(cursor..cursor + len)?;
|
||||
cursor += len;
|
||||
Value::Blob(bytes.to_vec())
|
||||
}
|
||||
_ => return None,
|
||||
};
|
||||
group_key.push(value);
|
||||
}
|
||||
let mut state = Self::new();
|
||||
|
||||
// Read count
|
||||
let count = i64::from_le_bytes(blob.get(cursor..cursor + 8)?.try_into().ok()?);
|
||||
cursor += 8;
|
||||
let count = values
|
||||
.get(cursor)
|
||||
.ok_or_else(|| LimboError::InternalError("Aggregate state missing count".into()))?;
|
||||
if let Value::Integer(count) = count {
|
||||
state.count = *count;
|
||||
cursor += 1;
|
||||
} else {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Expected Integer for count, got {count:?}"
|
||||
)));
|
||||
}
|
||||
|
||||
let mut state = Self::new();
|
||||
state.count = count;
|
||||
// Read number of aggregates
|
||||
let num_aggregates = values
|
||||
.get(cursor)
|
||||
.ok_or_else(|| LimboError::InternalError("Missing number of aggregates".into()))?;
|
||||
let num_aggregates = match num_aggregates {
|
||||
Value::Integer(n) => *n as usize,
|
||||
_ => {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Expected Integer for aggregate count, got {num_aggregates:?}"
|
||||
)))
|
||||
}
|
||||
};
|
||||
cursor += 1;
|
||||
|
||||
// Read each aggregate's state
|
||||
for agg in aggregates {
|
||||
match agg {
|
||||
AggregateFunction::Sum(col_name) => {
|
||||
let sum = f64::from_le_bytes(blob.get(cursor..cursor + 8)?.try_into().ok()?);
|
||||
cursor += 8;
|
||||
state.sums.insert(*col_name, sum);
|
||||
}
|
||||
AggregateFunction::Avg(col_name) => {
|
||||
let sum = f64::from_le_bytes(blob.get(cursor..cursor + 8)?.try_into().ok()?);
|
||||
cursor += 8;
|
||||
let count = i64::from_le_bytes(blob.get(cursor..cursor + 8)?.try_into().ok()?);
|
||||
cursor += 8;
|
||||
state.avgs.insert(*col_name, (sum, count));
|
||||
}
|
||||
// Read each aggregate's state with type and column index
|
||||
for _ in 0..num_aggregates {
|
||||
// Deserialize the aggregate function metadata
|
||||
let agg_fn = AggregateFunction::from_values(values, &mut cursor)?;
|
||||
|
||||
// Read the state for this aggregate
|
||||
match agg_fn {
|
||||
AggregateFunction::Count => {
|
||||
// Count was already read above
|
||||
// Count state is already stored at the beginning
|
||||
}
|
||||
AggregateFunction::Min(col_name) => {
|
||||
// Read whether we have a MIN value
|
||||
let has_value = *blob.get(cursor)?;
|
||||
cursor += 1;
|
||||
|
||||
if has_value == 1 {
|
||||
let (min_value, bytes_consumed) = deserialize_value(&blob[cursor..])?;
|
||||
cursor += bytes_consumed;
|
||||
state.mins.insert(*col_name, min_value);
|
||||
AggregateFunction::Sum(col_idx) => {
|
||||
let sum = values
|
||||
.get(cursor)
|
||||
.ok_or_else(|| LimboError::InternalError("Missing SUM value".into()))?;
|
||||
if let Value::Float(sum) = sum {
|
||||
state.sums.insert(col_idx, *sum);
|
||||
cursor += 1;
|
||||
} else {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Expected Float for SUM value, got {sum:?}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
AggregateFunction::Max(col_name) => {
|
||||
// Read whether we have a MAX value
|
||||
let has_value = *blob.get(cursor)?;
|
||||
AggregateFunction::Avg(col_idx) => {
|
||||
let sum = values
|
||||
.get(cursor)
|
||||
.ok_or_else(|| LimboError::InternalError("Missing AVG sum value".into()))?;
|
||||
let sum = match sum {
|
||||
Value::Float(f) => *f,
|
||||
_ => {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Expected Float for AVG sum, got {sum:?}"
|
||||
)))
|
||||
}
|
||||
};
|
||||
cursor += 1;
|
||||
|
||||
if has_value == 1 {
|
||||
let (max_value, bytes_consumed) = deserialize_value(&blob[cursor..])?;
|
||||
cursor += bytes_consumed;
|
||||
state.maxs.insert(*col_name, max_value);
|
||||
let count = values.get(cursor).ok_or_else(|| {
|
||||
LimboError::InternalError("Missing AVG count value".into())
|
||||
})?;
|
||||
let count = match count {
|
||||
Value::Integer(i) => *i,
|
||||
_ => {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Expected Integer for AVG count, got {count:?}"
|
||||
)))
|
||||
}
|
||||
};
|
||||
cursor += 1;
|
||||
|
||||
state.avgs.insert(col_idx, (sum, count));
|
||||
}
|
||||
AggregateFunction::Min(col_idx) => {
|
||||
let has_value = values.get(cursor).ok_or_else(|| {
|
||||
LimboError::InternalError("Missing MIN has_value flag".into())
|
||||
})?;
|
||||
if let Value::Integer(has_value) = has_value {
|
||||
cursor += 1;
|
||||
if *has_value == 1 {
|
||||
let min_val = values
|
||||
.get(cursor)
|
||||
.ok_or_else(|| {
|
||||
LimboError::InternalError("Missing MIN value".into())
|
||||
})?
|
||||
.clone();
|
||||
cursor += 1;
|
||||
state.mins.insert(col_idx, min_val);
|
||||
}
|
||||
} else {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Expected Integer for MIN has_value flag, got {has_value:?}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
AggregateFunction::Max(col_idx) => {
|
||||
let has_value = values.get(cursor).ok_or_else(|| {
|
||||
LimboError::InternalError("Missing MAX has_value flag".into())
|
||||
})?;
|
||||
if let Value::Integer(has_value) = has_value {
|
||||
cursor += 1;
|
||||
if *has_value == 1 {
|
||||
let max_val = values
|
||||
.get(cursor)
|
||||
.ok_or_else(|| {
|
||||
LimboError::InternalError("Missing MAX value".into())
|
||||
})?
|
||||
.clone();
|
||||
cursor += 1;
|
||||
state.maxs.insert(col_idx, max_val);
|
||||
}
|
||||
} else {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Expected Integer for MAX has_value flag, got {has_value:?}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Some((state, group_key))
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
fn to_blob(&self, aggregates: &[AggregateFunction], group_key: &[Value]) -> Vec<u8> {
|
||||
let mut all_values = Vec::new();
|
||||
// Store the group key size first
|
||||
all_values.push(Value::Integer(group_key.len() as i64));
|
||||
all_values.extend_from_slice(group_key);
|
||||
all_values.extend(self.to_value_vector(aggregates));
|
||||
|
||||
let record = ImmutableRecord::from_values(&all_values, all_values.len());
|
||||
record.as_blob().clone()
|
||||
}
|
||||
|
||||
pub fn from_blob(blob: &[u8]) -> Result<(Self, Vec<Value>)> {
|
||||
let record = ImmutableRecord::from_bin_record(blob.to_vec());
|
||||
let ref_values = record.get_values();
|
||||
let mut all_values: Vec<Value> = ref_values.into_iter().map(|rv| rv.to_owned()).collect();
|
||||
|
||||
if all_values.is_empty() {
|
||||
return Err(LimboError::InternalError(
|
||||
"Aggregate state blob is empty".into(),
|
||||
));
|
||||
}
|
||||
|
||||
// Read the group key size
|
||||
let group_key_count = match &all_values[0] {
|
||||
Value::Integer(n) if *n >= 0 => *n as usize,
|
||||
Value::Integer(n) => {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Negative group key count: {n}"
|
||||
)))
|
||||
}
|
||||
other => {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Expected Integer for group key count, got {other:?}"
|
||||
)))
|
||||
}
|
||||
};
|
||||
|
||||
// Remove the group key count from the values
|
||||
all_values.remove(0);
|
||||
|
||||
if all_values.len() < group_key_count {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Blob too short: expected at least {} values for group key, got {}",
|
||||
group_key_count,
|
||||
all_values.len()
|
||||
)));
|
||||
}
|
||||
|
||||
// Split into group key and state values
|
||||
let group_key = all_values[..group_key_count].to_vec();
|
||||
let state_values = &all_values[group_key_count..];
|
||||
|
||||
// Reconstruct the aggregate state
|
||||
let state = Self::from_value_vector(state_values)?;
|
||||
|
||||
Ok((state, group_key))
|
||||
}
|
||||
|
||||
/// Apply a delta to this aggregate state
|
||||
@@ -835,7 +956,7 @@ impl AggregateOperator {
|
||||
for (group_key_str, state) in existing_groups {
|
||||
let group_key = temp_keys.get(group_key_str).cloned().unwrap_or_default();
|
||||
|
||||
// Generate a unique rowid for this group
|
||||
// Generate synthetic rowid for this group
|
||||
let result_key = self.generate_group_rowid(group_key_str);
|
||||
|
||||
if let Some(old_row_values) = old_values.get(group_key_str) {
|
||||
@@ -902,19 +1023,26 @@ impl AggregateOperator {
|
||||
self.tracker = Some(tracker);
|
||||
}
|
||||
|
||||
/// Generate a rowid for a group
|
||||
/// For no GROUP BY: always returns 0
|
||||
/// For GROUP BY: returns a hash of the group key string
|
||||
pub fn generate_group_rowid(&self, group_key_str: &str) -> i64 {
|
||||
/// Generate a hash for a group
|
||||
/// For no GROUP BY: returns a zero hash
|
||||
/// For GROUP BY: returns a 128-bit hash of the group key string
|
||||
pub fn generate_group_hash(&self, group_key_str: &str) -> Hash128 {
|
||||
if self.group_by.is_empty() {
|
||||
0
|
||||
Hash128::new(0, 0)
|
||||
} else {
|
||||
group_key_str
|
||||
.bytes()
|
||||
.fold(0i64, |acc, b| acc.wrapping_mul(31).wrapping_add(b as i64))
|
||||
Hash128::hash_str(group_key_str)
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate a rowid for a group (for output rows)
|
||||
/// This is NOT the hash used for storage (that's generate_group_hash which returns full 128-bit).
|
||||
/// This is a synthetic rowid used in place of SQLite's rowid for aggregate output rows.
|
||||
/// We truncate the 128-bit hash to 64 bits for SQLite rowid compatibility.
|
||||
pub fn generate_group_rowid(&self, group_key_str: &str) -> i64 {
|
||||
let hash = self.generate_group_hash(group_key_str);
|
||||
hash.as_i64()
|
||||
}
|
||||
|
||||
/// Extract group key values from a row
|
||||
pub fn extract_group_key(&self, values: &[Value]) -> Vec<Value> {
|
||||
let mut key = Vec::new();
|
||||
@@ -1020,7 +1148,7 @@ impl IncrementalOperator for AggregateOperator {
|
||||
// For regular aggregates, use column_index=0 and AGG_TYPE_REGULAR
|
||||
let operator_storage_id =
|
||||
generate_storage_id(self.operator_id, 0, AGG_TYPE_REGULAR);
|
||||
let zset_id = self.generate_group_rowid(group_key_str);
|
||||
let zset_hash = self.generate_group_hash(group_key_str);
|
||||
let element_id = 0i64;
|
||||
|
||||
// Determine weight: -1 to delete (cancels existing weight=1), 1 to insert/update
|
||||
@@ -1030,22 +1158,22 @@ impl IncrementalOperator for AggregateOperator {
|
||||
let state_blob = agg_state.to_blob(&self.aggregates, group_key);
|
||||
let blob_value = Value::Blob(state_blob);
|
||||
|
||||
// Build the aggregate storage format: [operator_id, zset_id, element_id, value, weight]
|
||||
// Build the aggregate storage format: [operator_id, zset_hash, element_id, value, weight]
|
||||
let operator_id_val = Value::Integer(operator_storage_id);
|
||||
let zset_id_val = Value::Integer(zset_id);
|
||||
let zset_hash_val = zset_hash.to_value();
|
||||
let element_id_val = Value::Integer(element_id);
|
||||
let blob_val = blob_value.clone();
|
||||
|
||||
// Create index key - the first 3 columns of our primary key
|
||||
let index_key = vec![
|
||||
operator_id_val.clone(),
|
||||
zset_id_val.clone(),
|
||||
zset_hash_val.clone(),
|
||||
element_id_val.clone(),
|
||||
];
|
||||
|
||||
// Record values (without weight)
|
||||
let record_values =
|
||||
vec![operator_id_val, zset_id_val, element_id_val, blob_val];
|
||||
vec![operator_id_val, zset_hash_val, element_id_val, blob_val];
|
||||
|
||||
return_and_restore_if_io!(
|
||||
&mut self.commit_state,
|
||||
@@ -1081,7 +1209,7 @@ impl IncrementalOperator for AggregateOperator {
|
||||
self.operator_id,
|
||||
&self.column_min_max,
|
||||
cursors,
|
||||
|group_key_str| self.generate_group_rowid(group_key_str)
|
||||
|group_key_str| self.generate_group_hash(group_key_str)
|
||||
)
|
||||
);
|
||||
|
||||
@@ -1239,7 +1367,7 @@ impl RecomputeMinMax {
|
||||
// Create storage keys for index lookup
|
||||
let storage_id =
|
||||
generate_storage_id(operator.operator_id, storage_index, AGG_TYPE_MINMAX);
|
||||
let zset_id = operator.generate_group_rowid(&group_key);
|
||||
let zset_hash = operator.generate_group_hash(&group_key);
|
||||
|
||||
// Get the values for this group from min_max_deltas
|
||||
let group_values = min_max_deltas.get(&group_key).cloned().unwrap_or_default();
|
||||
@@ -1253,7 +1381,7 @@ impl RecomputeMinMax {
|
||||
group_key.clone(),
|
||||
column_name,
|
||||
storage_id,
|
||||
zset_id,
|
||||
zset_hash,
|
||||
group_values,
|
||||
))
|
||||
} else {
|
||||
@@ -1262,7 +1390,7 @@ impl RecomputeMinMax {
|
||||
group_key.clone(),
|
||||
column_name,
|
||||
storage_id,
|
||||
zset_id,
|
||||
zset_hash,
|
||||
group_values,
|
||||
))
|
||||
};
|
||||
@@ -1334,7 +1462,7 @@ pub enum ScanState {
|
||||
/// Storage ID for the index seek
|
||||
storage_id: i64,
|
||||
/// ZSet ID for the group
|
||||
zset_id: i64,
|
||||
zset_hash: Hash128,
|
||||
/// Group values from MinMaxDeltas: (column_name, HashableRow) -> weight
|
||||
group_values: HashMap<(usize, HashableRow), isize>,
|
||||
/// Whether we're looking for MIN (true) or MAX (false)
|
||||
@@ -1350,7 +1478,7 @@ pub enum ScanState {
|
||||
/// Storage ID for the index seek
|
||||
storage_id: i64,
|
||||
/// ZSet ID for the group
|
||||
zset_id: i64,
|
||||
zset_hash: Hash128,
|
||||
/// Group values from MinMaxDeltas: (column_name, HashableRow) -> weight
|
||||
group_values: HashMap<(usize, HashableRow), isize>,
|
||||
/// Whether we're looking for MIN (true) or MAX (false)
|
||||
@@ -1368,7 +1496,7 @@ impl ScanState {
|
||||
group_key: String,
|
||||
column_name: usize,
|
||||
storage_id: i64,
|
||||
zset_id: i64,
|
||||
zset_hash: Hash128,
|
||||
group_values: HashMap<(usize, HashableRow), isize>,
|
||||
) -> Self {
|
||||
Self::CheckCandidate {
|
||||
@@ -1376,7 +1504,7 @@ impl ScanState {
|
||||
group_key,
|
||||
column_name,
|
||||
storage_id,
|
||||
zset_id,
|
||||
zset_hash,
|
||||
group_values,
|
||||
is_min: true,
|
||||
}
|
||||
@@ -1390,7 +1518,7 @@ impl ScanState {
|
||||
index_record: &ImmutableRecord,
|
||||
seek_op: SeekOp,
|
||||
storage_id: i64,
|
||||
zset_id: i64,
|
||||
zset_hash: Hash128,
|
||||
) -> Result<IOResult<Option<Value>>> {
|
||||
let seek_result = return_if_io!(cursors
|
||||
.index_cursor
|
||||
@@ -1413,15 +1541,26 @@ impl ScanState {
|
||||
let Some(rec_storage_id) = values.first() else {
|
||||
return Ok(IOResult::Done(None));
|
||||
};
|
||||
let Some(rec_zset_id) = values.get(1) else {
|
||||
let Some(rec_zset_hash) = values.get(1) else {
|
||||
return Ok(IOResult::Done(None));
|
||||
};
|
||||
|
||||
// Check if we're still in the same group
|
||||
if let (RefValue::Integer(rec_sid), RefValue::Integer(rec_zid)) =
|
||||
(rec_storage_id, rec_zset_id)
|
||||
{
|
||||
if *rec_sid != storage_id || *rec_zid != zset_id {
|
||||
if let RefValue::Integer(rec_sid) = rec_storage_id {
|
||||
if *rec_sid != storage_id {
|
||||
return Ok(IOResult::Done(None));
|
||||
}
|
||||
} else {
|
||||
return Ok(IOResult::Done(None));
|
||||
}
|
||||
|
||||
// Compare zset_hash as blob
|
||||
if let RefValue::Blob(rec_zset_blob) = rec_zset_hash {
|
||||
if let Some(rec_hash) = Hash128::from_blob(rec_zset_blob.to_slice()) {
|
||||
if rec_hash != zset_hash {
|
||||
return Ok(IOResult::Done(None));
|
||||
}
|
||||
} else {
|
||||
return Ok(IOResult::Done(None));
|
||||
}
|
||||
} else {
|
||||
@@ -1437,7 +1576,7 @@ impl ScanState {
|
||||
group_key: String,
|
||||
column_name: usize,
|
||||
storage_id: i64,
|
||||
zset_id: i64,
|
||||
zset_hash: Hash128,
|
||||
group_values: HashMap<(usize, HashableRow), isize>,
|
||||
) -> Self {
|
||||
Self::CheckCandidate {
|
||||
@@ -1445,7 +1584,7 @@ impl ScanState {
|
||||
group_key,
|
||||
column_name,
|
||||
storage_id,
|
||||
zset_id,
|
||||
zset_hash,
|
||||
group_values,
|
||||
is_min: false,
|
||||
}
|
||||
@@ -1462,7 +1601,7 @@ impl ScanState {
|
||||
group_key,
|
||||
column_name,
|
||||
storage_id,
|
||||
zset_id,
|
||||
zset_hash,
|
||||
group_values,
|
||||
is_min,
|
||||
} => {
|
||||
@@ -1482,7 +1621,7 @@ impl ScanState {
|
||||
group_key: std::mem::take(group_key),
|
||||
column_name: std::mem::take(column_name),
|
||||
storage_id: *storage_id,
|
||||
zset_id: *zset_id,
|
||||
zset_hash: *zset_hash,
|
||||
group_values: std::mem::take(group_values),
|
||||
is_min: *is_min,
|
||||
};
|
||||
@@ -1544,14 +1683,14 @@ impl ScanState {
|
||||
group_key,
|
||||
column_name,
|
||||
storage_id,
|
||||
zset_id,
|
||||
zset_hash,
|
||||
group_values,
|
||||
is_min,
|
||||
} => {
|
||||
// Seek to the next value in the index
|
||||
let index_key = vec![
|
||||
Value::Integer(*storage_id),
|
||||
Value::Integer(*zset_id),
|
||||
zset_hash.to_value(),
|
||||
current_candidate.clone(),
|
||||
];
|
||||
let index_record = ImmutableRecord::from_values(&index_key, index_key.len());
|
||||
@@ -1567,7 +1706,7 @@ impl ScanState {
|
||||
&index_record,
|
||||
seek_op,
|
||||
*storage_id,
|
||||
*zset_id
|
||||
*zset_hash
|
||||
));
|
||||
|
||||
*self = ScanState::CheckCandidate {
|
||||
@@ -1575,7 +1714,7 @@ impl ScanState {
|
||||
group_key: std::mem::take(group_key),
|
||||
column_name: std::mem::take(column_name),
|
||||
storage_id: *storage_id,
|
||||
zset_id: *zset_id,
|
||||
zset_hash: *zset_hash,
|
||||
group_values: std::mem::take(group_values),
|
||||
is_min: *is_min,
|
||||
};
|
||||
@@ -1629,7 +1768,7 @@ impl MinMaxPersistState {
|
||||
operator_id: usize,
|
||||
column_min_max: &HashMap<usize, AggColumnInfo>,
|
||||
cursors: &mut DbspStateCursors,
|
||||
generate_group_rowid: impl Fn(&str) -> i64,
|
||||
generate_group_hash: impl Fn(&str) -> Hash128,
|
||||
) -> Result<IOResult<()>> {
|
||||
loop {
|
||||
match self {
|
||||
@@ -1715,7 +1854,7 @@ impl MinMaxPersistState {
|
||||
// Build the key components for MinMax storage using new encoding
|
||||
let storage_id =
|
||||
generate_storage_id(operator_id, column_index, AGG_TYPE_MINMAX);
|
||||
let zset_id = generate_group_rowid(group_key_str);
|
||||
let zset_hash = generate_group_hash(group_key_str);
|
||||
|
||||
// element_id is the actual value for Min/Max
|
||||
let element_id_val = value.clone();
|
||||
@@ -1723,15 +1862,15 @@ impl MinMaxPersistState {
|
||||
// Create index key
|
||||
let index_key = vec![
|
||||
Value::Integer(storage_id),
|
||||
Value::Integer(zset_id),
|
||||
zset_hash.to_value(),
|
||||
element_id_val.clone(),
|
||||
];
|
||||
|
||||
// Record values (operator_id, zset_id, element_id, unused_placeholder)
|
||||
// Record values (operator_id, zset_hash, element_id, unused_placeholder)
|
||||
// For MIN/MAX, the element_id IS the value, so we use NULL for the 4th column
|
||||
let record_values = vec![
|
||||
Value::Integer(storage_id),
|
||||
Value::Integer(zset_id),
|
||||
zset_hash.to_value(),
|
||||
element_id_val.clone(),
|
||||
Value::Null, // Placeholder - not used for MIN/MAX
|
||||
];
|
||||
|
||||
@@ -368,6 +368,10 @@ impl DbspNode {
|
||||
}
|
||||
}
|
||||
|
||||
/// Version number for the DBSP circuit format
|
||||
/// This should be incremented when the circuit structure changes
|
||||
pub const DBSP_CIRCUIT_VERSION: u32 = 1;
|
||||
|
||||
/// Represents a complete DBSP circuit (DAG of operators)
|
||||
#[derive(Debug)]
|
||||
pub struct DbspCircuit {
|
||||
@@ -403,7 +407,7 @@ impl DbspCircuit {
|
||||
let empty_schema = Arc::new(LogicalSchema::new(vec![]));
|
||||
Self {
|
||||
nodes: HashMap::new(),
|
||||
next_id: 0,
|
||||
next_id: 1, // Start from 1 to reserve 0 for metadata
|
||||
root: None,
|
||||
output_schema: empty_schema,
|
||||
commit_state: CommitState::Init,
|
||||
|
||||
@@ -2,10 +2,121 @@
|
||||
// For now, we'll use a basic approach and can expand to full DBSP later
|
||||
|
||||
use crate::Value;
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::hash::{Hash, Hasher};
|
||||
|
||||
/// A 128-bit hash value implemented as a UUID
|
||||
/// We use UUID because it's a standard 128-bit type we already depend on
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub struct Hash128 {
|
||||
// Store as UUID internally for efficient 128-bit representation
|
||||
uuid: uuid::Uuid,
|
||||
}
|
||||
|
||||
impl Hash128 {
|
||||
/// Create a new 128-bit hash from high and low 64-bit parts
|
||||
pub fn new(high: u64, low: u64) -> Self {
|
||||
// Convert two u64 values to UUID bytes (big-endian)
|
||||
let mut bytes = [0u8; 16];
|
||||
bytes[0..8].copy_from_slice(&high.to_be_bytes());
|
||||
bytes[8..16].copy_from_slice(&low.to_be_bytes());
|
||||
Self {
|
||||
uuid: uuid::Uuid::from_bytes(bytes),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the low 64 bits as i64 (for when we need a rowid)
|
||||
pub fn as_i64(&self) -> i64 {
|
||||
let bytes = self.uuid.as_bytes();
|
||||
let low = u64::from_be_bytes([
|
||||
bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15],
|
||||
]);
|
||||
low as i64
|
||||
}
|
||||
|
||||
/// Compute a 128-bit hash of the given values
|
||||
/// We serialize values to a string representation and use UUID v5 (SHA-1 based)
|
||||
/// to get a deterministic 128-bit hash
|
||||
pub fn hash_values(values: &[Value]) -> Self {
|
||||
// Build a string representation of all values
|
||||
// Use a delimiter that won't appear in normal values
|
||||
let mut s = String::new();
|
||||
for (i, value) in values.iter().enumerate() {
|
||||
if i > 0 {
|
||||
s.push('\x00'); // null byte as delimiter
|
||||
}
|
||||
// Add type prefix to distinguish between types
|
||||
match value {
|
||||
Value::Null => s.push_str("N:"),
|
||||
Value::Integer(n) => {
|
||||
s.push_str("I:");
|
||||
s.push_str(&n.to_string());
|
||||
}
|
||||
Value::Float(f) => {
|
||||
s.push_str("F:");
|
||||
// Use to_bits to ensure consistent representation
|
||||
s.push_str(&f.to_bits().to_string());
|
||||
}
|
||||
Value::Text(t) => {
|
||||
s.push_str("T:");
|
||||
s.push_str(t.as_str());
|
||||
}
|
||||
Value::Blob(b) => {
|
||||
s.push_str("B:");
|
||||
s.push_str(&hex::encode(b));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Self::hash_str(&s)
|
||||
}
|
||||
|
||||
/// Hash a string value to 128 bits using UUID v5
|
||||
pub fn hash_str(s: &str) -> Self {
|
||||
// Use UUID v5 with a fixed namespace to get deterministic 128-bit hashes
|
||||
// We use the DNS namespace as it's a standard choice
|
||||
let uuid = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_DNS, s.as_bytes());
|
||||
Self { uuid }
|
||||
}
|
||||
|
||||
/// Convert to a big-endian byte array for storage
|
||||
pub fn to_blob(self) -> Vec<u8> {
|
||||
self.uuid.as_bytes().to_vec()
|
||||
}
|
||||
|
||||
/// Create from a big-endian byte array
|
||||
pub fn from_blob(bytes: &[u8]) -> Option<Self> {
|
||||
if bytes.len() != 16 {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut uuid_bytes = [0u8; 16];
|
||||
uuid_bytes.copy_from_slice(bytes);
|
||||
Some(Self {
|
||||
uuid: uuid::Uuid::from_bytes(uuid_bytes),
|
||||
})
|
||||
}
|
||||
|
||||
/// Convert to a Value::Blob for storage
|
||||
pub fn to_value(self) -> Value {
|
||||
Value::Blob(self.to_blob())
|
||||
}
|
||||
|
||||
/// Try to extract a Hash128 from a Value
|
||||
pub fn from_value(value: &Value) -> Option<Self> {
|
||||
match value {
|
||||
Value::Blob(b) => Self::from_blob(b),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Hash128 {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.uuid)
|
||||
}
|
||||
}
|
||||
|
||||
// The DBSP paper uses as a key the whole record, with both the row key and the values. This is a
|
||||
// bit confusing for us in databases, because when you say "key", it is easy to understand that as
|
||||
// being the row key.
|
||||
@@ -30,7 +141,7 @@ pub struct HashableRow {
|
||||
pub values: Vec<Value>,
|
||||
// Pre-computed hash: DBSP rows are immutable and frequently hashed during joins,
|
||||
// making caching worthwhile despite the memory overhead
|
||||
cached_hash: u64,
|
||||
cached_hash: Hash128,
|
||||
}
|
||||
|
||||
impl HashableRow {
|
||||
@@ -43,47 +154,23 @@ impl HashableRow {
|
||||
}
|
||||
}
|
||||
|
||||
fn compute_hash(rowid: i64, values: &[Value]) -> u64 {
|
||||
let mut hasher = DefaultHasher::new();
|
||||
|
||||
rowid.hash(&mut hasher);
|
||||
|
||||
for value in values {
|
||||
match value {
|
||||
Value::Null => {
|
||||
0u8.hash(&mut hasher);
|
||||
}
|
||||
Value::Integer(i) => {
|
||||
1u8.hash(&mut hasher);
|
||||
i.hash(&mut hasher);
|
||||
}
|
||||
Value::Float(f) => {
|
||||
2u8.hash(&mut hasher);
|
||||
f.to_bits().hash(&mut hasher);
|
||||
}
|
||||
Value::Text(s) => {
|
||||
3u8.hash(&mut hasher);
|
||||
s.value.hash(&mut hasher);
|
||||
(s.subtype as u8).hash(&mut hasher);
|
||||
}
|
||||
Value::Blob(b) => {
|
||||
4u8.hash(&mut hasher);
|
||||
b.hash(&mut hasher);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
hasher.finish()
|
||||
fn compute_hash(rowid: i64, values: &[Value]) -> Hash128 {
|
||||
// Include rowid in the hash by prepending it to values
|
||||
let mut all_values = Vec::with_capacity(values.len() + 1);
|
||||
all_values.push(Value::Integer(rowid));
|
||||
all_values.extend_from_slice(values);
|
||||
Hash128::hash_values(&all_values)
|
||||
}
|
||||
|
||||
pub fn cached_hash(&self) -> u64 {
|
||||
pub fn cached_hash(&self) -> Hash128 {
|
||||
self.cached_hash
|
||||
}
|
||||
}
|
||||
|
||||
impl Hash for HashableRow {
|
||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||
self.cached_hash.hash(state);
|
||||
// Hash the 128-bit value by hashing both parts
|
||||
self.cached_hash.to_blob().hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
#![allow(dead_code)]
|
||||
|
||||
use crate::incremental::dbsp::Hash128;
|
||||
use crate::incremental::dbsp::{Delta, DeltaPair, HashableRow};
|
||||
use crate::incremental::operator::{
|
||||
generate_storage_id, ComputationTracker, DbspStateCursors, EvalState, IncrementalOperator,
|
||||
@@ -22,23 +23,39 @@ pub enum JoinType {
|
||||
fn read_next_join_row(
|
||||
storage_id: i64,
|
||||
join_key: &HashableRow,
|
||||
last_element_id: i64,
|
||||
last_element_hash: Option<Hash128>,
|
||||
cursors: &mut DbspStateCursors,
|
||||
) -> Result<IOResult<Option<(i64, HashableRow, isize)>>> {
|
||||
) -> Result<IOResult<Option<(Hash128, HashableRow, isize)>>> {
|
||||
// Build the index key: (storage_id, zset_hash, element_hash)
|
||||
// zset_hash is the hash of the join key
|
||||
let zset_id = join_key.cached_hash() as i64;
|
||||
let zset_hash = join_key.cached_hash();
|
||||
|
||||
let index_key_values = vec![
|
||||
Value::Integer(storage_id),
|
||||
Value::Integer(zset_id),
|
||||
Value::Integer(last_element_id),
|
||||
];
|
||||
// For iteration, use the last element hash if we have one, or NULL to start
|
||||
let index_key_values = match last_element_hash {
|
||||
Some(last_hash) => vec![
|
||||
Value::Integer(storage_id),
|
||||
zset_hash.to_value(),
|
||||
last_hash.to_value(),
|
||||
],
|
||||
None => vec![
|
||||
Value::Integer(storage_id),
|
||||
zset_hash.to_value(),
|
||||
Value::Null, // Start iteration from beginning
|
||||
],
|
||||
};
|
||||
|
||||
let index_record = ImmutableRecord::from_values(&index_key_values, index_key_values.len());
|
||||
|
||||
// Use GE (>=) for initial seek with NULL, GT (>) for continuation
|
||||
let seek_op = if last_element_hash.is_none() {
|
||||
SeekOp::GE { eq_only: false }
|
||||
} else {
|
||||
SeekOp::GT
|
||||
};
|
||||
|
||||
let seek_result = return_if_io!(cursors
|
||||
.index_cursor
|
||||
.seek(SeekKey::IndexKey(&index_record), SeekOp::GT));
|
||||
.seek(SeekKey::IndexKey(&index_record), seek_op));
|
||||
|
||||
if !matches!(seek_result, SeekResult::Found) {
|
||||
return Ok(IOResult::Done(None));
|
||||
@@ -48,7 +65,7 @@ fn read_next_join_row(
|
||||
let current_record = return_if_io!(cursors.index_cursor.record());
|
||||
|
||||
// Extract all needed values from the record before dropping it
|
||||
let (found_storage_id, found_zset_id, element_id) = if let Some(rec) = current_record {
|
||||
let (found_storage_id, found_zset_hash, element_hash) = if let Some(rec) = current_record {
|
||||
let values = rec.get_values();
|
||||
|
||||
// Index has 4 values: storage_id, zset_hash, element_hash, rowid (appended by WriteRow)
|
||||
@@ -57,17 +74,21 @@ fn read_next_join_row(
|
||||
Value::Integer(id) => *id,
|
||||
_ => return Ok(IOResult::Done(None)),
|
||||
};
|
||||
let found_zset_id = match &values[1].to_owned() {
|
||||
Value::Integer(id) => *id,
|
||||
let found_zset_hash = match &values[1].to_owned() {
|
||||
Value::Blob(blob) => Hash128::from_blob(blob).ok_or_else(|| {
|
||||
crate::LimboError::InternalError("Invalid zset_hash blob".to_string())
|
||||
})?,
|
||||
_ => return Ok(IOResult::Done(None)),
|
||||
};
|
||||
let element_id = match &values[2].to_owned() {
|
||||
Value::Integer(id) => *id,
|
||||
let element_hash = match &values[2].to_owned() {
|
||||
Value::Blob(blob) => Hash128::from_blob(blob).ok_or_else(|| {
|
||||
crate::LimboError::InternalError("Invalid element_hash blob".to_string())
|
||||
})?,
|
||||
_ => {
|
||||
return Ok(IOResult::Done(None));
|
||||
}
|
||||
};
|
||||
(found_storage_id, found_zset_id, element_id)
|
||||
(found_storage_id, found_zset_hash, element_hash)
|
||||
} else {
|
||||
return Ok(IOResult::Done(None));
|
||||
}
|
||||
@@ -77,7 +98,7 @@ fn read_next_join_row(
|
||||
|
||||
// Now we can safely check if we're in the right range
|
||||
// If we've moved to a different storage_id or zset_hash, we're done
|
||||
if found_storage_id != storage_id || found_zset_id != zset_id {
|
||||
if found_storage_id != storage_id || found_zset_hash != zset_hash {
|
||||
return Ok(IOResult::Done(None));
|
||||
}
|
||||
|
||||
@@ -109,7 +130,7 @@ fn read_next_join_row(
|
||||
_ => return Ok(IOResult::Done(None)),
|
||||
};
|
||||
|
||||
return Ok(IOResult::Done(Some((element_id, row, weight))));
|
||||
return Ok(IOResult::Done(Some((element_hash, row, weight))));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -127,13 +148,13 @@ pub enum JoinEvalState {
|
||||
deltas: DeltaPair,
|
||||
output: Delta,
|
||||
current_idx: usize,
|
||||
last_row_scanned: i64,
|
||||
last_row_scanned: Option<Hash128>,
|
||||
},
|
||||
ProcessRightJoin {
|
||||
deltas: DeltaPair,
|
||||
output: Delta,
|
||||
current_idx: usize,
|
||||
last_row_scanned: i64,
|
||||
last_row_scanned: Option<Hash128>,
|
||||
},
|
||||
Done {
|
||||
output: Delta,
|
||||
@@ -151,9 +172,9 @@ impl JoinEvalState {
|
||||
// Combine the rows
|
||||
let mut combined_values = left_row.values.clone();
|
||||
combined_values.extend(right_row.values.clone());
|
||||
// Use hash of the combined values as rowid to ensure uniqueness
|
||||
// Use hash of combined values as synthetic rowid
|
||||
let temp_row = HashableRow::new(0, combined_values.clone());
|
||||
let joined_rowid = temp_row.cached_hash() as i64;
|
||||
let joined_rowid = temp_row.cached_hash().as_i64();
|
||||
let joined_row = HashableRow::new(joined_rowid, combined_values);
|
||||
|
||||
// Add to output with combined weight
|
||||
@@ -177,7 +198,7 @@ impl JoinEvalState {
|
||||
deltas: std::mem::take(deltas),
|
||||
output: std::mem::take(output),
|
||||
current_idx: 0,
|
||||
last_row_scanned: i64::MIN,
|
||||
last_row_scanned: None,
|
||||
};
|
||||
}
|
||||
JoinEvalState::ProcessLeftJoin {
|
||||
@@ -191,7 +212,7 @@ impl JoinEvalState {
|
||||
deltas: std::mem::take(deltas),
|
||||
output: std::mem::take(output),
|
||||
current_idx: 0,
|
||||
last_row_scanned: i64::MIN,
|
||||
last_row_scanned: None,
|
||||
};
|
||||
} else {
|
||||
let (left_row, left_weight) = &deltas.left.changes[*current_idx];
|
||||
@@ -209,7 +230,7 @@ impl JoinEvalState {
|
||||
cursors
|
||||
));
|
||||
match next_row {
|
||||
Some((element_id, right_row, right_weight)) => {
|
||||
Some((element_hash, right_row, right_weight)) => {
|
||||
Self::combine_rows(
|
||||
left_row,
|
||||
(*left_weight) as i64,
|
||||
@@ -222,7 +243,7 @@ impl JoinEvalState {
|
||||
deltas: std::mem::take(deltas),
|
||||
output: std::mem::take(output),
|
||||
current_idx: *current_idx,
|
||||
last_row_scanned: element_id,
|
||||
last_row_scanned: Some(element_hash),
|
||||
};
|
||||
}
|
||||
None => {
|
||||
@@ -231,7 +252,7 @@ impl JoinEvalState {
|
||||
deltas: std::mem::take(deltas),
|
||||
output: std::mem::take(output),
|
||||
current_idx: *current_idx + 1,
|
||||
last_row_scanned: i64::MIN,
|
||||
last_row_scanned: None,
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -263,7 +284,7 @@ impl JoinEvalState {
|
||||
cursors
|
||||
));
|
||||
match next_row {
|
||||
Some((element_id, left_row, left_weight)) => {
|
||||
Some((element_hash, left_row, left_weight)) => {
|
||||
Self::combine_rows(
|
||||
&left_row,
|
||||
left_weight as i64,
|
||||
@@ -276,7 +297,7 @@ impl JoinEvalState {
|
||||
deltas: std::mem::take(deltas),
|
||||
output: std::mem::take(output),
|
||||
current_idx: *current_idx,
|
||||
last_row_scanned: element_id,
|
||||
last_row_scanned: Some(element_hash),
|
||||
};
|
||||
}
|
||||
None => {
|
||||
@@ -285,7 +306,7 @@ impl JoinEvalState {
|
||||
deltas: std::mem::take(deltas),
|
||||
output: std::mem::take(output),
|
||||
current_idx: *current_idx + 1,
|
||||
last_row_scanned: i64::MIN,
|
||||
last_row_scanned: None,
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -376,7 +397,7 @@ impl JoinOperator {
|
||||
JoinType::Inner => {} // Inner join is supported
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
let result = Self {
|
||||
operator_id,
|
||||
join_type,
|
||||
left_key_indices,
|
||||
@@ -385,7 +406,8 @@ impl JoinOperator {
|
||||
right_columns,
|
||||
tracker: None,
|
||||
commit_state: JoinCommitState::Idle,
|
||||
})
|
||||
};
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Extract join key from row values using the specified indices
|
||||
@@ -485,8 +507,9 @@ impl JoinOperator {
|
||||
|
||||
// Create the joined row with a unique rowid
|
||||
// Use hash of the combined values to ensure uniqueness
|
||||
// Use hash of combined values as synthetic rowid
|
||||
let temp_row = HashableRow::new(0, combined_values.clone());
|
||||
let joined_rowid = temp_row.cached_hash() as i64;
|
||||
let joined_rowid = temp_row.cached_hash().as_i64();
|
||||
let joined_row =
|
||||
HashableRow::new(joined_rowid, combined_values.clone());
|
||||
|
||||
@@ -518,124 +541,44 @@ impl JoinOperator {
|
||||
}
|
||||
}
|
||||
|
||||
// Helper to deserialize a HashableRow from a blob
|
||||
fn deserialize_hashable_row(blob: &[u8]) -> Result<HashableRow> {
|
||||
// Simple deserialization - this needs to match how we serialize in commit
|
||||
// Format: [rowid:8 bytes][num_values:4 bytes][values...]
|
||||
if blob.len() < 12 {
|
||||
use crate::types::ImmutableRecord;
|
||||
|
||||
let record = ImmutableRecord::from_bin_record(blob.to_vec());
|
||||
let ref_values = record.get_values();
|
||||
let all_values: Vec<Value> = ref_values.into_iter().map(|rv| rv.to_owned()).collect();
|
||||
|
||||
if all_values.is_empty() {
|
||||
return Err(crate::LimboError::InternalError(
|
||||
"Invalid blob size".to_string(),
|
||||
"HashableRow blob must contain at least rowid".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let rowid = i64::from_le_bytes(blob[0..8].try_into().unwrap());
|
||||
let num_values = u32::from_le_bytes(blob[8..12].try_into().unwrap()) as usize;
|
||||
|
||||
let mut values = Vec::new();
|
||||
let mut offset = 12;
|
||||
|
||||
for _ in 0..num_values {
|
||||
if offset >= blob.len() {
|
||||
break;
|
||||
// First value is the rowid
|
||||
let rowid = match &all_values[0] {
|
||||
Value::Integer(i) => *i,
|
||||
_ => {
|
||||
return Err(crate::LimboError::InternalError(
|
||||
"First value must be rowid (integer)".to_string(),
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
let type_tag = blob[offset];
|
||||
offset += 1;
|
||||
|
||||
match type_tag {
|
||||
0 => values.push(Value::Null),
|
||||
1 => {
|
||||
if offset + 8 <= blob.len() {
|
||||
let i = i64::from_le_bytes(blob[offset..offset + 8].try_into().unwrap());
|
||||
values.push(Value::Integer(i));
|
||||
offset += 8;
|
||||
}
|
||||
}
|
||||
2 => {
|
||||
if offset + 8 <= blob.len() {
|
||||
let f = f64::from_le_bytes(blob[offset..offset + 8].try_into().unwrap());
|
||||
values.push(Value::Float(f));
|
||||
offset += 8;
|
||||
}
|
||||
}
|
||||
3 => {
|
||||
if offset + 4 <= blob.len() {
|
||||
let len =
|
||||
u32::from_le_bytes(blob[offset..offset + 4].try_into().unwrap()) as usize;
|
||||
offset += 4;
|
||||
if offset + len < blob.len() {
|
||||
let text_bytes = blob[offset..offset + len].to_vec();
|
||||
offset += len;
|
||||
let subtype = match blob[offset] {
|
||||
0 => crate::types::TextSubtype::Text,
|
||||
1 => crate::types::TextSubtype::Json,
|
||||
_ => crate::types::TextSubtype::Text,
|
||||
};
|
||||
offset += 1;
|
||||
values.push(Value::Text(crate::types::Text {
|
||||
value: text_bytes,
|
||||
subtype,
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
4 => {
|
||||
if offset + 4 <= blob.len() {
|
||||
let len =
|
||||
u32::from_le_bytes(blob[offset..offset + 4].try_into().unwrap()) as usize;
|
||||
offset += 4;
|
||||
if offset + len <= blob.len() {
|
||||
let blob_data = blob[offset..offset + len].to_vec();
|
||||
values.push(Value::Blob(blob_data));
|
||||
offset += len;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => break, // Unknown type tag
|
||||
}
|
||||
}
|
||||
// Rest are the row values
|
||||
let values = all_values[1..].to_vec();
|
||||
|
||||
Ok(HashableRow::new(rowid, values))
|
||||
}
|
||||
|
||||
// Helper to serialize a HashableRow to a blob
|
||||
fn serialize_hashable_row(row: &HashableRow) -> Vec<u8> {
|
||||
let mut blob = Vec::new();
|
||||
use crate::types::ImmutableRecord;
|
||||
|
||||
// Write rowid
|
||||
blob.extend_from_slice(&row.rowid.to_le_bytes());
|
||||
let mut all_values = Vec::with_capacity(row.values.len() + 1);
|
||||
all_values.push(Value::Integer(row.rowid));
|
||||
all_values.extend_from_slice(&row.values);
|
||||
|
||||
// Write number of values
|
||||
blob.extend_from_slice(&(row.values.len() as u32).to_le_bytes());
|
||||
|
||||
// Write each value directly with type tags (like AggregateState does)
|
||||
for value in &row.values {
|
||||
match value {
|
||||
Value::Null => blob.push(0u8),
|
||||
Value::Integer(i) => {
|
||||
blob.push(1u8);
|
||||
blob.extend_from_slice(&i.to_le_bytes());
|
||||
}
|
||||
Value::Float(f) => {
|
||||
blob.push(2u8);
|
||||
blob.extend_from_slice(&f.to_le_bytes());
|
||||
}
|
||||
Value::Text(s) => {
|
||||
blob.push(3u8);
|
||||
let bytes = &s.value;
|
||||
blob.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
|
||||
blob.extend_from_slice(bytes);
|
||||
blob.push(s.subtype as u8);
|
||||
}
|
||||
Value::Blob(b) => {
|
||||
blob.push(4u8);
|
||||
blob.extend_from_slice(&(b.len() as u32).to_le_bytes());
|
||||
blob.extend_from_slice(b);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
blob
|
||||
let record = ImmutableRecord::from_values(&all_values, all_values.len());
|
||||
record.as_blob().clone()
|
||||
}
|
||||
|
||||
impl IncrementalOperator for JoinOperator {
|
||||
@@ -697,20 +640,20 @@ impl IncrementalOperator for JoinOperator {
|
||||
// The index key: (storage_id, zset_id, element_id)
|
||||
// zset_id is the hash of the join key, element_id is hash of the row
|
||||
let storage_id = self.left_storage_id();
|
||||
let zset_id = join_key.cached_hash() as i64;
|
||||
let element_id = row.cached_hash() as i64;
|
||||
let zset_hash = join_key.cached_hash();
|
||||
let element_hash = row.cached_hash();
|
||||
let index_key = vec![
|
||||
Value::Integer(storage_id),
|
||||
Value::Integer(zset_id),
|
||||
Value::Integer(element_id),
|
||||
zset_hash.to_value(),
|
||||
element_hash.to_value(),
|
||||
];
|
||||
|
||||
// The record values: we'll store the serialized row as a blob
|
||||
let row_blob = serialize_hashable_row(row);
|
||||
let record_values = vec![
|
||||
Value::Integer(self.left_storage_id()),
|
||||
Value::Integer(join_key.cached_hash() as i64),
|
||||
Value::Integer(row.cached_hash() as i64),
|
||||
zset_hash.to_value(),
|
||||
element_hash.to_value(),
|
||||
Value::Blob(row_blob),
|
||||
];
|
||||
|
||||
@@ -745,18 +688,20 @@ impl IncrementalOperator for JoinOperator {
|
||||
let join_key = self.extract_join_key(&row.values, &self.right_key_indices);
|
||||
|
||||
// The index key: (storage_id, zset_id, element_id)
|
||||
let zset_hash = join_key.cached_hash();
|
||||
let element_hash = row.cached_hash();
|
||||
let index_key = vec![
|
||||
Value::Integer(self.right_storage_id()),
|
||||
Value::Integer(join_key.cached_hash() as i64),
|
||||
Value::Integer(row.cached_hash() as i64),
|
||||
zset_hash.to_value(),
|
||||
element_hash.to_value(),
|
||||
];
|
||||
|
||||
// The record values: we'll store the serialized row as a blob
|
||||
let row_blob = serialize_hashable_row(row);
|
||||
let record_values = vec![
|
||||
Value::Integer(self.right_storage_id()),
|
||||
Value::Integer(join_key.cached_hash() as i64),
|
||||
Value::Integer(row.cached_hash() as i64),
|
||||
zset_hash.to_value(),
|
||||
element_hash.to_value(),
|
||||
Value::Blob(row_blob),
|
||||
];
|
||||
|
||||
|
||||
@@ -57,7 +57,7 @@ impl MergeOperator {
|
||||
for (row, weight) in delta.changes {
|
||||
// Hash only the values (not rowid) for deduplication
|
||||
let temp_row = HashableRow::new(0, row.values.clone());
|
||||
let value_hash = temp_row.cached_hash();
|
||||
let value_hash = temp_row.cached_hash().as_i64() as u64;
|
||||
|
||||
// Check if we've seen this value before
|
||||
let assigned_rowid =
|
||||
|
||||
@@ -332,20 +332,24 @@ mod tests {
|
||||
// Get the blob data from column 3 (value column)
|
||||
if let Some(Value::Blob(blob)) = values.get(3) {
|
||||
// Deserialize the state
|
||||
if let Some((state, group_key)) =
|
||||
AggregateState::from_blob(blob, &agg.aggregates)
|
||||
{
|
||||
// Should not have made it this far.
|
||||
assert!(state.count != 0);
|
||||
// Build output row: group_by columns + aggregate values
|
||||
let mut output_values = group_key.clone();
|
||||
output_values.extend(state.to_values(&agg.aggregates));
|
||||
match AggregateState::from_blob(blob) {
|
||||
Ok((state, group_key)) => {
|
||||
// Should not have made it this far.
|
||||
assert!(state.count != 0);
|
||||
// Build output row: group_by columns + aggregate values
|
||||
let mut output_values = group_key.clone();
|
||||
output_values.extend(state.to_values(&agg.aggregates));
|
||||
|
||||
let group_key_str = AggregateOperator::group_key_to_string(&group_key);
|
||||
let rowid = agg.generate_group_rowid(&group_key_str);
|
||||
|
||||
let output_row = HashableRow::new(rowid, output_values);
|
||||
result.changes.push((output_row, 1));
|
||||
let group_key_str = AggregateOperator::group_key_to_string(&group_key);
|
||||
let rowid = agg.generate_group_rowid(&group_key_str);
|
||||
let output_row = HashableRow::new(rowid, output_values);
|
||||
result.changes.push((output_row, 1));
|
||||
}
|
||||
Err(e) => {
|
||||
// Log or handle the deserialization error
|
||||
// For now, we'll skip this entry
|
||||
eprintln!("Failed to deserialize aggregate state: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2924,7 +2928,6 @@ mod tests {
|
||||
let index_cursor =
|
||||
BTreeCursor::new_index(None, pager.clone(), index_page_id, &index_def, 10);
|
||||
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
|
||||
|
||||
let mut join = JoinOperator::new(
|
||||
1, // operator_id
|
||||
JoinType::Inner,
|
||||
@@ -2940,7 +2943,6 @@ mod tests {
|
||||
left_delta.insert(1, vec![Value::Integer(1), Value::Float(100.0)]);
|
||||
left_delta.insert(2, vec![Value::Integer(2), Value::Float(200.0)]);
|
||||
left_delta.insert(3, vec![Value::Integer(3), Value::Float(300.0)]); // No match initially
|
||||
|
||||
let mut right_delta = Delta::new();
|
||||
right_delta.insert(1, vec![Value::Integer(1), Value::Text("Alice".into())]);
|
||||
right_delta.insert(2, vec![Value::Integer(2), Value::Text("Bob".into())]);
|
||||
@@ -4011,4 +4013,115 @@ mod tests {
|
||||
panic!("Expected Done result");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
fn test_aggregate_serialization_with_different_column_indices() {
    // Two aggregate operators share an operator id but aggregate different
    // columns. The state written by the first must not be picked up by the
    // second for aggregates over other columns.
    let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
    let state_table = BTreeCursor::new_table(None, pager.clone(), table_root_page_id, 5);
    let index_def = create_dbsp_state_index(index_root_page_id);
    let state_index =
        BTreeCursor::new_index(None, pager.clone(), index_root_page_id, &index_def, 4);
    let mut cursors = DbspStateCursors::new(state_table, state_index);

    // Both operators read the same four input columns.
    let input_columns = || -> Vec<String> {
        ["group", "val1", "val2", "val3"]
            .iter()
            .map(|name| name.to_string())
            .collect()
    };
    // Every row in this test belongs to group "A"; only the three integers vary.
    let group_a_row = |val1: i64, val2: i64, val3: i64| {
        vec![
            Value::Text("A".into()),
            Value::Integer(val1),
            Value::Integer(val2),
            Value::Integer(val3),
        ]
    };

    // First operator: SUM(col1), MIN(col3) GROUP BY col0
    let mut sum1_min3 = AggregateOperator::new(
        1,
        vec![0],
        vec![AggregateFunction::Sum(1), AggregateFunction::Min(3)],
        input_columns(),
    );

    // Seed the state with two rows
    let mut first_batch = Delta::new();
    first_batch.insert(1, group_a_row(10, 100, 5));
    first_batch.insert(2, group_a_row(15, 200, 3));

    let first_output = pager
        .io
        .block(|| sum1_min3.commit((&first_batch).into(), &mut cursors))
        .unwrap();

    assert_eq!(first_output.changes.len(), 1);
    let (first_row, _) = &first_output.changes[0];
    assert_eq!(first_row.values[0], Value::Text("A".into()));
    assert_eq!(first_row.values[1], Value::Integer(25)); // SUM(val1) = 10 + 15
    assert_eq!(first_row.values[2], Value::Integer(3)); // MIN(val3) = min(5, 3)

    // Second operator: same id, swapped column mapping: SUM(col3), MIN(col1)
    let mut sum3_min1 = AggregateOperator::new(
        1, // Same operator_id
        vec![0],
        vec![AggregateFunction::Sum(3), AggregateFunction::Min(1)],
        input_columns(),
    );

    // Feed one more row through the second operator
    let mut second_batch = Delta::new();
    second_batch.insert(3, group_a_row(20, 300, 4));

    let second_output = pager
        .io
        .block(|| sum3_min1.commit((&second_batch).into(), &mut cursors))
        .unwrap();

    // The updated aggregate is the positive-weight row for group A
    let updated_row = second_output
        .changes
        .iter()
        .find(|(row, weight)| row.values[0] == Value::Text("A".into()) && *weight > 0)
        .map(|(row, _)| row)
        .expect("Should have a positive weight row for group A");

    // The stored state holds SUM(col1)=25 and MIN(col3)=3, neither of which
    // belongs to the aggregates below, so both are expected to start from the
    // new row alone:
    // - SUM(col3) = 4
    // - MIN(col1) = 20
    assert_eq!(
        updated_row.values[1],
        Value::Integer(4),
        "SUM(col3) should be 4 (new data only)"
    );
    assert_eq!(
        updated_row.values[2],
        Value::Integer(20),
        "MIN(col1) should be 20 (new data only)"
    );
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::incremental::operator::{AggregateFunction, AggregateState, DbspStateCursors};
|
||||
use crate::incremental::operator::{AggregateState, DbspStateCursors};
|
||||
use crate::storage::btree::{BTreeCursor, BTreeKey};
|
||||
use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult};
|
||||
use crate::{return_if_io, LimboError, Result, Value};
|
||||
@@ -20,7 +20,6 @@ impl ReadRecord {
|
||||
pub fn read_record(
|
||||
&mut self,
|
||||
key: SeekKey,
|
||||
aggregates: &[AggregateFunction],
|
||||
cursor: &mut BTreeCursor,
|
||||
) -> Result<IOResult<Option<AggregateState>>> {
|
||||
loop {
|
||||
@@ -41,12 +40,7 @@ impl ReadRecord {
|
||||
let blob = values[3].to_owned();
|
||||
|
||||
let (state, _group_key) = match blob {
|
||||
Value::Blob(blob) => AggregateState::from_blob(&blob, aggregates)
|
||||
.ok_or_else(|| {
|
||||
LimboError::InternalError(format!(
|
||||
"Cannot deserialize aggregate state {blob:?}",
|
||||
))
|
||||
}),
|
||||
Value::Blob(blob) => AggregateState::from_blob(&blob),
|
||||
_ => Err(LimboError::ParseError(
|
||||
"Value in aggregator not blob".to_string(),
|
||||
)),
|
||||
|
||||
124
core/schema.rs
124
core/schema.rs
@@ -43,7 +43,7 @@ use turso_parser::{
|
||||
|
||||
const SCHEMA_TABLE_NAME: &str = "sqlite_schema";
|
||||
const SCHEMA_TABLE_NAME_ALT: &str = "sqlite_master";
|
||||
pub const DBSP_TABLE_PREFIX: &str = "__turso_internal_dbsp_state_";
|
||||
pub const DBSP_TABLE_PREFIX: &str = "__turso_internal_dbsp_state_v";
|
||||
|
||||
/// Used to refer to the implicit rowid column in tables without an alias during UPDATE
|
||||
pub const ROWID_SENTINEL: usize = usize::MAX;
|
||||
@@ -77,6 +77,9 @@ pub struct Schema {
|
||||
|
||||
/// Mapping from table names to the materialized views that depend on them
|
||||
pub table_to_materialized_views: HashMap<String, Vec<String>>,
|
||||
|
||||
/// Track views that exist but have incompatible versions
|
||||
pub incompatible_views: HashSet<String>,
|
||||
}
|
||||
|
||||
impl Schema {
|
||||
@@ -100,6 +103,7 @@ impl Schema {
|
||||
let incremental_views = HashMap::new();
|
||||
let views: ViewsMap = HashMap::new();
|
||||
let table_to_materialized_views: HashMap<String, Vec<String>> = HashMap::new();
|
||||
let incompatible_views = HashSet::new();
|
||||
Self {
|
||||
tables,
|
||||
materialized_view_names,
|
||||
@@ -111,6 +115,7 @@ impl Schema {
|
||||
indexes_enabled,
|
||||
schema_version: 0,
|
||||
table_to_materialized_views,
|
||||
incompatible_views,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -140,11 +145,39 @@ impl Schema {
|
||||
self.incremental_views.get(&name).cloned()
|
||||
}
|
||||
|
||||
/// Check if DBSP state table exists with the current version
|
||||
pub fn has_compatible_dbsp_state_table(&self, view_name: &str) -> bool {
|
||||
use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
|
||||
let view_name = normalize_ident(view_name);
|
||||
let expected_table_name = format!("{DBSP_TABLE_PREFIX}{DBSP_CIRCUIT_VERSION}_{view_name}");
|
||||
|
||||
// Check if a table with the expected versioned name exists
|
||||
self.tables.contains_key(&expected_table_name)
|
||||
}
|
||||
|
||||
pub fn is_materialized_view(&self, name: &str) -> bool {
|
||||
let name = normalize_ident(name);
|
||||
self.materialized_view_names.contains(&name)
|
||||
}
|
||||
|
||||
/// Check if a table has any incompatible dependent materialized views
|
||||
pub fn has_incompatible_dependent_views(&self, table_name: &str) -> Vec<String> {
|
||||
let table_name = normalize_ident(table_name);
|
||||
|
||||
// Get all materialized views that depend on this table
|
||||
let dependent_views = self
|
||||
.table_to_materialized_views
|
||||
.get(&table_name)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
|
||||
// Filter to only incompatible views
|
||||
dependent_views
|
||||
.into_iter()
|
||||
.filter(|view_name| self.incompatible_views.contains(view_name))
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn remove_view(&mut self, name: &str) -> Result<()> {
|
||||
let name = normalize_ident(name);
|
||||
|
||||
@@ -519,12 +552,21 @@ impl Schema {
|
||||
dbsp_state_index_roots: std::collections::HashMap<String, usize>,
|
||||
) -> Result<()> {
|
||||
for (view_name, (sql, main_root)) in materialized_view_info {
|
||||
// Look up the DBSP state root for this view - must exist for materialized views
|
||||
let dbsp_state_root = dbsp_state_roots.get(&view_name).ok_or_else(|| {
|
||||
LimboError::InternalError(format!(
|
||||
"Materialized view {view_name} is missing its DBSP state table"
|
||||
))
|
||||
})?;
|
||||
// Look up the DBSP state root for this view
|
||||
// If missing, it means version mismatch - skip this view
|
||||
// Check if we have a compatible DBSP state root
|
||||
let dbsp_state_root = if let Some(&root) = dbsp_state_roots.get(&view_name) {
|
||||
root
|
||||
} else {
|
||||
tracing::warn!(
|
||||
"Materialized view '{}' has incompatible version or missing DBSP state table",
|
||||
view_name
|
||||
);
|
||||
// Track this as an incompatible view
|
||||
self.incompatible_views.insert(view_name.clone());
|
||||
// Use a dummy root page - the view won't be usable anyway
|
||||
0
|
||||
};
|
||||
|
||||
// Look up the DBSP state index root (may not exist for older schemas)
|
||||
let dbsp_state_index_root =
|
||||
@@ -534,7 +576,7 @@ impl Schema {
|
||||
&sql,
|
||||
self,
|
||||
main_root,
|
||||
*dbsp_state_root,
|
||||
dbsp_state_root,
|
||||
dbsp_state_index_root,
|
||||
)?;
|
||||
let referenced_tables = incremental_view.get_referenced_table_names();
|
||||
@@ -552,9 +594,12 @@ impl Schema {
|
||||
unique_sets: vec![],
|
||||
})));
|
||||
|
||||
self.add_materialized_view(incremental_view, table, sql);
|
||||
// Only add to schema if compatible
|
||||
if !self.incompatible_views.contains(&view_name) {
|
||||
self.add_materialized_view(incremental_view, table, sql);
|
||||
}
|
||||
|
||||
// Register dependencies
|
||||
// Register dependencies regardless of compatibility
|
||||
for table_name in referenced_tables {
|
||||
self.add_materialized_view_dependency(&table_name, &view_name);
|
||||
}
|
||||
@@ -606,13 +651,33 @@ impl Schema {
|
||||
|
||||
// Check if this is a DBSP state table
|
||||
if table.name.starts_with(DBSP_TABLE_PREFIX) {
|
||||
// Extract the view name from __turso_internal_dbsp_state_<viewname>
|
||||
let view_name = table
|
||||
.name
|
||||
.strip_prefix(DBSP_TABLE_PREFIX)
|
||||
.unwrap()
|
||||
.to_string();
|
||||
dbsp_state_roots.insert(view_name, root_page as usize);
|
||||
// Extract version and view name from __turso_internal_dbsp_state_v<version>_<viewname>
|
||||
let suffix = table.name.strip_prefix(DBSP_TABLE_PREFIX).unwrap();
|
||||
|
||||
// Parse version and view name (format: "<version>_<viewname>")
|
||||
if let Some(underscore_pos) = suffix.find('_') {
|
||||
let version_str = &suffix[..underscore_pos];
|
||||
let view_name = &suffix[underscore_pos + 1..];
|
||||
|
||||
// Check version compatibility
|
||||
if let Ok(stored_version) = version_str.parse::<u32>() {
|
||||
use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
|
||||
if stored_version == DBSP_CIRCUIT_VERSION {
|
||||
// Version matches, store the root page
|
||||
dbsp_state_roots
|
||||
.insert(view_name.to_string(), root_page as usize);
|
||||
} else {
|
||||
// Version mismatch - DO NOT insert into dbsp_state_roots
|
||||
// This will cause populate_materialized_views to skip this view
|
||||
tracing::warn!(
|
||||
"Skipping materialized view '{}' - has version {} but current version is {}. DROP and recreate the view to use it.",
|
||||
view_name, stored_version, DBSP_CIRCUIT_VERSION
|
||||
);
|
||||
// We can't track incompatible views here since we're in handle_schema_row
|
||||
// which doesn't have mutable access to self
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(mv_store) = mv_store {
|
||||
@@ -640,12 +705,23 @@ impl Schema {
|
||||
|
||||
// Check if this is an index for a DBSP state table
|
||||
if table_name.starts_with(DBSP_TABLE_PREFIX) {
|
||||
// Extract the view name from __turso_internal_dbsp_state_<viewname>
|
||||
let view_name = table_name
|
||||
.strip_prefix(DBSP_TABLE_PREFIX)
|
||||
.unwrap()
|
||||
.to_string();
|
||||
dbsp_state_index_roots.insert(view_name, root_page as usize);
|
||||
// Extract version and view name from __turso_internal_dbsp_state_v<version>_<viewname>
|
||||
let suffix = table_name.strip_prefix(DBSP_TABLE_PREFIX).unwrap();
|
||||
|
||||
// Parse version and view name (format: "<version>_<viewname>")
|
||||
if let Some(underscore_pos) = suffix.find('_') {
|
||||
let version_str = &suffix[..underscore_pos];
|
||||
let view_name = &suffix[underscore_pos + 1..];
|
||||
|
||||
// Only store index root if version matches
|
||||
if let Ok(stored_version) = version_str.parse::<u32>() {
|
||||
use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
|
||||
if stored_version == DBSP_CIRCUIT_VERSION {
|
||||
dbsp_state_index_roots
|
||||
.insert(view_name.to_string(), root_page as usize);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
match automatic_indices.entry(table_name) {
|
||||
std::collections::hash_map::Entry::Vacant(e) => {
|
||||
@@ -772,6 +848,7 @@ impl Clone for Schema {
|
||||
.map(|(name, view)| (name.clone(), view.clone()))
|
||||
.collect();
|
||||
let views = self.views.clone();
|
||||
let incompatible_views = self.incompatible_views.clone();
|
||||
Self {
|
||||
tables,
|
||||
materialized_view_names,
|
||||
@@ -783,6 +860,7 @@ impl Clone for Schema {
|
||||
indexes_enabled: self.indexes_enabled,
|
||||
schema_version: self.schema_version,
|
||||
table_to_materialized_views: self.table_to_materialized_views.clone(),
|
||||
incompatible_views,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -89,6 +89,20 @@ pub fn prepare_delete_plan(
|
||||
crate::bail_parse_error!("cannot modify materialized view {}", tbl_name);
|
||||
}
|
||||
|
||||
// Check if this table has any incompatible dependent views
|
||||
let incompatible_views = schema.has_incompatible_dependent_views(&tbl_name);
|
||||
if !incompatible_views.is_empty() {
|
||||
use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
|
||||
crate::bail_parse_error!(
|
||||
"Cannot DELETE from table '{}' because it has incompatible dependent materialized view(s): {}. \n\
|
||||
These views were created with a different DBSP version than the current version ({}). \n\
|
||||
Please DROP and recreate the view(s) before modifying this table.",
|
||||
tbl_name,
|
||||
incompatible_views.join(", "),
|
||||
DBSP_CIRCUIT_VERSION
|
||||
);
|
||||
}
|
||||
|
||||
let table = if let Some(table) = table.virtual_table() {
|
||||
Table::Virtual(table.clone())
|
||||
} else if let Some(table) = table.btree() {
|
||||
|
||||
@@ -96,6 +96,20 @@ pub fn translate_insert(
|
||||
crate::bail_parse_error!("cannot modify materialized view {}", table_name);
|
||||
}
|
||||
|
||||
// Check if this table has any incompatible dependent views
|
||||
let incompatible_views = schema.has_incompatible_dependent_views(table_name.as_str());
|
||||
if !incompatible_views.is_empty() {
|
||||
use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
|
||||
crate::bail_parse_error!(
|
||||
"Cannot INSERT into table '{}' because it has incompatible dependent materialized view(s): {}. \n\
|
||||
These views were created with a different DBSP version than the current version ({}). \n\
|
||||
Please DROP and recreate the view(s) before modifying this table.",
|
||||
table_name,
|
||||
incompatible_views.join(", "),
|
||||
DBSP_CIRCUIT_VERSION
|
||||
);
|
||||
}
|
||||
|
||||
let resolver = Resolver::new(schema, syms);
|
||||
|
||||
if let Some(virtual_table) = &table.virtual_table() {
|
||||
|
||||
@@ -433,6 +433,20 @@ fn parse_table(
|
||||
schema.get_materialized_view(table_name.as_str())
|
||||
});
|
||||
if let Some(view) = view {
|
||||
// First check if the DBSP state table exists with the correct version
|
||||
let has_compatible_state = connection.with_schema(database_id, |schema| {
|
||||
schema.has_compatible_dbsp_state_table(table_name.as_str())
|
||||
});
|
||||
|
||||
if !has_compatible_state {
|
||||
use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
|
||||
return Err(crate::LimboError::InternalError(format!(
|
||||
"Materialized view '{table_name}' has an incompatible version. \n\
|
||||
The current version is {DBSP_CIRCUIT_VERSION}, but the view was created with a different version. \n\
|
||||
Please DROP and recreate the view to use it."
|
||||
)));
|
||||
}
|
||||
|
||||
// Check if this materialized view has persistent storage
|
||||
let view_guard = view.lock().unwrap();
|
||||
let root_page = view_guard.get_root_page();
|
||||
@@ -505,6 +519,24 @@ fn parse_table(
|
||||
}
|
||||
}
|
||||
|
||||
// Check if this is an incompatible view
|
||||
let is_incompatible = connection.with_schema(database_id, |schema| {
|
||||
schema
|
||||
.incompatible_views
|
||||
.contains(&normalized_qualified_name)
|
||||
});
|
||||
|
||||
if is_incompatible {
|
||||
use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
|
||||
crate::bail_parse_error!(
|
||||
"Materialized view '{}' has an incompatible version. \n\
|
||||
The view was created with a different DBSP version than the current version ({}). \n\
|
||||
Please DROP and recreate the view to use it.",
|
||||
normalized_qualified_name,
|
||||
DBSP_CIRCUIT_VERSION
|
||||
);
|
||||
}
|
||||
|
||||
crate::bail_parse_error!("no such table: {}", normalized_qualified_name);
|
||||
}
|
||||
|
||||
|
||||
@@ -152,6 +152,20 @@ pub fn prepare_update_plan(
|
||||
bail_parse_error!("cannot modify materialized view {}", table_name);
|
||||
}
|
||||
|
||||
// Check if this table has any incompatible dependent views
|
||||
let incompatible_views = schema.has_incompatible_dependent_views(table_name.as_str());
|
||||
if !incompatible_views.is_empty() {
|
||||
use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
|
||||
bail_parse_error!(
|
||||
"Cannot UPDATE table '{}' because it has incompatible dependent materialized view(s): {}. \n\
|
||||
These views were created with a different DBSP version than the current version ({}). \n\
|
||||
Please DROP and recreate the view(s) before modifying this table.",
|
||||
table_name,
|
||||
incompatible_views.join(", "),
|
||||
DBSP_CIRCUIT_VERSION
|
||||
);
|
||||
}
|
||||
|
||||
let table_name = table.get_name();
|
||||
let iter_dir = body
|
||||
.order_by
|
||||
|
||||
@@ -143,7 +143,10 @@ pub fn translate_create_materialized_view(
|
||||
)?;
|
||||
|
||||
// Add the DBSP state table to sqlite_master (required for materialized views)
|
||||
let dbsp_table_name = format!("{DBSP_TABLE_PREFIX}{normalized_view_name}");
|
||||
// Include the version number in the table name
|
||||
use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
|
||||
let dbsp_table_name =
|
||||
format!("{DBSP_TABLE_PREFIX}{DBSP_CIRCUIT_VERSION}_{normalized_view_name}");
|
||||
// The element_id column uses SQLite's dynamic typing system to store different value types:
|
||||
// - For hash-based operators (joins, filters): stores INTEGER hash values or rowids
|
||||
// - For future MIN/MAX operators: stores the actual values being compared (INTEGER, REAL, TEXT, BLOB)
|
||||
@@ -151,8 +154,8 @@ pub fn translate_create_materialized_view(
|
||||
let dbsp_sql = format!(
|
||||
"CREATE TABLE {dbsp_table_name} (\
|
||||
operator_id INTEGER NOT NULL, \
|
||||
zset_id INTEGER NOT NULL, \
|
||||
element_id NOT NULL, \
|
||||
zset_id BLOB NOT NULL, \
|
||||
element_id BLOB NOT NULL, \
|
||||
value BLOB, \
|
||||
weight INTEGER NOT NULL, \
|
||||
PRIMARY KEY (operator_id, zset_id, element_id)\
|
||||
|
||||
Reference in New Issue
Block a user