implement Busy Handler in Turso statements

This commit is contained in:
pedrocarlo
2025-09-12 12:41:01 -03:00
parent 24c50597ad
commit a56680f79e
2 changed files with 97 additions and 1 deletions

View File

@@ -6,6 +6,10 @@ pub struct Instant {
pub micros: u32,
}
const NSEC_PER_SEC: u64 = 1_000_000_000;
const NANOS_PER_MICRO: u32 = 1_000;
const MICROS_PER_SEC: u32 = NSEC_PER_SEC as u32 / NANOS_PER_MICRO;
impl Instant {
pub fn to_system_time(self) -> SystemTime {
if self.secs >= 0 {
@@ -24,6 +28,35 @@ impl Instant {
}
}
}
pub fn checked_add_duration(&self, other: &Duration) -> Option<Instant> {
let mut secs = self.secs.checked_add_unsigned(other.as_secs())?;
// Micros calculations can't overflow because micros are <1B which fit
// in a u32.
let mut micros = other.subsec_micros() + self.micros;
if micros >= MICROS_PER_SEC {
micros -= MICROS_PER_SEC;
secs = secs.checked_add(1)?;
}
Some(Self { secs, micros })
}
pub fn checked_sub_duration(&self, other: &Duration) -> Option<Instant> {
let mut secs = self.secs.checked_sub_unsigned(other.as_secs())?;
// Similar to above, micros can't overflow.
let mut micros = self.micros as i32 - other.subsec_micros() as i32;
if micros < 0 {
micros += MICROS_PER_SEC as i32;
secs = secs.checked_sub(1)?;
}
Some(Self {
secs,
micros: micros as u32,
})
}
}
impl<T: chrono::TimeZone> From<chrono::DateTime<T>> for Instant {
@@ -35,6 +68,14 @@ impl<T: chrono::TimeZone> From<chrono::DateTime<T>> for Instant {
}
}
impl std::ops::Add<Duration> for Instant {
type Output = Instant;
fn add(self, rhs: Duration) -> Self::Output {
self.checked_add_duration(&rhs).unwrap()
}
}
pub trait Clock {
fn now(&self) -> Instant;
}

View File

@@ -75,6 +75,7 @@ use std::{
atomic::{AtomicUsize, Ordering},
Arc, LazyLock, Mutex, Weak,
},
time::Duration,
};
#[cfg(feature = "fs")]
use storage::database::DatabaseFile;
@@ -2098,6 +2099,40 @@ impl Connection {
}
}
#[derive(Debug, Default)]
struct BusyTimeout {
/// Busy timeout instant
timeout: Option<Instant>,
iteration: usize,
}
impl BusyTimeout {
const DELAYS: [std::time::Duration; 12] = [
Duration::from_millis(1),
Duration::from_millis(2),
Duration::from_millis(5),
Duration::from_millis(10),
Duration::from_millis(15),
Duration::from_millis(20),
Duration::from_millis(25),
Duration::from_millis(25),
Duration::from_millis(25),
Duration::from_millis(50),
Duration::from_millis(50),
Duration::from_millis(100),
];
pub fn timeout(&self) -> Option<Instant> {
self.timeout
}
/// Modifies in place the next timeout instant
pub fn next_timeout(&mut self, now: Instant) {
self.iteration = self.iteration.saturating_add(1);
self.timeout = Self::DELAYS.get(self.iteration).map(|delay| now + *delay);
}
}
pub struct Statement {
program: vdbe::Program,
state: vdbe::ProgramState,
@@ -2111,6 +2146,8 @@ pub struct Statement {
query_mode: QueryMode,
/// Flag to show if the statement was busy
busy: bool,
/// Busy timeout instant
busy_timeout: BusyTimeout,
}
impl Statement {
@@ -2135,6 +2172,7 @@ impl Statement {
accesses_db,
query_mode,
busy: false,
busy_timeout: BusyTimeout::default(),
}
}
pub fn get_query_mode(&self) -> QueryMode {
@@ -2154,7 +2192,16 @@ impl Statement {
}
pub fn step(&mut self) -> Result<StepResult> {
let res = if !self.accesses_db {
if let Some(instant) = self.busy_timeout.timeout() {
let now = self.pager.io.now();
if instant > now {
// Yield the query as the timeout has not been reached yet
return Ok(StepResult::IO);
}
}
let mut res = if !self.accesses_db {
self.program.step(
&mut self.state,
self.mv_store.clone(),
@@ -2195,6 +2242,14 @@ impl Statement {
self.busy = true;
}
if matches!(res, Ok(StepResult::Busy)) {
self.busy_timeout.next_timeout(self.pager.io.now());
if self.busy_timeout.timeout().is_some() {
// Yield if there is a next timeout
res = Ok(StepResult::IO);
}
}
res
}