mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 17:05:36 +01:00
core/mvcc: implement CursorTrait on MVCC cursor
This commit is contained in:
@@ -1,8 +1,10 @@
|
||||
use crate::mvcc::clock::LogicalClock;
|
||||
use crate::mvcc::database::{MVTableId, MvStore, Row, RowID};
|
||||
use crate::types::{IOResult, SeekKey, SeekOp, SeekResult};
|
||||
use crate::storage::btree::{BTreeKey, CursorTrait};
|
||||
use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult};
|
||||
use crate::Result;
|
||||
use crate::{Pager, Value};
|
||||
use std::cell::{Ref, RefCell};
|
||||
use std::fmt::Debug;
|
||||
use std::ops::Bound;
|
||||
use std::sync::Arc;
|
||||
@@ -19,12 +21,14 @@ enum CursorPosition {
|
||||
#[derive(Debug)]
|
||||
pub struct MvccLazyCursor<Clock: LogicalClock> {
|
||||
pub db: Arc<MvStore<Clock>>,
|
||||
current_pos: CursorPosition,
|
||||
current_pos: RefCell<CursorPosition>,
|
||||
pub table_id: MVTableId,
|
||||
tx_id: u64,
|
||||
/// Reusable immutable record, used to allow better allocation strategy.
|
||||
reusable_immutable_record: RefCell<Option<ImmutableRecord>>,
|
||||
}
|
||||
|
||||
impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
|
||||
impl<Clock: LogicalClock + 'static> MvccLazyCursor<Clock> {
|
||||
pub fn new(
|
||||
db: Arc<MvStore<Clock>>,
|
||||
tx_id: u64,
|
||||
@@ -36,55 +40,15 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
|
||||
let cursor = Self {
|
||||
db,
|
||||
tx_id,
|
||||
current_pos: CursorPosition::BeforeFirst,
|
||||
current_pos: RefCell::new(CursorPosition::BeforeFirst),
|
||||
table_id,
|
||||
reusable_immutable_record: RefCell::new(None),
|
||||
};
|
||||
Ok(cursor)
|
||||
}
|
||||
|
||||
/// Insert a row into the table.
|
||||
/// Sets the cursor to the inserted row.
|
||||
pub fn insert(&mut self, row: Row) -> Result<()> {
|
||||
self.current_pos = CursorPosition::Loaded(row.id);
|
||||
if self.db.read(self.tx_id, row.id)?.is_some() {
|
||||
self.db.update(self.tx_id, row).inspect_err(|_| {
|
||||
self.current_pos = CursorPosition::BeforeFirst;
|
||||
})?;
|
||||
} else {
|
||||
self.db.insert(self.tx_id, row).inspect_err(|_| {
|
||||
self.current_pos = CursorPosition::BeforeFirst;
|
||||
})?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn delete(&mut self, rowid: RowID) -> Result<()> {
|
||||
self.db.delete(self.tx_id, rowid)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn current_row_id(&mut self) -> Option<RowID> {
|
||||
match self.current_pos {
|
||||
CursorPosition::Loaded(id) => Some(id),
|
||||
CursorPosition::BeforeFirst => {
|
||||
// If we are before first, we need to try and find the first row.
|
||||
let maybe_rowid =
|
||||
self.db
|
||||
.get_next_row_id_for_table(self.table_id, i64::MIN, self.tx_id);
|
||||
if let Some(id) = maybe_rowid {
|
||||
self.current_pos = CursorPosition::Loaded(id);
|
||||
Some(id)
|
||||
} else {
|
||||
self.current_pos = CursorPosition::BeforeFirst;
|
||||
None
|
||||
}
|
||||
}
|
||||
CursorPosition::End => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn current_row(&mut self) -> Result<Option<Row>> {
|
||||
match self.current_pos {
|
||||
pub fn current_row(&self) -> Result<Option<Row>> {
|
||||
match *self.current_pos.borrow() {
|
||||
CursorPosition::Loaded(id) => self.db.read(self.tx_id, id),
|
||||
CursorPosition::BeforeFirst => {
|
||||
// If we are before first, we need to try and find the first row.
|
||||
@@ -92,7 +56,7 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
|
||||
self.db
|
||||
.get_next_row_id_for_table(self.table_id, i64::MIN, self.tx_id);
|
||||
if let Some(id) = maybe_rowid {
|
||||
self.current_pos = CursorPosition::Loaded(id);
|
||||
self.current_pos.replace(CursorPosition::Loaded(id));
|
||||
self.db.read(self.tx_id, id)
|
||||
} else {
|
||||
Ok(None)
|
||||
@@ -106,19 +70,57 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_next_rowid(&mut self) -> i64 {
|
||||
let _ = self.last();
|
||||
match *self.current_pos.borrow() {
|
||||
CursorPosition::Loaded(id) => id.row_id + 1,
|
||||
CursorPosition::BeforeFirst => 1,
|
||||
CursorPosition::End => i64::MAX,
|
||||
}
|
||||
}
|
||||
|
||||
fn get_immutable_record_or_create(&self) -> std::cell::RefMut<'_, Option<ImmutableRecord>> {
|
||||
let mut reusable_immutable_record = self.reusable_immutable_record.borrow_mut();
|
||||
if reusable_immutable_record.is_none() {
|
||||
let record = ImmutableRecord::new(1024);
|
||||
reusable_immutable_record.replace(record);
|
||||
}
|
||||
reusable_immutable_record
|
||||
}
|
||||
|
||||
fn get_current_pos(&self) -> CursorPosition {
|
||||
*self.current_pos.borrow()
|
||||
}
|
||||
}
|
||||
|
||||
impl<Clock: LogicalClock + 'static> CursorTrait for MvccLazyCursor<Clock> {
|
||||
fn last(&mut self) -> Result<IOResult<()>> {
|
||||
let last_rowid = self.db.get_last_rowid(self.table_id);
|
||||
if let Some(last_rowid) = last_rowid {
|
||||
self.current_pos.replace(CursorPosition::Loaded(RowID {
|
||||
table_id: self.table_id,
|
||||
row_id: last_rowid,
|
||||
}));
|
||||
} else {
|
||||
self.current_pos.replace(CursorPosition::BeforeFirst);
|
||||
}
|
||||
Ok(IOResult::Done(()))
|
||||
}
|
||||
|
||||
/// Move the cursor to the next row. Returns true if the cursor moved to the next row, false if the cursor is at the end of the table.
|
||||
pub fn forward(&mut self) -> bool {
|
||||
let before_first = matches!(self.current_pos, CursorPosition::BeforeFirst);
|
||||
let min_id = match self.current_pos {
|
||||
fn next(&mut self) -> Result<IOResult<bool>> {
|
||||
let before_first = matches!(self.get_current_pos(), CursorPosition::BeforeFirst);
|
||||
let min_id = match *self.current_pos.borrow() {
|
||||
CursorPosition::Loaded(id) => id.row_id + 1,
|
||||
// TODO: do we need to forward twice?
|
||||
CursorPosition::BeforeFirst => i64::MIN, // we need to find first row, so we look from the first id,
|
||||
CursorPosition::End => {
|
||||
// let's keep same state, we reached the end so no point in moving forward.
|
||||
return false;
|
||||
return Ok(IOResult::Done(false));
|
||||
}
|
||||
};
|
||||
self.current_pos =
|
||||
|
||||
let new_position =
|
||||
match self
|
||||
.db
|
||||
.get_next_row_id_for_table(self.table_id, min_id, self.tx_id)
|
||||
@@ -134,46 +136,59 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
|
||||
}
|
||||
}
|
||||
};
|
||||
matches!(self.current_pos, CursorPosition::Loaded(_))
|
||||
self.current_pos.replace(new_position);
|
||||
|
||||
Ok(IOResult::Done(matches!(
|
||||
self.get_current_pos(),
|
||||
CursorPosition::Loaded(_)
|
||||
)))
|
||||
}
|
||||
|
||||
/// Returns true if the is not pointing to any row.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
// If we reached the end of the table, it means we traversed the whole table therefore there must be something in the table.
|
||||
// If we have loaded a row, it means there is something in the table.
|
||||
match self.current_pos {
|
||||
CursorPosition::Loaded(_) => false,
|
||||
CursorPosition::BeforeFirst => true,
|
||||
CursorPosition::End => true,
|
||||
fn prev(&mut self) -> Result<IOResult<bool>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn rowid(&self) -> Result<IOResult<Option<i64>>> {
|
||||
let rowid = match self.get_current_pos() {
|
||||
CursorPosition::Loaded(id) => Some(id.row_id),
|
||||
CursorPosition::BeforeFirst => {
|
||||
// If we are before first, we need to try and find the first row.
|
||||
let maybe_rowid =
|
||||
self.db
|
||||
.get_next_row_id_for_table(self.table_id, i64::MIN, self.tx_id);
|
||||
if let Some(id) = maybe_rowid {
|
||||
self.current_pos.replace(CursorPosition::Loaded(id));
|
||||
Some(id.row_id)
|
||||
} else {
|
||||
self.current_pos.replace(CursorPosition::BeforeFirst);
|
||||
None
|
||||
}
|
||||
}
|
||||
CursorPosition::End => None,
|
||||
};
|
||||
Ok(IOResult::Done(rowid))
|
||||
}
|
||||
|
||||
fn record(
|
||||
&self,
|
||||
) -> Result<IOResult<Option<std::cell::Ref<'_, crate::types::ImmutableRecord>>>> {
|
||||
let Some(row) = self.current_row()? else {
|
||||
return Ok(IOResult::Done(None));
|
||||
};
|
||||
|
||||
{
|
||||
let mut record = self.get_immutable_record_or_create();
|
||||
let record = record.as_mut().unwrap();
|
||||
record.invalidate();
|
||||
record.start_serialization(&row.data);
|
||||
}
|
||||
|
||||
let record_ref =
|
||||
Ref::filter_map(self.reusable_immutable_record.borrow(), |opt| opt.as_ref()).unwrap();
|
||||
Ok(IOResult::Done(Some(record_ref)))
|
||||
}
|
||||
|
||||
pub fn rewind(&mut self) {
|
||||
self.current_pos = CursorPosition::BeforeFirst;
|
||||
}
|
||||
|
||||
pub fn last(&mut self) {
|
||||
let last_rowid = self.db.get_last_rowid(self.table_id);
|
||||
if let Some(last_rowid) = last_rowid {
|
||||
self.current_pos = CursorPosition::Loaded(RowID {
|
||||
table_id: self.table_id,
|
||||
row_id: last_rowid,
|
||||
});
|
||||
} else {
|
||||
self.current_pos = CursorPosition::BeforeFirst;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_next_rowid(&mut self) -> i64 {
|
||||
self.last();
|
||||
match self.current_pos {
|
||||
CursorPosition::Loaded(id) => id.row_id + 1,
|
||||
CursorPosition::BeforeFirst => 1,
|
||||
CursorPosition::End => i64::MAX,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn seek(&mut self, seek_key: SeekKey<'_>, op: SeekOp) -> Result<IOResult<SeekResult>> {
|
||||
fn seek(&mut self, seek_key: SeekKey<'_>, op: SeekOp) -> Result<IOResult<SeekResult>> {
|
||||
let row_id = match seek_key {
|
||||
SeekKey::TableRowId(row_id) => row_id,
|
||||
SeekKey::IndexKey(_) => {
|
||||
@@ -196,7 +211,7 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
|
||||
};
|
||||
let rowid = self.db.seek_rowid(bound, lower_bound, self.tx_id);
|
||||
if let Some(rowid) = rowid {
|
||||
self.current_pos = CursorPosition::Loaded(rowid);
|
||||
self.current_pos.replace(CursorPosition::Loaded(rowid));
|
||||
if op.eq_only() {
|
||||
if rowid.row_id == row_id {
|
||||
Ok(IOResult::Done(SeekResult::Found))
|
||||
@@ -209,15 +224,59 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
|
||||
} else {
|
||||
let forwards = matches!(op, SeekOp::GE { eq_only: _ } | SeekOp::GT);
|
||||
if forwards {
|
||||
self.last();
|
||||
let _ = self.last()?;
|
||||
} else {
|
||||
self.rewind();
|
||||
let _ = self.rewind()?;
|
||||
}
|
||||
Ok(IOResult::Done(SeekResult::NotFound))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn exists(&mut self, key: &Value) -> Result<IOResult<bool>> {
|
||||
/// Insert a row into the table.
|
||||
/// Sets the cursor to the inserted row.
|
||||
fn insert(&mut self, key: &BTreeKey) -> Result<IOResult<()>> {
|
||||
let Some(rowid) = key.maybe_rowid() else {
|
||||
todo!()
|
||||
};
|
||||
let row_id = RowID::new(self.table_id, rowid);
|
||||
let record_buf = key.get_record().unwrap().get_payload().to_vec();
|
||||
let num_columns = match key {
|
||||
BTreeKey::IndexKey(record) => record.column_count(),
|
||||
BTreeKey::TableRowId((_, record)) => record.as_ref().unwrap().column_count(),
|
||||
};
|
||||
let row = crate::mvcc::database::Row::new(row_id, record_buf, num_columns);
|
||||
|
||||
self.current_pos.replace(CursorPosition::Loaded(row.id));
|
||||
if self.db.read(self.tx_id, row.id)?.is_some() {
|
||||
self.db.update(self.tx_id, row).inspect_err(|_| {
|
||||
self.current_pos.replace(CursorPosition::BeforeFirst);
|
||||
})?;
|
||||
} else {
|
||||
self.db.insert(self.tx_id, row).inspect_err(|_| {
|
||||
self.current_pos.replace(CursorPosition::BeforeFirst);
|
||||
})?;
|
||||
}
|
||||
Ok(IOResult::Done(()))
|
||||
}
|
||||
|
||||
fn delete(&mut self) -> Result<IOResult<()>> {
|
||||
let IOResult::Done(Some(rowid)) = self.rowid()? else {
|
||||
todo!();
|
||||
};
|
||||
let rowid = RowID::new(self.table_id, rowid);
|
||||
self.db.delete(self.tx_id, rowid)?;
|
||||
Ok(IOResult::Done(()))
|
||||
}
|
||||
|
||||
fn set_null_flag(&mut self, _flag: bool) {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_null_flag(&self) -> bool {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn exists(&mut self, key: &Value) -> Result<IOResult<bool>> {
|
||||
let int_key = match key {
|
||||
Value::Integer(i) => i,
|
||||
_ => unreachable!("btree tables are indexed by integers!"),
|
||||
@@ -234,11 +293,90 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
|
||||
)
|
||||
.is_some();
|
||||
if exists {
|
||||
self.current_pos = CursorPosition::Loaded(RowID {
|
||||
self.current_pos.replace(CursorPosition::Loaded(RowID {
|
||||
table_id: self.table_id,
|
||||
row_id: *int_key,
|
||||
});
|
||||
}));
|
||||
}
|
||||
Ok(IOResult::Done(exists))
|
||||
}
|
||||
|
||||
fn clear_btree(&mut self) -> Result<IOResult<Option<usize>>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn btree_destroy(&mut self) -> Result<IOResult<Option<usize>>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn count(&mut self) -> Result<IOResult<usize>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Returns true if the is not pointing to any row.
|
||||
fn is_empty(&self) -> bool {
|
||||
// If we reached the end of the table, it means we traversed the whole table therefore there must be something in the table.
|
||||
// If we have loaded a row, it means there is something in the table.
|
||||
match self.get_current_pos() {
|
||||
CursorPosition::Loaded(_) => false,
|
||||
CursorPosition::BeforeFirst => true,
|
||||
CursorPosition::End => true,
|
||||
}
|
||||
}
|
||||
|
||||
fn root_page(&self) -> i64 {
|
||||
self.table_id.into()
|
||||
}
|
||||
|
||||
fn rewind(&mut self) -> Result<IOResult<()>> {
|
||||
self.current_pos.replace(CursorPosition::BeforeFirst);
|
||||
Ok(IOResult::Done(()))
|
||||
}
|
||||
|
||||
fn has_record(&self) -> bool {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn set_has_record(&self, _has_record: bool) {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_index_info(&self) -> &crate::types::IndexInfo {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn seek_end(&mut self) -> Result<IOResult<()>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn seek_to_last(&mut self) -> Result<IOResult<()>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn invalidate_record(&mut self) {
|
||||
self.get_immutable_record_or_create()
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.invalidate();
|
||||
}
|
||||
|
||||
fn has_rowid(&self) -> bool {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn record_cursor_mut(&self) -> std::cell::RefMut<'_, crate::types::RecordCursor> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_pager(&self) -> Arc<Pager> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_skip_advance(&self) -> bool {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_mvcc_cursor(&self) -> Arc<parking_lot::RwLock<crate::MvCursor>> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,6 +115,10 @@ pub(crate) fn generate_simple_string_row(table_id: MVTableId, id: i64, data: &st
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn generate_simple_string_record(data: &str) -> ImmutableRecord {
|
||||
ImmutableRecord::from_values(&[Value::Text(Text::new(data))], 1)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_insert_read() {
|
||||
let db = MvccTestDb::new();
|
||||
@@ -830,14 +834,21 @@ fn test_lazy_scan_cursor_basic() {
|
||||
.unwrap();
|
||||
|
||||
// Check first row
|
||||
assert!(cursor.forward());
|
||||
assert!(matches!(cursor.next().unwrap(), IOResult::Done(true)));
|
||||
assert!(!cursor.is_empty());
|
||||
let row = cursor.current_row().unwrap().unwrap();
|
||||
assert_eq!(row.id.row_id, 1);
|
||||
|
||||
// Iterate through all rows
|
||||
let mut count = 1;
|
||||
while cursor.forward() {
|
||||
loop {
|
||||
let res = cursor.next().unwrap();
|
||||
let IOResult::Done(res) = res else {
|
||||
panic!("unexpected next result {res:?}");
|
||||
};
|
||||
if !res {
|
||||
break;
|
||||
}
|
||||
count += 1;
|
||||
let row = cursor.current_row().unwrap().unwrap();
|
||||
assert_eq!(row.id.row_id, count);
|
||||
@@ -847,7 +858,7 @@ fn test_lazy_scan_cursor_basic() {
|
||||
assert_eq!(count, 5);
|
||||
|
||||
// After the last row, is_empty should return true
|
||||
assert!(!cursor.forward());
|
||||
assert!(!matches!(cursor.next().unwrap(), IOResult::Done(true)));
|
||||
assert!(cursor.is_empty());
|
||||
}
|
||||
|
||||
@@ -865,7 +876,7 @@ fn test_lazy_scan_cursor_with_gaps() {
|
||||
.unwrap();
|
||||
|
||||
// Check first row
|
||||
assert!(cursor.forward());
|
||||
assert!(matches!(cursor.next().unwrap(), IOResult::Done(true)));
|
||||
assert!(!cursor.is_empty());
|
||||
let row = cursor.current_row().unwrap().unwrap();
|
||||
assert_eq!(row.id.row_id, 5);
|
||||
@@ -874,12 +885,27 @@ fn test_lazy_scan_cursor_with_gaps() {
|
||||
let expected_ids = [5, 10, 15, 20, 30];
|
||||
let mut index = 0;
|
||||
|
||||
assert_eq!(cursor.current_row_id().unwrap().row_id, expected_ids[index]);
|
||||
let IOResult::Done(rowid) = cursor.rowid().unwrap() else {
|
||||
unreachable!();
|
||||
};
|
||||
let rowid = rowid.unwrap();
|
||||
assert_eq!(rowid, expected_ids[index]);
|
||||
|
||||
while cursor.forward() {
|
||||
loop {
|
||||
let res = cursor.next().unwrap();
|
||||
let IOResult::Done(res) = res else {
|
||||
panic!("unexpected next result {res:?}");
|
||||
};
|
||||
if !res {
|
||||
break;
|
||||
}
|
||||
index += 1;
|
||||
if index < expected_ids.len() {
|
||||
assert_eq!(cursor.current_row_id().unwrap().row_id, expected_ids[index]);
|
||||
let IOResult::Done(rowid) = cursor.rowid().unwrap() else {
|
||||
unreachable!();
|
||||
};
|
||||
let rowid = rowid.unwrap();
|
||||
assert_eq!(rowid, expected_ids[index]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -900,7 +926,7 @@ fn test_cursor_basic() {
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
cursor.forward();
|
||||
let _ = cursor.next().unwrap();
|
||||
|
||||
// Check first row
|
||||
assert!(!cursor.is_empty());
|
||||
@@ -909,7 +935,14 @@ fn test_cursor_basic() {
|
||||
|
||||
// Iterate through all rows
|
||||
let mut count = 1;
|
||||
while cursor.forward() {
|
||||
loop {
|
||||
let res = cursor.next().unwrap();
|
||||
let IOResult::Done(res) = res else {
|
||||
panic!("unexpected next result {res:?}");
|
||||
};
|
||||
if !res {
|
||||
break;
|
||||
}
|
||||
count += 1;
|
||||
let row = cursor.current_row().unwrap().unwrap();
|
||||
assert_eq!(row.id.row_id, count);
|
||||
@@ -919,7 +952,7 @@ fn test_cursor_basic() {
|
||||
assert_eq!(count, 5);
|
||||
|
||||
// After the last row, is_empty should return true
|
||||
assert!(!cursor.forward());
|
||||
assert!(!matches!(cursor.next().unwrap(), IOResult::Done(true)));
|
||||
assert!(cursor.is_empty());
|
||||
}
|
||||
|
||||
@@ -939,7 +972,7 @@ fn test_cursor_with_empty_table() {
|
||||
let table_id = -1; // Empty table
|
||||
|
||||
// Test LazyScanCursor with empty table
|
||||
let mut cursor = MvccLazyCursor::new(
|
||||
let cursor = MvccLazyCursor::new(
|
||||
db.mvcc_store.clone(),
|
||||
tx_id,
|
||||
table_id,
|
||||
@@ -947,7 +980,8 @@ fn test_cursor_with_empty_table() {
|
||||
)
|
||||
.unwrap();
|
||||
assert!(cursor.is_empty());
|
||||
assert!(cursor.current_row_id().is_none());
|
||||
let rowid = cursor.rowid().unwrap();
|
||||
assert!(matches!(rowid, IOResult::Done(None)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -964,15 +998,17 @@ fn test_cursor_modification_during_scan() {
|
||||
.unwrap();
|
||||
|
||||
// Read first row
|
||||
assert!(cursor.forward());
|
||||
assert!(matches!(cursor.next().unwrap(), IOResult::Done(true)));
|
||||
let first_row = cursor.current_row().unwrap().unwrap();
|
||||
assert_eq!(first_row.id.row_id, 1);
|
||||
|
||||
// Insert a new row with ID between existing rows
|
||||
let new_row_id = RowID::new(table_id.into(), 3);
|
||||
let new_row = generate_simple_string_row(table_id.into(), new_row_id.row_id, "new_row");
|
||||
let new_row = generate_simple_string_record("new_row");
|
||||
|
||||
cursor.insert(new_row).unwrap();
|
||||
let _ = cursor
|
||||
.insert(&BTreeKey::TableRowId((new_row_id.row_id, Some(&new_row))))
|
||||
.unwrap();
|
||||
let row = db.mvcc_store.read(tx_id, new_row_id).unwrap().unwrap();
|
||||
let mut record = ImmutableRecord::new(1024);
|
||||
record.start_serialization(&row.data);
|
||||
@@ -986,7 +1022,7 @@ fn test_cursor_modification_during_scan() {
|
||||
assert_eq!(row.id.row_id, 3);
|
||||
|
||||
// Continue scanning - the cursor should still work correctly
|
||||
cursor.forward(); // Move to 4
|
||||
let _ = cursor.next().unwrap(); // Move to 4
|
||||
let row = db
|
||||
.mvcc_store
|
||||
.read(tx_id, RowID::new(table_id.into(), 4))
|
||||
@@ -994,14 +1030,14 @@ fn test_cursor_modification_during_scan() {
|
||||
.unwrap();
|
||||
assert_eq!(row.id.row_id, 4);
|
||||
|
||||
cursor.forward(); // Move to 5 (our new row)
|
||||
let _ = cursor.next().unwrap(); // Move to 5 (our new row)
|
||||
let row = db
|
||||
.mvcc_store
|
||||
.read(tx_id, RowID::new(table_id.into(), 5))
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(row.id.row_id, 5);
|
||||
assert!(!cursor.forward());
|
||||
assert!(!matches!(cursor.next().unwrap(), IOResult::Done(true)));
|
||||
assert!(cursor.is_empty());
|
||||
}
|
||||
|
||||
|
||||
@@ -337,7 +337,7 @@ impl BTreeKey<'_> {
|
||||
}
|
||||
|
||||
/// Get the record, if present. Index will always be present,
|
||||
fn get_record(&self) -> Option<&'_ ImmutableRecord> {
|
||||
pub fn get_record(&self) -> Option<&'_ ImmutableRecord> {
|
||||
match self {
|
||||
BTreeKey::TableRowId((_, record)) => *record,
|
||||
BTreeKey::IndexKey(record) => Some(record),
|
||||
@@ -345,7 +345,7 @@ impl BTreeKey<'_> {
|
||||
}
|
||||
|
||||
/// Get the rowid, if present. Index will never be present.
|
||||
fn maybe_rowid(&self) -> Option<i64> {
|
||||
pub fn maybe_rowid(&self) -> Option<i64> {
|
||||
match self {
|
||||
BTreeKey::TableRowId((rowid, _)) => Some(*rowid),
|
||||
BTreeKey::IndexKey(_) => None,
|
||||
@@ -1297,8 +1297,10 @@ impl BTreeCursor {
|
||||
pub fn get_next_record(&mut self) -> Result<IOResult<bool>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mut mv_cursor = mv_cursor.write();
|
||||
mv_cursor.forward();
|
||||
let rowid = mv_cursor.current_row_id();
|
||||
assert!(matches!(mv_cursor.next()?, IOResult::Done(_)));
|
||||
let IOResult::Done(rowid) = mv_cursor.rowid()? else {
|
||||
todo!()
|
||||
};
|
||||
match rowid {
|
||||
Some(_rowid) => {
|
||||
return Ok(IOResult::Done(true));
|
||||
@@ -4453,11 +4455,14 @@ impl BTreeCursor {
|
||||
#[instrument(skip(self), level = Level::DEBUG)]
|
||||
pub fn rowid(&self) -> Result<IOResult<Option<i64>>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mut mv_cursor = mv_cursor.write();
|
||||
let Some(rowid) = mv_cursor.current_row_id() else {
|
||||
let mv_cursor = mv_cursor.write();
|
||||
let IOResult::Done(rowid) = mv_cursor.rowid()? else {
|
||||
todo!()
|
||||
};
|
||||
let Some(rowid) = rowid else {
|
||||
return Ok(IOResult::Done(None));
|
||||
};
|
||||
return Ok(IOResult::Done(Some(rowid.row_id)));
|
||||
return Ok(IOResult::Done(Some(rowid)));
|
||||
}
|
||||
if self.get_null_flag() {
|
||||
return Ok(IOResult::Done(None));
|
||||
@@ -4520,7 +4525,7 @@ impl BTreeCursor {
|
||||
return Ok(IOResult::Done(Some(record_ref)));
|
||||
}
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mut mv_cursor = mv_cursor.write();
|
||||
let mv_cursor = mv_cursor.write();
|
||||
let Some(row) = mv_cursor.current_row()? else {
|
||||
return Ok(IOResult::Done(None));
|
||||
};
|
||||
@@ -4586,22 +4591,9 @@ impl BTreeCursor {
|
||||
pub fn insert(&mut self, key: &BTreeKey) -> Result<IOResult<()>> {
|
||||
tracing::debug!(valid_state = ?self.valid_state, cursor_state = ?self.state, is_write_in_progress = self.is_write_in_progress());
|
||||
match &self.mv_cursor {
|
||||
Some(mv_cursor) => match key.maybe_rowid() {
|
||||
Some(rowid) => {
|
||||
let row_id =
|
||||
crate::mvcc::database::RowID::new(mv_cursor.read().table_id, rowid);
|
||||
let record_buf = key.get_record().unwrap().get_payload().to_vec();
|
||||
let num_columns = match key {
|
||||
BTreeKey::IndexKey(record) => record.column_count(),
|
||||
BTreeKey::TableRowId((_, record)) => {
|
||||
record.as_ref().unwrap().column_count()
|
||||
}
|
||||
};
|
||||
let row = crate::mvcc::database::Row::new(row_id, record_buf, num_columns);
|
||||
mv_cursor.write().insert(row)?;
|
||||
}
|
||||
None => todo!("Support mvcc inserts with index btrees"),
|
||||
},
|
||||
Some(mv_cursor) => {
|
||||
return_if_io!(mv_cursor.write().insert(key));
|
||||
}
|
||||
None => {
|
||||
return_if_io!(self.insert_into_page(key));
|
||||
if key.maybe_rowid().is_some() {
|
||||
@@ -4627,8 +4619,7 @@ impl BTreeCursor {
|
||||
#[instrument(skip(self), level = Level::DEBUG)]
|
||||
pub fn delete(&mut self) -> Result<IOResult<()>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let rowid = mv_cursor.write().current_row_id().unwrap();
|
||||
mv_cursor.write().delete(rowid)?;
|
||||
return_if_io!(mv_cursor.write().delete());
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
|
||||
@@ -5679,11 +5670,14 @@ impl CursorTrait for BTreeCursor {
|
||||
#[instrument(skip(self), level = Level::DEBUG)]
|
||||
fn rowid(&self) -> Result<IOResult<Option<i64>>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mut mv_cursor = mv_cursor.write();
|
||||
let Some(rowid) = mv_cursor.current_row_id() else {
|
||||
let mv_cursor = mv_cursor.write();
|
||||
let IOResult::Done(rowid) = mv_cursor.rowid()? else {
|
||||
todo!();
|
||||
};
|
||||
let Some(rowid) = rowid else {
|
||||
return Ok(IOResult::Done(None));
|
||||
};
|
||||
return Ok(IOResult::Done(Some(rowid.row_id)));
|
||||
return Ok(IOResult::Done(Some(rowid)));
|
||||
}
|
||||
if self.get_null_flag() {
|
||||
return Ok(IOResult::Done(None));
|
||||
@@ -5743,7 +5737,7 @@ impl CursorTrait for BTreeCursor {
|
||||
return Ok(IOResult::Done(Some(record_ref)));
|
||||
}
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mut mv_cursor = mv_cursor.write();
|
||||
let mv_cursor = mv_cursor.write();
|
||||
let Some(row) = mv_cursor.current_row()? else {
|
||||
return Ok(IOResult::Done(None));
|
||||
};
|
||||
@@ -5809,22 +5803,9 @@ impl CursorTrait for BTreeCursor {
|
||||
fn insert(&mut self, key: &BTreeKey) -> Result<IOResult<()>> {
|
||||
tracing::debug!(valid_state = ?self.valid_state, cursor_state = ?self.state, is_write_in_progress = self.is_write_in_progress());
|
||||
match &self.mv_cursor {
|
||||
Some(mv_cursor) => match key.maybe_rowid() {
|
||||
Some(rowid) => {
|
||||
let row_id =
|
||||
crate::mvcc::database::RowID::new(mv_cursor.read().table_id, rowid);
|
||||
let record_buf = key.get_record().unwrap().get_payload().to_vec();
|
||||
let num_columns = match key {
|
||||
BTreeKey::IndexKey(record) => record.column_count(),
|
||||
BTreeKey::TableRowId((_, record)) => {
|
||||
record.as_ref().unwrap().column_count()
|
||||
}
|
||||
};
|
||||
let row = crate::mvcc::database::Row::new(row_id, record_buf, num_columns);
|
||||
mv_cursor.write().insert(row)?;
|
||||
}
|
||||
None => todo!("Support mvcc inserts with index btrees"),
|
||||
},
|
||||
Some(mv_cursor) => {
|
||||
return_if_io!(mv_cursor.write().insert(key));
|
||||
}
|
||||
None => {
|
||||
return_if_io!(self.insert_into_page(key));
|
||||
if key.maybe_rowid().is_some() {
|
||||
@@ -5838,8 +5819,7 @@ impl CursorTrait for BTreeCursor {
|
||||
#[instrument(skip(self), level = Level::DEBUG)]
|
||||
fn delete(&mut self) -> Result<IOResult<()>> {
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let rowid = mv_cursor.write().current_row_id().unwrap();
|
||||
mv_cursor.write().delete(rowid)?;
|
||||
return_if_io!(mv_cursor.write().delete());
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
|
||||
@@ -6361,7 +6341,7 @@ impl CursorTrait for BTreeCursor {
|
||||
self.rewind_state = RewindState::NextRecord;
|
||||
if let Some(mv_cursor) = &self.mv_cursor {
|
||||
let mut mv_cursor = mv_cursor.write();
|
||||
mv_cursor.rewind();
|
||||
return_if_io!(mv_cursor.rewind());
|
||||
} else {
|
||||
let c = self.move_to_root()?;
|
||||
if let Some(c) = c {
|
||||
|
||||
Reference in New Issue
Block a user