introduce concurrent write test

The idea is quite simple: write with 4 concurrent writers and once all
are finsihed, check the count of rows written is correct.
This commit is contained in:
Pere Diaz Bou
2025-06-18 16:52:42 +02:00
parent 27a164bd04
commit 10d02525d6
3 changed files with 166 additions and 25 deletions

View File

@@ -1,9 +1,21 @@
use crate::common::{self, maybe_setup_tracing};
use crate::common::{compare_string, do_flush, TempDatabase};
use limbo_core::{Connection, Row, StepResult, Value};
use limbo_core::{Connection, Row, Statement, StepResult, Value};
use log::debug;
use std::sync::Arc;
#[macro_export]
macro_rules! change_state {
($current:expr, $pattern:pat => $selector:expr) => {
let state = match std::mem::replace($current, unsafe { std::mem::zeroed() }) {
$pattern => $selector,
_ => panic!("unexpected state"),
};
#[allow(clippy::forget_non_drop)]
std::mem::forget(std::mem::replace($current, state));
};
}
#[test]
#[ignore]
fn test_simple_overflow_page() -> anyhow::Result<()> {
@@ -448,6 +460,124 @@ fn test_delete_with_index() -> anyhow::Result<()> {
Ok(())
}
enum ConnectionState {
PrepareQuery { query_idx: usize },
ExecuteQuery { query_idx: usize, stmt: Statement },
Done,
}
struct ConnectionPlan {
queries: Vec<String>,
conn: Arc<Connection>,
state: ConnectionState,
}
impl ConnectionPlan {
pub fn step(&mut self) -> anyhow::Result<bool> {
loop {
match &mut self.state {
ConnectionState::PrepareQuery { query_idx } => {
if *query_idx >= self.queries.len() {
self.state = ConnectionState::Done;
return Ok(true);
}
let query = &self.queries[*query_idx];
tracing::info!("preparing {}", query);
let stmt = self.conn.query(query)?.unwrap();
self.state = ConnectionState::ExecuteQuery {
query_idx: *query_idx,
stmt,
};
}
ConnectionState::ExecuteQuery { stmt, query_idx } => loop {
let query = &self.queries[*query_idx];
tracing::info!("stepping {}", query);
let current_query_idx = *query_idx;
let step_result = stmt.step()?;
match step_result {
StepResult::IO => {
return Ok(false);
}
StepResult::Done => {
change_state!(&mut self.state, ConnectionState::ExecuteQuery { .. } => ConnectionState::PrepareQuery { query_idx: current_query_idx + 1 });
return Ok(false);
}
StepResult::Row => {}
StepResult::Busy => {
return Ok(false);
}
_ => unreachable!(),
}
},
ConnectionState::Done => {
return Ok(true);
}
}
}
}
pub fn is_finished(&self) -> bool {
matches!(self.state, ConnectionState::Done)
}
}
#[test]
fn test_write_concurrent_connections() -> anyhow::Result<()> {
let _ = env_logger::try_init();
maybe_setup_tracing();
let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE t(x)");
let num_connections = 4;
let num_inserts_per_connection = 100;
let mut connections = vec![];
for connection_idx in 0..num_connections {
let conn = tmp_db.connect_limbo();
let mut queries = Vec::with_capacity(num_inserts_per_connection);
for query_idx in 0..num_inserts_per_connection {
queries.push(format!(
"INSERT INTO t VALUES({})",
(connection_idx * num_inserts_per_connection) + query_idx
));
}
connections.push(ConnectionPlan {
queries,
conn,
state: ConnectionState::PrepareQuery { query_idx: 0 },
});
}
let mut connections_finished = 0;
while connections_finished != num_connections {
for conn in &mut connections {
if conn.is_finished() {
continue;
}
let finished = conn.step()?;
if finished {
connections_finished += 1;
}
}
}
let conn = tmp_db.connect_limbo();
// run_query_on_row(&tmp_db, &conn, "SELECT * from t", |row: &Row| {
// for value in row.get_values() {
// tracing::info!("got value {:?}", value);
// }
// })?;
run_query_on_row(&tmp_db, &conn, "SELECT count(1) from t", |row: &Row| {
let count = row.get::<i64>(0).unwrap();
assert_eq!(
count,
(num_connections * num_inserts_per_connection) as i64,
"received wrong number of rows"
);
})?;
Ok(())
}
fn run_query(tmp_db: &TempDatabase, conn: &Arc<Connection>, query: &str) -> anyhow::Result<()> {
run_query_core(tmp_db, conn, query, None::<fn(&Row)>)
}