mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-03 08:24:19 +01:00
Prepare for asynchronous I/O with page flags
We want to submit I/O asynchronously with io_uring, which means we can have multiple concurrent lookups to the page that is under I/O. Therefore, let's add three page flags: up-to-date, locked, and error, so that the higher level layers can just look up a page, but wait if there's I/O happening.
This commit is contained in:
@@ -98,6 +98,8 @@ impl Cursor {
|
||||
};
|
||||
let page_idx = mem_page.page_idx;
|
||||
let page = self.pager.read_page(page_idx)?;
|
||||
assert!(page.is_uptodate());
|
||||
let page = &page.contents;
|
||||
if mem_page.cell_idx() >= page.cells.len() {
|
||||
let parent = mem_page.parent.clone();
|
||||
match page.header.right_most_pointer {
|
||||
|
||||
@@ -4,11 +4,71 @@ use crate::sqlite3_ondisk;
|
||||
use crate::sqlite3_ondisk::BTreePage;
|
||||
use crate::Storage;
|
||||
use concurrent_lru::unsharded::LruCache;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc, Mutex,
|
||||
};
|
||||
|
||||
pub struct Page {
|
||||
flags: AtomicUsize,
|
||||
pub contents: BTreePage,
|
||||
}
|
||||
|
||||
/// Page is up-to-date.
|
||||
const PAGE_UPTODATE: usize = 0b001;
|
||||
/// Page is locked for I/O to prevent concurrent access.
|
||||
const PAGE_LOCKED: usize = 0b010;
|
||||
/// Page had an I/O error.
|
||||
const PAGE_ERROR: usize = 0b100;
|
||||
|
||||
impl Page {
|
||||
pub fn new(contents: BTreePage) -> Page {
|
||||
Page {
|
||||
flags: AtomicUsize::new(0),
|
||||
contents,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_uptodate(&self) -> bool {
|
||||
self.flags.load(Ordering::SeqCst) & PAGE_UPTODATE != 0
|
||||
}
|
||||
|
||||
pub fn set_uptodate(&self) {
|
||||
self.flags.fetch_or(PAGE_UPTODATE, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
pub fn clear_uptodate(&self) {
|
||||
self.flags.fetch_and(!PAGE_UPTODATE, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
pub fn is_locked(&self) -> bool {
|
||||
self.flags.load(Ordering::SeqCst) & PAGE_LOCKED != 0
|
||||
}
|
||||
|
||||
pub fn set_locked(&self) {
|
||||
self.flags.fetch_or(PAGE_LOCKED, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
pub fn clear_locked(&self) {
|
||||
self.flags.fetch_and(!PAGE_LOCKED, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
pub fn is_error(&self) -> bool {
|
||||
self.flags.load(Ordering::SeqCst) & PAGE_ERROR != 0
|
||||
}
|
||||
|
||||
pub fn set_error(&self) {
|
||||
self.flags.fetch_or(PAGE_ERROR, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
pub fn clear_error(&self) {
|
||||
self.flags.fetch_and(!PAGE_ERROR, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Pager {
|
||||
storage: Storage,
|
||||
page_cache: LruCache<usize, Arc<BTreePage>>,
|
||||
page_cache: LruCache<usize, Arc<Page>>,
|
||||
buffer_pool: Arc<Mutex<BufferPool>>,
|
||||
}
|
||||
|
||||
@@ -25,12 +85,14 @@ impl Pager {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn read_page(&self, page_idx: usize) -> anyhow::Result<Arc<BTreePage>> {
|
||||
pub fn read_page(&self, page_idx: usize) -> anyhow::Result<Arc<Page>> {
|
||||
let handle = self.page_cache.get_or_try_init(page_idx, 1, |_idx| {
|
||||
let mut buffer_pool = self.buffer_pool.lock().unwrap();
|
||||
let page =
|
||||
sqlite3_ondisk::read_btree_page(&self.storage, &mut buffer_pool, page_idx).unwrap();
|
||||
Ok::<Arc<BTreePage>, anyhow::Error>(Arc::new(page))
|
||||
let page = Page::new(page);
|
||||
page.set_uptodate();
|
||||
Ok::<Arc<Page>, anyhow::Error>(Arc::new(page))
|
||||
})?;
|
||||
Ok(handle.value().clone())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user