diff --git a/core/incremental/view.rs b/core/incremental/view.rs index cb90b57b9..f82aeadcf 100644 --- a/core/incremental/view.rs +++ b/core/incremental/view.rs @@ -1282,7 +1282,7 @@ impl IncrementalView { pending_row: None, // No pending row when interrupted between rows }; // TODO: Get the actual I/O completion from the statement - let completion = crate::io::Completion::new_dummy(); + let completion = crate::io::Completion::new_yield(); return Ok(IOResult::IO(crate::types::IOCompletions::Single( completion, ))); diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 03f8dc3a0..d9f79b874 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -709,14 +709,16 @@ impl Clock for UringIO { /// use the callback pointer as the user_data for the operation as is /// common practice for io_uring to prevent more indirection fn get_key(c: Completion) -> u64 { - Arc::into_raw(c.inner.clone()) as u64 + Arc::into_raw(c.get_inner().clone()) as u64 } #[inline(always)] /// convert the user_data back to an Completion pointer fn completion_from_key(key: u64) -> Completion { let c_inner = unsafe { Arc::from_raw(key as *const CompletionInner) }; - Completion { inner: c_inner } + Completion { + inner: Some(c_inner), + } } pub struct UringFile { diff --git a/core/io/mod.rs b/core/io/mod.rs index 0698df9c6..c1940e191 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -102,8 +102,10 @@ pub trait IO: Clock + Send + Sync { while !c.finished() { self.step()? } - if let Some(Some(err)) = c.inner.result.get().copied() { - return Err(err.into()); + if let Some(inner) = &c.inner { + if let Some(Some(err)) = inner.result.get().copied() { + return Err(err.into()); + } } Ok(()) } @@ -133,7 +135,8 @@ pub type TruncateComplete = dyn Fn(Result); #[must_use] #[derive(Debug, Clone)] pub struct Completion { - inner: Arc, + /// Optional completion state. If None, it means we are Yield in order to not allocate anything + inner: Option>, } struct CompletionInner { @@ -194,7 +197,7 @@ impl CompletionGroup { c.link_internal(&group); continue; } - let group_inner = match &group.inner.completion_type { + let group_inner = match &group.get_inner().completion_type { CompletionType::Group(g) => &g.inner, _ => unreachable!(), }; @@ -209,7 +212,7 @@ impl CompletionGroup { group_inner.outstanding.fetch_sub(1, Ordering::SeqCst); } - let group_inner = match &group.inner.completion_type { + let group_inner = match &group.get_inner().completion_type { CompletionType::Group(g) => &g.inner, _ => unreachable!(), }; @@ -276,6 +279,7 @@ impl Debug for CompletionType { Self::Sync(..) => f.debug_tuple("Sync").finish(), Self::Truncate(..) => f.debug_tuple("Truncate").finish(), Self::Group(..) => f.debug_tuple("Group").finish(), + Self::Yield => f.debug_tuple("Yield").finish(), } } } @@ -286,33 +290,38 @@ pub enum CompletionType { Sync(SyncCompletion), Truncate(TruncateCompletion), Group(GroupCompletion), + Yield, } impl Completion { pub fn new(completion_type: CompletionType) -> Self { Self { - inner: Arc::new(CompletionInner { + inner: Some(Arc::new(CompletionInner { completion_type, result: OnceLock::new(), needs_link: false, parent: OnceLock::new(), - }), + })), } } pub fn new_linked(completion_type: CompletionType) -> Self { Self { - inner: Arc::new(CompletionInner { + inner: Some(Arc::new(CompletionInner { completion_type, result: OnceLock::new(), needs_link: true, parent: OnceLock::new(), - }), + })), } } + pub(self) fn get_inner(&self) -> &Arc { + self.inner.as_ref().unwrap() + } + pub fn needs_link(&self) -> bool { - self.inner.needs_link + self.get_inner().needs_link } pub fn new_write_linked(complete: F) -> Self @@ -360,43 +369,56 @@ impl Completion { )))) } - /// Create a dummy completed completion - pub fn new_dummy() -> Self { - let c = Self::new_write(|_| {}); - c.complete(0); - c + /// Create a yield completion. These are completed by default allowing to yield control without + /// allocating memory. + pub fn new_yield() -> Self { + Self { inner: None } } pub fn succeeded(&self) -> bool { - match &self.inner.completion_type { - CompletionType::Group(g) => { - g.inner.outstanding.load(Ordering::SeqCst) == 0 - && g.inner.result.get().is_none_or(|e| e.is_none()) - } - _ => self.inner.result.get().is_some(), + match &self.inner { + Some(inner) => match &inner.completion_type { + CompletionType::Group(g) => { + g.inner.outstanding.load(Ordering::SeqCst) == 0 + && g.inner.result.get().is_none_or(|e| e.is_none()) + } + _ => inner.result.get().is_some(), + }, + None => true, } } pub fn failed(&self) -> bool { - self.inner.result.get().is_some_and(|val| val.is_some()) + match &self.inner { + Some(inner) => inner.result.get().is_some_and(|val| val.is_some()), + None => false, + } } pub fn get_error(&self) -> Option { - match &self.inner.completion_type { - CompletionType::Group(g) => { - // For groups, check the group's cached result field - // (set when the last completion finishes) - g.inner.result.get().and_then(|res| *res) + match &self.inner { + Some(inner) => { + match &inner.completion_type { + CompletionType::Group(g) => { + // For groups, check the group's cached result field + // (set when the last completion finishes) + g.inner.result.get().and_then(|res| *res) + } + _ => inner.result.get().and_then(|res| *res), + } } - _ => self.inner.result.get().and_then(|res| *res), + None => None, } } /// Checks if the Completion completed or errored pub fn finished(&self) -> bool { - match &self.inner.completion_type { - CompletionType::Group(g) => g.inner.outstanding.load(Ordering::SeqCst) == 0, - _ => self.inner.result.get().is_some(), + match &self.inner { + Some(inner) => match &inner.completion_type { + CompletionType::Group(g) => g.inner.outstanding.load(Ordering::SeqCst) == 0, + _ => inner.result.get().is_some(), + }, + None => true, } } @@ -415,16 +437,18 @@ impl Completion { } fn callback(&self, result: Result) { - self.inner.result.get_or_init(|| { - match &self.inner.completion_type { + let inner = self.get_inner(); + inner.result.get_or_init(|| { + match &inner.completion_type { CompletionType::Read(r) => r.callback(result), CompletionType::Write(w) => w.callback(result), CompletionType::Sync(s) => s.callback(result), // fix CompletionType::Truncate(t) => t.callback(result), CompletionType::Group(g) => g.callback(result), + CompletionType::Yield => {} }; - if let Some(group) = self.inner.parent.get() { + if let Some(group) = inner.parent.get() { // Capture first error in group if let Err(err) = result { let _ = group.result.set(Some(err)); @@ -446,7 +470,8 @@ impl Completion { /// only call this method if you are sure that the completion is /// a ReadCompletion, panics otherwise pub fn as_read(&self) -> &ReadCompletion { - match self.inner.completion_type { + let inner = self.get_inner(); + match inner.completion_type { CompletionType::Read(ref r) => r, _ => unreachable!(), } @@ -454,13 +479,13 @@ impl Completion { /// Link this completion to a group completion (internal use only) fn link_internal(&mut self, group: &Completion) { - let group_inner = match &group.inner.completion_type { + let group_inner = match &group.get_inner().completion_type { CompletionType::Group(g) => &g.inner, _ => panic!("link_internal() requires a group completion"), }; // Set the parent (can only be set once) - if self.inner.parent.set(group_inner.clone()).is_err() { + if self.get_inner().parent.set(group_inner.clone()).is_err() { panic!("completion can only be linked once"); } } diff --git a/core/io/vfs.rs b/core/io/vfs.rs index 9c7b116c0..52722a82e 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -86,14 +86,14 @@ impl VfsMod { /// that the into_raw/from_raw contract will hold unsafe extern "C" fn callback_fn(result: i32, ctx: SendPtr) { let completion = Completion { - inner: (Arc::from_raw(ctx.inner().as_ptr() as *mut CompletionInner)), + inner: (Some(Arc::from_raw(ctx.inner().as_ptr() as *mut CompletionInner))), }; completion.complete(result); } fn to_callback(c: Completion) -> IOCallback { IOCallback::new(callback_fn, unsafe { - NonNull::new_unchecked(Arc::into_raw(c.inner) as *mut c_void) + NonNull::new_unchecked(Arc::into_raw(c.get_inner().clone()) as *mut c_void) }) } diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 675695b78..0726504bb 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -625,7 +625,7 @@ impl StateTransition for CommitStateMachine { let locked = self.commit_coordinator.pager_commit_lock.write(); if !locked { return Ok(TransitionResult::Io(IOCompletions::Single( - Completion::new_dummy(), + Completion::new_yield(), ))); } } diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 9ab4c689e..71a63aba6 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -5717,7 +5717,7 @@ impl BTreeCursor { self.valid_state = CursorValidState::RequireAdvance(ctx.seek_op.iteration_direction()); self.context = Some(ctx); - io_yield_one!(Completion::new_dummy()); + io_yield_one!(Completion::new_yield()); } self.valid_state = CursorValidState::Valid; Ok(IOResult::Done(())) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index e3281fc7c..0d748467e 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1142,7 +1142,7 @@ impl Pager { } } else { // Give a chance for the allocation to happen elsewhere - io_yield_one!(Completion::new_dummy()); + io_yield_one!(Completion::new_yield()); } } Ok(IOResult::Done(())) diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 5fb353804..6dfe12bd1 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1746,7 +1746,7 @@ impl StreamingWalReader { .min((self.file_size - offset) as usize); if read_size == 0 { // end-of-file; let caller finalize - return Ok((0, Completion::new_dummy())); + return Ok((0, Completion::new_yield())); } let buf = Arc::new(Buffer::new_temporary(read_size));