mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-07 09:14:26 +01:00
core/mvcc: load table's rowid on initialization
Before performing any read, we need to load a table's row IDs, in order, into the MVCC store, in case the table already contains rows. As expected, this carries a performance penalty for now; ideally, we should scan for row IDs lazily instead.
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
use crate::mvcc::clock::LogicalClock;
|
||||
use crate::mvcc::database::{MvStore, Result, Row, RowID};
|
||||
use crate::Pager;
|
||||
use std::fmt::Debug;
|
||||
use std::rc::Rc;
|
||||
|
||||
@@ -21,13 +22,20 @@ pub struct MvccLazyCursor<Clock: LogicalClock> {
|
||||
}
|
||||
|
||||
impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
|
||||
pub fn new(db: Rc<MvStore<Clock>>, tx_id: u64, table_id: u64) -> Result<MvccLazyCursor<Clock>> {
|
||||
Ok(Self {
|
||||
pub fn new(
|
||||
db: Rc<MvStore<Clock>>,
|
||||
tx_id: u64,
|
||||
table_id: u64,
|
||||
pager: Rc<Pager>,
|
||||
) -> Result<MvccLazyCursor<Clock>> {
|
||||
db.maybe_initialize_table(table_id, pager)?;
|
||||
let cursor = Self {
|
||||
db,
|
||||
tx_id,
|
||||
current_pos: CursorPosition::BeforeFirst,
|
||||
table_id,
|
||||
})
|
||||
};
|
||||
Ok(cursor)
|
||||
}
|
||||
|
||||
/// Insert a row into the table.
|
||||
@@ -40,18 +48,37 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn current_row_id(&self) -> Option<RowID> {
|
||||
pub fn current_row_id(&mut self) -> Option<RowID> {
|
||||
match self.current_pos {
|
||||
CursorPosition::Loaded(id) => Some(id),
|
||||
CursorPosition::BeforeFirst => None,
|
||||
CursorPosition::BeforeFirst => {
|
||||
// If we are before first, we need to try and find the first row.
|
||||
let maybe_rowid = self.db.get_next_row_id_for_table(self.table_id, i64::MIN);
|
||||
if let Some(id) = maybe_rowid {
|
||||
self.current_pos = CursorPosition::Loaded(id);
|
||||
Some(id)
|
||||
} else {
|
||||
self.current_pos = CursorPosition::BeforeFirst;
|
||||
None
|
||||
}
|
||||
}
|
||||
CursorPosition::End => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn current_row(&self) -> Result<Option<Row>> {
|
||||
pub fn current_row(&mut self) -> Result<Option<Row>> {
|
||||
match self.current_pos {
|
||||
CursorPosition::Loaded(id) => self.db.read(self.tx_id, id),
|
||||
CursorPosition::BeforeFirst => Ok(None),
|
||||
CursorPosition::BeforeFirst => {
|
||||
// If we are before first, we need to try and find the first row.
|
||||
let maybe_rowid = self.db.get_next_row_id_for_table(self.table_id, i64::MIN);
|
||||
if let Some(id) = maybe_rowid {
|
||||
self.current_pos = CursorPosition::Loaded(id);
|
||||
self.db.read(self.tx_id, id)
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
CursorPosition::End => Ok(None),
|
||||
}
|
||||
}
|
||||
@@ -65,7 +92,8 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
|
||||
let before_first = matches!(self.current_pos, CursorPosition::BeforeFirst);
|
||||
let min_id = match self.current_pos {
|
||||
CursorPosition::Loaded(id) => id.row_id + 1,
|
||||
CursorPosition::BeforeFirst => i64::MIN, // we need to find first row, so we look from the first id
|
||||
// TODO: do we need to forward twice?
|
||||
CursorPosition::BeforeFirst => i64::MIN, // we need to find first row, so we look from the first id,
|
||||
CursorPosition::End => {
|
||||
// let's keep same state, we reached the end so no point in moving forward.
|
||||
return false;
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
use crate::mvcc::clock::LogicalClock;
|
||||
use crate::mvcc::errors::DatabaseError;
|
||||
use crate::mvcc::persistent_storage::Storage;
|
||||
use crate::storage::btree::BTreeCursor;
|
||||
use crate::storage::btree::BTreeKey;
|
||||
use crate::types::IOResult;
|
||||
use crate::types::ImmutableRecord;
|
||||
use crate::{Connection, Pager};
|
||||
use crossbeam_skiplist::{SkipMap, SkipSet};
|
||||
use parking_lot::RwLock;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::Debug;
|
||||
use std::rc::Rc;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
@@ -240,6 +243,7 @@ pub struct MvStore<Clock: LogicalClock> {
|
||||
next_rowid: AtomicU64,
|
||||
clock: Clock,
|
||||
storage: Storage,
|
||||
loaded_tables: RwLock<HashSet<u64>>,
|
||||
}
|
||||
|
||||
impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
@@ -252,6 +256,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
next_rowid: AtomicU64::new(0), // TODO: determine this from B-Tree
|
||||
clock,
|
||||
storage,
|
||||
loaded_tables: RwLock::new(HashSet::new()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -419,25 +424,6 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
Ok(keys.collect())
|
||||
}
|
||||
|
||||
/// Gets all row ids in the database for a given table.
|
||||
pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
|
||||
tracing::trace!("scan_row_ids_for_table(table_id={})", table_id);
|
||||
let rows: Vec<RowID> = self
|
||||
.rows
|
||||
.range(
|
||||
RowID {
|
||||
table_id,
|
||||
row_id: 0,
|
||||
}..RowID {
|
||||
table_id,
|
||||
row_id: i64::MAX,
|
||||
},
|
||||
)
|
||||
.map(|entry| *entry.key())
|
||||
.collect();
|
||||
Ok(rows)
|
||||
}
|
||||
|
||||
pub fn get_row_id_range(
|
||||
&self,
|
||||
table_id: u64,
|
||||
@@ -667,7 +653,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
)
|
||||
.map_err(|e| DatabaseError::Io(e.to_string()))
|
||||
.unwrap();
|
||||
if let crate::types::IOResult::Done(result) = result {
|
||||
if let crate::types::IOResult::Done(_) = result {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -929,6 +915,94 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Try to scan for row ids in the table.
|
||||
///
|
||||
/// This function loads all row ids of a table if the rowids of table were not populated yet.
|
||||
/// TODO: This is quite expensive so we should try and load rowids in a lazy way.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
pub fn maybe_initialize_table(&self, table_id: u64, pager: Rc<Pager>) -> Result<()> {
|
||||
tracing::trace!("scan_row_ids_for_table(table_id={})", table_id);
|
||||
|
||||
// First, check if the table is already loaded.
|
||||
if self.loaded_tables.read().unwrap().contains(&table_id) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Then, scan the disk B-tree to find existing rows
|
||||
self.scan_load_table(table_id, pager)?;
|
||||
|
||||
self.loaded_tables.write().unwrap().insert(table_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Scans the table and inserts the rows into the database.
|
||||
///
|
||||
/// This is initialization step for a table, where we still don't have any rows so we need to insert them if there are.
|
||||
fn scan_load_table(&self, table_id: u64, pager: Rc<Pager>) -> Result<()> {
|
||||
let root_page = table_id as usize;
|
||||
let mut cursor = BTreeCursor::new_table(
|
||||
None, // No MVCC cursor for scanning
|
||||
pager, root_page, 1, // We'll adjust this as needed
|
||||
);
|
||||
loop {
|
||||
match cursor
|
||||
.rewind()
|
||||
.map_err(|e| DatabaseError::Io(e.to_string()))?
|
||||
{
|
||||
IOResult::Done(()) => {
|
||||
break;
|
||||
}
|
||||
IOResult::IO => unreachable!(), // FIXME: lazy me not wanting to do state machine right now
|
||||
}
|
||||
}
|
||||
Ok(loop {
|
||||
let rowid_result = cursor
|
||||
.rowid()
|
||||
.map_err(|e| DatabaseError::Io(e.to_string()))?;
|
||||
let row_id = match rowid_result {
|
||||
IOResult::Done(Some(row_id)) => row_id,
|
||||
IOResult::Done(None) => break,
|
||||
IOResult::IO => unreachable!(), // FIXME: lazy me not wanting to do state machine right now
|
||||
};
|
||||
match cursor
|
||||
.record()
|
||||
.map_err(|e| DatabaseError::Io(e.to_string()))?
|
||||
{
|
||||
IOResult::Done(Some(record)) => {
|
||||
let id = RowID { table_id, row_id };
|
||||
let column_count = record.column_count();
|
||||
// We insert row with 0 timestamp, because it's the only version we have on initialization.
|
||||
self.insert_version(
|
||||
id,
|
||||
RowVersion {
|
||||
begin: TxTimestampOrID::Timestamp(0),
|
||||
end: None,
|
||||
row: Row::new(id, record.get_payload().to_vec(), column_count),
|
||||
},
|
||||
);
|
||||
}
|
||||
IOResult::Done(None) => break,
|
||||
IOResult::IO => unreachable!(), // FIXME: lazy me not wanting to do state machine right now
|
||||
}
|
||||
|
||||
// Move to next record
|
||||
match cursor
|
||||
.next()
|
||||
.map_err(|e| DatabaseError::Io(e.to_string()))?
|
||||
{
|
||||
IOResult::Done(has_next) => {
|
||||
if !has_next {
|
||||
break;
|
||||
}
|
||||
}
|
||||
IOResult::IO => unreachable!(), // FIXME: lazy me not wanting to do state machine right now
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// A write-write conflict happens when transaction T_current attempts to update a
|
||||
|
||||
@@ -4278,7 +4278,7 @@ impl BTreeCursor {
|
||||
pub fn rowid(&mut self) -> Result<IOResult<Option<i64>>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
if self.has_record.get() {
|
||||
let mv_cursor = mv_cursor.borrow();
|
||||
let mut mv_cursor = mv_cursor.borrow_mut();
|
||||
return Ok(IOResult::Done(
|
||||
mv_cursor.current_row_id().map(|rowid| rowid.row_id),
|
||||
));
|
||||
@@ -4350,7 +4350,7 @@ impl BTreeCursor {
|
||||
return Ok(IOResult::Done(Some(record_ref)));
|
||||
}
|
||||
if self.mv_cursor.is_some() {
|
||||
let mv_cursor = self.mv_cursor.as_ref().unwrap().borrow();
|
||||
let mut mv_cursor = self.mv_cursor.as_ref().unwrap().borrow_mut();
|
||||
let row = mv_cursor.current_row().unwrap().unwrap();
|
||||
self.get_immutable_record_or_create()
|
||||
.as_mut()
|
||||
|
||||
@@ -919,7 +919,7 @@ pub fn op_open_read(
|
||||
let table_id = *root_page as u64;
|
||||
let mv_store = mv_store.unwrap().clone();
|
||||
let mv_cursor = Rc::new(RefCell::new(
|
||||
MvCursor::new(mv_store.clone(), tx_id, table_id).unwrap(),
|
||||
MvCursor::new(mv_store, tx_id, table_id, pager.clone()).unwrap(),
|
||||
));
|
||||
Some(mv_cursor)
|
||||
}
|
||||
@@ -5829,7 +5829,7 @@ pub fn op_open_write(
|
||||
let table_id = root_page;
|
||||
let mv_store = mv_store.unwrap().clone();
|
||||
let mv_cursor = Rc::new(RefCell::new(
|
||||
MvCursor::new(mv_store.clone(), tx_id, table_id).unwrap(),
|
||||
MvCursor::new(mv_store.clone(), tx_id, table_id, pager.clone()).unwrap(),
|
||||
));
|
||||
Some(mv_cursor)
|
||||
}
|
||||
@@ -6439,7 +6439,7 @@ pub fn op_open_ephemeral(
|
||||
let table_id = root_page as u64;
|
||||
let mv_store = mv_store.unwrap().clone();
|
||||
let mv_cursor = Rc::new(RefCell::new(
|
||||
MvCursor::new(mv_store.clone(), tx_id, table_id).unwrap(),
|
||||
MvCursor::new(mv_store.clone(), tx_id, table_id, pager.clone()).unwrap(),
|
||||
));
|
||||
Some(mv_cursor)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user