refactor(homeserver): separate the core and io modules

This commit is contained in:
nazeh
2024-12-03 16:01:44 +03:00
parent 6a41b490f0
commit 8c40cd2ddf
28 changed files with 243 additions and 222 deletions

View File

@@ -162,6 +162,10 @@ impl Config {
}
}
pub fn is_testnet(&self) -> bool {
self.testnet
}
pub fn port(&self) -> u16 {
self.port
}

View File

@@ -1,6 +1,6 @@
use heed::{Env, RwTxn};
use crate::database::tables::{blobs, entries, events, sessions, users};
use crate::core::database::tables::{blobs, entries, events, sessions, users};
pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
let _: users::UsersTable = env.create_database(wtxn, Some(users::USERS_TABLE))?;

View File

@@ -1,3 +1,5 @@
//! Internal database in [crate::HomeserverCore]
use std::{fs, path::PathBuf};
use heed::{Env, EnvOpenOptions};
@@ -5,7 +7,7 @@ use heed::{Env, EnvOpenOptions};
mod migrations;
pub mod tables;
use crate::config::Config;
use crate::core::config::Config;
use tables::{Tables, TABLES_COUNT};

View File

@@ -1,6 +1,6 @@
use heed::{types::Bytes, Database, RoTxn};
use crate::database::DB;
use crate::core::database::DB;
use super::entries::Entry;

View File

