feat: integrate nostrdb relay indexing

- Upgrade `nostrdb` to v0.6.1 with relay metadata support
- Switch to `nostr::RelayUrl` for typed relay URLs
- Use `process_event_with()` to pass relay info during ingestion
- Update `Relay`, `RelayPool`, and unknown ID logic accordingly

This enables richer indexing with relay provenance in events.
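
A minimal sketch of the new ingestion call (the relay URL and event JSON are placeholder values):

    // ev is the raw relay message JSON, e.g. ["EVENT", "subid", {...}]
    ndb.process_event_with(
        ev,
        nostrdb::IngestMetadata::new()
            .client(false)                  // false: event arrived from a relay
            .relay("wss://relay.damus.io"), // provenance recorded in the index
    )?;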

Signed-off-by: William Casarin <jb55@jb55.com>
Author: William Casarin
Date:   2025-03-21 16:17:32 -07:00
Parent: a7f34a9dc7
Commit: 26b58683b8

12 changed files with 52 additions and 25 deletions

================================
Cargo.lock (generated)

@@ -2706,8 +2706,9 @@ dependencies = [
 [[package]]
 name = "nostrdb"
-version = "0.5.1"
-source = "git+https://github.com/damus-io/nostrdb-rs?rev=22fa1cc57ee1b4ce10e726ab284bc124461c8871#22fa1cc57ee1b4ce10e726ab284bc124461c8871"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "593e342d6df5ea9ea74de29f484cc77377a4f1d4d6322c741fc465699ee76809"
 dependencies = [
  "bindgen",
  "cc",
@@ -2717,7 +2718,6 @@ dependencies = [
  "thiserror 2.0.12",
  "tokio",
  "tracing",
- "tracing-subscriber",
 ]

 [[package]]
@@ -2735,6 +2735,7 @@ dependencies = [
  "image",
  "jni",
  "mime_guess",
+ "nostr",
  "nostrdb",
  "poll-promise",
  "puffin",

================================

@@ -33,8 +33,8 @@ indexmap = "2.6.0"
 log = "0.4.17"
 nostr = { version = "0.37.0", default-features = false, features = ["std", "nip49"] }
 mio = { version = "1.0.3", features = ["os-poll", "net"] }
-nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "22fa1cc57ee1b4ce10e726ab284bc124461c8871" }
-#nostrdb = "0.5.2"
+#nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "ecae493712505f91488bea0b53aa5a7b50f44ab2" }
+nostrdb = "0.6.1"
 notedeck = { path = "crates/notedeck" }
 notedeck_chrome = { path = "crates/notedeck_chrome" }
 notedeck_columns = { path = "crates/notedeck_columns" }

================================

@@ -25,6 +25,9 @@ pub enum Error {
     #[error("invalid public key")]
     InvalidPublicKey,
 
+    #[error("invalid relay url")]
+    InvalidRelayUrl,
+
     // Secp(secp256k1::Error),
     #[error("json error: {0}")]
     Json(#[from] serde_json::Error),

================================

@@ -149,7 +149,7 @@ pub fn setup_multicast_relay(
 }
 
 pub struct Relay {
-    pub url: String,
+    pub url: nostr::RelayUrl,
     pub status: RelayStatus,
     pub sender: WsSender,
     pub receiver: WsReceiver,
@@ -180,9 +180,10 @@ impl PartialEq for Relay {
 impl Eq for Relay {}
 
 impl Relay {
-    pub fn new(url: String, wakeup: impl Fn() + Send + Sync + 'static) -> Result<Self> {
+    pub fn new(url: nostr::RelayUrl, wakeup: impl Fn() + Send + Sync + 'static) -> Result<Self> {
         let status = RelayStatus::Connecting;
-        let (sender, receiver) = ewebsock::connect_with_wakeup(&url, Options::default(), wakeup)?;
+        let (sender, receiver) =
+            ewebsock::connect_with_wakeup(url.as_str(), Options::default(), wakeup)?;
 
         Ok(Self {
             url,
@@ -210,7 +211,7 @@ impl Relay {
     pub fn connect(&mut self, wakeup: impl Fn() + Send + Sync + 'static) -> Result<()> {
         let (sender, receiver) =
-            ewebsock::connect_with_wakeup(&self.url, Options::default(), wakeup)?;
+            ewebsock::connect_with_wakeup(self.url.as_str(), Options::default(), wakeup)?;
         self.status = RelayStatus::Connecting;
         self.sender = sender;
         self.receiver = receiver;
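
`Relay` now stores a typed `nostr::RelayUrl`, so a malformed URL is rejected before any connection is attempted. A minimal usage sketch (the address and the no-op wakeup closure are placeholders):

    let url = nostr::RelayUrl::parse("wss://relay.damus.io")
        .map_err(|_| Error::InvalidRelayUrl)?; // validation happens at parse time
    let relay = Relay::new(url, || {})?;       // wakeup is a no-op here, for illustration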

================================

@@ -1,5 +1,5 @@
 use crate::relay::{setup_multicast_relay, MulticastRelay, Relay, RelayStatus};
-use crate::{ClientMessage, Result};
+use crate::{ClientMessage, Error, Result};
 use nostrdb::Filter;
 use std::collections::BTreeSet;
@@ -50,7 +50,7 @@ pub struct WebsocketRelay {
 impl PoolRelay {
     pub fn url(&self) -> &str {
         match self {
-            Self::Websocket(wsr) => &wsr.relay.url,
+            Self::Websocket(wsr) => wsr.relay.url.as_str(),
             Self::Multicast(_wsr) => "multicast",
         }
     }
@@ -315,7 +315,10 @@ impl RelayPool {
         if self.has(&url) {
             return Ok(());
         }
-        let relay = Relay::new(url, wakeup)?;
+        let relay = Relay::new(
+            nostr::RelayUrl::parse(url).map_err(|_| Error::InvalidRelayUrl)?,
+            wakeup,
+        )?;
         let pool_relay = PoolRelay::websocket(relay);
         self.relays.push(pool_relay);
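
With this change, callers adding a relay see `Error::InvalidRelayUrl` instead of a later websocket failure. The parse-and-map pattern in isolation (the input string is illustrative):

    let parsed = nostr::RelayUrl::parse("not-a-relay-url")
        .map_err(|_| Error::InvalidRelayUrl);
    assert!(parsed.is_err()); // malformed input now fails fast, before connecting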

================================

@@ -229,6 +229,7 @@ fn calculate_error_size(error: &Error) -> usize {
         | Error::InvalidBech32
         | Error::InvalidByteSize
         | Error::InvalidSignature
+        | Error::InvalidRelayUrl
         | Error::Io(_)
         | Error::InvalidPublicKey => mem::size_of_val(error), // No heap usage

================================

@@ -12,6 +12,7 @@ strum = { workspace = true }
 strum_macros = { workspace = true }
 dirs = { workspace = true }
 enostr = { workspace = true }
+nostr = { workspace = true }
 egui = { workspace = true }
 eframe = { workspace = true }
 image = { workspace = true }

================================

@@ -5,6 +5,7 @@ use crate::{
 };
 
 use enostr::{Filter, NoteId, Pubkey};
+use nostr::RelayUrl;
 use nostrdb::{BlockType, Mention, Ndb, Note, NoteKey, Transaction};
 use std::collections::{HashMap, HashSet};
 use std::time::{Duration, Instant};
@@ -78,8 +79,6 @@ impl SingleUnkIdAction {
     }
 }
 
-type RelayUrl = String;
-
 /// Unknown Id searcher
 #[derive(Default, Debug)]
 pub struct UnknownIds {
@@ -300,7 +299,7 @@ pub fn get_unknown_note_ids<'a>(
                 let id = UnknownId::Pubkey(Pubkey::new(*nprofile.pubkey()));
                 let relays = nprofile
                     .relays_iter()
-                    .map(String::from)
+                    .filter_map(|s| RelayUrl::parse(s).ok())
                     .collect::<HashSet<RelayUrl>>();
                 ids.entry(id).or_default().extend(relays);
             }
@@ -308,7 +307,7 @@ pub fn get_unknown_note_ids<'a>(
             Mention::Event(ev) => {
                 let relays = ev
                     .relays_iter()
-                    .map(String::from)
+                    .filter_map(|s| RelayUrl::parse(s).ok())
                     .collect::<HashSet<RelayUrl>>();
                 match ndb.get_note_by_id(txn, ev.id()) {
                     Err(_) => {
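
The `type RelayUrl = String` alias is gone; relay hints from nprofile and nevent mentions are now parsed into typed URLs, and hints that fail to parse are silently dropped. A small sketch of that behavior (the hint strings are illustrative):

    use std::collections::HashSet;

    let hints = ["wss://relay.damus.io", "not a url"];
    let relays: HashSet<nostr::RelayUrl> = hints
        .into_iter()
        .filter_map(|s| nostr::RelayUrl::parse(s).ok()) // invalid hints are dropped
        .collect();
    assert_eq!(relays.len(), 1); // only the well-formed URL survives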

================================

@@ -299,13 +299,23 @@ fn process_message(damus: &mut Damus, ctx: &mut AppContext<'_>, relay: &str, msg
     match relay {
         PoolRelay::Websocket(_) => {
             //info!("processing event {}", event);
-            if let Err(err) = ctx.ndb.process_event(ev) {
+            if let Err(err) = ctx.ndb.process_event_with(
+                ev,
+                nostrdb::IngestMetadata::new()
+                    .client(false)
+                    .relay(relay.url()),
+            ) {
                 error!("error processing event {ev}: {err}");
             }
         }
         PoolRelay::Multicast(_) => {
             // multicast events are client events
-            if let Err(err) = ctx.ndb.process_client_event(ev) {
+            if let Err(err) = ctx.ndb.process_event_with(
+                ev,
+                nostrdb::IngestMetadata::new()
+                    .client(true)
+                    .relay(relay.url()),
+            ) {
                 error!("error processing multicast event {ev}: {err}");
             }
         }
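
Both branches now funnel through `process_event_with`; the only difference is the `client` flag, since multicast traffic is locally authored while websocket traffic comes from a remote relay. A hypothetical helper making that symmetry explicit (not part of the diff; the error type is assumed to be `nostrdb::Error`):

    fn ingest(ndb: &nostrdb::Ndb, ev: &str, from_client: bool, url: &str) -> Result<(), nostrdb::Error> {
        ndb.process_event_with(
            ev,
            nostrdb::IngestMetadata::new()
                .client(from_client) // true: client-authored; false: relay-sourced
                .relay(url),         // relay provenance recorded in the index
        )
    }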

================================

@@ -119,7 +119,10 @@ impl ProfileAction {
             ProfileAction::SaveChanges(changes) => {
                 let raw_msg = format!("[\"EVENT\",{}]", changes.to_note().json().unwrap());
 
-                let _ = ndb.process_client_event(raw_msg.as_str());
+                let _ = ndb.process_event_with(
+                    raw_msg.as_str(),
+                    nostrdb::IngestMetadata::new().client(true),
+                );
                 let _ = state_map.remove_entry(&changes.kp.pubkey);
 
                 info!("sending {}", raw_msg);

================================

@@ -456,11 +456,14 @@ impl TimelineKind {
                 .limit(default_limit())
                 .build()]),
 
-            TimelineKind::Hashtag(hashtag) => FilterState::ready(vec![Filter::new()
-                .kinds([1])
-                .limit(filter::default_limit())
-                .tags([hashtag.to_lowercase()], 't')
-                .build()]),
+            TimelineKind::Hashtag(hashtag) => {
+                let url: &str = &hashtag.to_lowercase();
+                FilterState::ready(vec![Filter::new()
+                    .kinds([1])
+                    .limit(filter::default_limit())
+                    .tags([url], 't')
+                    .build()])
+            }
 
             TimelineKind::Algo(algo_timeline) => match algo_timeline {
                 AlgoTimeline::LastPerPubkey(list_k) => match list_k {

================================

@@ -244,10 +244,12 @@ impl Timeline {
     }
 
     pub fn hashtag(hashtag: String) -> Self {
+        let hashtag = hashtag.to_lowercase();
+        let htag: &str = &hashtag;
         let filter = Filter::new()
             .kinds([1])
             .limit(filter::default_limit())
-            .tags([hashtag.to_lowercase()], 't')
+            .tags([htag], 't')
             .build();
 
         Timeline::new(
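
Both hashtag call sites now lowercase once and hand `Filter::tags` a `&str` binding instead of a temporary `String`, which appears to match the tag-value type the 0.6.1 `Filter` builder expects. A minimal sketch (the hashtag literal is illustrative, and the limit is a placeholder for `filter::default_limit()`):

    let hashtag = "Nostr".to_lowercase();
    let htag: &str = &hashtag;
    let filter = nostrdb::Filter::new()
        .kinds([1])        // kind-1 text notes
        .limit(500)        // placeholder; the app uses filter::default_limit()
        .tags([htag], 't') // match the lowercased 't' tag
        .build();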