From ad244d48c0b37a113d0dd4ff660d56428a359223 Mon Sep 17 00:00:00 2001 From: William Casarin Date: Sat, 31 Aug 2024 11:47:09 -0700 Subject: [PATCH] fetch contact lists If we don't have a contact list, make sure to fetch one Signed-off-by: William Casarin --- Cargo.lock | 2 +- Cargo.toml | 2 +- enostr/Cargo.toml | 2 +- src/actionbar.rs | 2 +- src/app.rs | 448 +++++++++++++++++++++++++++----------- src/args.rs | 8 +- src/column.rs | 46 ++-- src/error.rs | 37 +++- src/filter.rs | 69 +++++- src/lib.rs | 1 + src/subscriptions.rs | 23 ++ src/thread.rs | 12 +- src/timeline.rs | 55 +++-- src/ui/profile/picture.rs | 2 +- src/ui/thread.rs | 2 +- 15 files changed, 517 insertions(+), 194 deletions(-) create mode 100644 src/subscriptions.rs diff --git a/Cargo.lock b/Cargo.lock index 6abb6ea..cab1c08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2296,7 +2296,7 @@ dependencies = [ [[package]] name = "nostrdb" version = "0.3.4" -source = "git+https://github.com/damus-io/nostrdb-rs?rev=8be0d4972148cc1387ddcaa40b97a924519ba855#8be0d4972148cc1387ddcaa40b97a924519ba855" +source = "git+https://github.com/damus-io/nostrdb-rs?rev=6d22af6d5159be4c9e4579f8c9d3af836e0d470a#6d22af6d5159be4c9e4579f8c9d3af836e0d470a" dependencies = [ "bindgen", "cc", diff --git a/Cargo.toml b/Cargo.toml index 6847fad..aa11f6f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ serde_derive = "1" serde = { version = "1", features = ["derive"] } # You only need this if you want app persistence tracing = "0.1.40" #wasm-bindgen = "0.2.83" -nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "8be0d4972148cc1387ddcaa40b97a924519ba855" } +nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "6d22af6d5159be4c9e4579f8c9d3af836e0d470a" } #nostrdb = { path = "/Users/jb55/dev/github/damus-io/nostrdb-rs" } #nostrdb = "0.3.4" enostr = { path = "enostr" } diff --git a/enostr/Cargo.toml b/enostr/Cargo.toml index a528e64..30b705d 100644 --- a/enostr/Cargo.toml +++ b/enostr/Cargo.toml @@ -11,7 +11,7 @@ serde_derive = "1" serde = { version = "1", features = ["derive"] } # You only need this if you want app persistence serde_json = "1.0.89" nostr = { version = "0.30.0" } -nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "8be0d4972148cc1387ddcaa40b97a924519ba855" } +nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "6d22af6d5159be4c9e4579f8c9d3af836e0d470a" } hex = "0.4.3" tracing = "0.1.40" env_logger = "0.11.1" diff --git a/src/actionbar.rs b/src/actionbar.rs index 486e1db..e3f1b06 100644 --- a/src/actionbar.rs +++ b/src/actionbar.rs @@ -76,7 +76,7 @@ fn open_thread( // an active subscription for this thread. 
if thread.subscription().is_none() { let filters = Thread::filters(root_id); - *thread.subscription_mut() = app.ndb.subscribe(filters.clone()).ok(); + *thread.subscription_mut() = app.ndb.subscribe(&filters).ok(); if thread.remote_subscription().is_some() { error!("Found active remote subscription when it was not expected"); diff --git a/src/app.rs b/src/app.rs index 9f74c4c..e2c2386 100644 --- a/src/app.rs +++ b/src/app.rs @@ -5,6 +5,8 @@ use crate::app_style::user_requested_visuals_change; use crate::args::Args; use crate::column::ColumnKind; use crate::draft::Drafts; +use crate::error::{Error, FilterError}; +use crate::filter::FilterState; use crate::frame_history::FrameHistory; use crate::imgcache::ImageCache; use crate::key_storage::KeyStorageType; @@ -12,6 +14,7 @@ use crate::note::NoteRef; use crate::notecache::{CachedNote, NoteCache}; use crate::relay_pool_manager::RelayPoolManager; use crate::route::Route; +use crate::subscriptions::{SubKind, Subscriptions}; use crate::thread::{DecrementResult, Threads}; use crate::timeline::{Timeline, TimelineSource, ViewFilter}; use crate::ui::note::PostAction; @@ -22,13 +25,14 @@ use egui_nav::{Nav, NavAction}; use enostr::{ClientMessage, RelayEvent, RelayMessage, RelayPool}; use std::cell::RefCell; use std::rc::Rc; +use uuid::Uuid; use egui::{Context, Frame, Style}; use egui_extras::{Size, StripBuilder}; use nostrdb::{BlockType, Config, Filter, Mention, Ndb, Note, NoteKey, Transaction}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::hash::Hash; use std::path::Path; use std::time::Duration; @@ -57,6 +61,7 @@ pub struct Damus { pub threads: Threads, pub img_cache: ImageCache, pub account_manager: AccountManager, + pub subscriptions: Subscriptions, frame_history: crate::frame_history::FrameHistory, @@ -93,54 +98,87 @@ fn relay_setup(pool: &mut RelayPool, ctx: &egui::Context) { } } -fn send_initial_filters(damus: &mut Damus, relay_url: &str) { - info!("Sending initial filters to {}", relay_url); - let mut c: u32 = 1; +fn send_initial_timeline_filter(damus: &mut Damus, timeline: usize) { let can_since_optimize = damus.since_optimize; - for relay in &mut damus.pool.relays { - let relay = &mut relay.relay; - if relay.url == relay_url { - for timeline in &damus.timelines { - let filter = if let Some(filter) = &timeline.filter { - filter.clone() - } else { - // TODO: handle unloaded filters - error!("TODO: handle unloaded filters"); - continue; - }; + let filter_state = damus.timelines[timeline].filter.clone(); - let new_filters = filter.into_iter().map(|f| { - // limit the size of remote filters - let default_limit = filter::default_remote_limit(); - let mut lim = f.limit().unwrap_or(default_limit); - let mut filter = f; - if lim > default_limit { - lim = default_limit; - filter = filter.limit_mut(lim); - } - - let notes = timeline.notes(ViewFilter::NotesAndReplies); - - // Should we since optimize? Not always. For example - // if we only have a few notes locally. One way to - // determine this is by looking at the current filter - // and seeing what its limit is. 
If we have less
-                    // notes than the limit, we might want to backfill
-                    // older notes
-                    if can_since_optimize && filter::should_since_optimize(lim, notes.len()) {
-                        filter = filter::since_optimize_filter(filter, notes);
-                    } else {
-                        warn!("Skipping since optimization for {:?}: number of local notes is less than limit, attempting to backfill.", filter);
-                    }
-
-                    filter
-                }).collect();
-                relay.subscribe(format!("initial{}", c), new_filters);
-                c += 1;
-            }
-            return;
+    match filter_state {
+        FilterState::Broken(err) => {
+            error!(
+                "Broken filter state when sending initial timeline filter: {err}"
+            );
         }
+
+        FilterState::FetchingRemote(_unisub) => {
+            error!("FetchingRemote state when sending initial timeline filter?");
+        }
+
+        FilterState::GotRemote(_sub) => {
+            error!("GotRemote state when sending initial timeline filter?");
+        }
+
+        FilterState::Ready(filter) => {
+            let filter = filter.to_owned();
+            let new_filters = filter.into_iter().map(|f| {
+                // limit the size of remote filters
+                let default_limit = filter::default_remote_limit();
+                let mut lim = f.limit().unwrap_or(default_limit);
+                let mut filter = f;
+                if lim > default_limit {
+                    lim = default_limit;
+                    filter = filter.limit_mut(lim);
+                }
+
+                let notes = damus.timelines[timeline].notes(ViewFilter::NotesAndReplies);
+
+                // Should we since optimize? Not always. For example
+                // if we only have a few notes locally. One way to
+                // determine this is by looking at the current filter
+                // and seeing what its limit is. If we have fewer
+                // notes than the limit, we might want to backfill
+                // older notes
+                if can_since_optimize && filter::should_since_optimize(lim, notes.len()) {
+                    filter = filter::since_optimize_filter(filter, notes);
+                } else {
+                    warn!("Skipping since optimization for {:?}: number of local notes is less than the limit, attempting to backfill.", filter);
+                }
+
+                filter
+            }).collect();
+
+            let sub_id = Uuid::new_v4().to_string();
+            damus
+                .subscriptions()
+                .insert(sub_id.clone(), SubKind::Initial);
+
+            damus.pool.subscribe(sub_id, new_filters);
+        }
+
+        // we need some data first
+        FilterState::NeedsRemote(filter) => {
+            let sub_id = Uuid::new_v4().to_string();
+            let uid = damus.timelines[timeline].uid;
+            let local_sub = damus.ndb.subscribe(&filter).expect("sub");
+
+            damus.timelines[timeline].filter =
+                FilterState::fetching_remote(sub_id.clone(), local_sub);
+
+            damus
+                .subscriptions()
+                .insert(sub_id.clone(), SubKind::FetchingContactList(uid));
+
+            damus.pool.subscribe(sub_id, filter.to_owned());
+        }
+    }
+}
+
+fn send_initial_filters(damus: &mut Damus, relay_url: &str) {
+    info!("Sending initial filters to {}", relay_url);
+    let timelines = damus.timelines.len();
+
+    for i in 0..timelines {
+        send_initial_timeline_filter(damus, i);
     }
 }
 
@@ -224,15 +262,22 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) -> Result<()> {
         }
     }
 
-    let txn = Transaction::new(&damus.ndb)?;
     let mut unknown_ids: HashSet<UnknownId> = HashSet::new();
     for timeline in 0..damus.timelines.len() {
         let src = TimelineSource::column(timeline);
-        if let Err(err) = src.poll_notes_into_view(damus, &txn, &mut unknown_ids) {
-            error!("{}", err);
+
+        if let Ok(true) = is_timeline_ready(damus, timeline) {
+            if let Err(err) = src.poll_notes_into_view(damus, &mut unknown_ids) {
+                error!("poll_notes_into_view: {err}");
+            }
+        } else {
+            // TODO: show loading?
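+            // (a loading indicator here could key off this timeline's
+            // FilterState, e.g. FetchingRemote, rather than a new flag)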
+        }
     }
 
+    /*
     let unknown_ids: Vec<UnknownId> = unknown_ids.into_iter().collect();
     if let Some(filters) = get_unknown_ids_filter(&unknown_ids) {
         info!(
@@ -242,25 +285,81 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) -> Result<()> {
         let msg = ClientMessage::req("unknown_ids".to_string(), filters);
         damus.pool.send(&msg);
     }
+    */
 
     Ok(())
 }
 
-#[derive(Hash, Clone, Copy, PartialEq, Eq)]
-pub enum UnknownId<'a> {
-    Pubkey(&'a [u8; 32]),
-    Id(&'a [u8; 32]),
+/// Check our timeline filter and see if we have any filter data ready.
+/// Our timelines may require additional data before they are functional. For
+/// example, when we have to fetch a contact list before we do the actual
+/// following list query.
+fn is_timeline_ready(damus: &mut Damus, timeline: usize) -> Result<bool> {
+    let sub = match &damus.timelines[timeline].filter {
+        FilterState::GotRemote(sub) => *sub,
+        FilterState::Ready(_f) => return Ok(true),
+        _ => return Ok(false),
+    };
+
+    // We got at least one eose for our filter request. Let's see
+    // if nostrdb is done processing it yet.
+    let res = damus.ndb.poll_for_notes(sub, 1);
+    if res.is_empty() {
+        debug!("is_timeline_ready: no notes found (yet?) for timeline {timeline}");
+        return Ok(false);
+    }
+
+    info!("notes found for contact timeline after GotRemote!");
+
+    let note_key = res[0];
+
+    let filter = {
+        let txn = Transaction::new(&damus.ndb).expect("txn");
+        let note = damus.ndb.get_note_by_key(&txn, note_key).expect("note");
+        filter::filter_from_tags(&note).map(|f| f.into_follow_filter())
+    };
+
+    // TODO: into_follow_filter is hardcoded to contact lists, let's generalize
+    match filter {
+        Err(Error::Filter(e)) => {
+            error!("got broken filter when building follows filter: {e}");
+            damus.timelines[timeline].filter = FilterState::broken(e);
+        }
+        Err(err) => {
+            error!("error building follows filter: {err}");
+            damus.timelines[timeline].filter = FilterState::broken(FilterError::EmptyContactList);
+            return Err(err);
+        }
+        Ok(filter) => {
+            // we just switched to the ready state, we should send initial
+            // queries and setup the local subscription
+            info!("Found contact list! Setting up local and remote contact list query");
+            setup_initial_timeline(damus, timeline, &filter).expect("setup init");
+            damus.timelines[timeline].filter = FilterState::ready(filter.clone());
+
+            let subid = Uuid::new_v4().to_string();
+            damus.pool.subscribe(subid, filter)
+        }
+    }
+
+    Ok(true)
 }
 
-impl<'a> UnknownId<'a> {
-    pub fn is_pubkey(&self) -> Option<&'a [u8; 32]> {
+#[derive(Hash, Clone, Copy, PartialEq, Eq)]
+pub enum UnknownId {
+    Pubkey([u8; 32]),
+    Id([u8; 32]),
+}
+
+impl UnknownId {
+    pub fn is_pubkey(&self) -> Option<&[u8; 32]> {
         match self {
             UnknownId::Pubkey(pk) => Some(pk),
             _ => None,
         }
     }
 
-    pub fn is_id(&self) -> Option<&'a [u8; 32]> {
+    pub fn is_id(&self) -> Option<&[u8; 32]> {
         match self {
             UnknownId::Id(id) => Some(id),
             _ => None,
@@ -282,12 +381,12 @@ pub fn get_unknown_note_ids<'a>(
     txn: &'a Transaction,
     note: &Note<'a>,
     note_key: NoteKey,
-    ids: &mut HashSet<UnknownId<'a>>,
+    ids: &mut HashSet<UnknownId>,
 ) -> Result<()> {
     // the author pubkey
 
     if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() {
-        ids.insert(UnknownId::Pubkey(note.pubkey()));
+        ids.insert(UnknownId::Pubkey(*note.pubkey()));
     }
 
     // pull notes that notes are replying to
     let note_reply = cached_note.reply.borrow(note.tags());
     if let Some(root) = note_reply.root() {
         if ndb.get_note_by_id(txn, root.id).is_err() {
-            ids.insert(UnknownId::Id(root.id));
+            ids.insert(UnknownId::Id(*root.id));
         }
     }
 
     if !note_reply.is_reply_to_root() {
         if let Some(reply) = note_reply.reply() {
             if ndb.get_note_by_id(txn, reply.id).is_err() {
-                ids.insert(UnknownId::Id(reply.id));
+                ids.insert(UnknownId::Id(*reply.id));
             }
         }
     }
@@ -317,36 +416,36 @@
             match block.as_mention().unwrap() {
                 Mention::Pubkey(npub) => {
                     if ndb.get_profile_by_pubkey(txn, npub.pubkey()).is_err() {
-                        ids.insert(UnknownId::Pubkey(npub.pubkey()));
+                        ids.insert(UnknownId::Pubkey(*npub.pubkey()));
                     }
                 }
                 Mention::Profile(nprofile) => {
                     if ndb.get_profile_by_pubkey(txn, nprofile.pubkey()).is_err() {
-                        ids.insert(UnknownId::Pubkey(nprofile.pubkey()));
+                        ids.insert(UnknownId::Pubkey(*nprofile.pubkey()));
                    }
                }
                Mention::Event(ev) => match ndb.get_note_by_id(txn, ev.id()) {
                    Err(_) => {
-                        ids.insert(UnknownId::Id(ev.id()));
+                        ids.insert(UnknownId::Id(*ev.id()));
                        if let Some(pk) = ev.pubkey() {
                            if ndb.get_profile_by_pubkey(txn, pk).is_err() {
-                                ids.insert(UnknownId::Pubkey(pk));
+                                ids.insert(UnknownId::Pubkey(*pk));
                            }
                        }
                    }
                    Ok(note) => {
                        if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() {
-                            ids.insert(UnknownId::Pubkey(note.pubkey()));
+                            ids.insert(UnknownId::Pubkey(*note.pubkey()));
                        }
                    }
                },
                Mention::Note(note) => match ndb.get_note_by_id(txn, note.id()) {
                    Err(_) => {
-                        ids.insert(UnknownId::Id(note.id()));
+                        ids.insert(UnknownId::Id(*note.id()));
                    }
                    Ok(note) => {
                        if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() {
-                            ids.insert(UnknownId::Pubkey(note.pubkey()));
+                            ids.insert(UnknownId::Pubkey(*note.pubkey()));
                        }
                    }
                },
@@ -362,48 +461,63 @@ fn setup_profiling() {
     puffin::set_scopes_on(true); // tell puffin to collect data
 }
 
+fn setup_initial_timeline(damus: &mut Damus, timeline: usize, filters: &[Filter]) -> Result<()> {
+    damus.timelines[timeline].subscription = Some(damus.ndb.subscribe(filters)?);
+    let txn = Transaction::new(&damus.ndb)?;
+    debug!(
+        "querying nostrdb sub {:?} {:?}",
+        damus.timelines[timeline].subscription, damus.timelines[timeline].filter
+    );
+    let lim = filters[0].limit().unwrap_or(crate::filter::default_limit()) as i32;
+    let results = damus.ndb.query(&txn, filters, lim)?;
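+    // each query result is run through every view's filter function below
+    // and pushed into the views that accept it (Notes vs NotesAndReplies)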
+
+    let filters = {
+        let views = &damus.timelines[timeline].views;
+        let filters: Vec<fn(&CachedNote, &Note) -> bool> =
+            views.iter().map(|v| v.filter.filter()).collect();
+        filters
+    };
+
+    for result in results {
+        for (view, filter) in filters.iter().enumerate() {
+            if filter(
+                damus
+                    .note_cache_mut()
+                    .cached_note_or_insert_mut(result.note_key, &result.note),
+                &result.note,
+            ) {
+                damus.timelines[timeline].views[view].notes.push(NoteRef {
+                    key: result.note_key,
+                    created_at: result.note.created_at(),
+                })
+            }
+        }
+    }
+
+    Ok(())
+}
+
 fn setup_initial_nostrdb_subs(damus: &mut Damus) -> Result<()> {
     let timelines = damus.timelines.len();
     for i in 0..timelines {
-        let filters = if let Some(filters) = &damus.timelines[i].filter {
-            filters.clone()
-        } else {
-            // TODO: for unloaded filters, we will need to fetch things like
-            // the contact and relay list from remote relays.
-            error!("TODO: handle unloaded filters");
-            continue;
-        };
+        let filter = damus.timelines[i].filter.clone();
+        match filter {
+            FilterState::Ready(filters) => setup_initial_timeline(damus, i, &filters)?,
 
-        damus.timelines[i].subscription = Some(damus.ndb.subscribe(filters.clone())?);
-        let txn = Transaction::new(&damus.ndb)?;
-        debug!(
-            "querying nostrdb sub {} {:?}",
-            damus.timelines[i].subscription.as_ref().unwrap().id,
-            damus.timelines[i].filter
-        );
-        let lim = filters[0].limit().unwrap_or(crate::filter::default_limit()) as i32;
-        let results = damus.ndb.query(&txn, filters.clone(), lim)?;
-
-        let filters = {
-            let views = &damus.timelines[i].views;
-            let filters: Vec<fn(&CachedNote, &Note) -> bool> =
-                views.iter().map(|v| v.filter.filter()).collect();
-            filters
-        };
-
-        for result in results {
-            for (j, filter) in filters.iter().enumerate() {
-                if filter(
-                    damus
-                        .note_cache_mut()
-                        .cached_note_or_insert_mut(result.note_key, &result.note),
-                    &result.note,
-                ) {
-                    damus.timelines[i].views[j].notes.push(NoteRef {
-                        key: result.note_key,
-                        created_at: result.note.created_at(),
-                    })
-                }
+            FilterState::Broken(err) => {
+                error!("Broken filter state in setup_initial_nostrdb_subs: {err}")
+            }
+            FilterState::FetchingRemote(_) => {
+                error!("FetchingRemote state in setup_initial_nostrdb_subs")
+            }
+            FilterState::GotRemote(_) => {
+                error!("GotRemote state in setup_initial_nostrdb_subs")
+            }
+            FilterState::NeedsRemote(_filters) => {
+                // can't do anything yet, we defer to first connect to send
+                // remote filters
             }
         }
     }
@@ -435,7 +547,7 @@ fn process_event(damus: &mut Damus, _subid: &str, event: &str) {
     }
 }
 
-fn get_unknown_ids<'a>(txn: &'a Transaction, damus: &mut Damus) -> Result<Vec<UnknownId<'a>>> {
+fn get_unknown_ids(txn: &Transaction, damus: &mut Damus) -> Result<Vec<UnknownId>> {
     #[cfg(feature = "profiling")]
     puffin::profile_function!();
 
@@ -475,11 +587,12 @@ fn get_unknown_ids
     Ok(ids.into_iter().collect())
 }
 
-fn get_unknown_ids_filter(ids: &[UnknownId<'_>]) -> Option<Vec<Filter>> {
+fn get_unknown_ids_filter(ids: &[UnknownId]) -> Option<Vec<Filter>> {
     if ids.is_empty() {
         return None;
     }
 
+    let ids = &ids[0..500.min(ids.len())];
     let mut filters: Vec<Filter> = vec![];
 
     let pks: Vec<&[u8; 32]> = ids.iter().flat_map(|id| id.is_pubkey()).collect();
@@ -498,19 +611,75 @@ fn get_unknown_ids_filter(ids: &[UnknownId<'_>]) -> Option<Vec<Filter>> {
 }
 
 fn handle_eose(damus: &mut Damus, subid: &str, relay_url: &str) -> Result<()> {
-    if subid.starts_with("initial") {
-        let txn = Transaction::new(&damus.ndb)?;
-        let ids = get_unknown_ids(&txn, damus)?;
-        if let Some(filters) = get_unknown_ids_filter(&ids) {
-            info!("Getting {} unknown ids from {}", ids.len(), relay_url);
-            let msg = ClientMessage::req("unknown_ids".to_string(), filters);
-            damus.pool.send_to(&msg,
relay_url); - } - } else if subid == "unknown_ids" { - let msg = ClientMessage::close("unknown_ids".to_string()); - damus.pool.send_to(&msg, relay_url); + let sub_kind = if let Some(sub_kind) = damus.subscriptions().get(subid) { + sub_kind } else { warn!("got unknown eose subid {}", subid); + return Ok(()); + }; + + match *sub_kind { + SubKind::Initial => { + let txn = Transaction::new(&damus.ndb)?; + let ids = get_unknown_ids(&txn, damus)?; + if let Some(filters) = get_unknown_ids_filter(&ids) { + info!("Getting {} unknown ids from {}", ids.len(), relay_url); + let sub_id = Uuid::new_v4().to_string(); + + let msg = ClientMessage::req(sub_id.clone(), filters); + // unknownids are a oneshot request + damus.subscriptions().insert(sub_id, SubKind::OneShot); + damus.pool.send_to(&msg, relay_url); + } + } + + // oneshot subs just close when they're done + SubKind::OneShot => { + let msg = ClientMessage::close(subid.to_string()); + damus.pool.send_to(&msg, relay_url); + } + + SubKind::FetchingContactList(timeline_uid) => { + let timeline_ind = if let Some(i) = damus.find_timeline(timeline_uid) { + i + } else { + error!( + "timeline uid:{} not found for FetchingContactList", + timeline_uid + ); + return Ok(()); + }; + + let local_sub = if let FilterState::FetchingRemote(unisub) = + &damus.timelines[timeline_ind].filter + { + unisub.local + } else { + // TODO: we could have multiple contact list results, we need + // to check to see if this one is newer and use that instead + warn!( + "Expected timeline to have FetchingRemote state but was {:?}", + damus.timelines[timeline_ind].filter + ); + return Ok(()); + }; + + damus.timelines[timeline_ind].filter = FilterState::got_remote(local_sub); + + /* + // see if we're fast enough to catch a processed contact list + let note_keys = damus.ndb.poll_for_notes(local_sub, 1); + if !note_keys.is_empty() { + debug!("fast! 
caught contact list from {relay_url} right away");
+                let txn = Transaction::new(&damus.ndb)?;
+                let note_key = note_keys[0];
+                let nr = damus.ndb.get_note_by_key(&txn, note_key)?;
+                let filter = filter::filter_from_tags(&nr)?.into_follow_filter();
+                setup_initial_timeline(damus, timeline, &filter)
+                damus.timelines[timeline_ind].filter = FilterState::ready(filter);
+            }
+            */
+        }
+    }
 
     Ok(())
 }
@@ -627,15 +796,18 @@ impl Damus {
             .as_ref()
             .map(|a| a.pubkey.bytes());
         let ndb = Ndb::new(&dbpath, &config).expect("ndb");
-        let timelines = parsed_args
-            .columns
-            .into_iter()
-            .map(|c| c.into_timeline(&ndb, account))
-            .collect();
+
+        let mut timelines: Vec<Timeline> = Vec::with_capacity(parsed_args.columns.len());
+        for col in parsed_args.columns {
+            if let Some(timeline) = col.into_timeline(&ndb, account) {
+                timelines.push(timeline);
+            }
+        }
 
         Self {
             pool,
             is_mobile,
+            subscriptions: Subscriptions::default(),
             since_optimize: parsed_args.since_optimize,
             threads: Threads::default(),
             drafts: Drafts::default(),
@@ -657,7 +829,10 @@ impl Damus {
     pub fn mock<P: AsRef<Path>>(data_path: P, is_mobile: bool) -> Self {
         let mut timelines: Vec<Timeline> = vec![];
         let filter = Filter::from_json(include_str!("../queries/global.json")).unwrap();
-        timelines.push(Timeline::new(ColumnKind::Universe, Some(vec![filter])));
+        timelines.push(Timeline::new(
+            ColumnKind::Universe,
+            FilterState::ready(vec![filter]),
+        ));
 
         let imgcache_dir = data_path.as_ref().join(ImageCache::rel_datadir());
         let _ = std::fs::create_dir_all(imgcache_dir.clone());
@@ -666,6 +841,7 @@ impl Damus {
         config.set_ingester_threads(2);
         Self {
             is_mobile,
+            subscriptions: Subscriptions::default(),
             since_optimize: true,
             threads: Threads::default(),
             drafts: Drafts::default(),
@@ -685,6 +861,20 @@ impl Damus {
         }
     }
 
+    pub fn find_timeline(&self, uid: u32) -> Option<usize> {
+        for (i, timeline) in self.timelines.iter().enumerate() {
+            if timeline.uid == uid {
+                return Some(i);
+            }
+        }
+
+        None
+    }
+
+    pub fn subscriptions(&mut self) -> &mut HashMap<String, SubKind> {
+        &mut self.subscriptions.subs
+    }
+
     pub fn note_cache_mut(&mut self) -> &mut NoteCache {
         &mut self.note_cache
     }
@@ -831,13 +1021,17 @@ fn thread_unsubscribe(app: &mut Damus, id: &[u8; 32]) {
     };
 
     match unsubscribe {
-        Ok(DecrementResult::LastSubscriber(sub_id)) => {
-            if let Err(e) = app.ndb.unsubscribe(sub_id) {
-                error!("failed to unsubscribe from thread: {e}, subid:{sub_id}, {} active subscriptions", app.ndb.subscription_count());
+        Ok(DecrementResult::LastSubscriber(sub)) => {
+            if let Err(e) = app.ndb.unsubscribe(sub) {
+                error!(
+                    "failed to unsubscribe from thread: {e}, subid:{}, {} active subscriptions",
+                    sub.id(),
+                    app.ndb.subscription_count()
+                );
             } else {
                 info!(
                     "Unsubscribed from thread subid:{}. {} active subscriptions",
{} active subscriptions", - sub_id, + sub.id(), app.ndb.subscription_count() ); } diff --git a/src/args.rs b/src/args.rs index 6bcd4d4..bdd39bf 100644 --- a/src/args.rs +++ b/src/args.rs @@ -1,4 +1,5 @@ use crate::column::{ColumnKind, PubkeySource}; +use crate::filter::FilterState; use crate::timeline::Timeline; use enostr::{Filter, Keypair, Pubkey, SecretKey}; use nostrdb::Ndb; @@ -176,9 +177,12 @@ pub enum ArgColumn { } impl ArgColumn { - pub fn into_timeline(self, ndb: &Ndb, user: Option<&[u8; 32]>) -> Timeline { + pub fn into_timeline(self, ndb: &Ndb, user: Option<&[u8; 32]>) -> Option { match self { - ArgColumn::Generic(filters) => Timeline::new(ColumnKind::Generic, Some(filters)), + ArgColumn::Generic(filters) => Some(Timeline::new( + ColumnKind::Generic, + FilterState::ready(filters), + )), ArgColumn::Column(ck) => ck.into_timeline(ndb, user), } } diff --git a/src/column.rs b/src/column.rs index d2fd6b9..43d86d8 100644 --- a/src/column.rs +++ b/src/column.rs @@ -1,7 +1,10 @@ +use crate::error::FilterError; +use crate::filter::FilterState; use crate::{timeline::Timeline, Error}; use enostr::Pubkey; use nostrdb::{Filter, Ndb, Transaction}; use std::fmt::Display; +use tracing::{error, warn}; #[derive(Clone, Debug)] pub enum PubkeySource { @@ -45,47 +48,48 @@ impl ColumnKind { ColumnKind::List(ListKind::Contact(pk)) } - pub fn into_timeline(self, ndb: &Ndb, default_user: Option<&[u8; 32]>) -> Timeline { + pub fn into_timeline(self, ndb: &Ndb, default_user: Option<&[u8; 32]>) -> Option { match self { - ColumnKind::Universe => Timeline::new(ColumnKind::Universe, Some(vec![])), + ColumnKind::Universe => Some(Timeline::new( + ColumnKind::Universe, + FilterState::ready(vec![]), + )), ColumnKind::Generic => { - panic!("you can't convert a ColumnKind::Generic to a Timeline") + warn!("you can't convert a ColumnKind::Generic to a Timeline"); + None } ColumnKind::List(ListKind::Contact(ref pk_src)) => { let pk = match pk_src { - PubkeySource::DeckAuthor => { - if let Some(user_pk) = default_user { - user_pk - } else { - // No user loaded, so we have to return an unloaded - // contact list columns - return Timeline::new( - ColumnKind::contact_list(PubkeySource::DeckAuthor), - None, - ); - } - } + PubkeySource::DeckAuthor => default_user?, PubkeySource::Explicit(pk) => pk.bytes(), }; let contact_filter = Filter::new().authors([pk]).kinds([3]).limit(1).build(); + let txn = Transaction::new(ndb).expect("txn"); let results = ndb - .query(&txn, vec![contact_filter], 1) + .query(&txn, &[contact_filter.clone()], 1) .expect("contact query failed?"); if results.is_empty() { - return Timeline::new(ColumnKind::contact_list(pk_src.to_owned()), None); + return Some(Timeline::new( + ColumnKind::contact_list(pk_src.to_owned()), + FilterState::needs_remote(vec![contact_filter.clone()]), + )); } match Timeline::contact_list(&results[0].note) { - Err(Error::EmptyContactList) => { - Timeline::new(ColumnKind::contact_list(pk_src.to_owned()), None) + Err(Error::Filter(FilterError::EmptyContactList)) => Some(Timeline::new( + ColumnKind::contact_list(pk_src.to_owned()), + FilterState::needs_remote(vec![contact_filter]), + )), + Err(e) => { + error!("Unexpected error: {e}"); + None } - Err(e) => panic!("Unexpected error: {e}"), - Ok(tl) => tl, + Ok(tl) => Some(tl), } } } diff --git a/src/error.rs b/src/error.rs index e377b30..5d6d0d6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,10 @@ use std::{fmt, io}; +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum FilterError { + EmptyContactList, +} + 
 #[derive(Debug, Eq, PartialEq, Copy, Clone)]
 pub enum SubscriptionError {
     //#[error("No active subscriptions")]
@@ -36,8 +41,8 @@ impl fmt::Display for SubscriptionError {
 #[derive(Debug)]
 pub enum Error {
     SubscriptionError(SubscriptionError),
+    Filter(FilterError),
     LoadFailed,
-    EmptyContactList,
     Io(io::Error),
     Nostr(enostr::Error),
     Ndb(nostrdb::Error),
@@ -45,17 +50,33 @@ pub enum Error {
     Generic(String),
 }
 
+impl Error {
+    pub fn empty_contact_list() -> Self {
+        Error::Filter(FilterError::EmptyContactList)
+    }
+}
+
+impl fmt::Display for FilterError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::EmptyContactList => {
+                write!(f, "empty contact list")
+            }
+        }
+    }
+}
+
 impl fmt::Display for Error {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
-            Self::SubscriptionError(sub_err) => {
-                write!(f, "{sub_err}")
+            Self::SubscriptionError(e) => {
+                write!(f, "{e}")
             }
             Self::LoadFailed => {
                 write!(f, "load failed")
             }
-            Self::EmptyContactList => {
-                write!(f, "empty contact list")
+            Self::Filter(e) => {
+                write!(f, "{e}")
             }
             Self::Nostr(e) => write!(f, "{e}"),
             Self::Ndb(e) => write!(f, "{e}"),
@@ -95,3 +116,9 @@ impl From<io::Error> for Error {
         Error::Io(err)
     }
 }
+
+impl From<FilterError> for Error {
+    fn from(err: FilterError) -> Self {
+        Error::Filter(err)
+    }
+}
diff --git a/src/filter.rs b/src/filter.rs
index a2f49d7..1337c79 100644
--- a/src/filter.rs
+++ b/src/filter.rs
@@ -1,8 +1,67 @@
+use crate::error::{Error, FilterError};
 use crate::note::NoteRef;
-use crate::{Error, Result};
-use nostrdb::{Filter, FilterBuilder, Note};
+use crate::Result;
+use nostrdb::{Filter, FilterBuilder, Note, Subscription};
 use tracing::{debug, warn};
 
+/// A unified subscription has a local and remote component. The remote subid
+/// tracks data received remotely, and the local subscription tracks that data
+/// as nostrdb processes it.
+#[derive(Debug, Clone)]
+pub struct UnifiedSubscription {
+    pub local: Subscription,
+    pub remote: String,
+}
+
+/// We may need to fetch some data from relays before our filter is ready.
+/// [`FilterState`] tracks this.
+#[derive(Debug, Clone)]
+pub enum FilterState {
+    NeedsRemote(Vec<Filter>),
+    FetchingRemote(UnifiedSubscription),
+    GotRemote(Subscription),
+    Ready(Vec<Filter>),
+    Broken(FilterError),
+}
+
+impl FilterState {
+    /// We tried to fetch a filter but we either got no data or the data
+    /// was corrupted, preventing us from getting to the Ready state.
+    /// Just mark the timeline as broken so that we can signal to the
+    /// user that something went wrong
+    pub fn broken(reason: FilterError) -> Self {
+        Self::Broken(reason)
+    }
+
+    /// The filter is ready
+    pub fn ready(filter: Vec<Filter>) -> Self {
+        Self::Ready(filter)
+    }
+
+    /// We need some data from relays before we can continue. Example:
+    /// for home timelines where we don't have a contact list yet. We
+    /// need to fetch the contact list before we have the right timeline
+    /// filter.
+    pub fn needs_remote(filter: Vec<Filter>) -> Self {
+        Self::NeedsRemote(filter)
+    }
+
+    /// We got the remote data. Local data should be available to build
+    /// the filter for the [`FilterState::Ready`] state
+    pub fn got_remote(local_sub: Subscription) -> Self {
+        Self::GotRemote(local_sub)
+    }
+
+    /// We have sent off a remote subscription to get data needed for the
+    /// filter. The string is the subscription id
+    pub fn fetching_remote(sub_id: String, local_sub: Subscription) -> Self {
+        let unified_sub = UnifiedSubscription {
+            local: local_sub,
+            remote: sub_id,
+        };
+        Self::FetchingRemote(unified_sub)
+    }
+}
+
 pub fn should_since_optimize(limit: u64, num_notes: usize) -> bool {
     // rough heuristic for bailing since optimization if we don't have enough notes
     limit as usize <= num_notes
@@ -39,6 +98,10 @@ pub struct FilteredTags {
 }
 
 impl FilteredTags {
+    pub fn into_follow_filter(self) -> Vec<Filter> {
+        self.into_filter([1])
+    }
+
     // TODO: make this more general
     pub fn into_filter<I>(self, kinds: I) -> Vec<Filter>
     where
@@ -110,7 +173,7 @@ pub fn filter_from_tags(note: &Note) -> Result<FilteredTags> {
 
     if author_count == 0 && hashtag_count == 0 {
         warn!("no authors or hashtags found in contact list");
-        return Err(Error::EmptyContactList);
+        return Err(Error::empty_contact_list());
     }
 
     debug!(
diff --git a/src/lib.rs b/src/lib.rs
index 7894fee..780a1e4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -28,6 +28,7 @@ mod profile;
 pub mod relay_pool_manager;
 mod result;
 mod route;
+mod subscriptions;
 mod test_data;
 mod thread;
 mod time;
diff --git a/src/subscriptions.rs b/src/subscriptions.rs
new file mode 100644
index 0000000..df71cc0
--- /dev/null
+++ b/src/subscriptions.rs
@@ -0,0 +1,23 @@
+use std::collections::HashMap;
+
+pub enum SubKind {
+    /// Initial subscription. This is the first time we do a remote subscription
+    /// for a timeline
+    Initial,
+
+    /// One-shot requests; we can just close after we receive EOSE
+    OneShot,
+
+    /// We are fetching a contact list so that we can use it for our follows
+    /// filter.
+    // TODO: generalize this to any list?
+    FetchingContactList(u32),
+}
+
+/// Subscriptions that need to be tracked at various stages. Sometimes we
+/// need to do A, then B, then C. Tracking requests at various stages by
+/// mapping uuid subids to explicit states happens here.
+#[derive(Default)]
+pub struct Subscriptions {
+    pub subs: HashMap<String, SubKind>,
+}
diff --git a/src/thread.rs b/src/thread.rs
index 2eca410..0dcdf71 100644
--- a/src/thread.rs
+++ b/src/thread.rs
@@ -16,7 +16,7 @@ pub struct Thread {
 
 #[derive(Debug, Eq, PartialEq, Copy, Clone)]
 pub enum DecrementResult {
-    LastSubscriber(u64),
+    LastSubscriber(Subscription),
     ActiveSubscribers,
 }
 
@@ -54,7 +54,7 @@ impl Thread {
         let last_note = notes[0];
         let filters = Thread::filters_since(root_id, last_note.created_at + 1);
 
-        if let Ok(results) = ndb.query(txn, filters, 1000) {
+        if let Ok(results) = ndb.query(txn, &filters, 1000) {
             debug!("got {} results from thread update", results.len());
             results
                 .into_iter()
@@ -72,7 +72,7 @@ impl Thread {
         match self.subscribers.cmp(&0) {
             Ordering::Equal => {
                 if let Some(sub) = self.subscription() {
-                    Ok(DecrementResult::LastSubscriber(sub.id))
+                    Ok(DecrementResult::LastSubscriber(sub))
                 } else {
                     Err(Error::no_active_sub())
                 }
@@ -82,8 +82,8 @@ impl Thread {
         }
     }
 
-    pub fn subscription(&self) -> Option<&Subscription> {
-        self.sub.as_ref()
+    pub fn subscription(&self) -> Option<Subscription> {
+        self.sub
     }
 
     pub fn remote_subscription(&self) -> &Option<String> {
@@ -171,7 +171,7 @@ impl Threads {
         // we don't have the thread, query for it!
        let filters = Thread::filters(root_id);
-        let notes = if let Ok(results) = ndb.query(txn, filters, 1000) {
+        let notes = if let Ok(results) = ndb.query(txn, &filters, 1000) {
             results
                 .into_iter()
                 .map(NoteRef::from_query_result)
diff --git a/src/timeline.rs b/src/timeline.rs
index a8a87a8..e1d641a 100644
--- a/src/timeline.rs
+++ b/src/timeline.rs
@@ -1,16 +1,17 @@
 use crate::app::{get_unknown_note_ids, UnknownId};
 use crate::column::{ColumnKind, PubkeySource};
 use crate::error::Error;
-use crate::filter;
 use crate::note::NoteRef;
 use crate::notecache::CachedNote;
+use crate::{filter, filter::FilterState};
 use crate::{Damus, Result};
+use std::sync::atomic::{AtomicU32, Ordering};
 
 use crate::route::Route;
 use egui_virtual_list::VirtualList;
 use enostr::Pubkey;
-use nostrdb::{Filter, Note, Subscription, Transaction};
+use nostrdb::{Note, Subscription, Transaction};
 use std::cell::RefCell;
 use std::collections::HashSet;
 use std::rc::Rc;
@@ -50,9 +51,9 @@ impl<'a> TimelineSource<'a> {
         }
     }
 
-    pub fn sub<'b>(self, app: &'b mut Damus, txn: &Transaction) -> Option<&'b Subscription> {
+    pub fn sub(self, app: &mut Damus, txn: &Transaction) -> Option<Subscription> {
         match self {
-            TimelineSource::Column { ind, .. } => app.timelines[ind].subscription.as_ref(),
+            TimelineSource::Column { ind, .. } => app.timelines[ind].subscription,
 
             TimelineSource::Thread(root_id) => {
                 // TODO: replace all this with the raw entry api eventually
@@ -67,34 +68,34 @@ impl<'a> TimelineSource<'a> {
         }
     }
 
+    /// Check local subscriptions for new notes and insert them into
+    /// timelines (threads, columns)
     pub fn poll_notes_into_view(
         &self,
         app: &mut Damus,
-        txn: &'a Transaction,
-        ids: &mut HashSet<UnknownId<'a>>,
+        ids: &mut HashSet<UnknownId>,
     ) -> Result<()> {
-        let sub_id = if let Some(sub_id) = self.sub(app, txn).map(|s| s.id) {
-            sub_id
-        } else {
-            return Err(Error::no_active_sub());
+        let sub = {
+            let txn = Transaction::new(&app.ndb).expect("txn");
+            if let Some(sub) = self.sub(app, &txn) {
+                sub
+            } else {
+                return Err(Error::no_active_sub());
+            }
         };
 
-        //
-        // TODO(BUG!): poll for these before the txn, otherwise we can hit
-        // a race condition where we hit the "no note??" expect below. This may
-        // require some refactoring due to the missing ids logic
-        //
-        let new_note_ids = app.ndb.poll_for_notes(sub_id, 100);
+        let new_note_ids = app.ndb.poll_for_notes(sub, 100);
         if new_note_ids.is_empty() {
             return Ok(());
         } else {
            debug!("{} new notes! {:?}", new_note_ids.len(), new_note_ids);
{:?}", new_note_ids.len(), new_note_ids); } + let txn = Transaction::new(&app.ndb).expect("txn"); let mut new_refs: Vec<(Note, NoteRef)> = Vec::with_capacity(new_note_ids.len()); for key in new_note_ids { - let note = if let Ok(note) = app.ndb.get_note_by_key(txn, key) { + let note = if let Ok(note) = app.ndb.get_note_by_key(&txn, key) { note } else { error!("hit race condition in poll_notes_into_view: https://github.com/damus-io/nostrdb/issues/35 note {:?} was not added to timeline", key); @@ -105,7 +106,7 @@ impl<'a> TimelineSource<'a> { .note_cache_mut() .cached_note_or_insert(key, ¬e) .clone(); - let _ = get_unknown_note_ids(&app.ndb, &cached_note, txn, ¬e, key, ids); + let _ = get_unknown_note_ids(&app.ndb, &cached_note, &txn, ¬e, key, ids); let created_at = note.created_at(); new_refs.push((note, NoteRef { key, created_at })); @@ -123,7 +124,7 @@ impl<'a> TimelineSource<'a> { let refs: Vec = new_refs.iter().map(|(_note, nr)| *nr).collect(); let reversed = false; - self.view(app, txn, ViewFilter::NotesAndReplies) + self.view(app, &txn, ViewFilter::NotesAndReplies) .insert(&refs, reversed); } @@ -142,7 +143,7 @@ impl<'a> TimelineSource<'a> { } } - self.view(app, txn, ViewFilter::Notes) + self.view(app, &txn, ViewFilter::Notes) .insert(&filtered_refs, reversed); } @@ -277,10 +278,11 @@ impl TimelineTab { /// A column in a deck. Holds navigation state, loaded notes, column kind, etc. pub struct Timeline { + pub uid: u32, pub kind: ColumnKind, // We may not have the filter loaded yet, so let's make it an option so // that codepaths have to explicitly handle it - pub filter: Option>, + pub filter: FilterState, pub views: Vec, pub selected_view: i32, pub routes: Vec, @@ -294,16 +296,19 @@ pub struct Timeline { impl Timeline { /// Create a timeline from a contact list pub fn contact_list(contact_list: &Note) -> Result { - let filter = filter::filter_from_tags(contact_list)?.into_filter([1]); + let filter = filter::filter_from_tags(contact_list)?.into_follow_filter(); let pk_src = PubkeySource::Explicit(Pubkey::new(contact_list.pubkey())); Ok(Timeline::new( ColumnKind::contact_list(pk_src), - Some(filter), + FilterState::ready(filter), )) } - pub fn new(kind: ColumnKind, filter: Option>) -> Self { + pub fn new(kind: ColumnKind, filter: FilterState) -> Self { + // global unique id for all new timelines + static UIDS: AtomicU32 = AtomicU32::new(0); + let subscription: Option = None; let notes = TimelineTab::new(ViewFilter::Notes); let replies = TimelineTab::new(ViewFilter::NotesAndReplies); @@ -312,8 +317,10 @@ impl Timeline { let routes = vec![Route::Timeline(format!("{}", kind))]; let navigating = false; let returning = false; + let uid = UIDS.fetch_add(1, Ordering::Relaxed); Timeline { + uid, kind, navigating, returning, diff --git a/src/ui/profile/picture.rs b/src/ui/profile/picture.rs index b34b55d..69fc72d 100644 --- a/src/ui/profile/picture.rs +++ b/src/ui/profile/picture.rs @@ -144,7 +144,7 @@ mod preview { let mut pks = HashSet::new(); let mut keys = HashSet::new(); - for query_result in ndb.query(&txn, filters, 2000).unwrap() { + for query_result in ndb.query(&txn, &filters, 2000).unwrap() { pks.insert(query_result.note.pubkey()); } diff --git a/src/ui/thread.rs b/src/ui/thread.rs index 7ea0d56..b904586 100644 --- a/src/ui/thread.rs +++ b/src/ui/thread.rs @@ -75,7 +75,7 @@ impl<'a> ThreadView<'a> { { let mut ids = HashSet::new(); let _ = TimelineSource::Thread(root_id) - .poll_notes_into_view(self.app, &txn, &mut ids); + .poll_notes_into_view(self.app, &mut ids); // TODO: do something 
with unknown ids }
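---

For reference, the FilterState flow this patch introduces, sketched below as a
standalone state machine. This is illustrative only: the String filters and the
u64 subscription id are stand-ins for nostrdb's Filter and Subscription types,
and the advance() driver is hypothetical; the real transitions live in
send_initial_timeline_filter, handle_eose, and is_timeline_ready above.

// Simplified mirror of FilterState from src/filter.rs (stand-in types).
#[derive(Debug, Clone)]
enum FilterState {
    NeedsRemote(Vec<String>),     // must fetch something (e.g. a contact list) first
    FetchingRemote { remote_subid: String, local_sub: u64 },
    GotRemote { local_sub: u64 }, // got EOSE; waiting for local nostrdb ingestion
    Ready(Vec<String>),           // full follows filter built from the contact list
    Broken(&'static str),         // e.g. empty contact list
}

// Hypothetical driver showing the intended transitions.
fn advance(state: FilterState, got_eose: bool, ingested: Option<Vec<String>>) -> FilterState {
    match state {
        // first relay connect: send the remote fetch and open a local ndb sub
        FilterState::NeedsRemote(_pre) => FilterState::FetchingRemote {
            remote_subid: "uuid-subid".to_string(),
            local_sub: 1,
        },
        // a relay sent EOSE for our fetch: start polling nostrdb locally
        FilterState::FetchingRemote { local_sub, .. } if got_eose => {
            FilterState::GotRemote { local_sub }
        }
        // nostrdb finished ingesting the contact list: build the follows filter
        FilterState::GotRemote { .. } => match ingested {
            Some(follows) => FilterState::Ready(follows),
            None => FilterState::Broken("empty contact list"),
        },
        other => other,
    }
}

fn main() {
    let mut st = FilterState::NeedsRemote(vec!["kinds:[3] authors:[me] limit:1".into()]);
    st = advance(st, false, None); // -> FetchingRemote
    st = advance(st, true, None);  // -> GotRemote
    st = advance(st, true, Some(vec!["kinds:[1] authors:[follows]".into()])); // -> Ready
    println!("{st:?}");
}

In the patch itself, the GotRemote hop happens per-timeline in handle_eose, and
is_timeline_ready performs the final GotRemote -> Ready transition once
poll_for_notes returns the processed contact list note.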