mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 00:45:37 +01:00
pager: make WAL optional again and remove DummyWAL
This commit is contained in:
15
core/lib.rs
15
core/lib.rs
@@ -41,7 +41,6 @@ mod numeric;
|
||||
#[global_allocator]
|
||||
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
|
||||
|
||||
use crate::storage::wal::DummyWAL;
|
||||
use crate::translate::optimizer::optimize_plan;
|
||||
use crate::translate::pragma::TURSO_CDC_DEFAULT_TABLE_NAME;
|
||||
#[cfg(feature = "fs")]
|
||||
@@ -397,7 +396,7 @@ impl Database {
|
||||
)));
|
||||
let pager = Pager::new(
|
||||
self.db_file.clone(),
|
||||
wal,
|
||||
Some(wal),
|
||||
self.io.clone(),
|
||||
Arc::new(RwLock::new(DumbLruPageCache::default())),
|
||||
buffer_pool.clone(),
|
||||
@@ -409,12 +408,10 @@ impl Database {
|
||||
|
||||
let buffer_pool = Arc::new(BufferPool::new(page_size));
|
||||
// No existing WAL; create one.
|
||||
// TODO: currently Pager needs to be instantiated with some implementation of trait Wal, so here's a workaround.
|
||||
let dummy_wal = Rc::new(RefCell::new(DummyWAL {}));
|
||||
let db_state = self.db_state.clone();
|
||||
let mut pager = Pager::new(
|
||||
self.db_file.clone(),
|
||||
dummy_wal,
|
||||
None,
|
||||
self.io.clone(),
|
||||
Arc::new(RwLock::new(DumbLruPageCache::default())),
|
||||
buffer_pool.clone(),
|
||||
@@ -1184,8 +1181,14 @@ impl Connection {
|
||||
{
|
||||
let pager = self.pager.borrow();
|
||||
|
||||
let Some(wal) = pager.wal.as_ref() else {
|
||||
return Err(LimboError::InternalError(
|
||||
"wal_insert_end called without a wal".to_string(),
|
||||
));
|
||||
};
|
||||
|
||||
{
|
||||
let wal = pager.wal.borrow_mut();
|
||||
let wal = wal.borrow_mut();
|
||||
wal.end_write_tx();
|
||||
wal.end_read_tx();
|
||||
}
|
||||
|
||||
@@ -8323,7 +8323,7 @@ mod tests {
|
||||
let pager = Rc::new(
|
||||
Pager::new(
|
||||
db_file,
|
||||
wal,
|
||||
Some(wal),
|
||||
io,
|
||||
Arc::new(parking_lot::RwLock::new(DumbLruPageCache::new(10))),
|
||||
buffer_pool,
|
||||
|
||||
@@ -318,7 +318,8 @@ pub struct Pager {
|
||||
/// Source of the database pages.
|
||||
pub db_file: Arc<dyn DatabaseStorage>,
|
||||
/// The write-ahead log (WAL) for the database.
|
||||
pub(crate) wal: Rc<RefCell<dyn Wal>>,
|
||||
/// in-memory databases, ephemeral tables and ephemeral indexes do not have a WAL.
|
||||
pub(crate) wal: Option<Rc<RefCell<dyn Wal>>>,
|
||||
/// A page cache for the database.
|
||||
page_cache: Arc<RwLock<DumbLruPageCache>>,
|
||||
/// Buffer pool for temporary data storage.
|
||||
@@ -410,7 +411,7 @@ enum FreePageState {
|
||||
impl Pager {
|
||||
pub fn new(
|
||||
db_file: Arc<dyn DatabaseStorage>,
|
||||
wal: Rc<RefCell<dyn Wal>>,
|
||||
wal: Option<Rc<RefCell<dyn Wal>>>,
|
||||
io: Arc<dyn crate::io::IO>,
|
||||
page_cache: Arc<RwLock<DumbLruPageCache>>,
|
||||
buffer_pool: Arc<BufferPool>,
|
||||
@@ -456,7 +457,7 @@ impl Pager {
|
||||
}
|
||||
|
||||
pub fn set_wal(&mut self, wal: Rc<RefCell<dyn Wal>>) {
|
||||
self.wal = wal;
|
||||
self.wal = Some(wal);
|
||||
}
|
||||
|
||||
pub fn get_auto_vacuum_mode(&self) -> AutoVacuumMode {
|
||||
@@ -763,7 +764,10 @@ impl Pager {
|
||||
#[inline(always)]
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn begin_read_tx(&self) -> Result<LimboResult> {
|
||||
let (result, changed) = self.wal.borrow_mut().begin_read_tx()?;
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Ok(LimboResult::Ok);
|
||||
};
|
||||
let (result, changed) = wal.borrow_mut().begin_read_tx()?;
|
||||
if changed {
|
||||
// Someone else changed the database -> assume our page cache is invalid (this is default SQLite behavior, we can probably do better with more granular invalidation)
|
||||
self.clear_page_cache();
|
||||
@@ -802,7 +806,10 @@ impl Pager {
|
||||
IOResult::Done(_) => {}
|
||||
IOResult::IO => return Ok(IOResult::IO),
|
||||
}
|
||||
Ok(IOResult::Done(self.wal.borrow_mut().begin_write_tx()?))
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Ok(IOResult::Done(LimboResult::Ok));
|
||||
};
|
||||
Ok(IOResult::Done(wal.borrow_mut().begin_write_tx()?))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
@@ -814,15 +821,19 @@ impl Pager {
|
||||
wal_checkpoint_disabled: bool,
|
||||
) -> Result<IOResult<PagerCommitResult>> {
|
||||
tracing::trace!("end_tx(rollback={})", rollback);
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
// TODO: Unsure what the semantics of "end_tx" is for in-memory databases, ephemeral tables and ephemeral indexes.
|
||||
return Ok(IOResult::Done(PagerCommitResult::Rollback));
|
||||
};
|
||||
if rollback {
|
||||
let is_write = matches!(
|
||||
connection.transaction_state.get(),
|
||||
TransactionState::Write { .. }
|
||||
);
|
||||
if is_write {
|
||||
self.wal.borrow().end_write_tx();
|
||||
wal.borrow().end_write_tx();
|
||||
}
|
||||
self.wal.borrow().end_read_tx();
|
||||
wal.borrow().end_read_tx();
|
||||
self.rollback(schema_did_change, connection, is_write)?;
|
||||
return Ok(IOResult::Done(PagerCommitResult::Rollback));
|
||||
}
|
||||
@@ -830,8 +841,8 @@ impl Pager {
|
||||
match commit_status {
|
||||
IOResult::IO => Ok(IOResult::IO),
|
||||
IOResult::Done(_) => {
|
||||
self.wal.borrow().end_write_tx();
|
||||
self.wal.borrow().end_read_tx();
|
||||
wal.borrow().end_write_tx();
|
||||
wal.borrow().end_read_tx();
|
||||
|
||||
if schema_did_change {
|
||||
let schema = connection.schema.borrow().clone();
|
||||
@@ -844,7 +855,10 @@ impl Pager {
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn end_read_tx(&self) -> Result<()> {
|
||||
self.wal.borrow().end_read_tx();
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Ok(());
|
||||
};
|
||||
wal.borrow().end_read_tx();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -862,35 +876,46 @@ impl Pager {
|
||||
let page = Arc::new(Page::new(page_idx));
|
||||
page.set_locked();
|
||||
|
||||
if let Some(frame_id) = self.wal.borrow().find_frame(page_idx as u64)? {
|
||||
let c =
|
||||
self.wal
|
||||
.borrow()
|
||||
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
|
||||
page.set_uptodate();
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
let c = self.begin_read_disk_page(page_idx, page.clone())?;
|
||||
self.cache_insert(page_idx, page.clone(), &mut page_cache)?;
|
||||
return Ok((page, c));
|
||||
};
|
||||
|
||||
if let Some(frame_id) = wal.borrow().find_frame(page_idx as u64)? {
|
||||
let c = wal
|
||||
.borrow()
|
||||
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
|
||||
{
|
||||
page.set_uptodate();
|
||||
}
|
||||
// TODO(pere) should probably first insert to page cache, and if successful,
|
||||
// read frame or page
|
||||
match page_cache.insert(page_key, page.clone()) {
|
||||
Ok(_) => {}
|
||||
Err(CacheError::Full) => return Err(LimboError::CacheFull),
|
||||
Err(CacheError::KeyExists) => {
|
||||
unreachable!("Page should not exist in cache after get() miss")
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"Failed to insert page into cache: {e:?}"
|
||||
)))
|
||||
}
|
||||
}
|
||||
self.cache_insert(page_idx, page.clone(), &mut page_cache)?;
|
||||
return Ok((page, c));
|
||||
}
|
||||
|
||||
let c = sqlite3_ondisk::begin_read_page(
|
||||
let c = self.begin_read_disk_page(page_idx, page.clone())?;
|
||||
self.cache_insert(page_idx, page.clone(), &mut page_cache)?;
|
||||
Ok((page, c))
|
||||
}
|
||||
|
||||
fn begin_read_disk_page(&self, page_idx: usize, page: PageRef) -> Result<Completion> {
|
||||
sqlite3_ondisk::begin_read_page(
|
||||
self.db_file.clone(),
|
||||
self.buffer_pool.clone(),
|
||||
page.clone(),
|
||||
page,
|
||||
page_idx,
|
||||
)?;
|
||||
)
|
||||
}
|
||||
|
||||
fn cache_insert(
|
||||
&self,
|
||||
page_idx: usize,
|
||||
page: PageRef,
|
||||
page_cache: &mut DumbLruPageCache,
|
||||
) -> Result<()> {
|
||||
let page_key = PageCacheKey::new(page_idx);
|
||||
match page_cache.insert(page_key, page.clone()) {
|
||||
Ok(_) => {}
|
||||
Err(CacheError::Full) => return Err(LimboError::CacheFull),
|
||||
@@ -903,7 +928,7 @@ impl Pager {
|
||||
)))
|
||||
}
|
||||
}
|
||||
Ok((page, c))
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Get a page from the cache, if it exists.
|
||||
@@ -928,13 +953,25 @@ impl Pager {
|
||||
}
|
||||
|
||||
pub fn wal_frame_count(&self) -> Result<u64> {
|
||||
Ok(self.wal.borrow().get_max_frame_in_wal())
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Err(LimboError::InternalError(
|
||||
"wal_frame_count() called on database without WAL".to_string(),
|
||||
));
|
||||
};
|
||||
Ok(wal.borrow().get_max_frame_in_wal())
|
||||
}
|
||||
|
||||
/// Flush all dirty pages to disk.
|
||||
/// Unlike commit_dirty_pages, this function does not commit, checkpoint now sync the WAL/Database.
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn cacheflush(&self) -> Result<IOResult<()>> {
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
// TODO: when ephemeral table spills to disk, it should cacheflush pages directly to the temporary database file.
|
||||
// This handling is not yet implemented, but it should be when spilling is implemented.
|
||||
return Err(LimboError::InternalError(
|
||||
"cacheflush() called on database without WAL".to_string(),
|
||||
));
|
||||
};
|
||||
let state = self.flush_info.borrow().state;
|
||||
trace!(?state);
|
||||
match state {
|
||||
@@ -973,7 +1010,7 @@ impl Pager {
|
||||
page
|
||||
};
|
||||
|
||||
let _c = self.wal.borrow_mut().append_frame(
|
||||
let _c = wal.borrow_mut().append_frame(
|
||||
page.clone(),
|
||||
0,
|
||||
self.flush_info.borrow().in_flight_writes.clone(),
|
||||
@@ -1032,6 +1069,11 @@ impl Pager {
|
||||
&self,
|
||||
wal_checkpoint_disabled: bool,
|
||||
) -> Result<IOResult<PagerCommitResult>> {
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Err(LimboError::InternalError(
|
||||
"commit_dirty_pages() called on database without WAL".to_string(),
|
||||
));
|
||||
};
|
||||
let mut checkpoint_result = CheckpointResult::default();
|
||||
let res = loop {
|
||||
let state = self.commit_info.borrow().state;
|
||||
@@ -1088,7 +1130,7 @@ impl Pager {
|
||||
0
|
||||
}
|
||||
};
|
||||
let _c = self.wal.borrow_mut().append_frame(
|
||||
let _c = wal.borrow_mut().append_frame(
|
||||
page.clone(),
|
||||
db_size,
|
||||
self.commit_info.borrow().in_flight_writes.clone(),
|
||||
@@ -1142,9 +1184,9 @@ impl Pager {
|
||||
}
|
||||
}
|
||||
CommitState::SyncWal => {
|
||||
return_if_io!(self.wal.borrow_mut().sync());
|
||||
return_if_io!(wal.borrow_mut().sync());
|
||||
|
||||
if wal_checkpoint_disabled || !self.wal.borrow().should_checkpoint() {
|
||||
if wal_checkpoint_disabled || !wal.borrow().should_checkpoint() {
|
||||
self.commit_info.borrow_mut().state = CommitState::Start;
|
||||
break PagerCommitResult::WalWritten;
|
||||
}
|
||||
@@ -1170,19 +1212,29 @@ impl Pager {
|
||||
}
|
||||
};
|
||||
// We should only signal that we finished appenind frames after wal sync to avoid inconsistencies when sync fails
|
||||
self.wal.borrow_mut().finish_append_frames_commit()?;
|
||||
wal.borrow_mut().finish_append_frames_commit()?;
|
||||
Ok(IOResult::Done(res))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<Completion> {
|
||||
let wal = self.wal.borrow();
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Err(LimboError::InternalError(
|
||||
"wal_get_frame() called on database without WAL".to_string(),
|
||||
));
|
||||
};
|
||||
let wal = wal.borrow();
|
||||
wal.read_frame_raw(frame_no.into(), frame)
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<WalInsertInfo> {
|
||||
let mut wal = self.wal.borrow_mut();
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Err(LimboError::InternalError(
|
||||
"wal_insert_frame() called on database without WAL".to_string(),
|
||||
));
|
||||
};
|
||||
let mut wal = wal.borrow_mut();
|
||||
let (header, raw_page) = parse_wal_frame_header(frame);
|
||||
wal.write_frame_raw(
|
||||
self.buffer_pool.clone(),
|
||||
@@ -1217,6 +1269,11 @@ impl Pager {
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG, name = "pager_checkpoint",)]
|
||||
pub fn checkpoint(&self) -> Result<IOResult<CheckpointResult>> {
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Err(LimboError::InternalError(
|
||||
"checkpoint() called on database without WAL".to_string(),
|
||||
));
|
||||
};
|
||||
let mut checkpoint_result = CheckpointResult::default();
|
||||
loop {
|
||||
let state = *self.checkpoint_state.borrow();
|
||||
@@ -1224,11 +1281,10 @@ impl Pager {
|
||||
match state {
|
||||
CheckpointState::Checkpoint => {
|
||||
let in_flight = self.checkpoint_inflight.clone();
|
||||
match self.wal.borrow_mut().checkpoint(
|
||||
self,
|
||||
in_flight,
|
||||
CheckpointMode::Passive,
|
||||
)? {
|
||||
match wal
|
||||
.borrow_mut()
|
||||
.checkpoint(self, in_flight, CheckpointMode::Passive)?
|
||||
{
|
||||
IOResult::IO => return Ok(IOResult::IO),
|
||||
IOResult::Done(res) => {
|
||||
checkpoint_result = res;
|
||||
@@ -1277,7 +1333,12 @@ impl Pager {
|
||||
pub fn checkpoint_shutdown(&self, wal_checkpoint_disabled: bool) -> Result<()> {
|
||||
let mut _attempts = 0;
|
||||
{
|
||||
let mut wal = self.wal.borrow_mut();
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Err(LimboError::InternalError(
|
||||
"checkpoint_shutdown() called on database without WAL".to_string(),
|
||||
));
|
||||
};
|
||||
let mut wal = wal.borrow_mut();
|
||||
// fsync the wal syncronously before beginning checkpoint
|
||||
while let Ok(IOResult::IO) = wal.sync() {
|
||||
// TODO: for now forget about timeouts as they fail regularly in SIM
|
||||
@@ -1302,14 +1363,18 @@ impl Pager {
|
||||
wal_checkpoint_disabled: bool,
|
||||
mode: CheckpointMode,
|
||||
) -> Result<CheckpointResult> {
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Err(LimboError::InternalError(
|
||||
"wal_checkpoint() called on database without WAL".to_string(),
|
||||
));
|
||||
};
|
||||
if wal_checkpoint_disabled {
|
||||
return Ok(CheckpointResult::default());
|
||||
}
|
||||
|
||||
let write_counter = Rc::new(RefCell::new(0));
|
||||
let mut checkpoint_result = self.io.block(|| {
|
||||
self.wal
|
||||
.borrow_mut()
|
||||
wal.borrow_mut()
|
||||
.checkpoint(self, write_counter.clone(), mode)
|
||||
})?;
|
||||
|
||||
@@ -1823,7 +1888,9 @@ impl Pager {
|
||||
connection.schema.replace(connection._db.clone_schema()?);
|
||||
}
|
||||
if is_write {
|
||||
self.wal.borrow_mut().rollback()?;
|
||||
if let Some(wal) = self.wal.as_ref() {
|
||||
wal.borrow_mut().rollback()?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -2166,7 +2233,7 @@ mod ptrmap_tests {
|
||||
|
||||
let pager = Pager::new(
|
||||
db_file,
|
||||
wal,
|
||||
Some(wal),
|
||||
io,
|
||||
page_cache,
|
||||
buffer_pool,
|
||||
|
||||
@@ -280,106 +280,6 @@ pub trait Wal {
|
||||
fn as_any(&self) -> &dyn std::any::Any;
|
||||
}
|
||||
|
||||
/// A dummy WAL implementation that does nothing.
|
||||
/// This is used for ephemeral indexes where a WAL is not really
|
||||
/// needed, and is preferable to passing an Option<dyn Wal> around
|
||||
/// everywhere.
|
||||
pub struct DummyWAL;
|
||||
|
||||
impl Wal for DummyWAL {
|
||||
fn begin_read_tx(&mut self) -> Result<(LimboResult, bool)> {
|
||||
Ok((LimboResult::Ok, false))
|
||||
}
|
||||
|
||||
fn end_read_tx(&self) {}
|
||||
|
||||
fn begin_write_tx(&mut self) -> Result<LimboResult> {
|
||||
Ok(LimboResult::Ok)
|
||||
}
|
||||
|
||||
fn end_write_tx(&self) {}
|
||||
|
||||
fn find_frame(&self, _page_id: u64) -> Result<Option<u64>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn read_frame(
|
||||
&self,
|
||||
_frame_id: u64,
|
||||
_page: crate::PageRef,
|
||||
_buffer_pool: Arc<BufferPool>,
|
||||
) -> Result<Completion> {
|
||||
// Dummy completion
|
||||
Ok(Completion::new_write(|_| {}))
|
||||
}
|
||||
|
||||
fn read_frame_raw(&self, _frame_id: u64, _frame: &mut [u8]) -> Result<Completion> {
|
||||
todo!();
|
||||
}
|
||||
|
||||
fn write_frame_raw(
|
||||
&mut self,
|
||||
_buffer_pool: Arc<BufferPool>,
|
||||
_frame_id: u64,
|
||||
_page_id: u64,
|
||||
_db_size: u64,
|
||||
_page: &[u8],
|
||||
) -> Result<()> {
|
||||
todo!();
|
||||
}
|
||||
|
||||
fn append_frame(
|
||||
&mut self,
|
||||
_page: crate::PageRef,
|
||||
_db_size: u32,
|
||||
_write_counter: Rc<RefCell<usize>>,
|
||||
) -> Result<Completion> {
|
||||
Ok(Completion::new_write(|_| {}))
|
||||
}
|
||||
|
||||
fn should_checkpoint(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn checkpoint(
|
||||
&mut self,
|
||||
_pager: &Pager,
|
||||
_write_counter: Rc<RefCell<usize>>,
|
||||
_mode: crate::CheckpointMode,
|
||||
) -> Result<IOResult<CheckpointResult>> {
|
||||
Ok(IOResult::Done(CheckpointResult::default()))
|
||||
}
|
||||
|
||||
fn sync(&mut self) -> Result<IOResult<()>> {
|
||||
Ok(IOResult::Done(()))
|
||||
}
|
||||
|
||||
fn get_max_frame_in_wal(&self) -> u64 {
|
||||
0
|
||||
}
|
||||
|
||||
fn get_max_frame(&self) -> u64 {
|
||||
0
|
||||
}
|
||||
|
||||
fn get_min_frame(&self) -> u64 {
|
||||
0
|
||||
}
|
||||
|
||||
fn finish_append_frames_commit(&mut self) -> Result<()> {
|
||||
tracing::trace!("finish_append_frames_commit_dumb");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn rollback(&mut self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
#[cfg(debug_assertions)]
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
// Syncing requires a state machine because we need to schedule a sync and then wait until it is
|
||||
// finished. If we don't wait there will be undefined behaviour that no one wants to debug.
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
@@ -2031,7 +1931,7 @@ pub mod test {
|
||||
}
|
||||
let pager = conn.pager.borrow_mut();
|
||||
let _ = pager.cacheflush();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
|
||||
let stat = std::fs::metadata(&walpath).unwrap();
|
||||
let meta_before = std::fs::metadata(&walpath).unwrap();
|
||||
@@ -2149,7 +2049,7 @@ pub mod test {
|
||||
// but NOT truncate the file.
|
||||
{
|
||||
let pager = conn.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
let res = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Restart);
|
||||
assert_eq!(res.num_wal_frames, mx_before);
|
||||
assert_eq!(res.num_checkpointed_frames, mx_before);
|
||||
@@ -2196,6 +2096,8 @@ pub mod test {
|
||||
conn.pager
|
||||
.borrow_mut()
|
||||
.wal
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.borrow_mut()
|
||||
.finish_append_frames_commit()
|
||||
.unwrap();
|
||||
@@ -2222,7 +2124,7 @@ pub mod test {
|
||||
// Force a read transaction that will freeze a lower read mark
|
||||
let readmark = {
|
||||
let pager = conn2.pager.borrow_mut();
|
||||
let mut wal2 = pager.wal.borrow_mut();
|
||||
let mut wal2 = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
assert!(matches!(wal2.begin_read_tx().unwrap().0, LimboResult::Ok));
|
||||
wal2.get_max_frame()
|
||||
};
|
||||
@@ -2236,7 +2138,7 @@ pub mod test {
|
||||
// Run passive checkpoint, expect partial
|
||||
let (res1, max_before) = {
|
||||
let pager = conn1.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
let res = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive);
|
||||
let maxf = unsafe {
|
||||
(&*db.maybe_shared_wal.read().as_ref().unwrap().get())
|
||||
@@ -2259,13 +2161,13 @@ pub mod test {
|
||||
// Release reader
|
||||
{
|
||||
let pager = conn2.pager.borrow_mut();
|
||||
let wal2 = pager.wal.borrow_mut();
|
||||
let wal2 = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
wal2.end_read_tx();
|
||||
}
|
||||
|
||||
// Second passive checkpoint should finish
|
||||
let pager = conn1.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
let res2 = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive);
|
||||
assert_eq!(
|
||||
res2.num_checkpointed_frames, res2.num_wal_frames,
|
||||
@@ -2284,6 +2186,8 @@ pub mod test {
|
||||
.pager
|
||||
.borrow_mut()
|
||||
.wal
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.borrow_mut()
|
||||
.begin_read_tx()
|
||||
.unwrap();
|
||||
@@ -2291,7 +2195,7 @@ pub mod test {
|
||||
// checkpoint should succeed here because the wal is fully checkpointed (empty)
|
||||
// so the reader is using readmark0 to read directly from the db file.
|
||||
let p = conn1.pager.borrow();
|
||||
let mut w = p.wal.borrow_mut();
|
||||
let mut w = p.wal.as_ref().unwrap().borrow_mut();
|
||||
loop {
|
||||
match w.checkpoint(&p, Rc::new(RefCell::new(0)), CheckpointMode::Restart) {
|
||||
Ok(IOResult::IO) => {
|
||||
@@ -2320,7 +2224,7 @@ pub mod test {
|
||||
// now that we have some frames to checkpoint, try again
|
||||
conn2.pager.borrow_mut().begin_read_tx().unwrap();
|
||||
let p = conn1.pager.borrow();
|
||||
let mut w = p.wal.borrow_mut();
|
||||
let mut w = p.wal.as_ref().unwrap().borrow_mut();
|
||||
loop {
|
||||
match w.checkpoint(&p, Rc::new(RefCell::new(0)), CheckpointMode::Restart) {
|
||||
Ok(IOResult::IO) => {
|
||||
@@ -2352,7 +2256,7 @@ pub mod test {
|
||||
// Checkpoint with restart
|
||||
{
|
||||
let pager = conn.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
let result = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Restart);
|
||||
assert!(result.everything_backfilled());
|
||||
}
|
||||
@@ -2395,7 +2299,7 @@ pub mod test {
|
||||
// R1 starts reading
|
||||
let r1_max_frame = {
|
||||
let pager = conn_r1.pager.borrow_mut();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
assert!(matches!(wal.begin_read_tx().unwrap().0, LimboResult::Ok));
|
||||
wal.get_max_frame()
|
||||
};
|
||||
@@ -2404,7 +2308,7 @@ pub mod test {
|
||||
// R2 starts reading, sees more frames than R1
|
||||
let r2_max_frame = {
|
||||
let pager = conn_r2.pager.borrow_mut();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
assert!(matches!(wal.begin_read_tx().unwrap().0, LimboResult::Ok));
|
||||
wal.get_max_frame()
|
||||
};
|
||||
@@ -2412,7 +2316,7 @@ pub mod test {
|
||||
// try passive checkpoint, should only checkpoint up to R1's position
|
||||
let checkpoint_result = {
|
||||
let pager = conn_writer.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive)
|
||||
};
|
||||
|
||||
@@ -2427,7 +2331,14 @@ pub mod test {
|
||||
|
||||
// Verify R2 still sees its frames
|
||||
assert_eq!(
|
||||
conn_r2.pager.borrow().wal.borrow().get_max_frame(),
|
||||
conn_r2
|
||||
.pager
|
||||
.borrow()
|
||||
.wal
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.borrow()
|
||||
.get_max_frame(),
|
||||
r2_max_frame,
|
||||
"Reader should maintain its snapshot"
|
||||
);
|
||||
@@ -2448,7 +2359,7 @@ pub mod test {
|
||||
|
||||
{
|
||||
let pager = conn.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
let _result = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive);
|
||||
}
|
||||
|
||||
@@ -2479,7 +2390,7 @@ pub mod test {
|
||||
// start a write transaction
|
||||
{
|
||||
let pager = conn2.pager.borrow_mut();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
let _ = wal.begin_read_tx().unwrap();
|
||||
let res = wal.begin_write_tx().unwrap();
|
||||
assert!(matches!(res, LimboResult::Ok), "result: {res:?}");
|
||||
@@ -2488,7 +2399,7 @@ pub mod test {
|
||||
// should fail because writer lock is held
|
||||
let result = {
|
||||
let pager = conn1.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
wal.checkpoint(&pager, Rc::new(RefCell::new(0)), CheckpointMode::Restart)
|
||||
};
|
||||
|
||||
@@ -2497,14 +2408,28 @@ pub mod test {
|
||||
"Restart checkpoint should fail when write lock is held"
|
||||
);
|
||||
|
||||
conn2.pager.borrow().wal.borrow().end_read_tx();
|
||||
conn2
|
||||
.pager
|
||||
.borrow()
|
||||
.wal
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.borrow_mut()
|
||||
.end_read_tx();
|
||||
// release write lock
|
||||
conn2.pager.borrow().wal.borrow().end_write_tx();
|
||||
conn2
|
||||
.pager
|
||||
.borrow()
|
||||
.wal
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.borrow_mut()
|
||||
.end_write_tx();
|
||||
|
||||
// now restart should succeed
|
||||
let result = {
|
||||
let pager = conn1.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Restart)
|
||||
};
|
||||
|
||||
@@ -2522,13 +2447,13 @@ pub mod test {
|
||||
|
||||
// Attempt to start a write transaction without a read transaction
|
||||
let pager = conn.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
let _ = wal.begin_write_tx();
|
||||
}
|
||||
|
||||
fn check_read_lock_slot(conn: &Arc<Connection>, expected_slot: usize) -> bool {
|
||||
let pager = conn.pager.borrow();
|
||||
let wal = pager.wal.borrow();
|
||||
let wal = pager.wal.as_ref().unwrap().borrow();
|
||||
let wal_any = wal.as_any();
|
||||
if let Some(wal_file) = wal_any.downcast_ref::<WalFile>() {
|
||||
return wal_file.max_frame_read_lock_index.get() == expected_slot;
|
||||
@@ -2549,7 +2474,14 @@ pub mod test {
|
||||
conn.execute("BEGIN").unwrap();
|
||||
let mut stmt = conn.prepare("SELECT * FROM test").unwrap();
|
||||
stmt.step().unwrap();
|
||||
let frame = conn.pager.borrow().wal.borrow().get_max_frame();
|
||||
let frame = conn
|
||||
.pager
|
||||
.borrow()
|
||||
.wal
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.borrow()
|
||||
.get_max_frame();
|
||||
(frame, stmt)
|
||||
}
|
||||
|
||||
@@ -2573,7 +2505,7 @@ pub mod test {
|
||||
// passive checkpoint #1
|
||||
let result1 = {
|
||||
let pager = conn_writer.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive)
|
||||
};
|
||||
assert_eq!(result1.num_checkpointed_frames, r1_frame);
|
||||
@@ -2584,7 +2516,7 @@ pub mod test {
|
||||
// passive checkpoint #2
|
||||
let result2 = {
|
||||
let pager = conn_writer.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive)
|
||||
};
|
||||
assert_eq!(
|
||||
@@ -2630,7 +2562,7 @@ pub mod test {
|
||||
// Do a TRUNCATE checkpoint
|
||||
{
|
||||
let pager = conn.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Truncate);
|
||||
}
|
||||
|
||||
@@ -2685,7 +2617,7 @@ pub mod test {
|
||||
// Do a TRUNCATE checkpoint
|
||||
{
|
||||
let pager = conn.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Truncate);
|
||||
}
|
||||
|
||||
@@ -2717,7 +2649,7 @@ pub mod test {
|
||||
assert_eq!(hdr.checkpoint_seq, 1, "invalid checkpoint_seq");
|
||||
{
|
||||
let pager = conn.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive);
|
||||
}
|
||||
// delete the WAL file so we can read right from db and assert
|
||||
@@ -2761,7 +2693,7 @@ pub mod test {
|
||||
// Start a read transaction on conn2
|
||||
{
|
||||
let pager = conn2.pager.borrow_mut();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
let (res, _) = wal.begin_read_tx().unwrap();
|
||||
assert!(matches!(res, LimboResult::Ok));
|
||||
}
|
||||
@@ -2770,7 +2702,7 @@ pub mod test {
|
||||
// Try to start a write transaction on conn2 with a stale snapshot
|
||||
let result = {
|
||||
let pager = conn2.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
wal.begin_write_tx()
|
||||
};
|
||||
// Should get Busy due to stale snapshot
|
||||
@@ -2779,7 +2711,7 @@ pub mod test {
|
||||
// End read transaction and start a fresh one
|
||||
{
|
||||
let pager = conn2.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
wal.end_read_tx();
|
||||
let (res, _) = wal.begin_read_tx().unwrap();
|
||||
assert!(matches!(res, LimboResult::Ok));
|
||||
@@ -2787,7 +2719,7 @@ pub mod test {
|
||||
// Now write transaction should work
|
||||
let result = {
|
||||
let pager = conn2.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
wal.begin_write_tx()
|
||||
};
|
||||
assert!(matches!(result.unwrap(), LimboResult::Ok));
|
||||
@@ -2806,14 +2738,14 @@ pub mod test {
|
||||
// Do a full checkpoint to move all data to DB file
|
||||
{
|
||||
let pager = conn1.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive);
|
||||
}
|
||||
|
||||
// Start a read transaction on conn2
|
||||
{
|
||||
let pager = conn2.pager.borrow_mut();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
let (res, _) = wal.begin_read_tx().unwrap();
|
||||
assert!(matches!(res, LimboResult::Ok));
|
||||
}
|
||||
@@ -2821,7 +2753,7 @@ pub mod test {
|
||||
assert!(check_read_lock_slot(&conn2, 0));
|
||||
{
|
||||
let pager = conn1.pager.borrow();
|
||||
let wal = pager.wal.borrow();
|
||||
let wal = pager.wal.as_ref().unwrap().borrow();
|
||||
let frame = wal.find_frame(5);
|
||||
// since we hold readlock0, we should ignore the db file and find_frame should return none
|
||||
assert!(frame.is_ok_and(|f| f.is_none()));
|
||||
@@ -2829,7 +2761,7 @@ pub mod test {
|
||||
// Try checkpoint, should fail because reader has slot 0
|
||||
{
|
||||
let pager = conn1.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
let result = wal.checkpoint(&pager, Rc::new(RefCell::new(0)), CheckpointMode::Restart);
|
||||
|
||||
assert!(
|
||||
@@ -2840,12 +2772,12 @@ pub mod test {
|
||||
// End the read transaction
|
||||
{
|
||||
let pager = conn2.pager.borrow();
|
||||
let wal = pager.wal.borrow();
|
||||
let wal = pager.wal.as_ref().unwrap().borrow();
|
||||
wal.end_read_tx();
|
||||
}
|
||||
{
|
||||
let pager = conn1.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
let result = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Restart);
|
||||
assert!(
|
||||
result.everything_backfilled(),
|
||||
|
||||
@@ -6,7 +6,6 @@ use crate::storage::database::DatabaseFile;
|
||||
use crate::storage::page_cache::DumbLruPageCache;
|
||||
use crate::storage::pager::{AtomicDbState, CreateBTreeFlags, DbState};
|
||||
use crate::storage::sqlite3_ondisk::read_varint;
|
||||
use crate::storage::wal::DummyWAL;
|
||||
use crate::translate::collate::CollationSeq;
|
||||
use crate::types::{
|
||||
compare_immutable, compare_records_generic, Extendable, ImmutableRecord, RawSlice, SeekResult,
|
||||
@@ -28,7 +27,6 @@ use crate::{
|
||||
},
|
||||
printf::exec_printf,
|
||||
},
|
||||
IO,
|
||||
};
|
||||
use std::env::temp_dir;
|
||||
use std::ops::DerefMut;
|
||||
@@ -6388,14 +6386,17 @@ pub fn op_open_ephemeral(
|
||||
let file = io.open_file(rand_path_str, OpenFlags::Create, false)?;
|
||||
let db_file = Arc::new(DatabaseFile::new(file));
|
||||
|
||||
let buffer_pool = Arc::new(BufferPool::new(Some(header_accessor::get_page_size(
|
||||
&conn.pager.borrow(),
|
||||
)? as usize)));
|
||||
let page_size = pager
|
||||
.io
|
||||
.block(|| pager.with_header(|header| header.page_size))?
|
||||
.get();
|
||||
|
||||
let buffer_pool = Arc::new(BufferPool::new(Some(page_size as usize)));
|
||||
let page_cache = Arc::new(RwLock::new(DumbLruPageCache::default()));
|
||||
|
||||
let pager = Rc::new(Pager::new(
|
||||
db_file,
|
||||
Rc::new(RefCell::new(DummyWAL)),
|
||||
None,
|
||||
io,
|
||||
page_cache,
|
||||
buffer_pool.clone(),
|
||||
|
||||
Reference in New Issue
Block a user