Merge 'Change more function signatures to return Completions' from Pedro Muniz

Closes #2330
This commit is contained in:
Pekka Enberg
2025-07-30 10:42:45 +03:00
7 changed files with 128 additions and 98 deletions

View File

@@ -53,8 +53,11 @@ impl IO for MemoryIO {
Ok(())
}
fn wait_for_completion(&self, _c: Completion) -> Result<()> {
todo!();
fn wait_for_completion(&self, c: Completion) -> Result<()> {
while !c.is_completed() {
self.run_once()?;
}
Ok(())
}
fn generate_random_number(&self) -> i64 {

View File

@@ -43,8 +43,11 @@ impl IO for VfsMod {
Ok(())
}
fn wait_for_completion(&self, _c: Completion) -> Result<()> {
todo!();
fn wait_for_completion(&self, c: Completion) -> Result<()> {
while !c.is_completed() {
self.run_once()?;
}
Ok(())
}
fn generate_random_number(&self) -> i64 {

View File

@@ -18,7 +18,7 @@ use crate::{
find_compare, get_tie_breaker_from_seek_op, IndexInfo, ParseRecordState, RecordCompare,
RecordCursor, SeekResult,
},
MvCursor,
Completion, MvCursor,
};
use crate::{
@@ -137,7 +137,7 @@ macro_rules! return_if_locked_maybe_load {
return Ok(IOResult::IO);
}
if !$btree_page.get().is_loaded() {
let page = $pager.read_page($btree_page.get().get().id)?;
let (page, c) = $pager.read_page($btree_page.get().get().id)?;
$btree_page.page.replace(page);
return Ok(IOResult::IO);
}
@@ -684,7 +684,7 @@ impl BTreeCursor {
let mv_cursor = mv_cursor.borrow();
return Ok(IOResult::Done(mv_cursor.is_empty()));
}
let page = self.pager.read_page(self.root_page)?;
let (page, c) = self.pager.read_page(self.root_page)?;
return_if_locked!(page);
let cell_count = page.get().contents.as_ref().unwrap().cell_count();
@@ -712,8 +712,8 @@ impl BTreeCursor {
if let Some(rightmost_pointer) = rightmost_pointer {
let past_rightmost_pointer = cell_count as i32 + 1;
self.stack.set_cell_index(past_rightmost_pointer);
self.stack
.push_backwards(self.read_page(rightmost_pointer as usize)?);
let (page, c) = self.read_page(rightmost_pointer as usize)?;
self.stack.push_backwards(page);
continue;
}
}
@@ -761,7 +761,7 @@ impl BTreeCursor {
BTreeCell::TableInteriorCell(TableInteriorCell {
left_child_page, ..
}) => {
let mem_page = self.read_page(left_child_page as usize)?;
let (mem_page, c) = self.read_page(left_child_page as usize)?;
self.stack.push_backwards(mem_page);
continue;
}
@@ -779,7 +779,7 @@ impl BTreeCursor {
// this parent: key 666
// left child has: key 663, key 664, key 665
// we need to move to the previous parent (with e.g. key 662) when iterating backwards.
let mem_page = self.read_page(left_child_page as usize)?;
let (mem_page, c) = self.read_page(left_child_page as usize)?;
self.stack.retreat();
self.stack.push_backwards(mem_page);
continue;
@@ -808,7 +808,7 @@ impl BTreeCursor {
payload_size: u64,
) -> Result<IOResult<()>> {
if self.read_overflow_state.borrow().is_none() {
let page = self.read_page(start_next_page as usize)?;
let (page, c) = self.read_page(start_next_page as usize)?;
*self.read_overflow_state.borrow_mut() = Some(ReadPayloadOverflow {
payload: payload.to_vec(),
next_page: start_next_page,
@@ -840,10 +840,13 @@ impl BTreeCursor {
*remaining_to_read -= to_read;
if *remaining_to_read != 0 && next != 0 {
let new_page = self.pager.read_page(next as usize).map(|page| {
Arc::new(BTreePageInner {
page: RefCell::new(page),
})
let (new_page, c) = self.pager.read_page(next as usize).map(|(page, c)| {
(
Arc::new(BTreePageInner {
page: RefCell::new(page),
}),
c,
)
})?;
*page_btree = new_page;
*next_page = next;
@@ -1044,7 +1047,7 @@ impl BTreeCursor {
is_write,
}) => {
if *pages_left_to_skip == 0 {
let page = self.read_page(*next_page as usize)?;
let (page, c) = self.read_page(*next_page as usize)?;
return_if_locked_maybe_load!(self.pager, page);
self.state =
CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage {
@@ -1059,7 +1062,7 @@ impl BTreeCursor {
continue;
}
let page = self.read_page(*next_page as usize)?;
let (page, c) = self.read_page(*next_page as usize)?;
return_if_locked_maybe_load!(self.pager, page);
let page = page.get();
let contents = page.get_contents();
@@ -1154,7 +1157,8 @@ impl BTreeCursor {
// Load next page
*next_page = next;
*current_offset = 0; // Reset offset for new page
*page_btree = self.read_page(next as usize)?;
let (page, c) = self.read_page(next as usize)?;
*page_btree = page;
// Return IO to allow other operations
return Ok(IOResult::IO);
@@ -1265,7 +1269,7 @@ impl BTreeCursor {
(Some(right_most_pointer), false) => {
// do rightmost
self.stack.advance();
let mem_page = self.read_page(right_most_pointer as usize)?;
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(mem_page);
continue;
}
@@ -1297,7 +1301,7 @@ impl BTreeCursor {
BTreeCell::TableInteriorCell(TableInteriorCell {
left_child_page, ..
}) => {
let mem_page = self.read_page(*left_child_page as usize)?;
let (mem_page, c) = self.read_page(*left_child_page as usize)?;
self.stack.push(mem_page);
continue;
}
@@ -1311,7 +1315,7 @@ impl BTreeCursor {
self.going_upwards = false;
return Ok(IOResult::Done(true));
} else {
let mem_page = self.read_page(*left_child_page as usize)?;
let (mem_page, c) = self.read_page(*left_child_page as usize)?;
self.stack.push(mem_page);
continue;
}
@@ -1346,7 +1350,7 @@ impl BTreeCursor {
self.seek_state = CursorSeekState::Start;
self.going_upwards = false;
tracing::trace!(root_page = self.root_page);
let mem_page = self.read_page(self.root_page)?;
let (mem_page, c) = self.read_page(self.root_page)?;
self.stack.clear();
self.stack.push(mem_page);
Ok(())
@@ -1360,7 +1364,7 @@ impl BTreeCursor {
loop {
let mem_page = self.stack.top();
let page_idx = mem_page.get().get().id;
let page = self.read_page(page_idx)?;
let (page, c) = self.read_page(page_idx)?;
return_if_locked_maybe_load!(self.pager, page);
let page = page.get();
let contents = page.get().contents.as_ref().unwrap();
@@ -1375,7 +1379,7 @@ impl BTreeCursor {
match contents.rightmost_pointer() {
Some(right_most_pointer) => {
self.stack.set_cell_index(contents.cell_count() as i32 + 1);
let mem_page = self.read_page(right_most_pointer as usize)?;
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(mem_page);
continue;
}
@@ -1442,7 +1446,7 @@ impl BTreeCursor {
let left_child_page =
contents.cell_interior_read_left_child_page(nearest_matching_cell);
self.stack.set_cell_index(nearest_matching_cell as i32);
let mem_page = self.read_page(left_child_page as usize)?;
let (mem_page, c) = self.read_page(left_child_page as usize)?;
self.stack.push(mem_page);
self.seek_state = CursorSeekState::MovingBetweenPages {
eq_seen: Cell::new(eq_seen.get()),
@@ -1452,7 +1456,7 @@ impl BTreeCursor {
self.stack.set_cell_index(cell_count as i32 + 1);
match contents.rightmost_pointer() {
Some(right_most_pointer) => {
let mem_page = self.read_page(right_most_pointer as usize)?;
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(mem_page);
self.seek_state = CursorSeekState::MovingBetweenPages {
eq_seen: Cell::new(eq_seen.get()),
@@ -1583,7 +1587,7 @@ impl BTreeCursor {
self.stack.set_cell_index(contents.cell_count() as i32 + 1);
match contents.rightmost_pointer() {
Some(right_most_pointer) => {
let mem_page = self.read_page(right_most_pointer as usize)?;
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(mem_page);
self.seek_state = CursorSeekState::MovingBetweenPages {
eq_seen: Cell::new(eq_seen.get()),
@@ -1623,7 +1627,7 @@ impl BTreeCursor {
page.get().id
);
let mem_page = self.read_page(*left_child_page as usize)?;
let (mem_page, c) = self.read_page(*left_child_page as usize)?;
self.stack.push(mem_page);
self.seek_state = CursorSeekState::MovingBetweenPages {
eq_seen: Cell::new(eq_seen.get()),
@@ -2611,7 +2615,7 @@ impl BTreeCursor {
let mut pgno: u32 = unsafe { right_pointer.cast::<u32>().read().swap_bytes() };
let current_sibling = sibling_pointer;
for i in (0..=current_sibling).rev() {
let page = self.read_page(pgno as usize)?;
let (page, c) = self.read_page(pgno as usize)?;
{
// mark as dirty
let sibling_page = page.get();
@@ -4160,7 +4164,7 @@ impl BTreeCursor {
loop {
let mem_page = self.stack.top();
let page_id = mem_page.get().get().id;
let page = self.read_page(page_id)?;
let (page, c) = self.read_page(page_id)?;
return_if_locked_maybe_load!(self.pager, page);
let page = page.get();
@@ -4174,7 +4178,7 @@ impl BTreeCursor {
match contents.rightmost_pointer() {
Some(right_most_pointer) => {
self.stack.set_cell_index(contents.cell_count() as i32 + 1); // invalid on interior
let child = self.read_page(right_most_pointer as usize)?;
let (child, c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(child);
}
None => unreachable!("interior page must have rightmost pointer"),
@@ -4964,7 +4968,7 @@ impl BTreeCursor {
self.overflow_state = None;
return Err(LimboError::Corrupt("Invalid overflow page number".into()));
}
let page = self.read_page(next_page as usize)?;
let (page, c) = self.read_page(next_page as usize)?;
return_if_locked_maybe_load!(self.pager, page);
let page = page.get();
@@ -5060,7 +5064,7 @@ impl BTreeCursor {
// Non-leaf page which has processed all children but not it's potential right child
(false, n) if n == contents.cell_count() as i32 => {
if let Some(rightmost) = contents.rightmost_pointer() {
let rightmost_page = self.read_page(rightmost as usize)?;
let (rightmost_page, c) = self.read_page(rightmost as usize)?;
self.stack.push(rightmost_page);
let destroy_info = self.state.mut_destroy_info().expect(
"unable to get a mut reference to destroy state in cursor",
@@ -5117,7 +5121,7 @@ impl BTreeCursor {
BTreeCell::IndexInteriorCell(cell) => cell.left_child_page,
_ => panic!("expected interior cell"),
};
let child_page = self.read_page(child_page_id as usize)?;
let (child_page, c) = self.read_page(child_page_id as usize)?;
self.stack.push(child_page);
let destroy_info = self.state.mut_destroy_info().expect(
"unable to get a mut reference to destroy state in cursor",
@@ -5133,7 +5137,7 @@ impl BTreeCursor {
IOResult::Done(_) => match cell {
// For an index interior cell, clear the left child page now that overflow pages have been cleared
BTreeCell::IndexInteriorCell(index_int_cell) => {
let child_page =
let (child_page, c) =
self.read_page(index_int_cell.left_child_page as usize)?;
self.stack.push(child_page);
let destroy_info = self.state.mut_destroy_info().expect(
@@ -5378,7 +5382,7 @@ impl BTreeCursor {
// should be safe as contents is not a leaf page
let right_most_pointer = contents.rightmost_pointer().unwrap();
self.stack.advance();
let mem_page = self.read_page(right_most_pointer as usize)?;
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(mem_page);
} else {
// Move to child left page
@@ -5392,7 +5396,7 @@ impl BTreeCursor {
left_child_page, ..
}) => {
self.stack.advance();
let mem_page = self.read_page(left_child_page as usize)?;
let (mem_page, c) = self.read_page(left_child_page as usize)?;
self.stack.push(mem_page);
}
_ => unreachable!(),
@@ -5449,7 +5453,7 @@ impl BTreeCursor {
}
}
pub fn read_page(&self, page_idx: usize) -> Result<BTreePage> {
pub fn read_page(&self, page_idx: usize) -> Result<(BTreePage, Completion)> {
btree_read_page(&self.pager, page_idx)
}
@@ -5572,7 +5576,7 @@ pub fn integrity_check(
else {
return Ok(IOResult::Done(()));
};
let page = btree_read_page(pager, page_idx)?;
let (page, c) = btree_read_page(pager, page_idx)?;
return_if_locked_maybe_load!(pager, page);
state.page_stack.pop();
@@ -5719,11 +5723,14 @@ pub fn integrity_check(
Ok(IOResult::Done(()))
}
pub fn btree_read_page(pager: &Rc<Pager>, page_idx: usize) -> Result<BTreePage> {
pager.read_page(page_idx).map(|page| {
Arc::new(BTreePageInner {
page: RefCell::new(page),
})
pub fn btree_read_page(pager: &Rc<Pager>, page_idx: usize) -> Result<(BTreePage, Completion)> {
pager.read_page(page_idx).map(|(page, c)| {
(
Arc::new(BTreePageInner {
page: RefCell::new(page),
}),
c,
)
})
}
@@ -7181,7 +7188,7 @@ mod tests {
fn validate_btree(pager: Rc<Pager>, page_idx: usize) -> (usize, bool) {
let num_columns = 5;
let cursor = BTreeCursor::new_table(None, pager.clone(), page_idx, num_columns);
let page = cursor.read_page(page_idx).unwrap();
let (page, c) = cursor.read_page(page_idx).unwrap();
while page.get().is_locked() {
pager.io.run_once().unwrap();
}
@@ -7201,7 +7208,7 @@ mod tests {
BTreeCell::TableInteriorCell(TableInteriorCell {
left_child_page, ..
}) => {
let child_page = cursor.read_page(left_child_page as usize).unwrap();
let (child_page, c) = cursor.read_page(left_child_page as usize).unwrap();
while child_page.get().is_locked() {
pager.io.run_once().unwrap();
}
@@ -7258,7 +7265,7 @@ mod tests {
}
let first_page_type = child_pages.first().map(|p| {
if !p.get().is_loaded() {
let new_page = pager.read_page(p.get().get().id).unwrap();
let (new_page, c) = pager.read_page(p.get().get().id).unwrap();
p.page.replace(new_page);
}
while p.get().is_locked() {
@@ -7269,7 +7276,7 @@ mod tests {
if let Some(child_type) = first_page_type {
for page in child_pages.iter().skip(1) {
if !page.get().is_loaded() {
let new_page = pager.read_page(page.get().get().id).unwrap();
let (new_page, c) = pager.read_page(page.get().get().id).unwrap();
page.page.replace(new_page);
}
while page.get().is_locked() {
@@ -7292,7 +7299,7 @@ mod tests {
let num_columns = 5;
let cursor = BTreeCursor::new_table(None, pager.clone(), page_idx, num_columns);
let page = cursor.read_page(page_idx).unwrap();
let (page, c) = cursor.read_page(page_idx).unwrap();
while page.get().is_locked() {
pager.io.run_once().unwrap();
}
@@ -8340,7 +8347,7 @@ mod tests {
.write_page(current_page as usize, buf.clone(), c)?;
pager.io.run_once()?;
let page = cursor.read_page(current_page as usize)?;
let (page, c) = cursor.read_page(current_page as usize)?;
while page.get().is_locked() {
cursor.pager.io.run_once()?;
}
@@ -8388,7 +8395,7 @@ mod tests {
let trunk_page_id = header_accessor::get_freelist_trunk_page(&pager)?;
if trunk_page_id > 0 {
// Verify trunk page structure
let trunk_page = cursor.read_page(trunk_page_id as usize)?;
let (trunk_page, c) = cursor.read_page(trunk_page_id as usize)?;
if let Some(contents) = trunk_page.get().get().contents.as_ref() {
// Read number of leaf pages in trunk
let n_leaf = contents.read_u32(4);

View File

@@ -41,7 +41,7 @@ fn get_header_page(pager: &Pager) -> Result<IOResult<PageRef>> {
"Database is empty, header does not exist - page 1 should've been allocated before this".to_string(),
));
}
let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?;
let (page, c) = pager.read_page(DATABASE_HEADER_PAGE_ID)?;
if page.is_locked() {
return Ok(IOResult::IO);
}
@@ -56,7 +56,7 @@ fn get_header_page_for_write(pager: &Pager) -> Result<IOResult<PageRef>> {
"Cannot write to header of an empty database - page 1 should've been allocated before this".to_string(),
));
}
let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?;
let (page, c) = pager.read_page(DATABASE_HEADER_PAGE_ID)?;
if page.is_locked() {
return Ok(IOResult::IO);
}

View File

@@ -493,7 +493,7 @@ impl Pager {
ptrmap_pg_no
);
let ptrmap_page = self.read_page(ptrmap_pg_no as usize)?;
let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as usize)?;
if ptrmap_page.is_locked() {
return Ok(IOResult::IO);
}
@@ -584,7 +584,7 @@ impl Pager {
offset_in_ptrmap_page
);
let ptrmap_page = self.read_page(ptrmap_pg_no as usize)?;
let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as usize)?;
if ptrmap_page.is_locked() {
return Ok(IOResult::IO);
}
@@ -846,24 +846,24 @@ impl Pager {
/// Reads a page from the database.
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn read_page(&self, page_idx: usize) -> Result<PageRef, LimboError> {
pub fn read_page(&self, page_idx: usize) -> Result<(PageRef, Completion)> {
tracing::trace!("read_page(page_idx = {})", page_idx);
let mut page_cache = self.page_cache.write();
let page_key = PageCacheKey::new(page_idx);
if let Some(page) = page_cache.get(&page_key) {
tracing::trace!("read_page(page_idx = {}) = cached", page_idx);
return Ok(page.clone());
// Dummy completion being passed, as we do not need to read from database or wal
return Ok((page.clone(), Completion::new_write(|_| {})));
}
let page = Arc::new(Page::new(page_idx));
page.set_locked();
if let Some(frame_id) = self.wal.borrow().find_frame(page_idx as u64)? {
self.wal
.borrow()
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
{
page.set_uptodate();
}
let c =
self.wal
.borrow()
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
page.set_uptodate();
// TODO(pere) should probably first insert to page cache, and if successful,
// read frame or page
match page_cache.insert(page_key, page.clone()) {
@@ -878,7 +878,7 @@ impl Pager {
)))
}
}
return Ok(page);
return Ok((page, c));
}
let c = sqlite3_ondisk::begin_read_page(
@@ -899,7 +899,7 @@ impl Pager {
)))
}
}
Ok(page)
Ok((page, c))
}
// Get a page from the cache, if it exists.
@@ -969,7 +969,7 @@ impl Pager {
page
};
self.wal.borrow_mut().append_frame(
let c = self.wal.borrow_mut().append_frame(
page.clone(),
0,
self.flush_info.borrow().in_flight_writes.clone(),
@@ -1081,7 +1081,7 @@ impl Pager {
0
}
};
self.wal.borrow_mut().append_frame(
let c = self.wal.borrow_mut().append_frame(
page.clone(),
db_size,
self.commit_info.borrow().in_flight_writes.clone(),
@@ -1148,7 +1148,7 @@ impl Pager {
self.commit_info.borrow_mut().state = CommitState::SyncDbFile;
}
CommitState::SyncDbFile => {
sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
let c = sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
self.commit_info.borrow_mut().state = CommitState::WaitSyncDbFile;
}
CommitState::WaitSyncDbFile => {
@@ -1229,7 +1229,7 @@ impl Pager {
};
}
CheckpointState::SyncDbFile => {
sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
let c = sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
self.checkpoint_state
.replace(CheckpointState::WaitSyncDbFile);
}
@@ -1334,7 +1334,7 @@ impl Pager {
)));
}
let page = match page.clone() {
let (page, c) = match page.clone() {
Some(page) => {
assert_eq!(
page.get().id,
@@ -1347,9 +1347,12 @@ impl Pager {
let page_contents = page.get_contents();
page_contents.overflow_cells.clear();
}
page
(page, None)
}
None => {
let (page, c) = self.read_page(page_id)?;
(page, Some(c))
}
None => self.read_page(page_id)?,
};
header_accessor::set_freelist_pages(
self,
@@ -1371,7 +1374,8 @@ impl Pager {
let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?;
if trunk_page.is_none() {
// Add as leaf to current trunk
trunk_page.replace(self.read_page(trunk_page_id as usize)?);
let (page, c) = self.read_page(trunk_page_id as usize)?;
trunk_page.replace(page);
}
let trunk_page = trunk_page.as_ref().unwrap();
if trunk_page.is_locked() || !trunk_page.is_loaded() {
@@ -1568,7 +1572,7 @@ impl Pager {
};
continue;
}
let trunk_page = self.read_page(first_freelist_trunk_page_id as usize)?;
let (trunk_page, c) = self.read_page(first_freelist_trunk_page_id as usize)?;
*state = AllocatePageState::SearchAvailableFreeListLeaf {
trunk_page,
current_db_size: new_db_size,
@@ -1657,7 +1661,7 @@ impl Pager {
let page_contents = trunk_page.get().contents.as_ref().unwrap();
let next_leaf_page_id =
page_contents.read_u32(FREELIST_TRUNK_OFFSET_FIRST_LEAF);
let leaf_page = self.read_page(next_leaf_page_id as usize)?;
let (leaf_page, c) = self.read_page(next_leaf_page_id as usize)?;
if leaf_page.is_locked() {
return Ok(IOResult::IO);
}

View File

@@ -853,15 +853,17 @@ pub fn begin_write_btree_page(
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn begin_sync(db_file: Arc<dyn DatabaseStorage>, syncing: Rc<RefCell<bool>>) -> Result<()> {
pub fn begin_sync(
db_file: Arc<dyn DatabaseStorage>,
syncing: Rc<RefCell<bool>>,
) -> Result<Completion> {
assert!(!*syncing.borrow());
*syncing.borrow_mut() = true;
let completion = Completion::new_sync(move |_| {
*syncing.borrow_mut() = false;
});
#[allow(clippy::arc_with_non_send_sync)]
let c = db_file.sync(completion)?;
Ok(())
db_file.sync(completion)
}
#[allow(clippy::enum_variant_names)]
@@ -1661,7 +1663,7 @@ pub fn prepare_wal_frame(
(final_checksum, Arc::new(RefCell::new(buffer)))
}
pub fn begin_write_wal_header(io: &Arc<dyn File>, header: &WalHeader) -> Result<()> {
pub fn begin_write_wal_header(io: &Arc<dyn File>, header: &WalHeader) -> Result<Completion> {
tracing::trace!("begin_write_wal_header");
let buffer = {
let drop_fn = Rc::new(|_buf| {});
@@ -1693,8 +1695,7 @@ pub fn begin_write_wal_header(io: &Arc<dyn File>, header: &WalHeader) -> Result<
};
#[allow(clippy::arc_with_non_send_sync)]
let c = Completion::new_write(write_complete);
let c = io.pwrite(0, buffer.clone(), c)?;
Ok(())
io.pwrite(0, buffer.clone(), c)
}
/// Checks if payload will overflow a cell based on the maximum allowed size.

View File

@@ -206,7 +206,12 @@ pub trait Wal {
fn find_frame(&self, page_id: u64) -> Result<Option<u64>>;
/// Read a frame from the WAL.
fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Arc<BufferPool>) -> Result<()>;
fn read_frame(
&self,
frame_id: u64,
page: PageRef,
buffer_pool: Arc<BufferPool>,
) -> Result<Completion>;
/// Read a raw frame (header included) from the WAL.
fn read_frame_raw(&self, frame_id: u64, frame: &mut [u8]) -> Result<Completion>;
@@ -232,7 +237,7 @@ pub trait Wal {
page: PageRef,
db_size: u32,
write_counter: Rc<RefCell<usize>>,
) -> Result<()>;
) -> Result<Completion>;
/// Complete append of frames by updating shared wal state. Before this
/// all changes were stored locally.
@@ -280,8 +285,9 @@ impl Wal for DummyWAL {
_frame_id: u64,
_page: crate::PageRef,
_buffer_pool: Arc<BufferPool>,
) -> Result<()> {
Ok(())
) -> Result<Completion> {
// Dummy completion
Ok(Completion::new_write(|_| {}))
}
fn read_frame_raw(&self, _frame_id: u64, _frame: &mut [u8]) -> Result<Completion> {
@@ -304,8 +310,8 @@ impl Wal for DummyWAL {
_page: crate::PageRef,
_db_size: u32,
_write_counter: Rc<RefCell<usize>>,
) -> Result<()> {
Ok(())
) -> Result<Completion> {
Ok(Completion::new_write(|_| {}))
}
fn should_checkpoint(&self) -> bool {
@@ -612,7 +618,12 @@ impl Wal for WalFile {
/// Read a frame from the WAL.
#[instrument(skip_all, level = Level::DEBUG)]
fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Arc<BufferPool>) -> Result<()> {
fn read_frame(
&self,
frame_id: u64,
page: PageRef,
buffer_pool: Arc<BufferPool>,
) -> Result<Completion> {
tracing::debug!("read_frame({})", frame_id);
let offset = self.frame_offset(frame_id);
page.set_locked();
@@ -626,13 +637,12 @@ impl Wal for WalFile {
let frame = frame.clone();
finish_read_page(page.get().id, buf, frame).unwrap();
});
let c = begin_read_wal_frame(
begin_read_wal_frame(
&self.get_shared().file,
offset + WAL_FRAME_HEADER_SIZE,
buffer_pool,
complete,
)?;
Ok(())
)
}
#[instrument(skip_all, level = Level::DEBUG)]
@@ -748,12 +758,12 @@ impl Wal for WalFile {
page: PageRef,
db_size: u32,
write_counter: Rc<RefCell<usize>>,
) -> Result<()> {
) -> Result<Completion> {
let page_id = page.get().id;
let frame_id = self.max_frame + 1;
let offset = self.frame_offset(frame_id);
tracing::debug!(frame_id, offset, page_id);
let checksums = {
let (c, checksums) = {
let shared = self.get_shared();
let header = shared.wal_header.clone();
let header = header.lock();
@@ -789,10 +799,10 @@ impl Wal for WalFile {
*write_counter.borrow_mut() -= 1;
return Err(err);
}
frame_checksums
(result.unwrap(), frame_checksums)
};
self.complete_append_frame(page_id as u64, frame_id, checksums);
Ok(())
Ok(c)
}
#[instrument(skip_all, level = Level::DEBUG)]
@@ -894,7 +904,7 @@ impl Wal for WalFile {
);
self.ongoing_checkpoint.page.get().id = page as usize;
self.read_frame(
let c = self.read_frame(
*frame,
self.ongoing_checkpoint.page.clone(),
self.buffer_pool.clone(),
@@ -1217,7 +1227,9 @@ impl WalFileShared {
);
wal_header.checksum_1 = checksums.0;
wal_header.checksum_2 = checksums.1;
sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?;
let c = sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?;
// TODO: for now wait for completion
io.wait_for_completion(c)?;
let header = Arc::new(SpinLock::new(wal_header));
let checksum = {
let checksum = header.lock();