Merge pull request #21 from aljazceru/codex/implement-metrics-handler-and-endpoint

Add metrics endpoint
This commit is contained in:
2025-05-29 14:54:44 +02:00
committed by GitHub
6 changed files with 102 additions and 5 deletions

View File

@@ -236,6 +236,30 @@ By default, the server runs on `127.0.0.1:8080`. You can modify this using confi
}
```
### 7. Metrics
**Endpoint**: `GET /metrics`
**Description**: Returns runtime metrics about stored sessions and extensions.
**Request**:
- Headers:
- `x-api-key: [your-api-key]`
**Response** (example):
```json
{
"session_messages": {
"20240605_001234": 3,
"20240605_010000": 5
},
"active_sessions": 2,
"pending_requests": {
"mcp_say": 0
}
}
```
## Session Management
Sessions created via the API are stored in the same location as the CLI

View File

@@ -10,6 +10,7 @@ use goose::message::Message;
use goose::session::{self, Identifier};
use goose::config::Config;
use std::sync::LazyLock;
use std::collections::HashMap;
pub static EXTENSION_MANAGER: LazyLock<ExtensionManager> = LazyLock::new(|| ExtensionManager::default());
pub static AGENT: LazyLock<tokio::sync::Mutex<Agent>> = LazyLock::new(|| tokio::sync::Mutex::new(Agent::new()));
@@ -60,6 +61,13 @@ pub struct ExtensionResponse {
pub message: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct MetricsResponse {
pub session_messages: HashMap<String, usize>,
pub active_sessions: usize,
pub pending_requests: HashMap<String, usize>,
}
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
pub enum ExtensionConfigRequest {
@@ -442,6 +450,33 @@ pub async fn remove_extension_handler(
Ok(warp::reply::json(&resp))
}
pub async fn metrics_handler() -> Result<impl warp::Reply, Rejection> {
// Gather session message counts
let mut session_messages = HashMap::new();
if let Ok(sessions) = session::list_sessions() {
for (name, path) in sessions {
if let Ok(messages) = session::read_messages(&path) {
session_messages.insert(name, messages.len());
}
}
}
let active_sessions = session_messages.len();
// Gather pending request sizes for each extension
let pending_requests = EXTENSION_MANAGER
.pending_request_sizes()
.await;
let resp = MetricsResponse {
session_messages,
active_sessions,
pending_requests,
};
Ok(warp::reply::json(&resp))
}
pub fn with_api_key(api_key: String) -> impl Filter<Extract = (String,), Error = Rejection> + Clone {
warp::header::value("x-api-key")
.and_then(move |header_api_key: HeaderValue| {

View File

@@ -4,7 +4,7 @@ use tracing::{info, warn, error};
use crate::handlers::{
add_extension_handler, end_session_handler, get_provider_config_handler,
list_extensions_handler, remove_extension_handler, reply_session_handler,
start_session_handler, with_api_key,
start_session_handler, metrics_handler, with_api_key,
};
use crate::config::{
initialize_extensions, initialize_provider_config, load_configuration,
@@ -57,6 +57,10 @@ pub fn build_routes(api_key: String) -> impl Filter<Extract = impl warp::Reply,
.and(warp::get())
.and_then(get_provider_config_handler);
let metrics = warp::path("metrics")
.and(warp::get())
.and_then(metrics_handler);
start_session
.or(reply_session)
.or(end_session)
@@ -64,6 +68,7 @@ pub fn build_routes(api_key: String) -> impl Filter<Extract = impl warp::Reply,
.or(add_extension)
.or(remove_extension)
.or(get_provider_config)
.or(metrics)
}
pub async fn run_server() -> Result<(), anyhow::Error> {

View File

@@ -17,7 +17,7 @@ use crate::agents::extension::Envs;
use crate::config::{Config, ExtensionConfigManager};
use crate::prompt_template;
use mcp_client::client::{ClientCapabilities, ClientInfo, McpClient, McpClientTrait};
use mcp_client::transport::{SseTransport, StdioTransport, Transport};
use mcp_client::transport::{PendingRequests, SseTransport, StdioTransport, Transport};
use mcp_core::{prompt::Prompt, Content, Tool, ToolCall, ToolError, ToolResult};
use serde_json::Value;
@@ -33,6 +33,7 @@ pub struct ExtensionManager {
clients: HashMap<String, McpClientBox>,
instructions: HashMap<String, String>,
resource_capable_extensions: HashSet<String>,
pending_requests: HashMap<String, Arc<PendingRequests>>, // track pending requests per extension
}
/// A flattened representation of a resource used by the agent to prepare inference
@@ -103,6 +104,7 @@ impl ExtensionManager {
clients: HashMap::new(),
instructions: HashMap::new(),
resource_capable_extensions: HashSet::new(),
pending_requests: HashMap::new(),
}
}
@@ -183,12 +185,14 @@ impl ExtensionManager {
let all_envs = merge_environments(envs, env_keys, &sanitized_name).await?;
let transport = SseTransport::new(uri, all_envs);
let handle = transport.start().await?;
let pending = handle.pending_requests();
let service = McpService::with_timeout(
handle,
Duration::from_secs(
timeout.unwrap_or(crate::config::DEFAULT_EXTENSION_TIMEOUT),
),
);
self.pending_requests.insert(sanitized_name.clone(), pending);
Box::new(McpClient::new(service))
}
ExtensionConfig::Stdio {
@@ -202,12 +206,14 @@ impl ExtensionManager {
let all_envs = merge_environments(envs, env_keys, &sanitized_name).await?;
let transport = StdioTransport::new(cmd, args.to_vec(), all_envs);
let handle = transport.start().await?;
let pending = handle.pending_requests();
let service = McpService::with_timeout(
handle,
Duration::from_secs(
timeout.unwrap_or(crate::config::DEFAULT_EXTENSION_TIMEOUT),
),
);
self.pending_requests.insert(sanitized_name.clone(), pending);
Box::new(McpClient::new(service))
}
ExtensionConfig::Builtin {
@@ -227,12 +233,14 @@ impl ExtensionManager {
HashMap::new(),
);
let handle = transport.start().await?;
let pending = handle.pending_requests();
let service = McpService::with_timeout(
handle,
Duration::from_secs(
timeout.unwrap_or(crate::config::DEFAULT_EXTENSION_TIMEOUT),
),
);
self.pending_requests.insert(sanitized_name.clone(), pending);
Box::new(McpClient::new(service))
}
_ => unreachable!(),
@@ -285,9 +293,19 @@ impl ExtensionManager {
self.clients.remove(&sanitized_name);
self.instructions.remove(&sanitized_name);
self.resource_capable_extensions.remove(&sanitized_name);
self.pending_requests.remove(&sanitized_name);
Ok(())
}
/// Get the size of each extension's pending request map
pub async fn pending_request_sizes(&self) -> HashMap<String, usize> {
let mut result = HashMap::new();
for (name, pending) in &self.pending_requests {
result.insert(name.clone(), pending.len().await);
}
result
}
pub async fn suggest_disable_extensions_prompt(&self) -> Value {
let enabled_extensions_count = self.clients.len();

View File

@@ -223,6 +223,7 @@ impl SseActor {
#[derive(Clone)]
pub struct SseTransportHandle {
sender: mpsc::Sender<TransportMessage>,
pending_requests: Arc<PendingRequests>,
}
#[async_trait::async_trait]
@@ -232,6 +233,12 @@ impl TransportHandle for SseTransportHandle {
}
}
impl SseTransportHandle {
pub fn pending_requests(&self) -> Arc<PendingRequests> {
Arc::clone(&self.pending_requests)
}
}
#[derive(Clone)]
pub struct SseTransport {
sse_url: String,
@@ -284,9 +291,10 @@ impl Transport for SseTransport {
let post_endpoint_clone = Arc::clone(&post_endpoint);
// Build the actor
let pending_requests = Arc::new(PendingRequests::new());
let actor = SseActor::new(
rx,
Arc::new(PendingRequests::new()),
pending_requests.clone(),
self.sse_url.clone(),
post_endpoint,
);
@@ -301,7 +309,7 @@ impl Transport for SseTransport {
)
.await
{
Ok(_) => Ok(SseTransportHandle { sender: tx }),
Ok(_) => Ok(SseTransportHandle { sender: tx, pending_requests }),
Err(e) => Err(Error::SseConnection(e.to_string())),
}
}

View File

@@ -189,6 +189,7 @@ impl StdioActor {
pub struct StdioTransportHandle {
sender: mpsc::Sender<TransportMessage>,
error_receiver: Arc<Mutex<mpsc::Receiver<Error>>>,
pending_requests: Arc<PendingRequests>,
}
#[async_trait::async_trait]
@@ -212,6 +213,10 @@ impl StdioTransportHandle {
Err(_) => Ok(()),
}
}
pub fn pending_requests(&self) -> Arc<PendingRequests> {
Arc::clone(&self.pending_requests)
}
}
pub struct StdioTransport {
@@ -292,9 +297,10 @@ impl Transport for StdioTransport {
let (message_tx, message_rx) = mpsc::channel(32);
let (error_tx, error_rx) = mpsc::channel(1);
let pending_requests = Arc::new(PendingRequests::new());
let actor = StdioActor {
receiver: Some(message_rx),
pending_requests: Arc::new(PendingRequests::new()),
pending_requests: pending_requests.clone(),
process,
error_sender: error_tx,
stdin: Some(stdin),
@@ -307,6 +313,7 @@ impl Transport for StdioTransport {
let handle = StdioTransportHandle {
sender: message_tx,
error_receiver: Arc::new(Mutex::new(error_rx)),
pending_requests,
};
Ok(handle)
}