From 8d1d2d36cc7e79726bb7df6aec4cd338e9eebb35 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 12 Nov 2024 17:53:59 +0000 Subject: [PATCH] core/linux: aligned wal header read --- core/io/linux.rs | 42 ++++++++++++++++++++++------------ core/storage/sqlite3_ondisk.rs | 4 ++-- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/core/io/linux.rs b/core/io/linux.rs index 76d20809d..42b14b2a4 100644 --- a/core/io/linux.rs +++ b/core/io/linux.rs @@ -5,7 +5,7 @@ use log::{debug, trace}; use nix::fcntl::{FcntlArg, OFlag}; use std::cell::RefCell; use std::fmt; -use std::os::unix::fs::MetadataExt; +use std::collections::HashMap; use std::os::unix::io::AsRawFd; use std::rc::Rc; use thiserror::Error; @@ -37,6 +37,8 @@ pub struct LinuxIO { struct WrappedIOUring { ring: io_uring::IoUring, pending_ops: usize, + pub pending: HashMap<u64, Rc<Completion>>, + key: u64 } struct InnerLinuxIO { @@ -52,6 +54,8 @@ impl LinuxIO { ring: WrappedIOUring { ring, pending_ops: 0, + pending: HashMap::new(), + key: 0 }, iovecs: [iovec { iov_base: std::ptr::null_mut(), @@ -76,7 +80,8 @@ impl InnerLinuxIO { } impl WrappedIOUring { - fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) { + fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Rc<Completion>) { + self.pending.insert(entry.get_user_data(), c); unsafe { self.ring .submission() @@ -104,6 +109,11 @@ impl WrappedIOUring { fn empty(&self) -> bool { self.pending_ops == 0 } + + fn get_key(&mut self) -> u64 { + self.key +=1; + self.key + } } impl IO for LinuxIO { @@ -149,8 +159,11 @@ impl IO for LinuxIO { LinuxIOError::IOUringCQError(result) ))); } - let c = unsafe { Rc::from_raw(cqe.user_data() as *const Completion) }; - c.complete(cqe.result()); + { + let c = ring.pending.get(&cqe.user_data()).unwrap().clone(); + c.complete(cqe.result()); + } + ring.pending.remove(&cqe.user_data()); } Ok(()) } @@ -171,6 +184,7 @@ pub struct LinuxFile { file: std::fs::File, } + impl File for LinuxFile { fn lock_file(&self, exclusive: bool) -> Result<()> { 
let fd = self.file.as_raw_fd(); @@ -234,17 +248,17 @@ impl File for LinuxFile { let mut buf = r.buf_mut(); let len = buf.len(); let buf = buf.as_mut_ptr(); - let ptr = Rc::into_raw(c.clone()); let iovec = io.get_iovec(buf, len); io_uring::opcode::Readv::new(fd, iovec, 1) .offset(pos as u64) .build() - .user_data(ptr as u64) + .user_data(io.ring.get_key()) }; - io.ring.submit_entry(&read_e); + io.ring.submit_entry(&read_e, c); Ok(()) } + fn pwrite( &self, pos: usize, @@ -255,25 +269,25 @@ impl File for LinuxFile { let fd = io_uring::types::Fd(self.file.as_raw_fd()); let write = { let buf = buffer.borrow(); - let ptr = Rc::into_raw(c.clone()); + trace!("pwrite(pos = {}, length = {})", pos, buf.len()); let iovec = io.get_iovec(buf.as_ptr(), buf.len()); io_uring::opcode::Writev::new(fd, iovec, 1) .offset(pos as u64) .build() - .user_data(ptr as u64) + .user_data(io.ring.get_key()) }; - io.ring.submit_entry(&write); + io.ring.submit_entry(&write, c); Ok(()) } fn sync(&self, c: Rc<Completion>) -> Result<()> { let fd = io_uring::types::Fd(self.file.as_raw_fd()); - let ptr = Rc::into_raw(c.clone()); + let mut io = self.io.borrow_mut(); + trace!("sync()"); let sync = io_uring::opcode::Fsync::new(fd) .build() - .user_data(ptr as u64); - let mut io = self.io.borrow_mut(); - io.ring.submit_entry(&sync); + .user_data(io.ring.get_key()); + io.ring.submit_entry(&sync, c); Ok(()) } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index ce2136a01..ca4cf555b 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -960,7 +960,7 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec<u8>) { pub fn begin_read_wal_header(io: &Rc<dyn File>) -> Result<Rc<RefCell<WalHeader>>> { let drop_fn = Rc::new(|_buf| {}); - let buf = Rc::new(RefCell::new(Buffer::allocate(WAL_HEADER_SIZE, drop_fn))); + let buf = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn))); let result = Rc::new(RefCell::new(WalHeader::default())); let header = result.clone(); let complete = Box::new(move |buf: 
Rc<RefCell<Buffer>>| { @@ -1074,7 +1074,7 @@ pub fn begin_write_wal_header(io: &Rc<dyn File>, header: &WalHeader) -> Result<( let buffer = { let drop_fn = Rc::new(|_buf| {}); - let mut buffer = Buffer::allocate(WAL_HEADER_SIZE, drop_fn); + let mut buffer = Buffer::allocate(512, drop_fn); let buf = buffer.as_mut_slice(); buf[0..4].copy_from_slice(&header.magic);