From a56680f79e1deb8ad21a6153d5ccbef42c2c782e Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Fri, 12 Sep 2025 12:41:01 -0300 Subject: [PATCH] implement Busy Handler in Turso statements --- core/io/clock.rs | 41 ++++++++++++++++++++++++++++++++++ core/lib.rs | 57 +++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 97 insertions(+), 1 deletion(-) diff --git a/core/io/clock.rs b/core/io/clock.rs index d0bdfa009..12eefa030 100644 --- a/core/io/clock.rs +++ b/core/io/clock.rs @@ -6,6 +6,10 @@ pub struct Instant { pub micros: u32, } +const NSEC_PER_SEC: u64 = 1_000_000_000; +const NANOS_PER_MICRO: u32 = 1_000; +const MICROS_PER_SEC: u32 = NSEC_PER_SEC as u32 / NANOS_PER_MICRO; + impl Instant { pub fn to_system_time(self) -> SystemTime { if self.secs >= 0 { @@ -24,6 +28,35 @@ impl Instant { } } } + + pub fn checked_add_duration(&self, other: &Duration) -> Option { + let mut secs = self.secs.checked_add_unsigned(other.as_secs())?; + + // Micros calculations can't overflow because micros are <1B which fit + // in a u32. + let mut micros = other.subsec_micros() + self.micros; + if micros >= MICROS_PER_SEC { + micros -= MICROS_PER_SEC; + secs = secs.checked_add(1)?; + } + + Some(Self { secs, micros }) + } + + pub fn checked_sub_duration(&self, other: &Duration) -> Option { + let mut secs = self.secs.checked_sub_unsigned(other.as_secs())?; + + // Similar to above, micros can't overflow. + let mut micros = self.micros as i32 - other.subsec_micros() as i32; + if micros < 0 { + micros += MICROS_PER_SEC as i32; + secs = secs.checked_sub(1)?; + } + Some(Self { + secs, + micros: micros as u32, + }) + } } impl From> for Instant { @@ -35,6 +68,14 @@ impl From> for Instant { } } +impl std::ops::Add for Instant { + type Output = Instant; + + fn add(self, rhs: Duration) -> Self::Output { + self.checked_add_duration(&rhs).unwrap() + } +} + pub trait Clock { fn now(&self) -> Instant; } diff --git a/core/lib.rs b/core/lib.rs index 507316091..b26ec5657 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -75,6 +75,7 @@ use std::{ atomic::{AtomicUsize, Ordering}, Arc, LazyLock, Mutex, Weak, }, + time::Duration, }; #[cfg(feature = "fs")] use storage::database::DatabaseFile; @@ -2098,6 +2099,40 @@ impl Connection { } } +#[derive(Debug, Default)] +struct BusyTimeout { + /// Busy timeout instant + timeout: Option, + iteration: usize, +} + +impl BusyTimeout { + const DELAYS: [std::time::Duration; 12] = [ + Duration::from_millis(1), + Duration::from_millis(2), + Duration::from_millis(5), + Duration::from_millis(10), + Duration::from_millis(15), + Duration::from_millis(20), + Duration::from_millis(25), + Duration::from_millis(25), + Duration::from_millis(25), + Duration::from_millis(50), + Duration::from_millis(50), + Duration::from_millis(100), + ]; + + pub fn timeout(&self) -> Option { + self.timeout + } + + /// Modifies in place the next timeout instant + pub fn next_timeout(&mut self, now: Instant) { + self.iteration = self.iteration.saturating_add(1); + self.timeout = Self::DELAYS.get(self.iteration).map(|delay| now + *delay); + } +} + pub struct Statement { program: vdbe::Program, state: vdbe::ProgramState, @@ -2111,6 +2146,8 @@ pub struct Statement { query_mode: QueryMode, /// Flag to show if the statement was busy busy: bool, + /// Busy timeout instant + busy_timeout: BusyTimeout, } impl Statement { @@ -2135,6 +2172,7 @@ impl Statement { accesses_db, query_mode, busy: false, + busy_timeout: BusyTimeout::default(), } } pub fn get_query_mode(&self) -> QueryMode { @@ -2154,7 +2192,16 @@ impl Statement { } pub fn step(&mut self) -> Result { - let res = if !self.accesses_db { + if let Some(instant) = self.busy_timeout.timeout() { + let now = self.pager.io.now(); + + if instant > now { + // Yield the query as the timeout has not been reached yet + return Ok(StepResult::IO); + } + } + + let mut res = if !self.accesses_db { self.program.step( &mut self.state, self.mv_store.clone(), @@ -2195,6 +2242,14 @@ impl Statement { self.busy = true; } + if matches!(res, Ok(StepResult::Busy)) { + self.busy_timeout.next_timeout(self.pager.io.now()); + if self.busy_timeout.timeout().is_some() { + // Yield if there is a next timeout + res = Ok(StepResult::IO); + } + } + res }