mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-16 05:24:22 +01:00
Merge 'Add support for OpenEphemeral bytecode' from Diego Reis
First ~actual~ step to close #741, this time I'm following an approach way closer to what SQLite does by just creating a normal `BTreeCursor` and using an in-memory pager (thanks @krishvishal for the insight 🤝). Reviewed-by: Pere Diaz Bou <pere-altea@homail.com> Closes #1139
This commit is contained in:
@@ -176,4 +176,8 @@ impl limbo_core::IO for IO {
|
||||
fn generate_random_number(&self) -> i64 {
|
||||
todo!();
|
||||
}
|
||||
|
||||
fn get_memory_io(&self) -> Arc<limbo_core::MemoryIO> {
|
||||
Arc::new(limbo_core::MemoryIO::new())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -305,6 +305,10 @@ impl limbo_core::IO for PlatformIO {
|
||||
let random_f64 = Math_random();
|
||||
(random_f64 * i64::MAX as f64) as i64
|
||||
}
|
||||
|
||||
fn get_memory_io(&self) -> Arc<limbo_core::MemoryIO> {
|
||||
Arc::new(limbo_core::MemoryIO::new())
|
||||
}
|
||||
}
|
||||
|
||||
#[wasm_bindgen]
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use super::MemoryIO;
|
||||
use crate::{Clock, Completion, File, Instant, LimboError, OpenFlags, Result, IO};
|
||||
use std::cell::RefCell;
|
||||
use std::io::{Read, Seek, Write};
|
||||
@@ -26,6 +27,7 @@ impl IO for GenericIO {
|
||||
.open(path)?;
|
||||
Ok(Arc::new(GenericFile {
|
||||
file: RefCell::new(file),
|
||||
memory_io: Arc::new(MemoryIO::new()),
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -38,6 +40,10 @@ impl IO for GenericIO {
|
||||
getrandom::getrandom(&mut buf).unwrap();
|
||||
i64::from_ne_bytes(buf)
|
||||
}
|
||||
|
||||
fn get_memory_io(&self) -> Arc<MemoryIO> {
|
||||
Arc::new(MemoryIO::new())
|
||||
}
|
||||
}
|
||||
|
||||
impl Clock for GenericIO {
|
||||
@@ -52,6 +58,7 @@ impl Clock for GenericIO {
|
||||
|
||||
pub struct GenericFile {
|
||||
file: RefCell<std::fs::File>,
|
||||
memory_io: Arc<MemoryIO>,
|
||||
}
|
||||
|
||||
unsafe impl Send for GenericFile {}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use super::{common, Completion, File, OpenFlags, WriteCompletion, IO};
|
||||
use crate::{LimboError, Result};
|
||||
use crate::io::clock::{Clock, Instant};
|
||||
use crate::{LimboError, MemoryIO, Result};
|
||||
use rustix::fs::{self, FlockOperation, OFlags};
|
||||
use rustix::io_uring::iovec;
|
||||
use std::cell::RefCell;
|
||||
@@ -11,7 +12,6 @@ use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use thiserror::Error;
|
||||
use tracing::{debug, trace};
|
||||
use crate::io::clock::{Clock, Instant};
|
||||
|
||||
const MAX_IOVECS: u32 = 128;
|
||||
const SQPOLL_IDLE: u32 = 1000;
|
||||
@@ -197,6 +197,10 @@ impl IO for UringIO {
|
||||
getrandom::getrandom(&mut buf).unwrap();
|
||||
i64::from_ne_bytes(buf)
|
||||
}
|
||||
|
||||
fn get_memory_io(&self) -> Arc<MemoryIO> {
|
||||
Arc::new(MemoryIO::new())
|
||||
}
|
||||
}
|
||||
|
||||
impl Clock for UringIO {
|
||||
|
||||
@@ -58,6 +58,10 @@ impl IO for MemoryIO {
|
||||
getrandom::getrandom(&mut buf).unwrap();
|
||||
i64::from_ne_bytes(buf)
|
||||
}
|
||||
|
||||
fn get_memory_io(&self) -> Arc<MemoryIO> {
|
||||
Arc::new(MemoryIO::new())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MemoryFile {
|
||||
|
||||
@@ -40,6 +40,8 @@ pub trait IO: Clock + Send + Sync {
|
||||
fn run_once(&self) -> Result<()>;
|
||||
|
||||
fn generate_random_number(&self) -> i64;
|
||||
|
||||
fn get_memory_io(&self) -> Arc<MemoryIO>;
|
||||
}
|
||||
|
||||
pub type Complete = dyn Fn(Arc<RefCell<Buffer>>);
|
||||
|
||||
@@ -2,7 +2,7 @@ use crate::error::LimboError;
|
||||
use crate::io::common;
|
||||
use crate::Result;
|
||||
|
||||
use super::{Completion, File, OpenFlags, IO};
|
||||
use super::{Completion, File, MemoryIO, OpenFlags, IO};
|
||||
use polling::{Event, Events, Poller};
|
||||
use rustix::{
|
||||
fd::{AsFd, AsRawFd},
|
||||
@@ -258,6 +258,10 @@ impl IO for UnixIO {
|
||||
getrandom::getrandom(&mut buf).unwrap();
|
||||
i64::from_ne_bytes(buf)
|
||||
}
|
||||
|
||||
fn get_memory_io(&self) -> Arc<MemoryIO> {
|
||||
Arc::new(MemoryIO::new())
|
||||
}
|
||||
}
|
||||
|
||||
enum CompletionCallback {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use super::{Buffer, Completion, File, OpenFlags, IO};
|
||||
use super::{Buffer, Completion, File, MemoryIO, OpenFlags, IO};
|
||||
use crate::ext::VfsMod;
|
||||
use crate::io::clock::{Clock, Instant};
|
||||
use crate::{LimboError, Result};
|
||||
@@ -50,6 +50,10 @@ impl IO for VfsMod {
|
||||
let vfs = unsafe { &*self.ctx };
|
||||
unsafe { (vfs.gen_random_number)() }
|
||||
}
|
||||
|
||||
fn get_memory_io(&self) -> Arc<MemoryIO> {
|
||||
Arc::new(MemoryIO::new())
|
||||
}
|
||||
}
|
||||
|
||||
impl VfsMod {
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use super::MemoryIO;
|
||||
use crate::{Clock, Completion, File, Instant, LimboError, OpenFlags, Result, IO};
|
||||
use std::cell::RefCell;
|
||||
use std::io::{Read, Seek, Write};
|
||||
use std::sync::Arc;
|
||||
use tracing::{debug, trace};
|
||||
|
||||
pub struct WindowsIO {}
|
||||
|
||||
impl WindowsIO {
|
||||
@@ -38,6 +38,10 @@ impl IO for WindowsIO {
|
||||
getrandom::getrandom(&mut buf).unwrap();
|
||||
i64::from_ne_bytes(buf)
|
||||
}
|
||||
|
||||
fn get_memory_io(&self) -> Arc<MemoryIO> {
|
||||
Arc::new(MemoryIO::new())
|
||||
}
|
||||
}
|
||||
|
||||
impl Clock for WindowsIO {
|
||||
|
||||
@@ -183,7 +183,7 @@ impl Database {
|
||||
let pager = Rc::new(Pager::finish_open(
|
||||
self.header.clone(),
|
||||
self.db_file.clone(),
|
||||
wal,
|
||||
Some(wal),
|
||||
self.io.clone(),
|
||||
self.shared_page_cache.clone(),
|
||||
buffer_pool,
|
||||
|
||||
@@ -5012,7 +5012,7 @@ mod tests {
|
||||
let page_cache = Arc::new(parking_lot::RwLock::new(DumbLruPageCache::new(10)));
|
||||
let pager = {
|
||||
let db_header = Arc::new(SpinLock::new(db_header.clone()));
|
||||
Pager::finish_open(db_header, db_file, wal, io, page_cache, buffer_pool).unwrap()
|
||||
Pager::finish_open(db_header, db_file, Some(wal), io, page_cache, buffer_pool).unwrap()
|
||||
};
|
||||
let pager = Rc::new(pager);
|
||||
let page1 = pager.allocate_page().unwrap();
|
||||
@@ -5329,7 +5329,7 @@ mod tests {
|
||||
Pager::finish_open(
|
||||
db_header.clone(),
|
||||
db_file,
|
||||
wal,
|
||||
Some(wal),
|
||||
io,
|
||||
Arc::new(parking_lot::RwLock::new(DumbLruPageCache::new(10))),
|
||||
buffer_pool,
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
#[cfg(feature = "fs")]
|
||||
use crate::error::LimboError;
|
||||
use crate::{io::Completion, Buffer, Result};
|
||||
use std::{cell::RefCell, sync::Arc};
|
||||
@@ -70,3 +69,52 @@ impl DatabaseFile {
|
||||
Self { file }
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FileMemoryStorage {
|
||||
file: Arc<dyn crate::io::File>,
|
||||
}
|
||||
|
||||
unsafe impl Send for FileMemoryStorage {}
|
||||
unsafe impl Sync for FileMemoryStorage {}
|
||||
|
||||
impl DatabaseStorage for FileMemoryStorage {
|
||||
fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> {
|
||||
let r = match c {
|
||||
Completion::Read(ref r) => r,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let size = r.buf().len();
|
||||
assert!(page_idx > 0);
|
||||
if !(512..=65536).contains(&size) || size & (size - 1) != 0 {
|
||||
return Err(LimboError::NotADB);
|
||||
}
|
||||
let pos = (page_idx - 1) * size;
|
||||
self.file.pread(pos, c)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_page(
|
||||
&self,
|
||||
page_idx: usize,
|
||||
buffer: Arc<RefCell<Buffer>>,
|
||||
c: Completion,
|
||||
) -> Result<()> {
|
||||
let buffer_size = buffer.borrow().len();
|
||||
assert!(buffer_size >= 512);
|
||||
assert!(buffer_size <= 65536);
|
||||
assert_eq!(buffer_size & (buffer_size - 1), 0);
|
||||
let pos = (page_idx - 1) * buffer_size;
|
||||
self.file.pwrite(pos, buffer, c)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sync(&self, c: Completion) -> Result<()> {
|
||||
self.file.sync(c)
|
||||
}
|
||||
}
|
||||
|
||||
impl FileMemoryStorage {
|
||||
pub fn new(file: Arc<dyn crate::io::File>) -> Self {
|
||||
Self { file }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -157,7 +157,7 @@ pub struct Pager {
|
||||
/// Source of the database pages.
|
||||
pub db_file: Arc<dyn DatabaseStorage>,
|
||||
/// The write-ahead log (WAL) for the database.
|
||||
wal: Rc<RefCell<dyn Wal>>,
|
||||
wal: Option<Rc<RefCell<dyn Wal>>>,
|
||||
/// A page cache for the database.
|
||||
page_cache: Arc<RwLock<DumbLruPageCache>>,
|
||||
/// Buffer pool for temporary data storage.
|
||||
@@ -183,7 +183,7 @@ impl Pager {
|
||||
pub fn finish_open(
|
||||
db_header_ref: Arc<SpinLock<DatabaseHeader>>,
|
||||
db_file: Arc<dyn DatabaseStorage>,
|
||||
wal: Rc<RefCell<dyn Wal>>,
|
||||
wal: Option<Rc<RefCell<dyn Wal>>>,
|
||||
io: Arc<dyn crate::io::IO>,
|
||||
page_cache: Arc<RwLock<DumbLruPageCache>>,
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
@@ -241,28 +241,42 @@ impl Pager {
|
||||
|
||||
#[inline(always)]
|
||||
pub fn begin_read_tx(&self) -> Result<LimboResult> {
|
||||
self.wal.borrow_mut().begin_read_tx()
|
||||
if let Some(wal) = &self.wal {
|
||||
return wal.borrow_mut().begin_read_tx();
|
||||
}
|
||||
|
||||
Ok(LimboResult::Ok)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn begin_write_tx(&self) -> Result<LimboResult> {
|
||||
self.wal.borrow_mut().begin_write_tx()
|
||||
if let Some(wal) = &self.wal {
|
||||
return wal.borrow_mut().begin_write_tx();
|
||||
}
|
||||
|
||||
Ok(LimboResult::Ok)
|
||||
}
|
||||
|
||||
pub fn end_tx(&self) -> Result<CheckpointStatus> {
|
||||
let checkpoint_status = self.cacheflush()?;
|
||||
match checkpoint_status {
|
||||
CheckpointStatus::IO => Ok(checkpoint_status),
|
||||
CheckpointStatus::Done(_) => {
|
||||
self.wal.borrow().end_write_tx()?;
|
||||
self.wal.borrow().end_read_tx()?;
|
||||
Ok(checkpoint_status)
|
||||
}
|
||||
if let Some(wal) = &self.wal {
|
||||
let checkpoint_status = self.cacheflush()?;
|
||||
return match checkpoint_status {
|
||||
CheckpointStatus::IO => Ok(checkpoint_status),
|
||||
CheckpointStatus::Done(_) => {
|
||||
wal.borrow().end_write_tx()?;
|
||||
wal.borrow().end_read_tx()?;
|
||||
Ok(checkpoint_status)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Ok(CheckpointStatus::Done(CheckpointResult::default()))
|
||||
}
|
||||
|
||||
pub fn end_read_tx(&self) -> Result<()> {
|
||||
self.wal.borrow().end_read_tx()?;
|
||||
if let Some(wal) = &self.wal {
|
||||
wal.borrow().end_read_tx()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -270,7 +284,11 @@ impl Pager {
|
||||
pub fn read_page(&self, page_idx: usize) -> Result<PageRef> {
|
||||
tracing::trace!("read_page(page_idx = {})", page_idx);
|
||||
let mut page_cache = self.page_cache.write();
|
||||
let page_key = PageCacheKey::new(page_idx, Some(self.wal.borrow().get_max_frame()));
|
||||
let max_frame = match &self.wal {
|
||||
Some(wal) => wal.borrow().get_max_frame(),
|
||||
None => 0,
|
||||
};
|
||||
let page_key = PageCacheKey::new(page_idx, Some(max_frame));
|
||||
if let Some(page) = page_cache.get(&page_key) {
|
||||
tracing::trace!("read_page(page_idx = {}) = cached", page_idx);
|
||||
return Ok(page.clone());
|
||||
@@ -278,17 +296,18 @@ impl Pager {
|
||||
let page = Arc::new(Page::new(page_idx));
|
||||
page.set_locked();
|
||||
|
||||
if let Some(frame_id) = self.wal.borrow().find_frame(page_idx as u64)? {
|
||||
self.wal
|
||||
.borrow()
|
||||
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
|
||||
{
|
||||
page.set_uptodate();
|
||||
if let Some(wal) = &self.wal {
|
||||
if let Some(frame_id) = wal.borrow().find_frame(page_idx as u64)? {
|
||||
wal.borrow()
|
||||
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
|
||||
{
|
||||
page.set_uptodate();
|
||||
}
|
||||
// TODO(pere) ensure page is inserted, we should probably first insert to page cache
|
||||
// and if successful, read frame or page
|
||||
page_cache.insert(page_key, page.clone());
|
||||
return Ok(page);
|
||||
}
|
||||
// TODO(pere) ensure page is inserted, we should probably first insert to page cache
|
||||
// and if successful, read frame or page
|
||||
page_cache.insert(page_key, page.clone());
|
||||
return Ok(page);
|
||||
}
|
||||
sqlite3_ondisk::begin_read_page(
|
||||
self.db_file.clone(),
|
||||
@@ -307,19 +326,29 @@ impl Pager {
|
||||
trace!("load_page(page_idx = {})", id);
|
||||
let mut page_cache = self.page_cache.write();
|
||||
page.set_locked();
|
||||
let page_key = PageCacheKey::new(id, Some(self.wal.borrow().get_max_frame()));
|
||||
if let Some(frame_id) = self.wal.borrow().find_frame(id as u64)? {
|
||||
self.wal
|
||||
.borrow()
|
||||
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
|
||||
{
|
||||
page.set_uptodate();
|
||||
let max_frame = match &self.wal {
|
||||
Some(wal) => wal.borrow().get_max_frame(),
|
||||
None => 0,
|
||||
};
|
||||
let page_key = PageCacheKey::new(id, Some(max_frame));
|
||||
if let Some(wal) = &self.wal {
|
||||
if let Some(frame_id) = wal.borrow().find_frame(id as u64)? {
|
||||
wal.borrow()
|
||||
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
|
||||
{
|
||||
page.set_uptodate();
|
||||
}
|
||||
// TODO(pere) ensure page is inserted
|
||||
if !page_cache.contains_key(&page_key) {
|
||||
page_cache.insert(page_key, page.clone());
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
// TODO(pere) ensure page is inserted
|
||||
if !page_cache.contains_key(&page_key) {
|
||||
page_cache.insert(page_key, page.clone());
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// TODO(pere) ensure page is inserted
|
||||
if !page_cache.contains_key(&page_key) {
|
||||
page_cache.insert(page_key, page.clone());
|
||||
}
|
||||
sqlite3_ondisk::begin_read_page(
|
||||
self.db_file.clone(),
|
||||
@@ -327,10 +356,7 @@ impl Pager {
|
||||
page.clone(),
|
||||
id,
|
||||
)?;
|
||||
// TODO(pere) ensure page is inserted
|
||||
if !page_cache.contains_key(&page_key) {
|
||||
page_cache.insert(page_key, page.clone());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -359,18 +385,23 @@ impl Pager {
|
||||
match state {
|
||||
FlushState::Start => {
|
||||
let db_size = self.db_header.lock().database_size;
|
||||
let max_frame = match &self.wal {
|
||||
Some(wal) => wal.borrow().get_max_frame(),
|
||||
None => 0,
|
||||
};
|
||||
for page_id in self.dirty_pages.borrow().iter() {
|
||||
let mut cache = self.page_cache.write();
|
||||
let page_key =
|
||||
PageCacheKey::new(*page_id, Some(self.wal.borrow().get_max_frame()));
|
||||
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
|
||||
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
|
||||
trace!("cacheflush(page={}, page_type={:?}", page_id, page_type);
|
||||
self.wal.borrow_mut().append_frame(
|
||||
page.clone(),
|
||||
db_size,
|
||||
self.flush_info.borrow().in_flight_writes.clone(),
|
||||
)?;
|
||||
let page_key = PageCacheKey::new(*page_id, Some(max_frame));
|
||||
if let Some(wal) = &self.wal {
|
||||
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
|
||||
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
|
||||
trace!("cacheflush(page={}, page_type={:?}", page_id, page_type);
|
||||
wal.borrow_mut().append_frame(
|
||||
page.clone(),
|
||||
db_size,
|
||||
self.flush_info.borrow().in_flight_writes.clone(),
|
||||
)?;
|
||||
}
|
||||
// This page is no longer valid.
|
||||
// For example:
|
||||
// We took page with key (page_num, max_frame) -- this page is no longer valid for that max_frame so it must be invalidated.
|
||||
@@ -389,13 +420,16 @@ impl Pager {
|
||||
}
|
||||
}
|
||||
FlushState::SyncWal => {
|
||||
match self.wal.borrow_mut().sync() {
|
||||
let wal = self.wal.clone().ok_or(LimboError::InternalError(
|
||||
"SyncWal was called without a existing wal".to_string(),
|
||||
))?;
|
||||
match wal.borrow_mut().sync() {
|
||||
Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO),
|
||||
Ok(CheckpointStatus::Done(res)) => checkpoint_result = res,
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
|
||||
let should_checkpoint = self.wal.borrow().should_checkpoint();
|
||||
let should_checkpoint = wal.borrow().should_checkpoint();
|
||||
if should_checkpoint {
|
||||
self.flush_info.borrow_mut().state = FlushState::Checkpoint;
|
||||
} else {
|
||||
@@ -437,11 +471,13 @@ impl Pager {
|
||||
match state {
|
||||
CheckpointState::Checkpoint => {
|
||||
let in_flight = self.checkpoint_inflight.clone();
|
||||
match self.wal.borrow_mut().checkpoint(
|
||||
self,
|
||||
in_flight,
|
||||
CheckpointMode::Passive,
|
||||
)? {
|
||||
let wal = self.wal.clone().ok_or(LimboError::InternalError(
|
||||
"Checkpoint was called without a existing wal".to_string(),
|
||||
))?;
|
||||
match wal
|
||||
.borrow_mut()
|
||||
.checkpoint(self, in_flight, CheckpointMode::Passive)?
|
||||
{
|
||||
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
|
||||
CheckpointStatus::Done(res) => {
|
||||
checkpoint_result = res;
|
||||
@@ -478,7 +514,7 @@ impl Pager {
|
||||
pub fn clear_page_cache(&self) -> CheckpointResult {
|
||||
let checkpoint_result: CheckpointResult;
|
||||
loop {
|
||||
match self.wal.borrow_mut().checkpoint(
|
||||
match self.wal.clone().unwrap().borrow_mut().checkpoint(
|
||||
self,
|
||||
Rc::new(RefCell::new(0)),
|
||||
CheckpointMode::Passive,
|
||||
@@ -603,8 +639,12 @@ impl Pager {
|
||||
page.set_dirty();
|
||||
self.add_dirty(page.get().id);
|
||||
let mut cache = self.page_cache.write();
|
||||
let page_key =
|
||||
PageCacheKey::new(page.get().id, Some(self.wal.borrow().get_max_frame()));
|
||||
let max_frame = match &self.wal {
|
||||
Some(wal) => wal.borrow().get_max_frame(),
|
||||
None => 0,
|
||||
};
|
||||
|
||||
let page_key = PageCacheKey::new(page.get().id, Some(max_frame));
|
||||
cache.insert(page_key, page.clone());
|
||||
}
|
||||
Ok(page)
|
||||
@@ -613,7 +653,11 @@ impl Pager {
|
||||
pub fn put_loaded_page(&self, id: usize, page: PageRef) {
|
||||
let mut cache = self.page_cache.write();
|
||||
// cache insert invalidates previous page
|
||||
let page_key = PageCacheKey::new(id, Some(self.wal.borrow().get_max_frame()));
|
||||
let max_frame = match &self.wal {
|
||||
Some(wal) => wal.borrow().get_max_frame(),
|
||||
None => 0,
|
||||
};
|
||||
let page_key = PageCacheKey::new(id, Some(max_frame));
|
||||
cache.insert(page_key, page.clone());
|
||||
page.set_loaded();
|
||||
}
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
#![allow(unused_variables)]
|
||||
use crate::storage::database::FileMemoryStorage;
|
||||
use crate::storage::page_cache::DumbLruPageCache;
|
||||
use crate::storage::pager::CreateBTreeFlags;
|
||||
use crate::{
|
||||
error::{LimboError, SQLITE_CONSTRAINT, SQLITE_CONSTRAINT_PRIMARYKEY},
|
||||
ext::ExtValue,
|
||||
@@ -10,7 +13,7 @@ use crate::{
|
||||
printf::exec_printf,
|
||||
},
|
||||
};
|
||||
use std::{borrow::BorrowMut, rc::Rc};
|
||||
use std::{borrow::BorrowMut, rc::Rc, sync::Arc};
|
||||
|
||||
use crate::{pseudo::PseudoCursor, result::LimboResult};
|
||||
|
||||
@@ -36,12 +39,16 @@ use crate::{
|
||||
vector::{vector32, vector64, vector_distance_cos, vector_extract},
|
||||
};
|
||||
|
||||
use crate::{info, MvCursor, RefValue, Row, StepResult, TransactionState};
|
||||
use crate::{
|
||||
info, maybe_init_database_file, BufferPool, MvCursor, OpenFlags, RefValue, Row, StepResult,
|
||||
TransactionState, IO,
|
||||
};
|
||||
|
||||
use super::{
|
||||
insn::{Cookie, RegisterOrLiteral},
|
||||
HaltState,
|
||||
};
|
||||
use parking_lot::RwLock;
|
||||
use rand::thread_rng;
|
||||
|
||||
use super::{
|
||||
@@ -4504,6 +4511,95 @@ pub fn op_noop(
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
pub fn op_open_ephemeral(
|
||||
program: &Program,
|
||||
state: &mut ProgramState,
|
||||
insn: &Insn,
|
||||
pager: &Rc<Pager>,
|
||||
mv_store: Option<&Rc<MvStore>>,
|
||||
) -> Result<InsnFunctionStepResult> {
|
||||
let Insn::OpenEphemeral {
|
||||
cursor_id,
|
||||
is_table,
|
||||
} = insn
|
||||
else {
|
||||
unreachable!("unexpected Insn {:?}", insn)
|
||||
};
|
||||
|
||||
let conn = program.connection.upgrade().unwrap();
|
||||
let io = conn.pager.io.get_memory_io();
|
||||
|
||||
let file = io.open_file("", OpenFlags::Create, true)?;
|
||||
maybe_init_database_file(&file, &(io.clone() as Arc<dyn IO>))?;
|
||||
let db_file = Arc::new(FileMemoryStorage::new(file));
|
||||
|
||||
let db_header = Pager::begin_open(db_file.clone())?;
|
||||
let buffer_pool = Rc::new(BufferPool::new(db_header.lock().page_size as usize));
|
||||
let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(10)));
|
||||
|
||||
let pager = Rc::new(Pager::finish_open(
|
||||
db_header,
|
||||
db_file,
|
||||
None,
|
||||
io,
|
||||
page_cache,
|
||||
buffer_pool,
|
||||
)?);
|
||||
|
||||
let flag = if *is_table {
|
||||
&CreateBTreeFlags::new_table()
|
||||
} else {
|
||||
&CreateBTreeFlags::new_index()
|
||||
};
|
||||
|
||||
let root_page = pager.btree_create(flag);
|
||||
|
||||
let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap();
|
||||
let mv_cursor = match state.mv_tx_id {
|
||||
Some(tx_id) => {
|
||||
let table_id = root_page as u64;
|
||||
let mv_store = mv_store.unwrap().clone();
|
||||
let mv_cursor = Rc::new(RefCell::new(
|
||||
MvCursor::new(mv_store.clone(), tx_id, table_id).unwrap(),
|
||||
));
|
||||
Some(mv_cursor)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
let mut cursor = BTreeCursor::new(mv_cursor, pager, root_page as usize);
|
||||
cursor.rewind()?; // Will never return io
|
||||
|
||||
let mut cursors: std::cell::RefMut<'_, Vec<Option<Cursor>>> = state.cursors.borrow_mut();
|
||||
|
||||
// Table content is erased if the cursor already exists
|
||||
match cursor_type {
|
||||
CursorType::BTreeTable(_) => {
|
||||
cursors
|
||||
.get_mut(*cursor_id)
|
||||
.unwrap()
|
||||
.replace(Cursor::new_btree(cursor));
|
||||
}
|
||||
CursorType::BTreeIndex(_) => {
|
||||
cursors
|
||||
.get_mut(*cursor_id)
|
||||
.unwrap()
|
||||
.replace(Cursor::new_btree(cursor));
|
||||
}
|
||||
CursorType::Pseudo(_) => {
|
||||
panic!("OpenEphemeral on pseudo cursor");
|
||||
}
|
||||
CursorType::Sorter => {
|
||||
panic!("OpenEphemeral on sorter cursor");
|
||||
}
|
||||
CursorType::VirtualTable(_) => {
|
||||
panic!("OpenEphemeral on virtual table cursor, use Insn::VOpenAsync instead");
|
||||
}
|
||||
}
|
||||
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
fn exec_lower(reg: &OwnedValue) -> Option<OwnedValue> {
|
||||
match reg {
|
||||
OwnedValue::Text(t) => Some(OwnedValue::build_text(&t.as_str().to_lowercase())),
|
||||
|
||||
@@ -1389,6 +1389,22 @@ pub fn insn_to_str(
|
||||
0,
|
||||
format!("auto_commit={}, rollback={}", auto_commit, rollback),
|
||||
),
|
||||
Insn::OpenEphemeral {
|
||||
cursor_id,
|
||||
is_table,
|
||||
} => (
|
||||
"OpenEphemeral",
|
||||
*cursor_id as i32,
|
||||
*is_table as i32,
|
||||
0,
|
||||
OwnedValue::build_text(""),
|
||||
0,
|
||||
format!(
|
||||
"cursor={} is_table={}",
|
||||
cursor_id,
|
||||
if *is_table { "true" } else { "false" }
|
||||
),
|
||||
),
|
||||
};
|
||||
format!(
|
||||
"{:<4} {:<17} {:<4} {:<4} {:<4} {:<13} {:<2} {}",
|
||||
|
||||
@@ -812,40 +812,33 @@ pub enum Insn {
|
||||
dest: usize,
|
||||
cookie: Cookie,
|
||||
},
|
||||
/// Open a new cursor P1 to a transient table.
|
||||
OpenEphemeral {
|
||||
cursor_id: usize,
|
||||
is_table: bool,
|
||||
},
|
||||
}
|
||||
|
||||
impl Insn {
|
||||
pub fn to_function(&self) -> InsnFunction {
|
||||
match self {
|
||||
Insn::Init { .. } => execute::op_init,
|
||||
|
||||
Insn::Null { .. } => execute::op_null,
|
||||
|
||||
Insn::NullRow { .. } => execute::op_null_row,
|
||||
|
||||
Insn::Add { .. } => execute::op_add,
|
||||
|
||||
Insn::Subtract { .. } => execute::op_subtract,
|
||||
|
||||
Insn::Multiply { .. } => execute::op_multiply,
|
||||
|
||||
Insn::Divide { .. } => execute::op_divide,
|
||||
|
||||
Insn::Compare { .. } => execute::op_compare,
|
||||
Insn::BitAnd { .. } => execute::op_bit_and,
|
||||
|
||||
Insn::BitOr { .. } => execute::op_bit_or,
|
||||
|
||||
Insn::BitNot { .. } => execute::op_bit_not,
|
||||
|
||||
Insn::Checkpoint { .. } => execute::op_checkpoint,
|
||||
Insn::Remainder { .. } => execute::op_remainder,
|
||||
|
||||
Insn::Jump { .. } => execute::op_jump,
|
||||
Insn::Move { .. } => execute::op_move,
|
||||
Insn::IfPos { .. } => execute::op_if_pos,
|
||||
Insn::NotNull { .. } => execute::op_not_null,
|
||||
|
||||
Insn::Eq { .. } => execute::op_eq,
|
||||
Insn::Ne { .. } => execute::op_ne,
|
||||
Insn::Lt { .. } => execute::op_lt,
|
||||
@@ -856,11 +849,8 @@ impl Insn {
|
||||
Insn::IfNot { .. } => execute::op_if_not,
|
||||
Insn::OpenReadAsync { .. } => execute::op_open_read_async,
|
||||
Insn::OpenReadAwait => execute::op_open_read_await,
|
||||
|
||||
Insn::VOpenAsync { .. } => execute::op_vopen_async,
|
||||
|
||||
Insn::VOpenAwait => execute::op_vopen_await,
|
||||
|
||||
Insn::VCreate { .. } => execute::op_vcreate,
|
||||
Insn::VFilter { .. } => execute::op_vfilter,
|
||||
Insn::VColumn { .. } => execute::op_vcolumn,
|
||||
@@ -868,43 +858,29 @@ impl Insn {
|
||||
Insn::VNext { .. } => execute::op_vnext,
|
||||
Insn::OpenPseudo { .. } => execute::op_open_pseudo,
|
||||
Insn::RewindAsync { .. } => execute::op_rewind_async,
|
||||
|
||||
Insn::RewindAwait { .. } => execute::op_rewind_await,
|
||||
Insn::LastAsync { .. } => execute::op_last_async,
|
||||
|
||||
Insn::LastAwait { .. } => execute::op_last_await,
|
||||
Insn::Column { .. } => execute::op_column,
|
||||
Insn::TypeCheck { .. } => execute::op_type_check,
|
||||
Insn::MakeRecord { .. } => execute::op_make_record,
|
||||
Insn::ResultRow { .. } => execute::op_result_row,
|
||||
|
||||
Insn::NextAsync { .. } => execute::op_next_async,
|
||||
|
||||
Insn::NextAwait { .. } => execute::op_next_await,
|
||||
Insn::PrevAsync { .. } => execute::op_prev_async,
|
||||
|
||||
Insn::PrevAwait { .. } => execute::op_prev_await,
|
||||
Insn::Halt { .. } => execute::op_halt,
|
||||
Insn::Transaction { .. } => execute::op_transaction,
|
||||
|
||||
Insn::AutoCommit { .. } => execute::op_auto_commit,
|
||||
Insn::Goto { .. } => execute::op_goto,
|
||||
|
||||
Insn::Gosub { .. } => execute::op_gosub,
|
||||
Insn::Return { .. } => execute::op_return,
|
||||
|
||||
Insn::Integer { .. } => execute::op_integer,
|
||||
|
||||
Insn::Real { .. } => execute::op_real,
|
||||
|
||||
Insn::RealAffinity { .. } => execute::op_real_affinity,
|
||||
|
||||
Insn::String8 { .. } => execute::op_string8,
|
||||
|
||||
Insn::Blob { .. } => execute::op_blob,
|
||||
|
||||
Insn::RowId { .. } => execute::op_row_id,
|
||||
|
||||
Insn::SeekRowid { .. } => execute::op_seek_rowid,
|
||||
Insn::DeferredSeek { .. } => execute::op_deferred_seek,
|
||||
Insn::SeekGE { .. } => execute::op_seek,
|
||||
@@ -917,10 +893,8 @@ impl Insn {
|
||||
Insn::IdxLE { .. } => execute::op_idx_le,
|
||||
Insn::IdxLT { .. } => execute::op_idx_lt,
|
||||
Insn::DecrJumpZero { .. } => execute::op_decr_jump_zero,
|
||||
|
||||
Insn::AggStep { .. } => execute::op_agg_step,
|
||||
Insn::AggFinal { .. } => execute::op_agg_final,
|
||||
|
||||
Insn::SorterOpen { .. } => execute::op_sorter_open,
|
||||
Insn::SorterInsert { .. } => execute::op_sorter_insert,
|
||||
Insn::SorterSort { .. } => execute::op_sorter_sort,
|
||||
@@ -929,57 +903,39 @@ impl Insn {
|
||||
Insn::Function { .. } => execute::op_function,
|
||||
Insn::InitCoroutine { .. } => execute::op_init_coroutine,
|
||||
Insn::EndCoroutine { .. } => execute::op_end_coroutine,
|
||||
|
||||
Insn::Yield { .. } => execute::op_yield,
|
||||
Insn::InsertAsync { .. } => execute::op_insert_async,
|
||||
Insn::InsertAwait { .. } => execute::op_insert_await,
|
||||
Insn::IdxInsertAsync { .. } => execute::op_idx_insert_async,
|
||||
Insn::IdxInsertAwait { .. } => execute::op_idx_insert_await,
|
||||
Insn::DeleteAsync { .. } => execute::op_delete_async,
|
||||
|
||||
Insn::DeleteAwait { .. } => execute::op_delete_await,
|
||||
|
||||
Insn::NewRowid { .. } => execute::op_new_rowid,
|
||||
Insn::MustBeInt { .. } => execute::op_must_be_int,
|
||||
|
||||
Insn::SoftNull { .. } => execute::op_soft_null,
|
||||
|
||||
Insn::NotExists { .. } => execute::op_not_exists,
|
||||
Insn::OffsetLimit { .. } => execute::op_offset_limit,
|
||||
Insn::OpenWriteAsync { .. } => execute::op_open_write_async,
|
||||
Insn::OpenWriteAwait { .. } => execute::op_open_write_await,
|
||||
|
||||
Insn::Copy { .. } => execute::op_copy,
|
||||
Insn::CreateBtree { .. } => execute::op_create_btree,
|
||||
|
||||
Insn::Destroy { .. } => execute::op_destroy,
|
||||
Insn::DropTable { .. } => execute::op_drop_table,
|
||||
Insn::Close { .. } => execute::op_close,
|
||||
|
||||
Insn::IsNull { .. } => execute::op_is_null,
|
||||
|
||||
Insn::ParseSchema { .. } => execute::op_parse_schema,
|
||||
|
||||
Insn::ShiftRight { .. } => execute::op_shift_right,
|
||||
|
||||
Insn::ShiftLeft { .. } => execute::op_shift_left,
|
||||
|
||||
Insn::Variable { .. } => execute::op_variable,
|
||||
|
||||
Insn::ZeroOrNull { .. } => execute::op_zero_or_null,
|
||||
|
||||
Insn::Not { .. } => execute::op_not,
|
||||
|
||||
Insn::Concat { .. } => execute::op_concat,
|
||||
|
||||
Insn::And { .. } => execute::op_and,
|
||||
|
||||
Insn::Or { .. } => execute::op_or,
|
||||
|
||||
Insn::Noop => execute::op_noop,
|
||||
Insn::PageCount { .. } => execute::op_page_count,
|
||||
|
||||
Insn::ReadCookie { .. } => execute::op_read_cookie,
|
||||
Insn::OpenEphemeral { .. } => execute::op_open_ephemeral,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,4 +97,8 @@ impl IO for SimulatorIO {
|
||||
fn generate_random_number(&self) -> i64 {
|
||||
self.rng.borrow_mut().next_u64() as i64
|
||||
}
|
||||
|
||||
fn get_memory_io(&self) -> Arc<limbo_core::MemoryIO> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user