Merge pull request #52 from penberg/io-writepath

core: I/O write path
This commit is contained in:
Pekka Enberg
2024-06-19 21:30:36 +03:00
committed by GitHub
6 changed files with 92 additions and 10 deletions

View File

@@ -1,5 +1,5 @@
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use limbo_core::{Database, IO, PlatformIO};
use limbo_core::{Database, PlatformIO, IO};
use pprof::criterion::{Output, PProfProfiler};
use std::rc::Rc;

View File

@@ -1,8 +1,8 @@
use super::{Completion, File, IO};
use super::{Completion, File, WriteCompletion, IO};
use anyhow::{Ok, Result};
use std::rc::Rc;
use std::cell::RefCell;
use std::io::{Read, Seek};
use std::io::{Read, Seek, Write};
use log::trace;
pub struct DarwinIO {}
@@ -43,4 +43,18 @@ impl File for DarwinFile {
c.complete();
Ok(())
}
fn pwrite(
&self,
pos: usize,
buffer: Rc<RefCell<crate::Buffer>>,
c: Rc<WriteCompletion>,
) -> Result<()> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
let buf = buffer.borrow();
let buf = buf.as_slice();
file.write_all(buf)?;
Ok(())
}
}

View File

@@ -1,9 +1,9 @@
use super::{Completion, File, IO};
use super::{Completion, File, WriteCompletion, IO};
use anyhow::Result;
use log::trace;
use std::cell::RefCell;
use std::os::unix::io::AsRawFd;
use std::rc::Rc;
use log::trace;
pub struct LinuxIO {
ring: Rc<RefCell<io_uring::IoUring>>,
@@ -21,7 +21,7 @@ impl LinuxIO {
impl IO for LinuxIO {
fn open_file(&self, path: &str) -> Result<Box<dyn File>> {
trace!("open_file(path = {})", path);
let file = std::fs::File::open(path)?;
let file = std::fs::File::options().read(true).write(true).open(path)?;
Ok(Box::new(LinuxFile {
ring: self.ring.clone(),
file,
@@ -59,7 +59,7 @@ impl File for LinuxFile {
let len = buf.len();
let buf = buf.as_mut_ptr();
let ptr = Rc::into_raw(c.clone());
io_uring::opcode::Read::new(fd, buf, len as u32 )
io_uring::opcode::Read::new(fd, buf, len as u32)
.offset(pos as u64)
.build()
.user_data(ptr as u64)
@@ -72,4 +72,29 @@ impl File for LinuxFile {
}
Ok(())
}
fn pwrite(
&self,
pos: usize,
buffer: Rc<RefCell<crate::Buffer>>,
c: Rc<WriteCompletion>,
) -> Result<()> {
let fd = io_uring::types::Fd(self.file.as_raw_fd());
let write = {
let buf = buffer.borrow();
let ptr = Rc::into_raw(c.clone());
io_uring::opcode::Write::new(fd, buf.as_ptr(), buf.len() as u32)
.offset(pos as u64)
.build()
.user_data(ptr as u64)
};
let mut ring = self.ring.borrow_mut();
unsafe {
ring.submission()
.push(&write)
.expect("submission queue is full");
ring.submit()?;
}
Ok(())
}
}

View File

@@ -9,6 +9,8 @@ use std::{
pub trait File {
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()>;
fn pwrite(&self, pos: usize, buffer: Rc<RefCell<Buffer>>, c: Rc<WriteCompletion>)
-> Result<()>;
}
pub trait IO {
@@ -18,12 +20,17 @@ pub trait IO {
}
pub type Complete = dyn Fn(&Buffer);
pub type WriteComplete = dyn Fn(usize);
pub struct Completion {
pub buf: RefCell<Buffer>,
pub complete: Box<Complete>,
}
pub struct WriteCompletion {
pub complete: Box<WriteComplete>,
}
impl Completion {
pub fn new(buf: Buffer, complete: Box<Complete>) -> Self {
let buf = RefCell::new(buf);
@@ -44,6 +51,15 @@ impl Completion {
}
}
impl WriteCompletion {
pub fn new(complete: Box<WriteComplete>) -> Self {
Self { complete }
}
pub fn complete(&self, bytes_written: usize) {
(self.complete)(bytes_written);
}
}
pub type BufferData = Pin<Vec<u8>>;
pub type BufferDropFn = Rc<dyn Fn(BufferData)>;
@@ -53,6 +69,15 @@ pub struct Buffer {
drop: BufferDropFn,
}
impl Clone for Buffer {
fn clone(&self) -> Self {
Self {
data: self.data.clone(),
drop: self.drop.clone(),
}
}
}
impl Drop for Buffer {
fn drop(&mut self) {
let data = unsafe { ManuallyDrop::take(&mut self.data) };

View File

@@ -1,8 +1,8 @@
use super::{Completion, File, IO};
use super::{Completion, File, WriteCompletion, IO};
use anyhow::{Ok, Result};
use std::rc::Rc;
use std::cell::RefCell;
use std::io::{Read, Seek};
use std::io::{Read, Seek, Write};
use log::trace;
pub struct WindowsIO {}
@@ -43,4 +43,18 @@ impl File for WindowsFile {
c.complete();
Ok(())
}
fn pwrite(
&self,
pos: usize,
buffer: Rc<RefCell<crate::Buffer>>,
c: Rc<WriteCompletion>,
) -> Result<()> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
let buf = buffer.borrow();
let buf = buf.as_slice();
file.write_all(buf)?;
Ok(())
}
}

View File

@@ -339,7 +339,11 @@ impl Program {
}
}
fn make_record<'a>(registers: &'a [OwnedValue], register_end: &usize, register_start: &usize) -> Record<'a> {
fn make_record<'a>(
registers: &'a [OwnedValue],
register_end: &usize,
register_start: &usize,
) -> Record<'a> {
let mut values = Vec::with_capacity(*register_end - *register_start);
for i in *register_start..*register_end {
values.push(crate::types::to_value(&registers[i]));