Small adjustments in the sync engine JS

This commit is contained in:
Nikita Sivukhin
2025-08-08 15:37:49 +04:00
parent ca20af6f89
commit d25acf3c2c
5 changed files with 69 additions and 27 deletions

View File

@@ -15,7 +15,7 @@ napi = { version = "3.1.3", default-features = false, features = ["napi6"] }
napi-derive = { version = "3.1.1", default-features = true }
turso_sync_engine = { workspace = true }
turso_core = { workspace = true }
turso_node = { path = "../../bindings/javascript" }
turso_node = { workspace = true }
genawaiter = { version = "0.99.1", default-features = false }
[build-dependencies]

View File

@@ -154,7 +154,7 @@ export interface DatabaseOpts {
export type JsProtocolRequest =
| { type: 'Http', method: string, path: string, body?: Buffer }
| { type: 'FullRead', path: string }
| { type: 'FullWrite', path: string, content: Buffer }
| { type: 'FullWrite', path: string, content: Array<number> }
export interface SyncEngineOpts {
path: string

View File

@@ -21,7 +21,7 @@ pub enum JsProtocolRequest {
},
FullWrite {
path: String,
content: Buffer,
content: Vec<u8>,
},
}
@@ -151,7 +151,7 @@ impl ProtocolIO for JsProtocolIo {
) -> turso_sync_engine::Result<Self::DataCompletion> {
Ok(self.add_request(JsProtocolRequest::FullWrite {
path: path.to_string(),
content: Buffer::from(content),
content,
}))
}
}

View File

@@ -45,17 +45,23 @@ pub struct SyncEngineOpts {
impl SyncEngine {
#[napi(constructor)]
pub fn new(opts: SyncEngineOpts) -> napi::Result<Self> {
let is_memory = opts.path == ":memory:";
let io: Arc<dyn turso_core::IO> = if is_memory {
Arc::new(turso_core::MemoryIO::new())
} else {
Arc::new(turso_core::PlatformIO::new().map_err(|e| {
napi::Error::new(
napi::Status::GenericFailure,
format!("Failed to create IO: {e}"),
)
})?)
};
Ok(SyncEngine {
path: opts.path,
client_name: opts.client_name.unwrap_or("turso-sync-js".to_string()),
wal_pull_batch_size: opts.wal_pull_batch_size.unwrap_or(100),
sync_engine: Arc::new(Mutex::new(None)),
io: Arc::new(turso_core::PlatformIO::new().map_err(|e| {
napi::Error::new(
napi::Status::GenericFailure,
format!("Failed to create IO: {e}"),
)
})?),
io,
protocol: Arc::new(JsProtocolIo::default()),
#[allow(clippy::arc_with_non_send_sync)]
opened: Arc::new(Mutex::new(None)),

View File

@@ -2,19 +2,60 @@
import { SyncEngine } from '#entry-point';
import { Database } from '@tursodatabase/turso';
import * as fs from 'fs';
const GENERATOR_RESUME_IO = 0;
const GENERATOR_RESUME_DONE = 1;
async function process(httpOpts, request) {
// Read the full contents stored at `path`.
// In-memory mode hands back the held value untouched; under Node the file is
// read from disk (null when it does not exist); in the browser the value is
// pulled from localStorage and encoded back to bytes.
async function read(opts, path: string): Promise<Buffer | Uint8Array | null> {
    if (opts.isMemory) {
        return opts.value;
    }
    const inBrowser = typeof window !== 'undefined';
    if (inBrowser) {
        const stored = localStorage.getItem(path);
        return stored == null ? null : new TextEncoder().encode(stored);
    }
    const { promises } = await import('node:fs');
    try {
        return await promises.readFile(path);
    } catch (error) {
        // A missing file is an expected state, not an error.
        if (error.code === 'ENOENT') {
            return null;
        }
        throw error;
    }
}
// Persist `content` (raw bytes as a number array) at `path`.
// In-memory mode simply stashes the array; under Node the bytes are written to
// a uniquely-named temp file and then renamed over the target so the update is
// atomic; in the browser the bytes are decoded to a string for localStorage.
async function write(opts, path: string, content: number[]): Promise<void> {
    if (opts.isMemory) {
        opts.value = content;
        return;
    }
    const bytes = new Uint8Array(content);
    if (typeof window !== 'undefined') {
        localStorage.setItem(path, new TextDecoder().decode(bytes));
        return;
    }
    const { promises } = await import('node:fs');
    const stamp = Math.floor(Date.now() / 1000);
    const rand = Math.floor(Math.random() * 1000000000);
    const tmpPath = `${path}.tmp.${stamp}.${rand}`;
    await promises.writeFile(tmpPath, bytes);
    await promises.rename(tmpPath, path);
}
async function process(opts, request) {
const requestType = request.request();
const completion = request.completion();
if (requestType.type == 'Http') {
try {
const response = await fetch(`${httpOpts.url}${requestType.path}`, {
const response = await fetch(`${opts.url}${requestType.path}`, {
method: requestType.method,
headers: httpOpts.headers,
headers: opts.headers,
body: requestType.body
});
completion.status(response.status);
@@ -32,23 +73,17 @@ async function process(httpOpts, request) {
}
} else if (requestType.type == 'FullRead') {
try {
const metadata = await fs.promises.readFile(requestType.path);
completion.push(metadata);
const metadata = await read(opts.metadata, requestType.path);
if (metadata != null) {
completion.push(metadata);
}
completion.done();
} catch (error) {
if (error.code === 'ENOENT') {
completion.done();
} else {
completion.poison(`metadata read error: ${error}`);
}
completion.poison(`metadata read error: ${error}`);
}
} else if (requestType.type == 'FullWrite') {
try {
const unix = Math.floor(Date.now() / 1000);
const nonce = Math.floor(Math.random() * 1000000000);
const tmp = `${requestType.path}.tmp.${unix}.${nonce}`;
await fs.promises.writeFile(tmp, requestType.content);
await fs.promises.rename(tmp, requestType.path);
await write(opts.metadata, requestType.path, requestType.content);
completion.done();
} catch (error) {
completion.poison(`metadata write error: ${error}`);
@@ -87,7 +122,8 @@ export async function connect(opts: ConnectOpts): Database & Sync {
headers: {
...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }),
...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey })
}
},
metadata: opts.path == ':memory:' ? { isMemory: true, value: null } : { isMemory: false }
};
await run(httpOpts, engine, engine.init());
const nativeDb = engine.open();