Note multicasting

This is an initial implementation of note multicast, which sends posted
notes to other notedecks on the same network.

This came about after I nerd sniped myself thinking about p2p nostr on
local networks[1]

You can test this exclusively without joining any other relays by
passing -r multicast on the command line.

[1] https://damus.io/note1j50pseqwma38g3aqrsnhvld0m0ysdgppw6fjnvvcj0haeulgswgq80lpca

Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
William Casarin
2025-01-01 18:49:46 -06:00
parent f5afdd04a6
commit fe6206c546
16 changed files with 406 additions and 125 deletions

View File

@@ -1,13 +1,22 @@
use crate::{Error, Note};
use nostrdb::Filter;
use crate::Error;
use nostrdb::{Filter, Note};
use serde_json::json;
#[derive(Debug)]
pub struct EventClientMessage<'a> {
note: Note<'a>,
}
impl EventClientMessage<'_> {
pub fn to_json(&self) -> Result<String, Error> {
Ok(format!("[\"EVENT\", {}]", self.note.json()?))
}
}
/// Messages sent by clients, received by relays
#[derive(Debug)]
pub enum ClientMessage {
Event {
note: Note,
},
pub enum ClientMessage<'a> {
Event(EventClientMessage<'a>),
Req {
sub_id: String,
filters: Vec<Filter>,
@@ -18,9 +27,9 @@ pub enum ClientMessage {
Raw(String),
}
impl ClientMessage {
pub fn event(note: Note) -> Self {
ClientMessage::Event { note }
impl<'a> ClientMessage<'a> {
pub fn event(note: Note<'a>) -> Self {
ClientMessage::Event(EventClientMessage { note })
}
pub fn raw(raw: String) -> Self {
@@ -37,7 +46,7 @@ impl ClientMessage {
pub fn to_json(&self) -> Result<String, Error> {
Ok(match self {
Self::Event { note } => json!(["EVENT", note]).to_string(),
Self::Event(ecm) => ecm.to_json()?,
Self::Raw(raw) => raw.clone(),
Self::Req { sub_id, filters } => {
if filters.is_empty() {

View File

@@ -1,3 +1,3 @@
mod message;
pub use message::ClientMessage;
pub use message::{ClientMessage, EventClientMessage};

View File

@@ -29,6 +29,9 @@ pub enum Error {
#[error("json error: {0}")]
Json(#[from] serde_json::Error),
#[error("io error: {0}")]
Io(#[from] std::io::Error),
#[error("nostrdb error: {0}")]
Nostrdb(#[from] nostrdb::Error),

View File

@@ -7,7 +7,7 @@ mod profile;
mod pubkey;
mod relay;
pub use client::ClientMessage;
pub use client::{ClientMessage, EventClientMessage};
pub use error::Error;
pub use ewebsock;
pub use filter::Filter;
@@ -17,7 +17,7 @@ pub use note::{Note, NoteId};
pub use profile::Profile;
pub use pubkey::Pubkey;
pub use relay::message::{RelayEvent, RelayMessage};
pub use relay::pool::{PoolEvent, RelayPool};
pub use relay::pool::{PoolEvent, PoolRelay, RelayPool};
pub use relay::{Relay, RelayStatus};
pub type Result<T> = std::result::Result<T, error::Error>;

View File

@@ -1,21 +1,152 @@
use ewebsock::{Options, WsMessage, WsReceiver, WsSender};
use ewebsock::{Options, WsEvent, WsMessage, WsReceiver, WsSender};
use mio::net::UdpSocket;
use std::io;
use std::net::IpAddr;
use std::net::{SocketAddr, SocketAddrV4};
use std::time::{Duration, Instant};
use crate::{ClientMessage, Result};
use nostrdb::Filter;
use crate::{ClientMessage, EventClientMessage, Result};
use std::fmt;
use std::hash::{Hash, Hasher};
use tracing::{debug, error, info};
use std::net::Ipv4Addr;
use tracing::{debug, error};
pub mod message;
pub mod pool;
#[derive(Debug)]
#[derive(Debug, Copy, Clone)]
pub enum RelayStatus {
Connected,
Connecting,
Disconnected,
}
pub struct MulticastRelay {
last_join: Instant,
status: RelayStatus,
address: SocketAddrV4,
socket: UdpSocket,
interface: Ipv4Addr,
}
impl MulticastRelay {
pub fn new(address: SocketAddrV4, socket: UdpSocket, interface: Ipv4Addr) -> Self {
let last_join = Instant::now();
let status = RelayStatus::Connected;
MulticastRelay {
status,
address,
socket,
interface,
last_join,
}
}
/// Multicast seems to fail every 260 seconds. We force a rejoin every 200 seconds or
/// so to ensure we are always in the group
pub fn rejoin(&mut self) -> Result<()> {
self.last_join = Instant::now();
self.status = RelayStatus::Disconnected;
self.socket
.leave_multicast_v4(self.address.ip(), &self.interface)?;
self.socket
.join_multicast_v4(self.address.ip(), &self.interface)?;
self.status = RelayStatus::Connected;
Ok(())
}
pub fn should_rejoin(&self) -> bool {
(Instant::now() - self.last_join) >= Duration::from_secs(200)
}
pub fn try_recv(&self) -> Option<WsEvent> {
let mut buffer = [0u8; 65535];
// Read the size header
match self.socket.recv_from(&mut buffer) {
Ok((size, src)) => {
let parsed_size = u32::from_be_bytes(buffer[0..4].try_into().ok()?) as usize;
debug!("multicast: read size {} from start of header", size - 4);
if size != parsed_size + 4 {
error!(
"multicast: partial data received: expected {}, got {}",
parsed_size, size
);
return None;
}
let text = String::from_utf8_lossy(&buffer[4..size]);
debug!("multicast: received {} bytes from {}: {}", size, src, &text);
Some(WsEvent::Message(WsMessage::Text(text.to_string())))
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
// No data available, continue
None
}
Err(e) => {
error!("multicast: error receiving data: {}", e);
None
}
}
}
pub fn send(&self, msg: &EventClientMessage) -> Result<()> {
let json = msg.to_json()?;
let len = json.len();
debug!("writing to multicast relay");
let mut buf: Vec<u8> = Vec::with_capacity(4 + len);
// Write the length of the message as 4 bytes (big-endian)
buf.extend_from_slice(&(len as u32).to_be_bytes());
// Append the JSON message bytes
buf.extend_from_slice(json.as_bytes());
self.socket.send_to(&buf, SocketAddr::V4(self.address))?;
Ok(())
}
}
pub fn setup_multicast_relay(
wakeup: impl Fn() + Send + Sync + Clone + 'static,
) -> Result<MulticastRelay> {
use mio::{Events, Interest, Poll, Token};
let port = 9797;
let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port);
let multicast_ip = Ipv4Addr::new(239, 19, 88, 1);
let mut socket = UdpSocket::bind(address)?;
let interface = Ipv4Addr::UNSPECIFIED;
let multicast_address = SocketAddrV4::new(multicast_ip, port);
socket.join_multicast_v4(&multicast_ip, &interface)?;
let mut poll = Poll::new()?;
poll.registry().register(
&mut socket,
Token(0),
Interest::READABLE | Interest::WRITABLE,
)?;
// wakeup our render thread when we have new stuff on the socket
std::thread::spawn(move || {
let mut events = Events::with_capacity(1);
loop {
if let Err(err) = poll.poll(&mut events, Some(Duration::from_millis(100))) {
error!("multicast socket poll error: {err}. ending multicast poller.");
return;
}
wakeup();
std::thread::yield_now();
}
});
Ok(MulticastRelay::new(multicast_address, socket, interface))
}
pub struct Relay {
pub url: String,
pub status: RelayStatus,
@@ -89,12 +220,4 @@ impl Relay {
let msg = WsMessage::Ping(vec![]);
self.sender.send(msg);
}
pub fn subscribe(&mut self, subid: String, filters: Vec<Filter>) {
info!(
"sending '{}' subscription to relay pool: {:?}",
subid, filters
);
self.send(&ClientMessage::req(subid, filters));
}
}

View File

@@ -1,4 +1,4 @@
use crate::relay::{Relay, RelayStatus};
use crate::relay::{setup_multicast_relay, MulticastRelay, Relay, RelayStatus};
use crate::{ClientMessage, Result};
use nostrdb::Filter;
@@ -33,7 +33,12 @@ pub struct PoolEventBuf {
pub event: ewebsock::WsEvent,
}
pub struct PoolRelay {
pub enum PoolRelay {
Websocket(WebsocketRelay),
Multicast(MulticastRelay),
}
pub struct WebsocketRelay {
pub relay: Relay,
pub last_ping: Instant,
pub last_connect_attempt: Instant,
@@ -41,8 +46,69 @@ pub struct PoolRelay {
}
impl PoolRelay {
pub fn new(relay: Relay) -> PoolRelay {
PoolRelay {
pub fn url(&self) -> &str {
match self {
Self::Websocket(wsr) => &wsr.relay.url,
Self::Multicast(_wsr) => "multicast",
}
}
pub fn set_status(&mut self, status: RelayStatus) {
match self {
Self::Websocket(wsr) => {
wsr.relay.status = status;
}
Self::Multicast(_mcr) => {}
}
}
pub fn try_recv(&self) -> Option<WsEvent> {
match self {
Self::Websocket(recvr) => recvr.relay.receiver.try_recv(),
Self::Multicast(recvr) => recvr.try_recv(),
}
}
pub fn status(&self) -> RelayStatus {
match self {
Self::Websocket(wsr) => wsr.relay.status,
Self::Multicast(mcr) => mcr.status,
}
}
pub fn send(&mut self, msg: &ClientMessage) -> Result<()> {
match self {
Self::Websocket(wsr) => {
wsr.relay.send(msg);
Ok(())
}
Self::Multicast(mcr) => {
// we only send event client messages at the moment
if let ClientMessage::Event(ecm) = msg {
mcr.send(ecm)?;
}
Ok(())
}
}
}
pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) -> Result<()> {
self.send(&ClientMessage::req(subid, filter))
}
pub fn websocket(relay: Relay) -> Self {
Self::Websocket(WebsocketRelay::new(relay))
}
pub fn multicast(wakeup: impl Fn() + Send + Sync + Clone + 'static) -> Result<Self> {
Ok(Self::Multicast(setup_multicast_relay(wakeup)?))
}
}
impl WebsocketRelay {
pub fn new(relay: Relay) -> Self {
Self {
relay,
last_ping: Instant::now(),
last_connect_attempt: Instant::now(),
@@ -75,6 +141,15 @@ impl RelayPool {
}
}
pub fn add_multicast_relay(
&mut self,
wakeup: impl Fn() + Send + Sync + Clone + 'static,
) -> Result<()> {
let multicast_relay = PoolRelay::multicast(wakeup)?;
self.relays.push(multicast_relay);
Ok(())
}
pub fn ping_rate(&mut self, duration: Duration) -> &mut Self {
self.ping_rate = duration;
self
@@ -82,7 +157,7 @@ impl RelayPool {
pub fn has(&self, url: &str) -> bool {
for relay in &self.relays {
if relay.relay.url == url {
if relay.url() == url {
return true;
}
}
@@ -93,25 +168,35 @@ impl RelayPool {
pub fn urls(&self) -> BTreeSet<String> {
self.relays
.iter()
.map(|pool_relay| pool_relay.relay.url.clone())
.map(|pool_relay| pool_relay.url().to_string())
.collect()
}
pub fn send(&mut self, cmd: &ClientMessage) {
for relay in &mut self.relays {
relay.relay.send(cmd);
if let Err(err) = relay.send(cmd) {
error!("error sending {:?} to {}: {err}", cmd, relay.url());
}
}
}
pub fn unsubscribe(&mut self, subid: String) {
for relay in &mut self.relays {
relay.relay.send(&ClientMessage::close(subid.clone()));
if let Err(err) = relay.send(&ClientMessage::close(subid.clone())) {
error!(
"error unsubscribing from {} on {}: {err}",
&subid,
relay.url()
);
}
}
}
pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) {
for relay in &mut self.relays {
relay.relay.subscribe(subid.clone(), filter.clone());
if let Err(err) = relay.send(&ClientMessage::req(subid.clone(), filter.clone())) {
error!("error subscribing to {}: {err}", relay.url());
}
}
}
@@ -121,50 +206,58 @@ impl RelayPool {
for relay in &mut self.relays {
let now = std::time::Instant::now();
match relay.relay.status {
RelayStatus::Disconnected => {
let reconnect_at = relay.last_connect_attempt + relay.retry_connect_after;
if now > reconnect_at {
relay.last_connect_attempt = now;
let next_duration = Duration::from_millis(
((relay.retry_connect_after.as_millis() as f64) * 1.5) as u64,
);
debug!(
"bumping reconnect duration from {:?} to {:?} and retrying connect",
relay.retry_connect_after, next_duration
);
relay.retry_connect_after = next_duration;
if let Err(err) = relay.relay.connect(wakeup.clone()) {
error!("error connecting to relay: {}", err);
match relay {
PoolRelay::Multicast(_) => {}
PoolRelay::Websocket(relay) => {
match relay.relay.status {
RelayStatus::Disconnected => {
let reconnect_at =
relay.last_connect_attempt + relay.retry_connect_after;
if now > reconnect_at {
relay.last_connect_attempt = now;
let next_duration = Duration::from_millis(
((relay.retry_connect_after.as_millis() as f64) * 1.5) as u64,
);
debug!(
"bumping reconnect duration from {:?} to {:?} and retrying connect",
relay.retry_connect_after, next_duration
);
relay.retry_connect_after = next_duration;
if let Err(err) = relay.relay.connect(wakeup.clone()) {
error!("error connecting to relay: {}", err);
}
} else {
// let's wait a bit before we try again
}
}
RelayStatus::Connected => {
relay.retry_connect_after =
WebsocketRelay::initial_reconnect_duration();
let should_ping = now - relay.last_ping > self.ping_rate;
if should_ping {
debug!("pinging {}", relay.relay.url);
relay.relay.ping();
relay.last_ping = Instant::now();
}
}
RelayStatus::Connecting => {
// cool story bro
}
} else {
// let's wait a bit before we try again
}
}
RelayStatus::Connected => {
relay.retry_connect_after = PoolRelay::initial_reconnect_duration();
let should_ping = now - relay.last_ping > self.ping_rate;
if should_ping {
debug!("pinging {}", relay.relay.url);
relay.relay.ping();
relay.last_ping = Instant::now();
}
}
RelayStatus::Connecting => {
// cool story bro
}
}
}
}
pub fn send_to(&mut self, cmd: &ClientMessage, relay_url: &str) {
for relay in &mut self.relays {
let relay = &mut relay.relay;
if relay.url == relay_url {
relay.send(cmd);
if relay.url() == relay_url {
if let Err(err) = relay.send(cmd) {
error!("error sending {:?} to {}: {err}", cmd, relay_url);
}
return;
}
}
@@ -182,7 +275,7 @@ impl RelayPool {
return Ok(());
}
let relay = Relay::new(url, wakeup)?;
let pool_relay = PoolRelay::new(relay);
let pool_relay = PoolRelay::websocket(relay);
self.relays.push(pool_relay);
@@ -202,7 +295,7 @@ impl RelayPool {
pub fn remove_urls(&mut self, urls: &BTreeSet<String>) {
self.relays
.retain(|pool_relay| !urls.contains(&pool_relay.relay.url));
.retain(|pool_relay| !urls.contains(pool_relay.url()));
}
// standardize the format (ie, trailing slashes)
@@ -219,32 +312,47 @@ impl RelayPool {
/// If no message is received from any relays, None is returned.
pub fn try_recv(&mut self) -> Option<PoolEvent<'_>> {
for relay in &mut self.relays {
let relay = &mut relay.relay;
if let Some(event) = relay.receiver.try_recv() {
if let PoolRelay::Multicast(mcr) = relay {
// try rejoin on multicast
if mcr.should_rejoin() {
if let Err(err) = mcr.rejoin() {
error!("multicast: rejoin error: {err}");
} else {
debug!("multicast: rejoin success");
}
}
}
if let Some(event) = relay.try_recv() {
match &event {
WsEvent::Opened => {
relay.status = RelayStatus::Connected;
relay.set_status(RelayStatus::Connected);
}
WsEvent::Closed => {
relay.status = RelayStatus::Disconnected;
relay.set_status(RelayStatus::Disconnected);
}
WsEvent::Error(err) => {
error!("{:?}", err);
relay.status = RelayStatus::Disconnected;
relay.set_status(RelayStatus::Disconnected);
}
WsEvent::Message(ev) => {
// let's just handle pongs here.
// We only need to do this natively.
#[cfg(not(target_arch = "wasm32"))]
if let WsMessage::Ping(ref bs) = ev {
debug!("pong {}", &relay.url);
relay.sender.send(WsMessage::Pong(bs.to_owned()));
debug!("pong {}", relay.url());
match relay {
PoolRelay::Websocket(wsr) => {
wsr.relay.sender.send(WsMessage::Pong(bs.to_owned()));
}
PoolRelay::Multicast(_mcr) => {}
}
}
}
}
return Some(PoolEvent {
event,
relay: &relay.url,
relay: relay.url(),
});
}
}