diff --git a/Cargo.lock b/Cargo.lock index e823ff7b6..e2e42d04a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1017,8 +1017,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -1592,6 +1594,7 @@ name = "limbo-wasm" version = "0.0.14" dependencies = [ "console_error_panic_hook", + "getrandom 0.2.15", "js-sys", "limbo_core", "wasm-bindgen", diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 34f9b1fbc..e37c9f6c3 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -242,10 +242,7 @@ impl File for UringFile { } fn pread(&self, pos: usize, c: Completion) -> Result<()> { - let r = match c { - Completion::Read(ref r) => r, - _ => unreachable!(), - }; + let r = c.as_read(); trace!("pread(pos = {}, length = {})", pos, r.buf().len()); let fd = io_uring::types::Fd(self.file.as_raw_fd()); let mut io = self.io.borrow_mut(); diff --git a/core/io/memory.rs b/core/io/memory.rs index c654da78d..ca15f1d3b 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -79,10 +79,7 @@ impl File for MemoryFile { } fn pread(&self, pos: usize, c: Completion) -> Result<()> { - let r = match &c { - Completion::Read(ref r) => r, - _ => unreachable!(), - }; + let r = c.as_read(); let buf_len = r.buf().len(); if buf_len == 0 { c.complete(0); diff --git a/core/io/mod.rs b/core/io/mod.rs index 4f92ad3e1..6c1eddb04 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -56,6 +56,15 @@ impl Completion { Self::Sync(s) => s.complete(result), // fix } } + + /// only call this method if you are sure that the completion is + /// a ReadCompletion, panics otherwise + pub fn as_read(&self) -> &ReadCompletion { + match self { + Self::Read(ref r) => r, + _ => unreachable!(), + } + } } pub struct WriteCompletion { diff --git a/core/io/unix.rs b/core/io/unix.rs index 6e0a44e47..f1323f2bd 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -9,16 +9,158 @@ use rustix::{ fs::{self, FlockOperation, OFlags, OpenOptionsExt}, io::Errno, }; -use std::cell::RefCell; -use std::collections::HashMap; use std::io::{ErrorKind, Read, Seek, Write}; use std::rc::Rc; use tracing::{debug, trace}; +use std::{ + cell::{RefCell, UnsafeCell}, + mem::MaybeUninit, +}; +struct OwnedCallbacks(UnsafeCell); +struct BorrowedCallbacks<'io>(UnsafeCell<&'io mut Callbacks>); + +impl OwnedCallbacks { + fn new() -> Self { + Self(UnsafeCell::new(Callbacks::new())) + } + fn as_mut<'io>(&self) -> &'io mut Callbacks { + unsafe { &mut *self.0.get() } + } + + fn is_empty(&self) -> bool { + self.as_mut().inline_count == 0 + } + + fn remove(&self, fd: usize) -> Option { + let callbacks = unsafe { &mut *self.0.get() }; + callbacks.remove(fd) + } +} + +impl BorrowedCallbacks<'_> { + fn insert(&self, fd: usize, callback: CompletionCallback) { + let callbacks = unsafe { &mut *self.0.get() }; + callbacks.insert(fd, callback); + } +} + +struct EventsHandler(UnsafeCell); + +impl EventsHandler { + fn new() -> Self { + Self(UnsafeCell::new(Events::new())) + } + + fn clear(&self) { + let events = unsafe { &mut *self.0.get() }; + events.clear(); + } + + fn iter(&self) -> impl Iterator { + let events = unsafe { &*self.0.get() }; + events.iter() + } + + fn as_mut<'io>(&self) -> &'io mut Events { + unsafe { &mut *self.0.get() } + } +} +struct PollHandler(UnsafeCell); +struct BorrowedPollHandler<'io>(UnsafeCell<&'io mut Poller>); + +impl BorrowedPollHandler<'_> { + fn add(&self, fd: &rustix::fd::BorrowedFd, event: Event) -> Result<()> { + let poller = unsafe { &mut *self.0.get() }; + unsafe { poller.add(fd, event)? } + Ok(()) + } +} + +impl PollHandler { + fn new() -> Self { + Self(UnsafeCell::new(Poller::new().unwrap())) + } + fn wait(&self, events: &mut Events, timeout: Option) -> Result<()> { + let poller = unsafe { &mut *self.0.get() }; + poller.wait(events, timeout)?; + Ok(()) + } + + fn as_mut<'io>(&self) -> &'io mut Poller { + unsafe { &mut *self.0.get() } + } +} + +type CallbackEntry = (usize, CompletionCallback); + +const FD_INLINE_SIZE: usize = 32; + +struct Callbacks { + inline_entries: [MaybeUninit<(usize, CompletionCallback)>; FD_INLINE_SIZE], + heap_entries: Vec, + inline_count: usize, +} + +impl Callbacks { + fn new() -> Self { + Self { + inline_entries: [const { MaybeUninit::uninit() }; FD_INLINE_SIZE], + heap_entries: Vec::new(), + inline_count: 0, + } + } + + fn insert(&mut self, fd: usize, callback: CompletionCallback) { + if self.inline_count < FD_INLINE_SIZE { + self.inline_entries[self.inline_count].write((fd, callback)); + self.inline_count += 1; + } else { + self.heap_entries.push((fd, callback)); + } + } + + fn remove(&mut self, fd: usize) -> Option { + if let Some(pos) = self.find_inline(fd) { + let (_, callback) = unsafe { self.inline_entries[pos].assume_init_read() }; + + // if not the last element, move the last valid entry into this position + if pos < self.inline_count - 1 { + let last_valid = + unsafe { self.inline_entries[self.inline_count - 1].assume_init_read() }; + self.inline_entries[pos].write(last_valid); + } + + self.inline_count -= 1; + return Some(callback); + } + + if let Some(pos) = self.heap_entries.iter().position(|&(k, _)| k == fd) { + return Some(self.heap_entries.swap_remove(pos).1); + } + None + } + + fn find_inline(&self, fd: usize) -> Option { + (0..self.inline_count) + .find(|&i| unsafe { self.inline_entries[i].assume_init_ref().0 == fd }) + } +} + +impl Drop for Callbacks { + fn drop(&mut self) { + for i in 0..self.inline_count { + unsafe { self.inline_entries[i].assume_init_drop() }; + } + } +} + +/// UnixIO lives longer than any of the files it creates, so it is +/// safe to store references to it's internals in the UnixFiles pub struct UnixIO { - poller: Rc>, - events: Rc>, - callbacks: Rc>>, + poller: PollHandler, + events: EventsHandler, + callbacks: OwnedCallbacks, } impl UnixIO { @@ -26,9 +168,9 @@ impl UnixIO { pub fn new() -> Result { debug!("Using IO backend 'syscall'"); Ok(Self { - poller: Rc::new(RefCell::new(Poller::new()?)), - events: Rc::new(RefCell::new(Events::new())), - callbacks: Rc::new(RefCell::new(HashMap::new())), + poller: PollHandler::new(), + events: EventsHandler::new(), + callbacks: OwnedCallbacks::new(), }) } } @@ -45,8 +187,8 @@ impl IO for UnixIO { let unix_file = Rc::new(UnixFile { file: Rc::new(RefCell::new(file)), - poller: self.poller.clone(), - callbacks: self.callbacks.clone(), + poller: BorrowedPollHandler(self.poller.as_mut().into()), + callbacks: BorrowedCallbacks(self.callbacks.as_mut().into()), }); if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() { unix_file.lock_file(true)?; @@ -55,53 +197,37 @@ impl IO for UnixIO { } fn run_once(&self) -> Result<()> { - if self.callbacks.borrow().is_empty() { + if self.callbacks.is_empty() { return Ok(()); } - let mut events = self.events.borrow_mut(); - events.clear(); - + self.events.clear(); trace!("run_once() waits for events"); - let poller = self.poller.borrow(); - poller.wait(&mut events, None)?; + self.poller.wait(self.events.as_mut(), None)?; - for event in events.iter() { - if let Some(cf) = self.callbacks.borrow_mut().remove(&event.key) { - let result = { - match cf { - CompletionCallback::Read(ref file, ref c, pos) => { - let mut file = file.borrow_mut(); - let c: &Completion = c; - let r = match c { - Completion::Read(r) => r, - _ => unreachable!(), - }; - let mut buf = r.buf_mut(); - file.seek(std::io::SeekFrom::Start(pos as u64))?; - file.read(buf.as_mut_slice()) - } - CompletionCallback::Write(ref file, _, ref buf, pos) => { - let mut file = file.borrow_mut(); - let buf = buf.borrow(); - file.seek(std::io::SeekFrom::Start(pos as u64))?; - file.write(buf.as_slice()) - } + for event in self.events.iter() { + if let Some(cf) = self.callbacks.remove(event.key) { + let result = match cf { + CompletionCallback::Read(ref file, ref c, pos) => { + let mut file = file.borrow_mut(); + let r = c.as_read(); + let mut buf = r.buf_mut(); + file.seek(std::io::SeekFrom::Start(pos as u64))?; + file.read(buf.as_mut_slice()) + } + CompletionCallback::Write(ref file, _, ref buf, pos) => { + let mut file = file.borrow_mut(); + let buf = buf.borrow(); + file.seek(std::io::SeekFrom::Start(pos as u64))?; + file.write(buf.as_slice()) } }; - return match result { - Ok(n) => { - match &cf { - CompletionCallback::Read(_, ref c, _) => { - c.complete(0); - } - CompletionCallback::Write(_, ref c, _, _) => { - c.complete(n as i32); - } - } - Ok(()) - } - Err(e) => Err(e.into()), - }; + match result { + Ok(n) => match &cf { + CompletionCallback::Read(_, ref c, _) => c.complete(0), + CompletionCallback::Write(_, ref c, _, _) => c.complete(n as i32), + }, + Err(e) => return Err(e.into()), + } } } Ok(()) @@ -128,13 +254,13 @@ enum CompletionCallback { ), } -pub struct UnixFile { +pub struct UnixFile<'io> { file: Rc>, - poller: Rc>, - callbacks: Rc>>, + poller: BorrowedPollHandler<'io>, + callbacks: BorrowedCallbacks<'io>, } -impl File for UnixFile { +impl File for UnixFile<'_> { fn lock_file(&self, exclusive: bool) -> Result<()> { let fd = self.file.borrow(); let fd = fd.as_fd(); @@ -177,10 +303,7 @@ impl File for UnixFile { fn pread(&self, pos: usize, c: Completion) -> Result<()> { let file = self.file.borrow(); let result = { - let r = match c { - Completion::Read(ref r) => r, - _ => unreachable!(), - }; + let r = c.as_read(); let mut buf = r.buf_mut(); rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64) }; @@ -195,15 +318,14 @@ impl File for UnixFile { trace!("pread blocks"); // Would block, set up polling let fd = file.as_raw_fd(); - unsafe { - self.poller - .borrow() - .add(&file.as_fd(), Event::readable(fd as usize))?; + self.poller + .add(&file.as_fd(), Event::readable(fd as usize))?; + { + self.callbacks.insert( + fd as usize, + CompletionCallback::Read(self.file.clone(), c, pos), + ); } - self.callbacks.borrow_mut().insert( - fd as usize, - CompletionCallback::Read(self.file.clone(), c, pos), - ); Ok(()) } Err(e) => Err(e.into()), @@ -227,12 +349,9 @@ impl File for UnixFile { trace!("pwrite blocks"); // Would block, set up polling let fd = file.as_raw_fd(); - unsafe { - self.poller - .borrow() - .add(&file.as_fd(), Event::readable(fd as usize))?; - } - self.callbacks.borrow_mut().insert( + self.poller + .add(&file.as_fd(), Event::readable(fd as usize))?; + self.callbacks.insert( fd as usize, CompletionCallback::Write(self.file.clone(), c, buffer.clone(), pos), ); @@ -261,7 +380,7 @@ impl File for UnixFile { } } -impl Drop for UnixFile { +impl Drop for UnixFile<'_> { fn drop(&mut self) { self.unlock_file().expect("Failed to unlock file"); } diff --git a/core/io/windows.rs b/core/io/windows.rs index 48a676bf6..bc80b90bf 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -58,10 +58,7 @@ impl File for WindowsFile { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; { - let r = match c { - Completion::Read(ref r) => r, - _ => unreachable!(), - }; + let r = c.as_read(); let mut buf = r.buf_mut(); let buf = buf.as_mut_slice(); file.read_exact(buf)?;