Merge 'core/io: remove new_dummy in place of new_yield' from Pere Diaz Bou

Yield is a completion that does not allocate any inner state. By design
it is completed from the start and has no errors. This allows lightly
yield without allocating any locks nor heap allocate inner state.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #3616
This commit is contained in:
Pekka Enberg
2025-10-07 13:39:19 +03:00
committed by GitHub
8 changed files with 73 additions and 46 deletions

View File

@@ -1282,7 +1282,7 @@ impl IncrementalView {
pending_row: None, // No pending row when interrupted between rows
};
// TODO: Get the actual I/O completion from the statement
let completion = crate::io::Completion::new_dummy();
let completion = crate::io::Completion::new_yield();
return Ok(IOResult::IO(crate::types::IOCompletions::Single(
completion,
)));

View File

@@ -709,14 +709,16 @@ impl Clock for UringIO {
/// use the callback pointer as the user_data for the operation as is
/// common practice for io_uring to prevent more indirection
fn get_key(c: Completion) -> u64 {
Arc::into_raw(c.inner.clone()) as u64
Arc::into_raw(c.get_inner().clone()) as u64
}
#[inline(always)]
/// convert the user_data back to an Completion pointer
fn completion_from_key(key: u64) -> Completion {
let c_inner = unsafe { Arc::from_raw(key as *const CompletionInner) };
Completion { inner: c_inner }
Completion {
inner: Some(c_inner),
}
}
pub struct UringFile {

View File

@@ -102,8 +102,10 @@ pub trait IO: Clock + Send + Sync {
while !c.finished() {
self.step()?
}
if let Some(Some(err)) = c.inner.result.get().copied() {
return Err(err.into());
if let Some(inner) = &c.inner {
if let Some(Some(err)) = inner.result.get().copied() {
return Err(err.into());
}
}
Ok(())
}
@@ -133,7 +135,8 @@ pub type TruncateComplete = dyn Fn(Result<i32, CompletionError>);
#[must_use]
#[derive(Debug, Clone)]
pub struct Completion {
inner: Arc<CompletionInner>,
/// Optional completion state. If None, it means we are Yield in order to not allocate anything
inner: Option<Arc<CompletionInner>>,
}
struct CompletionInner {
@@ -194,7 +197,7 @@ impl CompletionGroup {
c.link_internal(&group);
continue;
}
let group_inner = match &group.inner.completion_type {
let group_inner = match &group.get_inner().completion_type {
CompletionType::Group(g) => &g.inner,
_ => unreachable!(),
};
@@ -209,7 +212,7 @@ impl CompletionGroup {
group_inner.outstanding.fetch_sub(1, Ordering::SeqCst);
}
let group_inner = match &group.inner.completion_type {
let group_inner = match &group.get_inner().completion_type {
CompletionType::Group(g) => &g.inner,
_ => unreachable!(),
};
@@ -276,6 +279,7 @@ impl Debug for CompletionType {
Self::Sync(..) => f.debug_tuple("Sync").finish(),
Self::Truncate(..) => f.debug_tuple("Truncate").finish(),
Self::Group(..) => f.debug_tuple("Group").finish(),
Self::Yield => f.debug_tuple("Yield").finish(),
}
}
}
@@ -286,33 +290,38 @@ pub enum CompletionType {
Sync(SyncCompletion),
Truncate(TruncateCompletion),
Group(GroupCompletion),
Yield,
}
impl Completion {
pub fn new(completion_type: CompletionType) -> Self {
Self {
inner: Arc::new(CompletionInner {
inner: Some(Arc::new(CompletionInner {
completion_type,
result: OnceLock::new(),
needs_link: false,
parent: OnceLock::new(),
}),
})),
}
}
pub fn new_linked(completion_type: CompletionType) -> Self {
Self {
inner: Arc::new(CompletionInner {
inner: Some(Arc::new(CompletionInner {
completion_type,
result: OnceLock::new(),
needs_link: true,
parent: OnceLock::new(),
}),
})),
}
}
pub(self) fn get_inner(&self) -> &Arc<CompletionInner> {
self.inner.as_ref().unwrap()
}
pub fn needs_link(&self) -> bool {
self.inner.needs_link
self.get_inner().needs_link
}
pub fn new_write_linked<F>(complete: F) -> Self
@@ -360,43 +369,56 @@ impl Completion {
))))
}
/// Create a dummy completed completion
pub fn new_dummy() -> Self {
let c = Self::new_write(|_| {});
c.complete(0);
c
/// Create a yield completion. These are completed by default allowing to yield control without
/// allocating memory.
pub fn new_yield() -> Self {
Self { inner: None }
}
pub fn succeeded(&self) -> bool {
match &self.inner.completion_type {
CompletionType::Group(g) => {
g.inner.outstanding.load(Ordering::SeqCst) == 0
&& g.inner.result.get().is_none_or(|e| e.is_none())
}
_ => self.inner.result.get().is_some(),
match &self.inner {
Some(inner) => match &inner.completion_type {
CompletionType::Group(g) => {
g.inner.outstanding.load(Ordering::SeqCst) == 0
&& g.inner.result.get().is_none_or(|e| e.is_none())
}
_ => inner.result.get().is_some(),
},
None => true,
}
}
pub fn failed(&self) -> bool {
self.inner.result.get().is_some_and(|val| val.is_some())
match &self.inner {
Some(inner) => inner.result.get().is_some_and(|val| val.is_some()),
None => false,
}
}
pub fn get_error(&self) -> Option<CompletionError> {
match &self.inner.completion_type {
CompletionType::Group(g) => {
// For groups, check the group's cached result field
// (set when the last completion finishes)
g.inner.result.get().and_then(|res| *res)
match &self.inner {
Some(inner) => {
match &inner.completion_type {
CompletionType::Group(g) => {
// For groups, check the group's cached result field
// (set when the last completion finishes)
g.inner.result.get().and_then(|res| *res)
}
_ => inner.result.get().and_then(|res| *res),
}
}
_ => self.inner.result.get().and_then(|res| *res),
None => None,
}
}
/// Checks if the Completion completed or errored
pub fn finished(&self) -> bool {
match &self.inner.completion_type {
CompletionType::Group(g) => g.inner.outstanding.load(Ordering::SeqCst) == 0,
_ => self.inner.result.get().is_some(),
match &self.inner {
Some(inner) => match &inner.completion_type {
CompletionType::Group(g) => g.inner.outstanding.load(Ordering::SeqCst) == 0,
_ => inner.result.get().is_some(),
},
None => true,
}
}
@@ -415,16 +437,18 @@ impl Completion {
}
fn callback(&self, result: Result<i32, CompletionError>) {
self.inner.result.get_or_init(|| {
match &self.inner.completion_type {
let inner = self.get_inner();
inner.result.get_or_init(|| {
match &inner.completion_type {
CompletionType::Read(r) => r.callback(result),
CompletionType::Write(w) => w.callback(result),
CompletionType::Sync(s) => s.callback(result), // fix
CompletionType::Truncate(t) => t.callback(result),
CompletionType::Group(g) => g.callback(result),
CompletionType::Yield => {}
};
if let Some(group) = self.inner.parent.get() {
if let Some(group) = inner.parent.get() {
// Capture first error in group
if let Err(err) = result {
let _ = group.result.set(Some(err));
@@ -446,7 +470,8 @@ impl Completion {
/// only call this method if you are sure that the completion is
/// a ReadCompletion, panics otherwise
pub fn as_read(&self) -> &ReadCompletion {
match self.inner.completion_type {
let inner = self.get_inner();
match inner.completion_type {
CompletionType::Read(ref r) => r,
_ => unreachable!(),
}
@@ -454,13 +479,13 @@ impl Completion {
/// Link this completion to a group completion (internal use only)
fn link_internal(&mut self, group: &Completion) {
let group_inner = match &group.inner.completion_type {
let group_inner = match &group.get_inner().completion_type {
CompletionType::Group(g) => &g.inner,
_ => panic!("link_internal() requires a group completion"),
};
// Set the parent (can only be set once)
if self.inner.parent.set(group_inner.clone()).is_err() {
if self.get_inner().parent.set(group_inner.clone()).is_err() {
panic!("completion can only be linked once");
}
}

View File

@@ -86,14 +86,14 @@ impl VfsMod {
/// that the into_raw/from_raw contract will hold
unsafe extern "C" fn callback_fn(result: i32, ctx: SendPtr) {
let completion = Completion {
inner: (Arc::from_raw(ctx.inner().as_ptr() as *mut CompletionInner)),
inner: (Some(Arc::from_raw(ctx.inner().as_ptr() as *mut CompletionInner))),
};
completion.complete(result);
}
fn to_callback(c: Completion) -> IOCallback {
IOCallback::new(callback_fn, unsafe {
NonNull::new_unchecked(Arc::into_raw(c.inner) as *mut c_void)
NonNull::new_unchecked(Arc::into_raw(c.get_inner().clone()) as *mut c_void)
})
}

View File

@@ -625,7 +625,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
let locked = self.commit_coordinator.pager_commit_lock.write();
if !locked {
return Ok(TransitionResult::Io(IOCompletions::Single(
Completion::new_dummy(),
Completion::new_yield(),
)));
}
}

View File

@@ -5717,7 +5717,7 @@ impl BTreeCursor {
self.valid_state =
CursorValidState::RequireAdvance(ctx.seek_op.iteration_direction());
self.context = Some(ctx);
io_yield_one!(Completion::new_dummy());
io_yield_one!(Completion::new_yield());
}
self.valid_state = CursorValidState::Valid;
Ok(IOResult::Done(()))

View File

@@ -1142,7 +1142,7 @@ impl Pager {
}
} else {
// Give a chance for the allocation to happen elsewhere
io_yield_one!(Completion::new_dummy());
io_yield_one!(Completion::new_yield());
}
}
Ok(IOResult::Done(()))

View File

@@ -1746,7 +1746,7 @@ impl StreamingWalReader {
.min((self.file_size - offset) as usize);
if read_size == 0 {
// end-of-file; let caller finalize
return Ok((0, Completion::new_dummy()));
return Ok((0, Completion::new_yield()));
}
let buf = Arc::new(Buffer::new_temporary(read_size));