diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index fdb879ab9..e85a98b0a 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -1,4 +1,4 @@ -use std::{path::Path, sync::Arc}; +use std::sync::Arc; use crate::{ database_sync_operations::{ @@ -15,8 +15,8 @@ use crate::{ #[derive(Debug)] pub struct DatabaseSyncEngineOpts { - client_name: String, - wal_pull_batch_size: u64, + pub client_name: String, + pub wal_pull_batch_size: u64, } pub struct DatabaseSyncEngine { @@ -70,17 +70,13 @@ impl DatabaseSyncEngine { pub async fn new( coro: &Coro, io: Arc, - http_client: Arc, - path: &Path, + protocol: Arc, + path: &str, opts: DatabaseSyncEngineOpts, ) -> Result { - let Some(path) = path.to_str() else { - let error = format!("invalid path: {path:?}"); - return Err(Error::DatabaseSyncEngineError(error)); - }; let mut db = Self { io, - protocol: http_client, + protocol, draft_path: format!("{path}-draft"), synced_path: format!("{path}-synced"), meta_path: format!("{path}-info"), diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index eff3d8826..9ae04ca9a 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -121,7 +121,7 @@ pub async fn wal_pull<'a, C: ProtocolIO, U: AsyncFnMut(&'a Coro, u64) -> Result< while let Some(chunk) = data.poll_data()? { let mut chunk = chunk.data(); while !chunk.is_empty() { - let to_fill = WAL_FRAME_SIZE - buffer.len(); + let to_fill = (WAL_FRAME_SIZE - buffer.len()).min(chunk.len()); buffer.extend_from_slice(&chunk[0..to_fill]); chunk = &chunk[to_fill..]; @@ -455,8 +455,8 @@ async fn wal_pull_http( end_frame: u64, ) -> Result> { let completion = client.http( - http::Method::GET, - format!("/sync/{generation}/{start_frame}/{end_frame}"), + "GET", + &format!("/sync/{generation}/{start_frame}/{end_frame}"), None, )?; let status = wait_status(coro, &completion).await?; @@ -490,8 +490,8 @@ async fn wal_push_http( .map(|baton| format!("/{baton}")) .unwrap_or("".to_string()); let completion = client.http( - http::Method::POST, - format!("/sync/{generation}/{start_frame}/{end_frame}{baton}"), + "POST", + &format!("/sync/{generation}/{start_frame}/{end_frame}{baton}"), Some(frames), )?; let status = wait_status(coro, &completion).await?; @@ -506,7 +506,7 @@ async fn wal_push_http( } async fn db_info_http(coro: &Coro, client: &C) -> Result { - let completion = client.http(http::Method::GET, "/info".to_string(), None)?; + let completion = client.http("GET", "/info", None)?; let status = wait_status(coro, &completion).await?; let status_body = wait_full_body(coro, &completion).await?; if status != http::StatusCode::OK { @@ -522,7 +522,7 @@ async fn db_bootstrap_http( client: &C, generation: u64, ) -> Result { - let completion = client.http(http::Method::GET, format!("/export/{generation}"), None)?; + let completion = client.http("GET", &format!("/export/{generation}"), None)?; let status = wait_status(coro, &completion).await?; if status != http::StatusCode::OK.as_u16() { return Err(Error::DatabaseSyncEngineError(format!( diff --git a/packages/turso-sync-engine/src/lib.rs b/packages/turso-sync-engine/src/lib.rs index 4b0f723c1..d0f61dc21 100644 --- a/packages/turso-sync-engine/src/lib.rs +++ b/packages/turso-sync-engine/src/lib.rs @@ -93,7 +93,14 @@ mod tests { let server = self.sync_server.clone(); let db = self .run(genawaiter::sync::Gen::new(|coro| async move { - DatabaseSyncEngine::new(&coro, io, Arc::new(server), &local_path, opts).await + DatabaseSyncEngine::new( + &coro, + io, + Arc::new(server), + local_path.to_str().unwrap(), + opts, + ) + .await })) .await .unwrap(); diff --git a/packages/turso-sync-engine/src/protocol_io.rs b/packages/turso-sync-engine/src/protocol_io.rs index f1f599aa7..77577d6c5 100644 --- a/packages/turso-sync-engine/src/protocol_io.rs +++ b/packages/turso-sync-engine/src/protocol_io.rs @@ -5,9 +5,9 @@ pub trait DataPollResult { } pub trait DataCompletion { - type HttpPollResult: DataPollResult; + type DataPollResult: DataPollResult; fn status(&self) -> Result>; - fn poll_data(&self) -> Result>; + fn poll_data(&self) -> Result>; fn is_done(&self) -> Result; } @@ -15,10 +15,6 @@ pub trait ProtocolIO { type DataCompletion: DataCompletion; fn full_read(&self, path: &str) -> Result; fn full_write(&self, path: &str, content: Vec) -> Result; - fn http( - &self, - method: http::Method, - path: String, - body: Option>, - ) -> Result; + fn http(&self, method: &str, path: &str, body: Option>) + -> Result; } diff --git a/packages/turso-sync-engine/src/test_protocol_io.rs b/packages/turso-sync-engine/src/test_protocol_io.rs index eaef3b6bb..09e00e265 100644 --- a/packages/turso-sync-engine/src/test_protocol_io.rs +++ b/packages/turso-sync-engine/src/test_protocol_io.rs @@ -74,7 +74,7 @@ impl TestDataCompletion { } impl DataCompletion for TestDataCompletion { - type HttpPollResult = TestDataPollResult; + type DataPollResult = TestDataPollResult; fn status(&self) -> Result> { let poison = self.poisoned.lock().unwrap(); @@ -86,7 +86,7 @@ impl DataCompletion for TestDataCompletion { Ok(*self.status.lock().unwrap()) } - fn poll_data(&self) -> Result> { + fn poll_data(&self) -> Result> { let poison = self.poisoned.lock().unwrap(); if poison.is_some() { return Err(Error::DatabaseSyncEngineError(format!( @@ -138,17 +138,12 @@ impl TestProtocolIo { impl ProtocolIO for TestProtocolIo { type DataCompletion = TestDataCompletion; - fn http( - &self, - method: http::Method, - path: String, - data: Option>, - ) -> Result { + fn http(&self, method: &str, path: &str, data: Option>) -> Result { let completion = TestDataCompletion::new(); { let completion = completion.clone(); let path = &path[1..].split("/").collect::>(); - match (method.as_str(), path.as_slice()) { + match (method, path.as_slice()) { ("GET", ["info"]) => { self.schedule(completion, |s, c| async move { s.db_info(c).await }); }