Implement full table scan

This commit is contained in:
Pekka Enberg
2023-09-07 18:45:34 +03:00
parent e75fa010dc
commit 0a942f0fd0

View File

@@ -1,5 +1,5 @@
use crate::pager::Pager;
use crate::sqlite3_ondisk::{BTreeCell, BTreePage, TableLeafCell};
use crate::sqlite3_ondisk::{BTreeCell, BTreePage, TableInteriorCell, TableLeafCell};
use crate::types::Record;
use anyhow::Result;
@@ -7,12 +7,36 @@ use anyhow::Result;
use std::cell::{Ref, RefCell};
use std::sync::Arc;
pub struct MemPage {
parent: Option<Arc<MemPage>>,
page_idx: usize,
cell_idx: RefCell<usize>,
}
impl MemPage {
pub fn new(parent: Option<Arc<MemPage>>, page_idx: usize, cell_idx: usize) -> Self {
Self {
parent,
page_idx,
cell_idx: RefCell::new(cell_idx),
}
}
pub fn cell_idx(&self) -> usize {
*self.cell_idx.borrow()
}
pub fn advance(&self) {
let mut cell_idx = self.cell_idx.borrow_mut();
*cell_idx += 1;
}
}
pub struct Cursor {
pager: Arc<Pager>,
root_page: usize,
page: RefCell<Option<Arc<BTreePage>>>,
page: RefCell<Option<Arc<MemPage>>>,
record: RefCell<Option<Record>>,
cell_idx: usize,
}
impl Cursor {
@@ -22,7 +46,6 @@ impl Cursor {
root_page,
page: RefCell::new(None),
record: RefCell::new(None),
cell_idx: 0,
}
}
@@ -31,8 +54,8 @@ impl Cursor {
}
pub fn rewind(&mut self) -> Result<()> {
self.page
.replace(Some(self.pager.read_page(self.root_page)?));
let mem_page = MemPage::new(None, self.root_page, 0);
self.page.replace(Some(Arc::new(mem_page)));
let record = self.get_next_record()?;
self.record.replace(record);
Ok(())
@@ -59,25 +82,41 @@ impl Cursor {
}
fn get_next_record(&mut self) -> Result<Option<Record>> {
match self.page.borrow_mut().as_mut() {
Some(page) => {
if self.cell_idx < page.cells.len() {
let cell = &page.cells[self.cell_idx];
self.cell_idx += 1;
match &cell {
BTreeCell::TableInteriorCell(_) => {
todo!();
}
BTreeCell::TableLeafCell(TableLeafCell { _rowid, _payload }) => {
let record = crate::sqlite3_ondisk::read_record(_payload)?;
Ok(Some(record))
}
loop {
let mem_page = {
let mem_page = self.page.borrow();
let mem_page = mem_page.as_ref().unwrap();
mem_page.clone()
};
let page_idx = mem_page.page_idx;
let page = self.pager.read_page(page_idx)?;
if mem_page.cell_idx() >= page.cells.len() {
match mem_page.parent {
Some(ref parent) => {
self.page.replace(Some(parent.clone()));
continue;
}
} else {
Ok(None)
None => return Ok(None),
}
}
let cell = &page.cells[mem_page.cell_idx()];
match &cell {
BTreeCell::TableInteriorCell(TableInteriorCell {
_left_child_page,
_rowid,
}) => {
mem_page.advance();
let mem_page =
MemPage::new(Some(mem_page.clone()), *_left_child_page as usize, 0);
self.page.replace(Some(Arc::new(mem_page)));
continue;
}
BTreeCell::TableLeafCell(TableLeafCell { _rowid, _payload }) => {
mem_page.advance();
let record = crate::sqlite3_ondisk::read_record(_payload)?;
return Ok(Some(record));
}
}
None => Ok(None),
}
}
}