From 593df9145bf61d2adc5d716b797e1eb9a26e9cc1 Mon Sep 17 00:00:00 2001 From: William Casarin Date: Mon, 29 Jul 2024 10:48:16 -0500 Subject: [PATCH] threads: check for new notes locally when thread is re-opened We have a NoteRef cache for threads in memory, which is just a list of NoteKeys and timestamps. When reopening a thread, query the local DB to see if there are any new notes that we might have missed because we weren't actively subscribed to them. Signed-off-by: William Casarin --- src/actionbar.rs | 124 ++++++++++++++++++++++++++++++--------------- src/app.rs | 11 ++-- src/thread.rs | 58 ++++++++++++++------- src/timeline.rs | 7 ++- src/ui/thread.rs | 12 ++++- src/ui/timeline.rs | 18 ++++++- 6 files changed, 160 insertions(+), 70 deletions(-) diff --git a/src/actionbar.rs b/src/actionbar.rs index 0863fdb..576c50f 100644 --- a/src/actionbar.rs +++ b/src/actionbar.rs @@ -1,4 +1,9 @@ -use crate::{route::Route, thread::Thread, Damus}; +use crate::{ + note::NoteRef, + route::Route, + thread::{Thread, ThreadResult}, + Damus, +}; use enostr::NoteId; use nostrdb::Transaction; use tracing::{info, warn}; @@ -9,6 +14,79 @@ pub enum BarAction { OpenThread, } +pub enum BarResult { + NewThreadNotes(Vec), +} + +/// open_thread is called when a note is selected and we need to navigate +/// to a thread It is responsible for managing the subscription and +/// making sure the thread is up to date. In a sense, it's a model for +/// the thread view. We don't have a concept of model/view/controller etc +/// in egui, but this is the closest thing to that. +fn open_thread( + app: &mut Damus, + txn: &Transaction, + timeline: usize, + selected_note: &[u8; 32], +) -> Option { + { + let timeline = &mut app.timelines[timeline]; + timeline + .routes + .push(Route::Thread(NoteId::new(selected_note.to_owned()))); + timeline.navigating = true; + } + + let root_id = crate::note::root_note_id_from_selected_id(app, txn, selected_note); + let thread_res = app.threads.thread_mut(&app.ndb, txn, root_id); + + // The thread is stale, let's update it + let (thread, result) = match thread_res { + ThreadResult::Stale(thread) => { + let notes = Thread::new_notes(&thread.view.notes, root_id, txn, &app.ndb); + // + // we can't insert and update the VirtualList now, because we + // are already borrowing it mutably. Let's pass it as a + // result instead + // + // thread.view.insert(¬es); + (thread, Some(BarResult::NewThreadNotes(notes))) + } + + ThreadResult::Fresh(thread) => (thread, None), + }; + + // only start a subscription on nav and if we don't have + // an active subscription for this thread. + if thread.subscription().is_none() { + *thread.subscription_mut() = app.ndb.subscribe(Thread::filters(root_id)).ok(); + + match thread.subscription() { + Some(_sub) => { + thread.subscribers += 1; + info!( + "Locally subscribing to thread. {} total active subscriptions, {} on this thread", + app.ndb.subscription_count(), + thread.subscribers, + ); + } + None => warn!( + "Error subscribing locally to selected note '{}''s thread", + hex::encode(selected_note) + ), + } + } else { + thread.subscribers += 1; + info!( + "Re-using existing thread subscription. {} total active subscriptions, {} on this thread", + app.ndb.subscription_count(), + thread.subscribers, + ) + } + + result +} + impl BarAction { pub fn execute( self, @@ -16,7 +94,7 @@ impl BarAction { timeline: usize, replying_to: &[u8; 32], txn: &Transaction, - ) { + ) -> Option { match self { BarAction::Reply => { let timeline = &mut app.timelines[timeline]; @@ -24,48 +102,10 @@ impl BarAction { .routes .push(Route::Reply(NoteId::new(replying_to.to_owned()))); timeline.navigating = true; + None } - BarAction::OpenThread => { - { - let timeline = &mut app.timelines[timeline]; - timeline - .routes - .push(Route::Thread(NoteId::new(replying_to.to_owned()))); - timeline.navigating = true; - } - - let root_id = crate::note::root_note_id_from_selected_id(app, txn, replying_to); - let thread = app.threads.thread_mut(&app.ndb, txn, root_id); - - // only start a subscription on nav and if we don't have - // an active subscription for this thread. - if thread.subscription().is_none() { - *thread.subscription_mut() = app.ndb.subscribe(Thread::filters(root_id)).ok(); - - match thread.subscription() { - Some(_sub) => { - thread.subscribers += 1; - info!( - "Locally subscribing to thread. {} total active subscriptions, {} on this thread", - app.ndb.subscription_count(), - thread.subscribers, - ); - } - None => warn!( - "Error subscribing locally to selected note '{}''s thread", - hex::encode(replying_to) - ), - } - } else { - thread.subscribers += 1; - info!( - "Re-using existing thread subscription. {} total active subscriptions, {} on this thread", - app.ndb.subscription_count(), - thread.subscribers, - ) - } - } + BarAction::OpenThread => open_thread(app, txn, timeline, replying_to), } } } diff --git a/src/app.rs b/src/app.rs index 5de6df2..3f270b2 100644 --- a/src/app.rs +++ b/src/app.rs @@ -887,9 +887,14 @@ fn thread_unsubscribe(app: &mut Damus, id: &[u8; 32]) { debug!("thread unsubbing from root_id {}", hex::encode(root_id)); - app.threads - .thread_mut(&app.ndb, &txn, root_id) - .decrement_sub() + let thread = app.threads.thread_mut(&app.ndb, &txn, root_id).get_ptr(); + let unsub = thread.decrement_sub(); + + if let Ok(DecrementResult::LastSubscriber(_subid)) = unsub { + *thread.subscription_mut() = None; + } + + unsub }; match unsubscribe { diff --git a/src/thread.rs b/src/thread.rs index 067e6cd..17c9371 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -2,6 +2,7 @@ use crate::note::NoteRef; use crate::timeline::{TimelineTab, ViewFilter}; use crate::Error; use nostrdb::{Filter, Ndb, Subscription, Transaction}; +use std::cmp::Ordering; use std::collections::HashMap; use tracing::debug; @@ -51,15 +52,13 @@ impl Thread { let filters = Thread::filters_since(root_id, last_note.created_at - 60); if let Ok(results) = ndb.query(txn, filters, 1000) { + debug!("got {} results from thread update", results.len()); results .into_iter() .map(NoteRef::from_query_result) .collect() } else { - debug!( - "got no results from thread update for {}", - hex::encode(root_id) - ); + debug!("got no results from thread update",); vec![] } } @@ -67,17 +66,17 @@ impl Thread { pub fn decrement_sub(&mut self) -> Result { debug!("decrementing sub {:?}", self.subscription().map(|s| s.id)); self.subscribers -= 1; - if self.subscribers == 0 { - // unsub from thread - if let Some(sub) = self.subscription() { - Ok(DecrementResult::LastSubscriber(sub.id)) - } else { - Err(Error::no_active_sub()) + + match self.subscribers.cmp(&0) { + Ordering::Equal => { + if let Some(sub) = self.subscription() { + Ok(DecrementResult::LastSubscriber(sub.id)) + } else { + Err(Error::no_active_sub()) + } } - } else if self.subscribers < 0 { - Err(Error::unexpected_sub_count(self.subscribers)) - } else { - Ok(DecrementResult::ActiveSubscribers) + Ordering::Less => Err(Error::unexpected_sub_count(self.subscribers)), + Ordering::Greater => Ok(DecrementResult::ActiveSubscribers), } } @@ -121,6 +120,27 @@ pub struct Threads { pub root_id_to_thread: HashMap<[u8; 32], Thread>, } +pub enum ThreadResult<'a> { + Fresh(&'a mut Thread), + Stale(&'a mut Thread), +} + +impl<'a> ThreadResult<'a> { + pub fn get_ptr(self) -> &'a mut Thread { + match self { + Self::Fresh(ptr) => ptr, + Self::Stale(ptr) => ptr, + } + } + + pub fn is_stale(&self) -> bool { + match self { + Self::Fresh(_ptr) => false, + Self::Stale(_ptr) => true, + } + } +} + impl Threads { pub fn thread_expected_mut(&mut self, root_id: &[u8; 32]) -> &mut Thread { self.root_id_to_thread @@ -129,17 +149,17 @@ impl Threads { } pub fn thread_mut<'a>( - &mut self, + &'a mut self, ndb: &Ndb, txn: &Transaction, root_id: &[u8; 32], - ) -> &mut Thread { + ) -> ThreadResult<'a> { // we can't use the naive hashmap entry API here because lookups // require a copy, wait until we have a raw entry api. We could // also use hashbrown? if self.root_id_to_thread.contains_key(root_id) { - return self.root_id_to_thread.get_mut(root_id).unwrap(); + return ThreadResult::Stale(self.root_id_to_thread.get_mut(root_id).unwrap()); } // looks like we don't have this thread yet, populate it @@ -150,7 +170,7 @@ impl Threads { debug!("couldnt find root note root_id:{}", hex::encode(root_id)); self.root_id_to_thread .insert(root_id.to_owned(), Thread::new(vec![])); - return self.root_id_to_thread.get_mut(root_id).unwrap(); + return ThreadResult::Fresh(self.root_id_to_thread.get_mut(root_id).unwrap()); }; // we don't have the thread, query for it! @@ -172,7 +192,7 @@ impl Threads { debug!("found thread with {} notes", notes.len()); self.root_id_to_thread .insert(root_id.to_owned(), Thread::new(notes)); - self.root_id_to_thread.get_mut(root_id).unwrap() + ThreadResult::Fresh(self.root_id_to_thread.get_mut(root_id).unwrap()) } //fn thread_by_id(&self, ndb: &Ndb, id: &[u8; 32]) -> &mut Thread { diff --git a/src/timeline.rs b/src/timeline.rs index 10c023e..d3a4ca4 100644 --- a/src/timeline.rs +++ b/src/timeline.rs @@ -40,7 +40,7 @@ impl<'a> TimelineSource<'a> { let thread = if app.threads.root_id_to_thread.contains_key(root_id) { app.threads.thread_expected_mut(root_id) } else { - app.threads.thread_mut(&app.ndb, txn, root_id) + app.threads.thread_mut(&app.ndb, txn, root_id).get_ptr() }; &mut thread.view @@ -57,7 +57,7 @@ impl<'a> TimelineSource<'a> { let thread = if app.threads.root_id_to_thread.contains_key(root_id) { app.threads.thread_expected_mut(root_id) } else { - app.threads.thread_mut(&app.ndb, txn, root_id) + app.threads.thread_mut(&app.ndb, txn, root_id).get_ptr() }; thread.subscription() @@ -213,6 +213,9 @@ impl TimelineTab { } pub fn insert(&mut self, new_refs: &[NoteRef]) { + if new_refs.is_empty() { + return; + } let num_prev_items = self.notes.len(); let (notes, merge_kind) = crate::timeline::merge_sorted_vecs(&self.notes, new_refs); diff --git a/src/ui/thread.rs b/src/ui/thread.rs index 8161f71..f1414e2 100644 --- a/src/ui/thread.rs +++ b/src/ui/thread.rs @@ -79,7 +79,11 @@ impl<'a> ThreadView<'a> { } let (len, list) = { - let thread = self.app.threads.thread_mut(&self.app.ndb, &txn, root_id); + let thread = self + .app + .threads + .thread_mut(&self.app.ndb, &txn, root_id) + .get_ptr(); let len = thread.view.notes.len(); (len, &mut thread.view.list) @@ -92,7 +96,11 @@ impl<'a> ThreadView<'a> { ui.spacing_mut().item_spacing.x = 4.0; let note_key = { - let thread = self.app.threads.thread_mut(&self.app.ndb, &txn, root_id); + let thread = self + .app + .threads + .thread_mut(&self.app.ndb, &txn, root_id) + .get_ptr(); thread.view.notes[start_index].key }; diff --git a/src/ui/timeline.rs b/src/ui/timeline.rs index 1f0355a..a6af4d8 100644 --- a/src/ui/timeline.rs +++ b/src/ui/timeline.rs @@ -1,4 +1,4 @@ -use crate::{draft::DraftSource, ui, ui::note::PostAction, Damus}; +use crate::{actionbar::BarResult, draft::DraftSource, ui, ui::note::PostAction, Damus}; use egui::containers::scroll_area::ScrollBarVisibility; use egui::{Direction, Layout}; use egui_tabs::TabColor; @@ -56,6 +56,7 @@ fn timeline_ui(ui: &mut egui::Ui, app: &mut Damus, timeline: usize, reversed: bo .show(ui, |ui| { let view = app.timelines[timeline].current_view(); let len = view.notes.len(); + let mut bar_result: Option = None; view.list .clone() .borrow_mut() @@ -92,7 +93,10 @@ fn timeline_ui(ui: &mut egui::Ui, app: &mut Damus, timeline: usize, reversed: bo .show(ui); if let Some(action) = resp.action { - action.execute(app, timeline, note.id(), &txn); + let br = action.execute(app, timeline, note.id(), &txn); + if br.is_some() { + bar_result = br; + } } else if resp.response.clicked() { debug!("clicked note"); } @@ -103,6 +107,16 @@ fn timeline_ui(ui: &mut egui::Ui, app: &mut Damus, timeline: usize, reversed: bo 1 }); + + if let Some(br) = bar_result { + match br { + // update the thread for next render if we have new notes + BarResult::NewThreadNotes(notes) => { + let view = app.timelines[timeline].current_view_mut(); + view.insert(¬es); + } + } + } }); }