diff --git a/core/mvcc/cursor.rs b/core/mvcc/cursor.rs
index db0d621a7..dea78b999 100644
--- a/core/mvcc/cursor.rs
+++ b/core/mvcc/cursor.rs
@@ -1,5 +1,6 @@
 use crate::mvcc::clock::LogicalClock;
 use crate::mvcc::database::{MvStore, Result, Row, RowID};
+use crate::Pager;
 use std::fmt::Debug;
 use std::rc::Rc;
 
@@ -21,13 +22,24 @@ pub struct MvccLazyCursor<Clock: LogicalClock> {
 }
 
 impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
-    pub fn new(db: Rc<MvStore<Clock>>, tx_id: u64, table_id: u64) -> Result<MvccLazyCursor<Clock>> {
+    /// Creates a cursor for `table_id` in transaction `tx_id`, positioned before the first row.
+    ///
+    /// The table is first initialized through `MvStore::maybe_initialize_table`, which loads
+    /// its on-disk rows into `db` unless that already happened; a failure of that load is
+    /// returned as the error.
+    pub fn new(
+        db: Rc<MvStore<Clock>>,
+        tx_id: u64,
+        table_id: u64,
+        pager: Rc<Pager>,
+    ) -> Result<MvccLazyCursor<Clock>> {
+        db.maybe_initialize_table(table_id, pager)?;
         Ok(Self {
             db,
             tx_id,
             current_pos: CursorPosition::BeforeFirst,
             table_id,
         })
     }
 
     /// Insert a row into the table.
@@ -40,18 +52,34 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
         Ok(())
     }
 
