Introduce BTreePage, a wrapper around PageRef.
One problem we have with PageRef is that the page it references can be unloaded. If we then read the page again, it is loaded into a fresh reference instead of the existing one, leaving us with a split brain of references to the same page. To solve this we wrap PageRef in `BTreePage`, so that whenever a page is seen as unloaded we replace BTreePage::page with the newest version of the page.
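The shape of the fix can be sketched from the hunks below. This is a minimal sketch, not code from this commit: only the `BTreePageInner`/`BTreePage` names and the `page: RefCell<PageRef>` field appear in the diff; the `Page` stub, the `get()` body, and the comments are inferred for illustration.

    use std::cell::RefCell;
    use std::sync::Arc;

    // Stub so the sketch stands alone; the real Page in the pager module
    // carries contents, load/lock flags, etc.
    pub struct Page {
        pub id: usize,
    }
    pub type PageRef = Arc<Page>;

    // Holds the *current* reference to a b-tree page. If the underlying
    // page is observed as unloaded, the b-tree swaps a freshly loaded
    // PageRef into `page`, so every holder of the wrapper sees the reload
    // instead of keeping a stale, divergent reference.
    pub struct BTreePageInner {
        pub page: RefCell<PageRef>,
    }

    // The diff constructs Arc::new(BTreePageInner { .. }) wherever a
    // BTreePage is expected, so the alias is presumably an Arc.
    pub type BTreePage = Arc<BTreePageInner>;

    impl BTreePageInner {
        // Clone out the current PageRef. Arc derefs to BTreePageInner, so
        // this is the first get() in the `page.get().get().id` chains that
        // appear in the pager hunks below.
        pub fn get(&self) -> PageRef {
            self.page.borrow().clone()
        }
    }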
@@ -56,7 +56,7 @@ use std::{
     rc::Rc,
     sync::{Arc, OnceLock},
 };
-use storage::btree::btree_init_page;
+use storage::btree::{btree_init_page, BTreePageInner};
 #[cfg(feature = "fs")]
 use storage::database::DatabaseFile;
 pub use storage::{
@@ -271,6 +271,9 @@ pub fn maybe_init_database_file(file: &Arc<dyn File>, io: &Arc<dyn IO>) -> Resul
         &Rc::new(BufferPool::new(db_header.get_page_size() as usize)),
         DATABASE_HEADER_SIZE,
     );
+    let page1 = Arc::new(BTreePageInner {
+        page: RefCell::new(page1),
+    });
     {
         // Create the sqlite_schema table, for this we just need to create the btree page
         // for the first page of the database which is basically like any other btree page
@@ -283,6 +286,7 @@ pub fn maybe_init_database_file(file: &Arc<dyn File>, io: &Arc<dyn IO>) -> Resul
             (db_header.get_page_size() - db_header.reserved_space as u32) as u16,
         );

+        let page1 = page1.get();
         let contents = page1.get().contents.as_mut().unwrap();
         contents.write_database_header(&db_header);
         // write the first page to disk synchronously
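The added `let page1 = page1.get();` line is the unwrap step: after the previous hunk `page1` is a `BTreePage`, and shadowing it with the inner `PageRef` lets the pre-existing `page1.get().contents` access compile unchanged. A commented fragment of the idiom (names from the hunk, surrounding types assumed):

    // `page1: BTreePage` -- shadow it with the PageRef it currently wraps,
    let page1 = page1.get();
    // so the old PageRef-based accessor on the next line still works.
    let contents = page1.get().contents.as_mut().unwrap();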
File diff suppressed because it is too large
@@ -1,5 +1,6 @@
 use crate::fast_lock::SpinLock;
 use crate::result::LimboResult;
+use crate::storage::btree::BTreePageInner;
 use crate::storage::buffer_pool::BufferPool;
 use crate::storage::database::DatabaseStorage;
 use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType};
@@ -13,6 +14,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use tracing::trace;

+use super::btree::BTreePage;
 use super::page_cache::{CacheError, CacheResizeResult, DumbLruPageCache, PageCacheKey};
 use super::wal::{CheckpointMode, CheckpointStatus};

@@ -222,7 +224,7 @@ impl Pager {
             _ => unreachable!("Invalid flags state"),
         };
         let page = self.do_allocate_page(page_type, 0);
-        let id = page.get().id;
+        let id = page.get().get().id;
         id as u32
     }

@@ -244,13 +246,16 @@ impl Pager {
     /// Allocate a new page to the btree via the pager.
     /// This marks the page as dirty and writes the page header.
     // FIXME: handle no room in page cache
-    pub fn do_allocate_page(&self, page_type: PageType, offset: usize) -> PageRef {
+    pub fn do_allocate_page(&self, page_type: PageType, offset: usize) -> BTreePage {
         let page = self.allocate_page().unwrap();
+        let page = Arc::new(BTreePageInner {
+            page: RefCell::new(page),
+        });
         crate::btree_init_page(&page, page_type, offset, self.usable_space() as u16);
         tracing::debug!(
             "do_allocate_page(id={}, page_type={:?})",
-            page.get().id,
-            page.get_contents().page_type()
+            page.get().get().id,
+            page.get().get_contents().page_type()
         );
         page
     }
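As the earlier hunk in `impl Pager` already showed, call-site migration is mechanical: one extra `get()` peels the wrapper. A hedged before/after sketch of a caller (the `PageType::TableLeaf` variant and the surrounding code are illustrative, not from this diff):

    // Before: do_allocate_page returned a PageRef directly.
    let page = pager.do_allocate_page(PageType::TableLeaf, 0);
    let id = page.get().id;

    // After: it returns a BTreePage; the first get() yields the current
    // PageRef, the second dereferences the page itself.
    let page = pager.do_allocate_page(PageType::TableLeaf, 0);
    let id = page.get().get().id;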
@@ -368,61 +373,6 @@ impl Pager {
         Ok(page)
     }

-    /// Loads pages if not loaded
-    pub fn load_page(&self, page: PageRef) -> Result<()> {
-        let id = page.get().id;
-        trace!("load_page(page_idx = {})", id);
-        let mut page_cache = self.page_cache.write();
-        page.set_locked();
-        let max_frame = match &self.wal {
-            Some(wal) => wal.borrow().get_max_frame(),
-            None => 0,
-        };
-        let page_key = PageCacheKey::new(id, Some(max_frame));
-        if let Some(wal) = &self.wal {
-            if let Some(frame_id) = wal.borrow().find_frame(id as u64)? {
-                wal.borrow()
-                    .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
-                {
-                    page.set_uptodate();
-                }
-                match page_cache.insert(page_key, page.clone()) {
-                    Err(CacheError::KeyExists) => {} // Exists but same page, not error
-                    Err(CacheError::Full) => return Err(LimboError::CacheFull),
-                    Err(e) => {
-                        return Err(LimboError::InternalError(format!(
-                            "Failed to insert page into cache during load: {:?}",
-                            e
-                        )))
-                    }
-                    Ok(_) => {}
-                }
-                return Ok(());
-            }
-        }
-
-        match page_cache.insert(page_key, page.clone()) {
-            Err(CacheError::KeyExists) => {} // Ensures same page
-            Err(CacheError::Full) => return Err(LimboError::CacheFull),
-            Err(e) => {
-                return Err(LimboError::InternalError(format!(
-                    "Failed to insert page into cache during load: {:?}",
-                    e
-                )))
-            }
-            Ok(_) => {}
-        };
-
-        sqlite3_ondisk::begin_read_page(
-            self.db_file.clone(),
-            self.buffer_pool.clone(),
-            page.clone(),
-            id,
-        )?;
-
-        Ok(())
-    }
-
     /// Writes the database header.
     pub fn write_database_header(&self, header: &DatabaseHeader) {
         sqlite3_ondisk::begin_write_database_header(header, self).expect("failed to write header");
@@ -464,6 +414,8 @@ impl Pager {
         let max_frame_after_append = self.wal.as_ref().map(|wal| {
             wal.borrow().get_max_frame() + self.dirty_pages.borrow().len() as u64
         });
+        tracing::error!("start flush");
+        tracing::error!("pages={:?}", self.dirty_pages.borrow());
         for page_id in self.dirty_pages.borrow().iter() {
             let mut cache = self.page_cache.write();
             let page_key = PageCacheKey::new(*page_id, Some(max_frame));