Single-threaded architecture

Use Rc instead of Arc and replace the concurrent LRU with
single-threaded SIEVE.

Fixes #23
Fixes #29
This commit is contained in:
Pekka Enberg
2024-03-03 11:57:47 +02:00
parent 8f48416de0
commit ed9f3e6d1e
14 changed files with 103 additions and 113 deletions

17
Cargo.lock generated
View File

@@ -289,15 +289,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]]
name = "concurrent_lru"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7feb5cb312f774e8a24540e27206db4e890f7d488563671d24a16389cf4c2e4e"
dependencies = [
"once_cell",
]
[[package]]
name = "cpp_demangle"
version = "0.4.3"
@@ -850,7 +841,6 @@ version = "0.0.0"
dependencies = [
"anyhow",
"cfg_block",
"concurrent_lru",
"criterion",
"fallible-iterator 0.3.0",
"io-uring",
@@ -859,6 +849,7 @@ dependencies = [
"pprof",
"rstest",
"rusqlite",
"sieve-cache",
"sqlite3-parser",
]
@@ -1431,6 +1422,12 @@ dependencies = [
"serde",
]
[[package]]
name = "sieve-cache"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51bf3a9dccf2c079bf1465d449a485c85b36443caf765f2f127bfec28b180f75"
[[package]]
name = "siphasher"
version = "0.3.11"

View File

@@ -1,5 +1,5 @@
use anyhow::Result;
use std::sync::Arc;
use std::rc::Rc;
use wasm_bindgen::prelude::*;
#[wasm_bindgen]
@@ -10,8 +10,8 @@ pub struct Database {
#[wasm_bindgen]
impl Database {
pub fn open(_path: &str) -> Database {
let io = Arc::new(IO {});
let page_source = limbo_core::PageSource::from_io(Arc::new(PageIO {}));
let io = Rc::new(IO {});
let page_source = limbo_core::PageSource::from_io(Rc::new(PageIO {}));
let inner = limbo_core::Database::open(io, page_source).unwrap();
Database { _inner: inner }
}
@@ -32,7 +32,7 @@ impl limbo_core::IO for IO {
pub struct PageIO {}
impl limbo_core::PageIO for PageIO {
fn get(&self, _page_idx: usize, _c: Arc<limbo_core::Completion>) -> Result<()> {
fn get(&self, _page_idx: usize, _c: Rc<limbo_core::Completion>) -> Result<()> {
todo!();
}
}

View File

@@ -2,7 +2,7 @@ use clap::{Parser, ValueEnum};
use cli_table::{Cell, Table};
use limbo_core::{Database, RowResult, Value};
use rustyline::{error::ReadlineError, DefaultEditor};
use std::{path::PathBuf, sync::Arc};
use std::{path::PathBuf, rc::Rc};
#[derive(ValueEnum, Copy, Clone, Debug, PartialEq, Eq)]
enum OutputMode {
@@ -32,7 +32,7 @@ fn main() -> anyhow::Result<()> {
env_logger::init();
let opts = Opts::parse();
let path = opts.database.to_str().unwrap();
let io = Arc::new(limbo_core::PlatformIO::new()?);
let io = Rc::new(limbo_core::PlatformIO::new()?);
let db = Database::open_file(io.clone(), path)?;
let conn = db.connect();
if let Some(sql) = opts.sql {
@@ -69,7 +69,7 @@ fn main() -> anyhow::Result<()> {
}
fn query(
io: Arc<dyn limbo_core::IO>,
io: Rc<dyn limbo_core::IO>,
conn: &limbo_core::Connection,
sql: &str,
output_mode: &OutputMode,

View File

@@ -26,9 +26,9 @@ mimalloc = { version = "*", default-features = false }
[dependencies]
anyhow = "1.0.75"
cfg_block = "0.1.1"
concurrent_lru = "0.2.0"
fallible-iterator = "0.3.0"
log = "0.4.20"
sieve-cache = "0.1.4"
sqlite3-parser = "0.11.0"
[target.'cfg(not(target_family = "windows"))'.dev-dependencies]

View File

@@ -5,16 +5,16 @@ use crate::types::OwnedRecord;
use anyhow::Result;
use std::cell::{Ref, RefCell};
use std::sync::Arc;
use std::rc::Rc;
pub struct MemPage {
parent: Option<Arc<MemPage>>,
parent: Option<Rc<MemPage>>,
page_idx: usize,
cell_idx: RefCell<usize>,
}
impl MemPage {
pub fn new(parent: Option<Arc<MemPage>>, page_idx: usize, cell_idx: usize) -> Self {
pub fn new(parent: Option<Rc<MemPage>>, page_idx: usize, cell_idx: usize) -> Self {
Self {
parent,
page_idx,
@@ -38,15 +38,15 @@ pub enum CursorResult<T> {
}
pub struct Cursor {
pager: Arc<Pager>,
pager: Rc<Pager>,
root_page: usize,
page: RefCell<Option<Arc<MemPage>>>,
page: RefCell<Option<Rc<MemPage>>>,
rowid: RefCell<Option<u64>>,
record: RefCell<Option<OwnedRecord>>,
}
impl Cursor {
pub fn new(pager: Arc<Pager>, root_page: usize) -> Self {
pub fn new(pager: Rc<Pager>, root_page: usize) -> Self {
Self {
pager,
root_page,
@@ -62,16 +62,14 @@ impl Cursor {
pub fn rewind(&mut self) -> Result<CursorResult<()>> {
let mem_page = MemPage::new(None, self.root_page, 0);
self.page.replace(Some(Arc::new(mem_page)));
self.page.replace(Some(Rc::new(mem_page)));
match self.get_next_record()? {
CursorResult::Ok((rowid, next)) => {
self.rowid.replace(rowid);
self.record.replace(next);
Ok(CursorResult::Ok(()))
}
CursorResult::IO => {
Ok(CursorResult::IO)
}
CursorResult::IO => Ok(CursorResult::IO),
}
}
@@ -82,9 +80,7 @@ impl Cursor {
self.record.replace(next);
Ok(CursorResult::Ok(()))
}
CursorResult::IO => {
Ok(CursorResult::IO)
}
CursorResult::IO => Ok(CursorResult::IO),
}
}
@@ -124,7 +120,7 @@ impl Cursor {
match page.header.right_most_pointer {
Some(right_most_pointer) => {
let mem_page = MemPage::new(parent.clone(), right_most_pointer as usize, 0);
self.page.replace(Some(Arc::new(mem_page)));
self.page.replace(Some(Rc::new(mem_page)));
continue;
}
None => match parent {
@@ -147,7 +143,7 @@ impl Cursor {
mem_page.advance();
let mem_page =
MemPage::new(Some(mem_page.clone()), *_left_child_page as usize, 0);
self.page.replace(Some(Arc::new(mem_page)));
self.page.replace(Some(Rc::new(mem_page)));
continue;
}
BTreeCell::TableLeafCell(TableLeafCell { _rowid, _payload }) => {

View File

@@ -1,6 +1,6 @@
use super::{Completion, File, IO};
use anyhow::{Ok, Result};
use std::sync::Arc;
use std::rc::Rc;
use std::cell::RefCell;
use std::io::{Read, Seek};
use log::trace;
@@ -32,7 +32,7 @@ pub struct DarwinFile {
}
impl File for DarwinFile {
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<()> {
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
{

View File

@@ -3,7 +3,6 @@ use anyhow::Result;
use std::cell::RefCell;
use std::os::unix::io::AsRawFd;
use std::rc::Rc;
use std::sync::Arc;
use log::trace;
pub struct LinuxIO {
@@ -36,7 +35,7 @@ impl IO for LinuxIO {
loop {
match ring.completion().next() {
Some(cqe) => {
let c = unsafe { Arc::from_raw(cqe.user_data() as *const Completion) };
let c = unsafe { Rc::from_raw(cqe.user_data() as *const Completion) };
c.complete();
}
None => break,
@@ -52,14 +51,14 @@ pub struct LinuxFile {
}
impl File for LinuxFile {
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<()> {
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
trace!("pread(pos = {}, length = {})", pos, c.buf().len());
let fd = io_uring::types::Fd(self.file.as_raw_fd());
let read_e = {
let mut buf = c.buf_mut();
let len = buf.len();
let buf = buf.as_mut_ptr();
let ptr = Arc::into_raw(c.clone());
let ptr = Rc::into_raw(c.clone());
io_uring::opcode::Read::new(fd, buf, len as u32 )
.offset(pos as u64)
.build()

View File

@@ -4,11 +4,11 @@ use std::{
cell::{Ref, RefCell, RefMut},
mem::ManuallyDrop,
pin::Pin,
sync::Arc,
rc::Rc,
};
pub trait File {
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<()>;
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()>;
}
pub trait IO {
@@ -46,7 +46,7 @@ impl Completion {
pub type BufferData = Pin<Vec<u8>>;
pub type BufferDropFn = Arc<dyn Fn(BufferData)>;
pub type BufferDropFn = Rc<dyn Fn(BufferData)>;
pub struct Buffer {
data: ManuallyDrop<BufferData>,

View File

@@ -1,6 +1,6 @@
use super::{Completion, File, IO};
use anyhow::{Ok, Result};
use std::sync::Arc;
use std::rc::Rc;
use std::cell::RefCell;
use std::io::{Read, Seek};
use log::trace;
@@ -32,7 +32,7 @@ pub struct WindowsFile {
}
impl File for WindowsFile {
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<()> {
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
{

View File

@@ -18,7 +18,7 @@ use fallible_iterator::FallibleIterator;
use pager::Pager;
use schema::Schema;
use sqlite3_parser::{ast::Cmd, lexer::sql::Parser};
use std::sync::Arc;
use std::rc::Rc;
#[cfg(feature = "fs")]
pub use io::PlatformIO;
@@ -27,23 +27,23 @@ pub use storage::{PageIO, PageSource};
pub use types::Value;
pub struct Database {
pager: Arc<Pager>,
schema: Arc<Schema>,
pager: Rc<Pager>,
schema: Rc<Schema>,
}
impl Database {
#[cfg(feature = "fs")]
pub fn open_file(io: Arc<dyn crate::io::IO>, path: &str) -> Result<Database> {
pub fn open_file(io: Rc<dyn crate::io::IO>, path: &str) -> Result<Database> {
let file = io.open_file(path)?;
let storage = storage::PageSource::from_file(file);
Self::open(io, storage)
}
pub fn open(io: Arc<dyn crate::io::IO>, page_source: PageSource) -> Result<Database> {
pub fn open(io: Rc<dyn crate::io::IO>, page_source: PageSource) -> Result<Database> {
let db_header = Pager::begin_open(&page_source)?;
io.run_once()?;
let pager = Arc::new(Pager::finish_open(db_header, page_source)?);
let bootstrap_schema = Arc::new(Schema::new());
let pager = Rc::new(Pager::finish_open(db_header, page_source)?);
let bootstrap_schema = Rc::new(Schema::new());
let conn = Connection {
pager: pager.clone(),
schema: bootstrap_schema.clone(),
@@ -74,7 +74,7 @@ impl Database {
}
}
}
let schema = Arc::new(schema);
let schema = Rc::new(schema);
Ok(Database { pager, schema })
}
@@ -87,8 +87,8 @@ impl Database {
}
pub struct Connection {
pager: Arc<Pager>,
schema: Arc<Schema>,
pager: Rc<Pager>,
schema: Rc<Schema>,
}
impl Connection {
@@ -99,7 +99,7 @@ impl Connection {
if let Some(cmd) = cmd {
match cmd {
Cmd::Stmt(stmt) => {
let program = Arc::new(translate::translate(&self.schema, stmt)?);
let program = Rc::new(translate::translate(&self.schema, stmt)?);
Ok(Statement::new(program, self.pager.clone()))
}
Cmd::Explain(_stmt) => todo!(),
@@ -117,7 +117,7 @@ impl Connection {
if let Some(cmd) = cmd {
match cmd {
Cmd::Stmt(stmt) => {
let program = Arc::new(translate::translate(&self.schema, stmt)?);
let program = Rc::new(translate::translate(&self.schema, stmt)?);
let stmt = Statement::new(program, self.pager.clone());
Ok(Some(Rows { stmt }))
}
@@ -156,13 +156,13 @@ impl Connection {
}
pub struct Statement {
program: Arc<vdbe::Program>,
program: Rc<vdbe::Program>,
state: vdbe::ProgramState,
pager: Arc<Pager>,
pager: Rc<Pager>,
}
impl Statement {
pub fn new(program: Arc<vdbe::Program>, pager: Arc<Pager>) -> Self {
pub fn new(program: Rc<vdbe::Program>, pager: Rc<Pager>) -> Self {
let state = vdbe::ProgramState::new(program.max_registers);
Self {
program,
@@ -174,15 +174,9 @@ impl Statement {
pub fn step(&mut self) -> Result<RowResult<'_>> {
let result = self.program.step(&mut self.state, self.pager.clone())?;
match result {
vdbe::StepResult::Row(row) => {
Ok(RowResult::Row(Row { values: row.values }))
}
vdbe::StepResult::IO => {
Ok(RowResult::IO)
}
vdbe::StepResult::Done => {
Ok(RowResult::Done)
}
vdbe::StepResult::Row(row) => Ok(RowResult::Row(Row { values: row.values })),
vdbe::StepResult::IO => Ok(RowResult::IO),
vdbe::StepResult::Done => Ok(RowResult::Done),
}
}

View File

@@ -2,14 +2,12 @@ use crate::buffer_pool::BufferPool;
use crate::sqlite3_ondisk::BTreePage;
use crate::sqlite3_ondisk::{self, DatabaseHeader};
use crate::PageSource;
use concurrent_lru::unsharded::LruCache;
use log::trace;
use sieve_cache::SieveCache;
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
pub struct Page {
flags: AtomicUsize,
@@ -70,23 +68,23 @@ impl Page {
pub struct Pager {
page_source: PageSource,
page_cache: LruCache<usize, Arc<Page>>,
buffer_pool: Arc<BufferPool>,
page_cache: RefCell<SieveCache<usize, Rc<Page>>>,
buffer_pool: Rc<BufferPool>,
}
impl Pager {
pub fn begin_open(page_source: &PageSource) -> anyhow::Result<Arc<RefCell<DatabaseHeader>>> {
pub fn begin_open(page_source: &PageSource) -> anyhow::Result<Rc<RefCell<DatabaseHeader>>> {
sqlite3_ondisk::begin_read_database_header(page_source)
}
pub fn finish_open(
db_header: Arc<RefCell<DatabaseHeader>>,
db_header: Rc<RefCell<DatabaseHeader>>,
page_source: PageSource,
) -> anyhow::Result<Self> {
let db_header = db_header.borrow();
let page_size = db_header.page_size as usize;
let buffer_pool = Arc::new(BufferPool::new(page_size));
let page_cache = LruCache::new(10);
let buffer_pool = Rc::new(BufferPool::new(page_size));
let page_cache = RefCell::new(SieveCache::new(10).unwrap());
Ok(Self {
page_source,
buffer_pool,
@@ -94,20 +92,28 @@ impl Pager {
})
}
pub fn read_page(&self, page_idx: usize) -> anyhow::Result<Arc<Page>> {
pub fn read_page(&self, page_idx: usize) -> anyhow::Result<Rc<Page>> {
trace!("read_page(page_idx = {})", page_idx);
let mut page_cache = self.page_cache.borrow_mut();
if let Some(page) = page_cache.get(&page_idx) {
return Ok(page.clone());
}
let page = Rc::new(Page::new());
page.set_locked();
sqlite3_ondisk::begin_read_btree_page(
&self.page_source,
self.buffer_pool.clone(),
page.clone(),
page_idx,
)
.unwrap();
page_cache.insert(page_idx, page.clone());
Ok(page)
/*
let handle = self.page_cache.get_or_try_init(page_idx, 1, |_idx| {
let page = Arc::new(Page::new());
page.set_locked();
sqlite3_ondisk::begin_read_btree_page(
&self.page_source,
self.buffer_pool.clone(),
page.clone(),
page_idx,
)
.unwrap();
Ok::<Arc<Page>, anyhow::Error>(page)
Ok::<Rc<Page>, anyhow::Error>(page)
})?;
Ok(handle.value().clone())
*/
}
}

View File

@@ -31,7 +31,7 @@ use crate::PageSource;
use anyhow::{anyhow, Result};
use log::trace;
use std::cell::RefCell;
use std::sync::Arc;
use std::rc::Rc;
/// The size of the database header in bytes.
pub const DATABASE_HEADER_SIZE: usize = 100;
@@ -63,23 +63,21 @@ pub struct DatabaseHeader {
version_number: u32,
}
pub fn begin_read_database_header(
page_source: &PageSource,
) -> Result<Arc<RefCell<DatabaseHeader>>> {
let drop_fn = Arc::new(|_buf| {});
pub fn begin_read_database_header(page_source: &PageSource) -> Result<Rc<RefCell<DatabaseHeader>>> {
let drop_fn = Rc::new(|_buf| {});
let buf = Buffer::allocate(512, drop_fn);
let result = Arc::new(RefCell::new(DatabaseHeader::default()));
let result = Rc::new(RefCell::new(DatabaseHeader::default()));
let header = result.clone();
let complete = Box::new(move |buf: &Buffer| {
let header = header.clone();
finish_read_database_header(buf, header).unwrap();
});
let c = Arc::new(Completion::new(buf, complete));
let c = Rc::new(Completion::new(buf, complete));
page_source.get(1, c.clone())?;
Ok(result)
}
fn finish_read_database_header(buf: &Buffer, header: Arc<RefCell<DatabaseHeader>>) -> Result<()> {
fn finish_read_database_header(buf: &Buffer, header: Rc<RefCell<DatabaseHeader>>) -> Result<()> {
let buf = buf.as_slice();
let mut header = header.borrow_mut();
header.magic.copy_from_slice(&buf[0..16]);
@@ -149,13 +147,13 @@ pub struct BTreePage {
pub fn begin_read_btree_page(
page_source: &PageSource,
buffer_pool: Arc<BufferPool>,
page: Arc<Page>,
buffer_pool: Rc<BufferPool>,
page: Rc<Page>,
page_idx: usize,
) -> Result<()> {
trace!("begin_read_btree_page(page_idx = {})", page_idx);
let buf = buffer_pool.get();
let drop_fn = Arc::new(move |buf| {
let drop_fn = Rc::new(move |buf| {
let buffer_pool = buffer_pool.clone();
buffer_pool.put(buf);
});
@@ -166,12 +164,12 @@ pub fn begin_read_btree_page(
page.set_error();
}
});
let c = Arc::new(Completion::new(buf, complete));
let c = Rc::new(Completion::new(buf, complete));
page_source.get(page_idx, c.clone())?;
Ok(())
}
fn finish_read_btree_page(page_idx: usize, buf: &Buffer, page: Arc<Page>) -> Result<()> {
fn finish_read_btree_page(page_idx: usize, buf: &Buffer, page: Rc<Page>) -> Result<()> {
trace!("finish_read_btree_page(page_idx = {})", page_idx);
let mut pos = if page_idx == 1 {
DATABASE_HEADER_SIZE

View File

@@ -2,31 +2,31 @@ use crate::io::Completion;
#[cfg(feature = "fs")]
use crate::io::File;
use anyhow::Result;
use std::sync::Arc;
use std::rc::Rc;
pub struct PageSource {
io: Arc<dyn PageIO>,
io: Rc<dyn PageIO>,
}
impl PageSource {
pub fn from_io(io: Arc<dyn PageIO>) -> Self {
pub fn from_io(io: Rc<dyn PageIO>) -> Self {
Self { io }
}
#[cfg(feature = "fs")]
pub fn from_file(file: Box<dyn File>) -> Self {
Self {
io: Arc::new(FileStorage::new(file)),
io: Rc::new(FileStorage::new(file)),
}
}
pub fn get(&self, page_idx: usize, c: Arc<Completion>) -> Result<()> {
pub fn get(&self, page_idx: usize, c: Rc<Completion>) -> Result<()> {
self.io.get(page_idx, c)
}
}
pub trait PageIO {
fn get(&self, page_idx: usize, c: Arc<Completion>) -> Result<()>;
fn get(&self, page_idx: usize, c: Rc<Completion>) -> Result<()>;
}
#[cfg(feature = "fs")]
@@ -36,7 +36,7 @@ struct FileStorage {
#[cfg(feature = "fs")]
impl PageIO for FileStorage {
fn get(&self, page_idx: usize, c: Arc<Completion>) -> Result<()> {
fn get(&self, page_idx: usize, c: Rc<Completion>) -> Result<()> {
let page_size = c.buf().len();
assert!(page_idx > 0);
assert!(page_size >= 512);

View File

@@ -5,7 +5,7 @@ use crate::types::{OwnedValue, Record};
use anyhow::Result;
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::rc::Rc;
pub type BranchOffset = usize;
@@ -195,7 +195,7 @@ impl Program {
pub fn step<'a>(
&self,
state: &'a mut ProgramState,
pager: Arc<Pager>,
pager: Rc<Pager>,
) -> Result<StepResult<'a>> {
loop {
let insn = &self.insns[state.pc];