diff --git a/Cargo.lock b/Cargo.lock index 2587b25..c8096be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1238,6 +1238,18 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-relay" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "axum-server", + "futures-util", + "tokio", + "url", +] + [[package]] name = "httparse" version = "1.9.5" @@ -2084,6 +2096,7 @@ dependencies = [ "cookie_store", "futures-lite", "futures-util", + "http-relay", "js-sys", "log", "pkarr", diff --git a/Cargo.toml b/Cargo.toml index eb66b50..b8ec7b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,8 @@ members = [ "pubky", "pubky-*", + "http-relay", + "examples" ] diff --git a/http-relay/README.md b/http-relay/README.md new file mode 100644 index 0000000..f11968e --- /dev/null +++ b/http-relay/README.md @@ -0,0 +1,7 @@ +# HTTP Relay + +A Rust implementation of _some_ of [Http relay spec](https://httprelay.io/). + +Normally you are better off running the [reference implementation's binary](https://httprelay.io/download/). + +This implementation, for the time being is meant for having a convenient library to be used in unit tests, and testnets in Pubky. diff --git a/http-relay/cargo.toml b/http-relay/cargo.toml new file mode 100644 index 0000000..33035d1 --- /dev/null +++ b/http-relay/cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "http-relay" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1.0.94" +axum = "0.7.9" +axum-server = "0.7.1" +futures-util = "0.3.31" +tokio = { version = "1.42.0", features = ["full"] } +url = "2.5.4" diff --git a/http-relay/src/lib.rs b/http-relay/src/lib.rs new file mode 100644 index 0000000..e55e804 --- /dev/null +++ b/http-relay/src/lib.rs @@ -0,0 +1,141 @@ +use std::{ + collections::HashMap, + net::SocketAddr, + sync::{Arc, Mutex}, +}; + +use anyhow::Result; + +use axum::{ + body::{Body, Bytes}, + extract::{Path, State}, + response::IntoResponse, + routing::get, + Router, +}; +use axum_server::Handle; +use tokio::sync::Notify; + +use futures_util::stream::StreamExt; +use url::Url; + +// Shared state to store GET requests and their notifications +type SharedState = Arc, Arc)>>>; + +pub struct HttpRelay { + pub(crate) http_handle: Handle, +} + +impl HttpRelay { + pub async fn start() -> Result { + let shared_state: SharedState = Arc::new(Mutex::new(HashMap::new())); + + let app = Router::new() + .route("/link/:id", get(link::get).post(link::post)) + .with_state(shared_state); + + let http_handle = Handle::new(); + + let cloned = http_handle.clone(); + tokio::spawn(async { + axum_server::bind(SocketAddr::from(([127, 0, 0, 1], 0))) + .handle(cloned) + .serve(app.into_make_service()) + .await + .unwrap(); + }); + + Ok(Self { http_handle }) + } + + pub async fn http_address(&self) -> Result { + match self.http_handle.listening().await { + Some(addr) => Ok(addr), + None => Err(anyhow::anyhow!("Failed to bind to http port")), + } + } + + /// Returns the localhost Url of this server. + pub async fn local_url(&self) -> Result { + match self.http_handle.listening().await { + Some(addr) => Ok(Url::parse(&format!("http://localhost:{}", addr.port()))?), + None => Err(anyhow::anyhow!("Failed to bind to http port")), + } + } + + /// Returns the localhost URL of Link endpoints + pub async fn local_link_url(&self) -> Result { + let mut url = self.local_url().await?; + + let mut segments = url + .path_segments_mut() + .expect("HttpRelay::local_link_url path_segments_mut"); + + segments.push("link"); + + drop(segments); + + Ok(url) + } + + pub fn shutdown(&self) { + self.http_handle.shutdown(); + } +} + +mod link { + use super::*; + + pub async fn get( + Path(id): Path, + State(state): State, + ) -> impl IntoResponse { + // Create a notification for this ID + let notify = Arc::new(Notify::new()); + + { + let mut map = state.lock().unwrap(); + + // Store the notification and return it when POST arrives + map.entry(id.clone()) + .or_insert_with(|| (vec![], notify.clone())); + } + + notify.notified().await; + + // Respond with the data stored for this ID + let map = state.lock().unwrap(); + if let Some((data, _)) = map.get(&id) { + Bytes::from(data.clone()).into_response() + } else { + (axum::http::StatusCode::NOT_FOUND, "Not Found").into_response() + } + } + + pub async fn post( + Path(id): Path, + State(state): State, + body: Body, + ) -> impl IntoResponse { + // Aggregate the body into bytes + let mut stream = body.into_data_stream(); + let mut bytes = vec![]; + while let Some(next) = stream.next().await { + let chunk = next.map_err(|e| e.to_string()).unwrap(); + bytes.extend_from_slice(&chunk); + } + + // Notify any waiting GET request for this ID + let mut map = state.lock().unwrap(); + if let Some((storage, notify)) = map.get_mut(&id) { + *storage = bytes; + notify.notify_one(); + Ok(()) + } else { + Err(( + axum::http::StatusCode::NOT_FOUND, + "No waiting GET request for this ID", + )) + } + } +} diff --git a/pubky-homeserver/src/core/extractors.rs b/pubky-homeserver/src/core/extractors.rs index 0037869..8fdf68f 100644 --- a/pubky-homeserver/src/core/extractors.rs +++ b/pubky-homeserver/src/core/extractors.rs @@ -32,9 +32,9 @@ where let headers_to_check = ["host", "pkarr-host"]; for header in headers_to_check { - if let Some(Ok(pubky_host)) = parts.headers.get(header).map(|h| h.to_str()) { - if let Ok(public_key) = PublicKey::try_from(pubky_host) { - tracing::debug!(?pubky_host); + if let Some(Ok(pkarr_host)) = parts.headers.get(header).map(|h| h.to_str()) { + if let Ok(public_key) = PublicKey::try_from(pkarr_host) { + tracing::debug!(?pkarr_host); return Ok(Pubky(public_key)); } diff --git a/pubky-homeserver/src/io/http.rs b/pubky-homeserver/src/io/http.rs index 682c139..06176d1 100644 --- a/pubky-homeserver/src/io/http.rs +++ b/pubky-homeserver/src/io/http.rs @@ -20,8 +20,6 @@ pub struct HttpServers { pub(crate) http_handle: Handle, /// Handle for the HTTPS server using Pkarr TLS pub(crate) https_handle: Handle, - // /// Handle for a mock relay used in testnet - // pub(crate) mock_pkarr_relay_handle: Handle, } impl HttpServers { @@ -62,8 +60,6 @@ impl HttpServers { .map_err(|error| tracing::error!(?error, "Homeserver https server error")), ); - // let mock_pkarr_relay_listener = TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], 15411)))?; - Ok(Self { http_handle, https_handle, diff --git a/pubky/Cargo.toml b/pubky/Cargo.toml index 72d821a..859d0ed 100644 --- a/pubky/Cargo.toml +++ b/pubky/Cargo.toml @@ -46,6 +46,7 @@ anyhow = "1.0.94" axum = "0.7.9" axum-server = "0.7.1" futures-util = "0.3.31" +http-relay = { path = "../http-relay" } pubky-homeserver = { path = "../pubky-homeserver" } tokio = "1.42.0" diff --git a/pubky/src/shared/auth.rs b/pubky/src/shared/auth.rs index fcc319f..9bfee23 100644 --- a/pubky/src/shared/auth.rs +++ b/pubky/src/shared/auth.rs @@ -217,11 +217,9 @@ impl Client { #[cfg(test)] mod tests { - - use std::net::SocketAddr; - use crate::*; + use http_relay::HttpRelay; use pkarr::{mainline::Testnet, Keypair}; use pubky_common::capabilities::{Capabilities, Capability}; use pubky_homeserver::Homeserver; @@ -268,106 +266,13 @@ mod tests { } } - async fn http_relay_server() -> Option { - use axum::{ - body::{Body, Bytes}, - extract::{Path, State}, - response::IntoResponse, - routing::get, - Router, - }; - use axum_server::Handle; - use futures_util::stream::StreamExt; - use std::{ - collections::HashMap, - net::SocketAddr, - sync::{Arc, Mutex}, - }; - use tokio::sync::Notify; - - // Shared state to store GET requests and their notifications - type SharedState = Arc, Arc)>>>; - let shared_state: SharedState = Arc::new(Mutex::new(HashMap::new())); - - // Handler for the GET endpoint - async fn subscribe( - Path(id): Path, - State(state): State, - ) -> impl IntoResponse { - // Create a notification for this ID - let notify = Arc::new(Notify::new()); - - { - let mut map = state.lock().unwrap(); - - // Store the notification and return it when POST arrives - map.entry(id.clone()) - .or_insert_with(|| (vec![], notify.clone())); - } - - notify.notified().await; - - // Respond with the data stored for this ID - let map = state.lock().unwrap(); - if let Some((data, _)) = map.get(&id) { - Bytes::from(data.clone()).into_response() - } else { - (axum::http::StatusCode::NOT_FOUND, "Not Found").into_response() - } - } - - // Handler for the POST endpoint - async fn publish( - Path(id): Path, - State(state): State, - body: Body, - ) -> impl IntoResponse { - // Aggregate the body into bytes - let mut stream = body.into_data_stream(); - let mut bytes = vec![]; - while let Some(next) = stream.next().await { - let chunk = next.map_err(|e| e.to_string()).unwrap(); - bytes.extend_from_slice(&chunk); - } - - // Notify any waiting GET request for this ID - let mut map = state.lock().unwrap(); - if let Some((storage, notify)) = map.get_mut(&id) { - *storage = bytes; - notify.notify_one(); - Ok(()) - } else { - Err(( - axum::http::StatusCode::NOT_FOUND, - "No waiting GET request for this ID", - )) - } - } - - let app = Router::new() - .route("/:id", get(subscribe).post(publish)) - .with_state(shared_state); - - let handle = Handle::new(); - - let cloned = handle.clone(); - tokio::spawn(async { - axum_server::bind(SocketAddr::from(([127, 0, 0, 1], 0))) - .handle(cloned) - .serve(app.into_make_service()) - .await - .unwrap(); - }); - - handle.listening().await - } - #[tokio::test] async fn authz() { let testnet = Testnet::new(10).unwrap(); let server = Homeserver::start_test(&testnet).await.unwrap(); - let http_relay_url = http_relay_server().await.unwrap(); + let http_relay = HttpRelay::start().await.unwrap(); + let http_relay_url = http_relay.local_link_url().await.unwrap(); let keypair = Keypair::random(); let pubky = keypair.public_key(); @@ -377,12 +282,8 @@ mod tests { "/pub/pubky.app/:rw,/pub/foo.bar/file:r".try_into().unwrap(); let client = Client::test(&testnet); - let (pubkyauth_url, pubkyauth_response) = client - .auth_request( - &format!("http://{}", http_relay_url.to_string()), - &capabilities, - ) - .unwrap(); + let (pubkyauth_url, pubkyauth_response) = + client.auth_request(http_relay_url, &capabilities).unwrap(); // Authenticator side {