From c30d320cab76af253e50e8f89fc9b98f60fd3474 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Thu, 11 Sep 2025 15:58:13 +0300 Subject: [PATCH] Fix: read transaction cannot be allowed to start with a stale max frame If both of the following are true: 1. All read locks are already held 2. The highest readmark of any read lock is less than the committed max frame Then we must return Busy to the reader, because otherwise they would begin a transaction with a stale local max frame, and thus not see some committed changes. --- core/storage/wal.rs | 4 ++- tests/integration/fuzz_transaction/mod.rs | 43 ++++++++++++++++------- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index d21267bbb..b5c875cf2 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -874,7 +874,9 @@ impl Wal for WalFile { } } - if best_idx == -1 { + if best_idx == -1 || best_mark != shared_max as u32 { + // If we cannot find a valid slot or the highest readmark has a stale max frame, we must return busy; + // otherwise we would not see some committed changes. return Ok((LimboResult::Busy, db_changed)); } diff --git a/tests/integration/fuzz_transaction/mod.rs b/tests/integration/fuzz_transaction/mod.rs index 2d83b36ac..6e6046b78 100644 --- a/tests/integration/fuzz_transaction/mod.rs +++ b/tests/integration/fuzz_transaction/mod.rs @@ -493,9 +493,10 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { const NUM_ITERATIONS: usize = 50; const OPERATIONS_PER_CONNECTION: usize = 30; - const NUM_CONNECTIONS: usize = 2; + const MAX_NUM_CONNECTIONS: usize = 8; for iteration in 0..NUM_ITERATIONS { + let num_connections = rng.random_range(2..=MAX_NUM_CONNECTIONS); println!("--- Seed {seed} Iteration {iteration} ---"); // Create a fresh database for each iteration let tempfile = tempfile::NamedTempFile::new().unwrap(); @@ -529,7 +530,7 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { // Create connections let mut connections = Vec::new(); - for conn_id in 0..NUM_CONNECTIONS { + for conn_id in 0..num_connections { let conn = db.connect().unwrap(); // Create table if it doesn't exist @@ -735,18 +736,36 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { } let mut real_rows = Vec::new(); - while let Some(row) = rows.next().await.unwrap() { - let Value::Integer(id) = row.get_value(0).unwrap() else { - panic!("Unexpected value for id: {:?}", row.get_value(0)); - }; - let mut other_columns = HashMap::new(); - for i in 1..columns.len() { - let column = columns.get(i).unwrap(); - let value = row.get_value(i).unwrap(); - other_columns.insert(column.name().to_string(), value); + let ok = loop { + match rows.next().await { + Err(e) => { + if !e.to_string().contains("database is locked") { + panic!("Unexpected error during select: {e}"); + } + break false; + } + Ok(None) => { + break true; + } + Ok(Some(row)) => { + let Value::Integer(id) = row.get_value(0).unwrap() else { + panic!("Unexpected value for id: {:?}", row.get_value(0)); + }; + let mut other_columns = HashMap::new(); + for i in 1..columns.len() { + let column = columns.get(i).unwrap(); + let value = row.get_value(i).unwrap(); + other_columns.insert(column.name().to_string(), value); + } + real_rows.push(DbRow { id, other_columns }); + } } - real_rows.push(DbRow { id, other_columns }); + }; + + if !ok { + continue; } + real_rows.sort_by_key(|r| r.id); let mut expected_rows = visible_rows.clone();