finished async migration

This commit is contained in:
Severin Buhler
2024-02-07 19:38:47 +01:00
parent 2c2883215c
commit c7a6c23d48
6 changed files with 79 additions and 29 deletions

39
Cargo.lock generated
View File

@@ -132,6 +132,26 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c07dab4369547dbe5114677b33fbbf724971019f3818172d59a97a61c774ffd" checksum = "3c07dab4369547dbe5114677b33fbbf724971019f3818172d59a97a61c774ffd"
[[package]]
name = "async-lock"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b"
dependencies = [
"event-listener",
]
[[package]]
name = "async-timer"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba5fa6ed76cb2aa820707b4eb9ec46f42da9ce70b0eafab5e5e34942b38a44d5"
dependencies = [
"libc",
"wasm-bindgen",
"winapi",
]
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.77" version = "0.1.77"
@@ -700,6 +720,12 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "event-listener"
version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]] [[package]]
name = "fastrand" name = "fastrand"
version = "2.0.1" version = "2.0.1"
@@ -1171,6 +1197,7 @@ dependencies = [
"ctrlc", "ctrlc",
"pkarr", "pkarr",
"pknames_core", "pknames_core",
"retainer",
"simple-dns", "simple-dns",
"tokio", "tokio",
"ttl_cache", "ttl_cache",
@@ -1344,6 +1371,18 @@ version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
name = "retainer"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df8c01a8276c14d0f8d51ebcf8a48f0748f9f73f5f6b29e688126e6a52bcb145"
dependencies = [
"async-lock",
"async-timer",
"log",
"rand",
]
[[package]] [[package]]
name = "rmp" name = "rmp"
version = "0.8.12" version = "0.8.12"

View File

@@ -18,3 +18,4 @@ chrono = "0.4.33"
tokio = { version = "1.36.0", features = ["full"] } tokio = { version = "1.36.0", features = ["full"] }
async-trait = "0.1.77" async-trait = "0.1.77"
anyhow = "1.0.79" anyhow = "1.0.79"
retainer = "0.3.0"

View File

@@ -15,9 +15,9 @@ struct MyHandler {
} }
impl MyHandler { impl MyHandler {
pub fn new(max_cache_ttl: u64, config_dir_path: &str) -> Self { pub async fn new(max_cache_ttl: u64, config_dir_path: &str) -> Self {
Self { Self {
pkarr: PknamesResolver::new(max_cache_ttl, config_dir_path), pkarr: PknamesResolver::new(max_cache_ttl, config_dir_path).await,
} }
} }
} }
@@ -131,7 +131,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
})); }));
let anydns = Builder::new() let anydns = Builder::new()
.handler(MyHandler::new(cache_ttl, directory)) .handler(MyHandler::new(cache_ttl, directory).await)
.verbose(verbose) .verbose(verbose)
.icann_resolver(forward) .icann_resolver(forward)
.listen(socket) .listen(socket)

View File

