Read page 1 from pager always, no separate db_header

This commit is contained in:
Jussi Saurio
2025-06-13 13:09:09 +03:00
committed by Diego Reis
parent f01516378b
commit cc2e14b11c
12 changed files with 356 additions and 363 deletions

View File

@@ -34,8 +34,9 @@ mod numeric;
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
use crate::storage::wal::DummyWAL;
use crate::translate::optimizer::optimize_plan;
use crate::vtab::VirtualTable;
use crate::{fast_lock::SpinLock, translate::optimizer::optimize_plan};
use core::str;
pub use error::LimboError;
use fallible_iterator::FallibleIterator;
@@ -59,7 +60,7 @@ use std::{
num::NonZero,
ops::Deref,
rc::Rc,
sync::{Arc, OnceLock},
sync::Arc,
};
use storage::btree::{btree_init_page, BTreePageInner};
#[cfg(feature = "fs")]
@@ -86,7 +87,6 @@ use vdbe::builder::QueryMode;
use vdbe::builder::TableRefIdCounter;
pub type Result<T, E = LimboError> = std::result::Result<T, E>;
pub static DATABASE_VERSION: OnceLock<String> = OnceLock::new();
#[derive(Clone, Copy, PartialEq, Eq)]
enum TransactionState {
@@ -102,15 +102,13 @@ pub(crate) type MvCursor = mvcc::cursor::ScanCursor<mvcc::LocalClock>;
pub struct Database {
mv_store: Option<Rc<MvStore>>,
schema: Arc<RwLock<Schema>>,
// TODO: make header work without lock
header: Arc<SpinLock<DatabaseHeader>>,
db_file: Arc<dyn DatabaseStorage>,
path: String,
io: Arc<dyn IO>,
page_size: u32,
// Shared structures of a Database are the parts that are common to multiple threads that might
// create DB connections.
_shared_page_cache: Arc<RwLock<DumbLruPageCache>>,
shared_wal: Arc<UnsafeCell<WalFileShared>>,
maybe_shared_wal: Option<Arc<UnsafeCell<WalFileShared>>>,
open_flags: OpenFlags,
}
@@ -154,18 +152,8 @@ impl Database {
flags: OpenFlags,
enable_mvcc: bool,
) -> Result<Arc<Database>> {
let db_header = Pager::begin_open(db_file.clone())?;
// ensure db header is there
io.run_once()?;
let page_size = db_header.lock().get_page_size();
let wal_path = format!("{}-wal", path);
let shared_wal = WalFileShared::open_shared(&io, wal_path.as_str(), page_size)?;
DATABASE_VERSION.get_or_init(|| {
let version = db_header.lock().version_number;
version.to_string()
});
let maybe_shared_wal = WalFileShared::open_shared_if_exists(&io, wal_path.as_str())?;
let mv_store = if enable_mvcc {
Some(Rc::new(MvStore::new(
@@ -180,13 +168,12 @@ impl Database {
let schema = Arc::new(RwLock::new(Schema::new()));
let db = Database {
mv_store,
path: path.to_string(),
schema: schema.clone(),
header: db_header.clone(),
_shared_page_cache: shared_page_cache.clone(),
shared_wal: shared_wal.clone(),
maybe_shared_wal,
db_file,
io: io.clone(),
page_size,
open_flags: flags,
};
let db = Arc::new(db);
@@ -210,38 +197,76 @@ impl Database {
}
pub fn connect(self: &Arc<Database>) -> Result<Arc<Connection>> {
let buffer_pool = Rc::new(BufferPool::new(self.page_size as usize));
let buffer_pool = Rc::new(BufferPool::new(None));
let conn = if let Some(shared_wal) = self.maybe_shared_wal.clone() {
let wal = Rc::new(RefCell::new(WalFile::new(
self.io.clone(),
shared_wal,
buffer_pool.clone(),
)));
let pager = Rc::new(Pager::new(
self.db_file.clone(),
wal,
self.io.clone(),
Arc::new(RwLock::new(DumbLruPageCache::default())),
buffer_pool,
)?);
let header = pager.db_header()?;
pager
.buffer_pool
.set_page_size(header.get_page_size() as usize);
Arc::new(Connection {
_db: self.clone(),
pager: pager.clone(),
schema: self.schema.clone(),
last_insert_rowid: Cell::new(0),
auto_commit: Cell::new(true),
mv_transactions: RefCell::new(Vec::new()),
transaction_state: Cell::new(TransactionState::None),
last_change: Cell::new(0),
syms: RefCell::new(SymbolTable::new()),
total_changes: Cell::new(0),
_shared_cache: false,
cache_size: Cell::new(header.default_page_cache_size),
})
} else {
let dummy_wal = Rc::new(RefCell::new(DummyWAL {}));
let mut pager = Pager::new(
self.db_file.clone(),
dummy_wal,
self.io.clone(),
Arc::new(RwLock::new(DumbLruPageCache::default())),
buffer_pool.clone(),
)?;
let header = pager.db_header()?;
let wal_path = format!("{}-wal", self.path);
let file = self.io.open_file(&wal_path, OpenFlags::Create, false)?;
let real_shared_wal =
WalFileShared::new_shared(header.get_page_size(), &self.io, file)?;
let wal = Rc::new(RefCell::new(WalFile::new(
self.io.clone(),
real_shared_wal,
buffer_pool,
)));
pager.set_wal(wal);
Arc::new(Connection {
_db: self.clone(),
pager: Rc::new(pager),
schema: self.schema.clone(),
auto_commit: Cell::new(true),
mv_transactions: RefCell::new(Vec::new()),
transaction_state: Cell::new(TransactionState::None),
last_insert_rowid: Cell::new(0),
last_change: Cell::new(0),
total_changes: Cell::new(0),
syms: RefCell::new(SymbolTable::new()),
_shared_cache: false,
cache_size: Cell::new(header.default_page_cache_size),
})
};
let wal = Rc::new(RefCell::new(WalFile::new(
self.io.clone(),
self.page_size,
self.shared_wal.clone(),
buffer_pool.clone(),
)));
// For now let's open database without shared cache by default.
let pager = Rc::new(Pager::finish_open(
self.header.clone(),
self.db_file.clone(),
wal,
self.io.clone(),
Arc::new(RwLock::new(DumbLruPageCache::default())),
buffer_pool,
)?);
let conn = Arc::new(Connection {
_db: self.clone(),
pager: pager.clone(),
schema: self.schema.clone(),
header: self.header.clone(),
last_insert_rowid: Cell::new(0),
auto_commit: Cell::new(true),
mv_transactions: RefCell::new(Vec::new()),
transaction_state: Cell::new(TransactionState::None),
last_change: Cell::new(0),
syms: RefCell::new(SymbolTable::default()),
total_changes: Cell::new(0),
_shared_cache: false,
cache_size: Cell::new(self.header.lock().default_page_cache_size),
});
if let Err(e) = conn.register_builtins() {
return Err(LimboError::ExtensionError(e));
}
@@ -280,7 +305,7 @@ pub fn maybe_init_database_file(file: &Arc<dyn File>, io: &Arc<dyn IO>) -> Resul
let db_header = DatabaseHeader::default();
let page1 = allocate_page(
1,
&Rc::new(BufferPool::new(db_header.get_page_size() as usize)),
&Rc::new(BufferPool::new(Some(db_header.get_page_size() as usize))),
DATABASE_HEADER_SIZE,
);
let page1 = Arc::new(BTreePageInner {
@@ -331,7 +356,6 @@ pub struct Connection {
_db: Arc<Database>,
pager: Rc<Pager>,
schema: Arc<RwLock<Schema>>,
header: Arc<SpinLock<DatabaseHeader>>,
auto_commit: Cell<bool>,
mv_transactions: RefCell<Vec<crate::mvcc::database::TxID>>,
transaction_state: Cell<TransactionState>,
@@ -370,7 +394,6 @@ impl Connection {
.ok_or(LimboError::SchemaLocked)?
.deref(),
stmt,
self.header.clone(),
self.pager.clone(),
self.clone(),
&syms,
@@ -419,7 +442,6 @@ impl Connection {
.ok_or(LimboError::SchemaLocked)?
.deref(),
stmt.clone(),
self.header.clone(),
self.pager.clone(),
self.clone(),
&syms,
@@ -489,7 +511,6 @@ impl Connection {
.ok_or(LimboError::SchemaLocked)?
.deref(),
stmt,
self.header.clone(),
self.pager.clone(),
self.clone(),
&syms,
@@ -506,7 +527,6 @@ impl Connection {
.ok_or(LimboError::SchemaLocked)?
.deref(),
stmt,
self.header.clone(),
self.pager.clone(),
self.clone(),
&syms,
@@ -843,6 +863,13 @@ pub fn resolve_ext_path(extpath: &str) -> Result<std::path::PathBuf> {
}
impl SymbolTable {
pub fn new() -> Self {
Self {
functions: HashMap::new(),
vtabs: HashMap::new(),
vtab_modules: HashMap::new(),
}
}
pub fn resolve_function(
&self,
name: &str,

View File

@@ -3026,7 +3026,7 @@ impl BTreeCursor {
assert_eq!(left_pointer, page.get().get().id as u32);
// FIXME: remove this lock
assert!(
left_pointer <= self.pager.db_header.lock().database_size,
left_pointer <= self.pager.db_header()?.database_size,
"invalid page number divider left pointer {} > database number of pages",
left_pointer,
);
@@ -4660,7 +4660,7 @@ impl BTreeCursor {
}
OverflowState::ProcessPage { next_page } => {
if next_page < 2
|| next_page as usize > self.pager.db_header.lock().database_size as usize
|| next_page as usize > self.pager.db_header()?.database_size as usize
{
self.overflow_state = None;
return Err(LimboError::Corrupt("Invalid overflow page number".into()));
@@ -6516,17 +6516,11 @@ mod tests {
use super::*;
use crate::{
fast_lock::SpinLock,
io::{Buffer, Completion, MemoryIO, OpenFlags, IO},
storage::{
database::DatabaseFile,
page_cache::DumbLruPageCache,
sqlite3_ondisk::{self, DatabaseHeader},
},
storage::{database::DatabaseFile, page_cache::DumbLruPageCache},
types::Text,
vdbe::Register,
BufferPool, Connection, DatabaseStorage, StepResult, WalFile, WalFileShared,
WriteCompletion,
BufferPool, Connection, StepResult, WalFile, WalFileShared, WriteCompletion,
};
use std::{
cell::RefCell, collections::HashSet, mem::transmute, ops::Deref, panic, rc::Rc, sync::Arc,
@@ -6860,32 +6854,30 @@ mod tests {
}
fn empty_btree() -> (Rc<Pager>, usize) {
let db_header = DatabaseHeader::default();
let page_size = db_header.get_page_size();
let page_size = 4096;
#[allow(clippy::arc_with_non_send_sync)]
let io: Arc<dyn IO> = Arc::new(MemoryIO::new());
let io_file = io.open_file("test.db", OpenFlags::Create, false).unwrap();
let db_file = Arc::new(DatabaseFile::new(io_file));
let wal_file = io.open_file("test.wal", OpenFlags::Create, false).unwrap();
let buffer_pool = Rc::new(BufferPool::new(page_size as usize));
let wal_shared = WalFileShared::open_shared(&io, "test.wal", page_size).unwrap();
let wal_file = WalFile::new(io.clone(), page_size, wal_shared, buffer_pool.clone());
let buffer_pool = Rc::new(BufferPool::new(Some(page_size as usize)));
let wal_shared = WalFileShared::new_shared(page_size, &io, wal_file).unwrap();
let wal_file = WalFile::new(io.clone(), wal_shared, buffer_pool.clone());
let wal = Rc::new(RefCell::new(wal_file));
let page_cache = Arc::new(parking_lot::RwLock::new(DumbLruPageCache::new(2000)));
let pager = {
let db_header = Arc::new(SpinLock::new(db_header.clone()));
Pager::finish_open(db_header, db_file, wal, io, page_cache, buffer_pool).unwrap()
};
let pager = { Pager::new(db_file, wal, io, page_cache, buffer_pool).unwrap() };
let pager = Rc::new(pager);
// FIXME: handle page cache is full
let page1 = pager.allocate_page().unwrap();
let page1 = Arc::new(BTreePageInner {
page: RefCell::new(page1),
pager.allocate_page1().unwrap();
let page2 = pager.allocate_page().unwrap();
let page2 = Arc::new(BTreePageInner {
page: RefCell::new(page2),
});
btree_init_page(&page1, PageType::TableLeaf, 0, 4096);
(pager, page1.get().get().id)
btree_init_page(&page2, PageType::TableLeaf, 0, 4096);
(pager, page2.get().get().id)
}
#[test]
@@ -7367,14 +7359,10 @@ mod tests {
}
#[allow(clippy::arc_with_non_send_sync)]
fn setup_test_env(database_size: u32) -> (Rc<Pager>, Arc<SpinLock<DatabaseHeader>>) {
fn setup_test_env(database_size: u32) -> Rc<Pager> {
let page_size = 512;
let mut db_header = DatabaseHeader::default();
db_header.update_page_size(page_size);
db_header.database_size = database_size;
let db_header = Arc::new(SpinLock::new(db_header));
let buffer_pool = Rc::new(BufferPool::new(10));
let buffer_pool = Rc::new(BufferPool::new(Some(page_size as usize)));
// Initialize buffer pool with correctly sized buffers
for _ in 0..10 {
@@ -7387,29 +7375,16 @@ mod tests {
io.open_file("test.db", OpenFlags::Create, false).unwrap(),
));
let drop_fn = Rc::new(|_buf| {});
let buf = Arc::new(RefCell::new(Buffer::allocate(page_size as usize, drop_fn)));
{
let mut buf_mut = buf.borrow_mut();
let buf_slice = buf_mut.as_mut_slice();
sqlite3_ondisk::write_header_to_buf(buf_slice, &db_header.lock());
}
let write_complete = Box::new(|_| {});
let c = Completion::Write(WriteCompletion::new(write_complete));
db_file.write_page(1, buf.clone(), Arc::new(c)).unwrap();
let wal_shared = WalFileShared::open_shared(&io, "test.wal", page_size).unwrap();
let wal_file = io.open_file("test.wal", OpenFlags::Create, false).unwrap();
let wal_shared = WalFileShared::new_shared(page_size, &io, wal_file).unwrap();
let wal = Rc::new(RefCell::new(WalFile::new(
io.clone(),
page_size,
wal_shared,
buffer_pool.clone(),
)));
let pager = Rc::new(
Pager::finish_open(
db_header.clone(),
Pager::new(
db_file,
wal,
io,
@@ -7421,13 +7396,31 @@ mod tests {
pager.io.run_once().unwrap();
(pager, db_header)
pager.allocate_page1().unwrap();
for _ in 0..(database_size - 1) {
pager.allocate_page().unwrap();
}
let mut db_header = pager.db_header().unwrap();
db_header.page_size = page_size as u16;
let page1 = pager.read_page(1).unwrap();
while page1.is_locked() {
pager.io.run_once().unwrap();
}
page1.set_dirty();
let page1 = page1.get();
let contents = page1.contents.as_mut().unwrap();
contents.write_database_header(&db_header);
pager.add_dirty(page1.id);
pager
}
#[test]
#[ignore]
pub fn test_clear_overflow_pages() -> Result<()> {
let (pager, db_header) = setup_test_env(5);
let pager = setup_test_env(5);
let mut cursor = BTreeCursor::new_table(None, pager.clone(), 1);
let max_local = payload_overflow_threshold_max(PageType::TableLeaf, 4096);
@@ -7442,7 +7435,7 @@ mod tests {
let drop_fn = Rc::new(|_buf| {});
#[allow(clippy::arc_with_non_send_sync)]
let buf = Arc::new(RefCell::new(Buffer::allocate(
db_header.lock().get_page_size() as usize,
pager.db_header().unwrap().get_page_size() as usize,
drop_fn,
)));
let write_complete = Box::new(|_| {});
@@ -7485,20 +7478,20 @@ mod tests {
payload_size: large_payload.len() as u64,
});
let initial_freelist_pages = db_header.lock().freelist_pages;
let initial_freelist_pages = pager.db_header().unwrap().freelist_pages;
// Clear overflow pages
let clear_result = cursor.clear_overflow_pages(&leaf_cell)?;
match clear_result {
CursorResult::Ok(_) => {
// Verify proper number of pages were added to freelist
assert_eq!(
db_header.lock().freelist_pages,
pager.db_header().unwrap().freelist_pages,
initial_freelist_pages + 3,
"Expected 3 pages to be added to freelist"
);
// If this is first trunk page
let trunk_page_id = db_header.lock().freelist_trunk_page;
let trunk_page_id = pager.db_header().unwrap().freelist_trunk_page;
if trunk_page_id > 0 {
// Verify trunk page structure
let trunk_page = cursor.read_page(trunk_page_id as usize)?;
@@ -7528,7 +7521,7 @@ mod tests {
#[test]
pub fn test_clear_overflow_pages_no_overflow() -> Result<()> {
let (pager, db_header) = setup_test_env(5);
let pager = setup_test_env(5);
let mut cursor = BTreeCursor::new_table(None, pager.clone(), 1);
let small_payload = vec![b'A'; 10];
@@ -7541,7 +7534,7 @@ mod tests {
payload_size: small_payload.len() as u64,
});
let initial_freelist_pages = db_header.lock().freelist_pages;
let initial_freelist_pages = pager.db_header().unwrap().freelist_pages;
// Try to clear non-existent overflow pages
let clear_result = cursor.clear_overflow_pages(&leaf_cell)?;
@@ -7549,14 +7542,14 @@ mod tests {
CursorResult::Ok(_) => {
// Verify freelist was not modified
assert_eq!(
db_header.lock().freelist_pages,
pager.db_header().unwrap().freelist_pages,
initial_freelist_pages,
"Freelist should not change when no overflow pages exist"
);
// Verify trunk page wasn't created
assert_eq!(
db_header.lock().freelist_trunk_page,
pager.db_header().unwrap().freelist_trunk_page,
0,
"No trunk page should be created when no overflow pages exist"
);
@@ -7571,26 +7564,15 @@ mod tests {
#[test]
fn test_btree_destroy() -> Result<()> {
let initial_size = 3;
let (pager, db_header) = setup_test_env(initial_size);
let initial_size = 1;
let pager = setup_test_env(initial_size);
let mut cursor = BTreeCursor::new_table(None, pager.clone(), 2);
assert_eq!(
db_header.lock().database_size,
initial_size,
"Database should initially have 3 pages"
);
// Initialize page 2 as a root page (interior)
let root_page = cursor.read_page(2)?;
{
btree_init_page(&root_page, PageType::TableInterior, 0, 512); // Use proper page size
}
let root_page = cursor.allocate_page(PageType::TableInterior, 0);
// Allocate two leaf pages
// FIXME: handle page cache is full
let page3 = cursor.allocate_page(PageType::TableLeaf, 0);
// FIXME: handle page cache is full
let page4 = cursor.allocate_page(PageType::TableLeaf, 0);
// Configure the root page to point to the two leaf pages
@@ -7637,18 +7619,18 @@ mod tests {
// Verify structure before destruction
assert_eq!(
db_header.lock().database_size,
5, // We should have pages 0-4
pager.db_header().unwrap().database_size,
4, // We should have pages 1-4
"Database should have 4 pages total"
);
// Track freelist state before destruction
let initial_free_pages = db_header.lock().freelist_pages;
let initial_free_pages = pager.db_header().unwrap().freelist_pages;
assert_eq!(initial_free_pages, 0, "should start with no free pages");
run_until_done(|| cursor.btree_destroy(), pager.deref())?;
let pages_freed = db_header.lock().freelist_pages - initial_free_pages;
let pages_freed = pager.db_header().unwrap().freelist_pages - initial_free_pages;
assert_eq!(pages_freed, 3, "should free 3 pages (root + 2 leaves)");
Ok(())

View File

@@ -1,26 +1,32 @@
use crate::io::BufferData;
use std::cell::RefCell;
use std::cell::{Cell, RefCell};
use std::pin::Pin;
pub struct BufferPool {
pub free_buffers: RefCell<Vec<BufferData>>,
page_size: usize,
page_size: Cell<usize>,
}
const DEFAULT_PAGE_SIZE: usize = 4096;
impl BufferPool {
pub fn new(page_size: usize) -> Self {
pub fn new(page_size: Option<usize>) -> Self {
Self {
free_buffers: RefCell::new(Vec::new()),
page_size,
page_size: Cell::new(page_size.unwrap_or(DEFAULT_PAGE_SIZE)),
}
}
pub fn set_page_size(&self, page_size: usize) {
self.page_size.set(page_size);
}
pub fn get(&self) -> BufferData {
let mut free_buffers = self.free_buffers.borrow_mut();
if let Some(buffer) = free_buffers.pop() {
buffer
} else {
Pin::new(vec![0; self.page_size])
Pin::new(vec![0; self.page_size.get()])
}
}

View File

@@ -102,7 +102,8 @@ impl DumbLruPageCache {
if let Some(existing_page_ref) = self.get(&key) {
assert!(
Arc::ptr_eq(&value, &existing_page_ref),
"Attempted to insert different page with same key"
"Attempted to insert different page with same key: {:?}",
key
);
return Err(CacheError::KeyExists);
}

View File

@@ -1,22 +1,21 @@
use crate::fast_lock::SpinLock;
use crate::result::LimboResult;
use crate::storage::btree::BTreePageInner;
use crate::storage::buffer_pool::BufferPool;
use crate::storage::database::DatabaseStorage;
use crate::storage::sqlite3_ondisk::{
self, DatabaseHeader, PageContent, PageType, DATABASE_HEADER_PAGE_ID,
self, DatabaseHeader, PageContent, PageType, DATABASE_HEADER_PAGE_ID, DEFAULT_CACHE_SIZE,
};
use crate::storage::wal::{CheckpointResult, Wal, WalFsyncStatus};
use crate::types::CursorResult;
use crate::Completion;
use crate::{Buffer, LimboError, Result};
use crate::{Completion, WalFile};
use parking_lot::RwLock;
use std::cell::{RefCell, UnsafeCell};
use std::collections::HashSet;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tracing::trace;
use tracing::{trace, Level};
use super::btree::BTreePage;
use super::page_cache::{CacheError, CacheResizeResult, DumbLruPageCache, PageCacheKey};
@@ -203,11 +202,10 @@ pub struct Pager {
/// A page cache for the database.
page_cache: Arc<RwLock<DumbLruPageCache>>,
/// Buffer pool for temporary data storage.
buffer_pool: Rc<BufferPool>,
pub buffer_pool: Rc<BufferPool>,
/// I/O interface for input/output operations.
pub io: Arc<dyn crate::io::IO>,
dirty_pages: Rc<RefCell<HashSet<usize>>>,
pub db_header: Arc<SpinLock<DatabaseHeader>>,
flush_info: RefCell<FlushInfo>,
checkpoint_state: RefCell<CheckpointState>,
@@ -235,14 +233,7 @@ pub enum PagerCacheflushResult {
}
impl Pager {
/// Begins opening a database by reading the database header.
pub fn begin_open(db_file: Arc<dyn DatabaseStorage>) -> Result<Arc<SpinLock<DatabaseHeader>>> {
sqlite3_ondisk::begin_read_database_header(db_file)
}
/// Completes opening a database by initializing the Pager with the database header.
pub fn finish_open(
db_header_ref: Arc<SpinLock<DatabaseHeader>>,
pub fn new(
db_file: Arc<dyn DatabaseStorage>,
wal: Rc<RefCell<dyn Wal>>,
io: Arc<dyn crate::io::IO>,
@@ -255,7 +246,6 @@ impl Pager {
page_cache,
io,
dirty_pages: Rc::new(RefCell::new(HashSet::new())),
db_header: db_header_ref.clone(),
flush_info: RefCell::new(FlushInfo {
state: FlushState::Start,
in_flight_writes: Rc::new(RefCell::new(0)),
@@ -268,6 +258,54 @@ impl Pager {
})
}
pub fn set_wal(&mut self, wal: Rc<RefCell<WalFile>>) {
self.wal = wal;
}
pub fn db_header(&self) -> Result<DatabaseHeader> {
// read page 1
let page = self.read_page(1)?;
let mut header = DatabaseHeader::default();
while !page.is_loaded() || page.is_locked() {
// FIXME: LETS STOP DOING THESE SYNCHRONOUS IO HACKS
self.io.run_once()?;
}
let page_inner = page.get();
let page_content = page_inner.contents.as_ref().unwrap();
let buf = page_content.buffer.borrow();
let buf = buf.as_slice();
header.magic.copy_from_slice(&buf[0..16]);
header.page_size = u16::from_be_bytes([buf[16], buf[17]]);
header.write_version = buf[18];
header.read_version = buf[19];
header.reserved_space = buf[20];
header.max_embed_frac = buf[21];
header.min_embed_frac = buf[22];
header.min_leaf_frac = buf[23];
header.change_counter = u32::from_be_bytes([buf[24], buf[25], buf[26], buf[27]]);
header.database_size = u32::from_be_bytes([buf[28], buf[29], buf[30], buf[31]]);
header.freelist_trunk_page = u32::from_be_bytes([buf[32], buf[33], buf[34], buf[35]]);
header.freelist_pages = u32::from_be_bytes([buf[36], buf[37], buf[38], buf[39]]);
header.schema_cookie = u32::from_be_bytes([buf[40], buf[41], buf[42], buf[43]]);
header.schema_format = u32::from_be_bytes([buf[44], buf[45], buf[46], buf[47]]);
header.default_page_cache_size = i32::from_be_bytes([buf[48], buf[49], buf[50], buf[51]]);
if header.default_page_cache_size == 0 {
header.default_page_cache_size = DEFAULT_CACHE_SIZE;
}
header.vacuum_mode_largest_root_page =
u32::from_be_bytes([buf[52], buf[53], buf[54], buf[55]]);
header.text_encoding = u32::from_be_bytes([buf[56], buf[57], buf[58], buf[59]]);
header.user_version = i32::from_be_bytes([buf[60], buf[61], buf[62], buf[63]]);
header.incremental_vacuum_enabled =
u32::from_be_bytes([buf[64], buf[65], buf[66], buf[67]]);
header.application_id = u32::from_be_bytes([buf[68], buf[69], buf[70], buf[71]]);
header.reserved_for_expansion.copy_from_slice(&buf[72..92]);
header.version_valid_for = u32::from_be_bytes([buf[92], buf[93], buf[94], buf[95]]);
header.version_number = u32::from_be_bytes([buf[96], buf[97], buf[98], buf[99]]);
Ok(header)
}
pub fn get_auto_vacuum_mode(&self) -> AutoVacuumMode {
*self.auto_vacuum_mode.borrow()
}
@@ -282,7 +320,7 @@ impl Pager {
#[cfg(not(feature = "omit_autovacuum"))]
pub fn ptrmap_get(&self, target_page_num: u32) -> Result<CursorResult<Option<PtrmapEntry>>> {
tracing::trace!("ptrmap_get(page_idx = {})", target_page_num);
let configured_page_size = self.db_header.lock().get_page_size() as usize;
let configured_page_size = self.db_header()?.get_page_size() as usize;
if target_page_num < FIRST_PTRMAP_PAGE_NO
|| is_ptrmap_page(target_page_num, configured_page_size)
@@ -368,7 +406,7 @@ impl Pager {
parent_page_no
);
let page_size = self.db_header.lock().get_page_size() as usize;
let page_size = self.db_header()?.get_page_size() as usize;
if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO
|| is_ptrmap_page(db_page_no_to_update, page_size)
@@ -463,15 +501,13 @@ impl Pager {
Ok(CursorResult::Ok(page_id as u32))
}
AutoVacuumMode::Full => {
let mut root_page_num = self.db_header.lock().vacuum_mode_largest_root_page;
let mut root_page_num = self.db_header()?.vacuum_mode_largest_root_page;
assert!(root_page_num > 0); // Largest root page number cannot be 0 because that is set to 1 when creating the database with autovacuum enabled
root_page_num += 1;
assert!(root_page_num >= FIRST_PTRMAP_PAGE_NO); // can never be less than 2 because we have already incremented
while is_ptrmap_page(
root_page_num,
self.db_header.lock().get_page_size() as usize,
) {
while is_ptrmap_page(root_page_num, self.db_header()?.get_page_size() as usize)
{
root_page_num += 1;
}
assert!(root_page_num >= 3); // the very first root page is page 3
@@ -544,7 +580,7 @@ impl Pager {
/// The usable size of a page might be an odd number. However, the usable size is not allowed to be less than 480.
/// In other words, if the page size is 512, then the reserved space size cannot exceed 32.
pub fn usable_space(&self) -> usize {
let db_header = self.db_header.lock();
let db_header = self.db_header().unwrap();
(db_header.get_page_size() - db_header.reserved_space as u32) as usize
}
@@ -576,6 +612,7 @@ impl Pager {
}
/// Reads a page from the database.
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn read_page(&self, page_idx: usize) -> Result<PageRef, LimboError> {
tracing::trace!("read_page(page_idx = {})", page_idx);
let mut page_cache = self.page_cache.write();
@@ -677,7 +714,7 @@ impl Pager {
trace!("cacheflush {:?}", state);
match state {
FlushState::Start => {
let db_size = self.db_header.lock().database_size;
let db_size = self.db_header()?.database_size;
for page_id in self.dirty_pages.borrow().iter() {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(*page_id);
@@ -877,7 +914,7 @@ impl Pager {
const TRUNK_PAGE_NEXT_PAGE_OFFSET: usize = 0; // Offset to next trunk page pointer
const TRUNK_PAGE_LEAF_COUNT_OFFSET: usize = 4; // Offset to leaf count
if page_id < 2 || page_id > self.db_header.lock().database_size as usize {
if page_id < 2 || page_id > self.db_header()?.database_size as usize {
return Err(LimboError::Corrupt(format!(
"Invalid page number {} for free operation",
page_id
@@ -892,9 +929,11 @@ impl Pager {
None => self.read_page(page_id)?,
};
self.db_header.lock().freelist_pages += 1;
let mut header = self.db_header()?;
header.freelist_pages += 1;
self.write_database_header(&header)?;
let trunk_page_id = self.db_header.lock().freelist_trunk_page;
let trunk_page_id = header.freelist_trunk_page;
if trunk_page_id != 0 {
// Add as leaf to current trunk
@@ -932,13 +971,33 @@ impl Pager {
// Zero leaf count
contents.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, 0);
// Update page 1 to point to new trunk
self.db_header.lock().freelist_trunk_page = page_id as u32;
let mut header = self.db_header()?;
header.freelist_trunk_page = page_id as u32;
self.write_database_header(&header)?;
// Clear flags
page.clear_uptodate();
page.clear_loaded();
Ok(())
}
pub fn allocate_page1(&self) -> Result<PageRef> {
let default_header = DatabaseHeader::default();
let page = allocate_page(1, &self.buffer_pool, 0);
page.set_dirty();
self.add_dirty(page.get().id);
page.get()
.contents
.as_mut()
.unwrap()
.write_database_header(&default_header);
let page_key = PageCacheKey::new(page.get().id);
let mut cache = self.page_cache.write();
cache.insert(page_key, page.clone()).map_err(|e| {
LimboError::InternalError(format!("Failed to insert page 1 into cache: {:?}", e))
})?;
Ok(page)
}
/*
Gets a new page that increasing the size of the page or uses a free page.
Currently free list pages are not yet supported.
@@ -946,10 +1005,11 @@ impl Pager {
// FIXME: handle no room in page cache
#[allow(clippy::readonly_write_lock)]
pub fn allocate_page(&self) -> Result<PageRef> {
let header = &self.db_header;
let mut header = header.lock();
let mut header = self.db_header()?;
header.database_size += 1;
tracing::debug!("allocate_page(database_size={})", header.database_size);
#[cfg(not(feature = "omit_autovacuum"))]
{
// If the following conditions are met, allocate a pointer map page, add to cache and increment the database size
@@ -1022,7 +1082,7 @@ impl Pager {
}
pub fn usable_size(&self) -> usize {
let db_header = self.db_header.lock();
let db_header = self.db_header().unwrap();
(db_header.get_page_size() - db_header.reserved_space as u32) as usize
}
}
@@ -1307,20 +1367,24 @@ mod ptrmap_tests {
db_header_arc.lock().vacuum_mode_largest_root_page = 1;
// Construct interfaces for the pager
let buffer_pool = Rc::new(BufferPool::new(page_size as usize));
let buffer_pool = Rc::new(BufferPool::new(Some(page_size as usize)));
let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(
(initial_db_pages + 10) as usize,
)));
let wal = Rc::new(RefCell::new(WalFile::new(
io.clone(),
page_size,
WalFileShared::open_shared(&io, "test.db-wal", page_size).unwrap(),
WalFileShared::new_shared(
page_size,
&io,
io.open_file("test.db-wal", OpenFlags::Create, false)
.unwrap(),
)
.unwrap(),
buffer_pool.clone(),
)));
let pager = Pager::finish_open(db_header_arc, db_storage, wal, io, page_cache, buffer_pool)
.unwrap();
let pager = Pager::new(db_storage, wal, io, page_cache, buffer_pool).unwrap();
pager.set_auto_vacuum_mode(AutoVacuumMode::Full);
// Allocate all the pages as btree root pages
@@ -1356,7 +1420,10 @@ mod ptrmap_tests {
assert!(ptrmap_page_ref.is_ok());
// Ensure that the database header size is correctly reflected
assert_eq!(pager.db_header.lock().database_size, initial_db_pages + 2); // (1+1) -> (header + ptrmap)
assert_eq!(
pager.db_header().unwrap().database_size,
initial_db_pages + 2
); // (1+1) -> (header + ptrmap)
// Read the entry from the ptrmap page and verify it
let entry = pager.ptrmap_get(db_page_to_update).unwrap();

View File

@@ -70,7 +70,7 @@ use super::wal::LimboRwLock;
pub const DATABASE_HEADER_SIZE: usize = 100;
// DEFAULT_CACHE_SIZE negative values mean that we store the amount of pages a XKiB of memory can hold.
// We can calculate "real" cache size by diving by page size.
const DEFAULT_CACHE_SIZE: i32 = -2000;
pub const DEFAULT_CACHE_SIZE: i32 = -2000;
// Minimum number of pages that cache can hold.
pub const MIN_PAGE_CACHE_SIZE: usize = 10;
@@ -93,17 +93,17 @@ pub const DATABASE_HEADER_PAGE_ID: usize = 1;
#[derive(Debug, Clone)]
pub struct DatabaseHeader {
/// The header string: "SQLite format 3\0"
magic: [u8; 16],
pub magic: [u8; 16],
/// The database page size in bytes. Must be a power of two between 512 and 32768 inclusive,
/// or the value 1 representing a page size of 65536.
page_size: u16,
pub page_size: u16,
/// File format write version. 1 for legacy; 2 for WAL.
write_version: u8,
pub write_version: u8,
/// File format read version. 1 for legacy; 2 for WAL.
read_version: u8,
pub read_version: u8,
/// Bytes of unused "reserved" space at the end of each page. Usually 0.
/// SQLite has the ability to set aside a small number of extra bytes at the end of every page for use by extensions.
@@ -112,16 +112,16 @@ pub struct DatabaseHeader {
pub reserved_space: u8,
/// Maximum embedded payload fraction. Must be 64.
max_embed_frac: u8,
pub max_embed_frac: u8,
/// Minimum embedded payload fraction. Must be 32.
min_embed_frac: u8,
pub min_embed_frac: u8,
/// Leaf payload fraction. Must be 32.
min_leaf_frac: u8,
pub min_leaf_frac: u8,
/// File change counter, incremented when database is modified.
change_counter: u32,
pub change_counter: u32,
/// Size of the database file in pages. The "in-header database size".
pub database_size: u32,
@@ -136,7 +136,7 @@ pub struct DatabaseHeader {
pub schema_cookie: u32,
/// The schema format number. Supported formats are 1, 2, 3, and 4.
schema_format: u32,
pub schema_format: u32,
/// Default page cache size.
pub default_page_cache_size: i32,
@@ -146,7 +146,7 @@ pub struct DatabaseHeader {
pub vacuum_mode_largest_root_page: u32,
/// The database text encoding. 1=UTF-8, 2=UTF-16le, 3=UTF-16be.
text_encoding: u32,
pub text_encoding: u32,
/// The "user version" as read and set by the user_version pragma.
pub user_version: i32,
@@ -155,13 +155,13 @@ pub struct DatabaseHeader {
pub incremental_vacuum_enabled: u32,
/// The "Application ID" set by PRAGMA application_id.
application_id: u32,
pub application_id: u32,
/// Reserved for expansion. Must be zero.
reserved_for_expansion: [u8; 20],
pub reserved_for_expansion: [u8; 20],
/// The version-valid-for number.
version_valid_for: u32,
pub version_valid_for: u32,
/// SQLITE_VERSION_NUMBER
pub version_number: u32,
@@ -287,60 +287,6 @@ impl DatabaseHeader {
}
}
pub fn begin_read_database_header(
db_file: Arc<dyn DatabaseStorage>,
) -> Result<Arc<SpinLock<DatabaseHeader>>> {
let drop_fn = Rc::new(|_buf| {});
#[allow(clippy::arc_with_non_send_sync)]
let buf = Arc::new(RefCell::new(Buffer::allocate(512, drop_fn)));
let result = Arc::new(SpinLock::new(DatabaseHeader::default()));
let header = result.clone();
let complete = Box::new(move |buf: Arc<RefCell<Buffer>>| {
let header = header.clone();
finish_read_database_header(buf, header).unwrap();
});
let c = Completion::Read(ReadCompletion::new(buf, complete));
#[allow(clippy::arc_with_non_send_sync)]
db_file.read_page(DATABASE_HEADER_PAGE_ID, Arc::new(c))?;
Ok(result)
}
fn finish_read_database_header(
buf: Arc<RefCell<Buffer>>,
header: Arc<SpinLock<DatabaseHeader>>,
) -> Result<()> {
let buf = buf.borrow();
let buf = buf.as_slice();
let mut header = header.lock();
header.magic.copy_from_slice(&buf[0..16]);
header.page_size = u16::from_be_bytes([buf[16], buf[17]]);
header.write_version = buf[18];
header.read_version = buf[19];
header.reserved_space = buf[20];
header.max_embed_frac = buf[21];
header.min_embed_frac = buf[22];
header.min_leaf_frac = buf[23];
header.change_counter = u32::from_be_bytes([buf[24], buf[25], buf[26], buf[27]]);
header.database_size = u32::from_be_bytes([buf[28], buf[29], buf[30], buf[31]]);
header.freelist_trunk_page = u32::from_be_bytes([buf[32], buf[33], buf[34], buf[35]]);
header.freelist_pages = u32::from_be_bytes([buf[36], buf[37], buf[38], buf[39]]);
header.schema_cookie = u32::from_be_bytes([buf[40], buf[41], buf[42], buf[43]]);
header.schema_format = u32::from_be_bytes([buf[44], buf[45], buf[46], buf[47]]);
header.default_page_cache_size = i32::from_be_bytes([buf[48], buf[49], buf[50], buf[51]]);
if header.default_page_cache_size == 0 {
header.default_page_cache_size = DEFAULT_CACHE_SIZE;
}
header.vacuum_mode_largest_root_page = u32::from_be_bytes([buf[52], buf[53], buf[54], buf[55]]);
header.text_encoding = u32::from_be_bytes([buf[56], buf[57], buf[58], buf[59]]);
header.user_version = i32::from_be_bytes([buf[60], buf[61], buf[62], buf[63]]);
header.incremental_vacuum_enabled = u32::from_be_bytes([buf[64], buf[65], buf[66], buf[67]]);
header.application_id = u32::from_be_bytes([buf[68], buf[69], buf[70], buf[71]]);
header.reserved_for_expansion.copy_from_slice(&buf[72..92]);
header.version_valid_for = u32::from_be_bytes([buf[92], buf[93], buf[94], buf[95]]);
header.version_number = u32::from_be_bytes([buf[96], buf[97], buf[98], buf[99]]);
Ok(())
}
pub fn write_header_to_buf(buf: &mut [u8], header: &DatabaseHeader) {
buf[0..16].copy_from_slice(&header.magic);
buf[16..18].copy_from_slice(&header.page_size.to_be_bytes());

View File

@@ -394,7 +394,6 @@ pub struct WalFile {
syncing: Rc<Cell<bool>>,
sync_state: Cell<SyncState>,
page_size: u32,
shared: Arc<UnsafeCell<WalFileShared>>,
ongoing_checkpoint: OngoingCheckpoint,
@@ -414,7 +413,7 @@ impl fmt::Debug for WalFile {
f.debug_struct("WalFile")
.field("syncing", &self.syncing.get())
.field("sync_state", &self.sync_state)
.field("page_size", &self.page_size)
.field("page_size", &self.page_size())
.field("shared", &self.shared)
.field("ongoing_checkpoint", &self.ongoing_checkpoint)
.field("checkpoint_threshold", &self.checkpoint_threshold)
@@ -642,7 +641,7 @@ impl Wal for WalFile {
&shared.file,
offset,
&page,
self.page_size as u16,
header.page_size as u16,
db_size,
write_counter,
&header,
@@ -876,7 +875,6 @@ impl Wal for WalFile {
impl WalFile {
pub fn new(
io: Arc<dyn IO>,
page_size: u32,
shared: Arc<UnsafeCell<WalFileShared>>,
buffer_pool: Rc<BufferPool>,
) -> Self {
@@ -894,6 +892,8 @@ impl WalFile {
}
Self {
io,
// default to max frame in WAL, so that when we read schema we can read from WAL too if it's there.
max_frame: unsafe { (*shared.get()).max_frame.load(Ordering::SeqCst) },
shared,
ongoing_checkpoint: OngoingCheckpoint {
page: checkpoint_page,
@@ -903,20 +903,21 @@ impl WalFile {
current_page: 0,
},
checkpoint_threshold: 1000,
page_size,
buffer_pool,
syncing: Rc::new(Cell::new(false)),
sync_state: Cell::new(SyncState::NotSyncing),
max_frame: 0,
min_frame: 0,
max_frame_read_lock_index: 0,
}
}
fn page_size(&self) -> u32 {
self.get_shared().wal_header.lock().page_size
}
fn frame_offset(&self, frame_id: u64) -> usize {
assert!(frame_id > 0, "Frame ID must be 1-based");
let page_size = self.page_size;
let page_offset = (frame_id - 1) * (page_size + WAL_FRAME_HEADER_SIZE as u32) as u64;
let page_offset = (frame_id - 1) * (self.page_size() + WAL_FRAME_HEADER_SIZE as u32) as u64;
let offset = WAL_HEADER_SIZE as u64 + page_offset;
offset as usize
}
@@ -928,13 +929,12 @@ impl WalFile {
}
impl WalFileShared {
pub fn open_shared(
pub fn open_shared_if_exists(
io: &Arc<dyn IO>,
path: &str,
page_size: u32,
) -> Result<Arc<UnsafeCell<WalFileShared>>> {
) -> Result<Option<Arc<UnsafeCell<WalFileShared>>>> {
let file = io.open_file(path, crate::io::OpenFlags::Create, false)?;
let header = if file.size()? > 0 {
if file.size()? > 0 {
let wal_file_shared = sqlite3_ondisk::read_entire_wal_dumb(&file)?;
// TODO: Return a completion instead.
let mut max_loops = 100_000;
@@ -948,39 +948,47 @@ impl WalFileShared {
panic!("WAL file not loaded");
}
}
return Ok(wal_file_shared);
Ok(Some(wal_file_shared))
} else {
let magic = if cfg!(target_endian = "big") {
WAL_MAGIC_BE
} else {
WAL_MAGIC_LE
};
let mut wal_header = WalHeader {
magic,
file_format: 3007000,
page_size,
checkpoint_seq: 0, // TODO implement sequence number
salt_1: io.generate_random_number() as u32,
salt_2: io.generate_random_number() as u32,
checksum_1: 0,
checksum_2: 0,
};
let native = cfg!(target_endian = "big"); // if target_endian is
// already big then we don't care but if isn't, header hasn't yet been
// encoded to big endian, therefore we want to swap bytes to compute this
// checksum.
let checksums = (0, 0);
let checksums = checksum_wal(
&wal_header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4], // first 24 bytes
&wal_header,
checksums,
native, // this is false because we haven't encoded the wal header yet
);
wal_header.checksum_1 = checksums.0;
wal_header.checksum_2 = checksums.1;
sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?;
Arc::new(SpinLock::new(wal_header))
Ok(None)
}
}
pub fn new_shared(
page_size: u32,
io: &Arc<dyn IO>,
file: Arc<dyn File>,
) -> Result<Arc<UnsafeCell<WalFileShared>>> {
let magic = if cfg!(target_endian = "big") {
WAL_MAGIC_BE
} else {
WAL_MAGIC_LE
};
let mut wal_header = WalHeader {
magic,
file_format: 3007000,
page_size,
checkpoint_seq: 0, // TODO implement sequence number
salt_1: io.generate_random_number() as u32,
salt_2: io.generate_random_number() as u32,
checksum_1: 0,
checksum_2: 0,
};
let native = cfg!(target_endian = "big"); // if target_endian is
// already big then we don't care but if isn't, header hasn't yet been
// encoded to big endian, therefore we want to swap bytes to compute this
// checksum.
let checksums = (0, 0);
let checksums = checksum_wal(
&wal_header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4], // first 24 bytes
&wal_header,
checksums,
native, // this is false because we haven't encoded the wal header yet
);
wal_header.checksum_1 = checksums.0;
wal_header.checksum_2 = checksums.1;
sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?;
let header = Arc::new(SpinLock::new(wal_header));
let checksum = {
let checksum = header.lock();
(checksum.checksum_1, checksum.checksum_2)
@@ -1008,4 +1016,8 @@ impl WalFileShared {
};
Ok(Arc::new(UnsafeCell::new(shared)))
}
pub fn page_size(&self) -> u32 {
self.wal_header.lock().page_size
}
}

View File

@@ -33,10 +33,8 @@ pub(crate) mod transaction;
pub(crate) mod update;
mod values;
use crate::fast_lock::SpinLock;
use crate::schema::Schema;
use crate::storage::pager::Pager;
use crate::storage::sqlite3_ondisk::DatabaseHeader;
use crate::translate::delete::translate_delete;
use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderOpts, QueryMode};
use crate::vdbe::Program;
@@ -58,7 +56,6 @@ use update::translate_update;
pub fn translate(
schema: &Schema,
stmt: ast::Stmt,
database_header: Arc<SpinLock<DatabaseHeader>>,
pager: Rc<Pager>,
connection: Arc<Connection>,
syms: &SymbolTable,
@@ -90,7 +87,6 @@ pub fn translate(
schema,
&name,
body.map(|b| *b),
database_header.clone(),
pager,
connection.clone(),
program,
@@ -100,7 +96,7 @@ pub fn translate(
// TODO: bring epilogue here when I can sort out what instructions correspond to a Write or a Read transaction
Ok(program.build(database_header, connection, change_cnt_on))
Ok(program.build(connection, change_cnt_on))
}
// TODO: for now leaving the return value as a Program. But ideally to support nested parsing of arbitraty

View File

@@ -6,10 +6,9 @@ use limbo_sqlite3_parser::ast::{self, Expr};
use std::rc::Rc;
use std::sync::Arc;
use crate::fast_lock::SpinLock;
use crate::schema::Schema;
use crate::storage::pager::AutoVacuumMode;
use crate::storage::sqlite3_ondisk::{DatabaseHeader, MIN_PAGE_CACHE_SIZE};
use crate::storage::sqlite3_ondisk::MIN_PAGE_CACHE_SIZE;
use crate::storage::wal::CheckpointMode;
use crate::util::{normalize_ident, parse_signed_number};
use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderOpts, QueryMode};
@@ -35,7 +34,6 @@ pub fn translate_pragma(
schema: &Schema,
name: &ast::QualifiedName,
body: Option<ast::PragmaBody>,
database_header: Arc<SpinLock<DatabaseHeader>>,
pager: Rc<Pager>,
connection: Arc<crate::Connection>,
mut program: ProgramBuilder,
@@ -61,39 +59,15 @@ pub fn translate_pragma(
match body {
None => {
query_pragma(
pragma,
schema,
None,
database_header.clone(),
pager,
connection,
&mut program,
)?;
query_pragma(pragma, schema, None, pager, connection, &mut program)?;
}
Some(ast::PragmaBody::Equals(value) | ast::PragmaBody::Call(value)) => match pragma {
PragmaName::TableInfo => {
query_pragma(
pragma,
schema,
Some(value),
database_header.clone(),
pager,
connection,
&mut program,
)?;
query_pragma(pragma, schema, Some(value), pager, connection, &mut program)?;
}
_ => {
write = true;
update_pragma(
pragma,
schema,
value,
database_header.clone(),
pager,
connection,
&mut program,
)?;
update_pragma(pragma, schema, value, pager, connection, &mut program)?;
}
},
};
@@ -109,7 +83,6 @@ fn update_pragma(
pragma: PragmaName,
schema: &Schema,
value: ast::Expr,
header: Arc<SpinLock<DatabaseHeader>>,
pager: Rc<Pager>,
connection: Arc<crate::Connection>,
program: &mut ProgramBuilder,
@@ -121,7 +94,7 @@ fn update_pragma(
Value::Float(size) => size as i64,
_ => bail_parse_error!("Invalid value for cache size pragma"),
};
update_cache_size(cache_size, header, pager, connection)?;
update_cache_size(cache_size, pager, connection)?;
Ok(())
}
PragmaName::JournalMode => {
@@ -129,7 +102,6 @@ fn update_pragma(
PragmaName::JournalMode,
schema,
None,
header,
pager,
connection,
program,
@@ -142,7 +114,6 @@ fn update_pragma(
PragmaName::WalCheckpoint,
schema,
Some(value),
header,
pager,
connection,
program,
@@ -154,7 +125,6 @@ fn update_pragma(
PragmaName::PageCount,
schema,
None,
header,
pager,
connection,
program,
@@ -212,9 +182,9 @@ fn update_pragma(
}
};
match auto_vacuum_mode {
0 => update_auto_vacuum_mode(AutoVacuumMode::None, 0, header, pager)?,
1 => update_auto_vacuum_mode(AutoVacuumMode::Full, 1, header, pager)?,
2 => update_auto_vacuum_mode(AutoVacuumMode::Incremental, 1, header, pager)?,
0 => update_auto_vacuum_mode(AutoVacuumMode::None, 0, pager)?,
1 => update_auto_vacuum_mode(AutoVacuumMode::Full, 1, pager)?,
2 => update_auto_vacuum_mode(AutoVacuumMode::Incremental, 1, pager)?,
_ => {
return Err(LimboError::InvalidArgument(
"invalid auto vacuum mode".to_string(),
@@ -254,7 +224,6 @@ fn query_pragma(
pragma: PragmaName,
schema: &Schema,
value: Option<ast::Expr>,
database_header: Arc<SpinLock<DatabaseHeader>>,
pager: Rc<Pager>,
connection: Arc<crate::Connection>,
program: &mut ProgramBuilder,
@@ -371,7 +340,7 @@ fn query_pragma(
program.emit_result_row(register, 1);
}
PragmaName::PageSize => {
program.emit_int(database_header.lock().get_page_size().into(), register);
program.emit_int(pager.db_header()?.get_page_size().into(), register);
program.emit_result_row(register, 1);
program.add_pragma_result_column(pragma.to_string());
}
@@ -402,10 +371,9 @@ fn query_pragma(
fn update_auto_vacuum_mode(
auto_vacuum_mode: AutoVacuumMode,
largest_root_page_number: u32,
header: Arc<SpinLock<DatabaseHeader>>,
pager: Rc<Pager>,
) -> crate::Result<()> {
let mut header_guard = header.lock();
let mut header_guard = pager.db_header()?;
header_guard.vacuum_mode_largest_root_page = largest_root_page_number;
pager.set_auto_vacuum_mode(auto_vacuum_mode);
pager.write_database_header(&header_guard)?;
@@ -414,14 +382,13 @@ fn update_auto_vacuum_mode(
fn update_cache_size(
value: i64,
header: Arc<SpinLock<DatabaseHeader>>,
pager: Rc<Pager>,
connection: Arc<crate::Connection>,
) -> crate::Result<()> {
let mut cache_size_unformatted: i64 = value;
let mut cache_size = if cache_size_unformatted < 0 {
let kb = cache_size_unformatted.abs() * 1024;
let page_size = header.lock().get_page_size();
let page_size = pager.db_header()?.get_page_size();
kb / page_size as i64
} else {
value

View File

@@ -4,11 +4,9 @@ use limbo_sqlite3_parser::ast::{self, TableInternalId};
use tracing::{instrument, Level};
use crate::{
fast_lock::SpinLock,
numeric::Numeric,
parameters::Parameters,
schema::{BTreeTable, Index, PseudoTable, Table},
storage::sqlite3_ondisk::DatabaseHeader,
translate::{
collate::CollationSeq,
emitter::TransactionMode,
@@ -849,12 +847,7 @@ impl ProgramBuilder {
});
}
pub fn build(
mut self,
database_header: Arc<SpinLock<DatabaseHeader>>,
connection: Arc<Connection>,
change_cnt_on: bool,
) -> Program {
pub fn build(mut self, connection: Arc<Connection>, change_cnt_on: bool) -> Program {
self.resolve_labels();
self.parameters.list.dedup();
@@ -866,7 +859,6 @@ impl ProgramBuilder {
.map(|(insn, function, _)| (insn, function))
.collect(),
cursor_ref: self.cursor_ref,
database_header,
comments: self.comments,
connection,
parameters: self.parameters,

View File

@@ -86,7 +86,6 @@ use crate::{
use super::{get_new_rowid, make_record, Program, ProgramState, Register};
use crate::{
bail_constraint_error, must_be_btree_cursor, resolve_ext_path, MvStore, Pager, Result,
DATABASE_VERSION,
};
macro_rules! return_if_io {
@@ -3690,7 +3689,8 @@ pub fn op_function(
}
}
ScalarFunc::SqliteVersion => {
let version_integer: i64 = DATABASE_VERSION.get().unwrap().parse()?;
let header = pager.db_header()?;
let version_integer: i64 = header.version_number as i64;
let version = execute_sqlite_version(version_integer);
state.registers[*dest] = Register::Value(Value::build_text(version));
}
@@ -4895,7 +4895,7 @@ pub fn op_page_count(
// TODO: implement temp databases
todo!("temp databases not implemented yet");
}
let count = pager.db_header.lock().database_size.into();
let count = pager.db_header()?.database_size.into();
state.registers[*dest] = Register::Value(Value::Integer(count));
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
@@ -4972,11 +4972,9 @@ pub fn op_read_cookie(
todo!("temp databases not implemented yet");
}
let cookie_value = match cookie {
Cookie::UserVersion => pager.db_header.lock().user_version.into(),
Cookie::SchemaVersion => pager.db_header.lock().schema_cookie.into(),
Cookie::LargestRootPageNumber => {
pager.db_header.lock().vacuum_mode_largest_root_page.into()
}
Cookie::UserVersion => pager.db_header()?.user_version.into(),
Cookie::SchemaVersion => pager.db_header()?.schema_cookie.into(),
Cookie::LargestRootPageNumber => pager.db_header()?.vacuum_mode_largest_root_page.into(),
cookie => todo!("{cookie:?} is not yet implement for ReadCookie"),
};
state.registers[*dest] = Register::Value(Value::Integer(cookie_value));
@@ -5005,17 +5003,17 @@ pub fn op_set_cookie(
}
match cookie {
Cookie::UserVersion => {
let mut header_guard = pager.db_header.lock();
let mut header_guard = pager.db_header()?;
header_guard.user_version = *value;
pager.write_database_header(&header_guard)?;
}
Cookie::LargestRootPageNumber => {
let mut header_guard = pager.db_header.lock();
let mut header_guard = pager.db_header()?;
header_guard.vacuum_mode_largest_root_page = *value as u32;
pager.write_database_header(&header_guard)?;
}
Cookie::IncrementalVacuum => {
let mut header_guard = pager.db_header.lock();
let mut header_guard = pager.db_header()?;
header_guard.incremental_vacuum_enabled = *value as u32;
pager.write_database_header(&header_guard)?;
}
@@ -5208,19 +5206,20 @@ pub fn op_open_ephemeral(
maybe_init_database_file(&file, &(io.clone() as Arc<dyn IO>))?;
let db_file = Arc::new(FileMemoryStorage::new(file));
let db_header = Pager::begin_open(db_file.clone())?;
let buffer_pool = Rc::new(BufferPool::new(db_header.lock().get_page_size() as usize));
let buffer_pool = Rc::new(BufferPool::new(None));
let page_cache = Arc::new(RwLock::new(DumbLruPageCache::default()));
let pager = Rc::new(Pager::finish_open(
db_header,
let pager = Rc::new(Pager::new(
db_file,
Rc::new(RefCell::new(DummyWAL)),
io,
page_cache,
buffer_pool,
buffer_pool.clone(),
)?);
let header = pager.db_header()?;
buffer_pool.set_page_size(header.get_page_size() as usize);
let flag = if is_table {
&CreateBTreeFlags::new_table()
} else {

View File

@@ -26,14 +26,13 @@ pub mod sorter;
use crate::{
error::LimboError,
fast_lock::SpinLock,
function::{AggFunc, FuncCtx},
storage::{pager::PagerCacheflushStatus, sqlite3_ondisk::SmallVec},
translate::plan::TableReferences,
};
use crate::{
storage::{btree::BTreeCursor, pager::Pager, sqlite3_ondisk::DatabaseHeader},
storage::{btree::BTreeCursor, pager::Pager},
translate::plan::ResultSetColumn,
types::{AggContext, Cursor, CursorResult, ImmutableRecord, Value},
vdbe::{builder::CursorType, insn::Insn},
@@ -354,7 +353,6 @@ pub struct Program {
pub max_registers: usize,
pub insns: Vec<(Insn, InsnFunction)>,
pub cursor_ref: Vec<(Option<CursorKey>, CursorType)>,
pub database_header: Arc<SpinLock<DatabaseHeader>>,
pub comments: Option<Vec<(InsnReference, &'static str)>>,
pub parameters: crate::parameters::Parameters,
pub connection: Arc<Connection>,