diff --git a/core/mvcc/persistent_storage/logical_log.rs b/core/mvcc/persistent_storage/logical_log.rs index 077a1fae5..7d9b658d4 100644 --- a/core/mvcc/persistent_storage/logical_log.rs +++ b/core/mvcc/persistent_storage/logical_log.rs @@ -107,6 +107,10 @@ impl LogRecordType { /// * Data -> [u8] (data size length) fn serialize(&self, buffer: &mut Vec, row_version: &RowVersion) { let table_id_i64: i64 = row_version.row.id.table_id.into(); + assert!( + table_id_i64 < 0, + "table_id_i64 should be negative, but got {table_id_i64}" + ); buffer.extend_from_slice(&table_id_i64.to_be_bytes()); buffer.extend_from_slice(&self.as_u8().to_be_bytes()); let size_before_payload = buffer.len(); @@ -316,7 +320,7 @@ impl StreamingLogicalLogReader { self.state = StreamingState::NeedTransactionStart; continue; } - let table_id = MVTableId::from(self.consume_u64(io)? as i64); + let table_id = MVTableId::from(self.consume_i64(io)?); let record_type = self.consume_u8(io)?; let _payload_size = self.consume_u64(io)?; let mut bytes_read_on_row = 17; // table_id, record_type and payload_size @@ -371,6 +375,17 @@ impl StreamingLogicalLogReader { Ok(r) } + fn consume_i64(&mut self, io: &Arc) -> Result { + self.read_more_data(io, 8)?; + let r = i64::from_be_bytes( + self.buffer.read().unwrap()[self.buffer_offset..self.buffer_offset + 8] + .try_into() + .unwrap(), + ); + self.buffer_offset += 8; + Ok(r) + } + fn consume_u64(&mut self, io: &Arc) -> Result { self.read_more_data(io, 8)?; let r = u64::from_be_bytes(