core/mvcc/logical-log: test fuzz logical log read

This commit is contained in:
Pere Diaz Bou
2025-09-25 13:23:43 +02:00
parent dd0a71484d
commit 7b131b0b0d

View File

@@ -287,8 +287,8 @@ impl StreamingLogicalLogReader {
if self.is_eof() {
return Ok(StreamingResult::Eof);
}
let tx_id = self.consume_u64(io)?;
let checksum = self.consume_u64(io)?;
let _tx_id = self.consume_u64(io)?;
let _checksum = self.consume_u64(io)?;
let transaction_size = self.consume_u64(io)?;
self.state = StreamingState::NeedRow {
transaction_size,
@@ -309,7 +309,7 @@ impl StreamingLogicalLogReader {
}
let table_id = self.consume_u64(io)?;
let record_type = self.consume_u8(io)?;
let payload_size = self.consume_u64(io)?;
let _payload_size = self.consume_u64(io)?;
let mut bytes_read_on_row = 17; // table_id, record_type and payload_size
match LogRecordType::from_u8(record_type).unwrap() {
LogRecordType::DeleteRow => {
@@ -455,7 +455,13 @@ pub fn load_logical_log(
#[cfg(test)]
mod tests {
use std::{path::PathBuf, sync::Arc};
use std::{collections::HashSet, path::PathBuf, sync::Arc};
use rand::{thread_rng, Rng};
use rand_chacha::{
rand_core::{RngCore, SeedableRng},
ChaCha8Rng,
};
use crate::{
mvcc::{
@@ -470,7 +476,7 @@ mod tests {
OpenFlags, RefValue,
};
use super::load_logical_log;
use super::{load_logical_log, LogRecordType};
#[test]
fn test_logical_log_read() {
@@ -569,4 +575,117 @@ mod tests {
assert_eq!(foo.as_str(), value.as_str());
}
}
#[test]
fn test_logical_log_read_fuzz() {
let seed = thread_rng().gen();
let mut rng = ChaCha8Rng::seed_from_u64(seed);
let num_transactions = rng.next_u64() % 128;
let mut txns = vec![];
let mut present_rowids = HashSet::new();
let mut non_present_rowids = HashSet::new();
for _ in 0..num_transactions {
let num_operations = rng.next_u64() % 8;
let mut ops = vec![];
for _ in 0..num_operations {
let op_type = rng.next_u64() % 2;
match op_type {
0 => {
let row_id = rng.next_u64();
let rowid = RowID::new(1, row_id as i64);
let row = generate_simple_string_row(
rowid.table_id,
rowid.row_id,
&format!("row_{row_id}"),
);
ops.push((LogRecordType::InsertRow, Some(row), rowid));
present_rowids.insert(rowid);
non_present_rowids.remove(&rowid);
tracing::debug!("insert {rowid:?}");
}
1 => {
if present_rowids.is_empty() {
continue;
}
let row_id_pos = rng.next_u64() as usize % present_rowids.len();
let row_id = *present_rowids.iter().nth(row_id_pos).unwrap();
ops.push((LogRecordType::DeleteRow, None, row_id));
present_rowids.remove(&row_id);
non_present_rowids.insert(row_id);
tracing::debug!("removed {row_id:?}");
}
_ => unreachable!(),
}
}
txns.push(ops);
}
// let's not drop db as we don't want files to be removed
let db = MvccTestDbNoConn::new_with_random_db();
let (db_path, io, pager) = {
let conn = db.connect();
let pager = conn.pager.read().clone();
let mvcc_store = db.get_mvcc_store();
for ops in &txns {
let tx_id = mvcc_store.begin_tx(pager.clone()).unwrap();
for (op_type, maybe_row, rowid) in ops {
match op_type {
LogRecordType::DeleteRow => {
mvcc_store.delete(tx_id, *rowid).unwrap();
}
LogRecordType::InsertRow => {
mvcc_store
.insert(tx_id, maybe_row.as_ref().unwrap().clone())
.unwrap();
}
}
}
commit_tx(mvcc_store.clone(), &conn, tx_id).unwrap();
}
conn.close().unwrap();
let db = db.get_db();
(db.path.clone(), db.io.clone(), pager)
};
// Now try to read it back
let db_path = PathBuf::from(db_path);
let mut log_file = db_path.clone();
let filename = log_file
.file_name()
.and_then(|name| name.to_str())
.map(|s| format!("{s}-lg"))
.unwrap();
log_file.set_file_name(filename);
let file = io
.open_file(log_file.to_str().unwrap(), OpenFlags::ReadOnly, false)
.unwrap();
let mvcc_store = Arc::new(MvStore::new(LocalClock::new(), Storage::new(file.clone())));
load_logical_log(&mvcc_store, file, &io, &pager).unwrap();
// Check rowids that weren't deleted
let tx = mvcc_store.begin_tx(pager.clone()).unwrap();
for present_rowid in present_rowids {
let row = mvcc_store.read(tx, present_rowid).unwrap().unwrap();
let record = ImmutableRecord::from_bin_record(row.data.clone());
let values = record.get_values();
let foo = values.first().unwrap();
let RefValue::Text(foo) = foo else {
unreachable!()
};
assert_eq!(foo.as_str(), format!("row_{}", present_rowid.row_id as u64));
}
// Check rowids that were deleted
let tx = mvcc_store.begin_tx(pager.clone()).unwrap();
for present_rowid in non_present_rowids {
let row = mvcc_store.read(tx, present_rowid).unwrap();
assert!(
row.is_none(),
"row {present_rowid:?} should have been removed"
);
}
}
}