mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-21 07:55:18 +01:00
refactor/idx_insert: move seeking to VDBE instead of BTreeCursor
Also removes `InsertState` and `moved_before` since neither are needed anymore.
This commit is contained in:
@@ -637,7 +637,7 @@ impl StateTransition for WriteRowStateMachine {
|
||||
let key = BTreeKey::new_table_rowid(self.row.id.row_id, self.record.as_ref());
|
||||
|
||||
match cursor
|
||||
.insert(&key, true)
|
||||
.insert(&key)
|
||||
.map_err(|e| LimboError::InternalError(e.to_string()))?
|
||||
{
|
||||
IOResult::Done(()) => {
|
||||
|
||||
@@ -11,8 +11,8 @@ use crate::{
|
||||
LEAF_PAGE_HEADER_SIZE_BYTES, LEFT_CHILD_PTR_SIZE_BYTES,
|
||||
},
|
||||
state_machines::{
|
||||
AdvanceState, CountState, EmptyTableState, InsertState, MoveToRightState, MoveToState,
|
||||
RewindState, SeekEndState, SeekToLastState,
|
||||
AdvanceState, CountState, EmptyTableState, MoveToRightState, MoveToState, RewindState,
|
||||
SeekEndState, SeekToLastState,
|
||||
},
|
||||
},
|
||||
translate::plan::IterationDirection,
|
||||
@@ -551,8 +551,6 @@ pub struct BTreeCursor {
|
||||
count_state: CountState,
|
||||
/// State machine for [BTreeCursor::seek_end]
|
||||
seek_end_state: SeekEndState,
|
||||
/// State machine for [BTreeCursor::insert]
|
||||
insert_state: InsertState,
|
||||
/// State machine for [BTreeCursor::move_to]
|
||||
move_to_state: MoveToState,
|
||||
}
|
||||
@@ -626,7 +624,6 @@ impl BTreeCursor {
|
||||
advance_state: AdvanceState::Start,
|
||||
count_state: CountState::Start,
|
||||
seek_end_state: SeekEndState::Start,
|
||||
insert_state: InsertState::Start,
|
||||
move_to_state: MoveToState::Start,
|
||||
}
|
||||
}
|
||||
@@ -4378,14 +4375,7 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
#[instrument(skip(self), level = Level::DEBUG)]
|
||||
pub fn insert(
|
||||
&mut self,
|
||||
key: &BTreeKey,
|
||||
// Indicate whether it's necessary to traverse to find the leaf page
|
||||
// FIXME: refactor this out into a state machine, these ad-hoc state
|
||||
// variables are very hard to reason about
|
||||
mut moved_before: bool,
|
||||
) -> Result<IOResult<()>> {
|
||||
pub fn insert(&mut self, key: &BTreeKey) -> Result<IOResult<()>> {
|
||||
tracing::debug!(valid_state = ?self.valid_state, cursor_state = ?self.state, is_write_in_progress = self.is_write_in_progress());
|
||||
match &self.mv_cursor {
|
||||
Some(mv_cursor) => match key.maybe_rowid() {
|
||||
@@ -4404,90 +4394,9 @@ impl BTreeCursor {
|
||||
None => todo!("Support mvcc inserts with index btrees"),
|
||||
},
|
||||
None => {
|
||||
loop {
|
||||
let state = self.insert_state;
|
||||
match state {
|
||||
InsertState::Start => {
|
||||
match (&self.valid_state, self.is_write_in_progress()) {
|
||||
(CursorValidState::Invalid, _) => {
|
||||
panic!("trying to insert with invalid BTreeCursor");
|
||||
}
|
||||
(CursorValidState::Valid, _) => {
|
||||
// consider the current position valid unless the caller explicitly asks us to seek.
|
||||
}
|
||||
(CursorValidState::RequireSeek, false) => {
|
||||
// we must seek.
|
||||
moved_before = false;
|
||||
}
|
||||
(CursorValidState::RequireSeek, true) => {
|
||||
// illegal to seek during a write no matter what CursorValidState or caller says -- we might e.g. move to the wrong page during balancing
|
||||
moved_before = true;
|
||||
}
|
||||
(CursorValidState::RequireAdvance(direction), _) => {
|
||||
// FIXME: this is a hack to support the case where we need to advance the cursor after a seek.
|
||||
// We should have a proper state machine for this.
|
||||
return_if_io!(match direction {
|
||||
IterationDirection::Forwards => self.next(),
|
||||
IterationDirection::Backwards => self.prev(),
|
||||
});
|
||||
self.valid_state = CursorValidState::Valid;
|
||||
self.seek_state = CursorSeekState::Start;
|
||||
moved_before = true;
|
||||
}
|
||||
};
|
||||
if !moved_before {
|
||||
self.insert_state = InsertState::Seek;
|
||||
} else {
|
||||
self.insert_state = InsertState::InsertIntoPage;
|
||||
}
|
||||
}
|
||||
InsertState::Seek => {
|
||||
let seek_result = match key {
|
||||
BTreeKey::IndexKey(_) => {
|
||||
return_if_io!(self.seek(
|
||||
SeekKey::IndexKey(key.get_record().unwrap()),
|
||||
SeekOp::GE { eq_only: true }
|
||||
))
|
||||
}
|
||||
BTreeKey::TableRowId(_) => {
|
||||
return_if_io!(self.seek(
|
||||
SeekKey::TableRowId(key.to_rowid()),
|
||||
SeekOp::GE { eq_only: true }
|
||||
))
|
||||
}
|
||||
};
|
||||
if SeekResult::TryAdvance == seek_result {
|
||||
self.valid_state =
|
||||
CursorValidState::RequireAdvance(IterationDirection::Forwards);
|
||||
self.insert_state = InsertState::Advance;
|
||||
}
|
||||
self.context.take(); // we know where we wanted to move so if there was any saved context, discard it.
|
||||
self.valid_state = CursorValidState::Valid;
|
||||
tracing::debug!(
|
||||
"seeked to the right place, page is now {:?}",
|
||||
self.stack.top().get().get().id
|
||||
);
|
||||
self.insert_state = InsertState::InsertIntoPage;
|
||||
}
|
||||
InsertState::Advance => {
|
||||
return_if_io!(self.next());
|
||||
self.context.take(); // we know where we wanted to move so if there was any saved context, discard it.
|
||||
self.valid_state = CursorValidState::Valid;
|
||||
tracing::debug!(
|
||||
"seeked to the right place, page is now {:?}",
|
||||
self.stack.top().get().get().id
|
||||
);
|
||||
self.insert_state = InsertState::InsertIntoPage;
|
||||
}
|
||||
InsertState::InsertIntoPage => {
|
||||
return_if_io!(self.insert_into_page(key));
|
||||
if key.maybe_rowid().is_some() {
|
||||
self.has_record.replace(true);
|
||||
}
|
||||
self.insert_state = InsertState::Start;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return_if_io!(self.insert_into_page(key));
|
||||
if key.maybe_rowid().is_some() {
|
||||
self.has_record.replace(true);
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -7604,7 +7513,7 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
let key = BTreeKey::new_table_rowid(1, Some(&large_record));
|
||||
run_until_done(|| cursor.insert(&key, true), pager.deref()).unwrap();
|
||||
run_until_done(|| cursor.insert(&key), pager.deref()).unwrap();
|
||||
|
||||
// Verify that overflow pages were created by checking freelist count
|
||||
// The freelist count should be 0 initially, and after inserting a large record,
|
||||
@@ -7646,7 +7555,7 @@ mod tests {
|
||||
|
||||
// Overwrite the record with the same rowid
|
||||
let key = BTreeKey::new_table_rowid(1, Some(&small_record));
|
||||
run_until_done(|| cursor.insert(&key, true), pager.deref()).unwrap();
|
||||
run_until_done(|| cursor.insert(&key), pager.deref()).unwrap();
|
||||
|
||||
// Check that the freelist count has increased, indicating overflow pages were cleared
|
||||
let freelist_after_overwrite = pager
|
||||
@@ -7749,7 +7658,7 @@ mod tests {
|
||||
let value = ImmutableRecord::from_registers(regs, regs.len());
|
||||
tracing::info!("insert key:{}", key);
|
||||
run_until_done(
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(*key, Some(&value)), true),
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(*key, Some(&value))),
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
@@ -7846,7 +7755,7 @@ mod tests {
|
||||
"".to_string()
|
||||
};
|
||||
run_until_done(
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(key, Some(&value)), true),
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(key, Some(&value))),
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
@@ -7990,12 +7899,7 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
run_until_done(
|
||||
|| {
|
||||
cursor.insert(
|
||||
&BTreeKey::new_index_key(&value),
|
||||
cursor.is_write_in_progress(),
|
||||
)
|
||||
},
|
||||
|| cursor.insert(&BTreeKey::new_index_key(&value)),
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
@@ -8166,12 +8070,7 @@ mod tests {
|
||||
run_until_done(|| cursor.next(), pager.deref()).unwrap();
|
||||
}
|
||||
run_until_done(
|
||||
|| {
|
||||
cursor.insert(
|
||||
&BTreeKey::new_index_key(&value),
|
||||
cursor.is_write_in_progress(),
|
||||
)
|
||||
},
|
||||
|| cursor.insert(&BTreeKey::new_index_key(&value)),
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
@@ -9387,7 +9286,7 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
run_until_done(
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(i, Some(&value)), true),
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(i, Some(&value))),
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
@@ -9474,7 +9373,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
run_until_done(
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(i, Some(&value)), true),
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(i, Some(&value))),
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
@@ -9559,7 +9458,7 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
run_until_done(
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(i as i64, Some(&value)), true),
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(i as i64, Some(&value))),
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
@@ -9603,7 +9502,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
run_until_done(
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(1, Some(&value)), true),
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(1, Some(&value))),
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
@@ -9680,7 +9579,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
run_until_done(
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(1, Some(&value)), true),
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(1, Some(&value))),
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -43,14 +43,6 @@ pub enum SeekEndState {
|
||||
ProcessPage,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum InsertState {
|
||||
Start,
|
||||
Seek,
|
||||
Advance,
|
||||
InsertIntoPage,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum MoveToState {
|
||||
Start,
|
||||
|
||||
@@ -5288,9 +5288,7 @@ pub fn op_insert(
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
|
||||
return_if_io!(
|
||||
cursor.insert(&BTreeKey::new_table_rowid(key, Some(&record)), true,)
|
||||
);
|
||||
return_if_io!(cursor.insert(&BTreeKey::new_table_rowid(key, Some(&record))));
|
||||
}
|
||||
// Increment metrics for row write
|
||||
state.metrics.rows_written = state.metrics.rows_written.saturating_add(1);
|
||||
@@ -5644,13 +5642,12 @@ pub fn op_idx_delete(
|
||||
|
||||
#[derive(Debug, PartialEq, Copy, Clone)]
|
||||
pub enum OpIdxInsertState {
|
||||
/// Optional seek step done before an unique constraint check.
|
||||
SeekIfUnique,
|
||||
/// Optional seek step done before an unique constraint check or if the caller indicates a seek is required.
|
||||
MaybeSeek,
|
||||
/// Optional unique constraint check done before an insert.
|
||||
UniqueConstraintCheck,
|
||||
/// Main insert step. This is always performed. Usually the state machine just
|
||||
/// skips to this step unless the insertion is made into a unique index.
|
||||
Insert { moved_before: bool },
|
||||
/// Main insert step. This is always performed.
|
||||
Insert,
|
||||
}
|
||||
|
||||
pub fn op_idx_insert(
|
||||
@@ -5680,15 +5677,16 @@ pub fn op_idx_insert(
|
||||
};
|
||||
|
||||
match state.op_idx_insert_state {
|
||||
OpIdxInsertState::SeekIfUnique => {
|
||||
OpIdxInsertState::MaybeSeek => {
|
||||
let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap();
|
||||
let CursorType::BTreeIndex(index_meta) = cursor_type else {
|
||||
panic!("IdxInsert: not a BTreeIndex cursor");
|
||||
};
|
||||
if !index_meta.unique {
|
||||
state.op_idx_insert_state = OpIdxInsertState::Insert {
|
||||
moved_before: false,
|
||||
};
|
||||
|
||||
// TODO: currently we never pass USE_SEEK, so this other check is a bit redundant and we always seek,
|
||||
// but I guess it's FutureProofed™®
|
||||
if !index_meta.unique && flags.has(IdxInsertFlags::USE_SEEK) {
|
||||
state.op_idx_insert_state = OpIdxInsertState::Insert;
|
||||
return Ok(InsnFunctionStepResult::Step);
|
||||
}
|
||||
|
||||
@@ -5703,11 +5701,15 @@ pub fn op_idx_insert(
|
||||
SeekOp::GE { eq_only: true },
|
||||
)? {
|
||||
SeekInternalResult::Found => {
|
||||
state.op_idx_insert_state = OpIdxInsertState::UniqueConstraintCheck;
|
||||
state.op_idx_insert_state = if index_meta.unique {
|
||||
OpIdxInsertState::UniqueConstraintCheck
|
||||
} else {
|
||||
OpIdxInsertState::Insert
|
||||
};
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
SeekInternalResult::NotFound => {
|
||||
state.op_idx_insert_state = OpIdxInsertState::Insert { moved_before: true };
|
||||
state.op_idx_insert_state = OpIdxInsertState::Insert;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
SeekInternalResult::IO(io) => Ok(InsnFunctionStepResult::IO(io)),
|
||||
@@ -5753,33 +5755,23 @@ pub fn op_idx_insert(
|
||||
};
|
||||
state.op_idx_insert_state = if ignore_conflict {
|
||||
state.pc += 1;
|
||||
OpIdxInsertState::SeekIfUnique
|
||||
OpIdxInsertState::MaybeSeek
|
||||
} else {
|
||||
OpIdxInsertState::Insert { moved_before: true }
|
||||
OpIdxInsertState::Insert
|
||||
};
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
OpIdxInsertState::Insert { moved_before } => {
|
||||
OpIdxInsertState::Insert => {
|
||||
{
|
||||
let mut cursor = state.get_cursor(cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
// To make this reentrant in case of `moved_before` = false, we need to check if the previous cursor.insert started
|
||||
// a write/balancing operation. If it did, it means we already moved to the place we wanted.
|
||||
let moved_before = moved_before
|
||||
|| cursor.is_write_in_progress()
|
||||
|| flags.has(IdxInsertFlags::USE_SEEK);
|
||||
// Start insertion of row. This might trigger a balance procedure which will take care of moving to different pages,
|
||||
// therefore, we don't want to seek again if that happens, meaning we don't want to return on io without moving to the following opcode
|
||||
// because it could trigger a movement to child page after a balance root which will leave the current page as the root page.
|
||||
return_if_io!(
|
||||
cursor.insert(&BTreeKey::new_index_key(record_to_insert), moved_before)
|
||||
);
|
||||
return_if_io!(cursor.insert(&BTreeKey::new_index_key(record_to_insert)));
|
||||
}
|
||||
// Increment metrics for index write
|
||||
if flags.has(IdxInsertFlags::NCHANGE) {
|
||||
state.metrics.rows_written = state.metrics.rows_written.saturating_add(1);
|
||||
}
|
||||
state.op_idx_insert_state = OpIdxInsertState::SeekIfUnique;
|
||||
state.op_idx_insert_state = OpIdxInsertState::MaybeSeek;
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
@@ -304,7 +304,7 @@ impl ProgramState {
|
||||
metrics: StatementMetrics::new(),
|
||||
op_open_ephemeral_state: OpOpenEphemeralState::Start,
|
||||
op_new_rowid_state: OpNewRowidState::Start,
|
||||
op_idx_insert_state: OpIdxInsertState::SeekIfUnique,
|
||||
op_idx_insert_state: OpIdxInsertState::MaybeSeek,
|
||||
op_insert_state: OpInsertState {
|
||||
sub_state: OpInsertSubState::MaybeCaptureRecord,
|
||||
old_record: None,
|
||||
|
||||
Reference in New Issue
Block a user