pass optional waker to step

This commit is contained in:
pedrocarlo
2025-10-01 13:02:09 -03:00
parent e64aa5d014
commit 0d95a2924a
2 changed files with 18 additions and 3 deletions

View File

@@ -63,6 +63,7 @@ pub use io::{
};
use parking_lot::RwLock;
use schema::Schema;
use std::task::Waker;
use std::{
borrow::Cow,
cell::{Cell, RefCell},
@@ -2437,7 +2438,7 @@ impl BusyTimeout {
}
}
self.iteration += 1;
self.iteration = self.iteration.saturating_add(1);
self.timeout = now + delay;
}
}
@@ -2509,7 +2510,7 @@ impl Statement {
self.state.interrupt();
}
pub fn step(&mut self) -> Result<StepResult> {
fn _step(&mut self, waker: Option<&Waker>) -> Result<StepResult> {
if let Some(busy_timeout) = self.busy_timeout.as_ref() {
if self.pager.io.now() < busy_timeout.timeout {
// Yield the query as the timeout has not been reached yet
@@ -2523,6 +2524,7 @@ impl Statement {
self.mv_store.as_ref(),
self.pager.clone(),
self.query_mode,
waker,
)
} else {
const MAX_SCHEMA_RETRY: usize = 50;
@@ -2531,6 +2533,7 @@ impl Statement {
self.mv_store.as_ref(),
self.pager.clone(),
self.query_mode,
waker,
);
for attempt in 0..MAX_SCHEMA_RETRY {
// Only reprepare if we still need to update schema
@@ -2544,6 +2547,7 @@ impl Statement {
self.mv_store.as_ref(),
self.pager.clone(),
self.query_mode,
waker,
);
}
res
@@ -2581,6 +2585,14 @@ impl Statement {
res
}
pub fn step(&mut self) -> Result<StepResult> {
self._step(None)
}
pub fn step_with_waker(&mut self, waker: &Waker) -> Result<StepResult> {
self._step(Some(waker))
}
pub(crate) fn run_ignore_rows(&mut self) -> Result<()> {
loop {
match self.step()? {

View File

@@ -69,6 +69,7 @@ use std::{
atomic::{AtomicI64, Ordering},
Arc,
},
task::Waker,
};
use tracing::{instrument, Level};
@@ -530,9 +531,10 @@ impl Program {
mv_store: Option<&Arc<MvStore>>,
pager: Arc<Pager>,
query_mode: QueryMode,
waker: Option<&Waker>,
) -> Result<StepResult> {
match query_mode {
QueryMode::Normal => self.normal_step(state, mv_store, pager),
QueryMode::Normal => self.normal_step(state, mv_store, pager, waker),
QueryMode::Explain => self.explain_step(state, mv_store, pager),
QueryMode::ExplainQueryPlan => self.explain_query_plan_step(state, mv_store, pager),
}
@@ -645,6 +647,7 @@ impl Program {
state: &mut ProgramState,
mv_store: Option<&Arc<MvStore>>,
pager: Arc<Pager>,
waker: Option<&Waker>,
) -> Result<StepResult> {
let enable_tracing = tracing::enabled!(tracing::Level::TRACE);
loop {