mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-23 09:54:26 +01:00
Merge 'Use u64 for file offsets in I/O and calculate such offsets in u64' from Preston Thorpe
Using `usize` to compute file offsets caps us at ~16GB on 32-bit systems. For example, with 4 KiB pages we can only address up to 1048576 pages; attempting the next page overflows a 32-bit usize and can wrap the write offset, corrupting data. Switching our I/O APIs and offset math to u64 avoids this overflow on 32-bit targets Closes #2791
This commit is contained in:
@@ -578,7 +578,7 @@ impl turso_core::DatabaseStorage for DatabaseFile {
|
|||||||
if !(512..=65536).contains(&size) || size & (size - 1) != 0 {
|
if !(512..=65536).contains(&size) || size & (size - 1) != 0 {
|
||||||
return Err(turso_core::LimboError::NotADB);
|
return Err(turso_core::LimboError::NotADB);
|
||||||
}
|
}
|
||||||
let pos = (page_idx - 1) * size;
|
let pos = (page_idx as u64 - 1) * size as u64;
|
||||||
self.file.pread(pos, c)
|
self.file.pread(pos, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -590,7 +590,7 @@ impl turso_core::DatabaseStorage for DatabaseFile {
|
|||||||
c: turso_core::Completion,
|
c: turso_core::Completion,
|
||||||
) -> turso_core::Result<turso_core::Completion> {
|
) -> turso_core::Result<turso_core::Completion> {
|
||||||
let size = buffer.len();
|
let size = buffer.len();
|
||||||
let pos = (page_idx - 1) * size;
|
let pos = (page_idx as u64 - 1) * size as u64;
|
||||||
self.file.pwrite(pos, buffer, c)
|
self.file.pwrite(pos, buffer, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -602,7 +602,7 @@ impl turso_core::DatabaseStorage for DatabaseFile {
|
|||||||
_io_ctx: &turso_core::IOContext,
|
_io_ctx: &turso_core::IOContext,
|
||||||
c: turso_core::Completion,
|
c: turso_core::Completion,
|
||||||
) -> turso_core::Result<turso_core::Completion> {
|
) -> turso_core::Result<turso_core::Completion> {
|
||||||
let pos = first_page_idx.saturating_sub(1) * page_size;
|
let pos = first_page_idx.saturating_sub(1) as u64 * page_size as u64;
|
||||||
let c = self.file.pwritev(pos, buffers, c)?;
|
let c = self.file.pwritev(pos, buffers, c)?;
|
||||||
Ok(c)
|
Ok(c)
|
||||||
}
|
}
|
||||||
@@ -620,7 +620,7 @@ impl turso_core::DatabaseStorage for DatabaseFile {
|
|||||||
len: usize,
|
len: usize,
|
||||||
c: turso_core::Completion,
|
c: turso_core::Completion,
|
||||||
) -> turso_core::Result<turso_core::Completion> {
|
) -> turso_core::Result<turso_core::Completion> {
|
||||||
let c = self.file.truncate(len, c)?;
|
let c = self.file.truncate(len as u64, c)?;
|
||||||
Ok(c)
|
Ok(c)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -68,9 +68,9 @@ impl File for GenericFile {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(skip(self, c), level = Level::TRACE)]
|
#[instrument(skip(self, c), level = Level::TRACE)]
|
||||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
|
||||||
let mut file = self.file.write();
|
let mut file = self.file.write();
|
||||||
file.seek(std::io::SeekFrom::Start(pos as u64))?;
|
file.seek(std::io::SeekFrom::Start(pos))?;
|
||||||
let nr = {
|
let nr = {
|
||||||
let r = c.as_read();
|
let r = c.as_read();
|
||||||
let buf = r.buf();
|
let buf = r.buf();
|
||||||
@@ -83,9 +83,9 @@ impl File for GenericFile {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(skip(self, c, buffer), level = Level::TRACE)]
|
#[instrument(skip(self, c, buffer), level = Level::TRACE)]
|
||||||
fn pwrite(&self, pos: usize, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
|
fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
|
||||||
let mut file = self.file.write();
|
let mut file = self.file.write();
|
||||||
file.seek(std::io::SeekFrom::Start(pos as u64))?;
|
file.seek(std::io::SeekFrom::Start(pos))?;
|
||||||
let buf = buffer.as_slice();
|
let buf = buffer.as_slice();
|
||||||
file.write_all(buf)?;
|
file.write_all(buf)?;
|
||||||
c.complete(buffer.len() as i32);
|
c.complete(buffer.len() as i32);
|
||||||
@@ -101,9 +101,9 @@ impl File for GenericFile {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||||
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
|
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
|
||||||
let file = self.file.write();
|
let file = self.file.write();
|
||||||
file.set_len(len as u64)?;
|
file.set_len(len)?;
|
||||||
c.complete(0);
|
c.complete(0);
|
||||||
Ok(c)
|
Ok(c)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -182,7 +182,7 @@ struct WritevState {
|
|||||||
/// File descriptor/id of the file we are writing to
|
/// File descriptor/id of the file we are writing to
|
||||||
file_id: Fd,
|
file_id: Fd,
|
||||||
/// absolute file offset for next submit
|
/// absolute file offset for next submit
|
||||||
file_pos: usize,
|
file_pos: u64,
|
||||||
/// current buffer index in `bufs`
|
/// current buffer index in `bufs`
|
||||||
current_buffer_idx: usize,
|
current_buffer_idx: usize,
|
||||||
/// intra-buffer offset
|
/// intra-buffer offset
|
||||||
@@ -198,7 +198,7 @@ struct WritevState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl WritevState {
|
impl WritevState {
|
||||||
fn new(file: &UringFile, pos: usize, bufs: Vec<Arc<crate::Buffer>>) -> Self {
|
fn new(file: &UringFile, pos: u64, bufs: Vec<Arc<crate::Buffer>>) -> Self {
|
||||||
let file_id = file
|
let file_id = file
|
||||||
.id()
|
.id()
|
||||||
.map(Fd::Fixed)
|
.map(Fd::Fixed)
|
||||||
@@ -223,23 +223,23 @@ impl WritevState {
|
|||||||
|
|
||||||
/// Advance (idx, off, pos) after written bytes
|
/// Advance (idx, off, pos) after written bytes
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn advance(&mut self, written: usize) {
|
fn advance(&mut self, written: u64) {
|
||||||
let mut remaining = written;
|
let mut remaining = written;
|
||||||
while remaining > 0 {
|
while remaining > 0 {
|
||||||
let current_buf_len = self.bufs[self.current_buffer_idx].len();
|
let current_buf_len = self.bufs[self.current_buffer_idx].len();
|
||||||
let left = current_buf_len - self.current_buffer_offset;
|
let left = current_buf_len - self.current_buffer_offset;
|
||||||
if remaining < left {
|
if remaining < left as u64 {
|
||||||
self.current_buffer_offset += remaining;
|
self.current_buffer_offset += remaining as usize;
|
||||||
self.file_pos += remaining;
|
self.file_pos += remaining;
|
||||||
remaining = 0;
|
remaining = 0;
|
||||||
} else {
|
} else {
|
||||||
remaining -= left;
|
remaining -= left as u64;
|
||||||
self.file_pos += left;
|
self.file_pos += left as u64;
|
||||||
self.current_buffer_idx += 1;
|
self.current_buffer_idx += 1;
|
||||||
self.current_buffer_offset = 0;
|
self.current_buffer_offset = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.total_written += written;
|
self.total_written += written as usize;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
@@ -400,7 +400,7 @@ impl WrappedIOUring {
|
|||||||
iov_allocation[0].iov_len as u32,
|
iov_allocation[0].iov_len as u32,
|
||||||
id as u16,
|
id as u16,
|
||||||
)
|
)
|
||||||
.offset(st.file_pos as u64)
|
.offset(st.file_pos)
|
||||||
.build()
|
.build()
|
||||||
.user_data(key)
|
.user_data(key)
|
||||||
} else {
|
} else {
|
||||||
@@ -409,7 +409,7 @@ impl WrappedIOUring {
|
|||||||
iov_allocation[0].iov_base as *const u8,
|
iov_allocation[0].iov_base as *const u8,
|
||||||
iov_allocation[0].iov_len as u32,
|
iov_allocation[0].iov_len as u32,
|
||||||
)
|
)
|
||||||
.offset(st.file_pos as u64)
|
.offset(st.file_pos)
|
||||||
.build()
|
.build()
|
||||||
.user_data(key)
|
.user_data(key)
|
||||||
}
|
}
|
||||||
@@ -425,7 +425,7 @@ impl WrappedIOUring {
|
|||||||
|
|
||||||
let entry = with_fd!(st.file_id, |fd| {
|
let entry = with_fd!(st.file_id, |fd| {
|
||||||
io_uring::opcode::Writev::new(fd, ptr, iov_count as u32)
|
io_uring::opcode::Writev::new(fd, ptr, iov_count as u32)
|
||||||
.offset(st.file_pos as u64)
|
.offset(st.file_pos)
|
||||||
.build()
|
.build()
|
||||||
.user_data(key)
|
.user_data(key)
|
||||||
});
|
});
|
||||||
@@ -443,8 +443,8 @@ impl WrappedIOUring {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let written = result as usize;
|
let written = result;
|
||||||
state.advance(written);
|
state.advance(written as u64);
|
||||||
match state.remaining() {
|
match state.remaining() {
|
||||||
0 => {
|
0 => {
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
@@ -643,7 +643,7 @@ impl File for UringFile {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
|
||||||
let r = c.as_read();
|
let r = c.as_read();
|
||||||
let mut io = self.io.borrow_mut();
|
let mut io = self.io.borrow_mut();
|
||||||
let read_e = {
|
let read_e = {
|
||||||
@@ -663,14 +663,14 @@ impl File for UringFile {
|
|||||||
io.debug_check_fixed(idx, ptr, len);
|
io.debug_check_fixed(idx, ptr, len);
|
||||||
}
|
}
|
||||||
io_uring::opcode::ReadFixed::new(fd, ptr, len as u32, idx as u16)
|
io_uring::opcode::ReadFixed::new(fd, ptr, len as u32, idx as u16)
|
||||||
.offset(pos as u64)
|
.offset(pos)
|
||||||
.build()
|
.build()
|
||||||
.user_data(get_key(c.clone()))
|
.user_data(get_key(c.clone()))
|
||||||
} else {
|
} else {
|
||||||
trace!("pread(pos = {}, length = {})", pos, len);
|
trace!("pread(pos = {}, length = {})", pos, len);
|
||||||
// Use Read opcode if fixed buffer is not available
|
// Use Read opcode if fixed buffer is not available
|
||||||
io_uring::opcode::Read::new(fd, buf.as_mut_ptr(), len as u32)
|
io_uring::opcode::Read::new(fd, buf.as_mut_ptr(), len as u32)
|
||||||
.offset(pos as u64)
|
.offset(pos)
|
||||||
.build()
|
.build()
|
||||||
.user_data(get_key(c.clone()))
|
.user_data(get_key(c.clone()))
|
||||||
}
|
}
|
||||||
@@ -680,7 +680,7 @@ impl File for UringFile {
|
|||||||
Ok(c)
|
Ok(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pwrite(&self, pos: usize, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
|
fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
|
||||||
let mut io = self.io.borrow_mut();
|
let mut io = self.io.borrow_mut();
|
||||||
let write = {
|
let write = {
|
||||||
let ptr = buffer.as_ptr();
|
let ptr = buffer.as_ptr();
|
||||||
@@ -698,13 +698,13 @@ impl File for UringFile {
|
|||||||
io.debug_check_fixed(idx, ptr, len);
|
io.debug_check_fixed(idx, ptr, len);
|
||||||
}
|
}
|
||||||
io_uring::opcode::WriteFixed::new(fd, ptr, len as u32, idx as u16)
|
io_uring::opcode::WriteFixed::new(fd, ptr, len as u32, idx as u16)
|
||||||
.offset(pos as u64)
|
.offset(pos)
|
||||||
.build()
|
.build()
|
||||||
.user_data(get_key(c.clone()))
|
.user_data(get_key(c.clone()))
|
||||||
} else {
|
} else {
|
||||||
trace!("pwrite(pos = {}, length = {})", pos, buffer.len());
|
trace!("pwrite(pos = {}, length = {})", pos, buffer.len());
|
||||||
io_uring::opcode::Write::new(fd, ptr, len as u32)
|
io_uring::opcode::Write::new(fd, ptr, len as u32)
|
||||||
.offset(pos as u64)
|
.offset(pos)
|
||||||
.build()
|
.build()
|
||||||
.user_data(get_key(c.clone()))
|
.user_data(get_key(c.clone()))
|
||||||
}
|
}
|
||||||
@@ -728,7 +728,7 @@ impl File for UringFile {
|
|||||||
|
|
||||||
fn pwritev(
|
fn pwritev(
|
||||||
&self,
|
&self,
|
||||||
pos: usize,
|
pos: u64,
|
||||||
bufs: Vec<Arc<crate::Buffer>>,
|
bufs: Vec<Arc<crate::Buffer>>,
|
||||||
c: Completion,
|
c: Completion,
|
||||||
) -> Result<Completion> {
|
) -> Result<Completion> {
|
||||||
@@ -748,10 +748,10 @@ impl File for UringFile {
|
|||||||
Ok(self.file.metadata()?.len())
|
Ok(self.file.metadata()?.len())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
|
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
|
||||||
let mut io = self.io.borrow_mut();
|
let mut io = self.io.borrow_mut();
|
||||||
let truncate = with_fd!(self, |fd| {
|
let truncate = with_fd!(self, |fd| {
|
||||||
io_uring::opcode::Ftruncate::new(fd, len as u64)
|
io_uring::opcode::Ftruncate::new(fd, len)
|
||||||
.build()
|
.build()
|
||||||
.user_data(get_key(c.clone()))
|
.user_data(get_key(c.clone()))
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -69,17 +69,12 @@ impl IO for MemoryIO {
|
|||||||
files.remove(path);
|
files.remove(path);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_once(&self) -> Result<()> {
|
|
||||||
// nop
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct MemoryFile {
|
pub struct MemoryFile {
|
||||||
path: String,
|
path: String,
|
||||||
pages: UnsafeCell<BTreeMap<usize, MemPage>>,
|
pages: UnsafeCell<BTreeMap<usize, MemPage>>,
|
||||||
size: Cell<usize>,
|
size: Cell<u64>,
|
||||||
}
|
}
|
||||||
unsafe impl Send for MemoryFile {}
|
unsafe impl Send for MemoryFile {}
|
||||||
unsafe impl Sync for MemoryFile {}
|
unsafe impl Sync for MemoryFile {}
|
||||||
@@ -92,10 +87,10 @@ impl File for MemoryFile {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
|
||||||
tracing::debug!("pread(path={}): pos={}", self.path, pos);
|
tracing::debug!("pread(path={}): pos={}", self.path, pos);
|
||||||
let r = c.as_read();
|
let r = c.as_read();
|
||||||
let buf_len = r.buf().len();
|
let buf_len = r.buf().len() as u64;
|
||||||
if buf_len == 0 {
|
if buf_len == 0 {
|
||||||
c.complete(0);
|
c.complete(0);
|
||||||
return Ok(c);
|
return Ok(c);
|
||||||
@@ -110,8 +105,8 @@ impl File for MemoryFile {
|
|||||||
let read_len = buf_len.min(file_size - pos);
|
let read_len = buf_len.min(file_size - pos);
|
||||||
{
|
{
|
||||||
let read_buf = r.buf();
|
let read_buf = r.buf();
|
||||||
let mut offset = pos;
|
let mut offset = pos as usize;
|
||||||
let mut remaining = read_len;
|
let mut remaining = read_len as usize;
|
||||||
let mut buf_offset = 0;
|
let mut buf_offset = 0;
|
||||||
|
|
||||||
while remaining > 0 {
|
while remaining > 0 {
|
||||||
@@ -134,7 +129,7 @@ impl File for MemoryFile {
|
|||||||
Ok(c)
|
Ok(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pwrite(&self, pos: usize, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
|
fn pwrite(&self, pos: u64, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
"pwrite(path={}): pos={}, size={}",
|
"pwrite(path={}): pos={}, size={}",
|
||||||
self.path,
|
self.path,
|
||||||
@@ -147,7 +142,7 @@ impl File for MemoryFile {
|
|||||||
return Ok(c);
|
return Ok(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut offset = pos;
|
let mut offset = pos as usize;
|
||||||
let mut remaining = buf_len;
|
let mut remaining = buf_len;
|
||||||
let mut buf_offset = 0;
|
let mut buf_offset = 0;
|
||||||
let data = &buffer.as_slice();
|
let data = &buffer.as_slice();
|
||||||
@@ -158,7 +153,7 @@ impl File for MemoryFile {
|
|||||||
let bytes_to_write = remaining.min(PAGE_SIZE - page_offset);
|
let bytes_to_write = remaining.min(PAGE_SIZE - page_offset);
|
||||||
|
|
||||||
{
|
{
|
||||||
let page = self.get_or_allocate_page(page_no);
|
let page = self.get_or_allocate_page(page_no as u64);
|
||||||
page[page_offset..page_offset + bytes_to_write]
|
page[page_offset..page_offset + bytes_to_write]
|
||||||
.copy_from_slice(&data[buf_offset..buf_offset + bytes_to_write]);
|
.copy_from_slice(&data[buf_offset..buf_offset + bytes_to_write]);
|
||||||
}
|
}
|
||||||
@@ -169,7 +164,7 @@ impl File for MemoryFile {
|
|||||||
}
|
}
|
||||||
|
|
||||||
self.size
|
self.size
|
||||||
.set(core::cmp::max(pos + buf_len, self.size.get()));
|
.set(core::cmp::max(pos + buf_len as u64, self.size.get()));
|
||||||
|
|
||||||
c.complete(buf_len as i32);
|
c.complete(buf_len as i32);
|
||||||
Ok(c)
|
Ok(c)
|
||||||
@@ -182,13 +177,13 @@ impl File for MemoryFile {
|
|||||||
Ok(c)
|
Ok(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
|
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
|
||||||
tracing::debug!("truncate(path={}): len={}", self.path, len);
|
tracing::debug!("truncate(path={}): len={}", self.path, len);
|
||||||
if len < self.size.get() {
|
if len < self.size.get() {
|
||||||
// Truncate pages
|
// Truncate pages
|
||||||
unsafe {
|
unsafe {
|
||||||
let pages = &mut *self.pages.get();
|
let pages = &mut *self.pages.get();
|
||||||
pages.retain(|&k, _| k * PAGE_SIZE < len);
|
pages.retain(|&k, _| k * PAGE_SIZE < len as usize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.size.set(len);
|
self.size.set(len);
|
||||||
@@ -196,14 +191,14 @@ impl File for MemoryFile {
|
|||||||
Ok(c)
|
Ok(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pwritev(&self, pos: usize, buffers: Vec<Arc<Buffer>>, c: Completion) -> Result<Completion> {
|
fn pwritev(&self, pos: u64, buffers: Vec<Arc<Buffer>>, c: Completion) -> Result<Completion> {
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
"pwritev(path={}): pos={}, buffers={:?}",
|
"pwritev(path={}): pos={}, buffers={:?}",
|
||||||
self.path,
|
self.path,
|
||||||
pos,
|
pos,
|
||||||
buffers.iter().map(|x| x.len()).collect::<Vec<_>>()
|
buffers.iter().map(|x| x.len()).collect::<Vec<_>>()
|
||||||
);
|
);
|
||||||
let mut offset = pos;
|
let mut offset = pos as usize;
|
||||||
let mut total_written = 0;
|
let mut total_written = 0;
|
||||||
|
|
||||||
for buffer in buffers {
|
for buffer in buffers {
|
||||||
@@ -222,7 +217,7 @@ impl File for MemoryFile {
|
|||||||
let bytes_to_write = remaining.min(PAGE_SIZE - page_offset);
|
let bytes_to_write = remaining.min(PAGE_SIZE - page_offset);
|
||||||
|
|
||||||
{
|
{
|
||||||
let page = self.get_or_allocate_page(page_no);
|
let page = self.get_or_allocate_page(page_no as u64);
|
||||||
page[page_offset..page_offset + bytes_to_write]
|
page[page_offset..page_offset + bytes_to_write]
|
||||||
.copy_from_slice(&data[buf_offset..buf_offset + bytes_to_write]);
|
.copy_from_slice(&data[buf_offset..buf_offset + bytes_to_write]);
|
||||||
}
|
}
|
||||||
@@ -235,23 +230,23 @@ impl File for MemoryFile {
|
|||||||
}
|
}
|
||||||
c.complete(total_written as i32);
|
c.complete(total_written as i32);
|
||||||
self.size
|
self.size
|
||||||
.set(core::cmp::max(pos + total_written, self.size.get()));
|
.set(core::cmp::max(pos + total_written as u64, self.size.get()));
|
||||||
Ok(c)
|
Ok(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn size(&self) -> Result<u64> {
|
fn size(&self) -> Result<u64> {
|
||||||
tracing::debug!("size(path={}): {}", self.path, self.size.get());
|
tracing::debug!("size(path={}): {}", self.path, self.size.get());
|
||||||
Ok(self.size.get() as u64)
|
Ok(self.size.get())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MemoryFile {
|
impl MemoryFile {
|
||||||
#[allow(clippy::mut_from_ref)]
|
#[allow(clippy::mut_from_ref)]
|
||||||
fn get_or_allocate_page(&self, page_no: usize) -> &mut MemPage {
|
fn get_or_allocate_page(&self, page_no: u64) -> &mut MemPage {
|
||||||
unsafe {
|
unsafe {
|
||||||
let pages = &mut *self.pages.get();
|
let pages = &mut *self.pages.get();
|
||||||
pages
|
pages
|
||||||
.entry(page_no)
|
.entry(page_no as usize)
|
||||||
.or_insert_with(|| Box::new([0; PAGE_SIZE]))
|
.or_insert_with(|| Box::new([0; PAGE_SIZE]))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,10 +12,10 @@ use std::{fmt::Debug, pin::Pin};
|
|||||||
pub trait File: Send + Sync {
|
pub trait File: Send + Sync {
|
||||||
fn lock_file(&self, exclusive: bool) -> Result<()>;
|
fn lock_file(&self, exclusive: bool) -> Result<()>;
|
||||||
fn unlock_file(&self) -> Result<()>;
|
fn unlock_file(&self) -> Result<()>;
|
||||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion>;
|
fn pread(&self, pos: u64, c: Completion) -> Result<Completion>;
|
||||||
fn pwrite(&self, pos: usize, buffer: Arc<Buffer>, c: Completion) -> Result<Completion>;
|
fn pwrite(&self, pos: u64, buffer: Arc<Buffer>, c: Completion) -> Result<Completion>;
|
||||||
fn sync(&self, c: Completion) -> Result<Completion>;
|
fn sync(&self, c: Completion) -> Result<Completion>;
|
||||||
fn pwritev(&self, pos: usize, buffers: Vec<Arc<Buffer>>, c: Completion) -> Result<Completion> {
|
fn pwritev(&self, pos: u64, buffers: Vec<Arc<Buffer>>, c: Completion) -> Result<Completion> {
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
if buffers.is_empty() {
|
if buffers.is_empty() {
|
||||||
c.complete(0);
|
c.complete(0);
|
||||||
@@ -56,12 +56,12 @@ pub trait File: Send + Sync {
|
|||||||
c.abort();
|
c.abort();
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
pos += len;
|
pos += len as u64;
|
||||||
}
|
}
|
||||||
Ok(c)
|
Ok(c)
|
||||||
}
|
}
|
||||||
fn size(&self) -> Result<u64>;
|
fn size(&self) -> Result<u64>;
|
||||||
fn truncate(&self, len: usize, c: Completion) -> Result<Completion>;
|
fn truncate(&self, len: u64, c: Completion) -> Result<Completion>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Copy, Clone, PartialEq)]
|
#[derive(Debug, Copy, Clone, PartialEq)]
|
||||||
@@ -87,7 +87,9 @@ pub trait IO: Clock + Send + Sync {
|
|||||||
// remove_file is used in the sync-engine
|
// remove_file is used in the sync-engine
|
||||||
fn remove_file(&self, path: &str) -> Result<()>;
|
fn remove_file(&self, path: &str) -> Result<()>;
|
||||||
|
|
||||||
fn run_once(&self) -> Result<()>;
|
fn run_once(&self) -> Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn wait_for_completion(&self, c: Completion) -> Result<()> {
|
fn wait_for_completion(&self, c: Completion) -> Result<()> {
|
||||||
while !c.finished() {
|
while !c.finished() {
|
||||||
|
|||||||
@@ -15,8 +15,6 @@ use std::{io::ErrorKind, sync::Arc};
|
|||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
use tracing::{instrument, trace, Level};
|
use tracing::{instrument, trace, Level};
|
||||||
|
|
||||||
/// UnixIO lives longer than any of the files it creates, so it is
|
|
||||||
/// safe to store references to it's internals in the UnixFiles
|
|
||||||
pub struct UnixIO {}
|
pub struct UnixIO {}
|
||||||
|
|
||||||
unsafe impl Send for UnixIO {}
|
unsafe impl Send for UnixIO {}
|
||||||
@@ -127,24 +125,6 @@ impl IO for UnixIO {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// enum CompletionCallback {
|
|
||||||
// Read(Arc<Mutex<std::fs::File>>, Completion, usize),
|
|
||||||
// Write(
|
|
||||||
// Arc<Mutex<std::fs::File>>,
|
|
||||||
// Completion,
|
|
||||||
// Arc<crate::Buffer>,
|
|
||||||
// usize,
|
|
||||||
// ),
|
|
||||||
// Writev(
|
|
||||||
// Arc<Mutex<std::fs::File>>,
|
|
||||||
// Completion,
|
|
||||||
// Vec<Arc<crate::Buffer>>,
|
|
||||||
// usize, // absolute file offset
|
|
||||||
// usize, // buf index
|
|
||||||
// usize, // intra-buf offset
|
|
||||||
// ),
|
|
||||||
// }
|
|
||||||
|
|
||||||
pub struct UnixFile {
|
pub struct UnixFile {
|
||||||
file: Arc<Mutex<std::fs::File>>,
|
file: Arc<Mutex<std::fs::File>>,
|
||||||
}
|
}
|
||||||
@@ -192,7 +172,7 @@ impl File for UnixFile {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
|
||||||
let file = self.file.lock();
|
let file = self.file.lock();
|
||||||
let result = unsafe {
|
let result = unsafe {
|
||||||
let r = c.as_read();
|
let r = c.as_read();
|
||||||
@@ -217,7 +197,7 @@ impl File for UnixFile {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||||
fn pwrite(&self, pos: usize, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
|
fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
|
||||||
let file = self.file.lock();
|
let file = self.file.lock();
|
||||||
let result = unsafe {
|
let result = unsafe {
|
||||||
libc::pwrite(
|
libc::pwrite(
|
||||||
@@ -241,7 +221,7 @@ impl File for UnixFile {
|
|||||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||||
fn pwritev(
|
fn pwritev(
|
||||||
&self,
|
&self,
|
||||||
pos: usize,
|
pos: u64,
|
||||||
buffers: Vec<Arc<crate::Buffer>>,
|
buffers: Vec<Arc<crate::Buffer>>,
|
||||||
c: Completion,
|
c: Completion,
|
||||||
) -> Result<Completion> {
|
) -> Result<Completion> {
|
||||||
@@ -251,7 +231,7 @@ impl File for UnixFile {
|
|||||||
}
|
}
|
||||||
let file = self.file.lock();
|
let file = self.file.lock();
|
||||||
|
|
||||||
match try_pwritev_raw(file.as_raw_fd(), pos as u64, &buffers, 0, 0) {
|
match try_pwritev_raw(file.as_raw_fd(), pos, &buffers, 0, 0) {
|
||||||
Ok(written) => {
|
Ok(written) => {
|
||||||
trace!("pwritev wrote {written}");
|
trace!("pwritev wrote {written}");
|
||||||
c.complete(written as i32);
|
c.complete(written as i32);
|
||||||
@@ -268,24 +248,21 @@ impl File for UnixFile {
|
|||||||
let file = self.file.lock();
|
let file = self.file.lock();
|
||||||
|
|
||||||
let result = unsafe {
|
let result = unsafe {
|
||||||
|
|
||||||
#[cfg(not(any(target_os = "macos", target_os = "ios")))]
|
#[cfg(not(any(target_os = "macos", target_os = "ios")))]
|
||||||
{
|
{
|
||||||
libc::fsync(file.as_raw_fd())
|
libc::fsync(file.as_raw_fd())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(any(target_os = "macos", target_os = "ios"))]
|
#[cfg(any(target_os = "macos", target_os = "ios"))]
|
||||||
{
|
{
|
||||||
libc::fcntl(file.as_raw_fd(), libc::F_FULLFSYNC)
|
libc::fcntl(file.as_raw_fd(), libc::F_FULLFSYNC)
|
||||||
}
|
}
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if result == -1 {
|
if result == -1 {
|
||||||
let e = std::io::Error::last_os_error();
|
let e = std::io::Error::last_os_error();
|
||||||
Err(e.into())
|
Err(e.into())
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
#[cfg(not(any(target_os = "macos", target_os = "ios")))]
|
#[cfg(not(any(target_os = "macos", target_os = "ios")))]
|
||||||
trace!("fsync");
|
trace!("fsync");
|
||||||
|
|
||||||
@@ -304,9 +281,9 @@ impl File for UnixFile {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(err, skip_all, level = Level::INFO)]
|
#[instrument(err, skip_all, level = Level::INFO)]
|
||||||
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
|
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
|
||||||
let file = self.file.lock();
|
let file = self.file.lock();
|
||||||
let result = file.set_len(len as u64);
|
let result = file.set_len(len);
|
||||||
match result {
|
match result {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
trace!("file truncated to len=({})", len);
|
trace!("file truncated to len=({})", len);
|
||||||
|
|||||||
@@ -81,8 +81,6 @@ impl VfsMod {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// #Safety:
|
|
||||||
// the callback wrapper in the extension library is FnOnce, so we know
|
|
||||||
/// # Safety
|
/// # Safety
|
||||||
/// the callback wrapper in the extension library is FnOnce, so we know
|
/// the callback wrapper in the extension library is FnOnce, so we know
|
||||||
/// that the into_raw/from_raw contract will hold
|
/// that the into_raw/from_raw contract will hold
|
||||||
@@ -121,7 +119,7 @@ impl File for VfsFileImpl {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
|
||||||
if self.vfs.is_null() {
|
if self.vfs.is_null() {
|
||||||
c.complete(-1);
|
c.complete(-1);
|
||||||
return Err(LimboError::ExtensionError("VFS is null".to_string()));
|
return Err(LimboError::ExtensionError("VFS is null".to_string()));
|
||||||
@@ -145,7 +143,7 @@ impl File for VfsFileImpl {
|
|||||||
Ok(c)
|
Ok(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pwrite(&self, pos: usize, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
|
fn pwrite(&self, pos: u64, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
|
||||||
if self.vfs.is_null() {
|
if self.vfs.is_null() {
|
||||||
c.complete(-1);
|
c.complete(-1);
|
||||||
return Err(LimboError::ExtensionError("VFS is null".to_string()));
|
return Err(LimboError::ExtensionError("VFS is null".to_string()));
|
||||||
@@ -192,7 +190,7 @@ impl File for VfsFileImpl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
|
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
|
||||||
if self.vfs.is_null() {
|
if self.vfs.is_null() {
|
||||||
c.complete(-1);
|
c.complete(-1);
|
||||||
return Err(LimboError::ExtensionError("VFS is null".to_string()));
|
return Err(LimboError::ExtensionError("VFS is null".to_string()));
|
||||||
|
|||||||
115
core/io/windows.rs
Normal file
115
core/io/windows.rs
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
use crate::{Clock, Completion, File, Instant, LimboError, OpenFlags, Result, IO};
|
||||||
|
use parking_lot::RwLock;
|
||||||
|
use std::io::{Read, Seek, Write};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tracing::{debug, instrument, trace, Level};
|
||||||
|
pub struct WindowsIO {}
|
||||||
|
|
||||||
|
impl WindowsIO {
|
||||||
|
pub fn new() -> Result<Self> {
|
||||||
|
debug!("Using IO backend 'syscall'");
|
||||||
|
Ok(Self {})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IO for WindowsIO {
|
||||||
|
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||||
|
fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result<Arc<dyn File>> {
|
||||||
|
trace!("open_file(path = {})", path);
|
||||||
|
let mut file = std::fs::File::options();
|
||||||
|
file.read(true);
|
||||||
|
|
||||||
|
if !flags.contains(OpenFlags::ReadOnly) {
|
||||||
|
file.write(true);
|
||||||
|
file.create(flags.contains(OpenFlags::Create));
|
||||||
|
}
|
||||||
|
|
||||||
|
let file = file.open(path)?;
|
||||||
|
Ok(Arc::new(WindowsFile {
|
||||||
|
file: RwLock::new(file),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||||
|
fn remove_file(&self, path: &str) -> Result<()> {
|
||||||
|
trace!("remove_file(path = {})", path);
|
||||||
|
Ok(std::fs::remove_file(path)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||||
|
fn run_once(&self) -> Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clock for WindowsIO {
|
||||||
|
fn now(&self) -> Instant {
|
||||||
|
let now = chrono::Local::now();
|
||||||
|
Instant {
|
||||||
|
secs: now.timestamp(),
|
||||||
|
micros: now.timestamp_subsec_micros(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct WindowsFile {
|
||||||
|
file: RwLock<std::fs::File>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl File for WindowsFile {
|
||||||
|
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||||
|
fn lock_file(&self, exclusive: bool) -> Result<()> {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||||
|
fn unlock_file(&self) -> Result<()> {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(self, c), level = Level::TRACE)]
|
||||||
|
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
|
||||||
|
let mut file = self.file.write();
|
||||||
|
file.seek(std::io::SeekFrom::Start(pos))?;
|
||||||
|
let nr = {
|
||||||
|
let r = c.as_read();
|
||||||
|
let buf = r.buf();
|
||||||
|
let buf = buf.as_mut_slice();
|
||||||
|
file.read_exact(buf)?;
|
||||||
|
buf.len() as i32
|
||||||
|
};
|
||||||
|
c.complete(nr);
|
||||||
|
Ok(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(self, c, buffer), level = Level::TRACE)]
|
||||||
|
fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
|
||||||
|
let mut file = self.file.write();
|
||||||
|
file.seek(std::io::SeekFrom::Start(pos))?;
|
||||||
|
let buf = buffer.as_slice();
|
||||||
|
file.write_all(buf)?;
|
||||||
|
c.complete(buffer.len() as i32);
|
||||||
|
Ok(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||||
|
fn sync(&self, c: Completion) -> Result<Completion> {
|
||||||
|
let file = self.file.write();
|
||||||
|
file.sync_all()?;
|
||||||
|
c.complete(0);
|
||||||
|
Ok(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||||
|
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
|
||||||
|
let file = self.file.write();
|
||||||
|
file.set_len(len)?;
|
||||||
|
c.complete(0);
|
||||||
|
Ok(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn size(&self) -> Result<u64> {
|
||||||
|
let file = self.file.read();
|
||||||
|
Ok(file.metadata().unwrap().len())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -91,7 +91,9 @@ impl DatabaseStorage for DatabaseFile {
|
|||||||
if !(512..=65536).contains(&size) || size & (size - 1) != 0 {
|
if !(512..=65536).contains(&size) || size & (size - 1) != 0 {
|
||||||
return Err(LimboError::NotADB);
|
return Err(LimboError::NotADB);
|
||||||
}
|
}
|
||||||
let pos = (page_idx - 1) * size;
|
let Some(pos) = (page_idx as u64 - 1).checked_mul(size as u64) else {
|
||||||
|
return Err(LimboError::IntegerOverflow);
|
||||||
|
};
|
||||||
|
|
||||||
if let Some(ctx) = io_ctx.encryption_context() {
|
if let Some(ctx) = io_ctx.encryption_context() {
|
||||||
let encryption_ctx = ctx.clone();
|
let encryption_ctx = ctx.clone();
|
||||||
@@ -145,7 +147,9 @@ impl DatabaseStorage for DatabaseFile {
|
|||||||
assert!(buffer_size >= 512);
|
assert!(buffer_size >= 512);
|
||||||
assert!(buffer_size <= 65536);
|
assert!(buffer_size <= 65536);
|
||||||
assert_eq!(buffer_size & (buffer_size - 1), 0);
|
assert_eq!(buffer_size & (buffer_size - 1), 0);
|
||||||
let pos = (page_idx - 1) * buffer_size;
|
let Some(pos) = (page_idx as u64 - 1).checked_mul(buffer_size as u64) else {
|
||||||
|
return Err(LimboError::IntegerOverflow);
|
||||||
|
};
|
||||||
let buffer = {
|
let buffer = {
|
||||||
if let Some(ctx) = io_ctx.encryption_context() {
|
if let Some(ctx) = io_ctx.encryption_context() {
|
||||||
encrypt_buffer(page_idx, buffer, ctx)
|
encrypt_buffer(page_idx, buffer, ctx)
|
||||||
@@ -169,7 +173,9 @@ impl DatabaseStorage for DatabaseFile {
|
|||||||
assert!(page_size <= 65536);
|
assert!(page_size <= 65536);
|
||||||
assert_eq!(page_size & (page_size - 1), 0);
|
assert_eq!(page_size & (page_size - 1), 0);
|
||||||
|
|
||||||
let pos = (first_page_idx - 1) * page_size;
|
let Some(pos) = (first_page_idx as u64 - 1).checked_mul(page_size as u64) else {
|
||||||
|
return Err(LimboError::IntegerOverflow);
|
||||||
|
};
|
||||||
let buffers = {
|
let buffers = {
|
||||||
if let Some(ctx) = io_ctx.encryption_context() {
|
if let Some(ctx) = io_ctx.encryption_context() {
|
||||||
buffers
|
buffers
|
||||||
@@ -198,7 +204,7 @@ impl DatabaseStorage for DatabaseFile {
|
|||||||
|
|
||||||
#[instrument(skip_all, level = Level::INFO)]
|
#[instrument(skip_all, level = Level::INFO)]
|
||||||
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
|
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
|
||||||
let c = self.file.truncate(len, c)?;
|
let c = self.file.truncate(len as u64, c)?;
|
||||||
Ok(c)
|
Ok(c)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1838,7 +1838,7 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
|
|||||||
pub fn begin_read_wal_frame_raw(
|
pub fn begin_read_wal_frame_raw(
|
||||||
buffer_pool: &Arc<BufferPool>,
|
buffer_pool: &Arc<BufferPool>,
|
||||||
io: &Arc<dyn File>,
|
io: &Arc<dyn File>,
|
||||||
offset: usize,
|
offset: u64,
|
||||||
complete: Box<ReadComplete>,
|
complete: Box<ReadComplete>,
|
||||||
) -> Result<Completion> {
|
) -> Result<Completion> {
|
||||||
tracing::trace!("begin_read_wal_frame_raw(offset={})", offset);
|
tracing::trace!("begin_read_wal_frame_raw(offset={})", offset);
|
||||||
@@ -1851,7 +1851,7 @@ pub fn begin_read_wal_frame_raw(
|
|||||||
|
|
||||||
pub fn begin_read_wal_frame(
|
pub fn begin_read_wal_frame(
|
||||||
io: &Arc<dyn File>,
|
io: &Arc<dyn File>,
|
||||||
offset: usize,
|
offset: u64,
|
||||||
buffer_pool: Arc<BufferPool>,
|
buffer_pool: Arc<BufferPool>,
|
||||||
complete: Box<ReadComplete>,
|
complete: Box<ReadComplete>,
|
||||||
page_idx: usize,
|
page_idx: usize,
|
||||||
|
|||||||
@@ -1082,7 +1082,7 @@ impl Wal for WalFile {
|
|||||||
});
|
});
|
||||||
begin_read_wal_frame(
|
begin_read_wal_frame(
|
||||||
&self.get_shared().file,
|
&self.get_shared().file,
|
||||||
offset + WAL_FRAME_HEADER_SIZE,
|
offset + WAL_FRAME_HEADER_SIZE as u64,
|
||||||
buffer_pool,
|
buffer_pool,
|
||||||
complete,
|
complete,
|
||||||
page_idx,
|
page_idx,
|
||||||
@@ -1196,7 +1196,7 @@ impl Wal for WalFile {
|
|||||||
});
|
});
|
||||||
let c = begin_read_wal_frame(
|
let c = begin_read_wal_frame(
|
||||||
&self.get_shared().file,
|
&self.get_shared().file,
|
||||||
offset + WAL_FRAME_HEADER_SIZE,
|
offset + WAL_FRAME_HEADER_SIZE as u64,
|
||||||
buffer_pool,
|
buffer_pool,
|
||||||
complete,
|
complete,
|
||||||
page_id as usize,
|
page_id as usize,
|
||||||
@@ -1460,14 +1460,14 @@ impl Wal for WalFile {
|
|||||||
let mut next_frame_id = self.max_frame + 1;
|
let mut next_frame_id = self.max_frame + 1;
|
||||||
// Build every frame in order, updating the rolling checksum
|
// Build every frame in order, updating the rolling checksum
|
||||||
for (idx, page) in pages.iter().enumerate() {
|
for (idx, page) in pages.iter().enumerate() {
|
||||||
let page_id = page.get().id as u64;
|
let page_id = page.get().id;
|
||||||
let plain = page.get_contents().as_ptr();
|
let plain = page.get_contents().as_ptr();
|
||||||
|
|
||||||
let data_to_write: std::borrow::Cow<[u8]> = {
|
let data_to_write: std::borrow::Cow<[u8]> = {
|
||||||
let io_ctx = self.io_ctx.borrow();
|
let io_ctx = self.io_ctx.borrow();
|
||||||
let ectx = io_ctx.encryption_context();
|
let ectx = io_ctx.encryption_context();
|
||||||
if let Some(ctx) = ectx.as_ref() {
|
if let Some(ctx) = ectx.as_ref() {
|
||||||
Cow::Owned(ctx.encrypt_page(plain, page_id as usize)?)
|
Cow::Owned(ctx.encrypt_page(plain, page_id)?)
|
||||||
} else {
|
} else {
|
||||||
Cow::Borrowed(plain)
|
Cow::Borrowed(plain)
|
||||||
}
|
}
|
||||||
@@ -1581,11 +1581,10 @@ impl WalFile {
|
|||||||
self.get_shared().wal_header.lock().page_size
|
self.get_shared().wal_header.lock().page_size
|
||||||
}
|
}
|
||||||
|
|
||||||
fn frame_offset(&self, frame_id: u64) -> usize {
|
fn frame_offset(&self, frame_id: u64) -> u64 {
|
||||||
assert!(frame_id > 0, "Frame ID must be 1-based");
|
assert!(frame_id > 0, "Frame ID must be 1-based");
|
||||||
let page_offset = (frame_id - 1) * (self.page_size() + WAL_FRAME_HEADER_SIZE as u32) as u64;
|
let page_offset = (frame_id - 1) * (self.page_size() + WAL_FRAME_HEADER_SIZE as u32) as u64;
|
||||||
let offset = WAL_HEADER_SIZE as u64 + page_offset;
|
WAL_HEADER_SIZE as u64 + page_offset
|
||||||
offset as usize
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::mut_from_ref)]
|
#[allow(clippy::mut_from_ref)]
|
||||||
@@ -2131,7 +2130,7 @@ impl WalFile {
|
|||||||
// schedule read of the page payload
|
// schedule read of the page payload
|
||||||
let c = begin_read_wal_frame(
|
let c = begin_read_wal_frame(
|
||||||
&self.get_shared().file,
|
&self.get_shared().file,
|
||||||
offset + WAL_FRAME_HEADER_SIZE,
|
offset + WAL_FRAME_HEADER_SIZE as u64,
|
||||||
self.buffer_pool.clone(),
|
self.buffer_pool.clone(),
|
||||||
complete,
|
complete,
|
||||||
page_id,
|
page_id,
|
||||||
@@ -2317,7 +2316,7 @@ pub mod test {
|
|||||||
let done = Rc::new(Cell::new(false));
|
let done = Rc::new(Cell::new(false));
|
||||||
let _done = done.clone();
|
let _done = done.clone();
|
||||||
let _ = file.file.truncate(
|
let _ = file.file.truncate(
|
||||||
WAL_HEADER_SIZE,
|
WAL_HEADER_SIZE as u64,
|
||||||
Completion::new_trunc(move |_| {
|
Completion::new_trunc(move |_| {
|
||||||
let done = _done.clone();
|
let done = _done.clone();
|
||||||
done.set(true);
|
done.set(true);
|
||||||
|
|||||||
@@ -370,7 +370,7 @@ struct SortedChunk {
|
|||||||
/// The chunk file.
|
/// The chunk file.
|
||||||
file: Arc<dyn File>,
|
file: Arc<dyn File>,
|
||||||
/// Offset of the start of chunk in file
|
/// Offset of the start of chunk in file
|
||||||
start_offset: usize,
|
start_offset: u64,
|
||||||
/// The size of this chunk file in bytes.
|
/// The size of this chunk file in bytes.
|
||||||
chunk_size: usize,
|
chunk_size: usize,
|
||||||
/// The read buffer.
|
/// The read buffer.
|
||||||
@@ -391,7 +391,7 @@ impl SortedChunk {
|
|||||||
fn new(file: Arc<dyn File>, start_offset: usize, buffer_size: usize) -> Self {
|
fn new(file: Arc<dyn File>, start_offset: usize, buffer_size: usize) -> Self {
|
||||||
Self {
|
Self {
|
||||||
file,
|
file,
|
||||||
start_offset,
|
start_offset: start_offset as u64,
|
||||||
chunk_size: 0,
|
chunk_size: 0,
|
||||||
buffer: Rc::new(RefCell::new(vec![0; buffer_size])),
|
buffer: Rc::new(RefCell::new(vec![0; buffer_size])),
|
||||||
buffer_len: Rc::new(Cell::new(0)),
|
buffer_len: Rc::new(Cell::new(0)),
|
||||||
@@ -522,7 +522,7 @@ impl SortedChunk {
|
|||||||
let c = Completion::new_read(read_buffer_ref, read_complete);
|
let c = Completion::new_read(read_buffer_ref, read_complete);
|
||||||
let c = self
|
let c = self
|
||||||
.file
|
.file
|
||||||
.pread(self.start_offset + self.total_bytes_read.get(), c)?;
|
.pread(self.start_offset + self.total_bytes_read.get() as u64, c)?;
|
||||||
Ok(c)
|
Ok(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -150,7 +150,7 @@ impl File for SimulatorFile {
|
|||||||
self.inner.unlock_file()
|
self.inner.unlock_file()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pread(&self, pos: usize, c: turso_core::Completion) -> Result<turso_core::Completion> {
|
fn pread(&self, pos: u64, c: turso_core::Completion) -> Result<turso_core::Completion> {
|
||||||
self.nr_pread_calls.set(self.nr_pread_calls.get() + 1);
|
self.nr_pread_calls.set(self.nr_pread_calls.get() + 1);
|
||||||
if self.fault.get() {
|
if self.fault.get() {
|
||||||
tracing::debug!("pread fault");
|
tracing::debug!("pread fault");
|
||||||
@@ -173,7 +173,7 @@ impl File for SimulatorFile {
|
|||||||
|
|
||||||
fn pwrite(
|
fn pwrite(
|
||||||
&self,
|
&self,
|
||||||
pos: usize,
|
pos: u64,
|
||||||
buffer: Arc<turso_core::Buffer>,
|
buffer: Arc<turso_core::Buffer>,
|
||||||
c: turso_core::Completion,
|
c: turso_core::Completion,
|
||||||
) -> Result<turso_core::Completion> {
|
) -> Result<turso_core::Completion> {
|
||||||
@@ -227,7 +227,7 @@ impl File for SimulatorFile {
|
|||||||
|
|
||||||
fn pwritev(
|
fn pwritev(
|
||||||
&self,
|
&self,
|
||||||
pos: usize,
|
pos: u64,
|
||||||
buffers: Vec<Arc<turso_core::Buffer>>,
|
buffers: Vec<Arc<turso_core::Buffer>>,
|
||||||
c: turso_core::Completion,
|
c: turso_core::Completion,
|
||||||
) -> Result<turso_core::Completion> {
|
) -> Result<turso_core::Completion> {
|
||||||
@@ -257,7 +257,7 @@ impl File for SimulatorFile {
|
|||||||
self.inner.size()
|
self.inner.size()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn truncate(&self, len: usize, c: turso_core::Completion) -> Result<turso_core::Completion> {
|
fn truncate(&self, len: u64, c: turso_core::Completion) -> Result<turso_core::Completion> {
|
||||||
if self.fault.get() {
|
if self.fault.get() {
|
||||||
return Err(turso_core::LimboError::InternalError(
|
return Err(turso_core::LimboError::InternalError(
|
||||||
FAULT_ERROR_MSG.into(),
|
FAULT_ERROR_MSG.into(),
|
||||||
|
|||||||
@@ -81,7 +81,7 @@ pub async fn db_bootstrap<C: ProtocolIO, Ctx>(
|
|||||||
while !c.is_completed() {
|
while !c.is_completed() {
|
||||||
coro.yield_(ProtocolCommand::IO).await?;
|
coro.yield_(ProtocolCommand::IO).await?;
|
||||||
}
|
}
|
||||||
pos += content_len;
|
pos += content_len as u64;
|
||||||
}
|
}
|
||||||
if content.is_done()? {
|
if content.is_done()? {
|
||||||
break;
|
break;
|
||||||
@@ -123,7 +123,7 @@ pub async fn wal_apply_from_file<Ctx>(
|
|||||||
// todo(sivukhin): we need to error out in case of partial read
|
// todo(sivukhin): we need to error out in case of partial read
|
||||||
assert!(size as usize == WAL_FRAME_SIZE);
|
assert!(size as usize == WAL_FRAME_SIZE);
|
||||||
});
|
});
|
||||||
let c = frames_file.pread(offset as usize, c)?;
|
let c = frames_file.pread(offset, c)?;
|
||||||
while !c.is_completed() {
|
while !c.is_completed() {
|
||||||
coro.yield_(ProtocolCommand::IO).await?;
|
coro.yield_(ProtocolCommand::IO).await?;
|
||||||
}
|
}
|
||||||
@@ -243,7 +243,7 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
|
|||||||
while !c.is_completed() {
|
while !c.is_completed() {
|
||||||
coro.yield_(ProtocolCommand::IO).await?;
|
coro.yield_(ProtocolCommand::IO).await?;
|
||||||
}
|
}
|
||||||
offset += WAL_FRAME_SIZE;
|
offset += WAL_FRAME_SIZE as u64;
|
||||||
}
|
}
|
||||||
|
|
||||||
let c = Completion::new_sync(move |_| {
|
let c = Completion::new_sync(move |_| {
|
||||||
@@ -323,7 +323,7 @@ pub async fn wal_pull_to_file_legacy<C: ProtocolIO, Ctx>(
|
|||||||
coro.yield_(ProtocolCommand::IO).await?;
|
coro.yield_(ProtocolCommand::IO).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
last_offset += WAL_FRAME_SIZE;
|
last_offset += WAL_FRAME_SIZE as u64;
|
||||||
buffer_len = 0;
|
buffer_len = 0;
|
||||||
start_frame += 1;
|
start_frame += 1;
|
||||||
|
|
||||||
@@ -974,7 +974,7 @@ pub async fn bootstrap_db_file_v1<C: ProtocolIO, Ctx>(
|
|||||||
};
|
};
|
||||||
assert!(rc as usize == 0);
|
assert!(rc as usize == 0);
|
||||||
});
|
});
|
||||||
let c = file.truncate(header.db_size as usize * PAGE_SIZE, c)?;
|
let c = file.truncate(header.db_size * PAGE_SIZE as u64, c)?;
|
||||||
while !c.is_completed() {
|
while !c.is_completed() {
|
||||||
coro.yield_(ProtocolCommand::IO).await?;
|
coro.yield_(ProtocolCommand::IO).await?;
|
||||||
}
|
}
|
||||||
@@ -984,7 +984,7 @@ pub async fn bootstrap_db_file_v1<C: ProtocolIO, Ctx>(
|
|||||||
while let Some(page_data) =
|
while let Some(page_data) =
|
||||||
wait_proto_message::<Ctx, PageData>(coro, &completion, &mut bytes).await?
|
wait_proto_message::<Ctx, PageData>(coro, &completion, &mut bytes).await?
|
||||||
{
|
{
|
||||||
let offset = page_data.page_id as usize * PAGE_SIZE;
|
let offset = page_data.page_id * PAGE_SIZE as u64;
|
||||||
let page = decode_page(&header, page_data)?;
|
let page = decode_page(&header, page_data)?;
|
||||||
if page.len() != PAGE_SIZE {
|
if page.len() != PAGE_SIZE {
|
||||||
return Err(Error::DatabaseSyncEngineError(format!(
|
return Err(Error::DatabaseSyncEngineError(format!(
|
||||||
@@ -1074,7 +1074,7 @@ pub async fn reset_wal_file<Ctx>(
|
|||||||
// let's truncate WAL file completely in order for this operation to safely execute on empty WAL in case of initial bootstrap phase
|
// let's truncate WAL file completely in order for this operation to safely execute on empty WAL in case of initial bootstrap phase
|
||||||
0
|
0
|
||||||
} else {
|
} else {
|
||||||
WAL_HEADER + WAL_FRAME_SIZE * (frames_count as usize)
|
WAL_HEADER as u64 + WAL_FRAME_SIZE as u64 * frames_count
|
||||||
};
|
};
|
||||||
tracing::debug!("reset db wal to the size of {} frames", frames_count);
|
tracing::debug!("reset db wal to the size of {} frames", frames_count);
|
||||||
let c = Completion::new_trunc(move |result| {
|
let c = Completion::new_trunc(move |result| {
|
||||||
|
|||||||
@@ -59,7 +59,7 @@ impl IoOperations for Arc<dyn turso_core::IO> {
|
|||||||
};
|
};
|
||||||
tracing::debug!("file truncated: rc={}", rc);
|
tracing::debug!("file truncated: rc={}", rc);
|
||||||
});
|
});
|
||||||
let c = file.truncate(len, c)?;
|
let c = file.truncate(len as u64, c)?;
|
||||||
while !c.is_completed() {
|
while !c.is_completed() {
|
||||||
coro.yield_(ProtocolCommand::IO).await?;
|
coro.yield_(ProtocolCommand::IO).await?;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -434,7 +434,7 @@ fn write_at(io: &impl IO, file: Arc<dyn File>, offset: usize, data: &[u8]) {
|
|||||||
// reference the buffer to keep alive for async io
|
// reference the buffer to keep alive for async io
|
||||||
let _buf = _buf.clone();
|
let _buf = _buf.clone();
|
||||||
});
|
});
|
||||||
let result = file.pwrite(offset, buffer, completion).unwrap();
|
let result = file.pwrite(offset as u64, buffer, completion).unwrap();
|
||||||
while !result.is_completed() {
|
while !result.is_completed() {
|
||||||
io.run_once().unwrap();
|
io.run_once().unwrap();
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user