mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-07 09:14:26 +01:00
Add some table ID integrity checks to logical log recovery
This commit is contained in:
@@ -12,11 +12,13 @@ use crate::turso_assert;
|
||||
use crate::types::IOCompletions;
|
||||
use crate::types::IOResult;
|
||||
use crate::types::ImmutableRecord;
|
||||
use crate::types::RecordCursor;
|
||||
use crate::types::SeekResult;
|
||||
use crate::Completion;
|
||||
use crate::File;
|
||||
use crate::IOExt;
|
||||
use crate::LimboError;
|
||||
use crate::RefValue;
|
||||
use crate::Result;
|
||||
use crate::Statement;
|
||||
use crate::StepResult;
|
||||
@@ -1927,15 +1929,46 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
match reader.next_record(&pager.io).unwrap() {
|
||||
StreamingResult::InsertRow { row, rowid } => {
|
||||
tracing::trace!("read {row:?} with rowid {rowid:?}");
|
||||
if self.table_id_to_rootpage.get(&row.id.table_id).is_none() {
|
||||
self.insert_table_id_to_rootpage(row.id.table_id, None);
|
||||
if rowid.table_id == SQLITE_SCHEMA_MVCC_TABLE_ID {
|
||||
// Sqlite schema row version inserts
|
||||
let row_data = row.data.clone();
|
||||
let record = ImmutableRecord::from_bin_record(row_data);
|
||||
let mut record_cursor = RecordCursor::new();
|
||||
let record_values = record_cursor.get_values(&record).unwrap();
|
||||
let RefValue::Integer(root_page) = record_values[3] else {
|
||||
panic!(
|
||||
"Expected integer value for root page, got {:?}",
|
||||
record_values[3]
|
||||
);
|
||||
};
|
||||
if root_page < 0 {
|
||||
let table_id = self.get_table_id_from_root_page(root_page);
|
||||
// Not a checkpointed table; must not have a root page in mapping
|
||||
if let Some(entry) = self.table_id_to_rootpage.get(&table_id) {
|
||||
if let Some(value) = *entry.value() {
|
||||
panic!("Logical log contains an insertion of a sqlite_schema record that has both a negative root page and a positive root page: {root_page} & {value}");
|
||||
}
|
||||
}
|
||||
self.insert_table_id_to_rootpage(table_id, None);
|
||||
} else {
|
||||
// A checkpointed table; sqlite_schema root page value must match the in-memory mapping
|
||||
let table_id = self.get_table_id_from_root_page(root_page);
|
||||
let Some(entry) = self.table_id_to_rootpage.get(&table_id) else {
|
||||
panic!("Logical log contains root page reference {root_page} that does not exist in the table_id_to_rootpage map");
|
||||
};
|
||||
let Some(value) = *entry.value() else {
|
||||
panic!("Logical log contains root page reference {root_page} that does not have a root page in the table_id_to_rootpage map");
|
||||
};
|
||||
assert!(value == root_page as u64, "Logical log contains root page reference {root_page} that does not match the root page in the table_id_to_rootpage map({value})");
|
||||
}
|
||||
} else {
|
||||
// Other table row version inserts; table id must exist in mapping (otherwise there's a row version insert to an unknown table)
|
||||
assert!(self.table_id_to_rootpage.get(&rowid.table_id).is_some(), "Logical log contains a row version insert with a table id {} that does not exist in the table_id_to_rootpage map: {:?}", rowid.table_id, self.table_id_to_rootpage.iter().collect::<Vec<_>>());
|
||||
}
|
||||
self.insert(tx_id, row)?;
|
||||
}
|
||||
StreamingResult::DeleteRow { rowid } => {
|
||||
if self.table_id_to_rootpage.get(&rowid.table_id).is_none() {
|
||||
self.insert_table_id_to_rootpage(rowid.table_id, None);
|
||||
}
|
||||
assert!(self.table_id_to_rootpage.get(&rowid.table_id).is_some(), "Logical log contains a row version delete with a table id that does not exist in the table_id_to_rootpage map: {}", rowid.table_id);
|
||||
self.delete(tx_id, rowid)?;
|
||||
}
|
||||
StreamingResult::Eof => {
|
||||
|
||||
Reference in New Issue
Block a user