Merge 'Use u64 for file offsets in I/O and calculate such offsets in u64' from Preston Thorpe

Using `usize` to compute file offsets caps us at ~16GB on 32-bit
systems. For example, with 4 KiB pages we can only address up to 1048576
pages; attempting the next page overflows a 32-bit usize and can wrap
the write offset, corrupting data. Switching our I/O APIs and offset
math to u64 avoids this overflow on 32-bit targets

Closes #2791
This commit is contained in:
Pekka Enberg
2025-09-02 09:06:49 +03:00
committed by GitHub
16 changed files with 220 additions and 128 deletions

View File

@@ -578,7 +578,7 @@ impl turso_core::DatabaseStorage for DatabaseFile {
if !(512..=65536).contains(&size) || size & (size - 1) != 0 {
return Err(turso_core::LimboError::NotADB);
}
let pos = (page_idx - 1) * size;
let pos = (page_idx as u64 - 1) * size as u64;
self.file.pread(pos, c)
}
@@ -590,7 +590,7 @@ impl turso_core::DatabaseStorage for DatabaseFile {
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
let size = buffer.len();
let pos = (page_idx - 1) * size;
let pos = (page_idx as u64 - 1) * size as u64;
self.file.pwrite(pos, buffer, c)
}
@@ -602,7 +602,7 @@ impl turso_core::DatabaseStorage for DatabaseFile {
_io_ctx: &turso_core::IOContext,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
let pos = first_page_idx.saturating_sub(1) * page_size;
let pos = first_page_idx.saturating_sub(1) as u64 * page_size as u64;
let c = self.file.pwritev(pos, buffers, c)?;
Ok(c)
}
@@ -620,7 +620,7 @@ impl turso_core::DatabaseStorage for DatabaseFile {
len: usize,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
let c = self.file.truncate(len, c)?;
let c = self.file.truncate(len as u64, c)?;
Ok(c)
}
}

View File

