mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-20 07:25:14 +01:00
Merge 'IO Cleanups to use and ' from Pedro Muniz
Depends on #2512 . This is an attempt to remove all the noise from my IO completions refactor into separate PRs Closes #2566
This commit is contained in:
49
core/lib.rs
49
core/lib.rs
@@ -91,7 +91,8 @@ use turso_sqlite3_parser::{ast, ast::Cmd, lexer::sql::Parser};
|
||||
use types::IOResult;
|
||||
pub use types::RefValue;
|
||||
pub use types::Value;
|
||||
use util::{parse_schema_rows, IOExt as _};
|
||||
use util::parse_schema_rows;
|
||||
pub use util::IOExt;
|
||||
use vdbe::builder::QueryMode;
|
||||
use vdbe::builder::TableRefIdCounter;
|
||||
|
||||
@@ -1231,16 +1232,11 @@ impl Connection {
|
||||
/// Read schema version at current transaction
|
||||
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
|
||||
pub fn read_schema_version(&self) -> Result<u32> {
|
||||
loop {
|
||||
let pager = self.pager.borrow();
|
||||
match pager.with_header(|header| header.schema_cookie)? {
|
||||
IOResult::Done(cookie) => return Ok(cookie.get()),
|
||||
IOResult::IO => {
|
||||
self.run_once()?;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
let pager = self.pager.borrow();
|
||||
pager
|
||||
.io
|
||||
.block(|| pager.with_header(|header| header.schema_cookie))
|
||||
.map(|version| version.get())
|
||||
}
|
||||
|
||||
/// Update schema version to the new value within opened write transaction
|
||||
@@ -1254,9 +1250,9 @@ impl Connection {
|
||||
"write_schema_version must be called from within Write transaction".to_string(),
|
||||
));
|
||||
};
|
||||
loop {
|
||||
let pager = self.pager.borrow();
|
||||
match pager.with_header_mut(|header| {
|
||||
let pager = self.pager.borrow();
|
||||
pager.io.block(|| {
|
||||
pager.with_header_mut(|header| {
|
||||
turso_assert!(
|
||||
header.schema_cookie.get() < version,
|
||||
"cookie can't go back in time"
|
||||
@@ -1266,14 +1262,8 @@ impl Connection {
|
||||
});
|
||||
self.with_schema_mut(|schema| schema.schema_version = version);
|
||||
header.schema_cookie = version.into();
|
||||
})? {
|
||||
IOResult::Done(()) => break,
|
||||
IOResult::IO => {
|
||||
self.run_once()?;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
})
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1423,13 +1413,14 @@ impl Connection {
|
||||
// No active transaction
|
||||
}
|
||||
_ => {
|
||||
while let IOResult::IO = self.pager.borrow().end_tx(
|
||||
true, // rollback = true for close
|
||||
self,
|
||||
self.wal_checkpoint_disabled.get(),
|
||||
)? {
|
||||
self.run_once()?;
|
||||
}
|
||||
let pager = self.pager.borrow();
|
||||
pager.io.block(|| {
|
||||
pager.end_tx(
|
||||
true, // rollback = true for close
|
||||
self,
|
||||
self.wal_checkpoint_disabled.get(),
|
||||
)
|
||||
})?;
|
||||
self.transaction_state.set(TransactionState::None);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,8 +8,8 @@ use crate::storage::btree::BTreeCursor;
|
||||
use crate::translate::collate::CollationSeq;
|
||||
use crate::translate::plan::SelectPlan;
|
||||
use crate::util::{module_args_from_sql, module_name_from_sql, IOExt, UnparsedFromSqlIndex};
|
||||
use crate::{return_if_io, LimboError, MvCursor, Pager, RefValue, SymbolTable, VirtualTable};
|
||||
use crate::{util::normalize_ident, Result};
|
||||
use crate::{LimboError, MvCursor, Pager, RefValue, SymbolTable, VirtualTable};
|
||||
use core::fmt;
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use std::cell::RefCell;
|
||||
@@ -128,16 +128,7 @@ impl Schema {
|
||||
let mut view = view
|
||||
.lock()
|
||||
.map_err(|_| LimboError::InternalError("Failed to lock view".to_string()))?;
|
||||
match view.populate_from_table(conn)? {
|
||||
IOResult::Done(()) => {
|
||||
// This view is done, continue to next
|
||||
continue;
|
||||
}
|
||||
IOResult::IO => {
|
||||
// This view needs more IO, return early
|
||||
return Ok(IOResult::IO);
|
||||
}
|
||||
}
|
||||
return_if_io!(view.populate_from_table(conn));
|
||||
}
|
||||
Ok(IOResult::Done(()))
|
||||
}
|
||||
|
||||
@@ -4070,18 +4070,10 @@ impl BTreeCursor {
|
||||
let root_contents = root.get_contents();
|
||||
// FIXME: handle page cache is full
|
||||
// FIXME: remove sync IO hack
|
||||
let child_btree = loop {
|
||||
match self.pager.do_allocate_page(
|
||||
root_contents.page_type(),
|
||||
0,
|
||||
BtreePageAllocMode::Any,
|
||||
)? {
|
||||
IOResult::IO => {
|
||||
self.pager.io.run_once()?;
|
||||
}
|
||||
IOResult::Done(page) => break page,
|
||||
}
|
||||
};
|
||||
let child_btree = self.pager.io.block(|| {
|
||||
self.pager
|
||||
.do_allocate_page(root_contents.page_type(), 0, BtreePageAllocMode::Any)
|
||||
})?;
|
||||
|
||||
tracing::debug!(
|
||||
"balance_root(root={}, rightmost={}, page_type={:?})",
|
||||
@@ -5160,29 +5152,29 @@ impl BTreeCursor {
|
||||
}
|
||||
}
|
||||
DestroyState::ClearOverflowPages { cell } => {
|
||||
match self.clear_overflow_pages(&cell)? {
|
||||
IOResult::Done(_) => match cell {
|
||||
// For an index interior cell, clear the left child page now that overflow pages have been cleared
|
||||
BTreeCell::IndexInteriorCell(index_int_cell) => {
|
||||
let (child_page, _c) =
|
||||
self.read_page(index_int_cell.left_child_page as usize)?;
|
||||
self.stack.push(child_page);
|
||||
let destroy_info = self.state.mut_destroy_info().expect(
|
||||
"unable to get a mut reference to destroy state in cursor",
|
||||
);
|
||||
destroy_info.state = DestroyState::LoadPage;
|
||||
return Ok(IOResult::IO);
|
||||
}
|
||||
// For any leaf cell, advance the index now that overflow pages have been cleared
|
||||
BTreeCell::TableLeafCell(_) | BTreeCell::IndexLeafCell(_) => {
|
||||
let destroy_info = self.state.mut_destroy_info().expect(
|
||||
"unable to get a mut reference to destroy state in cursor",
|
||||
);
|
||||
destroy_info.state = DestroyState::LoadPage;
|
||||
}
|
||||
_ => panic!("unexpected cell type"),
|
||||
},
|
||||
IOResult::IO => return Ok(IOResult::IO),
|
||||
return_if_io!(self.clear_overflow_pages(&cell));
|
||||
match cell {
|
||||
// For an index interior cell, clear the left child page now that overflow pages have been cleared
|
||||
BTreeCell::IndexInteriorCell(index_int_cell) => {
|
||||
let (child_page, _c) =
|
||||
self.read_page(index_int_cell.left_child_page as usize)?;
|
||||
self.stack.push(child_page);
|
||||
let destroy_info = self
|
||||
.state
|
||||
.mut_destroy_info()
|
||||
.expect("unable to get a mut reference to destroy state in cursor");
|
||||
destroy_info.state = DestroyState::LoadPage;
|
||||
return Ok(IOResult::IO);
|
||||
}
|
||||
// For any leaf cell, advance the index now that overflow pages have been cleared
|
||||
BTreeCell::TableLeafCell(_) | BTreeCell::IndexLeafCell(_) => {
|
||||
let destroy_info = self
|
||||
.state
|
||||
.mut_destroy_info()
|
||||
.expect("unable to get a mut reference to destroy state in cursor");
|
||||
destroy_info.state = DestroyState::LoadPage;
|
||||
}
|
||||
_ => panic!("unexpected cell type"),
|
||||
}
|
||||
}
|
||||
DestroyState::FreePage => {
|
||||
@@ -7867,14 +7859,10 @@ mod tests {
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
loop {
|
||||
match pager.end_tx(false, &conn, false).unwrap() {
|
||||
IOResult::Done(_) => break,
|
||||
IOResult::IO => {
|
||||
pager.io.run_once().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
pager
|
||||
.io
|
||||
.block(|| pager.end_tx(false, &conn, false))
|
||||
.unwrap();
|
||||
pager.begin_read_tx().unwrap();
|
||||
// FIXME: add sorted vector instead, should be okay for small amounts of keys for now :P, too lazy to fix right now
|
||||
let _c = cursor.move_to_root().unwrap();
|
||||
@@ -7944,14 +7932,10 @@ mod tests {
|
||||
tracing::info!("super seed: {}", seed);
|
||||
for _ in 0..attempts {
|
||||
let (pager, _, _db, conn) = empty_btree();
|
||||
let index_root_page_result =
|
||||
pager.btree_create(&CreateBTreeFlags::new_index()).unwrap();
|
||||
let index_root_page = match index_root_page_result {
|
||||
crate::types::IOResult::Done(id) => id as usize,
|
||||
crate::types::IOResult::IO => {
|
||||
panic!("btree_create returned IO in test, unexpected")
|
||||
}
|
||||
};
|
||||
let index_root_page = pager
|
||||
.io
|
||||
.block(|| pager.btree_create(&CreateBTreeFlags::new_index()))
|
||||
.unwrap() as usize;
|
||||
let index_def = Index {
|
||||
name: "testindex".to_string(),
|
||||
columns: (0..10)
|
||||
@@ -7981,7 +7965,7 @@ mod tests {
|
||||
tracing::info!("seed: {seed}");
|
||||
for i in 0..inserts {
|
||||
pager.begin_read_tx().unwrap();
|
||||
let _res = pager.begin_write_tx().unwrap();
|
||||
pager.io.block(|| pager.begin_write_tx()).unwrap();
|
||||
let key = {
|
||||
let result;
|
||||
loop {
|
||||
@@ -8024,15 +8008,12 @@ mod tests {
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
let _c = cursor.move_to_root().unwrap();
|
||||
loop {
|
||||
match pager.end_tx(false, &conn, false).unwrap() {
|
||||
IOResult::Done(_) => break,
|
||||
IOResult::IO => {
|
||||
pager.io.run_once().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
let c = cursor.move_to_root().unwrap();
|
||||
pager.io.wait_for_completion(c).unwrap();
|
||||
pager
|
||||
.io
|
||||
.block(|| pager.end_tx(false, &conn, false))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// Check that all keys can be found by seeking
|
||||
@@ -8118,14 +8099,10 @@ mod tests {
|
||||
|
||||
for _ in 0..attempts {
|
||||
let (pager, _, _db, conn) = empty_btree();
|
||||
let index_root_page_result =
|
||||
pager.btree_create(&CreateBTreeFlags::new_index()).unwrap();
|
||||
let index_root_page = match index_root_page_result {
|
||||
crate::types::IOResult::Done(id) => id as usize,
|
||||
crate::types::IOResult::IO => {
|
||||
panic!("btree_create returned IO in test, unexpected")
|
||||
}
|
||||
};
|
||||
let index_root_page = pager
|
||||
.io
|
||||
.block(|| pager.btree_create(&CreateBTreeFlags::new_index()))
|
||||
.unwrap() as usize;
|
||||
let index_def = Index {
|
||||
name: "testindex".to_string(),
|
||||
columns: vec![IndexColumn {
|
||||
@@ -8151,7 +8128,8 @@ mod tests {
|
||||
for i in 0..operations {
|
||||
let print_progress = i % 100 == 0;
|
||||
pager.begin_read_tx().unwrap();
|
||||
let _res = pager.begin_write_tx().unwrap();
|
||||
|
||||
pager.io.block(|| pager.begin_write_tx()).unwrap();
|
||||
|
||||
// Decide whether to insert or delete (80% chance of insert)
|
||||
let is_insert = rng.next_u64() % 100 < (insert_chance * 100.0) as u64;
|
||||
@@ -8242,15 +8220,12 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
let _c = cursor.move_to_root().unwrap();
|
||||
loop {
|
||||
match pager.end_tx(false, &conn, false).unwrap() {
|
||||
IOResult::Done(_) => break,
|
||||
IOResult::IO => {
|
||||
pager.io.run_once().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
let c = cursor.move_to_root().unwrap();
|
||||
pager.io.wait_for_completion(c).unwrap();
|
||||
pager
|
||||
.io
|
||||
.block(|| pager.end_tx(false, &conn, false))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// Final validation
|
||||
@@ -8610,51 +8585,44 @@ mod tests {
|
||||
.block(|| pager.with_header(|header| header.freelist_pages))?
|
||||
.get();
|
||||
// Clear overflow pages
|
||||
let clear_result = cursor.clear_overflow_pages(&leaf_cell)?;
|
||||
match clear_result {
|
||||
IOResult::Done(_) => {
|
||||
let (freelist_pages, freelist_trunk_page) = pager
|
||||
.io
|
||||
.block(|| {
|
||||
pager.with_header(|header| {
|
||||
(
|
||||
header.freelist_pages.get(),
|
||||
header.freelist_trunk_page.get(),
|
||||
)
|
||||
})
|
||||
})
|
||||
.unwrap();
|
||||
pager.io.block(|| cursor.clear_overflow_pages(&leaf_cell))?;
|
||||
let (freelist_pages, freelist_trunk_page) = pager
|
||||
.io
|
||||
.block(|| {
|
||||
pager.with_header(|header| {
|
||||
(
|
||||
header.freelist_pages.get(),
|
||||
header.freelist_trunk_page.get(),
|
||||
)
|
||||
})
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Verify proper number of pages were added to freelist
|
||||
assert_eq!(
|
||||
freelist_pages,
|
||||
initial_freelist_pages + 3,
|
||||
"Expected 3 pages to be added to freelist"
|
||||
);
|
||||
// Verify proper number of pages were added to freelist
|
||||
assert_eq!(
|
||||
freelist_pages,
|
||||
initial_freelist_pages + 3,
|
||||
"Expected 3 pages to be added to freelist"
|
||||
);
|
||||
|
||||
// If this is first trunk page
|
||||
let trunk_page_id = freelist_trunk_page;
|
||||
if trunk_page_id > 0 {
|
||||
// Verify trunk page structure
|
||||
let (trunk_page, _c) = cursor.read_page(trunk_page_id as usize)?;
|
||||
if let Some(contents) = trunk_page.get().get().contents.as_ref() {
|
||||
// Read number of leaf pages in trunk
|
||||
let n_leaf = contents.read_u32_no_offset(4);
|
||||
assert!(n_leaf > 0, "Trunk page should have leaf entries");
|
||||
// If this is first trunk page
|
||||
let trunk_page_id = freelist_trunk_page;
|
||||
if trunk_page_id > 0 {
|
||||
// Verify trunk page structure
|
||||
let (trunk_page, _c) = cursor.read_page(trunk_page_id as usize)?;
|
||||
if let Some(contents) = trunk_page.get().get().contents.as_ref() {
|
||||
// Read number of leaf pages in trunk
|
||||
let n_leaf = contents.read_u32_no_offset(4);
|
||||
assert!(n_leaf > 0, "Trunk page should have leaf entries");
|
||||
|
||||
for i in 0..n_leaf {
|
||||
let leaf_page_id = contents.read_u32_no_offset(8 + (i as usize * 4));
|
||||
assert!(
|
||||
(2..=4).contains(&leaf_page_id),
|
||||
"Leaf page ID {leaf_page_id} should be in range 2-4"
|
||||
);
|
||||
}
|
||||
}
|
||||
for i in 0..n_leaf {
|
||||
let leaf_page_id = contents.read_u32_no_offset(8 + (i as usize * 4));
|
||||
assert!(
|
||||
(2..=4).contains(&leaf_page_id),
|
||||
"Leaf page ID {leaf_page_id} should be in range 2-4"
|
||||
);
|
||||
}
|
||||
}
|
||||
IOResult::IO => {
|
||||
cursor.pager.io.run_once()?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -8683,34 +8651,27 @@ mod tests {
|
||||
.get() as usize;
|
||||
|
||||
// Try to clear non-existent overflow pages
|
||||
let clear_result = cursor.clear_overflow_pages(&leaf_cell)?;
|
||||
match clear_result {
|
||||
IOResult::Done(_) => {
|
||||
let (freelist_pages, freelist_trunk_page) = pager.io.block(|| {
|
||||
pager.with_header(|header| {
|
||||
(
|
||||
header.freelist_pages.get(),
|
||||
header.freelist_trunk_page.get(),
|
||||
)
|
||||
})
|
||||
})?;
|
||||
pager.io.block(|| cursor.clear_overflow_pages(&leaf_cell))?;
|
||||
let (freelist_pages, freelist_trunk_page) = pager.io.block(|| {
|
||||
pager.with_header(|header| {
|
||||
(
|
||||
header.freelist_pages.get(),
|
||||
header.freelist_trunk_page.get(),
|
||||
)
|
||||
})
|
||||
})?;
|
||||
|
||||
// Verify freelist was not modified
|
||||
assert_eq!(
|
||||
freelist_pages as usize, initial_freelist_pages,
|
||||
"Freelist should not change when no overflow pages exist"
|
||||
);
|
||||
// Verify freelist was not modified
|
||||
assert_eq!(
|
||||
freelist_pages as usize, initial_freelist_pages,
|
||||
"Freelist should not change when no overflow pages exist"
|
||||
);
|
||||
|
||||
// Verify trunk page wasn't created
|
||||
assert_eq!(
|
||||
freelist_trunk_page, 0,
|
||||
"No trunk page should be created when no overflow pages exist"
|
||||
);
|
||||
}
|
||||
IOResult::IO => {
|
||||
cursor.pager.io.run_once()?;
|
||||
}
|
||||
}
|
||||
// Verify trunk page wasn't created
|
||||
assert_eq!(
|
||||
freelist_trunk_page, 0,
|
||||
"No trunk page should be created when no overflow pages exist"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -590,10 +590,8 @@ impl Pager {
|
||||
/// Returns the new maximum page count (may be clamped to current database size)
|
||||
pub fn set_max_page_count(&self, new_max: u32) -> crate::Result<IOResult<u32>> {
|
||||
// Get current database size
|
||||
let current_page_count = match self.with_header(|header| header.database_size.get())? {
|
||||
IOResult::Done(size) => size,
|
||||
IOResult::IO => return Ok(IOResult::IO),
|
||||
};
|
||||
let current_page_count =
|
||||
return_if_io!(self.with_header(|header| header.database_size.get()));
|
||||
|
||||
// Clamp new_max to be at least the current database size
|
||||
let clamped_max = std::cmp::max(new_max, current_page_count);
|
||||
@@ -868,17 +866,14 @@ impl Pager {
|
||||
}
|
||||
BtreeCreateVacuumFullState::PtrMapPut { allocated_page_id } => {
|
||||
// For now map allocated_page_id since we are not swapping it with root_page_num
|
||||
let res = match self.ptrmap_put(
|
||||
return_if_io!(self.ptrmap_put(
|
||||
allocated_page_id,
|
||||
PtrmapType::RootPage,
|
||||
0,
|
||||
)? {
|
||||
IOResult::Done(_) => Ok(IOResult::Done(allocated_page_id)),
|
||||
IOResult::IO => return Ok(IOResult::IO),
|
||||
};
|
||||
));
|
||||
self.btree_create_vacuum_full_state
|
||||
.set(BtreeCreateVacuumFullState::Start);
|
||||
return res;
|
||||
return Ok(IOResult::Done(allocated_page_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -995,10 +990,7 @@ impl Pager {
|
||||
pub fn begin_write_tx(&self) -> Result<IOResult<LimboResult>> {
|
||||
// TODO(Diego): The only possibly allocate page1 here is because OpenEphemeral needs a write transaction
|
||||
// we should have a unique API to begin transactions, something like sqlite3BtreeBeginTrans
|
||||
match self.maybe_allocate_page1()? {
|
||||
IOResult::Done(_) => {}
|
||||
IOResult::IO => return Ok(IOResult::IO),
|
||||
}
|
||||
return_if_io!(self.maybe_allocate_page1());
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Ok(IOResult::Done(LimboResult::Ok));
|
||||
};
|
||||
@@ -1029,20 +1021,15 @@ impl Pager {
|
||||
self.rollback(schema_did_change, connection, is_write)?;
|
||||
return Ok(IOResult::Done(PagerCommitResult::Rollback));
|
||||
}
|
||||
let commit_status = self.commit_dirty_pages(wal_checkpoint_disabled)?;
|
||||
match commit_status {
|
||||
IOResult::IO => Ok(IOResult::IO),
|
||||
IOResult::Done(_) => {
|
||||
wal.borrow().end_write_tx();
|
||||
wal.borrow().end_read_tx();
|
||||
let commit_status = return_if_io!(self.commit_dirty_pages(wal_checkpoint_disabled));
|
||||
wal.borrow().end_write_tx();
|
||||
wal.borrow().end_read_tx();
|
||||
|
||||
if schema_did_change {
|
||||
let schema = connection.schema.borrow().clone();
|
||||
connection._db.update_schema_if_newer(schema)?;
|
||||
}
|
||||
Ok(commit_status)
|
||||
}
|
||||
if schema_did_change {
|
||||
let schema = connection.schema.borrow().clone();
|
||||
connection._db.update_schema_if_newer(schema)?;
|
||||
}
|
||||
Ok(IOResult::Done(commit_status))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
@@ -1097,7 +1084,9 @@ impl Pager {
|
||||
if let Some(page) = page_cache.get(&page_key) {
|
||||
tracing::trace!("read_page(page_idx = {}) = cached", page_idx);
|
||||
// Dummy completion being passed, as we do not need to read from database or wal
|
||||
return Ok((page.clone(), Completion::new_write(|_| {})));
|
||||
let c = Completion::new_write(|_| {});
|
||||
c.complete(0);
|
||||
return Ok((page.clone(), c));
|
||||
}
|
||||
let (page, c) = self.read_page_no_cache(page_idx, None, false)?;
|
||||
self.cache_insert(page_idx, page.clone(), &mut page_cache)?;
|
||||
@@ -1492,16 +1481,13 @@ impl Pager {
|
||||
match state {
|
||||
CheckpointState::Checkpoint => {
|
||||
let in_flight = self.checkpoint_inflight.clone();
|
||||
match wal
|
||||
.borrow_mut()
|
||||
.checkpoint(self, in_flight, CheckpointMode::Passive)?
|
||||
{
|
||||
IOResult::IO => return Ok(IOResult::IO),
|
||||
IOResult::Done(res) => {
|
||||
checkpoint_result = res;
|
||||
self.checkpoint_state.replace(CheckpointState::SyncDbFile);
|
||||
}
|
||||
};
|
||||
let res = return_if_io!(wal.borrow_mut().checkpoint(
|
||||
self,
|
||||
in_flight,
|
||||
CheckpointMode::Passive
|
||||
));
|
||||
checkpoint_result = res;
|
||||
self.checkpoint_state.replace(CheckpointState::SyncDbFile);
|
||||
}
|
||||
CheckpointState::SyncDbFile => {
|
||||
let _c =
|
||||
|
||||
@@ -1906,6 +1906,7 @@ pub mod test {
|
||||
wal::READMARK_NOT_USED,
|
||||
},
|
||||
types::IOResult,
|
||||
util::IOExt,
|
||||
CheckpointMode, CheckpointResult, Completion, Connection, Database, LimboError, PlatformIO,
|
||||
StepResult, Wal, WalFile, WalFileShared, IO,
|
||||
};
|
||||
@@ -2030,14 +2031,10 @@ pub mod test {
|
||||
mode: CheckpointMode,
|
||||
) -> CheckpointResult {
|
||||
let wc = Rc::new(RefCell::new(0usize));
|
||||
loop {
|
||||
match wal.checkpoint(pager, wc.clone(), mode).unwrap() {
|
||||
IOResult::Done(r) => return r,
|
||||
IOResult::IO => {
|
||||
pager.io.run_once().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
pager
|
||||
.io
|
||||
.block(|| wal.checkpoint(pager, wc.clone(), mode))
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn wal_header_snapshot(shared: &Arc<UnsafeCell<WalFileShared>>) -> (u32, u32, u32, u32) {
|
||||
@@ -2062,9 +2059,9 @@ pub mod test {
|
||||
conn.execute("create table test(id integer primary key, value text)")
|
||||
.unwrap();
|
||||
bulk_inserts(&conn, 20, 3);
|
||||
while let IOResult::IO = conn.pager.borrow_mut().cacheflush().unwrap() {
|
||||
conn.run_once().unwrap();
|
||||
}
|
||||
db.io
|
||||
.block(|| conn.pager.borrow_mut().cacheflush())
|
||||
.unwrap();
|
||||
|
||||
// Snapshot header & counters before the RESTART checkpoint.
|
||||
let wal_shared = db.maybe_shared_wal.read().as_ref().unwrap().clone();
|
||||
@@ -2156,9 +2153,9 @@ pub mod test {
|
||||
.execute("create table test(id integer primary key, value text)")
|
||||
.unwrap();
|
||||
bulk_inserts(&conn1.clone(), 15, 2);
|
||||
while let IOResult::IO = conn1.pager.borrow_mut().cacheflush().unwrap() {
|
||||
conn1.run_once().unwrap();
|
||||
}
|
||||
db.io
|
||||
.block(|| conn1.pager.borrow_mut().cacheflush())
|
||||
.unwrap();
|
||||
|
||||
// Force a read transaction that will freeze a lower read mark
|
||||
let readmark = {
|
||||
@@ -2170,9 +2167,9 @@ pub mod test {
|
||||
|
||||
// generate more frames that the reader will not see.
|
||||
bulk_inserts(&conn1.clone(), 15, 2);
|
||||
while let IOResult::IO = conn1.pager.borrow_mut().cacheflush().unwrap() {
|
||||
conn1.run_once().unwrap();
|
||||
}
|
||||
db.io
|
||||
.block(|| conn1.pager.borrow_mut().cacheflush())
|
||||
.unwrap();
|
||||
|
||||
// Run passive checkpoint, expect partial
|
||||
let (res1, max_before) = {
|
||||
@@ -2831,9 +2828,9 @@ pub mod test {
|
||||
bulk_inserts(&conn, 8, 4);
|
||||
|
||||
// Ensure frames are flushed to the WAL
|
||||
while let IOResult::IO = conn.pager.borrow_mut().cacheflush().unwrap() {
|
||||
conn.run_once().unwrap();
|
||||
}
|
||||
db.io
|
||||
.block(|| conn.pager.borrow_mut().cacheflush())
|
||||
.unwrap();
|
||||
|
||||
// Snapshot the current mxFrame before running FULL
|
||||
let wal_shared = db.maybe_shared_wal.read().as_ref().unwrap().clone();
|
||||
@@ -2863,9 +2860,9 @@ pub mod test {
|
||||
|
||||
// First commit some data and flush (reader will snapshot here)
|
||||
bulk_inserts(&writer, 2, 3);
|
||||
while let IOResult::IO = writer.pager.borrow_mut().cacheflush().unwrap() {
|
||||
writer.run_once().unwrap();
|
||||
}
|
||||
db.io
|
||||
.block(|| writer.pager.borrow_mut().cacheflush())
|
||||
.unwrap();
|
||||
|
||||
// Start a read transaction pinned at the current snapshot
|
||||
{
|
||||
@@ -2882,9 +2879,9 @@ pub mod test {
|
||||
|
||||
// Advance WAL beyond the reader's snapshot
|
||||
bulk_inserts(&writer, 3, 4);
|
||||
while let IOResult::IO = writer.pager.borrow_mut().cacheflush().unwrap() {
|
||||
writer.run_once().unwrap();
|
||||
}
|
||||
db.io
|
||||
.block(|| writer.pager.borrow_mut().cacheflush())
|
||||
.unwrap();
|
||||
let mx_now = unsafe {
|
||||
(&*db.maybe_shared_wal.read().as_ref().unwrap().get())
|
||||
.max_frame
|
||||
|
||||
@@ -3793,9 +3793,7 @@ pub fn op_sorter_sort(
|
||||
let cursor = cursor.as_sorter_mut();
|
||||
let is_empty = cursor.is_empty();
|
||||
if !is_empty {
|
||||
if let IOResult::IO = cursor.sort()? {
|
||||
return Ok(InsnFunctionStepResult::IO);
|
||||
}
|
||||
return_if_io!(cursor.sort());
|
||||
}
|
||||
is_empty
|
||||
};
|
||||
@@ -3825,9 +3823,7 @@ pub fn op_sorter_next(
|
||||
let has_more = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_sorter_mut();
|
||||
if let IOResult::IO = cursor.next()? {
|
||||
return Ok(InsnFunctionStepResult::IO);
|
||||
}
|
||||
return_if_io!(cursor.next());
|
||||
cursor.has_more()
|
||||
};
|
||||
if has_more {
|
||||
@@ -6463,17 +6459,10 @@ pub fn op_populate_views(
|
||||
let conn = program.connection.clone();
|
||||
let schema = conn.schema.borrow();
|
||||
|
||||
match schema.populate_views(&conn)? {
|
||||
IOResult::Done(()) => {
|
||||
// All views populated, advance to next instruction
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
IOResult::IO => {
|
||||
// Need more IO, stay on this instruction
|
||||
Ok(InsnFunctionStepResult::IO)
|
||||
}
|
||||
}
|
||||
return_if_io!(schema.populate_views(&conn));
|
||||
// All views populated, advance to next instruction
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
pub fn op_read_cookie(
|
||||
@@ -8747,10 +8736,7 @@ pub fn op_max_pgcnt(
|
||||
pager.get_max_page_count()
|
||||
} else {
|
||||
// Set new maximum page count (will be clamped to current database size)
|
||||
match pager.set_max_page_count(*new_max as u32)? {
|
||||
IOResult::Done(new_max_count) => new_max_count,
|
||||
IOResult::IO => return Ok(InsnFunctionStepResult::IO),
|
||||
}
|
||||
return_if_io!(pager.set_max_page_count(*new_max as u32))
|
||||
};
|
||||
|
||||
state.registers[*dest] = Register::Value(Value::Integer(result_value.into()));
|
||||
|
||||
@@ -178,10 +178,7 @@ impl Sorter {
|
||||
self.records.pop()
|
||||
} else {
|
||||
// Serve from sorted chunk files.
|
||||
match self.next_from_chunk_heap()? {
|
||||
IOResult::IO => return Ok(IOResult::IO),
|
||||
IOResult::Done(record) => record,
|
||||
}
|
||||
return_if_io!(self.next_from_chunk_heap())
|
||||
};
|
||||
match record {
|
||||
Some(record) => {
|
||||
|
||||
@@ -5,7 +5,8 @@ use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use tempfile::TempDir;
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
|
||||
use turso_core::{types::IOResult, Connection, Database, IO};
|
||||
use turso_core::IOExt;
|
||||
use turso_core::{Connection, Database, IO};
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub struct TempDatabase {
|
||||
@@ -115,17 +116,10 @@ impl TempDatabase {
|
||||
}
|
||||
|
||||
pub(crate) fn do_flush(conn: &Arc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<()> {
|
||||
loop {
|
||||
match conn.cacheflush()? {
|
||||
IOResult::Done(_) => {
|
||||
break;
|
||||
}
|
||||
IOResult::IO => {
|
||||
tmp_db.io.run_once()?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
tmp_db
|
||||
.io
|
||||
.block(|| conn.cacheflush())
|
||||
.map_err(anyhow::Error::from)
|
||||
}
|
||||
|
||||
pub(crate) fn compare_string(a: impl AsRef<str>, b: impl AsRef<str>) {
|
||||
|
||||
@@ -1832,7 +1832,6 @@ mod tests {
|
||||
|
||||
// Successfully added column, update our tracking
|
||||
let table_name = table_to_alter.clone();
|
||||
let col_name = new_col_name.clone();
|
||||
if let Some(columns) = current_tables.get_mut(&table_name) {
|
||||
columns.push(new_col_name);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user