checkpoint on drop connection

This commit is contained in:
Pere Diaz Bou
2024-11-12 17:03:30 +01:00
parent 807496a146
commit 48f0e72e14
6 changed files with 147 additions and 13 deletions

View File

@@ -283,6 +283,27 @@ impl Connection {
self.pager.clear_page_cache();
Ok(())
}
pub fn checkpoint(&self) -> Result<()> {
self.pager.clear_page_cache();
Ok(())
}
}
impl Drop for Connection {
fn drop(&mut self) {
loop {
// TODO: make this async?
match self.pager.checkpoint().unwrap() {
CheckpointStatus::Done => {
return;
}
CheckpointStatus::IO => {
self.pager.io.run_once().unwrap();
}
};
}
}
}
pub struct Statement {

View File

@@ -304,6 +304,12 @@ enum FlushState {
WaitSyncDbFile,
}
#[derive(Clone)]
enum CheckpointState {
Checkpoint,
CheckpointDone,
}
/// This will keep track of the state of current cache flush in order to not repeat work
struct FlushInfo {
state: FlushState,
@@ -329,6 +335,7 @@ pub struct Pager {
db_header: Rc<RefCell<DatabaseHeader>>,
flush_info: RefCell<FlushInfo>,
checkpoint_state: RefCell<CheckpointState>,
syncing: Rc<RefCell<bool>>,
}
@@ -362,6 +369,7 @@ impl Pager {
in_flight_writes: Rc::new(RefCell::new(0)),
}),
syncing: Rc::new(RefCell::new(false)),
checkpoint_state: RefCell::new(CheckpointState::Checkpoint),
})
}
@@ -376,7 +384,10 @@ impl Pager {
}
pub fn end_tx(&self) -> Result<CheckpointStatus> {
self.cacheflush()?;
match self.cacheflush()? {
CheckpointStatus::Done => {}
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
};
self.wal.borrow().end_read_tx()?;
Ok(CheckpointStatus::Done)
}
@@ -492,12 +503,11 @@ impl Pager {
return Ok(CheckpointStatus::IO);
}
FlushState::Checkpoint => {
let in_flight = self.flush_info.borrow().in_flight_writes.clone();
match self.wal.borrow_mut().checkpoint(self, in_flight)? {
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
match self.checkpoint()? {
CheckpointStatus::Done => {
self.flush_info.borrow_mut().state = FlushState::CheckpointDone;
self.flush_info.borrow_mut().state = FlushState::SyncDbFile;
}
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
};
}
FlushState::CheckpointDone => {
@@ -540,6 +550,33 @@ impl Pager {
Ok(CheckpointStatus::Done)
}
pub fn checkpoint(&self) -> Result<CheckpointStatus> {
loop {
let state = self.checkpoint_state.borrow().clone();
match state {
CheckpointState::Checkpoint => {
let in_flight = self.flush_info.borrow().in_flight_writes.clone();
match self.wal.borrow_mut().checkpoint(self, in_flight)? {
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
CheckpointStatus::Done => {
self.checkpoint_state
.replace(CheckpointState::CheckpointDone);
}
};
}
CheckpointState::CheckpointDone => {
let in_flight = self.flush_info.borrow().in_flight_writes.clone();
if *in_flight.borrow() > 0 {
return Ok(CheckpointStatus::IO);
} else {
self.checkpoint_state.replace(CheckpointState::Checkpoint);
return Ok(CheckpointStatus::Done);
}
}
}
}
}
// WARN: used for testing purposes
pub fn clear_page_cache(&self) {
loop {

View File

@@ -48,7 +48,7 @@ use crate::storage::database::DatabaseStorage;
use crate::storage::pager::{Page, Pager};
use crate::types::{OwnedRecord, OwnedValue};
use crate::{File, Result};
use log::trace;
use log::{debug, trace};
use std::cell::RefCell;
use std::pin::Pin;
use std::rc::Rc;
@@ -1018,6 +1018,7 @@ pub fn begin_write_wal_frame(
) -> Result<()> {
let page_finish = page.clone();
let page_id = page.borrow().id;
trace!("begin_write_wal_frame(offset={}, page={})", offset, page_id);
let header = WalFrameHeader {
page_number: page_id as u32,
@@ -1039,12 +1040,12 @@ pub fn begin_write_wal_frame(
);
let buf = buffer.as_mut_slice();
buf[0..4].copy_from_slice(&header.page_number.to_ne_bytes());
buf[4..8].copy_from_slice(&header.db_size.to_ne_bytes());
buf[8..12].copy_from_slice(&header.salt_1.to_ne_bytes());
buf[12..16].copy_from_slice(&header.salt_2.to_ne_bytes());
buf[16..20].copy_from_slice(&header.checksum_1.to_ne_bytes());
buf[20..24].copy_from_slice(&header.checksum_2.to_ne_bytes());
buf[0..4].copy_from_slice(&header.page_number.to_be_bytes());
buf[4..8].copy_from_slice(&header.db_size.to_be_bytes());
buf[8..12].copy_from_slice(&header.salt_1.to_be_bytes());
buf[12..16].copy_from_slice(&header.salt_2.to_be_bytes());
buf[16..20].copy_from_slice(&header.checksum_1.to_be_bytes());
buf[20..24].copy_from_slice(&header.checksum_2.to_be_bytes());
buf[WAL_FRAME_HEADER_SIZE..].copy_from_slice(&contents.as_ptr());
Rc::new(RefCell::new(buffer))

View File

@@ -1,7 +1,7 @@
use std::collections::{HashMap, HashSet};
use std::{cell::RefCell, rc::Rc, sync::Arc};
use log::debug;
use log::{debug, trace};
use crate::io::{File, SyncCompletion, IO};
use crate::storage::sqlite3_ondisk::{
@@ -140,6 +140,12 @@ impl Wal for WalFile {
let page_id = page.borrow().id;
let frame_id = *self.max_frame.borrow();
let offset = self.frame_offset(frame_id);
trace!(
"append_frame(frame={}, offset={}, page_id={})",
frame_id,
offset,
page_id
);
begin_write_wal_frame(
self.file.borrow().as_ref().unwrap(),
offset,

View File

@@ -1098,6 +1098,7 @@ impl Program {
)));
}
}
log::trace!("Halt auto_commit {}", self.auto_commit);
if self.auto_commit {
return match pager.end_tx() {
Ok(crate::storage::wal::CheckpointStatus::IO) => Ok(StepResult::IO),

View File

@@ -306,6 +306,74 @@ mod tests {
Ok(())
}
#[test]
fn test_wal_restart() -> anyhow::Result<()> {
let _ = env_logger::try_init();
let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY);");
// threshold is 1000 by default
fn insert(i: usize, conn: &Rc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<()> {
let insert_query = format!("INSERT INTO test VALUES ({})", i);
match conn.query(insert_query) {
Ok(Some(ref mut rows)) => loop {
match rows.next_row()? {
RowResult::IO => {
tmp_db.io.run_once()?;
}
RowResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
tmp_db.io.run_once()?;
Ok(())
}
fn count(conn: &Rc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<usize> {
let list_query = "SELECT count(x) FROM test";
loop {
match conn.query(list_query).unwrap() {
Some(ref mut rows) => loop {
match rows.next_row()? {
RowResult::Row(row) => {
let first_value = &row.values[0];
let count = match first_value {
Value::Integer(i) => *i as i32,
_ => unreachable!(),
};
return Ok(count as usize);
}
RowResult::IO => {
tmp_db.io.run_once()?;
}
RowResult::Done => break,
}
},
None => {}
}
}
}
{
let conn = tmp_db.connect_limbo();
insert(1, &conn, &tmp_db).unwrap();
assert_eq!(count(&conn, &tmp_db).unwrap(), 1);
}
{
let conn = tmp_db.connect_limbo();
assert_eq!(
count(&conn, &tmp_db).unwrap(),
1,
"failed to read from wal from another connection"
);
}
Ok(())
}
fn compare_string(a: &String, b: &String) {
assert_eq!(a.len(), b.len(), "Strings are not equal in size!");
let a = a.as_bytes();