mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-31 13:54:27 +01:00
expose function to set busy timeout duration
This commit is contained in:
@@ -393,6 +393,16 @@ impl Connection {
|
||||
|
||||
Ok(conn.get_auto_commit())
|
||||
}
|
||||
|
||||
/// Sets the max timeout for the busy handler
|
||||
pub fn max_busy_timeout(&self, duration: std::time::Duration) -> Result<()> {
|
||||
let conn = self
|
||||
.inner
|
||||
.lock()
|
||||
.map_err(|e| Error::MutexError(e.to_string()))?;
|
||||
conn.max_busy_timeout(duration);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for Connection {
|
||||
|
||||
57
core/lib.rs
57
core/lib.rs
@@ -486,6 +486,7 @@ impl Database {
|
||||
encryption_cipher_mode: Cell::new(None),
|
||||
sync_mode: Cell::new(SyncMode::Full),
|
||||
data_sync_retry: Cell::new(false),
|
||||
max_busy_timeout: Cell::new(None),
|
||||
});
|
||||
self.n_connections
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
@@ -976,6 +977,8 @@ pub struct Connection {
|
||||
encryption_cipher_mode: Cell<Option<CipherMode>>,
|
||||
sync_mode: Cell<SyncMode>,
|
||||
data_sync_retry: Cell<bool>,
|
||||
/// User defined max Busy timeout duration
|
||||
max_busy_timeout: Cell<Option<std::time::Duration>>,
|
||||
}
|
||||
|
||||
impl Drop for Connection {
|
||||
@@ -2097,12 +2100,18 @@ impl Connection {
|
||||
}
|
||||
pager.set_encryption_context(cipher_mode, key)
|
||||
}
|
||||
|
||||
pub fn max_busy_timeout(&self, duration: std::time::Duration) {
|
||||
self.max_busy_timeout.set(Some(duration));
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct BusyTimeout {
|
||||
/// Busy timeout instant
|
||||
timeout: Option<Instant>,
|
||||
/// Max duration for busy timeout
|
||||
max_duration: Duration,
|
||||
iteration: usize,
|
||||
}
|
||||
|
||||
@@ -2122,14 +2131,19 @@ impl BusyTimeout {
|
||||
Duration::from_millis(100),
|
||||
];
|
||||
|
||||
pub fn timeout(&self) -> Option<Instant> {
|
||||
self.timeout
|
||||
pub fn new(duration: std::time::Duration) -> Self {
|
||||
Self {
|
||||
timeout: None,
|
||||
max_duration: duration,
|
||||
iteration: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Modifies in place the next timeout instant
|
||||
pub fn next_timeout(&mut self, now: Instant) {
|
||||
pub fn initiate_timeout(&mut self, now: Instant) {
|
||||
self.iteration = self.iteration.saturating_add(1);
|
||||
self.timeout = Self::DELAYS.get(self.iteration).map(|delay| now + *delay);
|
||||
self.timeout = Self::DELAYS
|
||||
.get(self.iteration)
|
||||
.map(|delay| now + (*delay).min(self.max_duration));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2147,7 +2161,7 @@ pub struct Statement {
|
||||
/// Flag to show if the statement was busy
|
||||
busy: bool,
|
||||
/// Busy timeout instant
|
||||
busy_timeout: BusyTimeout,
|
||||
busy_timeout: Option<BusyTimeout>,
|
||||
}
|
||||
|
||||
impl Statement {
|
||||
@@ -2164,6 +2178,11 @@ impl Statement {
|
||||
QueryMode::ExplainQueryPlan => (EXPLAIN_QUERY_PLAN_COLUMNS.len(), 0),
|
||||
};
|
||||
let state = vdbe::ProgramState::new(max_registers, cursor_count);
|
||||
let busy_timeout = program
|
||||
.connection
|
||||
.max_busy_timeout
|
||||
.get()
|
||||
.map(BusyTimeout::new);
|
||||
Self {
|
||||
program,
|
||||
state,
|
||||
@@ -2172,7 +2191,7 @@ impl Statement {
|
||||
accesses_db,
|
||||
query_mode,
|
||||
busy: false,
|
||||
busy_timeout: BusyTimeout::default(),
|
||||
busy_timeout,
|
||||
}
|
||||
}
|
||||
pub fn get_query_mode(&self) -> QueryMode {
|
||||
@@ -2192,12 +2211,15 @@ impl Statement {
|
||||
}
|
||||
|
||||
pub fn step(&mut self) -> Result<StepResult> {
|
||||
if let Some(instant) = self.busy_timeout.timeout() {
|
||||
let now = self.pager.io.now();
|
||||
if let Some(busy_timeout) = self.busy_timeout.as_mut() {
|
||||
if let Some(instant) = busy_timeout.timeout {
|
||||
let now = self.pager.io.now();
|
||||
|
||||
if instant > now {
|
||||
// Yield the query as the timeout has not been reached yet
|
||||
return Ok(StepResult::IO);
|
||||
if instant > now {
|
||||
// Yield the query as the timeout has not been reached yet
|
||||
return Ok(StepResult::IO);
|
||||
}
|
||||
// Timeout ended now continue to query execution
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2243,10 +2265,13 @@ impl Statement {
|
||||
}
|
||||
|
||||
if matches!(res, Ok(StepResult::Busy)) {
|
||||
self.busy_timeout.next_timeout(self.pager.io.now());
|
||||
if self.busy_timeout.timeout().is_some() {
|
||||
// Yield if there is a next timeout
|
||||
res = Ok(StepResult::IO);
|
||||
if let Some(busy_timeout) = self.busy_timeout.as_mut() {
|
||||
busy_timeout.initiate_timeout(self.pager.io.now());
|
||||
if busy_timeout.timeout.is_some() {
|
||||
// Yield instead of busy, as now we will try to wait for the timeout
|
||||
// before continuing execution
|
||||
res = Ok(StepResult::IO);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -155,6 +155,7 @@ async fn worker_thread(
|
||||
|
||||
for iteration in 0..iterations {
|
||||
let conn = db.connect()?;
|
||||
conn.max_busy_timeout(std::time::Duration::from_millis(10))?;
|
||||
let total_inserts = Arc::clone(&total_inserts);
|
||||
let tx_fut = async move {
|
||||
let mut stmt = conn
|
||||
|
||||
Reference in New Issue
Block a user