mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 17:05:36 +01:00
write page1 on database initialization
Page 1 must be initialized and written as soon as possible without marking page as dirty.
This commit is contained in:
21
core/lib.rs
21
core/lib.rs
@@ -51,7 +51,8 @@ pub use io::{
|
||||
use limbo_sqlite3_parser::{ast, ast::Cmd, lexer::sql::Parser};
|
||||
use parking_lot::RwLock;
|
||||
use schema::Schema;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Mutex;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
cell::{Cell, RefCell, UnsafeCell},
|
||||
@@ -67,6 +68,7 @@ use std::{
|
||||
use storage::database::DatabaseFile;
|
||||
use storage::page_cache::DumbLruPageCache;
|
||||
pub use storage::pager::PagerCacheflushStatus;
|
||||
use storage::pager::{DB_STATE_EMPTY, DB_STATE_INITIALIZED};
|
||||
pub use storage::{
|
||||
buffer_pool::BufferPool,
|
||||
database::DatabaseStorage,
|
||||
@@ -105,7 +107,9 @@ pub struct Database {
|
||||
// create DB connections.
|
||||
_shared_page_cache: Arc<RwLock<DumbLruPageCache>>,
|
||||
maybe_shared_wal: RwLock<Option<Arc<UnsafeCell<WalFileShared>>>>,
|
||||
is_empty: Arc<AtomicBool>,
|
||||
is_empty: Arc<AtomicUsize>,
|
||||
init_lock: Arc<Mutex<()>>,
|
||||
|
||||
open_flags: OpenFlags,
|
||||
}
|
||||
|
||||
@@ -164,7 +168,11 @@ impl Database {
|
||||
unsafe { &*wal.get() }.max_frame.load(Ordering::SeqCst) > 0
|
||||
});
|
||||
|
||||
let is_empty = db_size == 0 && !wal_has_frames;
|
||||
let is_empty = if db_size == 0 && !wal_has_frames {
|
||||
DB_STATE_EMPTY
|
||||
} else {
|
||||
DB_STATE_INITIALIZED
|
||||
};
|
||||
|
||||
let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::default()));
|
||||
let schema = Arc::new(RwLock::new(Schema::new()));
|
||||
@@ -177,12 +185,13 @@ impl Database {
|
||||
db_file,
|
||||
io: io.clone(),
|
||||
open_flags: flags,
|
||||
is_empty: Arc::new(AtomicBool::new(is_empty)),
|
||||
is_empty: Arc::new(AtomicUsize::new(is_empty)),
|
||||
init_lock: Arc::new(Mutex::new(())),
|
||||
};
|
||||
let db = Arc::new(db);
|
||||
|
||||
// Check: https://github.com/tursodatabase/limbo/pull/1761#discussion_r2154013123
|
||||
if !is_empty {
|
||||
if is_empty == 2 {
|
||||
// parse schema
|
||||
let conn = db.connect()?;
|
||||
let rows = conn.query("SELECT * FROM sqlite_schema")?;
|
||||
@@ -220,6 +229,7 @@ impl Database {
|
||||
Arc::new(RwLock::new(DumbLruPageCache::default())),
|
||||
buffer_pool,
|
||||
is_empty,
|
||||
self.init_lock.clone(),
|
||||
)?);
|
||||
|
||||
let page_size = header_accessor::get_page_size(&pager)
|
||||
@@ -259,6 +269,7 @@ impl Database {
|
||||
Arc::new(RwLock::new(DumbLruPageCache::default())),
|
||||
buffer_pool.clone(),
|
||||
is_empty,
|
||||
Arc::new(Mutex::new(())),
|
||||
)?;
|
||||
let page_size = header_accessor::get_page_size(&pager)
|
||||
.unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE) as u32;
|
||||
|
||||
@@ -6531,7 +6531,10 @@ mod tests {
|
||||
ops::Deref,
|
||||
panic,
|
||||
rc::Rc,
|
||||
sync::{atomic::AtomicBool, Arc},
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicUsize},
|
||||
Arc, Mutex,
|
||||
},
|
||||
};
|
||||
|
||||
use tempfile::TempDir;
|
||||
@@ -6883,13 +6886,14 @@ mod tests {
|
||||
io,
|
||||
page_cache,
|
||||
buffer_pool,
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
Arc::new(AtomicUsize::new(0)),
|
||||
Arc::new(Mutex::new(())),
|
||||
)
|
||||
.unwrap()
|
||||
};
|
||||
let pager = Rc::new(pager);
|
||||
// FIXME: handle page cache is full
|
||||
pager.allocate_page1().unwrap();
|
||||
let _ = run_until_done(|| pager.allocate_page1(), &pager);
|
||||
let page2 = pager.allocate_page().unwrap();
|
||||
let page2 = Arc::new(BTreePageInner {
|
||||
page: RefCell::new(page2),
|
||||
@@ -7016,8 +7020,8 @@ mod tests {
|
||||
tracing::info!("seed: {}", seed);
|
||||
for insert_id in 0..inserts {
|
||||
let do_validate = do_validate_btree || (insert_id % VALIDATE_INTERVAL == 0);
|
||||
pager.begin_read_tx().unwrap();
|
||||
pager.begin_write_tx().unwrap();
|
||||
run_until_done(|| pager.begin_read_tx(), &pager).unwrap();
|
||||
run_until_done(|| pager.begin_write_tx(), &pager).unwrap();
|
||||
let size = size(&mut rng);
|
||||
let key = {
|
||||
let result;
|
||||
@@ -7068,7 +7072,7 @@ mod tests {
|
||||
}
|
||||
}
|
||||
}
|
||||
pager.begin_read_tx().unwrap();
|
||||
run_until_done(|| pager.begin_read_tx(), &pager).unwrap();
|
||||
// FIXME: add sorted vector instead, should be okay for small amounts of keys for now :P, too lazy to fix right now
|
||||
cursor.move_to_root();
|
||||
let mut valid = true;
|
||||
@@ -7098,7 +7102,7 @@ mod tests {
|
||||
}
|
||||
pager.end_read_tx().unwrap();
|
||||
}
|
||||
pager.begin_read_tx().unwrap();
|
||||
run_until_done(|| pager.begin_read_tx(), &pager).unwrap();
|
||||
tracing::info!(
|
||||
"=========== btree ===========\n{}\n\n",
|
||||
format_btree(pager.clone(), root_page, 0)
|
||||
@@ -7408,15 +7412,15 @@ mod tests {
|
||||
io,
|
||||
Arc::new(parking_lot::RwLock::new(DumbLruPageCache::new(10))),
|
||||
buffer_pool,
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
Arc::new(AtomicUsize::new(0)),
|
||||
Arc::new(Mutex::new(())),
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
pager.io.run_once().unwrap();
|
||||
|
||||
pager.allocate_page1().unwrap();
|
||||
|
||||
let _ = run_until_done(|| pager.allocate_page1(), &pager);
|
||||
for _ in 0..(database_size - 1) {
|
||||
pager.allocate_page().unwrap();
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ const HEADER_OFFSET_VERSION_NUMBER: usize = 96;
|
||||
|
||||
// Helper to get a read-only reference to the header page.
|
||||
fn get_header_page(pager: &Pager) -> Result<PageRef> {
|
||||
if pager.is_empty.load(Ordering::SeqCst) {
|
||||
if pager.is_empty.load(Ordering::SeqCst) < 2 {
|
||||
return Err(LimboError::InternalError(
|
||||
"Database is empty, header does not exist - page 1 should've been allocated before this".to_string(),
|
||||
));
|
||||
@@ -49,7 +49,7 @@ fn get_header_page(pager: &Pager) -> Result<PageRef> {
|
||||
|
||||
// Helper to get a writable reference to the header page and mark it dirty.
|
||||
fn get_header_page_for_write(pager: &Pager) -> Result<PageRef> {
|
||||
if pager.is_empty.load(Ordering::SeqCst) {
|
||||
if pager.is_empty.load(Ordering::SeqCst) < 2 {
|
||||
// This should not be called on an empty DB for writing, as page 1 is allocated on first transaction.
|
||||
return Err(LimboError::InternalError(
|
||||
"Cannot write to header of an empty database - page 1 should've been allocated before this".to_string(),
|
||||
@@ -88,7 +88,7 @@ macro_rules! impl_header_field_accessor {
|
||||
paste::paste! {
|
||||
#[allow(dead_code)]
|
||||
pub fn [<get_ $field_name>](pager: &Pager) -> Result<$type> {
|
||||
if pager.is_empty.load(Ordering::SeqCst) {
|
||||
if pager.is_empty.load(Ordering::SeqCst) < 2 {
|
||||
return Err(LimboError::InternalError(format!("Database is empty, header does not exist - page 1 should've been allocated before this")));
|
||||
}
|
||||
let page = get_header_page(pager)?;
|
||||
|
||||
@@ -12,13 +12,13 @@ use parking_lot::RwLock;
|
||||
use std::cell::{RefCell, UnsafeCell};
|
||||
use std::collections::HashSet;
|
||||
use std::rc::Rc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tracing::{trace, Level};
|
||||
|
||||
use super::btree::{btree_init_page, BTreePage};
|
||||
use super::page_cache::{CacheError, CacheResizeResult, DumbLruPageCache, PageCacheKey};
|
||||
use super::sqlite3_ondisk::DATABASE_HEADER_SIZE;
|
||||
use super::sqlite3_ondisk::{begin_write_btree_page, DATABASE_HEADER_SIZE};
|
||||
use super::wal::{CheckpointMode, CheckpointStatus};
|
||||
|
||||
#[cfg(not(feature = "omit_autovacuum"))]
|
||||
@@ -191,6 +191,9 @@ pub enum AutoVacuumMode {
|
||||
Incremental,
|
||||
}
|
||||
|
||||
pub const DB_STATE_EMPTY: usize = 0;
|
||||
pub const DB_STATE_INITIALIZING: usize = 1;
|
||||
pub const DB_STATE_INITIALIZED: usize = 2;
|
||||
/// The pager interface implements the persistence layer by providing access
|
||||
/// to pages of the database file, including caching, concurrency control, and
|
||||
/// transaction management.
|
||||
@@ -212,10 +215,13 @@ pub struct Pager {
|
||||
checkpoint_inflight: Rc<RefCell<usize>>,
|
||||
syncing: Rc<RefCell<bool>>,
|
||||
auto_vacuum_mode: RefCell<AutoVacuumMode>,
|
||||
/// Is the db empty? This is signified by 0-sized database and nonexistent WAL.
|
||||
pub is_empty: Arc<AtomicBool>,
|
||||
/// 0 -> Database is empty,
|
||||
/// 1 -> Database is being initialized,
|
||||
/// 2 -> Database is initialized and ready for use.
|
||||
pub is_empty: Arc<AtomicUsize>,
|
||||
/// Mutex for synchronizing database initialization to prevent race conditions
|
||||
init_lock: Mutex<()>,
|
||||
init_lock: Arc<Mutex<()>>,
|
||||
allocate_page1_state: RefCell<AllocatePage1State>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
@@ -237,6 +243,16 @@ pub enum PagerCacheflushResult {
|
||||
Rollback,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
enum AllocatePage1State {
|
||||
Start,
|
||||
Writing {
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
page: BTreePage,
|
||||
},
|
||||
Done,
|
||||
}
|
||||
|
||||
impl Pager {
|
||||
pub fn new(
|
||||
db_file: Arc<dyn DatabaseStorage>,
|
||||
@@ -244,8 +260,14 @@ impl Pager {
|
||||
io: Arc<dyn crate::io::IO>,
|
||||
page_cache: Arc<RwLock<DumbLruPageCache>>,
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
is_empty: Arc<AtomicBool>,
|
||||
is_empty: Arc<AtomicUsize>,
|
||||
init_lock: Arc<Mutex<()>>,
|
||||
) -> Result<Self> {
|
||||
let allocate_page1_state = if is_empty.load(Ordering::SeqCst) < DB_STATE_INITIALIZED {
|
||||
RefCell::new(AllocatePage1State::Start)
|
||||
} else {
|
||||
RefCell::new(AllocatePage1State::Done)
|
||||
};
|
||||
Ok(Self {
|
||||
db_file,
|
||||
wal,
|
||||
@@ -262,7 +284,8 @@ impl Pager {
|
||||
buffer_pool,
|
||||
auto_vacuum_mode: RefCell::new(AutoVacuumMode::None),
|
||||
is_empty,
|
||||
init_lock: Mutex::new(()),
|
||||
init_lock,
|
||||
allocate_page1_state,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -553,28 +576,46 @@ impl Pager {
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn begin_read_tx(&self) -> Result<LimboResult> {
|
||||
pub fn begin_read_tx(&self) -> Result<CursorResult<LimboResult>> {
|
||||
// We allocate the first page lazily in the first transaction
|
||||
if self.is_empty.load(Ordering::SeqCst) {
|
||||
let _lock = self.init_lock.lock().unwrap();
|
||||
if self.is_empty.load(Ordering::SeqCst) {
|
||||
self.allocate_page1()?;
|
||||
}
|
||||
match self.maybe_allocate_page1()? {
|
||||
CursorResult::Ok(_) => {}
|
||||
CursorResult::IO => return Ok(CursorResult::IO),
|
||||
}
|
||||
Ok(CursorResult::Ok(self.wal.borrow_mut().begin_read_tx()?))
|
||||
}
|
||||
|
||||
fn maybe_allocate_page1(&self) -> Result<CursorResult<()>> {
|
||||
if self.is_empty.load(Ordering::SeqCst) < DB_STATE_INITIALIZED {
|
||||
if let Ok(_lock) = self.init_lock.try_lock() {
|
||||
match (
|
||||
self.is_empty.load(Ordering::SeqCst),
|
||||
self.allocating_page1(),
|
||||
) {
|
||||
// In case of being empty or (allocating and this connection is performing allocation) then allocate the first page
|
||||
(0, false) | (1, true) => match self.allocate_page1()? {
|
||||
CursorResult::Ok(_) => Ok(CursorResult::Ok(())),
|
||||
CursorResult::IO => Ok(CursorResult::IO),
|
||||
},
|
||||
_ => Ok(CursorResult::IO),
|
||||
}
|
||||
} else {
|
||||
Ok(CursorResult::IO)
|
||||
}
|
||||
} else {
|
||||
Ok(CursorResult::Ok(()))
|
||||
}
|
||||
self.wal.borrow_mut().begin_read_tx()
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn begin_write_tx(&self) -> Result<LimboResult> {
|
||||
pub fn begin_write_tx(&self) -> Result<CursorResult<LimboResult>> {
|
||||
// TODO(Diego): The only possibly allocate page1 here is because OpenEphemeral needs a write transaction
|
||||
// we should have a unique API to begin transactions, something like sqlite3BtreeBeginTrans
|
||||
if self.is_empty.load(Ordering::SeqCst) {
|
||||
let _lock = self.init_lock.lock().unwrap();
|
||||
if self.is_empty.load(Ordering::SeqCst) {
|
||||
self.allocate_page1()?;
|
||||
}
|
||||
match self.maybe_allocate_page1()? {
|
||||
CursorResult::Ok(_) => {}
|
||||
CursorResult::IO => return Ok(CursorResult::IO),
|
||||
}
|
||||
self.wal.borrow_mut().begin_write_tx()
|
||||
Ok(CursorResult::Ok(self.wal.borrow_mut().begin_write_tx()?))
|
||||
}
|
||||
|
||||
pub fn end_tx(&self, rollback: bool) -> Result<PagerCacheflushStatus> {
|
||||
@@ -957,37 +998,73 @@ impl Pager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn allocate_page1(&self) -> Result<PageRef> {
|
||||
let mut default_header = DatabaseHeader::default();
|
||||
default_header.database_size += 1;
|
||||
self.is_empty.store(false, Ordering::SeqCst);
|
||||
let page = allocate_page(1, &self.buffer_pool, 0);
|
||||
pub fn allocate_page1(&self) -> Result<CursorResult<PageRef>> {
|
||||
let state = self.allocate_page1_state.borrow().clone();
|
||||
match state {
|
||||
AllocatePage1State::Start => {
|
||||
tracing::trace!("allocate_page1(Start)");
|
||||
self.is_empty.store(DB_STATE_INITIALIZING, Ordering::SeqCst);
|
||||
let default_header = DatabaseHeader::default();
|
||||
let page = allocate_page(1, &self.buffer_pool, 0);
|
||||
|
||||
let page1 = Arc::new(BTreePageInner {
|
||||
page: RefCell::new(page),
|
||||
});
|
||||
// Create the sqlite_schema table, for this we just need to create the btree page
|
||||
// for the first page of the database which is basically like any other btree page
|
||||
// but with a 100 byte offset, so we just init the page so that sqlite understands
|
||||
// this is a correct page.
|
||||
btree_init_page(
|
||||
&page1,
|
||||
PageType::TableLeaf,
|
||||
DATABASE_HEADER_SIZE,
|
||||
(default_header.get_page_size() - default_header.reserved_space as u32) as u16,
|
||||
);
|
||||
let page1 = Arc::new(BTreePageInner {
|
||||
page: RefCell::new(page),
|
||||
});
|
||||
// Create the sqlite_schema table, for this we just need to create the btree page
|
||||
// for the first page of the database which is basically like any other btree page
|
||||
// but with a 100 byte offset, so we just init the page so that sqlite understands
|
||||
// this is a correct page.
|
||||
btree_init_page(
|
||||
&page1,
|
||||
PageType::TableLeaf,
|
||||
DATABASE_HEADER_SIZE,
|
||||
(default_header.get_page_size() - default_header.reserved_space as u32) as u16,
|
||||
);
|
||||
let write_counter = Rc::new(RefCell::new(0));
|
||||
begin_write_btree_page(self, &page1.get(), write_counter.clone())?;
|
||||
|
||||
let page1_ref = page1.get();
|
||||
let contents = page1_ref.get().contents.as_mut().unwrap();
|
||||
contents.write_database_header(&default_header);
|
||||
page1_ref.set_dirty();
|
||||
self.add_dirty(page1_ref.get().id);
|
||||
let page_key = PageCacheKey::new(page1_ref.get().id);
|
||||
let mut cache = self.page_cache.write();
|
||||
cache.insert(page_key, page1_ref.clone()).map_err(|e| {
|
||||
LimboError::InternalError(format!("Failed to insert page 1 into cache: {:?}", e))
|
||||
})?;
|
||||
Ok(page1_ref.clone())
|
||||
self.allocate_page1_state
|
||||
.replace(AllocatePage1State::Writing {
|
||||
write_counter,
|
||||
page: page1,
|
||||
});
|
||||
Ok(CursorResult::IO)
|
||||
}
|
||||
AllocatePage1State::Writing {
|
||||
write_counter,
|
||||
page,
|
||||
} => {
|
||||
tracing::trace!("allocate_page1(Writing)");
|
||||
if *write_counter.borrow() > 0 {
|
||||
return Ok(CursorResult::IO);
|
||||
}
|
||||
tracing::trace!("allocate_page1(Writing done)");
|
||||
let mut default_header = DatabaseHeader::default();
|
||||
default_header.database_size += 1;
|
||||
let page1_ref = page.get();
|
||||
let contents = page1_ref.get().contents.as_mut().unwrap();
|
||||
contents.write_database_header(&default_header);
|
||||
let page_key = PageCacheKey::new(page1_ref.get().id);
|
||||
let mut cache = self.page_cache.write();
|
||||
cache.insert(page_key, page1_ref.clone()).map_err(|e| {
|
||||
LimboError::InternalError(format!(
|
||||
"Failed to insert page 1 into cache: {:?}",
|
||||
e
|
||||
))
|
||||
})?;
|
||||
self.is_empty.store(DB_STATE_INITIALIZED, Ordering::SeqCst);
|
||||
self.allocate_page1_state.replace(AllocatePage1State::Done);
|
||||
Ok(CursorResult::Ok(page1_ref.clone()))
|
||||
}
|
||||
AllocatePage1State::Done => unreachable!("cannot try to allocate page 1 again"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn allocating_page1(&self) -> bool {
|
||||
matches!(
|
||||
*self.allocate_page1_state.borrow(),
|
||||
AllocatePage1State::Writing { .. }
|
||||
)
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1356,6 +1433,19 @@ mod ptrmap_tests {
|
||||
use crate::storage::sqlite3_ondisk::MIN_PAGE_SIZE;
|
||||
use crate::storage::wal::{WalFile, WalFileShared};
|
||||
|
||||
pub fn run_until_done<T>(
|
||||
mut action: impl FnMut() -> Result<CursorResult<T>>,
|
||||
pager: &Pager,
|
||||
) -> Result<T> {
|
||||
loop {
|
||||
match action()? {
|
||||
CursorResult::Ok(res) => {
|
||||
return Ok(res);
|
||||
}
|
||||
CursorResult::IO => pager.io.run_once().unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
// Helper to create a Pager for testing
|
||||
fn test_pager_setup(page_size: u32, initial_db_pages: u32) -> Pager {
|
||||
let io: Arc<dyn IO> = Arc::new(MemoryIO::new());
|
||||
@@ -1387,10 +1477,11 @@ mod ptrmap_tests {
|
||||
io,
|
||||
page_cache,
|
||||
buffer_pool,
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
Arc::new(AtomicUsize::new(0)),
|
||||
Arc::new(Mutex::new(())),
|
||||
)
|
||||
.unwrap();
|
||||
pager.allocate_page1().unwrap();
|
||||
run_until_done(|| pager.allocate_page1(), &pager).unwrap();
|
||||
header_accessor::set_vacuum_mode_largest_root_page(&pager, 1).unwrap();
|
||||
pager.set_auto_vacuum_mode(AutoVacuumMode::Full);
|
||||
|
||||
|
||||
@@ -25,7 +25,8 @@ use crate::{
|
||||
},
|
||||
types::compare_immutable,
|
||||
};
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::Mutex;
|
||||
use std::{borrow::BorrowMut, rc::Rc, sync::Arc};
|
||||
|
||||
use crate::{pseudo::PseudoCursor, result::LimboResult};
|
||||
@@ -1708,13 +1709,13 @@ pub fn op_transaction(
|
||||
};
|
||||
|
||||
if updated && matches!(current_state, TransactionState::None) {
|
||||
if let LimboResult::Busy = pager.begin_read_tx()? {
|
||||
if let LimboResult::Busy = return_if_io!(pager.begin_read_tx()) {
|
||||
return Ok(InsnFunctionStepResult::Busy);
|
||||
}
|
||||
}
|
||||
|
||||
if updated && matches!(new_transaction_state, TransactionState::Write) {
|
||||
if let LimboResult::Busy = pager.begin_write_tx()? {
|
||||
if let LimboResult::Busy = return_if_io!(pager.begin_write_tx()) {
|
||||
pager.end_read_tx()?;
|
||||
tracing::trace!("begin_write_tx busy");
|
||||
return Ok(InsnFunctionStepResult::Busy);
|
||||
@@ -5188,6 +5189,11 @@ pub fn op_noop(
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
pub enum OpOpenEphemeralState {
|
||||
Start,
|
||||
StartingTxn { pager: Rc<Pager> },
|
||||
CreateBtree { pager: Rc<Pager> },
|
||||
}
|
||||
pub fn op_open_ephemeral(
|
||||
program: &Program,
|
||||
state: &mut ProgramState,
|
||||
@@ -5203,97 +5209,114 @@ pub fn op_open_ephemeral(
|
||||
Insn::OpenAutoindex { cursor_id } => (*cursor_id, false),
|
||||
_ => unreachable!("unexpected Insn {:?}", insn),
|
||||
};
|
||||
match &state.op_open_ephemeral_state {
|
||||
OpOpenEphemeralState::Start => {
|
||||
tracing::trace!("Start");
|
||||
let conn = program.connection.clone();
|
||||
let io = conn.pager.io.get_memory_io();
|
||||
|
||||
let conn = program.connection.clone();
|
||||
let io = conn.pager.io.get_memory_io();
|
||||
let file = io.open_file("", OpenFlags::Create, true)?;
|
||||
let db_file = Arc::new(FileMemoryStorage::new(file));
|
||||
|
||||
let file = io.open_file("", OpenFlags::Create, true)?;
|
||||
let db_file = Arc::new(FileMemoryStorage::new(file));
|
||||
let buffer_pool = Rc::new(BufferPool::new(None));
|
||||
let page_cache = Arc::new(RwLock::new(DumbLruPageCache::default()));
|
||||
|
||||
let buffer_pool = Rc::new(BufferPool::new(None));
|
||||
let page_cache = Arc::new(RwLock::new(DumbLruPageCache::default()));
|
||||
let pager = Rc::new(Pager::new(
|
||||
db_file,
|
||||
Rc::new(RefCell::new(DummyWAL)),
|
||||
io,
|
||||
page_cache,
|
||||
buffer_pool.clone(),
|
||||
Arc::new(AtomicUsize::new(0)),
|
||||
Arc::new(Mutex::new(())),
|
||||
)?);
|
||||
|
||||
let pager = Rc::new(Pager::new(
|
||||
db_file,
|
||||
Rc::new(RefCell::new(DummyWAL)),
|
||||
io,
|
||||
page_cache,
|
||||
buffer_pool.clone(),
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
)?);
|
||||
let page_size = header_accessor::get_page_size(&pager)
|
||||
.unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE)
|
||||
as usize;
|
||||
buffer_pool.set_page_size(page_size);
|
||||
|
||||
let page_size = header_accessor::get_page_size(&pager)
|
||||
.unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE) as usize;
|
||||
buffer_pool.set_page_size(page_size);
|
||||
|
||||
let flag = if is_table {
|
||||
&CreateBTreeFlags::new_table()
|
||||
} else {
|
||||
&CreateBTreeFlags::new_index()
|
||||
};
|
||||
|
||||
pager.begin_write_tx()?;
|
||||
|
||||
// FIXME: handle page cache is full
|
||||
let root_page = return_if_io!(pager.btree_create(flag));
|
||||
|
||||
let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap();
|
||||
let mv_cursor = match state.mv_tx_id {
|
||||
Some(tx_id) => {
|
||||
let table_id = root_page as u64;
|
||||
let mv_store = mv_store.unwrap().clone();
|
||||
let mv_cursor = Rc::new(RefCell::new(
|
||||
MvCursor::new(mv_store.clone(), tx_id, table_id).unwrap(),
|
||||
));
|
||||
Some(mv_cursor)
|
||||
state.op_open_ephemeral_state = OpOpenEphemeralState::StartingTxn { pager };
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
let mut cursor = if let CursorType::BTreeIndex(index) = cursor_type {
|
||||
BTreeCursor::new_index(
|
||||
mv_cursor,
|
||||
pager,
|
||||
root_page as usize,
|
||||
index,
|
||||
index
|
||||
.columns
|
||||
.iter()
|
||||
.map(|c| c.collation.unwrap_or_default())
|
||||
.collect(),
|
||||
)
|
||||
} else {
|
||||
BTreeCursor::new_table(mv_cursor, pager, root_page as usize)
|
||||
};
|
||||
cursor.rewind()?; // Will never return io
|
||||
OpOpenEphemeralState::StartingTxn { pager } => {
|
||||
tracing::trace!("StartingTxn");
|
||||
return_if_io!(pager.begin_write_tx());
|
||||
state.op_open_ephemeral_state = OpOpenEphemeralState::CreateBtree {
|
||||
pager: pager.clone(),
|
||||
};
|
||||
}
|
||||
OpOpenEphemeralState::CreateBtree { pager } => {
|
||||
tracing::trace!("CreateBtree");
|
||||
// FIXME: handle page cache is full
|
||||
let flag = if is_table {
|
||||
&CreateBTreeFlags::new_table()
|
||||
} else {
|
||||
&CreateBTreeFlags::new_index()
|
||||
};
|
||||
let root_page = return_if_io!(pager.btree_create(flag));
|
||||
|
||||
let mut cursors: std::cell::RefMut<'_, Vec<Option<Cursor>>> = state.cursors.borrow_mut();
|
||||
let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap();
|
||||
let mv_cursor = match state.mv_tx_id {
|
||||
Some(tx_id) => {
|
||||
let table_id = root_page as u64;
|
||||
let mv_store = mv_store.unwrap().clone();
|
||||
let mv_cursor = Rc::new(RefCell::new(
|
||||
MvCursor::new(mv_store.clone(), tx_id, table_id).unwrap(),
|
||||
));
|
||||
Some(mv_cursor)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
let mut cursor = if let CursorType::BTreeIndex(index) = cursor_type {
|
||||
BTreeCursor::new_index(
|
||||
mv_cursor,
|
||||
pager.clone(),
|
||||
root_page as usize,
|
||||
index,
|
||||
index
|
||||
.columns
|
||||
.iter()
|
||||
.map(|c| c.collation.unwrap_or_default())
|
||||
.collect(),
|
||||
)
|
||||
} else {
|
||||
BTreeCursor::new_table(mv_cursor, pager.clone(), root_page as usize)
|
||||
};
|
||||
cursor.rewind()?; // Will never return io
|
||||
|
||||
// Table content is erased if the cursor already exists
|
||||
match cursor_type {
|
||||
CursorType::BTreeTable(_) => {
|
||||
cursors
|
||||
.get_mut(cursor_id)
|
||||
.unwrap()
|
||||
.replace(Cursor::new_btree(cursor));
|
||||
}
|
||||
CursorType::BTreeIndex(_) => {
|
||||
cursors
|
||||
.get_mut(cursor_id)
|
||||
.unwrap()
|
||||
.replace(Cursor::new_btree(cursor));
|
||||
}
|
||||
CursorType::Pseudo(_) => {
|
||||
panic!("OpenEphemeral on pseudo cursor");
|
||||
}
|
||||
CursorType::Sorter => {
|
||||
panic!("OpenEphemeral on sorter cursor");
|
||||
}
|
||||
CursorType::VirtualTable(_) => {
|
||||
panic!("OpenEphemeral on virtual table cursor, use Insn::VOpen instead");
|
||||
let mut cursors: std::cell::RefMut<'_, Vec<Option<Cursor>>> =
|
||||
state.cursors.borrow_mut();
|
||||
|
||||
// Table content is erased if the cursor already exists
|
||||
match cursor_type {
|
||||
CursorType::BTreeTable(_) => {
|
||||
cursors
|
||||
.get_mut(cursor_id)
|
||||
.unwrap()
|
||||
.replace(Cursor::new_btree(cursor));
|
||||
}
|
||||
CursorType::BTreeIndex(_) => {
|
||||
cursors
|
||||
.get_mut(cursor_id)
|
||||
.unwrap()
|
||||
.replace(Cursor::new_btree(cursor));
|
||||
}
|
||||
CursorType::Pseudo(_) => {
|
||||
panic!("OpenEphemeral on pseudo cursor");
|
||||
}
|
||||
CursorType::Sorter => {
|
||||
panic!("OpenEphemeral on sorter cursor");
|
||||
}
|
||||
CursorType::VirtualTable(_) => {
|
||||
panic!("OpenEphemeral on virtual table cursor, use Insn::VOpen instead");
|
||||
}
|
||||
}
|
||||
|
||||
state.pc += 1;
|
||||
state.op_open_ephemeral_state = OpOpenEphemeralState::Start;
|
||||
}
|
||||
}
|
||||
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
|
||||
@@ -42,7 +42,10 @@ use crate::{
|
||||
use crate::json::JsonCacheCell;
|
||||
use crate::{Connection, MvStore, Result, TransactionState};
|
||||
use builder::CursorKey;
|
||||
use execute::{InsnFunction, InsnFunctionStepResult, OpIdxDeleteState, OpIntegrityCheckState};
|
||||
use execute::{
|
||||
InsnFunction, InsnFunctionStepResult, OpIdxDeleteState, OpIntegrityCheckState,
|
||||
OpOpenEphemeralState,
|
||||
};
|
||||
|
||||
use rand::Rng;
|
||||
use regex::Regex;
|
||||
@@ -246,6 +249,7 @@ pub struct ProgramState {
|
||||
json_cache: JsonCacheCell,
|
||||
op_idx_delete_state: Option<OpIdxDeleteState>,
|
||||
op_integrity_check_state: OpIntegrityCheckState,
|
||||
op_open_ephemeral_state: OpOpenEphemeralState,
|
||||
}
|
||||
|
||||
impl ProgramState {
|
||||
@@ -271,6 +275,7 @@ impl ProgramState {
|
||||
json_cache: JsonCacheCell::new(),
|
||||
op_idx_delete_state: None,
|
||||
op_integrity_check_state: OpIntegrityCheckState::Start,
|
||||
op_open_ephemeral_state: OpOpenEphemeralState::Start,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user