Add a manual round trigger using RPC

This commit is contained in:
Steven Roose
2024-03-13 02:52:00 +00:00
parent 723cb7a27c
commit 409ef2b89a
9 changed files with 117 additions and 12 deletions

View File

@@ -2,7 +2,10 @@
use std::io; use std::io;
use bitcoin::{Amount, FeeRate, OutPoint, ScriptBuf, Sequence, Transaction, TxIn, TxOut, Weight, Witness}; use bitcoin::{
Amount, FeeRate, OutPoint, ScriptBuf, Sequence, Transaction, Txid, TxIn, TxOut, Weight,
Witness,
};
use bitcoin::hashes::Hash; use bitcoin::hashes::Hash;
use bitcoin::secp256k1::{schnorr, KeyPair, PublicKey}; use bitcoin::secp256k1::{schnorr, KeyPair, PublicKey};
use bitcoin::sighash::{self, SighashCache, TapSighash, TapSighashType}; use bitcoin::sighash::{self, SighashCache, TapSighash, TapSighashType};
@@ -52,6 +55,10 @@ impl OorPayment {
} }
} }
pub fn txid(&self) -> Txid {
self.unsigned_transaction().txid()
}
pub fn sighashes(&self) -> Vec<TapSighash> { pub fn sighashes(&self) -> Vec<TapSighash> {
let tx = self.unsigned_transaction(); let tx = self.unsigned_transaction();
let prevs = self.inputs.iter().map(|i| i.txout()).collect::<Vec<_>>(); let prevs = self.inputs.iter().map(|i| i.txout()).collect::<Vec<_>>();

View File

@@ -687,6 +687,28 @@ pub mod admin_service_client {
.insert(GrpcMethod::new("arkd.AdminService", "WalletStatus")); .insert(GrpcMethod::new("arkd.AdminService", "WalletStatus"));
self.inner.unary(req, path, codec).await self.inner.unary(req, path, codec).await
} }
pub async fn trigger_round(
&mut self,
request: impl tonic::IntoRequest<super::Empty>,
) -> std::result::Result<tonic::Response<super::Empty>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/arkd.AdminService/TriggerRound",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("arkd.AdminService", "TriggerRound"));
self.inner.unary(req, path, codec).await
}
pub async fn stop( pub async fn stop(
&mut self, &mut self,
request: impl tonic::IntoRequest<super::Empty>, request: impl tonic::IntoRequest<super::Empty>,

View File

@@ -170,6 +170,7 @@ message VtxoSignaturesRequest {
/// Administration service for arkd. /// Administration service for arkd.
service AdminService { service AdminService {
rpc WalletStatus(Empty) returns (WalletStatusResponse) {} rpc WalletStatus(Empty) returns (WalletStatusResponse) {}
rpc TriggerRound(Empty) returns (Empty) {}
rpc Stop(Empty) returns (Empty) {} rpc Stop(Empty) returns (Empty) {}
} }

View File

@@ -92,6 +92,7 @@ pub struct App {
round_busy: Arc<RwLock<()>>, round_busy: Arc<RwLock<()>>,
round_event_tx: tokio::sync::broadcast::Sender<RoundEvent>, round_event_tx: tokio::sync::broadcast::Sender<RoundEvent>,
round_input_tx: tokio::sync::mpsc::UnboundedSender<RoundInput>, round_input_tx: tokio::sync::mpsc::UnboundedSender<RoundInput>,
round_trigger_tx: tokio::sync::mpsc::Sender<()>,
} }
impl App { impl App {
@@ -168,6 +169,7 @@ impl App {
let (round_event_tx, _rx) = tokio::sync::broadcast::channel(8); let (round_event_tx, _rx) = tokio::sync::broadcast::channel(8);
let (round_input_tx, round_input_rx) = tokio::sync::mpsc::unbounded_channel(); let (round_input_tx, round_input_rx) = tokio::sync::mpsc::unbounded_channel();
let (round_trigger_tx, round_trigger_rx) = tokio::sync::mpsc::channel(1);
let ret = Arc::new(App { let ret = Arc::new(App {
config: config, config: config,
@@ -180,6 +182,7 @@ impl App {
round_busy: Arc::new(RwLock::new(())), round_busy: Arc::new(RwLock::new(())),
round_event_tx: round_event_tx, round_event_tx: round_event_tx,
round_input_tx: round_input_tx, round_input_tx: round_input_tx,
round_trigger_tx: round_trigger_tx,
}); });
let app = ret.clone(); let app = ret.clone();
@@ -193,7 +196,7 @@ impl App {
ret = rpcserver::run_admin_rpc_server(app.clone()) => { ret = rpcserver::run_admin_rpc_server(app.clone()) => {
ret.context("error from admin gRPC server") ret.context("error from admin gRPC server")
}, },
ret = round::run_round_scheduler(app.clone(), round_input_rx) => { ret = round::run_round_scheduler(app.clone(), round_input_rx, round_trigger_rx) => {
ret.context("error from round scheduler") ret.context("error from round scheduler")
}, },
} }
@@ -202,7 +205,7 @@ impl App {
ret = rpcserver::run_public_rpc_server(app.clone()) => { ret = rpcserver::run_public_rpc_server(app.clone()) => {
ret.context("error from public gRPC server") ret.context("error from public gRPC server")
}, },
ret = round::run_round_scheduler(app.clone(), round_input_rx) => { ret = round::run_round_scheduler(app.clone(), round_input_rx, round_trigger_rx) => {
ret.context("error from round scheduler") ret.context("error from round scheduler")
}, },
} }
@@ -271,7 +274,7 @@ impl App {
if let Some(dup) = self.db.atomic_check_mark_oors_cosigned(ids.iter().copied())? { if let Some(dup) = self.db.atomic_check_mark_oors_cosigned(ids.iter().copied())? {
bail!("attempted to double sign OOR for vtxo {}", dup) bail!("attempted to double sign OOR for vtxo {}", dup)
} else { } else {
info!("Cosigning OOR tx with inputs: {:?}", ids); info!("Cosigning OOR tx {} with inputs: {:?}", payment.txid(), ids);
let (nonces, sigs) = payment.sign_asp(&self.master_key, &user_nonces); let (nonces, sigs) = payment.sign_asp(&self.master_key, &user_nonces);
Ok((nonces, sigs)) Ok((nonces, sigs))
} }

View File

@@ -46,10 +46,10 @@ enum Command {
DropOorConflicts, DropOorConflicts,
#[command()] #[command()]
Rpc { Rpc {
#[command(subcommand)]
cmd: RpcCommand,
#[arg(long, default_value = RPC_ADDR)] #[arg(long, default_value = RPC_ADDR)]
addr: String, addr: String,
#[command(subcommand)]
cmd: RpcCommand,
}, },
} }
@@ -59,6 +59,8 @@ enum RpcCommand {
Balance, Balance,
#[command()] #[command()]
GetAddress, GetAddress,
#[command()]
TriggerRound,
/// Stop arkd. /// Stop arkd.
#[command()] #[command()]
Stop, Stop,
@@ -141,8 +143,12 @@ async fn inner_main() -> anyhow::Result<()> {
} }
async fn run_rpc(addr: &str, cmd: RpcCommand) -> anyhow::Result<()> { async fn run_rpc(addr: &str, cmd: RpcCommand) -> anyhow::Result<()> {
let asp_endpoint = tonic::transport::Uri::from_str(&format!("http://{}", addr)) let addr = if addr.starts_with("http") {
.context("invalid asp addr")?; addr.to_owned()
} else {
format!("http://{}", addr)
};
let asp_endpoint = tonic::transport::Uri::from_str(&addr).context("invalid asp addr")?;
let mut asp = rpc::AdminServiceClient::connect(asp_endpoint) let mut asp = rpc::AdminServiceClient::connect(asp_endpoint)
.await.context("failed to connect to asp")?; .await.context("failed to connect to asp")?;
@@ -155,6 +161,9 @@ async fn run_rpc(addr: &str, cmd: RpcCommand) -> anyhow::Result<()> {
let res = asp.wallet_status(rpc::Empty {}).await?.into_inner(); let res = asp.wallet_status(rpc::Empty {}).await?.into_inner();
println!("{}", res.address); println!("{}", res.address);
}, },
RpcCommand::TriggerRound => {
asp.trigger_round(rpc::Empty {}).await?.into_inner();
}
RpcCommand::Stop => unimplemented!(), RpcCommand::Stop => unimplemented!(),
} }
Ok(()) Ok(())

View File

@@ -150,6 +150,7 @@ fn validate_forfeit_sigs(
pub async fn run_round_scheduler( pub async fn run_round_scheduler(
app: Arc<App>, app: Arc<App>,
mut round_input_rx: tokio::sync::mpsc::UnboundedReceiver<RoundInput>, mut round_input_rx: tokio::sync::mpsc::UnboundedReceiver<RoundInput>,
mut round_trigger_rx: tokio::sync::mpsc::Receiver<()>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let cfg = &app.config; let cfg = &app.config;
@@ -165,6 +166,10 @@ pub async fn run_round_scheduler(
'sleep: loop { 'sleep: loop {
tokio::select! { tokio::select! {
() = &mut timeout => break 'sleep, () = &mut timeout => break 'sleep,
Some(()) = round_trigger_rx.recv() => {
info!("Starting round based on admin RPC trigger");
break 'sleep;
},
_ = round_input_rx.recv() => {}, _ = round_input_rx.recv() => {},
} }
} }

View File

@@ -949,6 +949,10 @@ pub mod admin_service_server {
tonic::Response<super::WalletStatusResponse>, tonic::Response<super::WalletStatusResponse>,
tonic::Status, tonic::Status,
>; >;
async fn trigger_round(
&self,
request: tonic::Request<super::Empty>,
) -> std::result::Result<tonic::Response<super::Empty>, tonic::Status>;
async fn stop( async fn stop(
&self, &self,
request: tonic::Request<super::Empty>, request: tonic::Request<super::Empty>,
@@ -1078,6 +1082,50 @@ pub mod admin_service_server {
}; };
Box::pin(fut) Box::pin(fut)
} }
"/arkd.AdminService/TriggerRound" => {
#[allow(non_camel_case_types)]
struct TriggerRoundSvc<T: AdminService>(pub Arc<T>);
impl<T: AdminService> tonic::server::UnaryService<super::Empty>
for TriggerRoundSvc<T> {
type Response = super::Empty;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::Empty>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as AdminService>::trigger_round(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = TriggerRoundSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/arkd.AdminService/Stop" => { "/arkd.AdminService/Stop" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct StopSvc<T: AdminService>(pub Arc<T>); struct StopSvc<T: AdminService>(pub Arc<T>);

View File

@@ -332,6 +332,18 @@ impl rpc::AdminService for Arc<App> {
})) }))
} }
async fn trigger_round(
&self,
_req: tonic::Request<rpc::Empty>,
) -> Result<tonic::Response<rpc::Empty>, tonic::Status> {
match self.round_trigger_tx.try_send(()) {
Err(tokio::sync::mpsc::error::TrySendError::Closed(())) => {
Err(internal!("round scheduler closed"))
},
_ => Ok(tonic::Response::new(rpc::Empty {})),
}
}
async fn stop( async fn stop(
&self, &self,
_req: tonic::Request<rpc::Empty>, _req: tonic::Request<rpc::Empty>,

View File

@@ -116,11 +116,10 @@ impl Wallet {
// Broadcast exit txs. // Broadcast exit txs.
for tx in &exit.broadcast { for tx in &exit.broadcast {
trace!("Broadcasting tx {}: {}", tx.txid(), bitcoin::consensus::encode::serialize_hex(tx));
if let Err(e) = self.onchain.broadcast_tx(tx).await { if let Err(e) = self.onchain.broadcast_tx(tx).await {
error!("Error broadcasting exit tx {}: {}", tx.txid(), e); error!("Error broadcasting exit tx {}: {}", tx.txid(), e);
trace!("Error broadcasting exit tx {}: {}", error!("Tx {}: {}", tx.txid(), bitcoin::consensus::encode::serialize_hex(tx));
bitcoin::consensus::encode::serialize_hex(tx), e,
);
} }
} }
@@ -222,7 +221,6 @@ impl Wallet {
let sighash = shc.taproot_script_spend_signature_hash( let sighash = shc.taproot_script_spend_signature_hash(
i, &sighash::Prevouts::All(&prevouts), leaf_hash, sighash::TapSighashType::Default, i, &sighash::Prevouts::All(&prevouts), leaf_hash, sighash::TapSighashType::Default,
).expect("all prevouts provided"); ).expect("all prevouts provided");
trace!("sighash: {}", sighash);
assert_eq!(vtxo_key.public_key(), claim.spec.user_pubkey); assert_eq!(vtxo_key.public_key(), claim.spec.user_pubkey);
let sig = SECP.sign_schnorr(&sighash.into(), &vtxo_key); let sig = SECP.sign_schnorr(&sighash.into(), &vtxo_key);