format fixes

This commit is contained in:
Nikita Sivukhin
2025-08-06 21:53:03 +04:00
parent b1f1526673
commit 09daa97150
6 changed files with 28 additions and 32 deletions

2
Cargo.lock generated
View File

@@ -4360,7 +4360,7 @@ dependencies = [
[[package]]
name = "turso_sync_engine"
version = "0.1.4-pre.1"
version = "0.1.4-pre.2"
dependencies = [
"bytes",
"ctor",

View File

@@ -43,7 +43,7 @@ async fn update_meta<IO: ProtocolIO>(
let mut meta = orig.as_ref().unwrap().clone();
update(&mut meta);
tracing::debug!("update_meta: {meta:?}");
let completion = io.full_write(&meta_path, meta.dump()?)?;
let completion = io.full_write(meta_path, meta.dump()?)?;
// todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated
wait_full_body(coro, &completion).await?;
*orig = Some(meta);
@@ -58,7 +58,7 @@ async fn set_meta<IO: ProtocolIO>(
meta: DatabaseMetadata,
) -> Result<()> {
tracing::debug!("set_meta: {meta:?}");
let completion = io.full_write(&meta_path, meta.dump()?)?;
let completion = io.full_write(meta_path, meta.dump()?)?;
// todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated
wait_full_body(coro, &completion).await?;
*orig = Some(meta);

View File

@@ -47,15 +47,16 @@ pub async fn db_bootstrap<C: ProtocolIO>(
let chunk = chunk.data();
let content_len = chunk.len();
// todo(sivukhin): optimize allocations here
#[allow(clippy::arc_with_non_send_sync)]
let buffer = Arc::new(Buffer::allocate(chunk.len(), Rc::new(|_| {})));
buffer.as_mut_slice().copy_from_slice(&chunk);
buffer.as_mut_slice().copy_from_slice(chunk);
let mut completions = Vec::with_capacity(dbs.len());
for i in 0..dbs.len() {
for db in dbs {
let c = Completion::new_write(move |size| {
// todo(sivukhin): we need to error out in case of partial read
assert!(size as usize == content_len);
});
completions.push(dbs[i].pwrite(pos, buffer.clone(), c)?);
completions.push(db.pwrite(pos, buffer.clone(), c)?);
}
while !completions.iter().all(|x| x.is_completed()) {
coro.yield_(ProtocolCommand::IO).await?;
@@ -184,7 +185,7 @@ pub async fn wal_push<C: ProtocolIO>(
let mut frames_data = Vec::with_capacity((end_frame - start_frame) as usize * WAL_FRAME_SIZE);
let mut buffer = [0u8; WAL_FRAME_SIZE];
for frame_no in start_frame..end_frame {
let frame_info = wal_session.read_at(frame_no as u64, &mut buffer)?;
let frame_info = wal_session.read_at(frame_no, &mut buffer)?;
tracing::trace!(
"wal_push: collect frame {} ({:?}) for push",
frame_no,
@@ -216,13 +217,13 @@ pub async fn wal_push<C: ProtocolIO>(
Ok(status.baton)
}
const TURSO_SYNC_META_TABLE: &'static str =
const TURSO_SYNC_META_TABLE: &str =
"CREATE TABLE IF NOT EXISTS turso_sync_last_change_id (client_id TEXT PRIMARY KEY, pull_gen INTEGER, change_id INTEGER)";
const TURSO_SYNC_SELECT_LAST_CHANGE_ID: &'static str =
const TURSO_SYNC_SELECT_LAST_CHANGE_ID: &str =
"SELECT pull_gen, change_id FROM turso_sync_last_change_id WHERE client_id = ?";
const TURSO_SYNC_INSERT_LAST_CHANGE_ID: &'static str =
const TURSO_SYNC_INSERT_LAST_CHANGE_ID: &str =
"INSERT INTO turso_sync_last_change_id(client_id, pull_gen, change_id) VALUES (?, 0, 0)";
const TURSO_SYNC_UPDATE_LAST_CHANGE_ID: &'static str =
const TURSO_SYNC_UPDATE_LAST_CHANGE_ID: &str =
"UPDATE turso_sync_last_change_id SET pull_gen = ?, change_id = ? WHERE client_id = ?";
/// Transfers row changes from source DB to target DB
@@ -252,7 +253,7 @@ pub async fn transfer_logical_changes(
match run_stmt(coro, &mut select_last_change_id_stmt).await? {
Some(row) => row.get_value(0).as_int().ok_or_else(|| {
Error::DatabaseSyncEngineError(format!("unexpected source pull_gen type"))
Error::DatabaseSyncEngineError("unexpected source pull_gen type".to_string())
})?,
None => {
tracing::debug!("transfer_logical_changes: client_id={client_id}, turso_sync_last_change_id table is not found");
@@ -274,10 +275,10 @@ pub async fn transfer_logical_changes(
let mut last_change_id = match run_stmt(coro, &mut select_last_change_id_stmt).await? {
Some(row) => {
let target_pull_gen = row.get_value(0).as_int().ok_or_else(|| {
Error::DatabaseSyncEngineError(format!("unexpected target pull_gen type"))
Error::DatabaseSyncEngineError("unexpected target pull_gen type".to_string())
})?;
let target_change_id = row.get_value(1).as_int().ok_or_else(|| {
Error::DatabaseSyncEngineError(format!("unexpected target change_id type"))
Error::DatabaseSyncEngineError("unexpected target change_id type".to_string())
})?;
tracing::debug!(
"transfer_logical_changes: client_id={client_id}, target_pull_gen={target_pull_gen}, target_change_id={target_change_id}"
@@ -396,7 +397,7 @@ pub async fn transfer_physical_changes(
let target_sync_watermark = {
let mut target_session = DatabaseWalSession::new(coro, target_session).await?;
let _ = target_session.rollback_changes_after(target_wal_match_watermark)?;
target_session.rollback_changes_after(target_wal_match_watermark)?;
let mut last_frame_info = None;
let mut frame = vec![0u8; WAL_FRAME_SIZE];
let mut target_sync_watermark = target_session.frames_count()?;
@@ -505,7 +506,7 @@ async fn wal_push_http<C: ProtocolIO>(
}
async fn db_info_http<C: ProtocolIO>(coro: &Coro, client: &C) -> Result<DbSyncInfo> {
let completion = client.http(http::Method::GET, format!("/info"), None)?;
let completion = client.http(http::Method::GET, "/info".to_string(), None)?;
let status = wait_status(coro, &completion).await?;
let status_body = wait_full_body(coro, &completion).await?;
if status != http::StatusCode::OK {

View File

@@ -60,10 +60,7 @@ pub(crate) async fn run_stmt<'a>(
}
}
pub(crate) async fn exec_stmt<'a>(
coro: &'_ Coro,
stmt: &'a mut turso_core::Statement,
) -> Result<()> {
pub(crate) async fn exec_stmt(coro: &Coro, stmt: &mut turso_core::Statement) -> Result<()> {
loop {
match stmt.step()? {
StepResult::IO => {
@@ -112,7 +109,7 @@ impl DatabaseTape {
let connection = self.inner.connect()?;
tracing::debug!("set '{CDC_PRAGMA_NAME}' for new connection");
let mut stmt = connection.prepare(&self.pragma_query)?;
run_stmt(&coro, &mut stmt).await?;
run_stmt(coro, &mut stmt).await?;
Ok(connection)
}
/// Builds an iterator which emits [DatabaseTapeOperation] by extracting data from CDC table
@@ -138,7 +135,7 @@ impl DatabaseTape {
let conn = self.connect(coro).await?;
let mut wal_session = WalSession::new(conn);
wal_session.begin()?;
Ok(DatabaseWalSession::new(coro, wal_session).await?)
DatabaseWalSession::new(coro, wal_session).await
}
/// Start replay session which can apply [DatabaseTapeOperation] from [Self::iterate_changes]
@@ -149,7 +146,7 @@ impl DatabaseTape {
) -> Result<DatabaseReplaySession> {
tracing::debug!("opening replay session");
Ok(DatabaseReplaySession {
conn: self.connect(&coro).await?,
conn: self.connect(coro).await?,
cached_delete_stmt: HashMap::new(),
cached_insert_stmt: HashMap::new(),
in_txn: false,
@@ -554,11 +551,11 @@ impl DatabaseReplaySession {
let placeholders = ["?"].repeat(columns).join(",");
format!("INSERT INTO {table_name} VALUES ({placeholders})")
} else {
let mut table_info_stmt = self.conn.prepare(&format!(
let mut table_info_stmt = self.conn.prepare(format!(
"SELECT name FROM pragma_table_info('{table_name}')"
))?;
let mut column_names = Vec::with_capacity(columns + 1);
while let Some(column) = run_stmt(&coro, &mut table_info_stmt).await? {
while let Some(column) = run_stmt(coro, &mut table_info_stmt).await? {
let turso_core::Value::Text(text) = column.get_value(0) else {
return Err(Error::DatabaseTapeError(
"unexpected column type for pragma_table_info query".to_string(),
@@ -579,12 +576,12 @@ impl DatabaseReplaySession {
let (query, pk_column_indices) = if self.opts.use_implicit_rowid {
(format!("DELETE FROM {table_name} WHERE rowid = ?"), None)
} else {
let mut pk_info_stmt = self.conn.prepare(&format!(
let mut pk_info_stmt = self.conn.prepare(format!(
"SELECT cid, name FROM pragma_table_info('{table_name}') WHERE pk = 1"
))?;
let mut pk_predicates = Vec::with_capacity(1);
let mut pk_column_indices = Vec::with_capacity(1);
while let Some(column) = run_stmt(&coro, &mut pk_info_stmt).await? {
while let Some(column) = run_stmt(coro, &mut pk_info_stmt).await? {
let turso_core::Value::Integer(column_id) = column.get_value(0) else {
return Err(Error::DatabaseTapeError(
"unexpected column type for pragma_table_info query".to_string(),
@@ -624,7 +621,7 @@ fn parse_bin_record(bin_record: Vec<u8>) -> Result<Vec<turso_core::Value>> {
let mut values = Vec::with_capacity(columns);
for i in 0..columns {
let value = cursor.get_value(&record, i)?;
values.push(value.to_owned().into());
values.push(value.to_owned());
}
Ok(values)
}

View File

@@ -34,9 +34,7 @@ impl IoOperations for Arc<dyn turso_core::IO> {
fn try_open(&self, path: &str) -> Result<Option<Arc<dyn turso_core::File>>> {
match self.open_file(path, OpenFlags::None, false) {
Ok(file) => Ok(Some(file)),
Err(LimboError::IOError(err)) if err.kind() == std::io::ErrorKind::NotFound => {
return Ok(None);
}
Err(LimboError::IOError(err)) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(err.into()),
}
}

View File

@@ -39,7 +39,7 @@ pub struct DatabaseMetadata {
impl DatabaseMetadata {
pub fn load(data: &[u8]) -> Result<Self> {
let meta = serde_json::from_slice::<DatabaseMetadata>(&data[..])?;
let meta = serde_json::from_slice::<DatabaseMetadata>(data)?;
Ok(meta)
}
pub fn dump(&self) -> Result<Vec<u8>> {