Use the SQLite header as associated data for protection

against tampering and corruption.

Previously, we did not use the first 100 bytes in encryption
machinery. This patch changes that and uses that data as
associated data. So in case the header is corrupted or
tampered with, the decryption will fail.
This commit is contained in:
Avinash Sajjanshetty
2025-09-27 17:03:18 +05:30
parent 9cd869f660
commit dc3d1fa36d

View File

@@ -1,5 +1,5 @@
#![allow(unused_variables, dead_code)]
use crate::{LimboError, Result};
use crate::{turso_assert, LimboError, Result};
use aegis::aegis128l::Aegis128L;
use aegis::aegis128x2::Aegis128X2;
use aegis::aegis128x4::Aegis128X4;
@@ -12,6 +12,12 @@ use aes_gcm::{
};
use turso_macros::match_ignore_ascii_case;
/// constants used for the Turso page header in the encrypted dbs.
const TURSO_HEADER_PREFIX: &[u8] = b"Turso";
const TURSO_VERSION: u8 = 0x00;
const TURSO_HEADER_SIZE: usize = 16;
const SQLITE_HEADER: &[u8] = b"SQLite format 3\0";
#[derive(Clone)]
pub enum EncryptionKey {
Key128([u8; 16]),
@@ -189,9 +195,12 @@ macro_rules! define_aes_gcm_cipher {
})
}
fn encrypt(&self, plaintext: &[u8], _ad: &[u8]) -> Result<(Vec<u8>, [u8; 12])> {
fn encrypt(&self, plaintext: &[u8], ad: &[u8]) -> Result<(Vec<u8>, [u8; 12])> {
let nonce = <$cipher_type>::generate_nonce(&mut OsRng);
let ciphertext = self.cipher.encrypt(&nonce, plaintext).map_err(|e| {
let ciphertext = self.cipher.encrypt(&nonce, aes_gcm::aead::Payload {
msg: plaintext,
aad: ad,
}).map_err(|e| {
LimboError::InternalError(format!("{} encryption failed: {e:?}", $name))
})?;
let mut nonce_array = [0u8; 12];
@@ -199,10 +208,13 @@ macro_rules! define_aes_gcm_cipher {
Ok((ciphertext, nonce_array))
}
fn decrypt(&self, ciphertext: &[u8], nonce: &[u8; 12], _ad: &[u8]) -> Result<Vec<u8>> {
fn decrypt(&self, ciphertext: &[u8], nonce: &[u8; 12], ad: &[u8]) -> Result<Vec<u8>> {
let nonce = Nonce::from_slice(nonce);
self.cipher
.decrypt(nonce, ciphertext)
.decrypt(nonce, aes_gcm::aead::Payload {
msg: ciphertext,
aad: ad,
})
.map_err(|_| -> LimboError { CipherError::DecryptionFailed { cipher: $name }.into() })
}
}
@@ -349,6 +361,37 @@ impl CipherMode {
pub fn metadata_size(&self) -> usize {
self.nonce_size() + self.tag_size()
}
/// Returns the cipher identifier byte for Turso header
pub fn cipher_id(&self) -> u8 {
match self {
CipherMode::Aes128Gcm => 1,
CipherMode::Aes256Gcm => 2,
CipherMode::Aegis256 => 3,
CipherMode::Aegis256X2 => 4,
CipherMode::Aegis256X4 => 5,
CipherMode::Aegis128L => 6,
CipherMode::Aegis128X2 => 7,
CipherMode::Aegis128X4 => 8,
}
}
/// Creates a CipherMode from cipher identifier byte. This is used when read from Turso header.
pub fn from_cipher_id(id: u8) -> Result<Self> {
match id {
1 => Ok(CipherMode::Aes128Gcm),
2 => Ok(CipherMode::Aes256Gcm),
3 => Ok(CipherMode::Aegis256),
4 => Ok(CipherMode::Aegis256X2),
5 => Ok(CipherMode::Aegis256X4),
6 => Ok(CipherMode::Aegis128L),
7 => Ok(CipherMode::Aegis128X2),
8 => Ok(CipherMode::Aegis128X4),
_ => Err(LimboError::InvalidArgument(format!(
"Unknown cipher ID: {id}"
))),
}
}
}
#[derive(Clone)]
@@ -423,9 +466,66 @@ impl EncryptionContext {
self.cipher_mode.metadata_size() as u8
}
/// Creates Turso header for encrypted page 1
fn create_turso_header(&self) -> [u8; TURSO_HEADER_SIZE] {
let mut header = [0u8; TURSO_HEADER_SIZE];
// "Turso" prefix (5 bytes)
header[..TURSO_HEADER_PREFIX.len()].copy_from_slice(TURSO_HEADER_PREFIX);
// version byte (1 byte)
header[5] = TURSO_VERSION;
// cipher identifier (1 byte)
header[6] = self.cipher_mode.cipher_id();
// remaining unused 9 bytes
header
}
/// Validates and extracts cipher mode from Turso header
fn validate_turso_header(&self, header: &[u8]) -> Result<()> {
if header.len() < TURSO_HEADER_SIZE {
return Err(LimboError::InternalError(
"Header too short for encrypted Turso db".into(),
));
}
if &header[..TURSO_HEADER_PREFIX.len()] != TURSO_HEADER_PREFIX {
return Err(LimboError::InternalError(
"Invalid Turso header: prefix mismatch".into(),
));
}
let version = header[5];
if version != TURSO_VERSION {
return Err(LimboError::InternalError(format!(
"Unsupported Turso header version: expected {}, got {}",
TURSO_VERSION, version
)));
}
let cipher_id = header[6];
let header_cipher = CipherMode::from_cipher_id(cipher_id)?;
if header_cipher != self.cipher_mode {
return Err(LimboError::InternalError(format!(
"Cipher mode mismatch: expected {:?} (ID {}), got {:?} (ID {})",
self.cipher_mode,
self.cipher_mode.cipher_id(),
header_cipher,
cipher_id
)));
}
Ok(())
}
#[cfg(feature = "encryption")]
pub fn encrypt_page(&self, page: &[u8], page_id: usize) -> Result<Vec<u8>> {
use crate::storage::sqlite3_ondisk::DatabaseHeader;
if page_id == DatabaseHeader::PAGE_ID {
return self.encrypt_page_1(page);
}
tracing::debug!("encrypting page {}", page_id);
assert_eq!(
page.len(),
@@ -434,19 +534,12 @@ impl EncryptionContext {
self.page_size
);
let encryption_start_offset = match page_id {
DatabaseHeader::PAGE_ID => DatabaseHeader::SIZE,
_ => 0,
};
let metadata_size = self.cipher_mode.metadata_size();
let reserved_bytes = &page[self.page_size - metadata_size..];
#[cfg(debug_assertions)]
{
use crate::turso_assert;
// In debug builds, ensure that the reserved bytes are zeroed out. So even when we are
// reusing a page from buffer pool, we zero out in debug build so that we can be
// sure that b tree layer is not writing any data into the reserved space.
let reserved_bytes_zeroed = reserved_bytes.iter().all(|&b| b == 0);
turso_assert!(
reserved_bytes_zeroed,
@@ -454,20 +547,18 @@ impl EncryptionContext {
);
}
let payload = &page[encryption_start_offset..self.page_size - metadata_size];
let payload = &page[..self.page_size - metadata_size];
let (encrypted, nonce) = self.encrypt_raw(payload)?;
let nonce_size = self.cipher_mode.nonce_size();
assert_eq!(
encrypted.len(),
self.page_size - nonce_size - encryption_start_offset,
self.page_size - nonce_size,
"Encrypted page must be exactly {} bytes",
self.page_size - nonce_size - encryption_start_offset
self.page_size - nonce_size
);
let mut result = Vec::with_capacity(self.page_size);
result.extend_from_slice(&page[..encryption_start_offset]);
result.extend_from_slice(&encrypted);
result.extend_from_slice(&nonce);
assert_eq!(
@@ -482,6 +573,9 @@ impl EncryptionContext {
#[cfg(feature = "encryption")]
pub fn decrypt_page(&self, encrypted_page: &[u8], page_id: usize) -> Result<Vec<u8>> {
use crate::storage::sqlite3_ondisk::DatabaseHeader;
if page_id == DatabaseHeader::PAGE_ID {
return self.decrypt_page_1(encrypted_page);
}
tracing::debug!("decrypting page {}", page_id);
assert_eq!(
encrypted_page.len(),
@@ -489,32 +583,143 @@ impl EncryptionContext {
"Encrypted page data must be exactly {} bytes",
self.page_size
);
// for page 1, the encrypted page starts after the database header
// for other pages, the encrypted page starts at the beginning
let encrypted_page_offset = match page_id {
DatabaseHeader::PAGE_ID => DatabaseHeader::SIZE,
_ => 0,
};
let nonce_size = self.cipher_mode.nonce_size();
let nonce_offset = encrypted_page.len() - nonce_size;
let payload = &encrypted_page[encrypted_page_offset..nonce_offset];
let payload = &encrypted_page[..nonce_offset];
let nonce = &encrypted_page[nonce_offset..];
let decrypted_data = self.decrypt_raw(payload, nonce)?;
let metadata_size = self.cipher_mode.metadata_size();
assert_eq!(
decrypted_data.len(),
self.page_size - metadata_size,
"Decrypted page data must be exactly {} bytes",
self.page_size - metadata_size
);
let mut result = Vec::with_capacity(self.page_size);
result.extend_from_slice(&decrypted_data);
result.resize(self.page_size, 0);
assert_eq!(
result.len(),
self.page_size,
"Decrypted page data must be exactly {} bytes",
self.page_size
);
Ok(result)
}
#[cfg(feature = "encryption")]
fn encrypt_page_1(&self, page: &[u8]) -> Result<Vec<u8>> {
use crate::storage::sqlite3_ondisk::DatabaseHeader;
tracing::debug!("encrypting page 1");
assert_eq!(
page.len(),
self.page_size,
"Page data must be exactly {} bytes",
self.page_size
);
// since this is page 1, this must have header
turso_assert!(
&page[..SQLITE_HEADER.len()] == SQLITE_HEADER,
"Page 1 must start with SQLite header"
);
let metadata_size = self.cipher_mode.metadata_size();
let reserved_bytes = &page[self.page_size - metadata_size..];
#[cfg(debug_assertions)]
{
use crate::turso_assert;
let reserved_bytes_zeroed = reserved_bytes.iter().all(|&b| b == 0);
turso_assert!(
reserved_bytes_zeroed,
"last reserved bytes must be empty/zero, but found non-zero bytes"
);
}
// page 1 encryption:
// 1. First 16 bytes are replaced with Turso magic bytes
// 2. Next 84 bytes (16-100) are kept as-is (not encrypted)
// 3. Remaining bytes (100-end) are encrypted
// 4. The header (the first 100 bytes) as associated data
let turso_header = self.create_turso_header();
let mut new_header = Vec::with_capacity(DatabaseHeader::SIZE);
new_header.extend_from_slice(&turso_header);
new_header.extend_from_slice(&page[TURSO_HEADER_SIZE..DatabaseHeader::SIZE]);
let payload = &page[DatabaseHeader::SIZE..self.page_size - metadata_size];
let (encrypted, nonce) = self.encrypt_raw_with_ad(payload, &new_header)?;
let nonce_size = self.cipher_mode.nonce_size();
assert_eq!(
encrypted.len(),
self.page_size - nonce_size - DatabaseHeader::SIZE,
"Encrypted page must be exactly {} bytes",
self.page_size - nonce_size - DatabaseHeader::SIZE
);
let mut result = Vec::with_capacity(self.page_size);
// 1. copy the header
result.extend_from_slice(&new_header);
// 2. copy the encrypted payload
result.extend_from_slice(&encrypted);
// 3. now add the nonce
result.extend_from_slice(&nonce);
assert_eq!(
result.len(),
self.page_size,
"Encrypted page must be exactly {} bytes",
self.page_size
);
Ok(result)
}
#[cfg(feature = "encryption")]
fn decrypt_page_1(&self, encrypted_page: &[u8]) -> Result<Vec<u8>> {
use crate::storage::sqlite3_ondisk::DatabaseHeader;
tracing::debug!("decrypting page 1");
assert_eq!(
encrypted_page.len(),
self.page_size,
"Encrypted page data must be exactly {} bytes",
self.page_size
);
self.validate_turso_header(&encrypted_page[..TURSO_HEADER_SIZE])?;
let nonce_size = self.cipher_mode.nonce_size();
let nonce_offset = encrypted_page.len() - nonce_size;
let payload = &encrypted_page[DatabaseHeader::SIZE..nonce_offset];
let nonce = &encrypted_page[nonce_offset..];
// it's important to use the header on disk (with Turso magic bytes) as associated data
// for protection against tampering the header
let header = &encrypted_page[..DatabaseHeader::SIZE];
let decrypted_data = self.decrypt_raw_with_ad(payload, nonce, header)?;
let metadata_size = self.cipher_mode.metadata_size();
assert_eq!(
decrypted_data.len(),
self.page_size - metadata_size - encrypted_page_offset,
self.page_size - metadata_size - DatabaseHeader::SIZE,
"Decrypted page data must be exactly {} bytes",
self.page_size - metadata_size - encrypted_page_offset
self.page_size - metadata_size - DatabaseHeader::SIZE
);
// reconstruct the page with the appropriate SQLite header
let mut result = Vec::with_capacity(self.page_size);
result.extend_from_slice(&encrypted_page[..encrypted_page_offset]);
result.extend_from_slice(SQLITE_HEADER);
result.extend_from_slice(&encrypted_page[TURSO_HEADER_SIZE..DatabaseHeader::SIZE]);
result.extend_from_slice(&decrypted_data);
result.resize(self.page_size, 0);
assert_eq!(
result.len(),
self.page_size,
@@ -527,10 +732,14 @@ impl EncryptionContext {
/// encrypts raw data using the configured cipher, returns ciphertext and nonce
fn encrypt_raw(&self, plaintext: &[u8]) -> Result<(Vec<u8>, Vec<u8>)> {
const AD: &[u8] = b"";
self.encrypt_raw_with_ad(plaintext, AD)
}
/// encrypts raw data with associated data using the configured cipher
fn encrypt_raw_with_ad(&self, plaintext: &[u8], ad: &[u8]) -> Result<(Vec<u8>, Vec<u8>)> {
macro_rules! encrypt_cipher {
($cipher:expr) => {{
let (ciphertext, nonce) = $cipher.encrypt(plaintext, AD)?;
let (ciphertext, nonce) = $cipher.encrypt(plaintext, ad)?;
Ok((ciphertext, nonce.to_vec()))
}};
}
@@ -549,7 +758,10 @@ impl EncryptionContext {
fn decrypt_raw(&self, ciphertext: &[u8], nonce: &[u8]) -> Result<Vec<u8>> {
const AD: &[u8] = b"";
self.decrypt_raw_with_ad(ciphertext, nonce, AD)
}
fn decrypt_raw_with_ad(&self, ciphertext: &[u8], nonce: &[u8], ad: &[u8]) -> Result<Vec<u8>> {
macro_rules! decrypt_with_nonce {
($cipher:expr, $nonce_size:literal, $name:literal) => {{
let nonce_array: [u8; $nonce_size] = nonce.try_into().map_err(|_| {
@@ -560,7 +772,7 @@ impl EncryptionContext {
nonce.len()
))
})?;
$cipher.decrypt(ciphertext, &nonce_array, AD)
$cipher.decrypt(ciphertext, &nonce_array, ad)
}};
}
@@ -715,6 +927,17 @@ mod tests {
hex::encode(bytes)
}
fn create_test_page_1() -> Vec<u8> {
let mut page = vec![0u8; DEFAULT_ENCRYPTED_PAGE_SIZE];
page[..SQLITE_HEADER.len()].copy_from_slice(SQLITE_HEADER);
let mut rng = rand::thread_rng();
// 48 is the max reserved bytes we might need for metadata with any cipher
for i in SQLITE_HEADER.len()..DEFAULT_ENCRYPTED_PAGE_SIZE - 48 {
page[i] = rng.gen();
}
page
}
test_aes_cipher_wrapper!(
test_aes128gcm_cipher_wrapper,
Aes128GcmCipher,
@@ -731,6 +954,122 @@ mod tests {
"Hello, AES-128-GCM!"
);
#[test]
fn test_page_1_encrypt_decrypt_round_trip_with_ad() {
let key = EncryptionKey::from_hex_string(&generate_random_hex_key()).unwrap();
let ctx = EncryptionContext::new(CipherMode::Aegis256, &key, DEFAULT_ENCRYPTED_PAGE_SIZE)
.unwrap();
let page_data = create_test_page_1();
let encrypted = ctx.encrypt_page(&page_data, 1).unwrap();
assert_eq!(encrypted.len(), DEFAULT_ENCRYPTED_PAGE_SIZE);
// check that header is readable directly from disk (not encrypted)
assert_eq!(&encrypted[..5], b"Turso");
assert_eq!(encrypted[5], TURSO_VERSION);
assert_eq!(encrypted[6], CipherMode::Aegis256.cipher_id());
// header should be unencrypted, but data after DatabaseHeader::SIZE should be different
assert_eq!(&encrypted[16..100], &page_data[16..100]); // header portion
assert_ne!(&encrypted[100..200], &page_data[100..200]); // some encrypted portion
// decrypt page 1
let decrypted = ctx.decrypt_page(&encrypted, 1).unwrap();
assert_eq!(decrypted.len(), DEFAULT_ENCRYPTED_PAGE_SIZE);
// check that SQLite header was restored
assert_eq!(&decrypted[..SQLITE_HEADER.len()], SQLITE_HEADER);
assert_eq!(decrypted, page_data);
}
#[test]
fn test_turso_header_validation() {
let key = EncryptionKey::from_hex_string(&generate_random_hex_key()).unwrap();
let ctx = EncryptionContext::new(CipherMode::Aegis256, &key, DEFAULT_ENCRYPTED_PAGE_SIZE)
.unwrap();
// test cipher_id conversion
assert_eq!(CipherMode::Aes128Gcm.cipher_id(), 1);
assert_eq!(CipherMode::Aes256Gcm.cipher_id(), 2);
assert_eq!(CipherMode::Aegis256.cipher_id(), 3);
assert_eq!(CipherMode::Aegis128L.cipher_id(), 6);
// test from_cipher_id conversion
assert_eq!(
CipherMode::from_cipher_id(1).unwrap(),
CipherMode::Aes128Gcm
);
assert_eq!(CipherMode::from_cipher_id(3).unwrap(), CipherMode::Aegis256);
assert!(CipherMode::from_cipher_id(99).is_err());
// test header creation
let header = ctx.create_turso_header();
assert_eq!(&header[..5], b"Turso");
assert_eq!(header[5], TURSO_VERSION);
assert_eq!(header[6], 3); // AEGIS-256
assert_eq!(&header[7..], &[0u8; 9]); // unused bytes are zero
}
#[test]
fn test_invalid_turso_header_fails_decrypt() {
let key = EncryptionKey::from_hex_string(&generate_random_hex_key()).unwrap();
let ctx = EncryptionContext::new(CipherMode::Aegis256, &key, DEFAULT_ENCRYPTED_PAGE_SIZE)
.unwrap();
let page_data = create_test_page_1();
let encrypted = ctx.encrypt_page(&page_data, 1).unwrap();
// corrupt the header prefix
let mut corrupted = encrypted.clone();
corrupted[0] = b'V'; // make `Turso` to `Vurso`
assert!(ctx.decrypt_page(&corrupted, 1).is_err());
// test with wrong cipher ID
let mut wrong_cipher = encrypted.clone();
wrong_cipher[6] = 99; // invalid cipher ID
assert!(ctx.decrypt_page(&wrong_cipher, 1).is_err());
}
#[test]
fn test_associated_data_validation() {
let key = EncryptionKey::from_hex_string(&generate_random_hex_key()).unwrap();
let ctx = EncryptionContext::new(CipherMode::Aegis256, &key, DEFAULT_ENCRYPTED_PAGE_SIZE)
.unwrap();
let page_data = create_test_page_1();
let encrypted = ctx.encrypt_page(&page_data, 1).unwrap();
// modify a byte in the preserved header portion (bytes 16-100)
let mut corrupted_ad = encrypted.clone();
corrupted_ad[50] ^= 1; // flip one bit in the associated data portion
// this should fail decryption because associated data doesn't match
let decrypt_result = ctx.decrypt_page(&corrupted_ad, 1);
assert!(
decrypt_result.is_err(),
"Decryption should fail with corrupted associated data"
);
}
#[test]
fn test_turso_header_corruption_detection() {
let key = EncryptionKey::from_hex_string(&generate_random_hex_key()).unwrap();
let ctx = EncryptionContext::new(CipherMode::Aegis256, &key, DEFAULT_ENCRYPTED_PAGE_SIZE)
.unwrap();
let page_data = create_test_page_1();
let encrypted = ctx.encrypt_page(&page_data, 1).unwrap();
let mut corrupted_turso_header = encrypted.clone();
corrupted_turso_header[7] ^= 1;
let decrypt_result = ctx.decrypt_page(&corrupted_turso_header, 1);
assert!(
decrypt_result.is_err(),
"Decryption should fail with corrupted Turso header"
);
}
#[test]
fn test_aes128gcm_encrypt_decrypt_round_trip() {
let mut rng = rand::thread_rng();