mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-15 14:14:20 +01:00
core/mvcc: remove intialize of mvcc table
This commit is contained in:
@@ -929,7 +929,6 @@ pub struct MvStore<Clock: LogicalClock> {
|
||||
next_table_id: AtomicI64,
|
||||
clock: Clock,
|
||||
storage: Storage,
|
||||
loaded_tables: RwLock<HashSet<MVTableId>>,
|
||||
|
||||
/// The transaction ID of a transaction that has acquired an exclusive write lock, if any.
|
||||
///
|
||||
@@ -973,7 +972,6 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
next_table_id: AtomicI64::new(-2), // table id -1 / root page 1 is always sqlite_schema.
|
||||
clock,
|
||||
storage,
|
||||
loaded_tables: RwLock::new(HashSet::new()),
|
||||
exclusive_tx: AtomicU64::new(NO_EXCLUSIVE_TX),
|
||||
commit_coordinator: Arc::new(CommitCoordinator {
|
||||
pager_commit_lock: Arc::new(TursoRwLock::new()),
|
||||
@@ -1834,125 +1832,6 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
Ok(state_machine)
|
||||
}
|
||||
|
||||
/// Try to scan for row ids in the table.
|
||||
///
|
||||
/// This function loads all row ids of a table if the rowids of table were not populated yet.
|
||||
/// TODO: This is quite expensive so we should try and load rowids in a lazy way.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
pub fn maybe_initialize_table(&self, table_id: MVTableId, pager: Arc<Pager>) -> Result<()> {
|
||||
tracing::trace!("scan_row_ids_for_table(table_id={})", table_id);
|
||||
|
||||
// First, check if the table is already loaded.
|
||||
if self.loaded_tables.read().contains(&table_id) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if !self
|
||||
.table_id_to_rootpage
|
||||
.get(&table_id)
|
||||
.is_some_and(|entry| entry.value().is_some())
|
||||
{
|
||||
// Not a checkpointed table; doesn't need to be initialized
|
||||
self.mark_table_as_loaded(table_id);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Then, scan the disk B-tree to find existing rows
|
||||
self.scan_load_table(table_id, pager)?;
|
||||
|
||||
self.mark_table_as_loaded(table_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Mark table as loaded
|
||||
pub fn mark_table_as_loaded(&self, table_id: MVTableId) {
|
||||
self.loaded_tables.write().insert(table_id);
|
||||
}
|
||||
|
||||
/// Scans the table and inserts the rows into the database.
|
||||
///
|
||||
/// This is initialization step for a table, where we still don't have any rows so we need to insert them if there are.
|
||||
fn scan_load_table(&self, table_id: MVTableId, pager: Arc<Pager>) -> Result<()> {
|
||||
let entry = self
|
||||
.table_id_to_rootpage
|
||||
.get(&table_id)
|
||||
.unwrap_or_else(|| panic!("Table ID does not have a root page: {table_id}"));
|
||||
let root_page = entry
|
||||
.value()
|
||||
.unwrap_or_else(|| panic!("Table ID does not have a root page: {table_id}"));
|
||||
let mut cursor = BTreeCursor::new_table(
|
||||
pager.clone(),
|
||||
root_page as i64,
|
||||
1, // We'll adjust this as needed
|
||||
);
|
||||
loop {
|
||||
match cursor
|
||||
.rewind()
|
||||
.map_err(|e| LimboError::InternalError(e.to_string()))?
|
||||
{
|
||||
IOResult::Done(()) => break,
|
||||
IOResult::IO(io) => {
|
||||
io.wait(pager.io.as_ref())?;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
loop {
|
||||
let rowid_result = cursor
|
||||
.rowid()
|
||||
.map_err(|e| LimboError::InternalError(e.to_string()))?;
|
||||
let row_id = match rowid_result {
|
||||
IOResult::Done(Some(row_id)) => row_id,
|
||||
IOResult::Done(None) => break,
|
||||
IOResult::IO(io) => {
|
||||
io.wait(pager.io.as_ref())?;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
'record: loop {
|
||||
match cursor.record()? {
|
||||
IOResult::Done(Some(record)) => {
|
||||
let id = RowID { table_id, row_id };
|
||||
let column_count = record.column_count();
|
||||
// We insert row with 0 timestamp, because it's the only version we have on initialization.
|
||||
self.insert_version(
|
||||
id,
|
||||
RowVersion {
|
||||
begin: Some(TxTimestampOrID::Timestamp(0)),
|
||||
end: None,
|
||||
row: Row::new(id, record.get_payload().to_vec(), column_count),
|
||||
},
|
||||
);
|
||||
break 'record;
|
||||
}
|
||||
IOResult::Done(None) => break,
|
||||
IOResult::IO(io) => {
|
||||
io.wait(pager.io.as_ref())?;
|
||||
} // FIXME: lazy me not wanting to do state machine right now
|
||||
}
|
||||
}
|
||||
|
||||
// Move to next record
|
||||
'next: loop {
|
||||
match cursor.next()? {
|
||||
IOResult::Done(has_next) => {
|
||||
if !has_next {
|
||||
break;
|
||||
}
|
||||
break 'next;
|
||||
}
|
||||
IOResult::IO(io) => {
|
||||
io.wait(pager.io.as_ref())?;
|
||||
} // FIXME: lazy me not wanting to do state machine right now
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_last_rowid(&self, table_id: MVTableId) -> Option<i64> {
|
||||
let last_rowid = self
|
||||
.rows
|
||||
|
||||
Reference in New Issue
Block a user