This commit is contained in:
Tony Giorgio
2023-02-18 01:01:15 -06:00
parent fa856996a0
commit ad88aa0b85
12 changed files with 4512 additions and 0 deletions

17
.github/workflows/publish.yml vendored Normal file
View File

@@ -0,0 +1,17 @@
name: Publish
on:
push:
branches:
- master
jobs:
deploy:
runs-on: ubuntu-latest
name: Deploy
steps:
- uses: actions/checkout@master
- name: Publish
env:
CF_API_TOKEN: ${{ secrets.CF_API_TOKEN }}
run: npm install -g wrangler && wrangler publish

73
.github/workflows/test.yml vendored Normal file
View File

@@ -0,0 +1,73 @@
name: Tests
on:
pull_request:
jobs:
website:
name: Build WASM binary
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly
components: clippy
target: wasm32-unknown-unknown
override: true
profile: minimal
- uses: jetli/wasm-pack-action@v0.4.0
with:
version: 'latest'
- uses: actions/cache@v2
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: cargo-${{ runner.os }}-browser-tests-${{ hashFiles('**/Cargo.toml') }}
restore-keys: |
cargo-${{ runner.os }}-browser-tests-
cargo-${{ runner.os }}-
- name: Build wasm
run: npm install -g wrangler && wrangler build
rust_tests:
name: Rust Checks
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly
components: clippy, rustfmt
target: wasm32-unknown-unknown
override: true
profile: minimal
- name: Setup trunk
uses: jetli/trunk-action@v0.1.0
with:
version: 'latest'
- uses: actions/cache@v2
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: cargo-${{ runner.os }}-rust-tests-${{ hashFiles('**/Cargo.toml') }}
restore-keys: |
cargo-${{ runner.os }}-rust-tests-
cargo-${{ runner.os }}-
- name: Check formatting
working-directory: .
run: cargo fmt --check
- name: Check clippy
working-directory: .
run: cargo clippy

12
.gitignore vendored
View File

