Merge 'btree/pager: Improve update performance by reusing freelist pages in allocate_page()' from Jussi Saurio
Closes #2225. ## What We currently do not use pages in the [freelist](https://www.sqlite.org/fileformat.html#the_freelist) at all when allocating new pages. ## Why is this bad The effect of this is that 1. UPDATEs with overflow pages become really slow and 2. the database size grows really quickly. See #2225 for an extreme example comparison with SQLite. ## The fix Whenever `allocate_page()` is called, we first check if we have pages in the freelist, and if we do, we recycle one of those pages instead of creating a new one. If there are no freelist pages, we allocate a new page as normal. ## Implementation notes - `allocate_page()` now needs to return an `IOResult`, which means all of its callers also need to return an `IOResult`, necessitating quite a bit of new state machine logic to ensure re-entrancy. - I left a few "synchronous IO hacks" in the `balance()` routine because the size of this PR would balloon even more than it already has if I were to fix those immediately in this PR. - `fill_cell_payload()` uses some `unsafe` code to avoid lifetime issues, and adds an unfortunate double-indirection via `Arc<Mutex<Vec<T>>>` because the existing btree code constantly clones `WriteState`, and we must ensure the underlying buffers referenced by raw pointers in `fill_cell_payload` remain valid. **Follow-up cleanups:** 1. remove synchronous IO hacks that would require even more state machines and are best left for another PR 2. remove `Clone` from `WriteState` and implement it better ## Perf comparison `main`: 33 seconds ``` jussi@Jussis-MacBook-Pro limbo % time target/release/tursodb --experimental-indexes apinatest_main.db <<'EOF' create table t(x, y, z unique); insert into t select randomblob(1024*128),randomblob(1024*128),randomblob(1024*128) from generate_series(1, 100); insert into t select randomblob(1024*128),randomblob(1024*128),randomblob(1024*128) from generate_series(1, 100); insert into t select randomblob(1024*128),randomblob(1024*128),randomblob(1024*128) from generate_series(1, 100); insert into t select randomblob(1024*128),randomblob(1024*128),randomblob(1024*128) from generate_series(1, 100); insert into t select randomblob(1024*128),randomblob(1024*128),randomblob(1024*128) from generate_series(1, 100); insert into t select randomblob(1024*128),randomblob(1024*128),randomblob(1024*128) from generate_series(1, 100); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 
WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); EOF Turso v0.1.3-pre.3 Enter ".help" for usage hints. This software is ALPHA, only use for development, testing, and experimentation. target/release/tursodb --experimental-indexes apinatest_main.db <<<'' 6.81s user 21.18s system 83% cpu 33.643 total ``` PR: 13 seconds ``` jussi@Jussis-MacBook-Pro limbo % time target/release/tursodb --experimental-indexes apinatest_PR.db <<'EOF' create table t(x, y, z unique); insert into t select randomblob(1024*128),randomblob(1024*128),randomblob(1024*128) from generate_series(1, 100); insert into t select randomblob(1024*128),randomblob(1024*128),randomblob(1024*128) from generate_series(1, 100); insert into t select randomblob(1024*128),randomblob(1024*128),randomblob(1024*128) from generate_series(1, 100); insert into t select randomblob(1024*128),randomblob(1024*128),randomblob(1024*128) from generate_series(1, 100); insert into t select randomblob(1024*128),randomblob(1024*128),randomblob(1024*128) from generate_series(1, 100); insert into t select randomblob(1024*128),randomblob(1024*128),randomblob(1024*128) from generate_series(1, 100); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); update t set x = x + 1 WHERE z > randomblob(1024*128); EOF Turso v0.1.3-pre.3 Enter ".help" for usage hints. This software is ALPHA, only use for development, testing, and experimentation. 
target/release/tursodb --experimental-indexes apinatest_PR.db <<<'' 3.89s user 7.83s system 89% cpu 13.162 total
```

(sqlite: 2 seconds 🤡)

---

TODO:

- [x] Fix whatever issue the simulator caught in CI (#2238)
- [x] Post a performance comparison
- [x] Fix autovacuum test failure
- [x] Improve docs
- [x] Fix `fill_cell_payload` re-entrancy issue when allocating overflow pages
- [x] Add proper PR description

Reviewed-by: Pere Diaz Bou <pere-altea@homail.com>
Closes #2233
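The re-entrancy requirement mentioned in the implementation notes shows up throughout the diff below as `Result<IOResult<T>>` return types, the `return_if_io!` macro, and `run_until_done` in the tests. The following is a minimal, self-contained sketch of that calling convention; the type, macro, and function names here are simplified stand-ins for illustration, not the crate's actual definitions.

```
// Simplified sketch of the calling convention this PR spreads through the
// btree/pager code: fallible operations return Result<IOResult<T>>, where
// IOResult::IO means "I/O is still in flight, call me again later".
#[derive(Debug)]
enum IOResult<T> {
    Done(T),
    IO,
}

type Result<T> = std::result::Result<T, String>;

// Stand-in for the `return_if_io!` macro used in the diff: propagate IO
// upward so the caller's own state machine can be resumed later.
macro_rules! return_if_io {
    ($e:expr) => {
        match $e? {
            IOResult::Done(v) => v,
            IOResult::IO => return Ok(IOResult::IO),
        }
    };
}

// A re-entrant operation keeps its progress in state passed in from outside
// instead of on the call stack, so returning IOResult::IO loses no work.
fn allocate_two_pages(pending_io: &mut u32, acc: &mut Vec<u32>) -> Result<IOResult<Vec<u32>>> {
    while acc.len() < 2 {
        let page_id = return_if_io!(fake_allocate_page(pending_io));
        acc.push(page_id);
    }
    Ok(IOResult::Done(std::mem::take(acc)))
}

// Pretend allocator that needs one extra poll before each page is "ready".
fn fake_allocate_page(pending_io: &mut u32) -> Result<IOResult<u32>> {
    if *pending_io > 0 {
        *pending_io -= 1;
        return Ok(IOResult::IO);
    }
    *pending_io = 1;
    Ok(IOResult::Done(42))
}

fn main() -> Result<()> {
    // Drive the state machine to completion, like `run_until_done` in the tests.
    let mut pending_io = 1;
    let mut acc = Vec::new();
    let pages = loop {
        match allocate_two_pages(&mut pending_io, &mut acc)? {
            IOResult::Done(p) => break p,
            IOResult::IO => { /* in the real pager this is where io.run_once() runs */ }
        }
    };
    println!("allocated pages: {pages:?}");
    Ok(())
}
```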
@@ -1,3 +1,4 @@
|
||||
use parking_lot::Mutex;
|
||||
use tracing::{instrument, Level};
|
||||
|
||||
use crate::{
|
||||
@@ -228,11 +229,20 @@ struct DeleteInfo {
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum OverwriteCellState {
|
||||
/// Fill the cell payload with the new value.
|
||||
FillPayload,
|
||||
/// Clear the overflow pages of the old cell and overwrite the cell.
|
||||
/// Allocate a new payload for the cell.
|
||||
AllocatePayload,
|
||||
/// Fill the cell payload with the new payload.
|
||||
FillPayload {
|
||||
/// Dumb double-indirection via Arc because we clone [WriteState] for some reason and we use unsafe in [FillCellPayloadState::AllocateOverflowPages]
|
||||
/// so the underlying Vec must not be cloned in upper layers.
|
||||
new_payload: Arc<Mutex<Vec<u8>>>,
|
||||
rowid: Option<i64>,
|
||||
fill_cell_payload_state: FillCellPayloadState,
|
||||
},
|
||||
/// Clear the old cell's overflow pages and add them to the freelist.
|
||||
/// Overwrite the cell with the new payload.
|
||||
ClearOverflowPagesAndOverwrite {
|
||||
new_payload: Vec<u8>,
|
||||
new_payload: Arc<Mutex<Vec<u8>>>,
|
||||
old_offset: usize,
|
||||
old_local_size: usize,
|
||||
},
|
||||
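The new `OverwriteCellState` above splits an overwrite into stages: allocate the new payload buffer, fill it (possibly across several calls while overflow pages are allocated), then clear the old cell's overflow pages and write the new payload. A minimal, self-contained sketch of how such a staged enum is advanced, with simplified stand-in types rather than the actual turso definitions:

```
// Each arm does one step, stores the next state, and loops, so the whole
// sequence can be re-entered after I/O without losing progress.
enum OverwriteCellState {
    AllocatePayload,
    FillPayload { new_payload: Vec<u8> },
    ClearOverflowPagesAndOverwrite { new_payload: Vec<u8> },
}

fn overwrite_cell(mut state: OverwriteCellState, record: &[u8]) -> Vec<u8> {
    loop {
        match state {
            OverwriteCellState::AllocatePayload => {
                // allocate the buffer up front (the real code also fetches the rowid here)
                state = OverwriteCellState::FillPayload {
                    new_payload: Vec::with_capacity(record.len()),
                };
            }
            OverwriteCellState::FillPayload { mut new_payload } => {
                // the real code calls fill_cell_payload here, which may return IOResult::IO
                new_payload.extend_from_slice(record);
                state = OverwriteCellState::ClearOverflowPagesAndOverwrite { new_payload };
            }
            OverwriteCellState::ClearOverflowPagesAndOverwrite { new_payload } => {
                // the real code frees the old cell's overflow pages, then overwrites the cell
                return new_payload;
            }
        }
    }
}

fn main() {
    let written = overwrite_cell(OverwriteCellState::AllocatePayload, b"new record");
    assert_eq!(written, b"new record".to_vec());
    println!("wrote {} bytes", written.len());
}
```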
@@ -256,6 +266,8 @@ enum WriteState {
|
||||
Insert {
|
||||
page: Arc<BTreePageInner>,
|
||||
cell_idx: usize,
|
||||
new_payload: Vec<u8>,
|
||||
fill_cell_payload_state: FillCellPayloadState,
|
||||
},
|
||||
BalanceStart,
|
||||
BalanceFreePages {
|
||||
@@ -2183,7 +2195,7 @@ impl BTreeCursor {
|
||||
write_info.state = WriteState::Overwrite {
|
||||
page: page.clone(),
|
||||
cell_idx,
|
||||
state: OverwriteCellState::FillPayload,
|
||||
state: OverwriteCellState::AllocatePayload,
|
||||
};
|
||||
continue;
|
||||
}
|
||||
@@ -2208,7 +2220,7 @@ impl BTreeCursor {
|
||||
write_info.state = WriteState::Overwrite {
|
||||
page: page.clone(),
|
||||
cell_idx,
|
||||
state: OverwriteCellState::FillPayload,
|
||||
state: OverwriteCellState::AllocatePayload,
|
||||
};
|
||||
continue;
|
||||
} else {
|
||||
@@ -2229,20 +2241,27 @@ impl BTreeCursor {
|
||||
write_info.state = WriteState::Insert {
|
||||
page: page.clone(),
|
||||
cell_idx,
|
||||
new_payload: Vec::with_capacity(record_values.len() + 4),
|
||||
fill_cell_payload_state: FillCellPayloadState::Start,
|
||||
};
|
||||
continue;
|
||||
}
|
||||
WriteState::Insert { page, cell_idx } => {
|
||||
let mut cell_payload: Vec<u8> = Vec::with_capacity(record_values.len() + 4);
|
||||
fill_cell_payload(
|
||||
WriteState::Insert {
|
||||
page,
|
||||
cell_idx,
|
||||
mut new_payload,
|
||||
mut fill_cell_payload_state,
|
||||
} => {
|
||||
return_if_io!(fill_cell_payload(
|
||||
page.get().get().contents.as_ref().unwrap(),
|
||||
bkey.maybe_rowid(),
|
||||
&mut cell_payload,
|
||||
&mut new_payload,
|
||||
cell_idx,
|
||||
record,
|
||||
self.usable_space(),
|
||||
self.pager.clone(),
|
||||
);
|
||||
&mut fill_cell_payload_state,
|
||||
));
|
||||
|
||||
{
|
||||
let page = page.get();
|
||||
@@ -2251,7 +2270,7 @@ impl BTreeCursor {
|
||||
|
||||
insert_into_cell(
|
||||
contents,
|
||||
cell_payload.as_slice(),
|
||||
new_payload.as_slice(),
|
||||
cell_idx,
|
||||
self.usable_space() as u16,
|
||||
)?;
|
||||
@@ -3160,7 +3179,17 @@ impl BTreeCursor {
|
||||
pages_to_balance_new[i].replace(page.clone());
|
||||
} else {
|
||||
// FIXME: handle page cache is full
|
||||
let page = self.allocate_page(page_type, 0)?;
|
||||
let mut page = self.allocate_page(page_type, 0)?;
|
||||
// FIXME: add new state machine state instead of this sync IO hack
|
||||
while matches!(page, IOResult::IO) {
|
||||
self.pager.io.run_once()?;
|
||||
page = self.allocate_page(page_type, 0)?;
|
||||
}
|
||||
let IOResult::Done(page) = page else {
|
||||
return Err(LimboError::InternalError(
|
||||
"Failed to allocate page".into(),
|
||||
));
|
||||
};
|
||||
pages_to_balance_new[i].replace(page);
|
||||
// Since this page didn't exist before, we can set it to cells length as it
|
||||
// marks them as empty since it is a prefix sum of cells.
|
||||
@@ -4030,7 +4059,7 @@ impl BTreeCursor {
|
||||
/// Balance the root page.
|
||||
/// This is done when the root page overflows, and we need to create a new root page.
|
||||
/// See e.g. https://en.wikipedia.org/wiki/B-tree
|
||||
fn balance_root(&mut self) -> Result<()> {
|
||||
fn balance_root(&mut self) -> Result<IOResult<()>> {
|
||||
/* todo: balance deeper, create child and copy contents of root there. Then split root */
|
||||
/* if we are in root page then we just need to create a new root and push key there */
|
||||
|
||||
@@ -4045,9 +4074,19 @@ impl BTreeCursor {
|
||||
let root = root_btree.get();
|
||||
let root_contents = root.get_contents();
|
||||
// FIXME: handle page cache is full
|
||||
let child_btree =
|
||||
self.pager
|
||||
.do_allocate_page(root_contents.page_type(), 0, BtreePageAllocMode::Any)?;
|
||||
// FIXME: remove sync IO hack
|
||||
let child_btree = loop {
|
||||
match self.pager.do_allocate_page(
|
||||
root_contents.page_type(),
|
||||
0,
|
||||
BtreePageAllocMode::Any,
|
||||
)? {
|
||||
IOResult::IO => {
|
||||
self.pager.io.run_once()?;
|
||||
}
|
||||
IOResult::Done(page) => break page,
|
||||
}
|
||||
};
|
||||
|
||||
tracing::debug!(
|
||||
"balance_root(root={}, rightmost={}, page_type={:?})",
|
||||
@@ -4108,7 +4147,7 @@ impl BTreeCursor {
|
||||
self.stack.push(root_btree.clone());
|
||||
self.stack.set_cell_index(0); // leave parent pointing at the rightmost pointer (in this case 0, as there are no cells), since we will be balancing the rightmost child page.
|
||||
self.stack.push(child_btree.clone());
|
||||
Ok(())
|
||||
Ok(IOResult::Done(()))
|
||||
}
|
||||
|
||||
fn usable_space(&self) -> usize {
|
||||
@@ -5157,21 +5196,38 @@ impl BTreeCursor {
|
||||
page_ref.get().get().id
|
||||
);
|
||||
match state {
|
||||
OverwriteCellState::FillPayload => {
|
||||
OverwriteCellState::AllocatePayload => {
|
||||
let serial_types_len = self.record_cursor.borrow_mut().len(record);
|
||||
let new_payload = Vec::with_capacity(serial_types_len);
|
||||
let rowid = return_if_io!(self.rowid());
|
||||
*state = OverwriteCellState::FillPayload {
|
||||
new_payload: Arc::new(Mutex::new(new_payload)),
|
||||
rowid,
|
||||
fill_cell_payload_state: FillCellPayloadState::Start,
|
||||
};
|
||||
continue;
|
||||
}
|
||||
OverwriteCellState::FillPayload {
|
||||
new_payload,
|
||||
rowid,
|
||||
fill_cell_payload_state,
|
||||
} => {
|
||||
let page = page_ref.get();
|
||||
let page_contents = page.get().contents.as_ref().unwrap();
|
||||
let serial_types_len = self.record_cursor.borrow_mut().len(record);
|
||||
let mut new_payload = Vec::with_capacity(serial_types_len);
|
||||
let rowid = return_if_io!(self.rowid());
|
||||
fill_cell_payload(
|
||||
page_contents,
|
||||
rowid,
|
||||
&mut new_payload,
|
||||
cell_idx,
|
||||
record,
|
||||
self.usable_space(),
|
||||
self.pager.clone(),
|
||||
);
|
||||
{
|
||||
let mut new_payload_mut = new_payload.lock();
|
||||
let new_payload_mut = &mut *new_payload_mut;
|
||||
return_if_io!(fill_cell_payload(
|
||||
page_contents,
|
||||
*rowid,
|
||||
new_payload_mut,
|
||||
cell_idx,
|
||||
record,
|
||||
self.usable_space(),
|
||||
self.pager.clone(),
|
||||
fill_cell_payload_state,
|
||||
));
|
||||
}
|
||||
// figure out old cell offset & size
|
||||
let (old_offset, old_local_size) = {
|
||||
let page_ref = page_ref.get();
|
||||
@@ -5180,7 +5236,7 @@ impl BTreeCursor {
|
||||
};
|
||||
|
||||
*state = OverwriteCellState::ClearOverflowPagesAndOverwrite {
|
||||
new_payload,
|
||||
new_payload: new_payload.clone(),
|
||||
old_offset,
|
||||
old_local_size,
|
||||
};
|
||||
@@ -5195,6 +5251,9 @@ impl BTreeCursor {
|
||||
let page_contents = page.get().contents.as_ref().unwrap();
|
||||
let cell = page_contents.cell_get(cell_idx, self.usable_space())?;
|
||||
return_if_io!(self.clear_overflow_pages(&cell));
|
||||
|
||||
let mut new_payload = new_payload.lock();
|
||||
let new_payload = &mut *new_payload;
|
||||
// if it all fits in local space and old_local_size is enough, do an in-place overwrite
|
||||
if new_payload.len() == *old_local_size {
|
||||
self.overwrite_content(page_ref.clone(), *old_offset, new_payload)?;
|
||||
@@ -5393,7 +5452,7 @@ impl BTreeCursor {
|
||||
btree_read_page(&self.pager, page_idx)
|
||||
}
|
||||
|
||||
pub fn allocate_page(&self, page_type: PageType, offset: usize) -> Result<BTreePage> {
|
||||
pub fn allocate_page(&self, page_type: PageType, offset: usize) -> Result<IOResult<BTreePage>> {
|
||||
self.pager
|
||||
.do_allocate_page(page_type, offset, BtreePageAllocMode::Any)
|
||||
}
|
||||
@@ -6701,8 +6760,26 @@ fn allocate_cell_space(page_ref: &PageContent, amount: u16, usable_space: u16) -
|
||||
Ok(top as u16)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum FillCellPayloadState {
|
||||
Start,
|
||||
AllocateOverflowPages {
|
||||
/// Arc because we clone [WriteState] for some reason and we use unsafe pointer dereferences in [FillCellPayloadState::AllocateOverflowPages]
|
||||
/// so the underlying bytes must not be cloned in upper layers.
|
||||
record_buf: Arc<[u8]>,
|
||||
space_left: usize,
|
||||
to_copy_buffer_ptr: *const u8,
|
||||
to_copy_buffer_len: usize,
|
||||
pointer: *mut u8,
|
||||
pointer_to_next: *mut u8,
|
||||
},
|
||||
}
|
||||
|
||||
/// Fill in the cell payload with the record.
|
||||
/// If the record is too large to fit in the cell, it will spill onto overflow pages.
|
||||
/// This function needs a separate [FillCellPayloadState] because allocating overflow pages
|
||||
/// may require I/O.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn fill_cell_payload(
|
||||
page_contents: &PageContent,
|
||||
int_key: Option<i64>,
|
||||
@@ -6711,87 +6788,147 @@ fn fill_cell_payload(
|
||||
record: &ImmutableRecord,
|
||||
usable_space: usize,
|
||||
pager: Rc<Pager>,
|
||||
) {
|
||||
// TODO: make record raw from start, having to serialize is not good
|
||||
let record_buf = record.get_payload().to_vec();
|
||||
|
||||
let page_type = page_contents.page_type();
|
||||
// fill in header
|
||||
if matches!(page_type, PageType::IndexInterior) {
|
||||
// if a write happened on an index interior page, it is always an overwrite.
|
||||
// we must copy the left child pointer of the replaced cell to the new cell.
|
||||
let left_child_page = page_contents.cell_interior_read_left_child_page(cell_idx);
|
||||
cell_payload.extend_from_slice(&left_child_page.to_be_bytes());
|
||||
}
|
||||
if matches!(page_type, PageType::TableLeaf) {
|
||||
let int_key = int_key.unwrap();
|
||||
write_varint_to_vec(record_buf.len() as u64, cell_payload);
|
||||
write_varint_to_vec(int_key as u64, cell_payload);
|
||||
} else {
|
||||
write_varint_to_vec(record_buf.len() as u64, cell_payload);
|
||||
}
|
||||
|
||||
let payload_overflow_threshold_max = payload_overflow_threshold_max(page_type, usable_space);
|
||||
tracing::debug!(
|
||||
"fill_cell_payload(record_size={}, payload_overflow_threshold_max={})",
|
||||
record_buf.len(),
|
||||
payload_overflow_threshold_max
|
||||
);
|
||||
if record_buf.len() <= payload_overflow_threshold_max {
|
||||
// enough allowed space to fit inside a btree page
|
||||
cell_payload.extend_from_slice(record_buf.as_slice());
|
||||
return;
|
||||
}
|
||||
|
||||
let payload_overflow_threshold_min = payload_overflow_threshold_min(page_type, usable_space);
|
||||
// see e.g. https://github.com/sqlite/sqlite/blob/9591d3fe93936533c8c3b0dc4d025ac999539e11/src/dbstat.c#L371
|
||||
let mut space_left = payload_overflow_threshold_min
|
||||
+ (record_buf.len() - payload_overflow_threshold_min) % (usable_space - 4);
|
||||
|
||||
if space_left > payload_overflow_threshold_max {
|
||||
space_left = payload_overflow_threshold_min;
|
||||
}
|
||||
|
||||
// cell_size must be equal to first value of space_left as this will be the bytes copied to non-overflow page.
|
||||
let cell_size = space_left + cell_payload.len() + 4; // 4 is the number of bytes of pointer to first overflow page
|
||||
let mut to_copy_buffer = record_buf.as_slice();
|
||||
|
||||
let prev_size = cell_payload.len();
|
||||
cell_payload.resize(prev_size + space_left + 4, 0);
|
||||
let mut pointer = unsafe { cell_payload.as_mut_ptr().add(prev_size) };
|
||||
let mut pointer_to_next = unsafe { cell_payload.as_mut_ptr().add(prev_size + space_left) };
|
||||
|
||||
state: &mut FillCellPayloadState,
|
||||
) -> Result<IOResult<()>> {
|
||||
loop {
|
||||
let to_copy = space_left.min(to_copy_buffer.len());
|
||||
unsafe { std::ptr::copy(to_copy_buffer.as_ptr(), pointer, to_copy) };
|
||||
match state {
|
||||
FillCellPayloadState::Start => {
|
||||
// TODO: make record raw from start, having to serialize is not good
|
||||
let record_buf: Arc<[u8]> = Arc::from(record.get_payload());
|
||||
|
||||
let left = to_copy_buffer.len() - to_copy;
|
||||
if left == 0 {
|
||||
break;
|
||||
let page_type = page_contents.page_type();
|
||||
// fill in header
|
||||
if matches!(page_type, PageType::IndexInterior) {
|
||||
// if a write happened on an index interior page, it is always an overwrite.
|
||||
// we must copy the left child pointer of the replaced cell to the new cell.
|
||||
let left_child_page =
|
||||
page_contents.cell_interior_read_left_child_page(cell_idx);
|
||||
cell_payload.extend_from_slice(&left_child_page.to_be_bytes());
|
||||
}
|
||||
if matches!(page_type, PageType::TableLeaf) {
|
||||
let int_key = int_key.unwrap();
|
||||
write_varint_to_vec(record_buf.len() as u64, cell_payload);
|
||||
write_varint_to_vec(int_key as u64, cell_payload);
|
||||
} else {
|
||||
write_varint_to_vec(record_buf.len() as u64, cell_payload);
|
||||
}
|
||||
|
||||
let payload_overflow_threshold_max =
|
||||
payload_overflow_threshold_max(page_type, usable_space);
|
||||
tracing::debug!(
|
||||
"fill_cell_payload(record_size={}, payload_overflow_threshold_max={})",
|
||||
record_buf.len(),
|
||||
payload_overflow_threshold_max
|
||||
);
|
||||
if record_buf.len() <= payload_overflow_threshold_max {
|
||||
// enough allowed space to fit inside a btree page
|
||||
cell_payload.extend_from_slice(record_buf.as_ref());
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
|
||||
let payload_overflow_threshold_min =
|
||||
payload_overflow_threshold_min(page_type, usable_space);
|
||||
// see e.g. https://github.com/sqlite/sqlite/blob/9591d3fe93936533c8c3b0dc4d025ac999539e11/src/dbstat.c#L371
|
||||
let mut space_left = payload_overflow_threshold_min
|
||||
+ (record_buf.len() - payload_overflow_threshold_min) % (usable_space - 4);
|
||||
|
||||
if space_left > payload_overflow_threshold_max {
|
||||
space_left = payload_overflow_threshold_min;
|
||||
}
|
||||
|
||||
// cell_size must be equal to first value of space_left as this will be the bytes copied to non-overflow page.
|
||||
let cell_size = space_left + cell_payload.len() + 4; // 4 is the number of bytes of pointer to first overflow page
|
||||
let to_copy_buffer = record_buf.as_ref();
|
||||
|
||||
let prev_size = cell_payload.len();
|
||||
cell_payload.resize(prev_size + space_left + 4, 0);
|
||||
assert_eq!(
|
||||
cell_size,
|
||||
cell_payload.len(),
|
||||
"cell_size={} != cell_payload.len()={}",
|
||||
cell_size,
|
||||
cell_payload.len()
|
||||
);
|
||||
|
||||
// SAFETY: this pointer is valid because it points to a buffer in an Arc<Mutex<Vec<u8>>> that lives at least as long as this function,
|
||||
// and the Vec will not be mutated in FillCellPayloadState::AllocateOverflowPages, which we will move to next.
|
||||
let pointer = unsafe { cell_payload.as_mut_ptr().add(prev_size) };
|
||||
let pointer_to_next =
|
||||
unsafe { cell_payload.as_mut_ptr().add(prev_size + space_left) };
|
||||
|
||||
let to_copy_buffer_ptr = to_copy_buffer.as_ptr();
|
||||
let to_copy_buffer_len = to_copy_buffer.len();
|
||||
|
||||
*state = FillCellPayloadState::AllocateOverflowPages {
|
||||
record_buf,
|
||||
space_left,
|
||||
to_copy_buffer_ptr,
|
||||
to_copy_buffer_len,
|
||||
pointer,
|
||||
pointer_to_next,
|
||||
};
|
||||
continue;
|
||||
}
|
||||
FillCellPayloadState::AllocateOverflowPages {
|
||||
record_buf: _record_buf,
|
||||
space_left,
|
||||
to_copy_buffer_ptr,
|
||||
to_copy_buffer_len,
|
||||
pointer,
|
||||
pointer_to_next,
|
||||
} => {
|
||||
let to_copy;
|
||||
{
|
||||
let to_copy_buffer_ptr = *to_copy_buffer_ptr;
|
||||
let to_copy_buffer_len = *to_copy_buffer_len;
|
||||
let pointer = *pointer;
|
||||
let space_left = *space_left;
|
||||
|
||||
// SAFETY: we know to_copy_buffer_ptr is valid because it refers to record_buf which lives at least as long as this function,
|
||||
// and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages.
|
||||
let to_copy_buffer = unsafe {
|
||||
std::slice::from_raw_parts(to_copy_buffer_ptr, to_copy_buffer_len)
|
||||
};
|
||||
to_copy = space_left.min(to_copy_buffer_len);
|
||||
// SAFETY: we know 'pointer' is valid because it refers to cell_payload which lives at least as long as this function,
|
||||
// and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages.
|
||||
unsafe { std::ptr::copy(to_copy_buffer_ptr, pointer, to_copy) };
|
||||
|
||||
let left = to_copy_buffer.len() - to_copy;
|
||||
if left == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// we still have bytes to add, we will need to allocate new overflow page
|
||||
// FIXME: handle page cache is full
|
||||
let overflow_page = return_if_io!(pager.allocate_overflow_page());
|
||||
turso_assert!(overflow_page.is_loaded(), "overflow page is not loaded");
|
||||
{
|
||||
let id = overflow_page.get().id as u32;
|
||||
let contents = overflow_page.get_contents();
|
||||
|
||||
// TODO: take into account offset here?
|
||||
let buf = contents.as_ptr();
|
||||
let as_bytes = id.to_be_bytes();
|
||||
// update pointer to new overflow page
|
||||
// SAFETY: we know 'pointer_to_next' is valid because it refers to an offset in cell_payload which is less than space_left + 4,
|
||||
// and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages.
|
||||
unsafe { std::ptr::copy(as_bytes.as_ptr(), *pointer_to_next, 4) };
|
||||
|
||||
*pointer = unsafe { buf.as_mut_ptr().add(4) };
|
||||
*pointer_to_next = buf.as_mut_ptr();
|
||||
*space_left = usable_space - 4;
|
||||
}
|
||||
|
||||
*to_copy_buffer_len -= to_copy;
|
||||
// SAFETY: we know 'to_copy_buffer_ptr' is valid because it refers to record_buf which lives at least as long as this function,
|
||||
// and that the offset is less than its length, and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages.
|
||||
*to_copy_buffer_ptr = unsafe { to_copy_buffer_ptr.add(to_copy) };
|
||||
}
|
||||
}
|
||||
|
||||
// we still have bytes to add, we will need to allocate new overflow page
|
||||
// FIXME: handle page cache is full
|
||||
let overflow_page = pager.allocate_overflow_page();
|
||||
{
|
||||
let id = overflow_page.get().id as u32;
|
||||
let contents = overflow_page.get().contents.as_mut().unwrap();
|
||||
|
||||
// TODO: take into account offset here?
|
||||
let buf = contents.as_ptr();
|
||||
let as_bytes = id.to_be_bytes();
|
||||
// update pointer to new overflow page
|
||||
unsafe { std::ptr::copy(as_bytes.as_ptr(), pointer_to_next, 4) };
|
||||
|
||||
pointer = unsafe { buf.as_mut_ptr().add(4) };
|
||||
pointer_to_next = buf.as_mut_ptr();
|
||||
space_left = usable_space - 4;
|
||||
}
|
||||
|
||||
to_copy_buffer = &to_copy_buffer[to_copy..];
|
||||
}
|
||||
|
||||
assert_eq!(cell_size, cell_payload.len());
|
||||
Ok(IOResult::Done(()))
|
||||
}
|
||||
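The local/overflow split computed in `fill_cell_payload` above uses the expression `min_local + (record_len - min_local) % (usable_space - 4)`, falling back to `min_local` when that exceeds the maximum local payload. A small worked example of that arithmetic; the concrete threshold numbers below are hypothetical values for a 4096-byte page, not taken from the code:

```
// Illustrative only: the thresholds are passed in as plain numbers here; in
// the real code they come from payload_overflow_threshold_min/max.
fn local_payload_size(record_len: usize, min_local: usize, max_local: usize, usable_space: usize) -> usize {
    if record_len <= max_local {
        return record_len; // everything fits on the b-tree page, no overflow chain
    }
    // Same expression as in the diff: chosen so the spilled remainder is an exact
    // multiple of the per-overflow-page capacity (usable_space - 4), unless that
    // would push the local part past max_local.
    let k = min_local + (record_len - min_local) % (usable_space - 4);
    if k > max_local { min_local } else { k }
}

fn main() {
    // Hypothetical thresholds for a 4096-byte page: min_local = 489, max_local = 4061.
    let (usable, min_local, max_local) = (4096usize, 489usize, 4061usize);
    let record_len = 131_080; // a ~128 KiB record, like the rows in the PR's benchmark
    let local = local_payload_size(record_len, min_local, max_local, usable);
    // Each overflow page stores usable - 4 payload bytes after its 4-byte next-page pointer.
    let overflow_pages = (record_len - local).div_ceil(usable - 4);
    println!("{local} bytes stay on the b-tree page, {overflow_pages} overflow pages");
}
```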
|
||||
/// Returns the maximum payload size (X) that can be stored directly on a b-tree page without spilling to overflow pages.
|
||||
@@ -6960,15 +7097,23 @@ mod tests {
|
||||
conn: &Arc<Connection>,
|
||||
) -> Vec<u8> {
|
||||
let mut payload: Vec<u8> = Vec::new();
|
||||
fill_cell_payload(
|
||||
page,
|
||||
Some(id as i64),
|
||||
&mut payload,
|
||||
pos,
|
||||
&record,
|
||||
4096,
|
||||
conn.pager.borrow().clone(),
|
||||
);
|
||||
let mut fill_cell_payload_state = FillCellPayloadState::Start;
|
||||
run_until_done(
|
||||
|| {
|
||||
fill_cell_payload(
|
||||
page,
|
||||
Some(id as i64),
|
||||
&mut payload,
|
||||
pos,
|
||||
&record,
|
||||
4096,
|
||||
conn.pager.borrow().clone(),
|
||||
&mut fill_cell_payload_state,
|
||||
)
|
||||
},
|
||||
&conn.pager.borrow().clone(),
|
||||
)
|
||||
.unwrap();
|
||||
insert_into_cell(page, &payload, pos, 4096).unwrap();
|
||||
payload
|
||||
}
|
||||
@@ -7209,7 +7354,7 @@ mod tests {
|
||||
|
||||
// FIXME: handle page cache is full
|
||||
let _ = run_until_done(|| pager.allocate_page1(), &pager);
|
||||
let page2 = pager.allocate_page().unwrap();
|
||||
let page2 = run_until_done(|| pager.allocate_page(), &pager).unwrap();
|
||||
let page2 = Arc::new(BTreePageInner {
|
||||
page: RefCell::new(page2),
|
||||
});
|
||||
@@ -8320,11 +8465,20 @@ mod tests {
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), 2, num_columns);
|
||||
|
||||
// Initialize page 2 as a root page (interior)
|
||||
let root_page = cursor.allocate_page(PageType::TableInterior, 0)?;
|
||||
let root_page = run_until_done(
|
||||
|| cursor.allocate_page(PageType::TableInterior, 0),
|
||||
&cursor.pager,
|
||||
)?;
|
||||
|
||||
// Allocate two leaf pages
|
||||
let page3 = cursor.allocate_page(PageType::TableLeaf, 0)?;
|
||||
let page4 = cursor.allocate_page(PageType::TableLeaf, 0)?;
|
||||
let page3 = run_until_done(
|
||||
|| cursor.allocate_page(PageType::TableLeaf, 0),
|
||||
&cursor.pager,
|
||||
)?;
|
||||
let page4 = run_until_done(
|
||||
|| cursor.allocate_page(PageType::TableLeaf, 0),
|
||||
&cursor.pager,
|
||||
)?;
|
||||
|
||||
// Configure the root page to point to the two leaf pages
|
||||
{
|
||||
@@ -8502,15 +8656,23 @@ mod tests {
|
||||
let regs = &[Register::Value(Value::Integer(i as i64))];
|
||||
let record = ImmutableRecord::from_registers(regs, regs.len());
|
||||
let mut payload: Vec<u8> = Vec::new();
|
||||
fill_cell_payload(
|
||||
page,
|
||||
Some(i as i64),
|
||||
&mut payload,
|
||||
cell_idx,
|
||||
&record,
|
||||
4096,
|
||||
conn.pager.borrow().clone(),
|
||||
);
|
||||
let mut fill_cell_payload_state = FillCellPayloadState::Start;
|
||||
run_until_done(
|
||||
|| {
|
||||
fill_cell_payload(
|
||||
page,
|
||||
Some(i as i64),
|
||||
&mut payload,
|
||||
cell_idx,
|
||||
&record,
|
||||
4096,
|
||||
conn.pager.borrow().clone(),
|
||||
&mut fill_cell_payload_state,
|
||||
)
|
||||
},
|
||||
&conn.pager.borrow().clone(),
|
||||
)
|
||||
.unwrap();
|
||||
if (free as usize) < payload.len() + 2 {
|
||||
// do not try to insert overflow pages because they require balancing
|
||||
continue;
|
||||
@@ -8576,15 +8738,23 @@ mod tests {
|
||||
let regs = &[Register::Value(Value::Integer(i))];
|
||||
let record = ImmutableRecord::from_registers(regs, regs.len());
|
||||
let mut payload: Vec<u8> = Vec::new();
|
||||
fill_cell_payload(
|
||||
page,
|
||||
Some(i),
|
||||
&mut payload,
|
||||
cell_idx,
|
||||
&record,
|
||||
4096,
|
||||
conn.pager.borrow().clone(),
|
||||
);
|
||||
let mut fill_cell_payload_state = FillCellPayloadState::Start;
|
||||
run_until_done(
|
||||
|| {
|
||||
fill_cell_payload(
|
||||
page,
|
||||
Some(i),
|
||||
&mut payload,
|
||||
cell_idx,
|
||||
&record,
|
||||
4096,
|
||||
conn.pager.borrow().clone(),
|
||||
&mut fill_cell_payload_state,
|
||||
)
|
||||
},
|
||||
&conn.pager.borrow().clone(),
|
||||
)
|
||||
.unwrap();
|
||||
if (free as usize) < payload.len() - 2 {
|
||||
// do not try to insert overflow pages because they require balancing
|
||||
continue;
|
||||
@@ -8941,15 +9111,23 @@ mod tests {
|
||||
let regs = &[Register::Value(Value::Integer(0))];
|
||||
let record = ImmutableRecord::from_registers(regs, regs.len());
|
||||
let mut payload: Vec<u8> = Vec::new();
|
||||
fill_cell_payload(
|
||||
page.get().get_contents(),
|
||||
Some(0),
|
||||
&mut payload,
|
||||
0,
|
||||
&record,
|
||||
4096,
|
||||
conn.pager.borrow().clone(),
|
||||
);
|
||||
let mut fill_cell_payload_state = FillCellPayloadState::Start;
|
||||
run_until_done(
|
||||
|| {
|
||||
fill_cell_payload(
|
||||
page.get().get_contents(),
|
||||
Some(0),
|
||||
&mut payload,
|
||||
0,
|
||||
&record,
|
||||
4096,
|
||||
conn.pager.borrow().clone(),
|
||||
&mut fill_cell_payload_state,
|
||||
)
|
||||
},
|
||||
&conn.pager.borrow().clone(),
|
||||
)
|
||||
.unwrap();
|
||||
let page = page.get();
|
||||
insert(0, page.get_contents());
|
||||
defragment(page.get_contents());
|
||||
@@ -9019,15 +9197,23 @@ mod tests {
|
||||
let regs = &[Register::Value(Value::Blob(vec![0; 3600]))];
|
||||
let record = ImmutableRecord::from_registers(regs, regs.len());
|
||||
let mut payload: Vec<u8> = Vec::new();
|
||||
fill_cell_payload(
|
||||
page.get().get_contents(),
|
||||
Some(0),
|
||||
&mut payload,
|
||||
0,
|
||||
&record,
|
||||
4096,
|
||||
conn.pager.borrow().clone(),
|
||||
);
|
||||
let mut fill_cell_payload_state = FillCellPayloadState::Start;
|
||||
run_until_done(
|
||||
|| {
|
||||
fill_cell_payload(
|
||||
page.get().get_contents(),
|
||||
Some(0),
|
||||
&mut payload,
|
||||
0,
|
||||
&record,
|
||||
4096,
|
||||
conn.pager.borrow().clone(),
|
||||
&mut fill_cell_payload_state,
|
||||
)
|
||||
},
|
||||
&conn.pager.borrow().clone(),
|
||||
)
|
||||
.unwrap();
|
||||
insert_into_cell(page.get().get_contents(), &payload, 0, 4096).unwrap();
|
||||
let free = compute_free_space(page.get().get_contents(), usable_space);
|
||||
let total_size = payload.len() + 2;
|
||||
@@ -9355,7 +9541,7 @@ mod tests {
|
||||
let mut cells_cloned = Vec::new();
|
||||
let (pager, _, _, _) = empty_btree();
|
||||
let page_type = PageType::TableLeaf;
|
||||
let page = pager.allocate_page().unwrap();
|
||||
let page = run_until_done(|| pager.allocate_page(), &pager).unwrap();
|
||||
let page = Arc::new(BTreePageInner {
|
||||
page: RefCell::new(page),
|
||||
});
|
||||
@@ -9427,15 +9613,23 @@ mod tests {
|
||||
let mut payload = Vec::new();
|
||||
let regs = &[Register::Value(Value::Blob(vec![0; size as usize]))];
|
||||
let record = ImmutableRecord::from_registers(regs, regs.len());
|
||||
fill_cell_payload(
|
||||
contents,
|
||||
Some(cell_idx as i64),
|
||||
&mut payload,
|
||||
cell_idx as usize,
|
||||
&record,
|
||||
pager.usable_space(),
|
||||
pager.clone(),
|
||||
);
|
||||
let mut fill_cell_payload_state = FillCellPayloadState::Start;
|
||||
run_until_done(
|
||||
|| {
|
||||
fill_cell_payload(
|
||||
contents,
|
||||
Some(cell_idx as i64),
|
||||
&mut payload,
|
||||
cell_idx as usize,
|
||||
&record,
|
||||
pager.usable_space(),
|
||||
pager.clone(),
|
||||
&mut fill_cell_payload_state,
|
||||
)
|
||||
},
|
||||
&pager,
|
||||
)
|
||||
.unwrap();
|
||||
insert_into_cell(
|
||||
contents,
|
||||
&payload,
|
||||
|
||||
@@ -221,6 +221,10 @@ enum CheckpointState {
|
||||
}
|
||||
|
||||
/// The mode of allocating a btree page.
|
||||
/// SQLite defines the following:
|
||||
/// #define BTALLOC_ANY 0 /* Allocate any page */
|
||||
/// #define BTALLOC_EXACT 1 /* Allocate exact page if possible */
|
||||
/// #define BTALLOC_LE 2 /* Allocate any page <= the parameter */
|
||||
pub enum BtreePageAllocMode {
|
||||
/// Allocate any btree page
|
||||
Any,
|
||||
@@ -335,6 +339,9 @@ pub struct Pager {
|
||||
pub db_state: Arc<AtomicDbState>,
|
||||
/// Mutex for synchronizing database initialization to prevent race conditions
|
||||
init_lock: Arc<Mutex<()>>,
|
||||
/// The state of the current allocate page operation.
|
||||
allocate_page_state: RefCell<AllocatePageState>,
|
||||
/// The state of the current allocate page1 operation.
|
||||
allocate_page1_state: RefCell<AllocatePage1State>,
|
||||
/// Cache page_size and reserved_space at Pager init and reuse for subsequent
|
||||
/// `usable_space` calls. TODO: Invalidate reserved_space when we add the functionality
|
||||
@@ -355,6 +362,29 @@ pub enum PagerCommitResult {
|
||||
Rollback,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum AllocatePageState {
|
||||
Start,
|
||||
/// Search the trunk page for an available free list leaf.
|
||||
/// If none are found, there are two options:
|
||||
/// - If there are no more trunk pages, the freelist is empty, so allocate a new page.
|
||||
/// - If there are more trunk pages, use the current first trunk page as the new allocation,
|
||||
/// and set the next trunk page as the database's "first freelist trunk page".
|
||||
SearchAvailableFreeListLeaf {
|
||||
trunk_page: PageRef,
|
||||
current_db_size: u32,
|
||||
},
|
||||
/// If a freelist leaf is found, reuse it for the page allocation and remove it from the trunk page.
|
||||
ReuseFreelistLeaf {
|
||||
trunk_page: PageRef,
|
||||
number_of_freelist_leaves: u32,
|
||||
},
|
||||
/// If a suitable freelist leaf is not found, allocate an entirely new page.
|
||||
AllocateNewPage {
|
||||
current_db_size: u32,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
enum AllocatePage1State {
|
||||
Start,
|
||||
@@ -421,6 +451,7 @@ impl Pager {
|
||||
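The `AllocatePageState` machine above walks the freelist structure described in the SQLite file format linked from the PR description: a chain of trunk pages, each holding a pointer to the next trunk, a leaf count, and an array of leaf page numbers. A minimal sketch of that trunk-page layout, using the same offsets as the `FREELIST_TRUNK_OFFSET_*` constants later in this diff; this is standalone illustrative code, not turso's API:

```
// Read a big-endian u32 at a byte offset, as the trunk page stores its fields.
fn read_u32(buf: &[u8], offset: usize) -> u32 {
    u32::from_be_bytes(buf[offset..offset + 4].try_into().unwrap())
}

/// A freelist trunk page starts with a 4-byte pointer to the next trunk page
/// (0 if none), a 4-byte count of leaf pointers, then that many 4-byte leaf
/// page numbers.
fn parse_trunk(buf: &[u8]) -> (u32, Vec<u32>) {
    let next_trunk = read_u32(buf, 0);
    let leaf_count = read_u32(buf, 4) as usize;
    let leaves = (0..leaf_count).map(|i| read_u32(buf, 8 + 4 * i)).collect();
    (next_trunk, leaves)
}

fn main() {
    // Fake 4096-byte trunk page: no next trunk, two leaf pages (7 and 9).
    let mut page = vec![0u8; 4096];
    page[4..8].copy_from_slice(&2u32.to_be_bytes());
    page[8..12].copy_from_slice(&7u32.to_be_bytes());
    page[12..16].copy_from_slice(&9u32.to_be_bytes());

    let (next_trunk, leaves) = parse_trunk(&page);
    assert_eq!(next_trunk, 0);
    assert_eq!(leaves, vec![7, 9]);
    // allocate_page() reuses a leaf if any are listed; if the trunk has no leaves
    // but points to another trunk, the trunk page itself is recycled.
    println!("next_trunk={next_trunk}, reusable leaves={leaves:?}");
}
```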
dirty_pages: Vec::new(),
|
||||
}),
|
||||
free_page_state: RefCell::new(FreePageState::Start),
|
||||
allocate_page_state: RefCell::new(AllocatePageState::Start),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -612,9 +643,8 @@ impl Pager {
|
||||
};
|
||||
#[cfg(feature = "omit_autovacuum")]
|
||||
{
|
||||
let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?;
|
||||
let page_id = page.get().get().id;
|
||||
Ok(IOResult::Done(page_id as u32))
|
||||
let page = return_if_io!(self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any));
|
||||
Ok(IOResult::Done(page.get().get().id as u32))
|
||||
}
|
||||
|
||||
// If autovacuum is enabled, we need to allocate a new page number that is greater than the largest root page number
|
||||
@@ -623,9 +653,9 @@ impl Pager {
|
||||
let auto_vacuum_mode = self.auto_vacuum_mode.borrow();
|
||||
match *auto_vacuum_mode {
|
||||
AutoVacuumMode::None => {
|
||||
let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?;
|
||||
let page_id = page.get().get().id;
|
||||
Ok(IOResult::Done(page_id as u32))
|
||||
let page =
|
||||
return_if_io!(self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any));
|
||||
Ok(IOResult::Done(page.get().get().id as u32))
|
||||
}
|
||||
AutoVacuumMode::Full => {
|
||||
let mut root_page_num =
|
||||
@@ -648,11 +678,11 @@ impl Pager {
|
||||
assert!(root_page_num >= 3); // the very first root page is page 3
|
||||
|
||||
// root_page_num here is the desired root page
|
||||
let page = self.do_allocate_page(
|
||||
let page = return_if_io!(self.do_allocate_page(
|
||||
page_type,
|
||||
0,
|
||||
BtreePageAllocMode::Exact(root_page_num),
|
||||
)?;
|
||||
));
|
||||
let allocated_page_id = page.get().get().id as u32;
|
||||
if allocated_page_id != root_page_num {
|
||||
// TODO(Zaid): Handle swapping the allocated page with the desired root page
|
||||
@@ -676,8 +706,8 @@ impl Pager {
|
||||
/// Allocate a new overflow page.
|
||||
/// This is done when a cell overflows and new space is needed.
|
||||
// FIXME: handle no room in page cache
|
||||
pub fn allocate_overflow_page(&self) -> PageRef {
|
||||
let page = self.allocate_page().unwrap();
|
||||
pub fn allocate_overflow_page(&self) -> Result<IOResult<PageRef>> {
|
||||
let page = return_if_io!(self.allocate_page());
|
||||
tracing::debug!("Pager::allocate_overflow_page(id={})", page.get().id);
|
||||
|
||||
// setup overflow page
|
||||
@@ -685,7 +715,7 @@ impl Pager {
|
||||
let buf = contents.as_ptr();
|
||||
buf.fill(0);
|
||||
|
||||
page
|
||||
Ok(IOResult::Done(page))
|
||||
}
|
||||
|
||||
/// Allocate a new page to the btree via the pager.
|
||||
@@ -696,8 +726,8 @@ impl Pager {
|
||||
page_type: PageType,
|
||||
offset: usize,
|
||||
_alloc_mode: BtreePageAllocMode,
|
||||
) -> Result<BTreePage> {
|
||||
let page = self.allocate_page()?;
|
||||
) -> Result<IOResult<BTreePage>> {
|
||||
let page = return_if_io!(self.allocate_page());
|
||||
let page = Arc::new(BTreePageInner {
|
||||
page: RefCell::new(page),
|
||||
});
|
||||
@@ -707,7 +737,7 @@ impl Pager {
|
||||
page.get().get().id,
|
||||
page.get().get_contents().page_type()
|
||||
);
|
||||
Ok(page)
|
||||
Ok(IOResult::Done(page))
|
||||
}
|
||||
|
||||
/// The "usable size" of a database page is the page size specified by the 2-byte integer at offset 16
|
||||
@@ -1412,7 +1442,7 @@ impl Pager {
|
||||
if let Some(size) = self.page_size.get() {
|
||||
default_header.update_page_size(size);
|
||||
}
|
||||
let page = allocate_page(1, &self.buffer_pool, 0);
|
||||
let page = allocate_new_page(1, &self.buffer_pool, 0);
|
||||
|
||||
let contents = page.get_contents();
|
||||
contents.write_database_header(&default_header);
|
||||
@@ -1470,63 +1500,239 @@ impl Pager {
|
||||
)
|
||||
}
|
||||
|
||||
/*
|
||||
Gets a new page that increasing the size of the page or uses a free page.
|
||||
Currently free list pages are not yet supported.
|
||||
*/
|
||||
// FIXME: handle no room in page cache
|
||||
/// Tries to reuse a page from the freelist if available.
|
||||
/// If not, allocates a new page which increases the database size.
|
||||
///
|
||||
/// FIXME: implement sqlite's 'nearby' parameter and use AllocMode.
|
||||
/// SQLite's allocate_page() equivalent has a parameter 'nearby' which is a hint about the page number we want to have for the allocated page.
|
||||
/// We should use this parameter to allocate the page in the same way as SQLite does; instead now we just either take the first available freelist page
|
||||
/// or allocate a new page.
|
||||
/// FIXME: handle no room in page cache
|
||||
#[allow(clippy::readonly_write_lock)]
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn allocate_page(&self) -> Result<PageRef> {
|
||||
let old_db_size = header_accessor::get_database_size(self)?;
|
||||
#[allow(unused_mut)]
|
||||
let mut new_db_size = old_db_size + 1;
|
||||
pub fn allocate_page(&self) -> Result<IOResult<PageRef>> {
|
||||
const FREELIST_TRUNK_OFFSET_NEXT_TRUNK: usize = 0;
|
||||
const FREELIST_TRUNK_OFFSET_LEAF_COUNT: usize = 4;
|
||||
const FREELIST_TRUNK_OFFSET_FIRST_LEAF: usize = 8;
|
||||
|
||||
tracing::debug!("allocate_page(database_size={})", new_db_size);
|
||||
loop {
|
||||
let mut state = self.allocate_page_state.borrow_mut();
|
||||
tracing::debug!("allocate_page(state={:?})", state);
|
||||
match &mut *state {
|
||||
AllocatePageState::Start => {
|
||||
let old_db_size = header_accessor::get_database_size(self)?;
|
||||
#[cfg(not(feature = "omit_autovacuum"))]
|
||||
let mut new_db_size = old_db_size;
|
||||
#[cfg(feature = "omit_autovacuum")]
|
||||
let new_db_size = old_db_size;
|
||||
|
||||
#[cfg(not(feature = "omit_autovacuum"))]
|
||||
{
|
||||
// If the following conditions are met, allocate a pointer map page, add to cache and increment the database size
|
||||
// - autovacuum is enabled
|
||||
// - the last page is a pointer map page
|
||||
if matches!(*self.auto_vacuum_mode.borrow(), AutoVacuumMode::Full)
|
||||
&& is_ptrmap_page(new_db_size, header_accessor::get_page_size(self)? as usize)
|
||||
{
|
||||
let page = allocate_page(new_db_size as usize, &self.buffer_pool, 0);
|
||||
self.add_dirty(&page);
|
||||
tracing::debug!("allocate_page(database_size={})", new_db_size);
|
||||
#[cfg(not(feature = "omit_autovacuum"))]
|
||||
{
|
||||
// If the following conditions are met, allocate a pointer map page, add to cache and increment the database size
|
||||
// - autovacuum is enabled
|
||||
// - the last page is a pointer map page
|
||||
if matches!(*self.auto_vacuum_mode.borrow(), AutoVacuumMode::Full)
|
||||
&& is_ptrmap_page(
|
||||
new_db_size + 1,
|
||||
header_accessor::get_page_size(self)? as usize,
|
||||
)
|
||||
{
|
||||
// we will allocate a ptrmap page, so increment size
|
||||
new_db_size += 1;
|
||||
let page =
|
||||
allocate_new_page(new_db_size as usize, &self.buffer_pool, 0);
|
||||
self.add_dirty(&page);
|
||||
let page_key = PageCacheKey::new(page.get().id);
|
||||
let mut cache = self.page_cache.write();
|
||||
match cache.insert(page_key, page.clone()) {
|
||||
Ok(_) => (),
|
||||
Err(CacheError::Full) => return Err(LimboError::CacheFull),
|
||||
Err(_) => {
|
||||
return Err(LimboError::InternalError(
|
||||
"Unknown error inserting page to cache".into(),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let page_key = PageCacheKey::new(page.get().id);
|
||||
let mut cache = self.page_cache.write();
|
||||
match cache.insert(page_key, page.clone()) {
|
||||
Ok(_) => (),
|
||||
Err(CacheError::Full) => return Err(LimboError::CacheFull),
|
||||
Err(_) => {
|
||||
return Err(LimboError::InternalError(
|
||||
"Unknown error inserting page to cache".into(),
|
||||
))
|
||||
let first_freelist_trunk_page_id =
|
||||
header_accessor::get_freelist_trunk_page(self)?;
|
||||
if first_freelist_trunk_page_id == 0 {
|
||||
*state = AllocatePageState::AllocateNewPage {
|
||||
current_db_size: new_db_size,
|
||||
};
|
||||
continue;
|
||||
}
|
||||
let trunk_page = self.read_page(first_freelist_trunk_page_id as usize)?;
|
||||
*state = AllocatePageState::SearchAvailableFreeListLeaf {
|
||||
trunk_page,
|
||||
current_db_size: new_db_size,
|
||||
};
|
||||
continue;
|
||||
}
|
||||
AllocatePageState::SearchAvailableFreeListLeaf {
|
||||
trunk_page,
|
||||
current_db_size,
|
||||
} => {
|
||||
if trunk_page.is_locked() {
|
||||
return Ok(IOResult::IO);
|
||||
}
|
||||
turso_assert!(
|
||||
trunk_page.is_loaded(),
|
||||
"Freelist trunk page {} is not loaded",
|
||||
trunk_page.get().id
|
||||
);
|
||||
let page_contents = trunk_page.get().contents.as_ref().unwrap();
|
||||
let next_trunk_page_id =
|
||||
page_contents.read_u32(FREELIST_TRUNK_OFFSET_NEXT_TRUNK);
|
||||
let number_of_freelist_leaves =
|
||||
page_contents.read_u32(FREELIST_TRUNK_OFFSET_LEAF_COUNT);
|
||||
|
||||
// There are leaf pointers on this trunk page, so we can reuse one of the pages
|
||||
// for the allocation.
|
||||
if number_of_freelist_leaves != 0 {
|
||||
*state = AllocatePageState::ReuseFreelistLeaf {
|
||||
trunk_page: trunk_page.clone(),
|
||||
number_of_freelist_leaves,
|
||||
};
|
||||
continue;
|
||||
}
|
||||
|
||||
// No freelist leaves on this trunk page.
|
||||
// If the freelist is completely empty, allocate a new page.
|
||||
if next_trunk_page_id == 0 {
|
||||
*state = AllocatePageState::AllocateNewPage {
|
||||
current_db_size: *current_db_size,
|
||||
};
|
||||
continue;
|
||||
}
|
||||
|
||||
// Freelist is not empty, so we can reuse the trunk itself as a new page
|
||||
// and update the database's first freelist trunk page to the next trunk page.
|
||||
header_accessor::set_freelist_trunk_page(self, next_trunk_page_id)?;
|
||||
header_accessor::set_freelist_pages(
|
||||
self,
|
||||
header_accessor::get_freelist_pages(self)? - 1,
|
||||
)?;
|
||||
self.add_dirty(trunk_page);
|
||||
// zero out the page
|
||||
turso_assert!(
|
||||
trunk_page.get_contents().overflow_cells.is_empty(),
|
||||
"Freelist leaf page {} has overflow cells",
|
||||
trunk_page.get().id
|
||||
);
|
||||
trunk_page.get().contents.as_ref().unwrap().as_ptr().fill(0);
|
||||
let page_key = PageCacheKey::new(trunk_page.get().id);
|
||||
{
|
||||
let mut page_cache = self.page_cache.write();
|
||||
turso_assert!(
|
||||
page_cache.contains_key(&page_key),
|
||||
"page {} is not in cache",
|
||||
trunk_page.get().id
|
||||
);
|
||||
}
|
||||
let trunk_page = trunk_page.clone();
|
||||
*state = AllocatePageState::Start;
|
||||
return Ok(IOResult::Done(trunk_page));
|
||||
}
|
||||
AllocatePageState::ReuseFreelistLeaf {
|
||||
trunk_page,
|
||||
number_of_freelist_leaves,
|
||||
} => {
|
||||
turso_assert!(
|
||||
trunk_page.is_loaded(),
|
||||
"Freelist trunk page {} is not loaded",
|
||||
trunk_page.get().id
|
||||
);
|
||||
turso_assert!(
|
||||
*number_of_freelist_leaves > 0,
|
||||
"Freelist trunk page {} has no leaves",
|
||||
trunk_page.get().id
|
||||
);
|
||||
let page_contents = trunk_page.get().contents.as_ref().unwrap();
|
||||
let next_leaf_page_id =
|
||||
page_contents.read_u32(FREELIST_TRUNK_OFFSET_FIRST_LEAF);
|
||||
let leaf_page = self.read_page(next_leaf_page_id as usize)?;
|
||||
if leaf_page.is_locked() {
|
||||
return Ok(IOResult::IO);
|
||||
}
|
||||
self.add_dirty(&leaf_page);
|
||||
// zero out the page
|
||||
turso_assert!(
|
||||
leaf_page.get_contents().overflow_cells.is_empty(),
|
||||
"Freelist leaf page {} has overflow cells",
|
||||
leaf_page.get().id
|
||||
);
|
||||
leaf_page.get().contents.as_ref().unwrap().as_ptr().fill(0);
|
||||
let page_key = PageCacheKey::new(leaf_page.get().id);
|
||||
{
|
||||
let mut page_cache = self.page_cache.write();
|
||||
turso_assert!(
|
||||
page_cache.contains_key(&page_key),
|
||||
"page {} is not in cache",
|
||||
leaf_page.get().id
|
||||
);
|
||||
}
|
||||
|
||||
// Shift left all the other leaf pages in the trunk page and subtract 1 from the leaf count
|
||||
let remaining_leaves_count = (*number_of_freelist_leaves - 1) as usize;
|
||||
{
|
||||
let buf = page_contents.as_ptr();
|
||||
// use copy within the same page
|
||||
const LEAF_PTR_SIZE_BYTES: usize = 4;
|
||||
let offset_remaining_leaves_start =
|
||||
FREELIST_TRUNK_OFFSET_FIRST_LEAF + LEAF_PTR_SIZE_BYTES;
|
||||
let offset_remaining_leaves_end = offset_remaining_leaves_start
|
||||
+ remaining_leaves_count * LEAF_PTR_SIZE_BYTES;
|
||||
buf.copy_within(
|
||||
offset_remaining_leaves_start..offset_remaining_leaves_end,
|
||||
FREELIST_TRUNK_OFFSET_FIRST_LEAF,
|
||||
);
|
||||
}
|
||||
// write the new leaf count
|
||||
page_contents.write_u32(
|
||||
FREELIST_TRUNK_OFFSET_LEAF_COUNT,
|
||||
remaining_leaves_count as u32,
|
||||
);
|
||||
self.add_dirty(trunk_page);
|
||||
|
||||
header_accessor::set_freelist_pages(
|
||||
self,
|
||||
header_accessor::get_freelist_pages(self)? - 1,
|
||||
)?;
|
||||
|
||||
*state = AllocatePageState::Start;
|
||||
return Ok(IOResult::Done(leaf_page));
|
||||
}
|
||||
AllocatePageState::AllocateNewPage { current_db_size } => {
|
||||
let new_db_size = *current_db_size + 1;
|
||||
// FIXME: should reserve page cache entry before modifying the database
|
||||
let page = allocate_new_page(new_db_size as usize, &self.buffer_pool, 0);
|
||||
{
|
||||
// setup page and add to cache
|
||||
self.add_dirty(&page);
|
||||
|
||||
let page_key = PageCacheKey::new(page.get().id);
|
||||
{
|
||||
// Run in separate block to avoid deadlock on page cache write lock
|
||||
let mut cache = self.page_cache.write();
|
||||
match cache.insert(page_key, page.clone()) {
|
||||
Err(CacheError::Full) => return Err(LimboError::CacheFull),
|
||||
Err(_) => {
|
||||
return Err(LimboError::InternalError(
|
||||
"Unknown error inserting page to cache".into(),
|
||||
))
|
||||
}
|
||||
Ok(_) => {}
|
||||
};
|
||||
}
|
||||
header_accessor::set_database_size(self, new_db_size)?;
|
||||
*state = AllocatePageState::Start;
|
||||
return Ok(IOResult::Done(page));
|
||||
}
|
||||
}
|
||||
// we allocated a ptrmap page, so the next data page will be at new_db_size + 1
|
||||
new_db_size += 1;
|
||||
}
|
||||
}
|
||||
|
||||
header_accessor::set_database_size(self, new_db_size)?;
|
||||
|
||||
// FIXME: should reserve page cache entry before modifying the database
|
||||
let page = allocate_page(new_db_size as usize, &self.buffer_pool, 0);
|
||||
{
|
||||
// setup page and add to cache
|
||||
self.add_dirty(&page);
|
||||
|
||||
let page_key = PageCacheKey::new(page.get().id);
|
||||
let mut cache = self.page_cache.write();
|
||||
match cache.insert(page_key, page.clone()) {
|
||||
Err(CacheError::Full) => Err(LimboError::CacheFull),
|
||||
Err(_) => Err(LimboError::InternalError(
|
||||
"Unknown error inserting page to cache".into(),
|
||||
)),
|
||||
Ok(_) => Ok(page),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1594,10 +1800,11 @@ impl Pager {
|
||||
in_flight_writes: Rc::new(RefCell::new(0)),
|
||||
dirty_pages: Vec::new(),
|
||||
});
|
||||
self.allocate_page_state.replace(AllocatePageState::Start);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn allocate_page(page_id: usize, buffer_pool: &Arc<BufferPool>, offset: usize) -> PageRef {
|
||||
pub fn allocate_new_page(page_id: usize, buffer_pool: &Arc<BufferPool>, offset: usize) -> PageRef {
|
||||
let page = Arc::new(Page::new(page_id));
|
||||
{
|
||||
let buffer = buffer_pool.get();
|
||||
@@ -1913,11 +2120,14 @@ mod ptrmap_tests {
|
||||
pager.set_auto_vacuum_mode(AutoVacuumMode::Full);
|
||||
|
||||
// Allocate all the pages as btree root pages
|
||||
for _ in 0..initial_db_pages {
|
||||
match pager.btree_create(&CreateBTreeFlags::new_table()) {
|
||||
Ok(IOResult::Done(_root_page_id)) => (),
|
||||
Ok(IOResult::IO) => {
|
||||
panic!("test_pager_setup: btree_create returned IOResult::IO unexpectedly");
|
||||
const EXPECTED_FIRST_ROOT_PAGE_ID: u32 = 3; // page1 = 1, first ptrmap page = 2, root page = 3
|
||||
for i in 0..initial_db_pages {
|
||||
match run_until_done(
|
||||
|| pager.btree_create(&CreateBTreeFlags::new_table()),
|
||||
&pager,
|
||||
) {
|
||||
Ok(root_page_id) => {
|
||||
assert_eq!(root_page_id, EXPECTED_FIRST_ROOT_PAGE_ID + i);
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("test_pager_setup: btree_create failed: {e:?}");
|
||||
|
||||