Merge branch 'tursodatabase:main' into main

This commit is contained in:
Danawan Bimantoro
2025-09-24 13:05:41 +07:00
committed by GitHub
48 changed files with 940 additions and 25313 deletions

View File

@@ -190,6 +190,41 @@ jobs:
shell: bash
- name: Test bindings
run: docker run --rm -v $(pwd):/build -w /build node:${{ matrix.node }}-slim yarn workspace @tursodatabase/database test
test-db-browser-binding:
name: Test DB bindings on browser@${{ matrix.node }}
needs:
- build
strategy:
fail-fast: false
matrix:
node:
- "20"
runs-on: blacksmith-4vcpu-ubuntu-2404
steps:
- uses: actions/checkout@v4
- name: Setup node
uses: useblacksmith/setup-node@v5
with:
node-version: ${{ matrix.node }}
- name: Install dependencies
run: yarn install
- name: Build common
run: yarn workspace @tursodatabase/database-common build
- name: Build browser-common
run: yarn workspace @tursodatabase/database-browser-common build
- name: Install playwright with deps
run: yarn workspace @tursodatabase/database-browser playwright install --with-deps
- name: Download all DB artifacts
uses: actions/download-artifact@v4
with:
path: bindings/javascript
merge-multiple: true
pattern: 'db*'
- name: List packages
run: ls -R .
shell: bash
- name: Test bindings
run: yarn workspace @tursodatabase/database-browser test
publish:
name: Publish
runs-on: ubuntu-latest
@@ -198,6 +233,7 @@ jobs:
id-token: write
needs:
- test-db-linux-x64-gnu-binding
- test-db-browser-binding
steps:
- uses: actions/checkout@v4
- name: Setup node

View File

@@ -539,7 +539,8 @@ Modifiers:
| SeekLt | Yes | |
| SeekRowid | Yes | |
| SeekEnd | Yes | |
| Sequence | No | |
| Sequence | Yes | |
| SequenceTest | Yes | |
| SetCookie | Yes | |
| ShiftLeft | Yes | |
| ShiftRight | Yes | |

View File

@@ -7,7 +7,6 @@
"": {
"version": "0.2.0-pre.7",
"workspaces": [
"packages/wasm-runtime",
"packages/common",
"packages/native",
"packages/browser-common",
@@ -67,28 +66,28 @@
"license": "Apache-2.0"
},
"node_modules/@emnapi/core": {
"version": "1.4.5",
"resolved": "https://registry.npmjs.org/@emnapi/core/-/core-1.4.5.tgz",
"integrity": "sha512-XsLw1dEOpkSX/WucdqUhPWP7hDxSvZiY+fsUC14h+FtQ2Ifni4znbBt8punRX+Uj2JG/uDb8nEHVKvrVlvdZ5Q==",
"version": "1.5.0",
"resolved": "https://registry.npmjs.org/@emnapi/core/-/core-1.5.0.tgz",
"integrity": "sha512-sbP8GzB1WDzacS8fgNPpHlp6C9VZe+SJP3F90W9rLemaQj2PzIuTEl1qDOYQf58YIpyjViI24y9aPWCjEzY2cg==",
"license": "MIT",
"dependencies": {
"@emnapi/wasi-threads": "1.0.4",
"@emnapi/wasi-threads": "1.1.0",
"tslib": "^2.4.0"
}
},
"node_modules/@emnapi/runtime": {
"version": "1.4.5",
"resolved": "https://registry.npmjs.org/@emnapi/runtime/-/runtime-1.4.5.tgz",
"integrity": "sha512-++LApOtY0pEEz1zrd9vy1/zXVaVJJ/EbAF3u0fXIzPJEDtnITsBGbbK0EkM72amhl/R5b+5xx0Y/QhcVOpuulg==",
"version": "1.5.0",
"resolved": "https://registry.npmjs.org/@emnapi/runtime/-/runtime-1.5.0.tgz",
"integrity": "sha512-97/BJ3iXHww3djw6hYIfErCZFee7qCtrneuLa20UXFCOTCfBM2cvQHjWJ2EG0s0MtdNwInarqCTz35i4wWXHsQ==",
"license": "MIT",
"dependencies": {
"tslib": "^2.4.0"
}
},
"node_modules/@emnapi/wasi-threads": {
"version": "1.0.4",
"resolved": "https://registry.npmjs.org/@emnapi/wasi-threads/-/wasi-threads-1.0.4.tgz",
"integrity": "sha512-PJR+bOmMOPH8AtcTGAyYNiuJ3/Fcoj2XN/gBEWzDIKh254XO+mM9XoXHk5GNEhodxeMznbg7BlRojVbKN+gC6g==",
"version": "1.1.0",
"resolved": "https://registry.npmjs.org/@emnapi/wasi-threads/-/wasi-threads-1.1.0.tgz",
"integrity": "sha512-WI0DdZ8xFSbgMjR1sFsKABJ/C5OnRrjT06JXbZKexJGrDuPTzZdDYfFlsgcCXCyf+suG5QU2e/y1Wo2V/OapLQ==",
"license": "MIT",
"dependencies": {
"tslib": "^2.4.0"
@@ -1512,16 +1511,14 @@
}
},
"node_modules/@napi-rs/wasm-runtime": {
"version": "1.0.3",
"resolved": "https://registry.npmjs.org/@napi-rs/wasm-runtime/-/wasm-runtime-1.0.3.tgz",
"integrity": "sha512-rZxtMsLwjdXkMUGC3WwsPwLNVqVqnTJT6MNIB6e+5fhMcSCPP0AOsNWuMQ5mdCq6HNjs/ZeWAEchpqeprqBD2Q==",
"dev": true,
"version": "1.0.5",
"resolved": "https://registry.npmjs.org/@napi-rs/wasm-runtime/-/wasm-runtime-1.0.5.tgz",
"integrity": "sha512-TBr9Cf9onSAS2LQ2+QHx6XcC6h9+RIzJgbqG3++9TUZSH204AwEy5jg3BTQ0VATsyoGj4ee49tN/y6rvaOOtcg==",
"license": "MIT",
"optional": true,
"dependencies": {
"@emnapi/core": "^1.4.5",
"@emnapi/runtime": "^1.4.5",
"@tybys/wasm-util": "^0.10.0"
"@emnapi/core": "^1.5.0",
"@emnapi/runtime": "^1.5.0",
"@tybys/wasm-util": "^0.10.1"
}
},
"node_modules/@napi-rs/wasm-tools": {
@@ -1872,13 +1869,14 @@
"resolved": "sync/packages/common",
"link": true
},
"node_modules/@tursodatabase/wasm-runtime": {
"resolved": "packages/wasm-runtime",
"link": true
},
"node_modules/@tybys/wasm-util": {
"resolved": "packages/wasm-util",
"link": true
"version": "0.10.1",
"resolved": "https://registry.npmjs.org/@tybys/wasm-util/-/wasm-util-0.10.1.tgz",
"integrity": "sha512-9tTaPJLSiejZKx+Bmog4uSubteqTvFrVrURwkmHixBo0G4seD0zUxp98E1DzUBJxLQ3NPwXrGKDiVjwx/DpPsg==",
"license": "MIT",
"dependencies": {
"tslib": "^2.4.0"
}
},
"node_modules/@types/aria-query": {
"version": "5.0.4",
@@ -4614,7 +4612,7 @@
"version": "0.2.0-pre.7",
"license": "MIT",
"dependencies": {
"@tursodatabase/wasm-runtime": "^0.2.0-pre.7"
"@napi-rs/wasm-runtime": "^1.0.5"
},
"devDependencies": {
"typescript": "^5.9.2"
@@ -4645,29 +4643,6 @@
"vitest": "^3.2.4"
}
},
"packages/wasm-runtime": {
"name": "@tursodatabase/wasm-runtime",
"version": "0.2.0-pre.7",
"license": "MIT",
"dependencies": {
"@emnapi/core": "^1.4.5",
"@emnapi/runtime": "^1.4.5",
"@tybys/wasm-util": "^0.10.1"
}
},
"packages/wasm-runtime/node_modules/@tybys/wasm-util": {
"version": "0.10.1",
"resolved": "https://registry.npmjs.org/@tybys/wasm-util/-/wasm-util-0.10.1.tgz",
"integrity": "sha512-9tTaPJLSiejZKx+Bmog4uSubteqTvFrVrURwkmHixBo0G4seD0zUxp98E1DzUBJxLQ3NPwXrGKDiVjwx/DpPsg==",
"license": "MIT",
"dependencies": {
"tslib": "^2.4.0"
}
},
"packages/wasm-util": {
"dev": true,
"optional": true
},
"sync/packages/browser": {
"name": "@tursodatabase/sync-browser",
"version": "0.2.0-pre.7",

View File

@@ -6,7 +6,6 @@
"test": "npm run test --workspaces"
},
"workspaces": [
"packages/wasm-runtime",
"packages/common",
"packages/native",
"packages/browser-common",

View File

@@ -5,7 +5,7 @@ import {
WASI as __WASI,
instantiateNapiModuleSync,
MessageHandler
} from '@tursodatabase/wasm-runtime'
} from '@napi-rs/wasm-runtime'
function getUint8ArrayFromMemory(memory: WebAssembly.Memory, ptr: number, len: number): Uint8Array {
ptr = ptr >>> 0;

View File

@@ -23,6 +23,6 @@
"test": "echo 'no tests'"
},
"dependencies": {
"@tursodatabase/wasm-runtime": "^0.2.0-pre.7"
"@napi-rs/wasm-runtime": "^1.0.5"
}
}

View File

@@ -1 +0,0 @@
!dist

View File

@@ -1,44 +0,0 @@
MIT License
Copyright (c) 2020-present LongYinan
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
MIT License
Copyright (c) 2018 GitHub
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -1,264 +0,0 @@
'use strict';
// @ts-check
/**
* @param {unknown} value
*/
const getType = (value) => {
if (value === undefined) return 0
if (value === null) return 1
const t = typeof value;
if (t === 'boolean') return 2
if (t === 'number') return 3
if (t === 'string') return 4
if (t === 'object') return 6
if (t === 'bigint') return 9
return -1
};
/**
* @param {import('memfs').IFs} memfs
* @param {any} value
* @param {ReturnType<typeof getType>} type
* @returns {Uint8Array}
*/
const encodeValue = (memfs, value, type) => {
switch (type) {
case 0:
case 1:
return new Uint8Array(0)
case 2: {
const view = new Int32Array(1);
view[0] = value ? 1 : 0;
return new Uint8Array(view.buffer)
}
case 3: {
const view = new Float64Array(1);
view[0] = value;
return new Uint8Array(view.buffer)
}
case 4: {
const view = new TextEncoder().encode(value);
return view
}
case 6: {
function storeConstructor(obj, memfs, processed = new WeakSet()) {
if (!obj || typeof obj !== 'object') {
return
}
if (processed.has(obj)) {
return
}
processed.add(obj);
const [entry] =
Object.entries(memfs).filter(([_, v]) => v === obj.constructor)[0] ??
[];
if (entry) {
Object.defineProperty(obj, '__constructor__', {
configurable: true,
writable: true,
enumerable: true,
value: entry,
});
}
for (const value of Object.values(obj)) {
storeConstructor(value, memfs, processed);
}
}
storeConstructor(value, memfs);
const json = JSON.stringify(value, (_, value) => {
if (typeof value === 'bigint') {
return `BigInt(${String(value)})`
}
if (value instanceof Error) {
return {
...value,
message: value.message,
stack: value.stack,
__error__: value.constructor.name,
}
}
return value
});
const view = new TextEncoder().encode(json);
return view
}
case 9: {
const view = new BigInt64Array(1);
view[0] = value;
return new Uint8Array(view.buffer)
}
case -1:
default:
throw new Error('unsupported data')
}
};
/**
* @param {typeof import('memfs')} memfs
* @param {Uint8Array} payload
* @param {number} type
* @returns {any}
*/
const decodeValue = (memfs, payload, type) => {
if (type === 0) return undefined
if (type === 1) return null
if (type === 2)
return Boolean(new Int32Array(payload.buffer, payload.byteOffset, 1)[0])
if (type === 3)
return new Float64Array(payload.buffer, payload.byteOffset, 1)[0]
if (type === 4) return new TextDecoder().decode(payload.slice())
if (type === 6) {
const obj = JSON.parse(
new TextDecoder().decode(payload.slice()),
(_key, value) => {
if (typeof value === 'string') {
const matched = value.match(/^BigInt\((-?\d+)\)$/);
if (matched && matched[1]) {
return BigInt(matched[1])
}
}
return value
},
);
function loadConstructor(obj, memfs, processed = new WeakSet()) {
if (!obj || typeof obj !== 'object') {
return
}
if (processed.has(obj)) {
return
}
processed.add(obj);
if (obj.__constructor__) {
const ctor = obj.__constructor__;
delete obj.__constructor__;
Object.setPrototypeOf(obj, memfs[ctor].prototype);
}
for (const value of Object.values(obj)) {
loadConstructor(value, memfs, processed);
}
}
loadConstructor(obj, memfs);
if (obj.__error__) {
const name = obj.__error__;
const ErrorConstructor = globalThis[name] || Error;
delete obj.__error__;
const err = new ErrorConstructor(obj.message);
Object.defineProperty(err, 'stack', {
configurable: true,
enumerable: false,
writable: true,
value: err.stack,
});
Object.defineProperty(err, Symbol.toStringTag, {
configurable: true,
enumerable: false,
writable: true,
value: name,
});
for (const [k, v] of Object.entries(obj)) {
if (k === 'message' || k === 'stack') continue
err[k] = v;
}
return err
}
return obj
}
if (type === 9)
return new BigInt64Array(payload.buffer, payload.byteOffset, 1)[0]
throw new Error('unsupported data')
};
/**
* @param {import('memfs').IFs} fs
* @returns {(e: { data: { __fs__: { sab: Int32Array, type: keyof import('memfs').IFs, payload: any[] } } }) => void}
*/
// oxlint-disable-next-line no-unused-vars -- fixed in an upcoming release
const createOnMessage = (fs) =>
function onMessage(e) {
if (e.data.__fs__) {
/**
* 0..4 status(int32_t): 21(waiting) 0(success) 1(error)
* 5..8 type(napi_valuetype): 0(undefined) 1(null) 2(boolean) 3(number) 4(string) 6(jsonstring) 9(bigint) -1(unsupported)
* 9..16 payload_size(uint32_t) <= 1024
* 16..16 + payload_size payload_content
*/
const { sab, type, payload } = e.data.__fs__;
const fn = fs[type];
try {
const ret = fn.apply(fs, payload);
const t = getType(ret);
Atomics.store(sab, 1, t);
const v = encodeValue(fs, ret, t);
Atomics.store(sab, 2, v.length);
new Uint8Array(sab.buffer).set(v, 16);
Atomics.store(sab, 0, 0); // success
} catch (/** @type {any} */ err) {
const t = getType(err);
Atomics.store(sab, 1, t);
const v = encodeValue(fs, err, t);
Atomics.store(sab, 2, v.length);
new Uint8Array(sab.buffer).set(v, 16);
Atomics.store(sab, 0, 1); // error
} finally {
Atomics.notify(sab, 0);
}
}
};
/**
* @param {typeof import('memfs')} memfs
*/
const createFsProxy = (memfs) =>
new Proxy(
{},
{
get(_target, p, _receiver) {
/**
* @param {any[]} args
*/
return function (...args) {
const sab = new SharedArrayBuffer(16 + 10240);
const i32arr = new Int32Array(sab);
Atomics.store(i32arr, 0, 21);
postMessage({
__fs__: {
sab: i32arr,
type: p,
payload: args,
},
});
Atomics.wait(i32arr, 0, 21);
const status = Atomics.load(i32arr, 0);
const type = Atomics.load(i32arr, 1);
const size = Atomics.load(i32arr, 2);
const content = new Uint8Array(sab, 16, size);
const value = decodeValue(memfs, content, type);
if (status === 1) {
throw value
}
return value
}
},
},
);
exports.createFsProxy = createFsProxy;
exports.createOnMessage = createOnMessage;

