Merge 'Start multithread support' from Pere Diaz Bou

Since we expect to support multiple threads in the future, extract the
state that must be shared between multiple connections with regard to
the WAL and Pager.
This is WIP, so for now everything that looks important sits behind a
RwLock; expect this to change to atomics as needed. Some of these locks
may disappear entirely once they are better served by transaction
locks.
This also adds unsafe Sync/Send impls and an UnsafeCell to Page.

Closes #415
Pere Diaz Bou
2024-12-13 13:22:17 +01:00
11 changed files with 375 additions and 355 deletions
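
At a high level, the commit separates connection-local state from state shared across all connections to one database. A minimal sketch of the resulting ownership pattern (field lists abbreviated; these are not the actual definitions):

use std::sync::{Arc, RwLock};

// Shared across every connection/thread of one database.
pub struct WalFileShared { /* wal_header, frame_cache, max_frame, checksums, file */ }
pub struct DumbLruPageCache { /* LRU entries; guarded by the outer RwLock */ }

pub struct Database {
    // WIP: coarse RwLocks for now; some may become atomics or be
    // subsumed by transaction locks later.
    shared_page_cache: Arc<RwLock<DumbLruPageCache>>,
    shared_wal: Arc<RwLock<WalFileShared>>,
    // ...
}

// Per-connection wrapper: private checkpoint/sync state plus a handle
// to the shared WAL state.
pub struct WalFile {
    shared: Arc<RwLock<WalFileShared>>,
    // ongoing_checkpoint, syncing, page_size, ...
}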

Cargo.lock generated
View File

@@ -1151,6 +1151,7 @@ dependencies = [
name = "limbo_core"
version = "0.0.9"
dependencies = [
"bumpalo",
"cfg_block",
"chrono",
"criterion",

View File

@@ -51,6 +51,10 @@ codegen-units = 1
panic = "abort"
lto = true
[profile.bench-profile]
inherits = "release"
debug = true
[profile.dist]
inherits = "release"
lto = "thin"

View File

@@ -1,4 +1,4 @@
use limbo_core::{maybe_init_database_file, OpenFlags, Pager, Result, WalFile};
use limbo_core::{maybe_init_database_file, OpenFlags, Pager, Result, WalFile, WalFileShared};
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
@@ -6,7 +6,7 @@ use wasm_bindgen::prelude::*;
#[wasm_bindgen]
pub struct Database {
db: Rc<limbo_core::Database>,
db: Arc<limbo_core::Database>,
conn: Rc<limbo_core::Connection>,
}
@@ -22,13 +22,21 @@ impl Database {
maybe_init_database_file(&file, &io).unwrap();
let page_io = Rc::new(DatabaseStorage::new(file));
let db_header = Pager::begin_open(page_io.clone()).unwrap();
// ensure db header is there
io.run_once().unwrap();
let wal_path = format!("{}-wal", path);
let wal_shared =
WalFileShared::open_shared(&io, wal_path.as_str(), db_header.borrow().page_size)
.unwrap();
let wal = Rc::new(RefCell::new(WalFile::new(
io.clone(),
wal_path,
db_header.borrow().page_size as usize,
wal_shared.clone(),
)));
let db = limbo_core::Database::open(io, page_io, wal).unwrap();
let db = limbo_core::Database::open(io, page_io, wal, wal_shared).unwrap();
let conn = db.connect();
Database { db, conn }
}

View File

@@ -52,6 +52,7 @@ serde = { version = "1.0", features = ["derive"] }
pest = { version = "2.0", optional = true }
pest_derive = { version = "2.0", optional = true }
rand = "0.8.5"
bumpalo = { version = "3.16.0", features = ["collections", "boxed"] }
[target.'cfg(not(target_family = "windows"))'.dev-dependencies]
pprof = { version = "0.14.0", features = ["criterion", "flamegraph"] }

View File

@@ -21,15 +21,16 @@ use schema::Schema;
use sqlite3_parser::ast;
use sqlite3_parser::{ast::Cmd, lexer::sql::Parser};
use std::cell::Cell;
use std::rc::Weak;
use std::sync::{Arc, OnceLock};
use std::sync::Weak;
use std::sync::{Arc, OnceLock, RwLock};
use std::{cell::RefCell, rc::Rc};
use storage::btree::btree_init_page;
#[cfg(feature = "fs")]
use storage::database::FileStorage;
use storage::pager::allocate_page;
use storage::pager::{allocate_page, DumbLruPageCache};
use storage::sqlite3_ondisk::{DatabaseHeader, DATABASE_HEADER_SIZE};
pub use storage::wal::WalFile;
pub use storage::wal::WalFileShared;
use util::parse_schema_rows;
use translate::optimizer::optimize_plan;
@@ -64,41 +65,52 @@ pub struct Database {
schema: Rc<RefCell<Schema>>,
header: Rc<RefCell<DatabaseHeader>>,
transaction_state: RefCell<TransactionState>,
// Shared structures of a Database are the parts that are common to multiple threads that might
// create DB connections.
shared_page_cache: Arc<RwLock<DumbLruPageCache>>,
shared_wal: Arc<RwLock<WalFileShared>>,
}
impl Database {
#[cfg(feature = "fs")]
pub fn open_file(io: Arc<dyn IO>, path: &str) -> Result<Rc<Database>> {
pub fn open_file(io: Arc<dyn IO>, path: &str) -> Result<Arc<Database>> {
use storage::wal::WalFileShared;
let file = io.open_file(path, io::OpenFlags::Create, true)?;
maybe_init_database_file(&file, &io)?;
let page_io = Rc::new(FileStorage::new(file));
let wal_path = format!("{}-wal", path);
let db_header = Pager::begin_open(page_io.clone())?;
io.run_once()?;
let wal_shared =
WalFileShared::open_shared(&io, wal_path.as_str(), db_header.borrow().page_size)?;
let wal = Rc::new(RefCell::new(WalFile::new(
io.clone(),
wal_path,
db_header.borrow().page_size as usize,
wal_shared.clone(),
)));
Self::open(io, page_io, wal)
Self::open(io, page_io, wal, wal_shared)
}
pub fn open(
io: Arc<dyn IO>,
page_io: Rc<dyn DatabaseStorage>,
wal: Rc<RefCell<dyn Wal>>,
) -> Result<Rc<Database>> {
shared_wal: Arc<RwLock<WalFileShared>>,
) -> Result<Arc<Database>> {
let db_header = Pager::begin_open(page_io.clone())?;
io.run_once()?;
DATABASE_VERSION.get_or_init(|| {
let version = db_header.borrow().version_number;
version.to_string()
});
let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(10)));
let pager = Rc::new(Pager::finish_open(
db_header.clone(),
page_io,
wal,
io.clone(),
shared_page_cache.clone(),
)?);
let bootstrap_schema = Rc::new(RefCell::new(Schema::new()));
let conn = Rc::new(Connection {
@@ -113,21 +125,23 @@ impl Database {
parse_schema_rows(rows, &mut schema, io)?;
let schema = Rc::new(RefCell::new(schema));
let header = db_header;
Ok(Rc::new(Database {
Ok(Arc::new(Database {
pager,
schema,
header,
transaction_state: RefCell::new(TransactionState::None),
shared_page_cache,
shared_wal,
}))
}
pub fn connect(self: &Rc<Database>) -> Rc<Connection> {
pub fn connect(self: &Arc<Database>) -> Rc<Connection> {
Rc::new(Connection {
pager: self.pager.clone(),
schema: self.schema.clone(),
header: self.header.clone(),
db: Rc::downgrade(self),
last_insert_rowid: Cell::new(0),
db: Arc::downgrade(self),
})
}
}
@@ -153,8 +167,7 @@ pub fn maybe_init_database_file(file: &Rc<dyn File>, io: &Arc<dyn IO>) -> Result
DATABASE_HEADER_SIZE,
);
let mut page = page1.borrow_mut();
let contents = page.contents.as_mut().unwrap();
let contents = page1.get().contents.as_mut().unwrap();
contents.write_database_header(&db_header);
// write the first page to disk synchronously
let flag_complete = Rc::new(RefCell::new(false));
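
Taken together with the wasm hunk earlier, the open path now builds the shared WAL state once and threads it through both the per-connection WalFile and the Database. A hedged sketch of the flow inside open_file after this change (error handling and the surrounding function elided; illustrative, not the verbatim source):

let file = io.open_file(path, OpenFlags::Create, true)?;
maybe_init_database_file(&file, &io)?;
let page_io = Rc::new(FileStorage::new(file));
let db_header = Pager::begin_open(page_io.clone())?;
io.run_once()?; // make sure the header read has completed
// The shared half is opened once per database file...
let wal_shared =
    WalFileShared::open_shared(&io, &format!("{}-wal", path), db_header.borrow().page_size)?;
// ...and the connection-side WalFile wraps it; WalFile::new now takes the
// shared handle instead of the wal_path it used to open itself.
let wal = Rc::new(RefCell::new(WalFile::new(
    io.clone(),
    db_header.borrow().page_size as usize,
    wal_shared.clone(),
)));
// Database::open stores wal_shared alongside a shared page cache and
// now returns Arc<Database> so the handle itself can cross threads.
let db = Database::open(io, page_io, wal, wal_shared)?;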

View File

@@ -1,6 +1,6 @@
use log::debug;
use crate::storage::pager::{Page, Pager};
use crate::storage::pager::Pager;
use crate::storage::sqlite3_ondisk::{
read_btree_cell, read_varint, write_varint, BTreeCell, DatabaseHeader, PageContent, PageType,
TableInteriorCell, TableLeafCell,
@@ -12,6 +12,7 @@ use std::cell::{Ref, RefCell};
use std::pin::Pin;
use std::rc::Rc;
use super::pager::PageRef;
use super::sqlite3_ondisk::{
write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, DATABASE_HEADER_SIZE,
};
@@ -67,7 +68,7 @@ enum WriteState {
struct WriteInfo {
state: WriteState,
new_pages: RefCell<Vec<Rc<RefCell<Page>>>>,
new_pages: RefCell<Vec<PageRef>>,
scratch_cells: RefCell<Vec<&'static [u8]>>,
rightmost_pointer: RefCell<Option<u32>>,
page_copy: RefCell<Option<PageContent>>, // this holds a copy of a page needed for buffer references
@@ -101,7 +102,7 @@ struct PageStack {
/// Pointer to the current page being consumed
current_page: RefCell<i32>,
/// List of pages in the stack. Root page will be in index 0
stack: RefCell<[Option<Rc<RefCell<Page>>>; BTCURSOR_MAX_DEPTH + 1]>,
stack: RefCell<[Option<PageRef>; BTCURSOR_MAX_DEPTH + 1]>,
/// List of cell indices in the stack.
/// cell_indices[current_page] is the current cell index being consumed. Similarly
/// cell_indices[current_page-1] is the cell index of the parent of the current page
@@ -143,16 +144,15 @@ impl BTreeCursor {
fn is_empty_table(&mut self) -> Result<CursorResult<bool>> {
let page = self.pager.read_page(self.root_page)?;
let page = RefCell::borrow(&page);
return_if_locked!(page);
let cell_count = page.contents.as_ref().unwrap().cell_count();
let cell_count = page.get().contents.as_ref().unwrap().cell_count();
Ok(CursorResult::Ok(cell_count == 0))
}
fn get_prev_record(&mut self) -> Result<CursorResult<(Option<u64>, Option<OwnedRecord>)>> {
loop {
let mem_page_rc = self.stack.top();
let page = self.stack.top();
let cell_idx = self.stack.current_index();
// moved to current page begin
@@ -177,18 +177,15 @@ impl BTreeCursor {
let cell_idx = cell_idx as usize;
debug!(
"get_prev_record current id={} cell={}",
mem_page_rc.borrow().id,
page.get().id,
cell_idx
);
if mem_page_rc.borrow().is_locked() {
return_if_locked!(page);
if !page.is_loaded() {
self.pager.load_page(page.clone())?;
return Ok(CursorResult::IO);
}
if !mem_page_rc.borrow().is_loaded() {
self.pager.load_page(mem_page_rc.clone())?;
return Ok(CursorResult::IO);
}
let mem_page = mem_page_rc.borrow();
let contents = mem_page.contents.as_ref().unwrap();
let contents = page.get().contents.as_ref().unwrap();
let cell_count = contents.cell_count();
let cell_idx = if cell_idx >= cell_count {
@@ -239,13 +236,13 @@ impl BTreeCursor {
let mem_page_rc = self.stack.top();
let cell_idx = self.stack.current_index() as usize;
debug!("current id={} cell={}", mem_page_rc.borrow().id, cell_idx);
return_if_locked!(mem_page_rc.borrow());
if !mem_page_rc.borrow().is_loaded() {
debug!("current id={} cell={}", mem_page_rc.get().id, cell_idx);
return_if_locked!(mem_page_rc);
if !mem_page_rc.is_loaded() {
self.pager.load_page(mem_page_rc.clone())?;
return Ok(CursorResult::IO);
}
let mem_page = mem_page_rc.borrow();
let mem_page = mem_page_rc.get();
let contents = mem_page.contents.as_ref().unwrap();
@@ -397,11 +394,10 @@ impl BTreeCursor {
return_if_io!(self.move_to(key.clone(), op.clone()));
{
let page_rc = self.stack.top();
let page = page_rc.borrow();
let page = self.stack.top();
return_if_locked!(page);
let contents = page.contents.as_ref().unwrap();
let contents = page.get().contents.as_ref().unwrap();
for cell_idx in 0..contents.cell_count() {
let cell = contents.cell_get(
@@ -491,11 +487,10 @@ impl BTreeCursor {
loop {
let mem_page = self.stack.top();
let page_idx = mem_page.borrow().id;
let page_idx = mem_page.get().id;
let page = self.pager.read_page(page_idx)?;
let page = RefCell::borrow(&page);
return_if_locked!(page);
let contents = page.contents.as_ref().unwrap();
let contents = page.get().contents.as_ref().unwrap();
if contents.is_leaf() {
if contents.cell_count() > 0 {
self.stack.set_cell_index(contents.cell_count() as i32 - 1);
@@ -545,11 +540,10 @@ impl BTreeCursor {
self.move_to_root();
loop {
let page_rc = self.stack.top();
let page = RefCell::borrow(&page_rc);
let page = self.stack.top();
return_if_locked!(page);
let contents = page.contents.as_ref().unwrap();
let contents = page.get().contents.as_ref().unwrap();
if contents.is_leaf() {
return Ok(CursorResult::Ok(()));
}
@@ -649,7 +643,7 @@ impl BTreeCursor {
let state = &self.write_info.state;
match state {
WriteState::Start => {
let page_ref = self.stack.top();
let page = self.stack.top();
let int_key = match key {
OwnedValue::Integer(i) => *i as u64,
_ => unreachable!("btree tables are indexed by integers!"),
@@ -657,13 +651,12 @@ impl BTreeCursor {
// get page and find cell
let (cell_idx, page_type) = {
let mut page = page_ref.borrow_mut();
return_if_locked!(page);
page.set_dirty();
self.pager.add_dirty(page.id);
self.pager.add_dirty(page.get().id);
let page = page.contents.as_mut().unwrap();
let page = page.get().contents.as_mut().unwrap();
assert!(matches!(page.page_type(), PageType::TableLeaf));
// find cell
@@ -679,8 +672,7 @@ impl BTreeCursor {
// insert
let overflow = {
let mut page = page_ref.borrow_mut();
let contents = page.contents.as_mut().unwrap();
let contents = page.get().contents.as_mut().unwrap();
log::debug!(
"insert_into_page(overflow, cell_count={})",
contents.cell_count()
@@ -831,11 +823,10 @@ impl BTreeCursor {
// can be a "rightmost pointer" or a "cell".
// we always assume there is a parent
let current_page = self.stack.top();
let mut page_rc = current_page.borrow_mut();
{
// check if we don't need to balance
// don't continue if there are no overflow cells
let page = page_rc.contents.as_mut().unwrap();
let page = current_page.get().contents.as_mut().unwrap();
if page.overflow_cells.is_empty() {
self.write_info.state = WriteState::Finish;
return Ok(CursorResult::Ok(()));
@@ -843,17 +834,15 @@ impl BTreeCursor {
}
if !self.stack.has_parent() {
drop(page_rc);
drop(current_page);
self.balance_root();
return Ok(CursorResult::Ok(()));
}
debug!("Balancing leaf. leaf={}", page_rc.id);
debug!("Balancing leaf. leaf={}", current_page.get().id);
// Copy of page used to reference cell bytes.
// This needs to be saved somewhere safe so that references still point to it;
// it will be stored in write_info below
let page_copy = page_rc.contents.as_ref().unwrap().clone();
let page_copy = current_page.get().contents.as_ref().unwrap().clone();
// In memory in order copy of all cells in pages we want to balance. For now let's do a 2 page split.
// Right pointer in interior cells should be converted to regular cells if more than 2 pages are used for balancing.
@@ -880,7 +869,7 @@ impl BTreeCursor {
// allocate new pages and move cells to those new pages
// split procedure
let page = page_rc.contents.as_mut().unwrap();
let page = current_page.get().contents.as_mut().unwrap();
assert!(
matches!(
page.page_type(),
@@ -889,9 +878,8 @@ impl BTreeCursor {
"indexes still not supported "
);
let right_page_ref = self.allocate_page(page.page_type(), 0);
let right_page = right_page_ref.borrow_mut();
let right_page_id = right_page.id;
let right_page = self.allocate_page(page.page_type(), 0);
let right_page_id = right_page.get().id;
self.write_info.new_pages.borrow_mut().clear();
self.write_info
@@ -901,41 +889,43 @@ impl BTreeCursor {
self.write_info
.new_pages
.borrow_mut()
.push(right_page_ref.clone());
.push(right_page.clone());
debug!("splitting left={} right={}", page_rc.id, right_page_id);
debug!(
"splitting left={} right={}",
current_page.get().id,
right_page_id
);
self.write_info.state = WriteState::BalanceGetParentPage;
Ok(CursorResult::Ok(()))
}
WriteState::BalanceGetParentPage => {
let parent_rc = self.stack.parent();
let loaded = parent_rc.borrow().is_loaded();
return_if_locked!(parent_rc.borrow());
let parent = self.stack.parent();
let loaded = parent.is_loaded();
return_if_locked!(parent);
if !loaded {
debug!("balance_leaf(loading page)");
self.pager.load_page(parent_rc.clone())?;
self.pager.load_page(parent.clone())?;
return Ok(CursorResult::IO);
}
parent_rc.borrow_mut().set_dirty();
parent.set_dirty();
self.write_info.state = WriteState::BalanceMoveUp;
Ok(CursorResult::Ok(()))
}
WriteState::BalanceMoveUp => {
let parent_ref = self.stack.parent();
let mut parent = parent_ref.borrow_mut();
let parent = self.stack.parent();
let (page_type, current_idx) = {
let current_page = self.stack.top();
let page_ref = current_page.borrow();
let contents = page_ref.contents.as_ref().unwrap();
(contents.page_type().clone(), page_ref.id)
let contents = current_page.get().contents.as_ref().unwrap();
(contents.page_type().clone(), current_page.get().id)
};
parent.set_dirty();
self.pager.add_dirty(parent.id);
let parent_contents = parent.contents.as_mut().unwrap();
self.pager.add_dirty(parent.get().id);
let parent_contents = parent.get().contents.as_mut().unwrap();
// if this isn't empty next loop won't work
assert_eq!(parent_contents.overflow_cells.len(), 0);
@@ -974,9 +964,8 @@ impl BTreeCursor {
// reset pages
for page in new_pages.iter() {
let mut page = page.borrow_mut();
assert!(page.is_dirty());
let contents = page.contents.as_mut().unwrap();
let contents = page.get().contents.as_mut().unwrap();
contents.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, 0);
contents.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, 0);
@@ -1005,9 +994,8 @@ impl BTreeCursor {
);
for (i, page) in new_pages.iter_mut().enumerate() {
let mut page = page.borrow_mut();
let page_id = page.id;
let contents = page.contents.as_mut().unwrap();
let page_id = page.get().id;
let contents = page.get().contents.as_mut().unwrap();
let last_page = i == new_pages_len - 1;
let cells_to_copy = if last_page {
@@ -1033,16 +1021,14 @@ impl BTreeCursor {
}
let is_leaf = {
let page = self.stack.top();
let page = page.borrow();
let page = page.contents.as_ref().unwrap();
let page = page.get().contents.as_ref().unwrap();
page.is_leaf()
};
// update rightmost pointer for each page if we are in interior page
if !is_leaf {
for page in new_pages.iter_mut().take(new_pages_len - 1) {
let mut page = page.borrow_mut();
let contents = page.contents.as_mut().unwrap();
let contents = page.get().contents.as_mut().unwrap();
assert!(contents.cell_count() == 1);
let last_cell = contents
@@ -1063,8 +1049,7 @@ impl BTreeCursor {
}
// last page right most pointer points to previous right most pointer before splitting
let last_page = new_pages.last().unwrap();
let mut last_page = last_page.borrow_mut();
let last_page_contents = last_page.contents.as_mut().unwrap();
let last_page_contents = last_page.get().contents.as_mut().unwrap();
last_page_contents.write_u32(
BTREE_HEADER_OFFSET_RIGHTMOST,
self.write_info.rightmost_pointer.borrow().unwrap(),
@@ -1076,8 +1061,7 @@ impl BTreeCursor {
for (page_id_index, page) in
new_pages.iter_mut().take(new_pages_len - 1).enumerate()
{
let mut page = page.borrow_mut();
let contents = page.contents.as_mut().unwrap();
let contents = page.get().contents.as_mut().unwrap();
let divider_cell_index = divider_cells_index[page_id_index];
let cell_payload = scratch_cells[divider_cell_index];
let cell = read_btree_cell(
@@ -1098,7 +1082,7 @@ impl BTreeCursor {
_ => unreachable!(),
};
let mut divider_cell = Vec::new();
divider_cell.extend_from_slice(&(page.id as u32).to_be_bytes());
divider_cell.extend_from_slice(&(page.get().id as u32).to_be_bytes());
divider_cell.extend(std::iter::repeat(0).take(9));
let n = write_varint(&mut divider_cell.as_mut_slice()[4..], key);
divider_cell.truncate(4 + n);
@@ -1122,7 +1106,7 @@ impl BTreeCursor {
{
// copy last page id to right pointer
let last_pointer = new_pages.last().unwrap().borrow().id as u32;
let last_pointer = new_pages.last().unwrap().get().id as u32;
parent_contents.write_u32(right_pointer, last_pointer);
}
self.stack.pop();
@@ -1141,20 +1125,17 @@ impl BTreeCursor {
let is_page_1 = {
let current_root = self.stack.top();
let current_root_ref = current_root.borrow();
current_root_ref.id == 1
current_root.get().id == 1
};
let offset = if is_page_1 { DATABASE_HEADER_SIZE } else { 0 };
let new_root_page_ref = self.allocate_page(PageType::TableInterior, offset);
let new_root_page = self.allocate_page(PageType::TableInterior, offset);
{
let current_root = self.stack.top();
let current_root_ref = current_root.borrow();
let current_root_contents = current_root_ref.contents.as_ref().unwrap();
let current_root_contents = current_root.get().contents.as_ref().unwrap();
let mut new_root_page = new_root_page_ref.borrow_mut();
let new_root_page_id = new_root_page.id;
let new_root_page_contents = new_root_page.contents.as_mut().unwrap();
let new_root_page_id = new_root_page.get().id;
let new_root_page_contents = new_root_page.get().contents.as_mut().unwrap();
if is_page_1 {
// Copy header
let current_root_buf = current_root_contents.as_ptr();
@@ -1166,8 +1147,6 @@ impl BTreeCursor {
new_root_page_contents
.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, new_root_page_id as u32);
new_root_page_contents.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, 0);
// TODO:: this page should have offset
// copy header bytes to here
}
/* swap the split page's buffer with the new root buffer so we don't have to update page idx */
@@ -1175,18 +1154,16 @@ impl BTreeCursor {
let (root_id, child_id, child) = {
let page_ref = self.stack.top();
let child = page_ref.clone();
let mut child_rc = page_ref.borrow_mut();
let mut new_root_page = new_root_page_ref.borrow_mut();
// Swap the entire Page structs
std::mem::swap(&mut child_rc.id, &mut new_root_page.id);
std::mem::swap(&mut child.get().id, &mut new_root_page.get().id);
// TODO:: shift bytes by offset to left on child because now child has offset 100
// and header bytes
// Also change the offset of page
//
if is_page_1 {
// Remove header from child and set offset to 0
let contents = child_rc.contents.as_mut().unwrap();
let contents = child.get().contents.as_mut().unwrap();
let (cell_pointer_offset, _) = contents.cell_get_raw_pointer_region();
// change cell pointers
for cell_idx in 0..contents.cell_count() {
@@ -1200,13 +1177,13 @@ impl BTreeCursor {
buf.copy_within(DATABASE_HEADER_SIZE.., 0);
}
self.pager.add_dirty(new_root_page.id);
self.pager.add_dirty(child_rc.id);
(new_root_page.id, child_rc.id, child)
self.pager.add_dirty(new_root_page.get().id);
self.pager.add_dirty(child.get().id);
(new_root_page.get().id, child.get().id, child)
};
debug!("Balancing root. root={}, rightmost={}", root_id, child_id);
let root = new_root_page_ref.clone();
let root = new_root_page.clone();
self.root_page = root_id;
self.stack.clear();
@@ -1218,22 +1195,19 @@ impl BTreeCursor {
}
}
fn allocate_page(&self, page_type: PageType, offset: usize) -> Rc<RefCell<Page>> {
fn allocate_page(&self, page_type: PageType, offset: usize) -> PageRef {
let page = self.pager.allocate_page().unwrap();
btree_init_page(&page, page_type, &self.database_header.borrow(), offset);
page
}
fn allocate_overflow_page(&self) -> Rc<RefCell<Page>> {
fn allocate_overflow_page(&self) -> PageRef {
let page = self.pager.allocate_page().unwrap();
{
// setup overflow page
let mut contents = page.borrow_mut();
let contents = contents.contents.as_mut().unwrap();
let buf = contents.as_ptr();
buf.fill(0);
}
// setup overflow page
let contents = page.get().contents.as_mut().unwrap();
let buf = contents.as_ptr();
buf.fill(0);
page
}
@@ -1493,9 +1467,8 @@ impl BTreeCursor {
let overflow_page = self.allocate_overflow_page();
overflow_pages.push(overflow_page.clone());
{
let mut page = overflow_page.borrow_mut();
let id = page.id as u32;
let contents = page.contents.as_mut().unwrap();
let id = overflow_page.get().id as u32;
let contents = overflow_page.get().contents.as_mut().unwrap();
// TODO: take into account offset here?
let buf = contents.as_ptr();
@@ -1572,11 +1545,11 @@ impl BTreeCursor {
}
impl PageStack {
fn push(&self, page: Rc<RefCell<Page>>) {
fn push(&self, page: PageRef) {
debug!(
"pagestack::push(current={}, new_page_id={})",
self.current_page.borrow(),
page.borrow().id
page.get().id
);
*self.current_page.borrow_mut() += 1;
let current = *self.current_page.borrow();
@@ -1596,7 +1569,7 @@ impl PageStack {
*self.current_page.borrow_mut() -= 1;
}
fn top(&self) -> Rc<RefCell<Page>> {
fn top(&self) -> PageRef {
let current = *self.current_page.borrow();
let page = self.stack.borrow()[current as usize]
.as_ref()
@@ -1605,12 +1578,12 @@ impl PageStack {
debug!(
"pagestack::top(current={}, page_id={})",
current,
page.borrow().id
page.get().id
);
page
}
fn parent(&self) -> Rc<RefCell<Page>> {
fn parent(&self) -> PageRef {
let current = *self.current_page.borrow();
self.stack.borrow()[current as usize - 1]
.as_ref()
@@ -1794,12 +1767,11 @@ impl Cursor for BTreeCursor {
_ => unreachable!("btree tables are indexed by integers!"),
};
return_if_io!(self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ));
let page_ref = self.stack.top();
let page = page_ref.borrow();
let page = self.stack.top();
// TODO(pere): request load
return_if_locked!(page);
let contents = page.contents.as_ref().unwrap();
let contents = page.get().contents.as_ref().unwrap();
// find cell
let int_key = match key {
@@ -1834,19 +1806,19 @@ impl Cursor for BTreeCursor {
),
};
let page = self.allocate_page(page_type, 0);
let id = page.borrow().id;
let id = page.get().id;
id as u32
}
}
pub fn btree_init_page(
page: &Rc<RefCell<Page>>,
page: &PageRef,
page_type: PageType,
db_header: &DatabaseHeader,
offset: usize,
) {
// setup btree page
let mut contents = page.borrow_mut();
let contents = page.get();
debug!("btree_init_page(id={}, offset={})", contents.id, offset);
let contents = contents.contents.as_mut().unwrap();
contents.offset = offset;
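
The change running through this file is mechanical: Rc<RefCell<Page>> becomes PageRef (an Arc<Page>), and the borrow()/borrow_mut() calls become the get() accessor. The resulting cursor idiom, sketched from the hunks above (illustrative only):

let page = self.stack.top();              // PageRef = Arc<Page>, no RefCell
return_if_locked!(page);                  // I/O still in flight: yield
if !page.is_loaded() {
    self.pager.load_page(page.clone())?;  // schedule the read, then yield
    return Ok(CursorResult::IO);
}
// get() hands out &mut PageInner through an UnsafeCell; keeping two threads
// from mutating the same page concurrently becomes the pager's job.
let contents = page.get().contents.as_ref().unwrap();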

View File

@@ -5,22 +5,30 @@ use crate::storage::wal::Wal;
use crate::{Buffer, Result};
use log::{debug, trace};
use sieve_cache::SieveCache;
use std::cell::RefCell;
use std::cell::{RefCell, UnsafeCell};
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::ptr::{drop_in_place, NonNull};
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use super::wal::CheckpointStatus;
pub struct Page {
pub struct PageInner {
pub flags: AtomicUsize,
pub contents: Option<PageContent>,
pub id: usize,
}
pub struct Page {
pub inner: UnsafeCell<PageInner>,
}
// Concurrency control of pages will be handled by the pager, we won't wrap Page with RwLock
// because that is bad bad.
pub type PageRef = Arc<Page>;
/// Page is up-to-date.
const PAGE_UPTODATE: usize = 0b001;
/// Page is locked for I/O to prevent concurrent access.
@@ -35,78 +43,84 @@ const PAGE_LOADED: usize = 0b10000;
impl Page {
pub fn new(id: usize) -> Page {
Page {
flags: AtomicUsize::new(0),
contents: None,
id,
inner: UnsafeCell::new(PageInner {
flags: AtomicUsize::new(0),
contents: None,
id,
}),
}
}
pub fn get(&self) -> &mut PageInner {
unsafe { &mut *self.inner.get() }
}
pub fn is_uptodate(&self) -> bool {
self.flags.load(Ordering::SeqCst) & PAGE_UPTODATE != 0
self.get().flags.load(Ordering::SeqCst) & PAGE_UPTODATE != 0
}
pub fn set_uptodate(&self) {
self.flags.fetch_or(PAGE_UPTODATE, Ordering::SeqCst);
self.get().flags.fetch_or(PAGE_UPTODATE, Ordering::SeqCst);
}
pub fn clear_uptodate(&self) {
self.flags.fetch_and(!PAGE_UPTODATE, Ordering::SeqCst);
self.get().flags.fetch_and(!PAGE_UPTODATE, Ordering::SeqCst);
}
pub fn is_locked(&self) -> bool {
self.flags.load(Ordering::SeqCst) & PAGE_LOCKED != 0
self.get().flags.load(Ordering::SeqCst) & PAGE_LOCKED != 0
}
pub fn set_locked(&self) {
self.flags.fetch_or(PAGE_LOCKED, Ordering::SeqCst);
self.get().flags.fetch_or(PAGE_LOCKED, Ordering::SeqCst);
}
pub fn clear_locked(&self) {
self.flags.fetch_and(!PAGE_LOCKED, Ordering::SeqCst);
self.get().flags.fetch_and(!PAGE_LOCKED, Ordering::SeqCst);
}
pub fn is_error(&self) -> bool {
self.flags.load(Ordering::SeqCst) & PAGE_ERROR != 0
self.get().flags.load(Ordering::SeqCst) & PAGE_ERROR != 0
}
pub fn set_error(&self) {
self.flags.fetch_or(PAGE_ERROR, Ordering::SeqCst);
self.get().flags.fetch_or(PAGE_ERROR, Ordering::SeqCst);
}
pub fn clear_error(&self) {
self.flags.fetch_and(!PAGE_ERROR, Ordering::SeqCst);
self.get().flags.fetch_and(!PAGE_ERROR, Ordering::SeqCst);
}
pub fn is_dirty(&self) -> bool {
self.flags.load(Ordering::SeqCst) & PAGE_DIRTY != 0
self.get().flags.load(Ordering::SeqCst) & PAGE_DIRTY != 0
}
pub fn set_dirty(&self) {
self.flags.fetch_or(PAGE_DIRTY, Ordering::SeqCst);
self.get().flags.fetch_or(PAGE_DIRTY, Ordering::SeqCst);
}
pub fn clear_dirty(&self) {
self.flags.fetch_and(!PAGE_DIRTY, Ordering::SeqCst);
self.get().flags.fetch_and(!PAGE_DIRTY, Ordering::SeqCst);
}
pub fn is_loaded(&self) -> bool {
self.flags.load(Ordering::SeqCst) & PAGE_LOADED != 0
self.get().flags.load(Ordering::SeqCst) & PAGE_LOADED != 0
}
pub fn set_loaded(&self) {
self.flags.fetch_or(PAGE_LOADED, Ordering::SeqCst);
self.get().flags.fetch_or(PAGE_LOADED, Ordering::SeqCst);
}
pub fn clear_loaded(&self) {
log::debug!("clear loaded {}", self.id);
self.flags.fetch_and(!PAGE_LOADED, Ordering::SeqCst);
log::debug!("clear loaded {}", self.get().id);
self.get().flags.fetch_and(!PAGE_LOADED, Ordering::SeqCst);
}
}
#[allow(dead_code)]
struct PageCacheEntry {
key: usize,
page: Rc<RefCell<Page>>,
page: PageRef,
prev: Option<NonNull<PageCacheEntry>>,
next: Option<NonNull<PageCacheEntry>>,
}
@@ -117,12 +131,14 @@ impl PageCacheEntry {
}
}
struct DumbLruPageCache {
pub struct DumbLruPageCache {
capacity: usize,
map: RefCell<HashMap<usize, NonNull<PageCacheEntry>>>,
head: RefCell<Option<NonNull<PageCacheEntry>>>,
tail: RefCell<Option<NonNull<PageCacheEntry>>>,
}
unsafe impl Send for DumbLruPageCache {}
unsafe impl Sync for DumbLruPageCache {}
impl DumbLruPageCache {
pub fn new(capacity: usize) -> Self {
@@ -138,7 +154,7 @@ impl DumbLruPageCache {
self.map.borrow().contains_key(&key)
}
pub fn insert(&mut self, key: usize, value: Rc<RefCell<Page>>) {
pub fn insert(&mut self, key: usize, value: PageRef) {
self._delete(key, false);
debug!("cache_insert(key={})", key);
let mut entry = Box::new(PageCacheEntry {
@@ -181,7 +197,7 @@ impl DumbLruPageCache {
ptr.copied()
}
pub fn get(&mut self, key: &usize) -> Option<Rc<RefCell<Page>>> {
pub fn get(&mut self, key: &usize) -> Option<PageRef> {
debug!("cache_get(key={})", key);
let ptr = self.get_ptr(*key);
ptr?;
@@ -202,10 +218,10 @@ impl DumbLruPageCache {
if clean_page {
// evict buffer
let mut page = entry.page.borrow_mut();
let page = &entry.page;
page.clear_loaded();
debug!("cleaning up page {}", page.id);
let _ = page.contents.take();
debug!("cleaning up page {}", page.get().id);
let _ = page.get().contents.take();
}
let (next, prev) = unsafe {
@@ -254,7 +270,7 @@ impl DumbLruPageCache {
return;
}
let tail = unsafe { tail.unwrap().as_mut() };
if RefCell::borrow(&tail.page).is_dirty() {
if tail.page.is_dirty() {
// TODO: drop from another clean entry?
return;
}
@@ -325,7 +341,7 @@ pub struct Pager {
/// The write-ahead log (WAL) for the database.
wal: Rc<RefCell<dyn Wal>>,
/// A page cache for the database.
page_cache: RefCell<DumbLruPageCache>,
page_cache: Arc<RwLock<DumbLruPageCache>>,
/// Buffer pool for temporary data storage.
buffer_pool: Rc<BufferPool>,
/// I/O interface for input/output operations.
@@ -351,11 +367,11 @@ impl Pager {
page_io: Rc<dyn DatabaseStorage>,
wal: Rc<RefCell<dyn Wal>>,
io: Arc<dyn crate::io::IO>,
page_cache: Arc<RwLock<DumbLruPageCache>>,
) -> Result<Self> {
let db_header = RefCell::borrow(&db_header_ref);
let page_size = db_header.page_size as usize;
let buffer_pool = Rc::new(BufferPool::new(page_size));
let page_cache = RefCell::new(DumbLruPageCache::new(10));
Ok(Self {
page_io,
wal,
@@ -394,21 +410,21 @@ impl Pager {
}
/// Reads a page from the database.
pub fn read_page(&self, page_idx: usize) -> crate::Result<Rc<RefCell<Page>>> {
pub fn read_page(&self, page_idx: usize) -> crate::Result<PageRef> {
trace!("read_page(page_idx = {})", page_idx);
let mut page_cache = self.page_cache.borrow_mut();
let mut page_cache = self.page_cache.write().unwrap();
if let Some(page) = page_cache.get(&page_idx) {
trace!("read_page(page_idx = {}) = cached", page_idx);
return Ok(page.clone());
}
let page = Rc::new(RefCell::new(Page::new(page_idx)));
RefCell::borrow(&page).set_locked();
let page = Arc::new(Page::new(page_idx));
page.set_locked();
if let Some(frame_id) = self.wal.borrow().find_frame(page_idx as u64)? {
self.wal
.borrow()
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
{
let page = page.borrow_mut();
page.set_uptodate();
}
// TODO(pere) ensure page is inserted, we should probably first insert to page cache
@@ -428,17 +444,16 @@ impl Pager {
}
/// Loads pages if not loaded
pub fn load_page(&self, page: Rc<RefCell<Page>>) -> Result<()> {
let id = page.borrow().id;
pub fn load_page(&self, page: PageRef) -> Result<()> {
let id = page.get().id;
trace!("load_page(page_idx = {})", id);
let mut page_cache = self.page_cache.borrow_mut();
page.borrow_mut().set_locked();
let mut page_cache = self.page_cache.write().unwrap();
page.set_locked();
if let Some(frame_id) = self.wal.borrow().find_frame(id as u64)? {
self.wal
.borrow()
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
{
let page = page.borrow_mut();
page.set_uptodate();
}
// TODO(pere) ensure page is inserted
@@ -467,7 +482,8 @@ impl Pager {
/// Changes the size of the page cache.
pub fn change_page_cache_size(&self, capacity: usize) {
self.page_cache.borrow_mut().resize(capacity);
let mut page_cache = self.page_cache.write().unwrap();
page_cache.resize(capacity);
}
pub fn add_dirty(&self, page_id: usize) {
@@ -483,9 +499,9 @@ impl Pager {
FlushState::Start => {
let db_size = self.db_header.borrow().database_size;
for page_id in self.dirty_pages.borrow().iter() {
let mut cache = self.page_cache.borrow_mut();
let mut cache = self.page_cache.write().unwrap();
let page = cache.get(page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
let page_type = page.borrow().contents.as_ref().unwrap().maybe_page_type();
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
debug!("appending frame {} {:?}", page_id, page_type);
self.wal.borrow_mut().append_frame(
page.clone(),
@@ -589,7 +605,7 @@ impl Pager {
Err(err) => panic!("error while clearing cache {}", err),
}
}
self.page_cache.borrow_mut().clear();
self.page_cache.write().unwrap().clear();
}
/*
@@ -597,7 +613,7 @@ impl Pager {
Currently free list pages are not yet supported.
*/
#[allow(clippy::readonly_write_lock)]
pub fn allocate_page(&self) -> Result<Rc<RefCell<Page>>> {
pub fn allocate_page(&self) -> Result<PageRef> {
let header = &self.db_header;
let mut header = RefCell::borrow_mut(header);
header.database_size += 1;
@@ -606,38 +622,35 @@ impl Pager {
// read sync for now
loop {
let first_page_ref = self.read_page(1)?;
let first_page = RefCell::borrow_mut(&first_page_ref);
if first_page.is_locked() {
drop(first_page);
if first_page_ref.is_locked() {
self.io.run_once()?;
continue;
}
first_page.set_dirty();
first_page_ref.set_dirty();
self.add_dirty(1);
let contents = first_page.contents.as_ref().unwrap();
let contents = first_page_ref.get().contents.as_ref().unwrap();
contents.write_database_header(&header);
break;
}
}
let page_ref = allocate_page(header.database_size as usize, &self.buffer_pool, 0);
let page = allocate_page(header.database_size as usize, &self.buffer_pool, 0);
{
// setup page and add to cache
let page = page_ref.borrow_mut();
page.set_dirty();
self.add_dirty(page.id);
let mut cache = self.page_cache.borrow_mut();
cache.insert(page.id, page_ref.clone());
self.add_dirty(page.get().id);
let mut cache = self.page_cache.write().unwrap();
cache.insert(page.get().id, page.clone());
}
Ok(page_ref)
Ok(page)
}
pub fn put_loaded_page(&self, id: usize, page: Rc<RefCell<Page>>) {
let mut cache = RefCell::borrow_mut(&self.page_cache);
pub fn put_loaded_page(&self, id: usize, page: PageRef) {
let mut cache = self.page_cache.write().unwrap();
// cache insert invalidates previous page
cache.insert(id, page.clone());
page.borrow_mut().set_loaded();
page.set_loaded();
}
pub fn usable_size(&self) -> usize {
@@ -646,14 +659,9 @@ impl Pager {
}
}
pub fn allocate_page(
page_id: usize,
buffer_pool: &Rc<BufferPool>,
offset: usize,
) -> Rc<RefCell<Page>> {
let page_ref = Rc::new(RefCell::new(Page::new(page_id)));
pub fn allocate_page(page_id: usize, buffer_pool: &Rc<BufferPool>, offset: usize) -> PageRef {
let page = Arc::new(Page::new(page_id));
{
let mut page = RefCell::borrow_mut(&page_ref);
let buffer = buffer_pool.get();
let bp = buffer_pool.clone();
let drop_fn = Rc::new(move |buf| {
@@ -661,11 +669,36 @@ pub fn allocate_page(
});
let buffer = Rc::new(RefCell::new(Buffer::new(buffer, drop_fn)));
page.set_loaded();
page.contents = Some(PageContent {
page.get().contents = Some(PageContent {
offset,
buffer,
overflow_cells: Vec::new(),
});
}
page_ref
page
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, RwLock};
use super::{DumbLruPageCache, Page};
#[test]
fn test_shared_cache() {
// ensure cache can be shared between threads
let cache = Arc::new(RwLock::new(DumbLruPageCache::new(10)));
let thread = {
let cache = cache.clone();
std::thread::spawn(move || {
let mut cache = cache.write().unwrap();
cache.insert(1, Arc::new(Page::new(1)));
})
};
let _ = thread.join();
let mut cache = cache.write().unwrap();
let page = cache.get(&1);
assert_eq!(page.unwrap().get().id, 1);
}
}
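
Because get() bypasses borrow checking entirely, the page state bits live in an AtomicUsize so they can be probed and flipped without touching the UnsafeCell: each state is one bit, set with fetch_or and cleared with fetch_and of the complement. A self-contained sketch of the same pattern:

use std::sync::atomic::{AtomicUsize, Ordering};

const PAGE_DIRTY: usize = 0b1000; // one state per bit, as in the constants above

struct Flags(AtomicUsize);

impl Flags {
    fn set_dirty(&self) {
        self.0.fetch_or(PAGE_DIRTY, Ordering::SeqCst); // set the bit
    }
    fn clear_dirty(&self) {
        self.0.fetch_and(!PAGE_DIRTY, Ordering::SeqCst); // clear the bit
    }
    fn is_dirty(&self) -> bool {
        self.0.load(Ordering::SeqCst) & PAGE_DIRTY != 0 // test the bit
    }
}

fn main() {
    let f = Flags(AtomicUsize::new(0));
    f.set_dirty();
    assert!(f.is_dirty());
    f.clear_dirty();
    assert!(!f.is_dirty());
}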

View File

@@ -45,13 +45,16 @@ use crate::error::LimboError;
use crate::io::{Buffer, Completion, ReadCompletion, SyncCompletion, WriteCompletion};
use crate::storage::buffer_pool::BufferPool;
use crate::storage::database::DatabaseStorage;
use crate::storage::pager::{Page, Pager};
use crate::storage::pager::Pager;
use crate::types::{OwnedRecord, OwnedValue};
use crate::{File, Result};
use log::trace;
use std::cell::RefCell;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::{Arc, RwLock};
use super::pager::PageRef;
/// The size of the database header in bytes.
pub const DATABASE_HEADER_SIZE: usize = 100;
@@ -95,7 +98,7 @@ pub const WAL_FRAME_HEADER_SIZE: usize = 24;
pub const WAL_MAGIC_LE: u32 = 0x377f0682;
pub const WAL_MAGIC_BE: u32 = 0x377f0683;
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
#[repr(C)] // This helps with encoding because Rust does not guarantee field order in structs, so in
// this case we want to keep the order
pub struct WalHeader {
@@ -531,7 +534,7 @@ impl PageContent {
pub fn begin_read_page(
page_io: Rc<dyn DatabaseStorage>,
buffer_pool: Rc<BufferPool>,
page: Rc<RefCell<Page>>,
page: PageRef,
page_idx: usize,
) -> Result<()> {
trace!("begin_read_btree_page(page_idx = {})", page_idx);
@@ -544,7 +547,7 @@ pub fn begin_read_page(
let complete = Box::new(move |buf: Rc<RefCell<Buffer>>| {
let page = page.clone();
if finish_read_page(page_idx, buf, page.clone()).is_err() {
page.borrow_mut().set_error();
page.set_error();
}
});
let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete)));
@@ -552,11 +555,7 @@ pub fn begin_read_page(
Ok(())
}
fn finish_read_page(
page_idx: usize,
buffer_ref: Rc<RefCell<Buffer>>,
page: Rc<RefCell<Page>>,
) -> Result<()> {
fn finish_read_page(page_idx: usize, buffer_ref: Rc<RefCell<Buffer>>, page: PageRef) -> Result<()> {
trace!("finish_read_btree_page(page_idx = {})", page_idx);
let pos = if page_idx == 1 {
DATABASE_HEADER_SIZE
@@ -569,8 +568,7 @@ fn finish_read_page(
overflow_cells: Vec::new(),
};
{
let mut page = page.borrow_mut();
page.contents.replace(inner);
page.get().contents.replace(inner);
page.set_uptodate();
page.clear_locked();
page.set_loaded();
@@ -580,16 +578,16 @@ fn finish_read_page(
pub fn begin_write_btree_page(
pager: &Pager,
page: &Rc<RefCell<Page>>,
page: &PageRef,
write_counter: Rc<RefCell<usize>>,
) -> Result<()> {
let page_source = &pager.page_io;
let page_finish = page.clone();
let page_id = page.borrow().id;
let page_id = page.get().id;
log::trace!("begin_write_btree_page(page_id={})", page_id);
let buffer = {
let page = page.borrow();
let page = page.get();
let contents = page.contents.as_ref().unwrap();
contents.buffer.clone()
};
@@ -603,7 +601,7 @@ pub fn begin_write_btree_page(
let buf_len = buf_copy.borrow().len();
*write_counter.borrow_mut() -= 1;
page_finish.borrow_mut().clear_dirty();
page_finish.clear_dirty();
if bytes_written < buf_len as i32 {
log::error!("wrote({bytes_written}) less than expected({buf_len})");
}
@@ -770,7 +768,7 @@ fn read_payload(unread: &[u8], payload_size: usize, pager: Rc<Pager>) -> (Vec<u8
break;
}
}
let mut page = page.borrow_mut();
let page = page.get();
let contents = page.contents.as_mut().unwrap();
let to_read = left_to_read.min(usable_size - 4);
@@ -1006,10 +1004,10 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec<u8>) {
payload.extend_from_slice(&varint);
}
pub fn begin_read_wal_header(io: &Rc<dyn File>) -> Result<Rc<RefCell<WalHeader>>> {
pub fn begin_read_wal_header(io: &Rc<dyn File>) -> Result<Arc<RwLock<WalHeader>>> {
let drop_fn = Rc::new(|_buf| {});
let buf = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn)));
let result = Rc::new(RefCell::new(WalHeader::default()));
let result = Arc::new(RwLock::new(WalHeader::default()));
let header = result.clone();
let complete = Box::new(move |buf: Rc<RefCell<Buffer>>| {
let header = header.clone();
@@ -1020,10 +1018,10 @@ pub fn begin_read_wal_header(io: &Rc<dyn File>) -> Result<Rc<RefCell<WalHeader>>
Ok(result)
}
fn finish_read_wal_header(buf: Rc<RefCell<Buffer>>, header: Rc<RefCell<WalHeader>>) -> Result<()> {
fn finish_read_wal_header(buf: Rc<RefCell<Buffer>>, header: Arc<RwLock<WalHeader>>) -> Result<()> {
let buf = buf.borrow();
let buf = buf.as_slice();
let mut header = header.borrow_mut();
let mut header = header.write().unwrap();
header.magic = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
header.file_format = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
header.page_size = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]);
@@ -1039,7 +1037,7 @@ pub fn begin_read_wal_frame(
io: &Rc<dyn File>,
offset: usize,
buffer_pool: Rc<BufferPool>,
page: Rc<RefCell<Page>>,
page: PageRef,
) -> Result<()> {
let buf = buffer_pool.get();
let drop_fn = Rc::new(move |buf| {
@@ -1060,14 +1058,14 @@ pub fn begin_read_wal_frame(
pub fn begin_write_wal_frame(
io: &Rc<dyn File>,
offset: usize,
page: &Rc<RefCell<Page>>,
page: &PageRef,
db_size: u32,
write_counter: Rc<RefCell<usize>>,
wal_header: &WalHeader,
checksums: (u32, u32),
) -> Result<(u32, u32)> {
let page_finish = page.clone();
let page_id = page.borrow().id;
let page_id = page.get().id;
trace!("begin_write_wal_frame(offset={}, page={})", offset, page_id);
let mut header = WalFrameHeader {
@@ -1079,7 +1077,7 @@ pub fn begin_write_wal_frame(
checksum_2: 0,
};
let (buffer, checksums) = {
let page = page.borrow();
let page = page.get();
let contents = page.contents.as_ref().unwrap();
let drop_fn = Rc::new(|_buf| {});
@@ -1122,7 +1120,7 @@ pub fn begin_write_wal_frame(
let buf_len = buf_copy.borrow().len();
*write_counter.borrow_mut() -= 1;
page_finish.borrow_mut().clear_dirty();
page_finish.clear_dirty();
if bytes_written < buf_len as i32 {
log::error!("wrote({bytes_written}) less than expected({buf_len})");
}
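
The read path stays asynchronous: the completion closure owns a PageRef clone, and finish_read_page installs the contents and flips the state bits once the buffer lands. In outline (on_read_complete is a hypothetical name for illustration; the flag calls are the ones defined in pager.rs above):

// Sketch of what effectively runs when a page read completion fires.
fn on_read_complete(page: PageRef, contents: PageContent) {
    page.get().contents.replace(contents); // install the freshly read buffer
    page.set_uptodate();                   // contents are now valid
    page.clear_locked();                   // I/O no longer in flight
    page.set_loaded();                     // buffer is resident in memory
}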

View File

@@ -1,4 +1,5 @@
use std::collections::{HashMap, HashSet};
use std::sync::RwLock;
use std::{cell::RefCell, rc::Rc, sync::Arc};
use log::{debug, trace};
@@ -8,12 +9,12 @@ use crate::storage::sqlite3_ondisk::{
begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
};
use crate::Completion;
use crate::{storage::pager::Page, Result};
use crate::Result;
use self::sqlite3_ondisk::{checksum_wal, WAL_MAGIC_BE, WAL_MAGIC_LE};
use super::buffer_pool::BufferPool;
use super::pager::Pager;
use super::pager::{PageRef, Pager};
use super::sqlite3_ondisk::{self, begin_write_btree_page, WalHeader};
/// Write-ahead log (WAL).
@@ -34,17 +35,12 @@ pub trait Wal {
fn find_frame(&self, page_id: u64) -> Result<Option<u64>>;
/// Read a frame from the WAL.
fn read_frame(
&self,
frame_id: u64,
page: Rc<RefCell<Page>>,
buffer_pool: Rc<BufferPool>,
) -> Result<()>;
fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Rc<BufferPool>) -> Result<()>;
/// Write a frame to the WAL.
fn append_frame(
&mut self,
page: Rc<RefCell<Page>>,
page: PageRef,
db_size: u32,
pager: &Pager,
write_counter: Rc<RefCell<usize>>,
@@ -61,22 +57,24 @@ pub trait Wal {
pub struct WalFile {
io: Arc<dyn crate::io::IO>,
wal_path: String,
file: RefCell<Option<Rc<dyn File>>>,
wal_header: RefCell<Option<Rc<RefCell<sqlite3_ondisk::WalHeader>>>>,
min_frame: RefCell<u64>,
max_frame: RefCell<u64>,
nbackfills: RefCell<u64>,
// Maps pgno to frame id and offset in wal file
frame_cache: RefCell<HashMap<u64, Vec<u64>>>, // FIXME: for now let's use a simple hashmap instead of a shm file
checkpoint_threshold: usize,
ongoing_checkpoint: HashSet<usize>,
syncing: Rc<RefCell<bool>>,
page_size: usize,
last_checksum: RefCell<(u32, u32)>, // Checksum of the last frame in the WAL; this is a cumulative checksum
// over all frames in the WAL
ongoing_checkpoint: HashSet<usize>,
shared: Arc<RwLock<WalFileShared>>,
checkpoint_threshold: usize,
}
pub struct WalFileShared {
wal_header: Arc<RwLock<sqlite3_ondisk::WalHeader>>,
min_frame: u64,
max_frame: u64,
nbackfills: u64,
// Maps pgno to frame id and offset in wal file
frame_cache: HashMap<u64, Vec<u64>>, // FIXME: for now let's use a simple hashmap instead of a shm file
last_checksum: (u32, u32), // Checksum of the last frame in the WAL; a cumulative checksum over all frames in the WAL
file: Rc<dyn File>,
}
pub enum CheckpointStatus {
@@ -87,7 +85,8 @@ pub enum CheckpointStatus {
impl Wal for WalFile {
/// Begin a read transaction.
fn begin_read_tx(&self) -> Result<()> {
self.min_frame.replace(*self.nbackfills.borrow() + 1);
let mut shared = self.shared.write().unwrap();
shared.min_frame = shared.nbackfills + 1;
Ok(())
}
@@ -98,15 +97,14 @@ impl Wal for WalFile {
/// Find the latest frame containing a page.
fn find_frame(&self, page_id: u64) -> Result<Option<u64>> {
let frame_cache = self.frame_cache.borrow();
let frames = frame_cache.get(&page_id);
let shared = self.shared.read().unwrap();
let frames = shared.frame_cache.get(&page_id);
if frames.is_none() {
return Ok(None);
}
self.ensure_init()?;
let frames = frames.unwrap();
for frame in frames.iter().rev() {
if *frame <= *self.max_frame.borrow() {
if *frame <= shared.max_frame {
return Ok(Some(*frame));
}
}
@@ -114,16 +112,12 @@ impl Wal for WalFile {
}
/// Read a frame from the WAL.
fn read_frame(
&self,
frame_id: u64,
page: Rc<RefCell<Page>>,
buffer_pool: Rc<BufferPool>,
) -> Result<()> {
fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Rc<BufferPool>) -> Result<()> {
debug!("read_frame({})", frame_id);
let offset = self.frame_offset(frame_id);
let shared = self.shared.read().unwrap();
begin_read_wal_frame(
self.file.borrow().as_ref().unwrap(),
&shared.file,
offset + WAL_FRAME_HEADER_SIZE,
buffer_pool,
page,
@@ -134,14 +128,14 @@ impl Wal for WalFile {
/// Write a frame to the WAL.
fn append_frame(
&mut self,
page: Rc<RefCell<Page>>,
page: PageRef,
db_size: u32,
_pager: &Pager,
write_counter: Rc<RefCell<usize>>,
) -> Result<()> {
self.ensure_init()?;
let page_id = page.borrow().id;
let frame_id = *self.max_frame.borrow();
let page_id = page.get().id;
let mut shared = self.shared.write().unwrap();
let frame_id = shared.max_frame;
let offset = self.frame_offset(frame_id);
trace!(
"append_frame(frame={}, offset={}, page_id={})",
@@ -149,12 +143,11 @@ impl Wal for WalFile {
offset,
page_id
);
let header = self.wal_header.borrow();
let header = header.as_ref().unwrap();
let header = header.borrow();
let checksums = *self.last_checksum.borrow();
let header = shared.wal_header.clone();
let header = header.read().unwrap();
let checksums = shared.last_checksum;
let checksums = begin_write_wal_frame(
self.file.borrow().as_ref().unwrap(),
&shared.file,
offset,
&page,
db_size,
@@ -162,15 +155,14 @@ impl Wal for WalFile {
&header,
checksums,
)?;
self.last_checksum.replace(checksums);
self.max_frame.replace(frame_id + 1);
shared.last_checksum = checksums;
shared.max_frame = frame_id + 1;
{
let mut frame_cache = self.frame_cache.borrow_mut();
let frames = frame_cache.get_mut(&(page_id as u64));
let frames = shared.frame_cache.get_mut(&(page_id as u64));
match frames {
Some(frames) => frames.push(frame_id),
None => {
frame_cache.insert(page_id as u64, vec![frame_id]);
shared.frame_cache.insert(page_id as u64, vec![frame_id]);
}
}
}
@@ -188,7 +180,8 @@ impl Wal for WalFile {
}
fn should_checkpoint(&self) -> bool {
let frame_id = *self.max_frame.borrow() as usize;
let shared = self.shared.read().unwrap();
let frame_id = shared.max_frame as usize;
frame_id >= self.checkpoint_threshold
}
@@ -197,7 +190,8 @@ impl Wal for WalFile {
pager: &Pager,
write_counter: Rc<RefCell<usize>>,
) -> Result<CheckpointStatus> {
for (page_id, _frames) in self.frame_cache.borrow().iter() {
let mut shared = self.shared.write().unwrap();
for (page_id, _frames) in shared.frame_cache.iter() {
// move page from WAL to database file
// TODO(Pere): use splice syscall in linux to do zero-copy file page movements to improve perf
let page_id = *page_id as usize;
@@ -206,7 +200,7 @@ impl Wal for WalFile {
}
let page = pager.read_page(page_id)?;
if page.borrow().is_locked() {
if page.is_locked() {
return Ok(CheckpointStatus::IO);
}
@@ -214,16 +208,15 @@ impl Wal for WalFile {
self.ongoing_checkpoint.insert(page_id);
}
self.frame_cache.borrow_mut().clear();
*self.max_frame.borrow_mut() = 0;
// TODO: only clear checkpointed frames
shared.frame_cache.clear();
shared.max_frame = 0;
self.ongoing_checkpoint.clear();
Ok(CheckpointStatus::Done)
}
fn sync(&mut self) -> Result<CheckpointStatus> {
self.ensure_init()?;
let file = self.file.borrow();
let file = file.as_ref().unwrap();
let shared = self.shared.write().unwrap();
{
let syncing = self.syncing.clone();
let completion = Completion::Sync(SyncCompletion {
@@ -231,7 +224,7 @@ impl Wal for WalFile {
*syncing.borrow_mut() = false;
}),
});
file.sync(Rc::new(completion))?;
shared.file.sync(Rc::new(completion))?;
}
if *self.syncing.borrow() {
@@ -243,89 +236,86 @@ impl Wal for WalFile {
}
impl WalFile {
pub fn new(io: Arc<dyn IO>, wal_path: String, page_size: usize) -> Self {
pub fn new(io: Arc<dyn IO>, page_size: usize, shared: Arc<RwLock<WalFileShared>>) -> Self {
Self {
io,
wal_path,
file: RefCell::new(None),
wal_header: RefCell::new(None),
frame_cache: RefCell::new(HashMap::new()),
min_frame: RefCell::new(0),
max_frame: RefCell::new(0),
nbackfills: RefCell::new(0),
checkpoint_threshold: 1000,
shared,
ongoing_checkpoint: HashSet::new(),
syncing: Rc::new(RefCell::new(false)),
checkpoint_threshold: 1000,
page_size,
last_checksum: RefCell::new((0, 0)),
}
}
fn ensure_init(&self) -> Result<()> {
if self.file.borrow().is_none() {
match self
.io
.open_file(&self.wal_path, crate::io::OpenFlags::Create, false)
{
Ok(file) => {
if file.size()? > 0 {
let wal_header = match sqlite3_ondisk::begin_read_wal_header(&file) {
Ok(header) => header,
Err(err) => panic!("Couldn't read header page: {:?}", err),
};
// TODO: Return a completion instead.
self.io.run_once()?;
self.wal_header.replace(Some(wal_header));
} else {
// magic is a single number represented as WAL_MAGIC_LE but the big endian
// counterpart is just the same number with LSB set to 1.
let magic = if cfg!(target_endian = "big") {
WAL_MAGIC_BE
} else {
WAL_MAGIC_LE
};
let mut wal_header = WalHeader {
magic,
file_format: 3007000,
page_size: self.page_size as u32,
checkpoint_seq: 0, // TODO implement sequence number
salt_1: 0, // TODO implement salt
salt_2: 0,
checksum_1: 0,
checksum_2: 0,
};
let native = cfg!(target_endian = "big"); // if target_endian is
// already big then we don't care, but if it isn't, the header hasn't yet been
// encoded to big endian, therefore we want to swap bytes to compute this
// checksum.
let checksums = *self.last_checksum.borrow_mut();
let checksums = checksum_wal(
&wal_header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4], // first 24 bytes
&wal_header,
checksums,
native, // this is false because we haven't encoded the wal header yet
);
wal_header.checksum_1 = checksums.0;
wal_header.checksum_2 = checksums.1;
self.last_checksum.replace(checksums);
sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?;
self.wal_header
.replace(Some(Rc::new(RefCell::new(wal_header))));
}
*self.file.borrow_mut() = Some(file);
}
Err(err) => panic!("{:?} {}", err, &self.wal_path),
};
}
Ok(())
}
fn frame_offset(&self, frame_id: u64) -> usize {
let header = self.wal_header.borrow();
let header = header.as_ref().unwrap().borrow();
let page_size = header.page_size;
let page_size = self.page_size;
let page_offset = frame_id * (page_size as u64 + WAL_FRAME_HEADER_SIZE as u64);
let offset = WAL_HEADER_SIZE as u64 + page_offset;
offset as usize
}
}
impl WalFileShared {
pub fn open_shared(
io: &Arc<dyn IO>,
path: &str,
page_size: u16,
) -> Result<Arc<RwLock<WalFileShared>>> {
let file = io.open_file(path, crate::io::OpenFlags::Create, false)?;
let header = if file.size()? > 0 {
let wal_header = match sqlite3_ondisk::begin_read_wal_header(&file) {
Ok(header) => header,
Err(err) => panic!("Couldn't read header page: {:?}", err),
};
log::info!("recover not implemented yet");
// TODO: Return a completion instead.
io.run_once()?;
wal_header
} else {
let magic = if cfg!(target_endian = "big") {
WAL_MAGIC_BE
} else {
WAL_MAGIC_LE
};
let mut wal_header = WalHeader {
magic,
file_format: 3007000,
page_size: page_size as u32,
checkpoint_seq: 0, // TODO implement sequence number
salt_1: 0, // TODO implement salt
salt_2: 0,
checksum_1: 0,
checksum_2: 0,
};
let native = cfg!(target_endian = "big"); // if target_endian is
// already big then we don't care, but if it isn't, the header hasn't yet been
// encoded to big endian, therefore we want to swap bytes to compute this
// checksum.
let checksums = (0, 0);
let checksums = checksum_wal(
&wal_header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4], // first 24 bytes
&wal_header,
checksums,
native, // this is false because we haven't encoded the wal header yet
);
wal_header.checksum_1 = checksums.0;
wal_header.checksum_2 = checksums.1;
sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?;
Arc::new(RwLock::new(wal_header))
};
let checksum = {
let checksum = header.read().unwrap();
(checksum.checksum_1, checksum.checksum_2)
};
let shared = WalFileShared {
wal_header: header,
min_frame: 0,
max_frame: 0,
nbackfills: 0,
frame_cache: HashMap::new(),
last_checksum: checksum,
file,
};
Ok(Arc::new(RwLock::new(shared)))
}
}
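
frame_offset now reads page_size from the connection-local field instead of re-borrowing the header, but the layout math is unchanged: a fixed WAL header followed by frames of WAL_FRAME_HEADER_SIZE + page_size bytes each. A quick self-contained check (assuming SQLite's 32-byte WAL header for WAL_HEADER_SIZE, whose value is not shown in this diff):

const WAL_HEADER_SIZE: usize = 32; // assumption: SQLite's WAL header size
const WAL_FRAME_HEADER_SIZE: usize = 24; // from sqlite3_ondisk above

fn frame_offset(frame_id: u64, page_size: usize) -> usize {
    let page_offset = frame_id * (page_size as u64 + WAL_FRAME_HEADER_SIZE as u64);
    (WAL_HEADER_SIZE as u64 + page_offset) as usize
}

fn main() {
    // With 4096-byte pages, frame 0 starts right after the header and each
    // later frame is another 24 + 4096 = 4120 bytes in.
    assert_eq!(frame_offset(0, 4096), 32);
    assert_eq!(frame_offset(1, 4096), 32 + 4120);
}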

View File

@@ -13,7 +13,7 @@ struct SimulatorEnv {
tables: Vec<Table>,
connections: Vec<SimConnection>,
io: Arc<SimulatorIO>,
db: Rc<Database>,
db: Arc<Database>,
rng: ChaCha8Rng,
}

View File

@@ -33,7 +33,7 @@ pub mod util;
use util::sqlite3_safety_check_sick_or_ok;
pub struct sqlite3 {
pub(crate) _db: Rc<limbo_core::Database>,
pub(crate) _db: Arc<limbo_core::Database>,
pub(crate) conn: Rc<limbo_core::Connection>,
pub(crate) err_code: ffi::c_int,
pub(crate) err_mask: ffi::c_int,
@@ -43,7 +43,7 @@ pub struct sqlite3 {
}
impl sqlite3 {
pub fn new(db: Rc<limbo_core::Database>, conn: Rc<limbo_core::Connection>) -> Self {
pub fn new(db: Arc<limbo_core::Database>, conn: Rc<limbo_core::Connection>) -> Self {
Self {
_db: db,
conn,