io: assert small I/O

Let's assert **for now** that we do not read/write less bytes than
expected. This should be fixed to retrigger several reads/writes if we
couldn't read/write enough but for now let's assert.
This commit is contained in:
Pere Diaz Bou
2025-07-14 10:19:41 +02:00
parent 8f8d582b4a
commit 88ff218810
5 changed files with 50 additions and 28 deletions

View File

@@ -54,7 +54,7 @@ pub trait IO: Clock + Send + Sync {
fn get_memory_io(&self) -> Arc<MemoryIO>;
}
pub type Complete = dyn Fn(Arc<RefCell<Buffer>>);
pub type Complete = dyn Fn(Arc<RefCell<Buffer>>, i32);
pub type WriteComplete = dyn Fn(i32);
pub type SyncComplete = dyn Fn(i32);
@@ -88,7 +88,7 @@ impl Completion {
pub fn complete(&self, result: i32) {
match &self.completion_type {
CompletionType::Read(r) => r.complete(),
CompletionType::Read(r) => r.complete(result),
CompletionType::Write(w) => w.complete(result),
CompletionType::Sync(s) => s.complete(result), // fix
};
@@ -126,8 +126,8 @@ impl ReadCompletion {
self.buf.borrow_mut()
}
pub fn complete(&self) {
(self.complete)(self.buf.clone());
pub fn complete(&self, bytes_read: i32) {
(self.complete)(self.buf.clone(), bytes_read);
}
}

View File

@@ -1,6 +1,6 @@
use crate::error::LimboError;
use crate::io::common;
use crate::Result;
use crate::{error::LimboError, turso_assert};
use super::{Completion, File, MemoryIO, OpenFlags, IO};
use crate::io::clock::{Clock, Instant};
@@ -345,9 +345,10 @@ impl File for UnixFile<'_> {
let c = Arc::new(c);
match result {
Ok(n) => {
let r = c.as_read();
trace!("pread n: {}", n);
// Read succeeded immediately
c.complete(0);
c.complete(n as i32);
Ok(c)
}
Err(Errno::AGAIN) => {

View File

@@ -61,7 +61,7 @@ use crate::storage::buffer_pool::BufferPool;
use crate::storage::database::DatabaseStorage;
use crate::storage::pager::Pager;
use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype};
use crate::{File, Result, WalFileShared};
use crate::{turso_assert, File, Result, WalFileShared};
use std::cell::{RefCell, UnsafeCell};
use std::collections::HashMap;
use std::mem::MaybeUninit;
@@ -732,7 +732,12 @@ pub fn begin_read_page(
});
#[allow(clippy::arc_with_non_send_sync)]
let buf = Arc::new(RefCell::new(Buffer::new(buf, drop_fn)));
let complete = Box::new(move |buf: Arc<RefCell<Buffer>>| {
let complete = Box::new(move |buf: Arc<RefCell<Buffer>>, bytes_read: i32| {
let buf_len = buf.borrow().len();
turso_assert!(
bytes_read == buf_len as i32,
"read({bytes_read}) less than expected({buf_len})"
);
let page = page.clone();
if finish_read_page(page_idx, buf, page.clone()).is_err() {
page.set_error();
@@ -793,9 +798,10 @@ pub fn begin_write_btree_page(
*clone_counter.borrow_mut() -= 1;
page_finish.clear_dirty();
if bytes_written < buf_len as i32 {
tracing::error!("wrote({bytes_written}) less than expected({buf_len})");
}
turso_assert!(
bytes_written == buf_len as i32,
"wrote({bytes_written}) less than expected({buf_len})"
);
})
};
let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete)));
@@ -1330,9 +1336,14 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
}));
let wal_file_shared_for_completion = wal_file_shared_ret.clone();
let complete: Box<Complete> = Box::new(move |buf: Arc<RefCell<Buffer>>| {
let complete: Box<Complete> = Box::new(move |buf: Arc<RefCell<Buffer>>, bytes_read: i32| {
let buf = buf.borrow();
let buf_slice = buf.as_slice();
turso_assert!(
bytes_read == buf_slice.len() as i32,
"read({bytes_read}) less than expected({})",
buf_slice.len()
);
let mut header_locked = header.lock();
// Read header
header_locked.magic =
@@ -1489,7 +1500,7 @@ pub fn begin_read_wal_frame(
io: &Arc<dyn File>,
offset: usize,
buffer_pool: Arc<BufferPool>,
complete: Box<dyn Fn(Arc<RefCell<Buffer>>)>,
complete: Box<dyn Fn(Arc<RefCell<Buffer>>, i32)>,
) -> Result<Arc<Completion>> {
tracing::trace!("begin_read_wal_frame(offset={})", offset);
let buf = buffer_pool.get();
@@ -1586,9 +1597,10 @@ pub fn begin_write_wal_frame(
*clone_counter.borrow_mut() -= 1;
page_finish.clear_dirty();
if bytes_written < buf_len as i32 {
tracing::error!("wrote({bytes_written}) less than expected({buf_len})");
}
turso_assert!(
bytes_written == buf_len as i32,
"wrote({bytes_written}) less than expected({buf_len})"
);
})
};
#[allow(clippy::arc_with_non_send_sync)]
@@ -1608,7 +1620,7 @@ pub fn begin_write_wal_header(io: &Arc<dyn File>, header: &WalHeader) -> Result<
let buffer = {
let drop_fn = Rc::new(|_buf| {});
let mut buffer = Buffer::allocate(512, drop_fn);
let mut buffer = Buffer::allocate(WAL_HEADER_SIZE, drop_fn);
let buf = buffer.as_mut_slice();
buf[0..4].copy_from_slice(&header.magic.to_be_bytes());
@@ -1626,11 +1638,10 @@ pub fn begin_write_wal_header(io: &Arc<dyn File>, header: &WalHeader) -> Result<
let write_complete = {
Box::new(move |bytes_written: i32| {
if bytes_written < WAL_HEADER_SIZE as i32 {
tracing::error!(
"wal header wrote({bytes_written}) less than expected({WAL_HEADER_SIZE})"
);
}
turso_assert!(
bytes_written == WAL_HEADER_SIZE as i32,
"wal header wrote({bytes_written}) less than expected({WAL_HEADER_SIZE})"
);
})
};
#[allow(clippy::arc_with_non_send_sync)]

View File

@@ -23,7 +23,7 @@ use crate::storage::sqlite3_ondisk::{
begin_read_wal_frame, begin_write_wal_frame, finish_read_page, WAL_FRAME_HEADER_SIZE,
WAL_HEADER_SIZE,
};
use crate::{Buffer, LimboError, Result};
use crate::{turso_assert, Buffer, LimboError, Result};
use crate::{Completion, Page};
use self::sqlite3_ondisk::{checksum_wal, PageContent, WAL_MAGIC_BE, WAL_MAGIC_LE};
@@ -618,7 +618,12 @@ impl Wal for WalFile {
let offset = self.frame_offset(frame_id);
page.set_locked();
let frame = page.clone();
let complete = Box::new(move |buf: Arc<RefCell<Buffer>>| {
let complete = Box::new(move |buf: Arc<RefCell<Buffer>>, bytes_read: i32| {
let buf_len = buf.borrow().len();
turso_assert!(
bytes_read == buf_len as i32,
"read({bytes_read}) less than expected({buf_len})"
);
let frame = frame.clone();
finish_read_page(page.get().id, buf, frame).unwrap();
});
@@ -641,8 +646,13 @@ impl Wal for WalFile {
) -> Result<Arc<Completion>> {
tracing::debug!("read_frame({})", frame_id);
let offset = self.frame_offset(frame_id);
let complete = Box::new(move |buf: Arc<RefCell<Buffer>>| {
let complete = Box::new(move |buf: Arc<RefCell<Buffer>>, bytes_read: i32| {
let buf = buf.borrow();
let buf_len = buf.len();
turso_assert!(
bytes_read == buf_len as i32,
"read({bytes_read}) less than expected({buf_len})"
);
let buf_ptr = buf.as_ptr();
unsafe {
std::ptr::copy_nonoverlapping(buf_ptr, frame, frame_len as usize);

View File

@@ -123,13 +123,13 @@ impl File for SimulatorFile {
unreachable!();
};
let before = self.rng.borrow_mut().gen_bool(0.5);
let dummy_complete = Box::new(|_| {});
let dummy_complete = Box::new(|_, _| {});
let prev_complete = std::mem::replace(&mut read_completion.complete, dummy_complete);
let new_complete = move |res| {
let new_complete = move |res, bytes_read| {
if before {
std::thread::sleep(latency);
}
(prev_complete)(res);
(prev_complete)(res, bytes_read);
if !before {
std::thread::sleep(latency);
}