From 845a1ea175b6e441fa9a4566a0f7e7b5129db64e Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Thu, 25 Jul 2024 17:46:38 +0200 Subject: [PATCH] core: cacheflush and fix *Completion casting --- core/btree.rs | 20 +++++++++++--------- core/io/linux.rs | 10 +++++++--- core/io/mod.rs | 21 +++++++++++++++++---- core/lib.rs | 5 +++++ core/pager.rs | 29 ++++++++++++++++++++++++++--- core/sqlite3_ondisk.rs | 35 +++++++++++++++++++++++++++++------ core/storage.rs | 29 ++++++++++++++--------------- 7 files changed, 109 insertions(+), 40 deletions(-) diff --git a/core/btree.rs b/core/btree.rs index f8b0e11e0..16020070b 100644 --- a/core/btree.rs +++ b/core/btree.rs @@ -6,6 +6,7 @@ use crate::sqlite3_ondisk::{ use crate::types::{Cursor, CursorResult, OwnedRecord, OwnedValue}; use crate::Result; +use std::borrow::BorrowMut; use std::cell::{Ref, RefCell}; use std::rc::Rc; @@ -213,13 +214,14 @@ impl BTreeCursor { mem_page.clone() }; let page_idx = mem_page.page_idx; - let page = self.pager.read_page(page_idx)?; - let page = page.borrow_mut(); + let page_ref = self.pager.read_page(page_idx)?; + let page = page_ref.borrow(); if page.is_locked() { return Ok(CursorResult::IO); } page.set_dirty(); + self.pager.add_dirty(page_ref.clone()); let mut page = page.contents.write().unwrap(); let page = page.as_mut().unwrap(); @@ -284,8 +286,8 @@ impl BTreeCursor { } else { // insert let pc = self.allocate_cell_space(page, payload.len() as u16); - let mut buf = page.buffer.borrow_mut(); - let mut buf = buf.as_mut_slice(); + let mut buf_ref = RefCell::borrow_mut(&page.buffer); + let buf: &mut [u8] = buf_ref.as_mut_slice(); buf[pc as usize..pc as usize + payload.len()].copy_from_slice(&payload); // memmove(pIns+2, pIns, 2*(pPage->nCell - i)); let pointer_area_pc_by_idx = 8 + 2 * cell_idx; @@ -313,11 +315,11 @@ impl BTreeCursor { fn allocate_cell_space(&mut self, page_ref: &BTreePage, amount: u16) -> u16 { let amount = amount as usize; - let mut page = page_ref.buffer.borrow_mut(); - let buf = page.as_mut_slice(); + let mut buf_ref = RefCell::borrow_mut(&page_ref.buffer); + let buf = buf_ref.as_mut_slice(); let cell_offset = 8; - let mut gap = cell_offset + 2 * page_ref.cells.len(); + let gap = cell_offset + 2 * page_ref.cells.len(); let mut top = page_ref.header._cell_content_area as usize; // there are free blocks and enough space @@ -355,7 +357,7 @@ impl BTreeCursor { if cloned_page.cells.len() > 0 { let buf = cloned_page.buffer.borrow(); let buf = buf.as_slice(); - let mut write_buf = page.buffer.borrow_mut(); + let mut write_buf = RefCell::borrow_mut(&page.buffer); let write_buf = write_buf.as_mut_slice(); for i in 0..cloned_page.cells.len() { @@ -393,7 +395,7 @@ impl BTreeCursor { // return SQLITE_CORRUPT_PAGE(pPage); // } assert!(cbrk >= first_cell); - let mut write_buf = page.buffer.borrow_mut(); + let mut write_buf = RefCell::borrow_mut(&page.buffer); let write_buf = write_buf.as_mut_slice(); // set new first byte of cell content diff --git a/core/io/linux.rs b/core/io/linux.rs index 71a5cd58d..3aafbf4ca 100644 --- a/core/io/linux.rs +++ b/core/io/linux.rs @@ -159,11 +159,15 @@ impl File for LinuxFile { } fn pread(&self, pos: usize, c: Rc) -> Result<()> { - trace!("pread(pos = {}, length = {})", pos, c.buf().len()); + let r = match &(*c) { + Completion::Read(r) => r, + Completion::Write(_) => unreachable!(), + }; + trace!("pread(pos = {}, length = {})", pos, r.buf().len()); let fd = io_uring::types::Fd(self.file.as_raw_fd()); let mut io = self.io.borrow_mut(); let read_e = { - let mut buf = c.buf_mut(); + let mut buf = r.buf_mut(); let len = buf.len(); let buf = buf.as_mut_ptr(); let ptr = Rc::into_raw(c.clone()); @@ -186,7 +190,7 @@ impl File for LinuxFile { &self, pos: usize, buffer: Rc>, - c: Rc, + c: Rc, ) -> Result<()> { let mut io = self.io.borrow_mut(); let fd = io_uring::types::Fd(self.file.as_raw_fd()); diff --git a/core/io/mod.rs b/core/io/mod.rs index 36d9b985a..cf8ef25a3 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -13,8 +13,7 @@ pub trait File { fn lock_file(&self, exclusive: bool) -> Result<()>; fn unlock_file(&self) -> Result<()>; fn pread(&self, pos: usize, c: Rc) -> Result<()>; - fn pwrite(&self, pos: usize, buffer: Rc>, c: Rc) - -> Result<()>; + fn pwrite(&self, pos: usize, buffer: Rc>, c: Rc) -> Result<()>; } pub trait IO { @@ -26,16 +25,30 @@ pub trait IO { pub type Complete = dyn Fn(Rc>); pub type WriteComplete = dyn Fn(usize); -pub struct Completion { +pub enum Completion { + Read(ReadCompletion), + Write(WriteCompletion), +} + +pub struct ReadCompletion { pub buf: Rc>, pub complete: Box, } +impl Completion { + pub fn complete(&self) { + match self { + Completion::Read(r) => r.complete(), + Completion::Write(w) => w.complete(234234), // fix + } + } +} + pub struct WriteCompletion { pub complete: Box, } -impl Completion { +impl ReadCompletion { pub fn new(buf: Rc>, complete: Box) -> Self { Self { buf, complete } } diff --git a/core/lib.rs b/core/lib.rs index 593e8765e..eefd2acde 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -201,6 +201,11 @@ impl Connection { } Ok(()) } + + pub fn cacheflush(&self) -> Result<()> { + self.pager.cacheflush()?; + Ok(()) + } } pub struct Statement { diff --git a/core/pager.rs b/core/pager.rs index ecca1b566..e38a266e5 100644 --- a/core/pager.rs +++ b/core/pager.rs @@ -18,6 +18,7 @@ use std::sync::{Arc, RwLock}; pub struct Page { flags: AtomicUsize, pub contents: RwLock>, + pub id: usize, } /// Page is up-to-date. @@ -31,15 +32,16 @@ const PAGE_DIRTY: usize = 0b1000; impl Default for Page { fn default() -> Self { - Self::new() + Self::new(0) } } impl Page { - pub fn new() -> Page { + pub fn new(id: usize) -> Page { Page { flags: AtomicUsize::new(0), contents: RwLock::new(None), + id, } } @@ -273,6 +275,7 @@ pub struct Pager { buffer_pool: Rc, /// I/O interface for input/output operations. pub io: Arc, + dirty_pages: Rc>>>>, } impl Pager { @@ -296,6 +299,7 @@ impl Pager { buffer_pool, page_cache, io, + dirty_pages: Rc::new(RefCell::new(Vec::new())), }) } @@ -306,7 +310,7 @@ impl Pager { if let Some(page) = page_cache.get(&page_idx) { return Ok(page.clone()); } - let page = Rc::new(RefCell::new(Page::new())); + let page = Rc::new(RefCell::new(Page::new(page_idx))); page.borrow().set_locked(); sqlite3_ondisk::begin_read_btree_page( &self.page_source, @@ -327,4 +331,23 @@ impl Pager { pub fn change_page_cache_size(&self, capacity: usize) { self.page_cache.borrow_mut().resize(capacity); } + + pub fn add_dirty(&self, page: Rc>) { + // TODO: cehck duplicates? + let mut dirty_pages = RefCell::borrow_mut(&self.dirty_pages); + dirty_pages.push(page); + } + + pub fn cacheflush(&self) -> anyhow::Result<()> { + let mut dirty_pages = RefCell::borrow_mut(&self.dirty_pages); + loop { + if dirty_pages.len() == 0 { + break; + } + let page = dirty_pages.pop().unwrap(); + sqlite3_ondisk::begin_write_btree_page(self, &page)?; + self.io.run_once()?; + } + Ok(()) + } } diff --git a/core/sqlite3_ondisk.rs b/core/sqlite3_ondisk.rs index 9cf03209a..471bf4be4 100644 --- a/core/sqlite3_ondisk.rs +++ b/core/sqlite3_ondisk.rs @@ -25,13 +25,12 @@ /// For more information, see: https://www.sqlite.org/fileformat.html use crate::buffer_pool::BufferPool; use crate::error::LimboError; -use crate::io::{Buffer, Completion, WriteCompletion}; +use crate::io::{Buffer, Completion, ReadCompletion, WriteCompletion}; use crate::pager::{Page, Pager}; use crate::types::{OwnedRecord, OwnedValue}; use crate::{PageSource, Result}; use log::trace; use std::cell::RefCell; -use std::ptr::NonNull; use std::rc::Rc; /// The size of the database header in bytes. @@ -78,7 +77,7 @@ pub fn begin_read_database_header(page_source: &PageSource) -> Result Re let drop_fn = Rc::new(|_buf| {}); let buf = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn))); - let c = Rc::new(Completion::new(buf.clone(), complete)); + let c = Rc::new(Completion::Read(ReadCompletion::new(buf.clone(), complete))); page_source.get(1, c.clone())?; // run get header block pager.io.run_once()?; @@ -183,7 +182,7 @@ pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Re } // finish_read_database_header(buf, header).unwrap(); }); - let c = Rc::new(WriteCompletion::new(write_complete)); + let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete))); page_source.write(0, buffer_to_copy.clone(), c).unwrap(); Ok(()) @@ -260,7 +259,7 @@ pub fn begin_read_btree_page( page.borrow_mut().set_error(); } }); - let c = Rc::new(Completion::new(buf, complete)); + let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete))); page_source.get(page_idx, c.clone())?; Ok(()) } @@ -317,6 +316,30 @@ fn finish_read_btree_page( Ok(()) } +pub fn begin_write_btree_page(pager: &Pager, page: &Rc>) -> Result<()> { + let page_source = &pager.page_source; + let page = page.borrow(); + let contents = page.contents.read().unwrap(); + let contents = contents.as_ref().unwrap(); + let buffer = contents.buffer.clone(); + let write_complete = { + let buf_copy = buffer.clone(); + Box::new(move |bytes_written: usize| { + let buf_copy = buf_copy.clone(); + let buf_len = buf_copy.borrow().len(); + if bytes_written < buf_len { + log::error!("wrote({bytes_written}) less than expected({buf_len})"); + } + println!("done"); + // finish_read_database_header(buf, header).unwrap(); + }) + }; + dbg!(buffer.borrow().len()); + let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete))); + page_source.write(page.id, buffer.clone(), c)?; + Ok(()) +} + #[derive(Debug)] pub enum BTreeCell { TableInteriorCell(TableInteriorCell), diff --git a/core/storage.rs b/core/storage.rs index bb62df0bd..3eed75797 100644 --- a/core/storage.rs +++ b/core/storage.rs @@ -39,7 +39,7 @@ impl PageSource { &self, page_idx: usize, buffer: Rc>, - c: Rc, + c: Rc, ) -> Result<()> { self.io.write(page_idx, buffer, c) } @@ -47,12 +47,7 @@ impl PageSource { pub trait PageIO { fn get(&self, page_idx: usize, c: Rc) -> Result<()>; - fn write( - &self, - page_idx: usize, - buffer: Rc>, - c: Rc, - ) -> Result<()>; + fn write(&self, page_idx: usize, buffer: Rc>, c: Rc) -> Result<()>; } #[cfg(feature = "fs")] @@ -63,7 +58,11 @@ struct FileStorage { #[cfg(feature = "fs")] impl PageIO for FileStorage { fn get(&self, page_idx: usize, c: Rc) -> Result<()> { - let size = c.buf().len(); + let r = match &(*c) { + Completion::Read(r) => r, + Completion::Write(_) => unreachable!(), + }; + let size = r.buf().len(); assert!(page_idx > 0); if size < 512 || size > 65536 || size & (size - 1) != 0 { return Err(LimboError::NotADB.into()); @@ -73,17 +72,17 @@ impl PageIO for FileStorage { Ok(()) } - fn write( - &self, - page_idx: usize, - buffer: Rc>, - c: Rc, - ) -> Result<()> { + fn write(&self, page_idx: usize, buffer: Rc>, c: Rc) -> Result<()> { + let w = match &(*c) { + Completion::Read(_) => unreachable!(), + Completion::Write(w) => w, + }; let buffer_size = buffer.borrow().len(); assert!(buffer_size >= 512); assert!(buffer_size <= 65536); assert!((buffer_size & (buffer_size - 1)) == 0); - self.file.pwrite(page_idx, buffer, c)?; + let pos = (page_idx - 1) * buffer_size; + self.file.pwrite(pos, buffer, c)?; Ok(()) } }