implement multi_subscriber for Thread

Signed-off-by: kernelkind <kernelkind@gmail.com>
This commit is contained in:
kernelkind
2024-09-18 13:31:24 -04:00
parent d52c50acc2
commit 814736f9b3
3 changed files with 56 additions and 169 deletions

View File

@@ -1,4 +1,5 @@
use crate::{
multi_subscriber::MultiSubscriber,
note::NoteRef,
notecache::NoteCache,
route::{Route, Router},
@@ -6,8 +7,6 @@ use crate::{
};
use enostr::{NoteId, RelayPool};
use nostrdb::{Ndb, Transaction};
use tracing::{error, info};
use uuid::Uuid;
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum BarAction {
@@ -69,42 +68,15 @@ fn open_thread(
ThreadResult::Fresh(thread) => (thread, None),
};
// only start a subscription on nav and if we don't have
// an active subscription for this thread.
if thread.subscription().is_none() {
let filters = Thread::filters(root_id);
*thread.subscription_mut() = ndb.subscribe(&filters).ok();
if thread.remote_subscription().is_some() {
error!("Found active remote subscription when it was not expected");
} else {
let subid = Uuid::new_v4().to_string();
*thread.remote_subscription_mut() = Some(subid.clone());
pool.subscribe(subid, filters);
}
match thread.subscription() {
Some(_sub) => {
thread.subscribers += 1;
info!(
"Locally/remotely subscribing to thread. {} total active subscriptions, {} on this thread",
ndb.subscription_count(),
thread.subscribers,
);
}
None => error!(
"Error subscribing locally to selected note '{}''s thread",
hex::encode(selected_note)
),
}
let multi_subscriber = if let Some(multi_subscriber) = &mut thread.multi_subscriber {
multi_subscriber
} else {
thread.subscribers += 1;
info!(
"Re-using existing thread subscription. {} total active subscriptions, {} on this thread",
ndb.subscription_count(),
thread.subscribers,
)
}
let filters = Thread::filters(root_id);
thread.multi_subscriber = Some(MultiSubscriber::new(filters));
thread.multi_subscriber.as_mut().unwrap()
};
multi_subscriber.subscribe(ndb, pool);
result
}

View File

@@ -1,9 +1,9 @@
use enostr::{Filter, RelayPool};
use nostrdb::Ndb;
use tracing::{error, info};
use nostrdb::{Ndb, Note, Transaction};
use tracing::{debug, error, info};
use uuid::Uuid;
use crate::filter::UnifiedSubscription;
use crate::{filter::UnifiedSubscription, note::NoteRef, Error};
pub struct MultiSubscriber {
filters: Vec<Filter>,
@@ -104,4 +104,30 @@ impl MultiSubscriber {
)
}
}
pub fn poll_for_notes(&mut self, ndb: &Ndb, txn: &Transaction) -> Result<Vec<NoteRef>, Error> {
let sub = self.sub.as_ref().ok_or(Error::no_active_sub())?;
let new_note_keys = ndb.poll_for_notes(sub.local, 500);
if new_note_keys.is_empty() {
return Ok(vec![]);
} else {
debug!("{} new notes! {:?}", new_note_keys.len(), new_note_keys);
}
let mut notes: Vec<Note<'_>> = Vec::with_capacity(new_note_keys.len());
for key in new_note_keys {
let note = if let Ok(note) = ndb.get_note_by_key(txn, key) {
note
} else {
continue;
};
notes.push(note);
}
let note_refs: Vec<NoteRef> = notes.iter().map(|n| NoteRef::from_note(n)).collect();
Ok(note_refs)
}
}

View File

