diff --git a/core/storage/btree.rs b/core/storage/btree.rs index c8f67594b..4d23eee34 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -1103,6 +1103,7 @@ impl BTreeCursor { let page_type = pages_to_balance[0].get_contents().page_type(); let leaf_data = matches!(page_type, PageType::TableLeaf); + let leaf = matches!(page_type, PageType::TableLeaf | PageType::IndexLeaf); for (i, old_page) in pages_to_balance.iter().enumerate() { let old_page_contents = old_page.get_contents(); for cell_idx in 0..old_page_contents.cell_count() { @@ -1159,10 +1160,10 @@ impl BTreeCursor { // calculate how many pages to allocate let mut new_page_sizes = Vec::new(); let mut k = 0; - // todo: add leaf correction - // number of bytes beyond header, differnet from global usableSapce which inccludes + let leaf_correction = if leaf { 4 } else { 0 }; + // number of bytes beyond header, different from global usableSapce which inccludes // header - let usable_space = self.usable_space() - 12; + let usable_space = self.usable_space() - 12 + leaf_correction; for i in 0..sibling_count { cell_array .number_of_cells_per_page @@ -1265,6 +1266,12 @@ impl BTreeCursor { break; } } + tracing::debug!( + "balance_non_root(sibling_count={}, sibling_count_new={}, cells={})", + sibling_count, + sibling_count_new, + cell_array.cells.len() + ); // Comment borrowed from SQLite src/btree.c // The packing computed by the previous block is biased toward the siblings @@ -1324,8 +1331,11 @@ impl BTreeCursor { pages_to_balance[i].set_dirty(); pages_to_balance_new.push(pages_to_balance[i].clone()); } else { - let page = self.allocate_page(page_type.clone(), 0); + let page = self.allocate_page(page_type, 0); pages_to_balance_new.push(page); + // Since this page didn't exist before, we can set it to cells length as it + // marks them as empty since it is a prefix sum of cells. + count_cells_in_old_pages.push(cell_array.cells.len() as u16); } } @@ -1389,6 +1399,7 @@ impl BTreeCursor { new_divider_cell.extend_from_slice(divider_cell); } // FIXME: defragment shouldn't be needed + defragment_page(parent_contents, self.usable_space() as u16); insert_into_cell( parent_contents, &new_divider_cell, @@ -1931,10 +1942,8 @@ impl BTreeCursor { payload_len: usize, page_type: PageType, ) -> Result> { - let max_local = - payload_overflow_threshold_max(page_type.clone(), self.usable_space() as u16); - let min_local = - payload_overflow_threshold_min(page_type.clone(), self.usable_space() as u16); + let max_local = payload_overflow_threshold_max(page_type, self.usable_space() as u16); + let min_local = payload_overflow_threshold_min(page_type, self.usable_space() as u16); let usable_size = self.usable_space(); let (_, local_size) = payload_overflows(payload_len, max_local, min_local, usable_size); @@ -2635,7 +2644,6 @@ fn fill_cell_payload( cell_payload.extend_from_slice(record_buf.as_slice()); return; } - debug!("fill_cell_payload(overflow)"); let payload_overflow_threshold_min = payload_overflow_threshold_min(page_type, usable_space); // see e.g. https://github.com/sqlite/sqlite/blob/9591d3fe93936533c8c3b0dc4d025ac999539e11/src/dbstat.c#L371 @@ -3129,10 +3137,18 @@ mod tests { let (pager, root_page) = empty_btree(); let mut cursor = BTreeCursor::new(pager.clone(), root_page); for (key, size) in sequence.iter() { + run_until_done( + || { + let key = SeekKey::TableRowId(*key as u64); + cursor.move_to(key, SeekOp::EQ) + }, + pager.deref(), + ) + .unwrap(); let key = OwnedValue::Integer(*key); let value = Record::new(vec![OwnedValue::Blob(Rc::new(vec![0; *size]))]); tracing::info!("insert key:{}", key); - cursor.insert(&key, &value, false).unwrap(); + run_until_done(|| cursor.insert(&key, &value, true), pager.deref()).unwrap(); tracing::info!( "=========== btree ===========\n{}\n\n", format_btree(pager.clone(), root_page, 0) @@ -3873,6 +3889,7 @@ mod tests { ); dbg!(free); } + #[test] pub fn btree_insert_sequential() { let (pager, root_page) = empty_btree(); @@ -3909,6 +3926,32 @@ mod tests { } } + #[test] + pub fn test_big_payload_compute_free() { + let db = get_database(); + + let page = get_page(2); + let usable_space = 4096; + let record = Record::new([OwnedValue::Blob(Rc::new(vec![0; 3600]))].to_vec()); + let mut payload: Vec = Vec::new(); + fill_cell_payload( + page.get_contents().page_type(), + Some(0), + &mut payload, + &record, + 4096, + db.pager.clone(), + ); + insert_into_cell(page.get_contents(), &payload, 0, 4096).unwrap(); + let free = compute_free_space(page.get_contents(), usable_space); + let total_size = payload.len() + 2; + assert_eq!( + free, + usable_space - page.get_contents().header_size() as u16 - total_size as u16 + ); + dbg!(free); + } + fn run_until_done( mut action: impl FnMut() -> Result>, pager: &Pager,