Merge 'Implement IO latency correctly in simulator' from Pedro Muniz

Closes #1998. IO is now queued to be executed at a later point in simulated time.
Note that the added latency significantly slows the simulator down for some
runs.
This PR also adds a StateMachine variant in Balance as now `free_pages`
is correctly an asynchronous function. With this change, we now need a
state machine in the `Pager` so that `free_pages` can be reentrant.
Lastly, I removed a timeout in `checkpoint_shutdown` as it was
triggering constantly due to the slightly increased latency.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #1943
This commit is contained in:
Jussi Saurio
2025-07-17 21:05:17 +03:00
23 changed files with 393 additions and 203 deletions

View File

@@ -75,7 +75,7 @@ jobs:
with:
prefix-key: "v1-rust" # can be updated if we need to reset caches due to non-trivial change in the dependencies (for example, custom env var were set for single workspace project)
- name: Install the project
run: ./scripts/run-sim --maximum-tests 1000 loop -n 10 -s
run: ./scripts/run-sim --maximum-tests 1000 --min-tick 10 --max-tick 50 loop -n 10 -s
test-limbo:
runs-on: blacksmith-4vcpu-ubuntu-2404

View File

@@ -672,7 +672,7 @@ impl turso_core::DatabaseStorage for DatabaseFile {
return Err(turso_core::LimboError::NotADB);
}
let pos = (page_idx - 1) * size;
self.file.pread(pos, c)?;
self.file.pread(pos, c.into())?;
Ok(())
}
@@ -684,12 +684,12 @@ impl turso_core::DatabaseStorage for DatabaseFile {
) -> turso_core::Result<()> {
let size = buffer.borrow().len();
let pos = (page_idx - 1) * size;
self.file.pwrite(pos, buffer, c)?;
self.file.pwrite(pos, buffer, c.into())?;
Ok(())
}
fn sync(&self, c: turso_core::Completion) -> turso_core::Result<()> {
let _ = self.file.sync(c)?;
let _ = self.file.sync(c.into())?;
Ok(())
}

View File

