From e87226548ccd595eb97ca21b63e9eb11207f0b35 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Fri, 12 Sep 2025 13:49:40 +0000 Subject: [PATCH] core/mvcc: fix concurrent tests mvcc --- core/mvcc/database/tests.rs | 90 ++++++++++++++++++++++++++++++++----- 1 file changed, 78 insertions(+), 12 deletions(-) diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index f5d7f7d08..a348d89d0 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -1235,10 +1235,21 @@ fn test_commit_without_tx() { fn get_rows(conn: &Arc, query: &str) -> Vec> { let mut stmt = conn.prepare(query).unwrap(); let mut rows = Vec::new(); - while let StepResult::Row = stmt.step().unwrap() { - let row = stmt.row().unwrap(); - let values = row.get_values().cloned().collect::>(); - rows.push(values); + loop { + match stmt.step().unwrap() { + StepResult::Row => { + let row = stmt.row().unwrap(); + let values = row.get_values().cloned().collect::>(); + rows.push(values); + } + StepResult::Done => break, + StepResult::IO => { + stmt.run_once().unwrap(); + } + StepResult::Interrupt | StepResult::Busy => { + panic!("unexpected step result"); + } + } } rows } @@ -1252,17 +1263,21 @@ fn test_concurrent_writes() { current_statement: Option, } let db = MvccTestDbNoConn::new_with_random_db(); - let mut connecitons = Vec::new(); + let mut connections = Vec::new(); { let conn = db.connect(); conn.execute("CREATE TABLE test (x)").unwrap(); conn.close().unwrap(); } - for i in 0..2 { + let num_connections = 20; + let num_inserts_per_connection = 10000; + for i in 0..num_connections { let conn = db.connect(); - let mut inserts = ((100 * i)..(100 * (i + 1))).collect::>(); + let mut inserts = ((num_inserts_per_connection * i) + ..(num_inserts_per_connection * (i + 1))) + .collect::>(); inserts.reverse(); - connecitons.push(ConnectionState { + connections.push(ConnectionState { conn, inserts, current_statement: None, @@ -1271,14 +1286,14 @@ fn test_concurrent_writes() { loop { let mut all_finished = true; - for conn in &mut connecitons { - if !conn.inserts.is_empty() && conn.current_statement.is_none() { + for conn in &mut connections { + if !conn.inserts.is_empty() || conn.current_statement.is_some() { all_finished = false; break; } } - for (conn_id, conn) in connecitons.iter_mut().enumerate() { - println!("connection {conn_id} inserts: {:?}", conn.inserts); + for (conn_id, conn) in connections.iter_mut().enumerate() { + // println!("connection {conn_id} inserts: {:?}", conn.inserts); if conn.current_statement.is_none() && !conn.inserts.is_empty() { let write = conn.inserts.pop().unwrap(); println!("inserting row {write} from connection {conn_id}"); @@ -1291,6 +1306,7 @@ fn test_concurrent_writes() { if conn.current_statement.is_none() { continue; } + println!("connection step {conn_id}"); let stmt = conn.current_statement.as_mut().unwrap(); match stmt.step().unwrap() { // These you be only possible cases in write concurrency. @@ -1298,11 +1314,17 @@ fn test_concurrent_writes() { // No interrupt because insert doesn't interrupt // No busy because insert in mvcc should be multi concurrent write StepResult::Done => { + println!("connection {conn_id} done"); conn.current_statement = None; } StepResult::IO => { // let's skip doing I/O here, we want to perform io only after all the statements are stepped } + StepResult::Busy => { + println!("connection {conn_id} busy"); + // stmt.reprepare().unwrap(); + unreachable!(); + } _ => { unreachable!() } @@ -1311,7 +1333,51 @@ fn test_concurrent_writes() { db.get_db().io.step().unwrap(); if all_finished { + println!("all finished"); break; } } + + // Now let's find out if we wrote everything we intended to write. + let conn = db.connect(); + let rows = get_rows(&conn, "SELECT * FROM test ORDER BY x ASC"); + assert_eq!( + rows.len() as i64, + num_connections * num_inserts_per_connection + ); + for (row_id, row) in rows.iter().enumerate() { + assert_eq!(row[0].as_int().unwrap(), row_id as i64); + } + conn.close().unwrap(); +} + +fn generate_batched_insert(num_inserts: usize) -> String { + let mut inserts = String::from("INSERT INTO test (x) VALUES "); + for i in 0..num_inserts { + inserts.push_str(&format!("({i})")); + if i < num_inserts - 1 { + inserts.push(','); + } + } + inserts.push(';'); + inserts +} +#[test] +#[ignore] +fn test_batch_writes() { + let mut start = 0; + let mut end = 5000; + while start < end { + let i = ((end - start) / 2) + start; + let db = MvccTestDbNoConn::new_with_random_db(); + let conn = db.connect(); + conn.execute("CREATE TABLE test (x)").unwrap(); + let inserts = generate_batched_insert(i); + if conn.execute(inserts.clone()).is_err() { + end = i; + } else { + start = i + 1; + } + } + println!("start: {start} end: {end}"); }