diff --git a/Cargo.lock b/Cargo.lock index 9f7c87a..c0cf7d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -132,6 +132,26 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c07dab4369547dbe5114677b33fbbf724971019f3818172d59a97a61c774ffd" +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-timer" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5fa6ed76cb2aa820707b4eb9ec46f42da9ce70b0eafab5e5e34942b38a44d5" +dependencies = [ + "libc", + "wasm-bindgen", + "winapi", +] + [[package]] name = "async-trait" version = "0.1.77" @@ -700,6 +720,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "fastrand" version = "2.0.1" @@ -1171,6 +1197,7 @@ dependencies = [ "ctrlc", "pkarr", "pknames_core", + "retainer", "simple-dns", "tokio", "ttl_cache", @@ -1344,6 +1371,18 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +[[package]] +name = "retainer" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df8c01a8276c14d0f8d51ebcf8a48f0748f9f73f5f6b29e688126e6a52bcb145" +dependencies = [ + "async-lock", + "async-timer", + "log", + "rand", +] + [[package]] name = "rmp" version = "0.8.12" diff --git a/Cargo.toml b/Cargo.toml index 6c603f8..781ebf9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,3 +18,4 @@ chrono = "0.4.33" tokio = { version = "1.36.0", features = ["full"] } async-trait = "0.1.77" anyhow = "1.0.79" +retainer = "0.3.0" diff --git a/src/main.rs b/src/main.rs index 76b03c0..52ec1cd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,9 +15,9 @@ struct MyHandler { } impl MyHandler { - pub fn new(max_cache_ttl: u64, config_dir_path: &str) -> Self { + pub async fn new(max_cache_ttl: u64, config_dir_path: &str) -> Self { Self { - pkarr: PknamesResolver::new(max_cache_ttl, config_dir_path), + pkarr: PknamesResolver::new(max_cache_ttl, config_dir_path).await, } } } @@ -131,7 +131,7 @@ async fn main() -> Result<(), Box> { })); let anydns = Builder::new() - .handler(MyHandler::new(cache_ttl, directory)) + .handler(MyHandler::new(cache_ttl, directory).await) .verbose(verbose) .icann_resolver(forward) .listen(socket) diff --git a/src/pkarr_cache.rs b/src/pkarr_cache.rs index d7fd9a4..c7c42a1 100644 --- a/src/pkarr_cache.rs +++ b/src/pkarr_cache.rs @@ -1,28 +1,38 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use pkarr::{dns::Packet, PublicKey}; -use ttl_cache::TtlCache; +use retainer::Cache; +use tokio::{sync::Mutex, task::JoinHandle}; + /** * Pkarr record ttl cache */ +#[derive(Clone)] pub struct PkarrPacketTtlCache { - cache: TtlCache>, + cache: Arc>>, max_cache_ttl: u64, + monitor: Arc>> } impl PkarrPacketTtlCache { - pub fn new(max_cache_ttl: u64) -> Self { + pub async fn new(max_cache_ttl: u64) -> Self { + let cache: Arc>> = Arc::new(Cache::new()); + let monitor = tokio::spawn(async move { + cache.monitor(4, 0.25, Duration::from_secs(3)).await + }); + let monitor = Arc::new(Mutex::new(monitor)); PkarrPacketTtlCache { - cache: TtlCache::new(100), + cache: Arc::new(Cache::new()), max_cache_ttl, + monitor } } /** * Adds packet and caches it for the ttl the least long lived answer is valid for. */ - pub fn add(&mut self, pubkey: PublicKey, reply: Vec) { + pub async fn add(&mut self, pubkey: PublicKey, reply: Vec) { let default_ttl = 1200; let packet = Packet::parse(&reply).unwrap(); let min_ttl = packet @@ -36,11 +46,15 @@ impl PkarrPacketTtlCache { let ttl = ttl.min(self.max_cache_ttl); let ttl = Duration::from_secs(ttl as u64); - self.cache.insert(pubkey.to_z32(), reply, ttl); + self.cache.insert(pubkey.to_z32(), reply, ttl).await; } - pub fn get(&self, pubkey: &PublicKey) -> Option> { + pub async fn get(&self, pubkey: &PublicKey) -> Option> { let z32 = pubkey.to_z32(); - self.cache.get(&z32).map(|value| value.clone()) + self.cache.get(&z32).await.map(|value| value.clone()) + } + + pub async fn stop(self) { + self.monitor.lock().await.abort(); } } diff --git a/src/pkarr_resolver.rs b/src/pkarr_resolver.rs index fd39dc0..c5d70ff 100644 --- a/src/pkarr_resolver.rs +++ b/src/pkarr_resolver.rs @@ -1,6 +1,4 @@ -use std::{ - sync::{Arc, Mutex}, -}; +use std::sync::{Arc, Mutex}; use anyhow::anyhow; use crate::{packet_lookup::resolve_query, pkarr_cache::PkarrPacketTtlCache}; @@ -25,14 +23,14 @@ impl SignedPacketTimestamp for SignedPacket { #[derive(Clone)] pub struct PkarrResolver { client: PkarrClient, - cache: Arc>, + cache: PkarrPacketTtlCache, } impl PkarrResolver { - pub fn new(max_cache_ttl: u64) -> Self { + pub async fn new(max_cache_ttl: u64) -> Self { Self { client: PkarrClient::new(), - cache: Arc::new(Mutex::new(PkarrPacketTtlCache::new(max_cache_ttl))), + cache: PkarrPacketTtlCache::new(max_cache_ttl).await, } } @@ -50,8 +48,7 @@ impl PkarrResolver { } async fn resolve_pubkey_respect_cache(&mut self, pubkey: &PublicKey) -> Option> { - let cache = self.cache.lock().unwrap(); - let cached_opt = cache.get(pubkey); + let cached_opt = self.cache.get(pubkey).await; if cached_opt.is_some() { let reply_bytes = cached_opt.unwrap(); return Some(reply_bytes); @@ -63,8 +60,7 @@ impl PkarrResolver { }; let signed_packet = packet_option.unwrap(); let reply_bytes = signed_packet.packet().build_bytes_vec_compressed().unwrap(); - let mut cache = self.cache.lock().unwrap(); - cache.add(pubkey.clone(), reply_bytes.clone()); + self.cache.add(pubkey.clone(), reply_bytes.clone()).await; Some(reply_bytes) } @@ -168,7 +164,7 @@ mod tests { ); query.questions.push(question); - let mut resolver = PkarrResolver::new(0); + let mut resolver = PkarrResolver::new(0).await; let result = resolver.resolve(&query.build_bytes_vec_compressed().unwrap()).await; assert!(result.is_ok()); let reply_bytes = result.unwrap(); @@ -195,7 +191,7 @@ mod tests { true, ); query.questions.push(question); - let mut resolver = PkarrResolver::new(0); + let mut resolver = PkarrResolver::new(0).await; let result = resolver.resolve(&query.build_bytes_vec_compressed().unwrap()).await; assert!(result.is_ok()); let reply_bytes = result.unwrap(); @@ -219,7 +215,7 @@ mod tests { true, ); query.questions.push(question); - let mut resolver = PkarrResolver::new(0); + let mut resolver = PkarrResolver::new(0).await; let result = resolver.resolve(&query.build_bytes_vec_compressed().unwrap()).await; assert!(result.is_err()); // println!("{}", result.unwrap_err()); @@ -244,7 +240,7 @@ mod tests { async fn pkarr_invalid_packet1() { let pubkey = PkarrResolver::parse_pkarr_uri("7fmjpcuuzf54hw18bsgi3zihzyh4awseeuq5tmojefaezjbd64cy").unwrap(); - let mut resolver = PkarrResolver::new(0); + let mut resolver = PkarrResolver::new(0).await; let _result = resolver.resolve_pubkey_respect_cache(&pubkey).await; // assert!(result.is_some()); } diff --git a/src/pknames_resolver.rs b/src/pknames_resolver.rs index f86f952..26e4669 100644 --- a/src/pknames_resolver.rs +++ b/src/pknames_resolver.rs @@ -13,9 +13,9 @@ pub struct PknamesResolver { } impl PknamesResolver { - pub fn new(max_cache_ttl: u64, config_dir_path: &str) -> Self { + pub async fn new(max_cache_ttl: u64, config_dir_path: &str) -> Self { PknamesResolver { - pkarr: PkarrResolver::new(max_cache_ttl), + pkarr: PkarrResolver::new(max_cache_ttl).await, config_dir_path: config_dir_path.to_string(), } } @@ -80,7 +80,7 @@ mod tests { #[tokio::test] async fn query_pubkey() { - let mut pknames = PknamesResolver::new(1, "~/.pknames"); + let mut pknames = PknamesResolver::new(1, "~/.pknames").await; let mut query = Packet::new_query(0); let name = Name::new("pknames.p2p").unwrap();