Merge branch 'main' of github.com:danawanb/turso

This commit is contained in:
danawan
2025-08-20 07:38:17 +07:00
46 changed files with 981 additions and 501 deletions

52
Cargo.lock generated
View File

@@ -579,7 +579,7 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "core_tester"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"anyhow",
"assert_cmd",
@@ -1934,14 +1934,14 @@ dependencies = [
[[package]]
name = "limbo-go"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"turso_core",
]
[[package]]
name = "limbo_completion"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"mimalloc",
"turso_ext",
@@ -1949,7 +1949,7 @@ dependencies = [
[[package]]
name = "limbo_crypto"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"blake3",
"data-encoding",
@@ -1962,7 +1962,7 @@ dependencies = [
[[package]]
name = "limbo_csv"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"csv",
"mimalloc",
@@ -1972,7 +1972,7 @@ dependencies = [
[[package]]
name = "limbo_ipaddr"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"ipnetwork",
"mimalloc",
@@ -1981,7 +1981,7 @@ dependencies = [
[[package]]
name = "limbo_percentile"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"mimalloc",
"turso_ext",
@@ -1989,7 +1989,7 @@ dependencies = [
[[package]]
name = "limbo_regexp"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"mimalloc",
"regex",
@@ -1998,7 +1998,7 @@ dependencies = [
[[package]]
name = "limbo_sim"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"anarchist-readable-name-generator-lib",
"anyhow",
@@ -2025,7 +2025,7 @@ dependencies = [
[[package]]
name = "limbo_sqlite_test_ext"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"cc",
]
@@ -2695,7 +2695,7 @@ dependencies = [
[[package]]
name = "py-turso"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"anyhow",
"pyo3",
@@ -3794,7 +3794,7 @@ dependencies = [
[[package]]
name = "turso"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"rand 0.8.5",
"rand_chacha 0.3.1",
@@ -3806,7 +3806,7 @@ dependencies = [
[[package]]
name = "turso-java"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"jni",
"thiserror 2.0.12",
@@ -3815,7 +3815,7 @@ dependencies = [
[[package]]
name = "turso_cli"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"anyhow",
"cfg-if",
@@ -3848,7 +3848,7 @@ dependencies = [
[[package]]
name = "turso_core"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"antithesis_sdk",
"bitflags 2.9.0",
@@ -3903,7 +3903,7 @@ dependencies = [
[[package]]
name = "turso_dart"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"flutter_rust_bridge",
"turso_core",
@@ -3911,7 +3911,7 @@ dependencies = [
[[package]]
name = "turso_ext"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"chrono",
"getrandom 0.3.2",
@@ -3920,7 +3920,7 @@ dependencies = [
[[package]]
name = "turso_ext_tests"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"env_logger 0.11.7",
"lazy_static",
@@ -3931,7 +3931,7 @@ dependencies = [
[[package]]
name = "turso_macros"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"proc-macro2",
"quote",
@@ -3940,7 +3940,7 @@ dependencies = [
[[package]]
name = "turso_node"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"napi",
"napi-build",
@@ -3951,7 +3951,7 @@ dependencies = [
[[package]]
name = "turso_parser"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"bitflags 2.9.0",
"criterion",
@@ -3966,7 +3966,7 @@ dependencies = [
[[package]]
name = "turso_sqlite3"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"env_logger 0.11.7",
"libc",
@@ -3979,7 +3979,7 @@ dependencies = [
[[package]]
name = "turso_sqlite3_parser"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"bitflags 2.9.0",
"cc",
@@ -3997,7 +3997,7 @@ dependencies = [
[[package]]
name = "turso_stress"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"anarchist-readable-name-generator-lib",
"antithesis_sdk",
@@ -4013,7 +4013,7 @@ dependencies = [
[[package]]
name = "turso_sync_engine"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"base64",
"bytes",
@@ -4037,7 +4037,7 @@ dependencies = [
[[package]]
name = "turso_sync_js"
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
dependencies = [
"genawaiter",
"http",

View File

@@ -33,28 +33,28 @@ members = [
exclude = ["perf/latency/limbo"]
[workspace.package]
version = "0.1.4-pre.9"
version = "0.1.4-pre.10"
authors = ["the Limbo authors"]
edition = "2021"
license = "MIT"
repository = "https://github.com/tursodatabase/turso"
[workspace.dependencies]
turso = { path = "bindings/rust", version = "0.1.4-pre.9" }
turso_node = { path = "bindings/javascript", version = "0.1.4-pre.9" }
limbo_completion = { path = "extensions/completion", version = "0.1.4-pre.9" }
turso_core = { path = "core", version = "0.1.4-pre.9" }
turso_sync_engine = { path = "sync/engine", version = "0.1.4-pre.9" }
limbo_crypto = { path = "extensions/crypto", version = "0.1.4-pre.9" }
limbo_csv = { path = "extensions/csv", version = "0.1.4-pre.9" }
turso_ext = { path = "extensions/core", version = "0.1.4-pre.9" }
turso_ext_tests = { path = "extensions/tests", version = "0.1.4-pre.9" }
limbo_ipaddr = { path = "extensions/ipaddr", version = "0.1.4-pre.9" }
turso_macros = { path = "macros", version = "0.1.4-pre.9" }
limbo_percentile = { path = "extensions/percentile", version = "0.1.4-pre.9" }
limbo_regexp = { path = "extensions/regexp", version = "0.1.4-pre.9" }
turso_sqlite3_parser = { path = "vendored/sqlite3-parser", version = "0.1.4-pre.9" }
limbo_uuid = { path = "extensions/uuid", version = "0.1.4-pre.9" }
turso = { path = "bindings/rust", version = "0.1.4-pre.10" }
turso_node = { path = "bindings/javascript", version = "0.1.4-pre.10" }
limbo_completion = { path = "extensions/completion", version = "0.1.4-pre.10" }
turso_core = { path = "core", version = "0.1.4-pre.10" }
turso_sync_engine = { path = "sync/engine", version = "0.1.4-pre.10" }
limbo_crypto = { path = "extensions/crypto", version = "0.1.4-pre.10" }
limbo_csv = { path = "extensions/csv", version = "0.1.4-pre.10" }
turso_ext = { path = "extensions/core", version = "0.1.4-pre.10" }
turso_ext_tests = { path = "extensions/tests", version = "0.1.4-pre.10" }
limbo_ipaddr = { path = "extensions/ipaddr", version = "0.1.4-pre.10" }
turso_macros = { path = "macros", version = "0.1.4-pre.10" }
limbo_percentile = { path = "extensions/percentile", version = "0.1.4-pre.10" }
limbo_regexp = { path = "extensions/regexp", version = "0.1.4-pre.10" }
turso_sqlite3_parser = { path = "vendored/sqlite3-parser", version = "0.1.4-pre.10" }
limbo_uuid = { path = "extensions/uuid", version = "0.1.4-pre.10" }
strum = { version = "0.26", features = ["derive"] }
strum_macros = "0.26"
serde = "1.0"

View File

@@ -330,7 +330,7 @@ Once configured, you can ask Claude Code to:
- "What's the schema for the users table?"
- "Find all posts with more than 100 upvotes"
- "Insert a new user with name 'Alice' and email 'alice@example.com'"
</details>
## Contributing

View File

@@ -38,6 +38,7 @@ class Database {
db: NativeDB;
memory: boolean;
open: boolean;
private _inTransaction: boolean = false;
/**
* Creates a new database connection. If the database file pointed to by `path` does not exists, it will be created.
@@ -61,9 +62,7 @@ class Database {
Object.defineProperties(this, {
inTransaction: {
get() {
throw new Error("not implemented");
},
get: () => this._inTransaction,
},
name: {
get() {
@@ -117,12 +116,15 @@ class Database {
const wrapTxn = (mode) => {
return (...bindParameters) => {
db.exec("BEGIN " + mode);
db._inTransaction = true;
try {
const result = fn(...bindParameters);
db.exec("COMMIT");
db._inTransaction = false;
return result;
} catch (err) {
db.exec("ROLLBACK");
db._inTransaction = false;
throw err;
}
};
@@ -210,6 +212,15 @@ class Database {
throw new Error("not implemented");
}
/**
* Sets the default safe integers mode for all statements from this database.
*
* @param {boolean} [toggle] - Whether to use safe integers by default.
*/
defaultSafeIntegers(toggle) {
this.db.defaultSafeIntegers(toggle);
}
/**
* Closes the database connection.
*/
@@ -250,6 +261,25 @@ class Statement {
return this;
}
/**
* Sets safe integers mode for this statement.
*
* @param {boolean} [toggle] - Whether to use safe integers.
*/
safeIntegers(toggle) {
this.stmt.safeIntegers(toggle);
return this;
}
/**
* Get column information for the statement.
*
* @returns {Array} An array of column objects with name, column, table, database, and type properties.
*/
columns() {
return this.stmt.columns();
}
get source() {
throw new Error("not implemented");
}
@@ -370,12 +400,6 @@ class Statement {
throw new Error("not implemented");
}
/**
* Returns the columns in the result set returned by this prepared statement.
*/
columns() {
throw new Error("not implemented");
}
/**
* Binds the given parameters to the statement _permanently_

View File

@@ -67,6 +67,14 @@ export declare class Database {
* `Ok(())` if the database is closed successfully.
*/
close(): void
/**
* Sets the default safe integers mode for all statements from this database.
*
* # Arguments
*
* * `toggle` - Whether to use safe integers by default.
*/
defaultSafeIntegers(toggle?: boolean | undefined | null): void
/** Runs the I/O loop synchronously. */
ioLoopSync(): void
/** Runs the I/O loop asynchronously, returning a Promise. */
@@ -107,6 +115,16 @@ export declare class Statement {
raw(raw?: boolean | undefined | null): void
/** Sets the presentation mode to pluck. */
pluck(pluck?: boolean | undefined | null): void
/**
* Sets safe integers mode for this statement.
*
* # Arguments
*
* * `toggle` - Whether to use safe integers.
*/
safeIntegers(toggle?: boolean | undefined | null): void
/** Get column information for the statement */
columns(): unknown[]
/** Finalizes the statement. */
finalize(): void
}

View File

@@ -1,12 +1,12 @@
{
"name": "@tursodatabase/database",
"version": "0.1.4-pre.9",
"version": "0.1.4-pre.10",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@tursodatabase/database",
"version": "0.1.4-pre.9",
"version": "0.1.4-pre.10",
"license": "MIT",
"devDependencies": {
"@napi-rs/cli": "^3.0.4",

View File

@@ -1,6 +1,6 @@
{
"name": "@tursodatabase/database",
"version": "0.1.4-pre.9",
"version": "0.1.4-pre.10",
"repository": {
"type": "git",
"url": "https://github.com/tursodatabase/turso"
@@ -11,11 +11,12 @@
"type": "module",
"exports": {
".": "./dist/promise.js",
"./sync": "./dist/sync.js"
"./compat": "./dist/compat.js"
},
"files": [
"browser.js",
"index.js",
"index.d.ts",
"dist/**"
],
"types": "index.d.ts",

View File

@@ -38,6 +38,7 @@ class Database {
db: NativeDB;
memory: boolean;
open: boolean;
private _inTransaction: boolean = false;
/**
* Creates a new database connection. If the database file pointed to by `path` does not exists, it will be created.
*
@@ -65,9 +66,7 @@ class Database {
this.memory = db.memory;
Object.defineProperties(this, {
inTransaction: {
get() {
throw new Error("not implemented");
},
get: () => this._inTransaction,
},
name: {
get() {
@@ -121,12 +120,15 @@ class Database {
const wrapTxn = (mode) => {
return (...bindParameters) => {
db.exec("BEGIN " + mode);
db._inTransaction = true;
try {
const result = fn(...bindParameters);
db.exec("COMMIT");
db._inTransaction = false;
return result;
} catch (err) {
db.exec("ROLLBACK");
db._inTransaction = false;
throw err;
}
};
@@ -214,6 +216,15 @@ class Database {
throw new Error("not implemented");
}
/**
* Sets the default safe integers mode for all statements from this database.
*
* @param {boolean} [toggle] - Whether to use safe integers by default.
*/
defaultSafeIntegers(toggle) {
this.db.defaultSafeIntegers(toggle);
}
/**
* Closes the database connection.
*/
@@ -253,6 +264,25 @@ class Statement {
return this;
}
/**
* Sets safe integers mode for this statement.
*
* @param {boolean} [toggle] - Whether to use safe integers.
*/
safeIntegers(toggle) {
this.stmt.safeIntegers(toggle);
return this;
}
/**
* Get column information for the statement.
*
* @returns {Array} An array of column objects with name, column, table, database, and type properties.
*/
columns() {
return this.stmt.columns();
}
get source() {
throw new Error("not implemented");
}
@@ -376,12 +406,6 @@ class Statement {
throw new Error("not implemented");
}
/**
* Returns the columns in the result set returned by this prepared statement.
*/
columns() {
throw new Error("not implemented");
}
/**
* Binds the given parameters to the statement _permanently_

View File

@@ -41,6 +41,7 @@ pub struct Database {
conn: Arc<turso_core::Connection>,
is_memory: bool,
is_open: Cell<bool>,
default_safe_integers: Cell<bool>,
}
#[napi]
@@ -92,6 +93,7 @@ impl Database {
conn,
is_memory,
is_open: Cell::new(true),
default_safe_integers: Cell::new(false),
}
}
@@ -147,6 +149,7 @@ impl Database {
stmt: RefCell::new(Some(stmt)),
column_names,
mode: RefCell::new(PresentationMode::Expanded),
safe_integers: Cell::new(self.default_safe_integers.get()),
})
}
@@ -192,6 +195,16 @@ impl Database {
Ok(())
}
/// Sets the default safe integers mode for all statements from this database.
///
/// # Arguments
///
/// * `toggle` - Whether to use safe integers by default.
#[napi(js_name = "defaultSafeIntegers")]
pub fn default_safe_integers(&self, toggle: Option<bool>) {
self.default_safe_integers.set(toggle.unwrap_or(true));
}
/// Runs the I/O loop synchronously.
#[napi]
pub fn io_loop_sync(&self) -> Result<()> {
@@ -215,6 +228,7 @@ pub struct Statement {
stmt: RefCell<Option<turso_core::Statement>>,
column_names: Vec<std::ffi::CString>,
mode: RefCell<PresentationMode>,
safe_integers: Cell<bool>,
}
#[napi]
@@ -281,12 +295,22 @@ impl Statement {
ValueType::Null => turso_core::Value::Null,
ValueType::Number => {
let n: f64 = unsafe { value.cast()? };
if n.fract() == 0.0 {
if n.fract() == 0.0 && n >= i64::MIN as f64 && n <= i64::MAX as f64 {
turso_core::Value::Integer(n as i64)
} else {
turso_core::Value::Float(n)
}
}
ValueType::BigInt => {
let bigint_str = value.coerce_to_string()?.into_utf8()?.as_str()?.to_owned();
let bigint_value = bigint_str.parse::<i64>().map_err(|e| {
Error::new(
Status::NumberExpected,
format!("Failed to parse BigInt: {e}"),
)
})?;
turso_core::Value::Integer(bigint_value)
}
ValueType::String => {
let s = value.coerce_to_string()?.into_utf8()?;
turso_core::Value::Text(s.as_str()?.to_owned().into())
@@ -355,11 +379,12 @@ impl Statement {
.ok_or_else(|| Error::new(Status::GenericFailure, "No row data available"))?;
let mode = self.mode.borrow();
let safe_integers = self.safe_integers.get();
let row_value = match *mode {
PresentationMode::Raw => {
let mut raw_array = env.create_array(row_data.len() as u32)?;
for (idx, value) in row_data.get_values().enumerate() {
let js_value = to_js_value(env, value)?;
let js_value = to_js_value(env, value, safe_integers)?;
raw_array.set(idx as u32, js_value)?;
}
raw_array.coerce_to_object()?.to_unknown()
@@ -374,7 +399,7 @@ impl Statement {
napi::Status::GenericFailure,
"Pluck mode requires at least one column in the result",
))?;
to_js_value(env, value)?
to_js_value(env, value, safe_integers)?
}
PresentationMode::Expanded => {
let row = Object::new(env)?;
@@ -383,7 +408,7 @@ impl Statement {
for idx in 0..row_data.len() {
let value = row_data.get_value(idx);
let column_name = &self.column_names[idx];
let js_value = to_js_value(env, value)?;
let js_value = to_js_value(env, value, safe_integers)?;
unsafe {
napi::sys::napi_set_named_property(
raw_env,
@@ -418,6 +443,52 @@ impl Statement {
});
}
/// Sets safe integers mode for this statement.
///
/// # Arguments
///
/// * `toggle` - Whether to use safe integers.
#[napi(js_name = "safeIntegers")]
pub fn safe_integers(&self, toggle: Option<bool>) {
self.safe_integers.set(toggle.unwrap_or(true));
}
/// Get column information for the statement
#[napi]
pub fn columns<'env>(&self, env: &'env Env) -> Result<Array<'env>> {
let stmt_ref = self.stmt.borrow();
let stmt = stmt_ref
.as_ref()
.ok_or_else(|| Error::new(Status::GenericFailure, "Statement has been finalized"))?;
let column_count = stmt.num_columns();
let mut js_array = env.create_array(column_count as u32)?;
for i in 0..column_count {
let mut js_obj = Object::new(env)?;
let column_name = stmt.get_column_name(i);
let column_type = stmt.get_column_type(i);
// Set the name property
js_obj.set("name", column_name.as_ref())?;
// Set type property if available
match column_type {
Some(type_str) => js_obj.set("type", type_str.as_str())?,
None => js_obj.set("type", ToNapiValue::into_unknown(Null, env)?)?,
}
// For now, set other properties to null since turso_core doesn't provide this metadata
js_obj.set("column", ToNapiValue::into_unknown(Null, env)?)?;
js_obj.set("table", ToNapiValue::into_unknown(Null, env)?)?;
js_obj.set("database", ToNapiValue::into_unknown(Null, env)?)?;
js_array.set(i as u32, js_obj)?;
}
Ok(js_array)
}
/// Finalizes the statement.
#[napi]
pub fn finalize(&self) -> Result<()> {
@@ -449,11 +520,22 @@ impl Task for IoLoopTask {
}
/// Convert a Turso value to a JavaScript value.
fn to_js_value<'a>(env: &'a napi::Env, value: &turso_core::Value) -> napi::Result<Unknown<'a>> {
fn to_js_value<'a>(
env: &'a napi::Env,
value: &turso_core::Value,
safe_integers: bool,
) -> napi::Result<Unknown<'a>> {
match value {
turso_core::Value::Null => ToNapiValue::into_unknown(Null, env),
turso_core::Value::Integer(i) => ToNapiValue::into_unknown(i, env),
turso_core::Value::Float(f) => ToNapiValue::into_unknown(f, env),
turso_core::Value::Integer(i) => {
if safe_integers {
let bigint = BigInt::from(*i);
ToNapiValue::into_unknown(bigint, env)
} else {
ToNapiValue::into_unknown(*i as f64, env)
}
}
turso_core::Value::Float(f) => ToNapiValue::into_unknown(*f, env),
turso_core::Value::Text(s) => ToNapiValue::into_unknown(s.as_str(), env),
turso_core::Value::Blob(b) => ToNapiValue::into_unknown(b, env),
}

View File

@@ -318,6 +318,12 @@ impl Connection {
Ok(())
}
/// Returns the rowid of the last row inserted.
pub fn last_insert_rowid(&self) -> i64 {
let conn = self.inner.lock().unwrap();
conn.last_insert_rowid()
}
/// Flush dirty pages to disk.
/// This will write the dirty pages to the WAL.
pub fn cacheflush(&self) -> Result<()> {

View File

@@ -12,27 +12,32 @@ async fn test_rows_next() {
conn.execute("INSERT INTO test (x) VALUES (1)", ())
.await
.unwrap();
assert_eq!(conn.last_insert_rowid(), 1);
conn.execute("INSERT INTO test (x) VALUES (2)", ())
.await
.unwrap();
assert_eq!(conn.last_insert_rowid(), 2);
conn.execute(
"INSERT INTO test (x) VALUES (:x)",
vec![(":x".to_string(), Value::Integer(3))],
)
.await
.unwrap();
assert_eq!(conn.last_insert_rowid(), 3);
conn.execute(
"INSERT INTO test (x) VALUES (@x)",
vec![("@x".to_string(), Value::Integer(4))],
)
.await
.unwrap();
assert_eq!(conn.last_insert_rowid(), 4);
conn.execute(
"INSERT INTO test (x) VALUES ($x)",
vec![("$x".to_string(), Value::Integer(5))],
)
.await
.unwrap();
assert_eq!(conn.last_insert_rowid(), 5);
let mut res = conn.query("SELECT * FROM test", ()).await.unwrap();
assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(),

View File

@@ -1,6 +1,6 @@
use thiserror::Error;
#[derive(Debug, Error, miette::Diagnostic)]
#[derive(Debug, Clone, Error, miette::Diagnostic)]
pub enum LimboError {
#[error("Corrupt database: {0}")]
Corrupt(String),
@@ -23,16 +23,10 @@ pub enum LimboError {
EnvVarError(#[from] std::env::VarError),
#[error("Transaction error: {0}")]
TxError(String),
#[error("I/O error: {0}")]
IOError(#[from] std::io::Error),
#[cfg(all(target_os = "linux", feature = "io_uring"))]
#[error("I/O error: {0}")]
UringIOError(String),
#[error(transparent)]
CompletionError(#[from] CompletionError),
#[error("Locking error: {0}")]
LockingError(String),
#[cfg(target_family = "unix")]
#[error("I/O error: {0}")]
RustixIOError(#[from] rustix::io::Errno),
#[error("Parse error: {0}")]
ParseIntError(#[from] std::num::ParseIntError),
#[error("Parse error: {0}")]
@@ -85,6 +79,49 @@ pub enum LimboError {
PlanningError(String),
}
// We only propagate the error kind so we can avoid string allocation in hot path and copying/cloning enums is cheaper
impl From<std::io::Error> for LimboError {
fn from(value: std::io::Error) -> Self {
Self::CompletionError(CompletionError::IOError(value.kind()))
}
}
#[cfg(target_family = "unix")]
impl From<rustix::io::Errno> for LimboError {
fn from(value: rustix::io::Errno) -> Self {
CompletionError::from(value).into()
}
}
#[cfg(all(target_os = "linux", feature = "io_uring"))]
impl From<&'static str> for LimboError {
fn from(value: &'static str) -> Self {
CompletionError::UringIOError(value).into()
}
}
// We only propagate the error kind
impl From<std::io::Error> for CompletionError {
fn from(value: std::io::Error) -> Self {
CompletionError::IOError(value.kind())
}
}
#[derive(Debug, Copy, Clone, Error)]
pub enum CompletionError {
#[error("I/O error: {0}")]
IOError(std::io::ErrorKind),
#[cfg(target_family = "unix")]
#[error("I/O error: {0}")]
RustixIOError(#[from] rustix::io::Errno),
#[cfg(all(target_os = "linux", feature = "io_uring"))]
#[error("I/O error: {0}")]
// TODO: if needed create an enum for IO Uring errors so that we don't have to pass strings around
UringIOError(&'static str),
#[error("Completion was aborted")]
Aborted,
}
#[macro_export]
macro_rules! bail_parse_error {
($($arg:tt)*) => {

View File

@@ -1,9 +1,31 @@
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Instant {
pub secs: i64,
pub micros: u32,
}
impl Instant {
pub fn to_system_time(self) -> SystemTime {
if self.secs >= 0 {
UNIX_EPOCH + Duration::new(self.secs as u64, self.micros * 1000)
} else {
let positive_secs = (-self.secs) as u64;
if self.micros > 0 {
// We have partial seconds that reduce the negative offset
// Need to borrow 1 second and subtract the remainder
let nanos_to_subtract = (1_000_000 - self.micros) * 1000;
UNIX_EPOCH - Duration::new(positive_secs - 1, nanos_to_subtract)
} else {
// Exactly N seconds before epoch
UNIX_EPOCH - Duration::new(positive_secs, 0)
}
}
}
}
impl<T: chrono::TimeZone> From<chrono::DateTime<T>> for Instant {
fn from(value: chrono::DateTime<T>) -> Self {
Instant {

View File

@@ -35,26 +35,9 @@ impl IO for GenericIO {
}))
}
fn wait_for_completion(&self, c: Completion) -> Result<()> {
while !c.is_completed() {
self.run_once()?;
}
Ok(())
}
fn run_once(&self) -> Result<()> {
Ok(())
}
fn generate_random_number(&self) -> i64 {
let mut buf = [0u8; 8];
getrandom::getrandom(&mut buf).unwrap();
i64::from_ne_bytes(buf)
}
fn get_memory_io(&self) -> Arc<MemoryIO> {
Arc::new(MemoryIO::new())
}
}
impl Clock for GenericIO {
@@ -110,15 +93,14 @@ impl File for GenericFile {
fn sync(&self, c: Completion) -> Result<Completion> {
let mut file = self.file.borrow_mut();
file.sync_all().map_err(|err| LimboError::IOError(err))?;
file.sync_all()?;
c.complete(0);
Ok(c)
}
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
let mut file = self.file.borrow_mut();
file.set_len(len as u64)
.map_err(|err| LimboError::IOError(err))?;
file.set_len(len as u64)?;
c.complete(0);
Ok(c)
}

View File

@@ -3,7 +3,7 @@
use super::{common, Completion, CompletionInner, File, OpenFlags, IO};
use crate::io::clock::{Clock, Instant};
use crate::storage::wal::CKPT_BATCH_PAGES;
use crate::{turso_assert, LimboError, MemoryIO, Result};
use crate::{turso_assert, LimboError, Result};
use rustix::fs::{self, FlockOperation, OFlags};
use std::ptr::NonNull;
use std::{
@@ -260,9 +260,10 @@ impl InnerUringIO {
.register_files_update(slot, &[fd.as_raw_fd()])?;
return Ok(slot);
}
Err(LimboError::UringIOError(
"unable to register file, no free slots available".to_string(),
))
Err(crate::error::CompletionError::UringIOError(
"unable to register file, no free slots available",
)
.into())
}
fn unregister_file(&mut self, id: u32) -> Result<()> {
self.ring
@@ -314,7 +315,7 @@ impl WrappedIOUring {
if available_space == 0 {
// No space available, always return error if we dont flush all overflow entries
// to prevent out of order I/O operations
return Err(LimboError::UringIOError("squeue full".into()));
return Err(crate::error::CompletionError::UringIOError("squeue full").into());
}
// Push as many as we can
let to_push = std::cmp::min(available_space, self.overflow.len());
@@ -327,7 +328,9 @@ impl WrappedIOUring {
self.overflow.push_front(entry);
// No space available, always return error if we dont flush all overflow entries
// to prevent out of order I/O operations
return Err(LimboError::UringIOError("squeue full".into()));
return Err(
crate::error::CompletionError::UringIOError("squeue full").into()
);
}
self.pending_ops += 1;
}
@@ -499,13 +502,6 @@ impl IO for UringIO {
Ok(uring_file)
}
fn wait_for_completion(&self, c: Completion) -> Result<()> {
while !c.is_completed() {
self.run_once()?;
}
Ok(())
}
fn run_once(&self) -> Result<()> {
trace!("run_once()");
let mut inner = self.inner.borrow_mut();
@@ -535,27 +531,15 @@ impl IO for UringIO {
}
}
fn generate_random_number(&self) -> i64 {
let mut buf = [0u8; 8];
getrandom::getrandom(&mut buf).unwrap();
i64::from_ne_bytes(buf)
}
fn get_memory_io(&self) -> Arc<MemoryIO> {
Arc::new(MemoryIO::new())
}
fn register_fixed_buffer(&self, ptr: std::ptr::NonNull<u8>, len: usize) -> Result<u32> {
turso_assert!(
len % 512 == 0,
"fixed buffer length must be logical block aligned"
);
let mut inner = self.inner.borrow_mut();
let slot = inner
.free_arenas
.iter()
.position(|e| e.is_none())
.ok_or_else(|| LimboError::UringIOError("no free fixed buffer slots".into()))?;
let slot = inner.free_arenas.iter().position(|e| e.is_none()).ok_or(
crate::error::CompletionError::UringIOError("no free fixed buffer slots"),
)?;
unsafe {
inner.ring.ring.submitter().register_buffers_update(
slot as u32,

View File

@@ -1,5 +1,5 @@
use super::{Buffer, Clock, Completion, File, OpenFlags, IO};
use crate::{LimboError, Result};
use crate::Result;
use crate::io::clock::Instant;
use std::{
@@ -48,10 +48,9 @@ impl IO for MemoryIO {
fn open_file(&self, path: &str, flags: OpenFlags, _direct: bool) -> Result<Arc<dyn File>> {
let mut files = self.files.lock().unwrap();
if !files.contains_key(path) && !flags.contains(OpenFlags::Create) {
return Err(LimboError::IOError(std::io::Error::new(
std::io::ErrorKind::NotFound,
"file not found",
)));
return Err(
crate::error::CompletionError::IOError(std::io::ErrorKind::NotFound).into(),
);
}
if !files.contains_key(path) {
files.insert(
@@ -70,23 +69,6 @@ impl IO for MemoryIO {
// nop
Ok(())
}
fn wait_for_completion(&self, c: Completion) -> Result<()> {
while !c.is_completed() {
self.run_once()?;
}
Ok(())
}
fn generate_random_number(&self) -> i64 {
let mut buf = [0u8; 8];
getrandom::getrandom(&mut buf).unwrap();
i64::from_ne_bytes(buf)
}
fn get_memory_io(&self) -> Arc<MemoryIO> {
Arc::new(MemoryIO::new())
}
}
pub struct MemoryFile {

View File

@@ -1,13 +1,13 @@
use crate::storage::buffer_pool::ArenaBuffer;
use crate::storage::sqlite3_ondisk::WAL_FRAME_HEADER_SIZE;
use crate::{BufferPool, Result};
use crate::{BufferPool, CompletionError, Result};
use bitflags::bitflags;
use cfg_block::cfg_block;
use std::cell::RefCell;
use std::fmt;
use std::ptr::NonNull;
use std::sync::Arc;
use std::{cell::Cell, fmt::Debug, pin::Pin};
use std::sync::{Arc, OnceLock};
use std::{fmt::Debug, pin::Pin};
pub trait File: Send + Sync {
fn lock_file(&self, exclusive: bool) -> Result<()>;
@@ -37,19 +37,23 @@ pub trait File: Send + Sync {
let total_written = total_written.clone();
let _cloned = buf.clone();
Completion::new_write(move |n| {
// reference buffer in callback to ensure alive for async io
let _buf = _cloned.clone();
// accumulate bytes actually reported by the backend
total_written.fetch_add(n as usize, Ordering::Relaxed);
if outstanding.fetch_sub(1, Ordering::AcqRel) == 1 {
// last one finished
c_main.complete(total_written.load(Ordering::Acquire) as i32);
if let Ok(n) = n {
// reference buffer in callback to ensure alive for async io
let _buf = _cloned.clone();
// accumulate bytes actually reported by the backend
total_written.fetch_add(n as usize, Ordering::Relaxed);
if outstanding.fetch_sub(1, Ordering::AcqRel) == 1 {
// last one finished
c_main.complete(total_written.load(Ordering::Acquire) as i32);
}
}
})
};
if let Err(e) = self.pwrite(pos, buf.clone(), child_c) {
// best-effort: mark as done so caller won't wait forever
c.complete(-1);
// best-effort: mark as abort so caller won't wait forever
// TODO: when we have `pwrite` and other I/O methods return CompletionError
// instead of LimboError, store the error inside
c.abort();
return Err(e);
}
pos += len;
@@ -82,11 +86,25 @@ pub trait IO: Clock + Send + Sync {
fn run_once(&self) -> Result<()>;
fn wait_for_completion(&self, c: Completion) -> Result<()>;
fn wait_for_completion(&self, c: Completion) -> Result<()> {
while !c.finished() {
self.run_once()?
}
if let Some(Some(err)) = c.inner.result.get().copied() {
return Err(err.into());
}
Ok(())
}
fn generate_random_number(&self) -> i64;
fn generate_random_number(&self) -> i64 {
let mut buf = [0u8; 8];
getrandom::getrandom(&mut buf).unwrap();
i64::from_ne_bytes(buf)
}
fn get_memory_io(&self) -> Arc<MemoryIO>;
fn get_memory_io(&self) -> Arc<MemoryIO> {
Arc::new(MemoryIO::new())
}
fn register_fixed_buffer(&self, _ptr: NonNull<u8>, _len: usize) -> Result<u32> {
Err(crate::LimboError::InternalError(
@@ -95,10 +113,10 @@ pub trait IO: Clock + Send + Sync {
}
}
pub type ReadComplete = dyn Fn(Arc<Buffer>, i32);
pub type WriteComplete = dyn Fn(i32);
pub type SyncComplete = dyn Fn(i32);
pub type TruncateComplete = dyn Fn(i32);
pub type ReadComplete = dyn Fn(Result<(Arc<Buffer>, i32), CompletionError>);
pub type WriteComplete = dyn Fn(Result<i32, CompletionError>);
pub type SyncComplete = dyn Fn(Result<i32, CompletionError>);
pub type TruncateComplete = dyn Fn(Result<i32, CompletionError>);
#[must_use]
#[derive(Debug, Clone)]
@@ -108,8 +126,10 @@ pub struct Completion {
#[derive(Debug)]
struct CompletionInner {
pub completion_type: CompletionType,
is_completed: Cell<bool>,
completion_type: CompletionType,
/// None means we completed successfully
// Thread safe with OnceLock
result: std::sync::OnceLock<Option<CompletionError>>,
}
impl Debug for CompletionType {
@@ -130,24 +150,19 @@ pub enum CompletionType {
Truncate(TruncateCompletion),
}
pub struct ReadCompletion {
pub buf: Arc<Buffer>,
pub complete: Box<ReadComplete>,
}
impl Completion {
pub fn new(completion_type: CompletionType) -> Self {
Self {
inner: Arc::new(CompletionInner {
completion_type,
is_completed: Cell::new(false),
result: OnceLock::new(),
}),
}
}
pub fn new_write<F>(complete: F) -> Self
where
F: Fn(i32) + 'static,
F: Fn(Result<i32, CompletionError>) + 'static,
{
Self::new(CompletionType::Write(WriteCompletion::new(Box::new(
complete,
@@ -156,7 +171,7 @@ impl Completion {
pub fn new_read<F>(buf: Arc<Buffer>, complete: F) -> Self
where
F: Fn(Arc<Buffer>, i32) + 'static,
F: Fn(Result<(Arc<Buffer>, i32), CompletionError>) + 'static,
{
Self::new(CompletionType::Read(ReadCompletion::new(
buf,
@@ -165,7 +180,7 @@ impl Completion {
}
pub fn new_sync<F>(complete: F) -> Self
where
F: Fn(i32) + 'static,
F: Fn(Result<i32, CompletionError>) + 'static,
{
Self::new(CompletionType::Sync(SyncCompletion::new(Box::new(
complete,
@@ -174,7 +189,7 @@ impl Completion {
pub fn new_trunc<F>(complete: F) -> Self
where
F: Fn(i32) + 'static,
F: Fn(Result<i32, CompletionError>) + 'static,
{
Self::new(CompletionType::Truncate(TruncateCompletion::new(Box::new(
complete,
@@ -189,21 +204,46 @@ impl Completion {
}
pub fn is_completed(&self) -> bool {
self.inner.is_completed.get()
self.inner.result.get().is_some_and(|val| val.is_none())
}
pub fn has_error(&self) -> bool {
self.inner.result.get().is_some_and(|val| val.is_some())
}
/// Checks if the Completion completed or errored
pub fn finished(&self) -> bool {
self.inner.result.get().is_some()
}
pub fn complete(&self, result: i32) {
if !self.inner.is_completed.get() {
if self.inner.result.set(None).is_ok() {
let result = Ok(result);
match &self.inner.completion_type {
CompletionType::Read(r) => r.complete(result),
CompletionType::Write(w) => w.complete(result),
CompletionType::Sync(s) => s.complete(result), // fix
CompletionType::Truncate(t) => t.complete(result),
CompletionType::Read(r) => r.callback(result),
CompletionType::Write(w) => w.callback(result),
CompletionType::Sync(s) => s.callback(result), // fix
CompletionType::Truncate(t) => t.callback(result),
};
self.inner.is_completed.set(true);
}
}
pub fn error(&self, err: CompletionError) {
if self.inner.result.set(Some(err)).is_ok() {
let result = Err(err);
match &self.inner.completion_type {
CompletionType::Read(r) => r.callback(result),
CompletionType::Write(w) => w.callback(result),
CompletionType::Sync(s) => s.callback(result), // fix
CompletionType::Truncate(t) => t.callback(result),
};
}
}
pub fn abort(&self) {
self.error(CompletionError::Aborted);
}
/// only call this method if you are sure that the completion is
/// a ReadCompletion, panics otherwise
pub fn as_read(&self) -> &ReadCompletion {
@@ -223,12 +263,9 @@ impl Completion {
}
}
pub struct WriteCompletion {
pub complete: Box<WriteComplete>,
}
pub struct SyncCompletion {
pub complete: Box<SyncComplete>,
pub struct ReadCompletion {
pub buf: Arc<Buffer>,
pub complete: Box<ReadComplete>,
}
impl ReadCompletion {
@@ -240,27 +277,35 @@ impl ReadCompletion {
&self.buf
}
pub fn complete(&self, bytes_read: i32) {
(self.complete)(self.buf.clone(), bytes_read);
pub fn callback(&self, bytes_read: Result<i32, CompletionError>) {
(self.complete)(bytes_read.map(|b| (self.buf.clone(), b)));
}
}
pub struct WriteCompletion {
pub complete: Box<WriteComplete>,
}
impl WriteCompletion {
pub fn new(complete: Box<WriteComplete>) -> Self {
Self { complete }
}
pub fn complete(&self, bytes_written: i32) {
pub fn callback(&self, bytes_written: Result<i32, CompletionError>) {
(self.complete)(bytes_written);
}
}
pub struct SyncCompletion {
pub complete: Box<SyncComplete>,
}
impl SyncCompletion {
pub fn new(complete: Box<SyncComplete>) -> Self {
Self { complete }
}
pub fn complete(&self, res: i32) {
pub fn callback(&self, res: Result<i32, CompletionError>) {
(self.complete)(res);
}
}
@@ -274,7 +319,7 @@ impl TruncateCompletion {
Self { complete }
}
pub fn complete(&self, res: i32) {
pub fn callback(&self, res: Result<i32, CompletionError>) {
(self.complete)(res);
}
}

View File

@@ -1,4 +1,4 @@
use super::{Completion, File, MemoryIO, OpenFlags, IO};
use super::{Completion, File, OpenFlags, IO};
use crate::error::LimboError;
use crate::io::clock::{Clock, Instant};
use crate::io::common;
@@ -398,23 +398,6 @@ impl IO for UnixIO {
Ok(())
}
fn wait_for_completion(&self, c: Completion) -> Result<()> {
while !c.is_completed() {
self.run_once()?;
}
Ok(())
}
fn generate_random_number(&self) -> i64 {
let mut buf = [0u8; 8];
getrandom::getrandom(&mut buf).unwrap();
i64::from_ne_bytes(buf)
}
fn get_memory_io(&self) -> Arc<MemoryIO> {
Arc::new(MemoryIO::new())
}
}
enum CompletionCallback {

View File

@@ -1,4 +1,4 @@
use super::{Buffer, Completion, File, MemoryIO, OpenFlags, IO};
use super::{Buffer, Completion, File, OpenFlags, IO};
use crate::ext::VfsMod;
use crate::io::clock::{Clock, Instant};
use crate::io::CompletionInner;
@@ -44,11 +44,6 @@ impl IO for VfsMod {
Ok(())
}
fn wait_for_completion(&self, _c: Completion) -> Result<()> {
// for the moment anyway, this is currently a sync api
Ok(())
}
fn generate_random_number(&self) -> i64 {
if self.ctx.is_null() {
return -1;
@@ -56,10 +51,6 @@ impl IO for VfsMod {
let vfs = unsafe { &*self.ctx };
unsafe { (vfs.gen_random_number)() }
}
fn get_memory_io(&self) -> Arc<MemoryIO> {
Arc::new(MemoryIO::new())
}
}
impl VfsMod {

View File

@@ -1,4 +1,3 @@
use super::MemoryIO;
use crate::{Clock, Completion, File, Instant, LimboError, OpenFlags, Result, IO};
use parking_lot::RwLock;
use std::io::{Read, Seek, Write};
@@ -31,30 +30,10 @@ impl IO for WindowsIO {
}))
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn wait_for_completion(&self, c: Completion) -> Result<()> {
while !c.is_completed() {
self.run_once()?;
}
Ok(())
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn run_once(&self) -> Result<()> {
Ok(())
}
#[instrument(skip_all, level = Level::TRACE)]
fn generate_random_number(&self) -> i64 {
let mut buf = [0u8; 8];
getrandom::getrandom(&mut buf).unwrap();
i64::from_ne_bytes(buf)
}
#[instrument(skip_all, level = Level::TRACE)]
fn get_memory_io(&self) -> Arc<MemoryIO> {
Arc::new(MemoryIO::new())
}
}
impl Clock for WindowsIO {
@@ -110,7 +89,7 @@ impl File for WindowsFile {
#[instrument(err, skip_all, level = Level::TRACE)]
fn sync(&self, c: Completion) -> Result<Completion> {
let file = self.file.write();
file.sync_all().map_err(LimboError::IOError)?;
file.sync_all()?;
c.complete(0);
Ok(c)
}
@@ -118,7 +97,7 @@ impl File for WindowsFile {
#[instrument(err, skip_all, level = Level::TRACE)]
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
let file = self.file.write();
file.set_len(len as u64).map_err(LimboError::IOError)?;
file.set_len(len as u64)?;
c.complete(0);
Ok(c)
}

View File

@@ -50,7 +50,7 @@ use crate::util::{OpenMode, OpenOptions};
use crate::vdbe::metrics::ConnectionMetrics;
use crate::vtab::VirtualTable;
use core::str;
pub use error::LimboError;
pub use error::{CompletionError, LimboError};
use fallible_iterator::FallibleIterator;
pub use io::clock::{Clock, Instant};
#[cfg(all(feature = "fs", target_family = "unix"))]
@@ -424,6 +424,7 @@ impl Database {
query_only: Cell::new(false),
view_transaction_states: RefCell::new(HashMap::new()),
metrics: RefCell::new(ConnectionMetrics::new()),
is_nested_stmt: Cell::new(false),
});
let builtin_syms = self.builtin_syms.borrow();
// add built-in extensions symbols to the connection to prevent having to load each time
@@ -447,7 +448,7 @@ impl Database {
"header read must be a multiple of 512 for O_DIRECT"
);
let buf = Arc::new(Buffer::new_temporary(PageSize::MIN as usize));
let c = Completion::new_read(buf.clone(), move |_buf, _| {});
let c = Completion::new_read(buf.clone(), move |_res| {});
let c = self.db_file.read_header(c)?;
self.io.wait_for_completion(c)?;
let page_size = u16::from_be_bytes(buf.as_slice()[16..18].try_into().unwrap());
@@ -848,6 +849,9 @@ pub struct Connection {
view_transaction_states: RefCell<HashMap<String, ViewTransactionState>>,
/// Connection-level metrics aggregation
pub metrics: RefCell<ConnectionMetrics>,
/// Whether the connection is executing a statement initiated by another statement.
/// Generally this is only true for ParseSchema.
is_nested_stmt: Cell<bool>,
}
impl Connection {
@@ -1305,9 +1309,9 @@ impl Connection {
Ok(result) => result,
// on windows, zero read will trigger UnexpectedEof
#[cfg(target_os = "windows")]
Err(LimboError::IOError(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
return Ok(false)
}
Err(LimboError::CompletionError(CompletionError::IOError(
std::io::ErrorKind::UnexpectedEof,
))) => return Ok(false),
Err(err) => return Err(err),
};
@@ -2040,6 +2044,9 @@ impl Statement {
pub fn run_once(&self) -> Result<()> {
let res = self.pager.io.run_once();
if self.program.connection.is_nested_stmt.get() {
return res;
}
if res.is_err() {
let state = self.program.connection.transaction_state.get();
if let TransactionState::Write { .. } = state {
@@ -2069,6 +2076,32 @@ impl Statement {
}
}
pub fn get_column_type(&self, idx: usize) -> Option<String> {
let column = &self.program.result_columns.get(idx).expect("No column");
match &column.expr {
turso_sqlite3_parser::ast::Expr::Column {
table,
column: column_idx,
..
} => {
let table_ref = self
.program
.table_references
.find_table_by_internal_id(*table)?;
let table_column = table_ref.get_column_at(*column_idx)?;
match &table_column.ty {
crate::schema::Type::Integer => Some("INTEGER".to_string()),
crate::schema::Type::Real => Some("REAL".to_string()),
crate::schema::Type::Text => Some("TEXT".to_string()),
crate::schema::Type::Blob => Some("BLOB".to_string()),
crate::schema::Type::Numeric => Some("NUMERIC".to_string()),
crate::schema::Type::Null => None,
}
}
_ => None,
}
}
pub fn parameters(&self) -> &parameters::Parameters {
&self.program.parameters
}

View File

@@ -2547,9 +2547,14 @@ impl BTreeCursor {
// start loading right page first
let mut pgno: u32 = unsafe { right_pointer.cast::<u32>().read().swap_bytes() };
let current_sibling = sibling_pointer;
let mut completions = Vec::with_capacity(current_sibling + 1);
let mut completions: Vec<Completion> = Vec::with_capacity(current_sibling + 1);
for i in (0..=current_sibling).rev() {
let (page, c) = btree_read_page(&self.pager, pgno as usize)?;
let (page, c) =
btree_read_page(&self.pager, pgno as usize).inspect_err(|_| {
for c in completions.iter() {
c.abort();
}
})?;
{
// mark as dirty
let sibling_page = page.get();

View File

@@ -47,7 +47,7 @@ struct HashMapNode {
#[derive(Debug, PartialEq)]
pub enum CacheError {
InternalError(String),
Locked,
Locked { pgno: usize },
Dirty { pgno: usize },
Pinned { pgno: usize },
ActiveRefs,
@@ -189,7 +189,9 @@ impl DumbLruPageCache {
) -> Result<(), CacheError> {
let entry_mut = unsafe { entry.as_mut() };
if entry_mut.page.is_locked() {
return Err(CacheError::Locked);
return Err(CacheError::Locked {
pgno: entry_mut.page.get().id,
});
}
if entry_mut.page.is_dirty() {
return Err(CacheError::Dirty {
@@ -911,7 +913,10 @@ mod tests {
let mut cache = DumbLruPageCache::default();
let (_, mut entry) = insert_and_get_entry(&mut cache, 1);
unsafe { entry.as_mut().page.set_locked() };
assert_eq!(cache.detach(entry, false), Err(CacheError::Locked));
assert_eq!(
cache.detach(entry, false),
Err(CacheError::Locked { pgno: 1 })
);
cache.verify_list_integrity();
}

View File

@@ -128,8 +128,6 @@ pub type PageRef = Arc<Page>;
/// Page is locked for I/O to prevent concurrent access.
const PAGE_LOCKED: usize = 0b010;
/// Page had an I/O error.
const PAGE_ERROR: usize = 0b100;
/// Page is dirty. Flush needed.
const PAGE_DIRTY: usize = 0b1000;
/// Page's contents are loaded in memory.
@@ -168,18 +166,6 @@ impl Page {
self.get().flags.fetch_and(!PAGE_LOCKED, Ordering::Release);
}
pub fn is_error(&self) -> bool {
self.get().flags.load(Ordering::Relaxed) & PAGE_ERROR != 0
}
pub fn set_error(&self) {
self.get().flags.fetch_or(PAGE_ERROR, Ordering::Release);
}
pub fn clear_error(&self) {
self.get().flags.fetch_and(!PAGE_ERROR, Ordering::Release);
}
pub fn is_dirty(&self) -> bool {
self.get().flags.load(Ordering::Acquire) & PAGE_DIRTY != 0
}
@@ -962,6 +948,10 @@ impl Pager {
connection: &Connection,
wal_auto_checkpoint_disabled: bool,
) -> Result<IOResult<PagerCommitResult>> {
if connection.is_nested_stmt.get() {
// Parent statement will handle the transaction rollback.
return Ok(IOResult::Done(PagerCommitResult::Rollback));
}
tracing::trace!("end_tx(rollback={})", rollback);
let Some(wal) = self.wal.as_ref() else {
// TODO: Unsure what the semantics of "end_tx" is for in-memory databases, ephemeral tables and ephemeral indexes.
@@ -1134,7 +1124,7 @@ impl Pager {
.iter()
.copied()
.collect::<Vec<usize>>();
let mut completions = Vec::with_capacity(dirty_pages.len());
let mut completions: Vec<Completion> = Vec::with_capacity(dirty_pages.len());
for page_id in dirty_pages {
let page = {
let mut cache = self.page_cache.write();
@@ -1148,11 +1138,18 @@ impl Pager {
);
page
};
let c = wal.borrow_mut().append_frame(
page.clone(),
self.page_size.get().expect("page size not set"),
0,
)?;
let c = wal
.borrow_mut()
.append_frame(
page.clone(),
self.page_size.get().expect("page size not set"),
0,
)
.inspect_err(|_| {
for c in completions.iter() {
c.abort();
}
})?;
// TODO: invalidade previous completions if this one fails
completions.push(c);
}
@@ -1186,7 +1183,7 @@ impl Pager {
.get()
};
let dirty_len = self.dirty_pages.borrow().iter().len();
let mut completions = Vec::with_capacity(dirty_len);
let mut completions: Vec<Completion> = Vec::with_capacity(dirty_len);
for (curr_page_idx, page_id) in
self.dirty_pages.borrow().iter().copied().enumerate()
{
@@ -1212,11 +1209,18 @@ impl Pager {
};
// TODO: invalidade previous completions on error here
let c = wal.borrow_mut().append_frame(
page.clone(),
self.page_size.get().expect("page size not set"),
db_size,
)?;
let c = wal
.borrow_mut()
.append_frame(
page.clone(),
self.page_size.get().expect("page size not set"),
db_size,
)
.inspect_err(|_| {
for c in completions.iter() {
c.abort();
}
})?;
completions.push(c);
}
self.dirty_pages.borrow_mut().clear();

View File

@@ -62,7 +62,7 @@ use crate::storage::database::DatabaseStorage;
use crate::storage::pager::Pager;
use crate::storage::wal::{PendingFlush, READMARK_NOT_USED};
use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype};
use crate::{bail_corrupt_error, turso_assert, File, Result, WalFileShared};
use crate::{bail_corrupt_error, turso_assert, CompletionError, File, Result, WalFileShared};
use std::cell::{Cell, UnsafeCell};
use std::collections::{BTreeMap, HashMap};
use std::mem::MaybeUninit;
@@ -874,7 +874,10 @@ pub fn begin_read_page(
let buf = buffer_pool.get_page();
#[allow(clippy::arc_with_non_send_sync)]
let buf = Arc::new(buf);
let complete = Box::new(move |mut buf: Arc<Buffer>, bytes_read: i32| {
let complete = Box::new(move |res: Result<(Arc<Buffer>, i32), CompletionError>| {
let Ok((mut buf, bytes_read)) = res else {
return;
};
let buf_len = buf.len();
turso_assert!(
(allow_empty_read && bytes_read == 0) || bytes_read == buf_len as i32,
@@ -884,16 +887,14 @@ pub fn begin_read_page(
if bytes_read == 0 {
buf = Arc::new(Buffer::new_temporary(0));
}
if finish_read_page(page_idx, buf, page.clone()).is_err() {
page.set_error();
}
finish_read_page(page_idx, buf, page.clone());
});
let c = Completion::new_read(buf, complete);
db_file.read_page(page_idx, c)
}
#[instrument(skip_all, level = Level::INFO)]
pub fn finish_read_page(page_idx: usize, buffer_ref: Arc<Buffer>, page: PageRef) -> Result<()> {
pub fn finish_read_page(page_idx: usize, buffer_ref: Arc<Buffer>, page: PageRef) {
tracing::trace!(page_idx);
let pos = if page_idx == DatabaseHeader::PAGE_ID {
DatabaseHeader::SIZE
@@ -906,7 +907,6 @@ pub fn finish_read_page(page_idx: usize, buffer_ref: Arc<Buffer>, page: PageRef)
page.clear_locked();
page.set_loaded();
}
Ok(())
}
#[instrument(skip_all, level = Level::DEBUG)]
@@ -925,7 +925,10 @@ pub fn begin_write_btree_page(pager: &Pager, page: &PageRef) -> Result<Completio
let write_complete = {
let buf_copy = buffer.clone();
Box::new(move |bytes_written: i32| {
Box::new(move |res: Result<i32, CompletionError>| {
let Ok(bytes_written) = res else {
return;
};
tracing::trace!("finish_write_btree_page");
let buf_copy = buf_copy.clone();
let buf_len = buf_copy.len();
@@ -1019,6 +1022,9 @@ pub fn write_pages_vectored(
let total_sz = (page_sz * run_bufs.len()) as i32;
let c = Completion::new_write(move |res| {
let Ok(res) = res else {
return;
};
// writev calls can sometimes return partial writes, but our `pwritev`
// implementation aggregates any partial writes and calls completion with total
turso_assert!(total_sz == res, "failed to write expected size");
@@ -1041,6 +1047,9 @@ pub fn write_pages_vectored(
if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
done.store(true, Ordering::Release);
}
for c in completions {
c.abort();
}
return Err(e);
}
}
@@ -1589,7 +1598,10 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
}));
let wal_file_shared_for_completion = wal_file_shared_ret.clone();
let complete: Box<ReadComplete> = Box::new(move |buf: Arc<Buffer>, bytes_read: i32| {
let complete: Box<ReadComplete> = Box::new(move |res: Result<(Arc<Buffer>, i32), _>| {
let Ok((buf, bytes_read)) = res else {
return;
};
let buf_slice = buf.as_slice();
turso_assert!(
bytes_read == buf_slice.len() as i32,
@@ -1887,7 +1899,10 @@ pub fn begin_write_wal_header(io: &Arc<dyn File>, header: &WalHeader) -> Result<
};
let cloned = buffer.clone();
let write_complete = move |bytes_written: i32| {
let write_complete = move |res: Result<i32, CompletionError>| {
let Ok(bytes_written) = res else {
return;
};
// make sure to reference buffer so it's alive for async IO
let _buf = cloned.clone();
turso_assert!(

View File

@@ -20,7 +20,8 @@ use crate::storage::sqlite3_ondisk::{
};
use crate::types::{IOCompletions, IOResult};
use crate::{
bail_corrupt_error, io_yield_many, io_yield_one, turso_assert, Buffer, LimboError, Result,
bail_corrupt_error, io_yield_many, io_yield_one, turso_assert, Buffer, CompletionError,
LimboError, Result,
};
use crate::{Completion, Page};
@@ -912,14 +913,17 @@ impl Wal for WalFile {
let offset = self.frame_offset(frame_id);
page.set_locked();
let frame = page.clone();
let complete = Box::new(move |buf: Arc<Buffer>, bytes_read: i32| {
let complete = Box::new(move |res: Result<(Arc<Buffer>, i32), CompletionError>| {
let Ok((buf, bytes_read)) = res else {
return;
};
let buf_len = buf.len();
turso_assert!(
bytes_read == buf_len as i32,
"read({bytes_read}) less than expected({buf_len}): frame_id={frame_id}"
);
let frame = frame.clone();
finish_read_page(page.get().id, buf, frame).unwrap();
finish_read_page(page.get().id, buf, frame);
});
begin_read_wal_frame(
&self.get_shared().file,
@@ -934,7 +938,10 @@ impl Wal for WalFile {
tracing::debug!("read_frame({})", frame_id);
let offset = self.frame_offset(frame_id);
let (frame_ptr, frame_len) = (frame.as_mut_ptr(), frame.len());
let complete = Box::new(move |buf: Arc<Buffer>, bytes_read: i32| {
let complete = Box::new(move |res: Result<(Arc<Buffer>, i32), CompletionError>| {
let Ok((buf, bytes_read)) = res else {
return;
};
let buf_len = buf.len();
turso_assert!(
bytes_read == buf_len as i32,
@@ -985,7 +992,10 @@ impl Wal for WalFile {
let (page_ptr, page_len) = (page.as_ptr(), page.len());
let complete = Box::new({
let conflict = conflict.clone();
move |buf: Arc<Buffer>, bytes_read: i32| {
move |res: Result<(Arc<Buffer>, i32), CompletionError>| {
let Ok((buf, bytes_read)) = res else {
return;
};
let buf_len = buf.len();
turso_assert!(
bytes_read == buf_len as i32,
@@ -1077,7 +1087,10 @@ impl Wal for WalFile {
let c = Completion::new_write({
let frame_bytes = frame_bytes.clone();
move |bytes_written| {
move |res: Result<i32, CompletionError>| {
let Ok(bytes_written) = res else {
return;
};
let frame_len = frame_bytes.len();
turso_assert!(
bytes_written == frame_len as i32,

View File

@@ -2015,41 +2015,45 @@ pub fn op_transaction(
// 1. We try to upgrade current version
let current_state = conn.transaction_state.get();
let (new_transaction_state, updated) = match (current_state, write) {
// pending state means that we tried beginning a tx and the method returned IO.
// instead of ending the read tx, just update the state to pending.
(TransactionState::PendingUpgrade, write) => {
turso_assert!(
*write,
"pending upgrade should only be set for write transactions"
);
(
let (new_transaction_state, updated) = if conn.is_nested_stmt.get() {
(current_state, false)
} else {
match (current_state, write) {
// pending state means that we tried beginning a tx and the method returned IO.
// instead of ending the read tx, just update the state to pending.
(TransactionState::PendingUpgrade, write) => {
turso_assert!(
*write,
"pending upgrade should only be set for write transactions"
);
(
TransactionState::Write {
schema_did_change: false,
},
true,
)
}
(TransactionState::Write { schema_did_change }, true) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Write { schema_did_change }, false) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Read, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
)
),
(TransactionState::Read, false) => (TransactionState::Read, false),
(TransactionState::None, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::None, false) => (TransactionState::Read, true),
}
(TransactionState::Write { schema_did_change }, true) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Write { schema_did_change }, false) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Read, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::Read, false) => (TransactionState::Read, false),
(TransactionState::None, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::None, false) => (TransactionState::Read, true),
};
// 2. Start transaction if needed
@@ -2073,12 +2077,20 @@ pub fn op_transaction(
}
} else {
if updated && matches!(current_state, TransactionState::None) {
turso_assert!(
!conn.is_nested_stmt.get(),
"nested stmt should not begin a new read transaction"
);
if let LimboResult::Busy = pager.begin_read_tx()? {
return Ok(InsnFunctionStepResult::Busy);
}
}
if updated && matches!(new_transaction_state, TransactionState::Write { .. }) {
turso_assert!(
!conn.is_nested_stmt.get(),
"nested stmt should not begin a new write transaction"
);
match pager.begin_write_tx()? {
IOResult::Done(r) => {
if let LimboResult::Busy = r {
@@ -6425,12 +6437,13 @@ pub fn op_parse_schema(
let previous_auto_commit = conn.auto_commit.get();
conn.auto_commit.set(false);
if let Some(where_clause) = where_clause {
let maybe_nested_stmt_err = if let Some(where_clause) = where_clause {
let stmt = conn.prepare(format!("SELECT * FROM sqlite_schema WHERE {where_clause}"))?;
conn.with_schema_mut(|schema| {
// TODO: This function below is synchronous, make it async
let existing_views = schema.materialized_views.clone();
conn.is_nested_stmt.set(true);
parse_schema_rows(
stmt,
schema,
@@ -6438,13 +6451,14 @@ pub fn op_parse_schema(
state.mv_tx_id,
existing_views,
)
})?;
})
} else {
let stmt = conn.prepare("SELECT * FROM sqlite_schema")?;
conn.with_schema_mut(|schema| {
// TODO: This function below is synchronous, make it async
let existing_views = schema.materialized_views.clone();
conn.is_nested_stmt.set(true);
parse_schema_rows(
stmt,
schema,
@@ -6452,9 +6466,11 @@ pub fn op_parse_schema(
state.mv_tx_id,
existing_views,
)
})?;
}
})
};
conn.is_nested_stmt.set(false);
conn.auto_commit.set(previous_auto_commit);
maybe_nested_stmt_err?;
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}

View File

@@ -18,7 +18,7 @@ use crate::{
types::{IOResult, ImmutableRecord, KeyInfo, RecordCursor, RefValue},
Result,
};
use crate::{io_yield_many, io_yield_one, return_if_io};
use crate::{io_yield_many, io_yield_one, return_if_io, CompletionError};
#[derive(Debug, Clone, Copy)]
enum SortState {
@@ -236,9 +236,13 @@ impl Sorter {
fn init_chunk_heap(&mut self) -> Result<IOResult<()>> {
match self.init_chunk_heap_state {
InitChunkHeapState::Start => {
let mut completions = Vec::with_capacity(self.chunks.len());
let mut completions: Vec<Completion> = Vec::with_capacity(self.chunks.len());
for chunk in self.chunks.iter_mut() {
let c = chunk.read()?;
let c = chunk.read().inspect_err(|_| {
for c in completions.iter() {
c.abort();
}
})?;
completions.push(c);
}
self.init_chunk_heap_state = InitChunkHeapState::PushChunk;
@@ -313,7 +317,7 @@ impl Sorter {
let chunk_file = match &self.temp_file {
Some(temp_file) => temp_file.file.clone(),
None => {
let temp_dir = tempfile::tempdir().map_err(LimboError::IOError)?;
let temp_dir = tempfile::tempdir()?;
let chunk_file_path = temp_dir.as_ref().join("chunk_file");
let chunk_file = self.io.open_file(
chunk_file_path.to_str().unwrap(),
@@ -489,7 +493,10 @@ impl SortedChunk {
let stored_buffer_copy = self.buffer.clone();
let stored_buffer_len_copy = self.buffer_len.clone();
let total_bytes_read_copy = self.total_bytes_read.clone();
let read_complete = Box::new(move |buf: Arc<Buffer>, bytes_read: i32| {
let read_complete = Box::new(move |res: Result<(Arc<Buffer>, i32), CompletionError>| {
let Ok((buf, bytes_read)) = res else {
return;
};
let read_buf_ref = buf.clone();
let read_buf = read_buf_ref.as_slice();
@@ -547,7 +554,10 @@ impl SortedChunk {
let buffer_ref_copy = buffer_ref.clone();
let chunk_io_state_copy = self.io_state.clone();
let write_complete = Box::new(move |bytes_written: i32| {
let write_complete = Box::new(move |res: Result<i32, CompletionError>| {
let Ok(bytes_written) = res else {
return;
};
chunk_io_state_copy.set(SortedChunkIOState::WriteComplete);
let buf_len = buffer_ref_copy.len();
if bytes_written < buf_len as i32 {

View File

@@ -2,8 +2,8 @@
This document describes the JavaScript API for Turso. The API is implemented in two different packages:
- **`bindings/javascript`**: Native bindings for the Turso database.
- **`packages/turso-serverless`**: Serverless driver for Turso Cloud databases.
- [@tursodatabase/database](https://www.npmjs.com/package/@tursodatabase/database) (`bindings/javascript`) - Native bindings for the Turso database.
- [@tursodatabase/serverless](https://www.npmjs.com/package/@tursodatabase/serverless) (`packages/turso-serverless`) - Serverless driver for Turso Cloud databases.
The API is compatible with the libSQL promise API, which is an asynchronous variant of the `better-sqlite3` API.

View File

@@ -129,6 +129,7 @@ export interface Client {
class LibSQLClient implements Client {
private session: Session;
private _closed = false;
private _defaultSafeIntegers = false;
constructor(config: Config) {
this.validateConfig(config);
@@ -246,15 +247,8 @@ class LibSQLClient implements Client {
normalizedStmt = this.normalizeStatement(stmtOrSql);
}
await this.session.sequence(normalizedStmt.sql);
// Return empty result set for sequence execution
return this.convertResult({
columns: [],
columnTypes: [],
rows: [],
rowsAffected: 0,
lastInsertRowid: undefined
});
const result = await this.session.execute(normalizedStmt.sql, normalizedStmt.args, this._defaultSafeIntegers);
return this.convertResult(result);
} catch (error: any) {
throw new LibsqlError(error.message, "EXECUTE_ERROR");
}

View File

@@ -16,6 +16,8 @@ export class Connection {
private config: Config;
private session: Session;
private isOpen: boolean = true;
private defaultSafeIntegerMode: boolean = false;
private _inTransaction: boolean = false;
constructor(config: Config) {
if (!config.url) {
@@ -23,28 +25,73 @@ export class Connection {
}
this.config = config;
this.session = new Session(config);
// Define inTransaction property
Object.defineProperty(this, 'inTransaction', {
get: () => this._inTransaction,
enumerable: true
});
}
/**
* Whether the database is currently in a transaction.
*/
get inTransaction(): boolean {
return this._inTransaction;
}
/**
* Prepare a SQL statement for execution.
*
* Each prepared statement gets its own session to avoid conflicts during concurrent execution.
* This method fetches column metadata using the describe functionality.
*
* @param sql - The SQL statement to prepare
* @returns A Statement object that can be executed multiple ways
* @returns A Promise that resolves to a Statement object with column metadata
*
* @example
* ```typescript
* const stmt = client.prepare("SELECT * FROM users WHERE id = ?");
* const stmt = await client.prepare("SELECT * FROM users WHERE id = ?");
* const columns = stmt.columns();
* const user = await stmt.get([123]);
* const allUsers = await stmt.all();
* ```
*/
prepare(sql: string): Statement {
async prepare(sql: string): Promise<Statement> {
if (!this.isOpen) {
throw new TypeError("The database connection is not open");
}
return new Statement(this.config, sql);
// Create a session to get column metadata via describe
const session = new Session(this.config);
const description = await session.describe(sql);
await session.close();
const stmt = new Statement(this.config, sql, description.cols);
if (this.defaultSafeIntegerMode) {
stmt.safeIntegers(true);
}
return stmt;
}
/**
* Execute a SQL statement and return all results.
*
* @param sql - The SQL statement to execute
* @param args - Optional array of parameter values
* @returns Promise resolving to the complete result set
*
* @example
* ```typescript
* const result = await client.execute("SELECT * FROM users WHERE id = ?", [123]);
* console.log(result.rows);
* ```
*/
async execute(sql: string, args?: any[]): Promise<any> {
if (!this.isOpen) {
throw new TypeError("The database connection is not open");
}
return this.session.execute(sql, args || [], this.defaultSafeIntegerMode);
}
/**
@@ -55,7 +102,7 @@ export class Connection {
*
* @example
* ```typescript
* const result = await client.execute("SELECT * FROM users");
* const result = await client.exec("SELECT * FROM users");
* console.log(result.rows);
* ```
*/
@@ -101,6 +148,72 @@ export class Connection {
return this.session.execute(sql);
}
/**
* Sets the default safe integers mode for all statements from this connection.
*
* @param toggle - Whether to use safe integers by default.
*/
defaultSafeIntegers(toggle?: boolean): void {
this.defaultSafeIntegerMode = toggle === false ? false : true;
}
/**
* Returns a function that executes the given function in a transaction.
*
* @param fn - The function to wrap in a transaction
* @returns A function that will execute fn within a transaction
*
* @example
* ```typescript
* const insert = await client.prepare("INSERT INTO users (name) VALUES (?)");
* const insertMany = client.transaction((users) => {
* for (const user of users) {
* insert.run([user]);
* }
* });
*
* await insertMany(['Alice', 'Bob', 'Charlie']);
* ```
*/
transaction(fn: (...args: any[]) => any): any {
if (typeof fn !== "function") {
throw new TypeError("Expected first argument to be a function");
}
const db = this;
const wrapTxn = (mode: string) => {
return async (...bindParameters: any[]) => {
await db.exec("BEGIN " + mode);
db._inTransaction = true;
try {
const result = await fn(...bindParameters);
await db.exec("COMMIT");
db._inTransaction = false;
return result;
} catch (err) {
await db.exec("ROLLBACK");
db._inTransaction = false;
throw err;
}
};
};
const properties = {
default: { value: wrapTxn("") },
deferred: { value: wrapTxn("DEFERRED") },
immediate: { value: wrapTxn("IMMEDIATE") },
exclusive: { value: wrapTxn("EXCLUSIVE") },
database: { value: this, enumerable: true },
};
Object.defineProperties(properties.default.value, properties);
Object.defineProperties(properties.deferred.value, properties);
Object.defineProperties(properties.immediate.value, properties);
Object.defineProperties(properties.exclusive.value, properties);
return properties.default.value;
}
/**
* Close the connection.
*

View File

@@ -1,4 +1,5 @@
// Turso serverless driver entry point
export { Connection, connect, type Config } from './connection.js';
export { Statement } from './statement.js';
export { DatabaseError } from './error.js';
export { DatabaseError } from './error.js';
export { type Column } from './protocol.js';

View File

@@ -62,9 +62,21 @@ export interface CloseRequest {
type: 'close';
}
export interface DescribeRequest {
type: 'describe';
sql: string;
}
export interface DescribeResult {
params: Array<{ name?: string }>;
cols: Column[];
is_explain: boolean;
is_readonly: boolean;
}
export interface PipelineRequest {
baton: string | null;
requests: (ExecuteRequest | BatchRequest | SequenceRequest | CloseRequest)[];
requests: (ExecuteRequest | BatchRequest | SequenceRequest | CloseRequest | DescribeRequest)[];
}
export interface PipelineResponse {
@@ -73,8 +85,8 @@ export interface PipelineResponse {
results: Array<{
type: 'ok' | 'error';
response?: {
type: 'execute' | 'batch' | 'sequence' | 'close';
result?: ExecuteResult;
type: 'execute' | 'batch' | 'sequence' | 'close' | 'describe';
result?: ExecuteResult | DescribeResult;
};
error?: {
message: string;
@@ -89,12 +101,20 @@ export function encodeValue(value: any): Value {
}
if (typeof value === 'number') {
if (Number.isInteger(value)) {
return { type: 'integer', value: value.toString() };
if (!Number.isFinite(value)) {
throw new Error("Only finite numbers (not Infinity or NaN) can be passed as arguments");
}
return { type: 'float', value };
}
if (typeof value === 'bigint') {
return { type: 'integer', value: value.toString() };
}
if (typeof value === 'boolean') {
return { type: 'integer', value: value ? '1' : '0' };
}
if (typeof value === 'string') {
return { type: 'text', value };
}
@@ -107,11 +127,14 @@ export function encodeValue(value: any): Value {
return { type: 'text', value: String(value) };
}
export function decodeValue(value: Value): any {
export function decodeValue(value: Value, safeIntegers: boolean = false): any {
switch (value.type) {
case 'null':
return null;
case 'integer':
if (safeIntegers) {
return BigInt(value.value as string);
}
return parseInt(value.value as string, 10);
case 'float':
return value.value as number;

View File

@@ -9,6 +9,8 @@ import {
type PipelineRequest,
type SequenceRequest,
type CloseRequest,
type DescribeRequest,
type DescribeResult,
type NamedArg,
type Value
} from './protocol.js';
@@ -48,16 +50,54 @@ export class Session {
this.baseUrl = normalizeUrl(config.url);
}
/**
* Describe a SQL statement to get its column metadata.
*
* @param sql - The SQL statement to describe
* @returns Promise resolving to the statement description
*/
async describe(sql: string): Promise<DescribeResult> {
const request: PipelineRequest = {
baton: this.baton,
requests: [{
type: "describe",
sql: sql
} as DescribeRequest]
};
const response = await executePipeline(this.baseUrl, this.config.authToken, request);
this.baton = response.baton;
if (response.base_url) {
this.baseUrl = response.base_url;
}
// Check for errors in the response
if (response.results && response.results[0]) {
const result = response.results[0];
if (result.type === "error") {
throw new DatabaseError(result.error?.message || 'Describe execution failed');
}
if (result.response?.type === "describe" && result.response.result) {
return result.response.result as DescribeResult;
}
}
throw new DatabaseError('Unexpected describe response');
}
/**
* Execute a SQL statement and return all results.
*
* @param sql - The SQL statement to execute
* @param args - Optional array of parameter values or object with named parameters
* @param safeIntegers - Whether to return integers as BigInt
* @returns Promise resolving to the complete result set
*/
async execute(sql: string, args: any[] | Record<string, any> = []): Promise<any> {
async execute(sql: string, args: any[] | Record<string, any> = [], safeIntegers: boolean = false): Promise<any> {
const { response, entries } = await this.executeRaw(sql, args);
const result = await this.processCursorEntries(entries);
const result = await this.processCursorEntries(entries, safeIntegers);
return result;
}
@@ -137,7 +177,7 @@ export class Session {
* @param entries - Async generator of cursor entries
* @returns Promise resolving to the processed result
*/
async processCursorEntries(entries: AsyncGenerator<CursorEntry>): Promise<any> {
async processCursorEntries(entries: AsyncGenerator<CursorEntry>, safeIntegers: boolean = false): Promise<any> {
let columns: string[] = [];
let columnTypes: string[] = [];
let rows: any[] = [];
@@ -154,7 +194,7 @@ export class Session {
break;
case 'row':
if (entry.row) {
const decodedRow = entry.row.map(decodeValue);
const decodedRow = entry.row.map(value => decodeValue(value, safeIntegers));
const rowObject = this.createRowObject(decodedRow, columns);
rows.push(rowObject);
}

View File

@@ -1,6 +1,7 @@
import {
decodeValue,
type CursorEntry
type CursorEntry,
type Column
} from './protocol.js';
import { Session, type SessionConfig } from './session.js';
import { DatabaseError } from './error.js';
@@ -18,10 +19,13 @@ export class Statement {
private session: Session;
private sql: string;
private presentationMode: 'expanded' | 'raw' | 'pluck' = 'expanded';
private safeIntegerMode: boolean = false;
private columnMetadata: Column[];
constructor(sessionConfig: SessionConfig, sql: string) {
constructor(sessionConfig: SessionConfig, sql: string, columns?: Column[]) {
this.session = new Session(sessionConfig);
this.sql = sql;
this.columnMetadata = columns || [];
}
@@ -43,6 +47,54 @@ export class Statement {
return this;
}
/**
* Enable pluck mode to return only the first column value from each row.
*
* @param pluck Enable or disable pluck mode. If you don't pass the parameter, pluck mode is enabled.
* @returns This statement instance for chaining
*
* @example
* ```typescript
* const stmt = client.prepare("SELECT id FROM users");
* const ids = await stmt.pluck().all();
* console.log(ids); // [1, 2, 3, ...]
* ```
*/
pluck(pluck?: boolean): Statement {
this.presentationMode = pluck === false ? 'expanded' : 'pluck';
return this;
}
/**
* Sets safe integers mode for this statement.
*
* @param toggle Whether to use safe integers. If you don't pass the parameter, safe integers mode is enabled.
* @returns This statement instance for chaining
*/
safeIntegers(toggle?: boolean): Statement {
this.safeIntegerMode = toggle === false ? false : true;
return this;
}
/**
* Get column information for this statement.
*
* @returns Array of column metadata objects matching the native bindings format
*
* @example
* ```typescript
* const stmt = await client.prepare("SELECT id, name, email FROM users");
* const columns = stmt.columns();
* console.log(columns); // [{ name: 'id', type: 'INTEGER', column: null, database: null, table: null }, ...]
* ```
*/
columns(): any[] {
return this.columnMetadata.map(col => ({
name: col.name,
type: col.decltype
}));
}
/**
* Executes the prepared statement.
*
@@ -58,7 +110,7 @@ export class Statement {
*/
async run(args?: any): Promise<any> {
const normalizedArgs = this.normalizeArgs(args);
const result = await this.session.execute(this.sql, normalizedArgs);
const result = await this.session.execute(this.sql, normalizedArgs, this.safeIntegerMode);
return { changes: result.rowsAffected, lastInsertRowid: result.lastInsertRowid };
}
@@ -79,19 +131,29 @@ export class Statement {
*/
async get(args?: any): Promise<any> {
const normalizedArgs = this.normalizeArgs(args);
const result = await this.session.execute(this.sql, normalizedArgs);
const result = await this.session.execute(this.sql, normalizedArgs, this.safeIntegerMode);
const row = result.rows[0];
if (!row) {
return undefined;
}
if (this.presentationMode === 'pluck') {
// In pluck mode, return only the first column value
return row[0];
}
if (this.presentationMode === 'raw') {
// In raw mode, return the row as a plain array (it already is one)
// The row object is already an array with column properties added
return [...row];
}
return row;
// In expanded mode, convert to plain object with named properties
const obj: any = {};
result.columns.forEach((col: string, i: number) => {
obj[col] = row[i];
});
return obj;
}
/**
@@ -109,11 +171,18 @@ export class Statement {
*/
async all(args?: any): Promise<any[]> {
const normalizedArgs = this.normalizeArgs(args);
const result = await this.session.execute(this.sql, normalizedArgs);
const result = await this.session.execute(this.sql, normalizedArgs, this.safeIntegerMode);
if (this.presentationMode === 'pluck') {
// In pluck mode, return only the first column value from each row
return result.rows.map((row: any) => row[0]);
}
if (this.presentationMode === 'raw') {
return result.rows.map((row: any) => [...row]);
}
// In expanded mode, convert rows to plain objects with named properties
return result.rows.map((row: any) => {
const obj: any = {};
result.columns.forEach((col: string, i: number) => {
@@ -156,8 +225,11 @@ export class Statement {
break;
case 'row':
if (entry.row) {
const decodedRow = entry.row.map(decodeValue);
if (this.presentationMode === 'raw') {
const decodedRow = entry.row.map(value => decodeValue(value, this.safeIntegerMode));
if (this.presentationMode === 'pluck') {
// In pluck mode, yield only the first column value
yield decodedRow[0];
} else if (this.presentationMode === 'raw') {
// In raw mode, yield arrays of values
yield decodedRow;
} else {

View File

@@ -645,6 +645,9 @@ impl Interaction {
&query_str[0..query_str.len().min(4096)],
err
);
if let Some(turso_core::LimboError::ParseError(e)) = err {
panic!("Unexpected parse error: {e}");
}
return Err(err.unwrap());
}
let mut rows = rows.unwrap().unwrap();

View File

@@ -16,7 +16,6 @@ use crate::{
Create, Delete, Drop, Insert, Query, Select,
},
table::SimValue,
FAULT_ERROR_MSG,
},
runner::env::SimulatorEnv,
};
@@ -752,12 +751,10 @@ impl Property {
Ok(Ok(()))
}
Err(err) => {
let msg = format!("{err}");
if msg.contains(FAULT_ERROR_MSG) {
Ok(Ok(()))
} else {
Err(LimboError::InternalError(msg))
}
// We cannot make any assumptions about the error content; all we are about is, if the statement errored,
// we don't shadow the results into the simulator env, i.e. we assume whatever the statement did was rolled back.
tracing::error!("Fault injection produced error: {err}");
Ok(Ok(()))
}
}
}),

View File

@@ -5,7 +5,7 @@ use std::{
use rand::{RngCore, SeedableRng};
use rand_chacha::ChaCha8Rng;
use turso_core::{Clock, Instant, MemoryIO, OpenFlags, PlatformIO, Result, IO};
use turso_core::{Clock, Instant, OpenFlags, PlatformIO, Result, IO};
use crate::{
model::FAULT_ERROR_MSG,
@@ -109,13 +109,6 @@ impl IO for SimulatorIO {
Ok(file)
}
fn wait_for_completion(&self, c: turso_core::Completion) -> Result<()> {
while !c.is_completed() {
self.run_once()?;
}
Ok(())
}
fn run_once(&self) -> Result<()> {
if self.fault.get() {
self.nr_run_once_faults
@@ -135,8 +128,4 @@ impl IO for SimulatorIO {
fn generate_random_number(&self) -> i64 {
self.rng.borrow_mut().next_u64() as i64
}
fn get_memory_io(&self) -> Arc<turso_core::MemoryIO> {
Arc::new(MemoryIO::new())
}
}

View File

@@ -74,7 +74,10 @@ pub async fn db_bootstrap<C: ProtocolIO>(
buffer.as_mut_slice().copy_from_slice(chunk);
let mut completions = Vec::with_capacity(dbs.len());
for db in dbs {
let c = Completion::new_write(move |size| {
let c = Completion::new_write(move |res| {
let Ok(size) = res else {
return;
};
// todo(sivukhin): we need to error out in case of partial read
assert!(size as usize == content_len);
});
@@ -818,7 +821,10 @@ pub async fn reset_wal_file(
WAL_HEADER + WAL_FRAME_SIZE * (frames_count as usize)
};
tracing::debug!("reset db wal to the size of {} frames", frames_count);
let c = Completion::new_trunc(move |rc| {
let c = Completion::new_trunc(move |res| {
let Ok(rc) = res else {
return;
};
assert!(rc as usize == 0);
});
let c = wal.truncate(wal_size, c)?;

View File

@@ -34,7 +34,9 @@ impl IoOperations for Arc<dyn turso_core::IO> {
fn try_open(&self, path: &str) -> Result<Option<Arc<dyn turso_core::File>>> {
match self.open_file(path, OpenFlags::None, false) {
Ok(file) => Ok(Some(file)),
Err(LimboError::IOError(err)) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(LimboError::CompletionError(turso_core::CompletionError::IOError(
std::io::ErrorKind::NotFound,
))) => Ok(None),
Err(err) => Err(err.into()),
}
}
@@ -51,7 +53,12 @@ impl IoOperations for Arc<dyn turso_core::IO> {
file: Arc<dyn turso_core::File>,
len: usize,
) -> Result<()> {
let c = Completion::new_trunc(move |rc| tracing::debug!("file truncated: rc={}", rc));
let c = Completion::new_trunc(move |rc| {
let Ok(rc) = rc else {
return;
};
tracing::debug!("file truncated: rc={}", rc);
});
let c = file.truncate(len, c)?;
while !c.is_completed() {
coro.yield_(ProtocolCommand::IO).await?;

View File

@@ -1,6 +1,6 @@
{
"name": "@tursodatabase/sync",
"version": "0.1.4-pre.9",
"version": "0.1.4-pre.10",
"repository": {
"type": "git",
"url": "https://github.com/tursodatabase/turso"

View File

@@ -145,16 +145,16 @@ test.serial("Database.pragma() after close()", async (t) => {
// Database.transaction()
// ==========================================================================
test.skip("Database.transaction()", async (t) => {
test.serial("Database.transaction()", async (t) => {
const db = t.context.db;
const insert = await db.prepare(
"INSERT INTO users(name, email) VALUES (:name, :email)"
);
const insertMany = db.transaction((users) => {
const insertMany = db.transaction(async (users) => {
t.is(db.inTransaction, true);
for (const user of users) insert.run(user);
for (const user of users) await insert.run(user);
});
t.is(db.inTransaction, false);
@@ -166,12 +166,12 @@ test.skip("Database.transaction()", async (t) => {
t.is(db.inTransaction, false);
const stmt = await db.prepare("SELECT * FROM users WHERE id = ?");
t.is(stmt.get(3).name, "Joey");
t.is(stmt.get(4).name, "Sally");
t.is(stmt.get(5).name, "Junior");
t.is((await stmt.get(3)).name, "Joey");
t.is((await stmt.get(4)).name, "Sally");
t.is((await stmt.get(5)).name, "Junior");
});
test.skip("Database.transaction().immediate()", async (t) => {
test.serial("Database.transaction().immediate()", async (t) => {
const db = t.context.db;
const insert = await db.prepare(
"INSERT INTO users(name, email) VALUES (:name, :email)"
@@ -279,6 +279,17 @@ test.serial("Statement.get() [raw]", async (t) => {
t.deepEqual(await stmt.raw().get(1), [1, "Alice", "alice@example.org"]);
});
test.serial("Statement.get() values", async (t) => {
const db = t.context.db;
const stmt = (await db.prepare("SELECT ?")).raw();
t.deepEqual(await stmt.get(1), [1]);
t.deepEqual(await stmt.get(Number.MIN_VALUE), [Number.MIN_VALUE]);
t.deepEqual(await stmt.get(Number.MAX_VALUE), [Number.MAX_VALUE]);
t.deepEqual(await stmt.get(Number.MAX_SAFE_INTEGER), [Number.MAX_SAFE_INTEGER]);
t.deepEqual(await stmt.get(9007199254740991n), [9007199254740991]);
});
// ==========================================================================
// Statement.iterate()
// ==========================================================================
@@ -328,7 +339,7 @@ test.serial("Statement.all() [raw]", async (t) => {
t.deepEqual(await stmt.raw().all(), expected);
});
test.skip("Statement.all() [pluck]", async (t) => {
test.serial("Statement.all() [pluck]", async (t) => {
const db = t.context.db;
const stmt = await db.prepare("SELECT * FROM users");
@@ -339,7 +350,7 @@ test.skip("Statement.all() [pluck]", async (t) => {
t.deepEqual(await stmt.pluck().all(), expected);
});
test.skip("Statement.all() [default safe integers]", async (t) => {
test.serial("Statement.all() [default safe integers]", async (t) => {
const db = t.context.db;
db.defaultSafeIntegers();
const stmt = await db.prepare("SELECT * FROM users");
@@ -350,7 +361,7 @@ test.skip("Statement.all() [default safe integers]", async (t) => {
t.deepEqual(await stmt.raw().all(), expected);
});
test.skip("Statement.all() [statement safe integers]", async (t) => {
test.serial("Statement.all() [statement safe integers]", async (t) => {
const db = t.context.db;
const stmt = await db.prepare("SELECT * FROM users");
stmt.safeIntegers();
@@ -379,46 +390,31 @@ test.skip("Statement.raw() [failure]", async (t) => {
// Statement.columns()
// ==========================================================================
test.skip("Statement.columns()", async (t) => {
test.serial("Statement.columns()", async (t) => {
const db = t.context.db;
var stmt = undefined;
stmt = await db.prepare("SELECT 1");
t.deepEqual(stmt.columns(), [
{
column: null,
database: null,
name: '1',
table: null,
type: null,
},
]);
const columns1 = stmt.columns();
t.is(columns1.length, 1);
t.is(columns1[0].name, '1');
// For "SELECT 1", type varies by provider, so just check it exists
t.true('type' in columns1[0]);
stmt = await db.prepare("SELECT * FROM users WHERE id = ?");
t.deepEqual(stmt.columns(), [
{
column: "id",
database: "main",
name: "id",
table: "users",
type: "INTEGER",
},
{
column: "name",
database: "main",
name: "name",
table: "users",
type: "TEXT",
},
{
column: "email",
database: "main",
name: "email",
table: "users",
type: "TEXT",
},
]);
const columns2 = stmt.columns();
t.is(columns2.length, 3);
// Check column names and types only
t.is(columns2[0].name, "id");
t.is(columns2[0].type, "INTEGER");
t.is(columns2[1].name, "name");
t.is(columns2[1].type, "TEXT");
t.is(columns2[2].name, "email");
t.is(columns2[2].type, "TEXT");
});
// ==========================================================================

View File

@@ -138,7 +138,7 @@ test.serial("Database.pragma() after close()", async (t) => {
// Database.transaction()
// ==========================================================================
test.skip("Database.transaction()", async (t) => {
test.serial("Database.transaction()", async (t) => {
const db = t.context.db;
const insert = db.prepare(
@@ -164,7 +164,7 @@ test.skip("Database.transaction()", async (t) => {
t.is(stmt.get(5).name, "Junior");
});
test.skip("Database.transaction().immediate()", async (t) => {
test.serial("Database.transaction().immediate()", async (t) => {
const db = t.context.db;
const insert = db.prepare(
"INSERT INTO users(name, email) VALUES (:name, :email)"
@@ -334,7 +334,7 @@ test.serial("Statement.get() [raw]", async (t) => {
t.deepEqual(stmt.raw().get(1), [1, "Alice", "alice@example.org"]);
});
test.skip("Statement.get() values", async (t) => {
test.serial("Statement.get() values", async (t) => {
const db = t.context.db;
const stmt = db.prepare("SELECT ?").raw();
@@ -406,7 +406,7 @@ test.serial("Statement.all() [pluck]", async (t) => {
t.deepEqual(stmt.pluck().all(), expected);
});
test.skip("Statement.all() [default safe integers]", async (t) => {
test.serial("Statement.all() [default safe integers]", async (t) => {
const db = t.context.db;
db.defaultSafeIntegers();
const stmt = db.prepare("SELECT * FROM users");
@@ -417,7 +417,7 @@ test.skip("Statement.all() [default safe integers]", async (t) => {
t.deepEqual(stmt.raw().all(), expected);
});
test.skip("Statement.all() [statement safe integers]", async (t) => {
test.serial("Statement.all() [statement safe integers]", async (t) => {
const db = t.context.db;
const stmt = db.prepare("SELECT * FROM users");
stmt.safeIntegers();
@@ -446,46 +446,31 @@ test.skip("Statement.raw() [failure]", async (t) => {
// Statement.columns()
// ==========================================================================
test.skip("Statement.columns()", async (t) => {
test.serial("Statement.columns()", async (t) => {
const db = t.context.db;
var stmt = undefined;
stmt = db.prepare("SELECT 1");
t.deepEqual(stmt.columns(), [
{
column: null,
database: null,
name: '1',
table: null,
type: null,
},
]);
const columns1 = stmt.columns();
t.is(columns1.length, 1);
t.is(columns1[0].name, '1');
// For "SELECT 1", type varies by provider, so just check it exists
t.true('type' in columns1[0]);
stmt = db.prepare("SELECT * FROM users WHERE id = ?");
t.deepEqual(stmt.columns(), [
{
column: "id",
database: "main",
name: "id",
table: "users",
type: "INTEGER",
},
{
column: "name",
database: "main",
name: "name",
table: "users",
type: "TEXT",
},
{
column: "email",
database: "main",
name: "email",
table: "users",
type: "TEXT",
},
]);
stmt = await db.prepare("SELECT * FROM users WHERE id = ?");
const columns2 = stmt.columns();
t.is(columns2.length, 3);
// Check column names and types only
t.is(columns2[0].name, "id");
t.is(columns2[0].type, "INTEGER");
t.is(columns2[1].name, "name");
t.is(columns2[1].type, "TEXT");
t.is(columns2[2].name, "email");
t.is(columns2[2].type, "TEXT");
});
test.skip("Timeout option", async (t) => {
@@ -516,7 +501,7 @@ const connect = async (path, options = {}) => {
}
const provider = process.env.PROVIDER;
if (provider === "turso") {
const { Database, SqliteError }= await import("@tursodatabase/database/sync");
const { Database, SqliteError }= await import("@tursodatabase/database/compat");
const db = new Database(path, options);
return [db, path, provider, SqliteError];
}

View File

@@ -2,10 +2,9 @@
use std::error::Error;
use std::fmt;
use std::io;
/// Error with position
pub trait ScanError: Error + From<io::Error> + Sized {
pub trait ScanError: Error + Sized {
/// Update the position where the error occurs
fn position(&mut self, line: u64, column: usize, offset: usize);
}

View File

@@ -1,17 +1,14 @@
use std::error;
use std::fmt;
use std::io;
use crate::lexer::scan::ScanError;
use crate::parser::ParserError;
/// SQL lexer and parser errors
#[non_exhaustive]
#[derive(Debug, miette::Diagnostic)]
#[derive(Debug, Clone, miette::Diagnostic)]
#[diagnostic()]
pub enum Error {
/// I/O Error
Io(io::Error),
/// Lexer error
UnrecognizedToken(
Option<(u64, usize)>,
@@ -73,7 +70,6 @@ pub enum Error {
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Self::Io(ref err) => err.fmt(f),
Self::UnrecognizedToken(pos, _) => {
write!(f, "unrecognized token at {:?}", pos.unwrap())
}
@@ -103,12 +99,6 @@ impl fmt::Display for Error {
impl error::Error for Error {}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Self {
Self::Io(err)
}
}
impl From<ParserError> for Error {
fn from(err: ParserError) -> Self {
Self::ParserError(err, None, None)
@@ -118,7 +108,6 @@ impl From<ParserError> for Error {
impl ScanError for Error {
fn position(&mut self, line: u64, column: usize, offset: usize) {
match *self {
Self::Io(_) => {}
Self::UnrecognizedToken(ref mut pos, ref mut src) => {
*pos = Some((line, column));
*src = Some((offset).into());

View File

@@ -16,7 +16,7 @@ use crate::dialect::Token;
use ast::{Cmd, ExplainKind, Name, Stmt};
/// Parser error
#[derive(Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub enum ParserError {
/// Syntax error
SyntaxError(String),