signer: collect mqtt bytes into a vector of vectors

this allows us to parse huge mainnet addblock messages.

the max we've seen so far is around 45KB. in that case, we previously would require
two contiguous, 45KB chunks of memory; one for the mqtt raw bytes, and the other for
the message struct returned by msgs::read. with this commit, we still require a
45KB contiguous chunk for msgs::read, but now the network bytes come in broken up
into a vector of vectors, which makes room for that 45KB allocation.
This commit is contained in:
irriden
2023-12-05 03:44:05 +00:00
parent 61ef67ec8c
commit eee088933a
4 changed files with 26 additions and 18 deletions

12
sphinx-key/Cargo.lock generated
View File

@@ -1150,7 +1150,7 @@ checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
[[package]]
name = "lss-connector"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs.git?rev=3a35b8fffb51a03b7a81b2f914b16540653b1d1f#3a35b8fffb51a03b7a81b2f914b16540653b1d1f"
source = "git+https://github.com/stakwork/sphinx-rs.git?rev=eeb206c2167b6a232984bf13b7e71e2a49f9098e#eeb206c2167b6a232984bf13b7e71e2a49f9098e"
dependencies = [
"anyhow",
"log",
@@ -1507,7 +1507,7 @@ dependencies = [
[[package]]
name = "rmp-utils"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs.git?rev=3a35b8fffb51a03b7a81b2f914b16540653b1d1f#3a35b8fffb51a03b7a81b2f914b16540653b1d1f"
source = "git+https://github.com/stakwork/sphinx-rs.git?rev=eeb206c2167b6a232984bf13b7e71e2a49f9098e#eeb206c2167b6a232984bf13b7e71e2a49f9098e"
dependencies = [
"anyhow",
"log",
@@ -1696,7 +1696,7 @@ checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
[[package]]
name = "sphinx-auther"
version = "0.1.12"
source = "git+https://github.com/stakwork/sphinx-rs.git?rev=3a35b8fffb51a03b7a81b2f914b16540653b1d1f#3a35b8fffb51a03b7a81b2f914b16540653b1d1f"
source = "git+https://github.com/stakwork/sphinx-rs.git?rev=eeb206c2167b6a232984bf13b7e71e2a49f9098e#eeb206c2167b6a232984bf13b7e71e2a49f9098e"
dependencies = [
"anyhow",
"base64",
@@ -1708,7 +1708,7 @@ dependencies = [
[[package]]
name = "sphinx-crypter"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs.git?rev=3a35b8fffb51a03b7a81b2f914b16540653b1d1f#3a35b8fffb51a03b7a81b2f914b16540653b1d1f"
source = "git+https://github.com/stakwork/sphinx-rs.git?rev=eeb206c2167b6a232984bf13b7e71e2a49f9098e#eeb206c2167b6a232984bf13b7e71e2a49f9098e"
dependencies = [
"anyhow",
"chacha20poly1305",
@@ -1719,7 +1719,7 @@ dependencies = [
[[package]]
name = "sphinx-glyph"
version = "0.1.2"
source = "git+https://github.com/stakwork/sphinx-rs.git?rev=3a35b8fffb51a03b7a81b2f914b16540653b1d1f#3a35b8fffb51a03b7a81b2f914b16540653b1d1f"
source = "git+https://github.com/stakwork/sphinx-rs.git?rev=eeb206c2167b6a232984bf13b7e71e2a49f9098e#eeb206c2167b6a232984bf13b7e71e2a49f9098e"
dependencies = [
"anyhow",
"hex",
@@ -1751,7 +1751,7 @@ dependencies = [
[[package]]
name = "sphinx-signer"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs.git?rev=3a35b8fffb51a03b7a81b2f914b16540653b1d1f#3a35b8fffb51a03b7a81b2f914b16540653b1d1f"
source = "git+https://github.com/stakwork/sphinx-rs.git?rev=eeb206c2167b6a232984bf13b7e71e2a49f9098e#eeb206c2167b6a232984bf13b7e71e2a49f9098e"
dependencies = [
"anyhow",
"bip39",

View File

@@ -25,9 +25,9 @@ serde_json = { version = "1.0.81", default-features = false }
serde_urlencoded = "0.7.1"
# sphinx-rs
lss-connector = { git = "https://github.com/stakwork/sphinx-rs.git", default-features = false, rev = "3a35b8fffb51a03b7a81b2f914b16540653b1d1f" }
sphinx-crypter = { git = "https://github.com/stakwork/sphinx-rs.git", rev = "3a35b8fffb51a03b7a81b2f914b16540653b1d1f" }
sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs.git", optional = true, rev = "3a35b8fffb51a03b7a81b2f914b16540653b1d1f" }
lss-connector = { git = "https://github.com/stakwork/sphinx-rs.git", default-features = false, rev = "eeb206c2167b6a232984bf13b7e71e2a49f9098e" }
sphinx-crypter = { git = "https://github.com/stakwork/sphinx-rs.git", rev = "eeb206c2167b6a232984bf13b7e71e2a49f9098e" }
sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs.git", optional = true, rev = "eeb206c2167b6a232984bf13b7e71e2a49f9098e" }
# local
# lss-connector = { path = "../../sphinx-rs/lss-connector", default-features = false }
# sphinx-crypter = { path = "../../sphinx-rs/crypter" }

View File

@@ -45,11 +45,12 @@ pub fn make_client(
info!("MQTT client started");
let builder = thread::Builder::new().stack_size(1524);
let builder = thread::Builder::new().stack_size(2048);
builder.spawn(move || {
info!("MQTT Listening for messages");
let mut inflight = Vec::new();
let mut inflight_topic = "".to_string();
let mut inflight_len = 0usize;
while let Some(msg) = connection.next() {
match msg {
Err(e) => match e.to_string().as_ref() {
@@ -74,26 +75,30 @@ pub fn make_client(
Event::Unsubscribed(_mes_id) => info!("RECEIVED Unsubscribed MESSAGE"),
Event::Published(_mes_id) => info!("RECEIVED Published MESSAGE"),
Event::Received(msg) => {
let incoming_message: Option<(String, Vec<u8>)> = match msg.details() {
let incoming_message: Option<(String, Vec<Vec<u8>>)> = match msg.details() {
Details::Complete => msg
.topic()
.map(|topic| (topic.to_string(), msg.data().to_vec())),
.map(|topic| (topic.to_string(), vec![msg.data().to_vec()])),
Details::InitialChunk(chunk_info) => {
if let Some(topic) = msg.topic() {
inflight = Vec::with_capacity(chunk_info.total_data_size);
inflight =
Vec::with_capacity(chunk_info.total_data_size / 1024 + 1);
inflight_topic = topic.to_string();
inflight.extend_from_slice(msg.data());
inflight.push(msg.data().to_vec());
inflight_len += msg.data().len();
None
} else {
None
}
}
Details::SubsequentChunk(chunk_data) => {
inflight.extend_from_slice(msg.data());
if inflight.len() == chunk_data.total_data_size {
inflight.push(msg.data().to_vec());
inflight_len += msg.data().len();
if inflight_len == chunk_data.total_data_size {
let ret = Some((inflight_topic, inflight));
inflight_topic = String::new();
inflight = Vec::new();
inflight_len = 0usize;
ret
} else {
None
@@ -110,10 +115,13 @@ pub fn make_client(
|| topic.ends_with(topics::INIT_2_MSG)
|| topic.ends_with(topics::LSS_CONFLICT)
{
log::debug!("received data len {}", data.len());
let data: Vec<u8> =
data.into_iter().flat_map(|v| v.into_iter()).collect();
tx.send(CoreEvent::LssMessage(data))
.expect("couldnt send Event::LssMessage");
} else if topic.ends_with(topics::CONTROL) {
let data: Vec<u8> =
data.into_iter().flat_map(|v| v.into_iter()).collect();
tx.send(CoreEvent::Control(data))
.expect("couldnt send Event::Control");
} else {

View File

@@ -27,7 +27,7 @@ use esp_idf_svc::sys::EspError;
pub enum Event {
Connected,
Disconnected,
VlsMessage(Vec<u8>),
VlsMessage(Vec<Vec<u8>>),
LssMessage(Vec<u8>),
Control(Vec<u8>),
}