diff --git a/core/mvcc/persistent_storage/logical_log.rs b/core/mvcc/persistent_storage/logical_log.rs index fc5f36563..c9fa74bee 100644 --- a/core/mvcc/persistent_storage/logical_log.rs +++ b/core/mvcc/persistent_storage/logical_log.rs @@ -287,8 +287,8 @@ impl StreamingLogicalLogReader { if self.is_eof() { return Ok(StreamingResult::Eof); } - let tx_id = self.consume_u64(io)?; - let checksum = self.consume_u64(io)?; + let _tx_id = self.consume_u64(io)?; + let _checksum = self.consume_u64(io)?; let transaction_size = self.consume_u64(io)?; self.state = StreamingState::NeedRow { transaction_size, @@ -309,7 +309,7 @@ impl StreamingLogicalLogReader { } let table_id = self.consume_u64(io)?; let record_type = self.consume_u8(io)?; - let payload_size = self.consume_u64(io)?; + let _payload_size = self.consume_u64(io)?; let mut bytes_read_on_row = 17; // table_id, record_type and payload_size match LogRecordType::from_u8(record_type).unwrap() { LogRecordType::DeleteRow => { @@ -455,7 +455,13 @@ pub fn load_logical_log( #[cfg(test)] mod tests { - use std::{path::PathBuf, sync::Arc}; + use std::{collections::HashSet, path::PathBuf, sync::Arc}; + + use rand::{thread_rng, Rng}; + use rand_chacha::{ + rand_core::{RngCore, SeedableRng}, + ChaCha8Rng, + }; use crate::{ mvcc::{ @@ -470,7 +476,7 @@ mod tests { OpenFlags, RefValue, }; - use super::load_logical_log; + use super::{load_logical_log, LogRecordType}; #[test] fn test_logical_log_read() { @@ -569,4 +575,117 @@ mod tests { assert_eq!(foo.as_str(), value.as_str()); } } + + #[test] + fn test_logical_log_read_fuzz() { + let seed = thread_rng().gen(); + let mut rng = ChaCha8Rng::seed_from_u64(seed); + let num_transactions = rng.next_u64() % 128; + let mut txns = vec![]; + let mut present_rowids = HashSet::new(); + let mut non_present_rowids = HashSet::new(); + for _ in 0..num_transactions { + let num_operations = rng.next_u64() % 8; + let mut ops = vec![]; + for _ in 0..num_operations { + let op_type = rng.next_u64() % 2; + match op_type { + 0 => { + let row_id = rng.next_u64(); + let rowid = RowID::new(1, row_id as i64); + let row = generate_simple_string_row( + rowid.table_id, + rowid.row_id, + &format!("row_{row_id}"), + ); + ops.push((LogRecordType::InsertRow, Some(row), rowid)); + present_rowids.insert(rowid); + non_present_rowids.remove(&rowid); + tracing::debug!("insert {rowid:?}"); + } + 1 => { + if present_rowids.is_empty() { + continue; + } + let row_id_pos = rng.next_u64() as usize % present_rowids.len(); + let row_id = *present_rowids.iter().nth(row_id_pos).unwrap(); + ops.push((LogRecordType::DeleteRow, None, row_id)); + present_rowids.remove(&row_id); + non_present_rowids.insert(row_id); + tracing::debug!("removed {row_id:?}"); + } + _ => unreachable!(), + } + } + txns.push(ops); + } + // let's not drop db as we don't want files to be removed + let db = MvccTestDbNoConn::new_with_random_db(); + let (db_path, io, pager) = { + let conn = db.connect(); + let pager = conn.pager.read().clone(); + let mvcc_store = db.get_mvcc_store(); + + for ops in &txns { + let tx_id = mvcc_store.begin_tx(pager.clone()).unwrap(); + for (op_type, maybe_row, rowid) in ops { + match op_type { + LogRecordType::DeleteRow => { + mvcc_store.delete(tx_id, *rowid).unwrap(); + } + LogRecordType::InsertRow => { + mvcc_store + .insert(tx_id, maybe_row.as_ref().unwrap().clone()) + .unwrap(); + } + } + } + commit_tx(mvcc_store.clone(), &conn, tx_id).unwrap(); + } + + conn.close().unwrap(); + let db = db.get_db(); + (db.path.clone(), db.io.clone(), pager) + }; + + // Now try to read it back + let db_path = PathBuf::from(db_path); + let mut log_file = db_path.clone(); + let filename = log_file + .file_name() + .and_then(|name| name.to_str()) + .map(|s| format!("{s}-lg")) + .unwrap(); + log_file.set_file_name(filename); + + let file = io + .open_file(log_file.to_str().unwrap(), OpenFlags::ReadOnly, false) + .unwrap(); + let mvcc_store = Arc::new(MvStore::new(LocalClock::new(), Storage::new(file.clone()))); + load_logical_log(&mvcc_store, file, &io, &pager).unwrap(); + + // Check rowids that weren't deleted + let tx = mvcc_store.begin_tx(pager.clone()).unwrap(); + for present_rowid in present_rowids { + let row = mvcc_store.read(tx, present_rowid).unwrap().unwrap(); + let record = ImmutableRecord::from_bin_record(row.data.clone()); + let values = record.get_values(); + let foo = values.first().unwrap(); + let RefValue::Text(foo) = foo else { + unreachable!() + }; + + assert_eq!(foo.as_str(), format!("row_{}", present_rowid.row_id as u64)); + } + + // Check rowids that were deleted + let tx = mvcc_store.begin_tx(pager.clone()).unwrap(); + for present_rowid in non_present_rowids { + let row = mvcc_store.read(tx, present_rowid).unwrap(); + assert!( + row.is_none(), + "row {present_rowid:?} should have been removed" + ); + } + } }