fix issue 3144

This commit is contained in:
TcMits
2025-09-16 18:39:45 +07:00
parent 74331898a3
commit 96e4c5d241
3 changed files with 59 additions and 77 deletions

View File

@@ -408,7 +408,7 @@ impl Connection {
/// 3. Step through query -> returns Busy -> sleep/yield for 2 ms
/// 4. Step through query -> returns Busy -> sleep/yield for 2 ms (totaling 5 ms of sleep)
/// 5. Step through query -> returns Busy -> return Busy to user
pub fn busy_timeout(&self, duration: Option<std::time::Duration>) -> Result<()> {
pub fn busy_timeout(&self, duration: std::time::Duration) -> Result<()> {
let conn = self
.inner
.lock()

View File

@@ -486,7 +486,7 @@ impl Database {
encryption_cipher_mode: Cell::new(None),
sync_mode: Cell::new(SyncMode::Full),
data_sync_retry: Cell::new(false),
busy_timeout: Cell::new(None),
busy_timeout: Cell::new(Duration::new(0, 0)),
});
self.n_connections
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
@@ -978,7 +978,8 @@ pub struct Connection {
sync_mode: Cell<SyncMode>,
data_sync_retry: Cell<bool>,
/// User defined max accumulated Busy timeout duration
busy_timeout: Cell<Option<std::time::Duration>>,
/// Default is 0 (no timeout)
busy_timeout: Cell<std::time::Duration>,
}
impl Drop for Connection {
@@ -2117,22 +2118,16 @@ impl Connection {
/// 5. Step through query -> returns Busy -> return Busy to user
///
/// This slight api change demonstrated a better throughtput in `perf/throughput/turso` benchmark
pub fn busy_timeout(&self, mut duration: Option<std::time::Duration>) {
duration = duration.filter(|duration| !duration.is_zero());
pub fn busy_timeout(&self, duration: std::time::Duration) {
self.busy_timeout.set(duration);
}
}
#[derive(Debug, Default)]
#[derive(Debug)]
struct BusyTimeout {
/// Busy timeout instant
timeout: Option<Instant>,
/// Max duration of timeout set by Connection
max_duration: Duration,
/// Accumulated duration for busy timeout
///
/// It will be decremented until it reaches 0, then after that no timeout will be emitted
accum_duration: Duration,
timeout: Instant,
/// Next iteration index for DELAYS
iteration: usize,
}
@@ -2152,30 +2147,48 @@ impl BusyTimeout {
Duration::from_millis(100),
];
pub fn new(duration: std::time::Duration) -> Self {
const TOTALS: [std::time::Duration; 12] = [
Duration::from_millis(0),
Duration::from_millis(1),
Duration::from_millis(3),
Duration::from_millis(8),
Duration::from_millis(18),
Duration::from_millis(33),
Duration::from_millis(53),
Duration::from_millis(78),
Duration::from_millis(103),
Duration::from_millis(128),
Duration::from_millis(178),
Duration::from_millis(228),
];
pub fn new(now: Instant) -> Self {
Self {
timeout: None,
max_duration: duration,
timeout: now,
iteration: 0,
accum_duration: duration,
}
}
pub fn initiate_timeout(&mut self, now: Instant) {
self.timeout = Self::DELAYS.get(self.iteration).and_then(|delay| {
if self.accum_duration.is_zero() {
None
} else {
let new_timeout = now + (*delay).min(self.accum_duration);
self.accum_duration = self.accum_duration.saturating_sub(*delay);
Some(new_timeout)
// implementation of sqliteDefaultBusyCallback
pub fn busy_callback(&mut self, now: Instant, max_duration: Duration) {
let idx = self.iteration.min(11);
let mut delay = Self::DELAYS[idx];
let mut prior = Self::TOTALS[idx];
if self.iteration >= 12 {
prior += delay * (self.iteration as u32 - 11);
}
if prior + delay > max_duration {
delay = max_duration.saturating_sub(prior);
// no more waiting after this
if delay.is_zero() {
return;
}
});
self.iteration = if self.iteration < Self::DELAYS.len() - 1 {
self.iteration + 1
} else {
self.iteration
};
}
self.iteration += 1;
self.timeout = now + delay;
}
}
@@ -2193,7 +2206,7 @@ pub struct Statement {
/// Flag to show if the statement was busy
busy: bool,
/// Busy timeout instant
busy_timeout: Option<BusyTimeout>,
busy_timeout: BusyTimeout,
}
impl Statement {
@@ -2210,6 +2223,7 @@ impl Statement {
QueryMode::ExplainQueryPlan => (EXPLAIN_QUERY_PLAN_COLUMNS.len(), 0),
};
let state = vdbe::ProgramState::new(max_registers, cursor_count);
let now = pager.io.now();
Self {
program,
state,
@@ -2218,7 +2232,7 @@ impl Statement {
accesses_db,
query_mode,
busy: false,
busy_timeout: None,
busy_timeout: BusyTimeout::new(now),
}
}
pub fn get_query_mode(&self) -> QueryMode {
@@ -2238,16 +2252,9 @@ impl Statement {
}
pub fn step(&mut self) -> Result<StepResult> {
if let Some(busy_timeout) = self.busy_timeout.as_mut() {
if let Some(timeout) = busy_timeout.timeout {
let now = self.pager.io.now();
if now < timeout {
// Yield the query as the timeout has not been reached yet
return Ok(StepResult::IO);
}
// Timeout ended now continue to query execution
}
if self.pager.io.now() < self.busy_timeout.timeout {
// Yield the query as the timeout has not been reached yet
return Ok(StepResult::IO);
}
let mut res = if !self.accesses_db {
@@ -2292,14 +2299,13 @@ impl Statement {
}
if matches!(res, Ok(StepResult::Busy)) {
self.check_if_busy_handler_set();
if let Some(busy_timeout) = self.busy_timeout.as_mut() {
busy_timeout.initiate_timeout(self.pager.io.now());
if busy_timeout.timeout.is_some() {
// Yield instead of busy, as now we will try to wait for the timeout
// before continuing execution
res = Ok(StepResult::IO);
}
let now = self.pager.io.now();
self.busy_timeout
.busy_callback(now, self.program.connection.busy_timeout.get());
if now < self.busy_timeout.timeout {
// Yield instead of busy, as now we will try to wait for the timeout
// before continuing execution
res = Ok(StepResult::IO);
}
}
@@ -2473,7 +2479,7 @@ impl Statement {
pub fn _reset(&mut self, max_registers: Option<usize>, max_cursors: Option<usize>) {
self.state.reset(max_registers, max_cursors);
self.busy = false;
self.check_if_busy_handler_set();
self.busy_timeout = BusyTimeout::new(self.pager.io.now());
}
pub fn row(&self) -> Option<&Row> {
@@ -2487,30 +2493,6 @@ impl Statement {
pub fn is_busy(&self) -> bool {
self.busy
}
/// Checks if the busy handler is set in the connection and sets the handler if needed
fn check_if_busy_handler_set(&mut self) {
let conn_busy_timeout = self
.program
.connection
.busy_timeout
.get()
.map(BusyTimeout::new);
if self.busy_timeout.is_none() {
self.busy_timeout = conn_busy_timeout;
return;
}
if let Some(conn_busy_timeout) = conn_busy_timeout {
let busy_timeout = self
.busy_timeout
.as_mut()
.expect("busy timeout was checked for None above");
// User changed max duration, so clear previous handler and set a new one
if busy_timeout.max_duration != conn_busy_timeout.max_duration {
*busy_timeout = conn_busy_timeout;
}
}
}
}
pub type Row = vdbe::Row;

View File

@@ -168,7 +168,7 @@ async fn worker_thread(
for iteration in 0..iterations {
let conn = db.connect()?;
conn.busy_timeout(Some(timeout))?;
conn.busy_timeout(timeout)?;
let total_inserts = Arc::clone(&total_inserts);
let tx_fut = async move {
let mut stmt = conn