refactor tester bin, configurable client id, sdcard root

This commit is contained in:
Evan Feenstra
2022-09-27 12:49:41 -07:00
parent 0002374a7f
commit 7c886b0f15
5 changed files with 127 additions and 106 deletions

1
.gitignore vendored
View File

@@ -7,3 +7,4 @@ Cargo.lock
notes.md notes.md
test-flash test-flash
.env .env
teststore

View File

@@ -53,6 +53,7 @@ pub struct ChannelReply {
pub reply: Vec<u8>, pub reply: Vec<u8>,
} }
const CLIENT_ID: &str = "sphinx-1";
const BROKER_CONFIG_PATH: &str = "../broker.conf"; const BROKER_CONFIG_PATH: &str = "../broker.conf";
#[rocket::launch] #[rocket::launch]
@@ -97,7 +98,7 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
let (status_tx, mut status_rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000);
let (error_tx, _) = broadcast::channel(1000); let (error_tx, _) = broadcast::channel(1000);
log::info!("=> start broker on network: {}", settings.network); log::info!("=> start broker on network: {}", settings.network);
start_broker(rx, status_tx, error_tx.clone(), "sphinx-1", &settings).await; start_broker(rx, status_tx, error_tx.clone(),CLIENT_ID, &settings).await;
log::info!("=> wait for connected status"); log::info!("=> wait for connected status");
// wait for connection = true // wait for connection = true
let status = status_rx.recv().await.expect("couldnt receive"); let status = status_rx.recv().await.expect("couldnt receive");

View File