-    pub fn current_row_id(&self) -> Option<RowID> {
+    /// Returns the id of the row the cursor is positioned on, if any.
+    ///
+    /// A cursor that is still before the first row is lazily moved onto the first row of the
+    /// table (when there is one), which is why this takes `&mut self`.
+    pub fn current_row_id(&mut self) -> Option<RowID> {
         match self.current_pos {
             CursorPosition::Loaded(id) => Some(id),
-            CursorPosition::BeforeFirst => None,
+            CursorPosition::BeforeFirst => {
+                // Lazily seek to the first row; an empty table leaves the cursor before-first.
+                let first = self.db.get_next_row_id_for_table(self.table_id, i64::MIN)?;
+                self.current_pos = CursorPosition::Loaded(first);
+                Some(first)
+            }
             CursorPosition::End => None,
         }
     }
 
-    pub fn current_row(&self) -> Result<Option<Row>> {
+    /// Returns the row the cursor is positioned on, as read by this cursor's transaction.
+    ///
+    /// Like `current_row_id`, this lazily moves a before-first cursor onto the first row.
+    pub fn current_row(&mut self) -> Result<Option<Row>> {
         match self.current_pos {
             CursorPosition::Loaded(id) => self.db.read(self.tx_id, id),
-            CursorPosition::BeforeFirst => Ok(None),
+            CursorPosition::BeforeFirst => match self.current_row_id() {
+                // `current_row_id` seeks to the first row, if the table has one.
+                Some(id) => self.db.read(self.tx_id, id),
+                None => Ok(None),
+            },
             CursorPosition::End => Ok(None),
         }
     }
@@ -65,7 +93,8 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
         let before_first = matches!(self.current_pos, CursorPosition::BeforeFirst);
         let min_id = match self.current_pos {
             CursorPosition::Loaded(id) => id.row_id + 1,
+            // TODO: do we need to forward twice?
             CursorPosition::BeforeFirst => i64::MIN, // we need to find first row, so we look from the first id
             CursorPosition::End => {
                 // let's keep same state, we reached the end so no point in moving forward.
                 return false;
diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs
index 83265163b..13aab7d0c 100644
--- a/core/mvcc/database/mod.rs
+++ b/core/mvcc/database/mod.rs
@@ -1,11 +1,14 @@
 use crate::mvcc::clock::LogicalClock;
 use crate::mvcc::errors::DatabaseError;
 use crate::mvcc::persistent_storage::Storage;
+use crate::storage::btree::BTreeCursor;
 use crate::storage::btree::BTreeKey;
+use crate::types::IOResult;
 use crate::types::ImmutableRecord;
 use crate::{Connection, Pager};
 use crossbeam_skiplist::{SkipMap, SkipSet};
 use parking_lot::RwLock;
+use std::collections::HashSet;
 use std::fmt::Debug;
 use std::rc::Rc;
 use std::sync::atomic::{AtomicU64, Ordering};
@@ -240,6 +243,8 @@ pub struct MvStore<Clock: LogicalClock> {
     next_rowid: AtomicU64,
     clock: Clock,
     storage: Storage,
+    /// Ids of the tables that `maybe_initialize_table` has already loaded from disk.
+    loaded_tables: RwLock<HashSet<u64>>,
 }
 
 impl<Clock: LogicalClock> MvStore<Clock> {
@@ -252,6 +257,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
             next_rowid: AtomicU64::new(0), // TODO: determine this from B-Tree
             clock,
             storage,
+            loaded_tables: RwLock::new(HashSet::new()),
         }
     }
 
@@ -419,25 +425,6 @@ impl<Clock: LogicalClock> MvStore<Clock> {
         Ok(keys.collect())
     }
 
-    /// Gets all row ids in the database for a given table.
-    pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
-        tracing::trace!("scan_row_ids_for_table(table_id={})", table_id);
-        let rows: Vec<RowID> = self
-            .rows
-            .range(
-                RowID {
-                    table_id,
-                    row_id: 0,
-                }..RowID {
-                    table_id,
-                    row_id: i64::MAX,
-                },
-            )
-            .map(|entry| *entry.key())
-            .collect();
-        Ok(rows)
-    }
-
     pub fn get_row_id_range(
         &self,
         table_id: u64,
@@ -667,7 +654,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
                     )
                     .map_err(|e| DatabaseError::Io(e.to_string()))
                     .unwrap();
-                if let crate::types::IOResult::Done(result) = result {
+                if let IOResult::Done(_) = result {
                     break;
                 }
             }
@@ -929,6 +916,101 @@ impl<Clock: LogicalClock> MvStore<Clock> {
         );
         Ok(())
     }
+
+    /// Loads the rows of table `table_id` from its on-disk B-tree, at most once per table.
+    ///
+    /// If the table was already loaded this returns immediately; otherwise every row found on
+    /// disk is inserted into the store and the table is remembered as loaded.
+    /// TODO: This is quite expensive so we should try and load rowids in a lazy way.
+    ///
+    /// # Arguments
+    ///
+    /// * `table_id` - Id of the table to load; it is also used as the B-tree root page.
+    /// * `pager` - Pager handed to the B-tree cursor that performs the scan.
+    ///
+    /// # Errors
+    ///
+    /// Returns `DatabaseError::Io` when the B-tree cursor reports an error.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the B-tree cursor reports pending I/O, which the scan does not handle yet.
+    pub fn maybe_initialize_table(&self, table_id: u64, pager: Rc<Pager>) -> Result<()> {
+        tracing::trace!("maybe_initialize_table(table_id={})", table_id);
+
+        // First, check if the table is already loaded.
+        if self.loaded_tables.read().contains(&table_id) {
+            return Ok(());
+        }
+
+        // Then, scan the disk B-tree to find existing rows. The table is only marked as loaded
+        // once the whole scan has succeeded.
+        self.scan_load_table(table_id, pager)?;
+        self.loaded_tables.write().insert(table_id);
+
+        Ok(())
+    }
+
+    /// Scans the table's B-tree and inserts every record found into the store.
+    ///
+    /// This is the initialization step for a table: the store does not hold any of its rows
+    /// yet, so each row found on disk is inserted as a version with begin timestamp 0.
+    fn scan_load_table(&self, table_id: u64, pager: Rc<Pager>) -> Result<()> {
+        // The table id doubles as the root page number of the table's B-tree.
+        let root_page = table_id as usize;
+        let mut cursor = BTreeCursor::new_table(
+            None, // No MVCC cursor for scanning
+            pager, root_page, 1, // We'll adjust this as needed
+        );
+        match cursor
+            .rewind()
+            .map_err(|e| DatabaseError::Io(e.to_string()))?
+        {
+            IOResult::Done(()) => {}
+            IOResult::IO => unreachable!(), // FIXME: lazy me not wanting to do state machine right now
+        }
+        loop {
+            let row_id = match cursor
+                .rowid()
+                .map_err(|e| DatabaseError::Io(e.to_string()))?
+            {
+                IOResult::Done(Some(row_id)) => row_id,
+                IOResult::Done(None) => break,
+                IOResult::IO => unreachable!(), // FIXME: lazy me not wanting to do state machine right now
+            };
+            match cursor
+                .record()
+                .map_err(|e| DatabaseError::Io(e.to_string()))?
+            {
+                IOResult::Done(Some(record)) => {
+                    let id = RowID { table_id, row_id };
+                    let column_count = record.column_count();
+                    // We insert row with 0 timestamp, because it's the only version we have on initialization.
+                    self.insert_version(
+                        id,
+                        RowVersion {
+                            begin: TxTimestampOrID::Timestamp(0),
+                            end: None,
+                            row: Row::new(id, record.get_payload().to_vec(), column_count),
+                        },
+                    );
+                }
+                IOResult::Done(None) => break,
+                IOResult::IO => unreachable!(), // FIXME: lazy me not wanting to do state machine right now
+            }
+
+            // Move to next record
+            match cursor
+                .next()
+                .map_err(|e| DatabaseError::Io(e.to_string()))?
+            {
+                IOResult::Done(true) => {}
+                IOResult::Done(false) => break,
+                IOResult::IO => unreachable!(), // FIXME: lazy me not wanting to do state machine right now
+            }
+        }
+        Ok(())
+    }
 }
 
 /// A write-write conflict happens when transaction T_current attempts to update a
