diff --git a/core/lib.rs b/core/lib.rs index 9b41b1d98..f2109417d 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -91,7 +91,8 @@ use turso_sqlite3_parser::{ast, ast::Cmd, lexer::sql::Parser}; use types::IOResult; pub use types::RefValue; pub use types::Value; -use util::{parse_schema_rows, IOExt as _}; +use util::parse_schema_rows; +pub use util::IOExt; use vdbe::builder::QueryMode; use vdbe::builder::TableRefIdCounter; @@ -1231,16 +1232,11 @@ impl Connection { /// Read schema version at current transaction #[cfg(all(feature = "fs", feature = "conn_raw_api"))] pub fn read_schema_version(&self) -> Result { - loop { - let pager = self.pager.borrow(); - match pager.with_header(|header| header.schema_cookie)? { - IOResult::Done(cookie) => return Ok(cookie.get()), - IOResult::IO => { - self.run_once()?; - continue; - } - }; - } + let pager = self.pager.borrow(); + pager + .io + .block(|| pager.with_header(|header| header.schema_cookie)) + .map(|version| version.get()) } /// Update schema version to the new value within opened write transaction @@ -1254,9 +1250,9 @@ impl Connection { "write_schema_version must be called from within Write transaction".to_string(), )); }; - loop { - let pager = self.pager.borrow(); - match pager.with_header_mut(|header| { + let pager = self.pager.borrow(); + pager.io.block(|| { + pager.with_header_mut(|header| { turso_assert!( header.schema_cookie.get() < version, "cookie can't go back in time" @@ -1266,14 +1262,8 @@ impl Connection { }); self.with_schema_mut(|schema| schema.schema_version = version); header.schema_cookie = version.into(); - })? { - IOResult::Done(()) => break, - IOResult::IO => { - self.run_once()?; - continue; - } - }; - } + }) + })?; Ok(()) } @@ -1423,13 +1413,14 @@ impl Connection { // No active transaction } _ => { - while let IOResult::IO = self.pager.borrow().end_tx( - true, // rollback = true for close - self, - self.wal_checkpoint_disabled.get(), - )? { - self.run_once()?; - } + let pager = self.pager.borrow(); + pager.io.block(|| { + pager.end_tx( + true, // rollback = true for close + self, + self.wal_checkpoint_disabled.get(), + ) + })?; self.transaction_state.set(TransactionState::None); } } diff --git a/core/schema.rs b/core/schema.rs index ff469b912..03f49e19f 100644 --- a/core/schema.rs +++ b/core/schema.rs @@ -8,8 +8,8 @@ use crate::storage::btree::BTreeCursor; use crate::translate::collate::CollationSeq; use crate::translate::plan::SelectPlan; use crate::util::{module_args_from_sql, module_name_from_sql, IOExt, UnparsedFromSqlIndex}; +use crate::{return_if_io, LimboError, MvCursor, Pager, RefValue, SymbolTable, VirtualTable}; use crate::{util::normalize_ident, Result}; -use crate::{LimboError, MvCursor, Pager, RefValue, SymbolTable, VirtualTable}; use core::fmt; use fallible_iterator::FallibleIterator; use std::cell::RefCell; @@ -128,16 +128,7 @@ impl Schema { let mut view = view .lock() .map_err(|_| LimboError::InternalError("Failed to lock view".to_string()))?; - match view.populate_from_table(conn)? { - IOResult::Done(()) => { - // This view is done, continue to next - continue; - } - IOResult::IO => { - // This view needs more IO, return early - return Ok(IOResult::IO); - } - } + return_if_io!(view.populate_from_table(conn)); } Ok(IOResult::Done(())) } diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 4c9ea427b..90e29fedf 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -4070,18 +4070,10 @@ impl BTreeCursor { let root_contents = root.get_contents(); // FIXME: handle page cache is full // FIXME: remove sync IO hack - let child_btree = loop { - match self.pager.do_allocate_page( - root_contents.page_type(), - 0, - BtreePageAllocMode::Any, - )? { - IOResult::IO => { - self.pager.io.run_once()?; - } - IOResult::Done(page) => break page, - } - }; + let child_btree = self.pager.io.block(|| { + self.pager + .do_allocate_page(root_contents.page_type(), 0, BtreePageAllocMode::Any) + })?; tracing::debug!( "balance_root(root={}, rightmost={}, page_type={:?})", @@ -5160,29 +5152,29 @@ impl BTreeCursor { } } DestroyState::ClearOverflowPages { cell } => { - match self.clear_overflow_pages(&cell)? { - IOResult::Done(_) => match cell { - // For an index interior cell, clear the left child page now that overflow pages have been cleared - BTreeCell::IndexInteriorCell(index_int_cell) => { - let (child_page, _c) = - self.read_page(index_int_cell.left_child_page as usize)?; - self.stack.push(child_page); - let destroy_info = self.state.mut_destroy_info().expect( - "unable to get a mut reference to destroy state in cursor", - ); - destroy_info.state = DestroyState::LoadPage; - return Ok(IOResult::IO); - } - // For any leaf cell, advance the index now that overflow pages have been cleared - BTreeCell::TableLeafCell(_) | BTreeCell::IndexLeafCell(_) => { - let destroy_info = self.state.mut_destroy_info().expect( - "unable to get a mut reference to destroy state in cursor", - ); - destroy_info.state = DestroyState::LoadPage; - } - _ => panic!("unexpected cell type"), - }, - IOResult::IO => return Ok(IOResult::IO), + return_if_io!(self.clear_overflow_pages(&cell)); + match cell { + // For an index interior cell, clear the left child page now that overflow pages have been cleared + BTreeCell::IndexInteriorCell(index_int_cell) => { + let (child_page, _c) = + self.read_page(index_int_cell.left_child_page as usize)?; + self.stack.push(child_page); + let destroy_info = self + .state + .mut_destroy_info() + .expect("unable to get a mut reference to destroy state in cursor"); + destroy_info.state = DestroyState::LoadPage; + return Ok(IOResult::IO); + } + // For any leaf cell, advance the index now that overflow pages have been cleared + BTreeCell::TableLeafCell(_) | BTreeCell::IndexLeafCell(_) => { + let destroy_info = self + .state + .mut_destroy_info() + .expect("unable to get a mut reference to destroy state in cursor"); + destroy_info.state = DestroyState::LoadPage; + } + _ => panic!("unexpected cell type"), } } DestroyState::FreePage => { @@ -7867,14 +7859,10 @@ mod tests { pager.deref(), ) .unwrap(); - loop { - match pager.end_tx(false, &conn, false).unwrap() { - IOResult::Done(_) => break, - IOResult::IO => { - pager.io.run_once().unwrap(); - } - } - } + pager + .io + .block(|| pager.end_tx(false, &conn, false)) + .unwrap(); pager.begin_read_tx().unwrap(); // FIXME: add sorted vector instead, should be okay for small amounts of keys for now :P, too lazy to fix right now let _c = cursor.move_to_root().unwrap(); @@ -7944,14 +7932,10 @@ mod tests { tracing::info!("super seed: {}", seed); for _ in 0..attempts { let (pager, _, _db, conn) = empty_btree(); - let index_root_page_result = - pager.btree_create(&CreateBTreeFlags::new_index()).unwrap(); - let index_root_page = match index_root_page_result { - crate::types::IOResult::Done(id) => id as usize, - crate::types::IOResult::IO => { - panic!("btree_create returned IO in test, unexpected") - } - }; + let index_root_page = pager + .io + .block(|| pager.btree_create(&CreateBTreeFlags::new_index())) + .unwrap() as usize; let index_def = Index { name: "testindex".to_string(), columns: (0..10) @@ -7981,7 +7965,7 @@ mod tests { tracing::info!("seed: {seed}"); for i in 0..inserts { pager.begin_read_tx().unwrap(); - let _res = pager.begin_write_tx().unwrap(); + pager.io.block(|| pager.begin_write_tx()).unwrap(); let key = { let result; loop { @@ -8024,15 +8008,12 @@ mod tests { pager.deref(), ) .unwrap(); - let _c = cursor.move_to_root().unwrap(); - loop { - match pager.end_tx(false, &conn, false).unwrap() { - IOResult::Done(_) => break, - IOResult::IO => { - pager.io.run_once().unwrap(); - } - } - } + let c = cursor.move_to_root().unwrap(); + pager.io.wait_for_completion(c).unwrap(); + pager + .io + .block(|| pager.end_tx(false, &conn, false)) + .unwrap(); } // Check that all keys can be found by seeking @@ -8118,14 +8099,10 @@ mod tests { for _ in 0..attempts { let (pager, _, _db, conn) = empty_btree(); - let index_root_page_result = - pager.btree_create(&CreateBTreeFlags::new_index()).unwrap(); - let index_root_page = match index_root_page_result { - crate::types::IOResult::Done(id) => id as usize, - crate::types::IOResult::IO => { - panic!("btree_create returned IO in test, unexpected") - } - }; + let index_root_page = pager + .io + .block(|| pager.btree_create(&CreateBTreeFlags::new_index())) + .unwrap() as usize; let index_def = Index { name: "testindex".to_string(), columns: vec![IndexColumn { @@ -8151,7 +8128,8 @@ mod tests { for i in 0..operations { let print_progress = i % 100 == 0; pager.begin_read_tx().unwrap(); - let _res = pager.begin_write_tx().unwrap(); + + pager.io.block(|| pager.begin_write_tx()).unwrap(); // Decide whether to insert or delete (80% chance of insert) let is_insert = rng.next_u64() % 100 < (insert_chance * 100.0) as u64; @@ -8242,15 +8220,12 @@ mod tests { } } - let _c = cursor.move_to_root().unwrap(); - loop { - match pager.end_tx(false, &conn, false).unwrap() { - IOResult::Done(_) => break, - IOResult::IO => { - pager.io.run_once().unwrap(); - } - } - } + let c = cursor.move_to_root().unwrap(); + pager.io.wait_for_completion(c).unwrap(); + pager + .io + .block(|| pager.end_tx(false, &conn, false)) + .unwrap(); } // Final validation @@ -8610,51 +8585,44 @@ mod tests { .block(|| pager.with_header(|header| header.freelist_pages))? .get(); // Clear overflow pages - let clear_result = cursor.clear_overflow_pages(&leaf_cell)?; - match clear_result { - IOResult::Done(_) => { - let (freelist_pages, freelist_trunk_page) = pager - .io - .block(|| { - pager.with_header(|header| { - ( - header.freelist_pages.get(), - header.freelist_trunk_page.get(), - ) - }) - }) - .unwrap(); + pager.io.block(|| cursor.clear_overflow_pages(&leaf_cell))?; + let (freelist_pages, freelist_trunk_page) = pager + .io + .block(|| { + pager.with_header(|header| { + ( + header.freelist_pages.get(), + header.freelist_trunk_page.get(), + ) + }) + }) + .unwrap(); - // Verify proper number of pages were added to freelist - assert_eq!( - freelist_pages, - initial_freelist_pages + 3, - "Expected 3 pages to be added to freelist" - ); + // Verify proper number of pages were added to freelist + assert_eq!( + freelist_pages, + initial_freelist_pages + 3, + "Expected 3 pages to be added to freelist" + ); - // If this is first trunk page - let trunk_page_id = freelist_trunk_page; - if trunk_page_id > 0 { - // Verify trunk page structure - let (trunk_page, _c) = cursor.read_page(trunk_page_id as usize)?; - if let Some(contents) = trunk_page.get().get().contents.as_ref() { - // Read number of leaf pages in trunk - let n_leaf = contents.read_u32_no_offset(4); - assert!(n_leaf > 0, "Trunk page should have leaf entries"); + // If this is first trunk page + let trunk_page_id = freelist_trunk_page; + if trunk_page_id > 0 { + // Verify trunk page structure + let (trunk_page, _c) = cursor.read_page(trunk_page_id as usize)?; + if let Some(contents) = trunk_page.get().get().contents.as_ref() { + // Read number of leaf pages in trunk + let n_leaf = contents.read_u32_no_offset(4); + assert!(n_leaf > 0, "Trunk page should have leaf entries"); - for i in 0..n_leaf { - let leaf_page_id = contents.read_u32_no_offset(8 + (i as usize * 4)); - assert!( - (2..=4).contains(&leaf_page_id), - "Leaf page ID {leaf_page_id} should be in range 2-4" - ); - } - } + for i in 0..n_leaf { + let leaf_page_id = contents.read_u32_no_offset(8 + (i as usize * 4)); + assert!( + (2..=4).contains(&leaf_page_id), + "Leaf page ID {leaf_page_id} should be in range 2-4" + ); } } - IOResult::IO => { - cursor.pager.io.run_once()?; - } } Ok(()) @@ -8683,34 +8651,27 @@ mod tests { .get() as usize; // Try to clear non-existent overflow pages - let clear_result = cursor.clear_overflow_pages(&leaf_cell)?; - match clear_result { - IOResult::Done(_) => { - let (freelist_pages, freelist_trunk_page) = pager.io.block(|| { - pager.with_header(|header| { - ( - header.freelist_pages.get(), - header.freelist_trunk_page.get(), - ) - }) - })?; + pager.io.block(|| cursor.clear_overflow_pages(&leaf_cell))?; + let (freelist_pages, freelist_trunk_page) = pager.io.block(|| { + pager.with_header(|header| { + ( + header.freelist_pages.get(), + header.freelist_trunk_page.get(), + ) + }) + })?; - // Verify freelist was not modified - assert_eq!( - freelist_pages as usize, initial_freelist_pages, - "Freelist should not change when no overflow pages exist" - ); + // Verify freelist was not modified + assert_eq!( + freelist_pages as usize, initial_freelist_pages, + "Freelist should not change when no overflow pages exist" + ); - // Verify trunk page wasn't created - assert_eq!( - freelist_trunk_page, 0, - "No trunk page should be created when no overflow pages exist" - ); - } - IOResult::IO => { - cursor.pager.io.run_once()?; - } - } + // Verify trunk page wasn't created + assert_eq!( + freelist_trunk_page, 0, + "No trunk page should be created when no overflow pages exist" + ); Ok(()) } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 74d96f170..6a7e1f39a 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -590,10 +590,8 @@ impl Pager { /// Returns the new maximum page count (may be clamped to current database size) pub fn set_max_page_count(&self, new_max: u32) -> crate::Result> { // Get current database size - let current_page_count = match self.with_header(|header| header.database_size.get())? { - IOResult::Done(size) => size, - IOResult::IO => return Ok(IOResult::IO), - }; + let current_page_count = + return_if_io!(self.with_header(|header| header.database_size.get())); // Clamp new_max to be at least the current database size let clamped_max = std::cmp::max(new_max, current_page_count); @@ -868,17 +866,14 @@ impl Pager { } BtreeCreateVacuumFullState::PtrMapPut { allocated_page_id } => { // For now map allocated_page_id since we are not swapping it with root_page_num - let res = match self.ptrmap_put( + return_if_io!(self.ptrmap_put( allocated_page_id, PtrmapType::RootPage, 0, - )? { - IOResult::Done(_) => Ok(IOResult::Done(allocated_page_id)), - IOResult::IO => return Ok(IOResult::IO), - }; + )); self.btree_create_vacuum_full_state .set(BtreeCreateVacuumFullState::Start); - return res; + return Ok(IOResult::Done(allocated_page_id)); } } } @@ -995,10 +990,7 @@ impl Pager { pub fn begin_write_tx(&self) -> Result> { // TODO(Diego): The only possibly allocate page1 here is because OpenEphemeral needs a write transaction // we should have a unique API to begin transactions, something like sqlite3BtreeBeginTrans - match self.maybe_allocate_page1()? { - IOResult::Done(_) => {} - IOResult::IO => return Ok(IOResult::IO), - } + return_if_io!(self.maybe_allocate_page1()); let Some(wal) = self.wal.as_ref() else { return Ok(IOResult::Done(LimboResult::Ok)); }; @@ -1029,20 +1021,15 @@ impl Pager { self.rollback(schema_did_change, connection, is_write)?; return Ok(IOResult::Done(PagerCommitResult::Rollback)); } - let commit_status = self.commit_dirty_pages(wal_checkpoint_disabled)?; - match commit_status { - IOResult::IO => Ok(IOResult::IO), - IOResult::Done(_) => { - wal.borrow().end_write_tx(); - wal.borrow().end_read_tx(); + let commit_status = return_if_io!(self.commit_dirty_pages(wal_checkpoint_disabled)); + wal.borrow().end_write_tx(); + wal.borrow().end_read_tx(); - if schema_did_change { - let schema = connection.schema.borrow().clone(); - connection._db.update_schema_if_newer(schema)?; - } - Ok(commit_status) - } + if schema_did_change { + let schema = connection.schema.borrow().clone(); + connection._db.update_schema_if_newer(schema)?; } + Ok(IOResult::Done(commit_status)) } #[instrument(skip_all, level = Level::DEBUG)] @@ -1097,7 +1084,9 @@ impl Pager { if let Some(page) = page_cache.get(&page_key) { tracing::trace!("read_page(page_idx = {}) = cached", page_idx); // Dummy completion being passed, as we do not need to read from database or wal - return Ok((page.clone(), Completion::new_write(|_| {}))); + let c = Completion::new_write(|_| {}); + c.complete(0); + return Ok((page.clone(), c)); } let (page, c) = self.read_page_no_cache(page_idx, None, false)?; self.cache_insert(page_idx, page.clone(), &mut page_cache)?; @@ -1492,16 +1481,13 @@ impl Pager { match state { CheckpointState::Checkpoint => { let in_flight = self.checkpoint_inflight.clone(); - match wal - .borrow_mut() - .checkpoint(self, in_flight, CheckpointMode::Passive)? - { - IOResult::IO => return Ok(IOResult::IO), - IOResult::Done(res) => { - checkpoint_result = res; - self.checkpoint_state.replace(CheckpointState::SyncDbFile); - } - }; + let res = return_if_io!(wal.borrow_mut().checkpoint( + self, + in_flight, + CheckpointMode::Passive + )); + checkpoint_result = res; + self.checkpoint_state.replace(CheckpointState::SyncDbFile); } CheckpointState::SyncDbFile => { let _c = diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 154f1f3f3..7fe006240 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1906,6 +1906,7 @@ pub mod test { wal::READMARK_NOT_USED, }, types::IOResult, + util::IOExt, CheckpointMode, CheckpointResult, Completion, Connection, Database, LimboError, PlatformIO, StepResult, Wal, WalFile, WalFileShared, IO, }; @@ -2030,14 +2031,10 @@ pub mod test { mode: CheckpointMode, ) -> CheckpointResult { let wc = Rc::new(RefCell::new(0usize)); - loop { - match wal.checkpoint(pager, wc.clone(), mode).unwrap() { - IOResult::Done(r) => return r, - IOResult::IO => { - pager.io.run_once().unwrap(); - } - } - } + pager + .io + .block(|| wal.checkpoint(pager, wc.clone(), mode)) + .unwrap() } fn wal_header_snapshot(shared: &Arc>) -> (u32, u32, u32, u32) { @@ -2062,9 +2059,9 @@ pub mod test { conn.execute("create table test(id integer primary key, value text)") .unwrap(); bulk_inserts(&conn, 20, 3); - while let IOResult::IO = conn.pager.borrow_mut().cacheflush().unwrap() { - conn.run_once().unwrap(); - } + db.io + .block(|| conn.pager.borrow_mut().cacheflush()) + .unwrap(); // Snapshot header & counters before the RESTART checkpoint. let wal_shared = db.maybe_shared_wal.read().as_ref().unwrap().clone(); @@ -2156,9 +2153,9 @@ pub mod test { .execute("create table test(id integer primary key, value text)") .unwrap(); bulk_inserts(&conn1.clone(), 15, 2); - while let IOResult::IO = conn1.pager.borrow_mut().cacheflush().unwrap() { - conn1.run_once().unwrap(); - } + db.io + .block(|| conn1.pager.borrow_mut().cacheflush()) + .unwrap(); // Force a read transaction that will freeze a lower read mark let readmark = { @@ -2170,9 +2167,9 @@ pub mod test { // generate more frames that the reader will not see. bulk_inserts(&conn1.clone(), 15, 2); - while let IOResult::IO = conn1.pager.borrow_mut().cacheflush().unwrap() { - conn1.run_once().unwrap(); - } + db.io + .block(|| conn1.pager.borrow_mut().cacheflush()) + .unwrap(); // Run passive checkpoint, expect partial let (res1, max_before) = { @@ -2831,9 +2828,9 @@ pub mod test { bulk_inserts(&conn, 8, 4); // Ensure frames are flushed to the WAL - while let IOResult::IO = conn.pager.borrow_mut().cacheflush().unwrap() { - conn.run_once().unwrap(); - } + db.io + .block(|| conn.pager.borrow_mut().cacheflush()) + .unwrap(); // Snapshot the current mxFrame before running FULL let wal_shared = db.maybe_shared_wal.read().as_ref().unwrap().clone(); @@ -2863,9 +2860,9 @@ pub mod test { // First commit some data and flush (reader will snapshot here) bulk_inserts(&writer, 2, 3); - while let IOResult::IO = writer.pager.borrow_mut().cacheflush().unwrap() { - writer.run_once().unwrap(); - } + db.io + .block(|| writer.pager.borrow_mut().cacheflush()) + .unwrap(); // Start a read transaction pinned at the current snapshot { @@ -2882,9 +2879,9 @@ pub mod test { // Advance WAL beyond the reader's snapshot bulk_inserts(&writer, 3, 4); - while let IOResult::IO = writer.pager.borrow_mut().cacheflush().unwrap() { - writer.run_once().unwrap(); - } + db.io + .block(|| writer.pager.borrow_mut().cacheflush()) + .unwrap(); let mx_now = unsafe { (&*db.maybe_shared_wal.read().as_ref().unwrap().get()) .max_frame diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index f5e9c254d..b2e1399bf 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -3793,9 +3793,7 @@ pub fn op_sorter_sort( let cursor = cursor.as_sorter_mut(); let is_empty = cursor.is_empty(); if !is_empty { - if let IOResult::IO = cursor.sort()? { - return Ok(InsnFunctionStepResult::IO); - } + return_if_io!(cursor.sort()); } is_empty }; @@ -3825,9 +3823,7 @@ pub fn op_sorter_next( let has_more = { let mut cursor = state.get_cursor(*cursor_id); let cursor = cursor.as_sorter_mut(); - if let IOResult::IO = cursor.next()? { - return Ok(InsnFunctionStepResult::IO); - } + return_if_io!(cursor.next()); cursor.has_more() }; if has_more { @@ -6463,17 +6459,10 @@ pub fn op_populate_views( let conn = program.connection.clone(); let schema = conn.schema.borrow(); - match schema.populate_views(&conn)? { - IOResult::Done(()) => { - // All views populated, advance to next instruction - state.pc += 1; - Ok(InsnFunctionStepResult::Step) - } - IOResult::IO => { - // Need more IO, stay on this instruction - Ok(InsnFunctionStepResult::IO) - } - } + return_if_io!(schema.populate_views(&conn)); + // All views populated, advance to next instruction + state.pc += 1; + Ok(InsnFunctionStepResult::Step) } pub fn op_read_cookie( @@ -8747,10 +8736,7 @@ pub fn op_max_pgcnt( pager.get_max_page_count() } else { // Set new maximum page count (will be clamped to current database size) - match pager.set_max_page_count(*new_max as u32)? { - IOResult::Done(new_max_count) => new_max_count, - IOResult::IO => return Ok(InsnFunctionStepResult::IO), - } + return_if_io!(pager.set_max_page_count(*new_max as u32)) }; state.registers[*dest] = Register::Value(Value::Integer(result_value.into())); diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 991803c10..ca5e25896 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -178,10 +178,7 @@ impl Sorter { self.records.pop() } else { // Serve from sorted chunk files. - match self.next_from_chunk_heap()? { - IOResult::IO => return Ok(IOResult::IO), - IOResult::Done(record) => record, - } + return_if_io!(self.next_from_chunk_heap()) }; match record { Some(record) => { diff --git a/tests/integration/common.rs b/tests/integration/common.rs index 83e38429a..9eff5613f 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -5,7 +5,8 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use tempfile::TempDir; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; -use turso_core::{types::IOResult, Connection, Database, IO}; +use turso_core::IOExt; +use turso_core::{Connection, Database, IO}; #[allow(dead_code)] pub struct TempDatabase { @@ -115,17 +116,10 @@ impl TempDatabase { } pub(crate) fn do_flush(conn: &Arc, tmp_db: &TempDatabase) -> anyhow::Result<()> { - loop { - match conn.cacheflush()? { - IOResult::Done(_) => { - break; - } - IOResult::IO => { - tmp_db.io.run_once()?; - } - } - } - Ok(()) + tmp_db + .io + .block(|| conn.cacheflush()) + .map_err(anyhow::Error::from) } pub(crate) fn compare_string(a: impl AsRef, b: impl AsRef) { diff --git a/tests/integration/fuzz/mod.rs b/tests/integration/fuzz/mod.rs index c9b96a01e..7032f4bcb 100644 --- a/tests/integration/fuzz/mod.rs +++ b/tests/integration/fuzz/mod.rs @@ -1832,7 +1832,6 @@ mod tests { // Successfully added column, update our tracking let table_name = table_to_alter.clone(); - let col_name = new_col_name.clone(); if let Some(columns) = current_tables.get_mut(&table_name) { columns.push(new_col_name); }