From 6d1c1424c2946e51a4b81331b59825a2c72a3604 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Fri, 7 Jul 2023 11:53:44 -0700 Subject: [PATCH] update sphinx-rs, send HELLO, drop alerts API in broker and use hello topic instead --- Cargo.lock | 20 ++--- broker/Cargo.lock | 144 ++++++++++++++++++++++++++++---- broker/Cargo.toml | 14 ++-- broker/src/chain_tracker.rs | 4 +- broker/src/looper.rs | 4 +- broker/src/lss.rs | 2 - broker/src/mqtt.rs | 150 ++++++++++++++++++---------------- broker/src/util.rs | 2 + sphinx-key/Cargo.toml | 6 +- sphinx-key/src/core/events.rs | 30 ++++--- 10 files changed, 250 insertions(+), 126 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bdea5b0..0ee5acf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -224,7 +224,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bolt-derive" version = "0.1.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=95a152dce73ea7d9d10dc4ba50f43c3a0af05df8#95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e199c70cbacd3404e7cecf95bb75ca02afd4cffd#e199c70cbacd3404e7cecf95bb75ca02afd4cffd" dependencies = [ "proc-macro2", "quote", @@ -1189,7 +1189,7 @@ checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" [[package]] name = "lss-connector" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs.git?rev=9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7#9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7" +source = "git+https://github.com/stakwork/sphinx-rs.git?rev=78d8c989bed15f63e5cc7df42e19ee7117c33807#78d8c989bed15f63e5cc7df42e19ee7117c33807" dependencies = [ "anyhow", "log", @@ -1757,7 +1757,7 @@ dependencies = [ [[package]] name = "sphinx-auther" version = "0.1.12" -source = "git+https://github.com/stakwork/sphinx-rs.git?rev=9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7#9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7" +source = "git+https://github.com/stakwork/sphinx-rs.git?rev=78d8c989bed15f63e5cc7df42e19ee7117c33807#78d8c989bed15f63e5cc7df42e19ee7117c33807" dependencies = [ "anyhow", "base64", @@ -1769,7 +1769,7 @@ dependencies = [ [[package]] name = "sphinx-crypter" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs.git?rev=9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7#9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7" +source = "git+https://github.com/stakwork/sphinx-rs.git?rev=78d8c989bed15f63e5cc7df42e19ee7117c33807#78d8c989bed15f63e5cc7df42e19ee7117c33807" dependencies = [ "anyhow", "chacha20poly1305", @@ -1780,7 +1780,7 @@ dependencies = [ [[package]] name = "sphinx-glyph" version = "0.1.2" -source = "git+https://github.com/stakwork/sphinx-rs.git?rev=9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7#9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7" +source = "git+https://github.com/stakwork/sphinx-rs.git?rev=78d8c989bed15f63e5cc7df42e19ee7117c33807#78d8c989bed15f63e5cc7df42e19ee7117c33807" dependencies = [ "anyhow", "hex", @@ -1832,7 +1832,7 @@ dependencies = [ [[package]] name = "sphinx-signer" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs.git?rev=9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7#9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7" +source = "git+https://github.com/stakwork/sphinx-rs.git?rev=78d8c989bed15f63e5cc7df42e19ee7117c33807#78d8c989bed15f63e5cc7df42e19ee7117c33807" dependencies = [ "anyhow", "bip39", @@ -2113,7 +2113,7 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "vls-core" version = "0.9.1" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=95a152dce73ea7d9d10dc4ba50f43c3a0af05df8#95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e199c70cbacd3404e7cecf95bb75ca02afd4cffd#e199c70cbacd3404e7cecf95bb75ca02afd4cffd" dependencies = [ "anyhow", "bitcoin", @@ -2135,7 +2135,7 @@ dependencies = [ [[package]] name = "vls-persist" version = "0.9.1" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=95a152dce73ea7d9d10dc4ba50f43c3a0af05df8#95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e199c70cbacd3404e7cecf95bb75ca02afd4cffd#e199c70cbacd3404e7cecf95bb75ca02afd4cffd" dependencies = [ "hex", "log", @@ -2148,7 +2148,7 @@ dependencies = [ [[package]] name = "vls-protocol" version = "0.9.1" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=95a152dce73ea7d9d10dc4ba50f43c3a0af05df8#95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e199c70cbacd3404e7cecf95bb75ca02afd4cffd#e199c70cbacd3404e7cecf95bb75ca02afd4cffd" dependencies = [ "as-any", "bolt-derive", @@ -2162,7 +2162,7 @@ dependencies = [ [[package]] name = "vls-protocol-signer" version = "0.9.1" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=95a152dce73ea7d9d10dc4ba50f43c3a0af05df8#95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e199c70cbacd3404e7cecf95bb75ca02afd4cffd#e199c70cbacd3404e7cecf95bb75ca02afd4cffd" dependencies = [ "bit-vec", "log", diff --git a/broker/Cargo.lock b/broker/Cargo.lock index d1b4584..02b9d17 100644 --- a/broker/Cargo.lock +++ b/broker/Cargo.lock @@ -172,6 +172,32 @@ dependencies = [ "syn 2.0.22", ] +[[package]] +name = "async-tungstenite" +version = "0.22.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce01ac37fdc85f10a43c43bc582cbd566720357011578a935761075f898baf58" +dependencies = [ + "futures-io", + "futures-util", + "log", + "pin-project-lite", + "tokio", + "tungstenite", +] + +[[package]] +name = "async_io_stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" +dependencies = [ + "futures", + "pharos", + "rustc_version", + "tokio", +] + [[package]] name = "atomic" version = "0.5.3" @@ -381,7 +407,7 @@ dependencies = [ [[package]] name = "bolt-derive" version = "0.1.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=95a152dce73ea7d9d10dc4ba50f43c3a0af05df8#95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e199c70cbacd3404e7cecf95bb75ca02afd4cffd#e199c70cbacd3404e7cecf95bb75ca02afd4cffd" dependencies = [ "proc-macro2", "quote", @@ -1604,7 +1630,7 @@ dependencies = [ [[package]] name = "lightning-storage-server" version = "0.3.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=95a152dce73ea7d9d10dc4ba50f43c3a0af05df8#95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e199c70cbacd3404e7cecf95bb75ca02afd4cffd#e199c70cbacd3404e7cecf95bb75ca02afd4cffd" dependencies = [ "anyhow", "async-trait", @@ -1680,7 +1706,7 @@ dependencies = [ [[package]] name = "lss-connector" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs?rev=9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7#9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7" +source = "git+https://github.com/stakwork/sphinx-rs?rev=78d8c989bed15f63e5cc7df42e19ee7117c33807#78d8c989bed15f63e5cc7df42e19ee7117c33807" dependencies = [ "anyhow", "lightning-storage-server", @@ -2180,6 +2206,16 @@ dependencies = [ "indexmap 1.9.3", ] +[[package]] +name = "pharos" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" +dependencies = [ + "futures", + "rustc_version", +] + [[package]] name = "phf" version = "0.11.2" @@ -2763,27 +2799,32 @@ dependencies = [ [[package]] name = "rumqttd" -version = "0.12.6" -source = "git+https://github.com/Evanfeenstra/rumqtt?branch=sphinx-2#03d1044df6e2f7cddda5d33970ae1cc4b16601e6" +version = "0.15.0" +source = "git+https://github.com/Evanfeenstra/rumqtt?branch=sphinx-3#5626e514e14e7a821113f8bb760f0477115fc91c" dependencies = [ + "async-tungstenite", "axum", "bytes", "clap 4.3.8", "config", "flume", + "futures-util", "metrics", "metrics-exporter-prometheus", "oneshot", "parking_lot 0.11.2", "rustls-pemfile 1.0.2", + "rustls-webpki", "serde", "serde_json", "slab", "thiserror", "tokio", - "tokio-rustls 0.23.4", + "tokio-rustls 0.24.1", + "tokio-util", "tracing", "tracing-subscriber", + "ws_stream_tungstenite", "x509-parser", ] @@ -2803,6 +2844,15 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rusticata-macros" version = "3.2.0" @@ -2977,6 +3027,12 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" + [[package]] name = "serde" version = "1.0.164" @@ -3108,6 +3164,17 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha1" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" +dependencies = [ + "cfg-if 1.0.0", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.7" @@ -3212,7 +3279,7 @@ dependencies = [ [[package]] name = "sphinx-auther" version = "0.1.12" -source = "git+https://github.com/stakwork/sphinx-rs?rev=9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7#9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7" +source = "git+https://github.com/stakwork/sphinx-rs?rev=78d8c989bed15f63e5cc7df42e19ee7117c33807#78d8c989bed15f63e5cc7df42e19ee7117c33807" dependencies = [ "anyhow", "base64 0.13.1", @@ -3224,7 +3291,7 @@ dependencies = [ [[package]] name = "sphinx-glyph" version = "0.1.2" -source = "git+https://github.com/stakwork/sphinx-rs?rev=9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7#9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7" +source = "git+https://github.com/stakwork/sphinx-rs?rev=78d8c989bed15f63e5cc7df42e19ee7117c33807#78d8c989bed15f63e5cc7df42e19ee7117c33807" dependencies = [ "anyhow", "hex", @@ -3270,7 +3337,7 @@ dependencies = [ [[package]] name = "sphinx-signer" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs?rev=9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7#9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7" +source = "git+https://github.com/stakwork/sphinx-rs?rev=78d8c989bed15f63e5cc7df42e19ee7117c33807#78d8c989bed15f63e5cc7df42e19ee7117c33807" dependencies = [ "anyhow", "bip39", @@ -3794,6 +3861,25 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +[[package]] +name = "tungstenite" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15fba1a6d6bb030745759a9a2a588bfe8490fc8b4751a277db3a0be1c9ebbf67" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "txoo" version = "0.4.4" @@ -3909,6 +3995,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8parse" version = "0.2.1" @@ -3930,7 +4022,7 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "vls-core" version = "0.9.1" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=95a152dce73ea7d9d10dc4ba50f43c3a0af05df8#95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e199c70cbacd3404e7cecf95bb75ca02afd4cffd#e199c70cbacd3404e7cecf95bb75ca02afd4cffd" dependencies = [ "anyhow", "backtrace", @@ -3954,7 +4046,7 @@ dependencies = [ [[package]] name = "vls-frontend" version = "0.9.1" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=95a152dce73ea7d9d10dc4ba50f43c3a0af05df8#95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e199c70cbacd3404e7cecf95bb75ca02afd4cffd#e199c70cbacd3404e7cecf95bb75ca02afd4cffd" dependencies = [ "async-trait", "lightning-storage-server", @@ -3968,7 +4060,7 @@ dependencies = [ [[package]] name = "vls-persist" version = "0.9.1" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=95a152dce73ea7d9d10dc4ba50f43c3a0af05df8#95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e199c70cbacd3404e7cecf95bb75ca02afd4cffd#e199c70cbacd3404e7cecf95bb75ca02afd4cffd" dependencies = [ "hex", "kv", @@ -3982,7 +4074,7 @@ dependencies = [ [[package]] name = "vls-protocol" version = "0.9.1" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=95a152dce73ea7d9d10dc4ba50f43c3a0af05df8#95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e199c70cbacd3404e7cecf95bb75ca02afd4cffd#e199c70cbacd3404e7cecf95bb75ca02afd4cffd" dependencies = [ "as-any", "bolt-derive", @@ -3996,7 +4088,7 @@ dependencies = [ [[package]] name = "vls-protocol-client" version = "0.9.1" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=95a152dce73ea7d9d10dc4ba50f43c3a0af05df8#95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e199c70cbacd3404e7cecf95bb75ca02afd4cffd#e199c70cbacd3404e7cecf95bb75ca02afd4cffd" dependencies = [ "anyhow", "async-trait", @@ -4011,7 +4103,7 @@ dependencies = [ [[package]] name = "vls-protocol-signer" version = "0.9.1" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=95a152dce73ea7d9d10dc4ba50f43c3a0af05df8#95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e199c70cbacd3404e7cecf95bb75ca02afd4cffd#e199c70cbacd3404e7cecf95bb75ca02afd4cffd" dependencies = [ "bit-vec", "log", @@ -4023,7 +4115,7 @@ dependencies = [ [[package]] name = "vls-proxy" version = "0.9.1" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=95a152dce73ea7d9d10dc4ba50f43c3a0af05df8#95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e199c70cbacd3404e7cecf95bb75ca02afd4cffd#e199c70cbacd3404e7cecf95bb75ca02afd4cffd" dependencies = [ "anyhow", "as-any", @@ -4368,6 +4460,26 @@ dependencies = [ "winapi", ] +[[package]] +name = "ws_stream_tungstenite" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b6e7a5ba9436eb3868b052be83377dc685fad6d2f4cddaa2a2251b673472071" +dependencies = [ + "async-tungstenite", + "async_io_stream", + "bitflags 2.3.2", + "futures-core", + "futures-io", + "futures-sink", + "futures-util", + "log", + "pharos", + "rustc_version", + "tokio", + "tungstenite", +] + [[package]] name = "wyz" version = "0.2.0" diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 56498c0..d6e5347 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -22,7 +22,7 @@ once_cell = "1.12.0" pretty_env_logger = "0.4.0" rocket = { version = "0.5.0-rc.2", features = ["json"] } rumqttc = "0.12.0" -rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "sphinx-2" } +rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "sphinx-3", features = ["websocket"] } secp256k1 = { version = "0.24.0", features = ["rand-std", "bitcoin_hashes"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" @@ -30,13 +30,13 @@ thiserror = "1.0.31" toml = "0.5.9" url = { version = "2.2" } -vls-frontend = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" } -vls-protocol = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" } -vls-protocol-client = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" } -vls-proxy = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "95a152dce73ea7d9d10dc4ba50f43c3a0af05df8" } +vls-frontend = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "e199c70cbacd3404e7cecf95bb75ca02afd4cffd" } +vls-protocol = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "e199c70cbacd3404e7cecf95bb75ca02afd4cffd" } +vls-protocol-client = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "e199c70cbacd3404e7cecf95bb75ca02afd4cffd" } +vls-proxy = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "e199c70cbacd3404e7cecf95bb75ca02afd4cffd" } -lss-connector = { git = "https://github.com/stakwork/sphinx-rs", rev = "9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7" } -sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs", rev = "9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7" } +lss-connector = { git = "https://github.com/stakwork/sphinx-rs", rev = "78d8c989bed15f63e5cc7df42e19ee7117c33807" } +sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs", rev = "78d8c989bed15f63e5cc7df42e19ee7117c33807" } # lss-connector = { path = "../../sphinx-rs/lss-connector" } # sphinx-signer = { path = "../../sphinx-rs/signer" } diff --git a/broker/src/chain_tracker.rs b/broker/src/chain_tracker.rs index a880093..b018c86 100644 --- a/broker/src/chain_tracker.rs +++ b/broker/src/chain_tracker.rs @@ -46,8 +46,8 @@ impl MqttSignerPort { // send LSS instead let lss_reply = self.send_lss(res).await?; let (res_topic2, res2) = self.send_request_wait(topics::LSS_MSG, lss_reply).await?; - if res_topic2 != topics::VLS_RETURN { - log::warn!("ChainTracker got a topic NOT on {}", topics::VLS_RETURN); + if res_topic2 != topics::VLS_RES { + log::warn!("ChainTracker got a topic NOT on {}", topics::VLS_RES); } the_res = res2; } diff --git a/broker/src/looper.rs b/broker/src/looper.rs index 2833400..02f7d26 100644 --- a/broker/src/looper.rs +++ b/broker/src/looper.rs @@ -172,8 +172,8 @@ impl SignerLoop { log::info!("SEND ON {}", topics::LSS_MSG); let (res_topic2, res2) = self.send_request_wait(topics::LSS_MSG, lss_reply)?; log::info!("GOT ON {}, send to CLN", res_topic2); - if res_topic2 != topics::VLS_RETURN { - log::warn!("got a topic NOT on {}", topics::VLS_RETURN); + if res_topic2 != topics::VLS_RES { + log::warn!("got a topic NOT on {}", topics::VLS_RES); } the_res = res2; } diff --git a/broker/src/lss.rs b/broker/src/lss.rs index 122a15c..c66a51b 100644 --- a/broker/src/lss.rs +++ b/broker/src/lss.rs @@ -90,8 +90,6 @@ async fn reconnect_dance( lss_conn: &LssBroker, mqtt_tx: &mpsc::Sender, ) -> Result<()> { - // sleep 3 seconds to make sure ESP32 subscription is active - sleep(3).await; let ir = loop { if let Ok(ir) = dance_step_1(cid, lss_conn, mqtt_tx).await { break ir; diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index f490a33..cd8a6a5 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -2,7 +2,7 @@ use crate::conn::Connections; use crate::conn::{ChannelReply, ChannelRequest}; use crate::util::Settings; use rocket::tokio::{sync::broadcast, sync::mpsc}; -use rumqttd::{local::LinkTx, Alert, AlertEvent, AuthMsg, Broker, Config, Notification}; +use rumqttd::{local::LinkTx, AuthMsg, Broker, Config, Notification}; use sphinx_signer::sphinx_glyph::sphinx_auther::token::Token; use sphinx_signer::sphinx_glyph::topics; use std::sync::{Arc, Mutex}; @@ -21,16 +21,15 @@ pub fn start_broker( connections: Arc>, ) -> anyhow::Result<()> { let conf = config(settings); + // println!("CONF {:?}", conf); // let client_id = expected_client_id.to_string(); let mut broker = Broker::new(conf); - let mut alerts = broker.alerts(vec![ - // "/alerts/error/+".to_string(), - "/alerts/event/connect/+".to_string(), - "/alerts/event/disconnect/+".to_string(), - ])?; + let (mut link_tx, mut link_rx) = broker.link("localclient")?; + link_tx.subscribe("#").unwrap(); + let auth_sender_ = auth_sender.clone(); std::thread::spawn(move || { broker @@ -39,33 +38,13 @@ pub fn start_broker( }); // connected/disconnected status alerts - let (internal_status_tx, internal_status_rx) = std::sync::mpsc::channel(); - let _alerts_handle = std::thread::spawn(move || loop { - let alert = alerts.poll(); - log::info!("Alert: {:?}", alert); - match alert.1 { - Alert::Event(cid, event) => { - // dont alert for local connections - let locals = vec!["console", "localclient"]; - if !locals.contains(&cid.as_str()) { - if let Some(status) = match event { - AlertEvent::Connect => Some(true), - AlertEvent::Disconnect => Some(false), - _ => None, - } { - let _ = internal_status_tx.send((cid, status)); - } - } - } - _ => (), - } - }); + let (internal_status_tx, internal_status_rx) = std::sync::mpsc::channel::<(bool, String)>(); // track connections let status_sender_ = status_sender.clone(); let link_tx_ = link_tx.clone(); let _conns_task = std::thread::spawn(move || { - while let Ok((cid, is)) = internal_status_rx.recv() { + while let Ok((is, cid)) = internal_status_rx.recv() { if is { subs(&cid, link_tx_.clone()); } else { @@ -101,38 +80,48 @@ pub fn start_broker( // receive replies back from glyph let _sub_task = std::thread::spawn(move || { while let Ok(message) = link_rx.recv() { - if let Some(n) = message { - match n { - Notification::Forward(f) => { - let topic_res = std::str::from_utf8(&f.publish.topic); - if let Err(_) = topic_res { - continue; - } - let topic = topic_res.unwrap(); - if topic.ends_with(topics::ERROR) { - let _ = error_sender.send(f.publish.payload.to_vec()); - } else { - // VLS, CONTROL, LSS - let ts: Vec<&str> = topic.split("/").collect(); - if ts.len() != 2 { - continue; + if let None = message { + continue; + } + match message.unwrap() { + Notification::Forward(f) => { + let topic_res = std::str::from_utf8(&f.publish.topic); + if let Err(_) = topic_res { + continue; + } + + let topic = topic_res.unwrap(); + if topic.ends_with(topics::ERROR) { + let _ = error_sender.send(f.publish.payload.to_vec()); + continue; + } + + let ts: Vec<&str> = topic.split("/").collect(); + if ts.len() != 2 { + continue; + } + let cid = ts[0].to_string(); + let topic_end = ts[1].to_string(); + + if topic.ends_with(topics::HELLO) { + let _ = internal_status_tx.send((true, cid)); + } else if topic.ends_with(topics::BYE) { + let _ = internal_status_tx.send((false, cid)); + } else { + // VLS, CONTROL, LSS + let pld = f.publish.payload.to_vec(); + if topic_end == topics::INIT_RES { + if let Err(e) = init_tx.send((cid, topic_end, pld)) { + log::error!("failed to pub to init_tx! {:?}", e); } - let cid = ts[0].to_string(); - let topic_end = ts[1].to_string(); - let pld = f.publish.payload.to_vec(); - if topic_end == topics::INIT_RES { - if let Err(e) = init_tx.send((cid, topic_end, pld)) { - log::error!("failed to pub to init_tx! {:?}", e); - } - } else { - if let Err(e) = msg_tx.send((cid, topic_end, pld)) { - log::error!("failed to pub to msg_tx! {:?}", e); - } + } else { + if let Err(e) = msg_tx.send((cid, topic_end, pld)) { + log::error!("failed to pub to msg_tx! {:?}", e); } } } - _ => (), - }; + } + _ => continue, } } }); @@ -228,9 +217,9 @@ fn pub_timeout( } fn subs(cid: &str, mut ltx: LinkTx) { - ltx.subscribe(format!("{}/{}", cid, topics::VLS_RETURN)) + ltx.subscribe(format!("{}/{}", cid, topics::VLS_RES)) .unwrap(); - ltx.subscribe(format!("{}/{}", cid, topics::CONTROL_RETURN)) + ltx.subscribe(format!("{}/{}", cid, topics::CONTROL_RES)) .unwrap(); ltx.subscribe(format!("{}/{}", cid, topics::ERROR)).unwrap(); ltx.subscribe(format!("{}/{}", cid, topics::LSS_RES)) @@ -284,28 +273,45 @@ fn config(settings: Settings) -> Config { max_read_len: 10240, ..Default::default() }; - let mut servers = HashMap::new(); - servers.insert( - "sphinx-broker".to_string(), + let conns = ConnectionSettings { + connection_timeout_ms: 5000, + throttle_delay_ms: 0, + max_payload_size: 262144, + max_inflight_count: 256, + max_inflight_size: 1024, + auth: None, + dynamic_filters: true, + }; + let mut v4_servers = HashMap::new(); + v4_servers.insert( + "v4".to_string(), ServerSettings { - name: "sphinx-broker".to_string(), + name: "v4".to_string(), listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), settings.mqtt_port).into(), next_connection_delay_ms: 1, - connections: ConnectionSettings { - connection_timeout_ms: 5000, - throttle_delay_ms: 0, - max_payload_size: 262144, - max_inflight_count: 256, - max_inflight_size: 1024, - auth: None, - dynamic_filters: true, - }, + connections: conns.clone(), tls: None, }, ); + let mut ws_servers = None; + if let Some(wsp) = settings.websocket_port { + let mut ws = HashMap::new(); + ws.insert( + "ws".to_string(), + ServerSettings { + name: "ws".to_string(), + listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), wsp).into(), + next_connection_delay_ms: 1, + connections: conns, + tls: None, + }, + ); + ws_servers = Some(ws); + } Config { id: 0, - v4: servers, + v4: v4_servers, + ws: ws_servers, router, console: ConsoleSettings::new("0.0.0.0:3030"), cluster: None, diff --git a/broker/src/util.rs b/broker/src/util.rs index 8ad029c..9541620 100644 --- a/broker/src/util.rs +++ b/broker/src/util.rs @@ -10,6 +10,7 @@ pub struct Settings { pub http_port: u16, pub mqtt_port: u16, pub network: Network, + pub websocket_port: Option, } impl Default for Settings { @@ -18,6 +19,7 @@ impl Default for Settings { http_port: 8000, mqtt_port: 1883, network: Network::Regtest, + websocket_port: Some(8083), } } } diff --git a/sphinx-key/Cargo.toml b/sphinx-key/Cargo.toml index ed86f2f..e78f399 100644 --- a/sphinx-key/Cargo.toml +++ b/sphinx-key/Cargo.toml @@ -19,10 +19,10 @@ serde_json = { version = "1.0.81", default-features = false } serde_urlencoded = "0.7.1" url = "2" -lss-connector = { git = "https://github.com/stakwork/sphinx-rs.git", default-features = false, rev = "9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7" } +lss-connector = { git = "https://github.com/stakwork/sphinx-rs.git", default-features = false, rev = "78d8c989bed15f63e5cc7df42e19ee7117c33807" } # lss-connector = { path = "../../sphinx-rs/lss-connector", default-features = false } -sphinx-crypter = { git = "https://github.com/stakwork/sphinx-rs.git", rev = "9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7" } -sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs.git", optional = true, rev = "9c8dc1f8dd61fa908f5546524a9c1d21f1b34dd7" } +sphinx-crypter = { git = "https://github.com/stakwork/sphinx-rs.git", rev = "78d8c989bed15f63e5cc7df42e19ee7117c33807" } +sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs.git", optional = true, rev = "78d8c989bed15f63e5cc7df42e19ee7117c33807" } # sphinx-signer = { path = "../../sphinx-rs/signer", optional = true } anyhow.workspace = true diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index c8e63d9..2b900b1 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -124,6 +124,10 @@ pub fn make_event_loop( ) .expect("failed to init signer"); + thread::sleep(std::time::Duration::from_secs(1)); + // send the initial HELLO + mqtt_pub(&mut mqtt, client_id, topics::HELLO, &[]); + // FIXME it right to restart here? let (root_handler, lss_signer) = match lss::init_lss(client_id, &rx, rhb, &mut mqtt) { Ok(rl) => rl, @@ -165,7 +169,7 @@ pub fn make_event_loop( Ok((vls_b, lss_b)) => { if lss_b.len() == 0 { // no muts, respond directly back! - mqtt_pub(&mut mqtt, client_id, topics::VLS_RETURN, &vls_b); + mqtt_pub(&mut mqtt, client_id, topics::VLS_RES, &vls_b); restart_esp_if_memory_low(); } else { // muts! send LSS first! @@ -195,7 +199,7 @@ pub fn make_event_loop( // set msgs back to None msgs = None; mqtt_pub(&mut mqtt, client_id, &ret_topic, &bytes); - if ret_topic == topics::VLS_RETURN { + if ret_topic == topics::VLS_RES { restart_esp_if_memory_low(); } } @@ -213,7 +217,7 @@ pub fn make_event_loop( { let res_data = rmp_serde::to_vec_named(&res).expect("could not publish control response"); - mqtt_pub(&mut mqtt, client_id, topics::CONTROL_RETURN, &res_data); + mqtt_pub(&mut mqtt, client_id, topics::CONTROL_RES, &res_data); } } } @@ -277,15 +281,17 @@ fn handle_control_response( } else { // A 10kB size stack was consistently overflowing when doing a factory reset let builder = thread::Builder::new().stack_size(15000usize); - builder.spawn(move || { - led_tx.send(Status::Ota).unwrap(); - if let Err(e) = update_sphinx_key(params, led_tx) { - log::error!("OTA update failed {:?}", e.to_string()); - } else { - log::info!("OTA flow complete, restarting esp..."); - unsafe { esp_idf_sys::esp_restart() }; - } - }).unwrap(); + builder + .spawn(move || { + led_tx.send(Status::Ota).unwrap(); + if let Err(e) = update_sphinx_key(params, led_tx) { + log::error!("OTA update failed {:?}", e.to_string()); + } else { + log::info!("OTA flow complete, restarting esp..."); + unsafe { esp_idf_sys::esp_restart() }; + } + }) + .unwrap(); log::info!("OTA update launched..."); } }