Handle table ID / rootpages properly for both checkpointed and non-checkpointed tables

Table ID is an opaque identifier that is only meaningful to the MV store.
Each checkpointed MVCC table corresponds to a single B-tree on the pager,
which naturally has a root page.

We cannot use the root page as the MVCC table ID directly because:
- we assign table IDs during MVCC commit, but
- we commit pages to the pager only during checkpoint,
which means the root page is not easily knowable ahead of time.

Hence, we:

- store the mapping between table ID and B-tree root page in the MV store, and
- give sqlite_schema rows a negative rootpage column if the table has not
  been checkpointed yet.
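
A minimal standalone sketch of the scheme (illustrative only: it models the SkipMap with a std BTreeMap, and TableIdMap / alloc_table_id / rootpage_column are made-up names, not part of this change):

use std::collections::BTreeMap;
use std::sync::Mutex;

/// Simplified model: table IDs are always negative; a checkpointed table
/// additionally has a positive physical root page.
struct TableIdMap {
    table_id_to_rootpage: Mutex<BTreeMap<i64, Option<u64>>>,
    next_table_id: Mutex<i64>,
}

impl TableIdMap {
    fn new() -> Self {
        let mut map = BTreeMap::new();
        map.insert(-1, Some(1)); // table id -1 / root page 1 is always sqlite_schema
        Self {
            table_id_to_rootpage: Mutex::new(map),
            next_table_id: Mutex::new(-2),
        }
    }

    /// Assigned at MVCC commit time, when no pager page exists yet.
    fn alloc_table_id(&self) -> i64 {
        let mut next = self.next_table_id.lock().unwrap();
        let id = *next;
        *next -= 1;
        id
    }

    /// Called during checkpoint once the B-tree root page has been allocated.
    fn set_rootpage(&self, table_id: i64, root_page: u64) {
        self.table_id_to_rootpage
            .lock()
            .unwrap()
            .insert(table_id, Some(root_page));
    }

    /// The rootpage column written to sqlite_schema: the real root page if
    /// checkpointed, otherwise the (negative) table ID as a placeholder.
    fn rootpage_column(&self, table_id: i64) -> i64 {
        match self.table_id_to_rootpage.lock().unwrap().get(&table_id) {
            Some(Some(root_page)) => *root_page as i64,
            _ => table_id,
        }
    }
}

fn main() {
    let map = TableIdMap::new();
    let t = map.alloc_table_id();          // assigned at MVCC commit: -2
    assert_eq!(map.rootpage_column(t), t); // not checkpointed yet: negative placeholder
    map.set_rootpage(t, 2);                // checkpoint allocates the real B-tree page
    assert_eq!(map.rootpage_column(t), 2);
}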
Jussi Saurio
2025-09-30 14:12:28 +03:00
parent a1bdad58b6
commit a52dbb7842
12 changed files with 376 additions and 149 deletions

.gitignore
View File

@@ -48,3 +48,4 @@ simulator-output/
bisected.sql
*.log
*.db-lg

View File

@@ -443,6 +443,13 @@ impl Database {
buffer_pool: BufferPool::begin_init(&io, arena_size),
n_connections: AtomicUsize::new(0),
});
if opts.enable_mvcc {
let mv_store = db.mv_store.as_ref().unwrap();
let mvcc_bootstrap_conn = db.connect_mvcc_bootstrap()?;
mv_store.bootstrap(mvcc_bootstrap_conn)?;
}
db.register_global_builtin_extensions()
.expect("unable to register global extensions");
@@ -487,18 +494,27 @@ impl Database {
Ok(())
})?;
}
Ok(db)
}
#[instrument(skip_all, level = Level::INFO)]
pub fn connect(self: &Arc<Database>) -> Result<Arc<Connection>> {
self._connect(false)
}
pub(crate) fn connect_mvcc_bootstrap(self: &Arc<Database>) -> Result<Arc<Connection>> {
self._connect(true)
}
#[instrument(skip_all, level = Level::INFO)]
fn _connect(
self: &Arc<Database>,
is_mvcc_bootstrap_connection: bool,
) -> Result<Arc<Connection>> {
let pager = self.init_pager(None)?;
let pager = Arc::new(pager);
if self.mv_store.is_some() {
self.maybe_recover_logical_log(pager.clone())?;
}
let page_size = pager.get_page_size_unchecked();
let default_cache_size = pager
@@ -539,6 +555,7 @@ impl Database {
sync_mode: RwLock::new(SyncMode::Full),
data_sync_retry: AtomicBool::new(false),
busy_timeout: RwLock::new(Duration::new(0, 0)),
is_mvcc_bootstrap_connection: AtomicBool::new(is_mvcc_bootstrap_connection),
});
self.n_connections
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
@@ -548,17 +565,6 @@ impl Database {
Ok(conn)
}
pub fn maybe_recover_logical_log(self: &Arc<Database>, pager: Arc<Pager>) -> Result<()> {
let Some(mv_store) = self.mv_store.clone() else {
panic!("tryign to recover logical log without mvcc");
};
if !mv_store.needs_recover() {
return Ok(());
}
mv_store.recover_logical_log(&self.io, &pager)
}
pub fn is_readonly(&self) -> bool {
self.open_flags.contains(OpenFlags::ReadOnly)
}
@@ -1052,6 +1058,8 @@ pub struct Connection {
/// User defined max accumulated Busy timeout duration
/// Default is 0 (no timeout)
busy_timeout: RwLock<std::time::Duration>,
/// Whether this is an internal connection used for MVCC bootstrap
is_mvcc_bootstrap_connection: AtomicBool,
}
impl Drop for Connection {
@@ -1066,8 +1074,20 @@ impl Drop for Connection {
}
impl Connection {
#[instrument(skip_all, level = Level::INFO)]
pub fn prepare(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<Statement> {
if self.is_mvcc_bootstrap_connection() {
// Never use MV store for bootstrapping - we read state directly from sqlite_schema in the DB file.
return self._prepare(sql, None);
}
self._prepare(sql, self.db.mv_store.clone())
}
#[instrument(skip_all, level = Level::INFO)]
pub fn _prepare(
self: &Arc<Connection>,
sql: impl AsRef<str>,
mv_store: Option<Arc<MvStore>>,
) -> Result<Statement> {
if self.is_closed() {
return Err(LimboError::InternalError("Connection closed".to_string()));
}
@@ -1100,12 +1120,19 @@ impl Connection {
mode,
input,
)?;
Ok(Statement::new(
program,
self.db.mv_store.clone(),
pager,
mode,
))
Ok(Statement::new(program, mv_store, pager, mode))
}
/// Whether this is an internal connection used for MVCC bootstrap
pub fn is_mvcc_bootstrap_connection(&self) -> bool {
self.is_mvcc_bootstrap_connection.load(Ordering::SeqCst)
}
/// Promote MVCC bootstrap connection to a regular connection so it reads from the MV store again.
pub fn promote_to_regular_connection(&self) {
assert!(self.is_mvcc_bootstrap_connection.load(Ordering::SeqCst));
self.is_mvcc_bootstrap_connection
.store(false, Ordering::SeqCst);
}
/// Parse schema from scratch if version of schema for the connection differs from the schema cookie in the root page
@@ -1206,14 +1233,7 @@ impl Connection {
let stmt = self.prepare("SELECT * FROM sqlite_schema")?;
// TODO: This function below is synchronous, make it async
parse_schema_rows(
stmt,
&mut fresh,
&self.syms.read(),
None,
existing_views,
self.db.mv_store.as_ref(),
)?;
parse_schema_rows(stmt, &mut fresh, &self.syms.read(), None, existing_views)?;
tracing::debug!(
"reparse_schema: schema_version={}, tables={:?}",
@@ -1720,7 +1740,7 @@ impl Connection {
}
pub fn is_wal_auto_checkpoint_disabled(&self) -> bool {
self.wal_auto_checkpoint_disabled.load(Ordering::SeqCst)
self.wal_auto_checkpoint_disabled.load(Ordering::SeqCst) || self.db.mv_store.is_some()
}
pub fn last_insert_rowid(&self) -> i64 {
@@ -1862,14 +1882,9 @@ impl Connection {
let syms = self.syms.read();
self.with_schema_mut(|schema| {
let existing_views = schema.incremental_views.clone();
if let Err(LimboError::ExtensionError(e)) = parse_schema_rows(
rows,
schema,
&syms,
None,
existing_views,
self.db.mv_store.as_ref(),
) {
if let Err(LimboError::ExtensionError(e)) =
parse_schema_rows(rows, schema, &syms, None, existing_views)
{
// this means that a vtab exists and we no longer have the module loaded. we print
// a warning to the user to load the module
eprintln!("Warning: {e}");
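
The bootstrap connection added above boils down to a small dispatch rule: while bootstrapping, statements bypass the MV store and read sqlite_schema straight from the DB file, and once bootstrap finishes the connection is promoted back to normal. A hedged sketch of just that rule (Conn, MvStoreHandle and row_source are stand-in names, not the crate's types):

use std::sync::atomic::{AtomicBool, Ordering};

struct MvStoreHandle; // stand-in for Arc<MvStore>

struct Conn {
    is_mvcc_bootstrap: AtomicBool,
    mv_store: Option<MvStoreHandle>,
}

impl Conn {
    /// Which row source a prepared statement should use.
    fn row_source(&self) -> Option<&MvStoreHandle> {
        if self.is_mvcc_bootstrap.load(Ordering::SeqCst) {
            None // bootstrap: read directly from the B-tree/pager
        } else {
            self.mv_store.as_ref()
        }
    }

    /// After the schema scan and logical-log recovery, flip the flag so
    /// later statements go through the MV store again.
    fn promote_to_regular(&self) {
        self.is_mvcc_bootstrap.store(false, Ordering::SeqCst);
    }
}

fn main() {
    let conn = Conn {
        is_mvcc_bootstrap: AtomicBool::new(true),
        mv_store: Some(MvStoreHandle),
    };
    assert!(conn.row_source().is_none());
    conn.promote_to_regular();
    assert!(conn.row_source().is_some());
}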

View File

@@ -20,7 +20,7 @@ enum CursorPosition {
pub struct MvccLazyCursor<Clock: LogicalClock> {
pub db: Arc<MvStore<Clock>>,
current_pos: CursorPosition,
table_id: i64,
pub table_id: i64,
tx_id: u64,
}

View File

@@ -1,15 +1,18 @@
use crate::mvcc::clock::LogicalClock;
use crate::mvcc::database::{
DeleteRowStateMachine, MvStore, RowVersion, TxTimestampOrID, WriteRowStateMachine,
SQLITE_SCHEMA_MVCC_TABLE_ID,
};
use crate::state_machine::{StateMachine, StateTransition, TransitionResult};
use crate::storage::btree::BTreeCursor;
use crate::storage::pager::CreateBTreeFlags;
use crate::storage::wal::{CheckpointMode, TursoRwLock};
use crate::types::{IOResult, ImmutableRecord, RecordCursor};
use crate::{CheckpointResult, Connection, IOExt, Pager, RefValue, Result, TransactionState};
use crate::{
CheckpointResult, Connection, IOExt, Pager, RefValue, Result, TransactionState, Value,
};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::Ordering;
use std::sync::Arc;
@@ -79,7 +82,9 @@ pub struct CheckpointStateMachine<Clock: LogicalClock> {
/// State machine for deleting rows from the B-tree
delete_row_state_machine: Option<StateMachine<DeleteRowStateMachine>>,
/// Cursors for the B-trees
cursors: HashMap<i64, Arc<RwLock<BTreeCursor>>>,
cursors: HashMap<u64, Arc<RwLock<BTreeCursor>>>,
/// Tables that were destroyed in this checkpoint
destroyed_tables: HashSet<i64>,
/// Result of the checkpoint
checkpoint_result: Option<CheckpointResult>,
}
@@ -88,8 +93,14 @@ pub struct CheckpointStateMachine<Clock: LogicalClock> {
/// Special writes for CREATE TABLE / DROP TABLE ops.
/// These are used to create/destroy B-trees during pager ops.
pub enum SpecialWrite {
BTreeCreate { root_page: i64 },
BTreeDestroy { root_page: i64, num_columns: usize },
BTreeCreate {
table_id: i64,
},
BTreeDestroy {
table_id: i64,
root_page: u64,
num_columns: usize,
},
}
impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
@@ -116,6 +127,7 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
write_row_state_machine: None,
delete_row_state_machine: None,
cursors: HashMap::new(),
destroyed_tables: HashSet::new(),
checkpoint_result: None,
}
}
@@ -134,7 +146,17 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
// the MVCC store, so that we don't checkpoint the same row versions again on the next checkpoint.
let mut max_timestamp = self.checkpointed_txid_max_old;
for entry in self.mvstore.rows.iter() {
// Since table ids are negative, and we want schema changes (table_id=-1) to be processed first, we iterate in reverse order.
// Reliance on SkipMap ordering is a bit yolo-swag fragile, but oh well.
for entry in self.mvstore.rows.iter().rev() {
let key = entry.key();
if self.destroyed_tables.contains(&key.table_id) {
// We won't checkpoint rows for tables that will be destroyed in this checkpoint.
// There are two forms of destroyed table:
// 1. A non-checkpointed table that was created in the logical log and then destroyed. We don't need to do anything about this table in the pager/btree layer.
// 2. A checkpointed table that was destroyed in the logical log. We need to destroy the btree in the pager/btree layer.
continue;
}
let row_versions = entry.value().read();
let mut exists_in_db_file = false;
for (i, version) in row_versions.iter().enumerate() {
@@ -155,7 +177,10 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
continue;
}
let get_root_page = |row_data: &Vec<u8>| {
// Row versions in sqlite_schema are temporarily assigned a negative root page that is equal to the table id,
// because the root page is not known until it's actually allocated during the checkpoint.
// However, existing tables have a real root page.
let get_table_id_or_root_page_from_sqlite_schema = |row_data: &Vec<u8>| {
let row_data = ImmutableRecord::from_bin_record(row_data.clone());
let mut record_cursor = RecordCursor::new();
record_cursor.parse_full_header(&row_data).unwrap();
@@ -173,24 +198,47 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
max_timestamp = max_timestamp.max(current_version_ts);
if is_last {
let is_delete = version.end.is_some();
let should_be_deleted_from_db_file = is_delete && exists_in_db_file;
let is_delete_of_table =
is_delete && version.row.id.table_id == SQLITE_SCHEMA_MVCC_TABLE_ID;
let is_create_of_table = !exists_in_db_file
&& !is_delete
&& version.row.id.table_id == SQLITE_SCHEMA_MVCC_TABLE_ID;
// We might need to create or destroy a B-tree in the pager during checkpoint if a row in root page 1 is deleted or created.
let special_write =
if should_be_deleted_from_db_file && version.row.id.table_id == 1 {
let root_page = get_root_page(&version.row.data);
let special_write = if is_delete_of_table {
let root_page =
get_table_id_or_root_page_from_sqlite_schema(&version.row.data);
assert!(root_page > 0, "rootpage is positive integer");
let root_page = root_page as u64;
let table_id = *self
.mvstore
.table_id_to_rootpage
.iter()
.find(|entry| entry.value().is_some_and(|r| r == root_page))
.unwrap()
.key();
self.destroyed_tables.insert(table_id);
if exists_in_db_file {
Some(SpecialWrite::BTreeDestroy {
table_id,
root_page,
num_columns: version.row.column_count,
})
} else if !exists_in_db_file && version.row.id.table_id == 1 {
let root_page = get_root_page(&version.row.data);
Some(SpecialWrite::BTreeCreate { root_page })
} else {
None
}
} else if is_create_of_table {
let table_id =
get_table_id_or_root_page_from_sqlite_schema(&version.row.data);
assert!(table_id < 0, "table_id is negative integer");
Some(SpecialWrite::BTreeCreate { table_id })
} else {
None
};
// Only write the row to the B-tree if it is not a delete, or if it is a delete and it exists in the database file.
let should_be_deleted_from_db_file = is_delete && exists_in_db_file;
if !is_delete || should_be_deleted_from_db_file {
self.write_set.push((version.clone(), special_write));
}
@@ -209,6 +257,14 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
self.write_set.get(write_set_index)
}
/// Mutably get the current row version to write to the B-tree
fn get_current_row_version_mut(
&mut self,
write_set_index: usize,
) -> Option<&mut (RowVersion, Option<SpecialWrite>)> {
self.write_set.get_mut(write_set_index)
}
/// Check if we have more rows to write
fn has_more_rows(&self, write_set_index: usize) -> bool {
write_set_index < self.write_set.len()
@@ -314,24 +370,36 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
// Handle CREATE TABLE / DROP TABLE ops
if let Some(special_write) = special_write {
match special_write {
SpecialWrite::BTreeCreate { root_page } => {
let created_root_page =
self.pager.io.block(|| {
SpecialWrite::BTreeCreate { table_id } => {
let created_root_page = self.pager.io.block(|| {
self.pager.btree_create(&CreateBTreeFlags::new_table())
})? as i64;
assert_eq!(created_root_page , root_page, "Created root page does not match expected root page: {created_root_page} != {root_page}");
})?;
self.mvstore.insert_table_id_to_rootpage(
table_id,
Some(created_root_page as u64),
);
}
SpecialWrite::BTreeDestroy {
table_id,
root_page,
num_columns,
} => {
let cursor = if let Some(cursor) = self.cursors.get(&root_page) {
let known_root_page = self
.mvstore
.table_id_to_rootpage
.get(&table_id)
.expect("Table ID does not have a root page");
let known_root_page = known_root_page
.value()
.expect("Table ID does not have a root page");
assert_eq!(known_root_page, root_page, "MV store root page does not match root page in the sqlite_schema record: {known_root_page} != {root_page}");
let cursor = if let Some(cursor) = self.cursors.get(&known_root_page) {
cursor.clone()
} else {
let cursor = BTreeCursor::new_table(
None,
self.pager.clone(),
root_page,
known_root_page as i64,
num_columns,
);
let cursor = Arc::new(RwLock::new(cursor));
@@ -339,23 +407,79 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
cursor
};
self.pager.io.block(|| cursor.write().btree_destroy())?;
self.cursors.remove(&root_page);
self.destroyed_tables.insert(table_id);
}
}
}
if self.destroyed_tables.contains(&table_id) {
// Don't write rows for tables that will be destroyed in this checkpoint.
self.state = CheckpointState::WriteRow {
write_set_index: write_set_index + 1,
requires_seek: true,
};
return Ok(TransitionResult::Continue);
}
let root_page = {
let root_page = self
.mvstore
.table_id_to_rootpage
.get(&table_id)
.unwrap_or_else(|| {
panic!(
"Table ID does not have a root page: {table_id}, row_version: {:?}",
self.get_current_row_version(write_set_index).unwrap()
)
});
root_page.value().unwrap_or_else(|| {
panic!(
"Table ID does not have a root page: {table_id}, row_version: {:?}",
self.get_current_row_version(write_set_index).unwrap()
)
})
};
// If a table was created, it now has a real root page allocated for it, but the 'root_page' field in the sqlite_schema record is still the table id.
// So we need to rewrite the row version to use the real root page.
if let Some(SpecialWrite::BTreeCreate { table_id }) = special_write {
let root_page = {
let root_page = self
.mvstore
.table_id_to_rootpage
.get(&table_id)
.expect("Table ID does not have a root page");
root_page
.value()
.expect("Table ID does not have a root page")
};
let (row_version, _) =
self.get_current_row_version_mut(write_set_index).unwrap();
let record = ImmutableRecord::from_bin_record(row_version.row.data.clone());
let mut record_cursor = RecordCursor::new();
record_cursor.parse_full_header(&record).unwrap();
let values = record_cursor.get_values(&record)?;
let mut values = values
.into_iter()
.map(|value| value.to_owned())
.collect::<Vec<_>>();
values[3] = Value::Integer(root_page as i64);
let record = ImmutableRecord::from_values(&values, values.len());
row_version.row.data = record.get_payload().to_owned();
}
// Get or create cursor for this table
let cursor = if let Some(cursor) = self.cursors.get(&table_id) {
let cursor = if let Some(cursor) = self.cursors.get(&root_page) {
cursor.clone()
} else {
let cursor = BTreeCursor::new_table(
None, // Write directly to B-tree
self.pager.clone(),
table_id,
root_page as i64,
num_columns,
);
let cursor = Arc::new(RwLock::new(cursor));
self.cursors.insert(table_id, cursor.clone());
self.cursors.insert(root_page, cursor.clone());
cursor
};
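
The BTreeCreate branch above patches the sqlite_schema record in place once the pager has handed out a real root page. A rough standalone sketch of that rewrite, with the record modelled as a plain Vec of values instead of the serialized ImmutableRecord (Val and patch_rootpage are illustrative names):

#[derive(Debug, Clone, PartialEq)]
enum Val {
    Integer(i64),
    Text(String),
}

/// sqlite_schema columns: type, name, tbl_name, rootpage, sql (rootpage is index 3).
fn patch_rootpage(mut schema_row: Vec<Val>, allocated_root_page: u64) -> Vec<Val> {
    debug_assert!(
        matches!(&schema_row[3], Val::Integer(id) if *id < 0),
        "only non-checkpointed tables carry a negative placeholder rootpage"
    );
    schema_row[3] = Val::Integer(allocated_root_page as i64);
    schema_row
}

fn main() {
    let row = vec![
        Val::Text("table".into()),
        Val::Text("t".into()),
        Val::Text("t".into()),
        Val::Integer(-2), // placeholder: the (negative) table id, no page allocated yet
        Val::Text("CREATE TABLE t(x)".into()),
    ];
    let patched = patch_rootpage(row, 7); // checkpoint allocated root page 7 for this B-tree
    assert_eq!(patched[3], Val::Integer(7));
}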

View File

@@ -18,6 +18,9 @@ use crate::File;
use crate::IOExt;
use crate::LimboError;
use crate::Result;
use crate::Statement;
use crate::StepResult;
use crate::Value;
use crate::{Connection, Pager};
use crossbeam_skiplist::{SkipMap, SkipSet};
use parking_lot::RwLock;
@@ -25,6 +28,7 @@ use std::collections::HashSet;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::Bound;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tracing::instrument;
@@ -522,7 +526,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
row_version.clone(),
); // FIXME: optimize cloning out
if row_version.row.id.table_id == 1 {
if row_version.row.id.table_id == SQLITE_SCHEMA_MVCC_TABLE_ID {
self.did_commit_schema_change = true;
}
}
@@ -537,7 +541,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
row_version.clone(),
); // FIXME: optimize cloning out
if row_version.row.id.table_id == 1 {
if row_version.row.id.table_id == SQLITE_SCHEMA_MVCC_TABLE_ID {
self.did_commit_schema_change = true;
}
}
@@ -824,14 +828,28 @@ impl DeleteRowStateMachine {
}
}
pub const SQLITE_SCHEMA_MVCC_TABLE_ID: i64 = -1;
/// A multi-version concurrency control database.
#[derive(Debug)]
pub struct MvStore<Clock: LogicalClock> {
rows: SkipMap<RowID, RwLock<Vec<RowVersion>>>,
/// Table ID is an opaque identifier that is only meaningful to the MV store.
/// Each checkpointed MVCC table corresponds to a single B-tree on the pager,
/// which naturally has a root page.
/// We cannot use root page as the MVCC table ID directly because:
/// - We assign table IDs during MVCC commit, but
/// - we commit pages to the pager only during checkpoint
///
/// which means the root page is not easily knowable ahead of time.
/// Hence, we store the mapping here.
/// The value is Option because tables created in an MVCC commit that have not
/// been checkpointed yet have no real root page assigned yet.
pub table_id_to_rootpage: SkipMap<i64, Option<u64>>,
txs: SkipMap<TxID, Transaction>,
tx_ids: AtomicU64,
next_rowid: AtomicU64,
next_table_id: AtomicU64,
next_table_id: AtomicI64,
clock: Clock,
storage: Storage,
loaded_tables: RwLock<HashSet<i64>>,
@@ -864,10 +882,6 @@ pub struct MvStore<Clock: LogicalClock> {
/// If there are two concurrent BEGIN (non-CONCURRENT) transactions, and one tries to promote
/// to exclusive, it will abort if another transaction committed after its begin timestamp.
last_committed_tx_ts: AtomicU64,
/// Lock used while recovering a logical log file. We don't want multiple connections trying to
/// load the file.
recover_lock: RwLock<()>,
}
impl<Clock: LogicalClock> MvStore<Clock> {
@@ -875,10 +889,11 @@ impl<Clock: LogicalClock> MvStore<Clock> {
pub fn new(clock: Clock, storage: Storage) -> Self {
Self {
rows: SkipMap::new(),
table_id_to_rootpage: SkipMap::from_iter(vec![(SQLITE_SCHEMA_MVCC_TABLE_ID, Some(1))]), // table id 1 / root page 1 is always sqlite_schema.
txs: SkipMap::new(),
tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes
next_rowid: AtomicU64::new(0), // TODO: determine this from B-Tree
next_table_id: AtomicU64::new(2), // table id 1 / root page 1 is always sqlite_schema.
next_table_id: AtomicI64::new(-2), // table id -1 / root page 1 is always sqlite_schema.
clock,
storage,
loaded_tables: RwLock::new(HashSet::new()),
@@ -891,16 +906,88 @@ impl<Clock: LogicalClock> MvStore<Clock> {
checkpointed_txid_max: AtomicU64::new(0),
last_committed_schema_change_ts: AtomicU64::new(0),
last_committed_tx_ts: AtomicU64::new(0),
recover_lock: RwLock::new(()),
}
}
/// Get the table ID from the root page.
/// If the root page is negative, it is a non-checkpointed table and the table ID and root page are both the same negative value.
/// If the root page is positive, it is a checkpointed table and there should be a corresponding table ID.
pub fn get_table_id_from_root_page(&self, root_page: i64) -> i64 {
if root_page < 0 {
// Not checkpointed table - table ID and root_page are both the same negative value
root_page
} else {
// Root page is positive: it is a checkpointed table and there should be a corresponding table ID
let root_page = root_page as u64;
let table_id = self
.table_id_to_rootpage
.iter()
.find(|entry| entry.value().is_some_and(|value| value == root_page))
.map(|entry| *entry.key())
.expect("Positive root page is not mapped to a table id");
table_id
}
}
/// Insert a table ID and root page mapping.
/// Root page must be positive here, because we only invoke this method with Some() for checkpointed tables.
pub fn insert_table_id_to_rootpage(&self, table_id: i64, root_page: Option<u64>) {
self.table_id_to_rootpage.insert(table_id, root_page);
let minimum = if let Some(root_page) = root_page {
// On recovery, we assign table_id = -root_page. Let's make sure we don't get any clashes between checkpointed and non-checkpointed tables
// E.g. if we checkpoint a table that has physical root page 7, let's require the next table_id to be less than -7 (or if table_id is already smaller, then smaller than that.)
table_id.min(-(root_page as i64))
} else {
table_id
};
if minimum < self.next_table_id.load(Ordering::SeqCst) {
self.next_table_id.store(minimum - 1, Ordering::SeqCst);
}
}
/// Bootstrap the MV store from the SQLite schema table and logical log.
/// 1. Get all root pages from the SQLite schema table (using bootstrap connection, which does not attempt to read from MV store)
/// 2. Assign table IDs to the root pages (table_id = -1 * root_page)
/// 3. Recover the logical log
/// 4. Promote the bootstrap connection to a regular connection so that it reads from the MV store again
/// 5. Make sure schema changes reflected from deserialized logical log are captured in the schema
pub fn bootstrap(&self, bootstrap_conn: Arc<Connection>) -> Result<()> {
// Get all rows from the SQLite schema table
let mut get_all_sqlite_schema_rows =
bootstrap_conn.prepare("SELECT rootpage FROM sqlite_schema WHERE type='table' AND name NOT LIKE 'sqlite_%' AND name NOT LIKE '__turso_internal_%'")?;
let sqlite_schema_root_pages = stmt_get_all_rows(&mut get_all_sqlite_schema_rows)?
.into_iter()
.map(|row| {
let root_page = row[0].as_int().expect("rootpage is integer");
assert!(root_page > 0, "rootpage is positive integer");
root_page as u64
})
.collect::<Vec<u64>>();
// Map all existing checkpointed root pages to table ids so that if root_page=R, table_id=-R
for root_page in sqlite_schema_root_pages.iter() {
self.insert_table_id_to_rootpage(-(*root_page as i64), Some(*root_page));
}
if !self.maybe_recover_logical_log(bootstrap_conn.pager.read().clone())? {
// There was no logical log to recover, so we're done.
return Ok(());
}
// Make sure we capture all the schema changes that were deserialized from the logical log.
bootstrap_conn.promote_to_regular_connection();
bootstrap_conn.reparse_schema()?;
*bootstrap_conn.db.schema.lock().unwrap() = bootstrap_conn.schema.read().clone();
Ok(())
}
/// MVCC does not use the pager/btree cursors to create pages until checkpoint.
/// This method is used to assign root page numbers when Insn::CreateBtree is used.
/// NOTE: during MVCC recovery (not implemented yet), [MvStore::next_table_id] must be
/// initialized to the current highest table id / root page number.
pub fn get_next_table_id(&self) -> u64 {
self.next_table_id.fetch_add(1, Ordering::SeqCst)
/// MVCC table ids are always negative. Their corresponding rootpage entry in sqlite_schema
/// is the same negative value if the table has not been checkpointed yet. Otherwise, the root page
/// will be positive and corresponds to the actual physical page.
pub fn get_next_table_id(&self) -> i64 {
self.next_table_id.fetch_sub(1, Ordering::SeqCst)
}
pub fn get_next_rowid(&self) -> i64 {
@@ -1682,11 +1769,15 @@ impl<Clock: LogicalClock> MvStore<Clock> {
///
/// This is initialization step for a table, where we still don't have any rows so we need to insert them if there are.
fn scan_load_table(&self, table_id: i64, pager: Arc<Pager>) -> Result<()> {
let root_page = table_id;
let entry = self
.table_id_to_rootpage
.get(&table_id)
.expect("Table ID does not have a root page");
let root_page = entry.value().expect("Table ID does not have a root page");
let mut cursor = BTreeCursor::new_table(
None, // No MVCC cursor for scanning
pager.clone(),
root_page,
root_page as i64,
1, // We'll adjust this as needed
);
loop {
@@ -1766,40 +1857,37 @@ impl<Clock: LogicalClock> MvStore<Clock> {
last_rowid
}
pub fn needs_recover(&self) -> bool {
self.storage.needs_recover()
}
pub fn mark_recovered(&self) {
self.storage.mark_recovered();
}
pub fn get_logical_log_file(&self) -> Arc<dyn File> {
self.storage.get_logical_log_file()
}
pub fn recover_logical_log(&self, io: &Arc<dyn crate::IO>, pager: &Arc<Pager>) -> Result<()> {
// Get lock, if we don't get it we will wait until recover finishes in another connection
// and then return.
let _recover_guard = self.recover_lock.write();
if !self.storage.needs_recover() {
// another connection completed recover
return Ok(());
}
// Recovers the logical log if there is any content.
// Returns true if the logical log was recovered, false otherwise.
pub fn maybe_recover_logical_log(&self, pager: Arc<Pager>) -> Result<bool> {
let file = self.get_logical_log_file();
let mut reader = StreamingLogicalLogReader::new(file.clone());
if file.size()? == 0 {
return Ok(false);
}
let c = reader.read_header()?;
io.wait_for_completion(c)?;
pager.io.wait_for_completion(c)?;
let tx_id = 0;
self.begin_load_tx(pager.clone())?;
loop {
match reader.next_record(io).unwrap() {
match reader.next_record(&pager.io).unwrap() {
StreamingResult::InsertRow { row, rowid } => {
tracing::trace!("read {rowid:?}");
tracing::trace!("read {row:?} with rowid {rowid:?}");
if self.table_id_to_rootpage.get(&row.id.table_id).is_none() {
self.insert_table_id_to_rootpage(row.id.table_id, None);
}
self.insert(tx_id, row)?;
}
StreamingResult::DeleteRow { rowid } => {
if self.table_id_to_rootpage.get(&rowid.table_id).is_none() {
self.insert_table_id_to_rootpage(rowid.table_id, None);
}
self.delete(tx_id, rowid)?;
}
StreamingResult::Eof => {
@@ -1808,8 +1896,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
}
}
self.commit_load_tx(tx_id);
self.mark_recovered();
Ok(())
Ok(true)
}
}
@@ -1915,3 +2002,28 @@ fn is_end_visible(
None => true,
}
}
fn stmt_get_all_rows(stmt: &mut Statement) -> Result<Vec<Vec<Value>>> {
let mut rows = Vec::new();
loop {
let step = stmt.step()?;
match step {
StepResult::Row => {
rows.push(stmt.row().unwrap().get_values().cloned().collect());
}
StepResult::Done => {
break;
}
StepResult::IO => {
stmt.run_once()?;
}
StepResult::Interrupt => {
return Err(LimboError::InternalError("interrupted".to_string()))
}
StepResult::Busy => {
return Err(LimboError::Busy);
}
}
}
Ok(rows)
}
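
get_table_id_from_root_page above encodes the invariant that negative "root pages" in sqlite_schema are really placeholder table IDs, while positive ones must reverse-map to a table ID. A minimal sketch of both lookups, assuming a plain HashMap in place of the SkipMap:

use std::collections::HashMap;

fn table_id_from_root_page(map: &HashMap<i64, Option<u64>>, root_page: i64) -> i64 {
    if root_page < 0 {
        // Non-checkpointed table: the rootpage value *is* the (negative) table ID.
        root_page
    } else {
        // Checkpointed table: reverse-lookup which table ID owns this physical root page.
        *map.iter()
            .find(|(_, v)| **v == Some(root_page as u64))
            .map(|(k, _)| k)
            .expect("positive root page is not mapped to a table id")
    }
}

fn main() {
    let mut map = HashMap::new();
    map.insert(-1, Some(1u64)); // sqlite_schema
    map.insert(-7, Some(7));    // existing table bootstrapped as table_id = -root_page
    map.insert(-9, None);       // created in the logical log, not checkpointed yet
    assert_eq!(table_id_from_root_page(&map, 7), -7);
    assert_eq!(table_id_from_root_page(&map, -9), -9);
}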

View File

@@ -15,7 +15,6 @@ use crate::{types::IOResult, File};
pub struct LogicalLog {
pub file: Arc<dyn File>,
offset: u64,
needs_recovery: bool,
}
/// Log's Header, this will be the 64 bytes in any logical log file.
@@ -136,12 +135,7 @@ impl LogRecordType {
impl LogicalLog {
pub fn new(file: Arc<dyn File>) -> Self {
let recover = file.size().unwrap() > 0;
Self {
file,
offset: 0,
needs_recovery: recover,
}
Self { file, offset: 0 }
}
pub fn log_tx(&mut self, tx: &LogRecord) -> Result<IOResult<()>> {
@@ -216,14 +210,6 @@ impl LogicalLog {
self.offset = 0;
Ok(IOResult::IO(IOCompletions::Single(c)))
}
pub fn needs_recover(&self) -> bool {
self.needs_recovery
}
pub fn mark_recovered(&mut self) {
self.needs_recovery = false;
}
}
pub enum StreamingResult {
@@ -333,7 +319,9 @@ impl StreamingLogicalLogReader {
let record_type = self.consume_u8(io)?;
let _payload_size = self.consume_u64(io)?;
let mut bytes_read_on_row = 17; // table_id, record_type and payload_size
match LogRecordType::from_u8(record_type).unwrap() {
match LogRecordType::from_u8(record_type)
.unwrap_or_else(|| panic!("invalid record type: {record_type}"))
{
LogRecordType::DeleteRow => {
let (rowid, n) = self.consume_varint(io)?;
bytes_read_on_row += n;
@@ -498,7 +486,7 @@ mod tests {
let file = io.open_file(log_file, OpenFlags::ReadOnly, false).unwrap();
let mvcc_store = Arc::new(MvStore::new(LocalClock::new(), Storage::new(file.clone())));
mvcc_store.recover_logical_log(&io, &pager).unwrap();
mvcc_store.maybe_recover_logical_log(pager.clone()).unwrap();
let tx = mvcc_store.begin_tx(pager.clone()).unwrap();
let row = mvcc_store.read(tx, RowID::new(1, 1)).unwrap().unwrap();
let record = ImmutableRecord::from_bin_record(row.data.clone());
@@ -540,7 +528,7 @@ mod tests {
let file = io.open_file(log_file, OpenFlags::ReadOnly, false).unwrap();
let mvcc_store = Arc::new(MvStore::new(LocalClock::new(), Storage::new(file.clone())));
mvcc_store.recover_logical_log(&io, &pager).unwrap();
mvcc_store.maybe_recover_logical_log(pager.clone()).unwrap();
for (rowid, value) in &values {
let tx = mvcc_store.begin_tx(pager.clone()).unwrap();
let row = mvcc_store.read(tx, *rowid).unwrap().unwrap();

View File

@@ -36,14 +36,6 @@ impl Storage {
self.logical_log.write().unwrap().truncate()
}
pub fn needs_recover(&self) -> bool {
self.logical_log.read().unwrap().needs_recover()
}
pub fn mark_recovered(&self) {
self.logical_log.write().unwrap().mark_recovered();
}
pub fn get_logical_log_file(&self) -> Arc<dyn File> {
self.logical_log.write().unwrap().file.clone()
}

View File

@@ -4718,7 +4718,8 @@ impl BTreeCursor {
match &self.mv_cursor {
Some(mv_cursor) => match key.maybe_rowid() {
Some(rowid) => {
let row_id = crate::mvcc::database::RowID::new(self.table_id(), rowid);
let row_id =
crate::mvcc::database::RowID::new(mv_cursor.read().table_id, rowid);
let record_buf = key.get_record().unwrap().get_payload().to_vec();
let num_columns = match key {
BTreeKey::IndexKey(record) => record.column_count(),
@@ -5464,10 +5465,6 @@ impl BTreeCursor {
btree_init_page(root_page, page_type, 0, self.pager.usable_space());
}
pub fn table_id(&self) -> i64 {
self.root_page
}
pub fn overwrite_cell(
&mut self,
page: &PageRef,

View File

@@ -56,7 +56,8 @@ pub fn translate_create_table(
};
program.extend(&opts);
if RESERVED_TABLE_PREFIXES
if !connection.is_mvcc_bootstrap_connection()
&& RESERVED_TABLE_PREFIXES
.iter()
.any(|prefix| normalized_tbl_name.starts_with(prefix))
{

View File

@@ -135,9 +135,9 @@ pub fn parse_schema_rows(
syms: &SymbolTable,
mv_tx: Option<(u64, TransactionMode)>,
mut existing_views: HashMap<String, Arc<Mutex<IncrementalView>>>,
mv_store: Option<&Arc<MvStore>>,
) -> Result<()> {
rows.set_mv_tx(mv_tx);
let mv_store = rows.mv_store.clone();
// TODO: if we IO, this unparsed indexes is lost. Will probably need some state between
// IO runs
let mut from_sql_indexes = Vec::with_capacity(10);
@@ -173,7 +173,7 @@ pub fn parse_schema_rows(
&mut dbsp_state_roots,
&mut dbsp_state_index_roots,
&mut materialized_view_info,
mv_store,
mv_store.as_ref(),
)?
}
StepResult::IO => {

View File

@@ -1037,8 +1037,8 @@ pub fn op_open_read(
let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap();
let mv_cursor = if let Some(tx_id) = program.connection.get_mv_tx_id() {
let table_id = *root_page;
let mv_store = mv_store.unwrap().clone();
let table_id = mv_store.get_table_id_from_root_page(*root_page);
let mv_cursor = Arc::new(RwLock::new(
MvCursor::new(mv_store, tx_id, table_id, pager.clone()).unwrap(),
));
@@ -2244,7 +2244,6 @@ pub fn op_transaction_inner(
},
insn
);
tracing::info!("op_transaction: mv_store.is_some()={}", mv_store.is_some());
let pager = program.get_pager_from_database_index(db);
loop {
match state.op_transaction_state {
@@ -6649,8 +6648,8 @@ pub fn op_open_write(
_ => None,
};
let mv_cursor = if let Some(tx_id) = program.connection.get_mv_tx_id() {
let table_id = root_page;
let mv_store = mv_store.unwrap().clone();
let table_id = mv_store.get_table_id_from_root_page(root_page);
let mv_cursor = Arc::new(RwLock::new(
MvCursor::new(mv_store.clone(), tx_id, table_id, pager.clone()).unwrap(),
));
@@ -6739,7 +6738,7 @@ pub fn op_create_btree(
if let Some(mv_store) = mv_store {
let root_page = mv_store.get_next_table_id();
state.registers[*root] = Register::Value(Value::Integer(root_page as i64));
state.registers[*root] = Register::Value(Value::Integer(root_page));
state.pc += 1;
return Ok(InsnFunctionStepResult::Step);
}
@@ -6959,7 +6958,6 @@ pub fn op_parse_schema(
&conn.syms.read(),
program.connection.get_mv_tx(),
existing_views,
mv_store,
)
})
} else {
@@ -6975,7 +6973,6 @@ pub fn op_parse_schema(
&conn.syms.read(),
program.connection.get_mv_tx(),
existing_views,
mv_store,
)
})
};
@@ -7423,8 +7420,8 @@ pub fn op_open_ephemeral(
let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap();
let mv_cursor = if let Some(tx_id) = program.connection.get_mv_tx_id() {
let table_id = root_page;
let mv_store = mv_store.unwrap().clone();
let table_id = mv_store.get_table_id_from_root_page(root_page);
let mv_cursor = Arc::new(RwLock::new(
MvCursor::new(mv_store.clone(), tx_id, table_id, pager.clone()).unwrap(),
));
@@ -7522,8 +7519,8 @@ pub fn op_open_dup(
let pager = &original_cursor.pager;
let mv_cursor = if let Some(tx_id) = program.connection.get_mv_tx_id() {
let table_id = root_page;
let mv_store = mv_store.unwrap().clone();
let table_id = mv_store.get_table_id_from_root_page(root_page);
let mv_cursor = Arc::new(RwLock::new(MvCursor::new(
mv_store,
tx_id,

View File

@@ -571,7 +571,7 @@ fn test_mvcc_recovery_of_both_checkpointed_and_noncheckpointed_tables_works() {
let value = i * 10;
execute_and_log(
&conn,
&format!("INSERT INTO test1 (id, value) VALUES ({}, {})", i, value),
&format!("INSERT INTO test1 (id, value) VALUES ({i}, {value})"),
)
.unwrap();
expected_rows1.push((i, value));
@@ -592,7 +592,7 @@ fn test_mvcc_recovery_of_both_checkpointed_and_noncheckpointed_tables_works() {
let value = i * 20;
execute_and_log(
&conn,
&format!("INSERT INTO test2 (id, value) VALUES ({}, {})", i, value),
&format!("INSERT INTO test2 (id, value) VALUES ({i}, {value})"),
)
.unwrap();
expected_rows2.push((i, value));