use hrana batch stmt with Not(IsAutocommit) condition for push operation

This commit is contained in:
Nikita Sivukhin
2025-09-23 16:33:40 +04:00
parent 057d2275e5
commit e89dac98f3
2 changed files with 126 additions and 77 deletions

View File

@@ -18,8 +18,9 @@ use crate::{
io_operations::IoOperations, io_operations::IoOperations,
protocol_io::{DataCompletion, DataPollResult, ProtocolIO}, protocol_io::{DataCompletion, DataPollResult, ProtocolIO},
server_proto::{ server_proto::{
self, ExecuteStreamReq, PageData, PageUpdatesEncodingReq, PullUpdatesReqProtoBody, self, Batch, BatchCond, BatchStep, BatchStreamReq, ExecuteStreamReq, PageData,
PullUpdatesRespProtoBody, Stmt, StmtResult, StreamRequest, PageUpdatesEncodingReq, PullUpdatesReqProtoBody, PullUpdatesRespProtoBody, Stmt,
StmtResult, StreamRequest,
}, },
types::{ types::{
Coro, DatabasePullRevision, DatabaseRowTransformResult, DatabaseSyncEngineProtocolVersion, Coro, DatabasePullRevision, DatabaseRowTransformResult, DatabaseSyncEngineProtocolVersion,
@@ -713,23 +714,32 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
ignore_schema_changes: false, ignore_schema_changes: false,
..Default::default() ..Default::default()
}; };
let step = |query, args| BatchStep {
stmt: Stmt {
sql: Some(query),
sql_id: None,
args,
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
},
condition: Some(BatchCond::Not {
cond: Box::new(BatchCond::IsAutocommit {}),
}),
};
let mut sql_over_http_requests = vec![ let mut sql_over_http_requests = vec![
Stmt { BatchStep {
sql: Some("BEGIN IMMEDIATE".to_string()), stmt: Stmt {
sql_id: None, sql: Some("BEGIN IMMEDIATE".to_string()),
args: Vec::new(), sql_id: None,
named_args: Vec::new(), args: Vec::new(),
want_rows: Some(false), named_args: Vec::new(),
replication_index: None, want_rows: Some(false),
}, replication_index: None,
Stmt { },
sql: Some(TURSO_SYNC_CREATE_TABLE.to_string()), condition: None,
sql_id: None,
args: Vec::new(),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
}, },
step(TURSO_SYNC_CREATE_TABLE.to_string(), Vec::new()),
]; ];
let mut rows_changed = 0; let mut rows_changed = 0;
let mut changes = source.iterate_changes(iterate_opts)?; let mut changes = source.iterate_changes(iterate_opts)?;
@@ -797,14 +807,9 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
DatabaseTapeOperation::Commit => { DatabaseTapeOperation::Commit => {
panic!("Commit operation must not be emited at this stage") panic!("Commit operation must not be emited at this stage")
} }
DatabaseTapeOperation::StmtReplay(replay) => sql_over_http_requests.push(Stmt { DatabaseTapeOperation::StmtReplay(replay) => {
sql: Some(replay.sql), sql_over_http_requests.push(step(replay.sql, convert_to_args(replay.values)))
sql_id: None, }
args: convert_to_args(replay.values),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
}),
DatabaseTapeOperation::RowChange(change) => { DatabaseTapeOperation::RowChange(change) => {
let replay_info = generator.replay_info(coro, &change).await?; let replay_info = generator.replay_info(coro, &change).await?;
match change.change { match change.change {
@@ -816,14 +821,8 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
before, before,
None, None,
); );
sql_over_http_requests.push(Stmt { sql_over_http_requests
sql: Some(replay_info.query.clone()), .push(step(replay_info.query.clone(), convert_to_args(values)))
sql_id: None,
args: convert_to_args(values),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
})
} }
DatabaseTapeRowChangeType::Insert { after } => { DatabaseTapeRowChangeType::Insert { after } => {
let values = generator.replay_values( let values = generator.replay_values(
@@ -833,14 +832,8 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
after, after,
None, None,
); );
sql_over_http_requests.push(Stmt { sql_over_http_requests
sql: Some(replay_info.query.clone()), .push(step(replay_info.query.clone(), convert_to_args(values)));
sql_id: None,
args: convert_to_args(values),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
})
} }
DatabaseTapeRowChangeType::Update { DatabaseTapeRowChangeType::Update {
after, after,
@@ -854,14 +847,8 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
after, after,
Some(updates), Some(updates),
); );
sql_over_http_requests.push(Stmt { sql_over_http_requests
sql: Some(replay_info.query.clone()), .push(step(replay_info.query.clone(), convert_to_args(values)));
sql_id: None,
args: convert_to_args(values),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
})
} }
DatabaseTapeRowChangeType::Update { DatabaseTapeRowChangeType::Update {
after, after,
@@ -875,14 +862,8 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
after, after,
None, None,
); );
sql_over_http_requests.push(Stmt { sql_over_http_requests
sql: Some(replay_info.query.clone()), .push(step(replay_info.query.clone(), convert_to_args(values)));
sql_id: None,
args: convert_to_args(values),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
});
} }
} }
} }
@@ -894,10 +875,9 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
// update turso_sync_last_change_id table with new value before commit // update turso_sync_last_change_id table with new value before commit
let next_change_id = last_change_id.unwrap_or(0); let next_change_id = last_change_id.unwrap_or(0);
tracing::info!("push_logical_changes: client_id={client_id}, set pull_gen={source_pull_gen}, change_id={next_change_id}, rows_changed={rows_changed}"); tracing::info!("push_logical_changes: client_id={client_id}, set pull_gen={source_pull_gen}, change_id={next_change_id}, rows_changed={rows_changed}");
sql_over_http_requests.push(Stmt { sql_over_http_requests.push(step(
sql: Some(TURSO_SYNC_UPSERT_LAST_CHANGE_ID.to_string()), TURSO_SYNC_UPSERT_LAST_CHANGE_ID.to_string(),
sql_id: None, vec![
args: vec![
server_proto::Value::Text { server_proto::Value::Text {
value: client_id.to_string(), value: client_id.to_string(),
}, },
@@ -908,27 +888,20 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
value: next_change_id, value: next_change_id,
}, },
], ],
named_args: Vec::new(), ));
want_rows: Some(false),
replication_index: None,
});
} }
sql_over_http_requests.push(Stmt { sql_over_http_requests.push(step("COMMIT".to_string(), Vec::new()));
sql: Some("COMMIT".to_string()),
sql_id: None,
args: Vec::new(),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
});
tracing::trace!("hrana request: {:?}", sql_over_http_requests); tracing::trace!("hrana request: {:?}", sql_over_http_requests);
let replay_hrana_request = server_proto::PipelineReqBody { let replay_hrana_request = server_proto::PipelineReqBody {
baton: None, baton: None,
requests: sql_over_http_requests requests: vec![StreamRequest::Batch(BatchStreamReq {
.into_iter() batch: Batch {
.map(|stmt| StreamRequest::Execute(ExecuteStreamReq { stmt })) steps: sql_over_http_requests.into(),
.collect(), replication_index: None,
},
})]
.into(),
}; };
let _ = sql_execute_http(coro, client, replay_hrana_request).await?; let _ = sql_execute_http(coro, client, replay_hrana_request).await?;
@@ -1206,6 +1179,20 @@ async fn sql_execute_http<C: ProtocolIO, Ctx>(
server_proto::StreamResponse::Execute(execute) => { server_proto::StreamResponse::Execute(execute) => {
results.push(execute.result); results.push(execute.result);
} }
server_proto::StreamResponse::Batch(batch) => {
for error in batch.result.step_errors {
if let Some(error) = error {
return Err(Error::DatabaseSyncEngineError(format!(
"failed to execute sql: {error:?}"
)));
}
}
for result in batch.result.step_results {
if let Some(result) = result {
results.push(result);
}
}
}
}, },
} }
} }