@@ -18,7 +18,7 @@ use pubky_common::{
timestamp::Timestamp,
};
use crate::database::DB;
use crate::core::database::DB;
use super::events::Event;
@@ -447,13 +447,13 @@ mod tests {
use bytes::Bytes;
use pkarr::{mainline::Testnet, Keypair};
use crate::config::Config;
use crate::Config;
use super::DB;
#[tokio::test]
async fn entries() -> anyhow::Result<()> {
let mut db = DB::open(Config::test(&Testnet::new(0).unwrap())).unwrap();
let mut db = unsafe { DB::open(Config::test(&Testnet::new(0).unwrap())).unwrap() };
let keypair = Keypair::random();
let public_key = keypair.public_key();
@@ -495,7 +495,7 @@ mod tests {
#[tokio::test]
async fn chunked_entry() -> anyhow::Result<()> {
let mut db = DB::open(Config::test(&Testnet::new(0).unwrap())).unwrap();
let mut db = unsafe { DB::open(Config::test(&Testnet::new(0).unwrap())).unwrap() };
let keypair = Keypair::random();
let public_key = keypair.public_key();

View File

@@ -10,7 +10,7 @@ use heed::{
use postcard::{from_bytes, to_allocvec};
use serde::{Deserialize, Serialize};
use crate::database::DB;
use crate::core::database::DB;
/// Event [Timestamp] base32 => Encoded event.
pub type EventsTable = Database<Str, Bytes>;

View File

@@ -6,7 +6,7 @@ use pkarr::PublicKey;
use pubky_common::session::Session;
use tower_cookies::Cookies;
use crate::database::DB;
use crate::core::database::DB;
/// session secret => Session.
pub type SessionsTable = Database<Str, Bytes>;

View File

@@ -10,7 +10,7 @@ use axum::{
use pkarr::PublicKey;
use crate::error::{Error, Result};
use crate::core::error::{Error, Result};
#[derive(Debug)]
pub struct Pubky(PublicKey);

View File

@@ -1,6 +1,6 @@
use anyhow::Result;
use axum::{extract::Request, response::Response, Router};
use pkarr::PublicKey;
use pkarr::{Keypair, PublicKey};
use pubky_common::{
auth::AuthVerifier, capabilities::Capability, crypto::random_bytes, session::Session,
timestamp::Timestamp,
@@ -8,22 +8,26 @@ use pubky_common::{
use tower::ServiceExt;
use tower_cookies::{cookie::SameSite, Cookie};
use crate::{
config::Config,
database::{tables::users::User, DB},
};
mod config;
mod database;
mod error;
mod extractors;
mod routes;
use database::{tables::users::User, DB};
pub use config::Config;
#[derive(Clone, Debug)]
pub(crate) struct AppState {
pub(crate) verifier: AuthVerifier,
pub(crate) db: DB,
pub(crate) pkarr_client: pkarr::Client,
pub(crate) config: Config,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
/// An I/O-less Core of the [Homeserver].
pub struct HomeserverCore {
pub(crate) config: Config,
pub(crate) state: AppState,
pub(crate) router: Router,
}
@@ -32,33 +36,20 @@ impl HomeserverCore {
/// # Safety
/// HomeserverCore uses LMDB, [opening][heed::EnvOpenOptions::open] which comes with some safety precautions.
pub unsafe fn new(config: &Config) -> Result<Self> {
tracing::debug!(?config);
let db = unsafe { DB::open(config.clone())? };
let mut dht_settings = pkarr::mainline::Settings::default();
if let Some(bootstrap) = config.bootstrap() {
dht_settings = dht_settings.bootstrap(&bootstrap);
}
if let Some(request_timeout) = config.dht_request_timeout() {
dht_settings = dht_settings.request_timeout(request_timeout);
}
let pkarr_client = pkarr::Client::builder()
.dht_settings(dht_settings)
.build()?;
let state = AppState {
verifier: AuthVerifier::default(),
db,
pkarr_client: pkarr_client.clone(),
config: config.clone(),
};
let router = crate::routes::create_app(state.clone());
let router = routes::create_app(state.clone());
Ok(Self { state, router })
Ok(Self {
state,
router,
config: config.clone(),
})
}
#[cfg(test)]
@@ -69,6 +60,18 @@ impl HomeserverCore {
unsafe { HomeserverCore::new(&Config::test(&testnet)) }
}
// === Getters ===
pub fn keypair(&self) -> &Keypair {
self.config.keypair()
}
pub fn public_key(&self) -> PublicKey {
self.config.keypair().public_key()
}
// === Public Methods ===
// TODO: move this logic to a common place.
pub fn create_user(&mut self, public_key: &PublicKey) -> Result<Cookie> {
let mut wtxn = self.state.db.env.write_txn()?;

View File

@@ -9,11 +9,11 @@ use tower_cookies::{cookie::SameSite, Cookie, Cookies};
use pubky_common::{crypto::random_bytes, session::Session, timestamp::Timestamp};
use crate::{
core::AppState,
use crate::core::{
database::tables::users::User,
error::{Error, Result},
extractors::Pubky,
AppState,
};
pub async fn signup(

View File

@@ -6,10 +6,10 @@ use axum::{
};
use pubky_common::timestamp::Timestamp;
use crate::{
core::AppState,
use crate::core::{
error::{Error, Result},
extractors::ListQueryParams,
AppState,
};
pub async fn feed(

View File

@@ -1,3 +1,5 @@
//! The controller part of the [crate::HomeserverCore]
use axum::{
extract::DefaultBodyLimit,
routing::{delete, get, head, post, put},
@@ -8,11 +10,11 @@ use tower_http::{cors::CorsLayer, trace::TraceLayer};
use crate::core::AppState;
use self::pkarr::pkarr_router;
// use self::pkarr::pkarr_router;
mod auth;
mod feed;
mod pkarr;
// mod pkarr;
mod public;
mod root;
@@ -56,7 +58,7 @@ fn base(state: AppState) -> Router {
pub fn create_app(state: AppState) -> Router {
base(state.clone())
// TODO: Only enable this for test environments?
.nest("/pkarr", pkarr_router(state))
// .nest("/pkarr", pkarr_router(state))
.layer(CorsLayer::very_permissive())
.layer(TraceLayer::new_for_http())
}

View File

@@ -10,10 +10,10 @@ use futures_util::stream::StreamExt;
use pkarr::SignedPacket;
use crate::{
core::AppState,
use crate::core::{
error::{Error, Result},
extractors::Pubky,
AppState,
};
/// Pkarr relay, helpful for testing.

View File

@@ -2,9 +2,9 @@ use axum::http::StatusCode;
use pkarr::PublicKey;
use tower_cookies::Cookies;
use crate::{
core::AppState,
use crate::core::{
error::{Error, Result},
AppState,
};
pub mod read;

View File

@@ -8,11 +8,11 @@ use httpdate::HttpDate;
use pkarr::PublicKey;
use std::str::FromStr;
use crate::{
core::AppState,
use crate::core::{
database::tables::entries::Entry,
error::{Error, Result},
extractors::{EntryPath, ListQueryParams, Pubky},
AppState,
};
use super::verify;

View File

@@ -5,10 +5,10 @@ use futures_util::stream::StreamExt;
use axum::{body::Body, extract::State, http::StatusCode, response::IntoResponse};
use tower_cookies::Cookies;
use crate::{
core::AppState,
use crate::core::{
error::{Error, Result},
extractors::{EntryPath, Pubky},
AppState,
};
use super::{authorize, verify};

View File

@@ -0,0 +1,34 @@
//! Http server around the HomeserverCore
use std::{
net::{SocketAddr, TcpListener},
sync::Arc,
};
use anyhow::Result;
use axum_server::{
tls_rustls::{RustlsAcceptor, RustlsConfig},
Handle,
};
use crate::core::HomeserverCore;
pub(crate) async fn start(core: HomeserverCore) -> Result<Handle> {
let listener = TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], core.config.port())))?;
let acceptor = RustlsAcceptor::new(RustlsConfig::from_config(Arc::new(
core.keypair().to_rpk_rustls_server_config(),
)));
let server = axum_server::from_tcp(listener).acceptor(acceptor);
let handle = Handle::new();
tokio::spawn(
server.handle(handle.clone()).serve(
core.router
.into_make_service_with_connect_info::<SocketAddr>(),
),
);
Ok(handle)
}

View File

@@ -0,0 +1,71 @@
use ::pkarr::{mainline::Testnet, PublicKey};
use anyhow::Result;
use axum_server::Handle;
use pkarr::PkarrServer;
use tracing::info;
use crate::{Config, HomeserverCore};
mod http;
mod pkarr;
#[derive(Debug)]
/// Homeserver [Core][HomeserverCore] + I/O (http server and pkarr publishing).
pub struct Homeserver {
handle: Handle,
core: HomeserverCore,
}
impl Homeserver {
/// # Safety
/// Homeserver uses LMDB, [opening][heed::EnvOpenOptions::open] which comes with some safety precautions.
pub async unsafe fn start(config: Config) -> Result<Self> {
tracing::debug!(?config, "Starting homeserver with configurations");
let core = unsafe { HomeserverCore::new(&config)? };
let handle = http::start(core.clone()).await?;
let port = handle
.listening()
.await
.expect("Homeserver listening")
.port();
info!("Homeserver listening on http://localhost:{port}");
info!("Publishing Pkarr packet..");
let pkarr_server = PkarrServer::new(config)?;
pkarr_server.publish_server_packet(port).await?;
info!("Homeserver listening on https://{}", core.public_key());
Ok(Self { handle, core })
}
/// Test version of [Homeserver::start], using mainline Testnet, and a temporary storage.
pub async fn start_test(testnet: &Testnet) -> Result<Self> {
info!("Running testnet..");
unsafe { Homeserver::start(Config::test(testnet)).await }
}
// === Getters ===
pub fn public_key(&self) -> PublicKey {
self.core.public_key()
}
/// Return the `https://<server public key>` url
pub fn url(&self) -> url::Url {
url::Url::parse(&format!("https://{}", self.public_key())).expect("valid url")
}
// === Public Methods ===
/// Send a shutdown signal to all open resources
pub fn shutdown(&self) {
self.handle.shutdown();
}
}

View File

@@ -0,0 +1,66 @@
//! Pkarr related task
use anyhow::Result;
use pkarr::{dns::rdata::SVCB, SignedPacket};
use crate::Config;
pub struct PkarrServer {
client: pkarr::Client,
config: Config,
}
impl PkarrServer {
pub fn new(config: Config) -> Result<Self> {
let mut dht_settings = pkarr::mainline::Settings::default();
if let Some(bootstrap) = config.bootstrap() {
dht_settings = dht_settings.bootstrap(&bootstrap);
}
if let Some(request_timeout) = config.dht_request_timeout() {
dht_settings = dht_settings.request_timeout(request_timeout);
}
let client = pkarr::Client::builder()
.dht_settings(dht_settings)
.build()?;
Ok(Self { client, config })
}
pub async fn publish_server_packet(&self, port: u16) -> anyhow::Result<()> {
// TODO: Try to resolve first before publishing.
let default = ".".to_string();
let target = self.config.domain().unwrap_or(&default);
let mut svcb = SVCB::new(0, target.as_str().try_into()?);
svcb.priority = 1;
svcb.set_port(port);
let mut signed_packet_builder =
SignedPacket::builder().https(".".try_into().unwrap(), svcb.clone(), 60 * 60);
if self.config.domain().is_none() {
// TODO: remove after remvoing Pubky shared/public
// and add local host IP address instead.
svcb.target = "localhost".try_into().unwrap();
signed_packet_builder = signed_packet_builder
.https(".".try_into().unwrap(), svcb, 60 * 60)
.address(
".".try_into().unwrap(),
"127.0.0.1".parse().unwrap(),
60 * 60,
);
}
// TODO: announce A/AAAA records as well for TLS connections?
let signed_packet = signed_packet_builder.build(self.config.keypair())?;
self.client.publish(&signed_packet).await?;
Ok(())
}
}

View File

@@ -1,11 +1,6 @@
pub mod config;
mod core;
mod database;
mod error;
mod extractors;
mod pkarr;
mod routes;
mod server;
mod io;
pub use core::Config;
pub use core::HomeserverCore;
pub use server::Homeserver;
pub use io::Homeserver;

View File

@@ -1,7 +1,7 @@
use std::path::PathBuf;
use anyhow::Result;
use pubky_homeserver::{config::Config, Homeserver};
use pubky_homeserver::{Config, Homeserver};
use clap::Parser;
@@ -42,7 +42,11 @@ async fn main() -> Result<()> {
.await?
};
server.run_until_done().await?;
tokio::signal::ctrl_c().await?;
tracing::info!("Shutting down Homeserver");
server.shutdown();
Ok(())
}

View File

@@ -1,45 +0,0 @@
//! Pkarr related task
use pkarr::{dns::rdata::SVCB, SignedPacket};
use crate::config::Config;
pub async fn publish_server_packet(
pkarr_client: &pkarr::Client,
config: &Config,
port: u16,
) -> anyhow::Result<()> {
// TODO: Try to resolve first before publishing.
let default = ".".to_string();
let target = config.domain().unwrap_or(&default);
let mut svcb = SVCB::new(0, target.as_str().try_into()?);
svcb.priority = 1;
svcb.set_port(port);
let mut signed_packet_builder =
SignedPacket::builder().https(".".try_into().unwrap(), svcb.clone(), 60 * 60);
if config.domain().is_none() {
// TODO: remove after remvoing Pubky shared/public
// and add local host IP address instead.
svcb.target = "localhost".try_into().unwrap();
signed_packet_builder = signed_packet_builder
.https(".".try_into().unwrap(), svcb, 60 * 60)
.address(
".".try_into().unwrap(),
"127.0.0.1".parse().unwrap(),
60 * 60,
);
}
// TODO: announce A/AAAA records as well for TLS connections?
let signed_packet = signed_packet_builder.build(config.keypair())?;
pkarr_client.publish(&signed_packet).await?;
Ok(())
}

View File

@@ -1,115 +0,0 @@
use std::{
net::{SocketAddr, TcpListener},
sync::Arc,
};
use anyhow::{Error, Result};
use axum_server::tls_rustls::{RustlsAcceptor, RustlsConfig};
use tokio::task::JoinSet;
use tracing::{info, warn};
use pkarr::{mainline::Testnet, PublicKey};
use crate::{
config::Config,
core::{AppState, HomeserverCore},
pkarr::publish_server_packet,
};
#[derive(Debug)]
/// Homeserver [Core][HomeserverCore] + http server.
pub struct Homeserver {
state: AppState,
tasks: JoinSet<std::io::Result<()>>,
}
impl Homeserver {
/// # Safety
/// Homeserver uses LMDB, [opening][heed::EnvOpenOptions::open] which comes with some safety precautions.
pub async unsafe fn start(config: Config) -> Result<Self> {
let mut tasks = JoinSet::new();
let listener = TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], config.port())))?;
let port = listener.local_addr()?.port();
let keypair = config.keypair().clone();
let acceptor = RustlsAcceptor::new(RustlsConfig::from_config(Arc::new(
keypair.to_rpk_rustls_server_config(),
)));
let server = axum_server::from_tcp(listener).acceptor(acceptor);
let core = unsafe { HomeserverCore::new(&config)? };
// Spawn http server task
tasks.spawn(
server.serve(
core.router
.into_make_service_with_connect_info::<SocketAddr>(),
),
);
info!("Homeserver listening on http://localhost:{port}");
info!("Publishing Pkarr packet..");
publish_server_packet(&core.state.pkarr_client, &config, port).await?;
info!("Homeserver listening on https://{}", keypair.public_key());
Ok(Self {
tasks,
state: core.state,
})
}
/// Test version of [Homeserver::start], using mainline Testnet, and a temporary storage.
pub async fn start_test(testnet: &Testnet) -> Result<Self> {
info!("Running testnet..");
unsafe { Homeserver::start(Config::test(testnet)).await }
}
// === Getters ===
pub fn public_key(&self) -> PublicKey {
self.state.config.keypair().public_key()
}
/// Return the `https://<server public key>` url
pub fn url(&self) -> url::Url {
url::Url::parse(&format!("https://{}", self.public_key())).expect("valid url")
}
// === Public Methods ===
/// Shutdown the server and wait for all tasks to complete.
pub async fn shutdown(mut self) -> Result<()> {
self.tasks.abort_all();
self.run_until_done().await?;
Ok(())
}
/// Wait for all tasks to complete.
///
/// Runs forever unless tasks fail.
pub async fn run_until_done(mut self) -> Result<()> {
let mut final_res: Result<()> = Ok(());
while let Some(res) = self.tasks.join_next().await {
match res {
Ok(Ok(())) => {}
Err(err) if err.is_cancelled() => {}
Ok(Err(err)) => {
warn!(?err, "task failed");
final_res = Err(Error::from(err));
}
Err(err) => {
warn!(?err, "task panicked");
final_res = Err(err.into());
}
}
}
final_res
}
}