@@ -8,3 +8,15 @@ Cargo.lock
# These are backup files generated by rustfmt # These are backup files generated by rustfmt
**/*.rs.bk **/*.rs.bk
# cloudflare things
.DS_Store
/node_modules
**/*.rs.bk
wasm-pack.log
build/
/target
/dist
.dev.vars

30
Cargo.toml Normal file
View File

@@ -0,0 +1,30 @@
[package]
name = "blastr"
version = "0.0.0"
edition = "2018"
[lib]
crate-type = ["cdylib", "rlib"]
[features]
default = ["console_error_panic_hook"]
[dependencies]
cfg-if = "0.1.2"
# Need the queue feature
worker = { git = "https://github.com/cloudflare/workers-rs", rev = "b4b9cd1f15feac412b6f9e9e9209458cd3b98430", features = ["queue"] }
futures = "0.3.26"
nostr = { git = "https://github.com/rust-nostr/nostr", rev = "c333544b1e58b89acb786695d5ca08759ebf5e69" }
web-sys = { version = "0.3.60" }
js-sys = "0.3.60"
wasm-bindgen-futures = "0.4.33"
# The `console_error_panic_hook` crate provides better debugging of panics by
# logging them with `console.error`. This is great for development, but requires
# all the `std::fmt` and `std::panicking` infrastructure, so isn't great for
# code size when deploying.
console_error_panic_hook = { version = "0.1.1", optional = true }
[profile.release]
# Tell `rustc` to optimize for small code size.
opt-level = "s"

View File

@@ -1,2 +1,41 @@
# blastr # blastr
A nostr cloudflare workers proxy relay that publishes to all known relays A nostr cloudflare workers proxy relay that publishes to all known relays
## Development
With `wrangler`, you can build, test, and deploy your Worker with the following commands:
```sh
# install wrangler if you do not have it yet
$ npm install -g wrangler
# compiles your project to WebAssembly and will warn of any issues
$ npm run build
# run your Worker in an ideal development workflow (with a local server, file watcher & more)
$ npm run dev
# deploy your Worker globally to the Cloudflare network (update your wrangler.toml file for configuration)
$ npm run deploy
```
Read the latest `worker` crate documentation here: https://docs.rs/worker
### CICD
There's an example workflow here for publishing on master branch pushes. You need to set `CF_API_TOKEN` in your github repo secrets first.
You also should either remove or configure `wrangler.toml` to point to a custom domain of yours:
```
routes = [
{ pattern = "example.com/about", zone_id = "<YOUR_ZONE_ID>" } # replace with your info
]
```
### WebAssembly
`workers-rs` (the Rust SDK for Cloudflare Workers used in this template) is meant to be executed as compiled WebAssembly, and as such so **must** all the code you write and depend upon. All crates and modules used in Rust-based Workers projects have to compile to the `wasm32-unknown-unknown` triple.
Read more about this on the [`workers-rs`](https://github.com/cloudflare/workers-rs) project README.

4036
package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

12
package.json Normal file
View File

@@ -0,0 +1,12 @@
{
"private": true,
"version": "0.0.0",
"scripts": {
"deploy": "wrangler publish",
"dev": "wrangler dev --local"
},
"devDependencies": {
"@miniflare/tre": "^3.0.0-next.8",
"wrangler": "^2.0.0"
}
}

10
src/error.rs Normal file
View File

@@ -0,0 +1,10 @@
pub enum Error {
/// Worker error
WorkerError(String),
}
impl From<worker::Error> for Error {
fn from(e: worker::Error) -> Self {
Error::WorkerError(e.to_string())
}
}

109
src/lib.rs Normal file
View File

@@ -0,0 +1,109 @@
use crate::nostr::{try_queue_client_msg, NOSTR_QUEUE};
use ::nostr::{ClientMessage, Event};
use futures::StreamExt;
use worker::*;
mod error;
mod nostr;
mod utils;
fn log_request(req: &Request) {
console_log!(
"Incoming Request: {} - [{}]",
Date::now().to_string(),
req.path(),
);
}
/// Main function for the Cloudflare Worker that triggers off of a HTTP req
#[event(fetch)]
pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
log_request(&req);
// Optionally, get more helpful error messages written to the console in the case of a panic.
utils::set_panic_hook();
// Optionally, use the Router to handle matching endpoints, use ":name" placeholders, or "*name"
// catch-alls to match on specific patterns. Alternatively, use `Router::with_data(D)` to
// provide arbitrary data that will be accessible in each route via the `ctx.data()` method.
let router = Router::new();
// Add as many routes as your Worker needs! Each route will get a `Request` for handling HTTP
// functionality and a `RouteContext` which you can use to and get route parameters and
// Environment bindings like KV Stores, Durable Objects, Secrets, and Variables.
router
.post_async("/event", |mut req, ctx| async move {
// for any adhoc POST event
let request_text = req.text().await?;
if let Ok(client_msg) = ClientMessage::from_json(request_text) {
let nostr_queue = ctx.env.queue(NOSTR_QUEUE).expect("get queue");
try_queue_client_msg(client_msg, nostr_queue).await
}
fetch()
})
.get("/", |_, ctx| {
// For websocket compatibility
let pair = WebSocketPair::new()?;
let server = pair.server;
server.accept()?;
console_log!("accepted websocket, about to spawn event stream");
wasm_bindgen_futures::spawn_local(async move {
let mut event_stream = server.events().expect("stream error");
console_log!("spawned event stream, waiting for first message..");
while let Some(event) = event_stream.next().await {
match event.expect("received error in websocket") {
WebsocketEvent::Message(msg) => {
if msg.text().is_none() {
continue;
};
if let Ok(client_msg) = ClientMessage::from_json(msg.text().unwrap()) {
let nostr_queue = ctx.env.queue(NOSTR_QUEUE).expect("get queue");
try_queue_client_msg(client_msg, nostr_queue).await
}
}
WebsocketEvent::Close(_) => {
console_log!("closing");
}
}
}
});
Response::from_websocket(pair.client)
})
.options("/*catchall", |_, _| fetch())
.run(req, env)
.await
}
/// Main function for the Cloudflare Worker that triggers off the nostr event queue
#[event(queue)]
pub async fn main(message_batch: MessageBatch<Event>, _env: Env, _ctx: Context) -> Result<()> {
// Deserialize the message batch
let messages: Vec<Message<Event>> = message_batch.messages()?;
let events: Vec<Event> = messages.iter().map(|m| m.body.clone()).collect();
match nostr::send_nostr_events(events).await {
Ok(event_ids) => {
for event_id in event_ids {
console_log!("Sent nostr event: {}", event_id)
}
}
Err(error::Error::WorkerError(e)) => {
console_log!("worker error: {e}");
}
}
Ok(())
}
fn fetch() -> worker::Result<Response> {
Response::empty()?.with_cors(&cors())
}
fn cors() -> Cors {
Cors::new()
.with_credentials(true)
.with_origins(vec!["*"])
.with_allowed_headers(vec!["Content-Type"])
.with_methods(Method::all())
}

139
src/nostr.rs Normal file
View File

@@ -0,0 +1,139 @@
use crate::error::Error;
use futures::pin_mut;
use nostr::prelude::*;
use std::time::Duration;
use worker::{console_log, Cache, Delay, Fetch, Queue, Response, WebSocket};
pub(crate) const NOSTR_QUEUE: &str = "nostr-events-pub";
const RELAY_LIST_URL: &str = "https://api.nostr.watch/v1/online";
const RELAYS: [&str; 8] = [
"wss://nostr.zebedee.cloud",
"wss://relay.snort.social",
"wss://eden.nostr.land",
"wss://nos.lol",
"wss://brb.io",
"wss://nostr.fmt.wiz.biz",
"wss://relay.damus.io",
"wss://nostr.wine",
];
pub async fn try_queue_client_msg(client_msg: ClientMessage, nostr_queue: Queue) {
match client_msg {
ClientMessage::Event(event) => {
console_log!("got an event from client: {}", event.id);
match queue_nostr_event_with_queue(nostr_queue, *event.clone()).await {
Ok(_) => {
console_log!("queued up nostr event: {}", event.id)
}
Err(Error::WorkerError(e)) => {
console_log!("worker error: {e}");
}
}
}
_ => {
console_log!("ignoring other nostr client message types");
}
}
}
pub async fn queue_nostr_event_with_queue(nostr_queue: Queue, event: Event) -> Result<(), Error> {
nostr_queue.send(&event).await?;
Ok(())
}
async fn send_event_to_relay(messages: Vec<ClientMessage>, relay: &str) -> Result<(), Error> {
console_log!("connecting to relay: {relay}");
let ws = WebSocket::connect(relay.parse().unwrap()).await?;
// It's important that we call this before we send our first message, otherwise we will
// not have any event listeners on the socket to receive the echoed message.
let _event_stream = ws.events()?;
ws.accept()?;
console_log!("sending event to relay: {relay}");
for message in messages {
if let Some(e) = ws.send_with_str(message.as_json()).err() {
console_log!("Error sending event to relay {relay}: {e:?}")
}
}
if let Some(_e) = ws.close::<String>(None, None).err() {
console_log!("Error websocket to relay {relay}")
}
Ok(())
}
pub async fn send_nostr_events(events: Vec<Event>) -> Result<Vec<EventId>, Error> {
let messages: Vec<ClientMessage> = events
.iter()
.map(|e| ClientMessage::new_event(e.clone()))
.collect();
// pull in the relays from nostr watch list
let cache = Cache::default();
let relays = if let Some(mut resp) = cache.get(RELAY_LIST_URL, true).await? {
console_log!("cache hit for relays");
match resp.json::<Vec<String>>().await {
Ok(r) => r,
Err(_) => RELAYS.iter().map(|x| x.to_string()).collect(),
}
} else {
console_log!("no cache hit for relays");
match Fetch::Url("https://api.nostr.watch/v1/online".parse().unwrap())
.send()
.await
{
Ok(mut nostr_resp) => {
console_log!("retrieved online relay list");
match nostr_resp.json::<Vec<String>>().await {
Ok(r) => {
let mut resp = Response::from_json(&r)?;
// Cache API respects Cache-Control headers. Setting s-max-age to 10
// will limit the response to be in cache for 10 seconds max
resp.headers_mut().set("cache-control", "s-maxage=600")?;
cache.put(RELAY_LIST_URL, resp.cloned()?).await?;
match resp.json::<Vec<String>>().await {
Ok(r) => r,
Err(e) => {
console_log!("could not parse nostr relay list json: {}", e);
RELAYS.iter().map(|x| x.to_string()).collect()
}
}
}
Err(e) => {
console_log!("could not parse nostr relay list response: {}", e);
RELAYS.iter().map(|x| x.to_string()).collect()
}
}
}
Err(e) => {
console_log!("could not retrieve relay list: {}", e);
RELAYS.iter().map(|x| x.to_string()).collect()
}
}
};
let mut futures = Vec::new();
for relay in relays.iter() {
let fut = send_event_to_relay(messages.clone(), relay);
futures.push(fut);
}
let combined_futures = futures::future::join_all(futures);
let sleep = delay(60_000);
pin_mut!(combined_futures);
pin_mut!(sleep);
console_log!("waiting for futures");
futures::future::select(combined_futures, sleep).await;
console_log!("futures done");
Ok(events.iter().map(|e| e.id).collect())
}
async fn delay(delay: u64) {
let delay: Delay = Duration::from_millis(delay).into();
delay.await;
console_log!("time delay hit, stopping...");
}

12
src/utils.rs Normal file
View File

@@ -0,0 +1,12 @@
use cfg_if::cfg_if;
cfg_if! {
// https://github.com/rustwasm/console_error_panic_hook#readme
if #[cfg(feature = "console_error_panic_hook")] {
extern crate console_error_panic_hook;
pub use self::console_error_panic_hook::set_once as set_panic_hook;
} else {
#[inline]
pub fn set_panic_hook() {}
}
}

23
wrangler.toml Normal file
View File

@@ -0,0 +1,23 @@
name = "blastr"
main = "build/worker/shim.mjs"
compatibility_date = "2022-01-20"
[vars]
WORKERS_RS_VERSION = "0.0.11"
routes = [
{ pattern = "nostr.mutinywallet.com", zone_id = "mutiny-waitlist.workers.dev" } # replace with your info
]
[[queues.producers]]
queue = "nostr-events-pub"
binding = "nostr-events-pub"
[[queues.consumers]]
queue = "nostr-events-pub"
max_batch_size = 10 # max events until triggered
max_batch_timeout = 30 # max seconds until triggered
[build]
command = "cargo install --git https://github.com/CathalMullan/workers-rs worker-build && worker-build --release"
#command = "cargo install -q worker-build --version 0.0.7 && worker-build --release"