From 911b6791b90b516fb36ef42e47e0c2e5e913e70d Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Thu, 18 Sep 2025 01:23:26 -0300 Subject: [PATCH] when pwritev fails, clear the dirty pages add flag to `clear_page_cache` --- core/lib.rs | 11 +++-------- core/storage/page_cache.rs | 10 +++++----- core/storage/pager.rs | 14 ++++++++------ core/vdbe/execute.rs | 2 +- 4 files changed, 17 insertions(+), 20 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 5e0288670..198371ad9 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -488,7 +488,7 @@ impl Database { conn.pragma_update("cipher", format!("'{}'", encryption_opts.cipher))?; conn.pragma_update("hexkey", format!("'{}'", encryption_opts.hexkey))?; // Clear page cache so the header page can be reread from disk and decrypted using the encryption context. - pager.clear_page_cache(); + pager.clear_page_cache(false); } db.with_schema_mut(|schema| { let header_schema_cookie = pager @@ -1515,7 +1515,7 @@ impl Connection { let pager = conn.pager.read(); if db.db_state.is_initialized() { // Clear page cache so the header page can be reread from disk and decrypted using the encryption context. - pager.clear_page_cache(); + pager.clear_page_cache(false); } } Ok((io, conn)) @@ -1737,11 +1737,6 @@ impl Connection { self.pager.read().cacheflush() } - pub fn clear_page_cache(&self) -> Result<()> { - self.pager.read().clear_page_cache(); - Ok(()) - } - pub fn checkpoint(&self, mode: CheckpointMode) -> Result { if self.is_closed() { return Err(LimboError::InternalError("Connection closed".to_string())); @@ -1890,7 +1885,7 @@ impl Connection { shared_wal.enabled.store(false, Ordering::SeqCst); shared_wal.file = None; } - self.pager.write().clear_page_cache(); + self.pager.write().clear_page_cache(false); let pager = self.db.init_pager(Some(size.get() as usize))?; pager.enable_encryption(self.db.opts.enable_encryption); *self.pager.write() = Arc::new(pager); diff --git a/core/storage/page_cache.rs b/core/storage/page_cache.rs index 396332eb2..25bfe357a 100644 --- a/core/storage/page_cache.rs +++ b/core/storage/page_cache.rs @@ -425,11 +425,11 @@ impl PageCache { Err(CacheError::Full) } - pub fn clear(&mut self) -> Result<(), CacheError> { + pub fn clear(&mut self, clear_dirty: bool) -> Result<(), CacheError> { // Check all pages are clean for &entry_ptr in self.map.values() { let entry = unsafe { &*entry_ptr }; - if entry.page.is_dirty() { + if entry.page.is_dirty() && !clear_dirty { return Err(CacheError::Dirty { pgno: entry.page.get().id, }); @@ -852,7 +852,7 @@ mod tests { let key1 = insert_page(&mut cache, 1); let key2 = insert_page(&mut cache, 2); - assert!(cache.clear().is_ok()); + assert!(cache.clear(false).is_ok()); assert!(cache.get(&key1).unwrap().is_none()); assert!(cache.get(&key2).unwrap().is_none()); assert_eq!(cache.len(), 0); @@ -1141,7 +1141,7 @@ mod tests { cache.insert(key, page).unwrap(); } - cache.clear().unwrap(); + cache.clear(false).unwrap(); drop(cache); } @@ -1231,7 +1231,7 @@ mod tests { for i in 1..=3 { let _ = insert_page(&mut c, i); } - c.clear().unwrap(); + c.clear(false).unwrap(); // No elements; insert should not rely on stale hand let _ = insert_page(&mut c, 10); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index bf11ef1d3..cb4e4d006 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1119,7 +1119,7 @@ impl Pager { let changed = wal.borrow_mut().begin_read_tx()?; if changed { // Someone else changed the database -> assume our page cache is invalid (this is default SQLite behavior, we can probably do better with more granular invalidation) - self.clear_page_cache(); + self.clear_page_cache(false); } Ok(()) } @@ -1800,7 +1800,7 @@ impl Pager { /// Invalidates entire page cache by removing all dirty and clean pages. Usually used in case /// of a rollback or in case we want to invalidate page cache after starting a read transaction /// right after new writes happened which would invalidate current page cache. - pub fn clear_page_cache(&self) { + pub fn clear_page_cache(&self, clear_dirty: bool) { let dirty_pages = self.dirty_pages.read(); let mut cache = self.page_cache.write(); for page_id in dirty_pages.iter() { @@ -1809,7 +1809,9 @@ impl Pager { page.clear_dirty(); } } - cache.clear().expect("Failed to clear page cache"); + cache + .clear(clear_dirty) + .expect("Failed to clear page cache"); } /// Checkpoint in Truncate mode and delete the WAL file. This method is _only_ to be called @@ -1914,7 +1916,7 @@ impl Pager { // TODO: only clear cache of things that are really invalidated self.page_cache .write() - .clear() + .clear(false) .map_err(|e| LimboError::InternalError(format!("Failed to clear page cache: {e:?}")))?; Ok(IOResult::Done(())) } @@ -2400,7 +2402,7 @@ impl Pager { is_write: bool, ) -> Result<(), LimboError> { tracing::debug!(schema_did_change); - self.clear_page_cache(); + self.clear_page_cache(is_write); if is_write { self.dirty_pages.write().clear(); } else { @@ -2483,7 +2485,7 @@ impl Pager { // might have been loaded with page 1 to initialise the connection. During initialisation, // we only read the header which is unencrypted, but the rest of the page is. If so, lets // clear the cache. - self.clear_page_cache(); + self.clear_page_cache(false); Ok(()) } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 87fbaec0a..03b6fe96a 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2138,7 +2138,7 @@ pub fn halt( ) -> Result { if err_code > 0 { // invalidate page cache in case of error - pager.clear_page_cache(); + pager.clear_page_cache(false); } match err_code { 0 => {}