diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index fb7162fd2..3b0d8a466 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -578,7 +578,7 @@ impl turso_core::DatabaseStorage for DatabaseFile { if !(512..=65536).contains(&size) || size & (size - 1) != 0 { return Err(turso_core::LimboError::NotADB); } - let pos = (page_idx - 1) * size; + let pos = (page_idx as u64 - 1) * size as u64; self.file.pread(pos, c) } @@ -590,7 +590,7 @@ impl turso_core::DatabaseStorage for DatabaseFile { c: turso_core::Completion, ) -> turso_core::Result { let size = buffer.len(); - let pos = (page_idx - 1) * size; + let pos = (page_idx as u64 - 1) * size as u64; self.file.pwrite(pos, buffer, c) } @@ -602,7 +602,7 @@ impl turso_core::DatabaseStorage for DatabaseFile { _io_ctx: &turso_core::IOContext, c: turso_core::Completion, ) -> turso_core::Result { - let pos = first_page_idx.saturating_sub(1) * page_size; + let pos = first_page_idx.saturating_sub(1) as u64 * page_size as u64; let c = self.file.pwritev(pos, buffers, c)?; Ok(c) } @@ -620,7 +620,7 @@ impl turso_core::DatabaseStorage for DatabaseFile { len: usize, c: turso_core::Completion, ) -> turso_core::Result { - let c = self.file.truncate(len, c)?; + let c = self.file.truncate(len as u64, c)?; Ok(c) } } diff --git a/core/io/generic.rs b/core/io/generic.rs index 83caa1405..7b7fcf99b 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -68,9 +68,9 @@ impl File for GenericFile { } #[instrument(skip(self, c), level = Level::TRACE)] - fn pread(&self, pos: usize, c: Completion) -> Result { + fn pread(&self, pos: u64, c: Completion) -> Result { let mut file = self.file.write(); - file.seek(std::io::SeekFrom::Start(pos as u64))?; + file.seek(std::io::SeekFrom::Start(pos))?; let nr = { let r = c.as_read(); let buf = r.buf(); @@ -83,9 +83,9 @@ impl File for GenericFile { } #[instrument(skip(self, c, buffer), level = Level::TRACE)] - fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { + fn pwrite(&self, pos: u64, buffer: Arc, c: Completion) -> Result { let mut file = self.file.write(); - file.seek(std::io::SeekFrom::Start(pos as u64))?; + file.seek(std::io::SeekFrom::Start(pos))?; let buf = buffer.as_slice(); file.write_all(buf)?; c.complete(buffer.len() as i32); @@ -101,9 +101,9 @@ impl File for GenericFile { } #[instrument(err, skip_all, level = Level::TRACE)] - fn truncate(&self, len: usize, c: Completion) -> Result { + fn truncate(&self, len: u64, c: Completion) -> Result { let file = self.file.write(); - file.set_len(len as u64)?; + file.set_len(len)?; c.complete(0); Ok(c) } diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 0c727560d..aca0ed42f 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -182,7 +182,7 @@ struct WritevState { /// File descriptor/id of the file we are writing to file_id: Fd, /// absolute file offset for next submit - file_pos: usize, + file_pos: u64, /// current buffer index in `bufs` current_buffer_idx: usize, /// intra-buffer offset @@ -198,7 +198,7 @@ struct WritevState { } impl WritevState { - fn new(file: &UringFile, pos: usize, bufs: Vec>) -> Self { + fn new(file: &UringFile, pos: u64, bufs: Vec>) -> Self { let file_id = file .id() .map(Fd::Fixed) @@ -223,23 +223,23 @@ impl WritevState { /// Advance (idx, off, pos) after written bytes #[inline(always)] - fn advance(&mut self, written: usize) { + fn advance(&mut self, written: u64) { let mut remaining = written; while remaining > 0 { let current_buf_len = self.bufs[self.current_buffer_idx].len(); let left = current_buf_len - self.current_buffer_offset; - if remaining < left { - self.current_buffer_offset += remaining; + if remaining < left as u64 { + self.current_buffer_offset += remaining as usize; self.file_pos += remaining; remaining = 0; } else { - remaining -= left; - self.file_pos += left; + remaining -= left as u64; + self.file_pos += left as u64; self.current_buffer_idx += 1; self.current_buffer_offset = 0; } } - self.total_written += written; + self.total_written += written as usize; } #[inline(always)] @@ -400,7 +400,7 @@ impl WrappedIOUring { iov_allocation[0].iov_len as u32, id as u16, ) - .offset(st.file_pos as u64) + .offset(st.file_pos) .build() .user_data(key) } else { @@ -409,7 +409,7 @@ impl WrappedIOUring { iov_allocation[0].iov_base as *const u8, iov_allocation[0].iov_len as u32, ) - .offset(st.file_pos as u64) + .offset(st.file_pos) .build() .user_data(key) } @@ -425,7 +425,7 @@ impl WrappedIOUring { let entry = with_fd!(st.file_id, |fd| { io_uring::opcode::Writev::new(fd, ptr, iov_count as u32) - .offset(st.file_pos as u64) + .offset(st.file_pos) .build() .user_data(key) }); @@ -443,8 +443,8 @@ impl WrappedIOUring { return; } - let written = result as usize; - state.advance(written); + let written = result; + state.advance(written as u64); match state.remaining() { 0 => { tracing::info!( @@ -643,7 +643,7 @@ impl File for UringFile { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result { + fn pread(&self, pos: u64, c: Completion) -> Result { let r = c.as_read(); let mut io = self.io.borrow_mut(); let read_e = { @@ -663,14 +663,14 @@ impl File for UringFile { io.debug_check_fixed(idx, ptr, len); } io_uring::opcode::ReadFixed::new(fd, ptr, len as u32, idx as u16) - .offset(pos as u64) + .offset(pos) .build() .user_data(get_key(c.clone())) } else { trace!("pread(pos = {}, length = {})", pos, len); // Use Read opcode if fixed buffer is not available io_uring::opcode::Read::new(fd, buf.as_mut_ptr(), len as u32) - .offset(pos as u64) + .offset(pos) .build() .user_data(get_key(c.clone())) } @@ -680,7 +680,7 @@ impl File for UringFile { Ok(c) } - fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { + fn pwrite(&self, pos: u64, buffer: Arc, c: Completion) -> Result { let mut io = self.io.borrow_mut(); let write = { let ptr = buffer.as_ptr(); @@ -698,13 +698,13 @@ impl File for UringFile { io.debug_check_fixed(idx, ptr, len); } io_uring::opcode::WriteFixed::new(fd, ptr, len as u32, idx as u16) - .offset(pos as u64) + .offset(pos) .build() .user_data(get_key(c.clone())) } else { trace!("pwrite(pos = {}, length = {})", pos, buffer.len()); io_uring::opcode::Write::new(fd, ptr, len as u32) - .offset(pos as u64) + .offset(pos) .build() .user_data(get_key(c.clone())) } @@ -728,7 +728,7 @@ impl File for UringFile { fn pwritev( &self, - pos: usize, + pos: u64, bufs: Vec>, c: Completion, ) -> Result { @@ -748,10 +748,10 @@ impl File for UringFile { Ok(self.file.metadata()?.len()) } - fn truncate(&self, len: usize, c: Completion) -> Result { + fn truncate(&self, len: u64, c: Completion) -> Result { let mut io = self.io.borrow_mut(); let truncate = with_fd!(self, |fd| { - io_uring::opcode::Ftruncate::new(fd, len as u64) + io_uring::opcode::Ftruncate::new(fd, len) .build() .user_data(get_key(c.clone())) }); diff --git a/core/io/memory.rs b/core/io/memory.rs index cea17b199..c69d87dcf 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -69,17 +69,12 @@ impl IO for MemoryIO { files.remove(path); Ok(()) } - - fn run_once(&self) -> Result<()> { - // nop - Ok(()) - } } pub struct MemoryFile { path: String, pages: UnsafeCell>, - size: Cell, + size: Cell, } unsafe impl Send for MemoryFile {} unsafe impl Sync for MemoryFile {} @@ -92,10 +87,10 @@ impl File for MemoryFile { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result { + fn pread(&self, pos: u64, c: Completion) -> Result { tracing::debug!("pread(path={}): pos={}", self.path, pos); let r = c.as_read(); - let buf_len = r.buf().len(); + let buf_len = r.buf().len() as u64; if buf_len == 0 { c.complete(0); return Ok(c); @@ -110,8 +105,8 @@ impl File for MemoryFile { let read_len = buf_len.min(file_size - pos); { let read_buf = r.buf(); - let mut offset = pos; - let mut remaining = read_len; + let mut offset = pos as usize; + let mut remaining = read_len as usize; let mut buf_offset = 0; while remaining > 0 { @@ -134,7 +129,7 @@ impl File for MemoryFile { Ok(c) } - fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { + fn pwrite(&self, pos: u64, buffer: Arc, c: Completion) -> Result { tracing::debug!( "pwrite(path={}): pos={}, size={}", self.path, @@ -147,7 +142,7 @@ impl File for MemoryFile { return Ok(c); } - let mut offset = pos; + let mut offset = pos as usize; let mut remaining = buf_len; let mut buf_offset = 0; let data = &buffer.as_slice(); @@ -158,7 +153,7 @@ impl File for MemoryFile { let bytes_to_write = remaining.min(PAGE_SIZE - page_offset); { - let page = self.get_or_allocate_page(page_no); + let page = self.get_or_allocate_page(page_no as u64); page[page_offset..page_offset + bytes_to_write] .copy_from_slice(&data[buf_offset..buf_offset + bytes_to_write]); } @@ -169,7 +164,7 @@ impl File for MemoryFile { } self.size - .set(core::cmp::max(pos + buf_len, self.size.get())); + .set(core::cmp::max(pos + buf_len as u64, self.size.get())); c.complete(buf_len as i32); Ok(c) @@ -182,13 +177,13 @@ impl File for MemoryFile { Ok(c) } - fn truncate(&self, len: usize, c: Completion) -> Result { + fn truncate(&self, len: u64, c: Completion) -> Result { tracing::debug!("truncate(path={}): len={}", self.path, len); if len < self.size.get() { // Truncate pages unsafe { let pages = &mut *self.pages.get(); - pages.retain(|&k, _| k * PAGE_SIZE < len); + pages.retain(|&k, _| k * PAGE_SIZE < len as usize); } } self.size.set(len); @@ -196,14 +191,14 @@ impl File for MemoryFile { Ok(c) } - fn pwritev(&self, pos: usize, buffers: Vec>, c: Completion) -> Result { + fn pwritev(&self, pos: u64, buffers: Vec>, c: Completion) -> Result { tracing::debug!( "pwritev(path={}): pos={}, buffers={:?}", self.path, pos, buffers.iter().map(|x| x.len()).collect::>() ); - let mut offset = pos; + let mut offset = pos as usize; let mut total_written = 0; for buffer in buffers { @@ -222,7 +217,7 @@ impl File for MemoryFile { let bytes_to_write = remaining.min(PAGE_SIZE - page_offset); { - let page = self.get_or_allocate_page(page_no); + let page = self.get_or_allocate_page(page_no as u64); page[page_offset..page_offset + bytes_to_write] .copy_from_slice(&data[buf_offset..buf_offset + bytes_to_write]); } @@ -235,23 +230,23 @@ impl File for MemoryFile { } c.complete(total_written as i32); self.size - .set(core::cmp::max(pos + total_written, self.size.get())); + .set(core::cmp::max(pos + total_written as u64, self.size.get())); Ok(c) } fn size(&self) -> Result { tracing::debug!("size(path={}): {}", self.path, self.size.get()); - Ok(self.size.get() as u64) + Ok(self.size.get()) } } impl MemoryFile { #[allow(clippy::mut_from_ref)] - fn get_or_allocate_page(&self, page_no: usize) -> &mut MemPage { + fn get_or_allocate_page(&self, page_no: u64) -> &mut MemPage { unsafe { let pages = &mut *self.pages.get(); pages - .entry(page_no) + .entry(page_no as usize) .or_insert_with(|| Box::new([0; PAGE_SIZE])) } } diff --git a/core/io/mod.rs b/core/io/mod.rs index 992eabac0..69faeba79 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -12,10 +12,10 @@ use std::{fmt::Debug, pin::Pin}; pub trait File: Send + Sync { fn lock_file(&self, exclusive: bool) -> Result<()>; fn unlock_file(&self) -> Result<()>; - fn pread(&self, pos: usize, c: Completion) -> Result; - fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result; + fn pread(&self, pos: u64, c: Completion) -> Result; + fn pwrite(&self, pos: u64, buffer: Arc, c: Completion) -> Result; fn sync(&self, c: Completion) -> Result; - fn pwritev(&self, pos: usize, buffers: Vec>, c: Completion) -> Result { + fn pwritev(&self, pos: u64, buffers: Vec>, c: Completion) -> Result { use std::sync::atomic::{AtomicUsize, Ordering}; if buffers.is_empty() { c.complete(0); @@ -56,12 +56,12 @@ pub trait File: Send + Sync { c.abort(); return Err(e); } - pos += len; + pos += len as u64; } Ok(c) } fn size(&self) -> Result; - fn truncate(&self, len: usize, c: Completion) -> Result; + fn truncate(&self, len: u64, c: Completion) -> Result; } #[derive(Debug, Copy, Clone, PartialEq)] @@ -87,7 +87,9 @@ pub trait IO: Clock + Send + Sync { // remove_file is used in the sync-engine fn remove_file(&self, path: &str) -> Result<()>; - fn run_once(&self) -> Result<()>; + fn run_once(&self) -> Result<()> { + Ok(()) + } fn wait_for_completion(&self, c: Completion) -> Result<()> { while !c.finished() { diff --git a/core/io/unix.rs b/core/io/unix.rs index 601f20c83..1f7fbb8c7 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -15,8 +15,6 @@ use std::{io::ErrorKind, sync::Arc}; use tracing::debug; use tracing::{instrument, trace, Level}; -/// UnixIO lives longer than any of the files it creates, so it is -/// safe to store references to it's internals in the UnixFiles pub struct UnixIO {} unsafe impl Send for UnixIO {} @@ -127,24 +125,6 @@ impl IO for UnixIO { } } -// enum CompletionCallback { -// Read(Arc>, Completion, usize), -// Write( -// Arc>, -// Completion, -// Arc, -// usize, -// ), -// Writev( -// Arc>, -// Completion, -// Vec>, -// usize, // absolute file offset -// usize, // buf index -// usize, // intra-buf offset -// ), -// } - pub struct UnixFile { file: Arc>, } @@ -192,7 +172,7 @@ impl File for UnixFile { } #[instrument(err, skip_all, level = Level::TRACE)] - fn pread(&self, pos: usize, c: Completion) -> Result { + fn pread(&self, pos: u64, c: Completion) -> Result { let file = self.file.lock(); let result = unsafe { let r = c.as_read(); @@ -217,7 +197,7 @@ impl File for UnixFile { } #[instrument(err, skip_all, level = Level::TRACE)] - fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { + fn pwrite(&self, pos: u64, buffer: Arc, c: Completion) -> Result { let file = self.file.lock(); let result = unsafe { libc::pwrite( @@ -241,7 +221,7 @@ impl File for UnixFile { #[instrument(err, skip_all, level = Level::TRACE)] fn pwritev( &self, - pos: usize, + pos: u64, buffers: Vec>, c: Completion, ) -> Result { @@ -251,7 +231,7 @@ impl File for UnixFile { } let file = self.file.lock(); - match try_pwritev_raw(file.as_raw_fd(), pos as u64, &buffers, 0, 0) { + match try_pwritev_raw(file.as_raw_fd(), pos, &buffers, 0, 0) { Ok(written) => { trace!("pwritev wrote {written}"); c.complete(written as i32); @@ -268,24 +248,21 @@ impl File for UnixFile { let file = self.file.lock(); let result = unsafe { - #[cfg(not(any(target_os = "macos", target_os = "ios")))] { libc::fsync(file.as_raw_fd()) } - + #[cfg(any(target_os = "macos", target_os = "ios"))] { libc::fcntl(file.as_raw_fd(), libc::F_FULLFSYNC) } - }; - + if result == -1 { let e = std::io::Error::last_os_error(); Err(e.into()) } else { - #[cfg(not(any(target_os = "macos", target_os = "ios")))] trace!("fsync"); @@ -304,9 +281,9 @@ impl File for UnixFile { } #[instrument(err, skip_all, level = Level::INFO)] - fn truncate(&self, len: usize, c: Completion) -> Result { + fn truncate(&self, len: u64, c: Completion) -> Result { let file = self.file.lock(); - let result = file.set_len(len as u64); + let result = file.set_len(len); match result { Ok(()) => { trace!("file truncated to len=({})", len); diff --git a/core/io/vfs.rs b/core/io/vfs.rs index 1b9535e9e..a0eb7aec5 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -81,8 +81,6 @@ impl VfsMod { } } -// #Safety: -// the callback wrapper in the extension library is FnOnce, so we know /// # Safety /// the callback wrapper in the extension library is FnOnce, so we know /// that the into_raw/from_raw contract will hold @@ -121,7 +119,7 @@ impl File for VfsFileImpl { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result { + fn pread(&self, pos: u64, c: Completion) -> Result { if self.vfs.is_null() { c.complete(-1); return Err(LimboError::ExtensionError("VFS is null".to_string())); @@ -145,7 +143,7 @@ impl File for VfsFileImpl { Ok(c) } - fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { + fn pwrite(&self, pos: u64, buffer: Arc, c: Completion) -> Result { if self.vfs.is_null() { c.complete(-1); return Err(LimboError::ExtensionError("VFS is null".to_string())); @@ -192,7 +190,7 @@ impl File for VfsFileImpl { } } - fn truncate(&self, len: usize, c: Completion) -> Result { + fn truncate(&self, len: u64, c: Completion) -> Result { if self.vfs.is_null() { c.complete(-1); return Err(LimboError::ExtensionError("VFS is null".to_string())); diff --git a/core/io/windows.rs b/core/io/windows.rs new file mode 100644 index 000000000..77662bce2 --- /dev/null +++ b/core/io/windows.rs @@ -0,0 +1,115 @@ +use crate::{Clock, Completion, File, Instant, LimboError, OpenFlags, Result, IO}; +use parking_lot::RwLock; +use std::io::{Read, Seek, Write}; +use std::sync::Arc; +use tracing::{debug, instrument, trace, Level}; +pub struct WindowsIO {} + +impl WindowsIO { + pub fn new() -> Result { + debug!("Using IO backend 'syscall'"); + Ok(Self {}) + } +} + +impl IO for WindowsIO { + #[instrument(err, skip_all, level = Level::TRACE)] + fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result> { + trace!("open_file(path = {})", path); + let mut file = std::fs::File::options(); + file.read(true); + + if !flags.contains(OpenFlags::ReadOnly) { + file.write(true); + file.create(flags.contains(OpenFlags::Create)); + } + + let file = file.open(path)?; + Ok(Arc::new(WindowsFile { + file: RwLock::new(file), + })) + } + + #[instrument(err, skip_all, level = Level::TRACE)] + fn remove_file(&self, path: &str) -> Result<()> { + trace!("remove_file(path = {})", path); + Ok(std::fs::remove_file(path)?) + } + + #[instrument(err, skip_all, level = Level::TRACE)] + fn run_once(&self) -> Result<()> { + Ok(()) + } +} + +impl Clock for WindowsIO { + fn now(&self) -> Instant { + let now = chrono::Local::now(); + Instant { + secs: now.timestamp(), + micros: now.timestamp_subsec_micros(), + } + } +} + +pub struct WindowsFile { + file: RwLock, +} + +impl File for WindowsFile { + #[instrument(err, skip_all, level = Level::TRACE)] + fn lock_file(&self, exclusive: bool) -> Result<()> { + unimplemented!() + } + + #[instrument(err, skip_all, level = Level::TRACE)] + fn unlock_file(&self) -> Result<()> { + unimplemented!() + } + + #[instrument(skip(self, c), level = Level::TRACE)] + fn pread(&self, pos: u64, c: Completion) -> Result { + let mut file = self.file.write(); + file.seek(std::io::SeekFrom::Start(pos))?; + let nr = { + let r = c.as_read(); + let buf = r.buf(); + let buf = buf.as_mut_slice(); + file.read_exact(buf)?; + buf.len() as i32 + }; + c.complete(nr); + Ok(c) + } + + #[instrument(skip(self, c, buffer), level = Level::TRACE)] + fn pwrite(&self, pos: u64, buffer: Arc, c: Completion) -> Result { + let mut file = self.file.write(); + file.seek(std::io::SeekFrom::Start(pos))?; + let buf = buffer.as_slice(); + file.write_all(buf)?; + c.complete(buffer.len() as i32); + Ok(c) + } + + #[instrument(err, skip_all, level = Level::TRACE)] + fn sync(&self, c: Completion) -> Result { + let file = self.file.write(); + file.sync_all()?; + c.complete(0); + Ok(c) + } + + #[instrument(err, skip_all, level = Level::TRACE)] + fn truncate(&self, len: u64, c: Completion) -> Result { + let file = self.file.write(); + file.set_len(len)?; + c.complete(0); + Ok(c) + } + + fn size(&self) -> Result { + let file = self.file.read(); + Ok(file.metadata().unwrap().len()) + } +} diff --git a/core/storage/database.rs b/core/storage/database.rs index 9658c65bf..e1ec2ae76 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -91,7 +91,9 @@ impl DatabaseStorage for DatabaseFile { if !(512..=65536).contains(&size) || size & (size - 1) != 0 { return Err(LimboError::NotADB); } - let pos = (page_idx - 1) * size; + let Some(pos) = (page_idx as u64 - 1).checked_mul(size as u64) else { + return Err(LimboError::IntegerOverflow); + }; if let Some(ctx) = io_ctx.encryption_context() { let encryption_ctx = ctx.clone(); @@ -145,7 +147,9 @@ impl DatabaseStorage for DatabaseFile { assert!(buffer_size >= 512); assert!(buffer_size <= 65536); assert_eq!(buffer_size & (buffer_size - 1), 0); - let pos = (page_idx - 1) * buffer_size; + let Some(pos) = (page_idx as u64 - 1).checked_mul(buffer_size as u64) else { + return Err(LimboError::IntegerOverflow); + }; let buffer = { if let Some(ctx) = io_ctx.encryption_context() { encrypt_buffer(page_idx, buffer, ctx) @@ -169,7 +173,9 @@ impl DatabaseStorage for DatabaseFile { assert!(page_size <= 65536); assert_eq!(page_size & (page_size - 1), 0); - let pos = (first_page_idx - 1) * page_size; + let Some(pos) = (first_page_idx as u64 - 1).checked_mul(page_size as u64) else { + return Err(LimboError::IntegerOverflow); + }; let buffers = { if let Some(ctx) = io_ctx.encryption_context() { buffers @@ -198,7 +204,7 @@ impl DatabaseStorage for DatabaseFile { #[instrument(skip_all, level = Level::INFO)] fn truncate(&self, len: usize, c: Completion) -> Result { - let c = self.file.truncate(len, c)?; + let c = self.file.truncate(len as u64, c)?; Ok(c) } } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index ded06b0bd..0bc8d1e67 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1838,7 +1838,7 @@ pub fn read_entire_wal_dumb(file: &Arc) -> Result, io: &Arc, - offset: usize, + offset: u64, complete: Box, ) -> Result { tracing::trace!("begin_read_wal_frame_raw(offset={})", offset); @@ -1851,7 +1851,7 @@ pub fn begin_read_wal_frame_raw( pub fn begin_read_wal_frame( io: &Arc, - offset: usize, + offset: u64, buffer_pool: Arc, complete: Box, page_idx: usize, diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 927709caf..744dde86c 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1082,7 +1082,7 @@ impl Wal for WalFile { }); begin_read_wal_frame( &self.get_shared().file, - offset + WAL_FRAME_HEADER_SIZE, + offset + WAL_FRAME_HEADER_SIZE as u64, buffer_pool, complete, page_idx, @@ -1196,7 +1196,7 @@ impl Wal for WalFile { }); let c = begin_read_wal_frame( &self.get_shared().file, - offset + WAL_FRAME_HEADER_SIZE, + offset + WAL_FRAME_HEADER_SIZE as u64, buffer_pool, complete, page_id as usize, @@ -1460,14 +1460,14 @@ impl Wal for WalFile { let mut next_frame_id = self.max_frame + 1; // Build every frame in order, updating the rolling checksum for (idx, page) in pages.iter().enumerate() { - let page_id = page.get().id as u64; + let page_id = page.get().id; let plain = page.get_contents().as_ptr(); let data_to_write: std::borrow::Cow<[u8]> = { let io_ctx = self.io_ctx.borrow(); let ectx = io_ctx.encryption_context(); if let Some(ctx) = ectx.as_ref() { - Cow::Owned(ctx.encrypt_page(plain, page_id as usize)?) + Cow::Owned(ctx.encrypt_page(plain, page_id)?) } else { Cow::Borrowed(plain) } @@ -1581,11 +1581,10 @@ impl WalFile { self.get_shared().wal_header.lock().page_size } - fn frame_offset(&self, frame_id: u64) -> usize { + fn frame_offset(&self, frame_id: u64) -> u64 { assert!(frame_id > 0, "Frame ID must be 1-based"); let page_offset = (frame_id - 1) * (self.page_size() + WAL_FRAME_HEADER_SIZE as u32) as u64; - let offset = WAL_HEADER_SIZE as u64 + page_offset; - offset as usize + WAL_HEADER_SIZE as u64 + page_offset } #[allow(clippy::mut_from_ref)] @@ -2131,7 +2130,7 @@ impl WalFile { // schedule read of the page payload let c = begin_read_wal_frame( &self.get_shared().file, - offset + WAL_FRAME_HEADER_SIZE, + offset + WAL_FRAME_HEADER_SIZE as u64, self.buffer_pool.clone(), complete, page_id, @@ -2317,7 +2316,7 @@ pub mod test { let done = Rc::new(Cell::new(false)); let _done = done.clone(); let _ = file.file.truncate( - WAL_HEADER_SIZE, + WAL_HEADER_SIZE as u64, Completion::new_trunc(move |_| { let done = _done.clone(); done.set(true); diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 46bad5955..d8a325dae 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -370,7 +370,7 @@ struct SortedChunk { /// The chunk file. file: Arc, /// Offset of the start of chunk in file - start_offset: usize, + start_offset: u64, /// The size of this chunk file in bytes. chunk_size: usize, /// The read buffer. @@ -391,7 +391,7 @@ impl SortedChunk { fn new(file: Arc, start_offset: usize, buffer_size: usize) -> Self { Self { file, - start_offset, + start_offset: start_offset as u64, chunk_size: 0, buffer: Rc::new(RefCell::new(vec![0; buffer_size])), buffer_len: Rc::new(Cell::new(0)), @@ -522,7 +522,7 @@ impl SortedChunk { let c = Completion::new_read(read_buffer_ref, read_complete); let c = self .file - .pread(self.start_offset + self.total_bytes_read.get(), c)?; + .pread(self.start_offset + self.total_bytes_read.get() as u64, c)?; Ok(c) } diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index 0d644dc28..440323e74 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -150,7 +150,7 @@ impl File for SimulatorFile { self.inner.unlock_file() } - fn pread(&self, pos: usize, c: turso_core::Completion) -> Result { + fn pread(&self, pos: u64, c: turso_core::Completion) -> Result { self.nr_pread_calls.set(self.nr_pread_calls.get() + 1); if self.fault.get() { tracing::debug!("pread fault"); @@ -173,7 +173,7 @@ impl File for SimulatorFile { fn pwrite( &self, - pos: usize, + pos: u64, buffer: Arc, c: turso_core::Completion, ) -> Result { @@ -227,7 +227,7 @@ impl File for SimulatorFile { fn pwritev( &self, - pos: usize, + pos: u64, buffers: Vec>, c: turso_core::Completion, ) -> Result { @@ -257,7 +257,7 @@ impl File for SimulatorFile { self.inner.size() } - fn truncate(&self, len: usize, c: turso_core::Completion) -> Result { + fn truncate(&self, len: u64, c: turso_core::Completion) -> Result { if self.fault.get() { return Err(turso_core::LimboError::InternalError( FAULT_ERROR_MSG.into(), diff --git a/sync/engine/src/database_sync_operations.rs b/sync/engine/src/database_sync_operations.rs index 78691c405..5157071a0 100644 --- a/sync/engine/src/database_sync_operations.rs +++ b/sync/engine/src/database_sync_operations.rs @@ -81,7 +81,7 @@ pub async fn db_bootstrap( while !c.is_completed() { coro.yield_(ProtocolCommand::IO).await?; } - pos += content_len; + pos += content_len as u64; } if content.is_done()? { break; @@ -123,7 +123,7 @@ pub async fn wal_apply_from_file( // todo(sivukhin): we need to error out in case of partial read assert!(size as usize == WAL_FRAME_SIZE); }); - let c = frames_file.pread(offset as usize, c)?; + let c = frames_file.pread(offset, c)?; while !c.is_completed() { coro.yield_(ProtocolCommand::IO).await?; } @@ -243,7 +243,7 @@ pub async fn wal_pull_to_file_v1( while !c.is_completed() { coro.yield_(ProtocolCommand::IO).await?; } - offset += WAL_FRAME_SIZE; + offset += WAL_FRAME_SIZE as u64; } let c = Completion::new_sync(move |_| { @@ -323,7 +323,7 @@ pub async fn wal_pull_to_file_legacy( coro.yield_(ProtocolCommand::IO).await?; } - last_offset += WAL_FRAME_SIZE; + last_offset += WAL_FRAME_SIZE as u64; buffer_len = 0; start_frame += 1; @@ -974,7 +974,7 @@ pub async fn bootstrap_db_file_v1( }; assert!(rc as usize == 0); }); - let c = file.truncate(header.db_size as usize * PAGE_SIZE, c)?; + let c = file.truncate(header.db_size * PAGE_SIZE as u64, c)?; while !c.is_completed() { coro.yield_(ProtocolCommand::IO).await?; } @@ -984,7 +984,7 @@ pub async fn bootstrap_db_file_v1( while let Some(page_data) = wait_proto_message::(coro, &completion, &mut bytes).await? { - let offset = page_data.page_id as usize * PAGE_SIZE; + let offset = page_data.page_id * PAGE_SIZE as u64; let page = decode_page(&header, page_data)?; if page.len() != PAGE_SIZE { return Err(Error::DatabaseSyncEngineError(format!( @@ -1074,7 +1074,7 @@ pub async fn reset_wal_file( // let's truncate WAL file completely in order for this operation to safely execute on empty WAL in case of initial bootstrap phase 0 } else { - WAL_HEADER + WAL_FRAME_SIZE * (frames_count as usize) + WAL_HEADER as u64 + WAL_FRAME_SIZE as u64 * frames_count }; tracing::debug!("reset db wal to the size of {} frames", frames_count); let c = Completion::new_trunc(move |result| { diff --git a/sync/engine/src/io_operations.rs b/sync/engine/src/io_operations.rs index 777687e18..978e5ae34 100644 --- a/sync/engine/src/io_operations.rs +++ b/sync/engine/src/io_operations.rs @@ -59,7 +59,7 @@ impl IoOperations for Arc { }; tracing::debug!("file truncated: rc={}", rc); }); - let c = file.truncate(len, c)?; + let c = file.truncate(len as u64, c)?; while !c.is_completed() { coro.yield_(ProtocolCommand::IO).await?; } diff --git a/tests/integration/query_processing/test_btree.rs b/tests/integration/query_processing/test_btree.rs index 0d174e50c..1355d81e8 100644 --- a/tests/integration/query_processing/test_btree.rs +++ b/tests/integration/query_processing/test_btree.rs @@ -434,7 +434,7 @@ fn write_at(io: &impl IO, file: Arc, offset: usize, data: &[u8]) { // reference the buffer to keep alive for async io let _buf = _buf.clone(); }); - let result = file.pwrite(offset, buffer, completion).unwrap(); + let result = file.pwrite(offset as u64, buffer, completion).unwrap(); while !result.is_completed() { io.run_once().unwrap(); }