This commit is contained in:
pedrocarlo
2025-06-30 15:22:48 -03:00
parent cd280007ff
commit 78107935b5
3 changed files with 39 additions and 42 deletions

View File

@@ -66,7 +66,7 @@ impl From<turso_core::LimboError> for Error {
pub(crate) type BoxError = Box<dyn std::error::Error + Send + Sync>;
pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A builder for `Database`.
pub struct Builder {

View File

@@ -1,5 +1,6 @@
use std::sync::{Arc, Mutex};
use crate::Error;
use crate::Value;
/// Results of a prepared statement query.
@@ -23,7 +24,6 @@ unsafe impl Sync for Rows {}
impl Rows {
/// Fetch the next row of this result set.
pub async fn next(&mut self) -> crate::Result<Option<Row>> {
use crate::Error;
loop {
let mut stmt = self
.inner
@@ -64,48 +64,44 @@ impl futures_util::Stream for Rows {
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
use crate::Error;
use std::task::Poll;
loop {
let stmt = self
.inner
.lock()
.map_err(|e| Error::MutexError(e.to_string()));
let stmt = self
.inner
.lock()
.map_err(|e| Error::MutexError(e.to_string()));
if let Err(err) = stmt {
return Poll::Ready(Some(Err(err)));
}
let mut stmt = stmt.unwrap();
match stmt.step() {
Ok(step_result) => match step_result {
turso_core::StepResult::Row => {
let row = stmt.row().unwrap();
return Poll::Ready(Some(Ok(Row {
values: row.get_values().map(|v| v.to_owned()).collect(),
})));
}
turso_core::StepResult::Done => {
stmt.reset();
return Poll::Ready(None);
}
turso_core::StepResult::IO => {
if let Err(e) = stmt.run_once() {
return Poll::Ready(Some(Err(e.into())));
}
// TODO: see correct way to signal for this task to wake up
return Poll::Pending;
}
// TODO: Busy and Interrupt should probably return errors
turso_core::StepResult::Busy | turso_core::StepResult::Interrupt => {
stmt.reset();
return Poll::Ready(None);
}
},
Err(err) => {
use crate::Error;
stmt.reset();
return Poll::Ready(Some(Err(Error::from(err))));
if let Err(err) = stmt {
return Poll::Ready(Some(Err(err)));
}
let mut stmt = stmt.unwrap();
match stmt.step() {
Ok(step_result) => match step_result {
turso_core::StepResult::Row => {
let row = stmt.row().unwrap();
Poll::Ready(Some(Ok(Row {
values: row.get_values().map(|v| v.to_owned()).collect(),
})))
}
turso_core::StepResult::Done => {
stmt.reset();
Poll::Ready(None)
}
turso_core::StepResult::IO => {
if let Err(e) = stmt.run_once() {
return Poll::Ready(Some(Err(e.into())));
}
// TODO: see correct way to signal for this task to wake up
Poll::Pending
}
// TODO: Busy and Interrupt should probably return errors
turso_core::StepResult::Busy | turso_core::StepResult::Interrupt => {
stmt.reset();
Poll::Ready(None)
}
},
Err(err) => {
stmt.reset();
Poll::Ready(Some(Err(Error::from(err))))
}
}
}

View File

@@ -15,6 +15,7 @@ use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
use turso::futures_util::TryStreamExt;
use turso::Builder;
pub struct Plan {
@@ -522,7 +523,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
const INTEGRITY_CHECK_INTERVAL: usize = 100;
if query_index % INTEGRITY_CHECK_INTERVAL == 0 {
let mut res = conn.query("PRAGMA integrity_check", ()).await.unwrap();
if let Some(row) = res.next().await? {
if let Some(row) = res.try_next().await? {
let value = row.get_value(0).unwrap();
if value != "ok".into() {
panic!("integrity check failed: {:?}", value);