mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-05 01:04:22 +01:00
Merge 'core: Panic on fsync() error by default' from Pekka Enberg
Retrying fsync() on error was historically not safe ("fsyncgate") and
Postgres still defaults to panicing on fsync(). Therefore, add a
"data_sync_retry" pragma (disabled by default) and use it to determine
whether to panic on fsync() error or not.
Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>
Closes #3077
This commit is contained in:
18
core/lib.rs
18
core/lib.rs
@@ -483,6 +483,7 @@ impl Database {
|
||||
encryption_key: RefCell::new(None),
|
||||
encryption_cipher_mode: Cell::new(None),
|
||||
sync_mode: Cell::new(SyncMode::Full),
|
||||
data_sync_retry: Cell::new(false),
|
||||
});
|
||||
self.n_connections
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
@@ -928,6 +929,7 @@ pub struct Connection {
|
||||
encryption_key: RefCell<Option<EncryptionKey>>,
|
||||
encryption_cipher_mode: Cell<Option<CipherMode>>,
|
||||
sync_mode: Cell<SyncMode>,
|
||||
data_sync_retry: Cell<bool>,
|
||||
}
|
||||
|
||||
impl Drop for Connection {
|
||||
@@ -1448,7 +1450,13 @@ impl Connection {
|
||||
let commit_err = if force_commit {
|
||||
pager
|
||||
.io
|
||||
.block(|| pager.commit_dirty_pages(true, self.get_sync_mode()))
|
||||
.block(|| {
|
||||
pager.commit_dirty_pages(
|
||||
true,
|
||||
self.get_sync_mode(),
|
||||
self.get_data_sync_retry(),
|
||||
)
|
||||
})
|
||||
.err()
|
||||
} else {
|
||||
None
|
||||
@@ -1996,6 +2004,14 @@ impl Connection {
|
||||
self.sync_mode.set(mode);
|
||||
}
|
||||
|
||||
pub fn get_data_sync_retry(&self) -> bool {
|
||||
self.data_sync_retry.get()
|
||||
}
|
||||
|
||||
pub fn set_data_sync_retry(&self, value: bool) {
|
||||
self.data_sync_retry.set(value);
|
||||
}
|
||||
|
||||
/// Creates a HashSet of modules that have been loaded
|
||||
pub fn get_syms_vtab_mods(&self) -> std::collections::HashSet<String> {
|
||||
self.syms.borrow().vtab_modules.keys().cloned().collect()
|
||||
|
||||
@@ -46,6 +46,10 @@ pub fn pragma_for(pragma: &PragmaName) -> Pragma {
|
||||
| PragmaFlags::NoColumns1,
|
||||
&["cache_size"],
|
||||
),
|
||||
DataSyncRetry => Pragma::new(
|
||||
PragmaFlags::Result0 | PragmaFlags::NoColumns1,
|
||||
&["data_sync_retry"],
|
||||
),
|
||||
DatabaseList => Pragma::new(PragmaFlags::Result0, &["seq", "name", "file"]),
|
||||
Encoding => Pragma::new(
|
||||
PragmaFlags::Result0 | PragmaFlags::NoColumns1,
|
||||
|
||||
@@ -1064,7 +1064,8 @@ impl Pager {
|
||||
}
|
||||
let commit_status = return_if_io!(self.commit_dirty_pages(
|
||||
connection.wal_auto_checkpoint_disabled.get(),
|
||||
connection.get_sync_mode()
|
||||
connection.get_sync_mode(),
|
||||
connection.get_data_sync_retry()
|
||||
));
|
||||
wal.borrow().end_write_tx();
|
||||
wal.borrow().end_read_tx();
|
||||
@@ -1317,6 +1318,7 @@ impl Pager {
|
||||
&self,
|
||||
wal_auto_checkpoint_disabled: bool,
|
||||
sync_mode: crate::SyncMode,
|
||||
data_sync_retry: bool,
|
||||
) -> Result<IOResult<PagerCommitResult>> {
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Err(LimboError::InternalError(
|
||||
@@ -1404,7 +1406,14 @@ impl Pager {
|
||||
}
|
||||
CommitState::SyncWal => {
|
||||
self.commit_info.state.set(CommitState::AfterSyncWal);
|
||||
let c = wal.borrow_mut().sync()?;
|
||||
let sync_result = wal.borrow_mut().sync();
|
||||
let c = match sync_result {
|
||||
Ok(c) => c,
|
||||
Err(e) if !data_sync_retry => {
|
||||
panic!("fsync error (data_sync_retry=off): {e:?}");
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
if !c.is_completed() {
|
||||
io_yield_one!(c);
|
||||
}
|
||||
@@ -1427,7 +1436,15 @@ impl Pager {
|
||||
}
|
||||
}
|
||||
CommitState::SyncDbFile => {
|
||||
let c = sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
|
||||
let sync_result =
|
||||
sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone());
|
||||
let c = match sync_result {
|
||||
Ok(c) => c,
|
||||
Err(e) if !data_sync_retry => {
|
||||
panic!("fsync error on database file (data_sync_retry=off): {e:?}");
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
self.commit_info.state.set(CommitState::AfterSyncDbFile);
|
||||
if !c.is_completed() {
|
||||
io_yield_one!(c);
|
||||
|
||||
@@ -350,6 +350,22 @@ fn update_pragma(
|
||||
connection.set_sync_mode(mode);
|
||||
Ok((program, TransactionMode::None))
|
||||
}
|
||||
PragmaName::DataSyncRetry => {
|
||||
let retry_enabled = match value {
|
||||
Expr::Name(name) => {
|
||||
let name_bytes = name.as_str().as_bytes();
|
||||
match_ignore_ascii_case!(match name_bytes {
|
||||
b"ON" | b"TRUE" | b"YES" | b"1" => true,
|
||||
_ => false,
|
||||
})
|
||||
}
|
||||
Expr::Literal(Literal::Numeric(n)) => !matches!(n.as_str(), "0"),
|
||||
_ => false,
|
||||
};
|
||||
|
||||
connection.set_data_sync_retry(retry_enabled);
|
||||
Ok((program, TransactionMode::None))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -641,6 +657,14 @@ fn query_pragma(
|
||||
program.add_pragma_result_column(pragma.to_string());
|
||||
Ok((program, TransactionMode::None))
|
||||
}
|
||||
PragmaName::DataSyncRetry => {
|
||||
let retry_enabled = connection.get_data_sync_retry();
|
||||
let register = program.alloc_register();
|
||||
program.emit_int(retry_enabled as i64, register);
|
||||
program.emit_result_row(register, 1);
|
||||
program.add_pragma_result_column(pragma.to_string());
|
||||
Ok((program, TransactionMode::None))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1318,6 +1318,8 @@ pub enum PragmaName {
|
||||
#[strum(serialize = "cipher")]
|
||||
#[cfg_attr(feature = "serde", serde(rename = "cipher"))]
|
||||
EncryptionCipher,
|
||||
/// Control fsync error retry behavior (0 = off/panic, 1 = on/retry)
|
||||
DataSyncRetry,
|
||||
/// List databases
|
||||
DatabaseList,
|
||||
/// Encoding - only support utf8
|
||||
|
||||
Reference in New Issue
Block a user