core: Populate page contents lazily

We need to be able to allocate a new page and insert it into a page
cache without contents for asynchronous I/O. Let's do that by making
`contents` optional in Page. (We perhaps ought to rename it to
`inner`...)
Pekka Enberg
2024-01-12 16:38:11 +02:00
parent 93670b0505
commit a39f5c68b4
3 changed files with 49 additions and 36 deletions
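
Before the per-file diffs, here is a minimal, self-contained sketch of the shape `Page` takes after this change. The field names, `RwLock<Option<BTreePage>>` type, and the `set_*`/`clear_*` helpers come from the diff below; the `PAGE_UPTODATE` bit value, the `Ordering`, the `is_locked()` accessor, and the helper bodies are illustrative assumptions.

use std::sync::RwLock;
use std::sync::atomic::{AtomicUsize, Ordering};

const PAGE_UPTODATE: usize = 0b001; // assumed value; the diff only shows LOCKED/ERROR
const PAGE_LOCKED: usize = 0b010;

pub struct BTreePage; // stand-in for the real parsed-page type

pub struct Page {
    flags: AtomicUsize,
    // None until the read completes; populated lazily by the I/O path.
    pub contents: RwLock<Option<BTreePage>>,
}

impl Page {
    pub fn new() -> Page {
        Page {
            flags: AtomicUsize::new(0),
            contents: RwLock::new(None),
        }
    }
    pub fn is_uptodate(&self) -> bool {
        self.flags.load(Ordering::SeqCst) & PAGE_UPTODATE != 0
    }
    pub fn set_uptodate(&self) {
        self.flags.fetch_or(PAGE_UPTODATE, Ordering::SeqCst);
    }
    pub fn set_locked(&self) {
        self.flags.fetch_or(PAGE_LOCKED, Ordering::SeqCst);
    }
    pub fn clear_locked(&self) {
        self.flags.fetch_and(!PAGE_LOCKED, Ordering::SeqCst);
    }
    // Assumed accessor; the diff only shows set_locked()/clear_locked().
    pub fn is_locked(&self) -> bool {
        self.flags.load(Ordering::SeqCst) & PAGE_LOCKED != 0
    }
}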

View File

@@ -99,7 +99,8 @@ impl Cursor {
         let page_idx = mem_page.page_idx;
         let page = self.pager.read_page(page_idx)?;
         assert!(page.is_uptodate());
-        let page = &page.contents;
+        let page = page.contents.read().unwrap();
+        let page = page.as_ref().unwrap();
         if mem_page.cell_idx() >= page.cells.len() {
             let parent = mem_page.parent.clone();
             match page.header.right_most_pointer {
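
The cursor now has to go through the lock and then the `Option`. A hypothetical helper, reusing the sketched types above and not part of this commit, captures that access pattern for a page already known to be up to date:

// Hypothetical convenience wrapper; not in the commit, just the pattern
// the cursor code now follows.
fn with_contents<T>(page: &Page, f: impl FnOnce(&BTreePage) -> T) -> T {
    assert!(page.is_uptodate());
    let guard = page.contents.read().unwrap();
    f(guard.as_ref().expect("up-to-date page has contents"))
}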

View File

@@ -2,8 +2,8 @@ use crate::buffer_pool::BufferPool;
 use crate::sqlite3_ondisk;
 use crate::sqlite3_ondisk::BTreePage;
 use crate::Storage;
-use crate::{buffer_pool, Buffer, Completion};
 use concurrent_lru::unsharded::LruCache;
+use std::sync::RwLock;
 use std::sync::{
     atomic::{AtomicUsize, Ordering},
     Arc,
@@ -11,7 +11,7 @@ use std::sync::{
 pub struct Page {
     flags: AtomicUsize,
-    pub contents: BTreePage,
+    pub contents: RwLock<Option<BTreePage>>,
 }
 /// Page is up-to-date.
@@ -22,10 +22,10 @@ const PAGE_LOCKED: usize = 0b010;
 const PAGE_ERROR: usize = 0b100;
 impl Page {
-    pub fn new(contents: BTreePage) -> Page {
+    pub fn new() -> Page {
         Page {
             flags: AtomicUsize::new(0),
-            contents,
+            contents: RwLock::new(None),
         }
     }
@@ -76,7 +76,7 @@ impl Pager {
     pub fn open(storage: Storage) -> anyhow::Result<Self> {
         let db_header = sqlite3_ondisk::read_database_header(&storage)?;
         let page_size = db_header.page_size as usize;
-        let buffer_pool = Arc::new(buffer_pool::BufferPool::new(page_size));
+        let buffer_pool = Arc::new(BufferPool::new(page_size));
         let page_cache = LruCache::new(10);
         Ok(Self {
             storage,
@@ -87,18 +87,16 @@ impl Pager {
     pub fn read_page(&self, page_idx: usize) -> anyhow::Result<Arc<Page>> {
         let handle = self.page_cache.get_or_try_init(page_idx, 1, |_idx| {
-            let buffer_pool = self.buffer_pool.clone();
-            let drop_fn = Arc::new(move |buf| {
-                buffer_pool.put(buf);
-            });
-            let buf = self.buffer_pool.get();
-            let buf = Buffer::new(buf, drop_fn);
-            let complete = Box::new(move |buf: &Buffer| {});
-            let mut c = Completion::new(buf, complete);
-            let page = sqlite3_ondisk::read_btree_page(&self.storage, &mut c, page_idx).unwrap();
-            let page = Page::new(page);
-            page.set_uptodate();
-            Ok::<Arc<Page>, anyhow::Error>(Arc::new(page))
+            let page = Arc::new(Page::new());
+            page.set_locked();
+            sqlite3_ondisk::read_btree_page(
+                &self.storage,
+                self.buffer_pool.clone(),
+                page.clone(),
+                page_idx,
+            )
+            .unwrap();
+            Ok::<Arc<Page>, anyhow::Error>(page)
         })?;
         Ok(handle.value().clone())
     }
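
With this pager change, `read_page` can hand back a page that is still being filled in once the read truly becomes asynchronous; in this commit `read_btree_page` still completes before `read_page` returns. A caller-side sketch, assuming the `is_locked()` accessor from the sketch above:

// Sketch of a caller once I/O runs asynchronously; pager and page_idx are
// whatever the surrounding cursor code already has in scope.
let page = pager.read_page(page_idx)?;
while page.is_locked() {
    std::hint::spin_loop(); // a real caller would yield to the I/O loop instead
}
assert!(page.is_uptodate());
let contents = page.contents.read().unwrap();
let contents = contents.as_ref().expect("completed read populates contents");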

View File

@@ -1,5 +1,3 @@
-use std::sync::Arc;
 /// SQLite on-disk file format.
 ///
 /// SQLite stores data in a single database file, which is divided into fixed-size
@@ -25,10 +23,13 @@ use std::sync::Arc;
 /// +-----------------+----------------+---------------------+----------------+
 ///
 /// For more information, see: https://www.sqlite.org/fileformat.html
+use crate::buffer_pool::BufferPool;
 use crate::io::{Buffer, Completion};
+use crate::pager::Page;
 use crate::types::{Record, Value};
 use crate::Storage;
 use anyhow::{anyhow, Result};
+use std::sync::Arc;
 /// The size of the database header in bytes.
 pub const DATABASE_HEADER_SIZE: usize = 100;
@@ -135,42 +136,55 @@ pub struct BTreePage {
 pub fn read_btree_page(
     storage: &Storage,
-    c: &mut Completion,
+    buffer_pool: Arc<BufferPool>,
+    page: Arc<Page>,
     page_idx: usize,
-) -> Result<BTreePage> {
-    storage.get(page_idx, c)?;
+) -> Result<()> {
+    let buf = buffer_pool.get();
+    let drop_fn = Arc::new(move |buf| {
+        let buffer_pool = buffer_pool.clone();
+        buffer_pool.put(buf);
+    });
+    let buf = Buffer::new(buf, drop_fn);
+    let complete = Box::new(move |_buf: &Buffer| {});
+    let mut c = Completion::new(buf, complete);
+    storage.get(page_idx, &mut c)?;
     let mut pos = if page_idx == 1 {
         DATABASE_HEADER_SIZE
     } else {
         0
     };
-    let page = c.buf.as_slice();
+    let buf = c.buf.as_slice();
     let mut header = BTreePageHeader {
-        page_type: page[pos].try_into()?,
-        _first_freeblock_offset: u16::from_be_bytes([page[pos + 1], page[pos + 2]]),
-        num_cells: u16::from_be_bytes([page[pos + 3], page[pos + 4]]),
-        _cell_content_area: u16::from_be_bytes([page[pos + 5], page[pos + 6]]),
-        _num_frag_free_bytes: page[pos + 7],
+        page_type: buf[pos].try_into()?,
+        _first_freeblock_offset: u16::from_be_bytes([buf[pos + 1], buf[pos + 2]]),
+        num_cells: u16::from_be_bytes([buf[pos + 3], buf[pos + 4]]),
+        _cell_content_area: u16::from_be_bytes([buf[pos + 5], buf[pos + 6]]),
+        _num_frag_free_bytes: buf[pos + 7],
         right_most_pointer: None,
     };
     pos += 8;
     if header.page_type == PageType::IndexInterior || header.page_type == PageType::TableInterior {
         header.right_most_pointer = Some(u32::from_be_bytes([
-            page[pos],
-            page[pos + 1],
-            page[pos + 2],
-            page[pos + 3],
+            buf[pos],
+            buf[pos + 1],
+            buf[pos + 2],
+            buf[pos + 3],
         ]));
         pos += 4;
     }
     let mut cells = Vec::new();
     for _ in 0..header.num_cells {
-        let cell_pointer = u16::from_be_bytes([page[pos], page[pos + 1]]);
+        let cell_pointer = u16::from_be_bytes([buf[pos], buf[pos + 1]]);
         pos += 2;
-        let cell = read_btree_cell(page, &header.page_type, cell_pointer as usize)?;
+        let cell = read_btree_cell(buf, &header.page_type, cell_pointer as usize)?;
         cells.push(cell);
     }
-    Ok(BTreePage { header, cells })
+    let inner = BTreePage { header, cells };
+    page.contents.write().unwrap().replace(inner);
+    page.set_uptodate();
+    page.clear_locked();
+    Ok(())
 }
 #[derive(Debug)]
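
The order at the end of `read_btree_page` is what lets a reader trust the flags: the parsed `BTreePage` is installed under the write lock before the up-to-date bit is set and the lock bit cleared. A reader-side sketch, using the assumed types from the first sketch, of the two states a racing reader can observe:

// Reader racing with the completion path above: either the up-to-date flag
// is not yet set (contents may still be None), or it is set and the Option
// has been populated, because replace() happens first.
let guard = page.contents.read().unwrap();
match guard.as_ref() {
    Some(contents) if page.is_uptodate() => {
        // Safe to walk contents.header and contents.cells here.
        let _ = contents;
    }
    _ => {
        // Still loading (or errored): retry after the completion has run.
    }
}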