mirror of
https://github.com/aljazceru/pubky-core.git
synced 2025-12-31 12:54:35 +01:00
Merge pull request #33 from pubky/fix/chunks-ref-count
fix(homeserver): add a reference counter for chunks to avoid deleting shared blobs
This commit is contained in:
@@ -26,7 +26,7 @@ impl DB {
|
||||
self.tables
|
||||
.blobs
|
||||
.get(&rtxn, entry.content_hash())?
|
||||
.map(|blob| bytes::Bytes::from(blob.to_vec()))
|
||||
.map(|blob| bytes::Bytes::from(blob[8..].to_vec()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
@@ -43,7 +43,26 @@ impl DB {
|
||||
|
||||
let hash = hasher.finalize();
|
||||
|
||||
self.tables.blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?;
|
||||
let key = hash.as_bytes();
|
||||
|
||||
let mut bytes_with_ref_count = Vec::with_capacity(bytes.len() + 8);
|
||||
bytes_with_ref_count.extend_from_slice(&u64::to_be_bytes(0));
|
||||
bytes_with_ref_count.extend_from_slice(&bytes);
|
||||
|
||||
// TODO: For now, we set the first 8 bytes to a reference counter
|
||||
let exists = self
|
||||
.tables
|
||||
.blobs
|
||||
.get(&wtxn, key)?
|
||||
.unwrap_or(bytes_with_ref_count.as_slice());
|
||||
|
||||
let new_count = u64::from_be_bytes(exists[0..8].try_into().unwrap()) + 1;
|
||||
|
||||
bytes_with_ref_count[0..8].copy_from_slice(&u64::to_be_bytes(new_count));
|
||||
|
||||
self.tables
|
||||
.blobs
|
||||
.put(&mut wtxn, hash.as_bytes(), &bytes_with_ref_count)?;
|
||||
|
||||
let mut entry = Entry::new();
|
||||
|
||||
@@ -82,8 +101,28 @@ impl DB {
|
||||
let deleted = if let Some(bytes) = self.tables.entries.get(&wtxn, &key)? {
|
||||
let entry = Entry::deserialize(bytes)?;
|
||||
|
||||
// TODO: reference counting of blobs
|
||||
let deleted_blobs = self.tables.blobs.delete(&mut wtxn, entry.content_hash())?;
|
||||
let mut bytes_with_ref_count = self
|
||||
.tables
|
||||
.blobs
|
||||
.get(&wtxn, entry.content_hash())?
|
||||
.map_or(vec![], |s| s.to_vec());
|
||||
|
||||
let arr: [u8; 8] = bytes_with_ref_count[0..8].try_into().unwrap_or([0; 8]);
|
||||
let reference_count = u64::from_be_bytes(arr);
|
||||
|
||||
let deleted_blobs = if reference_count > 1 {
|
||||
// decrement reference count
|
||||
|
||||
bytes_with_ref_count[0..8].copy_from_slice(&(reference_count - 1).to_be_bytes());
|
||||
|
||||
self.tables
|
||||
.blobs
|
||||
.put(&mut wtxn, entry.content_hash(), &bytes_with_ref_count)?;
|
||||
|
||||
true
|
||||
} else {
|
||||
self.tables.blobs.delete(&mut wtxn, entry.content_hash())?
|
||||
};
|
||||
|
||||
let deleted_entry = self.tables.entries.delete(&mut wtxn, &key)?;
|
||||
|
||||
@@ -102,7 +141,7 @@ impl DB {
|
||||
// TODO: move to events.rs
|
||||
}
|
||||
|
||||
deleted_entry & deleted_blobs
|
||||
deleted_entry && deleted_blobs
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
@@ -141,7 +141,7 @@ impl PubkyClient {
|
||||
return Err(Error::ResolveEndpoint(original_target.into()));
|
||||
}
|
||||
|
||||
if let Some(public_key) = endpoint_public_key {
|
||||
if endpoint_public_key.is_some() {
|
||||
let url = Url::parse(&format!(
|
||||
"{}://{}",
|
||||
if origin.starts_with("localhost") {
|
||||
@@ -152,7 +152,7 @@ impl PubkyClient {
|
||||
origin
|
||||
))?;
|
||||
|
||||
return Ok(Endpoint { public_key, url });
|
||||
return Ok(Endpoint { url });
|
||||
}
|
||||
|
||||
Err(Error::ResolveEndpoint(original_target.into()))
|
||||
@@ -173,8 +173,6 @@ impl PubkyClient {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Endpoint {
|
||||
// TODO: we don't use this at all?
|
||||
pub public_key: PublicKey,
|
||||
pub url: Url,
|
||||
}
|
||||
|
||||
@@ -326,12 +324,11 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let Endpoint { public_key, url } = client
|
||||
let Endpoint { url, .. } = client
|
||||
.resolve_pubky_homeserver(&pubky.public_key())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(public_key, server.public_key());
|
||||
assert_eq!(url.host_str(), Some("localhost"));
|
||||
assert_eq!(url.port(), Some(server.port()));
|
||||
}
|
||||
|
||||
@@ -765,4 +765,60 @@ mod tests {
|
||||
let get = client.get(url.as_str()).await.unwrap();
|
||||
dbg!(get);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn dont_delete_shared_blobs() {
|
||||
let testnet = Testnet::new(10);
|
||||
let homeserver = Homeserver::start_test(&testnet).await.unwrap();
|
||||
let client = PubkyClient::test(&testnet);
|
||||
|
||||
let homeserver_pubky = homeserver.public_key();
|
||||
|
||||
let user_1 = Keypair::random();
|
||||
let user_2 = Keypair::random();
|
||||
|
||||
client.signup(&user_1, &homeserver_pubky).await.unwrap();
|
||||
client.signup(&user_2, &homeserver_pubky).await.unwrap();
|
||||
|
||||
let user_1_id = user_1.public_key();
|
||||
let user_2_id = user_2.public_key();
|
||||
|
||||
let url_1 = format!("pubky://{user_1_id}/pub/pubky.app/file/file_1");
|
||||
let url_2 = format!("pubky://{user_2_id}/pub/pubky.app/file/file_1");
|
||||
|
||||
let file = vec![1];
|
||||
client.put(url_1.as_str(), &file).await.unwrap();
|
||||
client.put(url_2.as_str(), &file).await.unwrap();
|
||||
|
||||
// Delete file 1
|
||||
client.delete(url_1.as_str()).await.unwrap();
|
||||
|
||||
let blob = client.get(url_2.as_str()).await.unwrap().unwrap();
|
||||
|
||||
assert_eq!(blob, file);
|
||||
|
||||
let feed_url = format!("http://localhost:{}/events/", homeserver.port());
|
||||
|
||||
let response = client
|
||||
.request(
|
||||
Method::GET,
|
||||
format!("{feed_url}").as_str().try_into().unwrap(),
|
||||
)
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let text = response.text().await.unwrap();
|
||||
let lines = text.split('\n').collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(
|
||||
lines,
|
||||
vec![
|
||||
format!("PUT pubky://{user_1_id}/pub/pubky.app/file/file_1",),
|
||||
format!("PUT pubky://{user_2_id}/pub/pubky.app/file/file_1",),
|
||||
format!("DEL pubky://{user_1_id}/pub/pubky.app/file/file_1",),
|
||||
lines.last().unwrap().to_string()
|
||||
]
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user