mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-10 02:34:20 +01:00
implement lazy record and rowid in cursor
This also comments save_context for now
This commit is contained in:
committed by
Jussi Saurio
parent
b0c64cb4d2
commit
77b6896eae
@@ -635,7 +635,7 @@ pub struct BTreeCursor {
|
||||
move_to_state: CursorMoveToState,
|
||||
/// Separate state to read a record with overflow pages. This separation from `state` is necessary as
|
||||
/// we can be in a function that relies on `state`, but also needs to process overflow pages
|
||||
read_overflow_state: Option<ReadPayloadOverflow>,
|
||||
read_overflow_state: RefCell<Option<ReadPayloadOverflow>>,
|
||||
/// Contains the current cell_idx for `find_cell`
|
||||
find_cell_state: FindCellState,
|
||||
}
|
||||
@@ -669,8 +669,9 @@ impl BTreeCursor {
|
||||
collations,
|
||||
seek_state: CursorSeekState::Start,
|
||||
move_to_state: CursorMoveToState::Start,
|
||||
read_overflow_state: None,
|
||||
read_overflow_state: RefCell::new(None),
|
||||
find_cell_state: FindCellState(None),
|
||||
parse_record_state: RefCell::new(ParseRecordState::Init),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -835,16 +836,16 @@ impl BTreeCursor {
|
||||
/// Reads the record of a cell that has overflow pages. This is a state machine that requires to be called until completion so everything
|
||||
/// that calls this function should be reentrant.
|
||||
fn process_overflow_read(
|
||||
&mut self,
|
||||
&self,
|
||||
payload: &'static [u8],
|
||||
start_next_page: u32,
|
||||
payload_size: u64,
|
||||
) -> Result<CursorResult<()>> {
|
||||
let res = match &mut self.read_overflow_state {
|
||||
let res = match &mut *self.read_overflow_state.borrow_mut() {
|
||||
None => {
|
||||
tracing::debug!("start reading overflow page payload_size={}", payload_size);
|
||||
let page = self.read_page(start_next_page as usize)?;
|
||||
self.read_overflow_state = Some(ReadPayloadOverflow {
|
||||
*self.read_overflow_state.borrow_mut() = Some(ReadPayloadOverflow {
|
||||
payload: payload.to_vec(),
|
||||
next_page: start_next_page,
|
||||
remaining_to_read: payload_size as usize - payload.len(),
|
||||
@@ -900,7 +901,7 @@ impl BTreeCursor {
|
||||
reuse_immutable.as_mut().unwrap(),
|
||||
)?;
|
||||
}
|
||||
self.read_overflow_state = None;
|
||||
*self.read_overflow_state.borrow_mut() = None;
|
||||
Ok(CursorResult::Ok(()))
|
||||
}
|
||||
CursorResult::IO => Ok(CursorResult::IO),
|
||||
@@ -1256,16 +1257,21 @@ impl BTreeCursor {
|
||||
}
|
||||
loop {
|
||||
let mem_page_rc = self.stack.top();
|
||||
let cell_idx = self.stack.current_cell_index() as usize;
|
||||
tracing::trace!(
|
||||
"current_before_advance id={} cell={}",
|
||||
mem_page_rc.get().get().id,
|
||||
self.stack.current_cell_index()
|
||||
);
|
||||
|
||||
return_if_locked_maybe_load!(self.pager, mem_page_rc);
|
||||
// Important to advance only after loading the page in order to not advance > 1 times
|
||||
self.stack.advance();
|
||||
let cell_idx = self.stack.current_cell_index() as usize;
|
||||
tracing::trace!(
|
||||
"current id={} cell={}",
|
||||
mem_page_rc.get().get().id,
|
||||
cell_idx
|
||||
);
|
||||
return_if_locked_maybe_load!(self.pager, mem_page_rc);
|
||||
// Important to advance only after loading the page in order to not advance > 1 times
|
||||
self.stack.advance();
|
||||
let mem_page = mem_page_rc.get();
|
||||
|
||||
let contents = mem_page.get().contents.as_ref().unwrap();
|
||||
@@ -3892,10 +3898,34 @@ impl BTreeCursor {
|
||||
mv_cursor.current_row_id().map(|rowid| rowid.row_id),
|
||||
));
|
||||
}
|
||||
Ok(match self.has_record.get() {
|
||||
CursorHasRecord::Yes { rowid: Some(rowid) } => Some(rowid),
|
||||
_ => None,
|
||||
})
|
||||
if self.has_record.get() {
|
||||
let page = self.stack.top();
|
||||
return_if_locked_maybe_load!(self.pager, page);
|
||||
let page_type = page.get().get_contents().page_type();
|
||||
let page = page.get();
|
||||
let contents = page.get_contents();
|
||||
let cell_idx = self.stack.current_cell_index();
|
||||
let cell = contents.cell_get(
|
||||
cell_idx as usize,
|
||||
payload_overflow_threshold_max(contents.page_type(), self.usable_space() as u16),
|
||||
payload_overflow_threshold_min(contents.page_type(), self.usable_space() as u16),
|
||||
self.usable_space(),
|
||||
)?;
|
||||
if page_type.is_table() {
|
||||
let BTreeCell::TableLeafCell(TableLeafCell {
|
||||
_rowid,
|
||||
_payload,
|
||||
..
|
||||
}) = cell else {
|
||||
unreachable!("unexpected page_type");
|
||||
};
|
||||
return Ok(CursorResult::Ok(Some(_rowid)));
|
||||
} else {
|
||||
return Ok(CursorResult::Ok(self.get_index_rowid_from_record()));
|
||||
}
|
||||
} else {
|
||||
return Ok(CursorResult::Ok(None));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result<CursorResult<bool>> {
|
||||
@@ -3914,7 +3944,7 @@ impl BTreeCursor {
|
||||
/// Return a reference to the record the cursor is currently pointing to.
|
||||
/// If record was not parsed yet, then we have to parse it and in case of I/O we yield control
|
||||
/// back.
|
||||
pub fn record(&mut self) -> Result<CursorResult<Ref<Option<ImmutableRecord>>>> {
|
||||
pub fn record(&self) -> Result<CursorResult<Ref<Option<ImmutableRecord>>>> {
|
||||
let invalidated = self
|
||||
.reusable_immutable_record
|
||||
.borrow()
|
||||
@@ -3940,17 +3970,28 @@ impl BTreeCursor {
|
||||
payload_overflow_threshold_min(contents.page_type(), self.usable_space() as u16),
|
||||
self.usable_space(),
|
||||
)?;
|
||||
let BTreeCell::IndexInteriorCell(IndexInteriorCell {
|
||||
payload,
|
||||
payload_size,
|
||||
first_overflow_page,
|
||||
..
|
||||
}) = &cell
|
||||
else {
|
||||
unreachable!("unexpected cell type: {:?}", cell);
|
||||
};
|
||||
let (payload, payload_size, first_overflow_page) = match cell {
|
||||
BTreeCell::TableLeafCell(TableLeafCell {
|
||||
_rowid,
|
||||
_payload,
|
||||
payload_size,
|
||||
first_overflow_page,
|
||||
}) => (_payload, payload_size, first_overflow_page),
|
||||
BTreeCell::IndexInteriorCell(IndexInteriorCell {
|
||||
left_child_page: _,
|
||||
payload,
|
||||
payload_size,
|
||||
first_overflow_page,
|
||||
}) => (payload, payload_size, first_overflow_page),
|
||||
BTreeCell::IndexLeafCell(IndexLeafCell {
|
||||
payload,
|
||||
first_overflow_page,
|
||||
payload_size,
|
||||
}) => (payload, payload_size, first_overflow_page),
|
||||
_ => unreachable!("unexpected page_type"),
|
||||
};
|
||||
if let Some(next_page) = first_overflow_page {
|
||||
return_if_io!(self.process_overflow_read(payload, *next_page, *payload_size))
|
||||
return_if_io!(self.process_overflow_read(payload, next_page, payload_size))
|
||||
} else {
|
||||
crate::storage::sqlite3_ondisk::read_record(
|
||||
payload,
|
||||
@@ -3994,10 +4035,7 @@ impl BTreeCursor {
|
||||
}
|
||||
return_if_io!(self.insert_into_page(key));
|
||||
if key.maybe_rowid().is_some() {
|
||||
let int_key = key.to_rowid();
|
||||
self.has_record.replace(CursorHasRecord::Yes {
|
||||
rowid: Some(int_key),
|
||||
});
|
||||
self.has_record.replace(true);
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -4040,8 +4078,8 @@ impl BTreeCursor {
|
||||
page.get().get_contents().page_type(),
|
||||
PageType::TableLeaf | PageType::TableInterior
|
||||
) {
|
||||
let _target_rowid = match self.has_record.get() {
|
||||
CursorHasRecord::Yes { rowid: Some(rowid) } => rowid,
|
||||
let _target_rowid = match return_if_io!(self.rowid()) {
|
||||
Some(rowid) => rowid,
|
||||
_ => {
|
||||
self.state = CursorState::None;
|
||||
return Ok(CursorResult::Ok(()));
|
||||
@@ -4222,9 +4260,13 @@ impl BTreeCursor {
|
||||
let needs_balancing = free_space as usize * 3 > self.usable_space() * 2;
|
||||
|
||||
let target_key = if page.is_index() {
|
||||
DeleteSavepoint::Payload(self.record().as_ref().unwrap().clone())
|
||||
let record = match &*return_if_io!(self.record()) {
|
||||
Some(record) => record.clone(),
|
||||
None => unreachable!("there should've been a record"),
|
||||
};
|
||||
DeleteSavepoint::Payload(record)
|
||||
} else {
|
||||
let CursorHasRecord::Yes { rowid: Some(rowid) } = self.has_record.get()
|
||||
let Some(rowid) = return_if_io!(self.rowid())
|
||||
else {
|
||||
panic!("cursor should be pointing to a record with a rowid");
|
||||
};
|
||||
@@ -4245,7 +4287,7 @@ impl BTreeCursor {
|
||||
return Ok(CursorResult::Ok(()));
|
||||
}
|
||||
// Only reaches this function call if state = DeleteState::WaitForBalancingToComplete
|
||||
self.save_context();
|
||||
// self.save_context();
|
||||
}
|
||||
|
||||
DeleteState::WaitForBalancingToComplete { target_key } => {
|
||||
@@ -4321,7 +4363,7 @@ impl BTreeCursor {
|
||||
pub fn key_exists_in_index(&mut self, key: &ImmutableRecord) -> Result<CursorResult<bool>> {
|
||||
return_if_io!(self.seek(SeekKey::IndexKey(key), SeekOp::GE));
|
||||
|
||||
let record_opt = self.record();
|
||||
let record_opt = return_if_io!(self.record());
|
||||
match record_opt.as_ref() {
|
||||
Some(record) => {
|
||||
// Existing record found — compare prefix
|
||||
@@ -4657,7 +4699,7 @@ impl BTreeCursor {
|
||||
// build the new payload
|
||||
let page_type = page_ref.get().get().contents.as_ref().unwrap().page_type();
|
||||
let mut new_payload = Vec::with_capacity(record.len());
|
||||
let CursorHasRecord::Yes { rowid } = self.has_record.get() else {
|
||||
let rowid = return_if_io!(self.rowid()) else {
|
||||
panic!("cursor should be pointing to a record");
|
||||
};
|
||||
fill_cell_payload(
|
||||
@@ -4842,27 +4884,27 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
/// Save cursor context, to be restored later
|
||||
pub fn save_context(&mut self) {
|
||||
if let CursorHasRecord::Yes { rowid } = self.has_record.get() {
|
||||
self.valid_state = CursorValidState::RequireSeek;
|
||||
match self.stack.top().get().get_contents().page_type() {
|
||||
PageType::TableInterior | PageType::TableLeaf => {
|
||||
self.context = Some(CursorContext::TableRowId(rowid.expect(
|
||||
"table cells should have a rowid since we don't support WITHOUT ROWID tables",
|
||||
)));
|
||||
}
|
||||
PageType::IndexInterior | PageType::IndexLeaf => {
|
||||
self.context = Some(CursorContext::IndexKeyRowId(
|
||||
self.reusable_immutable_record
|
||||
.borrow()
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.clone(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// pub fn save_context(&mut self) {
|
||||
// if let CursorHasRecord::Yes { rowid } = self.has_record.get() {
|
||||
// self.valid_state = CursorValidState::RequireSeek;
|
||||
// match self.stack.top().get().get_contents().page_type() {
|
||||
// PageType::TableInterior | PageType::TableLeaf => {
|
||||
// self.context = Some(CursorContext::TableRowId(rowid.expect(
|
||||
// "table cells should have a rowid since we don't support WITHOUT ROWID tables",
|
||||
// )));
|
||||
// }
|
||||
// PageType::IndexInterior | PageType::IndexLeaf => {
|
||||
// self.context = Some(CursorContext::IndexKeyRowId(
|
||||
// self.reusable_immutable_record
|
||||
// .borrow()
|
||||
// .as_ref()
|
||||
// .unwrap()
|
||||
// .clone(),
|
||||
// ));
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
/// If context is defined, restore it and set it None on success
|
||||
fn restore_context(&mut self) -> Result<CursorResult<()>> {
|
||||
@@ -4962,7 +5004,7 @@ impl PageStack {
|
||||
}
|
||||
|
||||
fn push(&self, page: BTreePage) {
|
||||
self._push(page, 0);
|
||||
self._push(page, -1);
|
||||
}
|
||||
|
||||
fn push_backwards(&self, page: BTreePage) {
|
||||
@@ -6501,7 +6543,9 @@ mod tests {
|
||||
for key in keys.iter() {
|
||||
tracing::trace!("seeking key: {}", key);
|
||||
run_until_done(|| cursor.next(), pager.deref()).unwrap();
|
||||
let cursor_rowid = cursor.rowid().unwrap().unwrap();
|
||||
let cursor_rowid = run_until_done(|| cursor.rowid(), pager.deref())
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
if *key != cursor_rowid {
|
||||
valid = false;
|
||||
println!("key {} is not found, got {}", key, cursor_rowid);
|
||||
@@ -6532,7 +6576,9 @@ mod tests {
|
||||
for key in keys.iter() {
|
||||
tracing::trace!("seeking key: {}", key);
|
||||
run_until_done(|| cursor.next(), pager.deref()).unwrap();
|
||||
let cursor_rowid = cursor.rowid().unwrap().unwrap();
|
||||
let cursor_rowid = run_until_done(|| cursor.rowid(), pager.deref())
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
*key, cursor_rowid,
|
||||
"key {} is not found, got {}",
|
||||
@@ -7815,11 +7861,13 @@ mod tests {
|
||||
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page);
|
||||
cursor.move_to_root();
|
||||
for i in 0..iterations {
|
||||
let CursorHasRecord::Yes { rowid: Some(rowid) } =
|
||||
run_until_done(|| cursor.get_next_record(None), pager.deref()).unwrap()
|
||||
else {
|
||||
let has_next = run_until_done(|| cursor.get_next_record(), pager.deref()).unwrap();
|
||||
if !has_next {
|
||||
panic!("expected Some(rowid) but got {:?}", cursor.has_record.get());
|
||||
};
|
||||
let rowid = run_until_done(|| cursor.rowid(), pager.deref())
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(rowid, i as i64, "got!=expected");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -377,6 +377,15 @@ pub enum PageType {
|
||||
TableLeaf = 13,
|
||||
}
|
||||
|
||||
impl PageType {
|
||||
pub fn is_table(&self) -> bool {
|
||||
match self {
|
||||
PageType::IndexInterior | PageType::IndexLeaf => false,
|
||||
PageType::TableInterior | PageType::TableLeaf => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<u8> for PageType {
|
||||
type Error = LimboError;
|
||||
|
||||
|
||||
@@ -1348,7 +1348,7 @@ pub fn op_column(
|
||||
let rowid = {
|
||||
let mut index_cursor = state.get_cursor(index_cursor_id);
|
||||
let index_cursor = index_cursor.as_btree_mut();
|
||||
index_cursor.rowid()?
|
||||
return_if_io!(index_cursor.rowid())
|
||||
};
|
||||
let mut table_cursor = state.get_cursor(table_cursor_id);
|
||||
let table_cursor = table_cursor.as_btree_mut();
|
||||
@@ -1369,7 +1369,7 @@ pub fn op_column(
|
||||
let mut cursor =
|
||||
must_be_btree_cursor!(*cursor_id, program.cursor_ref, state, "Column");
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let record = cursor.record();
|
||||
let record = return_if_io!(cursor.record());
|
||||
let value = if let Some(record) = record.as_ref() {
|
||||
if cursor.get_null_flag() {
|
||||
RefValue::Null
|
||||
@@ -1904,7 +1904,7 @@ pub fn op_row_id(
|
||||
let rowid = {
|
||||
let mut index_cursor = state.get_cursor(index_cursor_id);
|
||||
let index_cursor = index_cursor.as_btree_mut();
|
||||
let record = index_cursor.record();
|
||||
let record = return_if_io!(index_cursor.record());
|
||||
let record = record.as_ref().unwrap();
|
||||
let rowid = record.get_values().last().unwrap();
|
||||
match rowid {
|
||||
@@ -1926,7 +1926,7 @@ pub fn op_row_id(
|
||||
}
|
||||
let mut cursors = state.cursors.borrow_mut();
|
||||
if let Some(Cursor::BTree(btree_cursor)) = cursors.get_mut(*cursor_id).unwrap() {
|
||||
if let Some(ref rowid) = btree_cursor.rowid()? {
|
||||
if let Some(ref rowid) = return_if_io!(btree_cursor.rowid()) {
|
||||
state.registers[*dest] = Register::Value(Value::Integer(*rowid as i64));
|
||||
} else {
|
||||
state.registers[*dest] = Register::Value(Value::Null);
|
||||
@@ -1960,7 +1960,7 @@ pub fn op_idx_row_id(
|
||||
let mut cursors = state.cursors.borrow_mut();
|
||||
let cursor = cursors.get_mut(*cursor_id).unwrap().as_mut().unwrap();
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let rowid = cursor.rowid()?;
|
||||
let rowid = return_if_io!(cursor.rowid());
|
||||
state.registers[*dest] = match rowid {
|
||||
Some(rowid) => Register::Value(Value::Integer(rowid as i64)),
|
||||
None => Register::Value(Value::Null),
|
||||
@@ -2160,7 +2160,7 @@ pub fn op_idx_ge(
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let record_from_regs = make_record(&state.registers, start_reg, num_regs);
|
||||
let pc = if let Some(ref idx_record) = *cursor.record() {
|
||||
let pc = if let Some(ref idx_record) = *return_if_io!(cursor.record()) {
|
||||
// Compare against the same number of values
|
||||
let idx_values = idx_record.get_values();
|
||||
let idx_values = &idx_values[..record_from_regs.len()];
|
||||
@@ -2224,7 +2224,7 @@ pub fn op_idx_le(
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let record_from_regs = make_record(&state.registers, start_reg, num_regs);
|
||||
let pc = if let Some(ref idx_record) = *cursor.record() {
|
||||
let pc = if let Some(ref idx_record) = *return_if_io!(cursor.record()) {
|
||||
// Compare against the same number of values
|
||||
let idx_values = idx_record.get_values();
|
||||
let idx_values = &idx_values[..record_from_regs.len()];
|
||||
@@ -2270,7 +2270,7 @@ pub fn op_idx_gt(
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let record_from_regs = make_record(&state.registers, start_reg, num_regs);
|
||||
let pc = if let Some(ref idx_record) = *cursor.record() {
|
||||
let pc = if let Some(ref idx_record) = *return_if_io!(cursor.record()) {
|
||||
// Compare against the same number of values
|
||||
let idx_values = idx_record.get_values();
|
||||
let idx_values = &idx_values[..record_from_regs.len()];
|
||||
@@ -2316,7 +2316,7 @@ pub fn op_idx_lt(
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let record_from_regs = make_record(&state.registers, start_reg, num_regs);
|
||||
let pc = if let Some(ref idx_record) = *cursor.record() {
|
||||
let pc = if let Some(ref idx_record) = *return_if_io!(cursor.record()) {
|
||||
// Compare against the same number of values
|
||||
let idx_values = idx_record.get_values();
|
||||
let idx_values = &idx_values[..record_from_regs.len()];
|
||||
@@ -3842,7 +3842,7 @@ pub fn op_insert(
|
||||
return_if_io!(cursor.insert(&BTreeKey::new_table_rowid(key, Some(record)), true));
|
||||
// Only update last_insert_rowid for regular table inserts, not schema modifications
|
||||
if cursor.root_page() != 1 {
|
||||
if let Some(rowid) = cursor.rowid()? {
|
||||
if let Some(rowid) = return_if_io!(cursor.rowid()) {
|
||||
if let Some(conn) = program.connection.upgrade() {
|
||||
conn.update_last_rowid(rowid);
|
||||
}
|
||||
@@ -3932,10 +3932,10 @@ pub fn op_idx_delete(
|
||||
tracing::debug!(
|
||||
"op_idx_delete(seek={}, record={} rowid={:?})",
|
||||
&record,
|
||||
cursor.record().as_ref().unwrap(),
|
||||
cursor.rowid()
|
||||
return_if_io!(cursor.record()).as_ref().unwrap(),
|
||||
return_if_io!(cursor.rowid())
|
||||
);
|
||||
if cursor.rowid()?.is_none() {
|
||||
if return_if_io!(cursor.rowid()).is_none() {
|
||||
// If P5 is not zero, then raise an SQLITE_CORRUPT_INDEX error if no matching
|
||||
// index entry is found. This happens when running an UPDATE or DELETE statement and the
|
||||
// index entry to be updated or deleted is not found. For some uses of IdxDelete
|
||||
|
||||
@@ -499,11 +499,11 @@ fn get_new_rowid<R: Rng>(cursor: &mut BTreeCursor, mut rng: R) -> Result<CursorR
|
||||
CursorResult::Ok(()) => {}
|
||||
CursorResult::IO => return Ok(CursorResult::IO),
|
||||
}
|
||||
let mut rowid = cursor
|
||||
.rowid()?
|
||||
.unwrap_or(0) // if BTree is empty - use 0 as initial value for rowid
|
||||
.checked_add(1) // add 1 but be careful with overflows
|
||||
.unwrap_or(i64::MAX); // in case of overflow - use i64::MAX
|
||||
let mut rowid = match cursor.rowid()? {
|
||||
CursorResult::Ok(Some(rowid)) => rowid.checked_add(1).unwrap_or(i64::MAX), // add 1 but be careful with overflows, in case of overflow - use i64::MAX
|
||||
CursorResult::Ok(None) => 0,
|
||||
CursorResult::IO => return Ok(CursorResult::IO),
|
||||
};
|
||||
if rowid > i64::MAX.try_into().unwrap() {
|
||||
let distribution = Uniform::from(1..=i64::MAX);
|
||||
let max_attempts = 100;
|
||||
|
||||
Reference in New Issue
Block a user