File diff suppressed because one or more lines are too long

View File

@@ -1,259 +0,0 @@
// @ts-check
/**
* @param {unknown} value
*/
const getType = (value) => {
if (value === undefined) return 0
if (value === null) return 1
const t = typeof value
if (t === 'boolean') return 2
if (t === 'number') return 3
if (t === 'string') return 4
if (t === 'object') return 6
if (t === 'bigint') return 9
return -1
}
/**
* @param {import('memfs').IFs} memfs
* @param {any} value
* @param {ReturnType<typeof getType>} type
* @returns {Uint8Array}
*/
const encodeValue = (memfs, value, type) => {
switch (type) {
case 0:
case 1:
return new Uint8Array(0)
case 2: {
const view = new Int32Array(1)
view[0] = value ? 1 : 0
return new Uint8Array(view.buffer)
}
case 3: {
const view = new Float64Array(1)
view[0] = value
return new Uint8Array(view.buffer)
}
case 4: {
const view = new TextEncoder().encode(value)
return view
}
case 6: {
function storeConstructor(obj, memfs, processed = new WeakSet()) {
if (!obj || typeof obj !== 'object') {
return
}
if (processed.has(obj)) {
return
}
processed.add(obj)
const [entry] =
Object.entries(memfs).filter(([_, v]) => v === obj.constructor)[0] ??
[]
if (entry) {
Object.defineProperty(obj, '__constructor__', {
configurable: true,
writable: true,
enumerable: true,
value: entry,
})
}
for (const value of Object.values(obj)) {
storeConstructor(value, memfs, processed)
}
}
storeConstructor(value, memfs)
const json = JSON.stringify(value, (_, value) => {
if (typeof value === 'bigint') {
return `BigInt(${String(value)})`
}
if (value instanceof Error) {
return {
...value,
message: value.message,
stack: value.stack,
__error__: value.constructor.name,
}
}
return value
})
const view = new TextEncoder().encode(json)
return view
}
case 9: {
const view = new BigInt64Array(1)
view[0] = value
return new Uint8Array(view.buffer)
}
case -1:
default:
throw new Error('unsupported data')
}
}
/**
* @param {typeof import('memfs')} memfs
* @param {Uint8Array} payload
* @param {number} type
* @returns {any}
*/
const decodeValue = (memfs, payload, type) => {
if (type === 0) return undefined
if (type === 1) return null
if (type === 2)
return Boolean(new Int32Array(payload.buffer, payload.byteOffset, 1)[0])
if (type === 3)
return new Float64Array(payload.buffer, payload.byteOffset, 1)[0]
if (type === 4) return new TextDecoder().decode(payload.slice())
if (type === 6) {
const obj = JSON.parse(
new TextDecoder().decode(payload.slice()),
(_key, value) => {
if (typeof value === 'string') {
const matched = value.match(/^BigInt\((-?\d+)\)$/)
if (matched && matched[1]) {
return BigInt(matched[1])
}
}
return value
},
)
function loadConstructor(obj, memfs, processed = new WeakSet()) {
if (!obj || typeof obj !== 'object') {
return
}
if (processed.has(obj)) {
return
}
processed.add(obj)
if (obj.__constructor__) {
const ctor = obj.__constructor__
delete obj.__constructor__
Object.setPrototypeOf(obj, memfs[ctor].prototype)
}
for (const value of Object.values(obj)) {
loadConstructor(value, memfs, processed)
}
}
loadConstructor(obj, memfs)
if (obj.__error__) {
const name = obj.__error__
const ErrorConstructor = globalThis[name] || Error
delete obj.__error__
const err = new ErrorConstructor(obj.message)
Object.defineProperty(err, 'stack', {
configurable: true,
enumerable: false,
writable: true,
value: err.stack,
})
Object.defineProperty(err, Symbol.toStringTag, {
configurable: true,
enumerable: false,
writable: true,
value: name,
})
for (const [k, v] of Object.entries(obj)) {
if (k === 'message' || k === 'stack') continue
err[k] = v
}
return err
}
return obj
}
if (type === 9)
return new BigInt64Array(payload.buffer, payload.byteOffset, 1)[0]
throw new Error('unsupported data')
}
/**
* @param {import('memfs').IFs} fs
* @returns {(e: { data: { __fs__: { sab: Int32Array, type: keyof import('memfs').IFs, payload: any[] } } }) => void}
*/
// oxlint-disable-next-line no-unused-vars -- fixed in an upcoming release
export const createOnMessage = (fs) =>
function onMessage(e) {
if (e.data.__fs__) {
/**
* 0..4 status(int32_t): 21(waiting) 0(success) 1(error)
* 5..8 type(napi_valuetype): 0(undefined) 1(null) 2(boolean) 3(number) 4(string) 6(jsonstring) 9(bigint) -1(unsupported)
* 9..16 payload_size(uint32_t) <= 1024
* 16..16 + payload_size payload_content
*/
const { sab, type, payload } = e.data.__fs__
const fn = fs[type]
try {
const ret = fn.apply(fs, payload)
const t = getType(ret)
Atomics.store(sab, 1, t)
const v = encodeValue(fs, ret, t)
Atomics.store(sab, 2, v.length)
new Uint8Array(sab.buffer).set(v, 16)
Atomics.store(sab, 0, 0) // success
} catch (/** @type {any} */ err) {
const t = getType(err)
Atomics.store(sab, 1, t)
const v = encodeValue(fs, err, t)
Atomics.store(sab, 2, v.length)
new Uint8Array(sab.buffer).set(v, 16)
Atomics.store(sab, 0, 1) // error
} finally {
Atomics.notify(sab, 0)
}
}
}
/**
* @param {typeof import('memfs')} memfs
*/
export const createFsProxy = (memfs) =>
new Proxy(
{},
{
get(_target, p, _receiver) {
/**
* @param {any[]} args
*/
return function (...args) {
const sab = new SharedArrayBuffer(16 + 10240)
const i32arr = new Int32Array(sab)
Atomics.store(i32arr, 0, 21)
postMessage({
__fs__: {
sab: i32arr,
type: p,
payload: args,
},
})
Atomics.wait(i32arr, 0, 21)
const status = Atomics.load(i32arr, 0)
const type = Atomics.load(i32arr, 1)
const size = Atomics.load(i32arr, 2)
const content = new Uint8Array(sab, 16, size)
const value = decodeValue(memfs, content, type)
if (status === 1) {
throw value
}
return value
}
},
},
)

View File

@@ -1,42 +0,0 @@
{
"name": "@tursodatabase/wasm-runtime",
"version": "0.2.0-pre.7",
"type": "module",
"description": "Runtime and polyfill for wasm targets",
"author": {
"name": "LongYinan",
"url": "https://github.com/Brooooooklyn"
},
"repository": {
"type": "git",
"url": "https://github.com/tursodatabase/turso"
},
"license": "MIT",
"files": [
"runtime.cjs",
"runtime.js",
"fs-proxy.js",
"dist/*.cjs",
"dist/*.js",
"LICENSE"
],
"dependencies": {
"@emnapi/core": "^1.4.5",
"@emnapi/runtime": "^1.4.5",
"@tybys/wasm-util": "^0.10.1"
},
"scripts": {
"build": "echo 'nothing to build'",
"tsc-build": "echo 'nothing to tsc-build'",
"test": "echo 'nothing to test'"
},
"exports": {
".": {
"import": "./runtime.js",
"require": "./runtime.cjs"
},
"./fs": {
"import": "./dist/fs.js"
}
}
}

View File

@@ -1,15 +0,0 @@
const { MessageHandler, instantiateNapiModuleSync, instantiateNapiModule } = require('@emnapi/core')
const { getDefaultContext } = require('@emnapi/runtime')
const { WASI } = require('@tybys/wasm-util')
const { createFsProxy, createOnMessage } = require('./dist/fs-proxy.cjs')
module.exports = {
MessageHandler,
instantiateNapiModule,
instantiateNapiModuleSync,
getDefaultContext,
WASI,
createFsProxy,
createOnMessage,
}

View File

@@ -1,8 +0,0 @@
export {
instantiateNapiModuleSync,
instantiateNapiModule,
MessageHandler,
} from '@emnapi/core'
export { getDefaultContext } from '@emnapi/runtime'
export * from '@tybys/wasm-util'
export { createOnMessage, createFsProxy } from './fs-proxy.js'

View File

@@ -1,4 +1,3 @@
sed -i "s/$NAME_FROM/$NAME_TO/g" packages/wasm-runtime/package.json
sed -i "s/$NAME_FROM/$NAME_TO/g" packages/common/package.json
sed -i "s/$NAME_FROM/$NAME_TO/g" packages/native/package.json
sed -i "s/$NAME_FROM/$NAME_TO/g" packages/browser/package.json
@@ -7,7 +6,6 @@ sed -i "s/$NAME_FROM/$NAME_TO/g" sync/packages/common/package.json
sed -i "s/$NAME_FROM/$NAME_TO/g" sync/packages/native/package.json
sed -i "s/$NAME_FROM/$NAME_TO/g" sync/packages/browser/package.json
sed -i "s/$VERSION_FROM/$VERSION_TO/g" packages/wasm-runtime/package.json
sed -i "s/$VERSION_FROM/$VERSION_TO/g" packages/common/package.json
sed -i "s/$VERSION_FROM/$VERSION_TO/g" packages/native/package.json
sed -i "s/$VERSION_FROM/$VERSION_TO/g" packages/browser/package.json
@@ -43,6 +41,3 @@ sed -i "s/$NAME_FROM/$NAME_TO/g" sync/packages/browser/index-bundle.ts
sed -i "s/$NAME_FROM/$NAME_TO/g" sync/packages/browser/index-vite-dev-hack.ts
sed -i "s/$NAME_FROM/$NAME_TO/g" sync/packages/browser/index-turbopack-hack.ts
sed -i "s/$NAME_FROM/$NAME_TO/g" sync/packages/browser/worker.ts
sed -i "s/$NAME_FROM/$NAME_TO/g" packages/wasm-runtime/runtime.cjs
sed -i "s/$NAME_FROM/$NAME_TO/g" packages/wasm-runtime/runtime.js

View File

@@ -37,31 +37,31 @@ __metadata:
languageName: node
linkType: hard
"@emnapi/core@npm:^1.4.5":
version: 1.4.5
resolution: "@emnapi/core@npm:1.4.5"
"@emnapi/core@npm:^1.5.0":
version: 1.5.0
resolution: "@emnapi/core@npm:1.5.0"
dependencies:
"@emnapi/wasi-threads": "npm:1.0.4"
"@emnapi/wasi-threads": "npm:1.1.0"
tslib: "npm:^2.4.0"
checksum: 10c0/da4a57f65f325d720d0e0d1a9c6618b90c4c43a5027834a110476984e1d47c95ebaed4d316b5dddb9c0ed9a493ffeb97d1934f9677035f336d8a36c1f3b2818f
checksum: 10c0/52ba3485277706d92fa27d92b37e5b4f6ef0742c03ed68f8096f294c6bfa30f0752c82d4c2bfa14bff4dc30d63c9f71a8f9fb64a92743d00807d9e468fafd5ff
languageName: node
linkType: hard
"@emnapi/runtime@npm:^1.4.5":
version: 1.4.5
resolution: "@emnapi/runtime@npm:1.4.5"
"@emnapi/runtime@npm:^1.5.0":
version: 1.5.0
resolution: "@emnapi/runtime@npm:1.5.0"
dependencies:
tslib: "npm:^2.4.0"
checksum: 10c0/37a0278be5ac81e918efe36f1449875cbafba947039c53c65a1f8fc238001b866446fc66041513b286baaff5d6f9bec667f5164b3ca481373a8d9cb65bfc984b
checksum: 10c0/a85c9fc4e3af49cbe41e5437e5be2551392a931910cd0a5b5d3572532786927810c9cc1db11b232ec8f9657b33d4e6f7c4f985f1a052917d7cd703b5b2a20faa
languageName: node
linkType: hard
"@emnapi/wasi-threads@npm:1.0.4":
version: 1.0.4
resolution: "@emnapi/wasi-threads@npm:1.0.4"
"@emnapi/wasi-threads@npm:1.1.0":
version: 1.1.0
resolution: "@emnapi/wasi-threads@npm:1.1.0"
dependencies:
tslib: "npm:^2.4.0"
checksum: 10c0/2c91a53e62f875800baf035c4d42c9c0d18e5afd9a31ca2aac8b435aeaeaeaac386b5b3d0d0e70aa7a5a9852bbe05106b1f680cd82cce03145c703b423d41313
checksum: 10c0/e6d54bf2b1e64cdd83d2916411e44e579b6ae35d5def0dea61a3c452d9921373044dff32a8b8473ae60c80692bdc39323e98b96a3f3d87ba6886b24dd0ef7ca1
languageName: node
linkType: hard
@@ -1091,14 +1091,14 @@ __metadata:
languageName: node
linkType: hard
"@napi-rs/wasm-runtime@npm:^1.0.1":
version: 1.0.3
resolution: "@napi-rs/wasm-runtime@npm:1.0.3"
"@napi-rs/wasm-runtime@npm:^1.0.1, @napi-rs/wasm-runtime@npm:^1.0.5":
version: 1.0.5
resolution: "@napi-rs/wasm-runtime@npm:1.0.5"
dependencies:
"@emnapi/core": "npm:^1.4.5"
"@emnapi/runtime": "npm:^1.4.5"
"@tybys/wasm-util": "npm:^0.10.0"
checksum: 10c0/7918d82477e75931b6e35bb003464382eb93e526362f81a98bf8610407a67b10f4d041931015ad48072c89db547deb7e471dfb91f4ab11ac63a24d8580297f75
"@emnapi/core": "npm:^1.5.0"
"@emnapi/runtime": "npm:^1.5.0"
"@tybys/wasm-util": "npm:^0.10.1"
checksum: 10c0/8d29299933c57b6ead61f46fad5c3dfabc31e1356bbaf25c3a8ae57be0af0db0006a808f2c1bb16e28925e027f20e0856550dac94e015f56dd6ed53b38f9a385
languageName: node
linkType: hard
@@ -1579,7 +1579,7 @@ __metadata:
version: 0.0.0-use.local
resolution: "@tursodatabase/database-browser-common@workspace:packages/browser-common"
dependencies:
"@tursodatabase/wasm-runtime": "npm:^0.2.0-pre.7"
"@napi-rs/wasm-runtime": "npm:^1.0.5"
typescript: "npm:^5.9.2"
languageName: unknown
linkType: soft
@@ -1660,17 +1660,7 @@ __metadata:
languageName: unknown
linkType: soft
"@tursodatabase/wasm-runtime@npm:^0.2.0-pre.7, @tursodatabase/wasm-runtime@workspace:packages/wasm-runtime":
version: 0.0.0-use.local
resolution: "@tursodatabase/wasm-runtime@workspace:packages/wasm-runtime"
dependencies:
"@emnapi/core": "npm:^1.4.5"
"@emnapi/runtime": "npm:^1.4.5"
"@tybys/wasm-util": "npm:^0.10.1"
languageName: unknown
linkType: soft
"@tybys/wasm-util@npm:^0.10.0, @tybys/wasm-util@npm:^0.10.1":
"@tybys/wasm-util@npm:^0.10.1":
version: 0.10.1
resolution: "@tybys/wasm-util@npm:0.10.1"
dependencies:

