From 3e508a4b42b8901d00674c9459e0028f86e555c7 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 7 Oct 2025 12:00:21 +0200 Subject: [PATCH 1/2] core/io: remove new_dummy in place of new_yield Yield is a completion that does not allocate any inner state. By design it is completed from the start and has no errors. This allows lightly yield without allocating any locks nor heap allocate inner state. --- core/incremental/view.rs | 2 +- core/io/mod.rs | 99 +++++++++++++++++++++------------- core/io/vfs.rs | 4 +- core/mvcc/database/mod.rs | 2 +- core/storage/btree.rs | 2 +- core/storage/pager.rs | 2 +- core/storage/sqlite3_ondisk.rs | 2 +- 7 files changed, 69 insertions(+), 44 deletions(-) diff --git a/core/incremental/view.rs b/core/incremental/view.rs index cb90b57b9..f82aeadcf 100644 --- a/core/incremental/view.rs +++ b/core/incremental/view.rs @@ -1282,7 +1282,7 @@ impl IncrementalView { pending_row: None, // No pending row when interrupted between rows }; // TODO: Get the actual I/O completion from the statement - let completion = crate::io::Completion::new_dummy(); + let completion = crate::io::Completion::new_yield(); return Ok(IOResult::IO(crate::types::IOCompletions::Single( completion, ))); diff --git a/core/io/mod.rs b/core/io/mod.rs index 0698df9c6..c1940e191 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -102,8 +102,10 @@ pub trait IO: Clock + Send + Sync { while !c.finished() { self.step()? } - if let Some(Some(err)) = c.inner.result.get().copied() { - return Err(err.into()); + if let Some(inner) = &c.inner { + if let Some(Some(err)) = inner.result.get().copied() { + return Err(err.into()); + } } Ok(()) } @@ -133,7 +135,8 @@ pub type TruncateComplete = dyn Fn(Result); #[must_use] #[derive(Debug, Clone)] pub struct Completion { - inner: Arc, + /// Optional completion state. If None, it means we are Yield in order to not allocate anything + inner: Option>, } struct CompletionInner { @@ -194,7 +197,7 @@ impl CompletionGroup { c.link_internal(&group); continue; } - let group_inner = match &group.inner.completion_type { + let group_inner = match &group.get_inner().completion_type { CompletionType::Group(g) => &g.inner, _ => unreachable!(), }; @@ -209,7 +212,7 @@ impl CompletionGroup { group_inner.outstanding.fetch_sub(1, Ordering::SeqCst); } - let group_inner = match &group.inner.completion_type { + let group_inner = match &group.get_inner().completion_type { CompletionType::Group(g) => &g.inner, _ => unreachable!(), }; @@ -276,6 +279,7 @@ impl Debug for CompletionType { Self::Sync(..) => f.debug_tuple("Sync").finish(), Self::Truncate(..) => f.debug_tuple("Truncate").finish(), Self::Group(..) => f.debug_tuple("Group").finish(), + Self::Yield => f.debug_tuple("Yield").finish(), } } } @@ -286,33 +290,38 @@ pub enum CompletionType { Sync(SyncCompletion), Truncate(TruncateCompletion), Group(GroupCompletion), + Yield, } impl Completion { pub fn new(completion_type: CompletionType) -> Self { Self { - inner: Arc::new(CompletionInner { + inner: Some(Arc::new(CompletionInner { completion_type, result: OnceLock::new(), needs_link: false, parent: OnceLock::new(), - }), + })), } } pub fn new_linked(completion_type: CompletionType) -> Self { Self { - inner: Arc::new(CompletionInner { + inner: Some(Arc::new(CompletionInner { completion_type, result: OnceLock::new(), needs_link: true, parent: OnceLock::new(), - }), + })), } } + pub(self) fn get_inner(&self) -> &Arc { + self.inner.as_ref().unwrap() + } + pub fn needs_link(&self) -> bool { - self.inner.needs_link + self.get_inner().needs_link } pub fn new_write_linked(complete: F) -> Self @@ -360,43 +369,56 @@ impl Completion { )))) } - /// Create a dummy completed completion - pub fn new_dummy() -> Self { - let c = Self::new_write(|_| {}); - c.complete(0); - c + /// Create a yield completion. These are completed by default allowing to yield control without + /// allocating memory. + pub fn new_yield() -> Self { + Self { inner: None } } pub fn succeeded(&self) -> bool { - match &self.inner.completion_type { - CompletionType::Group(g) => { - g.inner.outstanding.load(Ordering::SeqCst) == 0 - && g.inner.result.get().is_none_or(|e| e.is_none()) - } - _ => self.inner.result.get().is_some(), + match &self.inner { + Some(inner) => match &inner.completion_type { + CompletionType::Group(g) => { + g.inner.outstanding.load(Ordering::SeqCst) == 0 + && g.inner.result.get().is_none_or(|e| e.is_none()) + } + _ => inner.result.get().is_some(), + }, + None => true, } } pub fn failed(&self) -> bool { - self.inner.result.get().is_some_and(|val| val.is_some()) + match &self.inner { + Some(inner) => inner.result.get().is_some_and(|val| val.is_some()), + None => false, + } } pub fn get_error(&self) -> Option { - match &self.inner.completion_type { - CompletionType::Group(g) => { - // For groups, check the group's cached result field - // (set when the last completion finishes) - g.inner.result.get().and_then(|res| *res) + match &self.inner { + Some(inner) => { + match &inner.completion_type { + CompletionType::Group(g) => { + // For groups, check the group's cached result field + // (set when the last completion finishes) + g.inner.result.get().and_then(|res| *res) + } + _ => inner.result.get().and_then(|res| *res), + } } - _ => self.inner.result.get().and_then(|res| *res), + None => None, } } /// Checks if the Completion completed or errored pub fn finished(&self) -> bool { - match &self.inner.completion_type { - CompletionType::Group(g) => g.inner.outstanding.load(Ordering::SeqCst) == 0, - _ => self.inner.result.get().is_some(), + match &self.inner { + Some(inner) => match &inner.completion_type { + CompletionType::Group(g) => g.inner.outstanding.load(Ordering::SeqCst) == 0, + _ => inner.result.get().is_some(), + }, + None => true, } } @@ -415,16 +437,18 @@ impl Completion { } fn callback(&self, result: Result) { - self.inner.result.get_or_init(|| { - match &self.inner.completion_type { + let inner = self.get_inner(); + inner.result.get_or_init(|| { + match &inner.completion_type { CompletionType::Read(r) => r.callback(result), CompletionType::Write(w) => w.callback(result), CompletionType::Sync(s) => s.callback(result), // fix CompletionType::Truncate(t) => t.callback(result), CompletionType::Group(g) => g.callback(result), + CompletionType::Yield => {} }; - if let Some(group) = self.inner.parent.get() { + if let Some(group) = inner.parent.get() { // Capture first error in group if let Err(err) = result { let _ = group.result.set(Some(err)); @@ -446,7 +470,8 @@ impl Completion { /// only call this method if you are sure that the completion is /// a ReadCompletion, panics otherwise pub fn as_read(&self) -> &ReadCompletion { - match self.inner.completion_type { + let inner = self.get_inner(); + match inner.completion_type { CompletionType::Read(ref r) => r, _ => unreachable!(), } @@ -454,13 +479,13 @@ impl Completion { /// Link this completion to a group completion (internal use only) fn link_internal(&mut self, group: &Completion) { - let group_inner = match &group.inner.completion_type { + let group_inner = match &group.get_inner().completion_type { CompletionType::Group(g) => &g.inner, _ => panic!("link_internal() requires a group completion"), }; // Set the parent (can only be set once) - if self.inner.parent.set(group_inner.clone()).is_err() { + if self.get_inner().parent.set(group_inner.clone()).is_err() { panic!("completion can only be linked once"); } } diff --git a/core/io/vfs.rs b/core/io/vfs.rs index 9c7b116c0..52722a82e 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -86,14 +86,14 @@ impl VfsMod { /// that the into_raw/from_raw contract will hold unsafe extern "C" fn callback_fn(result: i32, ctx: SendPtr) { let completion = Completion { - inner: (Arc::from_raw(ctx.inner().as_ptr() as *mut CompletionInner)), + inner: (Some(Arc::from_raw(ctx.inner().as_ptr() as *mut CompletionInner))), }; completion.complete(result); } fn to_callback(c: Completion) -> IOCallback { IOCallback::new(callback_fn, unsafe { - NonNull::new_unchecked(Arc::into_raw(c.inner) as *mut c_void) + NonNull::new_unchecked(Arc::into_raw(c.get_inner().clone()) as *mut c_void) }) } diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 675695b78..0726504bb 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -625,7 +625,7 @@ impl StateTransition for CommitStateMachine { let locked = self.commit_coordinator.pager_commit_lock.write(); if !locked { return Ok(TransitionResult::Io(IOCompletions::Single( - Completion::new_dummy(), + Completion::new_yield(), ))); } } diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 9ab4c689e..71a63aba6 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -5717,7 +5717,7 @@ impl BTreeCursor { self.valid_state = CursorValidState::RequireAdvance(ctx.seek_op.iteration_direction()); self.context = Some(ctx); - io_yield_one!(Completion::new_dummy()); + io_yield_one!(Completion::new_yield()); } self.valid_state = CursorValidState::Valid; Ok(IOResult::Done(())) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index e3281fc7c..0d748467e 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1142,7 +1142,7 @@ impl Pager { } } else { // Give a chance for the allocation to happen elsewhere - io_yield_one!(Completion::new_dummy()); + io_yield_one!(Completion::new_yield()); } } Ok(IOResult::Done(())) diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 5fb353804..6dfe12bd1 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1746,7 +1746,7 @@ impl StreamingWalReader { .min((self.file_size - offset) as usize); if read_size == 0 { // end-of-file; let caller finalize - return Ok((0, Completion::new_dummy())); + return Ok((0, Completion::new_yield())); } let buf = Arc::new(Buffer::new_temporary(read_size)); From a7d2462c0519c300255e9f8729f0e6159027de45 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 7 Oct 2025 12:05:54 +0200 Subject: [PATCH 2/2] core/io/uring: fix inner usages Yield is a completion that does not allocate any inner state. By design it is completed from the start and has no errors. This allows lightly yield without allocating any locks nor heap allocate inner state. --- core/io/io_uring.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 03f8dc3a0..d9f79b874 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -709,14 +709,16 @@ impl Clock for UringIO { /// use the callback pointer as the user_data for the operation as is /// common practice for io_uring to prevent more indirection fn get_key(c: Completion) -> u64 { - Arc::into_raw(c.inner.clone()) as u64 + Arc::into_raw(c.get_inner().clone()) as u64 } #[inline(always)] /// convert the user_data back to an Completion pointer fn completion_from_key(key: u64) -> Completion { let c_inner = unsafe { Arc::from_raw(key as *const CompletionInner) }; - Completion { inner: c_inner } + Completion { + inner: Some(c_inner), + } } pub struct UringFile {