From 3831218f6c414c6e32cfce5274d518db5917a7e8 Mon Sep 17 00:00:00 2001
From: PThorpe92
Date: Wed, 3 Sep 2025 15:58:22 -0400
Subject: [PATCH 1/3] Add linked flag to completion types in io/mod

---
 core/io/mod.rs | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/core/io/mod.rs b/core/io/mod.rs
index 4b877e37b..735dc2f76 100644
--- a/core/io/mod.rs
+++ b/core/io/mod.rs
@@ -135,6 +135,7 @@ struct CompletionInner {
     /// None means we completed successfully
     // Thread safe with OnceLock
     result: std::sync::OnceLock<Option<CompletionError>>,
+    needs_link: bool,
 }
 
 impl Debug for CompletionType {
@@ -161,10 +162,34 @@ impl Completion {
             inner: Arc::new(CompletionInner {
                 completion_type,
                 result: OnceLock::new(),
+                needs_link: false,
             }),
         }
     }
 
+    pub fn new_linked(completion_type: CompletionType) -> Self {
+        Self {
+            inner: Arc::new(CompletionInner {
+                completion_type,
+                result: OnceLock::new(),
+                needs_link: true,
+            }),
+        }
+    }
+
+    pub fn needs_link(&self) -> bool {
+        self.inner.needs_link
+    }
+
+    pub fn new_write_linked<F>(complete: F) -> Self
+    where
+        F: Fn(Result<i32>) + 'static,
+    {
+        Self::new_linked(CompletionType::Write(WriteCompletion::new(Box::new(
+            complete,
+        ))))
+    }
+
     pub fn new_write<F>(complete: F) -> Self
     where
        F: Fn(Result<i32>) + 'static,
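Usage sketch (not part of the patch; `expected_len` and the callback body are illustrative): a caller opts into linking by constructing the completion through the new helper, and everything else behaves exactly like `new_write`.

    // Hypothetical crate-internal call site:
    let expected_len: i32 = 4096;
    let c = Completion::new_write_linked(move |res: Result<i32>| {
        // called once the full write completes; partial writes are resubmitted
        // by the backend before the callback fires
        let Ok(n) = res else { return; };
        assert_eq!(n, expected_len, "failed to write expected size");
    });
    // the io_uring backend reads this flag at submission time (patch 3) and
    // sets IOSQE_IO_LINK on the SQE it generates for the write
    assert!(c.needs_link());
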
From e3f366963d09e4da45c79f48cf515fc603a070c8 Mon Sep 17 00:00:00 2001
From: PThorpe92
Date: Wed, 3 Sep 2025 16:00:46 -0400
Subject: [PATCH 2/3] Compute the final db page write and make the commit
 frame submit a linked pwritev completion

---
 core/storage/sqlite3_ondisk.rs | 34 +++++++++++++++++-----------------
 core/storage/wal.rs            | 22 +++++++++++++++++++---
 2 files changed, 36 insertions(+), 20 deletions(-)

diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs
index 0bc8d1e67..e8abfa1db 100644
--- a/core/storage/sqlite3_ondisk.rs
+++ b/core/storage/sqlite3_ondisk.rs
@@ -995,17 +995,14 @@ pub fn write_pages_vectored(
     pager: &Pager,
     batch: BTreeMap<usize, Arc<Buffer>>,
     done_flag: Arc<AtomicBool>,
+    final_write: bool,
 ) -> Result<Vec<Completion>> {
     if batch.is_empty() {
         done_flag.store(true, Ordering::Relaxed);
         return Ok(Vec::new());
     }
-    // batch item array is already sorted by id, so we just need to find contiguous ranges of page_id's
-    // to submit as `writev`/write_pages calls.
     let page_sz = pager.page_size.get().expect("page size is not set").get() as usize;
-
     // Count expected number of runs to create the atomic counter we need to track each batch
     let mut run_count = 0;
     let mut prev_id = None;
@@ -1023,26 +1020,21 @@ pub fn write_pages_vectored(
     // Create the atomic counters
     let runs_left = Arc::new(AtomicUsize::new(run_count));
     let done = done_flag.clone();
-    // we know how many runs, but we don't know how many buffers per run, so we can only give an
-    // estimate of the capacity
     const EST_BUFF_CAPACITY: usize = 32;
-    // Iterate through the batch, submitting each run as soon as it ends
-    // We can reuse this across runs without reallocating
     let mut run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY);
     let mut run_start_id: Option<usize> = None;
 
-    // Iterate through the batch
+    // Track which run we're on to identify the last one
+    let mut current_run = 0;
     let mut iter = batch.iter().peekable();
     let mut completions = Vec::new();
 
     while let Some((id, item)) = iter.next() {
         // Track the start of the run
         if run_start_id.is_none() {
             run_start_id = Some(*id);
         }
-
-        // Add this page to the current run
         run_bufs.push(item.clone());
 
         // Check if this is the end of a run
@@ -1052,24 +1044,32 @@ pub fn write_pages_vectored(
         if is_end_of_run {
+            current_run += 1;
             let start_id = run_start_id.expect("should have a start id");
             let runs_left_cl = runs_left.clone();
             let done_cl = done.clone();
+            // This is the last chunk if it's the last run AND final_write is true
+            let is_last_chunk = current_run == run_count && final_write;
             let total_sz = (page_sz * run_bufs.len()) as i32;
-            let c = Completion::new_write(move |res| {
+            let cmp = move |res| {
                 let Ok(res) = res else {
                     return;
                 };
-                // writev calls can sometimes return partial writes, but our `pwritev`
-                // implementation aggregates any partial writes and calls completion with total
                 turso_assert!(total_sz == res, "failed to write expected size");
                 if runs_left_cl.fetch_sub(1, Ordering::AcqRel) == 1 {
                     done_cl.store(true, Ordering::Release);
                 }
-            });
+            };
 
-            // Submit write operation for this run, decrementing the counter if we error
+            let c = if is_last_chunk {
+                Completion::new_write_linked(cmp)
+            } else {
+                Completion::new_write(cmp)
+            };
+
+            // Submit write operation for this run
             let io_ctx = &pager.io_ctx.borrow();
             match pager.db_file.write_pages(
                 start_id,
diff --git a/core/storage/wal.rs b/core/storage/wal.rs
index 41fc4e677..9a6389c5c 100644
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
@@ -465,6 +465,13 @@ impl OngoingCheckpoint {
         self.state = CheckpointState::Start;
     }
 
+    #[inline]
+    fn is_final_write(&self) -> bool {
+        self.current_page as usize >= self.pages_to_checkpoint.len()
+            && self.inflight_reads.is_empty()
+            && !self.pending_writes.is_empty()
+    }
+
     #[inline]
     /// Whether or not new reads should be issued during checkpoint processing.
     fn should_issue_reads(&self) -> bool {
@@ -1510,7 +1517,7 @@ impl Wal for WalFile {
         // single completion for the whole batch
         let total_len: i32 = iovecs.iter().map(|b| b.len() as i32).sum();
         let page_frame_for_cb = page_frame_and_checksum.clone();
-        let c = Completion::new_write(move |res: Result<i32>| {
+        let cmp = move |res: Result<i32>| {
             let Ok(bytes_written) = res else {
                 return;
             };
@@ -1523,7 +1530,13 @@ impl Wal for WalFile {
                 page.clear_dirty();
                 page.set_wal_tag(*fid, seq);
             }
-        });
+        };
+
+        let c = if db_size_on_commit.is_some() {
+            Completion::new_write_linked(cmp)
+        } else {
+            Completion::new_write(cmp)
+        };
 
         let c = self.get_shared().file.pwritev(start_off, iovecs, c)?;
         Ok(c)
@@ -1821,7 +1834,10 @@ impl WalFile {
             let batch_map = self.ongoing_checkpoint.pending_writes.take();
             if !batch_map.is_empty() {
                 let done_flag = self.ongoing_checkpoint.add_write();
-                completions.extend(write_pages_vectored(pager, batch_map, done_flag)?);
+                let is_final = self.ongoing_checkpoint.is_final_write();
+                completions.extend(write_pages_vectored(
+                    pager, batch_map, done_flag, is_final,
+                )?);
             }
         }
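For reference between the WAL change above and the io_uring plumbing below: an SQE flagged with IOSQE_IO_LINK is ordered before the next SQE in the same submission, and the kernel fails the successor with -ECANCELED if the linked operation errors or completes short. A minimal standalone sketch of that primitive using the `io_uring` crate (the file path, buffer contents, and user_data values are illustrative, not taken from the patches):

    use io_uring::{opcode, squeue::Flags, types, IoUring};
    use std::os::unix::io::AsRawFd;

    fn main() -> std::io::Result<()> {
        let mut ring = IoUring::new(8)?;
        let file = std::fs::File::create("/tmp/io_link_demo")?;
        let fd = types::Fd(file.as_raw_fd());
        let buf = vec![0u8; 4096];

        // The write is flagged IO_LINK, so the fsync queued after it cannot
        // start until the write has completed successfully.
        let write = opcode::Write::new(fd, buf.as_ptr(), buf.len() as u32)
            .offset(0)
            .build()
            .flags(Flags::IO_LINK)
            .user_data(1);
        let fsync = opcode::Fsync::new(fd).build().user_data(2);

        unsafe {
            let mut sq = ring.submission();
            sq.push(&write).expect("squeue full");
            sq.push(&fsync).expect("squeue full");
        }
        ring.submit_and_wait(2)?;
        for cqe in ring.completion() {
            println!("user_data={} res={}", cqe.user_data(), cqe.result());
        }
        Ok(())
    }

This is the ordering guarantee the commit frame relies on: patch 2 marks only the batch carrying `db_size_on_commit` as linked, so the durability barrier that follows it observes a fully written commit frame.
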
From d894a62132084ae711a89f2eccadd0903ff20754 Mon Sep 17 00:00:00 2001
From: PThorpe92
Date: Wed, 3 Sep 2025 16:01:12 -0400
Subject: [PATCH 3/3] Add plumbing in io_uring to handle linked writes to
 ensure consistency

---
 core/io/io_uring.rs | 89 +++++++++++++++++++++++++++++++++++++++------
 1 file changed, 78 insertions(+), 11 deletions(-)

diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs
index 393169028..80f8c8444 100644
--- a/core/io/io_uring.rs
+++ b/core/io/io_uring.rs
@@ -7,6 +7,7 @@ use crate::{turso_assert, LimboError, Result};
 use parking_lot::Mutex;
 use rustix::fs::{self, FlockOperation, OFlags};
 use std::ptr::NonNull;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::{
     collections::{HashMap, VecDeque},
     io::ErrorKind,
@@ -43,6 +44,10 @@ const MAX_WAIT: usize = 4;
 /// One memory arena for DB pages and another for WAL frames
 const ARENA_COUNT: usize = 2;
 
+/// Arbitrary non-zero user_data for the barrier operation submitted when
+/// handling a partial writev that is writing a commit frame.
+const BARRIER_USER_DATA: u64 = 1;
+
 pub struct UringIO {
     inner: Arc<Mutex<InnerUringIO>>,
 }
@@ -56,6 +61,7 @@ struct WrappedIOUring {
     writev_states: HashMap<u64, WritevState>,
     overflow: VecDeque<io_uring::squeue::Entry>,
     iov_pool: IovecPool,
+    pending_link: AtomicBool,
 }
 
 struct InnerUringIO {
@@ -122,6 +128,7 @@ impl UringIO {
                 pending_ops: 0,
                 writev_states: HashMap::new(),
                 iov_pool: IovecPool::new(),
+                pending_link: AtomicBool::new(false),
             },
             free_files: (0..FILES).collect(),
             free_arenas: [const { None }; ARENA_COUNT],
@@ -153,6 +160,7 @@ macro_rules! with_fd {
 /// wrapper type to represent a possibly registered file descriptor,
 /// only used in WritevState, and piggy-backs on the available methods from
 /// `UringFile`, so we don't have to store the file on `WritevState`.
+#[derive(Clone)]
 enum Fd {
     Fixed(u32),
     RawFd(i32),
@@ -194,10 +202,12 @@ struct WritevState {
     bufs: Vec<Arc<Buffer>>,
     /// we keep the last iovec allocation alive until final CQE
     last_iov_allocation: Option<NonNull<libc::iovec>>,
+    had_partial: bool,
+    linked_op: bool,
 }
 
 impl WritevState {
-    fn new(file: &UringFile, pos: u64, bufs: Vec<Arc<Buffer>>) -> Self {
+    fn new(file: &UringFile, pos: u64, linked: bool, bufs: Vec<Arc<Buffer>>) -> Self {
         let file_id = file
             .id()
             .map(Fd::Fixed)
@@ -212,6 +222,8 @@ impl WritevState {
             bufs,
             last_iov_allocation: None,
             total_len,
+            had_partial: false,
+            linked_op: linked,
         }
     }
@@ -353,7 +365,7 @@ impl WrappedIOUring {
     }
 
     /// Submit or resubmit a writev operation
-    fn submit_writev(&mut self, key: u64, mut st: WritevState) {
+    fn submit_writev(&mut self, key: u64, mut st: WritevState, continue_chain: bool) {
         st.free_last_iov(&mut self.iov_pool);
         let mut iov_allocation = self.iov_pool.acquire().unwrap_or_else(|| {
             // Fallback: allocate a new one if pool is exhausted
@@ -391,7 +403,7 @@ impl WrappedIOUring {
         }
         // If we have coalesced everything into a single iovec, submit as a single `pwrite`
         if iov_count == 1 {
-            let entry = with_fd!(st.file_id, |fd| {
+            let mut entry = with_fd!(st.file_id, |fd| {
                 if let Some(id) = st.bufs[st.current_buffer_idx].fixed_id() {
                     io_uring::opcode::WriteFixed::new(
                         fd,
@@ -413,6 +425,16 @@ impl WrappedIOUring {
                         .user_data(key)
                 }
             });
+
+            if st.linked_op && !st.had_partial {
+                // Starting a new link chain
+                entry = entry.flags(io_uring::squeue::Flags::IO_LINK);
+                self.pending_link.store(true, Ordering::Release);
+            } else if continue_chain && !st.had_partial {
+                // Continue an existing chain
+                entry = entry.flags(io_uring::squeue::Flags::IO_LINK);
+            }
+
             self.submit_entry(&entry);
             return;
         }
@@ -422,12 +444,15 @@ impl WrappedIOUring {
         let ptr = iov_allocation.as_ptr() as *mut libc::iovec;
         st.last_iov_allocation = Some(iov_allocation);
 
-        let entry = with_fd!(st.file_id, |fd| {
+        let mut entry = with_fd!(st.file_id, |fd| {
             io_uring::opcode::Writev::new(fd, ptr, iov_count as u32)
                 .offset(st.file_pos)
                 .build()
                 .user_data(key)
         });
+        if st.linked_op {
+            entry = entry.flags(io_uring::squeue::Flags::IO_LINK);
+        }
         // track the current state in case we get a partial write
         self.writev_states.insert(key, st);
         self.submit_entry(&entry);
@@ -452,6 +477,19 @@ impl WrappedIOUring {
             );
             // write complete, return iovec to pool
             state.free_last_iov(&mut self.iov_pool);
+            if state.linked_op && state.had_partial {
+                // if it was a linked operation, we need to submit an fsync after
+                // this writev to ensure the data is on disk
+                self.ring.submit().expect("submit after writev");
+                let file_id = state.file_id;
+                let sync = with_fd!(file_id, |fd| {
+                    io_uring::opcode::Fsync::new(fd)
+                        .build()
+                        .user_data(BARRIER_USER_DATA)
+                })
+                .flags(io_uring::squeue::Flags::IO_DRAIN);
+                self.submit_entry(&sync);
+            }
             completion_from_key(user_data).complete(state.total_written as i32);
         }
         remaining => {
@@ -461,8 +499,10 @@ impl WrappedIOUring {
                 written,
                 remaining
             );
-            // partial write, submit next
-            self.submit_writev(user_data, state);
+            // make sure the partial write is recorded, because an fsync could happen
+            // after this op while we are not yet finished writing to disk
+            state.had_partial = true;
+            self.submit_writev(user_data, state, false);
         }
     }
 }
@@ -530,6 +570,14 @@ impl IO for UringIO {
                 // if we have ongoing writev state, handle it separately and don't call completion
                 ring.handle_writev_completion(state, user_data, result);
                 continue;
+            } else if user_data == BARRIER_USER_DATA {
+                // barrier operation, no completion to call
+                if result < 0 {
+                    let err = std::io::Error::from_raw_os_error(-result);
+                    tracing::error!("barrier operation failed: {}", err);
+                    return Err(err.into());
+                }
+                continue;
             }
             completion_from_key(user_data).complete(result)
         }
@@ -680,7 +728,7 @@ impl File for UringFile {
 
     fn pwrite(&self, pos: u64, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
         let mut io = self.io.lock();
-        let write = {
+        let mut write = {
             let ptr = buffer.as_ptr();
             let len = buffer.len();
             with_fd!(self, |fd| {
@@ -708,6 +756,15 @@ impl File for UringFile {
                 }
             })
         };
+        if c.needs_link() {
+            // Start a new link chain
+            write = write.flags(io_uring::squeue::Flags::IO_LINK);
+            io.ring.pending_link.store(true, Ordering::Release);
+        } else if io.ring.pending_link.load(Ordering::Acquire) {
+            // Continue an existing link chain
+            write = write.flags(io_uring::squeue::Flags::IO_LINK);
+        }
+
         io.ring.submit_entry(&write);
         Ok(c)
     }
@@ -720,6 +777,8 @@ impl File for UringFile {
                 .build()
                 .user_data(get_key(c.clone()))
         });
+        // sync always ends the chain of linked operations
+        io.ring.pending_link.store(false, Ordering::Release);
         io.ring.submit_entry(&sync);
         Ok(c)
     }
@@ -734,10 +793,14 @@ impl File for UringFile {
         if bufs.len().eq(&1) {
             return self.pwrite(pos, bufs[0].clone(), c.clone());
         }
+        let linked = c.needs_link();
         tracing::trace!("pwritev(pos = {}, bufs.len() = {})", pos, bufs.len());
         // create state to track ongoing writev operation
-        let state = WritevState::new(self, pos, bufs);
-        self.io.lock().ring.submit_writev(get_key(c.clone()), state);
+        let state = WritevState::new(self, pos, linked, bufs);
+        let mut io = self.io.lock();
+        let continue_chain = !linked && io.ring.pending_link.load(Ordering::Acquire);
+        io.ring
+            .submit_writev(get_key(c.clone()), state, continue_chain);
         Ok(c)
     }
@@ -746,12 +809,16 @@ impl File for UringFile {
     }
 
     fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
-        let truncate = with_fd!(self, |fd| {
+        let mut truncate = with_fd!(self, |fd| {
             io_uring::opcode::Ftruncate::new(fd, len)
                 .build()
                 .user_data(get_key(c.clone()))
         });
-        self.io.lock().ring.submit_entry(&truncate);
+        let mut io = self.io.lock();
+        if io.ring.pending_link.load(Ordering::Acquire) {
+            truncate = truncate.flags(io_uring::squeue::Flags::IO_LINK);
+        }
+        io.ring.submit_entry(&truncate);
         Ok(c)
     }
 }
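
A note on the recovery path in `handle_writev_completion`: a short writev breaks the IOSQE_IO_LINK chain, so after the resubmitted tail finally completes, the patch falls back to a full pipeline barrier, an fsync flagged IO_DRAIN, which cannot start until every previously submitted SQE has completed. A condensed sketch of that fallback shape (the fd and submission-queue plumbing are stand-ins for WrappedIOUring's internals, not the patch's exact code):

    // Resubmit the remainder un-linked, then queue a drain-flagged fsync so
    // ordering is restored without IO_LINK.
    fn queue_barrier(sq: &mut io_uring::squeue::SubmissionQueue<'_>, fd: io_uring::types::Fd) {
        const BARRIER_USER_DATA: u64 = 1; // same constant as the patch
        let barrier = io_uring::opcode::Fsync::new(fd)
            .build()
            .user_data(BARRIER_USER_DATA)
            // IO_DRAIN: this SQE waits for all previously submitted SQEs to complete
            .flags(io_uring::squeue::Flags::IO_DRAIN);
        unsafe { sq.push(&barrier).expect("squeue full") };
    }

The fixed user_data of 1 works because the reap loop checks for `BARRIER_USER_DATA` before looking up a completion, so the barrier needs no user callback; `get_key` must simply never hand out 1 for a real completion.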