mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-15 06:04:19 +01:00
core/mvcc/cursor: next with btree
This commit is contained in:
@@ -22,6 +22,18 @@ enum CursorPosition {
|
||||
End,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum NextState {
|
||||
NextBtree {
|
||||
new_position_in_mvcc: CursorPosition,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum MvccLazyCursorState {
|
||||
Next(NextState),
|
||||
}
|
||||
|
||||
pub struct MvccLazyCursor<Clock: LogicalClock> {
|
||||
pub db: Arc<MvStore<Clock>>,
|
||||
current_pos: RefCell<CursorPosition>,
|
||||
@@ -33,6 +45,7 @@ pub struct MvccLazyCursor<Clock: LogicalClock> {
|
||||
null_flag: bool,
|
||||
record_cursor: RefCell<RecordCursor>,
|
||||
next_rowid_lock: Arc<RwLock<()>>,
|
||||
state: RefCell<Option<MvccLazyCursorState>>,
|
||||
}
|
||||
|
||||
impl<Clock: LogicalClock + 'static> MvccLazyCursor<Clock> {
|
||||
@@ -58,6 +71,7 @@ impl<Clock: LogicalClock + 'static> MvccLazyCursor<Clock> {
|
||||
null_flag: false,
|
||||
record_cursor: RefCell::new(RecordCursor::new()),
|
||||
next_rowid_lock: Arc::new(RwLock::new(())),
|
||||
state: RefCell::new(None),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -139,42 +153,112 @@ impl<Clock: LogicalClock + 'static> CursorTrait for MvccLazyCursor<Clock> {
|
||||
|
||||
/// Move the cursor to the next row. Returns true if the cursor moved to the next row, false if the cursor is at the end of the table.
|
||||
fn next(&mut self) -> Result<IOResult<bool>> {
|
||||
let before_first = matches!(self.get_current_pos(), CursorPosition::BeforeFirst);
|
||||
let min_id = match *self.current_pos.borrow() {
|
||||
let current_state = *self.state.borrow();
|
||||
if current_state.is_none() {
|
||||
let before_first = matches!(self.get_current_pos(), CursorPosition::BeforeFirst);
|
||||
let min_id = match *self.current_pos.borrow() {
|
||||
CursorPosition::Loaded {
|
||||
row_id,
|
||||
in_btree: _,
|
||||
} => row_id.row_id + 1,
|
||||
// TODO: do we need to forward twice?
|
||||
CursorPosition::BeforeFirst => i64::MIN, // we need to find first row, so we look from the first id,
|
||||
CursorPosition::End => {
|
||||
// let's keep same state, we reached the end so no point in moving forward.
|
||||
return Ok(IOResult::Done(false));
|
||||
}
|
||||
};
|
||||
|
||||
let new_position_in_mvcc =
|
||||
match self
|
||||
.db
|
||||
.get_next_row_id_for_table(self.table_id, min_id, self.tx_id)
|
||||
{
|
||||
Some(id) => CursorPosition::Loaded {
|
||||
row_id: id,
|
||||
in_btree: false,
|
||||
},
|
||||
None => {
|
||||
if before_first {
|
||||
// if it wasn't loaded and we didn't find anything, it means the table is empty.
|
||||
CursorPosition::BeforeFirst
|
||||
} else {
|
||||
// if we had something loaded, and we didn't find next key then it means we are at the end.
|
||||
CursorPosition::End
|
||||
}
|
||||
}
|
||||
};
|
||||
self.state
|
||||
.replace(Some(MvccLazyCursorState::Next(NextState::NextBtree {
|
||||
new_position_in_mvcc,
|
||||
})));
|
||||
}
|
||||
let current_state = *self.state.borrow();
|
||||
let Some(MvccLazyCursorState::Next(NextState::NextBtree {
|
||||
new_position_in_mvcc,
|
||||
})) = current_state
|
||||
else {
|
||||
panic!("Invalid state {:?}", self.state.borrow());
|
||||
};
|
||||
|
||||
let found = return_if_io!(self.btree_cursor.next());
|
||||
// get current rowid in mvcc and in btree
|
||||
// compare both and set loaded to position of the one that is lesser
|
||||
let new_position_in_mvcc = match new_position_in_mvcc {
|
||||
CursorPosition::Loaded {
|
||||
row_id,
|
||||
in_btree: _,
|
||||
} => row_id.row_id + 1,
|
||||
// TODO: do we need to forward twice?
|
||||
CursorPosition::BeforeFirst => i64::MIN, // we need to find first row, so we look from the first id,
|
||||
CursorPosition::End => {
|
||||
// let's keep same state, we reached the end so no point in moving forward.
|
||||
return Ok(IOResult::Done(false));
|
||||
}
|
||||
} => Some(row_id.row_id),
|
||||
CursorPosition::BeforeFirst => None,
|
||||
CursorPosition::End => None,
|
||||
};
|
||||
|
||||
let new_position_in_mvcc =
|
||||
match self
|
||||
.db
|
||||
.get_next_row_id_for_table(self.table_id, min_id, self.tx_id)
|
||||
{
|
||||
Some(id) => CursorPosition::Loaded {
|
||||
row_id: id,
|
||||
in_btree: false,
|
||||
},
|
||||
None => {
|
||||
if before_first {
|
||||
// if it wasn't loaded and we didn't find anything, it means the table is empty.
|
||||
CursorPosition::BeforeFirst
|
||||
} else {
|
||||
// if we had something loaded, and we didn't find next key then it means we are at the end.
|
||||
CursorPosition::End
|
||||
let current_rowid_in_btree = if found {
|
||||
let IOResult::Done(Some(rowid)) = self.btree_cursor.rowid()? else {
|
||||
panic!("BTree should have returned rowid after next");
|
||||
};
|
||||
Some(rowid)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let new_position = match (new_position_in_mvcc, current_rowid_in_btree) {
|
||||
(Some(mvcc_rowid), Some(btree_rowid)) => {
|
||||
if mvcc_rowid < btree_rowid {
|
||||
CursorPosition::Loaded {
|
||||
row_id: RowID {
|
||||
table_id: self.table_id,
|
||||
row_id: mvcc_rowid,
|
||||
},
|
||||
in_btree: false,
|
||||
}
|
||||
} else {
|
||||
CursorPosition::Loaded {
|
||||
row_id: RowID {
|
||||
table_id: self.table_id,
|
||||
row_id: btree_rowid,
|
||||
},
|
||||
in_btree: true,
|
||||
}
|
||||
}
|
||||
};
|
||||
// let new_position_in_btree = self.btree_cursor.next();
|
||||
self.current_pos.replace(new_position_in_mvcc);
|
||||
}
|
||||
(None, Some(btree_rowid)) => CursorPosition::Loaded {
|
||||
row_id: RowID {
|
||||
table_id: self.table_id,
|
||||
row_id: btree_rowid,
|
||||
},
|
||||
in_btree: true,
|
||||
},
|
||||
(Some(mvcc_rowid), None) => CursorPosition::Loaded {
|
||||
row_id: RowID {
|
||||
table_id: self.table_id,
|
||||
row_id: mvcc_rowid,
|
||||
},
|
||||
in_btree: false,
|
||||
},
|
||||
(None, None) => CursorPosition::End,
|
||||
};
|
||||
self.current_pos.replace(new_position);
|
||||
self.invalidate_record();
|
||||
self.state.replace(None);
|
||||
|
||||
Ok(IOResult::Done(matches!(
|
||||
self.get_current_pos(),
|
||||
|
||||
Reference in New Issue
Block a user