@@ -1,27 +1,19 @@
use crate::{
multi_subscriber::MultiSubscriber,
note::NoteRef,
notecache::NoteCache,
timeline::{TimelineTab, ViewFilter},
Error, Result,
};
use enostr::RelayPool;
use nostrdb::{Filter, FilterBuilder, Ndb, Note, Subscription, Transaction};
use std::cmp::Ordering;
use nostrdb::{Filter, FilterBuilder, Ndb, Transaction};
use std::collections::HashMap;
use tracing::{debug, error, info, warn};
use tracing::{debug, warn};
#[derive(Default)]
pub struct Thread {
view: TimelineTab,
sub: Option<Subscription>,
remote_sub: Option<String>,
pub subscribers: i32,
}
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum DecrementResult {
LastSubscriber(Subscription),
ActiveSubscribers,
pub multi_subscriber: Option<MultiSubscriber>,
}
impl Thread {
@@ -32,15 +24,10 @@ impl Thread {
}
let mut view = TimelineTab::new_with_capacity(ViewFilter::NotesAndReplies, cap);
view.notes = notes;
let sub: Option<Subscription> = None;
let remote_sub: Option<String> = None;
let subscribers: i32 = 0;
Thread {
view,
sub,
remote_sub,
subscribers,
multi_subscriber: None,
}
}
@@ -53,37 +40,18 @@ impl Thread {
}
#[must_use = "UnknownIds::update_from_note_refs should be used on this result"]
pub fn poll_notes_into_view<'a>(
&mut self,
txn: &'a Transaction,
ndb: &Ndb,
) -> Result<Vec<Note<'a>>> {
let sub = self.subscription().expect("thread subscription");
let new_note_keys = ndb.poll_for_notes(sub, 500);
if new_note_keys.is_empty() {
return Ok(vec![]);
} else {
debug!("{} new notes! {:?}", new_note_keys.len(), new_note_keys);
}
let mut notes: Vec<Note<'a>> = Vec::with_capacity(new_note_keys.len());
for key in new_note_keys {
let note = if let Ok(note) = ndb.get_note_by_key(txn, key) {
note
} else {
continue;
};
notes.push(note);
}
{
pub fn poll_notes_into_view(&mut self, txn: &Transaction, ndb: &Ndb) -> Result<()> {
if let Some(multi_subscriber) = &mut self.multi_subscriber {
let reversed = true;
let note_refs: Vec<NoteRef> = notes.iter().map(|n| NoteRef::from_note(n)).collect();
let note_refs: Vec<NoteRef> = multi_subscriber.poll_for_notes(ndb, txn)?;
self.view.insert(&note_refs, reversed);
} else {
return Err(Error::Generic(
"Thread unexpectedly has no MultiSubscriber".to_owned(),
));
}
Ok(notes)
Ok(())
}
/// Look for new thread notes since our last fetch
@@ -112,38 +80,6 @@ impl Thread {
}
}
pub fn decrement_sub(&mut self) -> Result<DecrementResult> {
self.subscribers -= 1;
match self.subscribers.cmp(&0) {
Ordering::Equal => {
if let Some(sub) = self.subscription() {
Ok(DecrementResult::LastSubscriber(sub))
} else {
Err(Error::no_active_sub())
}
}
Ordering::Less => Err(Error::unexpected_sub_count(self.subscribers)),
Ordering::Greater => Ok(DecrementResult::ActiveSubscribers),
}
}
pub fn subscription(&self) -> Option<Subscription> {
self.sub
}
pub fn remote_subscription(&self) -> &Option<String> {
&self.remote_sub
}
pub fn remote_subscription_mut(&mut self) -> &mut Option<String> {
&mut self.remote_sub
}
pub fn subscription_mut(&mut self) -> &mut Option<Subscription> {
&mut self.sub
}
fn filters_raw(root: &[u8; 32]) -> Vec<FilterBuilder> {
vec![
nostrdb::Filter::new().kinds([1]).event(root),
@@ -253,59 +189,12 @@ pub fn thread_unsubscribe(
note_cache: &mut NoteCache,
id: &[u8; 32],
) {
let (unsubscribe, remote_subid) = {
let txn = Transaction::new(ndb).expect("txn");
let root_id = crate::note::root_note_id_from_selected_id(ndb, note_cache, &txn, id);
let txn = Transaction::new(ndb).expect("txn");
let root_id = crate::note::root_note_id_from_selected_id(ndb, note_cache, &txn, id);
let thread = threads.thread_mut(ndb, &txn, root_id).get_ptr();
let unsub = thread.decrement_sub();
let thread = threads.thread_mut(ndb, &txn, root_id).get_ptr();
let mut remote_subid: Option<String> = None;
if let Ok(DecrementResult::LastSubscriber(_subid)) = unsub {
*thread.subscription_mut() = None;
remote_subid = thread.remote_subscription().to_owned();
*thread.remote_subscription_mut() = None;
}
(unsub, remote_subid)
};
match unsubscribe {
Ok(DecrementResult::LastSubscriber(sub)) => {
if let Err(e) = ndb.unsubscribe(sub) {
error!(
"failed to unsubscribe from thread: {e}, subid:{}, {} active subscriptions",
sub.id(),
ndb.subscription_count()
);
} else {
info!(
"Unsubscribed from thread subid:{}. {} active subscriptions",
sub.id(),
ndb.subscription_count()
);
}
// unsub from remote
if let Some(subid) = remote_subid {
pool.unsubscribe(subid);
}
}
Ok(DecrementResult::ActiveSubscribers) => {
info!(
"Keeping thread subscription. {} active subscriptions.",
ndb.subscription_count()
);
// do nothing
}
Err(e) => {
// something is wrong!
error!(
"Thread unsubscribe error: {e}. {} active subsciptions.",
ndb.subscription_count()
);
}
if let Some(multi_subscriber) = &mut thread.multi_subscriber {
multi_subscriber.unsubscribe(ndb, pool);
}
}