From 3d265489dc4fe87843408f6bbb78698ff6ea597b Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Sun, 14 Sep 2025 12:34:28 -0300 Subject: [PATCH] modify semantics of `busy_timeout` to be more on par with sqlite --- bindings/rust/src/lib.rs | 19 +++++-- core/io/clock.rs | 8 +++ core/lib.rs | 89 ++++++++++++++++++++++++------- perf/throughput/turso/src/main.rs | 2 +- 4 files changed, 96 insertions(+), 22 deletions(-) diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index 5dc8183be..39706fdd5 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -394,13 +394,26 @@ impl Connection { Ok(conn.get_auto_commit()) } - /// Sets the max timeout for the busy handler - pub fn max_busy_timeout(&self, duration: std::time::Duration) -> Result<()> { + /// Sets maximum total accumuated timeout. If the duration is None or Zero, we unset the busy handler for this Connection + /// + /// This api defers slighty from: https://www.sqlite.org/c3ref/busy_timeout.html + /// + /// Instead of sleeping for linear amount of time specified by the user, + /// we will sleep in phases, until the the total amount of time is reached. + /// This means we first sleep of 1ms, then if we still return busy, we sleep for 2 ms, and repeat until a maximum of 100 ms per phase. + /// + /// Example: + /// 1. Set duration to 5ms + /// 2. Step through query -> returns Busy -> sleep/yield for 1 ms + /// 3. Step through query -> returns Busy -> sleep/yield for 2 ms + /// 4. Step through query -> returns Busy -> sleep/yield for 2 ms (totaling 5 ms of sleep) + /// 5. Step through query -> returns Busy -> return Busy to user + pub fn busy_timeout(&self, duration: Option) -> Result<()> { let conn = self .inner .lock() .map_err(|e| Error::MutexError(e.to_string()))?; - conn.max_busy_timeout(duration); + conn.busy_timeout(duration); Ok(()) } } diff --git a/core/io/clock.rs b/core/io/clock.rs index 12eefa030..d522ac278 100644 --- a/core/io/clock.rs +++ b/core/io/clock.rs @@ -76,6 +76,14 @@ impl std::ops::Add for Instant { } } +impl std::ops::Sub for Instant { + type Output = Instant; + + fn sub(self, rhs: Duration) -> Self::Output { + self.checked_sub_duration(&rhs).unwrap() + } +} + pub trait Clock { fn now(&self) -> Instant; } diff --git a/core/lib.rs b/core/lib.rs index 7d6a703fc..2a03ec045 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -486,7 +486,7 @@ impl Database { encryption_cipher_mode: Cell::new(None), sync_mode: Cell::new(SyncMode::Full), data_sync_retry: Cell::new(false), - max_busy_timeout: Cell::new(None), + busy_timeout: Cell::new(None), }); self.n_connections .fetch_add(1, std::sync::atomic::Ordering::Relaxed); @@ -977,8 +977,8 @@ pub struct Connection { encryption_cipher_mode: Cell>, sync_mode: Cell, data_sync_retry: Cell, - /// User defined max Busy timeout duration - max_busy_timeout: Cell>, + /// User defined max accumulated Busy timeout duration + busy_timeout: Cell>, } impl Drop for Connection { @@ -2101,8 +2101,25 @@ impl Connection { pager.set_encryption_context(cipher_mode, key) } - pub fn max_busy_timeout(&self, duration: std::time::Duration) { - self.max_busy_timeout.set(Some(duration)); + /// Sets maximum total accumuated timeout. If the duration is None or Zero, we unset the busy handler for this Connection + /// + /// This api defers slighty from: https://www.sqlite.org/c3ref/busy_timeout.html + /// + /// Instead of sleeping for linear amount of time specified by the user, + /// we will sleep in phases, until the the total amount of time is reached. + /// This means we first sleep of 1ms, then if we still return busy, we sleep for 2 ms, and repeat until a maximum of 100 ms per phase. + /// + /// Example: + /// 1. Set duration to 5ms + /// 2. Step through query -> returns Busy -> sleep/yield for 1 ms + /// 3. Step through query -> returns Busy -> sleep/yield for 2 ms + /// 4. Step through query -> returns Busy -> sleep/yield for 2 ms (totaling 5 ms of sleep) + /// 5. Step through query -> returns Busy -> return Busy to user + /// + /// This slight api change demonstrated a better throughtput in `perf/throughput/turso` benchmark + pub fn busy_timeout(&self, mut duration: Option) { + duration = duration.filter(|duration| !duration.is_zero()); + self.busy_timeout.set(duration); } } @@ -2110,8 +2127,12 @@ impl Connection { struct BusyTimeout { /// Busy timeout instant timeout: Option, - /// Max duration for busy timeout + /// Max duration of timeout set by Connection max_duration: Duration, + /// Accumulated duration for busy timeout + /// + /// It will be decremented until it reaches 0, then after that no timeout will be emitted + accum_duration: Duration, iteration: usize, } @@ -2136,14 +2157,25 @@ impl BusyTimeout { timeout: None, max_duration: duration, iteration: 0, + accum_duration: duration, } } pub fn initiate_timeout(&mut self, now: Instant) { - self.iteration = self.iteration.saturating_add(1); - self.timeout = Self::DELAYS - .get(self.iteration) - .map(|delay| now + (*delay).min(self.max_duration)); + self.timeout = Self::DELAYS.get(self.iteration).and_then(|delay| { + if self.accum_duration.is_zero() { + None + } else { + let new_timeout = now + (*delay).min(self.accum_duration); + self.accum_duration = self.accum_duration.saturating_sub(*delay); + Some(new_timeout) + } + }); + self.iteration = if self.iteration < Self::DELAYS.len() - 1 { + self.iteration + 1 + } else { + self.iteration + }; } } @@ -2178,11 +2210,6 @@ impl Statement { QueryMode::ExplainQueryPlan => (EXPLAIN_QUERY_PLAN_COLUMNS.len(), 0), }; let state = vdbe::ProgramState::new(max_registers, cursor_count); - let busy_timeout = program - .connection - .max_busy_timeout - .get() - .map(BusyTimeout::new); Self { program, state, @@ -2191,7 +2218,7 @@ impl Statement { accesses_db, query_mode, busy: false, - busy_timeout, + busy_timeout: None, } } pub fn get_query_mode(&self) -> QueryMode { @@ -2212,10 +2239,10 @@ impl Statement { pub fn step(&mut self) -> Result { if let Some(busy_timeout) = self.busy_timeout.as_mut() { - if let Some(instant) = busy_timeout.timeout { + if let Some(timeout) = busy_timeout.timeout { let now = self.pager.io.now(); - if instant > now { + if now < timeout { // Yield the query as the timeout has not been reached yet return Ok(StepResult::IO); } @@ -2265,6 +2292,7 @@ impl Statement { } if matches!(res, Ok(StepResult::Busy)) { + self.check_if_busy_handler_set(); if let Some(busy_timeout) = self.busy_timeout.as_mut() { busy_timeout.initiate_timeout(self.pager.io.now()); if busy_timeout.timeout.is_some() { @@ -2445,6 +2473,7 @@ impl Statement { pub fn _reset(&mut self, max_registers: Option, max_cursors: Option) { self.state.reset(max_registers, max_cursors); self.busy = false; + self.check_if_busy_handler_set(); } pub fn row(&self) -> Option<&Row> { @@ -2458,6 +2487,30 @@ impl Statement { pub fn is_busy(&self) -> bool { self.busy } + + /// Checks if the busy handler is set in the connection and sets the handler if needed + fn check_if_busy_handler_set(&mut self) { + let conn_busy_timeout = self + .program + .connection + .busy_timeout + .get() + .map(BusyTimeout::new); + if self.busy_timeout.is_none() { + self.busy_timeout = conn_busy_timeout; + return; + } + if let Some(conn_busy_timeout) = conn_busy_timeout { + let busy_timeout = self + .busy_timeout + .as_mut() + .expect("busy timeout was checked for None above"); + // User changed max duration, so clear previous handler and set a new one + if busy_timeout.max_duration != conn_busy_timeout.max_duration { + *busy_timeout = conn_busy_timeout; + } + } + } } pub type Row = vdbe::Row; diff --git a/perf/throughput/turso/src/main.rs b/perf/throughput/turso/src/main.rs index 01c9cee86..34cfc3576 100644 --- a/perf/throughput/turso/src/main.rs +++ b/perf/throughput/turso/src/main.rs @@ -155,7 +155,7 @@ async fn worker_thread( for iteration in 0..iterations { let conn = db.connect()?; - conn.max_busy_timeout(std::time::Duration::from_millis(10))?; + conn.busy_timeout(Some(std::time::Duration::from_millis(10)))?; let total_inserts = Arc::clone(&total_inserts); let tx_fut = async move { let mut stmt = conn