feat(homeserver): stream incoming body

This commit is contained in:
nazeh
2024-07-23 11:26:12 +03:00
parent 5a6c7ae9c5
commit 401872a61f
14 changed files with 207 additions and 39 deletions

17
Cargo.lock generated
View File

@@ -142,6 +142,8 @@ dependencies = [
"mime",
"pin-project-lite",
"serde",
"tokio",
"tokio-util",
"tower",
"tower-layer",
"tower-service",
@@ -1283,6 +1285,8 @@ dependencies = [
"base32",
"bytes",
"dirs-next",
"flume",
"futures-util",
"heed",
"pkarr",
"postcard",
@@ -1826,6 +1830,19 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-util"
version = "0.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tower"
version = "0.4.13"

View File

@@ -8,6 +8,8 @@ pub type Hash = blake3::Hash;
pub use blake3::hash;
pub use blake3::Hasher;
pub fn random_hash() -> Hash {
let mut rng = rand::thread_rng();
Hash::from_bytes(rng.gen())

View File

@@ -6,10 +6,12 @@ edition = "2021"
[dependencies]
anyhow = "1.0.82"
axum = "0.7.5"
axum-extra = { version = "0.9.3", features = ["typed-header"] }
axum-extra = { version = "0.9.3", features = ["typed-header", "async-read-body"] }
base32 = "0.5.1"
bytes = "1.6.1"
dirs-next = "2.0.0"
flume = "0.11.0"
futures-util = "0.3.30"
heed = "0.20.3"
pkarr = { version = "2.1.0", features = ["async"] }
postcard = { version = "1.0.8", features = ["alloc"] }

View File

@@ -40,13 +40,16 @@ impl Config {
/// Builds a configuration for tests.
///
/// The bootstrap list is copied from `testnet`, and storage is placed in a
/// directory under the system temp dir whose name is derived from the current
/// [`Timestamp`], joined with `DEFAULT_STORAGE_DIR`. Every other field keeps
/// its `Default` value.
pub fn test(testnet: &pkarr::mainline::Testnet) -> Self {
    let bootstrap = Some(testnet.bootstrap.to_owned());
    let storage = Some(
        std::env::temp_dir()
            .join(Timestamp::now().to_string())
            .join(DEFAULT_STORAGE_DIR),
    );

    // Each field is initialised exactly once; naming a field twice in a
    // struct expression is rejected by the compiler.
    Self {
        bootstrap,
        storage,
        ..Default::default()
    }
}

View File

@@ -19,21 +19,10 @@ impl DB {
let env = unsafe { EnvOpenOptions::new().max_dbs(TABLES_COUNT).open(storage) }?;
let db = DB { env };
migrations::run(&env);
db.run_migrations();
let db = DB { env };
Ok(db)
}
/// Creates the users and sessions tables inside a single write transaction.
///
/// # Errors
///
/// Returns an error if the write transaction cannot be opened, if either
/// table cannot be created, or if the commit fails. On a table-creation
/// failure the function returns before `commit` is reached.
fn run_migrations(&self) -> anyhow::Result<()> {
    let mut wtxn = self.env.write_txn()?;

    // Propagate failures instead of discarding the returned `Result`s, so a
    // half-applied migration is never committed and reported as success.
    migrations::create_users_table(&self.env, &mut wtxn)?;
    migrations::create_sessions_table(&self.env, &mut wtxn)?;

    wtxn.commit()?;

    Ok(())
}
}

View File

@@ -1,19 +1,17 @@
use heed::{types::Str, Database, Env, RwTxn};
mod m0;
use super::tables;
pub const TABLES_COUNT: u32 = 2;
pub const TABLES_COUNT: u32 = 4;
/// Creates the users table (named by `tables::users::USERS_TABLE`) within the
/// given write transaction.
///
/// The returned table handle is not needed here and is discarded.
pub fn create_users_table(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
    let table_name = Some(tables::users::USERS_TABLE);
    let _users: tables::users::UsersTable = env.create_database(wtxn, table_name)?;

    Ok(())
}
pub fn create_sessions_table(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
let _: tables::sessions::SessionsTable =
env.create_database(wtxn, Some(tables::sessions::SESSIONS_TABLE))?;
/// Runs the database migrations inside one write transaction and commits it.
///
/// # Errors
///
/// Returns an error if the write transaction cannot be opened, if the `m0`
/// migration fails, or if the commit fails.
pub fn run(env: &Env) -> anyhow::Result<()> {
    let mut wtxn = env.write_txn()?;

    // A failing migration must stop here, before anything is committed;
    // ignoring its `Result` would commit and report success regardless.
    m0::run(env, &mut wtxn)?;

    wtxn.commit()?;

    Ok(())
}

View File

@@ -0,0 +1,15 @@
use heed::{types::Str, Database, Env, RwTxn};
use super::tables::{blobs, entries, sessions, users};
/// Initial migration: creates the users, sessions, blobs and entries tables
/// within the caller's write transaction.
///
/// The table handles are only needed for their creation side effect and are
/// not returned. The caller is responsible for committing `wtxn`.
pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
    let _users: users::UsersTable = env.create_database(wtxn, Some(users::USERS_TABLE))?;
    let _sessions: sessions::SessionsTable =
        env.create_database(wtxn, Some(sessions::SESSIONS_TABLE))?;
    let _blobs: blobs::BlobsTable = env.create_database(wtxn, Some(blobs::BLOBS_TABLE))?;
    let _entries: entries::EntriesTable =
        env.create_database(wtxn, Some(entries::ENTRIES_TABLE))?;

    Ok(())
}

View File

@@ -1,2 +1,4 @@
pub mod blobs;
pub mod entries;
pub mod sessions;
pub mod users;

View File

@@ -0,0 +1,13 @@
use std::{borrow::Cow, time::SystemTime};
use heed::{
types::{Bytes, Str},
BoxedError, BytesDecode, BytesEncode, Database,
};
use pubky_common::crypto::Hash;
/// Table mapping the hash of a blob to the blob's bytes
/// (hash of the blob => bytes).
pub type BlobsTable = Database<Hash, Bytes>;
/// Name under which the blobs table is created and opened in the environment.
pub const BLOBS_TABLE: &str = "blobs";

View File

@@ -0,0 +1,66 @@
use postcard::{from_bytes, to_allocvec};
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, time::SystemTime};
use heed::{
types::{Bytes, Str},
BoxedError, BytesDecode, BytesEncode, Database,
};
use pubky_common::crypto::Hash;
/// Table of [`Entry`] records: full_path(pubky/*path) => Entry.
///
/// NOTE(review): the key codec is `Hash`, while the description above talks
/// about a full path — presumably the key is a hash of that path; confirm.
pub type EntriesTable = Database<Hash, Entry>;
/// Name under which the entries table is created in the environment.
pub const ENTRIES_TABLE: &str = "entries";
/// Metadata record describing a piece of stored content.
///
/// NOTE(review): the type derives `Serialize`/`Deserialize` and this file
/// imports postcard; if entries are encoded with postcard, field order is
/// part of the encoding — confirm before reordering or inserting fields.
#[derive(Clone, Default, Serialize, Deserialize, Debug, Eq, PartialEq)]
pub struct Entry {
/// Encoding version
version: usize,
/// Modified at
// NOTE(review): unit and epoch of this timestamp are not visible here — confirm.
timestamp: u64,
/// Raw 32 bytes of the content hash (see `set_content_hash`).
content_hash: [u8; 32],
/// Length of the content. Presumably in bytes — TODO confirm.
content_length: usize,
/// Content type of the content. Presumably a MIME type string — TODO confirm.
content_type: String,
// user_metadata: ?
}
// TODO: get headers like Etag
impl Entry {
    /// Creates an entry with every field set to its default value.
    pub fn new() -> Self {
        Self::default()
    }

    // === Setters ===

    /// Stores the raw bytes of `content_hash` and returns `self` for chaining.
    pub fn set_content_hash(&mut self, content_hash: Hash) -> &mut Self {
        self.content_hash = *content_hash.as_bytes();
        self
    }

    /// Sets the content length and returns `self` for chaining.
    pub fn set_content_length(&mut self, content_length: usize) -> &mut Self {
        self.content_length = content_length;
        self
    }

    /// Stores an owned copy of `content_type` and returns `self` for chaining.
    pub fn set_content_type(&mut self, content_type: &str) -> &mut Self {
        self.content_type = content_type.to_owned();
        self
    }

    // === Getters ===

    /// Raw bytes of the content hash.
    pub fn content_hash(&self) -> &[u8; 32] {
        &self.content_hash
    }

    /// Recorded content length.
    pub fn content_length(&self) -> usize {
        self.content_length
    }

    /// Recorded content type.
    pub fn content_type(&self) -> &str {
        self.content_type.as_str()
    }
}

View File

@@ -1,4 +1,5 @@
use axum::{
extract::DefaultBodyLimit,
routing::{delete, get, post, put},
Router,
};
@@ -18,8 +19,11 @@ pub fn create_app(state: AppState) -> Router {
.route("/:pubky/session", get(auth::session))
.route("/:pubky/session", post(auth::signin))
.route("/:pubky/session", delete(auth::signout))
.route("/:pubky/*key", get(drive::put))
.route("/:pubky/*key", put(drive::put))
.layer(TraceLayer::new_for_http())
.layer(CookieManagerLayer::new())
// TODO: revisit if we enable streaming big payloads
// TODO: maybe add to a separate router (drive router?).
.layer(DefaultBodyLimit::max(16 * 1024))
.with_state(state)
}

View File

@@ -2,7 +2,6 @@ use axum::{
extract::{Request, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
routing::get,
Router,
};
use axum_extra::{headers::UserAgent, TypedHeader};
@@ -103,7 +102,11 @@ pub async fn signin(
state.verifier.verify(&body, public_key)?;
let mut wtxn = state.db.env.write_txn()?;
let users: UsersTable = state.db.env.create_database(&mut wtxn, Some(USERS_TABLE))?;
let users: UsersTable = state
.db
.env
.open_database(&wtxn, Some(USERS_TABLE))?
.expect("Users table already created");
if let Some(existing) = users.get(&wtxn, public_key)? {
users.put(&mut wtxn, public_key, &existing)?;

View File

@@ -1,11 +1,63 @@
use axum::response::IntoResponse;
use axum::{
body::{Body, Bytes},
extract::{Path, State},
http::StatusCode,
response::IntoResponse,
RequestExt, Router,
};
use futures_util::stream::StreamExt;
use tracing::debug;
use crate::extractors::Pubky;
use pubky_common::crypto::Hasher;
pub async fn put(pubky: Pubky) -> Result<impl IntoResponse, String> {
debug!(pubky=?pubky.public_key());
use crate::{
database::tables::blobs::{BlobsTable, BLOBS_TABLE},
error::{Error, Result},
extractors::Pubky,
server::AppState,
};
/// Receives the request body as a stream and forwards it, chunk by chunk,
/// over a bounded channel to a blocking task that owns the database write
/// transaction.
///
/// The blocking task currently only feeds each chunk into a hasher; nothing
/// is written to the blobs table yet (see the TODOs below).
///
/// # Errors
///
/// Responds with `INTERNAL_SERVER_ERROR` when reading a body chunk fails,
/// when the blocking task stops receiving (for example because it panicked),
/// or when the blocking task cannot be joined.
pub async fn put(
    State(state): State<AppState>,
    pubky: Pubky,
    // Path(key): Path<String>,
    body: Body,
) -> Result<impl IntoResponse> {
    let mut stream = body.into_data_stream();

    // Capacity of one chunk, so the body is not buffered in memory while the
    // blocking task is busy.
    let (tx, rx) = flume::bounded::<Bytes>(1);

    // Offload the write transaction to a blocking task
    let done = tokio::task::spawn_blocking(move || {
        // TODO: this is a blocking operation, which is ok for small
        // payloads (we have 16 kb limit for now) but later we need
        // to stream this to filesystem, and keep track of any failed
        // writes to GC these files later.
        let mut wtxn = state.db.env.write_txn().unwrap();
        let blobs: BlobsTable = state
            .db
            .env
            .open_database(&wtxn, Some(BLOBS_TABLE))
            .unwrap()
            .expect("Blobs table already created");

        let mut hasher = Hasher::new();

        // `recv` fails only once every sender is dropped, which is how the
        // handler signals the end of the body.
        while let Ok(chunk) = rx.recv() {
            hasher.update(&chunk);
        }

        // TODO: finalize the hasher and persist the blob and its entry.
    });

    while let Some(next) = stream.next().await {
        let chunk = next
            .map_err(|err| Error::new(StatusCode::INTERNAL_SERVER_ERROR, Some(err.to_string())))?;

        // Wait for channel capacity without blocking the async executor
        // thread. A send error means the receiving task is gone, so there is
        // no point in reading the rest of the body.
        tx.send_async(chunk)
            .await
            .map_err(|err| Error::new(StatusCode::INTERNAL_SERVER_ERROR, Some(err.to_string())))?;
    }

    // Close the channel so the blocking task's receive loop terminates;
    // awaiting the task while still holding the sender would never complete.
    drop(tx);

    // Surface a panicked or cancelled blocking task instead of ignoring it.
    done.await
        .map_err(|err| Error::new(StatusCode::INTERNAL_SERVER_ERROR, Some(err.to_string())))?;

    Ok("Pubky drive...".to_string())
}

View File

@@ -3,7 +3,7 @@ use std::{future::IntoFuture, net::SocketAddr};
use anyhow::{Error, Result};
use pubky_common::auth::AuthnVerifier;
use tokio::{net::TcpListener, signal, task::JoinSet};
use tracing::{info, warn};
use tracing::{debug, info, warn};
use pkarr::{
mainline::dht::{DhtSettings, Testnet},
@@ -27,6 +27,8 @@ pub(crate) struct AppState {
impl Homeserver {
pub async fn start(config: Config) -> Result<Self> {
debug!(?config);
let public_key = config.keypair().public_key();
let db = DB::open(&config.storage()?)?;