mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 17:05:36 +01:00
read overflow pages on demand
This commit is contained in:
@@ -140,6 +140,14 @@ enum WriteState {
|
||||
Finish,
|
||||
}
|
||||
|
||||
enum ReadPayloadOverlow {
|
||||
ProcessPage {
|
||||
payload: Vec<u8>,
|
||||
next_page: u32,
|
||||
remaining_to_read: usize,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct BalanceInfo {
|
||||
/// Old pages being balanced.
|
||||
@@ -174,6 +182,7 @@ impl WriteInfo {
|
||||
/// was suspended due to IO.
|
||||
enum CursorState {
|
||||
None,
|
||||
Read(ReadPayloadOverlow),
|
||||
Write(WriteInfo),
|
||||
Destroy(DestroyInfo),
|
||||
Delete(DeleteInfo),
|
||||
@@ -380,10 +389,23 @@ impl BTreeCursor {
|
||||
continue;
|
||||
}
|
||||
BTreeCell::TableLeafCell(TableLeafCell {
|
||||
_rowid, _payload, ..
|
||||
_rowid,
|
||||
_payload,
|
||||
first_overflow_page,
|
||||
payload_size,
|
||||
}) => {
|
||||
let record = if let Some(next_page) = first_overflow_page {
|
||||
if let Some(record) =
|
||||
self.process_overflow_read(_payload, next_page, payload_size)?
|
||||
{
|
||||
record
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
crate::storage::sqlite3_ondisk::read_record(_payload)?
|
||||
};
|
||||
self.stack.retreat();
|
||||
let record = crate::storage::sqlite3_ondisk::read_record(&_payload)?;
|
||||
return Ok(CursorResult::Ok((Some(_rowid), Some(record))));
|
||||
}
|
||||
BTreeCell::IndexInteriorCell(_) => todo!(),
|
||||
@@ -392,6 +414,63 @@ impl BTreeCursor {
|
||||
}
|
||||
}
|
||||
|
||||
/// Reads the record of a cell that has overflow pages. This is a state machine that requires to be called until completion so everything
|
||||
/// that calls this function should be reentrant.
|
||||
fn process_overflow_read(
|
||||
&mut self,
|
||||
payload: &'static [u8],
|
||||
start_next_page: u32,
|
||||
payload_size: u64,
|
||||
) -> Result<Option<ImmutableRecord>> {
|
||||
let res = match &mut self.state {
|
||||
CursorState::None => {
|
||||
let page = self.pager.read_page(start_next_page as usize)?;
|
||||
self.stack.push(page);
|
||||
self.state = CursorState::Read(ReadPayloadOverlow::ProcessPage {
|
||||
payload: payload.to_vec(),
|
||||
next_page: start_next_page,
|
||||
remaining_to_read: payload_size as usize - payload.len(),
|
||||
});
|
||||
None
|
||||
}
|
||||
CursorState::Read(ReadPayloadOverlow::ProcessPage {
|
||||
payload,
|
||||
next_page,
|
||||
remaining_to_read,
|
||||
}) => {
|
||||
let page = self.stack.top();
|
||||
if page.is_locked() {
|
||||
return Ok(None);
|
||||
}
|
||||
self.stack.pop();
|
||||
let contents = page.get_contents();
|
||||
let next = contents.read_u32_no_offset(0);
|
||||
let buf = contents.as_ptr();
|
||||
let to_read = (*remaining_to_read).min(buf.len() - 4);
|
||||
payload.extend_from_slice(&buf[4..to_read]);
|
||||
*remaining_to_read -= to_read;
|
||||
if *remaining_to_read == 0 || next == 0 {
|
||||
assert!(
|
||||
*remaining_to_read == 0 && next == 0,
|
||||
"we can't have more pages to read while also have read everything"
|
||||
);
|
||||
let record = crate::storage::sqlite3_ondisk::read_record(&payload)?;
|
||||
Some(record)
|
||||
} else {
|
||||
let new_page = self.pager.read_page(next as usize)?;
|
||||
self.stack.push(new_page);
|
||||
*next_page = next;
|
||||
None
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
};
|
||||
if res.is_some() {
|
||||
self.state = CursorState::None;
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
/// Move the cursor to the next record and return it.
|
||||
/// Used in forwards iteration, which is the default.
|
||||
fn get_next_record(
|
||||
@@ -482,28 +561,49 @@ impl BTreeCursor {
|
||||
BTreeCell::TableLeafCell(TableLeafCell {
|
||||
_rowid,
|
||||
_payload,
|
||||
first_overflow_page: _,
|
||||
first_overflow_page,
|
||||
payload_size,
|
||||
}) => {
|
||||
assert!(predicate.is_none());
|
||||
let record = if let Some(next_page) = first_overflow_page {
|
||||
if let Some(record) =
|
||||
self.process_overflow_read(_payload, *next_page, *payload_size)?
|
||||
{
|
||||
record
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
crate::storage::sqlite3_ondisk::read_record(_payload)?
|
||||
};
|
||||
self.stack.advance();
|
||||
let record = crate::storage::sqlite3_ondisk::read_record(_payload)?;
|
||||
return Ok(CursorResult::Ok((Some(*_rowid), Some(record))));
|
||||
}
|
||||
BTreeCell::IndexInteriorCell(IndexInteriorCell {
|
||||
payload,
|
||||
left_child_page,
|
||||
..
|
||||
first_overflow_page,
|
||||
payload_size,
|
||||
}) => {
|
||||
if !self.going_upwards {
|
||||
let mem_page = self.pager.read_page(*left_child_page as usize)?;
|
||||
self.stack.push(mem_page);
|
||||
continue;
|
||||
}
|
||||
let record = if let Some(next_page) = first_overflow_page {
|
||||
if let Some(record) =
|
||||
self.process_overflow_read(payload, *next_page, *payload_size)?
|
||||
{
|
||||
record
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
crate::storage::sqlite3_ondisk::read_record(payload)?
|
||||
};
|
||||
|
||||
self.going_upwards = false;
|
||||
self.stack.advance();
|
||||
|
||||
let record = crate::storage::sqlite3_ondisk::read_record(payload)?;
|
||||
if predicate.is_none() {
|
||||
let rowid = match record.last_value() {
|
||||
Some(RefValue::Integer(rowid)) => *rowid as u64,
|
||||
@@ -533,9 +633,24 @@ impl BTreeCursor {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
BTreeCell::IndexLeafCell(IndexLeafCell { payload, .. }) => {
|
||||
BTreeCell::IndexLeafCell(IndexLeafCell {
|
||||
payload,
|
||||
first_overflow_page,
|
||||
payload_size,
|
||||
}) => {
|
||||
let record = if let Some(next_page) = first_overflow_page {
|
||||
if let Some(record) =
|
||||
self.process_overflow_read(payload, *next_page, *payload_size)?
|
||||
{
|
||||
record
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
crate::storage::sqlite3_ondisk::read_record(payload)?
|
||||
};
|
||||
|
||||
self.stack.advance();
|
||||
let record = crate::storage::sqlite3_ondisk::read_record(payload)?;
|
||||
if predicate.is_none() {
|
||||
let rowid = match record.last_value() {
|
||||
Some(RefValue::Integer(rowid)) => *rowid as u64,
|
||||
@@ -602,7 +717,8 @@ impl BTreeCursor {
|
||||
BTreeCell::TableLeafCell(TableLeafCell {
|
||||
_rowid: cell_rowid,
|
||||
_payload: payload,
|
||||
first_overflow_page: _,
|
||||
first_overflow_page,
|
||||
payload_size,
|
||||
}) => {
|
||||
let SeekKey::TableRowId(rowid_key) = key else {
|
||||
unreachable!("table seek key should be a rowid");
|
||||
@@ -612,17 +728,43 @@ impl BTreeCursor {
|
||||
SeekOp::GE => *cell_rowid >= rowid_key,
|
||||
SeekOp::EQ => *cell_rowid == rowid_key,
|
||||
};
|
||||
self.stack.advance();
|
||||
if found {
|
||||
let record = crate::storage::sqlite3_ondisk::read_record(payload)?;
|
||||
let record = if let Some(next_page) = first_overflow_page {
|
||||
if let Some(record) =
|
||||
self.process_overflow_read(payload, *next_page, *payload_size)?
|
||||
{
|
||||
record
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
crate::storage::sqlite3_ondisk::read_record(payload)?
|
||||
};
|
||||
self.stack.advance();
|
||||
return Ok(CursorResult::Ok((Some(*cell_rowid), Some(record))));
|
||||
} else {
|
||||
self.stack.advance();
|
||||
}
|
||||
}
|
||||
BTreeCell::IndexLeafCell(IndexLeafCell { payload, .. }) => {
|
||||
BTreeCell::IndexLeafCell(IndexLeafCell {
|
||||
payload,
|
||||
first_overflow_page,
|
||||
payload_size,
|
||||
}) => {
|
||||
let SeekKey::IndexKey(index_key) = key else {
|
||||
unreachable!("index seek key should be a record");
|
||||
};
|
||||
let record = crate::storage::sqlite3_ondisk::read_record(payload)?;
|
||||
let record = if let Some(next_page) = first_overflow_page {
|
||||
if let Some(record) =
|
||||
self.process_overflow_read(payload, *next_page, *payload_size)?
|
||||
{
|
||||
record
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
crate::storage::sqlite3_ondisk::read_record(payload)?
|
||||
};
|
||||
let order = compare_immutable_to_record(
|
||||
&record.get_values().as_slice()[..record.len() - 1],
|
||||
&index_key.get_values().as_slice()[..],
|
||||
@@ -784,6 +926,7 @@ impl BTreeCursor {
|
||||
_rowid: _,
|
||||
_payload: _,
|
||||
first_overflow_page: _,
|
||||
..
|
||||
}) => {
|
||||
unreachable!(
|
||||
"we don't iterate leaf cells while trying to move to a leaf cell"
|
||||
@@ -792,12 +935,23 @@ impl BTreeCursor {
|
||||
BTreeCell::IndexInteriorCell(IndexInteriorCell {
|
||||
left_child_page,
|
||||
payload,
|
||||
..
|
||||
first_overflow_page,
|
||||
payload_size,
|
||||
}) => {
|
||||
let SeekKey::IndexKey(index_key) = key else {
|
||||
unreachable!("index seek key should be a record");
|
||||
};
|
||||
let record = crate::storage::sqlite3_ondisk::read_record(payload)?;
|
||||
let record = if let Some(next_page) = first_overflow_page {
|
||||
if let Some(record) =
|
||||
self.process_overflow_read(payload, *next_page, *payload_size)?
|
||||
{
|
||||
record
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
crate::storage::sqlite3_ondisk::read_record(payload)?
|
||||
};
|
||||
let order = compare_record_to_immutable(
|
||||
&index_key.get_values(),
|
||||
&record.get_values(),
|
||||
@@ -4022,6 +4176,7 @@ mod tests {
|
||||
_rowid: 1,
|
||||
_payload: unsafe { transmute::<&[u8], &'static [u8]>(large_payload.as_slice()) },
|
||||
first_overflow_page: Some(2), // Point to first overflow page
|
||||
payload_size: large_payload.len() as u64,
|
||||
});
|
||||
|
||||
let initial_freelist_pages = db_header.lock().freelist_pages;
|
||||
@@ -4077,6 +4232,7 @@ mod tests {
|
||||
_rowid: 1,
|
||||
_payload: unsafe { transmute::<&[u8], &'static [u8]>(small_payload.as_slice()) },
|
||||
first_overflow_page: None,
|
||||
payload_size: small_payload.len() as u64,
|
||||
});
|
||||
|
||||
let initial_freelist_pages = db_header.lock().freelist_pages;
|
||||
|
||||
@@ -449,6 +449,11 @@ impl PageContent {
|
||||
u16::from_be_bytes([buf[pos], buf[pos + 1]])
|
||||
}
|
||||
|
||||
pub fn read_u32_no_offset(&self, pos: usize) -> u32 {
|
||||
let buf = self.as_ptr();
|
||||
u32::from_be_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]])
|
||||
}
|
||||
|
||||
pub fn read_u32(&self, pos: usize) -> u32 {
|
||||
let buf = self.as_ptr();
|
||||
read_u32(buf, self.offset + pos)
|
||||
@@ -815,7 +820,10 @@ pub struct TableInteriorCell {
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TableLeafCell {
|
||||
pub _rowid: u64,
|
||||
/// Payload of cell, if it overflows it won't include overflowed payload.
|
||||
pub _payload: &'static [u8],
|
||||
/// This is the complete payload size including overflow pages.
|
||||
pub payload_size: u64,
|
||||
pub first_overflow_page: Option<u32>,
|
||||
}
|
||||
|
||||
@@ -823,6 +831,8 @@ pub struct TableLeafCell {
|
||||
pub struct IndexInteriorCell {
|
||||
pub left_child_page: u32,
|
||||
pub payload: &'static [u8],
|
||||
/// This is the complete payload size including overflow pages.
|
||||
pub payload_size: u64,
|
||||
pub first_overflow_page: Option<u32>,
|
||||
}
|
||||
|
||||
@@ -830,6 +840,8 @@ pub struct IndexInteriorCell {
|
||||
pub struct IndexLeafCell {
|
||||
pub payload: &'static [u8],
|
||||
pub first_overflow_page: Option<u32>,
|
||||
/// This is the complete payload size including overflow pages.
|
||||
pub payload_size: u64,
|
||||
}
|
||||
|
||||
/// read_btree_cell contructs a BTreeCell which is basically a wrapper around pointer to the payload of a cell.
|
||||
@@ -861,6 +873,7 @@ pub fn read_btree_cell(
|
||||
left_child_page,
|
||||
payload,
|
||||
first_overflow_page,
|
||||
payload_size,
|
||||
}))
|
||||
}
|
||||
PageType::TableInterior => {
|
||||
@@ -888,6 +901,7 @@ pub fn read_btree_cell(
|
||||
Ok(BTreeCell::IndexLeafCell(IndexLeafCell {
|
||||
payload,
|
||||
first_overflow_page,
|
||||
payload_size,
|
||||
}))
|
||||
}
|
||||
PageType::TableLeaf => {
|
||||
@@ -907,6 +921,7 @@ pub fn read_btree_cell(
|
||||
_rowid: rowid,
|
||||
_payload: payload,
|
||||
first_overflow_page,
|
||||
payload_size,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user