This commit is contained in:
jussisaurio
2024-11-13 23:29:09 +02:00
parent 6dc427fc9b
commit 3002f2a552
2 changed files with 115 additions and 210 deletions

View File

@@ -38,6 +38,25 @@ const BTREE_HEADER_OFFSET_RIGHTMOST: usize = 8; /* if internalnode, pointer righ
*/
pub const BTCURSOR_MAX_DEPTH: usize = 20;
/// Evaluate a Result<CursorResult<T>>, if IO return IO.
macro_rules! io {
($expr:expr) => {
match $expr? {
CursorResult::Ok(v) => v,
CursorResult::IO => return Ok(CursorResult::IO),
}
};
}
/// Check if the page is unlocked, if not return IO.
macro_rules! unlocked {
($expr:expr) => {{
if $expr.is_locked() {
return Ok(CursorResult::IO);
}
}};
}
#[derive(Debug)]
enum WriteState {
Start,
@@ -126,9 +145,7 @@ impl BTreeCursor {
fn is_empty_table(&mut self) -> Result<CursorResult<bool>> {
let page = self.pager.read_page(self.root_page)?;
let page = RefCell::borrow(&page);
if page.is_locked() {
return Ok(CursorResult::IO);
}
unlocked!(page);
let cell_count = page.contents.as_ref().unwrap().cell_count();
Ok(CursorResult::Ok(cell_count == 0))
@@ -224,9 +241,7 @@ impl BTreeCursor {
let cell_idx = self.stack.current_index() as usize;
debug!("current id={} cell={}", mem_page_rc.borrow().id, cell_idx);
if mem_page_rc.borrow().is_locked() {
return Ok(CursorResult::IO);
}
unlocked!(mem_page_rc.borrow());
if !mem_page_rc.borrow().is_loaded() {
self.pager.load_page(mem_page_rc.clone())?;
return Ok(CursorResult::IO);
@@ -380,17 +395,12 @@ impl BTreeCursor {
key: SeekKey<'_>,
op: SeekOp,
) -> Result<CursorResult<(Option<u64>, Option<OwnedRecord>)>> {
match self.move_to(key.clone(), op.clone())? {
CursorResult::Ok(_) => {}
CursorResult::IO => return Ok(CursorResult::IO),
};
io!(self.move_to(key.clone(), op.clone()));
{
let page_rc = self.stack.top();
let page = page_rc.borrow();
if page.is_locked() {
return Ok(CursorResult::IO);
}
unlocked!(page);
let contents = page.contents.as_ref().unwrap();
@@ -485,9 +495,7 @@ impl BTreeCursor {
let page_idx = mem_page.borrow().id;
let page = self.pager.read_page(page_idx)?;
let page = RefCell::borrow(&page);
if page.is_locked() {
return Ok(CursorResult::IO);
}
unlocked!(page);
let contents = page.contents.as_ref().unwrap();
if contents.is_leaf() {
if contents.cell_count() > 0 {
@@ -540,9 +548,7 @@ impl BTreeCursor {
loop {
let page_rc = self.stack.top();
let page = RefCell::borrow(&page_rc);
if page.is_locked() {
return Ok(CursorResult::IO);
}
unlocked!(page);
let contents = page.contents.as_ref().unwrap();
if contents.is_leaf() {
@@ -653,9 +659,7 @@ impl BTreeCursor {
// get page and find cell
let (cell_idx, page_type) = {
let mut page = page_ref.borrow_mut();
if page.is_locked() {
return Ok(CursorResult::IO);
}
unlocked!(page);
page.set_dirty();
self.pager.add_dirty(page.id);
@@ -695,10 +699,7 @@ impl BTreeCursor {
WriteState::BalanceStart
| WriteState::BalanceMoveUp
| WriteState::BalanceGetParentPage => {
let res = self.balance_leaf()?;
if matches!(res, CursorResult::IO) {
return Ok(res);
}
io!(self.balance_leaf());
}
WriteState::Finish => {
self.write_info.state = WriteState::Start;
@@ -912,20 +913,16 @@ impl BTreeCursor {
WriteState::BalanceGetParentPage => {
let parent_rc = self.stack.parent();
let loaded = parent_rc.borrow().is_loaded();
let locked = parent_rc.borrow().is_locked();
unlocked!(parent_rc.borrow());
if locked {
Ok(CursorResult::IO)
} else {
if !loaded {
debug!("balance_leaf(loading page {} {})", locked, loaded);
self.pager.load_page(parent_rc.clone())?;
return Ok(CursorResult::IO);
}
parent_rc.borrow_mut().set_dirty();
self.write_info.state = WriteState::BalanceMoveUp;
Ok(CursorResult::Ok(()))
if !loaded {
debug!("balance_leaf(loading page)");
self.pager.load_page(parent_rc.clone())?;
return Ok(CursorResult::IO);
}
parent_rc.borrow_mut().set_dirty();
self.write_info.state = WriteState::BalanceMoveUp;
Ok(CursorResult::Ok(()))
}
WriteState::BalanceMoveUp => {
let parent_ref = self.stack.parent();
@@ -1692,23 +1689,16 @@ fn find_free_cell(page_ref: &PageContent, db_header: Ref<DatabaseHeader>, amount
impl Cursor for BTreeCursor {
fn seek_to_last(&mut self) -> Result<CursorResult<()>> {
self.move_to_rightmost()?;
match self.get_next_record(None)? {
CursorResult::Ok((rowid, next)) => {
if rowid.is_none() {
match self.is_empty_table()? {
CursorResult::Ok(is_empty) => {
assert!(is_empty)
}
CursorResult::IO => (),
}
}
self.rowid.replace(rowid);
self.record.replace(next);
Ok(CursorResult::Ok(()))
}
CursorResult::IO => Ok(CursorResult::IO),
io!(self.move_to_rightmost());
let (rowid, record) = io!(self.get_next_record(None));
if rowid.is_none() {
let is_empty = io!(self.is_empty_table());
assert!(is_empty);
return Ok(CursorResult::Ok(()));
}
self.rowid.replace(rowid);
self.record.replace(record);
Ok(CursorResult::Ok(()))
}
fn is_empty(&self) -> bool {
@@ -1718,14 +1708,10 @@ impl Cursor for BTreeCursor {
fn rewind(&mut self) -> Result<CursorResult<()>> {
self.move_to_root();
match self.get_next_record(None)? {
CursorResult::Ok((rowid, next)) => {
self.rowid.replace(rowid);
self.record.replace(next);
Ok(CursorResult::Ok(()))
}
CursorResult::IO => Ok(CursorResult::IO),
}
let (rowid, record) = io!(self.get_next_record(None));
self.rowid.replace(rowid);
self.record.replace(record);
Ok(CursorResult::Ok(()))
}
fn last(&mut self) -> Result<CursorResult<()>> {
@@ -1736,14 +1722,10 @@ impl Cursor for BTreeCursor {
}
fn next(&mut self) -> Result<CursorResult<()>> {
match self.get_next_record(None)? {
CursorResult::Ok((rowid, next)) => {
self.rowid.replace(rowid);
self.record.replace(next);
Ok(CursorResult::Ok(()))
}
CursorResult::IO => Ok(CursorResult::IO),
}
let (rowid, record) = io!(self.get_next_record(None));
self.rowid.replace(rowid);
self.record.replace(record);
Ok(CursorResult::Ok(()))
}
fn prev(&mut self) -> Result<CursorResult<()>> {
@@ -1767,14 +1749,10 @@ impl Cursor for BTreeCursor {
}
fn seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result<CursorResult<bool>> {
match self.seek(key, op)? {
CursorResult::Ok((rowid, record)) => {
self.rowid.replace(rowid);
self.record.replace(record);
Ok(CursorResult::Ok(rowid.is_some()))
}
CursorResult::IO => Ok(CursorResult::IO),
}
let (rowid, record) = io!(self.seek(key, op));
self.rowid.replace(rowid);
self.record.replace(record);
Ok(CursorResult::Ok(rowid.is_some()))
}
fn record(&self) -> Result<Ref<Option<OwnedRecord>>> {
@@ -1792,16 +1770,11 @@ impl Cursor for BTreeCursor {
_ => unreachable!("btree tables are indexed by integers!"),
};
if !moved_before {
match self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ)? {
CursorResult::Ok(_) => {}
CursorResult::IO => return Ok(CursorResult::IO),
};
io!(self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ));
}
match self.insert_into_page(key, _record)? {
CursorResult::Ok(_) => Ok(CursorResult::Ok(())),
CursorResult::IO => Ok(CursorResult::IO),
}
io!(self.insert_into_page(key, _record));
Ok(CursorResult::Ok(()))
}
fn set_null_flag(&mut self, flag: bool) {
@@ -1817,16 +1790,11 @@ impl Cursor for BTreeCursor {
OwnedValue::Integer(i) => i,
_ => unreachable!("btree tables are indexed by integers!"),
};
match self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ)? {
CursorResult::Ok(_) => {}
CursorResult::IO => return Ok(CursorResult::IO),
};
io!(self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ));
let page_ref = self.stack.top();
let page = page_ref.borrow();
if page.is_locked() {
// TODO(pere); request load
return Ok(CursorResult::IO);
}
// TODO(pere): request load
unlocked!(page);
let contents = page.contents.as_ref().unwrap();

View File

@@ -533,6 +533,17 @@ pub enum StepResult<'a> {
Row(Record<'a>),
}
/// If there is I/O, the instruction is restarted.
/// Evaluate a Result<CursorResult<T>>, if IO return Ok(StepResult::IO).
macro_rules! io {
($expr:expr) => {
match $expr? {
CursorResult::Ok(v) => v,
CursorResult::IO => return Ok(StepResult::IO),
}
};
}
struct RegexCache {
like: HashMap<String, Regex>,
glob: HashMap<String, Regex>,
@@ -1102,13 +1113,7 @@ impl Program {
}
Insn::RewindAsync { cursor_id } => {
let cursor = cursors.get_mut(cursor_id).unwrap();
match cursor.rewind()? {
CursorResult::Ok(()) => {}
CursorResult::IO => {
// If there is I/O, the instruction is restarted.
return Ok(StepResult::IO);
}
}
io!(cursor.rewind());
state.pc += 1;
}
Insn::LastAsync { cursor_id } => {
@@ -1194,13 +1199,7 @@ impl Program {
Insn::NextAsync { cursor_id } => {
let cursor = cursors.get_mut(cursor_id).unwrap();
cursor.set_null_flag(false);
match cursor.next()? {
CursorResult::Ok(_) => {}
CursorResult::IO => {
// If there is I/O, the instruction is restarted.
return Ok(StepResult::IO);
}
}
io!(cursor.next());
state.pc += 1;
}
Insn::PrevAsync { cursor_id } => {
@@ -1386,18 +1385,11 @@ impl Program {
));
}
};
match cursor.seek(SeekKey::TableRowId(rowid), SeekOp::EQ)? {
CursorResult::Ok(found) => {
if !found {
state.pc = *target_pc;
} else {
state.pc += 1;
}
}
CursorResult::IO => {
// If there is I/O, the instruction is restarted.
return Ok(StepResult::IO);
}
let found = io!(cursor.seek(SeekKey::TableRowId(rowid), SeekOp::EQ));
if !found {
state.pc = *target_pc;
} else {
state.pc += 1;
}
}
Insn::DeferredSeek {
@@ -1418,31 +1410,19 @@ impl Program {
let cursor = cursors.get_mut(cursor_id).unwrap();
let record_from_regs: OwnedRecord =
make_owned_record(&state.registers, start_reg, num_regs);
match cursor.seek(SeekKey::IndexKey(&record_from_regs), SeekOp::GE)? {
CursorResult::Ok(found) => {
if !found {
state.pc = *target_pc;
} else {
state.pc += 1;
}
}
CursorResult::IO => {
// If there is I/O, the instruction is restarted.
return Ok(StepResult::IO);
}
let found =
io!(cursor.seek(SeekKey::IndexKey(&record_from_regs), SeekOp::GE));
if !found {
state.pc = *target_pc;
} else {
state.pc += 1;
}
} else {
let cursor = cursors.get_mut(cursor_id).unwrap();
let rowid = match &state.registers[*start_reg] {
OwnedValue::Null => {
// All integer values are greater than null so we just rewind the cursor
match cursor.rewind()? {
CursorResult::Ok(()) => {}
CursorResult::IO => {
// If there is I/O, the instruction is restarted.
return Ok(StepResult::IO);
}
}
io!(cursor.rewind());
state.pc += 1;
continue;
}
@@ -1453,18 +1433,11 @@ impl Program {
));
}
};
match cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GE)? {
CursorResult::Ok(found) => {
if !found {
state.pc = *target_pc;
} else {
state.pc += 1;
}
}
CursorResult::IO => {
// If there is I/O, the instruction is restarted.
return Ok(StepResult::IO);
}
let found = io!(cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GE));
if !found {
state.pc = *target_pc;
} else {
state.pc += 1;
}
}
}
@@ -1479,31 +1452,19 @@ impl Program {
let cursor = cursors.get_mut(cursor_id).unwrap();
let record_from_regs: OwnedRecord =
make_owned_record(&state.registers, start_reg, num_regs);
match cursor.seek(SeekKey::IndexKey(&record_from_regs), SeekOp::GT)? {
CursorResult::Ok(found) => {
if !found {
state.pc = *target_pc;
} else {
state.pc += 1;
}
}
CursorResult::IO => {
// If there is I/O, the instruction is restarted.
return Ok(StepResult::IO);
}
let found =
io!(cursor.seek(SeekKey::IndexKey(&record_from_regs), SeekOp::GT));
if !found {
state.pc = *target_pc;
} else {
state.pc += 1;
}
} else {
let cursor = cursors.get_mut(cursor_id).unwrap();
let rowid = match &state.registers[*start_reg] {
OwnedValue::Null => {
// All integer values are greater than null so we just rewind the cursor
match cursor.rewind()? {
CursorResult::Ok(()) => {}
CursorResult::IO => {
// If there is I/O, the instruction is restarted.
return Ok(StepResult::IO);
}
}
io!(cursor.rewind());
state.pc += 1;
continue;
}
@@ -1514,18 +1475,11 @@ impl Program {
));
}
};
match cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GT)? {
CursorResult::Ok(found) => {
if !found {
state.pc = *target_pc;
} else {
state.pc += 1;
}
}
CursorResult::IO => {
// If there is I/O, the instruction is restarted.
return Ok(StepResult::IO);
}
let found = io!(cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GT));
if !found {
state.pc = *target_pc;
} else {
state.pc += 1;
}
}
}
@@ -1892,13 +1846,7 @@ impl Program {
} => {
assert!(*pc_if_next >= 0);
let cursor = cursors.get_mut(cursor_id).unwrap();
match cursor.next()? {
CursorResult::Ok(_) => {}
CursorResult::IO => {
// If there is I/O, the instruction is restarted.
return Ok(StepResult::IO);
}
}
io!(cursor.next());
if !cursor.is_empty() {
state.pc = *pc_if_next;
} else {
@@ -2175,15 +2123,8 @@ impl Program {
_ => unreachable!("Not a record! Cannot insert a non record value."),
};
let key = &state.registers[*key_reg];
match cursor.insert(key, record, true)? {
CursorResult::Ok(_) => {
state.pc += 1;
}
CursorResult::IO => {
// If there is I/O, the instruction is restarted.
return Ok(StepResult::IO);
}
}
io!(cursor.insert(key, record, true));
state.pc += 1;
}
Insn::InsertAwait { cursor_id } => {
let cursor = cursors.get_mut(cursor_id).unwrap();
@@ -2195,13 +2136,8 @@ impl Program {
} => {
let cursor = cursors.get_mut(cursor).unwrap();
// TODO: make io handle rng
let rowid = get_new_rowid(cursor, thread_rng())?;
match rowid {
CursorResult::Ok(rowid) => {
state.registers[*rowid_reg] = OwnedValue::Integer(rowid);
}
CursorResult::IO => return Ok(StepResult::IO),
}
let rowid = io!(get_new_rowid(cursor, thread_rng()));
state.registers[*rowid_reg] = OwnedValue::Integer(rowid);
state.pc += 1;
}
Insn::MustBeInt { reg } => {
@@ -2225,11 +2161,12 @@ impl Program {
target_pc,
} => {
let cursor = cursors.get_mut(cursor).unwrap();
match cursor.exists(&state.registers[*rowid_reg])? {
CursorResult::Ok(true) => state.pc += 1,
CursorResult::Ok(false) => state.pc = *target_pc,
CursorResult::IO => return Ok(StepResult::IO),
};
let exists = io!(cursor.exists(&state.registers[*rowid_reg]));
if exists {
state.pc += 1;
} else {
state.pc = *target_pc;
}
}
// this cursor may be reused for next insert
// Update: tablemoveto is used to travers on not exists, on insert depending on flags if nonseek it traverses again.