From 36a728d98428ac37fb3464b67948dc7cc4f60db7 Mon Sep 17 00:00:00 2001
From: Pere Diaz Bou
Date: Mon, 22 Sep 2025 12:43:54 +0200
Subject: [PATCH 1/8] core/mvcc/logical-log: add format for row types

---
 core/mvcc/persistent_storage/logical_log.rs | 95 ++++++++++++++++++---
 1 file changed, 83 insertions(+), 12 deletions(-)

diff --git a/core/mvcc/persistent_storage/logical_log.rs b/core/mvcc/persistent_storage/logical_log.rs
index 0e8ad4918..c3ddf8ced 100644
--- a/core/mvcc/persistent_storage/logical_log.rs
+++ b/core/mvcc/persistent_storage/logical_log.rs
@@ -1,6 +1,9 @@
 use crate::{
-    mvcc::database::LogRecord, turso_assert, types::IOCompletions, Buffer, Completion,
-    CompletionError, Result,
+    mvcc::database::{LogRecord, RowVersion},
+    storage::sqlite3_ondisk::{write_varint, write_varint_to_vec},
+    turso_assert,
+    types::IOCompletions,
+    Buffer, Completion, CompletionError, Result,
 };
 use std::sync::Arc;
 
@@ -14,6 +17,81 @@ pub struct LogicalLog {
 const TOMBSTONE: u8 = 1;
 const NOT_TOMBSTONE: u8 = 0;
 
+#[repr(u8)]
+#[derive(Debug, Clone, Copy, PartialEq)]
+enum LogRowType {
+    Delete = 0,
+    Insert = 1,
+}
+
+impl LogRowType {
+    fn from_row_version(row_version: &RowVersion) -> Self {
+        if row_version.end.is_some() {
+            Self::Delete
+        } else {
+            Self::Insert
+        }
+    }
+
+    #[allow(dead_code)]
+    fn from_u8(value: u8) -> Option<Self> {
+        match value {
+            0 => Some(LogRowType::Delete),
+            1 => Some(LogRowType::Insert),
+            _ => None,
+        }
+    }
+
+    fn as_u8(&self) -> u8 {
+        *self as u8
+    }
+
+    /// Serialize a row_version into on-disk format.
+    /// Format of a "row" (maybe we could change the name because row is not general enough for
+    /// future types of values):
+    ///
+    /// * table_id (root page) -> u64
+    /// * row type -> u8
+    ///
+    /// (by row type)
+    /// Delete:
+    /// * Payload length -> u64
+    /// * Rowid -> varint
+    ///
+    /// Insert:
+    /// * Payload length -> u64
+    /// * Data size -> varint
+    /// * Rowid -> varint
+    /// * Data -> [u8] (data size length)
+    fn serialize(&self, buffer: &mut Vec<u8>, row_version: &RowVersion) {
+        buffer.extend_from_slice(&row_version.row.id.table_id.to_be_bytes());
+        buffer.extend_from_slice(&self.as_u8().to_be_bytes());
+        let size_before_payload = buffer.len();
+        match self {
+            LogRowType::Delete => {
+                write_varint_to_vec(row_version.row.id.row_id as u64, buffer);
+            }
+            LogRowType::Insert => {
+                write_varint_to_vec(row_version.row.id.row_id as u64, buffer);
+                write_varint_to_vec(row_version.row.column_count as u64, buffer);
+
+                let data = &row_version.row.data;
+                // Maybe this isn't needed? We already might infer data size with payload size
+                // anyways.
+                write_varint_to_vec(data.len() as u64, buffer);
+                buffer.extend_from_slice(data);
+            }
+        }
+        // FIXME: remove shifting of bytes that we do by inserting payload sizes before everything
+        // Should payload_size be varint?
+        let payload_size = (buffer.len() - size_before_payload) as u64;
+        buffer.splice(
+            size_before_payload..size_before_payload,
+            payload_size.to_be_bytes(),
+        );
+    }
+}
+
 impl LogicalLog {
     pub fn new(file: Arc<dyn File>) -> Self {
         Self { file, offset: 0 }
@@ -23,17 +101,10 @@ impl LogicalLog {
         let mut buffer = Vec::new();
         buffer.extend_from_slice(&tx.tx_timestamp.to_be_bytes());
         tx.row_versions.iter().for_each(|row_version| {
-            let data = &row_version.row.data;
-            buffer.extend_from_slice(&row_version.row.id.table_id.to_be_bytes());
-            buffer.extend_from_slice(&row_version.row.id.row_id.to_be_bytes());
-            if row_version.end.is_some() {
-                buffer.extend_from_slice(&TOMBSTONE.to_be_bytes());
-            } else {
-                buffer.extend_from_slice(&NOT_TOMBSTONE.to_be_bytes());
-                buffer.extend_from_slice(&row_version.row.column_count.to_be_bytes());
-                buffer.extend_from_slice(data);
-            }
+            let row_type = LogRowType::from_row_version(row_version);
+            row_type.serialize(&mut buffer, row_version);
         });
+
         let buffer = Arc::new(Buffer::new(buffer));
         let c = Completion::new_write({
             let buffer = buffer.clone();

From 6fc1bed18704901fea92b08046ab43ba8f6cc57b Mon Sep 17 00:00:00 2001
From: Pere Diaz Bou
Date: Mon, 22 Sep 2025 13:00:52 +0200
Subject: [PATCH 2/8] core/mvcc/logical-log: add format for logical log header

---
 core/mvcc/persistent_storage/logical_log.rs | 43 +++++++++++++++++++--
 1 file changed, 40 insertions(+), 3 deletions(-)

diff --git a/core/mvcc/persistent_storage/logical_log.rs b/core/mvcc/persistent_storage/logical_log.rs
index c3ddf8ced..95036ad24 100644
--- a/core/mvcc/persistent_storage/logical_log.rs
+++ b/core/mvcc/persistent_storage/logical_log.rs
@@ -1,6 +1,6 @@
 use crate::{
     mvcc::database::{LogRecord, RowVersion},
-    storage::sqlite3_ondisk::{write_varint, write_varint_to_vec},
+    storage::sqlite3_ondisk::write_varint_to_vec,
     turso_assert,
     types::IOCompletions,
     Buffer, Completion, CompletionError, Result,
@@ -14,8 +14,31 @@ pub struct LogicalLog {
     offset: u64,
 }
 
-const TOMBSTONE: u8 = 1;
-const NOT_TOMBSTONE: u8 = 0;
+/// Log header: the first 64 bytes of any logical log file.
+/// The header is at most 64 bytes; fields added here must not exceed that size. If they take up
+/// less than 64 bytes, the remaining bytes are padded with zeroes.
+struct LogHeader {
+    version: u8,
+    salt: u64,
+    encrypted: u8, // 0 = not encrypted
+}
+
+const LOG_HEADER_MAX_SIZE: usize = 64;
+const LOG_HEADER_PADDING: [u8; LOG_HEADER_MAX_SIZE] = [0; LOG_HEADER_MAX_SIZE];
+
+impl LogHeader {
+    pub fn serialize(&self, buffer: &mut Vec<u8>) {
+        let buffer_size_start = buffer.len();
+        buffer.push(self.version);
+        buffer.extend_from_slice(&self.salt.to_be_bytes());
+        buffer.push(self.encrypted);
+
+        let header_size_before_padding = buffer.len() - buffer_size_start;
+        let padding = 64 - header_size_before_padding;
+        debug_assert!(header_size_before_padding <= LOG_HEADER_MAX_SIZE);
+        buffer.extend_from_slice(&LOG_HEADER_PADDING[0..padding]);
+    }
+}
 
 #[repr(u8)]
 #[derive(Debug, Clone, Copy, PartialEq)]
@@ -99,6 +122,20 @@ impl LogicalLog {
 
     pub fn log_tx(&mut self, tx: &LogRecord) -> Result> {
         let mut buffer = Vec::new();
+
+        // 1. Serialize log header if it's first write
+        let is_first_write = self.offset == 0;
+        if is_first_write {
+            let header = LogHeader {
+                version: 1,
+                salt: 0, // TODO: add checksums!
+                encrypted: 0,
+            };
+            header.serialize(&mut buffer);
+        }
+        // 2. Serialize Transaction
+
+        // 3. Serialize rows
         buffer.extend_from_slice(&tx.tx_timestamp.to_be_bytes());
         tx.row_versions.iter().for_each(|row_version| {
             let row_type = LogRowType::from_row_version(row_version);

From 2cd1562966248c7cce30756ba42c92815c4f5f5e Mon Sep 17 00:00:00 2001
From: Pere Diaz Bou
Date: Mon, 22 Sep 2025 13:06:11 +0200
Subject: [PATCH 3/8] core/mvcc/logical-log: add format for transaction fields and marker end

---
 core/mvcc/persistent_storage/logical_log.rs | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/core/mvcc/persistent_storage/logical_log.rs b/core/mvcc/persistent_storage/logical_log.rs
index 95036ad24..3e9cd92d8 100644
--- a/core/mvcc/persistent_storage/logical_log.rs
+++ b/core/mvcc/persistent_storage/logical_log.rs
@@ -133,15 +133,24 @@ impl LogicalLog {
             };
             header.serialize(&mut buffer);
         }
+
         // 2. Serialize Transaction
+        buffer.extend_from_slice(&tx.tx_timestamp.to_be_bytes());
+        // TODO: checksum
+        buffer.extend_from_slice(&[0; 8]);
 
         // 3. Serialize rows
-        buffer.extend_from_slice(&tx.tx_timestamp.to_be_bytes());
         tx.row_versions.iter().for_each(|row_version| {
             let row_type = LogRowType::from_row_version(row_version);
             row_type.serialize(&mut buffer, row_version);
         });
 
+        // 4. Serialize transaction's end marker. This marker will be the position of the offset
+        // after writing the buffer.
+        let offset_after_buffer = self.offset + buffer.len() as u64 + size_of::<u64>() as u64;
+        buffer.extend_from_slice(&offset_after_buffer.to_be_bytes());
+
+        // 5. Write to disk
         let buffer = Arc::new(Buffer::new(buffer));
         let c = Completion::new_write({
             let buffer = buffer.clone();
@@ -156,6 +165,7 @@ impl LogicalLog {
                 );
             }
         });
+
         let buffer_len = buffer.len();
         let c = self.file.pwrite(self.offset, buffer, c)?;
         self.offset += buffer_len as u64;

From 4c959e760b9fa55035a6723730b09c91efb8112b Mon Sep 17 00:00:00 2001
From: Pere Diaz Bou
Date: Mon, 22 Sep 2025 13:15:07 +0200
Subject: [PATCH 4/8] core/mvcc/logical-log: add rows size field for transaction format

---
 core/mvcc/persistent_storage/logical_log.rs | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/core/mvcc/persistent_storage/logical_log.rs b/core/mvcc/persistent_storage/logical_log.rs
index 3e9cd92d8..9ba68fc10 100644
--- a/core/mvcc/persistent_storage/logical_log.rs
+++ b/core/mvcc/persistent_storage/logical_log.rs
@@ -138,6 +138,7 @@ impl LogicalLog {
         buffer.extend_from_slice(&tx.tx_timestamp.to_be_bytes());
         // TODO: checksum
         buffer.extend_from_slice(&[0; 8]);
+        let buffer_pos_for_rows_size = buffer.len();
 
         // 3. Serialize rows
         tx.row_versions.iter().for_each(|row_version| {
@@ -145,8 +146,13 @@ impl LogicalLog {
             row_type.serialize(&mut buffer, row_version);
         });
 
-        // 4. Serialize transaction's end marker. This marker will be the position of the offset
+        // 4. Serialize transaction's end marker and rows size. This marker will be the position of the offset
         // after writing the buffer.
+        let rows_size = (buffer.len() - buffer_pos_for_rows_size) as u64;
+        buffer.splice(
+            buffer_pos_for_rows_size..buffer_pos_for_rows_size,
+            rows_size.to_be_bytes(),
+        );
         let offset_after_buffer = self.offset + buffer.len() as u64 + size_of::<u64>() as u64;
         buffer.extend_from_slice(&offset_after_buffer.to_be_bytes());
 

From e22a3893d54f1fd25a4671c01acb16f15afafb93 Mon Sep 17 00:00:00 2001
From: Pere Diaz Bou
Date: Mon, 22 Sep 2025 13:18:17 +0200
Subject: [PATCH 5/8] core/mvcc/logical-log: remove column_count from insert row type

---
 core/mvcc/persistent_storage/logical_log.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/mvcc/persistent_storage/logical_log.rs b/core/mvcc/persistent_storage/logical_log.rs
index 9ba68fc10..b2b68cd94 100644
--- a/core/mvcc/persistent_storage/logical_log.rs
+++ b/core/mvcc/persistent_storage/logical_log.rs
@@ -96,7 +96,6 @@ impl LogRowType {
             }
             LogRowType::Insert => {
                 write_varint_to_vec(row_version.row.id.row_id as u64, buffer);
-                write_varint_to_vec(row_version.row.column_count as u64, buffer);
 
                 let data = &row_version.row.data;
                 // Maybe this isn't needed? We already might infer data size with payload size

From db326affc692fb7b9938e8b314fca4e20e3f1a12 Mon Sep 17 00:00:00 2001
From: Pere Diaz Bou
Date: Mon, 22 Sep 2025 13:19:02 +0200
Subject: [PATCH 6/8] core/mvcc/logical-log: rename LogRowType to LogRecordType

---
 core/mvcc/persistent_storage/logical_log.rs | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/core/mvcc/persistent_storage/logical_log.rs b/core/mvcc/persistent_storage/logical_log.rs
index b2b68cd94..b970f82e5 100644
--- a/core/mvcc/persistent_storage/logical_log.rs
+++ b/core/mvcc/persistent_storage/logical_log.rs
@@ -42,12 +42,12 @@
 
 #[repr(u8)]
 #[derive(Debug, Clone, Copy, PartialEq)]
-enum LogRowType {
+enum LogRecordType {
     Delete = 0,
     Insert = 1,
 }
 
-impl LogRowType {
+impl LogRecordType {
     fn from_row_version(row_version: &RowVersion) -> Self {
         if row_version.end.is_some() {
             Self::Delete
@@ -59,8 +59,8 @@ impl LogRowType {
     #[allow(dead_code)]
     fn from_u8(value: u8) -> Option<Self> {
         match value {
-            0 => Some(LogRowType::Delete),
-            1 => Some(LogRowType::Insert),
+            0 => Some(LogRecordType::Delete),
+            1 => Some(LogRecordType::Insert),
             _ => None,
         }
     }
@@ -91,10 +91,10 @@ impl LogRowType {
         buffer.extend_from_slice(&self.as_u8().to_be_bytes());
         let size_before_payload = buffer.len();
         match self {
-            LogRowType::Delete => {
+            LogRecordType::Delete => {
                 write_varint_to_vec(row_version.row.id.row_id as u64, buffer);
             }
-            LogRowType::Insert => {
+            LogRecordType::Insert => {
                 write_varint_to_vec(row_version.row.id.row_id as u64, buffer);
 
                 let data = &row_version.row.data;
@@ -141,7 +141,7 @@ impl LogicalLog {
 
         // 3. Serialize rows
         tx.row_versions.iter().for_each(|row_version| {
-            let row_type = LogRowType::from_row_version(row_version);
+            let row_type = LogRecordType::from_row_version(row_version);
             row_type.serialize(&mut buffer, row_version);
         });
 

From 4cc88ee2bbedbf26033db6f0795b1dbb97a98ed1 Mon Sep 17 00:00:00 2001
From: Pere Diaz Bou
Date: Mon, 22 Sep 2025 13:21:54 +0200
Subject: [PATCH 7/8] core/mvcc/logical-log: rename Insert and Delete -> InsertRow and DeleteRow in LogRecordType

---
 core/mvcc/persistent_storage/logical_log.rs | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/core/mvcc/persistent_storage/logical_log.rs b/core/mvcc/persistent_storage/logical_log.rs
index b970f82e5..8d4d5e0f9 100644
--- a/core/mvcc/persistent_storage/logical_log.rs
+++ b/core/mvcc/persistent_storage/logical_log.rs
@@ -43,24 +43,24 @@
 #[repr(u8)]
 #[derive(Debug, Clone, Copy, PartialEq)]
 enum LogRecordType {
-    Delete = 0,
-    Insert = 1,
+    DeleteRow = 0,
+    InsertRow = 1,
 }
 
 impl LogRecordType {
     fn from_row_version(row_version: &RowVersion) -> Self {
         if row_version.end.is_some() {
-            Self::Delete
+            Self::DeleteRow
         } else {
-            Self::Insert
+            Self::InsertRow
         }
     }
 
     #[allow(dead_code)]
     fn from_u8(value: u8) -> Option<Self> {
         match value {
-            0 => Some(LogRecordType::Delete),
-            1 => Some(LogRecordType::Insert),
+            0 => Some(LogRecordType::DeleteRow),
+            1 => Some(LogRecordType::InsertRow),
             _ => None,
         }
     }
@@ -91,10 +91,10 @@ impl LogRecordType {
         buffer.extend_from_slice(&self.as_u8().to_be_bytes());
         let size_before_payload = buffer.len();
         match self {
-            LogRecordType::Delete => {
+            LogRecordType::DeleteRow => {
                 write_varint_to_vec(row_version.row.id.row_id as u64, buffer);
             }
-            LogRecordType::Insert => {
+            LogRecordType::InsertRow => {
                 write_varint_to_vec(row_version.row.id.row_id as u64, buffer);
 
                 let data = &row_version.row.data;

From 2f4426fc33f067695eaa65a605e6217f24e6d14d Mon Sep 17 00:00:00 2001
From: Pere Diaz Bou
Date: Mon, 22 Sep 2025 13:25:46 +0200
Subject: [PATCH 8/8] core/mvcc/logical-log: change order of Data size in InsertRow

---
 core/mvcc/persistent_storage/logical_log.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/mvcc/persistent_storage/logical_log.rs b/core/mvcc/persistent_storage/logical_log.rs
index 8d4d5e0f9..1ba1a07d1 100644
--- a/core/mvcc/persistent_storage/logical_log.rs
+++ b/core/mvcc/persistent_storage/logical_log.rs
@@ -83,8 +83,8 @@ impl LogRecordType {
     ///
     /// Insert:
     /// * Payload length -> u64
-    /// * Data size -> varint
     /// * Rowid -> varint
+    /// * Data size -> varint
    /// * Data -> [u8] (data size length)
     fn serialize(&self, buffer: &mut Vec<u8>, row_version: &RowVersion) {
         buffer.extend_from_slice(&row_version.row.id.table_id.to_be_bytes());
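
A note on reading this format back: the series only adds the writer, so the following is a minimal
reader-side sketch of one transaction frame as laid out by patches 2-4 (timestamp, zeroed checksum
placeholder, rows size, row payloads, end marker). It is illustrative only and not part of the
patches: the names TxFrame, parse_tx_frame and read_u64_be are hypothetical, it uses only std, and
it assumes the checksum field is still the 8-byte zero placeholder that log_tx currently writes.

    /// One transaction frame as written by log_tx (sketch; names are hypothetical).
    #[derive(Debug)]
    struct TxFrame {
        timestamp: u64,    // tx_timestamp, big-endian u64
        checksum: [u8; 8], // placeholder, currently always zero
        rows: Vec<u8>,     // `rows_size` bytes of serialized row records
        end_marker: u64,   // absolute file offset just past this frame
    }

    fn read_u64_be(buf: &[u8]) -> Option<u64> {
        Some(u64::from_be_bytes(buf.get(..8)?.try_into().ok()?))
    }

    /// Parse one frame starting at buf[0]; `frame_offset` is the file offset where buf starts,
    /// used only to validate the end marker.
    fn parse_tx_frame(buf: &[u8], frame_offset: u64) -> Option<(TxFrame, usize)> {
        let mut pos = 0;
        let timestamp = read_u64_be(&buf[pos..])?;
        pos += 8;
        let checksum: [u8; 8] = buf.get(pos..pos + 8)?.try_into().ok()?;
        pos += 8;
        let rows_size = read_u64_be(&buf[pos..])? as usize;
        pos += 8;
        let rows = buf.get(pos..pos + rows_size)?.to_vec();
        pos += rows_size;
        let end_marker = read_u64_be(&buf[pos..])?;
        pos += 8;
        if end_marker != frame_offset + pos as u64 {
            return None; // incomplete or corrupt frame
        }
        Some((TxFrame { timestamp, checksum, rows, end_marker }, pos))
    }

    fn main() {
        // A reader would skip the 64-byte LogHeader at the start of the file and then parse
        // frames back to back, stopping at the first frame that fails to validate.
        let log_bytes: Vec<u8> = Vec::new(); // stand-in for the log file contents
        let mut offset = 64usize;
        while let Some((frame, len)) =
            log_bytes.get(offset..).and_then(|b| parse_tx_frame(b, offset as u64))
        {
            println!("tx at {offset}: ts={}, rows={} bytes", frame.timestamp, frame.rows.len());
            offset += len;
        }
    }

Because the end marker must equal the absolute offset of the frame's end, a reader can stop
cleanly at a torn last transaction without needing a separate length check.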
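
The per-row records inside a frame's rows section can be decoded along the same lines, following
the layout from patches 1, 5 and 8 (table_id, record type, payload length, then rowid, data size
and data). Again a sketch under stated assumptions: RowEntry, parse_row and read_varint are
hypothetical names, and the varint decoder assumes the SQLite-style encoding that
write_varint_to_vec produces.

    /// A decoded row record (sketch; not part of the series).
    #[derive(Debug)]
    enum RowEntry {
        DeleteRow { table_id: u64, row_id: u64 },
        InsertRow { table_id: u64, row_id: u64, data: Vec<u8> },
    }

    /// SQLite-style varint: up to 9 bytes, high bit of each byte is a continuation flag.
    fn read_varint(buf: &[u8]) -> Option<(u64, usize)> {
        let mut value: u64 = 0;
        for i in 0..8 {
            let byte = *buf.get(i)?;
            value = (value << 7) | (byte & 0x7f) as u64;
            if byte & 0x80 == 0 {
                return Some((value, i + 1));
            }
        }
        let byte = *buf.get(8)?; // the 9th byte contributes all 8 bits
        Some(((value << 8) | byte as u64, 9))
    }

    fn read_u64_be(buf: &[u8]) -> Option<u64> {
        Some(u64::from_be_bytes(buf.get(..8)?.try_into().ok()?))
    }

    /// Parse one record starting at buf[0], returning the record and the bytes consumed,
    /// so a caller can walk a frame's rows section record by record.
    fn parse_row(buf: &[u8]) -> Option<(RowEntry, usize)> {
        let table_id = read_u64_be(buf)?;
        let record_type = *buf.get(8)?;
        let payload_len = read_u64_be(buf.get(9..)?)? as usize;
        let payload = buf.get(17..17 + payload_len)?;
        let entry = match record_type {
            0 => {
                // DeleteRow: payload is just the rowid varint
                let (row_id, _) = read_varint(payload)?;
                RowEntry::DeleteRow { table_id, row_id }
            }
            1 => {
                // InsertRow: rowid varint, data-size varint, then the record data
                let (row_id, n1) = read_varint(payload)?;
                let (data_len, n2) = read_varint(payload.get(n1..)?)?;
                let data = payload.get(n1 + n2..n1 + n2 + data_len as usize)?.to_vec();
                RowEntry::InsertRow { table_id, row_id, data }
            }
            _ => return None, // unknown record type
        };
        Some((entry, 17 + payload_len))
    }

    fn main() {
        // Walk a rows section (for example the `rows` bytes from the previous sketch).
        let rows: Vec<u8> = Vec::new();
        let mut pos = 0;
        while let Some((entry, len)) = rows.get(pos..).and_then(parse_row) {
            println!("{entry:?}");
            pos += len;
        }
    }

As the FIXME in patch 1 already hints, the payload length makes the InsertRow data-size varint
redundant for a reader, since the data length could be derived from what remains of the payload
after the rowid.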