diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index 4add30661..7ce6eb521 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -469,10 +469,10 @@ impl turso_core::DatabaseStorage for DatabaseFile { fn write_page( &self, page_idx: usize, - buffer: Arc>, + buffer: Arc, c: turso_core::Completion, ) -> turso_core::Result { - let size = buffer.borrow().len(); + let size = buffer.len(); let pos = (page_idx - 1) * size; self.file.pwrite(pos, buffer, c) } @@ -481,7 +481,7 @@ impl turso_core::DatabaseStorage for DatabaseFile { &self, page_idx: usize, page_size: usize, - buffers: Vec>>, + buffers: Vec>, c: turso_core::Completion, ) -> turso_core::Result { let pos = page_idx.saturating_sub(1) * page_size; diff --git a/core/io/generic.rs b/core/io/generic.rs index 5bfda1db0..0b0727d8c 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -91,7 +91,7 @@ impl File for GenericFile { file.seek(std::io::SeekFrom::Start(pos as u64))?; { let r = c.as_read(); - let mut buf = r.buf_mut(); + let mut buf = r.buf(); let buf = buf.as_mut_slice(); file.read_exact(buf)?; } @@ -99,16 +99,10 @@ impl File for GenericFile { Ok(c) } - fn pwrite( - &self, - pos: usize, - buffer: Arc>, - c: Completion, - ) -> Result { + fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; - let buf = buffer.borrow(); - let buf = buf.as_slice(); + let buf = buffer.as_slice(); file.write_all(buf)?; c.complete(buf.len() as i32); Ok(c) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 35f267ffd..61ba4badf 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -183,18 +183,18 @@ struct WritevState { /// cache the sum of all buffer lengths for the total expected write total_len: usize, /// buffers to write - bufs: Vec>>, + bufs: Vec>, /// we keep the last iovec allocation alive until final CQE last_iov_allocation: Option>, } impl WritevState { - fn new(file: &UringFile, pos: usize, bufs: Vec>>) -> Self { + fn new(file: &UringFile, pos: usize, bufs: Vec>) -> Self { let file_id = file .id() .map(Fd::Fixed) .unwrap_or_else(|| Fd::RawFd(file.as_raw_fd())); - let total_len = bufs.iter().map(|b| b.borrow().len()).sum(); + let total_len = bufs.iter().map(|b| b.len()).sum(); Self { file_id, file_pos: pos, @@ -217,10 +217,7 @@ impl WritevState { fn advance(&mut self, written: usize) { let mut remaining = written; while remaining > 0 { - let current_buf_len = { - let r = self.bufs[self.current_buffer_idx].borrow(); - r.len() - }; + let current_buf_len = self.bufs[self.current_buffer_idx].len(); let left = current_buf_len - self.current_buffer_offset; if remaining < left { self.current_buffer_offset += remaining; @@ -314,9 +311,8 @@ impl WrappedIOUring { let mut iov_count = 0; let mut last_end: Option<(*const u8, usize)> = None; for buffer in st.bufs.iter().skip(st.current_buffer_idx) { - let buf = buffer.borrow(); - let ptr = buf.as_ptr(); - let len = buf.len(); + let ptr = buffer.as_ptr(); + let len = buffer.len(); if let Some((last_ptr, last_len)) = last_end { // Check if this buffer is adjacent to the last if unsafe { last_ptr.add(last_len) } == ptr { @@ -591,7 +587,7 @@ impl File for UringFile { trace!("pread(pos = {}, length = {})", pos, r.buf().len()); let mut io = self.io.borrow_mut(); let read_e = { - let mut buf = r.buf_mut(); + let buf = r.buf(); let len = buf.len(); let buf = buf.as_mut_ptr(); with_fd!(self, |fd| { @@ -605,18 +601,12 @@ impl File for UringFile { Ok(c) } - fn pwrite( - &self, - pos: usize, - buffer: Arc>, - c: Completion, - ) -> Result { + fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { let mut io = self.io.borrow_mut(); let write = { - let buf = buffer.borrow(); - trace!("pwrite(pos = {}, length = {})", pos, buf.len()); + trace!("pwrite(pos = {}, length = {})", pos, buffer.len()); with_fd!(self, |fd| { - io_uring::opcode::Write::new(fd, buf.as_ptr(), buf.len() as u32) + io_uring::opcode::Write::new(fd, buffer.as_ptr(), buffer.len() as u32) .offset(pos as u64) .build() .user_data(get_key(c.clone())) @@ -641,7 +631,7 @@ impl File for UringFile { fn pwritev( &self, pos: usize, - bufs: Vec>>, + bufs: Vec>, c: Completion, ) -> Result { // for a single buffer use pwrite directly diff --git a/core/io/memory.rs b/core/io/memory.rs index 745ecd3d4..f3cadd4f1 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -3,7 +3,7 @@ use crate::{LimboError, Result}; use crate::io::clock::Instant; use std::{ - cell::{Cell, RefCell, UnsafeCell}, + cell::{Cell, UnsafeCell}, collections::{BTreeMap, HashMap}, sync::{Arc, Mutex}, }; @@ -119,7 +119,7 @@ impl File for MemoryFile { let read_len = buf_len.min(file_size - pos); { - let mut read_buf = r.buf_mut(); + let read_buf = r.buf(); let mut offset = pos; let mut remaining = read_len; let mut buf_offset = 0; @@ -144,14 +144,8 @@ impl File for MemoryFile { Ok(c) } - fn pwrite( - &self, - pos: usize, - buffer: Arc>, - c: Completion, - ) -> Result { - let buf = buffer.borrow(); - let buf_len = buf.len(); + fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { + let buf_len = buffer.len(); if buf_len == 0 { c.complete(0); return Ok(c); @@ -160,7 +154,7 @@ impl File for MemoryFile { let mut offset = pos; let mut remaining = buf_len; let mut buf_offset = 0; - let data = &buf.as_slice(); + let data = &buffer.as_slice(); while remaining > 0 { let page_no = offset / PAGE_SIZE; @@ -204,25 +198,19 @@ impl File for MemoryFile { Ok(c) } - fn pwritev( - &self, - pos: usize, - buffers: Vec>>, - c: Completion, - ) -> Result { + fn pwritev(&self, pos: usize, buffers: Vec>, c: Completion) -> Result { let mut offset = pos; let mut total_written = 0; for buffer in buffers { - let buf = buffer.borrow(); - let buf_len = buf.len(); + let buf_len = buffer.len(); if buf_len == 0 { continue; } let mut remaining = buf_len; let mut buf_offset = 0; - let data = &buf.as_slice(); + let data = &buffer.as_slice(); while remaining > 0 { let page_no = offset / PAGE_SIZE; @@ -252,12 +240,6 @@ impl File for MemoryFile { } } -impl Drop for MemoryFile { - fn drop(&mut self) { - // no-op - } -} - impl MemoryFile { #[allow(clippy::mut_from_ref)] fn get_or_allocate_page(&self, page_no: usize) -> &mut MemPage { diff --git a/core/io/mod.rs b/core/io/mod.rs index 42807ee34..2265ee5b0 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -4,27 +4,15 @@ use cfg_block::cfg_block; use std::fmt; use std::ptr::NonNull; use std::sync::Arc; -use std::{ - cell::{Cell, Ref, RefCell, RefMut}, - fmt::Debug, - mem::ManuallyDrop, - pin::Pin, - rc::Rc, -}; +use std::{cell::Cell, fmt::Debug, mem::ManuallyDrop, pin::Pin, rc::Rc}; pub trait File: Send + Sync { fn lock_file(&self, exclusive: bool) -> Result<()>; fn unlock_file(&self) -> Result<()>; fn pread(&self, pos: usize, c: Completion) -> Result; - fn pwrite(&self, pos: usize, buffer: Arc>, c: Completion) - -> Result; + fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result; fn sync(&self, c: Completion) -> Result; - fn pwritev( - &self, - pos: usize, - buffers: Vec>>, - c: Completion, - ) -> Result { + fn pwritev(&self, pos: usize, buffers: Vec>, c: Completion) -> Result { use std::sync::atomic::{AtomicUsize, Ordering}; if buffers.is_empty() { c.complete(0); @@ -36,12 +24,15 @@ pub trait File: Send + Sync { let total_written = Arc::new(AtomicUsize::new(0)); for buf in buffers { - let len = buf.borrow().len(); + let len = buf.len(); let child_c = { let c_main = c.clone(); let outstanding = outstanding.clone(); let total_written = total_written.clone(); + let _cloned = buf.clone(); Completion::new_write(move |n| { + // reference buffer in callback to ensure alive for async io + let _buf = _cloned.clone(); // accumulate bytes actually reported by the backend total_written.fetch_add(n as usize, Ordering::Relaxed); if outstanding.fetch_sub(1, Ordering::AcqRel) == 1 { @@ -98,7 +89,7 @@ pub trait IO: Clock + Send + Sync { } } -pub type Complete = dyn Fn(Arc>, i32); +pub type Complete = dyn Fn(Arc, i32); pub type WriteComplete = dyn Fn(i32); pub type SyncComplete = dyn Fn(i32); pub type TruncateComplete = dyn Fn(i32); @@ -122,7 +113,7 @@ pub enum CompletionType { } pub struct ReadCompletion { - pub buf: Arc>, + pub buf: Arc, pub complete: Box, } @@ -145,9 +136,9 @@ impl Completion { )))) } - pub fn new_read(buf: Arc>, complete: F) -> Self + pub fn new_read(buf: Arc, complete: F) -> Self where - F: Fn(Arc>, i32) + 'static, + F: Fn(Arc, i32) + 'static, { Self::new(CompletionType::Read(ReadCompletion::new( buf, @@ -215,16 +206,12 @@ pub struct SyncCompletion { } impl ReadCompletion { - pub fn new(buf: Arc>, complete: Box) -> Self { + pub fn new(buf: Arc, complete: Box) -> Self { Self { buf, complete } } - pub fn buf(&self) -> Ref<'_, Buffer> { - self.buf.borrow() - } - - pub fn buf_mut(&self) -> RefMut<'_, Buffer> { - self.buf.borrow_mut() + pub fn buf(&self) -> &Buffer { + &self.buf } pub fn complete(&self, bytes_read: i32) { @@ -311,16 +298,16 @@ impl Buffer { &self.data } - pub fn as_mut_slice(&mut self) -> &mut [u8] { - &mut self.data + pub fn as_mut_slice(&self) -> &mut [u8] { + unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.data.len()) } } pub fn as_ptr(&self) -> *const u8 { self.data.as_ptr() } - pub fn as_mut_ptr(&mut self) -> *mut u8 { - self.data.as_mut_ptr() + pub fn as_mut_ptr(&self) -> *mut u8 { + self.data.as_ptr() as *mut u8 } } diff --git a/core/io/unix.rs b/core/io/unix.rs index cde493514..2c07cf61b 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -10,11 +10,7 @@ use rustix::{ io::Errno, }; use std::os::fd::RawFd; -use std::{ - cell::{RefCell, UnsafeCell}, - mem::MaybeUninit, - sync::Mutex, -}; +use std::{cell::UnsafeCell, mem::MaybeUninit, sync::Mutex}; use std::{io::ErrorKind, sync::Arc}; #[cfg(feature = "fs")] @@ -201,7 +197,7 @@ impl Clock for UnixIO { fn try_pwritev_raw( fd: RawFd, off: u64, - bufs: &[Arc>], + bufs: &[Arc], start_idx: usize, start_off: usize, ) -> std::io::Result { @@ -212,10 +208,9 @@ fn try_pwritev_raw( let mut last_end: Option<(*const u8, usize)> = None; let mut iov_count = 0; for (i, b) in bufs.iter().enumerate().skip(start_idx).take(iov_len) { - let r = b.borrow(); // borrow just to get pointer/len - let s = r.as_slice(); + let s = b.as_slice(); let ptr = if i == start_idx { &s[start_off..] } else { s }.as_ptr(); - let len = r.len(); + let len = b.len(); if let Some((last_ptr, last_len)) = last_end { // Check if this buffer is adjacent to the last @@ -300,7 +295,7 @@ impl IO for UnixIO { .lock() .map_err(|e| LimboError::LockingError(e.to_string()))?; let r = c.as_read(); - let mut buf = r.buf_mut(); + let buf = r.buf(); match rustix::io::pread(f.as_fd(), buf.as_mut_slice(), pos as u64) { Ok(n) => c.complete(n as i32), Err(Errno::AGAIN) => { @@ -319,8 +314,7 @@ impl IO for UnixIO { let f = file .lock() .map_err(|e| LimboError::LockingError(e.to_string()))?; - let b = buf.borrow(); - match rustix::io::pwrite(f.as_fd(), b.as_slice(), pos as u64) { + match rustix::io::pwrite(f.as_fd(), buf.as_slice(), pos as u64) { Ok(n) => c.complete(n as i32), Err(Errno::AGAIN) => { unsafe { self.poller.as_mut().add(&f.as_fd(), Event::writable(key))? }; @@ -343,10 +337,7 @@ impl IO for UnixIO { // advance through buffers let mut rem = written; while rem > 0 { - let len = { - let r = bufs[idx].borrow(); - r.len() - }; + let len = bufs[idx].len(); let left = len - off; if rem < left { off += rem; @@ -431,13 +422,13 @@ enum CompletionCallback { Write( Arc>, Completion, - Arc>, + Arc, usize, ), Writev( Arc>, Completion, - Vec>>, + Vec>, usize, // absolute file offset usize, // buf index usize, // intra-buf offset @@ -498,7 +489,7 @@ impl File for UnixFile<'_> { let file = self.file.lock().unwrap(); let result = { let r = c.as_read(); - let mut buf = r.buf_mut(); + let buf = r.buf(); rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64) }; match result { @@ -527,17 +518,9 @@ impl File for UnixFile<'_> { } #[instrument(err, skip_all, level = Level::TRACE)] - fn pwrite( - &self, - pos: usize, - buffer: Arc>, - c: Completion, - ) -> Result { + fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { let file = self.file.lock().unwrap(); - let result = { - let buf = buffer.borrow(); - rustix::io::pwrite(file.as_fd(), buf.as_slice(), pos as u64) - }; + let result = { rustix::io::pwrite(file.as_fd(), buffer.as_slice(), pos as u64) }; match result { Ok(n) => { trace!("pwrite n: {}", n); @@ -565,7 +548,7 @@ impl File for UnixFile<'_> { fn pwritev( &self, pos: usize, - buffers: Vec>>, + buffers: Vec>, c: Completion, ) -> Result { if buffers.len().eq(&1) { diff --git a/core/io/vfs.rs b/core/io/vfs.rs index 256ba4494..e7446550d 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -2,7 +2,6 @@ use super::{Buffer, Completion, File, MemoryIO, OpenFlags, IO}; use crate::ext::VfsMod; use crate::io::clock::{Clock, Instant}; use crate::{LimboError, Result}; -use std::cell::RefCell; use std::ffi::{c_void, CString}; use std::sync::Arc; use turso_ext::{VfsFileImpl, VfsImpl}; @@ -101,7 +100,7 @@ impl File for VfsFileImpl { fn pread(&self, pos: usize, c: Completion) -> Result { let r = c.as_read(); let result = { - let mut buf = r.buf_mut(); + let buf = r.buf(); let count = buf.len(); let vfs = unsafe { &*self.vfs }; unsafe { (vfs.read)(self.file, buf.as_mut_ptr(), count, pos as i64) } @@ -114,26 +113,14 @@ impl File for VfsFileImpl { } } - fn pwrite( - &self, - pos: usize, - buffer: Arc>, - c: Completion, - ) -> Result { - let buf = buffer.borrow(); - let count = buf.as_slice().len(); + fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { + let count = buffer.as_slice().len(); if self.vfs.is_null() { return Err(LimboError::ExtensionError("VFS is null".to_string())); } let vfs = unsafe { &*self.vfs }; - let result = unsafe { - (vfs.write)( - self.file, - buf.as_slice().as_ptr() as *mut u8, - count, - pos as i64, - ) - }; + let result = + unsafe { (vfs.write)(self.file, buffer.as_ptr() as *mut u8, count, pos as i64) }; if result < 0 { Err(LimboError::ExtensionError("pwrite failed".to_string())) diff --git a/core/io/windows.rs b/core/io/windows.rs index 26d80cfc7..465325101 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -1,7 +1,6 @@ use super::MemoryIO; use crate::{Clock, Completion, File, Instant, LimboError, OpenFlags, Result, IO}; use parking_lot::RwLock; -use std::cell::RefCell; use std::io::{Read, Seek, Write}; use std::sync::Arc; use tracing::{debug, instrument, trace, Level}; @@ -89,7 +88,7 @@ impl File for WindowsFile { file.seek(std::io::SeekFrom::Start(pos as u64))?; let nr = { let r = c.as_read(); - let mut buf = r.buf_mut(); + let buf = r.buf(); let buf = buf.as_mut_slice(); file.read_exact(buf)?; buf.len() as i32 @@ -99,16 +98,10 @@ impl File for WindowsFile { } #[instrument(skip(self, c, buffer), level = Level::TRACE)] - fn pwrite( - &self, - pos: usize, - buffer: Arc>, - c: Completion, - ) -> Result { + fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { let mut file = self.file.write(); file.seek(std::io::SeekFrom::Start(pos as u64))?; - let buf = buffer.borrow(); - let buf = buf.as_slice(); + let buf = buffer.as_slice(); file.write_all(buf)?; c.complete(buffer.borrow().len() as i32); Ok(c) diff --git a/core/lib.rs b/core/lib.rs index 15877333e..903e1b552 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -1146,7 +1146,7 @@ impl Connection { let content = page_ref.get_contents(); // empty read - attempt to read absent page - if content.buffer.borrow().is_empty() { + if content.buffer.is_empty() { return Ok(false); } page.copy_from_slice(content.as_ptr()); diff --git a/core/storage/btree.rs b/core/storage/btree.rs index d4c9cf59c..ff56d2857 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -7350,10 +7350,7 @@ mod tests { let drop_fn = Rc::new(|_| {}); let inner = PageContent::new( 0, - Arc::new(RefCell::new(Buffer::new( - BufferData::new(vec![0; 4096]), - drop_fn, - ))), + Arc::new(Buffer::new(BufferData::new(vec![0; 4096]), drop_fn)), ); page.get().contents.replace(inner); let page = Arc::new(BTreePageInner { @@ -8644,15 +8641,14 @@ mod tests { while current_page <= 4 { let drop_fn = Rc::new(|_buf| {}); #[allow(clippy::arc_with_non_send_sync)] - let buf = Arc::new(RefCell::new(Buffer::allocate( + let buf = Arc::new(Buffer::allocate( pager .io .block(|| pager.with_header(|header| header.page_size))? .get() as usize, drop_fn, - ))); + )); let c = Completion::new_write(|_| {}); - #[allow(clippy::arc_with_non_send_sync)] let _c = pager .db_file .write_page(current_page as usize, buf.clone(), c)?; diff --git a/core/storage/database.rs b/core/storage/database.rs index 671e4a0da..3687cacb8 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -1,6 +1,6 @@ use crate::error::LimboError; use crate::{io::Completion, Buffer, Result}; -use std::{cell::RefCell, sync::Arc}; +use std::sync::Arc; use tracing::{instrument, Level}; /// DatabaseStorage is an interface a database file that consists of pages. @@ -10,17 +10,13 @@ use tracing::{instrument, Level}; /// or something like a remote page server service. pub trait DatabaseStorage: Send + Sync { fn read_page(&self, page_idx: usize, c: Completion) -> Result; - fn write_page( - &self, - page_idx: usize, - buffer: Arc>, - c: Completion, - ) -> Result; + fn write_page(&self, page_idx: usize, buffer: Arc, c: Completion) + -> Result; fn write_pages( &self, first_page_idx: usize, page_size: usize, - buffers: Vec>>, + buffers: Vec>, c: Completion, ) -> Result; fn sync(&self, c: Completion) -> Result; @@ -56,10 +52,10 @@ impl DatabaseStorage for DatabaseFile { fn write_page( &self, page_idx: usize, - buffer: Arc>, + buffer: Arc, c: Completion, ) -> Result { - let buffer_size = buffer.borrow().len(); + let buffer_size = buffer.len(); assert!(page_idx > 0); assert!(buffer_size >= 512); assert!(buffer_size <= 65536); @@ -72,7 +68,7 @@ impl DatabaseStorage for DatabaseFile { &self, page_idx: usize, page_size: usize, - buffers: Vec>>, + buffers: Vec>, c: Completion, ) -> Result { assert!(page_idx > 0); diff --git a/core/storage/page_cache.rs b/core/storage/page_cache.rs index a892a32fc..2a2dcc04f 100644 --- a/core/storage/page_cache.rs +++ b/core/storage/page_cache.rs @@ -626,7 +626,7 @@ mod tests { use crate::storage::pager::{Page, PageRef}; use crate::storage::sqlite3_ondisk::PageContent; use std::ptr::NonNull; - use std::{cell::RefCell, num::NonZeroUsize, pin::Pin, rc::Rc, sync::Arc}; + use std::{num::NonZeroUsize, pin::Pin, rc::Rc, sync::Arc}; use lru::LruCache; use rand_chacha::{ @@ -646,7 +646,7 @@ mod tests { let buffer = Buffer::new(Pin::new(vec![0; 4096]), buffer_drop_fn); let page_content = PageContent { offset: 0, - buffer: Arc::new(RefCell::new(buffer)), + buffer: Arc::new(buffer), overflow_cells: Vec::new(), }; page.get().contents = Some(page_content); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index b744b9e36..210af9076 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -12,7 +12,6 @@ use crate::{return_if_io, Completion, TransactionState}; use crate::{turso_assert, Buffer, Connection, LimboError, Result}; use parking_lot::RwLock; use std::cell::{Cell, OnceCell, RefCell, UnsafeCell}; -use std::cell::{Ref, RefMut}; use std::collections::HashSet; use std::hash; use std::rc::Rc; @@ -26,7 +25,7 @@ use super::sqlite3_ondisk::begin_write_btree_page; use super::wal::CheckpointMode; #[cfg(not(feature = "omit_autovacuum"))] -use {crate::io::Buffer as IoBuffer, ptrmap::*}; +use ptrmap::*; pub struct HeaderRef(PageRef); @@ -49,12 +48,10 @@ impl HeaderRef { Ok(IOResult::Done(Self(page))) } - pub fn borrow(&self) -> Ref<'_, DatabaseHeader> { + pub fn borrow(&self) -> &DatabaseHeader { // TODO: Instead of erasing mutability, implement `get_mut_contents` and return a shared reference. let content: &PageContent = self.0.get_contents(); - Ref::map(content.buffer.borrow(), |buffer| { - bytemuck::from_bytes::(&buffer.as_slice()[0..DatabaseHeader::SIZE]) - }) + bytemuck::from_bytes::(&content.buffer.as_slice()[0..DatabaseHeader::SIZE]) } } @@ -81,13 +78,11 @@ impl HeaderRefMut { Ok(IOResult::Done(Self(page))) } - pub fn borrow_mut(&self) -> RefMut<'_, DatabaseHeader> { + pub fn borrow_mut(&self) -> &mut DatabaseHeader { let content = self.0.get_contents(); - RefMut::map(content.buffer.borrow_mut(), |buffer| { - bytemuck::from_bytes_mut::( - &mut buffer.as_mut_slice()[0..DatabaseHeader::SIZE], - ) - }) + bytemuck::from_bytes_mut::( + &mut content.buffer.as_mut_slice()[0..DatabaseHeader::SIZE], + ) } } @@ -603,8 +598,7 @@ impl Pager { } }; - let page_buffer_guard: std::cell::Ref = page_content.buffer.borrow(); - let full_buffer_slice: &[u8] = page_buffer_guard.as_slice(); + let full_buffer_slice: &[u8] = page_content.buffer.as_slice(); // Ptrmap pages are not page 1, so their internal offset within their buffer should be 0. // The actual page data starts at page_content.offset within the full_buffer_slice. @@ -695,8 +689,7 @@ impl Pager { } }; - let mut page_buffer_guard = page_content.buffer.borrow_mut(); - let full_buffer_slice = page_buffer_guard.as_mut_slice(); + let full_buffer_slice = page_content.buffer.as_mut_slice(); if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > full_buffer_slice.len() { return Err(LimboError::InternalError(format!( @@ -1542,7 +1535,7 @@ impl Pager { const TRUNK_PAGE_LEAF_COUNT_OFFSET: usize = 4; // Offset to leaf count let header_ref = self.io.block(|| HeaderRefMut::from_pager(self))?; - let mut header = header_ref.borrow_mut(); + let header = header_ref.borrow_mut(); let mut state = self.free_page_state.borrow_mut(); tracing::debug!(?state); @@ -1742,7 +1735,7 @@ impl Pager { const FREELIST_TRUNK_OFFSET_FIRST_LEAF: usize = 8; let header_ref = self.io.block(|| HeaderRefMut::from_pager(self))?; - let mut header = header_ref.borrow_mut(); + let header = header_ref.borrow_mut(); loop { let mut state = self.allocate_page_state.borrow_mut(); @@ -2032,15 +2025,15 @@ impl Pager { return Ok(IOResult::IO); }; let header = header_ref.borrow(); - Ok(IOResult::Done(f(&header))) + Ok(IOResult::Done(f(header))) } pub fn with_header_mut(&self, f: impl Fn(&mut DatabaseHeader) -> T) -> Result> { let IOResult::Done(header_ref) = HeaderRefMut::from_pager(self)? else { return Ok(IOResult::IO); }; - let mut header = header_ref.borrow_mut(); - Ok(IOResult::Done(f(&mut header))) + let header = header_ref.borrow_mut(); + Ok(IOResult::Done(f(header))) } } @@ -2052,7 +2045,7 @@ pub fn allocate_new_page(page_id: usize, buffer_pool: &Arc, offset: let drop_fn = Rc::new(move |buf| { bp.put(buf); }); - let buffer = Arc::new(RefCell::new(Buffer::new(buffer, drop_fn))); + let buffer = Arc::new(Buffer::new(buffer, drop_fn)); page.set_loaded(); page.get().contents = Some(PageContent::new(offset, buffer)); } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index c766f7518..4d900eee9 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -430,12 +430,12 @@ pub struct PageContent { /// the position where page content starts. it's 100 for page 1(database file header is 100 bytes), /// 0 for all other pages. pub offset: usize, - pub buffer: Arc>, + pub buffer: Arc, pub overflow_cells: Vec, } impl PageContent { - pub fn new(offset: usize, buffer: Arc>) -> Self { + pub fn new(offset: usize, buffer: Arc) -> Self { Self { offset, buffer, @@ -453,12 +453,7 @@ impl PageContent { #[allow(clippy::mut_from_ref)] pub fn as_ptr(&self) -> &mut [u8] { - unsafe { - // unsafe trick to borrow twice - let buf_pointer = &self.buffer.as_ptr(); - let buf = (*buf_pointer).as_mut().unwrap().as_mut_slice(); - buf - } + self.buffer.as_mut_slice() } pub fn read_u8(&self, pos: usize) -> u8 { @@ -789,16 +784,16 @@ pub fn begin_read_page( buffer_pool.put(buf); }); #[allow(clippy::arc_with_non_send_sync)] - let buf = Arc::new(RefCell::new(Buffer::new(buf, drop_fn))); - let complete = Box::new(move |mut buf: Arc>, bytes_read: i32| { - let buf_len = buf.borrow().len(); + let buf = Arc::new(Buffer::new(buf, drop_fn)); + let complete = Box::new(move |mut buf: Arc, bytes_read: i32| { + let buf_len = buf.len(); turso_assert!( (allow_empty_read && bytes_read == 0) || bytes_read == buf_len as i32, "read({bytes_read}) != expected({buf_len})" ); let page = page.clone(); if bytes_read == 0 { - buf = Arc::new(RefCell::new(Buffer::allocate(0, Rc::new(|_| {})))); + buf = Arc::new(Buffer::allocate(0, Rc::new(|_| {}))); } if finish_read_page(page_idx, buf, page.clone()).is_err() { page.set_error(); @@ -809,11 +804,7 @@ pub fn begin_read_page( } #[instrument(skip_all, level = Level::INFO)] -pub fn finish_read_page( - page_idx: usize, - buffer_ref: Arc>, - page: PageRef, -) -> Result<()> { +pub fn finish_read_page(page_idx: usize, buffer_ref: Arc, page: PageRef) -> Result<()> { tracing::trace!(page_idx); let pos = if page_idx == DatabaseHeader::PAGE_ID { DatabaseHeader::SIZE @@ -854,7 +845,7 @@ pub fn begin_write_btree_page( Box::new(move |bytes_written: i32| { tracing::trace!("finish_write_btree_page"); let buf_copy = buf_copy.clone(); - let buf_len = buf_copy.borrow().len(); + let buf_len = buf_copy.len(); *clone_counter.borrow_mut() -= 1; page_finish.clear_dirty(); @@ -886,7 +877,7 @@ pub fn begin_write_btree_page( /// for 3 total syscalls instead of 9. pub fn write_pages_vectored( pager: &Pager, - batch: BTreeMap>>, + batch: BTreeMap>, ) -> Result { if batch.is_empty() { return Ok(PendingFlush::default()); @@ -1495,7 +1486,7 @@ pub fn read_entire_wal_dumb(file: &Arc) -> Result) -> Result = Box::new(move |buf: Arc>, bytes_read: i32| { - let buf = buf.borrow(); + let complete: Box = Box::new(move |buf: Arc, bytes_read: i32| { let buf_slice = buf.as_slice(); turso_assert!( bytes_read == buf_slice.len() as i32, @@ -1704,14 +1694,14 @@ pub fn begin_read_wal_frame_raw( io: &Arc, offset: usize, page_size: u32, - complete: Box>, i32)>, + complete: Box, i32)>, ) -> Result { tracing::trace!("begin_read_wal_frame_raw(offset={})", offset); let drop_fn = Rc::new(|_buf| {}); - let buf = Arc::new(RefCell::new(Buffer::allocate( + let buf = Arc::new(Buffer::allocate( page_size as usize + WAL_FRAME_HEADER_SIZE, drop_fn, - ))); + )); #[allow(clippy::arc_with_non_send_sync)] let c = Completion::new_read(buf, complete); let c = io.pread(offset, c)?; @@ -1722,7 +1712,7 @@ pub fn begin_read_wal_frame( io: &Arc, offset: usize, buffer_pool: Arc, - complete: Box>, i32)>, + complete: Box, i32)>, ) -> Result { tracing::trace!("begin_read_wal_frame(offset={})", offset); let buf = buffer_pool.get(); @@ -1730,7 +1720,7 @@ pub fn begin_read_wal_frame( let buffer_pool = buffer_pool.clone(); buffer_pool.put(buf); }); - let buf = Arc::new(RefCell::new(Buffer::new(buf, drop_fn))); + let buf = Arc::new(Buffer::new(buf, drop_fn)); #[allow(clippy::arc_with_non_send_sync)] let c = Completion::new_read(buf, complete); let c = io.pread(offset, c)?; @@ -1763,11 +1753,11 @@ pub fn prepare_wal_frame( page_number: u32, db_size: u32, page: &[u8], -) -> ((u32, u32), Arc>) { +) -> ((u32, u32), Arc) { tracing::trace!(page_number); let drop_fn = Rc::new(|_buf| {}); - let mut buffer = Buffer::allocate(page_size as usize + WAL_FRAME_HEADER_SIZE, drop_fn); + let buffer = Buffer::allocate(page_size as usize + WAL_FRAME_HEADER_SIZE, drop_fn); let frame = buffer.as_mut_slice(); frame[WAL_FRAME_HEADER_SIZE..].copy_from_slice(page); @@ -1788,7 +1778,7 @@ pub fn prepare_wal_frame( frame[16..20].copy_from_slice(&final_checksum.0.to_be_bytes()); frame[20..24].copy_from_slice(&final_checksum.1.to_be_bytes()); - (final_checksum, Arc::new(RefCell::new(buffer))) + (final_checksum, Arc::new(buffer)) } pub fn begin_write_wal_header(io: &Arc, header: &WalHeader) -> Result { @@ -1796,7 +1786,7 @@ pub fn begin_write_wal_header(io: &Arc, header: &WalHeader) -> Result< let buffer = { let drop_fn = Rc::new(|_buf| {}); - let mut buffer = Buffer::allocate(WAL_HEADER_SIZE, drop_fn); + let buffer = Buffer::allocate(WAL_HEADER_SIZE, drop_fn); let buf = buffer.as_mut_slice(); buf[0..4].copy_from_slice(&header.magic.to_be_bytes()); @@ -1809,13 +1799,13 @@ pub fn begin_write_wal_header(io: &Arc, header: &WalHeader) -> Result< buf[28..32].copy_from_slice(&header.checksum_2.to_be_bytes()); #[allow(clippy::arc_with_non_send_sync)] - Arc::new(RefCell::new(buffer)) + Arc::new(buffer) }; let cloned = buffer.clone(); let write_complete = move |bytes_written: i32| { // make sure to reference buffer so it's alive for async IO - let _buf = cloned.borrow(); + let _buf = cloned.clone(); turso_assert!( bytes_written == WAL_HEADER_SIZE as i32, "wal header wrote({bytes_written}) != expected({WAL_HEADER_SIZE})" diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 8041d3495..b96d0bf26 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -312,7 +312,7 @@ type PageId = usize; /// Batch is a collection of pages that are being checkpointed together. It is used to /// aggregate contiguous pages into a single write operation to the database file. pub(super) struct Batch { - items: BTreeMap>>, + items: BTreeMap>, } // TODO(preston): implement the same thing for `readv` impl Batch { @@ -339,7 +339,7 @@ impl Batch { let raw = pool.get(); let pool_clone = pool.clone(); let drop_fn = Rc::new(move |b| pool_clone.put(b)); - let new_buf = Arc::new(RefCell::new(Buffer::new(raw, drop_fn))); + let new_buf = Arc::new(Buffer::new(raw, drop_fn)); unsafe { let inner = &mut *scratch.inner.get(); @@ -351,7 +351,7 @@ impl Batch { } impl std::ops::Deref for Batch { - type Target = BTreeMap>>; + type Target = BTreeMap>; fn deref(&self) -> &Self::Target { &self.items } @@ -901,8 +901,8 @@ impl Wal for WalFile { let offset = self.frame_offset(frame_id); page.set_locked(); let frame = page.clone(); - let complete = Box::new(move |buf: Arc>, bytes_read: i32| { - let buf_len = buf.borrow().len(); + let complete = Box::new(move |buf: Arc, bytes_read: i32| { + let buf_len = buf.len(); turso_assert!( bytes_read == buf_len as i32, "read({bytes_read}) less than expected({buf_len})" @@ -923,8 +923,7 @@ impl Wal for WalFile { tracing::debug!("read_frame({})", frame_id); let offset = self.frame_offset(frame_id); let (frame_ptr, frame_len) = (frame.as_mut_ptr(), frame.len()); - let complete = Box::new(move |buf: Arc>, bytes_read: i32| { - let buf = buf.borrow(); + let complete = Box::new(move |buf: Arc, bytes_read: i32| { let buf_len = buf.len(); turso_assert!( bytes_read == buf_len as i32, @@ -971,8 +970,7 @@ impl Wal for WalFile { let (page_ptr, page_len) = (page.as_ptr(), page.len()); let complete = Box::new({ let conflict = conflict.clone(); - move |buf: Arc>, bytes_read: i32| { - let buf = buf.borrow(); + move |buf: Arc, bytes_read: i32| { let buf_len = buf.len(); turso_assert!( bytes_read == buf_len as i32, @@ -1061,7 +1059,7 @@ impl Wal for WalFile { let frame_bytes = frame_bytes.clone(); let write_counter = write_counter.clone(); move |bytes_written| { - let frame_len = frame_bytes.borrow().len(); + let frame_len = frame_bytes.len(); turso_assert!( bytes_written == frame_len as i32, "wrote({bytes_written}) != expected({frame_len})" @@ -1226,10 +1224,8 @@ impl WalFile { let drop_fn = Rc::new(move |buf| { buffer_pool.put(buf); }); - checkpoint_page.get().contents = Some(PageContent::new( - 0, - Arc::new(RefCell::new(Buffer::new(buffer, drop_fn))), - )); + checkpoint_page.get().contents = + Some(PageContent::new(0, Arc::new(Buffer::new(buffer, drop_fn)))); } let header = unsafe { shared.get().as_mut().unwrap().wal_header.lock() }; diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 746e3e6cd..3ee943555 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -9,10 +9,7 @@ use tempfile; use crate::{ error::LimboError, - io::{ - Buffer, BufferData, Completion, CompletionType, File, OpenFlags, ReadCompletion, - WriteCompletion, IO, - }, + io::{Buffer, BufferData, Completion, File, OpenFlags, IO}, storage::sqlite3_ondisk::{read_varint, varint_len, write_varint}, translate::collate::CollationSeq, turso_assert, @@ -369,14 +366,14 @@ impl SortedChunk { let drop_fn = Rc::new(|_buffer: BufferData| {}); let read_buffer = Buffer::allocate(read_buffer_size, drop_fn); - let read_buffer_ref = Arc::new(RefCell::new(read_buffer)); + let read_buffer_ref = Arc::new(read_buffer); let chunk_io_state_copy = self.io_state.clone(); let stored_buffer_copy = self.buffer.clone(); let stored_buffer_len_copy = self.buffer_len.clone(); let total_bytes_read_copy = self.total_bytes_read.clone(); - let read_complete = Box::new(move |buf: Arc>, bytes_read: i32| { - let read_buf_ref = buf.borrow(); + let read_complete = Box::new(move |buf: Arc, bytes_read: i32| { + let read_buf_ref = buf.clone(); let read_buf = read_buf_ref.as_slice(); let bytes_read = bytes_read as usize; @@ -398,10 +395,7 @@ impl SortedChunk { total_bytes_read_copy.set(total_bytes_read_copy.get() + bytes_read); }); - let c = Completion::new(CompletionType::Read(ReadCompletion::new( - read_buffer_ref, - read_complete, - ))); + let c = Completion::new_read(read_buffer_ref, read_complete); let _c = self.file.pread(self.total_bytes_read.get(), c)?; Ok(()) } @@ -421,7 +415,7 @@ impl SortedChunk { } let drop_fn = Rc::new(|_buffer: BufferData| {}); - let mut buffer = Buffer::allocate(self.chunk_size, drop_fn); + let buffer = Buffer::allocate(self.chunk_size, drop_fn); let mut buf_pos = 0; let buf = buffer.as_mut_slice(); @@ -435,19 +429,19 @@ impl SortedChunk { buf_pos += payload.len(); } - let buffer_ref = Arc::new(RefCell::new(buffer)); + let buffer_ref = Arc::new(buffer); let buffer_ref_copy = buffer_ref.clone(); let chunk_io_state_copy = self.io_state.clone(); let write_complete = Box::new(move |bytes_written: i32| { chunk_io_state_copy.set(SortedChunkIOState::WriteComplete); - let buf_len = buffer_ref_copy.borrow().len(); + let buf_len = buffer_ref_copy.len(); if bytes_written < buf_len as i32 { tracing::error!("wrote({bytes_written}) less than expected({buf_len})"); } }); - let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete))); + let c = Completion::new_write(write_complete); let _c = self.file.pwrite(0, buffer_ref, c)?; Ok(()) } diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index e21ab2d07..076e2ed9e 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -173,7 +173,7 @@ impl File for SimulatorFile { fn pwrite( &self, pos: usize, - buffer: Arc>, + buffer: Arc, c: turso_core::Completion, ) -> Result { self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1); @@ -225,7 +225,7 @@ impl File for SimulatorFile { fn pwritev( &self, pos: usize, - buffers: Vec>>, + buffers: Vec>, c: turso_core::Completion, ) -> Result { self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1); diff --git a/tests/integration/query_processing/test_btree.rs b/tests/integration/query_processing/test_btree.rs index a50100b42..f17b947e2 100644 --- a/tests/integration/query_processing/test_btree.rs +++ b/tests/integration/query_processing/test_btree.rs @@ -432,7 +432,7 @@ fn write_at(io: &impl IO, file: Arc, offset: usize, data: &[u8]) { let completion = Completion::new_write(|_| {}); let drop_fn = Rc::new(move |_| {}); #[allow(clippy::arc_with_non_send_sync)] - let buffer = Arc::new(RefCell::new(Buffer::new(Pin::new(data.to_vec()), drop_fn))); + let buffer = Arc::new(Buffer::new(Pin::new(data.to_vec()), drop_fn)); let result = file.pwrite(offset, buffer, completion).unwrap(); while !result.is_completed() { io.run_once().unwrap();