Apply review suggestions

PThorpe92
2025-07-29 19:38:48 -04:00
parent 73882b97d6
commit ef69df7258
8 changed files with 188 additions and 160 deletions

View File

@@ -724,10 +724,10 @@ impl turso_core::DatabaseStorage for DatabaseFile {
page_size: usize,
buffers: Vec<Arc<RefCell<turso_core::Buffer>>>,
c: turso_core::Completion,
) -> turso_core::Result<()> {
let pos = (page_idx - 1) * page_size;
self.file.pwritev(pos, buffers, c.into())?;
Ok(())
) -> turso_core::Result<turso_core::Completion> {
let pos = page_idx.saturating_sub(1) * page_size;
let c = self.file.pwritev(pos, buffers, c)?;
Ok(c)
}
fn sync(&self, c: turso_core::Completion) -> turso_core::Result<turso_core::Completion> {
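Two review fixes land in this hunk: write_pages now returns the Completion from pwritev rather than dropping it, and the offset math uses saturating_sub so a buggy page_idx of 0 cannot underflow. A standalone sketch of the offset rule, assuming 1-indexed pages (not the crate's real types):

/// Page `n` (1-indexed) starts at byte offset (n - 1) * page_size.
/// saturating_sub keeps an invalid page_idx of 0 from wrapping the
/// unsigned subtraction into a huge offset.
fn page_offset(page_idx: usize, page_size: usize) -> usize {
    page_idx.saturating_sub(1) * page_size
}

fn main() {
    assert_eq!(page_offset(1, 4096), 0);     // first page starts at offset 0
    assert_eq!(page_offset(3, 4096), 8192);  // third page starts at 2 * 4096
    assert_eq!(page_offset(0, 4096), 0);     // clamped instead of wrapping
}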

View File

@@ -65,15 +65,16 @@ struct IovecPool {
impl IovecPool {
fn new() -> Self {
let mut pool = Vec::with_capacity(IOVEC_POOL_SIZE);
for _ in 0..IOVEC_POOL_SIZE {
pool.push(Box::new(
[libc::iovec {
iov_base: std::ptr::null_mut(),
iov_len: 0,
}; MAX_IOVEC_ENTRIES],
));
}
let pool = (0..IOVEC_POOL_SIZE)
.map(|_| {
Box::new(
[libc::iovec {
iov_base: std::ptr::null_mut(),
iov_len: 0,
}; MAX_IOVEC_ENTRIES],
)
})
.collect();
Self { pool }
}
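For context, the pool is a plain free list of boxed iovec arrays, so steady-state writev submissions avoid a heap allocation per call. A rough sketch of that shape (constants sized arbitrarily here; assumes the libc crate):

const IOVEC_POOL_SIZE: usize = 64;    // assumed sizes, not the crate's
const MAX_IOVEC_ENTRIES: usize = 128;

struct IovecPool {
    pool: Vec<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>>,
}

impl IovecPool {
    /// Pop a pre-allocated array; None means the pool is exhausted and
    /// the caller must heap-allocate a fresh one.
    fn acquire(&mut self) -> Option<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>> {
        self.pool.pop()
    }
    /// Return an array to the pool, dropping it if the pool is full.
    fn release(&mut self, iov: Box<[libc::iovec; MAX_IOVEC_ENTRIES]>) {
        if self.pool.len() < IOVEC_POOL_SIZE {
            self.pool.push(iov);
        }
    }
}

fn main() {
    let mut p = IovecPool { pool: Vec::new() };
    assert!(p.acquire().is_none()); // empty pool forces the fallback path
}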
@@ -144,18 +145,20 @@ enum Fd {
}
impl Fd {
fn as_raw_fd(&self) -> i32 {
match self {
Fd::RawFd(fd) => *fd,
_ => unreachable!("only to be called on RawFd variant"),
}
}
/// To match the behavior of `File`, we need to implement the same methods
fn id(&self) -> Option<u32> {
match self {
Fd::Fixed(id) => Some(*id),
Fd::RawFd(_) => None,
}
}
/// ONLY to be called by the macro, in the case where `id()` returns `None`
fn as_raw_fd(&self) -> i32 {
match self {
Fd::RawFd(fd) => *fd,
_ => panic!("Cannot call as_raw_fd on a Fixed Fd"),
}
}
}
/// State to track an ongoing writev operation in
@@ -181,10 +184,10 @@ struct WritevState {
impl WritevState {
fn new(file: &UringFile, pos: usize, bufs: Vec<Arc<RefCell<crate::Buffer>>>) -> Self {
let file_id = match file.id() {
Some(id) => Fd::Fixed(id),
None => Fd::RawFd(file.as_raw_fd()),
};
let file_id = file
.id()
.map(Fd::Fixed)
.unwrap_or_else(|| Fd::RawFd(file.as_raw_fd()));
let total_len = bufs.iter().map(|b| b.borrow().len()).sum();
Self {
file_id,
@@ -293,18 +296,15 @@ impl WrappedIOUring {
/// Submit or resubmit a writev operation
fn submit_writev(&mut self, key: u64, mut st: WritevState) {
st.free_last_iov(&mut self.iov_pool);
let mut iov_allocation = match self.iov_pool.acquire() {
Some(alloc) => alloc,
None => {
// Fallback: allocate a new one if pool is exhausted
Box::new(
[libc::iovec {
iov_base: std::ptr::null_mut(),
iov_len: 0,
}; MAX_IOVEC_ENTRIES],
)
}
};
let mut iov_allocation = self.iov_pool.acquire().unwrap_or_else(|| {
// Fallback: allocate a new one if pool is exhausted
Box::new(
[libc::iovec {
iov_base: std::ptr::null_mut(),
iov_len: 0,
}; MAX_IOVEC_ENTRIES],
)
});
let mut iov_count = 0;
for (idx, buffer) in st
.bufs
@@ -346,54 +346,41 @@ impl WrappedIOUring {
self.submit_entry(&entry);
}
fn handle_writev_completion(&mut self, mut st: WritevState, user_data: u64, result: i32) {
fn handle_writev_completion(&mut self, mut state: WritevState, user_data: u64, result: i32) {
if result < 0 {
tracing::error!(
"writev operation failed for user_data {}: {}",
user_data,
std::io::Error::from_raw_os_error(result)
);
// error: free iov allocation and call completion with error code
st.free_last_iov(&mut self.iov_pool);
let err = std::io::Error::from_raw_os_error(-result); // CQE result is a negative errno
tracing::error!("writev failed (user_data: {}): {}", user_data, err);
state.free_last_iov(&mut self.iov_pool);
completion_from_key(user_data).complete(result);
} else {
let written = result as usize;
st.advance(written);
if st.remaining() == 0 {
return;
}
let written = result as usize;
state.advance(written);
match state.remaining() {
0 => {
tracing::info!(
"writev operation completed: wrote {} bytes",
st.total_written
state.total_written
);
// write complete, return iovec to pool
st.free_last_iov(&mut self.iov_pool);
completion_from_key(user_data).complete(st.total_written as i32);
} else {
state.free_last_iov(&mut self.iov_pool);
completion_from_key(user_data).complete(state.total_written as i32);
}
remaining => {
tracing::trace!(
"resubmitting writev operation for user_data {}: wrote {} bytes, remaining {}",
user_data,
written,
st.remaining()
remaining
);
// partial write, submit next
self.submit_writev(user_data, st);
self.submit_writev(user_data, state);
}
}
}
}
#[inline(always)]
/// Use the completion pointer itself as the operation's user_data, as is
/// common io_uring practice to avoid an extra layer of indirection
fn get_key(c: Arc<Completion>) -> u64 {
Arc::into_raw(c) as u64
}
#[inline(always)]
/// Convert the user_data back into an `Arc<Completion>`; must be paired
/// with `get_key` and called exactly once per key to balance the refcount
fn completion_from_key(key: u64) -> Arc<Completion> {
unsafe { Arc::from_raw(key as *const Completion) }
}
impl IO for UringIO {
fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result<Arc<dyn File>> {
trace!("open_file(path = {})", path);
@@ -613,8 +600,8 @@ impl File for UringFile {
&self,
pos: usize,
bufs: Vec<Arc<RefCell<crate::Buffer>>>,
c: Arc<Completion>,
) -> Result<Arc<Completion>> {
c: Completion,
) -> Result<Completion> {
// for a single buffer use pwrite directly
if bufs.len().eq(&1) {
return self.pwrite(pos, bufs[0].clone(), c.clone());

View File

@@ -1,4 +1,4 @@
use crate::{turso_assert, Result};
use crate::Result;
use bitflags::bitflags;
use cfg_block::cfg_block;
use std::fmt;

View File

@@ -411,7 +411,7 @@ enum CompletionCallback {
),
Writev(
Arc<Mutex<std::fs::File>>,
Arc<Completion>,
Completion,
Vec<Arc<RefCell<crate::Buffer>>>,
usize, // absolute file offset
usize, // buf index
@@ -537,17 +537,12 @@ impl File for UnixFile<'_> {
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn pwritev(
&self,
pos: usize,
buffers: Vec<Arc<RefCell<crate::Buffer>>>,
c: Arc<Completion>,
) -> Result<Arc<Completion>> {
c: Completion,
) -> Result<Completion> {
let file = self
.file
.lock()
@@ -588,8 +583,7 @@ impl File for UnixFile<'_> {
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>> {
fn sync(&self, c: Completion) -> Result<Completion> {
let file = self.file.lock().unwrap();
let result = fs::fsync(file.as_fd());
match result {
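The Writev callback variant earlier in this file carries an absolute file offset and a buffer index so the syscall backend can resume after a short write. Roughly, the resume arithmetic looks like the following standalone sketch (assumed types, not the crate's code):

fn advance(bufs: &[Vec<u8>], mut idx: usize, mut off: usize, mut written: usize) -> (usize, usize) {
    while written > 0 && idx < bufs.len() {
        let left = bufs[idx].len() - off;
        if written >= left {
            written -= left; // this buffer is fully flushed; move to the next one
            idx += 1;
            off = 0;
        } else {
            off += written; // partial progress within the current buffer
            written = 0;
        }
    }
    (idx, off)
}

fn main() {
    let bufs = vec![vec![0u8; 10], vec![0u8; 10]];
    // A short write of 14 bytes leaves us 4 bytes into the second buffer.
    assert_eq!(advance(&bufs, 0, 0, 14), (1, 4));
}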

View File

@@ -23,7 +23,7 @@ pub trait DatabaseStorage: Send + Sync {
buffers: Vec<Arc<RefCell<Buffer>>>,
c: Completion,
) -> Result<Completion>;
fn sync(&self, c: Completion) -> Result<()>;
fn sync(&self, c: Completion) -> Result<Completion>;
fn size(&self) -> Result<u64>;
fn truncate(&self, len: usize, c: Completion) -> Result<Completion>;
}
@@ -74,7 +74,7 @@ impl DatabaseStorage for DatabaseFile {
page_size: usize,
buffers: Vec<Arc<RefCell<Buffer>>>,
c: Completion,
) -> Result<()> {
) -> Result<Completion> {
assert!(page_idx > 0);
assert!(page_size >= 512);
assert!(page_size <= 65536);
@@ -149,7 +149,7 @@ impl DatabaseStorage for FileMemoryStorage {
page_size: usize,
buffer: Vec<Arc<RefCell<Buffer>>>,
c: Completion,
) -> Result<()> {
) -> Result<Completion> {
assert!(page_idx > 0);
assert!(page_size >= 512);
assert!(page_size <= 65536);
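The assertions bound page_size to SQLite's documented limits. Valid SQLite page sizes are additionally powers of two within that range, so a stricter guard could also check that bit (a sketch, not part of this change):

/// SQLite page sizes must be powers of two between 512 and 65536.
fn is_valid_page_size(n: usize) -> bool {
    (512..=65536).contains(&n) && n.is_power_of_two()
}

fn main() {
    assert!(is_valid_page_size(4096));
    assert!(!is_valid_page_size(1000)); // in range, but not a power of two
}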

View File

@@ -1304,11 +1304,10 @@ impl Pager {
}
let write_counter = Rc::new(RefCell::new(0));
let checkpoint_result = self.io.block(|| {
let mut checkpoint_result = self.io.block(|| {
self.wal
.borrow_mut()
.checkpoint(self, counter.clone(), mode)
.map_err(|err| panic!("error while clearing cache {err}"))
.checkpoint(self, write_counter.clone(), mode)
})?;
if checkpoint_result.everything_backfilled()

View File

@@ -62,7 +62,7 @@ use crate::storage::wal::{BatchItem, PendingFlush};
use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype};
use crate::{turso_assert, File, Result, WalFileShared};
use std::cell::{RefCell, UnsafeCell};
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::rc::Rc;
@@ -854,52 +854,89 @@ pub fn begin_write_btree_page(
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn write_pages_vectored(pager: &Pager, batch: &[BatchItem]) -> Result<PendingFlush> {
pub fn write_pages_vectored(
pager: &Pager,
batch: BTreeMap<usize, BatchItem>,
) -> Result<PendingFlush> {
if batch.is_empty() {
return Ok(PendingFlush::default());
}
let mut run = batch.to_vec();
run.sort_by_key(|b| b.id);
// the batch BTreeMap iterates in key order, so we only need to find contiguous
// runs of page ids to submit as `writev`/write_pages calls.
let page_sz = pager.page_size.get().unwrap_or(DEFAULT_PAGE_SIZE) as usize;
let mut all_ids = Vec::with_capacity(run.len());
// count runs
let mut starts = Vec::with_capacity(5); // arbitrary initialization
let mut start = 0;
while start < run.len() {
let mut end = start + 1;
while end < run.len() && run[end].id == run[end - 1].id + 1 {
end += 1;
let mut all_ids = Vec::with_capacity(batch.len());
// Count the expected number of runs so we can size the atomic counter that tracks them
let mut run_count = 0;
let mut prev_id = None;
for &id in batch.keys() {
if let Some(prev) = prev_id {
if id != prev + 1 {
run_count += 1;
}
} else {
run_count = 1; // First run
}
starts.push((start, end));
start = end;
prev_id = Some(id);
}
let runs = starts.len();
let runs_left = Arc::new(AtomicUsize::new(runs));
// Create the atomic counters
let runs_left = Arc::new(AtomicUsize::new(run_count));
let done = Arc::new(AtomicBool::new(false));
for (start, end) in starts {
let first_id = run[start].id;
let bufs: Vec<_> = run[start..end].iter().map(|b| b.buf.clone()).collect();
all_ids.extend(run[start..end].iter().map(|b| b.id));
let mut run_start_id = None;
// We know the number of runs, but not how many buffers each contains, so
// the capacity below is only an estimate.
const EST_BUFF_CAPACITY: usize = 32;
let mut run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY);
let mut run_ids = Vec::with_capacity(EST_BUFF_CAPACITY);
let runs_left_cl = runs_left.clone();
let done_cl = done.clone();
// Iterate through the batch, submitting each run as soon as it ends
let mut iter = batch.iter().peekable();
while let Some((&id, item)) = iter.next() {
if run_start_id.is_none() {
run_start_id = Some(id);
}
let c = Completion::new_write(move |_| {
if runs_left_cl.fetch_sub(1, Ordering::AcqRel) == 1 {
done_cl.store(true, Ordering::Release);
run_bufs.push(item.buf.clone());
run_ids.push(id);
// Check whether this ends a run: either the next key is not consecutive, or this is the last entry
let is_end_of_run = match iter.peek() {
Some((&next_id, _)) => next_id != id + 1,
None => true, // Last item is always end of a run
};
if is_end_of_run {
// Submit this run immediately
let start_id = run_start_id.unwrap();
let runs_left_cl = runs_left.clone();
let done_cl = done.clone();
let c = Completion::new_write(move |_| {
if runs_left_cl.fetch_sub(1, Ordering::AcqRel) == 1 {
done_cl.store(true, Ordering::Release);
}
});
// Submit the run; on error, decrement runs_left ourselves, since the completion will never fire
if let Err(e) = pager.db_file.write_pages(start_id, page_sz, run_bufs, c) {
if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
done.store(true, Ordering::Release);
}
return Err(e);
}
});
// submit, roll back on error
if let Err(e) = pager.db_file.write_pages(first_id, page_sz, bufs, c) {
if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
done.store(true, Ordering::Release);
}
return Err(e);
// Add IDs to the all_ids list and prepare for the next run
all_ids.extend(run_ids);
run_start_id = None;
// run_bufs and run_ids were moved into the submission, so `.clear()` won't compile; reassign fresh Vecs instead
run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY);
run_ids = Vec::with_capacity(EST_BUFF_CAPACITY);
}
}
tracing::debug!(
"write_pages_vectored: {} pages to write, runs: {runs}",
"write_pages_vectored: {} pages to write, runs: {run_count}",
all_ids.len()
);
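Because BTreeMap iterates keys in ascending order, run detection reduces to checking whether each key equals the previous key plus one. A standalone sketch of the grouping, where each run would become one write_pages submission (toy buffer type, not the crate's):

use std::collections::BTreeMap;

fn contiguous_runs(batch: &BTreeMap<usize, Vec<u8>>) -> Vec<(usize, usize)> {
    let mut runs: Vec<(usize, usize)> = Vec::new(); // (start_id, len)
    let mut prev: Option<usize> = None;
    for &id in batch.keys() {
        match (prev, runs.last_mut()) {
            (Some(p), Some(run)) if id == p + 1 => run.1 += 1, // extends the current run
            _ => runs.push((id, 1)),                           // starts a new run
        }
        prev = Some(id);
    }
    runs
}

fn main() {
    let batch: BTreeMap<usize, Vec<u8>> =
        [3, 4, 5, 9, 10, 42].into_iter().map(|id| (id, Vec::new())).collect();
    assert_eq!(contiguous_runs(&batch), vec![(3, 3), (9, 2), (42, 1)]);
}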

View File

@@ -3,7 +3,7 @@
use std::array;
use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use strum::EnumString;
use tracing::{instrument, Level};
@@ -404,7 +404,6 @@ pub const CKPT_BATCH_PAGES: usize = 512;
#[derive(Clone)]
pub(super) struct BatchItem {
pub(super) id: usize,
pub(super) buf: Arc<RefCell<Buffer>>,
}
@@ -418,9 +417,9 @@ pub(super) struct BatchItem {
// range. This is inefficient for now.
struct OngoingCheckpoint {
scratch_page: PageRef,
batch: Vec<BatchItem>,
batch: BTreeMap<usize, BatchItem>,
state: CheckpointState,
pending_flushes: Vec<PendingFlush>,
pending_flush: Option<PendingFlush>,
min_frame: u64,
max_frame: u64,
current_page: u64,
@@ -446,6 +445,14 @@ impl PendingFlush {
done: Arc::new(AtomicBool::new(false)),
}
}
// clear the dirty flag of all pages in the pending flush batch
fn clear_dirty(&self, pager: &Pager) {
for id in &self.pages {
if let Some(p) = pager.cache_get(*id) {
p.clear_dirty();
}
}
}
}
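PendingFlush pairs runs_left with done as a countdown latch: every run's write completion decrements the counter, and whichever decrement observes 1 flips done. A minimal standalone sketch of the pattern, invoked synchronously here for brevity:

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

fn main() {
    let runs_left = Arc::new(AtomicUsize::new(3));
    let done = Arc::new(AtomicBool::new(false));
    for _ in 0..3 {
        let (runs_left, done) = (runs_left.clone(), done.clone());
        // In the real code this closure body runs inside each run's write Completion.
        let on_complete = move || {
            if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
                done.store(true, Ordering::Release); // the last run just finished
            }
        };
        on_complete();
    }
    assert!(done.load(Ordering::Acquire));
}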
impl fmt::Debug for OngoingCheckpoint {
@@ -699,7 +706,11 @@ impl Drop for CheckpointLocks {
}
}
fn take_page_into_batch(scratch: &PageRef, pool: &Arc<BufferPool>, batch: &mut Vec<BatchItem>) {
fn take_page_into_batch(
scratch: &PageRef,
pool: &Arc<BufferPool>,
batch: &mut BTreeMap<usize, BatchItem>,
) {
let (id, buf_clone) = unsafe {
let inner = &*scratch.inner.get();
let id = inner.id;
@@ -707,9 +718,8 @@ fn take_page_into_batch(scratch: &PageRef, pool: &Arc<BufferPool>, batch: &mut V
let buf = contents.buffer.clone();
(id, buf)
};
// Push into batch
batch.push(BatchItem { id, buf: buf_clone });
// Insert keyed by page id; the BTreeMap keeps the batch sorted
batch.insert(id, BatchItem { buf: buf_clone });
// Re-initialize scratch with a fresh buffer
let raw = pool.get();
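The hand-off in take_page_into_batch clones the scratch page's buffer handle into the batch, then re-points scratch at a fresh buffer from the pool, so the batch owns the bytes while scratch stays immediately reusable. A toy sketch of that move (using Rc<RefCell<Vec<u8>>> as a stand-in for the crate's buffer type):

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;

type Buf = Rc<RefCell<Vec<u8>>>;

fn take_into_batch(scratch: &mut (usize, Buf), batch: &mut BTreeMap<usize, Buf>, fresh: Buf) {
    let (id, buf) = (scratch.0, scratch.1.clone()); // batch shares the filled buffer
    batch.insert(id, buf);
    scratch.1 = fresh; // scratch now points at a fresh buffer
}

fn main() {
    let mut scratch = (7usize, Rc::new(RefCell::new(vec![1u8, 2, 3])));
    let mut batch = BTreeMap::new();
    take_into_batch(&mut scratch, &mut batch, Rc::new(RefCell::new(Vec::new())));
    assert!(batch.contains_key(&7));
    assert!(scratch.1.borrow().is_empty()); // old contents now live in the batch
}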
@@ -1147,7 +1157,7 @@ impl Wal for WalFile {
"Full checkpoint mode is not implemented yet".into(),
));
}
self.checkpoint_inner(pager, write_counter, mode)
self.checkpoint_inner(pager, _write_counter, mode)
.inspect_err(|_| {
let _ = self.checkpoint_guard.take();
})
@@ -1265,8 +1275,8 @@ impl WalFile {
shared,
ongoing_checkpoint: OngoingCheckpoint {
scratch_page: checkpoint_page,
batch: Vec::new(),
pending_flushes: Vec::new(),
batch: BTreeMap::new(),
pending_flush: None,
state: CheckpointState::Start,
min_frame: 0,
max_frame: 0,
@@ -1326,7 +1336,7 @@ impl WalFile {
self.ongoing_checkpoint.current_page = 0;
self.max_frame_read_lock_index.set(NO_LOCK_HELD);
self.ongoing_checkpoint.batch.clear();
self.ongoing_checkpoint.pending_flushes.clear();
let _ = self.ongoing_checkpoint.pending_flush.take();
self.sync_state.set(SyncState::NotSyncing);
self.syncing.set(false);
}
@@ -1375,7 +1385,7 @@ impl WalFile {
fn checkpoint_inner(
&mut self,
pager: &Pager,
write_counter: Rc<RefCell<usize>>,
_write_counter: Rc<RefCell<usize>>,
mode: CheckpointMode,
) -> Result<IOResult<CheckpointResult>> {
'checkpoint_loop: loop {
@@ -1438,10 +1448,10 @@ impl WalFile {
page,
*frame
);
self.ongoing_checkpoint.page.get().id = page as usize;
self.ongoing_checkpoint.scratch_page.get().id = page as usize;
let _ = self.read_frame(
*frame,
self.ongoing_checkpoint.page.clone(),
self.ongoing_checkpoint.scratch_page.clone(),
self.buffer_pool.clone(),
)?;
self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame;
@@ -1451,7 +1461,7 @@ impl WalFile {
self.ongoing_checkpoint.current_page += 1;
}
CheckpointState::WaitReadFrame => {
if self.ongoing_checkpoint.page.is_locked() {
if self.ongoing_checkpoint.scratch_page.is_locked() {
return Ok(IOResult::IO);
} else {
self.ongoing_checkpoint.state = CheckpointState::AccumulatePage;
@@ -1460,14 +1470,15 @@ impl WalFile {
CheckpointState::AccumulatePage => {
// mark before batching
self.ongoing_checkpoint.scratch_page.set_dirty();
// we read the frame into memory, add it to our batch
take_page_into_batch(
&self.ongoing_checkpoint.scratch_page,
&self.buffer_pool,
&mut self.ongoing_checkpoint.batch,
);
let more_pages = (self.ongoing_checkpoint.current_page as usize)
< self.get_shared().pages_in_frames.lock().len() - 1;
< self.get_shared().pages_in_frames.lock().len() - 1
&& self.ongoing_checkpoint.batch.len() < CKPT_BATCH_PAGES;
if more_pages {
self.ongoing_checkpoint.current_page += 1;
self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
@@ -1477,34 +1488,30 @@ impl WalFile {
}
CheckpointState::FlushBatch => {
tracing::trace!("started checkpoint backfilling batch");
self.ongoing_checkpoint
.pending_flushes
.push(write_pages_vectored(pager, &self.ongoing_checkpoint.batch)?);
self.ongoing_checkpoint.pending_flush = Some(write_pages_vectored(
pager,
std::mem::take(&mut self.ongoing_checkpoint.batch),
)?);
// batch is queued
self.ongoing_checkpoint.batch.clear();
self.ongoing_checkpoint.state = CheckpointState::WaitFlush;
}
CheckpointState::WaitFlush => {
if self
.ongoing_checkpoint
.pending_flushes
.iter()
.any(|pf| !pf.done.load(Ordering::Acquire))
{
return Ok(IOResult::IO);
match self.ongoing_checkpoint.pending_flush.as_ref() {
Some(pf) if pf.done.load(Ordering::SeqCst) => {
// flush is done, we can continue
tracing::trace!("checkpoint backfilling batch done");
}
Some(_) => return Ok(IOResult::IO),
None => panic!("we should have a pending flush here"),
}
tracing::debug!("finished checkpoint backfilling batch");
for pf in self
let pf = self
.ongoing_checkpoint
.pending_flushes
.drain(std::ops::RangeFull)
{
for id in pf.pages {
if let Some(p) = pager.cache_get(id) {
p.clear_dirty();
}
}
}
.pending_flush
.as_ref()
.expect("we should have a pending flush here");
pf.clear_dirty(pager);
// done with batch
let shared = self.get_shared();
if (self.ongoing_checkpoint.current_page as usize)
@@ -1513,7 +1520,7 @@ impl WalFile {
self.ongoing_checkpoint.current_page += 1;
self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
} else {
tracing::info!("transitioning checkpoint to done");
tracing::debug!("WaitFlush transitioning checkpoint to Done");
self.ongoing_checkpoint.state = CheckpointState::Done;
}
}
@@ -1522,9 +1529,13 @@ impl WalFile {
// In Restart or Truncate mode, we need to restart the log over and possibly truncate the file
// Release all locks and return the current num of wal frames and the amount we backfilled
CheckpointState::Done => {
if *write_counter.borrow() > 0 {
return Ok(IOResult::IO);
}
turso_assert!(
self.ongoing_checkpoint
.pending_flush
.as_ref()
.is_some_and(|pf| pf.done.load(Ordering::Relaxed)),
"checkpoint pending flush must have finished"
);
let mut checkpoint_result = {
let shared = self.get_shared();
let current_mx = shared.max_frame.load(Ordering::SeqCst);
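For orientation, the checkpoint path in this file is a resumable state machine: each call to checkpoint_inner advances it until an operation would block, then returns IOResult::IO so the caller can re-enter later. A simplified sketch of the states as they appear in this diff (not the crate's full enum):

enum CheckpointState {
    Start,          // take locks, compute min/max frames
    ReadFrame,      // issue a read of the next WAL frame into scratch_page
    WaitReadFrame,  // poll until scratch_page is unlocked
    AccumulatePage, // move the page buffer into the BTreeMap batch
    FlushBatch,     // write_pages_vectored(std::mem::take(&mut batch))
    WaitFlush,      // poll pending_flush.done, then clear dirty flags
    Done,           // release locks, report backfilled frame counts
}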