@@ -1,28 +1,38 @@
use std::time::Duration; use std::{sync::Arc, time::Duration};
use pkarr::{dns::Packet, PublicKey}; use pkarr::{dns::Packet, PublicKey};
use ttl_cache::TtlCache; use retainer::Cache;
use tokio::{sync::Mutex, task::JoinHandle};
/** /**
* Pkarr record ttl cache * Pkarr record ttl cache
*/ */
#[derive(Clone)]
pub struct PkarrPacketTtlCache { pub struct PkarrPacketTtlCache {
cache: TtlCache<String, Vec<u8>>, cache: Arc<Cache<String, Vec<u8>>>,
max_cache_ttl: u64, max_cache_ttl: u64,
monitor: Arc<Mutex<JoinHandle<()>>>
} }
impl PkarrPacketTtlCache { impl PkarrPacketTtlCache {
pub fn new(max_cache_ttl: u64) -> Self { pub async fn new(max_cache_ttl: u64) -> Self {
let cache: Arc<Cache<String, Vec<u8>>> = Arc::new(Cache::new());
let monitor = tokio::spawn(async move {
cache.monitor(4, 0.25, Duration::from_secs(3)).await
});
let monitor = Arc::new(Mutex::new(monitor));
PkarrPacketTtlCache { PkarrPacketTtlCache {
cache: TtlCache::new(100), cache: Arc::new(Cache::new()),
max_cache_ttl, max_cache_ttl,
monitor
} }
} }
/** /**
* Adds packet and caches it for the ttl the least long lived answer is valid for. * Adds packet and caches it for the ttl the least long lived answer is valid for.
*/ */
pub fn add(&mut self, pubkey: PublicKey, reply: Vec<u8>) { pub async fn add(&mut self, pubkey: PublicKey, reply: Vec<u8>) {
let default_ttl = 1200; let default_ttl = 1200;
let packet = Packet::parse(&reply).unwrap(); let packet = Packet::parse(&reply).unwrap();
let min_ttl = packet let min_ttl = packet
@@ -36,11 +46,15 @@ impl PkarrPacketTtlCache {
let ttl = ttl.min(self.max_cache_ttl); let ttl = ttl.min(self.max_cache_ttl);
let ttl = Duration::from_secs(ttl as u64); let ttl = Duration::from_secs(ttl as u64);
self.cache.insert(pubkey.to_z32(), reply, ttl); self.cache.insert(pubkey.to_z32(), reply, ttl).await;
} }
pub fn get(&self, pubkey: &PublicKey) -> Option<Vec<u8>> { pub async fn get(&self, pubkey: &PublicKey) -> Option<Vec<u8>> {
let z32 = pubkey.to_z32(); let z32 = pubkey.to_z32();
self.cache.get(&z32).map(|value| value.clone()) self.cache.get(&z32).await.map(|value| value.clone())
}
pub async fn stop(self) {
self.monitor.lock().await.abort();
} }
} }

View File

@@ -1,6 +1,4 @@
use std::{ use std::sync::{Arc, Mutex};
sync::{Arc, Mutex},
};
use anyhow::anyhow; use anyhow::anyhow;
use crate::{packet_lookup::resolve_query, pkarr_cache::PkarrPacketTtlCache}; use crate::{packet_lookup::resolve_query, pkarr_cache::PkarrPacketTtlCache};
@@ -25,14 +23,14 @@ impl SignedPacketTimestamp for SignedPacket {
#[derive(Clone)] #[derive(Clone)]
pub struct PkarrResolver { pub struct PkarrResolver {
client: PkarrClient, client: PkarrClient,
cache: Arc<Mutex<PkarrPacketTtlCache>>, cache: PkarrPacketTtlCache,
} }
impl PkarrResolver { impl PkarrResolver {
pub fn new(max_cache_ttl: u64) -> Self { pub async fn new(max_cache_ttl: u64) -> Self {
Self { Self {
client: PkarrClient::new(), client: PkarrClient::new(),
cache: Arc::new(Mutex::new(PkarrPacketTtlCache::new(max_cache_ttl))), cache: PkarrPacketTtlCache::new(max_cache_ttl).await,
} }
} }
@@ -50,8 +48,7 @@ impl PkarrResolver {
} }
async fn resolve_pubkey_respect_cache(&mut self, pubkey: &PublicKey) -> Option<Vec<u8>> { async fn resolve_pubkey_respect_cache(&mut self, pubkey: &PublicKey) -> Option<Vec<u8>> {
let cache = self.cache.lock().unwrap(); let cached_opt = self.cache.get(pubkey).await;
let cached_opt = cache.get(pubkey);
if cached_opt.is_some() { if cached_opt.is_some() {
let reply_bytes = cached_opt.unwrap(); let reply_bytes = cached_opt.unwrap();
return Some(reply_bytes); return Some(reply_bytes);
@@ -63,8 +60,7 @@ impl PkarrResolver {
}; };
let signed_packet = packet_option.unwrap(); let signed_packet = packet_option.unwrap();
let reply_bytes = signed_packet.packet().build_bytes_vec_compressed().unwrap(); let reply_bytes = signed_packet.packet().build_bytes_vec_compressed().unwrap();
let mut cache = self.cache.lock().unwrap(); self.cache.add(pubkey.clone(), reply_bytes.clone()).await;
cache.add(pubkey.clone(), reply_bytes.clone());
Some(reply_bytes) Some(reply_bytes)
} }
@@ -168,7 +164,7 @@ mod tests {
); );
query.questions.push(question); query.questions.push(question);
let mut resolver = PkarrResolver::new(0); let mut resolver = PkarrResolver::new(0).await;
let result = resolver.resolve(&query.build_bytes_vec_compressed().unwrap()).await; let result = resolver.resolve(&query.build_bytes_vec_compressed().unwrap()).await;
assert!(result.is_ok()); assert!(result.is_ok());
let reply_bytes = result.unwrap(); let reply_bytes = result.unwrap();
@@ -195,7 +191,7 @@ mod tests {
true, true,
); );
query.questions.push(question); query.questions.push(question);
let mut resolver = PkarrResolver::new(0); let mut resolver = PkarrResolver::new(0).await;
let result = resolver.resolve(&query.build_bytes_vec_compressed().unwrap()).await; let result = resolver.resolve(&query.build_bytes_vec_compressed().unwrap()).await;
assert!(result.is_ok()); assert!(result.is_ok());
let reply_bytes = result.unwrap(); let reply_bytes = result.unwrap();
@@ -219,7 +215,7 @@ mod tests {
true, true,
); );
query.questions.push(question); query.questions.push(question);
let mut resolver = PkarrResolver::new(0); let mut resolver = PkarrResolver::new(0).await;
let result = resolver.resolve(&query.build_bytes_vec_compressed().unwrap()).await; let result = resolver.resolve(&query.build_bytes_vec_compressed().unwrap()).await;
assert!(result.is_err()); assert!(result.is_err());
// println!("{}", result.unwrap_err()); // println!("{}", result.unwrap_err());
@@ -244,7 +240,7 @@ mod tests {
async fn pkarr_invalid_packet1() { async fn pkarr_invalid_packet1() {
let pubkey = PkarrResolver::parse_pkarr_uri("7fmjpcuuzf54hw18bsgi3zihzyh4awseeuq5tmojefaezjbd64cy").unwrap(); let pubkey = PkarrResolver::parse_pkarr_uri("7fmjpcuuzf54hw18bsgi3zihzyh4awseeuq5tmojefaezjbd64cy").unwrap();
let mut resolver = PkarrResolver::new(0); let mut resolver = PkarrResolver::new(0).await;
let _result = resolver.resolve_pubkey_respect_cache(&pubkey).await; let _result = resolver.resolve_pubkey_respect_cache(&pubkey).await;
// assert!(result.is_some()); // assert!(result.is_some());
} }

View File

@@ -13,9 +13,9 @@ pub struct PknamesResolver {
} }
impl PknamesResolver { impl PknamesResolver {
pub fn new(max_cache_ttl: u64, config_dir_path: &str) -> Self { pub async fn new(max_cache_ttl: u64, config_dir_path: &str) -> Self {
PknamesResolver { PknamesResolver {
pkarr: PkarrResolver::new(max_cache_ttl), pkarr: PkarrResolver::new(max_cache_ttl).await,
config_dir_path: config_dir_path.to_string(), config_dir_path: config_dir_path.to_string(),
} }
} }
@@ -80,7 +80,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn query_pubkey() { async fn query_pubkey() {
let mut pknames = PknamesResolver::new(1, "~/.pknames"); let mut pknames = PknamesResolver::new(1, "~/.pknames").await;
let mut query = Packet::new_query(0); let mut query = Packet::new_query(0);
let name = Name::new("pknames.p2p").unwrap(); let name = Name::new("pknames.p2p").unwrap();