diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index 15ae191f7..a5ffacec0 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -408,7 +408,7 @@ impl Connection { /// 3. Step through query -> returns Busy -> sleep/yield for 2 ms /// 4. Step through query -> returns Busy -> sleep/yield for 2 ms (totaling 5 ms of sleep) /// 5. Step through query -> returns Busy -> return Busy to user - pub fn busy_timeout(&self, duration: Option) -> Result<()> { + pub fn busy_timeout(&self, duration: std::time::Duration) -> Result<()> { let conn = self .inner .lock() diff --git a/core/lib.rs b/core/lib.rs index 79999a7bb..f7d0421bb 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -514,7 +514,7 @@ impl Database { encryption_cipher_mode: Cell::new(None), sync_mode: Cell::new(SyncMode::Full), data_sync_retry: Cell::new(false), - busy_timeout: Cell::new(None), + busy_timeout: Cell::new(Duration::new(0, 0)), }); self.n_connections .fetch_add(1, std::sync::atomic::Ordering::SeqCst); @@ -1007,7 +1007,8 @@ pub struct Connection { sync_mode: Cell, data_sync_retry: Cell, /// User defined max accumulated Busy timeout duration - busy_timeout: Cell>, + /// Default is 0 (no timeout) + busy_timeout: Cell, } impl Drop for Connection { @@ -2174,26 +2175,20 @@ impl Connection { /// 5. Step through query -> returns Busy -> return Busy to user /// /// This slight api change demonstrated a better throughtput in `perf/throughput/turso` benchmark - pub fn set_busy_timeout(&self, mut duration: Option) { - duration = duration.filter(|duration| !duration.is_zero()); + pub fn set_busy_timeout(&self, duration: std::time::Duration) { self.busy_timeout.set(duration); } - pub fn get_busy_timeout(&self) -> Option { + pub fn get_busy_timeout(&self) -> std::time::Duration { self.busy_timeout.get() } } -#[derive(Debug, Default)] +#[derive(Debug)] struct BusyTimeout { /// Busy timeout instant - timeout: Option, - /// Max duration of timeout set by Connection - max_duration: Duration, - /// Accumulated duration for busy timeout - /// - /// It will be decremented until it reaches 0, then after that no timeout will be emitted - accum_duration: Duration, + timeout: Instant, + /// Next iteration index for DELAYS iteration: usize, } @@ -2213,30 +2208,48 @@ impl BusyTimeout { Duration::from_millis(100), ]; - pub fn new(duration: std::time::Duration) -> Self { + const TOTALS: [std::time::Duration; 12] = [ + Duration::from_millis(0), + Duration::from_millis(1), + Duration::from_millis(3), + Duration::from_millis(8), + Duration::from_millis(18), + Duration::from_millis(33), + Duration::from_millis(53), + Duration::from_millis(78), + Duration::from_millis(103), + Duration::from_millis(128), + Duration::from_millis(178), + Duration::from_millis(228), + ]; + + pub fn new(now: Instant) -> Self { Self { - timeout: None, - max_duration: duration, + timeout: now, iteration: 0, - accum_duration: duration, } } - pub fn initiate_timeout(&mut self, now: Instant) { - self.timeout = Self::DELAYS.get(self.iteration).and_then(|delay| { - if self.accum_duration.is_zero() { - None - } else { - let new_timeout = now + (*delay).min(self.accum_duration); - self.accum_duration = self.accum_duration.saturating_sub(*delay); - Some(new_timeout) + // implementation of sqliteDefaultBusyCallback + pub fn busy_callback(&mut self, now: Instant, max_duration: Duration) { + let idx = self.iteration.min(11); + let mut delay = Self::DELAYS[idx]; + let mut prior = Self::TOTALS[idx]; + + if self.iteration >= 12 { + prior += delay * (self.iteration as u32 - 11); + } + + if prior + delay > max_duration { + delay = max_duration.saturating_sub(prior); + // no more waiting after this + if delay.is_zero() { + return; } - }); - self.iteration = if self.iteration < Self::DELAYS.len() - 1 { - self.iteration + 1 - } else { - self.iteration - }; + } + + self.iteration += 1; + self.timeout = now + delay; } } @@ -2254,6 +2267,7 @@ pub struct Statement { /// Flag to show if the statement was busy busy: bool, /// Busy timeout instant + /// We need Option here because `io.now()` is not a cheap call busy_timeout: Option, } @@ -2299,15 +2313,10 @@ impl Statement { } pub fn step(&mut self) -> Result { - if let Some(busy_timeout) = self.busy_timeout.as_mut() { - if let Some(timeout) = busy_timeout.timeout { - let now = self.pager.io.now(); - - if now < timeout { - // Yield the query as the timeout has not been reached yet - return Ok(StepResult::IO); - } - // Timeout ended now continue to query execution + if let Some(busy_timeout) = self.busy_timeout.as_ref() { + if self.pager.io.now() < busy_timeout.timeout { + // Yield the query as the timeout has not been reached yet + return Ok(StepResult::IO); } } @@ -2353,14 +2362,22 @@ impl Statement { } if matches!(res, Ok(StepResult::Busy)) { - self.check_if_busy_handler_set(); - if let Some(busy_timeout) = self.busy_timeout.as_mut() { - busy_timeout.initiate_timeout(self.pager.io.now()); - if busy_timeout.timeout.is_some() { - // Yield instead of busy, as now we will try to wait for the timeout - // before continuing execution - res = Ok(StepResult::IO); + let now = self.pager.io.now(); + let max_duration = self.program.connection.busy_timeout.get(); + self.busy_timeout = match self.busy_timeout.take() { + None => { + let mut result = BusyTimeout::new(now); + result.busy_callback(now, max_duration); + Some(result) } + Some(mut bt) => { + bt.busy_callback(now, max_duration); + Some(bt) + } + }; + + if now < self.busy_timeout.as_ref().unwrap().timeout { + res = Ok(StepResult::IO); } } @@ -2546,7 +2563,7 @@ impl Statement { pub fn _reset(&mut self, max_registers: Option, max_cursors: Option) { self.state.reset(max_registers, max_cursors); self.busy = false; - self.check_if_busy_handler_set(); + self.busy_timeout = None; } pub fn row(&self) -> Option<&Row> { @@ -2560,30 +2577,6 @@ impl Statement { pub fn is_busy(&self) -> bool { self.busy } - - /// Checks if the busy handler is set in the connection and sets the handler if needed - fn check_if_busy_handler_set(&mut self) { - let conn_busy_timeout = self - .program - .connection - .busy_timeout - .get() - .map(BusyTimeout::new); - if self.busy_timeout.is_none() { - self.busy_timeout = conn_busy_timeout; - return; - } - if let Some(conn_busy_timeout) = conn_busy_timeout { - let busy_timeout = self - .busy_timeout - .as_mut() - .expect("busy timeout was checked for None above"); - // User changed max duration, so clear previous handler and set a new one - if busy_timeout.max_duration != conn_busy_timeout.max_duration { - *busy_timeout = conn_busy_timeout; - } - } - } } pub type Row = vdbe::Row; diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index fa4274ed3..a50880611 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -118,9 +118,7 @@ fn update_pragma( _ => bail_parse_error!("expected integer, got {:?}", data), }; let busy_timeout_ms = busy_timeout_ms.max(0); - connection.set_busy_timeout(Some(std::time::Duration::from_millis( - busy_timeout_ms as u64, - ))); + connection.set_busy_timeout(std::time::Duration::from_millis(busy_timeout_ms as u64)); Ok((program, TransactionMode::Write)) } PragmaName::CacheSize => { @@ -402,13 +400,7 @@ fn query_pragma( Ok((program, TransactionMode::Read)) } PragmaName::BusyTimeout => { - program.emit_int( - connection - .get_busy_timeout() - .map(|t| t.as_millis() as i64) - .unwrap_or_default(), - register, - ); + program.emit_int(connection.get_busy_timeout().as_millis() as i64, register); program.emit_result_row(register, 1); program.add_pragma_result_column(pragma.to_string()); Ok((program, TransactionMode::None)) diff --git a/perf/encryption/src/main.rs b/perf/encryption/src/main.rs index 7736055c5..0fab31549 100644 --- a/perf/encryption/src/main.rs +++ b/perf/encryption/src/main.rs @@ -352,7 +352,7 @@ async fn worker_thread( .await?; } - conn.busy_timeout(Some(timeout))?; + conn.busy_timeout(timeout)?; let mut insert_stmt = conn .prepare("INSERT INTO test_table (id, data) VALUES (?, ?)") diff --git a/perf/throughput/turso/src/main.rs b/perf/throughput/turso/src/main.rs index d16151aac..409f68f80 100644 --- a/perf/throughput/turso/src/main.rs +++ b/perf/throughput/turso/src/main.rs @@ -195,7 +195,7 @@ async fn worker_thread( for iteration in 0..iterations { let conn = db.connect()?; - conn.busy_timeout(Some(timeout))?; + conn.busy_timeout(timeout)?; let total_inserts = Arc::clone(&total_inserts); let tx_fut = async move { let mut stmt = conn