mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 08:55:40 +01:00
Refactor btree to reuse existing insert and seek with idx keys
This commit is contained in:
@@ -171,6 +171,50 @@ enum ReadPayloadOverflow {
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum BTreeKey<'a> {
|
||||
TableRowId((u64, Option<&'a ImmutableRecord>)),
|
||||
IndexKey(&'a ImmutableRecord),
|
||||
}
|
||||
|
||||
impl<'a> BTreeKey<'_> {
|
||||
pub fn new_table_rowid(rowid: u64, record: Option<&'a ImmutableRecord>) -> BTreeKey<'a> {
|
||||
BTreeKey::TableRowId((rowid, record))
|
||||
}
|
||||
pub fn new_index_key(record: &'a ImmutableRecord) -> BTreeKey<'a> {
|
||||
BTreeKey::IndexKey(record)
|
||||
}
|
||||
fn get_record(&self) -> Option<&'_ ImmutableRecord> {
|
||||
match self {
|
||||
BTreeKey::TableRowId((_, record)) => *record,
|
||||
BTreeKey::IndexKey(record) => Some(record),
|
||||
}
|
||||
}
|
||||
|
||||
fn maybe_rowid(&self) -> Option<u64> {
|
||||
match self {
|
||||
BTreeKey::TableRowId((rowid, _)) => Some(*rowid),
|
||||
BTreeKey::IndexKey(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Assert that the key is a rowid and return it.
|
||||
fn to_rowid(&self) -> u64 {
|
||||
match self {
|
||||
BTreeKey::TableRowId((rowid, _)) => *rowid,
|
||||
BTreeKey::IndexKey(_) => panic!("BTreeKey::assert_rowid() called on IndexKey"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Assert that the key is an index key and return it.
|
||||
fn to_index_key_values(&self) -> &'_ Vec<RefValue> {
|
||||
match self {
|
||||
BTreeKey::TableRowId(_) => panic!("BTreeKey::assert_index_key() called on TableRowId"),
|
||||
BTreeKey::IndexKey(key) => key.get_values(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct BalanceInfo {
|
||||
/// Old pages being balanced.
|
||||
@@ -1047,72 +1091,13 @@ impl BTreeCursor {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert_index_key(&mut self, key: &ImmutableRecord) -> Result<CursorResult<()>> {
|
||||
if let CursorState::None = &self.state {
|
||||
self.state = CursorState::Write(WriteInfo::new());
|
||||
}
|
||||
|
||||
let ret = loop {
|
||||
let write_state = self.state.mut_write_info().unwrap().state;
|
||||
match write_state {
|
||||
WriteState::Start => {
|
||||
let page = self.stack.top();
|
||||
return_if_locked!(page);
|
||||
page.set_dirty();
|
||||
self.pager.add_dirty(page.get().id);
|
||||
let page = page.get().contents.as_mut().unwrap();
|
||||
|
||||
assert!(matches!(page.page_type(), PageType::IndexLeaf));
|
||||
let cell_idx = self.find_index_cell(page, key);
|
||||
let mut cell_payload: Vec<u8> = Vec::new();
|
||||
fill_cell_payload(
|
||||
page.page_type(),
|
||||
None,
|
||||
&mut cell_payload,
|
||||
key,
|
||||
self.usable_space() as u16,
|
||||
self.pager.clone(),
|
||||
);
|
||||
// insert
|
||||
let overflow = {
|
||||
debug!(
|
||||
"insert_index_key(overflow, cell_count={})",
|
||||
page.cell_count()
|
||||
);
|
||||
insert_into_cell(
|
||||
page,
|
||||
cell_payload.as_slice(),
|
||||
cell_idx,
|
||||
self.usable_space() as u16,
|
||||
)?;
|
||||
page.overflow_cells.len()
|
||||
};
|
||||
let write_info = self.state.mut_write_info().unwrap();
|
||||
write_info.state = if overflow > 0 {
|
||||
WriteState::BalanceStart
|
||||
} else {
|
||||
WriteState::Finish
|
||||
};
|
||||
}
|
||||
WriteState::BalanceStart
|
||||
| WriteState::BalanceNonRoot
|
||||
| WriteState::BalanceNonRootWaitLoadPages => {
|
||||
return_if_io!(self.balance());
|
||||
}
|
||||
WriteState::Finish => break Ok(CursorResult::Ok(())),
|
||||
}
|
||||
};
|
||||
self.state = CursorState::None;
|
||||
ret
|
||||
}
|
||||
|
||||
/// Insert a record into the btree.
|
||||
/// If the insert operation overflows the page, it will be split and the btree will be balanced.
|
||||
fn insert_into_page(
|
||||
&mut self,
|
||||
key: &OwnedValue,
|
||||
record: &ImmutableRecord,
|
||||
) -> Result<CursorResult<()>> {
|
||||
fn insert_into_page(&mut self, bkey: &BTreeKey) -> Result<CursorResult<()>> {
|
||||
let record = bkey
|
||||
.get_record()
|
||||
.expect("expected record present on insert");
|
||||
|
||||
if let CursorState::None = &self.state {
|
||||
self.state = CursorState::Write(WriteInfo::new());
|
||||
}
|
||||
@@ -1128,10 +1113,6 @@ impl BTreeCursor {
|
||||
WriteState::Start => {
|
||||
let page = self.stack.top();
|
||||
return_if_locked_maybe_load!(self.pager, page);
|
||||
let int_key = match key {
|
||||
OwnedValue::Integer(i) => *i as u64,
|
||||
_ => unreachable!("btree tables are indexed by integers!"),
|
||||
};
|
||||
|
||||
// get page and find cell
|
||||
let (cell_idx, page_type) = {
|
||||
@@ -1141,23 +1122,27 @@ impl BTreeCursor {
|
||||
self.pager.add_dirty(page.get().id);
|
||||
|
||||
let page = page.get().contents.as_mut().unwrap();
|
||||
assert!(matches!(page.page_type(), PageType::TableLeaf));
|
||||
assert!(matches!(
|
||||
page.page_type(),
|
||||
PageType::TableLeaf | PageType::IndexLeaf
|
||||
));
|
||||
|
||||
// find cell
|
||||
(self.find_cell(page, int_key), page.page_type())
|
||||
(self.find_cell(page, bkey), page.page_type())
|
||||
};
|
||||
tracing::debug!("insert_into_page(cell_idx={})", cell_idx);
|
||||
|
||||
// if the cell index is less than the total cells, check: if its an existing
|
||||
// rowid, we are going to update / overwrite the cell
|
||||
if cell_idx < page.get_contents().cell_count() {
|
||||
if let BTreeCell::TableLeafCell(tbl_leaf) = page.get_contents().cell_get(
|
||||
match page.get_contents().cell_get(
|
||||
cell_idx,
|
||||
payload_overflow_threshold_max(page_type, self.usable_space() as u16),
|
||||
payload_overflow_threshold_min(page_type, self.usable_space() as u16),
|
||||
self.usable_space(),
|
||||
)? {
|
||||
if tbl_leaf._rowid == int_key {
|
||||
BTreeCell::TableLeafCell(tbl_leaf) => {
|
||||
if tbl_leaf._rowid == bkey.to_rowid() {
|
||||
tracing::debug!("insert_into_page: found exact match with cell_idx={cell_idx}, overwriting");
|
||||
self.overwrite_cell(page.clone(), cell_idx, record)?;
|
||||
self.state
|
||||
@@ -1167,12 +1152,37 @@ impl BTreeCursor {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
BTreeCell::IndexLeafCell(idx_leaf) => {
|
||||
read_record(
|
||||
idx_leaf.payload,
|
||||
self.get_immutable_record_or_create().as_mut().unwrap(),
|
||||
)
|
||||
.expect("failed to read record");
|
||||
if compare_immutable(
|
||||
record.get_values(),
|
||||
self.get_immutable_record()
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.get_values()
|
||||
) == Ordering::Equal {
|
||||
|
||||
tracing::debug!("insert_into_page: found exact match with cell_idx={cell_idx}, overwriting");
|
||||
self.overwrite_cell(page.clone(), cell_idx, record)?;
|
||||
self.state
|
||||
.mut_write_info()
|
||||
.expect("expected write info")
|
||||
.state = WriteState::Finish;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
other => panic!("unexpected cell type, expected TableLeaf or IndexLeaf, found: {:?}", other),
|
||||
}
|
||||
}
|
||||
// insert cell
|
||||
let mut cell_payload: Vec<u8> = Vec::with_capacity(record.len() + 4);
|
||||
fill_cell_payload(
|
||||
page_type,
|
||||
Some(int_key),
|
||||
bkey.maybe_rowid(),
|
||||
&mut cell_payload,
|
||||
record,
|
||||
self.usable_space() as u16,
|
||||
@@ -1971,8 +1981,7 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
/// Find the index of the cell in the page that contains the given rowid.
|
||||
/// BTree tables only.
|
||||
fn find_cell(&self, page: &PageContent, int_key: u64) -> usize {
|
||||
fn find_cell(&self, page: &PageContent, key: &BTreeKey) -> usize {
|
||||
let mut cell_idx = 0;
|
||||
let cell_count = page.cell_count();
|
||||
while cell_idx < cell_count {
|
||||
@@ -1986,35 +1995,15 @@ impl BTreeCursor {
|
||||
.unwrap()
|
||||
{
|
||||
BTreeCell::TableLeafCell(cell) => {
|
||||
if int_key <= cell._rowid {
|
||||
if key.to_rowid() <= cell._rowid {
|
||||
break;
|
||||
}
|
||||
}
|
||||
BTreeCell::TableInteriorCell(cell) => {
|
||||
if int_key <= cell._rowid {
|
||||
if key.to_rowid() <= cell._rowid {
|
||||
break;
|
||||
}
|
||||
}
|
||||
_ => todo!(),
|
||||
}
|
||||
cell_idx += 1;
|
||||
}
|
||||
cell_idx
|
||||
}
|
||||
|
||||
fn find_index_cell(&self, page: &PageContent, key: &ImmutableRecord) -> usize {
|
||||
let mut cell_idx = 0;
|
||||
let cell_count = page.cell_count();
|
||||
while cell_idx < cell_count {
|
||||
match page
|
||||
.cell_get(
|
||||
cell_idx,
|
||||
payload_overflow_threshold_max(page.page_type(), self.usable_space() as u16),
|
||||
payload_overflow_threshold_min(page.page_type(), self.usable_space() as u16),
|
||||
self.usable_space(),
|
||||
)
|
||||
.unwrap()
|
||||
{
|
||||
BTreeCell::IndexInteriorCell(IndexInteriorCell { payload, .. })
|
||||
| BTreeCell::IndexLeafCell(IndexLeafCell { payload, .. }) => {
|
||||
// TODO: implement efficient comparison of records
|
||||
@@ -2025,7 +2014,7 @@ impl BTreeCursor {
|
||||
)
|
||||
.expect("failed to read record");
|
||||
let order = compare_immutable(
|
||||
key.get_values(),
|
||||
key.to_index_key_values(),
|
||||
self.get_immutable_record().as_ref().unwrap().get_values(),
|
||||
);
|
||||
match order {
|
||||
@@ -2035,7 +2024,6 @@ impl BTreeCursor {
|
||||
Ordering::Greater => {}
|
||||
}
|
||||
}
|
||||
_ => unreachable!("Expected Index cell types"),
|
||||
}
|
||||
cell_idx += 1;
|
||||
}
|
||||
@@ -2158,28 +2146,34 @@ impl BTreeCursor {
|
||||
|
||||
pub fn insert(
|
||||
&mut self,
|
||||
key: &OwnedValue,
|
||||
record: &ImmutableRecord,
|
||||
key: &BTreeKey,
|
||||
moved_before: bool, /* Indicate whether it's necessary to traverse to find the leaf page */
|
||||
) -> Result<CursorResult<()>> {
|
||||
let int_key = match key {
|
||||
OwnedValue::Integer(i) => i,
|
||||
_ => unreachable!("btree tables are indexed by integers!"),
|
||||
};
|
||||
match &self.mv_cursor {
|
||||
Some(mv_cursor) => {
|
||||
let row_id =
|
||||
crate::mvcc::database::RowID::new(self.table_id() as u64, *int_key as u64);
|
||||
let record_buf = record.get_payload().to_vec();
|
||||
let row = crate::mvcc::database::Row::new(row_id, record_buf);
|
||||
mv_cursor.borrow_mut().insert(row).unwrap();
|
||||
}
|
||||
Some(mv_cursor) => match key.maybe_rowid() {
|
||||
Some(rowid) => {
|
||||
let row_id = crate::mvcc::database::RowID::new(self.table_id() as u64, rowid);
|
||||
let record_buf = key.get_record().unwrap().get_payload().to_vec();
|
||||
let row = crate::mvcc::database::Row::new(row_id, record_buf);
|
||||
mv_cursor.borrow_mut().insert(row).unwrap();
|
||||
}
|
||||
None => todo!("Support mvcc inserts with index btrees"),
|
||||
},
|
||||
None => {
|
||||
if !moved_before {
|
||||
return_if_io!(self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ));
|
||||
return_if_io!(self.move_to(
|
||||
match key {
|
||||
BTreeKey::IndexKey(_) => SeekKey::IndexKey(key.get_record().unwrap()),
|
||||
BTreeKey::TableRowId(_) => SeekKey::TableRowId(key.to_rowid()),
|
||||
},
|
||||
SeekOp::EQ
|
||||
));
|
||||
}
|
||||
return_if_io!(self.insert_into_page(key));
|
||||
if key.maybe_rowid().is_some() {
|
||||
let int_key = key.to_rowid();
|
||||
self.rowid.replace(Some(int_key));
|
||||
}
|
||||
return_if_io!(self.insert_into_page(key, record));
|
||||
self.rowid.replace(Some(*int_key as u64));
|
||||
}
|
||||
};
|
||||
Ok(CursorResult::Ok(()))
|
||||
@@ -2543,7 +2537,7 @@ impl BTreeCursor {
|
||||
OwnedValue::Integer(i) => *i as u64,
|
||||
_ => unreachable!("btree tables are indexed by integers!"),
|
||||
};
|
||||
let cell_idx = self.find_cell(contents, int_key);
|
||||
let cell_idx = self.find_cell(contents, &BTreeKey::new_table_rowid(int_key, None));
|
||||
if cell_idx >= contents.cell_count() {
|
||||
Ok(CursorResult::Ok(false))
|
||||
} else {
|
||||
@@ -4199,12 +4193,15 @@ mod tests {
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
let key = OwnedValue::Integer(*key);
|
||||
let value = ImmutableRecord::from_registers(&[Register::OwnedValue(
|
||||
OwnedValue::Blob(vec![0; *size]),
|
||||
)]);
|
||||
tracing::info!("insert key:{}", key);
|
||||
run_until_done(|| cursor.insert(&key, &value, true), pager.deref()).unwrap();
|
||||
run_until_done(
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(*key, Some(&value)), true),
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
tracing::info!(
|
||||
"=========== btree ===========\n{}\n\n",
|
||||
format_btree(pager.clone(), root_page, 0)
|
||||
@@ -4279,12 +4276,14 @@ mod tests {
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let key = OwnedValue::Integer(key);
|
||||
let value = ImmutableRecord::from_registers(&[Register::OwnedValue(
|
||||
OwnedValue::Blob(vec![0; size]),
|
||||
)]);
|
||||
run_until_done(|| cursor.insert(&key, &value, true), pager.deref()).unwrap();
|
||||
run_until_done(
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(key as u64, Some(&value)), true),
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
if matches!(validate_btree(pager.clone(), root_page), (_, false)) {
|
||||
panic!("invalid btree");
|
||||
}
|
||||
@@ -5170,7 +5169,11 @@ mod tests {
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
run_until_done(|| cursor.insert(&key, &value, true), pager.deref()).unwrap();
|
||||
run_until_done(
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(i as u64, Some(&value)), true),
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
keys.push(i);
|
||||
}
|
||||
if matches!(validate_btree(pager.clone(), root_page), (_, false)) {
|
||||
@@ -5248,7 +5251,11 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
run_until_done(|| cursor.insert(&key, &value, true), pager.deref()).unwrap();
|
||||
run_until_done(
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(i as u64, Some(&value)), true),
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
match validate_btree(pager.clone(), root_page) {
|
||||
@@ -5326,7 +5333,11 @@ mod tests {
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
run_until_done(|| cursor.insert(&key, &value, true), pager.deref()).unwrap();
|
||||
run_until_done(
|
||||
|| cursor.insert(&BTreeKey::new_table_rowid(i as u64, Some(&value)), true),
|
||||
pager.deref(),
|
||||
)
|
||||
.unwrap();
|
||||
tracing::debug!(
|
||||
"=========== btree after ===========\n{}\n\n",
|
||||
format_btree(pager.clone(), root_page, 0)
|
||||
|
||||
@@ -11,7 +11,7 @@ use std::{borrow::BorrowMut, rc::Rc};
|
||||
use crate::pseudo::PseudoCursor;
|
||||
use crate::result::LimboResult;
|
||||
use crate::schema::{affinity, Affinity};
|
||||
use crate::storage::btree::BTreeCursor;
|
||||
use crate::storage::btree::{BTreeCursor, BTreeKey};
|
||||
use crate::storage::wal::CheckpointResult;
|
||||
use crate::types::{
|
||||
AggContext, Cursor, CursorResult, ExternalAggState, OwnedValue, SeekKey, SeekOp,
|
||||
@@ -3666,11 +3666,14 @@ pub fn op_insert_async(
|
||||
Register::Record(r) => r,
|
||||
_ => unreachable!("Not a record! Cannot insert a non record value."),
|
||||
};
|
||||
let key = &state.registers[*key_reg];
|
||||
let key = match &state.registers[*key_reg].get_owned_value() {
|
||||
OwnedValue::Integer(i) => *i,
|
||||
_ => unreachable!("expected integer key"),
|
||||
};
|
||||
// NOTE(pere): Sending moved_before == true is okay because we moved before but
|
||||
// if we were to set to false after starting a balance procedure, it might
|
||||
// leave undefined state.
|
||||
return_if_io!(cursor.insert(key.get_owned_value(), record, true));
|
||||
return_if_io!(cursor.insert(&BTreeKey::new_table_rowid(key as u64, Some(record)), true));
|
||||
}
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
@@ -3765,7 +3768,7 @@ pub fn op_idx_insert_async(
|
||||
flags.has(IdxInsertFlags::USE_SEEK)
|
||||
};
|
||||
// insert record as key
|
||||
return_if_io!(cursor.insert_index_key(record));
|
||||
return_if_io!(cursor.insert(&BTreeKey::new_index_key(record), moved_before));
|
||||
}
|
||||
state.pc += 1;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user