mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-17 23:24:19 +01:00
Extract the appending delete related opcodes to emit_delete_opcodes
This commit is contained in:
@@ -101,6 +101,15 @@ pub struct Metadata {
|
||||
pub result_columns_to_skip_in_orderby_sorter: Option<Vec<usize>>,
|
||||
}
|
||||
|
||||
/// Used to distinguish database operations
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum OperationMode {
|
||||
SELECT,
|
||||
INSERT,
|
||||
UPDATE,
|
||||
DELETE,
|
||||
}
|
||||
|
||||
/// Initialize the program with basic setup and return initial metadata and labels
|
||||
fn prologue() -> Result<(ProgramBuilder, Metadata, BranchOffset, BranchOffset)> {
|
||||
let mut program = ProgramBuilder::new();
|
||||
@@ -201,7 +210,12 @@ pub fn emit_program(
|
||||
if let Some(ref mut group_by) = plan.group_by {
|
||||
init_group_by(&mut program, group_by, &plan.aggregates, &mut metadata)?;
|
||||
}
|
||||
init_source(&mut program, &plan.source, &mut metadata)?;
|
||||
init_source(
|
||||
&mut program,
|
||||
&plan.source,
|
||||
&mut metadata,
|
||||
&OperationMode::SELECT,
|
||||
)?;
|
||||
|
||||
// Set up main query execution loop
|
||||
open_loop(
|
||||
@@ -294,7 +308,12 @@ pub fn emit_program_for_delete(
|
||||
};
|
||||
|
||||
// Initialize cursors and other resources needed for query execution
|
||||
init_source_for_delete(&mut program, &plan.source, &mut metadata)?;
|
||||
init_source(
|
||||
&mut program,
|
||||
&plan.source,
|
||||
&mut metadata,
|
||||
&OperationMode::DELETE,
|
||||
)?;
|
||||
|
||||
// Set up main query execution loop
|
||||
open_loop(
|
||||
@@ -304,8 +323,15 @@ pub fn emit_program_for_delete(
|
||||
&mut metadata,
|
||||
)?;
|
||||
|
||||
emit_delete_insns(&mut program, &plan.source)?;
|
||||
|
||||
// Close the loop and handle deletion
|
||||
close_loop_for_delete(&mut program, &plan.source, &mut metadata)?;
|
||||
close_loop(
|
||||
&mut program,
|
||||
&plan.source,
|
||||
&mut metadata,
|
||||
&plan.referenced_tables,
|
||||
)?;
|
||||
|
||||
if let Some(skip_loops_label) = skip_loops_label {
|
||||
program.resolve_label(skip_loops_label, program.offset());
|
||||
@@ -430,6 +456,7 @@ fn init_source(
|
||||
program: &mut ProgramBuilder,
|
||||
source: &SourceOperator,
|
||||
metadata: &mut Metadata,
|
||||
mode: &OperationMode,
|
||||
) -> Result<()> {
|
||||
match source {
|
||||
SourceOperator::Join {
|
||||
@@ -447,10 +474,10 @@ fn init_source(
|
||||
};
|
||||
metadata.left_joins.insert(*id, lj_metadata);
|
||||
}
|
||||
init_source(program, left, metadata)?;
|
||||
init_source(program, right, metadata)?;
|
||||
init_source(program, left, metadata, mode)?;
|
||||
init_source(program, right, metadata, mode)?;
|
||||
|
||||
return Ok(());
|
||||
Ok(())
|
||||
}
|
||||
SourceOperator::Scan {
|
||||
id,
|
||||
@@ -464,80 +491,27 @@ fn init_source(
|
||||
let root_page = table_reference.table.root_page;
|
||||
let next_row_label = program.allocate_label();
|
||||
metadata.next_row_labels.insert(*id, next_row_label);
|
||||
program.emit_insn(Insn::OpenReadAsync {
|
||||
cursor_id,
|
||||
root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenReadAwait);
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
SourceOperator::Search {
|
||||
id,
|
||||
table_reference,
|
||||
search,
|
||||
..
|
||||
} => {
|
||||
let table_cursor_id = program.alloc_cursor_id(
|
||||
Some(table_reference.table_identifier.clone()),
|
||||
Some(Table::BTree(table_reference.table.clone())),
|
||||
);
|
||||
|
||||
let next_row_label = program.allocate_label();
|
||||
|
||||
metadata.next_row_labels.insert(*id, next_row_label);
|
||||
|
||||
program.emit_insn(Insn::OpenReadAsync {
|
||||
cursor_id: table_cursor_id,
|
||||
root_page: table_reference.table.root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenReadAwait);
|
||||
|
||||
if let Search::IndexSearch { index, .. } = search {
|
||||
let index_cursor_id = program
|
||||
.alloc_cursor_id(Some(index.name.clone()), Some(Table::Index(index.clone())));
|
||||
program.emit_insn(Insn::OpenReadAsync {
|
||||
cursor_id: index_cursor_id,
|
||||
root_page: index.root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenReadAwait);
|
||||
match mode {
|
||||
OperationMode::SELECT => {
|
||||
program.emit_insn(Insn::OpenReadAsync {
|
||||
cursor_id,
|
||||
root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenReadAwait {});
|
||||
}
|
||||
OperationMode::DELETE => {
|
||||
program.emit_insn(Insn::OpenWriteAsync {
|
||||
cursor_id,
|
||||
root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenWriteAwait {});
|
||||
}
|
||||
_ => {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
SourceOperator::Nothing => {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn init_source_for_delete(
|
||||
program: &mut ProgramBuilder,
|
||||
source: &SourceOperator,
|
||||
metadata: &mut Metadata,
|
||||
) -> Result<()> {
|
||||
match source {
|
||||
SourceOperator::Join { .. } => {
|
||||
unreachable!()
|
||||
}
|
||||
SourceOperator::Scan {
|
||||
id,
|
||||
table_reference,
|
||||
..
|
||||
} => {
|
||||
let cursor_id = program.alloc_cursor_id(
|
||||
Some(table_reference.table_identifier.clone()),
|
||||
Some(Table::BTree(table_reference.table.clone())),
|
||||
);
|
||||
let root_page = table_reference.table.root_page;
|
||||
let next_row_label = program.allocate_label();
|
||||
metadata.next_row_labels.insert(*id, next_row_label);
|
||||
program.emit_insn(Insn::OpenWriteAsync {
|
||||
cursor_id,
|
||||
root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenWriteAwait {});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
SourceOperator::Search {
|
||||
@@ -555,20 +529,49 @@ fn init_source_for_delete(
|
||||
|
||||
metadata.next_row_labels.insert(*id, next_row_label);
|
||||
|
||||
program.emit_insn(Insn::OpenWriteAsync {
|
||||
cursor_id: table_cursor_id,
|
||||
root_page: table_reference.table.root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenWriteAwait {});
|
||||
match mode {
|
||||
OperationMode::SELECT => {
|
||||
program.emit_insn(Insn::OpenReadAsync {
|
||||
cursor_id: table_cursor_id,
|
||||
root_page: table_reference.table.root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenReadAwait {});
|
||||
}
|
||||
OperationMode::DELETE => {
|
||||
program.emit_insn(Insn::OpenWriteAsync {
|
||||
cursor_id: table_cursor_id,
|
||||
root_page: table_reference.table.root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenWriteAwait {});
|
||||
}
|
||||
_ => {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
if let Search::IndexSearch { index, .. } = search {
|
||||
let index_cursor_id = program
|
||||
.alloc_cursor_id(Some(index.name.clone()), Some(Table::Index(index.clone())));
|
||||
program.emit_insn(Insn::OpenWriteAsync {
|
||||
cursor_id: index_cursor_id,
|
||||
root_page: index.root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenWriteAwait {});
|
||||
|
||||
match mode {
|
||||
OperationMode::SELECT => {
|
||||
program.emit_insn(Insn::OpenReadAsync {
|
||||
cursor_id: index_cursor_id,
|
||||
root_page: index.root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenReadAwait);
|
||||
}
|
||||
OperationMode::DELETE => {
|
||||
program.emit_insn(Insn::OpenWriteAsync {
|
||||
cursor_id: index_cursor_id,
|
||||
root_page: index.root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenWriteAwait {});
|
||||
}
|
||||
_ => {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -1232,11 +1235,7 @@ fn close_loop(
|
||||
}
|
||||
}
|
||||
|
||||
fn close_loop_for_delete(
|
||||
program: &mut ProgramBuilder,
|
||||
source: &SourceOperator,
|
||||
metadata: &mut Metadata,
|
||||
) -> Result<()> {
|
||||
fn emit_delete_insns(program: &mut ProgramBuilder, source: &SourceOperator) -> Result<()> {
|
||||
match source {
|
||||
SourceOperator::Scan {
|
||||
id,
|
||||
@@ -1255,40 +1254,6 @@ fn close_loop_for_delete(
|
||||
program.emit_insn(Insn::DeleteAsync { cursor_id });
|
||||
program.emit_insn(Insn::DeleteAwait { cursor_id });
|
||||
|
||||
program.resolve_label(*metadata.next_row_labels.get(id).unwrap(), program.offset());
|
||||
|
||||
// Emit the NextAsync or PrevAsync instruction to continue the loop
|
||||
if iter_dir
|
||||
.as_ref()
|
||||
.is_some_and(|dir| *dir == IterationDirection::Backwards)
|
||||
{
|
||||
program.emit_insn(Insn::PrevAsync { cursor_id });
|
||||
} else {
|
||||
program.emit_insn(Insn::NextAsync { cursor_id });
|
||||
}
|
||||
let jump_label = metadata.scan_loop_body_labels.pop().unwrap();
|
||||
|
||||
// Emit the NextAwait or PrevAwait instruction with label dependency
|
||||
if iter_dir
|
||||
.as_ref()
|
||||
.is_some_and(|dir| *dir == IterationDirection::Backwards)
|
||||
{
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::PrevAwait {
|
||||
cursor_id,
|
||||
pc_if_next: jump_label,
|
||||
},
|
||||
jump_label,
|
||||
);
|
||||
} else {
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::NextAwait {
|
||||
cursor_id,
|
||||
pc_if_next: jump_label,
|
||||
},
|
||||
jump_label,
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
SourceOperator::Search {
|
||||
@@ -1313,22 +1278,6 @@ fn close_loop_for_delete(
|
||||
program.emit_insn(Insn::DeleteAsync { cursor_id });
|
||||
program.emit_insn(Insn::DeleteAwait { cursor_id });
|
||||
|
||||
// resolve labels after calling Delete opcodes
|
||||
program.resolve_label(*metadata.next_row_labels.get(id).unwrap(), program.offset());
|
||||
|
||||
// Emit the NextAsync instruction to continue the loop
|
||||
if !matches!(search, Search::RowidEq { .. }) {
|
||||
program.emit_insn(Insn::NextAsync { cursor_id });
|
||||
let jump_label = metadata.scan_loop_body_labels.pop().unwrap();
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::NextAwait {
|
||||
cursor_id,
|
||||
pc_if_next: jump_label,
|
||||
},
|
||||
jump_label,
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
_ => Ok(()),
|
||||
|
||||
Reference in New Issue
Block a user