diff --git a/core/io/memory.rs b/core/io/memory.rs index ac81b39bc..aed9531c1 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -53,8 +53,11 @@ impl IO for MemoryIO { Ok(()) } - fn wait_for_completion(&self, _c: Completion) -> Result<()> { - todo!(); + fn wait_for_completion(&self, c: Completion) -> Result<()> { + while !c.is_completed() { + self.run_once()?; + } + Ok(()) } fn generate_random_number(&self) -> i64 { diff --git a/core/io/vfs.rs b/core/io/vfs.rs index 545ce61a1..1f941e2c2 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -43,8 +43,11 @@ impl IO for VfsMod { Ok(()) } - fn wait_for_completion(&self, _c: Completion) -> Result<()> { - todo!(); + fn wait_for_completion(&self, c: Completion) -> Result<()> { + while !c.is_completed() { + self.run_once()?; + } + Ok(()) } fn generate_random_number(&self) -> i64 { diff --git a/core/storage/btree.rs b/core/storage/btree.rs index bf3277f61..741900ae8 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -18,7 +18,7 @@ use crate::{ find_compare, get_tie_breaker_from_seek_op, IndexInfo, ParseRecordState, RecordCompare, RecordCursor, SeekResult, }, - MvCursor, + Completion, MvCursor, }; use crate::{ @@ -137,7 +137,7 @@ macro_rules! return_if_locked_maybe_load { return Ok(IOResult::IO); } if !$btree_page.get().is_loaded() { - let page = $pager.read_page($btree_page.get().get().id)?; + let (page, c) = $pager.read_page($btree_page.get().get().id)?; $btree_page.page.replace(page); return Ok(IOResult::IO); } @@ -684,7 +684,7 @@ impl BTreeCursor { let mv_cursor = mv_cursor.borrow(); return Ok(IOResult::Done(mv_cursor.is_empty())); } - let page = self.pager.read_page(self.root_page)?; + let (page, c) = self.pager.read_page(self.root_page)?; return_if_locked!(page); let cell_count = page.get().contents.as_ref().unwrap().cell_count(); @@ -712,8 +712,8 @@ impl BTreeCursor { if let Some(rightmost_pointer) = rightmost_pointer { let past_rightmost_pointer = cell_count as i32 + 1; self.stack.set_cell_index(past_rightmost_pointer); - self.stack - .push_backwards(self.read_page(rightmost_pointer as usize)?); + let (page, c) = self.read_page(rightmost_pointer as usize)?; + self.stack.push_backwards(page); continue; } } @@ -761,7 +761,7 @@ impl BTreeCursor { BTreeCell::TableInteriorCell(TableInteriorCell { left_child_page, .. }) => { - let mem_page = self.read_page(left_child_page as usize)?; + let (mem_page, c) = self.read_page(left_child_page as usize)?; self.stack.push_backwards(mem_page); continue; } @@ -779,7 +779,7 @@ impl BTreeCursor { // this parent: key 666 // left child has: key 663, key 664, key 665 // we need to move to the previous parent (with e.g. key 662) when iterating backwards. - let mem_page = self.read_page(left_child_page as usize)?; + let (mem_page, c) = self.read_page(left_child_page as usize)?; self.stack.retreat(); self.stack.push_backwards(mem_page); continue; @@ -808,7 +808,7 @@ impl BTreeCursor { payload_size: u64, ) -> Result> { if self.read_overflow_state.borrow().is_none() { - let page = self.read_page(start_next_page as usize)?; + let (page, c) = self.read_page(start_next_page as usize)?; *self.read_overflow_state.borrow_mut() = Some(ReadPayloadOverflow { payload: payload.to_vec(), next_page: start_next_page, @@ -840,10 +840,13 @@ impl BTreeCursor { *remaining_to_read -= to_read; if *remaining_to_read != 0 && next != 0 { - let new_page = self.pager.read_page(next as usize).map(|page| { - Arc::new(BTreePageInner { - page: RefCell::new(page), - }) + let (new_page, c) = self.pager.read_page(next as usize).map(|(page, c)| { + ( + Arc::new(BTreePageInner { + page: RefCell::new(page), + }), + c, + ) })?; *page_btree = new_page; *next_page = next; @@ -1044,7 +1047,7 @@ impl BTreeCursor { is_write, }) => { if *pages_left_to_skip == 0 { - let page = self.read_page(*next_page as usize)?; + let (page, c) = self.read_page(*next_page as usize)?; return_if_locked_maybe_load!(self.pager, page); self.state = CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage { @@ -1059,7 +1062,7 @@ impl BTreeCursor { continue; } - let page = self.read_page(*next_page as usize)?; + let (page, c) = self.read_page(*next_page as usize)?; return_if_locked_maybe_load!(self.pager, page); let page = page.get(); let contents = page.get_contents(); @@ -1154,7 +1157,8 @@ impl BTreeCursor { // Load next page *next_page = next; *current_offset = 0; // Reset offset for new page - *page_btree = self.read_page(next as usize)?; + let (page, c) = self.read_page(next as usize)?; + *page_btree = page; // Return IO to allow other operations return Ok(IOResult::IO); @@ -1265,7 +1269,7 @@ impl BTreeCursor { (Some(right_most_pointer), false) => { // do rightmost self.stack.advance(); - let mem_page = self.read_page(right_most_pointer as usize)?; + let (mem_page, c) = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); continue; } @@ -1297,7 +1301,7 @@ impl BTreeCursor { BTreeCell::TableInteriorCell(TableInteriorCell { left_child_page, .. }) => { - let mem_page = self.read_page(*left_child_page as usize)?; + let (mem_page, c) = self.read_page(*left_child_page as usize)?; self.stack.push(mem_page); continue; } @@ -1311,7 +1315,7 @@ impl BTreeCursor { self.going_upwards = false; return Ok(IOResult::Done(true)); } else { - let mem_page = self.read_page(*left_child_page as usize)?; + let (mem_page, c) = self.read_page(*left_child_page as usize)?; self.stack.push(mem_page); continue; } @@ -1346,7 +1350,7 @@ impl BTreeCursor { self.seek_state = CursorSeekState::Start; self.going_upwards = false; tracing::trace!(root_page = self.root_page); - let mem_page = self.read_page(self.root_page)?; + let (mem_page, c) = self.read_page(self.root_page)?; self.stack.clear(); self.stack.push(mem_page); Ok(()) @@ -1360,7 +1364,7 @@ impl BTreeCursor { loop { let mem_page = self.stack.top(); let page_idx = mem_page.get().get().id; - let page = self.read_page(page_idx)?; + let (page, c) = self.read_page(page_idx)?; return_if_locked_maybe_load!(self.pager, page); let page = page.get(); let contents = page.get().contents.as_ref().unwrap(); @@ -1375,7 +1379,7 @@ impl BTreeCursor { match contents.rightmost_pointer() { Some(right_most_pointer) => { self.stack.set_cell_index(contents.cell_count() as i32 + 1); - let mem_page = self.read_page(right_most_pointer as usize)?; + let (mem_page, c) = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); continue; } @@ -1442,7 +1446,7 @@ impl BTreeCursor { let left_child_page = contents.cell_interior_read_left_child_page(nearest_matching_cell); self.stack.set_cell_index(nearest_matching_cell as i32); - let mem_page = self.read_page(left_child_page as usize)?; + let (mem_page, c) = self.read_page(left_child_page as usize)?; self.stack.push(mem_page); self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), @@ -1452,7 +1456,7 @@ impl BTreeCursor { self.stack.set_cell_index(cell_count as i32 + 1); match contents.rightmost_pointer() { Some(right_most_pointer) => { - let mem_page = self.read_page(right_most_pointer as usize)?; + let (mem_page, c) = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), @@ -1583,7 +1587,7 @@ impl BTreeCursor { self.stack.set_cell_index(contents.cell_count() as i32 + 1); match contents.rightmost_pointer() { Some(right_most_pointer) => { - let mem_page = self.read_page(right_most_pointer as usize)?; + let (mem_page, c) = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), @@ -1623,7 +1627,7 @@ impl BTreeCursor { page.get().id ); - let mem_page = self.read_page(*left_child_page as usize)?; + let (mem_page, c) = self.read_page(*left_child_page as usize)?; self.stack.push(mem_page); self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), @@ -2611,7 +2615,7 @@ impl BTreeCursor { let mut pgno: u32 = unsafe { right_pointer.cast::().read().swap_bytes() }; let current_sibling = sibling_pointer; for i in (0..=current_sibling).rev() { - let page = self.read_page(pgno as usize)?; + let (page, c) = self.read_page(pgno as usize)?; { // mark as dirty let sibling_page = page.get(); @@ -4160,7 +4164,7 @@ impl BTreeCursor { loop { let mem_page = self.stack.top(); let page_id = mem_page.get().get().id; - let page = self.read_page(page_id)?; + let (page, c) = self.read_page(page_id)?; return_if_locked_maybe_load!(self.pager, page); let page = page.get(); @@ -4174,7 +4178,7 @@ impl BTreeCursor { match contents.rightmost_pointer() { Some(right_most_pointer) => { self.stack.set_cell_index(contents.cell_count() as i32 + 1); // invalid on interior - let child = self.read_page(right_most_pointer as usize)?; + let (child, c) = self.read_page(right_most_pointer as usize)?; self.stack.push(child); } None => unreachable!("interior page must have rightmost pointer"), @@ -4964,7 +4968,7 @@ impl BTreeCursor { self.overflow_state = None; return Err(LimboError::Corrupt("Invalid overflow page number".into())); } - let page = self.read_page(next_page as usize)?; + let (page, c) = self.read_page(next_page as usize)?; return_if_locked_maybe_load!(self.pager, page); let page = page.get(); @@ -5060,7 +5064,7 @@ impl BTreeCursor { // Non-leaf page which has processed all children but not it's potential right child (false, n) if n == contents.cell_count() as i32 => { if let Some(rightmost) = contents.rightmost_pointer() { - let rightmost_page = self.read_page(rightmost as usize)?; + let (rightmost_page, c) = self.read_page(rightmost as usize)?; self.stack.push(rightmost_page); let destroy_info = self.state.mut_destroy_info().expect( "unable to get a mut reference to destroy state in cursor", @@ -5117,7 +5121,7 @@ impl BTreeCursor { BTreeCell::IndexInteriorCell(cell) => cell.left_child_page, _ => panic!("expected interior cell"), }; - let child_page = self.read_page(child_page_id as usize)?; + let (child_page, c) = self.read_page(child_page_id as usize)?; self.stack.push(child_page); let destroy_info = self.state.mut_destroy_info().expect( "unable to get a mut reference to destroy state in cursor", @@ -5133,7 +5137,7 @@ impl BTreeCursor { IOResult::Done(_) => match cell { // For an index interior cell, clear the left child page now that overflow pages have been cleared BTreeCell::IndexInteriorCell(index_int_cell) => { - let child_page = + let (child_page, c) = self.read_page(index_int_cell.left_child_page as usize)?; self.stack.push(child_page); let destroy_info = self.state.mut_destroy_info().expect( @@ -5378,7 +5382,7 @@ impl BTreeCursor { // should be safe as contents is not a leaf page let right_most_pointer = contents.rightmost_pointer().unwrap(); self.stack.advance(); - let mem_page = self.read_page(right_most_pointer as usize)?; + let (mem_page, c) = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); } else { // Move to child left page @@ -5392,7 +5396,7 @@ impl BTreeCursor { left_child_page, .. }) => { self.stack.advance(); - let mem_page = self.read_page(left_child_page as usize)?; + let (mem_page, c) = self.read_page(left_child_page as usize)?; self.stack.push(mem_page); } _ => unreachable!(), @@ -5449,7 +5453,7 @@ impl BTreeCursor { } } - pub fn read_page(&self, page_idx: usize) -> Result { + pub fn read_page(&self, page_idx: usize) -> Result<(BTreePage, Completion)> { btree_read_page(&self.pager, page_idx) } @@ -5572,7 +5576,7 @@ pub fn integrity_check( else { return Ok(IOResult::Done(())); }; - let page = btree_read_page(pager, page_idx)?; + let (page, c) = btree_read_page(pager, page_idx)?; return_if_locked_maybe_load!(pager, page); state.page_stack.pop(); @@ -5719,11 +5723,14 @@ pub fn integrity_check( Ok(IOResult::Done(())) } -pub fn btree_read_page(pager: &Rc, page_idx: usize) -> Result { - pager.read_page(page_idx).map(|page| { - Arc::new(BTreePageInner { - page: RefCell::new(page), - }) +pub fn btree_read_page(pager: &Rc, page_idx: usize) -> Result<(BTreePage, Completion)> { + pager.read_page(page_idx).map(|(page, c)| { + ( + Arc::new(BTreePageInner { + page: RefCell::new(page), + }), + c, + ) }) } @@ -7181,7 +7188,7 @@ mod tests { fn validate_btree(pager: Rc, page_idx: usize) -> (usize, bool) { let num_columns = 5; let cursor = BTreeCursor::new_table(None, pager.clone(), page_idx, num_columns); - let page = cursor.read_page(page_idx).unwrap(); + let (page, c) = cursor.read_page(page_idx).unwrap(); while page.get().is_locked() { pager.io.run_once().unwrap(); } @@ -7201,7 +7208,7 @@ mod tests { BTreeCell::TableInteriorCell(TableInteriorCell { left_child_page, .. }) => { - let child_page = cursor.read_page(left_child_page as usize).unwrap(); + let (child_page, c) = cursor.read_page(left_child_page as usize).unwrap(); while child_page.get().is_locked() { pager.io.run_once().unwrap(); } @@ -7258,7 +7265,7 @@ mod tests { } let first_page_type = child_pages.first().map(|p| { if !p.get().is_loaded() { - let new_page = pager.read_page(p.get().get().id).unwrap(); + let (new_page, c) = pager.read_page(p.get().get().id).unwrap(); p.page.replace(new_page); } while p.get().is_locked() { @@ -7269,7 +7276,7 @@ mod tests { if let Some(child_type) = first_page_type { for page in child_pages.iter().skip(1) { if !page.get().is_loaded() { - let new_page = pager.read_page(page.get().get().id).unwrap(); + let (new_page, c) = pager.read_page(page.get().get().id).unwrap(); page.page.replace(new_page); } while page.get().is_locked() { @@ -7292,7 +7299,7 @@ mod tests { let num_columns = 5; let cursor = BTreeCursor::new_table(None, pager.clone(), page_idx, num_columns); - let page = cursor.read_page(page_idx).unwrap(); + let (page, c) = cursor.read_page(page_idx).unwrap(); while page.get().is_locked() { pager.io.run_once().unwrap(); } @@ -8340,7 +8347,7 @@ mod tests { .write_page(current_page as usize, buf.clone(), c)?; pager.io.run_once()?; - let page = cursor.read_page(current_page as usize)?; + let (page, c) = cursor.read_page(current_page as usize)?; while page.get().is_locked() { cursor.pager.io.run_once()?; } @@ -8388,7 +8395,7 @@ mod tests { let trunk_page_id = header_accessor::get_freelist_trunk_page(&pager)?; if trunk_page_id > 0 { // Verify trunk page structure - let trunk_page = cursor.read_page(trunk_page_id as usize)?; + let (trunk_page, c) = cursor.read_page(trunk_page_id as usize)?; if let Some(contents) = trunk_page.get().get().contents.as_ref() { // Read number of leaf pages in trunk let n_leaf = contents.read_u32(4); diff --git a/core/storage/header_accessor.rs b/core/storage/header_accessor.rs index 4e0f75881..592d4f570 100644 --- a/core/storage/header_accessor.rs +++ b/core/storage/header_accessor.rs @@ -41,7 +41,7 @@ fn get_header_page(pager: &Pager) -> Result> { "Database is empty, header does not exist - page 1 should've been allocated before this".to_string(), )); } - let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?; + let (page, c) = pager.read_page(DATABASE_HEADER_PAGE_ID)?; if page.is_locked() { return Ok(IOResult::IO); } @@ -56,7 +56,7 @@ fn get_header_page_for_write(pager: &Pager) -> Result> { "Cannot write to header of an empty database - page 1 should've been allocated before this".to_string(), )); } - let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?; + let (page, c) = pager.read_page(DATABASE_HEADER_PAGE_ID)?; if page.is_locked() { return Ok(IOResult::IO); } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index f03b69252..3a73a96a1 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -493,7 +493,7 @@ impl Pager { ptrmap_pg_no ); - let ptrmap_page = self.read_page(ptrmap_pg_no as usize)?; + let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as usize)?; if ptrmap_page.is_locked() { return Ok(IOResult::IO); } @@ -584,7 +584,7 @@ impl Pager { offset_in_ptrmap_page ); - let ptrmap_page = self.read_page(ptrmap_pg_no as usize)?; + let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as usize)?; if ptrmap_page.is_locked() { return Ok(IOResult::IO); } @@ -846,24 +846,24 @@ impl Pager { /// Reads a page from the database. #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn read_page(&self, page_idx: usize) -> Result { + pub fn read_page(&self, page_idx: usize) -> Result<(PageRef, Completion)> { tracing::trace!("read_page(page_idx = {})", page_idx); let mut page_cache = self.page_cache.write(); let page_key = PageCacheKey::new(page_idx); if let Some(page) = page_cache.get(&page_key) { tracing::trace!("read_page(page_idx = {}) = cached", page_idx); - return Ok(page.clone()); + // Dummy completion being passed, as we do not need to read from database or wal + return Ok((page.clone(), Completion::new_write(|_| {}))); } let page = Arc::new(Page::new(page_idx)); page.set_locked(); if let Some(frame_id) = self.wal.borrow().find_frame(page_idx as u64)? { - self.wal - .borrow() - .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; - { - page.set_uptodate(); - } + let c = + self.wal + .borrow() + .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; + page.set_uptodate(); // TODO(pere) should probably first insert to page cache, and if successful, // read frame or page match page_cache.insert(page_key, page.clone()) { @@ -878,7 +878,7 @@ impl Pager { ))) } } - return Ok(page); + return Ok((page, c)); } let c = sqlite3_ondisk::begin_read_page( @@ -899,7 +899,7 @@ impl Pager { ))) } } - Ok(page) + Ok((page, c)) } // Get a page from the cache, if it exists. @@ -969,7 +969,7 @@ impl Pager { page }; - self.wal.borrow_mut().append_frame( + let c = self.wal.borrow_mut().append_frame( page.clone(), 0, self.flush_info.borrow().in_flight_writes.clone(), @@ -1081,7 +1081,7 @@ impl Pager { 0 } }; - self.wal.borrow_mut().append_frame( + let c = self.wal.borrow_mut().append_frame( page.clone(), db_size, self.commit_info.borrow().in_flight_writes.clone(), @@ -1148,7 +1148,7 @@ impl Pager { self.commit_info.borrow_mut().state = CommitState::SyncDbFile; } CommitState::SyncDbFile => { - sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?; + let c = sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?; self.commit_info.borrow_mut().state = CommitState::WaitSyncDbFile; } CommitState::WaitSyncDbFile => { @@ -1229,7 +1229,7 @@ impl Pager { }; } CheckpointState::SyncDbFile => { - sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?; + let c = sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?; self.checkpoint_state .replace(CheckpointState::WaitSyncDbFile); } @@ -1334,7 +1334,7 @@ impl Pager { ))); } - let page = match page.clone() { + let (page, c) = match page.clone() { Some(page) => { assert_eq!( page.get().id, @@ -1347,9 +1347,12 @@ impl Pager { let page_contents = page.get_contents(); page_contents.overflow_cells.clear(); } - page + (page, None) + } + None => { + let (page, c) = self.read_page(page_id)?; + (page, Some(c)) } - None => self.read_page(page_id)?, }; header_accessor::set_freelist_pages( self, @@ -1371,7 +1374,8 @@ impl Pager { let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?; if trunk_page.is_none() { // Add as leaf to current trunk - trunk_page.replace(self.read_page(trunk_page_id as usize)?); + let (page, c) = self.read_page(trunk_page_id as usize)?; + trunk_page.replace(page); } let trunk_page = trunk_page.as_ref().unwrap(); if trunk_page.is_locked() || !trunk_page.is_loaded() { @@ -1568,7 +1572,7 @@ impl Pager { }; continue; } - let trunk_page = self.read_page(first_freelist_trunk_page_id as usize)?; + let (trunk_page, c) = self.read_page(first_freelist_trunk_page_id as usize)?; *state = AllocatePageState::SearchAvailableFreeListLeaf { trunk_page, current_db_size: new_db_size, @@ -1657,7 +1661,7 @@ impl Pager { let page_contents = trunk_page.get().contents.as_ref().unwrap(); let next_leaf_page_id = page_contents.read_u32(FREELIST_TRUNK_OFFSET_FIRST_LEAF); - let leaf_page = self.read_page(next_leaf_page_id as usize)?; + let (leaf_page, c) = self.read_page(next_leaf_page_id as usize)?; if leaf_page.is_locked() { return Ok(IOResult::IO); } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 83f669a0d..46cb738b4 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -853,15 +853,17 @@ pub fn begin_write_btree_page( } #[instrument(skip_all, level = Level::DEBUG)] -pub fn begin_sync(db_file: Arc, syncing: Rc>) -> Result<()> { +pub fn begin_sync( + db_file: Arc, + syncing: Rc>, +) -> Result { assert!(!*syncing.borrow()); *syncing.borrow_mut() = true; let completion = Completion::new_sync(move |_| { *syncing.borrow_mut() = false; }); #[allow(clippy::arc_with_non_send_sync)] - let c = db_file.sync(completion)?; - Ok(()) + db_file.sync(completion) } #[allow(clippy::enum_variant_names)] @@ -1661,7 +1663,7 @@ pub fn prepare_wal_frame( (final_checksum, Arc::new(RefCell::new(buffer))) } -pub fn begin_write_wal_header(io: &Arc, header: &WalHeader) -> Result<()> { +pub fn begin_write_wal_header(io: &Arc, header: &WalHeader) -> Result { tracing::trace!("begin_write_wal_header"); let buffer = { let drop_fn = Rc::new(|_buf| {}); @@ -1693,8 +1695,7 @@ pub fn begin_write_wal_header(io: &Arc, header: &WalHeader) -> Result< }; #[allow(clippy::arc_with_non_send_sync)] let c = Completion::new_write(write_complete); - let c = io.pwrite(0, buffer.clone(), c)?; - Ok(()) + io.pwrite(0, buffer.clone(), c) } /// Checks if payload will overflow a cell based on the maximum allowed size. diff --git a/core/storage/wal.rs b/core/storage/wal.rs index af470eb1b..c166d8017 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -206,7 +206,12 @@ pub trait Wal { fn find_frame(&self, page_id: u64) -> Result>; /// Read a frame from the WAL. - fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Arc) -> Result<()>; + fn read_frame( + &self, + frame_id: u64, + page: PageRef, + buffer_pool: Arc, + ) -> Result; /// Read a raw frame (header included) from the WAL. fn read_frame_raw(&self, frame_id: u64, frame: &mut [u8]) -> Result; @@ -232,7 +237,7 @@ pub trait Wal { page: PageRef, db_size: u32, write_counter: Rc>, - ) -> Result<()>; + ) -> Result; /// Complete append of frames by updating shared wal state. Before this /// all changes were stored locally. @@ -280,8 +285,9 @@ impl Wal for DummyWAL { _frame_id: u64, _page: crate::PageRef, _buffer_pool: Arc, - ) -> Result<()> { - Ok(()) + ) -> Result { + // Dummy completion + Ok(Completion::new_write(|_| {})) } fn read_frame_raw(&self, _frame_id: u64, _frame: &mut [u8]) -> Result { @@ -304,8 +310,8 @@ impl Wal for DummyWAL { _page: crate::PageRef, _db_size: u32, _write_counter: Rc>, - ) -> Result<()> { - Ok(()) + ) -> Result { + Ok(Completion::new_write(|_| {})) } fn should_checkpoint(&self) -> bool { @@ -612,7 +618,12 @@ impl Wal for WalFile { /// Read a frame from the WAL. #[instrument(skip_all, level = Level::DEBUG)] - fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Arc) -> Result<()> { + fn read_frame( + &self, + frame_id: u64, + page: PageRef, + buffer_pool: Arc, + ) -> Result { tracing::debug!("read_frame({})", frame_id); let offset = self.frame_offset(frame_id); page.set_locked(); @@ -626,13 +637,12 @@ impl Wal for WalFile { let frame = frame.clone(); finish_read_page(page.get().id, buf, frame).unwrap(); }); - let c = begin_read_wal_frame( + begin_read_wal_frame( &self.get_shared().file, offset + WAL_FRAME_HEADER_SIZE, buffer_pool, complete, - )?; - Ok(()) + ) } #[instrument(skip_all, level = Level::DEBUG)] @@ -748,12 +758,12 @@ impl Wal for WalFile { page: PageRef, db_size: u32, write_counter: Rc>, - ) -> Result<()> { + ) -> Result { let page_id = page.get().id; let frame_id = self.max_frame + 1; let offset = self.frame_offset(frame_id); tracing::debug!(frame_id, offset, page_id); - let checksums = { + let (c, checksums) = { let shared = self.get_shared(); let header = shared.wal_header.clone(); let header = header.lock(); @@ -789,10 +799,10 @@ impl Wal for WalFile { *write_counter.borrow_mut() -= 1; return Err(err); } - frame_checksums + (result.unwrap(), frame_checksums) }; self.complete_append_frame(page_id as u64, frame_id, checksums); - Ok(()) + Ok(c) } #[instrument(skip_all, level = Level::DEBUG)] @@ -894,7 +904,7 @@ impl Wal for WalFile { ); self.ongoing_checkpoint.page.get().id = page as usize; - self.read_frame( + let c = self.read_frame( *frame, self.ongoing_checkpoint.page.clone(), self.buffer_pool.clone(), @@ -1217,7 +1227,9 @@ impl WalFileShared { ); wal_header.checksum_1 = checksums.0; wal_header.checksum_2 = checksums.1; - sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?; + let c = sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?; + // TODO: for now wait for completion + io.wait_for_completion(c)?; let header = Arc::new(SpinLock::new(wal_header)); let checksum = { let checksum = header.lock();