diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 2c80210d4..105a8a75a 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -128,6 +128,8 @@ enum FlushState { #[derive(Clone, Debug)] enum CheckpointState { Checkpoint, + SyncDbFile, + WaitSyncDbFile, CheckpointDone, } @@ -314,7 +316,6 @@ impl Pager { self.wal.borrow_mut().append_frame( page.clone(), db_size, - self, self.flush_info.borrow().in_flight_writes.clone(), )?; } @@ -380,11 +381,23 @@ impl Pager { match self.wal.borrow_mut().checkpoint(self, in_flight)? { CheckpointStatus::IO => return Ok(CheckpointStatus::IO), CheckpointStatus::Done => { - self.checkpoint_state - .replace(CheckpointState::CheckpointDone); + self.checkpoint_state.replace(CheckpointState::SyncDbFile); } }; } + CheckpointState::SyncDbFile => { + sqlite3_ondisk::begin_sync(self.page_io.clone(), self.syncing.clone())?; + self.checkpoint_state + .replace(CheckpointState::WaitSyncDbFile); + } + CheckpointState::WaitSyncDbFile => { + if *self.syncing.borrow() { + return Ok(CheckpointStatus::IO); + } else { + self.checkpoint_state + .replace(CheckpointState::CheckpointDone); + } + } CheckpointState::CheckpointDone => { let in_flight = self.checkpoint_inflight.clone(); if *in_flight.borrow() > 0 { @@ -406,7 +419,9 @@ impl Pager { .borrow_mut() .checkpoint(self, Rc::new(RefCell::new(0))) { - Ok(CheckpointStatus::IO) => {} + Ok(CheckpointStatus::IO) => { + self.io.run_once(); + } Ok(CheckpointStatus::Done) => { break; } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 314ce5503..fc44dd932 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -43,7 +43,6 @@ pub trait Wal { &mut self, page: PageRef, db_size: u32, - pager: &Pager, write_counter: Rc>, ) -> Result<()>; @@ -58,6 +57,12 @@ pub trait Wal { fn get_min_frame(&self) -> u64; } +#[derive(Copy, Clone)] +enum SyncState { + Start, + Wait, +} + struct OngoingCheckpoint { page: PageRef, state: CheckpointState, @@ -70,6 +75,7 @@ pub struct WalFile { io: Arc, buffer_pool: Rc, + sync_state: RefCell, syncing: Rc>, page_size: usize, @@ -100,6 +106,7 @@ pub enum CheckpointState { ReadFrame, WaitReadFrame, WritePage, + WaitWritePage, Done, } @@ -143,6 +150,7 @@ impl Wal for WalFile { debug!("read_frame({})", frame_id); let offset = self.frame_offset(frame_id); let shared = self.shared.read().unwrap(); + page.set_locked(); begin_read_wal_frame( &shared.file, offset + WAL_FRAME_HEADER_SIZE, @@ -157,7 +165,6 @@ impl Wal for WalFile { &mut self, page: PageRef, db_size: u32, - _pager: &Pager, write_counter: Rc>, ) -> Result<()> { let page_id = page.get().id; @@ -231,38 +238,40 @@ impl Wal for WalFile { } CheckpointState::ReadFrame => { let shared = self.shared.read().unwrap(); - for page in shared - .pages_in_frames - .iter() - .skip(self.ongoing_checkpoint.current_page as usize) + if self.ongoing_checkpoint.current_page as usize >= shared.pages_in_frames.len() { - let frames = shared - .frame_cache - .get(page) - .expect("page must be in frame cache if it's in list"); - - for frame in frames.iter().rev() { - if *frame <= self.ongoing_checkpoint.max_frame { - log::debug!( - "checkpoint page(state={:?}, page={}, frame={})", - state, - *page, - *frame - ); - self.ongoing_checkpoint.page.get().id = *page as usize; - self.read_frame( - *frame, - self.ongoing_checkpoint.page.clone(), - self.buffer_pool.clone(), - )?; - self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame; - self.ongoing_checkpoint.current_page += 1; - continue 'checkpoint_loop; - } - } - self.ongoing_checkpoint.current_page += 1; + self.ongoing_checkpoint.state = CheckpointState::Done; + continue 'checkpoint_loop; } - self.ongoing_checkpoint.state = CheckpointState::Done; + let page = + shared.pages_in_frames[self.ongoing_checkpoint.current_page as usize]; + let frames = shared + .frame_cache + .get(&page) + .expect("page must be in frame cache if it's in list"); + + for frame in frames.iter().rev() { + // TODO: do proper selection of frames to checkpoint + if *frame >= self.ongoing_checkpoint.min_frame { + log::debug!( + "checkpoint page(state={:?}, page={}, frame={})", + state, + page, + *frame + ); + self.ongoing_checkpoint.page.get().id = page as usize; + + self.read_frame( + *frame, + self.ongoing_checkpoint.page.clone(), + self.buffer_pool.clone(), + )?; + self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame; + self.ongoing_checkpoint.current_page += 1; + continue 'checkpoint_loop; + } + } + self.ongoing_checkpoint.current_page += 1; } CheckpointState::WaitReadFrame => { if self.ongoing_checkpoint.page.is_locked() { @@ -272,11 +281,18 @@ impl Wal for WalFile { } } CheckpointState::WritePage => { + self.ongoing_checkpoint.page.set_dirty(); begin_write_btree_page( pager, &self.ongoing_checkpoint.page, write_counter.clone(), )?; + self.ongoing_checkpoint.state = CheckpointState::WaitWritePage; + } + CheckpointState::WaitWritePage => { + if *write_counter.borrow() > 0 { + return Ok(CheckpointStatus::IO); + } let shared = self.shared.read().unwrap(); if (self.ongoing_checkpoint.current_page as usize) < shared.pages_in_frames.len() @@ -303,21 +319,33 @@ impl Wal for WalFile { } fn sync(&mut self) -> Result { - let shared = self.shared.write().unwrap(); - { - let syncing = self.syncing.clone(); - let completion = Completion::Sync(SyncCompletion { - complete: Box::new(move |_| { - *syncing.borrow_mut() = false; - }), - }); - shared.file.sync(Rc::new(completion))?; - } - - if *self.syncing.borrow() { - Ok(CheckpointStatus::IO) - } else { - Ok(CheckpointStatus::Done) + let state = *self.sync_state.borrow(); + match state { + SyncState::Start => { + let shared = self.shared.write().unwrap(); + log::debug!("wal_sync"); + { + let syncing = self.syncing.clone(); + *syncing.borrow_mut() = true; + let completion = Completion::Sync(SyncCompletion { + complete: Box::new(move |_| { + log::debug!("wal_sync finish"); + *syncing.borrow_mut() = false; + }), + }); + shared.file.sync(Rc::new(completion))?; + } + self.sync_state.replace(SyncState::Wait); + Ok(CheckpointStatus::IO) + } + SyncState::Wait => { + if *self.syncing.borrow() { + Ok(CheckpointStatus::IO) + } else { + self.sync_state.replace(SyncState::Start); + Ok(CheckpointStatus::Done) + } + } } } @@ -366,6 +394,7 @@ impl WalFile { max_frame: 0, min_frame: 0, buffer_pool, + sync_state: RefCell::new(SyncState::Start), } } diff --git a/test/src/lib.rs b/test/src/lib.rs index d76981d4a..bde88a1f0 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -257,7 +257,7 @@ mod tests { for i in 0..iterations { let insert_query = format!("INSERT INTO test VALUES ({})", i); do_flush(&conn, &tmp_db)?; - conn.clear_page_cache().unwrap(); + conn.checkpoint().unwrap(); match conn.query(insert_query) { Ok(Some(ref mut rows)) => loop { match rows.next_row()? {