From e6f8b34f2b6ffa6894e54bb710e23801ea37ea10 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Thu, 25 Jul 2024 01:19:40 +0200 Subject: [PATCH] core: insert_to_page almost complete --- core/btree.rs | 250 ++++++++++++++++++++++++++++++++++++--- core/pseudo.rs | 4 +- core/sqlite3_ondisk.rs | 5 +- core/translate/insert.rs | 6 +- core/translate/mod.rs | 12 +- core/translate/select.rs | 9 +- core/types.rs | 13 +- core/vdbe/builder.rs | 7 +- core/vdbe/mod.rs | 16 ++- core/vdbe/sorter.rs | 4 +- 10 files changed, 283 insertions(+), 43 deletions(-) diff --git a/core/btree.rs b/core/btree.rs index b9ccb89af..a498adafe 100644 --- a/core/btree.rs +++ b/core/btree.rs @@ -1,5 +1,8 @@ use crate::pager::Pager; -use crate::sqlite3_ondisk::{BTreeCell, BTreePage, TableInteriorCell, TableLeafCell}; +use crate::sqlite3_ondisk::{ + read_varint, write_varint, BTreeCell, BTreePage, DatabaseHeader, PageType, TableInteriorCell, + TableLeafCell, +}; use crate::types::{Cursor, CursorResult, OwnedRecord, OwnedValue}; use crate::Result; @@ -38,10 +41,15 @@ pub struct BTreeCursor { rowid: RefCell>, record: RefCell>, null_flag: bool, + database_header: Rc>, } impl BTreeCursor { - pub fn new(pager: Rc, root_page: usize) -> Self { + pub fn new( + pager: Rc, + root_page: usize, + database_header: Rc>, + ) -> Self { Self { pager, root_page, @@ -49,6 +57,7 @@ impl BTreeCursor { rowid: RefCell::new(None), record: RefCell::new(None), null_flag: false, + database_header, } } @@ -205,26 +214,210 @@ impl BTreeCursor { }; let page_idx = mem_page.page_idx; let page = self.pager.read_page(page_idx)?; - let page = page.borrow(); + let page = page.borrow_mut(); if page.is_locked() { return Ok(CursorResult::IO); } page.set_dirty(); - let page = page.contents.read().unwrap(); - let page = page.as_ref().unwrap(); + let mut page = page.contents.write().unwrap(); + let page = page.as_mut().unwrap(); - let free = self.compute_free_space(page); - dbg!(free); + let free = self.compute_free_space(page, self.database_header.borrow()); + + // find cell + let int_key = match key { + OwnedValue::Integer(i) => *i as u64, + _ => unreachable!("btree tables are indexed by integers!"), + }; + let mut cell_idx = 0; + for cell in &page.cells { + match cell { + BTreeCell::TableLeafCell(cell) => { + if int_key >= cell._rowid { + break; + } + } + _ => todo!(), + } + cell_idx += 1; + } + + // if overwrite drop cell + + // insert cell + assert!(page.header.page_type == PageType::TableLeaf); + let mut payload: Vec = Vec::new(); + + { + // Data len will be prepended later + // Key + let mut key_varint: Vec = Vec::new(); + key_varint.extend(std::iter::repeat(0).take(9)); + let n = write_varint(&mut key_varint.as_mut_slice()[0..9], int_key); + write_varint(&mut key_varint, int_key); + key_varint.truncate(n); + payload.extend_from_slice(&key_varint); + } + + // Data payload + let payload_size_before_record = payload.len(); + _record.serialize(&mut payload); + let data_size = payload.len() - payload_size_before_record; + payload[0..8].copy_from_slice(&(data_size as u64).to_be_bytes()); + + { + // Data len + let mut data_len_varint: Vec = Vec::new(); + data_len_varint.extend(std::iter::repeat(0).take(9)); + let n = write_varint(&mut data_len_varint.as_mut_slice()[0..9], int_key); + write_varint(&mut data_len_varint, int_key); + data_len_varint.truncate(n); + payload.splice(0..0, data_len_varint.iter().cloned()); + } + + if payload.len() + 2 > free as usize { + // overflow or balance + todo!("overflow/balance"); + } else { + // insert + let pc = self.allocate_cell_space(page, payload.len() as u16); + let mut buf = page.buffer.borrow_mut(); + let mut buf = buf.as_mut_slice(); + buf[pc as usize..pc as usize + payload.len()].copy_from_slice(&payload); + // memmove(pIns+2, pIns, 2*(pPage->nCell - i)); + let pointer_area_pc_by_idx = 8 + 2 * cell_idx; + // move previous pointers forward and insert new pointer there + let n_cells_forward = 2 * (page.cells.len() - cell_idx); + buf.copy_within( + pointer_area_pc_by_idx..pointer_area_pc_by_idx + n_cells_forward, + pointer_area_pc_by_idx + 2, + ); + // TODo: refactor cells to be lazy loadable because this will be crazy slow + let mut payload_for_cell_in_memory: Vec = Vec::new(); + _record.serialize(&mut payload_for_cell_in_memory); + page.cells.insert( + cell_idx, + BTreeCell::TableLeafCell(TableLeafCell { + _rowid: int_key, + _payload: payload_for_cell_in_memory, + first_overflow_page: None, + }), + ); + + dbg!(pc); + } Ok(CursorResult::Ok(())) } - fn compute_free_space(&self, page: &BTreePage) -> u16 { + fn allocate_cell_space(&mut self, page_ref: &BTreePage, amount: u16) -> u16 { + let amount = amount as usize; + let mut page = page_ref.buffer.borrow_mut(); + let buf = page.as_mut_slice(); + + let cell_offset = 8; + let mut gap = cell_offset + 2 * page_ref.cells.len(); + let mut top = page_ref.header._cell_content_area as usize; + + dbg!(gap); + dbg!(top); + // there are free blocks and enough space + if page_ref.header._first_freeblock_offset != 0 && gap + 2 <= top { + // find slot + let db_header = self.database_header.borrow(); + let pc = find_free_cell(page_ref, db_header, amount, buf); + dbg!("found"); + dbg!(pc); + return pc as u16; + } + + if gap + 2 + amount as usize > top { + // defragment + self.defragment_page(page_ref, self.database_header.borrow()); + top = u16::from_be_bytes([buf[5], buf[6]]) as usize; + return 0; + } + + let db_header = self.database_header.borrow(); + top -= amount; + buf[5..7].copy_from_slice(&(top as u16).to_be_bytes()); + let usable_space = (db_header.page_size - db_header.unused_space as u16) as usize; + assert!(top + amount <= usable_space); + return top as u16; + } + + fn defragment_page(&self, page: &BTreePage, db_header: Ref) { + let cloned_page = page.clone(); + let usable_space = (db_header.page_size - db_header.unused_space as u16) as u64; + let mut cbrk = usable_space as u64; + + // TODO: implement fast algorithm + + let last_cell = (usable_space - 4) as u64; + let first_cell = cloned_page.header._cell_content_area as u64; + if cloned_page.cells.len() > 0 { + let buf = cloned_page.buffer.borrow(); + let buf = buf.as_slice(); + let mut write_buf = page.buffer.borrow_mut(); + let write_buf = write_buf.as_mut_slice(); + + for i in 0..cloned_page.cells.len() { + let cell_offset = 8; + let cell_idx = cell_offset + i * 2; + + let pc = u16::from_be_bytes([buf[cell_idx], buf[cell_idx + 1]]) as u64; + if pc > last_cell { + unimplemented!("corrupted page"); + } + + assert!(pc <= last_cell); + + let size = match read_varint(&buf[pc as usize..pc as usize + 9]) { + Ok(v) => v.0, + Err(_) => todo!( + "error while parsing varint from cell, probably treat this as corruption?" + ), + }; + cbrk -= size; + if cbrk < first_cell as u64 || pc as u64 + size > usable_space as u64 { + todo!("corrupt"); + } + assert!(cbrk + size <= usable_space && cbrk >= first_cell); + // set new pointer + write_buf[cell_idx..cell_idx + 2].copy_from_slice(&cbrk.to_be_bytes()); + // copy payload + write_buf[cbrk as usize..cbrk as usize + size as usize] + .copy_from_slice(&buf[pc as usize..pc as usize + size as usize]); + } + } + + // assert!( nfree >= 0 ); + // if( data[hdr+7]+cbrk-iCellFirst!=pPage->nFree ){ + // return SQLITE_CORRUPT_PAGE(pPage); + // } + assert!(cbrk >= first_cell); + let mut write_buf = page.buffer.borrow_mut(); + let write_buf = write_buf.as_mut_slice(); + + // set new first byte of cell content + write_buf[5..7].copy_from_slice(&cbrk.to_be_bytes()); + // set free block to 0, unused spaced can be retrieved from gap between cell pointer end and content start + write_buf[1] = 0; + write_buf[2] = 0; + // set unused space to 0 + write_buf[first_cell as usize..first_cell as usize + cbrk as usize - first_cell as usize] + .fill(0); + } + + // Free blocks can be zero, meaning the "real free space" that can be used to allocate is expected to be between first cell byte + // and end of cell pointer area. + fn compute_free_space(&self, page: &BTreePage, db_header: Ref) -> u16 { let buffer = page.buffer.borrow(); let buf = buffer.as_slice(); + let usable_space = (db_header.page_size - db_header.unused_space as u16) as usize; let mut first_byte_in_cell_content = page.header._cell_content_area; if first_byte_in_cell_content == 0 { first_byte_in_cell_content = u16::MAX; @@ -237,11 +430,8 @@ impl BTreeCursor { // 8 + 4 == header end let first_cell = 8 + 4 + (2 * ncell) as u16; - dbg!(first_byte_in_cell_content); - dbg!(fragmented_free_bytes); let mut nfree = fragmented_free_bytes as usize + first_byte_in_cell_content as usize; - dbg!(nfree); let mut pc = free_block_pointer as usize; if pc > 0 { let mut next = 0; @@ -263,26 +453,50 @@ impl BTreeCursor { } if next > 0 { - /* Freeblock not in ascending order */ todo!("corrupted page ascending order"); } - // if( pc+size>(unsigned int)usableSize ){ - // /* Last freeblock extends past page end */ - // todo!("corrupted page last freeblock extends last page end"); - // } + + if pc + size > usable_space { + todo!("corrupted page last freeblock extends last page end"); + } } // if( nFree>usableSize || nFreenFree = (u16)(nFree - iCellFirst); - // don't count header and cell pointers? nfree = nfree - first_cell as usize; return nfree as u16; } } +fn find_free_cell( + page_ref: &BTreePage, + db_header: Ref, + amount: usize, + buf: &[u8], +) -> usize { + // NOTE: freelist is in ascending order of keys and pc + // unuse_space is reserved bytes at the end of page, therefore we must substract from maxpc + let mut pc = page_ref.header._first_freeblock_offset as usize; + let usable_space = (db_header.page_size - db_header.unused_space as u16) as usize; + let maxpc = (usable_space - amount as usize) as usize; + let mut found = false; + while pc <= maxpc { + let next = u16::from_be_bytes(buf[pc..pc + 2].try_into().unwrap()); + let size = u16::from_be_bytes(buf[pc + 2..pc + 4].try_into().unwrap()); + if amount <= size as usize { + found = true; + break; + } + pc = next as usize; + } + if !found { + unimplemented!("recover for fragmented space"); + } + pc +} + impl Cursor for BTreeCursor { fn is_empty(&self) -> bool { self.record.borrow().is_none() diff --git a/core/pseudo.rs b/core/pseudo.rs index 4d6b6e78a..92c69f7ee 100644 --- a/core/pseudo.rs +++ b/core/pseudo.rs @@ -50,9 +50,9 @@ impl Cursor for PseudoCursor { Ok(self.current.borrow()) } - fn insert(&mut self, record: &OwnedRecord) -> Result<()> { + fn insert(&mut self, key: &OwnedValue, record: &OwnedRecord) -> Result> { *self.current.borrow_mut() = Some(record.clone()); - Ok(()) + Ok(CursorResult::Ok(())) } fn get_null_flag(&self) -> bool { diff --git a/core/sqlite3_ondisk.rs b/core/sqlite3_ondisk.rs index ad721c058..9255257bd 100644 --- a/core/sqlite3_ondisk.rs +++ b/core/sqlite3_ondisk.rs @@ -48,7 +48,7 @@ pub struct DatabaseHeader { pub page_size: u16, write_version: u8, read_version: u8, - unused_space: u8, + pub unused_space: u8, max_embed_frac: u8, min_embed_frac: u8, min_leaf_frac: u8, @@ -194,6 +194,7 @@ pub struct BTreePageHeader { pub(crate) page_type: PageType, pub(crate) _first_freeblock_offset: u16, pub(crate) num_cells: u16, + // First byte of content area pub(crate) _cell_content_area: u16, pub(crate) _num_frag_free_bytes: u8, pub(crate) right_most_pointer: Option, @@ -573,7 +574,7 @@ pub fn read_value(buf: &[u8], serial_type: &SerialType) -> Result<(OwnedValue, u } } -fn read_varint(buf: &[u8]) -> Result<(u64, usize)> { +pub fn read_varint(buf: &[u8]) -> Result<(u64, usize)> { let mut v: u64 = 0; for i in 0..8 { match buf.get(i) { diff --git a/core/translate/insert.rs b/core/translate/insert.rs index 70637ace1..7ada31f75 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -1,4 +1,4 @@ -use std::{ops::Deref, rc::Rc}; +use std::{cell::RefCell, ops::Deref, rc::Rc}; use sqlite3_parser::ast::{ DistinctNames, InsertBody, Name, QualifiedName, ResolveType, ResultColumn, Select, With, @@ -7,6 +7,7 @@ use sqlite3_parser::ast::{ use crate::Result; use crate::{ schema::{self, Schema, Table}, + sqlite3_ondisk::DatabaseHeader, translate::expr::{resolve_ident_qualified, translate_expr}, vdbe::{builder::ProgramBuilder, Insn, Program}, }; @@ -19,6 +20,7 @@ pub fn translate_insert( columns: &Option, body: &InsertBody, returning: &Option>, + database_header: Rc>, ) -> Result { assert!(with.is_none()); assert!(or_conflict.is_none()); @@ -193,5 +195,5 @@ pub fn translate_insert( target_pc: start_offset, }); program.resolve_deferred_labels(); - Ok(program.build()) + Ok(program.build(database_header)) } diff --git a/core/translate/mod.rs b/core/translate/mod.rs index 48ebe1746..280cbc372 100644 --- a/core/translate/mod.rs +++ b/core/translate/mod.rs @@ -58,7 +58,7 @@ pub fn translate( ast::Stmt::Savepoint(_) => bail_parse_error!("SAVEPOINT not supported yet"), ast::Stmt::Select(select) => { let select = prepare_select(schema, &select)?; - translate_select(select) + translate_select(select, database_header) } ast::Stmt::Update { .. } => bail_parse_error!("UPDATE not supported yet"), ast::Stmt::Vacuum(_, _) => bail_parse_error!("VACUUM not supported yet"), @@ -77,6 +77,7 @@ pub fn translate( &columns, &body, &returning, + database_header, ), } } @@ -124,7 +125,12 @@ fn translate_pragma( }, _ => 0, }; - update_pragma(&name.name.0, value_to_update, database_header, pager); + update_pragma( + &name.name.0, + value_to_update, + database_header.clone(), + pager, + ); } Some(ast::PragmaBody::Call(_)) => { todo!() @@ -138,7 +144,7 @@ fn translate_pragma( target_pc: start_offset, }); program.resolve_deferred_labels(); - Ok(program.build()) + Ok(program.build(database_header)) } fn update_pragma(name: &str, value: i64, header: Rc>, pager: Rc) { diff --git a/core/translate/select.rs b/core/translate/select.rs index 5e79039d5..154b91be5 100644 --- a/core/translate/select.rs +++ b/core/translate/select.rs @@ -1,5 +1,6 @@ use crate::function::{AggFunc, Func}; use crate::schema::{Column, PseudoTable, Schema, Table}; +use crate::sqlite3_ondisk::DatabaseHeader; use crate::translate::expr::{analyze_columns, maybe_apply_affinity, translate_expr}; use crate::translate::where_clause::{ process_where, translate_processed_where, translate_tableless_where, ProcessedWhereClause, @@ -11,6 +12,7 @@ use crate::Result; use sqlite3_parser::ast::{self, JoinOperator, JoinType, ResultColumn}; +use std::cell::RefCell; use std::rc::Rc; /// A representation of a `SELECT` statement that has all the information @@ -235,7 +237,10 @@ pub fn prepare_select<'a>(schema: &Schema, select: &'a ast::Select) -> Result Result { +pub fn translate_select( + mut select: Select, + database_header: Rc>, +) -> Result { let mut program = ProgramBuilder::new(); let init_label = program.allocate_label(); let early_terminate_label = program.allocate_label(); @@ -423,7 +428,7 @@ pub fn translate_select(mut select: Select) -> Result { target_pc: start_offset, }); program.resolve_deferred_labels(); - Ok(program.build()) + Ok(program.build(database_header)) } fn emit_limit_insn(limit_info: &Option, program: &mut ProgramBuilder) { diff --git a/core/types.rs b/core/types.rs index 2ced3f49d..dd130a9f8 100644 --- a/core/types.rs +++ b/core/types.rs @@ -305,8 +305,7 @@ impl OwnedRecord { Self { values } } - pub fn serialize(&self) -> Vec { - let mut buf: Vec = Vec::new(); + pub fn serialize(&self, buf: &mut Vec) { let mut header_bytes: usize = 0; let mut buf_i = 0; @@ -315,7 +314,7 @@ impl OwnedRecord { // ensure we have enough space for 9 bytes buf.extend(std::iter::repeat(0).take(9)); } - let n = write_varint(&mut buf.as_mut_slice()[buf_i..], value); + let n = write_varint(&mut buf.as_mut_slice()[buf_i..buf_i + 9], value); buf_i += n; return n; }; @@ -326,8 +325,8 @@ impl OwnedRecord { OwnedValue::Null => write_and_advance(0), OwnedValue::Integer(_) => write_and_advance(6), // for now let's only do i64 OwnedValue::Float(_) => write_and_advance(7), - OwnedValue::Text(t) => write_and_advance((t.len() * 2 - 12) as u64), - OwnedValue::Blob(b) => write_and_advance((b.len() * 2 - 13) as u64), + OwnedValue::Text(t) => write_and_advance((t.len() * 2 + 13) as u64), + OwnedValue::Blob(b) => write_and_advance((b.len() * 2 + 12) as u64), // not serializable values OwnedValue::Agg(_) => unreachable!(), OwnedValue::Record(_) => unreachable!(), @@ -340,7 +339,7 @@ impl OwnedRecord { // ensure we have enough space for data buf.extend(std::iter::repeat(0).take(data.len())); } - let n = buf.as_mut_slice()[buf_i..].clone_from_slice(data); + let n = buf.as_mut_slice()[buf_i..buf_i + data.len()].clone_from_slice(data); buf_i += data.len(); return n; }; @@ -374,8 +373,6 @@ impl OwnedRecord { // cleanup extra extends buf.truncate(buf_i); - - buf } } diff --git a/core/vdbe/builder.rs b/core/vdbe/builder.rs index 382bc8731..c1ddb380c 100644 --- a/core/vdbe/builder.rs +++ b/core/vdbe/builder.rs @@ -1,3 +1,7 @@ +use std::{cell::RefCell, rc::Rc}; + +use crate::sqlite3_ondisk::DatabaseHeader; + use super::{BranchOffset, CursorID, Insn, InsnReference, Program, Table}; pub struct ProgramBuilder { @@ -281,7 +285,7 @@ impl ProgramBuilder { self.deferred_label_resolutions.clear(); } - pub fn build(self) -> Program { + pub fn build(self, database_header: Rc>) -> Program { assert!( self.deferred_label_resolutions.is_empty(), "deferred_label_resolutions is not empty when build() is called, did you forget to call resolve_deferred_labels()?" @@ -294,6 +298,7 @@ impl ProgramBuilder { max_registers: self.next_free_register, insns: self.insns, cursor_ref: self.cursor_ref, + database_header, } } } diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 929b8c66a..08a77c937 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -28,6 +28,7 @@ use crate::function::{AggFunc, ScalarFunc}; use crate::pager::Pager; use crate::pseudo::PseudoCursor; use crate::schema::Table; +use crate::sqlite3_ondisk::DatabaseHeader; use crate::types::{AggContext, Cursor, CursorResult, OwnedRecord, OwnedValue, Record}; use crate::Result; @@ -381,6 +382,7 @@ pub struct Program { pub max_registers: usize, pub insns: Vec, pub cursor_ref: Vec<(Option, Option)>, + pub database_header: Rc>, } impl Program { @@ -639,7 +641,11 @@ impl Program { cursor_id, root_page, } => { - let cursor = Box::new(BTreeCursor::new(pager.clone(), *root_page)); + let cursor = Box::new(BTreeCursor::new( + pager.clone(), + *root_page, + self.database_header.clone(), + )); cursors.insert(*cursor_id, cursor); state.pc += 1; } @@ -1056,7 +1062,7 @@ impl Program { }; state.registers[*dest_reg] = OwnedValue::Record(record.clone()); let sorter_cursor = cursors.get_mut(sorter_cursor).unwrap(); - sorter_cursor.insert(&record)?; + sorter_cursor.insert(&OwnedValue::Integer(0), &record)?; // fix key later state.pc += 1; } Insn::SorterInsert { @@ -1353,7 +1359,11 @@ impl Program { cursor_id, root_page, } => { - let cursor = Box::new(BTreeCursor::new(pager.clone(), *root_page)); + let cursor = Box::new(BTreeCursor::new( + pager.clone(), + *root_page, + self.database_header.clone(), + )); cursors.insert(*cursor_id, cursor); state.pc += 1; } diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 70cff752c..951c56615 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -79,11 +79,11 @@ impl Cursor for Sorter { Ok(self.current.borrow()) } - fn insert(&mut self, record: &OwnedRecord) -> Result<()> { + fn insert(&mut self, key: &OwnedValue, record: &OwnedRecord) -> Result> { let key_fields = self.order.len(); let key = OwnedRecord::new(record.values[0..key_fields].to_vec()); self.insert(key, OwnedRecord::new(record.values[key_fields..].to_vec())); - Ok(()) + Ok(CursorResult::Ok(())) } fn set_null_flag(&mut self, _flag: bool) {