cacheflush state machine

This commit is contained in:
Pere Diaz Bou
2024-09-27 18:17:27 +02:00
parent 129cc1cd6d
commit fc65c5096d
8 changed files with 211 additions and 26 deletions

View File

@@ -71,7 +71,7 @@ impl IO for DarwinIO {
let c: &Completion = c;
let r = match c {
Completion::Read(r) => r,
Completion::Write(_) => unreachable!(),
_ => unreachable!(),
};
let mut buf = r.buf_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
@@ -192,7 +192,7 @@ impl File for DarwinFile {
let result = {
let r = match &(*c) {
Completion::Read(r) => r,
Completion::Write(_) => unreachable!(),
_ => unreachable!(),
};
let mut buf = r.buf_mut();
rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64)
@@ -259,6 +259,19 @@ impl File for DarwinFile {
Err(e) => Err(e.into()),
}
}
/// Flushes this file with a synchronous `fsync` call.
///
/// On success the completion `c` is invoked with a result of `0` before this
/// function returns. On failure the error is converted into the crate's error
/// type and returned; `c` is not invoked in that case.
fn sync(&self, c: Rc<Completion>) -> Result<()> {
    let handle = self.file.borrow();
    // Guard clause: bail out early so the success path reads straight through.
    if let Err(errno) = rustix::fs::fsync(handle.as_fd()) {
        return Err(errno.into());
    }
    trace!("fsync");
    c.complete(0);
    Ok(())
}
}
impl Drop for DarwinFile {

View File

@@ -260,6 +260,22 @@ impl File for LinuxFile {
io.ring.submit_entry(&write);
Ok(())
}
/// Queues an `fsync` operation for this file on the io_uring.
///
/// The completion `c` is not invoked here. One strong reference to it is
/// leaked into the entry's `user_data` as a raw pointer so it can be recovered
/// when the operation finishes (presumably by the completion-queue handler,
/// which is not visible in this block — confirm).
///
/// Returns `Ok(())` once the entry has been handed to the ring.
fn sync(&self, c: Rc<Completion>) -> Result<()> {
    let mut io = self.io.borrow_mut();
    let fd = io_uring::types::Fd(self.file.as_raw_fd());
    // `c` is owned by this function, so it can be converted directly; cloning
    // first and then dropping the original leaves the same reference count.
    let ptr = Rc::into_raw(c);
    let sync = io_uring::opcode::Fsync::new(fd)
        .build()
        .user_data(ptr as u64);
    // Submit through the same helper the write path uses instead of pushing
    // onto the submission queue by hand inside an `unsafe` block.
    io.ring.submit_entry(&sync);
    Ok(())
}
}
impl Drop for LinuxFile {

View File

@@ -14,6 +14,7 @@ pub trait File {
fn unlock_file(&self) -> Result<()>;
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()>;
fn pwrite(&self, pos: usize, buffer: Rc<RefCell<Buffer>>, c: Rc<Completion>) -> Result<()>;
fn sync(&self, c: Rc<Completion>) -> Result<()>;
}
pub enum OpenFlags {
@@ -33,10 +34,12 @@ pub trait IO {
/// Callback type stored by `ReadCompletion`; it takes the shared buffer.
pub type Complete = dyn Fn(Rc<RefCell<Buffer>>);
/// Callback type stored by `WriteCompletion`; `WriteCompletion::complete`
/// passes it the `bytes_written` value.
pub type WriteComplete = dyn Fn(i32);
/// Callback type stored by `SyncCompletion`; `SyncCompletion::complete`
/// passes it the `i32` result of the sync.
pub type SyncComplete = dyn Fn(i32);
/// A pending I/O operation's completion handler, tagged by operation kind.
///
/// File implementations receive one of these (behind an `Rc`) and call
/// `Completion::complete` when the operation finishes.
pub enum Completion {
/// Completion for a read.
Read(ReadCompletion),
/// Completion for a write; its callback receives the `i32` result.
Write(WriteCompletion),
/// Completion for a file sync; its callback receives the `i32` result.
Sync(SyncCompletion),
}
pub struct ReadCompletion {
@@ -48,7 +51,8 @@ impl Completion {
pub fn complete(&self, result: i32) {
match self {
Completion::Read(r) => r.complete(),
Completion::Write(w) => w.complete(result), // fix
Completion::Write(w) => w.complete(result),
Completion::Sync(s) => s.complete(result), // fix
}
}
}
@@ -57,6 +61,10 @@ pub struct WriteCompletion {
pub complete: Box<WriteComplete>,
}
/// Completion handler for a file sync operation.
pub struct SyncCompletion {
/// Callback run by `SyncCompletion::complete` with the sync result.
pub complete: Box<SyncComplete>,
}
impl ReadCompletion {
pub fn new(buf: Rc<RefCell<Buffer>>, complete: Box<Complete>) -> Self {
Self { buf, complete }
@@ -79,11 +87,22 @@ impl WriteCompletion {
/// Creates a write completion that will run `complete` when the write finishes.
pub fn new(complete: Box<WriteComplete>) -> Self {
Self { complete }
}
/// Runs the stored callback, passing `bytes_written` through unchanged.
pub fn complete(&self, bytes_written: i32) {
// Parentheses are required to call a field that holds a closure.
(self.complete)(bytes_written);
}
}
impl SyncCompletion {
pub fn new(complete: Box<SyncComplete>) -> Self {
Self { complete }
}
pub fn complete(&self, res: i32) {
(self.complete)(res);
}
}
/// Byte storage for a buffer: a pinned `Vec<u8>`.
pub type BufferData = Pin<Vec<u8>>;
/// Shared callable that takes ownership of a `BufferData`.
// NOTE(review): the name suggests it is run when a buffer is dropped — confirm at the use site.
pub type BufferDropFn = Rc<dyn Fn(BufferData)>;

View File

@@ -82,4 +82,10 @@ impl File for WindowsFile {
file.write_all(buf)?;
Ok(())
}
/// Flushes this file with a synchronous `sync_all` and then signals completion.
///
/// On success the completion `c` is invoked with a result of `0` before this
/// function returns. If `sync_all` fails the error is propagated and `c` is
/// not invoked.
fn sync(&self, c: Rc<Completion>) -> Result<()> {
    // Borrow only for the duration of the flush so the file is no longer
    // borrowed while the completion callback runs.
    self.file.borrow().sync_all()?;
    // The flush has already finished, so report it immediately; without this
    // call the caller's sync callback would never run.
    c.complete(0);
    Ok(())
}
}

View File

@@ -26,7 +26,7 @@ impl DatabaseStorage for FileStorage {
fn read_page(&self, page_idx: usize, c: Rc<Completion>) -> Result<()> {
let r = match &(*c) {
Completion::Read(r) => r,
Completion::Write(_) => unreachable!(),
_ => unreachable!(),
};
let size = r.buf().len();
assert!(page_idx > 0);

View File

@@ -265,6 +265,21 @@ impl<K: Eq + Hash + Clone, V> PageCache<K, V> {
}
}
/// Phase of a cache flush, advanced step by step by `Pager::cacheflush`.
///
/// The state is kept between calls, so a call that returns early resumes from
/// the recorded phase the next time it is invoked.
#[derive(Clone)]
enum FlushState {
/// Initial phase: dirty pages are appended to the WAL, then the state moves
/// to `FramesDone`.
Start,
/// WAL frames have been issued: a checkpoint is run if the WAL asks for one,
/// then the state moves to `CheckpointDone`.
FramesDone,
/// Checkpoint step finished: the flush stays here until the in-flight write
/// counter reads zero, then moves to `Syncing`.
CheckpointDone,
/// The WAL sync is requested; once it reports done the state returns to
/// `Start`.
Syncing,
}
/// Tracks the progress of the current cache flush so that a repeated call to
/// `cacheflush` resumes where the previous one stopped instead of redoing work.
struct FlushInfo {
/// Phase the flush is currently in.
state: FlushState,
/// Number of writes currently in flight. The counter is shared with the write
/// paths, which increment it when a write is issued and decrement it in the
/// write's completion callback. The sync is only started once it reads 0.
in_flight_writes: Rc<RefCell<usize>>,
}
/// The pager interface implements the persistence layer by providing access
/// to pages of the database file, including caching, concurrency control, and
/// transaction management.
@@ -281,6 +296,8 @@ pub struct Pager {
pub io: Arc<dyn crate::io::IO>,
dirty_pages: Rc<RefCell<HashSet<usize>>>,
db_header: Rc<RefCell<DatabaseHeader>>,
flush_info: RefCell<FlushInfo>,
}
impl Pager {
@@ -308,6 +325,10 @@ impl Pager {
io,
dirty_pages: Rc::new(RefCell::new(HashSet::new())),
db_header: db_header_ref.clone(),
flush_info: RefCell::new(FlushInfo {
state: FlushState::Start,
in_flight_writes: Rc::new(RefCell::new(0)),
}),
})
}
@@ -375,20 +396,59 @@ impl Pager {
}
pub fn cacheflush(&self) -> Result<CheckpointStatus> {
let db_size = self.db_header.borrow().database_size;
for page_id in self.dirty_pages.borrow().iter() {
let mut cache = self.page_cache.borrow_mut();
let page = cache.get(&page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
self.wal
.borrow_mut()
.append_frame(page.clone(), db_size, self)?;
if matches!(self.flush_info.borrow().state.clone(), FlushState::Start) {
let db_size = self.db_header.borrow().database_size;
for page_id in self.dirty_pages.borrow().iter() {
let mut cache = self.page_cache.borrow_mut();
let page = cache.get(&page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
self.wal.borrow_mut().append_frame(
page.clone(),
db_size,
self,
self.flush_info.borrow().in_flight_writes.clone(),
)?;
}
self.dirty_pages.borrow_mut().clear();
self.flush_info.borrow_mut().state = FlushState::FramesDone;
}
// remove before checkpoint so we can retry cacheflush if needed
self.dirty_pages.borrow_mut().clear();
let should_checkpoint = self.wal.borrow().should_checkpoint();
if should_checkpoint {
self.wal.borrow_mut().checkpoint(self)?;
if matches!(
self.flush_info.borrow().state.clone(),
FlushState::FramesDone
) {
let should_checkpoint = self.wal.borrow().should_checkpoint();
if should_checkpoint {
match self
.wal
.borrow_mut()
.checkpoint(self, self.flush_info.borrow().in_flight_writes.clone())
{
Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO),
Ok(CheckpointStatus::Done) => {}
Err(e) => return Err(e),
};
}
self.flush_info.borrow_mut().state = FlushState::CheckpointDone;
}
if matches!(
self.flush_info.borrow().state.clone(),
FlushState::CheckpointDone
) {
let in_flight = *self.flush_info.borrow().in_flight_writes.borrow();
if in_flight == 0 {
self.flush_info.borrow_mut().state = FlushState::Syncing;
}
}
if matches!(self.flush_info.borrow().state.clone(), FlushState::Syncing) {
println!("syncing");
match self.wal.borrow_mut().sync() {
Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO),
Ok(CheckpointStatus::Done) => {}
Err(e) => return Err(e),
}
self.flush_info.borrow_mut().state = FlushState::Start;
}
Ok(CheckpointStatus::Done)
}
@@ -396,7 +456,11 @@ impl Pager {
// WARN: used for testing purposes
pub fn clear_page_cache(&self) {
loop {
match self.wal.borrow_mut().checkpoint(self) {
match self
.wal
.borrow_mut()
.checkpoint(self, Rc::new(RefCell::new(0)))
{
Ok(CheckpointStatus::IO) => {}
Ok(CheckpointStatus::Done) => {
break;

View File

@@ -528,7 +528,11 @@ fn finish_read_page(
Ok(())
}
pub fn begin_write_btree_page(pager: &Pager, page: &Rc<RefCell<Page>>) -> Result<()> {
pub fn begin_write_btree_page(
pager: &Pager,
page: &Rc<RefCell<Page>>,
write_counter: Rc<RefCell<usize>>,
) -> Result<()> {
let page_source = &pager.page_io;
let page_finish = page.clone();
@@ -540,11 +544,13 @@ pub fn begin_write_btree_page(pager: &Pager, page: &Rc<RefCell<Page>>) -> Result
contents.buffer.clone()
};
*write_counter.borrow_mut() += 1;
let write_complete = {
let buf_copy = buffer.clone();
Box::new(move |bytes_written: i32| {
let buf_copy = buf_copy.clone();
let buf_len = buf_copy.borrow().len();
*write_counter.borrow_mut() -= 1;
page_finish.borrow_mut().clear_dirty();
if bytes_written < buf_len as i32 {
@@ -994,6 +1000,7 @@ pub fn begin_write_wal_frame(
offset: usize,
page: &Rc<RefCell<Page>>,
db_size: u32,
write_counter: Rc<RefCell<usize>>,
) -> Result<()> {
let page_finish = page.clone();
let page_id = page.borrow().id;
@@ -1029,11 +1036,13 @@ pub fn begin_write_wal_frame(
Rc::new(RefCell::new(buffer))
};
*write_counter.borrow_mut() += 1;
let write_complete = {
let buf_copy = buffer.clone();
Box::new(move |bytes_written: i32| {
let buf_copy = buf_copy.clone();
let buf_len = buf_copy.borrow().len();
*write_counter.borrow_mut() -= 1;
page_finish.borrow_mut().clear_dirty();
if bytes_written < buf_len as i32 {

View File

@@ -1,16 +1,17 @@
use std::collections::{HashMap, HashSet};
use std::{cell::RefCell, rc::Rc, sync::Arc};
use crate::io::{File, IO};
use crate::io::{File, SyncCompletion, IO};
use crate::storage::sqlite3_ondisk::{
begin_read_page, begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE,
WAL_HEADER_SIZE,
};
use crate::Completion;
use crate::{storage::pager::Page, Result};
use super::buffer_pool::BufferPool;
use super::pager::Pager;
use super::sqlite3_ondisk;
use super::sqlite3_ondisk::{self, begin_write_btree_page};
/// Write-ahead log (WAL).
pub trait Wal {
@@ -38,10 +39,21 @@ pub trait Wal {
) -> Result<()>;
/// Write a frame to the WAL.
fn append_frame(&mut self, page: Rc<RefCell<Page>>, db_size: u32, pager: &Pager) -> Result<()>;
fn append_frame(
&mut self,
page: Rc<RefCell<Page>>,
db_size: u32,
pager: &Pager,
write_counter: Rc<RefCell<usize>>,
) -> Result<()>;
fn should_checkpoint(&self) -> bool;
fn checkpoint(&mut self, pager: &Pager) -> Result<CheckpointStatus>;
fn checkpoint(
&mut self,
pager: &Pager,
write_counter: Rc<RefCell<usize>>,
) -> Result<CheckpointStatus>;
fn sync(&mut self) -> Result<CheckpointStatus>;
}
#[cfg(feature = "fs")]
@@ -57,6 +69,8 @@ pub struct WalFile {
frame_cache: RefCell<HashMap<u64, Vec<u64>>>, // FIXME: for now let's use a simple hashmap instead of a shm file
checkpoint_threshold: usize,
ongoing_checkpoint: HashSet<usize>,
syncing: Rc<RefCell<bool>>,
}
pub enum CheckpointStatus {
@@ -112,12 +126,24 @@ impl Wal for WalFile {
}
/// Write a frame to the WAL.
fn append_frame(&mut self, page: Rc<RefCell<Page>>, db_size: u32, pager: &Pager) -> Result<()> {
fn append_frame(
&mut self,
page: Rc<RefCell<Page>>,
db_size: u32,
pager: &Pager,
write_counter: Rc<RefCell<usize>>,
) -> Result<()> {
self.ensure_init()?;
let page_id = page.borrow().id;
let frame_id = *self.max_frame.borrow();
let offset = self.frame_offset(frame_id);
begin_write_wal_frame(self.file.borrow().as_ref().unwrap(), offset, &page, db_size)?;
begin_write_wal_frame(
self.file.borrow().as_ref().unwrap(),
offset,
&page,
db_size,
write_counter,
)?;
self.max_frame.replace(frame_id + 1);
{
let mut frame_cache = self.frame_cache.borrow_mut();
@@ -151,23 +177,54 @@ impl Wal for WalFile {
}
}
fn checkpoint(&mut self, pager: &Pager) -> Result<CheckpointStatus> {
fn checkpoint(
&mut self,
pager: &Pager,
write_counter: Rc<RefCell<usize>>,
) -> Result<CheckpointStatus> {
for (page_id, _frames) in self.frame_cache.borrow().iter() {
// move page from WAL to database file
// TODO(Pere): use splice syscall in linux to do zero-copy file page movements to improve perf
let page_id = *page_id as usize;
if self.ongoing_checkpoint.contains(&page_id) {
continue;
}
let page = pager.read_page(page_id)?;
if page.borrow().is_locked() {
return Ok(CheckpointStatus::IO);
}
pager.put_page(page_id, page);
begin_write_btree_page(pager, &page, write_counter.clone());
self.ongoing_checkpoint.insert(page_id);
}
self.frame_cache.borrow_mut().clear();
*self.max_frame.borrow_mut() = 0;
self.ongoing_checkpoint.clear();
Ok(CheckpointStatus::Done)
}
/// Requests a sync of the WAL file and reports whether it is still pending.
///
/// A `Completion::Sync` whose callback clears the shared `syncing` flag is
/// handed to the file's `sync`. Afterwards the flag is read: `true` yields
/// `CheckpointStatus::IO`, `false` yields `CheckpointStatus::Done`.
///
/// Errors from `ensure_init` and from the file's `sync` are propagated.
// NOTE(review): nothing in this function sets `syncing` to `true` before the
// request is issued, so unless it is set elsewhere the flag is always `false`
// here and this returns `Done` even if the backend has only queued the sync.
fn sync(&mut self) -> Result<CheckpointStatus> {
    self.ensure_init()?;
    let file_slot = self.file.borrow();
    let wal_file = file_slot.as_ref().unwrap();

    // The callback owns its own handle to the flag so it can outlive this call.
    let pending_flag = self.syncing.clone();
    let on_synced = Completion::Sync(SyncCompletion {
        complete: Box::new(move |_| {
            *pending_flag.borrow_mut() = false;
        }),
    });
    wal_file.sync(Rc::new(on_synced))?;

    let status = if *self.syncing.borrow() {
        CheckpointStatus::IO
    } else {
        CheckpointStatus::Done
    };
    Ok(status)
}
}
#[cfg(feature = "fs")]
@@ -184,6 +241,7 @@ impl WalFile {
nbackfills: RefCell::new(0),
checkpoint_threshold: 1000,
ongoing_checkpoint: HashSet::new(),
syncing: Rc::new(RefCell::new(false)),
}
}