Merge pull request #44 from pubky/feat/non-blocking-writes

Feat/non blocking writes
Authored by Nuh on 2024-10-16 21:37:48 +03:00; committed by GitHub
9 changed files with 466 additions and 213 deletions

Cargo.lock (generated): 37 changed lines
View File

@@ -786,9 +786,9 @@ dependencies = [
[[package]]
name = "futures"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
@@ -801,9 +801,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [
"futures-core",
"futures-sink",
@@ -811,15 +811,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
@@ -828,15 +828,15 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
@@ -845,21 +845,21 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
[[package]]
name = "futures-task"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-util"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
@@ -1725,6 +1725,7 @@ dependencies = [
"futures-util",
"heed",
"hex",
"libc",
"pkarr",
"postcard",
"pubky-common",

View File

@@ -15,6 +15,7 @@ flume = "0.11.0"
futures-util = "0.3.30"
heed = "0.20.3"
hex = "0.4.3"
libc = "0.2.159"
pkarr = { workspace = true }
postcard = { version = "1.0.8", features = ["alloc"] }
pubky-common = { version = "0.1.0", path = "../pubky-common" }

View File

@@ -1,4 +1,4 @@
use std::fs;
use std::{fs, path::PathBuf};
use heed::{Env, EnvOpenOptions};
@@ -14,11 +14,17 @@ pub struct DB {
pub(crate) env: Env,
pub(crate) tables: Tables,
pub(crate) config: Config,
pub(crate) buffers_dir: PathBuf,
pub(crate) max_chunk_size: usize,
}
impl DB {
pub fn open(config: Config) -> anyhow::Result<Self> {
fs::create_dir_all(config.storage())?;
let buffers_dir = config.storage().clone().join("buffers");
// Cleanup buffers.
let _ = fs::remove_dir(&buffers_dir);
fs::create_dir_all(&buffers_dir)?;
let env = unsafe {
EnvOpenOptions::new()
@@ -33,46 +39,25 @@ impl DB {
env,
tables,
config,
buffers_dir,
max_chunk_size: max_chunk_size(),
};
Ok(db)
}
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use pkarr::{mainline::Testnet, Keypair};
use crate::config::Config;
use super::DB;
#[tokio::test]
async fn entries() {
let db = DB::open(Config::test(&Testnet::new(0))).unwrap();
let keypair = Keypair::random();
let path = "/pub/foo.txt";
let (tx, rx) = flume::bounded::<Bytes>(0);
let mut cloned = db.clone();
let cloned_keypair = keypair.clone();
let done = tokio::task::spawn_blocking(move || {
cloned
.put_entry(&cloned_keypair.public_key(), path, rx)
.unwrap();
});
tx.send(vec![1, 2, 3, 4, 5].into()).unwrap();
drop(tx);
done.await.unwrap();
let blob = db.get_blob(&keypair.public_key(), path).unwrap().unwrap();
assert_eq!(blob, Bytes::from(vec![1, 2, 3, 4, 5]));
}
/// calculate optimal chunk size:
/// - https://lmdb.readthedocs.io/en/release/#storage-efficiency-limits
/// - https://github.com/lmdbjava/benchmarks/blob/master/results/20160710/README.md#test-2-determine-24816-kb-byte-values
fn max_chunk_size() -> usize {
let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) as usize };
// - 16 bytes Header per page (LMDB)
// - Each page has to contain 2 records
// - 8 bytes per record (LMDB) (empirically, it seems to be 10 not 8)
// - 12 bytes key:
// - timestamp : 8 bytes
// - chunk index: 4 bytes
((page_size - 16) / 2) - (8 + 2) - 12
}
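As a quick sanity check of that arithmetic, here is a minimal standalone sketch assuming the common 4096-byte page size (the real function queries the page size via libc::sysconf at runtime):

fn main() {
    let page_size: usize = 4096; // assumed here; the homeserver reads _SC_PAGESIZE instead
    // (4096 - 16) / 2 = 2040 usable bytes per record slot,
    // minus 10 bytes of per-record overhead and the 12-byte chunk key:
    let max_chunk_size = ((page_size - 16) / 2) - (8 + 2) - 12;
    assert_eq!(max_chunk_size, 2018);
    println!("max chunk size for a 4 KiB page: {max_chunk_size} bytes");
}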

View File

@@ -1,38 +1,24 @@
use heed::{types::Bytes, Database};
use pkarr::PublicKey;
use heed::{types::Bytes, Database, RoTxn};
use crate::database::DB;
use super::entries::Entry;
/// hash of the blob => bytes.
/// (entry timestamp | chunk_index BE) => bytes
pub type BlobsTable = Database<Bytes, Bytes>;
pub const BLOBS_TABLE: &str = "blobs";
impl DB {
pub fn get_blob(
pub fn read_entry_content<'txn>(
&self,
public_key: &PublicKey,
path: &str,
) -> anyhow::Result<Option<bytes::Bytes>> {
let rtxn = self.env.read_txn()?;
let key = format!("{public_key}/{path}");
let result = if let Some(bytes) = self.tables.entries.get(&rtxn, &key)? {
let entry = Entry::deserialize(bytes)?;
self.tables
.blobs
.get(&rtxn, entry.content_hash())?
.map(|blob| bytes::Bytes::from(blob[8..].to_vec()))
} else {
None
};
rtxn.commit()?;
Ok(result)
rtxn: &'txn RoTxn,
entry: &Entry,
) -> anyhow::Result<impl Iterator<Item = Result<&'txn [u8], heed::Error>> + 'txn> {
Ok(self
.tables
.blobs
.prefix_iter(rtxn, &entry.timestamp().to_bytes())?
.map(|i| i.map(|(_, bytes)| bytes)))
}
}
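In other words, each chunk now lives under a 12-byte key made of the entry's timestamp (8 bytes, big-endian) followed by the chunk index (4 bytes, big-endian), so a prefix iterator over the timestamp yields the entry's chunks in order. A minimal sketch of that key layout, standalone and with a hypothetical timestamp value (the real key is built in EntryWriter::commit further below):

// Sketch: the 12-byte chunk key layout used by the new blobs table.
fn chunk_key(timestamp_bytes: [u8; 8], chunk_index: u32) -> [u8; 12] {
    let mut key = [0u8; 12];
    key[0..8].copy_from_slice(&timestamp_bytes);             // entry timestamp prefix
    key[8..12].copy_from_slice(&chunk_index.to_be_bytes());  // big-endian chunk index
    key
}

fn main() {
    let ts = 1_729_000_000_000u64.to_be_bytes(); // hypothetical timestamp
    // All chunks of one entry share the timestamp prefix that prefix_iter scans,
    // and the big-endian index keeps them in insertion order.
    assert_eq!(&chunk_key(ts, 0)[0..8], &chunk_key(ts, 1)[0..8]);
    assert!(chunk_key(ts, 0) < chunk_key(ts, 1));
}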

View File

@@ -1,6 +1,7 @@
use pkarr::PublicKey;
use postcard::{from_bytes, to_allocvec};
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read, path::PathBuf};
use tracing::instrument;
use heed::{
@@ -23,74 +24,12 @@ pub type EntriesTable = Database<Str, Bytes>;
pub const ENTRIES_TABLE: &str = "entries";
impl DB {
pub fn put_entry(
pub fn write_entry(
&mut self,
public_key: &PublicKey,
path: &str,
rx: flume::Receiver<bytes::Bytes>,
) -> anyhow::Result<()> {
let mut wtxn = self.env.write_txn()?;
let mut hasher = Hasher::new();
let mut bytes = vec![];
let mut length = 0;
while let Ok(chunk) = rx.recv() {
hasher.update(&chunk);
bytes.extend_from_slice(&chunk);
length += chunk.len();
}
let hash = hasher.finalize();
let key = hash.as_bytes();
let mut bytes_with_ref_count = Vec::with_capacity(bytes.len() + 8);
bytes_with_ref_count.extend_from_slice(&u64::to_be_bytes(0));
bytes_with_ref_count.extend_from_slice(&bytes);
// TODO: For now, we set the first 8 bytes to a reference counter
let exists = self
.tables
.blobs
.get(&wtxn, key)?
.unwrap_or(bytes_with_ref_count.as_slice());
let new_count = u64::from_be_bytes(exists[0..8].try_into().unwrap()) + 1;
bytes_with_ref_count[0..8].copy_from_slice(&u64::to_be_bytes(new_count));
self.tables
.blobs
.put(&mut wtxn, hash.as_bytes(), &bytes_with_ref_count)?;
let mut entry = Entry::new();
entry.set_content_hash(hash);
entry.set_content_length(length);
let key = format!("{public_key}/{path}");
self.tables
.entries
.put(&mut wtxn, &key, &entry.serialize())?;
if path.starts_with("pub/") {
let url = format!("pubky://{key}");
let event = Event::put(&url);
let value = event.serialize();
let key = entry.timestamp.to_string();
self.tables.events.put(&mut wtxn, &key, &value)?;
// TODO: delete older events.
// TODO: move to events.rs
}
wtxn.commit()?;
Ok(())
) -> anyhow::Result<EntryWriter> {
EntryWriter::new(self, public_key, path)
}
pub fn delete_entry(&mut self, public_key: &PublicKey, path: &str) -> anyhow::Result<bool> {
@@ -101,28 +40,20 @@ impl DB {
let deleted = if let Some(bytes) = self.tables.entries.get(&wtxn, &key)? {
let entry = Entry::deserialize(bytes)?;
let mut bytes_with_ref_count = self
.tables
.blobs
.get(&wtxn, entry.content_hash())?
.map_or(vec![], |s| s.to_vec());
let mut deleted_chunks = false;
let arr: [u8; 8] = bytes_with_ref_count[0..8].try_into().unwrap_or([0; 8]);
let reference_count = u64::from_be_bytes(arr);
let deleted_blobs = if reference_count > 1 {
// decrement reference count
bytes_with_ref_count[0..8].copy_from_slice(&(reference_count - 1).to_be_bytes());
self.tables
{
let mut iter = self
.tables
.blobs
.put(&mut wtxn, entry.content_hash(), &bytes_with_ref_count)?;
.prefix_iter_mut(&mut wtxn, &entry.timestamp.to_bytes())?;
true
} else {
self.tables.blobs.delete(&mut wtxn, entry.content_hash())?
};
while iter.next().is_some() {
unsafe {
deleted_chunks = iter.del_current()?;
}
}
}
let deleted_entry = self.tables.entries.delete(&mut wtxn, &key)?;
@@ -137,11 +68,11 @@ impl DB {
self.tables.events.put(&mut wtxn, &key, &value)?;
// TODO: delete older events.
// TODO: delete events older than a threshold.
// TODO: move to events.rs
}
deleted_entry && deleted_blobs
deleted_entry && deleted_chunks
} else {
false
};
@@ -151,6 +82,21 @@ impl DB {
Ok(deleted)
}
pub fn get_entry(
&self,
txn: &RoTxn,
public_key: &PublicKey,
path: &str,
) -> anyhow::Result<Option<Entry>> {
let key = format!("{public_key}/{path}");
if let Some(bytes) = self.tables.entries.get(txn, &key)? {
return Ok(Some(Entry::deserialize(bytes)?));
}
Ok(None)
}
pub fn contains_directory(&self, txn: &RoTxn, path: &str) -> anyhow::Result<bool> {
Ok(self.tables.entries.get_greater_than(txn, path)?.is_some())
}
@@ -268,13 +214,40 @@ pub struct Entry {
version: usize,
/// Modified at
timestamp: Timestamp,
content_hash: [u8; 32],
content_hash: EntryHash,
content_length: usize,
content_type: String,
// user_metadata: ?
}
// TODO: get headers like Etag
#[derive(Clone, Debug, Eq, PartialEq)]
struct EntryHash(Hash);
impl Default for EntryHash {
fn default() -> Self {
Self(Hash::from_bytes([0; 32]))
}
}
impl Serialize for EntryHash {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let bytes = self.0.as_bytes();
bytes.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for EntryHash {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let bytes: [u8; 32] = Deserialize::deserialize(deserializer)?;
Ok(Self(Hash::from_bytes(bytes)))
}
}
impl Entry {
pub fn new() -> Self {
@@ -283,8 +256,13 @@ impl Entry {
// === Setters ===
pub fn set_timestamp(&mut self, timestamp: &Timestamp) -> &mut Self {
self.timestamp = timestamp.clone();
self
}
pub fn set_content_hash(&mut self, content_hash: Hash) -> &mut Self {
content_hash.as_bytes().clone_into(&mut self.content_hash);
EntryHash(content_hash).clone_into(&mut self.content_hash);
self
}
@@ -295,12 +273,32 @@ impl Entry {
// === Getters ===
pub fn content_hash(&self) -> &[u8; 32] {
&self.content_hash
pub fn timestamp(&self) -> &Timestamp {
&self.timestamp
}
pub fn content_hash(&self) -> &Hash {
&self.content_hash.0
}
pub fn content_length(&self) -> usize {
self.content_length
}
pub fn content_type(&self) -> &str {
&self.content_type
}
// === Public Method ===
pub fn read_content<'txn>(
&self,
db: &'txn DB,
rtxn: &'txn RoTxn,
) -> anyhow::Result<impl Iterator<Item = Result<&'txn [u8], heed::Error>> + 'txn> {
db.read_entry_content(rtxn, self)
}
pub fn serialize(&self) -> Vec<u8> {
to_allocvec(self).expect("Session::serialize")
}
@@ -313,3 +311,216 @@ impl Entry {
from_bytes(bytes)
}
}
pub struct EntryWriter<'db> {
db: &'db DB,
buffer: File,
hasher: Hasher,
buffer_path: PathBuf,
entry_key: String,
timestamp: Timestamp,
is_public: bool,
}
impl<'db> EntryWriter<'db> {
pub fn new(db: &'db DB, public_key: &PublicKey, path: &str) -> anyhow::Result<Self> {
let hasher = Hasher::new();
let timestamp = Timestamp::now();
let buffer_path = db.buffers_dir.join(timestamp.to_string());
let buffer = File::create(&buffer_path)?;
let entry_key = format!("{public_key}/{path}");
Ok(Self {
db,
buffer,
hasher,
buffer_path,
entry_key,
timestamp,
is_public: path.starts_with("pub/"),
})
}
/// Commit blob from the filesystem buffer to LMDB,
/// write the [Entry], and commit the write transaction.
pub fn commit(&self) -> anyhow::Result<Entry> {
let hash = self.hasher.finalize();
let mut buffer = File::open(&self.buffer_path)?;
let mut wtxn = self.db.env.write_txn()?;
let mut chunk_key = [0; 12];
chunk_key[0..8].copy_from_slice(&self.timestamp.to_bytes());
let mut chunk_index: u32 = 0;
loop {
let mut chunk = vec![0_u8; self.db.max_chunk_size];
let bytes_read = buffer.read(&mut chunk)?;
if bytes_read == 0 {
break; // EOF reached
}
chunk_key[8..].copy_from_slice(&chunk_index.to_be_bytes());
self.db
.tables
.blobs
.put(&mut wtxn, &chunk_key, &chunk[..bytes_read])?;
chunk_index += 1;
}
let mut entry = Entry::new();
entry.set_timestamp(&self.timestamp);
entry.set_content_hash(hash);
let length = buffer.metadata()?.len();
entry.set_content_length(length as usize);
self.db
.tables
.entries
.put(&mut wtxn, &self.entry_key, &entry.serialize())?;
// Write a public [Event].
if self.is_public {
let url = format!("pubky://{}", self.entry_key);
let event = Event::put(&url);
let value = event.serialize();
let key = entry.timestamp.to_string();
self.db.tables.events.put(&mut wtxn, &key, &value)?;
// TODO: delete events older than a threshold.
// TODO: move to events.rs
}
wtxn.commit()?;
std::fs::remove_file(&self.buffer_path)?;
Ok(entry)
}
}
impl<'db> std::io::Write for EntryWriter<'db> {
/// Write a chunk to a filesystem-based buffer.
#[inline]
fn write(&mut self, chunk: &[u8]) -> std::io::Result<usize> {
self.hasher.update(chunk);
self.buffer.write_all(chunk)?;
Ok(chunk.len())
}
/// Does not do anything; you need to call [Self::commit] instead.
#[inline]
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
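Taken together, a caller streams data through the std::io::Write impl and then calls commit() to move the buffered blob into LMDB in max_chunk_size pieces. A minimal sketch of that call pattern, assuming an already opened DB and a known public key (the tests below exercise the same flow end to end):

use std::io::Write;

// Sketch only: `db`, `public_key`, and `path` are assumed to already exist.
fn store(db: &mut DB, public_key: &PublicKey, path: &str, payload: &[u8]) -> anyhow::Result<()> {
    let mut writer = db.write_entry(public_key, path)?; // buffers to a file on disk
    writer.write_all(payload)?;                         // hashes and appends the bytes
    let entry = writer.commit()?;                       // chunks the buffer into LMDB
    assert_eq!(entry.content_length(), payload.len());
    Ok(())
}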
#[cfg(test)]
mod tests {
use std::io::Write;
use bytes::Bytes;
use pkarr::{mainline::Testnet, Keypair};
use crate::config::Config;
use super::DB;
#[tokio::test]
async fn entries() {
let mut db = DB::open(Config::test(&Testnet::new(0))).unwrap();
let keypair = Keypair::random();
let public_key = keypair.public_key();
let path = "/pub/foo.txt";
let chunk = Bytes::from(vec![1, 2, 3, 4, 5]);
let mut entry_writer = db.write_entry(&public_key, path).unwrap();
entry_writer.write_all(&chunk).unwrap();
entry_writer.commit().unwrap();
let rtxn = db.env.read_txn().unwrap();
let entry = db.get_entry(&rtxn, &public_key, path).unwrap().unwrap();
assert_eq!(
entry.content_hash(),
&[
2, 79, 103, 192, 66, 90, 61, 192, 47, 186, 245, 140, 185, 61, 229, 19, 46, 61, 117,
197, 25, 250, 160, 186, 218, 33, 73, 29, 136, 201, 112, 87
]
);
let mut blob = vec![];
{
let mut iter = entry.read_content(&db, &rtxn).unwrap();
while let Some(Ok(chunk)) = iter.next() {
blob.extend_from_slice(&chunk);
}
}
assert_eq!(blob, vec![1, 2, 3, 4, 5]);
rtxn.commit().unwrap();
}
#[tokio::test]
async fn chunked_entry() {
let mut db = DB::open(Config::test(&Testnet::new(0))).unwrap();
let keypair = Keypair::random();
let public_key = keypair.public_key();
let path = "/pub/foo.txt";
let chunk = Bytes::from(vec![0; 1024 * 1024]);
let mut entry_writer = db.write_entry(&public_key, path).unwrap();
entry_writer.write_all(&chunk).unwrap();
entry_writer.commit().unwrap();
let rtxn = db.env.read_txn().unwrap();
let entry = db.get_entry(&rtxn, &public_key, path).unwrap().unwrap();
assert_eq!(
entry.content_hash(),
&[
72, 141, 226, 2, 247, 59, 217, 118, 222, 78, 112, 72, 244, 225, 243, 154, 119, 109,
134, 213, 130, 183, 52, 143, 245, 59, 244, 50, 185, 135, 252, 168
]
);
let mut blob = vec![];
{
let mut iter = entry.read_content(&db, &rtxn).unwrap();
while let Some(Ok(chunk)) = iter.next() {
blob.extend_from_slice(&chunk);
}
}
assert_eq!(blob, vec![0; 1024 * 1024]);
let stats = db.tables.blobs.stat(&rtxn).unwrap();
assert_eq!(stats.overflow_pages, 0);
rtxn.commit().unwrap();
}
}

View File

@@ -5,6 +5,7 @@ use axum::{
http::StatusCode,
response::IntoResponse,
};
use tokio::task::JoinError;
use tracing::debug;
pub type Result<T, E = Error> = core::result::Result<T, E>;
@@ -126,3 +127,24 @@ impl<T> From<flume::SendError<T>> for Error {
Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
}
}
impl From<flume::RecvError> for Error {
fn from(error: flume::RecvError) -> Self {
debug!(?error);
Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
}
}
impl From<JoinError> for Error {
fn from(error: JoinError) -> Self {
debug!(?error);
Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
}
}
impl From<axum::http::Error> for Error {
fn from(error: axum::http::Error) -> Self {
debug!(?error);
Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
}
}

View File

@@ -1,5 +1,4 @@
use axum::{
debug_handler,
extract::{Host, State},
http::StatusCode,
response::IntoResponse,
@@ -20,7 +19,6 @@ use crate::{
server::AppState,
};
#[debug_handler]
pub async fn signup(
State(state): State<AppState>,
user_agent: Option<TypedHeader<UserAgent>>,

View File

@@ -1,14 +1,16 @@
use axum::{
body::{Body, Bytes},
body::Body,
extract::State,
http::{header, Response, StatusCode},
response::IntoResponse,
};
use futures_util::stream::StreamExt;
use pkarr::PublicKey;
use std::io::Write;
use tower_cookies::Cookies;
use crate::{
database::tables::entries::Entry,
error::{Error, Result},
extractors::{EntryPath, ListQueryParams, Pubky},
server::AppState,
@@ -22,37 +24,20 @@ pub async fn put(
body: Body,
) -> Result<impl IntoResponse> {
let public_key = pubky.public_key().clone();
let path = path.as_str();
let path = path.as_str().to_string();
verify(path)?;
authorize(&mut state, cookies, &public_key, path)?;
verify(&path)?;
authorize(&mut state, cookies, &public_key, &path)?;
let mut entry_writer = state.db.write_entry(&public_key, &path)?;
let mut stream = body.into_data_stream();
let (tx, rx) = flume::bounded::<Bytes>(1);
let path = path.to_string();
// TODO: refactor Database to clean up this scope.
let done = tokio::task::spawn_blocking(move || -> Result<()> {
// TODO: this is a blocking operation, which is ok for small
// payloads (we have 16 kb limit for now) but later we need
// to stream this to filesystem, and keep track of any failed
// writes to GC these files later.
state.db.put_entry(&public_key, &path, rx)?;
Ok(())
});
while let Some(next) = stream.next().await {
let chunk = next?;
tx.send(chunk)?;
entry_writer.write_all(&chunk)?;
}
drop(tx);
done.await.expect("join error")?;
let _entry = entry_writer.commit()?;
// TODO: return relevant headers, like Etag?
@@ -66,9 +51,8 @@ pub async fn get(
params: ListQueryParams,
) -> Result<impl IntoResponse> {
verify(path.as_str())?;
let public_key = pubky.public_key();
let path = path.as_str();
let public_key = pubky.public_key().clone();
let path = path.as_str().to_string();
if path.ends_with('/') {
let txn = state.db.env.read_txn()?;
@@ -95,16 +79,49 @@ pub async fn get(
return Ok(Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from(vec.join("\n")))
.unwrap());
.body(Body::from(vec.join("\n")))?);
}
// TODO: Enable streaming
let (entry_tx, entry_rx) = flume::bounded::<Option<Entry>>(1);
let (chunks_tx, chunks_rx) = flume::unbounded::<std::result::Result<Vec<u8>, heed::Error>>();
match state.db.get_blob(public_key, path) {
Err(error) => Err(error)?,
Ok(Some(bytes)) => Ok(Response::builder().body(Body::from(bytes)).unwrap()),
Ok(None) => Err(Error::new(StatusCode::NOT_FOUND, "File Not Found".into())),
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
let rtxn = state.db.env.read_txn()?;
let option = state.db.get_entry(&rtxn, &public_key, &path)?;
if let Some(entry) = option {
let iter = entry.read_content(&state.db, &rtxn)?;
entry_tx.send(Some(entry))?;
for next in iter {
chunks_tx.send(next.map(|b| b.to_vec()))?;
}
};
entry_tx.send(None)?;
Ok(())
});
if let Some(entry) = entry_rx.recv_async().await? {
// TODO: add HEAD endpoint
// TODO: Enable seek API (range requests)
// TODO: Gzip? or brotli?
Ok(Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_LENGTH, entry.content_length())
.header(header::CONTENT_TYPE, entry.content_type())
.header(
header::ETAG,
format!("\"{}\"", entry.content_hash().to_hex()),
)
.body(Body::from_stream(chunks_rx.into_stream()))
.unwrap())
} else {
Err(Error::with_status(StatusCode::NOT_FOUND))?
}
}
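The read path mirrors the write path: LMDB access stays on a blocking thread, and the chunks cross into the async response through a flume channel that Body::from_stream drains lazily. A stripped-down sketch of that bridge, with hypothetical chunk data and without the entry lookup or response headers:

use axum::body::Body;

// Sketch: bridge a blocking producer to an axum response body via flume.
fn streaming_body() -> Body {
    let (tx, rx) = flume::unbounded::<Result<Vec<u8>, std::io::Error>>();

    tokio::task::spawn_blocking(move || {
        for chunk in [vec![1u8; 3], vec![2u8; 3]] { // hypothetical chunks
            // send only fails if the response body was dropped by the client
            let _ = tx.send(Ok(chunk));
        }
    });

    // into_stream turns the receiver into a Stream of Results that the
    // body polls as the client reads, so nothing is buffered up front.
    Body::from_stream(rx.into_stream())
}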
@@ -120,14 +137,13 @@ pub async fn delete(
authorize(&mut state, cookies, &public_key, path)?;
verify(path)?;
// TODO: should we wrap this with `tokio::task::spawn_blocking` in case it takes too long?
let deleted = state.db.delete_entry(&public_key, path)?;
if !deleted {
// TODO: if the path ends with `/` return a `CONFLICT` error?
return Err(Error::with_status(StatusCode::NOT_FOUND));
}
// TODO: return relevant headers, like Etag?
};
Ok(())
}

View File

@@ -98,6 +98,7 @@ mod tests {
use crate::*;
use bytes::Bytes;
use pkarr::{mainline::Testnet, Keypair};
use pubky_homeserver::Homeserver;
use reqwest::{Method, StatusCode};
@@ -762,8 +763,9 @@ mod tests {
);
}
let get = client.get(url.as_str()).await.unwrap();
dbg!(get);
let resolved = client.get(url.as_str()).await.unwrap().unwrap();
assert_eq!(&resolved[..], &[0]);
}
#[tokio::test]
@@ -818,4 +820,35 @@ mod tests {
]
)
}
#[tokio::test]
async fn stream() {
// TODO: test better streaming API
let testnet = Testnet::new(10);
let server = Homeserver::start_test(&testnet).await.unwrap();
let client = PubkyClient::test(&testnet);
let keypair = Keypair::random();
client.signup(&keypair, &server.public_key()).await.unwrap();
let url = format!("pubky://{}/pub/foo.txt", keypair.public_key());
let url = url.as_str();
let bytes = Bytes::from(vec![0; 1024 * 1024]);
client.put(url, &bytes).await.unwrap();
let response = client.get(url).await.unwrap().unwrap();
assert_eq!(response, bytes);
client.delete(url).await.unwrap();
let response = client.get(url).await.unwrap();
assert_eq!(response, None);
}
}