From 96e4c5d241e42efd3134700b969fe059de637e27 Mon Sep 17 00:00:00 2001 From: TcMits Date: Tue, 16 Sep 2025 18:39:45 +0700 Subject: [PATCH 1/3] fix issue 3144 --- bindings/rust/src/lib.rs | 2 +- core/lib.rs | 132 +++++++++++++----------------- perf/throughput/turso/src/main.rs | 2 +- 3 files changed, 59 insertions(+), 77 deletions(-) diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index 39706fdd5..dddb8df01 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -408,7 +408,7 @@ impl Connection { /// 3. Step through query -> returns Busy -> sleep/yield for 2 ms /// 4. Step through query -> returns Busy -> sleep/yield for 2 ms (totaling 5 ms of sleep) /// 5. Step through query -> returns Busy -> return Busy to user - pub fn busy_timeout(&self, duration: Option) -> Result<()> { + pub fn busy_timeout(&self, duration: std::time::Duration) -> Result<()> { let conn = self .inner .lock() diff --git a/core/lib.rs b/core/lib.rs index 2a03ec045..7dd615b00 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -486,7 +486,7 @@ impl Database { encryption_cipher_mode: Cell::new(None), sync_mode: Cell::new(SyncMode::Full), data_sync_retry: Cell::new(false), - busy_timeout: Cell::new(None), + busy_timeout: Cell::new(Duration::new(0, 0)), }); self.n_connections .fetch_add(1, std::sync::atomic::Ordering::Relaxed); @@ -978,7 +978,8 @@ pub struct Connection { sync_mode: Cell, data_sync_retry: Cell, /// User defined max accumulated Busy timeout duration - busy_timeout: Cell>, + /// Default is 0 (no timeout) + busy_timeout: Cell, } impl Drop for Connection { @@ -2117,22 +2118,16 @@ impl Connection { /// 5. Step through query -> returns Busy -> return Busy to user /// /// This slight api change demonstrated a better throughtput in `perf/throughput/turso` benchmark - pub fn busy_timeout(&self, mut duration: Option) { - duration = duration.filter(|duration| !duration.is_zero()); + pub fn busy_timeout(&self, duration: std::time::Duration) { self.busy_timeout.set(duration); } } -#[derive(Debug, Default)] +#[derive(Debug)] struct BusyTimeout { /// Busy timeout instant - timeout: Option, - /// Max duration of timeout set by Connection - max_duration: Duration, - /// Accumulated duration for busy timeout - /// - /// It will be decremented until it reaches 0, then after that no timeout will be emitted - accum_duration: Duration, + timeout: Instant, + /// Next iteration index for DELAYS iteration: usize, } @@ -2152,30 +2147,48 @@ impl BusyTimeout { Duration::from_millis(100), ]; - pub fn new(duration: std::time::Duration) -> Self { + const TOTALS: [std::time::Duration; 12] = [ + Duration::from_millis(0), + Duration::from_millis(1), + Duration::from_millis(3), + Duration::from_millis(8), + Duration::from_millis(18), + Duration::from_millis(33), + Duration::from_millis(53), + Duration::from_millis(78), + Duration::from_millis(103), + Duration::from_millis(128), + Duration::from_millis(178), + Duration::from_millis(228), + ]; + + pub fn new(now: Instant) -> Self { Self { - timeout: None, - max_duration: duration, + timeout: now, iteration: 0, - accum_duration: duration, } } - pub fn initiate_timeout(&mut self, now: Instant) { - self.timeout = Self::DELAYS.get(self.iteration).and_then(|delay| { - if self.accum_duration.is_zero() { - None - } else { - let new_timeout = now + (*delay).min(self.accum_duration); - self.accum_duration = self.accum_duration.saturating_sub(*delay); - Some(new_timeout) + // implementation of sqliteDefaultBusyCallback + pub fn busy_callback(&mut self, now: Instant, max_duration: Duration) { + let idx = self.iteration.min(11); + let mut delay = Self::DELAYS[idx]; + let mut prior = Self::TOTALS[idx]; + + if self.iteration >= 12 { + prior += delay * (self.iteration as u32 - 11); + } + + if prior + delay > max_duration { + delay = max_duration.saturating_sub(prior); + // no more waiting after this + if delay.is_zero() { + return; } - }); - self.iteration = if self.iteration < Self::DELAYS.len() - 1 { - self.iteration + 1 - } else { - self.iteration - }; + } + + self.iteration += 1; + self.timeout = now + delay; } } @@ -2193,7 +2206,7 @@ pub struct Statement { /// Flag to show if the statement was busy busy: bool, /// Busy timeout instant - busy_timeout: Option, + busy_timeout: BusyTimeout, } impl Statement { @@ -2210,6 +2223,7 @@ impl Statement { QueryMode::ExplainQueryPlan => (EXPLAIN_QUERY_PLAN_COLUMNS.len(), 0), }; let state = vdbe::ProgramState::new(max_registers, cursor_count); + let now = pager.io.now(); Self { program, state, @@ -2218,7 +2232,7 @@ impl Statement { accesses_db, query_mode, busy: false, - busy_timeout: None, + busy_timeout: BusyTimeout::new(now), } } pub fn get_query_mode(&self) -> QueryMode { @@ -2238,16 +2252,9 @@ impl Statement { } pub fn step(&mut self) -> Result { - if let Some(busy_timeout) = self.busy_timeout.as_mut() { - if let Some(timeout) = busy_timeout.timeout { - let now = self.pager.io.now(); - - if now < timeout { - // Yield the query as the timeout has not been reached yet - return Ok(StepResult::IO); - } - // Timeout ended now continue to query execution - } + if self.pager.io.now() < self.busy_timeout.timeout { + // Yield the query as the timeout has not been reached yet + return Ok(StepResult::IO); } let mut res = if !self.accesses_db { @@ -2292,14 +2299,13 @@ impl Statement { } if matches!(res, Ok(StepResult::Busy)) { - self.check_if_busy_handler_set(); - if let Some(busy_timeout) = self.busy_timeout.as_mut() { - busy_timeout.initiate_timeout(self.pager.io.now()); - if busy_timeout.timeout.is_some() { - // Yield instead of busy, as now we will try to wait for the timeout - // before continuing execution - res = Ok(StepResult::IO); - } + let now = self.pager.io.now(); + self.busy_timeout + .busy_callback(now, self.program.connection.busy_timeout.get()); + if now < self.busy_timeout.timeout { + // Yield instead of busy, as now we will try to wait for the timeout + // before continuing execution + res = Ok(StepResult::IO); } } @@ -2473,7 +2479,7 @@ impl Statement { pub fn _reset(&mut self, max_registers: Option, max_cursors: Option) { self.state.reset(max_registers, max_cursors); self.busy = false; - self.check_if_busy_handler_set(); + self.busy_timeout = BusyTimeout::new(self.pager.io.now()); } pub fn row(&self) -> Option<&Row> { @@ -2487,30 +2493,6 @@ impl Statement { pub fn is_busy(&self) -> bool { self.busy } - - /// Checks if the busy handler is set in the connection and sets the handler if needed - fn check_if_busy_handler_set(&mut self) { - let conn_busy_timeout = self - .program - .connection - .busy_timeout - .get() - .map(BusyTimeout::new); - if self.busy_timeout.is_none() { - self.busy_timeout = conn_busy_timeout; - return; - } - if let Some(conn_busy_timeout) = conn_busy_timeout { - let busy_timeout = self - .busy_timeout - .as_mut() - .expect("busy timeout was checked for None above"); - // User changed max duration, so clear previous handler and set a new one - if busy_timeout.max_duration != conn_busy_timeout.max_duration { - *busy_timeout = conn_busy_timeout; - } - } - } } pub type Row = vdbe::Row; diff --git a/perf/throughput/turso/src/main.rs b/perf/throughput/turso/src/main.rs index 61bd35ed0..e881536b5 100644 --- a/perf/throughput/turso/src/main.rs +++ b/perf/throughput/turso/src/main.rs @@ -168,7 +168,7 @@ async fn worker_thread( for iteration in 0..iterations { let conn = db.connect()?; - conn.busy_timeout(Some(timeout))?; + conn.busy_timeout(timeout)?; let total_inserts = Arc::clone(&total_inserts); let tx_fut = async move { let mut stmt = conn From 597314f1cfb647347bc58c5f271c97ecbb6d582d Mon Sep 17 00:00:00 2001 From: TcMits Date: Tue, 16 Sep 2025 19:21:04 +0700 Subject: [PATCH 2/3] perf --- core/lib.rs | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 7dd615b00..201ab2a46 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -2206,7 +2206,7 @@ pub struct Statement { /// Flag to show if the statement was busy busy: bool, /// Busy timeout instant - busy_timeout: BusyTimeout, + busy_timeout: Option, } impl Statement { @@ -2223,7 +2223,6 @@ impl Statement { QueryMode::ExplainQueryPlan => (EXPLAIN_QUERY_PLAN_COLUMNS.len(), 0), }; let state = vdbe::ProgramState::new(max_registers, cursor_count); - let now = pager.io.now(); Self { program, state, @@ -2232,7 +2231,7 @@ impl Statement { accesses_db, query_mode, busy: false, - busy_timeout: BusyTimeout::new(now), + busy_timeout: None, } } pub fn get_query_mode(&self) -> QueryMode { @@ -2252,9 +2251,11 @@ impl Statement { } pub fn step(&mut self) -> Result { - if self.pager.io.now() < self.busy_timeout.timeout { - // Yield the query as the timeout has not been reached yet - return Ok(StepResult::IO); + if let Some(busy_timeout) = self.busy_timeout.as_ref() { + if self.pager.io.now() < busy_timeout.timeout { + // Yield the query as the timeout has not been reached yet + return Ok(StepResult::IO); + } } let mut res = if !self.accesses_db { @@ -2300,11 +2301,20 @@ impl Statement { if matches!(res, Ok(StepResult::Busy)) { let now = self.pager.io.now(); - self.busy_timeout - .busy_callback(now, self.program.connection.busy_timeout.get()); - if now < self.busy_timeout.timeout { - // Yield instead of busy, as now we will try to wait for the timeout - // before continuing execution + let max_duration = self.program.connection.busy_timeout.get(); + self.busy_timeout = match self.busy_timeout.take() { + None => { + let mut result = BusyTimeout::new(now); + result.busy_callback(now, max_duration); + Some(result) + } + Some(mut bt) => { + bt.busy_callback(now, max_duration); + Some(bt) + } + }; + + if now < self.busy_timeout.as_ref().unwrap().timeout { res = Ok(StepResult::IO); } } @@ -2479,7 +2489,7 @@ impl Statement { pub fn _reset(&mut self, max_registers: Option, max_cursors: Option) { self.state.reset(max_registers, max_cursors); self.busy = false; - self.busy_timeout = BusyTimeout::new(self.pager.io.now()); + self.busy_timeout = None; } pub fn row(&self) -> Option<&Row> { From 226dd5cbe0c78c2be1d1078df3320dc5b846a273 Mon Sep 17 00:00:00 2001 From: TcMits Date: Tue, 16 Sep 2025 20:00:04 +0700 Subject: [PATCH 3/3] add commet --- core/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/core/lib.rs b/core/lib.rs index 201ab2a46..c0fc4bb1e 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -2206,6 +2206,7 @@ pub struct Statement { /// Flag to show if the statement was busy busy: bool, /// Busy timeout instant + /// We need Option here because `io.now()` is not a cheap call busy_timeout: Option, }