@@ -27,12 +27,13 @@ pub struct InitResponse {
pub init_reply: Vec<u8>, pub init_reply: Vec<u8>,
} }
pub const ROOT_STORE: &str = "/sdcard/store"; // pub const ROOT_STORE: &str = "/sdcard/store";
pub fn init( pub fn init(
bytes: Vec<u8>, bytes: Vec<u8>,
network: Network, network: Network,
po: &control::Policy, po: &control::Policy,
root_store_path: &str,
) -> anyhow::Result<InitResponse> { ) -> anyhow::Result<InitResponse> {
// let persister: Arc<dyn Persist> = Arc::new(DummyPersister); // let persister: Arc<dyn Persist> = Arc::new(DummyPersister);
let mut md = MsgDriver::new(bytes); let mut md = MsgDriver::new(bytes);
@@ -52,7 +53,7 @@ pub fn init(
let policy = make_policy(network, po); let policy = make_policy(network, po);
let validator_factory = Arc::new(SimpleValidatorFactory::new_with_policy(policy)); let validator_factory = Arc::new(SimpleValidatorFactory::new_with_policy(policy));
let random_time_factory = RandomStartingTimeFactory::new(); let random_time_factory = RandomStartingTimeFactory::new();
let persister: Arc<dyn Persist> = Arc::new(FsPersister::new(ROOT_STORE)); let persister: Arc<dyn Persist> = Arc::new(FsPersister::new(root_store_path));
let clock = Arc::new(StandardClock()); let clock = Arc::new(StandardClock());
let services = NodeServices { let services = NodeServices {
validator_factory, validator_factory,

View File

@@ -34,6 +34,8 @@ pub enum Status {
Signing, Signing,
} }
pub const ROOT_STORE: &str = "/sdcard/store";
// the main event loop // the main event loop
#[cfg(not(feature = "pingpong"))] #[cfg(not(feature = "pingpong"))]
pub fn make_event_loop( pub fn make_event_loop(
@@ -69,7 +71,8 @@ pub fn make_event_loop(
let InitResponse { let InitResponse {
root_handler, root_handler,
init_reply: _, init_reply: _,
} = sphinx_key_signer::init(init_msg, network, policy).expect("failed to init signer"); } = sphinx_key_signer::init(init_msg, network, policy, ROOT_STORE)
.expect("failed to init signer");
// signing loop // signing loop
let dummy_peer = PubKey([0; 33]); let dummy_peer = PubKey([0; 33]);

View File

@@ -4,7 +4,7 @@ use sphinx_key_signer::lightning_signer::bitcoin::Network;
use clap::{App, AppSettings, Arg}; use clap::{App, AppSettings, Arg};
use dotenv::dotenv; use dotenv::dotenv;
use rumqttc::{self, AsyncClient, Event, MqttOptions, Packet, QoS}; use rumqttc::{self, AsyncClient, Event, EventLoop, MqttOptions, Packet, QoS};
use sphinx_key_signer::control::Controller; use sphinx_key_signer::control::Controller;
use sphinx_key_signer::vls_protocol::{model::PubKey, msgs}; use sphinx_key_signer::vls_protocol::{model::PubKey, msgs};
use sphinx_key_signer::{self, InitResponse}; use sphinx_key_signer::{self, InitResponse};
@@ -14,6 +14,8 @@ use std::error::Error;
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
pub const ROOT_STORE: &str = "teststore";
#[tokio::main(worker_threads = 1)] #[tokio::main(worker_threads = 1)]
async fn main() -> Result<(), Box<dyn Error>> { async fn main() -> Result<(), Box<dyn Error>> {
setup_logging("sphinx-key-tester ", "info"); setup_logging("sphinx-key-tester ", "info");
@@ -38,12 +40,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
let seed_string: String = env::var("SEED").expect("no seed"); let seed_string: String = env::var("SEED").expect("no seed");
let seed = hex::decode(seed_string).expect("couldnt decode seed"); let seed = hex::decode(seed_string).expect("couldnt decode seed");
// make the controller to validate Control messages // make the controller to validate Control messages
let mut ctrlr = controller_from_seed(&network, &seed); let ctrlr = controller_from_seed(&network, &seed);
let pubkey = hex::encode(&ctrlr.pubkey().serialize()); let pubkey = hex::encode(&ctrlr.pubkey().serialize());
let token = ctrlr.make_auth_token()?; let token = ctrlr.make_auth_token()?;
let (client, mut eventloop) = loop { let client_id = if is_test {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); "test-1"
} else {
"sphinx-1"
};
let (client, eventloop) = loop {
let mut mqttoptions = MqttOptions::new(client_id, "localhost", 1883);
mqttoptions.set_credentials(pubkey.clone(), token.clone()); mqttoptions.set_credentials(pubkey.clone(), token.clone());
mqttoptions.set_keep_alive(Duration::from_secs(5)); mqttoptions.set_keep_alive(Duration::from_secs(5));
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
@@ -72,117 +79,125 @@ async fn main() -> Result<(), Box<dyn Error>> {
.expect("could not mqtt subscribe"); .expect("could not mqtt subscribe");
if is_test { if is_test {
// test handler loop run_test(eventloop, &client, ctrlr, is_log).await;
loop { } else {
match eventloop.poll().await { run_main(eventloop, &client, ctrlr, is_log, &seed, network).await;
Ok(event) => { }
// println!("{:?}", event); }
if let Some((topic, msg_bytes)) = incoming_bytes(event) { }
match topic.as_str() {
topics::VLS => { async fn run_main(mut eventloop: EventLoop, client: &AsyncClient, mut ctrlr: Controller, is_log: bool, seed: &[u8], network: Network) {
let (ping, sequence, dbid): (msgs::Ping, u16, u64) = let seed32: [u8; 32] = seed.try_into().expect("wrong seed");
parser::request_from_bytes(msg_bytes) let init_msg =
.expect("read ping header"); sphinx_key_signer::make_init_msg(network, seed32).expect("failed to make init msg");
if is_log { let InitResponse {
println!("sequence {}", sequence); root_handler,
println!("dbid {}", dbid); init_reply: _,
println!("INCOMING: {:?}", ping); } = sphinx_key_signer::init(init_msg, network, &Default::default(), ROOT_STORE)
} .expect("failed to init signer");
let pong = msgs::Pong { // the actual handler loop
id: ping.id, loop {
message: ping.message, match eventloop.poll().await {
}; Ok(event) => {
let bytes = parser::raw_response_from_msg(pong, sequence) let dummy_peer = PubKey([0; 33]);
.expect("couldnt parse raw response"); if let Some((topic, msg_bytes)) = incoming_bytes(event) {
match topic.as_str() {
topics::VLS => {
match sphinx_key_signer::handle(
&root_handler,
msg_bytes,
dummy_peer.clone(),
is_log,
) {
Ok(b) => client
.publish(topics::VLS_RETURN, QoS::AtMostOnce, false, b)
.await
.expect("could not publish init response"),
Err(e) => panic!("HANDLE FAILED {:?}", e),
};
}
topics::CONTROL => {
match ctrlr.handle(&msg_bytes) {
Ok((response, _new_policy)) => {
client client
.publish(topics::VLS_RETURN, QoS::AtMostOnce, false, bytes) .publish(
topics::CONTROL_RETURN,
QoS::AtMostOnce,
false,
response,
)
.await .await
.expect("could not mqtt publish"); .expect("could not mqtt publish");
} }
topics::CONTROL => { Err(e) => log::warn!("error parsing ctrl msg {:?}", e),
match ctrlr.handle(&msg_bytes) { };
Ok((response, _new_policy)) => {
client
.publish(
topics::CONTROL_RETURN,
QoS::AtMostOnce,
false,
response,
)
.await
.expect("could not mqtt publish");
}
Err(e) => log::warn!("error parsing ctrl msg {:?}", e),
};
}
_ => log::info!("invalid topic"),
}
} }
} _ => log::info!("invalid topic"),
Err(e) => {
log::warn!("diconnected {:?}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
break; // break out of this loop to reconnect
} }
} }
} }
} else { Err(e) => {
let seed32: [u8; 32] = seed.try_into().expect("wrong seed"); log::warn!("diconnected {:?}", e);
let init_msg = tokio::time::sleep(Duration::from_secs(1)).await;
sphinx_key_signer::make_init_msg(network, seed32).expect("failed to make init msg"); break; // break out of this loop to reconnect
let InitResponse { }
root_handler, }
init_reply: _, }
} = sphinx_key_signer::init(init_msg, network, &Default::default()) }
.expect("failed to init signer");
// the actual handler loop async fn run_test(mut eventloop: EventLoop, client: &AsyncClient, mut ctrlr: Controller, is_log: bool) {
loop { // test handler loop
match eventloop.poll().await { loop {
Ok(event) => { match eventloop.poll().await {
let dummy_peer = PubKey([0; 33]); Ok(event) => {
if let Some((topic, msg_bytes)) = incoming_bytes(event) { // println!("{:?}", event);
match topic.as_str() { if let Some((topic, msg_bytes)) = incoming_bytes(event) {
topics::VLS => { match topic.as_str() {
match sphinx_key_signer::handle( topics::VLS => {
&root_handler, let (ping, sequence, dbid): (msgs::Ping, u16, u64) =
msg_bytes, parser::request_from_bytes(msg_bytes)
dummy_peer.clone(), .expect("read ping header");
is_log, if is_log {
) { println!("sequence {}", sequence);
Ok(b) => client println!("dbid {}", dbid);
.publish(topics::VLS_RETURN, QoS::AtMostOnce, false, b) println!("INCOMING: {:?}", ping);
.await
.expect("could not publish init response"),
Err(e) => panic!("HANDLE FAILED {:?}", e),
};
}
topics::CONTROL => {
match ctrlr.handle(&msg_bytes) {
Ok((response, _new_policy)) => {
client
.publish(
topics::CONTROL_RETURN,
QoS::AtMostOnce,
false,
response,
)
.await
.expect("could not mqtt publish");
}
Err(e) => log::warn!("error parsing ctrl msg {:?}", e),
};
}
_ => log::info!("invalid topic"),
} }
let pong = msgs::Pong {
id: ping.id,
message: ping.message,
};
let bytes = parser::raw_response_from_msg(pong, sequence)
.expect("couldnt parse raw response");
client
.publish(topics::VLS_RETURN, QoS::AtMostOnce, false, bytes)
.await
.expect("could not mqtt publish");
} }
} topics::CONTROL => {
Err(e) => { match ctrlr.handle(&msg_bytes) {
log::warn!("diconnected {:?}", e); Ok((response, _new_policy)) => {
tokio::time::sleep(Duration::from_secs(1)).await; client
break; // break out of this loop to reconnect .publish(
topics::CONTROL_RETURN,
QoS::AtMostOnce,
false,
response,
)
.await
.expect("could not mqtt publish");
}
Err(e) => log::warn!("error parsing ctrl msg {:?}", e),
};
}
_ => log::info!("invalid topic"),
} }
} }
} }
Err(e) => {
log::warn!("diconnected {:?}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
break; // break out of this loop to reconnect
}
} }
} }
} }