From d795a7a3baab97bd2e9c57c2a6f70df9837bebf8 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Sat, 15 Jun 2024 17:01:48 +0200 Subject: [PATCH 1/6] core: introduce pseudo program with pragma MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduced pragma statement parsing and update in memory of default page cache size. There are some more "improvements" to the print insn procedure — I couldn't decide what was the preferred way in rust to do printing on different int types so I went with the stupidest I could think of at the moment. --- core/lib.rs | 54 ++++++++++-- core/sqlite3_ondisk.rs | 10 ++- core/translate.rs | 63 +++++++++++++- core/vdbe.rs | 190 ++++++++++++++++++++++++++++++++--------- 4 files changed, 262 insertions(+), 55 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index f9ff186e4..ceda27f3a 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -18,8 +18,10 @@ use fallible_iterator::FallibleIterator; use log::trace; use pager::Pager; use schema::Schema; +use sqlite3_ondisk::DatabaseHeader; use sqlite3_parser::{ast::Cmd, lexer::sql::Parser}; -use std::rc::Rc; +use std::{cell::RefCell, rc::Rc}; +use vdbe::ProgramType; #[cfg(feature = "fs")] pub use io::PlatformIO; @@ -30,6 +32,7 @@ pub use types::Value; pub struct Database { pager: Rc, schema: Rc, + header: Rc>, } impl Database { @@ -43,11 +46,12 @@ impl Database { pub fn open(io: Rc, page_source: PageSource) -> Result { let db_header = Pager::begin_open(&page_source)?; io.run_once()?; - let pager = Rc::new(Pager::finish_open(db_header, page_source)?); + let pager = Rc::new(Pager::finish_open(db_header.clone(), page_source)?); let bootstrap_schema = Rc::new(Schema::new()); let conn = Connection { pager: pager.clone(), schema: bootstrap_schema.clone(), + header: db_header.clone(), }; let mut schema = Schema::new(); let rows = conn.query("SELECT * FROM sqlite_schema")?; @@ -74,13 +78,19 @@ impl Database { } } let schema = Rc::new(schema); - Ok(Database { pager, 
schema }) + let header = db_header; + Ok(Database { + pager, + schema, + header, + }) } pub fn connect(&self) -> Connection { Connection { pager: self.pager.clone(), schema: self.schema.clone(), + header: self.header.clone(), } } } @@ -88,6 +98,7 @@ impl Database { pub struct Connection { pager: Rc, schema: Rc, + header: Rc>, } impl Connection { @@ -99,7 +110,11 @@ impl Connection { if let Some(cmd) = cmd { match cmd { Cmd::Stmt(stmt) => { - let program = Rc::new(translate::translate(&self.schema, stmt)?); + let program = Rc::new(translate::translate( + &self.schema, + stmt, + &self.header.borrow(), + )?); Ok(Statement::new(program, self.pager.clone())) } Cmd::Explain(_stmt) => todo!(), @@ -110,6 +125,22 @@ impl Connection { } } + pub fn update_pragma(&self, name: &String, value: i64) { + match name.as_str() { + "cache_size" => { + // update in disk + + // update in-memory header + self.header.borrow_mut().default_cache_size = value + .try_into() + .expect(&format!("invalid value, too big for a i32 {}", value)); + + // update cache size + } + _ => todo!(), + } + } + pub fn query(&self, sql: impl Into) -> Result> { let sql = sql.into(); trace!("Querying: {}", sql); @@ -118,12 +149,19 @@ impl Connection { if let Some(cmd) = cmd { match cmd { Cmd::Stmt(stmt) => { - let program = Rc::new(translate::translate(&self.schema, stmt)?); + let program = Rc::new(translate::translate( + &self.schema, + stmt, + &self.header.borrow(), + )?); + if let ProgramType::PragmaChange(name, value) = &program.program_type { + self.update_pragma(name, *value); + } let stmt = Statement::new(program, self.pager.clone()); Ok(Some(Rows { stmt })) } Cmd::Explain(stmt) => { - let program = translate::translate(&self.schema, stmt)?; + let program = translate::translate(&self.schema, stmt, &self.header.borrow())?; program.explain(); Ok(None) } @@ -141,12 +179,12 @@ impl Connection { if let Some(cmd) = cmd { match cmd { Cmd::Explain(stmt) => { - let program = translate::translate(&self.schema, 
stmt)?; + let program = translate::translate(&self.schema, stmt, &self.header.borrow())?; program.explain(); } Cmd::ExplainQueryPlan(_stmt) => todo!(), Cmd::Stmt(stmt) => { - let program = translate::translate(&self.schema, stmt)?; + let program = translate::translate(&self.schema, stmt, &self.header.borrow())?; let mut state = vdbe::ProgramState::new(program.max_registers); program.step(&mut state, self.pager.clone())?; } diff --git a/core/sqlite3_ondisk.rs b/core/sqlite3_ondisk.rs index 94516dbfb..7950d36c2 100644 --- a/core/sqlite3_ondisk.rs +++ b/core/sqlite3_ondisk.rs @@ -18,7 +18,7 @@ /// +-----------------+----------------+---------------------+----------------+ /// | | | | | /// | Page header | Cell pointer | Unallocated | Cell content | -/// | (8 or 12 bytes) | array | space | area | +/// | (8 or 12 bytes) | array | space | area | /// | | | | | /// +-----------------+----------------+---------------------+----------------+ /// @@ -35,6 +35,7 @@ use std::rc::Rc; /// The size of the database header in bytes. 
pub const DATABASE_HEADER_SIZE: usize = 100; +const DEFAULT_CACHE_SIZE: i32 = -2000; #[derive(Debug, Default)] pub struct DatabaseHeader { @@ -52,7 +53,7 @@ pub struct DatabaseHeader { freelist_pages: u32, schema_cookie: u32, schema_format: u32, - default_cache_size: u32, + pub default_cache_size: i32, vacuum: u32, text_encoding: u32, user_version: u32, @@ -94,7 +95,10 @@ fn finish_read_database_header(buf: &Buffer, header: Rc> header.freelist_pages = u32::from_be_bytes([buf[36], buf[37], buf[38], buf[39]]); header.schema_cookie = u32::from_be_bytes([buf[40], buf[41], buf[42], buf[43]]); header.schema_format = u32::from_be_bytes([buf[44], buf[45], buf[46], buf[47]]); - header.default_cache_size = u32::from_be_bytes([buf[48], buf[49], buf[50], buf[51]]); + header.default_cache_size = i32::from_be_bytes([buf[48], buf[49], buf[50], buf[51]]); + if header.default_cache_size == 0 { + header.default_cache_size = DEFAULT_CACHE_SIZE; + } header.vacuum = u32::from_be_bytes([buf[52], buf[53], buf[54], buf[55]]); header.text_encoding = u32::from_be_bytes([buf[56], buf[57], buf[58], buf[59]]); header.user_version = u32::from_be_bytes([buf[60], buf[61], buf[62], buf[63]]); diff --git a/core/translate.rs b/core/translate.rs index 482a51616..5f4aab6c4 100644 --- a/core/translate.rs +++ b/core/translate.rs @@ -1,12 +1,16 @@ +use std::borrow::Borrow; + use crate::schema::Schema; +use crate::sqlite3_ondisk::DatabaseHeader; use crate::vdbe::{Insn, Program, ProgramBuilder}; use anyhow::Result; -use sqlite3_parser::ast::{Expr, Literal, OneSelect, Select, Stmt}; +use sqlite3_parser::ast::{Expr, Literal, OneSelect, PragmaBody, QualifiedName, Select, Stmt}; /// Translate SQL statement into bytecode program. 
-pub fn translate(schema: &Schema, stmt: Stmt) -> Result { +pub fn translate(schema: &Schema, stmt: Stmt, database_header: &DatabaseHeader) -> Result { match stmt { Stmt::Select(select) => translate_select(schema, select), + Stmt::Pragma(name, body) => translate_pragma(&name, body, database_header), _ => todo!(), } } @@ -218,3 +222,58 @@ fn translate_expr( Expr::Variable(_) => todo!(), } } + +fn translate_pragma( + name: &QualifiedName, + body: Option, + database_header: &DatabaseHeader, +) -> Result { + let mut program = ProgramBuilder::new(); + let init_offset = program.emit_placeholder(); + let start_offset = program.offset(); + let (change_pragma, new_value) = match body { + None => { + let pragma_result = program.alloc_register(); + + program.emit_insn(Insn::Integer { + value: database_header.default_cache_size.into(), + dest: pragma_result, + }); + + let pragma_result_end = program.next_free_register(); + program.emit_insn(Insn::ResultRow { + register_start: pragma_result, + register_end: pragma_result_end, + }); + (false, 0) + } + Some(PragmaBody::Equals(value)) => { + let value_to_update = if let Expr::Literal(Literal::Numeric(numeric_value)) = value { + numeric_value.parse::().unwrap() + } else { + // If you put gibberish into a pragma update it turns it into 0 I think + 0 + }; + println!("{:?}", value_to_update); + (true, value_to_update) + } + Some(PragmaBody::Call(_)) => { + todo!() + } + }; + program.emit_insn(Insn::Halt); + program.fixup_insn( + init_offset, + Insn::Init { + target_pc: program.offset(), + }, + ); + program.emit_insn(Insn::Transaction); + program.emit_insn(Insn::Goto { + target_pc: start_offset, + }); + if change_pragma { + return Ok(program.build_pragma_change(name.name.to_string(), new_value)); + } + Ok(program.build()) +} diff --git a/core/vdbe.rs b/core/vdbe.rs index 0df9d9753..e439e95eb 100644 --- a/core/vdbe.rs +++ b/core/vdbe.rs @@ -3,6 +3,7 @@ use crate::pager::Pager; use crate::types::{Cursor, CursorResult, OwnedValue, 
Record}; use anyhow::Result; +use core::fmt; use std::cell::RefCell; use std::collections::BTreeMap; use std::rc::Rc; @@ -154,6 +155,15 @@ impl ProgramBuilder { Program { max_registers: self.next_free_register, insns: self.insns, + program_type: ProgramType::Default, + } + } + + pub fn build_pragma_change(self, pragma_to_change: String, value: i64) -> Program { + Program { + max_registers: self.next_free_register, + insns: self.insns, + program_type: ProgramType::PragmaChange(pragma_to_change, value), } } } @@ -192,9 +202,15 @@ impl ProgramState { } } +pub enum ProgramType { + Default, + PragmaChange(String, i64), +} + pub struct Program { pub max_registers: usize, pub insns: Vec, + pub program_type: ProgramType, } impl Program { @@ -363,15 +379,37 @@ fn print_insn(addr: usize, insn: &Insn) { println!("{}", s); } +enum IntValue { + Int(i64), + Usize(usize), +} + +impl fmt::Display for IntValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + IntValue::Int(i) => f.pad(i.to_string().as_str()), + IntValue::Usize(i) => f.pad(i.to_string().as_str()), + } + } +} + fn insn_to_str(addr: usize, insn: &Insn) -> String { - let (opcode, p1, p2, p3, p4, p5, comment) = match insn { + let (opcode, p1, p2, p3, p4, p5, comment): ( + &str, + IntValue, + IntValue, + IntValue, + &str, + IntValue, + String, + ) = match insn { Insn::Init { target_pc } => ( "Init", - 0, - *target_pc, - 0, + IntValue::Usize(0), + IntValue::Usize(*target_pc), + IntValue::Usize(0), "", - 0, + IntValue::Usize(0), format!("Start at {}", target_pc), ), Insn::OpenReadAsync { @@ -379,25 +417,41 @@ fn insn_to_str(addr: usize, insn: &Insn) -> String { root_page, } => ( "OpenReadAsync", - *cursor_id, - *root_page, - 0, + IntValue::Usize(*cursor_id), + IntValue::Usize(*root_page), + IntValue::Usize(0), "", - 0, + IntValue::Usize(0), format!("root={}", root_page), ), - Insn::OpenReadAwait => ("OpenReadAwait", 0, 0, 0, "", 0, "".to_string()), - Insn::RewindAsync { cursor_id } => 
("RewindAsync", *cursor_id, 0, 0, "", 0, "".to_string()), + Insn::OpenReadAwait => ( + "OpenReadAwait", + IntValue::Usize(0), + IntValue::Usize(0), + IntValue::Usize(0), + "", + IntValue::Usize(0), + "".to_string(), + ), + Insn::RewindAsync { cursor_id } => ( + "RewindAsync", + IntValue::Usize(*cursor_id), + IntValue::Usize(0), + IntValue::Usize(0), + "", + IntValue::Usize(0), + "".to_string(), + ), Insn::RewindAwait { cursor_id, pc_if_empty, } => ( "RewindAwait", - *cursor_id, - *pc_if_empty, - 0, + IntValue::Usize(*cursor_id), + IntValue::Usize(*pc_if_empty), + IntValue::Usize(0), "", - 0, + IntValue::Usize(0), "".to_string(), ), Insn::Column { @@ -406,11 +460,11 @@ fn insn_to_str(addr: usize, insn: &Insn) -> String { dest, } => ( "Column", - *cursor_id, - *column, - *dest, + IntValue::Usize(*cursor_id), + IntValue::Usize(*column), + IntValue::Usize(*dest), "", - 0, + IntValue::Usize(0), format!("r[{}]= cursor {} column {}", dest, cursor_id, column), ), Insn::ResultRow { @@ -418,45 +472,97 @@ fn insn_to_str(addr: usize, insn: &Insn) -> String { register_end, } => ( "ResultRow", - *register_start, - *register_end, - 0, + IntValue::Usize(*register_start), + IntValue::Usize(*register_end), + IntValue::Usize(0), "", - 0, + IntValue::Usize(0), format!("output=r[{}..{}]", register_start, register_end), ), - Insn::NextAsync { cursor_id } => ("NextAsync", *cursor_id, 0, 0, "", 0, "".to_string()), + Insn::NextAsync { cursor_id } => ( + "NextAsync", + IntValue::Usize(*cursor_id), + IntValue::Usize(0), + IntValue::Usize(0), + "", + IntValue::Usize(0), + "".to_string(), + ), Insn::NextAwait { cursor_id, pc_if_next, } => ( "NextAwait", - *cursor_id, - *pc_if_next, - 0, + IntValue::Usize(*cursor_id), + IntValue::Usize(*pc_if_next), + IntValue::Usize(0), "", - 0, + IntValue::Usize(0), + "".to_string(), + ), + Insn::Halt => ( + "Halt", + IntValue::Usize(0), + IntValue::Usize(0), + IntValue::Usize(0), + "", + IntValue::Usize(0), + "".to_string(), + ), + Insn::Transaction => ( + 
"Transaction", + IntValue::Usize(0), + IntValue::Usize(0), + IntValue::Usize(0), + "", + IntValue::Usize(0), + "".to_string(), + ), + Insn::Goto { target_pc } => ( + "Goto", + IntValue::Usize(0), + IntValue::Usize(*target_pc), + IntValue::Usize(0), + "", + IntValue::Usize(0), + "".to_string(), + ), + Insn::Integer { value, dest } => ( + "Integer", + IntValue::Usize(*dest), + IntValue::Int(*value), + IntValue::Usize(0), + "", + IntValue::Usize(0), "".to_string(), ), - Insn::Halt => ("Halt", 0, 0, 0, "", 0, "".to_string()), - Insn::Transaction => ("Transaction", 0, 0, 0, "", 0, "".to_string()), - Insn::Goto { target_pc } => ("Goto", 0, *target_pc, 0, "", 0, "".to_string()), - Insn::Integer { value, dest } => { - ("Integer", *dest, *value as usize, 0, "", 0, "".to_string()) - } Insn::String8 { value, dest } => ( "String8", - *dest, - 0, - 0, + IntValue::Usize(*dest), + IntValue::Usize(0), + IntValue::Usize(0), value.as_str(), - 0, + IntValue::Usize(0), format!("r[{}]= '{}'", dest, value), ), - Insn::RowId { cursor_id, dest } => ("RowId", *cursor_id, *dest, 0, "", 0, "".to_string()), - Insn::DecrJumpZero { reg, target_pc } => { - ("DecrJumpZero", *reg, *target_pc, 0, "", 0, "".to_string()) - } + Insn::RowId { cursor_id, dest } => ( + "RowId", + IntValue::Usize(*cursor_id), + IntValue::Usize(*dest), + IntValue::Usize(0), + "", + IntValue::Usize(0), + "".to_string(), + ), + Insn::DecrJumpZero { reg, target_pc } => ( + "DecrJumpZero", + IntValue::Usize(*reg), + IntValue::Usize(*target_pc), + IntValue::Usize(0), + "", + IntValue::Usize(0), + "".to_string(), + ), }; format!( "{:<4} {:<13} {:<4} {:<4} {:<4} {:<13} {:<2} {}", From 53c348402a23807e4feda669b1698f618ffc56a0 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 18 Jun 2024 19:51:47 +0200 Subject: [PATCH 2/6] core: parse unary and write to disk --- core/lib.rs | 11 ++++-- core/pager.rs | 9 ++++- core/sqlite3_ondisk.rs | 78 +++++++++++++++++++++++++++++++++++++++--- core/storage.rs | 44 ++++++++++++++++++++++-- 
core/translate.rs | 22 +++++++----- 5 files changed, 146 insertions(+), 18 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index ceda27f3a..d7f6f7d59 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -46,7 +46,11 @@ impl Database { pub fn open(io: Rc, page_source: PageSource) -> Result { let db_header = Pager::begin_open(&page_source)?; io.run_once()?; - let pager = Rc::new(Pager::finish_open(db_header.clone(), page_source)?); + let pager = Rc::new(Pager::finish_open( + db_header.clone(), + page_source, + io.clone(), + )?); let bootstrap_schema = Rc::new(Schema::new()); let conn = Connection { pager: pager.clone(), @@ -128,13 +132,14 @@ impl Connection { pub fn update_pragma(&self, name: &String, value: i64) { match name.as_str() { "cache_size" => { - // update in disk - // update in-memory header self.header.borrow_mut().default_cache_size = value .try_into() .expect(&format!("invalid value, too big for a i32 {}", value)); + // update in disk + self.pager.write_database_header(&self.header.borrow()); + // update cache size } _ => todo!(), diff --git a/core/pager.rs b/core/pager.rs index 3665eba67..a30206ec5 100644 --- a/core/pager.rs +++ b/core/pager.rs @@ -76,9 +76,10 @@ impl Page { /// to pages of the database file, including caching, concurrency control, and /// transaction management. 
pub struct Pager { - page_source: PageSource, + pub page_source: PageSource, page_cache: RefCell>>, buffer_pool: Rc, + pub io: Rc, } impl Pager { @@ -89,6 +90,7 @@ impl Pager { pub fn finish_open( db_header: Rc>, page_source: PageSource, + io: Rc, ) -> anyhow::Result { let db_header = db_header.borrow(); let page_size = db_header.page_size as usize; @@ -98,6 +100,7 @@ impl Pager { page_source, buffer_pool, page_cache, + io, }) } @@ -119,4 +122,8 @@ impl Pager { page_cache.insert(page_idx, page.clone()); Ok(page) } + + pub fn write_database_header(&self, header: &DatabaseHeader) { + sqlite3_ondisk::begin_write_database_header(header, self).expect("failed to write header"); + } } diff --git a/core/sqlite3_ondisk.rs b/core/sqlite3_ondisk.rs index 7950d36c2..8dff25bea 100644 --- a/core/sqlite3_ondisk.rs +++ b/core/sqlite3_ondisk.rs @@ -24,8 +24,8 @@ /// /// For more information, see: https://www.sqlite.org/fileformat.html use crate::buffer_pool::BufferPool; -use crate::io::{Buffer, Completion}; -use crate::pager::Page; +use crate::io::{Buffer, Completion, WriteCompletion}; +use crate::pager::{Page, Pager}; use crate::types::{OwnedRecord, OwnedValue}; use crate::PageSource; use anyhow::{anyhow, Result}; @@ -37,7 +37,7 @@ use std::rc::Rc; pub const DATABASE_HEADER_SIZE: usize = 100; const DEFAULT_CACHE_SIZE: i32 = -2000; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct DatabaseHeader { magic: [u8; 16], pub page_size: u16, @@ -80,7 +80,7 @@ pub fn begin_read_database_header(page_source: &PageSource) -> Result>) -> Result<()> { let buf = buf.as_slice(); - let mut header = header.borrow_mut(); + let mut header = std::cell::RefCell::borrow_mut(&header); header.magic.copy_from_slice(&buf[0..16]); header.page_size = u16::from_be_bytes([buf[16], buf[17]]); header.write_version = buf[18]; @@ -110,6 +110,76 @@ fn finish_read_database_header(buf: &Buffer, header: Rc> Ok(()) } +pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> 
Result<()> { + let header = Rc::new(header.clone()); + let page_source = Rc::new(pager.page_source.clone()); + + let drop_fn = Rc::new(|_buf| {}); + let buffer_to_copy = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn))); + let buffer_to_copy_in_cb = buffer_to_copy.clone(); + + let header_cb = header.clone(); + let complete = Box::new(move |buffer: &Buffer| { + let header = header_cb.clone(); + let buffer: Buffer = buffer.clone(); + let buffer = Rc::new(RefCell::new(buffer)); + { + let mut buf_mut = std::cell::RefCell::borrow_mut(&buffer); + let buf = buf_mut.as_mut_slice(); + buf[0..16].copy_from_slice(&header.magic); + buf[16..18].copy_from_slice(&header.page_size.to_be_bytes()); + buf[18] = header.write_version; + buf[19] = header.read_version; + buf[20] = header.unused_space; + buf[21] = header.max_embed_frac; + buf[22] = header.min_embed_frac; + buf[23] = header.min_leaf_frac; + buf[24..28].copy_from_slice(&header.change_counter.to_be_bytes()); + buf[28..32].copy_from_slice(&header.database_size.to_be_bytes()); + buf[32..36].copy_from_slice(&header.freelist_trunk_page.to_be_bytes()); + buf[36..40].copy_from_slice(&header.freelist_pages.to_be_bytes()); + buf[40..44].copy_from_slice(&header.schema_cookie.to_be_bytes()); + buf[44..48].copy_from_slice(&header.schema_format.to_be_bytes()); + buf[48..52].copy_from_slice(&header.default_cache_size.to_be_bytes()); + + buf[52..56].copy_from_slice(&header.vacuum.to_be_bytes()); + buf[56..60].copy_from_slice(&header.text_encoding.to_be_bytes()); + buf[60..64].copy_from_slice(&header.user_version.to_be_bytes()); + buf[64..68].copy_from_slice(&header.incremental_vacuum.to_be_bytes()); + + buf[68..72].copy_from_slice(&header.application_id.to_be_bytes()); + buf[72..92].copy_from_slice(&header.reserved); + buf[92..96].copy_from_slice(&header.version_valid_for.to_be_bytes()); + buf[96..100].copy_from_slice(&header.version_number.to_be_bytes()); + let mut buffer_to_copy = 
std::cell::RefCell::borrow_mut(&buffer_to_copy_in_cb); + let buffer_to_copy_slice = buffer_to_copy.as_mut_slice(); + + buffer_to_copy_slice.copy_from_slice(buf); + } + }); + + let drop_fn = Rc::new(|_buf| {}); + let buf = Buffer::allocate(512, drop_fn); + let c = Rc::new(Completion::new(buf.clone(), complete)); + page_source.get(1, c.clone())?; + // run get header block + pager.io.run_once()?; + + let buffer_in_cb = buffer_to_copy.clone(); + let write_complete = Box::new(move |bytes_written: usize| { + let buf = buffer_in_cb.clone(); + let buf_len = std::cell::RefCell::borrow(&buf).len(); + if bytes_written < buf_len { + log::error!("wrote({bytes_written}) less than expected({buf_len})"); + } + // finish_read_database_header(buf, header).unwrap(); + }); + let c = Rc::new(WriteCompletion::new(write_complete)); + page_source.write(0, buffer_to_copy.clone(), c).unwrap(); + + Ok(()) +} + #[derive(Debug)] pub struct BTreePageHeader { page_type: PageType, diff --git a/core/storage.rs b/core/storage.rs index 95a1b3257..a9d2fc995 100644 --- a/core/storage.rs +++ b/core/storage.rs @@ -1,13 +1,24 @@ -use crate::io::Completion; #[cfg(feature = "fs")] use crate::io::File; +use crate::{ + io::{Completion, WriteCompletion}, + Buffer, +}; use anyhow::Result; -use std::rc::Rc; +use std::{cell::RefCell, rc::Rc}; pub struct PageSource { io: Rc, } +impl Clone for PageSource { + fn clone(&self) -> Self { + Self { + io: self.io.clone(), + } + } +} + impl PageSource { pub fn from_io(io: Rc) -> Self { Self { io } @@ -23,10 +34,25 @@ impl PageSource { pub fn get(&self, page_idx: usize, c: Rc) -> Result<()> { self.io.get(page_idx, c) } + + pub fn write( + &self, + page_idx: usize, + buffer: Rc>, + c: Rc, + ) -> Result<()> { + self.io.write(page_idx, buffer, c) + } } pub trait PageIO { fn get(&self, page_idx: usize, c: Rc) -> Result<()>; + fn write( + &self, + page_idx: usize, + buffer: Rc>, + c: Rc, + ) -> Result<()>; } #[cfg(feature = "fs")] @@ -46,6 +72,20 @@ impl PageIO for FileStorage 
{ self.file.pread(pos, c)?; Ok(()) } + + fn write( + &self, + page_idx: usize, + buffer: Rc>, + c: Rc, + ) -> Result<()> { + let buffer_size = buffer.borrow().len(); + assert!(buffer_size >= 512); + assert!(buffer_size <= 65536); + assert!((buffer_size & (buffer_size - 1)) == 0); + self.file.pwrite(page_idx, buffer, c)?; + Ok(()) + } } #[cfg(feature = "fs")] diff --git a/core/translate.rs b/core/translate.rs index 5f4aab6c4..7bcfa76de 100644 --- a/core/translate.rs +++ b/core/translate.rs @@ -1,10 +1,10 @@ -use std::borrow::Borrow; - use crate::schema::Schema; use crate::sqlite3_ondisk::DatabaseHeader; use crate::vdbe::{Insn, Program, ProgramBuilder}; use anyhow::Result; -use sqlite3_parser::ast::{Expr, Literal, OneSelect, PragmaBody, QualifiedName, Select, Stmt}; +use sqlite3_parser::ast::{ + Expr, Literal, OneSelect, PragmaBody, QualifiedName, Select, Stmt, UnaryOperator, +}; /// Translate SQL statement into bytecode program. pub fn translate(schema: &Schema, stmt: Stmt, database_header: &DatabaseHeader) -> Result { @@ -248,11 +248,17 @@ fn translate_pragma( (false, 0) } Some(PragmaBody::Equals(value)) => { - let value_to_update = if let Expr::Literal(Literal::Numeric(numeric_value)) = value { - numeric_value.parse::().unwrap() - } else { - // If you put gibberish into a pragma update it turns it into 0 I think - 0 + let value_to_update = match value { + Expr::Literal(Literal::Numeric(numeric_value)) => { + numeric_value.parse::().unwrap() + } + Expr::Unary(UnaryOperator::Negative, expr) => match *expr { + Expr::Literal(Literal::Numeric(numeric_value)) => { + -numeric_value.parse::().unwrap() + } + _ => 0, + }, + _ => 0, }; println!("{:?}", value_to_update); (true, value_to_update) From 1884aab3b87f97b25f024d78dbbe1ff906ca84cd Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 18 Jun 2024 21:23:02 +0200 Subject: [PATCH 3/6] core: resize page cache Abstract page cache with PageCache so that we can call resize inside refcell --- core/lib.rs | 1 + core/pager.rs 
| 38 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index d7f6f7d59..a37955145 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -141,6 +141,7 @@ impl Connection { self.pager.write_database_header(&self.header.borrow()); // update cache size + self.pager.change_page_cache_size(value); } _ => todo!(), } diff --git a/core/pager.rs b/core/pager.rs index a30206ec5..5ef837153 100644 --- a/core/pager.rs +++ b/core/pager.rs @@ -5,6 +5,7 @@ use crate::PageSource; use log::trace; use sieve_cache::SieveCache; use std::cell::RefCell; +use std::hash::Hash; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::RwLock; @@ -72,12 +73,34 @@ impl Page { } } +pub struct PageCache { + cache: SieveCache, +} + +impl PageCache { + pub fn new(cache: SieveCache) -> Self { + Self { cache } + } + + pub fn insert(&mut self, key: K, value: V) { + self.cache.insert(key, value); + } + + pub fn get(&mut self, key: &K) -> Option<&V> { + self.cache.get(key) + } + + pub fn resize(&mut self, capacity: usize) { + self.cache = SieveCache::new(capacity).unwrap(); + } +} + /// The pager interface implements the persistence layer by providing access /// to pages of the database file, including caching, concurrency control, and /// transaction management. 
pub struct Pager { pub page_source: PageSource, - page_cache: RefCell>>, + page_cache: RefCell>>, buffer_pool: Rc, pub io: Rc, } @@ -95,7 +118,7 @@ impl Pager { let db_header = db_header.borrow(); let page_size = db_header.page_size as usize; let buffer_pool = Rc::new(BufferPool::new(page_size)); - let page_cache = RefCell::new(SieveCache::new(10).unwrap()); + let page_cache = RefCell::new(PageCache::new(SieveCache::new(10).unwrap())); Ok(Self { page_source, buffer_pool, @@ -126,4 +149,15 @@ impl Pager { pub fn write_database_header(&self, header: &DatabaseHeader) { sqlite3_ondisk::begin_write_database_header(header, self).expect("failed to write header"); } + + pub fn change_page_cache_size(&self, capacity: i64) { + // Sadly SieveCache is limited. Not resize available and no iterator. + let capacity = if capacity < 0 { + let kb = capacity.abs() * 1024; + kb / 512 // assume 512 page size for now + } else { + capacity + }; + self.page_cache.borrow_mut().resize(capacity as usize); + } } From 7e03cc70d00b90a26c287a3b83535f0462f0c382 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 19 Jun 2024 17:27:26 +0200 Subject: [PATCH 4/6] core: add minimum cache_size --- core/lib.rs | 14 +++++++++++--- core/pager.rs | 11 ++--------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index a37955145..dc1a03695 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -18,7 +18,7 @@ use fallible_iterator::FallibleIterator; use log::trace; use pager::Pager; use schema::Schema; -use sqlite3_ondisk::DatabaseHeader; +use sqlite3_ondisk::{DatabaseHeader, MIN_PAGE_CACHE_SIZE}; use sqlite3_parser::{ast::Cmd, lexer::sql::Parser}; use std::{cell::RefCell, rc::Rc}; use vdbe::ProgramType; @@ -132,8 +132,16 @@ impl Connection { pub fn update_pragma(&self, name: &String, value: i64) { match name.as_str() { "cache_size" => { + let cache_size_unformatted = value; + let cache_size = if cache_size_unformatted < 0 { + let kb = cache_size_unformatted.abs() * 1024; + 
 kb / 512 // assume 512 page size for now + } else { + value + } as usize; + let cache_size = cache_size.clamp(cache_size, MIN_PAGE_CACHE_SIZE); // update in-memory header - self.header.borrow_mut().default_cache_size = value + self.header.borrow_mut().default_cache_size = cache_size_unformatted .try_into() .expect(&format!("invalid value, too big for a i32 {}", value)); @@ -141,7 +149,7 @@ impl Connection { self.pager.write_database_header(&self.header.borrow()); // update cache size - self.pager.change_page_cache_size(value); + self.pager.change_page_cache_size(cache_size); } _ => todo!(), } diff --git a/core/pager.rs b/core/pager.rs index 5ef837153..c8275bb48 100644 --- a/core/pager.rs +++ b/core/pager.rs @@ -150,14 +150,7 @@ impl Pager { sqlite3_ondisk::begin_write_database_header(header, self).expect("failed to write header"); } - pub fn change_page_cache_size(&self, capacity: i64) { - // Sadly SieveCache is limited. Not resize available and no iterator. - let capacity = if capacity < 0 { - let kb = capacity.abs() * 1024; - kb / 512 // assume 512 page size for now - } else { - capacity - }; - self.page_cache.borrow_mut().resize(capacity as usize); + pub fn change_page_cache_size(&self, capacity: usize) { + self.page_cache.borrow_mut().resize(capacity); } } From 932ae7bf3f91230448f3ce3e63318e8809d2c427 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 19 Jun 2024 17:54:09 +0200 Subject: [PATCH 5/6] core: update pragma in translate --- core/lib.rs | 59 +++++++++++++++----------------------- core/sqlite3_ondisk.rs | 4 +++ core/translate.rs | 65 ++++++++++++++++++++++++++++++++++-------- core/vdbe.rs | 15 ---------- 4 files changed, 80 insertions(+), 63 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index dc1a03695..c6a9e962b 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -18,10 +18,9 @@ use fallible_iterator::FallibleIterator; use log::trace; use pager::Pager; use schema::Schema; -use sqlite3_ondisk::{DatabaseHeader, MIN_PAGE_CACHE_SIZE}; +use 
sqlite3_ondisk::DatabaseHeader; use sqlite3_parser::{ast::Cmd, lexer::sql::Parser}; use std::{cell::RefCell, rc::Rc}; -use vdbe::ProgramType; #[cfg(feature = "fs")] pub use io::PlatformIO; @@ -117,7 +116,8 @@ impl Connection { let program = Rc::new(translate::translate( &self.schema, stmt, - &self.header.borrow(), + self.header.clone(), + self.pager.clone(), )?); Ok(Statement::new(program, self.pager.clone())) } @@ -129,32 +129,6 @@ impl Connection { } } - pub fn update_pragma(&self, name: &String, value: i64) { - match name.as_str() { - "cache_size" => { - let cache_size_unformatted = value; - let cache_size = if cache_size_unformatted < 0 { - let kb = cache_size_unformatted.abs() * 1024; - kb / 512 // assume 512 page size for now - } else { - value - } as usize; - let cache_size = cache_size.clamp(cache_size, MIN_PAGE_CACHE_SIZE); - // update in-memory header - self.header.borrow_mut().default_cache_size = cache_size_unformatted - .try_into() - .expect(&format!("invalid value, too big for a i32 {}", value)); - - // update in disk - self.pager.write_database_header(&self.header.borrow()); - - // update cache size - self.pager.change_page_cache_size(cache_size); - } - _ => todo!(), - } - } - pub fn query(&self, sql: impl Into) -> Result> { let sql = sql.into(); trace!("Querying: {}", sql); @@ -166,16 +140,19 @@ impl Connection { let program = Rc::new(translate::translate( &self.schema, stmt, - &self.header.borrow(), + self.header.clone(), + self.pager.clone(), )?); - if let ProgramType::PragmaChange(name, value) = &program.program_type { - self.update_pragma(name, *value); - } let stmt = Statement::new(program, self.pager.clone()); Ok(Some(Rows { stmt })) } Cmd::Explain(stmt) => { - let program = translate::translate(&self.schema, stmt, &self.header.borrow())?; + let program = translate::translate( + &self.schema, + stmt, + self.header.clone(), + self.pager.clone(), + )?; program.explain(); Ok(None) } @@ -193,12 +170,22 @@ impl Connection { if let Some(cmd) = cmd { 
 match cmd { Cmd::Explain(stmt) => { - let program = translate::translate(&self.schema, stmt, &self.header.borrow())?; + let program = translate::translate( + &self.schema, + stmt, + self.header.clone(), + self.pager.clone(), + )?; program.explain(); } Cmd::ExplainQueryPlan(_stmt) => todo!(), Cmd::Stmt(stmt) => { - let program = translate::translate(&self.schema, stmt, &self.header.borrow())?; + let program = translate::translate( + &self.schema, + stmt, + self.header.clone(), + self.pager.clone(), + )?; let mut state = vdbe::ProgramState::new(program.max_registers); program.step(&mut state, self.pager.clone())?; } diff --git a/core/sqlite3_ondisk.rs b/core/sqlite3_ondisk.rs index 8dff25bea..5f9223d24 100644 --- a/core/sqlite3_ondisk.rs +++ b/core/sqlite3_ondisk.rs @@ -35,7 +35,11 @@ use std::rc::Rc; /// The size of the database header in bytes. pub const DATABASE_HEADER_SIZE: usize = 100; +// DEFAULT_CACHE_SIZE negative values mean that we store the number of pages that X KiB of memory can hold. +// We can calculate "real" cache size by dividing by page size. const DEFAULT_CACHE_SIZE: i32 = -2000; +// Minimum number of pages that cache can hold. +pub const MIN_PAGE_CACHE_SIZE: usize = 10; #[derive(Debug, Default, Clone)] pub struct DatabaseHeader { diff --git a/core/translate.rs b/core/translate.rs index 7bcfa76de..182a9297f 100644 --- a/core/translate.rs +++ b/core/translate.rs @@ -1,5 +1,9 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use crate::pager::Pager; use crate::schema::Schema; -use crate::sqlite3_ondisk::DatabaseHeader; +use crate::sqlite3_ondisk::{DatabaseHeader, MIN_PAGE_CACHE_SIZE}; use crate::vdbe::{Insn, Program, ProgramBuilder}; use anyhow::Result; use sqlite3_parser::ast::{ @@ -7,14 +11,55 @@ use sqlite3_parser::ast::{ }; /// Translate SQL statement into bytecode program. 
-pub fn translate(schema: &Schema, stmt: Stmt, database_header: &DatabaseHeader) -> Result { +pub fn translate( + schema: &Schema, + stmt: Stmt, + database_header: Rc>, + pager: Rc, +) -> Result { match stmt { Stmt::Select(select) => translate_select(schema, select), - Stmt::Pragma(name, body) => translate_pragma(&name, body, database_header), + Stmt::Pragma(name, body) => translate_pragma(&name, body, database_header, pager), _ => todo!(), } } +pub fn update_pragma( + name: &String, + value: i64, + header: Rc>, + pager: Rc, +) { + match name.as_str() { + "cache_size" => { + let mut cache_size_unformatted = value; + let mut cache_size = if cache_size_unformatted < 0 { + let kb = cache_size_unformatted.abs() * 1024; + kb / 512 // assume 512 page size for now + } else { + value + } as usize; + if cache_size < MIN_PAGE_CACHE_SIZE { + // update both in memory and stored disk value + cache_size = MIN_PAGE_CACHE_SIZE; + cache_size_unformatted = MIN_PAGE_CACHE_SIZE as i64; + } + + // update in-memory header + header.borrow_mut().default_cache_size = cache_size_unformatted + .try_into() + .expect(&format!("invalid value, too big for a i32 {}", value)); + + // update in disk + let header_copy = header.borrow().clone(); + pager.write_database_header(&header_copy); + + // update cache size + pager.change_page_cache_size(cache_size); + } + _ => todo!(), + } +} fn translate_select(schema: &Schema, select: Select) -> Result { let mut program = ProgramBuilder::new(); let init_offset = program.emit_placeholder(); @@ -226,17 +271,18 @@ fn translate_expr( fn translate_pragma( name: &QualifiedName, body: Option, - database_header: &DatabaseHeader, + database_header: Rc>, + pager: Rc, ) -> Result { let mut program = ProgramBuilder::new(); let init_offset = program.emit_placeholder(); let start_offset = program.offset(); - let (change_pragma, new_value) = match body { + match body { None => { let pragma_result = program.alloc_register(); program.emit_insn(Insn::Integer { - value: 
database_header.default_cache_size.into(), + value: database_header.borrow().default_cache_size.into(), dest: pragma_result, }); @@ -245,7 +291,6 @@ fn translate_pragma( register_start: pragma_result, register_end: pragma_result_end, }); - (false, 0) } Some(PragmaBody::Equals(value)) => { let value_to_update = match value { @@ -260,8 +305,7 @@ fn translate_pragma( }, _ => 0, }; - println!("{:?}", value_to_update); - (true, value_to_update) + update_pragma(&name.name.0, value_to_update, database_header, pager); } Some(PragmaBody::Call(_)) => { todo!() @@ -278,8 +322,5 @@ fn translate_pragma( program.emit_insn(Insn::Goto { target_pc: start_offset, }); - if change_pragma { - return Ok(program.build_pragma_change(name.name.to_string(), new_value)); - } Ok(program.build()) } diff --git a/core/vdbe.rs b/core/vdbe.rs index e439e95eb..6b39a5ed1 100644 --- a/core/vdbe.rs +++ b/core/vdbe.rs @@ -155,15 +155,6 @@ impl ProgramBuilder { Program { max_registers: self.next_free_register, insns: self.insns, - program_type: ProgramType::Default, - } - } - - pub fn build_pragma_change(self, pragma_to_change: String, value: i64) -> Program { - Program { - max_registers: self.next_free_register, - insns: self.insns, - program_type: ProgramType::PragmaChange(pragma_to_change, value), } } } @@ -202,15 +193,9 @@ impl ProgramState { } } -pub enum ProgramType { - Default, - PragmaChange(String, i64), -} - pub struct Program { pub max_registers: usize, pub insns: Vec, - pub program_type: ProgramType, } impl Program { From 427103b199622a41c2cb5ab9d0c28235bd0aeb56 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 19 Jun 2024 20:54:55 +0200 Subject: [PATCH 6/6] core,wasm: add missing write procedure to wasm --- bindings/wasm/lib.rs | 9 +++++++++ core/lib.rs | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index e8c29a360..646e6504e 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -39,4 +39,13 @@ impl 
limbo_core::PageIO for PageIO { fn get(&self, _page_idx: usize, _c: Rc) -> Result<()> { todo!(); } + + fn write( + &self, + _page_idx: usize, + _buffer: Rc>, + _c: Rc, + ) -> Result<()> { + todo!() + } } diff --git a/core/lib.rs b/core/lib.rs index c6a9e962b..91b2084ac 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -24,7 +24,7 @@ use std::{cell::RefCell, rc::Rc}; #[cfg(feature = "fs")] pub use io::PlatformIO; -pub use io::{Buffer, Completion, File, IO}; +pub use io::{Buffer, Completion, File, WriteCompletion, IO}; pub use storage::{PageIO, PageSource}; pub use types::Value;