add futures-util feature flag + impl Stream for Rows

This commit is contained in:
pedrocarlo
2025-06-29 19:48:52 -03:00
parent 6672334eaa
commit 279a7fca3b
5 changed files with 128 additions and 21 deletions

1
Cargo.lock generated
View File

@@ -3620,6 +3620,7 @@ dependencies = [
name = "turso" name = "turso"
version = "0.1.0-pre.5" version = "0.1.0-pre.5"
dependencies = [ dependencies = [
"futures-util",
"tempfile", "tempfile",
"thiserror 2.0.12", "thiserror 2.0.12",
"tokio", "tokio",

View File

@@ -13,10 +13,12 @@ description = "Turso Rust API"
default = [] default = []
experimental_indexes = [] experimental_indexes = []
antithesis = ["turso_core/antithesis"] antithesis = ["turso_core/antithesis"]
futures = ["dep:futures-util"]
[dependencies] [dependencies]
turso_core = { workspace = true, features = ["io_uring"] } turso_core = { workspace = true, features = ["io_uring"] }
thiserror = "2.0.9" thiserror = "2.0.9"
futures-util = { version = "0.3.31", optional = true, default-features = false, features = ["std", "async-await"] }
[dev-dependencies] [dev-dependencies]
tempfile = "3.20.0" tempfile = "3.20.0"

View File

@@ -1,3 +1,5 @@
#[cfg(feature = "futures")]
use futures_util::stream::StreamExt;
use turso::Builder; use turso::Builder;
#[tokio::main] #[tokio::main]

View File

@@ -374,6 +374,7 @@ unsafe impl Send for Rows {}
unsafe impl Sync for Rows {} unsafe impl Sync for Rows {}
impl Rows { impl Rows {
#[cfg(not(feature = "futures"))]
/// Fetch the next row of this result set. /// Fetch the next row of this result set.
pub async fn next(&mut self) -> Result<Option<Row>> { pub async fn next(&mut self) -> Result<Option<Row>> {
loop { loop {
@@ -381,23 +382,68 @@ impl Rows {
.inner .inner
.lock() .lock()
.map_err(|e| Error::MutexError(e.to_string()))?; .map_err(|e| Error::MutexError(e.to_string()))?;
match stmt.step() { match stmt.step()? {
Ok(turso_core::StepResult::Row) => { turso_core::StepResult::Row => {
let row = stmt.row().unwrap(); let row = stmt.row().unwrap();
return Ok(Some(Row { return Ok(Some(Row {
values: row.get_values().map(|v| v.to_owned()).collect(), values: row.get_values().map(|v| v.to_owned()).collect(),
})); }));
} }
Ok(turso_core::StepResult::Done) => return Ok(None), turso_core::StepResult::Done => return Ok(None),
Ok(turso_core::StepResult::IO) => { turso_core::StepResult::IO => {
if let Err(e) = stmt.run_once() { if let Err(e) = stmt.run_once() {
return Err(e.into()); return Err(e.into());
} }
continue; continue;
} }
Ok(turso_core::StepResult::Busy) => return Ok(None), // TODO: Busy should probably be an error
Ok(turso_core::StepResult::Interrupt) => return Ok(None), turso_core::StepResult::Busy => return Ok(None),
_ => return Ok(None), turso_core::StepResult::Interrupt => return Ok(None),
}
}
}
}
#[cfg(feature = "futures")]
impl futures_util::Stream for Rows {
type Item = Result<Row>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
use std::task::Poll;
loop {
let stmt = self
.inner
.lock()
.map_err(|e| Error::MutexError(e.to_string()));
if let Err(err) = stmt {
return Poll::Ready(Some(Err(err)));
}
let mut stmt = stmt.unwrap();
match stmt.step() {
Ok(step_result) => match step_result {
turso_core::StepResult::Row => {
let row = stmt.row().unwrap();
return Poll::Ready(Some(Ok(Row {
values: row.get_values().map(|v| v.to_owned()).collect(),
})));
}
turso_core::StepResult::Done => return Poll::Ready(None),
turso_core::StepResult::IO => {
if let Err(e) = stmt.run_once() {
return Poll::Ready(Some(Err(e.into())));
}
return Poll::Pending;
}
turso_core::StepResult::Busy => return Poll::Ready(None),
turso_core::StepResult::Interrupt => return Poll::Ready(None),
},
Err(err) => {
return Poll::Ready(Some(Err(Error::from(err))));
}
} }
} }
} }
@@ -449,8 +495,24 @@ impl<'a> FromIterator<&'a turso_core::Value> for Row {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
#[cfg(feature = "futures")]
use futures_util::TryStreamExt;
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
#[cfg(not(feature = "futures"))]
macro_rules! rows_next {
($rows:expr) => {
$rows.next()
};
}
#[cfg(feature = "futures")]
macro_rules! rows_next {
($rows:expr) => {
$rows.try_next()
};
}
#[tokio::test] #[tokio::test]
async fn test_database_persistence() -> Result<()> { async fn test_database_persistence() -> Result<()> {
let temp_file = NamedTempFile::new().unwrap(); let temp_file = NamedTempFile::new().unwrap();
@@ -479,13 +541,13 @@ mod tests {
.query("SELECT name FROM test_persistence ORDER BY id;", ()) .query("SELECT name FROM test_persistence ORDER BY id;", ())
.await?; .await?;
let row1 = rows.next().await?.expect("Expected first row"); let row1 = rows_next!(rows).await?.expect("Expected first row");
assert_eq!(row1.get_value(0)?, Value::Text("Alice".to_string())); assert_eq!(row1.get_value(0)?, Value::Text("Alice".to_string()));
let row2 = rows.next().await?.expect("Expected second row"); let row2 = rows_next!(rows).await?.expect("Expected second row");
assert_eq!(row2.get_value(0)?, Value::Text("Bob".to_string())); assert_eq!(row2.get_value(0)?, Value::Text("Bob".to_string()));
assert!(rows.next().await?.is_none(), "Expected no more rows"); assert!(rows_next!(rows).await?.is_none(), "Expected no more rows");
Ok(()) Ok(())
} }
@@ -534,8 +596,7 @@ mod tests {
.await?; .await?;
for (i, value) in original_data.iter().enumerate().take(NUM_INSERTS) { for (i, value) in original_data.iter().enumerate().take(NUM_INSERTS) {
let row = rows let row = rows_next!(rows)
.next()
.await? .await?
.unwrap_or_else(|| panic!("Expected row {} but found None", i)); .unwrap_or_else(|| panic!("Expected row {} but found None", i));
assert_eq!( assert_eq!(
@@ -547,7 +608,7 @@ mod tests {
} }
assert!( assert!(
rows.next().await?.is_none(), rows_next!(rows).await?.is_none(),
"Expected no more rows after retrieving all inserted data" "Expected no more rows after retrieving all inserted data"
); );
@@ -604,9 +665,9 @@ mod tests {
let mut rows_iter = conn let mut rows_iter = conn
.query("SELECT count(*) FROM test_persistence;", ()) .query("SELECT count(*) FROM test_persistence;", ())
.await?; .await?;
let rows = rows_iter.next().await?.unwrap(); let rows = rows_next!(rows_iter).await?.unwrap();
assert_eq!(rows.get_value(0)?, Value::Integer(i as i64 + 1)); assert_eq!(rows.get_value(0)?, Value::Integer(i as i64 + 1));
assert!(rows_iter.next().await?.is_none()); assert!(rows_next!(rows_iter).await?.is_none());
} }
} }

View File

@@ -1,5 +1,21 @@
#[cfg(feature = "futures")]
use futures_util::TryStreamExt;
use turso::{Builder, Value}; use turso::{Builder, Value};
#[cfg(not(feature = "futures"))]
macro_rules! rows_next {
($rows:expr) => {
$rows.next()
};
}
#[cfg(feature = "futures")]
macro_rules! rows_next {
($rows:expr) => {
$rows.try_next()
};
}
#[tokio::test] #[tokio::test]
async fn test_rows_next() { async fn test_rows_next() {
let builder = Builder::new_local(":memory:"); let builder = Builder::new_local(":memory:");
@@ -34,24 +50,49 @@ async fn test_rows_next() {
.unwrap(); .unwrap();
let mut res = conn.query("SELECT * FROM test", ()).await.unwrap(); let mut res = conn.query("SELECT * FROM test", ()).await.unwrap();
assert_eq!( assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(), rows_next!(res)
.await
.unwrap()
.unwrap()
.get_value(0)
.unwrap(),
1.into() 1.into()
); );
assert_eq!( assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(), rows_next!(res)
.await
.unwrap()
.unwrap()
.get_value(0)
.unwrap(),
2.into() 2.into()
); );
assert_eq!( assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(), rows_next!(res)
.await
.unwrap()
.unwrap()
.get_value(0)
.unwrap(),
3.into() 3.into()
); );
assert_eq!( assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(), rows_next!(res)
.await
.unwrap()
.unwrap()
.get_value(0)
.unwrap(),
4.into() 4.into()
); );
assert_eq!( assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(), rows_next!(res)
.await
.unwrap()
.unwrap()
.get_value(0)
.unwrap(),
5.into() 5.into()
); );
assert!(res.next().await.unwrap().is_none()); assert!(rows_next!(res).await.unwrap().is_none());
} }