diff --git a/core/storage/pager.rs b/core/storage/pager.rs index e89b659bf..af574053b 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -258,23 +258,23 @@ impl Pager { } pub fn end_tx(&self) -> Result { - if let Some(wal) = &self.wal { - let checkpoint_status = self.cacheflush()?; - return match checkpoint_status { - CheckpointStatus::IO => Ok(checkpoint_status), - CheckpointStatus::Done(_) => { - wal.borrow().end_write_tx()?; - wal.borrow().end_read_tx()?; - Ok(checkpoint_status) - } + if let Some(wal) = &self.wal { + let checkpoint_status = self.cacheflush()?; + return match checkpoint_status { + CheckpointStatus::IO => Ok(checkpoint_status), + CheckpointStatus::Done(_) => { + wal.borrow().end_write_tx()?; + wal.borrow().end_read_tx()?; + Ok(checkpoint_status) + } + }; } - } - Ok(CheckpointStatus::Done(CheckpointResult::default())) + Ok(CheckpointStatus::Done(CheckpointResult::default())) } pub fn end_read_tx(&self) -> Result<()> { - if let Some(wal) = &self.wal { + if let Some(wal) = &self.wal { wal.borrow().end_read_tx()?; } Ok(()) @@ -297,19 +297,18 @@ impl Pager { page.set_locked(); if let Some(wal) = &self.wal { - if let Some(frame_id) = wal.borrow().find_frame(page_idx as u64)? { - wal - .borrow() - .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; - { - page.set_uptodate(); + if let Some(frame_id) = wal.borrow().find_frame(page_idx as u64)? { + wal.borrow() + .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; + { + page.set_uptodate(); + } + // TODO(pere) ensure page is inserted, we should probably first insert to page cache + // and if successful, read frame or page + page_cache.insert(page_key, page.clone()); + return Ok(page); } - // TODO(pere) ensure page is inserted, we should probably first insert to page cache - // and if successful, read frame or page - page_cache.insert(page_key, page.clone()); - return Ok(page); } - } sqlite3_ondisk::begin_read_page( self.db_file.clone(), self.buffer_pool.clone(), @@ -327,7 +326,7 @@ impl Pager { trace!("load_page(page_idx = {})", id); let mut page_cache = self.page_cache.write(); page.set_locked(); - if let Some(wal) = &self.wal { + if let Some(wal) = &self.wal { let page_key = PageCacheKey::new(id, Some(wal.borrow().get_max_frame())); if let Some(frame_id) = wal.borrow().find_frame(id as u64)? { wal.borrow() @@ -340,8 +339,8 @@ impl Pager { page_cache.insert(page_key, page.clone()); } return Ok(()); + } } - } sqlite3_ondisk::begin_read_page( self.db_file.clone(), self.buffer_pool.clone(), @@ -350,7 +349,7 @@ impl Pager { )?; Ok(()) -} + } /// Writes the database header. pub fn write_database_header(&self, header: &DatabaseHeader) { @@ -379,7 +378,7 @@ impl Pager { let db_size = self.db_header.lock().database_size; for page_id in self.dirty_pages.borrow().iter() { let mut cache = self.page_cache.write(); - if let Some(wal) = &self.wal { + if let Some(wal) = &self.wal { let page_key = PageCacheKey::new(*page_id, Some(wal.borrow().get_max_frame())); let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); @@ -412,7 +411,7 @@ impl Pager { let wal = self.wal.clone().ok_or(LimboError::InternalError( "SyncWal was called without a existing wal".to_string(), ))?; - match wal.borrow_mut().sync() { + match wal.borrow_mut().sync() { Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO), Ok(CheckpointStatus::Done(res)) => checkpoint_result = res, Err(e) => return Err(e), @@ -628,7 +627,7 @@ impl Pager { page.set_dirty(); self.add_dirty(page.get().id); let mut cache = self.page_cache.write(); - let max_frame = match &self.wal { + let max_frame = match &self.wal { Some(wal) => wal.borrow().get_max_frame(), None => 0, }; diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index ecc97d088..fb23141d0 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -1,4 +1,6 @@ #![allow(unused_variables)] +use crate::storage::database::FileMemoryStorage; +use crate::storage::page_cache::DumbLruPageCache; use crate::{ error::{LimboError, SQLITE_CONSTRAINT, SQLITE_CONSTRAINT_PRIMARYKEY}, ext::ExtValue, @@ -10,7 +12,7 @@ use crate::{ printf::exec_printf, }, }; -use std::{borrow::BorrowMut, rc::Rc}; +use std::{borrow::BorrowMut, rc::Rc, sync::Arc}; use crate::{pseudo::PseudoCursor, result::LimboResult}; @@ -36,12 +38,13 @@ use crate::{ vector::{vector32, vector64, vector_distance_cos, vector_extract}, }; -use crate::{info, MvCursor, RefValue, Row, StepResult, TransactionState}; +use crate::{info, BufferPool, MvCursor, OpenFlags, RefValue, Row, StepResult, TransactionState}; use super::{ insn::{Cookie, RegisterOrLiteral}, HaltState, }; +use parking_lot::RwLock; use rand::thread_rng; use super::{ @@ -4504,6 +4507,89 @@ pub fn op_noop( Ok(InsnFunctionStepResult::Step) } +pub fn op_open_ephemeral( + program: &Program, + state: &mut ProgramState, + insn: &Insn, + pager: &Rc, + mv_store: Option<&Rc>, +) -> Result { + let Insn::OpenEphemeral { + cursor_id, + is_btree, + } = insn + else { + unreachable!("unexpected Insn {:?}", insn) + }; + + let conn = program.connection.upgrade().unwrap(); + // Only memory and vfs IOs returns None, so cloning is safe + let io = match conn.pager.io.get_memory_io() { + Some(io) => io, + None => conn.pager.io.clone(), + }; + + let file = io.open_file("", OpenFlags::Create, true)?; + let page_io = Arc::new(FileMemoryStorage::new(file)); + + let db_header = Pager::begin_open(page_io.clone())?; + let buffer_pool = Rc::new(BufferPool::new(512)); + let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(10))); + + let pager = Rc::new(Pager::finish_open( + db_header, + page_io, + None, + io, + page_cache, + buffer_pool, + )?); + + let root_page = pager.btree_create(*is_btree as usize); + + let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap(); + let mv_cursor = match state.mv_tx_id { + Some(tx_id) => { + let table_id = root_page as u64; + let mv_store = mv_store.as_ref().unwrap().clone(); + let mv_cursor = Rc::new(RefCell::new( + MvCursor::new(mv_store.clone(), tx_id, table_id).unwrap(), + )); + Some(mv_cursor) + } + None => None, + }; + let cursor = BTreeCursor::new(mv_cursor, pager, root_page as usize); + let mut cursors: std::cell::RefMut<'_, Vec>> = state.cursors.borrow_mut(); + // Table content is erased if the cursor already exists + match cursor_type { + CursorType::BTreeTable(_) => { + cursors + .get_mut(*cursor_id) + .unwrap() + .replace(Cursor::new_btree(cursor)); + } + CursorType::BTreeIndex(_) => { + cursors + .get_mut(*cursor_id) + .unwrap() + .replace(Cursor::new_btree(cursor)); + } + CursorType::Pseudo(_) => { + panic!("OpenEphemeral on pseudo cursor"); + } + CursorType::Sorter => { + panic!("OpenEphemeral on sorter cursor"); + } + CursorType::VirtualTable(_) => { + panic!("OpenEphemeral on virtual table cursor, use Insn::VOpenAsync instead"); + } + } + + state.pc += 1; + Ok(InsnFunctionStepResult::Step) +} + fn exec_lower(reg: &OwnedValue) -> Option { match reg { OwnedValue::Text(t) => Some(OwnedValue::build_text(&t.as_str().to_lowercase())), diff --git a/core/vdbe/explain.rs b/core/vdbe/explain.rs index 79b0f3ded..40854da24 100644 --- a/core/vdbe/explain.rs +++ b/core/vdbe/explain.rs @@ -1389,6 +1389,22 @@ pub fn insn_to_str( 0, format!("auto_commit={}, rollback={}", auto_commit, rollback), ), + Insn::OpenEphemeral { + cursor_id, + is_btree, + } => ( + "OpenEphemeral", + *cursor_id as i32, + *is_btree as i32, + 0, + OwnedValue::build_text(""), + 0, + format!( + "cursor={} is_btree={}", + cursor_id, + if *is_btree { "true" } else { "false" } + ), + ), }; format!( "{:<4} {:<17} {:<4} {:<4} {:<4} {:<13} {:<2} {}", diff --git a/core/vdbe/insn.rs b/core/vdbe/insn.rs index acd8166c3..83bac3701 100644 --- a/core/vdbe/insn.rs +++ b/core/vdbe/insn.rs @@ -812,40 +812,33 @@ pub enum Insn { dest: usize, cookie: Cookie, }, + /// Open a new cursor P1 to a transient table. + OpenEphemeral { + cursor_id: usize, + is_btree: bool, + }, } impl Insn { pub fn to_function(&self) -> InsnFunction { match self { Insn::Init { .. } => execute::op_init, - Insn::Null { .. } => execute::op_null, - Insn::NullRow { .. } => execute::op_null_row, - Insn::Add { .. } => execute::op_add, - Insn::Subtract { .. } => execute::op_subtract, - Insn::Multiply { .. } => execute::op_multiply, - Insn::Divide { .. } => execute::op_divide, - Insn::Compare { .. } => execute::op_compare, Insn::BitAnd { .. } => execute::op_bit_and, - Insn::BitOr { .. } => execute::op_bit_or, - Insn::BitNot { .. } => execute::op_bit_not, - Insn::Checkpoint { .. } => execute::op_checkpoint, Insn::Remainder { .. } => execute::op_remainder, - Insn::Jump { .. } => execute::op_jump, Insn::Move { .. } => execute::op_move, Insn::IfPos { .. } => execute::op_if_pos, Insn::NotNull { .. } => execute::op_not_null, - Insn::Eq { .. } => execute::op_eq, Insn::Ne { .. } => execute::op_ne, Insn::Lt { .. } => execute::op_lt, @@ -856,11 +849,8 @@ impl Insn { Insn::IfNot { .. } => execute::op_if_not, Insn::OpenReadAsync { .. } => execute::op_open_read_async, Insn::OpenReadAwait => execute::op_open_read_await, - Insn::VOpenAsync { .. } => execute::op_vopen_async, - Insn::VOpenAwait => execute::op_vopen_await, - Insn::VCreate { .. } => execute::op_vcreate, Insn::VFilter { .. } => execute::op_vfilter, Insn::VColumn { .. } => execute::op_vcolumn, @@ -868,43 +858,29 @@ impl Insn { Insn::VNext { .. } => execute::op_vnext, Insn::OpenPseudo { .. } => execute::op_open_pseudo, Insn::RewindAsync { .. } => execute::op_rewind_async, - Insn::RewindAwait { .. } => execute::op_rewind_await, Insn::LastAsync { .. } => execute::op_last_async, - Insn::LastAwait { .. } => execute::op_last_await, Insn::Column { .. } => execute::op_column, Insn::TypeCheck { .. } => execute::op_type_check, Insn::MakeRecord { .. } => execute::op_make_record, Insn::ResultRow { .. } => execute::op_result_row, - Insn::NextAsync { .. } => execute::op_next_async, - Insn::NextAwait { .. } => execute::op_next_await, Insn::PrevAsync { .. } => execute::op_prev_async, - Insn::PrevAwait { .. } => execute::op_prev_await, Insn::Halt { .. } => execute::op_halt, Insn::Transaction { .. } => execute::op_transaction, - Insn::AutoCommit { .. } => execute::op_auto_commit, Insn::Goto { .. } => execute::op_goto, - Insn::Gosub { .. } => execute::op_gosub, Insn::Return { .. } => execute::op_return, - Insn::Integer { .. } => execute::op_integer, - Insn::Real { .. } => execute::op_real, - Insn::RealAffinity { .. } => execute::op_real_affinity, - Insn::String8 { .. } => execute::op_string8, - Insn::Blob { .. } => execute::op_blob, - Insn::RowId { .. } => execute::op_row_id, - Insn::SeekRowid { .. } => execute::op_seek_rowid, Insn::DeferredSeek { .. } => execute::op_deferred_seek, Insn::SeekGE { .. } => execute::op_seek, @@ -917,10 +893,8 @@ impl Insn { Insn::IdxLE { .. } => execute::op_idx_le, Insn::IdxLT { .. } => execute::op_idx_lt, Insn::DecrJumpZero { .. } => execute::op_decr_jump_zero, - Insn::AggStep { .. } => execute::op_agg_step, Insn::AggFinal { .. } => execute::op_agg_final, - Insn::SorterOpen { .. } => execute::op_sorter_open, Insn::SorterInsert { .. } => execute::op_sorter_insert, Insn::SorterSort { .. } => execute::op_sorter_sort, @@ -929,57 +903,39 @@ impl Insn { Insn::Function { .. } => execute::op_function, Insn::InitCoroutine { .. } => execute::op_init_coroutine, Insn::EndCoroutine { .. } => execute::op_end_coroutine, - Insn::Yield { .. } => execute::op_yield, Insn::InsertAsync { .. } => execute::op_insert_async, Insn::InsertAwait { .. } => execute::op_insert_await, Insn::IdxInsertAsync { .. } => execute::op_idx_insert_async, Insn::IdxInsertAwait { .. } => execute::op_idx_insert_await, Insn::DeleteAsync { .. } => execute::op_delete_async, - Insn::DeleteAwait { .. } => execute::op_delete_await, - Insn::NewRowid { .. } => execute::op_new_rowid, Insn::MustBeInt { .. } => execute::op_must_be_int, - Insn::SoftNull { .. } => execute::op_soft_null, - Insn::NotExists { .. } => execute::op_not_exists, Insn::OffsetLimit { .. } => execute::op_offset_limit, Insn::OpenWriteAsync { .. } => execute::op_open_write_async, Insn::OpenWriteAwait { .. } => execute::op_open_write_await, - Insn::Copy { .. } => execute::op_copy, Insn::CreateBtree { .. } => execute::op_create_btree, - Insn::Destroy { .. } => execute::op_destroy, Insn::DropTable { .. } => execute::op_drop_table, Insn::Close { .. } => execute::op_close, - Insn::IsNull { .. } => execute::op_is_null, - Insn::ParseSchema { .. } => execute::op_parse_schema, - Insn::ShiftRight { .. } => execute::op_shift_right, - Insn::ShiftLeft { .. } => execute::op_shift_left, - Insn::Variable { .. } => execute::op_variable, - Insn::ZeroOrNull { .. } => execute::op_zero_or_null, - Insn::Not { .. } => execute::op_not, - Insn::Concat { .. } => execute::op_concat, - Insn::And { .. } => execute::op_and, - Insn::Or { .. } => execute::op_or, - Insn::Noop => execute::op_noop, Insn::PageCount { .. } => execute::op_page_count, - Insn::ReadCookie { .. } => execute::op_read_cookie, + Insn::OpenEphemeral { .. } => execute::op_open_ephemeral, } } }