feat: Pkarr republisher for user and homeserver keys (#85)

* added bin folder

* removed published_secrets.txt

* added single key publisher

* read and republish

* some changes

* made everything nice

* added RetrySettings

* added resilient client

* fmt

* renamed package

* readme

* added user key republisher

* fmt

* extracted DB and homeserver core from mod

* homeserver packet republish periodically

* small changes

* with final message

* conditional republish

* fmt

* clippy

* moved bin to examples

* improved readme

* handle relay only clients

* handled no dht available

* ignore corrupt users

* handle heed entry errors

* improved unwraps

* fixes

* fixes

* added republish interval to config

* use warn when some keys are missing

* removed old todo

* fixes

* fmt

* fix relays

* use resilient client in homeserverkeyrepublisher

* moved some news to default

* more defaults

* fixed tests

* fmt

* test

* removed resilient client for homeserver publish again

* clippy

* made code simpler

* refactored code

* added .vscode to gitignore

* result match
Severin Alexander Bühler
2025-03-18 17:07:07 +02:00
committed by GitHub
parent 6386f1ae43
commit 5393b4575a
28 changed files with 2410 additions and 406 deletions

.gitignore

@@ -1,2 +1,6 @@
target/
storage/
.continuerules
.cursorrules
.cursor/*
.vscode/*

Cargo.lock

@@ -626,6 +626,16 @@ dependencies = [
"zeroize",
]
[[package]]
name = "ctrlc"
version = "3.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90eeab0aa92f3f9b4e87f258c72b139c207d251f9cbc1080a0086b86a8870dd3"
dependencies = [
"nix",
"windows-sys 0.59.0",
]
[[package]]
name = "curve25519-dalek"
version = "4.1.3"
@@ -1621,9 +1631,9 @@ checksum = "227748d55f2f0ab4735d87fd623798cb6b664512fe979705f829c9f81c934465"
[[package]]
name = "mainline"
version = "5.3.0"
version = "5.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d20596de02883daff9a64c0c02bb4371c44dc7988cd07ecb61d166281c7733d3"
checksum = "2fae24c3d129b92c8cfea92a9e2014052371a2835e4a6d66dfdb00238e389e56"
dependencies = [
"crc",
"document-features",
@@ -1637,7 +1647,7 @@ dependencies = [
"serde_bencode",
"serde_bytes",
"sha1_smol",
"thiserror 2.0.11",
"thiserror 2.0.12",
"tracing",
]
@@ -1723,6 +1733,18 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nix"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46"
dependencies = [
"bitflags",
"cfg-if",
"cfg_aliases",
"libc",
]
[[package]]
name = "no-std-compat"
version = "0.4.1"
@@ -1741,6 +1763,21 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "ntimestamp"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c50f94c405726d3e0095e89e72f75ce7f6587b94a8bd8dc8054b73f65c0fd68c"
dependencies = [
"base32",
"document-features",
"getrandom 0.2.15",
"httpdate",
"js-sys",
"once_cell",
"serde",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@@ -1960,9 +1997,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkarr"
version = "3.3.3"
version = "3.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c7f42132fd7f5dcfca23b791099dec416d67719c16f362bc2640d05078e5661"
checksum = "b006293464515e54044b64e1853c893111eb4e579f9f59decba0039d7c27e2f9"
dependencies = [
"async-compat",
"base32",
@@ -1980,8 +2017,8 @@ dependencies = [
"log",
"lru",
"mainline",
"ntimestamp",
"page_size",
"pubky-timestamp",
"reqwest",
"rustls",
"rustls-webpki",
@@ -1989,7 +2026,7 @@ dependencies = [
"serde",
"sha1_smol",
"simple-dns",
"thiserror 2.0.11",
"thiserror 2.0.12",
"tokio",
"tracing",
"url",
@@ -2015,7 +2052,7 @@ dependencies = [
"pkarr",
"rustls",
"serde",
"thiserror 2.0.11",
"thiserror 2.0.12",
"tokio",
"toml",
"tower-http",
@@ -2025,6 +2062,24 @@ dependencies = [
"url",
]
[[package]]
name = "pkarr-republisher"
version = "0.1.0"
dependencies = [
"anyhow",
"clap",
"ctrlc",
"futures-lite",
"hex",
"pkarr",
"pubky-testnet",
"rand 0.9.0",
"thiserror 2.0.12",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "pkcs8"
version = "0.10.2"
@@ -2123,7 +2178,7 @@ dependencies = [
"pubky-common",
"pubky-testnet",
"reqwest",
"thiserror 2.0.11",
"thiserror 2.0.12",
"tokio",
"tracing",
"tracing-subscriber",
@@ -2149,7 +2204,7 @@ dependencies = [
"pubky-timestamp",
"rand 0.9.0",
"serde",
"thiserror 2.0.11",
"thiserror 2.0.12",
]
[[package]]
@@ -2171,9 +2226,11 @@ dependencies = [
"httpdate",
"page_size",
"pkarr",
"pkarr-republisher",
"postcard",
"pubky-common",
"serde",
"thiserror 2.0.12",
"tokio",
"toml",
"tower 0.5.2",
@@ -2253,7 +2310,7 @@ dependencies = [
"rustc-hash",
"rustls",
"socket2",
"thiserror 2.0.11",
"thiserror 2.0.12",
"tokio",
"tracing",
]
@@ -2272,7 +2329,7 @@ dependencies = [
"rustls",
"rustls-pki-types",
"slab",
"thiserror 2.0.11",
"thiserror 2.0.12",
"tinyvec",
"tracing",
"web-time",
@@ -2990,11 +3047,11 @@ dependencies = [
[[package]]
name = "thiserror"
version = "2.0.11"
version = "2.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d452f284b73e6d76dd36758a0c8684b1d5be31f92b89d07fd5822175732206fc"
checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708"
dependencies = [
"thiserror-impl 2.0.11",
"thiserror-impl 2.0.12",
]
[[package]]
@@ -3010,9 +3067,9 @@ dependencies = [
[[package]]
name = "thiserror-impl"
version = "2.0.11"
version = "2.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2"
checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d"
dependencies = [
"proc-macro2",
"quote",
@@ -3267,7 +3324,7 @@ dependencies = [
"governor",
"http",
"pin-project",
"thiserror 2.0.11",
"thiserror 2.0.12",
"tower 0.5.2",
"tracing",
]

Cargo.toml

@@ -4,7 +4,7 @@ members = [
"pubky-*",
"http-relay",
"pkarr-republisher",
"examples"
]

pkarr-republisher/.gitignore

@@ -0,0 +1 @@
published_secret*.txt

pkarr-republisher/Cargo.toml

@@ -0,0 +1,30 @@
[package]
name = "pkarr-republisher"
version = "0.1.0"
edition = "2021"
authors = ["Severin Alex Bühler <severin@synonym.to>"]
description = "A pkarr packet republisher."
license = "MIT"
homepage = "https://github.com/pubky/pubky-core"
repository = "https://github.com/pubky/pubky-core"
keywords = ["pkarr", "mainline", "pubky"]
categories = ["web-programming"]
[dependencies]
anyhow = "1.0.95"
pkarr = "3.5.3"
tokio = { version = "1.43.0", features = ["full"] }
tracing = "0.1.41"
futures-lite = { version = "2.6.0"}
thiserror = "2.0.12"
# bin dependencies
clap = { version = "4.4", features = ["derive"] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
ctrlc = "3.4.5"
hex = "0.4.3"
rand = "0.9.0"
[dev-dependencies]
pubky-testnet = { path = "../pubky-testnet" }

pkarr-republisher/README.md

@@ -0,0 +1,86 @@
# Pkarr Republisher
> Early version. Expect breaking API changes. Performance can still be improved significantly, especially by improving the `mainline` lib.
To keep data on the Mainline DHT alive, it needs to be actively republished every hour. This library provides the tools to republish packets reliably
and in a multi-threaded fashion. This allows the homeserver to republish hundreds of thousands of pkarr keys per day.
## Usage
**ResilientClient** A pkarr client with retries and exponential backoff.
Takes [pkarr](https://github.com/pubky/pkarr) and makes it resilient to UDP unreliability and CPU exhaustion
by retrying operations with an exponential backoff. Retries help with UDP packet loss, and the backoff gives the CPU time to recover.
```rust
use pkarr::Keypair;
use pkarr_republisher::ResilientClient;

let client = ResilientClient::new().unwrap();
let public_key = Keypair::random().public_key();
// Republish with retries
match client.republish(public_key.clone(), None).await {
Ok(info) => {
println!("Key {public_key} published to {} nodes after {} attempt(s).", info.published_nodes_count, info.attempts_needed);
},
Err(err) => {
if err.is_missing() {
println!("Key {public_key} not found in DHT.");
}
if err.is_publish_failed() {
println!("Key {public_key} failed to publish. {err}");
}
}
}
```
> **Limitation** ResilientClient requires a pkarr client that was built with the `dht` feature.
> Relay-only clients are not supported.
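To pick the transport explicitly, a DHT-capable pkarr client can be passed in; a minimal sketch mirroring the bundled examples:

```rust
use pkarr::Client;
use pkarr_republisher::{ResilientClient, RetrySettings};

// Build a DHT-only pkarr client; relay-only clients are rejected by ResilientClient.
let client = Client::builder().no_relays().build().unwrap();
let rclient = ResilientClient::new_with_client(client, RetrySettings::default()).unwrap();
```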
**MultiRepublisher** Multi-threaded republisher of pkarr keys.
Uses the ResilientClient to publish hundreds of thousands of pkarr keys per day.
```rust
use pkarr::{Keypair, PublicKey};
use pkarr_republisher::MultiRepublisher;

let public_keys: Vec<PublicKey> = (0..100).map(|_| Keypair::random().public_key()).collect();
let republisher = MultiRepublisher::new();
let results = republisher.run(public_keys, 10).await.expect("UDP socket build infallible");

// Inspect the aggregated result of the run.
println!("{} of {} keys republished successfully.", results.success().len(), results.len());
println!("{} keys are missing from the DHT.", results.missing().len());
println!("{} keys failed to publish.", results.publishing_failed().len());
```
> **Limitation** Publishing a high number of pkarr keys is CPU intensive. A recent test showed a 4-core CPU publishing ~600,000 keys in 24 hours.
> Take this into consideration.
> Do not use pkarr relays with the MultiRepublisher. You will run into rate limits, which are currently not handled.
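Retry behavior and the success threshold can be tuned via `RetrySettings` and `RepublisherSettings` before handing them to `MultiRepublisher::new_with_settings`; a minimal sketch based on the API introduced in this PR:

```rust
use std::num::NonZeroU8;
use std::time::Duration;
use pkarr_republisher::{MultiRepublisher, RepublisherSettings, RetrySettings};

// Back off 100ms, 200ms, 400ms, ... capped at 2s, for at most 6 attempts per key.
let mut retry = RetrySettings::new();
retry
    .max_retries(NonZeroU8::new(6).unwrap())
    .initial_retry_delay(Duration::from_millis(100))
    .max_retry_delay(Duration::from_secs(2));

let mut settings = RepublisherSettings::new();
settings
    .retry_settings(retry)
    .min_sufficient_node_publish_count(NonZeroU8::new(10).unwrap())
    // Optional hook: skip packets that should not be republished (here: never skip).
    .republish_condition(|_packet| true);

// `None` makes each worker thread build its own default pkarr client.
let republisher = MultiRepublisher::new_with_settings(settings, None);
```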
## Examples
The [examples folder](./examples) contains scripts to test the performance of the republisher.
- [publish_and_save](./examples/publish_and_save.rs) Publishes x keys multi-threaded and saves their secret keys in `published_secrets.txt`.
- [read_and_verify](./examples/read_and_verify.rs) Takes a random sample of the published keys and verifies how many nodes they are stored on.
- [read_and_republish](./examples/read_and_republish.rs) Takes the saved keys and republishes them multi-threaded.
Execute with `cargo run --example publish_and_save`
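The node count check used by `read_and_verify` is also available directly on `ResilientClient`; a minimal sketch, assuming an async context:

```rust
use pkarr::Keypair;
use pkarr_republisher::ResilientClient;

let client = ResilientClient::new().unwrap();
let public_key = Keypair::random().public_key();
// Counts the DHT nodes that currently store a packet for this key.
let node_count = client.verify_node_count(&public_key).await;
println!("Key {public_key} is stored on {node_count} nodes.");
```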

pkarr-republisher/examples/publish_and_save.rs

@@ -0,0 +1,156 @@
//!
//! Publishes packets with random keys and saves the corresponding secret keys in a file
//! so they can be reused in other experiments.
//!
//! Run with `cargo run --example publish_and_save -- --num-records 100 --threads 6`.
//!
use clap::Parser;
use pkarr::{dns::Name, Client, Keypair, SignedPacket};
use pkarr_republisher::{ResilientClient, RetrySettings};
use std::{
process,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::{Duration, Instant},
};
use tokio::time::sleep;
use tracing::{info, level_filters::LevelFilter};
use tracing_subscriber::EnvFilter;
#[derive(Parser, Debug)]
#[command(
author,
about = "Publish random packets and save them in `published_secrets.txt`."
)]
struct Cli {
/// Number of records to publish
#[arg(long, default_value_t = 100)]
num_records: usize,
/// Number of parallel threads
#[arg(long, default_value_t = 6)]
threads: usize,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
println!("publish_and_save started.");
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env().add_directive(LevelFilter::INFO.into()))
.init();
// Set up the Ctrl+C handler
let ctrlc_pressed: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
let r = ctrlc_pressed.clone();
ctrlc::set_handler(move || {
r.store(true, Ordering::SeqCst);
println!("Ctrl+C detected, shutting down...");
std::process::exit(0);
})
.expect("Error setting Ctrl+C handler");
println!("Press Ctrl+C to stop...");
info!("Publish {} records. Verify", cli.num_records);
let published_keys = publish_parallel(cli.num_records, cli.threads, &ctrlc_pressed).await;
// Turn into a hex list and write to file
let secrets = published_keys
.clone()
.into_iter()
.map(|key| {
let secret = key.secret_key();
hex::encode(secret)
})
.collect::<Vec<_>>();
let secrets_str = secrets.join("\n");
std::fs::write("published_secrets.txt", secrets_str).unwrap();
info!("Successfully wrote secrets keys to published_secrets.txt");
Ok(())
}
// Publish records in multiple threads.
async fn publish_parallel(
num_records: usize,
threads: usize,
ctrlc_pressed: &Arc<AtomicBool>,
) -> Vec<Keypair> {
let start = Instant::now();
let mut handles = vec![];
for thread_id in 0..threads {
let handle = tokio::spawn(async move {
tracing::info!("Started thread t{thread_id}");
publish_records(num_records / threads, thread_id).await
});
handles.push(handle);
}
loop {
let all_finished = handles
.iter()
.map(|handle| handle.is_finished())
.reduce(|a, b| a && b)
.unwrap();
if all_finished {
break;
}
if ctrlc_pressed.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_millis(250)).await;
}
if ctrlc_pressed.load(Ordering::Relaxed) {
process::exit(0);
}
let mut all_result = vec![];
for handle in handles {
let keys = handle.await.unwrap();
all_result.extend(keys);
}
let rate = all_result.len() as f64 / start.elapsed().as_secs() as f64;
tracing::info!(
"Published {} keys in {} seconds at {rate:.2} keys/s",
all_result.len(),
start.elapsed().as_secs()
);
all_result
}
// Publishes the given number of packets and verifies they are actually available.
async fn publish_records(num_records: usize, thread_id: usize) -> Vec<Keypair> {
let client = Client::builder().no_relays().build().unwrap();
let rclient = ResilientClient::new_with_client(client, RetrySettings::default()).unwrap();
let mut records = vec![];
for i in 0..num_records {
let instant = Instant::now();
let key = Keypair::random();
let packet: SignedPacket = pkarr::SignedPacketBuilder::default()
.cname(Name::new("test").unwrap(), Name::new("test2").unwrap(), 600)
.build(&key)
.unwrap();
let result = rclient.publish(packet, None).await;
let elapsed_time = instant.elapsed().as_millis();
if result.is_ok() {
let info = result.unwrap();
tracing::info!("- t{thread_id:<2} {i:>3}/{num_records} Published {} within {elapsed_time}ms to {} nodes {} attempts", key.public_key(), info.published_nodes_count, info.attempts_needed);
records.push(key);
} else {
let e = result.unwrap_err();
tracing::error!("Failed to publish {} record: {e:?}", key.public_key());
continue;
}
}
records
}

pkarr-republisher/examples/read_and_republish.rs

@@ -0,0 +1,118 @@
//!
//! Reads `published_secrets.txt` and tries to republish the packets.
//! This is done in a multi-threaded way to improve speed.
//!
//! Run with `cargo run --example read_and_republish -- --num-records 100 --threads 10`.
//!
use clap::Parser;
use pkarr::{ClientBuilder, Keypair};
use pkarr_republisher::{MultiRepublisher, RepublisherSettings};
use rand::rng;
use rand::seq::SliceRandom;
use std::{
process,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Instant,
};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;
#[derive(Parser, Debug)]
#[command(
author,
about = "Reads `published_secrets.txt` and tries to republish the packets."
)]
struct Cli {
/// How many keys should be republished?
#[arg(long, default_value_t = 100)]
num_records: usize,
/// Number of parallel threads
#[arg(long, default_value_t = 10)]
threads: u8,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
println!("read_and_republish started.");
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env().add_directive(LevelFilter::INFO.into()))
.init();
// Set up the Ctrl+C handler
let ctrlc_pressed: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
let r = ctrlc_pressed.clone();
ctrlc::set_handler(move || {
r.store(true, Ordering::SeqCst);
println!("Ctrl+C detected, shutting down...");
process::exit(0);
})
.expect("Error setting Ctrl+C handler");
println!("Press Ctrl+C to stop...");
println!("Read published_secrets.txt");
let mut published_keys = read_keys();
println!("Read {} keys", published_keys.len());
println!(
"Take a random sample of {} keys to republish.",
cli.num_records
);
let mut rng = rng();
published_keys.shuffle(&mut rng);
let keys: Vec<Keypair> = published_keys.into_iter().take(cli.num_records).collect();
run_churn_loop(keys, cli.threads).await;
Ok(())
}
fn read_keys() -> Vec<Keypair> {
let secret_str = std::fs::read_to_string("published_secrets.txt").expect("File not found");
let keys = secret_str
.lines()
.map(|line| line.to_string())
.collect::<Vec<_>>();
keys.into_iter()
.map(|key| {
let secret = hex::decode(key).expect("invalid hex");
let secret: [u8; 32] = secret.try_into().unwrap();
Keypair::from_secret_key(&secret)
})
.collect::<Vec<_>>()
}
async fn run_churn_loop(keys: Vec<Keypair>, thread_count: u8) {
let public_keys = keys.into_iter().map(|key| key.public_key()).collect();
let mut builder = ClientBuilder::default();
builder.no_relays();
let republisher =
MultiRepublisher::new_with_settings(RepublisherSettings::default(), Some(builder));
println!("Republish keys. Hold on...");
let start = Instant::now();
let results = republisher.run(public_keys, thread_count).await.unwrap();
let elapsed_seconds = start.elapsed().as_secs_f32();
let keys_per_s = results.len() as f32 / elapsed_seconds;
tracing::info!(
"Processed {} keys within {elapsed_seconds:.2}s. {keys_per_s:.2} keys/s.",
results.len()
);
tracing::info!(
"{} success, {} missing, {} failed.",
results.success().len(),
results.missing().len(),
results.publishing_failed().len()
);
tracing::info!("Republishing finished.");
}

pkarr-republisher/examples/read_and_verify.rs

@@ -0,0 +1,113 @@
//!
//! Reads `published_secrets.txt` and outputs how many nodes store each public key.
//! Run `publish_and_save` first so there are packets to verify.
//! Freshly stored ones should be found on 15+ nodes.
//! <10 nodes means the key is ready for a republish.
//! 0 nodes means the packet is unavailable.
//!
//! Run with `cargo run --example read_and_verify -- --num-records 20`
//!
use clap::Parser;
use pkarr::{Client, Keypair};
use pkarr_republisher::{ResilientClient, RetrySettings};
use rand::rng;
use rand::seq::SliceRandom;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use tracing::{info, level_filters::LevelFilter};
use tracing_subscriber::EnvFilter;
#[derive(Parser, Debug)]
#[command(author, about = "Verify pkarr packets on the DHT.")]
struct Cli {
/// Verify x keys by checking how many nodes they were stored on.
#[arg(long, default_value_t = 20)]
num_records: usize,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
println!("read_and_verify started.");
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env().add_directive(LevelFilter::INFO.into()))
.init();
// Set up the Ctrl+C handler
let ctrlc_pressed: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
let r = ctrlc_pressed.clone();
ctrlc::set_handler(move || {
r.store(true, Ordering::SeqCst);
println!("Ctrl+C detected, shutting down...");
std::process::exit(0);
})
.expect("Error setting Ctrl+C handler");
println!("Press Ctrl+C to stop...");
println!("Read published_secrets.txt");
let published_keys = read_keys();
println!("Read {} keys", published_keys.len());
let num_verify_keys = cli.num_records;
info!("Randomly verify: {num_verify_keys} keys");
verify_published(&published_keys, num_verify_keys).await;
Ok(())
}
fn read_keys() -> Vec<Keypair> {
let secret_str = std::fs::read_to_string("published_secrets.txt").expect("File not found");
let keys = secret_str
.lines()
.map(|line| line.to_string())
.collect::<Vec<_>>();
keys.into_iter()
.map(|key| {
let secret = hex::decode(key).expect("invalid hex");
let secret: [u8; 32] = secret.try_into().unwrap();
Keypair::from_secret_key(&secret)
})
.collect::<Vec<_>>()
}
async fn verify_published(keys: &[Keypair], count: usize) {
// Shuffle and take {count} elements to verify.
let mut keys: Vec<Keypair> = keys.to_owned();
let mut rng = rng();
keys.shuffle(&mut rng);
let keys: Vec<Keypair> = keys.into_iter().take(count).collect();
let client = Client::builder().no_relays().build().unwrap();
let rclient = ResilientClient::new_with_client(client, RetrySettings::default()).unwrap();
let mut success = 0;
let mut warn = 0;
let mut error = 0;
for (i, key) in keys.into_iter().enumerate() {
let nodes_count = rclient.verify_node_count(&key.public_key()).await;
if nodes_count == 0 {
tracing::error!(
"- {i}/{count} Verify {} found on {nodes_count} nodes.",
key.public_key()
);
error += 1;
} else if nodes_count < 5 {
tracing::warn!(
"- {i}/{count} Verify {} found on {nodes_count} nodes.",
key.public_key()
);
warn += 1;
} else {
tracing::info!(
"- {i}/{count} Verify {} found on {nodes_count} nodes.",
key.public_key()
);
success += 1;
}
}
println!("Success: {success}, Warn: {warn}, Error: {error}");
}

pkarr-republisher/src/lib.rs

@@ -0,0 +1,11 @@
mod multi_republisher;
mod publisher;
mod republisher;
mod resilient_client;
mod verify;
pub use multi_republisher::*;
pub use publisher::*;
pub use republisher::*;
pub use resilient_client::*;
pub use verify::count_key_on_dht;

pkarr-republisher/src/multi_republisher.rs

@@ -0,0 +1,252 @@
use crate::{
republisher::{RepublishError, RepublishInfo, RepublisherSettings},
ResilientClient, ResilientClientBuilderError,
};
use pkarr::PublicKey;
use std::collections::HashMap;
use tokio::time::Instant;
#[derive(Debug, Clone)]
pub struct MultiRepublishResult {
results: HashMap<PublicKey, Result<RepublishInfo, RepublishError>>,
}
impl MultiRepublishResult {
pub fn new(results: HashMap<PublicKey, Result<RepublishInfo, RepublishError>>) -> Self {
Self { results }
}
/// Number of keys
pub fn len(&self) -> usize {
self.results.len()
}
pub fn is_empty(&self) -> bool {
self.results.is_empty()
}
/// All keys
pub fn all_keys(&self) -> Vec<PublicKey> {
self.results.keys().cloned().collect()
}
/// Successfully published keys
pub fn success(&self) -> Vec<PublicKey> {
self.results
.iter()
.filter(|(_, result)| result.is_ok())
.map(|(key, _)| key.clone())
.collect()
}
/// Keys that failed to publish
pub fn publishing_failed(&self) -> Vec<PublicKey> {
self.results
.iter()
.filter(|(_, val)| {
if let Err(e) = val {
return e.is_publish_failed();
}
false
})
.map(|entry| entry.0.clone())
.collect()
}
/// Keys that are missing and could not be republished
pub fn missing(&self) -> Vec<PublicKey> {
self.results
.iter()
.filter(|(_, val)| {
if let Err(e) = val {
return e.is_missing();
}
false
})
.map(|entry| entry.0.clone())
.collect()
}
}
/// Republishes multiple keys, either serially or in a multi-threaded way.
#[derive(Debug, Clone, Default)]
pub struct MultiRepublisher {
settings: RepublisherSettings,
client_builder: pkarr::ClientBuilder,
}
impl MultiRepublisher {
pub fn new() -> Self {
Self::default()
}
/// Create a new republisher with the given settings.
/// The republisher ignores `settings.client` and instead uses the `client_builder` to create
/// one pkarr client per thread rather than sharing a single one.
pub fn new_with_settings(
mut settings: RepublisherSettings,
client_builder: Option<pkarr::ClientBuilder>,
) -> Self {
settings.client = None; // Remove any provided client because every thread builds its own.
let builder = client_builder.unwrap_or_default();
Self {
settings,
client_builder: builder,
}
}
/// Go through the list of all public keys and republish them serially.
async fn run_serially(
&self,
public_keys: Vec<PublicKey>,
) -> Result<
HashMap<PublicKey, Result<RepublishInfo, RepublishError>>,
ResilientClientBuilderError,
> {
let mut results: HashMap<PublicKey, Result<RepublishInfo, RepublishError>> =
HashMap::with_capacity(public_keys.len());
tracing::debug!("Start to republish {} public keys.", public_keys.len());
// TODO: Inspect pkarr reliability.
// pkarr client gets really unreliable when used in parallel. To get around this, we use one client per run().
let client = self.client_builder.clone().build()?;
let rclient =
ResilientClient::new_with_client(client, self.settings.retry_settings.clone())?;
for key in public_keys {
let start = Instant::now();
let res = rclient
.republish(
key.clone(),
Some(self.settings.min_sufficient_node_publish_count),
)
.await;
let elapsed = start.elapsed().as_millis();
match &res {
Ok(info) => {
tracing::info!(
"Republished {key} successfully on {} nodes within {elapsed}ms. attemps={}",
info.published_nodes_count,
info.attempts_needed
)
}
Err(e) => {
tracing::warn!(
"Failed to republish public_key {} within {elapsed}ms. {}",
key,
e
);
}
}
results.insert(key.clone(), res);
}
Ok(results)
}
/// Republish keys in a parallel fashion, using multiple threads for better performance.
/// A good thread count is around 10 for most computers. With high-performance cores, you can push
/// it to 40+.
pub async fn run(
&self,
public_keys: Vec<PublicKey>,
thread_count: u8,
) -> Result<MultiRepublishResult, ResilientClientBuilderError> {
let chunk_size = public_keys.len().div_ceil(thread_count as usize);
let chunks = public_keys
.chunks(chunk_size)
.map(|chunk| chunk.to_vec())
.collect::<Vec<_>>();
// Run in parallel
let mut handles = vec![];
for chunk in chunks {
let publisher = self.clone();
let handle = tokio::spawn(async move { publisher.run_serially(chunk).await });
handles.push(handle);
}
// Join results of all tasks
let mut results = HashMap::with_capacity(public_keys.len());
for handle in handles {
let join_result = handle.await;
if let Err(e) = join_result {
tracing::error!("Failed to join handle in MultiRepublisher::run: {e}");
continue;
}
let result = join_result.unwrap()?;
for entry in result {
results.insert(entry.0, entry.1);
}
}
Ok(MultiRepublishResult::new(results))
}
}
#[cfg(test)]
mod tests {
use std::num::NonZeroU8;
use pkarr::{dns::Name, ClientBuilder, Keypair, PublicKey};
use pubky_testnet::Testnet;
use crate::{multi_republisher::MultiRepublisher, republisher::RepublisherSettings};
async fn publish_sample_packets(client: &pkarr::Client, count: usize) -> Vec<PublicKey> {
let keys: Vec<Keypair> = (0..count).map(|_| Keypair::random()).collect();
for key in keys.iter() {
let packet = pkarr::SignedPacketBuilder::default()
.cname(Name::new("test").unwrap(), Name::new("test2").unwrap(), 600)
.build(key)
.unwrap();
let _ = client.publish(&packet, None).await;
}
keys.into_iter().map(|key| key.public_key()).collect()
}
#[tokio::test]
async fn single_key_republish_success() {
let testnet = Testnet::run().await.unwrap();
// Create testnet pkarr builder
let mut pkarr_builder = ClientBuilder::default();
pkarr_builder.bootstrap(&testnet.bootstrap()).no_relays();
let pkarr_client = pkarr_builder.clone().build().unwrap();
let public_keys = publish_sample_packets(&pkarr_client, 1).await;
let public_key = public_keys.first().unwrap().clone();
let mut settings = RepublisherSettings::default();
settings
.pkarr_client(pkarr_client)
.min_sufficient_node_publish_count(NonZeroU8::new(1).unwrap());
let publisher = MultiRepublisher::new_with_settings(settings, Some(pkarr_builder));
let results = publisher.run_serially(public_keys).await.unwrap();
let result = results.get(&public_key).unwrap();
if let Err(e) = result {
println!("Err {e}");
}
assert!(result.is_ok());
}
#[tokio::test]
async fn single_key_republish_insufficient() {
let testnet = Testnet::run().await.unwrap();
// Create testnet pkarr builder
let mut pkarr_builder = ClientBuilder::default();
pkarr_builder.bootstrap(&testnet.bootstrap()).no_relays();
let pkarr_client = pkarr_builder.clone().build().unwrap();
let public_keys = publish_sample_packets(&pkarr_client, 1).await;
let public_key = public_keys.first().unwrap().clone();
let mut settings = RepublisherSettings::default();
settings
.pkarr_client(pkarr_client)
.min_sufficient_node_publish_count(NonZeroU8::new(2).unwrap());
let publisher = MultiRepublisher::new_with_settings(settings, Some(pkarr_builder));
let results = publisher.run_serially(public_keys).await.unwrap();
let result = results.get(&public_key).unwrap();
assert!(result.is_err());
}
}

pkarr-republisher/src/publisher.rs

@@ -0,0 +1,331 @@
//!
//! Publishes a single pkarr packet with retries in case it fails.
//!
use pkarr::{mainline::async_dht::AsyncDht, PublicKey, SignedPacket};
use std::{num::NonZeroU8, time::Duration};
use crate::verify::count_key_on_dht;
#[derive(thiserror::Error, Debug, Clone)]
pub enum PublishError {
#[error("Packet has been republished but to an insufficient number of {published_nodes_count} nodes.")]
InsuffientlyPublished { published_nodes_count: usize },
#[error(transparent)]
PublishFailed(#[from] pkarr::errors::PublishError),
}
impl PublishError {
pub fn is_insufficiently_published(&self) -> bool {
if let PublishError::InsuffientlyPublished { .. } = self {
return true;
}
false
}
pub fn is_publish_failed(&self) -> bool {
if let PublishError::PublishFailed { .. } = self {
return true;
}
false
}
}
#[derive(Debug, Clone)]
pub struct PublishInfo {
/// How many nodes the key got published on.
pub published_nodes_count: usize,
/// Number of publishing attempts needed to successfully publish.
pub attempts_needed: usize,
}
impl PublishInfo {
pub fn new(published_nodes_count: usize, attempts_needed: usize) -> Self {
Self {
published_nodes_count,
attempts_needed,
}
}
}
#[derive(Debug, Clone)]
pub struct RetrySettings {
/// Number of max retries to do before aborting.
pub(crate) max_retries: NonZeroU8,
/// First retry delay that is then used to calculate the exponential backoff.
/// Example: 100ms first, then 200ms, 400ms, 800ms and so on.
pub(crate) initial_retry_delay: Duration,
/// Cap on the retry delay so the exponential backoff doesn't get out of hand.
pub(crate) max_retry_delay: Duration,
}
impl RetrySettings {
pub fn new() -> Self {
Self::default()
}
/// Maximum number of republishing retries before giving up.
pub fn max_retries(&mut self, max_retries: NonZeroU8) -> &mut Self {
self.max_retries = max_retries;
self
}
/// Maximum duration the republish task exponentially backs off until it tries again.
pub fn max_retry_delay(&mut self, duration: Duration) -> &mut Self {
self.max_retry_delay = duration;
self
}
/// Minimum duration the republish task exponentially backs off until it tries again.
pub fn initial_retry_delay(&mut self, duration: Duration) -> &mut Self {
self.initial_retry_delay = duration;
self
}
}
impl Default for RetrySettings {
fn default() -> Self {
Self {
max_retries: NonZeroU8::new(4).expect("should always be > 0"),
initial_retry_delay: Duration::from_millis(200),
max_retry_delay: Duration::from_millis(5_000),
}
}
}
/// Settings for creating a republisher
#[derive(Debug, Clone)]
pub struct PublisherSettings {
pub(crate) client: Option<pkarr::Client>,
pub(crate) min_sufficient_node_publish_count: NonZeroU8,
pub retry_settings: RetrySettings,
}
impl Default for PublisherSettings {
fn default() -> Self {
Self {
client: None,
min_sufficient_node_publish_count: NonZeroU8::new(10).expect("Should always be > 0"),
retry_settings: RetrySettings::default(),
}
}
}
impl PublisherSettings {
pub fn new() -> Self {
Self::default()
}
/// Set a custom pkarr client
pub fn pkarr_client(&mut self, client: pkarr::Client) -> &mut Self {
self.client = Some(client);
self
}
/// Set the minimum number of nodes a key needs to be stored on
/// to be considered successfully published.
pub fn min_sufficient_node_publish_count(&mut self, count: NonZeroU8) -> &mut Self {
self.min_sufficient_node_publish_count = count;
self
}
/// Set settings in relation to retries.
pub fn retry_settings(&mut self, settings: RetrySettings) -> &mut Self {
self.retry_settings = settings;
self
}
}
/// Tries to publish a single key and verifies the key has been published to
/// a sufficient number of nodes.
/// Retries with an exponential backoff in case of errors.
#[derive(Debug, Clone)]
pub struct Publisher {
pub packet: SignedPacket,
client: pkarr::Client,
dht: AsyncDht,
min_sufficient_node_publish_count: NonZeroU8,
retry_settings: RetrySettings,
}
impl Publisher {
/// Creates a new Publisher with a new pkarr client.
pub fn new(packet: SignedPacket) -> Result<Self, pkarr::errors::BuildError> {
let settings = PublisherSettings::default();
Self::new_with_settings(packet, settings)
}
pub fn new_with_settings(
packet: SignedPacket,
settings: PublisherSettings,
) -> Result<Self, pkarr::errors::BuildError> {
let client = match &settings.client {
Some(c) => c.clone(),
None => pkarr::Client::builder().build()?,
};
let dht = client.dht().expect("infallible").as_async();
Ok(Self {
packet,
client,
dht,
min_sufficient_node_publish_count: settings.min_sufficient_node_publish_count,
retry_settings: settings.retry_settings,
})
}
/// Get the public key of the signer of the packet
fn get_public_key(&self) -> PublicKey {
self.packet.public_key()
}
/// Exponential backoff delay starting with `initial_retry_delay` and capped at `max_retry_delay`.
fn get_retry_delay(&self, retry_count: u8) -> Duration {
let initial_ms = self.retry_settings.initial_retry_delay.as_millis() as u64;
let multiplicator = 2u64.pow(retry_count as u32);
let delay_ms = initial_ms * multiplicator;
let delay = Duration::from_millis(delay_ms);
delay.min(self.retry_settings.max_retry_delay)
}
/// Publish the packet once and verify the number of nodes it was stored on.
pub async fn publish_once(&self) -> Result<PublishInfo, PublishError> {
if let Err(e) = self.client.publish(&self.packet, None).await {
return Err(e.into());
}
// TODO: This counting could really be done with the put response in the mainline library already. It's not exposed though.
// This would really speed up the publishing and reduce the load on the DHT.
// -- Sev April 2025 --
let published_nodes_count = count_key_on_dht(&self.get_public_key(), &self.dht).await;
if published_nodes_count < self.min_sufficient_node_publish_count.get().into() {
return Err(PublishError::InsuffientlyPublished {
published_nodes_count,
});
}
Ok(PublishInfo::new(published_nodes_count, 1))
}
// Publishes the key with an exponential backoff
pub async fn publish(&self) -> Result<PublishInfo, PublishError> {
let max_retries = self.retry_settings.max_retries.get();
let mut last_error: Option<PublishError> = None;
for retry_count in 0..max_retries {
let human_retry_count = retry_count + 1;
match self.publish_once().await {
Ok(mut info) => {
info.attempts_needed = human_retry_count as usize;
return Ok(info);
}
Err(e) => {
tracing::debug!(
"{human_retry_count}/{max_retries} Failed to publish {}: {e}",
self.get_public_key()
);
last_error = Some(e);
}
}
let delay = self.get_retry_delay(retry_count);
tracing::debug!(
"{} {human_retry_count}/{max_retries} Sleep for {delay:?} before trying again.",
self.get_public_key()
);
tokio::time::sleep(delay).await;
}
Err(last_error.expect("infallible"))
}
}
#[cfg(test)]
mod tests {
use std::{num::NonZeroU8, time::Duration};
use pkarr::{dns::Name, Keypair, PublicKey, SignedPacket};
use pubky_testnet::Testnet;
use crate::publisher::{PublishError, Publisher, PublisherSettings};
fn sample_packet() -> (PublicKey, SignedPacket) {
let key = Keypair::random();
let packet = pkarr::SignedPacketBuilder::default()
.cname(Name::new("test").unwrap(), Name::new("test2").unwrap(), 600)
.build(&key)
.unwrap();
(key.public_key(), packet)
}
#[tokio::test]
async fn single_key_republish_success() {
let testnet = Testnet::run().await.unwrap();
let pubky_client = testnet.client_builder().build().unwrap();
let pkarr_client = pubky_client.pkarr().clone();
let (_, packet) = sample_packet();
let required_nodes = 1;
let mut settings = PublisherSettings::default();
settings
.pkarr_client(pkarr_client)
.min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap());
let publisher = Publisher::new_with_settings(packet, settings).unwrap();
let res = publisher.publish_once().await;
assert!(res.is_ok());
let success = res.unwrap();
assert_eq!(success.published_nodes_count, 1);
}
#[tokio::test]
async fn single_key_republish_insufficient() {
let testnet = Testnet::run().await.unwrap();
let pubky_client = testnet.client_builder().build().unwrap();
let pkarr_client = pubky_client.pkarr().clone();
let (_, packet) = sample_packet();
let required_nodes = 2;
let mut settings = PublisherSettings::default();
settings
.pkarr_client(pkarr_client)
.min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap());
let publisher = Publisher::new_with_settings(packet, settings).unwrap();
let res = publisher.publish_once().await;
assert!(res.is_err());
let err = res.unwrap_err();
assert!(err.is_insufficiently_published());
if let PublishError::InsuffientlyPublished {
published_nodes_count,
} = err
{
assert_eq!(published_nodes_count, 1);
};
}
#[tokio::test]
async fn retry_delay() {
let testnet = Testnet::run().await.unwrap();
let pubky_client = testnet.client_builder().build().unwrap();
let pkarr_client = pubky_client.pkarr().clone();
let (_, packet) = sample_packet();
let required_nodes = 1;
let mut settings = PublisherSettings::default();
settings
.pkarr_client(pkarr_client)
.min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap());
settings
.retry_settings
.max_retries(NonZeroU8::new(10).unwrap())
.initial_retry_delay(Duration::from_millis(100))
.max_retry_delay(Duration::from_secs(10));
let publisher = Publisher::new_with_settings(packet, settings).unwrap();
let first_delay = publisher.get_retry_delay(0);
assert_eq!(first_delay.as_millis(), 100);
let second_delay = publisher.get_retry_delay(1);
assert_eq!(second_delay.as_millis(), 200);
let third_delay = publisher.get_retry_delay(2);
assert_eq!(third_delay.as_millis(), 400);
let ninth_delay = publisher.get_retry_delay(9);
assert_eq!(ninth_delay.as_millis(), 10_000);
}
}

pkarr-republisher/src/republisher.rs

@@ -0,0 +1,406 @@
//!
//! Republishes a single public key with retries in case it fails.
//!
use pkarr::PublicKey;
use pkarr::SignedPacket;
use std::{num::NonZeroU8, sync::Arc, time::Duration};
use crate::{
publisher::{PublishError, Publisher, PublisherSettings},
RetrySettings,
};
#[derive(thiserror::Error, Debug, Clone)]
pub enum RepublishError {
#[error("The packet can't be resolved on the DHT and therefore can't be republished.")]
Missing,
#[error(transparent)]
PublishFailed(#[from] PublishError),
}
impl RepublishError {
pub fn is_missing(&self) -> bool {
if let RepublishError::Missing = self {
return true;
}
false
}
pub fn is_publish_failed(&self) -> bool {
if let RepublishError::PublishFailed { .. } = self {
return true;
}
false
}
}
#[derive(Debug, Clone)]
pub struct RepublishInfo {
/// How many nodes the key got published on.
pub published_nodes_count: usize,
/// Number of publishing attempts needed to successfully republish.
pub attempts_needed: usize,
/// Whether the `republish_condition` returned `false` and the republish was skipped.
pub condition_failed: bool,
}
impl RepublishInfo {
pub fn new(
published_nodes_count: usize,
attempts_needed: usize,
should_republish_condition_failed: bool,
) -> Self {
Self {
published_nodes_count,
attempts_needed,
condition_failed: should_republish_condition_failed,
}
}
}
pub type RepublishCondition = dyn Fn(&SignedPacket) -> bool + Send + Sync;
/// Settings for creating a republisher
#[derive(Clone)]
pub struct RepublisherSettings {
pub(crate) client: Option<pkarr::Client>,
pub(crate) min_sufficient_node_publish_count: NonZeroU8,
pub(crate) retry_settings: RetrySettings,
pub(crate) republish_condition: Option<Arc<RepublishCondition>>,
}
impl std::fmt::Debug for RepublisherSettings {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RepublisherSettings")
.field("client", &self.client)
.field(
"min_sufficient_node_publish_count",
&self.min_sufficient_node_publish_count,
)
.field("retry_settings", &self.retry_settings)
.finish_non_exhaustive()
}
}
impl RepublisherSettings {
pub fn new() -> Self {
Self::default()
}
/// Set a custom pkarr client
pub fn pkarr_client(&mut self, client: pkarr::Client) -> &mut Self {
self.client = Some(client);
self
}
/// Set the minimum number of nodes a key needs to be stored on
/// to be considered successfully published.
pub fn min_sufficient_node_publish_count(&mut self, count: NonZeroU8) -> &mut Self {
self.min_sufficient_node_publish_count = count;
self
}
/// Set settings in relation to retries.
pub fn retry_settings(&mut self, settings: RetrySettings) -> &mut Self {
self.retry_settings = settings;
self
}
/// Set a closure that determines whether a packet should be republished
pub fn republish_condition<F>(&mut self, f: F) -> &mut Self
where
F: Fn(&SignedPacket) -> bool + Send + Sync + 'static,
{
self.republish_condition = Some(Arc::new(f));
self
}
}
impl Default for RepublisherSettings {
fn default() -> Self {
Self {
client: None,
min_sufficient_node_publish_count: NonZeroU8::new(10).expect("Should always be > 0"),
retry_settings: RetrySettings::default(),
republish_condition: None,
}
}
}
/// Tries to republish a single key.
/// Retries in case of errors with an exponential backoff.
pub struct Republisher {
pub public_key: PublicKey,
client: pkarr::Client,
min_sufficient_node_publish_count: NonZeroU8,
retry_settings: RetrySettings,
republish_condition: Arc<dyn Fn(&SignedPacket) -> bool + Send + Sync>,
}
impl std::fmt::Debug for Republisher {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Republisher")
.field("public_key", &self.public_key)
.field("client", &self.client)
.field(
"min_sufficient_node_publish_count",
&self.min_sufficient_node_publish_count,
)
.field("retry_settings", &self.retry_settings)
.finish_non_exhaustive()
}
}
impl Republisher {
/// Creates a new Republisher.
pub fn new(public_key: PublicKey) -> Result<Self, pkarr::errors::BuildError> {
let settings = RepublisherSettings::default();
Self::new_with_settings(public_key, settings)
}
pub fn new_with_settings(
public_key: PublicKey,
settings: RepublisherSettings,
) -> Result<Self, pkarr::errors::BuildError> {
let client = match &settings.client {
Some(c) => c.clone(),
None => pkarr::Client::builder().build()?,
};
Ok(Republisher {
public_key,
client,
min_sufficient_node_publish_count: settings.min_sufficient_node_publish_count,
retry_settings: settings.retry_settings,
republish_condition: settings
.republish_condition
.unwrap_or_else(|| Arc::new(|_| true)),
})
}
/// Exponential backoff delay starting with `initial_retry_delay` and capped at `max_retry_delay`.
fn get_retry_delay(&self, retry_count: u8) -> Duration {
let initial_ms = self.retry_settings.initial_retry_delay.as_millis() as u64;
let multiplicator = 2u64.pow(retry_count as u32);
let delay_ms = initial_ms * multiplicator;
let delay = Duration::from_millis(delay_ms);
delay.min(self.retry_settings.max_retry_delay)
}
/// Republish a single public key.
pub async fn republish_once(&self) -> Result<RepublishInfo, RepublishError> {
let packet = self.client.resolve_most_recent(&self.public_key).await;
if packet.is_none() {
return Err(RepublishError::Missing);
}
let packet = packet.unwrap();
// Check if the packet should be republished
if !(self.republish_condition)(&packet) {
return Ok(RepublishInfo::new(0, 1, true));
}
let mut settings = PublisherSettings::default();
settings
.pkarr_client(self.client.clone())
.min_sufficient_node_publish_count(self.min_sufficient_node_publish_count);
let publisher = Publisher::new_with_settings(packet, settings)
.expect("infallible because pkarr client provided");
match publisher.publish_once().await {
Ok(info) => Ok(RepublishInfo::new(info.published_nodes_count, 1, false)),
Err(e) => Err(e.into()),
}
}
// Republishes the key with an exponential backoff
pub async fn republish(&self) -> Result<RepublishInfo, RepublishError> {
let max_retries = self.retry_settings.max_retries.get();
let mut retry_count = 0;
let mut last_error: Option<RepublishError> = None;
while retry_count < max_retries {
match self.republish_once().await {
Ok(mut success) => {
success.attempts_needed = retry_count as usize + 1;
return Ok(success);
}
Err(e) => {
tracing::debug!(
"{retry_count}/{max_retries} Failed to publish {}: {e}",
self.public_key
);
last_error = Some(e);
}
}
let delay = self.get_retry_delay(retry_count);
retry_count += 1;
tracing::debug!(
"{} {retry_count}/{max_retries} Sleep for {delay:?} before trying again.",
self.public_key
);
tokio::time::sleep(delay).await;
}
Err(last_error.expect("infallible"))
}
}
#[cfg(test)]
mod tests {
use std::{num::NonZeroU8, time::Duration};
use crate::republisher::{Republisher, RepublisherSettings};
use pkarr::{dns::Name, Keypair, PublicKey};
use pubky_testnet::Testnet;
async fn publish_sample_packets(client: &pkarr::Client) -> PublicKey {
let key = Keypair::random();
let packet = pkarr::SignedPacketBuilder::default()
.cname(Name::new("test").unwrap(), Name::new("test2").unwrap(), 600)
.build(&key)
.unwrap();
client
.publish(&packet, None)
.await
.expect("to be published");
key.public_key()
}
#[tokio::test]
async fn single_key_republish_success() {
let testnet = Testnet::run().await.unwrap();
let pubky_client = testnet.client_builder().build().unwrap();
let pkarr_client = pubky_client.pkarr().clone();
let public_key = publish_sample_packets(&pkarr_client).await;
let required_nodes = 1;
let mut settings = RepublisherSettings::default();
settings
.pkarr_client(pkarr_client)
.min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap());
let publisher = Republisher::new_with_settings(public_key, settings).unwrap();
let res = publisher.republish_once().await;
assert!(res.is_ok());
let success = res.unwrap();
assert_eq!(success.published_nodes_count, 1);
}
#[tokio::test]
async fn single_key_republish_missing() {
let testnet = Testnet::run().await.unwrap();
let pubky_client = testnet.client_builder().build().unwrap();
let pkarr_client = pubky_client.pkarr().clone();
let public_key = Keypair::random().public_key();
let required_nodes = 1;
let mut settings = RepublisherSettings::default();
settings
.pkarr_client(pkarr_client)
.min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap());
let publisher = Republisher::new_with_settings(public_key, settings).unwrap();
let res = publisher.republish_once().await;
assert!(res.is_err());
let err = res.unwrap_err();
assert!(err.is_missing());
}
#[tokio::test]
async fn retry_delay() {
let testnet = Testnet::run().await.unwrap();
let pubky_client = testnet.client_builder().build().unwrap();
let pkarr_client = pubky_client.pkarr().clone();
let public_key = Keypair::random().public_key();
let required_nodes = 1;
let mut settings = RepublisherSettings::default();
settings
.pkarr_client(pkarr_client)
.min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap());
settings
.retry_settings
.max_retries(NonZeroU8::new(10).unwrap())
.initial_retry_delay(Duration::from_millis(100))
.max_retry_delay(Duration::from_secs(10));
let publisher = Republisher::new_with_settings(public_key, settings).unwrap();
let first_delay = publisher.get_retry_delay(0);
assert_eq!(first_delay.as_millis(), 100);
let second_delay = publisher.get_retry_delay(1);
assert_eq!(second_delay.as_millis(), 200);
let third_delay = publisher.get_retry_delay(2);
assert_eq!(third_delay.as_millis(), 400);
let ninth_delay = publisher.get_retry_delay(9);
assert_eq!(ninth_delay.as_millis(), 10_000);
}
#[tokio::test]
async fn republish_retry_missing() {
let testnet = Testnet::run().await.unwrap();
let pubky_client = testnet.client_builder().build().unwrap();
let pkarr_client = pubky_client.pkarr().clone();
let public_key = Keypair::random().public_key();
let required_nodes = 1;
let mut settings = RepublisherSettings::default();
settings
.pkarr_client(pkarr_client)
.min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap());
settings
.retry_settings
.max_retries(NonZeroU8::new(3).unwrap())
.initial_retry_delay(Duration::from_millis(100));
let publisher = Republisher::new_with_settings(public_key, settings).unwrap();
let res = publisher.republish().await;
assert!(res.is_err());
assert!(res.unwrap_err().is_missing());
}
#[tokio::test]
async fn republish_with_condition_fail() {
let testnet = Testnet::run().await.unwrap();
let pubky_client = testnet.client_builder().build().unwrap();
let pkarr_client = pubky_client.pkarr().clone();
let public_key = publish_sample_packets(&pkarr_client).await;
let required_nodes = 1;
let mut settings = RepublisherSettings::default();
settings
.pkarr_client(pkarr_client.clone())
.min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap())
// A condition that always returns false, so the packet is never republished.
.republish_condition(|_| false);
let publisher = Republisher::new_with_settings(public_key.clone(), settings).unwrap();
let res = publisher.republish_once().await;
assert!(res.is_ok());
let info = res.unwrap();
assert_eq!(info.published_nodes_count, 0);
assert!(info.condition_failed);
}
#[tokio::test]
async fn republish_with_condition_success() {
let testnet = Testnet::run().await.unwrap();
let pubky_client = testnet.client_builder().build().unwrap();
let pkarr_client = pubky_client.pkarr().clone();
let public_key = publish_sample_packets(&pkarr_client).await;
let required_nodes = 1;
let mut settings = RepublisherSettings::default();
settings
.pkarr_client(pkarr_client.clone())
.min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap())
// A condition that always returns true, so the packet is republished.
.republish_condition(|_| true);
let publisher = Republisher::new_with_settings(public_key.clone(), settings).unwrap();
let res = publisher.republish_once().await;
assert!(res.is_ok());
let info = res.unwrap();
assert_eq!(info.published_nodes_count, 1);
assert!(!info.condition_failed);
}
}

pkarr-republisher/src/resilient_client.rs

@@ -0,0 +1,91 @@
use std::num::NonZeroU8;
use pkarr::{mainline::async_dht::AsyncDht, PublicKey, SignedPacket};
use crate::{
count_key_on_dht, PublishError, PublishInfo, Publisher, PublisherSettings, RepublishError,
RepublishInfo, Republisher, RepublisherSettings, RetrySettings,
};
#[derive(Debug, thiserror::Error)]
pub enum ResilientClientBuilderError {
#[error("pkarr client was built without DHT and is only using relays. This is not supported.")]
DhtNotEnabled,
#[error(transparent)]
BuildError(#[from] pkarr::errors::BuildError),
}
/// Simple pkarr client that focuses on resilience
/// and verification, compared to the regular client, which
/// might experience unreliability due to the underlying UDP connection.
///
/// This client requires a pkarr client that was built with the `dht` feature.
/// Relay-only clients are not supported.
#[derive(Debug, Clone)]
pub struct ResilientClient {
client: pkarr::Client,
dht: AsyncDht,
retry_settings: RetrySettings,
}
impl ResilientClient {
pub fn new() -> Result<Self, ResilientClientBuilderError> {
let client = pkarr::Client::builder().build()?;
Self::new_with_client(client, RetrySettings::default())
}
pub fn new_with_client(
client: pkarr::Client,
retry_settings: RetrySettings,
) -> Result<Self, ResilientClientBuilderError> {
let dht = client.dht();
if dht.is_none() {
return Err(ResilientClientBuilderError::DhtNotEnabled);
}
let dht = dht.unwrap().as_async();
Ok(Self {
client,
dht,
retry_settings,
})
}
/// Publishes a pkarr packet with retries. Verifies it's been stored correctly.
pub async fn publish(
&self,
packet: SignedPacket,
min_sufficient_node_publish_count: Option<NonZeroU8>,
) -> Result<PublishInfo, PublishError> {
let mut settings = PublisherSettings::default();
settings.pkarr_client(self.client.clone());
settings.retry_settings(self.retry_settings.clone());
if let Some(count) = min_sufficient_node_publish_count {
settings.min_sufficient_node_publish_count = count;
};
let publisher = Publisher::new_with_settings(packet, settings)
.expect("infallible because pkarr client provided.");
publisher.publish().await
}
/// Republishes a pkarr packet with retries. Verifies it's been stored correctly.
pub async fn republish(
&self,
public_key: PublicKey,
min_sufficient_node_publish_count: Option<NonZeroU8>,
) -> Result<RepublishInfo, RepublishError> {
let mut settings = RepublisherSettings::default();
settings.pkarr_client(self.client.clone());
if let Some(count) = min_sufficient_node_publish_count {
settings.min_sufficient_node_publish_count = count;
};
settings.retry_settings(self.retry_settings.clone());
let publisher = Republisher::new_with_settings(public_key, settings)
.expect("infallible because pkarr client provided.");
publisher.republish().await
}
/// Counts the number of nodes the public key has been stored on.
pub async fn verify_node_count(&self, public_key: &PublicKey) -> usize {
count_key_on_dht(public_key, &self.dht).await
}
}

pkarr-republisher/src/verify.rs

@@ -0,0 +1,12 @@
use futures_lite::StreamExt;
use pkarr::{mainline::async_dht::AsyncDht, PublicKey};
/// Counts the number of nodes that store the public key.
pub async fn count_key_on_dht(public_key: &PublicKey, dht: &AsyncDht) -> usize {
let mut response_count = 0;
let mut stream = dht.get_mutable(public_key.as_bytes(), None, None);
while (stream.next().await).is_some() {
response_count += 1;
}
response_count
}

pubky-homeserver/Cargo.toml

@@ -45,3 +45,6 @@ url = "2.5.4"
axum-server = { version = "0.7.1", features = ["tls-rustls-no-provider"] }
tower = "0.5.2"
page_size = "0.6.0"
pkarr-republisher = { path = "../pkarr-republisher" }
thiserror = "2.0.12"


@@ -1,6 +1,9 @@
# Secret key (in hex) to generate the Homeserver's Keypair
# secret_key = "0000000000000000000000000000000000000000000000000000000000000000"
# The interval at which user keys are republished to the DHT.
user_keys_republisher_interval = 14400 # 4 hours in seconds
[admin]
# Set an admin password to protect admin endpoints.
# If no password is set, the admin endpoints will not be accessible.

pubky-homeserver/src/config.rs

@@ -15,6 +15,8 @@ use crate::{
io::IoConfig,
};
pub const DEFAULT_REPUBLISHER_INTERVAL: u64 = 4 * 60 * 60; // 4 hours in seconds
// === Core ===
pub const DEFAULT_STORAGE_DIR: &str = "pubky";
pub const DEFAULT_MAP_SIZE: usize = 10995116277760; // 10 TB (map size, not the disk space actually used)
@@ -129,7 +131,6 @@ impl Config {
bootstrap,
http_port: 0,
https_port: 0,
..Default::default()
},
core: CoreConfig::test(),

pubky-homeserver/src/core/database/db.rs

@@ -0,0 +1,74 @@
use super::tables::{Tables, TABLES_COUNT};
// Fields are protected from being mutated by other modules in crate::database.
use crate::core::CoreConfig;
use heed::{Env, EnvOpenOptions};
use std::{fs, path::PathBuf};
use super::migrations;
#[derive(Debug, Clone)]
pub struct DB {
pub(crate) env: Env,
pub(crate) tables: Tables,
pub(crate) buffers_dir: PathBuf,
pub(crate) max_chunk_size: usize,
config: CoreConfig,
}
impl DB {
/// # Safety
/// DB uses LMDB, [opening][heed::EnvOpenOptions::open] which is marked unsafe,
/// because of the possible Undefined Behavior (UB) if the lock file is broken.
pub unsafe fn open(config: CoreConfig) -> anyhow::Result<Self> {
let buffers_dir = config.storage.clone().join("buffers");
// Cleanup buffers.
let _ = fs::remove_dir(&buffers_dir);
fs::create_dir_all(&buffers_dir)?;
let env = unsafe {
EnvOpenOptions::new()
.max_dbs(TABLES_COUNT)
.map_size(config.db_map_size)
.open(&config.storage)
}?;
let tables = migrations::run(&env)?;
let db = DB {
env,
tables,
config,
buffers_dir,
max_chunk_size: max_chunk_size(),
};
Ok(db)
}
// Create an ephemeral database for testing purposes.
pub fn test() -> DB {
unsafe { DB::open(CoreConfig::test()).unwrap() }
}
// === Getters ===
pub fn config(&self) -> &CoreConfig {
&self.config
}
}
/// calculate optimal chunk size:
/// - <https://lmdb.readthedocs.io/en/release/#storage-efficiency-limits>
/// - <https://github.com/lmdbjava/benchmarks/blob/master/results/20160710/README.md#test-2-determine-24816-kb-byte-values>
fn max_chunk_size() -> usize {
let page_size = page_size::get();
// - 16 bytes Header per page (LMDB)
// - Each page has to contain 2 records
// - 8 bytes per record (LMDB) (empirically, it seems to be 10 not 8)
// - 12 bytes key:
// - timestamp : 8 bytes
// - chunk index: 4 bytes
((page_size - 16) / 2) - (8 + 2) - 12
}

pubky-homeserver/src/core/database/mod.rs

@@ -1,87 +1,5 @@
//! Internal database in [super::HomeserverCore]
use std::{fs, path::PathBuf};
use heed::{Env, EnvOpenOptions};
mod db;
mod migrations;
pub mod tables;
use tables::{Tables, TABLES_COUNT};
pub use protected::DB;
/// Protecting fields from being mutated by modules in crate::database
mod protected {
use crate::core::CoreConfig;
use super::*;
#[derive(Debug, Clone)]
pub struct DB {
pub(crate) env: Env,
pub(crate) tables: Tables,
pub(crate) buffers_dir: PathBuf,
pub(crate) max_chunk_size: usize,
config: CoreConfig,
}
impl DB {
/// # Safety
/// DB uses LMDB, [opening][heed::EnvOpenOptions::open] which is marked unsafe,
/// because the possible Undefined Behavior (UB) if the lock file is broken.
pub unsafe fn open(config: CoreConfig) -> anyhow::Result<Self> {
let buffers_dir = config.storage.clone().join("buffers");
// Cleanup buffers.
let _ = fs::remove_dir(&buffers_dir);
fs::create_dir_all(&buffers_dir)?;
let env = unsafe {
EnvOpenOptions::new()
.max_dbs(TABLES_COUNT)
.map_size(config.db_map_size)
.open(&config.storage)
}?;
let tables = migrations::run(&env)?;
let db = DB {
env,
tables,
config,
buffers_dir,
max_chunk_size: max_chunk_size(),
};
Ok(db)
}
// Create an ephemeral database for testing purposes.
pub fn test() -> DB {
unsafe { DB::open(CoreConfig::test()).unwrap() }
}
// === Getters ===
pub fn config(&self) -> &CoreConfig {
&self.config
}
}
}
/// calculate optimal chunk size:
/// - <https://lmdb.readthedocs.io/en/release/#storage-efficiency-limits>
/// - <https://github.com/lmdbjava/benchmarks/blob/master/results/20160710/README.md#test-2-determine-24816-kb-byte-values>
fn max_chunk_size() -> usize {
let page_size = page_size::get();
// - 16 bytes Header per page (LMDB)
// - Each page has to contain 2 records
// - 8 bytes per record (LMDB) (empirically, it seems to be 10 not 8)
// - 12 bytes key:
// - timestamp : 8 bytes
// - chunk index: 4 bytes
((page_size - 16) / 2) - (8 + 2) - 12
}
pub use db::DB;

View File

@@ -4,7 +4,7 @@ use postcard::{from_bytes, to_allocvec};
use serde::{Deserialize, Serialize};
use heed::{BoxedError, BytesDecode, BytesEncode, Database};
use pkarr::PublicKey;
use pkarr::{PublicKey, Timestamp};
extern crate alloc;
@@ -19,6 +19,15 @@ pub struct User {
pub created_at: u64,
}
impl User {
#[allow(dead_code)]
pub fn new() -> Self {
Self {
created_at: Timestamp::now().as_u64(),
}
}
}
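An editorial aside, not part of the diff: the new constructor stamps the record with pkarr's Timestamp (presumably microsecond precision), so a fresh user carries its creation time:

let user = User::new();
assert!(user.created_at > 0); // u64 taken from pkarr::Timestamp::now()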
impl BytesEncode<'_> for User {
type EItem = Self;

View File

@@ -0,0 +1,224 @@
use std::{path::PathBuf, time::Duration};
use crate::core::user_keys_republisher::UserKeysRepublisher;
use anyhow::Result;
use axum::Router;
use pubky_common::auth::AuthVerifier;
use tokio::time::sleep;
use crate::config::{
DEFAULT_LIST_LIMIT, DEFAULT_MAP_SIZE, DEFAULT_MAX_LIST_LIMIT, DEFAULT_REPUBLISHER_INTERVAL,
DEFAULT_STORAGE_DIR,
};
use crate::core::database::DB;
#[derive(Clone, Debug)]
pub(crate) struct AppState {
pub(crate) verifier: AuthVerifier,
pub(crate) db: DB,
pub(crate) admin: AdminConfig,
}
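/// Delay before the first user-keys republish, giving the homeserver time to start.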
const INITIAL_DELAY_BEFORE_REPUBLISH: Duration = Duration::from_secs(60);
#[derive(Debug, Clone)]
/// A side-effect-free Core of the [crate::Homeserver].
pub struct HomeserverCore {
pub(crate) router: Router,
pub(crate) user_keys_republisher: UserKeysRepublisher,
}
impl HomeserverCore {
/// Create a side-effect-free Homeserver core.
///
/// # Safety
/// HomeserverCore uses LMDB; [opening][heed::EnvOpenOptions::open] it is marked unsafe
/// because of possible Undefined Behavior (UB) if the lock file is broken.
pub unsafe fn new(config: CoreConfig, admin: AdminConfig) -> Result<Self> {
let db = unsafe { DB::open(config.clone())? };
let state = AppState {
verifier: AuthVerifier::default(),
db: db.clone(),
admin,
};
let router = super::routes::create_app(state.clone());
let user_keys_republisher = UserKeysRepublisher::new(
db.clone(),
config
.user_keys_republisher_interval
.unwrap_or(Duration::from_secs(DEFAULT_REPUBLISHER_INTERVAL)),
);
let user_keys_republisher_clone = user_keys_republisher.clone();
if config.is_user_keys_republisher_enabled() {
// Delayed start of the republisher to give time for the homeserver to start.
tokio::spawn(async move {
sleep(INITIAL_DELAY_BEFORE_REPUBLISH).await;
user_keys_republisher_clone.run().await;
});
}
Ok(Self {
router,
user_keys_republisher,
})
}
/// Stop the home server background tasks.
#[allow(dead_code)]
pub async fn stop(&mut self) {
self.user_keys_republisher.stop().await;
}
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum SignupMode {
Open,
#[default]
TokenRequired,
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct AdminConfig {
/// The password used to authorize admin endpoints.
pub password: Option<String>,
/// Determines whether new signups require a valid token.
pub signup_mode: SignupMode,
}
impl AdminConfig {
pub fn test() -> Self {
AdminConfig {
password: Some("admin".to_string()),
signup_mode: SignupMode::Open,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
/// Database configurations
pub struct CoreConfig {
/// Path to the storage directory.
///
/// Defaults to a directory in the OS data directory
pub storage: PathBuf,
/// Maximum LMDB map size in bytes (allocated lazily; not the disk space actually used).
pub db_map_size: usize,
/// The default limit of a list api if no `limit` query parameter is provided.
///
/// Defaults to `100`
pub default_list_limit: u16,
/// The maximum limit of a list api, even if a `limit` query parameter is provided.
///
/// Defaults to `1000`
pub max_list_limit: u16,
/// The interval at which the user keys republisher runs. `None` disables it.
///
/// Defaults to `60*60*4` (4 hours)
pub user_keys_republisher_interval: Option<Duration>,
}
impl Default for CoreConfig {
fn default() -> Self {
Self {
storage: storage(None)
.expect("operating environment provides no directory for application data"),
db_map_size: DEFAULT_MAP_SIZE,
default_list_limit: DEFAULT_LIST_LIMIT,
max_list_limit: DEFAULT_MAX_LIST_LIMIT,
user_keys_republisher_interval: Some(Duration::from_secs(DEFAULT_REPUBLISHER_INTERVAL)),
}
}
}
impl CoreConfig {
pub fn test() -> Self {
let storage = std::env::temp_dir()
.join(pubky_common::timestamp::Timestamp::now().to_string())
.join(DEFAULT_STORAGE_DIR);
Self {
storage,
db_map_size: 10485760, // 10 MiB is plenty for ephemeral tests
..Default::default()
}
}
pub fn is_user_keys_republisher_enabled(&self) -> bool {
self.user_keys_republisher_interval.is_some()
}
}
pub fn storage(storage: Option<String>) -> Result<PathBuf> {
let dir = if let Some(storage) = storage {
PathBuf::from(storage)
} else {
let path = dirs_next::data_dir().ok_or_else(|| {
anyhow::anyhow!("operating environment provides no directory for application data")
})?;
path.join(DEFAULT_STORAGE_DIR)
};
Ok(dir.join("homeserver"))
}
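As an editorial aside (not part of the diff), a sketch of how storage() resolves its argument; the paths are illustrative:

fn storage_examples() -> anyhow::Result<()> {
    // An explicit directory always gets the "homeserver" subdirectory appended.
    assert_eq!(
        storage(Some("/var/lib/pubky".to_string()))?,
        std::path::PathBuf::from("/var/lib/pubky/homeserver")
    );
    // With no directory, it falls back to <OS data dir>/pubky/homeserver.
    println!("{:?}", storage(None)?);
    Ok(())
}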
#[cfg(test)]
mod tests {
use anyhow::Result;
use axum::{
body::Body,
extract::Request,
http::{header, Method},
response::Response,
};
use pkarr::Keypair;
use pubky_common::{auth::AuthToken, capabilities::Capability};
use tower::ServiceExt;
use super::*;
impl HomeserverCore {
/// Test version of [HomeserverCore::new], using ephemeral, small storage.
pub fn test() -> Result<Self> {
unsafe { HomeserverCore::new(CoreConfig::test(), AdminConfig::test()) }
}
// === Public Methods ===
pub async fn create_root_user(&mut self, keypair: &Keypair) -> Result<String> {
let auth_token = AuthToken::sign(keypair, vec![Capability::root()]);
let response = self
.call(
Request::builder()
.uri("/signup")
.header("host", keypair.public_key().to_string())
.method(Method::POST)
.body(Body::from(auth_token.serialize()))
.unwrap(),
)
.await?;
let header_value = response
.headers()
.get(header::SET_COOKIE)
.and_then(|h| h.to_str().ok())
.expect("should return a set-cookie header")
.to_string();
Ok(header_value)
}
pub async fn call(&self, request: Request) -> Result<Response> {
Ok(self.router.clone().oneshot(request).await?)
}
}
}
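A usage sketch for the new republisher wiring (editorial, not part of the diff; assumes this file's imports are in scope):

// Build a core whose user-keys republisher runs hourly instead of every 4 hours.
fn build_core() -> anyhow::Result<HomeserverCore> {
    let config = CoreConfig {
        user_keys_republisher_interval: Some(Duration::from_secs(60 * 60)),
        ..CoreConfig::test()
    };
    // Safety: LMDB open; see the notes on HomeserverCore::new and DB::open.
    unsafe { HomeserverCore::new(config, AdminConfig::test()) }
}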

View File

@@ -1,189 +1,9 @@
use std::path::PathBuf;
use anyhow::Result;
use axum::Router;
use pubky_common::auth::AuthVerifier;
pub mod database;
mod error;
mod extractors;
mod homeserver_core;
mod layers;
mod routes;
mod user_keys_republisher;
use crate::config::{
DEFAULT_LIST_LIMIT, DEFAULT_MAP_SIZE, DEFAULT_MAX_LIST_LIMIT, DEFAULT_STORAGE_DIR,
};
use database::DB;
#[derive(Clone, Debug)]
pub(crate) struct AppState {
pub(crate) verifier: AuthVerifier,
pub(crate) db: DB,
pub(crate) admin: AdminConfig,
}
#[derive(Debug, Clone)]
/// A side-effect-free Core of the [crate::Homeserver].
pub struct HomeserverCore {
pub(crate) router: Router,
}
impl HomeserverCore {
/// Create a side-effect-free Homeserver core.
///
/// # Safety
/// HomeserverCore uses LMDB; [opening][heed::EnvOpenOptions::open] it is marked unsafe
/// because of possible Undefined Behavior (UB) if the lock file is broken.
pub unsafe fn new(config: CoreConfig, admin: AdminConfig) -> Result<Self> {
let db = unsafe { DB::open(config.clone())? };
let state = AppState {
verifier: AuthVerifier::default(),
db,
admin,
};
let router = routes::create_app(state.clone());
Ok(Self { router })
}
}
#[cfg(test)]
mod tests {
use anyhow::Result;
use axum::{
body::Body,
extract::Request,
http::{header, Method},
response::Response,
};
use pkarr::Keypair;
use pubky_common::{auth::AuthToken, capabilities::Capability};
use tower::ServiceExt;
use super::*;
impl HomeserverCore {
/// Test version of [HomeserverCore::new], using an ephemeral small storage.
pub fn test() -> Result<Self> {
unsafe { HomeserverCore::new(CoreConfig::test(), AdminConfig::test()) }
}
// === Public Methods ===
pub async fn create_root_user(&mut self, keypair: &Keypair) -> Result<String> {
let auth_token = AuthToken::sign(keypair, vec![Capability::root()]);
let response = self
.call(
Request::builder()
.uri("/signup")
.header("host", keypair.public_key().to_string())
.method(Method::POST)
.body(Body::from(auth_token.serialize()))
.unwrap(),
)
.await?;
let header_value = response
.headers()
.get(header::SET_COOKIE)
.and_then(|h| h.to_str().ok())
.expect("should return a set-cookie header")
.to_string();
Ok(header_value)
}
pub async fn call(&self, request: Request) -> Result<Response> {
Ok(self.router.clone().oneshot(request).await?)
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum SignupMode {
Open,
#[default]
TokenRequired,
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct AdminConfig {
/// The password used to authorize admin endpoints.
pub password: Option<String>,
/// Determines whether new signups require a valid token.
pub signup_mode: SignupMode,
}
impl AdminConfig {
pub fn test() -> Self {
AdminConfig {
password: Some("admin".to_string()),
signup_mode: SignupMode::Open,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
/// Database configurations
pub struct CoreConfig {
/// Path to the storage directory.
///
/// Defaults to a directory in the OS data directory
pub storage: PathBuf,
pub db_map_size: usize,
/// The default limit of a list api if no `limit` query parameter is provided.
///
/// Defaults to `100`
pub default_list_limit: u16,
/// The maximum limit of a list api, even if a `limit` query parameter is provided.
///
/// Defaults to `1000`
pub max_list_limit: u16,
}
impl Default for CoreConfig {
fn default() -> Self {
Self {
storage: storage(None)
.expect("operating environment provides no directory for application data"),
db_map_size: DEFAULT_MAP_SIZE,
default_list_limit: DEFAULT_LIST_LIMIT,
max_list_limit: DEFAULT_MAX_LIST_LIMIT,
}
}
}
impl CoreConfig {
pub fn test() -> Self {
let storage = std::env::temp_dir()
.join(pubky_common::timestamp::Timestamp::now().to_string())
.join(DEFAULT_STORAGE_DIR);
Self {
storage,
db_map_size: 10485760,
..Default::default()
}
}
}
pub fn storage(storage: Option<String>) -> Result<PathBuf> {
let dir = if let Some(storage) = storage {
PathBuf::from(storage)
} else {
let path = dirs_next::data_dir().ok_or_else(|| {
anyhow::anyhow!("operating environment provides no directory for application data")
})?;
path.join(DEFAULT_STORAGE_DIR)
};
Ok(dir.join("homeserver"))
}
pub use homeserver_core::*;

View File

@@ -0,0 +1,218 @@
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use pkarr::PublicKey;
use pkarr_republisher::{
MultiRepublishResult, MultiRepublisher, RepublisherSettings, ResilientClientBuilderError,
};
use tokio::{
sync::RwLock,
task::JoinHandle,
time::{interval, Instant},
};
use crate::core::database::DB;
#[derive(Debug, thiserror::Error)]
pub enum UserKeysRepublisherError {
#[error(transparent)]
DB(heed::Error),
#[error(transparent)]
Pkarr(ResilientClientBuilderError),
}
/// Publishes the pkarr keys of all users to the Mainline DHT.
#[derive(Debug, Clone)]
pub struct UserKeysRepublisher {
db: DB,
handle: Arc<RwLock<Option<JoinHandle<()>>>>,
is_running: Arc<AtomicBool>,
republish_interval: Duration,
}
impl UserKeysRepublisher {
pub fn new(db: DB, republish_interval: Duration) -> Self {
Self {
db,
handle: Arc::new(RwLock::new(None)),
is_running: Arc::new(AtomicBool::new(false)),
republish_interval,
}
}
/// Run the user keys republisher.
pub async fn run(&self) {
tracing::info!(
"Initialize user keys republisher with interval {:?}",
self.republish_interval
);
let mut lock = self.handle.write().await;
if lock.is_some() {
return;
}
let db = self.db.clone();
let republish_interval = self.republish_interval;
let handle: JoinHandle<()> =
tokio::spawn(async move { Self::run_loop(db, republish_interval).await });
*lock = Some(handle);
self.is_running.store(true, Ordering::Relaxed);
}
// Get all user public keys from the database.
async fn get_all_user_keys(db: DB) -> Result<Vec<PublicKey>, heed::Error> {
let rtxn = db.env.read_txn()?;
let users = db.tables.users.iter(&rtxn)?;
let keys: Vec<PublicKey> = users
.map(|result| result.map(|val| val.0))
.filter_map(Result::ok) // Iteration errors indicate DB corruption or OOM; skipping those entries is acceptable here.
.collect();
Ok(keys)
}
/// Republishes all user pkarr keys to the Mainline DHT once.
///
/// # Errors
///
/// - If the database cannot be read, an error is returned.
/// - If the pkarr keys cannot be republished, an error is returned.
async fn republish_keys_once(db: DB) -> Result<MultiRepublishResult, UserKeysRepublisherError> {
let keys = Self::get_all_user_keys(db)
.await
.map_err(UserKeysRepublisherError::DB)?;
if keys.is_empty() {
tracing::info!("No user keys to republish.");
return Ok(MultiRepublishResult::new(HashMap::new()));
}
let mut settings = RepublisherSettings::default();
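// A republish condition of |_| true means every key is republished unconditionally.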
settings.republish_condition(|_| true);
let republisher = MultiRepublisher::new_with_settings(settings, None);
// TODO: Only publish if the user points to this homeserver.
let results = republisher
.run(keys, 12)
.await
.map_err(UserKeysRepublisherError::Pkarr)?;
Ok(results)
}
/// Internal run loop that publishes all user pkarr keys to the Mainline DHT continuously.
async fn run_loop(db: DB, republish_interval: Duration) {
let mut interval = interval(republish_interval);
loop {
interval.tick().await;
let start = Instant::now();
tracing::info!("Republishing user keys...");
let result = match Self::republish_keys_once(db.clone()).await {
Ok(result) => result,
Err(e) => {
tracing::error!("Error republishing user keys: {:?}", e);
continue;
}
};
let elapsed = start.elapsed();
if result.is_empty() {
continue;
}
if result.missing().is_empty() {
tracing::info!(
"Republished {} user keys within {:.1}s. {} success, {} missing, {} failed.",
result.len(),
elapsed.as_secs_f32(),
result.success().len(),
result.missing().len(),
result.publishing_failed().len()
);
} else {
tracing::warn!(
"Republished {} user keys within {:.1}s. {} success, {} missing, {} failed.",
result.len(),
elapsed.as_secs_f32(),
result.success().len(),
result.missing().len(),
result.publishing_failed().len()
);
}
}
}
/// Stop the user keys republisher.
#[allow(dead_code)]
pub async fn stop(&mut self) {
let mut lock = self.handle.write().await;
if let Some(handle) = lock.take() {
handle.abort();
*lock = None;
self.is_running.store(false, Ordering::Relaxed);
}
}
/// Stops the republisher synchronously.
#[allow(dead_code)]
pub fn stop_sync(&mut self) {
let mut lock = self.handle.blocking_write();
if let Some(handle) = lock.take() {
handle.abort();
*lock = None;
self.is_running.store(false, Ordering::Relaxed);
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use pkarr::Keypair;
use tokio::time::Instant;
use crate::core::{
database::{tables::users::User, DB},
user_keys_republisher::UserKeysRepublisher,
};
async fn init_db_with_users(count: usize) -> DB {
let db = DB::test();
let mut wtxn = db.env.write_txn().unwrap();
for _ in 0..count {
let user = User::new();
let public_key = Keypair::random().public_key();
db.tables.users.put(&mut wtxn, &public_key, &user).unwrap();
}
wtxn.commit().unwrap();
db
}
/// Test that the republisher tries to republish all keys passed.
#[tokio::test]
async fn test_republish_keys_once() {
let db = init_db_with_users(10).await;
let result = UserKeysRepublisher::republish_keys_once(db).await.unwrap();
assert_eq!(result.len(), 10);
assert_eq!(result.success().len(), 0);
assert_eq!(result.missing().len(), 10);
assert_eq!(result.publishing_failed().len(), 0);
}
/// Test that the republisher stops instantly.
#[tokio::test]
async fn start_and_stop() {
let mut republisher =
UserKeysRepublisher::new(init_db_with_users(1000).await, Duration::from_secs(1));
let start = Instant::now();
republisher.run().await;
assert!(republisher.handle.read().await.is_some());
republisher.stop().await;
let elapsed = start.elapsed();
assert!(elapsed < Duration::from_secs(1));
assert!(republisher.handle.read().await.is_none());
}
}
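For completeness, a lifecycle sketch (editorial, not part of the diff; it mirrors the start_and_stop test above and assumes std::time::Duration is in scope):

async fn republisher_lifecycle(db: DB) {
    let mut republisher = UserKeysRepublisher::new(db, Duration::from_secs(4 * 60 * 60));
    republisher.run().await; // spawns the background loop and returns immediately
    // ... the homeserver serves traffic ...
    republisher.stop().await; // aborts the loop and clears the handle
}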

View File

@@ -0,0 +1,171 @@
//! Pkarr-related task.
use anyhow::Result;
use pkarr::errors::PublishError;
use pkarr::{dns::rdata::SVCB, Keypair, SignedPacket};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::time::{interval, Duration};
use super::IoConfig;
/// Republishes the homeserver's pkarr packet to the DHT every hour.
#[derive(Debug)]
pub struct HomeserverKeyRepublisher {
client: pkarr::Client,
signed_packet: SignedPacket,
republish_task: Mutex<Option<JoinHandle<()>>>,
}
impl HomeserverKeyRepublisher {
pub fn new(
keypair: &Keypair,
config: &IoConfig,
https_port: u16,
http_port: u16,
) -> Result<Self> {
let mut builder = pkarr::Client::builder();
if let Some(bootstrap) = &config.bootstrap {
builder.bootstrap(bootstrap);
}
if let Some(request_timeout) = config.dht_request_timeout {
builder.request_timeout(request_timeout);
}
let client = builder.build()?;
let signed_packet = create_signed_packet(keypair, config, https_port, http_port)?;
Ok(Self {
client,
signed_packet,
republish_task: Mutex::new(None),
})
}
async fn publish_once(
client: &pkarr::Client,
signed_packet: &SignedPacket,
) -> Result<(), PublishError> {
let res = client.publish(signed_packet, None).await;
if let Err(e) = &res {
tracing::warn!(
"Failed to publish the homeserver's pkarr packet to the DHT: {}",
e
);
} else {
tracing::info!("Published the homeserver's pkarr packet to the DHT.");
}
res
}
/// Start the periodic republish task which will republish the server packet to the DHT every hour.
///
/// # Errors
/// - Returns an error if the initial publish fails.
/// - Returns an error if the periodic republish task is already running.
pub async fn start_periodic_republish(&self) -> anyhow::Result<()> {
let mut task_guard = self.republish_task.lock().await;
if task_guard.is_some() {
return Err(anyhow::anyhow!(
"Periodic republish task is already running"
));
}
// Publish once so the packet is on the DHT before this function returns;
// returns an error if that initial publish does not succeed.
Self::publish_once(&self.client, &self.signed_packet).await?;
// Start the periodic republish task.
let client = self.client.clone();
let signed_packet = self.signed_packet.clone();
let handle = tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(60 * 60)); // 1 hour in seconds
interval.tick().await; // The first tick fires immediately; consume it so the loop waits a full interval before republishing.
loop {
interval.tick().await;
let _ = Self::publish_once(&client, &signed_packet).await;
}
});
*task_guard = Some(handle);
Ok(())
}
/// Stop the periodic republish task.
pub async fn stop_periodic_republish(&self) {
let mut task_guard = self.republish_task.lock().await;
if let Some(handle) = task_guard.take() {
handle.abort();
}
}
}
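An editorial usage sketch (not part of the diff); it mirrors how the Homeserver in mod.rs drives the republisher:

async fn republish_lifecycle(republisher: HomeserverKeyRepublisher) -> anyhow::Result<()> {
    republisher.start_periodic_republish().await?; // errors out fast if the first publish fails
    // ... serve HTTP(S) traffic ...
    republisher.stop_periodic_republish().await; // aborts the hourly task
    Ok(())
}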
pub fn create_signed_packet(
keypair: &Keypair,
config: &IoConfig,
https_port: u16,
http_port: u16,
) -> Result<SignedPacket> {
// TODO: Try to resolve first before publishing.
let mut signed_packet_builder = SignedPacket::builder();
let mut svcb = SVCB::new(0, ".".try_into()?);
// Set the public IP or localhost
signed_packet_builder = signed_packet_builder.address(
".".try_into()
.expect(". is valid domain and therefore always succeeds"),
config
.public_addr
.map(|addr| addr.ip())
.unwrap_or("127.0.0.1".parse().expect("localhost is valid ip")),
60 * 60,
);
// Set the public port or the local https_port
svcb.set_port(
config
.public_addr
.map(|addr| addr.port())
.unwrap_or(https_port),
);
signed_packet_builder = signed_packet_builder.https(
".".try_into()
.expect(". is valid domain and therefore always succeeds"),
svcb,
60 * 60,
);
// Set a low-priority HTTPS record for legacy browser support
if let Some(ref domain) = config.domain {
let mut svcb = SVCB::new(10, ".".try_into()?);
let http_port_be_bytes = http_port.to_be_bytes();
if domain == "localhost" {
svcb.set_param(
pubky_common::constants::reserved_param_keys::HTTP_PORT,
&http_port_be_bytes,
)?;
}
svcb.target = domain.as_str().try_into()?;
signed_packet_builder = signed_packet_builder.https(
".".try_into()
.expect(". is valid domain and therefore always succeeds"),
svcb,
60 * 60,
);
}
Ok(signed_packet_builder.build(keypair)?)
}
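To sanity-check what was published, an editorial sketch (not part of the diff; treat the resolve() signature as an assumption about the pkarr client API used above):

async fn check_published(public_key: &pkarr::PublicKey) -> anyhow::Result<()> {
    let client = pkarr::Client::builder().build()?;
    match client.resolve(public_key).await {
        Some(packet) => println!("resolved packet for {}", packet.public_key()),
        None => println!("no packet found on the DHT"),
    }
    Ok(())
}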

View File

@@ -6,8 +6,8 @@ use std::{
use ::pkarr::{Keypair, PublicKey};
use anyhow::Result;
use homeserver_key_republisher::HomeserverKeyRepublisher;
use http::HttpServers;
use pkarr::PkarrServer;
use tracing::info;
use crate::{
@@ -15,8 +15,8 @@ use crate::{
core::{HomeserverCore, SignupMode},
};
mod homeserver_key_republisher;
mod http;
mod pkarr;
#[derive(Debug, Default)]
/// Builder for [Homeserver].
@@ -88,6 +88,7 @@ impl HomeserverBuilder {
pub struct Homeserver {
http_servers: HttpServers,
keypair: Keypair,
pkarr_server: HomeserverKeyRepublisher,
}
impl Homeserver {
@@ -135,26 +136,23 @@ impl Homeserver {
let http_servers = HttpServers::run(&keypair, &config.io, &core.router).await?;
info!(
"Homeserver listening on http://localhost:{}",
http_servers.http_address().port()
);
info!("Publishing Pkarr packet..");
let pkarr_server = PkarrServer::new(
let dht_republisher = HomeserverKeyRepublisher::new(
&keypair,
&config.io,
http_servers.https_address().port(),
http_servers.http_address().port(),
)?;
pkarr_server.publish_server_packet().await?;
dht_republisher.start_periodic_republish().await?;
info!(
"Homeserver listening on http://localhost:{}",
http_servers.http_address().port()
);
info!("Homeserver listening on https://{}", keypair.public_key());
Ok(Self {
http_servers,
keypair,
pkarr_server: dht_republisher,
})
}
@@ -173,8 +171,9 @@ impl Homeserver {
// === Public Methods ===
/// Send a shutdown signal to all open resources
pub fn shutdown(&self) {
pub async fn shutdown(&self) {
self.http_servers.shutdown();
self.pkarr_server.stop_periodic_republish().await;
}
}
@@ -197,7 +196,6 @@ impl Default for IoConfig {
IoConfig {
https_port: DEFAULT_HTTPS_PORT,
http_port: DEFAULT_HTTP_PORT,
public_addr: None,
domain: None,
bootstrap: None,

View File

@@ -1,103 +0,0 @@
//! Pkarr related task
use anyhow::Result;
use pkarr::{dns::rdata::SVCB, Keypair, SignedPacket};
use super::IoConfig;
pub struct PkarrServer {
client: pkarr::Client,
signed_packet: SignedPacket,
}
impl PkarrServer {
pub fn new(
keypair: &Keypair,
config: &IoConfig,
https_port: u16,
http_port: u16,
) -> Result<Self> {
let mut builder = pkarr::Client::builder();
// TODO: should we enable relays in homeservers for udp restricted environments?
builder.no_relays();
if let Some(bootstrap) = &config.bootstrap {
builder.bootstrap(bootstrap);
}
if let Some(request_timeout) = config.dht_request_timeout {
builder.request_timeout(request_timeout);
}
let client = builder.build()?;
let signed_packet = create_signed_packet(keypair, config, https_port, http_port)?;
Ok(Self {
client,
signed_packet,
})
}
pub async fn publish_server_packet(&self) -> anyhow::Result<()> {
// TODO: warn if packet is not most recent, which means the
// user is publishing a Packet from somewhere else.
self.client.publish(&self.signed_packet, None).await?;
Ok(())
}
}
pub fn create_signed_packet(
keypair: &Keypair,
config: &IoConfig,
https_port: u16,
http_port: u16,
) -> Result<SignedPacket> {
// TODO: Try to resolve first before publishing.
let mut signed_packet_builder = SignedPacket::builder();
let mut svcb = SVCB::new(0, ".".try_into()?);
// Set the public IP or localhost
signed_packet_builder = signed_packet_builder.address(
".".try_into().unwrap(),
config
.public_addr
.map(|addr| addr.ip())
.unwrap_or("127.0.0.1".parse().expect("localhost is valid ip")),
60 * 60,
);
// Set the public port or the local https_port
svcb.set_port(
config
.public_addr
.map(|addr| addr.port())
.unwrap_or(https_port),
);
signed_packet_builder = signed_packet_builder.https(".".try_into().unwrap(), svcb, 60 * 60);
// Set low priority https record for legacy browsers support
if let Some(ref domain) = config.domain {
let mut svcb = SVCB::new(10, ".".try_into()?);
let http_port_be_bytes = http_port.to_be_bytes();
if domain == "localhost" {
svcb.set_param(
pubky_common::constants::reserved_param_keys::HTTP_PORT,
&http_port_be_bytes,
)?;
}
svcb.target = domain.as_str().try_into()?;
signed_packet_builder = signed_packet_builder.https(".".try_into().unwrap(), svcb, 60 * 60);
}
Ok(signed_packet_builder.build(keypair)?)
}

View File

@@ -39,7 +39,7 @@ async fn main() -> Result<()> {
tracing::info!("Shutting down Homeserver");
server.shutdown();
server.shutdown().await;
Ok(())
}
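For context, the full shutdown flow after this change looks roughly like this (editorial sketch; the builder call is hypothetical, only shutdown().await is from this diff):

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let server = Homeserver::builder().run().await?; // hypothetical construction
    tokio::signal::ctrl_c().await?;
    tracing::info!("Shutting down Homeserver");
    server.shutdown().await; // now async: it also stops the periodic pkarr republish task
    Ok(())
}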