From d8f07fe3da29f3192703c54aa75f0451906264e7 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Sat, 13 Sep 2025 09:36:49 +0300 Subject: [PATCH] core: Panic on fsync() error by default Retrying fsync() on error was historically not safe ("fsyncgate") and Postgres still defaults to panicing on fsync(). Therefore, add a "data_sync_retry" pragma (disabled by default) and use it to determine whether to panic on fsync() error or not. --- core/lib.rs | 18 +++++++++++++++++- core/pragma.rs | 4 ++++ core/storage/pager.rs | 23 ++++++++++++++++++++--- core/translate/pragma.rs | 24 ++++++++++++++++++++++++ parser/src/ast.rs | 2 ++ 5 files changed, 67 insertions(+), 4 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 471b70095..57726b76e 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -483,6 +483,7 @@ impl Database { encryption_key: RefCell::new(None), encryption_cipher_mode: Cell::new(None), sync_mode: Cell::new(SyncMode::Full), + data_sync_retry: Cell::new(false), }); self.n_connections .fetch_add(1, std::sync::atomic::Ordering::Relaxed); @@ -928,6 +929,7 @@ pub struct Connection { encryption_key: RefCell>, encryption_cipher_mode: Cell>, sync_mode: Cell, + data_sync_retry: Cell, } impl Drop for Connection { @@ -1448,7 +1450,13 @@ impl Connection { let commit_err = if force_commit { pager .io - .block(|| pager.commit_dirty_pages(true, self.get_sync_mode())) + .block(|| { + pager.commit_dirty_pages( + true, + self.get_sync_mode(), + self.get_data_sync_retry(), + ) + }) .err() } else { None @@ -1996,6 +2004,14 @@ impl Connection { self.sync_mode.set(mode); } + pub fn get_data_sync_retry(&self) -> bool { + self.data_sync_retry.get() + } + + pub fn set_data_sync_retry(&self, value: bool) { + self.data_sync_retry.set(value); + } + /// Creates a HashSet of modules that have been loaded pub fn get_syms_vtab_mods(&self) -> std::collections::HashSet { self.syms.borrow().vtab_modules.keys().cloned().collect() diff --git a/core/pragma.rs b/core/pragma.rs index 5ec791e3a..edcfd21b9 100644 --- a/core/pragma.rs +++ b/core/pragma.rs @@ -46,6 +46,10 @@ pub fn pragma_for(pragma: &PragmaName) -> Pragma { | PragmaFlags::NoColumns1, &["cache_size"], ), + DataSyncRetry => Pragma::new( + PragmaFlags::Result0 | PragmaFlags::NoColumns1, + &["data_sync_retry"], + ), DatabaseList => Pragma::new(PragmaFlags::Result0, &["seq", "name", "file"]), Encoding => Pragma::new( PragmaFlags::Result0 | PragmaFlags::NoColumns1, diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 17a49a69a..052d8d9b3 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1064,7 +1064,8 @@ impl Pager { } let commit_status = return_if_io!(self.commit_dirty_pages( connection.wal_auto_checkpoint_disabled.get(), - connection.get_sync_mode() + connection.get_sync_mode(), + connection.get_data_sync_retry() )); wal.borrow().end_write_tx(); wal.borrow().end_read_tx(); @@ -1317,6 +1318,7 @@ impl Pager { &self, wal_auto_checkpoint_disabled: bool, sync_mode: crate::SyncMode, + data_sync_retry: bool, ) -> Result> { let Some(wal) = self.wal.as_ref() else { return Err(LimboError::InternalError( @@ -1404,7 +1406,14 @@ impl Pager { } CommitState::SyncWal => { self.commit_info.state.set(CommitState::AfterSyncWal); - let c = wal.borrow_mut().sync()?; + let sync_result = wal.borrow_mut().sync(); + let c = match sync_result { + Ok(c) => c, + Err(e) if !data_sync_retry => { + panic!("fsync error (data_sync_retry=off): {e:?}"); + } + Err(e) => return Err(e), + }; if !c.is_completed() { io_yield_one!(c); } @@ -1427,7 +1436,15 @@ impl Pager { } } CommitState::SyncDbFile => { - let c = sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?; + let sync_result = + sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone()); + let c = match sync_result { + Ok(c) => c, + Err(e) if !data_sync_retry => { + panic!("fsync error on database file (data_sync_retry=off): {e:?}"); + } + Err(e) => return Err(e), + }; self.commit_info.state.set(CommitState::AfterSyncDbFile); if !c.is_completed() { io_yield_one!(c); diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 8ed31475e..af172a331 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -350,6 +350,22 @@ fn update_pragma( connection.set_sync_mode(mode); Ok((program, TransactionMode::None)) } + PragmaName::DataSyncRetry => { + let retry_enabled = match value { + Expr::Name(name) => { + let name_bytes = name.as_str().as_bytes(); + match_ignore_ascii_case!(match name_bytes { + b"ON" | b"TRUE" | b"YES" | b"1" => true, + _ => false, + }) + } + Expr::Literal(Literal::Numeric(n)) => !matches!(n.as_str(), "0"), + _ => false, + }; + + connection.set_data_sync_retry(retry_enabled); + Ok((program, TransactionMode::None)) + } } } @@ -641,6 +657,14 @@ fn query_pragma( program.add_pragma_result_column(pragma.to_string()); Ok((program, TransactionMode::None)) } + PragmaName::DataSyncRetry => { + let retry_enabled = connection.get_data_sync_retry(); + let register = program.alloc_register(); + program.emit_int(retry_enabled as i64, register); + program.emit_result_row(register, 1); + program.add_pragma_result_column(pragma.to_string()); + Ok((program, TransactionMode::None)) + } } } diff --git a/parser/src/ast.rs b/parser/src/ast.rs index c1ce16931..3c331107a 100644 --- a/parser/src/ast.rs +++ b/parser/src/ast.rs @@ -1318,6 +1318,8 @@ pub enum PragmaName { #[strum(serialize = "cipher")] #[cfg_attr(feature = "serde", serde(rename = "cipher"))] EncryptionCipher, + /// Control fsync error retry behavior (0 = off/panic, 1 = on/retry) + DataSyncRetry, /// List databases DatabaseList, /// Encoding - only support utf8