View File

@@ -82,6 +82,8 @@ pub enum StreamRequest {
None, None,
/// See [`ExecuteStreamReq`] /// See [`ExecuteStreamReq`]
Execute(ExecuteStreamReq), Execute(ExecuteStreamReq),
/// See [`BatchStreamReq`]
Batch(BatchStreamReq),
} }
#[derive(Serialize, Deserialize, Default, Debug, PartialEq)] #[derive(Serialize, Deserialize, Default, Debug, PartialEq)]
@@ -101,6 +103,66 @@ pub enum StreamResult {
#[serde(tag = "type", rename_all = "snake_case")] #[serde(tag = "type", rename_all = "snake_case")]
pub enum StreamResponse { pub enum StreamResponse {
Execute(ExecuteStreamResp), Execute(ExecuteStreamResp),
Batch(BatchStreamResp),
}
#[derive(Serialize, Deserialize, Debug)]
/// A request to execute a batch of SQL statements that may each have a [`BatchCond`] that must be satisfied for the statement to be executed.
pub struct BatchStreamReq {
pub batch: Batch,
}
#[derive(Serialize, Deserialize, Debug, PartialEq)]
/// A response to a [`BatchStreamReq`].
pub struct BatchStreamResp {
pub result: BatchResult,
}
#[derive(Clone, Deserialize, Serialize, Debug, Default, PartialEq)]
pub struct BatchResult {
pub step_results: Vec<Option<StmtResult>>,
pub step_errors: Vec<Option<Error>>,
#[serde(default, with = "option_u64_as_str")]
pub replication_index: Option<u64>,
}
#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct Batch {
pub steps: VecDeque<BatchStep>,
#[serde(default, with = "option_u64_as_str")]
pub replication_index: Option<u64>,
}
#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct BatchStep {
#[serde(default)]
pub condition: Option<BatchCond>,
pub stmt: Stmt,
}
#[derive(Clone, Deserialize, Serialize, Debug, Default)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum BatchCond {
#[serde(skip_deserializing)]
#[default]
None,
Ok {
step: u32,
},
Error {
step: u32,
},
Not {
cond: Box<BatchCond>,
},
And(BatchCondList),
Or(BatchCondList),
IsAutocommit {},
}
#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct BatchCondList {
pub conds: Vec<BatchCond>,
} }
#[derive(Serialize, Deserialize, Debug, PartialEq)] #[derive(Serialize, Deserialize, Debug, PartialEq)]