diff --git a/tests/integration/query_processing/test_write_path.rs b/tests/integration/query_processing/test_write_path.rs index 9c6107d58..bab91e3ff 100644 --- a/tests/integration/query_processing/test_write_path.rs +++ b/tests/integration/query_processing/test_write_path.rs @@ -1,6 +1,6 @@ use crate::common::{self, maybe_setup_tracing}; use crate::common::{compare_string, do_flush, TempDatabase}; -use limbo_core::{Connection, OwnedValue, StepResult}; +use limbo_core::{Connection, OwnedValue, Row, StepResult}; use log::debug; use std::rc::Rc; @@ -153,52 +153,19 @@ fn test_sequential_write() -> anyhow::Result<()> { println!("progress {:.1}%", progress); } let insert_query = format!("INSERT INTO test VALUES ({})", i); - match conn.query(insert_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(&tmp_db, &conn, &insert_query)?; let mut current_read_index = 0; - match conn.query(list_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::Row => { - let row = rows.row().unwrap(); - let first_value = row.get::<&OwnedValue>(0).expect("missing id"); - let id = match first_value { - limbo_core::OwnedValue::Integer(i) => *i as i32, - limbo_core::OwnedValue::Float(f) => *f as i32, - _ => unreachable!(), - }; - assert_eq!(current_read_index, id); - current_read_index += 1; - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => { - panic!("Database is busy"); - } - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - } + run_query_on_row(&tmp_db, &conn, &list_query, |row: &Row| { + let first_value = row.get::<&OwnedValue>(0).expect("missing id"); + let id = match first_value { + limbo_core::OwnedValue::Integer(i) => *i as i32, + limbo_core::OwnedValue::Float(f) => *f as i32, + _ => unreachable!(), + }; + assert_eq!(current_read_index, id); + current_read_index += 1; + })?; common::do_flush(&conn, &tmp_db)?; } Ok(()) @@ -215,55 +182,22 @@ fn test_regression_multi_row_insert() -> anyhow::Result<()> { let insert_query = "INSERT INTO test VALUES (-2), (-3), (-1)"; let list_query = "SELECT * FROM test"; - match conn.query(insert_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(&tmp_db, &conn, insert_query)?; common::do_flush(&conn, &tmp_db)?; let mut current_read_index = 1; let expected_ids = vec![-3, -2, -1]; let mut actual_ids = Vec::new(); - match conn.query(list_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::Row => { - let row = rows.row().unwrap(); - let first_value = row.get::<&OwnedValue>(0).expect("missing id"); - let id = match first_value { - OwnedValue::Float(f) => *f as i32, - _ => panic!("expected float"), - }; - actual_ids.push(id); - current_read_index += 1; - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => { - panic!("Database is busy"); - } - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - } + run_query_on_row(&tmp_db, &conn, list_query, |row: &Row| { + let first_value = row.get::<&OwnedValue>(0).expect("missing id"); + let id = match first_value { + OwnedValue::Float(f) => *f as i32, + _ => panic!("expected float"), + }; + actual_ids.push(id); + current_read_index += 1; + })?; assert_eq!(current_read_index, 4); // Verify we read all rows // sort ids @@ -331,49 +265,18 @@ fn test_wal_checkpoint() -> anyhow::Result<()> { let insert_query = format!("INSERT INTO test VALUES ({})", i); do_flush(&conn, &tmp_db)?; conn.checkpoint()?; - match conn.query(insert_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(&tmp_db, &conn, &insert_query)?; } do_flush(&conn, &tmp_db)?; conn.clear_page_cache()?; let list_query = "SELECT * FROM test LIMIT 1"; let mut current_index = 0; - match conn.query(list_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::Row => { - let row = rows.row().unwrap(); - let id = row.get::(0).unwrap(); - assert_eq!(current_index, id as usize); - current_index += 1; - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - } + run_query_on_row(&tmp_db, &conn, list_query, |row: &Row| { + let id = row.get::(0).unwrap(); + assert_eq!(current_index, id as usize); + current_index += 1; + })?; do_flush(&conn, &tmp_db)?; Ok(()) } @@ -387,21 +290,7 @@ fn test_wal_restart() -> anyhow::Result<()> { fn insert(i: usize, conn: &Rc, tmp_db: &TempDatabase) -> anyhow::Result<()> { debug!("inserting {}", i); let insert_query = format!("INSERT INTO test VALUES ({})", i); - match conn.query(insert_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(tmp_db, conn, &insert_query)?; debug!("inserted {}", i); tmp_db.io.run_once()?; Ok(()) @@ -410,26 +299,13 @@ fn test_wal_restart() -> anyhow::Result<()> { fn count(conn: &Rc, tmp_db: &TempDatabase) -> anyhow::Result { debug!("counting"); let list_query = "SELECT count(x) FROM test"; - loop { - if let Some(ref mut rows) = conn.query(list_query)? { - loop { - match rows.step()? { - StepResult::Row => { - let row = rows.row().unwrap(); - let count = row.get::(0).unwrap(); - debug!("counted {}", count); - return Ok(count as usize); - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => panic!("Database is busy"), - } - } - } - } + let mut count = None; + run_query_on_row(tmp_db, conn, list_query, |row: &Row| { + assert!(count.is_none()); + count = Some(row.get::(0).unwrap() as usize); + debug!("counted {:?}", count); + })?; + Ok(count.unwrap()) } { @@ -476,113 +352,86 @@ fn test_write_delete_with_index() -> anyhow::Result<()> { let max_iterations = 1000; for i in 0..max_iterations { println!("inserting {} ", i); - if (i % 100) == 0 { - let progress = (i as f64 / max_iterations as f64) * 100.0; - println!("progress {:.1}%", progress); - } let insert_query = format!("INSERT INTO test VALUES ({})", i); - match conn.query(insert_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(&tmp_db, &conn, &insert_query)?; } for i in 0..max_iterations { println!("deleting {} ", i); - if (i % 100) == 0 { - let progress = (i as f64 / max_iterations as f64) * 100.0; - println!("progress {:.1}%", progress); - } let delete_query = format!("delete from test where x={}", i); - match conn.query(delete_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(&tmp_db, &conn, &delete_query)?; println!("listing after deleting {} ", i); let mut current_read_index = i + 1; - match conn.query(list_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::Row => { - let row = rows.row().unwrap(); - let first_value = row.get::<&OwnedValue>(0).expect("missing id"); - let id = match first_value { - limbo_core::OwnedValue::Integer(i) => *i as i32, - limbo_core::OwnedValue::Float(f) => *f as i32, - _ => unreachable!(), - }; - assert_eq!(current_read_index, id); - current_read_index += 1; - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => { - panic!("Database is busy"); - } - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - } + run_query_on_row(&tmp_db, &conn, list_query, |row: &Row| { + let first_value = row.get::<&OwnedValue>(0).expect("missing id"); + let id = match first_value { + limbo_core::OwnedValue::Integer(i) => *i as i32, + limbo_core::OwnedValue::Float(f) => *f as i32, + _ => unreachable!(), + }; + assert_eq!(current_read_index, id); + current_read_index += 1; + })?; for i in i + 1..max_iterations { // now test with seek - match conn.query(format!("select * from test where x = {}", i)) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::Row => { - let row = rows.row().unwrap(); - let first_value = row.get::<&OwnedValue>(0).expect("missing id"); - let id = match first_value { - limbo_core::OwnedValue::Integer(i) => *i as i32, - limbo_core::OwnedValue::Float(f) => *f as i32, - _ => unreachable!(), - }; - assert_eq!(i, id); - break; - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => { - panic!("Database is busy"); - } - } + run_query_on_row( + &tmp_db, + &conn, + &format!("select * from test where x = {}", i), + |row| { + let first_value = row.get::<&OwnedValue>(0).expect("missing id"); + let id = match first_value { + limbo_core::OwnedValue::Integer(i) => *i as i32, + limbo_core::OwnedValue::Float(f) => *f as i32, + _ => unreachable!(), + }; + assert_eq!(i, id); }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - } + )?; } } Ok(()) } + +fn run_query(tmp_db: &TempDatabase, conn: &Rc, query: &str) -> anyhow::Result<()> { + run_query_core(tmp_db, conn, query, None::) +} + +fn run_query_on_row( + tmp_db: &TempDatabase, + conn: &Rc, + query: &str, + on_row: impl FnMut(&Row), +) -> anyhow::Result<()> { + run_query_core(tmp_db, conn, query, Some(on_row)) +} + +fn run_query_core( + tmp_db: &TempDatabase, + conn: &Rc, + query: &str, + mut on_row: Option, +) -> anyhow::Result<()> { + match conn.query(query) { + Ok(Some(ref mut rows)) => loop { + match rows.step()? { + StepResult::IO => { + tmp_db.io.run_once()?; + } + StepResult::Done => break, + StepResult::Row => { + if let Some(on_row) = on_row.as_mut() { + let row = rows.row().unwrap(); + on_row(row) + } + } + _ => unreachable!(), + } + }, + Ok(None) => {} + Err(err) => { + eprintln!("{}", err); + } + }; + Ok(()) +}