small adjustments in the sync-engine

This commit is contained in:
Nikita Sivukhin
2025-08-08 01:12:56 +04:00
parent ba88d17f29
commit 405e7f56a6
5 changed files with 29 additions and 35 deletions

View File

@@ -1,4 +1,4 @@
use std::{path::Path, sync::Arc};
use std::sync::Arc;
use crate::{
database_sync_operations::{
@@ -15,8 +15,8 @@ use crate::{
#[derive(Debug)]
pub struct DatabaseSyncEngineOpts {
client_name: String,
wal_pull_batch_size: u64,
pub client_name: String,
pub wal_pull_batch_size: u64,
}
pub struct DatabaseSyncEngine<P: ProtocolIO> {
@@ -70,17 +70,13 @@ impl<C: ProtocolIO> DatabaseSyncEngine<C> {
pub async fn new(
coro: &Coro,
io: Arc<dyn turso_core::IO>,
http_client: Arc<C>,
path: &Path,
protocol: Arc<C>,
path: &str,
opts: DatabaseSyncEngineOpts,
) -> Result<Self> {
let Some(path) = path.to_str() else {
let error = format!("invalid path: {path:?}");
return Err(Error::DatabaseSyncEngineError(error));
};
let mut db = Self {
io,
protocol: http_client,
protocol,
draft_path: format!("{path}-draft"),
synced_path: format!("{path}-synced"),
meta_path: format!("{path}-info"),

View File

@@ -121,7 +121,7 @@ pub async fn wal_pull<'a, C: ProtocolIO, U: AsyncFnMut(&'a Coro, u64) -> Result<
while let Some(chunk) = data.poll_data()? {
let mut chunk = chunk.data();
while !chunk.is_empty() {
let to_fill = WAL_FRAME_SIZE - buffer.len();
let to_fill = (WAL_FRAME_SIZE - buffer.len()).min(chunk.len());
buffer.extend_from_slice(&chunk[0..to_fill]);
chunk = &chunk[to_fill..];
@@ -455,8 +455,8 @@ async fn wal_pull_http<C: ProtocolIO>(
end_frame: u64,
) -> Result<WalHttpPullResult<C::DataCompletion>> {
let completion = client.http(
http::Method::GET,
format!("/sync/{generation}/{start_frame}/{end_frame}"),
"GET",
&format!("/sync/{generation}/{start_frame}/{end_frame}"),
None,
)?;
let status = wait_status(coro, &completion).await?;
@@ -490,8 +490,8 @@ async fn wal_push_http<C: ProtocolIO>(
.map(|baton| format!("/{baton}"))
.unwrap_or("".to_string());
let completion = client.http(
http::Method::POST,
format!("/sync/{generation}/{start_frame}/{end_frame}{baton}"),
"POST",
&format!("/sync/{generation}/{start_frame}/{end_frame}{baton}"),
Some(frames),
)?;
let status = wait_status(coro, &completion).await?;
@@ -506,7 +506,7 @@ async fn wal_push_http<C: ProtocolIO>(
}
async fn db_info_http<C: ProtocolIO>(coro: &Coro, client: &C) -> Result<DbSyncInfo> {
let completion = client.http(http::Method::GET, "/info".to_string(), None)?;
let completion = client.http("GET", "/info", None)?;
let status = wait_status(coro, &completion).await?;
let status_body = wait_full_body(coro, &completion).await?;
if status != http::StatusCode::OK {
@@ -522,7 +522,7 @@ async fn db_bootstrap_http<C: ProtocolIO>(
client: &C,
generation: u64,
) -> Result<C::DataCompletion> {
let completion = client.http(http::Method::GET, format!("/export/{generation}"), None)?;
let completion = client.http("GET", &format!("/export/{generation}"), None)?;
let status = wait_status(coro, &completion).await?;
if status != http::StatusCode::OK.as_u16() {
return Err(Error::DatabaseSyncEngineError(format!(

View File

@@ -93,7 +93,14 @@ mod tests {
let server = self.sync_server.clone();
let db = self
.run(genawaiter::sync::Gen::new(|coro| async move {
DatabaseSyncEngine::new(&coro, io, Arc::new(server), &local_path, opts).await
DatabaseSyncEngine::new(
&coro,
io,
Arc::new(server),
local_path.to_str().unwrap(),
opts,
)
.await
}))
.await
.unwrap();

View File

@@ -5,9 +5,9 @@ pub trait DataPollResult {
}
pub trait DataCompletion {
type HttpPollResult: DataPollResult;
type DataPollResult: DataPollResult;
fn status(&self) -> Result<Option<u16>>;
fn poll_data(&self) -> Result<Option<Self::HttpPollResult>>;
fn poll_data(&self) -> Result<Option<Self::DataPollResult>>;
fn is_done(&self) -> Result<bool>;
}
@@ -15,10 +15,6 @@ pub trait ProtocolIO {
type DataCompletion: DataCompletion;
fn full_read(&self, path: &str) -> Result<Self::DataCompletion>;
fn full_write(&self, path: &str, content: Vec<u8>) -> Result<Self::DataCompletion>;
fn http(
&self,
method: http::Method,
path: String,
body: Option<Vec<u8>>,
) -> Result<Self::DataCompletion>;
fn http(&self, method: &str, path: &str, body: Option<Vec<u8>>)
-> Result<Self::DataCompletion>;
}

View File

@@ -74,7 +74,7 @@ impl TestDataCompletion {
}
impl DataCompletion for TestDataCompletion {
type HttpPollResult = TestDataPollResult;
type DataPollResult = TestDataPollResult;
fn status(&self) -> Result<Option<u16>> {
let poison = self.poisoned.lock().unwrap();
@@ -86,7 +86,7 @@ impl DataCompletion for TestDataCompletion {
Ok(*self.status.lock().unwrap())
}
fn poll_data(&self) -> Result<Option<Self::HttpPollResult>> {
fn poll_data(&self) -> Result<Option<Self::DataPollResult>> {
let poison = self.poisoned.lock().unwrap();
if poison.is_some() {
return Err(Error::DatabaseSyncEngineError(format!(
@@ -138,17 +138,12 @@ impl TestProtocolIo {
impl ProtocolIO for TestProtocolIo {
type DataCompletion = TestDataCompletion;
fn http(
&self,
method: http::Method,
path: String,
data: Option<Vec<u8>>,
) -> Result<TestDataCompletion> {
fn http(&self, method: &str, path: &str, data: Option<Vec<u8>>) -> Result<TestDataCompletion> {
let completion = TestDataCompletion::new();
{
let completion = completion.clone();
let path = &path[1..].split("/").collect::<Vec<_>>();
match (method.as_str(), path.as_slice()) {
match (method, path.as_slice()) {
("GET", ["info"]) => {
self.schedule(completion, |s, c| async move { s.db_info(c).await });
}