mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-04 00:44:19 +01:00
Merge 'Fix busy handler' from Lâm Hoàng Phúc
@penberg i think it fixed #3144, but i got locked database error ```sh Running write throughput benchmark with 5 threads, 1000 batch size, 1000 iterations, mode: Legacy Database created at: write_throughput_test.db Thread error 0: SQL execution failure: `database is locked` Thread 1: 1000000 inserts in 514.45s (1943.82 inserts/sec) Error: SqlExecutionFailure("database is locked") ``` Closes #3147
This commit is contained in:
@@ -408,7 +408,7 @@ impl Connection {
|
||||
/// 3. Step through query -> returns Busy -> sleep/yield for 2 ms
|
||||
/// 4. Step through query -> returns Busy -> sleep/yield for 2 ms (totaling 5 ms of sleep)
|
||||
/// 5. Step through query -> returns Busy -> return Busy to user
|
||||
pub fn busy_timeout(&self, duration: Option<std::time::Duration>) -> Result<()> {
|
||||
pub fn busy_timeout(&self, duration: std::time::Duration) -> Result<()> {
|
||||
let conn = self
|
||||
.inner
|
||||
.lock()
|
||||
|
||||
137
core/lib.rs
137
core/lib.rs
@@ -514,7 +514,7 @@ impl Database {
|
||||
encryption_cipher_mode: Cell::new(None),
|
||||
sync_mode: Cell::new(SyncMode::Full),
|
||||
data_sync_retry: Cell::new(false),
|
||||
busy_timeout: Cell::new(None),
|
||||
busy_timeout: Cell::new(Duration::new(0, 0)),
|
||||
});
|
||||
self.n_connections
|
||||
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
||||
@@ -1007,7 +1007,8 @@ pub struct Connection {
|
||||
sync_mode: Cell<SyncMode>,
|
||||
data_sync_retry: Cell<bool>,
|
||||
/// User defined max accumulated Busy timeout duration
|
||||
busy_timeout: Cell<Option<std::time::Duration>>,
|
||||
/// Default is 0 (no timeout)
|
||||
busy_timeout: Cell<std::time::Duration>,
|
||||
}
|
||||
|
||||
impl Drop for Connection {
|
||||
@@ -2174,26 +2175,20 @@ impl Connection {
|
||||
/// 5. Step through query -> returns Busy -> return Busy to user
|
||||
///
|
||||
/// This slight api change demonstrated a better throughtput in `perf/throughput/turso` benchmark
|
||||
pub fn set_busy_timeout(&self, mut duration: Option<std::time::Duration>) {
|
||||
duration = duration.filter(|duration| !duration.is_zero());
|
||||
pub fn set_busy_timeout(&self, duration: std::time::Duration) {
|
||||
self.busy_timeout.set(duration);
|
||||
}
|
||||
|
||||
pub fn get_busy_timeout(&self) -> Option<std::time::Duration> {
|
||||
pub fn get_busy_timeout(&self) -> std::time::Duration {
|
||||
self.busy_timeout.get()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
#[derive(Debug)]
|
||||
struct BusyTimeout {
|
||||
/// Busy timeout instant
|
||||
timeout: Option<Instant>,
|
||||
/// Max duration of timeout set by Connection
|
||||
max_duration: Duration,
|
||||
/// Accumulated duration for busy timeout
|
||||
///
|
||||
/// It will be decremented until it reaches 0, then after that no timeout will be emitted
|
||||
accum_duration: Duration,
|
||||
timeout: Instant,
|
||||
/// Next iteration index for DELAYS
|
||||
iteration: usize,
|
||||
}
|
||||
|
||||
@@ -2213,30 +2208,48 @@ impl BusyTimeout {
|
||||
Duration::from_millis(100),
|
||||
];
|
||||
|
||||
pub fn new(duration: std::time::Duration) -> Self {
|
||||
const TOTALS: [std::time::Duration; 12] = [
|
||||
Duration::from_millis(0),
|
||||
Duration::from_millis(1),
|
||||
Duration::from_millis(3),
|
||||
Duration::from_millis(8),
|
||||
Duration::from_millis(18),
|
||||
Duration::from_millis(33),
|
||||
Duration::from_millis(53),
|
||||
Duration::from_millis(78),
|
||||
Duration::from_millis(103),
|
||||
Duration::from_millis(128),
|
||||
Duration::from_millis(178),
|
||||
Duration::from_millis(228),
|
||||
];
|
||||
|
||||
pub fn new(now: Instant) -> Self {
|
||||
Self {
|
||||
timeout: None,
|
||||
max_duration: duration,
|
||||
timeout: now,
|
||||
iteration: 0,
|
||||
accum_duration: duration,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn initiate_timeout(&mut self, now: Instant) {
|
||||
self.timeout = Self::DELAYS.get(self.iteration).and_then(|delay| {
|
||||
if self.accum_duration.is_zero() {
|
||||
None
|
||||
} else {
|
||||
let new_timeout = now + (*delay).min(self.accum_duration);
|
||||
self.accum_duration = self.accum_duration.saturating_sub(*delay);
|
||||
Some(new_timeout)
|
||||
// implementation of sqliteDefaultBusyCallback
|
||||
pub fn busy_callback(&mut self, now: Instant, max_duration: Duration) {
|
||||
let idx = self.iteration.min(11);
|
||||
let mut delay = Self::DELAYS[idx];
|
||||
let mut prior = Self::TOTALS[idx];
|
||||
|
||||
if self.iteration >= 12 {
|
||||
prior += delay * (self.iteration as u32 - 11);
|
||||
}
|
||||
|
||||
if prior + delay > max_duration {
|
||||
delay = max_duration.saturating_sub(prior);
|
||||
// no more waiting after this
|
||||
if delay.is_zero() {
|
||||
return;
|
||||
}
|
||||
});
|
||||
self.iteration = if self.iteration < Self::DELAYS.len() - 1 {
|
||||
self.iteration + 1
|
||||
} else {
|
||||
self.iteration
|
||||
};
|
||||
}
|
||||
|
||||
self.iteration += 1;
|
||||
self.timeout = now + delay;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2254,6 +2267,7 @@ pub struct Statement {
|
||||
/// Flag to show if the statement was busy
|
||||
busy: bool,
|
||||
/// Busy timeout instant
|
||||
/// We need Option here because `io.now()` is not a cheap call
|
||||
busy_timeout: Option<BusyTimeout>,
|
||||
}
|
||||
|
||||
@@ -2299,15 +2313,10 @@ impl Statement {
|
||||
}
|
||||
|
||||
pub fn step(&mut self) -> Result<StepResult> {
|
||||
if let Some(busy_timeout) = self.busy_timeout.as_mut() {
|
||||
if let Some(timeout) = busy_timeout.timeout {
|
||||
let now = self.pager.io.now();
|
||||
|
||||
if now < timeout {
|
||||
// Yield the query as the timeout has not been reached yet
|
||||
return Ok(StepResult::IO);
|
||||
}
|
||||
// Timeout ended now continue to query execution
|
||||
if let Some(busy_timeout) = self.busy_timeout.as_ref() {
|
||||
if self.pager.io.now() < busy_timeout.timeout {
|
||||
// Yield the query as the timeout has not been reached yet
|
||||
return Ok(StepResult::IO);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2353,14 +2362,22 @@ impl Statement {
|
||||
}
|
||||
|
||||
if matches!(res, Ok(StepResult::Busy)) {
|
||||
self.check_if_busy_handler_set();
|
||||
if let Some(busy_timeout) = self.busy_timeout.as_mut() {
|
||||
busy_timeout.initiate_timeout(self.pager.io.now());
|
||||
if busy_timeout.timeout.is_some() {
|
||||
// Yield instead of busy, as now we will try to wait for the timeout
|
||||
// before continuing execution
|
||||
res = Ok(StepResult::IO);
|
||||
let now = self.pager.io.now();
|
||||
let max_duration = self.program.connection.busy_timeout.get();
|
||||
self.busy_timeout = match self.busy_timeout.take() {
|
||||
None => {
|
||||
let mut result = BusyTimeout::new(now);
|
||||
result.busy_callback(now, max_duration);
|
||||
Some(result)
|
||||
}
|
||||
Some(mut bt) => {
|
||||
bt.busy_callback(now, max_duration);
|
||||
Some(bt)
|
||||
}
|
||||
};
|
||||
|
||||
if now < self.busy_timeout.as_ref().unwrap().timeout {
|
||||
res = Ok(StepResult::IO);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2546,7 +2563,7 @@ impl Statement {
|
||||
pub fn _reset(&mut self, max_registers: Option<usize>, max_cursors: Option<usize>) {
|
||||
self.state.reset(max_registers, max_cursors);
|
||||
self.busy = false;
|
||||
self.check_if_busy_handler_set();
|
||||
self.busy_timeout = None;
|
||||
}
|
||||
|
||||
pub fn row(&self) -> Option<&Row> {
|
||||
@@ -2560,30 +2577,6 @@ impl Statement {
|
||||
pub fn is_busy(&self) -> bool {
|
||||
self.busy
|
||||
}
|
||||
|
||||
/// Checks if the busy handler is set in the connection and sets the handler if needed
|
||||
fn check_if_busy_handler_set(&mut self) {
|
||||
let conn_busy_timeout = self
|
||||
.program
|
||||
.connection
|
||||
.busy_timeout
|
||||
.get()
|
||||
.map(BusyTimeout::new);
|
||||
if self.busy_timeout.is_none() {
|
||||
self.busy_timeout = conn_busy_timeout;
|
||||
return;
|
||||
}
|
||||
if let Some(conn_busy_timeout) = conn_busy_timeout {
|
||||
let busy_timeout = self
|
||||
.busy_timeout
|
||||
.as_mut()
|
||||
.expect("busy timeout was checked for None above");
|
||||
// User changed max duration, so clear previous handler and set a new one
|
||||
if busy_timeout.max_duration != conn_busy_timeout.max_duration {
|
||||
*busy_timeout = conn_busy_timeout;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type Row = vdbe::Row;
|
||||
|
||||
@@ -118,9 +118,7 @@ fn update_pragma(
|
||||
_ => bail_parse_error!("expected integer, got {:?}", data),
|
||||
};
|
||||
let busy_timeout_ms = busy_timeout_ms.max(0);
|
||||
connection.set_busy_timeout(Some(std::time::Duration::from_millis(
|
||||
busy_timeout_ms as u64,
|
||||
)));
|
||||
connection.set_busy_timeout(std::time::Duration::from_millis(busy_timeout_ms as u64));
|
||||
Ok((program, TransactionMode::Write))
|
||||
}
|
||||
PragmaName::CacheSize => {
|
||||
@@ -402,13 +400,7 @@ fn query_pragma(
|
||||
Ok((program, TransactionMode::Read))
|
||||
}
|
||||
PragmaName::BusyTimeout => {
|
||||
program.emit_int(
|
||||
connection
|
||||
.get_busy_timeout()
|
||||
.map(|t| t.as_millis() as i64)
|
||||
.unwrap_or_default(),
|
||||
register,
|
||||
);
|
||||
program.emit_int(connection.get_busy_timeout().as_millis() as i64, register);
|
||||
program.emit_result_row(register, 1);
|
||||
program.add_pragma_result_column(pragma.to_string());
|
||||
Ok((program, TransactionMode::None))
|
||||
|
||||
@@ -352,7 +352,7 @@ async fn worker_thread(
|
||||
.await?;
|
||||
}
|
||||
|
||||
conn.busy_timeout(Some(timeout))?;
|
||||
conn.busy_timeout(timeout)?;
|
||||
|
||||
let mut insert_stmt = conn
|
||||
.prepare("INSERT INTO test_table (id, data) VALUES (?, ?)")
|
||||
|
||||
@@ -195,7 +195,7 @@ async fn worker_thread(
|
||||
|
||||
for iteration in 0..iterations {
|
||||
let conn = db.connect()?;
|
||||
conn.busy_timeout(Some(timeout))?;
|
||||
conn.busy_timeout(timeout)?;
|
||||
let total_inserts = Arc::clone(&total_inserts);
|
||||
let tx_fut = async move {
|
||||
let mut stmt = conn
|
||||
|
||||
Reference in New Issue
Block a user