@@ -213,7 +213,11 @@ impl turso_core::File for File {
Ok(())
}
fn pread(&self, pos: usize, c: turso_core::Completion) -> Result<Arc<turso_core::Completion>> {
fn pread(
&self,
pos: usize,
c: Arc<turso_core::Completion>,
) -> Result<Arc<turso_core::Completion>> {
let r = match c.completion_type {
turso_core::CompletionType::Read(ref r) => r,
_ => unreachable!(),
@@ -225,14 +229,14 @@ impl turso_core::File for File {
};
r.complete(nr);
#[allow(clippy::arc_with_non_send_sync)]
Ok(Arc::new(c))
Ok(c)
}
fn pwrite(
&self,
pos: usize,
buffer: Arc<std::cell::RefCell<turso_core::Buffer>>,
c: turso_core::Completion,
c: Arc<turso_core::Completion>,
) -> Result<Arc<turso_core::Completion>> {
let w = match c.completion_type {
turso_core::CompletionType::Write(ref w) => w,
@@ -243,14 +247,14 @@ impl turso_core::File for File {
self.vfs.pwrite(self.fd, buf, pos);
w.complete(buf.len() as i32);
#[allow(clippy::arc_with_non_send_sync)]
Ok(Arc::new(c))
Ok(c)
}
fn sync(&self, c: turso_core::Completion) -> Result<Arc<turso_core::Completion>> {
fn sync(&self, c: Arc<turso_core::Completion>) -> Result<Arc<turso_core::Completion>> {
self.vfs.sync(self.fd);
c.complete(0);
#[allow(clippy::arc_with_non_send_sync)]
Ok(Arc::new(c))
Ok(c)
}
fn size(&self) -> Result<u64> {
@@ -351,7 +355,7 @@ impl turso_core::DatabaseStorage for DatabaseFile {
return Err(turso_core::LimboError::NotADB);
}
let pos = (page_idx - 1) * size;
self.file.pread(pos, c)?;
self.file.pread(pos, c.into())?;
Ok(())
}
@@ -363,12 +367,12 @@ impl turso_core::DatabaseStorage for DatabaseFile {
) -> Result<()> {
let size = buffer.borrow().len();
let pos = (page_idx - 1) * size;
self.file.pwrite(pos, buffer, c)?;
self.file.pwrite(pos, buffer, c.into())?;
Ok(())
}
fn sync(&self, c: turso_core::Completion) -> Result<()> {
let _ = self.file.sync(c)?;
let _ = self.file.sync(c.into())?;
Ok(())
}

View File

@@ -4,6 +4,15 @@ pub struct Instant {
pub micros: u32,
}
/// Convert a chrono timestamp into the engine's `Instant` representation:
/// whole seconds since the Unix epoch plus the sub-second microsecond part.
impl<T: chrono::TimeZone> From<chrono::DateTime<T>> for Instant {
    fn from(value: chrono::DateTime<T>) -> Self {
        let secs = value.timestamp();
        let micros = value.timestamp_subsec_micros();
        Instant { secs, micros }
    }
}
pub trait Clock {
fn now(&self) -> Instant;
}

View File

@@ -86,7 +86,7 @@ impl File for GenericFile {
Ok(())
}
fn pread(&self, pos: usize, c: Completion) -> Result<Arc<Completion>> {
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<Arc<Completion>> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
{
@@ -99,14 +99,14 @@ impl File for GenericFile {
file.read_exact(buf)?;
}
c.complete(0);
Ok(Arc::new(c))
Ok(c)
}
fn pwrite(
&self,
pos: usize,
buffer: Arc<RefCell<crate::Buffer>>,
c: Completion,
c: Arc<Completion>,
) -> Result<Arc<Completion>> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
@@ -114,14 +114,14 @@ impl File for GenericFile {
let buf = buf.as_slice();
file.write_all(buf)?;
c.complete(buf.len() as i32);
Ok(Arc::new(c))
Ok(c)
}
fn sync(&self, c: Completion) -> Result<Arc<Completion>> {
fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>> {
let mut file = self.file.borrow_mut();
file.sync_all().map_err(|err| LimboError::IOError(err))?;
c.complete(0);
Ok(Arc::new(c))
Ok(c)
}
fn size(&self) -> Result<u64> {

View File

@@ -302,7 +302,7 @@ impl File for UringFile {
Ok(())
}
fn pread(&self, pos: usize, c: Completion) -> Result<Arc<Completion>> {
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<Arc<Completion>> {
let r = c.as_read();
trace!("pread(pos = {}, length = {})", pos, r.buf().len());
let mut io = self.io.borrow_mut();
@@ -317,7 +317,6 @@ impl File for UringFile {
.user_data(io.ring.get_key())
})
};
let c = Arc::new(c);
io.ring.submit_entry(&read_e, c.clone());
Ok(c)
}
@@ -326,7 +325,7 @@ impl File for UringFile {
&self,
pos: usize,
buffer: Arc<RefCell<crate::Buffer>>,
c: Completion,
c: Arc<Completion>,
) -> Result<Arc<Completion>> {
let mut io = self.io.borrow_mut();
let write = {
@@ -339,7 +338,6 @@ impl File for UringFile {
.user_data(io.ring.get_key())
})
};
let c = Arc::new(c);
let c_uring = c.clone();
io.ring.submit_entry(
&write,
@@ -354,7 +352,7 @@ impl File for UringFile {
Ok(c)
}
fn sync(&self, c: Completion) -> Result<Arc<Completion>> {
fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>> {
let mut io = self.io.borrow_mut();
trace!("sync()");
let sync = with_fd!(self, |fd| {
@@ -362,7 +360,6 @@ impl File for UringFile {
.build()
.user_data(io.ring.get_key())
});
let c = Arc::new(c);
io.ring.submit_entry(&sync, c.clone());
Ok(c)
}

View File

@@ -83,8 +83,7 @@ impl File for MemoryFile {
Ok(())
}
fn pread(&self, pos: usize, c: Completion) -> Result<Arc<Completion>> {
let c = Arc::new(c);
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<Arc<Completion>> {
let r = c.as_read();
let buf_len = r.buf().len();
if buf_len == 0 {
@@ -129,9 +128,8 @@ impl File for MemoryFile {
&self,
pos: usize,
buffer: Arc<RefCell<Buffer>>,
c: Completion,
c: Arc<Completion>,
) -> Result<Arc<Completion>> {
let c = Arc::new(c);
let buf = buffer.borrow();
let buf_len = buf.len();
if buf_len == 0 {
@@ -167,10 +165,10 @@ impl File for MemoryFile {
Ok(c)
}
fn sync(&self, c: Completion) -> Result<Arc<Completion>> {
fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>> {
// no-op
c.complete(0);
Ok(Arc::new(c))
Ok(c)
}
fn size(&self) -> Result<u64> {

View File

@@ -14,14 +14,14 @@ use std::{
pub trait File: Send + Sync {
fn lock_file(&self, exclusive: bool) -> Result<()>;
fn unlock_file(&self) -> Result<()>;
fn pread(&self, pos: usize, c: Completion) -> Result<Arc<Completion>>;
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<Arc<Completion>>;
fn pwrite(
&self,
pos: usize,
buffer: Arc<RefCell<Buffer>>,
c: Completion,
c: Arc<Completion>,
) -> Result<Arc<Completion>>;
fn sync(&self, c: Completion) -> Result<Arc<Completion>>;
fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>>;
fn size(&self) -> Result<u64>;
}

View File

@@ -335,14 +335,13 @@ impl File for UnixFile<'_> {
}
#[instrument(err, skip_all, level = Level::INFO)]
fn pread(&self, pos: usize, c: Completion) -> Result<Arc<Completion>> {
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<Arc<Completion>> {
let file = self.file.borrow();
let result = {
let r = c.as_read();
let mut buf = r.buf_mut();
rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64)
};
let c = Arc::new(c);
match result {
Ok(n) => {
trace!("pread n: {}", n);
@@ -373,14 +372,13 @@ impl File for UnixFile<'_> {
&self,
pos: usize,
buffer: Arc<RefCell<crate::Buffer>>,
c: Completion,
c: Arc<Completion>,
) -> Result<Arc<Completion>> {
let file = self.file.borrow();
let result = {
let buf = buffer.borrow();
rustix::io::pwrite(file.as_fd(), buf.as_slice(), pos as u64)
};
let c = Arc::new(c);
match result {
Ok(n) => {
trace!("pwrite n: {}", n);
@@ -405,10 +403,9 @@ impl File for UnixFile<'_> {
}
#[instrument(err, skip_all, level = Level::INFO)]
fn sync(&self, c: Completion) -> Result<Arc<Completion>> {
fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>> {
let file = self.file.borrow();
let result = fs::fsync(file.as_fd());
let c = Arc::new(c);
match result {
Ok(()) => {
trace!("fsync");

View File

@@ -98,7 +98,7 @@ impl File for VfsFileImpl {
Ok(())
}
fn pread(&self, pos: usize, c: Completion) -> Result<Arc<Completion>> {
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<Arc<Completion>> {
let r = match c.completion_type {
CompletionType::Read(ref r) => r,
_ => unreachable!(),
@@ -113,7 +113,7 @@ impl File for VfsFileImpl {
Err(LimboError::ExtensionError("pread failed".to_string()))
} else {
c.complete(result);
Ok(Arc::new(c))
Ok(c)
}
}
@@ -121,7 +121,7 @@ impl File for VfsFileImpl {
&self,
pos: usize,
buffer: Arc<RefCell<Buffer>>,
c: Completion,
c: Arc<Completion>,
) -> Result<Arc<Completion>> {
let buf = buffer.borrow();
let count = buf.as_slice().len();
@@ -142,18 +142,18 @@ impl File for VfsFileImpl {
Err(LimboError::ExtensionError("pwrite failed".to_string()))
} else {
c.complete(result);
Ok(Arc::new(c))
Ok(c)
}
}
fn sync(&self, c: Completion) -> Result<Arc<Completion>> {
fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>> {
let vfs = unsafe { &*self.vfs };
let result = unsafe { (vfs.sync)(self.file) };
if result < 0 {
Err(LimboError::ExtensionError("sync failed".to_string()))
} else {
c.complete(0);
Ok(Arc::new(c))
Ok(c)
}
}

View File

@@ -81,7 +81,7 @@ impl File for WindowsFile {
unimplemented!()
}
fn pread(&self, pos: usize, c: Completion) -> Result<Arc<Completion>> {
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<Arc<Completion>> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
let nr = {
@@ -92,14 +92,14 @@ impl File for WindowsFile {
buf.len() as i32
};
c.complete(nr);
Ok(Arc::new(c))
Ok(c)
}
fn pwrite(
&self,
pos: usize,
buffer: Arc<RefCell<crate::Buffer>>,
c: Completion,
c: Arc<Completion>,
) -> Result<Arc<Completion>> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
@@ -107,14 +107,14 @@ impl File for WindowsFile {
let buf = buf.as_slice();
file.write_all(buf)?;
c.complete(buffer.borrow().len() as i32);
Ok(Arc::new(c))
Ok(c)
}
fn sync(&self, c: Completion) -> Result<Arc<Completion>> {
fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>> {
let file = self.file.borrow_mut();
file.sync_all().map_err(LimboError::IOError)?;
c.complete(0);
Ok(Arc::new(c))
Ok(c)
}
fn size(&self) -> Result<u64> {

View File

@@ -223,6 +223,10 @@ struct DeleteInfo {
enum WriteState {
Start,
BalanceStart,
BalanceFreePages {
curr_page: usize,
sibling_count_new: usize,
},
/// Choose which sibling pages to balance (max 3).
/// Generally, the siblings involved will be the page that triggered the balancing and its left and right siblings.
/// The exceptions are:
@@ -2255,6 +2259,7 @@ impl BTreeCursor {
}
}
WriteState::BalanceStart
| WriteState::BalanceFreePages { .. }
| WriteState::BalanceNonRootPickSiblings
| WriteState::BalanceNonRootDoBalancing => {
return_if_io!(self.balance());
@@ -2333,7 +2338,9 @@ impl BTreeCursor {
self.stack.pop();
return_if_io!(self.balance_non_root());
}
WriteState::BalanceNonRootPickSiblings | WriteState::BalanceNonRootDoBalancing => {
WriteState::BalanceNonRootPickSiblings
| WriteState::BalanceNonRootDoBalancing
| WriteState::BalanceFreePages { .. } => {
return_if_io!(self.balance_non_root());
}
WriteState::Finish => return Ok(IOResult::Done(())),
@@ -2350,7 +2357,7 @@ impl BTreeCursor {
"Cursor must be in balancing state"
);
let state = self.state.write_info().expect("must be balancing").state;
tracing::debug!("balance_non_root(state={:?})", state);
tracing::debug!(?state);
let (next_write_state, result) = match state {
WriteState::Start => todo!(),
WriteState::BalanceStart => todo!(),
@@ -3326,13 +3333,38 @@ impl BTreeCursor {
right_page_id,
);
(
WriteState::BalanceFreePages {
curr_page: sibling_count_new,
sibling_count_new,
},
Ok(IOResult::Done(())),
)
}
WriteState::BalanceFreePages {
curr_page,
sibling_count_new,
} => {
let write_info = self.state.write_info().unwrap();
let mut balance_info: std::cell::RefMut<'_, Option<BalanceInfo>> =
write_info.balance_info.borrow_mut();
let balance_info = balance_info.as_mut().unwrap();
// We have to free pages that are not used anymore
for i in sibling_count_new..balance_info.sibling_count {
let page = balance_info.pages_to_balance[i].as_ref().unwrap();
self.pager
.free_page(Some(page.get().clone()), page.get().get().id)?;
if !((sibling_count_new..balance_info.sibling_count).contains(&curr_page)) {
(WriteState::BalanceStart, Ok(IOResult::Done(())))
} else {
let page = balance_info.pages_to_balance[curr_page].as_ref().unwrap();
return_if_io!(self
.pager
.free_page(Some(page.get().clone()), page.get().get().id));
(
WriteState::BalanceFreePages {
curr_page: curr_page + 1,
sibling_count_new,
},
Ok(IOResult::Done(())),
)
}
(WriteState::BalanceStart, Ok(IOResult::Done(())))
}
WriteState::Finish => todo!(),
};
@@ -4686,7 +4718,7 @@ impl BTreeCursor {
let contents = page.get().contents.as_ref().unwrap();
let next = contents.read_u32(0);
self.pager.free_page(Some(page), next_page as usize)?;
return_if_io!(self.pager.free_page(Some(page), next_page as usize));
if next != 0 {
self.overflow_state = Some(OverflowState::ProcessPage { next_page: next });
@@ -4873,7 +4905,7 @@ impl BTreeCursor {
let page = self.stack.top();
let page_id = page.get().get().id;
self.pager.free_page(Some(page.get()), page_id)?;
return_if_io!(self.pager.free_page(Some(page.get()), page_id));
if self.stack.has_parent() {
self.stack.pop();

View File

@@ -42,7 +42,7 @@ impl DatabaseStorage for DatabaseFile {
return Err(LimboError::NotADB);
}
let pos = (page_idx - 1) * size;
self.file.pread(pos, c)?;
self.file.pread(pos, c.into())?;
Ok(())
}
@@ -59,13 +59,13 @@ impl DatabaseStorage for DatabaseFile {
assert!(buffer_size <= 65536);
assert_eq!(buffer_size & (buffer_size - 1), 0);
let pos = (page_idx - 1) * buffer_size;
self.file.pwrite(pos, buffer, c)?;
self.file.pwrite(pos, buffer, c.into())?;
Ok(())
}
#[instrument(skip_all, level = Level::INFO)]
fn sync(&self, c: Completion) -> Result<()> {
let _ = self.file.sync(c)?;
let _ = self.file.sync(c.into())?;
Ok(())
}
@@ -102,7 +102,7 @@ impl DatabaseStorage for FileMemoryStorage {
return Err(LimboError::NotADB);
}
let pos = (page_idx - 1) * size;
self.file.pread(pos, c)?;
self.file.pread(pos, c.into())?;
Ok(())
}
@@ -118,13 +118,13 @@ impl DatabaseStorage for FileMemoryStorage {
assert!(buffer_size <= 65536);
assert_eq!(buffer_size & (buffer_size - 1), 0);
let pos = (page_idx - 1) * buffer_size;
self.file.pwrite(pos, buffer, c)?;
self.file.pwrite(pos, buffer, c.into())?;
Ok(())
}
#[instrument(skip_all, level = Level::INFO)]
fn sync(&self, c: Completion) -> Result<()> {
let _ = self.file.sync(c)?;
let _ = self.file.sync(c.into())?;
Ok(())
}

View File

@@ -280,6 +280,7 @@ pub struct Pager {
/// to change it.
page_size: Cell<Option<u32>>,
reserved_space: OnceCell<u8>,
free_page_state: RefCell<FreePageState>,
}
#[derive(Debug, Copy, Clone)]
@@ -303,6 +304,18 @@ enum AllocatePage1State {
Done,
}
#[derive(Debug, Clone)]
enum FreePageState {
Start,
AddToTrunk {
page: Arc<Page>,
trunk_page: Option<Arc<Page>>,
},
NewTrunk {
page: Arc<Page>,
},
}
impl Pager {
pub fn new(
db_file: Arc<dyn DatabaseStorage>,
@@ -342,6 +355,7 @@ impl Pager {
state: CacheFlushState::Start,
in_flight_writes: Rc::new(RefCell::new(0)),
}),
free_page_state: RefCell::new(FreePageState::Start),
})
}
@@ -1016,18 +1030,21 @@ impl Pager {
}
pub fn checkpoint_shutdown(&self, wal_checkpoint_disabled: bool) -> Result<()> {
let mut attempts = 0;
let mut _attempts = 0;
{
let mut wal = self.wal.borrow_mut();
// fsync the wal syncronously before beginning checkpoint
while let Ok(IOResult::IO) = wal.sync() {
if attempts >= 10 {
return Err(LimboError::InternalError(
"Failed to fsync WAL before final checkpoint, fd likely closed".into(),
));
}
// TODO: for now forget about timeouts as they fail regularly in SIM
// need to think of a better way to do this
// if attempts >= 1000 {
// return Err(LimboError::InternalError(
// "Failed to fsync WAL before final checkpoint, fd likely closed".into(),
// ));
// }
self.io.run_once()?;
attempts += 1;
_attempts += 1;
}
}
self.wal_checkpoint(wal_checkpoint_disabled)?;
@@ -1070,7 +1087,7 @@ impl Pager {
// Providing a page is optional, if provided it will be used to avoid reading the page from disk.
// This is implemented in accordance with sqlite freepage2() function.
#[instrument(skip_all, level = Level::INFO)]
pub fn free_page(&self, page: Option<PageRef>, page_id: usize) -> Result<()> {
pub fn free_page(&self, page: Option<PageRef>, page_id: usize) -> Result<IOResult<()>> {
tracing::trace!("free_page(page_id={})", page_id);
const TRUNK_PAGE_HEADER_SIZE: usize = 8;
const LEAF_ENTRY_SIZE: usize = 4;
@@ -1079,65 +1096,100 @@ impl Pager {
const TRUNK_PAGE_NEXT_PAGE_OFFSET: usize = 0; // Offset to next trunk page pointer
const TRUNK_PAGE_LEAF_COUNT_OFFSET: usize = 4; // Offset to leaf count
if page_id < 2 || page_id > header_accessor::get_database_size(self)? as usize {
return Err(LimboError::Corrupt(format!(
"Invalid page number {page_id} for free operation"
)));
}
let mut state = self.free_page_state.borrow_mut();
tracing::debug!(?state);
loop {
match &mut *state {
FreePageState::Start => {
if page_id < 2 || page_id > header_accessor::get_database_size(self)? as usize {
return Err(LimboError::Corrupt(format!(
"Invalid page number {page_id} for free operation"
)));
}
let page = match page {
Some(page) => {
assert_eq!(page.get().id, page_id, "Page id mismatch");
page
}
None => self.read_page(page_id)?,
};
let page = match page.clone() {
Some(page) => {
assert_eq!(page.get().id, page_id, "Page id mismatch");
page
}
None => self.read_page(page_id)?,
};
header_accessor::set_freelist_pages(
self,
header_accessor::get_freelist_pages(self)? + 1,
)?;
header_accessor::set_freelist_pages(self, header_accessor::get_freelist_pages(self)? + 1)?;
let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?;
let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?;
if trunk_page_id != 0 {
*state = FreePageState::AddToTrunk {
page,
trunk_page: None,
};
} else {
*state = FreePageState::NewTrunk { page };
}
}
FreePageState::AddToTrunk { page, trunk_page } => {
let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?;
if trunk_page.is_none() {
// Add as leaf to current trunk
trunk_page.replace(self.read_page(trunk_page_id as usize)?);
}
let trunk_page = trunk_page.as_ref().unwrap();
if trunk_page.is_locked() || !trunk_page.is_loaded() {
return Ok(IOResult::IO);
}
if trunk_page_id != 0 {
// Add as leaf to current trunk
let trunk_page = self.read_page(trunk_page_id as usize)?;
let trunk_page_contents = trunk_page.get().contents.as_ref().unwrap();
let number_of_leaf_pages = trunk_page_contents.read_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET);
let trunk_page_contents = trunk_page.get().contents.as_ref().unwrap();
let number_of_leaf_pages =
trunk_page_contents.read_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET);
// Reserve 2 slots for the trunk page header which is 8 bytes or 2*LEAF_ENTRY_SIZE
let max_free_list_entries = (self.usable_space() / LEAF_ENTRY_SIZE) - RESERVED_SLOTS;
// Reserve 2 slots for the trunk page header which is 8 bytes or 2*LEAF_ENTRY_SIZE
let max_free_list_entries =
(self.usable_space() / LEAF_ENTRY_SIZE) - RESERVED_SLOTS;
if number_of_leaf_pages < max_free_list_entries as u32 {
trunk_page.set_dirty();
self.add_dirty(trunk_page_id as usize);
if number_of_leaf_pages < max_free_list_entries as u32 {
trunk_page.set_dirty();
self.add_dirty(trunk_page_id as usize);
trunk_page_contents
.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, number_of_leaf_pages + 1);
trunk_page_contents.write_u32(
TRUNK_PAGE_HEADER_SIZE + (number_of_leaf_pages as usize * LEAF_ENTRY_SIZE),
page_id as u32,
);
page.clear_uptodate();
page.clear_loaded();
trunk_page_contents
.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, number_of_leaf_pages + 1);
trunk_page_contents.write_u32(
TRUNK_PAGE_HEADER_SIZE
+ (number_of_leaf_pages as usize * LEAF_ENTRY_SIZE),
page_id as u32,
);
page.clear_uptodate();
return Ok(());
break;
}
}
FreePageState::NewTrunk { page } => {
if page.is_locked() || !page.is_loaded() {
return Ok(IOResult::IO);
}
// If we get here, need to make this page a new trunk
page.set_dirty();
self.add_dirty(page_id);
let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?;
let contents = page.get().contents.as_mut().unwrap();
// Point to previous trunk
contents.write_u32(TRUNK_PAGE_NEXT_PAGE_OFFSET, trunk_page_id);
// Zero leaf count
contents.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, 0);
// Update page 1 to point to new trunk
header_accessor::set_freelist_trunk_page(self, page_id as u32)?;
// Clear flags
page.clear_uptodate();
break;
}
}
}
// If we get here, need to make this page a new trunk
page.set_dirty();
self.add_dirty(page_id);
let contents = page.get().contents.as_mut().unwrap();
// Point to previous trunk
contents.write_u32(TRUNK_PAGE_NEXT_PAGE_OFFSET, trunk_page_id);
// Zero leaf count
contents.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, 0);
// Update page 1 to point to new trunk
header_accessor::set_freelist_trunk_page(self, page_id as u32)?;
// Clear flags
page.clear_uptodate();
page.clear_loaded();
Ok(())
*state = FreePageState::Start;
Ok(IOResult::Done(()))
}
#[instrument(skip_all, level = Level::INFO)]

View File

@@ -752,12 +752,13 @@ pub fn begin_read_page(
Ok(())
}
#[instrument(skip_all, level = Level::INFO)]
pub fn finish_read_page(
page_idx: usize,
buffer_ref: Arc<RefCell<Buffer>>,
page: PageRef,
) -> Result<()> {
tracing::trace!("finish_read_btree_page(page_idx = {})", page_idx);
tracing::trace!(page_idx);
let pos = if page_idx == DATABASE_HEADER_PAGE_ID {
DATABASE_HEADER_SIZE
} else {
@@ -1490,7 +1491,7 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
buf_for_pread,
complete,
)));
file.pread(0, c)?;
file.pread(0, c.into())?;
Ok(wal_file_shared_ret)
}
@@ -1510,7 +1511,7 @@ pub fn begin_read_wal_frame(
let buf = Arc::new(RefCell::new(Buffer::new(buf, drop_fn)));
#[allow(clippy::arc_with_non_send_sync)]
let c = Completion::new(CompletionType::Read(ReadCompletion::new(buf, complete)));
let c = io.pread(offset, c)?;
let c = io.pread(offset, c.into())?;
Ok(c)
}
@@ -1604,7 +1605,7 @@ pub fn begin_write_wal_frame(
};
#[allow(clippy::arc_with_non_send_sync)]
let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete)));
let res = io.pwrite(offset, buffer.clone(), c);
let res = io.pwrite(offset, buffer.clone(), c.into());
if res.is_err() {
// If we do not reduce the counter here on error, we incur an infinite loop when cacheflushing
*write_counter.borrow_mut() -= 1;
@@ -1645,7 +1646,7 @@ pub fn begin_write_wal_header(io: &Arc<dyn File>, header: &WalHeader) -> Result<
};
#[allow(clippy::arc_with_non_send_sync)]
let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete)));
io.pwrite(0, buffer.clone(), c)?;
io.pwrite(0, buffer.clone(), c.into())?;
Ok(())
}

View File

@@ -890,7 +890,7 @@ impl Wal for WalFile {
}),
}));
let shared = self.get_shared();
shared.file.sync(completion)?;
shared.file.sync(completion.into())?;
self.sync_state.set(SyncState::Syncing);
Ok(IOResult::IO)
}

View File

@@ -108,11 +108,23 @@ pub struct SimulatorCLI {
#[clap(long, help = "disable Reopen-Database fault", default_value_t = false)]
pub disable_reopen_database: bool,
#[clap(
long = "latency_prob",
long = "latency-prob",
help = "added IO latency probability",
default_value_t = 0
default_value_t = 1
)]
pub latency_probability: usize,
#[clap(
long,
help = "Minimum tick time in microseconds for simulated time",
default_value_t = 1
)]
pub min_tick: u64,
#[clap(
long,
help = "Maximum tick time in microseconds for simulated time",
default_value_t = 30
)]
pub max_tick: u64,
#[clap(long, help = "Enable experimental MVCC feature")]
pub experimental_mvcc: bool,
#[clap(long, help = "Enable experimental indexing feature")]

35
simulator/runner/clock.rs Normal file
View File

@@ -0,0 +1,35 @@
use std::cell::RefCell;
use chrono::{DateTime, Utc};
use rand::Rng;
use rand_chacha::ChaCha8Rng;
/// Deterministic simulated wall clock: every read advances time by a random
/// tick drawn from a seeded RNG, so runs with the same seed see the same
/// sequence of timestamps.
#[derive(Debug)]
pub struct SimulatorClock {
    /// Current simulated time; advanced on every `now()` call.
    curr_time: RefCell<DateTime<Utc>>,
    /// Seeded RNG used to pick the tick size for each `now()` call.
    rng: RefCell<ChaCha8Rng>,
    /// Lower bound of the random tick, in microseconds.
    min_tick: u64,
    /// Upper bound of the random tick, in microseconds (exclusive).
    max_tick: u64,
}
impl SimulatorClock {
    /// Create a simulated clock starting at the current wall-clock time.
    ///
    /// `min_tick`/`max_tick` bound, in microseconds, how far the clock
    /// advances on each call to [`SimulatorClock::now`].
    pub fn new(rng: ChaCha8Rng, min_tick: u64, max_tick: u64) -> Self {
        Self {
            curr_time: RefCell::new(Utc::now()),
            rng: RefCell::new(rng),
            min_tick,
            max_tick,
        }
    }

    /// Advance the simulated clock by a random tick and return the new time.
    ///
    /// Every call moves time forward, so successive calls are strictly
    /// monotonically increasing (assuming `min_tick > 0`).
    pub fn now(&self) -> DateTime<Utc> {
        let mut time = self.curr_time.borrow_mut();
        // `gen_range` panics on an empty range; if the CLI supplies
        // min_tick >= max_tick, degrade to a fixed tick instead of crashing.
        let micros = if self.min_tick >= self.max_tick {
            self.min_tick
        } else {
            self.rng
                .borrow_mut()
                .gen_range(self.min_tick..self.max_tick)
        };
        *time += std::time::Duration::from_micros(micros);
        *time
    }
}

View File

@@ -99,6 +99,8 @@ impl SimulatorEnv {
self.opts.seed,
self.opts.page_size,
self.opts.latency_probability,
self.opts.min_tick,
self.opts.max_tick,
)
.unwrap(),
);
@@ -245,10 +247,20 @@ impl SimulatorEnv {
latency_probability: cli_opts.latency_probability,
experimental_mvcc: cli_opts.experimental_mvcc,
experimental_indexes: cli_opts.experimental_indexes,
min_tick: cli_opts.min_tick,
max_tick: cli_opts.max_tick,
};
let io =
Arc::new(SimulatorIO::new(seed, opts.page_size, cli_opts.latency_probability).unwrap());
let io = Arc::new(
SimulatorIO::new(
seed,
opts.page_size,
cli_opts.latency_probability,
cli_opts.min_tick,
cli_opts.max_tick,
)
.unwrap(),
);
// Remove existing database file if it exists
let db_path = paths.db(&simulation_type, &SimulationPhase::Test);
@@ -407,6 +419,8 @@ pub(crate) struct SimulatorOpts {
pub(crate) latency_probability: usize,
pub(crate) experimental_mvcc: bool,
pub(crate) experimental_indexes: bool,
pub min_tick: u64,
pub max_tick: u64,
}
#[derive(Debug, Clone)]

View File

@@ -1,14 +1,15 @@
use std::{
cell::{Cell, RefCell},
fmt::Debug,
sync::Arc,
};
use rand::Rng as _;
use rand_chacha::ChaCha8Rng;
use tracing::{instrument, Level};
use turso_core::{CompletionType, File, Result};
use turso_core::{File, Result};
use crate::model::FAULT_ERROR_MSG;
use crate::{model::FAULT_ERROR_MSG, runner::clock::SimulatorClock};
pub(crate) struct SimulatorFile {
pub(crate) inner: Arc<dyn File>,
pub(crate) fault: Cell<bool>,
@@ -38,6 +39,23 @@ pub(crate) struct SimulatorFile {
pub latency_probability: usize,
pub sync_completion: RefCell<Option<Arc<turso_core::Completion>>>,
pub queued_io: RefCell<Vec<DelayedIo>>,
pub clock: Arc<SimulatorClock>,
}
/// A deferred IO action: invoked against the owning `SimulatorFile` once its
/// scheduled time arrives (see `SimulatorFile::run_queued_io`).
type IoOperation = Box<dyn FnOnce(&SimulatorFile) -> Result<Arc<turso_core::Completion>>>;

/// An IO operation queued to execute at (or after) a simulated point in time.
pub struct DelayedIo {
    /// Simulated time at which `op` becomes eligible to run.
    pub time: turso_core::Instant,
    /// The deferred IO action itself.
    pub op: IoOperation,
}

// Manual `Debug` impl: `op` is an opaque boxed closure with no `Debug`
// implementation, so only the scheduled time is shown.
impl Debug for DelayedIo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DelayedIo")
            .field("time", &self.time)
            .finish()
    }
}
unsafe impl Send for SimulatorFile {}
@@ -78,11 +96,37 @@ impl SimulatorFile {
}
#[instrument(skip_all, level = Level::TRACE)]
fn generate_latency_duration(&self) -> Option<std::time::Duration> {
fn generate_latency_duration(&self) -> Option<turso_core::Instant> {
let mut rng = self.rng.borrow_mut();
// Chance to introduce some latency
rng.gen_bool(self.latency_probability as f64 / 100.0)
.then(|| std::time::Duration::from_millis(rng.gen_range(20..50)))
.then(|| {
let now = self.clock.now();
let sum = now + std::time::Duration::from_millis(rng.gen_range(5..20));
sum.into()
})
}
/// Run every queued IO operation whose scheduled `time` is at or before `now`.
///
/// Due operations are removed from the queue and executed in queue order;
/// operations scheduled for the future stay queued. The first failing
/// operation aborts the scan and its error is propagated.
#[instrument(skip_all, level = Level::DEBUG)]
pub fn run_queued_io(&self, now: turso_core::Instant) -> Result<()> {
    let mut queued_io = self.queued_io.borrow_mut();
    // In-place filter-and-execute. `Vec::extract_if` would express this
    // directly but is not available on our MSRV, so scan manually.
    // NOTE(review): each op runs while `queued_io` is mutably borrowed, so an
    // op must not re-queue IO on this file or the RefCell borrow will panic —
    // presumably inner-file IO never does; verify if ops ever change.
    let mut i = 0;
    while i < queued_io.len() {
        if queued_io[i].time <= now {
            let due = queued_io.remove(i);
            (due.op)(self)?;
        } else {
            i += 1;
        }
    }
    Ok(())
}
}
@@ -108,7 +152,7 @@ impl File for SimulatorFile {
fn pread(
&self,
pos: usize,
mut c: turso_core::Completion,
c: Arc<turso_core::Completion>,
) -> Result<Arc<turso_core::Completion>> {
self.nr_pread_calls.set(self.nr_pread_calls.get() + 1);
if self.fault.get() {
@@ -119,31 +163,22 @@ impl File for SimulatorFile {
));
}
if let Some(latency) = self.generate_latency_duration() {
let CompletionType::Read(read_completion) = &mut c.completion_type else {
unreachable!();
};
let before = self.rng.borrow_mut().gen_bool(0.5);
let dummy_complete = Box::new(|_, _| {});
let prev_complete = std::mem::replace(&mut read_completion.complete, dummy_complete);
let new_complete = move |res, bytes_read| {
if before {
std::thread::sleep(latency);
}
(prev_complete)(res, bytes_read);
if !before {
std::thread::sleep(latency);
}
};
read_completion.complete = Box::new(new_complete);
};
self.inner.pread(pos, c)
let cloned_c = c.clone();
let op = Box::new(move |file: &SimulatorFile| file.inner.pread(pos, cloned_c));
self.queued_io
.borrow_mut()
.push(DelayedIo { time: latency, op });
Ok(c)
} else {
self.inner.pread(pos, c)
}
}
fn pwrite(
&self,
pos: usize,
buffer: Arc<RefCell<turso_core::Buffer>>,
mut c: turso_core::Completion,
c: Arc<turso_core::Completion>,
) -> Result<Arc<turso_core::Completion>> {
self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1);
if self.fault.get() {
@@ -154,53 +189,40 @@ impl File for SimulatorFile {
));
}
if let Some(latency) = self.generate_latency_duration() {
let CompletionType::Write(write_completion) = &mut c.completion_type else {
unreachable!();
};
let before = self.rng.borrow_mut().gen_bool(0.5);
let dummy_complete = Box::new(|_| {});
let prev_complete = std::mem::replace(&mut write_completion.complete, dummy_complete);
let new_complete = move |res| {
if before {
std::thread::sleep(latency);
}
(prev_complete)(res);
if !before {
std::thread::sleep(latency);
}
};
write_completion.complete = Box::new(new_complete);
};
self.inner.pwrite(pos, buffer, c)
let cloned_c = c.clone();
let op = Box::new(move |file: &SimulatorFile| file.inner.pwrite(pos, buffer, cloned_c));
self.queued_io
.borrow_mut()
.push(DelayedIo { time: latency, op });
Ok(c)
} else {
self.inner.pwrite(pos, buffer, c)
}
}
fn sync(&self, mut c: turso_core::Completion) -> Result<Arc<turso_core::Completion>> {
fn sync(&self, c: Arc<turso_core::Completion>) -> Result<Arc<turso_core::Completion>> {
self.nr_sync_calls.set(self.nr_sync_calls.get() + 1);
if self.fault.get() {
// TODO: Enable this when https://github.com/tursodatabase/turso/issues/2091 is fixed.
tracing::debug!("ignoring sync fault because it causes false positives with current simulator design");
self.fault.set(false);
}
if let Some(latency) = self.generate_latency_duration() {
let CompletionType::Sync(sync_completion) = &mut c.completion_type else {
unreachable!();
};
let before = self.rng.borrow_mut().gen_bool(0.5);
let dummy_complete = Box::new(|_| {});
let prev_complete = std::mem::replace(&mut sync_completion.complete, dummy_complete);
let new_complete = move |res| {
if before {
std::thread::sleep(latency);
}
(prev_complete)(res);
if !before {
std::thread::sleep(latency);
}
};
sync_completion.complete = Box::new(new_complete);
let c = if let Some(latency) = self.generate_latency_duration() {
let cloned_c = c.clone();
let op = Box::new(|file: &SimulatorFile| -> Result<_> {
let c = file.inner.sync(cloned_c)?;
*file.sync_completion.borrow_mut() = Some(c.clone());
Ok(c)
});
self.queued_io
.borrow_mut()
.push(DelayedIo { time: latency, op });
c
} else {
let c = self.inner.sync(c)?;
*self.sync_completion.borrow_mut() = Some(c.clone());
c
};
let c = self.inner.sync(c)?;
*self.sync_completion.borrow_mut() = Some(c.clone());
Ok(c)
}

View File

@@ -7,7 +7,10 @@ use rand::{RngCore, SeedableRng};
use rand_chacha::ChaCha8Rng;
use turso_core::{Clock, Instant, MemoryIO, OpenFlags, PlatformIO, Result, IO};
use crate::{model::FAULT_ERROR_MSG, runner::file::SimulatorFile};
use crate::{
model::FAULT_ERROR_MSG,
runner::{clock::SimulatorClock, file::SimulatorFile},
};
pub(crate) struct SimulatorIO {
pub(crate) inner: Box<dyn IO>,
@@ -18,18 +21,27 @@ pub(crate) struct SimulatorIO {
pub(crate) page_size: usize,
seed: u64,
latency_probability: usize,
clock: Arc<SimulatorClock>,
}
unsafe impl Send for SimulatorIO {}
unsafe impl Sync for SimulatorIO {}
impl SimulatorIO {
pub(crate) fn new(seed: u64, page_size: usize, latency_probability: usize) -> Result<Self> {
pub(crate) fn new(
seed: u64,
page_size: usize,
latency_probability: usize,
min_tick: u64,
max_tick: u64,
) -> Result<Self> {
let inner = Box::new(PlatformIO::new()?);
let fault = Cell::new(false);
let files = RefCell::new(Vec::new());
let rng = RefCell::new(ChaCha8Rng::seed_from_u64(seed));
let nr_run_once_faults = Cell::new(0);
let clock = SimulatorClock::new(ChaCha8Rng::seed_from_u64(seed), min_tick, max_tick);
Ok(Self {
inner,
fault,
@@ -39,6 +51,7 @@ impl SimulatorIO {
page_size,
seed,
latency_probability,
clock: Arc::new(clock),
})
}
@@ -59,10 +72,7 @@ impl SimulatorIO {
impl Clock for SimulatorIO {
fn now(&self) -> Instant {
Instant {
secs: 1704067200, // 2024-01-01 00:00:00 UTC
micros: 0,
}
self.clock.now().into()
}
}
@@ -87,6 +97,8 @@ impl IO for SimulatorIO {
rng: RefCell::new(ChaCha8Rng::seed_from_u64(self.seed)),
latency_probability: self.latency_probability,
sync_completion: RefCell::new(None),
queued_io: RefCell::new(Vec::new()),
clock: self.clock.clone(),
});
self.files.borrow_mut().push(file.clone());
Ok(file)
@@ -107,6 +119,10 @@ impl IO for SimulatorIO {
FAULT_ERROR_MSG.into(),
));
}
let now = self.now();
for file in self.files.borrow().iter() {
file.run_queued_io(now)?;
}
self.inner.run_once()?;
Ok(())
}

View File

@@ -1,5 +1,6 @@
pub mod bugbase;
pub mod cli;
pub mod clock;
pub mod differential;
pub mod doublecheck;
pub mod env;

View File

@@ -437,7 +437,7 @@ fn write_at(io: &impl IO, file: Arc<dyn File>, offset: usize, data: &[u8]) {
let drop_fn = Rc::new(move |_| {});
#[allow(clippy::arc_with_non_send_sync)]
let buffer = Arc::new(RefCell::new(Buffer::new(Pin::new(data.to_vec()), drop_fn)));
let result = file.pwrite(offset, buffer, completion).unwrap();
let result = file.pwrite(offset, buffer, completion.into()).unwrap();
while !result.is_completed() {
io.run_once().unwrap();
}