From 65d4c68cf2d47bd775bf77b068a8434040399753 Mon Sep 17 00:00:00 2001 From: Diego Reis Date: Thu, 3 Apr 2025 14:39:20 -0300 Subject: [PATCH 01/13] core/pager: Wrap wal with Option --- core/lib.rs | 2 +- core/storage/btree.rs | 4 +- core/storage/pager.rs | 141 ++++++++++++++++++++++++++---------------- 3 files changed, 90 insertions(+), 57 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index b66ef4c23..1ac907af7 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -183,7 +183,7 @@ impl Database { let pager = Rc::new(Pager::finish_open( self.header.clone(), self.db_file.clone(), - wal, + Some(wal), self.io.clone(), self.shared_page_cache.clone(), buffer_pool, diff --git a/core/storage/btree.rs b/core/storage/btree.rs index dc512afd5..ba121d86d 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -5012,7 +5012,7 @@ mod tests { let page_cache = Arc::new(parking_lot::RwLock::new(DumbLruPageCache::new(10))); let pager = { let db_header = Arc::new(SpinLock::new(db_header.clone())); - Pager::finish_open(db_header, db_file, wal, io, page_cache, buffer_pool).unwrap() + Pager::finish_open(db_header, db_file, Some(wal), io, page_cache, buffer_pool).unwrap() }; let pager = Rc::new(pager); let page1 = pager.allocate_page().unwrap(); @@ -5329,7 +5329,7 @@ mod tests { Pager::finish_open( db_header.clone(), db_file, - wal, + Some(wal), io, Arc::new(parking_lot::RwLock::new(DumbLruPageCache::new(10))), buffer_pool, diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 5d9554198..e89b659bf 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -157,7 +157,7 @@ pub struct Pager { /// Source of the database pages. pub db_file: Arc, /// The write-ahead log (WAL) for the database. - wal: Rc>, + wal: Option>>, /// A page cache for the database. page_cache: Arc>, /// Buffer pool for temporary data storage. @@ -183,7 +183,7 @@ impl Pager { pub fn finish_open( db_header_ref: Arc>, db_file: Arc, - wal: Rc>, + wal: Option>>, io: Arc, page_cache: Arc>, buffer_pool: Rc, @@ -241,28 +241,42 @@ impl Pager { #[inline(always)] pub fn begin_read_tx(&self) -> Result { - self.wal.borrow_mut().begin_read_tx() + if let Some(wal) = &self.wal { + return wal.borrow_mut().begin_read_tx(); + } + + Ok(LimboResult::Ok) } #[inline(always)] pub fn begin_write_tx(&self) -> Result { - self.wal.borrow_mut().begin_write_tx() + if let Some(wal) = &self.wal { + return wal.borrow_mut().begin_write_tx(); + } + + Ok(LimboResult::Ok) } pub fn end_tx(&self) -> Result { + if let Some(wal) = &self.wal { let checkpoint_status = self.cacheflush()?; - match checkpoint_status { + return match checkpoint_status { CheckpointStatus::IO => Ok(checkpoint_status), CheckpointStatus::Done(_) => { - self.wal.borrow().end_write_tx()?; - self.wal.borrow().end_read_tx()?; + wal.borrow().end_write_tx()?; + wal.borrow().end_read_tx()?; Ok(checkpoint_status) } } } + Ok(CheckpointStatus::Done(CheckpointResult::default())) + } + pub fn end_read_tx(&self) -> Result<()> { - self.wal.borrow().end_read_tx()?; + if let Some(wal) = &self.wal { + wal.borrow().end_read_tx()?; + } Ok(()) } @@ -270,7 +284,11 @@ impl Pager { pub fn read_page(&self, page_idx: usize) -> Result { tracing::trace!("read_page(page_idx = {})", page_idx); let mut page_cache = self.page_cache.write(); - let page_key = PageCacheKey::new(page_idx, Some(self.wal.borrow().get_max_frame())); + let max_frame = match &self.wal { + Some(wal) => wal.borrow().get_max_frame(), + None => 0, + }; + let page_key = PageCacheKey::new(page_idx, Some(max_frame)); if let Some(page) = page_cache.get(&page_key) { tracing::trace!("read_page(page_idx = {}) = cached", page_idx); return Ok(page.clone()); @@ -278,8 +296,9 @@ impl Pager { let page = Arc::new(Page::new(page_idx)); page.set_locked(); - if let Some(frame_id) = self.wal.borrow().find_frame(page_idx as u64)? { - self.wal + if let Some(wal) = &self.wal { + if let Some(frame_id) = wal.borrow().find_frame(page_idx as u64)? { + wal .borrow() .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; { @@ -290,6 +309,7 @@ impl Pager { page_cache.insert(page_key, page.clone()); return Ok(page); } + } sqlite3_ondisk::begin_read_page( self.db_file.clone(), self.buffer_pool.clone(), @@ -307,32 +327,30 @@ impl Pager { trace!("load_page(page_idx = {})", id); let mut page_cache = self.page_cache.write(); page.set_locked(); - let page_key = PageCacheKey::new(id, Some(self.wal.borrow().get_max_frame())); - if let Some(frame_id) = self.wal.borrow().find_frame(id as u64)? { - self.wal - .borrow() - .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; - { - page.set_uptodate(); - } - // TODO(pere) ensure page is inserted - if !page_cache.contains_key(&page_key) { - page_cache.insert(page_key, page.clone()); - } - return Ok(()); + if let Some(wal) = &self.wal { + let page_key = PageCacheKey::new(id, Some(wal.borrow().get_max_frame())); + if let Some(frame_id) = wal.borrow().find_frame(id as u64)? { + wal.borrow() + .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; + { + page.set_uptodate(); + } + // TODO(pere) ensure page is inserted + if !page_cache.contains_key(&page_key) { + page_cache.insert(page_key, page.clone()); + } + return Ok(()); } + } sqlite3_ondisk::begin_read_page( self.db_file.clone(), self.buffer_pool.clone(), page.clone(), id, )?; - // TODO(pere) ensure page is inserted - if !page_cache.contains_key(&page_key) { - page_cache.insert(page_key, page.clone()); - } + Ok(()) - } +} /// Writes the database header. pub fn write_database_header(&self, header: &DatabaseHeader) { @@ -361,20 +379,22 @@ impl Pager { let db_size = self.db_header.lock().database_size; for page_id in self.dirty_pages.borrow().iter() { let mut cache = self.page_cache.write(); - let page_key = - PageCacheKey::new(*page_id, Some(self.wal.borrow().get_max_frame())); - let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); - let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); - trace!("cacheflush(page={}, page_type={:?}", page_id, page_type); - self.wal.borrow_mut().append_frame( - page.clone(), - db_size, - self.flush_info.borrow().in_flight_writes.clone(), - )?; - // This page is no longer valid. - // For example: - // We took page with key (page_num, max_frame) -- this page is no longer valid for that max_frame so it must be invalidated. - cache.delete(page_key); + if let Some(wal) = &self.wal { + let page_key = + PageCacheKey::new(*page_id, Some(wal.borrow().get_max_frame())); + let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); + let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); + trace!("cacheflush(page={}, page_type={:?}", page_id, page_type); + wal.borrow_mut().append_frame( + page.clone(), + db_size, + self.flush_info.borrow().in_flight_writes.clone(), + )?; + // This page is no longer valid. + // For example: + // We took page with key (page_num, max_frame) -- this page is no longer valid for that max_frame so it must be invalidated. + cache.delete(page_key); + } } self.dirty_pages.borrow_mut().clear(); self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames; @@ -389,13 +409,16 @@ impl Pager { } } FlushState::SyncWal => { - match self.wal.borrow_mut().sync() { + let wal = self.wal.clone().ok_or(LimboError::InternalError( + "SyncWal was called without a existing wal".to_string(), + ))?; + match wal.borrow_mut().sync() { Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO), Ok(CheckpointStatus::Done(res)) => checkpoint_result = res, Err(e) => return Err(e), } - let should_checkpoint = self.wal.borrow().should_checkpoint(); + let should_checkpoint = wal.borrow().should_checkpoint(); if should_checkpoint { self.flush_info.borrow_mut().state = FlushState::Checkpoint; } else { @@ -437,11 +460,13 @@ impl Pager { match state { CheckpointState::Checkpoint => { let in_flight = self.checkpoint_inflight.clone(); - match self.wal.borrow_mut().checkpoint( - self, - in_flight, - CheckpointMode::Passive, - )? { + let wal = self.wal.clone().ok_or(LimboError::InternalError( + "Checkpoint was called without a existing wal".to_string(), + ))?; + match wal + .borrow_mut() + .checkpoint(self, in_flight, CheckpointMode::Passive)? + { CheckpointStatus::IO => return Ok(CheckpointStatus::IO), CheckpointStatus::Done(res) => { checkpoint_result = res; @@ -478,7 +503,7 @@ impl Pager { pub fn clear_page_cache(&self) -> CheckpointResult { let checkpoint_result: CheckpointResult; loop { - match self.wal.borrow_mut().checkpoint( + match self.wal.clone().unwrap().borrow_mut().checkpoint( self, Rc::new(RefCell::new(0)), CheckpointMode::Passive, @@ -603,8 +628,12 @@ impl Pager { page.set_dirty(); self.add_dirty(page.get().id); let mut cache = self.page_cache.write(); - let page_key = - PageCacheKey::new(page.get().id, Some(self.wal.borrow().get_max_frame())); + let max_frame = match &self.wal { + Some(wal) => wal.borrow().get_max_frame(), + None => 0, + }; + + let page_key = PageCacheKey::new(page.get().id, Some(max_frame)); cache.insert(page_key, page.clone()); } Ok(page) @@ -613,7 +642,11 @@ impl Pager { pub fn put_loaded_page(&self, id: usize, page: PageRef) { let mut cache = self.page_cache.write(); // cache insert invalidates previous page - let page_key = PageCacheKey::new(id, Some(self.wal.borrow().get_max_frame())); + let max_frame = match &self.wal { + Some(wal) => wal.borrow().get_max_frame(), + None => 0, + }; + let page_key = PageCacheKey::new(id, Some(max_frame)); cache.insert(page_key, page.clone()); page.set_loaded(); } From b519509349d77203eaed0e54dec09c6ef566c2b8 Mon Sep 17 00:00:00 2001 From: Diego Reis Date: Thu, 3 Apr 2025 15:16:33 -0300 Subject: [PATCH 02/13] core/io: Add internal in-memory MemoryIO to all IO layers Honestly I don't have 100% sure if this is a good idea, the reasoning is that in any IO we'll want to do memory only operations like creating tables etc, so may want a common way to access it --- core/io/generic.rs | 7 +++++++ core/io/io_uring.rs | 8 +++++++- core/io/memory.rs | 4 ++++ core/io/mod.rs | 2 ++ core/io/unix.rs | 8 +++++++- core/io/vfs.rs | 10 +++++++++- core/io/windows.rs | 11 +++++++++-- 7 files changed, 45 insertions(+), 5 deletions(-) diff --git a/core/io/generic.rs b/core/io/generic.rs index e1c7eb1f1..b17a0ea5a 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -3,6 +3,7 @@ use std::cell::RefCell; use std::io::{Read, Seek, Write}; use std::sync::Arc; use tracing::{debug, trace}; +use super::MemoryIO; pub struct GenericIO {} @@ -26,6 +27,7 @@ impl IO for GenericIO { .open(path)?; Ok(Arc::new(GenericFile { file: RefCell::new(file), + memory_io: Arc::new(MemoryIO::new()), })) } @@ -48,10 +50,15 @@ impl Clock for GenericIO { micros: now.timestamp_subsec_micros(), } } + + fn get_memory_io(&self) -> Option> { + Some(self.memory_io.clone()) + } } pub struct GenericFile { file: RefCell, + memory_io: Arc, } unsafe impl Send for GenericFile {} diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 77d574639..7d73461a0 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -1,5 +1,5 @@ use super::{common, Completion, File, OpenFlags, WriteCompletion, IO}; -use crate::{LimboError, Result}; +use crate::{LimboError, MemoryIO, Result}; use rustix::fs::{self, FlockOperation, OFlags}; use rustix::io_uring::iovec; use std::cell::RefCell; @@ -35,6 +35,7 @@ impl fmt::Display for UringIOError { pub struct UringIO { inner: Rc>, + memory_io: Arc, } unsafe impl Send for UringIO {} @@ -78,6 +79,7 @@ impl UringIO { debug!("Using IO backend 'io-uring'"); Ok(Self { inner: Rc::new(RefCell::new(inner)), + memory_io: Arc::new(MemoryIO::new()), }) } } @@ -207,6 +209,10 @@ impl Clock for UringIO { micros: now.timestamp_subsec_micros(), } } + + fn get_memory_io(&self) -> Option> { + Some(self.memory_io.clone()) + } } pub struct UringFile { diff --git a/core/io/memory.rs b/core/io/memory.rs index 92a61bba7..d573c443a 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -58,6 +58,10 @@ impl IO for MemoryIO { getrandom::getrandom(&mut buf).unwrap(); i64::from_ne_bytes(buf) } + + fn get_memory_io(&self) -> Option> { + None + } } pub struct MemoryFile { diff --git a/core/io/mod.rs b/core/io/mod.rs index 1cda42380..36c39e013 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -40,6 +40,8 @@ pub trait IO: Clock + Send + Sync { fn run_once(&self) -> Result<()>; fn generate_random_number(&self) -> i64; + + fn get_memory_io(&self) -> Option>; } pub type Complete = dyn Fn(Arc>); diff --git a/core/io/unix.rs b/core/io/unix.rs index 32054e2d5..9c8ac6fed 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -2,7 +2,7 @@ use crate::error::LimboError; use crate::io::common; use crate::Result; -use super::{Completion, File, OpenFlags, IO}; +use super::{Completion, File, MemoryIO, OpenFlags, IO}; use polling::{Event, Events, Poller}; use rustix::{ fd::{AsFd, AsRawFd}, @@ -167,6 +167,7 @@ pub struct UnixIO { poller: PollHandler, events: EventsHandler, callbacks: OwnedCallbacks, + memory_io: Arc, } unsafe impl Send for UnixIO {} @@ -180,6 +181,7 @@ impl UnixIO { poller: PollHandler::new(), events: EventsHandler::new(), callbacks: OwnedCallbacks::new(), + memory_io: Arc::new(MemoryIO::new()), }) } } @@ -258,6 +260,10 @@ impl IO for UnixIO { getrandom::getrandom(&mut buf).unwrap(); i64::from_ne_bytes(buf) } + + fn get_memory_io(&self) -> Option> { + Some(self.memory_io.clone()) + } } enum CompletionCallback { diff --git a/core/io/vfs.rs b/core/io/vfs.rs index 4d9a6d6e2..ede08c7cf 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -1,4 +1,4 @@ -use super::{Buffer, Completion, File, OpenFlags, IO}; +use super::{Buffer, Completion, File, MemoryIO, OpenFlags, IO}; use crate::ext::VfsMod; use crate::io::clock::{Clock, Instant}; use crate::{LimboError, Result}; @@ -50,6 +50,10 @@ impl IO for VfsMod { let vfs = unsafe { &*self.ctx }; unsafe { (vfs.gen_random_number)() } } + + fn get_memory_io(&self) -> Option> { + Some(Arc::new(MemoryIO::new())) + } } impl VfsMod { @@ -65,6 +69,10 @@ impl VfsMod { cstr.to_string_lossy().into_owned() } } + + fn get_memory_io(&self) -> Option> { + None + } } impl File for VfsFileImpl { diff --git a/core/io/windows.rs b/core/io/windows.rs index 2887ea308..7c3c2e015 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -3,8 +3,10 @@ use std::cell::RefCell; use std::io::{Read, Seek, Write}; use std::sync::Arc; use tracing::{debug, trace}; - -pub struct WindowsIO {} +use super::MemoryIO; +pub struct WindowsIO { + memory_io: Arc, +} impl WindowsIO { pub fn new() -> Result { @@ -26,6 +28,7 @@ impl IO for WindowsIO { .open(path)?; Ok(Arc::new(WindowsFile { file: RefCell::new(file), + memory_io: Arc::new(MemoryIO::new()), })) } @@ -48,6 +51,10 @@ impl Clock for WindowsIO { micros: now.timestamp_subsec_micros(), } } + + fn get_memory_io(&self) -> Option> { + Some(self.memory_io.clone()) + } } pub struct WindowsFile { From e5144bb6a9cba73764fc5ba35ad69fcec990db84 Mon Sep 17 00:00:00 2001 From: Diego Reis Date: Thu, 3 Apr 2025 15:47:19 -0300 Subject: [PATCH 03/13] core/storage: Create FileMemoryStorage This is basically a copy of DatabaseStorage but outside the fs compilation flag, this way, we can access MemoryIO regardless the storage medium. --- core/storage/database.rs | 49 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/core/storage/database.rs b/core/storage/database.rs index f23d2d3ee..33ca2ac18 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -70,3 +70,52 @@ impl DatabaseFile { Self { file } } } + +pub struct FileMemoryStorage { + file: Arc, +} + +unsafe impl Send for FileMemoryStorage {} +unsafe impl Sync for FileMemoryStorage {} + +impl DatabaseStorage for FileMemoryStorage { + fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> { + let r = match c { + Completion::Read(ref r) => r, + _ => unreachable!(), + }; + let size = r.buf().len(); + assert!(page_idx > 0); + if !(512..=65536).contains(&size) || size & (size - 1) != 0 { + return Err(LimboError::NotADB); + } + let pos = (page_idx - 1) * size; + self.file.pread(pos, c)?; + Ok(()) + } + + fn write_page( + &self, + page_idx: usize, + buffer: Arc>, + c: Completion, + ) -> Result<()> { + let buffer_size = buffer.borrow().len(); + assert!(buffer_size >= 512); + assert!(buffer_size <= 65536); + assert_eq!(buffer_size & (buffer_size - 1), 0); + let pos = (page_idx - 1) * buffer_size; + self.file.pwrite(pos, buffer, c)?; + Ok(()) + } + + fn sync(&self, c: Completion) -> Result<()> { + self.file.sync(c) + } +} + +impl FileMemoryStorage { + pub fn new(file: Arc) -> Self { + Self { file } + } +} From 66e12e1c2dd493dd2785cd80fcb1230fd299a0a5 Mon Sep 17 00:00:00 2001 From: Diego Reis Date: Thu, 3 Apr 2025 15:56:51 -0300 Subject: [PATCH 04/13] core/vdbe: Create OpenEphemeral bytecode "Open a new cursor P1 to a transient table. The cursor is always opened read/write even if the main database is read-only. The ephemeral table is deleted automatically when the cursor is closed. If the cursor P1 is already opened on an ephemeral table, the table is cleared (all content is erased)." There is still some work to do, but this is a basic setup --- core/storage/pager.rs | 57 ++++++++++++++------------- core/vdbe/execute.rs | 90 ++++++++++++++++++++++++++++++++++++++++++- core/vdbe/explain.rs | 16 ++++++++ core/vdbe/insn.rs | 56 +++------------------------ 4 files changed, 138 insertions(+), 81 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index e89b659bf..af574053b 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -258,23 +258,23 @@ impl Pager { } pub fn end_tx(&self) -> Result { - if let Some(wal) = &self.wal { - let checkpoint_status = self.cacheflush()?; - return match checkpoint_status { - CheckpointStatus::IO => Ok(checkpoint_status), - CheckpointStatus::Done(_) => { - wal.borrow().end_write_tx()?; - wal.borrow().end_read_tx()?; - Ok(checkpoint_status) - } + if let Some(wal) = &self.wal { + let checkpoint_status = self.cacheflush()?; + return match checkpoint_status { + CheckpointStatus::IO => Ok(checkpoint_status), + CheckpointStatus::Done(_) => { + wal.borrow().end_write_tx()?; + wal.borrow().end_read_tx()?; + Ok(checkpoint_status) + } + }; } - } - Ok(CheckpointStatus::Done(CheckpointResult::default())) + Ok(CheckpointStatus::Done(CheckpointResult::default())) } pub fn end_read_tx(&self) -> Result<()> { - if let Some(wal) = &self.wal { + if let Some(wal) = &self.wal { wal.borrow().end_read_tx()?; } Ok(()) @@ -297,19 +297,18 @@ impl Pager { page.set_locked(); if let Some(wal) = &self.wal { - if let Some(frame_id) = wal.borrow().find_frame(page_idx as u64)? { - wal - .borrow() - .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; - { - page.set_uptodate(); + if let Some(frame_id) = wal.borrow().find_frame(page_idx as u64)? { + wal.borrow() + .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; + { + page.set_uptodate(); + } + // TODO(pere) ensure page is inserted, we should probably first insert to page cache + // and if successful, read frame or page + page_cache.insert(page_key, page.clone()); + return Ok(page); } - // TODO(pere) ensure page is inserted, we should probably first insert to page cache - // and if successful, read frame or page - page_cache.insert(page_key, page.clone()); - return Ok(page); } - } sqlite3_ondisk::begin_read_page( self.db_file.clone(), self.buffer_pool.clone(), @@ -327,7 +326,7 @@ impl Pager { trace!("load_page(page_idx = {})", id); let mut page_cache = self.page_cache.write(); page.set_locked(); - if let Some(wal) = &self.wal { + if let Some(wal) = &self.wal { let page_key = PageCacheKey::new(id, Some(wal.borrow().get_max_frame())); if let Some(frame_id) = wal.borrow().find_frame(id as u64)? { wal.borrow() @@ -340,8 +339,8 @@ impl Pager { page_cache.insert(page_key, page.clone()); } return Ok(()); + } } - } sqlite3_ondisk::begin_read_page( self.db_file.clone(), self.buffer_pool.clone(), @@ -350,7 +349,7 @@ impl Pager { )?; Ok(()) -} + } /// Writes the database header. pub fn write_database_header(&self, header: &DatabaseHeader) { @@ -379,7 +378,7 @@ impl Pager { let db_size = self.db_header.lock().database_size; for page_id in self.dirty_pages.borrow().iter() { let mut cache = self.page_cache.write(); - if let Some(wal) = &self.wal { + if let Some(wal) = &self.wal { let page_key = PageCacheKey::new(*page_id, Some(wal.borrow().get_max_frame())); let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); @@ -412,7 +411,7 @@ impl Pager { let wal = self.wal.clone().ok_or(LimboError::InternalError( "SyncWal was called without a existing wal".to_string(), ))?; - match wal.borrow_mut().sync() { + match wal.borrow_mut().sync() { Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO), Ok(CheckpointStatus::Done(res)) => checkpoint_result = res, Err(e) => return Err(e), @@ -628,7 +627,7 @@ impl Pager { page.set_dirty(); self.add_dirty(page.get().id); let mut cache = self.page_cache.write(); - let max_frame = match &self.wal { + let max_frame = match &self.wal { Some(wal) => wal.borrow().get_max_frame(), None => 0, }; diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index ecc97d088..fb23141d0 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -1,4 +1,6 @@ #![allow(unused_variables)] +use crate::storage::database::FileMemoryStorage; +use crate::storage::page_cache::DumbLruPageCache; use crate::{ error::{LimboError, SQLITE_CONSTRAINT, SQLITE_CONSTRAINT_PRIMARYKEY}, ext::ExtValue, @@ -10,7 +12,7 @@ use crate::{ printf::exec_printf, }, }; -use std::{borrow::BorrowMut, rc::Rc}; +use std::{borrow::BorrowMut, rc::Rc, sync::Arc}; use crate::{pseudo::PseudoCursor, result::LimboResult}; @@ -36,12 +38,13 @@ use crate::{ vector::{vector32, vector64, vector_distance_cos, vector_extract}, }; -use crate::{info, MvCursor, RefValue, Row, StepResult, TransactionState}; +use crate::{info, BufferPool, MvCursor, OpenFlags, RefValue, Row, StepResult, TransactionState}; use super::{ insn::{Cookie, RegisterOrLiteral}, HaltState, }; +use parking_lot::RwLock; use rand::thread_rng; use super::{ @@ -4504,6 +4507,89 @@ pub fn op_noop( Ok(InsnFunctionStepResult::Step) } +pub fn op_open_ephemeral( + program: &Program, + state: &mut ProgramState, + insn: &Insn, + pager: &Rc, + mv_store: Option<&Rc>, +) -> Result { + let Insn::OpenEphemeral { + cursor_id, + is_btree, + } = insn + else { + unreachable!("unexpected Insn {:?}", insn) + }; + + let conn = program.connection.upgrade().unwrap(); + // Only memory and vfs IOs returns None, so cloning is safe + let io = match conn.pager.io.get_memory_io() { + Some(io) => io, + None => conn.pager.io.clone(), + }; + + let file = io.open_file("", OpenFlags::Create, true)?; + let page_io = Arc::new(FileMemoryStorage::new(file)); + + let db_header = Pager::begin_open(page_io.clone())?; + let buffer_pool = Rc::new(BufferPool::new(512)); + let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(10))); + + let pager = Rc::new(Pager::finish_open( + db_header, + page_io, + None, + io, + page_cache, + buffer_pool, + )?); + + let root_page = pager.btree_create(*is_btree as usize); + + let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap(); + let mv_cursor = match state.mv_tx_id { + Some(tx_id) => { + let table_id = root_page as u64; + let mv_store = mv_store.as_ref().unwrap().clone(); + let mv_cursor = Rc::new(RefCell::new( + MvCursor::new(mv_store.clone(), tx_id, table_id).unwrap(), + )); + Some(mv_cursor) + } + None => None, + }; + let cursor = BTreeCursor::new(mv_cursor, pager, root_page as usize); + let mut cursors: std::cell::RefMut<'_, Vec>> = state.cursors.borrow_mut(); + // Table content is erased if the cursor already exists + match cursor_type { + CursorType::BTreeTable(_) => { + cursors + .get_mut(*cursor_id) + .unwrap() + .replace(Cursor::new_btree(cursor)); + } + CursorType::BTreeIndex(_) => { + cursors + .get_mut(*cursor_id) + .unwrap() + .replace(Cursor::new_btree(cursor)); + } + CursorType::Pseudo(_) => { + panic!("OpenEphemeral on pseudo cursor"); + } + CursorType::Sorter => { + panic!("OpenEphemeral on sorter cursor"); + } + CursorType::VirtualTable(_) => { + panic!("OpenEphemeral on virtual table cursor, use Insn::VOpenAsync instead"); + } + } + + state.pc += 1; + Ok(InsnFunctionStepResult::Step) +} + fn exec_lower(reg: &OwnedValue) -> Option { match reg { OwnedValue::Text(t) => Some(OwnedValue::build_text(&t.as_str().to_lowercase())), diff --git a/core/vdbe/explain.rs b/core/vdbe/explain.rs index 79b0f3ded..40854da24 100644 --- a/core/vdbe/explain.rs +++ b/core/vdbe/explain.rs @@ -1389,6 +1389,22 @@ pub fn insn_to_str( 0, format!("auto_commit={}, rollback={}", auto_commit, rollback), ), + Insn::OpenEphemeral { + cursor_id, + is_btree, + } => ( + "OpenEphemeral", + *cursor_id as i32, + *is_btree as i32, + 0, + OwnedValue::build_text(""), + 0, + format!( + "cursor={} is_btree={}", + cursor_id, + if *is_btree { "true" } else { "false" } + ), + ), }; format!( "{:<4} {:<17} {:<4} {:<4} {:<4} {:<13} {:<2} {}", diff --git a/core/vdbe/insn.rs b/core/vdbe/insn.rs index acd8166c3..83bac3701 100644 --- a/core/vdbe/insn.rs +++ b/core/vdbe/insn.rs @@ -812,40 +812,33 @@ pub enum Insn { dest: usize, cookie: Cookie, }, + /// Open a new cursor P1 to a transient table. + OpenEphemeral { + cursor_id: usize, + is_btree: bool, + }, } impl Insn { pub fn to_function(&self) -> InsnFunction { match self { Insn::Init { .. } => execute::op_init, - Insn::Null { .. } => execute::op_null, - Insn::NullRow { .. } => execute::op_null_row, - Insn::Add { .. } => execute::op_add, - Insn::Subtract { .. } => execute::op_subtract, - Insn::Multiply { .. } => execute::op_multiply, - Insn::Divide { .. } => execute::op_divide, - Insn::Compare { .. } => execute::op_compare, Insn::BitAnd { .. } => execute::op_bit_and, - Insn::BitOr { .. } => execute::op_bit_or, - Insn::BitNot { .. } => execute::op_bit_not, - Insn::Checkpoint { .. } => execute::op_checkpoint, Insn::Remainder { .. } => execute::op_remainder, - Insn::Jump { .. } => execute::op_jump, Insn::Move { .. } => execute::op_move, Insn::IfPos { .. } => execute::op_if_pos, Insn::NotNull { .. } => execute::op_not_null, - Insn::Eq { .. } => execute::op_eq, Insn::Ne { .. } => execute::op_ne, Insn::Lt { .. } => execute::op_lt, @@ -856,11 +849,8 @@ impl Insn { Insn::IfNot { .. } => execute::op_if_not, Insn::OpenReadAsync { .. } => execute::op_open_read_async, Insn::OpenReadAwait => execute::op_open_read_await, - Insn::VOpenAsync { .. } => execute::op_vopen_async, - Insn::VOpenAwait => execute::op_vopen_await, - Insn::VCreate { .. } => execute::op_vcreate, Insn::VFilter { .. } => execute::op_vfilter, Insn::VColumn { .. } => execute::op_vcolumn, @@ -868,43 +858,29 @@ impl Insn { Insn::VNext { .. } => execute::op_vnext, Insn::OpenPseudo { .. } => execute::op_open_pseudo, Insn::RewindAsync { .. } => execute::op_rewind_async, - Insn::RewindAwait { .. } => execute::op_rewind_await, Insn::LastAsync { .. } => execute::op_last_async, - Insn::LastAwait { .. } => execute::op_last_await, Insn::Column { .. } => execute::op_column, Insn::TypeCheck { .. } => execute::op_type_check, Insn::MakeRecord { .. } => execute::op_make_record, Insn::ResultRow { .. } => execute::op_result_row, - Insn::NextAsync { .. } => execute::op_next_async, - Insn::NextAwait { .. } => execute::op_next_await, Insn::PrevAsync { .. } => execute::op_prev_async, - Insn::PrevAwait { .. } => execute::op_prev_await, Insn::Halt { .. } => execute::op_halt, Insn::Transaction { .. } => execute::op_transaction, - Insn::AutoCommit { .. } => execute::op_auto_commit, Insn::Goto { .. } => execute::op_goto, - Insn::Gosub { .. } => execute::op_gosub, Insn::Return { .. } => execute::op_return, - Insn::Integer { .. } => execute::op_integer, - Insn::Real { .. } => execute::op_real, - Insn::RealAffinity { .. } => execute::op_real_affinity, - Insn::String8 { .. } => execute::op_string8, - Insn::Blob { .. } => execute::op_blob, - Insn::RowId { .. } => execute::op_row_id, - Insn::SeekRowid { .. } => execute::op_seek_rowid, Insn::DeferredSeek { .. } => execute::op_deferred_seek, Insn::SeekGE { .. } => execute::op_seek, @@ -917,10 +893,8 @@ impl Insn { Insn::IdxLE { .. } => execute::op_idx_le, Insn::IdxLT { .. } => execute::op_idx_lt, Insn::DecrJumpZero { .. } => execute::op_decr_jump_zero, - Insn::AggStep { .. } => execute::op_agg_step, Insn::AggFinal { .. } => execute::op_agg_final, - Insn::SorterOpen { .. } => execute::op_sorter_open, Insn::SorterInsert { .. } => execute::op_sorter_insert, Insn::SorterSort { .. } => execute::op_sorter_sort, @@ -929,57 +903,39 @@ impl Insn { Insn::Function { .. } => execute::op_function, Insn::InitCoroutine { .. } => execute::op_init_coroutine, Insn::EndCoroutine { .. } => execute::op_end_coroutine, - Insn::Yield { .. } => execute::op_yield, Insn::InsertAsync { .. } => execute::op_insert_async, Insn::InsertAwait { .. } => execute::op_insert_await, Insn::IdxInsertAsync { .. } => execute::op_idx_insert_async, Insn::IdxInsertAwait { .. } => execute::op_idx_insert_await, Insn::DeleteAsync { .. } => execute::op_delete_async, - Insn::DeleteAwait { .. } => execute::op_delete_await, - Insn::NewRowid { .. } => execute::op_new_rowid, Insn::MustBeInt { .. } => execute::op_must_be_int, - Insn::SoftNull { .. } => execute::op_soft_null, - Insn::NotExists { .. } => execute::op_not_exists, Insn::OffsetLimit { .. } => execute::op_offset_limit, Insn::OpenWriteAsync { .. } => execute::op_open_write_async, Insn::OpenWriteAwait { .. } => execute::op_open_write_await, - Insn::Copy { .. } => execute::op_copy, Insn::CreateBtree { .. } => execute::op_create_btree, - Insn::Destroy { .. } => execute::op_destroy, Insn::DropTable { .. } => execute::op_drop_table, Insn::Close { .. } => execute::op_close, - Insn::IsNull { .. } => execute::op_is_null, - Insn::ParseSchema { .. } => execute::op_parse_schema, - Insn::ShiftRight { .. } => execute::op_shift_right, - Insn::ShiftLeft { .. } => execute::op_shift_left, - Insn::Variable { .. } => execute::op_variable, - Insn::ZeroOrNull { .. } => execute::op_zero_or_null, - Insn::Not { .. } => execute::op_not, - Insn::Concat { .. } => execute::op_concat, - Insn::And { .. } => execute::op_and, - Insn::Or { .. } => execute::op_or, - Insn::Noop => execute::op_noop, Insn::PageCount { .. } => execute::op_page_count, - Insn::ReadCookie { .. } => execute::op_read_cookie, + Insn::OpenEphemeral { .. } => execute::op_open_ephemeral, } } } From 79f8b83cbe17457125cf05d953bb445b7dd04db3 Mon Sep 17 00:00:00 2001 From: Diego Reis Date: Thu, 3 Apr 2025 16:25:13 -0300 Subject: [PATCH 05/13] Fix dumb clippy errors --- bindings/javascript/src/lib.rs | 4 ++++ bindings/wasm/lib.rs | 4 ++++ core/io/generic.rs | 8 ++++++-- core/io/windows.rs | 5 +++-- core/storage/database.rs | 1 - core/vdbe/execute.rs | 2 +- simulator/runner/io.rs | 4 ++++ 7 files changed, 22 insertions(+), 6 deletions(-) diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index a9c0d72a5..eaa2fc3a6 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -176,4 +176,8 @@ impl limbo_core::IO for IO { fn generate_random_number(&self) -> i64 { todo!(); } + + fn get_memory_io(&self) -> Option> { + todo!() + } } diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index 91680dc96..6b4173c90 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -305,6 +305,10 @@ impl limbo_core::IO for PlatformIO { let random_f64 = Math_random(); (random_f64 * i64::MAX as f64) as i64 } + + fn get_memory_io(&self) -> Option> { + None // TODO: Make sure if memory isn't needed here + } } #[wasm_bindgen] diff --git a/core/io/generic.rs b/core/io/generic.rs index b17a0ea5a..03d5fdd3d 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -5,12 +5,16 @@ use std::sync::Arc; use tracing::{debug, trace}; use super::MemoryIO; -pub struct GenericIO {} +pub struct GenericIO { + memory_io: Arc, +} impl GenericIO { pub fn new() -> Result { debug!("Using IO backend 'generic'"); - Ok(Self {}) + Ok(Self { + memory_io: Arc::new(MemoryIO::new()), + }) } } diff --git a/core/io/windows.rs b/core/io/windows.rs index 7c3c2e015..af36119d0 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -11,7 +11,9 @@ pub struct WindowsIO { impl WindowsIO { pub fn new() -> Result { debug!("Using IO backend 'syscall'"); - Ok(Self {}) + Ok(Self { + memory_io: Arc::new(MemoryIO::new()), + }) } } @@ -28,7 +30,6 @@ impl IO for WindowsIO { .open(path)?; Ok(Arc::new(WindowsFile { file: RefCell::new(file), - memory_io: Arc::new(MemoryIO::new()), })) } diff --git a/core/storage/database.rs b/core/storage/database.rs index 33ca2ac18..cf8b57d8e 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -1,4 +1,3 @@ -#[cfg(feature = "fs")] use crate::error::LimboError; use crate::{io::Completion, Buffer, Result}; use std::{cell::RefCell, sync::Arc}; diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index fb23141d0..3ad0a91f8 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -4551,7 +4551,7 @@ pub fn op_open_ephemeral( let mv_cursor = match state.mv_tx_id { Some(tx_id) => { let table_id = root_page as u64; - let mv_store = mv_store.as_ref().unwrap().clone(); + let mv_store = mv_store.unwrap().clone(); let mv_cursor = Rc::new(RefCell::new( MvCursor::new(mv_store.clone(), tx_id, table_id).unwrap(), )); diff --git a/simulator/runner/io.rs b/simulator/runner/io.rs index d1c280b4e..0a7ff3b3a 100644 --- a/simulator/runner/io.rs +++ b/simulator/runner/io.rs @@ -97,4 +97,8 @@ impl IO for SimulatorIO { fn generate_random_number(&self) -> i64 { self.rng.borrow_mut().next_u64() as i64 } + + fn get_memory_io(&self) -> Option> { + todo!() + } } From d9bf38350773789fe01df952e27ba4cadd2a56a1 Mon Sep 17 00:00:00 2001 From: Diego Reis Date: Fri, 4 Apr 2025 00:35:36 -0300 Subject: [PATCH 06/13] core/io: Untie MemoryIO's lifetime of the IO layer --- bindings/javascript/src/lib.rs | 4 ++-- bindings/wasm/lib.rs | 4 ++-- core/io/generic.rs | 8 +++----- core/io/io_uring.rs | 6 ++---- core/io/memory.rs | 4 ++-- core/io/mod.rs | 2 +- core/io/unix.rs | 6 ++---- core/io/vfs.rs | 4 ++-- core/io/windows.rs | 6 ++---- core/vdbe/execute.rs | 10 ++++------ simulator/runner/io.rs | 2 +- 11 files changed, 23 insertions(+), 33 deletions(-) diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index eaa2fc3a6..2e0054358 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -177,7 +177,7 @@ impl limbo_core::IO for IO { todo!(); } - fn get_memory_io(&self) -> Option> { - todo!() + fn get_memory_io(&self) -> Arc { + Arc::new(limbo_core::MemoryIO::new()) } } diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index 6b4173c90..a704706be 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -306,8 +306,8 @@ impl limbo_core::IO for PlatformIO { (random_f64 * i64::MAX as f64) as i64 } - fn get_memory_io(&self) -> Option> { - None // TODO: Make sure if memory isn't needed here + fn get_memory_io(&self) -> Arc { + Arc::new(limbo_core::MemoryIO::new()) } } diff --git a/core/io/generic.rs b/core/io/generic.rs index 03d5fdd3d..d67a93dd7 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -6,14 +6,12 @@ use tracing::{debug, trace}; use super::MemoryIO; pub struct GenericIO { - memory_io: Arc, } impl GenericIO { pub fn new() -> Result { debug!("Using IO backend 'generic'"); Ok(Self { - memory_io: Arc::new(MemoryIO::new()), }) } } @@ -55,9 +53,9 @@ impl Clock for GenericIO { } } - fn get_memory_io(&self) -> Option> { - Some(self.memory_io.clone()) - } + fn get_memory_io(&self) -> Arc { + Arc::new(MemoryIO::new()) + } } pub struct GenericFile { diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 7d73461a0..6e2fc1e7e 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -35,7 +35,6 @@ impl fmt::Display for UringIOError { pub struct UringIO { inner: Rc>, - memory_io: Arc, } unsafe impl Send for UringIO {} @@ -79,7 +78,6 @@ impl UringIO { debug!("Using IO backend 'io-uring'"); Ok(Self { inner: Rc::new(RefCell::new(inner)), - memory_io: Arc::new(MemoryIO::new()), }) } } @@ -210,8 +208,8 @@ impl Clock for UringIO { } } - fn get_memory_io(&self) -> Option> { - Some(self.memory_io.clone()) + fn get_memory_io(&self) -> Arc { + Arc::new(MemoryIO::new()) } } diff --git a/core/io/memory.rs b/core/io/memory.rs index d573c443a..9cc56a5e3 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -59,8 +59,8 @@ impl IO for MemoryIO { i64::from_ne_bytes(buf) } - fn get_memory_io(&self) -> Option> { - None + fn get_memory_io(&self) -> Arc { + Arc::new(MemoryIO::new()) } } diff --git a/core/io/mod.rs b/core/io/mod.rs index 36c39e013..6f161d114 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -41,7 +41,7 @@ pub trait IO: Clock + Send + Sync { fn generate_random_number(&self) -> i64; - fn get_memory_io(&self) -> Option>; + fn get_memory_io(&self) -> Arc; } pub type Complete = dyn Fn(Arc>); diff --git a/core/io/unix.rs b/core/io/unix.rs index 9c8ac6fed..c232ed3ad 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -167,7 +167,6 @@ pub struct UnixIO { poller: PollHandler, events: EventsHandler, callbacks: OwnedCallbacks, - memory_io: Arc, } unsafe impl Send for UnixIO {} @@ -181,7 +180,6 @@ impl UnixIO { poller: PollHandler::new(), events: EventsHandler::new(), callbacks: OwnedCallbacks::new(), - memory_io: Arc::new(MemoryIO::new()), }) } } @@ -261,8 +259,8 @@ impl IO for UnixIO { i64::from_ne_bytes(buf) } - fn get_memory_io(&self) -> Option> { - Some(self.memory_io.clone()) + fn get_memory_io(&self) -> Arc { + Arc::new(MemoryIO::new()) } } diff --git a/core/io/vfs.rs b/core/io/vfs.rs index ede08c7cf..6af47b176 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -70,8 +70,8 @@ impl VfsMod { } } - fn get_memory_io(&self) -> Option> { - None + fn get_memory_io(&self) -> Arc { + Arc::new(MemoryIO::new()) } } diff --git a/core/io/windows.rs b/core/io/windows.rs index af36119d0..f970ef02d 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -5,14 +5,12 @@ use std::sync::Arc; use tracing::{debug, trace}; use super::MemoryIO; pub struct WindowsIO { - memory_io: Arc, } impl WindowsIO { pub fn new() -> Result { debug!("Using IO backend 'syscall'"); Ok(Self { - memory_io: Arc::new(MemoryIO::new()), }) } } @@ -53,8 +51,8 @@ impl Clock for WindowsIO { } } - fn get_memory_io(&self) -> Option> { - Some(self.memory_io.clone()) + fn get_memory_io(&self) -> Arc { + Arc::new(MemoryIO::new()) } } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 3ad0a91f8..4f40f0d21 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -38,7 +38,9 @@ use crate::{ vector::{vector32, vector64, vector_distance_cos, vector_extract}, }; -use crate::{info, BufferPool, MvCursor, OpenFlags, RefValue, Row, StepResult, TransactionState}; +use crate::{ + info, BufferPool, MvCursor, OpenFlags, RefValue, Row, StepResult, TransactionState, IO, +}; use super::{ insn::{Cookie, RegisterOrLiteral}, @@ -4523,11 +4525,7 @@ pub fn op_open_ephemeral( }; let conn = program.connection.upgrade().unwrap(); - // Only memory and vfs IOs returns None, so cloning is safe - let io = match conn.pager.io.get_memory_io() { - Some(io) => io, - None => conn.pager.io.clone(), - }; + let io = conn.pager.io.get_memory_io(); let file = io.open_file("", OpenFlags::Create, true)?; let page_io = Arc::new(FileMemoryStorage::new(file)); diff --git a/simulator/runner/io.rs b/simulator/runner/io.rs index 0a7ff3b3a..c775b3f9e 100644 --- a/simulator/runner/io.rs +++ b/simulator/runner/io.rs @@ -98,7 +98,7 @@ impl IO for SimulatorIO { self.rng.borrow_mut().next_u64() as i64 } - fn get_memory_io(&self) -> Option> { + fn get_memory_io(&self) -> Arc { todo!() } } From 09d83aadf3c0b49d00299b1595fe8d1e7b6adf04 Mon Sep 17 00:00:00 2001 From: Diego Reis Date: Mon, 7 Apr 2025 23:05:01 -0300 Subject: [PATCH 07/13] Fix dumb conflict errors --- core/io/generic.rs | 16 +++++++--------- core/io/io_uring.rs | 10 +++++----- core/io/vfs.rs | 8 ++------ core/io/windows.rs | 16 +++++++--------- 4 files changed, 21 insertions(+), 29 deletions(-) diff --git a/core/io/generic.rs b/core/io/generic.rs index d67a93dd7..fd59ece88 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -1,18 +1,16 @@ +use super::MemoryIO; use crate::{Clock, Completion, File, Instant, LimboError, OpenFlags, Result, IO}; use std::cell::RefCell; use std::io::{Read, Seek, Write}; use std::sync::Arc; use tracing::{debug, trace}; -use super::MemoryIO; -pub struct GenericIO { -} +pub struct GenericIO {} impl GenericIO { pub fn new() -> Result { debug!("Using IO backend 'generic'"); - Ok(Self { - }) + Ok(Self {}) } } @@ -42,6 +40,10 @@ impl IO for GenericIO { getrandom::getrandom(&mut buf).unwrap(); i64::from_ne_bytes(buf) } + + fn get_memory_io(&self) -> Arc { + Arc::new(MemoryIO::new()) + } } impl Clock for GenericIO { @@ -52,10 +54,6 @@ impl Clock for GenericIO { micros: now.timestamp_subsec_micros(), } } - - fn get_memory_io(&self) -> Arc { - Arc::new(MemoryIO::new()) - } } pub struct GenericFile { diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 6e2fc1e7e..b4b21aca8 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -1,4 +1,5 @@ use super::{common, Completion, File, OpenFlags, WriteCompletion, IO}; +use crate::io::clock::{Clock, Instant}; use crate::{LimboError, MemoryIO, Result}; use rustix::fs::{self, FlockOperation, OFlags}; use rustix::io_uring::iovec; @@ -11,7 +12,6 @@ use std::rc::Rc; use std::sync::Arc; use thiserror::Error; use tracing::{debug, trace}; -use crate::io::clock::{Clock, Instant}; const MAX_IOVECS: u32 = 128; const SQPOLL_IDLE: u32 = 1000; @@ -197,6 +197,10 @@ impl IO for UringIO { getrandom::getrandom(&mut buf).unwrap(); i64::from_ne_bytes(buf) } + + fn get_memory_io(&self) -> Arc { + Arc::new(MemoryIO::new()) + } } impl Clock for UringIO { @@ -207,10 +211,6 @@ impl Clock for UringIO { micros: now.timestamp_subsec_micros(), } } - - fn get_memory_io(&self) -> Arc { - Arc::new(MemoryIO::new()) - } } pub struct UringFile { diff --git a/core/io/vfs.rs b/core/io/vfs.rs index 6af47b176..95b4055d0 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -51,8 +51,8 @@ impl IO for VfsMod { unsafe { (vfs.gen_random_number)() } } - fn get_memory_io(&self) -> Option> { - Some(Arc::new(MemoryIO::new())) + fn get_memory_io(&self) -> Arc { + Arc::new(MemoryIO::new()) } } @@ -69,10 +69,6 @@ impl VfsMod { cstr.to_string_lossy().into_owned() } } - - fn get_memory_io(&self) -> Arc { - Arc::new(MemoryIO::new()) - } } impl File for VfsFileImpl { diff --git a/core/io/windows.rs b/core/io/windows.rs index f970ef02d..6c46d1973 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -1,17 +1,15 @@ +use super::MemoryIO; use crate::{Clock, Completion, File, Instant, LimboError, OpenFlags, Result, IO}; use std::cell::RefCell; use std::io::{Read, Seek, Write}; use std::sync::Arc; use tracing::{debug, trace}; -use super::MemoryIO; -pub struct WindowsIO { -} +pub struct WindowsIO {} impl WindowsIO { pub fn new() -> Result { debug!("Using IO backend 'syscall'"); - Ok(Self { - }) + Ok(Self {}) } } @@ -40,6 +38,10 @@ impl IO for WindowsIO { getrandom::getrandom(&mut buf).unwrap(); i64::from_ne_bytes(buf) } + + fn get_memory_io(&self) -> Arc { + Arc::new(MemoryIO::new()) + } } impl Clock for WindowsIO { @@ -50,10 +52,6 @@ impl Clock for WindowsIO { micros: now.timestamp_subsec_micros(), } } - - fn get_memory_io(&self) -> Arc { - Arc::new(MemoryIO::new()) - } } pub struct WindowsFile { From bcac1fe778a862864b7a7ceb833b0eaa88bb16c8 Mon Sep 17 00:00:00 2001 From: Diego Reis Date: Fri, 11 Apr 2025 07:24:42 -0300 Subject: [PATCH 08/13] core/vdbe: Rename page_io to db_file in OpenEphemeral --- core/vdbe/execute.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 4f40f0d21..1ddc8cdfc 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -4528,15 +4528,15 @@ pub fn op_open_ephemeral( let io = conn.pager.io.get_memory_io(); let file = io.open_file("", OpenFlags::Create, true)?; - let page_io = Arc::new(FileMemoryStorage::new(file)); + let db_file = Arc::new(FileMemoryStorage::new(file)); - let db_header = Pager::begin_open(page_io.clone())?; + let db_header = Pager::begin_open(db_file.clone())?; let buffer_pool = Rc::new(BufferPool::new(512)); let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(10))); let pager = Rc::new(Pager::finish_open( db_header, - page_io, + db_file, None, io, page_cache, From 61c324cca558972d7db5b165e2c95941fe7475e9 Mon Sep 17 00:00:00 2001 From: Diego Reis Date: Fri, 11 Apr 2025 07:29:51 -0300 Subject: [PATCH 09/13] core/vdbe: Add missing work to get cursor and transient table usable --- core/vdbe/execute.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 1ddc8cdfc..cac1344ed 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -39,7 +39,8 @@ use crate::{ }; use crate::{ - info, BufferPool, MvCursor, OpenFlags, RefValue, Row, StepResult, TransactionState, IO, + info, maybe_init_database_file, BufferPool, MvCursor, OpenFlags, RefValue, Row, StepResult, + TransactionState, IO, }; use super::{ @@ -4528,10 +4529,11 @@ pub fn op_open_ephemeral( let io = conn.pager.io.get_memory_io(); let file = io.open_file("", OpenFlags::Create, true)?; + maybe_init_database_file(&file, &(io.clone() as Arc))?; let db_file = Arc::new(FileMemoryStorage::new(file)); let db_header = Pager::begin_open(db_file.clone())?; - let buffer_pool = Rc::new(BufferPool::new(512)); + let buffer_pool = Rc::new(BufferPool::new(db_header.lock().page_size as usize)); let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(10))); let pager = Rc::new(Pager::finish_open( @@ -4557,8 +4559,11 @@ pub fn op_open_ephemeral( } None => None, }; - let cursor = BTreeCursor::new(mv_cursor, pager, root_page as usize); + let mut cursor = BTreeCursor::new(mv_cursor, pager, root_page as usize); + cursor.rewind()?; // Will never return io + let mut cursors: std::cell::RefMut<'_, Vec>> = state.cursors.borrow_mut(); + // Table content is erased if the cursor already exists match cursor_type { CursorType::BTreeTable(_) => { From 035e6dcef4dac473e77a7ac725d565aeac2eaacc Mon Sep 17 00:00:00 2001 From: Diego Reis Date: Fri, 11 Apr 2025 07:32:31 -0300 Subject: [PATCH 10/13] core/vdbe: Fix logic error during btree creation I do thing we should change this 1,2 flag to 0,1 or just an enum, to be more rustacean. The current state can be very misleading --- core/vdbe/execute.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index cac1344ed..d1db2e5ee 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -4545,7 +4545,8 @@ pub fn op_open_ephemeral( buffer_pool, )?); - let root_page = pager.btree_create(*is_btree as usize); + let flag = if *is_btree { 1 } else { 0 }; + let root_page = pager.btree_create(flag); let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap(); let mv_cursor = match state.mv_tx_id { From 135330b7361ac9cbabbc6200a15c917c49e446e5 Mon Sep 17 00:00:00 2001 From: Diego Reis Date: Sat, 12 Apr 2025 20:56:08 -0300 Subject: [PATCH 11/13] core/pager: Fix page handling issue due change in wal type --- core/storage/pager.rs | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index af574053b..9d7affa95 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -326,8 +326,12 @@ impl Pager { trace!("load_page(page_idx = {})", id); let mut page_cache = self.page_cache.write(); page.set_locked(); + let max_frame = match &self.wal { + Some(wal) => wal.borrow().get_max_frame(), + None => 0, + }; + let page_key = PageCacheKey::new(id, Some(max_frame)); if let Some(wal) = &self.wal { - let page_key = PageCacheKey::new(id, Some(wal.borrow().get_max_frame())); if let Some(frame_id) = wal.borrow().find_frame(id as u64)? { wal.borrow() .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; @@ -341,6 +345,11 @@ impl Pager { return Ok(()); } } + + // TODO(pere) ensure page is inserted + if !page_cache.contains_key(&page_key) { + page_cache.insert(page_key, page.clone()); + } sqlite3_ondisk::begin_read_page( self.db_file.clone(), self.buffer_pool.clone(), @@ -376,11 +385,14 @@ impl Pager { match state { FlushState::Start => { let db_size = self.db_header.lock().database_size; + let max_frame = match &self.wal { + Some(wal) => wal.borrow().get_max_frame(), + None => 0, + }; for page_id in self.dirty_pages.borrow().iter() { let mut cache = self.page_cache.write(); + let page_key = PageCacheKey::new(*page_id, Some(max_frame)); if let Some(wal) = &self.wal { - let page_key = - PageCacheKey::new(*page_id, Some(wal.borrow().get_max_frame())); let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); trace!("cacheflush(page={}, page_type={:?}", page_id, page_type); @@ -389,11 +401,11 @@ impl Pager { db_size, self.flush_info.borrow().in_flight_writes.clone(), )?; - // This page is no longer valid. - // For example: - // We took page with key (page_num, max_frame) -- this page is no longer valid for that max_frame so it must be invalidated. - cache.delete(page_key); } + // This page is no longer valid. + // For example: + // We took page with key (page_num, max_frame) -- this page is no longer valid for that max_frame so it must be invalidated. + cache.delete(page_key); } self.dirty_pages.borrow_mut().clear(); self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames; From 4c315e1bb6b5b6e2011c6a689727b3ccfa40baa9 Mon Sep 17 00:00:00 2001 From: Diego Reis Date: Sun, 13 Apr 2025 11:13:25 -0300 Subject: [PATCH 12/13] core/vdbe: Update OpenEphemeral to use CreateBtreeFlags --- core/vdbe/execute.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index d1db2e5ee..b496ac199 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -1,6 +1,7 @@ #![allow(unused_variables)] use crate::storage::database::FileMemoryStorage; use crate::storage::page_cache::DumbLruPageCache; +use crate::storage::pager::CreateBTreeFlags; use crate::{ error::{LimboError, SQLITE_CONSTRAINT, SQLITE_CONSTRAINT_PRIMARYKEY}, ext::ExtValue, @@ -4545,7 +4546,12 @@ pub fn op_open_ephemeral( buffer_pool, )?); - let flag = if *is_btree { 1 } else { 0 }; + let flag = if *is_btree { + &CreateBTreeFlags::new_table() + } else { + &CreateBTreeFlags::new_index() + }; + let root_page = pager.btree_create(flag); let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap(); From fd79ad2644fd04dceb9e4f76d7f7aeabd3c3103b Mon Sep 17 00:00:00 2001 From: Diego Reis Date: Sun, 13 Apr 2025 11:15:01 -0300 Subject: [PATCH 13/13] core/vdbe: Change `is_btree` to `is_table` in OpenEphemeral --- core/vdbe/execute.rs | 4 ++-- core/vdbe/explain.rs | 8 ++++---- core/vdbe/insn.rs | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index b496ac199..0df9afcc4 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -4520,7 +4520,7 @@ pub fn op_open_ephemeral( ) -> Result { let Insn::OpenEphemeral { cursor_id, - is_btree, + is_table, } = insn else { unreachable!("unexpected Insn {:?}", insn) @@ -4546,7 +4546,7 @@ pub fn op_open_ephemeral( buffer_pool, )?); - let flag = if *is_btree { + let flag = if *is_table { &CreateBTreeFlags::new_table() } else { &CreateBTreeFlags::new_index() diff --git a/core/vdbe/explain.rs b/core/vdbe/explain.rs index 40854da24..d4a766d1d 100644 --- a/core/vdbe/explain.rs +++ b/core/vdbe/explain.rs @@ -1391,18 +1391,18 @@ pub fn insn_to_str( ), Insn::OpenEphemeral { cursor_id, - is_btree, + is_table, } => ( "OpenEphemeral", *cursor_id as i32, - *is_btree as i32, + *is_table as i32, 0, OwnedValue::build_text(""), 0, format!( - "cursor={} is_btree={}", + "cursor={} is_table={}", cursor_id, - if *is_btree { "true" } else { "false" } + if *is_table { "true" } else { "false" } ), ), }; diff --git a/core/vdbe/insn.rs b/core/vdbe/insn.rs index 83bac3701..e12293a71 100644 --- a/core/vdbe/insn.rs +++ b/core/vdbe/insn.rs @@ -815,7 +815,7 @@ pub enum Insn { /// Open a new cursor P1 to a transient table. OpenEphemeral { cursor_id: usize, - is_btree: bool, + is_table: bool, }, }