Merge 'core: Wrap Pager dirty_pages in RwLock' from Pekka Enberg

Make it Sync and Send.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #3175
This commit is contained in:
Pekka Enberg
2025-09-17 13:34:47 +03:00
committed by GitHub

View File

@@ -470,7 +470,7 @@ pub struct Pager {
pub buffer_pool: Arc<BufferPool>,
/// I/O interface for input/output operations.
pub io: Arc<dyn crate::io::IO>,
dirty_pages: Rc<RefCell<HashSet<usize, hash::BuildHasherDefault<hash::DefaultHasher>>>>,
dirty_pages: Arc<RwLock<HashSet<usize, hash::BuildHasherDefault<hash::DefaultHasher>>>>,
commit_info: CommitInfo,
checkpoint_state: RefCell<CheckpointState>,
@@ -576,7 +576,7 @@ impl Pager {
wal,
page_cache,
io,
dirty_pages: Rc::new(RefCell::new(HashSet::with_hasher(
dirty_pages: Arc::new(RwLock::new(HashSet::with_hasher(
hash::BuildHasherDefault::new(),
))),
commit_info: CommitInfo {
@@ -1219,7 +1219,7 @@ impl Pager {
pub fn add_dirty(&self, page: &Page) {
// TODO: check duplicates?
let mut dirty_pages = RefCell::borrow_mut(&self.dirty_pages);
let mut dirty_pages = self.dirty_pages.write();
dirty_pages.insert(page.get().id);
page.set_dirty();
}
@@ -1249,7 +1249,7 @@ impl Pager {
};
let dirty_pages = self
.dirty_pages
.borrow()
.read()
.iter()
.copied()
.collect::<Vec<usize>>();
@@ -1334,7 +1334,7 @@ impl Pager {
.get()
};
let dirty_ids: Vec<usize> = self.dirty_pages.borrow().iter().copied().collect();
let dirty_ids: Vec<usize> = self.dirty_pages.read().iter().copied().collect();
if dirty_ids.is_empty() {
return Ok(IOResult::Done(PagerCommitResult::WalWritten));
}
@@ -1382,7 +1382,7 @@ impl Pager {
}
}
}
self.dirty_pages.borrow_mut().clear();
self.dirty_pages.write().clear();
// Nothing to append
if completions.is_empty() {
return Ok(IOResult::Done(PagerCommitResult::WalWritten));
@@ -1519,13 +1519,13 @@ impl Pager {
})?;
}
if header.is_commit_frame() {
for page_id in self.dirty_pages.borrow().iter() {
for page_id in self.dirty_pages.read().iter() {
let page_key = PageCacheKey::new(*page_id);
let mut cache = self.page_cache.write();
let page = cache.get(&page_key)?.expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
page.clear_dirty();
}
self.dirty_pages.borrow_mut().clear();
self.dirty_pages.write().clear();
}
Ok(WalFrameInfo {
page_no: header.page_number,
@@ -1573,7 +1573,7 @@ impl Pager {
/// of a rollback or in case we want to invalidate page cache after starting a read transaction
/// right after new writes happened which would invalidate current page cache.
pub fn clear_page_cache(&self) {
let dirty_pages = self.dirty_pages.borrow();
let dirty_pages = self.dirty_pages.read();
let mut cache = self.page_cache.write();
for page_id in dirty_pages.iter() {
let page_key = PageCacheKey::new(*page_id);
@@ -2138,10 +2138,10 @@ impl Pager {
tracing::debug!(schema_did_change);
self.clear_page_cache();
if is_write {
self.dirty_pages.borrow_mut().clear();
self.dirty_pages.write().clear();
} else {
turso_assert!(
self.dirty_pages.borrow().is_empty(),
self.dirty_pages.read().is_empty(),
"dirty pages should be empty for read txn"
);
}