Revert "Merge 'Rust binding improvements' from Pedro Muniz"

This reverts commit bd60cd214c, reversing
changes made to 74e48a3a8f because it
makes limbo_stress hang.
This commit is contained in:
Pekka Enberg
2025-07-03 12:28:10 +03:00
parent 4e5de4b03f
commit ca990e8fd1
10 changed files with 236 additions and 402 deletions

1
Cargo.lock generated
View File

@@ -3630,7 +3630,6 @@ dependencies = [
name = "turso"
version = "0.1.1"
dependencies = [
"futures-util",
"tempfile",
"thiserror 2.0.12",
"tokio",

View File

@@ -10,15 +10,13 @@ repository.workspace = true
description = "Turso Rust API"
[features]
default = ["futures"]
default = []
experimental_indexes = []
antithesis = ["turso_core/antithesis"]
futures = ["dep:futures-util"]
[dependencies]
turso_core = { workspace = true, features = ["io_uring"] }
thiserror = "2.0.9"
futures-util = { version = "0.3.31", optional = true, default-features = false, features = ["std", "async-await"] }
[dev-dependencies]
tempfile = "3.20.0"

View File

@@ -1,6 +1,3 @@
#![cfg(feature = "futures")]
use futures_util::stream::TryStreamExt;
use turso::Builder;
#[tokio::main]
@@ -9,15 +6,8 @@ async fn main() {
let conn = db.connect().unwrap();
// `query` and other methods, only parse the first query given.
let mut rows = conn.query("select 1; select 1;", ()).await.unwrap();
// Iterate over the rows with the Stream iterator syntax
while let Some(row) = rows.try_next().await.unwrap() {
let val = row.get_value(0).unwrap();
println!("{:?}", val);
}
conn.query("select 1; select 1;", ()).await.unwrap();
// Contrary to `prepare` and `query`, `execute` is not lazy and will execute the query to completion
conn.execute("CREATE TABLE IF NOT EXISTS users (email TEXT)", ())
.await
.unwrap();
@@ -42,20 +32,9 @@ async fn main() {
let mut rows = stmt.query(["foo@example.com"]).await.unwrap();
while let Some(row) = rows.try_next().await.unwrap() {
let row = rows.next().await.unwrap().unwrap();
let value = row.get_value(0).unwrap();
println!("Row: {:?}", value);
}
let rows = stmt.query(["foo@example.com"]).await.unwrap();
// As `Rows` implement streams you can easily map over it and apply other transformations you see fit
println!("Using Stream map");
rows.map_ok(|row| row.get_value(0).unwrap())
.try_for_each(|val| {
println!("Row: {:?}", val.as_text().unwrap());
futures_util::future::ready(Ok(()))
})
.await
.unwrap();
}

View File

@@ -20,33 +20,28 @@
//! You can also prepare statements with the [`Connection`] object and then execute the [`Statement`] objects:
//!
//! ```rust,no_run
//! # use crate::turso::futures_util::TryStreamExt;
//! # async fn run() {
//! # use turso::Builder;
//! # let db = Builder::new_local(":memory:").build().await.unwrap();
//! # let conn = db.connect().unwrap();
//! let mut stmt = conn.prepare("SELECT * FROM users WHERE email = ?1").await.unwrap();
//! let mut rows = stmt.query(["foo@example.com"]).await.unwrap();
//! let row = rows.try_next().await.unwrap().unwrap();
//! let row = rows.next().await.unwrap().unwrap();
//! let value = row.get_value(0).unwrap();
//! println!("Row: {:?}", value);
//! # }
//! ```
pub mod params;
pub mod row;
pub mod statement;
pub mod value;
#[cfg(feature = "futures")]
pub use futures_util;
pub use params::params_from_iter;
pub use value::Value;
pub use params::params_from_iter;
use crate::params::*;
use crate::row::{Row, Rows};
use crate::statement::Statement;
use std::fmt::Debug;
use std::num::NonZero;
use std::sync::{Arc, Mutex};
#[derive(Debug, thiserror::Error)]
@@ -67,7 +62,7 @@ impl From<turso_core::LimboError> for Error {
pub(crate) type BoxError = Box<dyn std::error::Error + Send + Sync>;
pub type Result<T, E = Error> = std::result::Result<T, E>;
pub type Result<T> = std::result::Result<T, Error>;
/// A builder for `Database`.
pub struct Builder {
@@ -218,6 +213,119 @@ impl Debug for Connection {
}
}
/// A prepared statement.
pub struct Statement {
inner: Arc<Mutex<turso_core::Statement>>,
}
impl Clone for Statement {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
unsafe impl Send for Statement {}
unsafe impl Sync for Statement {}
impl Statement {
/// Query the database with this prepared statement.
pub async fn query(&mut self, params: impl IntoParams) -> Result<Rows> {
let params = params.into_params()?;
match params {
params::Params::None => (),
params::Params::Positional(values) => {
for (i, value) in values.into_iter().enumerate() {
let mut stmt = self.inner.lock().unwrap();
stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
}
}
params::Params::Named(values) => {
for (name, value) in values.into_iter() {
let mut stmt = self.inner.lock().unwrap();
let i = stmt.parameters().index(name).unwrap();
stmt.bind_at(i, value.into());
}
}
}
#[allow(clippy::arc_with_non_send_sync)]
let rows = Rows {
inner: Arc::clone(&self.inner),
};
Ok(rows)
}
/// Execute this prepared statement.
pub async fn execute(&mut self, params: impl IntoParams) -> Result<u64> {
{
// Reset the statement before executing
self.inner.lock().unwrap().reset();
}
let params = params.into_params()?;
match params {
params::Params::None => (),
params::Params::Positional(values) => {
for (i, value) in values.into_iter().enumerate() {
let mut stmt = self.inner.lock().unwrap();
stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
}
}
params::Params::Named(values) => {
for (name, value) in values.into_iter() {
let mut stmt = self.inner.lock().unwrap();
let i = stmt.parameters().index(name).unwrap();
stmt.bind_at(i, value.into());
}
}
}
loop {
let mut stmt = self.inner.lock().unwrap();
match stmt.step() {
Ok(turso_core::StepResult::Row) => {
// unexpected row during execution, error out.
return Ok(2);
}
Ok(turso_core::StepResult::Done) => {
return Ok(0);
}
Ok(turso_core::StepResult::IO) => {
let _ = stmt.run_once();
//return Ok(1);
}
Ok(turso_core::StepResult::Busy) => {
return Ok(4);
}
Ok(turso_core::StepResult::Interrupt) => {
return Ok(3);
}
Err(err) => {
return Err(err.into());
}
}
}
}
/// Returns columns of the result of this prepared statement.
pub fn columns(&self) -> Vec<Column> {
let stmt = self.inner.lock().unwrap();
let n = stmt.num_columns();
let mut cols = Vec::with_capacity(n);
for i in 0..n {
let name = stmt.get_column_name(i).into_owned();
cols.push(Column {
name,
decl_type: None, // TODO
});
}
cols
}
}
/// Column information.
pub struct Column {
name: String,
@@ -249,27 +357,100 @@ pub enum Params {
pub struct Transaction {}
/// Results of a prepared statement query.
pub struct Rows {
inner: Arc<Mutex<turso_core::Statement>>,
}
impl Clone for Rows {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
unsafe impl Send for Rows {}
unsafe impl Sync for Rows {}
impl Rows {
/// Fetch the next row of this result set.
pub async fn next(&mut self) -> Result<Option<Row>> {
loop {
let mut stmt = self
.inner
.lock()
.map_err(|e| Error::MutexError(e.to_string()))?;
match stmt.step() {
Ok(turso_core::StepResult::Row) => {
let row = stmt.row().unwrap();
return Ok(Some(Row {
values: row.get_values().map(|v| v.to_owned()).collect(),
}));
}
Ok(turso_core::StepResult::Done) => return Ok(None),
Ok(turso_core::StepResult::IO) => {
if let Err(e) = stmt.run_once() {
return Err(e.into());
}
continue;
}
Ok(turso_core::StepResult::Busy) => return Ok(None),
Ok(turso_core::StepResult::Interrupt) => return Ok(None),
_ => return Ok(None),
}
}
}
}
/// Query result row.
#[derive(Debug)]
pub struct Row {
values: Vec<turso_core::Value>,
}
unsafe impl Send for Row {}
unsafe impl Sync for Row {}
impl Row {
pub fn get_value(&self, index: usize) -> Result<Value> {
let value = &self.values[index];
match value {
turso_core::Value::Integer(i) => Ok(Value::Integer(*i)),
turso_core::Value::Null => Ok(Value::Null),
turso_core::Value::Float(f) => Ok(Value::Real(*f)),
turso_core::Value::Text(text) => Ok(Value::Text(text.to_string())),
turso_core::Value::Blob(items) => Ok(Value::Blob(items.to_vec())),
}
}
pub fn column_count(&self) -> usize {
self.values.len()
}
}
impl<'a> FromIterator<&'a turso_core::Value> for Row {
fn from_iter<T: IntoIterator<Item = &'a turso_core::Value>>(iter: T) -> Self {
let values = iter
.into_iter()
.map(|v| match v {
turso_core::Value::Integer(i) => turso_core::Value::Integer(*i),
turso_core::Value::Null => turso_core::Value::Null,
turso_core::Value::Float(f) => turso_core::Value::Float(*f),
turso_core::Value::Text(s) => turso_core::Value::Text(s.clone()),
turso_core::Value::Blob(b) => turso_core::Value::Blob(b.clone()),
})
.collect();
Row { values }
}
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(feature = "futures")]
use futures_util::TryStreamExt;
use tempfile::NamedTempFile;
#[cfg(not(feature = "futures"))]
macro_rules! rows_next {
($rows:expr) => {
$rows.next()
};
}
#[cfg(feature = "futures")]
macro_rules! rows_next {
($rows:expr) => {
$rows.try_next()
};
}
#[tokio::test]
async fn test_database_persistence() -> Result<()> {
let temp_file = NamedTempFile::new().unwrap();
@@ -298,13 +479,13 @@ mod tests {
.query("SELECT name FROM test_persistence ORDER BY id;", ())
.await?;
let row1 = rows_next!(rows).await?.expect("Expected first row");
let row1 = rows.next().await?.expect("Expected first row");
assert_eq!(row1.get_value(0)?, Value::Text("Alice".to_string()));
let row2 = rows_next!(rows).await?.expect("Expected second row");
let row2 = rows.next().await?.expect("Expected second row");
assert_eq!(row2.get_value(0)?, Value::Text("Bob".to_string()));
assert!(rows_next!(rows).await?.is_none(), "Expected no more rows");
assert!(rows.next().await?.is_none(), "Expected no more rows");
Ok(())
}
@@ -353,7 +534,8 @@ mod tests {
.await?;
for (i, value) in original_data.iter().enumerate().take(NUM_INSERTS) {
let row = rows_next!(rows)
let row = rows
.next()
.await?
.unwrap_or_else(|| panic!("Expected row {} but found None", i));
assert_eq!(
@@ -365,7 +547,7 @@ mod tests {
}
assert!(
rows_next!(rows).await?.is_none(),
rows.next().await?.is_none(),
"Expected no more rows after retrieving all inserted data"
);
@@ -422,9 +604,9 @@ mod tests {
let mut rows_iter = conn
.query("SELECT count(*) FROM test_persistence;", ())
.await?;
let rows = rows_next!(rows_iter).await?.unwrap();
let rows = rows_iter.next().await?.unwrap();
assert_eq!(rows.get_value(0)?, Value::Integer(i as i64 + 1));
assert!(rows_next!(rows_iter).await?.is_none());
assert!(rows_iter.next().await?.is_none());
}
}

View File

@@ -9,14 +9,14 @@ mod sealed {
use sealed::Sealed;
/// Converts some type into parameters that can be passed
/// to Turso.
/// to libsql.
///
/// The trait is sealed and not designed to be implemented by hand
/// but instead provides a few ways to use it.
///
/// # Passing parameters to Turso
/// # Passing parameters to libsql
///
/// Many functions in this library let you pass parameters to Turso. Doing this
/// Many functions in this library let you pass parameters to libsql. Doing this
/// lets you avoid any risk of SQL injection, and is simpler than escaping
/// things manually. These functions generally contain some parameter that generically
/// accepts some implementation this trait.
@@ -27,7 +27,7 @@ use sealed::Sealed;
///
/// - For heterogeneous parameter lists of 16 or less items a tuple syntax is supported
/// by doing `(1, "foo")`.
/// - For hetergeneous parameter lists of 16 or greater, the [`crate::params!`] is supported
/// - For hetergeneous parameter lists of 16 or greater, the [`turso::params!`] is supported
/// by doing `turso::params![1, "foo"]`.
/// - For homogeneous parameter types (where they are all the same type), const arrays are
/// supported by doing `[1, 2, 3]`.
@@ -62,8 +62,8 @@ use sealed::Sealed;
///
/// - For heterogeneous parameter lists of 16 or less items a tuple syntax is supported
/// by doing `(("key1", 1), ("key2", "foo"))`.
/// - For hetergeneous parameter lists of 16 or greater, the [`crate::named_params!`] is supported
/// by doing `turso::named_params!{"key1": 1, "key2": "foo"}`.
/// - For hetergeneous parameter lists of 16 or greater, the [`turso::params!`] is supported
/// by doing `turso::named_params!["key1": 1, "key2": "foo"]`.
/// - For homogeneous parameter types (where they are all the same type), const arrays are
/// supported by doing `[("key1", 1), ("key2, 2), ("key3", 3)]`.
///
@@ -77,7 +77,7 @@ use sealed::Sealed;
/// // Using a tuple:
/// stmt.execute(((":key1", 0), (":key2", "foobar"))).await?;
///
/// // Using `named_params!`:
/// // Using `turso::named_params!`:
/// stmt.execute(named_params! {":key1": 1i32, ":key2": "blah" }).await?;
///
/// // const array:
@@ -106,7 +106,7 @@ pub enum Params {
/// # Example
///
/// ```rust
/// # use turso::{Connection, params_from_iter};
/// # use turso::{Connection, params_from_iter, Rows};
/// # async fn run(conn: &Connection) {
///
/// let iter = vec![1, 2, 3];

View File

@@ -1,158 +0,0 @@
use std::sync::{Arc, Mutex};
use crate::Error;
use crate::Value;
/// Results of a prepared statement query.
#[must_use = "Rows is lazy and will do nothing unless consumed"]
pub struct Rows {
pub(crate) inner: Arc<Mutex<turso_core::Statement>>,
}
impl Clone for Rows {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
unsafe impl Send for Rows {}
unsafe impl Sync for Rows {}
#[cfg(not(feature = "futures"))]
impl Rows {
/// Fetch the next row of this result set.
pub async fn next(&mut self) -> crate::Result<Option<Row>> {
loop {
let mut stmt = self
.inner
.lock()
.map_err(|e| Error::MutexError(e.to_string()))?;
match stmt.step().inspect_err(|_| stmt.reset())? {
turso_core::StepResult::Row => {
let row = stmt.row().unwrap();
return Ok(Some(Row {
values: row.get_values().map(|v| v.to_owned()).collect(),
}));
}
turso_core::StepResult::Done => {
stmt.reset();
return Ok(None);
}
turso_core::StepResult::IO => {
if let Err(e) = stmt.run_once() {
return Err(e.into());
}
continue;
}
// TODO: Busy should probably be an error
turso_core::StepResult::Busy | turso_core::StepResult::Interrupt => {
stmt.reset();
return Ok(None);
}
}
}
}
}
#[cfg(feature = "futures")]
impl futures_util::Stream for Rows {
type Item = crate::Result<Row>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
use std::task::Poll;
let stmt = self
.inner
.lock()
.map_err(|e| Error::MutexError(e.to_string()));
if let Err(err) = stmt {
return Poll::Ready(Some(Err(err)));
}
let mut stmt = stmt.unwrap();
match stmt.step() {
Ok(step_result) => match step_result {
turso_core::StepResult::Row => {
let row = stmt.row().unwrap();
Poll::Ready(Some(Ok(Row {
values: row.get_values().map(|v| v.to_owned()).collect(),
})))
}
turso_core::StepResult::Done => {
stmt.reset();
Poll::Ready(None)
}
turso_core::StepResult::IO => {
if let Err(e) = stmt.run_once() {
return Poll::Ready(Some(Err(e.into())));
}
// TODO: see correct way to signal for this task to wake up
Poll::Pending
}
// TODO: Busy and Interrupt should probably return errors
turso_core::StepResult::Busy | turso_core::StepResult::Interrupt => {
stmt.reset();
Poll::Ready(None)
}
},
Err(err) => {
stmt.reset();
Poll::Ready(Some(Err(Error::from(err))))
}
}
}
}
/// Query result row.
#[derive(Debug)]
pub struct Row {
values: Vec<turso_core::Value>,
}
unsafe impl Send for Row {}
unsafe impl Sync for Row {}
impl Row {
pub fn get_value(&self, index: usize) -> crate::Result<Value> {
let value = &self.values[index];
match value {
turso_core::Value::Integer(i) => Ok(Value::Integer(*i)),
turso_core::Value::Null => Ok(Value::Null),
turso_core::Value::Float(f) => Ok(Value::Real(*f)),
turso_core::Value::Text(text) => Ok(Value::Text(text.to_string())),
turso_core::Value::Blob(items) => Ok(Value::Blob(items.to_vec())),
}
}
pub fn column_count(&self) -> usize {
self.values.len()
}
}
impl<'a> FromIterator<&'a turso_core::Value> for Row {
fn from_iter<T: IntoIterator<Item = &'a turso_core::Value>>(iter: T) -> Self {
let values = iter
.into_iter()
.map(|v| match v {
turso_core::Value::Integer(i) => turso_core::Value::Integer(*i),
turso_core::Value::Null => turso_core::Value::Null,
turso_core::Value::Float(f) => turso_core::Value::Float(*f),
turso_core::Value::Text(s) => turso_core::Value::Text(s.clone()),
turso_core::Value::Blob(b) => turso_core::Value::Blob(b.clone()),
})
.collect();
Row { values }
}
}
#[cfg(test)]
mod tests {
#[tokio::test]
async fn test_row() {}
}

View File

@@ -1,124 +0,0 @@
use std::{
num::NonZero,
sync::{Arc, Mutex},
};
use crate::{
params::{self, IntoParams},
Column, Rows,
};
/// A prepared statement.
///
/// Statements when executed or queried are reset after they encounter an error or run to completion
pub struct Statement {
pub(crate) inner: Arc<Mutex<turso_core::Statement>>,
}
impl Clone for Statement {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
unsafe impl Send for Statement {}
unsafe impl Sync for Statement {}
impl Statement {
/// Query the database with this prepared statement.
pub async fn query(&mut self, params: impl IntoParams) -> crate::Result<Rows> {
let params = params.into_params()?;
match params {
params::Params::None => (),
params::Params::Positional(values) => {
for (i, value) in values.into_iter().enumerate() {
let mut stmt = self.inner.lock().unwrap();
stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
}
}
params::Params::Named(values) => {
for (name, value) in values.into_iter() {
let mut stmt = self.inner.lock().unwrap();
let i = stmt.parameters().index(name).unwrap();
stmt.bind_at(i, value.into());
}
}
}
#[allow(clippy::arc_with_non_send_sync)]
let rows = Rows {
inner: Arc::clone(&self.inner),
};
Ok(rows)
}
/// Execute this prepared statement.
pub async fn execute(&mut self, params: impl IntoParams) -> crate::Result<u64> {
{
// Reset the statement before executing
self.inner.lock().unwrap().reset();
}
let params = params.into_params()?;
match params {
params::Params::None => (),
params::Params::Positional(values) => {
for (i, value) in values.into_iter().enumerate() {
let mut stmt = self.inner.lock().unwrap();
stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
}
}
params::Params::Named(values) => {
for (name, value) in values.into_iter() {
let mut stmt = self.inner.lock().unwrap();
let i = stmt.parameters().index(name).unwrap();
stmt.bind_at(i, value.into());
}
}
}
loop {
let mut stmt = self.inner.lock().unwrap();
match stmt.step() {
Ok(turso_core::StepResult::Row) => {}
Ok(turso_core::StepResult::Done) => {
stmt.reset();
return Ok(0);
}
Ok(turso_core::StepResult::IO) => {
stmt.run_once()?;
}
Ok(turso_core::StepResult::Busy) => {
stmt.reset();
return Ok(4);
}
Ok(turso_core::StepResult::Interrupt) => {
stmt.reset();
return Ok(3);
}
Err(err) => {
stmt.reset();
return Err(err.into());
}
}
}
}
/// Returns columns of the result of this prepared statement.
pub fn columns(&self) -> Vec<Column> {
let stmt = self.inner.lock().unwrap();
let n = stmt.num_columns();
let mut cols = Vec::with_capacity(n);
for i in 0..n {
let name = stmt.get_column_name(i).into_owned();
cols.push(Column {
name,
decl_type: None, // TODO
});
}
cols
}
}

View File

@@ -11,7 +11,7 @@ pub enum Value {
Blob(Vec<u8>),
}
/// The possible types a column can be in Turso.
/// The possible types a column can be in libsql.
#[derive(Debug, Copy, Clone)]
pub enum ValueType {
Integer = 1,

View File

@@ -1,21 +1,5 @@
#[cfg(feature = "futures")]
use futures_util::TryStreamExt;
use turso::{Builder, Value};
#[cfg(not(feature = "futures"))]
macro_rules! rows_next {
($rows:expr) => {
$rows.next()
};
}
#[cfg(feature = "futures")]
macro_rules! rows_next {
($rows:expr) => {
$rows.try_next()
};
}
#[tokio::test]
async fn test_rows_next() {
let builder = Builder::new_local(":memory:");
@@ -50,49 +34,24 @@ async fn test_rows_next() {
.unwrap();
let mut res = conn.query("SELECT * FROM test", ()).await.unwrap();
assert_eq!(
rows_next!(res)
.await
.unwrap()
.unwrap()
.get_value(0)
.unwrap(),
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
1.into()
);
assert_eq!(
rows_next!(res)
.await
.unwrap()
.unwrap()
.get_value(0)
.unwrap(),
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
2.into()
);
assert_eq!(
rows_next!(res)
.await
.unwrap()
.unwrap()
.get_value(0)
.unwrap(),
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
3.into()
);
assert_eq!(
rows_next!(res)
.await
.unwrap()
.unwrap()
.get_value(0)
.unwrap(),
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
4.into()
);
assert_eq!(
rows_next!(res)
.await
.unwrap()
.unwrap()
.get_value(0)
.unwrap(),
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
5.into()
);
assert!(rows_next!(res).await.unwrap().is_none());
assert!(res.next().await.unwrap().is_none());
}

View File

@@ -15,7 +15,6 @@ use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
use turso::futures_util::TryStreamExt;
use turso::Builder;
pub struct Plan {
@@ -523,7 +522,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
const INTEGRITY_CHECK_INTERVAL: usize = 100;
if query_index % INTEGRITY_CHECK_INTERVAL == 0 {
let mut res = conn.query("PRAGMA integrity_check", ()).await.unwrap();
if let Some(row) = res.try_next().await? {
if let Some(row) = res.next().await? {
let value = row.get_value(0).unwrap();
if value != "ok".into() {
panic!("integrity check failed: {:?}", value);