mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-16 05:24:22 +01:00
core/vdbe: Create OpenEphemeral bytecode
"Open a new cursor P1 to a transient table. The cursor is always opened read/write even if the main database is read-only. The ephemeral table is deleted automatically when the cursor is closed. If the cursor P1 is already opened on an ephemeral table, the table is cleared (all content is erased)." There is still some work to do, but this is a basic setup
This commit is contained in:
@@ -258,23 +258,23 @@ impl Pager {
|
||||
}
|
||||
|
||||
pub fn end_tx(&self) -> Result<CheckpointStatus> {
|
||||
if let Some(wal) = &self.wal {
|
||||
let checkpoint_status = self.cacheflush()?;
|
||||
return match checkpoint_status {
|
||||
CheckpointStatus::IO => Ok(checkpoint_status),
|
||||
CheckpointStatus::Done(_) => {
|
||||
wal.borrow().end_write_tx()?;
|
||||
wal.borrow().end_read_tx()?;
|
||||
Ok(checkpoint_status)
|
||||
}
|
||||
if let Some(wal) = &self.wal {
|
||||
let checkpoint_status = self.cacheflush()?;
|
||||
return match checkpoint_status {
|
||||
CheckpointStatus::IO => Ok(checkpoint_status),
|
||||
CheckpointStatus::Done(_) => {
|
||||
wal.borrow().end_write_tx()?;
|
||||
wal.borrow().end_read_tx()?;
|
||||
Ok(checkpoint_status)
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
Ok(CheckpointStatus::Done(CheckpointResult::default()))
|
||||
Ok(CheckpointStatus::Done(CheckpointResult::default()))
|
||||
}
|
||||
|
||||
pub fn end_read_tx(&self) -> Result<()> {
|
||||
if let Some(wal) = &self.wal {
|
||||
if let Some(wal) = &self.wal {
|
||||
wal.borrow().end_read_tx()?;
|
||||
}
|
||||
Ok(())
|
||||
@@ -297,19 +297,18 @@ impl Pager {
|
||||
page.set_locked();
|
||||
|
||||
if let Some(wal) = &self.wal {
|
||||
if let Some(frame_id) = wal.borrow().find_frame(page_idx as u64)? {
|
||||
wal
|
||||
.borrow()
|
||||
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
|
||||
{
|
||||
page.set_uptodate();
|
||||
if let Some(frame_id) = wal.borrow().find_frame(page_idx as u64)? {
|
||||
wal.borrow()
|
||||
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
|
||||
{
|
||||
page.set_uptodate();
|
||||
}
|
||||
// TODO(pere) ensure page is inserted, we should probably first insert to page cache
|
||||
// and if successful, read frame or page
|
||||
page_cache.insert(page_key, page.clone());
|
||||
return Ok(page);
|
||||
}
|
||||
// TODO(pere) ensure page is inserted, we should probably first insert to page cache
|
||||
// and if successful, read frame or page
|
||||
page_cache.insert(page_key, page.clone());
|
||||
return Ok(page);
|
||||
}
|
||||
}
|
||||
sqlite3_ondisk::begin_read_page(
|
||||
self.db_file.clone(),
|
||||
self.buffer_pool.clone(),
|
||||
@@ -327,7 +326,7 @@ impl Pager {
|
||||
trace!("load_page(page_idx = {})", id);
|
||||
let mut page_cache = self.page_cache.write();
|
||||
page.set_locked();
|
||||
if let Some(wal) = &self.wal {
|
||||
if let Some(wal) = &self.wal {
|
||||
let page_key = PageCacheKey::new(id, Some(wal.borrow().get_max_frame()));
|
||||
if let Some(frame_id) = wal.borrow().find_frame(id as u64)? {
|
||||
wal.borrow()
|
||||
@@ -340,8 +339,8 @@ impl Pager {
|
||||
page_cache.insert(page_key, page.clone());
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
sqlite3_ondisk::begin_read_page(
|
||||
self.db_file.clone(),
|
||||
self.buffer_pool.clone(),
|
||||
@@ -350,7 +349,7 @@ impl Pager {
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Writes the database header.
|
||||
pub fn write_database_header(&self, header: &DatabaseHeader) {
|
||||
@@ -379,7 +378,7 @@ impl Pager {
|
||||
let db_size = self.db_header.lock().database_size;
|
||||
for page_id in self.dirty_pages.borrow().iter() {
|
||||
let mut cache = self.page_cache.write();
|
||||
if let Some(wal) = &self.wal {
|
||||
if let Some(wal) = &self.wal {
|
||||
let page_key =
|
||||
PageCacheKey::new(*page_id, Some(wal.borrow().get_max_frame()));
|
||||
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
|
||||
@@ -412,7 +411,7 @@ impl Pager {
|
||||
let wal = self.wal.clone().ok_or(LimboError::InternalError(
|
||||
"SyncWal was called without a existing wal".to_string(),
|
||||
))?;
|
||||
match wal.borrow_mut().sync() {
|
||||
match wal.borrow_mut().sync() {
|
||||
Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO),
|
||||
Ok(CheckpointStatus::Done(res)) => checkpoint_result = res,
|
||||
Err(e) => return Err(e),
|
||||
@@ -628,7 +627,7 @@ impl Pager {
|
||||
page.set_dirty();
|
||||
self.add_dirty(page.get().id);
|
||||
let mut cache = self.page_cache.write();
|
||||
let max_frame = match &self.wal {
|
||||
let max_frame = match &self.wal {
|
||||
Some(wal) => wal.borrow().get_max_frame(),
|
||||
None => 0,
|
||||
};
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
#![allow(unused_variables)]
|
||||
use crate::storage::database::FileMemoryStorage;
|
||||
use crate::storage::page_cache::DumbLruPageCache;
|
||||
use crate::{
|
||||
error::{LimboError, SQLITE_CONSTRAINT, SQLITE_CONSTRAINT_PRIMARYKEY},
|
||||
ext::ExtValue,
|
||||
@@ -10,7 +12,7 @@ use crate::{
|
||||
printf::exec_printf,
|
||||
},
|
||||
};
|
||||
use std::{borrow::BorrowMut, rc::Rc};
|
||||
use std::{borrow::BorrowMut, rc::Rc, sync::Arc};
|
||||
|
||||
use crate::{pseudo::PseudoCursor, result::LimboResult};
|
||||
|
||||
@@ -36,12 +38,13 @@ use crate::{
|
||||
vector::{vector32, vector64, vector_distance_cos, vector_extract},
|
||||
};
|
||||
|
||||
use crate::{info, MvCursor, RefValue, Row, StepResult, TransactionState};
|
||||
use crate::{info, BufferPool, MvCursor, OpenFlags, RefValue, Row, StepResult, TransactionState};
|
||||
|
||||
use super::{
|
||||
insn::{Cookie, RegisterOrLiteral},
|
||||
HaltState,
|
||||
};
|
||||
use parking_lot::RwLock;
|
||||
use rand::thread_rng;
|
||||
|
||||
use super::{
|
||||
@@ -4504,6 +4507,89 @@ pub fn op_noop(
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
pub fn op_open_ephemeral(
|
||||
program: &Program,
|
||||
state: &mut ProgramState,
|
||||
insn: &Insn,
|
||||
pager: &Rc<Pager>,
|
||||
mv_store: Option<&Rc<MvStore>>,
|
||||
) -> Result<InsnFunctionStepResult> {
|
||||
let Insn::OpenEphemeral {
|
||||
cursor_id,
|
||||
is_btree,
|
||||
} = insn
|
||||
else {
|
||||
unreachable!("unexpected Insn {:?}", insn)
|
||||
};
|
||||
|
||||
let conn = program.connection.upgrade().unwrap();
|
||||
// Only memory and vfs IOs returns None, so cloning is safe
|
||||
let io = match conn.pager.io.get_memory_io() {
|
||||
Some(io) => io,
|
||||
None => conn.pager.io.clone(),
|
||||
};
|
||||
|
||||
let file = io.open_file("", OpenFlags::Create, true)?;
|
||||
let page_io = Arc::new(FileMemoryStorage::new(file));
|
||||
|
||||
let db_header = Pager::begin_open(page_io.clone())?;
|
||||
let buffer_pool = Rc::new(BufferPool::new(512));
|
||||
let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(10)));
|
||||
|
||||
let pager = Rc::new(Pager::finish_open(
|
||||
db_header,
|
||||
page_io,
|
||||
None,
|
||||
io,
|
||||
page_cache,
|
||||
buffer_pool,
|
||||
)?);
|
||||
|
||||
let root_page = pager.btree_create(*is_btree as usize);
|
||||
|
||||
let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap();
|
||||
let mv_cursor = match state.mv_tx_id {
|
||||
Some(tx_id) => {
|
||||
let table_id = root_page as u64;
|
||||
let mv_store = mv_store.as_ref().unwrap().clone();
|
||||
let mv_cursor = Rc::new(RefCell::new(
|
||||
MvCursor::new(mv_store.clone(), tx_id, table_id).unwrap(),
|
||||
));
|
||||
Some(mv_cursor)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
let cursor = BTreeCursor::new(mv_cursor, pager, root_page as usize);
|
||||
let mut cursors: std::cell::RefMut<'_, Vec<Option<Cursor>>> = state.cursors.borrow_mut();
|
||||
// Table content is erased if the cursor already exists
|
||||
match cursor_type {
|
||||
CursorType::BTreeTable(_) => {
|
||||
cursors
|
||||
.get_mut(*cursor_id)
|
||||
.unwrap()
|
||||
.replace(Cursor::new_btree(cursor));
|
||||
}
|
||||
CursorType::BTreeIndex(_) => {
|
||||
cursors
|
||||
.get_mut(*cursor_id)
|
||||
.unwrap()
|
||||
.replace(Cursor::new_btree(cursor));
|
||||
}
|
||||
CursorType::Pseudo(_) => {
|
||||
panic!("OpenEphemeral on pseudo cursor");
|
||||
}
|
||||
CursorType::Sorter => {
|
||||
panic!("OpenEphemeral on sorter cursor");
|
||||
}
|
||||
CursorType::VirtualTable(_) => {
|
||||
panic!("OpenEphemeral on virtual table cursor, use Insn::VOpenAsync instead");
|
||||
}
|
||||
}
|
||||
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
fn exec_lower(reg: &OwnedValue) -> Option<OwnedValue> {
|
||||
match reg {
|
||||
OwnedValue::Text(t) => Some(OwnedValue::build_text(&t.as_str().to_lowercase())),
|
||||
|
||||
@@ -1389,6 +1389,22 @@ pub fn insn_to_str(
|
||||
0,
|
||||
format!("auto_commit={}, rollback={}", auto_commit, rollback),
|
||||
),
|
||||
Insn::OpenEphemeral {
|
||||
cursor_id,
|
||||
is_btree,
|
||||
} => (
|
||||
"OpenEphemeral",
|
||||
*cursor_id as i32,
|
||||
*is_btree as i32,
|
||||
0,
|
||||
OwnedValue::build_text(""),
|
||||
0,
|
||||
format!(
|
||||
"cursor={} is_btree={}",
|
||||
cursor_id,
|
||||
if *is_btree { "true" } else { "false" }
|
||||
),
|
||||
),
|
||||
};
|
||||
format!(
|
||||
"{:<4} {:<17} {:<4} {:<4} {:<4} {:<13} {:<2} {}",
|
||||
|
||||
@@ -812,40 +812,33 @@ pub enum Insn {
|
||||
dest: usize,
|
||||
cookie: Cookie,
|
||||
},
|
||||
/// Open a new cursor P1 to a transient table.
|
||||
OpenEphemeral {
|
||||
cursor_id: usize,
|
||||
is_btree: bool,
|
||||
},
|
||||
}
|
||||
|
||||
impl Insn {
|
||||
pub fn to_function(&self) -> InsnFunction {
|
||||
match self {
|
||||
Insn::Init { .. } => execute::op_init,
|
||||
|
||||
Insn::Null { .. } => execute::op_null,
|
||||
|
||||
Insn::NullRow { .. } => execute::op_null_row,
|
||||
|
||||
Insn::Add { .. } => execute::op_add,
|
||||
|
||||
Insn::Subtract { .. } => execute::op_subtract,
|
||||
|
||||
Insn::Multiply { .. } => execute::op_multiply,
|
||||
|
||||
Insn::Divide { .. } => execute::op_divide,
|
||||
|
||||
Insn::Compare { .. } => execute::op_compare,
|
||||
Insn::BitAnd { .. } => execute::op_bit_and,
|
||||
|
||||
Insn::BitOr { .. } => execute::op_bit_or,
|
||||
|
||||
Insn::BitNot { .. } => execute::op_bit_not,
|
||||
|
||||
Insn::Checkpoint { .. } => execute::op_checkpoint,
|
||||
Insn::Remainder { .. } => execute::op_remainder,
|
||||
|
||||
Insn::Jump { .. } => execute::op_jump,
|
||||
Insn::Move { .. } => execute::op_move,
|
||||
Insn::IfPos { .. } => execute::op_if_pos,
|
||||
Insn::NotNull { .. } => execute::op_not_null,
|
||||
|
||||
Insn::Eq { .. } => execute::op_eq,
|
||||
Insn::Ne { .. } => execute::op_ne,
|
||||
Insn::Lt { .. } => execute::op_lt,
|
||||
@@ -856,11 +849,8 @@ impl Insn {
|
||||
Insn::IfNot { .. } => execute::op_if_not,
|
||||
Insn::OpenReadAsync { .. } => execute::op_open_read_async,
|
||||
Insn::OpenReadAwait => execute::op_open_read_await,
|
||||
|
||||
Insn::VOpenAsync { .. } => execute::op_vopen_async,
|
||||
|
||||
Insn::VOpenAwait => execute::op_vopen_await,
|
||||
|
||||
Insn::VCreate { .. } => execute::op_vcreate,
|
||||
Insn::VFilter { .. } => execute::op_vfilter,
|
||||
Insn::VColumn { .. } => execute::op_vcolumn,
|
||||
@@ -868,43 +858,29 @@ impl Insn {
|
||||
Insn::VNext { .. } => execute::op_vnext,
|
||||
Insn::OpenPseudo { .. } => execute::op_open_pseudo,
|
||||
Insn::RewindAsync { .. } => execute::op_rewind_async,
|
||||
|
||||
Insn::RewindAwait { .. } => execute::op_rewind_await,
|
||||
Insn::LastAsync { .. } => execute::op_last_async,
|
||||
|
||||
Insn::LastAwait { .. } => execute::op_last_await,
|
||||
Insn::Column { .. } => execute::op_column,
|
||||
Insn::TypeCheck { .. } => execute::op_type_check,
|
||||
Insn::MakeRecord { .. } => execute::op_make_record,
|
||||
Insn::ResultRow { .. } => execute::op_result_row,
|
||||
|
||||
Insn::NextAsync { .. } => execute::op_next_async,
|
||||
|
||||
Insn::NextAwait { .. } => execute::op_next_await,
|
||||
Insn::PrevAsync { .. } => execute::op_prev_async,
|
||||
|
||||
Insn::PrevAwait { .. } => execute::op_prev_await,
|
||||
Insn::Halt { .. } => execute::op_halt,
|
||||
Insn::Transaction { .. } => execute::op_transaction,
|
||||
|
||||
Insn::AutoCommit { .. } => execute::op_auto_commit,
|
||||
Insn::Goto { .. } => execute::op_goto,
|
||||
|
||||
Insn::Gosub { .. } => execute::op_gosub,
|
||||
Insn::Return { .. } => execute::op_return,
|
||||
|
||||
Insn::Integer { .. } => execute::op_integer,
|
||||
|
||||
Insn::Real { .. } => execute::op_real,
|
||||
|
||||
Insn::RealAffinity { .. } => execute::op_real_affinity,
|
||||
|
||||
Insn::String8 { .. } => execute::op_string8,
|
||||
|
||||
Insn::Blob { .. } => execute::op_blob,
|
||||
|
||||
Insn::RowId { .. } => execute::op_row_id,
|
||||
|
||||
Insn::SeekRowid { .. } => execute::op_seek_rowid,
|
||||
Insn::DeferredSeek { .. } => execute::op_deferred_seek,
|
||||
Insn::SeekGE { .. } => execute::op_seek,
|
||||
@@ -917,10 +893,8 @@ impl Insn {
|
||||
Insn::IdxLE { .. } => execute::op_idx_le,
|
||||
Insn::IdxLT { .. } => execute::op_idx_lt,
|
||||
Insn::DecrJumpZero { .. } => execute::op_decr_jump_zero,
|
||||
|
||||
Insn::AggStep { .. } => execute::op_agg_step,
|
||||
Insn::AggFinal { .. } => execute::op_agg_final,
|
||||
|
||||
Insn::SorterOpen { .. } => execute::op_sorter_open,
|
||||
Insn::SorterInsert { .. } => execute::op_sorter_insert,
|
||||
Insn::SorterSort { .. } => execute::op_sorter_sort,
|
||||
@@ -929,57 +903,39 @@ impl Insn {
|
||||
Insn::Function { .. } => execute::op_function,
|
||||
Insn::InitCoroutine { .. } => execute::op_init_coroutine,
|
||||
Insn::EndCoroutine { .. } => execute::op_end_coroutine,
|
||||
|
||||
Insn::Yield { .. } => execute::op_yield,
|
||||
Insn::InsertAsync { .. } => execute::op_insert_async,
|
||||
Insn::InsertAwait { .. } => execute::op_insert_await,
|
||||
Insn::IdxInsertAsync { .. } => execute::op_idx_insert_async,
|
||||
Insn::IdxInsertAwait { .. } => execute::op_idx_insert_await,
|
||||
Insn::DeleteAsync { .. } => execute::op_delete_async,
|
||||
|
||||
Insn::DeleteAwait { .. } => execute::op_delete_await,
|
||||
|
||||
Insn::NewRowid { .. } => execute::op_new_rowid,
|
||||
Insn::MustBeInt { .. } => execute::op_must_be_int,
|
||||
|
||||
Insn::SoftNull { .. } => execute::op_soft_null,
|
||||
|
||||
Insn::NotExists { .. } => execute::op_not_exists,
|
||||
Insn::OffsetLimit { .. } => execute::op_offset_limit,
|
||||
Insn::OpenWriteAsync { .. } => execute::op_open_write_async,
|
||||
Insn::OpenWriteAwait { .. } => execute::op_open_write_await,
|
||||
|
||||
Insn::Copy { .. } => execute::op_copy,
|
||||
Insn::CreateBtree { .. } => execute::op_create_btree,
|
||||
|
||||
Insn::Destroy { .. } => execute::op_destroy,
|
||||
Insn::DropTable { .. } => execute::op_drop_table,
|
||||
Insn::Close { .. } => execute::op_close,
|
||||
|
||||
Insn::IsNull { .. } => execute::op_is_null,
|
||||
|
||||
Insn::ParseSchema { .. } => execute::op_parse_schema,
|
||||
|
||||
Insn::ShiftRight { .. } => execute::op_shift_right,
|
||||
|
||||
Insn::ShiftLeft { .. } => execute::op_shift_left,
|
||||
|
||||
Insn::Variable { .. } => execute::op_variable,
|
||||
|
||||
Insn::ZeroOrNull { .. } => execute::op_zero_or_null,
|
||||
|
||||
Insn::Not { .. } => execute::op_not,
|
||||
|
||||
Insn::Concat { .. } => execute::op_concat,
|
||||
|
||||
Insn::And { .. } => execute::op_and,
|
||||
|
||||
Insn::Or { .. } => execute::op_or,
|
||||
|
||||
Insn::Noop => execute::op_noop,
|
||||
Insn::PageCount { .. } => execute::op_page_count,
|
||||
|
||||
Insn::ReadCookie { .. } => execute::op_read_cookie,
|
||||
Insn::OpenEphemeral { .. } => execute::op_open_ephemeral,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user