Rescan storage for new NWC events every 10 seconds

This commit is contained in:
benthecarman
2023-06-13 13:58:29 -05:00
committed by Tony Giorgio
parent f2180f5e0b
commit 470afcf282
3 changed files with 151 additions and 67 deletions

View File

@@ -7,9 +7,12 @@ pub(crate) use crate::nostr::{
try_queue_event, NOSTR_QUEUE, NOSTR_QUEUE_2, NOSTR_QUEUE_3, NOSTR_QUEUE_4, NOSTR_QUEUE_5,
NOSTR_QUEUE_6,
};
use ::nostr::{ClientMessage, Event, Kind, RelayMessage, Tag, TagKind};
use ::nostr::{
ClientMessage, Event, EventId, Filter, Kind, RelayMessage, SubscriptionId, Tag, TagKind,
};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use worker::*;
mod error;
@@ -171,6 +174,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
return Response::from_json(&get_nip11_response())?.with_cors(&cors());
}
let ctx = Arc::new(ctx);
// For websocket compatibility
let pair = WebSocketPair::new()?;
let server = pair.server;
@@ -299,69 +303,63 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
subscription_id,
filters,
} => {
// if the user requests a NWC event, we have those stored,
// we should send them to the user
let mut events = vec![];
// for each filter we handle it every 10 seconds
// by reading storage and sending any new events
// one caveat is that this will send events multiple
// times if they are in multiple filters
for filter in filters {
// get all authors and pubkeys
let mut keys = filter.authors.unwrap_or_default();
keys.extend(
filter
.pubkeys
.unwrap_or_default()
.into_iter()
.map(|p| p.to_string()),
);
let valid_nwc = {
// has correct kinds
let kinds = filter.kinds.as_ref();
(
kinds
.unwrap_or(&vec![])
.contains(&Kind::WalletConnectResponse)
|| kinds.unwrap_or(&vec![])
.contains(&Kind::WalletConnectRequest)
) &&
// has authors and pubkeys
!filter.authors.as_ref().unwrap_or(&vec![]).is_empty() &&
!filter.pubkeys.as_ref().unwrap_or(&vec![]).is_empty()
};
if filter
.kinds
.clone()
.unwrap_or_default()
.contains(&Kind::WalletConnectRequest)
{
let found_events = get_nwc_events(
&keys,
Kind::WalletConnectRequest,
&ctx,
)
.await
.unwrap_or_default();
events.extend(found_events);
if valid_nwc {
let ctx_clone = ctx.clone();
let sub_id = subscription_id.clone();
let server_clone = server.clone();
wasm_bindgen_futures::spawn_local(async move {
console_log!("Got NWC filter!");
let mut sent_events = vec![];
loop {
match handle_filter(
&sent_events,
sub_id.clone(),
filter.clone(),
&server_clone,
&ctx_clone,
)
.await
{
Ok(new_event_ids) => {
// add new events to sent events
sent_events.extend(new_event_ids);
}
Err(e) => console_log!(
"error handling filter: {e}"
),
}
utils::delay(10_000).await;
}
});
}
if filter
.kinds
.unwrap_or_default()
.contains(&Kind::WalletConnectResponse)
{
let found_events = get_nwc_events(
&keys,
Kind::WalletConnectResponse,
&ctx,
)
.await
.unwrap_or_default();
events.extend(found_events);
}
}
// send all found events to the user
for event in events {
console_log!("sending event to client: {}", &event.id);
let relay_msg = RelayMessage::new_event(
subscription_id.clone(),
event,
);
} {
// if not a nwc filter, we just send EOSE
let relay_msg = RelayMessage::new_eose(subscription_id);
server
.send_with_str(&relay_msg.as_json())
.send_with_str(relay_msg.as_json())
.expect("failed to send response");
}
console_log!("end of subscription request");
let relay_msg = RelayMessage::new_eose(subscription_id);
server
.send_with_str(&relay_msg.as_json())
.expect("failed to send response");
}
_ => {
console_log!("ignoring other nostr client message types");
@@ -427,6 +425,86 @@ pub fn queue_number(batch_name: &str) -> Result<u32> {
}
}
/// if the user requests a NWC event, we have those stored,
/// we should send them to the user
pub async fn handle_filter(
sent_events: &[EventId],
subscription_id: SubscriptionId,
filter: Filter,
server: &WebSocket,
ctx: &RouteContext<()>,
) -> Result<Vec<EventId>> {
let mut events = vec![];
// get all authors and pubkeys
let mut keys = filter.authors.unwrap_or_default();
keys.extend(
filter
.pubkeys
.unwrap_or_default()
.into_iter()
.map(|p| p.to_string()),
);
if filter
.kinds
.clone()
.unwrap_or_default()
.contains(&Kind::WalletConnectRequest)
{
let mut found_events = get_nwc_events(&keys, Kind::WalletConnectRequest, ctx)
.await
.unwrap_or_default();
// filter out events that have already been sent
found_events.retain(|e| !sent_events.contains(&e.id));
events.extend(found_events);
}
if filter
.kinds
.unwrap_or_default()
.contains(&Kind::WalletConnectResponse)
{
let mut found_events = get_nwc_events(&keys, Kind::WalletConnectResponse, ctx)
.await
.unwrap_or_default();
// filter out events that have already been sent
found_events.retain(|e| !sent_events.contains(&e.id));
events.extend(found_events);
}
if events.is_empty() {
return Ok(vec![]);
}
if !events.is_empty() {
// sort and dedup events
events.sort_by(|a, b| a.created_at.cmp(&b.created_at));
events.dedup();
// send all found events to the user
for event in events.clone() {
console_log!("sending event to client: {}", &event.id);
let relay_msg = RelayMessage::new_event(subscription_id.clone(), event);
server
.send_with_str(&relay_msg.as_json())
.expect("failed to send response");
}
}
console_log!("end of subscription request");
let relay_msg = RelayMessage::new_eose(subscription_id);
server
.send_with_str(relay_msg.as_json())
.expect("failed to send response");
let sent_event_ids: Vec<EventId> = events.into_iter().map(|e| e.id).collect();
Ok(sent_event_ids)
}
pub async fn get_nwc_events(
keys: &[String],
kind: Kind,

View File

@@ -1,12 +1,13 @@
use crate::error::Error;
use crate::utils::delay;
use futures::future::Either;
use futures::pin_mut;
use futures::StreamExt;
use nostr::prelude::*;
use std::string::ToString;
use std::{time::Duration, vec};
use std::vec;
use worker::WebsocketEvent;
use worker::{console_log, Cache, Delay, Fetch, Queue, Response, WebSocket};
use worker::{console_log, Cache, Fetch, Queue, Response, WebSocket};
pub(crate) const NOSTR_QUEUE: &str = "nostr-events-pub-1-b";
pub(crate) const NOSTR_QUEUE_2: &str = "nostr-events-pub-2-b";
@@ -78,7 +79,10 @@ async fn send_event_to_relay(messages: Vec<ClientMessage>, relay: &str) -> Resul
let ws_opt = match either {
Either::Left((res, _)) => res,
Either::Right(_) => Err(worker::Error::RustError("Connection timeout".to_string())),
Either::Right(_) => {
console_log!("time delay hit, stopping...");
Err(worker::Error::RustError("Connection timeout".to_string()))
}
};
match ws_opt {
@@ -121,6 +125,7 @@ async fn send_event_to_relay(messages: Vec<ClientMessage>, relay: &str) -> Resul
}
}
Either::Right(_) => {
console_log!("time delay hit, stopping...");
// Sleep triggered before we got a websocket response
}
}
@@ -217,9 +222,3 @@ fn find_range_from_part(part: u32) -> (usize, usize) {
let end = start + 29;
(start as usize, end as usize)
}
async fn delay(delay: u64) {
let delay: Delay = Duration::from_millis(delay).into();
delay.await;
console_log!("time delay hit, stopping...");
}

View File

@@ -1,4 +1,6 @@
use cfg_if::cfg_if;
use std::time::Duration;
use worker::Delay;
cfg_if! {
// https://github.com/rustwasm/console_error_panic_hook#readme
@@ -10,3 +12,8 @@ cfg_if! {
pub fn set_panic_hook() {}
}
}
pub async fn delay(delay: u64) {
let delay: Delay = Duration::from_millis(delay).into();
delay.await;
}