diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index e40845728..d6cd40313 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -25,12 +25,15 @@ jobs: runs-on: ${{ matrix.os }} steps: - - uses: Swatinem/rust-cache@v2 - uses: actions/checkout@v3 + - uses: Swatinem/rust-cache@v2 - name: Build run: cargo build --verbose - name: Test + env: + RUST_LOG: ${{ runner.debug && 'limbo_core::storage=trace' || '' }} run: cargo test --verbose + timeout-minutes: 5 build-wasm: runs-on: ubuntu-latest diff --git a/bindings/python/requirements-dev.txt b/bindings/python/requirements-dev.txt index ea68f9ec1..c3a985985 100644 --- a/bindings/python/requirements-dev.txt +++ b/bindings/python/requirements-dev.txt @@ -8,7 +8,7 @@ mypy==1.11.0 # via limbo (pyproject.toml) mypy-extensions==1.0.0 # via mypy -packaging==24.1 +packaging==24.2 # via pytest pluggy==1.5.0 # via pytest diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index e7fdfc882..b32367abd 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -16,7 +16,9 @@ impl Database { #[wasm_bindgen(constructor)] pub fn new(path: &str) -> Database { let io = Arc::new(PlatformIO { vfs: VFS::new() }); - let file = io.open_file(path, limbo_core::OpenFlags::None).unwrap(); + let file = io + .open_file(path, limbo_core::OpenFlags::None, false) + .unwrap(); let page_io = Rc::new(DatabaseStorage::new(file)); let wal = Rc::new(RefCell::new(Wal {})); let db = limbo_core::Database::open(io, page_io, wal).unwrap(); @@ -124,7 +126,12 @@ pub struct PlatformIO { } impl limbo_core::IO for PlatformIO { - fn open_file(&self, path: &str, _flags: OpenFlags) -> Result> { + fn open_file( + &self, + path: &str, + _flags: OpenFlags, + _direct: bool, + ) -> Result> { let fd = self.vfs.open(path); Ok(Rc::new(File { vfs: VFS::new(), diff --git a/core/io/common.rs b/core/io/common.rs index 6627e2076..9608c48ab 100644 --- a/core/io/common.rs +++ b/core/io/common.rs @@ -13,7 +13,7 @@ pub mod tests { // Parent process opens the file let io1 = create_io().expect("Failed to create IO"); let _file1 = io1 - .open_file(&path, crate::io::OpenFlags::None) + .open_file(&path, crate::io::OpenFlags::None, false) .expect("Failed to open file in parent process"); let current_exe = std::env::current_exe().expect("Failed to get current executable path"); @@ -38,7 +38,7 @@ pub mod tests { if std::env::var("RUST_TEST_CHILD_PROCESS").is_ok() { let path = std::env::var("RUST_TEST_FILE_PATH")?; let io = create_io()?; - match io.open_file(&path, crate::io::OpenFlags::None) { + match io.open_file(&path, crate::io::OpenFlags::None, false) { Ok(_) => std::process::exit(0), Err(_) => std::process::exit(1), } diff --git a/core/io/darwin.rs b/core/io/darwin.rs index cf527a3cb..bdab24af7 100644 --- a/core/io/darwin.rs +++ b/core/io/darwin.rs @@ -31,7 +31,7 @@ impl DarwinIO { } impl IO for DarwinIO { - fn open_file(&self, path: &str, flags: OpenFlags) -> Result> { + fn open_file(&self, path: &str, flags: OpenFlags, _direct: bool) -> Result> { trace!("open_file(path = {})", path); let file = std::fs::File::options() .read(true) diff --git a/core/io/generic.rs b/core/io/generic.rs index c703ff78e..c8c5c45b8 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -13,7 +13,7 @@ impl GenericIO { } impl IO for GenericIO { - fn open_file(&self, path: &str, flags: OpenFlags) -> Result> { + fn open_file(&self, path: &str, flags: OpenFlags, _direct: bool) -> Result> { trace!("open_file(path = {})", path); let file = std::fs::File::open(path)?; Ok(Rc::new(GenericFile { diff --git a/core/io/linux.rs b/core/io/linux.rs index 76d20809d..3a2de5f42 100644 --- a/core/io/linux.rs +++ b/core/io/linux.rs @@ -4,8 +4,8 @@ use libc::{c_short, fcntl, flock, iovec, F_SETLK}; use log::{debug, trace}; use nix::fcntl::{FcntlArg, OFlag}; use std::cell::RefCell; +use std::collections::HashMap; use std::fmt; -use std::os::unix::fs::MetadataExt; use std::os::unix::io::AsRawFd; use std::rc::Rc; use thiserror::Error; @@ -37,6 +37,8 @@ pub struct LinuxIO { struct WrappedIOUring { ring: io_uring::IoUring, pending_ops: usize, + pub pending: HashMap>, + key: u64, } struct InnerLinuxIO { @@ -52,6 +54,8 @@ impl LinuxIO { ring: WrappedIOUring { ring, pending_ops: 0, + pending: HashMap::new(), + key: 0, }, iovecs: [iovec { iov_base: std::ptr::null_mut(), @@ -76,7 +80,9 @@ impl InnerLinuxIO { } impl WrappedIOUring { - fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) { + fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Rc) { + log::trace!("submit_entry({:?})", entry); + self.pending.insert(entry.get_user_data(), c); unsafe { self.ring .submission() @@ -95,6 +101,7 @@ impl WrappedIOUring { // NOTE: This works because CompletionQueue's next function pops the head of the queue. This is not normal behaviour of iterators let entry = self.ring.completion().next(); if entry.is_some() { + log::trace!("get_completion({:?})", entry); // consumed an entry from completion queue, update pending_ops self.pending_ops -= 1; } @@ -104,10 +111,15 @@ impl WrappedIOUring { fn empty(&self) -> bool { self.pending_ops == 0 } + + fn get_key(&mut self) -> u64 { + self.key += 1; + self.key + } } impl IO for LinuxIO { - fn open_file(&self, path: &str, flags: OpenFlags) -> Result> { + fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result> { trace!("open_file(path = {})", path); let file = std::fs::File::options() .read(true) @@ -117,10 +129,12 @@ impl IO for LinuxIO { // Let's attempt to enable direct I/O. Not all filesystems support it // so ignore any errors. let fd = file.as_raw_fd(); - match nix::fcntl::fcntl(fd, FcntlArg::F_SETFL(OFlag::O_DIRECT)) { - Ok(_) => {}, - Err(error) => debug!("Error {error:?} returned when setting O_DIRECT flag to read file. The performance of the system may be affected"), - }; + if direct { + match nix::fcntl::fcntl(fd, FcntlArg::F_SETFL(OFlag::O_DIRECT)) { + Ok(_) => {}, + Err(error) => debug!("Error {error:?} returned when setting O_DIRECT flag to read file. The performance of the system may be affected"), + }; + } let linux_file = Rc::new(LinuxFile { io: self.inner.clone(), file, @@ -145,12 +159,16 @@ impl IO for LinuxIO { let result = cqe.result(); if result < 0 { return Err(LimboError::LinuxIOError(format!( - "{}", - LinuxIOError::IOUringCQError(result) + "{} cqe: {:?}", + LinuxIOError::IOUringCQError(result), + cqe ))); } - let c = unsafe { Rc::from_raw(cqe.user_data() as *const Completion) }; - c.complete(cqe.result()); + { + let c = ring.pending.get(&cqe.user_data()).unwrap().clone(); + c.complete(cqe.result()); + } + ring.pending.remove(&cqe.user_data()); } Ok(()) } @@ -234,14 +252,13 @@ impl File for LinuxFile { let mut buf = r.buf_mut(); let len = buf.len(); let buf = buf.as_mut_ptr(); - let ptr = Rc::into_raw(c.clone()); let iovec = io.get_iovec(buf, len); io_uring::opcode::Readv::new(fd, iovec, 1) .offset(pos as u64) .build() - .user_data(ptr as u64) + .user_data(io.ring.get_key()) }; - io.ring.submit_entry(&read_e); + io.ring.submit_entry(&read_e, c); Ok(()) } @@ -255,25 +272,25 @@ impl File for LinuxFile { let fd = io_uring::types::Fd(self.file.as_raw_fd()); let write = { let buf = buffer.borrow(); - let ptr = Rc::into_raw(c.clone()); + trace!("pwrite(pos = {}, length = {})", pos, buf.len()); let iovec = io.get_iovec(buf.as_ptr(), buf.len()); io_uring::opcode::Writev::new(fd, iovec, 1) .offset(pos as u64) .build() - .user_data(ptr as u64) + .user_data(io.ring.get_key()) }; - io.ring.submit_entry(&write); + io.ring.submit_entry(&write, c); Ok(()) } fn sync(&self, c: Rc) -> Result<()> { let fd = io_uring::types::Fd(self.file.as_raw_fd()); - let ptr = Rc::into_raw(c.clone()); + let mut io = self.io.borrow_mut(); + trace!("sync()"); let sync = io_uring::opcode::Fsync::new(fd) .build() - .user_data(ptr as u64); - let mut io = self.io.borrow_mut(); - io.ring.submit_entry(&sync); + .user_data(io.ring.get_key()); + io.ring.submit_entry(&sync, c); Ok(()) } diff --git a/core/io/mod.rs b/core/io/mod.rs index 5d87be948..05b418206 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -24,7 +24,7 @@ pub enum OpenFlags { } pub trait IO { - fn open_file(&self, path: &str, flags: OpenFlags) -> Result>; + fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result>; fn run_once(&self) -> Result<()>; diff --git a/core/io/windows.rs b/core/io/windows.rs index 1ef18665d..db7f9da31 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -13,7 +13,7 @@ impl WindowsIO { } impl IO for WindowsIO { - fn open_file(&self, path: &str, flags: OpenFlags) -> Result> { + fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result> { trace!("open_file(path = {})", path); let file = std::fs::File::options() .read(true) diff --git a/core/lib.rs b/core/lib.rs index 36f5739d0..fab59ed31 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -65,7 +65,7 @@ pub struct Database { impl Database { #[cfg(feature = "fs")] pub fn open_file(io: Arc, path: &str) -> Result> { - let file = io.open_file(path, io::OpenFlags::None)?; + let file = io.open_file(path, io::OpenFlags::None, true)?; let page_io = Rc::new(FileStorage::new(file)); let wal_path = format!("{}-wal", path); let db_header = Pager::begin_open(page_io.clone())?; @@ -283,6 +283,27 @@ impl Connection { self.pager.clear_page_cache(); Ok(()) } + + pub fn checkpoint(&self) -> Result<()> { + self.pager.clear_page_cache(); + Ok(()) + } +} + +impl Drop for Connection { + fn drop(&mut self) { + loop { + // TODO: make this async? + match self.pager.checkpoint().unwrap() { + CheckpointStatus::Done => { + return; + } + CheckpointStatus::IO => { + self.pager.io.run_once().unwrap(); + } + }; + } + } } pub struct Statement { diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 2dfb656d9..bb5afd851 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -24,31 +24,16 @@ const BTREE_HEADER_OFFSET_CELL_CONTENT: usize = 5; /* pointer to first byte of c const BTREE_HEADER_OFFSET_FRAGMENTED: usize = 7; /* number of fragmented bytes -> u8 */ const BTREE_HEADER_OFFSET_RIGHTMOST: usize = 8; /* if internalnode, pointer right most pointer (saved separately from cells) -> u32 */ -#[derive(Debug)] -pub struct MemPage { - parent: Option>, - page_idx: usize, - cell_idx: RefCell, -} - -impl MemPage { - pub fn new(parent: Option>, page_idx: usize, cell_idx: usize) -> Self { - Self { - parent, - page_idx, - cell_idx: RefCell::new(cell_idx), - } - } - - pub fn cell_idx(&self) -> usize { - *self.cell_idx.borrow() - } - - pub fn advance(&self) { - let mut cell_idx = self.cell_idx.borrow_mut(); - *cell_idx += 1; - } -} +/* +** Maximum depth of an SQLite B-Tree structure. Any B-Tree deeper than +** this will be declared corrupt. This value is calculated based on a +** maximum database size of 2^31 pages a minimum fanout of 2 for a +** root-node and 3 for all other internal nodes. +** +** If a tree that appears to be taller than this is encountered, it is +** assumed that the database is corrupt. +*/ +pub const BTCURSOR_MAX_DEPTH: usize = 20; #[derive(Debug)] enum WriteState { @@ -61,9 +46,7 @@ enum WriteState { struct WriteInfo { state: WriteState, - current_page: RefCell, Rc>)>>, - parent_page: RefCell, Rc>)>>, - new_pages: RefCell, Rc>)>>, + new_pages: RefCell>>>, scratch_cells: RefCell>, rightmost_pointer: RefCell>, page_copy: RefCell>, // this holds the copy a of a page needed for buffer references @@ -71,14 +54,38 @@ struct WriteInfo { pub struct BTreeCursor { pager: Rc, + /// Page id of the root page used to go back up fast. root_page: usize, - page: RefCell>>, + /// Rowid and record are stored before being consumed. rowid: RefCell>, record: RefCell>, null_flag: bool, database_header: Rc>, + /// Index internal pages are consumed on the way up, so we store going upwards flag in case + /// we just moved to a parent page and the parent page is an internal index page which requires + /// to be consumed. going_upwards: bool, + /// Write information kept in case of write yields due to I/O. Needs to be stored somewhere + /// right :). write_info: WriteInfo, + /// Page stack used to traverse the btree. + /// Each cursor has a stack because each cursor traverses the btree independently. + stack: PageStack, +} + +/// Stack of pages representing the tree traversal order. +/// current_page represents the current page being used in the tree and current_page - 1 would be +/// the parent. Using current_page + 1 or higher is undefined behaviour. +struct PageStack { + /// Pointer to the currenet page being consumed + current_page: RefCell, + /// List of pages in the stack. Root page will be in index 0 + stack: RefCell<[Option>>; BTCURSOR_MAX_DEPTH + 1]>, + /// List of cell indices in the stack. + /// cell_indices[current_page] is the current cell index being consumed. Similarly + /// cell_indices[current_page-1] is the cell index of the parent of the current page + /// that we save in case of going back up. + cell_indices: RefCell<[usize; BTCURSOR_MAX_DEPTH + 1]>, } impl BTreeCursor { @@ -90,7 +97,6 @@ impl BTreeCursor { Self { pager, root_page, - page: RefCell::new(None), rowid: RefCell::new(None), record: RefCell::new(None), null_flag: false, @@ -98,13 +104,16 @@ impl BTreeCursor { going_upwards: false, write_info: WriteInfo { state: WriteState::Start, - current_page: RefCell::new(None), - parent_page: RefCell::new(None), new_pages: RefCell::new(Vec::with_capacity(4)), scratch_cells: RefCell::new(Vec::new()), rightmost_pointer: RefCell::new(None), page_copy: RefCell::new(None), }, + stack: PageStack { + current_page: RefCell::new(-1), + cell_indices: RefCell::new([0; BTCURSOR_MAX_DEPTH + 1]), + stack: RefCell::new([const { None }; BTCURSOR_MAX_DEPTH + 1]), + }, } } @@ -125,38 +134,61 @@ impl BTreeCursor { predicate: Option<(SeekKey<'_>, SeekOp)>, ) -> Result, Option)>> { loop { - let mem_page = self.get_mem_page(); - let page_idx = mem_page.page_idx; - let page = self.pager.read_page(page_idx)?; - let page = RefCell::borrow(&page); - if page.is_locked() { + let mem_page_rc = self.stack.top(); + let cell_idx = self.stack.current_index(); + + debug!("current id={} cell={}", mem_page_rc.borrow().id, cell_idx); + if mem_page_rc.borrow().is_locked() { return Ok(CursorResult::IO); } - let page = page.contents.read().unwrap(); + if !mem_page_rc.borrow().is_loaded() { + self.pager.load_page(mem_page_rc.clone())?; + return Ok(CursorResult::IO); + } + let mem_page = mem_page_rc.borrow(); + + let page = mem_page.contents.read().unwrap(); let page = page.as_ref().unwrap(); - if mem_page.cell_idx() >= page.cell_count() { - let parent = mem_page.parent.clone(); + if cell_idx == page.cell_count() { + // do rightmost + let has_parent = self.stack.has_parent(); match page.rightmost_pointer() { Some(right_most_pointer) => { - let mem_page = MemPage::new(parent.clone(), right_most_pointer as usize, 0); - self.page.replace(Some(Rc::new(mem_page))); + self.stack.advance(); + let mem_page = self.pager.read_page(right_most_pointer as usize)?; + self.stack.push(mem_page); continue; } - None => match parent { - Some(ref parent) => { + None => { + if has_parent { + debug!("moving simple upwards"); self.going_upwards = true; - self.page.replace(Some(parent.clone())); + self.stack.pop(); continue; - } - None => { + } else { return Ok(CursorResult::Ok((None, None))); } - }, + } } } + + if cell_idx >= page.cell_count() + 1 { + // end + let has_parent = self.stack.current() > 0; + if has_parent { + debug!("moving upwards"); + self.going_upwards = true; + self.stack.pop(); + continue; + } else { + return Ok(CursorResult::Ok((None, None))); + } + } + assert!(cell_idx < page.cell_count()); + let cell = page.cell_get( - mem_page.cell_idx(), + cell_idx, self.pager.clone(), self.max_local(page.page_type()), self.min_local(page.page_type()), @@ -168,10 +200,9 @@ impl BTreeCursor { _rowid, }) => { assert!(predicate.is_none()); - mem_page.advance(); - let mem_page = - MemPage::new(Some(mem_page.clone()), *_left_child_page as usize, 0); - self.page.replace(Some(Rc::new(mem_page))); + self.stack.advance(); + let mem_page = self.pager.read_page(*_left_child_page as usize)?; + self.stack.push(mem_page); continue; } BTreeCell::TableLeafCell(TableLeafCell { @@ -180,7 +211,7 @@ impl BTreeCursor { first_overflow_page: _, }) => { assert!(predicate.is_none()); - mem_page.advance(); + self.stack.advance(); let record = crate::storage::sqlite3_ondisk::read_record(_payload)?; return Ok(CursorResult::Ok((Some(*_rowid), Some(record)))); } @@ -190,14 +221,13 @@ impl BTreeCursor { .. }) => { if !self.going_upwards { - let mem_page = - MemPage::new(Some(mem_page.clone()), *left_child_page as usize, 0); - self.page.replace(Some(Rc::new(mem_page))); + let mem_page = self.pager.read_page(*left_child_page as usize)?; + self.stack.push(mem_page); continue; } self.going_upwards = false; - mem_page.advance(); + self.stack.advance(); let record = crate::storage::sqlite3_ondisk::read_record(payload)?; if predicate.is_none() { @@ -228,7 +258,7 @@ impl BTreeCursor { } } BTreeCell::IndexLeafCell(IndexLeafCell { payload, .. }) => { - mem_page.advance(); + self.stack.advance(); let record = crate::storage::sqlite3_ondisk::read_record(payload)?; if predicate.is_none() { let rowid = match record.values.last() { @@ -270,66 +300,66 @@ impl BTreeCursor { CursorResult::IO => return Ok(CursorResult::IO), }; - let mem_page = self.get_mem_page(); - let page_idx = mem_page.page_idx; - let page = self.pager.read_page(page_idx)?; - let page = RefCell::borrow(&page); - if page.is_locked() { - return Ok(CursorResult::IO); - } + { + let page_rc = self.stack.top(); + let page = page_rc.borrow(); + if page.is_locked() { + return Ok(CursorResult::IO); + } - let page = page.contents.read().unwrap(); - let page = page.as_ref().unwrap(); + let contents = page.contents.read().unwrap(); + let contents = contents.as_ref().unwrap(); - for cell_idx in 0..page.cell_count() { - let cell = page.cell_get( - cell_idx, - self.pager.clone(), - self.max_local(page.page_type()), - self.min_local(page.page_type()), - self.usable_space(), - )?; - match &cell { - BTreeCell::TableLeafCell(TableLeafCell { - _rowid: cell_rowid, - _payload: payload, - first_overflow_page: _, - }) => { - let SeekKey::TableRowId(rowid_key) = key else { - unreachable!("table seek key should be a rowid"); - }; - mem_page.advance(); - let found = match op { - SeekOp::GT => *cell_rowid > rowid_key, - SeekOp::GE => *cell_rowid >= rowid_key, - SeekOp::EQ => *cell_rowid == rowid_key, - }; - if found { - let record = crate::storage::sqlite3_ondisk::read_record(payload)?; - return Ok(CursorResult::Ok((Some(*cell_rowid), Some(record)))); - } - } - BTreeCell::IndexLeafCell(IndexLeafCell { payload, .. }) => { - let SeekKey::IndexKey(index_key) = key else { - unreachable!("index seek key should be a record"); - }; - mem_page.advance(); - let record = crate::storage::sqlite3_ondisk::read_record(payload)?; - let found = match op { - SeekOp::GT => record > *index_key, - SeekOp::GE => record >= *index_key, - SeekOp::EQ => record == *index_key, - }; - if found { - let rowid = match record.values.last() { - Some(OwnedValue::Integer(rowid)) => *rowid as u64, - _ => unreachable!("index cells should have an integer rowid"), + for cell_idx in 0..contents.cell_count() { + let cell = contents.cell_get( + cell_idx, + self.pager.clone(), + self.max_local(contents.page_type()), + self.min_local(contents.page_type()), + self.usable_space(), + )?; + match &cell { + BTreeCell::TableLeafCell(TableLeafCell { + _rowid: cell_rowid, + _payload: payload, + first_overflow_page: _, + }) => { + let SeekKey::TableRowId(rowid_key) = key else { + unreachable!("table seek key should be a rowid"); }; - return Ok(CursorResult::Ok((Some(rowid), Some(record)))); + let found = match op { + SeekOp::GT => *cell_rowid > rowid_key, + SeekOp::GE => *cell_rowid >= rowid_key, + SeekOp::EQ => *cell_rowid == rowid_key, + }; + self.stack.advance(); + if found { + let record = crate::storage::sqlite3_ondisk::read_record(payload)?; + return Ok(CursorResult::Ok((Some(*cell_rowid), Some(record)))); + } + } + BTreeCell::IndexLeafCell(IndexLeafCell { payload, .. }) => { + let SeekKey::IndexKey(index_key) = key else { + unreachable!("index seek key should be a record"); + }; + let record = crate::storage::sqlite3_ondisk::read_record(payload)?; + let found = match op { + SeekOp::GT => record > *index_key, + SeekOp::GE => record >= *index_key, + SeekOp::EQ => record == *index_key, + }; + self.stack.advance(); + if found { + let rowid = match record.values.last() { + Some(OwnedValue::Integer(rowid)) => *rowid as u64, + _ => unreachable!("index cells should have an integer rowid"), + }; + return Ok(CursorResult::Ok((Some(rowid), Some(record)))); + } + } + cell_type => { + unreachable!("unexpected cell type: {:?}", cell_type); } - } - cell_type => { - unreachable!("unexpected cell type: {:?}", cell_type); } } } @@ -358,16 +388,17 @@ impl BTreeCursor { } fn move_to_root(&mut self) { - self.page - .replace(Some(Rc::new(MemPage::new(None, self.root_page, 0)))); + let mem_page = self.pager.read_page(self.root_page).unwrap(); + self.stack.clear(); + self.stack.push(mem_page); } fn move_to_rightmost(&mut self) -> Result> { self.move_to_root(); loop { - let mem_page = self.page.borrow().as_ref().unwrap().clone(); - let page_idx = mem_page.page_idx; + let mem_page = self.stack.top(); + let page_idx = mem_page.borrow().id; let page = self.pager.read_page(page_idx)?; let page = RefCell::borrow(&page); if page.is_locked() { @@ -377,17 +408,16 @@ impl BTreeCursor { let page = page.as_ref().unwrap(); if page.is_leaf() { if page.cell_count() > 0 { - mem_page.cell_idx.replace(page.cell_count() - 1); + self.stack.set_cell_index(page.cell_count() - 1); } return Ok(CursorResult::Ok(())); } match page.rightmost_pointer() { Some(right_most_pointer) => { - mem_page.cell_idx.replace(page.cell_count()); - let mem_page = - MemPage::new(Some(mem_page.clone()), right_most_pointer as usize, 0); - self.page.replace(Some(Rc::new(mem_page))); + self.stack.set_cell_index(page.cell_count() + 1); + let mem_page = self.pager.read_page(right_most_pointer as usize).unwrap(); + self.stack.push(mem_page); continue; } @@ -425,27 +455,25 @@ impl BTreeCursor { self.move_to_root(); loop { - let mem_page = self.get_mem_page(); - let page_idx = mem_page.page_idx; - let page = self.pager.read_page(page_idx)?; - let page = RefCell::borrow(&page); + let page_rc = self.stack.top(); + let page = RefCell::borrow(&page_rc); if page.is_locked() { return Ok(CursorResult::IO); } - let page = page.contents.read().unwrap(); - let page = page.as_ref().unwrap(); - if page.is_leaf() { + let contents = page.contents.read().unwrap(); + let contents = contents.as_ref().unwrap(); + if contents.is_leaf() { return Ok(CursorResult::Ok(())); } let mut found_cell = false; - for cell_idx in 0..page.cell_count() { - match &page.cell_get( + for cell_idx in 0..contents.cell_count() { + match &contents.cell_get( cell_idx, self.pager.clone(), - self.max_local(page.page_type()), - self.min_local(page.page_type()), + self.max_local(contents.page_type()), + self.min_local(contents.page_type()), self.usable_space(), )? { BTreeCell::TableInteriorCell(TableInteriorCell { @@ -455,16 +483,15 @@ impl BTreeCursor { let SeekKey::TableRowId(rowid_key) = key else { unreachable!("table seek key should be a rowid"); }; - mem_page.advance(); let target_leaf_page_is_in_left_subtree = match cmp { SeekOp::GT => rowid_key < *_rowid, SeekOp::GE => rowid_key <= *_rowid, SeekOp::EQ => rowid_key <= *_rowid, }; + self.stack.advance(); if target_leaf_page_is_in_left_subtree { - let mem_page = - MemPage::new(Some(mem_page.clone()), *_left_child_page as usize, 0); - self.page.replace(Some(Rc::new(mem_page))); + let mem_page = self.pager.read_page(*_left_child_page as usize)?; + self.stack.push(mem_page); found_cell = true; break; } @@ -493,13 +520,13 @@ impl BTreeCursor { SeekOp::EQ => index_key <= &record, }; if target_leaf_page_is_in_the_left_subtree { - let mem_page = - MemPage::new(Some(mem_page.clone()), *left_child_page as usize, 0); - self.page.replace(Some(Rc::new(mem_page))); + // we don't advance in case of index tree internal nodes because we will visit this node going up + let mem_page = self.pager.read_page(*left_child_page as usize).unwrap(); + self.stack.push(mem_page); found_cell = true; break; } else { - mem_page.advance(); + self.stack.advance(); } } BTreeCell::IndexLeafCell(_) => { @@ -511,11 +538,11 @@ impl BTreeCursor { } if !found_cell { - let parent = mem_page.parent.clone(); - match page.rightmost_pointer() { + match contents.rightmost_pointer() { Some(right_most_pointer) => { - let mem_page = MemPage::new(parent, right_most_pointer as usize, 0); - self.page.replace(Some(Rc::new(mem_page))); + self.stack.advance(); + let mem_page = self.pager.read_page(right_most_pointer as usize).unwrap(); + self.stack.push(mem_page); continue; } None => { @@ -526,7 +553,7 @@ impl BTreeCursor { } } - fn insert_to_page( + fn insert_into_page( &mut self, key: &OwnedValue, record: &OwnedRecord, @@ -535,7 +562,7 @@ impl BTreeCursor { let state = &self.write_info.state; match state { WriteState::Start => { - let page_ref = self.get_current_page()?; + let page_ref = self.stack.top(); let int_key = match key { OwnedValue::Integer(i) => *i as u64, _ => unreachable!("btree tables are indexed by integers!"), @@ -577,10 +604,6 @@ impl BTreeCursor { }; if overflow > 0 { self.write_info.state = WriteState::BalanceStart; - self.write_info.current_page.borrow_mut().replace(( - self.page.borrow().as_ref().unwrap().clone(), - page_ref.clone(), - )); } else { self.write_info.state = WriteState::Finish; } @@ -711,17 +734,6 @@ impl BTreeCursor { page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1); } - fn get_current_page(&mut self) -> crate::Result>> { - let mem_page = { - let mem_page = self.page.borrow(); - let mem_page = mem_page.as_ref().unwrap(); - mem_page.clone() - }; - let page_idx = mem_page.page_idx; - let page_ref = self.pager.read_page(page_idx)?; - Ok(page_ref) - } - /// This is a naive algorithm that doesn't try to distribute cells evenly by content. /// It will try to split the page in half by keys not by content. /// Sqlite tries to have a page at least 40% full. @@ -729,12 +741,16 @@ impl BTreeCursor { let state = &self.write_info.state; match state { WriteState::BalanceStart => { - let current_page = self.write_info.current_page.borrow(); - let mem_page = ¤t_page.as_ref().unwrap().0; + // drop divider cells and find right pointer + // NOTE: since we are doing a simple split we only finding the pointer we want to update (right pointer). + // Right pointer means cell that points to the last page, as we don't really want to drop this one. This one + // can be a "rightmost pointer" or a "cell". + // TODO(pere): simplify locking... + // we always asumme there is a parent + let current_page = self.stack.top(); + let page_rc = RefCell::borrow(¤t_page); { // check if we don't need to balance - let page_ref = ¤t_page.as_ref().unwrap().1; - let page_rc = RefCell::borrow(&page_ref); { // don't continue if there are no overflow cells @@ -747,15 +763,13 @@ impl BTreeCursor { } } - if mem_page.parent.is_none() { + if !self.stack.has_parent() { + drop(page_rc); drop(current_page); self.balance_root(); return Ok(CursorResult::Ok(())); } - debug!("Balancing leaf. leaf={}", mem_page.page_idx); - - let page_ref = ¤t_page.as_ref().unwrap().1; - let page_rc = RefCell::borrow(&page_ref); + debug!("Balancing leaf. leaf={}", page_rc.id); // Copy of page used to reference cell bytes. let page_copy = { @@ -808,59 +822,48 @@ impl BTreeCursor { self.write_info .new_pages .borrow_mut() - .push((mem_page.clone(), page_ref.clone())); - self.write_info.new_pages.borrow_mut().push(( - Rc::new(MemPage::new(mem_page.parent.clone(), right_page_id, 0)), - right_page_ref.clone(), - )); + .push(current_page.clone()); + self.write_info + .new_pages + .borrow_mut() + .push(right_page_ref.clone()); - let new_pages_ids = [mem_page.page_idx, right_page_id]; debug!( "splitting left={} right={}", - new_pages_ids[0], new_pages_ids[1] + self.stack.current(), + right_page_id ); - // drop divider cells and find right pointer - // NOTE: since we are doing a simple split we only finding the pointer we want to update (right pointer). - // Right pointer means cell that points to the last page, as we don't really want to drop this one. This one - // can be a "rightmost pointer" or a "cell". - // TODO(pere): simplify locking... - // we always asumme there is a parent self.write_info.state = WriteState::BalanceGetParentPage; - return Ok(CursorResult::Ok(())); + Ok(CursorResult::Ok(())) } WriteState::BalanceGetParentPage => { - let current_page = self.write_info.current_page.borrow(); - let mem_page = ¤t_page.as_ref().unwrap().0; + let parent_rc = self.stack.parent(); + let loaded = parent_rc.borrow().is_loaded(); + let locked = parent_rc.borrow().is_locked(); - let parent_rc = mem_page.parent.as_ref().unwrap(); - let parent_ref = self.pager.read_page(parent_rc.page_idx)?; - if !RefCell::borrow(&parent_ref).is_locked() { - self.write_info.state = WriteState::BalanceMoveUp; - self.write_info - .parent_page - .borrow_mut() - .replace((parent_rc.clone(), parent_ref.clone())); - Ok(CursorResult::Ok(())) - } else { + if locked { Ok(CursorResult::IO) + } else { + if !loaded { + debug!("balance_leaf(loading page {} {})", locked, loaded); + self.pager.load_page(parent_rc.clone())?; + return Ok(CursorResult::IO); + } + parent_rc.borrow_mut().set_dirty(); + self.write_info.state = WriteState::BalanceMoveUp; + Ok(CursorResult::Ok(())) } } WriteState::BalanceMoveUp => { - let parent = self.write_info.parent_page.borrow(); - let parent_entry = parent.as_ref().unwrap(); - let parent_ref = &parent_entry.1; + let parent_ref = self.stack.parent(); let parent = RefCell::borrow_mut(&parent_ref); let (page_type, current_idx) = { - let current_page = self.write_info.current_page.borrow(); - let pagerc = current_page.as_ref().unwrap(); - let page = RefCell::borrow(&pagerc.1); - let page = page.contents.read().unwrap(); - ( - page.as_ref().unwrap().page_type().clone(), - pagerc.0.page_idx, - ) + let current_page = self.stack.top(); + let page_ref = current_page.borrow(); + let page = page_ref.contents.read().unwrap(); + (page.as_ref().unwrap().page_type().clone(), page_ref.id) }; parent.set_dirty(); @@ -904,8 +907,9 @@ impl BTreeCursor { let scratch_cells = self.write_info.scratch_cells.borrow(); // reset pages - for (_, page) in new_pages.iter() { + for page in new_pages.iter() { let page = page.borrow_mut(); + assert!(page.is_dirty()); let mut page = page.contents.write().unwrap(); let page = page.as_mut().unwrap(); @@ -927,7 +931,7 @@ impl BTreeCursor { let mut current_cell_index = 0_usize; let mut divider_cells_index = Vec::new(); /* index to scratch cells that will be used as dividers in order */ - for (i, (_, page)) in new_pages.iter_mut().enumerate() { + for (i, page) in new_pages.iter_mut().enumerate() { let page = page.borrow_mut(); let mut page = page.contents.write().unwrap(); let page = page.as_mut().unwrap(); @@ -949,15 +953,15 @@ impl BTreeCursor { current_cell_index += cells_to_copy; } let is_leaf = { - let page = self.write_info.current_page.borrow(); - let page = RefCell::borrow(&page.as_ref().unwrap().1); + let page = self.stack.top(); + let page = page.borrow(); let page = page.contents.read().unwrap(); page.as_ref().unwrap().is_leaf() }; // update rightmost pointer for each page if we are in interior page if !is_leaf { - for (_, page) in new_pages.iter_mut().take(new_pages_len - 1) { + for page in new_pages.iter_mut().take(new_pages_len - 1) { let page = page.borrow_mut(); let mut page = page.contents.write().unwrap(); let page = page.as_mut().unwrap(); @@ -981,7 +985,6 @@ impl BTreeCursor { } // last page right most pointer points to previous right most pointer before splitting let last_page = new_pages.last().unwrap(); - let last_page = &last_page.1; let last_page = RefCell::borrow(&last_page); let mut last_page = last_page.contents.write().unwrap(); let last_page = last_page.as_mut().unwrap(); @@ -993,22 +996,22 @@ impl BTreeCursor { // insert dividers in parent // we can consider dividers the first cell of each page starting from the second page - for (page_id_index, (mem_page, page)) in + for (page_id_index, page) in new_pages.iter_mut().take(new_pages_len - 1).enumerate() { let page = page.borrow_mut(); - let mut page = page.contents.write().unwrap(); - let page = page.as_mut().unwrap(); - assert!(page.cell_count() > 1); + let mut contents = page.contents.write().unwrap(); + let contents = contents.as_mut().unwrap(); + assert!(contents.cell_count() > 1); let divider_cell_index = divider_cells_index[page_id_index]; let cell_payload = scratch_cells[divider_cell_index]; let cell = read_btree_cell( cell_payload, - &page.page_type(), + &contents.page_type(), 0, self.pager.clone(), - self.max_local(page.page_type()), - self.min_local(page.page_type()), + self.max_local(contents.page_type()), + self.min_local(contents.page_type()), self.usable_space(), ) .unwrap(); @@ -1020,7 +1023,7 @@ impl BTreeCursor { _ => unreachable!(), }; let mut divider_cell = Vec::new(); - divider_cell.extend_from_slice(&(mem_page.page_idx as u32).to_be_bytes()); + divider_cell.extend_from_slice(&(page.id as u32).to_be_bytes()); divider_cell.extend(std::iter::repeat(0).take(9)); let n = write_varint(&mut divider_cell.as_mut_slice()[4..], key); divider_cell.truncate(4 + n); @@ -1036,7 +1039,7 @@ impl BTreeCursor { BTreeCell::TableInteriorCell(interior) => interior._rowid, _ => unreachable!(), }; - let parent_cell_idx = self.find_cell(page, key); + let parent_cell_idx = self.find_cell(contents, key); self.insert_into_cell(parent_contents, cell_payload, parent_cell_idx); // self.drop_cell(*page, 0); } @@ -1044,15 +1047,12 @@ impl BTreeCursor { { // copy last page id to right pointer - let last_pointer = new_pages.last().unwrap().0.page_idx as u32; + let last_pointer = new_pages.last().unwrap().borrow().id as u32; parent_contents.write_u32(right_pointer, last_pointer); } - self.page = RefCell::new(Some(parent_entry.0.clone())); - self.write_info - .current_page - .replace(Some(parent_entry.clone())); + self.stack.pop(); self.write_info.state = WriteState::BalanceStart; - self.write_info.page_copy.replace(None); + let _ = self.write_info.page_copy.take(); Ok(CursorResult::Ok(())) } @@ -1079,11 +1079,10 @@ impl BTreeCursor { /* swap splitted page buffer with new root buffer so we don't have to update page idx */ { let (root_id, child_id, child) = { - let page = self.write_info.current_page.borrow(); - let page_ref = &page.as_ref().unwrap().1; + let page_ref = self.stack.top(); let child = page_ref.clone(); - let mut page_rc = RefCell::borrow_mut(page_ref); - let mut new_root_page = RefCell::borrow_mut(&new_root_page_ref); + let mut page_rc = page_ref.borrow_mut(); + let mut new_root_page = new_root_page_ref.borrow_mut(); // Swap the entire Page structs std::mem::swap(&mut page_rc.id, &mut new_root_page.id); @@ -1093,19 +1092,16 @@ impl BTreeCursor { (new_root_page.id, page_rc.id, child) }; + debug!("Balancing root. root={}, rightmost={}", root_id, child_id); let root = new_root_page_ref.clone(); - let parent = Some(Rc::new(MemPage::new(None, root_id, 0))); - let current_mem_page = Rc::new(MemPage::new(parent, child_id, 0)); - self.page = RefCell::new(Some(current_mem_page.clone())); + self.root_page = root_id; + self.stack.clear(); + self.stack.push(root.clone()); + self.stack.push(child.clone()); - self.write_info - .current_page - .replace(Some((current_mem_page, child.clone()))); - - debug!("Balancing root. root={}, rightmost={}", root_id, child_id); - self.pager.put_page(root_id, root); - self.pager.put_page(child_id, child); + self.pager.put_loaded_page(root_id, root); + self.pager.put_loaded_page(child_id, child); } } @@ -1115,6 +1111,7 @@ impl BTreeCursor { { // setup btree page let contents = RefCell::borrow(&page); + debug!("allocating page {}", contents.id); let mut contents = contents.contents.write().unwrap(); let contents = contents.as_mut().unwrap(); let id = page_type as u8; @@ -1336,12 +1333,6 @@ impl BTreeCursor { nfree as u16 } - fn get_mem_page(&self) -> Rc { - let mem_page = self.page.borrow(); - let mem_page = mem_page.as_ref().unwrap(); - mem_page.clone() - } - fn fill_cell_payload( &self, page_type: PageType, @@ -1482,6 +1473,84 @@ impl BTreeCursor { } } +impl PageStack { + fn push(&self, page: Rc>) { + debug!( + "pagestack::push(current={}, new_page_id={})", + self.current_page.borrow(), + page.borrow().id + ); + *self.current_page.borrow_mut() += 1; + let current = *self.current_page.borrow(); + assert!( + current < BTCURSOR_MAX_DEPTH as i32, + "corrupted database, stack is bigger than expected" + ); + self.stack.borrow_mut()[current as usize] = Some(page); + self.cell_indices.borrow_mut()[current as usize] = 0; + } + + fn pop(&self) { + let current = *self.current_page.borrow(); + debug!("pagestack::pop(current={})", current); + self.cell_indices.borrow_mut()[current as usize] = 0; + self.stack.borrow_mut()[current as usize] = None; + *self.current_page.borrow_mut() -= 1; + } + + fn top(&self) -> Rc> { + let current = *self.current_page.borrow(); + let page = self.stack.borrow()[current as usize] + .as_ref() + .unwrap() + .clone(); + debug!( + "pagestack::top(current={}, page_id={})", + current, + page.borrow().id + ); + page + } + + fn parent(&self) -> Rc> { + let current = *self.current_page.borrow(); + self.stack.borrow()[current as usize - 1] + .as_ref() + .unwrap() + .clone() + } + + /// Current page pointer being used + fn current(&self) -> usize { + *self.current_page.borrow() as usize + } + + /// Cell index of the current page + fn current_index(&self) -> usize { + let current = self.current(); + self.cell_indices.borrow()[current] + } + + /// Advance the current cell index of the current page to the next cell. + fn advance(&self) { + let current = self.current(); + self.cell_indices.borrow_mut()[current] += 1; + } + + fn set_cell_index(&self, idx: usize) { + let current = self.current(); + self.cell_indices.borrow_mut()[current] = idx; + } + + fn has_parent(&self) -> bool { + *self.current_page.borrow() > 0 + } + + fn clear(&self) { + *self.current_page.borrow_mut() = -1; + } +} + fn find_free_cell(page_ref: &PageContent, db_header: Ref, amount: usize) -> usize { // NOTE: freelist is in ascending order of keys and pc // unuse_space is reserved bytes at the end of page, therefore we must substract from maxpc @@ -1534,8 +1603,8 @@ impl Cursor for BTreeCursor { } fn rewind(&mut self) -> Result> { - let mem_page = MemPage::new(None, self.root_page, 0); - self.page.replace(Some(Rc::new(mem_page))); + self.move_to_root(); + match self.get_next_record(None)? { CursorResult::Ok((rowid, next)) => { self.rowid.replace(rowid); @@ -1598,7 +1667,7 @@ impl Cursor for BTreeCursor { }; } - match self.insert_to_page(key, _record)? { + match self.insert_into_page(key, _record)? { CursorResult::Ok(_) => Ok(CursorResult::Ok(())), CursorResult::IO => Ok(CursorResult::IO), } @@ -1621,9 +1690,10 @@ impl Cursor for BTreeCursor { CursorResult::Ok(_) => {} CursorResult::IO => return Ok(CursorResult::IO), }; - let page_ref = self.get_current_page()?; + let page_ref = self.stack.top(); let page = RefCell::borrow(&page_ref); if page.is_locked() { + // TODO(pere); request load return Ok(CursorResult::IO); } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index bbc902ea1..c8926c147 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -3,7 +3,7 @@ use crate::storage::database::DatabaseStorage; use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent}; use crate::storage::wal::Wal; use crate::{Buffer, Result}; -use log::trace; +use log::{debug, trace}; use sieve_cache::SieveCache; use std::cell::RefCell; use std::collections::{HashMap, HashSet}; @@ -29,12 +29,8 @@ const PAGE_LOCKED: usize = 0b010; const PAGE_ERROR: usize = 0b100; /// Page is dirty. Flush needed. const PAGE_DIRTY: usize = 0b1000; - -impl Default for Page { - fn default() -> Self { - Self::new(0) - } -} +/// Page's contents are loaded in memory. +const PAGE_LOADED: usize = 0b10000; impl Page { pub fn new(id: usize) -> Page { @@ -92,6 +88,19 @@ impl Page { pub fn clear_dirty(&self) { self.flags.fetch_and(!PAGE_DIRTY, Ordering::SeqCst); } + + pub fn is_loaded(&self) -> bool { + self.flags.load(Ordering::SeqCst) & PAGE_LOADED != 0 + } + + pub fn set_loaded(&self) { + self.flags.fetch_or(PAGE_LOADED, Ordering::SeqCst); + } + + pub fn clear_loaded(&self) { + log::debug!("clear loaded {}", self.id); + self.flags.fetch_and(!PAGE_LOADED, Ordering::SeqCst); + } } #[allow(dead_code)] @@ -125,8 +134,13 @@ impl DumbLruPageCache { } } + pub fn contains_key(&mut self, key: usize) -> bool { + self.map.borrow().contains_key(&key) + } + pub fn insert(&mut self, key: usize, value: Rc>) { - self.delete(key); + debug!("cache_insert(key={})", key); + self._delete(key, false); let mut entry = Box::new(PageCacheEntry { key, next: None, @@ -144,6 +158,11 @@ impl DumbLruPageCache { } pub fn delete(&mut self, key: usize) { + self._delete(key, true) + } + + pub fn _delete(&mut self, key: usize, clean_page: bool) { + debug!("cache_delete(key={}, clean={})", key, clean_page); let ptr = self.map.borrow_mut().remove(&key); if ptr.is_none() { return; @@ -151,7 +170,7 @@ impl DumbLruPageCache { let mut ptr = ptr.unwrap(); { let ptr = unsafe { ptr.as_mut() }; - self.detach(ptr); + self.detach(ptr, clean_page); } unsafe { drop_in_place(ptr.as_ptr()) }; } @@ -163,11 +182,12 @@ impl DumbLruPageCache { } pub fn get(&mut self, key: &usize) -> Option>> { + debug!("cache_get(key={})", key); let ptr = self.get_ptr(*key); ptr?; let ptr = unsafe { ptr.unwrap().as_mut() }; let page = ptr.page.clone(); - self.detach(ptr); + //self.detach(ptr); self.touch(ptr); Some(page) } @@ -177,9 +197,18 @@ impl DumbLruPageCache { todo!(); } - fn detach(&mut self, entry: &mut PageCacheEntry) { + fn detach(&mut self, entry: &mut PageCacheEntry, clean_page: bool) { let mut current = entry.as_non_null(); + if clean_page { + // evict buffer + let page = entry.page.borrow_mut(); + page.clear_loaded(); + debug!("cleaning up page {}", page.id); + let mut contents = page.contents.write().unwrap(); + let _ = contents.as_mut().take(); + } + let (next, prev) = unsafe { let c = current.as_mut(); let next = c.next; @@ -230,7 +259,7 @@ impl DumbLruPageCache { // TODO: drop from another clean entry? return; } - self.detach(tail); + self.detach(tail, true); } fn clear(&mut self) { @@ -275,6 +304,12 @@ enum FlushState { WaitSyncDbFile, } +#[derive(Clone, Debug)] +enum CheckpointState { + Checkpoint, + CheckpointDone, +} + /// This will keep track of the state of current cache flush in order to not repeat work struct FlushInfo { state: FlushState, @@ -300,6 +335,8 @@ pub struct Pager { db_header: Rc>, flush_info: RefCell, + checkpoint_state: RefCell, + checkpoint_inflight: Rc>, syncing: Rc>, } @@ -333,6 +370,8 @@ impl Pager { in_flight_writes: Rc::new(RefCell::new(0)), }), syncing: Rc::new(RefCell::new(false)), + checkpoint_state: RefCell::new(CheckpointState::Checkpoint), + checkpoint_inflight: Rc::new(RefCell::new(0)), }) } @@ -347,7 +386,10 @@ impl Pager { } pub fn end_tx(&self) -> Result { - self.cacheflush()?; + match self.cacheflush()? { + CheckpointStatus::Done => {} + CheckpointStatus::IO => return Ok(CheckpointStatus::IO), + }; self.wal.borrow().end_read_tx()?; Ok(CheckpointStatus::Done) } @@ -357,6 +399,7 @@ impl Pager { trace!("read_page(page_idx = {})", page_idx); let mut page_cache = self.page_cache.borrow_mut(); if let Some(page) = page_cache.get(&page_idx) { + trace!("read_page(page_idx = {}) = cached", page_idx); return Ok(page.clone()); } let page = Rc::new(RefCell::new(Page::new(page_idx))); @@ -369,6 +412,8 @@ impl Pager { let page = page.borrow_mut(); page.set_uptodate(); } + // TODO(pere) ensure page is inserted, we should probably first insert to page cache + // and if successful, read frame or page page_cache.insert(page_idx, page.clone()); return Ok(page); } @@ -378,10 +423,44 @@ impl Pager { page.clone(), page_idx, )?; + // TODO(pere) ensure page is inserted page_cache.insert(page_idx, page.clone()); Ok(page) } + /// Loads pages if not loaded + pub fn load_page(&self, page: Rc>) -> Result<()> { + let id = page.borrow().id; + trace!("load_page(page_idx = {})", id); + let mut page_cache = self.page_cache.borrow_mut(); + page.borrow_mut().set_locked(); + if let Some(frame_id) = self.wal.borrow().find_frame(id as u64)? { + self.wal + .borrow() + .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; + { + let page = page.borrow_mut(); + page.set_uptodate(); + } + // TODO(pere) ensure page is inserted + if !page_cache.contains_key(id) { + page_cache.insert(id, page.clone()); + } + return Ok(()); + } + sqlite3_ondisk::begin_read_page( + self.page_io.clone(), + self.buffer_pool.clone(), + page.clone(), + id, + )?; + // TODO(pere) ensure page is inserted + if !page_cache.contains_key(id) { + page_cache.insert(id, page.clone()); + } + Ok(()) + } + /// Writes the database header. pub fn write_database_header(&self, header: &DatabaseHeader) { sqlite3_ondisk::begin_write_database_header(header, self).expect("failed to write header"); @@ -405,8 +484,15 @@ impl Pager { FlushState::Start => { let db_size = self.db_header.borrow().database_size; for page_id in self.dirty_pages.borrow().iter() { + debug!("appending frame {}", page_id); let mut cache = self.page_cache.borrow_mut(); let page = cache.get(page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); + { + let contents = page.borrow(); + let contents = contents.contents.read().unwrap(); + let contents = contents.as_ref().unwrap(); + debug!("appending frame {} {:?}", page_id, contents.page_type()); + } self.wal.borrow_mut().append_frame( page.clone(), db_size, @@ -419,12 +505,11 @@ impl Pager { return Ok(CheckpointStatus::IO); } FlushState::Checkpoint => { - let in_flight = self.flush_info.borrow().in_flight_writes.clone(); - match self.wal.borrow_mut().checkpoint(self, in_flight)? { - CheckpointStatus::IO => return Ok(CheckpointStatus::IO), + match self.checkpoint()? { CheckpointStatus::Done => { - self.flush_info.borrow_mut().state = FlushState::CheckpointDone; + self.flush_info.borrow_mut().state = FlushState::SyncDbFile; } + CheckpointStatus::IO => return Ok(CheckpointStatus::IO), }; } FlushState::CheckpointDone => { @@ -467,6 +552,34 @@ impl Pager { Ok(CheckpointStatus::Done) } + pub fn checkpoint(&self) -> Result { + loop { + let state = self.checkpoint_state.borrow().clone(); + log::trace!("checkpoint(state={:?})", state); + match state { + CheckpointState::Checkpoint => { + let in_flight = self.checkpoint_inflight.clone(); + match self.wal.borrow_mut().checkpoint(self, in_flight)? { + CheckpointStatus::IO => return Ok(CheckpointStatus::IO), + CheckpointStatus::Done => { + self.checkpoint_state + .replace(CheckpointState::CheckpointDone); + } + }; + } + CheckpointState::CheckpointDone => { + let in_flight = self.checkpoint_inflight.clone(); + if *in_flight.borrow() > 0 { + return Ok(CheckpointStatus::IO); + } else { + self.checkpoint_state.replace(CheckpointState::Checkpoint); + return Ok(CheckpointStatus::Done); + } + } + } + } + } + // WARN: used for testing purposes pub fn clear_page_cache(&self) { loop { @@ -539,9 +652,11 @@ impl Pager { Ok(page_ref) } - pub fn put_page(&self, id: usize, page: Rc>) { + pub fn put_loaded_page(&self, id: usize, page: Rc>) { let mut cache = RefCell::borrow_mut(&self.page_cache); - cache.insert(id, page); + // cache insert invalidates previous page + cache.insert(id, page.clone()); + page.borrow_mut().set_loaded(); } pub fn usable_size(&self) -> usize { diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 6c0b4b376..a581b84bc 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -48,7 +48,7 @@ use crate::storage::database::DatabaseStorage; use crate::storage::pager::{Page, Pager}; use crate::types::{OwnedRecord, OwnedValue}; use crate::{File, Result}; -use log::trace; +use log::{debug, trace}; use std::cell::RefCell; use std::pin::Pin; use std::rc::Rc; @@ -525,6 +525,7 @@ fn finish_read_page( page.contents.write().unwrap().replace(inner); page.set_uptodate(); page.clear_locked(); + page.set_loaded(); } Ok(()) } @@ -538,6 +539,7 @@ pub fn begin_write_btree_page( let page_finish = page.clone(); let page_id = page.borrow().id; + log::trace!("begin_write_btree_page(page_id={})", page_id); let buffer = { let page = page.borrow(); let contents = page.contents.read().unwrap(); @@ -549,6 +551,7 @@ pub fn begin_write_btree_page( let write_complete = { let buf_copy = buffer.clone(); Box::new(move |bytes_written: i32| { + log::trace!("finish_write_btree_page"); let buf_copy = buf_copy.clone(); let buf_len = buf_copy.borrow().len(); *write_counter.borrow_mut() -= 1; @@ -959,7 +962,7 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec) { pub fn begin_read_wal_header(io: &Rc) -> Result>> { let drop_fn = Rc::new(|_buf| {}); - let buf = Rc::new(RefCell::new(Buffer::allocate(WAL_HEADER_SIZE, drop_fn))); + let buf = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn))); let result = Rc::new(RefCell::new(WalHeader::default())); let header = result.clone(); let complete = Box::new(move |buf: Rc>| { @@ -1017,6 +1020,7 @@ pub fn begin_write_wal_frame( ) -> Result<()> { let page_finish = page.clone(); let page_id = page.borrow().id; + trace!("begin_write_wal_frame(offset={}, page={})", offset, page_id); let header = WalFrameHeader { page_number: page_id as u32, @@ -1038,12 +1042,12 @@ pub fn begin_write_wal_frame( ); let buf = buffer.as_mut_slice(); - buf[0..4].copy_from_slice(&header.page_number.to_ne_bytes()); - buf[4..8].copy_from_slice(&header.db_size.to_ne_bytes()); - buf[8..12].copy_from_slice(&header.salt_1.to_ne_bytes()); - buf[12..16].copy_from_slice(&header.salt_2.to_ne_bytes()); - buf[16..20].copy_from_slice(&header.checksum_1.to_ne_bytes()); - buf[20..24].copy_from_slice(&header.checksum_2.to_ne_bytes()); + buf[0..4].copy_from_slice(&header.page_number.to_be_bytes()); + buf[4..8].copy_from_slice(&header.db_size.to_be_bytes()); + buf[8..12].copy_from_slice(&header.salt_1.to_be_bytes()); + buf[12..16].copy_from_slice(&header.salt_2.to_be_bytes()); + buf[16..20].copy_from_slice(&header.checksum_1.to_be_bytes()); + buf[20..24].copy_from_slice(&header.checksum_2.to_be_bytes()); buf[WAL_FRAME_HEADER_SIZE..].copy_from_slice(&contents.as_ptr()); Rc::new(RefCell::new(buffer)) @@ -1052,6 +1056,7 @@ pub fn begin_write_wal_frame( *write_counter.borrow_mut() += 1; let write_complete = { let buf_copy = buffer.clone(); + log::info!("finished"); Box::new(move |bytes_written: i32| { let buf_copy = buf_copy.clone(); let buf_len = buf_copy.borrow().len(); @@ -1072,7 +1077,7 @@ pub fn begin_write_wal_header(io: &Rc, header: &WalHeader) -> Result<( let buffer = { let drop_fn = Rc::new(|_buf| {}); - let mut buffer = Buffer::allocate(WAL_HEADER_SIZE, drop_fn); + let mut buffer = Buffer::allocate(512, drop_fn); let buf = buffer.as_mut_slice(); buf[0..4].copy_from_slice(&header.magic); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 8bc55f910..1e4e6f313 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1,6 +1,8 @@ use std::collections::{HashMap, HashSet}; use std::{cell::RefCell, rc::Rc, sync::Arc}; +use log::{debug, trace}; + use crate::io::{File, SyncCompletion, IO}; use crate::storage::sqlite3_ondisk::{ begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, @@ -115,10 +117,11 @@ impl Wal for WalFile { page: Rc>, buffer_pool: Rc, ) -> Result<()> { + debug!("read_frame({})", frame_id); let offset = self.frame_offset(frame_id); begin_read_wal_frame( self.file.borrow().as_ref().unwrap(), - offset, + offset + WAL_FRAME_HEADER_SIZE, buffer_pool, page, )?; @@ -137,6 +140,12 @@ impl Wal for WalFile { let page_id = page.borrow().id; let frame_id = *self.max_frame.borrow(); let offset = self.frame_offset(frame_id); + trace!( + "append_frame(frame={}, offset={}, page_id={})", + frame_id, + offset, + page_id + ); begin_write_wal_frame( self.file.borrow().as_ref().unwrap(), offset, @@ -246,7 +255,7 @@ impl WalFile { if self.file.borrow().is_none() { match self .io - .open_file(&self.wal_path, crate::io::OpenFlags::Create) + .open_file(&self.wal_path, crate::io::OpenFlags::Create, false) { Ok(file) => { if file.size()? > 0 { @@ -285,7 +294,7 @@ impl WalFile { let header = header.as_ref().unwrap().borrow(); let page_size = header.page_size; let page_offset = frame_id * (page_size as u64 + WAL_FRAME_HEADER_SIZE as u64); - let offset = WAL_HEADER_SIZE as u64 + WAL_FRAME_HEADER_SIZE as u64 + page_offset; + let offset = WAL_HEADER_SIZE as u64 + page_offset; offset as usize } } diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 0c292ff85..609b8bfb3 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -1098,6 +1098,7 @@ impl Program { ))); } } + log::trace!("Halt auto_commit {}", self.auto_commit); if self.auto_commit { return match pager.end_tx() { Ok(crate::storage::wal::CheckpointStatus::IO) => Ok(StepResult::IO), diff --git a/simulator/main.rs b/simulator/main.rs index fb80dc3b1..ceea26ee3 100644 --- a/simulator/main.rs +++ b/simulator/main.rs @@ -94,8 +94,13 @@ impl SimulatorIO { } impl IO for SimulatorIO { - fn open_file(&self, path: &str, flags: OpenFlags) -> Result> { - let inner = self.inner.open_file(path, flags)?; + fn open_file( + &self, + path: &str, + flags: OpenFlags, + _direct: bool, + ) -> Result> { + let inner = self.inner.open_file(path, flags, false)?; let file = Rc::new(SimulatorFile { inner, fault: RefCell::new(false), diff --git a/test/src/lib.rs b/test/src/lib.rs index 7e8b31443..5e0367206 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -28,9 +28,12 @@ impl TempDatabase { } pub fn connect_limbo(&self) -> Rc { + log::debug!("conneting to limbo"); let db = Database::open_file(self.io.clone(), self.path.to_str().unwrap()).unwrap(); - db.connect() + let conn = db.connect(); + log::debug!("connected to limbo"); + conn } } @@ -51,6 +54,7 @@ mod tests { let list_query = "SELECT * FROM test"; let max_iterations = 10000; for i in 0..max_iterations { + debug!("inserting {} ", i); if (i % 100) == 0 { let progress = (i as f64 / max_iterations as f64) * 100.0; println!("progress {:.1}%", progress); @@ -103,7 +107,6 @@ mod tests { } #[test] - #[ignore] fn test_simple_overflow_page() -> anyhow::Result<()> { let _ = env_logger::try_init(); let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY, t TEXT);"); @@ -169,7 +172,6 @@ mod tests { Ok(()) } - #[ignore] #[test] fn test_sequential_overflow_page() -> anyhow::Result<()> { let _ = env_logger::try_init(); @@ -305,6 +307,78 @@ mod tests { Ok(()) } + #[test] + fn test_wal_restart() -> anyhow::Result<()> { + let _ = env_logger::try_init(); + let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY);"); + // threshold is 1000 by default + + fn insert(i: usize, conn: &Rc, tmp_db: &TempDatabase) -> anyhow::Result<()> { + log::debug!("inserting {}", i); + let insert_query = format!("INSERT INTO test VALUES ({})", i); + match conn.query(insert_query) { + Ok(Some(ref mut rows)) => loop { + match rows.next_row()? { + RowResult::IO => { + tmp_db.io.run_once()?; + } + RowResult::Done => break, + _ => unreachable!(), + } + }, + Ok(None) => {} + Err(err) => { + eprintln!("{}", err); + } + }; + log::debug!("inserted {}", i); + tmp_db.io.run_once()?; + Ok(()) + } + + fn count(conn: &Rc, tmp_db: &TempDatabase) -> anyhow::Result { + log::debug!("counting"); + let list_query = "SELECT count(x) FROM test"; + loop { + match conn.query(list_query).unwrap() { + Some(ref mut rows) => loop { + match rows.next_row()? { + RowResult::Row(row) => { + let first_value = &row.values[0]; + let count = match first_value { + Value::Integer(i) => *i as i32, + _ => unreachable!(), + }; + log::debug!("counted {}", count); + return Ok(count as usize); + } + RowResult::IO => { + tmp_db.io.run_once()?; + } + RowResult::Done => break, + } + }, + None => {} + } + } + } + + { + let conn = tmp_db.connect_limbo(); + insert(1, &conn, &tmp_db).unwrap(); + assert_eq!(count(&conn, &tmp_db).unwrap(), 1); + } + { + let conn = tmp_db.connect_limbo(); + assert_eq!( + count(&conn, &tmp_db).unwrap(), + 1, + "failed to read from wal from another connection" + ); + } + Ok(()) + } + fn compare_string(a: &String, b: &String) { assert_eq!(a.len(), b.len(), "Strings are not equal in size!"); let a = a.as_bytes(); diff --git a/testing/all.test b/testing/all.test index 171144665..966fbce69 100755 --- a/testing/all.test +++ b/testing/all.test @@ -8,7 +8,6 @@ source $testdir/coalesce.test source $testdir/glob.test source $testdir/join.test source $testdir/insert.test -source $testdir/join.test source $testdir/json.test source $testdir/like.test source $testdir/orderby.test