View File

@@ -23,7 +23,7 @@ use crate::Pager;
use crate::{return_and_restore_if_io, return_if_io, LimboError, Result};
use std::collections::HashMap;
use std::fmt::{self, Display, Formatter};
use std::sync::Arc;
use std::sync::{atomic::Ordering, Arc};
// The state table has 5 columns: operator_id, zset_id, element_id, value, weight
const OPERATOR_COLUMNS: usize = 5;
@@ -1507,7 +1507,7 @@ impl DbspCompiler {
let db = Database::open_file(io, ":memory:", false, false)?;
let internal_conn = db.connect()?;
internal_conn.query_only.set(true);
internal_conn.auto_commit.set(false);
internal_conn.auto_commit.store(false, Ordering::SeqCst);
// Create temporary symbol table
let temp_syms = SymbolTable::new();

View File

@@ -342,7 +342,7 @@ mod tests {
// Get the schema and view
let view_mutex = conn
.schema
.borrow()
.read()
.get_materialized_view("test_view")
.ok_or(crate::LimboError::InternalError(
"View not found".to_string(),

View File

@@ -8,7 +8,7 @@ use crate::incremental::operator::{
};
use crate::types::IOResult;
use crate::{Connection, Database, Result, Value};
use std::sync::{Arc, Mutex};
use std::sync::{atomic::Ordering, Arc, Mutex};
#[derive(Debug, Clone)]
pub struct ProjectColumn {
@@ -63,7 +63,7 @@ impl ProjectOperator {
let internal_conn = db.connect()?;
// Set to read-only mode and disable auto-commit since we're only evaluating expressions
internal_conn.query_only.set(true);
internal_conn.auto_commit.set(false);
internal_conn.auto_commit.store(false, Ordering::SeqCst);
// Create ProjectColumn structs from compiled expressions
let columns: Vec<ProjectColumn> = compiled_exprs

View File

@@ -70,7 +70,7 @@ use std::{
ops::Deref,
rc::Rc,
sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, LazyLock, Mutex, Weak,
},
time::Duration,
@@ -493,15 +493,15 @@ impl Database {
let conn = Arc::new(Connection {
db: self.clone(),
pager: RwLock::new(Arc::new(pager)),
schema: RefCell::new(
schema: RwLock::new(
self.schema
.lock()
.map_err(|_| LimboError::SchemaLocked)?
.clone(),
),
database_schemas: RefCell::new(std::collections::HashMap::new()),
auto_commit: Cell::new(true),
transaction_state: Cell::new(TransactionState::None),
database_schemas: RwLock::new(std::collections::HashMap::new()),
auto_commit: AtomicBool::new(true),
transaction_state: RwLock::new(TransactionState::None),
last_insert_rowid: Cell::new(0),
last_change: Cell::new(0),
total_changes: Cell::new(0),
@@ -980,13 +980,13 @@ impl DatabaseCatalog {
pub struct Connection {
db: Arc<Database>,
pager: RwLock<Arc<Pager>>,
schema: RefCell<Arc<Schema>>,
schema: RwLock<Arc<Schema>>,
/// Per-database schema cache (database_index -> schema)
/// Loaded lazily to avoid copying all schemas on connection open
database_schemas: RefCell<std::collections::HashMap<usize, Arc<Schema>>>,
database_schemas: RwLock<std::collections::HashMap<usize, Arc<Schema>>>,
/// Whether to automatically commit transaction
auto_commit: Cell<bool>,
transaction_state: Cell<TransactionState>,
auto_commit: AtomicBool,
transaction_state: RwLock<TransactionState>,
last_insert_rowid: Cell<i64>,
last_change: Cell<i64>,
total_changes: Cell<i64>,
@@ -1061,7 +1061,7 @@ impl Connection {
let mode = QueryMode::new(&cmd);
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
let program = translate::translate(
self.schema.borrow().deref(),
self.schema.read().deref(),
stmt,
pager.clone(),
self.clone(),
@@ -1115,7 +1115,7 @@ impl Connection {
}
// maybe_reparse_schema must be called outside of any transaction
turso_assert!(
self.transaction_state.get() == TransactionState::None,
self.get_tx_state() == TransactionState::None,
"unexpected start transaction"
);
// start read transaction manually, because we will read schema cookie once again and
@@ -1124,11 +1124,12 @@ impl Connection {
// from now on we must be very careful with errors propagation
// in order to not accidentally keep read transaction opened
pager.begin_read_tx()?;
self.transaction_state.replace(TransactionState::Read);
self.set_tx_state(TransactionState::Read);
let reparse_result = self.reparse_schema();
let previous = self.transaction_state.replace(TransactionState::None);
let previous =
std::mem::replace(&mut *self.transaction_state.write(), TransactionState::None);
turso_assert!(
matches!(previous, TransactionState::None | TransactionState::Read),
"unexpected end transaction state"
@@ -1141,7 +1142,7 @@ impl Connection {
reparse_result?;
let schema = self.schema.borrow().clone();
let schema = self.schema.read().clone();
self.db.update_schema_if_newer(schema)
}
@@ -1155,12 +1156,12 @@ impl Connection {
.get();
// create fresh schema as some objects can be deleted
let mut fresh = Schema::new(self.schema.borrow().indexes_enabled);
let mut fresh = Schema::new(self.schema.read().indexes_enabled);
fresh.schema_version = cookie;
// Preserve existing views to avoid expensive repopulation.
// TODO: We may not need to do this if we materialize our views.
let existing_views = self.schema.borrow().incremental_views.clone();
let existing_views = self.schema.read().incremental_views.clone();
// TODO: this is hack to avoid a cyclical problem with schema reprepare
// The problem here is that we prepare a statement here, but when the statement tries
@@ -1218,7 +1219,7 @@ impl Connection {
let mode = QueryMode::new(&cmd);
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
let program = translate::translate(
self.schema.borrow().deref(),
self.schema.read().deref(),
stmt,
pager.clone(),
self.clone(),
@@ -1266,7 +1267,7 @@ impl Connection {
let mode = QueryMode::new(&cmd);
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
let program = translate::translate(
self.schema.borrow().deref(),
self.schema.read().deref(),
stmt,
pager.clone(),
self.clone(),
@@ -1302,7 +1303,7 @@ impl Connection {
let mode = QueryMode::new(&cmd);
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
let program = translate::translate(
self.schema.borrow().deref(),
self.schema.read().deref(),
stmt,
pager.clone(),
self.clone(),
@@ -1411,16 +1412,16 @@ impl Connection {
}
pub fn maybe_update_schema(&self) -> Result<()> {
let current_schema_version = self.schema.borrow().schema_version;
let current_schema_version = self.schema.read().schema_version;
let schema = self
.db
.schema
.lock()
.map_err(|_| LimboError::SchemaLocked)?;
if matches!(self.transaction_state.get(), TransactionState::None)
if matches!(self.get_tx_state(), TransactionState::None)
&& current_schema_version != schema.schema_version
{
self.schema.replace(schema.clone());
*self.schema.write() = schema.clone();
}
Ok(())
@@ -1442,7 +1443,7 @@ impl Connection {
/// Write transaction must be opened in advance - otherwise method will panic
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
pub fn write_schema_version(self: &Arc<Connection>, version: u32) -> Result<()> {
let TransactionState::Write { .. } = self.transaction_state.get() else {
let TransactionState::Write { .. } = self.get_tx_state() else {
return Err(LimboError::InternalError(
"write_schema_version must be called from within Write transaction".to_string(),
));
@@ -1454,9 +1455,9 @@ impl Connection {
header.schema_cookie.get() < version,
"cookie can't go back in time"
);
self.transaction_state.replace(TransactionState::Write {
*self.transaction_state.write() = TransactionState::Write {
schema_did_change: true,
});
};
self.with_schema_mut(|schema| schema.schema_version = version);
header.schema_cookie = version.into();
})
@@ -1540,10 +1541,10 @@ impl Connection {
})?;
// start write transaction and disable auto-commit mode as SQL can be executed within WAL session (at caller own risk)
self.transaction_state.replace(TransactionState::Write {
*self.transaction_state.write() = TransactionState::Write {
schema_did_change: false,
});
self.auto_commit.replace(false);
};
self.auto_commit.store(false, Ordering::SeqCst);
Ok(())
}
@@ -1576,8 +1577,8 @@ impl Connection {
None
};
self.auto_commit.replace(true);
self.transaction_state.replace(TransactionState::None);
self.auto_commit.store(true, Ordering::SeqCst);
self.set_tx_state(TransactionState::None);
{
let wal = wal.borrow_mut();
wal.end_write_tx();
@@ -1627,7 +1628,7 @@ impl Connection {
}
self.closed.set(true);
match self.transaction_state.get() {
match self.get_tx_state() {
TransactionState::None => {
// No active transaction
}
@@ -1639,7 +1640,7 @@ impl Connection {
self,
)
})?;
self.transaction_state.set(TransactionState::None);
self.set_tx_state(TransactionState::None);
}
}
@@ -1779,7 +1780,7 @@ impl Connection {
}
pub fn get_auto_commit(&self) -> bool {
self.auto_commit.get()
self.auto_commit.load(Ordering::SeqCst)
}
pub fn parse_schema_rows(self: &Arc<Connection>) -> Result<()> {
@@ -1879,7 +1880,7 @@ impl Connection {
#[inline]
pub fn with_schema_mut<T>(&self, f: impl FnOnce(&mut Schema) -> T) -> T {
let mut schema_ref = self.schema.borrow_mut();
let mut schema_ref = self.schema.write();
let schema = Arc::make_mut(&mut *schema_ref);
f(schema)
}
@@ -2029,15 +2030,15 @@ impl Connection {
pub(crate) fn with_schema<T>(&self, database_id: usize, f: impl FnOnce(&Schema) -> T) -> T {
if database_id == 0 {
// Main database - use connection's schema which should be kept in sync
let schema = self.schema.borrow();
let schema = self.schema.read();
f(&schema)
} else if database_id == 1 {
// Temp database - uses same schema as main for now, but this will change later.
let schema = self.schema.borrow();
let schema = self.schema.read();
f(&schema)
} else {
// Attached database - check cache first, then load from database
let mut schemas = self.database_schemas.borrow_mut();
let mut schemas = self.database_schemas.write();
if let Some(cached_schema) = schemas.get(&database_id) {
return f(cached_schema);
@@ -2194,6 +2195,14 @@ impl Connection {
pub fn get_busy_timeout(&self) -> std::time::Duration {
self.busy_timeout.get()
}
/// Replace the connection's transaction state with `state`,
/// taking the write side of the state lock for the duration.
fn set_tx_state(&self, state: TransactionState) {
    let mut guard = self.transaction_state.write();
    *guard = state;
}
/// Snapshot the connection's current transaction state
/// (copies the value out from under the read lock).
fn get_tx_state(&self) -> TransactionState {
    let guard = self.transaction_state.read();
    *guard
}
}
#[derive(Debug)]
@@ -2430,7 +2439,7 @@ impl Statement {
fn reprepare(&mut self) -> Result<()> {
tracing::trace!("repreparing statement");
let conn = self.program.connection.clone();
*conn.schema.borrow_mut() = conn.db.clone_schema()?;
*conn.schema.write() = conn.db.clone_schema()?;
self.program = {
let mut parser = Parser::new(self.program.sql.as_bytes());
let cmd = parser.next_cmd()?;
@@ -2441,7 +2450,7 @@ impl Statement {
debug_assert_eq!(QueryMode::new(&cmd), mode,);
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
translate::translate(
conn.schema.borrow().deref(),
conn.schema.read().deref(),
stmt,
self.pager.clone(),
conn.clone(),
@@ -2472,13 +2481,10 @@ impl Statement {
if let Some(io) = &self.state.io_completions {
io.abort();
}
let state = self.program.connection.transaction_state.get();
let state = self.program.connection.get_tx_state();
if let TransactionState::Write { .. } = state {
let end_tx_res = self.pager.end_tx(true, &self.program.connection)?;
self.program
.connection
.transaction_state
.set(TransactionState::None);
self.program.connection.set_tx_state(TransactionState::None);
assert!(
matches!(end_tx_res, IOResult::Done(_)),
"end_tx should not return IO as it should just end txn without flushing anything. Got {end_tx_res:?}"

View File

@@ -864,12 +864,12 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
}
CommitState::EndCommitLogicalLog { end_ts } => {
let connection = self.connection.clone();
let schema_did_change = match connection.transaction_state.get() {
let schema_did_change = match connection.get_tx_state() {
crate::TransactionState::Write { schema_did_change } => schema_did_change,
_ => false,
};
if schema_did_change {
let schema = connection.schema.borrow().clone();
let schema = connection.schema.read().clone();
connection.db.update_schema_if_newer(schema)?;
}
let tx = mvcc_store.txs.get(&self.tx_id).unwrap();

View File

@@ -1131,7 +1131,7 @@ impl Pager {
// TODO: Unsure what the semantics of "end_tx" is for in-memory databases, ephemeral tables and ephemeral indexes.
return Ok(IOResult::Done(PagerCommitResult::Rollback));
};
let (is_write, schema_did_change) = match connection.transaction_state.get() {
let (is_write, schema_did_change) = match connection.get_tx_state() {
TransactionState::Write { schema_did_change } => (true, schema_did_change),
_ => (false, false),
};
@@ -1153,7 +1153,7 @@ impl Pager {
wal.borrow().end_read_tx();
if schema_did_change {
let schema = connection.schema.borrow().clone();
let schema = connection.schema.read().clone();
connection.db.update_schema_if_newer(schema)?;
}
Ok(IOResult::Done(commit_status))
@@ -2324,7 +2324,7 @@ impl Pager {
}
self.reset_internal_states();
if schema_did_change {
connection.schema.replace(connection.db.clone_schema()?);
*connection.schema.write() = connection.db.clone_schema()?;
}
if is_write {
if let Some(wal) = self.wal.as_ref() {

View File

@@ -207,6 +207,13 @@ pub fn translate_alter_table(
}
}
let new_column_name = column.name.as_ref().unwrap();
if btree.get_column(new_column_name).is_some() {
return Err(LimboError::ParseError(
"duplicate column name: ".to_string() + new_column_name,
));
}
// TODO: All quoted ids will be quoted with `[]`, we should store some info from the parsed AST
btree.columns.push(column.clone());

View File

@@ -307,6 +307,8 @@ pub fn emit_query<'a>(
&plan.result_columns,
&plan.order_by,
&plan.table_references,
plan.group_by.is_some(),
&plan.aggregates,
)?;
}

View File

@@ -1,4 +1,4 @@
use turso_parser::ast;
use turso_parser::ast::{self, SortOrder};
use super::{
emitter::TranslateCtx,
@@ -7,9 +7,16 @@ use super::{
plan::{Distinctness, GroupBy, SelectPlan},
result_row::emit_select_result,
};
use crate::translate::aggregation::{translate_aggregation_step, AggArgumentSource};
use crate::translate::expr::{walk_expr, WalkControl};
use crate::translate::plan::ResultSetColumn;
use crate::translate::{
aggregation::{translate_aggregation_step, AggArgumentSource},
plan::Aggregate,
};
use crate::translate::{
emitter::Resolver,
expr::{walk_expr, WalkControl},
optimizer::Optimizable,
};
use crate::{
schema::PseudoCursorType,
translate::collate::CollationSeq,
@@ -231,6 +238,71 @@ pub fn init_group_by<'a>(
Ok(())
}
/// Decide whether an ORDER BY term counts as "aggregate or constant" for
/// tie-ordering purposes.
///
/// A term qualifies when it is syntactically equivalent to one of the
/// finalized aggregate expressions of this SELECT (`COUNT(*)`, `SUM(col)`,
/// `MAX(price)`), or when it is a constant literal.
///
/// Why this matters: when every ORDER BY term qualifies, the stability of
/// the ORDER BY sorter alone preserves the group traversal order that the
/// GROUP BY iteration established, so no extra tiebreak `Sequence` column
/// needs to be appended.
pub fn is_orderby_agg_or_const(resolver: &Resolver, e: &ast::Expr, aggs: &[Aggregate]) -> bool {
    let matches_finalized_agg = aggs
        .iter()
        .any(|agg| exprs_are_equivalent(&agg.original_expr, e));
    // Short-circuits: the constant check only runs when no aggregate matched.
    matches_finalized_agg || e.is_constant(resolver)
}
/// Compute the traversal order of the GROUP BY keys so that the final
/// ORDER BY matches SQLite's tie-breaking semantics.
///
/// Rules, in order:
/// - No GROUP BY keys or no ORDER BY terms: every key defaults to `ASC`.
/// - Every ORDER BY term is an aggregate or a constant: mirror the
///   direction of the *first* ORDER BY term across all GROUP BY keys.
/// - Mixed ORDER BY (at least one non-aggregate, non-constant term):
///   any GROUP BY expression that also appears in ORDER BY takes that
///   term's explicit direction; the remaining keys default to `ASC`.
pub fn compute_group_by_sort_order(
    group_by_exprs: &[ast::Expr],
    order_by: &[(Box<ast::Expr>, SortOrder)],
    aggs: &[Aggregate],
    resolver: &Resolver,
) -> Vec<SortOrder> {
    let key_count = group_by_exprs.len();
    if key_count == 0 || order_by.is_empty() {
        return vec![SortOrder::Asc; key_count];
    }

    let all_agg_or_const = order_by
        .iter()
        .all(|(term, _)| is_orderby_agg_or_const(resolver, term, aggs));
    if all_agg_or_const {
        // Propagate the first term's direction to every GROUP BY key.
        return vec![order_by[0].1; key_count];
    }

    // Mixed case: look each GROUP BY key up in ORDER BY, defaulting to ASC.
    group_by_exprs
        .iter()
        .map(|group_expr| {
            order_by
                .iter()
                .find(|(order_expr, _)| exprs_are_equivalent(order_expr, group_expr))
                .map(|(_, direction)| *direction)
                .unwrap_or(SortOrder::Asc)
        })
        .collect()
}
fn collect_non_aggregate_expressions<'a>(
non_aggregate_expressions: &mut Vec<(&'a ast::Expr, bool)>,
group_by: &'a GroupBy,
@@ -736,15 +808,7 @@ pub fn group_by_emit_row_phase<'a>(
)?;
}
false => {
order_by_sorter_insert(
program,
&t_ctx.resolver,
t_ctx
.meta_sort
.as_ref()
.expect("sort metadata must exist for ORDER BY"),
plan,
)?;
order_by_sorter_insert(program, t_ctx, plan)?;
}
}

View File

@@ -860,15 +860,7 @@ fn emit_loop_source(
Ok(())
}
LoopEmitTarget::OrderBySorter => {
order_by_sorter_insert(
program,
&t_ctx.resolver,
t_ctx
.meta_sort
.as_ref()
.expect("sort metadata must exist for ORDER BY"),
plan,
)?;
order_by_sorter_insert(program, t_ctx, plan)?;
if let Distinctness::Distinct { ctx } = &plan.distinctness {
let distinct_ctx = ctx.as_ref().expect("distinct context must exist");

View File

@@ -3,7 +3,7 @@ use turso_parser::ast::{self, SortOrder};
use crate::{
emit_explain,
schema::PseudoCursorType,
translate::collate::CollationSeq,
translate::{collate::CollationSeq, group_by::is_orderby_agg_or_const, plan::Aggregate},
util::exprs_are_equivalent,
vdbe::{
builder::{CursorType, ProgramBuilder},
@@ -13,7 +13,7 @@ use crate::{
};
use super::{
emitter::{Resolver, TranslateCtx},
emitter::TranslateCtx,
expr::translate_expr,
plan::{Distinctness, ResultSetColumn, SelectPlan, TableReferences},
result_row::{emit_offset, emit_result_row_and_limit},
@@ -30,6 +30,11 @@ pub struct SortMetadata {
// This vector holds the indexes of the result columns in the ORDER BY sorter.
// This vector must be the same length as the result columns.
pub remappings: Vec<OrderByRemapping>,
/// Whether we append an extra ascending "Sequence" key to the ORDER BY sort keys.
/// This is used *only* when a GROUP BY is present *and* ORDER BY is not purely
/// aggregates/constants, so that rows that tie on ORDER BY terms are output in
/// the same relative order the underlying row stream produced them.
pub has_sequence: bool,
}
/// Initialize resources needed for ORDER BY processing
@@ -39,12 +44,21 @@ pub fn init_order_by(
result_columns: &[ResultSetColumn],
order_by: &[(Box<ast::Expr>, SortOrder)],
referenced_tables: &TableReferences,
has_group_by: bool,
aggregates: &[Aggregate],
) -> Result<()> {
let sort_cursor = program.alloc_cursor_id(CursorType::Sorter);
let only_aggs = order_by
.iter()
.all(|(e, _)| is_orderby_agg_or_const(&t_ctx.resolver, e, aggregates));
// only emit sequence column if we have GROUP BY and ORDER BY is not only aggregates or constants
let has_sequence = has_group_by && !only_aggs;
t_ctx.meta_sort = Some(SortMetadata {
sort_cursor,
reg_sorter_data: program.alloc_register(),
remappings: order_by_deduplicate_result_columns(order_by, result_columns),
remappings: order_by_deduplicate_result_columns(order_by, result_columns, has_sequence),
has_sequence,
});
/*
@@ -54,7 +68,7 @@ pub fn init_order_by(
* then the collating sequence of the column is used to determine sort order.
* If the expression is not a column and has no COLLATE clause, then the BINARY collating sequence is used.
*/
let collations = order_by
let mut collations = order_by
.iter()
.map(|(expr, _)| match expr.as_ref() {
ast::Expr::Collate(_, collation_name) => {
@@ -72,10 +86,25 @@ pub fn init_order_by(
_ => Ok(Some(CollationSeq::default())),
})
.collect::<Result<Vec<_>>>()?;
if has_sequence {
// sequence column uses BINARY collation
collations.push(Some(CollationSeq::default()));
}
let key_len = order_by.len() + if has_sequence { 1 } else { 0 };
program.emit_insn(Insn::SorterOpen {
cursor_id: sort_cursor,
columns: order_by.len(),
order: order_by.iter().map(|(_, direction)| *direction).collect(),
columns: key_len,
order: {
let mut ord: Vec<SortOrder> = order_by.iter().map(|(_, d)| *d).collect();
if has_sequence {
// sequence is ascending tiebreaker
ord.push(SortOrder::Asc);
}
ord
},
collations,
});
Ok(())
@@ -98,9 +127,12 @@ pub fn emit_order_by(
sort_cursor,
reg_sorter_data,
ref remappings,
has_sequence,
} = *t_ctx.meta_sort.as_ref().unwrap();
let sorter_column_count =
order_by.len() + remappings.iter().filter(|r| !r.deduplicated).count();
let sorter_column_count = order_by.len()
+ if has_sequence { 1 } else { 0 }
+ remappings.iter().filter(|r| !r.deduplicated).count();
// TODO: we need to know how many indices used for sorting
// to emit correct explain output.
@@ -142,10 +174,11 @@ pub fn emit_order_by(
let start_reg = t_ctx.reg_result_cols_start.unwrap();
for i in 0..result_columns.len() {
let reg = start_reg + i;
let column_idx = remappings
let remapping = remappings
.get(i)
.expect("remapping must exist for all result columns")
.orderby_sorter_idx;
.expect("remapping must exist for all result columns");
let column_idx = remapping.orderby_sorter_idx;
program.emit_column_or_rowid(cursor_id, column_idx, reg);
}
@@ -170,10 +203,11 @@ pub fn emit_order_by(
/// Emits the bytecode for inserting a row into an ORDER BY sorter.
pub fn order_by_sorter_insert(
program: &mut ProgramBuilder,
resolver: &Resolver,
sort_metadata: &SortMetadata,
t_ctx: &TranslateCtx,
plan: &SelectPlan,
) -> Result<()> {
let resolver = &t_ctx.resolver;
let sort_metadata = t_ctx.meta_sort.as_ref().expect("sort metadata must exist");
let order_by = &plan.order_by;
let order_by_len = order_by.len();
let result_columns = &plan.result_columns;
@@ -185,19 +219,49 @@ pub fn order_by_sorter_insert(
// The ORDER BY sorter has the sort keys first, then the result columns.
let orderby_sorter_column_count =
order_by_len + result_columns.len() - result_columns_to_skip_len;
order_by_len + if sort_metadata.has_sequence { 1 } else { 0 } + result_columns.len()
- result_columns_to_skip_len;
let start_reg = program.alloc_registers(orderby_sorter_column_count);
for (i, (expr, _)) in order_by.iter().enumerate() {
let key_reg = start_reg + i;
translate_expr(
program,
Some(&plan.table_references),
expr,
key_reg,
resolver,
)?;
// Check if this ORDER BY expression matches a finalized aggregate
if let Some(agg_idx) = plan
.aggregates
.iter()
.position(|agg| exprs_are_equivalent(&agg.original_expr, expr))
{
// This ORDER BY expression is an aggregate, so copy from register
let agg_start_reg = t_ctx
.reg_agg_start
.expect("aggregate registers must be initialized");
let src_reg = agg_start_reg + agg_idx;
program.emit_insn(Insn::Copy {
src_reg,
dst_reg: key_reg,
extra_amount: 0,
});
} else {
// Not an aggregate, translate normally
translate_expr(
program,
Some(&plan.table_references),
expr,
key_reg,
resolver,
)?;
}
}
let mut cur_reg = start_reg + order_by_len;
if sort_metadata.has_sequence {
program.emit_insn(Insn::Sequence {
cursor_id: sort_metadata.sort_cursor,
target_reg: cur_reg,
});
cur_reg += 1;
}
for (i, rc) in result_columns.iter().enumerate() {
// If the result column is an exact duplicate of a sort key, we skip it.
if sort_metadata
@@ -251,12 +315,12 @@ pub fn order_by_sorter_insert(
let reordered_start_reg = program.alloc_registers(result_columns.len());
for (select_idx, _rc) in result_columns.iter().enumerate() {
let src_reg = sort_metadata
let remapping = sort_metadata
.remappings
.get(select_idx)
.map(|r| start_reg + r.orderby_sorter_idx)
.expect("remapping must exist for all result columns");
let src_reg = start_reg + remapping.orderby_sorter_idx;
let dst_reg = reordered_start_reg + select_idx;
program.emit_insn(Insn::Copy {
@@ -336,9 +400,13 @@ pub struct OrderByRemapping {
pub fn order_by_deduplicate_result_columns(
order_by: &[(Box<ast::Expr>, SortOrder)],
result_columns: &[ResultSetColumn],
has_sequence: bool,
) -> Vec<OrderByRemapping> {
let mut result_column_remapping: Vec<OrderByRemapping> = Vec::new();
let order_by_len = order_by.len();
// `sequence_offset` shifts the base index where non-deduped SELECT columns begin,
// because Sequence sits after ORDER BY keys but before result columns.
let sequence_offset = if has_sequence { 1 } else { 0 };
let mut i = 0;
for rc in result_columns.iter() {
@@ -356,7 +424,7 @@ pub fn order_by_deduplicate_result_columns(
// index comes after all ORDER BY entries (hence the +order_by_len). The
// counter `i` tracks how many such non-duplicate result columns we've seen.
result_column_remapping.push(OrderByRemapping {
orderby_sorter_idx: i + order_by_len,
orderby_sorter_idx: order_by_len + sequence_offset + i,
deduplicated: false,
});
i += 1;

View File

@@ -354,7 +354,17 @@ fn parse_table(
.position(|cte| cte.identifier == normalized_qualified_name)
{
// TODO: what if the CTE is referenced multiple times?
let cte_table = ctes.remove(cte_idx);
let mut cte_table = ctes.remove(cte_idx);
// If there's an alias provided, update the identifier to use that alias
if let Some(a) = maybe_alias {
let alias = match a {
ast::As::As(id) => id,
ast::As::Elided(id) => id,
};
cte_table.identifier = normalize_ident(alias.as_str());
}
table_references.add_joined_table(cte_table);
return Ok(());
};

View File

@@ -4,7 +4,9 @@ use super::plan::{
Search, TableReferences, WhereTerm, Window,
};
use crate::schema::Table;
use crate::translate::emitter::Resolver;
use crate::translate::expr::{bind_and_rewrite_expr, ParamState};
use crate::translate::group_by::compute_group_by_sort_order;
use crate::translate::optimizer::optimize_plan;
use crate::translate::plan::{GroupBy, Plan, ResultSetColumn, SelectPlan};
use crate::translate::planner::{
@@ -19,7 +21,7 @@ use crate::{schema::Schema, vdbe::builder::ProgramBuilder, Result};
use crate::{Connection, SymbolTable};
use std::sync::Arc;
use turso_parser::ast::ResultColumn;
use turso_parser::ast::{self, CompoundSelect, Expr, SortOrder};
use turso_parser::ast::{self, CompoundSelect, Expr};
pub struct TranslateSelectResult {
pub program: ProgramBuilder,
@@ -424,7 +426,7 @@ fn prepare_one_select_plan(
}
plan.group_by = Some(GroupBy {
sort_order: Some((0..group_by.exprs.len()).map(|_| SortOrder::Asc).collect()),
sort_order: None,
exprs: group_by.exprs.iter().map(|expr| *expr.clone()).collect(),
having: if let Some(having) = group_by.having {
let mut predicates = vec![];
@@ -487,6 +489,16 @@ fn prepare_one_select_plan(
key.push((o.expr, o.order.unwrap_or(ast::SortOrder::Asc)));
}
plan.order_by = key;
if let Some(group_by) = &mut plan.group_by {
// now that we have resolved the ORDER BY expressions and aggregates, we can
// compute the necessary sort order for the GROUP BY clause
group_by.sort_order = Some(compute_group_by_sort_order(
&group_by.exprs,
&plan.order_by,
&plan.aggregates,
&Resolver::new(schema, syms),
));
}
// Parse the LIMIT/OFFSET clause
(plan.limit, plan.offset) = limit.map_or(Ok((None, None)), |mut l| {

View File

@@ -1013,15 +1013,7 @@ fn emit_return_buffered_rows(
)?;
}
false => {
order_by_sorter_insert(
program,
&t_ctx.resolver,
t_ctx
.meta_sort
.as_ref()
.expect("sort metadata must exist for ORDER BY"),
plan,
)?;
order_by_sorter_insert(program, t_ctx, plan)?;
}
}

View File

@@ -38,7 +38,7 @@ use std::env::temp_dir;
use std::ops::DerefMut;
use std::{
borrow::BorrowMut,
sync::{Arc, Mutex},
sync::{atomic::Ordering, Arc, Mutex},
};
use turso_macros::match_ignore_ascii_case;
@@ -369,7 +369,7 @@ pub fn op_checkpoint_inner(
},
insn
);
if !program.connection.auto_commit.get() {
if !program.connection.auto_commit.load(Ordering::SeqCst) {
// TODO: sqlite returns "Runtime error: database table is locked (6)" when a table is in use
// when a checkpoint is attempted. We don't have table locks, so return TableLocked for any
// attempt to checkpoint in an interactive transaction. This does not end the transaction,
@@ -2107,7 +2107,7 @@ pub fn halt(
}
}
let auto_commit = program.connection.auto_commit.get();
let auto_commit = program.connection.auto_commit.load(Ordering::SeqCst);
tracing::trace!("halt(auto_commit={})", auto_commit);
if auto_commit {
program
@@ -2206,7 +2206,7 @@ pub fn op_transaction_inner(
}
// 1. We try to upgrade current version
let current_state = conn.transaction_state.get();
let current_state = conn.get_tx_state();
let (new_transaction_state, updated) = if conn.is_nested_stmt.get() {
(current_state, false)
} else {
@@ -2313,9 +2313,9 @@ pub fn op_transaction_inner(
// start a new one.
if matches!(current_state, TransactionState::None) {
pager.end_read_tx()?;
conn.transaction_state.replace(TransactionState::None);
conn.set_tx_state(TransactionState::None);
}
assert_eq!(conn.transaction_state.get(), current_state);
assert_eq!(conn.get_tx_state(), current_state);
return Err(LimboError::Busy);
}
if let IOResult::IO(io) = begin_w_tx_res? {
@@ -2323,8 +2323,7 @@ pub fn op_transaction_inner(
// end the read transaction.
program
.connection
.transaction_state
.replace(TransactionState::PendingUpgrade);
.set_tx_state(TransactionState::PendingUpgrade);
return Ok(InsnFunctionStepResult::IO(io));
}
}
@@ -2332,7 +2331,7 @@ pub fn op_transaction_inner(
// 3. Transaction state should be updated before checking for Schema cookie so that the tx is ended properly on error
if updated {
conn.transaction_state.replace(new_transaction_state);
conn.set_tx_state(new_transaction_state);
}
state.op_transaction_state = OpTransactionState::CheckSchemaCookie;
continue;
@@ -2391,7 +2390,7 @@ pub fn op_auto_commit(
.map(Into::into);
}
if *auto_commit != conn.auto_commit.get() {
if *auto_commit != conn.auto_commit.load(Ordering::SeqCst) {
if *rollback {
// TODO(pere): add rollback I/O logic once we implement rollback journal
if let Some(mv_store) = mv_store {
@@ -2401,10 +2400,10 @@ pub fn op_auto_commit(
} else {
return_if_io!(pager.end_tx(true, &conn));
}
conn.transaction_state.replace(TransactionState::None);
conn.auto_commit.replace(true);
conn.set_tx_state(TransactionState::None);
conn.auto_commit.store(true, Ordering::SeqCst);
} else {
conn.auto_commit.replace(*auto_commit);
conn.auto_commit.store(*auto_commit, Ordering::SeqCst);
}
} else {
let mvcc_tx_active = program.connection.mv_tx.get().is_some();
@@ -4817,7 +4816,7 @@ pub fn op_function(
));
};
let table = {
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
match schema.get_table(table.as_str()) {
Some(table) => table,
None => {
@@ -5354,6 +5353,49 @@ pub fn op_function(
Ok(InsnFunctionStepResult::Step)
}
/// Execute the `Sequence` opcode: fetch the next sequence number from the
/// sorter cursor P1 and write it into register P2, then fall through to the
/// next instruction. The cursor advances its counter as a side effect of
/// `next_sequence()`.
pub fn op_sequence(
    program: &Program,
    state: &mut ProgramState,
    insn: &Insn,
    pager: &Arc<Pager>,
    mv_store: Option<&Arc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
    load_insn!(
        Sequence {
            cursor_id,
            target_reg
        },
        insn
    );
    let next = state
        .get_cursor(*cursor_id)
        .as_sorter_mut()
        .next_sequence();
    state.registers[*target_reg] = Register::Value(Value::Integer(next));
    state.pc += 1;
    Ok(InsnFunctionStepResult::Step)
}
/// Execute the `SequenceTest` opcode: P1 is a sorter cursor; jump to P2 when
/// the cursor's sequence counter is still at its initial value, otherwise
/// fall through to the next instruction.
///
/// Bug fix: the fall-through path previously left `state.pc` untouched while
/// returning `Step`, which would make the VM re-execute this instruction
/// forever. Every sibling opcode (see `op_sequence` above) advances the
/// program counter explicitly, so we do the same here.
pub fn op_sequence_test(
    program: &Program,
    state: &mut ProgramState,
    insn: &Insn,
    pager: &Arc<Pager>,
    mv_store: Option<&Arc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
    load_insn!(
        SequenceTest {
            cursor_id,
            target_pc,
            value_reg
        },
        insn
    );
    // NOTE(review): `value_reg` is loaded but unused, and the opcode doc says
    // the sequence value is incremented regardless of the jump — confirm that
    // `seq_beginning()` performs that increment internally.
    let cursor = state.get_cursor(*cursor_id).as_sorter_mut();
    if cursor.seq_beginning() {
        state.pc = target_pc.as_offset_int();
    } else {
        state.pc += 1;
    }
    Ok(InsnFunctionStepResult::Step)
}
pub fn op_init_coroutine(
program: &Program,
state: &mut ProgramState,
@@ -5483,7 +5525,7 @@ pub fn op_insert(
loop {
match &state.op_insert_state.sub_state {
OpInsertSubState::MaybeCaptureRecord => {
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
let dependent_views = schema.get_dependent_materialized_views(table_name);
// If there are no dependent views, we don't need to capture the old record.
// We also don't need to do it if the rowid of the UPDATEd row was changed, because that means
@@ -5593,7 +5635,7 @@ pub fn op_insert(
if root_page != 1 {
state.op_insert_state.sub_state = OpInsertSubState::UpdateLastRowid;
} else {
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
let dependent_views = schema.get_dependent_materialized_views(table_name);
if !dependent_views.is_empty() {
state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange;
@@ -5614,7 +5656,7 @@ pub fn op_insert(
let prev_changes = program.n_change.get();
program.n_change.set(prev_changes + 1);
}
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
let dependent_views = schema.get_dependent_materialized_views(table_name);
if !dependent_views.is_empty() {
state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange;
@@ -5623,7 +5665,7 @@ pub fn op_insert(
break;
}
OpInsertSubState::ApplyViewChange => {
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
let dependent_views = schema.get_dependent_materialized_views(table_name);
assert!(!dependent_views.is_empty());
@@ -5658,7 +5700,7 @@ pub fn op_insert(
.collect::<Vec<_>>();
// Fix rowid alias columns: replace Null with actual rowid value
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
if let Some(table) = schema.get_table(table_name) {
for (i, col) in table.columns().iter().enumerate() {
if col.is_rowid_alias && i < new_values.len() {
@@ -5751,7 +5793,7 @@ pub fn op_delete(
loop {
match &state.op_delete_state.sub_state {
OpDeleteSubState::MaybeCaptureRecord => {
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
let dependent_views = schema.get_dependent_materialized_views(table_name);
if dependent_views.is_empty() {
state.op_delete_state.sub_state = OpDeleteSubState::Delete;
@@ -5800,7 +5842,7 @@ pub fn op_delete(
}
// Increment metrics for row write (DELETE is a write operation)
state.metrics.rows_written = state.metrics.rows_written.saturating_add(1);
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
let dependent_views = schema.get_dependent_materialized_views(table_name);
if dependent_views.is_empty() {
break;
@@ -5809,7 +5851,7 @@ pub fn op_delete(
continue;
}
OpDeleteSubState::ApplyViewChange => {
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
let dependent_views = schema.get_dependent_materialized_views(table_name);
assert!(!dependent_views.is_empty());
let maybe_deleted_record = state.op_delete_state.deleted_record.take();
@@ -6507,7 +6549,7 @@ pub fn op_open_write(
};
if let Some(index) = maybe_index {
let conn = program.connection.clone();
let schema = conn.schema.borrow();
let schema = conn.schema.read();
let table = schema
.get_table(&index.table_name)
.and_then(|table| table.btree());
@@ -6784,8 +6826,8 @@ pub fn op_parse_schema(
let conn = program.connection.clone();
// set auto commit to false in order for parse schema to not commit changes as transaction state is stored in connection,
// and we use the same connection for nested query.
let previous_auto_commit = conn.auto_commit.get();
conn.auto_commit.set(false);
let previous_auto_commit = conn.auto_commit.load(Ordering::SeqCst);
conn.auto_commit.store(false, Ordering::SeqCst);
let maybe_nested_stmt_err = if let Some(where_clause) = where_clause {
let stmt = conn.prepare(format!("SELECT * FROM sqlite_schema WHERE {where_clause}"))?;
@@ -6821,7 +6863,8 @@ pub fn op_parse_schema(
})
};
conn.is_nested_stmt.set(false);
conn.auto_commit.set(previous_auto_commit);
conn.auto_commit
.store(previous_auto_commit, Ordering::SeqCst);
maybe_nested_stmt_err?;
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
@@ -6866,7 +6909,7 @@ pub fn op_populate_materialized_views(
// Now populate the views (after releasing the schema borrow)
for (view_name, _root_page, cursor_id) in view_info {
let schema = conn.schema.borrow();
let schema = conn.schema.read();
if let Some(view) = schema.get_materialized_view(&view_name) {
let mut view = view.lock().unwrap();
// Drop the schema borrow before calling populate_from_table
@@ -6964,9 +7007,9 @@ pub fn op_set_cookie(
Cookie::IncrementalVacuum => header.incremental_vacuum_enabled = (*value as u32).into(),
Cookie::SchemaVersion => {
// we update transaction state to indicate that the schema has changed
match program.connection.transaction_state.get() {
match program.connection.get_tx_state() {
TransactionState::Write { schema_did_change } => {
program.connection.transaction_state.set(TransactionState::Write { schema_did_change: true });
program.connection.set_tx_state(TransactionState::Write { schema_did_change: true });
},
TransactionState::Read => unreachable!("invalid transaction state for SetCookie: TransactionState::Read, should be write"),
TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"),

View File

@@ -1759,6 +1759,24 @@ pub fn insn_to_row(
0,
String::new(),
),
Insn::Sequence{ cursor_id, target_reg} => (
"Sequence",
*cursor_id as i32,
*target_reg as i32,
0,
Value::build_text(""),
0,
String::new(),
),
Insn::SequenceTest{ cursor_id, target_pc, value_reg } => (
"SequenceTest",
*cursor_id as i32,
target_pc.as_debug_int(),
*value_reg as i32,
Value::build_text(""),
0,
String::new(),
),
}
}

View File

@@ -1126,6 +1126,20 @@ pub enum Insn {
target_pc: BranchOffset,
},
/// Find the next available sequence number for cursor P1. Write the sequence number into register P2.
/// The sequence number on the cursor is incremented after this instruction.
Sequence {
cursor_id: CursorID,
target_reg: usize,
},
/// P1 is a sorter cursor. If the sequence counter is currently zero, jump to P2. Regardless of whether or not the jump is taken, increment the sequence value.
SequenceTest {
cursor_id: CursorID,
target_pc: BranchOffset,
value_reg: usize,
},
// OP_Explain
Explain {
p1: usize, // P1: address of instruction
@@ -1295,6 +1309,8 @@ impl InsnVariants {
InsnVariants::IfNeg => execute::op_if_neg,
InsnVariants::Explain => execute::op_noop,
InsnVariants::OpenDup => execute::op_open_dup,
InsnVariants::Sequence => execute::op_sequence,
InsnVariants::SequenceTest => execute::op_sequence_test,
}
}
}

View File

@@ -62,7 +62,12 @@ use execute::{
use explain::{insn_to_row_with_comment, EXPLAIN_COLUMNS, EXPLAIN_QUERY_PLAN_COLUMNS};
use regex::Regex;
use std::{cell::Cell, collections::HashMap, num::NonZero, sync::Arc};
use std::{
cell::Cell,
collections::HashMap,
num::NonZero,
sync::{atomic::Ordering, Arc},
};
use tracing::{instrument, Level};
/// State machine for committing view deltas with I/O handling
@@ -526,7 +531,7 @@ impl Program {
debug_assert!(state.column_count() == EXPLAIN_COLUMNS.len());
if self.connection.closed.get() {
// Connection is closed for whatever reason, rollback the transaction.
let state = self.connection.transaction_state.get();
let state = self.connection.get_tx_state();
if let TransactionState::Write { .. } = state {
pager.io.block(|| pager.end_tx(true, &self.connection))?;
}
@@ -581,7 +586,7 @@ impl Program {
loop {
if self.connection.closed.get() {
// Connection is closed for whatever reason, rollback the transaction.
let state = self.connection.transaction_state.get();
let state = self.connection.get_tx_state();
if let TransactionState::Write { .. } = state {
pager.io.block(|| pager.end_tx(true, &self.connection))?;
}
@@ -629,7 +634,7 @@ impl Program {
loop {
if self.connection.closed.get() {
// Connection is closed for whatever reason, rollback the transaction.
let state = self.connection.transaction_state.get();
let state = self.connection.get_tx_state();
if let TransactionState::Write { .. } = state {
pager.io.block(|| pager.end_tx(true, &self.connection))?;
}
@@ -718,7 +723,7 @@ impl Program {
}
// Not a rollback - proceed with processing
let schema = self.connection.schema.borrow();
let schema = self.connection.schema.read();
// Collect materialized views - they should all have storage
let mut views = Vec::new();
@@ -764,7 +769,7 @@ impl Program {
.unwrap()
.get_table_deltas();
let schema = self.connection.schema.borrow();
let schema = self.connection.schema.read();
if let Some(view_mutex) = schema.get_materialized_view(view_name) {
let mut view = view_mutex.lock().unwrap();
@@ -814,7 +819,7 @@ impl Program {
// Reset state for next use
program_state.view_delta_state = ViewDeltaCommitState::NotStarted;
if self.connection.transaction_state.get() == TransactionState::None {
if self.connection.get_tx_state() == TransactionState::None {
// No need to do any work here if not in tx. Current MVCC logic doesn't work with this assumption,
// hence the mv_store.is_none() check.
return Ok(IOResult::Done(()));
@@ -825,7 +830,7 @@ impl Program {
return Ok(IOResult::Done(()));
}
let conn = self.connection.clone();
let auto_commit = conn.auto_commit.get();
let auto_commit = conn.auto_commit.load(Ordering::SeqCst);
if auto_commit {
// FIXME: we don't want to commit stuff from other programs.
if matches!(program_state.commit_state, CommitState::Ready) {
@@ -843,7 +848,7 @@ impl Program {
IOResult::Done(_) => {
assert!(state_machine.is_finalized());
conn.mv_tx.set(None);
conn.transaction_state.replace(TransactionState::None);
conn.set_tx_state(TransactionState::None);
program_state.commit_state = CommitState::Ready;
return Ok(IOResult::Done(()));
}
@@ -855,14 +860,14 @@ impl Program {
Ok(IOResult::Done(()))
} else {
let connection = self.connection.clone();
let auto_commit = connection.auto_commit.get();
let auto_commit = connection.auto_commit.load(Ordering::SeqCst);
tracing::trace!(
"Halt auto_commit {}, state={:?}",
auto_commit,
program_state.commit_state
);
if matches!(program_state.commit_state, CommitState::Committing) {
let TransactionState::Write { .. } = connection.transaction_state.get() else {
let TransactionState::Write { .. } = connection.get_tx_state() else {
unreachable!("invalid state for write commit step")
};
self.step_end_write_txn(
@@ -872,7 +877,7 @@ impl Program {
rollback,
)
} else if auto_commit {
let current_state = connection.transaction_state.get();
let current_state = connection.get_tx_state();
tracing::trace!("Auto-commit state: {:?}", current_state);
match current_state {
TransactionState::Write { .. } => self.step_end_write_txn(
@@ -882,7 +887,7 @@ impl Program {
rollback,
),
TransactionState::Read => {
connection.transaction_state.replace(TransactionState::None);
connection.set_tx_state(TransactionState::None);
pager.end_read_tx()?;
Ok(IOResult::Done(()))
}
@@ -914,7 +919,7 @@ impl Program {
if self.change_cnt_on {
self.connection.set_changes(self.n_change.get());
}
connection.transaction_state.replace(TransactionState::None);
connection.set_tx_state(TransactionState::None);
*commit_state = CommitState::Ready;
}
IOResult::IO(io) => {
@@ -1078,8 +1083,8 @@ pub fn handle_program_error(
_ => {
if let Some(mv_store) = mv_store {
if let Some((tx_id, _)) = connection.mv_tx.get() {
connection.transaction_state.replace(TransactionState::None);
connection.auto_commit.replace(true);
connection.set_tx_state(TransactionState::None);
connection.auto_commit.store(true, Ordering::SeqCst);
mv_store.rollback_tx(tx_id, pager.clone(), connection)?;
}
} else {
@@ -1090,7 +1095,7 @@ pub fn handle_program_error(
tracing::error!("end_tx failed: {e}");
})?;
}
connection.transaction_state.replace(TransactionState::None);
connection.set_tx_state(TransactionState::None);
}
}
Ok(())

View File

@@ -88,6 +88,7 @@ pub struct Sorter {
insert_state: InsertState,
/// State machine for [Sorter::init_chunk_heap]
init_chunk_heap_state: InitChunkHeapState,
seq_count: i64,
}
impl Sorter {
@@ -125,6 +126,7 @@ impl Sorter {
sort_state: SortState::Start,
insert_state: InsertState::Start,
init_chunk_heap_state: InitChunkHeapState::Start,
seq_count: 0,
}
}
@@ -136,6 +138,21 @@ impl Sorter {
self.current.is_some()
}
/// Return the current sequence counter and advance it by one.
pub fn next_sequence(&mut self) -> i64 {
    let seq = self.seq_count;
    self.seq_count = seq + 1;
    seq
}
/// Report whether the sequence counter was still at zero (i.e. this is the
/// first call), incrementing the counter unconditionally either way.
pub fn seq_beginning(&mut self) -> bool {
    let at_start = self.seq_count == 0;
    self.seq_count += 1;
    at_start
}
// We do the sorting here since this is what is called by the SorterSort instruction
pub fn sort(&mut self) -> Result<IOResult<()>> {
loop {
@@ -578,6 +595,7 @@ struct SortableImmutableRecord {
record: ImmutableRecord,
cursor: RecordCursor,
key_values: RefCell<Vec<RefValue>>,
key_len: usize,
index_key_info: Rc<Vec<KeyInfo>>,
/// The key deserialization error, if any.
deserialization_error: RefCell<Option<LimboError>>,
@@ -601,6 +619,7 @@ impl SortableImmutableRecord {
key_values: RefCell::new(Vec::with_capacity(key_len)),
index_key_info,
deserialization_error: RefCell::new(None),
key_len,
})
}
@@ -638,7 +657,7 @@ impl Ord for SortableImmutableRecord {
let this_key_values_len = self.key_values.borrow().len();
let other_key_values_len = other.key_values.borrow().len();
for i in 0..self.cursor.serial_types.len() {
for i in 0..self.key_len {
// Lazily deserialize the key values if they haven't been deserialized already.
if i >= this_key_values_len {
self.try_deserialize_key(i);

View File

@@ -17,7 +17,6 @@ from pathlib import Path
# Define all npm package paths in one place
NPM_PACKAGES = [
"bindings/javascript",
"bindings/javascript/packages/wasm-runtime",
"bindings/javascript/packages/common",
"bindings/javascript/packages/native",
"bindings/javascript/packages/browser",
@@ -28,7 +27,6 @@ NPM_PACKAGES = [
]
NPM_WORKSPACE_PACKAGES = [
"@tursodatabase/wasm-runtime",
"@tursodatabase/database-common",
"@tursodatabase/database-browser-common",
"@tursodatabase/sync-common",

View File

@@ -82,7 +82,7 @@ impl InteractionPlan {
/// delete interactions from the human readable file, and this function uses the JSON file as
/// a baseline to detect with interactions were deleted and constructs the plan from the
/// remaining interactions.
pub(crate) fn compute_via_diff(plan_path: &Path) -> Vec<Interaction> {
pub(crate) fn compute_via_diff(plan_path: &Path) -> impl InteractionPlanIterator {
let interactions = std::fs::read_to_string(plan_path).unwrap();
let interactions = interactions.lines().collect::<Vec<_>>();
@@ -137,7 +137,9 @@ impl InteractionPlan {
}
}
let _ = plan.split_off(j);
plan.into_iter().flatten().collect()
PlanIterator {
iter: plan.into_iter().flatten(),
}
}
pub fn interactions_list(&self) -> Vec<Interaction> {
@@ -209,14 +211,11 @@ impl InteractionPlan {
stats
}
pub fn generate_plan<R: rand::Rng>(rng: &mut R, env: &mut SimulatorEnv) -> Self {
pub fn init_plan(env: &mut SimulatorEnv) -> Self {
let mut plan = InteractionPlan::new(env.profile.experimental_mvcc);
let num_interactions = env.opts.max_interactions as usize;
// First create at least one table
let create_query = Create::arbitrary(rng, &env.connection_context(0));
env.committed_tables.push(create_query.table.clone());
let create_query = Create::arbitrary(&mut env.rng.clone(), &env.connection_context(0));
// initial query starts at 0th connection
plan.plan.push(Interactions::new(
@@ -224,21 +223,50 @@ impl InteractionPlan {
InteractionsType::Query(Query::Create(create_query)),
));
while plan.len() < num_interactions {
tracing::debug!("Generating interaction {}/{}", plan.len(), num_interactions);
plan
}
/// Appends a new [Interactions] and outputs the next set of [Interaction] to take
pub fn generate_next_interaction(
&mut self,
rng: &mut impl rand::Rng,
env: &mut SimulatorEnv,
) -> Option<Vec<Interaction>> {
let num_interactions = env.opts.max_interactions as usize;
if self.len() < num_interactions {
tracing::debug!("Generating interaction {}/{}", self.len(), num_interactions);
let interactions = {
let conn_index = env.choose_conn(rng);
let conn_ctx = &env.connection_context(conn_index);
Interactions::arbitrary_from(rng, conn_ctx, (env, plan.stats(), conn_index))
Interactions::arbitrary_from(rng, conn_ctx, (env, self.stats(), conn_index))
};
interactions.shadow(&mut env.get_conn_tables_mut(interactions.connection_index));
plan.push(interactions);
let out_interactions = interactions.interactions();
assert!(!out_interactions.is_empty());
self.push(interactions);
Some(out_interactions)
} else {
None
}
}
tracing::info!("Generated plan with {} interactions", plan.plan.len());
pub fn generator<'a>(
&'a mut self,
rng: &'a mut impl rand::Rng,
) -> impl InteractionPlanIterator {
let interactions = self.interactions_list();
let iter = interactions.into_iter();
PlanGenerator {
plan: self,
iter,
rng,
}
}
plan
pub fn static_iterator(&self) -> impl InteractionPlanIterator {
PlanIterator {
iter: self.interactions_list().into_iter(),
}
}
}
@@ -266,6 +294,56 @@ impl IntoIterator for InteractionPlan {
}
}
/// Source of [Interaction]s for the simulator. Unlike a plain [Iterator],
/// producing the next item may consult (and mutate) the simulator
/// environment — e.g. to generate new interactions on the fly.
pub trait InteractionPlanIterator {
    /// Produce the next interaction, or `None` when the plan is exhausted.
    fn next(&mut self, env: &mut SimulatorEnv) -> Option<Interaction>;
}
// Blanket impl: a `&mut` borrow of any plan iterator is itself a plan
// iterator, delegating straight to the underlying value.
impl<T: InteractionPlanIterator> InteractionPlanIterator for &mut T {
    fn next(&mut self, env: &mut SimulatorEnv) -> Option<Interaction> {
        T::next(self, env)
    }
}
}
/// [InteractionPlanIterator] that lazily extends the underlying
/// [InteractionPlan]: once the buffered interactions are consumed it asks the
/// plan to generate the next batch.
pub struct PlanGenerator<'a, R: rand::Rng> {
    /// Plan being extended as iteration proceeds.
    plan: &'a mut InteractionPlan,
    /// Buffered interactions from the most recent generation step.
    iter: <Vec<Interaction> as IntoIterator>::IntoIter,
    /// RNG dedicated to plan generation (separate from the env's RNG).
    rng: &'a mut R,
}
impl<'a, R: rand::Rng> InteractionPlanIterator for PlanGenerator<'a, R> {
    /// Yield the next buffered interaction; when the buffer runs dry, ask the
    /// plan to generate a fresh batch and continue from there.
    fn next(&mut self, env: &mut SimulatorEnv) -> Option<Interaction> {
        if let Some(interaction) = self.iter.next() {
            return Some(interaction);
        }
        // Buffer exhausted: generate the next [Interactions] batch. This is
        // not an infinite sequence — generate_next_interaction eventually
        // stops producing once the plan reaches its interaction budget.
        let batch = self
            .plan
            .generate_next_interaction(self.rng, env)
            .unwrap_or_default();
        self.iter = batch.into_iter();
        self.iter.next()
    }
}
/// Thin adapter exposing a plain `Iterator<Item = Interaction>` as an
/// [InteractionPlanIterator]; it never consults the environment.
pub struct PlanIterator<I: Iterator<Item = Interaction>> {
    iter: I,
}
impl<I: Iterator<Item = Interaction>> InteractionPlanIterator for PlanIterator<I> {
    fn next(&mut self, _env: &mut SimulatorEnv) -> Option<Interaction> {
        self.iter.next()
    }
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct InteractionPlanState {
pub interaction_pointer: usize,

View File

@@ -1,7 +1,7 @@
#![allow(clippy::arc_with_non_send_sync, dead_code)]
use anyhow::anyhow;
use clap::Parser;
use generation::plan::{Interaction, InteractionPlan, InteractionPlanState};
use generation::plan::{InteractionPlan, InteractionPlanState};
use notify::event::{DataChange, ModifyKind};
use notify::{EventKind, RecursiveMode, Watcher};
use rand::prelude::*;
@@ -15,13 +15,14 @@ use std::backtrace::Backtrace;
use std::fs::OpenOptions;
use std::io::{IsTerminal, Write};
use std::path::Path;
use std::rc::Rc;
use std::sync::{Arc, Mutex, mpsc};
use tracing_subscriber::EnvFilter;
use tracing_subscriber::field::MakeExt;
use tracing_subscriber::fmt::format;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use crate::generation::plan::ConnectionState;
use crate::generation::plan::{ConnectionState, InteractionPlanIterator};
use crate::profiles::Profile;
use crate::runner::doublecheck;
use crate::runner::env::{Paths, SimulationPhase, SimulationType};
@@ -179,16 +180,10 @@ fn watch_mode(env: SimulatorEnv) -> notify::Result<()> {
let result = SandboxedResult::from(
std::panic::catch_unwind(move || {
let mut env = env;
let plan = InteractionPlan::compute_via_diff(&env.get_plan_path());
tracing::error!("plan_len: {}", plan.len());
let plan_path = env.get_plan_path();
let plan = InteractionPlan::compute_via_diff(&plan_path);
env.clear();
// plan.iter().for_each(|is| {
// is.iter().for_each(|i| {
// let _ = i.shadow(&mut env.tables);
// });
// });
let env = Arc::new(Mutex::new(env.clone_without_connections()));
run_simulation_default(env, plan, last_execution_.clone())
}),
@@ -236,16 +231,38 @@ fn run_simulator(
}));
let last_execution = Arc::new(Mutex::new(Execution::new(0, 0)));
let mut gen_rng = env.gen_rng();
let env = Arc::new(Mutex::new(env));
let result = SandboxedResult::from(
std::panic::catch_unwind(|| {
let interactions = plan.interactions_list();
run_simulation(env.clone(), interactions, last_execution.clone())
}),
last_execution.clone(),
);
// Need to wrap in Rc Mutex due to the UnwindSafe barrier
let plan = Rc::new(Mutex::new(plan));
let result = {
let sim_execution = last_execution.clone();
let sim_plan = plan.clone();
let sim_env = env.clone();
SandboxedResult::from(
std::panic::catch_unwind(move || {
let mut sim_plan = sim_plan.lock().unwrap();
let plan = sim_plan.generator(&mut gen_rng);
run_simulation(sim_env, plan, sim_execution)
}),
last_execution.clone(),
)
};
env.clear_poison();
plan.clear_poison();
let env = env.lock().unwrap();
let plan = plan.lock().unwrap();
tracing::info!("{}", plan.stats());
std::fs::write(env.get_plan_path(), plan.to_string()).unwrap();
std::fs::write(
env.get_plan_path().with_extension("json"),
serde_json::to_string_pretty(&*plan).unwrap(),
)
.unwrap();
// No doublecheck, run shrinking if panicking or found a bug.
match &result {
@@ -306,9 +323,9 @@ fn run_simulator(
let env = Arc::new(Mutex::new(env));
let shrunk = SandboxedResult::from(
std::panic::catch_unwind(|| {
let interactions = shrunk_plan.interactions_list();
let plan = shrunk_plan.static_iterator();
run_simulation(env.clone(), interactions, last_execution.clone())
run_simulation(env.clone(), plan, last_execution.clone())
}),
last_execution,
);
@@ -520,16 +537,7 @@ fn setup_simulation(
tracing::info!("Generating database interaction plan...");
let plan = InteractionPlan::generate_plan(&mut env.rng.clone(), &mut env);
// todo: for now, we only use 1 connection, so it's safe to use the first plan.
tracing::info!("{}", plan.stats());
std::fs::write(env.get_plan_path(), plan.to_string()).unwrap();
std::fs::write(
env.get_plan_path().with_extension("json"),
serde_json::to_string_pretty(&plan).unwrap(),
)
.unwrap();
let plan = InteractionPlan::init_plan(&mut env);
(seed, env, plan)
}
@@ -537,7 +545,7 @@ fn setup_simulation(
fn run_simulation(
env: Arc<Mutex<SimulatorEnv>>,
plan: Vec<Interaction>,
plan: impl InteractionPlanIterator,
last_execution: Arc<Mutex<Execution>>,
) -> ExecutionResult {
let simulation_type = {
@@ -570,7 +578,7 @@ fn run_simulation(
fn run_simulation_default(
env: Arc<Mutex<SimulatorEnv>>,
plan: Vec<Interaction>,
plan: impl InteractionPlanIterator,
last_execution: Arc<Mutex<Execution>>,
) -> ExecutionResult {
tracing::info!("Executing database interaction plan...");

View File

@@ -7,7 +7,10 @@ use itertools::Itertools;
use similar_asserts::SimpleDiff;
use sql_generation::model::table::SimValue;
use crate::generation::plan::{ConnectionState, Interaction, InteractionPlanState};
use crate::{
generation::plan::{ConnectionState, InteractionPlanIterator, InteractionPlanState},
runner::execution::ExecutionContinuation,
};
use super::{
env::SimulatorEnv,
@@ -17,7 +20,7 @@ use super::{
pub fn run_simulation(
env: Arc<Mutex<SimulatorEnv>>,
rusqlite_env: Arc<Mutex<SimulatorEnv>>,
plan: Vec<Interaction>,
plan: impl InteractionPlanIterator,
last_execution: Arc<Mutex<Execution>>,
) -> ExecutionResult {
tracing::info!("Executing database interaction plan...");
@@ -55,7 +58,7 @@ pub fn run_simulation(
pub(crate) fn execute_interactions(
env: Arc<Mutex<SimulatorEnv>>,
rusqlite_env: Arc<Mutex<SimulatorEnv>>,
interactions: Vec<Interaction>,
mut plan: impl InteractionPlanIterator,
state: &mut InteractionPlanState,
conn_states: &mut [ConnectionState],
rusqlite_states: &mut [ConnectionState],
@@ -69,15 +72,13 @@ pub(crate) fn execute_interactions(
env.clear_tables();
rusqlite_env.clear_tables();
let mut interaction = plan
.next(&mut env)
.expect("we should always have at least 1 interaction to start");
let now = std::time::Instant::now();
for _tick in 0..env.opts.ticks {
if state.interaction_pointer >= interactions.len() {
break;
}
let interaction = &interactions[state.interaction_pointer];
let connection_index = interaction.connection_index;
let turso_conn_state = &mut conn_states[connection_index];
let rusqlite_conn_state = &mut rusqlite_states[connection_index];
@@ -89,39 +90,35 @@ pub(crate) fn execute_interactions(
last_execution.connection_index = connection_index;
last_execution.interaction_index = state.interaction_pointer;
let mut turso_state = state.clone();
let mut rusqlite_state = state.clone();
// first execute turso
let turso_res = super::execution::execute_plan(
&mut env,
interaction,
turso_conn_state,
&mut turso_state,
);
let turso_res = super::execution::execute_plan(&mut env, &interaction, turso_conn_state);
// second execute rusqlite
let rusqlite_res = super::execution::execute_plan(
&mut rusqlite_env,
interaction,
rusqlite_conn_state,
&mut rusqlite_state,
);
let rusqlite_res =
super::execution::execute_plan(&mut rusqlite_env, &interaction, rusqlite_conn_state);
// Compare results
if let Err(err) = compare_results(
let next = match compare_results(
turso_res,
turso_conn_state,
rusqlite_res,
rusqlite_conn_state,
) {
return ExecutionResult::new(history, Some(err));
Ok(next) => next,
Err(err) => return ExecutionResult::new(history, Some(err)),
};
match next {
ExecutionContinuation::Stay => {}
ExecutionContinuation::NextInteraction => {
state.interaction_pointer += 1;
let Some(new_interaction) = plan.next(&mut env) else {
break;
};
interaction = new_interaction;
}
}
assert_eq!(turso_state, rusqlite_state);
*state = turso_state;
// Check if the maximum time for the simulation has been reached
if now.elapsed().as_secs() >= env.opts.max_time_simulation as u64 {
return ExecutionResult::new(
@@ -137,13 +134,14 @@ pub(crate) fn execute_interactions(
}
fn compare_results(
turso_res: turso_core::Result<()>,
turso_res: turso_core::Result<ExecutionContinuation>,
turso_conn_state: &mut ConnectionState,
rusqlite_res: turso_core::Result<()>,
rusqlite_res: turso_core::Result<ExecutionContinuation>,
rusqlite_conn_state: &mut ConnectionState,
) -> turso_core::Result<()> {
match (turso_res, rusqlite_res) {
(Ok(..), Ok(..)) => {
) -> turso_core::Result<ExecutionContinuation> {
let next = match (turso_res, rusqlite_res) {
(Ok(v1), Ok(v2)) => {
assert_eq!(v1, v2);
let turso_values = turso_conn_state.stack.last();
let rusqlite_values = rusqlite_conn_state.stack.last();
match (turso_values, rusqlite_values) {
@@ -222,8 +220,9 @@ fn compare_results(
));
}
}
v1
}
(None, None) => {}
(None, None) => v1,
_ => {
tracing::error!("limbo and rusqlite results do not match");
return Err(turso_core::LimboError::InternalError(
@@ -251,9 +250,10 @@ fn compare_results(
// The problem is that the errors might be different, and we cannot
// just assume both of them being errors has the same semantics.
// return Err(err);
ExecutionContinuation::NextInteraction
}
}
Ok(())
};
Ok(next)
}
fn count_rows(values: &[Vec<SimValue>]) -> BTreeMap<&Vec<SimValue>, i32> {

View File

@@ -3,7 +3,10 @@ use std::{
sync::{Arc, Mutex},
};
use crate::generation::plan::{ConnectionState, Interaction, InteractionPlanState};
use crate::{
generation::plan::{ConnectionState, InteractionPlanIterator, InteractionPlanState},
runner::execution::ExecutionContinuation,
};
use super::{
env::SimulatorEnv,
@@ -13,7 +16,7 @@ use super::{
pub fn run_simulation(
env: Arc<Mutex<SimulatorEnv>>,
doublecheck_env: Arc<Mutex<SimulatorEnv>>,
plan: Vec<Interaction>,
plan: impl InteractionPlanIterator,
last_execution: Arc<Mutex<Execution>>,
) -> ExecutionResult {
tracing::info!("Executing database interaction plan...");
@@ -78,7 +81,7 @@ pub fn run_simulation(
pub(crate) fn execute_plans(
env: Arc<Mutex<SimulatorEnv>>,
doublecheck_env: Arc<Mutex<SimulatorEnv>>,
interactions: Vec<Interaction>,
mut plan: impl InteractionPlanIterator,
state: &mut InteractionPlanState,
conn_states: &mut [ConnectionState],
doublecheck_states: &mut [ConnectionState],
@@ -92,15 +95,13 @@ pub(crate) fn execute_plans(
env.clear_tables();
doublecheck_env.clear_tables();
let mut interaction = plan
.next(&mut env)
.expect("we should always have at least 1 interaction to start");
let now = std::time::Instant::now();
for _tick in 0..env.opts.ticks {
if state.interaction_pointer >= interactions.len() {
break;
}
let interaction = &interactions[state.interaction_pointer];
let connection_index = interaction.connection_index;
let turso_conn_state = &mut conn_states[connection_index];
let doublecheck_conn_state = &mut doublecheck_states[connection_index];
@@ -112,40 +113,38 @@ pub(crate) fn execute_plans(
last_execution.connection_index = connection_index;
last_execution.interaction_index = state.interaction_pointer;
let mut turso_state = state.clone();
// first execute turso
let turso_res = super::execution::execute_plan(
&mut env,
interaction,
turso_conn_state,
&mut turso_state,
);
let mut doublecheck_state = state.clone();
let turso_res = super::execution::execute_plan(&mut env, &interaction, turso_conn_state);
// second execute doublecheck
let doublecheck_res = super::execution::execute_plan(
&mut doublecheck_env,
interaction,
&interaction,
doublecheck_conn_state,
&mut doublecheck_state,
);
// Compare results
if let Err(err) = compare_results(
let next = match compare_results(
turso_res,
turso_conn_state,
doublecheck_res,
doublecheck_conn_state,
) {
return ExecutionResult::new(history, Some(err));
Ok(next) => next,
Err(err) => return ExecutionResult::new(history, Some(err)),
};
match next {
ExecutionContinuation::Stay => {}
ExecutionContinuation::NextInteraction => {
state.interaction_pointer += 1;
let Some(new_interaction) = plan.next(&mut env) else {
break;
};
interaction = new_interaction;
}
}
assert_eq!(turso_state, doublecheck_state);
*state = turso_state;
// Check if the maximum time for the simulation has been reached
if now.elapsed().as_secs() >= env.opts.max_time_simulation as u64 {
return ExecutionResult::new(
@@ -161,13 +160,14 @@ pub(crate) fn execute_plans(
}
fn compare_results(
turso_res: turso_core::Result<()>,
turso_res: turso_core::Result<ExecutionContinuation>,
turso_state: &mut ConnectionState,
doublecheck_res: turso_core::Result<()>,
doublecheck_res: turso_core::Result<ExecutionContinuation>,
doublecheck_state: &mut ConnectionState,
) -> turso_core::Result<()> {
match (turso_res, doublecheck_res) {
(Ok(..), Ok(..)) => {
) -> turso_core::Result<ExecutionContinuation> {
let next = match (turso_res, doublecheck_res) {
(Ok(v1), Ok(v2)) => {
assert_eq!(v1, v2);
let turso_values = turso_state.stack.last();
let doublecheck_values = doublecheck_state.stack.last();
match (turso_values, doublecheck_values) {
@@ -184,6 +184,7 @@ fn compare_results(
"returned values from limbo and doublecheck results do not match".into(),
));
}
v1
}
(Err(limbo_err), Err(doublecheck_err)) => {
if limbo_err.to_string() != doublecheck_err.to_string() {
@@ -194,6 +195,7 @@ fn compare_results(
"limbo and doublecheck errors do not match".into(),
));
}
v1
}
(Ok(limbo_result), Err(doublecheck_err)) => {
tracing::error!(
@@ -216,7 +218,7 @@ fn compare_results(
}
}
}
(None, None) => {}
(None, None) => v1,
_ => {
tracing::error!("limbo and doublecheck results do not match");
return Err(turso_core::LimboError::InternalError(
@@ -245,7 +247,8 @@ fn compare_results(
"limbo and doublecheck errors do not match".into(),
));
}
ExecutionContinuation::NextInteraction
}
}
Ok(())
};
Ok(next)
}

View File

@@ -142,6 +142,8 @@ pub(crate) struct SimulatorEnv {
pub(crate) io: Arc<dyn SimIO>,
pub(crate) db: Option<Arc<Database>>,
pub(crate) rng: ChaCha8Rng,
seed: u64,
pub(crate) paths: Paths,
pub(crate) type_: SimulationType,
pub(crate) phase: SimulationPhase,
@@ -162,6 +164,7 @@ impl SimulatorEnv {
io: self.io.clone(),
db: self.db.clone(),
rng: self.rng.clone(),
seed: self.seed,
paths: self.paths.clone(),
type_: self.type_,
phase: self.phase,
@@ -257,6 +260,12 @@ impl SimulatorEnv {
/// Pick a random connection index, uniform over all current connections.
pub fn choose_conn(&self, rng: &mut impl Rng) -> usize {
    rng.random_range(0..self.connections.len())
}
/// RNG used only for generating interactions. Keeping a separate RNG,
/// re-seeded from the stored seed, guarantees that a particular seed always
/// produces the same interaction plan, regardless of changes in the
/// database code.
pub fn gen_rng(&self) -> ChaCha8Rng {
    ChaCha8Rng::seed_from_u64(self.seed)
}
}
impl SimulatorEnv {
@@ -373,6 +382,7 @@ impl SimulatorEnv {
connections,
paths,
rng,
seed,
io,
db: Some(db),
type_: simulation_type,

View File

@@ -7,7 +7,10 @@ use turso_core::{Connection, LimboError, Result, StepResult, Value};
use crate::{
generation::{
Shadow as _,
plan::{ConnectionState, Interaction, InteractionPlanState, InteractionType, ResultSet},
plan::{
ConnectionState, Interaction, InteractionPlanIterator, InteractionPlanState,
InteractionType, ResultSet,
},
},
model::Query,
};
@@ -55,7 +58,7 @@ impl ExecutionResult {
pub(crate) fn execute_interactions(
env: Arc<Mutex<SimulatorEnv>>,
interactions: Vec<Interaction>,
mut plan: impl InteractionPlanIterator,
state: &mut InteractionPlanState,
conn_states: &mut [ConnectionState],
last_execution: Arc<Mutex<Execution>>,
@@ -67,15 +70,13 @@ pub(crate) fn execute_interactions(
env.clear_tables();
let mut interaction = plan
.next(&mut env)
.expect("we should always have at least 1 interaction to start");
for _tick in 0..env.opts.ticks {
tracing::trace!("Executing tick {}", _tick);
if state.interaction_pointer >= interactions.len() {
break;
}
let interaction = &interactions[state.interaction_pointer];
let connection_index = interaction.connection_index;
let conn_state = &mut conn_states[connection_index];
@@ -86,11 +87,18 @@ pub(crate) fn execute_interactions(
last_execution.connection_index = connection_index;
last_execution.interaction_index = state.interaction_pointer;
// Execute the interaction for the selected connection
match execute_plan(&mut env, interaction, conn_state, state) {
Ok(_) => {}
match execute_plan(&mut env, &interaction, conn_state) {
Ok(ExecutionContinuation::NextInteraction) => {
state.interaction_pointer += 1;
let Some(new_interaction) = plan.next(&mut env) else {
break;
};
interaction = new_interaction;
}
Err(err) => {
return ExecutionResult::new(history, Some(err));
}
_ => {}
}
// Check if the maximum time for the simulation has been reached
if now.elapsed().as_secs() >= env.opts.max_time_simulation as u64 {
@@ -110,33 +118,17 @@ pub fn execute_plan(
env: &mut SimulatorEnv,
interaction: &Interaction,
conn_state: &mut ConnectionState,
state: &mut InteractionPlanState,
) -> Result<()> {
) -> Result<ExecutionContinuation> {
let connection_index = interaction.connection_index;
let connection = &mut env.connections[connection_index];
if let SimConnection::Disconnected = connection {
tracing::debug!("connecting {}", connection_index);
env.connect(connection_index);
Ok(ExecutionContinuation::Stay)
} else {
tracing::debug!("connection {} already connected", connection_index);
match execute_interaction(env, interaction, &mut conn_state.stack) {
Ok(next_execution) => {
tracing::debug!("connection {} processed", connection_index);
// Move to the next interaction or property
match next_execution {
ExecutionContinuation::NextInteraction => {
state.interaction_pointer += 1;
}
}
}
Err(err) => {
tracing::error!("error {}", err);
return Err(err);
}
}
execute_interaction(env, interaction, &mut conn_state.stack)
}
Ok(())
}
/// The next point of control flow after executing an interaction.
@@ -145,6 +137,8 @@ pub fn execute_plan(
/// indicates the next step in the plan.
#[derive(PartialEq, Debug)]
pub(crate) enum ExecutionContinuation {
/// Stay in the current interaction
Stay,
/// Default continuation, execute the next interaction.
NextInteraction,
// /// Typically used in the case of preconditions failures, skip to the next property.

View File

@@ -339,9 +339,9 @@ impl InteractionPlan {
let last_execution = Arc::new(Mutex::new(*failing_execution));
let result = SandboxedResult::from(
std::panic::catch_unwind(|| {
let interactions = test_plan.interactions_list();
let plan = test_plan.static_iterator();
run_simulation(env.clone(), interactions, last_execution.clone())
run_simulation(env.clone(), plan, last_execution.clone())
}),
last_execution,
);

View File

@@ -18,8 +18,9 @@ use crate::{
io_operations::IoOperations,
protocol_io::{DataCompletion, DataPollResult, ProtocolIO},
server_proto::{
self, ExecuteStreamReq, PageData, PageUpdatesEncodingReq, PullUpdatesReqProtoBody,
PullUpdatesRespProtoBody, Stmt, StmtResult, StreamRequest,
self, Batch, BatchCond, BatchStep, BatchStreamReq, ExecuteStreamReq, PageData,
PageUpdatesEncodingReq, PullUpdatesReqProtoBody, PullUpdatesRespProtoBody, Stmt,
StmtResult, StreamRequest,
},
types::{
Coro, DatabasePullRevision, DatabaseRowTransformResult, DatabaseSyncEngineProtocolVersion,
@@ -713,23 +714,32 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
ignore_schema_changes: false,
..Default::default()
};
let step = |query, args| BatchStep {
stmt: Stmt {
sql: Some(query),
sql_id: None,
args,
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
},
condition: Some(BatchCond::Not {
cond: Box::new(BatchCond::IsAutocommit {}),
}),
};
let mut sql_over_http_requests = vec![
Stmt {
sql: Some("BEGIN IMMEDIATE".to_string()),
sql_id: None,
args: Vec::new(),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
},
Stmt {
sql: Some(TURSO_SYNC_CREATE_TABLE.to_string()),
sql_id: None,
args: Vec::new(),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
BatchStep {
stmt: Stmt {
sql: Some("BEGIN IMMEDIATE".to_string()),
sql_id: None,
args: Vec::new(),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
},
condition: None,
},
step(TURSO_SYNC_CREATE_TABLE.to_string(), Vec::new()),
];
let mut rows_changed = 0;
let mut changes = source.iterate_changes(iterate_opts)?;
@@ -797,14 +807,9 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
DatabaseTapeOperation::Commit => {
panic!("Commit operation must not be emited at this stage")
}
DatabaseTapeOperation::StmtReplay(replay) => sql_over_http_requests.push(Stmt {
sql: Some(replay.sql),
sql_id: None,
args: convert_to_args(replay.values),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
}),
DatabaseTapeOperation::StmtReplay(replay) => {
sql_over_http_requests.push(step(replay.sql, convert_to_args(replay.values)))
}
DatabaseTapeOperation::RowChange(change) => {
let replay_info = generator.replay_info(coro, &change).await?;
match change.change {
@@ -816,14 +821,8 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
before,
None,
);
sql_over_http_requests.push(Stmt {
sql: Some(replay_info.query.clone()),
sql_id: None,
args: convert_to_args(values),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
})
sql_over_http_requests
.push(step(replay_info.query.clone(), convert_to_args(values)))
}
DatabaseTapeRowChangeType::Insert { after } => {
let values = generator.replay_values(
@@ -833,14 +832,8 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
after,
None,
);
sql_over_http_requests.push(Stmt {
sql: Some(replay_info.query.clone()),
sql_id: None,
args: convert_to_args(values),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
})
sql_over_http_requests
.push(step(replay_info.query.clone(), convert_to_args(values)));
}
DatabaseTapeRowChangeType::Update {
after,
@@ -854,14 +847,8 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
after,
Some(updates),
);
sql_over_http_requests.push(Stmt {
sql: Some(replay_info.query.clone()),
sql_id: None,
args: convert_to_args(values),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
})
sql_over_http_requests
.push(step(replay_info.query.clone(), convert_to_args(values)));
}
DatabaseTapeRowChangeType::Update {
after,
@@ -875,14 +862,8 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
after,
None,
);
sql_over_http_requests.push(Stmt {
sql: Some(replay_info.query.clone()),
sql_id: None,
args: convert_to_args(values),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
});
sql_over_http_requests
.push(step(replay_info.query.clone(), convert_to_args(values)));
}
}
}
@@ -894,10 +875,9 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
// update turso_sync_last_change_id table with new value before commit
let next_change_id = last_change_id.unwrap_or(0);
tracing::info!("push_logical_changes: client_id={client_id}, set pull_gen={source_pull_gen}, change_id={next_change_id}, rows_changed={rows_changed}");
sql_over_http_requests.push(Stmt {
sql: Some(TURSO_SYNC_UPSERT_LAST_CHANGE_ID.to_string()),
sql_id: None,
args: vec![
sql_over_http_requests.push(step(
TURSO_SYNC_UPSERT_LAST_CHANGE_ID.to_string(),
vec![
server_proto::Value::Text {
value: client_id.to_string(),
},
@@ -908,27 +888,20 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
value: next_change_id,
},
],
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
});
));
}
sql_over_http_requests.push(Stmt {
sql: Some("COMMIT".to_string()),
sql_id: None,
args: Vec::new(),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
});
sql_over_http_requests.push(step("COMMIT".to_string(), Vec::new()));
tracing::trace!("hrana request: {:?}", sql_over_http_requests);
let replay_hrana_request = server_proto::PipelineReqBody {
baton: None,
requests: sql_over_http_requests
.into_iter()
.map(|stmt| StreamRequest::Execute(ExecuteStreamReq { stmt }))
.collect(),
requests: vec![StreamRequest::Batch(BatchStreamReq {
batch: Batch {
steps: sql_over_http_requests.into(),
replication_index: None,
},
})]
.into(),
};
let _ = sql_execute_http(coro, client, replay_hrana_request).await?;
@@ -1206,6 +1179,16 @@ async fn sql_execute_http<C: ProtocolIO, Ctx>(
server_proto::StreamResponse::Execute(execute) => {
results.push(execute.result);
}
server_proto::StreamResponse::Batch(batch) => {
if let Some(error) = batch.result.step_errors.into_iter().flatten().next() {
return Err(Error::DatabaseSyncEngineError(format!(
"failed to execute sql: {error:?}"
)));
}
for result in batch.result.step_results.into_iter().flatten() {
results.push(result);
}
}
},
}
}

View File

@@ -82,6 +82,8 @@ pub enum StreamRequest {
None,
/// See [`ExecuteStreamReq`]
Execute(ExecuteStreamReq),
/// See [`BatchStreamReq`]
Batch(BatchStreamReq),
}
#[derive(Serialize, Deserialize, Default, Debug, PartialEq)]
@@ -101,6 +103,66 @@ pub enum StreamResult {
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StreamResponse {
Execute(ExecuteStreamResp),
Batch(BatchStreamResp),
}
#[derive(Serialize, Deserialize, Debug)]
/// A request to execute a batch of SQL statements that may each have a [`BatchCond`] that must be satisfied for the statement to be executed.
pub struct BatchStreamReq {
    /// The ordered, conditionally-gated statements to run.
    pub batch: Batch,
}
#[derive(Serialize, Deserialize, Debug, PartialEq)]
/// A response to a [`BatchStreamReq`].
pub struct BatchStreamResp {
    /// Per-step results and errors for the executed batch.
    pub result: BatchResult,
}
#[derive(Clone, Deserialize, Serialize, Debug, Default, PartialEq)]
/// Outcome of executing a [`Batch`], with one slot per step.
pub struct BatchResult {
    /// Result of each step, in step order; `None` when the step did not
    /// produce a result (it was skipped or it failed).
    pub step_results: Vec<Option<StmtResult>>,
    /// Error of each step, in step order; `None` when the step did not fail.
    pub step_errors: Vec<Option<Error>>,
    // NOTE(review): presumably the server's replication index after applying
    // the batch — confirm against the hrana protocol spec.
    #[serde(default, with = "option_u64_as_str")]
    pub replication_index: Option<u64>,
}
#[derive(Clone, Deserialize, Serialize, Debug)]
/// An ordered sequence of SQL steps submitted as a single batch.
pub struct Batch {
    /// Steps executed in order; each may be gated by a [`BatchCond`].
    pub steps: VecDeque<BatchStep>,
    // NOTE(review): presumably a minimum replication index the server must
    // have reached before executing — verify against the protocol spec.
    #[serde(default, with = "option_u64_as_str")]
    pub replication_index: Option<u64>,
}
#[derive(Clone, Deserialize, Serialize, Debug)]
/// A single statement within a [`Batch`], optionally gated by a condition.
pub struct BatchStep {
    /// When present, the statement runs only if this condition holds;
    /// when absent, the statement always runs.
    #[serde(default)]
    pub condition: Option<BatchCond>,
    /// The SQL statement to execute for this step.
    pub stmt: Stmt,
}
#[derive(Clone, Deserialize, Serialize, Debug, Default)]
#[serde(tag = "type", rename_all = "snake_case")]
/// Condition attached to a [`BatchStep`] that decides whether the step runs.
pub enum BatchCond {
    /// Placeholder default; never produced by deserialization.
    #[serde(skip_deserializing)]
    #[default]
    None,
    /// Holds if the step at index `step` in the batch succeeded.
    Ok {
        step: u32,
    },
    /// Holds if the step at index `step` in the batch failed.
    Error {
        step: u32,
    },
    /// Logical negation of the wrapped condition.
    Not {
        cond: Box<BatchCond>,
    },
    /// Holds if every condition in the list holds.
    And(BatchCondList),
    /// Holds if at least one condition in the list holds.
    Or(BatchCondList),
    /// Holds if the connection is in autocommit mode (no open transaction).
    IsAutocommit {},
}
#[derive(Clone, Deserialize, Serialize, Debug)]
/// Operand list for [`BatchCond::And`] and [`BatchCond::Or`].
pub struct BatchCondList {
    pub conds: Vec<BatchCond>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq)]

View File

@@ -133,6 +133,17 @@ do_execsql_test_in_memory_any_error fail-alter-table-drop-unique-column-constrai
ALTER TABLE t DROP b;
}
# refer https://github.com/tursodatabase/turso/issues/3231
# ALTER TABLE ... ADD COLUMN must reject a column name that already exists.
do_execsql_test_in_memory_any_error fail-alter-table-add-duplicate-column {
CREATE TABLE t1 (a);
ALTER TABLE t1 ADD COLUMN a;
}
# Duplicate detection must compare column names case-insensitively,
# matching SQLite's identifier semantics.
do_execsql_test_in_memory_any_error fail-alter-table-add-duplicate-column-case-insensitive {
CREATE TABLE t1 (a);
ALTER TABLE t1 ADD COLUMN A;
}
do_execsql_test_in_memory_any_error fail-alter-table-drop-primary-key-column {
CREATE TABLE t (a PRIMARY KEY, b);
ALTER TABLE t DROP a;

View File

@@ -326,3 +326,23 @@ sneakers|sneakers|sneakers
boots|boots|boots
coat|coat|coat
accessories|accessories|accessories}
# make sure we return the group by key sorted DESC when the order by has only an aggregate term
# (every group's COUNT ties at 1, so the visible row order exposes the
# tie-break: the group-by key in descending order)
do_execsql_test proper-sort-order {
SELECT u.first_name, COUNT(*) AS c
FROM users u
JOIN products p ON p.id = u.id
GROUP BY u.first_name
ORDER BY c DESC;
} {Travis|1
Tommy|1
Rachel|1
Nicholas|1
Matthew|1
Jennifer|1
Jamie|1
Edward|1
Daniel|1
Cindy|1
Aimee|1}

View File

@@ -703,3 +703,50 @@ fn test_bind_parameters_delete_rowid_alias_seek_out_of_order() -> anyhow::Result
assert_eq!(ins.parameters().count(), 4);
Ok(())
}
#[test]
/// Verify that a CTE can be referenced through a table alias, both with the
/// explicit `AS` keyword (`a1 AS a2`) and with the bare-alias form (`a1 a2`).
fn test_cte_alias() -> anyhow::Result<()> {
    let tmp_db = TempDatabase::new_with_rusqlite(
        "CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT);",
        false,
    );
    let conn = tmp_db.connect_limbo();
    conn.execute("INSERT INTO test (id, name) VALUES (1, 'Limbo');")?;
    conn.execute("INSERT INTO test (id, name) VALUES (2, 'Turso');")?;

    // Step a prepared statement until its first row, drive pending IO, and
    // assert that column 0 of that row equals `expected`. Panics if the
    // statement finishes without producing a row.
    let assert_single_int = |stmt: &mut _, expected: i64| -> anyhow::Result<()> {
        loop {
            match stmt.step()? {
                StepResult::Row => {
                    let row = stmt.row().unwrap();
                    assert_eq!(row.get::<&Value>(0).unwrap(), &Value::Integer(expected));
                    return Ok(());
                }
                StepResult::Done => panic!("Expected a row but got Done"),
                StepResult::IO => stmt.run_once()?,
                _ => panic!("Unexpected step result"),
            }
        }
    };

    // Alias with the explicit AS keyword.
    let mut stmt1 = conn.prepare(
        "WITH a1 AS (SELECT id FROM test WHERE name = 'Limbo') SELECT a2.id FROM a1 AS a2",
    )?;
    assert_single_int(&mut stmt1, 1)?;

    // Bare alias without AS.
    let mut stmt2 = conn
        .prepare("WITH a1 AS (SELECT id FROM test WHERE name = 'Turso') SELECT a2.id FROM a1 a2")?;
    assert_single_int(&mut stmt2, 2)?;
    Ok(())
}