diff --git a/core/mvcc/persistent_storage/logical_log.rs b/core/mvcc/persistent_storage/logical_log.rs
index 0e8ad4918..1ba1a07d1 100644
--- a/core/mvcc/persistent_storage/logical_log.rs
+++ b/core/mvcc/persistent_storage/logical_log.rs
@@ -1,6 +1,9 @@
 use crate::{
-    mvcc::database::LogRecord, turso_assert, types::IOCompletions, Buffer, Completion,
-    CompletionError, Result,
+    mvcc::database::{LogRecord, RowVersion},
+    storage::sqlite3_ondisk::write_varint_to_vec,
+    turso_assert,
+    types::IOCompletions,
+    Buffer, Completion, CompletionError, Result,
 };
 
 use std::sync::Arc;
@@ -11,8 +14,105 @@ pub struct LogicalLog {
     offset: u64,
 }
 
-const TOMBSTONE: u8 = 1;
-const NOT_TOMBSTONE: u8 = 0;
+/// The log's header: the first 64 bytes of any logical log file.
+/// The header is 64 bytes at maximum; fields added must not exceed that size. If the
+/// fields occupy fewer bytes than that, the remainder is padded with zeroes.
+struct LogHeader {
+    version: u8,
+    salt: u64,
+    encrypted: u8, // 0 means not encrypted
+}
+
+const LOG_HEADER_MAX_SIZE: usize = 64;
+const LOG_HEADER_PADDING: [u8; LOG_HEADER_MAX_SIZE] = [0; LOG_HEADER_MAX_SIZE];
+
+impl LogHeader {
+    pub fn serialize(&self, buffer: &mut Vec<u8>) {
+        let buffer_size_start = buffer.len();
+        buffer.push(self.version);
+        buffer.extend_from_slice(&self.salt.to_be_bytes());
+        buffer.push(self.encrypted);
+
+        let header_size_before_padding = buffer.len() - buffer_size_start;
+        debug_assert!(header_size_before_padding <= LOG_HEADER_MAX_SIZE);
+        let padding = LOG_HEADER_MAX_SIZE - header_size_before_padding;
+        buffer.extend_from_slice(&LOG_HEADER_PADDING[0..padding]);
+    }
+}
+
+#[repr(u8)]
+#[derive(Debug, Clone, Copy, PartialEq)]
+enum LogRecordType {
+    DeleteRow = 0,
+    InsertRow = 1,
+}
+
+impl LogRecordType {
+    fn from_row_version(row_version: &RowVersion) -> Self {
+        if row_version.end.is_some() {
+            Self::DeleteRow
+        } else {
+            Self::InsertRow
+        }
+    }
+
+    #[allow(dead_code)]
+    fn from_u8(value: u8) -> Option<Self> {
+        match value {
+            0 => Some(LogRecordType::DeleteRow),
+            1 => Some(LogRecordType::InsertRow),
+            _ => None,
+        }
+    }
+
+    fn as_u8(&self) -> u8 {
+        *self as u8
+    }
+
+    /// Serialize a row_version into the on-disk format.
+    /// Format of a "row" (maybe we should rename this, since "row" is not general
+    /// enough for future types of values):
+    ///
+    /// * table_id (root page) -> u64
+    /// * row type -> u8
+    ///
+    /// (by row type)
+    /// Delete:
+    /// * Payload length -> u64
+    /// * Rowid -> varint
+    ///
+    /// Insert:
+    /// * Payload length -> u64
+    /// * Rowid -> varint
+    /// * Data size -> varint
+    /// * Data -> [u8] (data size bytes)
+    fn serialize(&self, buffer: &mut Vec<u8>, row_version: &RowVersion) {
+        buffer.extend_from_slice(&row_version.row.id.table_id.to_be_bytes());
+        buffer.extend_from_slice(&self.as_u8().to_be_bytes());
+        let size_before_payload = buffer.len();
+        match self {
+            LogRecordType::DeleteRow => {
+                write_varint_to_vec(row_version.row.id.row_id as u64, buffer);
+            }
+            LogRecordType::InsertRow => {
+                write_varint_to_vec(row_version.row.id.row_id as u64, buffer);
+
+                let data = &row_version.row.data;
+                // Maybe this isn't needed? We might already be able to infer the data
+                // size from the payload size anyway.
+                write_varint_to_vec(data.len() as u64, buffer);
+                buffer.extend_from_slice(data);
+            }
+        }
+        // FIXME: remove the shifting of bytes we do by inserting payload sizes before everything.
+        // Should payload_size be a varint?
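+        // The payload length is only known once the payload has been written, so it
+        // is inserted retroactively below: `splice` with an empty range injects the
+        // 8-byte big-endian length at `size_before_payload`, shifting the
+        // already-serialized payload to the right.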
+        let payload_size = (buffer.len() - size_before_payload) as u64;
+        buffer.splice(
+            size_before_payload..size_before_payload,
+            payload_size.to_be_bytes(),
+        );
+    }
+}
 
 impl LogicalLog {
     pub fn new(file: Arc<dyn File>) -> Self {
@@ -21,19 +121,41 @@ impl LogicalLog {
     pub fn log_tx(&mut self, tx: &LogRecord) -> Result<IOCompletions> {
         let mut buffer = Vec::new();
+
+        // 1. Serialize the log header if this is the first write.
+        let is_first_write = self.offset == 0;
+        if is_first_write {
+            let header = LogHeader {
+                version: 1,
+                salt: 0, // TODO: add checksums!
+                encrypted: 0,
+            };
+            header.serialize(&mut buffer);
+        }
+
+        // 2. Serialize the transaction.
         buffer.extend_from_slice(&tx.tx_timestamp.to_be_bytes());
+        // TODO: checksum
+        buffer.extend_from_slice(&[0; 8]);
+        let buffer_pos_for_rows_size = buffer.len();
+
+        // 3. Serialize the rows.
         tx.row_versions.iter().for_each(|row_version| {
-            let data = &row_version.row.data;
-            buffer.extend_from_slice(&row_version.row.id.table_id.to_be_bytes());
-            buffer.extend_from_slice(&row_version.row.id.row_id.to_be_bytes());
-            if row_version.end.is_some() {
-                buffer.extend_from_slice(&TOMBSTONE.to_be_bytes());
-            } else {
-                buffer.extend_from_slice(&NOT_TOMBSTONE.to_be_bytes());
-                buffer.extend_from_slice(&row_version.row.column_count.to_be_bytes());
-                buffer.extend_from_slice(data);
-            }
+            let row_type = LogRecordType::from_row_version(row_version);
+            row_type.serialize(&mut buffer, row_version);
         });
+
+        // 4. Splice in the rows size and append the transaction's end marker. The marker
+        // is the value the log offset will have after this buffer has been written.
+        let rows_size = (buffer.len() - buffer_pos_for_rows_size) as u64;
+        buffer.splice(
+            buffer_pos_for_rows_size..buffer_pos_for_rows_size,
+            rows_size.to_be_bytes(),
+        );
+        let offset_after_buffer = self.offset + buffer.len() as u64 + size_of::<u64>() as u64;
+        buffer.extend_from_slice(&offset_after_buffer.to_be_bytes());
+
+        // 5. Write to disk.
         let buffer = Arc::new(Buffer::new(buffer));
         let c = Completion::new_write({
             let buffer = buffer.clone();
@@ -48,6 +170,7 @@
                 );
             }
         });
+        let buffer_len = buffer.len();
         let c = self.file.pwrite(self.offset, buffer, c)?;
         self.offset += buffer_len as u64;
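
For readers reviewing the format, the sketch below shows how a single row record produced by `LogRecordType::serialize` could be decoded. It is an illustration only, not code from this patch: `parse_row_record` and `read_varint` are hypothetical helpers (the crate's real varint reader lives in `sqlite3_ondisk` and its exact API is not shown in this diff), and the varint encoding is assumed to be SQLite's, matching `write_varint_to_vec`.

```rust
/// Hypothetical decoder for one serialized row record; the field order mirrors
/// LogRecordType::serialize in the diff above: table_id (u64 BE), row type (u8),
/// payload length (u64 BE), then the payload itself.
fn parse_row_record(buf: &[u8]) -> Option<(u64, u8, u64, Option<Vec<u8>>)> {
    let table_id = u64::from_be_bytes(buf.get(0..8)?.try_into().ok()?);
    let row_type = *buf.get(8)?;
    let payload_len = u64::from_be_bytes(buf.get(9..17)?.try_into().ok()?) as usize;
    let payload = buf.get(17..17 + payload_len)?;
    let (row_id, n) = read_varint(payload)?;
    let data = match row_type {
        0 => None, // DeleteRow: the payload is just the rowid
        1 => {
            // InsertRow: data-size varint, then the data bytes
            let (data_len, m) = read_varint(payload.get(n..)?)?;
            Some(payload.get(n + m..n + m + data_len as usize)?.to_vec())
        }
        _ => return None,
    };
    Some((table_id, row_type, row_id, data))
}

/// SQLite-style varint reader (assumed to match write_varint_to_vec): up to nine
/// bytes, seven payload bits per byte with the high bit as a continuation flag,
/// the ninth byte contributing all eight bits. Returns the value and bytes consumed.
fn read_varint(buf: &[u8]) -> Option<(u64, usize)> {
    let mut value: u64 = 0;
    for (i, &byte) in buf.iter().take(9).enumerate() {
        if i == 8 {
            return Some(((value << 8) | byte as u64, 9));
        }
        value = (value << 7) | (byte & 0x7f) as u64;
        if byte & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None
}
```

One upside of the fixed u64 payload length ahead of each record, which the author's comment questions, is that recovery can skip a whole record without decoding its varints.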