mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 00:45:37 +01:00
Support set page size
This commit is contained in:
213
core/lib.rs
213
core/lib.rs
@@ -77,7 +77,7 @@ use std::{
|
||||
use storage::database::DatabaseFile;
|
||||
use storage::page_cache::DumbLruPageCache;
|
||||
pub use storage::pager::PagerCacheflushStatus;
|
||||
use storage::pager::{DB_STATE_INITIALIZED, DB_STATE_UNITIALIZED};
|
||||
use storage::pager::{DB_STATE_INITIALIZED, DB_STATE_UNINITIALIZED};
|
||||
pub use storage::{
|
||||
buffer_pool::BufferPool,
|
||||
database::DatabaseStorage,
|
||||
@@ -117,7 +117,7 @@ pub struct Database {
|
||||
// create DB connections.
|
||||
_shared_page_cache: Arc<RwLock<DumbLruPageCache>>,
|
||||
maybe_shared_wal: RwLock<Option<Arc<UnsafeCell<WalFileShared>>>>,
|
||||
is_empty: Arc<AtomicUsize>,
|
||||
db_state: Arc<AtomicUsize>,
|
||||
init_lock: Arc<Mutex<()>>,
|
||||
open_flags: OpenFlags,
|
||||
}
|
||||
@@ -192,8 +192,9 @@ impl Database {
|
||||
.as_ref()
|
||||
.is_some_and(|wal| unsafe { &*wal.get() }.max_frame.load(Ordering::SeqCst) > 0);
|
||||
|
||||
let is_empty = if db_size == 0 && !wal_has_frames {
|
||||
DB_STATE_UNITIALIZED
|
||||
// No pages in DB file or WAL -> empty database
|
||||
let db_state = if db_size == 0 && !wal_has_frames {
|
||||
DB_STATE_UNINITIALIZED
|
||||
} else {
|
||||
DB_STATE_INITIALIZED
|
||||
};
|
||||
@@ -209,13 +210,13 @@ impl Database {
|
||||
db_file,
|
||||
io: io.clone(),
|
||||
open_flags: flags,
|
||||
is_empty: Arc::new(AtomicUsize::new(is_empty)),
|
||||
db_state: Arc::new(AtomicUsize::new(db_state)),
|
||||
init_lock: Arc::new(Mutex::new(())),
|
||||
};
|
||||
let db = Arc::new(db);
|
||||
|
||||
// Check: https://github.com/tursodatabase/turso/pull/1761#discussion_r2154013123
|
||||
if is_empty == 2 {
|
||||
if db_state == DB_STATE_INITIALIZED {
|
||||
// parse schema
|
||||
let conn = db.connect()?;
|
||||
let schema_version = get_schema_version(&conn)?;
|
||||
@@ -239,90 +240,16 @@ impl Database {
|
||||
}
|
||||
|
||||
pub fn connect(self: &Arc<Database>) -> Result<Arc<Connection>> {
|
||||
let buffer_pool = Arc::new(BufferPool::new(None));
|
||||
let pager = self.init_pager(None)?;
|
||||
|
||||
// Open existing WAL file if present
|
||||
if let Some(shared_wal) = self.maybe_shared_wal.read().clone() {
|
||||
// No pages in DB file or WAL -> empty database
|
||||
let is_empty = self.is_empty.clone();
|
||||
let wal = Rc::new(RefCell::new(WalFile::new(
|
||||
self.io.clone(),
|
||||
shared_wal,
|
||||
buffer_pool.clone(),
|
||||
)));
|
||||
let pager = Rc::new(Pager::new(
|
||||
self.db_file.clone(),
|
||||
wal,
|
||||
self.io.clone(),
|
||||
Arc::new(RwLock::new(DumbLruPageCache::default())),
|
||||
buffer_pool,
|
||||
is_empty,
|
||||
self.init_lock.clone(),
|
||||
)?);
|
||||
|
||||
let page_size = header_accessor::get_page_size(&pager)
|
||||
.unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE)
|
||||
as u32;
|
||||
let default_cache_size = header_accessor::get_default_page_cache_size(&pager)
|
||||
.unwrap_or(storage::sqlite3_ondisk::DEFAULT_CACHE_SIZE);
|
||||
pager.buffer_pool.set_page_size(page_size as usize);
|
||||
let conn = Arc::new(Connection {
|
||||
_db: self.clone(),
|
||||
pager: pager.clone(),
|
||||
schema: RefCell::new(self.schema.read().clone()),
|
||||
last_insert_rowid: Cell::new(0),
|
||||
auto_commit: Cell::new(true),
|
||||
mv_transactions: RefCell::new(Vec::new()),
|
||||
transaction_state: Cell::new(TransactionState::None),
|
||||
last_change: Cell::new(0),
|
||||
syms: RefCell::new(SymbolTable::new()),
|
||||
total_changes: Cell::new(0),
|
||||
_shared_cache: false,
|
||||
cache_size: Cell::new(default_cache_size),
|
||||
readonly: Cell::new(false),
|
||||
wal_checkpoint_disabled: Cell::new(false),
|
||||
capture_data_changes: RefCell::new(CaptureDataChangesMode::Off),
|
||||
closed: Cell::new(false),
|
||||
});
|
||||
if let Err(e) = conn.register_builtins() {
|
||||
return Err(LimboError::ExtensionError(e));
|
||||
}
|
||||
return Ok(conn);
|
||||
};
|
||||
|
||||
// No existing WAL; create one.
|
||||
// TODO: currently Pager needs to be instantiated with some implementation of trait Wal, so here's a workaround.
|
||||
let dummy_wal = Rc::new(RefCell::new(DummyWAL {}));
|
||||
let is_empty = self.is_empty.clone();
|
||||
let mut pager = Pager::new(
|
||||
self.db_file.clone(),
|
||||
dummy_wal,
|
||||
self.io.clone(),
|
||||
Arc::new(RwLock::new(DumbLruPageCache::default())),
|
||||
buffer_pool.clone(),
|
||||
is_empty,
|
||||
Arc::new(Mutex::new(())),
|
||||
)?;
|
||||
let page_size = header_accessor::get_page_size(&pager)
|
||||
.unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE) as u32;
|
||||
let default_cache_size = header_accessor::get_default_page_cache_size(&pager)
|
||||
.unwrap_or(storage::sqlite3_ondisk::DEFAULT_CACHE_SIZE);
|
||||
|
||||
let wal_path = format!("{}-wal", self.path);
|
||||
let file = self.io.open_file(&wal_path, OpenFlags::Create, false)?;
|
||||
let real_shared_wal = WalFileShared::new_shared(page_size, &self.io, file)?;
|
||||
// Modify Database::maybe_shared_wal to point to the new WAL file so that other connections
|
||||
// can open the existing WAL.
|
||||
*self.maybe_shared_wal.write() = Some(real_shared_wal.clone());
|
||||
let wal = Rc::new(RefCell::new(WalFile::new(
|
||||
self.io.clone(),
|
||||
real_shared_wal,
|
||||
buffer_pool,
|
||||
)));
|
||||
pager.set_wal(wal);
|
||||
let conn = Arc::new(Connection {
|
||||
_db: self.clone(),
|
||||
pager: Rc::new(pager),
|
||||
pager: RefCell::new(Rc::new(pager)),
|
||||
schema: RefCell::new(self.schema.read().clone()),
|
||||
auto_commit: Cell::new(true),
|
||||
mv_transactions: RefCell::new(Vec::new()),
|
||||
@@ -333,6 +260,7 @@ impl Database {
|
||||
syms: RefCell::new(SymbolTable::new()),
|
||||
_shared_cache: false,
|
||||
cache_size: Cell::new(default_cache_size),
|
||||
page_size: Cell::new(page_size),
|
||||
readonly: Cell::new(false),
|
||||
wal_checkpoint_disabled: Cell::new(false),
|
||||
capture_data_changes: RefCell::new(CaptureDataChangesMode::Off),
|
||||
@@ -345,6 +273,78 @@ impl Database {
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
fn init_pager(&self, page_size: Option<usize>) -> Result<Pager> {
|
||||
// Open existing WAL file if present
|
||||
if let Some(shared_wal) = self.maybe_shared_wal.read().clone() {
|
||||
let size = match page_size {
|
||||
None => unsafe { (*shared_wal.get()).page_size() as usize },
|
||||
Some(size) => {
|
||||
unsafe { (*shared_wal.get()).set_page_size(size as u32) };
|
||||
size
|
||||
}
|
||||
};
|
||||
let buffer_pool = Arc::new(BufferPool::new(Some(size)));
|
||||
|
||||
let db_state = self.db_state.clone();
|
||||
let wal = Rc::new(RefCell::new(WalFile::new(
|
||||
self.io.clone(),
|
||||
shared_wal,
|
||||
buffer_pool.clone(),
|
||||
)));
|
||||
let pager = Pager::new(
|
||||
self.db_file.clone(),
|
||||
wal,
|
||||
self.io.clone(),
|
||||
Arc::new(RwLock::new(DumbLruPageCache::default())),
|
||||
buffer_pool.clone(),
|
||||
db_state,
|
||||
self.init_lock.clone(),
|
||||
)?;
|
||||
return Ok(pager);
|
||||
}
|
||||
|
||||
let buffer_pool = Arc::new(BufferPool::new(page_size));
|
||||
// No existing WAL; create one.
|
||||
// TODO: currently Pager needs to be instantiated with some implementation of trait Wal, so here's a workaround.
|
||||
let dummy_wal = Rc::new(RefCell::new(DummyWAL {}));
|
||||
let db_state = self.db_state.clone();
|
||||
let mut pager = Pager::new(
|
||||
self.db_file.clone(),
|
||||
dummy_wal,
|
||||
self.io.clone(),
|
||||
Arc::new(RwLock::new(DumbLruPageCache::default())),
|
||||
buffer_pool.clone(),
|
||||
db_state,
|
||||
Arc::new(Mutex::new(())),
|
||||
)?;
|
||||
|
||||
let size = match page_size {
|
||||
Some(size) => size as u32,
|
||||
None => {
|
||||
let size = header_accessor::get_page_size(&pager)
|
||||
.unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE)
|
||||
as u32;
|
||||
buffer_pool.set_page_size(size as usize);
|
||||
size
|
||||
}
|
||||
};
|
||||
|
||||
let wal_path = format!("{}-wal", self.path);
|
||||
let file = self.io.open_file(&wal_path, OpenFlags::Create, false)?;
|
||||
let real_shared_wal = WalFileShared::new_shared(size, &self.io, file)?;
|
||||
// Modify Database::maybe_shared_wal to point to the new WAL file so that other connections
|
||||
// can open the existing WAL.
|
||||
*self.maybe_shared_wal.write() = Some(real_shared_wal.clone());
|
||||
let wal = Rc::new(RefCell::new(WalFile::new(
|
||||
self.io.clone(),
|
||||
real_shared_wal,
|
||||
buffer_pool,
|
||||
)));
|
||||
pager.set_wal(wal);
|
||||
|
||||
Ok(pager)
|
||||
}
|
||||
|
||||
/// Open a new database file with optionally specifying a VFS without an existing database
|
||||
/// connection and symbol table to register extensions.
|
||||
#[cfg(feature = "fs")]
|
||||
@@ -499,7 +499,7 @@ impl CaptureDataChangesMode {
|
||||
|
||||
pub struct Connection {
|
||||
_db: Arc<Database>,
|
||||
pager: Rc<Pager>,
|
||||
pager: RefCell<Rc<Pager>>,
|
||||
schema: RefCell<Schema>,
|
||||
/// Whether to automatically commit transaction
|
||||
auto_commit: Cell<bool>,
|
||||
@@ -511,6 +511,7 @@ pub struct Connection {
|
||||
syms: RefCell<SymbolTable>,
|
||||
_shared_cache: bool,
|
||||
cache_size: Cell<i32>,
|
||||
page_size: Cell<u32>,
|
||||
readonly: Cell<bool>,
|
||||
wal_checkpoint_disabled: Cell<bool>,
|
||||
capture_data_changes: RefCell<CaptureDataChangesMode>,
|
||||
@@ -545,7 +546,7 @@ impl Connection {
|
||||
let program = Rc::new(translate::translate(
|
||||
self.schema.borrow().deref(),
|
||||
stmt,
|
||||
self.pager.clone(),
|
||||
self.pager.borrow().clone(),
|
||||
self.clone(),
|
||||
&syms,
|
||||
QueryMode::Normal,
|
||||
@@ -554,7 +555,7 @@ impl Connection {
|
||||
Ok(Statement::new(
|
||||
program,
|
||||
self._db.mv_store.clone(),
|
||||
self.pager.clone(),
|
||||
self.pager.borrow().clone(),
|
||||
))
|
||||
}
|
||||
Cmd::Explain(_stmt) => todo!(),
|
||||
@@ -596,7 +597,7 @@ impl Connection {
|
||||
let program = translate::translate(
|
||||
self.schema.borrow().deref(),
|
||||
stmt.clone(),
|
||||
self.pager.clone(),
|
||||
self.pager.borrow().clone(),
|
||||
self.clone(),
|
||||
&syms,
|
||||
cmd.into(),
|
||||
@@ -605,7 +606,7 @@ impl Connection {
|
||||
let stmt = Statement::new(
|
||||
program.into(),
|
||||
self._db.mv_store.clone(),
|
||||
self.pager.clone(),
|
||||
self.pager.borrow().clone(),
|
||||
);
|
||||
Ok(Some(stmt))
|
||||
}
|
||||
@@ -656,7 +657,7 @@ impl Connection {
|
||||
let program = translate::translate(
|
||||
self.schema.borrow().deref(),
|
||||
stmt,
|
||||
self.pager.clone(),
|
||||
self.pager.borrow().clone(),
|
||||
self.clone(),
|
||||
&syms,
|
||||
QueryMode::Explain,
|
||||
@@ -669,7 +670,7 @@ impl Connection {
|
||||
let program = translate::translate(
|
||||
self.schema.borrow().deref(),
|
||||
stmt,
|
||||
self.pager.clone(),
|
||||
self.pager.borrow().clone(),
|
||||
self.clone(),
|
||||
&syms,
|
||||
QueryMode::Normal,
|
||||
@@ -682,7 +683,7 @@ impl Connection {
|
||||
let res = program.step(
|
||||
&mut state,
|
||||
self._db.mv_store.clone(),
|
||||
self.pager.clone(),
|
||||
self.pager.borrow().clone(),
|
||||
)?;
|
||||
if matches!(res, StepResult::Done) {
|
||||
break;
|
||||
@@ -703,7 +704,7 @@ impl Connection {
|
||||
if res.is_err() {
|
||||
let state = self.transaction_state.get();
|
||||
if let TransactionState::Write { schema_did_change } = state {
|
||||
self.pager.rollback(schema_did_change, self)?
|
||||
self.pager.borrow().rollback(schema_did_change, self)?
|
||||
}
|
||||
}
|
||||
res
|
||||
@@ -750,7 +751,7 @@ impl Connection {
|
||||
}
|
||||
|
||||
pub fn wal_frame_count(&self) -> Result<u64> {
|
||||
self.pager.wal_frame_count()
|
||||
self.pager.borrow().wal_frame_count()
|
||||
}
|
||||
|
||||
pub fn wal_get_frame(
|
||||
@@ -759,7 +760,9 @@ impl Connection {
|
||||
p_frame: *mut u8,
|
||||
frame_len: u32,
|
||||
) -> Result<Arc<Completion>> {
|
||||
self.pager.wal_get_frame(frame_no, p_frame, frame_len)
|
||||
self.pager
|
||||
.borrow()
|
||||
.wal_get_frame(frame_no, p_frame, frame_len)
|
||||
}
|
||||
|
||||
/// Flush dirty pages to disk.
|
||||
@@ -770,11 +773,13 @@ impl Connection {
|
||||
if self.closed.get() {
|
||||
return Err(LimboError::InternalError("Connection closed".to_string()));
|
||||
}
|
||||
self.pager.cacheflush(self.wal_checkpoint_disabled.get())
|
||||
self.pager
|
||||
.borrow()
|
||||
.cacheflush(self.wal_checkpoint_disabled.get())
|
||||
}
|
||||
|
||||
pub fn clear_page_cache(&self) -> Result<()> {
|
||||
self.pager.clear_page_cache();
|
||||
self.pager.borrow().clear_page_cache();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -783,6 +788,7 @@ impl Connection {
|
||||
return Err(LimboError::InternalError("Connection closed".to_string()));
|
||||
}
|
||||
self.pager
|
||||
.borrow()
|
||||
.wal_checkpoint(self.wal_checkpoint_disabled.get())
|
||||
}
|
||||
|
||||
@@ -793,6 +799,7 @@ impl Connection {
|
||||
}
|
||||
self.closed.set(true);
|
||||
self.pager
|
||||
.borrow()
|
||||
.checkpoint_shutdown(self.wal_checkpoint_disabled.get())
|
||||
}
|
||||
|
||||
@@ -831,6 +838,22 @@ impl Connection {
|
||||
pub fn set_capture_data_changes(&self, opts: CaptureDataChangesMode) {
|
||||
self.capture_data_changes.replace(opts);
|
||||
}
|
||||
pub fn get_page_size(&self) -> u32 {
|
||||
self.page_size.get()
|
||||
}
|
||||
|
||||
/// Reset the page size for the current connection. Can only be called when db is uninitialized.
|
||||
pub fn reset_page_size(&self, size: u32) -> Result<()> {
|
||||
if self._db.db_state.load(Ordering::SeqCst) != DB_STATE_UNINITIALIZED {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.page_size.set(size);
|
||||
let pager = self._db.init_pager(Some(size as usize))?;
|
||||
self.pager.replace(Rc::new(pager));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(feature = "fs")]
|
||||
pub fn open_new(&self, path: &str, vfs: &str) -> Result<(Arc<dyn IO>, Arc<Database>)> {
|
||||
|
||||
@@ -6593,7 +6593,7 @@ mod tests {
|
||||
&mut payload,
|
||||
&record,
|
||||
4096,
|
||||
conn.pager.clone(),
|
||||
conn.pager.borrow().clone(),
|
||||
);
|
||||
insert_into_cell(page, &payload, pos, 4096).unwrap();
|
||||
payload
|
||||
@@ -6831,7 +6831,7 @@ mod tests {
|
||||
let io: Arc<dyn IO> = Arc::new(MemoryIO::new());
|
||||
let db = Database::open_file(io.clone(), "test.db", false, false).unwrap();
|
||||
let conn = db.connect().unwrap();
|
||||
let pager = conn.pager.clone();
|
||||
let pager = conn.pager.borrow().clone();
|
||||
|
||||
// FIXME: handle page cache is full
|
||||
let _ = run_until_done(|| pager.allocate_page1(), &pager);
|
||||
@@ -7717,7 +7717,7 @@ mod tests {
|
||||
&mut payload,
|
||||
&record,
|
||||
4096,
|
||||
conn.pager.clone(),
|
||||
conn.pager.borrow().clone(),
|
||||
);
|
||||
if (free as usize) < payload.len() + 2 {
|
||||
// do not try to insert overflow pages because they require balancing
|
||||
@@ -7790,7 +7790,7 @@ mod tests {
|
||||
&mut payload,
|
||||
&record,
|
||||
4096,
|
||||
conn.pager.clone(),
|
||||
conn.pager.borrow().clone(),
|
||||
);
|
||||
if (free as usize) < payload.len() - 2 {
|
||||
// do not try to insert overflow pages because they require balancing
|
||||
@@ -8154,7 +8154,7 @@ mod tests {
|
||||
&mut payload,
|
||||
&record,
|
||||
4096,
|
||||
conn.pager.clone(),
|
||||
conn.pager.borrow().clone(),
|
||||
);
|
||||
let page = page.get();
|
||||
insert(0, page.get_contents());
|
||||
@@ -8231,7 +8231,7 @@ mod tests {
|
||||
&mut payload,
|
||||
&record,
|
||||
4096,
|
||||
conn.pager.clone(),
|
||||
conn.pager.borrow().clone(),
|
||||
);
|
||||
insert_into_cell(page.get().get_contents(), &payload, 0, 4096).unwrap();
|
||||
let free = compute_free_space(page.get().get_contents(), usable_space);
|
||||
|
||||
@@ -35,7 +35,7 @@ const HEADER_OFFSET_VERSION_NUMBER: usize = 96;
|
||||
|
||||
// Helper to get a read-only reference to the header page.
|
||||
fn get_header_page(pager: &Pager) -> Result<CursorResult<PageRef>> {
|
||||
if pager.is_empty.load(Ordering::SeqCst) < 2 {
|
||||
if pager.db_state.load(Ordering::SeqCst) < 2 {
|
||||
return Err(LimboError::InternalError(
|
||||
"Database is empty, header does not exist - page 1 should've been allocated before this".to_string(),
|
||||
));
|
||||
@@ -49,7 +49,7 @@ fn get_header_page(pager: &Pager) -> Result<CursorResult<PageRef>> {
|
||||
|
||||
// Helper to get a writable reference to the header page and mark it dirty.
|
||||
fn get_header_page_for_write(pager: &Pager) -> Result<CursorResult<PageRef>> {
|
||||
if pager.is_empty.load(Ordering::SeqCst) < 2 {
|
||||
if pager.db_state.load(Ordering::SeqCst) < 2 {
|
||||
// This should not be called on an empty DB for writing, as page 1 is allocated on first transaction.
|
||||
return Err(LimboError::InternalError(
|
||||
"Cannot write to header of an empty database - page 1 should've been allocated before this".to_string(),
|
||||
@@ -103,7 +103,7 @@ macro_rules! impl_header_field_accessor {
|
||||
// Async version
|
||||
#[allow(dead_code)]
|
||||
pub fn [<get_ $field_name _async>](pager: &Pager) -> Result<CursorResult<$type>> {
|
||||
if pager.is_empty.load(Ordering::SeqCst) < 2 {
|
||||
if pager.db_state.load(Ordering::SeqCst) < 2 {
|
||||
return Err(LimboError::InternalError(format!("Database is empty, header does not exist - page 1 should've been allocated before this")));
|
||||
}
|
||||
let page = match get_header_page(pager)? {
|
||||
|
||||
@@ -6,8 +6,8 @@ use crate::storage::header_accessor;
|
||||
use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType};
|
||||
use crate::storage::wal::{CheckpointResult, Wal, WalFsyncStatus};
|
||||
use crate::types::CursorResult;
|
||||
use crate::Completion;
|
||||
use crate::{Buffer, Connection, LimboError, Result};
|
||||
use crate::{Completion, WalFile};
|
||||
use parking_lot::RwLock;
|
||||
use std::cell::{OnceCell, RefCell, UnsafeCell};
|
||||
use std::collections::HashSet;
|
||||
@@ -191,7 +191,7 @@ pub enum AutoVacuumMode {
|
||||
Incremental,
|
||||
}
|
||||
|
||||
pub const DB_STATE_UNITIALIZED: usize = 0;
|
||||
pub const DB_STATE_UNINITIALIZED: usize = 0;
|
||||
pub const DB_STATE_INITIALIZING: usize = 1;
|
||||
pub const DB_STATE_INITIALIZED: usize = 2;
|
||||
/// The pager interface implements the persistence layer by providing access
|
||||
@@ -218,7 +218,7 @@ pub struct Pager {
|
||||
/// 0 -> Database is empty,
|
||||
/// 1 -> Database is being initialized,
|
||||
/// 2 -> Database is initialized and ready for use.
|
||||
pub is_empty: Arc<AtomicUsize>,
|
||||
pub db_state: Arc<AtomicUsize>,
|
||||
/// Mutex for synchronizing database initialization to prevent race conditions
|
||||
init_lock: Arc<Mutex<()>>,
|
||||
allocate_page1_state: RefCell<AllocatePage1State>,
|
||||
@@ -265,10 +265,10 @@ impl Pager {
|
||||
io: Arc<dyn crate::io::IO>,
|
||||
page_cache: Arc<RwLock<DumbLruPageCache>>,
|
||||
buffer_pool: Arc<BufferPool>,
|
||||
is_empty: Arc<AtomicUsize>,
|
||||
db_state: Arc<AtomicUsize>,
|
||||
init_lock: Arc<Mutex<()>>,
|
||||
) -> Result<Self> {
|
||||
let allocate_page1_state = if is_empty.load(Ordering::SeqCst) < DB_STATE_INITIALIZED {
|
||||
let allocate_page1_state = if db_state.load(Ordering::SeqCst) < DB_STATE_INITIALIZED {
|
||||
RefCell::new(AllocatePage1State::Start)
|
||||
} else {
|
||||
RefCell::new(AllocatePage1State::Done)
|
||||
@@ -288,7 +288,7 @@ impl Pager {
|
||||
checkpoint_inflight: Rc::new(RefCell::new(0)),
|
||||
buffer_pool,
|
||||
auto_vacuum_mode: RefCell::new(AutoVacuumMode::None),
|
||||
is_empty,
|
||||
db_state,
|
||||
init_lock,
|
||||
allocate_page1_state,
|
||||
page_size: OnceCell::new(),
|
||||
@@ -296,7 +296,7 @@ impl Pager {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn set_wal(&mut self, wal: Rc<RefCell<WalFile>>) {
|
||||
pub fn set_wal(&mut self, wal: Rc<RefCell<dyn Wal>>) {
|
||||
self.wal = wal;
|
||||
}
|
||||
|
||||
@@ -608,10 +608,10 @@ impl Pager {
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
fn maybe_allocate_page1(&self) -> Result<CursorResult<()>> {
|
||||
if self.is_empty.load(Ordering::SeqCst) < DB_STATE_INITIALIZED {
|
||||
if self.db_state.load(Ordering::SeqCst) < DB_STATE_INITIALIZED {
|
||||
if let Ok(_lock) = self.init_lock.try_lock() {
|
||||
match (
|
||||
self.is_empty.load(Ordering::SeqCst),
|
||||
self.db_state.load(Ordering::SeqCst),
|
||||
self.allocating_page1(),
|
||||
) {
|
||||
// In case of being empty or (allocating and this connection is performing allocation) then allocate the first page
|
||||
@@ -1054,7 +1054,7 @@ impl Pager {
|
||||
match state {
|
||||
AllocatePage1State::Start => {
|
||||
tracing::trace!("allocate_page1(Start)");
|
||||
self.is_empty.store(DB_STATE_INITIALIZING, Ordering::SeqCst);
|
||||
self.db_state.store(DB_STATE_INITIALIZING, Ordering::SeqCst);
|
||||
let mut default_header = DatabaseHeader::default();
|
||||
default_header.database_size += 1;
|
||||
let page = allocate_page(1, &self.buffer_pool, 0);
|
||||
@@ -1100,7 +1100,7 @@ impl Pager {
|
||||
cache.insert(page_key, page1_ref.clone()).map_err(|e| {
|
||||
LimboError::InternalError(format!("Failed to insert page 1 into cache: {e:?}"))
|
||||
})?;
|
||||
self.is_empty.store(DB_STATE_INITIALIZED, Ordering::SeqCst);
|
||||
self.db_state.store(DB_STATE_INITIALIZED, Ordering::SeqCst);
|
||||
self.allocate_page1_state.replace(AllocatePage1State::Done);
|
||||
Ok(CursorResult::Ok(page1_ref.clone()))
|
||||
}
|
||||
|
||||
@@ -83,7 +83,7 @@ pub const MIN_PAGE_CACHE_SIZE: usize = 10;
|
||||
pub const MIN_PAGE_SIZE: u32 = 512;
|
||||
|
||||
/// The maximum page size in bytes.
|
||||
const MAX_PAGE_SIZE: u32 = 65536;
|
||||
pub const MAX_PAGE_SIZE: u32 = 65536;
|
||||
|
||||
/// The default page size in bytes.
|
||||
pub const DEFAULT_PAGE_SIZE: u16 = 4096;
|
||||
@@ -279,7 +279,7 @@ impl Default for DatabaseHeader {
|
||||
|
||||
impl DatabaseHeader {
|
||||
pub fn update_page_size(&mut self, size: u32) {
|
||||
if !(MIN_PAGE_SIZE..=MAX_PAGE_SIZE).contains(&size) || (size & (size - 1) != 0) {
|
||||
if !is_valid_page_size(size) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -299,6 +299,10 @@ impl DatabaseHeader {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_valid_page_size(size: u32) -> bool {
|
||||
(MIN_PAGE_SIZE..=MAX_PAGE_SIZE).contains(&size) && (size & (size - 1)) == 0
|
||||
}
|
||||
|
||||
pub fn write_header_to_buf(buf: &mut [u8], header: &DatabaseHeader) {
|
||||
buf[0..16].copy_from_slice(&header.magic);
|
||||
buf[16..18].copy_from_slice(&header.page_size.to_be_bytes());
|
||||
|
||||
@@ -1123,4 +1123,8 @@ impl WalFileShared {
|
||||
pub fn page_size(&self) -> u32 {
|
||||
self.wal_header.lock().page_size
|
||||
}
|
||||
|
||||
pub fn set_page_size(&self, page_size: u32) {
|
||||
self.wal_header.lock().page_size = page_size;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ use turso_sqlite3_parser::ast::{PragmaName, QualifiedName};
|
||||
use crate::pragma::pragma_for;
|
||||
use crate::schema::Schema;
|
||||
use crate::storage::pager::AutoVacuumMode;
|
||||
use crate::storage::sqlite3_ondisk::MIN_PAGE_CACHE_SIZE;
|
||||
use crate::storage::sqlite3_ondisk::{is_valid_page_size, MIN_PAGE_CACHE_SIZE};
|
||||
use crate::storage::wal::CheckpointMode;
|
||||
use crate::translate::schema::translate_create_table;
|
||||
use crate::util::{normalize_ident, parse_signed_number, parse_string};
|
||||
@@ -149,7 +149,13 @@ fn update_pragma(
|
||||
unreachable!();
|
||||
}
|
||||
PragmaName::PageSize => {
|
||||
bail_parse_error!("Updating database page size is not supported.");
|
||||
let page_size = match parse_signed_number(&value)? {
|
||||
Value::Integer(size) => size,
|
||||
Value::Float(size) => size as i64,
|
||||
_ => bail_parse_error!("Invalid value for page size pragma"),
|
||||
};
|
||||
update_page_size(connection, page_size as u32)?;
|
||||
Ok(program)
|
||||
}
|
||||
PragmaName::AutoVacuum => {
|
||||
let auto_vacuum_mode = match value {
|
||||
@@ -528,3 +534,11 @@ fn turso_cdc_table_columns() -> Vec<ColumnDefinition> {
|
||||
},
|
||||
]
|
||||
}
|
||||
fn update_page_size(connection: Arc<crate::Connection>, page_size: u32) -> crate::Result<()> {
|
||||
if !is_valid_page_size(page_size) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
connection.reset_page_size(page_size)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ use crate::{
|
||||
},
|
||||
printf::exec_printf,
|
||||
},
|
||||
IO,
|
||||
};
|
||||
use std::ops::DerefMut;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
@@ -55,9 +56,7 @@ use crate::{
|
||||
vector::{vector32, vector64, vector_distance_cos, vector_distance_l2, vector_extract},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
info, BufferPool, MvCursor, OpenFlags, RefValue, Row, StepResult, TransactionState, IO,
|
||||
};
|
||||
use crate::{info, BufferPool, MvCursor, OpenFlags, RefValue, Row, StepResult, TransactionState};
|
||||
|
||||
use super::{
|
||||
insn::{Cookie, RegisterOrLiteral},
|
||||
@@ -5936,7 +5935,7 @@ pub fn op_open_ephemeral(
|
||||
OpOpenEphemeralState::Start => {
|
||||
tracing::trace!("Start");
|
||||
let conn = program.connection.clone();
|
||||
let io = conn.pager.io.get_memory_io();
|
||||
let io = conn.pager.borrow().io.get_memory_io();
|
||||
|
||||
let file = io.open_file("", OpenFlags::Create, true)?;
|
||||
let db_file = Arc::new(FileMemoryStorage::new(file));
|
||||
|
||||
Reference in New Issue
Block a user