mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-26 03:14:23 +01:00
fix clippy
This commit is contained in:
@@ -30,7 +30,7 @@ impl Builder {
|
||||
Self {
|
||||
path: path.to_string(),
|
||||
sync_url: sync_url.to_string(),
|
||||
auth_token: auth_token,
|
||||
auth_token,
|
||||
encryption_key: None,
|
||||
connector: None,
|
||||
}
|
||||
@@ -48,12 +48,8 @@ impl Builder {
|
||||
}
|
||||
}
|
||||
pub async fn build(self) -> Result<Database> {
|
||||
let path = PathBuf::try_from(self.path)
|
||||
.map_err(|e| Error::DatabaseSyncError(format!("invalid synced database path: {e}")))?;
|
||||
let connector = self
|
||||
.connector
|
||||
.map(Ok)
|
||||
.unwrap_or_else(|| default_connector())?;
|
||||
let path = PathBuf::from(self.path);
|
||||
let connector = self.connector.map(Ok).unwrap_or_else(default_connector)?;
|
||||
let executor = TokioExecutor::new();
|
||||
let client = hyper_util::client::legacy::Builder::new(executor).build(connector);
|
||||
let sync_server = TursoSyncServer::new(
|
||||
@@ -92,7 +88,7 @@ impl Database {
|
||||
pub fn default_connector() -> Result<HttpsConnector<HttpConnector>> {
|
||||
Ok(HttpsConnectorBuilder::new()
|
||||
.with_native_roots()
|
||||
.map_err(|e| Error::DatabaseSyncError(format!("unable to configure CA roots: {}", e)))?
|
||||
.map_err(|e| Error::DatabaseSyncError(format!("unable to configure CA roots: {e}")))?
|
||||
.https_or_http()
|
||||
.enable_http1()
|
||||
.build())
|
||||
|
||||
@@ -73,9 +73,9 @@ impl<S: SyncServer, F: Filesystem> DatabaseInner<S, F> {
|
||||
let path_str = path
|
||||
.to_str()
|
||||
.ok_or_else(|| Error::DatabaseSyncError(format!("invalid path: {path:?}")))?;
|
||||
let draft_path = PathBuf::from(format!("{}-draft", path_str));
|
||||
let synced_path = PathBuf::from(format!("{}-synced", path_str));
|
||||
let meta_path = PathBuf::from(format!("{}-info", path_str));
|
||||
let draft_path = PathBuf::from(format!("{path_str}-draft"));
|
||||
let synced_path = PathBuf::from(format!("{path_str}-synced"));
|
||||
let meta_path = PathBuf::from(format!("{path_str}-info"));
|
||||
let database_container = Arc::new(RwLock::new(ActiveDatabaseContainer {
|
||||
db: None,
|
||||
active_type: ActiveDatabase::Draft,
|
||||
@@ -202,9 +202,9 @@ impl<S: SyncServer, F: Filesystem> DatabaseInner<S, F> {
|
||||
let draft_exists = self.filesystem.exists_file(&self.draft_path).await?;
|
||||
let synced_exists = self.filesystem.exists_file(&self.synced_path).await?;
|
||||
if !draft_exists || !synced_exists {
|
||||
return Err(Error::DatabaseSyncError(format!(
|
||||
"Draft or Synced files doesn't exists, but metadata is"
|
||||
)));
|
||||
return Err(Error::DatabaseSyncError(
|
||||
"Draft or Synced files doesn't exists, but metadata is".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
// Synced db is active - we need to finish transfer from Synced to Draft then
|
||||
@@ -302,10 +302,7 @@ impl<S: SyncServer, F: Filesystem> DatabaseInner<S, F> {
|
||||
})
|
||||
}
|
||||
|
||||
async fn write_meta(
|
||||
&self,
|
||||
update: impl Fn(&mut DatabaseMetadata) -> (),
|
||||
) -> Result<DatabaseMetadata> {
|
||||
async fn write_meta(&self, update: impl Fn(&mut DatabaseMetadata)) -> Result<DatabaseMetadata> {
|
||||
let mut meta = self.meta().clone();
|
||||
update(&mut meta);
|
||||
// todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated
|
||||
@@ -358,7 +355,7 @@ impl<S: SyncServer, F: Filesystem> DatabaseInner<S, F> {
|
||||
};
|
||||
while let Some(mut chunk) = data.read_chunk().await? {
|
||||
// chunk is arbitrary - aggregate groups of FRAME_SIZE bytes out from the chunks stream
|
||||
while chunk.len() > 0 {
|
||||
while !chunk.is_empty() {
|
||||
let to_fill = FRAME_SIZE - buffer.len();
|
||||
let prefix = chunk.split_to(to_fill.min(chunk.len()));
|
||||
buffer.extend_from_slice(&prefix);
|
||||
@@ -542,9 +539,9 @@ impl<S: SyncServer, F: Filesystem> DatabaseInner<S, F> {
|
||||
|
||||
let draft_path_str = self.draft_path.to_str().unwrap_or("");
|
||||
let clean_path_str = self.synced_path.to_str().unwrap_or("");
|
||||
let draft_wal = PathBuf::from(format!("{}-wal", draft_path_str));
|
||||
let clean_wal = PathBuf::from(format!("{}-wal", clean_path_str));
|
||||
let draft_shm = PathBuf::from(format!("{}-shm", draft_path_str));
|
||||
let draft_wal = PathBuf::from(format!("{draft_path_str}-wal"));
|
||||
let clean_wal = PathBuf::from(format!("{clean_path_str}-wal"));
|
||||
let draft_shm = PathBuf::from(format!("{draft_path_str}-shm"));
|
||||
self.filesystem
|
||||
.copy_file(&self.synced_path, &self.draft_path)
|
||||
.await?;
|
||||
@@ -568,7 +565,7 @@ impl<S: SyncServer, F: Filesystem> DatabaseInner<S, F> {
|
||||
}
|
||||
|
||||
let clean_path_str = self.synced_path.to_str().unwrap_or("");
|
||||
let clean_wal_path = PathBuf::from(format!("{}-wal", clean_path_str));
|
||||
let clean_wal_path = PathBuf::from(format!("{clean_path_str}-wal"));
|
||||
let wal_size = WAL_HEADER + FRAME_SIZE * self.meta().synced_frame_no;
|
||||
tracing::debug!(
|
||||
"reset Synced DB WAL to the size of {} frames",
|
||||
@@ -617,7 +614,7 @@ mod tests {
|
||||
sql: &str,
|
||||
) -> Result<Vec<Vec<Value>>> {
|
||||
let mut rows = db.query(sql, ()).await?;
|
||||
Ok(convert_rows(&mut rows).await?)
|
||||
convert_rows(&mut rows).await
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -807,7 +804,7 @@ mod tests {
|
||||
let db = DatabaseInner::new(
|
||||
TestFilesystem::new(ctx.clone()),
|
||||
server.clone(),
|
||||
&dir.path().join(&format!("local-{}.db", i)),
|
||||
&dir.path().join(format!("local-{i}.db")),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -889,7 +886,7 @@ mod tests {
|
||||
let server = server.clone();
|
||||
let sync_lock = sync_lock.clone();
|
||||
async move {
|
||||
let local_path = dir.join(format!("local-{}.db", i));
|
||||
let local_path = dir.join(format!("local-{i}.db"));
|
||||
let fs = TestFilesystem::new(ctx.clone());
|
||||
let mut db = DatabaseInner::new(fs, server.clone(), &local_path)
|
||||
.await
|
||||
@@ -936,7 +933,7 @@ mod tests {
|
||||
let mut session = ctx.fault_session();
|
||||
let mut it = 0;
|
||||
while let Some(strategy) = session.next().await {
|
||||
let local_path = dir.path().join(format!("local-{}.db", it));
|
||||
let local_path = dir.path().join(format!("local-{it}.db"));
|
||||
it += 1;
|
||||
|
||||
let fs = TestFilesystem::new(ctx.clone());
|
||||
@@ -992,14 +989,14 @@ mod tests {
|
||||
let mut session = ctx.fault_session();
|
||||
let mut it = 0;
|
||||
while let Some(strategy) = session.next().await {
|
||||
let server_path = dir.path().join(format!("server-{}.db", it));
|
||||
let server_path = dir.path().join(format!("server-{it}.db"));
|
||||
let server = TestSyncServer::new(ctx.clone(), &server_path, opts.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
server.execute("CREATE TABLE t(x)", ()).await.unwrap();
|
||||
|
||||
let local_path = dir.path().join(format!("local-{}.db", it));
|
||||
let local_path = dir.path().join(format!("local-{it}.db"));
|
||||
it += 1;
|
||||
|
||||
let fs = TestFilesystem::new(ctx.clone());
|
||||
@@ -1053,7 +1050,7 @@ mod tests {
|
||||
let mut session = ctx.fault_session();
|
||||
let mut it = 0;
|
||||
while let Some(strategy) = session.next().await {
|
||||
let server_path = dir.path().join(format!("server-{}.db", it));
|
||||
let server_path = dir.path().join(format!("server-{it}.db"));
|
||||
let server = TestSyncServer::new(ctx.clone(), &server_path, opts.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1068,7 +1065,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let local_path = dir.path().join(format!("local-{}.db", it));
|
||||
let local_path = dir.path().join(format!("local-{it}.db"));
|
||||
it += 1;
|
||||
|
||||
let fs = TestFilesystem::new(ctx.clone());
|
||||
|
||||
@@ -61,7 +61,7 @@ impl Filesystem for TokioFilesystem {
|
||||
|
||||
async fn write_file(&self, file: &mut Self::File, buf: &[u8]) -> Result<()> {
|
||||
tracing::debug!("write buffer of size {} to file", buf.len());
|
||||
file.write_all(&buf).await?;
|
||||
file.write_all(buf).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -47,18 +47,17 @@ impl DatabaseMetadata {
|
||||
tracing::debug!("write metadata to {:?}: {:?}", path, self);
|
||||
let directory = path.parent().ok_or_else(|| {
|
||||
Error::MetadataError(format!(
|
||||
"unable to get parent of the provided path: {:?}",
|
||||
path
|
||||
"unable to get parent of the provided path: {path:?}",
|
||||
))
|
||||
})?;
|
||||
let filename = path
|
||||
.file_name()
|
||||
.and_then(|x| x.to_str())
|
||||
.ok_or_else(|| Error::MetadataError(format!("unable to get filename: {:?}", path)))?;
|
||||
.ok_or_else(|| Error::MetadataError(format!("unable to get filename: {path:?}")))?;
|
||||
|
||||
let timestamp = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
|
||||
let timestamp = timestamp.map_err(|e| {
|
||||
Error::MetadataError(format!("failed to get current time for temp file: {}", e))
|
||||
Error::MetadataError(format!("failed to get current time for temp file: {e}"))
|
||||
})?;
|
||||
let temp_name = format!("{}.tmp.{}", filename, timestamp.as_nanos());
|
||||
let temp_path = directory.join(temp_name);
|
||||
@@ -72,7 +71,7 @@ impl DatabaseMetadata {
|
||||
}
|
||||
drop(temp_file);
|
||||
if result.is_ok() {
|
||||
result = fs.rename_file(&temp_path, &path).await;
|
||||
result = fs.rename_file(&temp_path, path).await;
|
||||
}
|
||||
if result.is_err() {
|
||||
let _ = fs.remove_file(&temp_path).await.inspect_err(|e| {
|
||||
|
||||
@@ -96,7 +96,7 @@ impl SyncServer for TestSyncServer {
|
||||
|
||||
let state = self.state.lock().await;
|
||||
let Some(generation) = state.generations.get(&generation_id) else {
|
||||
return Err(Error::DatabaseSyncError(format!("generation not found")));
|
||||
return Err(Error::DatabaseSyncError("generation not found".to_string()));
|
||||
};
|
||||
Ok(TestStream::new(
|
||||
self.ctx.clone(),
|
||||
@@ -110,7 +110,7 @@ impl SyncServer for TestSyncServer {
|
||||
|
||||
let state = self.state.lock().await;
|
||||
let Some(generation) = state.generations.get(&generation_id) else {
|
||||
return Err(Error::DatabaseSyncError(format!("generation not found")));
|
||||
return Err(Error::DatabaseSyncError("generation not found".to_string()));
|
||||
};
|
||||
let mut data = Vec::new();
|
||||
for frame_no in start_frame..start_frame + self.opts.pull_batch_size {
|
||||
@@ -120,7 +120,7 @@ impl SyncServer for TestSyncServer {
|
||||
};
|
||||
data.extend_from_slice(frame);
|
||||
}
|
||||
if data.len() == 0 {
|
||||
if data.is_empty() {
|
||||
let last_generation = state.generations.get(&state.generation).unwrap();
|
||||
return Err(Error::PullNeedCheckpoint(DbSyncStatus {
|
||||
baton: None,
|
||||
@@ -152,7 +152,9 @@ impl SyncServer for TestSyncServer {
|
||||
let mut session = {
|
||||
let mut state = self.state.lock().await;
|
||||
if state.generation != generation_id {
|
||||
return Err(Error::DatabaseSyncError(format!("generation id mismatch")));
|
||||
return Err(Error::DatabaseSyncError(
|
||||
"generation id mismatch".to_string(),
|
||||
));
|
||||
}
|
||||
let baton_str = baton.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
|
||||
let session = match state.sessions.get(&baton_str) {
|
||||
@@ -174,9 +176,9 @@ impl SyncServer for TestSyncServer {
|
||||
let mut offset = 0;
|
||||
for frame_no in start_frame..end_frame {
|
||||
if offset + FRAME_SIZE > frames.len() {
|
||||
return Err(Error::DatabaseSyncError(format!(
|
||||
"unexpected length of frames data"
|
||||
)));
|
||||
return Err(Error::DatabaseSyncError(
|
||||
"unexpected length of frames data".to_string(),
|
||||
));
|
||||
}
|
||||
if !session.in_txn {
|
||||
session.conn.wal_insert_begin()?;
|
||||
@@ -251,7 +253,7 @@ impl TestSyncServer {
|
||||
opts: Arc::new(opts),
|
||||
state: Arc::new(Mutex::new(TestSyncServerState {
|
||||
generation: 1,
|
||||
generations: generations,
|
||||
generations,
|
||||
sessions: HashMap::new(),
|
||||
})),
|
||||
})
|
||||
|
||||
@@ -33,7 +33,7 @@ pub struct TursoSyncServer {
|
||||
fn sync_server_error(status: http::StatusCode, body: impl Buf) -> Error {
|
||||
let mut body_str = String::new();
|
||||
if let Err(e) = body.reader().read_to_string(&mut body_str) {
|
||||
Error::SyncServerError(status, format!("unable to read response body: {}", e))
|
||||
Error::SyncServerError(status, format!("unable to read response body: {e}"))
|
||||
} else {
|
||||
Error::SyncServerError(status, body_str)
|
||||
}
|
||||
@@ -57,7 +57,7 @@ impl Stream for HyperStream {
|
||||
let frame = frame.map_err(Error::HyperResponse)?;
|
||||
let frame = frame
|
||||
.into_data()
|
||||
.map_err(|_| Error::DatabaseSyncError(format!("failed to read export chunk")))?;
|
||||
.map_err(|_| Error::DatabaseSyncError("failed to read export chunk".to_string()))?;
|
||||
Ok(Some(frame))
|
||||
}
|
||||
}
|
||||
@@ -67,7 +67,7 @@ impl TursoSyncServer {
|
||||
let auth_token_header = opts
|
||||
.auth_token
|
||||
.as_ref()
|
||||
.map(|token| hyper::header::HeaderValue::from_str(&format!("Bearer {}", token)))
|
||||
.map(|token| hyper::header::HeaderValue::from_str(&format!("Bearer {token}")))
|
||||
.transpose()
|
||||
.map_err(|e| Error::Http(e.into()))?;
|
||||
Ok(Self {
|
||||
@@ -150,8 +150,8 @@ impl SyncServer for TursoSyncServer {
|
||||
return Err(sync_server_error(status_code, body));
|
||||
}
|
||||
|
||||
let status: DbSyncStatus;
|
||||
status = serde_json::from_reader(body.reader()).map_err(Error::JsonDecode)?;
|
||||
let status: DbSyncStatus =
|
||||
serde_json::from_reader(body.reader()).map_err(Error::JsonDecode)?;
|
||||
|
||||
match status.status.as_str() {
|
||||
"ok" => Ok(status),
|
||||
@@ -185,8 +185,8 @@ impl SyncServer for TursoSyncServer {
|
||||
let (status, body) = self.send(http::Method::GET, &url, empty).await?;
|
||||
if status == http::StatusCode::BAD_REQUEST {
|
||||
let body = aggregate_body(body).await?;
|
||||
let status: DbSyncStatus;
|
||||
status = serde_json::from_reader(body.reader()).map_err(Error::JsonDecode)?;
|
||||
let status: DbSyncStatus =
|
||||
serde_json::from_reader(body.reader()).map_err(Error::JsonDecode)?;
|
||||
if status.status == "checkpoint_needed" {
|
||||
return Err(Error::PullNeedCheckpoint(status));
|
||||
} else {
|
||||
|
||||
@@ -11,9 +11,10 @@ use tokio::sync::Mutex;
|
||||
|
||||
use crate::{errors::Error, Result};
|
||||
|
||||
type PinnedFuture = Pin<Box<dyn Future<Output = bool> + Send>>;
|
||||
|
||||
pub struct FaultInjectionPlan {
|
||||
pub is_fault:
|
||||
Box<dyn Fn(String, String) -> Pin<Box<dyn Future<Output = bool> + Send>> + Send + Sync>,
|
||||
pub is_fault: Box<dyn Fn(String, String) -> PinnedFuture + Send + Sync>,
|
||||
}
|
||||
|
||||
pub enum FaultInjectionStrategy {
|
||||
@@ -55,7 +56,7 @@ impl FaultSession {
|
||||
}
|
||||
|
||||
let plans = self.plans.as_mut().unwrap();
|
||||
if plans.len() == 0 {
|
||||
if plans.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
@@ -100,7 +101,7 @@ impl TestContext {
|
||||
let call = call.clone();
|
||||
let count = count.clone();
|
||||
Box::pin(async move {
|
||||
if &(name, bt) != &call {
|
||||
if (name, bt) != call {
|
||||
return false;
|
||||
}
|
||||
let mut count = count.lock().await;
|
||||
@@ -115,10 +116,9 @@ impl TestContext {
|
||||
pub async fn faulty_call(&self, name: &str) -> Result<()> {
|
||||
tracing::trace!("faulty_call: {}", name);
|
||||
tokio::task::yield_now().await;
|
||||
match &*self.fault_injection.lock().await {
|
||||
FaultInjectionStrategy::Disabled => return Ok(()),
|
||||
_ => {}
|
||||
};
|
||||
if let FaultInjectionStrategy::Disabled = &*self.fault_injection.lock().await {
|
||||
return Ok(());
|
||||
}
|
||||
let bt = std::backtrace::Backtrace::force_capture().to_string();
|
||||
match &mut *self.fault_injection.lock().await {
|
||||
FaultInjectionStrategy::Record => {
|
||||
@@ -128,7 +128,7 @@ impl TestContext {
|
||||
}
|
||||
FaultInjectionStrategy::Enabled { plan } => {
|
||||
if plan.is_fault.as_ref()(name.to_string(), bt.clone()).await {
|
||||
Err(Error::DatabaseSyncError(format!("injected fault")))
|
||||
Err(Error::DatabaseSyncError("injected fault".to_string()))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user