mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-15 04:54:20 +01:00
core: Integrate MVCC to B-Tree cursor
This commit is contained in:
@@ -85,6 +85,8 @@ enum TransactionState {
|
||||
|
||||
pub(crate) type MvStore = crate::mvcc::MvStore<crate::mvcc::LocalClock>;
|
||||
|
||||
pub(crate) type MvCursor = crate::mvcc::cursor::ScanCursor<crate::mvcc::LocalClock>;
|
||||
|
||||
pub struct Database {
|
||||
mv_store: Option<Rc<MvStore>>,
|
||||
schema: Arc<RwLock<Schema>>,
|
||||
|
||||
@@ -4,6 +4,7 @@ use crate::storage::pager::Pager;
|
||||
use crate::storage::sqlite3_ondisk::{
|
||||
read_varint, BTreeCell, PageContent, PageType, TableInteriorCell, TableLeafCell,
|
||||
};
|
||||
use crate::MvCursor;
|
||||
|
||||
use crate::types::{CursorResult, OwnedValue, Record, SeekKey, SeekOp};
|
||||
use crate::{return_corrupt, LimboError, Result};
|
||||
@@ -135,6 +136,9 @@ impl CursorState {
|
||||
}
|
||||
|
||||
pub struct BTreeCursor {
|
||||
/// Optional multi-version (MVCC) cursor; when present, row reads and inserts go through it instead of the pager-backed B-tree pages.
|
||||
mv_cursor: Option<Rc<RefCell<MvCursor>>>,
|
||||
/// The pager that is used to read and write to the database file.
|
||||
pager: Rc<Pager>,
|
||||
/// Page id of the root page used to go back up fast.
|
||||
root_page: usize,
|
||||
@@ -178,8 +182,13 @@ struct CellArray {
|
||||
}
|
||||
|
||||
impl BTreeCursor {
|
||||
pub fn new(pager: Rc<Pager>, root_page: usize) -> Self {
|
||||
pub fn new(
|
||||
mv_cursor: Option<Rc<RefCell<MvCursor>>>,
|
||||
pager: Rc<Pager>,
|
||||
root_page: usize,
|
||||
) -> Self {
|
||||
Self {
|
||||
mv_cursor,
|
||||
pager,
|
||||
root_page,
|
||||
rowid: Cell::new(None),
|
||||
@@ -198,6 +207,10 @@ impl BTreeCursor {
|
||||
/// Check if the table is empty.
|
||||
/// This is done by checking if the root page has no cells.
|
||||
fn is_empty_table(&self) -> Result<CursorResult<bool>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mv_cursor = mv_cursor.borrow();
|
||||
return Ok(CursorResult::Ok(mv_cursor.is_empty()));
|
||||
}
|
||||
let page = self.pager.read_page(self.root_page)?;
|
||||
return_if_locked!(page);
|
||||
|
||||
@@ -290,6 +303,19 @@ impl BTreeCursor {
|
||||
&mut self,
|
||||
predicate: Option<(SeekKey<'_>, SeekOp)>,
|
||||
) -> Result<CursorResult<(Option<u64>, Option<Record>)>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mut mv_cursor = mv_cursor.borrow_mut();
|
||||
let rowid = mv_cursor.current_row_id();
|
||||
match rowid {
|
||||
Some(rowid) => {
|
||||
let record = mv_cursor.current_row().unwrap().unwrap();
|
||||
let record: Record = crate::storage::sqlite3_ondisk::read_record(&record.data)?;
|
||||
mv_cursor.forward();
|
||||
return Ok(CursorResult::Ok((Some(rowid.row_id), Some(record))));
|
||||
}
|
||||
None => return Ok(CursorResult::Ok((None, None))),
|
||||
}
|
||||
}
|
||||
loop {
|
||||
let mem_page_rc = self.stack.top();
|
||||
let cell_idx = self.stack.current_cell_index() as usize;
|
||||
@@ -592,6 +618,7 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
pub fn move_to(&mut self, key: SeekKey<'_>, cmp: SeekOp) -> Result<CursorResult<()>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
// For a table with N rows, we can find any row by row id in O(log(N)) time by starting at the root page and following the B-tree pointers.
|
||||
// B-trees consist of interior pages and leaf pages. Interior pages contain pointers to other pages, while leaf pages contain the actual row data.
|
||||
//
|
||||
@@ -1592,15 +1619,22 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
pub fn rewind(&mut self) -> Result<CursorResult<()>> {
|
||||
self.move_to_root();
|
||||
if let Some(_) = &self.mv_cursor {
|
||||
let (rowid, record) = return_if_io!(self.get_next_record(None));
|
||||
self.rowid.replace(rowid);
|
||||
self.record.replace(record);
|
||||
} else {
|
||||
self.move_to_root();
|
||||
|
||||
let (rowid, record) = return_if_io!(self.get_next_record(None));
|
||||
self.rowid.replace(rowid);
|
||||
self.record.replace(record);
|
||||
let (rowid, record) = return_if_io!(self.get_next_record(None));
|
||||
self.rowid.replace(rowid);
|
||||
self.record.replace(record);
|
||||
}
|
||||
Ok(CursorResult::Ok(()))
|
||||
}
|
||||
|
||||
pub fn last(&mut self) -> Result<CursorResult<()>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
match self.move_to_rightmost()? {
|
||||
CursorResult::Ok(_) => self.prev(),
|
||||
CursorResult::IO => Ok(CursorResult::IO),
|
||||
@@ -1615,6 +1649,7 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
pub fn prev(&mut self) -> Result<CursorResult<()>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
match self.get_prev_record()? {
|
||||
CursorResult::Ok((rowid, record)) => {
|
||||
self.rowid.replace(rowid);
|
||||
@@ -1631,10 +1666,15 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
/// Returns the row id the cursor is currently positioned on, if any.
///
/// When an MVCC cursor is attached, the id is read from that cursor's
/// current row id (its `row_id` component); otherwise the value cached in
/// `self.rowid` is returned. This function itself never produces an `Err`.
///
/// NOTE(review): `RefCell::borrow` panics if the MVCC cursor is mutably
/// borrowed at the time of the call — confirm callers never hold a
/// `borrow_mut` across this.
pub fn rowid(&self) -> Result<Option<u64>> {
|
||||
// MVCC mode: the multi-version cursor is the source of truth for position.
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mv_cursor = mv_cursor.borrow();
|
||||
return Ok(mv_cursor.current_row_id().map(|rowid| rowid.row_id));
|
||||
}
|
||||
// Non-MVCC mode: report the row id cached on the cursor.
Ok(self.rowid.get())
|
||||
}
|
||||
|
||||
pub fn seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result<CursorResult<bool>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
let (rowid, record) = return_if_io!(self.do_seek(key, op));
|
||||
self.rowid.replace(rowid);
|
||||
self.record.replace(record);
|
||||
@@ -1648,23 +1688,35 @@ impl BTreeCursor {
|
||||
pub fn insert(
|
||||
&mut self,
|
||||
key: &OwnedValue,
|
||||
_record: &Record,
|
||||
record: &Record,
|
||||
moved_before: bool, /* Indicate whether it's necessary to traverse to find the leaf page */
|
||||
) -> Result<CursorResult<()>> {
|
||||
let int_key = match key {
|
||||
OwnedValue::Integer(i) => i,
|
||||
_ => unreachable!("btree tables are indexed by integers!"),
|
||||
};
|
||||
if !moved_before {
|
||||
return_if_io!(self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ));
|
||||
}
|
||||
|
||||
return_if_io!(self.insert_into_page(key, _record));
|
||||
self.rowid.replace(Some(*int_key as u64));
|
||||
match &self.mv_cursor {
|
||||
Some(mv_cursor) => {
|
||||
let row_id =
|
||||
crate::mvcc::database::RowID::new(self.table_id() as u64, *int_key as u64);
|
||||
let mut record_buf = Vec::new();
|
||||
record.serialize(&mut record_buf);
|
||||
let row = crate::mvcc::database::Row::new(row_id, record_buf);
|
||||
mv_cursor.borrow_mut().insert(row).unwrap();
|
||||
}
|
||||
None => {
|
||||
if !moved_before {
|
||||
return_if_io!(self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ));
|
||||
}
|
||||
return_if_io!(self.insert_into_page(key, record));
|
||||
self.rowid.replace(Some(*int_key as u64));
|
||||
}
|
||||
};
|
||||
Ok(CursorResult::Ok(()))
|
||||
}
|
||||
|
||||
pub fn delete(&mut self) -> Result<CursorResult<()>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
let page = self.stack.top();
|
||||
return_if_locked!(page);
|
||||
|
||||
@@ -1808,6 +1860,7 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
pub fn exists(&mut self, key: &OwnedValue) -> Result<CursorResult<bool>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
let int_key = match key {
|
||||
OwnedValue::Integer(i) => i,
|
||||
_ => unreachable!("btree tables are indexed by integers!"),
|
||||
@@ -1927,6 +1980,10 @@ impl BTreeCursor {
|
||||
|
||||
Ok(Some(n_overflow))
|
||||
}
|
||||
|
||||
/// Returns the identifier of the table this cursor operates on.
///
/// The identifier is simply the cursor's root page number; `insert` uses it
/// as the table component when building an MVCC `RowID`.
pub fn table_id(&self) -> usize {
|
||||
self.root_page
|
||||
}
|
||||
}
|
||||
|
||||
impl PageStack {
|
||||
@@ -2905,7 +2962,7 @@ mod tests {
|
||||
}
|
||||
|
||||
fn validate_btree(pager: Rc<Pager>, page_idx: usize) -> (usize, bool) {
|
||||
let cursor = BTreeCursor::new(pager.clone(), page_idx);
|
||||
let cursor = BTreeCursor::new(None, pager.clone(), page_idx);
|
||||
let page = pager.read_page(page_idx).unwrap();
|
||||
let page = page.get();
|
||||
let contents = page.contents.as_ref().unwrap();
|
||||
@@ -2969,7 +3026,7 @@ mod tests {
|
||||
}
|
||||
|
||||
fn format_btree(pager: Rc<Pager>, page_idx: usize, depth: usize) -> String {
|
||||
let cursor = BTreeCursor::new(pager.clone(), page_idx);
|
||||
let cursor = BTreeCursor::new(None, pager.clone(), page_idx);
|
||||
let page = pager.read_page(page_idx).unwrap();
|
||||
let page = page.get();
|
||||
let contents = page.contents.as_ref().unwrap();
|
||||
@@ -3100,7 +3157,7 @@ mod tests {
|
||||
.as_slice(),
|
||||
] {
|
||||
let (pager, root_page) = empty_btree();
|
||||
let mut cursor = BTreeCursor::new(pager.clone(), root_page);
|
||||
let mut cursor = BTreeCursor::new(None, pager.clone(), root_page);
|
||||
for (key, size) in sequence.iter() {
|
||||
run_until_done(
|
||||
|| {
|
||||
@@ -3151,7 +3208,7 @@ mod tests {
|
||||
tracing::info!("super seed: {}", seed);
|
||||
for _ in 0..attempts {
|
||||
let (pager, root_page) = empty_btree();
|
||||
let mut cursor = BTreeCursor::new(pager.clone(), root_page);
|
||||
let mut cursor = BTreeCursor::new(None, pager.clone(), root_page);
|
||||
let mut keys = Vec::new();
|
||||
let seed = rng.next_u64();
|
||||
tracing::info!("seed: {}", seed);
|
||||
@@ -3332,7 +3389,7 @@ mod tests {
|
||||
#[ignore]
|
||||
pub fn test_clear_overflow_pages() -> Result<()> {
|
||||
let (pager, db_header) = setup_test_env(5);
|
||||
let cursor = BTreeCursor::new(pager.clone(), 1);
|
||||
let cursor = BTreeCursor::new(None, pager.clone(), 1);
|
||||
|
||||
let max_local = payload_overflow_threshold_max(PageType::TableLeaf, 4096);
|
||||
let usable_size = cursor.usable_space();
|
||||
@@ -3430,7 +3487,7 @@ mod tests {
|
||||
#[test]
|
||||
pub fn test_clear_overflow_pages_no_overflow() -> Result<()> {
|
||||
let (pager, db_header) = setup_test_env(5);
|
||||
let cursor = BTreeCursor::new(pager.clone(), 1);
|
||||
let cursor = BTreeCursor::new(None, pager.clone(), 1);
|
||||
|
||||
let small_payload = vec![b'A'; 10];
|
||||
|
||||
@@ -3869,7 +3926,7 @@ mod tests {
|
||||
let (pager, root_page) = empty_btree();
|
||||
let mut keys = Vec::new();
|
||||
for i in 0..10000 {
|
||||
let mut cursor = BTreeCursor::new(pager.clone(), root_page);
|
||||
let mut cursor = BTreeCursor::new(None, pager.clone(), root_page);
|
||||
tracing::info!("INSERT INTO t VALUES ({});", i,);
|
||||
let key = OwnedValue::Integer(i);
|
||||
let value = Record::new(vec![OwnedValue::Integer(i)]);
|
||||
@@ -3893,7 +3950,7 @@ mod tests {
|
||||
format_btree(pager.clone(), root_page, 0)
|
||||
);
|
||||
for key in keys.iter() {
|
||||
let mut cursor = BTreeCursor::new(pager.clone(), root_page);
|
||||
let mut cursor = BTreeCursor::new(None, pager.clone(), root_page);
|
||||
let key = OwnedValue::Integer(*key);
|
||||
let exists = run_until_done(|| cursor.exists(&key), pager.deref()).unwrap();
|
||||
assert!(exists, "key not found {}", key);
|
||||
|
||||
@@ -55,7 +55,9 @@ use crate::{
|
||||
json::json_quote, json::json_remove, json::json_set, json::json_type,
|
||||
};
|
||||
use crate::{info, CheckpointStatus};
|
||||
use crate::{resolve_ext_path, Connection, MvStore, Result, TransactionState, DATABASE_VERSION};
|
||||
use crate::{
|
||||
resolve_ext_path, Connection, MvCursor, MvStore, Result, TransactionState, DATABASE_VERSION,
|
||||
};
|
||||
use insn::{
|
||||
exec_add, exec_and, exec_bit_and, exec_bit_not, exec_bit_or, exec_boolean_not, exec_concat,
|
||||
exec_divide, exec_multiply, exec_or, exec_remainder, exec_shift_left, exec_shift_right,
|
||||
@@ -762,7 +764,18 @@ impl Program {
|
||||
root_page,
|
||||
} => {
|
||||
let (_, cursor_type) = self.cursor_ref.get(*cursor_id).unwrap();
|
||||
let cursor = BTreeCursor::new(pager.clone(), *root_page);
|
||||
let mv_cursor = match state.mv_tx_id {
|
||||
Some(tx_id) => {
|
||||
let table_id = *root_page as u64;
|
||||
let mv_store = mv_store.as_ref().unwrap().clone();
|
||||
let mv_cursor = Rc::new(RefCell::new(
|
||||
MvCursor::new(mv_store, tx_id, table_id).unwrap(),
|
||||
));
|
||||
Some(mv_cursor)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
let cursor = BTreeCursor::new(mv_cursor, pager.clone(), *root_page);
|
||||
let mut cursors = state.cursors.borrow_mut();
|
||||
match cursor_type {
|
||||
CursorType::BTreeTable(_) => {
|
||||
@@ -2955,7 +2968,18 @@ impl Program {
|
||||
let (_, cursor_type) = self.cursor_ref.get(*cursor_id).unwrap();
|
||||
let mut cursors = state.cursors.borrow_mut();
|
||||
let is_index = cursor_type.is_index();
|
||||
let cursor = BTreeCursor::new(pager.clone(), *root_page);
|
||||
let mv_cursor = match state.mv_tx_id {
|
||||
Some(tx_id) => {
|
||||
let table_id = *root_page as u64;
|
||||
let mv_store = mv_store.as_ref().unwrap().clone();
|
||||
let mv_cursor = Rc::new(RefCell::new(
|
||||
MvCursor::new(mv_store, tx_id, table_id).unwrap(),
|
||||
));
|
||||
Some(mv_cursor)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
let cursor = BTreeCursor::new(mv_cursor, pager.clone(), *root_page);
|
||||
if is_index {
|
||||
cursors
|
||||
.get_mut(*cursor_id)
|
||||
|
||||
Reference in New Issue
Block a user