From 26152e07e5f85da776c8f7f07749da249cc072e9 Mon Sep 17 00:00:00 2001 From: Bennett Clement Date: Sat, 13 Jul 2024 09:44:03 +0800 Subject: [PATCH 1/2] Use kqueue for Darwin IO --- core/Cargo.toml | 4 ++ core/io/darwin.rs | 161 ++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 144 insertions(+), 21 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index ce6c65140..3752aa23e 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -20,6 +20,10 @@ fs = [] [target.'cfg(target_os = "linux")'.dependencies] io-uring = "0.6.1" +[target.'cfg(not(target_os = "linux"))'.dependencies] +polling = "3.7.2" +rustix = "0.38.34" + [target.'cfg(not(target_family = "wasm"))'.dependencies] mimalloc = { version = "*", default-features = false } diff --git a/core/io/darwin.rs b/core/io/darwin.rs index 16d0d1a27..21f248785 100644 --- a/core/io/darwin.rs +++ b/core/io/darwin.rs @@ -1,47 +1,143 @@ use super::{Completion, File, WriteCompletion, IO}; use anyhow::{Ok, Result}; -use std::rc::Rc; -use std::cell::RefCell; -use std::io::{Read, Seek, Write}; use log::trace; +use polling::{Event, Events, Poller}; +use rustix::fd::{AsFd, AsRawFd}; +use rustix::fs::OpenOptionsExt; +use rustix::io::Errno; +use std::cell::RefCell; +use std::collections::HashMap; +use std::io::{Read, Seek, Write}; +use std::rc::Rc; -pub struct DarwinIO {} +pub struct DarwinIO { + poller: Rc>, + events: Rc>, + callbacks: Rc>>, +} impl DarwinIO { pub fn new() -> Result { - Ok(Self {}) + Ok(Self { + poller: Rc::new(RefCell::new(Poller::new()?)), + events: Rc::new(RefCell::new(Events::new())), + callbacks: Rc::new(RefCell::new(HashMap::new())), + }) } } impl IO for DarwinIO { fn open_file(&self, path: &str) -> Result> { trace!("open_file(path = {})", path); - let file = std::fs::File::open(path)?; + let file = std::fs::File::options() + .read(true) + .custom_flags(libc::O_NONBLOCK) + .open(path)?; Ok(Rc::new(DarwinFile { - file: RefCell::new(file), + file: Rc::new(RefCell::new(file)), + poller: self.poller.clone(), + callbacks: self.callbacks.clone(), })) } fn run_once(&self) -> Result<()> { + if self.callbacks.borrow().is_empty() { + return Ok(()); + } + let mut events = self.events.borrow_mut(); + events.clear(); + + trace!("run_once() waits for events"); + let poller = self.poller.borrow(); + poller.wait(&mut events, None)?; + + for event in events.iter() { + if let Some(cf) = self.callbacks.borrow_mut().remove(&event.key) { + let result = { + match cf { + CompletionCallback::Read(ref file, ref c, pos) => { + let mut file = file.borrow_mut(); + let mut buf = c.buf_mut(); + file.seek(std::io::SeekFrom::Start(pos as u64))?; + file.read(buf.as_mut_slice()) + } + CompletionCallback::Write(ref file, _, ref buf, pos) => { + let mut file = file.borrow_mut(); + let buf = buf.borrow(); + file.seek(std::io::SeekFrom::Start(pos as u64))?; + file.write(buf.as_slice()) + } + } + }; + match result { + std::result::Result::Ok(n) => { + match cf { + CompletionCallback::Read(_, ref c, _) => { + c.complete(); + } + CompletionCallback::Write(_, ref c, _, _) => { + c.complete(n); + } + } + return Ok(()); + } + Err(e) => { + return Err(e.into()); + } + } + } + } Ok(()) } } +enum CompletionCallback { + Read(Rc>, Rc, usize), + Write( + Rc>, + Rc, + Rc>, + usize, + ), +} + pub struct DarwinFile { - file: RefCell, + file: Rc>, + poller: Rc>, + callbacks: Rc>>, } impl File for DarwinFile { fn pread(&self, pos: usize, c: Rc) -> Result<()> { - let mut file = self.file.borrow_mut(); - file.seek(std::io::SeekFrom::Start(pos as u64))?; - { + let file = self.file.borrow(); + let result = { let mut buf = c.buf_mut(); - let buf = buf.as_mut_slice(); - file.read_exact(buf)?; + rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64) + }; + match result { + std::result::Result::Ok(n) => { + trace!("pread n: {}", n); + // Read succeeded immediately + c.complete(); + Ok(()) + } + Err(Errno::AGAIN) => { + trace!("pread blocks"); + // Would block, set up polling + let fd = file.as_raw_fd(); + unsafe { + self.poller + .borrow() + .add(&file.as_fd(), Event::readable(fd as usize))?; + } + self.callbacks.borrow_mut().insert( + fd as usize, + CompletionCallback::Read(self.file.clone(), c.clone(), pos), + ); + Ok(()) + } + Err(e) => Err(e.into()), } - c.complete(); - Ok(()) } fn pwrite( @@ -50,11 +146,34 @@ impl File for DarwinFile { buffer: Rc>, c: Rc, ) -> Result<()> { - let mut file = self.file.borrow_mut(); - file.seek(std::io::SeekFrom::Start(pos as u64))?; - let buf = buffer.borrow(); - let buf = buf.as_slice(); - file.write_all(buf)?; - Ok(()) + let file = self.file.borrow(); + let result = { + let buf = buffer.borrow(); + rustix::io::pwrite(file.as_fd(), buf.as_slice(), pos as u64) + }; + match result { + std::result::Result::Ok(n) => { + trace!("pwrite n: {}", n); + // Read succeeded immediately + c.complete(n); + Ok(()) + } + Err(Errno::AGAIN) => { + trace!("pwrite blocks"); + // Would block, set up polling + let fd = file.as_raw_fd(); + unsafe { + self.poller + .borrow() + .add(&file.as_fd(), Event::readable(fd as usize))?; + } + self.callbacks.borrow_mut().insert( + fd as usize, + CompletionCallback::Write(self.file.clone(), c.clone(), buffer.clone(), pos), + ); + Ok(()) + } + Err(e) => Err(e.into()), + } } } From 30e4a70d0776dc8e260609e571530b89bf1b2a52 Mon Sep 17 00:00:00 2001 From: Bennett Clement Date: Sat, 13 Jul 2024 09:47:42 +0800 Subject: [PATCH 2/2] ignore wasm compilation target --- core/Cargo.toml | 2 +- core/io/generic.rs | 60 ++++++++++++++++++++++++++++++++++++++++++++++ core/io/mod.rs | 5 ++++ 3 files changed, 66 insertions(+), 1 deletion(-) create mode 100644 core/io/generic.rs diff --git a/core/Cargo.toml b/core/Cargo.toml index 3752aa23e..e4c4b3b25 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -20,7 +20,7 @@ fs = [] [target.'cfg(target_os = "linux")'.dependencies] io-uring = "0.6.1" -[target.'cfg(not(target_os = "linux"))'.dependencies] +[target.'cfg(target_os = "macos")'.dependencies] polling = "3.7.2" rustix = "0.38.34" diff --git a/core/io/generic.rs b/core/io/generic.rs new file mode 100644 index 000000000..55dbdfb90 --- /dev/null +++ b/core/io/generic.rs @@ -0,0 +1,60 @@ +use super::{Completion, File, WriteCompletion, IO}; +use anyhow::{Ok, Result}; +use log::trace; +use std::cell::RefCell; +use std::io::{Read, Seek, Write}; +use std::rc::Rc; + +pub struct GenericIO {} + +impl GenericIO { + pub fn new() -> Result { + Ok(Self {}) + } +} + +impl IO for GenericIO { + fn open_file(&self, path: &str) -> Result> { + trace!("open_file(path = {})", path); + let file = std::fs::File::open(path)?; + Ok(Rc::new(GenericFile { + file: RefCell::new(file), + })) + } + + fn run_once(&self) -> Result<()> { + Ok(()) + } +} + +pub struct GenericFile { + file: RefCell, +} + +impl File for GenericFile { + fn pread(&self, pos: usize, c: Rc) -> Result<()> { + let mut file = self.file.borrow_mut(); + file.seek(std::io::SeekFrom::Start(pos as u64))?; + { + let mut buf = c.buf_mut(); + let buf = buf.as_mut_slice(); + file.read_exact(buf)?; + } + c.complete(); + Ok(()) + } + + fn pwrite( + &self, + pos: usize, + buffer: Rc>, + c: Rc, + ) -> Result<()> { + let mut file = self.file.borrow_mut(); + file.seek(std::io::SeekFrom::Start(pos as u64))?; + let buf = buffer.borrow(); + let buf = buf.as_slice(); + file.write_all(buf)?; + Ok(()) + } +} diff --git a/core/io/mod.rs b/core/io/mod.rs index de34a70cb..549ded5b0 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -128,4 +128,9 @@ cfg_block! { mod windows; pub use windows::WindowsIO as PlatformIO; } + + #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))] { + mod generic; + pub use generic::GenericIO as PlatformIO; + } }