mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 08:55:40 +01:00
when pwritev fails, clear the dirty pages
add flag to `clear_page_cache`
This commit is contained in:
11
core/lib.rs
11
core/lib.rs
@@ -488,7 +488,7 @@ impl Database {
|
||||
conn.pragma_update("cipher", format!("'{}'", encryption_opts.cipher))?;
|
||||
conn.pragma_update("hexkey", format!("'{}'", encryption_opts.hexkey))?;
|
||||
// Clear page cache so the header page can be reread from disk and decrypted using the encryption context.
|
||||
pager.clear_page_cache();
|
||||
pager.clear_page_cache(false);
|
||||
}
|
||||
db.with_schema_mut(|schema| {
|
||||
let header_schema_cookie = pager
|
||||
@@ -1515,7 +1515,7 @@ impl Connection {
|
||||
let pager = conn.pager.read();
|
||||
if db.db_state.is_initialized() {
|
||||
// Clear page cache so the header page can be reread from disk and decrypted using the encryption context.
|
||||
pager.clear_page_cache();
|
||||
pager.clear_page_cache(false);
|
||||
}
|
||||
}
|
||||
Ok((io, conn))
|
||||
@@ -1737,11 +1737,6 @@ impl Connection {
|
||||
self.pager.read().cacheflush()
|
||||
}
|
||||
|
||||
pub fn clear_page_cache(&self) -> Result<()> {
|
||||
self.pager.read().clear_page_cache();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn checkpoint(&self, mode: CheckpointMode) -> Result<CheckpointResult> {
|
||||
if self.is_closed() {
|
||||
return Err(LimboError::InternalError("Connection closed".to_string()));
|
||||
@@ -1890,7 +1885,7 @@ impl Connection {
|
||||
shared_wal.enabled.store(false, Ordering::SeqCst);
|
||||
shared_wal.file = None;
|
||||
}
|
||||
self.pager.write().clear_page_cache();
|
||||
self.pager.write().clear_page_cache(false);
|
||||
let pager = self.db.init_pager(Some(size.get() as usize))?;
|
||||
pager.enable_encryption(self.db.opts.enable_encryption);
|
||||
*self.pager.write() = Arc::new(pager);
|
||||
|
||||
@@ -425,11 +425,11 @@ impl PageCache {
|
||||
Err(CacheError::Full)
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) -> Result<(), CacheError> {
|
||||
pub fn clear(&mut self, clear_dirty: bool) -> Result<(), CacheError> {
|
||||
// Check all pages are clean
|
||||
for &entry_ptr in self.map.values() {
|
||||
let entry = unsafe { &*entry_ptr };
|
||||
if entry.page.is_dirty() {
|
||||
if entry.page.is_dirty() && !clear_dirty {
|
||||
return Err(CacheError::Dirty {
|
||||
pgno: entry.page.get().id,
|
||||
});
|
||||
@@ -852,7 +852,7 @@ mod tests {
|
||||
let key1 = insert_page(&mut cache, 1);
|
||||
let key2 = insert_page(&mut cache, 2);
|
||||
|
||||
assert!(cache.clear().is_ok());
|
||||
assert!(cache.clear(false).is_ok());
|
||||
assert!(cache.get(&key1).unwrap().is_none());
|
||||
assert!(cache.get(&key2).unwrap().is_none());
|
||||
assert_eq!(cache.len(), 0);
|
||||
@@ -1141,7 +1141,7 @@ mod tests {
|
||||
cache.insert(key, page).unwrap();
|
||||
}
|
||||
|
||||
cache.clear().unwrap();
|
||||
cache.clear(false).unwrap();
|
||||
drop(cache);
|
||||
}
|
||||
|
||||
@@ -1231,7 +1231,7 @@ mod tests {
|
||||
for i in 1..=3 {
|
||||
let _ = insert_page(&mut c, i);
|
||||
}
|
||||
c.clear().unwrap();
|
||||
c.clear(false).unwrap();
|
||||
// No elements; insert should not rely on stale hand
|
||||
let _ = insert_page(&mut c, 10);
|
||||
|
||||
|
||||
@@ -1119,7 +1119,7 @@ impl Pager {
|
||||
let changed = wal.borrow_mut().begin_read_tx()?;
|
||||
if changed {
|
||||
// Someone else changed the database -> assume our page cache is invalid (this is default SQLite behavior, we can probably do better with more granular invalidation)
|
||||
self.clear_page_cache();
|
||||
self.clear_page_cache(false);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -1800,7 +1800,7 @@ impl Pager {
|
||||
/// Invalidates entire page cache by removing all dirty and clean pages. Usually used in case
|
||||
/// of a rollback or in case we want to invalidate page cache after starting a read transaction
|
||||
/// right after new writes happened which would invalidate current page cache.
|
||||
pub fn clear_page_cache(&self) {
|
||||
pub fn clear_page_cache(&self, clear_dirty: bool) {
|
||||
let dirty_pages = self.dirty_pages.read();
|
||||
let mut cache = self.page_cache.write();
|
||||
for page_id in dirty_pages.iter() {
|
||||
@@ -1809,7 +1809,9 @@ impl Pager {
|
||||
page.clear_dirty();
|
||||
}
|
||||
}
|
||||
cache.clear().expect("Failed to clear page cache");
|
||||
cache
|
||||
.clear(clear_dirty)
|
||||
.expect("Failed to clear page cache");
|
||||
}
|
||||
|
||||
/// Checkpoint in Truncate mode and delete the WAL file. This method is _only_ to be called
|
||||
@@ -1914,7 +1916,7 @@ impl Pager {
|
||||
// TODO: only clear cache of things that are really invalidated
|
||||
self.page_cache
|
||||
.write()
|
||||
.clear()
|
||||
.clear(false)
|
||||
.map_err(|e| LimboError::InternalError(format!("Failed to clear page cache: {e:?}")))?;
|
||||
Ok(IOResult::Done(()))
|
||||
}
|
||||
@@ -2400,7 +2402,7 @@ impl Pager {
|
||||
is_write: bool,
|
||||
) -> Result<(), LimboError> {
|
||||
tracing::debug!(schema_did_change);
|
||||
self.clear_page_cache();
|
||||
self.clear_page_cache(is_write);
|
||||
if is_write {
|
||||
self.dirty_pages.write().clear();
|
||||
} else {
|
||||
@@ -2483,7 +2485,7 @@ impl Pager {
|
||||
// might have been loaded with page 1 to initialise the connection. During initialisation,
|
||||
// we only read the header which is unencrypted, but the rest of the page is. If so, lets
|
||||
// clear the cache.
|
||||
self.clear_page_cache();
|
||||
self.clear_page_cache(false);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -2138,7 +2138,7 @@ pub fn halt(
|
||||
) -> Result<InsnFunctionStepResult> {
|
||||
if err_code > 0 {
|
||||
// invalidate page cache in case of error
|
||||
pager.clear_page_cache();
|
||||
pager.clear_page_cache(false);
|
||||
}
|
||||
match err_code {
|
||||
0 => {}
|
||||
|
||||
Reference in New Issue
Block a user