btree/pager: reuse freelist pages in allocate_page() to fix UPDATE perf

Jussi Saurio
2025-07-22 17:03:22 +03:00
parent a02a590f88
commit 5ce65bf8e7
2 changed files with 424 additions and 139 deletions
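
Before this change, Pager::allocate_page always appended a fresh page to the end of the file (new_db_size = old_db_size + 1), so pages released to the freelist, for example by UPDATEs that rewrite cells and drop overflow chains, were never reused and the database file kept growing. allocate_page is now a small state machine that walks the freelist trunk pages first and reuses a leaf (or an exhausted trunk) before falling back to growing the file. Reading a trunk or leaf page may require I/O, so the allocation path and everything that calls into it now returns Result<IOResult<T>> instead of Result<T>.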

btree.rs

@@ -2234,7 +2234,7 @@ impl BTreeCursor {
}
WriteState::Insert { page, cell_idx } => {
let mut cell_payload: Vec<u8> = Vec::with_capacity(record_values.len() + 4);
fill_cell_payload(
return_if_io!(fill_cell_payload(
page.get().get().contents.as_ref().unwrap(),
bkey.maybe_rowid(),
&mut cell_payload,
@@ -2242,7 +2242,7 @@ impl BTreeCursor {
record,
self.usable_space(),
self.pager.clone(),
);
));
{
let page = page.get();
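
For context, fill_cell_payload can now hit I/O while allocating overflow pages, so its result is threaded through the crate's IOResult convention. A minimal sketch of that convention as it is used in this diff (the real definitions live elsewhere in the crate; the exact shapes shown here are assumptions):

// A step that may have to wait for asynchronous I/O before producing a value.
pub enum IOResult<T> {
    Done(T), // finished, here is the value
    IO,      // I/O submitted; call the operation again once it completes
}

// Early-return helper: propagate pending I/O to the caller, otherwise
// unwrap the finished value and continue.
macro_rules! return_if_io {
    ($expr:expr) => {
        match $expr? {
            IOResult::Done(value) => value,
            IOResult::IO => return Ok(IOResult::IO),
        }
    };
}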
@@ -3160,7 +3160,17 @@ impl BTreeCursor {
pages_to_balance_new[i].replace(page.clone());
} else {
// FIXME: handle page cache is full
let page = self.allocate_page(page_type, 0)?;
let mut page = self.allocate_page(page_type, 0)?;
// FIXME: add new state machine state instead of this sync IO hack
while matches!(page, IOResult::IO) {
self.pager.io.run_once()?;
page = self.allocate_page(page_type, 0)?;
}
let IOResult::Done(page) = page else {
return Err(LimboError::InternalError(
"Failed to allocate page".into(),
));
};
pages_to_balance_new[i].replace(page);
// Since this page didn't exist before, we can set it to cells length as it
// marks them as empty since it is a prefix sum of cells.
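
The sync-IO FIXME in the hunk above concedes that spinning on run_once is a stopgap until balancing gets a dedicated state machine state. The tests later in this diff use a run_until_done helper for the same job; a rough sketch of such a helper, assuming the signature implied by its call sites (the body here is illustrative, not the crate's actual implementation):

// Drive an IOResult-returning operation to completion by pumping the
// pager's I/O loop whenever the operation reports pending I/O.
fn run_until_done<T>(
    mut op: impl FnMut() -> Result<IOResult<T>>,
    pager: &Pager,
) -> Result<T> {
    loop {
        match op()? {
            IOResult::Done(value) => return Ok(value),
            IOResult::IO => pager.io.run_once()?,
        }
    }
}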
@@ -4030,7 +4040,7 @@ impl BTreeCursor {
/// Balance the root page.
/// This is done when the root page overflows, and we need to create a new root page.
/// See e.g. https://en.wikipedia.org/wiki/B-tree
fn balance_root(&mut self) -> Result<()> {
fn balance_root(&mut self) -> Result<IOResult<()>> {
/* todo: balance deeper, create child and copy contents of root there. Then split root */
/* if we are in root page then we just need to create a new root and push key there */
@@ -4045,9 +4055,19 @@ impl BTreeCursor {
let root = root_btree.get();
let root_contents = root.get_contents();
// FIXME: handle page cache is full
let child_btree =
self.pager
.do_allocate_page(root_contents.page_type(), 0, BtreePageAllocMode::Any)?;
// FIXME: remove sync IO hack
let child_btree = loop {
match self.pager.do_allocate_page(
root_contents.page_type(),
0,
BtreePageAllocMode::Any,
)? {
IOResult::IO => {
self.pager.io.run_once()?;
}
IOResult::Done(page) => break page,
}
};
tracing::debug!(
"balance_root(root={}, rightmost={}, page_type={:?})",
@@ -4108,7 +4128,7 @@ impl BTreeCursor {
self.stack.push(root_btree.clone());
self.stack.set_cell_index(0); // leave parent pointing at the rightmost pointer (in this case 0, as there are no cells), since we will be balancing the rightmost child page.
self.stack.push(child_btree.clone());
Ok(())
Ok(IOResult::Done(()))
}
fn usable_space(&self) -> usize {
@@ -5163,7 +5183,7 @@ impl BTreeCursor {
let serial_types_len = self.record_cursor.borrow_mut().len(record);
let mut new_payload = Vec::with_capacity(serial_types_len);
let rowid = return_if_io!(self.rowid());
fill_cell_payload(
return_if_io!(fill_cell_payload(
page_contents,
rowid,
&mut new_payload,
@@ -5171,7 +5191,7 @@ impl BTreeCursor {
record,
self.usable_space(),
self.pager.clone(),
);
));
// figure out old cell offset & size
let (old_offset, old_local_size) = {
let page_ref = page_ref.get();
@@ -5393,7 +5413,7 @@ impl BTreeCursor {
btree_read_page(&self.pager, page_idx)
}
pub fn allocate_page(&self, page_type: PageType, offset: usize) -> Result<BTreePage> {
pub fn allocate_page(&self, page_type: PageType, offset: usize) -> Result<IOResult<BTreePage>> {
self.pager
.do_allocate_page(page_type, offset, BtreePageAllocMode::Any)
}
@@ -6711,7 +6731,7 @@ fn fill_cell_payload(
record: &ImmutableRecord,
usable_space: usize,
pager: Rc<Pager>,
) {
) -> Result<IOResult<()>> {
// TODO: make record raw from start, having to serialize is not good
let record_buf = record.get_payload().to_vec();
@@ -6740,7 +6760,7 @@ fn fill_cell_payload(
if record_buf.len() <= payload_overflow_threshold_max {
// enough allowed space to fit inside a btree page
cell_payload.extend_from_slice(record_buf.as_slice());
return;
return Ok(IOResult::Done(()));
}
let payload_overflow_threshold_min = payload_overflow_threshold_min(page_type, usable_space);
@@ -6772,7 +6792,9 @@ fn fill_cell_payload(
// we still have bytes to add, we will need to allocate new overflow page
// FIXME: handle page cache is full
let overflow_page = pager.allocate_overflow_page();
// FIXME: not re-entrant: if this allocation yields IOResult::IO, the early return restarts
// fill_cell_payload from scratch on the next call instead of resuming where it left off.
let overflow_page = return_if_io!(pager.allocate_overflow_page());
turso_assert!(overflow_page.is_loaded(), "overflow page is not loaded");
{
let id = overflow_page.get().id as u32;
let contents = overflow_page.get().contents.as_mut().unwrap();
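
For reference, overflow pages use the SQLite on-disk layout: the first four bytes hold the page number of the next overflow page in the chain (0 on the last page), and the rest of the usable space holds payload bytes. A small sketch with hypothetical constant names:

// Overflow page layout (big-endian u32 pointer, then raw payload bytes).
const OVERFLOW_OFFSET_NEXT_PAGE: usize = 0; // next overflow page number, 0 = end of chain
const OVERFLOW_PAYLOAD_START: usize = 4;    // payload bytes begin here

// Payload capacity of a single overflow page for a given usable page size.
fn overflow_payload_capacity(usable_space: usize) -> usize {
    usable_space - OVERFLOW_PAYLOAD_START
}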
@@ -6792,6 +6814,7 @@ fn fill_cell_payload(
}
assert_eq!(cell_size, cell_payload.len());
Ok(IOResult::Done(()))
}
/// Returns the maximum payload size (X) that can be stored directly on a b-tree page without spilling to overflow pages.
@@ -6960,15 +6983,21 @@ mod tests {
conn: &Arc<Connection>,
) -> Vec<u8> {
let mut payload: Vec<u8> = Vec::new();
fill_cell_payload(
page,
Some(id as i64),
&mut payload,
pos,
&record,
4096,
conn.pager.borrow().clone(),
);
run_until_done(
|| {
fill_cell_payload(
page,
Some(id as i64),
&mut payload,
pos,
&record,
4096,
conn.pager.borrow().clone(),
)
},
&conn.pager.borrow().clone(),
)
.unwrap();
insert_into_cell(page, &payload, pos, 4096).unwrap();
payload
}
@@ -7209,7 +7238,7 @@ mod tests {
// FIXME: handle page cache is full
let _ = run_until_done(|| pager.allocate_page1(), &pager);
let page2 = pager.allocate_page().unwrap();
let page2 = run_until_done(|| pager.allocate_page(), &pager).unwrap();
let page2 = Arc::new(BTreePageInner {
page: RefCell::new(page2),
});
@@ -8320,11 +8349,20 @@ mod tests {
let mut cursor = BTreeCursor::new_table(None, pager.clone(), 2, num_columns);
// Initialize page 2 as a root page (interior)
let root_page = cursor.allocate_page(PageType::TableInterior, 0)?;
let root_page = run_until_done(
|| cursor.allocate_page(PageType::TableInterior, 0),
&cursor.pager,
)?;
// Allocate two leaf pages
let page3 = cursor.allocate_page(PageType::TableLeaf, 0)?;
let page4 = cursor.allocate_page(PageType::TableLeaf, 0)?;
let page3 = run_until_done(
|| cursor.allocate_page(PageType::TableLeaf, 0),
&cursor.pager,
)?;
let page4 = run_until_done(
|| cursor.allocate_page(PageType::TableLeaf, 0),
&cursor.pager,
)?;
// Configure the root page to point to the two leaf pages
{
@@ -8502,15 +8540,21 @@ mod tests {
let regs = &[Register::Value(Value::Integer(i as i64))];
let record = ImmutableRecord::from_registers(regs, regs.len());
let mut payload: Vec<u8> = Vec::new();
fill_cell_payload(
page,
Some(i as i64),
&mut payload,
cell_idx,
&record,
4096,
conn.pager.borrow().clone(),
);
run_until_done(
|| {
fill_cell_payload(
page,
Some(i as i64),
&mut payload,
cell_idx,
&record,
4096,
conn.pager.borrow().clone(),
)
},
&conn.pager.borrow().clone(),
)
.unwrap();
if (free as usize) < payload.len() + 2 {
// do not try to insert overflow pages because they require balancing
continue;
@@ -8576,15 +8620,21 @@ mod tests {
let regs = &[Register::Value(Value::Integer(i))];
let record = ImmutableRecord::from_registers(regs, regs.len());
let mut payload: Vec<u8> = Vec::new();
fill_cell_payload(
page,
Some(i),
&mut payload,
cell_idx,
&record,
4096,
conn.pager.borrow().clone(),
);
run_until_done(
|| {
fill_cell_payload(
page,
Some(i),
&mut payload,
cell_idx,
&record,
4096,
conn.pager.borrow().clone(),
)
},
&conn.pager.borrow().clone(),
)
.unwrap();
if (free as usize) < payload.len() - 2 {
// do not try to insert overflow pages because they require balancing
continue;
@@ -8941,15 +8991,21 @@ mod tests {
let regs = &[Register::Value(Value::Integer(0))];
let record = ImmutableRecord::from_registers(regs, regs.len());
let mut payload: Vec<u8> = Vec::new();
fill_cell_payload(
page.get().get_contents(),
Some(0),
&mut payload,
0,
&record,
4096,
conn.pager.borrow().clone(),
);
run_until_done(
|| {
fill_cell_payload(
page.get().get_contents(),
Some(0),
&mut payload,
0,
&record,
4096,
conn.pager.borrow().clone(),
)
},
&conn.pager.borrow().clone(),
)
.unwrap();
let page = page.get();
insert(0, page.get_contents());
defragment(page.get_contents());
@@ -9019,15 +9075,21 @@ mod tests {
let regs = &[Register::Value(Value::Blob(vec![0; 3600]))];
let record = ImmutableRecord::from_registers(regs, regs.len());
let mut payload: Vec<u8> = Vec::new();
fill_cell_payload(
page.get().get_contents(),
Some(0),
&mut payload,
0,
&record,
4096,
conn.pager.borrow().clone(),
);
run_until_done(
|| {
fill_cell_payload(
page.get().get_contents(),
Some(0),
&mut payload,
0,
&record,
4096,
conn.pager.borrow().clone(),
)
},
&conn.pager.borrow().clone(),
)
.unwrap();
insert_into_cell(page.get().get_contents(), &payload, 0, 4096).unwrap();
let free = compute_free_space(page.get().get_contents(), usable_space);
let total_size = payload.len() + 2;
@@ -9355,7 +9417,7 @@ mod tests {
let mut cells_cloned = Vec::new();
let (pager, _, _, _) = empty_btree();
let page_type = PageType::TableLeaf;
let page = pager.allocate_page().unwrap();
let page = run_until_done(|| pager.allocate_page(), &pager).unwrap();
let page = Arc::new(BTreePageInner {
page: RefCell::new(page),
});
@@ -9427,15 +9489,21 @@ mod tests {
let mut payload = Vec::new();
let regs = &[Register::Value(Value::Blob(vec![0; size as usize]))];
let record = ImmutableRecord::from_registers(regs, regs.len());
fill_cell_payload(
contents,
Some(cell_idx as i64),
&mut payload,
cell_idx as usize,
&record,
pager.usable_space(),
pager.clone(),
);
run_until_done(
|| {
fill_cell_payload(
contents,
Some(cell_idx as i64),
&mut payload,
cell_idx as usize,
&record,
pager.usable_space(),
pager.clone(),
)
},
&pager,
)
.unwrap();
insert_into_cell(
contents,
&payload,

pager.rs

@@ -335,6 +335,9 @@ pub struct Pager {
pub db_state: Arc<AtomicDbState>,
/// Mutex for synchronizing database initialization to prevent race conditions
init_lock: Arc<Mutex<()>>,
/// The state of the current allocate page operation.
allocate_page_state: RefCell<AllocatePageState>,
/// The state of the current allocate page1 operation.
allocate_page1_state: RefCell<AllocatePage1State>,
/// Cache page_size and reserved_space at Pager init and reuse for subsequent
/// `usable_space` calls. TODO: Invalidate reserved_space when we add the functionality
@@ -355,6 +358,34 @@ pub enum PagerCommitResult {
Rollback,
}
#[derive(Debug, Clone)]
enum AllocatePageState {
Start,
/// Load the first freelist trunk page into memory.
LoadFreelistTrunkPage {
current_trunk: u32,
current_db_size: u32,
},
/// Search the trunk page for an available free list leaf.
/// If none are found, there are two options:
/// - If there are no more trunk pages, the freelist is empty, so allocate a new page.
/// - If there are more trunk pages, use the current first trunk page as the new allocation,
/// and set the next trunk page as the database's "first freelist trunk page".
SearchAvailableFreeListLeaf {
trunk_page: PageRef,
current_db_size: u32,
},
/// If a freelist leaf is found, reuse it for the page allocation and remove it from the trunk page.
ReuseFreelistLeaf {
trunk_page: PageRef,
number_of_freelist_leaves: u32,
},
/// If a suitable freelist leaf is not found, allocate an entirely new page.
AllocateNewPage {
current_db_size: u32,
},
}
#[derive(Clone)]
enum AllocatePage1State {
Start,
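
The AllocatePageState machine above walks the database freelist, which follows SQLite's on-disk format: the database header records the first trunk page and the total number of freelist pages, and each trunk page stores a pointer to the next trunk plus an array of leaf page numbers. A compact sketch of the trunk page layout the allocator reads (the same offsets appear as constants in allocate_page below):

// Freelist trunk page layout (all fields are big-endian u32):
//   bytes 0..4   page number of the next trunk page (0 = this is the last trunk)
//   bytes 4..8   number of leaf page numbers stored on this trunk
//   bytes 8..    the leaf page numbers themselves, 4 bytes each
const FREELIST_TRUNK_OFFSET_NEXT_TRUNK: usize = 0;
const FREELIST_TRUNK_OFFSET_LEAF_COUNT: usize = 4;
const FREELIST_TRUNK_OFFSET_FIRST_LEAF: usize = 8;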
@@ -421,6 +452,7 @@ impl Pager {
dirty_pages: Vec::new(),
}),
free_page_state: RefCell::new(FreePageState::Start),
allocate_page_state: RefCell::new(AllocatePageState::Start),
})
}
@@ -612,9 +644,8 @@ impl Pager {
};
#[cfg(feature = "omit_autovacuum")]
{
let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?;
let page_id = page.get().get().id;
Ok(IOResult::Done(page_id as u32))
let page = return_if_io!(self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any));
Ok(IOResult::Done(page.get().get().id as u32))
}
// If autovacuum is enabled, we need to allocate a new page number that is greater than the largest root page number
@@ -623,9 +654,9 @@ impl Pager {
let auto_vacuum_mode = self.auto_vacuum_mode.borrow();
match *auto_vacuum_mode {
AutoVacuumMode::None => {
let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?;
let page_id = page.get().get().id;
Ok(IOResult::Done(page_id as u32))
let page =
return_if_io!(self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any));
Ok(IOResult::Done(page.get().get().id as u32))
}
AutoVacuumMode::Full => {
let mut root_page_num =
@@ -648,11 +679,11 @@ impl Pager {
assert!(root_page_num >= 3); // the very first root page is page 3
// root_page_num here is the desired root page
let page = self.do_allocate_page(
let page = return_if_io!(self.do_allocate_page(
page_type,
0,
BtreePageAllocMode::Exact(root_page_num),
)?;
));
let allocated_page_id = page.get().get().id as u32;
if allocated_page_id != root_page_num {
// TODO(Zaid): Handle swapping the allocated page with the desired root page
@@ -676,8 +707,8 @@ impl Pager {
/// Allocate a new overflow page.
/// This is done when a cell overflows and new space is needed.
// FIXME: handle no room in page cache
pub fn allocate_overflow_page(&self) -> PageRef {
let page = self.allocate_page().unwrap();
pub fn allocate_overflow_page(&self) -> Result<IOResult<PageRef>> {
let page = return_if_io!(self.allocate_page());
tracing::debug!("Pager::allocate_overflow_page(id={})", page.get().id);
// setup overflow page
@@ -685,7 +716,7 @@ impl Pager {
let buf = contents.as_ptr();
buf.fill(0);
page
Ok(IOResult::Done(page))
}
/// Allocate a new page to the btree via the pager.
@@ -696,8 +727,8 @@ impl Pager {
page_type: PageType,
offset: usize,
_alloc_mode: BtreePageAllocMode,
) -> Result<BTreePage> {
let page = self.allocate_page()?;
) -> Result<IOResult<BTreePage>> {
let page = return_if_io!(self.allocate_page());
let page = Arc::new(BTreePageInner {
page: RefCell::new(page),
});
@@ -707,7 +738,7 @@ impl Pager {
page.get().get().id,
page.get().get_contents().page_type()
);
Ok(page)
Ok(IOResult::Done(page))
}
/// The "usable size" of a database page is the page size specified by the 2-byte integer at offset 16
@@ -1412,7 +1443,7 @@ impl Pager {
if let Some(size) = self.page_size.get() {
default_header.update_page_size(size);
}
let page = allocate_page(1, &self.buffer_pool, 0);
let page = allocate_new_page(1, &self.buffer_pool, 0);
let contents = page.get_contents();
contents.write_database_header(&default_header);
@@ -1470,63 +1501,249 @@ impl Pager {
)
}
/*
Gets a new page that increasing the size of the page or uses a free page.
Currently free list pages are not yet supported.
*/
// FIXME: handle no room in page cache
/// Tries to reuse a page from the freelist if available.
/// If not, allocates a new page which increases the database size.
///
/// FIXME: implement sqlite's 'nearby' parameter and use AllocMode.
/// SQLite's allocate_page() equivalent has a parameter 'nearby' which is a hint about the page number we want to have for the allocated page.
/// We should use this parameter to allocate pages the way SQLite does; for now we simply take the first available freelist page
/// or allocate a new page.
/// FIXME: handle no room in page cache
#[allow(clippy::readonly_write_lock)]
#[instrument(skip_all, level = Level::DEBUG)]
pub fn allocate_page(&self) -> Result<PageRef> {
let old_db_size = header_accessor::get_database_size(self)?;
#[allow(unused_mut)]
let mut new_db_size = old_db_size + 1;
pub fn allocate_page(&self) -> Result<IOResult<PageRef>> {
const FREELIST_TRUNK_OFFSET_NEXT_TRUNK: usize = 0;
const FREELIST_TRUNK_OFFSET_LEAF_COUNT: usize = 4;
const FREELIST_TRUNK_OFFSET_FIRST_LEAF: usize = 8;
tracing::debug!("allocate_page(database_size={})", new_db_size);
loop {
let mut state = self.allocate_page_state.borrow_mut();
tracing::debug!("allocate_page(state={:?})", state);
match &mut *state {
AllocatePageState::Start => {
let old_db_size = header_accessor::get_database_size(self)?;
#[cfg(not(feature = "omit_autovacuum"))]
let mut new_db_size = old_db_size;
#[cfg(feature = "omit_autovacuum")]
let new_db_size = old_db_size;
#[cfg(not(feature = "omit_autovacuum"))]
{
// If the following conditions are met, allocate a pointer map page, add to cache and increment the database size
// - autovacuum is enabled
// - the last page is a pointer map page
if matches!(*self.auto_vacuum_mode.borrow(), AutoVacuumMode::Full)
&& is_ptrmap_page(new_db_size, header_accessor::get_page_size(self)? as usize)
{
let page = allocate_page(new_db_size as usize, &self.buffer_pool, 0);
self.add_dirty(&page);
tracing::debug!("allocate_page(database_size={})", new_db_size);
#[cfg(not(feature = "omit_autovacuum"))]
{
// If the following conditions are met, allocate a pointer map page, add to cache and increment the database size
// - autovacuum is enabled
// - the last page is a pointer map page
if matches!(*self.auto_vacuum_mode.borrow(), AutoVacuumMode::Full)
&& is_ptrmap_page(
new_db_size,
header_accessor::get_page_size(self)? as usize,
)
{
let page =
allocate_new_page(new_db_size as usize, &self.buffer_pool, 0);
self.add_dirty(&page);
let page_key = PageCacheKey::new(page.get().id);
let mut cache = self.page_cache.write();
match cache.insert(page_key, page.clone()) {
Ok(_) => (),
Err(CacheError::Full) => return Err(LimboError::CacheFull),
Err(_) => {
return Err(LimboError::InternalError(
"Unknown error inserting page to cache".into(),
))
}
}
// we allocated a ptrmap page, so the next data page will be at new_db_size + 1
new_db_size += 1;
}
}
let page_key = PageCacheKey::new(page.get().id);
let mut cache = self.page_cache.write();
match cache.insert(page_key, page.clone()) {
Ok(_) => (),
Err(CacheError::Full) => return Err(LimboError::CacheFull),
Err(_) => {
return Err(LimboError::InternalError(
"Unknown error inserting page to cache".into(),
))
let first_freelist_trunk_page_id =
header_accessor::get_freelist_trunk_page(self)?;
if first_freelist_trunk_page_id == 0 {
*state = AllocatePageState::AllocateNewPage {
current_db_size: new_db_size,
};
continue;
}
*state = AllocatePageState::LoadFreelistTrunkPage {
current_trunk: first_freelist_trunk_page_id,
current_db_size: new_db_size,
};
continue;
}
AllocatePageState::LoadFreelistTrunkPage {
current_trunk,
current_db_size,
} => {
let page = self.read_page(*current_trunk as usize)?;
if page.is_locked() {
return Ok(IOResult::IO);
}
*state = AllocatePageState::SearchAvailableFreeListLeaf {
trunk_page: page,
current_db_size: *current_db_size,
};
continue;
}
AllocatePageState::SearchAvailableFreeListLeaf {
trunk_page,
current_db_size,
} => {
turso_assert!(
trunk_page.is_loaded(),
"Freelist trunk page {} is not loaded",
trunk_page.get().id
);
let page_contents = trunk_page.get().contents.as_ref().unwrap();
let next_trunk_page_id =
page_contents.read_u32(FREELIST_TRUNK_OFFSET_NEXT_TRUNK);
let number_of_freelist_leaves =
page_contents.read_u32(FREELIST_TRUNK_OFFSET_LEAF_COUNT);
// There are leaf pointers on this trunk page, so we can reuse one of the pages
// for the allocation.
if number_of_freelist_leaves != 0 {
*state = AllocatePageState::ReuseFreelistLeaf {
trunk_page: trunk_page.clone(),
number_of_freelist_leaves,
};
continue;
}
// No freelist leaves on this trunk page.
// If the freelist is completely empty, allocate a new page.
if next_trunk_page_id == 0 {
*state = AllocatePageState::AllocateNewPage {
current_db_size: *current_db_size,
};
continue;
}
// Freelist is not empty, so we can reuse the trunk itself as a new page
// and update the database's first freelist trunk page to the next trunk page.
header_accessor::set_freelist_trunk_page(self, next_trunk_page_id)?;
header_accessor::set_freelist_pages(
self,
header_accessor::get_freelist_pages(self)? - 1,
)?;
self.add_dirty(trunk_page);
// zero out the page
turso_assert!(
trunk_page.get_contents().overflow_cells.is_empty(),
"Freelist leaf page {} has overflow cells",
trunk_page.get().id
);
trunk_page.get().contents.as_ref().unwrap().as_ptr().fill(0);
let page_key = PageCacheKey::new(trunk_page.get().id);
{
let mut page_cache = self.page_cache.write();
turso_assert!(
page_cache.contains_key(&page_key),
"page {} is not in cache",
trunk_page.get().id
);
}
let trunk_page = trunk_page.clone();
*state = AllocatePageState::Start;
return Ok(IOResult::Done(trunk_page));
}
AllocatePageState::ReuseFreelistLeaf {
trunk_page,
number_of_freelist_leaves,
} => {
turso_assert!(
trunk_page.is_loaded(),
"Freelist trunk page {} is not loaded",
trunk_page.get().id
);
turso_assert!(
*number_of_freelist_leaves > 0,
"Freelist trunk page {} has no leaves",
trunk_page.get().id
);
let page_contents = trunk_page.get().contents.as_ref().unwrap();
let next_leaf_page_id =
page_contents.read_u32(FREELIST_TRUNK_OFFSET_FIRST_LEAF);
let leaf_page = self.read_page(next_leaf_page_id as usize)?;
if leaf_page.is_locked() {
return Ok(IOResult::IO);
}
self.add_dirty(&leaf_page);
// zero out the page
turso_assert!(
leaf_page.get_contents().overflow_cells.is_empty(),
"Freelist leaf page {} has overflow cells",
leaf_page.get().id
);
leaf_page.get().contents.as_ref().unwrap().as_ptr().fill(0);
let page_key = PageCacheKey::new(leaf_page.get().id);
{
let mut page_cache = self.page_cache.write();
turso_assert!(
page_cache.contains_key(&page_key),
"page {} is not in cache",
leaf_page.get().id
);
}
// Shift left all the other leaf pages in the trunk page and subtract 1 from the leaf count
let remaining_leaves_count = (*number_of_freelist_leaves - 1) as usize;
{
let buf = page_contents.as_ptr();
// use copy within the same page
const LEAF_PTR_SIZE_BYTES: usize = 4;
let offset_remaining_leaves_start =
FREELIST_TRUNK_OFFSET_FIRST_LEAF + LEAF_PTR_SIZE_BYTES;
let offset_remaining_leaves_end = offset_remaining_leaves_start
+ remaining_leaves_count * LEAF_PTR_SIZE_BYTES;
buf.copy_within(
offset_remaining_leaves_start..offset_remaining_leaves_end,
FREELIST_TRUNK_OFFSET_FIRST_LEAF,
);
}
// write the new leaf count
page_contents.write_u32(
FREELIST_TRUNK_OFFSET_LEAF_COUNT,
remaining_leaves_count as u32,
);
self.add_dirty(trunk_page);
header_accessor::set_freelist_pages(
self,
header_accessor::get_freelist_pages(self)? - 1,
)?;
*state = AllocatePageState::Start;
return Ok(IOResult::Done(leaf_page));
}
AllocatePageState::AllocateNewPage { current_db_size } => {
let new_db_size = *current_db_size + 1;
// FIXME: should reserve page cache entry before modifying the database
let page = allocate_new_page(new_db_size as usize, &self.buffer_pool, 0);
{
// setup page and add to cache
self.add_dirty(&page);
let page_key = PageCacheKey::new(page.get().id);
{
// Run in separate block to avoid deadlock on page cache write lock
let mut cache = self.page_cache.write();
match cache.insert(page_key, page.clone()) {
Err(CacheError::Full) => return Err(LimboError::CacheFull),
Err(_) => {
return Err(LimboError::InternalError(
"Unknown error inserting page to cache".into(),
))
}
Ok(_) => {}
};
}
header_accessor::set_database_size(self, new_db_size)?;
*state = AllocatePageState::Start;
return Ok(IOResult::Done(page));
}
}
// we allocated a ptrmap page, so the next data page will be at new_db_size + 1
new_db_size += 1;
}
}
header_accessor::set_database_size(self, new_db_size)?;
// FIXME: should reserve page cache entry before modifying the database
let page = allocate_page(new_db_size as usize, &self.buffer_pool, 0);
{
// setup page and add to cache
self.add_dirty(&page);
let page_key = PageCacheKey::new(page.get().id);
let mut cache = self.page_cache.write();
match cache.insert(page_key, page.clone()) {
Err(CacheError::Full) => Err(LimboError::CacheFull),
Err(_) => Err(LimboError::InternalError(
"Unknown error inserting page to cache".into(),
)),
Ok(_) => Ok(page),
}
}
}
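
Because allocate_page can now return IOResult::IO while a freelist trunk or leaf page is being read, callers have to drive it to completion. A minimal usage sketch mirroring the loop used in btree.rs above (the wrapper function name is hypothetical):

// Allocate one page, reusing a freelist page when one is available.
fn allocate_one_page(pager: &Pager) -> Result<PageRef> {
    loop {
        match pager.allocate_page()? {
            IOResult::Done(page) => return Ok(page),
            // A freelist page read is still in flight: pump the I/O loop and retry.
            IOResult::IO => pager.io.run_once()?,
        }
    }
}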
@@ -1597,7 +1814,7 @@ impl Pager {
}
}
pub fn allocate_page(page_id: usize, buffer_pool: &Arc<BufferPool>, offset: usize) -> PageRef {
pub fn allocate_new_page(page_id: usize, buffer_pool: &Arc<BufferPool>, offset: usize) -> PageRef {
let page = Arc::new(Page::new(page_id));
{
let buffer = buffer_pool.get();