opfs for sync in one commit!

This commit is contained in:
Nikita Sivukhin
2025-09-10 03:01:37 +04:00
parent 8ab8b31cb1
commit d55026f84f
75 changed files with 3553 additions and 5535 deletions

View File

@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use bytes::BytesMut;
use prost::Message;
@@ -22,8 +22,9 @@ use crate::{
PullUpdatesRespProtoBody, Stmt, StmtResult, StreamRequest,
},
types::{
Coro, DatabasePullRevision, DatabaseSyncEngineProtocolVersion, DatabaseTapeOperation,
DatabaseTapeRowChangeType, DbSyncInfo, DbSyncStatus, ProtocolCommand,
Coro, DatabasePullRevision, DatabaseRowTransformResult, DatabaseSyncEngineProtocolVersion,
DatabaseTapeOperation, DatabaseTapeRowChange, DatabaseTapeRowChangeType, DbSyncInfo,
DbSyncStatus, ProtocolCommand,
},
wal_session::WalSession,
Result,
@@ -34,7 +35,30 @@ pub const WAL_FRAME_HEADER: usize = 24;
pub const PAGE_SIZE: usize = 4096;
pub const WAL_FRAME_SIZE: usize = WAL_FRAME_HEADER + PAGE_SIZE;
enum WalHttpPullResult<C: DataCompletion> {
pub struct MutexSlot<T: Clone> {
pub value: T,
pub slot: Arc<Mutex<Option<T>>>,
}
impl<T: Clone> Drop for MutexSlot<T> {
fn drop(&mut self) {
self.slot.lock().unwrap().replace(self.value.clone());
}
}
pub(crate) fn acquire_slot<T: Clone>(slot: &Arc<Mutex<Option<T>>>) -> Result<MutexSlot<T>> {
let Some(value) = slot.lock().unwrap().take() else {
return Err(Error::DatabaseSyncEngineError(
"changes file already acquired by another operation".to_string(),
));
};
Ok(MutexSlot {
value,
slot: slot.clone(),
})
}
enum WalHttpPullResult<C: DataCompletion<u8>> {
Frames(C),
NeedCheckpoint(DbSyncStatus),
}
@@ -56,10 +80,10 @@ pub async fn db_bootstrap<C: ProtocolIO, Ctx>(
client: &C,
db: Arc<dyn turso_core::File>,
) -> Result<DbSyncInfo> {
tracing::debug!("db_bootstrap");
tracing::info!("db_bootstrap");
let start_time = std::time::Instant::now();
let db_info = db_info_http(coro, client).await?;
tracing::debug!("db_bootstrap: fetched db_info={db_info:?}");
tracing::info!("db_bootstrap: fetched db_info={db_info:?}");
let content = db_bootstrap_http(coro, client, db_info.current_generation).await?;
let mut pos = 0;
loop {
@@ -99,21 +123,21 @@ pub async fn db_bootstrap<C: ProtocolIO, Ctx>(
}
let elapsed = std::time::Instant::now().duration_since(start_time);
tracing::debug!("db_bootstrap: finished: bytes={pos}, elapsed={:?}", elapsed);
tracing::info!("db_bootstrap: finished: bytes={pos}, elapsed={:?}", elapsed);
Ok(db_info)
}
pub async fn wal_apply_from_file<Ctx>(
coro: &Coro<Ctx>,
frames_file: Arc<dyn turso_core::File>,
frames_file: &Arc<dyn turso_core::File>,
session: &mut DatabaseWalSession,
) -> Result<u32> {
let size = frames_file.size()?;
assert!(size % WAL_FRAME_SIZE as u64 == 0);
#[allow(clippy::arc_with_non_send_sync)]
let buffer = Arc::new(Buffer::new_temporary(WAL_FRAME_SIZE));
tracing::debug!("wal_apply_from_file: size={}", size);
tracing::info!("wal_apply_from_file: size={}", size);
let mut db_size = 0;
for offset in (0..size).step_by(WAL_FRAME_SIZE) {
let c = Completion::new_read(buffer.clone(), move |result| {
@@ -139,10 +163,21 @@ pub async fn wal_apply_from_file<Ctx>(
pub async fn wal_pull_to_file<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
frames_file: Arc<dyn turso_core::File>,
frames_file: &Arc<dyn turso_core::File>,
revision: &DatabasePullRevision,
wal_pull_batch_size: u64,
) -> Result<DatabasePullRevision> {
// truncate file before pulling new data
let c = Completion::new_trunc(move |result| {
let Ok(rc) = result else {
return;
};
assert!(rc as usize == 0);
});
let c = frames_file.truncate(0, c)?;
while !c.is_completed() {
coro.yield_(ProtocolCommand::IO).await?;
}
match revision {
DatabasePullRevision::Legacy {
generation,
@@ -169,7 +204,7 @@ pub async fn wal_pull_to_file<C: ProtocolIO, Ctx>(
pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
frames_file: Arc<dyn turso_core::File>,
frames_file: &Arc<dyn turso_core::File>,
revision: &str,
) -> Result<DatabasePullRevision> {
tracing::info!("wal_pull: revision={revision}");
@@ -263,7 +298,7 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
pub async fn wal_pull_to_file_legacy<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
frames_file: Arc<dyn turso_core::File>,
frames_file: &Arc<dyn turso_core::File>,
mut generation: u64,
mut start_frame: u64,
wal_pull_batch_size: u64,
@@ -284,9 +319,9 @@ pub async fn wal_pull_to_file_legacy<C: ProtocolIO, Ctx>(
let data = match result {
WalHttpPullResult::NeedCheckpoint(status) => {
assert!(status.status == "checkpoint_needed");
tracing::debug!("wal_pull: need checkpoint: status={status:?}");
tracing::info!("wal_pull: need checkpoint: status={status:?}");
if status.generation == generation && status.max_frame_no < start_frame {
tracing::debug!("wal_pull: end of history: status={:?}", status);
tracing::info!("wal_pull: end of history: status={:?}", status);
break DatabasePullRevision::Legacy {
generation: status.generation,
synced_frame_no: Some(status.max_frame_no),
@@ -655,7 +690,7 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
client: &C,
source: &DatabaseTape,
client_id: &str,
opts: &DatabaseSyncEngineOpts<Ctx>,
opts: &DatabaseSyncEngineOpts,
) -> Result<(i64, i64)> {
tracing::info!("push_logical_changes: client_id={client_id}");
let source_conn = connect_untracked(source)?;
@@ -666,7 +701,6 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
tracing::debug!("push_logical_changes: last_change_id={:?}", last_change_id);
let replay_opts = DatabaseReplaySessionOpts {
use_implicit_rowid: false,
transform: None,
};
let generator = DatabaseReplayGenerator::new(source_conn, replay_opts);
@@ -697,15 +731,13 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
];
let mut rows_changed = 0;
let mut changes = source.iterate_changes(iterate_opts)?;
let mut local_changes = Vec::new();
while let Some(operation) = changes.next(coro).await? {
match operation {
DatabaseTapeOperation::StmtReplay(_) => {
panic!("changes iterator must not use StmtReplay option")
}
DatabaseTapeOperation::RowChange(change) => {
assert!(
last_change_id.is_none() || last_change_id.unwrap() < change.change_id,
"change id must be strictly increasing: last_change_id={:?}, change.change_id={}",
last_change_id,
change.change_id
);
if change.table_name == TURSO_SYNC_TABLE_NAME {
continue;
}
@@ -713,38 +745,68 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
if ignore.iter().any(|x| &change.table_name == x) {
continue;
}
rows_changed += 1;
// we give user full control over CDC table - so let's not emit assert here for now
if last_change_id.is_some() && last_change_id.unwrap() + 1 != change.change_id {
tracing::warn!(
"out of order change sequence: {} -> {}",
last_change_id.unwrap(),
change.change_id
);
local_changes.push(change);
}
DatabaseTapeOperation::Commit => continue,
}
}
let mut transformed = if opts.use_transform {
Some(apply_transformation(&coro, client, &local_changes, &generator).await?)
} else {
None
};
tracing::info!("local_changes: {:?}", local_changes);
for (i, change) in local_changes.into_iter().enumerate() {
let change_id = change.change_id;
let operation = if let Some(transformed) = &mut transformed {
match std::mem::replace(&mut transformed[i], DatabaseRowTransformResult::Skip) {
DatabaseRowTransformResult::Keep => DatabaseTapeOperation::RowChange(change),
DatabaseRowTransformResult::Skip => continue,
DatabaseRowTransformResult::Rewrite(replay) => {
DatabaseTapeOperation::StmtReplay(replay)
}
last_change_id = Some(change.change_id);
}
} else {
DatabaseTapeOperation::RowChange(change)
};
tracing::info!(
"change_id: {}, last_change_id: {:?}",
change_id,
last_change_id
);
assert!(
last_change_id.is_none() || last_change_id.unwrap() < change_id,
"change id must be strictly increasing: last_change_id={:?}, change.change_id={}",
last_change_id,
change_id
);
rows_changed += 1;
// we give user full control over CDC table - so let's not emit assert here for now
if last_change_id.is_some() && last_change_id.unwrap() + 1 != change_id {
tracing::warn!(
"out of order change sequence: {} -> {}",
last_change_id.unwrap(),
change_id
);
}
last_change_id = Some(change_id);
match operation {
DatabaseTapeOperation::Commit => {
panic!("Commit operation must not be emited at this stage")
}
DatabaseTapeOperation::StmtReplay(replay) => sql_over_http_requests.push(Stmt {
sql: Some(replay.sql),
sql_id: None,
args: convert_to_args(replay.values),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
}),
DatabaseTapeOperation::RowChange(change) => {
let replay_info = generator.replay_info(coro, &change).await?;
if !replay_info.is_ddl_replay {
if let Some(transform) = &opts.transform {
let mutation = generator.create_mutation(&replay_info, &change)?;
if let Some(statement) = transform(&coro.ctx.borrow(), mutation)? {
tracing::info!(
"push_logical_changes: use mutation from custom transformer: sql={}, values={:?}",
statement.sql,
statement.values
);
sql_over_http_requests.push(Stmt {
sql: Some(statement.sql),
sql_id: None,
args: convert_to_args(statement.values),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
});
continue;
}
}
}
let change_type = (&change.change).into();
match change.change {
DatabaseTapeRowChangeType::Delete { before } => {
@@ -825,43 +887,42 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
}
}
}
DatabaseTapeOperation::Commit => {
if rows_changed > 0 {
tracing::info!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id);
// update turso_sync_last_change_id table with new value before commit
let next_change_id = last_change_id.unwrap_or(0);
tracing::info!("push_logical_changes: client_id={client_id}, set pull_gen={source_pull_gen}, change_id={next_change_id}, rows_changed={rows_changed}");
sql_over_http_requests.push(Stmt {
sql: Some(TURSO_SYNC_UPSERT_LAST_CHANGE_ID.to_string()),
sql_id: None,
args: vec![
server_proto::Value::Text {
value: client_id.to_string(),
},
server_proto::Value::Integer {
value: source_pull_gen,
},
server_proto::Value::Integer {
value: next_change_id,
},
],
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
});
}
sql_over_http_requests.push(Stmt {
sql: Some("COMMIT".to_string()),
sql_id: None,
args: Vec::new(),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
});
}
}
}
if rows_changed > 0 {
tracing::info!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id);
// update turso_sync_last_change_id table with new value before commit
let next_change_id = last_change_id.unwrap_or(0);
tracing::info!("push_logical_changes: client_id={client_id}, set pull_gen={source_pull_gen}, change_id={next_change_id}, rows_changed={rows_changed}");
sql_over_http_requests.push(Stmt {
sql: Some(TURSO_SYNC_UPSERT_LAST_CHANGE_ID.to_string()),
sql_id: None,
args: vec![
server_proto::Value::Text {
value: client_id.to_string(),
},
server_proto::Value::Integer {
value: source_pull_gen,
},
server_proto::Value::Integer {
value: next_change_id,
},
],
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
});
}
sql_over_http_requests.push(Stmt {
sql: Some("COMMIT".to_string()),
sql_id: None,
args: Vec::new(),
named_args: Vec::new(),
want_rows: Some(false),
replication_index: None,
});
tracing::trace!("hrana request: {:?}", sql_over_http_requests);
let replay_hrana_request = server_proto::PipelineReqBody {
baton: None,
@@ -876,6 +937,30 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
Ok((source_pull_gen, last_change_id.unwrap_or(0)))
}
pub async fn apply_transformation<Ctx, P: ProtocolIO>(
coro: &Coro<Ctx>,
client: &P,
changes: &Vec<DatabaseTapeRowChange>,
generator: &DatabaseReplayGenerator,
) -> Result<Vec<DatabaseRowTransformResult>> {
let mut mutations = Vec::new();
for change in changes {
let replay_info = generator.replay_info(&coro, &change).await?;
mutations.push(generator.create_mutation(&replay_info, &change)?);
}
let completion = client.transform(mutations)?;
let transformed = wait_all_results(&coro, &completion).await?;
if transformed.len() != changes.len() {
return Err(Error::DatabaseSyncEngineError(format!(
"unexpected result from custom transformation: mismatch in shapes: {} != {}",
transformed.len(),
changes.len()
)));
}
tracing::info!("apply_transformation: got {:?}", transformed);
Ok(transformed)
}
pub async fn read_wal_salt<Ctx>(
coro: &Coro<Ctx>,
wal: &Arc<dyn turso_core::File>,
@@ -1102,7 +1187,7 @@ async fn sql_execute_http<C: ProtocolIO, Ctx>(
let error = format!("sql_execute_http: unexpected status code: {status}");
return Err(Error::DatabaseSyncEngineError(error));
}
let response = wait_full_body(coro, &completion).await?;
let response = wait_all_results(coro, &completion).await?;
let response: server_proto::PipelineRespBody = serde_json::from_slice(&response)?;
tracing::debug!("hrana response: {:?}", response);
let mut results = Vec::new();
@@ -1134,7 +1219,7 @@ async fn wal_pull_http<C: ProtocolIO, Ctx>(
generation: u64,
start_frame: u64,
end_frame: u64,
) -> Result<WalHttpPullResult<C::DataCompletion>> {
) -> Result<WalHttpPullResult<C::DataCompletionBytes>> {
let completion = client.http(
"GET",
&format!("/sync/{generation}/{start_frame}/{end_frame}"),
@@ -1143,7 +1228,7 @@ async fn wal_pull_http<C: ProtocolIO, Ctx>(
)?;
let status = wait_status(coro, &completion).await?;
if status == http::StatusCode::BAD_REQUEST {
let status_body = wait_full_body(coro, &completion).await?;
let status_body = wait_all_results(coro, &completion).await?;
let status: DbSyncStatus = serde_json::from_slice(&status_body)?;
if status.status == "checkpoint_needed" {
return Ok(WalHttpPullResult::NeedCheckpoint(status));
@@ -1178,7 +1263,7 @@ async fn wal_push_http<C: ProtocolIO, Ctx>(
&[],
)?;
let status = wait_status(coro, &completion).await?;
let status_body = wait_full_body(coro, &completion).await?;
let status_body = wait_all_results(coro, &completion).await?;
if status != http::StatusCode::OK {
let error = std::str::from_utf8(&status_body).ok().unwrap_or("");
return Err(Error::DatabaseSyncEngineError(format!(
@@ -1191,7 +1276,7 @@ async fn wal_push_http<C: ProtocolIO, Ctx>(
async fn db_info_http<C: ProtocolIO, Ctx>(coro: &Coro<Ctx>, client: &C) -> Result<DbSyncInfo> {
let completion = client.http("GET", "/info", None, &[])?;
let status = wait_status(coro, &completion).await?;
let status_body = wait_full_body(coro, &completion).await?;
let status_body = wait_all_results(coro, &completion).await?;
if status != http::StatusCode::OK {
return Err(Error::DatabaseSyncEngineError(format!(
"db_info go unexpected status: {status}"
@@ -1204,7 +1289,7 @@ async fn db_bootstrap_http<C: ProtocolIO, Ctx>(
coro: &Coro<Ctx>,
client: &C,
generation: u64,
) -> Result<C::DataCompletion> {
) -> Result<C::DataCompletionBytes> {
let completion = client.http("GET", &format!("/export/{generation}"), None, &[])?;
let status = wait_status(coro, &completion).await?;
if status != http::StatusCode::OK.as_u16() {
@@ -1215,7 +1300,10 @@ async fn db_bootstrap_http<C: ProtocolIO, Ctx>(
Ok(completion)
}
pub async fn wait_status<Ctx>(coro: &Coro<Ctx>, completion: &impl DataCompletion) -> Result<u16> {
pub async fn wait_status<Ctx, T>(
coro: &Coro<Ctx>,
completion: &impl DataCompletion<T>,
) -> Result<u16> {
while completion.status()?.is_none() {
coro.yield_(ProtocolCommand::IO).await?;
}
@@ -1244,7 +1332,7 @@ pub fn read_varint(buf: &[u8]) -> Result<Option<(usize, usize)>> {
pub async fn wait_proto_message<Ctx, T: prost::Message + Default>(
coro: &Coro<Ctx>,
completion: &impl DataCompletion,
completion: &impl DataCompletion<u8>,
bytes: &mut BytesMut,
) -> Result<Option<T>> {
let start_time = std::time::Instant::now();
@@ -1281,21 +1369,21 @@ pub async fn wait_proto_message<Ctx, T: prost::Message + Default>(
}
}
pub async fn wait_full_body<Ctx>(
pub async fn wait_all_results<Ctx, T: Clone>(
coro: &Coro<Ctx>,
completion: &impl DataCompletion,
) -> Result<Vec<u8>> {
let mut bytes = Vec::new();
completion: &impl DataCompletion<T>,
) -> Result<Vec<T>> {
let mut results = Vec::new();
loop {
while let Some(poll) = completion.poll_data()? {
bytes.extend_from_slice(poll.data());
results.extend_from_slice(poll.data());
}
if completion.is_done()? {
break;
}
coro.yield_(ProtocolCommand::IO).await?;
}
Ok(bytes)
Ok(results)
}
#[cfg(test)]
@@ -1315,7 +1403,7 @@ mod tests {
struct TestPollResult(Vec<u8>);
impl DataPollResult for TestPollResult {
impl DataPollResult<u8> for TestPollResult {
fn data(&self) -> &[u8] {
&self.0
}
@@ -1326,9 +1414,8 @@ mod tests {
chunk: usize,
}
impl DataCompletion for TestCompletion {
impl DataCompletion<u8> for TestCompletion {
type DataPollResult = TestPollResult;
fn status(&self) -> crate::Result<Option<u16>> {
Ok(Some(200))
}