feat(http-relay): add http relay crate to help with local testing

This commit is contained in:
nazeh
2024-12-17 13:36:05 +03:00
parent b4ef0ca810
commit de7d34eca1
9 changed files with 184 additions and 111 deletions

13
Cargo.lock generated
View File

@@ -1238,6 +1238,18 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "http-relay"
version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"axum-server",
"futures-util",
"tokio",
"url",
]
[[package]]
name = "httparse"
version = "1.9.5"
@@ -2084,6 +2096,7 @@ dependencies = [
"cookie_store",
"futures-lite",
"futures-util",
"http-relay",
"js-sys",
"log",
"pkarr",

View File

@@ -3,6 +3,8 @@ members = [
"pubky",
"pubky-*",
"http-relay",
"examples"
]

7
http-relay/README.md Normal file
View File

@@ -0,0 +1,7 @@
# HTTP Relay
A Rust implementation of _some_ of [Http relay spec](https://httprelay.io/).
Normally you are better off running the [reference implementation's binary](https://httprelay.io/download/).
This implementation, for the time being is meant for having a convenient library to be used in unit tests, and testnets in Pubky.

12
http-relay/cargo.toml Normal file
View File

@@ -0,0 +1,12 @@
[package]
name = "http-relay"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.94"
axum = "0.7.9"
axum-server = "0.7.1"
futures-util = "0.3.31"
tokio = { version = "1.42.0", features = ["full"] }
url = "2.5.4"

141
http-relay/src/lib.rs Normal file
View File

@@ -0,0 +1,141 @@
use std::{
collections::HashMap,
net::SocketAddr,
sync::{Arc, Mutex},
};
use anyhow::Result;
use axum::{
body::{Body, Bytes},
extract::{Path, State},
response::IntoResponse,
routing::get,
Router,
};
use axum_server::Handle;
use tokio::sync::Notify;
use futures_util::stream::StreamExt;
use url::Url;
// Shared state to store GET requests and their notifications
type SharedState = Arc<Mutex<HashMap<String, (Vec<u8>, Arc<Notify>)>>>;
pub struct HttpRelay {
pub(crate) http_handle: Handle,
}
impl HttpRelay {
pub async fn start() -> Result<Self> {
let shared_state: SharedState = Arc::new(Mutex::new(HashMap::new()));
let app = Router::new()
.route("/link/:id", get(link::get).post(link::post))
.with_state(shared_state);
let http_handle = Handle::new();
let cloned = http_handle.clone();
tokio::spawn(async {
axum_server::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.handle(cloned)
.serve(app.into_make_service())
.await
.unwrap();
});
Ok(Self { http_handle })
}
pub async fn http_address(&self) -> Result<SocketAddr> {
match self.http_handle.listening().await {
Some(addr) => Ok(addr),
None => Err(anyhow::anyhow!("Failed to bind to http port")),
}
}
/// Returns the localhost Url of this server.
pub async fn local_url(&self) -> Result<Url> {
match self.http_handle.listening().await {
Some(addr) => Ok(Url::parse(&format!("http://localhost:{}", addr.port()))?),
None => Err(anyhow::anyhow!("Failed to bind to http port")),
}
}
/// Returns the localhost URL of Link endpoints
pub async fn local_link_url(&self) -> Result<Url> {
let mut url = self.local_url().await?;
let mut segments = url
.path_segments_mut()
.expect("HttpRelay::local_link_url path_segments_mut");
segments.push("link");
drop(segments);
Ok(url)
}
pub fn shutdown(&self) {
self.http_handle.shutdown();
}
}
mod link {
use super::*;
pub async fn get(
Path(id): Path<String>,
State(state): State<SharedState>,
) -> impl IntoResponse {
// Create a notification for this ID
let notify = Arc::new(Notify::new());
{
let mut map = state.lock().unwrap();
// Store the notification and return it when POST arrives
map.entry(id.clone())
.or_insert_with(|| (vec![], notify.clone()));
}
notify.notified().await;
// Respond with the data stored for this ID
let map = state.lock().unwrap();
if let Some((data, _)) = map.get(&id) {
Bytes::from(data.clone()).into_response()
} else {
(axum::http::StatusCode::NOT_FOUND, "Not Found").into_response()
}
}
pub async fn post(
Path(id): Path<String>,
State(state): State<SharedState>,
body: Body,
) -> impl IntoResponse {
// Aggregate the body into bytes
let mut stream = body.into_data_stream();
let mut bytes = vec![];
while let Some(next) = stream.next().await {
let chunk = next.map_err(|e| e.to_string()).unwrap();
bytes.extend_from_slice(&chunk);
}
// Notify any waiting GET request for this ID
let mut map = state.lock().unwrap();
if let Some((storage, notify)) = map.get_mut(&id) {
*storage = bytes;
notify.notify_one();
Ok(())
} else {
Err((
axum::http::StatusCode::NOT_FOUND,
"No waiting GET request for this ID",
))
}
}
}

View File

@@ -32,9 +32,9 @@ where
let headers_to_check = ["host", "pkarr-host"];
for header in headers_to_check {
if let Some(Ok(pubky_host)) = parts.headers.get(header).map(|h| h.to_str()) {
if let Ok(public_key) = PublicKey::try_from(pubky_host) {
tracing::debug!(?pubky_host);
if let Some(Ok(pkarr_host)) = parts.headers.get(header).map(|h| h.to_str()) {
if let Ok(public_key) = PublicKey::try_from(pkarr_host) {
tracing::debug!(?pkarr_host);
return Ok(Pubky(public_key));
}

View File

@@ -20,8 +20,6 @@ pub struct HttpServers {
pub(crate) http_handle: Handle,
/// Handle for the HTTPS server using Pkarr TLS
pub(crate) https_handle: Handle,
// /// Handle for a mock relay used in testnet
// pub(crate) mock_pkarr_relay_handle: Handle,
}
impl HttpServers {
@@ -62,8 +60,6 @@ impl HttpServers {
.map_err(|error| tracing::error!(?error, "Homeserver https server error")),
);
// let mock_pkarr_relay_listener = TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], 15411)))?;
Ok(Self {
http_handle,
https_handle,

View File

@@ -46,6 +46,7 @@ anyhow = "1.0.94"
axum = "0.7.9"
axum-server = "0.7.1"
futures-util = "0.3.31"
http-relay = { path = "../http-relay" }
pubky-homeserver = { path = "../pubky-homeserver" }
tokio = "1.42.0"

View File

@@ -217,11 +217,9 @@ impl Client {
#[cfg(test)]
mod tests {
use std::net::SocketAddr;
use crate::*;
use http_relay::HttpRelay;
use pkarr::{mainline::Testnet, Keypair};
use pubky_common::capabilities::{Capabilities, Capability};
use pubky_homeserver::Homeserver;
@@ -268,106 +266,13 @@ mod tests {
}
}
async fn http_relay_server() -> Option<SocketAddr> {
use axum::{
body::{Body, Bytes},
extract::{Path, State},
response::IntoResponse,
routing::get,
Router,
};
use axum_server::Handle;
use futures_util::stream::StreamExt;
use std::{
collections::HashMap,
net::SocketAddr,
sync::{Arc, Mutex},
};
use tokio::sync::Notify;
// Shared state to store GET requests and their notifications
type SharedState = Arc<Mutex<HashMap<String, (Vec<u8>, Arc<Notify>)>>>;
let shared_state: SharedState = Arc::new(Mutex::new(HashMap::new()));
// Handler for the GET endpoint
async fn subscribe(
Path(id): Path<String>,
State(state): State<SharedState>,
) -> impl IntoResponse {
// Create a notification for this ID
let notify = Arc::new(Notify::new());
{
let mut map = state.lock().unwrap();
// Store the notification and return it when POST arrives
map.entry(id.clone())
.or_insert_with(|| (vec![], notify.clone()));
}
notify.notified().await;
// Respond with the data stored for this ID
let map = state.lock().unwrap();
if let Some((data, _)) = map.get(&id) {
Bytes::from(data.clone()).into_response()
} else {
(axum::http::StatusCode::NOT_FOUND, "Not Found").into_response()
}
}
// Handler for the POST endpoint
async fn publish(
Path(id): Path<String>,
State(state): State<SharedState>,
body: Body,
) -> impl IntoResponse {
// Aggregate the body into bytes
let mut stream = body.into_data_stream();
let mut bytes = vec![];
while let Some(next) = stream.next().await {
let chunk = next.map_err(|e| e.to_string()).unwrap();
bytes.extend_from_slice(&chunk);
}
// Notify any waiting GET request for this ID
let mut map = state.lock().unwrap();
if let Some((storage, notify)) = map.get_mut(&id) {
*storage = bytes;
notify.notify_one();
Ok(())
} else {
Err((
axum::http::StatusCode::NOT_FOUND,
"No waiting GET request for this ID",
))
}
}
let app = Router::new()
.route("/:id", get(subscribe).post(publish))
.with_state(shared_state);
let handle = Handle::new();
let cloned = handle.clone();
tokio::spawn(async {
axum_server::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.handle(cloned)
.serve(app.into_make_service())
.await
.unwrap();
});
handle.listening().await
}
#[tokio::test]
async fn authz() {
let testnet = Testnet::new(10).unwrap();
let server = Homeserver::start_test(&testnet).await.unwrap();
let http_relay_url = http_relay_server().await.unwrap();
let http_relay = HttpRelay::start().await.unwrap();
let http_relay_url = http_relay.local_link_url().await.unwrap();
let keypair = Keypair::random();
let pubky = keypair.public_key();
@@ -377,12 +282,8 @@ mod tests {
"/pub/pubky.app/:rw,/pub/foo.bar/file:r".try_into().unwrap();
let client = Client::test(&testnet);
let (pubkyauth_url, pubkyauth_response) = client
.auth_request(
&format!("http://{}", http_relay_url.to_string()),
&capabilities,
)
.unwrap();
let (pubkyauth_url, pubkyauth_response) =
client.auth_request(http_relay_url, &capabilities).unwrap();
// Authenticator side
{