diff --git a/core/lib.rs b/core/lib.rs index 130503ede..645a71e53 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -77,7 +77,7 @@ use std::{ use storage::database::DatabaseFile; use storage::page_cache::DumbLruPageCache; pub use storage::pager::PagerCacheflushStatus; -use storage::pager::{DB_STATE_INITIALIZED, DB_STATE_UNITIALIZED}; +use storage::pager::{DB_STATE_INITIALIZED, DB_STATE_UNINITIALIZED}; pub use storage::{ buffer_pool::BufferPool, database::DatabaseStorage, @@ -117,7 +117,7 @@ pub struct Database { // create DB connections. _shared_page_cache: Arc>, maybe_shared_wal: RwLock>>>, - is_empty: Arc, + db_state: Arc, init_lock: Arc>, open_flags: OpenFlags, } @@ -192,8 +192,9 @@ impl Database { .as_ref() .is_some_and(|wal| unsafe { &*wal.get() }.max_frame.load(Ordering::SeqCst) > 0); - let is_empty = if db_size == 0 && !wal_has_frames { - DB_STATE_UNITIALIZED + // No pages in DB file or WAL -> empty database + let db_state = if db_size == 0 && !wal_has_frames { + DB_STATE_UNINITIALIZED } else { DB_STATE_INITIALIZED }; @@ -209,13 +210,13 @@ impl Database { db_file, io: io.clone(), open_flags: flags, - is_empty: Arc::new(AtomicUsize::new(is_empty)), + db_state: Arc::new(AtomicUsize::new(db_state)), init_lock: Arc::new(Mutex::new(())), }; let db = Arc::new(db); // Check: https://github.com/tursodatabase/turso/pull/1761#discussion_r2154013123 - if is_empty == 2 { + if db_state == DB_STATE_INITIALIZED { // parse schema let conn = db.connect()?; let schema_version = get_schema_version(&conn)?; @@ -239,90 +240,16 @@ impl Database { } pub fn connect(self: &Arc) -> Result> { - let buffer_pool = Arc::new(BufferPool::new(None)); + let pager = self.init_pager(None)?; - // Open existing WAL file if present - if let Some(shared_wal) = self.maybe_shared_wal.read().clone() { - // No pages in DB file or WAL -> empty database - let is_empty = self.is_empty.clone(); - let wal = Rc::new(RefCell::new(WalFile::new( - self.io.clone(), - shared_wal, - buffer_pool.clone(), - ))); - let pager = Rc::new(Pager::new( - self.db_file.clone(), - wal, - self.io.clone(), - Arc::new(RwLock::new(DumbLruPageCache::default())), - buffer_pool, - is_empty, - self.init_lock.clone(), - )?); - - let page_size = header_accessor::get_page_size(&pager) - .unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE) - as u32; - let default_cache_size = header_accessor::get_default_page_cache_size(&pager) - .unwrap_or(storage::sqlite3_ondisk::DEFAULT_CACHE_SIZE); - pager.buffer_pool.set_page_size(page_size as usize); - let conn = Arc::new(Connection { - _db: self.clone(), - pager: pager.clone(), - schema: RefCell::new(self.schema.read().clone()), - last_insert_rowid: Cell::new(0), - auto_commit: Cell::new(true), - mv_transactions: RefCell::new(Vec::new()), - transaction_state: Cell::new(TransactionState::None), - last_change: Cell::new(0), - syms: RefCell::new(SymbolTable::new()), - total_changes: Cell::new(0), - _shared_cache: false, - cache_size: Cell::new(default_cache_size), - readonly: Cell::new(false), - wal_checkpoint_disabled: Cell::new(false), - capture_data_changes: RefCell::new(CaptureDataChangesMode::Off), - closed: Cell::new(false), - }); - if let Err(e) = conn.register_builtins() { - return Err(LimboError::ExtensionError(e)); - } - return Ok(conn); - }; - - // No existing WAL; create one. - // TODO: currently Pager needs to be instantiated with some implementation of trait Wal, so here's a workaround. - let dummy_wal = Rc::new(RefCell::new(DummyWAL {})); - let is_empty = self.is_empty.clone(); - let mut pager = Pager::new( - self.db_file.clone(), - dummy_wal, - self.io.clone(), - Arc::new(RwLock::new(DumbLruPageCache::default())), - buffer_pool.clone(), - is_empty, - Arc::new(Mutex::new(())), - )?; let page_size = header_accessor::get_page_size(&pager) .unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE) as u32; let default_cache_size = header_accessor::get_default_page_cache_size(&pager) .unwrap_or(storage::sqlite3_ondisk::DEFAULT_CACHE_SIZE); - let wal_path = format!("{}-wal", self.path); - let file = self.io.open_file(&wal_path, OpenFlags::Create, false)?; - let real_shared_wal = WalFileShared::new_shared(page_size, &self.io, file)?; - // Modify Database::maybe_shared_wal to point to the new WAL file so that other connections - // can open the existing WAL. - *self.maybe_shared_wal.write() = Some(real_shared_wal.clone()); - let wal = Rc::new(RefCell::new(WalFile::new( - self.io.clone(), - real_shared_wal, - buffer_pool, - ))); - pager.set_wal(wal); let conn = Arc::new(Connection { _db: self.clone(), - pager: Rc::new(pager), + pager: RefCell::new(Rc::new(pager)), schema: RefCell::new(self.schema.read().clone()), auto_commit: Cell::new(true), mv_transactions: RefCell::new(Vec::new()), @@ -333,6 +260,7 @@ impl Database { syms: RefCell::new(SymbolTable::new()), _shared_cache: false, cache_size: Cell::new(default_cache_size), + page_size: Cell::new(page_size), readonly: Cell::new(false), wal_checkpoint_disabled: Cell::new(false), capture_data_changes: RefCell::new(CaptureDataChangesMode::Off), @@ -345,6 +273,78 @@ impl Database { Ok(conn) } + fn init_pager(&self, page_size: Option) -> Result { + // Open existing WAL file if present + if let Some(shared_wal) = self.maybe_shared_wal.read().clone() { + let size = match page_size { + None => unsafe { (*shared_wal.get()).page_size() as usize }, + Some(size) => { + unsafe { (*shared_wal.get()).set_page_size(size as u32) }; + size + } + }; + let buffer_pool = Arc::new(BufferPool::new(Some(size))); + + let db_state = self.db_state.clone(); + let wal = Rc::new(RefCell::new(WalFile::new( + self.io.clone(), + shared_wal, + buffer_pool.clone(), + ))); + let pager = Pager::new( + self.db_file.clone(), + wal, + self.io.clone(), + Arc::new(RwLock::new(DumbLruPageCache::default())), + buffer_pool.clone(), + db_state, + self.init_lock.clone(), + )?; + return Ok(pager); + } + + let buffer_pool = Arc::new(BufferPool::new(page_size)); + // No existing WAL; create one. + // TODO: currently Pager needs to be instantiated with some implementation of trait Wal, so here's a workaround. + let dummy_wal = Rc::new(RefCell::new(DummyWAL {})); + let db_state = self.db_state.clone(); + let mut pager = Pager::new( + self.db_file.clone(), + dummy_wal, + self.io.clone(), + Arc::new(RwLock::new(DumbLruPageCache::default())), + buffer_pool.clone(), + db_state, + Arc::new(Mutex::new(())), + )?; + + let size = match page_size { + Some(size) => size as u32, + None => { + let size = header_accessor::get_page_size(&pager) + .unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE) + as u32; + buffer_pool.set_page_size(size as usize); + size + } + }; + + let wal_path = format!("{}-wal", self.path); + let file = self.io.open_file(&wal_path, OpenFlags::Create, false)?; + let real_shared_wal = WalFileShared::new_shared(size, &self.io, file)?; + // Modify Database::maybe_shared_wal to point to the new WAL file so that other connections + // can open the existing WAL. + *self.maybe_shared_wal.write() = Some(real_shared_wal.clone()); + let wal = Rc::new(RefCell::new(WalFile::new( + self.io.clone(), + real_shared_wal, + buffer_pool, + ))); + pager.set_wal(wal); + + Ok(pager) + } + /// Open a new database file with optionally specifying a VFS without an existing database /// connection and symbol table to register extensions. #[cfg(feature = "fs")] @@ -499,7 +499,7 @@ impl CaptureDataChangesMode { pub struct Connection { _db: Arc, - pager: Rc, + pager: RefCell>, schema: RefCell, /// Whether to automatically commit transaction auto_commit: Cell, @@ -511,6 +511,7 @@ pub struct Connection { syms: RefCell, _shared_cache: bool, cache_size: Cell, + page_size: Cell, readonly: Cell, wal_checkpoint_disabled: Cell, capture_data_changes: RefCell, @@ -545,7 +546,7 @@ impl Connection { let program = Rc::new(translate::translate( self.schema.borrow().deref(), stmt, - self.pager.clone(), + self.pager.borrow().clone(), self.clone(), &syms, QueryMode::Normal, @@ -554,7 +555,7 @@ impl Connection { Ok(Statement::new( program, self._db.mv_store.clone(), - self.pager.clone(), + self.pager.borrow().clone(), )) } Cmd::Explain(_stmt) => todo!(), @@ -596,7 +597,7 @@ impl Connection { let program = translate::translate( self.schema.borrow().deref(), stmt.clone(), - self.pager.clone(), + self.pager.borrow().clone(), self.clone(), &syms, cmd.into(), @@ -605,7 +606,7 @@ impl Connection { let stmt = Statement::new( program.into(), self._db.mv_store.clone(), - self.pager.clone(), + self.pager.borrow().clone(), ); Ok(Some(stmt)) } @@ -656,7 +657,7 @@ impl Connection { let program = translate::translate( self.schema.borrow().deref(), stmt, - self.pager.clone(), + self.pager.borrow().clone(), self.clone(), &syms, QueryMode::Explain, @@ -669,7 +670,7 @@ impl Connection { let program = translate::translate( self.schema.borrow().deref(), stmt, - self.pager.clone(), + self.pager.borrow().clone(), self.clone(), &syms, QueryMode::Normal, @@ -682,7 +683,7 @@ impl Connection { let res = program.step( &mut state, self._db.mv_store.clone(), - self.pager.clone(), + self.pager.borrow().clone(), )?; if matches!(res, StepResult::Done) { break; @@ -703,7 +704,7 @@ impl Connection { if res.is_err() { let state = self.transaction_state.get(); if let TransactionState::Write { schema_did_change } = state { - self.pager.rollback(schema_did_change, self)? + self.pager.borrow().rollback(schema_did_change, self)? } } res @@ -750,7 +751,7 @@ impl Connection { } pub fn wal_frame_count(&self) -> Result { - self.pager.wal_frame_count() + self.pager.borrow().wal_frame_count() } pub fn wal_get_frame( @@ -759,7 +760,9 @@ impl Connection { p_frame: *mut u8, frame_len: u32, ) -> Result> { - self.pager.wal_get_frame(frame_no, p_frame, frame_len) + self.pager + .borrow() + .wal_get_frame(frame_no, p_frame, frame_len) } /// Flush dirty pages to disk. @@ -770,11 +773,13 @@ impl Connection { if self.closed.get() { return Err(LimboError::InternalError("Connection closed".to_string())); } - self.pager.cacheflush(self.wal_checkpoint_disabled.get()) + self.pager + .borrow() + .cacheflush(self.wal_checkpoint_disabled.get()) } pub fn clear_page_cache(&self) -> Result<()> { - self.pager.clear_page_cache(); + self.pager.borrow().clear_page_cache(); Ok(()) } @@ -783,6 +788,7 @@ impl Connection { return Err(LimboError::InternalError("Connection closed".to_string())); } self.pager + .borrow() .wal_checkpoint(self.wal_checkpoint_disabled.get()) } @@ -793,6 +799,7 @@ impl Connection { } self.closed.set(true); self.pager + .borrow() .checkpoint_shutdown(self.wal_checkpoint_disabled.get()) } @@ -831,6 +838,22 @@ impl Connection { pub fn set_capture_data_changes(&self, opts: CaptureDataChangesMode) { self.capture_data_changes.replace(opts); } + pub fn get_page_size(&self) -> u32 { + self.page_size.get() + } + + /// Reset the page size for the current connection. Can only be called when db is uninitialized. + pub fn reset_page_size(&self, size: u32) -> Result<()> { + if self._db.db_state.load(Ordering::SeqCst) != DB_STATE_UNINITIALIZED { + return Ok(()); + } + + self.page_size.set(size); + let pager = self._db.init_pager(Some(size as usize))?; + self.pager.replace(Rc::new(pager)); + + Ok(()) + } #[cfg(feature = "fs")] pub fn open_new(&self, path: &str, vfs: &str) -> Result<(Arc, Arc)> { diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 09abcc2dd..2bddc979d 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -6593,7 +6593,7 @@ mod tests { &mut payload, &record, 4096, - conn.pager.clone(), + conn.pager.borrow().clone(), ); insert_into_cell(page, &payload, pos, 4096).unwrap(); payload @@ -6831,7 +6831,7 @@ mod tests { let io: Arc = Arc::new(MemoryIO::new()); let db = Database::open_file(io.clone(), "test.db", false, false).unwrap(); let conn = db.connect().unwrap(); - let pager = conn.pager.clone(); + let pager = conn.pager.borrow().clone(); // FIXME: handle page cache is full let _ = run_until_done(|| pager.allocate_page1(), &pager); @@ -7717,7 +7717,7 @@ mod tests { &mut payload, &record, 4096, - conn.pager.clone(), + conn.pager.borrow().clone(), ); if (free as usize) < payload.len() + 2 { // do not try to insert overflow pages because they require balancing @@ -7790,7 +7790,7 @@ mod tests { &mut payload, &record, 4096, - conn.pager.clone(), + conn.pager.borrow().clone(), ); if (free as usize) < payload.len() - 2 { // do not try to insert overflow pages because they require balancing @@ -8154,7 +8154,7 @@ mod tests { &mut payload, &record, 4096, - conn.pager.clone(), + conn.pager.borrow().clone(), ); let page = page.get(); insert(0, page.get_contents()); @@ -8231,7 +8231,7 @@ mod tests { &mut payload, &record, 4096, - conn.pager.clone(), + conn.pager.borrow().clone(), ); insert_into_cell(page.get().get_contents(), &payload, 0, 4096).unwrap(); let free = compute_free_space(page.get().get_contents(), usable_space); diff --git a/core/storage/header_accessor.rs b/core/storage/header_accessor.rs index d1d7c726a..1e1c1c48c 100644 --- a/core/storage/header_accessor.rs +++ b/core/storage/header_accessor.rs @@ -35,7 +35,7 @@ const HEADER_OFFSET_VERSION_NUMBER: usize = 96; // Helper to get a read-only reference to the header page. fn get_header_page(pager: &Pager) -> Result> { - if pager.is_empty.load(Ordering::SeqCst) < 2 { + if pager.db_state.load(Ordering::SeqCst) < 2 { return Err(LimboError::InternalError( "Database is empty, header does not exist - page 1 should've been allocated before this".to_string(), )); @@ -49,7 +49,7 @@ fn get_header_page(pager: &Pager) -> Result> { // Helper to get a writable reference to the header page and mark it dirty. fn get_header_page_for_write(pager: &Pager) -> Result> { - if pager.is_empty.load(Ordering::SeqCst) < 2 { + if pager.db_state.load(Ordering::SeqCst) < 2 { // This should not be called on an empty DB for writing, as page 1 is allocated on first transaction. return Err(LimboError::InternalError( "Cannot write to header of an empty database - page 1 should've been allocated before this".to_string(), @@ -103,7 +103,7 @@ macro_rules! impl_header_field_accessor { // Async version #[allow(dead_code)] pub fn [](pager: &Pager) -> Result> { - if pager.is_empty.load(Ordering::SeqCst) < 2 { + if pager.db_state.load(Ordering::SeqCst) < 2 { return Err(LimboError::InternalError(format!("Database is empty, header does not exist - page 1 should've been allocated before this"))); } let page = match get_header_page(pager)? { diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 67499ee86..c5918ca4c 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -6,8 +6,8 @@ use crate::storage::header_accessor; use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType}; use crate::storage::wal::{CheckpointResult, Wal, WalFsyncStatus}; use crate::types::CursorResult; +use crate::Completion; use crate::{Buffer, Connection, LimboError, Result}; -use crate::{Completion, WalFile}; use parking_lot::RwLock; use std::cell::{OnceCell, RefCell, UnsafeCell}; use std::collections::HashSet; @@ -191,7 +191,7 @@ pub enum AutoVacuumMode { Incremental, } -pub const DB_STATE_UNITIALIZED: usize = 0; +pub const DB_STATE_UNINITIALIZED: usize = 0; pub const DB_STATE_INITIALIZING: usize = 1; pub const DB_STATE_INITIALIZED: usize = 2; /// The pager interface implements the persistence layer by providing access @@ -218,7 +218,7 @@ pub struct Pager { /// 0 -> Database is empty, /// 1 -> Database is being initialized, /// 2 -> Database is initialized and ready for use. - pub is_empty: Arc, + pub db_state: Arc, /// Mutex for synchronizing database initialization to prevent race conditions init_lock: Arc>, allocate_page1_state: RefCell, @@ -265,10 +265,10 @@ impl Pager { io: Arc, page_cache: Arc>, buffer_pool: Arc, - is_empty: Arc, + db_state: Arc, init_lock: Arc>, ) -> Result { - let allocate_page1_state = if is_empty.load(Ordering::SeqCst) < DB_STATE_INITIALIZED { + let allocate_page1_state = if db_state.load(Ordering::SeqCst) < DB_STATE_INITIALIZED { RefCell::new(AllocatePage1State::Start) } else { RefCell::new(AllocatePage1State::Done) @@ -288,7 +288,7 @@ impl Pager { checkpoint_inflight: Rc::new(RefCell::new(0)), buffer_pool, auto_vacuum_mode: RefCell::new(AutoVacuumMode::None), - is_empty, + db_state, init_lock, allocate_page1_state, page_size: OnceCell::new(), @@ -296,7 +296,7 @@ impl Pager { }) } - pub fn set_wal(&mut self, wal: Rc>) { + pub fn set_wal(&mut self, wal: Rc>) { self.wal = wal; } @@ -608,10 +608,10 @@ impl Pager { #[instrument(skip_all, level = Level::INFO)] fn maybe_allocate_page1(&self) -> Result> { - if self.is_empty.load(Ordering::SeqCst) < DB_STATE_INITIALIZED { + if self.db_state.load(Ordering::SeqCst) < DB_STATE_INITIALIZED { if let Ok(_lock) = self.init_lock.try_lock() { match ( - self.is_empty.load(Ordering::SeqCst), + self.db_state.load(Ordering::SeqCst), self.allocating_page1(), ) { // In case of being empty or (allocating and this connection is performing allocation) then allocate the first page @@ -1054,7 +1054,7 @@ impl Pager { match state { AllocatePage1State::Start => { tracing::trace!("allocate_page1(Start)"); - self.is_empty.store(DB_STATE_INITIALIZING, Ordering::SeqCst); + self.db_state.store(DB_STATE_INITIALIZING, Ordering::SeqCst); let mut default_header = DatabaseHeader::default(); default_header.database_size += 1; let page = allocate_page(1, &self.buffer_pool, 0); @@ -1100,7 +1100,7 @@ impl Pager { cache.insert(page_key, page1_ref.clone()).map_err(|e| { LimboError::InternalError(format!("Failed to insert page 1 into cache: {e:?}")) })?; - self.is_empty.store(DB_STATE_INITIALIZED, Ordering::SeqCst); + self.db_state.store(DB_STATE_INITIALIZED, Ordering::SeqCst); self.allocate_page1_state.replace(AllocatePage1State::Done); Ok(CursorResult::Ok(page1_ref.clone())) } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index af5438859..67a4be5ec 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -83,7 +83,7 @@ pub const MIN_PAGE_CACHE_SIZE: usize = 10; pub const MIN_PAGE_SIZE: u32 = 512; /// The maximum page size in bytes. -const MAX_PAGE_SIZE: u32 = 65536; +pub const MAX_PAGE_SIZE: u32 = 65536; /// The default page size in bytes. pub const DEFAULT_PAGE_SIZE: u16 = 4096; @@ -279,7 +279,7 @@ impl Default for DatabaseHeader { impl DatabaseHeader { pub fn update_page_size(&mut self, size: u32) { - if !(MIN_PAGE_SIZE..=MAX_PAGE_SIZE).contains(&size) || (size & (size - 1) != 0) { + if !is_valid_page_size(size) { return; } @@ -299,6 +299,10 @@ impl DatabaseHeader { } } +pub fn is_valid_page_size(size: u32) -> bool { + (MIN_PAGE_SIZE..=MAX_PAGE_SIZE).contains(&size) && (size & (size - 1)) == 0 +} + pub fn write_header_to_buf(buf: &mut [u8], header: &DatabaseHeader) { buf[0..16].copy_from_slice(&header.magic); buf[16..18].copy_from_slice(&header.page_size.to_be_bytes()); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 45114fed4..884df632b 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1123,4 +1123,8 @@ impl WalFileShared { pub fn page_size(&self) -> u32 { self.wal_header.lock().page_size } + + pub fn set_page_size(&self, page_size: u32) { + self.wal_header.lock().page_size = page_size; + } } diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 45f554fc9..ce6490e99 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -9,7 +9,7 @@ use turso_sqlite3_parser::ast::{PragmaName, QualifiedName}; use crate::pragma::pragma_for; use crate::schema::Schema; use crate::storage::pager::AutoVacuumMode; -use crate::storage::sqlite3_ondisk::MIN_PAGE_CACHE_SIZE; +use crate::storage::sqlite3_ondisk::{is_valid_page_size, MIN_PAGE_CACHE_SIZE}; use crate::storage::wal::CheckpointMode; use crate::translate::schema::translate_create_table; use crate::util::{normalize_ident, parse_signed_number, parse_string}; @@ -149,7 +149,13 @@ fn update_pragma( unreachable!(); } PragmaName::PageSize => { - bail_parse_error!("Updating database page size is not supported."); + let page_size = match parse_signed_number(&value)? { + Value::Integer(size) => size, + Value::Float(size) => size as i64, + _ => bail_parse_error!("Invalid value for page size pragma"), + }; + update_page_size(connection, page_size as u32)?; + Ok(program) } PragmaName::AutoVacuum => { let auto_vacuum_mode = match value { @@ -528,3 +534,11 @@ fn turso_cdc_table_columns() -> Vec { }, ] } +fn update_page_size(connection: Arc, page_size: u32) -> crate::Result<()> { + if !is_valid_page_size(page_size) { + return Ok(()); + } + + connection.reset_page_size(page_size)?; + Ok(()) +} diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index be9f666cb..d77869386 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -26,6 +26,7 @@ use crate::{ }, printf::exec_printf, }, + IO, }; use std::ops::DerefMut; use std::sync::atomic::AtomicUsize; @@ -55,9 +56,7 @@ use crate::{ vector::{vector32, vector64, vector_distance_cos, vector_distance_l2, vector_extract}, }; -use crate::{ - info, BufferPool, MvCursor, OpenFlags, RefValue, Row, StepResult, TransactionState, IO, -}; +use crate::{info, BufferPool, MvCursor, OpenFlags, RefValue, Row, StepResult, TransactionState}; use super::{ insn::{Cookie, RegisterOrLiteral}, @@ -5936,7 +5935,7 @@ pub fn op_open_ephemeral( OpOpenEphemeralState::Start => { tracing::trace!("Start"); let conn = program.connection.clone(); - let io = conn.pager.io.get_memory_io(); + let io = conn.pager.borrow().io.get_memory_io(); let file = io.open_file("", OpenFlags::Create, true)?; let db_file = Arc::new(FileMemoryStorage::new(file));