Merge pull request #21 from pubky/feat/basic-data-store

Feat/basic data store
This commit is contained in:
Nuh
2024-07-23 21:16:29 +03:00
committed by nazeh
25 changed files with 893 additions and 346 deletions

18
Cargo.lock generated
View File

@@ -142,6 +142,8 @@ dependencies = [
"mime",
"pin-project-lite",
"serde",
"tokio",
"tokio-util",
"tower",
"tower-layer",
"tower-service",
@@ -1248,6 +1250,7 @@ dependencies = [
name = "pubky"
version = "0.1.0"
dependencies = [
"bytes",
"flume",
"pkarr",
"pubky-common",
@@ -1283,6 +1286,8 @@ dependencies = [
"base32",
"bytes",
"dirs-next",
"flume",
"futures-util",
"heed",
"pkarr",
"postcard",
@@ -1826,6 +1831,19 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-util"
version = "0.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tower"
version = "0.4.13"

View File

@@ -8,6 +8,8 @@ pub type Hash = blake3::Hash;
pub use blake3::hash;
pub use blake3::Hasher;
pub fn random_hash() -> Hash {
let mut rng = rand::thread_rng();
Hash::from_bytes(rng.gen())

View File

@@ -6,10 +6,12 @@ edition = "2021"
[dependencies]
anyhow = "1.0.82"
axum = "0.7.5"
axum-extra = { version = "0.9.3", features = ["typed-header"] }
axum-extra = { version = "0.9.3", features = ["typed-header", "async-read-body"] }
base32 = "0.5.1"
bytes = "1.6.1"
dirs-next = "2.0.0"
flume = "0.11.0"
futures-util = "0.3.30"
heed = "0.20.3"
pkarr = { version = "2.1.0", features = ["async"] }
postcard = { version = "1.0.8", features = ["alloc"] }

View File

@@ -40,13 +40,16 @@ impl Config {
/// Test configurations
pub fn test(testnet: &pkarr::mainline::Testnet) -> Self {
let bootstrap = Some(testnet.bootstrap.to_owned());
let storage = Some(
std::env::temp_dir()
.join(Timestamp::now().to_string())
.join(DEFAULT_STORAGE_DIR),
);
Self {
bootstrap: Some(testnet.bootstrap.to_owned()),
storage: Some(
std::env::temp_dir()
.join(Timestamp::now().to_string())
.join(DEFAULT_STORAGE_DIR),
),
bootstrap,
storage,
..Default::default()
}
}

View File

@@ -1,16 +1,23 @@
use std::fs;
use std::path::Path;
use bytes::Bytes;
use heed::{types::Str, Database, Env, EnvOpenOptions, RwTxn};
mod migrations;
pub mod tables;
use migrations::TABLES_COUNT;
use pubky_common::crypto::Hasher;
use tables::{entries::Entry, Tables, TABLES_COUNT};
use pkarr::PublicKey;
use tables::blobs::{BlobsTable, BLOBS_TABLE};
#[derive(Debug, Clone)]
pub struct DB {
pub(crate) env: Env,
pub(crate) tables: Tables,
}
impl DB {
@@ -19,21 +26,110 @@ impl DB {
let env = unsafe { EnvOpenOptions::new().max_dbs(TABLES_COUNT).open(storage) }?;
let db = DB { env };
let tables = migrations::run(&env)?;
db.run_migrations();
let db = DB { env, tables };
Ok(db)
}
fn run_migrations(&self) -> anyhow::Result<()> {
pub fn put_entry(
&mut self,
public_key: &PublicKey,
path: &str,
rx: flume::Receiver<Bytes>,
) -> anyhow::Result<()> {
let mut wtxn = self.env.write_txn()?;
migrations::create_users_table(&self.env, &mut wtxn);
migrations::create_sessions_table(&self.env, &mut wtxn);
let mut hasher = Hasher::new();
let mut bytes = vec![];
let mut length = 0;
while let Ok(chunk) = rx.recv() {
hasher.update(&chunk);
bytes.extend_from_slice(&chunk);
length += chunk.len();
}
let hash = hasher.finalize();
self.tables.blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?;
let mut entry = Entry::new();
entry.set_content_hash(hash);
entry.set_content_length(length);
let mut key = vec![];
key.extend_from_slice(public_key.as_bytes());
key.extend_from_slice(path.as_bytes());
self.tables.entries.put(&mut wtxn, &key, &entry.serialize());
wtxn.commit()?;
Ok(())
}
pub fn get_blob(
&mut self,
public_key: &PublicKey,
path: &str,
) -> anyhow::Result<Option<Bytes>> {
let mut rtxn = self.env.read_txn()?;
let mut key = vec![];
key.extend_from_slice(public_key.as_bytes());
key.extend_from_slice(path.as_bytes());
if let Some(bytes) = self.tables.entries.get(&rtxn, &key)? {
let entry = Entry::deserialize(bytes)?;
if let Some(blob) = self.tables.blobs.get(&rtxn, entry.content_hash())? {
return Ok(Some(Bytes::from(blob.to_vec())));
};
};
Ok(None)
}
}
#[cfg(test)]
mod tests {
use pkarr::Keypair;
use pubky_common::timestamp::Timestamp;
use crate::config::Config;
use super::{Bytes, DB};
#[tokio::test]
async fn entries() {
let storage = std::env::temp_dir()
.join(Timestamp::now().to_string())
.join("pubky");
let mut db = DB::open(&storage).unwrap();
let keypair = Keypair::random();
let path = "/pub/foo.txt";
let (tx, rx) = flume::bounded::<Bytes>(0);
let mut cloned = db.clone();
let cloned_keypair = keypair.clone();
let done = tokio::task::spawn_blocking(move || {
cloned.put_entry(&cloned_keypair.public_key(), path, rx);
});
tx.send(vec![1, 2, 3, 4, 5].into());
drop(tx);
done.await;
let blob = db.get_blob(&keypair.public_key(), path).unwrap().unwrap();
assert_eq!(blob, Bytes::from(vec![1, 2, 3, 4, 5]));
}
}

View File

@@ -1,19 +1,17 @@
use heed::{types::Str, Database, Env, RwTxn};
use super::tables;
mod m0;
pub const TABLES_COUNT: u32 = 2;
use super::tables::Tables;
pub fn create_users_table(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
let _: tables::users::UsersTable =
env.create_database(wtxn, Some(tables::users::USERS_TABLE))?;
pub fn run(env: &Env) -> anyhow::Result<Tables> {
let mut wtxn = env.write_txn()?;
Ok(())
}
pub fn create_sessions_table(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
let _: tables::sessions::SessionsTable =
env.create_database(wtxn, Some(tables::sessions::SESSIONS_TABLE))?;
Ok(())
m0::run(env, &mut wtxn);
let tables = Tables::new(env, &mut wtxn)?;
wtxn.commit()?;
Ok(tables)
}

View File

@@ -0,0 +1,15 @@
use heed::{types::Str, Database, Env, RwTxn};
use crate::database::tables::{blobs, entries, sessions, users};
pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
let _: users::UsersTable = env.create_database(wtxn, Some(users::USERS_TABLE))?;
let _: sessions::SessionsTable = env.create_database(wtxn, Some(sessions::SESSIONS_TABLE))?;
let _: blobs::BlobsTable = env.create_database(wtxn, Some(blobs::BLOBS_TABLE))?;
let _: entries::EntriesTable = env.create_database(wtxn, Some(entries::ENTRIES_TABLE))?;
Ok(())
}

View File

@@ -1,2 +1,30 @@
pub mod blobs;
pub mod entries;
pub mod sessions;
pub mod users;
use heed::{Env, RwTxn};
use blobs::{BlobsTable, BLOBS_TABLE};
use entries::{EntriesTable, ENTRIES_TABLE};
pub const TABLES_COUNT: u32 = 4;
#[derive(Debug, Clone)]
pub struct Tables {
pub blobs: BlobsTable,
pub entries: EntriesTable,
}
impl Tables {
pub fn new(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<Self> {
Ok(Self {
blobs: env
.open_database(wtxn, Some(BLOBS_TABLE))?
.expect("Blobs table already created"),
entries: env
.open_database(wtxn, Some(ENTRIES_TABLE))?
.expect("Entries table already created"),
})
}
}

View File

@@ -0,0 +1,11 @@
use std::{borrow::Cow, time::SystemTime};
use heed::{
types::{Bytes, Str},
BoxedError, BytesDecode, BytesEncode, Database,
};
/// hash of the blob => bytes.
pub type BlobsTable = Database<Bytes, Bytes>;
pub const BLOBS_TABLE: &str = "blobs";

View File

@@ -0,0 +1,83 @@
use postcard::{from_bytes, to_allocvec};
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, time::SystemTime};
use heed::{
types::{Bytes, Str},
BoxedError, BytesDecode, BytesEncode, Database,
};
use pubky_common::{crypto::Hash, timestamp::Timestamp};
/// full_path(pubky/*path) => Entry.
pub type EntriesTable = Database<Bytes, Bytes>;
pub const ENTRIES_TABLE: &str = "entries";
#[derive(Clone, Default, Serialize, Deserialize, Debug, Eq, PartialEq)]
pub struct Entry {
/// Encoding version
version: usize,
/// Modified at
timestamp: u64,
content_hash: [u8; 32],
content_length: usize,
content_type: String,
// user_metadata: ?
}
// TODO: get headers like Etag
impl Entry {
pub fn new() -> Self {
Self {
timestamp: Timestamp::now().into_inner(),
..Default::default()
}
}
// === Setters ===
pub fn set_content_hash(&mut self, content_hash: Hash) -> &mut Self {
content_hash.as_bytes().clone_into(&mut self.content_hash);
self
}
pub fn set_content_length(&mut self, content_length: usize) -> &mut Self {
self.content_length = content_length;
self
}
pub fn set_content_type(&mut self, content_type: &str) -> &mut Self {
self.content_type = content_type.to_string();
self
}
// === Getters ===
pub fn content_hash(&self) -> &[u8; 32] {
&self.content_hash
}
pub fn content_length(&self) -> usize {
self.content_length
}
pub fn content_type(&self) -> &str {
&self.content_type
}
// === Public Method ===
pub fn serialize(&self) -> Vec<u8> {
to_allocvec(self).expect("Session::serialize")
}
pub fn deserialize(bytes: &[u8]) -> core::result::Result<Self, postcard::Error> {
if bytes[0] > 0 {
panic!("Unknown Entry version");
}
from_bytes(bytes)
}
}

View File

@@ -54,25 +54,49 @@ impl IntoResponse for Error {
impl From<QueryRejection> for Error {
fn from(error: QueryRejection) -> Self {
Self::new(StatusCode::BAD_REQUEST, Some(error))
Self::new(StatusCode::BAD_REQUEST, error.into())
}
}
impl From<ExtensionRejection> for Error {
fn from(error: ExtensionRejection) -> Self {
Self::new(StatusCode::BAD_REQUEST, Some(error))
Self::new(StatusCode::BAD_REQUEST, error.into())
}
}
impl From<PathRejection> for Error {
fn from(error: PathRejection) -> Self {
Self::new(StatusCode::BAD_REQUEST, Some(error))
Self::new(StatusCode::BAD_REQUEST, error.into())
}
}
impl From<std::io::Error> for Error {
fn from(error: std::io::Error) -> Self {
Self::new(StatusCode::INTERNAL_SERVER_ERROR, Some(error))
Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
}
}
impl From<heed::Error> for Error {
fn from(error: heed::Error) -> Self {
Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
}
}
impl From<anyhow::Error> for Error {
fn from(error: anyhow::Error) -> Self {
Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
}
}
impl From<postcard::Error> for Error {
fn from(error: postcard::Error) -> Self {
Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
}
}
impl From<axum::Error> for Error {
fn from(error: axum::Error) -> Self {
Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
}
}
@@ -89,11 +113,3 @@ impl From<pkarr::Error> for Error {
Self::new(StatusCode::BAD_REQUEST, Some(error))
}
}
impl From<heed::Error> for Error {
fn from(error: heed::Error) -> Self {
debug!(?error);
Self::with_status(StatusCode::INTERNAL_SERVER_ERROR)
}
}

View File

@@ -45,3 +45,36 @@ where
Ok(Pubky(public_key))
}
}
pub struct EntryPath(pub(crate) String);
impl EntryPath {
pub fn as_str(&self) -> &str {
self.0.as_str()
}
pub fn as_bytes(&self) -> &[u8] {
self.0.as_bytes()
}
}
#[async_trait]
impl<S> FromRequestParts<S> for EntryPath
where
S: Send + Sync,
{
type Rejection = Response;
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
let params: Path<HashMap<String, String>> =
parts.extract().await.map_err(IntoResponse::into_response)?;
// TODO: enforce path limits like no trailing '/'
let path = params
.get("path")
.ok_or_else(|| (StatusCode::NOT_FOUND, "entry path missing").into_response())?;
Ok(EntryPath(path.to_string()))
}
}

View File

@@ -1,4 +1,5 @@
use axum::{
extract::DefaultBodyLimit,
routing::{delete, get, post, put},
Router,
};
@@ -8,7 +9,7 @@ use tower_http::trace::TraceLayer;
use crate::server::AppState;
mod auth;
mod drive;
mod public;
mod root;
pub fn create_app(state: AppState) -> Router {
@@ -18,8 +19,12 @@ pub fn create_app(state: AppState) -> Router {
.route("/:pubky/session", get(auth::session))
.route("/:pubky/session", post(auth::signin))
.route("/:pubky/session", delete(auth::signout))
.route("/:pubky/*key", get(drive::put))
.route("/:pubky/*path", put(public::put))
.route("/:pubky/*path", get(public::get))
.layer(TraceLayer::new_for_http())
.layer(CookieManagerLayer::new())
// TODO: revisit if we enable streaming big payloads
// TODO: maybe add to a separate router (drive router?).
.layer(DefaultBodyLimit::max(16 * 1024))
.with_state(state)
}

View File

@@ -2,7 +2,6 @@ use axum::{
extract::{Request, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
routing::get,
Router,
};
use axum_extra::{headers::UserAgent, TypedHeader};
@@ -103,7 +102,11 @@ pub async fn signin(
state.verifier.verify(&body, public_key)?;
let mut wtxn = state.db.env.write_txn()?;
let users: UsersTable = state.db.env.create_database(&mut wtxn, Some(USERS_TABLE))?;
let users: UsersTable = state
.db
.env
.open_database(&wtxn, Some(USERS_TABLE))?
.expect("Users table already created");
if let Some(existing) = users.get(&wtxn, public_key)? {
users.put(&mut wtxn, public_key, &existing)?;

View File

@@ -1,11 +0,0 @@
use axum::response::IntoResponse;
use tracing::debug;
use crate::extractors::Pubky;
pub async fn put(pubky: Pubky) -> Result<impl IntoResponse, String> {
debug!(pubky=?pubky.public_key());
Ok("Pubky drive...".to_string())
}

View File

@@ -0,0 +1,83 @@
use axum::{
body::{Body, Bytes},
extract::{Path, State},
http::StatusCode,
response::IntoResponse,
RequestExt, Router,
};
use axum_extra::body::AsyncReadBody;
use futures_util::stream::StreamExt;
use tracing::debug;
use pubky_common::crypto::Hasher;
use crate::{
database::tables::{
blobs::{BlobsTable, BLOBS_TABLE},
entries::{EntriesTable, Entry, ENTRIES_TABLE},
},
error::{Error, Result},
extractors::{EntryPath, Pubky},
server::AppState,
};
pub async fn put(
State(mut state): State<AppState>,
pubky: Pubky,
path: EntryPath,
mut body: Body,
) -> Result<impl IntoResponse> {
// TODO: return an error if path does not start with '/pub/'
let mut stream = body.into_data_stream();
let (tx, rx) = flume::bounded::<Bytes>(1);
// TODO: refactor Database to clean up this scope.
let done = tokio::task::spawn_blocking(move || -> Result<()> {
// TODO: this is a blocking operation, which is ok for small
// payloads (we have 16 kb limit for now) but later we need
// to stream this to filesystem, and keep track of any failed
// writes to GC these files later.
let public_key = pubky.public_key();
// TODO: Authorize
state.db.put_entry(public_key, path.as_str(), rx);
Ok(())
});
while let Some(next) = stream.next().await {
let chunk = next?;
tx.send(chunk);
}
drop(tx);
done.await.expect("join error")?;
// TODO: return relevant headers, like Etag?
Ok(())
}
pub async fn get(
State(mut state): State<AppState>,
pubky: Pubky,
path: EntryPath,
) -> Result<impl IntoResponse> {
// TODO: check the path, return an error if doesn't start with `/pub/`
// TODO: Enable streaming
let public_key = pubky.public_key();
match state.db.get_blob(public_key, path.as_str()) {
Err(error) => Err(error)?,
Ok(Some(bytes)) => Ok(bytes),
Ok(None) => Err(Error::with_status(StatusCode::NOT_FOUND)),
}
}

View File

@@ -3,7 +3,7 @@ use std::{future::IntoFuture, net::SocketAddr};
use anyhow::{Error, Result};
use pubky_common::auth::AuthnVerifier;
use tokio::{net::TcpListener, signal, task::JoinSet};
use tracing::{info, warn};
use tracing::{debug, info, warn};
use pkarr::{
mainline::dht::{DhtSettings, Testnet},
@@ -27,6 +27,8 @@ pub(crate) struct AppState {
impl Homeserver {
pub async fn start(config: Config) -> Result<Self> {
debug!(?config);
let public_key = config.keypair().public_key();
let db = DB::open(&config.storage()?)?;

View File

@@ -11,6 +11,7 @@ ureq = { version = "2.10.0", features = ["cookies"] }
thiserror = "1.0.62"
url = "2.5.2"
flume = { version = "0.11.0", features = ["select", "eventual-fairness"], default-features = false }
bytes = "1.6.1"
[dev-dependencies]
pubky_homeserver = { path = "../pubky-homeserver" }

View File

@@ -1,18 +1,15 @@
mod auth;
mod pkarr;
mod public;
use std::{collections::HashMap, fmt::format, time::Duration};
use pkarr::{
dns::{rdata::SVCB, Packet},
mainline::{dht::DhtSettings, Testnet},
Keypair, PkarrClient, PublicKey, Settings, SignedPacket,
};
use ureq::{Agent, Response};
use url::Url;
use pubky_common::{auth::AuthnSignature, session::Session};
use crate::error::{Error, Result};
const MAX_RECURSIVE_PUBKY_HOMESERVER_RESOLUTION: u8 = 3;
use pkarr::{DhtSettings, PkarrClient, Settings, Testnet};
#[derive(Debug, Clone)]
pub struct PubkyClient {
@@ -45,174 +42,8 @@ impl PubkyClient {
// === Public Methods ===
/// Signup to a homeserver and update Pkarr accordingly.
///
/// The homeserver is a Pkarr domain name, where the TLD is a Pkarr public key
/// for example "pubky.o4dksfbqk85ogzdb5osziw6befigbuxmuxkuxq8434q89uj56uyy"
pub fn signup(&self, keypair: &Keypair, homeserver: &str) -> Result<()> {
let (audience, mut url) = self.resolve_endpoint(homeserver)?;
url.set_path(&format!("/{}", keypair.public_key()));
self.request(HttpMethod::Put, &url)
.send_bytes(AuthnSignature::generate(keypair, &audience).as_bytes())
.map_err(Box::new)?;
self.publish_pubky_homeserver(keypair, homeserver);
Ok(())
}
/// Check the current sesison for a given Pubky in its homeserver.
pub fn session(&self, pubky: &PublicKey) -> Result<Session> {
let (homeserver, mut url) = self.resolve_pubky_homeserver(pubky)?;
url.set_path(&format!("/{}/session", pubky));
let mut bytes = vec![];
let result = self.request(HttpMethod::Get, &url).call().map_err(Box::new);
if let Ok(reader) = result {
reader.into_reader().read_to_end(&mut bytes);
} else {
return Err(Error::NotSignedIn);
}
Ok(Session::deserialize(&bytes)?)
}
/// Signout from a homeserver.
pub fn signout(&self, pubky: &PublicKey) -> Result<()> {
let (homeserver, mut url) = self.resolve_pubky_homeserver(pubky)?;
url.set_path(&format!("/{}/session", pubky));
self.request(HttpMethod::Delete, &url)
.call()
.map_err(Box::new)?;
Ok(())
}
/// Signin to a homeserver.
pub fn signin(&self, keypair: &Keypair) -> Result<()> {
let pubky = keypair.public_key();
let (audience, mut url) = self.resolve_pubky_homeserver(&pubky)?;
url.set_path(&format!("/{}/session", &pubky));
self.request(HttpMethod::Post, &url)
.send_bytes(AuthnSignature::generate(keypair, &audience).as_bytes())
.map_err(Box::new)?;
Ok(())
}
// === Private Methods ===
/// Publish the SVCB record for `_pubky.<public_key>`.
pub(crate) fn publish_pubky_homeserver(&self, keypair: &Keypair, host: &str) -> Result<()> {
let mut packet = Packet::new_reply(0);
if let Some(existing) = self.pkarr.resolve(&keypair.public_key())? {
for answer in existing.packet().answers.iter().cloned() {
if !answer.name.to_string().starts_with("_pubky") {
packet.answers.push(answer.into_owned())
}
}
}
let svcb = SVCB::new(0, host.try_into()?);
packet.answers.push(pkarr::dns::ResourceRecord::new(
"_pubky".try_into().unwrap(),
pkarr::dns::CLASS::IN,
60 * 60,
pkarr::dns::rdata::RData::SVCB(svcb),
));
let signed_packet = SignedPacket::from_packet(keypair, &packet)?;
self.pkarr.publish(&signed_packet)?;
Ok(())
}
/// Resolve the homeserver for a pubky.
pub(crate) fn resolve_pubky_homeserver(&self, pubky: &PublicKey) -> Result<(PublicKey, Url)> {
let target = format!("_pubky.{}", pubky);
self.resolve_endpoint(&target)
.map_err(|_| Error::Generic("Could not resolve homeserver".to_string()))
}
/// Resolve a service's public_key and clearnet url from a Pubky domain
fn resolve_endpoint(&self, target: &str) -> Result<(PublicKey, Url)> {
// TODO: cache the result of this function?
// TODO: use MAX_RECURSIVE_PUBKY_HOMESERVER_RESOLUTION
// TODO: move to common?
let mut target = target.to_string();
let mut homeserver_public_key = None;
let mut host = target.clone();
// PublicKey is very good at extracting the Pkarr TLD from a string.
while let Ok(public_key) = PublicKey::try_from(target.clone()) {
if let Some(signed_packet) = self.pkarr.resolve(&public_key)? {
let mut prior = None;
for answer in signed_packet.resource_records(&target) {
if let pkarr::dns::rdata::RData::SVCB(svcb) = &answer.rdata {
if svcb.priority == 0 {
prior = Some(svcb)
} else if let Some(sofar) = prior {
if svcb.priority >= sofar.priority {
prior = Some(svcb)
}
// TODO return random if priority is the same
} else {
prior = Some(svcb)
}
}
}
if let Some(svcb) = prior {
homeserver_public_key = Some(public_key);
target = svcb.target.to_string();
if let Some(port) = svcb.get_param(pkarr::dns::rdata::SVCB::PORT) {
if port.len() < 2 {
// TODO: debug! Error encoding port!
}
let port = u16::from_be_bytes([port[0], port[1]]);
host = format!("{target}:{port}");
} else {
host.clone_from(&target);
};
continue;
}
};
break;
}
if let Some(homeserver) = homeserver_public_key {
let url = if host.starts_with("localhost") {
format!("http://{host}")
} else {
format!("https://{host}")
};
return Ok((homeserver, Url::parse(&url)?));
}
Err(Error::Generic("Could not resolve endpoint".to_string()))
}
fn request(&self, method: HttpMethod, url: &Url) -> ureq::Request {
self.agent.request_url(method.into(), url)
}
@@ -242,70 +73,3 @@ impl From<HttpMethod> for &str {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use pkarr::{
dns::{rdata::SVCB, Packet},
mainline::{dht::DhtSettings, Testnet},
Keypair, PkarrClient, Settings, SignedPacket,
};
use pubky_homeserver::Homeserver;
#[tokio::test]
async fn resolve_homeserver() {
let testnet = Testnet::new(3);
let server = Homeserver::start_test(&testnet).await.unwrap();
// Publish an intermediate controller of the homeserver
let pkarr_client = PkarrClient::new(Settings {
dht: DhtSettings {
bootstrap: Some(testnet.bootstrap.clone()),
..Default::default()
},
..Default::default()
})
.unwrap()
.as_async();
let intermediate = Keypair::random();
let mut packet = Packet::new_reply(0);
let server_tld = server.public_key().to_string();
let mut svcb = SVCB::new(0, server_tld.as_str().try_into().unwrap());
packet.answers.push(pkarr::dns::ResourceRecord::new(
"pubky".try_into().unwrap(),
pkarr::dns::CLASS::IN,
60 * 60,
pkarr::dns::rdata::RData::SVCB(svcb),
));
let signed_packet = SignedPacket::from_packet(&intermediate, &packet).unwrap();
pkarr_client.publish(&signed_packet).await.unwrap();
tokio::task::spawn_blocking(move || {
let client = PubkyClient::test(&testnet);
let pubky = Keypair::random();
client
.publish_pubky_homeserver(&pubky, &format!("pubky.{}", &intermediate.public_key()));
let (public_key, url) = client
.resolve_pubky_homeserver(&pubky.public_key())
.unwrap();
assert_eq!(public_key, server.public_key());
assert_eq!(url.host_str(), Some("localhost"));
assert_eq!(url.port(), Some(server.port()));
})
.await
.expect("task failed")
}
}

128
pubky/src/client/auth.rs Normal file
View File

@@ -0,0 +1,128 @@
use crate::PubkyClient;
use pubky_common::{auth::AuthnSignature, session::Session};
use super::{Error, HttpMethod, Result};
use pkarr::{Keypair, PublicKey};
impl PubkyClient {
/// Signup to a homeserver and update Pkarr accordingly.
///
/// The homeserver is a Pkarr domain name, where the TLD is a Pkarr public key
/// for example "pubky.o4dksfbqk85ogzdb5osziw6befigbuxmuxkuxq8434q89uj56uyy"
pub fn signup(&self, keypair: &Keypair, homeserver: &str) -> Result<()> {
let (audience, mut url) = self.resolve_endpoint(homeserver)?;
url.set_path(&format!("/{}", keypair.public_key()));
self.request(HttpMethod::Put, &url)
.send_bytes(AuthnSignature::generate(keypair, &audience).as_bytes())?;
self.publish_pubky_homeserver(keypair, homeserver);
Ok(())
}
/// Check the current sesison for a given Pubky in its homeserver.
///
/// Returns an [Error::NotSignedIn] if so, or [ureq::Error] if
/// the response has any other `>=400` status code.
pub fn session(&self, pubky: &PublicKey) -> Result<Session> {
let (homeserver, mut url) = self.resolve_pubky_homeserver(pubky)?;
url.set_path(&format!("/{}/session", pubky));
let mut bytes = vec![];
let result = self.request(HttpMethod::Get, &url).call().map_err(Box::new);
let reader = self.request(HttpMethod::Get, &url).call().map_err(|err| {
match err {
ureq::Error::Status(404, _) => Error::NotSignedIn,
// TODO: handle other types of errors
_ => err.into(),
}
})?;
reader.into_reader().read_to_end(&mut bytes);
Ok(Session::deserialize(&bytes)?)
}
/// Signout from a homeserver.
pub fn signout(&self, pubky: &PublicKey) -> Result<()> {
let (homeserver, mut url) = self.resolve_pubky_homeserver(pubky)?;
url.set_path(&format!("/{}/session", pubky));
self.request(HttpMethod::Delete, &url)
.call()
.map_err(Box::new)?;
Ok(())
}
/// Signin to a homeserver.
pub fn signin(&self, keypair: &Keypair) -> Result<()> {
let pubky = keypair.public_key();
let (audience, mut url) = self.resolve_pubky_homeserver(&pubky)?;
url.set_path(&format!("/{}/session", &pubky));
self.request(HttpMethod::Post, &url)
.send_bytes(AuthnSignature::generate(keypair, &audience).as_bytes())
.map_err(Box::new)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::*;
use pkarr::{mainline::Testnet, Keypair};
use pubky_common::session::Session;
use pubky_homeserver::Homeserver;
#[tokio::test]
async fn basic_authn() {
let testnet = Testnet::new(3);
let server = Homeserver::start_test(&testnet).await.unwrap();
let client = PubkyClient::test(&testnet).as_async();
let keypair = Keypair::random();
client
.signup(&keypair, &server.public_key().to_string())
.await
.unwrap();
let session = client.session(&keypair.public_key()).await.unwrap();
assert_eq!(session, Session { ..session.clone() });
client.signout(&keypair.public_key()).await.unwrap();
{
let session = client.session(&keypair.public_key()).await;
assert!(session.is_err());
match session {
Err(Error::NotSignedIn) => {}
_ => assert!(false, "expected NotSignedInt error"),
}
}
client.signin(&keypair).await.unwrap();
{
let session = client.session(&keypair.public_key()).await.unwrap();
assert_eq!(session, Session { ..session.clone() });
}
}
}

179
pubky/src/client/pkarr.rs Normal file
View File

@@ -0,0 +1,179 @@
pub use pkarr::{
dns::{rdata::SVCB, Packet},
mainline::{dht::DhtSettings, Testnet},
Keypair, PkarrClient, PublicKey, Settings, SignedPacket,
};
use super::{Error, PubkyClient, Result, Url};
const MAX_RECURSIVE_PUBKY_HOMESERVER_RESOLUTION: u8 = 3;
impl PubkyClient {
/// Publish the SVCB record for `_pubky.<public_key>`.
pub(crate) fn publish_pubky_homeserver(&self, keypair: &Keypair, host: &str) -> Result<()> {
let mut packet = Packet::new_reply(0);
if let Some(existing) = self.pkarr.resolve(&keypair.public_key())? {
for answer in existing.packet().answers.iter().cloned() {
if !answer.name.to_string().starts_with("_pubky") {
packet.answers.push(answer.into_owned())
}
}
}
let svcb = SVCB::new(0, host.try_into()?);
packet.answers.push(pkarr::dns::ResourceRecord::new(
"_pubky".try_into().unwrap(),
pkarr::dns::CLASS::IN,
60 * 60,
pkarr::dns::rdata::RData::SVCB(svcb),
));
let signed_packet = SignedPacket::from_packet(keypair, &packet)?;
self.pkarr.publish(&signed_packet)?;
Ok(())
}
/// Resolve the homeserver for a pubky.
pub(crate) fn resolve_pubky_homeserver(&self, pubky: &PublicKey) -> Result<(PublicKey, Url)> {
let target = format!("_pubky.{}", pubky);
self.resolve_endpoint(&target)
.map_err(|_| Error::Generic("Could not resolve homeserver".to_string()))
}
/// Resolve a service's public_key and clearnet url from a Pubky domain
pub(crate) fn resolve_endpoint(&self, target: &str) -> Result<(PublicKey, Url)> {
// TODO: cache the result of this function?
// TODO: use MAX_RECURSIVE_PUBKY_HOMESERVER_RESOLUTION
// TODO: move to common?
let mut target = target.to_string();
let mut homeserver_public_key = None;
let mut host = target.clone();
// PublicKey is very good at extracting the Pkarr TLD from a string.
while let Ok(public_key) = PublicKey::try_from(target.clone()) {
if let Some(signed_packet) = self.pkarr.resolve(&public_key)? {
let mut prior = None;
for answer in signed_packet.resource_records(&target) {
if let pkarr::dns::rdata::RData::SVCB(svcb) = &answer.rdata {
if svcb.priority == 0 {
prior = Some(svcb)
} else if let Some(sofar) = prior {
if svcb.priority >= sofar.priority {
prior = Some(svcb)
}
// TODO return random if priority is the same
} else {
prior = Some(svcb)
}
}
}
if let Some(svcb) = prior {
homeserver_public_key = Some(public_key);
target = svcb.target.to_string();
if let Some(port) = svcb.get_param(pkarr::dns::rdata::SVCB::PORT) {
if port.len() < 2 {
// TODO: debug! Error encoding port!
}
let port = u16::from_be_bytes([port[0], port[1]]);
host = format!("{target}:{port}");
} else {
host.clone_from(&target);
};
continue;
}
};
break;
}
if let Some(homeserver) = homeserver_public_key {
let url = if host.starts_with("localhost") {
format!("http://{host}")
} else {
format!("https://{host}")
};
return Ok((homeserver, Url::parse(&url)?));
}
Err(Error::Generic("Could not resolve endpoint".to_string()))
}
}
#[cfg(test)]
mod tests {
use super::*;
use pkarr::{
dns::{rdata::SVCB, Packet},
mainline::{dht::DhtSettings, Testnet},
Keypair, PkarrClient, Settings, SignedPacket,
};
use pubky_homeserver::Homeserver;
#[tokio::test]
async fn resolve_homeserver() {
let testnet = Testnet::new(3);
let server = Homeserver::start_test(&testnet).await.unwrap();
// Publish an intermediate controller of the homeserver
let pkarr_client = PkarrClient::new(Settings {
dht: DhtSettings {
bootstrap: Some(testnet.bootstrap.clone()),
..Default::default()
},
..Default::default()
})
.unwrap()
.as_async();
let intermediate = Keypair::random();
let mut packet = Packet::new_reply(0);
let server_tld = server.public_key().to_string();
let mut svcb = SVCB::new(0, server_tld.as_str().try_into().unwrap());
packet.answers.push(pkarr::dns::ResourceRecord::new(
"pubky".try_into().unwrap(),
pkarr::dns::CLASS::IN,
60 * 60,
pkarr::dns::rdata::RData::SVCB(svcb),
));
let signed_packet = SignedPacket::from_packet(&intermediate, &packet).unwrap();
pkarr_client.publish(&signed_packet).await.unwrap();
tokio::task::spawn_blocking(move || {
let client = PubkyClient::test(&testnet);
let pubky = Keypair::random();
client
.publish_pubky_homeserver(&pubky, &format!("pubky.{}", &intermediate.public_key()));
let (public_key, url) = client
.resolve_pubky_homeserver(&pubky.public_key())
.unwrap();
assert_eq!(public_key, server.public_key());
assert_eq!(url.host_str(), Some("localhost"));
assert_eq!(url.port(), Some(server.port()));
})
.await
.expect("task failed")
}
}

104
pubky/src/client/public.rs Normal file
View File

@@ -0,0 +1,104 @@
use bytes::Bytes;
use pkarr::PublicKey;
use crate::PubkyClient;
use super::Result;
impl PubkyClient {
pub fn put(&self, pubky: &PublicKey, path: &str, content: &[u8]) -> Result<()> {
let path = normalize_path(path);
let (_, mut url) = self.resolve_pubky_homeserver(pubky)?;
url.set_path(&format!("/{pubky}/{path}"));
self.request(super::HttpMethod::Put, &url)
.send_bytes(content)?;
Ok(())
}
pub fn get(&self, pubky: &PublicKey, path: &str) -> Result<Bytes> {
let path = normalize_path(path);
let (_, mut url) = self.resolve_pubky_homeserver(pubky)?;
url.set_path(&format!("/{pubky}/{path}"));
let response = self.request(super::HttpMethod::Get, &url).call()?;
let len = response
.header("Content-Length")
.and_then(|s| s.parse::<u64>().ok())
// TODO: return an error in case content-length header is missing
.unwrap_or(0);
// TODO: bail on too large files.
let mut bytes = vec![0; len as usize];
response.into_reader().read_exact(&mut bytes);
Ok(bytes.into())
}
}
fn normalize_path(path: &str) -> String {
let mut path = path.to_string();
if path.starts_with('/') {
path = path[1..].to_string()
}
// TODO: should we return error instead?
if path.ends_with('/') {
path = path[..path.len()].to_string()
}
path
}
#[cfg(test)]
mod tests {
use std::ops::Deref;
use crate::*;
use pkarr::{mainline::Testnet, Keypair};
use pubky_common::session::Session;
use pubky_homeserver::Homeserver;
#[tokio::test]
async fn put_get() {
let testnet = Testnet::new(3);
let server = Homeserver::start_test(&testnet).await.unwrap();
let client = PubkyClient::test(&testnet).as_async();
let keypair = Keypair::random();
client
.signup(&keypair, &server.public_key().to_string())
.await
.unwrap();
let response = client
.put(&keypair.public_key(), "/pub/foo.txt", &[0, 1, 2, 3, 4])
.await;
if let Err(Error::Ureq(ureqerror)) = response {
if let Some(r) = ureqerror.into_response() {
dbg!(r.into_string());
}
}
let response = client
.get(&keypair.public_key(), "/pub/foo.txt")
.await
.unwrap();
assert_eq!(response, bytes::Bytes::from(vec![0, 1, 2, 3, 4]))
}
}

View File

@@ -1,5 +1,7 @@
use std::thread;
use bytes::Bytes;
use pkarr::{Keypair, PublicKey};
use pubky_common::session::Session;
@@ -62,4 +64,31 @@ impl PubkyClientAsync {
receiver.recv_async().await?
}
/// Async version of [PubkyClient::put]
pub async fn put(&self, pubky: &PublicKey, path: &str, content: &[u8]) -> Result<()> {
let (sender, receiver) = flume::bounded::<Result<()>>(1);
let client = self.0.clone();
let pubky = pubky.clone();
let path = path.to_string();
let content = content.to_vec();
thread::spawn(move || sender.send(client.put(&pubky, &path, &content)));
receiver.recv_async().await?
}
/// Async version of [PubkyClient::get]
pub async fn get(&self, pubky: &PublicKey, path: &str) -> Result<Bytes> {
let (sender, receiver) = flume::bounded::<Result<Bytes>>(1);
let client = self.0.clone();
let pubky = pubky.clone();
let path = path.to_string();
thread::spawn(move || sender.send(client.get(&pubky, &path)));
receiver.recv_async().await?
}
}

View File

@@ -34,3 +34,9 @@ pub enum Error {
#[error(transparent)]
Session(#[from] pubky_common::session::Error),
}
impl From<ureq::Error> for Error {
fn from(error: ureq::Error) -> Self {
Error::Ureq(Box::new(error))
}
}

View File

@@ -6,54 +6,3 @@ mod error;
pub use client::PubkyClient;
pub use error::Error;
#[cfg(test)]
mod tests {
use super::*;
use super::error::Error;
use pkarr::{mainline::Testnet, Keypair};
use pubky_common::session::Session;
use pubky_homeserver::Homeserver;
#[tokio::test]
async fn basic_authn() {
let testnet = Testnet::new(3);
let server = Homeserver::start_test(&testnet).await.unwrap();
let client = PubkyClient::test(&testnet).as_async();
let keypair = Keypair::random();
client
.signup(&keypair, &server.public_key().to_string())
.await
.unwrap();
let session = client.session(&keypair.public_key()).await.unwrap();
assert_eq!(session, Session { ..session.clone() });
client.signout(&keypair.public_key()).await.unwrap();
{
let session = client.session(&keypair.public_key()).await;
assert!(session.is_err());
match session {
Err(Error::NotSignedIn) => {}
_ => assert!(false, "expected NotSignedInt error"),
}
}
client.signin(&keypair).await.unwrap();
{
let session = client.session(&keypair.public_key()).await.unwrap();
assert_eq!(session, Session { ..session.clone() });
}
}
}