Add an index to the dbsp internal table

And also change the schema of the main table. I have come to see the
current key-value schema as inadequate for non-aggregate operators.
Calculating Min/Max, for example, doesn't feat in this schema because
we have to be able to track existing values and index them.

Another alternative is to keep one table per operator type, but this
quickly leads to an explosion of tables.
This commit is contained in:
Glauber Costa
2025-09-13 14:41:39 -05:00
parent 3e9a5d93b5
commit 3565e7978a
7 changed files with 968 additions and 308 deletions

View File

@@ -8,15 +8,15 @@
use crate::incremental::dbsp::{Delta, DeltaPair};
use crate::incremental::expr_compiler::CompiledExpression;
use crate::incremental::operator::{
EvalState, FilterOperator, FilterPredicate, IncrementalOperator, InputOperator, ProjectOperator,
create_dbsp_state_index, DbspStateCursors, EvalState, FilterOperator, FilterPredicate,
IncrementalOperator, InputOperator, ProjectOperator,
};
use crate::incremental::persistence::WriteRow;
use crate::storage::btree::BTreeCursor;
use crate::storage::btree::{BTreeCursor, BTreeKey};
// Note: logical module must be made pub(crate) in translate/mod.rs
use crate::translate::logical::{
BinaryOperator, LogicalExpr, LogicalPlan, LogicalSchema, SchemaRef,
};
use crate::types::{IOResult, SeekKey, Value};
use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult, Value};
use crate::Pager;
use crate::{return_and_restore_if_io, return_if_io, LimboError, Result};
use std::collections::HashMap;
@@ -24,8 +24,120 @@ use std::fmt::{self, Display, Formatter};
use std::rc::Rc;
use std::sync::Arc;
// The state table is always a key-value store with 3 columns: key, state, and weight.
const OPERATOR_COLUMNS: usize = 3;
// The state table has 5 columns: operator_id, zset_id, element_id, value, weight
const OPERATOR_COLUMNS: usize = 5;
/// State machine for writing rows to simple materialized views (table-only, no index)
#[derive(Debug, Default)]
pub enum WriteRowView {
#[default]
GetRecord,
Delete,
Insert {
final_weight: isize,
},
Done,
}
impl WriteRowView {
pub fn new() -> Self {
Self::default()
}
/// Write a row with weight management for table-only storage.
///
/// # Arguments
/// * `cursor` - BTree cursor for the storage
/// * `key` - The key to seek (TableRowId)
/// * `build_record` - Function that builds the record values to insert.
/// Takes the final_weight and returns the complete record values.
/// * `weight` - The weight delta to apply
pub fn write_row(
&mut self,
cursor: &mut BTreeCursor,
key: SeekKey,
build_record: impl Fn(isize) -> Vec<Value>,
weight: isize,
) -> Result<IOResult<()>> {
loop {
match self {
WriteRowView::GetRecord => {
let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true }));
if !matches!(res, SeekResult::Found) {
*self = WriteRowView::Insert {
final_weight: weight,
};
} else {
let existing_record = return_if_io!(cursor.record());
let r = existing_record.ok_or_else(|| {
LimboError::InternalError(format!(
"Found key {key:?} in storage but could not read record"
))
})?;
let values = r.get_values();
// Weight is always the last value
let existing_weight = match values.last() {
Some(val) => match val.to_owned() {
Value::Integer(w) => w as isize,
_ => {
return Err(LimboError::InternalError(format!(
"Invalid weight value in storage for key {key:?}"
)))
}
},
None => {
return Err(LimboError::InternalError(format!(
"No weight value found in storage for key {key:?}"
)))
}
};
let final_weight = existing_weight + weight;
if final_weight <= 0 {
*self = WriteRowView::Delete
} else {
*self = WriteRowView::Insert { final_weight }
}
}
}
WriteRowView::Delete => {
// Mark as Done before delete to avoid retry on I/O
*self = WriteRowView::Done;
return_if_io!(cursor.delete());
}
WriteRowView::Insert { final_weight } => {
return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true }));
// Extract the row ID from the key
let key_i64 = match key {
SeekKey::TableRowId(id) => id,
_ => {
return Err(LimboError::InternalError(
"Expected TableRowId for storage".to_string(),
))
}
};
// Build the record values using the provided function
let record_values = build_record(*final_weight);
// Create an ImmutableRecord from the values
let immutable_record =
ImmutableRecord::from_values(&record_values, record_values.len());
let btree_key = BTreeKey::new_table_rowid(key_i64, Some(&immutable_record));
// Mark as Done before insert to avoid retry on I/O
*self = WriteRowView::Done;
return_if_io!(cursor.insert(&btree_key));
}
WriteRowView::Done => {
return Ok(IOResult::Done(()));
}
}
}
}
}
/// State machine for commit operations
pub enum CommitState {
@@ -36,8 +148,8 @@ pub enum CommitState {
CommitOperators {
/// Execute state for running the circuit
execute_state: Box<ExecuteState>,
/// Persistent cursor for operator state btree (internal_state_root)
state_cursor: Box<BTreeCursor>,
/// Persistent cursors for operator state (table and index)
state_cursors: Box<DbspStateCursors>,
},
/// Updating the materialized view with the delta
@@ -47,7 +159,7 @@ pub enum CommitState {
/// Current index in delta.changes being processed
current_index: usize,
/// State for writing individual rows
write_row_state: WriteRow,
write_row_state: WriteRowView,
/// Cursor for view data btree - created fresh for each row
view_cursor: Box<BTreeCursor>,
},
@@ -60,7 +172,8 @@ impl std::fmt::Debug for CommitState {
Self::CommitOperators { execute_state, .. } => f
.debug_struct("CommitOperators")
.field("execute_state", execute_state)
.field("has_state_cursor", &true)
.field("has_state_table_cursor", &true)
.field("has_state_index_cursor", &true)
.finish(),
Self::UpdateView {
delta,
@@ -221,25 +334,13 @@ impl std::fmt::Debug for DbspNode {
impl DbspNode {
fn process_node(
&mut self,
pager: Rc<Pager>,
eval_state: &mut EvalState,
root_page: usize,
commit_operators: bool,
state_cursor: Option<&mut Box<BTreeCursor>>,
cursors: &mut DbspStateCursors,
) -> Result<IOResult<Delta>> {
// Process delta using the executable operator
let op = &mut self.executable;
// Use provided cursor or create a local one
let mut local_cursor;
let cursor = if let Some(cursor) = state_cursor {
cursor.as_mut()
} else {
// Create a local cursor if none was provided
local_cursor = BTreeCursor::new_table(None, pager.clone(), root_page, OPERATOR_COLUMNS);
&mut local_cursor
};
let state = if commit_operators {
// Clone the deltas from eval_state - don't extract them
// in case we need to re-execute due to I/O
@@ -247,12 +348,12 @@ impl DbspNode {
EvalState::Init { deltas } => deltas.clone(),
_ => panic!("commit can only be called when eval_state is in Init state"),
};
let result = return_if_io!(op.commit(deltas, cursor));
let result = return_if_io!(op.commit(deltas, cursors));
// After successful commit, move state to Done
*eval_state = EvalState::Done;
result
} else {
return_if_io!(op.eval(eval_state, cursor))
return_if_io!(op.eval(eval_state, cursors))
};
Ok(IOResult::Done(state))
}
@@ -275,14 +376,20 @@ pub struct DbspCircuit {
/// Root page for the main materialized view data
pub(super) main_data_root: usize,
/// Root page for internal DBSP state
/// Root page for internal DBSP state table
pub(super) internal_state_root: usize,
/// Root page for the DBSP state table's primary key index
pub(super) internal_state_index_root: usize,
}
impl DbspCircuit {
/// Create a new empty circuit with initial empty schema
/// The actual output schema will be set when the root node is established
pub fn new(main_data_root: usize, internal_state_root: usize) -> Self {
pub fn new(
main_data_root: usize,
internal_state_root: usize,
internal_state_index_root: usize,
) -> Self {
// Start with an empty schema - will be updated when root is set
let empty_schema = Arc::new(LogicalSchema::new(vec![]));
Self {
@@ -293,6 +400,7 @@ impl DbspCircuit {
commit_state: CommitState::Init,
main_data_root,
internal_state_root,
internal_state_index_root,
}
}
@@ -326,18 +434,18 @@ impl DbspCircuit {
pub fn run_circuit(
&mut self,
pager: Rc<Pager>,
execute_state: &mut ExecuteState,
pager: &Rc<Pager>,
state_cursors: &mut DbspStateCursors,
commit_operators: bool,
state_cursor: &mut Box<BTreeCursor>,
) -> Result<IOResult<Delta>> {
if let Some(root_id) = self.root {
self.execute_node(
root_id,
pager,
pager.clone(),
execute_state,
commit_operators,
Some(state_cursor),
state_cursors,
)
} else {
Err(LimboError::ParseError(
@@ -358,7 +466,23 @@ impl DbspCircuit {
execute_state: &mut ExecuteState,
) -> Result<IOResult<Delta>> {
if let Some(root_id) = self.root {
self.execute_node(root_id, pager, execute_state, false, None)
// Create temporary cursors for execute (non-commit) operations
let table_cursor = BTreeCursor::new_table(
None,
pager.clone(),
self.internal_state_root,
OPERATOR_COLUMNS,
);
let index_def = create_dbsp_state_index(self.internal_state_index_root);
let index_cursor = BTreeCursor::new_index(
None,
pager.clone(),
self.internal_state_index_root,
&index_def,
3,
);
let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
self.execute_node(root_id, pager, execute_state, false, &mut cursors)
} else {
Err(LimboError::ParseError(
"Circuit has no root node".to_string(),
@@ -398,29 +522,42 @@ impl DbspCircuit {
let mut state = std::mem::replace(&mut self.commit_state, CommitState::Init);
match &mut state {
CommitState::Init => {
// Create state cursor when entering CommitOperators state
let state_cursor = Box::new(BTreeCursor::new_table(
// Create state cursors when entering CommitOperators state
let state_table_cursor = BTreeCursor::new_table(
None,
pager.clone(),
self.internal_state_root,
OPERATOR_COLUMNS,
);
let index_def = create_dbsp_state_index(self.internal_state_index_root);
let state_index_cursor = BTreeCursor::new_index(
None,
pager.clone(),
self.internal_state_index_root,
&index_def,
3, // Index on first 3 columns
);
let state_cursors = Box::new(DbspStateCursors::new(
state_table_cursor,
state_index_cursor,
));
self.commit_state = CommitState::CommitOperators {
execute_state: Box::new(ExecuteState::Init {
input_data: input_delta_set.clone(),
}),
state_cursor,
state_cursors,
};
}
CommitState::CommitOperators {
ref mut execute_state,
ref mut state_cursor,
ref mut state_cursors,
} => {
let delta = return_and_restore_if_io!(
&mut self.commit_state,
state,
self.run_circuit(pager.clone(), execute_state, true, state_cursor)
self.run_circuit(execute_state, &pager, state_cursors, true,)
);
// Create view cursor when entering UpdateView state
@@ -434,7 +571,7 @@ impl DbspCircuit {
self.commit_state = CommitState::UpdateView {
delta,
current_index: 0,
write_row_state: WriteRow::new(),
write_row_state: WriteRowView::new(),
view_cursor,
};
}
@@ -453,7 +590,7 @@ impl DbspCircuit {
// If we're starting a new row (GetRecord state), we need a fresh cursor
// due to btree cursor state machine limitations
if matches!(write_row_state, WriteRow::GetRecord) {
if matches!(write_row_state, WriteRowView::GetRecord) {
*view_cursor = Box::new(BTreeCursor::new_table(
None,
pager.clone(),
@@ -493,7 +630,7 @@ impl DbspCircuit {
self.commit_state = CommitState::UpdateView {
delta,
current_index: *current_index + 1,
write_row_state: WriteRow::new(),
write_row_state: WriteRowView::new(),
view_cursor,
};
}
@@ -509,7 +646,7 @@ impl DbspCircuit {
pager: Rc<Pager>,
execute_state: &mut ExecuteState,
commit_operators: bool,
state_cursor: Option<&mut Box<BTreeCursor>>,
cursors: &mut DbspStateCursors,
) -> Result<IOResult<Delta>> {
loop {
match execute_state {
@@ -577,12 +714,30 @@ impl DbspCircuit {
// Get the (node_id, state) pair for the current index
let (input_node_id, input_state) = &mut input_states[*current_index];
// Create temporary cursors for the recursive call
let temp_table_cursor = BTreeCursor::new_table(
None,
pager.clone(),
self.internal_state_root,
OPERATOR_COLUMNS,
);
let index_def = create_dbsp_state_index(self.internal_state_index_root);
let temp_index_cursor = BTreeCursor::new_index(
None,
pager.clone(),
self.internal_state_index_root,
&index_def,
3,
);
let mut temp_cursors =
DbspStateCursors::new(temp_table_cursor, temp_index_cursor);
let delta = return_if_io!(self.execute_node(
*input_node_id,
pager.clone(),
input_state,
commit_operators,
None // Input nodes don't need state cursor
&mut temp_cursors
));
input_deltas.push(delta);
*current_index += 1;
@@ -595,13 +750,8 @@ impl DbspCircuit {
.get_mut(&node_id)
.ok_or_else(|| LimboError::ParseError("Node not found".to_string()))?;
let output_delta = return_if_io!(node.process_node(
pager.clone(),
eval_state,
self.internal_state_root,
commit_operators,
state_cursor,
));
let output_delta =
return_if_io!(node.process_node(eval_state, commit_operators, cursors,));
return Ok(IOResult::Done(output_delta));
}
}
@@ -660,9 +810,17 @@ pub struct DbspCompiler {
impl DbspCompiler {
/// Create a new DBSP compiler
pub fn new(main_data_root: usize, internal_state_root: usize) -> Self {
pub fn new(
main_data_root: usize,
internal_state_root: usize,
internal_state_index_root: usize,
) -> Self {
Self {
circuit: DbspCircuit::new(main_data_root, internal_state_root),
circuit: DbspCircuit::new(
main_data_root,
internal_state_root,
internal_state_index_root,
),
}
}
@@ -1252,7 +1410,7 @@ mod tests {
}};
}
fn setup_btree_for_circuit() -> (Rc<Pager>, usize, usize) {
fn setup_btree_for_circuit() -> (Rc<Pager>, usize, usize, usize) {
let io: Arc<dyn IO> = Arc::new(MemoryIO::new());
let db = Database::open_file(io.clone(), ":memory:", false, false).unwrap();
let conn = db.connect().unwrap();
@@ -1270,13 +1428,24 @@ mod tests {
.block(|| pager.btree_create(&CreateBTreeFlags::new_table()))
.unwrap() as usize;
(pager, main_root_page, dbsp_state_page)
let dbsp_state_index_page = pager
.io
.block(|| pager.btree_create(&CreateBTreeFlags::new_index()))
.unwrap() as usize;
(
pager,
main_root_page,
dbsp_state_page,
dbsp_state_index_page,
)
}
// Macro to compile SQL to DBSP circuit
macro_rules! compile_sql {
($sql:expr) => {{
let (pager, main_root_page, dbsp_state_page) = setup_btree_for_circuit();
let (pager, main_root_page, dbsp_state_page, dbsp_state_index_page) =
setup_btree_for_circuit();
let schema = test_schema!();
let mut parser = Parser::new($sql.as_bytes());
let cmd = parser
@@ -1289,7 +1458,7 @@ mod tests {
let mut builder = LogicalPlanBuilder::new(&schema);
let logical_plan = builder.build_statement(&stmt).unwrap();
(
DbspCompiler::new(main_root_page, dbsp_state_page)
DbspCompiler::new(main_root_page, dbsp_state_page, dbsp_state_index_page)
.compile(&logical_plan)
.unwrap(),
pager,
@@ -3162,10 +3331,10 @@ mod tests {
#[test]
fn test_circuit_rowid_update_consolidation() {
let (pager, p1, p2) = setup_btree_for_circuit();
let (pager, p1, p2, p3) = setup_btree_for_circuit();
// Test that circuit properly consolidates state when rowid changes
let mut circuit = DbspCircuit::new(p1, p2);
let mut circuit = DbspCircuit::new(p1, p2, p3);
// Create a simple filter node
let schema = Arc::new(LogicalSchema::new(vec![

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,7 @@
use crate::incremental::operator::{AggregateFunction, AggregateState};
use crate::incremental::operator::{AggregateFunction, AggregateState, DbspStateCursors};
use crate::storage::btree::{BTreeCursor, BTreeKey};
use crate::types::{IOResult, SeekKey, SeekOp, SeekResult};
use crate::{return_if_io, Result, Value};
use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult};
use crate::{return_if_io, LimboError, Result, Value};
#[derive(Debug, Default)]
pub enum ReadRecord {
@@ -32,21 +32,22 @@ impl ReadRecord {
} else {
let record = return_if_io!(cursor.record());
let r = record.ok_or_else(|| {
crate::LimboError::InternalError(format!(
LimboError::InternalError(format!(
"Found key {key:?} in aggregate storage but could not read record"
))
})?;
let values = r.get_values();
let blob = values[1].to_owned();
// The blob is in column 3: operator_id, zset_id, element_id, value, weight
let blob = values[3].to_owned();
let (state, _group_key) = match blob {
Value::Blob(blob) => AggregateState::from_blob(&blob, aggregates)
.ok_or_else(|| {
crate::LimboError::InternalError(format!(
LimboError::InternalError(format!(
"Cannot deserialize aggregate state {blob:?}",
))
}),
_ => Err(crate::LimboError::ParseError(
_ => Err(LimboError::ParseError(
"Value in aggregator not blob".to_string(),
)),
}?;
@@ -63,8 +64,22 @@ impl ReadRecord {
pub enum WriteRow {
#[default]
GetRecord,
Delete,
Insert {
Delete {
rowid: i64,
},
DeleteIndex,
ComputeNewRowId {
final_weight: isize,
},
InsertNew {
rowid: i64,
final_weight: isize,
},
InsertIndex {
rowid: i64,
},
UpdateExisting {
rowid: i64,
final_weight: isize,
},
Done,
@@ -75,97 +90,192 @@ impl WriteRow {
Self::default()
}
/// Write a row with weight management.
/// Write a row with weight management using index for lookups.
///
/// # Arguments
/// * `cursor` - BTree cursor for the storage
/// * `key` - The key to seek (TableRowId)
/// * `build_record` - Function that builds the record values to insert.
/// Takes the final_weight and returns the complete record values.
/// * `cursors` - DBSP state cursors (table and index)
/// * `index_key` - The key to seek in the index
/// * `record_values` - The record values (without weight) to insert
/// * `weight` - The weight delta to apply
pub fn write_row<F>(
pub fn write_row(
&mut self,
cursor: &mut BTreeCursor,
key: SeekKey,
build_record: F,
cursors: &mut DbspStateCursors,
index_key: Vec<Value>,
record_values: Vec<Value>,
weight: isize,
) -> Result<IOResult<()>>
where
F: Fn(isize) -> Vec<Value>,
{
) -> Result<IOResult<()>> {
loop {
match self {
WriteRow::GetRecord => {
let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true }));
// First, seek in the index to find if the row exists
let index_values = index_key.clone();
let index_record =
ImmutableRecord::from_values(&index_values, index_values.len());
let res = return_if_io!(cursors.index_cursor.seek(
SeekKey::IndexKey(&index_record),
SeekOp::GE { eq_only: true }
));
if !matches!(res, SeekResult::Found) {
*self = WriteRow::Insert {
// Row doesn't exist, we'll insert a new one
*self = WriteRow::ComputeNewRowId {
final_weight: weight,
};
} else {
let existing_record = return_if_io!(cursor.record());
// Found in index, get the rowid it points to
let rowid = return_if_io!(cursors.index_cursor.rowid());
let rowid = rowid.ok_or_else(|| {
LimboError::InternalError(
"Index cursor does not have a valid rowid".to_string(),
)
})?;
// Now seek in the table using the rowid
let table_res = return_if_io!(cursors
.table_cursor
.seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true }));
if !matches!(table_res, SeekResult::Found) {
return Err(LimboError::InternalError(
"Index points to non-existent table row".to_string(),
));
}
let existing_record = return_if_io!(cursors.table_cursor.record());
let r = existing_record.ok_or_else(|| {
crate::LimboError::InternalError(format!(
"Found key {key:?} in storage but could not read record"
))
LimboError::InternalError(
"Found rowid in table but could not read record".to_string(),
)
})?;
let values = r.get_values();
// Weight is always the last value
let existing_weight = match values.last() {
// Weight is always the last value (column 4 in our 5-column structure)
let existing_weight = match values.get(4) {
Some(val) => match val.to_owned() {
Value::Integer(w) => w as isize,
_ => {
return Err(crate::LimboError::InternalError(format!(
"Invalid weight value in storage for key {key:?}"
)))
return Err(LimboError::InternalError(
"Invalid weight value in storage".to_string(),
))
}
},
None => {
return Err(crate::LimboError::InternalError(format!(
"No weight value found in storage for key {key:?}"
)))
return Err(LimboError::InternalError(
"No weight value found in storage".to_string(),
))
}
};
let final_weight = existing_weight + weight;
if final_weight <= 0 {
*self = WriteRow::Delete
*self = WriteRow::Delete { rowid }
} else {
*self = WriteRow::Insert { final_weight }
// Store the rowid for update
*self = WriteRow::UpdateExisting {
rowid,
final_weight,
}
}
}
}
WriteRow::Delete => {
WriteRow::Delete { rowid } => {
// Seek to the row and delete it
return_if_io!(cursors
.table_cursor
.seek(SeekKey::TableRowId(*rowid), SeekOp::GE { eq_only: true }));
// Transition to DeleteIndex to also delete the index entry
*self = WriteRow::DeleteIndex;
return_if_io!(cursors.table_cursor.delete());
}
WriteRow::DeleteIndex => {
// Mark as Done before delete to avoid retry on I/O
*self = WriteRow::Done;
return_if_io!(cursor.delete());
return_if_io!(cursors.index_cursor.delete());
}
WriteRow::Insert { final_weight } => {
return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true }));
// Extract the row ID from the key
let key_i64 = match key {
SeekKey::TableRowId(id) => id,
_ => {
return Err(crate::LimboError::InternalError(
"Expected TableRowId for storage".to_string(),
))
WriteRow::ComputeNewRowId { final_weight } => {
// Find the last rowid to compute the next one
return_if_io!(cursors.table_cursor.last());
let rowid = if cursors.table_cursor.is_empty() {
1
} else {
match return_if_io!(cursors.table_cursor.rowid()) {
Some(id) => id + 1,
None => {
return Err(LimboError::InternalError(
"Table cursor has rows but no valid rowid".to_string(),
))
}
}
};
// Build the record values using the provided function
let record_values = build_record(*final_weight);
// Transition to InsertNew with the computed rowid
*self = WriteRow::InsertNew {
rowid,
final_weight: *final_weight,
};
}
WriteRow::InsertNew {
rowid,
final_weight,
} => {
let rowid_val = *rowid;
let final_weight_val = *final_weight;
// Seek to where we want to insert
// The insert will position the cursor correctly
return_if_io!(cursors.table_cursor.seek(
SeekKey::TableRowId(rowid_val),
SeekOp::GE { eq_only: false }
));
// Build the complete record with weight
// Use the function parameter record_values directly
let mut complete_record = record_values.clone();
complete_record.push(Value::Integer(final_weight_val as i64));
// Create an ImmutableRecord from the values
let immutable_record = crate::types::ImmutableRecord::from_values(
&record_values,
record_values.len(),
);
let btree_key = BTreeKey::new_table_rowid(key_i64, Some(&immutable_record));
let immutable_record =
ImmutableRecord::from_values(&complete_record, complete_record.len());
let btree_key = BTreeKey::new_table_rowid(rowid_val, Some(&immutable_record));
// Transition to InsertIndex state after table insertion
*self = WriteRow::InsertIndex { rowid: rowid_val };
return_if_io!(cursors.table_cursor.insert(&btree_key));
}
WriteRow::InsertIndex { rowid } => {
// For has_rowid indexes, we need to append the rowid to the index key
// Use the function parameter index_key directly
let mut index_values = index_key.clone();
index_values.push(Value::Integer(*rowid));
// Create the index record with the rowid appended
let index_record =
ImmutableRecord::from_values(&index_values, index_values.len());
let index_btree_key = BTreeKey::new_index_key(&index_record);
// Mark as Done before index insert to avoid retry on I/O
*self = WriteRow::Done;
return_if_io!(cursors.index_cursor.insert(&index_btree_key));
}
WriteRow::UpdateExisting {
rowid,
final_weight,
} => {
// Build the complete record with weight
let mut complete_record = record_values.clone();
complete_record.push(Value::Integer(*final_weight as i64));
// Create an ImmutableRecord from the values
let immutable_record =
ImmutableRecord::from_values(&complete_record, complete_record.len());
let btree_key = BTreeKey::new_table_rowid(*rowid, Some(&immutable_record));
// Mark as Done before insert to avoid retry on I/O
*self = WriteRow::Done;
return_if_io!(cursor.insert(&btree_key));
// BTree insert with existing key will replace the old value
return_if_io!(cursors.table_cursor.insert(&btree_key));
}
WriteRow::Done => {
return Ok(IOResult::Done(()));

View File

@@ -206,6 +206,7 @@ impl IncrementalView {
schema: &Schema,
main_data_root: usize,
internal_state_root: usize,
internal_state_index_root: usize,
) -> Result<DbspCircuit> {
// Build the logical plan from the SELECT statement
let mut builder = LogicalPlanBuilder::new(schema);
@@ -214,7 +215,11 @@ impl IncrementalView {
let logical_plan = builder.build_statement(&stmt)?;
// Compile the logical plan to a DBSP circuit with the storage roots
let compiler = DbspCompiler::new(main_data_root, internal_state_root);
let compiler = DbspCompiler::new(
main_data_root,
internal_state_root,
internal_state_index_root,
);
let circuit = compiler.compile(&logical_plan)?;
Ok(circuit)
@@ -271,6 +276,7 @@ impl IncrementalView {
schema: &Schema,
main_data_root: usize,
internal_state_root: usize,
internal_state_index_root: usize,
) -> Result<Self> {
let mut parser = Parser::new(sql.as_bytes());
let cmd = parser.next_cmd()?;
@@ -287,6 +293,7 @@ impl IncrementalView {
schema,
main_data_root,
internal_state_root,
internal_state_index_root,
),
_ => Err(LimboError::ParseError(format!(
"View is not a CREATE MATERIALIZED VIEW statement: {sql}"
@@ -300,6 +307,7 @@ impl IncrementalView {
schema: &Schema,
main_data_root: usize,
internal_state_root: usize,
internal_state_index_root: usize,
) -> Result<Self> {
let name = view_name.name.as_str().to_string();
@@ -327,6 +335,7 @@ impl IncrementalView {
schema,
main_data_root,
internal_state_root,
internal_state_index_root,
)
}
@@ -340,13 +349,19 @@ impl IncrementalView {
schema: &Schema,
main_data_root: usize,
internal_state_root: usize,
internal_state_index_root: usize,
) -> Result<Self> {
// Create the tracker that will be shared by all operators
let tracker = Arc::new(Mutex::new(ComputationTracker::new()));
// Compile the SELECT statement into a DBSP circuit
let circuit =
Self::try_compile_circuit(&select_stmt, schema, main_data_root, internal_state_root)?;
let circuit = Self::try_compile_circuit(
&select_stmt,
schema,
main_data_root,
internal_state_root,
internal_state_index_root,
)?;
Ok(Self {
name,

View File

@@ -306,6 +306,8 @@ impl Schema {
// Store DBSP state table root pages: view_name -> dbsp_state_root_page
let mut dbsp_state_roots: HashMap<String, usize> = HashMap::new();
// Store DBSP state table index root pages: view_name -> dbsp_state_index_root_page
let mut dbsp_state_index_roots: HashMap<String, usize> = HashMap::new();
// Store materialized view info (SQL and root page) for later creation
let mut materialized_view_info: HashMap<String, (String, usize)> = HashMap::new();
@@ -357,6 +359,7 @@ impl Schema {
&mut from_sql_indexes,
&mut automatic_indices,
&mut dbsp_state_roots,
&mut dbsp_state_index_roots,
&mut materialized_view_info,
)?;
drop(record_cursor);
@@ -369,7 +372,11 @@ impl Schema {
self.populate_indices(from_sql_indexes, automatic_indices)?;
self.populate_materialized_views(materialized_view_info, dbsp_state_roots)?;
self.populate_materialized_views(
materialized_view_info,
dbsp_state_roots,
dbsp_state_index_roots,
)?;
Ok(())
}
@@ -492,6 +499,7 @@ impl Schema {
&mut self,
materialized_view_info: std::collections::HashMap<String, (String, usize)>,
dbsp_state_roots: std::collections::HashMap<String, usize>,
dbsp_state_index_roots: std::collections::HashMap<String, usize>,
) -> Result<()> {
for (view_name, (sql, main_root)) in materialized_view_info {
// Look up the DBSP state root for this view - must exist for materialized views
@@ -501,9 +509,17 @@ impl Schema {
))
})?;
// Create the IncrementalView with both root pages
let incremental_view =
IncrementalView::from_sql(&sql, self, main_root, *dbsp_state_root)?;
// Look up the DBSP state index root (may not exist for older schemas)
let dbsp_state_index_root =
dbsp_state_index_roots.get(&view_name).copied().unwrap_or(0);
// Create the IncrementalView with all root pages
let incremental_view = IncrementalView::from_sql(
&sql,
self,
main_root,
*dbsp_state_root,
dbsp_state_index_root,
)?;
let referenced_tables = incremental_view.get_referenced_table_names();
// Create a BTreeTable for the materialized view
@@ -539,6 +555,7 @@ impl Schema {
from_sql_indexes: &mut Vec<UnparsedFromSqlIndex>,
automatic_indices: &mut std::collections::HashMap<String, Vec<(String, usize)>>,
dbsp_state_roots: &mut std::collections::HashMap<String, usize>,
dbsp_state_index_roots: &mut std::collections::HashMap<String, usize>,
materialized_view_info: &mut std::collections::HashMap<String, (String, usize)>,
) -> Result<()> {
match ty {
@@ -593,12 +610,23 @@ impl Schema {
// index|sqlite_autoindex_foo_1|foo|3|
let index_name = name.to_string();
let table_name = table_name.to_string();
match automatic_indices.entry(table_name) {
std::collections::hash_map::Entry::Vacant(e) => {
e.insert(vec![(index_name, root_page as usize)]);
}
std::collections::hash_map::Entry::Occupied(mut e) => {
e.get_mut().push((index_name, root_page as usize));
// Check if this is an index for a DBSP state table
if table_name.starts_with(DBSP_TABLE_PREFIX) {
// Extract the view name from __turso_internal_dbsp_state_<viewname>
let view_name = table_name
.strip_prefix(DBSP_TABLE_PREFIX)
.unwrap()
.to_string();
dbsp_state_index_roots.insert(view_name, root_page as usize);
} else {
match automatic_indices.entry(table_name) {
std::collections::hash_map::Entry::Vacant(e) => {
e.insert(vec![(index_name, root_page as usize)]);
}
std::collections::hash_map::Entry::Occupied(mut e) => {
e.get_mut().push((index_name, root_page as usize));
}
}
}
}

View File

@@ -2,7 +2,7 @@ use crate::schema::{Schema, DBSP_TABLE_PREFIX};
use crate::storage::pager::CreateBTreeFlags;
use crate::translate::emitter::Resolver;
use crate::translate::schema::{emit_schema_entry, SchemaEntryType, SQLITE_TABLEID};
use crate::util::normalize_ident;
use crate::util::{normalize_ident, PRIMARY_KEY_AUTOMATIC_INDEX_NAME_PREFIX};
use crate::vdbe::builder::{CursorType, ProgramBuilder};
use crate::vdbe::insn::{CmpInsFlags, Cookie, Insn, RegisterOrLiteral};
use crate::{Connection, Result, SymbolTable};
@@ -141,7 +141,20 @@ pub fn translate_create_materialized_view(
// Add the DBSP state table to sqlite_master (required for materialized views)
let dbsp_table_name = format!("{DBSP_TABLE_PREFIX}{normalized_view_name}");
let dbsp_sql = format!("CREATE TABLE {dbsp_table_name} (key INTEGER PRIMARY KEY, state BLOB)");
// The element_id column uses SQLite's dynamic typing system to store different value types:
// - For hash-based operators (joins, filters): stores INTEGER hash values or rowids
// - For future MIN/MAX operators: stores the actual values being compared (INTEGER, REAL, TEXT, BLOB)
// SQLite's type affinity and sorting rules ensure correct ordering within each operator's data
let dbsp_sql = format!(
"CREATE TABLE {dbsp_table_name} (\
operator_id INTEGER NOT NULL, \
zset_id INTEGER NOT NULL, \
element_id NOT NULL, \
value BLOB, \
weight INTEGER NOT NULL, \
PRIMARY KEY (operator_id, zset_id, element_id)\
)"
);
emit_schema_entry(
&mut program,
@@ -155,11 +168,37 @@ pub fn translate_create_materialized_view(
Some(dbsp_sql),
)?;
// Create automatic primary key index for the DBSP table
// Since the table has PRIMARY KEY (operator_id, zset_id, element_id), we need an index
let dbsp_index_root_reg = program.alloc_register();
program.emit_insn(Insn::CreateBtree {
db: 0,
root: dbsp_index_root_reg,
flags: CreateBTreeFlags::new_index(),
});
// Register the index in sqlite_schema
let dbsp_index_name = format!(
"{}{}_1",
PRIMARY_KEY_AUTOMATIC_INDEX_NAME_PREFIX, &dbsp_table_name
);
emit_schema_entry(
&mut program,
&resolver,
sqlite_schema_cursor_id,
None, // cdc_table_cursor_id
SchemaEntryType::Index,
&dbsp_index_name,
&dbsp_table_name,
dbsp_index_root_reg,
None, // Automatic indexes don't store SQL
)?;
// Parse schema to load the new view and DBSP state table
program.emit_insn(Insn::ParseSchema {
db: sqlite_schema_cursor_id,
where_clause: Some(format!(
"name = '{normalized_view_name}' OR name = '{dbsp_table_name}'"
"name = '{normalized_view_name}' OR name = '{dbsp_table_name}' OR name = '{dbsp_index_name}'"
)),
});

View File

@@ -163,6 +163,9 @@ pub fn parse_schema_rows(
// Store DBSP state table root pages: view_name -> dbsp_state_root_page
let mut dbsp_state_roots: std::collections::HashMap<String, usize> =
std::collections::HashMap::new();
// Store DBSP state table index root pages: view_name -> dbsp_state_index_root_page
let mut dbsp_state_index_roots: std::collections::HashMap<String, usize> =
std::collections::HashMap::new();
// Store materialized view info (SQL and root page) for later creation
let mut materialized_view_info: std::collections::HashMap<String, (String, usize)> =
std::collections::HashMap::new();
@@ -185,8 +188,9 @@ pub fn parse_schema_rows(
&mut from_sql_indexes,
&mut automatic_indices,
&mut dbsp_state_roots,
&mut dbsp_state_index_roots,
&mut materialized_view_info,
)?;
)?
}
StepResult::IO => {
// TODO: How do we ensure that the I/O we submitted to
@@ -200,7 +204,11 @@ pub fn parse_schema_rows(
}
schema.populate_indices(from_sql_indexes, automatic_indices)?;
schema.populate_materialized_views(materialized_view_info, dbsp_state_roots)?;
schema.populate_materialized_views(
materialized_view_info,
dbsp_state_roots,
dbsp_state_index_roots,
)?;
Ok(())
}