adjust sync native package

This commit is contained in:
Nikita Sivukhin
2025-09-24 18:43:50 +04:00
parent 28c9850b57
commit afbfa98a8d
7 changed files with 284 additions and 154 deletions

View File

@@ -1,6 +1,6 @@
import { DatabasePromise, DatabaseOpts, NativeDatabase } from "@tursodatabase/database-common"
import { ProtocolIo, run, SyncOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, SyncEngineStats, SyncEngineGuards } from "@tursodatabase/sync-common";
import { Database as NativeDB, SyncEngine } from "#index";
import { DatabasePromise } from "@tursodatabase/database-common"
import { ProtocolIo, run, DatabaseOpts as SyncDatabaseOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common";
import { SyncEngine, SyncEngineProtocolVersion } from "#index";
import { promises } from "node:fs";
let NodeIO: ProtocolIo = {
@@ -40,70 +40,95 @@ function memoryIO(): ProtocolIo {
}
};
class Database extends DatabasePromise {
runOpts: RunOpts;
engine: any;
io: ProtocolIo;
guards: SyncEngineGuards
constructor(db: NativeDatabase, io: ProtocolIo, runOpts: RunOpts, engine: any, opts: DatabaseOpts = {}) {
super(db, opts)
this.runOpts = runOpts;
this.engine = engine;
this.io = io;
this.guards = new SyncEngineGuards();
#runOpts: RunOpts;
#engine: any;
#io: ProtocolIo;
#guards: SyncEngineGuards
constructor(opts: SyncDatabaseOpts) {
const engine = new SyncEngine({
path: opts.path,
clientName: opts.clientName,
useTransform: opts.transform != null,
protocolVersion: SyncEngineProtocolVersion.V1,
longPollTimeoutMs: opts.longPollTimeoutMs,
tracing: opts.tracing,
});
super(engine.db() as unknown as any);
let headers = typeof opts.authToken === "function" ? () => ({
...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }),
...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey })
}) : {
...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }),
...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey })
};
this.#runOpts = {
url: opts.url,
headers: headers,
preemptionMs: 1,
transform: opts.transform,
};
this.#engine = engine;
this.#io = this.memory ? memoryIO() : NodeIO;
this.#guards = new SyncEngineGuards();
}
async sync() {
await this.push();
await this.pull();
/**
* connect database and initialize it in case of clean start
*/
override async connect() {
await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect());
}
/**
* pull new changes from the remote database
* if {@link SyncDatabaseOpts.longPollTimeoutMs} is set - then server will hold the connection open until either new changes will appear in the database or timeout occurs.
* @returns true if new changes were pulled from the remote
*/
async pull() {
const changes = await this.guards.wait(async () => await run(this.runOpts, this.io, this.engine, this.engine.wait()));
await this.guards.apply(async () => await run(this.runOpts, this.io, this.engine, this.engine.apply(changes)));
const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait()));
if (changes.empty()) {
return false;
}
await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes)));
return true;
}
/**
* push new local changes to the remote database
* if {@link SyncDatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote
*/
async push() {
await this.guards.push(async () => await run(this.runOpts, this.io, this.engine, this.engine.push()));
await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push()));
}
/**
* checkpoint WAL for local database
*/
async checkpoint() {
await this.guards.checkpoint(async () => await run(this.runOpts, this.io, this.engine, this.engine.checkpoint()));
await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint()));
}
async stats(): Promise<SyncEngineStats> {
return (await run(this.runOpts, this.io, this.engine, this.engine.stats()));
/**
* @returns statistic of current local database
*/
async stats(): Promise<DatabaseStats> {
return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats()));
}
/**
* close the database
*/
override async close(): Promise<void> {
this.engine.close();
this.#engine.close();
}
}
/**
* Creates a new database connection asynchronously.
*
* @param {string} path - Path to the database file.
* @param {Object} opts - Options for database behavior.
* @returns {Promise<Database>} - A promise that resolves to a Database instance.
*/
async function connect(opts: SyncOpts): Promise<Database> {
const engine = new SyncEngine({
path: opts.path,
clientName: opts.clientName,
tablesIgnore: opts.tablesIgnore,
useTransform: opts.transform != null,
tracing: opts.tracing,
protocolVersion: 1,
longPollTimeoutMs: opts.longPollTimeoutMs,
});
const runOpts: RunOpts = {
url: opts.url,
headers: {
...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }),
...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey })
},
preemptionMs: 1,
transform: opts.transform,
};
let io = opts.path == ':memory:' ? memoryIO() : NodeIO;
await run(runOpts, io, engine, engine.init());
const nativeDb = engine.open();
return new Database(nativeDb as any, io, runOpts, engine, {});
async function connect(opts: SyncDatabaseOpts): Promise<Database> {
const db = new Database(opts);
await db.connect();
return db;
}
export { connect, Database, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult }