From e93ac38e551f8527c084b177e802ed5ba301f2f2 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 19 Dec 2024 11:41:07 +0200 Subject: [PATCH] Add statement interruption support This adds an interrupt() method to Statement that allows apps to interrupt a running statement. Please note that this is different from `sqlite3_interrupt()` which interrupts all ongoing operations in a database. Although we want to support that too, per statement interrupt is much more useful to apps. --- bindings/python/src/lib.rs | 6 ++++++ bindings/wasm/lib.rs | 5 ++++- cli/app.rs | 4 ++++ core/benches/benchmark.rs | 9 +++++++++ core/lib.rs | 6 ++++++ core/util.rs | 1 + core/vdbe/mod.rs | 14 ++++++++++++++ simulator/main.rs | 3 +++ sqlite3/src/lib.rs | 2 ++ test/src/lib.rs | 7 +++++++ 10 files changed, 56 insertions(+), 1 deletion(-) diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index 89ce26603..aca225304 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -134,6 +134,9 @@ impl Cursor { PyErr::new::(format!("IO error: {:?}", e)) })?; } + limbo_core::RowResult::Interrupt => { + return Ok(None); + } limbo_core::RowResult::Done => { return Ok(None); } @@ -165,6 +168,9 @@ impl Cursor { PyErr::new::(format!("IO error: {:?}", e)) })?; } + limbo_core::RowResult::Interrupt => { + return Ok(results); + } limbo_core::RowResult::Done => { return Ok(results); } diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index c456cb617..f4c02a8e0 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -83,7 +83,9 @@ impl Statement { } JsValue::from(row_array) } - Ok(limbo_core::RowResult::IO) | Ok(limbo_core::RowResult::Done) => JsValue::UNDEFINED, + Ok(limbo_core::RowResult::IO) + | Ok(limbo_core::RowResult::Done) + | Ok(limbo_core::RowResult::Interrupt) => JsValue::UNDEFINED, Err(e) => panic!("Error: {:?}", e), } } @@ -101,6 +103,7 @@ impl Statement { array.push(&row_array); } Ok(limbo_core::RowResult::IO) => {} + Ok(limbo_core::RowResult::Interrupt) => break, Ok(limbo_core::RowResult::Done) => break, Err(e) => panic!("Error: {:?}", e), } diff --git a/cli/app.rs b/cli/app.rs index a6bc003a9..34ab20481 100644 --- a/cli/app.rs +++ b/cli/app.rs @@ -521,6 +521,7 @@ impl Limbo { Ok(RowResult::IO) => { self.io.run_once()?; } + Ok(RowResult::Interrupt) => break, Ok(RowResult::Done) => { break; } @@ -557,6 +558,7 @@ impl Limbo { Ok(RowResult::IO) => { self.io.run_once()?; } + Ok(RowResult::Interrupt) => break, Ok(RowResult::Done) => break, Err(err) => { let _ = self.write_fmt(format_args!("{}", err)); @@ -606,6 +608,7 @@ impl Limbo { RowResult::IO => { self.io.run_once()?; } + RowResult::Interrupt => break, RowResult::Done => break, } } @@ -658,6 +661,7 @@ impl Limbo { RowResult::IO => { self.io.run_once()?; } + RowResult::Interrupt => break, RowResult::Done => break, } } diff --git a/core/benches/benchmark.rs b/core/benches/benchmark.rs index cc2093a59..ab1b0aa9a 100644 --- a/core/benches/benchmark.rs +++ b/core/benches/benchmark.rs @@ -46,6 +46,9 @@ fn limbo_bench(criterion: &mut Criterion) { limbo_core::RowResult::IO => { io.run_once().unwrap(); } + limbo_core::RowResult::Interrupt => { + unreachable!(); + } limbo_core::RowResult::Done => { unreachable!(); } @@ -68,6 +71,9 @@ fn limbo_bench(criterion: &mut Criterion) { limbo_core::RowResult::IO => { io.run_once().unwrap(); } + limbo_core::RowResult::Interrupt => { + unreachable!(); + } limbo_core::RowResult::Done => { unreachable!(); } @@ -91,6 +97,9 @@ fn limbo_bench(criterion: &mut Criterion) { limbo_core::RowResult::IO => { io.run_once().unwrap(); } + limbo_core::RowResult::Interrupt => { + unreachable!(); + } limbo_core::RowResult::Done => { unreachable!(); } diff --git a/core/lib.rs b/core/lib.rs index e80406c72..1f5668d76 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -367,12 +367,17 @@ impl Statement { } } + pub fn interrupt(&mut self) { + self.state.interrupt(); + } + pub fn step(&mut self) -> Result> { let result = self.program.step(&mut self.state, self.pager.clone())?; match result { vdbe::StepResult::Row(row) => Ok(RowResult::Row(Row { values: row.values })), vdbe::StepResult::IO => Ok(RowResult::IO), vdbe::StepResult::Done => Ok(RowResult::Done), + vdbe::StepResult::Interrupt => Ok(RowResult::Interrupt), } } @@ -388,6 +393,7 @@ pub enum RowResult<'a> { Row(Row<'a>), IO, Done, + Interrupt, } pub struct Row<'a> { diff --git a/core/util.rs b/core/util.rs index 4b8a7f43b..66dd94789 100644 --- a/core/util.rs +++ b/core/util.rs @@ -51,6 +51,7 @@ pub fn parse_schema_rows(rows: Option, schema: &mut Schema, io: Arc break, RowResult::Done => break, } } diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 069782c9a..6d45eeaec 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -554,6 +554,7 @@ pub enum StepResult<'a> { Done, IO, Row(Record<'a>), + Interrupt, } /// If there is I/O, the instruction is restarted. @@ -589,6 +590,7 @@ pub struct ProgramState { deferred_seek: Option<(CursorID, CursorID)>, ended_coroutine: bool, // flag to notify yield coroutine finished regex_cache: RegexCache, + interrupted: bool, } impl ProgramState { @@ -604,6 +606,7 @@ impl ProgramState { deferred_seek: None, ended_coroutine: false, regex_cache: RegexCache::new(), + interrupted: false, } } @@ -614,6 +617,14 @@ impl ProgramState { pub fn column(&self, i: usize) -> Option { Some(format!("{:?}", self.registers[i])) } + + pub fn interrupt(&mut self) { + self.interrupted = true; + } + + pub fn is_interrupted(&self) -> bool { + self.interrupted + } } #[derive(Debug)] @@ -652,6 +663,9 @@ impl Program { pager: Rc, ) -> Result> { loop { + if state.is_interrupted() { + return Ok(StepResult::Interrupt); + } let insn = &self.insns[state.pc as usize]; trace_insn(self, state.pc as InsnReference, insn); let mut cursors = state.cursors.borrow_mut(); diff --git a/simulator/main.rs b/simulator/main.rs index b7af9854e..fc485132c 100644 --- a/simulator/main.rs +++ b/simulator/main.rs @@ -355,6 +355,9 @@ fn get_all_rows( break 'rows_loop; } } + RowResult::Interrupt => { + break; + } RowResult::Done => { break; } diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index 6a86a04fe..bfe990eb9 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -19,6 +19,7 @@ pub const SQLITE_ERROR: ffi::c_int = 1; pub const SQLITE_ABORT: ffi::c_int = 4; pub const SQLITE_BUSY: ffi::c_int = 5; pub const SQLITE_NOMEM: ffi::c_int = 7; +pub const SQLITE_INTERRUPT: ffi::c_int = 9; pub const SQLITE_NOTFOUND: ffi::c_int = 14; pub const SQLITE_MISUSE: ffi::c_int = 21; pub const SQLITE_ROW: ffi::c_int = 100; @@ -235,6 +236,7 @@ pub unsafe extern "C" fn sqlite3_step(stmt: *mut sqlite3_stmt) -> std::ffi::c_in match result { limbo_core::RowResult::IO => SQLITE_BUSY, limbo_core::RowResult::Done => SQLITE_DONE, + limbo_core::RowResult::Interrupt => SQLITE_INTERRUPT, limbo_core::RowResult::Row(row) => { stmt.row.replace(Some(row)); SQLITE_ROW diff --git a/test/src/lib.rs b/test/src/lib.rs index bde88a1f0..53cec37a4 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -93,6 +93,7 @@ mod tests { RowResult::IO => { tmp_db.io.run_once()?; } + RowResult::Interrupt => break, RowResult::Done => break, } }, @@ -160,6 +161,7 @@ mod tests { RowResult::IO => { tmp_db.io.run_once()?; } + RowResult::Interrupt => break, RowResult::Done => break, } }, @@ -233,6 +235,7 @@ mod tests { RowResult::IO => { tmp_db.io.run_once()?; } + RowResult::Interrupt => break, RowResult::Done => break, } }, @@ -295,6 +298,7 @@ mod tests { RowResult::IO => { tmp_db.io.run_once()?; } + RowResult::Interrupt => break, RowResult::Done => break, } }, @@ -355,6 +359,7 @@ mod tests { RowResult::IO => { tmp_db.io.run_once()?; } + RowResult::Interrupt => break, RowResult::Done => break, } } @@ -446,6 +451,7 @@ mod tests { RowResult::IO => { tmp_db.io.run_once()?; } + RowResult::Interrupt => break, RowResult::Done => break, } } @@ -479,6 +485,7 @@ mod tests { RowResult::IO => { tmp_db.io.run_once()?; } + RowResult::Interrupt => break, RowResult::Done => break, } },