core: cacheflush and fix *Completion casting

This commit is contained in:
Pere Diaz Bou
2024-07-25 17:46:38 +02:00
parent 7846a3b29c
commit 845a1ea175
7 changed files with 109 additions and 40 deletions

View File

@@ -6,6 +6,7 @@ use crate::sqlite3_ondisk::{
use crate::types::{Cursor, CursorResult, OwnedRecord, OwnedValue};
use crate::Result;
use std::borrow::BorrowMut;
use std::cell::{Ref, RefCell};
use std::rc::Rc;
@@ -213,13 +214,14 @@ impl BTreeCursor {
mem_page.clone()
};
let page_idx = mem_page.page_idx;
let page = self.pager.read_page(page_idx)?;
let page = page.borrow_mut();
let page_ref = self.pager.read_page(page_idx)?;
let page = page_ref.borrow();
if page.is_locked() {
return Ok(CursorResult::IO);
}
page.set_dirty();
self.pager.add_dirty(page_ref.clone());
let mut page = page.contents.write().unwrap();
let page = page.as_mut().unwrap();
@@ -284,8 +286,8 @@ impl BTreeCursor {
} else {
// insert
let pc = self.allocate_cell_space(page, payload.len() as u16);
let mut buf = page.buffer.borrow_mut();
let mut buf = buf.as_mut_slice();
let mut buf_ref = RefCell::borrow_mut(&page.buffer);
let buf: &mut [u8] = buf_ref.as_mut_slice();
buf[pc as usize..pc as usize + payload.len()].copy_from_slice(&payload);
// memmove(pIns+2, pIns, 2*(pPage->nCell - i));
let pointer_area_pc_by_idx = 8 + 2 * cell_idx;
@@ -313,11 +315,11 @@ impl BTreeCursor {
fn allocate_cell_space(&mut self, page_ref: &BTreePage, amount: u16) -> u16 {
let amount = amount as usize;
let mut page = page_ref.buffer.borrow_mut();
let buf = page.as_mut_slice();
let mut buf_ref = RefCell::borrow_mut(&page_ref.buffer);
let buf = buf_ref.as_mut_slice();
let cell_offset = 8;
let mut gap = cell_offset + 2 * page_ref.cells.len();
let gap = cell_offset + 2 * page_ref.cells.len();
let mut top = page_ref.header._cell_content_area as usize;
// there are free blocks and enough space
@@ -355,7 +357,7 @@ impl BTreeCursor {
if cloned_page.cells.len() > 0 {
let buf = cloned_page.buffer.borrow();
let buf = buf.as_slice();
let mut write_buf = page.buffer.borrow_mut();
let mut write_buf = RefCell::borrow_mut(&page.buffer);
let write_buf = write_buf.as_mut_slice();
for i in 0..cloned_page.cells.len() {
@@ -393,7 +395,7 @@ impl BTreeCursor {
// return SQLITE_CORRUPT_PAGE(pPage);
// }
assert!(cbrk >= first_cell);
let mut write_buf = page.buffer.borrow_mut();
let mut write_buf = RefCell::borrow_mut(&page.buffer);
let write_buf = write_buf.as_mut_slice();
// set new first byte of cell content

View File

@@ -159,11 +159,15 @@ impl File for LinuxFile {
}
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
trace!("pread(pos = {}, length = {})", pos, c.buf().len());
let r = match &(*c) {
Completion::Read(r) => r,
Completion::Write(_) => unreachable!(),
};
trace!("pread(pos = {}, length = {})", pos, r.buf().len());
let fd = io_uring::types::Fd(self.file.as_raw_fd());
let mut io = self.io.borrow_mut();
let read_e = {
let mut buf = c.buf_mut();
let mut buf = r.buf_mut();
let len = buf.len();
let buf = buf.as_mut_ptr();
let ptr = Rc::into_raw(c.clone());
@@ -186,7 +190,7 @@ impl File for LinuxFile {
&self,
pos: usize,
buffer: Rc<RefCell<crate::Buffer>>,
c: Rc<WriteCompletion>,
c: Rc<Completion>,
) -> Result<()> {
let mut io = self.io.borrow_mut();
let fd = io_uring::types::Fd(self.file.as_raw_fd());

View File

@@ -13,8 +13,7 @@ pub trait File {
fn lock_file(&self, exclusive: bool) -> Result<()>;
fn unlock_file(&self) -> Result<()>;
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()>;
fn pwrite(&self, pos: usize, buffer: Rc<RefCell<Buffer>>, c: Rc<WriteCompletion>)
-> Result<()>;
fn pwrite(&self, pos: usize, buffer: Rc<RefCell<Buffer>>, c: Rc<Completion>) -> Result<()>;
}
pub trait IO {
@@ -26,16 +25,30 @@ pub trait IO {
pub type Complete = dyn Fn(Rc<RefCell<Buffer>>);
pub type WriteComplete = dyn Fn(usize);
pub struct Completion {
pub enum Completion {
Read(ReadCompletion),
Write(WriteCompletion),
}
pub struct ReadCompletion {
pub buf: Rc<RefCell<Buffer>>,
pub complete: Box<Complete>,
}
impl Completion {
pub fn complete(&self) {
match self {
Completion::Read(r) => r.complete(),
Completion::Write(w) => w.complete(234234), // fix
}
}
}
pub struct WriteCompletion {
pub complete: Box<WriteComplete>,
}
impl Completion {
impl ReadCompletion {
pub fn new(buf: Rc<RefCell<Buffer>>, complete: Box<Complete>) -> Self {
Self { buf, complete }
}

View File

@@ -201,6 +201,11 @@ impl Connection {
}
Ok(())
}
pub fn cacheflush(&self) -> Result<()> {
self.pager.cacheflush()?;
Ok(())
}
}
pub struct Statement {

View File

@@ -18,6 +18,7 @@ use std::sync::{Arc, RwLock};
pub struct Page {
flags: AtomicUsize,
pub contents: RwLock<Option<BTreePage>>,
pub id: usize,
}
/// Page is up-to-date.
@@ -31,15 +32,16 @@ const PAGE_DIRTY: usize = 0b1000;
impl Default for Page {
fn default() -> Self {
Self::new()
Self::new(0)
}
}
impl Page {
pub fn new() -> Page {
pub fn new(id: usize) -> Page {
Page {
flags: AtomicUsize::new(0),
contents: RwLock::new(None),
id,
}
}
@@ -273,6 +275,7 @@ pub struct Pager {
buffer_pool: Rc<BufferPool>,
/// I/O interface for input/output operations.
pub io: Arc<dyn crate::io::IO>,
dirty_pages: Rc<RefCell<Vec<Rc<RefCell<Page>>>>>,
}
impl Pager {
@@ -296,6 +299,7 @@ impl Pager {
buffer_pool,
page_cache,
io,
dirty_pages: Rc::new(RefCell::new(Vec::new())),
})
}
@@ -306,7 +310,7 @@ impl Pager {
if let Some(page) = page_cache.get(&page_idx) {
return Ok(page.clone());
}
let page = Rc::new(RefCell::new(Page::new()));
let page = Rc::new(RefCell::new(Page::new(page_idx)));
page.borrow().set_locked();
sqlite3_ondisk::begin_read_btree_page(
&self.page_source,
@@ -327,4 +331,23 @@ impl Pager {
pub fn change_page_cache_size(&self, capacity: usize) {
self.page_cache.borrow_mut().resize(capacity);
}
pub fn add_dirty(&self, page: Rc<RefCell<Page>>) {
// TODO: cehck duplicates?
let mut dirty_pages = RefCell::borrow_mut(&self.dirty_pages);
dirty_pages.push(page);
}
pub fn cacheflush(&self) -> anyhow::Result<()> {
let mut dirty_pages = RefCell::borrow_mut(&self.dirty_pages);
loop {
if dirty_pages.len() == 0 {
break;
}
let page = dirty_pages.pop().unwrap();
sqlite3_ondisk::begin_write_btree_page(self, &page)?;
self.io.run_once()?;
}
Ok(())
}
}

View File

@@ -25,13 +25,12 @@
/// For more information, see: https://www.sqlite.org/fileformat.html
use crate::buffer_pool::BufferPool;
use crate::error::LimboError;
use crate::io::{Buffer, Completion, WriteCompletion};
use crate::io::{Buffer, Completion, ReadCompletion, WriteCompletion};
use crate::pager::{Page, Pager};
use crate::types::{OwnedRecord, OwnedValue};
use crate::{PageSource, Result};
use log::trace;
use std::cell::RefCell;
use std::ptr::NonNull;
use std::rc::Rc;
/// The size of the database header in bytes.
@@ -78,7 +77,7 @@ pub fn begin_read_database_header(page_source: &PageSource) -> Result<Rc<RefCell
let header = header.clone();
finish_read_database_header(buf, header).unwrap();
});
let c = Rc::new(Completion::new(buf, complete));
let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete)));
page_source.get(1, c.clone())?;
Ok(result)
}
@@ -169,7 +168,7 @@ pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Re
let drop_fn = Rc::new(|_buf| {});
let buf = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn)));
let c = Rc::new(Completion::new(buf.clone(), complete));
let c = Rc::new(Completion::Read(ReadCompletion::new(buf.clone(), complete)));
page_source.get(1, c.clone())?;
// run get header block
pager.io.run_once()?;
@@ -183,7 +182,7 @@ pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Re
}
// finish_read_database_header(buf, header).unwrap();
});
let c = Rc::new(WriteCompletion::new(write_complete));
let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete)));
page_source.write(0, buffer_to_copy.clone(), c).unwrap();
Ok(())
@@ -260,7 +259,7 @@ pub fn begin_read_btree_page(
page.borrow_mut().set_error();
}
});
let c = Rc::new(Completion::new(buf, complete));
let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete)));
page_source.get(page_idx, c.clone())?;
Ok(())
}
@@ -317,6 +316,30 @@ fn finish_read_btree_page(
Ok(())
}
pub fn begin_write_btree_page(pager: &Pager, page: &Rc<RefCell<Page>>) -> Result<()> {
let page_source = &pager.page_source;
let page = page.borrow();
let contents = page.contents.read().unwrap();
let contents = contents.as_ref().unwrap();
let buffer = contents.buffer.clone();
let write_complete = {
let buf_copy = buffer.clone();
Box::new(move |bytes_written: usize| {
let buf_copy = buf_copy.clone();
let buf_len = buf_copy.borrow().len();
if bytes_written < buf_len {
log::error!("wrote({bytes_written}) less than expected({buf_len})");
}
println!("done");
// finish_read_database_header(buf, header).unwrap();
})
};
dbg!(buffer.borrow().len());
let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete)));
page_source.write(page.id, buffer.clone(), c)?;
Ok(())
}
#[derive(Debug)]
pub enum BTreeCell {
TableInteriorCell(TableInteriorCell),

View File

@@ -39,7 +39,7 @@ impl PageSource {
&self,
page_idx: usize,
buffer: Rc<RefCell<Buffer>>,
c: Rc<WriteCompletion>,
c: Rc<Completion>,
) -> Result<()> {
self.io.write(page_idx, buffer, c)
}
@@ -47,12 +47,7 @@ impl PageSource {
pub trait PageIO {
fn get(&self, page_idx: usize, c: Rc<Completion>) -> Result<()>;
fn write(
&self,
page_idx: usize,
buffer: Rc<RefCell<Buffer>>,
c: Rc<WriteCompletion>,
) -> Result<()>;
fn write(&self, page_idx: usize, buffer: Rc<RefCell<Buffer>>, c: Rc<Completion>) -> Result<()>;
}
#[cfg(feature = "fs")]
@@ -63,7 +58,11 @@ struct FileStorage {
#[cfg(feature = "fs")]
impl PageIO for FileStorage {
fn get(&self, page_idx: usize, c: Rc<Completion>) -> Result<()> {
let size = c.buf().len();
let r = match &(*c) {
Completion::Read(r) => r,
Completion::Write(_) => unreachable!(),
};
let size = r.buf().len();
assert!(page_idx > 0);
if size < 512 || size > 65536 || size & (size - 1) != 0 {
return Err(LimboError::NotADB.into());
@@ -73,17 +72,17 @@ impl PageIO for FileStorage {
Ok(())
}
fn write(
&self,
page_idx: usize,
buffer: Rc<RefCell<Buffer>>,
c: Rc<WriteCompletion>,
) -> Result<()> {
fn write(&self, page_idx: usize, buffer: Rc<RefCell<Buffer>>, c: Rc<Completion>) -> Result<()> {
let w = match &(*c) {
Completion::Read(_) => unreachable!(),
Completion::Write(w) => w,
};
let buffer_size = buffer.borrow().len();
assert!(buffer_size >= 512);
assert!(buffer_size <= 65536);
assert!((buffer_size & (buffer_size - 1)) == 0);
self.file.pwrite(page_idx, buffer, c)?;
let pos = (page_idx - 1) * buffer_size;
self.file.pwrite(pos, buffer, c)?;
Ok(())
}
}