diff --git a/core/storage/wal.rs b/core/storage/wal.rs index d21267bbb..b5c875cf2 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -874,7 +874,9 @@ impl Wal for WalFile { } } - if best_idx == -1 { + if best_idx == -1 || best_mark != shared_max as u32 { + // If we cannot find a valid slot or the highest readmark has a stale max frame, we must return busy; + // otherwise we would not see some committed changes. return Ok((LimboResult::Busy, db_changed)); } diff --git a/tests/integration/fuzz_transaction/mod.rs b/tests/integration/fuzz_transaction/mod.rs index 2d83b36ac..6e6046b78 100644 --- a/tests/integration/fuzz_transaction/mod.rs +++ b/tests/integration/fuzz_transaction/mod.rs @@ -493,9 +493,10 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { const NUM_ITERATIONS: usize = 50; const OPERATIONS_PER_CONNECTION: usize = 30; - const NUM_CONNECTIONS: usize = 2; + const MAX_NUM_CONNECTIONS: usize = 8; for iteration in 0..NUM_ITERATIONS { + let num_connections = rng.random_range(2..=MAX_NUM_CONNECTIONS); println!("--- Seed {seed} Iteration {iteration} ---"); // Create a fresh database for each iteration let tempfile = tempfile::NamedTempFile::new().unwrap(); @@ -529,7 +530,7 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { // Create connections let mut connections = Vec::new(); - for conn_id in 0..NUM_CONNECTIONS { + for conn_id in 0..num_connections { let conn = db.connect().unwrap(); // Create table if it doesn't exist @@ -735,18 +736,36 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { } let mut real_rows = Vec::new(); - while let Some(row) = rows.next().await.unwrap() { - let Value::Integer(id) = row.get_value(0).unwrap() else { - panic!("Unexpected value for id: {:?}", row.get_value(0)); - }; - let mut other_columns = HashMap::new(); - for i in 1..columns.len() { - let column = columns.get(i).unwrap(); - let value = row.get_value(i).unwrap(); - other_columns.insert(column.name().to_string(), value); + let ok = loop { + match rows.next().await { + Err(e) => { + if !e.to_string().contains("database is locked") { + panic!("Unexpected error during select: {e}"); + } + break false; + } + Ok(None) => { + break true; + } + Ok(Some(row)) => { + let Value::Integer(id) = row.get_value(0).unwrap() else { + panic!("Unexpected value for id: {:?}", row.get_value(0)); + }; + let mut other_columns = HashMap::new(); + for i in 1..columns.len() { + let column = columns.get(i).unwrap(); + let value = row.get_value(i).unwrap(); + other_columns.insert(column.name().to_string(), value); + } + real_rows.push(DbRow { id, other_columns }); + } } - real_rows.push(DbRow { id, other_columns }); + }; + + if !ok { + continue; } + real_rows.sort_by_key(|r| r.id); let mut expected_rows = visible_rows.clone();