mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-23 11:14:19 +01:00
This is a purely syntactic PR. It doesn't change behavior, just rewrites some loops and removes unneeded parts, like lifetime annotations and references. Mainly because the Clippy and IDE warnings get annoying. Don't worry about the number of commits, I just separated based on type of change. Closes #732
597 lines
18 KiB
Rust
597 lines
18 KiB
Rust
mod error;
|
|
mod ext;
|
|
mod function;
|
|
mod io;
|
|
#[cfg(feature = "json")]
|
|
mod json;
|
|
mod parameters;
|
|
mod pseudo;
|
|
mod result;
|
|
mod schema;
|
|
mod storage;
|
|
mod translate;
|
|
mod types;
|
|
mod util;
|
|
mod vdbe;
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
#[global_allocator]
|
|
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
|
|
|
|
use fallible_iterator::FallibleIterator;
|
|
#[cfg(not(target_family = "wasm"))]
|
|
use libloading::{Library, Symbol};
|
|
#[cfg(not(target_family = "wasm"))]
|
|
use limbo_ext::{ExtensionApi, ExtensionEntryPoint};
|
|
use log::trace;
|
|
use schema::Schema;
|
|
use sqlite3_parser::ast;
|
|
use sqlite3_parser::{ast::Cmd, lexer::sql::Parser};
|
|
use std::cell::Cell;
|
|
use std::collections::HashMap;
|
|
use std::num::NonZero;
|
|
use std::sync::{Arc, OnceLock, RwLock};
|
|
use std::{cell::RefCell, rc::Rc};
|
|
use storage::btree::btree_init_page;
|
|
#[cfg(feature = "fs")]
|
|
use storage::database::FileStorage;
|
|
use storage::page_cache::DumbLruPageCache;
|
|
use storage::pager::allocate_page;
|
|
use storage::sqlite3_ondisk::{DatabaseHeader, DATABASE_HEADER_SIZE};
|
|
pub use storage::wal::WalFile;
|
|
pub use storage::wal::WalFileShared;
|
|
pub use types::Value;
|
|
use util::parse_schema_rows;
|
|
|
|
pub use error::LimboError;
|
|
use translate::select::prepare_select_plan;
|
|
pub type Result<T, E = LimboError> = std::result::Result<T, E>;
|
|
|
|
use crate::translate::optimizer::optimize_plan;
|
|
pub use io::OpenFlags;
|
|
pub use io::PlatformIO;
|
|
#[cfg(all(feature = "fs", target_family = "unix"))]
|
|
pub use io::UnixIO;
|
|
#[cfg(all(feature = "fs", target_os = "linux", feature = "io_uring"))]
|
|
pub use io::UringIO;
|
|
pub use io::{Buffer, Completion, File, MemoryIO, WriteCompletion, IO};
|
|
pub use storage::buffer_pool::BufferPool;
|
|
pub use storage::database::DatabaseStorage;
|
|
pub use storage::pager::Page;
|
|
pub use storage::pager::Pager;
|
|
pub use storage::wal::CheckpointStatus;
|
|
pub use storage::wal::Wal;
|
|
pub static DATABASE_VERSION: OnceLock<String> = OnceLock::new();
|
|
|
|
#[derive(Clone)]
|
|
enum TransactionState {
|
|
Write,
|
|
Read,
|
|
None,
|
|
}
|
|
|
|
pub struct Database {
|
|
pager: Rc<Pager>,
|
|
schema: Rc<RefCell<Schema>>,
|
|
header: Rc<RefCell<DatabaseHeader>>,
|
|
syms: Rc<RefCell<SymbolTable>>,
|
|
// Shared structures of a Database are the parts that are common to multiple threads that might
|
|
// create DB connections.
|
|
_shared_page_cache: Arc<RwLock<DumbLruPageCache>>,
|
|
_shared_wal: Arc<RwLock<WalFileShared>>,
|
|
}
|
|
|
|
impl Database {
|
|
#[cfg(feature = "fs")]
|
|
pub fn open_file(io: Arc<dyn IO>, path: &str) -> Result<Arc<Database>> {
|
|
use storage::wal::WalFileShared;
|
|
|
|
let file = io.open_file(path, OpenFlags::Create, true)?;
|
|
maybe_init_database_file(&file, &io)?;
|
|
let page_io = Rc::new(FileStorage::new(file));
|
|
let wal_path = format!("{}-wal", path);
|
|
let db_header = Pager::begin_open(page_io.clone())?;
|
|
io.run_once()?;
|
|
let page_size = db_header.borrow().page_size;
|
|
let wal_shared = WalFileShared::open_shared(&io, wal_path.as_str(), page_size)?;
|
|
let buffer_pool = Rc::new(BufferPool::new(page_size as usize));
|
|
let wal = Rc::new(RefCell::new(WalFile::new(
|
|
io.clone(),
|
|
db_header.borrow().page_size as usize,
|
|
wal_shared.clone(),
|
|
buffer_pool.clone(),
|
|
)));
|
|
Self::open(io, page_io, wal, wal_shared, buffer_pool)
|
|
}
|
|
|
|
#[allow(clippy::arc_with_non_send_sync)]
|
|
pub fn open(
|
|
io: Arc<dyn IO>,
|
|
page_io: Rc<dyn DatabaseStorage>,
|
|
wal: Rc<RefCell<dyn Wal>>,
|
|
shared_wal: Arc<RwLock<WalFileShared>>,
|
|
buffer_pool: Rc<BufferPool>,
|
|
) -> Result<Arc<Database>> {
|
|
let db_header = Pager::begin_open(page_io.clone())?;
|
|
io.run_once()?;
|
|
DATABASE_VERSION.get_or_init(|| {
|
|
let version = db_header.borrow().version_number;
|
|
version.to_string()
|
|
});
|
|
let _shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(10)));
|
|
let pager = Rc::new(Pager::finish_open(
|
|
db_header.clone(),
|
|
page_io,
|
|
wal,
|
|
io.clone(),
|
|
_shared_page_cache.clone(),
|
|
buffer_pool,
|
|
)?);
|
|
let header = db_header;
|
|
let schema = Rc::new(RefCell::new(Schema::new()));
|
|
let syms = Rc::new(RefCell::new(SymbolTable::new()));
|
|
let db = Database {
|
|
pager: pager.clone(),
|
|
schema: schema.clone(),
|
|
header: header.clone(),
|
|
_shared_page_cache: _shared_page_cache.clone(),
|
|
_shared_wal: shared_wal.clone(),
|
|
syms,
|
|
};
|
|
let db = Arc::new(db);
|
|
let conn = Rc::new(Connection {
|
|
db: db.clone(),
|
|
pager,
|
|
schema: schema.clone(),
|
|
header,
|
|
transaction_state: RefCell::new(TransactionState::None),
|
|
last_insert_rowid: Cell::new(0),
|
|
last_change: Cell::new(0),
|
|
total_changes: Cell::new(0),
|
|
});
|
|
let rows = conn.query("SELECT * FROM sqlite_schema")?;
|
|
let mut schema = schema.borrow_mut();
|
|
parse_schema_rows(rows, &mut schema, io)?;
|
|
Ok(db)
|
|
}
|
|
|
|
pub fn connect(self: &Arc<Database>) -> Rc<Connection> {
|
|
Rc::new(Connection {
|
|
db: self.clone(),
|
|
pager: self.pager.clone(),
|
|
schema: self.schema.clone(),
|
|
header: self.header.clone(),
|
|
last_insert_rowid: Cell::new(0),
|
|
transaction_state: RefCell::new(TransactionState::None),
|
|
last_change: Cell::new(0),
|
|
total_changes: Cell::new(0),
|
|
})
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
pub fn load_extension<P: AsRef<std::ffi::OsStr>>(&self, path: P) -> Result<()> {
|
|
let api = Box::new(self.build_limbo_ext());
|
|
let lib =
|
|
unsafe { Library::new(path).map_err(|e| LimboError::ExtensionError(e.to_string()))? };
|
|
let entry: Symbol<ExtensionEntryPoint> = unsafe {
|
|
lib.get(b"register_extension")
|
|
.map_err(|e| LimboError::ExtensionError(e.to_string()))?
|
|
};
|
|
let api_ptr: *const ExtensionApi = Box::into_raw(api);
|
|
let result_code = unsafe { entry(api_ptr) };
|
|
if result_code.is_ok() {
|
|
self.syms.borrow_mut().extensions.push((lib, api_ptr));
|
|
Ok(())
|
|
} else {
|
|
if !api_ptr.is_null() {
|
|
let _ = unsafe { Box::from_raw(api_ptr.cast_mut()) };
|
|
}
|
|
Err(LimboError::ExtensionError(
|
|
"Extension registration failed".to_string(),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn maybe_init_database_file(file: &Rc<dyn File>, io: &Arc<dyn IO>) -> Result<()> {
|
|
if file.size()? == 0 {
|
|
// init db
|
|
let db_header = DatabaseHeader::default();
|
|
let page1 = allocate_page(
|
|
1,
|
|
&Rc::new(BufferPool::new(db_header.page_size as usize)),
|
|
DATABASE_HEADER_SIZE,
|
|
);
|
|
{
|
|
// Create the sqlite_schema table, for this we just need to create the btree page
|
|
// for the first page of the database which is basically like any other btree page
|
|
// but with a 100 byte offset, so we just init the page so that sqlite understands
|
|
// this is a correct page.
|
|
btree_init_page(
|
|
&page1,
|
|
storage::sqlite3_ondisk::PageType::TableLeaf,
|
|
&db_header,
|
|
DATABASE_HEADER_SIZE,
|
|
);
|
|
|
|
let contents = page1.get().contents.as_mut().unwrap();
|
|
contents.write_database_header(&db_header);
|
|
// write the first page to disk synchronously
|
|
let flag_complete = Rc::new(RefCell::new(false));
|
|
{
|
|
let flag_complete = flag_complete.clone();
|
|
let completion = Completion::Write(WriteCompletion::new(Box::new(move |_| {
|
|
*flag_complete.borrow_mut() = true;
|
|
})));
|
|
file.pwrite(0, contents.buffer.clone(), Rc::new(completion))?;
|
|
}
|
|
let mut limit = 100;
|
|
loop {
|
|
io.run_once()?;
|
|
if *flag_complete.borrow() {
|
|
break;
|
|
}
|
|
limit -= 1;
|
|
if limit == 0 {
|
|
panic!("Database file couldn't be initialized, io loop run for {} iterations and write didn't finish", limit);
|
|
}
|
|
}
|
|
}
|
|
};
|
|
Ok(())
|
|
}
|
|
|
|
pub struct Connection {
|
|
db: Arc<Database>,
|
|
pager: Rc<Pager>,
|
|
schema: Rc<RefCell<Schema>>,
|
|
header: Rc<RefCell<DatabaseHeader>>,
|
|
transaction_state: RefCell<TransactionState>,
|
|
last_insert_rowid: Cell<u64>,
|
|
last_change: Cell<i64>,
|
|
total_changes: Cell<i64>,
|
|
}
|
|
|
|
impl Connection {
|
|
pub fn prepare(self: &Rc<Connection>, sql: impl Into<String>) -> Result<Statement> {
|
|
let sql = sql.into();
|
|
trace!("Preparing: {}", sql);
|
|
let db = self.db.clone();
|
|
let syms: &SymbolTable = &db.syms.borrow();
|
|
let mut parser = Parser::new(sql.as_bytes());
|
|
let cmd = parser.next()?;
|
|
if let Some(cmd) = cmd {
|
|
match cmd {
|
|
Cmd::Stmt(stmt) => {
|
|
let program = Rc::new(translate::translate(
|
|
&self.schema.borrow(),
|
|
stmt,
|
|
self.header.clone(),
|
|
self.pager.clone(),
|
|
Rc::downgrade(self),
|
|
syms,
|
|
)?);
|
|
Ok(Statement::new(program, self.pager.clone()))
|
|
}
|
|
Cmd::Explain(_stmt) => todo!(),
|
|
Cmd::ExplainQueryPlan(_stmt) => todo!(),
|
|
}
|
|
} else {
|
|
todo!()
|
|
}
|
|
}
|
|
|
|
pub fn query(self: &Rc<Connection>, sql: impl Into<String>) -> Result<Option<Rows>> {
|
|
let sql = sql.into();
|
|
trace!("Querying: {}", sql);
|
|
let mut parser = Parser::new(sql.as_bytes());
|
|
let cmd = parser.next()?;
|
|
match cmd {
|
|
Some(cmd) => self.run_cmd(cmd),
|
|
None => Ok(None),
|
|
}
|
|
}
|
|
|
|
pub(crate) fn run_cmd(self: &Rc<Connection>, cmd: Cmd) -> Result<Option<Rows>> {
|
|
let db = self.db.clone();
|
|
let syms: &SymbolTable = &db.syms.borrow();
|
|
|
|
match cmd {
|
|
Cmd::Stmt(stmt) => {
|
|
let program = Rc::new(translate::translate(
|
|
&self.schema.borrow(),
|
|
stmt,
|
|
self.header.clone(),
|
|
self.pager.clone(),
|
|
Rc::downgrade(self),
|
|
syms,
|
|
)?);
|
|
let stmt = Statement::new(program, self.pager.clone());
|
|
Ok(Some(Rows { stmt }))
|
|
}
|
|
Cmd::Explain(stmt) => {
|
|
let program = translate::translate(
|
|
&self.schema.borrow(),
|
|
stmt,
|
|
self.header.clone(),
|
|
self.pager.clone(),
|
|
Rc::downgrade(self),
|
|
syms,
|
|
)?;
|
|
program.explain();
|
|
Ok(None)
|
|
}
|
|
Cmd::ExplainQueryPlan(stmt) => {
|
|
match stmt {
|
|
ast::Stmt::Select(select) => {
|
|
let mut plan = prepare_select_plan(
|
|
&self.schema.borrow(),
|
|
*select,
|
|
&self.db.syms.borrow(),
|
|
)?;
|
|
optimize_plan(&mut plan)?;
|
|
println!("{}", plan);
|
|
}
|
|
_ => todo!(),
|
|
}
|
|
Ok(None)
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn query_runner<'a>(self: &'a Rc<Connection>, sql: &'a [u8]) -> QueryRunner<'a> {
|
|
QueryRunner::new(self, sql)
|
|
}
|
|
|
|
pub fn execute(self: &Rc<Connection>, sql: impl Into<String>) -> Result<()> {
|
|
let sql = sql.into();
|
|
let db = self.db.clone();
|
|
let syms: &SymbolTable = &db.syms.borrow();
|
|
let mut parser = Parser::new(sql.as_bytes());
|
|
let cmd = parser.next()?;
|
|
if let Some(cmd) = cmd {
|
|
match cmd {
|
|
Cmd::Explain(stmt) => {
|
|
let program = translate::translate(
|
|
&self.schema.borrow(),
|
|
stmt,
|
|
self.header.clone(),
|
|
self.pager.clone(),
|
|
Rc::downgrade(self),
|
|
syms,
|
|
)?;
|
|
program.explain();
|
|
}
|
|
Cmd::ExplainQueryPlan(_stmt) => todo!(),
|
|
Cmd::Stmt(stmt) => {
|
|
let program = translate::translate(
|
|
&self.schema.borrow(),
|
|
stmt,
|
|
self.header.clone(),
|
|
self.pager.clone(),
|
|
Rc::downgrade(self),
|
|
syms,
|
|
)?;
|
|
|
|
let mut state = vdbe::ProgramState::new(program.max_registers);
|
|
program.step(&mut state, self.pager.clone())?;
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
pub fn cacheflush(&self) -> Result<CheckpointStatus> {
|
|
self.pager.cacheflush()
|
|
}
|
|
|
|
pub fn clear_page_cache(&self) -> Result<()> {
|
|
self.pager.clear_page_cache();
|
|
Ok(())
|
|
}
|
|
|
|
pub fn checkpoint(&self) -> Result<()> {
|
|
self.pager.clear_page_cache();
|
|
Ok(())
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
pub fn load_extension<P: AsRef<std::ffi::OsStr>>(&self, path: P) -> Result<()> {
|
|
Database::load_extension(self.db.as_ref(), path)
|
|
}
|
|
|
|
/// Close a connection and checkpoint.
|
|
pub fn close(&self) -> Result<()> {
|
|
loop {
|
|
// TODO: make this async?
|
|
match self.pager.checkpoint()? {
|
|
CheckpointStatus::Done => {
|
|
return Ok(());
|
|
}
|
|
CheckpointStatus::IO => {
|
|
self.pager.io.run_once()?;
|
|
}
|
|
};
|
|
}
|
|
}
|
|
|
|
pub fn last_insert_rowid(&self) -> u64 {
|
|
self.last_insert_rowid.get()
|
|
}
|
|
|
|
fn update_last_rowid(&self, rowid: u64) {
|
|
self.last_insert_rowid.set(rowid);
|
|
}
|
|
}
|
|
|
|
pub struct Statement {
|
|
program: Rc<vdbe::Program>,
|
|
state: vdbe::ProgramState,
|
|
pager: Rc<Pager>,
|
|
}
|
|
|
|
impl Statement {
|
|
pub fn new(program: Rc<vdbe::Program>, pager: Rc<Pager>) -> Self {
|
|
let state = vdbe::ProgramState::new(program.max_registers);
|
|
Self {
|
|
program,
|
|
state,
|
|
pager,
|
|
}
|
|
}
|
|
|
|
pub fn interrupt(&mut self) {
|
|
self.state.interrupt();
|
|
}
|
|
|
|
pub fn step(&mut self) -> Result<StepResult<'_>> {
|
|
let result = self.program.step(&mut self.state, self.pager.clone())?;
|
|
match result {
|
|
vdbe::StepResult::Row(row) => Ok(StepResult::Row(Row { values: row.values })),
|
|
vdbe::StepResult::IO => Ok(StepResult::IO),
|
|
vdbe::StepResult::Done => Ok(StepResult::Done),
|
|
vdbe::StepResult::Interrupt => Ok(StepResult::Interrupt),
|
|
vdbe::StepResult::Busy => Ok(StepResult::Busy),
|
|
}
|
|
}
|
|
|
|
pub fn query(&mut self) -> Result<Rows> {
|
|
let stmt = Statement::new(self.program.clone(), self.pager.clone());
|
|
Ok(Rows::new(stmt))
|
|
}
|
|
|
|
pub fn parameters(&self) -> ¶meters::Parameters {
|
|
&self.program.parameters
|
|
}
|
|
|
|
pub fn bind_at(&mut self, index: NonZero<usize>, value: Value) {
|
|
self.state.bind_at(index, value.into());
|
|
}
|
|
|
|
pub fn reset(&mut self) {
|
|
let state = vdbe::ProgramState::new(self.program.max_registers);
|
|
self.state = state
|
|
}
|
|
}
|
|
|
|
#[derive(PartialEq)]
|
|
pub enum StepResult<'a> {
|
|
Row(Row<'a>),
|
|
IO,
|
|
Done,
|
|
Interrupt,
|
|
Busy,
|
|
}
|
|
|
|
#[derive(PartialEq)]
|
|
pub struct Row<'a> {
|
|
pub values: Vec<Value<'a>>,
|
|
}
|
|
|
|
impl<'a> Row<'a> {
|
|
pub fn get<T: types::FromValue<'a> + 'a>(&self, idx: usize) -> Result<T> {
|
|
let value = &self.values[idx];
|
|
T::from_value(value)
|
|
}
|
|
}
|
|
|
|
pub struct Rows {
|
|
stmt: Statement,
|
|
}
|
|
|
|
impl Rows {
|
|
pub fn new(stmt: Statement) -> Self {
|
|
Self { stmt }
|
|
}
|
|
|
|
pub fn next_row(&mut self) -> Result<StepResult<'_>> {
|
|
self.stmt.step()
|
|
}
|
|
}
|
|
|
|
pub(crate) struct SymbolTable {
|
|
pub functions: HashMap<String, Rc<function::ExternalFunc>>,
|
|
#[cfg(not(target_family = "wasm"))]
|
|
extensions: Vec<(Library, *const ExtensionApi)>,
|
|
}
|
|
|
|
impl std::fmt::Debug for SymbolTable {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
f.debug_struct("SymbolTable")
|
|
.field("functions", &self.functions)
|
|
.finish()
|
|
}
|
|
}
|
|
|
|
fn is_shared_library(path: &std::path::Path) -> bool {
|
|
path.extension()
|
|
.map_or(false, |ext| ext == "so" || ext == "dylib" || ext == "dll")
|
|
}
|
|
|
|
pub fn resolve_ext_path(extpath: &str) -> Result<std::path::PathBuf> {
|
|
let path = std::path::Path::new(extpath);
|
|
if !path.exists() {
|
|
if is_shared_library(path) {
|
|
return Err(LimboError::ExtensionError(format!(
|
|
"Extension file not found: {}",
|
|
extpath
|
|
)));
|
|
};
|
|
let maybe = path.with_extension(std::env::consts::DLL_EXTENSION);
|
|
maybe
|
|
.exists()
|
|
.then_some(maybe)
|
|
.ok_or(LimboError::ExtensionError(format!(
|
|
"Extension file not found: {}",
|
|
extpath
|
|
)))
|
|
} else {
|
|
Ok(path.to_path_buf())
|
|
}
|
|
}
|
|
|
|
impl SymbolTable {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
functions: HashMap::new(),
|
|
// TODO: wasm libs will be very different
|
|
#[cfg(not(target_family = "wasm"))]
|
|
extensions: Vec::new(),
|
|
}
|
|
}
|
|
|
|
pub fn resolve_function(
|
|
&self,
|
|
name: &str,
|
|
_arg_count: usize,
|
|
) -> Option<Rc<function::ExternalFunc>> {
|
|
self.functions.get(name).cloned()
|
|
}
|
|
}
|
|
|
|
pub struct QueryRunner<'a> {
|
|
parser: Parser<'a>,
|
|
conn: &'a Rc<Connection>,
|
|
}
|
|
|
|
impl<'a> QueryRunner<'a> {
|
|
pub(crate) fn new(conn: &'a Rc<Connection>, statements: &'a [u8]) -> Self {
|
|
Self {
|
|
parser: Parser::new(statements),
|
|
conn,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Iterator for QueryRunner<'_> {
|
|
type Item = Result<Option<Rows>>;
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
match self.parser.next() {
|
|
Ok(Some(cmd)) => Some(self.conn.run_cmd(cmd)),
|
|
Ok(None) => None,
|
|
Err(err) => Some(Err(LimboError::from(err))),
|
|
}
|
|
}
|
|
}
|