From 0d95a2924a5c132de5ea39fc809c73d105cc50ac Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 1 Oct 2025 13:02:09 -0300 Subject: [PATCH] pass optional waker to step --- core/lib.rs | 16 ++++++++++++++-- core/vdbe/mod.rs | 5 ++++- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 2a0a558cf..d233c9c26 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -63,6 +63,7 @@ pub use io::{ }; use parking_lot::RwLock; use schema::Schema; +use std::task::Waker; use std::{ borrow::Cow, cell::{Cell, RefCell}, @@ -2437,7 +2438,7 @@ impl BusyTimeout { } } - self.iteration += 1; + self.iteration = self.iteration.saturating_add(1); self.timeout = now + delay; } } @@ -2509,7 +2510,7 @@ impl Statement { self.state.interrupt(); } - pub fn step(&mut self) -> Result { + fn _step(&mut self, waker: Option<&Waker>) -> Result { if let Some(busy_timeout) = self.busy_timeout.as_ref() { if self.pager.io.now() < busy_timeout.timeout { // Yield the query as the timeout has not been reached yet @@ -2523,6 +2524,7 @@ impl Statement { self.mv_store.as_ref(), self.pager.clone(), self.query_mode, + waker, ) } else { const MAX_SCHEMA_RETRY: usize = 50; @@ -2531,6 +2533,7 @@ impl Statement { self.mv_store.as_ref(), self.pager.clone(), self.query_mode, + waker, ); for attempt in 0..MAX_SCHEMA_RETRY { // Only reprepare if we still need to update schema @@ -2544,6 +2547,7 @@ impl Statement { self.mv_store.as_ref(), self.pager.clone(), self.query_mode, + waker, ); } res @@ -2581,6 +2585,14 @@ impl Statement { res } + pub fn step(&mut self) -> Result { + self._step(None) + } + + pub fn step_with_waker(&mut self, waker: &Waker) -> Result { + self._step(Some(waker)) + } + pub(crate) fn run_ignore_rows(&mut self) -> Result<()> { loop { match self.step()? { diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index bcb4372d5..1728058e5 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -69,6 +69,7 @@ use std::{ atomic::{AtomicI64, Ordering}, Arc, }, + task::Waker, }; use tracing::{instrument, Level}; @@ -530,9 +531,10 @@ impl Program { mv_store: Option<&Arc>, pager: Arc, query_mode: QueryMode, + waker: Option<&Waker>, ) -> Result { match query_mode { - QueryMode::Normal => self.normal_step(state, mv_store, pager), + QueryMode::Normal => self.normal_step(state, mv_store, pager, waker), QueryMode::Explain => self.explain_step(state, mv_store, pager), QueryMode::ExplainQueryPlan => self.explain_query_plan_step(state, mv_store, pager), } @@ -645,6 +647,7 @@ impl Program { state: &mut ProgramState, mv_store: Option<&Arc>, pager: Arc, + waker: Option<&Waker>, ) -> Result { let enable_tracing = tracing::enabled!(tracing::Level::TRACE); loop {