From 90a5a53b0eda3574734fe4a2913aeec621560da1 Mon Sep 17 00:00:00 2001 From: Zaid Humayun Date: Sat, 5 Jul 2025 18:04:48 +0530 Subject: [PATCH 1/2] Added Async Header Accessor API's This commit introduces async header accessor API's in addition to the sync ones. Allows gradual porting instead of one big shot PR. --- core/storage/header_accessor.rs | 78 +++++++++++++++++++++++---------- core/storage/pager.rs | 25 ++++++++--- 2 files changed, 74 insertions(+), 29 deletions(-) diff --git a/core/storage/header_accessor.rs b/core/storage/header_accessor.rs index 761bdc7b4..c51161a82 100644 --- a/core/storage/header_accessor.rs +++ b/core/storage/header_accessor.rs @@ -4,6 +4,7 @@ use crate::{ pager::{PageRef, Pager}, sqlite3_ondisk::DATABASE_HEADER_PAGE_ID, }, + types::CursorResult, LimboError, Result, }; use std::sync::atomic::Ordering; @@ -33,22 +34,21 @@ const HEADER_OFFSET_VERSION_VALID_FOR: usize = 92; const HEADER_OFFSET_VERSION_NUMBER: usize = 96; // Helper to get a read-only reference to the header page. -fn get_header_page(pager: &Pager) -> Result { +fn get_header_page(pager: &Pager) -> Result> { if pager.is_empty.load(Ordering::SeqCst) < 2 { return Err(LimboError::InternalError( "Database is empty, header does not exist - page 1 should've been allocated before this".to_string(), )); } let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?; - while !page.is_loaded() || page.is_locked() { - // FIXME: LETS STOP DOING THESE SYNCHRONOUS IO HACKS - pager.io.run_once()?; + if !page.is_loaded() || page.is_locked() { + return Ok(CursorResult::IO); } - Ok(page) + Ok(CursorResult::Ok(page)) } // Helper to get a writable reference to the header page and mark it dirty. -fn get_header_page_for_write(pager: &Pager) -> Result { +fn get_header_page_for_write(pager: &Pager) -> Result> { if pager.is_empty.load(Ordering::SeqCst) < 2 { // This should not be called on an empty DB for writing, as page 1 is allocated on first transaction. return Err(LimboError::InternalError( @@ -56,20 +56,36 @@ fn get_header_page_for_write(pager: &Pager) -> Result { )); } let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?; - while !page.is_loaded() || page.is_locked() { - // FIXME: LETS STOP DOING THESE SYNCHRONOUS IO HACKS - pager.io.run_once()?; + if !page.is_loaded() || page.is_locked() { + return Ok(CursorResult::IO); } page.set_dirty(); pager.add_dirty(DATABASE_HEADER_PAGE_ID); - Ok(page) + Ok(CursorResult::Ok(page)) +} + +/// Helper function to run async header accessors until completion +fn run_header_accessor_until_done(pager: &Pager, mut accessor: F) -> Result +where + F: FnMut() -> Result>, +{ + loop { + match accessor()? { + CursorResult::Ok(value) => return Ok(value), + CursorResult::IO => { + pager.io.run_once()?; + } + } + } } /// Helper macro to implement getters and setters for header fields. /// For example, `impl_header_field_accessor!(page_size, u16, HEADER_OFFSET_PAGE_SIZE);` /// will generate the following functions: -/// - `pub fn get_page_size(pager: &Pager) -> Result` -/// - `pub fn set_page_size(pager: &Pager, value: u16) -> Result<()>` +/// - `pub fn get_page_size(pager: &Pager) -> Result` (sync) +/// - `pub fn get_page_size_async(pager: &Pager) -> Result>` (async) +/// - `pub fn set_page_size(pager: &Pager, value: u16) -> Result<()>` (sync) +/// - `pub fn set_page_size_async(pager: &Pager, value: u16) -> Result>` (async) /// /// The macro takes three required arguments: /// - `$field_name`: The name of the field to implement. @@ -79,19 +95,21 @@ fn get_header_page_for_write(pager: &Pager) -> Result { /// And a fourth optional argument: /// - `$ifzero`: A value to return if the field is 0. /// -/// The macro will generate the following functions: -/// - `pub fn get_(pager: &Pager) -> Result` -/// - `pub fn set_(pager: &Pager, value: T) -> Result<()>` +/// The macro will generate both sync and async versions of the functions. /// macro_rules! impl_header_field_accessor { ($field_name:ident, $type:ty, $offset:expr $(, $ifzero:expr)?) => { paste::paste! { + // Async version #[allow(dead_code)] - pub fn [](pager: &Pager) -> Result<$type> { + pub fn [](pager: &Pager) -> Result> { if pager.is_empty.load(Ordering::SeqCst) < 2 { return Err(LimboError::InternalError(format!("Database is empty, header does not exist - page 1 should've been allocated before this"))); } - let page = get_header_page(pager)?; + let page = match get_header_page(pager)? { + CursorResult::Ok(page) => page, + CursorResult::IO => return Ok(CursorResult::IO), + }; let page_inner = page.get(); let page_content = page_inner.contents.as_ref().unwrap(); let buf = page_content.buffer.borrow(); @@ -101,15 +119,25 @@ macro_rules! impl_header_field_accessor { let value = <$type>::from_be_bytes(bytes); $( if value == 0 { - return Ok($ifzero); + return Ok(CursorResult::Ok($ifzero)); } )? - Ok(value) + Ok(CursorResult::Ok(value)) } + // Sync version #[allow(dead_code)] - pub fn [](pager: &Pager, value: $type) -> Result<()> { - let page = get_header_page_for_write(pager)?; + pub fn [](pager: &Pager) -> Result<$type> { + run_header_accessor_until_done(pager, || [](pager)) + } + + // Async setter + #[allow(dead_code)] + pub fn [](pager: &Pager, value: $type) -> Result> { + let page = match get_header_page_for_write(pager)? { + CursorResult::Ok(page) => page, + CursorResult::IO => return Ok(CursorResult::IO), + }; let page_inner = page.get(); let page_content = page_inner.contents.as_ref().unwrap(); let mut buf = page_content.buffer.borrow_mut(); @@ -117,7 +145,13 @@ macro_rules! impl_header_field_accessor { buf_slice[$offset..$offset + std::mem::size_of::<$type>()].copy_from_slice(&value.to_be_bytes()); page.set_dirty(); pager.add_dirty(1); - Ok(()) + Ok(CursorResult::Ok(())) + } + + // Sync setter + #[allow(dead_code)] + pub fn [](pager: &Pager, value: $type) -> Result<()> { + run_header_accessor_until_done(pager, || [](pager, value)) } } }; diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 1a4e3f755..2cc89651b 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -314,7 +314,10 @@ impl Pager { #[cfg(not(feature = "omit_autovacuum"))] pub fn ptrmap_get(&self, target_page_num: u32) -> Result>> { tracing::trace!("ptrmap_get(page_idx = {})", target_page_num); - let configured_page_size = header_accessor::get_page_size(self)? as usize; + let configured_page_size = match header_accessor::get_page_size_async(self)? { + CursorResult::Ok(size) => size as usize, + CursorResult::IO => return Ok(CursorResult::IO), + }; if target_page_num < FIRST_PTRMAP_PAGE_NO || is_ptrmap_page(target_page_num, configured_page_size) @@ -400,7 +403,10 @@ impl Pager { parent_page_no ); - let page_size = header_accessor::get_page_size(self)? as usize; + let page_size = match header_accessor::get_page_size_async(self)? { + CursorResult::Ok(size) => size as usize, + CursorResult::IO => return Ok(CursorResult::IO), + }; if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO || is_ptrmap_page(db_page_no_to_update, page_size) @@ -497,15 +503,20 @@ impl Pager { } AutoVacuumMode::Full => { let mut root_page_num = - header_accessor::get_vacuum_mode_largest_root_page(self)?; + match header_accessor::get_vacuum_mode_largest_root_page_async(self)? { + CursorResult::Ok(value) => value, + CursorResult::IO => return Ok(CursorResult::IO), + }; assert!(root_page_num > 0); // Largest root page number cannot be 0 because that is set to 1 when creating the database with autovacuum enabled root_page_num += 1; assert!(root_page_num >= FIRST_PTRMAP_PAGE_NO); // can never be less than 2 because we have already incremented - while is_ptrmap_page( - root_page_num, - header_accessor::get_page_size(self)? as usize, - ) { + let page_size = match header_accessor::get_page_size_async(self)? { + CursorResult::Ok(size) => size as usize, + CursorResult::IO => return Ok(CursorResult::IO), + }; + + while is_ptrmap_page(root_page_num, page_size) { root_page_num += 1; } assert!(root_page_num >= 3); // the very first root page is page 3 From 2f0ff89e286b0a6499bef61e47800cd462170d07 Mon Sep 17 00:00:00 2001 From: Zaid Humayun Date: Sat, 12 Jul 2025 09:44:31 +0530 Subject: [PATCH 2/2] resolved jussi's comment https://github.com/tursodatabase/turso/pull/1966#discussion_r2201864782 this commit removes !page.is_loaded() from header_accessor cause it's not required --- core/storage/header_accessor.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/storage/header_accessor.rs b/core/storage/header_accessor.rs index c51161a82..d1d7c726a 100644 --- a/core/storage/header_accessor.rs +++ b/core/storage/header_accessor.rs @@ -41,7 +41,7 @@ fn get_header_page(pager: &Pager) -> Result> { )); } let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?; - if !page.is_loaded() || page.is_locked() { + if page.is_locked() { return Ok(CursorResult::IO); } Ok(CursorResult::Ok(page)) @@ -56,7 +56,7 @@ fn get_header_page_for_write(pager: &Pager) -> Result> { )); } let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?; - if !page.is_loaded() || page.is_locked() { + if page.is_locked() { return Ok(CursorResult::IO); } page.set_dirty();