run_query helper for test_write_path

This commit is contained in:
Pere Diaz Bou
2025-05-01 11:16:25 +03:00
parent 64a12ed887
commit e503bb4641

View File

@@ -1,6 +1,6 @@
use crate::common::{self, maybe_setup_tracing};
use crate::common::{compare_string, do_flush, TempDatabase};
use limbo_core::{Connection, OwnedValue, StepResult};
use limbo_core::{Connection, OwnedValue, Row, StepResult};
use log::debug;
use std::rc::Rc;
@@ -153,52 +153,19 @@ fn test_sequential_write() -> anyhow::Result<()> {
println!("progress {:.1}%", progress);
}
let insert_query = format!("INSERT INTO test VALUES ({})", i);
match conn.query(insert_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
run_query(&tmp_db, &conn, &insert_query)?;
let mut current_read_index = 0;
match conn.query(list_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
limbo_core::OwnedValue::Integer(i) => *i as i32,
limbo_core::OwnedValue::Float(f) => *f as i32,
_ => unreachable!(),
};
assert_eq!(current_read_index, id);
current_read_index += 1;
}
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
StepResult::Busy => {
panic!("Database is busy");
}
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
}
run_query_on_row(&tmp_db, &conn, &list_query, |row: &Row| {
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
limbo_core::OwnedValue::Integer(i) => *i as i32,
limbo_core::OwnedValue::Float(f) => *f as i32,
_ => unreachable!(),
};
assert_eq!(current_read_index, id);
current_read_index += 1;
})?;
common::do_flush(&conn, &tmp_db)?;
}
Ok(())
@@ -215,55 +182,22 @@ fn test_regression_multi_row_insert() -> anyhow::Result<()> {
let insert_query = "INSERT INTO test VALUES (-2), (-3), (-1)";
let list_query = "SELECT * FROM test";
match conn.query(insert_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
run_query(&tmp_db, &conn, insert_query)?;
common::do_flush(&conn, &tmp_db)?;
let mut current_read_index = 1;
let expected_ids = vec![-3, -2, -1];
let mut actual_ids = Vec::new();
match conn.query(list_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
OwnedValue::Float(f) => *f as i32,
_ => panic!("expected float"),
};
actual_ids.push(id);
current_read_index += 1;
}
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
StepResult::Busy => {
panic!("Database is busy");
}
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
}
run_query_on_row(&tmp_db, &conn, list_query, |row: &Row| {
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
OwnedValue::Float(f) => *f as i32,
_ => panic!("expected float"),
};
actual_ids.push(id);
current_read_index += 1;
})?;
assert_eq!(current_read_index, 4); // Verify we read all rows
// sort ids
@@ -331,49 +265,18 @@ fn test_wal_checkpoint() -> anyhow::Result<()> {
let insert_query = format!("INSERT INTO test VALUES ({})", i);
do_flush(&conn, &tmp_db)?;
conn.checkpoint()?;
match conn.query(insert_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
run_query(&tmp_db, &conn, &insert_query)?;
}
do_flush(&conn, &tmp_db)?;
conn.clear_page_cache()?;
let list_query = "SELECT * FROM test LIMIT 1";
let mut current_index = 0;
match conn.query(list_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let id = row.get::<i64>(0).unwrap();
assert_eq!(current_index, id as usize);
current_index += 1;
}
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
StepResult::Busy => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
}
run_query_on_row(&tmp_db, &conn, list_query, |row: &Row| {
let id = row.get::<i64>(0).unwrap();
assert_eq!(current_index, id as usize);
current_index += 1;
})?;
do_flush(&conn, &tmp_db)?;
Ok(())
}
@@ -387,21 +290,7 @@ fn test_wal_restart() -> anyhow::Result<()> {
fn insert(i: usize, conn: &Rc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<()> {
debug!("inserting {}", i);
let insert_query = format!("INSERT INTO test VALUES ({})", i);
match conn.query(insert_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
run_query(tmp_db, conn, &insert_query)?;
debug!("inserted {}", i);
tmp_db.io.run_once()?;
Ok(())
@@ -410,26 +299,13 @@ fn test_wal_restart() -> anyhow::Result<()> {
fn count(conn: &Rc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<usize> {
debug!("counting");
let list_query = "SELECT count(x) FROM test";
loop {
if let Some(ref mut rows) = conn.query(list_query)? {
loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let count = row.get::<i64>(0).unwrap();
debug!("counted {}", count);
return Ok(count as usize);
}
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
StepResult::Busy => panic!("Database is busy"),
}
}
}
}
let mut count = None;
run_query_on_row(tmp_db, conn, list_query, |row: &Row| {
assert!(count.is_none());
count = Some(row.get::<i64>(0).unwrap() as usize);
debug!("counted {:?}", count);
})?;
Ok(count.unwrap())
}
{
@@ -476,113 +352,86 @@ fn test_write_delete_with_index() -> anyhow::Result<()> {
let max_iterations = 1000;
for i in 0..max_iterations {
println!("inserting {} ", i);
if (i % 100) == 0 {
let progress = (i as f64 / max_iterations as f64) * 100.0;
println!("progress {:.1}%", progress);
}
let insert_query = format!("INSERT INTO test VALUES ({})", i);
match conn.query(insert_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
run_query(&tmp_db, &conn, &insert_query)?;
}
for i in 0..max_iterations {
println!("deleting {} ", i);
if (i % 100) == 0 {
let progress = (i as f64 / max_iterations as f64) * 100.0;
println!("progress {:.1}%", progress);
}
let delete_query = format!("delete from test where x={}", i);
match conn.query(delete_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
run_query(&tmp_db, &conn, &delete_query)?;
println!("listing after deleting {} ", i);
let mut current_read_index = i + 1;
match conn.query(list_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
limbo_core::OwnedValue::Integer(i) => *i as i32,
limbo_core::OwnedValue::Float(f) => *f as i32,
_ => unreachable!(),
};
assert_eq!(current_read_index, id);
current_read_index += 1;
}
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
StepResult::Busy => {
panic!("Database is busy");
}
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
}
run_query_on_row(&tmp_db, &conn, list_query, |row: &Row| {
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
limbo_core::OwnedValue::Integer(i) => *i as i32,
limbo_core::OwnedValue::Float(f) => *f as i32,
_ => unreachable!(),
};
assert_eq!(current_read_index, id);
current_read_index += 1;
})?;
for i in i + 1..max_iterations {
// now test with seek
match conn.query(format!("select * from test where x = {}", i)) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
limbo_core::OwnedValue::Integer(i) => *i as i32,
limbo_core::OwnedValue::Float(f) => *f as i32,
_ => unreachable!(),
};
assert_eq!(i, id);
break;
}
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
StepResult::Busy => {
panic!("Database is busy");
}
}
run_query_on_row(
&tmp_db,
&conn,
&format!("select * from test where x = {}", i),
|row| {
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
limbo_core::OwnedValue::Integer(i) => *i as i32,
limbo_core::OwnedValue::Float(f) => *f as i32,
_ => unreachable!(),
};
assert_eq!(i, id);
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
}
)?;
}
}
Ok(())
}
fn run_query(tmp_db: &TempDatabase, conn: &Rc<Connection>, query: &str) -> anyhow::Result<()> {
run_query_core(tmp_db, conn, query, None::<fn(&Row)>)
}
fn run_query_on_row(
tmp_db: &TempDatabase,
conn: &Rc<Connection>,
query: &str,
on_row: impl FnMut(&Row),
) -> anyhow::Result<()> {
run_query_core(tmp_db, conn, query, Some(on_row))
}
fn run_query_core(
tmp_db: &TempDatabase,
conn: &Rc<Connection>,
query: &str,
mut on_row: Option<impl FnMut(&Row)>,
) -> anyhow::Result<()> {
match conn.query(query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
StepResult::Row => {
if let Some(on_row) = on_row.as_mut() {
let row = rows.row().unwrap();
on_row(row)
}
}
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
Ok(())
}