From eee088933a17b6c4bcc60036ee9422f8078f0b17 Mon Sep 17 00:00:00 2001 From: irriden Date: Tue, 5 Dec 2023 03:44:05 +0000 Subject: [PATCH] signer: collect mqtt bytes into a vector of vectors this allows us to parse huge mainnet addblock messages. the max we've seen so far is around 45KB. in that case, we previously would require two contiguous, 45KB chunks of memory; one for the mqtt raw bytes, and the other for the message struct returned by msgs::read. with this commit, we still require a 45KB contiguous chunk for msgs::read, but now the network bytes come in broken up into a vector of vectors, which makes room for that 45KB allocation. --- sphinx-key/Cargo.lock | 12 ++++++------ sphinx-key/Cargo.toml | 6 +++--- sphinx-key/src/conn/mqtt.rs | 24 ++++++++++++++++-------- sphinx-key/src/core/events.rs | 2 +- 4 files changed, 26 insertions(+), 18 deletions(-) diff --git a/sphinx-key/Cargo.lock b/sphinx-key/Cargo.lock index 2adbe01..662853e 100644 --- a/sphinx-key/Cargo.lock +++ b/sphinx-key/Cargo.lock @@ -1150,7 +1150,7 @@ checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" [[package]] name = "lss-connector" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs.git?rev=3a35b8fffb51a03b7a81b2f914b16540653b1d1f#3a35b8fffb51a03b7a81b2f914b16540653b1d1f" +source = "git+https://github.com/stakwork/sphinx-rs.git?rev=eeb206c2167b6a232984bf13b7e71e2a49f9098e#eeb206c2167b6a232984bf13b7e71e2a49f9098e" dependencies = [ "anyhow", "log", @@ -1507,7 +1507,7 @@ dependencies = [ [[package]] name = "rmp-utils" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs.git?rev=3a35b8fffb51a03b7a81b2f914b16540653b1d1f#3a35b8fffb51a03b7a81b2f914b16540653b1d1f" +source = "git+https://github.com/stakwork/sphinx-rs.git?rev=eeb206c2167b6a232984bf13b7e71e2a49f9098e#eeb206c2167b6a232984bf13b7e71e2a49f9098e" dependencies = [ "anyhow", "log", @@ -1696,7 +1696,7 @@ checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" [[package]] name = "sphinx-auther" version = "0.1.12" -source = "git+https://github.com/stakwork/sphinx-rs.git?rev=3a35b8fffb51a03b7a81b2f914b16540653b1d1f#3a35b8fffb51a03b7a81b2f914b16540653b1d1f" +source = "git+https://github.com/stakwork/sphinx-rs.git?rev=eeb206c2167b6a232984bf13b7e71e2a49f9098e#eeb206c2167b6a232984bf13b7e71e2a49f9098e" dependencies = [ "anyhow", "base64", @@ -1708,7 +1708,7 @@ dependencies = [ [[package]] name = "sphinx-crypter" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs.git?rev=3a35b8fffb51a03b7a81b2f914b16540653b1d1f#3a35b8fffb51a03b7a81b2f914b16540653b1d1f" +source = "git+https://github.com/stakwork/sphinx-rs.git?rev=eeb206c2167b6a232984bf13b7e71e2a49f9098e#eeb206c2167b6a232984bf13b7e71e2a49f9098e" dependencies = [ "anyhow", "chacha20poly1305", @@ -1719,7 +1719,7 @@ dependencies = [ [[package]] name = "sphinx-glyph" version = "0.1.2" -source = "git+https://github.com/stakwork/sphinx-rs.git?rev=3a35b8fffb51a03b7a81b2f914b16540653b1d1f#3a35b8fffb51a03b7a81b2f914b16540653b1d1f" +source = "git+https://github.com/stakwork/sphinx-rs.git?rev=eeb206c2167b6a232984bf13b7e71e2a49f9098e#eeb206c2167b6a232984bf13b7e71e2a49f9098e" dependencies = [ "anyhow", "hex", @@ -1751,7 +1751,7 @@ dependencies = [ [[package]] name = "sphinx-signer" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs.git?rev=3a35b8fffb51a03b7a81b2f914b16540653b1d1f#3a35b8fffb51a03b7a81b2f914b16540653b1d1f" +source = "git+https://github.com/stakwork/sphinx-rs.git?rev=eeb206c2167b6a232984bf13b7e71e2a49f9098e#eeb206c2167b6a232984bf13b7e71e2a49f9098e" dependencies = [ "anyhow", "bip39", diff --git a/sphinx-key/Cargo.toml b/sphinx-key/Cargo.toml index 664f6a7..0f808ac 100644 --- a/sphinx-key/Cargo.toml +++ b/sphinx-key/Cargo.toml @@ -25,9 +25,9 @@ serde_json = { version = "1.0.81", default-features = false } serde_urlencoded = "0.7.1" # sphinx-rs -lss-connector = { git = "https://github.com/stakwork/sphinx-rs.git", default-features = false, rev = "3a35b8fffb51a03b7a81b2f914b16540653b1d1f" } -sphinx-crypter = { git = "https://github.com/stakwork/sphinx-rs.git", rev = "3a35b8fffb51a03b7a81b2f914b16540653b1d1f" } -sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs.git", optional = true, rev = "3a35b8fffb51a03b7a81b2f914b16540653b1d1f" } +lss-connector = { git = "https://github.com/stakwork/sphinx-rs.git", default-features = false, rev = "eeb206c2167b6a232984bf13b7e71e2a49f9098e" } +sphinx-crypter = { git = "https://github.com/stakwork/sphinx-rs.git", rev = "eeb206c2167b6a232984bf13b7e71e2a49f9098e" } +sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs.git", optional = true, rev = "eeb206c2167b6a232984bf13b7e71e2a49f9098e" } # local # lss-connector = { path = "../../sphinx-rs/lss-connector", default-features = false } # sphinx-crypter = { path = "../../sphinx-rs/crypter" } diff --git a/sphinx-key/src/conn/mqtt.rs b/sphinx-key/src/conn/mqtt.rs index 6d1d423..3f60642 100644 --- a/sphinx-key/src/conn/mqtt.rs +++ b/sphinx-key/src/conn/mqtt.rs @@ -45,11 +45,12 @@ pub fn make_client( info!("MQTT client started"); - let builder = thread::Builder::new().stack_size(1524); + let builder = thread::Builder::new().stack_size(2048); builder.spawn(move || { info!("MQTT Listening for messages"); let mut inflight = Vec::new(); let mut inflight_topic = "".to_string(); + let mut inflight_len = 0usize; while let Some(msg) = connection.next() { match msg { Err(e) => match e.to_string().as_ref() { @@ -74,26 +75,30 @@ pub fn make_client( Event::Unsubscribed(_mes_id) => info!("RECEIVED Unsubscribed MESSAGE"), Event::Published(_mes_id) => info!("RECEIVED Published MESSAGE"), Event::Received(msg) => { - let incoming_message: Option<(String, Vec)> = match msg.details() { + let incoming_message: Option<(String, Vec>)> = match msg.details() { Details::Complete => msg .topic() - .map(|topic| (topic.to_string(), msg.data().to_vec())), + .map(|topic| (topic.to_string(), vec![msg.data().to_vec()])), Details::InitialChunk(chunk_info) => { if let Some(topic) = msg.topic() { - inflight = Vec::with_capacity(chunk_info.total_data_size); + inflight = + Vec::with_capacity(chunk_info.total_data_size / 1024 + 1); inflight_topic = topic.to_string(); - inflight.extend_from_slice(msg.data()); + inflight.push(msg.data().to_vec()); + inflight_len += msg.data().len(); None } else { None } } Details::SubsequentChunk(chunk_data) => { - inflight.extend_from_slice(msg.data()); - if inflight.len() == chunk_data.total_data_size { + inflight.push(msg.data().to_vec()); + inflight_len += msg.data().len(); + if inflight_len == chunk_data.total_data_size { let ret = Some((inflight_topic, inflight)); inflight_topic = String::new(); inflight = Vec::new(); + inflight_len = 0usize; ret } else { None @@ -110,10 +115,13 @@ pub fn make_client( || topic.ends_with(topics::INIT_2_MSG) || topic.ends_with(topics::LSS_CONFLICT) { - log::debug!("received data len {}", data.len()); + let data: Vec = + data.into_iter().flat_map(|v| v.into_iter()).collect(); tx.send(CoreEvent::LssMessage(data)) .expect("couldnt send Event::LssMessage"); } else if topic.ends_with(topics::CONTROL) { + let data: Vec = + data.into_iter().flat_map(|v| v.into_iter()).collect(); tx.send(CoreEvent::Control(data)) .expect("couldnt send Event::Control"); } else { diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index 6a20dc7..782b375 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -27,7 +27,7 @@ use esp_idf_svc::sys::EspError; pub enum Event { Connected, Disconnected, - VlsMessage(Vec), + VlsMessage(Vec>), LssMessage(Vec), Control(Vec), }