From 159e2fbd718c696bf158d8450c3aee3ddb642c29 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Fri, 7 Feb 2025 11:55:38 -0500 Subject: [PATCH 1/4] Remove rc refcell in unix io module --- core/io/unix.rs | 87 ++++++++++++++++++++++++++++--------------------- 1 file changed, 49 insertions(+), 38 deletions(-) diff --git a/core/io/unix.rs b/core/io/unix.rs index d8301a611..f28a867f3 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -10,15 +10,15 @@ use rustix::{ fs::{self, FlockOperation, OFlags, OpenOptionsExt}, io::Errno, }; -use std::cell::RefCell; -use std::collections::HashMap; +use std::cell::{RefCell, UnsafeCell}; use std::io::{ErrorKind, Read, Seek, Write}; use std::rc::Rc; +const MAX_FD: usize = 1024; pub struct UnixIO { - poller: Rc>, - events: Rc>, - callbacks: Rc>>, + poller: UnsafeCell, + events: UnsafeCell, + callbacks: UnsafeCell<[Option; MAX_FD]>, } impl UnixIO { @@ -26,9 +26,9 @@ impl UnixIO { pub fn new() -> Result { debug!("Using IO backend 'syscall'"); Ok(Self { - poller: Rc::new(RefCell::new(Poller::new()?)), - events: Rc::new(RefCell::new(Events::new())), - callbacks: Rc::new(RefCell::new(HashMap::new())), + poller: Poller::new()?.into(), + events: Events::new().into(), + callbacks: [const { None }; MAX_FD].into(), }) } } @@ -45,8 +45,8 @@ impl IO for UnixIO { let unix_file = Rc::new(UnixFile { file: Rc::new(RefCell::new(file)), - poller: self.poller.clone(), - callbacks: self.callbacks.clone(), + poller: UnsafeCell::new(unsafe { &mut *self.poller.get() }), + callbacks: UnsafeCell::new(unsafe { &mut *self.callbacks.get() }), }); if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() { unix_file.lock_file(true)?; @@ -55,18 +55,22 @@ impl IO for UnixIO { } fn run_once(&self) -> Result<()> { - if self.callbacks.borrow().is_empty() { - return Ok(()); + { + let callbacks = unsafe { &mut *self.callbacks.get() }; + if callbacks.iter().all(|c| c.is_none()) { + return Ok(()); + } } - let mut events = self.events.borrow_mut(); + let events = unsafe { &mut *self.events.get() }; events.clear(); trace!("run_once() waits for events"); - let poller = self.poller.borrow(); - poller.wait(&mut events, None)?; + let poller = unsafe { &mut *self.poller.get() }; + poller.wait(events, None)?; for event in events.iter() { - if let Some(cf) = self.callbacks.borrow_mut().remove(&event.key) { + let callbacks = unsafe { &mut *self.callbacks.get() }; + if let Some(cf) = callbacks[event.key].as_ref() { let result = { match cf { CompletionCallback::Read(ref file, ref c, pos) => { @@ -77,13 +81,13 @@ impl IO for UnixIO { _ => unreachable!(), }; let mut buf = r.buf_mut(); - file.seek(std::io::SeekFrom::Start(pos as u64))?; + file.seek(std::io::SeekFrom::Start(*pos as u64))?; file.read(buf.as_mut_slice()) } CompletionCallback::Write(ref file, _, ref buf, pos) => { let mut file = file.borrow_mut(); let buf = buf.borrow(); - file.seek(std::io::SeekFrom::Start(pos as u64))?; + file.seek(std::io::SeekFrom::Start(*pos as u64))?; file.write(buf.as_slice()) } } @@ -103,6 +107,7 @@ impl IO for UnixIO { Err(e) => Err(e.into()), }; } + callbacks[event.key] = None; } Ok(()) } @@ -128,13 +133,13 @@ enum CompletionCallback { ), } -pub struct UnixFile { +pub struct UnixFile<'io> { file: Rc>, - poller: Rc>, - callbacks: Rc>>, + poller: UnsafeCell<&'io mut Poller>, + callbacks: UnsafeCell<&'io mut [Option; MAX_FD]>, } -impl File for UnixFile { +impl File for UnixFile<'_> { fn lock_file(&self, exclusive: bool) -> Result<()> { let fd = self.file.borrow(); let fd = fd.as_fd(); @@ -196,14 +201,14 @@ impl File for UnixFile { // Would block, set up polling let fd = file.as_raw_fd(); unsafe { - self.poller - .borrow() - .add(&file.as_fd(), Event::readable(fd as usize))?; + let poller = &mut *self.poller.get(); + poller.add(&file.as_fd(), Event::readable(fd as usize))?; + } + { + let callbacks = unsafe { &mut *self.callbacks.get() }; + callbacks[fd as usize] = + Some(CompletionCallback::Read(self.file.clone(), c, pos)); } - self.callbacks.borrow_mut().insert( - fd as usize, - CompletionCallback::Read(self.file.clone(), c, pos), - ); Ok(()) } Err(e) => Err(e.into()), @@ -227,15 +232,21 @@ impl File for UnixFile { trace!("pwrite blocks"); // Would block, set up polling let fd = file.as_raw_fd(); - unsafe { - self.poller - .borrow() - .add(&file.as_fd(), Event::readable(fd as usize))?; + { + unsafe { + let poller = &mut *self.poller.get(); + poller.add(&file.as_fd(), Event::readable(fd as usize))?; + } + } + { + let callbacks = unsafe { &mut *self.callbacks.get() }; + callbacks[fd as usize] = Some(CompletionCallback::Write( + self.file.clone(), + c, + buffer.clone(), + pos, + )); } - self.callbacks.borrow_mut().insert( - fd as usize, - CompletionCallback::Write(self.file.clone(), c, buffer.clone(), pos), - ); Ok(()) } Err(e) => Err(e.into()), @@ -261,7 +272,7 @@ impl File for UnixFile { } } -impl Drop for UnixFile { +impl Drop for UnixFile<'_> { fn drop(&mut self) { self.unlock_file().expect("Failed to unlock file"); } From 975cf95c375671dbfdb06fcf00678d72c127faab Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 8 Feb 2025 08:03:42 -0500 Subject: [PATCH 2/4] Reformat unix io with wrappers for unsafecell --- core/io/mod.rs | 26 ++++++++ core/io/unix.rs | 159 ++++++++++++++++++++++++++++++++++-------------- 2 files changed, 140 insertions(+), 45 deletions(-) diff --git a/core/io/mod.rs b/core/io/mod.rs index 4f92ad3e1..089add355 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -56,6 +56,32 @@ impl Completion { Self::Sync(s) => s.complete(result), // fix } } + + /// only call this method if you are sure that the completion is + /// a ReadCompletion, panics otherwise + pub fn read(&self) -> &ReadCompletion { + match self { + Self::Read(ref r) => r, + _ => unreachable!(), + } + } + /// only call this method if you are sure that the completion is + /// a WriteCompletion, panics otherwise + pub fn write(&self) -> &WriteCompletion { + match self { + Self::Write(ref w) => w, + _ => unreachable!(), + } + } + /// + /// only call this method if you are sure that the completion is + /// a SyncCompletion, panics otherwise + pub fn sync(&self) -> &SyncCompletion { + match self { + Self::Sync(ref s) => s, + _ => unreachable!(), + } + } } pub struct WriteCompletion { diff --git a/core/io/unix.rs b/core/io/unix.rs index f28a867f3..68e17a9e7 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -13,12 +13,92 @@ use rustix::{ use std::cell::{RefCell, UnsafeCell}; use std::io::{ErrorKind, Read, Seek, Write}; use std::rc::Rc; -const MAX_FD: usize = 1024; +struct OwnedCallbacks(UnsafeCell); +struct BorrowedCallbacks<'io>(UnsafeCell<&'io mut Callbacks>); + +impl OwnedCallbacks { + fn new() -> Self { + Self(UnsafeCell::new(Callbacks::new())) + } + fn as_mut<'io>(&self) -> &'io mut Callbacks { + unsafe { &mut *self.0.get() } + } +} + +impl<'io> BorrowedCallbacks<'io> { + fn as_mut(&self) -> &'io mut Callbacks { + unsafe { *self.0.get() } + } +} + +struct EventsHandler(UnsafeCell); + +impl EventsHandler { + fn new() -> Self { + Self(UnsafeCell::new(Events::new())) + } + + fn as_mut<'io>(&self) -> &'io mut Events { + unsafe { &mut *self.0.get() } + } +} +struct PollHandler(UnsafeCell); +struct BorrowedPollHandler<'io>(UnsafeCell<&'io mut Poller>); + +impl<'io> BorrowedPollHandler<'io> { + fn get(&self) -> &'io mut Poller { + unsafe { *self.0.get() } + } +} + +impl PollHandler { + fn new() -> Self { + Self(UnsafeCell::new(Poller::new().unwrap())) + } + + fn as_mut<'io>(&self) -> &'io mut Poller { + unsafe { &mut *self.0.get() } + } +} + +type CallbackEntry = (usize, CompletionCallback); + +struct Callbacks { + entries: Vec, +} + +impl Callbacks { + fn new() -> Self { + Self { + entries: Vec::with_capacity(32), + } + } + + fn insert(&mut self, fd: usize, callback: CompletionCallback) { + if let Some(entry) = self.entries.iter_mut().find(|(key, _)| *key == fd) { + // replace existing + let _ = std::mem::replace(&mut entry.1, callback); + return; + } + self.entries.push((fd, callback)); + } + + fn remove(&mut self, fd: usize) -> Option { + if let Some(pos) = self.entries.iter().position(|&(k, _)| k == fd) { + Some(self.entries.swap_remove(pos).1) + } else { + None + } + } +} + +/// UnixIO lives longer than any of the files it creates, so it's +/// safe to store references to it's internals in the UnixFiles pub struct UnixIO { - poller: UnsafeCell, - events: UnsafeCell, - callbacks: UnsafeCell<[Option; MAX_FD]>, + poller: PollHandler, + events: EventsHandler, + callbacks: OwnedCallbacks, } impl UnixIO { @@ -26,9 +106,9 @@ impl UnixIO { pub fn new() -> Result { debug!("Using IO backend 'syscall'"); Ok(Self { - poller: Poller::new()?.into(), - events: Events::new().into(), - callbacks: [const { None }; MAX_FD].into(), + poller: PollHandler::new(), + events: EventsHandler::new(), + callbacks: OwnedCallbacks::new(), }) } } @@ -45,8 +125,8 @@ impl IO for UnixIO { let unix_file = Rc::new(UnixFile { file: Rc::new(RefCell::new(file)), - poller: UnsafeCell::new(unsafe { &mut *self.poller.get() }), - callbacks: UnsafeCell::new(unsafe { &mut *self.callbacks.get() }), + poller: BorrowedPollHandler(self.poller.as_mut().into()), + callbacks: BorrowedCallbacks(self.callbacks.as_mut().into()), }); if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() { unix_file.lock_file(true)?; @@ -56,38 +136,33 @@ impl IO for UnixIO { fn run_once(&self) -> Result<()> { { - let callbacks = unsafe { &mut *self.callbacks.get() }; - if callbacks.iter().all(|c| c.is_none()) { + if self.callbacks.as_mut().entries.is_empty() { return Ok(()); } } - let events = unsafe { &mut *self.events.get() }; - events.clear(); - + { + self.events.as_mut().clear(); + } trace!("run_once() waits for events"); - let poller = unsafe { &mut *self.poller.get() }; - poller.wait(events, None)?; - - for event in events.iter() { - let callbacks = unsafe { &mut *self.callbacks.get() }; - if let Some(cf) = callbacks[event.key].as_ref() { + { + self.poller.as_mut().wait(self.events.as_mut(), None)?; + } + for event in self.events.as_mut().iter() { + let callbacks = self.callbacks.as_mut(); + if let Some(cf) = callbacks.remove(event.key) { let result = { match cf { CompletionCallback::Read(ref file, ref c, pos) => { let mut file = file.borrow_mut(); - let c: &Completion = c; - let r = match c { - Completion::Read(r) => r, - _ => unreachable!(), - }; + let r = c.read(); let mut buf = r.buf_mut(); - file.seek(std::io::SeekFrom::Start(*pos as u64))?; + file.seek(std::io::SeekFrom::Start(pos as u64))?; file.read(buf.as_mut_slice()) } CompletionCallback::Write(ref file, _, ref buf, pos) => { let mut file = file.borrow_mut(); let buf = buf.borrow(); - file.seek(std::io::SeekFrom::Start(*pos as u64))?; + file.seek(std::io::SeekFrom::Start(pos as u64))?; file.write(buf.as_slice()) } } @@ -107,7 +182,6 @@ impl IO for UnixIO { Err(e) => Err(e.into()), }; } - callbacks[event.key] = None; } Ok(()) } @@ -135,8 +209,8 @@ enum CompletionCallback { pub struct UnixFile<'io> { file: Rc>, - poller: UnsafeCell<&'io mut Poller>, - callbacks: UnsafeCell<&'io mut [Option; MAX_FD]>, + poller: BorrowedPollHandler<'io>, + callbacks: BorrowedCallbacks<'io>, } impl File for UnixFile<'_> { @@ -182,10 +256,7 @@ impl File for UnixFile<'_> { fn pread(&self, pos: usize, c: Completion) -> Result<()> { let file = self.file.borrow(); let result = { - let r = match c { - Completion::Read(ref r) => r, - _ => unreachable!(), - }; + let r = c.read(); let mut buf = r.buf_mut(); rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64) }; @@ -205,9 +276,10 @@ impl File for UnixFile<'_> { poller.add(&file.as_fd(), Event::readable(fd as usize))?; } { - let callbacks = unsafe { &mut *self.callbacks.get() }; - callbacks[fd as usize] = - Some(CompletionCallback::Read(self.file.clone(), c, pos)); + self.callbacks.as_mut().insert( + fd as usize, + CompletionCallback::Read(self.file.clone(), c, pos), + ); } Ok(()) } @@ -234,18 +306,15 @@ impl File for UnixFile<'_> { let fd = file.as_raw_fd(); { unsafe { - let poller = &mut *self.poller.get(); + let poller = self.poller.get(); poller.add(&file.as_fd(), Event::readable(fd as usize))?; } } { - let callbacks = unsafe { &mut *self.callbacks.get() }; - callbacks[fd as usize] = Some(CompletionCallback::Write( - self.file.clone(), - c, - buffer.clone(), - pos, - )); + self.callbacks.as_mut().insert( + fd as usize, + CompletionCallback::Write(self.file.clone(), c, buffer.clone(), pos), + ); } Ok(()) } From 9098237a12d34a68d52aee5f3761f16f173b2bb0 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 8 Feb 2025 09:07:42 -0500 Subject: [PATCH 3/4] Add as_read method to completion enum --- core/io/io_uring.rs | 5 +- core/io/memory.rs | 5 +- core/io/mod.rs | 19 +---- core/io/unix.rs | 180 +++++++++++++++++++++++++------------------- core/io/windows.rs | 5 +- 5 files changed, 106 insertions(+), 108 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index ebd1fcb61..ffc87c346 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -242,10 +242,7 @@ impl File for UringFile { } fn pread(&self, pos: usize, c: Completion) -> Result<()> { - let r = match c { - Completion::Read(ref r) => r, - _ => unreachable!(), - }; + let r = c.as_read(); trace!("pread(pos = {}, length = {})", pos, r.buf().len()); let fd = io_uring::types::Fd(self.file.as_raw_fd()); let mut io = self.io.borrow_mut(); diff --git a/core/io/memory.rs b/core/io/memory.rs index 164268d5e..24f55b091 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -79,10 +79,7 @@ impl File for MemoryFile { } fn pread(&self, pos: usize, c: Completion) -> Result<()> { - let r = match &c { - Completion::Read(ref r) => r, - _ => unreachable!(), - }; + let r = c.as_read(); let buf_len = r.buf().len(); if buf_len == 0 { c.complete(0); diff --git a/core/io/mod.rs b/core/io/mod.rs index 089add355..6c1eddb04 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -59,29 +59,12 @@ impl Completion { /// only call this method if you are sure that the completion is /// a ReadCompletion, panics otherwise - pub fn read(&self) -> &ReadCompletion { + pub fn as_read(&self) -> &ReadCompletion { match self { Self::Read(ref r) => r, _ => unreachable!(), } } - /// only call this method if you are sure that the completion is - /// a WriteCompletion, panics otherwise - pub fn write(&self) -> &WriteCompletion { - match self { - Self::Write(ref w) => w, - _ => unreachable!(), - } - } - /// - /// only call this method if you are sure that the completion is - /// a SyncCompletion, panics otherwise - pub fn sync(&self) -> &SyncCompletion { - match self { - Self::Sync(ref s) => s, - _ => unreachable!(), - } - } } pub struct WriteCompletion { diff --git a/core/io/unix.rs b/core/io/unix.rs index 68e17a9e7..52d5d5565 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -24,11 +24,21 @@ impl OwnedCallbacks { fn as_mut<'io>(&self) -> &'io mut Callbacks { unsafe { &mut *self.0.get() } } + + fn is_empty(&self) -> bool { + self.as_mut().count == 0 + } + + fn remove(&self, fd: usize) -> Option { + let callbacks = unsafe { &mut *self.0.get() }; + callbacks.remove(fd) + } } -impl<'io> BorrowedCallbacks<'io> { - fn as_mut(&self) -> &'io mut Callbacks { - unsafe { *self.0.get() } +impl BorrowedCallbacks<'_> { + fn insert(&self, fd: usize, callback: CompletionCallback) { + let callbacks = unsafe { &mut *self.0.get() }; + callbacks.insert(fd, callback); } } @@ -39,6 +49,16 @@ impl EventsHandler { Self(UnsafeCell::new(Events::new())) } + fn clear(&self) { + let events = unsafe { &mut *self.0.get() }; + events.clear(); + } + + fn iter(&self) -> impl Iterator { + let events = unsafe { &*self.0.get() }; + events.iter() + } + fn as_mut<'io>(&self) -> &'io mut Events { unsafe { &mut *self.0.get() } } @@ -46,9 +66,11 @@ impl EventsHandler { struct PollHandler(UnsafeCell); struct BorrowedPollHandler<'io>(UnsafeCell<&'io mut Poller>); -impl<'io> BorrowedPollHandler<'io> { - fn get(&self) -> &'io mut Poller { - unsafe { *self.0.get() } +impl BorrowedPollHandler<'_> { + fn add(&self, fd: &rustix::fd::BorrowedFd, event: Event) -> Result<()> { + let poller = unsafe { &mut *self.0.get() }; + unsafe { poller.add(fd, event)? } + Ok(()) } } @@ -56,6 +78,11 @@ impl PollHandler { fn new() -> Self { Self(UnsafeCell::new(Poller::new().unwrap())) } + fn wait(&self, events: &mut Events, timeout: Option) -> Result<()> { + let poller = unsafe { &mut *self.0.get() }; + poller.wait(events, timeout)?; + Ok(()) + } fn as_mut<'io>(&self) -> &'io mut Poller { unsafe { &mut *self.0.get() } @@ -64,36 +91,56 @@ impl PollHandler { type CallbackEntry = (usize, CompletionCallback); +const FD_INLINE_SIZE: usize = 32; + struct Callbacks { - entries: Vec, + inline_entries: [Option<(usize, CompletionCallback)>; FD_INLINE_SIZE], + heap_entries: Vec, + count: usize, } impl Callbacks { fn new() -> Self { Self { - entries: Vec::with_capacity(32), + inline_entries: core::array::from_fn(|_| None), + heap_entries: Vec::new(), + count: 0, } } fn insert(&mut self, fd: usize, callback: CompletionCallback) { - if let Some(entry) = self.entries.iter_mut().find(|(key, _)| *key == fd) { - // replace existing - let _ = std::mem::replace(&mut entry.1, callback); - return; + if self.count < FD_INLINE_SIZE { + self.inline_entries[self.count] = Some((fd, callback)); + } else { + self.heap_entries.push((fd, callback)); } - self.entries.push((fd, callback)); + self.count += 1; } fn remove(&mut self, fd: usize) -> Option { - if let Some(pos) = self.entries.iter().position(|&(k, _)| k == fd) { - Some(self.entries.swap_remove(pos).1) - } else { - None + if let Some(pos) = self + .inline_entries + .iter() + .position(|cb| cb.as_ref().is_some_and(|cb| cb.0 == fd)) + { + let callback = self.inline_entries[pos].take(); + // swap with last valid entry + if pos < self.count - 1 { + self.inline_entries[pos] = self.inline_entries[self.count - 1].take(); + } + self.count -= 1; + return callback.map(|c| c.1); } + + if let Some(pos) = self.heap_entries.iter().position(|&(k, _)| k == fd) { + self.count -= 1; + return Some(self.heap_entries.swap_remove(pos).1); + } + None } } -/// UnixIO lives longer than any of the files it creates, so it's +/// UnixIO lives longer than any of the files it creates, so it is /// safe to store references to it's internals in the UnixFiles pub struct UnixIO { poller: PollHandler, @@ -135,52 +182,37 @@ impl IO for UnixIO { } fn run_once(&self) -> Result<()> { - { - if self.callbacks.as_mut().entries.is_empty() { - return Ok(()); - } - } - { - self.events.as_mut().clear(); + if self.callbacks.is_empty() { + return Ok(()); } + self.events.clear(); trace!("run_once() waits for events"); - { - self.poller.as_mut().wait(self.events.as_mut(), None)?; - } - for event in self.events.as_mut().iter() { - let callbacks = self.callbacks.as_mut(); - if let Some(cf) = callbacks.remove(event.key) { - let result = { - match cf { - CompletionCallback::Read(ref file, ref c, pos) => { - let mut file = file.borrow_mut(); - let r = c.read(); - let mut buf = r.buf_mut(); - file.seek(std::io::SeekFrom::Start(pos as u64))?; - file.read(buf.as_mut_slice()) - } - CompletionCallback::Write(ref file, _, ref buf, pos) => { - let mut file = file.borrow_mut(); - let buf = buf.borrow(); - file.seek(std::io::SeekFrom::Start(pos as u64))?; - file.write(buf.as_slice()) - } + self.poller.wait(self.events.as_mut(), None)?; + + for event in self.events.iter() { + if let Some(cf) = self.callbacks.remove(event.key) { + let result = match cf { + CompletionCallback::Read(ref file, ref c, pos) => { + let mut file = file.borrow_mut(); + let r = c.as_read(); + let mut buf = r.buf_mut(); + file.seek(std::io::SeekFrom::Start(pos as u64))?; + file.read(buf.as_mut_slice()) + } + CompletionCallback::Write(ref file, _, ref buf, pos) => { + let mut file = file.borrow_mut(); + let buf = buf.borrow(); + file.seek(std::io::SeekFrom::Start(pos as u64))?; + file.write(buf.as_slice()) } }; - return match result { - Ok(n) => { - match &cf { - CompletionCallback::Read(_, ref c, _) => { - c.complete(0); - } - CompletionCallback::Write(_, ref c, _, _) => { - c.complete(n as i32); - } - } - Ok(()) - } - Err(e) => Err(e.into()), - }; + match result { + Ok(n) => match &cf { + CompletionCallback::Read(_, ref c, _) => c.complete(0), + CompletionCallback::Write(_, ref c, _, _) => c.complete(n as i32), + }, + Err(e) => return Err(e.into()), + } } } Ok(()) @@ -256,7 +288,7 @@ impl File for UnixFile<'_> { fn pread(&self, pos: usize, c: Completion) -> Result<()> { let file = self.file.borrow(); let result = { - let r = c.read(); + let r = c.as_read(); let mut buf = r.buf_mut(); rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64) }; @@ -271,12 +303,10 @@ impl File for UnixFile<'_> { trace!("pread blocks"); // Would block, set up polling let fd = file.as_raw_fd(); - unsafe { - let poller = &mut *self.poller.get(); - poller.add(&file.as_fd(), Event::readable(fd as usize))?; - } + self.poller + .add(&file.as_fd(), Event::readable(fd as usize))?; { - self.callbacks.as_mut().insert( + self.callbacks.insert( fd as usize, CompletionCallback::Read(self.file.clone(), c, pos), ); @@ -304,18 +334,12 @@ impl File for UnixFile<'_> { trace!("pwrite blocks"); // Would block, set up polling let fd = file.as_raw_fd(); - { - unsafe { - let poller = self.poller.get(); - poller.add(&file.as_fd(), Event::readable(fd as usize))?; - } - } - { - self.callbacks.as_mut().insert( - fd as usize, - CompletionCallback::Write(self.file.clone(), c, buffer.clone(), pos), - ); - } + self.poller + .add(&file.as_fd(), Event::readable(fd as usize))?; + self.callbacks.insert( + fd as usize, + CompletionCallback::Write(self.file.clone(), c, buffer.clone(), pos), + ); Ok(()) } Err(e) => Err(e.into()), diff --git a/core/io/windows.rs b/core/io/windows.rs index d359c4575..8d93a5abe 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -58,10 +58,7 @@ impl File for WindowsFile { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; { - let r = match c { - Completion::Read(ref r) => r, - _ => unreachable!(), - }; + let r = c.as_read(); let mut buf = r.buf_mut(); let buf = buf.as_mut_slice(); file.read_exact(buf)?; From e3ab80ae28d93c6d3893720886f3556f05471540 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 8 Feb 2025 17:13:41 -0500 Subject: [PATCH 4/4] Replace Option with MaybeUninit in unix IO --- core/io/unix.rs | 57 +++++++++++++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/core/io/unix.rs b/core/io/unix.rs index 52d5d5565..d7290b861 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -10,9 +10,12 @@ use rustix::{ fs::{self, FlockOperation, OFlags, OpenOptionsExt}, io::Errno, }; -use std::cell::{RefCell, UnsafeCell}; use std::io::{ErrorKind, Read, Seek, Write}; use std::rc::Rc; +use std::{ + cell::{RefCell, UnsafeCell}, + mem::MaybeUninit, +}; struct OwnedCallbacks(UnsafeCell); struct BorrowedCallbacks<'io>(UnsafeCell<&'io mut Callbacks>); @@ -26,7 +29,7 @@ impl OwnedCallbacks { } fn is_empty(&self) -> bool { - self.as_mut().count == 0 + self.as_mut().inline_count == 0 } fn remove(&self, fd: usize) -> Option { @@ -94,50 +97,62 @@ type CallbackEntry = (usize, CompletionCallback); const FD_INLINE_SIZE: usize = 32; struct Callbacks { - inline_entries: [Option<(usize, CompletionCallback)>; FD_INLINE_SIZE], + inline_entries: [MaybeUninit<(usize, CompletionCallback)>; FD_INLINE_SIZE], heap_entries: Vec, - count: usize, + inline_count: usize, } impl Callbacks { fn new() -> Self { Self { - inline_entries: core::array::from_fn(|_| None), + inline_entries: [const { MaybeUninit::uninit() }; FD_INLINE_SIZE], heap_entries: Vec::new(), - count: 0, + inline_count: 0, } } fn insert(&mut self, fd: usize, callback: CompletionCallback) { - if self.count < FD_INLINE_SIZE { - self.inline_entries[self.count] = Some((fd, callback)); + if self.inline_count < FD_INLINE_SIZE { + self.inline_entries[self.inline_count].write((fd, callback)); + self.inline_count += 1; } else { self.heap_entries.push((fd, callback)); } - self.count += 1; } fn remove(&mut self, fd: usize) -> Option { - if let Some(pos) = self - .inline_entries - .iter() - .position(|cb| cb.as_ref().is_some_and(|cb| cb.0 == fd)) - { - let callback = self.inline_entries[pos].take(); - // swap with last valid entry - if pos < self.count - 1 { - self.inline_entries[pos] = self.inline_entries[self.count - 1].take(); + if let Some(pos) = self.find_inline(fd) { + let (_, callback) = unsafe { self.inline_entries[pos].assume_init_read() }; + + // if not the last element, move the last valid entry into this position + if pos < self.inline_count - 1 { + let last_valid = + unsafe { self.inline_entries[self.inline_count - 1].assume_init_read() }; + self.inline_entries[pos].write(last_valid); } - self.count -= 1; - return callback.map(|c| c.1); + + self.inline_count -= 1; + return Some(callback); } if let Some(pos) = self.heap_entries.iter().position(|&(k, _)| k == fd) { - self.count -= 1; return Some(self.heap_entries.swap_remove(pos).1); } None } + + fn find_inline(&self, fd: usize) -> Option { + (0..self.inline_count) + .find(|&i| unsafe { self.inline_entries[i].assume_init_ref().0 == fd }) + } +} + +impl Drop for Callbacks { + fn drop(&mut self) { + for i in 0..self.inline_count { + unsafe { self.inline_entries[i].assume_init_drop() }; + } + } } /// UnixIO lives longer than any of the files it creates, so it is