Merge 'btree index selection on rightmost pointer in balance_non_root' from Pere Diaz Bou

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #1297
This commit is contained in:
Pekka Enberg
2025-04-10 18:39:51 +03:00
5 changed files with 85 additions and 29 deletions

3
Cargo.lock generated
View File

@@ -1901,6 +1901,9 @@ dependencies = [
"limbo",
"serde_json",
"tokio",
"tracing",
"tracing-appender",
"tracing-subscriber",
]
[[package]]

View File

@@ -1179,6 +1179,7 @@ impl BTreeCursor {
pub fn move_to(&mut self, key: SeekKey<'_>, cmp: SeekOp) -> Result<CursorResult<()>> {
assert!(self.mv_cursor.is_none());
tracing::trace!("move_to(key={:?} cmp={:?})", key, cmp);
tracing::trace!("backtrace: {}", std::backtrace::Backtrace::force_capture());
// For a table with N rows, we can find any row by row id in O(log(N)) time by starting at the root page and following the B-tree pointers.
// B-trees consist of interior pages and leaf pages. Interior pages contain pointers to other pages, while leaf pages contain the actual row data.
//
@@ -1630,12 +1631,6 @@ impl BTreeCursor {
let write_info = self.state.mut_write_info().unwrap();
write_info.state = WriteState::BalanceNonRoot;
self.stack.pop();
// with `move_to` we advance the current cell idx of TableInterior once we move to left subtree.
// On the other hand, with IndexInterior, we do not because we tranver in-order. In the latter case
// since we haven't consumed the cell we can avoid retreating the current cell index.
if matches!(current_page.get_contents().page_type(), PageType::TableLeaf) {
self.stack.retreat();
}
return_if_io!(self.balance_non_root());
}
WriteState::BalanceNonRoot | WriteState::BalanceNonRootWaitLoadPages => {
@@ -1660,10 +1655,14 @@ impl BTreeCursor {
WriteState::BalanceStart => todo!(),
WriteState::BalanceNonRoot => {
let parent_page = self.stack.top();
if parent_page.is_locked() {
return Ok(CursorResult::IO);
}
return_if_locked_maybe_load!(self.pager, parent_page);
// If `move_to` moved to rightmost page, cell index will be out of bounds. Meaning cell_count+1.
// In any other case, `move_to` will stay in the correct index.
if self.stack.current_cell_index() as usize
== parent_page.get_contents().cell_count() + 1
{
self.stack.retreat();
}
parent_page.set_dirty();
self.pager.add_dirty(parent_page.get().id);
let parent_contents = parent_page.get().contents.as_ref().unwrap();
@@ -2535,6 +2534,7 @@ impl BTreeCursor {
// Let's now make a in depth check that we in fact added all possible cells somewhere and they are not lost
for (page_idx, page) in pages_to_balance_new.iter().enumerate() {
let contents = page.get_contents();
debug_validate_cells!(contents, self.usable_space() as u16);
// Cells are distributed in order
for cell_idx in 0..contents.cell_count() {
let (cell_start, cell_len) = contents.cell_get_raw_region(
@@ -2871,6 +2871,7 @@ impl BTreeCursor {
&mut child_contents.overflow_cells,
&mut root_contents.overflow_cells,
);
root_contents.overflow_cells.clear();
// 2. Modify root
let new_root_page_type = match root_contents.page_type() {
@@ -3133,6 +3134,7 @@ impl BTreeCursor {
key: &BTreeKey,
moved_before: bool, /* Indicate whether it's necessary to traverse to find the leaf page */
) -> Result<CursorResult<()>> {
tracing::trace!("insert");
match &self.mv_cursor {
Some(mv_cursor) => match key.maybe_rowid() {
Some(rowid) => {
@@ -3144,6 +3146,7 @@ impl BTreeCursor {
None => todo!("Support mvcc inserts with index btrees"),
},
None => {
tracing::trace!("moved {}", moved_before);
if !moved_before {
self.iteration_state = IterationState::Iterating(IterationDirection::Forwards);
match key {
@@ -4368,7 +4371,11 @@ fn free_cell_range(
}
}
if removed_fragmentation > page.num_frag_free_bytes() {
return_corrupt!("Invalid fragmentation count");
return_corrupt!(format!(
"Invalid fragmentation count. Had {} and removed {}",
page.num_frag_free_bytes(),
removed_fragmentation
));
}
let frag = page.num_frag_free_bytes() - removed_fragmentation;
page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, frag);

View File

@@ -3798,6 +3798,7 @@ pub fn op_idx_insert_async(
pager: &Rc<Pager>,
mv_store: Option<&Rc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
dbg!("op_idx_insert_async");
if let Insn::IdxInsertAsync {
cursor_id,
record_reg,
@@ -3816,29 +3817,29 @@ pub fn op_idx_insert_async(
Register::Record(ref r) => r,
_ => return Err(LimboError::InternalError("expected record".into())),
};
let moved_before = if index_meta.unique {
// check for uniqueness violation
match cursor.key_exists_in_index(record)? {
CursorResult::Ok(true) => {
return Err(LimboError::Constraint(
"UNIQUE constraint failed: duplicate key".into(),
))
}
CursorResult::IO => return Ok(InsnFunctionStepResult::IO),
CursorResult::Ok(false) => {}
};
false
} else {
flags.has(IdxInsertFlags::USE_SEEK)
};
// To make this reentrant in case of `moved_before` = false, we need to check if the previous cursor.insert started
// a write/balancing operation. If it did, it means we already moved to the place we wanted.
let moved_before = if cursor.is_write_in_progress() {
true
} else {
moved_before
if index_meta.unique {
// check for uniqueness violation
match cursor.key_exists_in_index(record)? {
CursorResult::Ok(true) => {
return Err(LimboError::Constraint(
"UNIQUE constraint failed: duplicate key".into(),
))
}
CursorResult::IO => return Ok(InsnFunctionStepResult::IO),
CursorResult::Ok(false) => {}
};
false
} else {
flags.has(IdxInsertFlags::USE_SEEK)
}
};
dbg!(moved_before);
// Start insertion of row. This might trigger a balance procedure which will take care of moving to different pages,
// therefore, we don't want to seek again if that happens, meaning we don't want to return on io without moving to `Await` opcode
// because it could trigger a movement to child page after a balance root which will leave the current page as the root page.

View File

@@ -22,3 +22,6 @@ serde_json = "1.0.139"
tokio = { version = "1.29.1", features = ["full"] }
anarchist-readable-name-generator-lib = "0.1.0"
hex = "0.4"
tracing = "0.1.41"
tracing-appender = "0.2.3"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }

View File

@@ -4,6 +4,7 @@ use anarchist_readable_name_generator_lib::readable_name_custom;
use antithesis_sdk::random::{get_random, AntithesisRng};
use antithesis_sdk::*;
use clap::Parser;
use core::panic;
use hex;
use limbo::Builder;
use opts::Opts;
@@ -12,6 +13,10 @@ use std::collections::HashSet;
use std::fs::File;
use std::io::{Read, Write};
use std::sync::Arc;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
pub struct Plan {
pub ddl_statements: Vec<String>,
@@ -365,8 +370,27 @@ fn read_plan_from_log_file(opts: &Opts) -> Result<Plan, Box<dyn std::error::Erro
Ok(plan)
}
pub fn init_tracing() -> Result<WorkerGuard, std::io::Error> {
let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stderr());
if let Err(e) = tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::layer()
.with_writer(non_blocking)
.with_ansi(false)
.with_line_number(true)
.with_thread_ids(true),
)
.with(EnvFilter::from_default_env())
.try_init()
{
println!("Unable to setup tracing appender: {:?}", e);
}
Ok(guard)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let _g = init_tracing()?;
let (num_nodes, main_id) = (1, "n-001");
let startup_data = json!({
"num_nodes": num_nodes,
@@ -395,7 +419,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
for stmt in &plan.ddl_statements {
println!("executing ddl {}", stmt);
if let Err(e) = conn.execute(stmt, ()).await {
println!("Error creating table: {}", e);
match e {
limbo::Error::SqlExecutionFailure(e) => {
if e.contains("Corrupt database") {
panic!("Error creating table: {}", e);
} else {
println!("Error creating table: {}", e);
}
}
_ => panic!("Error creating table: {}", e),
}
}
}
@@ -408,7 +441,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let sql = &plan.queries_per_thread[thread][query_index];
println!("executing: {}", sql);
if let Err(e) = conn.execute(&sql, ()).await {
println!("Error: {}", e);
match e {
limbo::Error::SqlExecutionFailure(e) => {
if e.contains("Corrupt database") {
panic!("Error executing query: {}", e);
} else {
println!("Error executing query: {}", e);
}
}
_ => panic!("Error executing query: {}", e),
}
}
}
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(())