diff --git a/Cargo.lock b/Cargo.lock index 26bae45..74ef877 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,6 +142,8 @@ dependencies = [ "mime", "pin-project-lite", "serde", + "tokio", + "tokio-util", "tower", "tower-layer", "tower-service", @@ -1283,6 +1285,8 @@ dependencies = [ "base32", "bytes", "dirs-next", + "flume", + "futures-util", "heed", "pkarr", "postcard", @@ -1826,6 +1830,19 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-util" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "tower" version = "0.4.13" diff --git a/pubky-common/src/crypto.rs b/pubky-common/src/crypto.rs index 2f8131c..ec8f58a 100644 --- a/pubky-common/src/crypto.rs +++ b/pubky-common/src/crypto.rs @@ -8,6 +8,8 @@ pub type Hash = blake3::Hash; pub use blake3::hash; +pub use blake3::Hasher; + pub fn random_hash() -> Hash { let mut rng = rand::thread_rng(); Hash::from_bytes(rng.gen()) diff --git a/pubky-homeserver/Cargo.toml b/pubky-homeserver/Cargo.toml index 33e18ab..da0c5c7 100644 --- a/pubky-homeserver/Cargo.toml +++ b/pubky-homeserver/Cargo.toml @@ -6,10 +6,12 @@ edition = "2021" [dependencies] anyhow = "1.0.82" axum = "0.7.5" -axum-extra = { version = "0.9.3", features = ["typed-header"] } +axum-extra = { version = "0.9.3", features = ["typed-header", "async-read-body"] } base32 = "0.5.1" bytes = "1.6.1" dirs-next = "2.0.0" +flume = "0.11.0" +futures-util = "0.3.30" heed = "0.20.3" pkarr = { version = "2.1.0", features = ["async"] } postcard = { version = "1.0.8", features = ["alloc"] } diff --git a/pubky-homeserver/src/config.rs b/pubky-homeserver/src/config.rs index 7c9fcfe..3657ecd 100644 --- a/pubky-homeserver/src/config.rs +++ b/pubky-homeserver/src/config.rs @@ -40,13 +40,16 @@ impl Config { /// Test configurations pub fn test(testnet: &pkarr::mainline::Testnet) -> Self { + let bootstrap = Some(testnet.bootstrap.to_owned()); + let storage = Some( + std::env::temp_dir() + .join(Timestamp::now().to_string()) + .join(DEFAULT_STORAGE_DIR), + ); + Self { - bootstrap: Some(testnet.bootstrap.to_owned()), - storage: Some( - std::env::temp_dir() - .join(Timestamp::now().to_string()) - .join(DEFAULT_STORAGE_DIR), - ), + bootstrap, + storage, ..Default::default() } } diff --git a/pubky-homeserver/src/database.rs b/pubky-homeserver/src/database.rs index 2f8d591..0eb3200 100644 --- a/pubky-homeserver/src/database.rs +++ b/pubky-homeserver/src/database.rs @@ -19,21 +19,10 @@ impl DB { let env = unsafe { EnvOpenOptions::new().max_dbs(TABLES_COUNT).open(storage) }?; - let db = DB { env }; + migrations::run(&env); - db.run_migrations(); + let db = DB { env }; Ok(db) } - - fn run_migrations(&self) -> anyhow::Result<()> { - let mut wtxn = self.env.write_txn()?; - - migrations::create_users_table(&self.env, &mut wtxn); - migrations::create_sessions_table(&self.env, &mut wtxn); - - wtxn.commit()?; - - Ok(()) - } } diff --git a/pubky-homeserver/src/database/migrations.rs b/pubky-homeserver/src/database/migrations.rs index 93c7631..dbead07 100644 --- a/pubky-homeserver/src/database/migrations.rs +++ b/pubky-homeserver/src/database/migrations.rs @@ -1,19 +1,17 @@ use heed::{types::Str, Database, Env, RwTxn}; +mod m0; + use super::tables; -pub const TABLES_COUNT: u32 = 2; +pub const TABLES_COUNT: u32 = 4; -pub fn create_users_table(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> { - let _: tables::users::UsersTable = - env.create_database(wtxn, Some(tables::users::USERS_TABLE))?; - - Ok(()) -} - -pub fn create_sessions_table(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> { - let _: tables::sessions::SessionsTable = - env.create_database(wtxn, Some(tables::sessions::SESSIONS_TABLE))?; +pub fn run(env: &Env) -> anyhow::Result<()> { + let mut wtxn = env.write_txn()?; + + m0::run(env, &mut wtxn); + + wtxn.commit()?; Ok(()) } diff --git a/pubky-homeserver/src/database/migrations/m0.rs b/pubky-homeserver/src/database/migrations/m0.rs new file mode 100644 index 0000000..74d89c4 --- /dev/null +++ b/pubky-homeserver/src/database/migrations/m0.rs @@ -0,0 +1,15 @@ +use heed::{types::Str, Database, Env, RwTxn}; + +use super::tables::{blobs, entries, sessions, users}; + +pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> { + let _: users::UsersTable = env.create_database(wtxn, Some(users::USERS_TABLE))?; + + let _: sessions::SessionsTable = env.create_database(wtxn, Some(sessions::SESSIONS_TABLE))?; + + let _: blobs::BlobsTable = env.create_database(wtxn, Some(blobs::BLOBS_TABLE))?; + + let _: entries::EntriesTable = env.create_database(wtxn, Some(entries::ENTRIES_TABLE))?; + + Ok(()) +} diff --git a/pubky-homeserver/src/database/tables.rs b/pubky-homeserver/src/database/tables.rs index b6e3efc..4f0c1c5 100644 --- a/pubky-homeserver/src/database/tables.rs +++ b/pubky-homeserver/src/database/tables.rs @@ -1,2 +1,4 @@ +pub mod blobs; +pub mod entries; pub mod sessions; pub mod users; diff --git a/pubky-homeserver/src/database/tables/blobs.rs b/pubky-homeserver/src/database/tables/blobs.rs new file mode 100644 index 0000000..9cf1da1 --- /dev/null +++ b/pubky-homeserver/src/database/tables/blobs.rs @@ -0,0 +1,13 @@ +use std::{borrow::Cow, time::SystemTime}; + +use heed::{ + types::{Bytes, Str}, + BoxedError, BytesDecode, BytesEncode, Database, +}; + +use pubky_common::crypto::Hash; + +/// hash of the blob => bytes. +pub type BlobsTable = Database; + +pub const BLOBS_TABLE: &str = "blobs"; diff --git a/pubky-homeserver/src/database/tables/entries.rs b/pubky-homeserver/src/database/tables/entries.rs new file mode 100644 index 0000000..5a1cc8e --- /dev/null +++ b/pubky-homeserver/src/database/tables/entries.rs @@ -0,0 +1,66 @@ +use postcard::{from_bytes, to_allocvec}; +use serde::{Deserialize, Serialize}; +use std::{borrow::Cow, time::SystemTime}; + +use heed::{ + types::{Bytes, Str}, + BoxedError, BytesDecode, BytesEncode, Database, +}; + +use pubky_common::crypto::Hash; + +/// full_path(pubky/*path) => Entry. +pub type EntriesTable = Database; + +pub const ENTRIES_TABLE: &str = "entries"; + +#[derive(Clone, Default, Serialize, Deserialize, Debug, Eq, PartialEq)] +pub struct Entry { + /// Encoding version + version: usize, + /// Modified at + timestamp: u64, + content_hash: [u8; 32], + content_length: usize, + content_type: String, + // user_metadata: ? +} + +// TODO: get headers like Etag + +impl Entry { + pub fn new() -> Self { + Default::default() + } + + // === Setters === + + pub fn set_content_hash(&mut self, content_hash: Hash) -> &mut Self { + content_hash.as_bytes().clone_into(&mut self.content_hash); + self + } + + pub fn set_content_length(&mut self, content_length: usize) -> &mut Self { + self.content_length = content_length; + self + } + + pub fn set_content_type(&mut self, content_type: &str) -> &mut Self { + self.content_type = content_type.to_string(); + self + } + + // === Getters === + + pub fn content_hash(&self) -> &[u8; 32] { + &self.content_hash + } + + pub fn content_length(&self) -> usize { + self.content_length + } + + pub fn content_type(&self) -> &str { + &self.content_type + } +} diff --git a/pubky-homeserver/src/routes.rs b/pubky-homeserver/src/routes.rs index 86120c2..6099858 100644 --- a/pubky-homeserver/src/routes.rs +++ b/pubky-homeserver/src/routes.rs @@ -1,4 +1,5 @@ use axum::{ + extract::DefaultBodyLimit, routing::{delete, get, post, put}, Router, }; @@ -18,8 +19,11 @@ pub fn create_app(state: AppState) -> Router { .route("/:pubky/session", get(auth::session)) .route("/:pubky/session", post(auth::signin)) .route("/:pubky/session", delete(auth::signout)) - .route("/:pubky/*key", get(drive::put)) + .route("/:pubky/*key", put(drive::put)) .layer(TraceLayer::new_for_http()) .layer(CookieManagerLayer::new()) + // TODO: revisit if we enable streaming big payloads + // TODO: maybe add to a separate router (drive router?). + .layer(DefaultBodyLimit::max(16 * 1024)) .with_state(state) } diff --git a/pubky-homeserver/src/routes/auth.rs b/pubky-homeserver/src/routes/auth.rs index fceb6fe..c38aa38 100644 --- a/pubky-homeserver/src/routes/auth.rs +++ b/pubky-homeserver/src/routes/auth.rs @@ -2,7 +2,6 @@ use axum::{ extract::{Request, State}, http::{HeaderMap, StatusCode}, response::IntoResponse, - routing::get, Router, }; use axum_extra::{headers::UserAgent, TypedHeader}; @@ -103,7 +102,11 @@ pub async fn signin( state.verifier.verify(&body, public_key)?; let mut wtxn = state.db.env.write_txn()?; - let users: UsersTable = state.db.env.create_database(&mut wtxn, Some(USERS_TABLE))?; + let users: UsersTable = state + .db + .env + .open_database(&wtxn, Some(USERS_TABLE))? + .expect("Users table already created"); if let Some(existing) = users.get(&wtxn, public_key)? { users.put(&mut wtxn, public_key, &existing)?; diff --git a/pubky-homeserver/src/routes/drive.rs b/pubky-homeserver/src/routes/drive.rs index 3050250..12f8fb8 100644 --- a/pubky-homeserver/src/routes/drive.rs +++ b/pubky-homeserver/src/routes/drive.rs @@ -1,11 +1,63 @@ -use axum::response::IntoResponse; +use axum::{ + body::{Body, Bytes}, + extract::{Path, State}, + http::StatusCode, + response::IntoResponse, + RequestExt, Router, +}; +use futures_util::stream::StreamExt; use tracing::debug; -use crate::extractors::Pubky; +use pubky_common::crypto::Hasher; -pub async fn put(pubky: Pubky) -> Result { - debug!(pubky=?pubky.public_key()); +use crate::{ + database::tables::blobs::{BlobsTable, BLOBS_TABLE}, + error::{Error, Result}, + extractors::Pubky, + server::AppState, +}; + +pub async fn put( + State(state): State, + pubky: Pubky, + // Path(key): Path, + mut body: Body, +) -> Result { + let mut stream = body.into_data_stream(); + + let (tx, rx) = flume::bounded::(1); + + // Offload the write transaction to a blocking task + let done = tokio::task::spawn_blocking(move || { + // TODO: this is a blocking operation, which is ok for small + // payloads (we have 16 kb limit for now) but later we need + // to stream this to filesystem, and keep track of any failed + // writes to GC these files later. + + let mut wtxn = state.db.env.write_txn().unwrap(); + let blobs: BlobsTable = state + .db + .env + .open_database(&wtxn, Some(BLOBS_TABLE)) + .unwrap() + .expect("Blobs table already created"); + + let hasher = Hasher::new(); + + while let Ok(chunk) = rx.recv() { + dbg!(chunk); + } + }); + + while let Some(next) = stream.next().await { + let chunk = next + .map_err(|err| Error::new(StatusCode::INTERNAL_SERVER_ERROR, Some(err.to_string())))?; + + tx.send(chunk); + } + + let _ = done.await; Ok("Pubky drive...".to_string()) } diff --git a/pubky-homeserver/src/server.rs b/pubky-homeserver/src/server.rs index f167d05..0a2f3ae 100644 --- a/pubky-homeserver/src/server.rs +++ b/pubky-homeserver/src/server.rs @@ -3,7 +3,7 @@ use std::{future::IntoFuture, net::SocketAddr}; use anyhow::{Error, Result}; use pubky_common::auth::AuthnVerifier; use tokio::{net::TcpListener, signal, task::JoinSet}; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use pkarr::{ mainline::dht::{DhtSettings, Testnet}, @@ -27,6 +27,8 @@ pub(crate) struct AppState { impl Homeserver { pub async fn start(config: Config) -> Result { + debug!(?config); + let public_key = config.keypair().public_key(); let db = DB::open(&config.storage()?)?;