Merge 'btree: cursor with lineal stack structure' from Pere Diaz Bou

Removed MemPage from the code in favor of an array to encode the stack
of the cursor. This is both simpler and better in terms of memory
access.
O_DIRECT was removed from WAL file as it introduces alignment
constraints that are too hard to follow in regular appends. Maybe in the
future?

Closes #378
This commit is contained in:
Pere Diaz Bou
2024-11-13 18:23:31 +01:00
18 changed files with 650 additions and 324 deletions

View File

@@ -25,12 +25,15 @@ jobs:
runs-on: ${{ matrix.os }}
steps:
- uses: Swatinem/rust-cache@v2
- uses: actions/checkout@v3
- uses: Swatinem/rust-cache@v2
- name: Build
run: cargo build --verbose
- name: Test
env:
RUST_LOG: ${{ runner.debug && 'limbo_core::storage=trace' || '' }}
run: cargo test --verbose
timeout-minutes: 5
build-wasm:
runs-on: ubuntu-latest

View File

@@ -8,7 +8,7 @@ mypy==1.11.0
# via limbo (pyproject.toml)
mypy-extensions==1.0.0
# via mypy
packaging==24.1
packaging==24.2
# via pytest
pluggy==1.5.0
# via pytest

View File

@@ -16,7 +16,9 @@ impl Database {
#[wasm_bindgen(constructor)]
pub fn new(path: &str) -> Database {
let io = Arc::new(PlatformIO { vfs: VFS::new() });
let file = io.open_file(path, limbo_core::OpenFlags::None).unwrap();
let file = io
.open_file(path, limbo_core::OpenFlags::None, false)
.unwrap();
let page_io = Rc::new(DatabaseStorage::new(file));
let wal = Rc::new(RefCell::new(Wal {}));
let db = limbo_core::Database::open(io, page_io, wal).unwrap();
@@ -124,7 +126,12 @@ pub struct PlatformIO {
}
impl limbo_core::IO for PlatformIO {
fn open_file(&self, path: &str, _flags: OpenFlags) -> Result<Rc<dyn limbo_core::File>> {
fn open_file(
&self,
path: &str,
_flags: OpenFlags,
_direct: bool,
) -> Result<Rc<dyn limbo_core::File>> {
let fd = self.vfs.open(path);
Ok(Rc::new(File {
vfs: VFS::new(),

View File

@@ -13,7 +13,7 @@ pub mod tests {
// Parent process opens the file
let io1 = create_io().expect("Failed to create IO");
let _file1 = io1
.open_file(&path, crate::io::OpenFlags::None)
.open_file(&path, crate::io::OpenFlags::None, false)
.expect("Failed to open file in parent process");
let current_exe = std::env::current_exe().expect("Failed to get current executable path");
@@ -38,7 +38,7 @@ pub mod tests {
if std::env::var("RUST_TEST_CHILD_PROCESS").is_ok() {
let path = std::env::var("RUST_TEST_FILE_PATH")?;
let io = create_io()?;
match io.open_file(&path, crate::io::OpenFlags::None) {
match io.open_file(&path, crate::io::OpenFlags::None, false) {
Ok(_) => std::process::exit(0),
Err(_) => std::process::exit(1),
}

View File

@@ -31,7 +31,7 @@ impl DarwinIO {
}
impl IO for DarwinIO {
fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn File>> {
fn open_file(&self, path: &str, flags: OpenFlags, _direct: bool) -> Result<Rc<dyn File>> {
trace!("open_file(path = {})", path);
let file = std::fs::File::options()
.read(true)

View File

@@ -13,7 +13,7 @@ impl GenericIO {
}
impl IO for GenericIO {
fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn File>> {
fn open_file(&self, path: &str, flags: OpenFlags, _direct: bool) -> Result<Rc<dyn File>> {
trace!("open_file(path = {})", path);
let file = std::fs::File::open(path)?;
Ok(Rc::new(GenericFile {

View File

@@ -4,8 +4,8 @@ use libc::{c_short, fcntl, flock, iovec, F_SETLK};
use log::{debug, trace};
use nix::fcntl::{FcntlArg, OFlag};
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt;
use std::os::unix::fs::MetadataExt;
use std::os::unix::io::AsRawFd;
use std::rc::Rc;
use thiserror::Error;
@@ -37,6 +37,8 @@ pub struct LinuxIO {
struct WrappedIOUring {
ring: io_uring::IoUring,
pending_ops: usize,
pub pending: HashMap<u64, Rc<Completion>>,
key: u64,
}
struct InnerLinuxIO {
@@ -52,6 +54,8 @@ impl LinuxIO {
ring: WrappedIOUring {
ring,
pending_ops: 0,
pending: HashMap::new(),
key: 0,
},
iovecs: [iovec {
iov_base: std::ptr::null_mut(),
@@ -76,7 +80,9 @@ impl InnerLinuxIO {
}
impl WrappedIOUring {
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) {
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Rc<Completion>) {
log::trace!("submit_entry({:?})", entry);
self.pending.insert(entry.get_user_data(), c);
unsafe {
self.ring
.submission()
@@ -95,6 +101,7 @@ impl WrappedIOUring {
// NOTE: This works because CompletionQueue's next function pops the head of the queue. This is not normal behaviour of iterators
let entry = self.ring.completion().next();
if entry.is_some() {
log::trace!("get_completion({:?})", entry);
// consumed an entry from completion queue, update pending_ops
self.pending_ops -= 1;
}
@@ -104,10 +111,15 @@ impl WrappedIOUring {
fn empty(&self) -> bool {
self.pending_ops == 0
}
fn get_key(&mut self) -> u64 {
self.key += 1;
self.key
}
}
impl IO for LinuxIO {
fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn File>> {
fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result<Rc<dyn File>> {
trace!("open_file(path = {})", path);
let file = std::fs::File::options()
.read(true)
@@ -117,10 +129,12 @@ impl IO for LinuxIO {
// Let's attempt to enable direct I/O. Not all filesystems support it
// so ignore any errors.
let fd = file.as_raw_fd();
match nix::fcntl::fcntl(fd, FcntlArg::F_SETFL(OFlag::O_DIRECT)) {
Ok(_) => {},
Err(error) => debug!("Error {error:?} returned when setting O_DIRECT flag to read file. The performance of the system may be affected"),
};
if direct {
match nix::fcntl::fcntl(fd, FcntlArg::F_SETFL(OFlag::O_DIRECT)) {
Ok(_) => {},
Err(error) => debug!("Error {error:?} returned when setting O_DIRECT flag to read file. The performance of the system may be affected"),
};
}
let linux_file = Rc::new(LinuxFile {
io: self.inner.clone(),
file,
@@ -145,12 +159,16 @@ impl IO for LinuxIO {
let result = cqe.result();
if result < 0 {
return Err(LimboError::LinuxIOError(format!(
"{}",
LinuxIOError::IOUringCQError(result)
"{} cqe: {:?}",
LinuxIOError::IOUringCQError(result),
cqe
)));
}
let c = unsafe { Rc::from_raw(cqe.user_data() as *const Completion) };
c.complete(cqe.result());
{
let c = ring.pending.get(&cqe.user_data()).unwrap().clone();
c.complete(cqe.result());
}
ring.pending.remove(&cqe.user_data());
}
Ok(())
}
@@ -234,14 +252,13 @@ impl File for LinuxFile {
let mut buf = r.buf_mut();
let len = buf.len();
let buf = buf.as_mut_ptr();
let ptr = Rc::into_raw(c.clone());
let iovec = io.get_iovec(buf, len);
io_uring::opcode::Readv::new(fd, iovec, 1)
.offset(pos as u64)
.build()
.user_data(ptr as u64)
.user_data(io.ring.get_key())
};
io.ring.submit_entry(&read_e);
io.ring.submit_entry(&read_e, c);
Ok(())
}
@@ -255,25 +272,25 @@ impl File for LinuxFile {
let fd = io_uring::types::Fd(self.file.as_raw_fd());
let write = {
let buf = buffer.borrow();
let ptr = Rc::into_raw(c.clone());
trace!("pwrite(pos = {}, length = {})", pos, buf.len());
let iovec = io.get_iovec(buf.as_ptr(), buf.len());
io_uring::opcode::Writev::new(fd, iovec, 1)
.offset(pos as u64)
.build()
.user_data(ptr as u64)
.user_data(io.ring.get_key())
};
io.ring.submit_entry(&write);
io.ring.submit_entry(&write, c);
Ok(())
}
fn sync(&self, c: Rc<Completion>) -> Result<()> {
let fd = io_uring::types::Fd(self.file.as_raw_fd());
let ptr = Rc::into_raw(c.clone());
let mut io = self.io.borrow_mut();
trace!("sync()");
let sync = io_uring::opcode::Fsync::new(fd)
.build()
.user_data(ptr as u64);
let mut io = self.io.borrow_mut();
io.ring.submit_entry(&sync);
.user_data(io.ring.get_key());
io.ring.submit_entry(&sync, c);
Ok(())
}

View File

@@ -24,7 +24,7 @@ pub enum OpenFlags {
}
pub trait IO {
fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn File>>;
fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result<Rc<dyn File>>;
fn run_once(&self) -> Result<()>;

View File

@@ -13,7 +13,7 @@ impl WindowsIO {
}
impl IO for WindowsIO {
fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn File>> {
fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result<Rc<dyn File>> {
trace!("open_file(path = {})", path);
let file = std::fs::File::options()
.read(true)

View File

@@ -65,7 +65,7 @@ pub struct Database {
impl Database {
#[cfg(feature = "fs")]
pub fn open_file(io: Arc<dyn IO>, path: &str) -> Result<Rc<Database>> {
let file = io.open_file(path, io::OpenFlags::None)?;
let file = io.open_file(path, io::OpenFlags::None, true)?;
let page_io = Rc::new(FileStorage::new(file));
let wal_path = format!("{}-wal", path);
let db_header = Pager::begin_open(page_io.clone())?;
@@ -283,6 +283,27 @@ impl Connection {
self.pager.clear_page_cache();
Ok(())
}
pub fn checkpoint(&self) -> Result<()> {
self.pager.clear_page_cache();
Ok(())
}
}
impl Drop for Connection {
fn drop(&mut self) {
loop {
// TODO: make this async?
match self.pager.checkpoint().unwrap() {
CheckpointStatus::Done => {
return;
}
CheckpointStatus::IO => {
self.pager.io.run_once().unwrap();
}
};
}
}
}
pub struct Statement {

View File

@@ -24,31 +24,16 @@ const BTREE_HEADER_OFFSET_CELL_CONTENT: usize = 5; /* pointer to first byte of c
const BTREE_HEADER_OFFSET_FRAGMENTED: usize = 7; /* number of fragmented bytes -> u8 */
const BTREE_HEADER_OFFSET_RIGHTMOST: usize = 8; /* if internalnode, pointer right most pointer (saved separately from cells) -> u32 */
#[derive(Debug)]
pub struct MemPage {
parent: Option<Rc<MemPage>>,
page_idx: usize,
cell_idx: RefCell<usize>,
}
impl MemPage {
pub fn new(parent: Option<Rc<MemPage>>, page_idx: usize, cell_idx: usize) -> Self {
Self {
parent,
page_idx,
cell_idx: RefCell::new(cell_idx),
}
}
pub fn cell_idx(&self) -> usize {
*self.cell_idx.borrow()
}
pub fn advance(&self) {
let mut cell_idx = self.cell_idx.borrow_mut();
*cell_idx += 1;
}
}
/*
** Maximum depth of an SQLite B-Tree structure. Any B-Tree deeper than
** this will be declared corrupt. This value is calculated based on a
** maximum database size of 2^31 pages a minimum fanout of 2 for a
** root-node and 3 for all other internal nodes.
**
** If a tree that appears to be taller than this is encountered, it is
** assumed that the database is corrupt.
*/
pub const BTCURSOR_MAX_DEPTH: usize = 20;
#[derive(Debug)]
enum WriteState {
@@ -61,9 +46,7 @@ enum WriteState {
struct WriteInfo {
state: WriteState,
current_page: RefCell<Option<(Rc<MemPage>, Rc<RefCell<Page>>)>>,
parent_page: RefCell<Option<(Rc<MemPage>, Rc<RefCell<Page>>)>>,
new_pages: RefCell<Vec<(Rc<MemPage>, Rc<RefCell<Page>>)>>,
new_pages: RefCell<Vec<Rc<RefCell<Page>>>>,
scratch_cells: RefCell<Vec<&'static [u8]>>,
rightmost_pointer: RefCell<Option<u32>>,
page_copy: RefCell<Option<PageContent>>, // this holds the copy a of a page needed for buffer references
@@ -71,14 +54,38 @@ struct WriteInfo {
pub struct BTreeCursor {
pager: Rc<Pager>,
/// Page id of the root page used to go back up fast.
root_page: usize,
page: RefCell<Option<Rc<MemPage>>>,
/// Rowid and record are stored before being consumed.
rowid: RefCell<Option<u64>>,
record: RefCell<Option<OwnedRecord>>,
null_flag: bool,
database_header: Rc<RefCell<DatabaseHeader>>,
/// Index internal pages are consumed on the way up, so we store going upwards flag in case
/// we just moved to a parent page and the parent page is an internal index page which requires
/// to be consumed.
going_upwards: bool,
/// Write information kept in case of write yields due to I/O. Needs to be stored somewhere
/// right :).
write_info: WriteInfo,
/// Page stack used to traverse the btree.
/// Each cursor has a stack because each cursor traverses the btree independently.
stack: PageStack,
}
/// Stack of pages representing the tree traversal order.
/// current_page represents the current page being used in the tree and current_page - 1 would be
/// the parent. Using current_page + 1 or higher is undefined behaviour.
struct PageStack {
/// Pointer to the currenet page being consumed
current_page: RefCell<i32>,
/// List of pages in the stack. Root page will be in index 0
stack: RefCell<[Option<Rc<RefCell<Page>>>; BTCURSOR_MAX_DEPTH + 1]>,
/// List of cell indices in the stack.
/// cell_indices[current_page] is the current cell index being consumed. Similarly
/// cell_indices[current_page-1] is the cell index of the parent of the current page
/// that we save in case of going back up.
cell_indices: RefCell<[usize; BTCURSOR_MAX_DEPTH + 1]>,
}
impl BTreeCursor {
@@ -90,7 +97,6 @@ impl BTreeCursor {
Self {
pager,
root_page,
page: RefCell::new(None),
rowid: RefCell::new(None),
record: RefCell::new(None),
null_flag: false,
@@ -98,13 +104,16 @@ impl BTreeCursor {
going_upwards: false,
write_info: WriteInfo {
state: WriteState::Start,
current_page: RefCell::new(None),
parent_page: RefCell::new(None),
new_pages: RefCell::new(Vec::with_capacity(4)),
scratch_cells: RefCell::new(Vec::new()),
rightmost_pointer: RefCell::new(None),
page_copy: RefCell::new(None),
},
stack: PageStack {
current_page: RefCell::new(-1),
cell_indices: RefCell::new([0; BTCURSOR_MAX_DEPTH + 1]),
stack: RefCell::new([const { None }; BTCURSOR_MAX_DEPTH + 1]),
},
}
}
@@ -125,38 +134,61 @@ impl BTreeCursor {
predicate: Option<(SeekKey<'_>, SeekOp)>,
) -> Result<CursorResult<(Option<u64>, Option<OwnedRecord>)>> {
loop {
let mem_page = self.get_mem_page();
let page_idx = mem_page.page_idx;
let page = self.pager.read_page(page_idx)?;
let page = RefCell::borrow(&page);
if page.is_locked() {
let mem_page_rc = self.stack.top();
let cell_idx = self.stack.current_index();
debug!("current id={} cell={}", mem_page_rc.borrow().id, cell_idx);
if mem_page_rc.borrow().is_locked() {
return Ok(CursorResult::IO);
}
let page = page.contents.read().unwrap();
if !mem_page_rc.borrow().is_loaded() {
self.pager.load_page(mem_page_rc.clone())?;
return Ok(CursorResult::IO);
}
let mem_page = mem_page_rc.borrow();
let page = mem_page.contents.read().unwrap();
let page = page.as_ref().unwrap();
if mem_page.cell_idx() >= page.cell_count() {
let parent = mem_page.parent.clone();
if cell_idx == page.cell_count() {
// do rightmost
let has_parent = self.stack.has_parent();
match page.rightmost_pointer() {
Some(right_most_pointer) => {
let mem_page = MemPage::new(parent.clone(), right_most_pointer as usize, 0);
self.page.replace(Some(Rc::new(mem_page)));
self.stack.advance();
let mem_page = self.pager.read_page(right_most_pointer as usize)?;
self.stack.push(mem_page);
continue;
}
None => match parent {
Some(ref parent) => {
None => {
if has_parent {
debug!("moving simple upwards");
self.going_upwards = true;
self.page.replace(Some(parent.clone()));
self.stack.pop();
continue;
}
None => {
} else {
return Ok(CursorResult::Ok((None, None)));
}
},
}
}
}
if cell_idx >= page.cell_count() + 1 {
// end
let has_parent = self.stack.current() > 0;
if has_parent {
debug!("moving upwards");
self.going_upwards = true;
self.stack.pop();
continue;
} else {
return Ok(CursorResult::Ok((None, None)));
}
}
assert!(cell_idx < page.cell_count());
let cell = page.cell_get(
mem_page.cell_idx(),
cell_idx,
self.pager.clone(),
self.max_local(page.page_type()),
self.min_local(page.page_type()),
@@ -168,10 +200,9 @@ impl BTreeCursor {
_rowid,
}) => {
assert!(predicate.is_none());
mem_page.advance();
let mem_page =
MemPage::new(Some(mem_page.clone()), *_left_child_page as usize, 0);
self.page.replace(Some(Rc::new(mem_page)));
self.stack.advance();
let mem_page = self.pager.read_page(*_left_child_page as usize)?;
self.stack.push(mem_page);
continue;
}
BTreeCell::TableLeafCell(TableLeafCell {
@@ -180,7 +211,7 @@ impl BTreeCursor {
first_overflow_page: _,
}) => {
assert!(predicate.is_none());
mem_page.advance();
self.stack.advance();
let record = crate::storage::sqlite3_ondisk::read_record(_payload)?;
return Ok(CursorResult::Ok((Some(*_rowid), Some(record))));
}
@@ -190,14 +221,13 @@ impl BTreeCursor {
..
}) => {
if !self.going_upwards {
let mem_page =
MemPage::new(Some(mem_page.clone()), *left_child_page as usize, 0);
self.page.replace(Some(Rc::new(mem_page)));
let mem_page = self.pager.read_page(*left_child_page as usize)?;
self.stack.push(mem_page);
continue;
}
self.going_upwards = false;
mem_page.advance();
self.stack.advance();
let record = crate::storage::sqlite3_ondisk::read_record(payload)?;
if predicate.is_none() {
@@ -228,7 +258,7 @@ impl BTreeCursor {
}
}
BTreeCell::IndexLeafCell(IndexLeafCell { payload, .. }) => {
mem_page.advance();
self.stack.advance();
let record = crate::storage::sqlite3_ondisk::read_record(payload)?;
if predicate.is_none() {
let rowid = match record.values.last() {
@@ -270,66 +300,66 @@ impl BTreeCursor {
CursorResult::IO => return Ok(CursorResult::IO),
};
let mem_page = self.get_mem_page();
let page_idx = mem_page.page_idx;
let page = self.pager.read_page(page_idx)?;
let page = RefCell::borrow(&page);
if page.is_locked() {
return Ok(CursorResult::IO);
}
{
let page_rc = self.stack.top();
let page = page_rc.borrow();
if page.is_locked() {
return Ok(CursorResult::IO);
}
let page = page.contents.read().unwrap();
let page = page.as_ref().unwrap();
let contents = page.contents.read().unwrap();
let contents = contents.as_ref().unwrap();
for cell_idx in 0..page.cell_count() {
let cell = page.cell_get(
cell_idx,
self.pager.clone(),
self.max_local(page.page_type()),
self.min_local(page.page_type()),
self.usable_space(),
)?;
match &cell {
BTreeCell::TableLeafCell(TableLeafCell {
_rowid: cell_rowid,
_payload: payload,
first_overflow_page: _,
}) => {
let SeekKey::TableRowId(rowid_key) = key else {
unreachable!("table seek key should be a rowid");
};
mem_page.advance();
let found = match op {
SeekOp::GT => *cell_rowid > rowid_key,
SeekOp::GE => *cell_rowid >= rowid_key,
SeekOp::EQ => *cell_rowid == rowid_key,
};
if found {
let record = crate::storage::sqlite3_ondisk::read_record(payload)?;
return Ok(CursorResult::Ok((Some(*cell_rowid), Some(record))));
}
}
BTreeCell::IndexLeafCell(IndexLeafCell { payload, .. }) => {
let SeekKey::IndexKey(index_key) = key else {
unreachable!("index seek key should be a record");
};
mem_page.advance();
let record = crate::storage::sqlite3_ondisk::read_record(payload)?;
let found = match op {
SeekOp::GT => record > *index_key,
SeekOp::GE => record >= *index_key,
SeekOp::EQ => record == *index_key,
};
if found {
let rowid = match record.values.last() {
Some(OwnedValue::Integer(rowid)) => *rowid as u64,
_ => unreachable!("index cells should have an integer rowid"),
for cell_idx in 0..contents.cell_count() {
let cell = contents.cell_get(
cell_idx,
self.pager.clone(),
self.max_local(contents.page_type()),
self.min_local(contents.page_type()),
self.usable_space(),
)?;
match &cell {
BTreeCell::TableLeafCell(TableLeafCell {
_rowid: cell_rowid,
_payload: payload,
first_overflow_page: _,
}) => {
let SeekKey::TableRowId(rowid_key) = key else {
unreachable!("table seek key should be a rowid");
};
return Ok(CursorResult::Ok((Some(rowid), Some(record))));
let found = match op {
SeekOp::GT => *cell_rowid > rowid_key,
SeekOp::GE => *cell_rowid >= rowid_key,
SeekOp::EQ => *cell_rowid == rowid_key,
};
self.stack.advance();
if found {
let record = crate::storage::sqlite3_ondisk::read_record(payload)?;
return Ok(CursorResult::Ok((Some(*cell_rowid), Some(record))));
}
}
BTreeCell::IndexLeafCell(IndexLeafCell { payload, .. }) => {
let SeekKey::IndexKey(index_key) = key else {
unreachable!("index seek key should be a record");
};
let record = crate::storage::sqlite3_ondisk::read_record(payload)?;
let found = match op {
SeekOp::GT => record > *index_key,
SeekOp::GE => record >= *index_key,
SeekOp::EQ => record == *index_key,
};
self.stack.advance();
if found {
let rowid = match record.values.last() {
Some(OwnedValue::Integer(rowid)) => *rowid as u64,
_ => unreachable!("index cells should have an integer rowid"),
};
return Ok(CursorResult::Ok((Some(rowid), Some(record))));
}
}
cell_type => {
unreachable!("unexpected cell type: {:?}", cell_type);
}
}
cell_type => {
unreachable!("unexpected cell type: {:?}", cell_type);
}
}
}
@@ -358,16 +388,17 @@ impl BTreeCursor {
}
fn move_to_root(&mut self) {
self.page
.replace(Some(Rc::new(MemPage::new(None, self.root_page, 0))));
let mem_page = self.pager.read_page(self.root_page).unwrap();
self.stack.clear();
self.stack.push(mem_page);
}
fn move_to_rightmost(&mut self) -> Result<CursorResult<()>> {
self.move_to_root();
loop {
let mem_page = self.page.borrow().as_ref().unwrap().clone();
let page_idx = mem_page.page_idx;
let mem_page = self.stack.top();
let page_idx = mem_page.borrow().id;
let page = self.pager.read_page(page_idx)?;
let page = RefCell::borrow(&page);
if page.is_locked() {
@@ -377,17 +408,16 @@ impl BTreeCursor {
let page = page.as_ref().unwrap();
if page.is_leaf() {
if page.cell_count() > 0 {
mem_page.cell_idx.replace(page.cell_count() - 1);
self.stack.set_cell_index(page.cell_count() - 1);
}
return Ok(CursorResult::Ok(()));
}
match page.rightmost_pointer() {
Some(right_most_pointer) => {
mem_page.cell_idx.replace(page.cell_count());
let mem_page =
MemPage::new(Some(mem_page.clone()), right_most_pointer as usize, 0);
self.page.replace(Some(Rc::new(mem_page)));
self.stack.set_cell_index(page.cell_count() + 1);
let mem_page = self.pager.read_page(right_most_pointer as usize).unwrap();
self.stack.push(mem_page);
continue;
}
@@ -425,27 +455,25 @@ impl BTreeCursor {
self.move_to_root();
loop {
let mem_page = self.get_mem_page();
let page_idx = mem_page.page_idx;
let page = self.pager.read_page(page_idx)?;
let page = RefCell::borrow(&page);
let page_rc = self.stack.top();
let page = RefCell::borrow(&page_rc);
if page.is_locked() {
return Ok(CursorResult::IO);
}
let page = page.contents.read().unwrap();
let page = page.as_ref().unwrap();
if page.is_leaf() {
let contents = page.contents.read().unwrap();
let contents = contents.as_ref().unwrap();
if contents.is_leaf() {
return Ok(CursorResult::Ok(()));
}
let mut found_cell = false;
for cell_idx in 0..page.cell_count() {
match &page.cell_get(
for cell_idx in 0..contents.cell_count() {
match &contents.cell_get(
cell_idx,
self.pager.clone(),
self.max_local(page.page_type()),
self.min_local(page.page_type()),
self.max_local(contents.page_type()),
self.min_local(contents.page_type()),
self.usable_space(),
)? {
BTreeCell::TableInteriorCell(TableInteriorCell {
@@ -455,16 +483,15 @@ impl BTreeCursor {
let SeekKey::TableRowId(rowid_key) = key else {
unreachable!("table seek key should be a rowid");
};
mem_page.advance();
let target_leaf_page_is_in_left_subtree = match cmp {
SeekOp::GT => rowid_key < *_rowid,
SeekOp::GE => rowid_key <= *_rowid,
SeekOp::EQ => rowid_key <= *_rowid,
};
self.stack.advance();
if target_leaf_page_is_in_left_subtree {
let mem_page =
MemPage::new(Some(mem_page.clone()), *_left_child_page as usize, 0);
self.page.replace(Some(Rc::new(mem_page)));
let mem_page = self.pager.read_page(*_left_child_page as usize)?;
self.stack.push(mem_page);
found_cell = true;
break;
}
@@ -493,13 +520,13 @@ impl BTreeCursor {
SeekOp::EQ => index_key <= &record,
};
if target_leaf_page_is_in_the_left_subtree {
let mem_page =
MemPage::new(Some(mem_page.clone()), *left_child_page as usize, 0);
self.page.replace(Some(Rc::new(mem_page)));
// we don't advance in case of index tree internal nodes because we will visit this node going up
let mem_page = self.pager.read_page(*left_child_page as usize).unwrap();
self.stack.push(mem_page);
found_cell = true;
break;
} else {
mem_page.advance();
self.stack.advance();
}
}
BTreeCell::IndexLeafCell(_) => {
@@ -511,11 +538,11 @@ impl BTreeCursor {
}
if !found_cell {
let parent = mem_page.parent.clone();
match page.rightmost_pointer() {
match contents.rightmost_pointer() {
Some(right_most_pointer) => {
let mem_page = MemPage::new(parent, right_most_pointer as usize, 0);
self.page.replace(Some(Rc::new(mem_page)));
self.stack.advance();
let mem_page = self.pager.read_page(right_most_pointer as usize).unwrap();
self.stack.push(mem_page);
continue;
}
None => {
@@ -526,7 +553,7 @@ impl BTreeCursor {
}
}
fn insert_to_page(
fn insert_into_page(
&mut self,
key: &OwnedValue,
record: &OwnedRecord,
@@ -535,7 +562,7 @@ impl BTreeCursor {
let state = &self.write_info.state;
match state {
WriteState::Start => {
let page_ref = self.get_current_page()?;
let page_ref = self.stack.top();
let int_key = match key {
OwnedValue::Integer(i) => *i as u64,
_ => unreachable!("btree tables are indexed by integers!"),
@@ -577,10 +604,6 @@ impl BTreeCursor {
};
if overflow > 0 {
self.write_info.state = WriteState::BalanceStart;
self.write_info.current_page.borrow_mut().replace((
self.page.borrow().as_ref().unwrap().clone(),
page_ref.clone(),
));
} else {
self.write_info.state = WriteState::Finish;
}
@@ -711,17 +734,6 @@ impl BTreeCursor {
page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1);
}
fn get_current_page(&mut self) -> crate::Result<Rc<RefCell<Page>>> {
let mem_page = {
let mem_page = self.page.borrow();
let mem_page = mem_page.as_ref().unwrap();
mem_page.clone()
};
let page_idx = mem_page.page_idx;
let page_ref = self.pager.read_page(page_idx)?;
Ok(page_ref)
}
/// This is a naive algorithm that doesn't try to distribute cells evenly by content.
/// It will try to split the page in half by keys not by content.
/// Sqlite tries to have a page at least 40% full.
@@ -729,12 +741,16 @@ impl BTreeCursor {
let state = &self.write_info.state;
match state {
WriteState::BalanceStart => {
let current_page = self.write_info.current_page.borrow();
let mem_page = &current_page.as_ref().unwrap().0;
// drop divider cells and find right pointer
// NOTE: since we are doing a simple split we only finding the pointer we want to update (right pointer).
// Right pointer means cell that points to the last page, as we don't really want to drop this one. This one
// can be a "rightmost pointer" or a "cell".
// TODO(pere): simplify locking...
// we always asumme there is a parent
let current_page = self.stack.top();
let page_rc = RefCell::borrow(&current_page);
{
// check if we don't need to balance
let page_ref = &current_page.as_ref().unwrap().1;
let page_rc = RefCell::borrow(&page_ref);
{
// don't continue if there are no overflow cells
@@ -747,15 +763,13 @@ impl BTreeCursor {
}
}
if mem_page.parent.is_none() {
if !self.stack.has_parent() {
drop(page_rc);
drop(current_page);
self.balance_root();
return Ok(CursorResult::Ok(()));
}
debug!("Balancing leaf. leaf={}", mem_page.page_idx);
let page_ref = &current_page.as_ref().unwrap().1;
let page_rc = RefCell::borrow(&page_ref);
debug!("Balancing leaf. leaf={}", page_rc.id);
// Copy of page used to reference cell bytes.
let page_copy = {
@@ -808,59 +822,48 @@ impl BTreeCursor {
self.write_info
.new_pages
.borrow_mut()
.push((mem_page.clone(), page_ref.clone()));
self.write_info.new_pages.borrow_mut().push((
Rc::new(MemPage::new(mem_page.parent.clone(), right_page_id, 0)),
right_page_ref.clone(),
));
.push(current_page.clone());
self.write_info
.new_pages
.borrow_mut()
.push(right_page_ref.clone());
let new_pages_ids = [mem_page.page_idx, right_page_id];
debug!(
"splitting left={} right={}",
new_pages_ids[0], new_pages_ids[1]
self.stack.current(),
right_page_id
);
// drop divider cells and find right pointer
// NOTE: since we are doing a simple split we only finding the pointer we want to update (right pointer).
// Right pointer means cell that points to the last page, as we don't really want to drop this one. This one
// can be a "rightmost pointer" or a "cell".
// TODO(pere): simplify locking...
// we always asumme there is a parent
self.write_info.state = WriteState::BalanceGetParentPage;
return Ok(CursorResult::Ok(()));
Ok(CursorResult::Ok(()))
}
WriteState::BalanceGetParentPage => {
let current_page = self.write_info.current_page.borrow();
let mem_page = &current_page.as_ref().unwrap().0;
let parent_rc = self.stack.parent();
let loaded = parent_rc.borrow().is_loaded();
let locked = parent_rc.borrow().is_locked();
let parent_rc = mem_page.parent.as_ref().unwrap();
let parent_ref = self.pager.read_page(parent_rc.page_idx)?;
if !RefCell::borrow(&parent_ref).is_locked() {
self.write_info.state = WriteState::BalanceMoveUp;
self.write_info
.parent_page
.borrow_mut()
.replace((parent_rc.clone(), parent_ref.clone()));
Ok(CursorResult::Ok(()))
} else {
if locked {
Ok(CursorResult::IO)
} else {
if !loaded {
debug!("balance_leaf(loading page {} {})", locked, loaded);
self.pager.load_page(parent_rc.clone())?;
return Ok(CursorResult::IO);
}
parent_rc.borrow_mut().set_dirty();
self.write_info.state = WriteState::BalanceMoveUp;
Ok(CursorResult::Ok(()))
}
}
WriteState::BalanceMoveUp => {
let parent = self.write_info.parent_page.borrow();
let parent_entry = parent.as_ref().unwrap();
let parent_ref = &parent_entry.1;
let parent_ref = self.stack.parent();
let parent = RefCell::borrow_mut(&parent_ref);
let (page_type, current_idx) = {
let current_page = self.write_info.current_page.borrow();
let pagerc = current_page.as_ref().unwrap();
let page = RefCell::borrow(&pagerc.1);
let page = page.contents.read().unwrap();
(
page.as_ref().unwrap().page_type().clone(),
pagerc.0.page_idx,
)
let current_page = self.stack.top();
let page_ref = current_page.borrow();
let page = page_ref.contents.read().unwrap();
(page.as_ref().unwrap().page_type().clone(), page_ref.id)
};
parent.set_dirty();
@@ -904,8 +907,9 @@ impl BTreeCursor {
let scratch_cells = self.write_info.scratch_cells.borrow();
// reset pages
for (_, page) in new_pages.iter() {
for page in new_pages.iter() {
let page = page.borrow_mut();
assert!(page.is_dirty());
let mut page = page.contents.write().unwrap();
let page = page.as_mut().unwrap();
@@ -927,7 +931,7 @@ impl BTreeCursor {
let mut current_cell_index = 0_usize;
let mut divider_cells_index = Vec::new(); /* index to scratch cells that will be used as dividers in order */
for (i, (_, page)) in new_pages.iter_mut().enumerate() {
for (i, page) in new_pages.iter_mut().enumerate() {
let page = page.borrow_mut();
let mut page = page.contents.write().unwrap();
let page = page.as_mut().unwrap();
@@ -949,15 +953,15 @@ impl BTreeCursor {
current_cell_index += cells_to_copy;
}
let is_leaf = {
let page = self.write_info.current_page.borrow();
let page = RefCell::borrow(&page.as_ref().unwrap().1);
let page = self.stack.top();
let page = page.borrow();
let page = page.contents.read().unwrap();
page.as_ref().unwrap().is_leaf()
};
// update rightmost pointer for each page if we are in interior page
if !is_leaf {
for (_, page) in new_pages.iter_mut().take(new_pages_len - 1) {
for page in new_pages.iter_mut().take(new_pages_len - 1) {
let page = page.borrow_mut();
let mut page = page.contents.write().unwrap();
let page = page.as_mut().unwrap();
@@ -981,7 +985,6 @@ impl BTreeCursor {
}
// last page right most pointer points to previous right most pointer before splitting
let last_page = new_pages.last().unwrap();
let last_page = &last_page.1;
let last_page = RefCell::borrow(&last_page);
let mut last_page = last_page.contents.write().unwrap();
let last_page = last_page.as_mut().unwrap();
@@ -993,22 +996,22 @@ impl BTreeCursor {
// insert dividers in parent
// we can consider dividers the first cell of each page starting from the second page
for (page_id_index, (mem_page, page)) in
for (page_id_index, page) in
new_pages.iter_mut().take(new_pages_len - 1).enumerate()
{
let page = page.borrow_mut();
let mut page = page.contents.write().unwrap();
let page = page.as_mut().unwrap();
assert!(page.cell_count() > 1);
let mut contents = page.contents.write().unwrap();
let contents = contents.as_mut().unwrap();
assert!(contents.cell_count() > 1);
let divider_cell_index = divider_cells_index[page_id_index];
let cell_payload = scratch_cells[divider_cell_index];
let cell = read_btree_cell(
cell_payload,
&page.page_type(),
&contents.page_type(),
0,
self.pager.clone(),
self.max_local(page.page_type()),
self.min_local(page.page_type()),
self.max_local(contents.page_type()),
self.min_local(contents.page_type()),
self.usable_space(),
)
.unwrap();
@@ -1020,7 +1023,7 @@ impl BTreeCursor {
_ => unreachable!(),
};
let mut divider_cell = Vec::new();
divider_cell.extend_from_slice(&(mem_page.page_idx as u32).to_be_bytes());
divider_cell.extend_from_slice(&(page.id as u32).to_be_bytes());
divider_cell.extend(std::iter::repeat(0).take(9));
let n = write_varint(&mut divider_cell.as_mut_slice()[4..], key);
divider_cell.truncate(4 + n);
@@ -1036,7 +1039,7 @@ impl BTreeCursor {
BTreeCell::TableInteriorCell(interior) => interior._rowid,
_ => unreachable!(),
};
let parent_cell_idx = self.find_cell(page, key);
let parent_cell_idx = self.find_cell(contents, key);
self.insert_into_cell(parent_contents, cell_payload, parent_cell_idx);
// self.drop_cell(*page, 0);
}
@@ -1044,15 +1047,12 @@ impl BTreeCursor {
{
// copy last page id to right pointer
let last_pointer = new_pages.last().unwrap().0.page_idx as u32;
let last_pointer = new_pages.last().unwrap().borrow().id as u32;
parent_contents.write_u32(right_pointer, last_pointer);
}
self.page = RefCell::new(Some(parent_entry.0.clone()));
self.write_info
.current_page
.replace(Some(parent_entry.clone()));
self.stack.pop();
self.write_info.state = WriteState::BalanceStart;
self.write_info.page_copy.replace(None);
let _ = self.write_info.page_copy.take();
Ok(CursorResult::Ok(()))
}
@@ -1079,11 +1079,10 @@ impl BTreeCursor {
/* swap splitted page buffer with new root buffer so we don't have to update page idx */
{
let (root_id, child_id, child) = {
let page = self.write_info.current_page.borrow();
let page_ref = &page.as_ref().unwrap().1;
let page_ref = self.stack.top();
let child = page_ref.clone();
let mut page_rc = RefCell::borrow_mut(page_ref);
let mut new_root_page = RefCell::borrow_mut(&new_root_page_ref);
let mut page_rc = page_ref.borrow_mut();
let mut new_root_page = new_root_page_ref.borrow_mut();
// Swap the entire Page structs
std::mem::swap(&mut page_rc.id, &mut new_root_page.id);
@@ -1093,19 +1092,16 @@ impl BTreeCursor {
(new_root_page.id, page_rc.id, child)
};
debug!("Balancing root. root={}, rightmost={}", root_id, child_id);
let root = new_root_page_ref.clone();
let parent = Some(Rc::new(MemPage::new(None, root_id, 0)));
let current_mem_page = Rc::new(MemPage::new(parent, child_id, 0));
self.page = RefCell::new(Some(current_mem_page.clone()));
self.root_page = root_id;
self.stack.clear();
self.stack.push(root.clone());
self.stack.push(child.clone());
self.write_info
.current_page
.replace(Some((current_mem_page, child.clone())));
debug!("Balancing root. root={}, rightmost={}", root_id, child_id);
self.pager.put_page(root_id, root);
self.pager.put_page(child_id, child);
self.pager.put_loaded_page(root_id, root);
self.pager.put_loaded_page(child_id, child);
}
}
@@ -1115,6 +1111,7 @@ impl BTreeCursor {
{
// setup btree page
let contents = RefCell::borrow(&page);
debug!("allocating page {}", contents.id);
let mut contents = contents.contents.write().unwrap();
let contents = contents.as_mut().unwrap();
let id = page_type as u8;
@@ -1336,12 +1333,6 @@ impl BTreeCursor {
nfree as u16
}
fn get_mem_page(&self) -> Rc<MemPage> {
let mem_page = self.page.borrow();
let mem_page = mem_page.as_ref().unwrap();
mem_page.clone()
}
fn fill_cell_payload(
&self,
page_type: PageType,
@@ -1482,6 +1473,84 @@ impl BTreeCursor {
}
}
impl PageStack {
fn push(&self, page: Rc<RefCell<Page>>) {
debug!(
"pagestack::push(current={}, new_page_id={})",
self.current_page.borrow(),
page.borrow().id
);
*self.current_page.borrow_mut() += 1;
let current = *self.current_page.borrow();
assert!(
current < BTCURSOR_MAX_DEPTH as i32,
"corrupted database, stack is bigger than expected"
);
self.stack.borrow_mut()[current as usize] = Some(page);
self.cell_indices.borrow_mut()[current as usize] = 0;
}
fn pop(&self) {
let current = *self.current_page.borrow();
debug!("pagestack::pop(current={})", current);
self.cell_indices.borrow_mut()[current as usize] = 0;
self.stack.borrow_mut()[current as usize] = None;
*self.current_page.borrow_mut() -= 1;
}
fn top(&self) -> Rc<RefCell<Page>> {
let current = *self.current_page.borrow();
let page = self.stack.borrow()[current as usize]
.as_ref()
.unwrap()
.clone();
debug!(
"pagestack::top(current={}, page_id={})",
current,
page.borrow().id
);
page
}
fn parent(&self) -> Rc<RefCell<Page>> {
let current = *self.current_page.borrow();
self.stack.borrow()[current as usize - 1]
.as_ref()
.unwrap()
.clone()
}
/// Current page pointer being used
fn current(&self) -> usize {
*self.current_page.borrow() as usize
}
/// Cell index of the current page
fn current_index(&self) -> usize {
let current = self.current();
self.cell_indices.borrow()[current]
}
/// Advance the current cell index of the current page to the next cell.
fn advance(&self) {
let current = self.current();
self.cell_indices.borrow_mut()[current] += 1;
}
fn set_cell_index(&self, idx: usize) {
let current = self.current();
self.cell_indices.borrow_mut()[current] = idx;
}
fn has_parent(&self) -> bool {
*self.current_page.borrow() > 0
}
fn clear(&self) {
*self.current_page.borrow_mut() = -1;
}
}
fn find_free_cell(page_ref: &PageContent, db_header: Ref<DatabaseHeader>, amount: usize) -> usize {
// NOTE: freelist is in ascending order of keys and pc
// unuse_space is reserved bytes at the end of page, therefore we must substract from maxpc
@@ -1534,8 +1603,8 @@ impl Cursor for BTreeCursor {
}
fn rewind(&mut self) -> Result<CursorResult<()>> {
let mem_page = MemPage::new(None, self.root_page, 0);
self.page.replace(Some(Rc::new(mem_page)));
self.move_to_root();
match self.get_next_record(None)? {
CursorResult::Ok((rowid, next)) => {
self.rowid.replace(rowid);
@@ -1598,7 +1667,7 @@ impl Cursor for BTreeCursor {
};
}
match self.insert_to_page(key, _record)? {
match self.insert_into_page(key, _record)? {
CursorResult::Ok(_) => Ok(CursorResult::Ok(())),
CursorResult::IO => Ok(CursorResult::IO),
}
@@ -1621,9 +1690,10 @@ impl Cursor for BTreeCursor {
CursorResult::Ok(_) => {}
CursorResult::IO => return Ok(CursorResult::IO),
};
let page_ref = self.get_current_page()?;
let page_ref = self.stack.top();
let page = RefCell::borrow(&page_ref);
if page.is_locked() {
// TODO(pere); request load
return Ok(CursorResult::IO);
}

View File

@@ -3,7 +3,7 @@ use crate::storage::database::DatabaseStorage;
use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent};
use crate::storage::wal::Wal;
use crate::{Buffer, Result};
use log::trace;
use log::{debug, trace};
use sieve_cache::SieveCache;
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
@@ -29,12 +29,8 @@ const PAGE_LOCKED: usize = 0b010;
const PAGE_ERROR: usize = 0b100;
/// Page is dirty. Flush needed.
const PAGE_DIRTY: usize = 0b1000;
impl Default for Page {
fn default() -> Self {
Self::new(0)
}
}
/// Page's contents are loaded in memory.
const PAGE_LOADED: usize = 0b10000;
impl Page {
pub fn new(id: usize) -> Page {
@@ -92,6 +88,19 @@ impl Page {
pub fn clear_dirty(&self) {
self.flags.fetch_and(!PAGE_DIRTY, Ordering::SeqCst);
}
pub fn is_loaded(&self) -> bool {
self.flags.load(Ordering::SeqCst) & PAGE_LOADED != 0
}
pub fn set_loaded(&self) {
self.flags.fetch_or(PAGE_LOADED, Ordering::SeqCst);
}
pub fn clear_loaded(&self) {
log::debug!("clear loaded {}", self.id);
self.flags.fetch_and(!PAGE_LOADED, Ordering::SeqCst);
}
}
#[allow(dead_code)]
@@ -125,8 +134,13 @@ impl DumbLruPageCache {
}
}
pub fn contains_key(&mut self, key: usize) -> bool {
self.map.borrow().contains_key(&key)
}
pub fn insert(&mut self, key: usize, value: Rc<RefCell<Page>>) {
self.delete(key);
debug!("cache_insert(key={})", key);
self._delete(key, false);
let mut entry = Box::new(PageCacheEntry {
key,
next: None,
@@ -144,6 +158,11 @@ impl DumbLruPageCache {
}
pub fn delete(&mut self, key: usize) {
self._delete(key, true)
}
pub fn _delete(&mut self, key: usize, clean_page: bool) {
debug!("cache_delete(key={}, clean={})", key, clean_page);
let ptr = self.map.borrow_mut().remove(&key);
if ptr.is_none() {
return;
@@ -151,7 +170,7 @@ impl DumbLruPageCache {
let mut ptr = ptr.unwrap();
{
let ptr = unsafe { ptr.as_mut() };
self.detach(ptr);
self.detach(ptr, clean_page);
}
unsafe { drop_in_place(ptr.as_ptr()) };
}
@@ -163,11 +182,12 @@ impl DumbLruPageCache {
}
pub fn get(&mut self, key: &usize) -> Option<Rc<RefCell<Page>>> {
debug!("cache_get(key={})", key);
let ptr = self.get_ptr(*key);
ptr?;
let ptr = unsafe { ptr.unwrap().as_mut() };
let page = ptr.page.clone();
self.detach(ptr);
//self.detach(ptr);
self.touch(ptr);
Some(page)
}
@@ -177,9 +197,18 @@ impl DumbLruPageCache {
todo!();
}
fn detach(&mut self, entry: &mut PageCacheEntry) {
fn detach(&mut self, entry: &mut PageCacheEntry, clean_page: bool) {
let mut current = entry.as_non_null();
if clean_page {
// evict buffer
let page = entry.page.borrow_mut();
page.clear_loaded();
debug!("cleaning up page {}", page.id);
let mut contents = page.contents.write().unwrap();
let _ = contents.as_mut().take();
}
let (next, prev) = unsafe {
let c = current.as_mut();
let next = c.next;
@@ -230,7 +259,7 @@ impl DumbLruPageCache {
// TODO: drop from another clean entry?
return;
}
self.detach(tail);
self.detach(tail, true);
}
fn clear(&mut self) {
@@ -275,6 +304,12 @@ enum FlushState {
WaitSyncDbFile,
}
#[derive(Clone, Debug)]
enum CheckpointState {
Checkpoint,
CheckpointDone,
}
/// This will keep track of the state of current cache flush in order to not repeat work
struct FlushInfo {
state: FlushState,
@@ -300,6 +335,8 @@ pub struct Pager {
db_header: Rc<RefCell<DatabaseHeader>>,
flush_info: RefCell<FlushInfo>,
checkpoint_state: RefCell<CheckpointState>,
checkpoint_inflight: Rc<RefCell<usize>>,
syncing: Rc<RefCell<bool>>,
}
@@ -333,6 +370,8 @@ impl Pager {
in_flight_writes: Rc::new(RefCell::new(0)),
}),
syncing: Rc::new(RefCell::new(false)),
checkpoint_state: RefCell::new(CheckpointState::Checkpoint),
checkpoint_inflight: Rc::new(RefCell::new(0)),
})
}
@@ -347,7 +386,10 @@ impl Pager {
}
pub fn end_tx(&self) -> Result<CheckpointStatus> {
self.cacheflush()?;
match self.cacheflush()? {
CheckpointStatus::Done => {}
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
};
self.wal.borrow().end_read_tx()?;
Ok(CheckpointStatus::Done)
}
@@ -357,6 +399,7 @@ impl Pager {
trace!("read_page(page_idx = {})", page_idx);
let mut page_cache = self.page_cache.borrow_mut();
if let Some(page) = page_cache.get(&page_idx) {
trace!("read_page(page_idx = {}) = cached", page_idx);
return Ok(page.clone());
}
let page = Rc::new(RefCell::new(Page::new(page_idx)));
@@ -369,6 +412,8 @@ impl Pager {
let page = page.borrow_mut();
page.set_uptodate();
}
// TODO(pere) ensure page is inserted, we should probably first insert to page cache
// and if successful, read frame or page
page_cache.insert(page_idx, page.clone());
return Ok(page);
}
@@ -378,10 +423,44 @@ impl Pager {
page.clone(),
page_idx,
)?;
// TODO(pere) ensure page is inserted
page_cache.insert(page_idx, page.clone());
Ok(page)
}
/// Loads pages if not loaded
pub fn load_page(&self, page: Rc<RefCell<Page>>) -> Result<()> {
let id = page.borrow().id;
trace!("load_page(page_idx = {})", id);
let mut page_cache = self.page_cache.borrow_mut();
page.borrow_mut().set_locked();
if let Some(frame_id) = self.wal.borrow().find_frame(id as u64)? {
self.wal
.borrow()
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
{
let page = page.borrow_mut();
page.set_uptodate();
}
// TODO(pere) ensure page is inserted
if !page_cache.contains_key(id) {
page_cache.insert(id, page.clone());
}
return Ok(());
}
sqlite3_ondisk::begin_read_page(
self.page_io.clone(),
self.buffer_pool.clone(),
page.clone(),
id,
)?;
// TODO(pere) ensure page is inserted
if !page_cache.contains_key(id) {
page_cache.insert(id, page.clone());
}
Ok(())
}
/// Writes the database header.
pub fn write_database_header(&self, header: &DatabaseHeader) {
sqlite3_ondisk::begin_write_database_header(header, self).expect("failed to write header");
@@ -405,8 +484,15 @@ impl Pager {
FlushState::Start => {
let db_size = self.db_header.borrow().database_size;
for page_id in self.dirty_pages.borrow().iter() {
debug!("appending frame {}", page_id);
let mut cache = self.page_cache.borrow_mut();
let page = cache.get(page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
{
let contents = page.borrow();
let contents = contents.contents.read().unwrap();
let contents = contents.as_ref().unwrap();
debug!("appending frame {} {:?}", page_id, contents.page_type());
}
self.wal.borrow_mut().append_frame(
page.clone(),
db_size,
@@ -419,12 +505,11 @@ impl Pager {
return Ok(CheckpointStatus::IO);
}
FlushState::Checkpoint => {
let in_flight = self.flush_info.borrow().in_flight_writes.clone();
match self.wal.borrow_mut().checkpoint(self, in_flight)? {
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
match self.checkpoint()? {
CheckpointStatus::Done => {
self.flush_info.borrow_mut().state = FlushState::CheckpointDone;
self.flush_info.borrow_mut().state = FlushState::SyncDbFile;
}
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
};
}
FlushState::CheckpointDone => {
@@ -467,6 +552,34 @@ impl Pager {
Ok(CheckpointStatus::Done)
}
pub fn checkpoint(&self) -> Result<CheckpointStatus> {
loop {
let state = self.checkpoint_state.borrow().clone();
log::trace!("checkpoint(state={:?})", state);
match state {
CheckpointState::Checkpoint => {
let in_flight = self.checkpoint_inflight.clone();
match self.wal.borrow_mut().checkpoint(self, in_flight)? {
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
CheckpointStatus::Done => {
self.checkpoint_state
.replace(CheckpointState::CheckpointDone);
}
};
}
CheckpointState::CheckpointDone => {
let in_flight = self.checkpoint_inflight.clone();
if *in_flight.borrow() > 0 {
return Ok(CheckpointStatus::IO);
} else {
self.checkpoint_state.replace(CheckpointState::Checkpoint);
return Ok(CheckpointStatus::Done);
}
}
}
}
}
// WARN: used for testing purposes
pub fn clear_page_cache(&self) {
loop {
@@ -539,9 +652,11 @@ impl Pager {
Ok(page_ref)
}
pub fn put_page(&self, id: usize, page: Rc<RefCell<Page>>) {
pub fn put_loaded_page(&self, id: usize, page: Rc<RefCell<Page>>) {
let mut cache = RefCell::borrow_mut(&self.page_cache);
cache.insert(id, page);
// cache insert invalidates previous page
cache.insert(id, page.clone());
page.borrow_mut().set_loaded();
}
pub fn usable_size(&self) -> usize {

View File

@@ -48,7 +48,7 @@ use crate::storage::database::DatabaseStorage;
use crate::storage::pager::{Page, Pager};
use crate::types::{OwnedRecord, OwnedValue};
use crate::{File, Result};
use log::trace;
use log::{debug, trace};
use std::cell::RefCell;
use std::pin::Pin;
use std::rc::Rc;
@@ -525,6 +525,7 @@ fn finish_read_page(
page.contents.write().unwrap().replace(inner);
page.set_uptodate();
page.clear_locked();
page.set_loaded();
}
Ok(())
}
@@ -538,6 +539,7 @@ pub fn begin_write_btree_page(
let page_finish = page.clone();
let page_id = page.borrow().id;
log::trace!("begin_write_btree_page(page_id={})", page_id);
let buffer = {
let page = page.borrow();
let contents = page.contents.read().unwrap();
@@ -549,6 +551,7 @@ pub fn begin_write_btree_page(
let write_complete = {
let buf_copy = buffer.clone();
Box::new(move |bytes_written: i32| {
log::trace!("finish_write_btree_page");
let buf_copy = buf_copy.clone();
let buf_len = buf_copy.borrow().len();
*write_counter.borrow_mut() -= 1;
@@ -959,7 +962,7 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec<u8>) {
pub fn begin_read_wal_header(io: &Rc<dyn File>) -> Result<Rc<RefCell<WalHeader>>> {
let drop_fn = Rc::new(|_buf| {});
let buf = Rc::new(RefCell::new(Buffer::allocate(WAL_HEADER_SIZE, drop_fn)));
let buf = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn)));
let result = Rc::new(RefCell::new(WalHeader::default()));
let header = result.clone();
let complete = Box::new(move |buf: Rc<RefCell<Buffer>>| {
@@ -1017,6 +1020,7 @@ pub fn begin_write_wal_frame(
) -> Result<()> {
let page_finish = page.clone();
let page_id = page.borrow().id;
trace!("begin_write_wal_frame(offset={}, page={})", offset, page_id);
let header = WalFrameHeader {
page_number: page_id as u32,
@@ -1038,12 +1042,12 @@ pub fn begin_write_wal_frame(
);
let buf = buffer.as_mut_slice();
buf[0..4].copy_from_slice(&header.page_number.to_ne_bytes());
buf[4..8].copy_from_slice(&header.db_size.to_ne_bytes());
buf[8..12].copy_from_slice(&header.salt_1.to_ne_bytes());
buf[12..16].copy_from_slice(&header.salt_2.to_ne_bytes());
buf[16..20].copy_from_slice(&header.checksum_1.to_ne_bytes());
buf[20..24].copy_from_slice(&header.checksum_2.to_ne_bytes());
buf[0..4].copy_from_slice(&header.page_number.to_be_bytes());
buf[4..8].copy_from_slice(&header.db_size.to_be_bytes());
buf[8..12].copy_from_slice(&header.salt_1.to_be_bytes());
buf[12..16].copy_from_slice(&header.salt_2.to_be_bytes());
buf[16..20].copy_from_slice(&header.checksum_1.to_be_bytes());
buf[20..24].copy_from_slice(&header.checksum_2.to_be_bytes());
buf[WAL_FRAME_HEADER_SIZE..].copy_from_slice(&contents.as_ptr());
Rc::new(RefCell::new(buffer))
@@ -1052,6 +1056,7 @@ pub fn begin_write_wal_frame(
*write_counter.borrow_mut() += 1;
let write_complete = {
let buf_copy = buffer.clone();
log::info!("finished");
Box::new(move |bytes_written: i32| {
let buf_copy = buf_copy.clone();
let buf_len = buf_copy.borrow().len();
@@ -1072,7 +1077,7 @@ pub fn begin_write_wal_header(io: &Rc<dyn File>, header: &WalHeader) -> Result<(
let buffer = {
let drop_fn = Rc::new(|_buf| {});
let mut buffer = Buffer::allocate(WAL_HEADER_SIZE, drop_fn);
let mut buffer = Buffer::allocate(512, drop_fn);
let buf = buffer.as_mut_slice();
buf[0..4].copy_from_slice(&header.magic);

View File

@@ -1,6 +1,8 @@
use std::collections::{HashMap, HashSet};
use std::{cell::RefCell, rc::Rc, sync::Arc};
use log::{debug, trace};
use crate::io::{File, SyncCompletion, IO};
use crate::storage::sqlite3_ondisk::{
begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
@@ -115,10 +117,11 @@ impl Wal for WalFile {
page: Rc<RefCell<Page>>,
buffer_pool: Rc<BufferPool>,
) -> Result<()> {
debug!("read_frame({})", frame_id);
let offset = self.frame_offset(frame_id);
begin_read_wal_frame(
self.file.borrow().as_ref().unwrap(),
offset,
offset + WAL_FRAME_HEADER_SIZE,
buffer_pool,
page,
)?;
@@ -137,6 +140,12 @@ impl Wal for WalFile {
let page_id = page.borrow().id;
let frame_id = *self.max_frame.borrow();
let offset = self.frame_offset(frame_id);
trace!(
"append_frame(frame={}, offset={}, page_id={})",
frame_id,
offset,
page_id
);
begin_write_wal_frame(
self.file.borrow().as_ref().unwrap(),
offset,
@@ -246,7 +255,7 @@ impl WalFile {
if self.file.borrow().is_none() {
match self
.io
.open_file(&self.wal_path, crate::io::OpenFlags::Create)
.open_file(&self.wal_path, crate::io::OpenFlags::Create, false)
{
Ok(file) => {
if file.size()? > 0 {
@@ -285,7 +294,7 @@ impl WalFile {
let header = header.as_ref().unwrap().borrow();
let page_size = header.page_size;
let page_offset = frame_id * (page_size as u64 + WAL_FRAME_HEADER_SIZE as u64);
let offset = WAL_HEADER_SIZE as u64 + WAL_FRAME_HEADER_SIZE as u64 + page_offset;
let offset = WAL_HEADER_SIZE as u64 + page_offset;
offset as usize
}
}

View File

@@ -1098,6 +1098,7 @@ impl Program {
)));
}
}
log::trace!("Halt auto_commit {}", self.auto_commit);
if self.auto_commit {
return match pager.end_tx() {
Ok(crate::storage::wal::CheckpointStatus::IO) => Ok(StepResult::IO),

View File

@@ -94,8 +94,13 @@ impl SimulatorIO {
}
impl IO for SimulatorIO {
fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn limbo_core::File>> {
let inner = self.inner.open_file(path, flags)?;
fn open_file(
&self,
path: &str,
flags: OpenFlags,
_direct: bool,
) -> Result<Rc<dyn limbo_core::File>> {
let inner = self.inner.open_file(path, flags, false)?;
let file = Rc::new(SimulatorFile {
inner,
fault: RefCell::new(false),

View File

@@ -28,9 +28,12 @@ impl TempDatabase {
}
pub fn connect_limbo(&self) -> Rc<limbo_core::Connection> {
log::debug!("conneting to limbo");
let db = Database::open_file(self.io.clone(), self.path.to_str().unwrap()).unwrap();
db.connect()
let conn = db.connect();
log::debug!("connected to limbo");
conn
}
}
@@ -51,6 +54,7 @@ mod tests {
let list_query = "SELECT * FROM test";
let max_iterations = 10000;
for i in 0..max_iterations {
debug!("inserting {} ", i);
if (i % 100) == 0 {
let progress = (i as f64 / max_iterations as f64) * 100.0;
println!("progress {:.1}%", progress);
@@ -103,7 +107,6 @@ mod tests {
}
#[test]
#[ignore]
fn test_simple_overflow_page() -> anyhow::Result<()> {
let _ = env_logger::try_init();
let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY, t TEXT);");
@@ -169,7 +172,6 @@ mod tests {
Ok(())
}
#[ignore]
#[test]
fn test_sequential_overflow_page() -> anyhow::Result<()> {
let _ = env_logger::try_init();
@@ -305,6 +307,78 @@ mod tests {
Ok(())
}
#[test]
fn test_wal_restart() -> anyhow::Result<()> {
let _ = env_logger::try_init();
let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY);");
// threshold is 1000 by default
fn insert(i: usize, conn: &Rc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<()> {
log::debug!("inserting {}", i);
let insert_query = format!("INSERT INTO test VALUES ({})", i);
match conn.query(insert_query) {
Ok(Some(ref mut rows)) => loop {
match rows.next_row()? {
RowResult::IO => {
tmp_db.io.run_once()?;
}
RowResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
log::debug!("inserted {}", i);
tmp_db.io.run_once()?;
Ok(())
}
fn count(conn: &Rc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<usize> {
log::debug!("counting");
let list_query = "SELECT count(x) FROM test";
loop {
match conn.query(list_query).unwrap() {
Some(ref mut rows) => loop {
match rows.next_row()? {
RowResult::Row(row) => {
let first_value = &row.values[0];
let count = match first_value {
Value::Integer(i) => *i as i32,
_ => unreachable!(),
};
log::debug!("counted {}", count);
return Ok(count as usize);
}
RowResult::IO => {
tmp_db.io.run_once()?;
}
RowResult::Done => break,
}
},
None => {}
}
}
}
{
let conn = tmp_db.connect_limbo();
insert(1, &conn, &tmp_db).unwrap();
assert_eq!(count(&conn, &tmp_db).unwrap(), 1);
}
{
let conn = tmp_db.connect_limbo();
assert_eq!(
count(&conn, &tmp_db).unwrap(),
1,
"failed to read from wal from another connection"
);
}
Ok(())
}
fn compare_string(a: &String, b: &String) {
assert_eq!(a.len(), b.len(), "Strings are not equal in size!");
let a = a.as_bytes();

View File

@@ -8,7 +8,6 @@ source $testdir/coalesce.test
source $testdir/glob.test
source $testdir/join.test
source $testdir/insert.test
source $testdir/join.test
source $testdir/json.test
source $testdir/like.test
source $testdir/orderby.test