mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-09 11:14:20 +01:00
adjust sync package
This commit is contained in:
@@ -1,23 +1,155 @@
|
||||
import { SyncOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult } from "@tursodatabase/sync-common";
|
||||
import { connect as promiseConnect, Database } from "./promise.js";
|
||||
import { SyncEngine, initThreadPool, MainWorker } from "./index-bundle.js";
|
||||
import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-browser-common"
|
||||
import { DatabasePromise } from "@tursodatabase/database-common"
|
||||
import { ProtocolIo, run, DatabaseOpts as SyncDatabaseOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common";
|
||||
import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker } from "./index-bundle.js";
|
||||
|
||||
let BrowserIO: ProtocolIo = {
|
||||
async read(path: string): Promise<Buffer | Uint8Array | null> {
|
||||
const result = localStorage.getItem(path);
|
||||
if (result == null) {
|
||||
return null;
|
||||
}
|
||||
return new TextEncoder().encode(result);
|
||||
},
|
||||
async write(path: string, data: Buffer | Uint8Array): Promise<void> {
|
||||
const array = new Uint8Array(data);
|
||||
const value = new TextDecoder('utf-8').decode(array);
|
||||
localStorage.setItem(path, value);
|
||||
}
|
||||
};
|
||||
|
||||
function memoryIO(): ProtocolIo {
|
||||
let values = new Map();
|
||||
return {
|
||||
async read(path: string): Promise<Buffer | Uint8Array | null> {
|
||||
return values.get(path);
|
||||
},
|
||||
async write(path: string, data: Buffer | Uint8Array): Promise<void> {
|
||||
values.set(path, data);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
async function init(): Promise<Worker> {
|
||||
await initThreadPool();
|
||||
if (MainWorker == null) {
|
||||
throw new Error("panic: MainWorker is not initialized");
|
||||
}
|
||||
return MainWorker;
|
||||
}
|
||||
|
||||
class Database extends DatabasePromise {
|
||||
#runOpts: RunOpts;
|
||||
#engine: any;
|
||||
#io: ProtocolIo;
|
||||
#guards: SyncEngineGuards;
|
||||
#worker: Worker | null;
|
||||
constructor(opts: SyncDatabaseOpts) {
|
||||
const engine = new SyncEngine({
|
||||
path: opts.path,
|
||||
clientName: opts.clientName,
|
||||
useTransform: opts.transform != null,
|
||||
protocolVersion: SyncEngineProtocolVersion.V1,
|
||||
longPollTimeoutMs: opts.longPollTimeoutMs,
|
||||
tracing: opts.tracing,
|
||||
});
|
||||
super(engine.db() as unknown as any);
|
||||
|
||||
|
||||
let headers = typeof opts.authToken === "function" ? () => ({
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }),
|
||||
...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey })
|
||||
}) : {
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }),
|
||||
...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey })
|
||||
};
|
||||
this.#runOpts = {
|
||||
url: opts.url,
|
||||
headers: headers,
|
||||
preemptionMs: 1,
|
||||
transform: opts.transform,
|
||||
};
|
||||
this.#engine = engine;
|
||||
this.#io = this.memory ? memoryIO() : BrowserIO;
|
||||
this.#guards = new SyncEngineGuards();
|
||||
}
|
||||
/**
|
||||
* connect database and initialize it in case of clean start
|
||||
*/
|
||||
override async connect() {
|
||||
if (!this.memory) {
|
||||
this.#worker = await init();
|
||||
await Promise.all([
|
||||
registerFileAtWorker(this.#worker, this.name),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
}
|
||||
await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect());
|
||||
}
|
||||
/**
|
||||
* pull new changes from the remote database
|
||||
* if {@link SyncDatabaseOpts.longPollTimeoutMs} is set - then server will hold the connection open until either new changes will appear in the database or timeout occurs.
|
||||
* @returns true if new changes were pulled from the remote
|
||||
*/
|
||||
async pull() {
|
||||
const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait()));
|
||||
if (changes.empty()) {
|
||||
return false;
|
||||
}
|
||||
await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes)));
|
||||
return true;
|
||||
}
|
||||
/**
|
||||
* push new local changes to the remote database
|
||||
* if {@link SyncDatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote
|
||||
*/
|
||||
async push() {
|
||||
await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push()));
|
||||
}
|
||||
/**
|
||||
* checkpoint WAL for local database
|
||||
*/
|
||||
async checkpoint() {
|
||||
await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint()));
|
||||
}
|
||||
/**
|
||||
* @returns statistic of current local database
|
||||
*/
|
||||
async stats(): Promise<DatabaseStats> {
|
||||
return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats()));
|
||||
}
|
||||
/**
|
||||
* close the database and relevant files
|
||||
*/
|
||||
async close() {
|
||||
if (this.name != null && this.#worker != null) {
|
||||
await Promise.all([
|
||||
unregisterFileAtWorker(this.#worker, this.name),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
}
|
||||
await super.close();
|
||||
this.#engine.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new database connection asynchronously.
|
||||
*
|
||||
* @param {string} path - Path to the database file.
|
||||
* @param {Object} opts - Options for database behavior.
|
||||
* @returns {Promise<Database>} - A promise that resolves to a Database instance.
|
||||
*/
|
||||
async function connect(opts: SyncOpts): Promise<Database> {
|
||||
return await promiseConnect(opts, x => new SyncEngine(x), async () => {
|
||||
await initThreadPool();
|
||||
if (MainWorker == null) {
|
||||
throw new Error("panic: MainWorker is not initialized");
|
||||
}
|
||||
return MainWorker;
|
||||
});
|
||||
async function connect(opts: SyncDatabaseOpts): Promise<Database> {
|
||||
const db = new Database(opts);
|
||||
await db.connect();
|
||||
return db;
|
||||
}
|
||||
|
||||
export { connect, Database, }
|
||||
export { connect, Database }
|
||||
export type { DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult }
|
||||
|
||||
@@ -1,23 +1,155 @@
|
||||
import { SyncOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult } from "@tursodatabase/sync-common";
|
||||
import { connect as promiseConnect, Database } from "./promise.js";
|
||||
import { SyncEngine, initThreadPool, MainWorker } from "./index-default.js";
|
||||
import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-browser-common"
|
||||
import { DatabasePromise } from "@tursodatabase/database-common"
|
||||
import { ProtocolIo, run, DatabaseOpts as SyncDatabaseOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common";
|
||||
import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker } from "./index-default.js";
|
||||
|
||||
let BrowserIO: ProtocolIo = {
|
||||
async read(path: string): Promise<Buffer | Uint8Array | null> {
|
||||
const result = localStorage.getItem(path);
|
||||
if (result == null) {
|
||||
return null;
|
||||
}
|
||||
return new TextEncoder().encode(result);
|
||||
},
|
||||
async write(path: string, data: Buffer | Uint8Array): Promise<void> {
|
||||
const array = new Uint8Array(data);
|
||||
const value = new TextDecoder('utf-8').decode(array);
|
||||
localStorage.setItem(path, value);
|
||||
}
|
||||
};
|
||||
|
||||
function memoryIO(): ProtocolIo {
|
||||
let values = new Map();
|
||||
return {
|
||||
async read(path: string): Promise<Buffer | Uint8Array | null> {
|
||||
return values.get(path);
|
||||
},
|
||||
async write(path: string, data: Buffer | Uint8Array): Promise<void> {
|
||||
values.set(path, data);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
async function init(): Promise<Worker> {
|
||||
await initThreadPool();
|
||||
if (MainWorker == null) {
|
||||
throw new Error("panic: MainWorker is not initialized");
|
||||
}
|
||||
return MainWorker;
|
||||
}
|
||||
|
||||
class Database extends DatabasePromise {
|
||||
#runOpts: RunOpts;
|
||||
#engine: any;
|
||||
#io: ProtocolIo;
|
||||
#guards: SyncEngineGuards;
|
||||
#worker: Worker | null;
|
||||
constructor(opts: SyncDatabaseOpts) {
|
||||
const engine = new SyncEngine({
|
||||
path: opts.path,
|
||||
clientName: opts.clientName,
|
||||
useTransform: opts.transform != null,
|
||||
protocolVersion: SyncEngineProtocolVersion.V1,
|
||||
longPollTimeoutMs: opts.longPollTimeoutMs,
|
||||
tracing: opts.tracing,
|
||||
});
|
||||
super(engine.db() as unknown as any);
|
||||
|
||||
|
||||
let headers = typeof opts.authToken === "function" ? () => ({
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }),
|
||||
...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey })
|
||||
}) : {
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }),
|
||||
...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey })
|
||||
};
|
||||
this.#runOpts = {
|
||||
url: opts.url,
|
||||
headers: headers,
|
||||
preemptionMs: 1,
|
||||
transform: opts.transform,
|
||||
};
|
||||
this.#engine = engine;
|
||||
this.#io = this.memory ? memoryIO() : BrowserIO;
|
||||
this.#guards = new SyncEngineGuards();
|
||||
}
|
||||
/**
|
||||
* connect database and initialize it in case of clean start
|
||||
*/
|
||||
override async connect() {
|
||||
if (!this.memory) {
|
||||
this.#worker = await init();
|
||||
await Promise.all([
|
||||
registerFileAtWorker(this.#worker, this.name),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
}
|
||||
await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect());
|
||||
}
|
||||
/**
|
||||
* pull new changes from the remote database
|
||||
* if {@link SyncDatabaseOpts.longPollTimeoutMs} is set - then server will hold the connection open until either new changes will appear in the database or timeout occurs.
|
||||
* @returns true if new changes were pulled from the remote
|
||||
*/
|
||||
async pull() {
|
||||
const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait()));
|
||||
if (changes.empty()) {
|
||||
return false;
|
||||
}
|
||||
await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes)));
|
||||
return true;
|
||||
}
|
||||
/**
|
||||
* push new local changes to the remote database
|
||||
* if {@link SyncDatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote
|
||||
*/
|
||||
async push() {
|
||||
await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push()));
|
||||
}
|
||||
/**
|
||||
* checkpoint WAL for local database
|
||||
*/
|
||||
async checkpoint() {
|
||||
await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint()));
|
||||
}
|
||||
/**
|
||||
* @returns statistic of current local database
|
||||
*/
|
||||
async stats(): Promise<DatabaseStats> {
|
||||
return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats()));
|
||||
}
|
||||
/**
|
||||
* close the database and relevant files
|
||||
*/
|
||||
async close() {
|
||||
if (this.name != null && this.#worker != null) {
|
||||
await Promise.all([
|
||||
unregisterFileAtWorker(this.#worker, this.name),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
}
|
||||
await super.close();
|
||||
this.#engine.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new database connection asynchronously.
|
||||
*
|
||||
* @param {string} path - Path to the database file.
|
||||
* @param {Object} opts - Options for database behavior.
|
||||
* @returns {Promise<Database>} - A promise that resolves to a Database instance.
|
||||
*/
|
||||
async function connect(opts: SyncOpts): Promise<Database> {
|
||||
return await promiseConnect(opts, x => new SyncEngine(x), async () => {
|
||||
await initThreadPool();
|
||||
if (MainWorker == null) {
|
||||
throw new Error("panic: MainWorker is not initialized");
|
||||
}
|
||||
return MainWorker;
|
||||
});
|
||||
async function connect(opts: SyncDatabaseOpts): Promise<Database> {
|
||||
const db = new Database(opts);
|
||||
await db.connect();
|
||||
return db;
|
||||
}
|
||||
|
||||
export { connect, Database, }
|
||||
export { connect, Database }
|
||||
export type { DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult }
|
||||
|
||||
@@ -1,23 +1,155 @@
|
||||
import { SyncOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult } from "@tursodatabase/sync-common";
|
||||
import { connect as promiseConnect, Database } from "./promise.js";
|
||||
import { SyncEngine, initThreadPool, MainWorker } from "./index-turbopack-hack.js";
|
||||
import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-browser-common"
|
||||
import { DatabasePromise } from "@tursodatabase/database-common"
|
||||
import { ProtocolIo, run, DatabaseOpts as SyncDatabaseOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common";
|
||||
import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker } from "./index-turbopack-hack.js";
|
||||
|
||||
let BrowserIO: ProtocolIo = {
|
||||
async read(path: string): Promise<Buffer | Uint8Array | null> {
|
||||
const result = localStorage.getItem(path);
|
||||
if (result == null) {
|
||||
return null;
|
||||
}
|
||||
return new TextEncoder().encode(result);
|
||||
},
|
||||
async write(path: string, data: Buffer | Uint8Array): Promise<void> {
|
||||
const array = new Uint8Array(data);
|
||||
const value = new TextDecoder('utf-8').decode(array);
|
||||
localStorage.setItem(path, value);
|
||||
}
|
||||
};
|
||||
|
||||
function memoryIO(): ProtocolIo {
|
||||
let values = new Map();
|
||||
return {
|
||||
async read(path: string): Promise<Buffer | Uint8Array | null> {
|
||||
return values.get(path);
|
||||
},
|
||||
async write(path: string, data: Buffer | Uint8Array): Promise<void> {
|
||||
values.set(path, data);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
async function init(): Promise<Worker> {
|
||||
await initThreadPool();
|
||||
if (MainWorker == null) {
|
||||
throw new Error("panic: MainWorker is not initialized");
|
||||
}
|
||||
return MainWorker;
|
||||
}
|
||||
|
||||
class Database extends DatabasePromise {
|
||||
#runOpts: RunOpts;
|
||||
#engine: any;
|
||||
#io: ProtocolIo;
|
||||
#guards: SyncEngineGuards;
|
||||
#worker: Worker | null;
|
||||
constructor(opts: SyncDatabaseOpts) {
|
||||
const engine = new SyncEngine({
|
||||
path: opts.path,
|
||||
clientName: opts.clientName,
|
||||
useTransform: opts.transform != null,
|
||||
protocolVersion: SyncEngineProtocolVersion.V1,
|
||||
longPollTimeoutMs: opts.longPollTimeoutMs,
|
||||
tracing: opts.tracing,
|
||||
});
|
||||
super(engine.db() as unknown as any);
|
||||
|
||||
|
||||
let headers = typeof opts.authToken === "function" ? () => ({
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }),
|
||||
...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey })
|
||||
}) : {
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }),
|
||||
...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey })
|
||||
};
|
||||
this.#runOpts = {
|
||||
url: opts.url,
|
||||
headers: headers,
|
||||
preemptionMs: 1,
|
||||
transform: opts.transform,
|
||||
};
|
||||
this.#engine = engine;
|
||||
this.#io = this.memory ? memoryIO() : BrowserIO;
|
||||
this.#guards = new SyncEngineGuards();
|
||||
}
|
||||
/**
|
||||
* connect database and initialize it in case of clean start
|
||||
*/
|
||||
override async connect() {
|
||||
if (!this.memory) {
|
||||
this.#worker = await init();
|
||||
await Promise.all([
|
||||
registerFileAtWorker(this.#worker, this.name),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
}
|
||||
await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect());
|
||||
}
|
||||
/**
|
||||
* pull new changes from the remote database
|
||||
* if {@link SyncDatabaseOpts.longPollTimeoutMs} is set - then server will hold the connection open until either new changes will appear in the database or timeout occurs.
|
||||
* @returns true if new changes were pulled from the remote
|
||||
*/
|
||||
async pull() {
|
||||
const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait()));
|
||||
if (changes.empty()) {
|
||||
return false;
|
||||
}
|
||||
await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes)));
|
||||
return true;
|
||||
}
|
||||
/**
|
||||
* push new local changes to the remote database
|
||||
* if {@link SyncDatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote
|
||||
*/
|
||||
async push() {
|
||||
await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push()));
|
||||
}
|
||||
/**
|
||||
* checkpoint WAL for local database
|
||||
*/
|
||||
async checkpoint() {
|
||||
await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint()));
|
||||
}
|
||||
/**
|
||||
* @returns statistic of current local database
|
||||
*/
|
||||
async stats(): Promise<DatabaseStats> {
|
||||
return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats()));
|
||||
}
|
||||
/**
|
||||
* close the database and relevant files
|
||||
*/
|
||||
async close() {
|
||||
if (this.name != null && this.#worker != null) {
|
||||
await Promise.all([
|
||||
unregisterFileAtWorker(this.#worker, this.name),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
}
|
||||
await super.close();
|
||||
this.#engine.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new database connection asynchronously.
|
||||
*
|
||||
* @param {string} path - Path to the database file.
|
||||
* @param {Object} opts - Options for database behavior.
|
||||
* @returns {Promise<Database>} - A promise that resolves to a Database instance.
|
||||
*/
|
||||
async function connect(opts: SyncOpts): Promise<Database> {
|
||||
return await promiseConnect(opts, x => new SyncEngine(x), async () => {
|
||||
await initThreadPool();
|
||||
if (MainWorker == null) {
|
||||
throw new Error("panic: MainWorker is not initialized");
|
||||
}
|
||||
return MainWorker;
|
||||
});
|
||||
async function connect(opts: SyncDatabaseOpts): Promise<Database> {
|
||||
const db = new Database(opts);
|
||||
await db.connect();
|
||||
return db;
|
||||
}
|
||||
|
||||
export { connect, Database, }
|
||||
export { connect, Database }
|
||||
export type { DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult }
|
||||
|
||||
@@ -1,23 +1,155 @@
|
||||
import { SyncOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult } from "@tursodatabase/sync-common";
|
||||
import { connect as promiseConnect, Database } from "./promise.js";
|
||||
import { SyncEngine, initThreadPool, MainWorker } from "./index-vite-dev-hack.js";
|
||||
import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-browser-common"
|
||||
import { DatabasePromise } from "@tursodatabase/database-common"
|
||||
import { ProtocolIo, run, DatabaseOpts as SyncDatabaseOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common";
|
||||
import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker } from "./index-vite-dev-hack.js";
|
||||
|
||||
let BrowserIO: ProtocolIo = {
|
||||
async read(path: string): Promise<Buffer | Uint8Array | null> {
|
||||
const result = localStorage.getItem(path);
|
||||
if (result == null) {
|
||||
return null;
|
||||
}
|
||||
return new TextEncoder().encode(result);
|
||||
},
|
||||
async write(path: string, data: Buffer | Uint8Array): Promise<void> {
|
||||
const array = new Uint8Array(data);
|
||||
const value = new TextDecoder('utf-8').decode(array);
|
||||
localStorage.setItem(path, value);
|
||||
}
|
||||
};
|
||||
|
||||
function memoryIO(): ProtocolIo {
|
||||
let values = new Map();
|
||||
return {
|
||||
async read(path: string): Promise<Buffer | Uint8Array | null> {
|
||||
return values.get(path);
|
||||
},
|
||||
async write(path: string, data: Buffer | Uint8Array): Promise<void> {
|
||||
values.set(path, data);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
async function init(): Promise<Worker> {
|
||||
await initThreadPool();
|
||||
if (MainWorker == null) {
|
||||
throw new Error("panic: MainWorker is not initialized");
|
||||
}
|
||||
return MainWorker;
|
||||
}
|
||||
|
||||
class Database extends DatabasePromise {
|
||||
#runOpts: RunOpts;
|
||||
#engine: any;
|
||||
#io: ProtocolIo;
|
||||
#guards: SyncEngineGuards;
|
||||
#worker: Worker | null;
|
||||
constructor(opts: SyncDatabaseOpts) {
|
||||
const engine = new SyncEngine({
|
||||
path: opts.path,
|
||||
clientName: opts.clientName,
|
||||
useTransform: opts.transform != null,
|
||||
protocolVersion: SyncEngineProtocolVersion.V1,
|
||||
longPollTimeoutMs: opts.longPollTimeoutMs,
|
||||
tracing: opts.tracing,
|
||||
});
|
||||
super(engine.db() as unknown as any);
|
||||
|
||||
|
||||
let headers = typeof opts.authToken === "function" ? () => ({
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }),
|
||||
...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey })
|
||||
}) : {
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }),
|
||||
...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey })
|
||||
};
|
||||
this.#runOpts = {
|
||||
url: opts.url,
|
||||
headers: headers,
|
||||
preemptionMs: 1,
|
||||
transform: opts.transform,
|
||||
};
|
||||
this.#engine = engine;
|
||||
this.#io = this.memory ? memoryIO() : BrowserIO;
|
||||
this.#guards = new SyncEngineGuards();
|
||||
}
|
||||
/**
|
||||
* connect database and initialize it in case of clean start
|
||||
*/
|
||||
override async connect() {
|
||||
if (!this.memory) {
|
||||
this.#worker = await init();
|
||||
await Promise.all([
|
||||
registerFileAtWorker(this.#worker, this.name),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
}
|
||||
await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect());
|
||||
}
|
||||
/**
|
||||
* pull new changes from the remote database
|
||||
* if {@link SyncDatabaseOpts.longPollTimeoutMs} is set - then server will hold the connection open until either new changes will appear in the database or timeout occurs.
|
||||
* @returns true if new changes were pulled from the remote
|
||||
*/
|
||||
async pull() {
|
||||
const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait()));
|
||||
if (changes.empty()) {
|
||||
return false;
|
||||
}
|
||||
await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes)));
|
||||
return true;
|
||||
}
|
||||
/**
|
||||
* push new local changes to the remote database
|
||||
* if {@link SyncDatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote
|
||||
*/
|
||||
async push() {
|
||||
await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push()));
|
||||
}
|
||||
/**
|
||||
* checkpoint WAL for local database
|
||||
*/
|
||||
async checkpoint() {
|
||||
await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint()));
|
||||
}
|
||||
/**
|
||||
* @returns statistic of current local database
|
||||
*/
|
||||
async stats(): Promise<DatabaseStats> {
|
||||
return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats()));
|
||||
}
|
||||
/**
|
||||
* close the database and relevant files
|
||||
*/
|
||||
async close() {
|
||||
if (this.name != null && this.#worker != null) {
|
||||
await Promise.all([
|
||||
unregisterFileAtWorker(this.#worker, this.name),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
}
|
||||
await super.close();
|
||||
this.#engine.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new database connection asynchronously.
|
||||
*
|
||||
* @param {string} path - Path to the database file.
|
||||
* @param {Object} opts - Options for database behavior.
|
||||
* @returns {Promise<Database>} - A promise that resolves to a Database instance.
|
||||
*/
|
||||
async function connect(opts: SyncOpts): Promise<Database> {
|
||||
return await promiseConnect(opts, x => new SyncEngine(x), async () => {
|
||||
await initThreadPool();
|
||||
if (MainWorker == null) {
|
||||
throw new Error("panic: MainWorker is not initialized");
|
||||
}
|
||||
return MainWorker;
|
||||
});
|
||||
async function connect(opts: SyncDatabaseOpts): Promise<Database> {
|
||||
const db = new Database(opts);
|
||||
await db.connect();
|
||||
return db;
|
||||
}
|
||||
|
||||
export { connect, Database, }
|
||||
export { connect, Database }
|
||||
export type { DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult }
|
||||
|
||||
@@ -331,7 +331,6 @@ test('concurrent-updates', { timeout: 60000 }, async () => {
|
||||
}
|
||||
async function pull(db, i) {
|
||||
try {
|
||||
console.info('pull', i);
|
||||
await db.pull();
|
||||
} catch (e) {
|
||||
console.error('pull', i, e);
|
||||
@@ -343,7 +342,6 @@ test('concurrent-updates', { timeout: 60000 }, async () => {
|
||||
}
|
||||
async function push(db, i) {
|
||||
try {
|
||||
console.info('push', i);
|
||||
await db.push();
|
||||
} catch (e) {
|
||||
console.error('push', i, e);
|
||||
|
||||
@@ -1,113 +0,0 @@
|
||||
import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-browser-common"
|
||||
import { DatabasePromise, DatabaseOpts, NativeDatabase } from "@tursodatabase/database-common"
|
||||
import { ProtocolIo, run, SyncOpts, RunOpts, memoryIO, SyncEngineStats, SyncEngineGuards } from "@tursodatabase/sync-common";
|
||||
|
||||
let BrowserIo: ProtocolIo = {
|
||||
async read(path: string): Promise<Buffer | Uint8Array | null> {
|
||||
const result = localStorage.getItem(path);
|
||||
if (result == null) {
|
||||
return null;
|
||||
}
|
||||
return new TextEncoder().encode(result);
|
||||
},
|
||||
async write(path: string, data: Buffer | Uint8Array): Promise<void> {
|
||||
const array = new Uint8Array(data);
|
||||
const value = new TextDecoder('utf-8').decode(array);
|
||||
localStorage.setItem(path, value);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
class Database extends DatabasePromise {
|
||||
runOpts: RunOpts;
|
||||
engine: any;
|
||||
io: ProtocolIo;
|
||||
worker: Worker | null;
|
||||
fsPath: string | null;
|
||||
guards: SyncEngineGuards;
|
||||
constructor(db: NativeDatabase, io: ProtocolIo, worker: Worker | null, runOpts: RunOpts, engine: any, fsPath: string | null, opts: DatabaseOpts = {}) {
|
||||
super(db, opts)
|
||||
this.io = io;
|
||||
this.worker = worker;
|
||||
this.runOpts = runOpts;
|
||||
this.engine = engine;
|
||||
this.fsPath = fsPath;
|
||||
this.guards = new SyncEngineGuards();
|
||||
}
|
||||
async sync() {
|
||||
await this.push();
|
||||
await this.pull();
|
||||
}
|
||||
async pull() {
|
||||
const changes = await this.guards.wait(async () => await run(this.runOpts, this.io, this.engine, this.engine.wait()));
|
||||
await this.guards.apply(async () => await run(this.runOpts, this.io, this.engine, this.engine.apply(changes)));
|
||||
}
|
||||
async push() {
|
||||
await this.guards.push(async () => await run(this.runOpts, this.io, this.engine, this.engine.push()));
|
||||
}
|
||||
async checkpoint() {
|
||||
await this.guards.checkpoint(async () => await run(this.runOpts, this.io, this.engine, this.engine.checkpoint()));
|
||||
}
|
||||
async stats(): Promise<SyncEngineStats> {
|
||||
return (await run(this.runOpts, this.io, this.engine, this.engine.stats()));
|
||||
}
|
||||
override async close(): Promise<void> {
|
||||
this.db.close();
|
||||
this.engine.close();
|
||||
if (this.fsPath != null && this.worker != null) {
|
||||
await Promise.all([
|
||||
unregisterFileAtWorker(this.worker, this.fsPath),
|
||||
unregisterFileAtWorker(this.worker, `${this.fsPath}-wal`),
|
||||
unregisterFileAtWorker(this.worker, `${this.fsPath}-wal-revert`),
|
||||
unregisterFileAtWorker(this.worker, `${this.fsPath}-info`),
|
||||
unregisterFileAtWorker(this.worker, `${this.fsPath}-changes`),
|
||||
]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new database connection asynchronously.
|
||||
*
|
||||
* @param {string} path - Path to the database file.
|
||||
* @param {Object} opts - Options for database behavior.
|
||||
* @returns {Promise<Database>} - A promise that resolves to a Database instance.
|
||||
*/
|
||||
async function connect(opts: SyncOpts, connect: (any) => any, init: () => Promise<Worker>): Promise<Database> {
|
||||
const engine = connect({
|
||||
path: opts.path,
|
||||
clientName: opts.clientName,
|
||||
tablesIgnore: opts.tablesIgnore,
|
||||
useTransform: opts.transform != null,
|
||||
tracing: opts.tracing,
|
||||
protocolVersion: 1,
|
||||
longPollTimeoutMs: opts.longPollTimeoutMs
|
||||
});
|
||||
const runOpts: RunOpts = {
|
||||
url: opts.url,
|
||||
headers: {
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }),
|
||||
...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey })
|
||||
},
|
||||
preemptionMs: 1,
|
||||
transform: opts.transform,
|
||||
};
|
||||
const isMemory = opts.path == ':memory:';
|
||||
let io = isMemory ? memoryIO() : BrowserIo;
|
||||
|
||||
const worker = await init();
|
||||
if (!isMemory) {
|
||||
await Promise.all([
|
||||
registerFileAtWorker(worker, opts.path),
|
||||
registerFileAtWorker(worker, `${opts.path}-wal`),
|
||||
registerFileAtWorker(worker, `${opts.path}-wal-revert`),
|
||||
registerFileAtWorker(worker, `${opts.path}-info`),
|
||||
registerFileAtWorker(worker, `${opts.path}-changes`),
|
||||
]);
|
||||
}
|
||||
await run(runOpts, io, engine, engine.init());
|
||||
const nativeDb = engine.open();
|
||||
return new Database(nativeDb as any, io, worker, runOpts, engine, isMemory ? null : opts.path, {});
|
||||
}
|
||||
|
||||
export { connect, Database }
|
||||
Reference in New Issue
Block a user