Update user relay-list via polling

This commit is contained in:
Ken Sedgwick
2024-11-15 08:35:12 -08:00
parent 575d469aa0
commit f00a67ab2c
5 changed files with 298 additions and 54 deletions

5
Cargo.lock generated
View File

@@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
version = 4
[[package]]
name = "ab_glyph"
@@ -2599,7 +2599,7 @@ dependencies = [
[[package]]
name = "notedeck"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"android-activity 0.4.3",
"android_logger",
@@ -2638,6 +2638,7 @@ dependencies = [
"tracing-appender",
"tracing-subscriber",
"tracing-wasm",
"url",
"urlencoding",
"uuid",
"wasm-bindgen-futures",

View File

@@ -55,6 +55,7 @@ dirs = "5.0.1"
tracing-appender = "0.2.3"
urlencoding = "2.1.3"
open = "5.3.0"
url = "2.5"
[dev-dependencies]
tempfile = "3.13.0"

View File

@@ -2,6 +2,7 @@ use crate::relay::{Relay, RelayStatus};
use crate::{ClientMessage, Result};
use nostrdb::Filter;
use std::collections::BTreeSet;
use std::time::{Duration, Instant};
use url::Url;
@@ -89,6 +90,13 @@ impl RelayPool {
false
}
pub fn urls(&self) -> BTreeSet<String> {
self.relays
.iter()
.map(|pool_relay| pool_relay.relay.url.clone())
.collect()
}
pub fn send(&mut self, cmd: &ClientMessage) {
for relay in &mut self.relays {
relay.relay.send(cmd);
@@ -181,6 +189,22 @@ impl RelayPool {
Ok(())
}
pub fn add_urls(
&mut self,
urls: BTreeSet<String>,
wakeup: impl Fn() + Send + Sync + Clone + 'static,
) -> Result<()> {
for url in urls {
self.add_url(url, wakeup.clone())?;
}
Ok(())
}
pub fn remove_urls(&mut self, urls: &BTreeSet<String>) {
self.relays
.retain(|pool_relay| !urls.contains(&pool_relay.relay.url));
}
// standardize the format (ie, trailing slashes)
fn canonicalize_url(url: String) -> String {
match Url::parse(&url) {

View File

@@ -1,7 +1,11 @@
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use enostr::{FilledKeypair, FullKeypair, Keypair};
use nostrdb::{Ndb, Transaction};
use url::Url;
use uuid::Uuid;
use enostr::{ClientMessage, FilledKeypair, FullKeypair, Keypair, RelayPool};
use nostrdb::{Filter, Ndb, NoteKey, Subscription, Transaction};
use crate::{
column::Columns,
@@ -17,18 +21,115 @@ use crate::{
unknowns::UnknownIds,
user_account::UserAccount,
};
use tracing::{error, info};
use tracing::{debug, error, info};
mod route;
pub use route::{AccountsRoute, AccountsRouteResponse};
pub struct AccountRelayData {
filter: Filter,
subid: String,
sub: Option<Subscription>,
local: BTreeSet<String>, // used locally but not advertised
advertised: BTreeSet<String>, // advertised via NIP-65
}
impl AccountRelayData {
pub fn new(ndb: &Ndb, pool: &mut RelayPool, pubkey: &[u8; 32]) -> Self {
// Construct a filter for the user's NIP-65 relay list
let filter = Filter::new()
.authors([pubkey])
.kinds([10002])
.limit(1)
.build();
// Local ndb subscription
let ndbsub = ndb
.subscribe(&[filter.clone()])
.expect("ndb relay list subscription");
// Query the ndb immediately to see if the user list is already there
let txn = Transaction::new(ndb).expect("transaction");
let lim = filter.limit().unwrap_or(crate::filter::default_limit()) as i32;
let nks = ndb
.query(&txn, &[filter.clone()], lim)
.expect("query user relays results")
.iter()
.map(|qr| qr.note_key)
.collect::<Vec<NoteKey>>();
let relays = Self::harvest_nip65_relays(ndb, &txn, &nks);
debug!(
"pubkey {}: initial relays {:?}",
hex::encode(pubkey),
relays
);
// Id for future remote relay subscriptions
let subid = Uuid::new_v4().to_string();
// Add remote subscription to existing relays
pool.subscribe(subid.clone(), vec![filter.clone()]);
AccountRelayData {
filter,
subid,
sub: Some(ndbsub),
local: BTreeSet::new(),
advertised: relays.into_iter().collect(),
}
}
// standardize the format (ie, trailing slashes) to avoid dups
pub fn canonicalize_url(url: &str) -> String {
match Url::parse(url) {
Ok(parsed_url) => parsed_url.to_string(),
Err(_) => url.to_owned(), // If parsing fails, return the original URL.
}
}
fn harvest_nip65_relays(ndb: &Ndb, txn: &Transaction, nks: &[NoteKey]) -> Vec<String> {
let mut relays = Vec::new();
for nk in nks.iter() {
if let Ok(note) = ndb.get_note_by_key(txn, *nk) {
for tag in note.tags() {
match tag.get(0).and_then(|t| t.variant().str()) {
Some("r") => {
if let Some(url) = tag.get(1).and_then(|f| f.variant().str()) {
relays.push(Self::canonicalize_url(url));
}
}
Some("alt") => {
// ignore for now
}
Some(x) => {
error!("harvest_nip65_relays: unexpected tag type: {}", x);
}
None => {
error!("harvest_nip65_relays: invalid tag");
}
}
}
}
}
relays
}
}
pub struct AccountData {
relay: AccountRelayData,
}
/// The interface for managing the user's accounts.
/// Represents all user-facing operations related to account management.
pub struct Accounts {
currently_selected_account: Option<usize>,
accounts: Vec<UserAccount>,
key_store: KeyStorageType,
account_data: BTreeMap<[u8; 32], AccountData>,
forced_relays: BTreeSet<String>,
bootstrap_relays: BTreeSet<String>,
needs_relay_config: bool,
}
/// Render account management views from a route
@@ -93,7 +194,7 @@ pub fn process_accounts_view_response(
}
impl Accounts {
pub fn new(key_store: KeyStorageType) -> Self {
pub fn new(key_store: KeyStorageType, forced_relays: Vec<String>) -> Self {
let accounts = if let KeyStorageResponse::ReceivedResult(res) = key_store.get_keys() {
res.unwrap_or_default()
} else {
@@ -101,10 +202,31 @@ impl Accounts {
};
let currently_selected_account = get_selected_index(&accounts, &key_store);
let account_data = BTreeMap::new();
let forced_relays: BTreeSet<String> = forced_relays
.into_iter()
.map(|u| AccountRelayData::canonicalize_url(&u))
.collect();
let bootstrap_relays = [
"wss://relay.damus.io",
// "wss://pyramid.fiatjaf.com", // Uncomment if needed
"wss://nos.lol",
"wss://nostr.wine",
"wss://purplepag.es",
]
.iter()
.map(|&url| url.to_string())
.map(|u| AccountRelayData::canonicalize_url(&u))
.collect();
Accounts {
currently_selected_account,
accounts,
key_store,
account_data,
forced_relays,
bootstrap_relays,
needs_relay_config: true,
}
}
@@ -226,6 +348,140 @@ impl Accounts {
self.currently_selected_account = None;
self.key_store.select_key(None);
}
pub fn send_initial_filters(&mut self, pool: &mut RelayPool, relay_url: &str) {
for data in self.account_data.values() {
pool.send_to(
&ClientMessage::req(data.relay.subid.clone(), vec![data.relay.filter.clone()]),
relay_url,
);
}
}
// Returns added and removed accounts
fn delta_accounts(&self) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
let mut added = Vec::new();
for pubkey in self.accounts.iter().map(|a| a.pubkey.bytes()) {
if !self.account_data.contains_key(pubkey) {
added.push(*pubkey);
}
}
let mut removed = Vec::new();
for pubkey in self.account_data.keys() {
if self.contains_account(pubkey).is_none() {
removed.push(*pubkey);
}
}
(added, removed)
}
fn handle_added_account(&mut self, ndb: &Ndb, pool: &mut RelayPool, pubkey: &[u8; 32]) {
debug!("handle_added_account {}", hex::encode(pubkey));
// Create the user account data
let new_account_data = AccountData {
relay: AccountRelayData::new(ndb, pool, pubkey),
};
self.account_data.insert(*pubkey, new_account_data);
}
fn handle_removed_account(&mut self, pubkey: &[u8; 32]) {
debug!("handle_removed_account {}", hex::encode(pubkey));
// FIXME - we need to unsubscribe here
self.account_data.remove(pubkey);
}
fn poll_for_updates(&mut self, ndb: &Ndb) -> bool {
let mut changed = false;
for (pubkey, data) in &mut self.account_data {
if let Some(sub) = data.relay.sub {
let nks = ndb.poll_for_notes(sub, 1);
if !nks.is_empty() {
let txn = Transaction::new(ndb).expect("txn");
let relays = AccountRelayData::harvest_nip65_relays(ndb, &txn, &nks);
debug!(
"pubkey {}: updated relays {:?}",
hex::encode(pubkey),
relays
);
data.relay.advertised = relays.into_iter().collect();
changed = true;
}
}
}
changed
}
fn update_relay_configuration(
&mut self,
pool: &mut RelayPool,
wakeup: impl Fn() + Send + Sync + Clone + 'static,
) {
// If forced relays are set use them only
let mut desired_relays = self.forced_relays.clone();
// Compose the desired relay lists from the accounts
if desired_relays.is_empty() {
for data in self.account_data.values() {
desired_relays.extend(data.relay.local.iter().cloned());
desired_relays.extend(data.relay.advertised.iter().cloned());
}
}
// If no relays are specified at this point use the bootstrap list
if desired_relays.is_empty() {
desired_relays = self.bootstrap_relays.clone();
}
debug!("current relays: {:?}", pool.urls());
debug!("desired relays: {:?}", desired_relays);
let add: BTreeSet<String> = desired_relays.difference(&pool.urls()).cloned().collect();
let sub: BTreeSet<String> = pool.urls().difference(&desired_relays).cloned().collect();
if !add.is_empty() {
debug!("configuring added relays: {:?}", add);
let _ = pool.add_urls(add, wakeup);
}
if !sub.is_empty() {
debug!("removing unwanted relays: {:?}", sub);
pool.remove_urls(&sub);
}
debug!("current relays: {:?}", pool.urls());
}
pub fn update(&mut self, ndb: &Ndb, pool: &mut RelayPool, ctx: &egui::Context) {
// IMPORTANT - This function is called in the UI update loop,
// make sure it is fast when idle
// On the initial update the relays need config even if nothing changes below
let mut relays_changed = self.needs_relay_config;
let ctx2 = ctx.clone();
let wakeup = move || {
ctx2.request_repaint();
};
// Were any accounts added or removed?
let (added, removed) = self.delta_accounts();
for pk in added {
self.handle_added_account(ndb, pool, &pk);
relays_changed = true;
}
for pk in removed {
self.handle_removed_account(&pk);
relays_changed = true;
}
// Did any accounts receive updates (ie NIP-65 relay lists)
relays_changed = self.poll_for_updates(ndb) || relays_changed;
// If needed, update the relay configuration
if relays_changed {
self.update_relay_configuration(pool, wakeup);
self.needs_relay_config = false;
}
}
}
fn get_selected_index(accounts: &[UserAccount], keystore: &KeyStorageType) -> Option<usize> {

View File

@@ -71,31 +71,6 @@ pub struct Damus {
pub textmode: bool,
}
fn relay_setup(pool: &mut RelayPool, ctx: &egui::Context) {
let ctx = ctx.clone();
let wakeup = move || {
ctx.request_repaint();
};
if let Err(e) = pool.add_url("ws://localhost:8080".to_string(), wakeup.clone()) {
error!("{:?}", e)
}
if let Err(e) = pool.add_url("wss://relay.damus.io".to_string(), wakeup.clone()) {
error!("{:?}", e)
}
//if let Err(e) = pool.add_url("wss://pyramid.fiatjaf.com".to_string(), wakeup.clone()) {
//error!("{:?}", e)
//}
if let Err(e) = pool.add_url("wss://nos.lol".to_string(), wakeup.clone()) {
error!("{:?}", e)
}
if let Err(e) = pool.add_url("wss://nostr.wine".to_string(), wakeup.clone()) {
error!("{:?}", e)
}
if let Err(e) = pool.add_url("wss://purplepag.es".to_string(), wakeup) {
error!("{:?}", e)
}
}
fn handle_key_events(input: &egui::InputState, _pixels_per_point: f32, columns: &mut Columns) {
for event in &input.raw.events {
if let egui::Event::Key {
@@ -142,6 +117,10 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) -> Result<()> {
match (&ev.event).into() {
RelayEvent::Opened => {
damus
.accounts
.send_initial_filters(&mut damus.pool, &ev.relay);
timeline::send_initial_timeline_filters(
&damus.ndb,
damus.since_optimize,
@@ -213,6 +192,8 @@ fn setup_profiling() {
}
fn update_damus(damus: &mut Damus, ctx: &egui::Context) {
damus.accounts.update(&damus.ndb, &mut damus.pool, ctx); // update user relay and mute lists
match damus.state {
DamusState::Initializing => {
#[cfg(feature = "profiling")]
@@ -422,7 +403,7 @@ impl Damus {
KeyStorageType::None
};
let mut accounts = Accounts::new(keystore);
let mut accounts = Accounts::new(keystore, parsed_args.relays);
let num_keys = parsed_args.keys.len();
@@ -443,27 +424,8 @@ impl Damus {
accounts.select_account(0);
}
// setup relays if we have them
let pool = if parsed_args.relays.is_empty() {
let mut pool = RelayPool::new();
relay_setup(&mut pool, ctx);
pool
} else {
let wakeup = {
let ctx = ctx.clone();
move || {
ctx.request_repaint();
}
};
let mut pool = RelayPool::new();
for relay in parsed_args.relays {
if let Err(e) = pool.add_url(relay.clone(), wakeup.clone()) {
error!("error adding relay {}: {}", relay, e);
}
}
pool
};
// AccountManager will setup the pool on first update
let pool = RelayPool::new();
let account = accounts
.get_selected_account()
@@ -613,7 +575,7 @@ impl Damus {
&config,
)
.expect("ndb"),
accounts: Accounts::new(KeyStorageType::None),
accounts: Accounts::new(KeyStorageType::None, vec![]),
frame_history: FrameHistory::default(),
view_state: ViewState::default(),