mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-01 15:34:19 +01:00
Merge 'Adjust write cursors for delete to avoid opening more than once. ' from Pedro Muniz
We were opening Write cursors inside of the rewind loop, which meant that we were opening the same cursor multiple times per iteration. Also, in this case, we need to open all indexes before the loop starts, and avoid opening the same cursor again. While I was trying to debug this, I also added instrumentation to some functions, so that I could track where the instructions were being emitted. This helped me a lot. The `#[instrument]` macro creates a span for the particular function, which enables us to examine the path our functions are taking when any log is emitted. Lastly, I fixed an explain bug, where it was unwrapping a None `CursorKey` when opening the `sqlite_schema` table. Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com> Closes #1640
This commit is contained in:
@@ -76,6 +76,7 @@ use storage::{
|
||||
pager::allocate_page,
|
||||
sqlite3_ondisk::{DatabaseHeader, DATABASE_HEADER_SIZE},
|
||||
};
|
||||
use tracing::{instrument, Level};
|
||||
use translate::select::prepare_select_plan;
|
||||
pub use types::RefValue;
|
||||
pub use types::Value;
|
||||
@@ -339,6 +340,7 @@ pub struct Connection {
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn prepare(self: &Rc<Connection>, sql: impl AsRef<str>) -> Result<Statement> {
|
||||
if sql.as_ref().is_empty() {
|
||||
return Err(LimboError::InvalidArgument(
|
||||
@@ -377,6 +379,7 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn query(self: &Rc<Connection>, sql: impl AsRef<str>) -> Result<Option<Statement>> {
|
||||
let sql = sql.as_ref();
|
||||
tracing::trace!("Querying: {}", sql);
|
||||
@@ -388,6 +391,7 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub(crate) fn run_cmd(self: &Rc<Connection>, cmd: Cmd) -> Result<Option<Statement>> {
|
||||
let syms = self.syms.borrow();
|
||||
match cmd {
|
||||
@@ -448,6 +452,7 @@ impl Connection {
|
||||
|
||||
/// Execute will run a query from start to finish taking ownership of I/O because it will run pending I/Os if it didn't finish.
|
||||
/// TODO: make this api async
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn execute(self: &Rc<Connection>, sql: impl AsRef<str>) -> Result<()> {
|
||||
let sql = sql.as_ref();
|
||||
let mut parser = Parser::new(sql.as_bytes());
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
|
||||
use limbo_sqlite3_parser::ast::{self, SortOrder};
|
||||
use tracing::{instrument, Level};
|
||||
|
||||
use super::aggregation::emit_ungrouped_aggregation;
|
||||
use super::expr::{translate_condition_expr, translate_expr, ConditionMetadata};
|
||||
@@ -175,6 +176,7 @@ pub enum TransactionMode {
|
||||
|
||||
/// Main entry point for emitting bytecode for a SQL query
|
||||
/// Takes a query plan and generates the corresponding bytecode program
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn emit_program(
|
||||
program: &mut ProgramBuilder,
|
||||
plan: Plan,
|
||||
@@ -191,6 +193,7 @@ pub fn emit_program(
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn emit_program_for_compound_select(
|
||||
program: &mut ProgramBuilder,
|
||||
plan: Plan,
|
||||
@@ -465,6 +468,7 @@ fn read_deduplicated_union_rows(
|
||||
});
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn emit_program_for_select(
|
||||
program: &mut ProgramBuilder,
|
||||
mut plan: SelectPlan,
|
||||
@@ -503,6 +507,7 @@ fn emit_program_for_select(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn emit_query<'a>(
|
||||
program: &'a mut ProgramBuilder,
|
||||
plan: &'a mut SelectPlan,
|
||||
@@ -641,6 +646,7 @@ pub fn emit_query<'a>(
|
||||
Ok(t_ctx.reg_result_cols_start.unwrap())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn emit_program_for_delete(
|
||||
program: &mut ProgramBuilder,
|
||||
mut plan: DeletePlan,
|
||||
@@ -684,6 +690,35 @@ fn emit_program_for_delete(
|
||||
OperationMode::DELETE,
|
||||
)?;
|
||||
|
||||
// Open indexes for delete.
|
||||
// Order is important. This segment of code must be run before `open_loop`
|
||||
|
||||
let table_ref = plan.table_references.joined_tables().first().unwrap();
|
||||
let index_references = plan
|
||||
.indexes
|
||||
.iter()
|
||||
.map(|index| {
|
||||
if table_ref
|
||||
.op
|
||||
.index()
|
||||
.is_some_and(|table_ref_index| table_ref_index.name == index.name)
|
||||
{
|
||||
(
|
||||
index.clone(),
|
||||
program
|
||||
.resolve_cursor_id(&CursorKey::index(table_ref.internal_id, index.clone())),
|
||||
)
|
||||
} else {
|
||||
(
|
||||
index.clone(),
|
||||
program.alloc_cursor_id(CursorType::BTreeIndex(index.clone())),
|
||||
)
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(index_references.len(), plan.indexes.len());
|
||||
|
||||
// Set up main query execution loop
|
||||
open_loop(
|
||||
program,
|
||||
@@ -692,7 +727,13 @@ fn emit_program_for_delete(
|
||||
&[JoinOrderMember::default()],
|
||||
&mut plan.where_clause,
|
||||
)?;
|
||||
emit_delete_insns(program, &mut t_ctx, &plan.table_references, &plan.indexes)?;
|
||||
|
||||
emit_delete_insns(
|
||||
program,
|
||||
&mut t_ctx,
|
||||
&plan.table_references,
|
||||
&index_references,
|
||||
)?;
|
||||
|
||||
// Clean up and close the main execution loop
|
||||
close_loop(
|
||||
@@ -714,7 +755,7 @@ fn emit_delete_insns(
|
||||
program: &mut ProgramBuilder,
|
||||
t_ctx: &mut TranslateCtx,
|
||||
table_references: &TableReferences,
|
||||
index_references: &[Arc<Index>],
|
||||
index_references: &[(Arc<Index>, usize)],
|
||||
) -> Result<()> {
|
||||
let table_reference = table_references.joined_tables().first().unwrap();
|
||||
let cursor_id = match &table_reference.op {
|
||||
@@ -759,14 +800,7 @@ fn emit_delete_insns(
|
||||
conflict_action,
|
||||
});
|
||||
} else {
|
||||
for index in index_references {
|
||||
let index_cursor_id = program.alloc_cursor_id(CursorType::BTreeIndex(index.clone()));
|
||||
|
||||
program.emit_insn(Insn::OpenWrite {
|
||||
cursor_id: index_cursor_id,
|
||||
root_page: RegisterOrLiteral::Literal(index.root_page),
|
||||
name: index.name.clone(),
|
||||
});
|
||||
for (index, index_cursor_id) in index_references {
|
||||
let num_regs = index.columns.len() + 1;
|
||||
let start_reg = program.alloc_registers(num_regs);
|
||||
// Emit columns that are part of the index
|
||||
@@ -788,7 +822,7 @@ fn emit_delete_insns(
|
||||
program.emit_insn(Insn::IdxDelete {
|
||||
start_reg,
|
||||
num_regs,
|
||||
cursor_id: index_cursor_id,
|
||||
cursor_id: *index_cursor_id,
|
||||
});
|
||||
}
|
||||
program.emit_insn(Insn::Delete {
|
||||
@@ -805,6 +839,7 @@ fn emit_delete_insns(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn emit_program_for_update(
|
||||
program: &mut ProgramBuilder,
|
||||
mut plan: UpdatePlan,
|
||||
@@ -893,6 +928,7 @@ fn emit_program_for_update(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn emit_update_insns(
|
||||
plan: &UpdatePlan,
|
||||
t_ctx: &TranslateCtx,
|
||||
|
||||
@@ -49,9 +49,11 @@ use schema::{
|
||||
use select::translate_select;
|
||||
use std::rc::{Rc, Weak};
|
||||
use std::sync::Arc;
|
||||
use tracing::{instrument, Level};
|
||||
use transaction::{translate_tx_begin, translate_tx_commit};
|
||||
use update::translate_update;
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn translate(
|
||||
schema: &Schema,
|
||||
stmt: ast::Stmt,
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::{
|
||||
};
|
||||
|
||||
use limbo_sqlite3_parser::ast::{self, TableInternalId};
|
||||
use tracing::{instrument, Level};
|
||||
|
||||
use crate::{
|
||||
fast_lock::SpinLock,
|
||||
@@ -284,8 +285,11 @@ impl ProgramBuilder {
|
||||
cursor
|
||||
}
|
||||
|
||||
#[instrument(skip(self), level = Level::TRACE)]
|
||||
pub fn emit_insn(&mut self, insn: Insn) {
|
||||
let function = insn.to_function();
|
||||
// This seemingly empty trace here is needed so that a function span is emmited with it
|
||||
tracing::trace!("");
|
||||
self.insns.push((insn, function, self.insns.len()));
|
||||
}
|
||||
|
||||
|
||||
@@ -365,14 +365,20 @@ pub fn insn_to_str(
|
||||
Value::build_text(""),
|
||||
0,
|
||||
{
|
||||
let cursor_key = program.cursor_ref[*cursor_id].0.as_ref().unwrap();
|
||||
let cursor_type =
|
||||
program.cursor_ref[*cursor_id]
|
||||
.0
|
||||
.as_ref()
|
||||
.map_or("", |cursor_key| {
|
||||
if cursor_key.index.is_some() {
|
||||
"index"
|
||||
} else {
|
||||
"table"
|
||||
}
|
||||
});
|
||||
format!(
|
||||
"{}={}, root={}",
|
||||
if cursor_key.index.is_some() {
|
||||
"index"
|
||||
} else {
|
||||
"table"
|
||||
},
|
||||
cursor_type,
|
||||
get_table_or_index_name(*cursor_id),
|
||||
root_page
|
||||
)
|
||||
@@ -386,16 +392,18 @@ pub fn insn_to_str(
|
||||
Value::build_text(""),
|
||||
0,
|
||||
{
|
||||
let cursor_key = program.cursor_ref[*cursor_id].0.as_ref().unwrap();
|
||||
format!(
|
||||
"{} {}",
|
||||
if cursor_key.index.is_some() {
|
||||
"index"
|
||||
} else {
|
||||
"table"
|
||||
},
|
||||
get_table_or_index_name(*cursor_id),
|
||||
)
|
||||
let cursor_type =
|
||||
program.cursor_ref[*cursor_id]
|
||||
.0
|
||||
.as_ref()
|
||||
.map_or("", |cursor_key| {
|
||||
if cursor_key.index.is_some() {
|
||||
"index"
|
||||
} else {
|
||||
"table"
|
||||
}
|
||||
});
|
||||
format!("{} {}", cursor_type, get_table_or_index_name(*cursor_id),)
|
||||
},
|
||||
),
|
||||
Insn::VCreate {
|
||||
@@ -497,14 +505,20 @@ pub fn insn_to_str(
|
||||
Value::build_text(""),
|
||||
0,
|
||||
{
|
||||
let cursor_key = program.cursor_ref[*cursor_id].0.as_ref().unwrap();
|
||||
let cursor_type =
|
||||
program.cursor_ref[*cursor_id]
|
||||
.0
|
||||
.as_ref()
|
||||
.map_or("", |cursor_key| {
|
||||
if cursor_key.index.is_some() {
|
||||
"index"
|
||||
} else {
|
||||
"table"
|
||||
}
|
||||
});
|
||||
format!(
|
||||
"Rewind {} {}",
|
||||
if cursor_key.index.is_some() {
|
||||
"index"
|
||||
} else {
|
||||
"table"
|
||||
},
|
||||
cursor_type,
|
||||
get_table_or_index_name(*cursor_id),
|
||||
)
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user