make all run_once be run under statement or connection so that rollback is called

This commit is contained in:
pedrocarlo
2025-07-04 13:19:48 -03:00
parent 5559c45011
commit 711b1ef114
26 changed files with 151 additions and 157 deletions

View File

@@ -7,7 +7,7 @@ use turso_core::{LimboError, Statement, StepResult, Value};
pub struct LimboRows<'conn> {
stmt: Box<Statement>,
conn: &'conn mut LimboConn,
_conn: &'conn mut LimboConn,
err: Option<LimboError>,
}
@@ -15,7 +15,7 @@ impl<'conn> LimboRows<'conn> {
pub fn new(stmt: Statement, conn: &'conn mut LimboConn) -> Self {
LimboRows {
stmt: Box::new(stmt),
conn,
_conn: conn,
err: None,
}
}
@@ -55,8 +55,12 @@ pub extern "C" fn rows_next(ctx: *mut c_void) -> ResultCode {
Ok(StepResult::Row) => ResultCode::Row,
Ok(StepResult::Done) => ResultCode::Done,
Ok(StepResult::IO) => {
let _ = ctx.conn.io.run_once();
ResultCode::Io
let res = ctx.stmt.run_once();
if res.is_err() {
ResultCode::Error
} else {
ResultCode::Io
}
}
Ok(StepResult::Busy) => ResultCode::Busy,
Ok(StepResult::Interrupt) => ResultCode::Interrupt,

View File

@@ -64,7 +64,10 @@ pub extern "C" fn stmt_execute(
return ResultCode::Done;
}
Ok(StepResult::IO) => {
let _ = stmt.conn.io.run_once();
let res = statement.run_once();
if res.is_err() {
return ResultCode::Error;
}
}
Ok(StepResult::Busy) => {
return ResultCode::Busy;

View File

@@ -13,12 +13,12 @@ use turso_core::Connection;
#[derive(Clone)]
pub struct TursoConnection {
pub(crate) conn: Arc<Connection>,
pub(crate) io: Arc<dyn turso_core::IO>,
pub(crate) _io: Arc<dyn turso_core::IO>,
}
impl TursoConnection {
pub fn new(conn: Arc<Connection>, io: Arc<dyn turso_core::IO>) -> Self {
TursoConnection { conn, io }
TursoConnection { conn, _io: io }
}
#[allow(clippy::wrong_self_convention)]

View File

@@ -76,7 +76,7 @@ pub extern "system" fn Java_tech_turso_core_TursoStatement_step<'local>(
};
}
StepResult::IO => {
if let Err(e) = stmt.connection.io.run_once() {
if let Err(e) = stmt.stmt.run_once() {
set_err_msg_and_throw_exception(&mut env, obj, TURSO_ETC, e.to_string());
return to_turso_step_result(&mut env, STEP_RESULT_ID_ERROR, None);
}

View File

@@ -41,7 +41,7 @@ pub struct Database {
pub name: String,
_db: Arc<turso_core::Database>,
conn: Arc<turso_core::Connection>,
io: Arc<dyn turso_core::IO>,
_io: Arc<dyn turso_core::IO>,
}
impl ObjectFinalize for Database {
@@ -82,7 +82,7 @@ impl Database {
conn,
open: true,
name: path,
io,
_io: io,
})
}
@@ -114,7 +114,7 @@ impl Database {
return Ok(env.get_undefined()?.into_unknown())
}
turso_core::StepResult::IO => {
self.io.run_once().map_err(into_napi_error)?;
stmt.run_once().map_err(into_napi_error)?;
continue;
}
step @ turso_core::StepResult::Interrupt
@@ -185,7 +185,7 @@ impl Database {
Ok(Some(mut stmt)) => loop {
match stmt.step() {
Ok(StepResult::Row) => continue,
Ok(StepResult::IO) => self.io.run_once().map_err(into_napi_error)?,
Ok(StepResult::IO) => stmt.run_once().map_err(into_napi_error)?,
Ok(StepResult::Done) => break,
Ok(StepResult::Interrupt | StepResult::Busy) => {
return Err(napi::Error::new(
@@ -308,7 +308,7 @@ impl Statement {
}
turso_core::StepResult::Done => return Ok(env.get_undefined()?.into_unknown()),
turso_core::StepResult::IO => {
self.database.io.run_once().map_err(into_napi_error)?;
stmt.run_once().map_err(into_napi_error)?;
continue;
}
turso_core::StepResult::Interrupt | turso_core::StepResult::Busy => {
@@ -338,7 +338,7 @@ impl Statement {
self.check_and_bind(args)?;
Ok(IteratorStatement {
stmt: Rc::clone(&self.inner),
database: self.database.clone(),
_database: self.database.clone(),
env,
presentation_mode: self.presentation_mode.clone(),
})
@@ -401,7 +401,7 @@ impl Statement {
break;
}
turso_core::StepResult::IO => {
self.database.io.run_once().map_err(into_napi_error)?;
stmt.run_once().map_err(into_napi_error)?;
}
turso_core::StepResult::Interrupt | turso_core::StepResult::Busy => {
return Err(napi::Error::new(
@@ -480,7 +480,7 @@ impl Statement {
#[napi(iterator)]
pub struct IteratorStatement {
stmt: Rc<RefCell<turso_core::Statement>>,
database: Database,
_database: Database,
env: Env,
presentation_mode: PresentationMode,
}
@@ -528,7 +528,7 @@ impl Generator for IteratorStatement {
}
turso_core::StepResult::Done => return None,
turso_core::StepResult::IO => {
self.database.io.run_once().ok()?;
stmt.run_once().ok()?;
continue;
}
turso_core::StepResult::Interrupt | turso_core::StepResult::Busy => return None,

View File

@@ -96,14 +96,12 @@ impl Cursor {
// For DDL and DML statements,
// we need to execute the statement immediately
if stmt_is_ddl || stmt_is_dml || stmt_is_tx {
let mut stmt = stmt.borrow_mut();
while let turso_core::StepResult::IO = stmt
.borrow_mut()
.step()
.map_err(|e| PyErr::new::<OperationalError, _>(format!("Step error: {:?}", e)))?
{
self.conn
.io
.run_once()
stmt.run_once()
.map_err(|e| PyErr::new::<OperationalError, _>(format!("IO error: {:?}", e)))?;
}
}
@@ -132,7 +130,7 @@ impl Cursor {
return Ok(Some(py_row));
}
turso_core::StepResult::IO => {
self.conn.io.run_once().map_err(|e| {
stmt.run_once().map_err(|e| {
PyErr::new::<OperationalError, _>(format!("IO error: {:?}", e))
})?;
}
@@ -168,7 +166,7 @@ impl Cursor {
results.push(py_row);
}
turso_core::StepResult::IO => {
self.conn.io.run_once().map_err(|e| {
stmt.run_once().map_err(|e| {
PyErr::new::<OperationalError, _>(format!("IO error: {:?}", e))
})?;
}
@@ -233,7 +231,7 @@ fn stmt_is_tx(sql: &str) -> bool {
#[derive(Clone)]
pub struct Connection {
conn: Arc<turso_core::Connection>,
io: Arc<dyn turso_core::IO>,
_io: Arc<dyn turso_core::IO>,
}
#[pymethods]
@@ -308,7 +306,7 @@ impl Drop for Connection {
#[pyfunction]
pub fn connect(path: &str) -> Result<Connection> {
match turso_core::Connection::from_uri(path, false, false) {
Ok((io, conn)) => Ok(Connection { conn, io }),
Ok((io, conn)) => Ok(Connection { conn, _io: io }),
Err(e) => Err(PyErr::new::<ProgrammingError, _>(format!(
"Failed to create connection: {:?}",
e

View File

@@ -96,7 +96,7 @@ macro_rules! query_internal {
$body(row)?;
}
StepResult::IO => {
$self.io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
@@ -176,7 +176,6 @@ impl Limbo {
pub fn with_readline(mut self, mut rl: Editor<LimboHelper, DefaultHistory>) -> Self {
let h = LimboHelper::new(
self.conn.clone(),
self.io.clone(),
self.config.as_ref().map(|c| c.highlight.clone()),
);
rl.set_helper(Some(h));
@@ -645,8 +644,7 @@ impl Limbo {
let _ = self.show_info();
}
Command::Import(args) => {
let mut import_file =
ImportFile::new(self.conn.clone(), self.io.clone(), &mut self.writer);
let mut import_file = ImportFile::new(self.conn.clone(), &mut self.writer);
import_file.import(args)
}
Command::LoadExtension(args) => {
@@ -741,7 +739,7 @@ impl Limbo {
}
Ok(StepResult::IO) => {
let start = Instant::now();
self.io.run_once()?;
rows.run_once()?;
if let Some(ref mut stats) = statistics {
stats.io_time_elapsed_samples.push(start.elapsed());
}
@@ -834,7 +832,7 @@ impl Limbo {
}
Ok(StepResult::IO) => {
let start = Instant::now();
self.io.run_once()?;
rows.run_once()?;
if let Some(ref mut stats) = statistics {
stats.io_time_elapsed_samples.push(start.elapsed());
}
@@ -946,7 +944,7 @@ impl Limbo {
}
}
StepResult::IO => {
self.io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
@@ -1002,7 +1000,7 @@ impl Limbo {
}
}
StepResult::IO => {
self.io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
@@ -1053,7 +1051,7 @@ impl Limbo {
}
}
StepResult::IO => {
self.io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,

View File

@@ -21,17 +21,12 @@ pub struct ImportArgs {
pub struct ImportFile<'a> {
conn: Arc<Connection>,
io: Arc<dyn turso_core::IO>,
writer: &'a mut dyn Write,
}
impl<'a> ImportFile<'a> {
pub fn new(
conn: Arc<Connection>,
io: Arc<dyn turso_core::IO>,
writer: &'a mut dyn Write,
) -> Self {
Self { conn, io, writer }
pub fn new(conn: Arc<Connection>, writer: &'a mut dyn Write) -> Self {
Self { conn, writer }
}
pub fn import(&mut self, args: ImportArgs) {
@@ -79,7 +74,7 @@ impl<'a> ImportFile<'a> {
while let Ok(x) = rows.step() {
match x {
turso_core::StepResult::IO => {
self.io.run_once().unwrap();
rows.run_once().unwrap();
}
turso_core::StepResult::Done => break,
turso_core::StepResult::Interrupt => break,

View File

@@ -40,11 +40,7 @@ pub struct LimboHelper {
}
impl LimboHelper {
pub fn new(
conn: Arc<Connection>,
io: Arc<dyn turso_core::IO>,
syntax_config: Option<HighlightConfig>,
) -> Self {
pub fn new(conn: Arc<Connection>, syntax_config: Option<HighlightConfig>) -> Self {
// Load only predefined syntax
let ps = from_uncompressed_data(include_bytes!(concat!(
env!("OUT_DIR"),
@@ -59,7 +55,7 @@ impl LimboHelper {
}
}
LimboHelper {
completer: SqlCompleter::new(conn, io),
completer: SqlCompleter::new(conn),
syntax_set: ps,
theme_set: ts,
syntax_config: syntax_config.unwrap_or_default(),
@@ -141,7 +137,6 @@ impl Highlighter for LimboHelper {
pub struct SqlCompleter<C: Parser + Send + Sync + 'static> {
conn: Arc<Connection>,
io: Arc<dyn turso_core::IO>,
// Has to be a ref cell as Rustyline takes immutable reference to self
// This problem would be solved with Reedline as it uses &mut self for completions
cmd: RefCell<clap::Command>,
@@ -149,10 +144,9 @@ pub struct SqlCompleter<C: Parser + Send + Sync + 'static> {
}
impl<C: Parser + Send + Sync + 'static> SqlCompleter<C> {
pub fn new(conn: Arc<Connection>, io: Arc<dyn turso_core::IO>) -> Self {
pub fn new(conn: Arc<Connection>) -> Self {
Self {
conn,
io,
cmd: C::command().into(),
_cmd_phantom: PhantomData,
}
@@ -228,7 +222,7 @@ impl<C: Parser + Send + Sync + 'static> SqlCompleter<C> {
candidates.push(pair);
}
StepResult::IO => {
try_result!(self.io.run_once(), (prefix_pos, candidates));
try_result!(rows.run_once(), (prefix_pos, candidates));
}
StepResult::Interrupt => break,
StepResult::Done => break,

View File

@@ -1,7 +1,7 @@
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use pprof::criterion::{Output, PProfProfiler};
use std::sync::Arc;
use turso_core::{Database, PlatformIO, IO};
use turso_core::{Database, PlatformIO};
fn rusqlite_open() -> rusqlite::Connection {
let sqlite_conn = rusqlite::Connection::open("../testing/testing.db").unwrap();
@@ -79,7 +79,6 @@ fn bench_execute_select_rows(criterion: &mut Criterion) {
let mut stmt = limbo_conn
.prepare(format!("SELECT * FROM users LIMIT {}", *i))
.unwrap();
let io = io.clone();
b.iter(|| {
loop {
match stmt.step().unwrap() {
@@ -87,7 +86,7 @@ fn bench_execute_select_rows(criterion: &mut Criterion) {
black_box(stmt.row());
}
turso_core::StepResult::IO => {
let _ = io.run_once();
stmt.run_once().unwrap();
}
turso_core::StepResult::Done => {
break;
@@ -141,7 +140,6 @@ fn bench_execute_select_1(criterion: &mut Criterion) {
group.bench_function("limbo_execute_select_1", |b| {
let mut stmt = limbo_conn.prepare("SELECT 1").unwrap();
let io = io.clone();
b.iter(|| {
loop {
match stmt.step().unwrap() {
@@ -149,7 +147,7 @@ fn bench_execute_select_1(criterion: &mut Criterion) {
black_box(stmt.row());
}
turso_core::StepResult::IO => {
let _ = io.run_once();
stmt.run_once().unwrap();
}
turso_core::StepResult::Done => {
break;
@@ -194,7 +192,6 @@ fn bench_execute_select_count(criterion: &mut Criterion) {
group.bench_function("limbo_execute_select_count", |b| {
let mut stmt = limbo_conn.prepare("SELECT count() FROM users").unwrap();
let io = io.clone();
b.iter(|| {
loop {
match stmt.step().unwrap() {
@@ -202,7 +199,7 @@ fn bench_execute_select_count(criterion: &mut Criterion) {
black_box(stmt.row());
}
turso_core::StepResult::IO => {
let _ = io.run_once();
stmt.run_once().unwrap();
}
turso_core::StepResult::Done => {
break;

View File

@@ -4,7 +4,7 @@ use pprof::{
flamegraph::Options,
};
use std::sync::Arc;
use turso_core::{Database, PlatformIO, IO};
use turso_core::{Database, PlatformIO};
// Title: JSONB Function Benchmarking
@@ -447,13 +447,12 @@ fn bench(criterion: &mut Criterion) {
group.bench_function("Limbo", |b| {
let mut stmt = limbo_conn.prepare(&query).unwrap();
let io = io.clone();
b.iter(|| {
loop {
match stmt.step().unwrap() {
turso_core::StepResult::Row => {}
turso_core::StepResult::IO => {
let _ = io.run_once();
stmt.run_once().unwrap();
}
turso_core::StepResult::Done => {
break;
@@ -606,13 +605,12 @@ fn bench_sequential_jsonb(criterion: &mut Criterion) {
group.bench_function("Limbo - Sequential", |b| {
let mut stmt = limbo_conn.prepare(&query).unwrap();
let io = io.clone();
b.iter(|| {
loop {
match stmt.step().unwrap() {
turso_core::StepResult::Row => {}
turso_core::StepResult::IO => {
let _ = io.run_once();
stmt.run_once().unwrap();
}
turso_core::StepResult::Done => {
break;
@@ -899,13 +897,12 @@ fn bench_json_patch(criterion: &mut Criterion) {
group.bench_function("Limbo", |b| {
let mut stmt = limbo_conn.prepare(&query).unwrap();
let io = io.clone();
b.iter(|| {
loop {
match stmt.step().unwrap() {
turso_core::StepResult::Row => {}
turso_core::StepResult::IO => {
let _ = io.run_once();
stmt.run_once().unwrap();
}
turso_core::StepResult::Done => {
break;

View File

@@ -2,7 +2,7 @@ use std::sync::Arc;
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, SamplingMode};
use pprof::criterion::{Output, PProfProfiler};
use turso_core::{Database, PlatformIO, IO as _};
use turso_core::{Database, PlatformIO};
const TPC_H_PATH: &str = "../perf/tpc-h/TPC-H.db";
@@ -97,7 +97,7 @@ fn bench_tpc_h_queries(criterion: &mut Criterion) {
black_box(stmt.row());
}
turso_core::StepResult::IO => {
let _ = io.run_once();
stmt.run_once().unwrap();
}
turso_core::StepResult::Done => {
break;

View File

@@ -65,7 +65,10 @@ pub unsafe extern "C" fn execute(
return ResultCode::OK;
}
Ok(StepResult::IO) => {
let _ = conn.pager.io.run_once();
let res = stmt.run_once();
if res.is_err() {
return ResultCode::Error;
}
continue;
}
Ok(StepResult::Interrupt) => return ResultCode::Interrupt,
@@ -154,7 +157,6 @@ pub unsafe extern "C" fn stmt_step(stmt: *mut Stmt) -> ResultCode {
tracing::error!("stmt_step: null connection or context");
return ResultCode::Error;
}
let conn: &Connection = unsafe { &*(stmt._conn as *const Connection) };
let stmt_ctx: &mut Statement = unsafe { &mut *(stmt._ctx as *mut Statement) };
while let Ok(res) = stmt_ctx.step() {
match res {
@@ -162,7 +164,10 @@ pub unsafe extern "C" fn stmt_step(stmt: *mut Stmt) -> ResultCode {
StepResult::Done => return ResultCode::EOF,
StepResult::IO => {
// always handle IO step result internally.
let _ = conn.pager.io.run_once();
let res = stmt_ctx.run_once();
if res.is_err() {
return ResultCode::Error;
}
continue;
}
StepResult::Interrupt => return ResultCode::Interrupt,

View File

@@ -228,7 +228,7 @@ impl Database {
if is_empty == 2 {
// parse schema
let conn = db.connect()?;
let schema_version = get_schema_version(&conn, &io)?;
let schema_version = get_schema_version(&conn)?;
schema.write().schema_version = schema_version;
let rows = conn.query("SELECT * FROM sqlite_schema")?;
let mut schema = schema
@@ -236,7 +236,7 @@ impl Database {
.expect("lock on schema should succeed first try");
let syms = conn.syms.borrow();
if let Err(LimboError::ExtensionError(e)) =
parse_schema_rows(rows, &mut schema, io, &syms, None)
parse_schema_rows(rows, &mut schema, &syms, None)
{
// this means that a vtab exists and we no longer have the module loaded. we print
// a warning to the user to load the module
@@ -401,7 +401,7 @@ impl Database {
}
}
fn get_schema_version(conn: &Arc<Connection>, io: &Arc<dyn IO>) -> Result<u32> {
fn get_schema_version(conn: &Arc<Connection>) -> Result<u32> {
let mut rows = conn
.query("PRAGMA schema_version")?
.ok_or(LimboError::InternalError(
@@ -420,7 +420,7 @@ fn get_schema_version(conn: &Arc<Connection>, io: &Arc<dyn IO>) -> Result<u32> {
schema_version = Some(row.get::<i64>(0)? as u32);
}
StepResult::IO => {
io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => {
return Err(LimboError::InternalError(
@@ -621,7 +621,7 @@ impl Connection {
if matches!(res, StepResult::Done) {
break;
}
self._db.io.run_once()?;
self.run_once()?;
}
}
}
@@ -629,6 +629,15 @@ impl Connection {
Ok(())
}
fn run_once(&self) -> Result<()> {
let res = self._db.io.run_once();
if res.is_err() {
let state = self.transaction_state.get();
self.pager.rollback(state.change_schema(), self)?;
}
res
}
#[cfg(feature = "fs")]
pub fn from_uri(
uri: &str,
@@ -767,7 +776,7 @@ impl Connection {
{
let syms = self.syms.borrow();
if let Err(LimboError::ExtensionError(e)) =
parse_schema_rows(rows, &mut schema, self.pager.io.clone(), &syms, None)
parse_schema_rows(rows, &mut schema, &syms, None)
{
// this means that a vtab exists and we no longer have the module loaded. we print
// a warning to the user to load the module
@@ -894,7 +903,13 @@ impl Statement {
}
pub fn run_once(&self) -> Result<()> {
self.pager.io.run_once()
let res = self.pager.io.run_once();
if res.is_err() {
let state = self.program.connection.transaction_state.get();
self.pager
.rollback(state.change_schema(), &self.program.connection)?;
}
res
}
pub fn num_columns(&self) -> usize {

View File

@@ -957,7 +957,7 @@ impl Pager {
CheckpointMode::Passive,
) {
Ok(CheckpointStatus::IO) => {
let _ = self.io.run_once();
self.io.run_once()?;
}
Ok(CheckpointStatus::Done(res)) => {
checkpoint_result = res;

View File

@@ -3,7 +3,7 @@ use crate::{
schema::{self, Column, Schema, Type},
translate::{collate::CollationSeq, expr::walk_expr, plan::JoinOrderMember},
types::{Value, ValueType},
LimboError, OpenFlags, Result, Statement, StepResult, SymbolTable, IO,
LimboError, OpenFlags, Result, Statement, StepResult, SymbolTable,
};
use std::{rc::Rc, sync::Arc};
use turso_sqlite3_parser::ast::{
@@ -51,7 +51,6 @@ struct UnparsedFromSqlIndex {
pub fn parse_schema_rows(
rows: Option<Statement>,
schema: &mut Schema,
io: Arc<dyn IO>,
syms: &SymbolTable,
mv_tx_id: Option<u64>,
) -> Result<()> {
@@ -130,7 +129,7 @@ pub fn parse_schema_rows(
StepResult::IO => {
// TODO: How do we ensure that the I/O we submitted to
// read the schema is actually complete?
io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,

View File

@@ -4978,7 +4978,6 @@ pub fn op_parse_schema(
parse_schema_rows(
Some(stmt),
&mut new_schema,
conn.pager.io.clone(),
&conn.syms.borrow(),
state.mv_tx_id,
)?;
@@ -4993,7 +4992,6 @@ pub fn op_parse_schema(
parse_schema_rows(
Some(stmt),
&mut new_schema,
conn.pager.io.clone(),
&conn.syms.borrow(),
state.mv_tx_id,
)?;

View File

@@ -194,7 +194,7 @@ fn do_fuzz(expr: Expr) -> Result<Corpus, Box<dyn Error>> {
loop {
use turso_core::StepResult;
match stmt.step()? {
StepResult::IO => io.run_once()?,
StepResult::IO => stmt.run_once()?,
StepResult::Row => {
let row = stmt.row().unwrap();
assert_eq!(row.len(), 1, "expr: {:?}", expr);

View File

@@ -7,14 +7,14 @@ use std::{
};
use serde::{Deserialize, Serialize};
use turso_core::{Connection, Result, StepResult, IO};
use turso_core::{Connection, Result, StepResult};
use crate::{
model::{
query::{update::Update, Create, CreateIndex, Delete, Drop, Insert, Query, Select},
table::SimValue,
},
runner::{env::SimConnection, io::SimulatorIO},
runner::env::SimConnection,
SimulatorEnv,
};
@@ -411,7 +411,7 @@ impl Interaction {
}
}
}
pub(crate) fn execute_query(&self, conn: &mut Arc<Connection>, io: &SimulatorIO) -> ResultSet {
pub(crate) fn execute_query(&self, conn: &mut Arc<Connection>) -> ResultSet {
if let Self::Query(query) = self {
let query_str = query.to_string();
let rows = conn.query(&query_str);
@@ -440,7 +440,7 @@ impl Interaction {
out.push(r);
}
StepResult::IO => {
io.run_once().unwrap();
rows.run_once().unwrap();
}
StepResult::Interrupt => {}
StepResult::Done => {

View File

@@ -191,7 +191,7 @@ pub(crate) fn execute_interaction(
SimConnection::Disconnected => unreachable!(),
};
let results = interaction.execute_query(conn, &env.io);
let results = interaction.execute_query(conn);
tracing::debug!(?results);
stack.push(results);
limbo_integrity_check(conn)?;

View File

@@ -247,12 +247,11 @@ pub unsafe extern "C" fn sqlite3_step(stmt: *mut sqlite3_stmt) -> ffi::c_int {
let stmt = &mut *stmt;
let db = &mut *stmt.db;
loop {
let db = db.inner.lock().unwrap();
let _db = db.inner.lock().unwrap();
if let Ok(result) = stmt.stmt.step() {
match result {
turso_core::StepResult::IO => {
let io = db.io.clone();
io.run_once().unwrap();
stmt.stmt.run_once().unwrap();
continue;
}
turso_core::StepResult::Done => return SQLITE_DONE,

View File

@@ -183,7 +183,7 @@ pub(crate) fn sqlite_exec_rows(
}
pub(crate) fn limbo_exec_rows(
db: &TempDatabase,
_db: &TempDatabase,
conn: &Arc<turso_core::Connection>,
query: &str,
) -> Vec<Vec<rusqlite::types::Value>> {
@@ -198,7 +198,7 @@ pub(crate) fn limbo_exec_rows(
break row;
}
turso_core::StepResult::IO => {
db.io.run_once().unwrap();
stmt.run_once().unwrap();
continue;
}
turso_core::StepResult::Done => break 'outer,
@@ -221,7 +221,7 @@ pub(crate) fn limbo_exec_rows(
}
pub(crate) fn limbo_exec_rows_error(
db: &TempDatabase,
_db: &TempDatabase,
conn: &Arc<turso_core::Connection>,
query: &str,
) -> turso_core::Result<()> {
@@ -230,7 +230,7 @@ pub(crate) fn limbo_exec_rows_error(
let result = stmt.step()?;
match result {
turso_core::StepResult::IO => {
db.io.run_once()?;
stmt.run_once()?;
continue;
}
turso_core::StepResult::Done => return Ok(()),

View File

@@ -16,7 +16,7 @@ fn test_last_insert_rowid_basic() -> anyhow::Result<()> {
loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
@@ -36,7 +36,7 @@ fn test_last_insert_rowid_basic() -> anyhow::Result<()> {
}
}
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
@@ -50,7 +50,7 @@ fn test_last_insert_rowid_basic() -> anyhow::Result<()> {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
@@ -72,7 +72,7 @@ fn test_last_insert_rowid_basic() -> anyhow::Result<()> {
}
}
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
@@ -101,7 +101,7 @@ fn test_integer_primary_key() -> anyhow::Result<()> {
let mut insert_query = conn.query(query)?.unwrap();
loop {
match insert_query.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => insert_query.run_once()?,
StepResult::Done => break,
_ => unreachable!(),
}
@@ -117,7 +117,7 @@ fn test_integer_primary_key() -> anyhow::Result<()> {
rowids.push(*id);
}
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => select_query.run_once()?,
StepResult::Interrupt | StepResult::Done => break,
StepResult::Busy => panic!("Database is busy"),
}

View File

@@ -19,7 +19,7 @@ fn test_statement_reset_bind() -> anyhow::Result<()> {
turso_core::Value::Integer(1)
);
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => stmt.run_once()?,
_ => break,
}
}
@@ -37,7 +37,7 @@ fn test_statement_reset_bind() -> anyhow::Result<()> {
turso_core::Value::Integer(2)
);
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => stmt.run_once()?,
_ => break,
}
}
@@ -88,7 +88,7 @@ fn test_statement_bind() -> anyhow::Result<()> {
}
}
StepResult::IO => {
tmp_db.io.run_once()?;
stmt.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
@@ -125,7 +125,7 @@ fn test_insert_parameter_remap() -> anyhow::Result<()> {
}
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@@ -150,7 +150,7 @@ fn test_insert_parameter_remap() -> anyhow::Result<()> {
// D = 22
assert_eq!(row.get::<&Value>(3).unwrap(), &Value::Integer(22));
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@@ -196,7 +196,7 @@ fn test_insert_parameter_remap_all_params() -> anyhow::Result<()> {
// execute the insert (no rows returned)
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@@ -222,7 +222,7 @@ fn test_insert_parameter_remap_all_params() -> anyhow::Result<()> {
// D = 999
assert_eq!(row.get::<&Value>(3).unwrap(), &Value::Integer(999));
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@@ -264,7 +264,7 @@ fn test_insert_parameter_multiple_remap_backwards() -> anyhow::Result<()> {
// execute the insert (no rows returned)
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@@ -290,7 +290,7 @@ fn test_insert_parameter_multiple_remap_backwards() -> anyhow::Result<()> {
// D = 999
assert_eq!(row.get::<&Value>(3).unwrap(), &Value::Integer(444));
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@@ -331,7 +331,7 @@ fn test_insert_parameter_multiple_no_remap() -> anyhow::Result<()> {
// execute the insert (no rows returned)
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@@ -357,7 +357,7 @@ fn test_insert_parameter_multiple_no_remap() -> anyhow::Result<()> {
// D = 999
assert_eq!(row.get::<&Value>(3).unwrap(), &Value::Integer(444));
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@@ -402,7 +402,7 @@ fn test_insert_parameter_multiple_row() -> anyhow::Result<()> {
// execute the insert (no rows returned)
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@@ -434,7 +434,7 @@ fn test_insert_parameter_multiple_row() -> anyhow::Result<()> {
);
i += 1;
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@@ -450,7 +450,7 @@ fn test_bind_parameters_update_query() -> anyhow::Result<()> {
let mut ins = conn.prepare("insert into test (a, b) values (3, 'test1');")?;
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@@ -461,7 +461,7 @@ fn test_bind_parameters_update_query() -> anyhow::Result<()> {
ins.bind_at(2.try_into()?, Value::build_text("test1"));
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@@ -476,7 +476,7 @@ fn test_bind_parameters_update_query() -> anyhow::Result<()> {
assert_eq!(row.get::<&Value>(0).unwrap(), &Value::Integer(222));
assert_eq!(row.get::<&Value>(1).unwrap(), &Value::build_text("test1"),);
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@@ -495,7 +495,7 @@ fn test_bind_parameters_update_query_multiple_where() -> anyhow::Result<()> {
let mut ins = conn.prepare("insert into test (a, b, c, d) values (3, 'test1', 4, 5);")?;
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@@ -507,7 +507,7 @@ fn test_bind_parameters_update_query_multiple_where() -> anyhow::Result<()> {
ins.bind_at(3.try_into()?, Value::Integer(5));
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@@ -524,7 +524,7 @@ fn test_bind_parameters_update_query_multiple_where() -> anyhow::Result<()> {
assert_eq!(row.get::<&Value>(2).unwrap(), &Value::Integer(4));
assert_eq!(row.get::<&Value>(3).unwrap(), &Value::Integer(5));
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@@ -543,7 +543,7 @@ fn test_bind_parameters_update_rowid_alias() -> anyhow::Result<()> {
let mut ins = conn.prepare("insert into test (id, name) values (1, 'test');")?;
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@@ -558,7 +558,7 @@ fn test_bind_parameters_update_rowid_alias() -> anyhow::Result<()> {
assert_eq!(row.get::<&Value>(0).unwrap(), &Value::Integer(1));
assert_eq!(row.get::<&Value>(1).unwrap(), &Value::build_text("test"),);
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@@ -568,7 +568,7 @@ fn test_bind_parameters_update_rowid_alias() -> anyhow::Result<()> {
ins.bind_at(2.try_into()?, Value::Integer(1));
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@@ -583,7 +583,7 @@ fn test_bind_parameters_update_rowid_alias() -> anyhow::Result<()> {
assert_eq!(row.get::<&Value>(0).unwrap(), &Value::Integer(1));
assert_eq!(row.get::<&Value>(1).unwrap(), &Value::build_text("updated"),);
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@@ -618,7 +618,7 @@ fn test_bind_parameters_update_rowid_alias_seek_rowid() -> anyhow::Result<()> {
&Value::Integer(if i == 0 { 4 } else { 11 })
);
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@@ -631,7 +631,7 @@ fn test_bind_parameters_update_rowid_alias_seek_rowid() -> anyhow::Result<()> {
ins.bind_at(4.try_into()?, Value::Integer(5));
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@@ -649,7 +649,7 @@ fn test_bind_parameters_update_rowid_alias_seek_rowid() -> anyhow::Result<()> {
&Value::build_text(if i == 0 { "updated" } else { "test" }),
);
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
@@ -678,7 +678,7 @@ fn test_bind_parameters_delete_rowid_alias_seek_out_of_order() -> anyhow::Result
ins.bind_at(4.try_into()?, Value::build_text("test"));
loop {
match ins.step()? {
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => ins.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
_ => {}
@@ -693,7 +693,7 @@ fn test_bind_parameters_delete_rowid_alias_seek_out_of_order() -> anyhow::Result
let row = sel.row().unwrap();
assert_eq!(row.get::<&Value>(0).unwrap(), &Value::build_text("correct"),);
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}

View File

@@ -42,7 +42,7 @@ fn test_simple_overflow_page() -> anyhow::Result<()> {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
@@ -68,7 +68,7 @@ fn test_simple_overflow_page() -> anyhow::Result<()> {
compare_string(&huge_text, text);
}
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
@@ -110,7 +110,7 @@ fn test_sequential_overflow_page() -> anyhow::Result<()> {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
@@ -138,7 +138,7 @@ fn test_sequential_overflow_page() -> anyhow::Result<()> {
current_index += 1;
}
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
@@ -247,7 +247,7 @@ fn test_statement_reset() -> anyhow::Result<()> {
);
break;
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => stmt.run_once()?,
_ => break,
}
}
@@ -264,7 +264,7 @@ fn test_statement_reset() -> anyhow::Result<()> {
);
break;
}
StepResult::IO => tmp_db.io.run_once()?,
StepResult::IO => stmt.run_once()?,
_ => break,
}
}
@@ -748,7 +748,7 @@ fn run_query_on_row(
}
fn run_query_core(
tmp_db: &TempDatabase,
_tmp_db: &TempDatabase,
conn: &Arc<Connection>,
query: &str,
mut on_row: Option<impl FnMut(&Row)>,
@@ -757,7 +757,7 @@ fn run_query_core(
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
rows.run_once()?;
}
StepResult::Done => break,
StepResult::Row => {

View File

@@ -13,7 +13,7 @@ fn test_wal_checkpoint_result() -> Result<()> {
let conn = tmp_db.connect_limbo();
conn.execute("CREATE TABLE t1 (id text);")?;
let res = execute_and_get_strings(&tmp_db, &conn, "pragma journal_mode;")?;
let res = execute_and_get_strings(&conn, "pragma journal_mode;")?;
assert_eq!(res, vec!["wal"]);
conn.execute("insert into t1(id) values (1), (2);")?;
@@ -22,7 +22,7 @@ fn test_wal_checkpoint_result() -> Result<()> {
do_flush(&conn, &tmp_db).unwrap();
// checkpoint result should return > 0 num pages now as database has data
let res = execute_and_get_ints(&tmp_db, &conn, "pragma wal_checkpoint;")?;
let res = execute_and_get_ints(&conn, "pragma wal_checkpoint;")?;
println!("'pragma wal_checkpoint;' returns: {res:?}");
assert_eq!(res.len(), 3);
assert_eq!(res[0], 0); // checkpoint successfully
@@ -46,7 +46,7 @@ fn test_wal_1_writer_1_reader() -> Result<()> {
match rows.step().unwrap() {
StepResult::Row => {}
StepResult::IO => {
tmp_db.lock().unwrap().io.run_once().unwrap();
rows.run_once().unwrap();
}
StepResult::Interrupt => break,
StepResult::Done => break,
@@ -86,7 +86,7 @@ fn test_wal_1_writer_1_reader() -> Result<()> {
i += 1;
}
StepResult::IO => {
tmp_db.lock().unwrap().io.run_once().unwrap();
rows.run_once().unwrap();
}
StepResult::Interrupt => break,
StepResult::Done => break,
@@ -110,11 +110,7 @@ fn test_wal_1_writer_1_reader() -> Result<()> {
}
/// Execute a statement and get strings result
pub(crate) fn execute_and_get_strings(
tmp_db: &TempDatabase,
conn: &Arc<Connection>,
sql: &str,
) -> Result<Vec<String>> {
pub(crate) fn execute_and_get_strings(conn: &Arc<Connection>, sql: &str) -> Result<Vec<String>> {
let statement = conn.prepare(sql)?;
let stmt = Rc::new(RefCell::new(statement));
let mut result = Vec::new();
@@ -130,19 +126,15 @@ pub(crate) fn execute_and_get_strings(
}
StepResult::Done => break,
StepResult::Interrupt => break,
StepResult::IO => tmp_db.io.run_once()?,
StepResult::Busy => tmp_db.io.run_once()?,
StepResult::IO => stmt.run_once()?,
StepResult::Busy => stmt.run_once()?,
}
}
Ok(result)
}
/// Execute a statement and get integers
pub(crate) fn execute_and_get_ints(
tmp_db: &TempDatabase,
conn: &Arc<Connection>,
sql: &str,
) -> Result<Vec<i64>> {
pub(crate) fn execute_and_get_ints(conn: &Arc<Connection>, sql: &str) -> Result<Vec<i64>> {
let statement = conn.prepare(sql)?;
let stmt = Rc::new(RefCell::new(statement));
let mut result = Vec::new();
@@ -166,8 +158,8 @@ pub(crate) fn execute_and_get_ints(
}
StepResult::Done => break,
StepResult::Interrupt => break,
StepResult::IO => tmp_db.io.run_once()?,
StepResult::Busy => tmp_db.io.run_once()?,
StepResult::IO => stmt.run_once()?,
StepResult::Busy => stmt.run_once()?,
}
}
Ok(result)