diff --git a/src/actionbar.rs b/src/actionbar.rs index ca452b6..ad7e211 100644 --- a/src/actionbar.rs +++ b/src/actionbar.rs @@ -1,4 +1,5 @@ use crate::{ + multi_subscriber::MultiSubscriber, note::NoteRef, notecache::NoteCache, route::{Route, Router}, @@ -6,8 +7,6 @@ use crate::{ }; use enostr::{NoteId, RelayPool}; use nostrdb::{Ndb, Transaction}; -use tracing::{error, info}; -use uuid::Uuid; #[derive(Debug, Eq, PartialEq, Copy, Clone)] pub enum BarAction { @@ -69,42 +68,15 @@ fn open_thread( ThreadResult::Fresh(thread) => (thread, None), }; - // only start a subscription on nav and if we don't have - // an active subscription for this thread. - if thread.subscription().is_none() { - let filters = Thread::filters(root_id); - *thread.subscription_mut() = ndb.subscribe(&filters).ok(); - - if thread.remote_subscription().is_some() { - error!("Found active remote subscription when it was not expected"); - } else { - let subid = Uuid::new_v4().to_string(); - *thread.remote_subscription_mut() = Some(subid.clone()); - pool.subscribe(subid, filters); - } - - match thread.subscription() { - Some(_sub) => { - thread.subscribers += 1; - info!( - "Locally/remotely subscribing to thread. {} total active subscriptions, {} on this thread", - ndb.subscription_count(), - thread.subscribers, - ); - } - None => error!( - "Error subscribing locally to selected note '{}''s thread", - hex::encode(selected_note) - ), - } + let multi_subscriber = if let Some(multi_subscriber) = &mut thread.multi_subscriber { + multi_subscriber } else { - thread.subscribers += 1; - info!( - "Re-using existing thread subscription. {} total active subscriptions, {} on this thread", - ndb.subscription_count(), - thread.subscribers, - ) - } + let filters = Thread::filters(root_id); + thread.multi_subscriber = Some(MultiSubscriber::new(filters)); + thread.multi_subscriber.as_mut().unwrap() + }; + + multi_subscriber.subscribe(ndb, pool); result } diff --git a/src/multi_subscriber.rs b/src/multi_subscriber.rs index 8379e44..301dc7c 100644 --- a/src/multi_subscriber.rs +++ b/src/multi_subscriber.rs @@ -1,9 +1,9 @@ use enostr::{Filter, RelayPool}; -use nostrdb::Ndb; -use tracing::{error, info}; +use nostrdb::{Ndb, Note, Transaction}; +use tracing::{debug, error, info}; use uuid::Uuid; -use crate::filter::UnifiedSubscription; +use crate::{filter::UnifiedSubscription, note::NoteRef, Error}; pub struct MultiSubscriber { filters: Vec, @@ -104,4 +104,30 @@ impl MultiSubscriber { ) } } + + pub fn poll_for_notes(&mut self, ndb: &Ndb, txn: &Transaction) -> Result, Error> { + let sub = self.sub.as_ref().ok_or(Error::no_active_sub())?; + let new_note_keys = ndb.poll_for_notes(sub.local, 500); + + if new_note_keys.is_empty() { + return Ok(vec![]); + } else { + debug!("{} new notes! {:?}", new_note_keys.len(), new_note_keys); + } + + let mut notes: Vec> = Vec::with_capacity(new_note_keys.len()); + for key in new_note_keys { + let note = if let Ok(note) = ndb.get_note_by_key(txn, key) { + note + } else { + continue; + }; + + notes.push(note); + } + + let note_refs: Vec = notes.iter().map(|n| NoteRef::from_note(n)).collect(); + + Ok(note_refs) + } } diff --git a/src/thread.rs b/src/thread.rs index baa747a..b368075 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -1,27 +1,19 @@ use crate::{ + multi_subscriber::MultiSubscriber, note::NoteRef, notecache::NoteCache, timeline::{TimelineTab, ViewFilter}, Error, Result, }; use enostr::RelayPool; -use nostrdb::{Filter, FilterBuilder, Ndb, Note, Subscription, Transaction}; -use std::cmp::Ordering; +use nostrdb::{Filter, FilterBuilder, Ndb, Transaction}; use std::collections::HashMap; -use tracing::{debug, error, info, warn}; +use tracing::{debug, warn}; #[derive(Default)] pub struct Thread { view: TimelineTab, - sub: Option, - remote_sub: Option, - pub subscribers: i32, -} - -#[derive(Debug, Eq, PartialEq, Copy, Clone)] -pub enum DecrementResult { - LastSubscriber(Subscription), - ActiveSubscribers, + pub multi_subscriber: Option, } impl Thread { @@ -32,15 +24,10 @@ impl Thread { } let mut view = TimelineTab::new_with_capacity(ViewFilter::NotesAndReplies, cap); view.notes = notes; - let sub: Option = None; - let remote_sub: Option = None; - let subscribers: i32 = 0; Thread { view, - sub, - remote_sub, - subscribers, + multi_subscriber: None, } } @@ -53,37 +40,18 @@ impl Thread { } #[must_use = "UnknownIds::update_from_note_refs should be used on this result"] - pub fn poll_notes_into_view<'a>( - &mut self, - txn: &'a Transaction, - ndb: &Ndb, - ) -> Result>> { - let sub = self.subscription().expect("thread subscription"); - let new_note_keys = ndb.poll_for_notes(sub, 500); - if new_note_keys.is_empty() { - return Ok(vec![]); - } else { - debug!("{} new notes! {:?}", new_note_keys.len(), new_note_keys); - } - - let mut notes: Vec> = Vec::with_capacity(new_note_keys.len()); - for key in new_note_keys { - let note = if let Ok(note) = ndb.get_note_by_key(txn, key) { - note - } else { - continue; - }; - - notes.push(note); - } - - { + pub fn poll_notes_into_view(&mut self, txn: &Transaction, ndb: &Ndb) -> Result<()> { + if let Some(multi_subscriber) = &mut self.multi_subscriber { let reversed = true; - let note_refs: Vec = notes.iter().map(|n| NoteRef::from_note(n)).collect(); + let note_refs: Vec = multi_subscriber.poll_for_notes(ndb, txn)?; self.view.insert(¬e_refs, reversed); + } else { + return Err(Error::Generic( + "Thread unexpectedly has no MultiSubscriber".to_owned(), + )); } - Ok(notes) + Ok(()) } /// Look for new thread notes since our last fetch @@ -112,38 +80,6 @@ impl Thread { } } - pub fn decrement_sub(&mut self) -> Result { - self.subscribers -= 1; - - match self.subscribers.cmp(&0) { - Ordering::Equal => { - if let Some(sub) = self.subscription() { - Ok(DecrementResult::LastSubscriber(sub)) - } else { - Err(Error::no_active_sub()) - } - } - Ordering::Less => Err(Error::unexpected_sub_count(self.subscribers)), - Ordering::Greater => Ok(DecrementResult::ActiveSubscribers), - } - } - - pub fn subscription(&self) -> Option { - self.sub - } - - pub fn remote_subscription(&self) -> &Option { - &self.remote_sub - } - - pub fn remote_subscription_mut(&mut self) -> &mut Option { - &mut self.remote_sub - } - - pub fn subscription_mut(&mut self) -> &mut Option { - &mut self.sub - } - fn filters_raw(root: &[u8; 32]) -> Vec { vec![ nostrdb::Filter::new().kinds([1]).event(root), @@ -253,59 +189,12 @@ pub fn thread_unsubscribe( note_cache: &mut NoteCache, id: &[u8; 32], ) { - let (unsubscribe, remote_subid) = { - let txn = Transaction::new(ndb).expect("txn"); - let root_id = crate::note::root_note_id_from_selected_id(ndb, note_cache, &txn, id); + let txn = Transaction::new(ndb).expect("txn"); + let root_id = crate::note::root_note_id_from_selected_id(ndb, note_cache, &txn, id); - let thread = threads.thread_mut(ndb, &txn, root_id).get_ptr(); - let unsub = thread.decrement_sub(); + let thread = threads.thread_mut(ndb, &txn, root_id).get_ptr(); - let mut remote_subid: Option = None; - if let Ok(DecrementResult::LastSubscriber(_subid)) = unsub { - *thread.subscription_mut() = None; - remote_subid = thread.remote_subscription().to_owned(); - *thread.remote_subscription_mut() = None; - } - - (unsub, remote_subid) - }; - - match unsubscribe { - Ok(DecrementResult::LastSubscriber(sub)) => { - if let Err(e) = ndb.unsubscribe(sub) { - error!( - "failed to unsubscribe from thread: {e}, subid:{}, {} active subscriptions", - sub.id(), - ndb.subscription_count() - ); - } else { - info!( - "Unsubscribed from thread subid:{}. {} active subscriptions", - sub.id(), - ndb.subscription_count() - ); - } - - // unsub from remote - if let Some(subid) = remote_subid { - pool.unsubscribe(subid); - } - } - - Ok(DecrementResult::ActiveSubscribers) => { - info!( - "Keeping thread subscription. {} active subscriptions.", - ndb.subscription_count() - ); - // do nothing - } - - Err(e) => { - // something is wrong! - error!( - "Thread unsubscribe error: {e}. {} active subsciptions.", - ndb.subscription_count() - ); - } + if let Some(multi_subscriber) = &mut thread.multi_subscriber { + multi_subscriber.unsubscribe(ndb, pool); } }