From e68c652f8f24265aef498bb7b0455887e0a4cd1f Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Tue, 30 Sep 2025 17:19:21 +0300 Subject: [PATCH] Add some table ID integrity checks to logical log recovery --- core/mvcc/database/mod.rs | 43 ++++++++++++++++++++++++++++++++++----- 1 file changed, 38 insertions(+), 5 deletions(-) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 10d26b8f1..a11689e27 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -12,11 +12,13 @@ use crate::turso_assert; use crate::types::IOCompletions; use crate::types::IOResult; use crate::types::ImmutableRecord; +use crate::types::RecordCursor; use crate::types::SeekResult; use crate::Completion; use crate::File; use crate::IOExt; use crate::LimboError; +use crate::RefValue; use crate::Result; use crate::Statement; use crate::StepResult; @@ -1927,15 +1929,46 @@ impl MvStore { match reader.next_record(&pager.io).unwrap() { StreamingResult::InsertRow { row, rowid } => { tracing::trace!("read {row:?} with rowid {rowid:?}"); - if self.table_id_to_rootpage.get(&row.id.table_id).is_none() { - self.insert_table_id_to_rootpage(row.id.table_id, None); + if rowid.table_id == SQLITE_SCHEMA_MVCC_TABLE_ID { + // Sqlite schema row version inserts + let row_data = row.data.clone(); + let record = ImmutableRecord::from_bin_record(row_data); + let mut record_cursor = RecordCursor::new(); + let record_values = record_cursor.get_values(&record).unwrap(); + let RefValue::Integer(root_page) = record_values[3] else { + panic!( + "Expected integer value for root page, got {:?}", + record_values[3] + ); + }; + if root_page < 0 { + let table_id = self.get_table_id_from_root_page(root_page); + // Not a checkpointed table; must not have a root page in mapping + if let Some(entry) = self.table_id_to_rootpage.get(&table_id) { + if let Some(value) = *entry.value() { + panic!("Logical log contains an insertion of a sqlite_schema record that has both a negative root page and a positive root page: {root_page} & {value}"); + } + } + self.insert_table_id_to_rootpage(table_id, None); + } else { + // A checkpointed table; sqlite_schema root page value must match the in-memory mapping + let table_id = self.get_table_id_from_root_page(root_page); + let Some(entry) = self.table_id_to_rootpage.get(&table_id) else { + panic!("Logical log contains root page reference {root_page} that does not exist in the table_id_to_rootpage map"); + }; + let Some(value) = *entry.value() else { + panic!("Logical log contains root page reference {root_page} that does not have a root page in the table_id_to_rootpage map"); + }; + assert!(value == root_page as u64, "Logical log contains root page reference {root_page} that does not match the root page in the table_id_to_rootpage map({value})"); + } + } else { + // Other table row version inserts; table id must exist in mapping (otherwise there's a row version insert to an unknown table) + assert!(self.table_id_to_rootpage.get(&rowid.table_id).is_some(), "Logical log contains a row version insert with a table id {} that does not exist in the table_id_to_rootpage map: {:?}", rowid.table_id, self.table_id_to_rootpage.iter().collect::>()); } self.insert(tx_id, row)?; } StreamingResult::DeleteRow { rowid } => { - if self.table_id_to_rootpage.get(&rowid.table_id).is_none() { - self.insert_table_id_to_rootpage(rowid.table_id, None); - } + assert!(self.table_id_to_rootpage.get(&rowid.table_id).is_some(), "Logical log contains a row version delete with a table id that does not exist in the table_id_to_rootpage map: {}", rowid.table_id); self.delete(tx_id, rowid)?; } StreamingResult::Eof => {