Merge 'Remove RefCell from Buffer in IO trait methods and PageContents' from Preston Thorpe

We are already doing unsafe mutable borrowing in `PageContent`, and once the new BufferPool
is merged, buffers will all be pointers into an arena anyway. We just need to make sure to
clone the `Arc` and reference the buffers in the completion callbacks so they are guaranteed
to stay alive for the duration of async IO.

Naturally, in the true spirit of all my previous PRs, I had to open this one while another
is still in flight, causing an absurd amount of conflicts.
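
A minimal sketch of the keep-alive pattern described above (the `Buffer` and `Completion` types here are simplified stand-ins for illustration, not turso_core's real API):

```rust
use std::sync::Arc;

// Simplified stand-ins for illustration only; not turso_core's actual types.
struct Buffer {
    data: Vec<u8>,
}

impl Buffer {
    fn len(&self) -> usize {
        self.data.len()
    }
}

struct Completion {
    callback: Box<dyn Fn(i32)>,
}

impl Completion {
    fn new_write<F: Fn(i32) + 'static>(callback: F) -> Self {
        Self { callback: Box::new(callback) }
    }

    fn complete(&self, bytes_written: i32) {
        (self.callback)(bytes_written);
    }
}

// Submitting a write: the callback captures a clone of the Arc, so the buffer
// stays alive until the (possibly async) IO completes, even if the caller
// drops its own handle in the meantime.
fn submit_write(buffer: Arc<Buffer>) -> Completion {
    let cloned = buffer.clone();
    Completion::new_write(move |bytes_written| {
        // Referencing the clone keeps the allocation alive for async IO.
        let expected = cloned.len() as i32;
        assert_eq!(bytes_written, expected);
    })
}

fn main() {
    let buf = Arc::new(Buffer { data: vec![0u8; 4096] });
    let c = submit_write(buf); // caller's handle is moved away here
    c.complete(4096);          // backend reports completion later
}
```

The point is simply that the completion closure owns a clone of the `Arc`, so the caller dropping its handle cannot free the memory while the IO backend still references it.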

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #2456
Committed by Preston Thorpe on 2025-08-05 21:30:27 -04:00 (via GitHub)
18 changed files with 138 additions and 257 deletions

View File

@@ -469,10 +469,10 @@ impl turso_core::DatabaseStorage for DatabaseFile {
fn write_page(
&self,
page_idx: usize,
buffer: Arc<RefCell<turso_core::Buffer>>,
buffer: Arc<turso_core::Buffer>,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
let size = buffer.borrow().len();
let size = buffer.len();
let pos = (page_idx - 1) * size;
self.file.pwrite(pos, buffer, c)
}
@@ -481,7 +481,7 @@ impl turso_core::DatabaseStorage for DatabaseFile {
&self,
page_idx: usize,
page_size: usize,
buffers: Vec<Arc<RefCell<turso_core::Buffer>>>,
buffers: Vec<Arc<turso_core::Buffer>>,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
let pos = page_idx.saturating_sub(1) * page_size;

View File

@@ -91,7 +91,7 @@ impl File for GenericFile {
file.seek(std::io::SeekFrom::Start(pos as u64))?;
{
let r = c.as_read();
let mut buf = r.buf_mut();
let mut buf = r.buf();
let buf = buf.as_mut_slice();
file.read_exact(buf)?;
}
@@ -99,16 +99,10 @@ impl File for GenericFile {
Ok(c)
}
fn pwrite(
&self,
pos: usize,
buffer: Arc<RefCell<crate::Buffer>>,
c: Completion,
) -> Result<Completion> {
fn pwrite(&self, pos: usize, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
let buf = buffer.borrow();
let buf = buf.as_slice();
let buf = buffer.as_slice();
file.write_all(buf)?;
c.complete(buf.len() as i32);
Ok(c)

View File

@@ -183,18 +183,18 @@ struct WritevState {
/// cache the sum of all buffer lengths for the total expected write
total_len: usize,
/// buffers to write
bufs: Vec<Arc<RefCell<crate::Buffer>>>,
bufs: Vec<Arc<crate::Buffer>>,
/// we keep the last iovec allocation alive until final CQE
last_iov_allocation: Option<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>>,
}
impl WritevState {
fn new(file: &UringFile, pos: usize, bufs: Vec<Arc<RefCell<crate::Buffer>>>) -> Self {
fn new(file: &UringFile, pos: usize, bufs: Vec<Arc<crate::Buffer>>) -> Self {
let file_id = file
.id()
.map(Fd::Fixed)
.unwrap_or_else(|| Fd::RawFd(file.as_raw_fd()));
let total_len = bufs.iter().map(|b| b.borrow().len()).sum();
let total_len = bufs.iter().map(|b| b.len()).sum();
Self {
file_id,
file_pos: pos,
@@ -217,10 +217,7 @@ impl WritevState {
fn advance(&mut self, written: usize) {
let mut remaining = written;
while remaining > 0 {
let current_buf_len = {
let r = self.bufs[self.current_buffer_idx].borrow();
r.len()
};
let current_buf_len = self.bufs[self.current_buffer_idx].len();
let left = current_buf_len - self.current_buffer_offset;
if remaining < left {
self.current_buffer_offset += remaining;
@@ -314,9 +311,8 @@ impl WrappedIOUring {
let mut iov_count = 0;
let mut last_end: Option<(*const u8, usize)> = None;
for buffer in st.bufs.iter().skip(st.current_buffer_idx) {
let buf = buffer.borrow();
let ptr = buf.as_ptr();
let len = buf.len();
let ptr = buffer.as_ptr();
let len = buffer.len();
if let Some((last_ptr, last_len)) = last_end {
// Check if this buffer is adjacent to the last
if unsafe { last_ptr.add(last_len) } == ptr {
@@ -591,7 +587,7 @@ impl File for UringFile {
trace!("pread(pos = {}, length = {})", pos, r.buf().len());
let mut io = self.io.borrow_mut();
let read_e = {
let mut buf = r.buf_mut();
let buf = r.buf();
let len = buf.len();
let buf = buf.as_mut_ptr();
with_fd!(self, |fd| {
@@ -605,18 +601,12 @@ impl File for UringFile {
Ok(c)
}
fn pwrite(
&self,
pos: usize,
buffer: Arc<RefCell<crate::Buffer>>,
c: Completion,
) -> Result<Completion> {
fn pwrite(&self, pos: usize, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
let mut io = self.io.borrow_mut();
let write = {
let buf = buffer.borrow();
trace!("pwrite(pos = {}, length = {})", pos, buf.len());
trace!("pwrite(pos = {}, length = {})", pos, buffer.len());
with_fd!(self, |fd| {
io_uring::opcode::Write::new(fd, buf.as_ptr(), buf.len() as u32)
io_uring::opcode::Write::new(fd, buffer.as_ptr(), buffer.len() as u32)
.offset(pos as u64)
.build()
.user_data(get_key(c.clone()))
@@ -641,7 +631,7 @@ impl File for UringFile {
fn pwritev(
&self,
pos: usize,
bufs: Vec<Arc<RefCell<crate::Buffer>>>,
bufs: Vec<Arc<crate::Buffer>>,
c: Completion,
) -> Result<Completion> {
// for a single buffer use pwrite directly

View File

@@ -3,7 +3,7 @@ use crate::{LimboError, Result};
use crate::io::clock::Instant;
use std::{
cell::{Cell, RefCell, UnsafeCell},
cell::{Cell, UnsafeCell},
collections::{BTreeMap, HashMap},
sync::{Arc, Mutex},
};
@@ -119,7 +119,7 @@ impl File for MemoryFile {
let read_len = buf_len.min(file_size - pos);
{
let mut read_buf = r.buf_mut();
let read_buf = r.buf();
let mut offset = pos;
let mut remaining = read_len;
let mut buf_offset = 0;
@@ -144,14 +144,8 @@ impl File for MemoryFile {
Ok(c)
}
fn pwrite(
&self,
pos: usize,
buffer: Arc<RefCell<Buffer>>,
c: Completion,
) -> Result<Completion> {
let buf = buffer.borrow();
let buf_len = buf.len();
fn pwrite(&self, pos: usize, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
let buf_len = buffer.len();
if buf_len == 0 {
c.complete(0);
return Ok(c);
@@ -160,7 +154,7 @@ impl File for MemoryFile {
let mut offset = pos;
let mut remaining = buf_len;
let mut buf_offset = 0;
let data = &buf.as_slice();
let data = &buffer.as_slice();
while remaining > 0 {
let page_no = offset / PAGE_SIZE;
@@ -204,25 +198,19 @@ impl File for MemoryFile {
Ok(c)
}
fn pwritev(
&self,
pos: usize,
buffers: Vec<Arc<RefCell<Buffer>>>,
c: Completion,
) -> Result<Completion> {
fn pwritev(&self, pos: usize, buffers: Vec<Arc<Buffer>>, c: Completion) -> Result<Completion> {
let mut offset = pos;
let mut total_written = 0;
for buffer in buffers {
let buf = buffer.borrow();
let buf_len = buf.len();
let buf_len = buffer.len();
if buf_len == 0 {
continue;
}
let mut remaining = buf_len;
let mut buf_offset = 0;
let data = &buf.as_slice();
let data = &buffer.as_slice();
while remaining > 0 {
let page_no = offset / PAGE_SIZE;
@@ -252,12 +240,6 @@ impl File for MemoryFile {
}
}
impl Drop for MemoryFile {
fn drop(&mut self) {
// no-op
}
}
impl MemoryFile {
#[allow(clippy::mut_from_ref)]
fn get_or_allocate_page(&self, page_no: usize) -> &mut MemPage {

View File

@@ -4,27 +4,15 @@ use cfg_block::cfg_block;
use std::fmt;
use std::ptr::NonNull;
use std::sync::Arc;
use std::{
cell::{Cell, Ref, RefCell, RefMut},
fmt::Debug,
mem::ManuallyDrop,
pin::Pin,
rc::Rc,
};
use std::{cell::Cell, fmt::Debug, mem::ManuallyDrop, pin::Pin, rc::Rc};
pub trait File: Send + Sync {
fn lock_file(&self, exclusive: bool) -> Result<()>;
fn unlock_file(&self) -> Result<()>;
fn pread(&self, pos: usize, c: Completion) -> Result<Completion>;
fn pwrite(&self, pos: usize, buffer: Arc<RefCell<Buffer>>, c: Completion)
-> Result<Completion>;
fn pwrite(&self, pos: usize, buffer: Arc<Buffer>, c: Completion) -> Result<Completion>;
fn sync(&self, c: Completion) -> Result<Completion>;
fn pwritev(
&self,
pos: usize,
buffers: Vec<Arc<RefCell<Buffer>>>,
c: Completion,
) -> Result<Completion> {
fn pwritev(&self, pos: usize, buffers: Vec<Arc<Buffer>>, c: Completion) -> Result<Completion> {
use std::sync::atomic::{AtomicUsize, Ordering};
if buffers.is_empty() {
c.complete(0);
@@ -36,12 +24,15 @@ pub trait File: Send + Sync {
let total_written = Arc::new(AtomicUsize::new(0));
for buf in buffers {
let len = buf.borrow().len();
let len = buf.len();
let child_c = {
let c_main = c.clone();
let outstanding = outstanding.clone();
let total_written = total_written.clone();
let _cloned = buf.clone();
Completion::new_write(move |n| {
// reference buffer in callback to ensure alive for async io
let _buf = _cloned.clone();
// accumulate bytes actually reported by the backend
total_written.fetch_add(n as usize, Ordering::Relaxed);
if outstanding.fetch_sub(1, Ordering::AcqRel) == 1 {
@@ -98,7 +89,7 @@ pub trait IO: Clock + Send + Sync {
}
}
pub type Complete = dyn Fn(Arc<RefCell<Buffer>>, i32);
pub type Complete = dyn Fn(Arc<Buffer>, i32);
pub type WriteComplete = dyn Fn(i32);
pub type SyncComplete = dyn Fn(i32);
pub type TruncateComplete = dyn Fn(i32);
@@ -122,7 +113,7 @@ pub enum CompletionType {
}
pub struct ReadCompletion {
pub buf: Arc<RefCell<Buffer>>,
pub buf: Arc<Buffer>,
pub complete: Box<Complete>,
}
@@ -145,9 +136,9 @@ impl Completion {
))))
}
pub fn new_read<F>(buf: Arc<RefCell<Buffer>>, complete: F) -> Self
pub fn new_read<F>(buf: Arc<Buffer>, complete: F) -> Self
where
F: Fn(Arc<RefCell<Buffer>>, i32) + 'static,
F: Fn(Arc<Buffer>, i32) + 'static,
{
Self::new(CompletionType::Read(ReadCompletion::new(
buf,
@@ -215,16 +206,12 @@ pub struct SyncCompletion {
}
impl ReadCompletion {
pub fn new(buf: Arc<RefCell<Buffer>>, complete: Box<Complete>) -> Self {
pub fn new(buf: Arc<Buffer>, complete: Box<Complete>) -> Self {
Self { buf, complete }
}
pub fn buf(&self) -> Ref<'_, Buffer> {
self.buf.borrow()
}
pub fn buf_mut(&self) -> RefMut<'_, Buffer> {
self.buf.borrow_mut()
pub fn buf(&self) -> &Buffer {
&self.buf
}
pub fn complete(&self, bytes_read: i32) {
@@ -311,16 +298,17 @@ impl Buffer {
&self.data
}
pub fn as_mut_slice(&mut self) -> &mut [u8] {
&mut self.data
#[allow(clippy::mut_from_ref)]
pub fn as_mut_slice(&self) -> &mut [u8] {
unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.data.len()) }
}
pub fn as_ptr(&self) -> *const u8 {
self.data.as_ptr()
}
pub fn as_mut_ptr(&mut self) -> *mut u8 {
self.data.as_mut_ptr()
pub fn as_mut_ptr(&self) -> *mut u8 {
self.data.as_ptr() as *mut u8
}
}

View File

@@ -10,11 +10,7 @@ use rustix::{
io::Errno,
};
use std::os::fd::RawFd;
use std::{
cell::{RefCell, UnsafeCell},
mem::MaybeUninit,
sync::Mutex,
};
use std::{cell::UnsafeCell, mem::MaybeUninit, sync::Mutex};
use std::{io::ErrorKind, sync::Arc};
#[cfg(feature = "fs")]
@@ -201,7 +197,7 @@ impl Clock for UnixIO {
fn try_pwritev_raw(
fd: RawFd,
off: u64,
bufs: &[Arc<RefCell<crate::Buffer>>],
bufs: &[Arc<crate::Buffer>],
start_idx: usize,
start_off: usize,
) -> std::io::Result<usize> {
@@ -212,10 +208,9 @@ fn try_pwritev_raw(
let mut last_end: Option<(*const u8, usize)> = None;
let mut iov_count = 0;
for (i, b) in bufs.iter().enumerate().skip(start_idx).take(iov_len) {
let r = b.borrow(); // borrow just to get pointer/len
let s = r.as_slice();
let s = b.as_slice();
let ptr = if i == start_idx { &s[start_off..] } else { s }.as_ptr();
let len = r.len();
let len = b.len();
if let Some((last_ptr, last_len)) = last_end {
// Check if this buffer is adjacent to the last
@@ -300,7 +295,7 @@ impl IO for UnixIO {
.lock()
.map_err(|e| LimboError::LockingError(e.to_string()))?;
let r = c.as_read();
let mut buf = r.buf_mut();
let buf = r.buf();
match rustix::io::pread(f.as_fd(), buf.as_mut_slice(), pos as u64) {
Ok(n) => c.complete(n as i32),
Err(Errno::AGAIN) => {
@@ -319,8 +314,7 @@ impl IO for UnixIO {
let f = file
.lock()
.map_err(|e| LimboError::LockingError(e.to_string()))?;
let b = buf.borrow();
match rustix::io::pwrite(f.as_fd(), b.as_slice(), pos as u64) {
match rustix::io::pwrite(f.as_fd(), buf.as_slice(), pos as u64) {
Ok(n) => c.complete(n as i32),
Err(Errno::AGAIN) => {
unsafe { self.poller.as_mut().add(&f.as_fd(), Event::writable(key))? };
@@ -343,10 +337,7 @@ impl IO for UnixIO {
// advance through buffers
let mut rem = written;
while rem > 0 {
let len = {
let r = bufs[idx].borrow();
r.len()
};
let len = bufs[idx].len();
let left = len - off;
if rem < left {
off += rem;
@@ -431,13 +422,13 @@ enum CompletionCallback {
Write(
Arc<Mutex<std::fs::File>>,
Completion,
Arc<RefCell<crate::Buffer>>,
Arc<crate::Buffer>,
usize,
),
Writev(
Arc<Mutex<std::fs::File>>,
Completion,
Vec<Arc<RefCell<crate::Buffer>>>,
Vec<Arc<crate::Buffer>>,
usize, // absolute file offset
usize, // buf index
usize, // intra-buf offset
@@ -498,7 +489,7 @@ impl File for UnixFile<'_> {
let file = self.file.lock().unwrap();
let result = {
let r = c.as_read();
let mut buf = r.buf_mut();
let buf = r.buf();
rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64)
};
match result {
@@ -527,17 +518,9 @@ impl File for UnixFile<'_> {
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn pwrite(
&self,
pos: usize,
buffer: Arc<RefCell<crate::Buffer>>,
c: Completion,
) -> Result<Completion> {
fn pwrite(&self, pos: usize, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
let file = self.file.lock().unwrap();
let result = {
let buf = buffer.borrow();
rustix::io::pwrite(file.as_fd(), buf.as_slice(), pos as u64)
};
let result = { rustix::io::pwrite(file.as_fd(), buffer.as_slice(), pos as u64) };
match result {
Ok(n) => {
trace!("pwrite n: {}", n);
@@ -565,7 +548,7 @@ impl File for UnixFile<'_> {
fn pwritev(
&self,
pos: usize,
buffers: Vec<Arc<RefCell<crate::Buffer>>>,
buffers: Vec<Arc<crate::Buffer>>,
c: Completion,
) -> Result<Completion> {
if buffers.len().eq(&1) {

View File

@@ -2,7 +2,6 @@ use super::{Buffer, Completion, File, MemoryIO, OpenFlags, IO};
use crate::ext::VfsMod;
use crate::io::clock::{Clock, Instant};
use crate::{LimboError, Result};
use std::cell::RefCell;
use std::ffi::{c_void, CString};
use std::sync::Arc;
use turso_ext::{VfsFileImpl, VfsImpl};
@@ -101,7 +100,7 @@ impl File for VfsFileImpl {
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
let r = c.as_read();
let result = {
let mut buf = r.buf_mut();
let buf = r.buf();
let count = buf.len();
let vfs = unsafe { &*self.vfs };
unsafe { (vfs.read)(self.file, buf.as_mut_ptr(), count, pos as i64) }
@@ -114,26 +113,14 @@ impl File for VfsFileImpl {
}
}
fn pwrite(
&self,
pos: usize,
buffer: Arc<RefCell<Buffer>>,
c: Completion,
) -> Result<Completion> {
let buf = buffer.borrow();
let count = buf.as_slice().len();
fn pwrite(&self, pos: usize, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
let count = buffer.as_slice().len();
if self.vfs.is_null() {
return Err(LimboError::ExtensionError("VFS is null".to_string()));
}
let vfs = unsafe { &*self.vfs };
let result = unsafe {
(vfs.write)(
self.file,
buf.as_slice().as_ptr() as *mut u8,
count,
pos as i64,
)
};
let result =
unsafe { (vfs.write)(self.file, buffer.as_ptr() as *mut u8, count, pos as i64) };
if result < 0 {
Err(LimboError::ExtensionError("pwrite failed".to_string()))

View File

@@ -1,7 +1,6 @@
use super::MemoryIO;
use crate::{Clock, Completion, File, Instant, LimboError, OpenFlags, Result, IO};
use parking_lot::RwLock;
use std::cell::RefCell;
use std::io::{Read, Seek, Write};
use std::sync::Arc;
use tracing::{debug, instrument, trace, Level};
@@ -89,7 +88,7 @@ impl File for WindowsFile {
file.seek(std::io::SeekFrom::Start(pos as u64))?;
let nr = {
let r = c.as_read();
let mut buf = r.buf_mut();
let buf = r.buf();
let buf = buf.as_mut_slice();
file.read_exact(buf)?;
buf.len() as i32
@@ -99,16 +98,10 @@ impl File for WindowsFile {
}
#[instrument(skip(self, c, buffer), level = Level::TRACE)]
fn pwrite(
&self,
pos: usize,
buffer: Arc<RefCell<crate::Buffer>>,
c: Completion,
) -> Result<Completion> {
fn pwrite(&self, pos: usize, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
let mut file = self.file.write();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
let buf = buffer.borrow();
let buf = buf.as_slice();
let buf = buffer.as_slice();
file.write_all(buf)?;
c.complete(buffer.borrow().len() as i32);
Ok(c)

View File

@@ -1146,7 +1146,7 @@ impl Connection {
let content = page_ref.get_contents();
// empty read - attempt to read absent page
if content.buffer.borrow().is_empty() {
if content.buffer.is_empty() {
return Ok(false);
}
page.copy_from_slice(content.as_ptr());

View File

@@ -7350,10 +7350,7 @@ mod tests {
let drop_fn = Rc::new(|_| {});
let inner = PageContent::new(
0,
Arc::new(RefCell::new(Buffer::new(
BufferData::new(vec![0; 4096]),
drop_fn,
))),
Arc::new(Buffer::new(BufferData::new(vec![0; 4096]), drop_fn)),
);
page.get().contents.replace(inner);
let page = Arc::new(BTreePageInner {
@@ -8644,15 +8641,14 @@ mod tests {
while current_page <= 4 {
let drop_fn = Rc::new(|_buf| {});
#[allow(clippy::arc_with_non_send_sync)]
let buf = Arc::new(RefCell::new(Buffer::allocate(
let buf = Arc::new(Buffer::allocate(
pager
.io
.block(|| pager.with_header(|header| header.page_size))?
.get() as usize,
drop_fn,
)));
));
let c = Completion::new_write(|_| {});
#[allow(clippy::arc_with_non_send_sync)]
let _c = pager
.db_file
.write_page(current_page as usize, buf.clone(), c)?;

View File

@@ -1,6 +1,6 @@
use crate::error::LimboError;
use crate::{io::Completion, Buffer, Result};
use std::{cell::RefCell, sync::Arc};
use std::sync::Arc;
use tracing::{instrument, Level};
/// DatabaseStorage is an interface a database file that consists of pages.
@@ -10,17 +10,13 @@ use tracing::{instrument, Level};
/// or something like a remote page server service.
pub trait DatabaseStorage: Send + Sync {
fn read_page(&self, page_idx: usize, c: Completion) -> Result<Completion>;
fn write_page(
&self,
page_idx: usize,
buffer: Arc<RefCell<Buffer>>,
c: Completion,
) -> Result<Completion>;
fn write_page(&self, page_idx: usize, buffer: Arc<Buffer>, c: Completion)
-> Result<Completion>;
fn write_pages(
&self,
first_page_idx: usize,
page_size: usize,
buffers: Vec<Arc<RefCell<Buffer>>>,
buffers: Vec<Arc<Buffer>>,
c: Completion,
) -> Result<Completion>;
fn sync(&self, c: Completion) -> Result<Completion>;
@@ -56,10 +52,10 @@ impl DatabaseStorage for DatabaseFile {
fn write_page(
&self,
page_idx: usize,
buffer: Arc<RefCell<Buffer>>,
buffer: Arc<Buffer>,
c: Completion,
) -> Result<Completion> {
let buffer_size = buffer.borrow().len();
let buffer_size = buffer.len();
assert!(page_idx > 0);
assert!(buffer_size >= 512);
assert!(buffer_size <= 65536);
@@ -72,7 +68,7 @@ impl DatabaseStorage for DatabaseFile {
&self,
page_idx: usize,
page_size: usize,
buffers: Vec<Arc<RefCell<Buffer>>>,
buffers: Vec<Arc<Buffer>>,
c: Completion,
) -> Result<Completion> {
assert!(page_idx > 0);

View File

@@ -626,7 +626,7 @@ mod tests {
use crate::storage::pager::{Page, PageRef};
use crate::storage::sqlite3_ondisk::PageContent;
use std::ptr::NonNull;
use std::{cell::RefCell, num::NonZeroUsize, pin::Pin, rc::Rc, sync::Arc};
use std::{num::NonZeroUsize, pin::Pin, rc::Rc, sync::Arc};
use lru::LruCache;
use rand_chacha::{
@@ -646,7 +646,7 @@ mod tests {
let buffer = Buffer::new(Pin::new(vec![0; 4096]), buffer_drop_fn);
let page_content = PageContent {
offset: 0,
buffer: Arc::new(RefCell::new(buffer)),
buffer: Arc::new(buffer),
overflow_cells: Vec::new(),
};
page.get().contents = Some(page_content);

View File

@@ -12,7 +12,6 @@ use crate::{return_if_io, Completion, TransactionState};
use crate::{turso_assert, Buffer, Connection, LimboError, Result};
use parking_lot::RwLock;
use std::cell::{Cell, OnceCell, RefCell, UnsafeCell};
use std::cell::{Ref, RefMut};
use std::collections::HashSet;
use std::hash;
use std::rc::Rc;
@@ -26,7 +25,7 @@ use super::sqlite3_ondisk::begin_write_btree_page;
use super::wal::CheckpointMode;
#[cfg(not(feature = "omit_autovacuum"))]
use {crate::io::Buffer as IoBuffer, ptrmap::*};
use ptrmap::*;
pub struct HeaderRef(PageRef);
@@ -49,12 +48,10 @@ impl HeaderRef {
Ok(IOResult::Done(Self(page)))
}
pub fn borrow(&self) -> Ref<'_, DatabaseHeader> {
pub fn borrow(&self) -> &DatabaseHeader {
// TODO: Instead of erasing mutability, implement `get_mut_contents` and return a shared reference.
let content: &PageContent = self.0.get_contents();
Ref::map(content.buffer.borrow(), |buffer| {
bytemuck::from_bytes::<DatabaseHeader>(&buffer.as_slice()[0..DatabaseHeader::SIZE])
})
bytemuck::from_bytes::<DatabaseHeader>(&content.buffer.as_slice()[0..DatabaseHeader::SIZE])
}
}
@@ -81,13 +78,11 @@ impl HeaderRefMut {
Ok(IOResult::Done(Self(page)))
}
pub fn borrow_mut(&self) -> RefMut<'_, DatabaseHeader> {
pub fn borrow_mut(&self) -> &mut DatabaseHeader {
let content = self.0.get_contents();
RefMut::map(content.buffer.borrow_mut(), |buffer| {
bytemuck::from_bytes_mut::<DatabaseHeader>(
&mut buffer.as_mut_slice()[0..DatabaseHeader::SIZE],
)
})
bytemuck::from_bytes_mut::<DatabaseHeader>(
&mut content.buffer.as_mut_slice()[0..DatabaseHeader::SIZE],
)
}
}
@@ -603,8 +598,7 @@ impl Pager {
}
};
let page_buffer_guard: std::cell::Ref<IoBuffer> = page_content.buffer.borrow();
let full_buffer_slice: &[u8] = page_buffer_guard.as_slice();
let full_buffer_slice: &[u8] = page_content.buffer.as_slice();
// Ptrmap pages are not page 1, so their internal offset within their buffer should be 0.
// The actual page data starts at page_content.offset within the full_buffer_slice.
@@ -695,8 +689,7 @@ impl Pager {
}
};
let mut page_buffer_guard = page_content.buffer.borrow_mut();
let full_buffer_slice = page_buffer_guard.as_mut_slice();
let full_buffer_slice = page_content.buffer.as_mut_slice();
if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > full_buffer_slice.len() {
return Err(LimboError::InternalError(format!(
@@ -1542,7 +1535,7 @@ impl Pager {
const TRUNK_PAGE_LEAF_COUNT_OFFSET: usize = 4; // Offset to leaf count
let header_ref = self.io.block(|| HeaderRefMut::from_pager(self))?;
let mut header = header_ref.borrow_mut();
let header = header_ref.borrow_mut();
let mut state = self.free_page_state.borrow_mut();
tracing::debug!(?state);
@@ -1742,7 +1735,7 @@ impl Pager {
const FREELIST_TRUNK_OFFSET_FIRST_LEAF: usize = 8;
let header_ref = self.io.block(|| HeaderRefMut::from_pager(self))?;
let mut header = header_ref.borrow_mut();
let header = header_ref.borrow_mut();
loop {
let mut state = self.allocate_page_state.borrow_mut();
@@ -2032,15 +2025,15 @@ impl Pager {
return Ok(IOResult::IO);
};
let header = header_ref.borrow();
Ok(IOResult::Done(f(&header)))
Ok(IOResult::Done(f(header)))
}
pub fn with_header_mut<T>(&self, f: impl Fn(&mut DatabaseHeader) -> T) -> Result<IOResult<T>> {
let IOResult::Done(header_ref) = HeaderRefMut::from_pager(self)? else {
return Ok(IOResult::IO);
};
let mut header = header_ref.borrow_mut();
Ok(IOResult::Done(f(&mut header)))
let header = header_ref.borrow_mut();
Ok(IOResult::Done(f(header)))
}
}
@@ -2052,7 +2045,7 @@ pub fn allocate_new_page(page_id: usize, buffer_pool: &Arc<BufferPool>, offset:
let drop_fn = Rc::new(move |buf| {
bp.put(buf);
});
let buffer = Arc::new(RefCell::new(Buffer::new(buffer, drop_fn)));
let buffer = Arc::new(Buffer::new(buffer, drop_fn));
page.set_loaded();
page.get().contents = Some(PageContent::new(offset, buffer));
}

View File

@@ -430,12 +430,12 @@ pub struct PageContent {
/// the position where page content starts. it's 100 for page 1(database file header is 100 bytes),
/// 0 for all other pages.
pub offset: usize,
pub buffer: Arc<RefCell<Buffer>>,
pub buffer: Arc<Buffer>,
pub overflow_cells: Vec<OverflowCell>,
}
impl PageContent {
pub fn new(offset: usize, buffer: Arc<RefCell<Buffer>>) -> Self {
pub fn new(offset: usize, buffer: Arc<Buffer>) -> Self {
Self {
offset,
buffer,
@@ -453,12 +453,7 @@ impl PageContent {
#[allow(clippy::mut_from_ref)]
pub fn as_ptr(&self) -> &mut [u8] {
unsafe {
// unsafe trick to borrow twice
let buf_pointer = &self.buffer.as_ptr();
let buf = (*buf_pointer).as_mut().unwrap().as_mut_slice();
buf
}
self.buffer.as_mut_slice()
}
pub fn read_u8(&self, pos: usize) -> u8 {
@@ -789,16 +784,16 @@ pub fn begin_read_page(
buffer_pool.put(buf);
});
#[allow(clippy::arc_with_non_send_sync)]
let buf = Arc::new(RefCell::new(Buffer::new(buf, drop_fn)));
let complete = Box::new(move |mut buf: Arc<RefCell<Buffer>>, bytes_read: i32| {
let buf_len = buf.borrow().len();
let buf = Arc::new(Buffer::new(buf, drop_fn));
let complete = Box::new(move |mut buf: Arc<Buffer>, bytes_read: i32| {
let buf_len = buf.len();
turso_assert!(
(allow_empty_read && bytes_read == 0) || bytes_read == buf_len as i32,
"read({bytes_read}) != expected({buf_len})"
);
let page = page.clone();
if bytes_read == 0 {
buf = Arc::new(RefCell::new(Buffer::allocate(0, Rc::new(|_| {}))));
buf = Arc::new(Buffer::allocate(0, Rc::new(|_| {})));
}
if finish_read_page(page_idx, buf, page.clone()).is_err() {
page.set_error();
@@ -809,11 +804,7 @@ pub fn begin_read_page(
}
#[instrument(skip_all, level = Level::INFO)]
pub fn finish_read_page(
page_idx: usize,
buffer_ref: Arc<RefCell<Buffer>>,
page: PageRef,
) -> Result<()> {
pub fn finish_read_page(page_idx: usize, buffer_ref: Arc<Buffer>, page: PageRef) -> Result<()> {
tracing::trace!(page_idx);
let pos = if page_idx == DatabaseHeader::PAGE_ID {
DatabaseHeader::SIZE
@@ -854,7 +845,7 @@ pub fn begin_write_btree_page(
Box::new(move |bytes_written: i32| {
tracing::trace!("finish_write_btree_page");
let buf_copy = buf_copy.clone();
let buf_len = buf_copy.borrow().len();
let buf_len = buf_copy.len();
*clone_counter.borrow_mut() -= 1;
page_finish.clear_dirty();
@@ -886,7 +877,7 @@ pub fn begin_write_btree_page(
/// for 3 total syscalls instead of 9.
pub fn write_pages_vectored(
pager: &Pager,
batch: BTreeMap<usize, Arc<RefCell<Buffer>>>,
batch: BTreeMap<usize, Arc<Buffer>>,
) -> Result<PendingFlush> {
if batch.is_empty() {
return Ok(PendingFlush::default());
@@ -1495,7 +1486,7 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
let drop_fn = Rc::new(|_buf| {});
let size = file.size()?;
#[allow(clippy::arc_with_non_send_sync)]
let buf_for_pread = Arc::new(RefCell::new(Buffer::allocate(size as usize, drop_fn)));
let buf_for_pread = Arc::new(Buffer::allocate(size as usize, drop_fn));
let header = Arc::new(SpinLock::new(WalHeader::default()));
#[allow(clippy::arc_with_non_send_sync)]
let wal_file_shared_ret = Arc::new(UnsafeCell::new(WalFileShared {
@@ -1520,8 +1511,7 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
}));
let wal_file_shared_for_completion = wal_file_shared_ret.clone();
let complete: Box<Complete> = Box::new(move |buf: Arc<RefCell<Buffer>>, bytes_read: i32| {
let buf = buf.borrow();
let complete: Box<Complete> = Box::new(move |buf: Arc<Buffer>, bytes_read: i32| {
let buf_slice = buf.as_slice();
turso_assert!(
bytes_read == buf_slice.len() as i32,
@@ -1704,14 +1694,14 @@ pub fn begin_read_wal_frame_raw(
io: &Arc<dyn File>,
offset: usize,
page_size: u32,
complete: Box<dyn Fn(Arc<RefCell<Buffer>>, i32)>,
complete: Box<dyn Fn(Arc<Buffer>, i32)>,
) -> Result<Completion> {
tracing::trace!("begin_read_wal_frame_raw(offset={})", offset);
let drop_fn = Rc::new(|_buf| {});
let buf = Arc::new(RefCell::new(Buffer::allocate(
let buf = Arc::new(Buffer::allocate(
page_size as usize + WAL_FRAME_HEADER_SIZE,
drop_fn,
)));
));
#[allow(clippy::arc_with_non_send_sync)]
let c = Completion::new_read(buf, complete);
let c = io.pread(offset, c)?;
@@ -1722,7 +1712,7 @@ pub fn begin_read_wal_frame(
io: &Arc<dyn File>,
offset: usize,
buffer_pool: Arc<BufferPool>,
complete: Box<dyn Fn(Arc<RefCell<Buffer>>, i32)>,
complete: Box<dyn Fn(Arc<Buffer>, i32)>,
) -> Result<Completion> {
tracing::trace!("begin_read_wal_frame(offset={})", offset);
let buf = buffer_pool.get();
@@ -1730,7 +1720,7 @@ pub fn begin_read_wal_frame(
let buffer_pool = buffer_pool.clone();
buffer_pool.put(buf);
});
let buf = Arc::new(RefCell::new(Buffer::new(buf, drop_fn)));
let buf = Arc::new(Buffer::new(buf, drop_fn));
#[allow(clippy::arc_with_non_send_sync)]
let c = Completion::new_read(buf, complete);
let c = io.pread(offset, c)?;
@@ -1763,11 +1753,11 @@ pub fn prepare_wal_frame(
page_number: u32,
db_size: u32,
page: &[u8],
) -> ((u32, u32), Arc<RefCell<Buffer>>) {
) -> ((u32, u32), Arc<Buffer>) {
tracing::trace!(page_number);
let drop_fn = Rc::new(|_buf| {});
let mut buffer = Buffer::allocate(page_size as usize + WAL_FRAME_HEADER_SIZE, drop_fn);
let buffer = Buffer::allocate(page_size as usize + WAL_FRAME_HEADER_SIZE, drop_fn);
let frame = buffer.as_mut_slice();
frame[WAL_FRAME_HEADER_SIZE..].copy_from_slice(page);
@@ -1788,7 +1778,7 @@ pub fn prepare_wal_frame(
frame[16..20].copy_from_slice(&final_checksum.0.to_be_bytes());
frame[20..24].copy_from_slice(&final_checksum.1.to_be_bytes());
(final_checksum, Arc::new(RefCell::new(buffer)))
(final_checksum, Arc::new(buffer))
}
pub fn begin_write_wal_header(io: &Arc<dyn File>, header: &WalHeader) -> Result<Completion> {
@@ -1796,7 +1786,7 @@ pub fn begin_write_wal_header(io: &Arc<dyn File>, header: &WalHeader) -> Result<
let buffer = {
let drop_fn = Rc::new(|_buf| {});
let mut buffer = Buffer::allocate(WAL_HEADER_SIZE, drop_fn);
let buffer = Buffer::allocate(WAL_HEADER_SIZE, drop_fn);
let buf = buffer.as_mut_slice();
buf[0..4].copy_from_slice(&header.magic.to_be_bytes());
@@ -1809,13 +1799,13 @@ pub fn begin_write_wal_header(io: &Arc<dyn File>, header: &WalHeader) -> Result<
buf[28..32].copy_from_slice(&header.checksum_2.to_be_bytes());
#[allow(clippy::arc_with_non_send_sync)]
Arc::new(RefCell::new(buffer))
Arc::new(buffer)
};
let cloned = buffer.clone();
let write_complete = move |bytes_written: i32| {
// make sure to reference buffer so it's alive for async IO
let _buf = cloned.borrow();
let _buf = cloned.clone();
turso_assert!(
bytes_written == WAL_HEADER_SIZE as i32,
"wal header wrote({bytes_written}) != expected({WAL_HEADER_SIZE})"

View File

@@ -312,7 +312,7 @@ type PageId = usize;
/// Batch is a collection of pages that are being checkpointed together. It is used to
/// aggregate contiguous pages into a single write operation to the database file.
pub(super) struct Batch {
items: BTreeMap<PageId, Arc<RefCell<Buffer>>>,
items: BTreeMap<PageId, Arc<Buffer>>,
}
// TODO(preston): implement the same thing for `readv`
impl Batch {
@@ -339,7 +339,7 @@ impl Batch {
let raw = pool.get();
let pool_clone = pool.clone();
let drop_fn = Rc::new(move |b| pool_clone.put(b));
let new_buf = Arc::new(RefCell::new(Buffer::new(raw, drop_fn)));
let new_buf = Arc::new(Buffer::new(raw, drop_fn));
unsafe {
let inner = &mut *scratch.inner.get();
@@ -351,7 +351,7 @@ impl Batch {
}
impl std::ops::Deref for Batch {
type Target = BTreeMap<PageId, Arc<RefCell<Buffer>>>;
type Target = BTreeMap<PageId, Arc<Buffer>>;
fn deref(&self) -> &Self::Target {
&self.items
}
@@ -901,8 +901,8 @@ impl Wal for WalFile {
let offset = self.frame_offset(frame_id);
page.set_locked();
let frame = page.clone();
let complete = Box::new(move |buf: Arc<RefCell<Buffer>>, bytes_read: i32| {
let buf_len = buf.borrow().len();
let complete = Box::new(move |buf: Arc<Buffer>, bytes_read: i32| {
let buf_len = buf.len();
turso_assert!(
bytes_read == buf_len as i32,
"read({bytes_read}) less than expected({buf_len})"
@@ -923,8 +923,7 @@ impl Wal for WalFile {
tracing::debug!("read_frame({})", frame_id);
let offset = self.frame_offset(frame_id);
let (frame_ptr, frame_len) = (frame.as_mut_ptr(), frame.len());
let complete = Box::new(move |buf: Arc<RefCell<Buffer>>, bytes_read: i32| {
let buf = buf.borrow();
let complete = Box::new(move |buf: Arc<Buffer>, bytes_read: i32| {
let buf_len = buf.len();
turso_assert!(
bytes_read == buf_len as i32,
@@ -971,8 +970,7 @@ impl Wal for WalFile {
let (page_ptr, page_len) = (page.as_ptr(), page.len());
let complete = Box::new({
let conflict = conflict.clone();
move |buf: Arc<RefCell<Buffer>>, bytes_read: i32| {
let buf = buf.borrow();
move |buf: Arc<Buffer>, bytes_read: i32| {
let buf_len = buf.len();
turso_assert!(
bytes_read == buf_len as i32,
@@ -1061,7 +1059,7 @@ impl Wal for WalFile {
let frame_bytes = frame_bytes.clone();
let write_counter = write_counter.clone();
move |bytes_written| {
let frame_len = frame_bytes.borrow().len();
let frame_len = frame_bytes.len();
turso_assert!(
bytes_written == frame_len as i32,
"wrote({bytes_written}) != expected({frame_len})"
@@ -1226,10 +1224,8 @@ impl WalFile {
let drop_fn = Rc::new(move |buf| {
buffer_pool.put(buf);
});
checkpoint_page.get().contents = Some(PageContent::new(
0,
Arc::new(RefCell::new(Buffer::new(buffer, drop_fn))),
));
checkpoint_page.get().contents =
Some(PageContent::new(0, Arc::new(Buffer::new(buffer, drop_fn))));
}
let header = unsafe { shared.get().as_mut().unwrap().wal_header.lock() };

View File

@@ -9,10 +9,7 @@ use tempfile;
use crate::{
error::LimboError,
io::{
Buffer, BufferData, Completion, CompletionType, File, OpenFlags, ReadCompletion,
WriteCompletion, IO,
},
io::{Buffer, BufferData, Completion, File, OpenFlags, IO},
storage::sqlite3_ondisk::{read_varint, varint_len, write_varint},
translate::collate::CollationSeq,
turso_assert,
@@ -369,14 +366,14 @@ impl SortedChunk {
let drop_fn = Rc::new(|_buffer: BufferData| {});
let read_buffer = Buffer::allocate(read_buffer_size, drop_fn);
let read_buffer_ref = Arc::new(RefCell::new(read_buffer));
let read_buffer_ref = Arc::new(read_buffer);
let chunk_io_state_copy = self.io_state.clone();
let stored_buffer_copy = self.buffer.clone();
let stored_buffer_len_copy = self.buffer_len.clone();
let total_bytes_read_copy = self.total_bytes_read.clone();
let read_complete = Box::new(move |buf: Arc<RefCell<Buffer>>, bytes_read: i32| {
let read_buf_ref = buf.borrow();
let read_complete = Box::new(move |buf: Arc<Buffer>, bytes_read: i32| {
let read_buf_ref = buf.clone();
let read_buf = read_buf_ref.as_slice();
let bytes_read = bytes_read as usize;
@@ -398,10 +395,7 @@ impl SortedChunk {
total_bytes_read_copy.set(total_bytes_read_copy.get() + bytes_read);
});
let c = Completion::new(CompletionType::Read(ReadCompletion::new(
read_buffer_ref,
read_complete,
)));
let c = Completion::new_read(read_buffer_ref, read_complete);
let _c = self.file.pread(self.total_bytes_read.get(), c)?;
Ok(())
}
@@ -421,7 +415,7 @@ impl SortedChunk {
}
let drop_fn = Rc::new(|_buffer: BufferData| {});
let mut buffer = Buffer::allocate(self.chunk_size, drop_fn);
let buffer = Buffer::allocate(self.chunk_size, drop_fn);
let mut buf_pos = 0;
let buf = buffer.as_mut_slice();
@@ -435,19 +429,19 @@ impl SortedChunk {
buf_pos += payload.len();
}
let buffer_ref = Arc::new(RefCell::new(buffer));
let buffer_ref = Arc::new(buffer);
let buffer_ref_copy = buffer_ref.clone();
let chunk_io_state_copy = self.io_state.clone();
let write_complete = Box::new(move |bytes_written: i32| {
chunk_io_state_copy.set(SortedChunkIOState::WriteComplete);
let buf_len = buffer_ref_copy.borrow().len();
let buf_len = buffer_ref_copy.len();
if bytes_written < buf_len as i32 {
tracing::error!("wrote({bytes_written}) less than expected({buf_len})");
}
});
let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete)));
let c = Completion::new_write(write_complete);
let _c = self.file.pwrite(0, buffer_ref, c)?;
Ok(())
}

View File

@@ -173,7 +173,7 @@ impl File for SimulatorFile {
fn pwrite(
&self,
pos: usize,
buffer: Arc<RefCell<turso_core::Buffer>>,
buffer: Arc<turso_core::Buffer>,
c: turso_core::Completion,
) -> Result<turso_core::Completion> {
self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1);
@@ -225,7 +225,7 @@ impl File for SimulatorFile {
fn pwritev(
&self,
pos: usize,
buffers: Vec<Arc<RefCell<turso_core::Buffer>>>,
buffers: Vec<Arc<turso_core::Buffer>>,
c: turso_core::Completion,
) -> Result<turso_core::Completion> {
self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1);

View File

@@ -1,5 +1,4 @@
use std::{
cell::RefCell,
collections::{HashMap, HashSet},
path::Path,
pin::Pin,
@@ -432,7 +431,7 @@ fn write_at(io: &impl IO, file: Arc<dyn File>, offset: usize, data: &[u8]) {
let completion = Completion::new_write(|_| {});
let drop_fn = Rc::new(move |_| {});
#[allow(clippy::arc_with_non_send_sync)]
let buffer = Arc::new(RefCell::new(Buffer::new(Pin::new(data.to_vec()), drop_fn)));
let buffer = Arc::new(Buffer::new(Pin::new(data.to_vec()), drop_fn));
let result = file.pwrite(offset, buffer, completion).unwrap();
while !result.is_completed() {
io.run_once().unwrap();