core: pager allocate page

Pere Diaz Bou
2024-07-27 17:00:43 +02:00
parent 35c3fe7448
commit dac2868c66
3 changed files with 202 additions and 58 deletions


@@ -1,4 +1,4 @@
use crate::pager::Pager;
use crate::pager::{Page, Pager};
use crate::sqlite3_ondisk::{
read_varint, write_varint, BTreeCell, DatabaseHeader, PageContent, PageType, TableInteriorCell,
TableLeafCell,
@@ -6,6 +6,8 @@ use crate::sqlite3_ondisk::{
use crate::types::{Cursor, CursorResult, OwnedRecord, OwnedValue};
use crate::Result;
use std::any::Any;
use std::borrow::BorrowMut;
use std::cell::{Ref, RefCell};
use std::rc::Rc;
@@ -70,7 +72,7 @@ impl BTreeCursor {
};
let page_idx = mem_page.page_idx;
let page = self.pager.read_page(page_idx)?;
let page = page.borrow();
let page = RefCell::borrow(&page);
if page.is_locked() {
return Ok(CursorResult::IO);
}
@@ -142,7 +144,7 @@ impl BTreeCursor {
};
let page_idx = mem_page.page_idx;
let page = self.pager.read_page(page_idx)?;
let page = page.borrow();
let page = RefCell::borrow(&page);
if page.is_locked() {
return Ok(CursorResult::IO);
}
@@ -207,14 +209,8 @@ impl BTreeCursor {
key: &OwnedValue,
_record: &OwnedRecord,
) -> Result<CursorResult<()>> {
let mem_page = {
let mem_page = self.page.borrow();
let mem_page = mem_page.as_ref().unwrap();
mem_page.clone()
};
let page_idx = mem_page.page_idx;
let page_ref = self.pager.read_page(page_idx)?;
let page = page_ref.borrow();
let page_ref = self.get_page()?;
let page = RefCell::borrow(&page_ref);
if page.is_locked() {
return Ok(CursorResult::IO);
}
@@ -226,8 +222,6 @@ impl BTreeCursor {
let page = page.as_mut().unwrap();
assert!(matches!(page.page_type(), PageType::TableLeaf));
let free = self.compute_free_space(page, self.database_header.borrow());
// find cell
let int_key = match key {
OwnedValue::Integer(i) => *i as u64,
@@ -268,9 +262,18 @@ impl BTreeCursor {
payload.splice(0..0, data_len_varint.iter().cloned());
}
let usable_space = {
let db_header = RefCell::borrow(&self.database_header);
(db_header.page_size - db_header.unused_space as u16) as usize
};
let free = self.compute_free_space(page, RefCell::borrow(&self.database_header));
assert!(
payload.len() <= usable_space - 100, /* keep a 100-byte margin as a precaution */
"need to implement overflow pages; payload too big to fit even in an empty page"
);
if payload.len() + 2 > free as usize {
// overflow or balance
todo!("overflow/balance");
self.balance_leaf(int_key, &payload);
} else {
// insert
let pc = self.allocate_cell_space(page, payload.len() as u16);
@@ -305,6 +308,76 @@ impl BTreeCursor {
Ok(CursorResult::Ok(()))
}
fn get_page(&mut self) -> crate::Result<Rc<RefCell<Page>>> {
let mem_page = {
let mem_page = self.page.borrow();
let mem_page = mem_page.as_ref().unwrap();
mem_page.clone()
};
let page_idx = mem_page.page_idx;
let page_ref = self.pager.read_page(page_idx)?;
Ok(page_ref)
}
fn balance_leaf(&mut self, key: u64, payload: &Vec<u8>) {
// This is a naive algorithm that doesn't try to distribute cells evenly by content.
// It will try to split the page in half by keys, not by content.
// SQLite tries to keep each page at least 40% full.
loop {
let mem_page = {
let mem_page = self.page.borrow();
let mem_page = mem_page.as_ref().unwrap();
mem_page.clone()
};
let page_ref = self.read_page_sync(mem_page.page_idx);
let page = RefCell::borrow_mut(&page_ref);
let mut page = page.contents.write().unwrap();
let page = page.as_mut().unwrap();
let free = self.compute_free_space(page, RefCell::borrow(&self.database_header));
if payload.len() + 2 <= free as usize {
break;
}
let right_page_ref = self.allocate_page(page.page_type());
let right_page = RefCell::borrow_mut(&right_page_ref);
let mut right_page = right_page.contents.write().unwrap();
let right_page = right_page.as_mut().unwrap();
// TODO: move roughly half of the cells into `right_page` and update the parent;
// the split itself is not implemented yet.
}
}
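The "split in half by keys" idea described in the comment above can be illustrated with a small sketch (not code from this commit; split_by_keys and the generic cell type are hypothetical stand-ins):
// Editor's sketch, not part of this commit: divide cells into two halves by
// count, ignoring how many bytes each cell occupies.
fn split_by_keys<T>(cells: Vec<T>) -> (Vec<T>, Vec<T>) {
    let mid = cells.len() / 2;
    let mut left = cells;
    let right = left.split_off(mid); // left keeps [0, mid), right gets [mid, len)
    (left, right)
}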
fn read_page_sync(&mut self, page_idx: usize) -> Rc<RefCell<Page>> {
loop {
let page_ref = self.pager.read_page(page_idx);
match page_ref {
Ok(p) => return p,
Err(_) => {}
}
}
}
fn allocate_page(&mut self, page_type: PageType) -> Rc<RefCell<Page>> {
let page = self.pager.allocate_page().unwrap();
{
// setup btree page
let contents = RefCell::borrow(&page);
let mut contents = contents.contents.write().unwrap();
let contents = contents.as_mut().unwrap();
let id = page_type as u8;
contents.write_u8(0, id); // page type
contents.write_u16(1, 0); // first freeblock offset (0 = none)
contents.write_u16(3, 0); // number of cells
contents.write_u16(5, 0); // start of cell content area
contents.write_u8(7, 0); // number of fragmented free bytes
contents.write_u32(8, 0); // right-most pointer (interior pages only)
}
page
}
/*
Allocate space for a cell on a page.
*/
fn allocate_cell_space(&mut self, page_ref: &PageContent, amount: u16) -> u16 {
let amount = amount as usize;
let mut buf_ref = RefCell::borrow_mut(&page_ref.buffer);
@@ -317,19 +390,19 @@ impl BTreeCursor {
// there are free blocks and enough space
if page_ref.first_freeblock() != 0 && gap + 2 <= top {
// find slot
let db_header = self.database_header.borrow();
let db_header = RefCell::borrow(&self.database_header);
let pc = find_free_cell(page_ref, db_header, amount, buf);
return pc as u16;
}
if gap + 2 + amount as usize > top {
// defragment
self.defragment_page(page_ref, self.database_header.borrow());
self.defragment_page(page_ref, RefCell::borrow(&self.database_header));
top = u16::from_be_bytes([buf[5], buf[6]]) as usize;
return 0;
}
let db_header = self.database_header.borrow();
let db_header = RefCell::borrow(&self.database_header);
top -= amount;
buf[5..7].copy_from_slice(&(top as u16).to_be_bytes());
let usable_space = (db_header.page_size - db_header.unused_space as u16) as usize;
@@ -347,7 +420,7 @@ impl BTreeCursor {
let last_cell = (usable_space - 4) as u64;
let first_cell = cloned_page.cell_content_area() as u64;
if cloned_page.cell_count() > 0 {
let buf = cloned_page.buffer.borrow();
let buf = RefCell::borrow(&cloned_page.buffer);
let buf = buf.as_slice();
let mut write_buf = RefCell::borrow_mut(&page.buffer);
let write_buf = write_buf.as_mut_slice();
@@ -403,7 +476,7 @@ impl BTreeCursor {
// The freeblock count can be zero, in which case the "real free space" available for allocation
// is the gap between the end of the cell pointer area and the first cell's content.
fn compute_free_space(&self, page: &PageContent, db_header: Ref<DatabaseHeader>) -> u16 {
let buffer = page.buffer.borrow();
let buffer = RefCell::borrow(&page.buffer);
let buf = buffer.as_slice();
let usable_space = (db_header.page_size - db_header.unused_space as u16) as usize;
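As a rough worked example of the quantity this function tracks (my own numbers, assuming SQLite's standard page layout; not code from this commit), an empty non-first table-leaf page has:
// Editor's sketch, not part of this commit.
fn free_space_empty_leaf(page_size: usize, unused_space: usize) -> usize {
    let usable_space = page_size - unused_space;
    let header_size = 8; // a table-leaf page header is 8 bytes
    let cell_pointer_end = header_size; // zero cells, so the pointer array is empty
    usable_space - cell_pointer_end // e.g. 4096 - 0 - 8 = 4088 bytes free
}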
@@ -568,14 +641,8 @@ impl Cursor for BTreeCursor {
CursorResult::Ok(_) => {}
CursorResult::IO => return Ok(CursorResult::IO),
};
let mem_page = {
let mem_page = self.page.borrow();
let mem_page = mem_page.as_ref().unwrap();
mem_page.clone()
};
let page_idx = mem_page.page_idx;
let page_ref = self.pager.read_page(page_idx)?;
let page = page_ref.borrow();
let page_ref = self.get_page()?;
let page = RefCell::borrow(&page_ref);
if page.is_locked() {
return Ok(CursorResult::IO);
}


@@ -1,9 +1,10 @@
use crate::buffer_pool::BufferPool;
use crate::sqlite3_ondisk::PageContent;
use crate::sqlite3_ondisk::{self, DatabaseHeader};
use crate::{PageSource, Result};
use crate::{Buffer, PageSource, Result};
use log::trace;
use sieve_cache::SieveCache;
use std::borrow::Borrow;
use std::cell::RefCell;
use std::collections::HashMap;
use std::hash::Hash;
@@ -227,7 +228,7 @@ impl DumbLruPageCache {
return;
}
let tail = unsafe { tail.unwrap().as_mut() };
if tail.page.borrow().is_dirty() {
if RefCell::borrow(&tail.page).is_dirty() {
// TODO: drop from another clean entry?
return;
}
@@ -269,6 +270,7 @@ pub struct Pager {
/// I/O interface for input/output operations.
pub io: Arc<dyn crate::io::IO>,
dirty_pages: Rc<RefCell<Vec<Rc<RefCell<Page>>>>>,
db_header: Rc<RefCell<DatabaseHeader>>,
}
impl Pager {
@@ -279,11 +281,11 @@ impl Pager {
/// Completes opening a database by initializing the Pager with the database header.
pub fn finish_open(
db_header: Rc<RefCell<DatabaseHeader>>,
db_header_ref: Rc<RefCell<DatabaseHeader>>,
page_source: PageSource,
io: Arc<dyn crate::io::IO>,
) -> Result<Self> {
let db_header = db_header.borrow();
let db_header = RefCell::borrow(&db_header_ref);
let page_size = db_header.page_size as usize;
let buffer_pool = Rc::new(BufferPool::new(page_size));
let page_cache = RefCell::new(DumbLruPageCache::new(10));
@@ -293,6 +295,7 @@ impl Pager {
page_cache,
io,
dirty_pages: Rc::new(RefCell::new(Vec::new())),
db_header: db_header_ref.clone(),
})
}
@@ -304,7 +307,7 @@ impl Pager {
return Ok(page.clone());
}
let page = Rc::new(RefCell::new(Page::new(page_idx)));
page.borrow().set_locked();
RefCell::borrow(&page).set_locked();
sqlite3_ondisk::begin_read_page(
&self.page_source,
self.buffer_pool.clone(),
@@ -346,4 +349,44 @@ impl Pager {
self.io.run_once()?;
Ok(())
}
/*
Gets a new page, either by growing the database or by reusing a page from the free list.
Free-list pages are not yet supported, so this currently always appends a new page.
*/
pub fn allocate_page(&self) -> Result<Rc<RefCell<Page>>> {
let header = &self.db_header;
let mut header = RefCell::borrow_mut(&header);
header.database_size += 1;
{
// update database size
let first_page_ref = self.read_page(1).unwrap();
let first_page = RefCell::borrow_mut(&first_page_ref);
first_page.set_dirty();
self.add_dirty(first_page_ref.clone());
let contents = first_page.contents.write().unwrap();
let contents = contents.as_ref().unwrap();
contents.write_database_header(&header);
}
let page_ref = Rc::new(RefCell::new(Page::new(0)));
{
// setup page and add to cache
self.add_dirty(page_ref.clone());
let mut page = RefCell::borrow_mut(&page_ref);
page.set_dirty();
page.id = header.database_size as usize;
let buffer = self.buffer_pool.get();
let bp = self.buffer_pool.clone();
let drop_fn = Rc::new(move |buf| {
bp.put(buf);
});
let buffer = Rc::new(RefCell::new(Buffer::new(buffer, drop_fn)));
page.contents = RwLock::new(Some(PageContent { offset: 0, buffer }));
let mut cache = RefCell::borrow_mut(&self.page_cache);
cache.insert(page.id, page_ref.clone());
}
Ok(page_ref)
}
}
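A minimal usage sketch of the new Pager::allocate_page (an illustration under assumptions, not part of the commit; grow_database is a hypothetical helper and assumes Page's id field is visible to the caller):
use std::cell::RefCell;

// Editor's sketch, not part of this commit.
fn grow_database(pager: &Pager) -> crate::Result<()> {
    // allocate_page bumps database_size, rewrites the header into page 1,
    // and hands back a fresh dirty page backed by a buffer from the pool.
    let page_ref = pager.allocate_page()?;
    let page = RefCell::borrow(&page_ref);
    assert!(page.is_dirty()); // new pages start dirty so they get written back later
    assert!(page.id >= 2); // page 1 is reserved for the database header
    Ok(())
}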


@@ -52,7 +52,7 @@ pub struct DatabaseHeader {
min_embed_frac: u8,
min_leaf_frac: u8,
change_counter: u32,
database_size: u32,
pub database_size: u32,
freelist_trunk_page: u32,
freelist_pages: u32,
schema_cookie: u32,
@@ -134,31 +134,7 @@ pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Re
{
let mut buf_mut = std::cell::RefCell::borrow_mut(&buffer);
let buf = buf_mut.as_mut_slice();
buf[0..16].copy_from_slice(&header.magic);
buf[16..18].copy_from_slice(&header.page_size.to_be_bytes());
buf[18] = header.write_version;
buf[19] = header.read_version;
buf[20] = header.unused_space;
buf[21] = header.max_embed_frac;
buf[22] = header.min_embed_frac;
buf[23] = header.min_leaf_frac;
buf[24..28].copy_from_slice(&header.change_counter.to_be_bytes());
buf[28..32].copy_from_slice(&header.database_size.to_be_bytes());
buf[32..36].copy_from_slice(&header.freelist_trunk_page.to_be_bytes());
buf[36..40].copy_from_slice(&header.freelist_pages.to_be_bytes());
buf[40..44].copy_from_slice(&header.schema_cookie.to_be_bytes());
buf[44..48].copy_from_slice(&header.schema_format.to_be_bytes());
buf[48..52].copy_from_slice(&header.default_cache_size.to_be_bytes());
buf[52..56].copy_from_slice(&header.vacuum.to_be_bytes());
buf[56..60].copy_from_slice(&header.text_encoding.to_be_bytes());
buf[60..64].copy_from_slice(&header.user_version.to_be_bytes());
buf[64..68].copy_from_slice(&header.incremental_vacuum.to_be_bytes());
buf[68..72].copy_from_slice(&header.application_id.to_be_bytes());
buf[72..92].copy_from_slice(&header.reserved);
buf[92..96].copy_from_slice(&header.version_valid_for.to_be_bytes());
buf[96..100].copy_from_slice(&header.version_number.to_be_bytes());
write_header_to_buf(buf, &header);
let mut buffer_to_copy = std::cell::RefCell::borrow_mut(&buffer_to_copy_in_cb);
let buffer_to_copy_slice = buffer_to_copy.as_mut_slice();
@@ -188,6 +164,34 @@ pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Re
Ok(())
}
fn write_header_to_buf(buf: &mut [u8], header: &DatabaseHeader) {
buf[0..16].copy_from_slice(&header.magic);
buf[16..18].copy_from_slice(&header.page_size.to_be_bytes());
buf[18] = header.write_version;
buf[19] = header.read_version;
buf[20] = header.unused_space;
buf[21] = header.max_embed_frac;
buf[22] = header.min_embed_frac;
buf[23] = header.min_leaf_frac;
buf[24..28].copy_from_slice(&header.change_counter.to_be_bytes());
buf[28..32].copy_from_slice(&header.database_size.to_be_bytes());
buf[32..36].copy_from_slice(&header.freelist_trunk_page.to_be_bytes());
buf[36..40].copy_from_slice(&header.freelist_pages.to_be_bytes());
buf[40..44].copy_from_slice(&header.schema_cookie.to_be_bytes());
buf[44..48].copy_from_slice(&header.schema_format.to_be_bytes());
buf[48..52].copy_from_slice(&header.default_cache_size.to_be_bytes());
buf[52..56].copy_from_slice(&header.vacuum.to_be_bytes());
buf[56..60].copy_from_slice(&header.text_encoding.to_be_bytes());
buf[60..64].copy_from_slice(&header.user_version.to_be_bytes());
buf[64..68].copy_from_slice(&header.incremental_vacuum.to_be_bytes());
buf[68..72].copy_from_slice(&header.application_id.to_be_bytes());
buf[72..92].copy_from_slice(&header.reserved);
buf[92..96].copy_from_slice(&header.version_valid_for.to_be_bytes());
buf[96..100].copy_from_slice(&header.version_number.to_be_bytes());
}
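The extracted helper serializes the full 100-byte database header; a tiny wrapper (a sketch, not from the commit) makes the fixed size explicit:
// Editor's sketch, not part of this commit.
fn header_bytes(header: &DatabaseHeader) -> [u8; 100] {
    let mut buf = [0u8; 100]; // the SQLite database header is exactly 100 bytes
    write_header_to_buf(&mut buf, header);
    buf
}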
#[repr(u8)]
#[derive(Debug, PartialEq, Clone)]
pub enum PageType {
@@ -245,6 +249,30 @@ impl PageContent {
}
}
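// The write_* helpers below store big-endian values at `offset + pos` inside the
// page buffer. They reach the buffer through RefCell::as_ptr and unsafe code,
// which bypasses RefCell's runtime borrow tracking for these in-place writes.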
pub fn write_u8(&self, pos: usize, value: u8) {
unsafe {
let buf_pointer = &self.buffer.as_ptr();
let buf = (*buf_pointer).as_mut().unwrap().as_mut_slice();
buf[self.offset + pos] = value;
}
}
pub fn write_u16(&self, pos: usize, value: u16) {
unsafe {
let buf_pointer = &self.buffer.as_ptr();
let buf = (*buf_pointer).as_mut().unwrap().as_mut_slice();
buf[self.offset + pos..self.offset + pos + 2].copy_from_slice(&value.to_be_bytes());
}
}
pub fn write_u32(&self, pos: usize, value: u32) {
unsafe {
let buf_pointer = &self.buffer.as_ptr();
let buf = (*buf_pointer).as_mut().unwrap().as_mut_slice();
buf[self.offset + pos..self.offset + pos + 4].copy_from_slice(&value.to_be_bytes());
}
}
pub fn first_freeblock(&self) -> u16 {
self.read_u16(1)
}
@@ -296,6 +324,12 @@ impl PageContent {
PageType::TableLeaf => true,
}
}
pub fn write_database_header(&self, header: &DatabaseHeader) {
let mut buf = self.buffer.borrow_mut();
let buf = buf.as_mut_slice();
write_header_to_buf(buf, header);
}
}
pub fn begin_read_page(