Merge 'Fix: read transaction cannot be allowed to start with a stale max frame' from Jussi Saurio

If both of the following are true:
1. All read locks are already held
2. The highest readmark of any read lock is less than the committed max
frame
Then we must return Busy to the reader, because otherwise they would
begin a transaction with a stale local max frame, and thus not see some
committed changes.
Closes #3016

Reviewed-by: Preston Thorpe <preston@turso.tech>

Closes #3023
This commit is contained in:
Jussi Saurio
2025-09-11 16:20:05 +03:00
committed by GitHub
2 changed files with 34 additions and 13 deletions

View File

@@ -874,7 +874,9 @@ impl Wal for WalFile {
}
}
if best_idx == -1 {
if best_idx == -1 || best_mark != shared_max as u32 {
// If we cannot find a valid slot or the highest readmark has a stale max frame, we must return busy;
// otherwise we would not see some committed changes.
return Ok((LimboResult::Busy, db_changed));
}

View File

@@ -493,9 +493,10 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
const NUM_ITERATIONS: usize = 50;
const OPERATIONS_PER_CONNECTION: usize = 30;
const NUM_CONNECTIONS: usize = 2;
const MAX_NUM_CONNECTIONS: usize = 8;
for iteration in 0..NUM_ITERATIONS {
let num_connections = rng.random_range(2..=MAX_NUM_CONNECTIONS);
println!("--- Seed {seed} Iteration {iteration} ---");
// Create a fresh database for each iteration
let tempfile = tempfile::NamedTempFile::new().unwrap();
@@ -529,7 +530,7 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
// Create connections
let mut connections = Vec::new();
for conn_id in 0..NUM_CONNECTIONS {
for conn_id in 0..num_connections {
let conn = db.connect().unwrap();
// Create table if it doesn't exist
@@ -735,18 +736,36 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
}
let mut real_rows = Vec::new();
while let Some(row) = rows.next().await.unwrap() {
let Value::Integer(id) = row.get_value(0).unwrap() else {
panic!("Unexpected value for id: {:?}", row.get_value(0));
};
let mut other_columns = HashMap::new();
for i in 1..columns.len() {
let column = columns.get(i).unwrap();
let value = row.get_value(i).unwrap();
other_columns.insert(column.name().to_string(), value);
let ok = loop {
match rows.next().await {
Err(e) => {
if !e.to_string().contains("database is locked") {
panic!("Unexpected error during select: {e}");
}
break false;
}
Ok(None) => {
break true;
}
Ok(Some(row)) => {
let Value::Integer(id) = row.get_value(0).unwrap() else {
panic!("Unexpected value for id: {:?}", row.get_value(0));
};
let mut other_columns = HashMap::new();
for i in 1..columns.len() {
let column = columns.get(i).unwrap();
let value = row.get_value(i).unwrap();
other_columns.insert(column.name().to_string(), value);
}
real_rows.push(DbRow { id, other_columns });
}
}
real_rows.push(DbRow { id, other_columns });
};
if !ok {
continue;
}
real_rows.sort_by_key(|r| r.id);
let mut expected_rows = visible_rows.clone();