From 48f0e72e14e5dffcda327edd034e0e3eeaf4cdea Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 12 Nov 2024 17:03:30 +0100 Subject: [PATCH] checkpoint on drop connection --- core/lib.rs | 21 +++++++++++ core/storage/pager.rs | 47 ++++++++++++++++++++--- core/storage/sqlite3_ondisk.rs | 15 ++++---- core/storage/wal.rs | 8 +++- core/vdbe/mod.rs | 1 + test/src/lib.rs | 68 ++++++++++++++++++++++++++++++++++ 6 files changed, 147 insertions(+), 13 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 36f5739d0..88609e055 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -283,6 +283,27 @@ impl Connection { self.pager.clear_page_cache(); Ok(()) } + + pub fn checkpoint(&self) -> Result<()> { + self.pager.clear_page_cache(); + Ok(()) + } +} + +impl Drop for Connection { + fn drop(&mut self) { + loop { + // TODO: make this async? + match self.pager.checkpoint().unwrap() { + CheckpointStatus::Done => { + return; + } + CheckpointStatus::IO => { + self.pager.io.run_once().unwrap(); + } + }; + } + } } pub struct Statement { diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 501d3ef35..a0f29233e 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -304,6 +304,12 @@ enum FlushState { WaitSyncDbFile, } +#[derive(Clone)] +enum CheckpointState { + Checkpoint, + CheckpointDone, +} + /// This will keep track of the state of current cache flush in order to not repeat work struct FlushInfo { state: FlushState, @@ -329,6 +335,7 @@ pub struct Pager { db_header: Rc>, flush_info: RefCell, + checkpoint_state: RefCell, syncing: Rc>, } @@ -362,6 +369,7 @@ impl Pager { in_flight_writes: Rc::new(RefCell::new(0)), }), syncing: Rc::new(RefCell::new(false)), + checkpoint_state: RefCell::new(CheckpointState::Checkpoint), }) } @@ -376,7 +384,10 @@ impl Pager { } pub fn end_tx(&self) -> Result { - self.cacheflush()?; + match self.cacheflush()? { + CheckpointStatus::Done => {} + CheckpointStatus::IO => return Ok(CheckpointStatus::IO), + }; self.wal.borrow().end_read_tx()?; Ok(CheckpointStatus::Done) } @@ -492,12 +503,11 @@ impl Pager { return Ok(CheckpointStatus::IO); } FlushState::Checkpoint => { - let in_flight = self.flush_info.borrow().in_flight_writes.clone(); - match self.wal.borrow_mut().checkpoint(self, in_flight)? { - CheckpointStatus::IO => return Ok(CheckpointStatus::IO), + match self.checkpoint()? { CheckpointStatus::Done => { - self.flush_info.borrow_mut().state = FlushState::CheckpointDone; + self.flush_info.borrow_mut().state = FlushState::SyncDbFile; } + CheckpointStatus::IO => return Ok(CheckpointStatus::IO), }; } FlushState::CheckpointDone => { @@ -540,6 +550,33 @@ impl Pager { Ok(CheckpointStatus::Done) } + pub fn checkpoint(&self) -> Result { + loop { + let state = self.checkpoint_state.borrow().clone(); + match state { + CheckpointState::Checkpoint => { + let in_flight = self.flush_info.borrow().in_flight_writes.clone(); + match self.wal.borrow_mut().checkpoint(self, in_flight)? { + CheckpointStatus::IO => return Ok(CheckpointStatus::IO), + CheckpointStatus::Done => { + self.checkpoint_state + .replace(CheckpointState::CheckpointDone); + } + }; + } + CheckpointState::CheckpointDone => { + let in_flight = self.flush_info.borrow().in_flight_writes.clone(); + if *in_flight.borrow() > 0 { + return Ok(CheckpointStatus::IO); + } else { + self.checkpoint_state.replace(CheckpointState::Checkpoint); + return Ok(CheckpointStatus::Done); + } + } + } + } + } + // WARN: used for testing purposes pub fn clear_page_cache(&self) { loop { diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 56de8cfa5..ce2136a01 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -48,7 +48,7 @@ use crate::storage::database::DatabaseStorage; use crate::storage::pager::{Page, Pager}; use crate::types::{OwnedRecord, OwnedValue}; use crate::{File, Result}; -use log::trace; +use log::{debug, trace}; use std::cell::RefCell; use std::pin::Pin; use std::rc::Rc; @@ -1018,6 +1018,7 @@ pub fn begin_write_wal_frame( ) -> Result<()> { let page_finish = page.clone(); let page_id = page.borrow().id; + trace!("begin_write_wal_frame(offset={}, page={})", offset, page_id); let header = WalFrameHeader { page_number: page_id as u32, @@ -1039,12 +1040,12 @@ pub fn begin_write_wal_frame( ); let buf = buffer.as_mut_slice(); - buf[0..4].copy_from_slice(&header.page_number.to_ne_bytes()); - buf[4..8].copy_from_slice(&header.db_size.to_ne_bytes()); - buf[8..12].copy_from_slice(&header.salt_1.to_ne_bytes()); - buf[12..16].copy_from_slice(&header.salt_2.to_ne_bytes()); - buf[16..20].copy_from_slice(&header.checksum_1.to_ne_bytes()); - buf[20..24].copy_from_slice(&header.checksum_2.to_ne_bytes()); + buf[0..4].copy_from_slice(&header.page_number.to_be_bytes()); + buf[4..8].copy_from_slice(&header.db_size.to_be_bytes()); + buf[8..12].copy_from_slice(&header.salt_1.to_be_bytes()); + buf[12..16].copy_from_slice(&header.salt_2.to_be_bytes()); + buf[16..20].copy_from_slice(&header.checksum_1.to_be_bytes()); + buf[20..24].copy_from_slice(&header.checksum_2.to_be_bytes()); buf[WAL_FRAME_HEADER_SIZE..].copy_from_slice(&contents.as_ptr()); Rc::new(RefCell::new(buffer)) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index a5b98203c..c670786d9 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1,7 +1,7 @@ use std::collections::{HashMap, HashSet}; use std::{cell::RefCell, rc::Rc, sync::Arc}; -use log::debug; +use log::{debug, trace}; use crate::io::{File, SyncCompletion, IO}; use crate::storage::sqlite3_ondisk::{ @@ -140,6 +140,12 @@ impl Wal for WalFile { let page_id = page.borrow().id; let frame_id = *self.max_frame.borrow(); let offset = self.frame_offset(frame_id); + trace!( + "append_frame(frame={}, offset={}, page_id={})", + frame_id, + offset, + page_id + ); begin_write_wal_frame( self.file.borrow().as_ref().unwrap(), offset, diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 0c292ff85..609b8bfb3 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -1098,6 +1098,7 @@ impl Program { ))); } } + log::trace!("Halt auto_commit {}", self.auto_commit); if self.auto_commit { return match pager.end_tx() { Ok(crate::storage::wal::CheckpointStatus::IO) => Ok(StepResult::IO), diff --git a/test/src/lib.rs b/test/src/lib.rs index b1393102a..0e9c2cbdd 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -306,6 +306,74 @@ mod tests { Ok(()) } + #[test] + fn test_wal_restart() -> anyhow::Result<()> { + let _ = env_logger::try_init(); + let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY);"); + // threshold is 1000 by default + + fn insert(i: usize, conn: &Rc, tmp_db: &TempDatabase) -> anyhow::Result<()> { + let insert_query = format!("INSERT INTO test VALUES ({})", i); + match conn.query(insert_query) { + Ok(Some(ref mut rows)) => loop { + match rows.next_row()? { + RowResult::IO => { + tmp_db.io.run_once()?; + } + RowResult::Done => break, + _ => unreachable!(), + } + }, + Ok(None) => {} + Err(err) => { + eprintln!("{}", err); + } + }; + tmp_db.io.run_once()?; + Ok(()) + } + + fn count(conn: &Rc, tmp_db: &TempDatabase) -> anyhow::Result { + let list_query = "SELECT count(x) FROM test"; + loop { + match conn.query(list_query).unwrap() { + Some(ref mut rows) => loop { + match rows.next_row()? { + RowResult::Row(row) => { + let first_value = &row.values[0]; + let count = match first_value { + Value::Integer(i) => *i as i32, + _ => unreachable!(), + }; + return Ok(count as usize); + } + RowResult::IO => { + tmp_db.io.run_once()?; + } + RowResult::Done => break, + } + }, + None => {} + } + } + } + + { + let conn = tmp_db.connect_limbo(); + insert(1, &conn, &tmp_db).unwrap(); + assert_eq!(count(&conn, &tmp_db).unwrap(), 1); + } + { + let conn = tmp_db.connect_limbo(); + assert_eq!( + count(&conn, &tmp_db).unwrap(), + 1, + "failed to read from wal from another connection" + ); + } + Ok(()) + } + fn compare_string(a: &String, b: &String) { assert_eq!(a.len(), b.len(), "Strings are not equal in size!"); let a = a.as_bytes();