From 5393b4575af19eaa445e1f967aa375b5c837eabe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Severin=20Alexander=20B=C3=BChler?= <8782386+SeverinAlexB@users.noreply.github.com> Date: Tue, 18 Mar 2025 17:07:07 +0200 Subject: [PATCH] feat: Pkarr republisher for user and homeserver keys (#85) * added bin folder * removed published_secrets.txt * added single key publisher * read and republish * some changes * made everything nice * added RetrySettings * added resilient client * fmt * renamed package * readme * added user key republisher * fmt * extracted DB and homeserver core from mod * homeserver packet republish periodically * small changes * with final message * conditional republish * fmt * clippy * moved bin to examples * improved readme * handle relay only clients * handled no dht available * ignore corrupt users * handle heed entry errors * improved unwraps * fixes * fixes * added republish interval to config * use warn when some keys are missing * removed old todo * fixes * fmt * fix relays * use resilient client in homeserverkeyrepublisher * moved some news to default * more defaults * fixed tests * fmt * test * removed resilient client for homeserver publish again * clippy * made code simpler * refactored code * added .vscode to gitingore * resutlt match --- .gitignore | 4 + Cargo.lock | 93 +++- Cargo.toml | 2 +- pkarr-republisher/.gitignore | 1 + pkarr-republisher/Cargo.toml | 30 ++ pkarr-republisher/README.md | 86 ++++ .../examples/publish_and_save.rs | 156 +++++++ .../examples/read_and_republish.rs | 118 +++++ pkarr-republisher/examples/read_and_verify.rs | 113 +++++ pkarr-republisher/src/lib.rs | 11 + pkarr-republisher/src/multi_republisher.rs | 252 +++++++++++ pkarr-republisher/src/publisher.rs | 331 ++++++++++++++ pkarr-republisher/src/republisher.rs | 406 ++++++++++++++++++ pkarr-republisher/src/resilient_client.rs | 91 ++++ pkarr-republisher/src/verify.rs | 12 + pubky-homeserver/Cargo.toml | 3 + pubky-homeserver/src/config.example.toml | 3 + 
pubky-homeserver/src/config.rs | 3 +- pubky-homeserver/src/core/database/db.rs | 74 ++++ pubky-homeserver/src/core/database/mod.rs | 86 +--- .../src/core/database/tables/users.rs | 11 +- pubky-homeserver/src/core/homeserver_core.rs | 224 ++++++++++ pubky-homeserver/src/core/mod.rs | 186 +------- .../src/core/user_keys_republisher.rs | 218 ++++++++++ .../src/io/homeserver_key_republisher.rs | 171 ++++++++ pubky-homeserver/src/io/mod.rs | 26 +- pubky-homeserver/src/io/pkarr.rs | 103 ----- pubky-homeserver/src/main.rs | 2 +- 28 files changed, 2410 insertions(+), 406 deletions(-) create mode 100644 pkarr-republisher/.gitignore create mode 100644 pkarr-republisher/Cargo.toml create mode 100644 pkarr-republisher/README.md create mode 100644 pkarr-republisher/examples/publish_and_save.rs create mode 100644 pkarr-republisher/examples/read_and_republish.rs create mode 100644 pkarr-republisher/examples/read_and_verify.rs create mode 100644 pkarr-republisher/src/lib.rs create mode 100644 pkarr-republisher/src/multi_republisher.rs create mode 100644 pkarr-republisher/src/publisher.rs create mode 100644 pkarr-republisher/src/republisher.rs create mode 100644 pkarr-republisher/src/resilient_client.rs create mode 100644 pkarr-republisher/src/verify.rs create mode 100644 pubky-homeserver/src/core/database/db.rs create mode 100644 pubky-homeserver/src/core/homeserver_core.rs create mode 100644 pubky-homeserver/src/core/user_keys_republisher.rs create mode 100644 pubky-homeserver/src/io/homeserver_key_republisher.rs delete mode 100644 pubky-homeserver/src/io/pkarr.rs diff --git a/.gitignore b/.gitignore index af03ac7..b84cc56 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,6 @@ target/ storage/ +.continuerules +.cursorrules +.cursor/* +.vscode/* \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index b8bf73d..c09d028 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -626,6 +626,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "ctrlc" +version = "3.4.5" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "90eeab0aa92f3f9b4e87f258c72b139c207d251f9cbc1080a0086b86a8870dd3" +dependencies = [ + "nix", + "windows-sys 0.59.0", +] + [[package]] name = "curve25519-dalek" version = "4.1.3" @@ -1621,9 +1631,9 @@ checksum = "227748d55f2f0ab4735d87fd623798cb6b664512fe979705f829c9f81c934465" [[package]] name = "mainline" -version = "5.3.0" +version = "5.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d20596de02883daff9a64c0c02bb4371c44dc7988cd07ecb61d166281c7733d3" +checksum = "2fae24c3d129b92c8cfea92a9e2014052371a2835e4a6d66dfdb00238e389e56" dependencies = [ "crc", "document-features", @@ -1637,7 +1647,7 @@ dependencies = [ "serde_bencode", "serde_bytes", "sha1_smol", - "thiserror 2.0.11", + "thiserror 2.0.12", "tracing", ] @@ -1723,6 +1733,18 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "no-std-compat" version = "0.4.1" @@ -1741,6 +1763,21 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" +[[package]] +name = "ntimestamp" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c50f94c405726d3e0095e89e72f75ce7f6587b94a8bd8dc8054b73f65c0fd68c" +dependencies = [ + "base32", + "document-features", + "getrandom 0.2.15", + "httpdate", + "js-sys", + "once_cell", + "serde", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1960,9 +1997,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkarr" -version = "3.3.3" +version = "3.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "6c7f42132fd7f5dcfca23b791099dec416d67719c16f362bc2640d05078e5661" +checksum = "b006293464515e54044b64e1853c893111eb4e579f9f59decba0039d7c27e2f9" dependencies = [ "async-compat", "base32", @@ -1980,8 +2017,8 @@ dependencies = [ "log", "lru", "mainline", + "ntimestamp", "page_size", - "pubky-timestamp", "reqwest", "rustls", "rustls-webpki", @@ -1989,7 +2026,7 @@ dependencies = [ "serde", "sha1_smol", "simple-dns", - "thiserror 2.0.11", + "thiserror 2.0.12", "tokio", "tracing", "url", @@ -2015,7 +2052,7 @@ dependencies = [ "pkarr", "rustls", "serde", - "thiserror 2.0.11", + "thiserror 2.0.12", "tokio", "toml", "tower-http", @@ -2025,6 +2062,24 @@ dependencies = [ "url", ] +[[package]] +name = "pkarr-republisher" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "ctrlc", + "futures-lite", + "hex", + "pkarr", + "pubky-testnet", + "rand 0.9.0", + "thiserror 2.0.12", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "pkcs8" version = "0.10.2" @@ -2123,7 +2178,7 @@ dependencies = [ "pubky-common", "pubky-testnet", "reqwest", - "thiserror 2.0.11", + "thiserror 2.0.12", "tokio", "tracing", "tracing-subscriber", @@ -2149,7 +2204,7 @@ dependencies = [ "pubky-timestamp", "rand 0.9.0", "serde", - "thiserror 2.0.11", + "thiserror 2.0.12", ] [[package]] @@ -2171,9 +2226,11 @@ dependencies = [ "httpdate", "page_size", "pkarr", + "pkarr-republisher", "postcard", "pubky-common", "serde", + "thiserror 2.0.12", "tokio", "toml", "tower 0.5.2", @@ -2253,7 +2310,7 @@ dependencies = [ "rustc-hash", "rustls", "socket2", - "thiserror 2.0.11", + "thiserror 2.0.12", "tokio", "tracing", ] @@ -2272,7 +2329,7 @@ dependencies = [ "rustls", "rustls-pki-types", "slab", - "thiserror 2.0.11", + "thiserror 2.0.12", "tinyvec", "tracing", "web-time", @@ -2990,11 +3047,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.11" +version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"d452f284b73e6d76dd36758a0c8684b1d5be31f92b89d07fd5822175732206fc" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" dependencies = [ - "thiserror-impl 2.0.11", + "thiserror-impl 2.0.12", ] [[package]] @@ -3010,9 +3067,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.11" +version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" dependencies = [ "proc-macro2", "quote", @@ -3267,7 +3324,7 @@ dependencies = [ "governor", "http", "pin-project", - "thiserror 2.0.11", + "thiserror 2.0.12", "tower 0.5.2", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 6621462..661a42d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ members = [ "pubky-*", "http-relay", - + "pkarr-republisher", "examples" ] diff --git a/pkarr-republisher/.gitignore b/pkarr-republisher/.gitignore new file mode 100644 index 0000000..cedc961 --- /dev/null +++ b/pkarr-republisher/.gitignore @@ -0,0 +1 @@ +published_secret*.txt \ No newline at end of file diff --git a/pkarr-republisher/Cargo.toml b/pkarr-republisher/Cargo.toml new file mode 100644 index 0000000..40ae318 --- /dev/null +++ b/pkarr-republisher/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "pkarr-republisher" +version = "0.1.0" +edition = "2021" +authors = ["Severin Alex Bühler "] +description = "A pkarr packet republisher." 
+license = "MIT" +homepage = "https://github.com/pubky/pubky-core" +repository = "https://github.com/pubky/pubky-core" +keywords = ["pkarr", "mainline", "pubky"] +categories = ["web-programming"] + +[dependencies] +anyhow = "1.0.95" +pkarr = "3.5.3" +tokio = { version = "1.43.0", features = ["full"] } +tracing = "0.1.41" +futures-lite = { version = "2.6.0"} +thiserror = "2.0.12" + +# bin dependencies +clap = { version = "4.4", features = ["derive"] } +tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } +ctrlc = "3.4.5" +hex = "0.4.3" +rand = "0.9.0" + +[dev-dependencies] +pubky-testnet = { path = "../pubky-testnet" } + diff --git a/pkarr-republisher/README.md b/pkarr-republisher/README.md new file mode 100644 index 0000000..c8f97a7 --- /dev/null +++ b/pkarr-republisher/README.md @@ -0,0 +1,86 @@ +# Pkarr Republisher + +> Early version. Expect breaking API changes. Can still be heavily performance optimized especially by improving the `mainline` lib. + +To keep data on the Mainline DHT alive, it needs to be actively republished every hour. This library provides the tools to republish packets reliably +and in a multi-threaded fashion. This allows the homeserver to republish hundrets of thousands of pkarr keys per day. + + + +## Usage + +**ResilientClient** Pkarr Client with retry and exponential backoff. + +Takes [pkarr](https://github.com/pubky/pkarr) and makes it resilient to UDP unreliabilities and CPU exhaustion +by retrying operations with an exponential backoff. Retries help with UDP packet loss and the backoff gives the CPU time to recover. 
+ +```rust +use pkarr_republisher::ResilientClient; + +let client = ResilientClient::new().unwrap(); +let public_key = Keypair::random().public_key(); + +// Republish with retries +match client.republish(public_key.clone(), None).await { + Ok(info) => { + println!("Key {public_key} published to {} nodes after {} attempt(s).", info.published_nodes_count, info.attempts_needed); + }, + Err(err) => { + if err.is_missing() { + println!("Key {public_key} not found in DHT."); + } + if err.is_publish_failed() { + println!("Key {public_key} failed to publish. {err}"); + } + } +} +``` + +> **Limitation** ResilientClient requires a pkarr client that was built with the `dht` feature. +> Relays only are not supported. + +**MultiRepublisher** Multi-threaded republisher of pkarr keys. + +Uses the ResilientClient to publish hundrets of thousands of pkarr keys per day. + +```rust +use pkarr_republisher::MultiRepublisher; +use pkarr::{Keypair, PublicKey}; + +let public_keys: Vec = (0..100).map(|_| Keypair::random().public_key()).collect(); +let republisher = MultiRepublisher::new().unwrap(); +let results = republisher.run(public_keys, 10).await.expect("UDP socket build infallible"); + +// Verify result of each republished key. +for (key, result) in results { + match result { + Ok(info) => { + println!("Key {} published to {} nodes after {} attempt(s).", key, info.published_nodes_count, info.attempts_needed); + }, + Err(err) => { + if err.is_missing() { + println!("Key {} not found in DHT.", key); + } else if err.is_publish_failed() { + println!("Key {} failed to publish: {}", key, err); + } else { + println!("Key {} encountered an error: {}", key, err); + } + } + } +} +``` + +> **Limitation** Publishing a high number of pkarr keys is CPU intense. A recent test showed a 4 Core CPU being able to publish ~600,000 keys in 24hrs. +> Takes this into consideration. +> Do not use pkarr relays with the MultiRepublisher. You will run into rate limits which are currently not handled. 
+ + +## Examples + +The [examples folder](./examples) contains scripts to test the performance of the republisher. + +- [publish_and_save](./examples/publish_and_save.rs) Publishes x keys multi-threaded and saves them in `published_secrets.txt`. +- [read_and_verify](./examples/read_and_verify.rs) Takes a random sample of the published keys and verifies on how many nodes they've been stored on. +- [read_and_republish](./examples/read_and_republish.rs) takes the saved keys and republishes them multi-threaded. + +Execute with `cargo run --example publish_and_save` \ No newline at end of file diff --git a/pkarr-republisher/examples/publish_and_save.rs b/pkarr-republisher/examples/publish_and_save.rs new file mode 100644 index 0000000..3c22b9c --- /dev/null +++ b/pkarr-republisher/examples/publish_and_save.rs @@ -0,0 +1,156 @@ +//! +//! Publishs packets with random keys and saves the published public keys in a file +//! so they can be reused in other experiments. +//! +//! Run with `cargo run --example publish_and_save -- --num-records 100 --threads 6`. +//! + +use clap::Parser; + +use pkarr::{dns::Name, Client, Keypair, SignedPacket}; +use pkarr_republisher::{ResilientClient, RetrySettings}; +use std::{ + process, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::{Duration, Instant}, +}; +use tokio::time::sleep; +use tracing::{info, level_filters::LevelFilter}; +use tracing_subscriber::EnvFilter; + +#[derive(Parser, Debug)] +#[command( + author, + about = "Publish random packets and save them in `published_secrets.txt`." 
+)] +struct Cli { + /// Number of records to publish + #[arg(long, default_value_t = 100)] + num_records: usize, + + /// Number of parallel threads + #[arg(long, default_value_t = 6)] + threads: usize, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let cli = Cli::parse(); + println!("publish_and_save started."); + + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env().add_directive(LevelFilter::INFO.into())) + .init(); + + // Set up the Ctrl+C handler + let ctrlc_pressed: Arc = Arc::new(AtomicBool::new(false)); + let r = ctrlc_pressed.clone(); + ctrlc::set_handler(move || { + r.store(true, Ordering::SeqCst); + println!("Ctrl+C detected, shutting down..."); + std::process::exit(0); + }) + .expect("Error setting Ctrl+C handler"); + + println!("Press Ctrl+C to stop..."); + + info!("Publish {} records. Verify", cli.num_records); + let published_keys = publish_parallel(cli.num_records, cli.threads, &ctrlc_pressed).await; + + // Turn into a hex list and write to file + let pubkeys = published_keys + .clone() + .into_iter() + .map(|key| { + let secret = key.secret_key(); + hex::encode(secret) + }) + .collect::>(); + let pubkeys_str = pubkeys.join("\n"); + std::fs::write("published_secrets.txt", pubkeys_str).unwrap(); + info!("Successfully wrote secrets keys to published_secrets.txt"); + + Ok(()) +} + +// Publish records in multiple threads. 
+async fn publish_parallel( + num_records: usize, + threads: usize, + ctrlc_pressed: &Arc, +) -> Vec { + let start = Instant::now(); + let mut handles = vec![]; + for thread_id in 0..threads { + let handle = tokio::spawn(async move { + tracing::info!("Started thread t{thread_id}"); + publish_records(num_records / threads, thread_id).await + }); + handles.push(handle); + } + + loop { + let all_finished = handles + .iter() + .map(|handle| handle.is_finished()) + .reduce(|a, b| a && b) + .unwrap(); + if all_finished { + break; + } + if ctrlc_pressed.load(Ordering::Relaxed) { + break; + } + sleep(Duration::from_millis(250)).await; + } + + if ctrlc_pressed.load(Ordering::Relaxed) { + process::exit(0); + } + + let mut all_result = vec![]; + for handle in handles { + let keys = handle.await.unwrap(); + all_result.extend(keys); + } + + let rate = all_result.len() as f64 / start.elapsed().as_secs() as f64; + tracing::info!( + "Published {} keys in {} seconds at {rate:.2} keys/s", + all_result.len(), + start.elapsed().as_secs() + ); + + all_result +} + +// Publishes x number of packets. Checks if they are actually available. 
+async fn publish_records(num_records: usize, thread_id: usize) -> Vec { + let client = Client::builder().no_relays().build().unwrap(); + let rclient = ResilientClient::new_with_client(client, RetrySettings::default()).unwrap(); + let mut records = vec![]; + + for i in 0..num_records { + let instant = Instant::now(); + let key = Keypair::random(); + let packet: SignedPacket = pkarr::SignedPacketBuilder::default() + .cname(Name::new("test").unwrap(), Name::new("test2").unwrap(), 600) + .build(&key) + .unwrap(); + let result = rclient.publish(packet, None).await; + let elapsed_time = instant.elapsed().as_millis(); + if result.is_ok() { + let info = result.unwrap(); + tracing::info!("- t{thread_id:<2} {i:>3}/{num_records} Published {} within {elapsed_time}ms to {} nodes {} attempts", key.public_key(), info.published_nodes_count, info.attempts_needed); + records.push(key); + } else { + let e = result.unwrap_err(); + tracing::error!("Failed to publish {} record: {e:?}", key.public_key()); + continue; + } + } + records +} diff --git a/pkarr-republisher/examples/read_and_republish.rs b/pkarr-republisher/examples/read_and_republish.rs new file mode 100644 index 0000000..e4f0c22 --- /dev/null +++ b/pkarr-republisher/examples/read_and_republish.rs @@ -0,0 +1,118 @@ +//! +//! Reads `published_secrets.txt` and tries to republish the packets. +//! This is done in a multi-threaded way to improve speed. +//! +//! Run with `cargo run --example read_and_republish -- --num-records 100 --threads 10`. +//! + +use clap::Parser; +use pkarr::{ClientBuilder, Keypair}; +use pkarr_republisher::{MultiRepublisher, RepublisherSettings}; +use rand::rng; +use rand::seq::SliceRandom; +use std::{ + process, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Instant, +}; +use tracing::level_filters::LevelFilter; +use tracing_subscriber::EnvFilter; + +#[derive(Parser, Debug)] +#[command( + author, + about = "Reads `published_secrets.txt` and tries to republish the packets." 
+)] +struct Cli { + /// How many keys should be republished? + #[arg(long, default_value_t = 100)] + num_records: usize, + + /// Number of parallel threads + #[arg(long, default_value_t = 10)] + threads: u8, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let cli = Cli::parse(); + + println!("read_and_republish started."); + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env().add_directive(LevelFilter::INFO.into())) + .init(); + + // Set up the Ctrl+C handler + let ctrlc_pressed: Arc = Arc::new(AtomicBool::new(false)); + let r = ctrlc_pressed.clone(); + ctrlc::set_handler(move || { + r.store(true, Ordering::SeqCst); + println!("Ctrl+C detected, shutting down..."); + process::exit(0); + }) + .expect("Error setting Ctrl+C handler"); + println!("Press Ctrl+C to stop..."); + + println!("Read published_secrets.txt"); + let mut published_keys = read_keys(); + println!("Read {} keys", published_keys.len()); + + println!( + "Take a random sample of {} keys to republish.", + cli.num_records + ); + let mut rng = rng(); + published_keys.shuffle(&mut rng); + let keys: Vec = published_keys.into_iter().take(cli.num_records).collect(); + + run_churn_loop(keys, cli.threads).await; + + Ok(()) +} + +fn read_keys() -> Vec { + let secret_srs = std::fs::read_to_string("published_secrets.txt").expect("File not found"); + let keys = secret_srs + .lines() + .map(|line| line.to_string()) + .collect::>(); + keys.into_iter() + .map(|key| { + let secret = hex::decode(key).expect("invalid hex"); + let secret: [u8; 32] = secret.try_into().unwrap(); + Keypair::from_secret_key(&secret) + }) + .collect::>() +} + +async fn run_churn_loop(keys: Vec, thread_count: u8) { + let public_keys = keys.into_iter().map(|key| key.public_key()).collect(); + + let mut builder = ClientBuilder::default(); + builder.no_relays(); + let republisher = + MultiRepublisher::new_with_settings(RepublisherSettings::default(), Some(builder)); + + println!("Republish keys. 
Hold on..."); + let start = Instant::now(); + let results = republisher.run(public_keys, thread_count).await.unwrap(); + + let elapsed_seconds = start.elapsed().as_secs_f32(); + let keys_per_s = results.len() as f32 / elapsed_seconds; + tracing::info!( + "Processed {} keys within {elapsed_seconds:.2}s. {keys_per_s:.2} keys/s.", + results.len() + ); + + tracing::info!( + "{} success, {} missing, {} failed.", + results.success().len(), + results.missing().len(), + results.publishing_failed().len() + ); + + tracing::info!("Republishing finished."); +} diff --git a/pkarr-republisher/examples/read_and_verify.rs b/pkarr-republisher/examples/read_and_verify.rs new file mode 100644 index 0000000..c11e3fc --- /dev/null +++ b/pkarr-republisher/examples/read_and_verify.rs @@ -0,0 +1,113 @@ +//! +//! Reads `published_secrets.txt` and outputs how many nodes store this public key. +//! Run `publish_and_save` first to publish some packets to verify +//! Freshly stored once should have 15+. +//! <10 is ready for a republish. +//! 0 = Packet unavailable. +//! +//! Run with `cargo run --example read_and_verify -- --num_keys 20` +//! + +use clap::Parser; +use pkarr::{Client, Keypair}; +use pkarr_republisher::{ResilientClient, RetrySettings}; +use rand::rng; +use rand::seq::SliceRandom; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; + +use tracing::{info, level_filters::LevelFilter}; +use tracing_subscriber::EnvFilter; + +#[derive(Parser, Debug)] +#[command(author, about = "Verify pkarr packets on the DHT.")] +struct Cli { + /// Verify x keys by checking how many nodes it was stored on. 
+ #[arg(long, default_value_t = 20)] + num_records: usize, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let cli = Cli::parse(); + println!("read_and_verify started."); + + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env().add_directive(LevelFilter::INFO.into())) + .init(); + + // Set up the Ctrl+C handler + let ctrlc_pressed: Arc = Arc::new(AtomicBool::new(false)); + let r = ctrlc_pressed.clone(); + ctrlc::set_handler(move || { + r.store(true, Ordering::SeqCst); + println!("Ctrl+C detected, shutting down..."); + std::process::exit(0); + }) + .expect("Error setting Ctrl+C handler"); + + println!("Press Ctrl+C to stop..."); + + println!("Read published_secrets.txt"); + let published_keys = read_keys(); + println!("Read {} keys", published_keys.len()); + + let num_verify_keys = cli.num_records; + info!("Randomly verify: {num_verify_keys} keys"); + verify_published(&published_keys, num_verify_keys).await; + Ok(()) +} + +fn read_keys() -> Vec { + let secret_srs = std::fs::read_to_string("published_secrets.txt").expect("File not found"); + let keys = secret_srs + .lines() + .map(|line| line.to_string()) + .collect::>(); + keys.into_iter() + .map(|key| { + let secret = hex::decode(key).expect("invalid hex"); + let secret: [u8; 32] = secret.try_into().unwrap(); + Keypair::from_secret_key(&secret) + }) + .collect::>() +} + +async fn verify_published(keys: &[Keypair], count: usize) { + // Shuffle and take {count} elements to verify. 
+ let mut keys: Vec = keys.to_owned(); + let mut rng = rng(); + keys.shuffle(&mut rng); + let keys: Vec = keys.into_iter().take(count).collect(); + + let client = Client::builder().no_relays().build().unwrap(); + let rclient = ResilientClient::new_with_client(client, RetrySettings::default()).unwrap(); + let mut success = 0; + let mut warn = 0; + let mut error = 0; + for (i, key) in keys.into_iter().enumerate() { + let nodes_count = rclient.verify_node_count(&key.public_key()).await; + if nodes_count == 0 { + tracing::error!( + "- {i}/{count} Verify {} found on {nodes_count} nodes.", + key.public_key() + ); + error += 1; + } else if nodes_count < 5 { + tracing::warn!( + "- {i}/{count} Verify {} found on {nodes_count} nodes.", + key.public_key() + ); + warn += 1; + } else { + tracing::info!( + "- {i}/{count} Verify {} found on {nodes_count} nodes.", + key.public_key() + ); + success += 1; + } + } + println!("Success: {success}, Warn: {warn}, Error: {error}"); +} diff --git a/pkarr-republisher/src/lib.rs b/pkarr-republisher/src/lib.rs new file mode 100644 index 0000000..90e28cd --- /dev/null +++ b/pkarr-republisher/src/lib.rs @@ -0,0 +1,11 @@ +mod multi_republisher; +mod publisher; +mod republisher; +mod resilient_client; +mod verify; + +pub use multi_republisher::*; +pub use publisher::*; +pub use republisher::*; +pub use resilient_client::*; +pub use verify::count_key_on_dht; diff --git a/pkarr-republisher/src/multi_republisher.rs b/pkarr-republisher/src/multi_republisher.rs new file mode 100644 index 0000000..b1b1e8b --- /dev/null +++ b/pkarr-republisher/src/multi_republisher.rs @@ -0,0 +1,252 @@ +use crate::{ + republisher::{RepublishError, RepublishInfo, RepublisherSettings}, + ResilientClient, ResilientClientBuilderError, +}; +use pkarr::PublicKey; +use std::collections::HashMap; +use tokio::time::Instant; + +#[derive(Debug, Clone)] +pub struct MultiRepublishResult { + results: HashMap>, +} + +impl MultiRepublishResult { + pub fn new(results: HashMap>) -> Self 
{ + Self { results } + } + + /// Number of keys + pub fn len(&self) -> usize { + self.results.len() + } + + pub fn is_empty(&self) -> bool { + self.results.is_empty() + } + + /// All keys + pub fn all_keys(&self) -> Vec { + self.results.keys().cloned().collect() + } + + /// Successfully published keys + pub fn success(&self) -> Vec { + self.results + .iter() + .filter(|(_, result)| result.is_ok()) + .map(|(key, _)| key.clone()) + .collect() + } + + /// Keys that failed to publish + pub fn publishing_failed(&self) -> Vec { + self.results + .iter() + .filter(|(_, val)| { + if let Err(e) = val { + return e.is_publish_failed(); + } + false + }) + .map(|entry| entry.0.clone()) + .collect() + } + + /// Keys that are missing and could not be republished + pub fn missing(&self) -> Vec { + self.results + .iter() + .filter(|(_, val)| { + if let Err(e) = val { + return e.is_missing(); + } + false + }) + .map(|entry| entry.0.clone()) + .collect() + } +} + +/// Republish multiple keys in a serially or multi-threaded way/ +#[derive(Debug, Clone, Default)] +pub struct MultiRepublisher { + settings: RepublisherSettings, + client_builder: pkarr::ClientBuilder, +} + +impl MultiRepublisher { + pub fn new() -> Self { + Self::default() + } + + /// Create a new republisher with the settings. + /// The republisher ignores the settings.client but instead uses the client_builder to create multiple + /// pkarr clients instead of just one. + pub fn new_with_settings( + mut settings: RepublisherSettings, + client_builder: Option, + ) -> Self { + settings.client = None; // Remove client if it's there because every thread will have it's own. + let builder = client_builder.unwrap_or_default(); + Self { + settings, + client_builder: builder, + } + } + + /// Go through the list of all public keys and republish them serially. 
+ async fn run_serially( + &self, + public_keys: Vec, + ) -> Result< + HashMap>, + ResilientClientBuilderError, + > { + let mut results: HashMap> = + HashMap::with_capacity(public_keys.len()); + tracing::debug!("Start to republish {} public keys.", public_keys.len()); + // TODO: Inspect pkarr reliability. + // pkarr client gets really unreliable when used in parallel. To get around this, we use one client per run(). + let client = self.client_builder.clone().build()?; + let rclient = + ResilientClient::new_with_client(client, self.settings.retry_settings.clone())?; + for key in public_keys { + let start = Instant::now(); + let res = rclient + .republish( + key.clone(), + Some(self.settings.min_sufficient_node_publish_count), + ) + .await; + + let elapsed = start.elapsed().as_millis(); + match &res { + Ok(info) => { + tracing::info!( + "Republished {key} successfully on {} nodes within {elapsed}ms. attemps={}", + info.published_nodes_count, + info.attempts_needed + ) + } + Err(e) => { + tracing::warn!( + "Failed to republish public_key {} within {elapsed}ms. {}", + key, + e + ); + } + } + + results.insert(key.clone(), res); + } + Ok(results) + } + + /// Republish keys in a parallel fashion, using multiple threads for better performance. + /// A good thread size is around 10 for most computers. With high performance cores, you can push + /// it to 40+. 
+ pub async fn run( + &self, + public_keys: Vec, + thread_count: u8, + ) -> Result { + let chunk_size = public_keys.len().div_ceil(thread_count as usize); + let chunks = public_keys + .chunks(chunk_size) + .map(|chunk| chunk.to_vec()) + .collect::>(); + + // Run in parallel + let mut handles = vec![]; + for chunk in chunks { + let publisher = self.clone(); + let handle = tokio::spawn(async move { publisher.run_serially(chunk).await }); + handles.push(handle); + } + + // Join results of all tasks + let mut results = HashMap::with_capacity(public_keys.len()); + for handle in handles { + let join_result = handle.await; + if let Err(e) = join_result { + tracing::error!("Failed to join handle in MultiRepublisher::run: {e}"); + continue; + } + let result = join_result.unwrap()?; + for entry in result { + results.insert(entry.0, entry.1); + } + } + + Ok(MultiRepublishResult::new(results)) + } +} + +#[cfg(test)] +mod tests { + use std::num::NonZeroU8; + + use pkarr::{dns::Name, ClientBuilder, Keypair, PublicKey}; + use pubky_testnet::Testnet; + + use crate::{multi_republisher::MultiRepublisher, republisher::RepublisherSettings}; + + async fn publish_sample_packets(client: &pkarr::Client, count: usize) -> Vec { + let keys: Vec = (0..count).map(|_| Keypair::random()).collect(); + for key in keys.iter() { + let packet = pkarr::SignedPacketBuilder::default() + .cname(Name::new("test").unwrap(), Name::new("test2").unwrap(), 600) + .build(key) + .unwrap(); + let _ = client.publish(&packet, None).await; + } + + keys.into_iter().map(|key| key.public_key()).collect() + } + + #[tokio::test] + async fn single_key_republish_success() { + let testnet = Testnet::run().await.unwrap(); + // Create testnet pkarr builder + let mut pkarr_builder = ClientBuilder::default(); + pkarr_builder.bootstrap(&testnet.bootstrap()).no_relays(); + let pkarr_client = pkarr_builder.clone().build().unwrap(); + + let public_keys = publish_sample_packets(&pkarr_client, 1).await; + let public_key = 
public_keys.first().unwrap().clone(); + + let mut settings = RepublisherSettings::default(); + settings + .pkarr_client(pkarr_client) + .min_sufficient_node_publish_count(NonZeroU8::new(1).unwrap()); + let publisher = MultiRepublisher::new_with_settings(settings, Some(pkarr_builder)); + let results = publisher.run_serially(public_keys).await.unwrap(); + let result = results.get(&public_key).unwrap(); + if let Err(e) = result { + println!("Err {e}"); + } + assert!(result.is_ok()); + } + + #[tokio::test] + async fn single_key_republish_insufficient() { + let testnet = Testnet::run().await.unwrap(); + // Create testnet pkarr builder + let mut pkarr_builder = ClientBuilder::default(); + pkarr_builder.bootstrap(&testnet.bootstrap()).no_relays(); + let pkarr_client = pkarr_builder.clone().build().unwrap(); + let public_keys = publish_sample_packets(&pkarr_client, 1).await; + + let public_key = public_keys.first().unwrap().clone(); + + let mut settings = RepublisherSettings::default(); + settings + .pkarr_client(pkarr_client) + .min_sufficient_node_publish_count(NonZeroU8::new(2).unwrap()); + let publisher = MultiRepublisher::new_with_settings(settings, Some(pkarr_builder)); + let results = publisher.run_serially(public_keys).await.unwrap(); + let result = results.get(&public_key).unwrap(); + assert!(result.is_err()); + } +} diff --git a/pkarr-republisher/src/publisher.rs b/pkarr-republisher/src/publisher.rs new file mode 100644 index 0000000..adf459c --- /dev/null +++ b/pkarr-republisher/src/publisher.rs @@ -0,0 +1,331 @@ +//! +//! Publishes a single pkarr packet with retries in case it fails. +//! 
+ +use pkarr::{mainline::async_dht::AsyncDht, PublicKey, SignedPacket}; +use std::{num::NonZeroU8, time::Duration}; + +use crate::verify::count_key_on_dht; + +#[derive(thiserror::Error, Debug, Clone)] +pub enum PublishError { + #[error("Packet has been republished but to an insufficient number of {published_nodes_count} nodes.")] + InsuffientlyPublished { published_nodes_count: usize }, + #[error(transparent)] + PublishFailed(#[from] pkarr::errors::PublishError), +} + +impl PublishError { + pub fn is_insufficiently_published(&self) -> bool { + if let PublishError::InsuffientlyPublished { .. } = self { + return true; + } + false + } + + pub fn is_publish_failed(&self) -> bool { + if let PublishError::PublishFailed { .. } = self { + return true; + } + false + } +} + +#[derive(Debug, Clone)] +pub struct PublishInfo { + /// How many nodes the key got published on. + pub published_nodes_count: usize, + /// Number of publishing attempts needed to successfully publish. + pub attempts_needed: usize, +} + +impl PublishInfo { + pub fn new(published_nodes_count: usize, attempts_needed: usize) -> Self { + Self { + published_nodes_count, + attempts_needed, + } + } +} + +#[derive(Debug, Clone)] +pub struct RetrySettings { + /// Number of max retries to do before aborting. + pub(crate) max_retries: NonZeroU8, + /// First retry delay that is then used to calculate the exponential backoff. + /// Example: 100ms first, then 200ms, 400ms, 800ms and so on. + pub(crate) initial_retry_delay: Duration, + /// Cap on the retry delay so the exponential backoff doesn't get out of hand. + pub(crate) max_retry_delay: Duration, +} + +impl RetrySettings { + pub fn new() -> Self { + Self::default() + } + /// Maximum number of republishing retries before giving up. + pub fn max_retries(&mut self, max_retries: NonZeroU8) -> &mut Self { + self.max_retries = max_retries; + self + } + + /// Maximum duration the republish task exponentionally backs off until it tries again. 
+    pub fn max_retry_delay(&mut self, duration: Duration) -> &mut Self {
+        self.max_retry_delay = duration;
+        self
+    }
+
+    /// Minimum duration the republish task exponentially backs off until it tries again.
+    pub fn initial_retry_delay(&mut self, duration: Duration) -> &mut Self {
+        self.initial_retry_delay = duration;
+        self
+    }
+}
+
+impl Default for RetrySettings {
+    fn default() -> Self {
+        Self {
+            max_retries: NonZeroU8::new(4).expect("should always be > 0"),
+            initial_retry_delay: Duration::from_millis(200),
+            max_retry_delay: Duration::from_millis(5_000),
+        }
+    }
+}
+
+/// Settings for creating a republisher
+#[derive(Debug, Clone)]
+pub struct PublisherSettings {
+    pub(crate) client: Option<pkarr::Client>,
+    pub(crate) min_sufficient_node_publish_count: NonZeroU8,
+    pub retry_settings: RetrySettings,
+}
+
+impl Default for PublisherSettings {
+    fn default() -> Self {
+        Self {
+            client: None,
+            min_sufficient_node_publish_count: NonZeroU8::new(10).expect("Should always be > 0"),
+            retry_settings: RetrySettings::default(),
+        }
+    }
+}
+
+impl PublisherSettings {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Set a custom pkarr client
+    pub fn pkarr_client(&mut self, client: pkarr::Client) -> &mut Self {
+        self.client = Some(client);
+        self
+    }
+
+    /// Set the minimum sufficient number of nodes a key needs to be stored in
+    /// to be considered a success
+    pub fn min_sufficient_node_publish_count(&mut self, count: NonZeroU8) -> &mut Self {
+        self.min_sufficient_node_publish_count = count;
+        self
+    }
+
+    /// Set settings in relation to retries.
+    pub fn retry_settings(&mut self, settings: RetrySettings) -> &mut Self {
+        self.retry_settings = settings;
+        self
+    }
+}
+
+/// Tries to publish a single key and verifies the key has been published to
+/// a sufficient number of nodes.
+/// Retries in case of errors with an exponential backoff.
+#[derive(Debug, Clone)] +pub struct Publisher { + pub packet: SignedPacket, + client: pkarr::Client, + dht: AsyncDht, + min_sufficient_node_publish_count: NonZeroU8, + retry_settings: RetrySettings, +} + +impl Publisher { + /// Creates a new Publisher with a new pkarr client. + pub fn new(packet: SignedPacket) -> Result { + let settings = PublisherSettings::default(); + Self::new_with_settings(packet, settings) + } + + pub fn new_with_settings( + packet: SignedPacket, + settings: PublisherSettings, + ) -> Result { + let client = match &settings.client { + Some(c) => c.clone(), + None => pkarr::Client::builder().build()?, + }; + let dht = client.dht().expect("infallible").as_async(); + Ok(Self { + packet, + client, + dht, + min_sufficient_node_publish_count: settings.min_sufficient_node_publish_count, + retry_settings: settings.retry_settings, + }) + } + + /// Get the public key of the signer of the packet + fn get_public_key(&self) -> PublicKey { + self.packet.public_key() + } + + /// Exponential backoff delay starting with `INITIAL_DELAY_MS` and maxing out at `MAX_DELAY_MS` + fn get_retry_delay(&self, retry_count: u8) -> Duration { + let initial_ms = self.retry_settings.initial_retry_delay.as_millis() as u64; + let multiplicator = 2u64.pow(retry_count as u32); + let delay_ms = initial_ms * multiplicator; + let delay = Duration::from_millis(delay_ms); + delay.min(self.retry_settings.max_retry_delay) + } + + /// Republish a single public key. + pub async fn publish_once(&self) -> Result { + if let Err(e) = self.client.publish(&self.packet, None).await { + return Err(e.into()); + } + + // TODO: This counting could really be done with the put response in the mainline library already. It's not exposed though. + // This would really speed up the publishing and reduce the load on the DHT. 
+ // -- Sev April 2025 -- + let published_nodes_count = count_key_on_dht(&self.get_public_key(), &self.dht).await; + if published_nodes_count < self.min_sufficient_node_publish_count.get().into() { + return Err(PublishError::InsuffientlyPublished { + published_nodes_count, + }); + } + + Ok(PublishInfo::new(published_nodes_count, 1)) + } + + // Publishes the key with an exponential backoff + pub async fn publish(&self) -> Result { + let max_retries = self.retry_settings.max_retries.get(); + let mut last_error: Option = None; + for retry_count in 0..max_retries { + let human_retry_count = retry_count + 1; + match self.publish_once().await { + Ok(mut info) => { + info.attempts_needed = human_retry_count as usize; + return Ok(info); + } + Err(e) => { + tracing::debug!( + "{human_retry_count}/{max_retries} Failed to publish {}: {e}", + self.get_public_key() + ); + last_error = Some(e); + } + } + + let delay = self.get_retry_delay(retry_count); + tracing::debug!( + "{} {human_retry_count}/{max_retries} Sleep for {delay:?} before trying again.", + self.get_public_key() + ); + tokio::time::sleep(delay).await; + } + + Err(last_error.expect("infallible")) + } +} + +#[cfg(test)] +mod tests { + use std::{num::NonZeroU8, time::Duration}; + + use pkarr::{dns::Name, Keypair, PublicKey, SignedPacket}; + use pubky_testnet::Testnet; + + use crate::publisher::{PublishError, Publisher, PublisherSettings}; + + fn sample_packet() -> (PublicKey, SignedPacket) { + let key = Keypair::random(); + let packet = pkarr::SignedPacketBuilder::default() + .cname(Name::new("test").unwrap(), Name::new("test2").unwrap(), 600) + .build(&key) + .unwrap(); + (key.public_key(), packet) + } + + #[tokio::test] + async fn single_key_republish_success() { + let testnet = Testnet::run().await.unwrap(); + let pubky_client = testnet.client_builder().build().unwrap(); + let pkarr_client = pubky_client.pkarr().clone(); + let (_, packet) = sample_packet(); + + let required_nodes = 1; + let mut settings = 
PublisherSettings::default(); + settings + .pkarr_client(pkarr_client) + .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap()); + let publisher = Publisher::new_with_settings(packet, settings).unwrap(); + let res = publisher.publish_once().await; + assert!(res.is_ok()); + let success = res.unwrap(); + assert_eq!(success.published_nodes_count, 1); + } + + #[tokio::test] + async fn single_key_republish_insufficient() { + let testnet = Testnet::run().await.unwrap(); + let pubky_client = testnet.client_builder().build().unwrap(); + let pkarr_client = pubky_client.pkarr().clone(); + let (_, packet) = sample_packet(); + + let required_nodes = 2; + let mut settings = PublisherSettings::default(); + settings + .pkarr_client(pkarr_client) + .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap()); + let publisher = Publisher::new_with_settings(packet, settings).unwrap(); + let res = publisher.publish_once().await; + + assert!(res.is_err()); + let err = res.unwrap_err(); + assert!(err.is_insufficiently_published()); + if let PublishError::InsuffientlyPublished { + published_nodes_count, + } = err + { + assert_eq!(published_nodes_count, 1); + }; + } + + #[tokio::test] + async fn retry_delay() { + let testnet = Testnet::run().await.unwrap(); + let pubky_client = testnet.client_builder().build().unwrap(); + let pkarr_client = pubky_client.pkarr().clone(); + let (_, packet) = sample_packet(); + + let required_nodes = 1; + let mut settings = PublisherSettings::default(); + settings + .pkarr_client(pkarr_client) + .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap()); + settings + .retry_settings + .max_retries(NonZeroU8::new(10).unwrap()) + .initial_retry_delay(Duration::from_millis(100)) + .max_retry_delay(Duration::from_secs(10)); + let publisher = Publisher::new_with_settings(packet, settings).unwrap(); + + let first_delay = publisher.get_retry_delay(0); + assert_eq!(first_delay.as_millis(), 100); + let 
second_delay = publisher.get_retry_delay(1); + assert_eq!(second_delay.as_millis(), 200); + let third_delay = publisher.get_retry_delay(2); + assert_eq!(third_delay.as_millis(), 400); + let ninth_delay = publisher.get_retry_delay(9); + assert_eq!(ninth_delay.as_millis(), 10_000); + } +} diff --git a/pkarr-republisher/src/republisher.rs b/pkarr-republisher/src/republisher.rs new file mode 100644 index 0000000..be2e007 --- /dev/null +++ b/pkarr-republisher/src/republisher.rs @@ -0,0 +1,406 @@ +//! +//! Republishes a single public key with retries in case it fails. +//! +use pkarr::PublicKey; +use pkarr::SignedPacket; +use std::{num::NonZeroU8, sync::Arc, time::Duration}; + +use crate::{ + publisher::{PublishError, Publisher, PublisherSettings}, + RetrySettings, +}; + +#[derive(thiserror::Error, Debug, Clone)] +pub enum RepublishError { + #[error("The packet can't be resolved on the DHT and therefore can't be republished.")] + Missing, + #[error(transparent)] + PublishFailed(#[from] PublishError), +} + +impl RepublishError { + pub fn is_missing(&self) -> bool { + if let RepublishError::Missing = self { + return true; + } + false + } + + pub fn is_publish_failed(&self) -> bool { + if let RepublishError::PublishFailed { .. } = self { + return true; + } + false + } +} + +#[derive(Debug, Clone)] +pub struct RepublishInfo { + /// How many nodes the key got published on. + pub published_nodes_count: usize, + /// Number of publishing attempts needed to successfully republish. + pub attempts_needed: usize, + /// Whether the `republish_condition` was negative. 
+ pub condition_failed: bool, +} + +impl RepublishInfo { + pub fn new( + published_nodes_count: usize, + attempts_needed: usize, + should_republish_condition_failed: bool, + ) -> Self { + Self { + published_nodes_count, + attempts_needed, + condition_failed: should_republish_condition_failed, + } + } +} + +pub type RepublishCondition = dyn Fn(&SignedPacket) -> bool + Send + Sync; + +/// Settings for creating a republisher +#[derive(Clone)] +pub struct RepublisherSettings { + pub(crate) client: Option, + pub(crate) min_sufficient_node_publish_count: NonZeroU8, + pub(crate) retry_settings: RetrySettings, + pub(crate) republish_condition: Option>, +} + +impl std::fmt::Debug for RepublisherSettings { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RepublisherSettings") + .field("client", &self.client) + .field( + "min_sufficient_node_publish_count", + &self.min_sufficient_node_publish_count, + ) + .field("retry_settings", &self.retry_settings) + .finish_non_exhaustive() + } +} + +impl RepublisherSettings { + pub fn new() -> Self { + Self::default() + } + + /// Set a custom pkarr client + pub fn pkarr_client(&mut self, client: pkarr::Client) -> &mut Self { + self.client = Some(client); + self + } + + /// Set the minimum sufficient number of nodes a key needs to be stored in + /// to be considered a success + pub fn min_sufficient_node_publish_count(&mut self, count: NonZeroU8) -> &mut Self { + self.min_sufficient_node_publish_count = count; + self + } + + /// Set settings in relation to retries. 
+ pub fn retry_settings(&mut self, settings: RetrySettings) -> &mut Self { + self.retry_settings = settings; + self + } + + /// Set a closure that determines whether a packet should be republished + pub fn republish_condition(&mut self, f: F) -> &mut Self + where + F: Fn(&SignedPacket) -> bool + Send + Sync + 'static, + { + self.republish_condition = Some(Arc::new(f)); + self + } +} + +impl Default for RepublisherSettings { + fn default() -> Self { + Self { + client: None, + min_sufficient_node_publish_count: NonZeroU8::new(10).expect("Should always be > 0"), + retry_settings: RetrySettings::default(), + republish_condition: None, + } + } +} + +/// Tries to republish a single key. +/// Retries in case of errors with an exponential backoff. +pub struct Republisher { + pub public_key: PublicKey, + client: pkarr::Client, + min_sufficient_node_publish_count: NonZeroU8, + retry_settings: RetrySettings, + republish_condition: Arc bool + Send + Sync>, +} + +impl std::fmt::Debug for Republisher { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Republisher") + .field("public_key", &self.public_key) + .field("client", &self.client) + .field( + "min_sufficient_node_publish_count", + &self.min_sufficient_node_publish_count, + ) + .field("retry_settings", &self.retry_settings) + .finish_non_exhaustive() + } +} + +impl Republisher { + /// Creates a new Republisher; + pub fn new(public_key: PublicKey) -> Result { + let settings = RepublisherSettings::default(); + Self::new_with_settings(public_key, settings) + } + + pub fn new_with_settings( + public_key: PublicKey, + settings: RepublisherSettings, + ) -> Result { + let client = match &settings.client { + Some(c) => c.clone(), + None => pkarr::Client::builder().build()?, + }; + Ok(Republisher { + public_key, + client, + min_sufficient_node_publish_count: settings.min_sufficient_node_publish_count, + retry_settings: settings.retry_settings, + republish_condition: settings + 
.republish_condition + .unwrap_or_else(|| Arc::new(|_| true)), + }) + } + + /// Exponential backoff delay starting with `INITIAL_DELAY_MS` and maxing out at `MAX_DELAY_MS` + fn get_retry_delay(&self, retry_count: u8) -> Duration { + let initial_ms = self.retry_settings.initial_retry_delay.as_millis() as u64; + let multiplicator = 2u64.pow(retry_count as u32); + let delay_ms = initial_ms * multiplicator; + let delay = Duration::from_millis(delay_ms); + delay.min(self.retry_settings.max_retry_delay) + } + + /// Republish a single public key. + pub async fn republish_once(&self) -> Result { + let packet = self.client.resolve_most_recent(&self.public_key).await; + if packet.is_none() { + return Err(RepublishError::Missing); + } + let packet = packet.unwrap(); + + // Check if the packet should be republished + if !(self.republish_condition)(&packet) { + return Ok(RepublishInfo::new(0, 1, true)); + } + + let mut settings = PublisherSettings::default(); + settings + .pkarr_client(self.client.clone()) + .min_sufficient_node_publish_count(self.min_sufficient_node_publish_count); + let publisher = Publisher::new_with_settings(packet, settings) + .expect("infallible because pkarr client provided"); + match publisher.publish_once().await { + Ok(info) => Ok(RepublishInfo::new(info.published_nodes_count, 1, false)), + Err(e) => Err(e.into()), + } + } + + // Republishes the key with an exponential backoff + pub async fn republish(&self) -> Result { + let max_retries = self.retry_settings.max_retries.get(); + let mut retry_count = 0; + let mut last_error: Option = None; + while retry_count < max_retries { + match self.republish_once().await { + Ok(mut success) => { + success.attempts_needed = retry_count as usize + 1; + return Ok(success); + } + Err(e) => { + tracing::debug!( + "{retry_count}/{max_retries} Failed to publish {}: {e}", + self.public_key + ); + last_error = Some(e); + } + } + + let delay = self.get_retry_delay(retry_count); + retry_count += 1; + tracing::debug!( + 
"{} {retry_count}/{max_retries} Sleep for {delay:?} before trying again.", + self.public_key + ); + tokio::time::sleep(delay).await; + } + + Err(last_error.expect("infallible")) + } +} + +#[cfg(test)] +mod tests { + use std::{num::NonZeroU8, time::Duration}; + + use crate::republisher::{Republisher, RepublisherSettings}; + use pkarr::{dns::Name, Keypair, PublicKey}; + use pubky_testnet::Testnet; + + async fn publish_sample_packets(client: &pkarr::Client) -> PublicKey { + let key = Keypair::random(); + + let packet = pkarr::SignedPacketBuilder::default() + .cname(Name::new("test").unwrap(), Name::new("test2").unwrap(), 600) + .build(&key) + .unwrap(); + client + .publish(&packet, None) + .await + .expect("to be published"); + + key.public_key() + } + + #[tokio::test] + async fn single_key_republish_success() { + let testnet = Testnet::run().await.unwrap(); + let pubky_client = testnet.client_builder().build().unwrap(); + let pkarr_client = pubky_client.pkarr().clone(); + let public_key = publish_sample_packets(&pkarr_client).await; + + let required_nodes = 1; + let mut settings = RepublisherSettings::default(); + settings + .pkarr_client(pkarr_client) + .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap()); + let publisher = Republisher::new_with_settings(public_key, settings).unwrap(); + let res = publisher.republish_once().await; + assert!(res.is_ok()); + let success = res.unwrap(); + assert_eq!(success.published_nodes_count, 1); + } + + #[tokio::test] + async fn single_key_republish_missing() { + let testnet = Testnet::run().await.unwrap(); + let pubky_client = testnet.client_builder().build().unwrap(); + let pkarr_client = pubky_client.pkarr().clone(); + let public_key = Keypair::random().public_key(); + + let required_nodes = 1; + let mut settings = RepublisherSettings::default(); + settings + .pkarr_client(pkarr_client) + .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap()); + let publisher = 
Republisher::new_with_settings(public_key, settings).unwrap(); + let res = publisher.republish_once().await; + + assert!(res.is_err()); + let err = res.unwrap_err(); + assert!(err.is_missing()); + } + + #[tokio::test] + async fn retry_delay() { + let testnet = Testnet::run().await.unwrap(); + let pubky_client = testnet.client_builder().build().unwrap(); + let pkarr_client = pubky_client.pkarr().clone(); + let public_key = Keypair::random().public_key(); + + let required_nodes = 1; + let mut settings = RepublisherSettings::default(); + settings + .pkarr_client(pkarr_client) + .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap()); + settings + .retry_settings + .max_retries(NonZeroU8::new(10).unwrap()) + .initial_retry_delay(Duration::from_millis(100)) + .max_retry_delay(Duration::from_secs(10)); + let publisher = Republisher::new_with_settings(public_key, settings).unwrap(); + + let first_delay = publisher.get_retry_delay(0); + assert_eq!(first_delay.as_millis(), 100); + let second_delay = publisher.get_retry_delay(1); + assert_eq!(second_delay.as_millis(), 200); + let third_delay = publisher.get_retry_delay(2); + assert_eq!(third_delay.as_millis(), 400); + let ninth_delay = publisher.get_retry_delay(9); + assert_eq!(ninth_delay.as_millis(), 10_000); + } + + #[tokio::test] + async fn republish_retry_missing() { + let testnet = Testnet::run().await.unwrap(); + let pubky_client = testnet.client_builder().build().unwrap(); + let pkarr_client = pubky_client.pkarr().clone(); + let public_key = Keypair::random().public_key(); + + let required_nodes = 1; + let mut settings = RepublisherSettings::default(); + settings + .pkarr_client(pkarr_client) + .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap()); + settings + .retry_settings + .max_retries(NonZeroU8::new(3).unwrap()) + .initial_retry_delay(Duration::from_millis(100)); + let publisher = Republisher::new_with_settings(public_key, settings).unwrap(); + let res = 
publisher.republish().await;
+
+        assert!(res.is_err());
+        assert!(res.unwrap_err().is_missing());
+    }
+
+    #[tokio::test]
+    async fn republish_with_condition_fail() {
+        let testnet = Testnet::run().await.unwrap();
+        let pubky_client = testnet.client_builder().build().unwrap();
+        let pkarr_client = pubky_client.pkarr().clone();
+        let public_key = publish_sample_packets(&pkarr_client).await;
+
+        let required_nodes = 1;
+        let mut settings = RepublisherSettings::default();
+        settings
+            .pkarr_client(pkarr_client.clone())
+            .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap())
+            // Condition always rejects, so the packet must not be republished.
+            .republish_condition(|_| false);
+
+        let publisher = Republisher::new_with_settings(public_key.clone(), settings).unwrap();
+        let res = publisher.republish_once().await;
+        assert!(res.is_ok());
+        let info = res.unwrap();
+        assert_eq!(info.published_nodes_count, 0);
+        assert_eq!(info.condition_failed, true);
+    }
+
+    #[tokio::test]
+    async fn republish_with_condition_success() {
+        let testnet = Testnet::run().await.unwrap();
+        let pubky_client = testnet.client_builder().build().unwrap();
+        let pkarr_client = pubky_client.pkarr().clone();
+        let public_key = publish_sample_packets(&pkarr_client).await;
+
+        let required_nodes = 1;
+        let mut settings = RepublisherSettings::default();
+        settings
+            .pkarr_client(pkarr_client.clone())
+            .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap())
+            // Condition always accepts, so the packet must be republished.
+            .republish_condition(|_| true);
+
+        let publisher = Republisher::new_with_settings(public_key.clone(), settings).unwrap();
+        let res = publisher.republish_once().await;
+        assert!(res.is_ok());
+        let info = res.unwrap();
+        assert_eq!(info.published_nodes_count, 1);
+        assert_eq!(info.condition_failed, false);
+    }
+}
diff --git a/pkarr-republisher/src/resilient_client.rs b/pkarr-republisher/src/resilient_client.rs
new file mode 100644
index 0000000..c48eda2
--- /dev/null
+++ b/pkarr-republisher/src/resilient_client.rs
@@ -0,0 +1,91 @@
+use std::num::NonZeroU8;
+
+use pkarr::{mainline::async_dht::AsyncDht, PublicKey, SignedPacket};
+
+use crate::{
+    count_key_on_dht, PublishError, PublishInfo, Publisher, PublisherSettings, RepublishError,
+    RepublishInfo, Republisher, RepublisherSettings, RetrySettings,
+};
+
+#[derive(Debug, thiserror::Error)]
+pub enum ResilientClientBuilderError {
+    #[error("pkarr client was built without DHT and is only using relays. This is not supported.")]
+    DhtNotEnabled,
+    #[error(transparent)]
+    BuildError(#[from] pkarr::errors::BuildError),
+}
+
+/// Simple pkarr client that focuses on resilience
+/// and verification compared to the regular client that
+/// might experience unreliability due to the underlying UDP connection.
+///
+/// This client requires a pkarr client that was built with the `dht` feature.
+/// Relays only are not supported.
+#[derive(Debug, Clone)]
+pub struct ResilientClient {
+    client: pkarr::Client,
+    dht: AsyncDht,
+    retry_settings: RetrySettings,
+}
+
+impl ResilientClient {
+    pub fn new() -> Result<Self, ResilientClientBuilderError> {
+        let client = pkarr::Client::builder().build()?;
+        Self::new_with_client(client, RetrySettings::default())
+    }
+
+    pub fn new_with_client(
+        client: pkarr::Client,
+        retry_settings: RetrySettings,
+    ) -> Result<Self, ResilientClientBuilderError> {
+        let dht = client.dht();
+        if dht.is_none() {
+            return Err(ResilientClientBuilderError::DhtNotEnabled);
+        }
+        let dht = dht.unwrap().as_async();
+        Ok(Self {
+            client,
+            dht,
+            retry_settings,
+        })
+    }
+
+    /// Publishes a pkarr packet with retries. Verifies it's been stored correctly.
+ pub async fn publish( + &self, + packet: SignedPacket, + min_sufficient_node_publish_count: Option, + ) -> Result { + let mut settings = PublisherSettings::default(); + settings.pkarr_client(self.client.clone()); + settings.retry_settings(self.retry_settings.clone()); + if let Some(count) = min_sufficient_node_publish_count { + settings.min_sufficient_node_publish_count = count; + }; + let publisher = Publisher::new_with_settings(packet, settings) + .expect("infallible because pkarr client provided."); + publisher.publish().await + } + + /// Republishes a pkarr packet with retries. Verifies it's been stored correctly. + pub async fn republish( + &self, + public_key: PublicKey, + min_sufficient_node_publish_count: Option, + ) -> Result { + let mut settings = RepublisherSettings::default(); + settings.pkarr_client(self.client.clone()); + if let Some(count) = min_sufficient_node_publish_count { + settings.min_sufficient_node_publish_count = count; + }; + settings.retry_settings(self.retry_settings.clone()); + let publisher = Republisher::new_with_settings(public_key, settings) + .expect("infallible because pkarr client provided."); + publisher.republish().await + } + + /// Counts the number of nodes the public key has been stored on. + pub async fn verify_node_count(&self, public_key: &PublicKey) -> usize { + count_key_on_dht(public_key, &self.dht).await + } +} diff --git a/pkarr-republisher/src/verify.rs b/pkarr-republisher/src/verify.rs new file mode 100644 index 0000000..a37f878 --- /dev/null +++ b/pkarr-republisher/src/verify.rs @@ -0,0 +1,12 @@ +use futures_lite::StreamExt; +use pkarr::{mainline::async_dht::AsyncDht, PublicKey}; + +/// Verifies the number of nodes that store the public key. 
+pub async fn count_key_on_dht(public_key: &PublicKey, dht: &AsyncDht) -> usize { + let mut response_count = 0; + let mut stream = dht.get_mutable(public_key.as_bytes(), None, None); + while (stream.next().await).is_some() { + response_count += 1; + } + response_count +} diff --git a/pubky-homeserver/Cargo.toml b/pubky-homeserver/Cargo.toml index 7ed485a..01b654c 100644 --- a/pubky-homeserver/Cargo.toml +++ b/pubky-homeserver/Cargo.toml @@ -45,3 +45,6 @@ url = "2.5.4" axum-server = { version = "0.7.1", features = ["tls-rustls-no-provider"] } tower = "0.5.2" page_size = "0.6.0" +pkarr-republisher = { path = "../pkarr-republisher" } +thiserror = "2.0.12" + diff --git a/pubky-homeserver/src/config.example.toml b/pubky-homeserver/src/config.example.toml index 069bd95..9678ccd 100644 --- a/pubky-homeserver/src/config.example.toml +++ b/pubky-homeserver/src/config.example.toml @@ -1,6 +1,9 @@ # Secret key (in hex) to generate the Homeserver's Keypair # secret_key = "0000000000000000000000000000000000000000000000000000000000000000" +# The interval at which user keys are republished to the DHT. +user_keys_republisher_interval = 14400 # 4 hour in seconds + [admin] # Set an admin password to protect admin endpoints. # If no password is set, the admin endpoints will not be accessible. 
diff --git a/pubky-homeserver/src/config.rs b/pubky-homeserver/src/config.rs index c526100..ec43c06 100644 --- a/pubky-homeserver/src/config.rs +++ b/pubky-homeserver/src/config.rs @@ -15,6 +15,8 @@ use crate::{ io::IoConfig, }; +pub const DEFAULT_REPUBLISHER_INTERVAL: u64 = 4 * 60 * 60; // 4 hours in seconds + // === Core == pub const DEFAULT_STORAGE_DIR: &str = "pubky"; pub const DEFAULT_MAP_SIZE: usize = 10995116277760; // 10TB (not = disk-space used) @@ -129,7 +131,6 @@ impl Config { bootstrap, http_port: 0, https_port: 0, - ..Default::default() }, core: CoreConfig::test(), diff --git a/pubky-homeserver/src/core/database/db.rs b/pubky-homeserver/src/core/database/db.rs new file mode 100644 index 0000000..1c19fd4 --- /dev/null +++ b/pubky-homeserver/src/core/database/db.rs @@ -0,0 +1,74 @@ +use super::tables::{Tables, TABLES_COUNT}; +/// Protecting fields from being mutated by modules in crate::database +use crate::core::CoreConfig; +use heed::{Env, EnvOpenOptions}; +use std::{fs, path::PathBuf}; + +use super::migrations; + +#[derive(Debug, Clone)] +pub struct DB { + pub(crate) env: Env, + pub(crate) tables: Tables, + pub(crate) buffers_dir: PathBuf, + pub(crate) max_chunk_size: usize, + config: CoreConfig, +} + +impl DB { + /// # Safety + /// DB uses LMDB, [opening][heed::EnvOpenOptions::open] which is marked unsafe, + /// because the possible Undefined Behavior (UB) if the lock file is broken. + pub unsafe fn open(config: CoreConfig) -> anyhow::Result { + let buffers_dir = config.storage.clone().join("buffers"); + + // Cleanup buffers. + let _ = fs::remove_dir(&buffers_dir); + fs::create_dir_all(&buffers_dir)?; + + let env = unsafe { + EnvOpenOptions::new() + .max_dbs(TABLES_COUNT) + .map_size(config.db_map_size) + .open(&config.storage) + }?; + + let tables = migrations::run(&env)?; + + let db = DB { + env, + tables, + config, + buffers_dir, + max_chunk_size: max_chunk_size(), + }; + + Ok(db) + } + + // Create an ephemeral database for testing purposes. 
+ pub fn test() -> DB { + unsafe { DB::open(CoreConfig::test()).unwrap() } + } + + // === Getters === + + pub fn config(&self) -> &CoreConfig { + &self.config + } +} + +/// calculate optimal chunk size: +/// - +/// - +fn max_chunk_size() -> usize { + let page_size = page_size::get(); + + // - 16 bytes Header per page (LMDB) + // - Each page has to contain 2 records + // - 8 bytes per record (LMDB) (empirically, it seems to be 10 not 8) + // - 12 bytes key: + // - timestamp : 8 bytes + // - chunk index: 4 bytes + ((page_size - 16) / 2) - (8 + 2) - 12 +} diff --git a/pubky-homeserver/src/core/database/mod.rs b/pubky-homeserver/src/core/database/mod.rs index 7242bf0..d718151 100644 --- a/pubky-homeserver/src/core/database/mod.rs +++ b/pubky-homeserver/src/core/database/mod.rs @@ -1,87 +1,5 @@ //! Internal database in [super::HomeserverCore] - -use std::{fs, path::PathBuf}; - -use heed::{Env, EnvOpenOptions}; - +mod db; mod migrations; pub mod tables; - -use tables::{Tables, TABLES_COUNT}; - -pub use protected::DB; - -/// Protecting fields from being mutated by modules in crate::database -mod protected { - - use crate::core::CoreConfig; - - use super::*; - - #[derive(Debug, Clone)] - pub struct DB { - pub(crate) env: Env, - pub(crate) tables: Tables, - pub(crate) buffers_dir: PathBuf, - pub(crate) max_chunk_size: usize, - config: CoreConfig, - } - - impl DB { - /// # Safety - /// DB uses LMDB, [opening][heed::EnvOpenOptions::open] which is marked unsafe, - /// because the possible Undefined Behavior (UB) if the lock file is broken. - pub unsafe fn open(config: CoreConfig) -> anyhow::Result { - let buffers_dir = config.storage.clone().join("buffers"); - - // Cleanup buffers. 
- let _ = fs::remove_dir(&buffers_dir); - fs::create_dir_all(&buffers_dir)?; - - let env = unsafe { - EnvOpenOptions::new() - .max_dbs(TABLES_COUNT) - .map_size(config.db_map_size) - .open(&config.storage) - }?; - - let tables = migrations::run(&env)?; - - let db = DB { - env, - tables, - config, - buffers_dir, - max_chunk_size: max_chunk_size(), - }; - - Ok(db) - } - - // Create an ephemeral database for testing purposes. - pub fn test() -> DB { - unsafe { DB::open(CoreConfig::test()).unwrap() } - } - - // === Getters === - - pub fn config(&self) -> &CoreConfig { - &self.config - } - } -} - -/// calculate optimal chunk size: -/// - -/// - -fn max_chunk_size() -> usize { - let page_size = page_size::get(); - - // - 16 bytes Header per page (LMDB) - // - Each page has to contain 2 records - // - 8 bytes per record (LMDB) (empirically, it seems to be 10 not 8) - // - 12 bytes key: - // - timestamp : 8 bytes - // - chunk index: 4 bytes - ((page_size - 16) / 2) - (8 + 2) - 12 -} +pub use db::DB; diff --git a/pubky-homeserver/src/core/database/tables/users.rs b/pubky-homeserver/src/core/database/tables/users.rs index 6834c84..94b7f15 100644 --- a/pubky-homeserver/src/core/database/tables/users.rs +++ b/pubky-homeserver/src/core/database/tables/users.rs @@ -4,7 +4,7 @@ use postcard::{from_bytes, to_allocvec}; use serde::{Deserialize, Serialize}; use heed::{BoxedError, BytesDecode, BytesEncode, Database}; -use pkarr::PublicKey; +use pkarr::{PublicKey, Timestamp}; extern crate alloc; @@ -19,6 +19,15 @@ pub struct User { pub created_at: u64, } +impl User { + #[allow(dead_code)] + pub fn new() -> Self { + Self { + created_at: Timestamp::now().as_u64(), + } + } +} + impl BytesEncode<'_> for User { type EItem = Self; diff --git a/pubky-homeserver/src/core/homeserver_core.rs b/pubky-homeserver/src/core/homeserver_core.rs new file mode 100644 index 0000000..9d1fb7a --- /dev/null +++ b/pubky-homeserver/src/core/homeserver_core.rs @@ -0,0 +1,224 @@ +use std::{path::PathBuf, 
time::Duration}; + +use crate::core::user_keys_republisher::UserKeysRepublisher; +use anyhow::Result; +use axum::Router; +use pubky_common::auth::AuthVerifier; +use tokio::time::sleep; + +use crate::config::{ + DEFAULT_LIST_LIMIT, DEFAULT_MAP_SIZE, DEFAULT_MAX_LIST_LIMIT, DEFAULT_REPUBLISHER_INTERVAL, + DEFAULT_STORAGE_DIR, +}; + +use crate::core::database::DB; + +#[derive(Clone, Debug)] +pub(crate) struct AppState { + pub(crate) verifier: AuthVerifier, + pub(crate) db: DB, + pub(crate) admin: AdminConfig, +} + +const INITIAL_DELAY_BEFORE_REPUBLISH: Duration = Duration::from_secs(60); + +#[derive(Debug, Clone)] +/// A side-effect-free Core of the [crate::Homeserver]. +pub struct HomeserverCore { + pub(crate) router: Router, + pub(crate) user_keys_republisher: UserKeysRepublisher, +} + +impl HomeserverCore { + /// Create a side-effect-free Homeserver core. + /// + /// # Safety + /// HomeserverCore uses LMDB, [opening][heed::EnvOpenOptions::open] which is marked unsafe, + /// because the possible Undefined Behavior (UB) if the lock file is broken. + pub unsafe fn new(config: CoreConfig, admin: AdminConfig) -> Result { + let db = unsafe { DB::open(config.clone())? }; + + let state = AppState { + verifier: AuthVerifier::default(), + db: db.clone(), + admin, + }; + + let router = super::routes::create_app(state.clone()); + + let user_keys_republisher = UserKeysRepublisher::new( + db.clone(), + config + .user_keys_republisher_interval + .unwrap_or(Duration::from_secs(DEFAULT_REPUBLISHER_INTERVAL)), + ); + + let user_keys_republisher_clone = user_keys_republisher.clone(); + if config.is_user_keys_republisher_enabled() { + // Delayed start of the republisher to give time for the homeserver to start. + tokio::spawn(async move { + sleep(INITIAL_DELAY_BEFORE_REPUBLISH).await; + user_keys_republisher_clone.run().await; + }); + } + Ok(Self { + router, + user_keys_republisher, + }) + } + + /// Stop the home server background tasks. 
+ #[allow(dead_code)] + pub async fn stop(&mut self) { + self.user_keys_republisher.stop().await; + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub enum SignupMode { + Open, + #[default] + TokenRequired, +} + +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct AdminConfig { + /// The password used to authorize admin endpoints. + pub password: Option, + /// Determines whether new signups require a valid token. + pub signup_mode: SignupMode, +} + +impl AdminConfig { + pub fn test() -> Self { + AdminConfig { + password: Some("admin".to_string()), + signup_mode: SignupMode::Open, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +/// Database configurations +pub struct CoreConfig { + /// Path to the storage directory. + /// + /// Defaults to a directory in the OS data directory + pub storage: PathBuf, + pub db_map_size: usize, + + /// The default limit of a list api if no `limit` query parameter is provided. + /// + /// Defaults to `100` + pub default_list_limit: u16, + /// The maximum limit of a list api, even if a `limit` query parameter is provided. + /// + /// Defaults to `1000` + pub max_list_limit: u16, + + /// The interval at which the user keys republisher runs. None is disabled. 
+ /// + /// Defaults to `60*60*4` (4 hours) + pub user_keys_republisher_interval: Option, +} + +impl Default for CoreConfig { + fn default() -> Self { + Self { + storage: storage(None) + .expect("operating environment provides no directory for application data"), + db_map_size: DEFAULT_MAP_SIZE, + + default_list_limit: DEFAULT_LIST_LIMIT, + max_list_limit: DEFAULT_MAX_LIST_LIMIT, + + user_keys_republisher_interval: Some(Duration::from_secs(60 * 60 * 4)), + } + } +} + +impl CoreConfig { + pub fn test() -> Self { + let storage = std::env::temp_dir() + .join(pubky_common::timestamp::Timestamp::now().to_string()) + .join(DEFAULT_STORAGE_DIR); + + Self { + storage, + db_map_size: 10485760, + + ..Default::default() + } + } + + pub fn is_user_keys_republisher_enabled(&self) -> bool { + self.user_keys_republisher_interval.is_some() + } +} + +pub fn storage(storage: Option) -> Result { + let dir = if let Some(storage) = storage { + PathBuf::from(storage) + } else { + let path = dirs_next::data_dir().ok_or_else(|| { + anyhow::anyhow!("operating environment provides no directory for application data") + })?; + path.join(DEFAULT_STORAGE_DIR) + }; + + Ok(dir.join("homeserver")) +} + +#[cfg(test)] +mod tests { + + use anyhow::Result; + use axum::{ + body::Body, + extract::Request, + http::{header, Method}, + response::Response, + }; + use pkarr::Keypair; + use pubky_common::{auth::AuthToken, capabilities::Capability}; + use tower::ServiceExt; + + use super::*; + + impl HomeserverCore { + /// Test version of [HomeserverCore::new], using an ephemeral small storage. 
+ pub fn test() -> Result { + unsafe { HomeserverCore::new(CoreConfig::test(), AdminConfig::test()) } + } + + // === Public Methods === + + pub async fn create_root_user(&mut self, keypair: &Keypair) -> Result { + let auth_token = AuthToken::sign(keypair, vec![Capability::root()]); + + let response = self + .call( + Request::builder() + .uri("/signup") + .header("host", keypair.public_key().to_string()) + .method(Method::POST) + .body(Body::from(auth_token.serialize())) + .unwrap(), + ) + .await?; + + let header_value = response + .headers() + .get(header::SET_COOKIE) + .and_then(|h| h.to_str().ok()) + .expect("should return a set-cookie header") + .to_string(); + + Ok(header_value) + } + + pub async fn call(&self, request: Request) -> Result { + Ok(self.router.clone().oneshot(request).await?) + } + } +} diff --git a/pubky-homeserver/src/core/mod.rs b/pubky-homeserver/src/core/mod.rs index dafbfa9..619ca61 100644 --- a/pubky-homeserver/src/core/mod.rs +++ b/pubky-homeserver/src/core/mod.rs @@ -1,189 +1,9 @@ -use std::path::PathBuf; - -use anyhow::Result; -use axum::Router; -use pubky_common::auth::AuthVerifier; - pub mod database; mod error; mod extractors; +mod homeserver_core; mod layers; mod routes; +mod user_keys_republisher; -use crate::config::{ - DEFAULT_LIST_LIMIT, DEFAULT_MAP_SIZE, DEFAULT_MAX_LIST_LIMIT, DEFAULT_STORAGE_DIR, -}; - -use database::DB; - -#[derive(Clone, Debug)] -pub(crate) struct AppState { - pub(crate) verifier: AuthVerifier, - pub(crate) db: DB, - pub(crate) admin: AdminConfig, -} - -#[derive(Debug, Clone)] -/// A side-effect-free Core of the [crate::Homeserver]. -pub struct HomeserverCore { - pub(crate) router: Router, -} - -impl HomeserverCore { - /// Create a side-effect-free Homeserver core. - /// - /// # Safety - /// HomeserverCore uses LMDB, [opening][heed::EnvOpenOptions::open] which is marked unsafe, - /// because the possible Undefined Behavior (UB) if the lock file is broken. 
- pub unsafe fn new(config: CoreConfig, admin: AdminConfig) -> Result { - let db = unsafe { DB::open(config.clone())? }; - - let state = AppState { - verifier: AuthVerifier::default(), - db, - admin, - }; - - let router = routes::create_app(state.clone()); - - Ok(Self { router }) - } -} - -#[cfg(test)] -mod tests { - - use anyhow::Result; - use axum::{ - body::Body, - extract::Request, - http::{header, Method}, - response::Response, - }; - use pkarr::Keypair; - use pubky_common::{auth::AuthToken, capabilities::Capability}; - use tower::ServiceExt; - - use super::*; - - impl HomeserverCore { - /// Test version of [HomeserverCore::new], using an ephemeral small storage. - pub fn test() -> Result { - unsafe { HomeserverCore::new(CoreConfig::test(), AdminConfig::test()) } - } - - // === Public Methods === - - pub async fn create_root_user(&mut self, keypair: &Keypair) -> Result { - let auth_token = AuthToken::sign(keypair, vec![Capability::root()]); - - let response = self - .call( - Request::builder() - .uri("/signup") - .header("host", keypair.public_key().to_string()) - .method(Method::POST) - .body(Body::from(auth_token.serialize())) - .unwrap(), - ) - .await?; - - let header_value = response - .headers() - .get(header::SET_COOKIE) - .and_then(|h| h.to_str().ok()) - .expect("should return a set-cookie header") - .to_string(); - - Ok(header_value) - } - - pub async fn call(&self, request: Request) -> Result { - Ok(self.router.clone().oneshot(request).await?) - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Default)] -pub enum SignupMode { - Open, - #[default] - TokenRequired, -} - -#[derive(Debug, Clone, PartialEq, Eq, Default)] -pub struct AdminConfig { - /// The password used to authorize admin endpoints. - pub password: Option, - /// Determines whether new signups require a valid token. 
- pub signup_mode: SignupMode, -} - -impl AdminConfig { - pub fn test() -> Self { - AdminConfig { - password: Some("admin".to_string()), - signup_mode: SignupMode::Open, - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq)] -/// Database configurations -pub struct CoreConfig { - /// Path to the storage directory. - /// - /// Defaults to a directory in the OS data directory - pub storage: PathBuf, - pub db_map_size: usize, - - /// The default limit of a list api if no `limit` query parameter is provided. - /// - /// Defaults to `100` - pub default_list_limit: u16, - /// The maximum limit of a list api, even if a `limit` query parameter is provided. - /// - /// Defaults to `1000` - pub max_list_limit: u16, -} - -impl Default for CoreConfig { - fn default() -> Self { - Self { - storage: storage(None) - .expect("operating environment provides no directory for application data"), - db_map_size: DEFAULT_MAP_SIZE, - - default_list_limit: DEFAULT_LIST_LIMIT, - max_list_limit: DEFAULT_MAX_LIST_LIMIT, - } - } -} - -impl CoreConfig { - pub fn test() -> Self { - let storage = std::env::temp_dir() - .join(pubky_common::timestamp::Timestamp::now().to_string()) - .join(DEFAULT_STORAGE_DIR); - - Self { - storage, - db_map_size: 10485760, - - ..Default::default() - } - } -} - -pub fn storage(storage: Option) -> Result { - let dir = if let Some(storage) = storage { - PathBuf::from(storage) - } else { - let path = dirs_next::data_dir().ok_or_else(|| { - anyhow::anyhow!("operating environment provides no directory for application data") - })?; - path.join(DEFAULT_STORAGE_DIR) - }; - - Ok(dir.join("homeserver")) -} +pub use homeserver_core::*; diff --git a/pubky-homeserver/src/core/user_keys_republisher.rs b/pubky-homeserver/src/core/user_keys_republisher.rs new file mode 100644 index 0000000..f97371d --- /dev/null +++ b/pubky-homeserver/src/core/user_keys_republisher.rs @@ -0,0 +1,218 @@ +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + 
time::Duration, +}; + +use pkarr::PublicKey; +use pkarr_republisher::{ + MultiRepublishResult, MultiRepublisher, RepublisherSettings, ResilientClientBuilderError, +}; +use tokio::{ + sync::RwLock, + task::JoinHandle, + time::{interval, Instant}, +}; + +use crate::core::database::DB; + +#[derive(Debug, thiserror::Error)] +pub enum UserKeysRepublisherError { + #[error(transparent)] + DB(heed::Error), + #[error(transparent)] + Pkarr(ResilientClientBuilderError), +} + +/// Publishes the pkarr keys of all users to the Mainline DHT. +#[derive(Debug, Clone)] +pub struct UserKeysRepublisher { + db: DB, + handle: Arc>>>, + is_running: Arc, + republish_interval: Duration, +} + +impl UserKeysRepublisher { + pub fn new(db: DB, republish_interval: Duration) -> Self { + Self { + db, + handle: Arc::new(RwLock::new(None)), + is_running: Arc::new(AtomicBool::new(false)), + republish_interval, + } + } + + /// Run the user keys republisher. + pub async fn run(&self) { + tracing::info!( + "Initialize user keys republisher with interval {:?}", + self.republish_interval + ); + let mut lock = self.handle.write().await; + if lock.is_some() { + return; + } + let db = self.db.clone(); + let republish_interval = self.republish_interval; + let handle: JoinHandle<()> = + tokio::spawn(async move { Self::run_loop(db, republish_interval).await }); + + *lock = Some(handle); + self.is_running.store(true, Ordering::Relaxed); + } + + // Get all user public keys from the database. + async fn get_all_user_keys(db: DB) -> Result, heed::Error> { + let rtxn = db.env.read_txn()?; + let users = db.tables.users.iter(&rtxn)?; + + let keys: Vec = users + .map(|result| result.map(|val| val.0)) + .filter_map(Result::ok) // Errors: Db corruption or out of memory. For this use case, we just ignore it. + .collect(); + Ok(keys) + } + + /// Republishes all user pkarr keys to the Mainline DHT once. + /// + /// # Errors + /// + /// - If the database cannot be read, an error is returned. 
+ /// - If the pkarr keys cannot be republished, an error is returned. + async fn republish_keys_once(db: DB) -> Result { + let keys = Self::get_all_user_keys(db) + .await + .map_err(UserKeysRepublisherError::DB)?; + if keys.is_empty() { + tracing::info!("No user keys to republish."); + return Ok(MultiRepublishResult::new(HashMap::new())); + } + let mut settings = RepublisherSettings::default(); + settings.republish_condition(|_| true); + let republisher = MultiRepublisher::new_with_settings(settings, None); + // TODO: Only publish if user points to this home server. + let results = republisher + .run(keys, 12) + .await + .map_err(UserKeysRepublisherError::Pkarr)?; + Ok(results) + } + + /// Internal run loop that publishes all user pkarr keys to the Mainline DHT continuously. + async fn run_loop(db: DB, republish_interval: Duration) { + let mut interval = interval(republish_interval); + loop { + interval.tick().await; + let start = Instant::now(); + tracing::info!("Republishing user keys..."); + let result = match Self::republish_keys_once(db.clone()).await { + Ok(result) => result, + Err(e) => { + tracing::error!("Error republishing user keys: {:?}", e); + continue; + } + }; + let elapsed = start.elapsed(); + if result.is_empty() { + continue; + } + if result.missing().is_empty() { + tracing::info!( + "Republished {} user keys within {:.1}s. {} success, {} missing, {} failed.", + result.len(), + elapsed.as_secs_f32(), + result.success().len(), + result.missing().len(), + result.publishing_failed().len() + ); + } else { + tracing::warn!( + "Republished {} user keys within {:.1}s. {} success, {} missing, {} failed.", + result.len(), + elapsed.as_secs_f32(), + result.success().len(), + result.missing().len(), + result.publishing_failed().len() + ); + } + } + } + + /// Stop the user keys republisher. 
+ #[allow(dead_code)] + pub async fn stop(&mut self) { + let mut lock = self.handle.write().await; + + if let Some(handle) = lock.take() { + handle.abort(); + *lock = None; + self.is_running.store(false, Ordering::Relaxed); + } + } + + /// Stops the republisher synchronously. + #[allow(dead_code)] + pub fn stop_sync(&mut self) { + let mut lock = self.handle.blocking_write(); + + if let Some(handle) = lock.take() { + handle.abort(); + *lock = None; + self.is_running.store(false, Ordering::Relaxed); + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use pkarr::Keypair; + use tokio::time::Instant; + + use crate::core::{ + database::{tables::users::User, DB}, + user_keys_republisher::UserKeysRepublisher, + }; + + async fn init_db_with_users(count: usize) -> DB { + let db = DB::test(); + let mut wtxn = db.env.write_txn().unwrap(); + for _ in 0..count { + let user = User::new(); + let public_key = Keypair::random().public_key(); + db.tables.users.put(&mut wtxn, &public_key, &user).unwrap(); + } + wtxn.commit().unwrap(); + db + } + + /// Test that the republisher tries to republish all keys passed. + #[tokio::test] + async fn test_republish_keys_once() { + let db = init_db_with_users(10).await; + let result = UserKeysRepublisher::republish_keys_once(db).await.unwrap(); + assert_eq!(result.len(), 10); + assert_eq!(result.success().len(), 0); + assert_eq!(result.missing().len(), 10); + assert_eq!(result.publishing_failed().len(), 0); + } + + /// Test that the republisher stops instantly. 
+ #[tokio::test] + async fn start_and_stop() { + let mut republisher = + UserKeysRepublisher::new(init_db_with_users(1000).await, Duration::from_secs(1)); + let start = Instant::now(); + republisher.run().await; + assert!(republisher.handle.read().await.is_some()); + republisher.stop().await; + let elapsed = start.elapsed(); + assert!(elapsed < Duration::from_secs(1)); + assert!(republisher.handle.read().await.is_none()); + } +} diff --git a/pubky-homeserver/src/io/homeserver_key_republisher.rs b/pubky-homeserver/src/io/homeserver_key_republisher.rs new file mode 100644 index 0000000..94d47ab --- /dev/null +++ b/pubky-homeserver/src/io/homeserver_key_republisher.rs @@ -0,0 +1,171 @@ +//! Pkarr related task + +use anyhow::Result; +use pkarr::errors::PublishError; +use pkarr::{dns::rdata::SVCB, Keypair, SignedPacket}; + +use tokio::sync::Mutex; +use tokio::task::JoinHandle; +use tokio::time::{interval, Duration}; + +use super::IoConfig; + +/// Republishes the homeserver's pkarr packet to the DHT every hour. 
+#[derive(Debug)] +pub struct HomeserverKeyRepublisher { + client: pkarr::Client, + signed_packet: SignedPacket, + republish_task: Mutex>>, +} + +impl HomeserverKeyRepublisher { + pub fn new( + keypair: &Keypair, + config: &IoConfig, + https_port: u16, + http_port: u16, + ) -> Result { + let mut builder = pkarr::Client::builder(); + + if let Some(bootstrap) = &config.bootstrap { + builder.bootstrap(bootstrap); + } + + if let Some(request_timeout) = config.dht_request_timeout { + builder.request_timeout(request_timeout); + } + + let client = builder.build()?; + + let signed_packet = create_signed_packet(keypair, config, https_port, http_port)?; + + Ok(Self { + client, + signed_packet, + republish_task: Mutex::new(None), + }) + } + + async fn publish_once( + client: &pkarr::Client, + signed_packet: &SignedPacket, + ) -> Result<(), PublishError> { + let res = client.publish(signed_packet, None).await; + if let Err(e) = &res { + tracing::warn!( + "Failed to publish the homeserver's pkarr packet to the DHT: {}", + e + ); + } else { + tracing::info!("Published the homeserver's pkarr packet to the DHT."); + } + res + } + + /// Start the periodic republish task which will republish the server packet to the DHT every hour. + /// + /// # Errors + /// - Throws an error if the initial publish fails. + /// - Throws an error if the periodic republish task is already running. + pub async fn start_periodic_republish(&self) -> anyhow::Result<()> { + let mut task_guard = self.republish_task.lock().await; + + if task_guard.is_some() { + return Err(anyhow::anyhow!( + "Periodic republish task is already running" + )); + } + + // Publish once to make sure the packet is published to the DHT before this + // function returns. + // Throws an error if the packet is not published to the DHT. + Self::publish_once(&self.client, &self.signed_packet).await?; + + // Start the periodic republish task. 
+ let client = self.client.clone(); + let signed_packet = self.signed_packet.clone(); + let handle = tokio::spawn(async move { + let mut interval = interval(Duration::from_secs(60 * 60)); // 1 hour in seconds + interval.tick().await; // This ticks immediatly. Wait for first interval before starting the loop. + loop { + interval.tick().await; + let _ = Self::publish_once(&client, &signed_packet).await; + } + }); + + *task_guard = Some(handle); + Ok(()) + } + + /// Stop the periodic republish task. + pub async fn stop_periodic_republish(&self) { + let mut task_guard = self.republish_task.lock().await; + + if let Some(handle) = task_guard.take() { + handle.abort(); + } + } +} + +pub fn create_signed_packet( + keypair: &Keypair, + config: &IoConfig, + https_port: u16, + http_port: u16, +) -> Result { + // TODO: Try to resolve first before publishing. + + let mut signed_packet_builder = SignedPacket::builder(); + + let mut svcb = SVCB::new(0, ".".try_into()?); + + // Set the public Ip or localhost + signed_packet_builder = signed_packet_builder.address( + ".".try_into() + .expect(". is valid domain and therefore always succeeds"), + config + .public_addr + .map(|addr| addr.ip()) + .unwrap_or("127.0.0.1".parse().expect("localhost is valid ip")), + 60 * 60, + ); + + // Set the public port or the local https_port + svcb.set_port( + config + .public_addr + .map(|addr| addr.port()) + .unwrap_or(https_port), + ); + + signed_packet_builder = signed_packet_builder.https( + ".".try_into() + .expect(". 
is valid domain and therefore always succeeds"), + svcb, + 60 * 60, + ); + + // Set low priority https record for legacy browsers support + if let Some(ref domain) = config.domain { + let mut svcb = SVCB::new(10, ".".try_into()?); + + let http_port_be_bytes = http_port.to_be_bytes(); + if domain == "localhost" { + svcb.set_param( + pubky_common::constants::reserved_param_keys::HTTP_PORT, + &http_port_be_bytes, + )?; + } + + svcb.target = domain.as_str().try_into()?; + + signed_packet_builder = signed_packet_builder.https( + ".".try_into() + .expect(". is valid domain and therefore always succeeds"), + svcb, + 60 * 60, + ); + } + + Ok(signed_packet_builder.build(keypair)?) +} diff --git a/pubky-homeserver/src/io/mod.rs b/pubky-homeserver/src/io/mod.rs index 1588dfd..9c7e2f4 100644 --- a/pubky-homeserver/src/io/mod.rs +++ b/pubky-homeserver/src/io/mod.rs @@ -6,8 +6,8 @@ use std::{ use ::pkarr::{Keypair, PublicKey}; use anyhow::Result; +use homeserver_key_republisher::HomeserverKeyRepublisher; use http::HttpServers; -use pkarr::PkarrServer; use tracing::info; use crate::{ @@ -15,8 +15,8 @@ use crate::{ core::{HomeserverCore, SignupMode}, }; +mod homeserver_key_republisher; mod http; -mod pkarr; #[derive(Debug, Default)] /// Builder for [Homeserver]. 
@@ -88,6 +88,7 @@ impl HomeserverBuilder { pub struct Homeserver { http_servers: HttpServers, keypair: Keypair, + pkarr_server: HomeserverKeyRepublisher, } impl Homeserver { @@ -135,26 +136,23 @@ impl Homeserver { let http_servers = HttpServers::run(&keypair, &config.io, &core.router).await?; - info!( - "Homeserver listening on http://localhost:{}", - http_servers.http_address().port() - ); - - info!("Publishing Pkarr packet.."); - - let pkarr_server = PkarrServer::new( + let dht_republisher = HomeserverKeyRepublisher::new( &keypair, &config.io, http_servers.https_address().port(), http_servers.http_address().port(), )?; - pkarr_server.publish_server_packet().await?; - + dht_republisher.start_periodic_republish().await?; + info!( + "Homeserver listening on http://localhost:{}", + http_servers.http_address().port() + ); info!("Homeserver listening on https://{}", keypair.public_key()); Ok(Self { http_servers, keypair, + pkarr_server: dht_republisher, }) } @@ -173,8 +171,9 @@ impl Homeserver { // === Public Methods === /// Send a shutdown signal to all open resources - pub fn shutdown(&self) { + pub async fn shutdown(&self) { self.http_servers.shutdown(); + self.pkarr_server.stop_periodic_republish().await; } } @@ -197,7 +196,6 @@ impl Default for IoConfig { IoConfig { https_port: DEFAULT_HTTPS_PORT, http_port: DEFAULT_HTTP_PORT, - public_addr: None, domain: None, bootstrap: None, diff --git a/pubky-homeserver/src/io/pkarr.rs b/pubky-homeserver/src/io/pkarr.rs deleted file mode 100644 index e0876e6..0000000 --- a/pubky-homeserver/src/io/pkarr.rs +++ /dev/null @@ -1,103 +0,0 @@ -//! 
Pkarr related task - -use anyhow::Result; -use pkarr::{dns::rdata::SVCB, Keypair, SignedPacket}; - -use super::IoConfig; - -pub struct PkarrServer { - client: pkarr::Client, - signed_packet: SignedPacket, -} - -impl PkarrServer { - pub fn new( - keypair: &Keypair, - config: &IoConfig, - https_port: u16, - http_port: u16, - ) -> Result { - let mut builder = pkarr::Client::builder(); - - // TODO: should we enable relays in homeservers for udp restricted environments? - builder.no_relays(); - - if let Some(bootstrap) = &config.bootstrap { - builder.bootstrap(bootstrap); - } - - if let Some(request_timeout) = config.dht_request_timeout { - builder.request_timeout(request_timeout); - } - - let client = builder.build()?; - - let signed_packet = create_signed_packet(keypair, config, https_port, http_port)?; - - Ok(Self { - client, - signed_packet, - }) - } - - pub async fn publish_server_packet(&self) -> anyhow::Result<()> { - // TODO: warn if packet is not most recent, which means the - // user is publishing a Packet from somewhere else. - - self.client.publish(&self.signed_packet, None).await?; - - Ok(()) - } -} - -pub fn create_signed_packet( - keypair: &Keypair, - config: &IoConfig, - https_port: u16, - http_port: u16, -) -> Result { - // TODO: Try to resolve first before publishing. 
- - let mut signed_packet_builder = SignedPacket::builder(); - - let mut svcb = SVCB::new(0, ".".try_into()?); - - // Set the public Ip or the loclahost - signed_packet_builder = signed_packet_builder.address( - ".".try_into().unwrap(), - config - .public_addr - .map(|addr| addr.ip()) - .unwrap_or("127.0.0.1".parse().expect("localhost is valid ip")), - 60 * 60, - ); - - // Set the public port or the local https_port - svcb.set_port( - config - .public_addr - .map(|addr| addr.port()) - .unwrap_or(https_port), - ); - - signed_packet_builder = signed_packet_builder.https(".".try_into().unwrap(), svcb, 60 * 60); - - // Set low priority https record for legacy browsers support - if let Some(ref domain) = config.domain { - let mut svcb = SVCB::new(10, ".".try_into()?); - - let http_port_be_bytes = http_port.to_be_bytes(); - if domain == "localhost" { - svcb.set_param( - pubky_common::constants::reserved_param_keys::HTTP_PORT, - &http_port_be_bytes, - )?; - } - - svcb.target = domain.as_str().try_into()?; - - signed_packet_builder = signed_packet_builder.https(".".try_into().unwrap(), svcb, 60 * 60); - } - - Ok(signed_packet_builder.build(keypair)?) -} diff --git a/pubky-homeserver/src/main.rs b/pubky-homeserver/src/main.rs index 2387d92..7cfd9d0 100644 --- a/pubky-homeserver/src/main.rs +++ b/pubky-homeserver/src/main.rs @@ -39,7 +39,7 @@ async fn main() -> Result<()> { tracing::info!("Shutting down Homeserver"); - server.shutdown(); + server.shutdown().await; Ok(()) }