From fe118d180f6c870e34e2ccaa4dd816b638d130ef Mon Sep 17 00:00:00 2001 From: David Caseria Date: Sat, 31 May 2025 16:41:25 -0400 Subject: [PATCH] Signatory Loader (#777) * Allow Signatory to be run with custom incoming stream * Allow multiple signatories to be loaded on a server * Fix merge conflict in server.rs * Export SignatoryLoader * Use unit error * Use Arc instead of reference --- crates/cdk-signatory/src/bin/cli/mod.rs | 2 +- crates/cdk-signatory/src/lib.rs | 3 +- crates/cdk-signatory/src/proto/server.rs | 99 ++++++++++++++++++------ 3 files changed, 77 insertions(+), 27 deletions(-) diff --git a/crates/cdk-signatory/src/bin/cli/mod.rs b/crates/cdk-signatory/src/bin/cli/mod.rs index 798181be..99cc93ee 100644 --- a/crates/cdk-signatory/src/bin/cli/mod.rs +++ b/crates/cdk-signatory/src/bin/cli/mod.rs @@ -167,7 +167,7 @@ pub async fn cli_main() -> Result<()> { let socket_addr = SocketAddr::from_str(&format!("{}:{}", args.listen_addr, args.listen_port))?; - start_grpc_server(signatory, socket_addr, certs).await?; + start_grpc_server(Arc::new(signatory), socket_addr, certs).await?; Ok(()) } diff --git a/crates/cdk-signatory/src/lib.rs b/crates/cdk-signatory/src/lib.rs index 348e3445..0ee67769 100644 --- a/crates/cdk-signatory/src/lib.rs +++ b/crates/cdk-signatory/src/lib.rs @@ -13,7 +13,8 @@ mod proto; #[cfg(feature = "grpc")] pub use proto::{ - client::SignatoryRpcClient, server::start_grpc_server, server::start_grpc_server_with_incoming, + client::SignatoryRpcClient, + server::{start_grpc_server, start_grpc_server_with_incoming, SignatoryLoader}, }; mod common; diff --git a/crates/cdk-signatory/src/proto/server.rs b/crates/cdk-signatory/src/proto/server.rs index 90f1f8da..8be189b7 100644 --- a/crates/cdk-signatory/src/proto/server.rs +++ b/crates/cdk-signatory/src/proto/server.rs @@ -1,9 +1,11 @@ //! This module contains the generated gRPC server code for the Signatory service. use std::net::SocketAddr; use std::path::Path; +use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_stream::Stream; +use tonic::metadata::MetadataMap; use tonic::transport::server::Connected; use tonic::transport::{Certificate, Identity, Server, ServerTlsConfig}; use tonic::{Request, Response, Status}; @@ -12,25 +14,49 @@ use crate::proto::{self, signatory_server}; use crate::signatory::Signatory; /// The server implementation for the Signatory service. -pub struct CdkSignatoryServer +pub struct CdkSignatoryServer where - T: Signatory + Send + Sync + 'static, + S: Signatory + Send + Sync + 'static, + T: SignatoryLoader + 'static, { - inner: T, + loader: T, + _phantom: std::marker::PhantomData, +} + +impl CdkSignatoryServer +where + S: Signatory + Send + Sync + 'static, + T: SignatoryLoader + 'static, +{ + pub fn new(loader: T) -> Self { + Self { + loader, + _phantom: std::marker::PhantomData, + } + } + + async fn load_signatory(&self, metadata: &MetadataMap) -> Result, Status> { + self.loader + .load_signatory(metadata) + .await + .map_err(|_| Status::internal("Failed to load signatory")) + } } #[tonic::async_trait] -impl signatory_server::Signatory for CdkSignatoryServer +impl signatory_server::Signatory for CdkSignatoryServer where - T: Signatory + Send + Sync + 'static, + S: Signatory + Send + Sync + 'static, + T: SignatoryLoader + 'static, { #[tracing::instrument(skip_all)] async fn blind_sign( &self, request: Request, ) -> Result, Status> { - let result = match self - .inner + let metadata = request.metadata(); + let signatory = self.load_signatory(metadata).await?; + let result = match signatory .blind_sign( request .into_inner() @@ -64,8 +90,9 @@ where &self, request: Request, ) -> Result, Status> { - let result = match self - .inner + let metadata = request.metadata(); + let signatory = self.load_signatory(metadata).await?; + let result = match signatory .verify_proofs( request .into_inner() @@ -96,9 +123,11 @@ where async fn keysets( &self, - _request: Request, + request: Request, ) -> Result, Status> { - let result = match self.inner.keysets().await { + let metadata = request.metadata(); + let signatory = self.load_signatory(metadata).await?; + let result = match signatory.keysets().await { Ok(result) => proto::KeysResponse { keysets: Some(result.into()), ..Default::default() @@ -116,8 +145,9 @@ where &self, request: Request, ) -> Result, Status> { - let mint_keyset_info = match self - .inner + let metadata = request.metadata(); + let signatory = self.load_signatory(metadata).await?; + let mint_keyset_info = match signatory .rotate_keyset(request.into_inner().try_into()?) .await { @@ -135,6 +165,23 @@ where } } +/// Trait for loading a signatory instance from gRPC metadata +#[async_trait::async_trait] +pub trait SignatoryLoader: Send + Sync { + /// Loads the signatory instance based on the provided metadata. + async fn load_signatory(&self, metadata: &MetadataMap) -> Result, ()>; +} + +#[async_trait::async_trait] +impl SignatoryLoader for Arc +where + T: Signatory + Send + Sync + 'static, +{ + async fn load_signatory(&self, _metadata: &MetadataMap) -> Result, ()> { + Ok(self.clone()) + } +} + /// Error type for the gRPC server #[derive(thiserror::Error, Debug)] pub enum Error { @@ -147,13 +194,14 @@ pub enum Error { } /// Runs the signatory server -pub async fn start_grpc_server>( - signatory: T, +pub async fn start_grpc_server>( + signatory_loader: T, addr: SocketAddr, tls_dir: Option, ) -> Result<(), Error> where - T: Signatory + Send + Sync + 'static, + S: Signatory + Send + Sync + 'static, + T: SignatoryLoader + 'static, { tracing::info!("Starting RPC server {}", addr); @@ -219,29 +267,30 @@ where }; server - .add_service(signatory_server::SignatoryServer::new(CdkSignatoryServer { - inner: signatory, - })) + .add_service(signatory_server::SignatoryServer::new( + CdkSignatoryServer::new(signatory_loader), + )) .serve(addr) .await?; Ok(()) } /// Starts the gRPC signatory server with an incoming stream of connections. -pub async fn start_grpc_server_with_incoming( - signatory: T, +pub async fn start_grpc_server_with_incoming( + signatory_loader: T, incoming: I, ) -> Result<(), Error> where - T: Signatory + Send + Sync + 'static, + S: Signatory + Send + Sync + 'static, + T: SignatoryLoader + 'static, I: Stream>, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IE: Into>, { Server::builder() - .add_service(signatory_server::SignatoryServer::new(CdkSignatoryServer { - inner: signatory, - })) + .add_service(signatory_server::SignatoryServer::new( + CdkSignatoryServer::new(signatory_loader), + )) .serve_with_incoming(incoming) .await?; Ok(())