mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 08:55:40 +01:00
Merge 'core: Wrap Pager::io_ctx in RwLock' from Pekka Enberg
Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com> Closes #3246
This commit is contained in:
@@ -528,7 +528,7 @@ pub struct Pager {
|
||||
header_ref_state: RwLock<HeaderRefState>,
|
||||
#[cfg(not(feature = "omit_autovacuum"))]
|
||||
vacuum_state: RwLock<VacuumState>,
|
||||
pub(crate) io_ctx: RefCell<IOContext>,
|
||||
pub(crate) io_ctx: RwLock<IOContext>,
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "omit_autovacuum"))]
|
||||
@@ -638,7 +638,7 @@ impl Pager {
|
||||
ptrmap_put_state: PtrMapPutState::Start,
|
||||
btree_create_vacuum_full_state: BtreeCreateVacuumFullState::Start,
|
||||
}),
|
||||
io_ctx: RefCell::new(IOContext::default()),
|
||||
io_ctx: RwLock::new(IOContext::default()),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1178,7 +1178,7 @@ impl Pager {
|
||||
) -> Result<(PageRef, Completion)> {
|
||||
tracing::trace!("read_page_no_cache(page_idx = {})", page_idx);
|
||||
let page = Arc::new(Page::new(page_idx));
|
||||
let io_ctx = &self.io_ctx.borrow();
|
||||
let io_ctx = self.io_ctx.read();
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
turso_assert!(
|
||||
matches!(frame_watermark, Some(0) | None),
|
||||
@@ -1186,7 +1186,7 @@ impl Pager {
|
||||
);
|
||||
|
||||
page.set_locked();
|
||||
let c = self.begin_read_disk_page(page_idx, page.clone(), allow_empty_read, io_ctx)?;
|
||||
let c = self.begin_read_disk_page(page_idx, page.clone(), allow_empty_read, &io_ctx)?;
|
||||
return Ok((page, c));
|
||||
};
|
||||
|
||||
@@ -1199,7 +1199,7 @@ impl Pager {
|
||||
return Ok((page, c));
|
||||
}
|
||||
|
||||
let c = self.begin_read_disk_page(page_idx, page.clone(), allow_empty_read, io_ctx)?;
|
||||
let c = self.begin_read_disk_page(page_idx, page.clone(), allow_empty_read, &io_ctx)?;
|
||||
Ok((page, c))
|
||||
}
|
||||
|
||||
@@ -1993,7 +1993,7 @@ impl Pager {
|
||||
// based on the IOContext set, we will set the reserved space bytes as required by
|
||||
// either the encryption or checksum, or None if they are not set.
|
||||
let reserved_space_bytes = {
|
||||
let io_ctx = self.io_ctx.borrow();
|
||||
let io_ctx = self.io_ctx.read();
|
||||
io_ctx.get_reserved_space_bytes()
|
||||
};
|
||||
default_header.reserved_space = reserved_space_bytes;
|
||||
@@ -2366,7 +2366,7 @@ impl Pager {
|
||||
}
|
||||
|
||||
pub fn is_encryption_ctx_set(&self) -> bool {
|
||||
self.io_ctx.borrow_mut().encryption_context().is_some()
|
||||
self.io_ctx.write().encryption_context().is_some()
|
||||
}
|
||||
|
||||
pub fn set_encryption_context(
|
||||
@@ -2377,25 +2377,23 @@ impl Pager {
|
||||
let page_size = self.get_page_size_unchecked().get() as usize;
|
||||
let encryption_ctx = EncryptionContext::new(cipher_mode, key, page_size)?;
|
||||
{
|
||||
let mut io_ctx = self.io_ctx.borrow_mut();
|
||||
let mut io_ctx = self.io_ctx.write();
|
||||
io_ctx.set_encryption(encryption_ctx);
|
||||
}
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Ok(());
|
||||
};
|
||||
wal.borrow_mut()
|
||||
.set_io_context(self.io_ctx.borrow().clone());
|
||||
wal.borrow_mut().set_io_context(self.io_ctx.read().clone());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn reset_checksum_context(&self) {
|
||||
{
|
||||
let mut io_ctx = self.io_ctx.borrow_mut();
|
||||
let mut io_ctx = self.io_ctx.write();
|
||||
io_ctx.reset_checksum();
|
||||
}
|
||||
let Some(wal) = self.wal.as_ref() else { return };
|
||||
wal.borrow_mut()
|
||||
.set_io_context(self.io_ctx.borrow().clone())
|
||||
wal.borrow_mut().set_io_context(self.io_ctx.read().clone())
|
||||
}
|
||||
|
||||
pub fn set_reserved_space_bytes(&self, value: u8) {
|
||||
|
||||
@@ -975,8 +975,8 @@ pub fn begin_write_btree_page(pager: &Pager, page: &PageRef) -> Result<Completio
|
||||
})
|
||||
};
|
||||
let c = Completion::new_write(write_complete);
|
||||
let io_ctx = &pager.io_ctx.borrow();
|
||||
page_source.write_page(page_id, buffer.clone(), io_ctx, c)
|
||||
let io_ctx = pager.io_ctx.read();
|
||||
page_source.write_page(page_id, buffer.clone(), &io_ctx, c)
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
@@ -1044,12 +1044,12 @@ pub fn write_pages_vectored(
|
||||
}
|
||||
});
|
||||
|
||||
let io_ctx = &pager.io_ctx.borrow();
|
||||
let io_ctx = pager.io_ctx.read();
|
||||
match pager.db_file.write_pages(
|
||||
start_id,
|
||||
page_sz,
|
||||
std::mem::replace(&mut run_bufs, Vec::with_capacity(EST_BUFF_CAPACITY)),
|
||||
io_ctx,
|
||||
&io_ctx,
|
||||
cmp,
|
||||
) {
|
||||
Ok(c) => completions.push(c),
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
|
||||
use std::array;
|
||||
use std::borrow::Cow;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use strum::EnumString;
|
||||
use tracing::{instrument, Level};
|
||||
@@ -588,7 +587,7 @@ pub struct WalFile {
|
||||
/// Manages locks needed for checkpointing
|
||||
checkpoint_guard: Option<CheckpointLocks>,
|
||||
|
||||
io_ctx: RefCell<IOContext>,
|
||||
io_ctx: RwLock<IOContext>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for WalFile {
|
||||
@@ -1124,7 +1123,7 @@ impl Wal for WalFile {
|
||||
buffer_pool,
|
||||
complete,
|
||||
page_idx,
|
||||
&self.io_ctx.borrow(),
|
||||
&self.io_ctx.read(),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1135,7 +1134,7 @@ impl Wal for WalFile {
|
||||
let (frame_ptr, frame_len) = (frame.as_mut_ptr(), frame.len());
|
||||
|
||||
let encryption_ctx = {
|
||||
let io_ctx = self.io_ctx.borrow();
|
||||
let io_ctx = self.io_ctx.read();
|
||||
io_ctx.encryption_context().cloned()
|
||||
};
|
||||
let complete = Box::new(move |res: Result<(Arc<Buffer>, i32), CompletionError>| {
|
||||
@@ -1243,7 +1242,7 @@ impl Wal for WalFile {
|
||||
buffer_pool,
|
||||
complete,
|
||||
page_id as usize,
|
||||
&self.io_ctx.borrow(),
|
||||
&self.io_ctx.read(),
|
||||
)?;
|
||||
self.io.wait_for_completion(c)?;
|
||||
return if conflict.get() {
|
||||
@@ -1493,7 +1492,7 @@ impl Wal for WalFile {
|
||||
let plain = page.get_contents().as_ptr();
|
||||
|
||||
let data_to_write: std::borrow::Cow<[u8]> = {
|
||||
let io_ctx = self.io_ctx.borrow();
|
||||
let io_ctx = self.io_ctx.read();
|
||||
match &io_ctx.encryption_or_checksum() {
|
||||
EncryptionOrChecksum::Encryption(ctx) => {
|
||||
Cow::Owned(ctx.encrypt_page(plain, page_id)?)
|
||||
@@ -1573,7 +1572,7 @@ impl Wal for WalFile {
|
||||
}
|
||||
|
||||
fn set_io_context(&mut self, ctx: IOContext) {
|
||||
self.io_ctx.replace(ctx);
|
||||
*self.io_ctx.write() = ctx;
|
||||
}
|
||||
|
||||
fn update_max_frame(&mut self) {
|
||||
@@ -1624,7 +1623,7 @@ impl WalFile {
|
||||
prev_checkpoint: CheckpointResult::default(),
|
||||
checkpoint_guard: None,
|
||||
header,
|
||||
io_ctx: RefCell::new(IOContext::default()),
|
||||
io_ctx: RwLock::new(IOContext::default()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2233,7 +2232,7 @@ impl WalFile {
|
||||
self.buffer_pool.clone(),
|
||||
complete,
|
||||
page_id,
|
||||
&self.io_ctx.borrow(),
|
||||
&self.io_ctx.read(),
|
||||
)?;
|
||||
|
||||
Ok(InflightRead {
|
||||
|
||||
Reference in New Issue
Block a user