Merge 'btree: Use correct byte offsets for page 1 in defragmentation ' from Jussi Saurio

## Beef
`defragment_page_fast()` incorrectly used the read/write methods on
`PageContent` that add the 100-byte database header to the requested
byte offset, instead of the `_no_offset` variants that do not.
As a result, defragmenting page 1 read the 2nd/3rd freeblocks from the
wrong offset and read/wrote freeblock sizes and cell pointer offsets
at the wrong locations.
## Testing
Adds a fuzz test for CREATE TABLE / DROP TABLE / ALTER TABLE, with
which I was able to reproduce this issue.

Closes #2491
This commit is contained in:
Pekka Enberg
2025-08-07 16:52:11 +03:00
committed by GitHub
2 changed files with 167 additions and 7 deletions

View File

@@ -6498,9 +6498,9 @@ fn defragment_page_fast(
turso_assert!(freeblock_1st <= usable_space as usize - FREEBLOCK_SIZE_MIN, "1st freeblock beyond usable space: freeblock_1st={freeblock_1st} usable_space={usable_space}");
turso_assert!(freeblock_2nd <= usable_space as usize - FREEBLOCK_SIZE_MIN, "2nd freeblock beyond usable space: freeblock_2nd={freeblock_2nd} usable_space={usable_space}");
let freeblock_1st_size = page.read_u16(freeblock_1st + 2) as usize;
let freeblock_1st_size = page.read_u16_no_offset(freeblock_1st + 2) as usize;
let freeblock_2nd_size = if freeblock_2nd > 0 {
page.read_u16(freeblock_2nd + 2) as usize
page.read_u16_no_offset(freeblock_2nd + 2) as usize
} else {
0
};
@@ -6552,19 +6552,19 @@ fn defragment_page_fast(
let cell_pointer_array_offset = page.cell_pointer_array_offset_and_size().0;
for i in 0..cell_count {
let ptr_offset = cell_pointer_array_offset + (i * CELL_PTR_SIZE_BYTES);
let cell_ptr = page.read_u16(ptr_offset) as usize;
let cell_ptr = page.read_u16_no_offset(ptr_offset) as usize;
if cell_ptr < freeblock_1st {
// If the cell pointer was located before the first freeblock, we need to shift it right by the size of the merged freeblock
// since the space occupied by both the 1st and 2nd freeblocks was now moved to its left.
let new_offset = cell_ptr + freeblocks_total_size;
turso_assert!(new_offset <= usable_space as usize, "new offset beyond usable space: new_offset={new_offset} usable_space={usable_space}");
page.write_u16(ptr_offset, (cell_ptr + freeblocks_total_size) as u16);
page.write_u16_no_offset(ptr_offset, (cell_ptr + freeblocks_total_size) as u16);
} else if freeblock_2nd > 0 && cell_ptr < freeblock_2nd {
// If the cell pointer was located between the first and second freeblock, we need to shift it right by the size of only the second freeblock,
// since the first one was already on its left.
let new_offset = cell_ptr + freeblock_2nd_size;
turso_assert!(new_offset <= usable_space as usize, "new offset beyond usable space: new_offset={new_offset} usable_space={usable_space}");
page.write_u16(ptr_offset, (cell_ptr + freeblock_2nd_size) as u16);
page.write_u16_no_offset(ptr_offset, (cell_ptr + freeblock_2nd_size) as u16);
}
}
@@ -6606,11 +6606,11 @@ fn defragment_page(page: &PageContent, usable_space: u16, max_frag_bytes: isize)
// No freeblocks and very little if any fragmented free bytes -> no need to defragment.
return Ok(());
}
let freeblock_2nd = page.read_u16(freeblock_1st) as usize;
let freeblock_2nd = page.read_u16_no_offset(freeblock_1st) as usize;
if freeblock_2nd == 0 {
return defragment_page_fast(page, usable_space, freeblock_1st, 0);
}
let freeblock_3rd = page.read_u16(freeblock_2nd) as usize;
let freeblock_3rd = page.read_u16_no_offset(freeblock_2nd) as usize;
if freeblock_3rd == 0 {
return defragment_page_fast(page, usable_space, freeblock_1st, freeblock_2nd);
}

View File

@@ -1719,4 +1719,164 @@ mod tests {
}
}
}
#[test]
#[ignore]
/// Fuzz test exercising schema churn: repeatedly runs randomly generated
/// CREATE TABLE / DROP TABLE / ALTER TABLE (ADD COLUMN / DROP COLUMN)
/// statements against a fresh database.
///
/// The current schema is mirrored in an in-memory map so that every
/// generated statement is valid (e.g. we never drop a PRIMARY KEY or
/// UNIQUE column, and never drop a table's last column). The test passes
/// if no statement crashes or corrupts the engine.
pub fn fuzz_long_create_table_drop_table_alter_table() {
    let db = TempDatabase::new_empty(true);
    let limbo_conn = db.connect_limbo();
    let (mut rng, seed) = rng_from_time_or_env();
    tracing::info!("create_table_drop_table_fuzz seed: {}", seed);
    // Keep track of current tables and their columns in memory:
    // table name -> list of column names.
    let mut current_tables: std::collections::HashMap<String, Vec<String>> =
        std::collections::HashMap::new();
    let mut table_counter = 0;
    // Pools for random column generation.
    const COLUMN_TYPES: [&str; 6] = ["INTEGER", "TEXT", "REAL", "BLOB", "BOOLEAN", "NUMERIC"];
    const COLUMN_NAMES: [&str; 8] = [
        "id", "name", "value", "data", "info", "field", "col", "attr",
    ];
    // (table, column) pairs that carry PRIMARY KEY / UNIQUE constraints and
    // therefore must never be targeted by DROP COLUMN.
    let mut undroppable_cols = HashSet::new();
    for iteration in 0..50000 {
        println!("iteration: {iteration} (seed: {seed})");
        // Operation buckets: 0..20 create table, 20..30 drop table,
        // 30..60 add column, 60..100 drop column.
        let operation = rng.random_range(0..100);
        match operation {
            0..20 => {
                // Create table (capped at 10 live tables).
                if current_tables.len() < 10 {
                    let table_name = format!("table_{table_counter}");
                    table_counter += 1;
                    let num_columns = rng.random_range(1..6);
                    let mut columns = Vec::new();
                    for i in 0..num_columns {
                        let col_name = if i == 0 && rng.random_bool(0.3) {
                            "id".to_string()
                        } else {
                            // Random suffix keeps names unique across ALTERs.
                            format!(
                                "{}_{}",
                                COLUMN_NAMES[rng.random_range(0..COLUMN_NAMES.len())],
                                rng.random_range(0..u64::MAX)
                            )
                        };
                        let col_type = COLUMN_TYPES[rng.random_range(0..COLUMN_TYPES.len())];
                        let constraint = if i == 0 && rng.random_bool(0.2) {
                            " PRIMARY KEY"
                        } else if rng.random_bool(0.1) {
                            " UNIQUE"
                        } else {
                            ""
                        };
                        if constraint.contains("UNIQUE") || constraint.contains("PRIMARY KEY") {
                            undroppable_cols.insert((table_name.clone(), col_name.clone()));
                        }
                        columns.push(format!("{col_name} {col_type}{constraint}"));
                    }
                    let create_sql =
                        format!("CREATE TABLE {} ({})", table_name, columns.join(", "));
                    // Execute the create table statement
                    limbo_exec_rows(&db, &limbo_conn, &create_sql);
                    // Successfully created table, update our tracking.
                    // Store only the column names (first whitespace token of
                    // each "<name> <type>[ constraint]" definition).
                    current_tables.insert(
                        table_name.clone(),
                        columns
                            .iter()
                            .map(|c| c.split_whitespace().next().unwrap().to_string())
                            .collect(),
                    );
                }
            }
            20..30 => {
                // Drop table
                if !current_tables.is_empty() {
                    let table_names: Vec<String> = current_tables.keys().cloned().collect();
                    let table_to_drop = &table_names[rng.random_range(0..table_names.len())];
                    let drop_sql = format!("DROP TABLE {table_to_drop}");
                    limbo_exec_rows(&db, &limbo_conn, &drop_sql);
                    // Successfully dropped table, update our tracking
                    current_tables.remove(table_to_drop);
                }
            }
            30..60 => {
                // Alter table - add column
                if !current_tables.is_empty() {
                    let table_names: Vec<String> = current_tables.keys().cloned().collect();
                    let table_to_alter = &table_names[rng.random_range(0..table_names.len())];
                    let new_col_name = format!("new_col_{}", rng.random_range(0..u64::MAX));
                    let col_type = COLUMN_TYPES[rng.random_range(0..COLUMN_TYPES.len())];
                    let alter_sql = format!(
                        "ALTER TABLE {} ADD COLUMN {} {}",
                        table_to_alter, &new_col_name, col_type
                    );
                    limbo_exec_rows(&db, &limbo_conn, &alter_sql);
                    // Successfully added column, update our tracking
                    if let Some(columns) = current_tables.get_mut(table_to_alter) {
                        columns.push(new_col_name);
                    }
                }
            }
            60..100 => {
                // Alter table - drop column
                if !current_tables.is_empty() {
                    let table_names: Vec<String> = current_tables.keys().cloned().collect();
                    let table_to_alter = &table_names[rng.random_range(0..table_names.len())];
                    let table_name = table_to_alter.clone();
                    if let Some(columns) = current_tables.get(&table_name) {
                        // Exclude PRIMARY KEY / UNIQUE columns, which cannot
                        // be dropped.
                        let droppable_cols = columns
                            .iter()
                            .filter(|c| {
                                !undroppable_cols.contains(&(table_name.clone(), c.to_string()))
                            })
                            .collect::<Vec<_>>();
                        // Don't drop the last column
                        if columns.len() > 1 && !droppable_cols.is_empty() {
                            let col_index = rng.random_range(0..droppable_cols.len());
                            let col_to_drop = droppable_cols[col_index].clone();
                            let alter_sql = format!(
                                "ALTER TABLE {table_to_alter} DROP COLUMN {col_to_drop}"
                            );
                            limbo_exec_rows(&db, &limbo_conn, &alter_sql);
                            // Successfully dropped column, update our tracking
                            let columns = current_tables.get_mut(&table_name).unwrap();
                            columns.retain(|c| c != &col_to_drop);
                        }
                    }
                }
            }
            _ => unreachable!(),
        }
    }
    // Final verification - the test passes if we didn't crash
    println!(
        "create_table_drop_table_fuzz completed successfully with {} tables remaining",
        current_tables.len()
    );
}
}