mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-18 06:24:56 +01:00
modify semantics of busy_timeout to be more on par with sqlite
This commit is contained in:
@@ -394,13 +394,26 @@ impl Connection {
|
||||
Ok(conn.get_auto_commit())
|
||||
}
|
||||
|
||||
/// Sets the max timeout for the busy handler
|
||||
pub fn max_busy_timeout(&self, duration: std::time::Duration) -> Result<()> {
|
||||
/// Sets maximum total accumuated timeout. If the duration is None or Zero, we unset the busy handler for this Connection
|
||||
///
|
||||
/// This api defers slighty from: https://www.sqlite.org/c3ref/busy_timeout.html
|
||||
///
|
||||
/// Instead of sleeping for linear amount of time specified by the user,
|
||||
/// we will sleep in phases, until the the total amount of time is reached.
|
||||
/// This means we first sleep of 1ms, then if we still return busy, we sleep for 2 ms, and repeat until a maximum of 100 ms per phase.
|
||||
///
|
||||
/// Example:
|
||||
/// 1. Set duration to 5ms
|
||||
/// 2. Step through query -> returns Busy -> sleep/yield for 1 ms
|
||||
/// 3. Step through query -> returns Busy -> sleep/yield for 2 ms
|
||||
/// 4. Step through query -> returns Busy -> sleep/yield for 2 ms (totaling 5 ms of sleep)
|
||||
/// 5. Step through query -> returns Busy -> return Busy to user
|
||||
pub fn busy_timeout(&self, duration: Option<std::time::Duration>) -> Result<()> {
|
||||
let conn = self
|
||||
.inner
|
||||
.lock()
|
||||
.map_err(|e| Error::MutexError(e.to_string()))?;
|
||||
conn.max_busy_timeout(duration);
|
||||
conn.busy_timeout(duration);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -76,6 +76,14 @@ impl std::ops::Add<Duration> for Instant {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Sub<Duration> for Instant {
|
||||
type Output = Instant;
|
||||
|
||||
fn sub(self, rhs: Duration) -> Self::Output {
|
||||
self.checked_sub_duration(&rhs).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Clock {
|
||||
fn now(&self) -> Instant;
|
||||
}
|
||||
|
||||
89
core/lib.rs
89
core/lib.rs
@@ -486,7 +486,7 @@ impl Database {
|
||||
encryption_cipher_mode: Cell::new(None),
|
||||
sync_mode: Cell::new(SyncMode::Full),
|
||||
data_sync_retry: Cell::new(false),
|
||||
max_busy_timeout: Cell::new(None),
|
||||
busy_timeout: Cell::new(None),
|
||||
});
|
||||
self.n_connections
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
@@ -977,8 +977,8 @@ pub struct Connection {
|
||||
encryption_cipher_mode: Cell<Option<CipherMode>>,
|
||||
sync_mode: Cell<SyncMode>,
|
||||
data_sync_retry: Cell<bool>,
|
||||
/// User defined max Busy timeout duration
|
||||
max_busy_timeout: Cell<Option<std::time::Duration>>,
|
||||
/// User defined max accumulated Busy timeout duration
|
||||
busy_timeout: Cell<Option<std::time::Duration>>,
|
||||
}
|
||||
|
||||
impl Drop for Connection {
|
||||
@@ -2101,8 +2101,25 @@ impl Connection {
|
||||
pager.set_encryption_context(cipher_mode, key)
|
||||
}
|
||||
|
||||
pub fn max_busy_timeout(&self, duration: std::time::Duration) {
|
||||
self.max_busy_timeout.set(Some(duration));
|
||||
/// Sets maximum total accumuated timeout. If the duration is None or Zero, we unset the busy handler for this Connection
|
||||
///
|
||||
/// This api defers slighty from: https://www.sqlite.org/c3ref/busy_timeout.html
|
||||
///
|
||||
/// Instead of sleeping for linear amount of time specified by the user,
|
||||
/// we will sleep in phases, until the the total amount of time is reached.
|
||||
/// This means we first sleep of 1ms, then if we still return busy, we sleep for 2 ms, and repeat until a maximum of 100 ms per phase.
|
||||
///
|
||||
/// Example:
|
||||
/// 1. Set duration to 5ms
|
||||
/// 2. Step through query -> returns Busy -> sleep/yield for 1 ms
|
||||
/// 3. Step through query -> returns Busy -> sleep/yield for 2 ms
|
||||
/// 4. Step through query -> returns Busy -> sleep/yield for 2 ms (totaling 5 ms of sleep)
|
||||
/// 5. Step through query -> returns Busy -> return Busy to user
|
||||
///
|
||||
/// This slight api change demonstrated a better throughtput in `perf/throughput/turso` benchmark
|
||||
pub fn busy_timeout(&self, mut duration: Option<std::time::Duration>) {
|
||||
duration = duration.filter(|duration| !duration.is_zero());
|
||||
self.busy_timeout.set(duration);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2110,8 +2127,12 @@ impl Connection {
|
||||
struct BusyTimeout {
|
||||
/// Busy timeout instant
|
||||
timeout: Option<Instant>,
|
||||
/// Max duration for busy timeout
|
||||
/// Max duration of timeout set by Connection
|
||||
max_duration: Duration,
|
||||
/// Accumulated duration for busy timeout
|
||||
///
|
||||
/// It will be decremented until it reaches 0, then after that no timeout will be emitted
|
||||
accum_duration: Duration,
|
||||
iteration: usize,
|
||||
}
|
||||
|
||||
@@ -2136,14 +2157,25 @@ impl BusyTimeout {
|
||||
timeout: None,
|
||||
max_duration: duration,
|
||||
iteration: 0,
|
||||
accum_duration: duration,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn initiate_timeout(&mut self, now: Instant) {
|
||||
self.iteration = self.iteration.saturating_add(1);
|
||||
self.timeout = Self::DELAYS
|
||||
.get(self.iteration)
|
||||
.map(|delay| now + (*delay).min(self.max_duration));
|
||||
self.timeout = Self::DELAYS.get(self.iteration).and_then(|delay| {
|
||||
if self.accum_duration.is_zero() {
|
||||
None
|
||||
} else {
|
||||
let new_timeout = now + (*delay).min(self.accum_duration);
|
||||
self.accum_duration = self.accum_duration.saturating_sub(*delay);
|
||||
Some(new_timeout)
|
||||
}
|
||||
});
|
||||
self.iteration = if self.iteration < Self::DELAYS.len() - 1 {
|
||||
self.iteration + 1
|
||||
} else {
|
||||
self.iteration
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2178,11 +2210,6 @@ impl Statement {
|
||||
QueryMode::ExplainQueryPlan => (EXPLAIN_QUERY_PLAN_COLUMNS.len(), 0),
|
||||
};
|
||||
let state = vdbe::ProgramState::new(max_registers, cursor_count);
|
||||
let busy_timeout = program
|
||||
.connection
|
||||
.max_busy_timeout
|
||||
.get()
|
||||
.map(BusyTimeout::new);
|
||||
Self {
|
||||
program,
|
||||
state,
|
||||
@@ -2191,7 +2218,7 @@ impl Statement {
|
||||
accesses_db,
|
||||
query_mode,
|
||||
busy: false,
|
||||
busy_timeout,
|
||||
busy_timeout: None,
|
||||
}
|
||||
}
|
||||
pub fn get_query_mode(&self) -> QueryMode {
|
||||
@@ -2212,10 +2239,10 @@ impl Statement {
|
||||
|
||||
pub fn step(&mut self) -> Result<StepResult> {
|
||||
if let Some(busy_timeout) = self.busy_timeout.as_mut() {
|
||||
if let Some(instant) = busy_timeout.timeout {
|
||||
if let Some(timeout) = busy_timeout.timeout {
|
||||
let now = self.pager.io.now();
|
||||
|
||||
if instant > now {
|
||||
if now < timeout {
|
||||
// Yield the query as the timeout has not been reached yet
|
||||
return Ok(StepResult::IO);
|
||||
}
|
||||
@@ -2265,6 +2292,7 @@ impl Statement {
|
||||
}
|
||||
|
||||
if matches!(res, Ok(StepResult::Busy)) {
|
||||
self.check_if_busy_handler_set();
|
||||
if let Some(busy_timeout) = self.busy_timeout.as_mut() {
|
||||
busy_timeout.initiate_timeout(self.pager.io.now());
|
||||
if busy_timeout.timeout.is_some() {
|
||||
@@ -2445,6 +2473,7 @@ impl Statement {
|
||||
pub fn _reset(&mut self, max_registers: Option<usize>, max_cursors: Option<usize>) {
|
||||
self.state.reset(max_registers, max_cursors);
|
||||
self.busy = false;
|
||||
self.check_if_busy_handler_set();
|
||||
}
|
||||
|
||||
pub fn row(&self) -> Option<&Row> {
|
||||
@@ -2458,6 +2487,30 @@ impl Statement {
|
||||
pub fn is_busy(&self) -> bool {
|
||||
self.busy
|
||||
}
|
||||
|
||||
/// Checks if the busy handler is set in the connection and sets the handler if needed
|
||||
fn check_if_busy_handler_set(&mut self) {
|
||||
let conn_busy_timeout = self
|
||||
.program
|
||||
.connection
|
||||
.busy_timeout
|
||||
.get()
|
||||
.map(BusyTimeout::new);
|
||||
if self.busy_timeout.is_none() {
|
||||
self.busy_timeout = conn_busy_timeout;
|
||||
return;
|
||||
}
|
||||
if let Some(conn_busy_timeout) = conn_busy_timeout {
|
||||
let busy_timeout = self
|
||||
.busy_timeout
|
||||
.as_mut()
|
||||
.expect("busy timeout was checked for None above");
|
||||
// User changed max duration, so clear previous handler and set a new one
|
||||
if busy_timeout.max_duration != conn_busy_timeout.max_duration {
|
||||
*busy_timeout = conn_busy_timeout;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type Row = vdbe::Row;
|
||||
|
||||
@@ -155,7 +155,7 @@ async fn worker_thread(
|
||||
|
||||
for iteration in 0..iterations {
|
||||
let conn = db.connect()?;
|
||||
conn.max_busy_timeout(std::time::Duration::from_millis(10))?;
|
||||
conn.busy_timeout(Some(std::time::Duration::from_millis(10)))?;
|
||||
let total_inserts = Arc::clone(&total_inserts);
|
||||
let tx_fut = async move {
|
||||
let mut stmt = conn
|
||||
|
||||
Reference in New Issue
Block a user