diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index 923542cdb..5dc8183be 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -393,6 +393,16 @@ impl Connection { Ok(conn.get_auto_commit()) } + + /// Sets the max timeout for the busy handler + pub fn max_busy_timeout(&self, duration: std::time::Duration) -> Result<()> { + let conn = self + .inner + .lock() + .map_err(|e| Error::MutexError(e.to_string()))?; + conn.max_busy_timeout(duration); + Ok(()) + } } impl Debug for Connection { diff --git a/core/lib.rs b/core/lib.rs index b26ec5657..7d6a703fc 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -486,6 +486,7 @@ impl Database { encryption_cipher_mode: Cell::new(None), sync_mode: Cell::new(SyncMode::Full), data_sync_retry: Cell::new(false), + max_busy_timeout: Cell::new(None), }); self.n_connections .fetch_add(1, std::sync::atomic::Ordering::Relaxed); @@ -976,6 +977,8 @@ pub struct Connection { encryption_cipher_mode: Cell>, sync_mode: Cell, data_sync_retry: Cell, + /// User defined max Busy timeout duration + max_busy_timeout: Cell>, } impl Drop for Connection { @@ -2097,12 +2100,18 @@ impl Connection { } pager.set_encryption_context(cipher_mode, key) } + + pub fn max_busy_timeout(&self, duration: std::time::Duration) { + self.max_busy_timeout.set(Some(duration)); + } } #[derive(Debug, Default)] struct BusyTimeout { /// Busy timeout instant timeout: Option, + /// Max duration for busy timeout + max_duration: Duration, iteration: usize, } @@ -2122,14 +2131,19 @@ impl BusyTimeout { Duration::from_millis(100), ]; - pub fn timeout(&self) -> Option { - self.timeout + pub fn new(duration: std::time::Duration) -> Self { + Self { + timeout: None, + max_duration: duration, + iteration: 0, + } } - /// Modifies in place the next timeout instant - pub fn next_timeout(&mut self, now: Instant) { + pub fn initiate_timeout(&mut self, now: Instant) { self.iteration = self.iteration.saturating_add(1); - self.timeout = Self::DELAYS.get(self.iteration).map(|delay| now + *delay); + self.timeout = Self::DELAYS + .get(self.iteration) + .map(|delay| now + (*delay).min(self.max_duration)); } } @@ -2147,7 +2161,7 @@ pub struct Statement { /// Flag to show if the statement was busy busy: bool, /// Busy timeout instant - busy_timeout: BusyTimeout, + busy_timeout: Option, } impl Statement { @@ -2164,6 +2178,11 @@ impl Statement { QueryMode::ExplainQueryPlan => (EXPLAIN_QUERY_PLAN_COLUMNS.len(), 0), }; let state = vdbe::ProgramState::new(max_registers, cursor_count); + let busy_timeout = program + .connection + .max_busy_timeout + .get() + .map(BusyTimeout::new); Self { program, state, @@ -2172,7 +2191,7 @@ impl Statement { accesses_db, query_mode, busy: false, - busy_timeout: BusyTimeout::default(), + busy_timeout, } } pub fn get_query_mode(&self) -> QueryMode { @@ -2192,12 +2211,15 @@ impl Statement { } pub fn step(&mut self) -> Result { - if let Some(instant) = self.busy_timeout.timeout() { - let now = self.pager.io.now(); + if let Some(busy_timeout) = self.busy_timeout.as_mut() { + if let Some(instant) = busy_timeout.timeout { + let now = self.pager.io.now(); - if instant > now { - // Yield the query as the timeout has not been reached yet - return Ok(StepResult::IO); + if instant > now { + // Yield the query as the timeout has not been reached yet + return Ok(StepResult::IO); + } + // Timeout ended now continue to query execution } } @@ -2243,10 +2265,13 @@ impl Statement { } if matches!(res, Ok(StepResult::Busy)) { - self.busy_timeout.next_timeout(self.pager.io.now()); - if self.busy_timeout.timeout().is_some() { - // Yield if there is a next timeout - res = Ok(StepResult::IO); + if let Some(busy_timeout) = self.busy_timeout.as_mut() { + busy_timeout.initiate_timeout(self.pager.io.now()); + if busy_timeout.timeout.is_some() { + // Yield instead of busy, as now we will try to wait for the timeout + // before continuing execution + res = Ok(StepResult::IO); + } } } diff --git a/perf/throughput/turso/src/main.rs b/perf/throughput/turso/src/main.rs index 66e2efd9c..01c9cee86 100644 --- a/perf/throughput/turso/src/main.rs +++ b/perf/throughput/turso/src/main.rs @@ -155,6 +155,7 @@ async fn worker_thread( for iteration in 0..iterations { let conn = db.connect()?; + conn.max_busy_timeout(std::time::Duration::from_millis(10))?; let total_inserts = Arc::clone(&total_inserts); let tx_fut = async move { let mut stmt = conn