diff --git a/core/storage/btree.rs b/core/storage/btree.rs
index db64e578c..ea101bd99 100644
--- a/core/storage/btree.rs
+++ b/core/storage/btree.rs
@@ -4278,7 +4278,7 @@ impl BTreeCursor {
     pub fn rowid(&mut self) -> Result<IOResult<Option<i64>>> {
         if let Some(mv_cursor) = &self.mv_cursor {
             if self.has_record.get() {
-                let mv_cursor = mv_cursor.borrow();
+                let mut mv_cursor = mv_cursor.borrow_mut();
                 return Ok(IOResult::Done(
                     mv_cursor.current_row_id().map(|rowid| rowid.row_id),
                 ));
@@ -4350,7 +4350,7 @@ impl BTreeCursor {
             return Ok(IOResult::Done(Some(record_ref)));
         }
         if self.mv_cursor.is_some() {
-            let mv_cursor = self.mv_cursor.as_ref().unwrap().borrow();
+            let mut mv_cursor = self.mv_cursor.as_ref().unwrap().borrow_mut();
             let row = mv_cursor.current_row().unwrap().unwrap();
             self.get_immutable_record_or_create()
                 .as_mut()
diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs
index f60c12ff8..65ee84871 100644
--- a/core/vdbe/execute.rs
+++ b/core/vdbe/execute.rs
@@ -919,7 +919,7 @@ pub fn op_open_read(
             let table_id = *root_page as u64;
             let mv_store = mv_store.unwrap().clone();
             let mv_cursor = Rc::new(RefCell::new(
-                MvCursor::new(mv_store.clone(), tx_id, table_id).unwrap(),
+                MvCursor::new(mv_store, tx_id, table_id, pager.clone()).unwrap(),
             ));
             Some(mv_cursor)
         }
@@ -5829,7 +5829,7 @@ pub fn op_open_write(
             let table_id = root_page;
             let mv_store = mv_store.unwrap().clone();
             let mv_cursor = Rc::new(RefCell::new(
-                MvCursor::new(mv_store.clone(), tx_id, table_id).unwrap(),
+                MvCursor::new(mv_store, tx_id, table_id, pager.clone()).unwrap(),
             ));
             Some(mv_cursor)
         }
@@ -6439,7 +6439,7 @@ pub fn op_open_ephemeral(
             let table_id = root_page as u64;
             let mv_store = mv_store.unwrap().clone();
             let mv_cursor = Rc::new(RefCell::new(
-                MvCursor::new(mv_store.clone(), tx_id, table_id).unwrap(),
+                MvCursor::new(mv_store, tx_id, table_id, pager.clone()).unwrap(),
             ));
             Some(mv_cursor)
         }