@@ -68,9 +68,9 @@ impl File for GenericFile {
}
#[instrument(skip(self, c), level = Level::TRACE)]
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
let mut file = self.file.write();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
file.seek(std::io::SeekFrom::Start(pos))?;
let nr = {
let r = c.as_read();
let buf = r.buf();
@@ -83,9 +83,9 @@ impl File for GenericFile {
}
#[instrument(skip(self, c, buffer), level = Level::TRACE)]
fn pwrite(&self, pos: usize, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
let mut file = self.file.write();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
file.seek(std::io::SeekFrom::Start(pos))?;
let buf = buffer.as_slice();
file.write_all(buf)?;
c.complete(buffer.len() as i32);
@@ -101,9 +101,9 @@ impl File for GenericFile {
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
let file = self.file.write();
file.set_len(len as u64)?;
file.set_len(len)?;
c.complete(0);
Ok(c)
}

View File

@@ -182,7 +182,7 @@ struct WritevState {
/// File descriptor/id of the file we are writing to
file_id: Fd,
/// absolute file offset for next submit
file_pos: usize,
file_pos: u64,
/// current buffer index in `bufs`
current_buffer_idx: usize,
/// intra-buffer offset
@@ -198,7 +198,7 @@ struct WritevState {
}
impl WritevState {
fn new(file: &UringFile, pos: usize, bufs: Vec<Arc<crate::Buffer>>) -> Self {
fn new(file: &UringFile, pos: u64, bufs: Vec<Arc<crate::Buffer>>) -> Self {
let file_id = file
.id()
.map(Fd::Fixed)
@@ -223,23 +223,23 @@ impl WritevState {
/// Advance (idx, off, pos) after written bytes
#[inline(always)]
fn advance(&mut self, written: usize) {
fn advance(&mut self, written: u64) {
let mut remaining = written;
while remaining > 0 {
let current_buf_len = self.bufs[self.current_buffer_idx].len();
let left = current_buf_len - self.current_buffer_offset;
if remaining < left {
self.current_buffer_offset += remaining;
if remaining < left as u64 {
self.current_buffer_offset += remaining as usize;
self.file_pos += remaining;
remaining = 0;
} else {
remaining -= left;
self.file_pos += left;
remaining -= left as u64;
self.file_pos += left as u64;
self.current_buffer_idx += 1;
self.current_buffer_offset = 0;
}
}
self.total_written += written;
self.total_written += written as usize;
}
#[inline(always)]
@@ -400,7 +400,7 @@ impl WrappedIOUring {
iov_allocation[0].iov_len as u32,
id as u16,
)
.offset(st.file_pos as u64)
.offset(st.file_pos)
.build()
.user_data(key)
} else {
@@ -409,7 +409,7 @@ impl WrappedIOUring {
iov_allocation[0].iov_base as *const u8,
iov_allocation[0].iov_len as u32,
)
.offset(st.file_pos as u64)
.offset(st.file_pos)
.build()
.user_data(key)
}
@@ -425,7 +425,7 @@ impl WrappedIOUring {
let entry = with_fd!(st.file_id, |fd| {
io_uring::opcode::Writev::new(fd, ptr, iov_count as u32)
.offset(st.file_pos as u64)
.offset(st.file_pos)
.build()
.user_data(key)
});
@@ -443,8 +443,8 @@ impl WrappedIOUring {
return;
}
let written = result as usize;
state.advance(written);
let written = result;
state.advance(written as u64);
match state.remaining() {
0 => {
tracing::info!(
@@ -643,7 +643,7 @@ impl File for UringFile {
Ok(())
}
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
let r = c.as_read();
let mut io = self.io.borrow_mut();
let read_e = {
@@ -663,14 +663,14 @@ impl File for UringFile {
io.debug_check_fixed(idx, ptr, len);
}
io_uring::opcode::ReadFixed::new(fd, ptr, len as u32, idx as u16)
.offset(pos as u64)
.offset(pos)
.build()
.user_data(get_key(c.clone()))
} else {
trace!("pread(pos = {}, length = {})", pos, len);
// Use Read opcode if fixed buffer is not available
io_uring::opcode::Read::new(fd, buf.as_mut_ptr(), len as u32)
.offset(pos as u64)
.offset(pos)
.build()
.user_data(get_key(c.clone()))
}
@@ -680,7 +680,7 @@ impl File for UringFile {
Ok(c)
}
fn pwrite(&self, pos: usize, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
let mut io = self.io.borrow_mut();
let write = {
let ptr = buffer.as_ptr();
@@ -698,13 +698,13 @@ impl File for UringFile {
io.debug_check_fixed(idx, ptr, len);
}
io_uring::opcode::WriteFixed::new(fd, ptr, len as u32, idx as u16)
.offset(pos as u64)
.offset(pos)
.build()
.user_data(get_key(c.clone()))
} else {
trace!("pwrite(pos = {}, length = {})", pos, buffer.len());
io_uring::opcode::Write::new(fd, ptr, len as u32)
.offset(pos as u64)
.offset(pos)
.build()
.user_data(get_key(c.clone()))
}
@@ -728,7 +728,7 @@ impl File for UringFile {
fn pwritev(
&self,
pos: usize,
pos: u64,
bufs: Vec<Arc<crate::Buffer>>,
c: Completion,
) -> Result<Completion> {
@@ -748,10 +748,10 @@ impl File for UringFile {
Ok(self.file.metadata()?.len())
}
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
let mut io = self.io.borrow_mut();
let truncate = with_fd!(self, |fd| {
io_uring::opcode::Ftruncate::new(fd, len as u64)
io_uring::opcode::Ftruncate::new(fd, len)
.build()
.user_data(get_key(c.clone()))
});

View File

@@ -69,17 +69,12 @@ impl IO for MemoryIO {
files.remove(path);
Ok(())
}
fn run_once(&self) -> Result<()> {
// nop
Ok(())
}
}
pub struct MemoryFile {
path: String,
pages: UnsafeCell<BTreeMap<usize, MemPage>>,
size: Cell<usize>,
size: Cell<u64>,
}
unsafe impl Send for MemoryFile {}
unsafe impl Sync for MemoryFile {}
@@ -92,10 +87,10 @@ impl File for MemoryFile {
Ok(())
}
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
tracing::debug!("pread(path={}): pos={}", self.path, pos);
let r = c.as_read();
let buf_len = r.buf().len();
let buf_len = r.buf().len() as u64;
if buf_len == 0 {
c.complete(0);
return Ok(c);
@@ -110,8 +105,8 @@ impl File for MemoryFile {
let read_len = buf_len.min(file_size - pos);
{
let read_buf = r.buf();
let mut offset = pos;
let mut remaining = read_len;
let mut offset = pos as usize;
let mut remaining = read_len as usize;
let mut buf_offset = 0;
while remaining > 0 {
@@ -134,7 +129,7 @@ impl File for MemoryFile {
Ok(c)
}
fn pwrite(&self, pos: usize, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
fn pwrite(&self, pos: u64, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
tracing::debug!(
"pwrite(path={}): pos={}, size={}",
self.path,
@@ -147,7 +142,7 @@ impl File for MemoryFile {
return Ok(c);
}
let mut offset = pos;
let mut offset = pos as usize;
let mut remaining = buf_len;
let mut buf_offset = 0;
let data = &buffer.as_slice();
@@ -158,7 +153,7 @@ impl File for MemoryFile {
let bytes_to_write = remaining.min(PAGE_SIZE - page_offset);
{
let page = self.get_or_allocate_page(page_no);
let page = self.get_or_allocate_page(page_no as u64);
page[page_offset..page_offset + bytes_to_write]
.copy_from_slice(&data[buf_offset..buf_offset + bytes_to_write]);
}
@@ -169,7 +164,7 @@ impl File for MemoryFile {
}
self.size
.set(core::cmp::max(pos + buf_len, self.size.get()));
.set(core::cmp::max(pos + buf_len as u64, self.size.get()));
c.complete(buf_len as i32);
Ok(c)
@@ -182,13 +177,13 @@ impl File for MemoryFile {
Ok(c)
}
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
tracing::debug!("truncate(path={}): len={}", self.path, len);
if len < self.size.get() {
// Truncate pages
unsafe {
let pages = &mut *self.pages.get();
pages.retain(|&k, _| k * PAGE_SIZE < len);
pages.retain(|&k, _| k * PAGE_SIZE < len as usize);
}
}
self.size.set(len);
@@ -196,14 +191,14 @@ impl File for MemoryFile {
Ok(c)
}
fn pwritev(&self, pos: usize, buffers: Vec<Arc<Buffer>>, c: Completion) -> Result<Completion> {
fn pwritev(&self, pos: u64, buffers: Vec<Arc<Buffer>>, c: Completion) -> Result<Completion> {
tracing::debug!(
"pwritev(path={}): pos={}, buffers={:?}",
self.path,
pos,
buffers.iter().map(|x| x.len()).collect::<Vec<_>>()
);
let mut offset = pos;
let mut offset = pos as usize;
let mut total_written = 0;
for buffer in buffers {
@@ -222,7 +217,7 @@ impl File for MemoryFile {
let bytes_to_write = remaining.min(PAGE_SIZE - page_offset);
{
let page = self.get_or_allocate_page(page_no);
let page = self.get_or_allocate_page(page_no as u64);
page[page_offset..page_offset + bytes_to_write]
.copy_from_slice(&data[buf_offset..buf_offset + bytes_to_write]);
}
@@ -235,23 +230,23 @@ impl File for MemoryFile {
}
c.complete(total_written as i32);
self.size
.set(core::cmp::max(pos + total_written, self.size.get()));
.set(core::cmp::max(pos + total_written as u64, self.size.get()));
Ok(c)
}
fn size(&self) -> Result<u64> {
tracing::debug!("size(path={}): {}", self.path, self.size.get());
Ok(self.size.get() as u64)
Ok(self.size.get())
}
}
impl MemoryFile {
#[allow(clippy::mut_from_ref)]
fn get_or_allocate_page(&self, page_no: usize) -> &mut MemPage {
fn get_or_allocate_page(&self, page_no: u64) -> &mut MemPage {
unsafe {
let pages = &mut *self.pages.get();
pages
.entry(page_no)
.entry(page_no as usize)
.or_insert_with(|| Box::new([0; PAGE_SIZE]))
}
}

View File

@@ -12,10 +12,10 @@ use std::{fmt::Debug, pin::Pin};
pub trait File: Send + Sync {
fn lock_file(&self, exclusive: bool) -> Result<()>;
fn unlock_file(&self) -> Result<()>;
fn pread(&self, pos: usize, c: Completion) -> Result<Completion>;
fn pwrite(&self, pos: usize, buffer: Arc<Buffer>, c: Completion) -> Result<Completion>;
fn pread(&self, pos: u64, c: Completion) -> Result<Completion>;
fn pwrite(&self, pos: u64, buffer: Arc<Buffer>, c: Completion) -> Result<Completion>;
fn sync(&self, c: Completion) -> Result<Completion>;
fn pwritev(&self, pos: usize, buffers: Vec<Arc<Buffer>>, c: Completion) -> Result<Completion> {
fn pwritev(&self, pos: u64, buffers: Vec<Arc<Buffer>>, c: Completion) -> Result<Completion> {
use std::sync::atomic::{AtomicUsize, Ordering};
if buffers.is_empty() {
c.complete(0);
@@ -56,12 +56,12 @@ pub trait File: Send + Sync {
c.abort();
return Err(e);
}
pos += len;
pos += len as u64;
}
Ok(c)
}
fn size(&self) -> Result<u64>;
fn truncate(&self, len: usize, c: Completion) -> Result<Completion>;
fn truncate(&self, len: u64, c: Completion) -> Result<Completion>;
}
#[derive(Debug, Copy, Clone, PartialEq)]
@@ -87,7 +87,9 @@ pub trait IO: Clock + Send + Sync {
// remove_file is used in the sync-engine
fn remove_file(&self, path: &str) -> Result<()>;
fn run_once(&self) -> Result<()>;
fn run_once(&self) -> Result<()> {
Ok(())
}
fn wait_for_completion(&self, c: Completion) -> Result<()> {
while !c.finished() {

View File

@@ -15,8 +15,6 @@ use std::{io::ErrorKind, sync::Arc};
use tracing::debug;
use tracing::{instrument, trace, Level};
/// UnixIO lives longer than any of the files it creates, so it is
/// safe to store references to it's internals in the UnixFiles
pub struct UnixIO {}
unsafe impl Send for UnixIO {}
@@ -127,24 +125,6 @@ impl IO for UnixIO {
}
}
// enum CompletionCallback {
// Read(Arc<Mutex<std::fs::File>>, Completion, usize),
// Write(
// Arc<Mutex<std::fs::File>>,
// Completion,
// Arc<crate::Buffer>,
// usize,
// ),
// Writev(
// Arc<Mutex<std::fs::File>>,
// Completion,
// Vec<Arc<crate::Buffer>>,
// usize, // absolute file offset
// usize, // buf index
// usize, // intra-buf offset
// ),
// }
pub struct UnixFile {
file: Arc<Mutex<std::fs::File>>,
}
@@ -192,7 +172,7 @@ impl File for UnixFile {
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
let file = self.file.lock();
let result = unsafe {
let r = c.as_read();
@@ -217,7 +197,7 @@ impl File for UnixFile {
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn pwrite(&self, pos: usize, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
let file = self.file.lock();
let result = unsafe {
libc::pwrite(
@@ -241,7 +221,7 @@ impl File for UnixFile {
#[instrument(err, skip_all, level = Level::TRACE)]
fn pwritev(
&self,
pos: usize,
pos: u64,
buffers: Vec<Arc<crate::Buffer>>,
c: Completion,
) -> Result<Completion> {
@@ -251,7 +231,7 @@ impl File for UnixFile {
}
let file = self.file.lock();
match try_pwritev_raw(file.as_raw_fd(), pos as u64, &buffers, 0, 0) {
match try_pwritev_raw(file.as_raw_fd(), pos, &buffers, 0, 0) {
Ok(written) => {
trace!("pwritev wrote {written}");
c.complete(written as i32);
@@ -268,24 +248,21 @@ impl File for UnixFile {
let file = self.file.lock();
let result = unsafe {
#[cfg(not(any(target_os = "macos", target_os = "ios")))]
{
libc::fsync(file.as_raw_fd())
}
#[cfg(any(target_os = "macos", target_os = "ios"))]
{
libc::fcntl(file.as_raw_fd(), libc::F_FULLFSYNC)
}
};
if result == -1 {
let e = std::io::Error::last_os_error();
Err(e.into())
} else {
#[cfg(not(any(target_os = "macos", target_os = "ios")))]
trace!("fsync");
@@ -304,9 +281,9 @@ impl File for UnixFile {
}
#[instrument(err, skip_all, level = Level::INFO)]
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
let file = self.file.lock();
let result = file.set_len(len as u64);
let result = file.set_len(len);
match result {
Ok(()) => {
trace!("file truncated to len=({})", len);

View File

@@ -81,8 +81,6 @@ impl VfsMod {
}
}
// #Safety:
// the callback wrapper in the extension library is FnOnce, so we know
/// # Safety
/// the callback wrapper in the extension library is FnOnce, so we know
/// that the into_raw/from_raw contract will hold
@@ -121,7 +119,7 @@ impl File for VfsFileImpl {
Ok(())
}
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
if self.vfs.is_null() {
c.complete(-1);
return Err(LimboError::ExtensionError("VFS is null".to_string()));
@@ -145,7 +143,7 @@ impl File for VfsFileImpl {
Ok(c)
}
fn pwrite(&self, pos: usize, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
fn pwrite(&self, pos: u64, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
if self.vfs.is_null() {
c.complete(-1);
return Err(LimboError::ExtensionError("VFS is null".to_string()));
@@ -192,7 +190,7 @@ impl File for VfsFileImpl {
}
}
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
if self.vfs.is_null() {
c.complete(-1);
return Err(LimboError::ExtensionError("VFS is null".to_string()));

115
core/io/windows.rs Normal file
View File

@@ -0,0 +1,115 @@
use crate::{Clock, Completion, File, Instant, LimboError, OpenFlags, Result, IO};
use parking_lot::RwLock;
use std::io::{Read, Seek, Write};
use std::sync::Arc;
use tracing::{debug, instrument, trace, Level};
pub struct WindowsIO {}
impl WindowsIO {
pub fn new() -> Result<Self> {
debug!("Using IO backend 'syscall'");
Ok(Self {})
}
}
impl IO for WindowsIO {
#[instrument(err, skip_all, level = Level::TRACE)]
fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result<Arc<dyn File>> {
trace!("open_file(path = {})", path);
let mut file = std::fs::File::options();
file.read(true);
if !flags.contains(OpenFlags::ReadOnly) {
file.write(true);
file.create(flags.contains(OpenFlags::Create));
}
let file = file.open(path)?;
Ok(Arc::new(WindowsFile {
file: RwLock::new(file),
}))
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn remove_file(&self, path: &str) -> Result<()> {
trace!("remove_file(path = {})", path);
Ok(std::fs::remove_file(path)?)
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn run_once(&self) -> Result<()> {
Ok(())
}
}
impl Clock for WindowsIO {
fn now(&self) -> Instant {
let now = chrono::Local::now();
Instant {
secs: now.timestamp(),
micros: now.timestamp_subsec_micros(),
}
}
}
pub struct WindowsFile {
file: RwLock<std::fs::File>,
}
impl File for WindowsFile {
#[instrument(err, skip_all, level = Level::TRACE)]
fn lock_file(&self, exclusive: bool) -> Result<()> {
unimplemented!()
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn unlock_file(&self) -> Result<()> {
unimplemented!()
}
#[instrument(skip(self, c), level = Level::TRACE)]
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
let mut file = self.file.write();
file.seek(std::io::SeekFrom::Start(pos))?;
let nr = {
let r = c.as_read();
let buf = r.buf();
let buf = buf.as_mut_slice();
file.read_exact(buf)?;
buf.len() as i32
};
c.complete(nr);
Ok(c)
}
#[instrument(skip(self, c, buffer), level = Level::TRACE)]
fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
let mut file = self.file.write();
file.seek(std::io::SeekFrom::Start(pos))?;
let buf = buffer.as_slice();
file.write_all(buf)?;
c.complete(buffer.len() as i32);
Ok(c)
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn sync(&self, c: Completion) -> Result<Completion> {
let file = self.file.write();
file.sync_all()?;
c.complete(0);
Ok(c)
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
let file = self.file.write();
file.set_len(len)?;
c.complete(0);
Ok(c)
}
fn size(&self) -> Result<u64> {
let file = self.file.read();
Ok(file.metadata().unwrap().len())
}
}

View File

@@ -91,7 +91,9 @@ impl DatabaseStorage for DatabaseFile {
if !(512..=65536).contains(&size) || size & (size - 1) != 0 {
return Err(LimboError::NotADB);
}
let pos = (page_idx - 1) * size;
let Some(pos) = (page_idx as u64 - 1).checked_mul(size as u64) else {
return Err(LimboError::IntegerOverflow);
};
if let Some(ctx) = io_ctx.encryption_context() {
let encryption_ctx = ctx.clone();
@@ -145,7 +147,9 @@ impl DatabaseStorage for DatabaseFile {
assert!(buffer_size >= 512);
assert!(buffer_size <= 65536);
assert_eq!(buffer_size & (buffer_size - 1), 0);
let pos = (page_idx - 1) * buffer_size;
let Some(pos) = (page_idx as u64 - 1).checked_mul(buffer_size as u64) else {
return Err(LimboError::IntegerOverflow);
};
let buffer = {
if let Some(ctx) = io_ctx.encryption_context() {
encrypt_buffer(page_idx, buffer, ctx)
@@ -169,7 +173,9 @@ impl DatabaseStorage for DatabaseFile {
assert!(page_size <= 65536);
assert_eq!(page_size & (page_size - 1), 0);
let pos = (first_page_idx - 1) * page_size;
let Some(pos) = (first_page_idx as u64 - 1).checked_mul(page_size as u64) else {
return Err(LimboError::IntegerOverflow);
};
let buffers = {
if let Some(ctx) = io_ctx.encryption_context() {
buffers
@@ -198,7 +204,7 @@ impl DatabaseStorage for DatabaseFile {
#[instrument(skip_all, level = Level::INFO)]
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
let c = self.file.truncate(len, c)?;
let c = self.file.truncate(len as u64, c)?;
Ok(c)
}
}

View File

@@ -1838,7 +1838,7 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
pub fn begin_read_wal_frame_raw(
buffer_pool: &Arc<BufferPool>,
io: &Arc<dyn File>,
offset: usize,
offset: u64,
complete: Box<ReadComplete>,
) -> Result<Completion> {
tracing::trace!("begin_read_wal_frame_raw(offset={})", offset);
@@ -1851,7 +1851,7 @@ pub fn begin_read_wal_frame_raw(
pub fn begin_read_wal_frame(
io: &Arc<dyn File>,
offset: usize,
offset: u64,
buffer_pool: Arc<BufferPool>,
complete: Box<ReadComplete>,
page_idx: usize,

View File

@@ -1082,7 +1082,7 @@ impl Wal for WalFile {
});
begin_read_wal_frame(
&self.get_shared().file,
offset + WAL_FRAME_HEADER_SIZE,
offset + WAL_FRAME_HEADER_SIZE as u64,
buffer_pool,
complete,
page_idx,
@@ -1196,7 +1196,7 @@ impl Wal for WalFile {
});
let c = begin_read_wal_frame(
&self.get_shared().file,
offset + WAL_FRAME_HEADER_SIZE,
offset + WAL_FRAME_HEADER_SIZE as u64,
buffer_pool,
complete,
page_id as usize,
@@ -1460,14 +1460,14 @@ impl Wal for WalFile {
let mut next_frame_id = self.max_frame + 1;
// Build every frame in order, updating the rolling checksum
for (idx, page) in pages.iter().enumerate() {
let page_id = page.get().id as u64;
let page_id = page.get().id;
let plain = page.get_contents().as_ptr();
let data_to_write: std::borrow::Cow<[u8]> = {
let io_ctx = self.io_ctx.borrow();
let ectx = io_ctx.encryption_context();
if let Some(ctx) = ectx.as_ref() {
Cow::Owned(ctx.encrypt_page(plain, page_id as usize)?)
Cow::Owned(ctx.encrypt_page(plain, page_id)?)
} else {
Cow::Borrowed(plain)
}
@@ -1581,11 +1581,10 @@ impl WalFile {
self.get_shared().wal_header.lock().page_size
}
fn frame_offset(&self, frame_id: u64) -> usize {
fn frame_offset(&self, frame_id: u64) -> u64 {
assert!(frame_id > 0, "Frame ID must be 1-based");
let page_offset = (frame_id - 1) * (self.page_size() + WAL_FRAME_HEADER_SIZE as u32) as u64;
let offset = WAL_HEADER_SIZE as u64 + page_offset;
offset as usize
WAL_HEADER_SIZE as u64 + page_offset
}
#[allow(clippy::mut_from_ref)]
@@ -2131,7 +2130,7 @@ impl WalFile {
// schedule read of the page payload
let c = begin_read_wal_frame(
&self.get_shared().file,
offset + WAL_FRAME_HEADER_SIZE,
offset + WAL_FRAME_HEADER_SIZE as u64,
self.buffer_pool.clone(),
complete,
page_id,
@@ -2317,7 +2316,7 @@ pub mod test {
let done = Rc::new(Cell::new(false));
let _done = done.clone();
let _ = file.file.truncate(
WAL_HEADER_SIZE,
WAL_HEADER_SIZE as u64,
Completion::new_trunc(move |_| {
let done = _done.clone();
done.set(true);

View File

@@ -370,7 +370,7 @@ struct SortedChunk {
/// The chunk file.
file: Arc<dyn File>,
/// Offset of the start of chunk in file
start_offset: usize,
start_offset: u64,
/// The size of this chunk file in bytes.
chunk_size: usize,
/// The read buffer.
@@ -391,7 +391,7 @@ impl SortedChunk {
fn new(file: Arc<dyn File>, start_offset: usize, buffer_size: usize) -> Self {
Self {
file,
start_offset,
start_offset: start_offset as u64,
chunk_size: 0,
buffer: Rc::new(RefCell::new(vec![0; buffer_size])),
buffer_len: Rc::new(Cell::new(0)),
@@ -522,7 +522,7 @@ impl SortedChunk {
let c = Completion::new_read(read_buffer_ref, read_complete);
let c = self
.file
.pread(self.start_offset + self.total_bytes_read.get(), c)?;
.pread(self.start_offset + self.total_bytes_read.get() as u64, c)?;
Ok(c)
}

View File

@@ -150,7 +150,7 @@ impl File for SimulatorFile {
self.inner.unlock_file()
}
fn pread(&self, pos: usize, c: turso_core::Completion) -> Result<turso_core::Completion> {
fn pread(&self, pos: u64, c: turso_core::Completion) -> Result<turso_core::Completion> {
self.nr_pread_calls.set(self.nr_pread_calls.get() + 1);
if self.fault.get() {
tracing::debug!("pread fault");
@@ -173,7 +173,7 @@ impl File for SimulatorFile {
fn pwrite(
&self,
pos: usize,
pos: u64,
buffer: Arc<turso_core::Buffer>,
c: turso_core::Completion,
) -> Result<turso_core::Completion> {
@@ -227,7 +227,7 @@ impl File for SimulatorFile {
fn pwritev(
&self,
pos: usize,
pos: u64,
buffers: Vec<Arc<turso_core::Buffer>>,
c: turso_core::Completion,
) -> Result<turso_core::Completion> {
@@ -257,7 +257,7 @@ impl File for SimulatorFile {
self.inner.size()
}
fn truncate(&self, len: usize, c: turso_core::Completion) -> Result<turso_core::Completion> {
fn truncate(&self, len: u64, c: turso_core::Completion) -> Result<turso_core::Completion> {
if self.fault.get() {
return Err(turso_core::LimboError::InternalError(
FAULT_ERROR_MSG.into(),

View File

@@ -81,7 +81,7 @@ pub async fn db_bootstrap<C: ProtocolIO, Ctx>(
while !c.is_completed() {
coro.yield_(ProtocolCommand::IO).await?;
}
pos += content_len;
pos += content_len as u64;
}
if content.is_done()? {
break;
@@ -123,7 +123,7 @@ pub async fn wal_apply_from_file<Ctx>(
// todo(sivukhin): we need to error out in case of partial read
assert!(size as usize == WAL_FRAME_SIZE);
});
let c = frames_file.pread(offset as usize, c)?;
let c = frames_file.pread(offset, c)?;
while !c.is_completed() {
coro.yield_(ProtocolCommand::IO).await?;
}
@@ -243,7 +243,7 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
while !c.is_completed() {
coro.yield_(ProtocolCommand::IO).await?;
}
offset += WAL_FRAME_SIZE;
offset += WAL_FRAME_SIZE as u64;
}
let c = Completion::new_sync(move |_| {
@@ -323,7 +323,7 @@ pub async fn wal_pull_to_file_legacy<C: ProtocolIO, Ctx>(
coro.yield_(ProtocolCommand::IO).await?;
}
last_offset += WAL_FRAME_SIZE;
last_offset += WAL_FRAME_SIZE as u64;
buffer_len = 0;
start_frame += 1;
@@ -974,7 +974,7 @@ pub async fn bootstrap_db_file_v1<C: ProtocolIO, Ctx>(
};
assert!(rc as usize == 0);
});
let c = file.truncate(header.db_size as usize * PAGE_SIZE, c)?;
let c = file.truncate(header.db_size * PAGE_SIZE as u64, c)?;
while !c.is_completed() {
coro.yield_(ProtocolCommand::IO).await?;
}
@@ -984,7 +984,7 @@ pub async fn bootstrap_db_file_v1<C: ProtocolIO, Ctx>(
while let Some(page_data) =
wait_proto_message::<Ctx, PageData>(coro, &completion, &mut bytes).await?
{
let offset = page_data.page_id as usize * PAGE_SIZE;
let offset = page_data.page_id * PAGE_SIZE as u64;
let page = decode_page(&header, page_data)?;
if page.len() != PAGE_SIZE {
return Err(Error::DatabaseSyncEngineError(format!(
@@ -1074,7 +1074,7 @@ pub async fn reset_wal_file<Ctx>(
// let's truncate WAL file completely in order for this operation to safely execute on empty WAL in case of initial bootstrap phase
0
} else {
WAL_HEADER + WAL_FRAME_SIZE * (frames_count as usize)
WAL_HEADER as u64 + WAL_FRAME_SIZE as u64 * frames_count
};
tracing::debug!("reset db wal to the size of {} frames", frames_count);
let c = Completion::new_trunc(move |result| {

View File

@@ -59,7 +59,7 @@ impl IoOperations for Arc<dyn turso_core::IO> {
};
tracing::debug!("file truncated: rc={}", rc);
});
let c = file.truncate(len, c)?;
let c = file.truncate(len as u64, c)?;
while !c.is_completed() {
coro.yield_(ProtocolCommand::IO).await?;
}

View File

@@ -434,7 +434,7 @@ fn write_at(io: &impl IO, file: Arc<dyn File>, offset: usize, data: &[u8]) {
// reference the buffer to keep alive for async io
let _buf = _buf.clone();
});
let result = file.pwrite(offset, buffer, completion).unwrap();
let result = file.pwrite(offset as u64, buffer, completion).unwrap();
while !result.is_completed() {
io.run_once().unwrap();
}