mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-25 12:04:21 +01:00
Merge 'Use u64 for file offsets in I/O and calculate such offsets in u64' from Preston Thorpe
Using `usize` to compute file offsets caps us at ~4 GiB on 32-bit systems. For example, with 4 KiB pages we can only address up to 1048576 pages; attempting the next page overflows a 32-bit `usize` and can wrap the write offset, corrupting data. Switching our I/O APIs and offset math to `u64` avoids this overflow on 32-bit targets. Closes #2791
This commit is contained in:
@@ -578,7 +578,7 @@ impl turso_core::DatabaseStorage for DatabaseFile {
|
||||
if !(512..=65536).contains(&size) || size & (size - 1) != 0 {
|
||||
return Err(turso_core::LimboError::NotADB);
|
||||
}
|
||||
let pos = (page_idx - 1) * size;
|
||||
let pos = (page_idx as u64 - 1) * size as u64;
|
||||
self.file.pread(pos, c)
|
||||
}
|
||||
|
||||
@@ -590,7 +590,7 @@ impl turso_core::DatabaseStorage for DatabaseFile {
|
||||
c: turso_core::Completion,
|
||||
) -> turso_core::Result<turso_core::Completion> {
|
||||
let size = buffer.len();
|
||||
let pos = (page_idx - 1) * size;
|
||||
let pos = (page_idx as u64 - 1) * size as u64;
|
||||
self.file.pwrite(pos, buffer, c)
|
||||
}
|
||||
|
||||
@@ -602,7 +602,7 @@ impl turso_core::DatabaseStorage for DatabaseFile {
|
||||
_io_ctx: &turso_core::IOContext,
|
||||
c: turso_core::Completion,
|
||||
) -> turso_core::Result<turso_core::Completion> {
|
||||
let pos = first_page_idx.saturating_sub(1) * page_size;
|
||||
let pos = first_page_idx.saturating_sub(1) as u64 * page_size as u64;
|
||||
let c = self.file.pwritev(pos, buffers, c)?;
|
||||
Ok(c)
|
||||
}
|
||||
@@ -620,7 +620,7 @@ impl turso_core::DatabaseStorage for DatabaseFile {
|
||||
len: usize,
|
||||
c: turso_core::Completion,
|
||||
) -> turso_core::Result<turso_core::Completion> {
|
||||
let c = self.file.truncate(len, c)?;
|
||||
let c = self.file.truncate(len as u64, c)?;
|
||||
Ok(c)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,9 +68,9 @@ impl File for GenericFile {
|
||||
}
|
||||
|
||||
#[instrument(skip(self, c), level = Level::TRACE)]
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
||||
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
|
||||
let mut file = self.file.write();
|
||||
file.seek(std::io::SeekFrom::Start(pos as u64))?;
|
||||
file.seek(std::io::SeekFrom::Start(pos))?;
|
||||
let nr = {
|
||||
let r = c.as_read();
|
||||
let buf = r.buf();
|
||||
@@ -83,9 +83,9 @@ impl File for GenericFile {
|
||||
}
|
||||
|
||||
#[instrument(skip(self, c, buffer), level = Level::TRACE)]
|
||||
fn pwrite(&self, pos: usize, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
|
||||
fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
|
||||
let mut file = self.file.write();
|
||||
file.seek(std::io::SeekFrom::Start(pos as u64))?;
|
||||
file.seek(std::io::SeekFrom::Start(pos))?;
|
||||
let buf = buffer.as_slice();
|
||||
file.write_all(buf)?;
|
||||
c.complete(buffer.len() as i32);
|
||||
@@ -101,9 +101,9 @@ impl File for GenericFile {
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
|
||||
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
|
||||
let file = self.file.write();
|
||||
file.set_len(len as u64)?;
|
||||
file.set_len(len)?;
|
||||
c.complete(0);
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
@@ -182,7 +182,7 @@ struct WritevState {
|
||||
/// File descriptor/id of the file we are writing to
|
||||
file_id: Fd,
|
||||
/// absolute file offset for next submit
|
||||
file_pos: usize,
|
||||
file_pos: u64,
|
||||
/// current buffer index in `bufs`
|
||||
current_buffer_idx: usize,
|
||||
/// intra-buffer offset
|
||||
@@ -198,7 +198,7 @@ struct WritevState {
|
||||
}
|
||||
|
||||
impl WritevState {
|
||||
fn new(file: &UringFile, pos: usize, bufs: Vec<Arc<crate::Buffer>>) -> Self {
|
||||
fn new(file: &UringFile, pos: u64, bufs: Vec<Arc<crate::Buffer>>) -> Self {
|
||||
let file_id = file
|
||||
.id()
|
||||
.map(Fd::Fixed)
|
||||
@@ -223,23 +223,23 @@ impl WritevState {
|
||||
|
||||
/// Advance (idx, off, pos) after written bytes
|
||||
#[inline(always)]
|
||||
fn advance(&mut self, written: usize) {
|
||||
fn advance(&mut self, written: u64) {
|
||||
let mut remaining = written;
|
||||
while remaining > 0 {
|
||||
let current_buf_len = self.bufs[self.current_buffer_idx].len();
|
||||
let left = current_buf_len - self.current_buffer_offset;
|
||||
if remaining < left {
|
||||
self.current_buffer_offset += remaining;
|
||||
if remaining < left as u64 {
|
||||
self.current_buffer_offset += remaining as usize;
|
||||
self.file_pos += remaining;
|
||||
remaining = 0;
|
||||
} else {
|
||||
remaining -= left;
|
||||
self.file_pos += left;
|
||||
remaining -= left as u64;
|
||||
self.file_pos += left as u64;
|
||||
self.current_buffer_idx += 1;
|
||||
self.current_buffer_offset = 0;
|
||||
}
|
||||
}
|
||||
self.total_written += written;
|
||||
self.total_written += written as usize;
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
@@ -400,7 +400,7 @@ impl WrappedIOUring {
|
||||
iov_allocation[0].iov_len as u32,
|
||||
id as u16,
|
||||
)
|
||||
.offset(st.file_pos as u64)
|
||||
.offset(st.file_pos)
|
||||
.build()
|
||||
.user_data(key)
|
||||
} else {
|
||||
@@ -409,7 +409,7 @@ impl WrappedIOUring {
|
||||
iov_allocation[0].iov_base as *const u8,
|
||||
iov_allocation[0].iov_len as u32,
|
||||
)
|
||||
.offset(st.file_pos as u64)
|
||||
.offset(st.file_pos)
|
||||
.build()
|
||||
.user_data(key)
|
||||
}
|
||||
@@ -425,7 +425,7 @@ impl WrappedIOUring {
|
||||
|
||||
let entry = with_fd!(st.file_id, |fd| {
|
||||
io_uring::opcode::Writev::new(fd, ptr, iov_count as u32)
|
||||
.offset(st.file_pos as u64)
|
||||
.offset(st.file_pos)
|
||||
.build()
|
||||
.user_data(key)
|
||||
});
|
||||
@@ -443,8 +443,8 @@ impl WrappedIOUring {
|
||||
return;
|
||||
}
|
||||
|
||||
let written = result as usize;
|
||||
state.advance(written);
|
||||
let written = result;
|
||||
state.advance(written as u64);
|
||||
match state.remaining() {
|
||||
0 => {
|
||||
tracing::info!(
|
||||
@@ -643,7 +643,7 @@ impl File for UringFile {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
||||
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
|
||||
let r = c.as_read();
|
||||
let mut io = self.io.borrow_mut();
|
||||
let read_e = {
|
||||
@@ -663,14 +663,14 @@ impl File for UringFile {
|
||||
io.debug_check_fixed(idx, ptr, len);
|
||||
}
|
||||
io_uring::opcode::ReadFixed::new(fd, ptr, len as u32, idx as u16)
|
||||
.offset(pos as u64)
|
||||
.offset(pos)
|
||||
.build()
|
||||
.user_data(get_key(c.clone()))
|
||||
} else {
|
||||
trace!("pread(pos = {}, length = {})", pos, len);
|
||||
// Use Read opcode if fixed buffer is not available
|
||||
io_uring::opcode::Read::new(fd, buf.as_mut_ptr(), len as u32)
|
||||
.offset(pos as u64)
|
||||
.offset(pos)
|
||||
.build()
|
||||
.user_data(get_key(c.clone()))
|
||||
}
|
||||
@@ -680,7 +680,7 @@ impl File for UringFile {
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
fn pwrite(&self, pos: usize, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
|
||||
fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
|
||||
let mut io = self.io.borrow_mut();
|
||||
let write = {
|
||||
let ptr = buffer.as_ptr();
|
||||
@@ -698,13 +698,13 @@ impl File for UringFile {
|
||||
io.debug_check_fixed(idx, ptr, len);
|
||||
}
|
||||
io_uring::opcode::WriteFixed::new(fd, ptr, len as u32, idx as u16)
|
||||
.offset(pos as u64)
|
||||
.offset(pos)
|
||||
.build()
|
||||
.user_data(get_key(c.clone()))
|
||||
} else {
|
||||
trace!("pwrite(pos = {}, length = {})", pos, buffer.len());
|
||||
io_uring::opcode::Write::new(fd, ptr, len as u32)
|
||||
.offset(pos as u64)
|
||||
.offset(pos)
|
||||
.build()
|
||||
.user_data(get_key(c.clone()))
|
||||
}
|
||||
@@ -728,7 +728,7 @@ impl File for UringFile {
|
||||
|
||||
fn pwritev(
|
||||
&self,
|
||||
pos: usize,
|
||||
pos: u64,
|
||||
bufs: Vec<Arc<crate::Buffer>>,
|
||||
c: Completion,
|
||||
) -> Result<Completion> {
|
||||
@@ -748,10 +748,10 @@ impl File for UringFile {
|
||||
Ok(self.file.metadata()?.len())
|
||||
}
|
||||
|
||||
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
|
||||
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
|
||||
let mut io = self.io.borrow_mut();
|
||||
let truncate = with_fd!(self, |fd| {
|
||||
io_uring::opcode::Ftruncate::new(fd, len as u64)
|
||||
io_uring::opcode::Ftruncate::new(fd, len)
|
||||
.build()
|
||||
.user_data(get_key(c.clone()))
|
||||
});
|
||||
|
||||
@@ -69,17 +69,12 @@ impl IO for MemoryIO {
|
||||
files.remove(path);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn run_once(&self) -> Result<()> {
|
||||
// nop
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MemoryFile {
|
||||
path: String,
|
||||
pages: UnsafeCell<BTreeMap<usize, MemPage>>,
|
||||
size: Cell<usize>,
|
||||
size: Cell<u64>,
|
||||
}
|
||||
unsafe impl Send for MemoryFile {}
|
||||
unsafe impl Sync for MemoryFile {}
|
||||
@@ -92,10 +87,10 @@ impl File for MemoryFile {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
||||
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
|
||||
tracing::debug!("pread(path={}): pos={}", self.path, pos);
|
||||
let r = c.as_read();
|
||||
let buf_len = r.buf().len();
|
||||
let buf_len = r.buf().len() as u64;
|
||||
if buf_len == 0 {
|
||||
c.complete(0);
|
||||
return Ok(c);
|
||||
@@ -110,8 +105,8 @@ impl File for MemoryFile {
|
||||
let read_len = buf_len.min(file_size - pos);
|
||||
{
|
||||
let read_buf = r.buf();
|
||||
let mut offset = pos;
|
||||
let mut remaining = read_len;
|
||||
let mut offset = pos as usize;
|
||||
let mut remaining = read_len as usize;
|
||||
let mut buf_offset = 0;
|
||||
|
||||
while remaining > 0 {
|
||||
@@ -134,7 +129,7 @@ impl File for MemoryFile {
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
fn pwrite(&self, pos: usize, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
|
||||
fn pwrite(&self, pos: u64, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
|
||||
tracing::debug!(
|
||||
"pwrite(path={}): pos={}, size={}",
|
||||
self.path,
|
||||
@@ -147,7 +142,7 @@ impl File for MemoryFile {
|
||||
return Ok(c);
|
||||
}
|
||||
|
||||
let mut offset = pos;
|
||||
let mut offset = pos as usize;
|
||||
let mut remaining = buf_len;
|
||||
let mut buf_offset = 0;
|
||||
let data = &buffer.as_slice();
|
||||
@@ -158,7 +153,7 @@ impl File for MemoryFile {
|
||||
let bytes_to_write = remaining.min(PAGE_SIZE - page_offset);
|
||||
|
||||
{
|
||||
let page = self.get_or_allocate_page(page_no);
|
||||
let page = self.get_or_allocate_page(page_no as u64);
|
||||
page[page_offset..page_offset + bytes_to_write]
|
||||
.copy_from_slice(&data[buf_offset..buf_offset + bytes_to_write]);
|
||||
}
|
||||
@@ -169,7 +164,7 @@ impl File for MemoryFile {
|
||||
}
|
||||
|
||||
self.size
|
||||
.set(core::cmp::max(pos + buf_len, self.size.get()));
|
||||
.set(core::cmp::max(pos + buf_len as u64, self.size.get()));
|
||||
|
||||
c.complete(buf_len as i32);
|
||||
Ok(c)
|
||||
@@ -182,13 +177,13 @@ impl File for MemoryFile {
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
|
||||
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
|
||||
tracing::debug!("truncate(path={}): len={}", self.path, len);
|
||||
if len < self.size.get() {
|
||||
// Truncate pages
|
||||
unsafe {
|
||||
let pages = &mut *self.pages.get();
|
||||
pages.retain(|&k, _| k * PAGE_SIZE < len);
|
||||
pages.retain(|&k, _| k * PAGE_SIZE < len as usize);
|
||||
}
|
||||
}
|
||||
self.size.set(len);
|
||||
@@ -196,14 +191,14 @@ impl File for MemoryFile {
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
fn pwritev(&self, pos: usize, buffers: Vec<Arc<Buffer>>, c: Completion) -> Result<Completion> {
|
||||
fn pwritev(&self, pos: u64, buffers: Vec<Arc<Buffer>>, c: Completion) -> Result<Completion> {
|
||||
tracing::debug!(
|
||||
"pwritev(path={}): pos={}, buffers={:?}",
|
||||
self.path,
|
||||
pos,
|
||||
buffers.iter().map(|x| x.len()).collect::<Vec<_>>()
|
||||
);
|
||||
let mut offset = pos;
|
||||
let mut offset = pos as usize;
|
||||
let mut total_written = 0;
|
||||
|
||||
for buffer in buffers {
|
||||
@@ -222,7 +217,7 @@ impl File for MemoryFile {
|
||||
let bytes_to_write = remaining.min(PAGE_SIZE - page_offset);
|
||||
|
||||
{
|
||||
let page = self.get_or_allocate_page(page_no);
|
||||
let page = self.get_or_allocate_page(page_no as u64);
|
||||
page[page_offset..page_offset + bytes_to_write]
|
||||
.copy_from_slice(&data[buf_offset..buf_offset + bytes_to_write]);
|
||||
}
|
||||
@@ -235,23 +230,23 @@ impl File for MemoryFile {
|
||||
}
|
||||
c.complete(total_written as i32);
|
||||
self.size
|
||||
.set(core::cmp::max(pos + total_written, self.size.get()));
|
||||
.set(core::cmp::max(pos + total_written as u64, self.size.get()));
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
fn size(&self) -> Result<u64> {
|
||||
tracing::debug!("size(path={}): {}", self.path, self.size.get());
|
||||
Ok(self.size.get() as u64)
|
||||
Ok(self.size.get())
|
||||
}
|
||||
}
|
||||
|
||||
impl MemoryFile {
|
||||
#[allow(clippy::mut_from_ref)]
|
||||
fn get_or_allocate_page(&self, page_no: usize) -> &mut MemPage {
|
||||
fn get_or_allocate_page(&self, page_no: u64) -> &mut MemPage {
|
||||
unsafe {
|
||||
let pages = &mut *self.pages.get();
|
||||
pages
|
||||
.entry(page_no)
|
||||
.entry(page_no as usize)
|
||||
.or_insert_with(|| Box::new([0; PAGE_SIZE]))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,10 +12,10 @@ use std::{fmt::Debug, pin::Pin};
|
||||
pub trait File: Send + Sync {
|
||||
fn lock_file(&self, exclusive: bool) -> Result<()>;
|
||||
fn unlock_file(&self) -> Result<()>;
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion>;
|
||||
fn pwrite(&self, pos: usize, buffer: Arc<Buffer>, c: Completion) -> Result<Completion>;
|
||||
fn pread(&self, pos: u64, c: Completion) -> Result<Completion>;
|
||||
fn pwrite(&self, pos: u64, buffer: Arc<Buffer>, c: Completion) -> Result<Completion>;
|
||||
fn sync(&self, c: Completion) -> Result<Completion>;
|
||||
fn pwritev(&self, pos: usize, buffers: Vec<Arc<Buffer>>, c: Completion) -> Result<Completion> {
|
||||
fn pwritev(&self, pos: u64, buffers: Vec<Arc<Buffer>>, c: Completion) -> Result<Completion> {
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
if buffers.is_empty() {
|
||||
c.complete(0);
|
||||
@@ -56,12 +56,12 @@ pub trait File: Send + Sync {
|
||||
c.abort();
|
||||
return Err(e);
|
||||
}
|
||||
pos += len;
|
||||
pos += len as u64;
|
||||
}
|
||||
Ok(c)
|
||||
}
|
||||
fn size(&self) -> Result<u64>;
|
||||
fn truncate(&self, len: usize, c: Completion) -> Result<Completion>;
|
||||
fn truncate(&self, len: u64, c: Completion) -> Result<Completion>;
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq)]
|
||||
@@ -87,7 +87,9 @@ pub trait IO: Clock + Send + Sync {
|
||||
// remove_file is used in the sync-engine
|
||||
fn remove_file(&self, path: &str) -> Result<()>;
|
||||
|
||||
fn run_once(&self) -> Result<()>;
|
||||
fn run_once(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn wait_for_completion(&self, c: Completion) -> Result<()> {
|
||||
while !c.finished() {
|
||||
|
||||
@@ -15,8 +15,6 @@ use std::{io::ErrorKind, sync::Arc};
|
||||
use tracing::debug;
|
||||
use tracing::{instrument, trace, Level};
|
||||
|
||||
/// UnixIO lives longer than any of the files it creates, so it is
|
||||
/// safe to store references to its internals in the UnixFiles
|
||||
pub struct UnixIO {}
|
||||
|
||||
unsafe impl Send for UnixIO {}
|
||||
@@ -127,24 +125,6 @@ impl IO for UnixIO {
|
||||
}
|
||||
}
|
||||
|
||||
// enum CompletionCallback {
|
||||
// Read(Arc<Mutex<std::fs::File>>, Completion, usize),
|
||||
// Write(
|
||||
// Arc<Mutex<std::fs::File>>,
|
||||
// Completion,
|
||||
// Arc<crate::Buffer>,
|
||||
// usize,
|
||||
// ),
|
||||
// Writev(
|
||||
// Arc<Mutex<std::fs::File>>,
|
||||
// Completion,
|
||||
// Vec<Arc<crate::Buffer>>,
|
||||
// usize, // absolute file offset
|
||||
// usize, // buf index
|
||||
// usize, // intra-buf offset
|
||||
// ),
|
||||
// }
|
||||
|
||||
pub struct UnixFile {
|
||||
file: Arc<Mutex<std::fs::File>>,
|
||||
}
|
||||
@@ -192,7 +172,7 @@ impl File for UnixFile {
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
||||
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
|
||||
let file = self.file.lock();
|
||||
let result = unsafe {
|
||||
let r = c.as_read();
|
||||
@@ -217,7 +197,7 @@ impl File for UnixFile {
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn pwrite(&self, pos: usize, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
|
||||
fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
|
||||
let file = self.file.lock();
|
||||
let result = unsafe {
|
||||
libc::pwrite(
|
||||
@@ -241,7 +221,7 @@ impl File for UnixFile {
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn pwritev(
|
||||
&self,
|
||||
pos: usize,
|
||||
pos: u64,
|
||||
buffers: Vec<Arc<crate::Buffer>>,
|
||||
c: Completion,
|
||||
) -> Result<Completion> {
|
||||
@@ -251,7 +231,7 @@ impl File for UnixFile {
|
||||
}
|
||||
let file = self.file.lock();
|
||||
|
||||
match try_pwritev_raw(file.as_raw_fd(), pos as u64, &buffers, 0, 0) {
|
||||
match try_pwritev_raw(file.as_raw_fd(), pos, &buffers, 0, 0) {
|
||||
Ok(written) => {
|
||||
trace!("pwritev wrote {written}");
|
||||
c.complete(written as i32);
|
||||
@@ -268,24 +248,21 @@ impl File for UnixFile {
|
||||
let file = self.file.lock();
|
||||
|
||||
let result = unsafe {
|
||||
|
||||
#[cfg(not(any(target_os = "macos", target_os = "ios")))]
|
||||
{
|
||||
libc::fsync(file.as_raw_fd())
|
||||
}
|
||||
|
||||
|
||||
#[cfg(any(target_os = "macos", target_os = "ios"))]
|
||||
{
|
||||
libc::fcntl(file.as_raw_fd(), libc::F_FULLFSYNC)
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
|
||||
if result == -1 {
|
||||
let e = std::io::Error::last_os_error();
|
||||
Err(e.into())
|
||||
} else {
|
||||
|
||||
#[cfg(not(any(target_os = "macos", target_os = "ios")))]
|
||||
trace!("fsync");
|
||||
|
||||
@@ -304,9 +281,9 @@ impl File for UnixFile {
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::INFO)]
|
||||
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
|
||||
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
|
||||
let file = self.file.lock();
|
||||
let result = file.set_len(len as u64);
|
||||
let result = file.set_len(len);
|
||||
match result {
|
||||
Ok(()) => {
|
||||
trace!("file truncated to len=({})", len);
|
||||
|
||||
@@ -81,8 +81,6 @@ impl VfsMod {
|
||||
}
|
||||
}
|
||||
|
||||
// #Safety:
|
||||
// the callback wrapper in the extension library is FnOnce, so we know
|
||||
/// # Safety
|
||||
/// the callback wrapper in the extension library is FnOnce, so we know
|
||||
/// that the into_raw/from_raw contract will hold
|
||||
@@ -121,7 +119,7 @@ impl File for VfsFileImpl {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
||||
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
|
||||
if self.vfs.is_null() {
|
||||
c.complete(-1);
|
||||
return Err(LimboError::ExtensionError("VFS is null".to_string()));
|
||||
@@ -145,7 +143,7 @@ impl File for VfsFileImpl {
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
fn pwrite(&self, pos: usize, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
|
||||
fn pwrite(&self, pos: u64, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
|
||||
if self.vfs.is_null() {
|
||||
c.complete(-1);
|
||||
return Err(LimboError::ExtensionError("VFS is null".to_string()));
|
||||
@@ -192,7 +190,7 @@ impl File for VfsFileImpl {
|
||||
}
|
||||
}
|
||||
|
||||
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
|
||||
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
|
||||
if self.vfs.is_null() {
|
||||
c.complete(-1);
|
||||
return Err(LimboError::ExtensionError("VFS is null".to_string()));
|
||||
|
||||
115
core/io/windows.rs
Normal file
115
core/io/windows.rs
Normal file
@@ -0,0 +1,115 @@
|
||||
use crate::{Clock, Completion, File, Instant, LimboError, OpenFlags, Result, IO};
|
||||
use parking_lot::RwLock;
|
||||
use std::io::{Read, Seek, Write};
|
||||
use std::sync::Arc;
|
||||
use tracing::{debug, instrument, trace, Level};
|
||||
pub struct WindowsIO {}
|
||||
|
||||
impl WindowsIO {
|
||||
pub fn new() -> Result<Self> {
|
||||
debug!("Using IO backend 'syscall'");
|
||||
Ok(Self {})
|
||||
}
|
||||
}
|
||||
|
||||
impl IO for WindowsIO {
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result<Arc<dyn File>> {
|
||||
trace!("open_file(path = {})", path);
|
||||
let mut file = std::fs::File::options();
|
||||
file.read(true);
|
||||
|
||||
if !flags.contains(OpenFlags::ReadOnly) {
|
||||
file.write(true);
|
||||
file.create(flags.contains(OpenFlags::Create));
|
||||
}
|
||||
|
||||
let file = file.open(path)?;
|
||||
Ok(Arc::new(WindowsFile {
|
||||
file: RwLock::new(file),
|
||||
}))
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn remove_file(&self, path: &str) -> Result<()> {
|
||||
trace!("remove_file(path = {})", path);
|
||||
Ok(std::fs::remove_file(path)?)
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn run_once(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Clock for WindowsIO {
|
||||
fn now(&self) -> Instant {
|
||||
let now = chrono::Local::now();
|
||||
Instant {
|
||||
secs: now.timestamp(),
|
||||
micros: now.timestamp_subsec_micros(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct WindowsFile {
|
||||
file: RwLock<std::fs::File>,
|
||||
}
|
||||
|
||||
impl File for WindowsFile {
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn lock_file(&self, exclusive: bool) -> Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn unlock_file(&self) -> Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
#[instrument(skip(self, c), level = Level::TRACE)]
|
||||
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
|
||||
let mut file = self.file.write();
|
||||
file.seek(std::io::SeekFrom::Start(pos))?;
|
||||
let nr = {
|
||||
let r = c.as_read();
|
||||
let buf = r.buf();
|
||||
let buf = buf.as_mut_slice();
|
||||
file.read_exact(buf)?;
|
||||
buf.len() as i32
|
||||
};
|
||||
c.complete(nr);
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
#[instrument(skip(self, c, buffer), level = Level::TRACE)]
|
||||
fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
|
||||
let mut file = self.file.write();
|
||||
file.seek(std::io::SeekFrom::Start(pos))?;
|
||||
let buf = buffer.as_slice();
|
||||
file.write_all(buf)?;
|
||||
c.complete(buffer.len() as i32);
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn sync(&self, c: Completion) -> Result<Completion> {
|
||||
let file = self.file.write();
|
||||
file.sync_all()?;
|
||||
c.complete(0);
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
|
||||
let file = self.file.write();
|
||||
file.set_len(len)?;
|
||||
c.complete(0);
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
fn size(&self) -> Result<u64> {
|
||||
let file = self.file.read();
|
||||
Ok(file.metadata().unwrap().len())
|
||||
}
|
||||
}
|
||||
@@ -91,7 +91,9 @@ impl DatabaseStorage for DatabaseFile {
|
||||
if !(512..=65536).contains(&size) || size & (size - 1) != 0 {
|
||||
return Err(LimboError::NotADB);
|
||||
}
|
||||
let pos = (page_idx - 1) * size;
|
||||
let Some(pos) = (page_idx as u64 - 1).checked_mul(size as u64) else {
|
||||
return Err(LimboError::IntegerOverflow);
|
||||
};
|
||||
|
||||
if let Some(ctx) = io_ctx.encryption_context() {
|
||||
let encryption_ctx = ctx.clone();
|
||||
@@ -145,7 +147,9 @@ impl DatabaseStorage for DatabaseFile {
|
||||
assert!(buffer_size >= 512);
|
||||
assert!(buffer_size <= 65536);
|
||||
assert_eq!(buffer_size & (buffer_size - 1), 0);
|
||||
let pos = (page_idx - 1) * buffer_size;
|
||||
let Some(pos) = (page_idx as u64 - 1).checked_mul(buffer_size as u64) else {
|
||||
return Err(LimboError::IntegerOverflow);
|
||||
};
|
||||
let buffer = {
|
||||
if let Some(ctx) = io_ctx.encryption_context() {
|
||||
encrypt_buffer(page_idx, buffer, ctx)
|
||||
@@ -169,7 +173,9 @@ impl DatabaseStorage for DatabaseFile {
|
||||
assert!(page_size <= 65536);
|
||||
assert_eq!(page_size & (page_size - 1), 0);
|
||||
|
||||
let pos = (first_page_idx - 1) * page_size;
|
||||
let Some(pos) = (first_page_idx as u64 - 1).checked_mul(page_size as u64) else {
|
||||
return Err(LimboError::IntegerOverflow);
|
||||
};
|
||||
let buffers = {
|
||||
if let Some(ctx) = io_ctx.encryption_context() {
|
||||
buffers
|
||||
@@ -198,7 +204,7 @@ impl DatabaseStorage for DatabaseFile {
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
|
||||
let c = self.file.truncate(len, c)?;
|
||||
let c = self.file.truncate(len as u64, c)?;
|
||||
Ok(c)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1838,7 +1838,7 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
|
||||
pub fn begin_read_wal_frame_raw(
|
||||
buffer_pool: &Arc<BufferPool>,
|
||||
io: &Arc<dyn File>,
|
||||
offset: usize,
|
||||
offset: u64,
|
||||
complete: Box<ReadComplete>,
|
||||
) -> Result<Completion> {
|
||||
tracing::trace!("begin_read_wal_frame_raw(offset={})", offset);
|
||||
@@ -1851,7 +1851,7 @@ pub fn begin_read_wal_frame_raw(
|
||||
|
||||
pub fn begin_read_wal_frame(
|
||||
io: &Arc<dyn File>,
|
||||
offset: usize,
|
||||
offset: u64,
|
||||
buffer_pool: Arc<BufferPool>,
|
||||
complete: Box<ReadComplete>,
|
||||
page_idx: usize,
|
||||
|
||||
@@ -1082,7 +1082,7 @@ impl Wal for WalFile {
|
||||
});
|
||||
begin_read_wal_frame(
|
||||
&self.get_shared().file,
|
||||
offset + WAL_FRAME_HEADER_SIZE,
|
||||
offset + WAL_FRAME_HEADER_SIZE as u64,
|
||||
buffer_pool,
|
||||
complete,
|
||||
page_idx,
|
||||
@@ -1196,7 +1196,7 @@ impl Wal for WalFile {
|
||||
});
|
||||
let c = begin_read_wal_frame(
|
||||
&self.get_shared().file,
|
||||
offset + WAL_FRAME_HEADER_SIZE,
|
||||
offset + WAL_FRAME_HEADER_SIZE as u64,
|
||||
buffer_pool,
|
||||
complete,
|
||||
page_id as usize,
|
||||
@@ -1460,14 +1460,14 @@ impl Wal for WalFile {
|
||||
let mut next_frame_id = self.max_frame + 1;
|
||||
// Build every frame in order, updating the rolling checksum
|
||||
for (idx, page) in pages.iter().enumerate() {
|
||||
let page_id = page.get().id as u64;
|
||||
let page_id = page.get().id;
|
||||
let plain = page.get_contents().as_ptr();
|
||||
|
||||
let data_to_write: std::borrow::Cow<[u8]> = {
|
||||
let io_ctx = self.io_ctx.borrow();
|
||||
let ectx = io_ctx.encryption_context();
|
||||
if let Some(ctx) = ectx.as_ref() {
|
||||
Cow::Owned(ctx.encrypt_page(plain, page_id as usize)?)
|
||||
Cow::Owned(ctx.encrypt_page(plain, page_id)?)
|
||||
} else {
|
||||
Cow::Borrowed(plain)
|
||||
}
|
||||
@@ -1581,11 +1581,10 @@ impl WalFile {
|
||||
self.get_shared().wal_header.lock().page_size
|
||||
}
|
||||
|
||||
fn frame_offset(&self, frame_id: u64) -> usize {
|
||||
fn frame_offset(&self, frame_id: u64) -> u64 {
|
||||
assert!(frame_id > 0, "Frame ID must be 1-based");
|
||||
let page_offset = (frame_id - 1) * (self.page_size() + WAL_FRAME_HEADER_SIZE as u32) as u64;
|
||||
let offset = WAL_HEADER_SIZE as u64 + page_offset;
|
||||
offset as usize
|
||||
WAL_HEADER_SIZE as u64 + page_offset
|
||||
}
|
||||
|
||||
#[allow(clippy::mut_from_ref)]
|
||||
@@ -2131,7 +2130,7 @@ impl WalFile {
|
||||
// schedule read of the page payload
|
||||
let c = begin_read_wal_frame(
|
||||
&self.get_shared().file,
|
||||
offset + WAL_FRAME_HEADER_SIZE,
|
||||
offset + WAL_FRAME_HEADER_SIZE as u64,
|
||||
self.buffer_pool.clone(),
|
||||
complete,
|
||||
page_id,
|
||||
@@ -2317,7 +2316,7 @@ pub mod test {
|
||||
let done = Rc::new(Cell::new(false));
|
||||
let _done = done.clone();
|
||||
let _ = file.file.truncate(
|
||||
WAL_HEADER_SIZE,
|
||||
WAL_HEADER_SIZE as u64,
|
||||
Completion::new_trunc(move |_| {
|
||||
let done = _done.clone();
|
||||
done.set(true);
|
||||
|
||||
@@ -370,7 +370,7 @@ struct SortedChunk {
|
||||
/// The chunk file.
|
||||
file: Arc<dyn File>,
|
||||
/// Offset of the start of chunk in file
|
||||
start_offset: usize,
|
||||
start_offset: u64,
|
||||
/// The size of this chunk file in bytes.
|
||||
chunk_size: usize,
|
||||
/// The read buffer.
|
||||
@@ -391,7 +391,7 @@ impl SortedChunk {
|
||||
fn new(file: Arc<dyn File>, start_offset: usize, buffer_size: usize) -> Self {
|
||||
Self {
|
||||
file,
|
||||
start_offset,
|
||||
start_offset: start_offset as u64,
|
||||
chunk_size: 0,
|
||||
buffer: Rc::new(RefCell::new(vec![0; buffer_size])),
|
||||
buffer_len: Rc::new(Cell::new(0)),
|
||||
@@ -522,7 +522,7 @@ impl SortedChunk {
|
||||
let c = Completion::new_read(read_buffer_ref, read_complete);
|
||||
let c = self
|
||||
.file
|
||||
.pread(self.start_offset + self.total_bytes_read.get(), c)?;
|
||||
.pread(self.start_offset + self.total_bytes_read.get() as u64, c)?;
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
|
||||
@@ -150,7 +150,7 @@ impl File for SimulatorFile {
|
||||
self.inner.unlock_file()
|
||||
}
|
||||
|
||||
fn pread(&self, pos: usize, c: turso_core::Completion) -> Result<turso_core::Completion> {
|
||||
fn pread(&self, pos: u64, c: turso_core::Completion) -> Result<turso_core::Completion> {
|
||||
self.nr_pread_calls.set(self.nr_pread_calls.get() + 1);
|
||||
if self.fault.get() {
|
||||
tracing::debug!("pread fault");
|
||||
@@ -173,7 +173,7 @@ impl File for SimulatorFile {
|
||||
|
||||
fn pwrite(
|
||||
&self,
|
||||
pos: usize,
|
||||
pos: u64,
|
||||
buffer: Arc<turso_core::Buffer>,
|
||||
c: turso_core::Completion,
|
||||
) -> Result<turso_core::Completion> {
|
||||
@@ -227,7 +227,7 @@ impl File for SimulatorFile {
|
||||
|
||||
fn pwritev(
|
||||
&self,
|
||||
pos: usize,
|
||||
pos: u64,
|
||||
buffers: Vec<Arc<turso_core::Buffer>>,
|
||||
c: turso_core::Completion,
|
||||
) -> Result<turso_core::Completion> {
|
||||
@@ -257,7 +257,7 @@ impl File for SimulatorFile {
|
||||
self.inner.size()
|
||||
}
|
||||
|
||||
fn truncate(&self, len: usize, c: turso_core::Completion) -> Result<turso_core::Completion> {
|
||||
fn truncate(&self, len: u64, c: turso_core::Completion) -> Result<turso_core::Completion> {
|
||||
if self.fault.get() {
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
FAULT_ERROR_MSG.into(),
|
||||
|
||||
@@ -81,7 +81,7 @@ pub async fn db_bootstrap<C: ProtocolIO, Ctx>(
|
||||
while !c.is_completed() {
|
||||
coro.yield_(ProtocolCommand::IO).await?;
|
||||
}
|
||||
pos += content_len;
|
||||
pos += content_len as u64;
|
||||
}
|
||||
if content.is_done()? {
|
||||
break;
|
||||
@@ -123,7 +123,7 @@ pub async fn wal_apply_from_file<Ctx>(
|
||||
// todo(sivukhin): we need to error out in case of partial read
|
||||
assert!(size as usize == WAL_FRAME_SIZE);
|
||||
});
|
||||
let c = frames_file.pread(offset as usize, c)?;
|
||||
let c = frames_file.pread(offset, c)?;
|
||||
while !c.is_completed() {
|
||||
coro.yield_(ProtocolCommand::IO).await?;
|
||||
}
|
||||
@@ -243,7 +243,7 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
|
||||
while !c.is_completed() {
|
||||
coro.yield_(ProtocolCommand::IO).await?;
|
||||
}
|
||||
offset += WAL_FRAME_SIZE;
|
||||
offset += WAL_FRAME_SIZE as u64;
|
||||
}
|
||||
|
||||
let c = Completion::new_sync(move |_| {
|
||||
@@ -323,7 +323,7 @@ pub async fn wal_pull_to_file_legacy<C: ProtocolIO, Ctx>(
|
||||
coro.yield_(ProtocolCommand::IO).await?;
|
||||
}
|
||||
|
||||
last_offset += WAL_FRAME_SIZE;
|
||||
last_offset += WAL_FRAME_SIZE as u64;
|
||||
buffer_len = 0;
|
||||
start_frame += 1;
|
||||
|
||||
@@ -974,7 +974,7 @@ pub async fn bootstrap_db_file_v1<C: ProtocolIO, Ctx>(
|
||||
};
|
||||
assert!(rc as usize == 0);
|
||||
});
|
||||
let c = file.truncate(header.db_size as usize * PAGE_SIZE, c)?;
|
||||
let c = file.truncate(header.db_size * PAGE_SIZE as u64, c)?;
|
||||
while !c.is_completed() {
|
||||
coro.yield_(ProtocolCommand::IO).await?;
|
||||
}
|
||||
@@ -984,7 +984,7 @@ pub async fn bootstrap_db_file_v1<C: ProtocolIO, Ctx>(
|
||||
while let Some(page_data) =
|
||||
wait_proto_message::<Ctx, PageData>(coro, &completion, &mut bytes).await?
|
||||
{
|
||||
let offset = page_data.page_id as usize * PAGE_SIZE;
|
||||
let offset = page_data.page_id * PAGE_SIZE as u64;
|
||||
let page = decode_page(&header, page_data)?;
|
||||
if page.len() != PAGE_SIZE {
|
||||
return Err(Error::DatabaseSyncEngineError(format!(
|
||||
@@ -1074,7 +1074,7 @@ pub async fn reset_wal_file<Ctx>(
|
||||
// let's truncate WAL file completely in order for this operation to safely execute on empty WAL in case of initial bootstrap phase
|
||||
0
|
||||
} else {
|
||||
WAL_HEADER + WAL_FRAME_SIZE * (frames_count as usize)
|
||||
WAL_HEADER as u64 + WAL_FRAME_SIZE as u64 * frames_count
|
||||
};
|
||||
tracing::debug!("reset db wal to the size of {} frames", frames_count);
|
||||
let c = Completion::new_trunc(move |result| {
|
||||
|
||||
@@ -59,7 +59,7 @@ impl IoOperations for Arc<dyn turso_core::IO> {
|
||||
};
|
||||
tracing::debug!("file truncated: rc={}", rc);
|
||||
});
|
||||
let c = file.truncate(len, c)?;
|
||||
let c = file.truncate(len as u64, c)?;
|
||||
while !c.is_completed() {
|
||||
coro.yield_(ProtocolCommand::IO).await?;
|
||||
}
|
||||
|
||||
@@ -434,7 +434,7 @@ fn write_at(io: &impl IO, file: Arc<dyn File>, offset: usize, data: &[u8]) {
|
||||
// reference the buffer to keep alive for async io
|
||||
let _buf = _buf.clone();
|
||||
});
|
||||
let result = file.pwrite(offset, buffer, completion).unwrap();
|
||||
let result = file.pwrite(offset as u64, buffer, completion).unwrap();
|
||||
while !result.is_completed() {
|
||||
io.run_once().unwrap();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user