diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index 02911294e..6b4b1992f 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -720,12 +720,14 @@ impl turso_core::DatabaseStorage for DatabaseFile { } fn write_pages( &self, - _first_page_idx: usize, - _page_size: usize, - _buffers: Vec>>, - _c: turso_core::Completion, + page_idx: usize, + page_size: usize, + buffers: Vec>>, + c: turso_core::Completion, ) -> turso_core::Result<()> { - todo!(); + let pos = (page_idx - 1) * page_size; + self.file.pwritev(pos, buffers, c.into())?; + Ok(()) } fn sync(&self, c: turso_core::Completion) -> turso_core::Result { diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 51c2c85a8..963e2fe28 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -5,36 +5,18 @@ use crate::io::clock::{Clock, Instant}; use crate::{turso_assert, LimboError, MemoryIO, Result}; use rustix::fs::{self, FlockOperation, OFlags}; use std::cell::RefCell; -use std::collections::VecDeque; -use std::fmt; +use std::collections::{HashMap, VecDeque}; use std::io::ErrorKind; use std::os::fd::AsFd; use std::os::unix::io::AsRawFd; use std::rc::Rc; use std::sync::Arc; -use thiserror::Error; use tracing::{debug, trace}; const ENTRIES: u32 = 512; const SQPOLL_IDLE: u32 = 1000; const FILES: u32 = 8; -#[derive(Debug, Error)] -enum UringIOError { - IOUringCQError(i32), -} - -impl fmt::Display for UringIOError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - UringIOError::IOUringCQError(code) => write!( - f, - "IOUring completion queue error occurred with code {code}", - ), - } - } -} - pub struct UringIO { inner: Rc>, } @@ -45,6 +27,7 @@ unsafe impl Sync for UringIO {} struct WrappedIOUring { ring: io_uring::IoUring, pending_ops: usize, + writev_states: HashMap, } struct InnerUringIO { @@ -69,6 +52,7 @@ impl UringIO { ring: WrappedIOUring { ring, pending_ops: 0, + writev_states: HashMap::new(), }, free_files: (0..FILES).collect(), }; @@ -79,6 +63,86 @@ impl UringIO { } } +macro_rules! with_fd { + ($file:expr, |$fd:ident| $body:expr) => { + match $file.id { + Some(id) => { + let $fd = io_uring::types::Fixed(id); + $body + } + None => { + let $fd = io_uring::types::Fd($file.file.as_raw_fd()); + $body + } + } + }; +} + +struct WritevState { + // fixed fd slot or 0xFFFF_FFFF if none + file_id: u32, + // absolute file offset for next submit + pos: usize, + // current buffer index in `bufs` + idx: usize, + // intra-buffer offset + off: usize, + bufs: Vec>>, + // completion returned to caller + user_c: Arc, + // we keep the last iovec allocation alive until CQE: + // raw ptr to Box<[iovec]> + last_iov: *mut libc::iovec, + last_iov_len: usize, +} + +impl WritevState { + fn remaining(&self) -> usize { + let mut total = 0; + for (i, b) in self.bufs.iter().enumerate().skip(self.idx) { + let r = b.borrow(); + let len = r.len(); + total += if i == self.idx { len - self.off } else { len }; + } + total + } + + /// Advance (idx, off, pos) after written bytes + fn advance(&mut self, written: usize) { + let mut rem = written; + while rem > 0 { + let len = { + let r = self.bufs[self.idx].borrow(); + r.len() + }; + let left = len - self.off; + if rem < left { + self.off += rem; + self.pos += rem; + rem = 0; + } else { + rem -= left; + self.pos += left; + self.idx += 1; + self.off = 0; + } + } + } + + fn free_last_iov(&mut self) { + if !self.last_iov.is_null() { + unsafe { + drop(Box::from_raw(core::slice::from_raw_parts_mut( + self.last_iov, + self.last_iov_len, + ))) + }; + self.last_iov = core::ptr::null_mut(); + self.last_iov_len = 0; + } + } +} + impl InnerUringIO { fn register_file(&mut self, fd: i32) -> Result { if let Some(slot) = self.free_files.pop_front() { @@ -132,6 +196,41 @@ impl WrappedIOUring { fn empty(&self) -> bool { self.pending_ops == 0 } + + fn submit_writev(&mut self, key: u64) { + let st = self.writev_states.get_mut(&key).expect("state must exist"); + // build iovecs for the remaining slice (respect UIO_MAXIOV) + const MAX_IOV: usize = libc::UIO_MAXIOV as usize; + let mut iov = Vec::with_capacity(MAX_IOV); + for (i, b) in st.bufs.iter().enumerate().skip(st.idx).take(MAX_IOV) { + let r = b.borrow(); + let s = r.as_slice(); + let slice = if i == st.idx { &s[st.off..] } else { s }; + if slice.is_empty() { + continue; + } + iov.push(libc::iovec { + iov_base: slice.as_ptr() as *mut _, + iov_len: slice.len(), + }); + } + + // keep iov alive until CQE + let boxed = iov.into_boxed_slice(); + let ptr = boxed.as_ptr() as *mut libc::iovec; + let len = boxed.len(); + st.free_last_iov(); + st.last_iov = ptr; + st.last_iov_len = len; + // leak; freed when CQE processed + let _ = Box::into_raw(boxed); + let entry = + io_uring::opcode::Writev::new(io_uring::types::Fixed(st.file_id), ptr, len as u32) + .offset(st.pos as u64) + .build() + .user_data(key); + self.submit_entry(&entry); + } } impl IO for UringIO { @@ -175,35 +274,53 @@ impl IO for UringIO { } fn run_once(&self) -> Result<()> { - trace!("run_once()"); - let mut inner = self.inner.borrow_mut(); - let ring = &mut inner.ring; + { + let mut inner = self.inner.borrow_mut(); + let ring = &mut inner.ring; - if ring.empty() { - return Ok(()); - } - - ring.wait_for_completion()?; - while let Some(cqe) = ring.ring.completion().next() { - ring.pending_ops -= 1; - let result = cqe.result(); - if result < 0 { - return Err(LimboError::UringIOError(format!( - "{} cqe: {:?}", - UringIOError::IOUringCQError(result), - cqe - ))); + if ring.empty() { + return Ok(()); } let ud = cqe.user_data(); turso_assert!(ud > 0, "therea are no linked timeouts or cancelations, all cqe user_data should be valid arc pointers"); - if ud == 0 { - // we currently don't have any linked timeouts or cancelations, but just in case - // lets guard against this case - tracing::error!("Received completion with user_data 0"); - continue; - } - completion_from_key(ud).complete(result); + ring.wait_for_completion()?; } + loop { + let (had, user_data, result) = { + let mut inner = self.inner.borrow_mut(); + let mut cq = inner.ring.ring.completion(); + if let Some(cqe) = cq.next() { + (true, cqe.user_data(), cqe.result()) + } else { + (false, 0, 0) + } + }; + if !had { + break; + } + self.inner.borrow_mut().ring.pending_ops -= 1; + + let mut inner = self.inner.borrow_mut(); + if let Some(mut st) = inner.ring.writev_states.remove(&user_data) { + if result < 0 { + st.free_last_iov(); + st.user_c.complete(result); + } else { + let written = result as usize; + st.free_last_iov(); + st.advance(written); + if st.remaining() == 0 { + st.user_c.complete(st.pos as i32); + } else { + inner.ring.writev_states.insert(user_data, st); + inner.ring.submit_writev(user_data); // safe: no CQ borrow alive + } + } + } else { + completion_from_key(user_data).complete(result); + } + } + Ok(()) } @@ -251,21 +368,6 @@ pub struct UringFile { unsafe impl Send for UringFile {} unsafe impl Sync for UringFile {} -macro_rules! with_fd { - ($file:expr, |$fd:ident| $body:expr) => { - match $file.id { - Some(id) => { - let $fd = io_uring::types::Fixed(id); - $body - } - None => { - let $fd = io_uring::types::Fd($file.file.as_raw_fd()); - $body - } - } - }; -} - impl File for UringFile { fn lock_file(&self, exclusive: bool) -> Result<()> { let fd = self.file.as_fd(); @@ -360,46 +462,26 @@ impl File for UringFile { &self, pos: usize, bufs: Vec>>, - c: Arc, + user_c: Arc, ) -> Result> { - // build iovecs - let mut iovs: Vec = Vec::with_capacity(bufs.len()); - for b in &bufs { - let rb = b.borrow(); - iovs.push(libc::iovec { - iov_base: rb.as_ptr() as *mut _, - iov_len: rb.len(), - }); - } - // keep iovecs alive until completion - let boxed_iovs = iovs.into_boxed_slice(); - let iov_ptr = boxed_iovs.as_ptr(); - let iov_len = boxed_iovs.len() as u32; - // leak now, free in completion - let raw_iovs = Box::into_raw(boxed_iovs); - - let comp = { - // wrap original completion to free resources - let orig = c.clone(); - Box::new(move |res: i32| { - // reclaim iovecs - unsafe { - let _ = Box::from_raw(raw_iovs); - } - // forward to user closure - orig.complete(res); - }) - }; - let c = Arc::new(Completion::new_write(comp)); + // create state + let key = get_key(user_c.clone()); let mut io = self.io.borrow_mut(); - let e = with_fd!(self, |fd| { - io_uring::opcode::Writev::new(fd, iov_ptr, iov_len) - .offset(pos as u64) - .build() - .user_data(get_key(c.clone())) - }); - io.ring.submit_entry(&e); - Ok(c) + + let state = WritevState { + file_id: self.id.unwrap_or(u32::MAX), + pos, + idx: 0, + off: 0, + bufs, + user_c: user_c.clone(), + last_iov: core::ptr::null_mut(), + last_iov_len: 0, + }; + io.ring.writev_states.insert(key, state); + io.ring.submit_writev(key); + + Ok(user_c.clone()) } fn size(&self) -> Result { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 9e345a59c..7577849c2 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -701,7 +701,8 @@ impl Drop for CheckpointLocks { fn take_page_into_batch(scratch: &PageRef, pool: &Arc, batch: &mut Vec) { // grab id and buffer let id = scratch.get().id; - let buf = scratch.get_contents().buffer.clone(); // current data + let buf = scratch.get_contents().buffer.clone(); + scratch.pin(); // ensure it isnt evicted batch.push(BatchItem { id, buf }); // give scratch a brand-new empty buffer for the next read reinit_scratch_buffer(scratch, pool);