mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-06 17:54:20 +01:00
Merge 'Add async header accessor functionality' from Zaid Humayun
This PR addresses https://github.com/tursodatabase/turso/issues/1828 in a phased manner. Making database header access async in one PR would be complicated. This PR adds an async API to `header_accessor.rs` and ports over some of `pager.rs` to use this API. This will allow gradual porting over of all call sites. Once all call sites are ported over, one mechanical rename will fix everything in the repo so we don't have any `<header_name>_async` functions. Also, porting header accessors over from sync to async would be a good way for first-time contributors to get introduced to the Limbo codebase. Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com> Closes #1966
This commit is contained in:
@@ -4,6 +4,7 @@ use crate::{
|
||||
pager::{PageRef, Pager},
|
||||
sqlite3_ondisk::DATABASE_HEADER_PAGE_ID,
|
||||
},
|
||||
types::CursorResult,
|
||||
LimboError, Result,
|
||||
};
|
||||
use std::sync::atomic::Ordering;
|
||||
@@ -33,22 +34,21 @@ const HEADER_OFFSET_VERSION_VALID_FOR: usize = 92;
|
||||
const HEADER_OFFSET_VERSION_NUMBER: usize = 96;
|
||||
|
||||
// Helper to get a read-only reference to the header page.
|
||||
fn get_header_page(pager: &Pager) -> Result<PageRef> {
|
||||
fn get_header_page(pager: &Pager) -> Result<CursorResult<PageRef>> {
|
||||
if pager.is_empty.load(Ordering::SeqCst) < 2 {
|
||||
return Err(LimboError::InternalError(
|
||||
"Database is empty, header does not exist - page 1 should've been allocated before this".to_string(),
|
||||
));
|
||||
}
|
||||
let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?;
|
||||
while !page.is_loaded() || page.is_locked() {
|
||||
// FIXME: LETS STOP DOING THESE SYNCHRONOUS IO HACKS
|
||||
pager.io.run_once()?;
|
||||
if page.is_locked() {
|
||||
return Ok(CursorResult::IO);
|
||||
}
|
||||
Ok(page)
|
||||
Ok(CursorResult::Ok(page))
|
||||
}
|
||||
|
||||
// Helper to get a writable reference to the header page and mark it dirty.
|
||||
fn get_header_page_for_write(pager: &Pager) -> Result<PageRef> {
|
||||
fn get_header_page_for_write(pager: &Pager) -> Result<CursorResult<PageRef>> {
|
||||
if pager.is_empty.load(Ordering::SeqCst) < 2 {
|
||||
// This should not be called on an empty DB for writing, as page 1 is allocated on first transaction.
|
||||
return Err(LimboError::InternalError(
|
||||
@@ -56,20 +56,36 @@ fn get_header_page_for_write(pager: &Pager) -> Result<PageRef> {
|
||||
));
|
||||
}
|
||||
let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?;
|
||||
while !page.is_loaded() || page.is_locked() {
|
||||
// FIXME: LETS STOP DOING THESE SYNCHRONOUS IO HACKS
|
||||
pager.io.run_once()?;
|
||||
if page.is_locked() {
|
||||
return Ok(CursorResult::IO);
|
||||
}
|
||||
page.set_dirty();
|
||||
pager.add_dirty(DATABASE_HEADER_PAGE_ID);
|
||||
Ok(page)
|
||||
Ok(CursorResult::Ok(page))
|
||||
}
|
||||
|
||||
/// Helper function to run async header accessors until completion
|
||||
fn run_header_accessor_until_done<T, F>(pager: &Pager, mut accessor: F) -> Result<T>
|
||||
where
|
||||
F: FnMut() -> Result<CursorResult<T>>,
|
||||
{
|
||||
loop {
|
||||
match accessor()? {
|
||||
CursorResult::Ok(value) => return Ok(value),
|
||||
CursorResult::IO => {
|
||||
pager.io.run_once()?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper macro to implement getters and setters for header fields.
|
||||
/// For example, `impl_header_field_accessor!(page_size, u16, HEADER_OFFSET_PAGE_SIZE);`
|
||||
/// will generate the following functions:
|
||||
/// - `pub fn get_page_size(pager: &Pager) -> Result<u16>`
|
||||
/// - `pub fn set_page_size(pager: &Pager, value: u16) -> Result<()>`
|
||||
/// - `pub fn get_page_size(pager: &Pager) -> Result<u16>` (sync)
|
||||
/// - `pub fn get_page_size_async(pager: &Pager) -> Result<CursorResult<u16>>` (async)
|
||||
/// - `pub fn set_page_size(pager: &Pager, value: u16) -> Result<()>` (sync)
|
||||
/// - `pub fn set_page_size_async(pager: &Pager, value: u16) -> Result<CursorResult<()>>` (async)
|
||||
///
|
||||
/// The macro takes three required arguments:
|
||||
/// - `$field_name`: The name of the field to implement.
|
||||
@@ -79,19 +95,21 @@ fn get_header_page_for_write(pager: &Pager) -> Result<PageRef> {
|
||||
/// And a fourth optional argument:
|
||||
/// - `$ifzero`: A value to return if the field is 0.
|
||||
///
|
||||
/// The macro will generate the following functions:
|
||||
/// - `pub fn get_<field_name>(pager: &Pager) -> Result<T>`
|
||||
/// - `pub fn set_<field_name>(pager: &Pager, value: T) -> Result<()>`
|
||||
/// The macro will generate both sync and async versions of the functions.
|
||||
///
|
||||
macro_rules! impl_header_field_accessor {
|
||||
($field_name:ident, $type:ty, $offset:expr $(, $ifzero:expr)?) => {
|
||||
paste::paste! {
|
||||
// Async version
|
||||
#[allow(dead_code)]
|
||||
pub fn [<get_ $field_name>](pager: &Pager) -> Result<$type> {
|
||||
pub fn [<get_ $field_name _async>](pager: &Pager) -> Result<CursorResult<$type>> {
|
||||
if pager.is_empty.load(Ordering::SeqCst) < 2 {
|
||||
return Err(LimboError::InternalError(format!("Database is empty, header does not exist - page 1 should've been allocated before this")));
|
||||
}
|
||||
let page = get_header_page(pager)?;
|
||||
let page = match get_header_page(pager)? {
|
||||
CursorResult::Ok(page) => page,
|
||||
CursorResult::IO => return Ok(CursorResult::IO),
|
||||
};
|
||||
let page_inner = page.get();
|
||||
let page_content = page_inner.contents.as_ref().unwrap();
|
||||
let buf = page_content.buffer.borrow();
|
||||
@@ -101,15 +119,25 @@ macro_rules! impl_header_field_accessor {
|
||||
let value = <$type>::from_be_bytes(bytes);
|
||||
$(
|
||||
if value == 0 {
|
||||
return Ok($ifzero);
|
||||
return Ok(CursorResult::Ok($ifzero));
|
||||
}
|
||||
)?
|
||||
Ok(value)
|
||||
Ok(CursorResult::Ok(value))
|
||||
}
|
||||
|
||||
// Sync version
|
||||
#[allow(dead_code)]
|
||||
pub fn [<set_ $field_name>](pager: &Pager, value: $type) -> Result<()> {
|
||||
let page = get_header_page_for_write(pager)?;
|
||||
pub fn [<get_ $field_name>](pager: &Pager) -> Result<$type> {
|
||||
run_header_accessor_until_done(pager, || [<get_ $field_name _async>](pager))
|
||||
}
|
||||
|
||||
// Async setter
|
||||
#[allow(dead_code)]
|
||||
pub fn [<set_ $field_name _async>](pager: &Pager, value: $type) -> Result<CursorResult<()>> {
|
||||
let page = match get_header_page_for_write(pager)? {
|
||||
CursorResult::Ok(page) => page,
|
||||
CursorResult::IO => return Ok(CursorResult::IO),
|
||||
};
|
||||
let page_inner = page.get();
|
||||
let page_content = page_inner.contents.as_ref().unwrap();
|
||||
let mut buf = page_content.buffer.borrow_mut();
|
||||
@@ -117,7 +145,13 @@ macro_rules! impl_header_field_accessor {
|
||||
buf_slice[$offset..$offset + std::mem::size_of::<$type>()].copy_from_slice(&value.to_be_bytes());
|
||||
page.set_dirty();
|
||||
pager.add_dirty(1);
|
||||
Ok(())
|
||||
Ok(CursorResult::Ok(()))
|
||||
}
|
||||
|
||||
// Sync setter
|
||||
#[allow(dead_code)]
|
||||
pub fn [<set_ $field_name>](pager: &Pager, value: $type) -> Result<()> {
|
||||
run_header_accessor_until_done(pager, || [<set_ $field_name _async>](pager, value))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -314,7 +314,10 @@ impl Pager {
|
||||
#[cfg(not(feature = "omit_autovacuum"))]
|
||||
pub fn ptrmap_get(&self, target_page_num: u32) -> Result<CursorResult<Option<PtrmapEntry>>> {
|
||||
tracing::trace!("ptrmap_get(page_idx = {})", target_page_num);
|
||||
let configured_page_size = header_accessor::get_page_size(self)? as usize;
|
||||
let configured_page_size = match header_accessor::get_page_size_async(self)? {
|
||||
CursorResult::Ok(size) => size as usize,
|
||||
CursorResult::IO => return Ok(CursorResult::IO),
|
||||
};
|
||||
|
||||
if target_page_num < FIRST_PTRMAP_PAGE_NO
|
||||
|| is_ptrmap_page(target_page_num, configured_page_size)
|
||||
@@ -397,7 +400,10 @@ impl Pager {
|
||||
parent_page_no
|
||||
);
|
||||
|
||||
let page_size = header_accessor::get_page_size(self)? as usize;
|
||||
let page_size = match header_accessor::get_page_size_async(self)? {
|
||||
CursorResult::Ok(size) => size as usize,
|
||||
CursorResult::IO => return Ok(CursorResult::IO),
|
||||
};
|
||||
|
||||
if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO
|
||||
|| is_ptrmap_page(db_page_no_to_update, page_size)
|
||||
@@ -492,15 +498,20 @@ impl Pager {
|
||||
}
|
||||
AutoVacuumMode::Full => {
|
||||
let mut root_page_num =
|
||||
header_accessor::get_vacuum_mode_largest_root_page(self)?;
|
||||
match header_accessor::get_vacuum_mode_largest_root_page_async(self)? {
|
||||
CursorResult::Ok(value) => value,
|
||||
CursorResult::IO => return Ok(CursorResult::IO),
|
||||
};
|
||||
assert!(root_page_num > 0); // Largest root page number cannot be 0 because that is set to 1 when creating the database with autovacuum enabled
|
||||
root_page_num += 1;
|
||||
assert!(root_page_num >= FIRST_PTRMAP_PAGE_NO); // can never be less than 2 because we have already incremented
|
||||
|
||||
while is_ptrmap_page(
|
||||
root_page_num,
|
||||
header_accessor::get_page_size(self)? as usize,
|
||||
) {
|
||||
let page_size = match header_accessor::get_page_size_async(self)? {
|
||||
CursorResult::Ok(size) => size as usize,
|
||||
CursorResult::IO => return Ok(CursorResult::IO),
|
||||
};
|
||||
|
||||
while is_ptrmap_page(root_page_num, page_size) {
|
||||
root_page_num += 1;
|
||||
}
|
||||
assert!(root_page_num >= 3); // the very first root page is page 3
|
||||
|
||||
Reference in New Issue
Block a user