diff --git a/Cargo.lock b/Cargo.lock index c2a368785..3d15cb7b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3630,7 +3630,6 @@ dependencies = [ name = "turso" version = "0.1.1" dependencies = [ - "futures-util", "tempfile", "thiserror 2.0.12", "tokio", diff --git a/bindings/rust/Cargo.toml b/bindings/rust/Cargo.toml index 257123468..557ad6e20 100644 --- a/bindings/rust/Cargo.toml +++ b/bindings/rust/Cargo.toml @@ -10,15 +10,13 @@ repository.workspace = true description = "Turso Rust API" [features] -default = ["futures"] +default = [] experimental_indexes = [] antithesis = ["turso_core/antithesis"] -futures = ["dep:futures-util"] [dependencies] turso_core = { workspace = true, features = ["io_uring"] } thiserror = "2.0.9" -futures-util = { version = "0.3.31", optional = true, default-features = false, features = ["std", "async-await"] } [dev-dependencies] tempfile = "3.20.0" diff --git a/bindings/rust/examples/example.rs b/bindings/rust/examples/example.rs index 1d00a69db..f1d086126 100644 --- a/bindings/rust/examples/example.rs +++ b/bindings/rust/examples/example.rs @@ -1,6 +1,3 @@ -#![cfg(feature = "futures")] - -use futures_util::stream::TryStreamExt; use turso::Builder; #[tokio::main] @@ -9,15 +6,8 @@ async fn main() { let conn = db.connect().unwrap(); - // `query` and other methods, only parse the first query given. - let mut rows = conn.query("select 1; select 1;", ()).await.unwrap(); - // Iterate over the rows with the Stream iterator syntax - while let Some(row) = rows.try_next().await.unwrap() { - let val = row.get_value(0).unwrap(); - println!("{:?}", val); - } + conn.query("select 1; select 1;", ()).await.unwrap(); - // Contrary to `prepare` and `query`, `execute` is not lazy and will execute the query to completion conn.execute("CREATE TABLE IF NOT EXISTS users (email TEXT)", ()) .await .unwrap(); @@ -42,20 +32,9 @@ async fn main() { let mut rows = stmt.query(["foo@example.com"]).await.unwrap(); - while let Some(row) = rows.try_next().await.unwrap() { - let value = row.get_value(0).unwrap(); - println!("Row: {:?}", value); - } + let row = rows.next().await.unwrap().unwrap(); - let rows = stmt.query(["foo@example.com"]).await.unwrap(); + let value = row.get_value(0).unwrap(); - // As `Rows` implement streams you can easily map over it and apply other transformations you see fit - println!("Using Stream map"); - rows.map_ok(|row| row.get_value(0).unwrap()) - .try_for_each(|val| { - println!("Row: {:?}", val.as_text().unwrap()); - futures_util::future::ready(Ok(())) - }) - .await - .unwrap(); + println!("Row: {:?}", value); } diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index 5fb1bb0b5..465ca1ca9 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -20,33 +20,28 @@ //! You can also prepare statements with the [`Connection`] object and then execute the [`Statement`] objects: //! //! ```rust,no_run -//! # use crate::turso::futures_util::TryStreamExt; //! # async fn run() { //! # use turso::Builder; //! # let db = Builder::new_local(":memory:").build().await.unwrap(); //! # let conn = db.connect().unwrap(); //! let mut stmt = conn.prepare("SELECT * FROM users WHERE email = ?1").await.unwrap(); //! let mut rows = stmt.query(["foo@example.com"]).await.unwrap(); -//! let row = rows.try_next().await.unwrap().unwrap(); +//! let row = rows.next().await.unwrap().unwrap(); //! let value = row.get_value(0).unwrap(); //! println!("Row: {:?}", value); //! # } //! ``` pub mod params; -pub mod row; -pub mod statement; pub mod value; -#[cfg(feature = "futures")] -pub use futures_util; -pub use params::params_from_iter; pub use value::Value; +pub use params::params_from_iter; + use crate::params::*; -use crate::row::{Row, Rows}; -use crate::statement::Statement; use std::fmt::Debug; +use std::num::NonZero; use std::sync::{Arc, Mutex}; #[derive(Debug, thiserror::Error)] @@ -67,7 +62,7 @@ impl From for Error { pub(crate) type BoxError = Box; -pub type Result = std::result::Result; +pub type Result = std::result::Result; /// A builder for `Database`. pub struct Builder { @@ -218,6 +213,119 @@ impl Debug for Connection { } } +/// A prepared statement. +pub struct Statement { + inner: Arc>, +} + +impl Clone for Statement { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } +} + +unsafe impl Send for Statement {} +unsafe impl Sync for Statement {} + +impl Statement { + /// Query the database with this prepared statement. + pub async fn query(&mut self, params: impl IntoParams) -> Result { + let params = params.into_params()?; + match params { + params::Params::None => (), + params::Params::Positional(values) => { + for (i, value) in values.into_iter().enumerate() { + let mut stmt = self.inner.lock().unwrap(); + stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into()); + } + } + params::Params::Named(values) => { + for (name, value) in values.into_iter() { + let mut stmt = self.inner.lock().unwrap(); + let i = stmt.parameters().index(name).unwrap(); + stmt.bind_at(i, value.into()); + } + } + } + #[allow(clippy::arc_with_non_send_sync)] + let rows = Rows { + inner: Arc::clone(&self.inner), + }; + Ok(rows) + } + + /// Execute this prepared statement. + pub async fn execute(&mut self, params: impl IntoParams) -> Result { + { + // Reset the statement before executing + self.inner.lock().unwrap().reset(); + } + let params = params.into_params()?; + match params { + params::Params::None => (), + params::Params::Positional(values) => { + for (i, value) in values.into_iter().enumerate() { + let mut stmt = self.inner.lock().unwrap(); + stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into()); + } + } + params::Params::Named(values) => { + for (name, value) in values.into_iter() { + let mut stmt = self.inner.lock().unwrap(); + let i = stmt.parameters().index(name).unwrap(); + stmt.bind_at(i, value.into()); + } + } + } + loop { + let mut stmt = self.inner.lock().unwrap(); + match stmt.step() { + Ok(turso_core::StepResult::Row) => { + // unexpected row during execution, error out. + return Ok(2); + } + Ok(turso_core::StepResult::Done) => { + return Ok(0); + } + Ok(turso_core::StepResult::IO) => { + let _ = stmt.run_once(); + //return Ok(1); + } + Ok(turso_core::StepResult::Busy) => { + return Ok(4); + } + Ok(turso_core::StepResult::Interrupt) => { + return Ok(3); + } + Err(err) => { + return Err(err.into()); + } + } + } + } + + /// Returns columns of the result of this prepared statement. + pub fn columns(&self) -> Vec { + let stmt = self.inner.lock().unwrap(); + + let n = stmt.num_columns(); + + let mut cols = Vec::with_capacity(n); + + for i in 0..n { + let name = stmt.get_column_name(i).into_owned(); + cols.push(Column { + name, + decl_type: None, // TODO + }); + } + + cols + } +} + /// Column information. pub struct Column { name: String, @@ -249,27 +357,100 @@ pub enum Params { pub struct Transaction {} +/// Results of a prepared statement query. +pub struct Rows { + inner: Arc>, +} + +impl Clone for Rows { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } +} + +unsafe impl Send for Rows {} +unsafe impl Sync for Rows {} + +impl Rows { + /// Fetch the next row of this result set. + pub async fn next(&mut self) -> Result> { + loop { + let mut stmt = self + .inner + .lock() + .map_err(|e| Error::MutexError(e.to_string()))?; + match stmt.step() { + Ok(turso_core::StepResult::Row) => { + let row = stmt.row().unwrap(); + return Ok(Some(Row { + values: row.get_values().map(|v| v.to_owned()).collect(), + })); + } + Ok(turso_core::StepResult::Done) => return Ok(None), + Ok(turso_core::StepResult::IO) => { + if let Err(e) = stmt.run_once() { + return Err(e.into()); + } + continue; + } + Ok(turso_core::StepResult::Busy) => return Ok(None), + Ok(turso_core::StepResult::Interrupt) => return Ok(None), + _ => return Ok(None), + } + } + } +} + +/// Query result row. +#[derive(Debug)] +pub struct Row { + values: Vec, +} + +unsafe impl Send for Row {} +unsafe impl Sync for Row {} + +impl Row { + pub fn get_value(&self, index: usize) -> Result { + let value = &self.values[index]; + match value { + turso_core::Value::Integer(i) => Ok(Value::Integer(*i)), + turso_core::Value::Null => Ok(Value::Null), + turso_core::Value::Float(f) => Ok(Value::Real(*f)), + turso_core::Value::Text(text) => Ok(Value::Text(text.to_string())), + turso_core::Value::Blob(items) => Ok(Value::Blob(items.to_vec())), + } + } + + pub fn column_count(&self) -> usize { + self.values.len() + } +} + +impl<'a> FromIterator<&'a turso_core::Value> for Row { + fn from_iter>(iter: T) -> Self { + let values = iter + .into_iter() + .map(|v| match v { + turso_core::Value::Integer(i) => turso_core::Value::Integer(*i), + turso_core::Value::Null => turso_core::Value::Null, + turso_core::Value::Float(f) => turso_core::Value::Float(*f), + turso_core::Value::Text(s) => turso_core::Value::Text(s.clone()), + turso_core::Value::Blob(b) => turso_core::Value::Blob(b.clone()), + }) + .collect(); + + Row { values } + } +} + #[cfg(test)] mod tests { use super::*; - #[cfg(feature = "futures")] - use futures_util::TryStreamExt; use tempfile::NamedTempFile; - #[cfg(not(feature = "futures"))] - macro_rules! rows_next { - ($rows:expr) => { - $rows.next() - }; - } - - #[cfg(feature = "futures")] - macro_rules! rows_next { - ($rows:expr) => { - $rows.try_next() - }; - } - #[tokio::test] async fn test_database_persistence() -> Result<()> { let temp_file = NamedTempFile::new().unwrap(); @@ -298,13 +479,13 @@ mod tests { .query("SELECT name FROM test_persistence ORDER BY id;", ()) .await?; - let row1 = rows_next!(rows).await?.expect("Expected first row"); + let row1 = rows.next().await?.expect("Expected first row"); assert_eq!(row1.get_value(0)?, Value::Text("Alice".to_string())); - let row2 = rows_next!(rows).await?.expect("Expected second row"); + let row2 = rows.next().await?.expect("Expected second row"); assert_eq!(row2.get_value(0)?, Value::Text("Bob".to_string())); - assert!(rows_next!(rows).await?.is_none(), "Expected no more rows"); + assert!(rows.next().await?.is_none(), "Expected no more rows"); Ok(()) } @@ -353,7 +534,8 @@ mod tests { .await?; for (i, value) in original_data.iter().enumerate().take(NUM_INSERTS) { - let row = rows_next!(rows) + let row = rows + .next() .await? .unwrap_or_else(|| panic!("Expected row {} but found None", i)); assert_eq!( @@ -365,7 +547,7 @@ mod tests { } assert!( - rows_next!(rows).await?.is_none(), + rows.next().await?.is_none(), "Expected no more rows after retrieving all inserted data" ); @@ -422,9 +604,9 @@ mod tests { let mut rows_iter = conn .query("SELECT count(*) FROM test_persistence;", ()) .await?; - let rows = rows_next!(rows_iter).await?.unwrap(); + let rows = rows_iter.next().await?.unwrap(); assert_eq!(rows.get_value(0)?, Value::Integer(i as i64 + 1)); - assert!(rows_next!(rows_iter).await?.is_none()); + assert!(rows_iter.next().await?.is_none()); } } diff --git a/bindings/rust/src/params.rs b/bindings/rust/src/params.rs index 8ae3cc28e..82828ead2 100644 --- a/bindings/rust/src/params.rs +++ b/bindings/rust/src/params.rs @@ -9,14 +9,14 @@ mod sealed { use sealed::Sealed; /// Converts some type into parameters that can be passed -/// to Turso. +/// to libsql. /// /// The trait is sealed and not designed to be implemented by hand /// but instead provides a few ways to use it. /// -/// # Passing parameters to Turso +/// # Passing parameters to libsql /// -/// Many functions in this library let you pass parameters to Turso. Doing this +/// Many functions in this library let you pass parameters to libsql. Doing this /// lets you avoid any risk of SQL injection, and is simpler than escaping /// things manually. These functions generally contain some parameter that generically /// accepts some implementation this trait. @@ -27,7 +27,7 @@ use sealed::Sealed; /// /// - For heterogeneous parameter lists of 16 or less items a tuple syntax is supported /// by doing `(1, "foo")`. -/// - For hetergeneous parameter lists of 16 or greater, the [`crate::params!`] is supported +/// - For hetergeneous parameter lists of 16 or greater, the [`turso::params!`] is supported /// by doing `turso::params![1, "foo"]`. /// - For homogeneous parameter types (where they are all the same type), const arrays are /// supported by doing `[1, 2, 3]`. @@ -62,8 +62,8 @@ use sealed::Sealed; /// /// - For heterogeneous parameter lists of 16 or less items a tuple syntax is supported /// by doing `(("key1", 1), ("key2", "foo"))`. -/// - For hetergeneous parameter lists of 16 or greater, the [`crate::named_params!`] is supported -/// by doing `turso::named_params!{"key1": 1, "key2": "foo"}`. +/// - For hetergeneous parameter lists of 16 or greater, the [`turso::params!`] is supported +/// by doing `turso::named_params!["key1": 1, "key2": "foo"]`. /// - For homogeneous parameter types (where they are all the same type), const arrays are /// supported by doing `[("key1", 1), ("key2, 2), ("key3", 3)]`. /// @@ -77,7 +77,7 @@ use sealed::Sealed; /// // Using a tuple: /// stmt.execute(((":key1", 0), (":key2", "foobar"))).await?; /// -/// // Using `named_params!`: +/// // Using `turso::named_params!`: /// stmt.execute(named_params! {":key1": 1i32, ":key2": "blah" }).await?; /// /// // const array: @@ -106,7 +106,7 @@ pub enum Params { /// # Example /// /// ```rust -/// # use turso::{Connection, params_from_iter}; +/// # use turso::{Connection, params_from_iter, Rows}; /// # async fn run(conn: &Connection) { /// /// let iter = vec![1, 2, 3]; diff --git a/bindings/rust/src/row.rs b/bindings/rust/src/row.rs deleted file mode 100644 index 6a631c670..000000000 --- a/bindings/rust/src/row.rs +++ /dev/null @@ -1,158 +0,0 @@ -use std::sync::{Arc, Mutex}; - -use crate::Error; -use crate::Value; - -/// Results of a prepared statement query. -#[must_use = "Rows is lazy and will do nothing unless consumed"] -pub struct Rows { - pub(crate) inner: Arc>, -} - -impl Clone for Rows { - fn clone(&self) -> Self { - Self { - inner: Arc::clone(&self.inner), - } - } -} - -unsafe impl Send for Rows {} -unsafe impl Sync for Rows {} - -#[cfg(not(feature = "futures"))] -impl Rows { - /// Fetch the next row of this result set. - pub async fn next(&mut self) -> crate::Result> { - loop { - let mut stmt = self - .inner - .lock() - .map_err(|e| Error::MutexError(e.to_string()))?; - match stmt.step().inspect_err(|_| stmt.reset())? { - turso_core::StepResult::Row => { - let row = stmt.row().unwrap(); - return Ok(Some(Row { - values: row.get_values().map(|v| v.to_owned()).collect(), - })); - } - turso_core::StepResult::Done => { - stmt.reset(); - return Ok(None); - } - turso_core::StepResult::IO => { - if let Err(e) = stmt.run_once() { - return Err(e.into()); - } - continue; - } - // TODO: Busy should probably be an error - turso_core::StepResult::Busy | turso_core::StepResult::Interrupt => { - stmt.reset(); - return Ok(None); - } - } - } - } -} - -#[cfg(feature = "futures")] -impl futures_util::Stream for Rows { - type Item = crate::Result; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - use std::task::Poll; - let stmt = self - .inner - .lock() - .map_err(|e| Error::MutexError(e.to_string())); - - if let Err(err) = stmt { - return Poll::Ready(Some(Err(err))); - } - let mut stmt = stmt.unwrap(); - match stmt.step() { - Ok(step_result) => match step_result { - turso_core::StepResult::Row => { - let row = stmt.row().unwrap(); - Poll::Ready(Some(Ok(Row { - values: row.get_values().map(|v| v.to_owned()).collect(), - }))) - } - turso_core::StepResult::Done => { - stmt.reset(); - Poll::Ready(None) - } - turso_core::StepResult::IO => { - if let Err(e) = stmt.run_once() { - return Poll::Ready(Some(Err(e.into()))); - } - // TODO: see correct way to signal for this task to wake up - Poll::Pending - } - // TODO: Busy and Interrupt should probably return errors - turso_core::StepResult::Busy | turso_core::StepResult::Interrupt => { - stmt.reset(); - Poll::Ready(None) - } - }, - Err(err) => { - stmt.reset(); - Poll::Ready(Some(Err(Error::from(err)))) - } - } - } -} - -/// Query result row. -#[derive(Debug)] -pub struct Row { - values: Vec, -} - -unsafe impl Send for Row {} -unsafe impl Sync for Row {} - -impl Row { - pub fn get_value(&self, index: usize) -> crate::Result { - let value = &self.values[index]; - match value { - turso_core::Value::Integer(i) => Ok(Value::Integer(*i)), - turso_core::Value::Null => Ok(Value::Null), - turso_core::Value::Float(f) => Ok(Value::Real(*f)), - turso_core::Value::Text(text) => Ok(Value::Text(text.to_string())), - turso_core::Value::Blob(items) => Ok(Value::Blob(items.to_vec())), - } - } - - pub fn column_count(&self) -> usize { - self.values.len() - } -} - -impl<'a> FromIterator<&'a turso_core::Value> for Row { - fn from_iter>(iter: T) -> Self { - let values = iter - .into_iter() - .map(|v| match v { - turso_core::Value::Integer(i) => turso_core::Value::Integer(*i), - turso_core::Value::Null => turso_core::Value::Null, - turso_core::Value::Float(f) => turso_core::Value::Float(*f), - turso_core::Value::Text(s) => turso_core::Value::Text(s.clone()), - turso_core::Value::Blob(b) => turso_core::Value::Blob(b.clone()), - }) - .collect(); - - Row { values } - } -} - -#[cfg(test)] -mod tests { - - #[tokio::test] - async fn test_row() {} -} diff --git a/bindings/rust/src/statement.rs b/bindings/rust/src/statement.rs deleted file mode 100644 index 3b193a460..000000000 --- a/bindings/rust/src/statement.rs +++ /dev/null @@ -1,124 +0,0 @@ -use std::{ - num::NonZero, - sync::{Arc, Mutex}, -}; - -use crate::{ - params::{self, IntoParams}, - Column, Rows, -}; - -/// A prepared statement. -/// -/// Statements when executed or queried are reset after they encounter an error or run to completion -pub struct Statement { - pub(crate) inner: Arc>, -} - -impl Clone for Statement { - fn clone(&self) -> Self { - Self { - inner: Arc::clone(&self.inner), - } - } -} - -unsafe impl Send for Statement {} -unsafe impl Sync for Statement {} - -impl Statement { - /// Query the database with this prepared statement. - pub async fn query(&mut self, params: impl IntoParams) -> crate::Result { - let params = params.into_params()?; - match params { - params::Params::None => (), - params::Params::Positional(values) => { - for (i, value) in values.into_iter().enumerate() { - let mut stmt = self.inner.lock().unwrap(); - stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into()); - } - } - params::Params::Named(values) => { - for (name, value) in values.into_iter() { - let mut stmt = self.inner.lock().unwrap(); - let i = stmt.parameters().index(name).unwrap(); - stmt.bind_at(i, value.into()); - } - } - } - #[allow(clippy::arc_with_non_send_sync)] - let rows = Rows { - inner: Arc::clone(&self.inner), - }; - Ok(rows) - } - - /// Execute this prepared statement. - pub async fn execute(&mut self, params: impl IntoParams) -> crate::Result { - { - // Reset the statement before executing - self.inner.lock().unwrap().reset(); - } - let params = params.into_params()?; - match params { - params::Params::None => (), - params::Params::Positional(values) => { - for (i, value) in values.into_iter().enumerate() { - let mut stmt = self.inner.lock().unwrap(); - stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into()); - } - } - params::Params::Named(values) => { - for (name, value) in values.into_iter() { - let mut stmt = self.inner.lock().unwrap(); - let i = stmt.parameters().index(name).unwrap(); - stmt.bind_at(i, value.into()); - } - } - } - loop { - let mut stmt = self.inner.lock().unwrap(); - match stmt.step() { - Ok(turso_core::StepResult::Row) => {} - Ok(turso_core::StepResult::Done) => { - stmt.reset(); - return Ok(0); - } - Ok(turso_core::StepResult::IO) => { - stmt.run_once()?; - } - Ok(turso_core::StepResult::Busy) => { - stmt.reset(); - return Ok(4); - } - Ok(turso_core::StepResult::Interrupt) => { - stmt.reset(); - return Ok(3); - } - Err(err) => { - stmt.reset(); - return Err(err.into()); - } - } - } - } - - /// Returns columns of the result of this prepared statement. - pub fn columns(&self) -> Vec { - let stmt = self.inner.lock().unwrap(); - - let n = stmt.num_columns(); - - let mut cols = Vec::with_capacity(n); - - for i in 0..n { - let name = stmt.get_column_name(i).into_owned(); - cols.push(Column { - name, - decl_type: None, // TODO - }); - } - - cols - } -} diff --git a/bindings/rust/src/value.rs b/bindings/rust/src/value.rs index ae3fd7f08..567476513 100644 --- a/bindings/rust/src/value.rs +++ b/bindings/rust/src/value.rs @@ -11,7 +11,7 @@ pub enum Value { Blob(Vec), } -/// The possible types a column can be in Turso. +/// The possible types a column can be in libsql. #[derive(Debug, Copy, Clone)] pub enum ValueType { Integer = 1, diff --git a/bindings/rust/tests/integration_tests.rs b/bindings/rust/tests/integration_tests.rs index 55b65c578..14ac01273 100644 --- a/bindings/rust/tests/integration_tests.rs +++ b/bindings/rust/tests/integration_tests.rs @@ -1,21 +1,5 @@ -#[cfg(feature = "futures")] -use futures_util::TryStreamExt; use turso::{Builder, Value}; -#[cfg(not(feature = "futures"))] -macro_rules! rows_next { - ($rows:expr) => { - $rows.next() - }; -} - -#[cfg(feature = "futures")] -macro_rules! rows_next { - ($rows:expr) => { - $rows.try_next() - }; -} - #[tokio::test] async fn test_rows_next() { let builder = Builder::new_local(":memory:"); @@ -50,49 +34,24 @@ async fn test_rows_next() { .unwrap(); let mut res = conn.query("SELECT * FROM test", ()).await.unwrap(); assert_eq!( - rows_next!(res) - .await - .unwrap() - .unwrap() - .get_value(0) - .unwrap(), + res.next().await.unwrap().unwrap().get_value(0).unwrap(), 1.into() ); assert_eq!( - rows_next!(res) - .await - .unwrap() - .unwrap() - .get_value(0) - .unwrap(), + res.next().await.unwrap().unwrap().get_value(0).unwrap(), 2.into() ); assert_eq!( - rows_next!(res) - .await - .unwrap() - .unwrap() - .get_value(0) - .unwrap(), + res.next().await.unwrap().unwrap().get_value(0).unwrap(), 3.into() ); assert_eq!( - rows_next!(res) - .await - .unwrap() - .unwrap() - .get_value(0) - .unwrap(), + res.next().await.unwrap().unwrap().get_value(0).unwrap(), 4.into() ); assert_eq!( - rows_next!(res) - .await - .unwrap() - .unwrap() - .get_value(0) - .unwrap(), + res.next().await.unwrap().unwrap().get_value(0).unwrap(), 5.into() ); - assert!(rows_next!(res).await.unwrap().is_none()); + assert!(res.next().await.unwrap().is_none()); } diff --git a/stress/main.rs b/stress/main.rs index 3cd61016a..d4807fbad 100644 --- a/stress/main.rs +++ b/stress/main.rs @@ -15,7 +15,6 @@ use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; -use turso::futures_util::TryStreamExt; use turso::Builder; pub struct Plan { @@ -523,7 +522,7 @@ async fn main() -> Result<(), Box> { const INTEGRITY_CHECK_INTERVAL: usize = 100; if query_index % INTEGRITY_CHECK_INTERVAL == 0 { let mut res = conn.query("PRAGMA integrity_check", ()).await.unwrap(); - if let Some(row) = res.try_next().await? { + if let Some(row) = res.next().await? { let value = row.get_value(0).unwrap(); if value != "ok".into() { panic!("integrity check failed: {:?}", value);