fix checkpoint

This commit is contained in:
Pere Diaz Bou
2024-12-15 18:55:23 +00:00
parent de3449be5a
commit 37005a23d2
3 changed files with 96 additions and 52 deletions

View File

@@ -128,6 +128,8 @@ enum FlushState {
#[derive(Clone, Debug)]
enum CheckpointState {
Checkpoint,
SyncDbFile,
WaitSyncDbFile,
CheckpointDone,
}
@@ -314,7 +316,6 @@ impl Pager {
self.wal.borrow_mut().append_frame(
page.clone(),
db_size,
self,
self.flush_info.borrow().in_flight_writes.clone(),
)?;
}
@@ -380,11 +381,23 @@ impl Pager {
match self.wal.borrow_mut().checkpoint(self, in_flight)? {
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
CheckpointStatus::Done => {
self.checkpoint_state
.replace(CheckpointState::CheckpointDone);
self.checkpoint_state.replace(CheckpointState::SyncDbFile);
}
};
}
CheckpointState::SyncDbFile => {
sqlite3_ondisk::begin_sync(self.page_io.clone(), self.syncing.clone())?;
self.checkpoint_state
.replace(CheckpointState::WaitSyncDbFile);
}
CheckpointState::WaitSyncDbFile => {
if *self.syncing.borrow() {
return Ok(CheckpointStatus::IO);
} else {
self.checkpoint_state
.replace(CheckpointState::CheckpointDone);
}
}
CheckpointState::CheckpointDone => {
let in_flight = self.checkpoint_inflight.clone();
if *in_flight.borrow() > 0 {
@@ -406,7 +419,9 @@ impl Pager {
.borrow_mut()
.checkpoint(self, Rc::new(RefCell::new(0)))
{
Ok(CheckpointStatus::IO) => {}
Ok(CheckpointStatus::IO) => {
self.io.run_once();
}
Ok(CheckpointStatus::Done) => {
break;
}

View File

@@ -43,7 +43,6 @@ pub trait Wal {
&mut self,
page: PageRef,
db_size: u32,
pager: &Pager,
write_counter: Rc<RefCell<usize>>,
) -> Result<()>;
@@ -58,6 +57,12 @@ pub trait Wal {
fn get_min_frame(&self) -> u64;
}
#[derive(Copy, Clone)]
enum SyncState {
Start,
Wait,
}
struct OngoingCheckpoint {
page: PageRef,
state: CheckpointState,
@@ -70,6 +75,7 @@ pub struct WalFile {
io: Arc<dyn crate::io::IO>,
buffer_pool: Rc<BufferPool>,
sync_state: RefCell<SyncState>,
syncing: Rc<RefCell<bool>>,
page_size: usize,
@@ -100,6 +106,7 @@ pub enum CheckpointState {
ReadFrame,
WaitReadFrame,
WritePage,
WaitWritePage,
Done,
}
@@ -143,6 +150,7 @@ impl Wal for WalFile {
debug!("read_frame({})", frame_id);
let offset = self.frame_offset(frame_id);
let shared = self.shared.read().unwrap();
page.set_locked();
begin_read_wal_frame(
&shared.file,
offset + WAL_FRAME_HEADER_SIZE,
@@ -157,7 +165,6 @@ impl Wal for WalFile {
&mut self,
page: PageRef,
db_size: u32,
_pager: &Pager,
write_counter: Rc<RefCell<usize>>,
) -> Result<()> {
let page_id = page.get().id;
@@ -231,38 +238,40 @@ impl Wal for WalFile {
}
CheckpointState::ReadFrame => {
let shared = self.shared.read().unwrap();
for page in shared
.pages_in_frames
.iter()
.skip(self.ongoing_checkpoint.current_page as usize)
if self.ongoing_checkpoint.current_page as usize >= shared.pages_in_frames.len()
{
let frames = shared
.frame_cache
.get(page)
.expect("page must be in frame cache if it's in list");
for frame in frames.iter().rev() {
if *frame <= self.ongoing_checkpoint.max_frame {
log::debug!(
"checkpoint page(state={:?}, page={}, frame={})",
state,
*page,
*frame
);
self.ongoing_checkpoint.page.get().id = *page as usize;
self.read_frame(
*frame,
self.ongoing_checkpoint.page.clone(),
self.buffer_pool.clone(),
)?;
self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame;
self.ongoing_checkpoint.current_page += 1;
continue 'checkpoint_loop;
}
}
self.ongoing_checkpoint.current_page += 1;
self.ongoing_checkpoint.state = CheckpointState::Done;
continue 'checkpoint_loop;
}
self.ongoing_checkpoint.state = CheckpointState::Done;
let page =
shared.pages_in_frames[self.ongoing_checkpoint.current_page as usize];
let frames = shared
.frame_cache
.get(&page)
.expect("page must be in frame cache if it's in list");
for frame in frames.iter().rev() {
// TODO: do proper selection of frames to checkpoint
if *frame >= self.ongoing_checkpoint.min_frame {
log::debug!(
"checkpoint page(state={:?}, page={}, frame={})",
state,
page,
*frame
);
self.ongoing_checkpoint.page.get().id = page as usize;
self.read_frame(
*frame,
self.ongoing_checkpoint.page.clone(),
self.buffer_pool.clone(),
)?;
self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame;
self.ongoing_checkpoint.current_page += 1;
continue 'checkpoint_loop;
}
}
self.ongoing_checkpoint.current_page += 1;
}
CheckpointState::WaitReadFrame => {
if self.ongoing_checkpoint.page.is_locked() {
@@ -272,11 +281,18 @@ impl Wal for WalFile {
}
}
CheckpointState::WritePage => {
self.ongoing_checkpoint.page.set_dirty();
begin_write_btree_page(
pager,
&self.ongoing_checkpoint.page,
write_counter.clone(),
)?;
self.ongoing_checkpoint.state = CheckpointState::WaitWritePage;
}
CheckpointState::WaitWritePage => {
if *write_counter.borrow() > 0 {
return Ok(CheckpointStatus::IO);
}
let shared = self.shared.read().unwrap();
if (self.ongoing_checkpoint.current_page as usize)
< shared.pages_in_frames.len()
@@ -303,21 +319,33 @@ impl Wal for WalFile {
}
fn sync(&mut self) -> Result<CheckpointStatus> {
let shared = self.shared.write().unwrap();
{
let syncing = self.syncing.clone();
let completion = Completion::Sync(SyncCompletion {
complete: Box::new(move |_| {
*syncing.borrow_mut() = false;
}),
});
shared.file.sync(Rc::new(completion))?;
}
if *self.syncing.borrow() {
Ok(CheckpointStatus::IO)
} else {
Ok(CheckpointStatus::Done)
let state = *self.sync_state.borrow();
match state {
SyncState::Start => {
let shared = self.shared.write().unwrap();
log::debug!("wal_sync");
{
let syncing = self.syncing.clone();
*syncing.borrow_mut() = true;
let completion = Completion::Sync(SyncCompletion {
complete: Box::new(move |_| {
log::debug!("wal_sync finish");
*syncing.borrow_mut() = false;
}),
});
shared.file.sync(Rc::new(completion))?;
}
self.sync_state.replace(SyncState::Wait);
Ok(CheckpointStatus::IO)
}
SyncState::Wait => {
if *self.syncing.borrow() {
Ok(CheckpointStatus::IO)
} else {
self.sync_state.replace(SyncState::Start);
Ok(CheckpointStatus::Done)
}
}
}
}
@@ -366,6 +394,7 @@ impl WalFile {
max_frame: 0,
min_frame: 0,
buffer_pool,
sync_state: RefCell::new(SyncState::Start),
}
}

View File

@@ -257,7 +257,7 @@ mod tests {
for i in 0..iterations {
let insert_query = format!("INSERT INTO test VALUES ({})", i);
do_flush(&conn, &tmp_db)?;
conn.clear_page_cache().unwrap();
conn.checkpoint().unwrap();
match conn.query(insert_query) {
Ok(Some(ref mut rows)) => loop {
match rows.next_row()? {