diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index 1e037a92f..cd26abad6 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -333,7 +333,11 @@ impl Connection { .inner .lock() .map_err(|e| Error::MutexError(e.to_string()))?; - let _res = conn.cacheflush()?; + let completions = conn.cacheflush()?; + let pager = conn.get_pager(); + for c in completions { + pager.io.wait_for_completion(c)?; + } Ok(()) } diff --git a/core/lib.rs b/core/lib.rs index f2109417d..b8324eb1e 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -1380,7 +1380,7 @@ impl Connection { } /// Flush dirty pages to disk. - pub fn cacheflush(&self) -> Result> { + pub fn cacheflush(&self) -> Result> { if self.closed.get() { return Err(LimboError::InternalError("Connection closed".to_string())); } diff --git a/tests/integration/common.rs b/tests/integration/common.rs index 9eff5613f..e15f022a0 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -5,7 +5,6 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use tempfile::TempDir; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; -use turso_core::IOExt; use turso_core::{Connection, Database, IO}; #[allow(dead_code)] @@ -116,10 +115,11 @@ impl TempDatabase { } pub(crate) fn do_flush(conn: &Arc, tmp_db: &TempDatabase) -> anyhow::Result<()> { - tmp_db - .io - .block(|| conn.cacheflush()) - .map_err(anyhow::Error::from) + let completions = conn.cacheflush()?; + for c in completions { + tmp_db.io.wait_for_completion(c)?; + } + Ok(()) } pub(crate) fn compare_string(a: impl AsRef, b: impl AsRef) { @@ -249,7 +249,9 @@ pub(crate) fn rng_from_time() -> (ChaCha8Rng, u64) { mod tests { use std::{sync::Arc, vec}; use tempfile::{NamedTempFile, TempDir}; - use turso_core::{types::IOResult, Database, StepResult, IO}; + use turso_core::{Database, StepResult, IO}; + + use crate::common::do_flush; use super::{limbo_exec_rows, limbo_exec_rows_error, TempDatabase}; use rusqlite::types::Value; @@ -411,9 +413,7 @@ mod tests { // Begin transaction on first connection and insert a value let _ = limbo_exec_rows(&db, &conn1, "BEGIN"); let _ = limbo_exec_rows(&db, &conn1, "INSERT INTO t VALUES (42)"); - while matches!(conn1.cacheflush().unwrap(), IOResult::IO) { - db.io.run_once().unwrap(); - } + do_flush(&conn1, &db)?; // Second connection should not see uncommitted changes let conn2 = db.connect_limbo(); @@ -480,9 +480,7 @@ mod tests { // Begin transaction on first connection and insert a value let _ = limbo_exec_rows(&db, &conn, "BEGIN"); let _ = limbo_exec_rows(&db, &conn, "INSERT INTO t VALUES (42)"); - while matches!(conn.cacheflush().unwrap(), IOResult::IO) { - db.io.run_once().unwrap(); - } + do_flush(&conn, &db)?; // Rollback the transaction let _ = limbo_exec_rows(&db, &conn, "ROLLBACK"); @@ -524,9 +522,7 @@ mod tests { let _ = limbo_exec_rows(&db, &conn, "INSERT INTO t VALUES (42)"); // Flush to WAL but don't commit - while matches!(conn.cacheflush().unwrap(), IOResult::IO) { - db.io.run_once().unwrap(); - } + do_flush(&conn, &db)?; // Reopen the database without committing let db = TempDatabase::new_with_existent(&path, true);