mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-06 00:34:23 +01:00
Merge 'Don't allow autovacuum to be flipped on non-empty databases' from Pavan Nambi
Turso incorrectly creates the first table in an autovacuumed table in page 2. (Note: this is on collaboration with @LeMikaelF) SQLite does not allow enabling or disabling auto-vacuum after the first table has been created (https://sqlite.org/pragma.html#pragma_auto_vacuum). This is because the sequence of the pages in the databases is different when auto-vacuum is enabled, because the first b-tree page must be page 3 instead of 2, to make room for the first [Pointer Map page](https://sqlite.org/fileformat.html#pointer_map_or_ptrmap_pages). But Turso doesn't currently consider this, which can lead to data loss. The simplest way to reproduce this is to create an autovacuumed databases with either `pragma auto_vacuum=full` so that autovacuum runs on each commit, and then create a table with some data. Turso will incorrectly create the new table on page 2. After this, every time a new page is created, either through a page split or because a new table is created, Turso will write a 5-byte pointer in page 2, starting from the top of the page, thereby overwriting existing data. For example, let's start with a clean database and the first bytes of page 2. It starts with `0d`, the discriminator for a leaf page ([source](https://www.sqlite.org/fileformat.html#b_tree_pages)). The next interesting number is the number of cells contained in this page (`01`) at offset 5. ``` $ cargo run -- /tmp/a.db turso> create table t(a); turso> insert into t values ('myvalue'); $ dbtotxt /tmp/a.db | size 8192 pagesize 4096 filename a.db | page 1 offset 0 # ...snip... | page 2 offset 4096 | 0: 0d 00 00 00 01 0f f5 00 0f f5 00 00 00 00 00 00 ................ | 4080: 00 00 00 00 00 09 01 02 1b 6d 79 76 61 6c 75 65 .........myvalue | end a.db ``` Pointer map pages are located every N pages, starting from page 2, and contain a list of 5-byte pointers that represent the parent page of a certain page. So whenever Turso or SQLite needs to add a page, it will overwrite 5 bytes of page 2. This means that for data loss to occur, it is sufficient to add a single page to the database, for example by creating a table. Offset 5 will then be zeroed out: ``` $ cargo run -- /tmp/a.db turso> create table t(a); turso> insert into t values ('myvalue'); turso> pragma auto_vacuum=full; turso> create table tt(a); $ dbtotxt /tmp/a.db | size 12288 pagesize 4096 filename a.db | page 1 offset 0 # ...snip... | page 2 offset 4096 | 0: 01 00 00 00 00 0f f5 00 0f f5 00 00 00 00 00 00 ................ | 4080: 00 00 00 00 00 09 01 02 1b 6d 79 76 61 6c 75 65 .........myvalue ``` Creating more tables, or adding more B-tree pages, will keep overwriting the rest of the page, until the cells themselves are also overwritten. ## Reproducing the issue in the simulator We have been unable to reproduce this exact corruption mode in the simulator, but patching it shows many failure modes, all of which don't occur with the unpatched simulator. The following seeds are failing. The following seeds are showing the issue when the patched simulator is ran against `main`: - `11522841279124073062`, with "Assertion 'table inquisitive_graham_159 should contain all of its expected values' failed: table inquisitive_graham_159 does not contain the expected values, the simulator model has more rows than the database" - `7057400018220918989`, `16028085350691325843`, `7721542713659053944`, and `203017821863546118`, with "Failed to read ptrmap key=XXX" - `12533694709304969540`, `18357088553315413457`, `3108945730906932377`, with "Integrity Check Failed: Cell N in page 2 is out of range." - `4757352625344646473`, with "dirty pages should be empty for read txn" - `7083498604824302257`, with "header_size: 6272, header_len_bytes: 2, payload.len(): 13" - `17881876827470741581`, with "ParseError("no such table: focused_historians_416")" - `2092231500503735693`, with "range end index 4789 out of range for slice of length 4096" - `7555257419378470845`, with malformed database schema (imaginative_ontivero\u{1})" - `12905270229511147245`, with "index out of bounds: the len is 4096 but the index is 4096" ## Fixing the issue - When DB is opened, we read the `auto_vacuum` state, instead of assuming `auto_vacuum=none`. - Don't allow auto_vacuum to be flipped on non-empty databases as if we allow this it could cause overlap with existing bits.(ptrmap could overwrite existing data) - Modify integrity check to avoid reporting that page 2 is orphaned in auto-vacuumed databases. Fixes #3752 Closes #3830
This commit is contained in:
24
core/lib.rs
24
core/lib.rs
@@ -43,6 +43,7 @@ mod numeric;
|
||||
use crate::index_method::IndexMethod;
|
||||
use crate::storage::checksum::CHECKSUM_REQUIRED_RESERVED_BYTES;
|
||||
use crate::storage::encryption::AtomicCipherMode;
|
||||
use crate::storage::pager::{AutoVacuumMode, HeaderRef};
|
||||
use crate::translate::display::PlanContext;
|
||||
use crate::translate::pragma::TURSO_CDC_DEFAULT_TABLE_NAME;
|
||||
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
|
||||
@@ -566,6 +567,29 @@ impl Database {
|
||||
pager.enable_encryption(self.opts.enable_encryption);
|
||||
let pager = Arc::new(pager);
|
||||
|
||||
if self.db_state.get().is_initialized() {
|
||||
let header_ref = pager.io.block(|| HeaderRef::from_pager(&pager))?;
|
||||
|
||||
let header = header_ref.borrow();
|
||||
|
||||
let mode = if header.vacuum_mode_largest_root_page.get() > 0 {
|
||||
if header.incremental_vacuum_enabled.get() > 0 {
|
||||
AutoVacuumMode::Incremental
|
||||
} else {
|
||||
AutoVacuumMode::Full
|
||||
}
|
||||
} else {
|
||||
AutoVacuumMode::None
|
||||
};
|
||||
|
||||
pager.set_auto_vacuum_mode(mode);
|
||||
|
||||
tracing::debug!(
|
||||
"Opened existing database. Detected auto_vacuum_mode from header: {:?}",
|
||||
mode
|
||||
);
|
||||
}
|
||||
|
||||
let page_size = pager.get_page_size_unchecked();
|
||||
|
||||
let default_cache_size = pager
|
||||
|
||||
@@ -5784,6 +5784,8 @@ pub(crate) enum PageCategory {
|
||||
Overflow,
|
||||
FreeListTrunk,
|
||||
FreePage,
|
||||
#[cfg(not(feature = "omit_autovacuum"))]
|
||||
PointerMap,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
||||
@@ -1222,6 +1222,21 @@ impl Pager {
|
||||
BtreePageAllocMode::Exact(root_page_num),
|
||||
));
|
||||
let allocated_page_id = page.get().id as u32;
|
||||
|
||||
return_if_io!(self.with_header_mut(|header| {
|
||||
if allocated_page_id
|
||||
> header.vacuum_mode_largest_root_page.get()
|
||||
{
|
||||
tracing::debug!(
|
||||
"Updating largest root page in header from {} to {}",
|
||||
header.vacuum_mode_largest_root_page.get(),
|
||||
allocated_page_id
|
||||
);
|
||||
header.vacuum_mode_largest_root_page =
|
||||
allocated_page_id.into();
|
||||
}
|
||||
}));
|
||||
|
||||
if allocated_page_id != root_page_num {
|
||||
// TODO(Zaid): Handle swapping the allocated page with the desired root page
|
||||
}
|
||||
@@ -2886,7 +2901,7 @@ impl CreateBTreeFlags {
|
||||
** identifies the parent page in the btree.
|
||||
*/
|
||||
#[cfg(not(feature = "omit_autovacuum"))]
|
||||
mod ptrmap {
|
||||
pub(crate) mod ptrmap {
|
||||
use crate::{storage::sqlite3_ondisk::PageSize, LimboError, Result};
|
||||
|
||||
// Constants
|
||||
|
||||
@@ -241,6 +241,18 @@ fn update_pragma(
|
||||
Ok((program, TransactionMode::None))
|
||||
}
|
||||
PragmaName::AutoVacuum => {
|
||||
let is_empty = is_database_empty(resolver.schema, &pager)?;
|
||||
tracing::debug!(
|
||||
"Checking if database is empty for auto_vacuum pragma: {}",
|
||||
is_empty
|
||||
);
|
||||
|
||||
if !is_empty {
|
||||
// SQLite's behavior is to silently ignore this pragma if the database is not empty.
|
||||
tracing::debug!("Attempted to set auto_vacuum, database is not empty so we are ignoring pragma.");
|
||||
return Ok((program, TransactionMode::None));
|
||||
}
|
||||
|
||||
let auto_vacuum_mode = match value {
|
||||
Expr::Name(name) => {
|
||||
let name = name.as_str().as_bytes();
|
||||
@@ -894,3 +906,30 @@ fn update_page_size(connection: Arc<crate::Connection>, page_size: u32) -> crate
|
||||
connection.reset_page_size(page_size)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn is_database_empty(schema: &Schema, pager: &Arc<Pager>) -> crate::Result<bool> {
|
||||
if schema.tables.len() > 1 {
|
||||
return Ok(false);
|
||||
}
|
||||
if let Some(table_arc) = schema.tables.values().next() {
|
||||
let table_name = match table_arc.as_ref() {
|
||||
crate::schema::Table::BTree(tbl) => &tbl.name,
|
||||
crate::schema::Table::Virtual(tbl) => &tbl.name,
|
||||
crate::schema::Table::FromClauseSubquery(tbl) => &tbl.name,
|
||||
};
|
||||
|
||||
if table_name != "sqlite_schema" {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
|
||||
let db_size_result = pager
|
||||
.io
|
||||
.block(|| pager.with_header(|header| header.database_size.get()));
|
||||
|
||||
match db_size_result {
|
||||
Err(_) => Ok(true),
|
||||
Ok(0 | 1) => Ok(true),
|
||||
Ok(_) => Ok(false),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8140,6 +8140,33 @@ pub fn op_integrity_check(
|
||||
expected_count: integrity_check_state.freelist_count.expected_count,
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "omit_autovacuum"))]
|
||||
{
|
||||
let auto_vacuum_mode = pager.get_auto_vacuum_mode();
|
||||
if !matches!(
|
||||
auto_vacuum_mode,
|
||||
crate::storage::pager::AutoVacuumMode::None
|
||||
) {
|
||||
tracing::debug!("Integrity check: auto-vacuum mode detected ({:?}). Scanning for pointer-map pages.", auto_vacuum_mode);
|
||||
let page_size = pager.get_page_size_unchecked().get() as usize;
|
||||
|
||||
for page_number in 2..=integrity_check_state.db_size {
|
||||
if crate::storage::pager::ptrmap::is_ptrmap_page(
|
||||
page_number as u32,
|
||||
page_size,
|
||||
) {
|
||||
tracing::debug!("Integrity check: Found and marking pointer-map page as visited: page_id={}", page_number);
|
||||
|
||||
integrity_check_state.start(
|
||||
page_number as i64,
|
||||
PageCategory::PointerMap,
|
||||
errors,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for page_number in 2..=integrity_check_state.db_size {
|
||||
if !integrity_check_state
|
||||
.page_reference
|
||||
|
||||
@@ -237,6 +237,7 @@ impl InteractionPlan {
|
||||
Query::AlterTable(_) => stats.alter_table_count += 1,
|
||||
Query::DropIndex(_) => stats.drop_index_count += 1,
|
||||
Query::Placeholder => {}
|
||||
Query::Pragma(_) => stats.pragma_count += 1,
|
||||
}
|
||||
}
|
||||
for interactions in &self.plan {
|
||||
@@ -771,6 +772,7 @@ pub(crate) struct InteractionStats {
|
||||
pub rollback_count: u32,
|
||||
pub alter_table_count: u32,
|
||||
pub drop_index_count: u32,
|
||||
pub pragma_count: u32,
|
||||
}
|
||||
|
||||
impl Display for InteractionStats {
|
||||
|
||||
@@ -302,7 +302,6 @@ impl Property {
|
||||
|
||||
let rows = insert.rows();
|
||||
let row = &rows[*row_index];
|
||||
|
||||
match &query {
|
||||
Query::Delete(Delete {
|
||||
table: t,
|
||||
@@ -1381,6 +1380,7 @@ pub(super) struct Remaining {
|
||||
pub drop: u32,
|
||||
pub alter_table: u32,
|
||||
pub drop_index: u32,
|
||||
pub pragma_count: u32,
|
||||
}
|
||||
|
||||
pub(super) fn remaining(
|
||||
@@ -1401,6 +1401,7 @@ pub(super) fn remaining(
|
||||
let total_drop = (max_interactions * opts.drop_table_weight) / total_weight;
|
||||
let total_alter_table = (max_interactions * opts.alter_table_weight) / total_weight;
|
||||
let total_drop_index = (max_interactions * opts.drop_index) / total_weight;
|
||||
let total_pragma = (max_interactions * opts.pragma_weight) / total_weight;
|
||||
|
||||
let remaining_select = total_select
|
||||
.checked_sub(stats.select_count)
|
||||
@@ -1421,6 +1422,9 @@ pub(super) fn remaining(
|
||||
.checked_sub(stats.update_count)
|
||||
.unwrap_or_default();
|
||||
let remaining_drop = total_drop.checked_sub(stats.drop_count).unwrap_or_default();
|
||||
let remaining_pragma = total_pragma
|
||||
.checked_sub(stats.pragma_count)
|
||||
.unwrap_or_default();
|
||||
|
||||
let remaining_alter_table = total_alter_table
|
||||
.checked_sub(stats.alter_table_count)
|
||||
@@ -1455,6 +1459,7 @@ pub(super) fn remaining(
|
||||
update: remaining_update,
|
||||
alter_table: remaining_alter_table,
|
||||
drop_index: remaining_drop_index,
|
||||
pragma_count: remaining_pragma,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,12 +5,15 @@ use crate::{
|
||||
use rand::{
|
||||
Rng,
|
||||
distr::{Distribution, weighted::WeightedIndex},
|
||||
seq::IndexedRandom,
|
||||
};
|
||||
use sql_generation::{
|
||||
generation::{Arbitrary, ArbitraryFrom, GenerationContext, query::SelectFree},
|
||||
model::{
|
||||
query::{
|
||||
Create, CreateIndex, Delete, DropIndex, Insert, Select, alter_table::AlterTable,
|
||||
Create, CreateIndex, Delete, DropIndex, Insert, Select,
|
||||
alter_table::AlterTable,
|
||||
pragma::{Pragma, VacuumMode},
|
||||
update::Update,
|
||||
},
|
||||
table::Table,
|
||||
@@ -82,6 +85,18 @@ fn random_create_index<R: rand::Rng + ?Sized>(
|
||||
Query::CreateIndex(create_index)
|
||||
}
|
||||
|
||||
fn random_pragma<R: rand::Rng + ?Sized>(rng: &mut R, _conn_ctx: &impl GenerationContext) -> Query {
|
||||
const ALL_MODES: [VacuumMode; 2] = [
|
||||
VacuumMode::None,
|
||||
// VacuumMode::Incremental, not implemented yet
|
||||
VacuumMode::Full,
|
||||
];
|
||||
|
||||
let mode = ALL_MODES.choose(rng).unwrap();
|
||||
|
||||
Query::Pragma(Pragma::AutoVacuumMode(mode.clone()))
|
||||
}
|
||||
|
||||
fn random_alter_table<R: rand::Rng + ?Sized>(
|
||||
rng: &mut R,
|
||||
conn_ctx: &impl GenerationContext,
|
||||
@@ -140,6 +155,7 @@ impl QueryDiscriminants {
|
||||
QueryDiscriminants::Placeholder => {
|
||||
unreachable!("Query Placeholders should not be generated")
|
||||
}
|
||||
QueryDiscriminants::Pragma => random_pragma,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,6 +180,7 @@ impl QueryDiscriminants {
|
||||
QueryDiscriminants::Placeholder => {
|
||||
unreachable!("Query Placeholders should not be generated")
|
||||
}
|
||||
QueryDiscriminants::Pragma => remaining.pragma_count,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ use sql_generation::model::{
|
||||
query::{
|
||||
Create, CreateIndex, Delete, Drop, DropIndex, Insert, Select,
|
||||
alter_table::{AlterTable, AlterTableType},
|
||||
pragma::Pragma,
|
||||
select::{CompoundOperator, FromClause, ResultColumn, SelectInner},
|
||||
transaction::{Begin, Commit, Rollback},
|
||||
update::Update,
|
||||
@@ -34,6 +35,7 @@ pub enum Query {
|
||||
Begin(Begin),
|
||||
Commit(Commit),
|
||||
Rollback(Rollback),
|
||||
Pragma(Pragma),
|
||||
/// Placeholder query that still needs to be generated
|
||||
Placeholder,
|
||||
}
|
||||
@@ -81,8 +83,11 @@ impl Query {
|
||||
| Query::DropIndex(DropIndex {
|
||||
table_name: table, ..
|
||||
}) => IndexSet::from_iter([table.clone()]),
|
||||
Query::Begin(_) | Query::Commit(_) | Query::Rollback(_) => IndexSet::new(),
|
||||
Query::Placeholder => IndexSet::new(),
|
||||
Query::Begin(_)
|
||||
| Query::Commit(_)
|
||||
| Query::Rollback(_)
|
||||
| Query::Placeholder
|
||||
| Query::Pragma(_) => IndexSet::new(),
|
||||
}
|
||||
}
|
||||
pub fn uses(&self) -> Vec<String> {
|
||||
@@ -107,6 +112,7 @@ impl Query {
|
||||
}) => vec![table.clone()],
|
||||
Query::Begin(..) | Query::Commit(..) | Query::Rollback(..) => vec![],
|
||||
Query::Placeholder => vec![],
|
||||
Query::Pragma(_) => vec![],
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,6 +153,7 @@ impl Display for Query {
|
||||
Self::Commit(commit) => write!(f, "{commit}"),
|
||||
Self::Rollback(rollback) => write!(f, "{rollback}"),
|
||||
Self::Placeholder => Ok(()),
|
||||
Query::Pragma(pragma) => write!(f, "{pragma}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -169,12 +176,14 @@ impl Shadow for Query {
|
||||
Query::Commit(commit) => Ok(commit.shadow(env)),
|
||||
Query::Rollback(rollback) => Ok(rollback.shadow(env)),
|
||||
Query::Placeholder => Ok(vec![]),
|
||||
Query::Pragma(Pragma::AutoVacuumMode(_)) => Ok(vec![]),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bitflags! {
|
||||
pub struct QueryCapabilities: u32 {
|
||||
const NONE = 0;
|
||||
const CREATE = 1 << 0;
|
||||
const SELECT = 1 << 1;
|
||||
const INSERT = 1 << 2;
|
||||
@@ -222,6 +231,7 @@ impl From<QueryDiscriminants> for QueryCapabilities {
|
||||
QueryDiscriminants::Placeholder => {
|
||||
unreachable!("QueryCapabilities do not apply to query Placeholder")
|
||||
}
|
||||
QueryDiscriminants::Pragma => QueryCapabilities::NONE,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -237,6 +247,7 @@ impl QueryDiscriminants {
|
||||
QueryDiscriminants::CreateIndex,
|
||||
QueryDiscriminants::AlterTable,
|
||||
QueryDiscriminants::DropIndex,
|
||||
QueryDiscriminants::Pragma,
|
||||
];
|
||||
}
|
||||
|
||||
|
||||
@@ -26,6 +26,8 @@ pub struct QueryProfile {
|
||||
pub alter_table_weight: u32,
|
||||
#[garde(skip)]
|
||||
pub drop_index: u32,
|
||||
#[garde(skip)]
|
||||
pub pragma_weight: u32,
|
||||
}
|
||||
|
||||
impl Default for QueryProfile {
|
||||
@@ -41,6 +43,7 @@ impl Default for QueryProfile {
|
||||
drop_table_weight: 2,
|
||||
alter_table_weight: 2,
|
||||
drop_index: 2,
|
||||
pragma_weight: 2,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -56,6 +59,7 @@ impl QueryProfile {
|
||||
+ self.delete_weight
|
||||
+ self.drop_table_weight
|
||||
+ self.alter_table_weight
|
||||
+ self.pragma_weight
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ pub mod delete;
|
||||
pub mod drop;
|
||||
pub mod drop_index;
|
||||
pub mod insert;
|
||||
pub mod pragma;
|
||||
pub mod predicate;
|
||||
pub mod select;
|
||||
pub mod transaction;
|
||||
|
||||
32
sql_generation/model/query/pragma.rs
Normal file
32
sql_generation/model/query/pragma.rs
Normal file
@@ -0,0 +1,32 @@
|
||||
use std::fmt::Display;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum Pragma {
|
||||
AutoVacuumMode(VacuumMode),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum VacuumMode {
|
||||
None,
|
||||
Incremental,
|
||||
Full,
|
||||
}
|
||||
|
||||
impl Display for Pragma {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Pragma::AutoVacuumMode(vacuum_mode) => {
|
||||
let mode = match vacuum_mode {
|
||||
VacuumMode::None => "none",
|
||||
VacuumMode::Incremental => "incremental",
|
||||
VacuumMode::Full => "full",
|
||||
};
|
||||
|
||||
write!(f, "PRAGMA auto_vacuum={mode}")?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user